]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mon: Monitor: add timecheck infrastructure to detect clock skews
authorJoao Eduardo Luis <joao.luis@inktank.com>
Thu, 27 Dec 2012 20:11:33 +0000 (20:11 +0000)
committerJoao Eduardo Luis <joao.luis@inktank.com>
Fri, 11 Jan 2013 00:44:21 +0000 (00:44 +0000)
Fixes: #3633
Fixes: #3695
Signed-off-by: Joao Eduardo Luis <joao.luis@inktank.com>
Reviewed-by: Sage Weil <sage@inktank.com>
src/common/config_opts.h
src/mon/Monitor.cc
src/mon/Monitor.h

index 896534d88cf47d8e39998db1f5dd02659d1f1a3f..d90700bff24a410629611fab1de5a62fa7e97cde 100644 (file)
@@ -131,6 +131,7 @@ OPTION(mon_lease_renew_interval, OPT_FLOAT, 3) // on leader, to renew the lease
 OPTION(mon_lease_ack_timeout, OPT_FLOAT, 10.0) // on leader, if lease isn't acked by all peons
 OPTION(mon_clock_drift_allowed, OPT_FLOAT, .050) // allowed clock drift between monitors
 OPTION(mon_clock_drift_warn_backoff, OPT_FLOAT, 5) // exponential backoff for clock drift warnings
+OPTION(mon_timecheck_interval, OPT_FLOAT, 300.0) // on leader, timecheck (clock drift check) interval (seconds)
 OPTION(mon_accept_timeout, OPT_FLOAT, 10.0)    // on leader, if paxos update isn't accepted
 OPTION(mon_pg_create_interval, OPT_FLOAT, 30.0) // no more than every 30s
 OPTION(mon_pg_stuck_threshold, OPT_INT, 300) // number of seconds after which pgs can be considered inactive, unclean, or stale (see doc/control.rst under dump_stuck for more info)
index c5d3b5ebca39cb5371c364c2b5049d4d63194035..b0ca075122fe5c2cd1094025b163aad262e40df9 100644 (file)
@@ -46,6 +46,8 @@
 
 #include "messages/MAuthReply.h"
 
+#include "messages/MTimeCheck.h"
+
 #include "common/strtol.h"
 #include "common/ceph_argparse.h"
 #include "common/Timer.h"
@@ -127,6 +129,11 @@ Monitor::Monitor(CephContext* cct_, string nm, MonitorStore *s, Messenger *m, Mo
   elector(this),
   leader(0),
   quorum_features(0),
+
+  timecheck_epoch(0),
+  timecheck_round(0),
+  timecheck_event(NULL),
+
   probe_timeout_event(NULL),
 
   paxos_service(PAXOS_NUM),
@@ -729,6 +736,9 @@ void Monitor::_add_bootstrap_peer_hint(string cmd, string args, ostream& ss)
 void Monitor::reset()
 {
   dout(10) << "reset" << dendl;
+
+  timecheck_cleanup();
+
   leader_since = utime_t();
   if (!quorum.empty()) {
     exited_quorum = ceph_clock_now(g_ceph_context);
@@ -1172,13 +1182,14 @@ void Monitor::win_election(epoch_t epoch, set<int>& active, uint64_t features)
 
   clog.info() << "mon." << name << "@" << rank
                << " won leader election with quorum " << quorum << "\n";
-  
+
   for (list<Paxos*>::iterator p = paxos.begin(); p != paxos.end(); p++)
     (*p)->leader_init();
   for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); p++)
     (*p)->election_finished();
 
   finish_election();
+  timecheck();
 }
 
 void Monitor::lose_election(epoch_t epoch, set<int> &q, int l, uint64_t features) 
@@ -1216,7 +1227,7 @@ void Monitor::finish_election()
     messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
                            monmap->get_inst(*quorum.begin()));
   }
