send_heartbeat();
num_bal_times--;
}
-
- // hash?
- if ((g_conf->mds_bal_frag || g_conf->mds_thrash_fragments) &&
- g_conf->mds_bal_fragment_interval > 0 &&
- now.sec() - last_fragment.sec() > g_conf->mds_bal_fragment_interval) {
- last_fragment = now;
- do_fragmenting();
- }
}
return howmuch;
}
-void MDBalancer::queue_split(CDir *dir)
+void MDBalancer::queue_split(const CDir *dir, bool fast)
{
+ dout(10) << __func__ << " enqueuing " << *dir
+ << " (fast=" << fast << ")" << dendl;
+
assert(mds->mdsmap->allows_dirfrags());
- split_queue.insert(dir->dirfrag());
-}
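+ // Capture the dirfrag (a plain value) rather than the CDir*: the
+ // CDir may be trimmed from the cache before the context runs, so
+ // the callback re-resolves it and drops out if it is gone.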
+ const dirfrag_t frag = dir->dirfrag();
+
+ auto callback = [this, frag](int r) {
+ if (split_pending.erase(frag) == 0) {
+ // Someone beat me to it. This can happen in the fast splitting
+ // path, because we spawn two contexts, one with mds->timer and
+ // one with mds->queue_waiter. The loser can safely just drop
+ // out.
+ return;
+ }
-void MDBalancer::queue_merge(CDir *dir)
-{
- merge_queue.insert(dir->dirfrag());
+ CDir *split_dir = mds->mdcache->get_dirfrag(frag);
+ if (!split_dir) {
+ dout(10) << "drop split on " << frag << " because not in cache" << dendl;
+ return;
+ }
+
+ // Pass on to MDCache: note that the split might still not
+ // happen if the checks in MDCache::can_fragment fail.
+ dout(10) << __func__ << " splitting " << *split_dir << dendl;
+ mds->mdcache->split_dir(split_dir, g_conf->mds_bal_split_bits);
+ };
+
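+ // Only the first request for a given frag arms the slow-path timer
+ // below; a fast request can still race with an armed timer, which
+ // the erase() check at the top of the callback resolves.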
+ const bool is_new = split_pending.insert(frag).second;
+
+ if (fast) {
+ // Do the split ASAP: enqueue it in the MDSRank waiters which are
+ // run at the end of dispatching the current request
+ mds->queue_waiter(new MDSInternalContextWrapper(mds,
+ new FunctionContext(callback)));
+ } else if (is_new) {
+ // Set a timer to really do the split: we don't do it immediately
+ // so that bursts of ops on a directory have a chance to go through
+ // before we freeze it.
+ mds->timer.add_event_after(g_conf->mds_bal_fragment_interval,
+ new FunctionContext(callback));
+ }
}
-void MDBalancer::do_fragmenting()
+void MDBalancer::queue_merge(CDir *dir)
{
- if (split_queue.empty() && merge_queue.empty()) {
- dout(20) << "do_fragmenting has nothing to do" << dendl;
- return;
- }
+ const auto frag = dir->dirfrag();
+ auto callback = [this, frag](int r) {
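+ // A root dirfrag has no sibling to merge with; callers (hit_dir)
+ // filter it out before calling queue_merge.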
+ assert(frag.frag != frag_t());
+
+ // frag must be in this set because only one context is in flight
+ // for a given frag at a time (because merge_pending is checked before
+ // starting one), and this context is the only one that erases it.
+ merge_pending.erase(frag);
+
+ CDir *dir = mds->mdcache->get_dirfrag(frag);
+ if (!dir) {
+ dout(10) << "drop merge on " << frag << " because not in cache" << dendl;
+ return;
+ }
+ assert(dir->dirfrag() == frag);
- if (!split_queue.empty()) {
- dout(10) << "do_fragmenting " << split_queue.size() << " dirs marked for possible splitting" << dendl;
+ if (!dir->is_auth()) {
+ dout(10) << "drop merge on " << *dir << " because lost auth" << dendl;
+ return;
+ }
- set<dirfrag_t> q;
- q.swap(split_queue);
+ dout(10) << "merging " << *dir << dendl;
- for (set<dirfrag_t>::iterator i = q.begin();
- i != q.end();
- ++i) {
- CDir *dir = mds->mdcache->get_dirfrag(*i);
- if (!dir ||
- !dir->is_auth())
- continue;
+ CInode *diri = dir->get_inode();
- dout(10) << "do_fragmenting splitting " << *dir << dendl;
- mds->mdcache->split_dir(dir, g_conf->mds_bal_split_bits);
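+ // Walk up the frag tree: stop at the first level where the sibling
+ // subtree is incomplete in cache, or where any sibling is non-auth
+ // or unwilling to merge. fg ends at the widest mergeable ancestor.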
+ frag_t fg = dir->get_frag();
+ while (fg != frag_t()) {
+ frag_t sibfg = fg.get_sibling();
+ list<CDir*> sibs;
+ bool complete = diri->get_dirfrags_under(sibfg, sibs);
+ if (!complete) {
+ dout(10) << " not all sibs under " << sibfg << " in cache (have " << sibs << ")" << dendl;
+ break;
+ }
+ bool all = true;
+ for (CDir *sib : sibs) {
+ if (!sib->is_auth() || !sib->should_merge()) {
+ all = false;
+ break;
+ }
+ }
+ if (!all) {
+ dout(10) << " not all sibs under " << sibfg << " " << sibs << " should_merge" << dendl;
+ break;
+ }
+ dout(10) << " all sibs under " << sibfg << " " << sibs << " should merge" << dendl;
+ fg = fg.parent();
}
- }
- if (!merge_queue.empty()) {
- dout(10) << "do_fragmenting " << merge_queue.size() << " dirs marked for possible merging" << dendl;
-
- set<dirfrag_t> q;
- q.swap(merge_queue);
-
- for (set<dirfrag_t>::iterator i = q.begin();
- i != q.end();
- ++i) {
- CDir *dir = mds->mdcache->get_dirfrag(*i);
- if (!dir ||
- !dir->is_auth() ||
- dir->get_frag() == frag_t()) // ok who's the joker?
- continue;
-
- dout(10) << "do_fragmenting merging " << *dir << dendl;
-
- CInode *diri = dir->get_inode();
-
- frag_t fg = dir->get_frag();
- while (fg != frag_t()) {
- frag_t sibfg = fg.get_sibling();
- list<CDir*> sibs;
- bool complete = diri->get_dirfrags_under(sibfg, sibs);
- if (!complete) {
- dout(10) << " not all sibs under " << sibfg << " in cache (have " << sibs << ")" << dendl;
- break;
- }
- bool all = true;
- for (list<CDir*>::iterator p = sibs.begin(); p != sibs.end(); ++p) {
- CDir *sib = *p;
- if (!sib->is_auth() || !sib->should_merge()) {
- all = false;
- break;
- }
- }
- if (!all) {
- dout(10) << " not all sibs under " << sibfg << " " << sibs << " should_merge" << dendl;
- break;
- }
- dout(10) << " all sibs under " << sibfg << " " << sibs << " should merge" << dendl;
- fg = fg.parent();
- }
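+ // Merge only if the walk ascended at least one level; otherwise
+ // the siblings of this dirfrag are not ready to merge.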
+ if (fg != dir->get_frag())
+ mds->mdcache->merge_dir(diri, fg);
+ };
- if (fg != dir->get_frag())
- mds->mdcache->merge_dir(diri, fg);
- }
+ if (merge_pending.insert(frag).second) {
+ dout(20) << __func__ << " enqueuing dir " << *dir << dendl;
+ mds->timer.add_event_after(g_conf->mds_bal_fragment_interval,
+ new FunctionContext(callback));
+ } else {
+ dout(20) << __func__ << " dir already in queue " << *dir << dendl;
}
}
-
-
void MDBalancer::prep_rebalance(int beat)
{
if (g_conf->mds_thrash_exports) {
mds->mdsmap->allows_dirfrags() &&
(dir->should_split() ||
(v > g_conf->mds_bal_split_rd && type == META_POP_IRD) ||
- (v > g_conf->mds_bal_split_wr && type == META_POP_IWR)) &&
- split_queue.count(dir->dirfrag()) == 0) {
- dout(10) << "hit_dir " << type << " pop is " << v << ", putting in split_queue: " << *dir << dendl;
- split_queue.insert(dir->dirfrag());
+ (v > g_conf->mds_bal_split_wr && type == META_POP_IWR))) {
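+ // A dir already queued for a slow split can keep growing and cross
+ // the fast-split threshold while it waits; requeue it on the fast
+ // path in that case.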
+ dout(10) << "hit_dir " << type << " pop is " << v
+ << ", putting in split_pending: " << *dir << dendl;
+ if (split_pending.count(dir->dirfrag()) == 0) {
+ queue_split(dir, false);
+ } else {
+ if (dir->should_split_fast()) {
+ dout(4) << "hit_dir: fragment hit hard limit, splitting immediately ("
+ << *dir << ")" << dendl;
+ queue_split(dir, true);
+ } else {
+ dout(10) << "hit_dir: fragment already enqueued to split: "
+ << *dir << dendl;
+ }
+ }
}
// merge?
if (dir->get_frag() != frag_t() && dir->should_merge() &&
- merge_queue.count(dir->dirfrag()) == 0) {
- dout(10) << "hit_dir " << type << " pop is " << v << ", putting in merge_queue: " << *dir << dendl;
- merge_queue.insert(dir->dirfrag());
+ merge_pending.count(dir->dirfrag()) == 0) {
+ dout(10) << "hit_dir " << type << " pop is " << v << ", putting in merge_pending: " << *dir << dendl;
+ queue_merge(dir);
}
}