]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
cleanup of osd failure recovery
authorsage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 15 Sep 2006 21:53:31 +0000 (21:53 +0000)
committersage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 15 Sep 2006 21:53:31 +0000 (21:53 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@858 29311d96-e01e-0410-9327-a35deaab8ce9

ceph/TODO
ceph/common/Clock.h
ceph/osd/OSD.cc
ceph/osd/OSD.h
ceph/osd/PG.cc
ceph/osd/PG.h
ceph/osdc/Objecter.cc
ceph/osdc/Objecter.h

index 7d670169710e93f8e3cddc70ea370a6c28f57c1b..66dadb3f38603603ed44c1da4fb4cb4cb511db0d 100644 (file)
--- a/ceph/TODO
+++ b/ceph/TODO
@@ -4,17 +4,13 @@
 - how to get usage feedback to monitor?
 
 
-- fix tare weirdness
-
-
 osd/rados
-- clean up writeahead logs
+- consider implications of nvram writeahead logs
+- deal with divergent replicas that recover
 - fix heartbeat wrt new replication
-- figure out new rep failure cases
-- same_primary_since -> same_tail_since
 - mark residual pgs obsolete  ???
-- deal with divergent disconnected primaries
 - rdlocks
+- optimize remove wrt recovery pushes
 - pg_bit changes
 - use pg->info.same_role_since wrt replication ops.
 - report crashed pgs?
@@ -39,7 +35,6 @@ monitor
 
 
 objecter
-- handle new rep mode failure modes...  head, tail, middle
 
 objectcacher
 - ocacher flushing
index 10543a41cee760276bd701c7697f23d7eb02074a..1fed020eddfa434eb5d445ed99ca05758d9469fa 100644 (file)
@@ -127,6 +127,7 @@ class Clock {
  public:
   Clock() {
        // set offset
+       tare();
   }
 
   // real time.
index 2f2533e507e1d6009cf86ee224b9311cf7af26ba..802ac1eb2c4127a5553d7ea356b4a7fad79215b3 100644 (file)
@@ -64,6 +64,7 @@
 #include "config.h"
 #undef dout
 #define  dout(l)    if (l<=g_conf.debug || l<=g_conf.debug_osd) cout << g_clock.now() << " osd" << whoami << " " << (osdmap ? osdmap->get_epoch():0) << " "
+#define  derr(l)    if (l<=g_conf.debug || l<=g_conf.debug_osd) cerr << g_clock.now() << " osd" << whoami << " " << (osdmap ? osdmap->get_epoch():0) << " "
 
 char *osd_base_path = "./osddata";
 char *ebofs_base_path = "./ebofsdev";
@@ -433,7 +434,7 @@ void OSD::activate_pg(pg_t pgid, epoch_t epoch)
          if (pg->is_crashed() &&
                  pg->is_replay() &&
                  pg->get_role() == 0 &&
-                 pg->info.same_primary_since <= epoch) {
+                 pg->info.history.same_primary_since <= epoch) {
                ObjectStore::Transaction t;
                pg->activate(t);
                store->apply_transaction(t);
@@ -980,9 +981,9 @@ void OSD::advance_map(ObjectStore::Transaction& t)
                pg->acting.swap(acting);
                pg->last_epoch_started_any = 
                  pg->info.last_epoch_started = 
-                 pg->info.same_primary_since = 
-                 pg->info.same_acker_since = 
-                 pg->info.same_role_since = osdmap->get_epoch();
+                 pg->info.history.same_since = 
+                 pg->info.history.same_primary_since = 
+                 pg->info.history.same_acker_since = osdmap->get_epoch();
                pg->activate(t);
                
                dout(7) << "created " << *pg << endl;
@@ -999,9 +1000,9 @@ void OSD::advance_map(ObjectStore::Transaction& t)
          pg->set_role(role);
          pg->last_epoch_started_any = 
                pg->info.last_epoch_started = 
-               pg->info.same_primary_since = 
-               pg->info.same_acker_since = 
-               pg->info.same_role_since = osdmap->get_epoch();
+               pg->info.history.same_primary_since = 
+               pg->info.history.same_acker_since = 
+               pg->info.history.same_since = osdmap->get_epoch();
          pg->activate(t);
                
          dout(7) << "created " << *pg << endl;
@@ -1023,16 +1024,16 @@ void OSD::advance_map(ObjectStore::Transaction& t)
          }       
 
          // get new acting set
-         vector<int> acting;
-         int nrep = osdmap->pg_to_acting_osds(pgid, acting);
-         int role = osdmap->calc_pg_role(whoami, acting, nrep);
+         vector<int> tacting;
+         int nrep = osdmap->pg_to_acting_osds(pgid, tacting);
+         int role = osdmap->calc_pg_role(whoami, tacting, nrep);
 
          // no change?
-         if (acting == pg->acting) 
+         if (tacting == pg->acting) 
                continue;
 
-         int primary = -1;
-         if (nrep > 0) primary = acting[0];
+         // -- there was a change! --
+         _lock_pg(pgid);
          
          int oldrole = pg->get_role();
          int oldprimary = pg->get_primary();
@@ -1040,25 +1041,49 @@ void OSD::advance_map(ObjectStore::Transaction& t)
          vector<int> oldacting = pg->acting;
          
          // update PG
-         pg->acting.swap(acting);
+         pg->acting.swap(tacting);
          pg->set_role(role);
          
          // did primary|acker change?
-         if (oldprimary != primary) {
-               pg->info.same_primary_since = osdmap->get_epoch();
+         pg->info.history.same_since = osdmap->get_epoch();
+         if (oldprimary != pg->get_primary()) {
+               pg->info.history.same_primary_since = osdmap->get_epoch();
                pg->cancel_recovery();
          }
          if (oldacker != pg->get_acker()) {
-               pg->info.same_acker_since = osdmap->get_epoch();
+               pg->info.history.same_acker_since = osdmap->get_epoch();
          }
-         if (oldprimary != primary || oldacker != pg->get_acker()) {
+
+         // deactivate.
+         pg->state_clear(PG::STATE_ACTIVE);
+         
+         // discard any repops in progress.
+         if (oldacker == whoami) {
                // drop our write-ahead log.  (we'll only have on if we were just the acker)
                pg->trim_write_ahead();
+
+               // drop repops
+               for (map<tid_t,PG::RepOpGather*>::iterator p = pg->repop_gather.begin();
+                        p != pg->repop_gather.end();
+                        p++) {
+                 dout(-1) << *pg << " discarding repop " << p->second << endl;
+                 delete p->second->op;
+                 delete p->second;
+               }
+               pg->repop_gather.clear();
+               
+               // and repop waiters
+               for (map<tid_t, list<Message*> >::iterator p = pg->waiting_for_repop.begin();
+                        p != pg->waiting_for_repop.end();
+                        p++)
+                 for (list<Message*>::iterator pm = p->second.begin();
+                          pm != p->second.end();
+                          pm++)
+                       delete *pm;
+               pg->waiting_for_repop.clear();
          }
 
          if (role != oldrole) {
-               pg->info.same_role_since = osdmap->get_epoch();
-
                // old primary?
                if (oldrole == 0) {
                  pg->state_clear(PG::STATE_CLEAN);
@@ -1089,12 +1114,10 @@ void OSD::advance_map(ObjectStore::Transaction& t)
                // new primary?
                if (role == 0) {
                  // i am new primary
-                 pg->state_clear(PG::STATE_ACTIVE);
                  pg->state_clear(PG::STATE_STRAY);
                  pg->last_epoch_started_any = pg->info.last_epoch_started;
                } else {
                  // i am now replica|stray.  we need to send a notify.
-                 pg->state_clear(PG::STATE_ACTIVE);
                  pg->state_set(PG::STATE_STRAY);
 
                  if (nrep == 0) {
@@ -1104,30 +1127,28 @@ void OSD::advance_map(ObjectStore::Transaction& t)
                }
                
                // my role changed.
-               dout(10) << *pg << " " << oldacting << " -> " << acting 
+               dout(10) << *pg << " " << oldacting << " -> " << pg->acting 
                                 << ", role " << oldrole << " -> " << role << endl; 
                
          } else {
                // no role change.
                // did primary change?
-               if (primary != oldprimary) {    
+               if (pg->get_primary() != oldprimary) {  
                  // we need to announce
                  pg->state_set(PG::STATE_STRAY);
-                 pg->state_clear(PG::STATE_ACTIVE);
                  
-                 dout(10) << *pg << " " << oldacting << " -> " << acting 
+                 dout(10) << *pg << " " << oldacting << " -> " << pg->acting 
                                   << ", acting primary " 
-                                  << oldprimary << " -> " << primary 
+                                  << oldprimary << " -> " << pg->get_primary() 
                                   << endl;
                } else {
                  // primary is the same.
                  if (role == 0) {
                        // i am (still) primary. but my replica set changed.
-                       pg->state_clear(PG::STATE_ACTIVE);
                        pg->state_clear(PG::STATE_CLEAN);
                        pg->state_clear(PG::STATE_REPLAY);
 
-                       dout(10) << *pg << " " << oldacting << " -> " << acting
+                       dout(10) << *pg << " " << oldacting << " -> " << pg->acting
                                         << ", replicas changed" << endl;
 
                        // completely restart peering process.
@@ -1156,6 +1177,8 @@ void OSD::advance_map(ObjectStore::Transaction& t)
                }
          }
          
+
+         _unlock_pg(pgid);
        }
   }
 }
@@ -1183,7 +1206,8 @@ void OSD::activate_map(ObjectStore::Transaction& t)
          pg->build_prior();
          pg->peer(t, query_map);
        }
-       else if (pg->is_stray()) {
+       else if (pg->is_stray() &&
+                        pg->get_primary() >= 0) {
          // i am residual|replica
          notify_list[pg->get_primary()].push_back(pg->info);
        }
@@ -1394,30 +1418,63 @@ void OSD::load_pgs()
 }
  
 /**
- * check epochs starting from start to verify the primary hasn't changed
+ * check epochs starting from start to verify the pg acting set hasn't changed
  * up until now
  */
-epoch_t OSD::calc_pg_primary_since(int primary, pg_t pgid, epoch_t start)
+void OSD::project_pg_history(pg_t pgid, PG::Info::History& h, epoch_t from)
 {
+  dout(-15) << "project_pg_history " << hex << pgid << dec
+                  << " from " << from << " to " << osdmap->get_epoch()
+                  << ", start " << h
+                  << endl;
+
+  vector<int> last;
+  osdmap->pg_to_acting_osds(pgid, last);
+
   for (epoch_t e = osdmap->get_epoch()-1;
-          e >= start;
+          e >= from;
           e--) {
        // verify during intermediate epoch
-       vector<int> acting;
-       
        OSDMap oldmap;
        get_map(e, oldmap);
+
+       vector<int> acting;
        oldmap.pg_to_acting_osds(pgid, acting);
 
-       if (acting[0] != primary) 
-         return e+1;  // nope, primary only goes back through e!
-  }
+       // acting set change?
+       if (acting != last && 
+               e <= h.same_since) {
+         dout(-15) << "project_pg_history " << hex << pgid << dec << " changed in " << e+1 
+                               << " from " << acting << " -> " << last << endl;
+         h.same_since = e+1;
+       }
 
-  return start;  // same all the way back thru start!
-}
+       // primary change?
+       if (!(!acting.empty() && !last.empty() && acting[0] == last[0]) &&
+               e <= h.same_primary_since) {
+         dout(-15) << "project_pg_history " << hex << pgid << dec << " primary changed in " << e+1 << endl;
+         h.same_primary_since = e+1;
+       
+         if (g_conf.osd_rep == OSD_REP_PRIMARY)
+               h.same_acker_since = h.same_primary_since;
+       }
 
+       // acker change?
+       if (g_conf.osd_rep != OSD_REP_PRIMARY) {
+         if (!(!acting.empty() && !last.empty() && acting[acting.size()-1] == last[last.size()-1]) &&
+                 e <= h.same_acker_since) {
+               dout(-15) << "project_pg_history " << hex << pgid << dec << " acker changed in " << e+1 << endl;
+               h.same_acker_since = e+1;
+         }
+       }
 
+       if (h.same_since > e &&
+               h.same_primary_since > e &&
+               h.same_acker_since > e) break;
+  }
 
+  dout(-15) << "project_pg_history end " << h << endl;
+}
 
 
 /** do_notifies
@@ -1488,37 +1545,21 @@ void OSD::handle_pg_notify(MOSDPGNotify *m)
        PG *pg;
 
        if (pg_map.count(pgid) == 0) {
-         // check mapping.
-         vector<int> acting;
-         int nrep = osdmap->pg_to_acting_osds(pgid, acting);
-         if (!nrep) {
-               dout(10) << "handle_pg_notify pg " << hex << pgid << dec << " has null mapping" << endl;
-               continue;
-         }
-         
-         // am i still the primary?
-         assert(it->same_primary_since <= osdmap->get_epoch());
-         if (acting.empty() || acting[0] != whoami) {
-               // not primary now, so who cares!
-               dout(10) << "handle_pg_notify pg " << hex << pgid << dec << " dne, and i'm not the primary" << endl;
+         // same primary?
+         PG::Info::History history = it->history;
+         project_pg_history(pgid, history, m->get_epoch());
+
+         if (m->get_epoch() < history.same_primary_since) {
+               dout(10) << "handle_pg_notify pg " << hex << pgid << dec << " dne, and primary changed in "
+                                << history.same_primary_since << " (msg from " << m->get_epoch() << ")" << endl;
                continue;
-         } else {
-               // ok, well, i'm primary now... was it continuous since caller's epoch?
-               epoch_t since = calc_pg_primary_since(whoami, pgid, m->get_epoch());
-               if (since > m->get_epoch()) {
-                 dout(10) << "handle_pg_notify pg " << hex << pgid << dec << " dne, and i wasn't primary during intermediate epoch " << since
-                                  << " (caller " << m->get_epoch() << " < " << since << " < now " << osdmap->get_epoch() << ")" << endl;
-                 continue;
-               }
          }
          
          // ok, create PG!
          pg = create_pg(pgid, t);
-         pg->acting.swap( acting );
+         osdmap->pg_to_acting_osds(pgid, pg->acting);
          pg->set_role(0);
-         pg->info.same_primary_since = it->same_primary_since;
-         pg->info.same_acker_since = it->same_acker_since;
-         pg->info.same_role_since = osdmap->get_epoch();
+         pg->info.history = history;
 
          pg->last_epoch_started_any = it->last_epoch_started;
          pg->build_prior();
@@ -1533,21 +1574,14 @@ void OSD::handle_pg_notify(MOSDPGNotify *m)
                waiting_for_pg.erase(pgid);
          }
 
-         pg = _lock_pg(pgid);
+         _lock_pg(pgid);
        } else {
          // already had it.  am i (still) the primary?
          pg = _lock_pg(pgid);
-         if (pg->is_primary()) {
-               if (pg->info.same_primary_since > m->get_epoch()) {
-                 dout(10) << *pg << " requestor epoch " << m->get_epoch() 
-                                  << " < my primary start epoch " << pg->info.same_primary_since 
-                                  << endl;
-                 _unlock_pg(pgid);
-                 continue;
-               }
-         } else {
-               dout(10) << *pg << " not primary" << endl;
-               assert(m->get_epoch() < osdmap->get_epoch());
+         if (m->get_epoch() < pg->info.history.same_primary_since) {
+               dout(10) << *pg << " handle_pg_notify primary changed in "
+                                << pg->info.history.same_primary_since
+                                << " (msg from " << m->get_epoch() << ")" << endl;
                _unlock_pg(pgid);
                continue;
          }
@@ -1628,12 +1662,12 @@ void OSD::handle_pg_log(MOSDPGLog *m)
   assert(pg);
 
   dout(7) << "handle_pg_log " << *pg 
-                 << " got " << m->log
+                 << " got " << m->log << " " << m->missing
                  << " from " << m->get_source() << endl;
 
   ObjectStore::Transaction t;
 
-  if (pg->acting[0] == whoami) {
+  if (pg->is_primary()) {
        // i am PRIMARY
        assert(pg->peer_log_requested.count(from) ||
                   pg->peer_summary_requested.count(from));
@@ -1657,9 +1691,7 @@ void OSD::handle_pg_log(MOSDPGLog *m)
        pg->merge_log(m->log, m->missing, from);
        assert(pg->missing.num_lost() == 0);
 
-       // ok active!
-       pg->info.same_primary_since = m->info.same_primary_since;
-       pg->info.same_acker_since = m->info.same_acker_since;
+       // ok activate!
        pg->activate(t);
   }
 
@@ -1689,61 +1721,60 @@ void OSD::handle_pg_query(MOSDPGQuery *m)
           it != m->pg_list.end();
           it++) {
        pg_t pgid = it->first;
-       PG *pg;
+       PG *pg = 0;
 
        if (pg_map.count(pgid) == 0) {
+         // same primary?
+         PG::Info::History history = it->second.history;
+         project_pg_history(pgid, history, m->get_epoch());
+
+         if (m->get_epoch() < history.same_primary_since) {
+               dout(10) << " pg " << hex << pgid << dec << " dne, and primary has changed in "
+                                << history.same_primary_since << " (msg from " << m->get_epoch() << ")" << endl;
+               continue;
+         }
+
          // get active rush mapping
          vector<int> acting;
          int nrep = osdmap->pg_to_acting_osds(pgid, acting);
          int role = osdmap->calc_pg_role(whoami, acting, nrep);
 
-         if (role == 0) {
-               dout(10) << " pg " << hex << pgid << dec << " dne, and i am primary.  just waiting for notify." << endl;
-               continue;
-         }
          if (role < 0) {
                dout(10) << " pg " << hex << pgid << dec << " dne, and i am not an active replica" << endl;
                PG::Info empty(pgid);
                notify_list[from].push_back(empty);
                continue;
          }
-         
+         assert(role > 0);
+
          ObjectStore::Transaction t;
-         PG *pg = create_pg(pgid, t);
+         pg = create_pg(pgid, t);
          pg->acting.swap( acting );
          pg->set_role(role);
-         
-         pg->info.same_primary_since = it->second.same_primary_since;
-         pg->info.same_acker_since = it->second.same_acker_since;
-         pg->info.same_role_since = osdmap->get_epoch();
+         pg->info.history = history;
 
          t.collection_setattr(pgid, "info", (char*)&pg->info, sizeof(pg->info));
          store->apply_transaction(t);
 
          dout(10) << *pg << " dne (before), but i am role " << role << endl;
-       }
-       pg = _lock_pg(pgid);
-       
-       // verify this is from same primary
-       if (pg->is_primary()) { 
-         dout(10) << *pg << " i am primary, skipping" << endl;
-         _unlock_pg(pgid);
-         continue;
-       } else {
-         if (from == pg->acting[0]) {
-               if (m->get_epoch() < pg->info.same_primary_since) {
-                 dout(10) << *pg << " not same primary since " << m->get_epoch() << ", skipping" << endl;
-                 _unlock_pg(pgid);
-                 continue;
-               }
-         } else {
-               dout(10) << *pg << " query not from primary, skipping" << endl;
-               assert(m->get_epoch() < osdmap->get_epoch());
+         _lock_pg(pgid);
+       } else {
+         pg = _lock_pg(pgid);
+         
+         // same primary?
+         if (m->get_epoch() < pg->info.history.same_primary_since) {
+               dout(10) << *pg << " handle_pg_query primary changed in "
+                                << pg->info.history.same_primary_since
+                                << " (msg from " << m->get_epoch() << ")" << endl;
                _unlock_pg(pgid);
                continue;
          }
        }
 
+       // ok, process query!
+       assert(!pg->acting.empty());
+       assert(from == pg->acting[0]);
+
        if (it->second.type == PG::Query::INFO) {
          // info
          dout(10) << *pg << " sending info" << endl;
@@ -1828,9 +1859,10 @@ void OSD::handle_pg_remove(MOSDPGRemove *m)
 
 /** pull - request object from a peer
  */
-void OSD::pull(PG *pg, object_t oid, eversion_t v)
+void OSD::pull(PG *pg, object_t oid)
 {
   assert(pg->missing.loc.count(oid));
+  eversion_t v = pg->missing.missing[oid];
   int osd = pg->missing.loc[oid];
   
   dout(7) << *pg << " pull " << hex << oid << dec 
@@ -1905,19 +1937,33 @@ void OSD::op_pull(MOSDOp *op, PG *pg)
   const eversion_t v = op->get_version();
   int from = op->get_source().num();
 
-  dout(7) << "op_pull " << hex << oid << dec << " v " << op->get_version()
+  dout(7) << *pg << " op_pull " << hex << oid << dec << " v " << op->get_version()
                  << " from " << op->get_source()
                  << endl;
 
   // is a replica asking?  are they missing it?
-  if (pg->is_primary() &&
-         (pg->peer_missing.count(from) == 0 ||
-          !pg->peer_missing[from].is_missing(oid))) {
-       dout(7) << "op_pull replica isn't actually missing it, we must have already pushed to them" << endl;
-       delete op;
-       return;
+  if (pg->is_primary()) {
+       // primary
+       assert(pg->peer_missing.count(from));  // we had better know this, from the peering process.
+
+       if (!pg->peer_missing[from].is_missing(oid)) {
+         dout(7) << *pg << " op_pull replica isn't actually missing it, we must have already pushed to them" << endl;
+         delete op;
+         return;
+       }
+
+       // do we have it yet?
+       if (waitfor_missing_object(op, pg))
+         return;
+  } else {
+       // non-primary
+       if (pg->missing.is_missing(oid)) {
+         dout(7) << *pg << " op_pull not primary, and missing " << hex << oid << dec << ", ignoring" << endl;
+         delete op;
+         return;
+       }
   }
-  
+    
   // push it back!
   push(pg, oid, op->get_source().num());
 }
@@ -1932,15 +1978,14 @@ void OSD::op_push(MOSDOp *op, PG *pg)
   eversion_t v = op->get_version();
 
   if (!pg->missing.is_missing(oid)) {
-       dout(7) << "op_push not missing object " << hex << oid << dec << endl;
+       dout(7) << *pg << " op_push not missing " << hex << oid << dec << endl;
        return;
   }
   
-  dout(7) << "op_push " 
+  dout(7) << *pg << " op_push " 
                  << hex << oid << dec 
                  << " v " << v 
                  << " size " << op->get_length() << " " << op->get_data().length()
-                 << " in " << *pg
                  << endl;
 
   assert(op->get_data().length() == op->get_length());
@@ -1952,6 +1997,13 @@ void OSD::op_push(MOSDOp *op, PG *pg)
   t.setattrs(oid, op->get_attrset());
   t.collection_add(pg->info.pgid, oid);
 
+  // close out pull op?
+  num_pulling--;
+  if (pg->objects_pulling.count(oid))
+       pg->objects_pulling.erase(oid);
+  pg->missing.got(oid, v);
+
+
   // raise last_complete?
   assert(pg->log.complete_to != pg->log.log.end());
   while (pg->log.complete_to != pg->log.log.end()) {
@@ -1962,34 +2014,30 @@ void OSD::op_push(MOSDOp *op, PG *pg)
   }
   dout(10) << *pg << " last_complete now " << pg->info.last_complete << endl;
   
+  
   // apply to disk!
   t.collection_setattr(pg->info.pgid, "info", &pg->info, sizeof(pg->info));
   unsigned r = store->apply_transaction(t);
   assert(r == 0);
 
 
-  // close out pull op?
-  num_pulling--;
-  if (pg->objects_pulling.count(oid))
-       pg->objects_pulling.erase(oid);
-  pg->missing.got(oid, v);
 
   // am i primary?  are others missing this too?
   if (pg->is_primary()) {
        for (unsigned i=1; i<pg->acting.size(); i++) {
          int peer = pg->acting[i];
-         if (pg->peer_missing.count(peer) &&
-                 pg->peer_missing[peer].is_missing(oid)) {
+         assert(pg->peer_missing.count(peer));
+         if (pg->peer_missing[peer].is_missing(oid)) {
                // ok, push it, and they (will) have it now.
                pg->peer_missing[peer].got(oid, v);
                push(pg, oid, peer);
          }
        }
-
-       // continue recovery
-       pg->do_recovery();
   }
 
+  // continue recovery
+  pg->do_recovery();
+  
   // kick waiters
   if (pg->waiting_for_missing_object.count(oid)) 
        take_waiters(pg->waiting_for_missing_object[oid]);
@@ -2206,21 +2254,23 @@ void OSD::handle_op(MOSDOp *op)
        if (read) {
          // read. am i the (same) acker?
          if (pg->get_acker() != whoami ||
-                 op->get_map_epoch() < pg->info.same_acker_since) {
+                 op->get_map_epoch() < pg->info.history.same_acker_since) {
                dout(7) << "acting acker is osd" << pg->get_acker()
-                               << " since " << pg->info.same_acker_since 
+                               << " since " << pg->info.history.same_acker_since 
                                << ", dropping" << endl;
                assert(op->get_map_epoch() < osdmap->get_epoch());
+               delete op;
                return;
          }
        } else {
          // write. am i the (same) primary?
          if (pg->get_primary() != whoami ||
-                 op->get_map_epoch() < pg->info.same_primary_since) {
+                 op->get_map_epoch() < pg->info.history.same_primary_since) {
                dout(7) << "acting primary is osd" << pg->get_primary()
-                               << " since " << pg->info.same_primary_since 
+                               << " since " << pg->info.history.same_primary_since 
                                << ", dropping" << endl;
                assert(op->get_map_epoch() < osdmap->get_epoch());
+               delete op;
                return;
          }
        }
@@ -2255,27 +2305,33 @@ void OSD::handle_op(MOSDOp *op)
 
        // have pg?
        if (!pg) {
-         dout(7) << "handle_rep_op " << op 
-                         << " pgid " << hex << pgid << dec << " dne" << endl;
+         derr(-7) << "handle_rep_op " << op 
+                          << " pgid " << hex << pgid << dec << " dne" << endl;
          delete op;
+         //assert(0); // wtf, shouldn't happen.
          return;
        }
        
-    // check osd map: same primary+acker?
-       if (op->get_map_epoch() != osdmap->get_epoch()) {
-         if (op->get_map_epoch() < pg->info.same_primary_since ||
-                 op->get_map_epoch() < pg->info.same_acker_since) {
-               // drop message.
-               delete op;
-               return;
-         }
-         assert(pg->get_role() >= 0);
-
-         dout(5) << "handle_rep_op map " << op->get_map_epoch() << " != " << osdmap->get_epoch()
-                         << ", but primary+acker same in " << *pg
-                         << endl;
+    // check osd map: same set, or primary+acker?
+       if (g_conf.osd_rep == OSD_REP_CHAIN &&
+               op->get_map_epoch() < pg->info.history.same_since) {
+         dout(10) << "handle_rep_op pg changed " << pg->info.history
+                          << " after " << op->get_map_epoch() 
+                          << ", dropping" << endl;
+         delete op;
+         return;
        }
-       
+       if (g_conf.osd_rep != OSD_REP_CHAIN &&
+               (op->get_map_epoch() < pg->info.history.same_primary_since ||
+                op->get_map_epoch() < pg->info.history.same_acker_since)) {
+         dout(10) << "handle_rep_op pg primary|acker changed " << pg->info.history
+                          << " after " << op->get_map_epoch() 
+                          << ", dropping" << endl;
+         delete op;
+         return;
+       }
+
+       assert(pg->get_role() >= 0);
        dout(7) << "handle_rep_op " << op << " in " << *pg << endl;
   }
   
@@ -2367,10 +2423,10 @@ void OSD::dequeue_op(pg_t pgid)
        ls.pop_front();
        
        if (pgid) {
-         dout(10) << "dequeue_op write pg " << hex << pgid << dec << " op " << op << ", " 
+         dout(10) << "dequeue_op " << op << " write pg " << hex << pgid << dec 
                           << ls.size() << " / " << (pending_ops-1) << " more pending" << endl;
        } else {
-         dout(10) << "dequeue_op read op " << op << ", " 
+         dout(10) << "dequeue_op " << op << " read "
                           << ls.size() << " / " << (pending_ops-1) << " more pending" << endl;
        }
        
@@ -2390,7 +2446,7 @@ void OSD::dequeue_op(pg_t pgid)
          _unlock_pg(pgid);
        }
        
-       dout(10) << "dequeue_op finish op " << op << endl;
+       dout(10) << "dequeue_op " << op << " finish" << endl;
        assert(pending_ops > 0);
        
        if (pending_ops > g_conf.osd_max_opq) 
@@ -2412,7 +2468,7 @@ void OSD::dequeue_op(pg_t pgid)
  */
 void OSD::do_op(Message *m, PG *pg) 
 {
-  //dout(15) << "do_op " << *op << " in " << *pg << endl;
+  //dout(15) << "do_op " << *m << endl;
 
   if (m->get_type() == MSG_OSD_OP) {
        MOSDOp *op = (MOSDOp*)m;
@@ -2549,7 +2605,7 @@ bool OSD::waitfor_missing_object(MOSDOp *op, PG *pg)
                          << " in " << *pg
                          << ", pulling"
                          << endl;
-         pull(pg, oid, v);
+         pull(pg, oid);
        }
        pg->waiting_for_missing_object[oid].push_back(op);
        return true;
@@ -2695,13 +2751,13 @@ void OSD::put_repop_gather(PG *pg, PG::RepOpGather *repop)
        // adjust peers_complete_thru
        if (!repop->pg_complete_thru.empty()) {
          eversion_t min = pg->info.last_complete;  // hrm....
-         for (unsigned i=1; i<pg->acting.size(); i++) {
-               if (repop->pg_complete_thru[i] < min)      // note: if we haven't heard, it'll be zero, which is what we want.
-                 min = repop->pg_complete_thru[i];
+         for (unsigned i=0; i<pg->acting.size(); i++) {
+               if (repop->pg_complete_thru[pg->acting[i]] < min)      // note: if we haven't heard, it'll be zero, which is what we want.
+                 min = repop->pg_complete_thru[pg->acting[i]];
          }
          
          if (min > pg->peers_complete_thru) {
-               dout(10) << *pg << "put_repop  peers_complete_thru " << pg->peers_complete_thru << " -> " << min << endl;
+               dout(10) << "put_repop  peers_complete_thru " << pg->peers_complete_thru << " -> " << min << " in " << *pg << endl;
                pg->peers_complete_thru = min;
          }
        }
@@ -2877,7 +2933,7 @@ void OSD::op_modify(MOSDOp *op, PG *pg)
   // are any peers missing this?
   for (unsigned i=1; i<pg->acting.size(); i++) {
        int peer = pg->acting[i];
-       if (pg->peer_missing.count(i) &&
+       if (pg->peer_missing.count(peer) &&
                pg->peer_missing[peer].is_missing(oid)) {
          // push it before this update.  FIXME, this is probably extra much work (eg if we're about to overwrite)
          pg->peer_missing[peer].got(oid);
index ef5fa1a4cf826f9ff8ff288921a9f217454c3a2a..05ba3e03d430c4f7e4c19f038a0df21713723cb5 100644 (file)
@@ -192,15 +192,14 @@ public:
 
   // PG
   hash_map<pg_t, PG*>      pg_map;
-  //void  get_pg_list(list<pg_t>& ls);
   void  load_pgs();
   bool  pg_exists(pg_t pg);
   PG   *create_pg(pg_t pg, ObjectStore::Transaction& t);          // create new PG
   PG   *get_pg(pg_t pg);             // return existing PG, or null
-  //void  close_pg(pg_t pg);           // close in-memory state
   void  _remove_pg(pg_t pg);         // remove from store and memory
 
-  epoch_t calc_pg_primary_since(int primary, pg_t pgid, epoch_t start);
+  void project_pg_history(pg_t pgid, PG::Info::History& h, epoch_t from);
+
   void activate_pg(pg_t pgid, epoch_t epoch);
 
   class C_Activate : public Context {
@@ -236,7 +235,7 @@ public:
   void do_queries(map< int, map<pg_t,PG::Query> >& query_map);
   void repeer(PG *pg, map< int, map<pg_t,PG::Query> >& query_map);
 
-  void pull(PG *pg, object_t, eversion_t);
+  void pull(PG *pg, object_t oid);
   void push(PG *pg, object_t oid, int dest);
 
   bool require_current_map(Message *m, epoch_t v);
index d4cbeedcfcb7ea644f6b40a1f0ce3b0de45be3d8..542624d1f68a93a91745eccbd658d74cadb853f4 100644 (file)
@@ -224,11 +224,12 @@ void PG::merge_log(Log &olog, Missing &omissing, int fromosd)
                dout(10) << "merge_log missing " << hex << p->first << dec << " " << p->second
                                 << " also LOST on source, osd" << fromosd << endl;
          }
-       } else {
+       } 
+       else if (p->second <= olog.top) {
          dout(10) << "merge_log missing " << hex << p->first << dec << " " << p->second
                           << " on source, osd" << fromosd << endl;
          missing.loc[p->first] = fromosd;
-       }
+       } 
   }
 
   dout(10) << "merge_log missing " << hex << missing.missing << dec << endl;
@@ -391,7 +392,7 @@ void PG::peer(ObjectStore::Transaction& t,
        }
        
        dout(10) << " querying info from osd" << *it << endl;
-       query_map[*it][info.pgid] = Query(Query::INFO, info.same_primary_since, info.same_acker_since);
+       query_map[*it][info.pgid] = Query(Query::INFO, info.history);
        peer_info_requested.insert(*it);
   }
   if (missing_info) return;
@@ -465,7 +466,20 @@ void PG::peer(ObjectStore::Transaction& t,
        }       
   }
 
-  // gather log?
+  // gather log+missing?
+  // ...from all active
+  for (unsigned i=1; i<acting.size(); i++) {
+       int peer = acting[i];
+       if (peer_log_requested.count(peer)) continue;
+       
+       dout(10) << " pulling log from osd" << peer
+                        << " from v " << oldest_update_needed
+                        << endl;
+       query_map[peer][info.pgid] = Query(Query::LOG, oldest_update_needed, info.history);
+       peer_log_requested[peer] = oldest_update_needed;
+  }
+
+  // ...and the newest too
   if (newest_update_osd != osd->whoami) {
        if (peer_log_requested.count(newest_update_osd) ||
                peer_summary_requested.count(newest_update_osd)) {
@@ -479,8 +493,7 @@ void PG::peer(ObjectStore::Transaction& t,
                                 << " v " << newest_update 
                                 << ", querying since " << oldest_update_needed
                                 << endl;
-               query_map[newest_update_osd][info.pgid] = Query(Query::LOG, oldest_update_needed, 
-                                                                                                               info.same_primary_since, info.same_acker_since);
+               query_map[newest_update_osd][info.pgid] = Query(Query::LOG, oldest_update_needed, info.history);
                peer_log_requested[newest_update_osd] = oldest_update_needed;
          } else {
                dout(10) << " newest update on osd" << newest_update_osd
@@ -490,8 +503,7 @@ void PG::peer(ObjectStore::Transaction& t,
                assert((peer_info[newest_update_osd].last_complete >= 
                                peer_info[newest_update_osd].log_bottom) ||
                           peer_info[newest_update_osd].log_backlog);  // or else we're in trouble.
-               query_map[newest_update_osd][info.pgid] = Query(Query::BACKLOG, 
-                                                                                                               info.same_primary_since, info.same_acker_since);
+               query_map[newest_update_osd][info.pgid] = Query(Query::BACKLOG, info.history);
                peer_summary_requested.insert(newest_update_osd);
          }
        }
@@ -499,6 +511,18 @@ void PG::peer(ObjectStore::Transaction& t,
   } else {
        dout(10) << " i have the most up-to-date pg v " << info.last_update << endl;
   }
+
+  // did we get them all?
+  bool have_missing = true;
+  for (unsigned i=1; i<acting.size(); i++) {
+       int peer = acting[i];
+       if (peer_missing.count(peer)) continue;
+       
+       dout(10) << " waiting for log+missing from osd" << peer << endl;
+       have_missing = false;
+  }
+  if (!have_missing) return;
+
   dout(10) << " peers_complete_thru " << peers_complete_thru << endl;
   dout(10) << " oldest_update_needed " << oldest_update_needed << endl;
 
@@ -532,8 +556,7 @@ void PG::peer(ObjectStore::Transaction& t,
                           << ".  fetching summary/backlog from osd" << who
                           << endl;
          assert(who != osd->whoami); // can't be me, or we're in trouble.
-         query_map[who][info.pgid] = Query(Query::BACKLOG, 
-                                                                               info.same_primary_since, info.same_acker_since);
+         query_map[who][info.pgid] = Query(Query::BACKLOG, info.history);
          peer_summary_requested.insert(who);
        }
        return;
@@ -557,9 +580,7 @@ void PG::peer(ObjectStore::Transaction& t,
          }
 
          dout(10) << " requesting summary/backlog from osd" << peer << endl;     
-         query_map[peer][info.pgid] = Query(Query::INFO, 
-                                                                                info.same_primary_since,
-                                                                                info.same_acker_since);
+         query_map[peer][info.pgid] = Query(Query::INFO, info.history);
          peer_summary_requested.insert(peer);
          waiting = true;
        }
@@ -589,7 +610,8 @@ void PG::peer(ObjectStore::Transaction& t,
        state_set(STATE_REPLAY);
        g_timer.add_event_after(g_conf.osd_replay_window,
                                                        new OSD::C_Activate(osd, info.pgid, osd->osdmap->get_epoch()));
-  } else {
+  } 
+  else if (!is_active()) {
     // -- ok, activate!
        activate(t);
   }
@@ -598,6 +620,8 @@ void PG::peer(ObjectStore::Transaction& t,
 
 void PG::activate(ObjectStore::Transaction& t)
 {
+  assert(!is_active());
+
   // twiddle pg state
   state_set(STATE_ACTIVE);
   state_clear(STATE_STRAY);
@@ -629,7 +653,8 @@ void PG::activate(ObjectStore::Transaction& t)
        log.complete_to == log.log.end();
        log.requested_to = log.log.end();
   } 
-  else if (is_primary()) {
+  //else if (is_primary()) {
+  else if (true) {
        dout(10) << "activate - not complete, " << missing << ", starting recovery" << endl;
        
        // init complete_to
@@ -638,7 +663,7 @@ void PG::activate(ObjectStore::Transaction& t)
          log.complete_to++;
          assert(log.complete_to != log.log.end());
        }
-
+       
        // start recovery
        log.requested_to = log.complete_to;
     do_recovery();
@@ -660,10 +685,6 @@ void PG::activate(ObjectStore::Transaction& t)
          int peer = acting[i];
          assert(peer_info.count(peer));
          
-         if (peer_info[peer].is_clean()) 
-               clean_set.insert(peer);
-         
-         
          MOSDPGLog *m = new MOSDPGLog(osd->osdmap->get_epoch(), 
                                                                   info.pgid);
          m->info = info;
@@ -681,19 +702,42 @@ void PG::activate(ObjectStore::Transaction& t)
                assert(peer_info[peer].last_update < info.last_update);
                m->log.copy_after(log, peer_info[peer].last_update);
          }
+
+         // update local version of peer's missing list!
+         {
+               eversion_t plu = peer_info[peer].last_update;
+               Missing& pm = peer_missing[peer];
+               for (list<Log::Entry>::iterator p = m->log.log.begin();
+                        p != m->log.log.end();
+                        p++) 
+                 if (p->version > plu)
+                       pm.add(p->oid, p->version);
+         }
          
-         dout(10) << "sending " << m->log << " " << m->missing
+         dout(10) << "activate sending " << m->log << " " << m->missing
                           << " to osd" << peer << endl;
-         
          //m->log.print(cout);
-         
          osd->messenger->send_message(m, MSG_ADDR_OSD(peer));
+
+         // update our missing
+         if (peer_missing[peer].num_missing() == 0) {
+               dout(10) << "activate peer osd" << peer << " already clean, " << peer_info[peer] << endl;
+               assert(peer_info[peer].last_complete == info.last_update);
+               clean_set.insert(peer);
+         } else {
+               dout(10) << "activate peer osd" << peer << " " << peer_info[peer]
+                                << " missing " << peer_missing[peer] << endl;
+         }
+                 
        }
+
+       // discard unneeded peering state
+       //peer_log.clear(); // actually, do this carefully, in case peer() is called again.
        
        // all clean?
        if (is_all_clean()) {
          state_set(STATE_CLEAN);
-         dout(10) << "all replicas clean" << endl;
+         dout(10) << "activate all replicas clean" << endl;
          clean_replicas();     
        }
   }
@@ -839,7 +883,7 @@ bool PG::do_recovery()
        if (latest->is_update() &&
                !objects_pulling.count(latest->oid) &&
                missing.is_missing(latest->oid)) {
-         osd->pull(this, latest->oid, latest->version);
+         osd->pull(this, latest->oid);
          return true;
        }
        
index 9ea3600accb09864b6181d3e92eddcca35912b63..19c7b7d7494161caefa33aab762a70da69978dca 100644 (file)
@@ -90,19 +90,21 @@ public:
 
        epoch_t last_epoch_started;  // last epoch started.
        epoch_t last_epoch_finished; // last epoch finished.
-       epoch_t same_primary_since;  // upper bound: same primary at least back through this epoch.
-       epoch_t same_acker_since;    // upper bound: same acker at least back through this epoch.
-       epoch_t same_role_since;     // upper bound: i have held same role since
 
+       struct History {
+         epoch_t same_since;          // same acting set since
+         epoch_t same_primary_since;  // same primary at least back through this epoch.
+         epoch_t same_acker_since;    // same acker at least back through this epoch.
+         History() : same_since(0), same_primary_since(0), same_acker_since(0) {}
+       } history;
+       
        Info(pg_t p=0) : pgid(p), 
                                         log_backlog(false),
-                                        last_epoch_started(0), last_epoch_finished(0),
-                                        same_primary_since(0), same_acker_since(0), 
-                                        same_role_since(0) {}
+                                        last_epoch_started(0), last_epoch_finished(0) {}
        bool is_clean() { return last_update == last_complete; }
   };
-
-
+  
+  
   /** 
    * Query - used to ask a peer for information about a pg.
    *
@@ -116,16 +118,13 @@ public:
 
        int type;
        eversion_t version;
-       epoch_t same_primary_since;
-       epoch_t same_acker_since;
-
-       Query() : type(-1), same_primary_since(0), same_acker_since(0) {}
-       Query(int t, epoch_t ps, epoch_t as) : 
-         type(t), 
-         same_primary_since(ps), same_acker_since(as) {}
-       Query(int t, eversion_t v, epoch_t ps, epoch_t as) : 
-         type(t), version(v), 
-         same_primary_since(ps), same_acker_since(as) {}
+       Info::History history;
+
+       Query() : type(-1) {}
+       Query(int t, Info::History& h) : 
+         type(t), history(h) {}
+       Query(int t, eversion_t v, Info::History& h) : 
+         type(t), version(v), history(h) {}
   };
   
   
@@ -409,7 +408,7 @@ public:
 public:
   // any
   static const int STATE_ACTIVE = 1; // i am active.  (primary: replicas too)
-
+  
   // primary
   static const int STATE_CLEAN =  2;  // peers are complete, clean of stray replicas.
   static const int STATE_CRASHED = 4; // all replicas went down.
@@ -601,6 +600,12 @@ public:
 };
 
 
+
+inline ostream& operator<<(ostream& out, const PG::Info::History& h) 
+{
+  return out << h.same_since << "/" << h.same_primary_since << "/" << h.same_acker_since;
+}
+
 inline ostream& operator<<(ostream& out, const PG::Info& pgi) 
 {
   return out << "pginfo(" << hex << pgi.pgid << dec 
@@ -608,6 +613,7 @@ inline ostream& operator<<(ostream& out, const PG::Info& pgi)
                         << " (" << pgi.log_bottom << "," << pgi.last_update << "]"
                         << (pgi.log_backlog ? "+backlog":"")
                         << " e " << pgi.last_epoch_started << "/" << pgi.last_epoch_finished
+                        << " " << pgi.history
                         << ")";
 }
 
index 08860bd7a973654b86b6d09f9d39ffb904bd4a33..95a961587e7b6b0bceca953ebeb006c4504a3419 100644 (file)
@@ -107,51 +107,45 @@ void Objecter::scan_pgs(set<pg_t>& changed_pgs)
           i++) {
        pg_t pgid = i->first;
        PG& pg = i->second;
+       
+       // calc new.
+       vector<int> other;
+       osdmap->pg_to_acting_osds(pgid, other);
 
-       int oldu = pg.updater;
-       int oldr = pg.reader;
-       pg.calc(pgid, osdmap);
-
-       if (oldu != pg.updater ||
-               oldr != pg.reader) {
-         /*
-         if (oldu < 0) {
-               dout(10) << "scan_pgs pg " << hex << pgid << dec 
-                                << " (" << pg.active_tids << ")"
-                                << " updater " << oldu << " -> " << pg.updater
-                                << " (was crashed)"
-                                << endl;
-               //recovering_pgs.insert(pgid);
-         }
-         else if (osdmap->is_down(oldu)) {
-               dout(10) << "scan_pgs pg " << hex << pgid << dec 
-                                << " (" << pg.active_tids << ")"
-                                << " updater " << oldu << " -> " << pg.updater
-                                << " (updater went down)"
-                                << endl;
-               //down_pgs.insert(pgid);
-         } 
-         else {
-               dout(10) << "scan_pgs pg " << hex << pgid << dec 
-                                << " (" << pg.active_tids << ")"
-                                << " updater " << oldu << " -> " << pg.updater
-                                << " (primary changed)"
-                                << endl;
-         }
-         */
-         dout(10) << "scan_pgs pg " << hex << pgid << dec 
-                          << " (" << pg.active_tids << ")"
-                          << " updater " << oldu << " -> " << pg.updater
-                          << ", reader " << oldr << " -> " << pg.reader
-                          << endl;
-         changed_pgs.insert(pgid);
+       if (other == pg.acting) 
+         continue; // no change.
+       
+       other.swap(pg.acting);
+
+       if (g_conf.osd_rep == OSD_REP_PRIMARY) {
+         // same primary?
+         if (!other.empty() &&
+                 !pg.acting.empty() &&
+                 other[0] == pg.acting[0]) 
+               continue;
+       }
+       else if (g_conf.osd_rep == OSD_REP_SPLAY) {
+         // same primary and acker?
+         if (!other.empty() &&
+                 !pg.acting.empty() &&
+                 other[0] == pg.acting[0] &&
+                 other[other.size()-1] == pg.acting[pg.acting.size()-1]) 
+               continue;
        }
+       else if (g_conf.osd_rep == OSD_REP_CHAIN) {
+         // any change is significant.
+       }
+       
+       // changed significantly.
+       dout(10) << "scan_pgs pg " << hex << pgid << dec 
+                        << " (" << pg.active_tids << ")"
+                        << " " << other << " -> " << pg.acting
+                        << endl;
+       changed_pgs.insert(pgid);
   }
 }
 
