git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: delete orphan dirfrags after fragmenting directory
author Yan, Zheng <zheng.z.yan@intel.com>
Thu, 19 Sep 2013 03:07:17 +0000 (11:07 +0800)
committer Yan, Zheng <zheng.z.yan@intel.com>
Fri, 4 Oct 2013 09:29:20 +0000 (17:29 +0800)
Delete old dirfrags after the EFragment::OP_COMMIT event is logged.

Signed-off-by: Yan, Zheng <zheng.z.yan@intel.com>
src/mds/MDCache.cc
src/mds/MDCache.h
src/mds/journal.cc

index 91fadb73df90f6a40461e72e43255c78f5a69e96..8e9dd89db71b787aac55ae68c5e3a937c16a8ab6 100644 (file)
@@ -11051,6 +11051,20 @@ public:
   }
 };
 
+class C_MDC_FragmentCommit : public Context {
+  MDCache *mdcache;
+  dirfrag_t basedirfrag;
+  list<CDir*> resultfrags;
+public:
+  C_MDC_FragmentCommit(MDCache *m, inodeno_t ino, frag_t f, list<CDir*>& l) :
+    mdcache(m), basedirfrag(ino, f) {
+    resultfrags.swap(l);
+  }
+  virtual void finish(int r) {
+    mdcache->_fragment_committed(basedirfrag, resultfrags);
+  }
+};
+
 void MDCache::fragment_frozen(list<CDir*>& dirs, frag_t basefrag, int bits)
 {
   dout(10) << "fragment_frozen " << dirs << " " << basefrag << " by " << bits
@@ -11111,6 +11125,10 @@ void MDCache::dispatch_fragment_dir(MDRequest *mdr)
                                info.basefrag, info.bits);
   mds->mdlog->start_entry(le);
 
+  list<frag_t> old_frags;
+  for (list<CDir*>::iterator p = info.dirs.begin(); p != info.dirs.end(); ++p)
+    old_frags.push_back((*p)->get_frag());
+
   // refragment
   list<Context*> waiters;
   adjust_dir_fragments(diri, info.dirs, info.basefrag, info.bits,
@@ -11149,10 +11167,12 @@ void MDCache::dispatch_fragment_dir(MDRequest *mdr)
     le->metablob.add_dir(dir, false);
 
     // freeze and store them too
+    dir->auth_pin(this);
     dir->state_set(CDir::STATE_FRAGMENTING);
     dir->commit(0, gather.new_sub(), true);  // ignore authpinnability
   }
 
+  add_uncommitted_fragment(dirfrag_t(diri->ino(), info.basefrag), info.bits, old_frags);
   mds->mdlog->submit_entry(le, gather.new_sub());
   mds->mdlog->flush();
   gather.activate();
@@ -11166,11 +11186,6 @@ void MDCache::fragment_logged_and_stored(MDRequest *mdr)
 
   dout(10) << "fragment_logged_and_stored " << info.resultfrags << " " << info.basefrag
           << " bits " << info.bits << " on " << *diri << dendl;
-  
-  // journal commit
-  EFragment *le = new EFragment(mds->mdlog, EFragment::OP_COMMIT, diri->ino(),
-                               info.basefrag, info.bits);
-  mds->mdlog->start_submit_entry(le);
 
   // tell peers
   CDir *first = *info.resultfrags.begin();
@@ -11199,9 +11214,6 @@ void MDCache::fragment_logged_and_stored(MDRequest *mdr)
        ++p) {
     CDir *dir = *p;
     dout(10) << " result frag " << *dir << dendl;
-    
-    // unmark, unfreeze
-    dir->state_clear(CDir::STATE_FRAGMENTING);  
 
     for (CDir::map_t::iterator p = dir->items.begin();
         p != dir->items.end();
@@ -11212,13 +11224,65 @@ void MDCache::fragment_logged_and_stored(MDRequest *mdr)
       dn->put(CDentry::PIN_FRAGMENTING);
     }
 
+    // unfreeze
     dir->unfreeze_dir();
   }
 
+  // journal commit
+  EFragment *le = new EFragment(mds->mdlog, EFragment::OP_COMMIT,
+                               diri->ino(), info.basefrag, info.bits);
+  mds->mdlog->start_submit_entry(le, new C_MDC_FragmentCommit(this, diri->ino(), info.basefrag,
+                                                             info.resultfrags));
+
   fragment_requests.erase(mdr->reqid);
   request_finish(mdr);
 }
 
