map<int, hash_set<version_t> > pending_commit_tids; // mdstable
set<metareqid_t> uncommitted_masters;
+ set<dirfrag_t> uncommitted_fragments;
// client request ids
map<int, tid_t> last_client_tids;
mut->add_updated_lock(&diri->nestlock);
*/
- add_uncommitted_fragment(dirfrag_t(diri->ino(), info.basefrag), info.bits, le->orig_frags);
+ add_uncommitted_fragment(dirfrag_t(diri->ino(), info.basefrag), info.bits, le->orig_frags, mdr->ls);
mds->mdlog->submit_entry(le, new C_MDC_FragmentPrep(this, mdr));
mds->mdlog->flush();
}
}
void MDCache::add_uncommitted_fragment(dirfrag_t basedirfrag, int bits, list<frag_t>& old_frags,
- bufferlist *rollback)
+ LogSegment *ls, bufferlist *rollback)
{
dout(10) << "add_uncommitted_fragment: base dirfrag " << basedirfrag << " bits " << bits << dendl;
assert(!uncommitted_fragments.count(basedirfrag));
ufragment& uf = uncommitted_fragments[basedirfrag];
uf.old_frags = old_frags;
uf.bits = bits;
+ uf.ls = ls;
+ ls->uncommitted_fragments.insert(basedirfrag);
if (rollback)
uf.rollback.swap(*rollback);
}
if (op != EFragment::OP_FINISH && !uf.old_frags.empty()) {
uf.committed = true;
} else {
+ uf.ls->uncommitted_fragments.erase(basedirfrag);
+ mds->queue_waiters(uf.waiters);
uncommitted_fragments.erase(basedirfrag);
}
}
uf.old_frags.swap(old_frags);
uf.committed = true;
} else {
+ uf.ls->uncommitted_fragments.erase(basedirfrag);
uncommitted_fragments.erase(basedirfrag);
}
}
struct ufragment {
int bits;
bool committed;
+ LogSegment *ls;
+ list<Context*> waiters;
list<frag_t> old_frags;
bufferlist rollback;
- ufragment() : bits(0), committed(false) {}
+ ufragment() : bits(0), committed(false), ls(NULL) {}
};
map<dirfrag_t, ufragment> uncommitted_fragments;
void handle_fragment_notify(MMDSFragmentNotify *m);
void add_uncommitted_fragment(dirfrag_t basedirfrag, int bits, list<frag_t>& old_frag,
- bufferlist *rollback=NULL);
+ LogSegment *ls, bufferlist *rollback=NULL);
void finish_uncommitted_fragment(dirfrag_t basedirfrag, int op);
void rollback_uncommitted_fragment(dirfrag_t basedirfrag, list<frag_t>& old_frags);
public:
+ void wait_for_uncommitted_fragment(dirfrag_t dirfrag, Context *c) {
+ assert(uncommitted_fragments.count(dirfrag));
+ uncommitted_fragments[dirfrag].waiters.push_back(c);
+ }
void split_dir(CDir *dir, int byn);
void merge_dir(CInode *diri, frag_t fg);
void rollback_uncommitted_fragments();
mds->mdcache->wait_for_uncommitted_master(*p, gather_bld.new_sub());
}
+ // uncommitted fragments
+ for (set<dirfrag_t>::iterator p = uncommitted_fragments.begin();
+ p != uncommitted_fragments.end();
+ ++p) {
+ dout(10) << "try_to_expire waiting for uncommitted fragment " << *p << dendl;
+ mds->mdcache->wait_for_uncommitted_fragment(*p, gather_bld.new_sub());
+ }
+
// nudge scatterlocks
for (elist<CInode*>::iterator p = dirty_dirfrag_dir.begin(); !p.end(); ++p) {
CInode *in = *p;
switch (op) {
case OP_PREPARE:
- mds->mdcache->add_uncommitted_fragment(dirfrag_t(ino, basefrag), bits, orig_frags, &rollback);
+ mds->mdcache->add_uncommitted_fragment(dirfrag_t(ino, basefrag), bits, orig_frags, _segment, &rollback);
// fall-thru
case OP_ONESHOT:
if (in)