for (set<mds_rank_t>::iterator p = up.begin(); p != up.end(); ++p) {
if (*p == mds->get_nodeid())
continue;
- MHeartbeat *hb = new MHeartbeat(load, beat_epoch);
+ MHeartbeat *hb = new MHeartbeat(load, beat_epoch, last_epoch_under);
hb->get_import_map() = import_map;
messenger->send_message(hb,
mds->mdsmap->get_inst(*p));
}
}
mds_import_map[ who ] = m->get_import_map();
+ mds_last_epoch_under_info[who] = m->get_last_epoch_under();
//dout(0) << " load is " << load << " have " << mds_load.size() << dendl;
dout(15) << " mds." << it->second << " is importer" << dendl;
importers.insert(pair<double,mds_rank_t>(it->first,it->second));
importer_set.insert(it->second);
- } else {
+ } else if ((it->first > target_load * (1.0 + g_conf->mds_bal_min_rebalance)) &&
+ (it->second == whoami || !mds_last_epoch_under_info[it->second] || beat_epoch - mds_last_epoch_under_info[it->second] >= 2)){
dout(15) << " mds." << it->second << " is exporter" << dendl;
exporters.insert(pair<double,mds_rank_t>(it->first,it->second));
exporter_set.insert(it->second);
map<mds_rank_t, mds_load_t> mds_load;
map<mds_rank_t, double> mds_meta_load;
map<mds_rank_t, map<mds_rank_t, float> > mds_import_map;
+ map<mds_rank_t, int> mds_last_epoch_under_info;
// per-epoch state
double my_load, target_load;
#include "msg/Message.h"
class MHeartbeat : public Message {
+ static const int HEAD_VERSION = 2;
+ static const int COMPAT_VERSION = 1;
mds_load_t load;
__s32 beat = 0;
+ __s32 last_epoch_under = 0;
map<mds_rank_t, float> import_map;
public:
mds_load_t& get_load() { return load; }
int get_beat() { return beat; }
+ int get_last_epoch_under() { return last_epoch_under; }
map<mds_rank_t, float>& get_import_map() {
return import_map;
}
MHeartbeat()
- : Message(MSG_MDS_HEARTBEAT), load(utime_t()) { }
- MHeartbeat(mds_load_t& load, int beat)
- : Message(MSG_MDS_HEARTBEAT),
+ : Message(MSG_MDS_HEARTBEAT, HEAD_VERSION, COMPAT_VERSION), load(utime_t()) { }
+ MHeartbeat(mds_load_t& load, int beat, int last_epoch_under)
+ : Message(MSG_MDS_HEARTBEAT, HEAD_VERSION, COMPAT_VERSION),
load(load) {
this->beat = beat;
+ this->last_epoch_under = last_epoch_under;
}
private:
~MHeartbeat() override {}
::encode(load, payload);
::encode(beat, payload);
::encode(import_map, payload);
+ ::encode(last_epoch_under, payload);
}
void decode_payload() override {
bufferlist::iterator p = payload.begin();
::decode(load, now, p);
::decode(beat, p);
::decode(import_map, p);
+ if (header.version >= 2) {
+ ::decode(last_epoch_under, p);
+ } else {
+ last_epoch_under = 0;
+ }
}
};