-} 
+}
 
 
 bool Monitor::_allowed_command(MonSession *s, const vector<string>& cmd)
@@ -1337,6 +1348,57 @@ void Monitor::get_health(string& status, bufferlist *detailbl, Formatter *f)
   }
   if (f)
     f->close_section();
+
+  if (f)
+    f->open_array_section("timechecks");
+  if (timecheck_skews.size() != 0) {
+    list<string> warns;
+    for (map<entity_inst_t,double>::iterator i = timecheck_skews.begin();
+         i != timecheck_skews.end(); ++i) {
+      entity_inst_t inst = i->first;
+      double skew = i->second;
+      double latency = timecheck_latencies[inst];
+      string name = monmap->get_name(inst.addr);
+
+      ostringstream tcss;
+      health_status_t tcstatus = timecheck_status(tcss, skew, latency);
+      if (tcstatus != HEALTH_OK) {
+        overall = tcstatus;
+        warns.push_back(name);
+
+        ostringstream tmp_ss;
+        tmp_ss << "mon." << name
+               << " addr " << inst.addr << " " << tcss.str()
+              << " (latency " << latency << "s)";
+        detail.push_back(make_pair(tcstatus, tmp_ss.str()));
+      }
+
+      if (f) {
+        f->open_object_section(name.c_str());
+        f->dump_string("name", name.c_str());
+        f->dump_float("skew", skew);
+        f->dump_float("latency", latency);
+        f->dump_stream("health") << tcstatus;
+        if (tcstatus != HEALTH_OK)
+          f->dump_stream("details") << tcss.str();
+        f->close_section();
+      }
+    }
+    if (!warns.empty()) {
+      if (!ss.str().empty())
+        ss << ";";
+      ss << " clock skew detected on";
+      while (!warns.empty()) {
+        ss << " mon." << warns.front();
+        warns.pop_front();
+        if (!warns.empty())
+          ss << ",";
+      }
+    }
+  }
+  if (f)
+    f->close_section();
+
   stringstream fss;
   fss << overall;
   status = fss.str() + ss.str();
@@ -2097,6 +2159,10 @@ bool Monitor::_ms_dispatch(Message *m)
       handle_forward((MForward *)m);
       break;
 
+    case MSG_TIMECHECK:
+      handle_timecheck((MTimeCheck *)m);
+      break;
+
     default:
       ret = false;
     }
@@ -2108,6 +2174,283 @@ bool Monitor::_ms_dispatch(Message *m)
   return ret;
 }
 
