]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: fix split use after free; merge works
authorSage Weil <sage@newdream.net>
Wed, 27 Oct 2010 03:06:14 +0000 (20:06 -0700)
committerSage Weil <sage@newdream.net>
Wed, 27 Oct 2010 03:06:14 +0000 (20:06 -0700)
Signed-off-by: Sage Weil <sage@newdream.net>
src/mds/CDir.cc
src/mds/MDCache.cc

index f0b66744b1f15751d7c4e19af2db0e44d71f951f..f11cfbf128508f2f24aaa7ad3c4f50bcd387be5f 100644 (file)
@@ -740,7 +740,6 @@ void CDir::split(int bits, list<CDir*>& subs, list<Context*>& waiters, bool repl
   dout(10) << "               " << subfrags[0]->fnode.accounted_fragstat << dendl;
 
   purge_stolen(waiters, replay);
-  inode->close_dirfrag(frag); // selft deletion, watch out.
 }
 
 void CDir::merge(list<CDir*>& subs, list<Context*>& waiters, bool replay)
@@ -764,7 +763,16 @@ void CDir::merge(list<CDir*>& subs, list<Context*>& waiters, bool replay)
       if (p->second > cur)
        replica_map[p->first] = p->second;
     }
-    
+
+    // merge version
+    if (dir->get_version() > get_version())
+      set_version(dir->get_version());
+
+    if (fnode.fragstat.version == 0)
+      fnode.fragstat.version = dir->fnode.fragstat.version;
+    if (fnode.rstat.version == 0)
+      fnode.rstat.version = dir->fnode.rstat.version;
+
     // merge state
     state_set(dir->get_state() & MASK_STATE_FRAGMENT_KEPT);
     dir_auth = dir->dir_auth;
index 3531d80e52b58e141159ef4e0a4bc043f614dec6..ec9b46c0e27bf47004df01ba2f3f8ae012a342da 100644 (file)
@@ -8468,54 +8468,88 @@ void MDCache::adjust_dir_fragments(CInode *diri,
   // split
   CDir *baseparent = diri->get_parent_dir();
 
-  for (list<CDir*>::iterator p = srcfrags.begin(); 
-       p != srcfrags.end();
-       p++) {
-    CDir *dir = *p;
-
-    if (bits > 0) {
+  if (bits > 0) {
+    // SPLIT
+    assert(srcfrags.size() == 1);
+    CDir *dir = srcfrags.front();
 
-      dir->split(bits, resultfrags, waiters, replay);
+    dir->split(bits, resultfrags, waiters, replay);
 
-      // did i change the subtree map?
-      if (dir->is_subtree_root()) {
-       // new frags are now separate subtrees
+    // did i change the subtree map?
+    if (dir->is_subtree_root()) {
+      // new frags are now separate subtrees
+      for (list<CDir*>::iterator p = resultfrags.begin();
+          p != resultfrags.end();
+          ++p)
+       subtrees[*p].clear();   // new frag is now its own subtree
+      
+      // was i a bound?
+      if (baseparent) {
+       CDir *parent = get_subtree_root(baseparent);
+       assert(subtrees[parent].count(dir));
+       subtrees[parent].erase(dir);
        for (list<CDir*>::iterator p = resultfrags.begin();
             p != resultfrags.end();
             ++p)
-         subtrees[*p].clear();   // new frag is now its own subtree
-
-       // was i a bound?
-       if (baseparent) {
-         CDir *parent = get_subtree_root(baseparent);
-         assert(subtrees[parent].count(dir));
-         subtrees[parent].erase(dir);
-         for (list<CDir*>::iterator p = resultfrags.begin();
-              p != resultfrags.end();
-              ++p)
-           subtrees[parent].insert(*p);
-       }
+         subtrees[parent].insert(*p);
+      }
+      
+      // adjust my bounds.
+      set<CDir*> bounds;
+      bounds.swap(subtrees[dir]);
+      subtrees.erase(dir);
+      for (set<CDir*>::iterator p = bounds.begin();
+          p != bounds.end();
+          ++p) {
+       CDir *frag = get_subtree_root((*p)->get_parent_dir());
+       subtrees[frag].insert(*p);
+      }
+      
+      show_subtrees(10);
+    }
+    
+    diri->close_dirfrag(dir->get_frag());
+    
+  } else {
+    // MERGE
 
-       // adjust my bounds.
-       set<CDir*> bounds;
-       bounds.swap(subtrees[dir]);
-       subtrees.erase(dir);
-       for (set<CDir*>::iterator p = bounds.begin();
-            p != bounds.end();
-            ++p) {
-         CDir *frag = get_subtree_root((*p)->get_parent_dir());
-         subtrees[frag].insert(*p);
+    // are my constituent bits subtrees?  if so, i will be too.
+    // (it's all or none, actually.)
+    bool was_subtree = false;
+    set<CDir*> new_bounds;
+    for (list<CDir*>::iterator p = srcfrags.begin(); p != srcfrags.end(); p++) {
+      CDir *dir = *p;
+      if (dir->is_subtree_root()) {
+       dout(10) << " taking srcfrag subtree bounds from " << *dir << dendl;
+       was_subtree = true;
+       map<CDir*, set<CDir*> >::iterator q = subtrees.find(dir);
+       set<CDir*>::iterator r = q->second.begin();
+       while (r != subtrees[dir].end()) {
+         new_bounds.insert(*r);
+         subtrees[dir].erase(r++);
        }
+       subtrees.erase(q);
        
-       show_subtrees(10);
+       // remove myself as my parent's bound
+       if (baseparent)
+         subtrees[baseparent].erase(dir);
       }
-    } else {
-      // merge
-      CDir *f = new CDir(diri, basefrag, this, srcfrags.front()->is_auth());
-      f->merge(srcfrags, waiters, replay);
-
-      assert(0 == "fix subtree map...not implemented");
     }
+    
+    // merge
+    CDir *f = new CDir(diri, basefrag, this, srcfrags.front()->is_auth());
+    f->merge(srcfrags, waiters, replay);
+    diri->add_dirfrag(f);
+
+    if (was_subtree) {
+      subtrees[f].swap(new_bounds);
+      if (baseparent)
+       subtrees[baseparent].insert(f);
+      
+      show_subtrees(10);
+    }
+
+    resultfrags.push_back(f);
   }
 }