-void Objecter::kick_requests(set<pg_t>& changed_pgs/*, 
-                                                        set<pg_t>& down_pgs,
-                                                        set<pg_t>& recovering_pgs*/) 
+void Objecter::kick_requests(set<pg_t>& changed_pgs) 
 {
   dout(10) << "kick_requests in pgs " << hex << changed_pgs << dec << endl;
 
@@ -274,11 +268,11 @@ tid_t Objecter::readx_submit(OSDRead *rd, ObjectExtent &ex)
                   << " oid " << hex << ex.oid << dec  << " " << ex.start << "~" << ex.length
                   << " (" << ex.buffer_extents.size() << " buffer fragments)" 
                   << " pg " << hex << ex.pgid << dec
-                  << " osd" << pg.reader 
+                  << " osd" << pg.acker() 
                   << endl;
 
-  if (pg.reader >= 0) 
-       messenger->send_message(m, MSG_ADDR_OSD(pg.reader), 0);
+  if (pg.acker() >= 0) 
+       messenger->send_message(m, MSG_ADDR_OSD(pg.acker()), 0);
        
   // add to gather set
   rd->ops[last_tid] = ex;
@@ -517,7 +511,6 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid)
 {
   // find
   PG &pg = get_pg( ex.pgid );
-  //int osd = osdmap->get_pg_acting_primary( ex.pgid );
        
   // send
   tid_t tid;
@@ -566,10 +559,10 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid)
                   << "  oid " << hex << ex.oid << dec 
                   << " " << ex.start << "~" << ex.length 
                   << " pg " << hex << ex.pgid << dec 
