mds->server->dispatch_slave_request(mdr);
} else {
switch (mdr->internal_op) {
-
- // ...
-
+ case CEPH_MDS_OP_FRAGMENTDIR:
+ dispatch_fragment_dir(mdr);
+ break;
default:
assert(0);
}
}
};
-
-bool MDCache::can_fragment_lock(CInode *diri)
-{
- if (!diri->dirfragtreelock.can_wrlock(-1)) {
- dout(7) << "can_fragment: can't wrlock dftlock" << dendl;
- mds->locker->scatter_nudge(&diri->dirfragtreelock, NULL);
- return false;
- }
- return true;
-}
-
bool MDCache::can_fragment(CInode *diri, list<CDir*>& dirs)
{
if (mds->mdsmap->is_degraded()) {
if (!can_fragment(diri, dirs))
return;
- if (!can_fragment_lock(diri)) {
- dout(10) << " requeuing dir " << dir->dirfrag() << dendl;
- mds->balancer->queue_split(dir);
- return;
- }
C_GatherBuilder gather(g_ceph_context,
new C_MDC_FragmentFrozen(this, dirs, dir->get_frag(), bits));
if (!can_fragment(diri, dirs))
return;
- if (!can_fragment_lock(diri)) {
- //dout(10) << " requeuing dir " << dir->dirfrag() << dendl;
- //mds->mdbalancer->split_queue.insert(dir->dirfrag());
- return;
- }
CDir *first = dirs.front();
int bits = first->get_frag().bits() - frag.bits();
class C_MDC_FragmentLoggedAndStored : public Context {
MDCache *mdcache;
- Mutation *mut;
- list<CDir*> resultfrags;
- frag_t basefrag;
- int bits;
+ MDRequest *mdr;
public:
- C_MDC_FragmentLoggedAndStored(MDCache *m, Mutation *mu, list<CDir*>& r, frag_t bf, int bi) :
- mdcache(m), mut(mu), resultfrags(r), basefrag(bf), bits(bi) {}
+ C_MDC_FragmentLoggedAndStored(MDCache *m, MDRequest *r) : mdcache(m), mdr(r) {}
virtual void finish(int r) {
- mdcache->fragment_logged_and_stored(mut, resultfrags, basefrag, bits);
+ mdcache->fragment_logged_and_stored(mdr);
}
};
void MDCache::fragment_frozen(list<CDir*>& dirs, frag_t basefrag, int bits)
{
- CInode *diri = dirs.front()->get_inode();
+ dout(10) << "fragment_frozen " << dirs << " " << basefrag << " by " << bits
+ << " on " << dirs.front()->get_inode() << dendl;
- if (bits > 0) {
+ if (bits > 0)
assert(dirs.size() == 1);
- } else {
- assert(bits < 0);
- }
+ else if (bits < 0)
+ assert(dirs.size() > 1);
+ else
+ assert(0);
- dout(10) << "fragment_frozen " << dirs << " " << basefrag << " by " << bits
- << " on " << *diri << dendl;
+ MDRequest *mdr = request_start_internal(CEPH_MDS_OP_FRAGMENTDIR);
+ fragment_info_t &info = fragment_requests[mdr->reqid];
+ info.basefrag = basefrag;
+ info.bits = bits;
+ info.dirs = dirs;
- // wrlock dirfragtreelock
- if (!diri->dirfragtreelock.can_wrlock(-1)) {
- dout(10) << " can't wrlock " << diri->dirfragtreelock << " on " << *diri << dendl;
- fragment_unmark_unfreeze_dirs(dirs);
- return;
+ dispatch_fragment_dir(mdr);
+}
+
+void MDCache::dispatch_fragment_dir(MDRequest *mdr)
+{
+ assert(fragment_requests.count(mdr->reqid));
+ fragment_info_t &info = fragment_requests[mdr->reqid];
+ CInode *diri = info.dirs.front()->get_inode();
+
+ dout(10) << "dispatch_fragment_dir " << info.resultfrags << " "
+ << info.basefrag << " bits " << info.bits << " on " << *diri << dendl;
+
+ // avoid freeze dir deadlock
+ if (!mdr->is_auth_pinned(diri)) {
+ if (!diri->can_auth_pin()) {
+ dout(10) << " can't auth_pin " << *diri << ", requeuing dir "
+ << info.dirs.front()->dirfrag() << dendl;
+ if (info.bits > 0)
+ mds->balancer->queue_split(info.dirs.front());
+ else
+ mds->balancer->queue_merge(info.dirs.front());
+ fragment_unmark_unfreeze_dirs(info.dirs);
+ fragment_requests.erase(mdr->reqid);
+ request_finish(mdr);
+ return;
+ }
+ mdr->auth_pin(diri);
}
- diri->dirfragtreelock.get_wrlock(true);
+ set<SimpleLock*> rdlocks, wrlocks, xlocks;
+ wrlocks.insert(&diri->dirfragtreelock);
// prevent a racing gather on any other scatterlocks too
- diri->nestlock.get_wrlock(true);
- diri->filelock.get_wrlock(true);
+ wrlocks.insert(&diri->nestlock);
+ wrlocks.insert(&diri->filelock);
+ if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ return;
+
+ mdr->ls = mds->mdlog->get_current_segment();
+ EFragment *le = new EFragment(mds->mdlog, EFragment::OP_PREPARE, diri->ino(),
+ info.basefrag, info.bits);
+ mds->mdlog->start_entry(le);
// refragment
- list<CDir*> resultfrags;
list<Context*> waiters;
- adjust_dir_fragments(diri, dirs, basefrag, bits, resultfrags, waiters, false);
+ adjust_dir_fragments(diri, info.dirs, info.basefrag, info.bits,
+ info.resultfrags, waiters, false);
if (g_conf->mds_debug_frag)
diri->verify_dirfrags();
mds->queue_waiters(waiters);
- // journal
- Mutation *mut = new Mutation;
-
- mut->ls = mds->mdlog->get_current_segment();
- EFragment *le = new EFragment(mds->mdlog, EFragment::OP_PREPARE, diri->ino(), basefrag, bits);
- mds->mdlog->start_entry(le);
-
- le->metablob.add_dir_context(*resultfrags.begin());
+ le->metablob.add_dir_context(*info.resultfrags.begin());
// dft lock
mds->locker->mark_updated_scatterlock(&diri->dirfragtreelock);
- mut->ls->dirty_dirfrag_dirfragtree.push_back(&diri->item_dirty_dirfrag_dirfragtree);
- mut->add_updated_lock(&diri->dirfragtreelock);
+ mdr->ls->dirty_dirfrag_dirfragtree.push_back(&diri->item_dirty_dirfrag_dirfragtree);
+ mdr->add_updated_lock(&diri->dirfragtreelock);
/*
// filelock
*/
// freeze, journal, and store resulting frags
- C_GatherBuilder gather(g_ceph_context,
- new C_MDC_FragmentLoggedAndStored(this, mut,
- resultfrags, basefrag, bits));
+ C_GatherBuilder gather(g_ceph_context, new C_MDC_FragmentLoggedAndStored(this, mdr));
- for (list<CDir*>::iterator p = resultfrags.begin();
- p != resultfrags.end();
+ for (list<CDir*>::iterator p = info.resultfrags.begin();
+ p != info.resultfrags.end();
++p) {
CDir *dir = *p;
dout(10) << " result frag " << *dir << dendl;
gather.activate();
}
-void MDCache::fragment_logged_and_stored(Mutation *mut, list<CDir*>& resultfrags, frag_t basefrag, int bits)
+void MDCache::fragment_logged_and_stored(MDRequest *mdr)
{
- CInode *diri = resultfrags.front()->get_inode();
+ assert(fragment_requests.count(mdr->reqid));
+ fragment_info_t &info = fragment_requests[mdr->reqid];
+ CInode *diri = info.resultfrags.front()->get_inode();
- dout(10) << "fragment_logged_and_stored " << resultfrags << " " << basefrag << " bits " << bits
- << " on " << *diri << dendl;
+ dout(10) << "fragment_logged_and_stored " << info.resultfrags << " " << info.basefrag
+ << " bits " << info.bits << " on " << *diri << dendl;
// journal commit
- EFragment *le = new EFragment(mds->mdlog, EFragment::OP_COMMIT, diri->ino(), basefrag, bits);
- mds->mdlog->start_entry(le);
- mds->mdlog->submit_entry(le);
+ EFragment *le = new EFragment(mds->mdlog, EFragment::OP_COMMIT, diri->ino(),
+ info.basefrag, info.bits);
+ mds->mdlog->start_submit_entry(le);
// tell peers
- CDir *first = *resultfrags.begin();
+ CDir *first = *info.resultfrags.begin();
for (map<int,int>::iterator p = first->replica_map.begin();
p != first->replica_map.end();
++p) {
if (mds->mdsmap->get_state(p->first) <= MDSMap::STATE_REJOIN)
continue;
- MMDSFragmentNotify *notify = new MMDSFragmentNotify(diri->ino(), basefrag, bits);
+ MMDSFragmentNotify *notify = new MMDSFragmentNotify(diri->ino(), info.basefrag, info.bits);
/*
// freshly replicate new dirs to peers
mds->send_message_mds(notify, p->first);
}
- mut->apply(); // mark scatterlock
- mds->locker->drop_locks(mut);
- mut->cleanup();
- delete mut;
-
- // drop dft wrlock
- bool need_issue = false;
- mds->locker->wrlock_finish(&diri->dirfragtreelock, NULL, &need_issue);
- mds->locker->wrlock_finish(&diri->nestlock, NULL, &need_issue);
- mds->locker->wrlock_finish(&diri->filelock, NULL, &need_issue);
+ mdr->apply(); // mark scatterlock
+ mds->locker->drop_locks(mdr);
// unfreeze resulting frags
- for (list<CDir*>::iterator p = resultfrags.begin();
- p != resultfrags.end();
+ for (list<CDir*>::iterator p = info.resultfrags.begin();
+ p != info.resultfrags.end();
++p) {
CDir *dir = *p;
dout(10) << " result frag " << *dir << dendl;
dir->unfreeze_dir();
}
- if (need_issue)
- mds->locker->issue_caps(diri);
+ fragment_requests.erase(mdr->reqid);
+ request_finish(mdr);
}