]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
OSD,PG: Peering refactor
authorSamuel Just <samuel.just@dreamhost.com>
Tue, 3 May 2011 00:03:56 +0000 (17:03 -0700)
committerSamuel Just <samuel.just@dreamhost.com>
Tue, 3 May 2011 17:19:45 +0000 (10:19 -0700)
Previously, peering was handled by a de facto state machine in do_peer
and related methods.  Peering state will now be encapsulated in
RecoveryState, which uses boost::statechart internally to enforce an
explicit state machine abstraction.  OSD::handle_pg_* pass off to
PG::handle_*, which pass messages to the state machine.

Signed-off-by: Samuel Just <samuel.just@dreamhost.com>
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PG.h
src/osd/ReplicatedPG.h

index 4c23be8897d0d4a99fe71de9e95a1d4aebf3d203..75cace88af105eac98d2988ac35c1dcb78fced08 100644 (file)
@@ -1168,6 +1168,11 @@ PG *OSD::get_or_create_pg(const PG::Info& info, epoch_t epoch, int from, int& cr
     
     // kick any waiters
     wake_pg_waiters(pg->info.pgid);
+
+    map<int, map<pg_t, PG::Query> > query_map;
+    PG::RecoveryCtx rctx(&query_map, 0, 0, 0, 0); // GetInfo only needs a query_map
+    pg->handle_create(&rctx);
+    do_queries(query_map);
   } else {
     // already had it.  did the mapping change?
     pg = _lookup_lock_pg(info.pgid);
@@ -3203,25 +3208,8 @@ void OSD::advance_map(ObjectStore::Transaction& t)
     PG *pg = it->second;
 
     pg->lock();
-    if (pg->handle_advance_map(*osdmap, *lastmap)) {
-      pg->unlock();
-      continue;
-    }
-       
-    // make sure we clear out any pg_temp change requests
-    pg_temp_wanted.erase(pgid);
-    pg->prior_set.reset(NULL);
-
-    
-    pg->cancel_recovery();
-
-
-    // sanity check pg_temp
-    if (pg->acting.empty() && pg->up.size() && pg->up[0] == whoami) {
-      dout(10) << *pg << " acting empty, but i am up[0], clearing pg_temp" << dendl;
-      queue_want_pg_temp(pg->info.pgid, pg->acting);
-    }
-
+    dout(10) << "Scanning pg " << *pg << dendl;
+    pg->handle_advance_map(*osdmap, *lastmap, 0);
     pg->unlock();
   }
 }
@@ -3236,8 +3224,6 @@ void OSD::activate_map(ObjectStore::Transaction& t, list<Context*>& tfin)
   map< int, map<pg_t,PG::Query> > query_map;    // peer -> PG -> get_summary_since
   map<int,MOSDPGInfo*> info_map;  // peer -> message
 
-  epoch_t up_thru = osdmap->get_up_thru(whoami);
-  
   int num_pg_primary = 0, num_pg_replica = 0, num_pg_stray = 0;
 
   epoch_t oldest_last_clean = osdmap->get_epoch();
@@ -3248,7 +3234,6 @@ void OSD::activate_map(ObjectStore::Transaction& t, list<Context*>& tfin)
        it++) {
     PG *pg = it->second;
     pg->lock();
-    pg->check_recovery_op_pulls(osdmap);
 
     if (pg->is_primary())
       num_pg_primary++;
@@ -3260,44 +3245,16 @@ void OSD::activate_map(ObjectStore::Transaction& t, list<Context*>& tfin)
     if (pg->is_primary() && pg->info.history.last_epoch_clean < oldest_last_clean)
       oldest_last_clean = pg->info.history.last_epoch_clean;
     
-    if (g_conf.osd_check_for_log_corruption)
-      pg->check_log_for_corruption(store);
-
-    if (pg->is_active() && pg->is_primary() &&
-       (pg->missing.num_missing() > pg->missing_loc.size())) {
-      if (pg->all_unfound_are_lost(osdmap)) {
-       pg->mark_all_unfound_as_lost(t);
-      }
-    }
-
     if (!osdmap->have_pg_pool(pg->info.pgid.pool())) {
       //pool is deleted!
       queue_pg_for_deletion(pg);
       pg->unlock();
       continue;
     }
-    if (pg->is_active()) {
-      // i am active
-      if (pg->is_primary() &&
-         !pg->snap_trimq.empty() &&
-         pg->is_clean())
-       pg->queue_snap_trim();
-    }
-    else if (pg->is_primary() &&
-            !pg->is_active()) {
-      // i am (inactive) primary
-      if ((!pg->is_peering() && !pg->is_replay()) || 
-         (pg->need_up_thru && up_thru >= pg->info.history.same_acting_since))
-       pg->do_peer(t, tfin, query_map, &info_map);
-    }
-    else if (pg->is_stray() &&
-            pg->get_primary() >= 0) {
-      // i am residual|replica
-      notify_list[pg->get_primary()].push_back(pg->info);
-    }
-    if (pg->is_primary())
-      pg->update_stats();
 
+    PG::RecoveryCtx rctx(&query_map, &info_map, &notify_list, &tfin, &t);
+    pg->handle_activate_map(&rctx);
+    
     pg->unlock();
   }  
 
@@ -3652,7 +3609,9 @@ void OSD::kick_pg_split_queue()
 
       wake_pg_waiters(pg->info.pgid);
 
-      pg->do_peer(*t, fin->contexts, query_map, &info_map);
+      PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t);
+      pg->handle_create(&rctx);
+
       pg->update_stats();
       pg->unlock();
       created++;
@@ -3869,7 +3828,8 @@ void OSD::handle_pg_create(MOSDPGCreate *m)
       creating_pgs.erase(pgid);
 
       wake_pg_waiters(pg->info.pgid);
-      pg->do_peer(*t, fin->contexts, query_map, &info_map);
+      PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t);
+      pg->handle_create(&rctx);
       pg->update_stats();
 
       int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
@@ -3975,39 +3935,8 @@ void OSD::handle_pg_notify(MOSDPGNotify *m)
     if (!pg)
       continue;
 
-    if (pg->peer_info.count(from) &&
-       pg->peer_info[from].last_update == it->last_update) {
-      dout(10) << *pg << " got dup osd" << from << " info " << *it << ", identical to ours" << dendl;
-    } else if (pg->peer_info.count(from) &&
-              pg->is_active()) {
-      dout(10) << *pg << " got dup osd" << from << " info " << *it
-              << " but pg is active, keeping our info " << pg->peer_info[from]
-              << dendl;
-    } else {
-      dout(10) << *pg << " got osd" << from << " info " << *it << dendl;
-      pg->peer_info[from] = *it;
-      pg->might_have_unfound.insert(from);
-
-      unreg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
-      pg->info.history.merge(it->history);
-      reg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
-
-      // stray?
-      if (!pg->is_acting(from)) {
-       dout(10) << *pg << " osd" << from << " has stray content: " << *it << dendl;
-       pg->stray_set.insert(from);
-       pg->state_clear(PG_STATE_CLEAN);
-      }
-      
-      pg->do_peer(*t, fin->contexts, query_map, &info_map);
-      pg->update_stats();
-    }
-
-    if (pg->is_active() && pg->have_unfound()) {
-      // Make sure we've requested MISSING information from every OSD
-      // we know about.
-      pg->discover_all_missing(query_map);
-    }
+    PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t);
+    pg->handle_notify(from, *it, &rctx);
 
     int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
     assert(tr == 0);
@@ -4025,140 +3954,6 @@ void OSD::handle_pg_notify(MOSDPGNotify *m)
   m->put();
 }
 
-
-
-/** PGLog
- * from non-primary to primary
- *  includes log and info
- * from primary to non-primary
- *  includes log for use in recovery
- * NOTE: called with opqueue active.
- */
-
-void OSD::_process_pg_info(epoch_t epoch, int from,
-                          PG::Info &info, 
-                          PG::Log &log, 
-                          PG::Missing *missing,
-                          map< int, map<pg_t,PG::Query> >& query_map,
-                          map<int, MOSDPGInfo*>* info_map,
-                          int& created)
-{
-  ObjectStore::Transaction *t;
-  C_Contexts *fin;  
-  PG *pg;
-
-  pg = get_or_create_pg(info, epoch, from, created, false, &t, &fin);
-  if (!pg)
-    return;
-
-  dout(10) << *pg << ": " << __func__ << " info: " << info << ", ";
-  if (log.empty())
-    *_dout << "(log omitted)";
-  else
-    *_dout << "log: " << log;
-  *_dout << ", ";
-  if (!missing)
-    *_dout << "(missing omitted)";
-  else
-    *_dout << "missing: " << *missing;
-  *_dout << dendl;
-
-  // don't update history (yet) if we are active and primary; the replica
-  // may be telling us they have activated (and committed) but we can't
-  // share that until _everyone_ does the same.
-  if (pg->is_active() && pg->is_primary() && pg->is_acting(from) &&
-      pg->info.history.last_epoch_started < pg->info.history.same_acting_since &&
-      info.history.last_epoch_started >= pg->info.history.same_acting_since) {
-    dout(10) << " peer osd" << from << " activated and committed" << dendl;
-    pg->peer_activated.insert(from);
-    if (pg->peer_activated.size() == pg->acting.size())
-      pg->all_activated_and_committed();
-  } else {
-    unreg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
-    pg->info.history.merge(info.history);
-    reg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
-  }
-
-  // dump log
-  dout(15) << *pg << " my log = ";
-  pg->log.print(*_dout);
-  *_dout << std::endl;
-  if (!log.empty()) {
-    *_dout << *pg << " osd" << from << " log = ";
-    log.print(*_dout);
-  }
-  *_dout << dendl;
-
-  if (pg->is_primary()) {
-    // i am PRIMARY
-    if (pg->is_active())  {
-      // PG is ACTIVE
-      if (!pg->is_clean()) {
-       dout(10) << *pg << " searching osd" << from << " log for unfound items." << dendl;
-       pg->search_for_missing(info, missing, from);
-       if (pg->have_unfound()) {
-         // Make sure we've requested MISSING information from every OSD
-         // we know about.
-         pg->discover_all_missing(query_map);
-       }
-       queue_for_recovery(pg);  // in case we found something.
-      }
-    }
-    else if (missing) {
-      // PG is INACTIVE
-      pg->proc_replica_log(*t, info, log, *missing, from);
-      
-      // peer
-      pg->do_peer(*t, fin->contexts, query_map, info_map);
-      pg->update_stats();
-    }
-  } else if (!pg->info.dne()) {
-    if (!pg->is_active()) {
-      // INACTIVE REPLICA
-      assert(from == pg->acting[0]);
-      pg->merge_log(*t, info, log, from);
-
-      // We should have the right logs before activating.
-      assert(pg->log.tail <= pg->info.last_complete || pg->log.backlog);
-      assert(pg->log.head == pg->info.last_update);
-
-      pg->activate(*t, fin->contexts, query_map, info_map);
-    } else {
-      // ACTIVE REPLICA
-      assert(pg->is_replica());
-
-      // just update our stats
-      dout(10) << *pg << " writing updated stats" << dendl;
-      pg->info.stats = info.stats;
-
-      // Handle changes to purged_snaps
-      interval_set<snapid_t> p;
-      p.union_of(info.purged_snaps, pg->info.purged_snaps);
-      p.subtract(pg->info.purged_snaps);
-      pg->info.purged_snaps = info.purged_snaps;
-      if (!p.empty()) {
-       dout(10) << " purged_snaps " << pg->info.purged_snaps
-                << " -> " << info.purged_snaps
-                << " removed " << p << dendl;
-       pg->adjust_local_snaps(*t, p);
-      }
-    }
-    
-    pg->write_info(*t);
-    
-    if (!log.empty()) {
-      dout(10) << *pg << ": inactive replica merging new PG log entries" << dendl;
-      pg->merge_log(*t, info, log, from);
-    }
-  }
-
-  int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
-  assert(tr == 0);
-
-  pg->unlock();
-}
-
-
 void OSD::handle_pg_log(MOSDPGLog *m) 
 {
   dout(7) << "handle_pg_log " << *m << " from " << m->get_source() << dendl;
@@ -4167,17 +3962,29 @@ void OSD::handle_pg_log(MOSDPGLog *m)
     return;
 
   int from = m->get_source().num();
-  int created = 0;
   if (!require_same_or_newer_map(m, m->get_epoch())) return;
 
+  int created = 0;
+  // Initialise the out-params (as handle_pg_info does) so they are never
+  // read uninitialised if get_or_create_pg returns without setting them.
+  ObjectStore::Transaction *t = 0;
+  C_Contexts *fin = 0;
+  PG *pg = get_or_create_pg(m->info, m->get_epoch(), 
+                           from, created, false, &t, &fin);
+  if (!pg) {
+    // get_or_create_pg is not handed the message itself, so nothing else
+    // releases it on this path; drop our reference here just like the
+    // normal exit at the end of this function does.
+    m->put();
+    return;
+  }
+
   map< int, map<pg_t,PG::Query> > query_map;
-  _process_pg_info(m->get_epoch(), from, 
-                  m->info, m->log, &m->missing, query_map, 0,
-                  created);
+  map< int, MOSDPGInfo* > info_map;
+  PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t);
+  pg->handle_log(from, m, &rctx);
+  pg->unlock();
   do_queries(query_map);