+void MDCache::_fragment_committed(dirfrag_t basedirfrag, list<CDir*>& resultfrags)
+{
+  dout(10) << "fragment_committed " << basedirfrag << dendl;
+  assert(uncommitted_fragments.count(basedirfrag));
+  ufragment &uf = uncommitted_fragments[basedirfrag];
+
+  // remove old frags
+  C_GatherBuilder gather(g_ceph_context, new C_MDC_FragmentFinish(this, basedirfrag, resultfrags));
+
+  SnapContext nullsnapc;
+  object_locator_t oloc(mds->mdsmap->get_metadata_pool());
+  for (list<frag_t>::iterator p = uf.old_frags.begin();
+       p != uf.old_frags.end();
+       ++p) {
+    object_t oid = CInode::get_object_name(basedirfrag.ino, *p, "");
+    ObjectOperation op;
+    if (*p == frag_t()) {
+      // backtrace object
+      dout(10) << " truncate orphan dirfrag " << oid << dendl;
+      op.truncate(0);
+    } else {
+      dout(10) << " removing orphan dirfrag " << oid << dendl;
+      op.remove();
+    }
+    mds->objecter->mutate(oid, oloc, op, nullsnapc, ceph_clock_now(g_ceph_context),
+                         0, NULL, gather.new_sub());
+  }
+
+  assert(gather.has_subs());
+  gather.activate();
+}
+
+void MDCache::_fragment_finish(dirfrag_t basedirfrag, list<CDir*>& resultfrags)
+{
+  dout(10) << "fragment_finish " << basedirfrag << dendl;
+  assert(uncommitted_fragments.count(basedirfrag));
+
+  // unmark & auth_unpin
+  for (list<CDir*>::iterator p = resultfrags.begin(); p != resultfrags.end(); ++p) {
+    (*p)->state_clear(CDir::STATE_FRAGMENTING);
+    (*p)->auth_unpin(this);
+  }
+
+  finish_uncommitted_fragment(basedirfrag);
+}
 
 /* This function DOES put the passed message before returning */
 void MDCache::handle_fragment_notify(MMDSFragmentNotify *notify)
@@ -11264,23 +11328,40 @@ void MDCache::handle_fragment_notify(MMDSFragmentNotify *notify)
   notify->put();
 }
 
