]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
idallocator recovery replay working!
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 24 Oct 2006 19:52:34 +0000 (19:52 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 24 Oct 2006 19:52:34 +0000 (19:52 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@945 29311d96-e01e-0410-9327-a35deaab8ce9

ceph/mds/IdAllocator.cc
ceph/mds/IdAllocator.h
ceph/mds/LogEvent.h
ceph/mds/MDLog.cc
ceph/mds/MDLog.h
ceph/mds/MDS.cc
ceph/mds/events/EAlloc.h

index cd3821cb1291d9ed0bf3f1ab1918f88d0aeeba44..7399b4792de79b6d202c111049fbf64ccc396ef2 100644 (file)
@@ -29,7 +29,7 @@
 #define dout(x)  if (x <= g_conf.debug_mds) cout << g_clock.now() << " mds" << mds->get_nodeid() << ".idalloc: "
 
 
-idno_t IdAllocator::alloc_id() 
+idno_t IdAllocator::alloc_id(bool replay
 {
   assert(is_active());
   
@@ -41,12 +41,13 @@ idno_t IdAllocator::alloc_id()
   version++;
   
   // log it
-  mds->mdlog->submit_entry(new EAlloc(IDTYPE_INO, id, EALLOC_EV_ALLOC, version));
+  if (!replay)
+    mds->mdlog->submit_entry(new EAlloc(IDTYPE_INO, id, EALLOC_EV_ALLOC, version));
   
   return id;
 }
 
-void IdAllocator::reclaim_id(idno_t id) 
+void IdAllocator::reclaim_id(idno_t id, bool replay
 {
   assert(is_active());
   
@@ -55,7 +56,8 @@ void IdAllocator::reclaim_id(idno_t id)
 
   version++;
   
-  mds->mdlog->submit_entry(new EAlloc(IDTYPE_INO, id, EALLOC_EV_FREE, version));
+  if (!replay)
+    mds->mdlog->submit_entry(new EAlloc(IDTYPE_INO, id, EALLOC_EV_FREE, version));
 }
 
 
@@ -63,41 +65,54 @@ void IdAllocator::reclaim_id(idno_t id)
 class C_ID_Save : public Context {
   IdAllocator *ida;
   version_t version;
-  Context *onfinish;
 public:
-  C_ID_Save(IdAllocator *i, version_t v, Context *c) : ida(i), version(v), onfinish(c) {}
+  C_ID_Save(IdAllocator *i, version_t v) : ida(i), version(v) {}
   void finish(int r) {
-    ida->save_2(version, onfinish);
+    ida->save_2(version);
   }
 };
 
-void IdAllocator::save(Context *onfinish)
+void IdAllocator::save(Context *onfinish, version_t v)
 {
+  if (v > 0 && v <= committing_version) {
+    dout(10) << "save v " << version << " - already saving "
+            << committing_version << " >= needed " << v << endl;
+    waitfor_save[v].push_back(onfinish);
+    return;
+  }
+  
   dout(10) << "save v " << version << endl;
   assert(is_active());
-
+  
   bufferlist bl;
 
   bl.append((char*)&version, sizeof(version));
   ::_encode(free.m, bl);
 
+  committing_version = version;
+
+  waitfor_save[version].push_back(onfinish);
+
   // write (async)
   mds->filer->write(inode,
                     0, bl.length(), bl,
                     0,
-                   0, new C_ID_Save(this, version, onfinish));
+                   0, new C_ID_Save(this, version));
 }
 
-void IdAllocator::save_2(version_t v, Context *onfinish)
+void IdAllocator::save_2(version_t v)
 {
   dout(10) << "save_2 v " << v << endl;
-    
+  
   committed_version = v;
-
-  if (onfinish) {
-    onfinish->finish(0);
-    delete onfinish;
+  
+  list<Context*> ls;
+  while (!waitfor_save.empty()) {
+    if (waitfor_save.begin()->first > v) break;
+    ls.splice(ls.end(), waitfor_save.begin()->second);
+    waitfor_save.erase(waitfor_save.begin());
   }
+  finish_contexts(ls,0);
 }
 
 
index c04d7e60ae1a862d6ddce5b49f6d479c79a02852..745d863be99d336a6af8577a7ee5a7d438ee696e 100644 (file)
@@ -35,22 +35,24 @@ class IdAllocator {
   //static const int STATE_COMMITTING = 3;
   int state;
 
-  version_t version, committed_version;
+  version_t version, committing_version, committed_version;
 
   interval_set<idno_t> free;   // unused ids
   
+  map<version_t, list<Context*> > waitfor_save;
+
  public:
   IdAllocator(MDS *m, inode_t i) :
     mds(m),
     inode(i),
     state(STATE_UNDEF),
-    version(0), committed_version(0)
+    version(0), committing_version(0), committed_version(0)
   {
   }
 
   // alloc or reclaim ids
-  idno_t alloc_id();
-  void reclaim_id(idno_t id);
+  idno_t alloc_id(bool replay=false);
+  void reclaim_id(idno_t id, bool replay=false);
 
   version_t get_version() { return version; }
   version_t get_committed_version() { return committed_version; }
@@ -61,8 +63,8 @@ class IdAllocator {
   bool is_opening() { return state == STATE_OPENING; }
 
   void reset();
-  void save(Context *onfinish=0);
-  void save_2(version_t v, Context *onfinish);
+  void save(Context *onfinish=0, version_t need=0);
+  void save_2(version_t v);
 
   void shutdown() {
     if (is_active()) save(0);
index d69f17111414a5a3bdf110536b9db6347d4bc71e..b2f29735d8d6f4047bc3bb6a62ced5d0ea89c53e 100644 (file)
@@ -48,6 +48,11 @@ class LogEvent {
   static LogEvent *decode(bufferlist &bl);
 
 
+  virtual void print(ostream& out) { 
+    out << "event(" << _type << ")";
+  }
+
+
   /*** live journal ***/
 
   /* obsolete() - is this entry committed to primary store, such that
@@ -74,8 +79,13 @@ class LogEvent {
 
   /* replay() - replay given event
    */
-  virtual void replay(MDS *m, Context *c) { assert(0); }
+  virtual void replay(MDS *m) { assert(0); }
 
 };
 
+inline ostream& operator<<(ostream& out, LogEvent& le) {
+  le.print(out);
+  return out;
+}
+
 #endif
index 4b0e77396ffb5c823502ee9d341145da0bacd51f..e8061a98fe56c03ff8ebd39c442e2b7efb5b1f11 100644 (file)
@@ -100,8 +100,8 @@ void MDLog::write_head(Context *c)
 void MDLog::submit_entry( LogEvent *e,
                          Context *c ) 
 {
-  dout(5) << "submit_entry at " << num_events << endl;
-
+  dout(5) << "submit_entry " << journaler->get_write_pos() << " : " << *e << endl;
+  
   if (g_conf.mds_log) {
     // encode it, with event type
     bufferlist bl;
@@ -196,7 +196,7 @@ void MDLog::_did_read()
 
 void MDLog::_trimmed(LogEvent *le) 
 {
-  dout(7) << "  trimmed " << le << endl;
+  dout(7) << "  trimmed " << *le << endl;
   
   assert(le->can_expire(mds));
 
@@ -246,14 +246,14 @@ void MDLog::trim(Context *c)
       // we just read an event.
       if (le->can_expire(mds) == true) {
         // obsolete
-        dout(7) << "trim  obsolete " << le << endl;
+        dout(7) << "trim  obsolete " << *le << endl;
         delete le;
         logger->inc("obs");
       } else {
         assert ((int)trimming.size() < g_conf.mds_log_max_trimming);
 
         // trim!
-        dout(7) << "trim  trimming " << le << endl;
+        dout(7) << "trim  trimming " << *le << endl;
         trimming[le->_end_off] = le;
         le->retire(mds, new C_MDL_Trimmed(this, le));
         logger->inc("retire");
@@ -286,3 +286,86 @@ void MDLog::trim(Context *c)
 }
 
 
+void MDLog::replay(Context *c)
+{
+  assert(journaler->is_active());
+
+  // start reading at the last known expire point.
+  journaler->set_read_pos( journaler->get_expire_pos() );
+
+  // empty?
+  if (journaler->get_read_pos() == journaler->get_write_pos()) {
+    dout(10) << "replay - journal empty, done." << endl;
+    if (c) {
+      c->finish(0);
+      delete c;
+    }
+    return;
+  }
+
+  // add waiter
+  if (c)
+    waitfor_replay.push_back(c);
+
+  // go!
+  dout(10) << "replay start, from " << journaler->get_read_pos()
+          << " to " << journaler->get_write_pos() << endl;
+
+  assert(num_events == 0);
+
+  _replay(); 
+}
+
+class C_MDL_Replay : public Context {
+  MDLog *mdlog;
+public:
+  C_MDL_Replay(MDLog *l) : mdlog(l) {}
+  void finish(int r) { mdlog->_replay(); }
+};
+
+void MDLog::_replay()
+{
+  // read what's buffered
+  while (journaler->is_readable() &&
+        journaler->get_read_pos() < journaler->get_write_pos()) {
+    // read it
+    off_t pos = journaler->get_read_pos();
+    bufferlist bl;
+    bool r = journaler->try_read_entry(bl);
+    assert(r);
+    
+    // unpack event
+    LogEvent *le = LogEvent::decode(bl);
+    num_events++;
+
+    if (le->has_happened(mds)) {
+      dout(10) << "_replay " << pos << " / " << journaler->get_write_pos() 
+              << " : " << *le << " : already happened" << endl;
+    } else {
+      dout(10) << "_replay " << pos << " / " << journaler->get_write_pos() 
+              << " : " << *le << " : applying" << endl;
+      le->replay(mds);
+    }
+    delete le;
+  }
+
+  // wait for read?
+  if (journaler->get_read_pos() < journaler->get_write_pos()) {
+    journaler->wait_for_readable(new C_MDL_Replay(this));
+    return;    
+  }
+
+  // done!
+  assert(journaler->get_read_pos() == journaler->get_write_pos());
+  dout(10) << "_replay - complete" << endl;
+
+  // move read pointer _back_ to expire pos, for eventual trimming
+  journaler->set_read_pos(journaler->get_expire_pos());
+
+  // kick waiter(s)
+  list<Context*> ls;
+  ls.swap(waitfor_replay);
+  finish_contexts(ls,0);  
+}
+
+
index 7ff64f81d904526495ab6c8fcc2d7180bd132801..37329a164e7816346999308931b8542b3fc51a62 100644 (file)
@@ -50,6 +50,7 @@ class MDLog {
 
   inode_t log_inode;
   Journaler *journaler;
+
   
   //hash_map<LogEvent*>  trimming;       // events currently being trimmed
   map<off_t, LogEvent*> trimming;
@@ -61,6 +62,8 @@ class MDLog {
 
   Logger *logger;
   
+  list<Context*> waitfor_replay;
+
  public:
   MDLog(MDS *m);
   ~MDLog();
@@ -80,6 +83,9 @@ class MDLog {
   void reset();  // fresh, empty log! 
   void open(Context *onopen);
   void write_head(Context *onfinish);
+
+  void replay(Context *onfinish);
+  void _replay();
 };
 
 #endif
index bd0bf4d3ca5d0e5a42fe21bf885e2a604c76f205..6e7f50775e1b6f6096f67c070815f85ed1028a9a 100644 (file)
@@ -339,8 +339,8 @@ void MDS::boot_recover(int step)
     
   case 4:
     dout(2) << "boot_recover " << step << ": replaying mds log" << endl;
-
-    //break;
+    mdlog->replay(new C_MDS_BootRecover(this, 5));
+    break;
 
   case 5:
     dout(2) << "boot_recover " << step << ": done." << endl;
@@ -493,11 +493,13 @@ void MDS::my_dispatch(Message *m)
   */
 
 
-  // flush log to disk after every op.  for now.
-  mdlog->flush();
+  if (is_active()) {
+    // flush log to disk after every op.  for now.
+    mdlog->flush();
 
-  // trim cache
-  mdcache->trim();
+    // trim cache
+    mdcache->trim();
+  }
   
   // finish any triggered contexts
   if (finished_queue.size()) {
@@ -508,7 +510,6 @@ void MDS::my_dispatch(Message *m)
     finish_contexts(ls);
   }
 
-
   
 
   // hash root?
@@ -525,7 +526,8 @@ void MDS::my_dispatch(Message *m)
   // periodic crap (1-second resolution)
   static utime_t last_log = g_clock.recent_now();
   utime_t now = g_clock.recent_now();
-  if (last_log.sec() != now.sec()) {
+  if (is_active() && 
+      last_log.sec() != now.sec()) {
 
     // log
     last_log = now;
index 5cdfa4355cf99a0298512f9a148e275ca93df31d..9959e5704bfbe7456953bac76fd8ce8a537193eb 100644 (file)
@@ -46,6 +46,14 @@ class EAlloc : public LogEvent {
   }
 
 
+  void print(ostream& out) {
+       if (what == EALLOC_EV_ALLOC) 
+         out << "alloc " << hex << id << dec << " tablev " << table_version;
+       else
+         out << "dealloc " << hex << id << dec << " tablev " << table_version;
+  }
+  
+
   // live journal
   bool can_expire(MDS *mds) {
        if (mds->idalloc->get_committed_version() <= table_version)
@@ -55,24 +63,28 @@ class EAlloc : public LogEvent {
   }
 
   void retire(MDS *mds, Context *c) {
-       mds->idalloc->save(c);
+       mds->idalloc->save(c, table_version);
   }
 
 
   // recovery
   bool has_happened(MDS *mds) {
-       return (mds->idalloc->get_version() >= table_version);
+       if (mds->idalloc->get_version() >= table_version) {
+         cout << " event " << table_version << " <= table " << mds->idalloc->get_version() << endl;
+         return true;
+       } else 
+         return false;
   }
 
-  void replay(MDS *mds, Context *c) {
+  void replay(MDS *mds) {
        assert(table_version-1 == mds->idalloc->get_version());
        
        if (what == EALLOC_EV_ALLOC) {
-         idno_t nid = mds->idalloc->alloc_id();
+         idno_t nid = mds->idalloc->alloc_id(true);
          assert(nid == id);       // this should match.
        } 
        else if (what == EALLOC_EV_FREE) {
-         mds->idalloc->reclaim_id(id);
+         mds->idalloc->reclaim_id(id, true);
        } 
        else
          assert(0);