+  do_infos(info_map);
+
+  int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
+  assert(!tr);
+
   if (created)
     update_heartbeat_peers();
-
   m->put();
 }
 
@@ -4190,18 +3997,28 @@ void OSD::handle_pg_info(MOSDPGInfo *m)
 
   int from = m->get_source().num();
   if (!require_same_or_newer_map(m, m->get_epoch())) return;
+  map< int, MOSDPGInfo* > info_map;
 
-  PG::Log empty_log;
-  map<int,MOSDPGInfo*> info_map;
   int created = 0;
-  map< int, map<pg_t,PG::Query> > query_map;
 
   for (vector<PG::Info>::iterator p = m->pg_info.begin();
        p != m->pg_info.end();
-       ++p) 
-    _process_pg_info(m->get_epoch(), from, *p, empty_log, NULL, query_map, &info_map, created);
+       ++p) {
+    ObjectStore::Transaction *t = 0;
+    C_Contexts *fin = 0;
+    PG *pg = get_or_create_pg(*p, m->get_epoch(), 
+                             from, created, false, &t, &fin);
+    // Check for a missing pg *before* touching fin: when no pg is returned
+    // fin may still be null, and forming &fin->contexts would be undefined.
+    if (!pg)
+      continue;
+    // Only an info_map is supplied here; queries and notifies are not
+    // collected on this path.
+    PG::RecoveryCtx rctx(0, &info_map, 0, &fin->contexts, t);
+    pg->handle_info(from, *p, &rctx);
+
+    int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
+    assert(!tr);
+
+    pg->unlock();
+  }
 
-  do_queries(query_map);
   do_infos(info_map);
   if (created)
     update_heartbeat_peers();