+void Monitor::timecheck_cleanup()
+{
+  timecheck_round = 0;
+
+  if (timecheck_event) {
+    timer.cancel_event(timecheck_event);
+    timecheck_event = NULL;
+  }
+
+  if (timecheck_waiting.size() > 0)
+    timecheck_waiting.clear();
+  timecheck_skews.clear();
+  timecheck_latencies.clear();
+}
+
+void Monitor::timecheck_report()
+{
+  dout(10) << __func__ << dendl;
+  assert(is_leader());
+  if (monmap->size() == 1) {
+    assert(0 == "We are alone; we shouldn't have gotten here!");
+    return;
+  }
+
+  assert(timecheck_latencies.size() == timecheck_skews.size());
+  for (set<int>::iterator q = quorum.begin(); q != quorum.end(); ++q) {
+    if (monmap->get_name(*q) == name)
+      continue;
+
+    MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_REPORT);
+    m->epoch = get_epoch();
+    m->round = timecheck_round;
+
+    for (map<entity_inst_t, double>::iterator it = timecheck_skews.begin(); it != timecheck_skews.end(); ++it) {
+      double skew = it->second;
+      double latency = timecheck_latencies[it->first];
+
+      m->skews[it->first] = skew;
+      m->latencies[it->first] = latency;
+
+      dout(10) << __func__ << " " << it->first
+               << " latency " << latency
+               << " skew " << skew << dendl;
+    }
+    entity_inst_t inst = monmap->get_inst(*q);
+    dout(10) << __func__ << " send report to " << inst << dendl;
+    messenger->send_message(m, inst);
+  }
+}
+
+void Monitor::timecheck()
+{
+  dout(10) << __func__ << dendl;
+  assert(is_leader());
+
+  if (monmap->size() == 1) {
+    assert(0 == "We are alone; this shouldn't have been scheduled!");
+    return;
+  }
+
+  timecheck_epoch = get_epoch();
+  timecheck_round++;
+
+  dout(10) << __func__ << " start timecheck epoch " << timecheck_epoch
+           << " round " << timecheck_round << dendl;
+
+  // we are at the eye of the storm; the point of reference
+  timecheck_skews[monmap->get_inst(name)] = 0.0;
+  timecheck_latencies[monmap->get_inst(name)] = 0.0;
+
+  for (set<int>::iterator it = quorum.begin(); it != quorum.end(); ++it) {
+    if (monmap->get_name(*it) == name)
+      continue;
+
+    entity_inst_t inst = monmap->get_inst(*it);
+    utime_t curr_time = ceph_clock_now(g_ceph_context);
+    timecheck_waiting[inst] = curr_time;
+    MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_PING);
+    m->epoch = get_epoch();
+    m->round = timecheck_round;
+    dout(10) << __func__ << " send " << *m << " to " << inst << dendl;
+    messenger->send_message(m, inst);
+  }
+
+  dout(10) << __func__ << " setting up next event and timeout" << dendl;
+  timecheck_event = new C_TimeCheck(this);
+
+  timer.add_event_after(g_conf->mon_timecheck_interval, timecheck_event);
+}
+
+health_status_t Monitor::timecheck_status(ostringstream &ss,
+                                          const double skew_bound,
+                                          const double latency)
+{
+  health_status_t status = HEALTH_OK;
+  double abs_skew = (skew_bound > 0 ? skew_bound : -skew_bound);
+  assert(latency >= 0);
+
+  if (abs_skew > g_conf->mon_clock_drift_allowed) {
+    status = HEALTH_WARN;
+    ss << "clock skew " << abs_skew << "s"
+       << " > max " << g_conf->mon_clock_drift_allowed << "s";
+  }
+
+  return status;
+}
+
+void Monitor::handle_timecheck_leader(MTimeCheck *m)
+{
+  dout(10) << __func__ << " " << *m << dendl;
+  /* handles PONG's */
+  assert(m->op == MTimeCheck::OP_PONG);
+  assert(m->epoch == timecheck_epoch);
+
+  entity_inst_t other = m->get_source_inst();
+
+  if (m->round < timecheck_round) {
+    dout(1) << __func__ << " got old round " << m->round
+            << " from " << other
+            << " curr " << timecheck_round << " -- discard" << dendl;
+    return;
+  }
+
+  utime_t curr_time = ceph_clock_now(g_ceph_context);
+
+  assert(timecheck_waiting.count(other) > 0);
+  utime_t timecheck_sent = timecheck_waiting[other];
+  timecheck_waiting.erase(other);
+  if (curr_time < timecheck_sent) {
+    // our clock was readjusted -- drop everything until it all makes sense.
+    dout(1) << __func__ << " our clock was readjusted --"
+            << " bump round and drop current check"
+            << dendl;
+    timecheck_round++;
+    timecheck_waiting.clear();
+    return;
+  }
+
+  /* update peer latencies */
+  double latency = (double)(curr_time - timecheck_sent);
+
+  if (timecheck_latencies.count(other) == 0)
+    timecheck_latencies[other] = latency;
+  else {
+    double avg_latency = ((timecheck_latencies[other]*0.8)+(latency*0.2));
+    timecheck_latencies[other] = avg_latency;
+  }
+
+  /*
+   * update skews
+   *
+   * some nasty thing goes on if we were to do 'a - b' between two utime_t,
+   * and 'a' happens to be lower than 'b'; so we use double instead.
+   *
+   * latency is always expected to be >= 0.
+   *
+   * delta, the difference between theirs timestamp and ours, may either be
+   * lower or higher than 0; will hardly ever be 0.
+   *
+   * The absolute skew is the absolute delta minus the latency, which is
+   * taken as a whole instead of an rtt given that there is some queueing
+   * and dispatch times involved and it's hard to assess how long exactly
+   * it took for the message to travel to the other side and be handled. So
+   * we call it a bounded skew, the worst case scenario.
+   *
+   * Now, to math!
+   *
+   * Given that the latency is always positive, we can establish that the
+   * bounded skew will be:
+   *
+   *  1. positive if the absolute delta is higher than the latency and
+   *     delta is positive
+   *  2. negative if the absolute delta is higher than the latency and
+   *     delta is negative.
+   *  3. zero if the absolute delta is lower than the latency.
+   *
+   * On 3. we make a judgement call and treat the skew as non-existent.
+   * This is because that, if the absolute delta is lower than the
+   * latency, then the apparently existing skew is nothing more than a
+   * side-effect of the high latency at work.
+   *
+   * This may not be entirely true though, as a severely skewed clock
+   * may be masked by an even higher latency, but with high latencies
+   * we probably have worse issues to deal with than just skewed clocks.
+   */
+  assert(latency >= 0);
+
+  double delta = ((double) m->timestamp) - ((double) curr_time);
+  double abs_delta = (delta > 0 ? delta : -delta);
+  double skew_bound = abs_delta - latency;
+  if (skew_bound < 0)
+    skew_bound = 0;
+  else if (delta < 0)
+    skew_bound = -skew_bound;
+
+  ostringstream ss;
+  health_status_t status = timecheck_status(ss, skew_bound, latency);
+  if (status == HEALTH_ERR)
+    clog.error() << other << " " << ss.str() << "\n";
+  else if (status == HEALTH_WARN)
+    clog.warn() << other << " " << ss.str() << "\n";
+
+  dout(10) << __func__ << " from " << other << " ts " << m->timestamp
+          << " delta " << delta << " skew_bound " << skew_bound
+          << " latency " << latency << dendl;
+
+  if (timecheck_skews.count(other) == 0) {
+    timecheck_skews[other] = skew_bound;
+  } else {
+    timecheck_skews[other] = (timecheck_skews[other]*0.8)+(skew_bound*0.2);
+  }
+
+  if (timecheck_waiting.size() == 0)
+    timecheck_report();
+}
+
+void Monitor::handle_timecheck_peon(MTimeCheck *m)
+{
+  dout(10) << __func__ << " " << *m << dendl;
+
+  assert(is_peon());
+  assert(m->op == MTimeCheck::OP_PING || m->op == MTimeCheck::OP_REPORT);
+
+  if (m->epoch != get_epoch()) {
+    dout(1) << __func__ << " got wrong epoch "
+            << "(ours " << get_epoch()
+            << " theirs: " << m->epoch << ") -- discarding" << dendl;
+    return;
+  }
+
+  if ((m->round < timecheck_round)
+      || (m->round == timecheck_round && m->op != MTimeCheck::OP_REPORT)) {
+    dout(1) << __func__ << " got old round " << m->round
+            << " current " << timecheck_round << " -- discarding" << dendl;
+    return;
+  }
+
+  if (m->op == MTimeCheck::OP_REPORT) {
+    timecheck_latencies.swap(m->latencies);
+    timecheck_skews.swap(m->skews);
+    return;
+  }
+
+  timecheck_round = m->round;
+
+  MTimeCheck *reply = new MTimeCheck(MTimeCheck::OP_PONG);
+  utime_t curr_time = ceph_clock_now(g_ceph_context);
+  reply->timestamp = curr_time;
+  reply->epoch = m->epoch;
+  reply->round = m->round;
+  dout(10) << __func__ << " send " << *m
+           << " to " << m->get_source_inst() << dendl;
+  messenger->send_message(reply, m->get_connection());
+}
+
+void Monitor::handle_timecheck(MTimeCheck *m)
+{
+  dout(10) << __func__ << " " << *m << dendl;
+
+  if (is_leader()) {
+    if (m->op != MTimeCheck::OP_PONG) {
+      dout(1) << __func__ << " drop unexpected msg (not pong)" << dendl;
+    } else {
+      handle_timecheck_leader(m);
+    }
+  } else if (is_peon()) {
+    if (m->op != MTimeCheck::OP_PING && m->op != MTimeCheck::OP_REPORT) {
+      dout(1) << __func__ << " drop unexpected msg (not ping or report)" << dendl;
+    } else {
+      handle_timecheck_peon(m);
+    }
+  } else {
+    dout(1) << __func__ << " drop unexpected msg" << dendl;
+  }
+  m->put();
+}
+
 void Monitor::handle_subscribe(MMonSubscribe *m)
 {
   dout(10) << "handle_subscribe " << *m << dendl;
index 275b1554508293ffe6c209584e192a5d546449f2..e4eeb4a75c903ec311f68dc702e67877458c5bdd 100644 (file)
@@ -93,6 +93,7 @@ class MMonSubscribe;
 class MAuthRotating;
 class MRoute;
 class MForward;
+class MTimeCheck;
 
 #define COMPAT_SET_LOC "feature_set"
 
@@ -206,6 +207,62 @@ public:
   }
 
 private:
