]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: change refragment journaling/store strategy
authorSage Weil <sage@newdream.net>
Wed, 5 Jan 2011 23:31:06 +0000 (15:31 -0800)
committerSage Weil <sage@newdream.net>
Thu, 6 Jan 2011 00:48:17 +0000 (16:48 -0800)
We had a serious problem before where we were updating the cache and
redivvying up the dentries among fragments, but not immediately
journaling it.  This was okay only if we were lucky and no other update
journaled something (e.g. some random child journaling its ancestors).

Instead, journal (PREPARE) immediately and in parallel with the new
dirfrag stores.  When the stores complete, journal again (COMMIT).  On
journal replay, for any PREPAREs without matching COMMITS we immediately
journal a ROLLBACK.

Other behavior is essentially unchanged.  We don't send the notify until
both the PREPARE and STORES complete.  But that part doesn't really matter:
if we restart and rollback, peers will find out during resolve/rejoin,
as before.

Signed-off-by: Sage Weil <sage@newdream.net>
src/mds/CDir.cc
src/mds/MDCache.cc
src/mds/MDCache.h
src/mds/MDS.cc
src/mds/MDS.h
src/mds/events/EFragment.h
src/mds/journal.cc

index d146c67b60e12808e13b2121000e648ae57b21fc..d3b0b88ff5a124b15d58fa7efd612b748b2e0a8d 100644 (file)
@@ -784,7 +784,7 @@ void CDir::merge(list<CDir*>& subs, list<Context*>& waiters, bool replay)
   for (list<CDir*>::iterator p = subs.begin(); p != subs.end(); p++) {
     CDir *dir = *p;
     dout(10) << " subfrag " << dir->get_frag() << " " << *dir << dendl;
-    assert(!dir->is_auth() || dir->is_complete());
+    assert(!dir->is_auth() || dir->is_complete() || replay);
     
     // steal dentries
     while (!dir->items.empty()) 
index 621a43a326cb361b50c0ec39edbc16916b25f927..fbecbd530302cb0d517bf23fa06af0a4f2ffe6c4 100644 (file)
@@ -8947,7 +8947,11 @@ void MDCache::merge_dir(CInode *diri, frag_t frag)
     return;
   }
 
-  C_Gather *gather = new C_Gather(new C_MDC_FragmentFrozen(this, dirs, frag, 0));
+  CDir *first = dirs.front();
+  int bits = first->get_frag().bits() - frag.bits();
+  dout(10) << " we are merginb by " << bits << " bits" << dendl;
+
+  C_Gather *gather = new C_Gather(new C_MDC_FragmentFrozen(this, dirs, frag, bits));
   fragment_freeze_dirs(dirs, gather);
 
   // initial mark+complete pass
@@ -9039,16 +9043,17 @@ void MDCache::fragment_unmark_unfreeze_dirs(list<CDir*>& dirs)
   }
 }
 