+void MDCache::add_uncommitted_fragment(dirfrag_t basedirfrag, int bits, list<frag_t>& old_frags)
+{
+  dout(10) << "add_uncommitted_fragment: base dirfrag " << basedirfrag << " bits " << bits << dendl;
+  assert(!uncommitted_fragments.count(basedirfrag));
+  ufragment& uf = uncommitted_fragments[basedirfrag];
+  uf.old_frags = old_frags;
+  uf.bits = bits;
+}
+
+void MDCache::finish_uncommitted_fragment(dirfrag_t basedirfrag)
+{
+  dout(10) << "finish_uncommitted_fragments: base dirfrag " << basedirfrag << dendl;
+  if (uncommitted_fragments.count(basedirfrag)) {
+    uncommitted_fragments.erase(basedirfrag);
+  }
+}
 
 void MDCache::rollback_uncommitted_fragments()
 {
   dout(10) << "rollback_uncommitted_fragments: " << uncommitted_fragments.size() << " pending" << dendl;
-  for (set< pair<dirfrag_t,int> >::iterator p = uncommitted_fragments.begin();
+  for (map<dirfrag_t, ufragment>::iterator p = uncommitted_fragments.begin();
        p != uncommitted_fragments.end();
        ++p) {
+    ufragment &uf = p->second;
     CInode *diri = get_inode(p->first.ino);
     assert(diri);
-    dout(10) << " rolling back " << p->first << " refragment by " << p->second << " bits" << dendl;
+    dout(10) << " rolling back " << p->first << " refragment by " << uf.bits << " bits" << dendl;
     list<CDir*> resultfrags;
     list<Context*> waiters;
-    adjust_dir_fragments(diri, p->first.frag, -p->second, resultfrags, waiters, true);
+    adjust_dir_fragments(diri, p->first.frag, -uf.bits, resultfrags, waiters, true);
     if (g_conf->mds_debug_frag)
       diri->verify_dirfrags();
 
-    EFragment *le = new EFragment(mds->mdlog, EFragment::OP_ROLLBACK, diri->ino(), p->first.frag, p->second);
+    EFragment *le = new EFragment(mds->mdlog, EFragment::OP_ROLLBACK, diri->ino(), p->first.frag, uf.bits);
     mds->mdlog->start_submit_entry(le);
   }
   uncommitted_fragments.clear();
index cb219360ccfcff3cd4287bdbd5b78663175a39a7..8560ce481fbc2e73ca929136d2dd7312cab65357 100644 (file)
@@ -942,10 +942,14 @@ protected:
 
 
   // -- fragmenting --
-public:
-  set< pair<dirfrag_t,int> > uncommitted_fragments;  // prepared but uncommitted refragmentations
-
 private:
+  struct ufragment {
+    int bits;
+    list<frag_t> old_frags;
+    ufragment() : bits(0) {}
+  };
+  map<dirfrag_t, ufragment> uncommitted_fragments;
+
   struct fragment_info_t {
     frag_t basefrag;
     int bits;
@@ -981,6 +985,9 @@ private:
   void fragment_unmark_unfreeze_dirs(list<CDir*>& dirs);
   void dispatch_fragment_dir(MDRequest *mdr);
   void fragment_logged_and_stored(MDRequest *mdr);
+  void _fragment_committed(dirfrag_t f, list<CDir*>& resultfrags);
+  void _fragment_finish(dirfrag_t f, list<CDir*>& resultfrags);
+
 public:
   void rollback_uncommitted_fragments();
 private:
@@ -988,9 +995,12 @@ private:
   friend class C_MDC_FragmentFrozen;
   friend class C_MDC_FragmentMarking;
   friend class C_MDC_FragmentLoggedAndStored;
+  friend class C_MDC_FragmentCommit;
 
   void handle_fragment_notify(MMDSFragmentNotify *m);
 
+  void add_uncommitted_fragment(dirfrag_t basedirfrag, int bits, list<frag_t>& old_frag);
+  void finish_uncommitted_fragment(dirfrag_t basedirfrag);
 
   // -- updates --
   //int send_inode_updates(CInode *in);
index aeff07eb905e3bf81b3127db63194d34eb269e9b..49fdb9ce5dab07e3f0c05c5896bedff7f441f652 100644 (file)
@@ -2381,6 +2381,7 @@ void EFragment::replay(MDS *mds)
 
   list<CDir*> resultfrags;
   list<Context*> waiters;
+  list<frag_t> old_frags;
   pair<dirfrag_t,int> desc(dirfrag_t(ino,basefrag), bits);
 
   // in may be NULL if it wasn't in our cache yet.  if it's a prepare
@@ -2390,26 +2391,23 @@ void EFragment::replay(MDS *mds)
 
   switch (op) {
   case OP_PREPARE:
-    mds->mdcache->uncommitted_fragments.insert(desc);
+    mds->mdcache->add_uncommitted_fragment(dirfrag_t(ino, basefrag), bits, old_frags);
     // fall-thru
   case OP_ONESHOT:
     if (in)
       mds->mdcache->adjust_dir_fragments(in, basefrag, bits, resultfrags, waiters, true);
     break;
 
-  case OP_COMMIT:
-    mds->mdcache->uncommitted_fragments.erase(desc);
-    break;
-
   case OP_ROLLBACK:
-    if (mds->mdcache->uncommitted_fragments.count(desc)) {
-      mds->mdcache->uncommitted_fragments.erase(desc);
-      assert(in);
+    if (in)
       mds->mdcache->adjust_dir_fragments(in, basefrag, -bits, resultfrags, waiters, true);
-    } else {
-      dout(10) << " no record of prepare for " << desc << dendl;
-    }
+    // fall-thru
+  case OP_COMMIT:
+    mds->mdcache->finish_uncommitted_fragment(dirfrag_t(ino, basefrag));
     break;
+
+  default:
+    assert(0);
   }
   metablob.replay(mds, _segment);
   if (in && g_conf->mds_debug_frag)