+  /**
+   * @defgroup Monitor_h_TimeCheck Monitor Clock Drift Early Warning System
+   * @{
+   *
+   * We use time checks to keep track of any clock drifting going on in the
+   * cluster. This is accomplished by periodically ping each monitor in the
+   * quorum and register its response time on a map, assessing how much its
+   * clock has drifted. We also take this opportunity to assess the latency
+   * on response.
+   *
+   * This mechanism works as follows:
+   *
+   *  - Leader sends out a 'PING' message to each other monitor in the quorum.
+   *    The message is timestamped with the leader's current time. The leader's
+   *    current time is recorded in a map, associated with each peon's
+   *    instance.
+   *  - The peon replies to the leader with a timestamped 'PONG' message.
+   *  - The leader calculates a delta between the peon's timestamp and its
+   *    current time and stashes it.
+   *  - The leader also calculates the time it took to receive the 'PONG'
+   *    since the 'PING' was sent, and stashes an approximate latency estimate.
+   *  - Once all the quorum members have pong'ed, the leader will share the
+   *    clock skew and latency maps with all the monitors in the quorum.
+   */
+  map<entity_inst_t, utime_t> timecheck_waiting;
+  map<entity_inst_t, double> timecheck_skews;
+  map<entity_inst_t, double> timecheck_latencies;
+  version_t timecheck_epoch;
+  version_t timecheck_round;
+  /**
+   * Time Check event.
+   */
+  Context *timecheck_event;
+
+  struct C_TimeCheck : public Context {
+    Monitor *mon;
+    C_TimeCheck(Monitor *m) : mon(m) { }
+    void finish(int r) {
+      mon->timecheck();
+    }
+  };
+
+  void timecheck_cleanup();
+  void timecheck_report();
+  void timecheck();
+  health_status_t timecheck_status(ostringstream &ss,
+                                   const double skew_bound,
+                                   const double latency);
+  void handle_timecheck_leader(MTimeCheck *m);
+  void handle_timecheck_peon(MTimeCheck *m);
+  void handle_timecheck(MTimeCheck *m);
+  /**
+   * @}
+   */
+
+
   Context *probe_timeout_event;  // for probing and slurping states
 
   struct C_ProbeTimeout : public Context {