-class C_MDC_FragmentStored : public Context {
+class C_MDC_FragmentLoggedAndStored : public Context {
   MDCache *mdcache;
-  list<CDir*> dirs;
+  Mutation *mut;
+  list<CDir*> resultfrags;
   frag_t basefrag;
   int bits;
 public:
-  C_MDC_FragmentStored(MDCache *m, list<CDir*>& d, frag_t bf, int b) :
-    mdcache(m), dirs(d), basefrag(bf), bits(b) {}
+  C_MDC_FragmentLoggedAndStored(MDCache *m, Mutation *mu, list<CDir*>& r, frag_t bf, int bi) : 
+    mdcache(m), mut(mu), resultfrags(r), basefrag(bf), bits(bi) {}
   virtual void finish(int r) {
-    mdcache->fragment_stored(dirs, basefrag, bits);
+    mdcache->fragment_logged_and_stored(mut, resultfrags, basefrag, bits);
   }
 };
 
@@ -9059,7 +9064,7 @@ void MDCache::fragment_frozen(list<CDir*>& dirs, frag_t basefrag, int bits)
   if (bits > 0) {
     assert(dirs.size() == 1);
   } else {
-    assert(bits == 0);
+    assert(bits < 0);
   }
 
   dout(10) << "fragment_frozen " << dirs << " " << basefrag << " by " << bits 
@@ -9083,44 +9088,11 @@ void MDCache::fragment_frozen(list<CDir*>& dirs, frag_t basefrag, int bits)
   adjust_dir_fragments(diri, dirs, basefrag, bits, resultfrags, waiters, false);
   mds->queue_waiters(waiters);
 
-  C_Gather *gather = new C_Gather(new C_MDC_FragmentStored(this, resultfrags, basefrag, bits));
-
-  // freeze, store resulting frags
-  for (list<CDir*>::iterator p = resultfrags.begin();
-       p != resultfrags.end();
-       p++) {
-    CDir *dir = *p;
-    dout(10) << " result frag " << *dir << dendl;
-    dir->state_set(CDir::STATE_FRAGMENTING);
-    dir->commit(0, gather->new_sub(), true);  // ignore authpinnability
-    dir->_freeze_dir();
-  }  
-}
-
-class C_MDC_FragmentLogged : public Context {
-  MDCache *mdcache;
-  Mutation *mut;
-  list<CDir*> resultfrags;
-  frag_t basefrag;
-  int bits;
-public:
-  C_MDC_FragmentLogged(MDCache *m, Mutation *mu, list<CDir*>& r, frag_t bf, int bi) : 
-    mdcache(m), mut(mu), resultfrags(r), basefrag(bf), bits(bi) {}
-  virtual void finish(int r) {
-    mdcache->fragment_logged(mut, resultfrags, basefrag, bits);
-  }
-};
-
-void MDCache::fragment_stored(list<CDir*>& resultfrags, frag_t basefrag, int bits)
-{
-  CInode *diri = resultfrags.front()->get_inode();
-  dout(10) << "fragment_stored " << resultfrags << " " << basefrag << " by " << bits 
-          << " on " << *diri << dendl;
-
+  // journal
   Mutation *mut = new Mutation;
 
   mut->ls = mds->mdlog->get_current_segment();
-  EFragment *le = new EFragment(mds->mdlog, diri->ino(), basefrag, bits);
+  EFragment *le = new EFragment(mds->mdlog, EFragment::OP_PREPARE, diri->ino(), basefrag, bits);
   mds->mdlog->start_entry(le);
 
   le->metablob.add_dir_context(*resultfrags.begin());
@@ -9142,27 +9114,38 @@ void MDCache::fragment_stored(list<CDir*>& resultfrags, frag_t basefrag, int bit
   mut->add_updated_lock(&diri->nestlock);
   */
 
-  // journal new dirfrag fragstats for each new fragment.
+  // freeze, journal, and store resulting frags
+  C_Gather *gather = new C_Gather(new C_MDC_FragmentLoggedAndStored(this, mut, resultfrags, basefrag, bits));
+
   for (list<CDir*>::iterator p = resultfrags.begin();
        p != resultfrags.end();
        p++) {
     CDir *dir = *p;
     dout(10) << " result frag " << *dir << dendl;
     le->metablob.add_dir(dir, false);
+
+    // freeze and store them too
+    dir->state_set(CDir::STATE_FRAGMENTING);
+    dir->commit(0, gather->new_sub(), true);  // ignore authpinnability
+    dir->_freeze_dir();
   }
-  
-  mds->mdlog->submit_entry(le,
-                          new C_MDC_FragmentLogged(this, mut, resultfrags, basefrag, bits));
+
+  mds->mdlog->submit_entry(le, gather->new_sub());
   mds->mdlog->flush();
 }
 
-void MDCache::fragment_logged(Mutation *mut, list<CDir*>& resultfrags, frag_t basefrag, int bits)
+void MDCache::fragment_logged_and_stored(Mutation *mut, list<CDir*>& resultfrags, frag_t basefrag, int bits)
 {
   CInode *diri = resultfrags.front()->get_inode();
 
-  dout(10) << "fragment_logged " << resultfrags << " " << basefrag << " bits " << bits 
+  dout(10) << "fragment_logged_and_stored " << resultfrags << " " << basefrag << " bits " << bits 
           << " on " << *diri << dendl;
   
+  // journal commit
+  EFragment *le = new EFragment(mds->mdlog, EFragment::OP_COMMIT, diri->ino(), basefrag, bits);
+  mds->mdlog->start_entry(le);
+  mds->mdlog->submit_entry(le);
+
   // tell peers
   CDir *first = *resultfrags.begin();
   for (map<int,int>::iterator p = first->replica_map.begin();
@@ -9240,6 +9223,21 @@ void MDCache::handle_fragment_notify(MMDSFragmentNotify *notify)
 }
 
 
+void MDCache::rollback_uncommitted_fragments()
+{
+  dout(10) << "rollback_uncommitted_fragments: " << uncommitted_fragments.size() << " pending" << dendl;
+  for (set< pair<dirfrag_t,int> >::iterator p = uncommitted_fragments.begin();
+       p != uncommitted_fragments.end();
+       ++p) {
+    CInode *diri = get_inode(p->first.ino);
+    assert(diri);
+    dout(10) << " rolling back " << p->first << " refragment by " << p->second << " bits" << dendl;
+    list<CDir*> resultfrags;
+    list<Context*> waiters;
+    adjust_dir_fragments(diri, p->first.frag, -p->second, resultfrags, waiters, true);
+  }
+  uncommitted_fragments.clear();
+}
 
 
 
index 6c2fc069c0bd98e2a3670e4ded9a59df0bc6504d..12d122bffd04da77f1236bd578da8cadef81f030 100644 (file)
@@ -1096,6 +1096,9 @@ protected:
 
 
   // -- fragmenting --
+public:
+  set< pair<dirfrag_t,int> > uncommitted_fragments;  // prepared but uncommitted refragmentations
+
 private:
   void adjust_dir_fragments(CInode *diri, frag_t basefrag, int bits,
                            list<CDir*>& frags, list<Context*>& waiters, bool replay);
@@ -1123,12 +1126,14 @@ private:
   void fragment_mark_and_complete(list<CDir*>& dirs);
   void fragment_frozen(list<CDir*>& dirs, frag_t basefrag, int bits);
   void fragment_unmark_unfreeze_dirs(list<CDir*>& dirs);
-  void fragment_stored(list<CDir*>& resultfrags, frag_t basefrag, int bits);
-  void fragment_logged(Mutation *mut, list<CDir*>& resultfrags, frag_t basefrag, int bits);
+  void fragment_logged_and_stored(Mutation *mut, list<CDir*>& resultfrags, frag_t basefrag, int bits);
+public:
+  void rollback_uncommitted_fragments();
+private:
+
   friend class C_MDC_FragmentFrozen;
   friend class C_MDC_FragmentMarking;
-  friend class C_MDC_FragmentStored;
-  friend class C_MDC_FragmentLogged;
+  friend class C_MDC_FragmentLoggedAndStored;
 
   void handle_fragment_notify(MMDSFragmentNotify *m);
 
index d795ea504dac8958bebcadf0beb4b6e7dfab273d..6a4d4334ad140cd3f1a40d42ef8339121cfdf2b0 100644 (file)
@@ -1225,13 +1225,22 @@ void MDS::replay_done()
   }
 }
 
