OPTION(mds_bal_need_max, OPT_FLOAT, 1.2)
OPTION(mds_bal_midchunk, OPT_FLOAT, .3) // any sub bigger than this taken in full
OPTION(mds_bal_minchunk, OPT_FLOAT, .001) // never take anything smaller than this
-OPTION(mds_bal_target_removal_min, OPT_INT, 5) // min balance iterations before old target is removed
-OPTION(mds_bal_target_removal_max, OPT_INT, 10) // max balance iterations before old target is removed
+OPTION(mds_bal_target_decay, OPT_DOUBLE, 10.0) // target decay half-life in MDSMap (2x larger is approx. 2x slower)
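+// With the default 10.0 s half-life, each default hit adds 100/10 = 10 to a
+// target's DecayCounter (see MDSRank::hit_export_target); that stays above
+// the 0.01 expiry threshold used by MDSRank::update_targets for roughly
+// 10 * log2(10/0.01) ~= 100 seconds.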
OPTION(mds_replay_interval, OPT_FLOAT, 1.0) // time to wait before starting replay again
OPTION(mds_shutdown_check, OPT_INT, 0)
OPTION(mds_thrash_exports, OPT_INT, 0)
#include "include/Context.h"
#include "msg/Messenger.h"
#include "messages/MHeartbeat.h"
-#include "messages/MMDSLoadTargets.h"
#include <fstream>
#include <iostream>
-double MDBalancer::try_match(mds_rank_t ex, double& maxex,
+double MDBalancer::try_match(balance_state_t& state, mds_rank_t ex, double& maxex,
mds_rank_t im, double& maxim)
{
if (maxex <= 0 || maxim <= 0) return 0.0;
double howmuch = MIN(maxex, maxim);
dout(5) << " - mds." << ex << " exports " << howmuch << " to mds." << im << dendl;
if (ex == mds->get_nodeid())
- my_targets[im] += howmuch;
+ state.targets[im] += howmuch;
- exported[ex] += howmuch;
- imported[im] += howmuch;
+ state.exported[ex] += howmuch;
+ state.imported[im] += howmuch;
maxex -= howmuch;
maxim -= howmuch;
void MDBalancer::prep_rebalance(int beat)
{
+ balance_state_t state;
+
if (g_conf->mds_thrash_exports) {
//we're going to randomly export to all the mds in the cluster
- my_targets.clear();
set<mds_rank_t> up_mds;
mds->get_mds_map()->get_up_mds_set(up_mds);
- for (set<mds_rank_t>::iterator i = up_mds.begin();
- i != up_mds.end();
- ++i)
- my_targets[*i] = 0.0;
+ for (const auto &rank : up_mds) {
+ state.targets[rank] = 0.0;
+ }
} else {
int cluster_size = mds->get_mds_map()->get_num_in_mds();
mds_rank_t whoami = mds->get_nodeid();
rebalance_time = ceph_clock_now();
- // reset
- my_targets.clear();
- imported.clear();
- exported.clear();
-
dout(5) << " prep_rebalance: cluster loads are" << dendl;
mds->mdcache->migrator->clear_export_queue();
for (multimap<double,mds_rank_t>::reverse_iterator ex = exporters.rbegin();
ex != exporters.rend();
++ex) {
- double maxex = get_maxex(ex->second);
+ double maxex = get_maxex(state, ex->second);
if (maxex <= .001) continue;
// check importers. for now, just in arbitrary order (no intelligent matching).
for (map<mds_rank_t, float>::iterator im = mds_import_map[ex->second].begin();
im != mds_import_map[ex->second].end();
++im) {
- double maxim = get_maxim(im->first);
+ double maxim = get_maxim(state, im->first);
if (maxim <= .001) continue;
- try_match(ex->second, maxex,
- im->first, maxim);
+ try_match(state, ex->second, maxex, im->first, maxim);
if (maxex <= .001) break;
}
}
multimap<double,mds_rank_t>::iterator im = importers.begin();
while (ex != exporters.rend() &&
im != importers.end()) {
- double maxex = get_maxex(ex->second);
- double maxim = get_maxim(im->second);
+ double maxex = get_maxex(state, ex->second);
+ double maxim = get_maxim(state, im->second);
if (maxex < .001 || maxim < .001) break;
- try_match(ex->second, maxex,
- im->second, maxim);
+ try_match(state, ex->second, maxex, im->second, maxim);
if (maxex <= .001) ++ex;
if (maxim <= .001) ++im;
}
}
}
- try_rebalance();
+ try_rebalance(state);
}
-
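+/* Refresh the export-target decay counter for every rank this balancing
+ * round decided to target, so those ranks stay live export targets in
+ * the MDSMap while we are exporting to them. */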
+void MDBalancer::hit_targets(const balance_state_t& state)
+{
+ utime_t now = ceph_clock_now();
+ for (auto &it : state.targets) {
+ mds_rank_t target = it.first;
+ mds->hit_export_target(now, target, g_conf->mds_bal_target_decay);
+ }
+}
int MDBalancer::mantle_prep_rebalance()
{
+ balance_state_t state;
+
/* refresh balancer if it has changed */
if (bal_version != mds->mdsmap->get_balancer()) {
bal_version.assign("");
/* prepare for balancing */
int cluster_size = mds->get_mds_map()->get_num_in_mds();
rebalance_time = ceph_clock_now();
- my_targets.clear();
- imported.clear();
- exported.clear();
mds->mdcache->migrator->clear_export_queue();
/* fill in the metrics for each mds by grabbing load struct */
/* execute the balancer */
Mantle mantle;
- int ret = mantle.balance(bal_code, mds->get_nodeid(), metrics, my_targets);
- dout(2) << " mantle decided that new targets=" << my_targets << dendl;
+ int ret = mantle.balance(bal_code, mds->get_nodeid(), metrics, state.targets);
+ dout(2) << " mantle decided that new targets=" << state.targets << dendl;
/* mantle doesn't know about cluster size, so check target len here */
- if ((int) my_targets.size() != cluster_size)
+ if ((int) state.targets.size() != cluster_size)
return -EINVAL;
else if (ret)
return ret;
- try_rebalance();
+ try_rebalance(state);
return 0;
}
-void MDBalancer::try_rebalance()
+void MDBalancer::try_rebalance(balance_state_t& state)
{
- if (!check_targets())
+ if (!check_targets(state))
return;
if (g_conf->mds_thrash_exports) {
// do my exports!
set<CDir*> already_exporting;
- for (map<mds_rank_t,double>::iterator it = my_targets.begin();
- it != my_targets.end();
- ++it) {
- mds_rank_t target = (*it).first;
- double amount = (*it).second;
+ for (auto &it : state.targets) {
+ mds_rank_t target = it.first;
+ double amount = it.second;
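+ // skip trivial transfers: below MIN_OFFLOAD or under 20% of the per-MDS target load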
if (amount < MIN_OFFLOAD) continue;
if (amount / target_load < .2) continue;
}
-/* returns true if all my_target MDS are in the MDSMap.
- */
-bool MDBalancer::check_targets()
+/* Check that all targets are in the MDSMap export_targets for my rank. */
+bool MDBalancer::check_targets(const balance_state_t& state)
{
- // get MonMap's idea of my_targets
- const set<mds_rank_t>& map_targets = mds->mdsmap->get_mds_info(mds->get_nodeid()).export_targets;
-
- bool send = false;
- bool ok = true;
-
- // make sure map targets are in the old_prev_targets map
- for (set<mds_rank_t>::iterator p = map_targets.begin(); p != map_targets.end(); ++p) {
- if (old_prev_targets.count(*p) == 0)
- old_prev_targets[*p] = 0;
- if (my_targets.count(*p) == 0)
- old_prev_targets[*p]++;
- }
-
- // check if the current MonMap has all our targets
- set<mds_rank_t> need_targets;
- for (map<mds_rank_t,double>::iterator i = my_targets.begin();
- i != my_targets.end();
- ++i) {
- need_targets.insert(i->first);
- old_prev_targets[i->first] = 0;
-
- if (!map_targets.count(i->first)) {
- dout(20) << " target mds." << i->first << " not in map's export_targets" << dendl;
- send = true;
- ok = false;
- }
- }
-
- set<mds_rank_t> want_targets = need_targets;
- map<mds_rank_t, int>::iterator p = old_prev_targets.begin();
- while (p != old_prev_targets.end()) {
- if (map_targets.count(p->first) == 0 &&
- need_targets.count(p->first) == 0) {
- old_prev_targets.erase(p++);
- continue;
+ for (const auto &it : state.targets) {
+ if (!mds->is_export_target(it.first)) {
+ return false;
}
- dout(20) << " target mds." << p->first << " has been non-target for " << p->second << dendl;
- if (p->second < g_conf->mds_bal_target_removal_min)
- want_targets.insert(p->first);
- if (p->second >= g_conf->mds_bal_target_removal_max)
- send = true;
- ++p;
- }
-
- dout(10) << "check_targets have " << map_targets << " need " << need_targets << " want " << want_targets << dendl;
-
- if (send) {
- MMDSLoadTargets* m = new MMDSLoadTargets(mds_gid_t(mon_client->get_global_id()), want_targets);
- mon_client->send_mon_message(m);
}
- return ok;
+ return true;
}
void MDBalancer::find_exports(CDir *dir,
class MDBalancer {
friend class C_Bal_SendHeartbeat;
-
public:
MDBalancer(MDSRank *m, Messenger *msgr, MonClient *monc) :
mds(m),
messenger(msgr),
mon_client(monc),
beat_epoch(0),
- last_epoch_under(0), last_epoch_over(0), my_load(0.0), target_load(0.0) { }
+ last_epoch_under(0), last_epoch_over(0), my_load(0.0), target_load(0.0)
+ { }
mds_load_t get_load(utime_t);
void tick();
- /**
- * Try to rebalance after receiving monitor mdsmap update.
- *
- * Check if the monitor has recorded the current export targets;
- * if it has then do the actual export. Otherwise send off our
- * export targets message again.
- */
- void try_rebalance();
-
void subtract_export(CDir *ex, utime_t now);
void add_import(CDir *im, utime_t now);
void maybe_fragment(CDir *dir, bool hot);
private:
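+  /* Scratch state for a single rebalance round: per-rank transfer targets
+   * plus import/export tallies. Replaces the long-lived my_targets/
+   * imported/exported members; prep_rebalance builds a fresh instance and
+   * hands it to try_rebalance. */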
+  struct balance_state_t {
+    std::map<mds_rank_t, double> targets;
+    std::map<mds_rank_t, double> imported;
+    std::map<mds_rank_t, double> exported;
+  };
+
//set up the rebalancing targets for export and do one if the
//MDSMap is up to date
void prep_rebalance(int beat);
void export_empties();
int localize_balancer();
- bool check_targets();
+ bool check_targets(const balance_state_t& state);
+ void hit_targets(const balance_state_t& state);
void send_heartbeat();
void handle_heartbeat(MHeartbeat *m);
void find_exports(CDir *dir,
double& have,
set<CDir*>& already_exporting);
- double try_match(mds_rank_t ex, double& maxex,
+ double try_match(balance_state_t &state,
+ mds_rank_t ex, double& maxex,
mds_rank_t im, double& maxim);
- double get_maxim(mds_rank_t im) {
- return target_load - mds_meta_load[im] - imported[im];
+
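+  /* get_maxim: remaining import headroom on rank im this round (its target
+   * load minus current metadata load and what we already routed to it);
+   * get_maxex is the symmetric surplus on an exporter. */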
+ double get_maxim(balance_state_t &state, mds_rank_t im) {
+ return target_load - mds_meta_load[im] - state.imported[im];
}
- double get_maxex(mds_rank_t ex) {
- return mds_meta_load[ex] - target_load - exported[ex];
+ double get_maxex(balance_state_t &state, mds_rank_t ex) {
+ return mds_meta_load[ex] - target_load - state.exported[ex];
}
+ /**
+ * Try to rebalance.
+ *
+   * Check if the monitor has recorded the current export targets;
+   * if it has then do the actual export. Otherwise bail out; the
+   * export targets themselves are advertised to the mon by
+   * MDSRank::update_targets().
+ */
+ void try_rebalance(balance_state_t& state);
+
MDSRank *mds;
Messenger *messenger;
MonClient *mon_client;
// per-epoch state
double my_load, target_load;
- map<mds_rank_t,double> my_targets;
- map<mds_rank_t,double> imported;
- map<mds_rank_t,double> exported;
-
- map<mds_rank_t, int> old_prev_targets; // # iterations they _haven't_ been targets
};
#endif
dispatch_fragment_dir(mdr);
break;
case CEPH_MDS_OP_EXPORTDIR:
- migrator->dispatch_export_dir(mdr);
+ migrator->dispatch_export_dir(mdr, 0);
break;
case CEPH_MDS_OP_ENQUEUE_SCRUB:
enqueue_scrub_work(mdr);
#include "common/errno.h"
#include "messages/MClientRequestForward.h"
+#include "messages/MMDSLoadTargets.h"
#include "messages/MMDSMap.h"
#include "messages/MMDSTableRequest.h"
#include "messages/MCommand.h"
finisher->start();
}
+void MDSRank::update_targets(utime_t now)
+{
+ // get MonMap's idea of my export_targets
+ const set<mds_rank_t>& map_targets = mdsmap->get_mds_info(get_nodeid()).export_targets;
+
+ dout(20) << "updating export targets, currently " << map_targets.size() << " ranks are targets" << dendl;
+
+ bool send = false;
+ set<mds_rank_t> new_map_targets;
+
+ auto it = export_targets.begin();
+ while (it != export_targets.end()) {
+ mds_rank_t rank = it->first;
+ double val = it->second.get(now);
+ dout(20) << "export target mds." << rank << " value is " << val << " @ " << now << dendl;
+
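+    /* The counter decays toward zero once the balancer stops hitting this
+     * rank; below 0.01 the target is considered expired and dropped. */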
+ if (val <= 0.01) {
+ dout(15) << "export target mds." << rank << " is no longer an export target" << dendl;
+ export_targets.erase(it++);
+ send = true;
+ continue;
+ }
+ if (!map_targets.count(rank)) {
+ dout(15) << "export target mds." << rank << " not in map's export_targets" << dendl;
+ send = true;
+ }
+ new_map_targets.insert(rank);
+ it++;
+ }
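+  /* Fewer live counters than mon-side entries means the map still lists
+   * ranks we have stopped targeting. */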
+ if (new_map_targets.size() < map_targets.size()) {
+ dout(15) << "export target map holds stale targets, sending update" << dendl;
+ send = true;
+ }
+
+ if (send) {
+ dout(15) << "updating export_targets, now " << new_map_targets.size() << " ranks are targets" << dendl;
+ MMDSLoadTargets* m = new MMDSLoadTargets(mds_gid_t(monc->get_global_id()), new_map_targets);
+ monc->send_mon_message(m);
+ }
+}
+
+void MDSRank::hit_export_target(utime_t now, mds_rank_t rank, double amount)
+{
+  double rate = g_conf->mds_bal_target_decay;
+  if (amount < 0.0) {
+    amount = 100.0/rate; /* a good default for "I am trying to keep this export_target active" */
+  }
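+  /* emplace() constructs a DecayCounter only on the first hit for this
+   * rank; otherwise it returns the existing entry, so em.second tells us
+   * whether rank is a new export target. */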
+  auto em = export_targets.emplace(std::piecewise_construct,
+                                   std::forward_as_tuple(rank),
+                                   std::forward_as_tuple(now, DecayRate(rate)));
+ if (em.second) {
+ dout(15) << "hit export target (new) " << amount << " @ " << now << dendl;
+ } else {
+ dout(15) << "hit export target " << amount << " @ " << now << dendl;
+ }
+ em.first->second.hit(now, amount);
+}
+
void MDSRankDispatcher::tick()
{
heartbeat_reset();
}
// log
- utime_t now = ceph_clock_now();
- mds_load_t load = balancer->get_load(now);
+ mds_load_t load = balancer->get_load(ceph_clock_now());
if (logger) {
logger->set(l_mds_load_cent, 100 * load.mds_load());
if (is_active()) {
balancer->tick();
+ update_targets(ceph_clock_now());
mdcache->find_stale_fragment_freeze();
mdcache->migrator->find_stale_export_freeze();
if (snapserver)
mdcache->migrator->handle_mds_failure_or_stop(*p);
}
- if (!is_any_replay())
- balancer->try_rebalance();
-
{
map<epoch_t,list<MDSInternalContextBase*> >::iterator p = waiting_for_mdsmap.begin();
while (p != waiting_for_mdsmap.end() && p->first <= mdsmap->get_epoch()) {
#ifndef MDS_RANK_H_
#define MDS_RANK_H_
-#include "common/TrackedOp.h"
+#include "common/DecayCounter.h"
#include "common/LogClient.h"
#include "common/Timer.h"
+#include "common/TrackedOp.h"
#include "messages/MCommand.h"
void bcast_mds_map(); // to mounted clients
epoch_t last_client_mdsmap_bcast;
+ map<mds_rank_t,DecayCounter> export_targets; /* targets this MDS is exporting to or wants/tries to */
+
void create_logger();
public:
void dump_status(Formatter *f) const;
+ void hit_export_target(utime_t now, mds_rank_t rank, double amount=-1.0);
+ bool is_export_target(mds_rank_t rank) {
+ const set<mds_rank_t>& map_targets = mdsmap->get_mds_info(get_nodeid()).export_targets;
+ return map_targets.count(rank);
+ }
+
protected:
void dump_clientreplay_status(Formatter *f) const;
void command_scrub_path(Formatter *f, const string& path, vector<string>& scrubop_vec);
void handle_mds_recovery(mds_rank_t who);
void handle_mds_failure(mds_rank_t who);
// <<<
+
+ /* Update MDSMap export_targets for this rank. Called on ::tick(). */
+ void update_targets(utime_t now);
};
/* This expects to be given a reference which it is responsible for.
}
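+/* Retries dispatch_export_dir() once a new MDSMap arrives, bumping the
+ * retry count so the export is eventually canceled if the peer never
+ * becomes an export target. */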
+class C_M_ExportTargetWait : public MigratorContext {
+ MDRequestRef mdr;
+ int count;
+public:
+ C_M_ExportTargetWait(Migrator *m, MDRequestRef mdr, int count)
+ : MigratorContext(m), mdr(mdr), count(count) {}
+ void finish(int r) override {
+ mig->dispatch_export_dir(mdr, count);
+ }
+};
+
+
/** export_dir(dir, dest)
* public method to initiate an export.
* will fail if the directory is freezing, frozen, unpinnable, or root.
return;
}
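+  /* Record a default-strength hit on dest so it becomes (or stays) an
+   * export target while this export is in flight. */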
+ mds->hit_export_target(ceph_clock_now(), dest, -1);
+
dir->auth_pin(this);
dir->state_set(CDir::STATE_EXPORTING);
return mds->mdcache->dispatch_request(mdr);
}
-void Migrator::dispatch_export_dir(MDRequestRef& mdr)
+void Migrator::dispatch_export_dir(MDRequestRef& mdr, int count)
{
dout(7) << "dispatch_export_dir " << *mdr << dendl;
CDir *dir = mdr->more()->export_dir;
map<CDir*,export_state_t>::iterator it = export_state.find(dir);
if (it == export_state.end() || it->second.tid != mdr->reqid.tid) {
// export must have aborted.
}
assert(it->second.state == EXPORT_LOCKING);
+ mds_rank_t dest = it->second.peer;
+
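+  /* The mon must have acked dest into our MDSMap export_targets (sent via
+   * MMDSLoadTargets from MDSRank::update_targets) before the export may
+   * proceed; otherwise wait for new MDSMap epochs, giving up after a few
+   * tries. */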
+ if (!mds->is_export_target(dest)) {
+ dout(7) << "dest is not yet an export target" << dendl;
+ if (count > 3) {
+ dout(5) << "dest has not been added as export target after three MDSMap epochs, canceling export" << dendl;
+ mds->mdcache->request_finish(mdr);
+ return;
+ }
+ mds->wait_for_mdsmap(mds->mdsmap->get_epoch(), new C_M_ExportTargetWait(this, mdr, count+1));
+ return;
+ }
+
if (mdr->aborted || dir->is_frozen() || dir->is_freezing()) {
dout(7) << "wouldblock|freezing|frozen, canceling export" << dendl;
export_try_cancel(dir);
MExportDirDiscover *discover = new MExportDirDiscover(dir->dirfrag(), path,
mds->get_nodeid(),
it->second.tid);
- mds->send_message_mds(discover, it->second.peer);
+ mds->send_message_mds(discover, dest);
assert(g_conf->mds_kill_export_at != 2);
it->second.last_cum_auth_pins_change = ceph_clock_now();
void Migrator::handle_export_discover_ack(MExportDirDiscoverAck *m)
{
CDir *dir = cache->get_dirfrag(m->get_dirfrag());
+ mds_rank_t dest(m->get_source().num());
+ utime_t now = ceph_clock_now();
assert(dir);
dout(7) << "export_discover_ack from " << m->get_source()
<< " on " << *dir << dendl;
+ mds->hit_export_target(now, dest, -1);
+
map<CDir*,export_state_t>::iterator it = export_state.find(dir);
if (it == export_state.end() ||
it->second.tid != m->get_tid() ||
- it->second.peer != mds_rank_t(m->get_source().num())) {
+ it->second.peer != dest) {
dout(7) << "must have aborted" << dendl;
} else {
assert(it->second.state == EXPORT_DISCOVERING);
void Migrator::handle_export_prep_ack(MExportDirPrepAck *m)
{
CDir *dir = cache->get_dirfrag(m->get_dirfrag());
+ mds_rank_t dest(m->get_source().num());
+ utime_t now = ceph_clock_now();
assert(dir);
dout(7) << "export_prep_ack " << *dir << dendl;
+ mds->hit_export_target(now, dest, -1);
+
map<CDir*,export_state_t>::iterator it = export_state.find(dir);
if (it == export_state.end() ||
it->second.tid != m->get_tid() ||
void Migrator::export_go_synced(CDir *dir, uint64_t tid)
{
-
map<CDir*,export_state_t>::iterator it = export_state.find(dir);
if (it == export_state.end() ||
it->second.state == EXPORT_CANCELLING ||
return;
}
assert(it->second.state == EXPORT_WARNING);
+ mds_rank_t dest = it->second.peer;
- dout(7) << "export_go_synced " << *dir << " to " << it->second.peer << dendl;
+ dout(7) << "export_go_synced " << *dir << " to " << dest << dendl;
cache->show_subtrees();
assert(dir->get_cum_auth_pins() == 0);
// set ambiguous auth
- cache->adjust_subtree_auth(dir, mds->get_nodeid(), it->second.peer);
+ cache->adjust_subtree_auth(dir, mds->get_nodeid(), dest);
// take away the popularity we're sending.
utime_t now = ceph_clock_now();
req->add_export((*p)->dirfrag());
// send
- mds->send_message_mds(req, it->second.peer);
+ mds->send_message_mds(req, dest);
assert(g_conf->mds_kill_export_at != 8);
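+  /* Weight this hit by the number of inodes exported so large exports
+   * keep the target's counter alive proportionally longer. */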
+ mds->hit_export_target(now, dest, num_exported_inodes+1);
+
// stats
if (mds->logger) mds->logger->inc(l_mds_exported);
if (mds->logger) mds->logger->inc(l_mds_exported_inodes, num_exported_inodes);
void Migrator::handle_export_ack(MExportDirAck *m)
{
CDir *dir = cache->get_dirfrag(m->get_dirfrag());
+ mds_rank_t dest(m->get_source().num());
+ utime_t now = ceph_clock_now();
assert(dir);
assert(dir->is_frozen_tree_root()); // i'm exporting!
// yay!
dout(7) << "handle_export_ack " << *dir << dendl;
+ mds->hit_export_target(now, dest, -1);
+
map<CDir*,export_state_t>::iterator it = export_state.find(dir);
assert(it != export_state.end());
assert(it->second.state == EXPORT_EXPORTING);
void Migrator::handle_export_notify_ack(MExportDirNotifyAck *m)
{
CDir *dir = cache->get_dirfrag(m->get_dirfrag());
+ mds_rank_t dest(m->get_source().num());
+ utime_t now = ceph_clock_now();
assert(dir);
mds_rank_t from = mds_rank_t(m->get_source().num());
+ mds->hit_export_target(now, dest, -1);
+
auto export_state_entry = export_state.find(dir);
if (export_state_entry != export_state.end()) {
export_state_t& stat = export_state_entry->second;
// -- import/export --
// exporter
- void dispatch_export_dir(MDRequestRef& mdr);
+ void dispatch_export_dir(MDRequestRef& mdr, int count);
void export_dir(CDir *dir, mds_rank_t dest);
void export_empty_import(CDir *dir);