-                  << " osd" << pg.updater 
+                  << " osd" << pg.primary()
                   << endl;
-  if (pg.updater >= 0)
-       messenger->send_message(m, MSG_ADDR_OSD(pg.updater), 0);
+  if (pg.primary() >= 0)
+       messenger->send_message(m, MSG_ADDR_OSD(pg.primary()), 0);
   
   return tid;
 }
@@ -601,7 +594,7 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m)
   PG &pg = get_pg( m->get_pg() );
 
   // ignore?
-  if (pg.reader != m->get_source().num()) {
+  if (pg.acker() != m->get_source().num()) {
        dout(7) << " ignoring ack|commit from non-acker" << endl;
        delete m;
        return;
index 902d270ed7fdc34d31aa58ec8ba7e3d676a264df..140bb58dc7efb5dc6e5e5c53801ce50a3f2629d8 100644 (file)
@@ -68,15 +68,7 @@ class Objecter {
        OSDWrite(bufferlist &b) : OSDModify(OSD_OP_WRITE), bl(b) {}
   };
 
-  /*
-  class OSDLock : public OSDModify {
-  public:
-       map<int,ObjectExtent> by_osd;
-       map<int,ObjectExtent>::iterator next;
-       OSDLock(int o) : OSDModify(o) {}
-  };
-  */
-
+  
 
  private:
   // pending ops
@@ -89,27 +81,32 @@ class Objecter {
    */
   class PG {
   public:
-       int updater;         // where i write
-       int reader;          // where i read, and expect acks from
+       vector<int> acting;
        set<tid_t>  active_tids; // active ops
-
-       PG() : updater(-1), reader(-1) {}
-
-       void calc(pg_t pgid, OSDMap *osdmap) {  // return true if change
-         updater = osdmap->get_pg_acting_primary(pgid);
+       
+       PG() {}
+       
+       // primary - where i write
+       int primary() {
+         if (acting.empty()) return -1;
+         return acting[0];
+       }
+       // acker - where i read, and receive acks from
+       int acker() {
+         if (acting.empty()) return -1;
          if (g_conf.osd_rep == OSD_REP_PRIMARY)
-               reader = updater;
-         else 
-               reader = osdmap->get_pg_acting_tail(pgid);
+               return acting[0];
+         else
+               return acting[acting.size()-1];
        }
   };
 
   hash_map<pg_t,PG> pg_map;
-
+  
   
   PG &get_pg(pg_t pgid) {
        if (!pg_map.count(pgid)) 
-         pg_map[pgid].calc(pgid, osdmap);
+         osdmap->pg_to_acting_osds(pgid, pg_map[pgid].acting);
        return pg_map[pgid];
   }
   void close_pg(pg_t pgid) {
@@ -117,8 +114,8 @@ class Objecter {
        assert(pg_map[pgid].active_tids.empty());
        pg_map.erase(pgid);
   }
-  void scan_pgs(set<pg_t>& chnaged_pgs);//, set<pg_t>& down_pgs);
-  void kick_requests(set<pg_t>& changed_pgs);//, set<pg_t>& down_pgs);
+  void scan_pgs(set<pg_t>& chnaged_pgs);
+  void kick_requests(set<pg_t>& changed_pgs);
        
 
  public: