]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mon: Monitor: Add monitor store synchronization support
authorJoao Eduardo Luis <joao.luis@inktank.com>
Tue, 12 Feb 2013 13:25:16 +0000 (13:25 +0000)
committerJoao Eduardo Luis <joao.luis@inktank.com>
Thu, 21 Feb 2013 18:02:22 +0000 (18:02 +0000)
Synchronize two monitor stores when one of the monitors has diverged
significantly from the remaining monitor cluster.

This process roughly consists of the following steps:

  0. mon.X tries to join the cluster;
  1. mon.X verifies that it has diverged from the remaining cluster;
  2. mon.X asks the leader to sync;
  3. the leader allows mon.X to sync, pointing out a mon.Y from
     which mon.X should sync;
  4. mon.X asks mon.Y to sync;
  5. mon.Y sends its own store in one or more chunks;
  6. mon.X acks each received chunk; go to 5;
  7. mon.X receives the last chunk from mon.Y;
  8. mon.X informs the leader that it has finished synchronizing;
  9. the leader acks mon.X's finished sync;
 10. mon.X bootstraps and retries joining the cluster (goto 0.)

This is the most simple and straightforward process that can be hoped
for. However, things may go sideways at any time (monitors failing, for
instance), which could potentially lead to a corrupted monitor store.
There are however mechanisms at work to avoid such scenario at any step
of the process.

Some of these mechanisms include:

 - aborting the sync if the leader fails or leadership changes;
 - state barriers on synchronization functions to avoid stray/outdated
   messages from interfering on the normal monitor behavior or on-going
   synchronization;
 - store clean-up before any synchronization process starts;
 - store clean-up if a sync process fails;
 - resuming sync from a different monitor mon.Z if mon.Y fails mid-sync;
 - several timeouts to guarantee that all the involved parties are still
   alive and participating in the sync effort.
 - request forwarding when mon.X contacts a monitor outside the quorum
   that might know who the leader is (or might know someone who does)
   [4].

Changes:
  - Adapt the MMonProbe message for the single-paxos approach, dropping
    the version map and using a lower and upper bound version instead.
  - Remove old slurp code.
  - Add 'sync force' command; 'sync_force' through the admin socket.

Notes:

[1] It's important to keep track of the paxos version at the time at
    which a store sync starts.  Given that after the sync we end up with
    the same state as the monitor we are synchronizing from, there is a
    chance that we might end up with an uncommitted paxos version if we
    are synchronizing with the leader (there's some paxos stashing done
    prior to commit on the leader).  By keeping track at which version
    the sync started, we can then let the requester to which version he
    should cap its paxos store.

[2] Furthermore, the enforced paxos cap, described on [1], is even more
    important if we consider the need to reapply the paxos versions that
    were received during the sync, to make sure the paxos store is
    consistent.  If we happened to have some yet-uncommitted version in
    the store, we could end up applying it.

[3] What is described in [1] and [2]:

Fixes: #4026
Fixes: #4037
Fixes: #4040
[4] Whenever a given monitor mon.X is on the probing phase and notices
    that there is a mon.Y with a paxos version considerably higher than
    the one mon.X has, then mon.X will attempt to synchronize from
    mon.Y.  This is the basis for the store sync.  However this might
    hold true, the fact is that there might be a chance that, by the
    time mon.Y handles the sync request from mon.X, mon.Y might already
    be attempting a sync himself with some other mon.Z.  In this case,
    the appropriate thing for mon.Y to do is to forward mon.X's request
    to mon.Z, as mon.Z should be part of the quorum, know who the leader
    is or be the leader himself -- if not, at least it is guaranteed
    that mon.Z has a higher version than both mon.X and mon.Y, so it
    should be okay to sync from him.

Fixes: #4162
Signed-off-by: Joao Eduardo Luis <joao.luis@inktank.com>
src/common/config_opts.h
src/messages/MMonProbe.h
src/messages/MMonSync.h
src/mon/Monitor.cc
src/mon/Monitor.h
src/mon/MonitorDBStore.h
src/mon/Paxos.cc
src/mon/Paxos.h

index 04298fb45d8d693c812d7d4954b629d3b5a7a1fa..66fe3882968ac4524848a286dfe55cb399e52116 100644 (file)
@@ -155,9 +155,21 @@ OPTION(mon_slurp_bytes, OPT_INT, 256*1024)    // limit size of slurp messages
 OPTION(mon_client_bytes, OPT_U64, 100ul << 20)  // client msg data allowed in memory (in bytes)
 OPTION(mon_daemon_bytes, OPT_U64, 400ul << 20)  // mds, osd message memory cap (in bytes)
 OPTION(mon_max_log_entries_per_event, OPT_INT, 4096)
+OPTION(mon_sync_trim_timeout, OPT_DOUBLE, 30.0)
+OPTION(mon_sync_heartbeat_timeout, OPT_DOUBLE, 30.0)
+OPTION(mon_sync_heartbeat_interval, OPT_DOUBLE, 5.0)
+OPTION(mon_sync_backoff_timeout, OPT_DOUBLE, 30.0)
+OPTION(mon_sync_timeout, OPT_DOUBLE, 30.0)
+OPTION(mon_sync_max_retries, OPT_INT, 5)
 OPTION(mon_sync_max_payload_size, OPT_U32, 1048576) // max size for a sync chunk payload (say, 1MB)
 OPTION(mon_sync_debug, OPT_BOOL, false) // enable sync-specific debug
-OPTION(paxos_max_join_drift, OPT_INT, 10)       // max paxos iterations before we must first slurp
+OPTION(mon_sync_debug_leader, OPT_STR, "") // monitor to be used as the sync leader
+OPTION(mon_sync_debug_provider, OPT_STR, "") // monitor to be used as the sync provider
+OPTION(mon_sync_debug_provider_fallback, OPT_STR, "") // monitor to be used as fallback if sync provider fails
+OPTION(mon_sync_leader_kill_at, OPT_INT, 0) // kill the sync leader at a specifc point in the work flow
+OPTION(mon_sync_provider_kill_at, OPT_INT, 0) // kill the sync provider at a specific point in the work flow
+OPTION(mon_sync_requester_kill_at, OPT_INT, 0) // kill the sync requester at a specific point in the work flow
+OPTION(paxos_max_join_drift, OPT_INT, 10) // max paxos iterations before we must first sync the monitor stores
 OPTION(paxos_propose_interval, OPT_DOUBLE, 1.0)  // gather updates for this long before proposing a map update
 OPTION(paxos_min_wait, OPT_DOUBLE, 0.05)  // min time to gather updates for after period of inactivity
 OPTION(paxos_trim_tolerance, OPT_INT, 3) // number of extra proposals tolerated before trimming
index b15b6738c305cb470252cf9b75038a4540e56f42..37f7295c2b2a524f0d57a6b6a9396bbe2ca7aa1a 100644 (file)
@@ -49,7 +49,8 @@ public:
   string name;
   set<int32_t> quorum;
   bufferlist monmap_bl;
-  map<string, version_t> paxos_versions;
+  version_t paxos_first_version;
+  version_t paxos_last_version;
   bool has_ever_joined;
 
   string machine_name;
@@ -72,8 +73,12 @@ public:
     out << "mon_probe(" << get_opname(op) << " " << fsid << " name " << name;
     if (quorum.size())
       out << " quorum " << quorum;
-    if (paxos_versions.size())
-      out << " versions " << paxos_versions;
+    if (op == OP_REPLY) {
+      out << " paxos("
+       << " fc " << paxos_first_version
+       << " lc " << paxos_last_version
+       << " )";
+    }
     if (machine_name.length())
       out << " machine_name " << machine_name << " " << oldest_version << "-" << newest_version;
     if (!has_ever_joined)
@@ -95,7 +100,6 @@ public:
     ::encode(name, payload);
     ::encode(quorum, payload);
     ::encode(monmap_bl, payload);
-    ::encode(paxos_versions, payload);
     ::encode(machine_name, payload);
     ::encode(oldest_version, payload);
     ::encode(newest_version, payload);
@@ -103,6 +107,8 @@ public:
     ::encode(latest_value, payload);
     ::encode(latest_version, payload);
     ::encode(has_ever_joined, payload);
+    ::encode(paxos_first_version, payload);
+    ::encode(paxos_last_version, payload);
   }
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
@@ -111,7 +117,6 @@ public:
     ::decode(name, p);
     ::decode(quorum, p);
     ::decode(monmap_bl, p);
-    ::decode(paxos_versions, p);
     ::decode(machine_name, p);
     ::decode(oldest_version, p);
     ::decode(newest_version, p);
@@ -122,6 +127,8 @@ public:
       ::decode(has_ever_joined, p);
     else
       has_ever_joined = false;
+    ::decode(paxos_first_version, p);
+    ::decode(paxos_last_version, p);
   }
 };
 
index c03ceba9e8e921dfbd6ad3463f0068dcac725e15..1facaf1c00426609f3792597a54478e0d5f70ddf 100644 (file)
@@ -150,6 +150,11 @@ public:
   * Obtain this message type's name */
   const char *get_type_name() const { return "mon_sync"; }
 
