OPTION(mon_min_osdmap_epochs, OPT_INT, 500)
OPTION(mon_max_pgmap_epochs, OPT_INT, 500)
OPTION(mon_max_log_epochs, OPT_INT, 500)
-OPTION(paxos_slurp_bytes, OPT_INT, 256*1024) // limit size of slurp messages
+OPTION(mon_probe_timeout, OPT_DOUBLE, 2.0)
+OPTION(mon_slurp_timeout, OPT_DOUBLE, 10.0)
+OPTION(mon_slurp_bytes, OPT_INT, 256*1024) // limit size of slurp messages
OPTION(paxos_max_join_drift, OPT_INT, 10) // max paxos iterations before we must first slurp
OPTION(paxos_propose_interval, OPT_DOUBLE, 1.0) // gather updates for this long before proposing a map update
OPTION(paxos_min_wait, OPT_DOUBLE, 0.05) // min time to gather updates for after period of inactivity
elector(this),
leader(0),
+ probe_timeout_event(NULL),
+
paxos(PAXOS_NUM), paxos_service(PAXOS_NUM),
routed_request_tid(0)
{
{
dout(10) << "bootstrap" << dendl;
+ cancel_probe_timeout();
+
// note my rank
rank = monmap->get_rank(name);
state = STATE_PROBING;
leader_since = utime_t();
quorum.clear();
- clear_probe_info();
+ outside_quorum.clear();
for (vector<Paxos*>::iterator p = paxos.begin(); p != paxos.end(); p++)
(*p)->restart();
return;
}
+ reset_probe_timeout();
+
// i'm outside the quorum
outside_quorum.insert(name);
}
}
-void Monitor::clear_probe_info()
+void Monitor::cancel_probe_timeout()
{
- outside_quorum.clear();
+ if (probe_timeout_event) {
+ dout(10) << "cancel_probe_timeout " << probe_timeout_event << dendl;
+ timer.cancel_event(probe_timeout_event);
+ probe_timeout_event = NULL;
+ } else {
+ dout(10) << "cancel_probe_timeout (none scheduled)" << dendl;
+ }
+}
+
+void Monitor::reset_probe_timeout()
+{
+ cancel_probe_timeout();
+ probe_timeout_event = new C_ProbeTimeout(this);
+ double t = is_probing() ? g_conf->mon_probe_timeout : g_conf->mon_slurp_timeout;
+ timer.add_event_after(t, probe_timeout_event);
+ dout(10) << "reset_probe_timeout " << probe_timeout_event << " after " << t << " seconds" << dendl;
+}
+
+void Monitor::probe_timeout(int r)
+{
+ dout(4) << "probe_timeout " << probe_timeout_event << dendl;
+ assert(is_probing() || is_slurping());
+ assert(probe_timeout_event);
+ probe_timeout_event = NULL;
+ bootstrap();
}
void Monitor::handle_probe(MMonProbe *m)
{
dout(10) << "slurp " << slurp_source << " " << slurp_versions << dendl;
+ reset_probe_timeout();
+
state = STATE_SLURPING;
map<string,version_t>::iterator p = slurp_versions.begin();
++p) {
len += store->get_bl_sn(r->paxos_values[*p][v], p->c_str(), v);
}
- if (len >= g_conf->paxos_slurp_bytes)
+ if (len >= g_conf->mon_slurp_bytes)
break;
}
{
dout(10) << "start_election" << dendl;
+ cancel_probe_timeout();
+
// call a new election
state = STATE_ELECTING;
clog.info() << "mon." << name << " calling new monitor election\n";
}
bool is_probing() const { return state == STATE_PROBING; }
+ bool is_slurping() const { return state == STATE_SLURPING; }
bool is_electing() const { return state == STATE_ELECTING; }
bool is_leader() const { return state == STATE_LEADER; }
bool is_peon() const { return state == STATE_PEON; }
entity_inst_t slurp_source;
map<string,version_t> slurp_versions;
- void clear_probe_info();
+ Context *probe_timeout_event; // for probing and slurping states
+
+ struct C_ProbeTimeout : public Context {
+ Monitor *mon;
+ C_ProbeTimeout(Monitor *m) : mon(m) {}
+ void finish(int r) {
+ mon->probe_timeout(r);
+ }
+ };
+
+ void reset_probe_timeout();
+ void cancel_probe_timeout();
+ void probe_timeout(int r);
+
void slurp();
public: