class C_MDC_FragmentFrozen : public Context {
MDCache *mdcache;
- list<CDir*> dirs;
- frag_t basefrag;
- int by;
+ dirfrag_t basedirfrag;
public:
- C_MDC_FragmentFrozen(MDCache *m, list<CDir*> d, frag_t bf, int b) : mdcache(m), dirs(d), basefrag(bf), by(b) {}
+ C_MDC_FragmentFrozen(MDCache *m, dirfrag_t df) :
+ mdcache(m), basedirfrag(df) {}
virtual void finish(int r) {
- mdcache->fragment_frozen(dirs, basefrag, by);
+ mdcache->fragment_frozen(basedirfrag, r);
}
};
if (!can_fragment(diri, dirs))
return;
- C_GatherBuilder gather(g_ceph_context,
- new C_MDC_FragmentFrozen(this, dirs, dir->get_frag(), bits));
+ assert(fragments.count(dir->dirfrag()) == 0);
+ fragment_info_t& info = fragments[dir->dirfrag()];
+ info.dirs.push_back(dir);
+ info.bits = bits;
+ info.last_cum_auth_pins_change = ceph_clock_now(g_ceph_context);
+
+ C_GatherBuilder gather(g_ceph_context, new C_MDC_FragmentFrozen(this, dir->dirfrag()));
fragment_freeze_dirs(dirs, gather);
gather.activate();
int bits = first->get_frag().bits() - frag.bits();
dout(10) << " we are merginb by " << bits << " bits" << dendl;
+ dirfrag_t df(diri->ino(), frag);
+ assert(fragments.count(df) == 0);
+ fragment_info_t& info = fragments[df];
+ info.dirs = dirs;
+ info.bits = -bits;
+ info.last_cum_auth_pins_change = ceph_clock_now(g_ceph_context);
+
C_GatherBuilder gather(g_ceph_context,
- new C_MDC_FragmentFrozen(this, dirs, frag, -bits));
+ new C_MDC_FragmentFrozen(this, dirfrag_t(diri->ino(), frag)));
fragment_freeze_dirs(dirs, gather);
gather.activate();
}
}
+void MDCache::fragment_freeze_inc_num_waiters(CDir *dir)
+{
+ map<dirfrag_t,fragment_info_t>::iterator p;
+ for (p = fragments.lower_bound(dirfrag_t(dir->ino(), 0));
+ p != fragments.end() && p->first.ino == dir->ino();
+ ++p) {
+ if (p->first.frag.contains(dir->get_frag())) {
+ p->second.num_remote_waiters++;
+ return;
+ }
+ }
+ assert(0);
+}
+
+void MDCache::find_stale_fragment_freeze()
+{
+ dout(10) << "find_stale_fragment_freeze" << dendl;
+ // see comment in Migrator::find_stale_export_freeze()
+ utime_t now = ceph_clock_now(g_ceph_context);
+ utime_t cutoff = now;
+ cutoff -= g_conf->mds_freeze_tree_timeout;
+
+ for (map<dirfrag_t,fragment_info_t>::iterator p = fragments.begin();
+ p != fragments.end(); ) {
+ dirfrag_t df = p->first;
+ fragment_info_t& info = p->second;
+ ++p;
+ if (info.dirs_frozen)
+ continue;
+ CDir *dir;
+ int total_auth_pins = 0;
+ for (list<CDir*>::iterator q = info.dirs.begin();
+ q != info.dirs.end();
+ ++q) {
+ dir = *q;
+ if (!dir->state_test(CDir::STATE_DNPINNEDFRAG)) {
+ total_auth_pins = -1;
+ break;
+ }
+ if (dir->is_frozen_dir())
+ continue;
+ total_auth_pins += dir->get_auth_pins() + dir->get_dir_auth_pins();
+ }
+ if (total_auth_pins < 0)
+ continue;
+ if (info.last_cum_auth_pins != total_auth_pins) {
+ info.last_cum_auth_pins = total_auth_pins;
+ info.last_cum_auth_pins_change = now;
+ continue;
+ }
+ if (info.last_cum_auth_pins_change >= cutoff)
+ continue;
+ dir = info.dirs.front();
+ if (info.num_remote_waiters > 0 ||
+ (!dir->inode->is_root() && dir->get_parent_dir()->is_freezing())) {
+ dout(10) << " cancel fragmenting " << df << " bit " << info.bits << dendl;
+ list<CDir*> dirs;
+ dirs.swap(info.dirs);
+ fragments.erase(df);
+ fragment_unmark_unfreeze_dirs(dirs);
+ }
+ }
+}
+
class C_MDC_FragmentPrep : public Context {
MDCache *mdcache;
MDRequest *mdr;
dirfrag_t basedirfrag;
list<CDir*> resultfrags;
public:
- C_MDC_FragmentCommit(MDCache *m, inodeno_t ino, frag_t f, list<CDir*>& l) :
- mdcache(m), basedirfrag(ino, f) {
+ C_MDC_FragmentCommit(MDCache *m, dirfrag_t df, list<CDir*>& l) :
+ mdcache(m), basedirfrag(df) {
resultfrags.swap(l);
}
virtual void finish(int r) {
}
};
-void MDCache::fragment_frozen(list<CDir*>& dirs, frag_t basefrag, int bits)
+void MDCache::fragment_frozen(dirfrag_t basedirfrag, int r)
{
- dout(10) << "fragment_frozen " << dirs << " " << basefrag << " by " << bits
- << " on " << dirs.front()->get_inode() << dendl;
+ map<dirfrag_t,fragment_info_t>::iterator it = fragments.find(basedirfrag);
+ if (r < 0) {
+ dout(7) << "fragment_frozen " << basedirfrag << " must have aborted" << dendl;
+ assert(it == fragments.end());
+ return;
+ }
+ assert(it != fragments.end());
+ fragment_info_t& info = it->second;
- if (bits > 0)
- assert(dirs.size() == 1);
- else if (bits < 0)
- assert(dirs.size() > 1);
- else
- assert(0);
+ dout(10) << "fragment_frozen " << basedirfrag.frag << " by " << info.bits
+ << " on " << info.dirs.front()->get_inode() << dendl;
- MDRequest *mdr = request_start_internal(CEPH_MDS_OP_FRAGMENTDIR);
- fragment_info_t &info = fragment_requests[mdr->reqid];
- info.basefrag = basefrag;
- info.bits = bits;
- info.dirs = dirs;
+ info.dirs_frozen = true;
+ MDRequest *mdr = request_start_internal(CEPH_MDS_OP_FRAGMENTDIR);
+ mdr->more()->fragment_base = basedirfrag;
dispatch_fragment_dir(mdr);
}
void MDCache::dispatch_fragment_dir(MDRequest *mdr)
{
- map<metareqid_t, fragment_info_t>::iterator it = fragment_requests.find(mdr->reqid);
- assert(it != fragment_requests.end());
- fragment_info_t &info = it->second;
+ dirfrag_t basedirfrag = mdr->more()->fragment_base;
+ map<dirfrag_t,fragment_info_t>::iterator it = fragments.find(basedirfrag);
+ assert(it != fragments.end());
+ fragment_info_t& info = it->second;
CInode *diri = info.dirs.front()->get_inode();
- dout(10) << "dispatch_fragment_dir " << info.resultfrags << " "
- << info.basefrag << " bits " << info.bits << " on " << *diri << dendl;
+ dout(10) << "dispatch_fragment_dir " << basedirfrag << " bits " << info.bits
+ << " on " << *diri << dendl;
// avoid freeze dir deadlock
if (!mdr->is_auth_pinned(diri)) {
else
mds->balancer->queue_merge(info.dirs.front());
fragment_unmark_unfreeze_dirs(info.dirs);
- fragment_requests.erase(mdr->reqid);
+ fragments.erase(it);
request_finish(mdr);
return;
}
return;
mdr->ls = mds->mdlog->get_current_segment();
- EFragment *le = new EFragment(mds->mdlog, EFragment::OP_PREPARE, diri->ino(),
- info.basefrag, info.bits);
+ EFragment *le = new EFragment(mds->mdlog, EFragment::OP_PREPARE,
+ basedirfrag.ino, basedirfrag.frag, info.bits);
mds->mdlog->start_entry(le);
for (list<CDir*>::iterator p = info.dirs.begin(); p != info.dirs.end(); ++p) {
// refragment
list<Context*> waiters;
- adjust_dir_fragments(diri, info.dirs, info.basefrag, info.bits,
+ adjust_dir_fragments(diri, info.dirs, basedirfrag.frag, info.bits,
info.resultfrags, waiters, false);
if (g_conf->mds_debug_frag)
diri->verify_dirfrags();
mut->add_updated_lock(&diri->nestlock);
*/
- add_uncommitted_fragment(dirfrag_t(diri->ino(), info.basefrag), info.bits, le->orig_frags, mdr->ls);
+ add_uncommitted_fragment(basedirfrag, info.bits, le->orig_frags, mdr->ls);
mds->mdlog->submit_entry(le, new C_MDC_FragmentPrep(this, mdr));
mds->mdlog->flush();
}
void MDCache::_fragment_logged(MDRequest *mdr)
{
- map<metareqid_t, fragment_info_t>::iterator it = fragment_requests.find(mdr->reqid);
- assert(it != fragment_requests.end());
+ dirfrag_t basedirfrag = mdr->more()->fragment_base;
+ map<dirfrag_t,fragment_info_t>::iterator it = fragments.find(basedirfrag);
+ assert(it != fragments.end());
fragment_info_t &info = it->second;
CInode *diri = info.resultfrags.front()->get_inode();
- dout(10) << "fragment_logged " << info.resultfrags << " " << info.basefrag
- << " bits " << info.bits << " on " << *diri << dendl;
+ dout(10) << "fragment_logged " << basedirfrag << " bits " << info.bits
+ << " on " << *diri << dendl;
// store resulting frags
C_GatherBuilder gather(g_ceph_context, new C_MDC_FragmentStore(this, mdr));
void MDCache::_fragment_stored(MDRequest *mdr)
{
- map<metareqid_t, fragment_info_t>::iterator it = fragment_requests.find(mdr->reqid);
- assert(it != fragment_requests.end());
+ dirfrag_t basedirfrag = mdr->more()->fragment_base;
+ map<dirfrag_t,fragment_info_t>::iterator it = fragments.find(basedirfrag);
+ assert(it != fragments.end());
fragment_info_t &info = it->second;
CInode *diri = info.resultfrags.front()->get_inode();
- dout(10) << "fragment_stored " << info.resultfrags << " " << info.basefrag
- << " bits " << info.bits << " on " << *diri << dendl;
+ dout(10) << "fragment_stored " << basedirfrag << " bits " << info.bits
+ << " on " << *diri << dendl;
// tell peers
CDir *first = *info.resultfrags.begin();
rejoin_gather.count(p->first)))
continue;
- MMDSFragmentNotify *notify = new MMDSFragmentNotify(diri->ino(), info.basefrag, info.bits);
+ MMDSFragmentNotify *notify = new MMDSFragmentNotify(basedirfrag.ino, basedirfrag.frag, info.bits);
// freshly replicate new dirs to peers
for (list<CDir*>::iterator q = info.resultfrags.begin();
// journal commit
EFragment *le = new EFragment(mds->mdlog, EFragment::OP_COMMIT,
- diri->ino(), info.basefrag, info.bits);
- mds->mdlog->start_submit_entry(le, new C_MDC_FragmentCommit(this, diri->ino(), info.basefrag,
+ basedirfrag, info.bits);
+ mds->mdlog->start_submit_entry(le, new C_MDC_FragmentCommit(this, basedirfrag,
info.resultfrags));
- fragment_requests.erase(it);
+ fragments.erase(it);
request_finish(mdr);
}