]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd updates and bugfixes
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 14 Aug 2007 20:21:06 +0000 (20:21 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 14 Aug 2007 20:21:06 +0000 (20:21 +0000)
* MOSDPGActivateSet for faster startup
* "clean" osd (shut)down
* fixed some pg log/attribute bugs, strengthened invariant
* fixed project_pg_history (and subsequent assert(from == pg->acting[0]) bug)

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1634 29311d96-e01e-0410-9327-a35deaab8ce9

17 files changed:
branches/sage/mds/TODO
branches/sage/mds/client/SyntheticClient.cc
branches/sage/mds/common/Timer.cc
branches/sage/mds/config.cc
branches/sage/mds/config.h
branches/sage/mds/messages/MOSDPGActivateSet.h [new file with mode: 0644]
branches/sage/mds/messages/MOSDPGLog.h
branches/sage/mds/mon/ClientMonitor.cc
branches/sage/mds/mon/OSDMonitor.cc
branches/sage/mds/msg/Message.cc
branches/sage/mds/msg/Message.h
branches/sage/mds/osd/OSD.cc
branches/sage/mds/osd/OSD.h
branches/sage/mds/osd/OSDMap.h
branches/sage/mds/osd/PG.cc
branches/sage/mds/osd/PG.h
branches/sage/mds/osdc/Objecter.cc

index e054faa939406f89a3b63d14dd1b4590d15d5d85..b2e8df3e69553e67b09b7fa10c1f81f0019e90b6 100644 (file)
@@ -36,10 +36,6 @@ general kernel planning
 - soft consistency on (kernel) lookup?
 - accurate reconstruction of (syscall) path?
 
-software raid layer for EBOFS?
-- actually, we just need software raid _awareness_ in the allocator, so
-  that we can write only full stripes, without fear of clobbering things on
-  failure.  then use MD or similar layer provided by kernel.
 
 
 sage doc
index 52dba42418c14a6b9753fdb18d190e73bce75792..6e8b2da099f23ac77ed3d4d23b5ce6e2beb39370 100644 (file)
@@ -2281,7 +2281,7 @@ void SyntheticClient::import_find(const char *base, const char *find, bool data)
       } else {
        int fd = client->open(f.c_str(), O_WRONLY|O_CREAT);
        assert(fd > 0); 
-       client->write(fd, " ", 1, size-1);
+       client->write(fd, "", 0, size);
        client->close(fd);
 
        client->chmod(f.c_str(), mode & 0777);
index 1ddf5d18e8bbfda7bd2ddb7488e7adf4a6d08b1a..0681e4f4f1324b611afac211378972202a4ba2ae 100644 (file)
@@ -22,8 +22,8 @@
 #include "include/Context.h"
 
 #undef dout
-#define dout(x)  if (x <= g_conf.debug) cout << g_clock.now() << " TIMER "
-#define derr(x)  if (x <= g_conf.debug) cerr << g_clock.now() << " TIMER "
+#define dout(x)  if (x <= g_conf.debug_timer) cout << g_clock.now() << " TIMER "
+#define derr(x)  if (x <= g_conf.debug_timer) cerr << g_clock.now() << " TIMER "
 
 #define DBL 10
 
index d8a789ea001bcb8393c75077565925f2ae2a3e41..f4cae3d1522b76c421ba02a0318d80e62e023eb2 100644 (file)
@@ -87,6 +87,7 @@ md_config_t g_conf = {
   debug_mds_log: 1,
   debug_mds_migrator: 1,
   debug_buffer: 0,
+  debug_timer: 0,
   debug_filer: 0,
   debug_objecter: 0,
   debug_journaler: 0,
@@ -538,6 +539,11 @@ void parse_config_options(std::vector<char*>& args)
         g_conf.debug_buffer = atoi(args[++i]);
       else 
         g_debug_after_conf.debug_buffer = atoi(args[++i]);
+    else if (strcmp(args[i], "--debug_timer") == 0) 
+      if (!g_conf.debug_after) 
+        g_conf.debug_timer = atoi(args[++i]);
+      else 
+        g_debug_after_conf.debug_timer = atoi(args[++i]);
     else if (strcmp(args[i], "--debug_filer") == 0) 
       if (!g_conf.debug_after) 
         g_conf.debug_filer = atoi(args[++i]);
index 9076092aba7399ebf6c11de3fe4f49c61bc787f6..7babeb3340b8df47ae2574a57bd2a51d5af2f561 100644 (file)
@@ -73,6 +73,7 @@ struct md_config_t {
   int debug_mds_log;
   int debug_mds_migrator;
   int debug_buffer;
+  int debug_timer;
   int debug_filer;
   int debug_objecter;
   int debug_journaler;
diff --git a/branches/sage/mds/messages/MOSDPGActivateSet.h b/branches/sage/mds/messages/MOSDPGActivateSet.h
new file mode 100644 (file)
index 0000000..cdee799
--- /dev/null
@@ -0,0 +1,50 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+
+#ifndef __MOSDPGACTIVATESET_H
+#define __MOSDPGACTIVATESET_H
+
+#include "msg/Message.h"
+
+class MOSDPGActivateSet : public Message {
+  epoch_t epoch;
+
+public:
+  list<PG::Info> pg_info;
+
+  epoch_t get_epoch() { return epoch; }
+
+  MOSDPGActivateSet() {}
+  MOSDPGActivateSet(version_t mv) :
+    Message(MSG_OSD_PG_ACTIVATE_SET),
+    epoch(mv) { }
+
+  char *get_type_name() { return "pg_activate_set"; }
+  void print(ostream& out) {
+    out << "pg_activate_set(" << pg_info.size() << " pgs e" << epoch << ")";
+  }
+
+  void encode_payload() {
+    ::_encode(epoch, payload);
+    ::_encode(pg_info, payload);
+  }
+  void decode_payload() {
+    int off = 0;
+    ::_decode(epoch, payload, off);
+    ::_decode(pg_info, payload, off);
+  }
+};
+
+#endif
index 950c7df7eb1b27fd4e0a158809d36ffa25fede77..653bb9f10570cda2939649792766a73c6deee50f 100644 (file)
@@ -20,7 +20,6 @@
 
 class MOSDPGLog : public Message {
   epoch_t epoch;
-  pg_t    pgid;
 
 public:
   PG::Info info;
@@ -28,23 +27,20 @@ public:
   PG::Missing missing;
 
   epoch_t get_epoch() { return epoch; }
-  pg_t get_pgid() { return pgid; }
+  pg_t get_pgid() { return info.pgid; }
 
   MOSDPGLog() {}
-  MOSDPGLog(version_t mv, pg_t pgid) :
-    Message(MSG_OSD_PG_LOG) {
-    this->epoch = mv;
-    this->pgid = pgid;
-  }
+  MOSDPGLog(version_t mv, PG::Info& i) :
+    Message(MSG_OSD_PG_LOG),
+    epoch(mv), info(i) { }
 
   char *get_type_name() { return "PGlog"; }
   void print(ostream& out) {
-    out << "pg_log(" << pgid << " e" << epoch << ")";
+    out << "pg_log(" << info.pgid << " e" << epoch << ")";
   }
 
   void encode_payload() {
     payload.append((char*)&epoch, sizeof(epoch));
-    payload.append((char*)&pgid, sizeof(pgid));
     payload.append((char*)&info, sizeof(info));
     log._encode(payload);
     missing._encode(payload);
@@ -53,8 +49,6 @@ public:
     int off = 0;
     payload.copy(off, sizeof(epoch), (char*)&epoch);
     off += sizeof(epoch);
-    payload.copy(off, sizeof(pgid), (char*)&pgid);
-    off += sizeof(pgid);
     payload.copy(off, sizeof(info), (char*)&info);
     off += sizeof(info);
     log._decode(payload, off);
index 018cbcadc6bf9e60c2ffccdc540d873120628903..2dc5d27672b114ee200ad8de093c7fd82a1c45d8 100644 (file)
@@ -164,13 +164,16 @@ bool ClientMonitor::prepare_update(Message *m)
        client = mount->get_source().num();
 
       // choose a client id
-      if (client < 0 || 
-         (client_map.client_addr.count(client) && 
-          client_map.client_addr[client] != addr)) {
+      if (client < 0) {
        client = pending_inc.next_client;
        dout(10) << "mount: assigned client" << client << " to " << addr << endl;
       } else {
        dout(10) << "mount: client" << client << " requested by " << addr << endl;
+       if (client_map.client_addr.count(client)) {
+         assert(client_map.client_addr[client] != addr);
+         dout(0) << "mount: WARNING: client" << client << " requested by " << addr
+                 << ", which used to be " << client_map.client_addr[client] << endl;
+       }
       }
       
       pending_inc.add_mount(client, addr);
index 659a704b6aaa7ecb9504f8cec21c118c568ca053..8c642514c9de149a14785ae74706ec6ecc849e7b 100644 (file)
@@ -57,7 +57,8 @@ void OSDMonitor::fake_osd_failure(int osd, bool down)
 {
   if (down) {
     dout(1) << "fake_osd_failure DOWN osd" << osd << endl;
-    pending_inc.new_down[osd] = osdmap.osd_inst[osd];
+    pending_inc.new_down[osd].first = osdmap.osd_inst[osd];
+    pending_inc.new_down[osd].second = false;
   } else {
     dout(1) << "fake_osd_failure OUT osd" << osd << endl;
     pending_inc.new_out.push_back(osd);
@@ -139,7 +140,7 @@ void OSDMonitor::create_initial()
     for (int dom=0; dom<ndom; dom++) {
       for (int j=0; j<nper; j++) {
        newmap.osds.insert(i);
-       newmap.down_osds.insert(i); // initially DOWN
+       newmap.down_osds[i] = true; // initially DOWN
        domain[dom]->add_item(i, 1.0);
        //cerr << "osd" << i << " in domain " << dom << endl;
        i++;
@@ -189,7 +190,7 @@ void OSDMonitor::create_initial()
     int root = newmap.crush.add_bucket(b);
     for (int i=0; i<g_conf.num_osd; i++) {
       newmap.osds.insert(i);
-      newmap.down_osds.insert(i);
+      newmap.down_osds[i] = true;
       b->add_item(i, 1.0);
     }
     
@@ -214,7 +215,7 @@ void OSDMonitor::create_initial()
     // add mds osds, but don't put them in the crush mapping func
     for (int i=0; i<g_conf.num_mds; i++) {
       newmap.osds.insert(i+10000);
-      newmap.down_osds.insert(i+10000);
+      newmap.down_osds[i+10000] = true;
     }
   }
   
@@ -303,12 +304,12 @@ void OSDMonitor::encode_pending(bufferlist &bl)
   pending_inc.mon_epoch = mon->mon_epoch;
   
   // tell me about it
-  for (map<int,entity_inst_t>::iterator i = pending_inc.new_down.begin();
+  for (map<int,pair<entity_inst_t,bool> >::iterator i = pending_inc.new_down.begin();
        i != pending_inc.new_down.end();
        i++) {
-    dout(0) << " osd" << i->first << " DOWN " << i->second << endl;
-    derr(0) << " osd" << i->first << " DOWN " << i->second << endl;
-    mon->messenger->mark_down(i->second.addr);
+    dout(0) << " osd" << i->first << " DOWN " << i->second.first << " clean=" << i->second.second << endl;
+    derr(0) << " osd" << i->first << " DOWN " << i->second.first << " clean=" << i->second.second << endl;
+    mon->messenger->mark_down(i->second.first.addr);
   }
   for (map<int,entity_inst_t>::iterator i = pending_inc.new_up.begin();
        i != pending_inc.new_up.end(); 
@@ -465,7 +466,8 @@ bool OSDMonitor::prepare_failure(MOSDFailure *m)
   assert(osdmap.is_up(badboy));
   assert(osdmap.osd_inst[badboy] == m->get_failed());
   
-  pending_inc.new_down[badboy] = m->get_failed();
+  pending_inc.new_down[badboy].first = m->get_failed();
+  pending_inc.new_down[badboy].second = false;
   
   if (osdmap.is_in(badboy))
     down_pending_out[badboy] = g_clock.now();
@@ -521,7 +523,8 @@ bool OSDMonitor::prepare_boot(MOSDBoot *m)
     assert(osdmap.get_inst(from) != m->inst);  // preproces should have caught it
     
     // mark previous guy down
-    pending_inc.new_down[from] = osdmap.osd_inst[from];
+    pending_inc.new_down[from].first = osdmap.osd_inst[from];
+    pending_inc.new_down[from].second = false;
     
     paxos->wait_for_commit(new C_RetryMessage(this, m));
   } else {
@@ -731,7 +734,8 @@ void OSDMonitor::mark_all_down()
        it != osdmap.get_osds().end();
        it++) {
     if (osdmap.is_down(*it)) continue;
-    pending_inc.new_down[*it] = osdmap.get_inst(*it);
+    pending_inc.new_down[*it].first = osdmap.get_inst(*it);
+    pending_inc.new_down[*it].second = true;   // FIXME: am i sure it's clean? we need a proper osd shutdown sequence!
   }
 
   propose_pending();
index 4d400735713ee865384dff73d89db5797d611e3e..766f90fb41fb8668cc49013c9794f347b554ad76 100644 (file)
@@ -36,10 +36,12 @@ using namespace std;
 #include "messages/MOSDOpReply.h"
 #include "messages/MOSDMap.h"
 #include "messages/MOSDGetMap.h"
+
 #include "messages/MOSDPGNotify.h"
 #include "messages/MOSDPGQuery.h"
 #include "messages/MOSDPGLog.h"
 #include "messages/MOSDPGRemove.h"
+#include "messages/MOSDPGActivateSet.h"
 
 #include "messages/MClientMount.h"
 #include "messages/MClientUnmount.h"
@@ -192,6 +194,9 @@ decode_message(msg_envelope_t& env, bufferlist& payload)
   case MSG_OSD_PG_REMOVE:
     m = new MOSDPGRemove();
     break;
+  case MSG_OSD_PG_ACTIVATE_SET:
+    m = new MOSDPGActivateSet();
+    break;
 
     // clients
   case MSG_CLIENT_MOUNT:
index 693b0ba7f628c69e1c73c23f20d9d1a82972bd9d..b6c2873804b8d55b80c5e4bb56357a78a5e34314 100644 (file)
@@ -63,6 +63,7 @@
 #define MSG_OSD_PG_SUMMARY     52
 #define MSG_OSD_PG_LOG         53
 #define MSG_OSD_PG_REMOVE      54
+#define MSG_OSD_PG_ACTIVATE_SET 55
 
 // -- client --
 // to monitor
index 6285c0b340203d118e303af4eda276ccacb91288..8fe67d180bea1ddfc0c3be5ea1f3db66d8340ea8 100644 (file)
@@ -54,6 +54,7 @@
 #include "messages/MOSDPGQuery.h"
 #include "messages/MOSDPGLog.h"
 #include "messages/MOSDPGRemove.h"
+#include "messages/MOSDPGActivateSet.h"
 
 #include "common/Logger.h"
 #include "common/LogType.h"
@@ -437,7 +438,6 @@ void OSD::_remove_unlock_pg(PG *pg)
 
   // remove from map
   pg_map.erase(pgid);
-  pg->put();   
 
   // unlock, and probably delete
   pg->put_unlock();     // will delete, if last reference
@@ -481,16 +481,14 @@ void OSD::load_pgs()
  * check epochs starting from start to verify the pg acting set hasn't changed
  * up until now
  */
-void OSD::project_pg_history(pg_t pgid, PG::Info::History& h, epoch_t from)
+void OSD::project_pg_history(pg_t pgid, PG::Info::History& h, epoch_t from,
+                            vector<int>& last)
 {
   dout(15) << "project_pg_history " << pgid
            << " from " << from << " to " << osdmap->get_epoch()
            << ", start " << h
            << dendl;
 
-  vector<int> last;
-  osdmap->pg_to_acting_osds(pgid, last);
-
   for (epoch_t e = osdmap->get_epoch()-1;
        e >= from;
        e--) {
@@ -503,7 +501,7 @@ void OSD::project_pg_history(pg_t pgid, PG::Info::History& h, epoch_t from)
 
     // acting set change?
     if (acting != last && 
-        e <= h.same_since) {
+        e > h.same_since) {
       dout(15) << "project_pg_history " << pgid << " changed in " << e+1 
                 << " from " << acting << " -> " << last << dendl;
       h.same_since = e+1;
@@ -511,7 +509,7 @@ void OSD::project_pg_history(pg_t pgid, PG::Info::History& h, epoch_t from)
 
     // primary change?
     if (!(!acting.empty() && !last.empty() && acting[0] == last[0]) &&
-        e <= h.same_primary_since) {
+        e > h.same_primary_since) {
       dout(15) << "project_pg_history " << pgid << " primary changed in " << e+1 << dendl;
       h.same_primary_since = e+1;
     
@@ -522,7 +520,7 @@ void OSD::project_pg_history(pg_t pgid, PG::Info::History& h, epoch_t from)
     // acker change?
     if (g_conf.osd_rep != OSD_REP_PRIMARY) {
       if (!(!acting.empty() && !last.empty() && acting[acting.size()-1] == last[last.size()-1]) &&
-          e <= h.same_acker_since) {
+          e > h.same_acker_since) {
         dout(15) << "project_pg_history " << pgid << " acker changed in " << e+1 << dendl;
         h.same_acker_since = e+1;
       }
@@ -781,6 +779,9 @@ void OSD::dispatch(Message *m)
       case MSG_OSD_PG_REMOVE:
         handle_pg_remove((MOSDPGRemove*)m);
         break;
+      case MSG_OSD_PG_ACTIVATE_SET:
+        handle_pg_activate_set((MOSDPGActivateSet*)m);
+        break;
 
       case MSG_OSD_OP:
         handle_op((MOSDOp*)m);
@@ -1004,12 +1005,12 @@ void OSD::handle_osd_map(MOSDMap *m)
       t.write( get_osdmap_object_name(cur+1), 0, bl.length(), bl);
 
       // notify messenger
-      for (map<int,entity_inst_t>::iterator i = inc.new_down.begin();
+      for (map<int,pair<entity_inst_t,bool> >::iterator i = inc.new_down.begin();
            i != inc.new_down.end();
            i++) {
         int osd = i->first;
         if (osd == whoami) continue;
-        messenger->mark_down(i->second.addr);
+        messenger->mark_down(i->second.first.addr);
         peer_map_epoch.erase(MSG_ADDR_OSD(osd));
       
         // kick any replica ops
@@ -1129,6 +1130,7 @@ void OSD::advance_map(ObjectStore::Transaction& t)
          pg->info.history.same_since = 
          pg->info.history.same_primary_since = 
            pg->info.history.same_acker_since = osdmap->get_epoch();
+       pg->write_log(t);
        pg->activate(t);
 
        dout(7) << "created " << *pg << dendl;
@@ -1150,6 +1152,7 @@ void OSD::advance_map(ObjectStore::Transaction& t)
          pg->info.history.same_primary_since = 
          pg->info.history.same_acker_since = 
          pg->info.history.same_since = osdmap->get_epoch();
+       pg->write_log(t);
        pg->activate(t);
        
        dout(7) << "created " << *pg << dendl;
@@ -1176,6 +1179,7 @@ void OSD::advance_map(ObjectStore::Transaction& t)
          pg->info.history.same_since = 
          pg->info.history.same_primary_since = 
            pg->info.history.same_acker_since = osdmap->get_epoch();
+       pg->write_log(t);
        pg->activate(t);
 
        dout(7) << "created " << *pg << dendl;
@@ -1197,6 +1201,7 @@ void OSD::advance_map(ObjectStore::Transaction& t)
          pg->info.history.same_primary_since = 
          pg->info.history.same_acker_since = 
          pg->info.history.same_since = osdmap->get_epoch();
+       pg->write_log(t);
        pg->activate(t);
        
        dout(7) << "created " << *pg << dendl;
@@ -1290,8 +1295,18 @@ void OSD::advance_map(ObjectStore::Transaction& t)
           pg->state_set(PG::STATE_STRAY);
 
           if (nrep == 0) {
-            pg->state_set(PG::STATE_CRASHED);
-            dout(1) << *pg << " is crashed" << dendl;
+           // did they all shut down cleanly?
+           bool clean = true;
+           vector<int> inset;
+           osdmap->pg_to_osds(pg->info.pgid, inset);
+           for (unsigned i=0; i<inset.size(); i++)
+             if (!osdmap->is_down_clean(inset[i])) clean = false;
+           if (clean) {
+             dout(1) << *pg << " is cleanly inactive" << dendl;
+           } else {
+             pg->state_set(PG::STATE_CRASHED);
+             dout(1) << *pg << " is crashed" << dendl;
+           }
           }
         }
         
@@ -1334,6 +1349,7 @@ void OSD::activate_map(ObjectStore::Transaction& t)
 
   map< int, list<PG::Info> >  notify_list;  // primary -> list
   map< int, map<pg_t,PG::Query> > query_map;    // peer -> PG -> get_summary_since
+  map<int,MOSDPGActivateSet*> activator_map;  // peer -> message
 
   // scan pg's
   for (hash_map<pg_t,PG*>::iterator it = pg_map.begin();
@@ -1349,24 +1365,21 @@ void OSD::activate_map(ObjectStore::Transaction& t)
     else if (pg->get_role() == 0 && !pg->is_active()) {
       // i am (inactive) primary
       pg->build_prior();
-      pg->peer(t, query_map);
+      pg->peer(t, query_map, &activator_map);
     }
     else if (pg->is_stray() &&
              pg->get_primary() >= 0) {
       // i am residual|replica
       notify_list[pg->get_primary()].push_back(pg->info);
     }
-
   }  
 
   if (osdmap->is_mkfs())    // hack: skip the queries/summaries if it's a mkfs
     return;
 
-  // notify? (residual|replica)
-  do_notifies(notify_list);
-  
-  // do queries.
+  do_notifies(notify_list);  // notify? (residual|replica)
   do_queries(query_map);
+  do_activators(activator_map);
 
   logger->set("numpg", pg_map.size());
 }
@@ -1541,6 +1554,17 @@ void OSD::do_queries(map< int, map<pg_t,PG::Query> >& query_map)
 }
 
 
+void OSD::do_activators(map<int,MOSDPGActivateSet*>& activator_map)
+{
+  for (map<int,MOSDPGActivateSet*>::iterator p = activator_map.begin();
+       p != activator_map.end();
+       ++p) 
+    messenger->send_message(p->second, osdmap->get_inst(p->first));
+  activator_map.clear();
+}
+
+
+
 
 
 /** PGNotify
@@ -1559,6 +1583,7 @@ void OSD::handle_pg_notify(MOSDPGNotify *m)
   
   // look for unknown PGs i'm primary for
   map< int, map<pg_t,PG::Query> > query_map;
+  map<int, MOSDPGActivateSet*> activator_map;
 
   for (list<PG::Info>::iterator it = m->get_pg_list().begin();
        it != m->get_pg_list().end();
@@ -1568,25 +1593,29 @@ void OSD::handle_pg_notify(MOSDPGNotify *m)
 
     if (!_have_pg(pgid)) {
       // same primary?
+      vector<int> acting;
+      int nrep = osdmap->pg_to_acting_osds(pgid, acting);
+      int role = osdmap->calc_pg_role(whoami, pg->acting, nrep);
+
       PG::Info::History history = it->history;
-      project_pg_history(pgid, history, m->get_epoch());
+      project_pg_history(pgid, history, m->get_epoch(), acting);
 
       if (m->get_epoch() < history.same_primary_since) {
         dout(10) << "handle_pg_notify pg " << pgid << " dne, and primary changed in "
                  << history.same_primary_since << " (msg from " << m->get_epoch() << ")" << dendl;
         continue;
       }
+
+      assert(role == 0);  // otherwise, probably bug in project_pg_history.
       
       // ok, create PG!
       pg = _create_lock_pg(pgid, t);
-      osdmap->pg_to_acting_osds(pgid, pg->acting);
-      pg->set_role(0);
+      pg->acting.swap(acting);
+      pg->set_role(role);
       pg->info.history = history;
-
       pg->last_epoch_started_any = it->last_epoch_started;
       pg->build_prior();
-
-      t.collection_setattr(pgid, "info", (char*)&pg->info, sizeof(pg->info));
+      pg->write_log(t);
       
       dout(10) << *pg << " is new" << dendl;
     
@@ -1642,7 +1671,7 @@ void OSD::handle_pg_notify(MOSDPGNotify *m)
         pg->adjust_prior();
       
       // peer
-      pg->peer(t, query_map);
+      pg->peer(t, query_map, &activator_map);
     }
 
     pg->unlock();
@@ -1652,6 +1681,7 @@ void OSD::handle_pg_notify(MOSDPGNotify *m)
   assert(tr == 0);
 
   do_queries(query_map);
+  do_activators(activator_map);
   
   delete m;
 }
@@ -1666,35 +1696,30 @@ void OSD::handle_pg_notify(MOSDPGNotify *m)
  * NOTE: called with opqueue active.
  */
 
-void OSD::handle_pg_log(MOSDPGLog *m) 
-{
-  int from = m->get_source().num();
-  const pg_t pgid = m->get_pgid();
 
-  if (!require_same_or_newer_map(m, m->get_epoch())) return;
-  if (pg_map.count(pgid) == 0) {
-    dout(10) << "handle_pg_log don't have pg " << pgid << ", dropping" << dendl;
-    assert(m->get_epoch() < osdmap->get_epoch());
-    delete m;
+void OSD::_process_pg_info(epoch_t epoch, int from,
+                          PG::Info &info, 
+                          PG::Log &log, 
+                          PG::Missing &missing,
+                          map<int, MOSDPGActivateSet*>* activator_map)
+{
+  if (pg_map.count(info.pgid) == 0) {
+    dout(10) << "_process_pg_info " << info << " don't have pg" << dendl;
+    assert(epoch < osdmap->get_epoch());
     return;
   }
 
-  PG *pg = _lookup_lock_pg(pgid);
+  PG *pg = _lookup_lock_pg(info.pgid);
   assert(pg);
 
-  if (m->get_epoch() < pg->info.history.same_since) {
-    dout(10) << "handle_pg_log " << *pg 
-            << " from " << m->get_source() 
-            << " is old, discarding"
-            << dendl;
-    delete m;
+  dout(10) << *pg << " got " << info << " " << log << " " << missing << dendl;
+
+  if (epoch < pg->info.history.same_since) {
+    dout(10) << *pg << " got old info " << info << ", ignoring" << dendl;
+    pg->unlock();
     return;
   }
 
-  dout(7) << "handle_pg_log " << *pg 
-          << " got " << m->log << " " << m->missing
-          << " from " << m->get_source() << dendl;
-
   //m->log.print(cout);
   
   ObjectStore::Transaction t;
@@ -1704,30 +1729,61 @@ void OSD::handle_pg_log(MOSDPGLog *m)
     assert(pg->peer_log_requested.count(from) ||
            pg->peer_summary_requested.count(from));
     
-    pg->proc_replica_log(m->log, m->missing, from);
+    pg->proc_replica_log(log, missing, from);
 
     // peer
     map< int, map<pg_t,PG::Query> > query_map;
-    pg->peer(t, query_map);
+    pg->peer(t, query_map, activator_map);
     do_queries(query_map);
 
   } else {
     // i am REPLICA
-    dout(10) << *pg << " got " << m->log << " " << m->missing << dendl;
-
     // merge log
-    pg->merge_log(m->log, m->missing, from);
-    pg->proc_missing(m->log, m->missing, from);
+    pg->merge_log(log, missing, from);
+    pg->proc_missing(log, missing, from);
     assert(pg->missing.num_lost() == 0);
 
     // ok activate!
-    pg->activate(t);
+    pg->activate(t, activator_map);
   }
 
   unsigned tr = store->apply_transaction(t);
   assert(tr == 0);
 
   pg->unlock();
+}
+
+
+void OSD::handle_pg_log(MOSDPGLog *m) 
+{
+  dout(7) << "handle_pg_log " << *m << " from " << m->get_source() << dendl;
+
+  int from = m->get_source().num();
+  if (!require_same_or_newer_map(m, m->get_epoch())) return;
+
+  _process_pg_info(m->get_epoch(), from, 
+                  m->info, m->log, m->missing, 0);
+
+  delete m;
+}
+
+void OSD::handle_pg_activate_set(MOSDPGActivateSet *m)
+{
+  dout(7) << "handle_pg_activate_set " << *m << " from " << m->get_source() << dendl;
+
+  int from = m->get_source().num();
+  if (!require_same_or_newer_map(m, m->get_epoch())) return;
+
+  PG::Log empty_log;
+  PG::Missing empty_missing;
+  map<int,MOSDPGActivateSet*> activator_map;
+
+  for (list<PG::Info>::iterator p = m->pg_info.begin();
+       p != m->pg_info.end();
+       ++p) 
+    _process_pg_info(m->get_epoch(), from, *p, empty_log, empty_missing, &activator_map);
+
+  do_activators(activator_map);
 
   delete m;
 }
@@ -1753,9 +1809,14 @@ void OSD::handle_pg_query(MOSDPGQuery *m)
     PG *pg = 0;
 
     if (pg_map.count(pgid) == 0) {
+      // get active crush mapping
+      vector<int> acting;
+      int nrep = osdmap->pg_to_acting_osds(pgid, acting);
+      int role = osdmap->calc_pg_role(whoami, acting, nrep);
+
       // same primary?
       PG::Info::History history = it->second.history;
-      project_pg_history(pgid, history, m->get_epoch());
+      project_pg_history(pgid, history, m->get_epoch(), acting);
 
       if (m->get_epoch() < history.same_since) {
         dout(10) << " pg " << pgid << " dne, and pg has changed in "
@@ -1763,11 +1824,6 @@ void OSD::handle_pg_query(MOSDPGQuery *m)
         continue;
       }
 
-      // get active crush mapping
-      vector<int> acting;
-      int nrep = osdmap->pg_to_acting_osds(pgid, acting);
-      int role = osdmap->calc_pg_role(whoami, acting, nrep);
-
       if (role < 0) {
         dout(10) << " pg " << pgid << " dne, and i am not an active replica" << dendl;
         PG::Info empty(pgid);
@@ -1781,8 +1837,7 @@ void OSD::handle_pg_query(MOSDPGQuery *m)
       pg->acting.swap( acting );
       pg->set_role(role);
       pg->info.history = history;
-
-      t.collection_setattr(pgid, "info", (char*)&pg->info, sizeof(pg->info));
+      pg->write_log(t);
       store->apply_transaction(t);
 
       dout(10) << *pg << " dne (before), but i am role " << role << dendl;
@@ -1808,8 +1863,7 @@ void OSD::handle_pg_query(MOSDPGQuery *m)
       dout(10) << *pg << " sending info" << dendl;
       notify_list[from].push_back(pg->info);
     } else {
-      MOSDPGLog *m = new MOSDPGLog(osdmap->get_epoch(), pg->get_pgid());
-      m->info = pg->info;
+      MOSDPGLog *m = new MOSDPGLog(osdmap->get_epoch(), pg->info);
       m->missing = pg->missing;
 
       if (it->second.type == PG::Query::LOG) {
index 86f2950884c12a778ff91b768d8ebd6649656d58..cd3e29c3be8c0c7a9a65f99a38efaa29e0a8468d 100644 (file)
@@ -257,7 +257,8 @@ private:
   void  _remove_unlock_pg(PG *pg);         // remove from store and memory
 
   void load_pgs();
-  void project_pg_history(pg_t pgid, PG::Info::History& h, epoch_t from);
+  void project_pg_history(pg_t pgid, PG::Info::History& h, epoch_t from,
+                         vector<int>& last);
   void activate_pg(pg_t pgid, epoch_t epoch);
 
   class C_Activate : public Context {
@@ -291,6 +292,7 @@ private:
 
   void do_notifies(map< int, list<PG::Info> >& notify_list);
   void do_queries(map< int, map<pg_t,PG::Query> >& query_map);
+  void do_activators(map<int, MOSDPGActivateSet*>& activator_map);
   void repeer(PG *pg, map< int, map<pg_t,PG::Query> >& query_map);
 
   bool require_current_map(Message *m, epoch_t v);
@@ -299,8 +301,16 @@ private:
   void handle_pg_query(class MOSDPGQuery *m);
   void handle_pg_notify(class MOSDPGNotify *m);
   void handle_pg_log(class MOSDPGLog *m);
+  void handle_pg_activate_set(class MOSDPGActivateSet *m);
   void handle_pg_remove(class MOSDPGRemove *m);
 
+  // helper for handle_pg_log and handle_pg_activate_set
+  void _process_pg_info(epoch_t epoch, int from,
+                       PG::Info &info, 
+                       PG::Log &log, 
+                       PG::Missing &missing,
+                       map<int, MOSDPGActivateSet*>* activator_map);
+
 
  public:
   OSD(int id, Messenger *m, MonMap *mm, char *dev = 0);
index 6bf3f10670ed48eaa830ca1413862157430515aa..83795a2be5740936aaeee5ced2913bb0c8d0e064 100644 (file)
@@ -93,7 +93,7 @@ public:
 
     // incremental
     map<int32_t,entity_inst_t> new_up;
-    map<int32_t,entity_inst_t> new_down;
+    map<int32_t,pair<entity_inst_t,bool> > new_down;
     list<int32_t> new_in;
     list<int32_t> new_out;
     map<int32_t,float> new_overload;  // updated overload value
@@ -135,7 +135,7 @@ private:
   int32_t localized_pg_num_mask; // ditto
 
   set<int32_t>  osds;        // all osds
-  set<int32_t>  down_osds;   // list of down disks
+  map<int32_t, bool> down_osds;   // list of down disks, -> clean shutdown (true/false)
   set<int32_t>  out_osds;    // list of unmapped disks
   map<int32_t,float> overload_osds; 
   map<int32_t,entity_inst_t> osd_inst;
@@ -176,12 +176,13 @@ private:
   void get_all_osds(set<int>& ls) { ls = osds; }
 
   const set<int>& get_osds() { return osds; }
-  const set<int>& get_down_osds() { return down_osds; }
+  const map<int,bool>& get_down_osds() { return down_osds; }
   const set<int>& get_out_osds() { return out_osds; }
   const map<int,float>& get_overload_osds() { return overload_osds; }
   
   bool exists(int osd) { return osds.count(osd); }
   bool is_down(int osd) { return down_osds.count(osd); }
+  bool is_down_clean(int osd) { return down_osds.count(osd) && down_osds[osd]; }
   bool is_up(int osd) { return exists(osd) && !is_down(osd); }
   bool is_out(int osd) { return out_osds.count(osd); }
   bool is_in(int osd) { return exists(osd) && !is_out(osd); }
@@ -201,7 +202,7 @@ private:
     return false;
   }
   
-  void mark_down(int o) { down_osds.insert(o); }
+  void mark_down(int o, bool clean) { down_osds[o] = clean; }
   void mark_up(int o) { down_osds.erase(o); }
   void mark_out(int o) { out_osds.insert(o); }
   void mark_in(int o) { out_osds.erase(o); }
@@ -220,13 +221,13 @@ private:
     }
 
     // nope, incremental.
-    for (map<int32_t,entity_inst_t>::iterator i = inc.new_down.begin();
+    for (map<int32_t,pair<entity_inst_t,bool> >::iterator i = inc.new_down.begin();
          i != inc.new_down.end();
          i++) {
       assert(down_osds.count(i->first) == 0);
-      down_osds.insert(i->first);
+      down_osds[i->first] = i->second.second;
       assert(osd_inst.count(i->first) == 0 ||
-             osd_inst[i->first] == i->second);
+             osd_inst[i->first] == i->second.first);
       osd_inst.erase(i->first);
       //cout << "epoch " << epoch << " down osd" << i->first << endl;
     }
index 93cad0d06654c28c1eb227fddb1354977dc2249d..568021d875dbb9b7c0c76264a0fa21806b7f2ece 100644 (file)
@@ -24,6 +24,7 @@
 #include "messages/MOSDPGNotify.h"
 #include "messages/MOSDPGLog.h"
 #include "messages/MOSDPGRemove.h"
+#include "messages/MOSDPGActivateSet.h"
 
 #undef dout
 #define  dout(l)    if (l<=g_conf.debug || l<=g_conf.debug_osd) cout << dbeginl << g_clock.now() << " osd" << osd->whoami << " " << (osd->osdmap ? osd->osdmap->get_epoch():0) << " " << *this << " "
@@ -575,7 +576,8 @@ void PG::clear_primary_state()
 }
 
 void PG::peer(ObjectStore::Transaction& t, 
-              map< int, map<pg_t,Query> >& query_map)
+              map< int, map<pg_t,Query> >& query_map,
+             map<int, MOSDPGActivateSet*> *activator_map)
 {
   dout(10) << "peer.  acting is " << acting 
            << ", prior_set is " << prior_set << dendl;
@@ -619,6 +621,7 @@ void PG::peer(ObjectStore::Transaction& t,
     // start with the last active set of replicas
     set<int> last_started;
     vector<int> acting;
+    bool cleanly_down = true;
     omap.pg_to_acting_osds(get_pgid(), acting);
     for (unsigned i=0; i<acting.size(); i++)
       last_started.insert(acting[i]);
@@ -638,6 +641,8 @@ void PG::peer(ObjectStore::Transaction& t,
         //dout(10) << " down in epoch " << e << " is " << omap.get_down_osds() << dendl;
         if (omap.is_up(*i))
           still_up.insert(*i);
+       else if (!omap.is_down_clean(*i))
+         cleanly_down = false;    
       }
 
       last_started.swap(still_up);
@@ -645,8 +650,12 @@ void PG::peer(ObjectStore::Transaction& t,
     }
     
     if (last_started.empty()) {
-      dout(10) << " crashed since epoch " << last_epoch_started_any << dendl;
-      state_set(STATE_CRASHED);
+      if (cleanly_down) {
+       dout(10) << " cleanly stopped since epoch " << last_epoch_started_any << dendl;
+      } else {
+       dout(10) << " crashed since epoch " << last_epoch_started_any << dendl;
+       state_set(STATE_CRASHED);
+      }
     } else {
       dout(10) << " still active from last started: " << last_started << dendl;
     }
@@ -815,12 +824,13 @@ void PG::peer(ObjectStore::Transaction& t,
   } 
   else if (!is_active()) {
     // -- ok, activate!
-    activate(t);
+    activate(t, activator_map);
   }
 }
 
 
-void PG::activate(ObjectStore::Transaction& t)
+void PG::activate(ObjectStore::Transaction& t,
+                 map<int, MOSDPGActivateSet*> *activator_map)
 {
   assert(!is_active());
 
@@ -873,7 +883,6 @@ void PG::activate(ObjectStore::Transaction& t)
     dout(10) << "activate - not complete, " << missing << dendl;
   }
 
-
   // if primary..
   if (role == 0 &&
       osd->osdmap->post_mkfs()) {
@@ -887,26 +896,35 @@ void PG::activate(ObjectStore::Transaction& t)
       int peer = acting[i];
       assert(peer_info.count(peer));
       
-      MOSDPGLog *m = new MOSDPGLog(osd->osdmap->get_epoch(), 
-                                   info.pgid);
-      m->info = info;
+      MOSDPGLog *m = 0;
       
       if (peer_info[peer].last_update == info.last_update) {
         // empty log
-      } 
-      else if (peer_info[peer].last_update < log.bottom) {
-        // summary/backlog
-        assert(log.backlog);
-        m->log = log;
+       if (activator_map) {
+         dout(10) << "activate - peer osd" << peer << " is up to date, queueing in pending_activators" << dendl;
+         if (activator_map->count(peer) == 0)
+           (*activator_map)[peer] = new MOSDPGActivateSet(osd->osdmap->get_epoch());
+         (*activator_map)[peer]->pg_info.push_back(info);
+       } else {
+         dout(10) << "activate - peer osd" << peer << " is up to date, but sending pg_log anyway" << dendl;
+         m = new MOSDPGLog(osd->osdmap->get_epoch(), info);
+       }
       } 
       else {
-        // incremental log
-        assert(peer_info[peer].last_update < info.last_update);
-        m->log.copy_after(log, peer_info[peer].last_update);
+       m = new MOSDPGLog(osd->osdmap->get_epoch(), info);
+       if (peer_info[peer].last_update < log.bottom) {
+         // summary/backlog
+         assert(log.backlog);
+         m->log = log;
+       } else {
+         // incremental log
+         assert(peer_info[peer].last_update < info.last_update);
+         m->log.copy_after(log, peer_info[peer].last_update);
+       }
       }
 
       // update local version of peer's missing list!
-      {
+      if (m) {
         eversion_t plu = peer_info[peer].last_update;
         Missing& pm = peer_missing[peer];
         for (list<Log::Entry>::iterator p = m->log.log.begin();
@@ -916,10 +934,12 @@ void PG::activate(ObjectStore::Transaction& t)
             pm.add(p->oid, p->version);
       }
       
-      dout(10) << "activate sending " << m->log << " " << m->missing
-               << " to osd" << peer << dendl;
-      //m->log.print(cout);
-      osd->messenger->send_message(m, osd->osdmap->get_inst(peer));
+      if (m) {
+       dout(10) << "activate sending " << m->log << " " << m->missing
+                << " to osd" << peer << dendl;
+       //m->log.print(cout);
+       osd->messenger->send_message(m, osd->osdmap->get_inst(peer));
+      }
 
       // update our missing
       if (peer_missing[peer].num_missing() == 0) {
@@ -977,8 +997,6 @@ void PG::activate(ObjectStore::Transaction& t)
 
 
 
-
-
 void PG::write_log(ObjectStore::Transaction& t)
 {
   dout(10) << "write_log" << dendl;
@@ -1083,9 +1101,9 @@ void PG::read_log(ObjectStore *store)
   // load bounds
   ondisklog.bottom = ondisklog.top = 0;
   r = store->collection_getattr(info.pgid, "ondisklog_bottom", &ondisklog.bottom, sizeof(ondisklog.bottom));
-  //assert(r == sizeof(ondisklog.bottom));
+  assert(r == sizeof(ondisklog.bottom));
   r = store->collection_getattr(info.pgid, "ondisklog_top", &ondisklog.top, sizeof(ondisklog.top));
-  //assert(r == sizeof(ondisklog.top));
+  assert(r == sizeof(ondisklog.top));
 
   dout(10) << "read_log [" << ondisklog.bottom << "," << ondisklog.top << ")" << dendl;
 
index 545c42807c182020c7e9bae0853b02e3d48204e8..d68164cee9dd005abdca2069e75afe8d92918b97 100644 (file)
@@ -34,7 +34,7 @@ using namespace __gnu_cxx;
 class OSD;
 class MOSDOp;
 class MOSDOpReply;
-
+class MOSDPGActivateSet;
 
 /** PG - Replica Placement Group
  *
@@ -515,9 +515,11 @@ public:
   
   void trim_write_ahead();
 
-  void peer(ObjectStore::Transaction& t, map< int, map<pg_t,Query> >& query_map);
-
-  void activate(ObjectStore::Transaction& t);
+  void peer(ObjectStore::Transaction& t, 
+           map< int, map<pg_t,Query> >& query_map,
+           map<int, MOSDPGActivateSet*> *activator_map=0);
+  void activate(ObjectStore::Transaction& t, 
+               map<int, MOSDPGActivateSet*> *activator_map=0);
 
   virtual void clean_up_local(ObjectStore::Transaction& t) = 0;
 
@@ -638,7 +640,7 @@ inline ostream& operator<<(ostream& out, const PG::Info::History& h)
 
 inline ostream& operator<<(ostream& out, const PG::Info& pgi) 
 {
-  out << "pginfo(" << pgi.pgid;
+  out << pgi.pgid << "(";
   if (pgi.is_empty())
     out << " empty";
   else
index 8d1d58929c4ad70eb1bf57aaa7d40adf61c33c19..8eb02e1a10329c7d589de747621c8c5304380618 100644 (file)
@@ -70,10 +70,10 @@ void Objecter::handle_osd_map(MOSDMap *m)
         osdmap->apply_incremental(inc);
     
         // notify messenger
-        for (map<int,entity_inst_t>::iterator i = inc.new_down.begin();
+        for (map<int,pair<entity_inst_t,bool> >::iterator i = inc.new_down.begin();
              i != inc.new_down.end();
              i++) 
-          messenger->mark_down(i->second.addr);
+          messenger->mark_down(i->second.first.addr);
         
       }
       else if (m->maps.count(e)) {