#include "messages/MAuthReply.h"
+#include "messages/MTimeCheck.h"
+
#include "common/strtol.h"
#include "common/ceph_argparse.h"
#include "common/Timer.h"
elector(this),
leader(0),
quorum_features(0),
+
+ timecheck_epoch(0),
+ timecheck_round(0),
+ timecheck_event(NULL),
+
probe_timeout_event(NULL),
paxos_service(PAXOS_NUM),
void Monitor::reset()
{
dout(10) << "reset" << dendl;
+
+ timecheck_cleanup();
+
leader_since = utime_t();
if (!quorum.empty()) {
exited_quorum = ceph_clock_now(g_ceph_context);
clog.info() << "mon." << name << "@" << rank
<< " won leader election with quorum " << quorum << "\n";
-
+
for (list<Paxos*>::iterator p = paxos.begin(); p != paxos.end(); p++)
(*p)->leader_init();
for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); p++)
(*p)->election_finished();
finish_election();
+ timecheck();
}
void Monitor::lose_election(epoch_t epoch, set<int> &q, int l, uint64_t features)
messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
monmap->get_inst(*quorum.begin()));
}
-}
+}
bool Monitor::_allowed_command(MonSession *s, const vector<string>& cmd)
}
if (f)
f->close_section();
+
+ if (f)
+ f->open_array_section("timechecks");
+ if (timecheck_skews.size() != 0) {
+ list<string> warns;
+ for (map<entity_inst_t,double>::iterator i = timecheck_skews.begin();
+ i != timecheck_skews.end(); ++i) {
+ entity_inst_t inst = i->first;
+ double skew = i->second;
+ double latency = timecheck_latencies[inst];
+ string name = monmap->get_name(inst.addr);
+
+ ostringstream tcss;
+ health_status_t tcstatus = timecheck_status(tcss, skew, latency);
+ if (tcstatus != HEALTH_OK) {
+ overall = tcstatus;
+ warns.push_back(name);
+
+ ostringstream tmp_ss;
+ tmp_ss << "mon." << name
+ << " addr " << inst.addr << " " << tcss.str()
+ << " (latency " << latency << "s)";
+ detail.push_back(make_pair(tcstatus, tmp_ss.str()));
+ }
+
+ if (f) {
+ f->open_object_section(name.c_str());
+ f->dump_string("name", name.c_str());
+ f->dump_float("skew", skew);
+ f->dump_float("latency", latency);
+ f->dump_stream("health") << tcstatus;
+ if (tcstatus != HEALTH_OK)
+ f->dump_stream("details") << tcss.str();
+ f->close_section();
+ }
+ }
+ if (!warns.empty()) {
+ if (!ss.str().empty())
+ ss << ";";
+ ss << " clock skew detected on";
+ while (!warns.empty()) {
+ ss << " mon." << warns.front();
+ warns.pop_front();
+ if (!warns.empty())
+ ss << ",";
+ }
+ }
+ }
+ if (f)
+ f->close_section();
+
stringstream fss;
fss << overall;
status = fss.str() + ss.str();
handle_forward((MForward *)m);
break;
+ case MSG_TIMECHECK:
+ handle_timecheck((MTimeCheck *)m);
+ break;
+
default:
ret = false;
}
return ret;
}
+void Monitor::timecheck_cleanup()
+{
+ timecheck_round = 0;
+
+ if (timecheck_event) {
+ timer.cancel_event(timecheck_event);
+ timecheck_event = NULL;
+ }
+
+ if (timecheck_waiting.size() > 0)
+ timecheck_waiting.clear();
+ timecheck_skews.clear();
+ timecheck_latencies.clear();
+}
+
+void Monitor::timecheck_report()
+{
+ dout(10) << __func__ << dendl;
+ assert(is_leader());
+ if (monmap->size() == 1) {
+ assert(0 == "We are alone; we shouldn't have gotten here!");
+ return;
+ }
+
+ assert(timecheck_latencies.size() == timecheck_skews.size());
+ for (set<int>::iterator q = quorum.begin(); q != quorum.end(); ++q) {
+ if (monmap->get_name(*q) == name)
+ continue;
+
+ MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_REPORT);
+ m->epoch = get_epoch();
+ m->round = timecheck_round;
+
+ for (map<entity_inst_t, double>::iterator it = timecheck_skews.begin(); it != timecheck_skews.end(); ++it) {
+ double skew = it->second;
+ double latency = timecheck_latencies[it->first];
+
+ m->skews[it->first] = skew;
+ m->latencies[it->first] = latency;
+
+ dout(10) << __func__ << " " << it->first
+ << " latency " << latency
+ << " skew " << skew << dendl;
+ }
+ entity_inst_t inst = monmap->get_inst(*q);
+ dout(10) << __func__ << " send report to " << inst << dendl;
+ messenger->send_message(m, inst);
+ }
+}
+
+void Monitor::timecheck()
+{
+ dout(10) << __func__ << dendl;
+ assert(is_leader());
+
+ if (monmap->size() == 1) {
+ assert(0 == "We are alone; this shouldn't have been scheduled!");
+ return;
+ }
+
+ timecheck_epoch = get_epoch();
+ timecheck_round++;
+
+ dout(10) << __func__ << " start timecheck epoch " << timecheck_epoch
+ << " round " << timecheck_round << dendl;
+
+ // we are at the eye of the storm; the point of reference
+ timecheck_skews[monmap->get_inst(name)] = 0.0;
+ timecheck_latencies[monmap->get_inst(name)] = 0.0;
+
+ for (set<int>::iterator it = quorum.begin(); it != quorum.end(); ++it) {
+ if (monmap->get_name(*it) == name)
+ continue;
+
+ entity_inst_t inst = monmap->get_inst(*it);
+ utime_t curr_time = ceph_clock_now(g_ceph_context);
+ timecheck_waiting[inst] = curr_time;
+ MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_PING);
+ m->epoch = get_epoch();
+ m->round = timecheck_round;
+ dout(10) << __func__ << " send " << *m << " to " << inst << dendl;
+ messenger->send_message(m, inst);
+ }
+
+ dout(10) << __func__ << " setting up next event and timeout" << dendl;
+ timecheck_event = new C_TimeCheck(this);
+
+ timer.add_event_after(g_conf->mon_timecheck_interval, timecheck_event);
+}
+
+health_status_t Monitor::timecheck_status(ostringstream &ss,
+ const double skew_bound,
+ const double latency)
+{
+ health_status_t status = HEALTH_OK;
+ double abs_skew = (skew_bound > 0 ? skew_bound : -skew_bound);
+ assert(latency >= 0);
+
+ if (abs_skew > g_conf->mon_clock_drift_allowed) {
+ status = HEALTH_WARN;
+ ss << "clock skew " << abs_skew << "s"
+ << " > max " << g_conf->mon_clock_drift_allowed << "s";
+ }
+
+ return status;
+}
+
+void Monitor::handle_timecheck_leader(MTimeCheck *m)
+{
+ dout(10) << __func__ << " " << *m << dendl;
+ /* handles PONG's */
+ assert(m->op == MTimeCheck::OP_PONG);
+ assert(m->epoch == timecheck_epoch);
+
+ entity_inst_t other = m->get_source_inst();
+
+ if (m->round < timecheck_round) {
+ dout(1) << __func__ << " got old round " << m->round
+ << " from " << other
+ << " curr " << timecheck_round << " -- discard" << dendl;
+ return;
+ }
+
+ utime_t curr_time = ceph_clock_now(g_ceph_context);
+
+ assert(timecheck_waiting.count(other) > 0);
+ utime_t timecheck_sent = timecheck_waiting[other];
+ timecheck_waiting.erase(other);
+ if (curr_time < timecheck_sent) {
+ // our clock was readjusted -- drop everything until it all makes sense.
+ dout(1) << __func__ << " our clock was readjusted --"
+ << " bump round and drop current check"
+ << dendl;
+ timecheck_round++;
+ timecheck_waiting.clear();
+ return;
+ }
+
+ /* update peer latencies */
+ double latency = (double)(curr_time - timecheck_sent);
+
+ if (timecheck_latencies.count(other) == 0)
+ timecheck_latencies[other] = latency;
+ else {
+ double avg_latency = ((timecheck_latencies[other]*0.8)+(latency*0.2));
+ timecheck_latencies[other] = avg_latency;
+ }
+
+ /*
+ * update skews
+ *
+ * some nasty thing goes on if we were to do 'a - b' between two utime_t,
+ * and 'a' happens to be lower than 'b'; so we use double instead.
+ *
+ * latency is always expected to be >= 0.
+ *
+ * delta, the difference between theirs timestamp and ours, may either be
+ * lower or higher than 0; will hardly ever be 0.
+ *
+ * The absolute skew is the absolute delta minus the latency, which is
+ * taken as a whole instead of an rtt given that there is some queueing
+ * and dispatch times involved and it's hard to assess how long exactly
+ * it took for the message to travel to the other side and be handled. So
+ * we call it a bounded skew, the worst case scenario.
+ *
+ * Now, to math!
+ *
+ * Given that the latency is always positive, we can establish that the
+ * bounded skew will be:
+ *
+ * 1. positive if the absolute delta is higher than the latency and
+ * delta is positive
+ * 2. negative if the absolute delta is higher than the latency and
+ * delta is negative.
+ * 3. zero if the absolute delta is lower than the latency.
+ *
+ * On 3. we make a judgement call and treat the skew as non-existent.
+ * This is because that, if the absolute delta is lower than the
+ * latency, then the apparently existing skew is nothing more than a
+ * side-effect of the high latency at work.
+ *
+ * This may not be entirely true though, as a severely skewed clock
+ * may be masked by an even higher latency, but with high latencies
+ * we probably have worse issues to deal with than just skewed clocks.
+ */
+ assert(latency >= 0);
+
+ double delta = ((double) m->timestamp) - ((double) curr_time);
+ double abs_delta = (delta > 0 ? delta : -delta);
+ double skew_bound = abs_delta - latency;
+ if (skew_bound < 0)
+ skew_bound = 0;
+ else if (delta < 0)
+ skew_bound = -skew_bound;
+
+ ostringstream ss;
+ health_status_t status = timecheck_status(ss, skew_bound, latency);
+ if (status == HEALTH_ERR)
+ clog.error() << other << " " << ss.str() << "\n";
+ else if (status == HEALTH_WARN)
+ clog.warn() << other << " " << ss.str() << "\n";
+
+ dout(10) << __func__ << " from " << other << " ts " << m->timestamp
+ << " delta " << delta << " skew_bound " << skew_bound
+ << " latency " << latency << dendl;
+
+ if (timecheck_skews.count(other) == 0) {
+ timecheck_skews[other] = skew_bound;
+ } else {
+ timecheck_skews[other] = (timecheck_skews[other]*0.8)+(skew_bound*0.2);
+ }
+
+ if (timecheck_waiting.size() == 0)
+ timecheck_report();
+}
+
+void Monitor::handle_timecheck_peon(MTimeCheck *m)
+{
+ dout(10) << __func__ << " " << *m << dendl;
+
+ assert(is_peon());
+ assert(m->op == MTimeCheck::OP_PING || m->op == MTimeCheck::OP_REPORT);
+
+ if (m->epoch != get_epoch()) {
+ dout(1) << __func__ << " got wrong epoch "
+ << "(ours " << get_epoch()
+ << " theirs: " << m->epoch << ") -- discarding" << dendl;
+ return;
+ }
+
+ if ((m->round < timecheck_round)
+ || (m->round == timecheck_round && m->op != MTimeCheck::OP_REPORT)) {
+ dout(1) << __func__ << " got old round " << m->round
+ << " current " << timecheck_round << " -- discarding" << dendl;
+ return;
+ }
+
+ if (m->op == MTimeCheck::OP_REPORT) {
+ timecheck_latencies.swap(m->latencies);
+ timecheck_skews.swap(m->skews);
+ return;
+ }
+
+ timecheck_round = m->round;
+
+ MTimeCheck *reply = new MTimeCheck(MTimeCheck::OP_PONG);
+ utime_t curr_time = ceph_clock_now(g_ceph_context);
+ reply->timestamp = curr_time;
+ reply->epoch = m->epoch;
+ reply->round = m->round;
+ dout(10) << __func__ << " send " << *m
+ << " to " << m->get_source_inst() << dendl;
+ messenger->send_message(reply, m->get_connection());
+}
+
+void Monitor::handle_timecheck(MTimeCheck *m)
+{
+ dout(10) << __func__ << " " << *m << dendl;
+
+ if (is_leader()) {
+ if (m->op != MTimeCheck::OP_PONG) {
+ dout(1) << __func__ << " drop unexpected msg (not pong)" << dendl;
+ } else {
+ handle_timecheck_leader(m);
+ }
+ } else if (is_peon()) {
+ if (m->op != MTimeCheck::OP_PING && m->op != MTimeCheck::OP_REPORT) {
+ dout(1) << __func__ << " drop unexpected msg (not ping or report)" << dendl;
+ } else {
+ handle_timecheck_peon(m);
+ }
+ } else {
+ dout(1) << __func__ << " drop unexpected msg" << dendl;
+ }
+ m->put();
+}
+
void Monitor::handle_subscribe(MMonSubscribe *m)
{
dout(10) << "handle_subscribe " << *m << dendl;