+  void set_reply_to(entity_inst_t other) {
+    reply_to = other;
+    flags |= FLAG_REPLY_TO;
+  }
+
   /**
   * Print this message in a pretty format to @p out
   *
index aba6e53c2edf73e05ed833f13468dd428a44cf8d..5afbb9bf7543dcbbc84984c3d179148f949712cc 100644 (file)
@@ -35,6 +35,7 @@
 #include "messages/MGenericMessage.h"
 #include "messages/MMonCommand.h"
 #include "messages/MMonCommandAck.h"
+#include "messages/MMonSync.h"
 #include "messages/MMonProbe.h"
 #include "messages/MMonJoin.h"
 #include "messages/MMonPaxos.h"
@@ -132,6 +133,15 @@ Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
   leader(0),
   quorum_features(0),
 
+  // trim & store sync
+  sync_role(SYNC_ROLE_NONE),
+  trim_lock("Monitor::trim_lock"),
+  trim_enable_timer(NULL),
+  sync_rng(getpid()),
+  sync_state(SYNC_STATE_NONE),
+  sync_leader(),
+  sync_provider(),
+
   timecheck_round(0),
   timecheck_event(NULL),
 
@@ -213,7 +223,17 @@ void Monitor::do_admin_command(string command, string args, ostream& ss)
     _mon_status(ss);
   else if (command == "quorum_status")
     _quorum_status(ss);
-  else if (command.find("add_bootstrap_peer_hint") == 0)
+  else if (command == "sync_status")
+    _sync_status(ss);
+  else if (command == "sync_force") {
+    if (args != "--yes-i-really-mean-it") {
+      ss << "are you SURE? this will mean the monitor store will be erased "
+            "the next time the monitor is restarted.  pass "
+            "'--yes-i-really-mean-it' if you really do.";
+      return;
+    }
+    _sync_force(ss);
+  } else if (command.find("add_bootstrap_peer_hint") == 0)
     _add_bootstrap_peer_hint(command, args, ss);
   else
     assert(0 == "bad AdminSocket command binding");
@@ -374,14 +394,30 @@ int Monitor::preinit()
     }
   }
 
-  paxos->init();
-  // init paxos
-  for (int i = 0; i < PAXOS_NUM; ++i) {
-    if (paxos->is_consistent()) {
-      paxos_service[i]->update_from_paxos();
-    } // else we don't do anything; handle_probe_reply will detect it's slurping
+  {
+    // We have a potentially inconsistent store state in hands. Get rid of it
+    // and start fresh.
+    bool clear_store = false;
+    if (store->get("mon_sync", "in_sync") > 0) {
+      dout(1) << __func__ << " clean up potentially inconsistent store state"
+             << dendl;
+      clear_store = true;
+    }
+
+    if (store->get("mon_sync", "force_sync") > 0) {
+      dout(1) << __func__ << " force sync by clearing store state" << dendl;
+      clear_store = true;
+    }
+
+    if (clear_store) {
+      set<string> sync_prefixes = get_sync_targets_names();
+      sync_prefixes.insert("mon_sync");
+      store->clear(sync_prefixes);
+    }
   }
 
+  init_paxos();
+
   // we need to bootstrap authentication keys so we can form an
   // initial quorum.
   if (authmon()->get_version() == 0) {
@@ -425,6 +461,9 @@ int Monitor::preinit()
   r = admin_socket->register_command("quorum_status", admin_hook,
                                         "show current quorum status");
   assert(r == 0);
+  r = admin_socket->register_command("sync_status", admin_hook,
+                                    "show current synchronization status");
+  assert(r == 0);
   r = admin_socket->register_command("add_bootstrap_peer_hint", admin_hook,
                                     "add peer address as potential bootstrap peer for cluster bringup");
   assert(r == 0);
@@ -452,6 +491,18 @@ int Monitor::init()
   return 0;
 }
 
+void Monitor::init_paxos()
+{
+  dout(10) << __func__ << dendl;
+  paxos->init();
+  // init paxos
+  for (int i = 0; i < PAXOS_NUM; ++i) {
+    if (paxos->is_consistent()) {
+      paxos_service[i]->update_from_paxos();
+    }
+  }
+}
+
 void Monitor::register_cluster_logger()
 {
   if (!cluster_logger_registered) {
@@ -491,6 +542,7 @@ void Monitor::shutdown()
     AdminSocket* admin_socket = cct->get_admin_socket();
     admin_socket->unregister_command("mon_status");
     admin_socket->unregister_command("quorum_status");
+    admin_socket->unregister_command("sync_status");
     delete admin_hook;
     admin_hook = NULL;
   }
@@ -551,83 +603,1092 @@ void Monitor::bootstrap()
     messenger->mark_down_all();
   }
 
-  // reset
-  state = STATE_PROBING;
+  reset_sync();
+
+  // reset
+  state = STATE_PROBING;
+
+  reset();
+
+  // singleton monitor?
+  if (monmap->size() == 1 && rank == 0) {
+    win_standalone_election();
+    return;
+  }
+
+  reset_probe_timeout();
+
+  // i'm outside the quorum
+  if (monmap->contains(name))
+    outside_quorum.insert(name);
+
+  // probe monitors
+  dout(10) << "probing other monitors" << dendl;
+  for (unsigned i = 0; i < monmap->size(); i++) {
+    if ((int)i != rank)
+      messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined),
+                             monmap->get_inst(i));
+  }
+  for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
+       p != extra_probe_peers.end();
+       ++p) {
+    if (*p != messenger->get_myaddr()) {
+      entity_inst_t i;
+      i.name = entity_name_t::MON(-1);
+      i.addr = *p;
+      messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined), i);
+    }
+  }
+}
+
+void Monitor::_add_bootstrap_peer_hint(string cmd, string args, ostream& ss)
+{
+  dout(10) << "_add_bootstrap_peer_hint '" << cmd << "' '" << args << "'" << dendl;
+
+  entity_addr_t addr;
+  const char *end = 0;
+  if (!addr.parse(args.c_str(), &end)) {
+    ss << "failed to parse addr '" << args << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
+    return;
+  }
+
+  if (is_leader() || is_peon()) {
+    ss << "mon already active; ignoring bootstrap hint";
+    return;
+  }
+
+  if (addr.get_port() == 0)
+    addr.set_port(CEPH_MON_PORT);
+
+  extra_probe_peers.insert(addr);
+  ss << "adding peer " << addr << " to list: " << extra_probe_peers;
+}
+
+// called by bootstrap(), or on leader|peon -> electing
+void Monitor::reset()
+{
+  dout(10) << "reset" << dendl;
+
+  timecheck_finish();
+
+  leader_since = utime_t();
+  if (!quorum.empty()) {
+    exited_quorum = ceph_clock_now(g_ceph_context);
+  }
+  quorum.clear();
+  outside_quorum.clear();
+
+  paxos->restart();
+
+  for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); p++)
+    (*p)->restart();
+}
+
+set<string> Monitor::get_sync_targets_names() {
+  set<string> targets;
+  targets.insert(paxos->get_name());
+  for (int i = 0; i < PAXOS_NUM; ++i)
+    targets.insert(paxos_service[i]->get_service_name());
+
+  return targets;
+}
+
+/**
+ * Reset any lingering sync/trim informations we might have.
+ */
+void Monitor::reset_sync(bool abort)
+{
+  dout(10) << __func__ << dendl;
+  // clear everything trim/sync related
+  {
+    map<entity_inst_t,Context*>::iterator iter = trim_timeouts.begin();
+    for (; iter != trim_timeouts.end(); ++iter) {
+      if (!iter->second)
+        continue;
+
+      timer.cancel_event(iter->second);
+      if (abort) {
+        MMonSync *msg = new MMonSync(MMonSync::OP_ABORT);
+        entity_inst_t other = iter->first;
+        messenger->send_message(msg, other);
+      }
+    }
+    trim_timeouts.clear();
+  }
+  {
+    map<entity_inst_t,SyncEntity>::iterator iter = sync_entities.begin();
+    for (; iter != sync_entities.end(); ++iter) {
+      (*iter).second->cancel_timeout();
+    }
+    sync_entities.clear();
+  }
+
+  sync_entities_states.clear();
+  trim_entities_states.clear();
+
+  sync_leader.reset();
+  sync_provider.reset();
+
+  sync_state = SYNC_STATE_NONE;
+  sync_role = SYNC_ROLE_NONE;
+}
+
+// leader
+
+void Monitor::sync_send_heartbeat(entity_inst_t &other, bool reply)
+{
+  dout(10) << __func__ << " " << other << " reply(" << reply << ")" << dendl;
+  uint32_t op = (reply ? MMonSync::OP_HEARTBEAT_REPLY : MMonSync::OP_HEARTBEAT);
+  MMonSync *msg = new MMonSync(op);
+  messenger->send_message(msg, other);
+}
+
+void Monitor::handle_sync_start(MMonSync *m)
+{
+  dout(10) << __func__ << " " << *m << dendl;
+
+  /* If we are not the leader, then some monitor picked us as the point of
+   * entry to the quorum during its synchronization process. Therefore, we
+   * have an obligation of forwarding this message to leader, so the sender
+   * can start synchronizing.
+   */
+  if (!is_leader() && quorum.size() > 0) {
+    assert(!(sync_role & SYNC_ROLE_REQUESTER));
+    assert(!(sync_role & SYNC_ROLE_LEADER));
+    assert(!is_synchronizing());
+
+    entity_inst_t leader = monmap->get_inst(get_leader());
+    MMonSync *msg = new MMonSync(m);
+    // keep forwarding the message up the chain if it has already been
+    // forwarded.
+    if (!(m->flags & MMonSync::FLAG_REPLY_TO)) {
+      msg->set_reply_to(m->get_source_inst());
+    }
+    dout(10) << __func__ << " forward " << *m
+            << " to leader at " << leader << dendl;
+    assert(g_conf->mon_sync_provider_kill_at != 1);
+    messenger->send_message(msg, leader);
+    assert(g_conf->mon_sync_provider_kill_at != 2);
+    m->put();
+    return;
+  }
+
+  // If we are synchronizing, then it means that we know someone who has a
+  // higher version than the one we have; and if someone attempted to sync
+  // from us, then that must mean they have a lower version than us.
+  // Therefore, they must be much more interested in synchronizing from the
+  // one we are trying to synchronize from than they are from us.
+  // Moreover, if we are already synchronizing under the REQUESTER role, then
+  // we must know someone who is in the quorum, either because we were lucky
+  // enough to contact them in the first place or we managed to contact
+  // someone who knew who they were. Therefore, just forward this request to
+  // our sync leader.
+  if (is_synchronizing()) {
+    assert(!(sync_role & SYNC_ROLE_LEADER));
+    assert(!(sync_role & SYNC_ROLE_PROVIDER));
+    assert(quorum.size() == 0);
+    assert(sync_leader.get() != NULL);
+
+    dout(10) << __func__ << " forward " << *m
+             << " to our sync leader at "
+             << sync_leader->entity << dendl;
+
+    MMonSync *msg = new MMonSync(m);
+    // keep forwarding the message up the chain if it has already been
+    // forwarded.
+    if (!(m->flags & MMonSync::FLAG_REPLY_TO)) {
+      msg->set_reply_to(m->get_source_inst());
+    }
+    messenger->send_message(msg, sync_leader->entity);
+    m->put();
+    return;
+  }
+
+  // At this point we may or may not be the leader.  If we are not the leader,
+  // it means that we are not in the quorum, but someone would still very much
+  // like to synchronize with us.  In certain circumstances, letting them
+  // synchronize with us is by far our only option -- for instance, when we
+  // need them to form a quorum and they have started fresh or are severely
+  // out of date --, but it won't hurt to let them sync from us anyway: if
+  // they chose us, then they must have noticed that we had a higher version
+  // than they do, so it makes sense to let them try their luck and join the
+  // party.
+
+  Mutex::Locker l(trim_lock);
+  entity_inst_t other =
+    (m->flags & MMonSync::FLAG_REPLY_TO ? m->reply_to : m->get_source_inst());
+
+  assert(g_conf->mon_sync_leader_kill_at != 1);
+
+  if (trim_timeouts.count(other) > 0) {
+    dout(1) << __func__ << " sync session already in progress for " << other
+           << dendl;
+
+    if (trim_entities_states[other] != SYNC_STATE_NONE) {
+      dout(1) << __func__ << "    ignore stray message" << dendl;
+      m->put();
+      return;
+    }
+
+    dout(1) << __func__<< "    destroying current state and creating new"
+           << dendl;
+
+    if (trim_timeouts[other])
+      timer.cancel_event(trim_timeouts[other]);
+    trim_timeouts.erase(other);
+    trim_entities_states.erase(other);
+  }
+
+  MMonSync *msg = new MMonSync(MMonSync::OP_START_REPLY);
+
+  if (((quorum.size() > 0) && paxos->should_trim())
+      || (trim_enable_timer != NULL)) {
+    msg->flags |= MMonSync::FLAG_RETRY;
+  } else {
+    trim_timeouts.insert(make_pair(other, new C_TrimTimeout(this, other)));
+    timer.add_event_after(g_conf->mon_sync_trim_timeout, trim_timeouts[other]);
+
+    trim_entities_states[other] = SYNC_STATE_START;
+    sync_role |= SYNC_ROLE_LEADER;
+
+    paxos->trim_disable();
+
+    // Is the one that contacted us in the quorum?
+    if (quorum.count(m->get_source().num())) {
+      // Then it must have been a forwarded message from someone else
+      assert(m->flags & MMonSync::FLAG_REPLY_TO);
+      // And must not be synchronizing. What sense would that make, eh?
+      assert(trim_timeouts.count(m->get_source_inst()) == 0);
+      // Set the provider as the one that contacted us. He's in the
+      // quorum, so he's up to the job (i.e., he's not synchronizing)
+      msg->set_reply_to(m->get_source_inst());
+      dout(10) << __func__ << " set provider to " << msg->reply_to << dendl;
+    } else if (quorum.size() > 0) {
+      // grab someone from the quorum and assign them as the sync provider
+      int n = _pick_random_quorum_mon(rank);
+      if (n >= 0) {
+        msg->set_reply_to(monmap->get_inst(n));
+        dout(10) << __func__ << " set quorum-based provider to "
+                 << msg->reply_to << dendl;
+      } else {
+        assert(0 == "We shouldn't get here!");
+      }
+    } else {
+      // There is no quorum, so we must either be in a quorum-less cluster,
+      // or we must be mid-election.  Either way, tell them it is okay to
+      // sync from us by not setting the reply-to field.
+      assert(!(msg->flags & MMonSync::FLAG_REPLY_TO));
+    }
+  }
+  messenger->send_message(msg, other);
+  m->put();
+
+  assert(g_conf->mon_sync_leader_kill_at != 2);
+}
+
+void Monitor::handle_sync_heartbeat(MMonSync *m)
+{
+  dout(10) << __func__ << " " << *m << dendl;
+
+  entity_inst_t other = m->get_source_inst();
+  if (!(sync_role & SYNC_ROLE_LEADER)
+      || !trim_entities_states.count(other)
+      || (trim_entities_states[other] != SYNC_STATE_START)) {
+    // stray message; ignore.
+    dout(1) << __func__ << " ignored stray message " << *m << dendl;
+    m->put();
+    return;
+  }
+
+  if (!is_leader() && (quorum.size() > 0)
+      && (trim_timeouts.count(other) > 0)) {
+    // we must have been the leader before, but we lost leadership to
+    // someone else.
+    sync_finish_abort(other);
+    m->put();
+    return;
+  }
+
+  assert(trim_timeouts.count(other) > 0);
+
+  if (trim_timeouts[other])
+    timer.cancel_event(trim_timeouts[other]);
+  trim_timeouts[other] = new C_TrimTimeout(this, other);
+  timer.add_event_after(g_conf->mon_sync_trim_timeout, trim_timeouts[other]);
+
+  assert(g_conf->mon_sync_leader_kill_at != 3);
+  sync_send_heartbeat(other, true);
+  assert(g_conf->mon_sync_leader_kill_at != 4);
+
+  m->put();
+}
+
+void Monitor::sync_finish(entity_inst_t &entity, bool abort)
+{
+  dout(10) << __func__ << " entity(" << entity << ")" << dendl;
+
+  Mutex::Locker l(trim_lock);
+
+  if (!trim_timeouts.count(entity)) {
+    dout(1) << __func__ << " we know of no sync effort from "
+           << entity << " -- ignore it." << dendl;
+    return;
+  }
+
+  if (trim_timeouts[entity] != NULL)
+    timer.cancel_event(trim_timeouts[entity]);
+
+  trim_timeouts.erase(entity);
+  trim_entities_states.erase(entity);
+
+  if (abort) {
+    MMonSync *m = new MMonSync(MMonSync::OP_ABORT);
+    assert(g_conf->mon_sync_leader_kill_at != 5);
+    messenger->send_message(m, entity);
+    assert(g_conf->mon_sync_leader_kill_at != 6);
+  }
+
+  if (trim_timeouts.size() > 0)
+    return;
+
+  dout(10) << __func__ << " no longer a sync leader" << dendl;
+  sync_role &= ~SYNC_ROLE_LEADER;
+
+  // we may have been the leader, but by now we may no longer be.
+  // this can happen when the we sync'ed a monitor that became the
+  // leader, or that same monitor simply came back to life and got
+  // elected as the new leader.
+  if (is_leader() && paxos->is_trim_disabled()) {
+    trim_enable_timer = new C_TrimEnable(this);
+    timer.add_event_after(30.0, trim_enable_timer);
+  }
+}
+
+void Monitor::handle_sync_finish(MMonSync *m)
+{
+  dout(10) << __func__ << " " << *m << dendl;
+
+  entity_inst_t other = m->get_source_inst();
+
+  if (!trim_timeouts.count(other) || !trim_entities_states.count(other)
+      || (trim_entities_states[other] != SYNC_STATE_START)) {
+    dout(1) << __func__ << " ignored stray message from " << other << dendl;
+    if (!trim_timeouts.count(other))
+      dout(1) << __func__ << "  not on trim_timeouts" << dendl;
+    if (!trim_entities_states.count(other))
+      dout(1) << __func__ << "  not on trim_entities_states" << dendl;
+    else if (trim_entities_states[other] != SYNC_STATE_START)
+      dout(1) << __func__ << "  state " << trim_entities_states[other] << dendl;
+    m->put();
+    return;
+  }
+
+  // We may no longer the leader. In such case, we should just inform the
+  // other monitor that he should abort his sync. However, it appears that
+  // his sync has finished, so there is no use in scraping the whole thing
+  // now. Therefore, just go along and acknowledge.
+  if (!is_leader()) {
+    dout(10) << __func__ << " We are no longer the leader; reply nonetheless"
+            << dendl;
+  }
+
+  MMonSync *msg = new MMonSync(MMonSync::OP_FINISH_REPLY);
+  assert(g_conf->mon_sync_leader_kill_at != 7);
+  messenger->send_message(msg, other);
+  assert(g_conf->mon_sync_leader_kill_at != 8);
+
+  sync_finish(other);
+  m->put();
+}
+
+// end of leader
+
+// synchronization provider
+
+string Monitor::_pick_random_mon(int other)
+{
+  assert(monmap->size() > 0);
+  if (monmap->size() == 1)
+    return monmap->get_name(0);
+
+  int max = monmap->size();
+  int n = sync_rng() % max;
+  if (other >= 0 && n >= other)
+    n++;
+  return monmap->get_name(n);
+}
+
+int Monitor::_pick_random_quorum_mon(int other)
+{
+  assert(monmap->size() > 0);
+  if (quorum.size() == 0)
+    return -1;
+  set<int>::iterator p = quorum.begin();
+  for (int n = sync_rng() % quorum.size(); p != quorum.end() && n; ++p, --n);
+  if (other >= 0 && p != quorum.end() && *p == other)
+    ++p;
+
+  return (p == quorum.end() ? *(quorum.rbegin()) : *p);
+}
+
+void Monitor::sync_timeout(entity_inst_t &entity)
+{
+  if (state == STATE_SYNCHRONIZING) {
+    assert(sync_role == SYNC_ROLE_REQUESTER);
+    assert(sync_state == SYNC_STATE_CHUNKS);
+
+    // we are a sync requester; our provider just timed out, so find another
+    // monitor to synchronize with.
+    dout(1) << __func__ << " " << sync_provider->entity << dendl;
+
+    sync_provider->attempts++;
+    if ((sync_provider->attempts > g_conf->mon_sync_max_retries)
+       || (monmap->size() == 2)) {
+      // We either tried too many times to sync, or there's just us and the
+      // monitor we were attempting to sync with.
+      // Therefore, just abort the whole sync and start off fresh whenever he
+      // (or somebody else) comes back.
+      sync_requester_abort();
+      return;
+    }
+
+    int i = 0;
+    string entity_name = monmap->get_name(entity.addr);
+    string debug_mon = g_conf->mon_sync_debug_provider;
+    string debug_fallback = g_conf->mon_sync_debug_provider_fallback;
+    while ((i++) < 2*monmap->size()) {
+      // we are trying to pick a random monitor, but we cannot do this forever.
+      // in case something goes awfully wrong, just stop doing it after a
+      // couple of attempts and try again later.
+      string new_mon = _pick_random_mon();
+
+      if (!debug_fallback.empty()) {
+       if (entity_name != debug_fallback)
+         new_mon = debug_fallback;
+       else if (!debug_mon.empty() && (entity_name != debug_mon))
+         new_mon = debug_mon;
+      }
+
+      if ((new_mon != name) && (new_mon != entity_name)) {
+       sync_provider->entity = monmap->get_inst(new_mon);
+       sync_state = SYNC_STATE_START;
+       sync_start_chunks(sync_provider);
+       return;
+      }
+    }
+
+    assert(0 == "Unable to find a new monitor to connect to. Not cool.");
+  } else if (sync_role & SYNC_ROLE_PROVIDER) {
+    dout(10) << __func__ << " cleanup " << entity << dendl;
+    sync_provider_cleanup(entity);
+    return;
+  } else
+    assert(0 == "We should never reach this");
+}
+
+void Monitor::sync_provider_cleanup(entity_inst_t &entity)
+{
+  dout(10) << __func__ << " " << entity << dendl;
+  if (sync_entities.count(entity) > 0) {
+    sync_entities[entity]->cancel_timeout();
+    sync_entities.erase(entity);
+    sync_entities_states.erase(entity);
+  }
+
+  if (sync_entities.size() == 0) {
+    dout(1) << __func__ << " no longer a sync provider" << dendl;
+    sync_role &= ~SYNC_ROLE_PROVIDER;
+  }
+}
+
+void Monitor::handle_sync_start_chunks(MMonSync *m)
+{
+  dout(10) << __func__ << " " << *m << dendl;
+  assert(!(sync_role & SYNC_ROLE_REQUESTER));
+
+  entity_inst_t other = m->get_source_inst();
+
+  // if we have a sync going on for this entity, just drop the message. If it
+  // was a stray message, we did the right thing. If it wasn't, then it means
+  // that we still have an old state of this entity, and that the said entity
+  // failed in the meantime and is now up again; therefore, just let the
+  // timeout timers fulfill their purpose and deal with state cleanup when
+  // they are triggered. Until then, no Sir, we won't accept your messages.
+  if (sync_entities.count(other) > 0) {
+    dout(1) << __func__ << " sync session already in progress for " << other
+           << " -- assumed as stray message." << dendl;
+    m->put();
+    return;
+  }
+
+  SyncEntity sync = get_sync_entity(other, this);
+  sync->version = paxos->get_version();
+
+  if (!m->last_key.first.empty() && !m->last_key.second.empty()) {
+    sync->last_received_key = m->last_key;
+    dout(10) << __func__ << " set last received key to ("
+            << sync->last_received_key.first << ","
+            << sync->last_received_key.second << ")" << dendl;
+  }
+
+  sync->sync_init();
+
+  sync_entities.insert(make_pair(other, sync));
+  sync_entities_states[other] = SYNC_STATE_START;
+  sync_role |= SYNC_ROLE_PROVIDER;
+
+  sync_send_chunks(sync);
+  m->put();
+}
+
+void Monitor::handle_sync_chunk_reply(MMonSync *m)
+{
+  dout(10) << __func__ << " " << *m << dendl;
+
+  entity_inst_t other = m->get_source_inst();
+
+  if (!(sync_role & SYNC_ROLE_PROVIDER)
+      || !sync_entities.count(other)
+      || (sync_entities_states[other] != SYNC_STATE_START)) {
+    dout(1) << __func__ << " ignored stray message from " << other << dendl;
+    m->put();
+    return;
+  }
+
+  if (m->flags & MMonSync::FLAG_LAST) {
+    // they acked the last chunk. Clean up.
+    sync_provider_cleanup(other);
+    m->put();
+    return;
+  }
+
+  sync_send_chunks(sync_entities[other]);
+  m->put();
+}
+
+void Monitor::sync_send_chunks(SyncEntity sync)
+{
+  dout(10) << __func__ << " entity(" << sync->entity << ")" << dendl;
+
+  sync->cancel_timeout();
+
+  assert(sync->synchronizer.use_count() > 0);
+  assert(sync->synchronizer->has_next_chunk());
+
+  MMonSync *msg = new MMonSync(MMonSync::OP_CHUNK);
+
+  sync->synchronizer->get_chunk(msg->chunk_bl);
+  msg->last_key = sync->synchronizer->get_last_key();
+  dout(10) << __func__ << " last key ("
+          << msg->last_key.first << ","
+          << msg->last_key.second << ")" << dendl;
+
+  sync->sync_update();
+
+  if (sync->has_crc()) {
+    msg->flags |= MMonSync::FLAG_CRC;
+    msg->crc = sync->crc_get();
+    sync->crc_clear();
+  }
+
+  if (!sync->synchronizer->has_next_chunk()) {
+    msg->flags |= MMonSync::FLAG_LAST;
+    msg->version = sync->get_version();
+    sync->synchronizer.reset();
+  }
+
+  sync->set_timeout(new C_SyncTimeout(this, sync->entity),
+                   g_conf->mon_sync_timeout);
+  assert(g_conf->mon_sync_provider_kill_at != 3);
+  messenger->send_message(msg, sync->entity);
+  assert(g_conf->mon_sync_provider_kill_at != 4);
+
+  // kill the monitor as soon as we move into synchronizing the paxos versions.
+  // This is intended as debug.
+  if (sync->sync_state == SyncEntityImpl::STATE_PAXOS)
+    assert(g_conf->mon_sync_provider_kill_at != 5);
+
+
+}
+// end of synchronization provider
+
+// start of synchronization requester
+
+void Monitor::sync_requester_abort()
+{
+  dout(10) << __func__;
+  assert(state == STATE_SYNCHRONIZING);
+  assert(sync_role == SYNC_ROLE_REQUESTER);
+
+  if (sync_leader.get() != NULL) {
+    *_dout << " " << sync_leader->entity;
+    sync_leader->cancel_timeout();
+    sync_leader.reset();
+  }
+
+  if (sync_provider.get() != NULL) {
+    *_dout << " " << sync_provider->entity;
+    sync_provider->cancel_timeout();
+
+    MMonSync *msg = new MMonSync(MMonSync::OP_ABORT);
+    messenger->send_message(msg, sync_provider->entity);
+
+    sync_provider.reset();
+  }
+  *_dout << " clearing potentially inconsistent store" << dendl;
+
+  // Given that we are explicitely aborting the whole sync process, we should
+  // play it safe and clear the store.
+  set<string> targets = get_sync_targets_names();
+  targets.insert("mon_sync");
+  store->clear(targets);
+
+  dout(1) << __func__ << " no longer a sync requester" << dendl;
+  sync_role = SYNC_ROLE_NONE;
+  sync_state = SYNC_STATE_NONE;
+
+  state = 0;
+
+  bootstrap();
+}
+
+/**
+ * Start Sync process
+ *
+ * Create SyncEntity instances for the leader and the provider;
+ * Send OP_START message to the leader;
+ * Set trim timeout on the leader
+ *
+ * @param other Synchronization provider to-be.
+ */
+void Monitor::sync_start(entity_inst_t &other)
+{
+  cancel_probe_timeout();
+
+  dout(10) << __func__ << " entity( " << other << " )" << dendl;
+  if ((state == STATE_SYNCHRONIZING) && (sync_role == SYNC_ROLE_REQUESTER)) {
+    dout(1) << __func__ << " already synchronizing; drop it" << dendl;
+    return;
+  }
+
+  // Looks like we are the acting leader for someone.  Better force them to
+  // abort their endeavours.  After all, if they are trying to sync from us,
+  // it means that we must have a higher paxos version than the one they
+  // have; however, if we are trying to sync as well, it must mean that
+  // someone has a higher version than the one we have.  Everybody wins if
+  // we force them to cancel their sync and try again.
+  if (sync_role & SYNC_ROLE_LEADER) {
+    dout(10) << __func__ << " we are acting as a leader to someone; "
+             << "destroy their dreams" << dendl;
+
+    assert(trim_timeouts.size() > 0);
+    reset_sync();
+  }
+
+  assert(sync_role == SYNC_ROLE_NONE);
+  assert(sync_state == SYNC_STATE_NONE);
+
+  state = STATE_SYNCHRONIZING;
+  sync_role = SYNC_ROLE_REQUESTER;
+  sync_state = SYNC_STATE_START;
+
+  // clear the underlying store, since we are starting a whole
+  // sync process from the bare beginning.
+  set<string> targets = get_sync_targets_names();
+  targets.insert("mon_sync");
+  store->clear(targets);
+
+  MonitorDBStore::Transaction t;
+  t.put("mon_sync", "in_sync", 1);
+  store->apply_transaction(t);
+
+  // assume 'other' as the leader. We will update the leader once we receive
+  // a reply to the sync start.
+  entity_inst_t leader = other;
+  entity_inst_t provider = other;
+
+  if (!g_conf->mon_sync_debug_leader.empty()) {
+    leader = monmap->get_inst(g_conf->mon_sync_debug_leader);
+    dout(10) << __func__ << " assuming " << leader
+            << " as the leader for debug" << dendl;
+  }
+
+  if (!g_conf->mon_sync_debug_provider.empty()) {
+    provider = monmap->get_inst(g_conf->mon_sync_debug_provider);
+    dout(10) << __func__ << " assuming " << provider
+            << " as the provider for debug" << dendl;
+  }
+
+  sync_leader = get_sync_entity(leader, this);
+  sync_provider = get_sync_entity(provider, this);
+
+  // this message may bounce through 'other' (if 'other' is not the leader)
+  // in order to reach the leader. Therefore, set a higher timeout to allow
+  // breathing room for the reply message to reach us.
+  sync_leader->set_timeout(new C_SyncStartTimeout(this),
+                          g_conf->mon_sync_trim_timeout*2);
+
+  MMonSync *m = new MMonSync(MMonSync::OP_START);
+  messenger->send_message(m, other);
+  assert(g_conf->mon_sync_requester_kill_at != 1);
+}
+
+void Monitor::sync_start_chunks(SyncEntity provider)
+{
+  dout(10) << __func__ << " provider(" << provider->entity << ")" << dendl;
+
+  assert(sync_role == SYNC_ROLE_REQUESTER);
+  assert(sync_state == SYNC_STATE_START);
+
+  sync_state = SYNC_STATE_CHUNKS;
+
+  provider->set_timeout(new C_SyncTimeout(this, provider->entity),
+                       g_conf->mon_sync_timeout);
+  MMonSync *msg = new MMonSync(MMonSync::OP_START_CHUNKS);
+  pair<string,string> last_key = provider->last_received_key;
+  if (!last_key.first.empty() && !last_key.second.empty())
+    msg->last_key = last_key;
+
+  assert(g_conf->mon_sync_requester_kill_at != 4);
+  messenger->send_message(msg, provider->entity);
+  assert(g_conf->mon_sync_requester_kill_at != 5);
+}
+
+void Monitor::sync_start_reply_timeout()
+{
+  dout(10) << __func__ << dendl;
+
+  assert(state == STATE_SYNCHRONIZING);
+  assert(sync_role == SYNC_ROLE_REQUESTER);
+  assert(sync_state == SYNC_STATE_START);
+
+  // Restart the sync attempt. It's not as if we were going to lose a vast
+  // amount of work, and if we take into account that we are timing out while
+  // waiting for a reply from the Leader, it sure seems like the right path
+  // to take.
+  sync_requester_abort();
+}
+
+void Monitor::handle_sync_start_reply(MMonSync *m)
+{
+  dout(10) << __func__ << " " << *m << dendl;
+
+  entity_inst_t other = m->get_source_inst();
+
+  if ((sync_role != SYNC_ROLE_REQUESTER)
+      || (sync_state != SYNC_STATE_START)) {
+    // If the leader has sent this message before we failed, there is no point
+    // in replying to it, as he has no idea that we actually received it. On
+    // the other hand, if he received one of our stray messages (because it was
+    // delivered once he got back up after failing) and replied accordingly,
+    // there is a chance that he did stopped trimming on our behalf. However,
+    // we have no way to know it, and we really don't want to mess with his
+    // state if that is not the case. Therefore, just drop it and let the
+    // timeouts figure it out. Eventually.
+    dout(1) << __func__ << " stray message -- drop it." << dendl;
+    goto out;
+  }
+
+  assert(state == STATE_SYNCHRONIZING);
+  assert(sync_leader.get() != NULL);
+  assert(sync_provider.get() != NULL);
+
+  // We now know for sure who the leader is.
+  sync_leader->entity = other;
+  sync_leader->cancel_timeout();
+
+  if (m->flags & MMonSync::FLAG_RETRY) {
+    dout(10) << __func__ << " retrying sync at a later time" << dendl;
+    sync_role = SYNC_ROLE_NONE;
+    sync_state = SYNC_STATE_NONE;
+    sync_leader->set_timeout(new C_SyncStartRetry(this, sync_leader->entity),
+                            g_conf->mon_sync_backoff_timeout);
+    goto out;
+  }
+
+  if (m->flags & MMonSync::FLAG_REPLY_TO) {
+    dout(10) << __func__ << " leader told us to use " << m->reply_to
+             << " as sync provider" << dendl;
+    sync_provider->entity = m->reply_to;
+  } else {
+    dout(10) << __func__ << " synchronizing from leader at " << other << dendl;
+    sync_provider->entity = other;
+  }
+
+  sync_leader->set_timeout(new C_HeartbeatTimeout(this),
+                          g_conf->mon_sync_heartbeat_timeout);
+
+  assert(g_conf->mon_sync_requester_kill_at != 2);
+  sync_send_heartbeat(sync_leader->entity);
+  assert(g_conf->mon_sync_requester_kill_at != 3);
+
+  sync_start_chunks(sync_provider);
+out:
+  m->put();
+}
+
+void Monitor::handle_sync_heartbeat_reply(MMonSync *m)
+{
+  dout(10) << __func__ << " " << *m << dendl;
+
+  entity_inst_t other = m->get_source_inst();
+  if ((sync_role != SYNC_ROLE_REQUESTER)
+      || (sync_state == SYNC_STATE_NONE)
+      || (sync_leader.get() == NULL)
+      || (other != sync_leader->entity)) {
+    dout(1) << __func__ << " stray message -- drop it." << dendl;
+    m->put();
+    return;
+  }
+
+  assert(state == STATE_SYNCHRONIZING);
+  assert(sync_role == SYNC_ROLE_REQUESTER);
+  assert(sync_state != SYNC_STATE_NONE);
+
+  assert(sync_leader.get() != NULL);
+  assert(sync_leader->entity == other);
+
+  sync_leader->cancel_timeout();
+  sync_leader->set_timeout(new C_HeartbeatInterval(this, sync_leader->entity),
+                          g_conf->mon_sync_heartbeat_interval);
+  m->put();
+}
+
+void Monitor::handle_sync_chunk(MMonSync *m)
+{
+  dout(10) << __func__ << " " << *m << dendl;
+
+  entity_inst_t other = m->get_source_inst();
+
+  if ((sync_role != SYNC_ROLE_REQUESTER)
+      || (sync_state != SYNC_STATE_CHUNKS)
+      || (sync_provider.get() == NULL)
+      || (other != sync_provider->entity)) {
+    dout(1) << __func__ << " stray message -- drop it." << dendl;
+    m->put();
+    return;
+  }
+
+  assert(state == STATE_SYNCHRONIZING);
+  assert(sync_role == SYNC_ROLE_REQUESTER);
+  assert(sync_state == SYNC_STATE_CHUNKS);
+
+  assert(sync_leader.get() != NULL);
+
+  assert(sync_provider.get() != NULL);
+  assert(other == sync_provider->entity);
+
+  sync_provider->cancel_timeout();
+
+  MonitorDBStore::Transaction tx;
+  tx.append_from_encoded(m->chunk_bl);
+
+  sync_provider->set_timeout(new C_SyncTimeout(this, sync_provider->entity),
+                            g_conf->mon_sync_timeout);
+  sync_provider->last_received_key = m->last_key;
 
-  reset();
+  MMonSync *msg = new MMonSync(MMonSync::OP_CHUNK_REPLY);
 
-  // singleton monitor?
-  if (monmap->size() == 1 && rank == 0) {
-    win_standalone_election();
-    return;
+  bool stop = false;
+  if (m->flags & MMonSync::FLAG_LAST) {
+    msg->flags |= MMonSync::FLAG_LAST;
+    assert(m->version > 0);
+    tx.put(paxos->get_name(), "last_committed", m->version);
+    stop = true;
   }
+  assert(g_conf->mon_sync_requester_kill_at != 8);
+  messenger->send_message(msg, sync_provider->entity);
 
-  reset_probe_timeout();
+  store->apply_transaction(tx);
 
-  // i'm outside the quorum
-  if (monmap->contains(name))
-    outside_quorum.insert(name);
+  if (g_conf->mon_sync_debug && (m->flags & MMonSync::FLAG_CRC)) {
+    dout(10) << __func__ << " checking CRC" << dendl;
+    MonitorDBStore::Synchronizer sync;
+    if (m->flags & MMonSync::FLAG_LAST) {
+      dout(10) << __func__ << " checking CRC only for Paxos" << dendl;
+      string paxos_name("paxos");
+      sync = store->get_synchronizer(paxos_name);
+    } else {
+      dout(10) << __func__ << " checking CRC for all prefixes" << dendl;
+      set<string> prefixes = get_sync_targets_names();
+      pair<string,string> empty_key;
+      sync = store->get_synchronizer(empty_key, prefixes);
+    }
 
-  // probe monitors
-  dout(10) << "probing other monitors" << dendl;
-  for (unsigned i = 0; i < monmap->size(); i++) {
-    if ((int)i != rank)
-      messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined),
-                             monmap->get_inst(i));
-  }
-  for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
-       p != extra_probe_peers.end();
-       ++p) {
-    if (*p != messenger->get_myaddr()) {
-      entity_inst_t i;
-      i.name = entity_name_t::MON(-1);
-      i.addr = *p;
-      messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined), i);
+    while (sync->has_next_chunk()) {
+      bufferlist bl;
+      sync->get_chunk(bl);
     }
+    __u32 got_crc = sync->crc();
+    dout(10) << __func__ << " expected crc " << m->crc
+            << " got " << got_crc << dendl;
+
+    assert(m->crc == got_crc);
+    dout(10) << __func__ << " CRC matches" << dendl;
   }
+
+  m->put();
+  if (stop)
+    sync_stop();
 }
 
-void Monitor::_add_bootstrap_peer_hint(string cmd, string args, ostream& ss)
+void Monitor::sync_stop()
 {
-  dout(10) << "_add_bootstrap_peer_hint '" << cmd << "' '" << args << "'" << dendl;
+  dout(10) << __func__ << dendl;
 
-  entity_addr_t addr;
-  const char *end = 0;
-  if (!addr.parse(args.c_str(), &end)) {
-    ss << "failed to parse addr '" << args << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
-    return;
-  }
+  assert(sync_role == SYNC_ROLE_REQUESTER);
+  assert(sync_state == SYNC_STATE_CHUNKS);
 
-  if (is_leader() || is_peon()) {
-    ss << "mon already active; ignoring bootstrap hint";
+  sync_state = SYNC_STATE_STOP;
+
+  sync_leader->cancel_timeout();
+  sync_provider->cancel_timeout();
+  sync_provider.reset();
+
+  entity_inst_t leader = sync_leader->entity;
+
+  sync_leader->set_timeout(new C_SyncFinishReplyTimeout(this),
+                          g_conf->mon_sync_timeout);
+
+  MMonSync *msg = new MMonSync(MMonSync::OP_FINISH);
+  assert(g_conf->mon_sync_requester_kill_at != 9);
+  messenger->send_message(msg, leader);
+  assert(g_conf->mon_sync_requester_kill_at != 10);
+}
+
+void Monitor::sync_finish_reply_timeout()
+{
+  dout(10) << __func__ << dendl;
+  assert(state == STATE_SYNCHRONIZING);
+  assert(sync_leader.get() != NULL);
+  assert(sync_role == SYNC_ROLE_REQUESTER);
+  assert(sync_state == SYNC_STATE_STOP);
+
+  sync_requester_abort();
+}
+
+void Monitor::handle_sync_finish_reply(MMonSync *m)
+{
+  dout(10) << __func__ << " " << *m << dendl;
+  entity_inst_t other = m->get_source_inst();
+
+  if ((sync_role != SYNC_ROLE_REQUESTER)
+      || (sync_state != SYNC_STATE_STOP)
+      || (sync_leader.get() == NULL)
+      || (sync_leader->entity != other)) {
+    dout(1) << __func__ << " stray message -- drop it." << dendl;
+    m->put();
     return;
   }
 
-  if (addr.get_port() == 0)
-    addr.set_port(CEPH_MON_PORT);
+  assert(sync_role == SYNC_ROLE_REQUESTER);
+  assert(sync_state == SYNC_STATE_STOP);
 
-  extra_probe_peers.insert(addr);
-  ss << "adding peer " << addr << " to list: " << extra_probe_peers;
+  assert(sync_leader.get() != NULL);
+  assert(sync_leader->entity == other);
+
+  sync_role = SYNC_ROLE_NONE;
+  sync_state = SYNC_STATE_NONE;
+
+  sync_leader->cancel_timeout();
+  sync_leader.reset();
+
+  paxos->reapply_all_versions();
+
+  MonitorDBStore::Transaction t;
+  t.erase("mon_sync", "in_sync");
+  store->apply_transaction(t);
+
+  init_paxos();
+
+  assert(g_conf->mon_sync_requester_kill_at != 11);
+
+  m->put();
+
+  bootstrap();
 }
 
-// called by bootstrap(), or on leader|peon -> electing
-void Monitor::reset()
+void Monitor::handle_sync_abort(MMonSync *m)
 {
-  dout(10) << "reset" << dendl;
+  dout(10) << __func__ << " " << *m << dendl;
+  /* This function's responsabilities are manifold, and they depend on
+   * who we (the monitor) are and what is our role in the sync.
+   *
+   * If we are the sync requester (i.e., if we are synchronizing), it
+   * means that we *must* abort the current sync and bootstrap. This may
+   * be required if there was a leader change and we are talking to the
+   * wrong leader, which makes continuing with the current sync way too
+   * risky, given that a Paxos trim may be underway and we certainly incur
+   * in the chance of ending up with an inconsistent store state.
+   *
+   * If we are the sync provider, it means that the requester wants to
+   * abort his sync, either because he lost connectivity to the leader
+   * (i.e., his heartbeat timeout was triggered) or he became aware of a
+   * leader change.
+   *
+   * As a leader, we should never receive such a message though, unless we
+   * have just won an election, in which case we should have been a sync
+   * provider before. In such a case, we should behave as if we were a sync
+   * provider and clean up the requester's state.
+   */
+  entity_inst_t other = m->get_source_inst();
 
-  timecheck_finish();
+  if ((sync_role == SYNC_ROLE_REQUESTER)
+      && (sync_leader.get() != NULL)
+      && (sync_leader->entity == other)) {
 
-  leader_since = utime_t();
-  if (!quorum.empty()) {
-    exited_quorum = ceph_clock_now(g_ceph_context);
-  }
-  quorum.clear();
-  outside_quorum.clear();
+    sync_requester_abort();
+  } else if ((sync_role & SYNC_ROLE_PROVIDER)
+            && (sync_entities.count(other) > 0)
+            && (sync_entities_states[other] == SYNC_STATE_START)) {
 
-  paxos->restart();
+    sync_provider_cleanup(other);
+  } else {
+    dout(1) << __func__ << " stray message -- drop it." << dendl;
+  }
+  m->put();
+}
 
-  for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); p++)
-    (*p)->restart();
+void Monitor::handle_sync(MMonSync *m)
+{
+  dout(10) << __func__ << " " << *m << dendl;
+  switch (m->op) {
+  case MMonSync::OP_START:
+    handle_sync_start(m);
+    break;
+  case MMonSync::OP_START_REPLY:
+    handle_sync_start_reply(m);
+    break;
+  case MMonSync::OP_HEARTBEAT:
+    handle_sync_heartbeat(m);
+    break;
+  case MMonSync::OP_HEARTBEAT_REPLY:
+    handle_sync_heartbeat_reply(m);
+    break;
+  case MMonSync::OP_FINISH:
+    handle_sync_finish(m);
+    break;
+  case MMonSync::OP_START_CHUNKS:
+    handle_sync_start_chunks(m);
+    break;
+  case MMonSync::OP_CHUNK:
+    handle_sync_chunk(m);
+    break;
+  case MMonSync::OP_CHUNK_REPLY:
+    handle_sync_chunk_reply(m);
+    break;
+  case MMonSync::OP_FINISH_REPLY:
+    handle_sync_finish_reply(m);
+    break;
+  case MMonSync::OP_ABORT:
+    handle_sync_abort(m);
+    break;
+  default:
+    dout(0) << __func__ << " unknown op " << m->op << dendl;
+    m->put();
+    assert(0 == "unknown op");
+    break;
+  }
 }
 
 void Monitor::cancel_probe_timeout()
