osd: exclude lagging peers during backlog generation
author     Sage Weil <sage@newdream.net>
           Fri, 7 Aug 2009 22:37:27 +0000 (15:37 -0700)
committer  Sage Weil <sage@newdream.net>
           Mon, 10 Aug 2009 20:14:31 +0000 (13:14 -0700)
src/osd/OSD.cc
src/osd/PG.cc
src/osd/PG.h
src/osd/ReplicatedPG.cc
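
The change hinges on one catch-up test, visible in the new PG::choose_acting(): a peer may stay in the acting set only if the would-be primary's log reaches back at least as far as that peer's last_update, or the primary carries a full backlog. Below is a minimal, self-contained sketch of that predicate; eversion_t is reduced to a two-field struct, and PeerInfo/can_catch_up are invented names, not the real PG::Info interface.

    // Sketch only: simplified stand-ins for PG::Info's last_update, log_tail
    // and log_backlog fields, showing the test choose_acting() applies.
    #include <cassert>
    #include <cstdint>

    struct eversion_t {
      uint64_t epoch = 0, version = 0;
      bool operator<(const eversion_t &o) const {
        return epoch < o.epoch || (epoch == o.epoch && version < o.version);
      }
    };

    struct PeerInfo {
      eversion_t last_update;    // newest log entry this OSD has
      eversion_t log_tail;       // oldest entry still present in its log
      bool log_backlog = false;  // log has been extended with a full backlog
    };

    // A peer is excluded from the acting set when it cannot catch up from the
    // would-be primary's log alone (and no backlog has been generated yet).
    bool can_catch_up(const PeerInfo &primary, const PeerInfo &peer) {
      return !(peer.last_update < primary.log_tail && !primary.log_backlog);
    }

    int main() {
      PeerInfo primary{{5, 100}, {5, 40}, false};
      PeerInfo lagging{{5, 10}, {5, 1}, false};   // behind the primary's tail
      PeerInfo current{{5, 90}, {5, 50}, false};  // still within the log
      assert(!can_catch_up(primary, lagging));
      assert(can_catch_up(primary, current));
      primary.log_backlog = true;  // once a backlog exists, anyone can catch up
      assert(can_catch_up(primary, lagging));
      return 0;
    }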

diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index b9ee9faac6aca1d5ca1d67592de12a2cba7277fc..fce332b6517ce8b5e86e71a619c2ba118b46e7cc 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -2206,7 +2206,7 @@ void OSD::advance_map(ObjectStore::Transaction& t)
 
     dout(10) << *pg
             << " up " << oldup << " -> " << pg->up 
-            << " acting " << oldacting << " -> " << pg->acting 
+            << ", acting " << oldacting << " -> " << pg->acting 
             << ", role " << oldrole << " -> " << role << dendl; 
     
     // pg->on_*
@@ -2334,6 +2334,8 @@ void OSD::activate_map(ObjectStore::Transaction& t)
 
   clear_map_cache();  // we're done with it
   update_heartbeat_peers();
+
+  send_pg_temp();
 }
 
 
@@ -2740,7 +2742,7 @@ void OSD::handle_pg_create(MOSDPGCreate *m)
     creating_pgs[pgid].parent = parent;
     creating_pgs[pgid].split_bits = split_bits;
     creating_pgs[pgid].acting.swap(acting);
-    calc_priors_during(pgid, created, history.same_primary_since, 
+    calc_priors_during(pgid, created, history.same_acting_since, 
                       creating_pgs[pgid].prior);
 
     // poll priors
@@ -3364,8 +3366,6 @@ void OSD::generate_backlog(PG *pg)
     goto out;
   }
 
-  assert(!pg->is_active());
-
   if (!pg->build_backlog_map(omap))
     goto out;
 
@@ -3380,10 +3380,10 @@ void OSD::generate_backlog(PG *pg)
     dout(10) << *pg << " generate_backlog aborting" << dendl;
     goto out2;
   }
-  assert(!pg->is_active());
 
   if (!pg->is_primary()) {
     dout(10) << *pg << "  sending info+missing+backlog to primary" << dendl;
+    assert(!pg->is_active());  // for now
     MOSDPGLog *m = new MOSDPGLog(osdmap->get_epoch(), pg->info);
     m->missing = pg->missing;
     m->log = pg->log;
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index 07ddb19eb237aed3765861159fda371589d22a22..58bcd1357bd04400dd43f01a7cc310affacba6c3 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -555,7 +555,9 @@ bool PG::build_backlog_map(map<eversion_t,Log::Entry>& omap)
     Log::Entry e;
     e.soid = poid;
     bufferlist bv;
-    osd->store->getattr(info.pgid.to_coll(), poid, OI_ATTR, bv);
+    int r = osd->store->getattr(info.pgid.to_coll(), poid, OI_ATTR, bv);
+    if (r < 0)
+      continue;  // musta just been deleted!
     object_info_t oi(bv);
     e.version = oi.version;
     e.prior_version = oi.prior_version;
@@ -593,6 +595,7 @@ void PG::assemble_backlog(map<eversion_t,Log::Entry>& omap)
   
   assert(!log.backlog);
   log.backlog = true;
+  info.log_backlog = true;
 
   /*
    * note that we don't create prior_version backlog entries for
@@ -660,6 +663,7 @@ void PG::drop_backlog()
 
   assert(log.backlog);
   log.backlog = false;
+  info.log_backlog = false;
   
   while (!log.log.empty()) {
     Log::Entry &e = *log.log.begin();
@@ -860,9 +864,13 @@ void PG::build_prior()
 
   clear_prior();
 
-  // current nodes, of course.
-  for (unsigned i=1; i<up.size(); i++)
-    prior_set.insert(up[i]);
+  // current up and/or acting nodes, of course.
+  for (unsigned i=0; i<up.size(); i++)
+    if (up[i] != osd->whoami)
+      prior_set.insert(up[i]);
+  for (unsigned i=0; i<acting.size(); i++)
+    if (acting[i] != osd->whoami)
+      prior_set.insert(acting[i]);
 
   // and prior PG mappings.  move backwards in time.
   state_clear(PG_STATE_CRASHED);
@@ -1009,10 +1017,56 @@ void PG::clear_primary_state()
   osd->recovery_wq.dequeue(this);
 }
 
+bool PG::choose_acting(int newest_update_osd)
+{
+  vector<int> want = up;
+  
+  Info& newest = (newest_update_osd == osd->whoami) ? info : peer_info[newest_update_osd];
+  Info& oprimi = (want[0] == osd->whoami) ? info : peer_info[want[0]];
+  if (newest_update_osd != want[0] &&
+      oprimi.last_update < newest.log_tail && !newest.log_backlog) {
+    // up[0] needs a backlog to catch up
+    // make newest_update_osd primary instead?
+    for (unsigned i=1; i<want.size(); i++)
+      if (want[i] == newest_update_osd) {
+       dout(10) << "choose_acting  up[0] osd" << want[0] << " needs backlog to catch up, making "
+                << want[i] << " primary" << dendl;
+       want[0] = want[i];
+       want[i] = up[0];
+       break;
+      }
+  }
+  // exclude peers who need backlogs to catch up?
+  Info& primi = (want[0] == osd->whoami) ? info : peer_info[want[0]];
+  for (vector<int>::iterator p = want.begin() + 1; p != want.end(); ) {
+    Info& pi = (*p == osd->whoami) ? info : peer_info[*p];
+    if (pi.last_update < primi.log_tail && !primi.log_backlog) {
+      dout(10) << "choose_acting  osd" << *p << " needs primary backlog to catch up" << dendl;
+      want.erase(p);
+    } else {
+      dout(10) << "choose_acting  osd" << *p << " can catch up with osd" << want[0] << " log" << dendl;
+      p++;
+    }
+  }
+  if (want != acting) {
+    dout(10) << "choose_acting  want " << want << " != acting " << acting
+            << ", requesting pg_temp change" << dendl;
+    if (want == up) {
+      vector<int> empty;
+      osd->queue_want_pg_temp(info.pgid, empty);
+    } else
+      osd->queue_want_pg_temp(info.pgid, want);
+    return false;
+  }
+  dout(10) << "choose_acting want " << want << " (== acting)" << dendl;
+  return true;
+}
 
 // if false, stop.
 bool PG::recover_master_log(map< int, map<pg_t,Query> >& query_map)
 {
+  dout(10) << "recover_master_log" << dendl;
+
   // -- query info from everyone in prior_set.
   bool missing_info = false;
   for (set<int>::iterator it = prior_set.begin();
@@ -1063,11 +1117,13 @@ bool PG::recover_master_log(map< int, map<pg_t,Query> >& query_map)
       newest_update = it->second.last_update;
       newest_update_osd = it->first;
     }
-    if (is_up(it->first)) {
+    if (is_acting(it->first)) {
       if (it->second.last_update < oldest_update) {
         oldest_update = it->second.last_update;
        oldest_who = it->first;
       }
+    }
+    if (is_up(it->first)) {
       if (it->second.last_complete < min_last_complete_ondisk)
         min_last_complete_ondisk = it->second.last_complete;
     }
@@ -1076,41 +1132,8 @@ bool PG::recover_master_log(map< int, map<pg_t,Query> >& query_map)
     newest_update_osd = osd->whoami;
   
   // -- decide what acting set i want, based on state of up set
-  vector<int> want = up;
-  if (newest_update_osd != osd->whoami &&
-      log.head < peer_info[newest_update_osd].log_tail) {
-    // up[0] needs a backlog to catch up
-    // make newest_update_osd primary instead?
-    for (unsigned i=1; i<want.size(); i++)
-      if (want[i] == newest_update_osd) {
-       dout(10) << " osd" << want[0] << " needs backlog to catch up, making " << want[i] << " primary" << dendl;
-       want[0] = want[i];
-       want[i] = up[0];
-       break;
-      }
-  }
-  // exclude peers who need backlogs to catch up?
-  Info& primi = (want[0] == osd->whoami) ? info : peer_info[want[0]];
-  for (vector<int>::iterator p = want.begin() + 1; p != want.end(); ) {
-    Info& pi = (*p == osd->whoami) ? info : peer_info[*p];
-    if (pi.last_update < primi.log_tail) {
-      dout(10) << " osd" << *p << " needs primary's backlog to catch up" << dendl;
-      want.erase(p);
-    } else {
-      dout(10) << " osd" << *p << " can catch up with osd" << want[0] << " log" << dendl;
-      p++;
-    }
-  }
-  if (want != acting) {
-    dout(10) << " want up " << want << " != acting " << acting << ", requesting pg_temp change" << dendl;
-    if (want == up) {
-      vector<int> empty;
-      osd->queue_want_pg_temp(info.pgid, empty);
-    } else
-      osd->queue_want_pg_temp(info.pgid, want);
+  if (!choose_acting(newest_update_osd))
     return false;
-  }
-  dout(10) << " want " << want << " (== acting)" << dendl;
 
   // gather log(+missing) from that person!
   if (newest_update_osd != osd->whoami) {
@@ -1179,7 +1202,7 @@ void PG::peer(ObjectStore::Transaction& t,
               map< int, map<pg_t,Query> >& query_map,
              map<int, MOSDPGInfo*> *activator_map)
 {
-  dout(10) << "peer acting is " << acting << dendl;
+  dout(10) << "peer up " << up << ", acting " << acting << dendl;
 
   if (!is_active())
     state_set(PG_STATE_PEERING);
@@ -1190,9 +1213,14 @@ void PG::peer(ObjectStore::Transaction& t,
   dout(10) << "peer prior_set is " << prior_set << dendl;
   
   
-  if (!have_master_log)
+  if (!have_master_log) {
     if (!recover_master_log(query_map))
       return;
+  } else if (up != acting) {
+    // are we done generating backlog(s)?
+    if (!choose_acting(osd->whoami))
+      return;
+  }
 
 
   // -- do i need to generate backlog for any of my peers?
@@ -1392,7 +1420,8 @@ void PG::activate(ObjectStore::Transaction& t,
   clear_prior();
 
   // if we are building a backlog, cancel it!
-  osd->cancel_generate_backlog(this);
+  if (up == acting)
+    osd->cancel_generate_backlog(this);
 
   // write pg info, log
   write_info(t);
diff --git a/src/osd/PG.h b/src/osd/PG.h
index f7265d765952047cc2585b07723d0d4e170d1ce1..2566b40c70ba2232f697f6b468d3ef5162905125 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -119,14 +119,17 @@ public:
        ::encode(last_epoch_started, bl);
        ::encode(same_acting_since, bl);
        ::encode(same_up_since, bl);
-       //::encode(same_primary_since, bl);
+       ::encode(same_primary_since, bl);
       }
-      void decode(bufferlist::iterator &bl) {
+      void decode(bufferlist::iterator &bl, version_t v) {
        ::decode(epoch_created, bl);
        ::decode(last_epoch_started, bl);
        ::decode(same_acting_since, bl);
-       ::decode(same_up_since, bl);
-       //::decode(same_primary_since, bl);
+       if (v >= 20)
+         ::decode(same_up_since, bl);
+       else
+         same_up_since = same_acting_since;
+       ::decode(same_primary_since, bl);
       }
     } history;
     
@@ -160,11 +163,11 @@ public:
       ::decode(log_tail, bl);
       ::decode(log_backlog, bl);
       ::decode(stats, bl);
-      history.decode(bl);
+      history.decode(bl, v);
       ::decode(snap_trimq, bl);
     }
   };
-  WRITE_CLASS_ENCODER(Info::History)
+  //WRITE_CLASS_ENCODER(Info::History)
   WRITE_CLASS_ENCODER(Info)
 
   
@@ -199,7 +202,7 @@ public:
     void decode(bufferlist::iterator &bl) {
       ::decode(type, bl);
       ::decode(since, bl);
-      history.decode(bl);
+      history.decode(bl, ~0ull);
     }
   };
   WRITE_CLASS_ENCODER(Query)
@@ -677,7 +680,7 @@ public:
  public:
   vector<int> up, acting;
   map<int,eversion_t> peer_last_complete_ondisk;
-  eversion_t  min_last_complete_ondisk;  // min over last_complete_ondisk, peer_last_complete_ondisk
+  eversion_t  min_last_complete_ondisk;  // up: min over last_complete_ondisk, peer_last_complete_ondisk
   eversion_t  pg_trim_to;
 
   // [primary only] content recovery state
@@ -690,7 +693,7 @@ public:
   bool        need_up_thru;
   set<int>    stray_set;   // non-acting osds that have PG data.
   set<int>    uptodate_set;  // current OSDs that are uptodate
-  eversion_t  oldest_update; // lowest (valid) last_update in active set
+  eversion_t  oldest_update; // acting: lowest (valid) last_update in active set
   map<int,Info>        peer_info;   // info from peers (stray or prior)
   set<int>             peer_info_requested;
   map<int, Missing>    peer_missing;
@@ -737,7 +740,7 @@ public:
   bool is_prior(int osd) const { return prior_set.count(osd); }
   bool is_stray(int osd) const { return stray_set.count(osd); }
   
-  bool is_all_uptodate() const { return uptodate_set.size() == acting.size(); }
+  bool is_all_uptodate() const { return uptodate_set.size() == acting.size() && up == acting; }
 
   void generate_past_intervals();
   void trim_past_intervals();
@@ -773,6 +776,7 @@ public:
   
   void trim_write_ahead();
 
+  bool choose_acting(int newest_update_osd);
   bool recover_master_log(map< int, map<pg_t,Query> >& query_map);
   void peer(ObjectStore::Transaction& t, 
            map< int, map<pg_t,Query> >& query_map,
@@ -918,7 +922,7 @@ public:
   virtual void on_shutdown() = 0;
 };
 
-WRITE_CLASS_ENCODER(PG::Info::History)
+//WRITE_CLASS_ENCODER(PG::Info::History)
 WRITE_CLASS_ENCODER(PG::Info)
 WRITE_CLASS_ENCODER(PG::Query)
 WRITE_CLASS_ENCODER(PG::Missing::item)
@@ -1001,11 +1005,11 @@ inline ostream& operator<<(ostream& out, const PG::Interval& i)
 
 inline ostream& operator<<(ostream& out, const PG& pg)
 {
-  out << "pg[" << pg.info 
-      << " " << pg.acting;
-  out << " r=" << pg.get_role();
+  out << "pg[" << pg.info
+      << " " << pg.up;
   if (pg.acting != pg.up)
-    out << " up=" << pg.up;
+    out << "/" << pg.acting;
+  out << " r=" << pg.get_role();
   
   if (pg.recovery_ops_active)
     out << " rops=" << pg.recovery_ops_active;
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 0fc2ebc0d7a40ccf511201984fc0855f9e036dcf..533049cb759a5e423f9945ec4484abdeac8ab092 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -57,18 +57,18 @@ static const int LOAD_HYBRID     = 3;
 
 bool ReplicatedPG::same_for_read_since(epoch_t e)
 {
-  return (e >= info.history.same_acting_since);
+  return (e >= info.history.same_primary_since);
 }
 
 bool ReplicatedPG::same_for_modify_since(epoch_t e)
 {
-  return (e >= info.history.same_acting_since);
+  return (e >= info.history.same_primary_since);
 }
 
 bool ReplicatedPG::same_for_rep_modify_since(epoch_t e)
 {
   // check osd map: same set, or primary+acker?
-  return e >= info.history.same_acting_since;
+  return e >= info.history.same_primary_since;
 }
 
 // ====================
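
The ReplicatedPG.cc hunks relax the op-admission checks from same_acting_since to same_primary_since, which matters once pg_temp requests (queued by the new choose_acting() above) can reshuffle the acting set without moving the primary. A minimal sketch of the check, with a hypothetical history_t standing in for PG::Info::History:

    // Sketch only: an op stamped with map epoch e stays valid as long as the
    // primary for this PG has not changed since that epoch.
    struct history_t {
      unsigned same_acting_since;
      unsigned same_primary_since;
    };

    bool same_for_read_since(const history_t &h, unsigned e) {
      // Keying off same_primary_since (rather than same_acting_since) avoids
      // forcing resends when only non-primary members of the acting set change.
      return e >= h.same_primary_since;
    }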