}
};
+class C_MDC_FragmentCommit : public Context {
+ MDCache *mdcache;
+ dirfrag_t basedirfrag;
+ list<CDir*> resultfrags;
+public:
+ C_MDC_FragmentCommit(MDCache *m, inodeno_t ino, frag_t f, list<CDir*>& l) :
+ mdcache(m), basedirfrag(ino, f) {
+ resultfrags.swap(l);
+ }
+ virtual void finish(int r) {
+ mdcache->_fragment_committed(basedirfrag, resultfrags);
+ }
+};
+
void MDCache::fragment_frozen(list<CDir*>& dirs, frag_t basefrag, int bits)
{
dout(10) << "fragment_frozen " << dirs << " " << basefrag << " by " << bits
info.basefrag, info.bits);
mds->mdlog->start_entry(le);
+ list<frag_t> old_frags;
+ for (list<CDir*>::iterator p = info.dirs.begin(); p != info.dirs.end(); ++p)
+ old_frags.push_back((*p)->get_frag());
+
// refragment
list<Context*> waiters;
adjust_dir_fragments(diri, info.dirs, info.basefrag, info.bits,
le->metablob.add_dir(dir, false);
// freeze and store them too
+ dir->auth_pin(this);
dir->state_set(CDir::STATE_FRAGMENTING);
dir->commit(0, gather.new_sub(), true); // ignore authpinnability
}
+ add_uncommitted_fragment(dirfrag_t(diri->ino(), info.basefrag), info.bits, old_frags);
mds->mdlog->submit_entry(le, gather.new_sub());
mds->mdlog->flush();
gather.activate();
dout(10) << "fragment_logged_and_stored " << info.resultfrags << " " << info.basefrag
<< " bits " << info.bits << " on " << *diri << dendl;
-
- // journal commit
- EFragment *le = new EFragment(mds->mdlog, EFragment::OP_COMMIT, diri->ino(),
- info.basefrag, info.bits);
- mds->mdlog->start_submit_entry(le);
// tell peers
CDir *first = *info.resultfrags.begin();
++p) {
CDir *dir = *p;
dout(10) << " result frag " << *dir << dendl;
-
- // unmark, unfreeze
- dir->state_clear(CDir::STATE_FRAGMENTING);
for (CDir::map_t::iterator p = dir->items.begin();
p != dir->items.end();
dn->put(CDentry::PIN_FRAGMENTING);
}
+ // unfreeze
dir->unfreeze_dir();
}
+ // journal commit
+ EFragment *le = new EFragment(mds->mdlog, EFragment::OP_COMMIT,
+ diri->ino(), info.basefrag, info.bits);
+ mds->mdlog->start_submit_entry(le, new C_MDC_FragmentCommit(this, diri->ino(), info.basefrag,
+ info.resultfrags));
+
fragment_requests.erase(mdr->reqid);
request_finish(mdr);
}
+void MDCache::_fragment_committed(dirfrag_t basedirfrag, list<CDir*>& resultfrags)
+{
+ dout(10) << "fragment_committed " << basedirfrag << dendl;
+ assert(uncommitted_fragments.count(basedirfrag));
+ ufragment &uf = uncommitted_fragments[basedirfrag];
+
+ // remove old frags
+ C_GatherBuilder gather(g_ceph_context, new C_MDC_FragmentFinish(this, basedirfrag, resultfrags));
+
+ SnapContext nullsnapc;
+ object_locator_t oloc(mds->mdsmap->get_metadata_pool());
+ for (list<frag_t>::iterator p = uf.old_frags.begin();
+ p != uf.old_frags.end();
+ ++p) {
+ object_t oid = CInode::get_object_name(basedirfrag.ino, *p, "");
+ ObjectOperation op;
+ if (*p == frag_t()) {
+ // backtrace object
+ dout(10) << " truncate orphan dirfrag " << oid << dendl;
+ op.truncate(0);
+ } else {
+ dout(10) << " removing orphan dirfrag " << oid << dendl;
+ op.remove();
+ }
+ mds->objecter->mutate(oid, oloc, op, nullsnapc, ceph_clock_now(g_ceph_context),
+ 0, NULL, gather.new_sub());
+ }
+
+ assert(gather.has_subs());
+ gather.activate();
+}
+
+void MDCache::_fragment_finish(dirfrag_t basedirfrag, list<CDir*>& resultfrags)
+{
+ dout(10) << "fragment_finish " << basedirfrag << dendl;
+ assert(uncommitted_fragments.count(basedirfrag));
+
+ // unmark & auth_unpin
+ for (list<CDir*>::iterator p = resultfrags.begin(); p != resultfrags.end(); ++p) {
+ (*p)->state_clear(CDir::STATE_FRAGMENTING);
+ (*p)->auth_unpin(this);
+ }
+
+ finish_uncommitted_fragment(basedirfrag);
+}
/* This function DOES put the passed message before returning */
void MDCache::handle_fragment_notify(MMDSFragmentNotify *notify)
notify->put();
}
+void MDCache::add_uncommitted_fragment(dirfrag_t basedirfrag, int bits, list<frag_t>& old_frags)
+{
+ dout(10) << "add_uncommitted_fragment: base dirfrag " << basedirfrag << " bits " << bits << dendl;
+ assert(!uncommitted_fragments.count(basedirfrag));
+ ufragment& uf = uncommitted_fragments[basedirfrag];
+ uf.old_frags = old_frags;
+ uf.bits = bits;
+}
+
+void MDCache::finish_uncommitted_fragment(dirfrag_t basedirfrag)
+{
+ dout(10) << "finish_uncommitted_fragments: base dirfrag " << basedirfrag << dendl;
+ if (uncommitted_fragments.count(basedirfrag)) {
+ uncommitted_fragments.erase(basedirfrag);
+ }
+}
void MDCache::rollback_uncommitted_fragments()
{
dout(10) << "rollback_uncommitted_fragments: " << uncommitted_fragments.size() << " pending" << dendl;
- for (set< pair<dirfrag_t,int> >::iterator p = uncommitted_fragments.begin();
+ for (map<dirfrag_t, ufragment>::iterator p = uncommitted_fragments.begin();
p != uncommitted_fragments.end();
++p) {
+ ufragment &uf = p->second;
CInode *diri = get_inode(p->first.ino);
assert(diri);
- dout(10) << " rolling back " << p->first << " refragment by " << p->second << " bits" << dendl;
+ dout(10) << " rolling back " << p->first << " refragment by " << uf.bits << " bits" << dendl;
list<CDir*> resultfrags;
list<Context*> waiters;
- adjust_dir_fragments(diri, p->first.frag, -p->second, resultfrags, waiters, true);
+ adjust_dir_fragments(diri, p->first.frag, -uf.bits, resultfrags, waiters, true);
if (g_conf->mds_debug_frag)
diri->verify_dirfrags();
- EFragment *le = new EFragment(mds->mdlog, EFragment::OP_ROLLBACK, diri->ino(), p->first.frag, p->second);
+ EFragment *le = new EFragment(mds->mdlog, EFragment::OP_ROLLBACK, diri->ino(), p->first.frag, uf.bits);
mds->mdlog->start_submit_entry(le);
}
uncommitted_fragments.clear();
list<CDir*> resultfrags;
list<Context*> waiters;
+ list<frag_t> old_frags;
pair<dirfrag_t,int> desc(dirfrag_t(ino,basefrag), bits);
// in may be NULL if it wasn't in our cache yet. if it's a prepare
switch (op) {
case OP_PREPARE:
- mds->mdcache->uncommitted_fragments.insert(desc);
+ mds->mdcache->add_uncommitted_fragment(dirfrag_t(ino, basefrag), bits, old_frags);
// fall-thru
case OP_ONESHOT:
if (in)
mds->mdcache->adjust_dir_fragments(in, basefrag, bits, resultfrags, waiters, true);
break;
- case OP_COMMIT:
- mds->mdcache->uncommitted_fragments.erase(desc);
- break;
-
case OP_ROLLBACK:
- if (mds->mdcache->uncommitted_fragments.count(desc)) {
- mds->mdcache->uncommitted_fragments.erase(desc);
- assert(in);
+ if (in)
mds->mdcache->adjust_dir_fragments(in, basefrag, -bits, resultfrags, waiters, true);
- } else {
- dout(10) << " no record of prepare for " << desc << dendl;
- }
+ // fall-thru
+ case OP_COMMIT:
+ mds->mdcache->finish_uncommitted_fragment(dirfrag_t(ino, basefrag));
break;
+
+ default:
+ assert(0);
}
metablob.replay(mds, _segment);
if (in && g_conf->mds_debug_frag)