@@ -645,7 +1706,7 @@ void Monitor::reset_probe_timeout()
 {
   cancel_probe_timeout();
   probe_timeout_event = new C_ProbeTimeout(this);
-  double t = is_probing() ? g_conf->mon_probe_timeout : g_conf->mon_slurp_timeout;
+  double t = g_conf->mon_probe_timeout;
   timer.add_event_after(t, probe_timeout_event);
   dout(10) << "reset_probe_timeout " << probe_timeout_event << " after " << t << " seconds" << dendl;
 }
@@ -653,7 +1714,7 @@ void Monitor::reset_probe_timeout()
 void Monitor::probe_timeout(int r)
 {
   dout(4) << "probe_timeout " << probe_timeout_event << dendl;
-  assert(is_probing() || is_slurping());
+  assert(is_probing() || is_synchronizing());
   assert(probe_timeout_event);
   probe_timeout_event = NULL;
   bootstrap();
@@ -678,18 +1739,6 @@ void Monitor::handle_probe(MMonProbe *m)
     handle_probe_reply(m);
     break;
 
-  case MMonProbe::OP_SLURP:
-    handle_probe_slurp(m);
-    break;
-
-  case MMonProbe::OP_SLURP_LATEST:
-    handle_probe_slurp_latest(m);
-    break;
-
-  case MMonProbe::OP_DATA:
-    handle_probe_data(m);
-    break;
-
   default:
     m->put();
   }
