dout(10) << " " << subfrags[0]->fnode.accounted_fragstat << dendl;
purge_stolen(waiters, replay);
- inode->close_dirfrag(frag); // selft deletion, watch out.
}
void CDir::merge(list<CDir*>& subs, list<Context*>& waiters, bool replay)
if (p->second > cur)
replica_map[p->first] = p->second;
}
-
+
+ // merge version
+ if (dir->get_version() > get_version())
+ set_version(dir->get_version());
+
+ if (fnode.fragstat.version == 0)
+ fnode.fragstat.version = dir->fnode.fragstat.version;
+ if (fnode.rstat.version == 0)
+ fnode.rstat.version = dir->fnode.rstat.version;
+
// merge state
state_set(dir->get_state() & MASK_STATE_FRAGMENT_KEPT);
dir_auth = dir->dir_auth;
// split
CDir *baseparent = diri->get_parent_dir();
- for (list<CDir*>::iterator p = srcfrags.begin();
- p != srcfrags.end();
- p++) {
- CDir *dir = *p;
-
- if (bits > 0) {
+ if (bits > 0) {
+ // SPLIT
+ assert(srcfrags.size() == 1);
+ CDir *dir = srcfrags.front();
- dir->split(bits, resultfrags, waiters, replay);
+ dir->split(bits, resultfrags, waiters, replay);
- // did i change the subtree map?
- if (dir->is_subtree_root()) {
- // new frags are now separate subtrees
+ // did i change the subtree map?
+ if (dir->is_subtree_root()) {
+ // new frags are now separate subtrees
+ for (list<CDir*>::iterator p = resultfrags.begin();
+ p != resultfrags.end();
+ ++p)
+ subtrees[*p].clear(); // new frag is now its own subtree
+
+ // was i a bound?
+ if (baseparent) {
+ CDir *parent = get_subtree_root(baseparent);
+ assert(subtrees[parent].count(dir));
+ subtrees[parent].erase(dir);
for (list<CDir*>::iterator p = resultfrags.begin();
p != resultfrags.end();
++p)
- subtrees[*p].clear(); // new frag is now its own subtree
-
- // was i a bound?
- if (baseparent) {
- CDir *parent = get_subtree_root(baseparent);
- assert(subtrees[parent].count(dir));
- subtrees[parent].erase(dir);
- for (list<CDir*>::iterator p = resultfrags.begin();
- p != resultfrags.end();
- ++p)
- subtrees[parent].insert(*p);
- }
+ subtrees[parent].insert(*p);
+ }
+
+ // adjust my bounds.
+ set<CDir*> bounds;
+ bounds.swap(subtrees[dir]);
+ subtrees.erase(dir);
+ for (set<CDir*>::iterator p = bounds.begin();
+ p != bounds.end();
+ ++p) {
+ CDir *frag = get_subtree_root((*p)->get_parent_dir());
+ subtrees[frag].insert(*p);
+ }
+
+ show_subtrees(10);
+ }
+
+ diri->close_dirfrag(dir->get_frag());
+
+ } else {
+ // MERGE
- // adjust my bounds.
- set<CDir*> bounds;
- bounds.swap(subtrees[dir]);
- subtrees.erase(dir);
- for (set<CDir*>::iterator p = bounds.begin();
- p != bounds.end();
- ++p) {
- CDir *frag = get_subtree_root((*p)->get_parent_dir());
- subtrees[frag].insert(*p);
+ // are my constituent bits subtrees? if so, i will be too.
+ // (it's all or none, actually.)
+ bool was_subtree = false;
+ set<CDir*> new_bounds;
+ for (list<CDir*>::iterator p = srcfrags.begin(); p != srcfrags.end(); p++) {
+ CDir *dir = *p;
+ if (dir->is_subtree_root()) {
+ dout(10) << " taking srcfrag subtree bounds from " << *dir << dendl;
+ was_subtree = true;
+ map<CDir*, set<CDir*> >::iterator q = subtrees.find(dir);
+ set<CDir*>::iterator r = q->second.begin();
+ while (r != subtrees[dir].end()) {
+ new_bounds.insert(*r);
+ subtrees[dir].erase(r++);
}
+ subtrees.erase(q);
- show_subtrees(10);
+ // remove myself as my parent's bound
+ if (baseparent)
+ subtrees[baseparent].erase(dir);
}
- } else {
- // merge
- CDir *f = new CDir(diri, basefrag, this, srcfrags.front()->is_auth());
- f->merge(srcfrags, waiters, replay);
-
- assert(0 == "fix subtree map...not implemented");
}
+
+ // merge
+ CDir *f = new CDir(diri, basefrag, this, srcfrags.front()->is_auth());
+ f->merge(srcfrags, waiters, replay);
+ diri->add_dirfrag(f);
+
+ if (was_subtree) {
+ subtrees[f].swap(new_bounds);
+ if (baseparent)
+ subtrees[baseparent].insert(f);
+
+ show_subtrees(10);
+ }
+
+ resultfrags.push_back(f);
}
}