+void MDS::reopen_log()
+{
+  dout(1) << "reopen_log" << dendl;
+
+  // start new segment
+  mdlog->start_new_segment(0);
+
+  mdcache->rollback_uncommitted_fragments();
+}
+
 
 void MDS::resolve_start()
 {
   dout(1) << "resolve_start" << dendl;
 
-  // start new segment
-  mdlog->start_new_segment(0);
+  reopen_log();
 
   mdcache->resolve_start();
 }
@@ -1246,7 +1255,7 @@ void MDS::reconnect_start()
   dout(1) << "reconnect_start" << dendl;
 
   if (last_state == MDSMap::STATE_REPLAY)
-    mdlog->start_new_segment(0);
+    reopen_log();
 
   server->reconnect_clients();
   finish_contexts(waiting_for_reconnect);
index 8b982cf7202f555c53a8925b55b94a3e4c60c4cd..a8609a8ece58ffff91948857bb0df460b1504fee 100644 (file)
@@ -353,6 +353,7 @@ class MDS : public Dispatcher {
   void creating_done();
   void starting_done();
   void replay_done();
+  void reopen_log();
 
   void resolve_start();
   void resolve_done();
index 72adf0063f7245627b0b0ed1903e49a9be43cd78..3c9a93b549d27fdb388506b17ecbe009638ed97b 100644 (file)
 class EFragment : public LogEvent {
 public:
   EMetaBlob metablob;
+  __u8 op;
   inodeno_t ino;
   frag_t basefrag;
   __s32 bits;         // positive for split (from basefrag), negative for merge (to basefrag)
 
   EFragment() : LogEvent(EVENT_FRAGMENT) { }
-  EFragment(MDLog *mdlog, inodeno_t i, frag_t bf, int b) : 
+  EFragment(MDLog *mdlog, int o, inodeno_t i, frag_t bf, int b) : 
     LogEvent(EVENT_FRAGMENT), metablob(mdlog), 
-    ino(i), basefrag(bf), bits(b) { }
+    op(o), ino(i), basefrag(bf), bits(b) { }
   void print(ostream& out) {
-    out << "EFragment " << ino << " " << basefrag << " by " << bits << " " << metablob;
+    out << "EFragment " << op_name(op) << " " << ino << " " << basefrag << " by " << bits << " " << metablob;
+  }
+
+  enum {
+    OP_PREPARE = 1,
+    OP_COMMIT = 2,
+    OP_ROLLBACK = 3,
+    OP_ONESHOT = 4,  // (legacy) PREPARE+COMMIT
+  };
+  const char *op_name(int o) {
+    switch (o) {
+    case OP_PREPARE: return "prepare";
+    case OP_COMMIT: return "commit";
+    case OP_ROLLBACK: return "rollback";
+    default: return "???";
+    }
   }
 
   void encode(bufferlist &bl) const {
-    __u8 struct_v = 2;
+    __u8 struct_v = 3;
     ::encode(struct_v, bl);
     ::encode(stamp, bl);
+    ::encode(op, bl);
     ::encode(ino, bl);
     ::encode(basefrag, bl);
     ::encode(bits, bl);
@@ -47,6 +64,10 @@ public:
     ::decode(struct_v, bl);
     if (struct_v >= 2)
       ::decode(stamp, bl);
+    if (struct_v >= 3)
+      ::decode(op, bl);
+    else
+      op = OP_ONESHOT;
     ::decode(ino, bl);
     ::decode(basefrag, bl);
     ::decode(bits, bl);
index 3cb188bd1c4aeb58edc63e7cdeffe9944eb0128a..62228a1b1cd683b8b6cbd4ae0a0cf83cbb3a0a75 100644 (file)
@@ -983,12 +983,29 @@ void ESubtreeMap::replay(MDS *mds)
 
 void EFragment::replay(MDS *mds)
 {
-  dout(10) << "EFragment.replay " << ino << " " << basefrag << " by " << bits << dendl;
+  dout(10) << "EFragment.replay " << op_name(op) << " " << ino << " " << basefrag << " by " << bits << dendl;
   CInode *in = mds->mdcache->get_inode(ino);
   if (in) {
     list<CDir*> resultfrags;
     list<Context*> waiters;
-    mds->mdcache->adjust_dir_fragments(in, basefrag, bits, resultfrags, waiters, true);
+
+    pair<dirfrag_t,int> desc(dirfrag_t(ino,basefrag), bits);
+
+    switch (op) {
+    case OP_PREPARE:
+      mds->mdcache->uncommitted_fragments.insert(desc);
+    case OP_ONESHOT:
+      mds->mdcache->adjust_dir_fragments(in, basefrag, bits, resultfrags, waiters, true);
+      break;
+
+    case OP_COMMIT:
+      mds->mdcache->uncommitted_fragments.erase(desc);
+      break;
+
+    case OP_ROLLBACK:
+      mds->mdcache->adjust_dir_fragments(in, basefrag, -bits, resultfrags, waiters, true);
+      break;
+    }
   }
   metablob.replay(mds, _segment);
 }