@@ -705,7 +1754,8 @@ void Monitor::handle_probe_probe(MMonProbe *m)
   r->name = name;
   r->quorum = quorum;
   monmap->encode(r->monmap_bl, m->get_connection()->get_features());
-  r->paxos_versions[paxos->get_name()] = paxos->get_version();
+  r->paxos_first_version = paxos->get_first_committed();
+  r->paxos_last_version = paxos->get_version();
   messenger->send_message(r, m->get_connection());
 
   // did we discover a peer here?
@@ -771,247 +1821,77 @@ void Monitor::handle_probe_reply(MMonProbe *m)
     }
   }
 
+  assert(paxos != NULL);
+
+  if (is_synchronizing()) {
+    dout(10) << " we are currently synchronizing, so that will continue."
+             << dendl;
+    m->put();
+    return;
+  }
+
+  entity_inst_t other = m->get_source_inst();
   // is there an existing quorum?
   if (m->quorum.size()) {
     dout(10) << " existing quorum " << m->quorum << dendl;
 
-    // do i need to catch up?
-    bool ok = true;
-    for (map<string,version_t>::iterator p = m->paxos_versions.begin();
-        p != m->paxos_versions.end();
-        ++p) {
-      if (!paxos) {
-       dout(0) << " peer has paxos machine " << p->first << " but i don't... weird" << dendl;
-       continue;  // weird!
-      }
-      if (paxos->is_slurping()) {
-        dout(10) << " My paxos machine " << p->first
-                 << " is currently slurping, so that will continue. Peer has v "
-                 << p->second << dendl;
-        ok = false;
-      } else if (paxos->get_version() + g_conf->paxos_max_join_drift < p->second) {
-       dout(10) << " peer paxos machine " << p->first << " v " << p->second
-                << " vs my v " << paxos->get_version()
-                << " (too far ahead)"
-                << dendl;
-       ok = false;
-      } else {
-       dout(10) << " peer paxos machine " << p->first << " v " << p->second
-                << " vs my v " << paxos->get_version()
-                << " (ok)"
-                << dendl;
-      }
+    if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) {
+      dout(10) << " peer paxos version " << m->paxos_last_version
+              << " vs my version " << paxos->get_version()
+              << " (too far ahead)"
+              << dendl;
+      sync_start(other);
+      m->put();
+      return;
     }
-    if (ok) {
-      if (monmap->contains(name) &&
-         !monmap->get_addr(name).is_blank_ip()) {
-       // i'm part of the cluster; just initiate a new election
-       start_election();
-      } else {
-       dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl;
-       messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
-                               monmap->get_inst(*m->quorum.begin()));
-      }
+    dout(10) << " peer paxos version " << m->paxos_last_version
+             << " vs my version " << paxos->get_version()
+             << " (ok)"
+             << dendl;
+
+    if (monmap->contains(name) &&
+        !monmap->get_addr(name).is_blank_ip()) {
+      // i'm part of the cluster; just initiate a new election
+      start_election();
     } else {
-      slurp_source = m->get_source_inst();
-      slurp_versions = m->paxos_versions;
-      slurp();
+      dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl;
+      messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
+                              monmap->get_inst(*m->quorum.begin()));
     }
   } else {
-    // not part of a quorum
-    if (monmap->contains(m->name))
+    if (monmap->contains(m->name)) {
+      dout(10) << " mon." << m->name << " is outside the quorum" << dendl;
       outside_quorum.insert(m->name);
-    else
+    } else {
       dout(10) << " mostly ignoring mon." << m->name << ", not part of monmap" << dendl;
+      m->put();
+      return;
+    }
+
+    if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) {
+      dout(10) << " peer paxos version " << m->paxos_last_version
+              << " vs my version " << paxos->get_version()
+              << " (too far ahead)"
+              << dendl;
+      sync_start(other);
+      m->put();
+      return;
+    }
 
     unsigned need = monmap->size() / 2 + 1;
     dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
-
     if (outside_quorum.size() >= need) {
       if (outside_quorum.count(name)) {
-       dout(10) << " that's enough to form a new quorum, calling election" << dendl;
-       start_election();
+        dout(10) << " that's enough to form a new quorum, calling election" << dendl;
+        start_election();
       } else {
-       dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl;
+        dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl;
       }
     } else {
-      dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
-    }
-  }
-  m->put();
-}
-
-/*
- * The whole slurp process is currently a bit of a hack.  Given the
- * current storage model, we should be sharing code with Paxos to make
- * sure we copy the right content.  But that model sucks and will
- * hopefully soon change, and it's less work to kludge around it here
- * than it is to make the current model clean.
- *
- * So: more or less duplicate the work of resyncing each paxos state
- * machine here.  And move the monitor storage refactor stuff up the
- * todo list.
- *
- */
-
-void Monitor::slurp()
-{
-  dout(10) << "slurp " << slurp_source << " " << slurp_versions << dendl;
-
-  /*
-  reset_probe_timeout();
-
-  state = STATE_SLURPING;
-
-  map<string,version_t>::iterator p = slurp_versions.begin();
-  while (p != slurp_versions.end()) {
-    Paxos *pax = get_paxos_by_name(p->first);
-    if (!pax) {
-      p++;
-      continue;
-    }
-
-    dout(10) << " " << p->first << " v " << p->second << " vs my " << pax->get_version() << dendl;
-    if (p->second > pax->get_version() ||
-       pax->get_stashed_version() > pax->get_version()) {
-      if (!pax->is_slurping()) {
-        pax->start_slurping();
-      }
-      MMonProbe *m = new MMonProbe(monmap->fsid, MMonProbe::OP_SLURP, name, has_ever_joined);
-      m->machine_name = p->first;
-      m->oldest_version = pax->get_first_committed();
-      m->newest_version = pax->get_version();
-      messenger->send_message(m, slurp_source);
-      return;
-    }
-
-    // latest?
-    if (pax->get_first_committed() > 1 &&   // don't need it!
-       pax->get_stashed_version() < pax->get_first_committed()) {
-      if (!pax->is_slurping()) {
-        pax->start_slurping();
-      }
-      MMonProbe *m = new MMonProbe(monmap->fsid, MMonProbe::OP_SLURP_LATEST, name, has_ever_joined);
-      m->machine_name = p->first;
-      m->oldest_version = pax->get_first_committed();
-      m->newest_version = pax->get_version();
-      messenger->send_message(m, slurp_source);
-      return;
-    }
-
-    PaxosService *paxs = get_paxos_service_by_name(p->first);
-    assert(paxs);
-    paxs->update_from_paxos();
-
-    pax->end_slurping();
-
-    slurp_versions.erase(p++);
-  }
-
-  dout(10) << "done slurping" << dendl;
-  bootstrap();
-  */
-}
-
-MMonProbe *Monitor::fill_probe_data(MMonProbe *m, Paxos *pax)
-{
-  dout(10) << __func__ << *m << dendl;
-  /*
-  MMonProbe *r = new MMonProbe(monmap->fsid, MMonProbe::OP_DATA, name, has_ever_joined);
-  r->machine_name = m->machine_name;
-  r->oldest_version = pax->get_first_committed();
-  r->newest_version = pax->get_version();
-
-  version_t v = MAX(pax->get_first_committed(), m->newest_version + 1);
-  int len = 0;
-  for (; v <= pax->get_version(); v++) {
-    len += store->get(m->machine_name.c_str(), v, 
-                      r->paxos_values[m->machine_name][v]);
-
-    for (list<string>::iterator p = pax->extra_state_dirs.begin();
-         p != pax->extra_state_dirs.end();
-         ++p) {
-      len += store->get(p->c_str(), v, r->paxos_values[*p][v]);
-    }
-    if (len >= g_conf->mon_slurp_bytes)
-      break;
-  }
-
-  return r;
-  */
-  return NULL;
-}
-
-void Monitor::handle_probe_slurp(MMonProbe *m)
-{
-  dout(10) << "handle_probe_slurp " << *m << dendl;
-  /*
-  Paxos *pax = get_paxos_by_name(m->machine_name);
-  assert(pax);
-
-  MMonProbe *r = fill_probe_data(m, pax);
-  messenger->send_message(r, m->get_connection());
-  */
-  m->put();
-}
-
-void Monitor::handle_probe_slurp_latest(MMonProbe *m)
-{
-  dout(10) << "handle_probe_slurp_latest " << *m << dendl;
-  /*
-  Paxos *pax = get_paxos_by_name(m->machine_name);
-  assert(pax);
-
-  MMonProbe *r = fill_probe_data(m, pax);
-  r->latest_version = pax->get_stashed(r->latest_value);
-
-  messenger->send_message(r, m->get_connection());
-  */
-  m->put();
-}
-
-void Monitor::handle_probe_data(MMonProbe *m)
-{
-  dout(10) << "handle_probe_data " << *m << dendl;
-  /*
-  Paxos *pax = get_paxos_by_name(m->machine_name);
-  assert(pax);
-
-  // trim old cruft?
-  if (m->oldest_version > pax->get_first_committed())
-    pax->trim_to(m->oldest_version, true);
-
-  // note new latest version?
-  if (slurp_versions.count(m->machine_name))
-    slurp_versions[m->machine_name] = m->newest_version;
-
-  // store any new stuff
-  if (m->paxos_values.size()) {
-    map<string, map<version_t,bufferlist> >::iterator p;
-   
-    // bundle everything on a single transaction
-    MonitorDBStore::Transaction t;
-    
-    for (p = m->paxos_values.begin(); p != m->paxos_values.end(); ++p) {
-      store->put(&t, p->first.c_str(), p->second.begin(), p->second.end());
+       dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
     }
-
-    pax->last_committed = m->paxos_values.begin()->second.rbegin()->first;
-    store->put(&t, m->machine_name.c_str(), 
-               "last_committed", pax->last_committed);
-
-    store->apply_transaction(t);
-  }
-
-  // latest?
-  if (m->latest_version) {
-    pax->stash_latest(m->latest_version, m->latest_value);
   }
-
   m->put();
-
-  slurp();
-  */
 }
 
 void Monitor::start_election()
@@ -1090,6 +1970,19 @@ void Monitor::lose_election(epoch_t epoch, set<int> &q, int l, uint64_t features
   dout(10) << "lose_election, epoch " << epoch << " leader is mon" << leader
           << " quorum is " << quorum << " features are " << quorum_features << dendl;
 
+  // let everyone currently syncing know that we are no longer the leader and
+  // that they should all abort their on-going syncs
+  for (map<entity_inst_t,Context*>::iterator iter = trim_timeouts.begin();
+       iter != trim_timeouts.end();
+       ++iter) {
+    timer.cancel_event((*iter).second);
+    entity_inst_t entity = (*iter).first;
+    MMonSync *msg = new MMonSync(MMonSync::OP_ABORT);
+    messenger->send_message(msg, entity);
+  }
+  trim_timeouts.clear();
+  sync_role &= ~SYNC_ROLE_LEADER;
+  
   paxos->peon_init();
   for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); p++)
     (*p)->election_finished();
@@ -1142,6 +2035,92 @@ bool Monitor::_allowed_command(MonSession *s, const vector<string>& cmd)
   return false;
 }
 
