]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: only update last_epoch_started after all replicas commit peering results
authorSage Weil <sage.weil@dreamhost.com>
Wed, 16 Mar 2011 05:18:45 +0000 (22:18 -0700)
committerSage Weil <sage.weil@dreamhost.com>
Wed, 16 Mar 2011 05:18:45 +0000 (22:18 -0700)
The PG info.history.last_epoch_started is important because it bounds how
far back in time we think we need to look in order to fully recover the
contents of the PG.  That's because every replica commits the PG peering
result (the info and pg log) when it activates.

In order for this to work properly, we can only advance last_epoch_started
_after_ the peer results are stable on disk on all replicas.  Otherwise a
poorly timed failure (or set of failures) could lose the PG peer results
and we wouldn't go back far enough in time to find them.

Signed-off-by: Sage Weil <sage.weil@dreamhost.com>
src/osd/OSD.cc
src/osd/PG.cc
src/osd/PG.h

index 7fc43db2b571a89fcba18cfae39860826cb1d707..a503091769ce0d98b12a78ef89f079caeb80688a 100644 (file)
@@ -4233,9 +4233,21 @@ void OSD::_process_pg_info(epoch_t epoch, int from,
     *_dout << "missing: " << *missing;
   *_dout << dendl;
 
-  unreg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
-  pg->info.history.merge(info.history);
-  reg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
+  // don't update history (yet) if we are active and primary; the replica
+  // may be telling us they have activated (and committed) but we can't
+  // share that until _everyone_ does the same.
+  if (pg->is_active() && pg->is_primary() && pg->is_acting(from) &&
+      pg->info.history.last_epoch_started < pg->info.history.same_acting_since &&
+      info.history.last_epoch_started >= pg->info.history.same_acting_since) {
+    dout(10) << " peer osd" << from << " activated and committed" << dendl;
+    pg->peer_activated.insert(from);
+    if (pg->peer_activated.size() == pg->acting.size())
+      pg->all_activated_and_committed();
+  } else {
+    unreg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
+    pg->info.history.merge(info.history);
+    reg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
+  }
 
   // dump log
   dout(15) << *pg << " my log = ";
index 6417be3d3646868f94ac70982ed98c0394b01986..8c32f3ef1b0e972bd543b6159e28c494efe754a3 100644 (file)
@@ -1312,6 +1312,7 @@ void PG::clear_primary_state()
   peer_missing.clear();
   need_up_thru = false;
   peer_last_complete_ondisk.clear();
+  peer_activated.clear();
   min_last_complete_ondisk = eversion_t();
   stray_purged.clear();
   might_have_unfound.clear();
@@ -1776,6 +1777,15 @@ void PG::build_might_have_unfound()
   dout(15) << __func__ << ": built " << might_have_unfound << dendl;
 }
 
+struct C_PG_ActivateCommitted : public Context {
+  PG *pg;
+  epoch_t epoch;
+  C_PG_ActivateCommitted(PG *p, epoch_t e) : pg(p), epoch(e) {}
+  void finish(int r) {
+    pg->_activate_committed(epoch);
+  }
+};
+
 void PG::activate(ObjectStore::Transaction& t, list<Context*>& tfin,
                  map< int, map<pg_t,Query> >& query_map,
                  map<int, MOSDPGInfo*> *activator_map)
@@ -1807,10 +1817,7 @@ void PG::activate(ObjectStore::Transaction& t, list<Context*>& tfin,
       build_might_have_unfound();
     }
   }
-
-  info.history.last_epoch_started = osd->osdmap->get_epoch();
-  trim_past_intervals();
-  
   if (role == 0) {    // primary state
     last_update_ondisk = info.last_update;
     min_last_complete_ondisk = eversion_t(0,0);  // we don't know (yet)!
@@ -1835,6 +1842,9 @@ void PG::activate(ObjectStore::Transaction& t, list<Context*>& tfin,
   // clean up stray objects
   clean_up_local(t); 
 
+  // find out when we commit
+  tfin.push_back(new C_PG_ActivateCommitted(this, info.history.same_acting_since));
+  
   // initialize snap_trimq
   if (is_primary()) {
     snap_trimq = pool->cached_removed_snaps;
@@ -1997,6 +2007,48 @@ void PG::activate(ObjectStore::Transaction& t, list<Context*>& tfin,
   osd->take_waiters(waiting_for_active);
 }
 
+void PG::_activate_committed(epoch_t e)
+{
+  if (e < info.history.same_acting_since) {
+    dout(10) << "_activate_committed " << e << ", that was an old interval" << dendl;
+    return;
+  }
+
+  if (is_primary()) {
+    peer_activated.insert(osd->whoami);
+    dout(10) << "_activate_committed " << e << " peer_activated now " << peer_activated << dendl;
+    if (peer_activated.size() == acting.size())
+      all_activated_and_committed();
+  } else {
+    dout(10) << "_activate_committed " << e << " telling primary" << dendl;
+    MOSDPGInfo *m = new MOSDPGInfo(osd->osdmap->get_epoch());
+    PG::Info i = info;
+    i.history.last_epoch_started = e;
+    m->pg_info.push_back(i);
+    osd->cluster_messenger->send_message(m, osd->osdmap->get_cluster_inst(acting[0]));
+  }
+}
+
+/*
+ * update info.history.last_epoch_started ONLY after we and all
+ * replicas have activated AND committed the activate transaction
+ * (i.e. the peering results are stable on disk).
+ */
+void PG::all_activated_and_committed()
+{
+  dout(10) << "all_activated_and_committed" << dendl;
+  assert(is_primary());
+  assert(peer_activated.size() == acting.size());
+
+  info.history.last_epoch_started = osd->osdmap->get_epoch();
+  share_pg_info();
+
+  ObjectStore::Transaction *t = new ObjectStore::Transaction;
+  write_info(*t);
+  int tr = osd->store->queue_transaction(&osr, t);
+  assert(tr == 0);
+}
+
 void PG::queue_snap_trim()
 {
   if (osd->snap_trim_wq.queue(this))
index c93956cae32219ebb2ef764d5b13cefd8ab50cab..7810cb1ba6f3bab8dd67306a11a4e1af7b4740f1 100644 (file)
@@ -759,6 +759,7 @@ public:
   set<int>             peer_backlog_requested;
   set<int>             peer_missing_requested;
   set<int>             stray_purged;  // i deleted these strays; ignore racing PGInfo from them
+  set<int>             peer_activated;
 
   // primary-only, recovery-only state
   set<int>             might_have_unfound;  // These osds might have objects on them
@@ -863,6 +864,8 @@ public:
   void activate(ObjectStore::Transaction& t, list<Context*>& tfin,
                map< int, map<pg_t,Query> >& query_map,
                map<int, MOSDPGInfo*> *activator_map=0);
+  void _activate_committed(epoch_t e);
+  void all_activated_and_committed();
 
   bool have_unfound() const { 
     return missing.num_missing() > missing_loc.size();