]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
PG,OSD: activate pg during replay
authorSamuel Just <samuel.just@dreamhost.com>
Tue, 15 Mar 2011 00:25:46 +0000 (17:25 -0700)
committerSage Weil <sage@newdream.net>
Thu, 17 Mar 2011 19:00:20 +0000 (12:00 -0700)
Replay PGs already accept and queue transactions.  PGs will now go to
active during replay in order to simplify the state reported to the user
and to allow recovery to being.

Signed-off-by: Samuel Just <samuel.just@dreamhost.com>
src/osd/OSD.cc
src/osd/PG.cc
src/osd/PG.h

index a503091769ce0d98b12a78ef89f079caeb80688a..427544f6c70a68c2476b4e723342a6c8f9815313 100644 (file)
@@ -4896,25 +4896,19 @@ void OSD::activate_pg(pg_t pgid, utime_t activate_at)
 {
   assert(osd_lock.is_locked());
 
-  map< int, map<pg_t,PG::Query> > query_map;    // peer -> PG -> get_summary_since
-
+  dout(10) << "activate_pg" << dendl;
   if (pg_map.count(pgid)) {
     PG *pg = _lookup_lock_pg(pgid);
-    if (pg->is_crashed() &&
+    if (pg->is_active() &&
        pg->is_replay() &&
        pg->get_role() == 0 &&
        pg->replay_until == activate_at) {
-      ObjectStore::Transaction *t = new ObjectStore::Transaction;
-      C_Contexts *fin = new C_Contexts;
-      pg->activate(*t, fin->contexts, query_map);
-      int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
-      assert(tr == 0);
+
+      pg->replay_queued_ops();
     }
     pg->unlock();
   }
 
-  do_queries(query_map);
-
   // wake up _all_ pg waiters; raw pg -> actual pg mapping may have shifted
   wake_all_pg_waiters();
 }
@@ -5187,22 +5181,17 @@ void OSD::handle_op(MOSDOp *op)
 
  
   // pg must be active.
-  if (!pg->is_active()) {
-    // replay?
+  if (pg->is_replay()) {
     if (op->get_version().version > 0) {
-      if (op->get_version() > pg->info.last_update) {
-       dout(7) << *pg << " queueing replay at " << op->get_version()
-               << " for " << *op << dendl;
-       pg->replay_queue[op->get_version()] = op;
-       pg->unlock();
-       return;
-      } else {
-       dout(7) << *pg << " replay at " << op->get_version() << " <= " << pg->info.last_update 
-               << " for " << *op
-               << ", will queue for WRNOOP" << dendl;
-      }
+      dout(7) << *pg << " queueing replay at " << op->get_version()
+             << " for " << *op << dendl;
+      pg->replay_queue[op->get_version()] = op;
+      pg->unlock();
+      return;
     }
-    
+  }
+       
+  if (!pg->is_active()) {
     dout(7) << *pg << " not active (yet)" << dendl;
     pg->waiting_for_active.push_back(op);
     pg->unlock();
index 8c32f3ef1b0e972bd543b6159e28c494efe754a3..fbd9779b812e3bb41b0e694c661e3482f60ca49a 100644 (file)
@@ -1712,16 +1712,16 @@ void PG::do_peer(ObjectStore::Transaction& t, list<Context*>& tfin,
     dout(10) << "crashed, allowing op replay for " << g_conf.osd_replay_window
             << " until " << replay_until << dendl;
     state_set(PG_STATE_REPLAY);
-    state_clear(PG_STATE_PEERING);
     osd->replay_queue_lock.Lock();
     osd->replay_queue.push_back(pair<pg_t,utime_t>(info.pgid, replay_until));
     osd->replay_queue_lock.Unlock();
-  } 
-  else if (!is_active()) {
-    // -- ok, activate!
+  }
+
+  if (!is_active()) {
     activate(t, tfin, query_map, activator_map);
   }
-  else if (is_all_uptodate()) 
+
+  if (is_all_uptodate()) 
     finish_recovery(t, tfin);
 }
 
@@ -1797,11 +1797,7 @@ void PG::activate(ObjectStore::Transaction& t, list<Context*>& tfin,
   state_clear(PG_STATE_STRAY);
   state_clear(PG_STATE_DOWN);
   state_clear(PG_STATE_PEERING);
-  if (is_crashed()) {
-    //assert(is_replay());      // HELP.. not on replica?
-    state_clear(PG_STATE_CRASHED);
-    state_clear(PG_STATE_REPLAY);
-  }
+  state_clear(PG_STATE_CRASHED);
   if (is_primary() && 
       osd->osdmap->get_pg_size(info.pgid) != acting.size())
     state_set(PG_STATE_DEGRADED);
@@ -1976,35 +1972,39 @@ void PG::activate(ObjectStore::Transaction& t, list<Context*>& tfin,
     update_stats();
   }
 
-  
-  // replay (queue them _before_ other waiting ops!)
-  if (!replay_queue.empty()) {
-    eversion_t c = info.last_update;
-    list<Message*> replay;
-    for (map<eversion_t,MOSDOp*>::iterator p = replay_queue.begin();
-         p != replay_queue.end();
-         p++) {
-      if (p->first <= info.last_update) {
-        dout(10) << "activate will WRNOOP " << p->first << " " << *p->second << dendl;
-        replay.push_back(p->second);
-        continue;
-      }
-      if (p->first.version != c.version+1) {
-        dout(10) << "activate replay " << p->first
-                 << " skipping " << c.version+1 - p->first.version 
-                 << " ops"
-                 << dendl;      
-      }
-      dout(10) << "activate replay " << p->first << " " << *p->second << dendl;
-      replay.push_back(p->second);
+  // waiters
+  if (!is_replay()) {
+    osd->take_waiters(waiting_for_active);
+  }
+}
+
+
+void PG::replay_queued_ops()
+{
+  assert(is_replay() && is_active() && !is_crashed());
+  eversion_t c = info.last_update;
+  list<Message*> replay;
+  dout(10) << "replay_queued_ops" << dendl;
+  state_clear(PG_STATE_REPLAY);
+
+  for (map<eversion_t,MOSDOp*>::iterator p = replay_queue.begin();
+       p != replay_queue.end();
+       p++) {
+    if (p->first.version != c.version+1) {
+      dout(10) << "activate replay " << p->first
+              << " skipping " << c.version+1 - p->first.version 
+              << " ops"
+              << dendl;      
       c = p->first;
     }
-    replay_queue.clear();
-    osd->take_waiters(replay);
+    dout(10) << "activate replay " << p->first << " " << *p->second << dendl;
+    replay.push_back(p->second);
   }
-
-  // waiters
+  replay_queue.clear();
+  osd->take_waiters(replay);
   osd->take_waiters(waiting_for_active);
+  state_clear(PG_STATE_REPLAY);
+  update_stats();
 }
 
 void PG::_activate_committed(epoch_t e)
index 7810cb1ba6f3bab8dd67306a11a4e1af7b4740f1..b79ac7f48c523449cabbe5dc8478df3481a85ceb 100644 (file)
@@ -861,6 +861,7 @@ public:
              map< int, map<pg_t,Query> >& query_map,
              map<int, MOSDPGInfo*> *activator_map=0);
   void build_might_have_unfound();
+  void replay_queued_ops();
   void activate(ObjectStore::Transaction& t, list<Context*>& tfin,
                map< int, map<pg_t,Query> >& query_map,
                map<int, MOSDPGInfo*> *activator_map=0);