+void Monitor::_sync_status(ostream& ss)
+{
+  JSONFormatter jf(true);
+  jf.open_object_section("sync_status");
+  jf.dump_string("state", get_state_name());
+  jf.dump_unsigned("paxos_version", paxos->get_version());
+
+  if (is_leader() || (sync_role == SYNC_ROLE_LEADER)) {
+    Mutex::Locker l(trim_lock);
+    jf.open_object_section("trim");
+    jf.dump_int("disabled", paxos->is_trim_disabled());
+    jf.dump_int("should_trim", paxos->should_trim());
+    if (trim_timeouts.size() > 0) {
+      jf.open_array_section("mons");
+      for (map<entity_inst_t,Context*>::iterator it = trim_timeouts.begin();
+          it != trim_timeouts.end();
+          ++it) {
+       entity_inst_t e = (*it).first;
+       jf.dump_stream("mon") << e;
+       int s = -1;
+       if (trim_entities_states.count(e))
+         s = trim_entities_states[e];
+       jf.dump_stream("sync_state") << get_sync_state_name(s);
+      }
+    }
+    jf.close_section();
+  }
+
+  if ((sync_entities.size() > 0) || (sync_role == SYNC_ROLE_PROVIDER)) {
+    jf.open_array_section("on_going");
+    for (map<entity_inst_t,SyncEntity>::iterator it = sync_entities.begin();
+        it != sync_entities.end();
+        ++it) {
+      entity_inst_t e = (*it).first;
+      jf.open_object_section("mon");
+      jf.dump_stream("addr") << e;
+      jf.dump_string("state", (*it).second->get_state());
+      int s = -1;
+      if (sync_entities_states.count(e))
+         s = sync_entities_states[e];
+      jf.dump_stream("sync_state") << get_sync_state_name(s);
+
+      jf.close_section();
+    }
+    jf.close_section();
+  }
+
+  if (is_synchronizing() || (sync_role == SYNC_ROLE_REQUESTER)) {
+    jf.open_object_section("leader");
+    SyncEntity sync_entity = sync_leader;
+    if (sync_entity.get() != NULL)
+      jf.dump_stream("addr") << sync_entity->entity;
+    jf.close_section();
+
+    jf.open_object_section("provider");
+    sync_entity = sync_provider;
+    if (sync_entity.get() != NULL)
+      jf.dump_stream("addr") << sync_entity->entity;
+    jf.close_section();
+  }
+
+  if (g_conf->mon_sync_leader_kill_at > 0)
+    jf.dump_int("leader_kill_at", g_conf->mon_sync_leader_kill_at);
+  if (g_conf->mon_sync_provider_kill_at > 0)
+    jf.dump_int("provider_kill_at", g_conf->mon_sync_provider_kill_at);
+  if (g_conf->mon_sync_requester_kill_at > 0)
+    jf.dump_int("requester_kill_at", g_conf->mon_sync_requester_kill_at);
+
+  jf.close_section();
+  jf.flush(ss);
+}
+
+void Monitor::_sync_force(ostream& ss)
+{
+  MonitorDBStore::Transaction tx;
+  tx.put("mon_sync", "force_sync", 1);
+  store->apply_transaction(tx);
+
+  JSONFormatter jf(true);
+  jf.open_object_section("sync_force");
+  jf.dump_int("ret", 0);
+  jf.dump_stream("msg") << "forcing store sync the next time the monitor starts";
+  jf.close_section();
+  jf.flush(ss);
+}
+
 void Monitor::_quorum_status(ostream& ss)
 {
   JSONFormatter jf(true);
@@ -1180,12 +2159,9 @@ void Monitor::_mon_status(ostream& ss)
     jf.dump_string("mon", *p);
   jf.close_section();
 
-  if (is_slurping()) {
-    jf.dump_stream("slurp_source") << slurp_source;
-    jf.open_object_section("slurp_version");
-    for (map<string,version_t>::iterator p = slurp_versions.begin(); p != slurp_versions.end(); ++p)
-      jf.dump_int(p->first.c_str(), p->second);          
-    jf.close_section();
+  if (is_synchronizing()) {
+    jf.dump_stream("sync_leader") << sync_leader->entity;
+    jf.dump_stream("sync_provider") << sync_provider->entity;
   }
 
   jf.open_object_section("monmap");
@@ -1581,6 +2557,35 @@ void Monitor::handle_command(MMonCommand *m)
     _mon_status(ss);
     rs = ss.str();
     r = 0;
+  } else if (m->cmd[0] == "sync") {
+      if (!access_r) {
+       r = -EACCES;
+       rs = "access denied";
+       goto out;
+      }
+      if (m->cmd[1] == "status") {
+       stringstream ss;
+       _sync_status(ss);
+       rs = ss.str();
+       r = 0;
+      } else if (m->cmd[1] == "force") {
+        if (m->cmd.size() < 4 || m->cmd[2] != "--yes-i-really-mean-it"
+            || m->cmd[3] != "--i-know-what-i-am-doing") {
+          r = -EINVAL;
+          rs = "are you SURE? this will mean the monitor store will be "
+               "erased.  pass '--yes-i-really-mean-it "
+               "--i-know-what-i-am-doing' if you really do.";
+          goto out;
+        }
+       stringstream ss;
+       _sync_force(ss);
+       rs = ss.str();
+       r = 0;
+      } else {
+       rs = "unknown command";
+       r = -EINVAL;
+       goto out;
+      }
   } else if (m->cmd[0] == "heap") {
     if (!access_all) {
       r = -EACCES;
@@ -2008,6 +3013,11 @@ bool Monitor::_ms_dispatch(Message *m)
       handle_probe((MMonProbe*)m);
       break;
 
+    // Sync (i.e., the new slurp, but on steroids)
+    case MSG_MON_SYNC:
+      handle_sync((MMonSync*)m);
+      break;
+
       // OSDs
     case MSG_OSD_FAILURE:
     case MSG_OSD_BOOT:
@@ -2069,6 +3079,15 @@ bool Monitor::_ms_dispatch(Message *m)
          break;
        }
 
+       if (state == STATE_SYNCHRONIZING) {
+         // we are synchronizing. These messages would do us no
+         // good, thus just drop them and ignore them.
+         dout(10) << __func__ << " ignore paxos msg from "
+                  << pm->get_source_inst() << dendl;
+         pm->put();
+         break;
+       }
+
        // sanitize
        if (pm->epoch > get_epoch()) {
          bootstrap();
@@ -2101,7 +3120,7 @@ bool Monitor::_ms_dispatch(Message *m)
        m->put();
        break;
       }
-      if (!is_probing() && !is_slurping()) {
+      if (!is_probing() && !is_synchronizing()) {
        elector.dispatch(m);
       } else {
        m->put();
@@ -2680,10 +3699,8 @@ void Monitor::tick()
   // ok go.
   dout(11) << "tick" << dendl;
   
-  if (!is_slurping()) {
-    for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); p++) {
-      (*p)->tick();
-    }
+  for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); p++) {
+    (*p)->tick();
   }
   
   // trim sessions
index 3d2e35b9ad17022c3acf6907a386e1bc5a7c3500..efed1b65d056b02275dde1988cce3e46c3d9abb5 100644 (file)
@@ -36,6 +36,7 @@
 #include "osd/OSDMap.h"
 
 #include "common/LogClient.h"
+#include "common/SimpleRNG.h"
 
 #include "auth/cephx/CephxKeyServer.h"
 #include "auth/AuthMethodList.h"
@@ -47,6 +48,7 @@
 #include "mon/MonitorDBStore.h"
 
 #include <memory>
+#include <tr1/memory>
 #include <errno.h>
 
 
@@ -87,6 +89,7 @@ class AdminSocketHook;
 
 class MMonGetMap;
 class MMonGetVersion;
+class MMonSync;
 class MMonProbe;
 class MMonSubscribe;
 class MAuthRotating;
@@ -142,7 +145,7 @@ public:
 private:
   enum {
     STATE_PROBING = 1,
-    STATE_SLURPING,
+    STATE_SYNCHRONIZING,
     STATE_ELECTING,
     STATE_LEADER,
     STATE_PEON,
@@ -154,7 +157,7 @@ public:
   static const char *get_state_name(int s) {
     switch (s) {
     case STATE_PROBING: return "probing";
-    case STATE_SLURPING: return "slurping";
+    case STATE_SYNCHRONIZING: return "synchronizing";
     case STATE_ELECTING: return "electing";
     case STATE_LEADER: return "leader";
     case STATE_PEON: return "peon";
@@ -162,11 +165,14 @@ public:
     }
   }
   const char *get_state_name() const {
-    return get_state_name(state);
+    string sn(get_state_name(state));
+    string sync_name(get_sync_state_name());
+    sn.append(sync_name);
+    return sn.c_str();
   }
 
   bool is_probing() const { return state == STATE_PROBING; }
-  bool is_slurping() const { return state == STATE_SLURPING; }
+  bool is_synchronizing() const { return state == STATE_SYNCHRONIZING; }
   bool is_electing() const { return state == STATE_ELECTING; }
   bool is_leader() const { return state == STATE_LEADER; }
   bool is_peon() const { return state == STATE_PEON; }
@@ -186,8 +192,882 @@ private:
   uint64_t quorum_features;  ///< intersection of quorum member feature bits
 
   set<string> outside_quorum;
-  entity_inst_t slurp_source;
-  map<string,version_t> slurp_versions;
+
+  /**
+   * @defgroup Synchronization
+   * @{
+   */
+  /**
+   * Obtain the synchronization target prefixes in set form.
+   *
+   * We consider a target prefix all those that are relevant when
+   * synchronizing two stores. That is, all those that hold paxos service's
+   * versions, as well as paxos versions, or any control keys such as the
+   * first or last committed version.
+   *
+   * Given the current design, this function should return the name of all and
+   * any available paxos service, plus the paxos name.
+   *
+   * @returns a set of strings referring to the prefixes being synchronized
+   */
+  set<string> get_sync_targets_names();
+  /**
+   * Handle a sync-related message
+   *
+   * This function will call the appropriate handling functions for each
+   * operation type.
+   *
+   * @param m A sync-related message (i.e., of type MMonSync)
+   */
+  void handle_sync(MMonSync *m);
+  /**
+   * Handle a sync-related message of operation type OP_ABORT.
+   *
+   * @param m A sync-related message of type OP_ABORT
+   */
+  void handle_sync_abort(MMonSync *m);
+  /**
+   * Reset the monitor's sync-related data structures and state.
+   */
+  void reset_sync(bool abort = false);
+
+  /**
+   * @defgroup Synchronization_Roles
+   * @{
+   */
+  /**
+   * The monitor has no role in any on-going synchronization.
+   */
+  static const uint8_t SYNC_ROLE_NONE      = 0x0;
+  /**
+   * The monitor is the Leader in at least one synchronization.
+   */
+  static const uint8_t SYNC_ROLE_LEADER            = 0x1;
+  /**
+   * The monitor is the Provider in at least one synchronization.
+   */
+  static const uint8_t SYNC_ROLE_PROVIDER   = 0x2;
+  /**
+   * The monitor is a requester in the on-going synchronization.
+   */
+  static const uint8_t SYNC_ROLE_REQUESTER  = 0x4;
+
+  /**
+   * The monitor's current role in on-going synchronizations, if any.
+   *
+   * A monitor can either be part of no synchronization at all, in which case
+   * @p sync_role shall hold the value @p SYNC_ROLE_NONE, or it can be part of
+   * an on-going synchronization, in which case it may be playing either one or
+   * two roles at the same time:
+   *
+   *  - If the monitor is the sync requester (i.e., be the one synchronizing
+   *    against some other monitor), the @p sync_role field will hold only the
+   *    @p SYNC_ROLE_REQUESTER value.
+   *  - Otherwise, the monitor can be either a sync leader, or a sync provider,
+   *    or both, in which case @p sync_role will hold a binary OR of both
+   *    @p SYNC_ROLE_LEADER and @p SYNC_ROLE_PROVIDER.
+   */
+  uint8_t sync_role;
+  /**
+   * @}
+   */
+  /**
+   * @defgroup Leader-specific
+   * @{
+   */
+  /**
+   * Guarantee mutual exclusion access to the @p trim_timeouts map.
+   *
+   * We need this mutex specially when we have a monitor starting a sync with
+   * the leader and another one finishing or aborting an on-going sync, that
+   * happens to be the last on-going trim on the map. Given that we will
+   * enable the Paxos trim once we deplete the @p trim_timeouts map, we must
+   * then ensure that we either add the new sync start to the map before
+   * removing the one just finishing, or that we remove the finishing one
+   * first and enable the trim before we add the new one. If we fail to do
+   * this, nasty repercussions could follow.
+   */
+  Mutex trim_lock;
+  /**
+   * Map holding all on-going syncs' timeouts.
+   *
+   * An on-going sync leads to the Paxos trim to be suspended, and this map
+   * will associate entities to the timeouts to be triggered if the monitor
+   * being synchronized fails to check-in with the leader, letting him know
+   * that the sync is still in effect and that in no circumstances should the
+   * Paxos trim be enabled.
+   */
+  map<entity_inst_t, Context*> trim_timeouts;
+  map<entity_inst_t, uint8_t> trim_entities_states;
+  /**
+   * Map associating monitors to a sync state.
+   *
+   * This map is used by both the Leader and the Sync Provider, and has the
+   * sole objective of keeping track of the state each monitor's sync process
+   * is in.
+   */
+  map<entity_inst_t, uint8_t> sync_entities_states;
+  /**
+   * Timer that will enable the Paxos trim.
+   *
+   * This timer is set after the @p trim_timeouts map is depleted, and once
+   * fired it will enable the Paxos trim (if still disabled). By setting
+   * this timer, we avoid a scenario in which a monitor has just finished
+   * synchronizing, but because the Paxos trim had been disabled for a long,
+   * long time and a lot of trims were proposed in the timespan of the monitor
+   * finishing its sync and actually joining the cluster, the monitor happens
+   * to be out-of-sync yet again. Backing off enabling the Paxos trim will
+   * allow the other monitor to join the cluster before actually trimming.
+   */
+  Context *trim_enable_timer;
+
+  /**
+   * Callback class responsible for finishing a monitor's sync session on the
+   * leader's side, because the said monitor failed to acknowledge its
+   * liveliness in a timely manner, thus being assumed as failed.
+   */
+  struct C_TrimTimeout : public Context {
+    Monitor *mon;
+    entity_inst_t entity;
+
+    C_TrimTimeout(Monitor *m, entity_inst_t& entity)
+      : mon(m), entity(entity) { }
+    void finish(int r) {
+      mon->sync_finish(entity);
+    }
+  };
+
+  /**
+   * Callback class responsible for enabling the Paxos trim if there are no
+   * more on-going syncs.
+   */
+  struct C_TrimEnable : public Context {
+    Monitor *mon;
+
+    C_TrimEnable(Monitor *m) : mon(m) { }
+    void finish(int r) {
+      Mutex::Locker(mon->trim_lock);
+      // even if we are no longer the leader, we should re-enable trim if
+      // we have disabled it in the past. It doesn't mean we are going to
+      // do anything about it, but if we happen to become the leader
+      // sometime down the future, we sure want to have the trim enabled.
+      if (!mon->trim_timeouts.size())
+       mon->paxos->trim_enable();
+      mon->trim_enable_timer = NULL;
+    }
+  };
+
+  /**
+   * Send a heartbeat message to another entity.
+   *
+   * The sent message may be a heartbeat reply if the @p reply parameter is
+   * set to true.
+   *
+   * This function is used both by the leader (always with @p reply = true),
+   * and by the sync requester (always with @p reply = false).
+   *
+   * @param other The target monitor's entity instance.
+   * @param reply Whether the message to be sent should be a heartbeat reply.
+   */
+  void sync_send_heartbeat(entity_inst_t &other, bool reply = false);
+  /**
+   * Handle a Sync Start request.
+   *
+   * Monitors wanting to synchronize with the cluster will have to first ask
+   * the leader to do so. The only objective with this is so that the we can
+   * gurantee that the leader won't trim the paxos state.
+   *
+   * The leader may not be the only one receiving this request. A sync provider
+   * may also receive it when it is taken as the point of entry onto the
+   * cluster. In this scenario, the provider must then forward this request to
+   * the leader, if he know of one, or assume himself as the leader for this
+   * sync purpose (this may happen if there is no formed quorum).
+   *
+   * @param m Sync message with operation type MMonSync::OP_START
+   */
+  void handle_sync_start(MMonSync *m);
+  /**
+   * Handle a Heartbeat sent by a sync requester.
+   *
+   * We use heartbeats as a way to guarantee that both the leader and the sync
+   * requester are still alive. Receiving this message means that the requester
+   * if still working on getting his store synchronized.
+   *
+   * @param m Sync message with operation type MMonSync::OP_HEARTBEAT
+   */
+  void handle_sync_heartbeat(MMonSync *m);
+  /**
+   * Handle a Sync Finish.
+   *
+   * A MMonSync::OP_FINISH is the way the sync requester has to inform the
+   * leader that he finished synchronizing his store.
+   *
+   * @param m Sync message with operation type MMonSync::OP_FINISH
+   */
+  void handle_sync_finish(MMonSync *m);
+  /**
+   * Finish a given monitor's sync process on the leader's side.
+   *
+   * This means cleaning up the state referring to the monitor whose sync has
+   * finished (may it have been finished successfully, by receiving a message
+   * with type MMonSync::OP_FINISH, or due to the assumption that the said
+   * monitor failed).
+   *
+   * If we happen to know of no other monitor synchronizing, we may then enable
+   * the paxos trim.
+   *
+   * @param entity Entity instance of the monitor whose sync we are considering
+   *              as finished.
+   * @param abort If true, we consider this sync has finished due to an abort.
+   */
+  void sync_finish(entity_inst_t &entity, bool abort = false);
+  /**
+   * Abort a given monitor's sync process on the leader's side.
+   *
+   * This function is a wrapper for Monitor::sync_finish().
+   *
+   * @param entity Entity instance of the monitor whose sync we are aborting.
+   */
+  void sync_finish_abort(entity_inst_t &entity) {
+    sync_finish(entity, true);
+  }
+  /**
+   * @} // Leader-specific
+   */
+  /**
+   * @defgroup Synchronization Provider-specific
+   * @{
+   */
+  /**
+   * Represents a participant in a synchronization, along with its state.
+   *
+   * This class is used to track down all the sync requesters we are providing
+   * to. In such scenario, it won't be uncommon to have the @p synchronizer
+   * field set with a connection to the MonitorDBStore, the @p timeout field
+   * containing a timeout event and @p entity containing the entity instance
+   * of the monitor we are here representing.
+   *
+   * The sync requester will also use this class to represent both the sync
+   * leader and the sync provider.
+   */
+  struct SyncEntityImpl {
+
+    /**
+     * Store synchronization related Sync state.
+     */
+    enum {
+      /**
+       * No state whatsoever. We are not providing any sync suppport.
+       */
+      STATE_NONE   = 0,
+      /**
+       * This entity's sync effort is currently focused on reading and sharing
+       * our whole store state with @p entity. This means all the entries in
+       * the key/value space.
+       */
+      STATE_WHOLE  = 1,
+      /**
+       * This entity's sync effor is currently focused on reading and sharing
+       * our Paxos state with @p entity. This means all the Paxos-related
+       * key/value entries, such as the Paxos versions.
+       */
+      STATE_PAXOS  = 2
+    };
+
+    /**
+     * The entity instace of the monitor whose sync effort we are representing.
+     */
+    entity_inst_t entity;
+    /**
+     * Our Monitor.
+     */
+    Monitor *mon;
+    /**
+     * The Paxos version we are focusing on.
+     *
+     * @note This is not used at the moment. We are still assessing whether we
+     *      need it.
+     */
+    version_t version;
+    /**
+     * Timeout event. Its type and purpose varies depending on the situation.
+     */
+    Context *timeout;
+    /**
+     * Last key received during a sync effort.
+     *
+     * This field is mainly used by the sync requester to track the last
+     * received key, in case he needs to switch providers due to failure. The
+     * sync provider will also use this field whenever the requester specifies
+     * a last received key when requesting the provider to start sending his
+     * store chunks.
+     */
+    pair<string,string> last_received_key;
+    /**
+     * Hold the Store Synchronization related Sync State.
+     */
+    int sync_state;
+    /**
+     * The MonitorDBStore's chunk iterator instance we are currently using
+     * to obtain the store's chunks and pack them to the sync requester.
+     */
+    MonitorDBStore::Synchronizer synchronizer;
+    MonitorDBStore::Synchronizer paxos_synchronizer;
+    /* Should only be used for debugging purposes */
+    /**
+     * crc of the contents read from the store.
+     *
+     * @note may not always be available, as it is used only on specific
+     *      points in time during the sync process.
+     * @note depends on '--mon-sync-debug' being set.
+     */
+    __u32 crc;
+    /**
+     * Should be true if @p crc has been set.
+     */
+    bool crc_available;
+    /**
+     * Total synchronization attempts.
+     */
+    int attempts;
+
+    SyncEntityImpl(entity_inst_t &entity, Monitor *mon)
+      : entity(entity),
+       mon(mon),
+       version(0),
+       timeout(NULL),
+       sync_state(STATE_NONE),
+       crc_available(false),
+       attempts(0)
+    { }
+
+    /**
+     * Obtain current Sync State name.
+     *
+     * @returns Name of current sync state.
+     */
+    string get_state() {
+      switch (sync_state) {
+       case STATE_NONE: return "none";
+       case STATE_WHOLE: return "whole";
+       case STATE_PAXOS: return "paxos";
+       default: return "unknown";
+      }
+    }
+    /**
+     * Obtain the paxos version at which this sync started.
+     *
+     * @returns Paxos version at which this sync started
+     */
+    version_t get_version() {
+      return version;
+    }
+    /**
+     * Set a timeout event for this sync entity.
+     *
+     * @param event Timeout class to be called after @p fire_after seconds.
+     * @param fire_after Number of seconds until we fire the @p event event.
+     */
+    void set_timeout(Context *event, double fire_after) {
+      cancel_timeout();
+      timeout = event;
+      mon->timer.add_event_after(fire_after, timeout);
+    }
+    /**
+     * Cancel the currently set timeout, if any.
+     */
+    void cancel_timeout() {
+      if (timeout)
+       mon->timer.cancel_event(timeout);
+      timeout = NULL;
+    }
+    /**
+     * Initiate the required fields for obtaining chunks out of the
+     * MonitorDBStore.
+     *
+     * This function will initiate @p synchronizer with a chunk iterator whose
+     * scope is all the keys/values that belong to one of the sync targets
+     * (i.e., paxos services or paxos).
+     *
+     * Calling @p Monitor::sync_update() will be essential during the efforts
+     * of providing a correct store state to the requester, since we will need
+     * to eventually update the iterator in order to start packing the Paxos
+     * versions.
+     */
+    void sync_init() {
+      sync_state = STATE_WHOLE;
+      set<string> sync_targets = mon->get_sync_targets_names();
+
+      string prefix("paxos");
+      paxos_synchronizer = mon->store->get_synchronizer(prefix);
+      version = mon->paxos->get_version();
+      generic_dout(10) << __func__ << " version " << version << dendl;
+
+      synchronizer = mon->store->get_synchronizer(last_received_key,
+                                                 sync_targets);
+      sync_update();
+      assert(synchronizer->has_next_chunk());
+    }
+    /**
+     * Update the @p synchronizer chunk iterator, if needed.
+     *
+     * Whenever we reach the end of the iterator during @p STATE_WHOLE, we
+     * must update the @p synchronizer to an iterator focused on reading only
+     * Paxos versions. This is an essential part of the sync store approach,
+     * and it will guarantee that we end up with a consistent store.
+     */
+    void sync_update() {
+      assert(sync_state != STATE_NONE);
+      assert(synchronizer.use_count() != 0);
+
+      if (!synchronizer->has_next_chunk()) {
+       crc_set(synchronizer->crc());
+       if (sync_state == STATE_WHOLE) {
+          assert(paxos_synchronizer.use_count() != 0);
+         sync_state = STATE_PAXOS;
+          synchronizer = paxos_synchronizer;
+       }
+      }
+    }
+
+    /* For debug purposes only */
+    /**
+     * Check if we have a CRC available.
+     *
+     * @returns true if crc is available; false otherwise.
+     */
+    bool has_crc() {
+      return (g_conf->mon_sync_debug && crc_available);
+    }
+    /**
+     * Set @p crc to @p to_set
+     *
+     * @param to_set a crc value to set.
+     */
+    void crc_set(__u32 to_set) {
+      crc = to_set;
+      crc_available = true;
+    }
+    /**
+     * Get the current CRC value from @p crc
+     *
+     * @returns the currenct CRC value from @p crc
+     */
+    __u32 crc_get() {
+      return crc;
+    }
+    /**
+     * Clear the current CRC.
+     */
+    void crc_clear() {
+      crc_available = false;
+    }
+  };
+  typedef std::tr1::shared_ptr< SyncEntityImpl > SyncEntity;
+  /**
+   * Get a Monitor::SyncEntity instance.
+   *
+   * @param entity The monitor's entity instance that we want to associate
+   *              with this Monitor::SyncEntity.
+   * @param mon The Monitor.
+   *
+   * @returns A Monitor::SyncEntity
+   */
+  SyncEntity get_sync_entity(entity_inst_t &entity, Monitor *mon) {
+    return std::tr1::shared_ptr<SyncEntityImpl>(
+       new SyncEntityImpl(entity, mon));
+  }
+  /**
+   * Callback class responsible for dealing with the consequences of a sync
+   * process timing out.
+   */
+  struct C_SyncTimeout : public Context {
+    Monitor *mon;
+    entity_inst_t entity;
+
+    C_SyncTimeout(Monitor *mon, entity_inst_t &entity)
+      : mon(mon), entity(entity)
+    { }
+
+    void finish(int r) {
+      mon->sync_timeout(entity);
+    }
+  };
+  /**
+   * Map containing all the monitor entities to whom we are acting as sync
+   * providers.
+   */
+  map<entity_inst_t, SyncEntity> sync_entities;
+  /**
+   * RNG used for the sync (currently only used to pick random monitors)
+   */
+  SimpleRNG sync_rng;
+  /**
+   * Obtain random monitor from the monmap.
+   *
+   * @param other Any monitor other than the one with rank @p other
+   * @returns The picked monitor's name.
+   */
+  string _pick_random_mon(int other = -1);
+  int _pick_random_quorum_mon(int other = -1);
+  /**
+   * Deal with the consequences of @p entity's sync timing out.
+   *
+   * @note Both the sync provider and the sync requester make use of this
+   *      function, since both use the @p Monitor::C_SyncTimeout callback.
+   *
+   * Being the sync provider, whenever a Monitor::C_SyncTimeout is triggered,
+   * we only have to clean up the sync requester's state we are maintaining.
+   *
+   * Being the sync requester, we will have to choose a new sync provider, and
+   * resume our sync from where it was left.
+   *
+   * @param entity Entity instance of the monitor whose sync has timed out.
+   */
+  void sync_timeout(entity_inst_t &entity);
+  /**
+   * Cleanup the state we, the provider, are keeping during @p entity's sync.
+   *
+   * @param entity Entity instance of the monitor whose sync state we are
+   *              cleaning up.
+   */
+  void sync_provider_cleanup(entity_inst_t &entity);
+  /**
+   * Handle a Sync Start Chunks request from a sync requester.
+   *
+   * This request will create the necessary state our the provider's end, and
+   * the provider will then be able to send chunks of his own store to the
+   * requester.
+   *
+   * @param m Sync message with operation type MMonSync::OP_START_CHUNKS
+   */
+  void handle_sync_start_chunks(MMonSync *m);
+  /**
+   * Handle a requester's reply to the last chunk we sent him.
+   *
+   * We will only send a new chunk to the sync requester once he has acked the
+   * reception of the last chunk we sent them.
+   *
+   * That's also how we will make sure that, on their end, they became aware
+   * that there are no more chunks to send (since we shall tag a message with
+   * MMonSync::FLAG_LAST when we are sending them the last chunk of all),
+   * allowing us to clean up the requester's state.
+   *
+   * @param m Sync message with operation type MMonSync::OP_CHUNK_REPLY
+   */
+  void handle_sync_chunk_reply(MMonSync *m);
+  /**
+   * Send a chunk to the sync entity represented by @p sync.
+   *
+   * This function will send the next chunk available on the synchronizer. If
+   * it happens to be the last chunk, then the message shall be marked as
+   * such using MMonSync::FLAG_LAST.
+   *
+   * @param sync A Monitor::SyncEntity representing a sync requester monitor.
+   */
+  void sync_send_chunks(SyncEntity sync);
+  /**
+   * @} // Synchronization Provider-specific
+   */
+  /**
+   * @defgroup Synchronization Requester-specific
+   * @{
+   */
+  /**
+   * The state in which we (the sync leader, provider or requester) are in
+   * regard to our sync process (if we are the requester) or any entity that
+   * we may be leading or providing to.
+   */
+  enum {
+    /**
+     * We are not part of any synchronization effort, or it has not began yet.
+     */
+    SYNC_STATE_NONE   = 0,
+    /**
+     * We have started our role in the synchronization.
+     *
+     * This state may have multiple meanings, depending on which entity is
+     * employing it and within which context.
+     *
+     * For instance, the leader will consider a sync requester to enter
+     * SYNC_STATE_START whenever it receives a MMonSync::OP_START from the
+     * said requester. On the other hand, the provider will consider that the
+     * requester enters this state after receiving a MMonSync::OP_START_CHUNKS.
+     * The sync requester will enter this state as soon as it begins its sync
+     * efforts.
+     */
+    SYNC_STATE_START  = 1,
+    /**
+     * We are synchronizing chunks.
+     *
+     * This state is not used by the sync leader; only the sync requester and
+     * the sync provider will.
+     */
+    SYNC_STATE_CHUNKS = 2,
+    /**
+     * We are stopping the sync effort.
+     */
+    SYNC_STATE_STOP   = 3
+  };
+  /**
+   * The current sync state.
+   *
+   * This field is only used by the sync requester, being the only one that
+   * will take this state as part of its global state. The sync leader and the
+   * sync provider will only associate sync states to other entities (i.e., to
+   * sync requesters), and those shall be kept in the @p sync_entities_states
+   * map.
+   */
+  int sync_state;
+  /**
+   * Callback class responsible for dealing with the consequences of the sync
+   * requester not receiving a MMonSync::OP_START_REPLY in a timely manner.
+   */
+  struct C_SyncStartTimeout : public Context {
+    Monitor *mon;
+
+    C_SyncStartTimeout(Monitor *mon)
+      : mon(mon)
+    { }
+
+    void finish(int r) {
+      mon->sync_start_reply_timeout();
+    }
+  };
+  /**
+   * Callback class responsible for retrying a Sync Start after a given
+   * backoff period, whenever the Sync Leader flags a MMonSync::OP_START_REPLY
+   * with the MMonSync::FLAG_RETRY flag.
+   */
+  struct C_SyncStartRetry : public Context {
+    Monitor *mon;
+    entity_inst_t entity;
+
+    C_SyncStartRetry(Monitor *mon, entity_inst_t &entity)
+      : mon(mon), entity(entity)
+    { }
+
+    void finish(int r) {
+      mon->bootstrap();
+    }
+  };
+  /**
+   * We use heartbeats to check if both the Leader and the Synchronization
+   * Requester are both still alive, so we can determine if we should continue
+   * with the synchronization process, granted that trim is disabled.
+   */
+  struct C_HeartbeatTimeout : public Context {
+    Monitor *mon;
+
+    C_HeartbeatTimeout(Monitor *mon)
+      : mon(mon)
+    { }
+
+    void finish(int r) {
+      mon->sync_requester_abort();
+    }
+  };
+  /**
+   * Callback class responsible for sending a heartbeat message to the sync
+   * leader. We use this callback to keep an assynchronous heartbeat with
+   * the sync leader at predefined intervals.
+   */
+  struct C_HeartbeatInterval : public Context {
+    Monitor *mon;
+    entity_inst_t entity;
+
+    C_HeartbeatInterval(Monitor *mon, entity_inst_t &entity)
+      : mon(mon), entity(entity)
+    { }
+
+    void finish(int r) {
+      mon->sync_leader->set_timeout(new C_HeartbeatTimeout(mon),
+                                   g_conf->mon_sync_heartbeat_timeout);
+      mon->sync_send_heartbeat(entity);
+    }
+  };
+  /**
+   * Callback class responsible for dealing with the consequences of never
+   * receiving a reply to a MMonSync::OP_FINISH sent to the sync leader.
+   */
+  struct C_SyncFinishReplyTimeout : public Context {
+    Monitor *mon;
+
+    C_SyncFinishReplyTimeout(Monitor *mon)
+      : mon(mon)
+    { }
+
+    void finish(int r) {
+      mon->sync_finish_reply_timeout();
+    }
+  };
+  /**
+   * The entity we, the sync requester, consider to be our sync leader. If
+   * there is a formed quorum, the @p sync_leader should represent the actual
+   * cluster Leader; otherwise, it can be any monitor and will likely be the
+   * same as @p sync_provider.
+   */
+  SyncEntity sync_leader;
+  /**
+   * The entity we, the sync requester, are synchronizing against. This entity
+   * will be our source of store chunks, and we will ultimately obtain a store
+   * state equal (or very similar, maybe off by a couple of versions) as their
+   * own.
+   */
+  SyncEntity sync_provider;
+  /**
+   * Clean up the Sync Requester's state (both in-memory and in-store).
+   */
+  void sync_requester_cleanup();
+  /**
+   * Abort the current sync effort.
+   *
+   * This will be translated into a MMonSync::OP_ABORT sent to the sync leader
+   * and to the sync provider, and ultimately it will also involve calling
+   * @p Monitor::sync_requester_cleanup() to clean up our current sync state.
+   */
+  void sync_requester_abort();
+  /**
+   * Deal with a timeout while waiting for a MMonSync::OP_FINISH_REPLY.
+   *
+   * This will be assumed as a leader failure, and having been exposed to the
+   * side-effects of a new Leader being elected, we have no other choice but
+   * to abort our sync process and start fresh.
+   */
+  void sync_finish_reply_timeout();
+  /**
+   * Deal with a timeout while waiting for a MMonSync::OP_START_REPLY.
+   *
+   * This will be assumed as a leader failure. Since we didn't get to do
+   * much work (as we haven't even started our sync), we will simply bootstrap
+   * and start off fresh with a new sync leader.
+   */
+  void sync_start_reply_timeout();
+  /**
+   * Start the synchronization efforts.
+   *
+   * This function should be called whenever we find the need to synchronize
+   * our store state with the remaining cluster.
+   *
+   * Starting the sync process means that we will have to request the cluster
+   * Leader (if there is a formed quorum) to stop trimming the Paxos state and
+   * allow us to start synchronizing with the sync provider we picked.
+   *
+   * @param entity An entity instance referring to the sync provider we picked.
+   */
+  void sync_start(entity_inst_t &entity);
+  /**
+   * Request the provider to start sending the chunks of his store, in order
+   * for us to obtain a consistent store state similar to the one shared by
+   * the cluster.
+   *
+   * @param provider The SyncEntity representing the Sync Provider.
+   */
+  void sync_start_chunks(SyncEntity provider);
+  /**
+   * Handle a MMonSync::OP_START_REPLY sent by the Sync Leader.
+   *
+   * Reception of this message may be twofold: if it was marked with the
+   * MMonSync::FLAG_RETRY flag, we must backoff for a while and retry starting
+   * the sync at a later time; otherwise, we have the green-light to request
+   * the Sync Provider to start sharing his chunks with us.
+   *
+   * @param m Sync message with operation type MMonSync::OP_START_REPLY
+   */
+  void handle_sync_start_reply(MMonSync *m);
+  /**
+   * Handle a Heartbeat reply sent by the Sync Leader.
+   *
+   * We use heartbeats to keep the Sync Leader aware that we are keeping our
+   * sync efforts alive. We also use them to make sure our Sync Leader is
+   * still alive. If the Sync Leader fails, we will have to abort our on-going
+   * sync, or we could incurr in an inconsistent store state due to a trim on
+   * the Paxos state of the monitor provinding us with his store chunks.
+   *
+   * @param m Sync message with operation type MMonSync::OP_HEARTBEAT_REPLY
+   */
+  void handle_sync_heartbeat_reply(MMonSync *m);
+  /**
+   * Handle a chunk sent by the Sync Provider.
+   *
+   * We will receive the Sync Provider's store in chunks. These are encoded
+   * in bufferlists containing a transaction that will be directly applied
+   * onto our MonitorDBStore.
+   *
+   * Whenever we receive such a message, we must reply to the Sync Provider,
+   * as a way of acknowledging the reception of its last chunk. If the message
+   * is tagged with a MMonSync::FLAG_LAST, we can then consider we have
+   * received all the chunks the Sync Provider had to offer, and finish our
+   * sync efforts with the Sync Leader.
+   *
+   * @param m Sync message with operation type MMonSync::OP_CHUNK
+   */
+  void handle_sync_chunk(MMonSync *m);
+  /**
+   * Handle a reply sent by the Sync Leader to a MMonSync::OP_FINISH.
+   *
+   * As soon as we receive this message, we know we finally have a store state
+   * consistent with the remaining cluster (give or take a couple of versions).
+   * We may then bootstrap and attempt to join the other monitors in the
+   * cluster.
+   *
+   * @param m Sync message with operation type MMonSync::OP_FINISH_REPLY
+   */
+  void handle_sync_finish_reply(MMonSync *m);
+  /**
+   * Stop our synchronization effort by sending a MMonSync::OP_FINISH to the
+   * Sync Leader.
+   *
+   * Once we receive the last chunk from the Sync Provider, we are in
+   * conditions of officially finishing our sync efforts. With that purpose in
+   * mind, we must then send a MMonSync::OP_FINISH to the Leader, letting him
+   * know that we no longer require the Paxos state to be preserved.
+   */
+  void sync_stop();
+  /**
+   * @} // Synchronization Requester-specific
+   */
+  const string get_sync_state_name(int s) const {
+    switch (s) {
+    case SYNC_STATE_NONE: return "none";
+    case SYNC_STATE_START: return "start";
+    case SYNC_STATE_CHUNKS: return "chunks";
+    case SYNC_STATE_STOP: return "stop";
+    }
+    return "???";
+  }
+  /**
+   * Obtain a string describing the current Sync State.
+   *
+   * @returns A string describing the current Sync State, if any, or an empty
+   *         string if no sync (or sync effort we know of) is in progress.
+   */
+  const string get_sync_state_name() const {
+    string sn;
+
+    if (sync_role == SYNC_ROLE_NONE)
+      return "";
+
+    sn.append(" sync(");
+
+    if (sync_role & SYNC_ROLE_LEADER)
+      sn.append(" leader");
+    if (sync_role & SYNC_ROLE_PROVIDER)
+      sn.append(" provider");
+    if (sync_role & SYNC_ROLE_REQUESTER)
+      sn.append(" requester");
+
+    sn.append(" state ");
+    sn.append(get_sync_state_name(sync_state));
+
+    sn.append(" )");
+
+    return sn;
+  }
+
+  /**
+   * @} // Synchronization
+   */
 
   list<Context*> waitfor_quorum;
   list<Context*> maybe_wait_for_quorum;
@@ -256,7 +1136,7 @@ private:
    */
 
 
-  Context *probe_timeout_event;  // for probing and slurping states
+  Context *probe_timeout_event;  // for probing
 
   struct C_ProbeTimeout : public Context {
     Monitor *mon;
@@ -270,9 +1150,6 @@ private:
   void cancel_probe_timeout();
   void probe_timeout(int r);
 
-  void slurp();
-
 public:
   epoch_t get_epoch();
   int get_leader() { return leader; }
@@ -354,6 +1231,8 @@ public:
   bool _allowed_command(MonSession *s, const vector<std::string>& cmd);
   void _mon_status(ostream& ss);
   void _quorum_status(ostream& ss);
+  void _sync_status(ostream& ss);
+  void _sync_force(ostream& ss);
   void _add_bootstrap_peer_hint(string cmd, string args, ostream& ss);
   void handle_command(class MMonCommand *m);
   void handle_route(MRoute *m);
@@ -370,6 +1249,9 @@ public:
   void reply_command(MMonCommand *m, int rc, const string &rs, version_t version);
   void reply_command(MMonCommand *m, int rc, const string &rs, bufferlist& rdata, version_t version);
 
+  /**
+   * Handle Synchronization-related messages.
+   */
   void handle_probe(MMonProbe *m);
   /**
    * Handle a Probe Operation, replying with our name, quorum and known versions.
@@ -387,20 +1269,6 @@ public:
    */
   void handle_probe_probe(MMonProbe *m);
   void handle_probe_reply(MMonProbe *m);
-  void handle_probe_slurp(MMonProbe *m);
-  void handle_probe_slurp_latest(MMonProbe *m);
-  void handle_probe_data(MMonProbe *m);
-  /**
-   * Given an MMonProbe and associated Paxos machine, create a reply,
-   * fill it with the missing Paxos states and current commit pointers
-   *
-   * @param m The incoming MMonProbe. We use this to determine the range
-   * of paxos states to include in the reply.
-   * @param pax The Paxos state machine which m is associated with.
-   *
-   * @returns A new MMonProbe message, initialized as OP_DATA, and filled
-   * with the necessary Paxos states. */
-  MMonProbe *fill_probe_data(MMonProbe *m, Paxos *pax);
 
   // request routing
   struct RoutedRequest {
@@ -501,6 +1369,7 @@ public:
 
   int preinit();
   int init();
+  void init_paxos();
   void shutdown();
   void tick();
 
index 8463b50a353d0babfd3c68d54bc403683a05d5a4..d5cd3254ae0037869abe3959817c1e4e108f637d 100644 (file)
@@ -121,7 +121,11 @@ class MonitorDBStore
     }
 
     bool empty() {
-      return (ops.size() == 0);
+      return (size() == 0);
+    }
+
+    bool size() {
+      return ops.size();
     }
 
     void dump(ceph::Formatter *f) {
index add582ea263c5edf97141e1b232c910b73cc62bf..802c7a8e0f38a65c847b8e69ab5e3b6b6bfc7c4b 100644 (file)
@@ -49,6 +49,38 @@ MonitorDBStore *Paxos::get_store()
   return mon->store;
 }
 
+
+void Paxos::apply_version(MonitorDBStore::Transaction &tx, version_t v)
+{
+  bufferlist bl;
+  int err = get_store()->get(get_name(), v, bl);
+  assert(err == 0);
+  assert(bl.length());
+  decode_append_transaction(tx, bl);
+}
+
+void Paxos::reapply_all_versions()
+{
+  version_t first = get_store()->get(get_name(), "first_committed");
+  version_t last = get_store()->get(get_name(), "last_committed");
+  dout(10) << __func__ << " first " << first << " last " << last << dendl;
+
+  MonitorDBStore::Transaction tx;
+  for (version_t v = first; v <= last; ++v) {
+    dout(30) << __func__ << " apply version " << v << dendl;
+    apply_version(tx, v);
+  }
+  dout(15) << __func__ << " total versions " << (last-first) << dendl;
+
+  JSONFormatter f(true);
+  tx.dump(&f);
+  dout(30) << __func__ << " tx dump:\n";
+  f.flush(*_dout);
+  *_dout << dendl;
+
+  get_store()->apply_transaction(tx);
+}
+
 void Paxos::init()
 {
   // load paxos variables from stable storage
index ca1304365f0ff6dc73f71d87b008672f9ba0c9b7..61da2c3fa1939317dcb524cd7e54d31d53be4b3b 100644 (file)
@@ -1024,6 +1024,9 @@ public:
 
   void dispatch(PaxosServiceMessage *m);
 
+  void reapply_all_versions();
+  void apply_version(MonitorDBStore::Transaction &tx, version_t v);
+
   void init();
   /**
    * This function runs basic consistency checks. Importantly, if