@@ -4255,6 +4072,8 @@ void OSD::handle_pg_trim(MOSDPGTrim *m)
 
 void OSD::handle_pg_missing(MOSDPGMissing *m)
 {
+  assert(0); // MOSDPGMissing is fantastical
+#if 0
   dout(7) << __func__  << " " << *m << " from " << m->get_source() << dendl;
 
   if (!require_osd_peer(m))
@@ -4267,13 +4086,14 @@ void OSD::handle_pg_missing(MOSDPGMissing *m)
   map< int, map<pg_t,PG::Query> > query_map;
   PG::Log empty_log;
   int created = 0;
-  _process_pg_info(m->get_epoch(), from, m->info,
+  _pro-cess_pg_info(m->get_epoch(), from, m->info, //misspelling added to prevent erroneous finds
                   empty_log, &m->missing, query_map, NULL, created);
   do_queries(query_map);
   if (created)
     update_heartbeat_peers();
 
   m->put();
+#endif
 }
 
 /** PGQuery
@@ -4292,7 +4112,6 @@ void OSD::handle_pg_query(MOSDPGQuery *m)
   
   if (!require_same_or_newer_map(m, m->get_epoch())) return;
 
-  int created = 0;
   map< int, vector<PG::Info> > notify_list;
   
   for (map<pg_t,PG::Query>::iterator it = m->pg_list.begin();
@@ -4322,15 +4141,15 @@ void OSD::handle_pg_query(MOSDPGQuery *m)
       PG::Info empty(pgid);
       notify_list[from].push_back(empty);
       continue;
-    } else {
-      pg = _lookup_lock_pg(pgid);
-      if (m->get_epoch() < pg->info.history.same_acting_since) {
-        dout(10) << *pg << " handle_pg_query changed in "
-                 << pg->info.history.same_acting_since
-                 << " (msg from " << m->get_epoch() << ")" << dendl;
-       pg->unlock();
-        continue;
-      }
+    }
+
+    pg = _lookup_lock_pg(pgid);
+    if (m->get_epoch() < pg->info.history.same_acting_since) {
+      dout(10) << *pg << " handle_pg_query changed in "
+              << pg->info.history.same_acting_since
+              << " (msg from " << m->get_epoch() << ")" << dendl;
+      pg->unlock();
+      continue;
     }
 
     if (pg->deleting) {
@@ -4352,56 +4171,14 @@ void OSD::handle_pg_query(MOSDPGQuery *m)
     reg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
 
     // ok, process query!
-    assert(!pg->acting.empty());
-    assert(from == pg->acting[0]);
-
-    if (it->second.type == PG::Query::INFO) {
-      // info
-      dout(10) << *pg << " sending info" << dendl;
-      notify_list[from].push_back(pg->info);
-    } else {
-      if (it->second.type == PG::Query::BACKLOG &&
-         !pg->log.backlog) {
-       dout(10) << *pg << " requested info+missing+backlog - queueing for backlog" << dendl;
-       queue_generate_backlog(pg);
-      } else {
-       MOSDPGLog *mlog = new MOSDPGLog(osdmap->get_epoch(), pg->info);
-       mlog->missing = pg->missing;
-       
-       // primary -> other, when building master log
-       if (it->second.type == PG::Query::LOG) {
-         dout(10) << *pg << " sending info+missing+log since " << it->second.since
-                  << dendl;
-         mlog->log.copy_after(pg->log, it->second.since);
-       }
-       
-       if (it->second.type == PG::Query::BACKLOG) {
-         dout(10) << *pg << " sending info+missing+backlog" << dendl;
-         assert(pg->log.backlog);
-         mlog->log = pg->log;
-       } 
-       else if (it->second.type == PG::Query::FULLLOG) {
-         dout(10) << *pg << " sending info+missing+full log" << dendl;
-         mlog->log.copy_non_backlog(pg->log);
-       }
-       
-       dout(10) << *pg << " sending " << mlog->log << " " << mlog->missing << dendl;
-       //m->log.print(cout);
-       
-       _share_map_outgoing(osdmap->get_cluster_inst(from));
-       cluster_messenger->send_message(mlog, m->get_connection());
-      }
-    }    
-
+    PG::RecoveryCtx rctx(0, 0, &notify_list, 0, 0);
+    pg->handle_query(from, it->second, &rctx);
     pg->unlock();
   }
   
   do_notifies(notify_list);   
 
   m->put();
-
-  if (created)
-    update_heartbeat_peers();
 }
 
 
@@ -4627,6 +4404,13 @@ void OSD::generate_backlog(PG *pg)
   pg->lock();
   dout(10) << *pg << " generate_backlog" << dendl;
 
+  int tr;
+  map< int, map<pg_t,PG::Query> > query_map;
+  map< int, MOSDPGInfo* > info_map;
+  ObjectStore::Transaction *t = new ObjectStore::Transaction;
+  C_Contexts *fin = new C_Contexts;
+  PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t);
+
   if (!pg->generate_backlog_epoch) {
     dout(10) << *pg << " generate_backlog was canceled" << dendl;
     goto out;
@@ -4647,27 +4431,14 @@ void OSD::generate_backlog(PG *pg)
     goto out2;
   }
 
-  if (!pg->is_primary()) {
-    dout(10) << *pg << "  sending info+missing+backlog to primary" << dendl;
-    assert(!pg->is_active());  // for now
-    MOSDPGLog *m = new MOSDPGLog(osdmap->get_epoch(), pg->info);
-    m->missing = pg->missing;
-    m->log = pg->log;
-    _share_map_outgoing(osdmap->get_cluster_inst(pg->get_primary()));
-    cluster_messenger->send_message(m, osdmap->get_cluster_inst(pg->get_primary()));
-  } else {
-    dout(10) << *pg << "  generated backlog, peering" << dendl;
-
-    map< int, map<pg_t,PG::Query> > query_map;    // peer -> PG -> get_summary_since
-    ObjectStore::Transaction *t = new ObjectStore::Transaction;
-    C_Contexts *fin = new C_Contexts;
-    pg->do_peer(*t, fin->contexts, query_map, NULL);
-    do_queries(query_map);
-    pg->write_info(*t);
-    pg->write_log(*t);
-    int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
-    assert(tr == 0);
-  }
+  pg->handle_backlog_generated(&rctx);
+  do_queries(query_map);
+  do_infos(info_map);
+  pg->write_info(*t);
+  pg->write_log(*t);
+  tr = store->queue_transaction(&pg->osr, t, 
+                               new ObjectStore::C_DeleteTransaction(t), fin);
+  assert(!tr);
 
  out2:
   map_lock.put_read();
index 748ccab71a46a1be9f17904a38d244b71286b66a..40ff50644ba19f355e0be013e886d63854f430c6 100644 (file)
@@ -661,15 +661,6 @@ protected:
   void queue_pg_for_deletion(PG *pg);
   void _remove_pg(PG *pg);
 
-  // helper for handle_pg_log and handle_pg_info
-  void _process_pg_info(epoch_t epoch, int from,
-                       PG::Info &info, 
-                       PG::Log &log, 
-                       PG::Missing *missing,
-                       map< int, map<pg_t,PG::Query> >& query_map,
-                       map<int, MOSDPGInfo*>* info_map,
-                       int& created);
-
   // backlogs
   xlist<PG*> backlog_queue;
 
index 96604c1e4b17b80d2a6f310bcb3e7dc1f16517a1..3596ba00917c0b7c646b82c5e61c48ea67a16ca8 100644 (file)
@@ -177,91 +177,98 @@ void PG::trim_write_ahead()
 
 }
 
-void PG::proc_replica_log(ObjectStore::Transaction& t, Info &oinfo, Log &olog, Missing& omissing, int from)
+void PG::proc_master_log(ObjectStore::Transaction& t, Info &oinfo, Log &olog, Missing& omissing, int from)
 {
   dout(10) << "proc_replica_log for osd" << from << ": " << olog << " " << omissing << dendl;
-  assert(!is_active());
+  assert(!is_active() && is_primary());
 
-  if (!have_master_log) {
-    // merge log into our own log to build master log.  no need to
-    // make any adjustments to their missing map; we are taking their
-    // log to be authoritative (i.e., their entries are by definitely
-    // non-divergent).
-    merge_log(t, oinfo, olog, from);
+  // merge log into our own log to build master log.  no need to
+  // make any adjustments to their missing map; we are taking their
+  // log to be authoritative (i.e., their entries are by definitely
+  // non-divergent).
+  merge_log(t, oinfo, olog, from);
+  peer_info[from] = oinfo;
+  dout(10) << " peer osd" << from << " now " << oinfo << " " << omissing << dendl;
+  might_have_unfound.insert(from);
 
-  } else if (is_acting(from)) {
-    // replica.  have master log. 
-    // populate missing; check for divergence
+  search_for_missing(oinfo, &omissing, from);
+  peer_missing[from].swap(omissing);
+}
     
-    /*
-      basically what we're doing here is rewinding the remote log,
-      dropping divergent entries, until we find something that matches
-      our master log.  we then reset last_update to reflect the new
-      point up to which missing is accurate.
-
-      later, in activate(), missing will get wound forward again and
-      we will send the peer enough log to arrive at the same state.
-    */
-
-    list<Log::Entry>::const_reverse_iterator pp = olog.log.rbegin();
-    eversion_t lu(oinfo.last_update);
-    while (true) {
-      if (pp == olog.log.rend()) {
-       lu = olog.tail;
-       break;
-      }
-      const Log::Entry& oe = *pp;
+void PG::proc_replica_log(ObjectStore::Transaction& t, Info &oinfo, Log &olog, Missing& omissing, int from) {
+  /*
+    basically what we're doing here is rewinding the remote log,
+    dropping divergent entries, until we find something that matches
+    our master log.  we then reset last_update to reflect the new
+    point up to which missing is accurate.
 
-      // don't continue past the tail of our log.
-      if (oe.version <= log.tail)
-       break;
+    later, in activate(), missing will get wound forward again and
+    we will send the peer enough log to arrive at the same state.
+  */
 
-      if (!log.objects.count(oe.soid)) {
-        dout(10) << " had " << oe << " new dne : divergent, ignoring" << dendl;
-        ++pp;
-        continue;
-      }
+  for (map<sobject_t, Missing::item>::iterator i = omissing.missing.begin();
+       i != omissing.missing.end();
+       ++i) {
+    dout(10) << "Missing sobject: " << i->first << dendl;
+  }
+  list<Log::Entry>::const_reverse_iterator pp = olog.log.rbegin();
+  eversion_t lu(oinfo.last_update);
+  while (true) {
+    if (pp == olog.log.rend()) {
+      lu = olog.tail;
+      break;
+    }
+    const Log::Entry& oe = *pp;
+
+    // don't continue past the tail of our log.
+    if (oe.version <= log.tail)
+      break;
+
+    if (!log.objects.count(oe.soid)) {
+      dout(10) << " had " << oe << " new dne : divergent, ignoring" << dendl;
+      ++pp;
+      continue;
+    }
       
-      Log::Entry& ne = *log.objects[oe.soid];
-      if (ne.version == oe.version) {
-       dout(10) << " had " << oe << " new " << ne << " : match, stopping" << dendl;
-       lu = pp->version;
-       break;
-      }
-      if (ne.version > oe.version) {
-       dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl;
+    Log::Entry& ne = *log.objects[oe.soid];
+    if (ne.version == oe.version) {
+      dout(10) << " had " << oe << " new " << ne << " : match, stopping" << dendl;
+      lu = pp->version;
+      break;
+    }
+    if (ne.version > oe.version) {
+      dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl;
+    } else {
+      if (oe.is_delete()) {
+       if (ne.is_delete()) {
+         // old and new are delete
+         dout(20) << " had " << oe << " new " << ne << " : both deletes" << dendl;
+       } else {
+         // old delete, new update.
+         dout(20) << " had " << oe << " new " << ne << " : missing" << dendl;
+         omissing.add(ne.soid, ne.version, eversion_t());
+       }
       } else {
-       if (oe.is_delete()) {
-         if (ne.is_delete()) {
-           // old and new are delete
-           dout(20) << " had " << oe << " new " << ne << " : both deletes" << dendl;
-         } else {
-           // old delete, new update.
-           dout(20) << " had " << oe << " new " << ne << " : missing" << dendl;
-           omissing.add(ne.soid, ne.version, eversion_t());
-         }
+       if (ne.is_delete()) {
+         // old update, new delete
+         dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl;
+         omissing.rm(oe.soid, oe.version);
        } else {
-         if (ne.is_delete()) {
-           // old update, new delete
-           dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl;
-           omissing.rm(oe.soid, oe.version);
-         } else {
-           // old update, new update
-           dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl;
-           omissing.revise_need(ne.soid, ne.version);
-         }
+         // old update, new update
+         dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl;
+         omissing.revise_need(ne.soid, ne.version);
        }
       }
+    }
 
-      ++pp;
-    }    
+    ++pp;
+  }    
 
-    if (lu < oinfo.last_update) {
-      dout(10) << " peer osd" << from << " last_update now " << lu << dendl;
-      oinfo.last_update = lu;
-      if (lu < oinfo.last_complete)
-       oinfo.last_complete = lu;
-    }
+  if (lu < oinfo.last_update) {
+    dout(10) << " peer osd" << from << " last_update now " << lu << dendl;
+    oinfo.last_update = lu;
+    if (lu < oinfo.last_complete)
+      oinfo.last_complete = lu;
   }
 
   peer_info[from] = oinfo;
@@ -269,9 +276,35 @@ void PG::proc_replica_log(ObjectStore::Transaction& t, Info &oinfo, Log &olog, M
   might_have_unfound.insert(from);
 
   search_for_missing(oinfo, &omissing, from);
+  for (map<sobject_t, Missing::item>::iterator i = omissing.missing.begin();
+       i != omissing.missing.end();
+       ++i) {
+    dout(10) << "Final Missing sobject: " << i->first << dendl;
+  }
   peer_missing[from].swap(omissing);
 }
 
+/*
+ * Record the Info reported by peer osd `from`.  Must only be called on
+ * the primary (asserted).
+ *
+ * Stores the peer's info in peer_info, adds the peer to
+ * might_have_unfound, merges the peer's history into this PG's own
+ * history (re-registering the scrub stamp around the merge), and adds the
+ * peer to stray_set when it is not in the acting set.
+ *
+ * @param from  osd id the info came from
+ * @param info  the peer's Info; NOTE: this name shadows the member
+ *              PG::info, so the PG's own info is spelled this->info below.
+ */
+void PG::proc_replica_info(int from, Info &info)
+{
+  assert(is_primary());
+  peer_info[from] = info;
+  might_have_unfound.insert(from);
+  
+  // Merge the *peer's* history into *our* history.  The unqualified name
+  // `info` is the parameter, so using it on both sides would merge the
+  // peer's history into itself and leave ours untouched.
+  osd->unreg_last_pg_scrub(this->info.pgid, this->info.history.last_scrub_stamp);
+  this->info.history.merge(info.history);
+  osd->reg_last_pg_scrub(this->info.pgid, this->info.history.last_scrub_stamp);
+  
+  // stray?
+  if (!is_acting(from)) {
+    dout(10) << " osd" << from << " has stray content: " << info << dendl;
+    stray_set.insert(from);
+    if (is_clean()) {
+      purge_strays();
+    }
+  }
+  update_stats();
+}
+
 
 /*
  * merge an old (possibly divergent) log entry into the new log.  this 
@@ -915,10 +948,8 @@ void PG::trim_past_intervals()
 
 
 // true if the given map affects the prior set
-bool PG::prior_set_affected(const OSDMap *osdmap) const
+bool PG::prior_set_affected(PgPriorSet &prior, const OSDMap *osdmap) const
 {
-  const PgPriorSet &prior = *prior_set.get();
-
   for (set<int>::iterator p = prior.cur.begin();
        p != prior.cur.end();
        ++p)
@@ -1083,7 +1114,7 @@ void PG::mark_all_unfound_as_lost(ObjectStore::Transaction& t)
   share_pg_log(old_last_update);
 }
 
-void PG::build_prior()
+void PG::build_prior(std::auto_ptr<PgPriorSet> &prior_set)
 {
   if (1) {
     // sanity check
@@ -1093,11 +1124,6 @@ void PG::build_prior()
       assert(info.history.last_epoch_started >= it->second.history.last_epoch_started);
     }
   }
-  generate_past_intervals();
-
-  state_clear(PG_STATE_CRASHED);
-  state_clear(PG_STATE_DOWN);
-
   stringstream out;
   prior_set.reset(new PgPriorSet(osd->whoami,
                                 *osd->osdmap,
@@ -1109,7 +1135,6 @@ void PG::build_prior()
   dout(10) << out << dendl;
   PgPriorSet &prior(*prior_set.get());
                                 
-
   dout(10) << "build_prior: " << *this << " "
           << (prior.crashed ? " crashed":"")
           << (prior.pg_down ? " down":"")
@@ -1142,7 +1167,6 @@ void PG::clear_primary_state()
 
   // clear peering state
   have_master_log = false;
-  prior_set.reset(NULL);
   stray_set.clear();
   peer_info_requested.clear();
   peer_log_requested.clear();
@@ -1216,127 +1240,6 @@ bool PG::choose_acting(int newest_update_osd)
   return true;
 }
 
-// if false, stop.
-bool PG::recover_master_log(map< int, map<pg_t,Query> >& query_map,
-                           eversion_t &oldest_update)
-{
-  dout(10) << "recover_master_log" << dendl;
-
-  if (is_down())
-    return false;
-
-  if (peer_info_requested.empty()) {
-    stringstream out;
-    prior_set->gen_query_map(*osd->osdmap, info, query_map);
-    dout(10) << out << dendl;
-    for (map< int, map<pg_t, Query> >::const_iterator i = query_map.begin();
-        i != query_map.end(); ++i) {
-      peer_info_requested.insert(i->first);
-    }
-  }
-
-  bool lack_info = false;
-  for (set<int>::const_iterator it = prior_set->cur.begin();
-       it != prior_set->cur.end();
-       ++it) {
-    if (peer_info.count(*it)) {
-      dout(10) << " have info from osd" << *it 
-               << ": " << peer_info[*it]
-               << dendl;      
-      continue;
-    }
-    lack_info = true;
-
-    if (peer_info_requested.find(*it) != peer_info_requested.end()) {
-      dout(10) << " waiting for osd" << *it << dendl;
-      continue;
-    }
-  }
-  if (lack_info)
-    return false;
-
-  // -- ok, we have all (prior_set) info.  (and maybe others.)
-  dout(10) << " have prior_set info.  min_last_complete_ondisk " << min_last_complete_ondisk << dendl;
-
-  bool need_backlog, wait_for_backlog;
-  int pull_from;
-  eversion_t newest_update;
-  stringstream out;
-  choose_log_location(need_backlog, 
-                     wait_for_backlog, pull_from, newest_update, 
-                     oldest_update);
-  dout(10) << out << dendl;
-
-  if (pull_from == -1) {
-    pull_from = osd->whoami;
-  }
-
-  // -- do i need to generate backlog?
-  if (need_backlog && !info.log_backlog) {
-    osd->queue_generate_backlog(this);
-  }
-
-  // -- decide what acting set i want, based on state of up set
-  if (!choose_acting(pull_from))
-    return false;
-
-  // gather log(+missing) from that person!
-  if (pull_from != osd->whoami) {
-    const Info& pi = peer_info[pull_from];
-    if (pi.log_tail <= log.head) {
-      if (peer_log_requested.count(pull_from)) {
-       dout(10) << " newest update on osd" << pull_from
-                << " v " << newest_update
-                << ", already queried log"
-                << dendl;
-      } else {
-       // we'd _like_ it back to oldest_update, but take what we can get.
-       dout(10) << " newest update on osd" << pull_from
-                << " v " << newest_update
-                << ", querying since oldest_update " << oldest_update
-                << dendl;
-       query_map[pull_from][info.pgid] = Query(Query::LOG, oldest_update, info.history);
-       peer_log_requested.insert(pull_from);
-      }
-    } else {
-      dout(10) << " newest update on osd" << pull_from
-              << ", whose log.tail " << pi.log_tail
-              << " > my log.head " << log.head
-              << ", i will need backlog for me+them." << dendl;
-      // it's possible another peer could fill in the missing bits, but
-      // pretty unlikely.  someday it may be worth the complexity to
-      // try.  until then, just get the full backlogs.
-      if (!log.backlog) {
-       osd->queue_generate_backlog(this);
-       return false;
-      }
-      
-      if (peer_backlog_requested.count(pull_from)) {
-       dout(10) << " newest update on osd" << pull_from
-                << " v " << newest_update
-                << ", already queried summary/backlog"
-                << dendl;
-      } else {
-       dout(10) << " newest update on osd" << pull_from
-                << " v " << newest_update 
-                << ", querying entire summary/backlog"
-                << dendl;
-       query_map[pull_from][info.pgid] = Query(Query::BACKLOG, info.history);
-       peer_backlog_requested.insert(pull_from);
-      }
-    }
-    return false;
-  } else {
-    dout(10) << " newest_update " << info.last_update << " (me)" << dendl;
-  }
-
-  dout(10) << " oldest_update " << oldest_update << dendl;
-
-  have_master_log = true;
-
-  return true;
-}
-
 void PG::choose_log_location(bool &need_backlog,
                             bool &wait_on_backlog,
                             int &pull_from,
@@ -1416,149 +1319,6 @@ void PG::choose_log_location(bool &need_backlog,
   }
 }
 
-void PG::do_peer(ObjectStore::Transaction& t, list<Context*>& tfin,
-              map< int, map<pg_t,Query> >& query_map,
-             map<int, MOSDPGInfo*> *activator_map)
-{
-  dout(10) << "PG::do_peer: peer up " << up << ", acting "
-          << acting << dendl;
-
-  if (!is_active())
-    state_set(PG_STATE_PEERING);
-  
-  if (!prior_set.get())
-    build_prior();
-
-  dout(10) << "PG::do_peer: peer prior_set is "
-          << *prior_set << dendl;
-  
-  eversion_t oldest_update;
-  if (!have_master_log) {
-    if (!recover_master_log(query_map, oldest_update))
-      return;
-  }
-  else {
-    if (up != acting) {
-      // are we done generating backlog(s)?
-      if (!choose_acting(osd->whoami))
-       return;
-    }
-  }
-
-  /** COLLECT MISSING+LOG FROM PEERS **********/
-  /*
-    we also detect divergent replicas here by pulling the full log
-    from everyone.  
-
-    for example:
-   0:    1:    2:    
-    2'6   2'6    2'6
-    2'7   2'7    2'7
-    3'8 | 2'8    2'8
-    3'9 |        2'9
-    
-  */  
-
-  // gather missing from peers
-  bool have_all_missing = true;
-  for (unsigned i=1; i<acting.size(); i++) {
-    int peer = acting[i];
-    Info& pi = peer_info[peer];
-    dout(10) << " peer osd" << peer << " " << pi << dendl;
-
-    if (pi.is_empty())
-      continue;
-    if (peer_missing.find(peer) == peer_missing.end()) {
-      if (pi.last_update == pi.last_complete &&  // peer has no missing
-         pi.last_update == info.last_update) {  // peer is up to date
-       // replica has no missing and identical log as us.  no need to
-       // pull anything.
-       dout(10) << " infering up to date and no missing (last_update==last_complete) for osd" << peer << dendl;
-       peer_missing[peer].num_missing();  // just create the entry.
-       search_for_missing(peer_info[peer], &peer_missing[peer], peer);
-       continue;
-      } else {
-       dout(10) << " still need log+missing from osd" << peer << dendl;
-       have_all_missing = false;
-      }
-    }
-    if (peer_log_requested.find(peer) != peer_log_requested.end())
-      continue;
-    if (peer_backlog_requested.find(peer) != peer_backlog_requested.end())
-      continue;
-   
-    assert(pi.last_update <= log.head);
-
-    if (pi.last_update < log.tail) {
-      // we need the full backlog in order to build this node's missing map.
-      dout(10) << " osd" << peer << " last_update " << pi.last_update
-              << " < log.tail " << log.tail
-              << ", pulling missing+backlog" << dendl;
-      query_map[peer][info.pgid] = Query(Query::BACKLOG, info.history);
-      peer_backlog_requested.insert(peer);
-    } else {
-      // we need just enough log to get any divergent items so that we
-      // can appropriate adjust the missing map.  that can be as far back
-      // as the peer's last_epoch_started.
-      eversion_t from(pi.history.last_epoch_started, 0);
-      dout(10) << " osd" << peer << " last_update " << pi.last_update
-              << ", pulling missing+log from it's last_epoch_started " << from << dendl;
-      query_map[peer][info.pgid] = Query(Query::LOG, from, info.history);
-      peer_log_requested.insert(peer);
-    }
-  }
-  if (!have_all_missing)
-    return;
-
-  {
-    int num_missing = missing.num_missing();
-    int num_locs = missing_loc.size();
-    dout(10) << "num_missing = " << num_missing
-            << ", num_unfound = " << (num_missing - num_locs) << dendl;
-  }
-
-  // sanity check
-  assert(info.last_complete >= log.tail || log.backlog);
-
-  // -- do need to notify the monitor?
-  if (true) {
-    // NOTE: we can skip the up_thru check if this is a new PG and there
-    // were no prior intervals.
-    if (info.history.epoch_created < info.history.same_acting_since &&
-       osd->osdmap->get_up_thru(osd->whoami) < info.history.same_acting_since) {
-      dout(10) << "up_thru " << osd->osdmap->get_up_thru(osd->whoami)
-              << " < same_since " << info.history.same_acting_since
-              << ", must notify monitor" << dendl;
-      need_up_thru = true;
-      osd->queue_want_up_thru(info.history.same_acting_since);
-      return;
-    } else {
-      dout(10) << "up_thru " << osd->osdmap->get_up_thru(osd->whoami)
-              << " >= same_since " << info.history.same_acting_since
-              << ", all is well" << dendl;
-    }
-  }
-
-  // -- crash recovery?
-  if (is_crashed()) {
-    replay_until = g_clock.now();
-    replay_until += g_conf.osd_replay_window;
-    dout(10) << "crashed, allowing op replay for " << g_conf.osd_replay_window
-            << " until " << replay_until << dendl;
-    state_set(PG_STATE_REPLAY);
-    osd->replay_queue_lock.Lock();
-    osd->replay_queue.push_back(pair<pg_t,utime_t>(info.pgid, replay_until));
-    osd->replay_queue_lock.Unlock();
-  }
-
-  if (!is_active()) {
-    activate(t, tfin, query_map, activator_map);
-  }
-
-  if (is_all_uptodate()) 
-    finish_recovery(t, tfin);
-}
-
 /* Build the might_have_unfound set.
  *
  * This is used by the primary OSD during recovery.
@@ -1625,6 +1385,17 @@ void PG::activate(ObjectStore::Transaction& t, list<Context*>& tfin,
                  map<int, MOSDPGInfo*> *activator_map)
 {
   assert(!is_active());
+  // -- crash recovery?
+  if (is_crashed()) {
+    replay_until = g_clock.now();
+    replay_until += g_conf.osd_replay_window;
+    dout(10) << "crashed, allowing op replay for " << g_conf.osd_replay_window
+            << " until " << replay_until << dendl;
+    state_set(PG_STATE_REPLAY);
+    osd->replay_queue_lock.Lock();
+    osd->replay_queue.push_back(pair<pg_t,utime_t>(info.pgid, replay_until));
+    osd->replay_queue_lock.Unlock();
+  }
 
   // twiddle pg state
   state_set(PG_STATE_ACTIVE);
@@ -1658,9 +1429,6 @@ void PG::activate(ObjectStore::Transaction& t, list<Context*>& tfin,
 
   need_up_thru = false;
 
-  // clear prior set (and dependency info)... we are done peering!
-  prior_set.reset(NULL);
-
   // if we are building a backlog, cancel it!
   if (up == acting)
     osd->cancel_generate_backlog(this);
@@ -1730,7 +1498,7 @@ void PG::activate(ObjectStore::Transaction& t, list<Context*>& tfin,
 
       if (pi.last_update == info.last_update && !need_old_log_entries) {
         // empty log
-       if (activator_map) {
+       if (!pi.is_empty() && activator_map) {
          dout(10) << "activate peer osd" << peer << " is up to date, queueing in pending_activators" << dendl;
          if (activator_map->count(peer) == 0)
            (*activator_map)[peer] = new MOSDPGInfo(osd->osdmap->get_epoch());
@@ -1850,7 +1618,9 @@ void PG::_activate_committed(epoch_t e)
 
   if (is_primary()) {
     peer_activated.insert(osd->whoami);
-    dout(10) << "_activate_committed " << e << " peer_activated now " << peer_activated << dendl;
+    dout(10) << "_activate_committed " << e << " peer_activated now " << peer_activated 
+            << " last_epoch_started " << info.history.last_epoch_started
+            << " same_acting_since " << info.history.same_acting_since << dendl;
     if (peer_activated.size() == acting.size())
       all_activated_and_committed();
   } else {
@@ -2044,6 +1814,7 @@ void PG::purge_strays()
     peer_info.erase(*p);
   }
 
+  state_set(PG_STATE_CLEAN);
   stray_set.clear();
 
   // clear _requested maps; we may have to peer() again if we discover
@@ -3503,14 +3274,52 @@ void PG::share_pg_log(const eversion_t &oldver)
   }
 }
 
-bool PG::handle_advance_map(OSDMap &osdmap,
-                           const OSDMap &lastmap)
+// Answer an INFO query from osd `from`: store (from, our info) into
+// notify_info for the caller to send.  Asserts that acting is non-empty,
+// that the sender is acting[0], and that the query really is of type INFO.
+void PG::fulfill_info(int from, const Query &query, 
+                     pair<int, Info> &notify_info)
 {
-  if (acting_up_affected(osdmap, lastmap)) {
-    return true;
+  assert(!acting.empty());
+  assert(from == acting[0]);
+  assert(query.type == PG::Query::INFO);
+
+  // info
+  dout(10) << "sending info" << dendl;
+  notify_info = make_pair(from, info);
+}
+
+// Answer a LOG / BACKLOG / FULLLOG query from osd `from` by building an
+// MOSDPGLog (our info + missing + the requested part of the log) and sending
+// it to that osd over the cluster messenger.  Asserts that the sender is
+// acting[0] and that the query is not an INFO query.
+void PG::fulfill_log(int from, const Query &query)
+{
+  assert(!acting.empty());
+  assert(from == acting[0]);
+  assert(query.type != PG::Query::INFO);
+  // A BACKLOG query must not reach this point before log.backlog is set.
+  if (query.type == PG::Query::BACKLOG &&
+      !log.backlog) {
+    assert(0); // generated in the state machine
   } else {
-    warm_restart();
-    return false;
+    MOSDPGLog *mlog = new MOSDPGLog(osd->osdmap->get_epoch(), info);
+    mlog->missing = missing;
+       
+    // primary -> other, when building master log
+    // LOG: only the entries after query.since
+    if (query.type == PG::Query::LOG) {
+      dout(10) << " sending info+missing+log since " << query.since
+              << dendl;
+      mlog->log.copy_after(log, query.since);
+    }
+       
+    // BACKLOG: the whole log object; FULLLOG: the log without its backlog part
+    if (query.type == PG::Query::BACKLOG) {
+      dout(10) << "sending info+missing+backlog" << dendl;
+      assert(log.backlog);
+      mlog->log = log;
+    } 
+    else if (query.type == PG::Query::FULLLOG) {
+      dout(10) << " sending info+missing+full log" << dendl;
+      mlog->log.copy_non_backlog(log);
+    }
+       
+    dout(10) << " sending " << mlog->log << " " << mlog->missing << dendl;
+       
+    // share our map with the requester, then send the reply to it
+    osd->_share_map_outgoing(osd->osdmap->get_cluster_inst(from));
+    osd->cluster_messenger->send_message(mlog, 
+                                        osd->osdmap->get_cluster_inst(from));
   }
 }
 
@@ -3687,6 +3496,26 @@ void PG::warm_restart()
   }
 }
 
+// Apply an Info received from the primary to this active replica: copy its
+// stats, adopt its purged_snaps, pass any newly purged snaps to
+// adjust_local_snaps(), and write the updated info into transaction t.
+//
+//   t      transaction receiving the snap adjustments and the info write
+//   oinfo  the Info received from the primary
+void PG::process_primary_info(ObjectStore::Transaction &t, const Info &oinfo)
+{
+  assert(is_replica());
+  assert(is_active());
+  info.stats = oinfo.stats;
+
+  // Handle changes to purged_snaps:
+  // newly_purged = (oinfo.purged_snaps U info.purged_snaps) - info.purged_snaps
+  interval_set<snapid_t> newly_purged;
+  newly_purged.union_of(oinfo.purged_snaps, info.purged_snaps);
+  newly_purged.subtract(info.purged_snaps);
+
+  // Log *before* overwriting info.purged_snaps, so the left-hand side of
+  // the "old -> new" message really is the old value.
+  if (!newly_purged.empty()) {
+    dout(10) << " purged_snaps " << info.purged_snaps
+            << " -> " << oinfo.purged_snaps
+            << " removed " << newly_purged << dendl;
+  }
+
+  // Same ordering as before: adopt the new set, then adjust local snaps.
+  info.purged_snaps = oinfo.purged_snaps;
+  if (!newly_purged.empty()) {
+    adjust_local_snaps(t, newly_purged);
+  }
+  write_info(t);
+}
+
 unsigned int PG::Missing::num_missing() const
 {
   return missing.size();
@@ -3886,6 +3715,583 @@ std::ostream& operator<<(std::ostream& oss,
       << "lost=" << prior.lost << " ]]";
   return oss;
 }
+
+/*------------ Recovery State Machine----------------*/
+#undef dout_prefix
+#define dout_prefix (*_dout << context< RecoveryMachine >().pg->gen_prefix() << "StateMachine: ")
+
+/*------Started-------*/
+// AdvMap in Started: if acting_up_affected() reports a change between
+// lastmap and osdmap, transition to Reset; otherwise discard the event.
+boost::statechart::result
+PG::RecoveryState::Started::react(const AdvMap& advmap)
+{
+  dout(10) << "Started advmap" << dendl;
+  PG *pg = context< RecoveryMachine >().pg;
+  const bool mapping_changed =
+    pg->acting_up_affected(advmap.osdmap, advmap.lastmap);
+  if (!mapping_changed) {
+    return discard_event();
+  }
+
+  dout(10) << "up or acting affected, transitioning to Reset" << dendl;
+  return transit< Reset >();
+}
+
+/*--------Reset---------*/
+// AdvMap while already in Reset: call warm_restart() again if this map
+// advance affects up/acting.  The event is discarded in either case.
+boost::statechart::result
+PG::RecoveryState::Reset::react(const AdvMap& advmap)
+{
+  PG *pg = context< RecoveryMachine >().pg;
+  dout(10) << "Reset advmap" << dendl;
+  if (!pg->acting_up_affected(advmap.osdmap, advmap.lastmap)) {
+    return discard_event();
+  }
+
+  dout(10) << "up or acting affected, calling warm_restart again" << dendl;
+  pg->warm_restart();
+  return discard_event();
+}
+
+// ActMap in Reset: a stray with a valid primary (>= 0) sends its info to
+// that primary as a notify; then always transition to Started.
+boost::statechart::result
+PG::RecoveryState::Reset::react(const ActMap& actmap)
+{
+  PG *pg = context< RecoveryMachine >().pg;
+  const bool should_notify = pg->is_stray() && pg->get_primary() >= 0;
+  if (should_notify) {
+    context< RecoveryMachine >().send_notify(pg->get_primary(), pg->info);
+  }
+  return transit< Started >();
+}
+
+// Entering Reset: warm-restart the PG and post Initialize to ourselves.
+PG::RecoveryState::Reset::Reset(my_context ctx)
+  : my_base(ctx)
+{
+  PG *const pg = context< RecoveryMachine >().pg;
+  dout(10) << "Reseting" << dendl;
+  pg->warm_restart();
+  post_event(Initialize());
+}
+       
+/*-------Start---------*/
+// Entering Start: post MakePrimary if this PG is primary, MakeStray if not.
+PG::RecoveryState::Start::Start(my_context ctx)
+  : my_base(ctx)
+{
+  PG *pg = context< RecoveryMachine >().pg;
+  if (!pg->is_primary()) {
+    // not primary: treated as stray
+    dout(1) << "transitioning to Stray" << dendl;
+    post_event(MakeStray());
+    return;
+  }
+
+  dout(1) << "transitioning to Primary" << dendl;
+  post_event(MakePrimary());
+}
+
+/*---------Primary--------*/
+// BacklogComplete on the primary: call choose_acting() with our own osd id
+// and discard the event.
+boost::statechart::result
+PG::RecoveryState::Primary::react(const BacklogComplete&)
+{
+  PG *const pg = context< RecoveryMachine >().pg;
+  pg->choose_acting(pg->osd->whoami);
+  return discard_event();
+}
+
+// Notify received on the primary.  A notify is a duplicate when peer_info
+// already has an entry for the sender with the same last_update; duplicates
+// are only logged, anything else goes to proc_replica_info().
+boost::statechart::result
+PG::RecoveryState::Primary::react(const MNotifyRec& notevt)
+{
+  dout(7) << "handle_pg_notify from " << notevt.from << dendl;
+  PG *pg = context< RecoveryMachine >().pg;
+
+  const bool is_dup =
+    pg->peer_info.count(notevt.from) &&
+    pg->peer_info[notevt.from].last_update == notevt.info.last_update;
+  if (!is_dup) {
+    pg->proc_replica_info(notevt.from, notevt.info);
+    return discard_event();
+  }
+
+  dout(10) << *pg << " got dup osd" << notevt.from << " info " << notevt.info
+          << ", identical to ours" << dendl;
+  return discard_event();
+}
+
+// ActMap on the primary: refresh the PG stats, then forward the event
+// (forward_event) rather than consuming it here.
+boost::statechart::result
+PG::RecoveryState::Primary::react(const ActMap&)
+{
+  dout(7) << "handle ActMap primary" << dendl;
+  PG *const pg = context< RecoveryMachine >().pg;
+  pg->update_stats();
+  return forward_event();
+}
+
+/*---------Peering--------*/
+// Entering Peering: the PG must be primary and neither active nor already
+// peering; mark it PG_STATE_PEERING.
+PG::RecoveryState::Peering::Peering(my_context ctx)
+  : my_base(ctx)
+{
+  PG *const pg = context< RecoveryMachine >().pg;
+
+  // entry preconditions
+  assert(!pg->is_active());
+  assert(!pg->is_peering());
+  assert(pg->is_primary());
+
+  pg->state_set(PG_STATE_PEERING);
+}
+
+// AdvMap while Peering: if the new map affects our prior set, transition to
+// Reset; otherwise forward the event.
+boost::statechart::result
+PG::RecoveryState::Peering::react(const AdvMap& advmap)
+{
+  PG *pg = context< RecoveryMachine >().pg;
+  dout(10) << "Peering advmap" << dendl;
+  if (!pg->prior_set_affected(*prior_set.get(), &advmap.osdmap)) {
+    return forward_event();
+  }
+
+  dout(1) << "Peering, priors_set_affected, going to Reset" << dendl;
+  return transit< Reset >();
+}
+
+// Exit action for Peering: only logs.
+void PG::RecoveryState::Peering::exit()
+{
+  dout(10) << "Leaving Peering" << dendl;
+}
+
+/*---------Active---------*/
+// Entering Active (primary only): call PG::activate() with the transaction,
+// context list, query map and info map exposed by the RecoveryMachine, and
+// check that the PG is active afterwards.
+PG::RecoveryState::Active::Active(my_context ctx)
+  : my_base(ctx)
+{
+  RecoveryMachine &rm = context< RecoveryMachine >();
+  PG *pg = rm.pg;
+  assert(pg->is_primary());
+
+  dout(10) << "In Active, about to call activate" << dendl;
+  pg->activate(*rm.get_cur_transaction(),
+              *rm.get_context_list(),
+              *rm.get_query_map(),
+              rm.get_info_map());
+  assert(pg->is_active());
+  dout(10) << "Activate Finished" << dendl;
+}
+
+// AdvMap while Active: merge the pool's newly removed snaps into snap_trimq
+// and mark the info dirty; the event is always forwarded.
+boost::statechart::result
+PG::RecoveryState::Active::react(const AdvMap& advmap)
+{
+  PG *pg = context< RecoveryMachine >().pg;
+  dout(10) << "Active advmap" << dendl;
+  if (pg->pool->newly_removed_snaps.empty()) {
+    return forward_event();
+  }
+
+  pg->snap_trimq.union_of(pg->pool->newly_removed_snaps);
+  dout(10) << *pg << " snap_trimq now " << pg->snap_trimq << dendl;
+  pg->dirty_info = true;
+  return forward_event();
+}
+    
+
+// ActMap while Active (primary).  In order: check outstanding recovery
+// pulls, optionally check the log for corruption, mark unfound objects lost
+// when all_unfound_are_lost() says so, queue snap trimming when clean,
+// finish recovery when everything is up to date, and re-run choose_acting().
+// The event is forwarded afterwards.
+boost::statechart::result
+PG::RecoveryState::Active::react(const ActMap&)
+{
+  RecoveryMachine &rm = context< RecoveryMachine >();
+  PG *pg = rm.pg;
+  dout(10) << "Active: handling ActMap" << dendl;
+  assert(pg->is_active());
+  assert(pg->is_primary());
+  pg->check_recovery_op_pulls(pg->osd->osdmap);
+
+  if (g_conf.osd_check_for_log_corruption) {
+    pg->check_log_for_corruption(pg->osd->store);
+  }
+
+  // more missing objects than known locations
+  if (pg->missing.num_missing() > pg->missing_loc.size() &&
+      pg->all_unfound_are_lost(pg->osd->osdmap)) {
+    pg->mark_all_unfound_as_lost(*rm.get_cur_transaction());
+  }
+
+  const bool want_trim = !pg->snap_trimq.empty() && pg->is_clean();
+  if (want_trim) {
+    dout(10) << "Active: queuing snap trim" << dendl;
+    pg->queue_snap_trim();
+  }
+
+  if (pg->is_all_uptodate()) {
+    dout(10) << "Active: all up to date, going clean" << dendl;
+    pg->finish_recovery(*rm.get_cur_transaction(),
+                       *rm.get_context_list());
+  }
+
+  pg->choose_acting(pg->osd->whoami);
+
+  return forward_event();
+}
+
+
+// Notify while Active (primary).  Senders we already have peer_info for are
+// ignored; a new sender goes through proc_replica_info(), and if we still
+// have unfound objects, discover_all_missing() fills in the query map.
+boost::statechart::result
+PG::RecoveryState::Active::react(const MNotifyRec& notevt)
+{
+  PG *pg = context< RecoveryMachine >().pg;
+  assert(pg->is_active());
+  assert(pg->is_primary());
+
+  if (!pg->peer_info.count(notevt.from)) {
+    dout(10) << "Active: got notify from " << notevt.from
+            << ", calling proc_replica_info and discover_all_missing"
+            << dendl;
+    pg->proc_replica_info(notevt.from, notevt.info);
+    if (pg->have_unfound()) {
+      pg->discover_all_missing(*context< RecoveryMachine >().get_query_map());
+    }
+    return discard_event();
+  }
+
+  dout(10) << "Active: got notify from " << notevt.from
+          << ", already have info from that osd, ignoring"
+          << dendl;
+  return discard_event();
+}
+
+// Info while Active (primary).  An info from an acting peer is recorded in
+// peer_activated; once peer_activated is as large as acting,
+// all_activated_and_committed() is called.
+boost::statechart::result
+PG::RecoveryState::Active::react(const MInfoRec& infoevt)
+{
+  PG *pg = context< RecoveryMachine >().pg;
+  assert(pg->is_active());
+  assert(pg->is_primary());
+
+  // don't update history (yet) if we are active and primary; the replica
+  // may be telling us they have activated (and committed) but we can't
+  // share that until _everyone_ does the same.
+  const int sender = infoevt.from;
+  if (pg->is_acting(sender)) {
+    assert(pg->info.history.last_epoch_started <
+          pg->info.history.same_acting_since);
+    assert(infoevt.info.history.last_epoch_started >=
+          pg->info.history.same_acting_since);
+    dout(10) << " peer osd" << sender << " activated and committed"
+            << dendl;
+    pg->peer_activated.insert(sender);
+  }
+
+  const bool everyone_activated =
+    pg->peer_activated.size() == pg->acting.size();
+  if (everyone_activated) {
+    pg->all_activated_and_committed();
+  }
+  return discard_event();
+}
+
+/*------ReplicaActive-----*/
+// Entering ReplicaActive: call PG::activate() with the machine's current
+// transaction and context list, a throwaway local query map, and no
+// activator map (NULL).
+PG::RecoveryState::ReplicaActive::ReplicaActive(my_context ctx)
+  : my_base(ctx)
+{
+  dout(10) << "In ReplicaActive, about to call activate" << dendl;
+  RecoveryMachine &rm = context< RecoveryMachine >();
+  PG *pg = rm.pg;
+  map< int, map< pg_t, Query> > scratch_queries;  // local only, never sent from here
+  pg->activate(*rm.get_cur_transaction(),
+              *rm.get_context_list(),
+              scratch_queries, NULL);
+  dout(10) << "Activate Finished" << dendl;
+}
+
+// Info while ReplicaActive: pass it to PG::process_primary_info() together
+// with the current transaction, then discard the event.
+boost::statechart::result
+PG::RecoveryState::ReplicaActive::react(const MInfoRec& infoevt)
+{
+  RecoveryMachine &rm = context< RecoveryMachine >();
+  rm.pg->process_primary_info(*rm.get_cur_transaction(), infoevt.info);
+  return discard_event();
+}
+
+/*-------Stray---*/
+// Entering Stray: the PG must be neither active, peering nor primary; mark
+// it PG_STATE_PEERING.  No backlog has been requested yet.
+PG::RecoveryState::Stray::Stray(my_context ctx)
+  : my_base(ctx),
+    backlog_requested(false)
+{
+  PG *const pg = context< RecoveryMachine >().pg;
+
+  // entry preconditions
+  assert(!pg->is_active());
+  assert(!pg->is_peering());
+  assert(!pg->is_primary());
+
+  pg->state_set(PG_STATE_PEERING);
+}
+
+// Log received while Stray: merge the sender's info and log into ours,
+// check the resulting log/info invariants, write the info into the current
+// transaction and post Activate.  The MLogRec event itself is discarded.
+boost::statechart::result 
+PG::RecoveryState::Stray::react(const MLogRec& logevt) {
+  PG *pg = context< RecoveryMachine >().pg;
+  MOSDPGLog *msg = logevt.msg;
+  dout(10) << "received log from " << logevt.from << dendl;
+  pg->merge_log(*context<RecoveryMachine>().get_cur_transaction(),
+               msg->info, msg->log, logevt.from);
+
+  // after the merge: log must reach back to last_complete (or be a
+  // backlog), and its head must equal last_update
+  assert(pg->log.tail <= pg->info.last_complete || pg->log.backlog);
+  assert(pg->log.head == pg->info.last_update);
+
+  pg->write_info(*context<RecoveryMachine>().get_cur_transaction());
+  dout(10) << "activating!" << dendl;
+  post_event(Activate());
+  return discard_event();
+}
+
+// Info received while Stray: merge the sender's info with an empty log,
+// check the resulting log/info invariants, write the info into the current
+// transaction and post Activate.  The MInfoRec event itself is discarded.
+boost::statechart::result 
+PG::RecoveryState::Stray::react(const MInfoRec& infoevt) {
+  PG *pg = context< RecoveryMachine >().pg;
+  dout(10) << "received info from " << infoevt.from << dendl;
+
+  // no log entries came with the info; merge against an empty log
+  Log empty_log;
+  pg->merge_log(*context<RecoveryMachine>().get_cur_transaction(),
+               infoevt.info, empty_log, infoevt.from);
+
+  // after the merge: log must reach back to last_complete (or be a
+  // backlog), and its head must equal last_update
+  assert(pg->log.tail <= pg->info.last_complete || pg->log.backlog);
+  assert(pg->log.head == pg->info.last_update);
+
+  pg->write_info(*context<RecoveryMachine>().get_cur_transaction());
+  dout(10) << "activating!" << dendl;
+  post_event(Activate());
+  return discard_event();
+}
+
+// BacklogComplete while Stray: answer every query parked in pending_queries
+// with fulfill_log(), removing each entry once served, then clear
+// backlog_requested.
+boost::statechart::result
+PG::RecoveryState::Stray::react(const BacklogComplete&)
+{
+  PG *pg = context< RecoveryMachine >().pg;
+  assert(backlog_requested);
+  dout(10) << "BacklogComplete" << dendl;
+
+  // drain in key order: serve the first entry, erase it, repeat
+  while (!pending_queries.empty()) {
+    map<int, Query>::iterator next = pending_queries.begin();
+    dout(10) << "sending log to " << next->first << dendl;
+    pg->fulfill_log(next->first, next->second);
+    pending_queries.erase(next);
+  }
+
+  backlog_requested = false;
+  return discard_event();
+}
+
+// Query received while Stray.
+//  - BACKLOG query without a backlog: park the query in pending_queries and
+//    queue backlog generation (once).
+//  - INFO query: answer through fulfill_info() + send_notify().
+//  - anything else: answer through fulfill_log().
+// The event is discarded in every case.
+boost::statechart::result
+PG::RecoveryState::Stray::react(const MQuery& query)
+{
+  PG *pg = context< RecoveryMachine >().pg;
+  const Query &q = query.query;
+
+  if (q.type == Query::BACKLOG && !pg->log.backlog) {
+    dout(10) << "Stray, need a backlog!"
+            << dendl;
+    pending_queries[query.from] = q;
+    if (!backlog_requested) {
+      dout(10) << "Stray, generating a backlog!"
+              << dendl;
+      backlog_requested = true;
+      pg->osd->queue_generate_backlog(pg);
+    }
+    return discard_event();
+  }
+
+  if (q.type != Query::INFO) {
+    pg->fulfill_log(query.from, q);
+    return discard_event();
+  }
+
+  pair<int, Info> notify_info;
+  pg->fulfill_info(query.from, q, notify_info);
+  context< RecoveryMachine >().send_notify(notify_info.first,
+                                          notify_info.second);
+  return discard_event();
+}
+
+/*--------GetInfo---------*/
+// Entering GetInfo: generate past intervals and, if Peering has no prior
+// set yet, build it.  Then either post NeedNewMap (when need_up_thru is
+// set) or send the queries produced by the prior set and remember which
+// osds were asked in peer_info_requested.  If nothing was requested,
+// GotInfo is posted immediately.
+PG::RecoveryState::GetInfo::GetInfo(my_context ctx) : my_base(ctx) {
+  PG *pg = context< RecoveryMachine >().pg;
+  pg->generate_past_intervals();
+  auto_ptr<PgPriorSet> &prior_set = context< Peering >().prior_set;
+
+  if (!prior_set.get()) {
+    pg->build_prior(prior_set);
+    dout(10) << "PG::do_peer: peer prior_set is "
+            << *prior_set << dendl;
+
+    if (pg->need_up_thru) {
+      dout(10) << "transitioning to pending, need upthru" << dendl;
+      post_event(NeedNewMap());
+    } else {
+      map< int, map< pg_t, Query > > query_map;
+      prior_set->gen_query_map(*pg->osd->osdmap,
+                              pg->info,
+                              query_map);
+      // one query per (osd, pg) pair; remember every osd we asked
+      for (map< int, map< pg_t, Query> >::iterator i = query_map.begin();
+          i != query_map.end();
+          ++i) {
+       for (map< pg_t, Query >::iterator j = i->second.begin();
+            j != i->second.end();
+            ++j) {
+         context< RecoveryMachine >().send_query(i->first, j->second);
+         peer_info_requested.insert(i->first);
+       }
+      }
+    }
+  }
+  // NOTE(review): when need_up_thru is set, peer_info_requested is still
+  // empty here, so GotInfo is posted in addition to NeedNewMap -- confirm
+  // that the target state tolerates the extra event.
+  if (peer_info_requested.empty()) {
+    post_event(GotInfo());
+  }
+}
+
+// Notify while in GetInfo: the sender no longer counts as outstanding; once
+// nobody is outstanding, post GotInfo.  The notify is forwarded so an outer
+// state can still process it.
+boost::statechart::result
+PG::RecoveryState::GetInfo::react(const MNotifyRec& infoevt)
+{
+  // erase-by-key is a no-op when the sender was never asked
+  peer_info_requested.erase(infoevt.from);
+
+  const bool all_answered = peer_info_requested.empty();
+  if (all_answered) {
+    post_event(GotInfo());
+  }
+  return forward_event();
+}
+
+/*------GetLog------------*/
+// Entering GetLog: ask choose_log_location() whether we need a backlog and
+// which osd (if any) to pull the master log from, then
+//  - post NeedNewMap if choose_acting() fails,
+//  - otherwise queue backlog generation if needed, query the chosen osd for
+//    its log/backlog, and post GotLog right away when there is nothing to
+//    wait for.
+PG::RecoveryState::GetLog::GetLog(my_context ctx) : 
+  my_base(ctx), newest_update_osd(-1), need_backlog(false), msg(0) {
+  PG *pg = context< RecoveryMachine >().pg;
+  dout(10) << "In GetLog, selecting log location" << dendl;
+  eversion_t newest_update;
+  eversion_t oldest_update;
+  pg->choose_log_location(need_backlog,
+                         wait_on_backlog,
+                         newest_update_osd,
+                         newest_update,
+                         oldest_update);
+
+  // newest_update_osd == -1 means no peer was selected to pull from; in
+  // that case pass our own osd id (as the other choose_acting() callers
+  // do), not the literal osd 0.
+  const int acting_hint =
+    (newest_update_osd == -1) ? pg->osd->whoami : newest_update_osd;
+  if (!pg->choose_acting(acting_hint)) {
+    dout(10) << "transitioning to pending, need new acting" << dendl;
+    post_event(NeedNewMap());
+  } else {
+
+    if (need_backlog && !pg->log.backlog) {
+      dout(10) << "In GetLog, need backlog" << dendl;
+      pg->osd->queue_generate_backlog(pg);
+    }
+
+    if (newest_update_osd != -1) {
+      dout(10) << "Sending request to osd" << newest_update_osd
+              << " for the master log" << dendl;
+      context<RecoveryMachine>().send_query(newest_update_osd,
+       Query(wait_on_backlog ? Query::BACKLOG : Query::LOG,
+             oldest_update,
+             pg->info.history));
+    }
+
+    // we already have a backlog: nothing to wait for on that front
+    if (pg->log.backlog) {
+      wait_on_backlog = false;
+    }
+
+    if (!wait_on_backlog && newest_update_osd == -1) {
+      dout(10) << "GetLog: neither backlog nor master log needed, "
+              << "moving to GetMissing" << dendl;
+      post_event(GotLog());
+    }
+  }
+}
+
+// Master log received while in GetLog.  Keep a reference on the message
+// (released in ~GetLog, consumed in exit()); if no backlog is pending, post
+// GotLog, otherwise keep waiting for BacklogComplete.
+boost::statechart::result 
+PG::RecoveryState::GetLog::react(const MLogRec& logevt) {
+  assert(!msg);
+  assert(logevt.from == newest_update_osd);
+  dout(10) << "GetLog: recieved master log from osd" 
+          << logevt.from << dendl;
+  msg = logevt.msg;
+  msg->get();
+  if (!wait_on_backlog) {
+    dout(10) << "GetLog: already have/don't need backlog." 
+            << "moving on to GetMissing" << dendl;
+    post_event(GotLog());
+  } else {
+    // only report a pending backlog when we really are still waiting
+    dout(10) << "GetLog: Still need backlog" << dendl;
+  }
+  return discard_event();
+}
+
+// BacklogComplete while in GetLog: the backlog wait is over.  Post GotLog
+// unless a master log was requested (newest_update_osd != -1) and has not
+// arrived yet (msg still null).  The event is forwarded.
+boost::statechart::result
+PG::RecoveryState::GetLog::react(const BacklogComplete&)
+{
+  wait_on_backlog = false;
+
+  const bool log_outstanding = !msg && newest_update_osd != -1;
+  if (log_outstanding) {
+    dout(10) << "GetLog: Still need master log" << dendl;
+  } else {
+    dout(10) << "GetLog: already have/don't need master log."
+            << "moving on to GetMissing" << dendl;
+    post_event(GotLog());
+  }
+  return forward_event();
+}
+
+// Exit action for GetLog: if a master log message was stored, hand its
+// info, log and missing set to PG::proc_master_log().
+void PG::RecoveryState::GetLog::exit()
+{
+  if (!msg) {
+    return;
+  }
+
+  RecoveryMachine &rm = context< RecoveryMachine >();
+  dout(10) << "processing master log" << dendl;
+  rm.pg->proc_master_log(*rm.get_cur_transaction(),
+                        msg->info, msg->log, msg->missing,
+                        newest_update_osd);
+}
+
+// Drop the reference taken on the master log message, if one was stored.
+PG::RecoveryState::GetLog::~GetLog()
+{
+  dout(10) << "leaving GetLog" << dendl;
+  if (msg) {
+    msg->put();
+  }
+}
+
+/*------GetMissing--------*/
+// Entering GetMissing: for every acting replica (acting[0], the primary, is
+// skipped) decide whether we need its log+missing:
+//  - peers with empty info do not have the PG yet and are skipped;
+//  - peers whose last_update equals both their last_complete and our
+//    last_update get an empty missing entry and are not queried;
+//  - every other peer is sent a LOG query starting at its
+//    last_epoch_started and recorded in peer_missing_requested.
+// If nobody had to be queried, Activate is posted immediately.
+PG::RecoveryState::GetMissing::GetMissing(my_context ctx) : my_base(ctx) {
+  PG *pg = context< RecoveryMachine >().pg;
+  map<int, Missing> &peer_missing = pg->peer_missing;
+
+  // Start at the second entry.  (The previous `begin()++` evaluated to
+  // begin() itself, so the primary was processed as if it were a replica.)
+  vector<int>::iterator i = pg->acting.begin();
+  if (i != pg->acting.end())
+    ++i;
+  for (; i != pg->acting.end(); ++i) {
+    const Info& pi = pg->peer_info[*i];
+    if (pi.is_empty()) continue; // Does not have the PG yet
+    if (pi.last_update == pi.last_complete &&  // peer has no missing
+       pi.last_update == pg->info.last_update) {  // peer is up to date
+      // replica has no missing and identical log as us.  no need to
+      // pull anything.
+      dout(10) << "osd" << *i << " has no missing, identical log" << dendl;
+      peer_missing[*i];  // just create the (empty) entry
+      pg->search_for_missing(pg->peer_info[*i], &peer_missing[*i], *i);
+      continue;  // nothing to request from this peer
+    }
+    dout(10) << "Requesting missing from osd" << *i << dendl;
+    context< RecoveryMachine >().send_query(*i,
+      Query(Query::LOG,
+           eversion_t(pi.history.last_epoch_started, 0),
+           pg->info.history));
+    peer_missing_requested.insert(*i);
+  }
+
+  if (peer_missing_requested.empty()) {
+    post_event(Activate());
+  }
+}
+
+// Log+missing received from a replica while in GetMissing: mark the sender
+// as answered, feed its info/log/missing to PG::proc_replica_log(), and post
+// Activate once no request is outstanding.  The event is discarded.
+boost::statechart::result PG::RecoveryState::GetMissing::react(const MLogRec& logevt) {
+  PG *pg = context< RecoveryMachine >().pg;
+  // Erase first so the count below is the number really still outstanding
+  // (and cannot wrap around if the sender was not in the set).
+  peer_missing_requested.erase(logevt.from);
+  dout(10) << "GetMissing: Got a missing from osd" << logevt.from 
+          << " waiting on " << peer_missing_requested.size()
+          << " more" << dendl;
+  MOSDPGLog *msg = logevt.msg;
+  pg->proc_replica_log(*context<RecoveryMachine>().get_cur_transaction(),
+                      msg->info, msg->log, msg->missing, logevt.from);
+  
+  if (peer_missing_requested.empty()) {
+    post_event(Activate());
+  }
+  return discard_event();
+}
+
+/*----Public Methods-----*/
+// Feed a notify (Info `i` from osd `from`) to the state machine as an
+// MNotifyRec event; rctx is installed for the duration of the call.
+void PG::RecoveryState::handle_notify(int from, PG::Info& i,
+                                     RecoveryCtx *rctx)
+{
+  start_handle(rctx);
+  MNotifyRec evt(from, i);
+  machine.process_event(evt);
+  end_handle();
+}
+
+// Feed an info message (Info `i` from osd `from`) to the state machine as
+// an MInfoRec event; rctx is installed for the duration of the call.
+void PG::RecoveryState::handle_info(int from, PG::Info& i,
+                                   RecoveryCtx *rctx)
+{
+  start_handle(rctx);
+  MInfoRec evt(from, i);
+  machine.process_event(evt);
+  end_handle();
+}
+
+// Feed a log message from osd `from` to the state machine as an MLogRec
+// event; rctx is installed for the duration of the call.
+void PG::RecoveryState::handle_log(int from,
+                                  MOSDPGLog *msg,
+                                  RecoveryCtx *rctx)
+{
+  start_handle(rctx);
+  MLogRec evt(from, msg);
+  machine.process_event(evt);
+  end_handle();
+}
+
+// Feed a query `q` from osd `from` to the state machine as an MQuery event;
+// rctx is installed for the duration of the call.
+void PG::RecoveryState::handle_query(int from, const PG::Query& q,
+                                    RecoveryCtx *rctx)
+{
+  start_handle(rctx);
+  MQuery evt(from, q);
+  machine.process_event(evt);
+  end_handle();
+}
+
+// Tell the state machine that the map advanced from lastmap to osdmap
+// (AdvMap event); rctx is installed for the duration of the call.
+void PG::RecoveryState::handle_advance_map(OSDMap &osdmap, OSDMap &lastmap,
+                                          RecoveryCtx *rctx)
+{
+  start_handle(rctx);
+  AdvMap evt(osdmap, lastmap);
+  machine.process_event(evt);
+  end_handle();
+}
+
+// Deliver an ActMap event to the state machine; rctx is installed for the
+// duration of the call.
+void PG::RecoveryState::handle_activate_map(RecoveryCtx *rctx)
+{
+  start_handle(rctx);
+  ActMap evt;
+  machine.process_event(evt);
+  end_handle();
+}
+
+// Deliver a BacklogComplete event to the state machine; rctx is installed
+// for the duration of the call.
+void PG::RecoveryState::handle_backlog_generated(RecoveryCtx *rctx)
+{
+  start_handle(rctx);
+  BacklogComplete evt;
+  machine.process_event(evt);
+  end_handle();
+}
+
+// Deliver an Initialize event to the state machine; rctx is installed for
+// the duration of the call.
+void PG::RecoveryState::handle_create(RecoveryCtx *rctx)
+{
+  start_handle(rctx);
+  Initialize evt;
+  machine.process_event(evt);
+  end_handle();
+}
+/*---------------------------------------------------*/
 #undef dout_prefix
 #define dout_prefix (*_dout << pg->gen_prefix() << "PgPriorSet: ")
 
index 1a1c159d1edeef92dbd8a1a64887d37e04d1205b..90f611b475a59339f55447becf971973fdd604a7 100644 (file)
 #include "msg/Messenger.h"
 #include "messages/MOSDRepScrub.h"
 
+#include <boost/statechart/custom_reaction.hpp>
+#include <boost/statechart/event.hpp>
+#include <boost/statechart/simple_state.hpp>
+#include <boost/statechart/state.hpp>
+#include <boost/statechart/state_machine.hpp>
+#include <boost/statechart/transition.hpp>
+
 #include "common/DecayCounter.h"
 
 #include <list>
@@ -47,6 +54,7 @@ class MOSDOp;
 class MOSDSubOp;
 class MOSDSubOpReply;
 class MOSDPGInfo;
+class MOSDPGLog;
 
 
 struct PGPool {
@@ -766,7 +774,320 @@ public:
   friend std::ostream& operator<<(std::ostream& oss,
                                  const struct PgPriorSet &prior);
 
-  std::auto_ptr < PgPriorSet > prior_set;
+public:    
+  struct RecoveryCtx {
+    map< int, map<pg_t, Query> > *query_map;
+    map< int, MOSDPGInfo* > *info_map;
+    map< int, vector<Info> > *notify_list;
+    list< Context* > *context_list;
+    ObjectStore::Transaction *transaction;
+    RecoveryCtx() : query_map(0), info_map(0), notify_list(0),
+                   context_list(0), transaction(0) {}
+    RecoveryCtx(map< int, map<pg_t, Query> > *query_map,
+               map< int, MOSDPGInfo* > *info_map,
+               map< int, vector<Info> > *notify_list,
+               list< Context* > *context_list,
+               ObjectStore::Transaction *transaction)
+      : query_map(query_map), info_map(info_map), 
+       notify_list(notify_list),
+       context_list(context_list), transaction(transaction) {}
+  };
+
+  /* Encapsulates PG recovery process.
+   *
+   * Wraps a boost::statechart machine (RecoveryMachine).  The public
+   * handle_* methods are the entry points: as defined in PG.cc, each one
+   * installs the caller's RecoveryCtx, feeds a single event to the machine
+   * and clears the context again (see start_handle() / end_handle()).
+   *
+   * State nesting as declared below:
+   *   Initial
+   *   Reset
+   *   Crashed
+   *   Started              (initial inner state: Start)
+   *     Start
+   *     Primary            (initial inner state: Peering)
+   *       Pending
+   *       Peering          (initial inner state: GetInfo)
+   *         GetInfo
+   *         GetLog
+   *         GetMissing
+   *       Active
+   *     ReplicaActive
+   *     Stray
+   */
+  class RecoveryState {
+    void start_handle(RecoveryCtx *new_ctx) {  // install ctx for one event
+      assert(!rctx);  // the previous handle_* call must have finished
+      rctx = new_ctx;
+    }
+    /* Drop the context installed by start_handle(). */
+    void end_handle() {
+      rctx = 0;
+    }
+    /* Events fed to the machine.  Payloads are held by reference or raw
+     * pointer, not copied, so the referenced objects must stay alive while
+     * the event is being processed.
+     *
+     * MInfoRec: an Info plus the `from` id passed alongside it. */
+    struct MInfoRec : boost::statechart::event< MInfoRec > {
+      int from;
+      Info &info;
+      MInfoRec(int from, Info &info) :
+       from(from), info(info) {}
+    };
+    /* MLogRec: an MOSDPGLog pointer plus `from`; posted by handle_log(). */
+    struct MLogRec : boost::statechart::event< MLogRec > {
+      int from;
+      MOSDPGLog *msg;
+      MLogRec(int from, MOSDPGLog *msg) :
+       from(from), msg(msg) {}
+    };
+    /* MNotifyRec: same payload shape as MInfoRec, but a distinct event
+     * type so states can react to it separately. */
+    struct MNotifyRec : boost::statechart::event< MNotifyRec > {
+      int from;
+      Info &info;
+      MNotifyRec(int from, Info &info) :
+       from(from), info(info) {}
+    };
+    /* MQuery: a const Query reference plus `from`; posted by
+     * handle_query(). */
+    struct MQuery : boost::statechart::event< MQuery > {
+      int from;
+      const Query &query;
+      MQuery(int from, const Query &query):
+       from(from), query(query) {}
+    };
+    /* AdvMap: references to two OSDMaps; posted by handle_advance_map(). */
+    struct AdvMap : boost::statechart::event< AdvMap > {
+      OSDMap &osdmap;
+      OSDMap &lastmap;
+      AdvMap(OSDMap &osdmap, OSDMap &lastmap):
+       osdmap(osdmap), lastmap(lastmap) {}
+    };
+    /* Payload-free events.  handle_backlog_generated() posts
+     * BacklogComplete, handle_activate_map() posts ActMap and
+     * handle_create() posts Initialize.  Activate is consumed by the
+     * Peering and Stray states below. */
+    struct BacklogComplete : boost::statechart::event< BacklogComplete > {};
+    struct ActMap : boost::statechart::event< ActMap > {};
+    struct Activate : boost::statechart::event< Activate > {};
+    struct Initialize : boost::statechart::event< Initialize > {};
+
+
+    /* States
+     *
+     * Initial is forward-declared here because RecoveryMachine names it as
+     * its initial state.  RecoveryMachine also carries the accessors that
+     * state code uses to reach the PG and the RecoveryCtx of the event
+     * currently being handled. */
+    struct Initial;
+    class RecoveryMachine : public boost::statechart::state_machine< RecoveryMachine, Initial > {
+      RecoveryState *state;  // enclosing RecoveryState; source of rctx
+    public:
+      PG *pg;
+
+      RecoveryMachine(RecoveryState *state, PG *pg) : state(state), pg(pg) {}
+
+      /* Accessor functions for state methods
+       *
+       * Each accessor dereferences state->rctx, which is only non-null
+       * between start_handle() and end_handle(), and asserts that the
+       * requested sink was supplied by the caller. */
+      ObjectStore::Transaction* get_cur_transaction() {
+       assert(state->rctx->transaction);
+       return state->rctx->transaction;
+      }
+      /* Record `query` for `to`, keyed by this PG's pgid.  An existing
+       * entry for the same (to, pgid) pair is overwritten. */
+      void send_query(int to, const Query &query) {
+       assert(state->rctx->query_map);
+       (*state->rctx->query_map)[to][pg->info.pgid] = query;
+      }
+
+      map<int, map<pg_t, Query> > *get_query_map() {
+       assert(state->rctx->query_map);
+       return state->rctx->query_map;
+      }
+
+      map<int, MOSDPGInfo*> *get_info_map() {
+       assert(state->rctx->info_map);
+       return state->rctx->info_map;
+      }
+
+      list< Context* > *get_context_list() {
+       assert(state->rctx->context_list);
+       return state->rctx->context_list;
+      }
+      /* Append a copy of `info` to the notifies queued for `to`. */
+      void send_notify(int to, const Info &info) {
+       assert(state->rctx->notify_list);
+       (*state->rctx->notify_list)[to].push_back(info);
+      }
+    };
+    friend class RecoveryMachine;  // RecoveryMachine reads the private rctx
+
+    /* States
+     *
+     * Crashed: entering this state trips assert(0).  It is the transition
+     * target for MQuery in ReplicaActive and for MLogRec / BacklogComplete
+     * in GetInfo. */
+    struct Crashed :
+      boost::statechart::state< Crashed, RecoveryMachine > {
+      Crashed(my_context ctx) : my_base(ctx) { assert(0); }
+    };
+    /* Initial: the machine's initial state; Initialize moves to Started. */
+    struct Started;
+    struct Initial :
+      boost::statechart::simple_state< Initial, RecoveryMachine > {
+      typedef boost::mpl::list <
+       boost::statechart::transition< Initialize, Started >
+       > reactions;
+    };
+    /* Reset: top-level state reacting to AdvMap and ActMap.  No declarative
+     * transition into Reset appears in this class; any entry would have to
+     * come from a custom reaction defined in PG.cc. */
+    struct Reset :
+      boost::statechart::state< Reset, RecoveryMachine > {
+      typedef boost::mpl::list <
+       boost::statechart::custom_reaction< AdvMap >,
+       boost::statechart::custom_reaction< ActMap >
+       > reactions;
+
+      /* Entry function for RecoveryMachine.  Should initialize relevant peering
+       * state */
+      Reset(my_context ctx);
+      boost::statechart::result react(const AdvMap&);
+      boost::statechart::result react(const ActMap&);
+    };
+    /* Started: outer state whose initial inner state is Start.  Its AdvMap
+     * reaction is reached when no inner state consumes the event. */
+    struct Start;
+    struct Started :
+      boost::statechart::simple_state< Started, RecoveryMachine, Start > {
+      typedef boost::mpl::list <
+       boost::statechart::custom_reaction< AdvMap >
+       > reactions;
+
+      boost::statechart::result react(const AdvMap&);
+    };
+    /* Start: initial substate of Started.  MakePrimary selects Primary and
+     * MakeStray selects Stray.
+     * NOTE(review): presumably the Start constructor (PG.cc) posts one of
+     * the two -- confirm. */
+    struct MakePrimary : boost::statechart::event< MakePrimary > {};
+    struct MakeStray : boost::statechart::event< MakeStray > {};
+    struct Primary;
+    struct Stray;
+    struct Start :
+      boost::statechart::state< Start, Started > {
+      typedef boost::mpl::list <
+       boost::statechart::transition< MakePrimary, Primary >,
+       boost::statechart::transition< MakeStray, Stray >
+       > reactions;
+      Start(my_context ctx);
+    };
+    /* Primary: substate of Started with Peering as its initial inner
+     * state.  NeedNewMap moves to Pending. */
+    struct Peering;
+    struct Pending;
+    struct NeedNewMap : boost::statechart::event< NeedNewMap > {};
+    struct Primary :
+      boost::statechart::simple_state< Primary, Started, Peering > {
+      typedef boost::mpl::list <
+       boost::statechart::custom_reaction< ActMap >,
+       boost::statechart::custom_reaction< BacklogComplete >,
+       boost::statechart::custom_reaction< MNotifyRec >,
+       boost::statechart::transition< NeedNewMap, Pending >
+       > reactions;
+       boost::statechart::result react(const BacklogComplete&);
+       boost::statechart::result react(const ActMap&);
+       boost::statechart::result react(const MNotifyRec&);
+    };
+    /* Pending: substate of Primary with no reactions of its own. */
+    struct Pending :
+      boost::statechart::simple_state< Pending, Primary> {};
+    
+    /* Peering: substate of Primary that starts in GetInfo and moves to
+     * Active on Activate.  It owns a PgPriorSet through auto_ptr, so the
+     * set is released when the state object is destroyed. */
+    struct GetInfo;
+    struct Active;
+    struct Peering : 
+      boost::statechart::state< Peering, Primary, GetInfo > {
+      typedef boost::mpl::list <
+       boost::statechart::transition< Activate, Active >,
+       boost::statechart::custom_reaction< AdvMap >
+       > reactions;
+      std::auto_ptr< PgPriorSet > prior_set;
+
+      Peering(my_context ctx);
+      boost::statechart::result react(const AdvMap &advmap);
+      void exit();
+    };
+    /* Active: substate of Primary, entered from Peering on Activate. */
+    struct Active : 
+      boost::statechart::state< Active, Primary > {
+      typedef boost::mpl::list <
+       boost::statechart::custom_reaction< ActMap >,
+       boost::statechart::custom_reaction< AdvMap >,
+       boost::statechart::custom_reaction< MInfoRec >,
+       boost::statechart::custom_reaction< MNotifyRec >
+       > reactions;
+
+      Active(my_context ctx);
+      boost::statechart::result react(const ActMap&);
+      boost::statechart::result react(const AdvMap&);
+      boost::statechart::result react(const MInfoRec& infoevt);
+      boost::statechart::result react(const MNotifyRec& notevt);
+    };
+    /* ReplicaActive: substate of Started, entered from Stray on Activate.
+     * An MQuery received here transitions to Crashed. */
+    struct ReplicaActive : boost::statechart::state< ReplicaActive, Started > {
+      typedef boost::mpl::list <
+       boost::statechart::transition< MQuery, Crashed >,
+       boost::statechart::custom_reaction< MInfoRec >
+       > reactions;
+
+      ReplicaActive(my_context ctx);
+      boost::statechart::result react(const MInfoRec& infoevt);
+    };
+    /* Stray: substate of Started, entered from Start on MakeStray.
+     * Activate moves to ReplicaActive.  The two data members are set up and
+     * used by code in PG.cc that is not visible here. */
+    struct Stray : boost::statechart::state< Stray, Started > {
+      bool backlog_requested;
+      map<int, Query> pending_queries;
+      typedef boost::mpl::list <
+       boost::statechart::custom_reaction< MQuery >,
+       boost::statechart::custom_reaction< MLogRec >,
+       boost::statechart::custom_reaction< MInfoRec >,
+       boost::statechart::custom_reaction< BacklogComplete >,
+       boost::statechart::transition< Activate, ReplicaActive >
+       > reactions;
+
+      Stray(my_context ctx);
+
+      boost::statechart::result react(const MQuery& query);
+      boost::statechart::result react(const BacklogComplete&);
+      boost::statechart::result react(const MLogRec& logevt);
+      boost::statechart::result react(const MInfoRec& infoevt);
+    };
+    /* GetInfo: initial substate of Peering.  GotInfo moves to GetLog;
+     * MLogRec or BacklogComplete arriving here transitions to Crashed. */
+    struct GetLog;
+    struct GotInfo : boost::statechart::event< GotInfo > {};
+    struct GetInfo :
+      boost::statechart::state< GetInfo, Peering > {
+      set<int> peer_info_requested;
+      typedef boost::mpl::list <
+       boost::statechart::transition< GotInfo, GetLog >,
+       boost::statechart::custom_reaction< MNotifyRec >,
+       boost::statechart::transition< MLogRec, Crashed >,
+       boost::statechart::transition< BacklogComplete, Crashed >
+       > reactions;
+
+      GetInfo(my_context ctx);
+      boost::statechart::result react(const MNotifyRec& infoevt);
+    };
+    /* GetLog: substate of Peering, entered on GotInfo.  GotLog moves to
+     * GetMissing.
+     * NOTE(review): msg is a raw pointer and the state declares both exit()
+     * and a destructor; presumably one of them releases msg -- confirm in
+     * PG.cc. */
+    struct GetMissing;
+    struct GotLog : boost::statechart::event< GotLog > {};
+    struct GetLog :
+      boost::statechart::state< GetLog, Peering > {
+      int newest_update_osd;
+      bool need_backlog;
+      bool wait_on_backlog;
+      MOSDPGLog *msg;
+      typedef boost::mpl::list <
+       boost::statechart::custom_reaction< MLogRec >,
+       boost::statechart::custom_reaction< BacklogComplete >,
+       boost::statechart::transition< GotLog, GetMissing >
+       > reactions;
+
+      GetLog(my_context ctx);
+      boost::statechart::result react(const MLogRec& logevt);
+      boost::statechart::result react(const BacklogComplete&);
+      void exit();
+      ~GetLog();
+    };
+    /* GetMissing: substate of Peering, entered on GotLog. */
+    struct GetMissing :
+      boost::statechart::state< GetMissing, Peering > {
+      set<int> peer_missing_requested;
+      typedef boost::mpl::list <
+       boost::statechart::custom_reaction< MLogRec >
+       > reactions;
+
+      GetMissing(my_context ctx);
+      boost::statechart::result react(const MLogRec& logevt);
+    };
+    /* Data members.  Declaration order matches the constructor's
+     * initializer list (machine, pg, rctx). */
+    RecoveryMachine machine;
+    PG *pg;
+    RecoveryCtx *rctx;  // non-null only between start_handle()/end_handle()
+
+  public:
+    RecoveryState(PG *pg) : machine(this, pg), pg(pg), rctx(0) {
+      machine.initiate();  // enter the machine's initial state (Initial)
+    }
+    /* Entry points.  Each takes the RecoveryCtx to use while its event is
+     * being processed. */
+    void handle_notify(int from, Info& i, RecoveryCtx *ctx);
+    void handle_info(int from, Info& i, RecoveryCtx *ctx);
+    void handle_log(int from,
+                   MOSDPGLog *msg,
+                   RecoveryCtx *ctx);
+    void handle_query(int from, const PG::Query& q, RecoveryCtx *ctx);
+    void handle_advance_map(OSDMap &osdmap, OSDMap &lastmap, RecoveryCtx *ctx);
+    void handle_activate_map(RecoveryCtx *ctx);
+    void handle_backlog_generated(RecoveryCtx *ctx);
+    void handle_create(RecoveryCtx *ctx);
+  } recovery_state;
+
+protected:
 
   bool        need_up_thru;
   set<int>    stray_set;   // non-acting osds that have PG data.
@@ -828,9 +1149,9 @@ public:
 
   void generate_past_intervals();
   void trim_past_intervals();
-  void build_prior();
+  void build_prior(std::auto_ptr<PgPriorSet> &prior_set);
   void clear_prior();
-  bool prior_set_affected(const OSDMap *map) const;
+  bool prior_set_affected(PgPriorSet &prior, const OSDMap *osdmap) const;
 
   bool all_unfound_are_lost(const OSDMap* osdmap) const;
   void mark_obj_as_lost(ObjectStore::Transaction& t,
@@ -856,6 +1177,9 @@ public:
 
   void proc_replica_log(ObjectStore::Transaction& t, Info &oinfo, Log &olog,
                        Missing& omissing, int from);
+  void proc_master_log(ObjectStore::Transaction& t, Info &oinfo, Log &olog,
+                      Missing& omissing, int from);
+  void proc_replica_info(int from, Info &info);
   bool merge_old_entry(ObjectStore::Transaction& t, Log::Entry& oe);
   void merge_log(ObjectStore::Transaction& t, Info &oinfo, Log &olog, int from);
   void search_for_missing(const Info &oinfo, const Missing *omissing,
@@ -892,6 +1216,8 @@ public:
   void _activate_committed(epoch_t e);
   void all_activated_and_committed();
 
+  void process_primary_info(ObjectStore::Transaction &t, const Info &info);
+
   bool have_unfound() const { 
     return missing.num_missing() > missing_loc.size();
   }
@@ -977,6 +1303,7 @@ public:
     role(0),
     state(0),
     have_master_log(true),
+    recovery_state(this),
     need_up_thru(false),
     pg_stats_lock("PG::pg_stats_lock"),
     pg_stats_valid(false),
@@ -1056,11 +1383,24 @@ public:
 
   void warm_restart();
                    
+  void fulfill_info(int from, const Query &query, 
+                   pair<int, Info> &notify_info);
+  void fulfill_log(int from, const Query &query);
   bool acting_up_affected(OSDMap &osdmap,
               const OSDMap &lastmap);
+    
   // abstract bits
-  virtual bool handle_advance_map(OSDMap &osdmap,
-                                 const OSDMap &lastmap);
+  virtual void handle_notify(int from, PG::Info& i, RecoveryCtx *ctx) = 0;
+  virtual void handle_info(int from, PG::Info& i, RecoveryCtx *ctx) = 0;
+  virtual void handle_log(int from,
+                         MOSDPGLog *msg,
+                         RecoveryCtx *ctx) = 0;
+  virtual void handle_query(int from, const PG::Query& q, RecoveryCtx *ctx) = 0;
+  virtual void handle_advance_map(OSDMap &osdmap, OSDMap &lastmap, 
+                                 RecoveryCtx *ctx) = 0;
+  virtual void handle_activate_map(RecoveryCtx *ctx) = 0;
+  virtual void handle_backlog_generated(RecoveryCtx *ctx) = 0;
+  virtual void handle_create(RecoveryCtx *ctx) = 0;
   virtual void do_op(MOSDOp *op) = 0;
   virtual void do_sub_op(MOSDSubOp *op) = 0;
   virtual void do_sub_op_reply(MOSDSubOpReply *op) = 0;
index cd70f783c5192f98fd04558285729b10cf355159..ef83da2331e09053c79c40e398447fd1b512e085 100644 (file)
@@ -662,6 +662,35 @@ public:
   { }
   ~ReplicatedPG() {}
 
+
+  /* Forward a notify to this PG's recovery state machine. */
+  void handle_notify(int from, PG::Info& i, RecoveryCtx *rctx)
+  {
+    recovery_state.handle_notify(from, i, rctx);
+  }
+  /* Forward an info to this PG's recovery state machine. */
+  void handle_info(int from, PG::Info& i, RecoveryCtx *rctx)
+  {
+    recovery_state.handle_info(from, i, rctx);
+  }
+  /* Forward a log message to this PG's recovery state machine. */
+  void handle_log(int from, MOSDPGLog *msg, RecoveryCtx *rctx)
+  {
+    recovery_state.handle_log(from, msg, rctx);
+  }
+  /* Forward a query to this PG's recovery state machine. */
+  void handle_query(int from, const PG::Query& q, RecoveryCtx *rctx)
+  {
+    recovery_state.handle_query(from, q, rctx);
+  }
+  /* Forward the (osdmap, lastmap) pair to this PG's recovery state
+   * machine. */
+  void handle_advance_map(OSDMap &osdmap,
+                         OSDMap &lastmap,
+                         RecoveryCtx *rctx)
+  {
+    recovery_state.handle_advance_map(osdmap, lastmap, rctx);
+  }
+  /* Forward a map-activation to this PG's recovery state machine. */
+  void handle_activate_map(RecoveryCtx *rctx)
+  {
+    recovery_state.handle_activate_map(rctx);
+  }
+  /* Forward a backlog-generated signal to this PG's recovery state
+   * machine. */
+  void handle_backlog_generated(RecoveryCtx *rctx)
+  {
+    recovery_state.handle_backlog_generated(rctx);
+  }
+  /* Forward a create signal to this PG's recovery state machine. */
+  void handle_create(RecoveryCtx *rctx)
+  {
+    recovery_state.handle_create(rctx);
+  }
+
   void do_op(MOSDOp *op);
   void do_pg_op(MOSDOp *op);
   void do_sub_op(MOSDSubOp *op);