]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mon: implement a simpler sync
authorSage Weil <sage@inktank.com>
Tue, 9 Jul 2013 01:13:31 +0000 (18:13 -0700)
committerSage Weil <sage@inktank.com>
Tue, 9 Jul 2013 18:05:47 +0000 (11:05 -0700)
The previous sync implementation was highly stateful and very complex.
This made it very hard to understand and to debug, and there were bugs
still lurking in the timeout code (at least).

Replace it with something much simpler:

 - sync providers are almost stateless.  they keep an iterator, identified
   by a unique cookie, that times out in a simple way.
 - sync requesters sync from whomever they fancy.  namely anyone with newer
   committed paxos state.

There are a few extra fields that might allow sync continuation later, but
this is complex and not necessary at this point.

Signed-off-by: Sage Weil <sage@inktank.com>
src/common/config_opts.h
src/messages/MMonSync.h
src/mon/Monitor.cc
src/mon/Monitor.h
src/mon/MonitorDBStore.h
src/mon/Paxos.cc
src/mon/Paxos.h

index fb36058e4ba31a9eb5ad2945a043b78e76679289..ec12228413dbcf902ab21910c9aef0d2119f2484 100644 (file)
@@ -171,17 +171,13 @@ OPTION(mon_health_data_update_interval, OPT_FLOAT, 60.0)
 OPTION(mon_data_avail_crit, OPT_INT, 5)
 OPTION(mon_data_avail_warn, OPT_INT, 30)
 OPTION(mon_config_key_max_entry_size, OPT_INT, 4096) // max num bytes per config-key entry
-OPTION(mon_sync_trim_timeout, OPT_DOUBLE, 30.0)
-OPTION(mon_sync_heartbeat_timeout, OPT_DOUBLE, 30.0)
-OPTION(mon_sync_heartbeat_interval, OPT_DOUBLE, 5.0)
-OPTION(mon_sync_backoff_timeout, OPT_DOUBLE, 30.0)
-OPTION(mon_sync_timeout, OPT_DOUBLE, 30.0)
-OPTION(mon_sync_max_retries, OPT_INT, 5)
+OPTION(mon_sync_timeout, OPT_DOUBLE, 60.0)
 OPTION(mon_sync_max_payload_size, OPT_U32, 1048576) // max size for a sync chunk payload (say, 1MB)
 OPTION(mon_sync_debug, OPT_BOOL, false) // enable sync-specific debug
 OPTION(mon_sync_debug_leader, OPT_INT, -1) // monitor to be used as the sync leader
 OPTION(mon_sync_debug_provider, OPT_INT, -1) // monitor to be used as the sync provider
 OPTION(mon_sync_debug_provider_fallback, OPT_INT, -1) // monitor to be used as fallback if sync provider fails
+OPTION(mon_inject_sync_get_chunk_delay, OPT_DOUBLE, 0)  // inject N second delay on each get_chunk request
 OPTION(mon_osd_min_down_reporters, OPT_INT, 1)   // number of OSDs who need to report a down OSD for it to count
 OPTION(mon_osd_min_down_reports, OPT_INT, 3)     // number of times a down OSD must be reported for it to count
 
@@ -189,8 +185,7 @@ OPTION(mon_osd_min_down_reports, OPT_INT, 3)     // number of times a down OSD m
 OPTION(mon_debug_dump_transactions, OPT_BOOL, false)
 OPTION(mon_debug_dump_location, OPT_STR, "/var/log/ceph/$cluster-$name.tdump")
 
-OPTION(mon_sync_leader_kill_at, OPT_INT, 0) // kill the sync leader at a specifc point in the work flow
-OPTION(mon_sync_provider_kill_at, OPT_INT, 0) // kill the sync provider at a specific point in the work flow
+OPTION(mon_sync_provider_kill_at, OPT_INT, 0)  // kill the sync provider at a specific point in the work flow
 OPTION(mon_sync_requester_kill_at, OPT_INT, 0) // kill the sync requester at a specific point in the work flow
 OPTION(mon_leveldb_write_buffer_size, OPT_U64, 32*1024*1024) // monitor's leveldb write buffer size
 OPTION(mon_leveldb_cache_size, OPT_U64, 512*1024*1024) // monitor's leveldb cache size
index 6183578e167082fc20d429f00081cb511b205a4e..a5415a8f45157485e1414cdc21606daeb53fe8b2 100644 (file)
 
 class MMonSync : public Message
 {
-  static const int HEAD_VERSION = 1;
-  static const int COMPAT_VERSION = 1;
+  static const int HEAD_VERSION = 2;
+  static const int COMPAT_VERSION = 2;
 
 public:
   /**
   * Operation types
   */
   enum {
-    /**
-    * Start synchronization request
-    * (mon.X -> Leader)
-    */
-    OP_START           = 1,
-    /**
-     * Reply to an OP_START
-     * (Leader -> mon.X)
-     */
-    OP_START_REPLY     = 2,
-    /**
-     * Let the Leader know we are still synchronizing
-     * (mon.X -> Leader)
-     */
-    OP_HEARTBEAT       = 3,
-    /**
-     * Reply to a hearbeat
-     * (Leader -> mon.X)
-     */
-    OP_HEARTBEAT_REPLY = 4,
-    /**
-     * Let the Leader know we finished synchronizing
-     * (mon.X -> Leader)
-     */
-    OP_FINISH          = 5,
-    /**
-     * Request a given monitor (mon.Y) to start synchronizing with us, hence
-     * sending us chunks.
-     * (mon.X -> mon.Y)
-     */
-    OP_START_CHUNKS    = 6,
-    /**
-     * Send a chunk to a given monitor (mon.X)
-     * (mon.Y -> mon.X)
-     */
-    OP_CHUNK           = 7,
-    /**
-     * Acknowledge that we received the last chunk sent
-     * (mon.X -> mon.Y)
-     */
-    OP_CHUNK_REPLY     = 8,
-    /**
-     * Reply to an OP_FINISH
-     * (Leader -> mon.X)
-     */
-    OP_FINISH_REPLY    = 9,
-    /**
-     * Let the receiver know that he should abort whatever he is in the middle
-     * of doing with the sender.
-     */
-    OP_ABORT           = 10,
+    OP_GET_COOKIE_FULL = 1,   // -> start a session (full scan)
+    OP_GET_COOKIE_RECENT = 2, // -> start a session (only recent paxos events)
+    OP_COOKIE = 3,            // <- pass the iterator cookie, or
+    OP_GET_CHUNK = 4,         // -> get some keys
+    OP_CHUNK = 5,             // <- return some keys
+    OP_LAST_CHUNK = 6,        // <- return the last set of keys
+    OP_NO_COOKIE = 8,         // <- sorry, no cookie
   };
 
-  /**
-  * Chunk is the last available
-  */
-  const static uint8_t FLAG_LAST      = 0x01;
- /**
-  * Let the other monitor it should retry again its last operation.
-  */
-  const static uint8_t FLAG_RETRY     = 0x02;
-  /**
-   * This message contains a crc
-   */
-  const static uint8_t FLAG_CRC              = 0x04;
-  /**
-   * Do not reply to this message to the sender, but to @p reply_to.
-   */
-  const static uint8_t FLAG_REPLY_TO  = 0x08;
-
   /**
   * Obtain a string corresponding to the operation type @p op
   *
@@ -103,119 +42,68 @@ public:
   */
   static const char *get_opname(int op) {
     switch (op) {
-    case OP_START: return "start";
-    case OP_START_REPLY: return "start_reply";
-    case OP_HEARTBEAT: return "heartbeat";
-    case OP_HEARTBEAT_REPLY: return "heartbeat_reply";
-    case OP_FINISH: return "finish";
-    case OP_FINISH_REPLY: return "finish_reply";
-    case OP_START_CHUNKS: return "start_chunks";
+    case OP_GET_COOKIE_FULL: return "get_cookie_full";
+    case OP_GET_COOKIE_RECENT: return "get_cookie_recent";
+    case OP_COOKIE: return "cookie";
+    case OP_GET_CHUNK: return "get_chunk";
     case OP_CHUNK: return "chunk";
-    case OP_CHUNK_REPLY: return "chunk_reply";
-    case OP_ABORT: return "abort";
+    case OP_LAST_CHUNK: return "last_chunk";
+    case OP_NO_COOKIE: return "no_cookie";
     default: assert("unknown op type"); return NULL;
     }
   }
 
   uint32_t op;
-  uint8_t flags;
-  version_t version;
-  bufferlist chunk_bl;
+  uint64_t cookie;
+  version_t last_committed;
   pair<string,string> last_key;
-  __u32 crc;
+  bufferlist chunk_bl;
   entity_inst_t reply_to;
 
   MMonSync()
     : Message(MSG_MON_SYNC, HEAD_VERSION, COMPAT_VERSION)
   { }
 
-  MMonSync(uint32_t op)
-    : Message(MSG_MON_SYNC, HEAD_VERSION, COMPAT_VERSION),
-      op(op), flags(0), version(0), crc(0)
-  { }
-
-  MMonSync(uint32_t op, bufferlist bl, uint8_t flags = 0) 
-    : Message(MSG_MON_SYNC, HEAD_VERSION, COMPAT_VERSION),
-      op(op), flags(flags), version(0), chunk_bl(bl), crc(0)
-  { }
-
-  MMonSync(MMonSync *m)
+  MMonSync(uint32_t op, uint64_t c = 0)
     : Message(MSG_MON_SYNC, HEAD_VERSION, COMPAT_VERSION),
-      op(m->op), flags(m->flags), version(m->version),
-      chunk_bl(m->chunk_bl), last_key(m->last_key),
-      crc(m->crc), reply_to(m->reply_to)
+      op(op),
+      cookie(c),
+      last_committed(0)
   { }
 
-  /**
-  * Obtain this message type's name */
   const char *get_type_name() const { return "mon_sync"; }
 
-  void set_reply_to(entity_inst_t other) {
-    reply_to = other;
-    flags |= FLAG_REPLY_TO;
-  }
-
-  /**
-  * Print this message in a pretty format to @p out
-  *
-  * @param out The output stream to output to
-  */
   void print(ostream& out) const {
-    out << "mon_sync( " << get_opname(op);
-
-    if (version > 0)
-      out << " v " << version;
-
-    if (flags) {
-      out << " flags( ";
-      if (flags & FLAG_LAST)
-       out << "last ";
-      if (flags & FLAG_RETRY)
-       out << "retry ";
-      if (flags & FLAG_CRC)
-       out << "crc(" << crc << ") ";
-      if (flags & FLAG_REPLY_TO)
-       out << "reply-to(" << reply_to << ") ";
-      out << ")";
-    }
-
+    out << "mon_sync(" << get_opname(op);
+    if (cookie)
+      out << " cookie " << cookie;
+    if (last_committed > 0)
+      out << " lc " << last_committed;
     if (chunk_bl.length())
       out << " bl " << chunk_bl.length() << " bytes";
-
-    if (!last_key.first.empty() || !last_key.second.empty()) {
-      out << " last_key ( " << last_key.first << ","
-         << last_key.second << " )";
-    }
-
-    out << " )";       
+    if (!last_key.first.empty() || !last_key.second.empty())
+      out << " last_key " << last_key.first << "," << last_key.second;
+    out << ")";
   }
 
-  /**
-  * Encode this message into the Message's payload
-  */
   void encode_payload(uint64_t features) {
     ::encode(op, payload);
-    ::encode(flags, payload);
-    ::encode(version, payload);
-    ::encode(chunk_bl, payload);
+    ::encode(cookie, payload);
+    ::encode(last_committed, payload);
     ::encode(last_key.first, payload);
     ::encode(last_key.second, payload);
-    ::encode(crc, payload);
+    ::encode(chunk_bl, payload);
     ::encode(reply_to, payload);
   }
 
-  /**
-  * Decode the message's payload into this message
-  */
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
     ::decode(op, p);
-    ::decode(flags, p);
-    ::decode(version, p);
-    ::decode(chunk_bl, p);
+    ::decode(cookie, p);
+    ::decode(last_committed, p);
     ::decode(last_key.first, p);
     ::decode(last_key.second, p);
-    ::decode(crc, p);
+    ::decode(chunk_bl, p);
     ::decode(reply_to, p);
   }
 };
index 2e81a9da73602dbb62b92ffde5ba0188cace09d9..fa858b734c689339690a79c5ed6e91440b398810 100644 (file)
@@ -144,14 +144,12 @@ Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
   leader(0),
   quorum_features(0),
 
-  // trim & store sync
-  sync_role(SYNC_ROLE_NONE),
-  trim_lock("Monitor::trim_lock"),
-  trim_enable_timer(NULL),
-  sync_rng(getpid()),
-  sync_state(SYNC_STATE_NONE),
-  sync_leader(),
-  sync_provider(),
+  // sync state
+  sync_provider_count(0),
+  sync_cookie(0),
+  sync_full(false),
+  sync_start_version(0),
+  sync_timeout_event(NULL),
 
   timecheck_round(0),
   timecheck_acks(0),
@@ -240,8 +238,6 @@ void Monitor::do_admin_command(string command, string args, ostream& ss)
     _mon_status(ss);
   else if (command == "quorum_status")
     _quorum_status(ss);
-  else if (command == "sync_status")
-    _sync_status(ss);
   else if (command == "sync_force") {
     if (args != "--yes-i-really-mean-it") {
       ss << "are you SURE? this will mean the monitor store will be erased "
@@ -249,7 +245,7 @@ void Monitor::do_admin_command(string command, string args, ostream& ss)
             "'--yes-i-really-mean-it' if you really do.";
       return;
     }
-    _sync_force(ss);
+    sync_force(ss);
   } else if (command.find("add_bootstrap_peer_hint") == 0)
     _add_bootstrap_peer_hint(command, args, ss);
   else
@@ -484,9 +480,6 @@ int Monitor::preinit()
   r = admin_socket->register_command("quorum_status", "quorum_status",
                                     admin_hook, "show current quorum status");
   assert(r == 0);
-  r = admin_socket->register_command("sync_status", "sync_status", admin_hook,
-                                    "show current synchronization status");
-  assert(r == 0);
   r = admin_socket->register_command("add_bootstrap_peer_hint",
                                     "add_bootstrap_peer_hint name=addr,type=CephIPAddr",
                                     admin_hook,
@@ -582,7 +575,6 @@ void Monitor::shutdown()
     AdminSocket* admin_socket = cct->get_admin_socket();
     admin_socket->unregister_command("mon_status");
     admin_socket->unregister_command("quorum_status");
-    admin_socket->unregister_command("sync_status");
     admin_socket->unregister_command("add_bootstrap_peer_hint");
     delete admin_hook;
     admin_hook = NULL;
@@ -625,6 +617,7 @@ void Monitor::bootstrap()
 {
   dout(10) << "bootstrap" << dendl;
 
+  sync_reset();
   unregister_cluster_logger();
   cancel_probe_timeout();
 
@@ -646,8 +639,6 @@ void Monitor::bootstrap()
     messenger->mark_down_all();
   }
 
-  reset_sync();
-
   // reset
   state = STATE_PROBING;
 
@@ -738,7 +729,12 @@ void Monitor::reset()
   health_monitor->finish();
 }
 
-set<string> Monitor::get_sync_targets_names() {
+
+// -----------------------------------------------------------
+// sync
+
+set<string> Monitor::get_sync_targets_names()
+{
   set<string> targets;
   targets.insert(paxos->get_name());
   for (int i = 0; i < PAXOS_NUM; ++i)
@@ -747,606 +743,11 @@ set<string> Monitor::get_sync_targets_names() {
   return targets;
 }
 
-/**
- * Reset any lingering sync/trim informations we might have.
- */
-void Monitor::reset_sync(bool abort)
-{
-  dout(10) << __func__ << dendl;
-  // clear everything trim/sync related
-  {
-    map<entity_inst_t,Context*>::iterator iter = trim_timeouts.begin();
-    for (; iter != trim_timeouts.end(); ++iter) {
-      if (!iter->second)
-        continue;
-
-      timer.cancel_event(iter->second);
-      if (abort) {
-        MMonSync *msg = new MMonSync(MMonSync::OP_ABORT);
-        entity_inst_t other = iter->first;
-        messenger->send_message(msg, other);
-      }
-    }
-    trim_timeouts.clear();
-  }
-  {
-    map<entity_inst_t,SyncEntity>::iterator iter = sync_entities.begin();
-    for (; iter != sync_entities.end(); ++iter) {
-      (*iter).second->cancel_timeout();
-    }
-    sync_entities.clear();
-  }
-
-  sync_entities_states.clear();
-  trim_entities_states.clear();
-
-  sync_leader.reset();
-  sync_provider.reset();
-
-  sync_state = SYNC_STATE_NONE;
-  sync_role = SYNC_ROLE_NONE;
-}
-
-// leader
-
-void Monitor::sync_send_heartbeat(entity_inst_t &other, bool reply)
-{
-  dout(10) << __func__ << " " << other << " reply(" << reply << ")" << dendl;
-  uint32_t op = (reply ? MMonSync::OP_HEARTBEAT_REPLY : MMonSync::OP_HEARTBEAT);
-  MMonSync *msg = new MMonSync(op);
-  messenger->send_message(msg, other);
-}
-
-void Monitor::handle_sync_start(MMonSync *m)
-{
-  dout(10) << __func__ << " " << *m << dendl;
-
-  /* If we are not the leader, then some monitor picked us as the point of
-   * entry to the quorum during its synchronization process. Therefore, we
-   * have an obligation of forwarding this message to leader, so the sender
-   * can start synchronizing.
-   */
-  if (!is_leader() && !quorum.empty()) {
-    assert(!(sync_role & SYNC_ROLE_REQUESTER));
-    assert(!(sync_role & SYNC_ROLE_LEADER));
-    assert(!is_synchronizing());
-
-    entity_inst_t leader = monmap->get_inst(get_leader());
-    MMonSync *msg = new MMonSync(m);
-    // keep forwarding the message up the chain if it has already been
-    // forwarded.
-    if (!(m->flags & MMonSync::FLAG_REPLY_TO)) {
-      msg->set_reply_to(m->get_source_inst());
-    }
-    dout(10) << __func__ << " forward " << *m
-            << " to leader at " << leader << dendl;
-    assert(g_conf->mon_sync_provider_kill_at != 1);
-    messenger->send_message(msg, leader);
-    assert(g_conf->mon_sync_provider_kill_at != 2);
-    m->put();
-    return;
-  }
-
-  // If we are synchronizing, then it means that we know someone who has a
-  // higher version than the one we have; and if someone attempted to sync
-  // from us, then that must mean they have a lower version than us.
-  // Therefore, they must be much more interested in synchronizing from the
-  // one we are trying to synchronize from than they are from us.
-  // Moreover, if we are already synchronizing under the REQUESTER role, then
-  // we must know someone who is in the quorum, either because we were lucky
-  // enough to contact them in the first place or we managed to contact
-  // someone who knew who they were. Therefore, just forward this request to
-  // our sync leader.
-  if (is_synchronizing()) {
-    assert(!(sync_role & SYNC_ROLE_LEADER));
-    assert(!(sync_role & SYNC_ROLE_PROVIDER));
-    assert(quorum.empty());
-    assert(sync_leader.get() != NULL);
-
-    /**
-     * This looks a bit odd, but we've seen cases where sync start messages
-     * get bounced around and end up at the originator without anybody
-     * noticing!* If it happens, just drop the message and the timeouts
-     * will clean everything up -- eventually.
-     * [*] If a leader gets elected who is too far behind, he'll drop into
-     * bootstrap and sync, but the person he sends his sync to thinks he's
-     * still the leader and forwards the reply back.
-     */
-    if (m->reply_to == messenger->get_myinst()) {
-      m->put();
-      return;
-    }
-
-    dout(10) << __func__ << " forward " << *m
-             << " to our sync leader at "
-             << sync_leader->entity << dendl;
-
-    MMonSync *msg = new MMonSync(m);
-    // keep forwarding the message up the chain if it has already been
-    // forwarded.
-    if (!(m->flags & MMonSync::FLAG_REPLY_TO)) {
-      msg->set_reply_to(m->get_source_inst());
-    }
-    messenger->send_message(msg, sync_leader->entity);
-    m->put();
-    return;
-  }
-
-  // At this point we may or may not be the leader.  If we are not the leader,
-  // it means that we are not in the quorum, but someone would still very much
-  // like to synchronize with us.  In certain circumstances, letting them
-  // synchronize with us is by far our only option -- for instance, when we
-  // need them to form a quorum and they have started fresh or are severely
-  // out of date --, but it won't hurt to let them sync from us anyway: if
-  // they chose us, then they must have noticed that we had a higher version
-  // than they do, so it makes sense to let them try their luck and join the
-  // party.
-
-  Mutex::Locker l(trim_lock);
-  entity_inst_t other =
-    (m->flags & MMonSync::FLAG_REPLY_TO ? m->reply_to : m->get_source_inst());
-
-  assert(g_conf->mon_sync_leader_kill_at != 1);
-
-  if (trim_timeouts.count(other) > 0) {
-    dout(1) << __func__ << " sync session already in progress for " << other
-           << dendl;
-
-    if (trim_entities_states[other] != SYNC_STATE_NONE) {
-      dout(1) << __func__ << "    ignore stray message" << dendl;
-      m->put();
-      return;
-    }
-
-    dout(1) << __func__<< "    destroying current state and creating new"
-           << dendl;
-
-    if (trim_timeouts[other])
-      timer.cancel_event(trim_timeouts[other]);
-    trim_timeouts.erase(other);
-    trim_entities_states.erase(other);
-  }
-
-  MMonSync *msg = new MMonSync(MMonSync::OP_START_REPLY);
-
-  if ((!quorum.empty() && paxos->should_trim())
-      || (trim_enable_timer != NULL)) {
-    msg->flags |= MMonSync::FLAG_RETRY;
-  } else {
-    trim_timeouts.insert(make_pair(other, new C_TrimTimeout(this, other)));
-    timer.add_event_after(g_conf->mon_sync_trim_timeout, trim_timeouts[other]);
-
-    trim_entities_states[other] = SYNC_STATE_START;
-    sync_role |= SYNC_ROLE_LEADER;
-
-    paxos->trim_disable();
-
-    // Is the one that contacted us in the quorum?
-    if (quorum.count(m->get_source().num())) {
-      // Was it forwarded by someone?
-      if (m->flags & MMonSync::FLAG_REPLY_TO) {
-        // Then they must not be synchronizing. What sense would that make, eh?
-        assert(trim_timeouts.count(m->get_source_inst()) == 0);
-        // Set the provider as the one that contacted us. He's in the
-        // quorum, so he's up to the job (i.e., he's not synchronizing)
-        msg->set_reply_to(m->get_source_inst());
-        dout(10) << __func__ << " set provider to " << msg->reply_to << dendl;
-      } else {
-        // Then they must have gotten into the quorum, and boostrapped.
-        // We should tell them to abort and bootstrap ourselves.
-        msg->put();
-        msg = new MMonSync(MMonSync::OP_ABORT);
-        messenger->send_message(msg, other);
-        m->put();
-        bootstrap();
-        return;
-      }
-    } else if (!quorum.empty()) {
-      // grab someone from the quorum and assign them as the sync provider
-      int n = _pick_random_quorum_mon(rank);
-      if (n >= 0) {
-        msg->set_reply_to(monmap->get_inst(n));
-        dout(10) << __func__ << " set quorum-based provider to "
-                 << msg->reply_to << dendl;
-      } else {
-        assert(0 == "We shouldn't get here!");
-      }
-    } else {
-      // There is no quorum, so we must either be in a quorum-less cluster,
-      // or we must be mid-election.  Either way, tell them it is okay to
-      // sync from us by not setting the reply-to field.
-      assert(!(msg->flags & MMonSync::FLAG_REPLY_TO));
-    }
-  }
-  messenger->send_message(msg, other);
-  m->put();
-
-  assert(g_conf->mon_sync_leader_kill_at != 2);
-}
-
-void Monitor::handle_sync_heartbeat(MMonSync *m)
-{
-  dout(10) << __func__ << " " << *m << dendl;
-
-  entity_inst_t other = m->get_source_inst();
-  if (!(sync_role & SYNC_ROLE_LEADER)
-      || !trim_entities_states.count(other)
-      || (trim_entities_states[other] != SYNC_STATE_START)) {
-    // stray message; ignore.
-    dout(1) << __func__ << " ignored stray message " << *m << dendl;
-    m->put();
-    return;
-  }
-
-  if (!is_leader() && !quorum.empty()
-      && (trim_timeouts.count(other) > 0)) {
-    // we must have been the leader before, but we lost leadership to
-    // someone else.
-    sync_finish_abort(other);
-    m->put();
-    return;
-  }
-
-  assert(trim_timeouts.count(other) > 0);
-
-  if (trim_timeouts[other])
-    timer.cancel_event(trim_timeouts[other]);
-  trim_timeouts[other] = new C_TrimTimeout(this, other);
-  timer.add_event_after(g_conf->mon_sync_trim_timeout, trim_timeouts[other]);
-
-  assert(g_conf->mon_sync_leader_kill_at != 3);
-  sync_send_heartbeat(other, true);
-  assert(g_conf->mon_sync_leader_kill_at != 4);
-
-  m->put();
-}
-
-void Monitor::sync_finish(entity_inst_t &entity, bool abort)
-{
-  dout(10) << __func__ << " entity(" << entity << ")" << dendl;
-
-  Mutex::Locker l(trim_lock);
-
-  if (!trim_timeouts.count(entity)) {
-    dout(1) << __func__ << " we know of no sync effort from "
-           << entity << " -- ignore it." << dendl;
-    return;
-  }
-
-  if (trim_timeouts[entity] != NULL)
-    timer.cancel_event(trim_timeouts[entity]);
-
-  trim_timeouts.erase(entity);
-  trim_entities_states.erase(entity);
-
-  if (abort) {
-    MMonSync *m = new MMonSync(MMonSync::OP_ABORT);
-    assert(g_conf->mon_sync_leader_kill_at != 5);
-    messenger->send_message(m, entity);
-    assert(g_conf->mon_sync_leader_kill_at != 6);
-  }
-
-  if (!trim_timeouts.empty())
-    return;
-
-  dout(10) << __func__ << " no longer a sync leader" << dendl;
-  sync_role &= ~SYNC_ROLE_LEADER;
-
-  // we may have been the leader, but by now we may no longer be.
-  // this can happen when the we sync'ed a monitor that became the
-  // leader, or that same monitor simply came back to life and got
-  // elected as the new leader.
-  if (is_leader() && paxos->is_trim_disabled()) {
-    trim_enable_timer = new C_TrimEnable(this);
-    timer.add_event_after(30.0, trim_enable_timer);
-  }
-
-  finish_contexts(g_ceph_context, maybe_wait_for_quorum);
-}
-
-void Monitor::handle_sync_finish(MMonSync *m)
-{
-  dout(10) << __func__ << " " << *m << dendl;
-
-  entity_inst_t other = m->get_source_inst();
-
-  if (!trim_timeouts.count(other) || !trim_entities_states.count(other)
-      || (trim_entities_states[other] != SYNC_STATE_START)) {
-    dout(1) << __func__ << " ignored stray message from " << other << dendl;
-    if (!trim_timeouts.count(other))
-      dout(1) << __func__ << "  not on trim_timeouts" << dendl;
-    if (!trim_entities_states.count(other))
-      dout(1) << __func__ << "  not on trim_entities_states" << dendl;
-    else if (trim_entities_states[other] != SYNC_STATE_START)
-      dout(1) << __func__ << "  state " << trim_entities_states[other] << dendl;
-    m->put();
-    return;
-  }
-
-  // We may no longer the leader. In such case, we should just inform the
-  // other monitor that he should abort his sync. However, it appears that
-  // his sync has finished, so there is no use in scraping the whole thing
-  // now. Therefore, just go along and acknowledge.
-  if (!is_leader()) {
-    dout(10) << __func__ << " We are no longer the leader; reply nonetheless"
-            << dendl;
-  }
-
-  MMonSync *msg = new MMonSync(MMonSync::OP_FINISH_REPLY);
-  assert(g_conf->mon_sync_leader_kill_at != 7);
-  messenger->send_message(msg, other);
-  assert(g_conf->mon_sync_leader_kill_at != 8);
-
-  sync_finish(other);
-  m->put();
-}
-
-// end of leader
-
-// synchronization provider
-
-int Monitor::_pick_random_mon(int other)
-{
-  assert(monmap->size() > 0);
-  if (monmap->size() == 1)
-    return 0;
-
-  int max = monmap->size();
-  if (other >= 0)
-    max--;
-  int n = sync_rng() % max;
-  if (other >= 0 && n >= other)
-    n++;
-  return n;
-}
-
-int Monitor::_pick_random_quorum_mon(int other)
-{
-  assert(monmap->size() > 0);
-  if (quorum.empty())
-    return -1;
-  set<int>::iterator p = quorum.begin();
-  for (int n = sync_rng() % quorum.size(); p != quorum.end() && n; ++p, --n);
-  if (other >= 0 && p != quorum.end() && *p == other)
-    ++p;
-
-  return (p == quorum.end() ? *(quorum.rbegin()) : *p);
-}
-
-void Monitor::sync_timeout(entity_inst_t &entity)
-{
-  if (state == STATE_SYNCHRONIZING) {
-    assert(sync_role == SYNC_ROLE_REQUESTER);
-    assert(sync_state == SYNC_STATE_CHUNKS);
-
-    // we are a sync requester; our provider just timed out, so find another
-    // monitor to synchronize with.
-    dout(1) << __func__ << " " << sync_provider->entity << dendl;
-
-    sync_provider->attempts++;
-    if ((sync_provider->attempts > g_conf->mon_sync_max_retries)
-       || (monmap->size() == 2)) {
-      // We either tried too many times to sync, or there's just us and the
-      // monitor we were attempting to sync with.
-      // Therefore, just abort the whole sync and start off fresh whenever he
-      // (or somebody else) comes back.
-      sync_requester_abort();
-      return;
-    }
-
-    unsigned int i = 0;
-    int entity_rank = monmap->get_rank(entity.addr);
-    int debug_mon = g_conf->mon_sync_debug_provider;
-    int debug_fallback = g_conf->mon_sync_debug_provider_fallback;
-
-    dout(10) << __func__ << " entity " << entity << " rank " << entity_rank << dendl;
-    dout(10) << __func__ << " our-rank " << rank << dendl;
-
-    while ((i++) < 2*monmap->size()) {
-      // we are trying to pick a random monitor, but we cannot do this forever.
-      // in case something goes awfully wrong, just stop doing it after a
-      // couple of attempts and try again later.
-      int new_mon = _pick_random_mon(rank);
-
-      if (debug_fallback >= 0) {
-       if (entity_rank != debug_fallback)
-         new_mon = debug_fallback;
-       else if (debug_mon >= 0 && (entity_rank != debug_mon))
-         new_mon = debug_mon;
-      }
-
-      dout(10) << __func__ << " randomly picking mon rank " << new_mon << dendl;
-
-      if ((new_mon != rank) && (new_mon != entity_rank)) {
-       sync_provider->entity = monmap->get_inst(new_mon);
-       dout(10) << __func__ << " randomly choosing " << sync_provider->entity << " rank " << new_mon << dendl;
-       sync_state = SYNC_STATE_START;
-       sync_start_chunks(sync_provider);
-       return;
-      }
-    }
-
-    // well that sucks. Let's see if we can find a monitor to connect to
-    for (int i = 0; i < (int)monmap->size(); ++i) {
-      entity_inst_t i_inst = monmap->get_inst(i);
-      if (i != rank && i_inst != entity) {
-       sync_provider->entity = i_inst;
-       sync_state = SYNC_STATE_START;
-       sync_start_chunks(sync_provider);
-       return;
-      }
-    }
-
-    assert(0 == "Unable to find a new monitor to connect to. Not cool.");
-  } else if (sync_role & SYNC_ROLE_PROVIDER) {
-    dout(10) << __func__ << " cleanup " << entity << dendl;
-    sync_provider_cleanup(entity);
-    return;
-  } else
-    assert(0 == "We should never reach this");
-}
-
-void Monitor::sync_provider_cleanup(entity_inst_t &entity)
-{
-  dout(10) << __func__ << " " << entity << dendl;
-  if (sync_entities.count(entity) > 0) {
-    sync_entities[entity]->cancel_timeout();
-    sync_entities.erase(entity);
-    sync_entities_states.erase(entity);
-  }
-
-  if (sync_entities.empty()) {
-    dout(1) << __func__ << " no longer a sync provider" << dendl;
-    sync_role &= ~SYNC_ROLE_PROVIDER;
-  }
-}
-
-void Monitor::handle_sync_start_chunks(MMonSync *m)
-{
-  dout(10) << __func__ << " " << *m << dendl;
-  assert(!(sync_role & SYNC_ROLE_REQUESTER));
-
-  entity_inst_t other = m->get_source_inst();
-
-  // if we have a sync going on for this entity, just drop the message. If it
-  // was a stray message, we did the right thing. If it wasn't, then it means
-  // that we still have an old state of this entity, and that the said entity
-  // failed in the meantime and is now up again; therefore, just let the
-  // timeout timers fulfill their purpose and deal with state cleanup when
-  // they are triggered. Until then, no Sir, we won't accept your messages.
-  if (sync_entities.count(other) > 0) {
-    dout(1) << __func__ << " sync session already in progress for " << other
-           << " -- assumed as stray message." << dendl;
-    m->put();
-    return;
-  }
-
-  SyncEntity sync = get_sync_entity(other, this);
-  sync->version = paxos->get_version();
-
-  if (!m->last_key.first.empty() && !m->last_key.second.empty()) {
-    sync->last_received_key = m->last_key;
-    dout(10) << __func__ << " set last received key to ("
-            << sync->last_received_key.first << ","
-            << sync->last_received_key.second << ")" << dendl;
-  }
-
-  sync->sync_init();
 
-  sync_entities.insert(make_pair(other, sync));
-  sync_entities_states[other] = SYNC_STATE_START;
-  sync_role |= SYNC_ROLE_PROVIDER;
-
-  sync_send_chunks(sync);
-  m->put();
-}
-
-void Monitor::handle_sync_chunk_reply(MMonSync *m)
-{
-  dout(10) << __func__ << " " << *m << dendl;
-
-  entity_inst_t other = m->get_source_inst();
-
-  if (!(sync_role & SYNC_ROLE_PROVIDER)
-      || !sync_entities.count(other)
-      || (sync_entities_states[other] != SYNC_STATE_START)) {
-    dout(1) << __func__ << " ignored stray message from " << other << dendl;
-    m->put();
-    return;
-  }
-
-  if (m->flags & MMonSync::FLAG_LAST) {
-    // they acked the last chunk. Clean up.
-    sync_provider_cleanup(other);
-    m->put();
-    return;
-  }
-
-  sync_send_chunks(sync_entities[other]);
-  m->put();
-}
-
-void Monitor::sync_send_chunks(SyncEntity sync)
-{
-  dout(10) << __func__ << " entity(" << sync->entity << ")" << dendl;
-
-  sync->cancel_timeout();
-
-  assert(sync->synchronizer.use_count() > 0);
-  assert(sync->synchronizer->has_next_chunk());
-
-  MMonSync *msg = new MMonSync(MMonSync::OP_CHUNK);
-
-  sync->synchronizer->get_chunk(msg->chunk_bl);
-  msg->last_key = sync->synchronizer->get_last_key();
-  dout(10) << __func__ << " last key ("
-          << msg->last_key.first << ","
-          << msg->last_key.second << ")" << dendl;
-
-  sync->sync_update();
-
-  if (sync->has_crc()) {
-    msg->flags |= MMonSync::FLAG_CRC;
-    msg->crc = sync->crc_get();
-    sync->crc_clear();
-  }
-
-  if (!sync->synchronizer->has_next_chunk()) {
-    msg->flags |= MMonSync::FLAG_LAST;
-    msg->version = sync->get_version();
-    sync->synchronizer.reset();
-  }
-
-  sync->set_timeout(new C_SyncTimeout(this, sync->entity),
-                   g_conf->mon_sync_timeout);
-  assert(g_conf->mon_sync_provider_kill_at != 3);
-  messenger->send_message(msg, sync->entity);
-  assert(g_conf->mon_sync_provider_kill_at != 4);
-
-  // kill the monitor as soon as we move into synchronizing the paxos versions.
-  // This is intended as debug.
-  if (sync->sync_state == SyncEntityImpl::STATE_PAXOS)
-    assert(g_conf->mon_sync_provider_kill_at != 5);
-
-
-}
-// end of synchronization provider
-
-// start of synchronization requester
-
-void Monitor::sync_requester_abort()
+void Monitor::sync_timeout()
 {
   dout(10) << __func__ << dendl;
   assert(state == STATE_SYNCHRONIZING);
-  assert(sync_role == SYNC_ROLE_REQUESTER);
-
-  if (sync_leader.get() != NULL) {
-    dout(10) << __func__ << " leader " << sync_leader->entity << dendl;
-    sync_leader->cancel_timeout();
-    sync_leader.reset();
-  }
-
-  if (sync_provider.get() != NULL) {
-    dout(10) << __func__ << " provider " << sync_provider->entity << dendl;
-    sync_provider->cancel_timeout();
-
-    MMonSync *msg = new MMonSync(MMonSync::OP_ABORT);
-    messenger->send_message(msg, sync_provider->entity);
-
-    sync_provider.reset();
-  }
-
-  // Given that we are explicitely aborting the whole sync process, we should
-  // play it safe and clear the store.
-  set<string> targets = get_sync_targets_names();
-  store->clear(targets);
-
-  dout(1) << __func__ << " no longer a sync requester" << dendl;
-  sync_role = SYNC_ROLE_NONE;
-  sync_state = SYNC_STATE_NONE;
-
-  state = 0;
-
   bootstrap();
 }
 
@@ -1400,473 +801,391 @@ void Monitor::sync_obtain_latest_monmap(bufferlist &bl)
   latest_monmap.encode(bl, CEPH_FEATURES_ALL);
 }
 
-/**
- *
- */
-void Monitor::sync_store_init()
+bool Monitor::is_sync_on_going()
 {
-  MonitorDBStore::Transaction t;
-  t.put("mon_sync", "in_sync", 1);
-
-  bufferlist backup_monmap;
-  sync_obtain_latest_monmap(backup_monmap);
-  assert(backup_monmap.length() > 0);
-
-  t.put("mon_sync", "latest_monmap", backup_monmap);
-
-  store->apply_transaction(t);
+  return store->exists("mon_sync", "in_sync");
 }
 
-void Monitor::sync_store_cleanup()
+void Monitor::sync_reset()
 {
-  MonitorDBStore::Transaction t;
-  t.erase("mon_sync", "in_sync");
-  t.erase("mon_sync", "latest_monmap");
-  store->apply_transaction(t);
-}
+  if (sync_timeout_event) {
+    timer.cancel_event(sync_timeout_event);
+    sync_timeout_event = NULL;
+  }
 
-bool Monitor::is_sync_on_going()
-{
-  return store->exists("mon_sync", "in_sync");
+  // leader state
+  sync_providers.clear();
+
+  // requester state
+  sync_provider = entity_inst_t();
+  sync_cookie = 0;
+  sync_full = false;
+  sync_start_version = 0;
 }
 
 /**
- * Start Sync process
+ * Start sync process
  *
- * Create SyncEntity instances for the leader and the provider;
- * Send OP_START message to the leader;
- * Set trim timeout on the leader
+ * Start pulling committed state from another monitor.
  *
  * @param other Synchronization provider to-be.
+ * @param whether to do a full sync or just catch up on recent paxos
  */
-void Monitor::sync_start(entity_inst_t &other)
+void Monitor::sync_start(entity_inst_t &other, bool full)
 {
-  cancel_probe_timeout();
+  dout(10) << __func__ << " " << other << (full ? " full" : " recent") << dendl;
 
-  dout(10) << __func__ << " entity( " << other << " )" << dendl;
-  if ((state == STATE_SYNCHRONIZING) && (sync_role == SYNC_ROLE_REQUESTER)) {
-    dout(1) << __func__ << " already synchronizing; drop it" << dendl;
-    return;
-  }
+  assert(state == STATE_PROBING ||
+        state == STATE_SYNCHRONIZING);
+  state = STATE_SYNCHRONIZING;
 
-  // Looks like we are the acting leader for someone.  Better force them to
-  // abort their endeavours.  After all, if they are trying to sync from us,
-  // it means that we must have a higher paxos version than the one they
-  // have; however, if we are trying to sync as well, it must mean that
-  // someone has a higher version than the one we have.  Everybody wins if
-  // we force them to cancel their sync and try again.
-  if (sync_role & SYNC_ROLE_LEADER) {
-    dout(10) << __func__ << " we are acting as a leader to someone; "
-             << "destroy their dreams" << dendl;
+  // make sure are not a provider for anyone!
+  sync_reset();
 
-    assert(!trim_timeouts.empty());
-    reset_sync();
-  }
+  sync_full = full;
 
-  assert(sync_role == SYNC_ROLE_NONE);
-  assert(sync_state == SYNC_STATE_NONE);
+  if (sync_full) {
+    // mark that we are syncing
+    MonitorDBStore::Transaction t;
 
-  state = STATE_SYNCHRONIZING;
-  sync_role = SYNC_ROLE_REQUESTER;
-  sync_state = SYNC_STATE_START;
+    bufferlist backup_monmap;
+    sync_obtain_latest_monmap(backup_monmap);
+    assert(backup_monmap.length() > 0);
 
-  // First init the store (grab the monmap and all that) and only then
-  // clear the store (except for the mon_sync prefix).  This avoids that
-  // we end up losing the monmaps from the store.
-  sync_store_init();
+    t.put("mon_sync", "latest_monmap", backup_monmap);
+    t.put("mon_sync", "in_sync", 1);
+    store->apply_transaction(t);
 
-  // clear the underlying store, since we are starting a whole
-  // sync process from the bare beginning.
-  set<string> targets = get_sync_targets_names();
-  store->clear(targets);
+    assert(g_conf->mon_sync_requester_kill_at != 1);
 
+    // clear the underlying store
+    set<string> targets = get_sync_targets_names();
+    dout(10) << __func__ << " clearing prefixes " << targets << dendl;
+    store->clear(targets);
 
-  // assume 'other' as the leader. We will update the leader once we receive
-  // a reply to the sync start.
-  entity_inst_t leader = other;
-  entity_inst_t provider = other;
-
-  if (g_conf->mon_sync_debug_leader >= 0) {
-    leader = monmap->get_inst(g_conf->mon_sync_debug_leader);
-    dout(10) << __func__ << " assuming " << leader
-            << " as the leader for debug" << dendl;
+    assert(g_conf->mon_sync_requester_kill_at != 2);
   }
 
-  if (g_conf->mon_sync_debug_provider >= 0) {
-    provider = monmap->get_inst(g_conf->mon_sync_debug_provider);
-    dout(10) << __func__ << " assuming " << provider
-            << " as the provider for debug" << dendl;
-  }
+  // assume 'other' as the leader. We will update the leader once we receive
+  // a reply to the sync start.
+  sync_provider = other;
 
-  sync_leader = get_sync_entity(leader, this);
-  sync_provider = get_sync_entity(provider, this);
+  sync_reset_timeout();
 
-  // this message may bounce through 'other' (if 'other' is not the leader)
-  // in order to reach the leader. Therefore, set a higher timeout to allow
-  // breathing room for the reply message to reach us.
-  sync_leader->set_timeout(new C_SyncStartTimeout(this),
-                          g_conf->mon_sync_trim_timeout*2);
+  MMonSync *m = new MMonSync(sync_full ? MMonSync::OP_GET_COOKIE_FULL : MMonSync::OP_GET_COOKIE_RECENT);
+  if (!sync_full)
+    m->last_committed = paxos->get_version();
+  messenger->send_message(m, sync_provider);
+}
 
-  MMonSync *m = new MMonSync(MMonSync::OP_START);
-  messenger->send_message(m, other);
-  assert(g_conf->mon_sync_requester_kill_at != 1);
+void Monitor::sync_reset_timeout()
+{
+  dout(10) << __func__ << dendl;
+  if (sync_timeout_event)
+    timer.cancel_event(sync_timeout_event);
+  sync_timeout_event = new C_SyncTimeout(this);
+  timer.add_event_after(g_conf->mon_sync_timeout, sync_timeout_event);
 }
 
-void Monitor::sync_start_chunks(SyncEntity provider)
+void Monitor::sync_finish(version_t last_committed)
 {
-  dout(10) << __func__ << " provider(" << provider->entity << ")" << dendl;
+  dout(10) << __func__ << " lc " << last_committed << " from " << sync_provider << dendl;
 
-  assert(sync_role == SYNC_ROLE_REQUESTER);
-  assert(sync_state == SYNC_STATE_START);
+  assert(g_conf->mon_sync_requester_kill_at != 7);
 
-  sync_state = SYNC_STATE_CHUNKS;
+  if (sync_full) {
+    // finalize the paxos commits
+    MonitorDBStore::Transaction tx;
+    paxos->read_and_prepare_transactions(&tx, sync_start_version, last_committed);
+    tx.put(paxos->get_name(), "last_committed", last_committed);
 
-  provider->set_timeout(new C_SyncTimeout(this, provider->entity),
-                       g_conf->mon_sync_timeout);
-  MMonSync *msg = new MMonSync(MMonSync::OP_START_CHUNKS);
-  pair<string,string> last_key = provider->last_received_key;
-  if (!last_key.first.empty() && !last_key.second.empty())
-    msg->last_key = last_key;
+    dout(30) << __func__ << " final tx dump:\n";
+    JSONFormatter f(true);
+    tx.dump(&f);
+    f.flush(*_dout);
+    *_dout << dendl;
 
-  assert(g_conf->mon_sync_requester_kill_at != 4);
-  messenger->send_message(msg, provider->entity);
-  assert(g_conf->mon_sync_requester_kill_at != 5);
-}
+    store->apply_transaction(tx);
+  }
 
-void Monitor::sync_start_reply_timeout()
-{
-  dout(10) << __func__ << dendl;
+  assert(g_conf->mon_sync_requester_kill_at != 8);
 
-  assert(state == STATE_SYNCHRONIZING);
-  assert(sync_role == SYNC_ROLE_REQUESTER);
-  assert(sync_state == SYNC_STATE_START);
-
-  // Restart the sync attempt. It's not as if we were going to lose a vast
-  // amount of work, and if we take into account that we are timing out while
-  // waiting for a reply from the Leader, it sure seems like the right path
-  // to take.
-  sync_requester_abort();
+  MonitorDBStore::Transaction t;
+  t.erase("mon_sync", "in_sync");
+  t.erase("mon_sync", "force_sync");
+  store->apply_transaction(t);
+
+  sync_reset();
+
+  assert(g_conf->mon_sync_requester_kill_at != 9);
+
+  init_paxos();
+
+  assert(g_conf->mon_sync_requester_kill_at != 10);
+
+  bootstrap();
 }
 
-void Monitor::handle_sync_start_reply(MMonSync *m)
+void Monitor::handle_sync(MMonSync *m)
 {
   dout(10) << __func__ << " " << *m << dendl;
+  switch (m->op) {
 
-  entity_inst_t other = m->get_source_inst();
-
-  if ((sync_role != SYNC_ROLE_REQUESTER)
-      || (sync_state != SYNC_STATE_START)) {
-    // If the leader has sent this message before we failed, there is no point
-    // in replying to it, as he has no idea that we actually received it. On
-    // the other hand, if he received one of our stray messages (because it was
-    // delivered once he got back up after failing) and replied accordingly,
-    // there is a chance that he did stopped trimming on our behalf. However,
-    // we have no way to know it, and we really don't want to mess with his
-    // state if that is not the case. Therefore, just drop it and let the
-    // timeouts figure it out. Eventually.
-    dout(1) << __func__ << " stray message -- drop it." << dendl;
-    goto out;
-  }
+    // provider ---------
 
-  assert(state == STATE_SYNCHRONIZING);
-  assert(sync_leader.get() != NULL);
-  assert(sync_provider.get() != NULL);
-
-  // We now know for sure who the leader is.
-  sync_leader->entity = other;
-  sync_leader->cancel_timeout();
-
-  if (m->flags & MMonSync::FLAG_RETRY) {
-    dout(10) << __func__ << " retrying sync at a later time" << dendl;
-    sync_role = SYNC_ROLE_NONE;
-    sync_state = SYNC_STATE_NONE;
-    sync_leader->set_timeout(new C_SyncStartRetry(this, sync_leader->entity),
-                            g_conf->mon_sync_backoff_timeout);
-    goto out;
-  }
+  case MMonSync::OP_GET_COOKIE_FULL:
+  case MMonSync::OP_GET_COOKIE_RECENT:
+    handle_sync_get_cookie(m);
+    break;
+  case MMonSync::OP_GET_CHUNK:
+    handle_sync_get_chunk(m);
+    break;
 
-  if (m->flags & MMonSync::FLAG_REPLY_TO) {
-    dout(10) << __func__ << " leader told us to use " << m->reply_to
-             << " as sync provider" << dendl;
-    sync_provider->entity = m->reply_to;
-  } else {
-    dout(10) << __func__ << " synchronizing from leader at " << other << dendl;
-    sync_provider->entity = other;
-  }
+    // client -----------
 
-  sync_leader->set_timeout(new C_HeartbeatTimeout(this),
-                          g_conf->mon_sync_heartbeat_timeout);
+  case MMonSync::OP_COOKIE:
+    handle_sync_cookie(m);
+    break;
 
-  assert(g_conf->mon_sync_requester_kill_at != 2);
-  sync_send_heartbeat(sync_leader->entity);
-  assert(g_conf->mon_sync_requester_kill_at != 3);
+  case MMonSync::OP_CHUNK:
+  case MMonSync::OP_LAST_CHUNK:
+    handle_sync_chunk(m);
+    break;
+  case MMonSync::OP_NO_COOKIE:
+    handle_sync_no_cookie(m);
+    break;
 
-  sync_start_chunks(sync_provider);
-out:
+  default:
+    dout(0) << __func__ << " unknown op " << m->op << dendl;
+    assert(0 == "unknown op");
+  }
   m->put();
 }
 
-void Monitor::handle_sync_heartbeat_reply(MMonSync *m)
+// leader
+
+void Monitor::_sync_reply_no_cookie(MMonSync *m)
 {
-  dout(10) << __func__ << " " << *m << dendl;
+  MMonSync *reply = new MMonSync(MMonSync::OP_NO_COOKIE, m->cookie);
+  messenger->send_message(reply, m->get_connection());
+}
 
-  entity_inst_t other = m->get_source_inst();
-  if ((sync_role != SYNC_ROLE_REQUESTER)
-      || (sync_state == SYNC_STATE_NONE)
-      || (sync_leader.get() == NULL)
-      || (other != sync_leader->entity)) {
-    dout(1) << __func__ << " stray message -- drop it." << dendl;
-    m->put();
+void Monitor::handle_sync_get_cookie(MMonSync *m)
+{
+  if (is_synchronizing()) {
+    _sync_reply_no_cookie(m);
     return;
   }
 
-  assert(state == STATE_SYNCHRONIZING);
-  assert(sync_role == SYNC_ROLE_REQUESTER);
-  assert(sync_state != SYNC_STATE_NONE);
+  assert(g_conf->mon_sync_provider_kill_at != 1);
 
-  assert(sync_leader.get() != NULL);
-  assert(sync_leader->entity == other);
+  // make up a unique cookie.  include election epoch (which persists
+  // across restarts for the whole cluster) and a counter for this
+  // process instance.  there is no need to be unique *across*
+  // monitors, though.
+  uint64_t cookie = ((unsigned long long)elector.get_epoch() << 24) + ++sync_provider_count;
+  assert(sync_providers.count(cookie) == 0);
 
-  sync_leader->cancel_timeout();
-  sync_leader->set_timeout(new C_HeartbeatInterval(this, sync_leader->entity),
-                          g_conf->mon_sync_heartbeat_interval);
-  m->put();
+  dout(10) << __func__ << " cookie " << cookie << " for " << m->get_source_inst() << dendl;
+
+  SyncProvider& sp = sync_providers[cookie];
+  sp.cookie = cookie;
+  sp.entity = m->get_source_inst();
+  sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);
+
+  set<string> sync_targets;
+  if (m->op == MMonSync::OP_GET_COOKIE_FULL) {
+    // full scan
+    sync_targets = get_sync_targets_names();
+    sp.last_committed = paxos->get_version();
+    sp.synchronizer = store->get_synchronizer(sp.last_key, sync_targets);
+    sp.full = true;
+    dout(10) << __func__ << " will sync prefixes " << sync_targets << dendl;
+  } else {
+    // just catch up paxos
+    sp.last_committed = m->last_committed;
+  }
+  dout(10) << __func__ << " will sync from version " << sp.last_committed << dendl;
+
+  MMonSync *reply = new MMonSync(MMonSync::OP_COOKIE, sp.cookie);
+  reply->last_committed = sp.last_committed;
+  messenger->send_message(reply, m->get_connection());
 }
 
-void Monitor::handle_sync_chunk(MMonSync *m)
+void Monitor::handle_sync_get_chunk(MMonSync *m)
 {
   dout(10) << __func__ << " " << *m << dendl;
 
-  entity_inst_t other = m->get_source_inst();
-
-  if ((sync_role != SYNC_ROLE_REQUESTER)
-      || (sync_state != SYNC_STATE_CHUNKS)
-      || (sync_provider.get() == NULL)
-      || (other != sync_provider->entity)) {
-    dout(1) << __func__ << " stray message -- drop it." << dendl;
-    m->put();
+  if (sync_providers.count(m->cookie) == 0) {
+    dout(10) << __func__ << " no cookie " << m->cookie << dendl;
+    _sync_reply_no_cookie(m);
     return;
   }
 
-  assert(state == STATE_SYNCHRONIZING);
-  assert(sync_role == SYNC_ROLE_REQUESTER);
-  assert(sync_state == SYNC_STATE_CHUNKS);
-
-  assert(sync_leader.get() != NULL);
+  assert(g_conf->mon_sync_provider_kill_at != 2);
 
-  assert(sync_provider.get() != NULL);
-  assert(other == sync_provider->entity);
+  SyncProvider& sp = sync_providers[m->cookie];
+  sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);
 
-  sync_provider->cancel_timeout();
+  if (sp.last_committed < paxos->get_first_committed()) {
+    dout(10) << __func__ << " sync requester fell behind paxos, their lc " << sp.last_committed
+            << " < our fc " << paxos->get_first_committed() << dendl;
+    sync_providers.erase(m->cookie);
+    _sync_reply_no_cookie(m);
+    return;
+  }
 
+  MMonSync *reply = new MMonSync(MMonSync::OP_CHUNK, sp.cookie);
   MonitorDBStore::Transaction tx;
-  tx.append_from_encoded(m->chunk_bl);
-
-  sync_provider->set_timeout(new C_SyncTimeout(this, sync_provider->entity),
-                            g_conf->mon_sync_timeout);
-  sync_provider->last_received_key = m->last_key;
 
-  MMonSync *msg = new MMonSync(MMonSync::OP_CHUNK_REPLY);
-
-  bool stop = false;
-  if (m->flags & MMonSync::FLAG_LAST) {
-    msg->flags |= MMonSync::FLAG_LAST;
-    assert(m->version > 0);
-    tx.put(paxos->get_name(), "last_committed", m->version);
-    stop = true;
+  int left = g_conf->mon_sync_max_payload_size;
+  while (sp.last_committed < paxos->get_version() && left > 0) {
+    bufferlist bl;
+    sp.last_committed++;
+    store->get(paxos->get_name(), sp.last_committed, bl);
+    tx.put(paxos->get_name(), sp.last_committed, bl);
+    left -= bl.length();
+    dout(20) << __func__ << " including paxos state " << sp.last_committed << dendl;
   }
-  assert(g_conf->mon_sync_requester_kill_at != 8);
-  messenger->send_message(msg, sync_provider->entity);
+  reply->last_committed = sp.last_committed;
 
-  store->apply_transaction(tx);
+  if (sp.full && left > 0) {
+    sp.synchronizer->get_chunk_tx(tx, left);
+    sp.last_key = sp.synchronizer->get_last_key();
+    reply->last_key = sp.last_key;
+  }
 
-  if (g_conf->mon_sync_debug && (m->flags & MMonSync::FLAG_CRC)) {
-    dout(10) << __func__ << " checking CRC" << dendl;
-    MonitorDBStore::Synchronizer sync;
-    if (m->flags & MMonSync::FLAG_LAST) {
-      dout(10) << __func__ << " checking CRC only for Paxos" << dendl;
-      string paxos_name("paxos");
-      sync = store->get_synchronizer(paxos_name);
-    } else {
-      dout(10) << __func__ << " checking CRC for all prefixes" << dendl;
-      set<string> prefixes = get_sync_targets_names();
-      pair<string,string> empty_key;
-      sync = store->get_synchronizer(empty_key, prefixes);
-    }
+  if ((sp.full && sp.synchronizer->has_next_chunk()) ||
+      sp.last_committed < paxos->get_version()) {
+    dout(10) << __func__ << " chunk, through version " << sp.last_committed << " key " << sp.last_key << dendl;
+  } else {
+    dout(10) << __func__ << " last chunk, through version " << sp.last_committed << " key " << sp.last_key << dendl;
+    reply->op = MMonSync::OP_LAST_CHUNK;
 
-    while (sync->has_next_chunk()) {
-      bufferlist bl;
-      sync->get_chunk(bl);
-    }
-    __u32 got_crc = sync->crc();
-    dout(10) << __func__ << " expected crc " << m->crc
-            << " got " << got_crc << dendl;
+    assert(g_conf->mon_sync_provider_kill_at != 3);
 
-    assert(m->crc == got_crc);
-    dout(10) << __func__ << " CRC matches" << dendl;
+    // clean up our local state
+    sync_providers.erase(sp.cookie);
   }
 
-  m->put();
-  if (stop)
-    sync_stop();
-}
-
-void Monitor::sync_stop()
-{
-  dout(10) << __func__ << dendl;
+  ::encode(tx, reply->chunk_bl);
 
-  assert(sync_role == SYNC_ROLE_REQUESTER);
-  assert(sync_state == SYNC_STATE_CHUNKS);
-
-  sync_state = SYNC_STATE_STOP;
+  messenger->send_message(reply, m->get_connection());
+}
 
-  sync_leader->cancel_timeout();
-  sync_provider->cancel_timeout();
-  sync_provider.reset();
+// requester
 
-  entity_inst_t leader = sync_leader->entity;
+void Monitor::handle_sync_cookie(MMonSync *m)
+{
+  dout(10) << __func__ << " " << *m << dendl;
+  if (sync_cookie) {
+    dout(10) << __func__ << " already have a cookie, ignoring" << dendl;
+    return;
+  }
+  if (m->get_source_inst() != sync_provider) {
+    dout(10) << __func__ << " source does not match, discarding" << dendl;
+    return;
+  }
+  sync_cookie = m->cookie;
+  sync_start_version = m->last_committed;
 
-  sync_leader->set_timeout(new C_SyncFinishReplyTimeout(this),
-                          g_conf->mon_sync_timeout);
+  sync_reset_timeout();
+  sync_get_next_chunk();
 
-  MMonSync *msg = new MMonSync(MMonSync::OP_FINISH);
-  assert(g_conf->mon_sync_requester_kill_at != 9);
-  messenger->send_message(msg, leader);
-  assert(g_conf->mon_sync_requester_kill_at != 10);
+  assert(g_conf->mon_sync_requester_kill_at != 3);
 }
 
-void Monitor::sync_finish_reply_timeout()
+void Monitor::sync_get_next_chunk()
 {
-  dout(10) << __func__ << dendl;
-  assert(state == STATE_SYNCHRONIZING);
-  assert(sync_leader.get() != NULL);
-  assert(sync_role == SYNC_ROLE_REQUESTER);
-  assert(sync_state == SYNC_STATE_STOP);
+  dout(20) << __func__ << " cookie " << sync_cookie << " provider " << sync_provider << dendl;
+  if (g_conf->mon_inject_sync_get_chunk_delay > 0) {
+    dout(20) << __func__ << " injecting delay of " << g_conf->mon_inject_sync_get_chunk_delay << dendl;
+    usleep((long long)(g_conf->mon_inject_sync_get_chunk_delay * 1000000.0));
+  }
+  MMonSync *r = new MMonSync(MMonSync::OP_GET_CHUNK, sync_cookie);
+  messenger->send_message(r, sync_provider);
 
-  sync_requester_abort();
+  assert(g_conf->mon_sync_requester_kill_at != 4);
 }
 
-void Monitor::handle_sync_finish_reply(MMonSync *m)
+void Monitor::handle_sync_chunk(MMonSync *m)
 {
   dout(10) << __func__ << " " << *m << dendl;
-  entity_inst_t other = m->get_source_inst();
 
-  if ((sync_role != SYNC_ROLE_REQUESTER)
-      || (sync_state != SYNC_STATE_STOP)
-      || (sync_leader.get() == NULL)
-      || (sync_leader->entity != other)) {
-    dout(1) << __func__ << " stray message -- drop it." << dendl;
-    m->put();
+  if (m->cookie != sync_cookie) {
+    dout(10) << __func__ << " cookie does not match, discarding" << dendl;
+    return;
+  }
+  if (m->get_source_inst() != sync_provider) {
+    dout(10) << __func__ << " source does not match, discarding" << dendl;
     return;
   }
 
-  assert(sync_role == SYNC_ROLE_REQUESTER);
-  assert(sync_state == SYNC_STATE_STOP);
+  assert(state == STATE_SYNCHRONIZING);
+  assert(g_conf->mon_sync_requester_kill_at != 5);
 
-  assert(sync_leader.get() != NULL);
-  assert(sync_leader->entity == other);
+  MonitorDBStore::Transaction tx;
+  tx.append_from_encoded(m->chunk_bl);
 
-  sync_role = SYNC_ROLE_NONE;
-  sync_state = SYNC_STATE_NONE;
+  dout(30) << __func__ << " tx dump:\n";
+  JSONFormatter f(true);
+  tx.dump(&f);
+  f.flush(*_dout);
+  *_dout << dendl;
 
-  sync_leader->cancel_timeout();
-  sync_leader.reset();
+  store->apply_transaction(tx);
 
-  paxos->reapply_all_versions();
+  assert(g_conf->mon_sync_requester_kill_at != 6);
 
-  sync_store_cleanup();
+  if (!sync_full) {
+    dout(10) << __func__ << " applying recent paxos transactions as we go" << dendl;
+    MonitorDBStore::Transaction tx;
+    paxos->read_and_prepare_transactions(&tx, paxos->get_version(), m->last_committed);
+    tx.put(paxos->get_name(), "last_committed", m->last_committed);
 
-  init_paxos();
+    dout(30) << __func__ << " tx dump:\n";
+    JSONFormatter f(true);
+    tx.dump(&f);
+    f.flush(*_dout);
+    *_dout << dendl;
 
-  assert(g_conf->mon_sync_requester_kill_at != 11);
+    store->apply_transaction(tx);
+    paxos->init();  // to refresh what we just wrote
+  }
 
-  m->put();
+  if (m->op == MMonSync::OP_CHUNK) {
+    sync_reset_timeout();
+    sync_get_next_chunk();
+  } else if (m->op == MMonSync::OP_LAST_CHUNK) {
+    sync_finish(m->last_committed);
+  }
+}
 
+void Monitor::handle_sync_no_cookie(MMonSync *m)
+{
+  dout(10) << __func__ << dendl;
+  sync_reset();
   bootstrap();
 }
 
-void Monitor::handle_sync_abort(MMonSync *m)
+void Monitor::sync_trim_providers()
 {
-  dout(10) << __func__ << " " << *m << dendl;
-  /* This function's responsabilities are manifold, and they depend on
-   * who we (the monitor) are and what is our role in the sync.
-   *
-   * If we are the sync requester (i.e., if we are synchronizing), it
-   * means that we *must* abort the current sync and bootstrap. This may
-   * be required if there was a leader change and we are talking to the
-   * wrong leader, which makes continuing with the current sync way too
-   * risky, given that a Paxos trim may be underway and we certainly incur
-   * in the chance of ending up with an inconsistent store state.
-   *
-   * If we are the sync provider, it means that the requester wants to
-   * abort his sync, either because he lost connectivity to the leader
-   * (i.e., his heartbeat timeout was triggered) or he became aware of a
-   * leader change.
-   *
-   * As a leader, we should never receive such a message though, unless we
-   * have just won an election, in which case we should have been a sync
-   * provider before. In such a case, we should behave as if we were a sync
-   * provider and clean up the requester's state.
-   */
-  entity_inst_t other = m->get_source_inst();
-
-  if ((sync_role == SYNC_ROLE_REQUESTER)
-      && (sync_leader.get() != NULL)
-      && (sync_leader->entity == other)) {
-
-    sync_requester_abort();
-  } else if ((sync_role & SYNC_ROLE_PROVIDER)
-            && (sync_entities.count(other) > 0)
-            && (sync_entities_states[other] == SYNC_STATE_START)) {
+  dout(20) << __func__ << dendl;
 
-    sync_provider_cleanup(other);
-  } else {
-    dout(1) << __func__ << " stray message -- drop it." << dendl;
+  utime_t now = ceph_clock_now(g_ceph_context);
+  map<uint64_t,SyncProvider>::iterator p = sync_providers.begin();
+  while (p != sync_providers.end()) {
+    if (now > p->second.timeout) {
+      dout(10) << __func__ << " expiring cookie " << p->second.cookie << " for " << p->second.entity << dendl;
+      sync_providers.erase(p++);
+    } else {
+      ++p;
+    }
   }
-  m->put();
 }
 
-void Monitor::handle_sync(MMonSync *m)
-{
-  dout(10) << __func__ << " " << *m << dendl;
-  switch (m->op) {
-  case MMonSync::OP_START:
-    handle_sync_start(m);
-    break;
-  case MMonSync::OP_START_REPLY:
-    handle_sync_start_reply(m);
-    break;
-  case MMonSync::OP_HEARTBEAT:
-    handle_sync_heartbeat(m);
-    break;
-  case MMonSync::OP_HEARTBEAT_REPLY:
-    handle_sync_heartbeat_reply(m);
-    break;
-  case MMonSync::OP_FINISH:
-    handle_sync_finish(m);
-    break;
-  case MMonSync::OP_START_CHUNKS:
-    handle_sync_start_chunks(m);
-    break;
-  case MMonSync::OP_CHUNK:
-    handle_sync_chunk(m);
-    break;
-  case MMonSync::OP_CHUNK_REPLY:
-    handle_sync_chunk_reply(m);
-    break;
-  case MMonSync::OP_FINISH_REPLY:
-    handle_sync_finish_reply(m);
-    break;
-  case MMonSync::OP_ABORT:
-    handle_sync_abort(m);
-    break;
-  default:
-    dout(0) << __func__ << " unknown op " << m->op << dendl;
-    m->put();
-    assert(0 == "unknown op");
-    break;
-  }
-}
+// ---------------------------------------------------
+// probe
 
 void Monitor::cancel_probe_timeout()
 {
@@ -2001,27 +1320,40 @@ void Monitor::handle_probe_reply(MMonProbe *m)
   assert(paxos != NULL);
 
   if (is_synchronizing()) {
-    dout(10) << " we are currently synchronizing, so that will continue."
-             << dendl;
+    dout(10) << " currently syncing" << dendl;
     m->put();
     return;
   }
 
   entity_inst_t other = m->get_source_inst();
+
+  if (paxos->get_version() < m->paxos_first_version &&
+      m->paxos_first_version > 1) {  // no need to sync if we're 0 and they start at 1.
+    dout(10) << " peer paxos versions [" << m->paxos_first_version
+            << "," << m->paxos_last_version << "]"
+            << " vs my version " << paxos->get_version()
+            << " (too far ahead)"
+            << dendl;
+    cancel_probe_timeout();
+    sync_start(other, true);
+    m->put();
+    return;
+  }
+  if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) {
+    dout(10) << " peer paxos version " << m->paxos_last_version
+            << " vs my version " << paxos->get_version()
+            << " (too far ahead)"
+            << dendl;
+    cancel_probe_timeout();
+    sync_start(other, false);
+    m->put();
+    return;
+  }
+
   // is there an existing quorum?
   if (m->quorum.size()) {
     dout(10) << " existing quorum " << m->quorum << dendl;
 
-    if (paxos->get_version() < m->paxos_first_version) {
-      dout(10) << " peer paxos versions [" << m->paxos_first_version
-               << "," << m->paxos_last_version << "]"
-              << " vs my version " << paxos->get_version()
-              << " (too far ahead)"
-              << dendl;
-      sync_start(other);
-      m->put();
-      return;
-    }
     dout(10) << " peer paxos version " << m->paxos_last_version
              << " vs my version " << paxos->get_version()
              << " (ok)"
@@ -2046,16 +1378,6 @@ void Monitor::handle_probe_reply(MMonProbe *m)
       return;
     }
 
-    if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) {
-      dout(10) << " peer paxos version " << m->paxos_last_version
-              << " vs my version " << paxos->get_version()
-              << " (too far ahead)"
-              << dendl;
-      sync_start(other);
-      m->put();
-      return;
-    }
-
     unsigned need = monmap->size() / 2 + 1;
     dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
     if (outside_quorum.size() >= need) {
@@ -2066,7 +1388,7 @@ void Monitor::handle_probe_reply(MMonProbe *m)
         dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl;
       }
     } else {
-       dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
+      dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
     }
   }
   m->put();
@@ -2149,19 +1471,6 @@ void Monitor::lose_election(epoch_t epoch, set<int> &q, int l, uint64_t features
   dout(10) << "lose_election, epoch " << epoch << " leader is mon" << leader
           << " quorum is " << quorum << " features are " << quorum_features << dendl;
 
-  // let everyone currently syncing know that we are no longer the leader and
-  // that they should all abort their on-going syncs
-  for (map<entity_inst_t,Context*>::iterator iter = trim_timeouts.begin();
-       iter != trim_timeouts.end();
-       ++iter) {
-    timer.cancel_event((*iter).second);
-    entity_inst_t entity = (*iter).first;
-    MMonSync *msg = new MMonSync(MMonSync::OP_ABORT);
-    messenger->send_message(msg, entity);
-  }
-  trim_timeouts.clear();
-  sync_role &= ~SYNC_ROLE_LEADER;
-  
   paxos->peon_init();
   for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
     (*p)->election_finished();
@@ -2216,79 +1525,7 @@ bool Monitor::_allowed_command(MonSession *s, map<string, cmd_vartype>& cmd)
   return retval;
 }
 
-void Monitor::_sync_status(ostream& ss)
-{
-  JSONFormatter jf(true);
-  jf.open_object_section("sync_status");
-  jf.dump_string("state", get_state_name());
-  jf.dump_unsigned("paxos_version", paxos->get_version());
-
-  if (is_leader() || (sync_role == SYNC_ROLE_LEADER)) {
-    Mutex::Locker l(trim_lock);
-    jf.open_object_section("trim");
-    jf.dump_int("disabled", paxos->is_trim_disabled());
-    jf.dump_int("should_trim", paxos->should_trim());
-    if (!trim_timeouts.empty()) {
-      jf.open_array_section("mons");
-      for (map<entity_inst_t,Context*>::iterator it = trim_timeouts.begin();
-          it != trim_timeouts.end();
-          ++it) {
-       entity_inst_t e = (*it).first;
-       jf.dump_stream("mon") << e;
-       int s = -1;
-       if (trim_entities_states.count(e))
-         s = trim_entities_states[e];
-       jf.dump_stream("sync_state") << get_sync_state_name(s);
-      }
-    }
-    jf.close_section();
-  }
-
-  if (!sync_entities.empty() || (sync_role == SYNC_ROLE_PROVIDER)) {
-    jf.open_array_section("on_going");
-    for (map<entity_inst_t,SyncEntity>::iterator it = sync_entities.begin();
-        it != sync_entities.end();
-        ++it) {
-      entity_inst_t e = (*it).first;
-      jf.open_object_section("mon");
-      jf.dump_stream("addr") << e;
-      jf.dump_string("state", (*it).second->get_state());
-      int s = -1;
-      if (sync_entities_states.count(e))
-         s = sync_entities_states[e];
-      jf.dump_stream("sync_state") << get_sync_state_name(s);
-
-      jf.close_section();
-    }
-    jf.close_section();
-  }
-
-  if (is_synchronizing() || (sync_role == SYNC_ROLE_REQUESTER)) {
-    jf.open_object_section("leader");
-    SyncEntity sync_entity = sync_leader;
-    if (sync_entity.get() != NULL)
-      jf.dump_stream("addr") << sync_entity->entity;
-    jf.close_section();
-
-    jf.open_object_section("provider");
-    sync_entity = sync_provider;
-    if (sync_entity.get() != NULL)
-      jf.dump_stream("addr") << sync_entity->entity;
-    jf.close_section();
-  }
-
-  if (g_conf->mon_sync_leader_kill_at > 0)
-    jf.dump_int("leader_kill_at", g_conf->mon_sync_leader_kill_at);
-  if (g_conf->mon_sync_provider_kill_at > 0)
-    jf.dump_int("provider_kill_at", g_conf->mon_sync_provider_kill_at);
-  if (g_conf->mon_sync_requester_kill_at > 0)
-    jf.dump_int("requester_kill_at", g_conf->mon_sync_requester_kill_at);
-
-  jf.close_section();
-  jf.flush(ss);
-}
-
-void Monitor::_sync_force(ostream& ss)
+void Monitor::sync_force(ostream& ss)
 {
   MonitorDBStore::Transaction tx;
   tx.put("mon_sync", "force_sync", 1);
@@ -2347,17 +1584,31 @@ void Monitor::_mon_status(ostream& ss)
     jf.dump_stream("peer") << *p;
   jf.close_section();
 
+  jf.open_array_section("sync_provider");
+  for (map<uint64_t,SyncProvider>::const_iterator p = sync_providers.begin();
+       p != sync_providers.end();
+       ++p) {
+    jf.dump_unsigned("cookie", p->second.cookie);
+    jf.dump_stream("entity") << p->second.entity;
+    jf.dump_stream("timeout") << p->second.timeout;
+    jf.dump_unsigned("last_committed", p->second.last_committed);
+    jf.dump_stream("last_key") << p->second.last_key;
+  }
+  jf.close_section();
+
   if (is_synchronizing()) {
-    if (sync_leader)
-      jf.dump_stream("sync_leader") << sync_leader->entity;
-    else
-      jf.dump_string("sync_leader", "");
-    if (sync_provider)
-      jf.dump_stream("sync_provider") << sync_provider->entity;
-    else
-      jf.dump_string("sync_provider", "");
+    jf.open_object_section("sync");
+    jf.dump_stream("sync_provider") << sync_provider;
+    jf.dump_unsigned("sync_cookie", sync_cookie);
+    jf.dump_unsigned("sync_start_version", sync_start_version);
+    jf.close_section();
   }
 
+  if (g_conf->mon_sync_provider_kill_at > 0)
+    jf.dump_int("provider_kill_at", g_conf->mon_sync_provider_kill_at);
+  if (g_conf->mon_sync_requester_kill_at > 0)
+    jf.dump_int("requester_kill_at", g_conf->mon_sync_requester_kill_at);
+
   jf.open_object_section("monmap");
   monmap->dump(&jf);
   jf.close_section();
@@ -2843,16 +2094,6 @@ void Monitor::handle_command(MMonCommand *m)
     rdata.append(ds);
     rs = "";
     r = 0;
-  } else if (prefix == "sync status") {
-      if (!access_r) {
-       r = -EACCES;
-       rs = "access denied";
-       goto out;
-      }
-      _sync_status(ds);
-      rdata.append(ds);
-      rs = "";
-      r = 0;
   } else if (prefix == "sync force") {
     string validate1, validate2;
     cmd_getval(g_ceph_context, cmdmap, "validate1", validate1);
@@ -2865,7 +2106,7 @@ void Monitor::handle_command(MMonCommand *m)
           "--i-know-what-i-am-doing' if you really do.";
       goto out;
     }
-    _sync_force(ds);
+    sync_force(ds);
     rs = ds.str();
     r = 0;
   } else if (prefix == "heap") {
@@ -4168,6 +3409,8 @@ void Monitor::tick()
     }
   }
 
+  sync_trim_providers();
+
   if (!maybe_wait_for_quorum.empty()) {
     finish_contexts(g_ceph_context, maybe_wait_for_quorum);
   }
index 004292b97780e16b07a55c04a33bfe6b32291807..56d634276a8b8242f1dfaf30e321599b31e9b1ee 100644 (file)
@@ -171,11 +171,8 @@ public:
     default: return "???";
     }
   }
-  const string get_state_name() const {
-    string sn(get_state_name(state));
-    string sync_name(get_sync_state_name());
-    sn.append(sync_name);
-    return sn;
+  const char *get_state_name() const {
+    return get_state_name(state);
   }
 
   bool is_shutdown() const { return state == STATE_SHUTDOWN; }
@@ -224,878 +221,143 @@ private:
    * @{
    */
   /**
-   * Obtain the synchronization target prefixes in set form.
-   *
-   * We consider a target prefix all those that are relevant when
-   * synchronizing two stores. That is, all those that hold paxos service's
-   * versions, as well as paxos versions, or any control keys such as the
-   * first or last committed version.
-   *
-   * Given the current design, this function should return the name of all and
-   * any available paxos service, plus the paxos name.
-   *
-   * @returns a set of strings referring to the prefixes being synchronized
+   * @} // provider state
    */
-  set<string> get_sync_targets_names();
-  /**
-   * Handle a sync-related message
-   *
-   * This function will call the appropriate handling functions for each
-   * operation type.
-   *
-   * @param m A sync-related message (i.e., of type MMonSync)
-   */
-  void handle_sync(MMonSync *m);
-  /**
-   * Handle a sync-related message of operation type OP_ABORT.
-   *
-   * @param m A sync-related message of type OP_ABORT
-   */
-  void handle_sync_abort(MMonSync *m);
-  /**
-   * Reset the monitor's sync-related data structures and state.
-   */
-  void reset_sync(bool abort = false);
+  struct SyncProvider {
+    entity_inst_t entity;  ///< who
+    uint64_t cookie;       ///< unique cookie for this sync attempt
+    utime_t timeout;       ///< when we give up and expire this attempt
+    version_t last_committed; ///< last paxos version on peer
+    pair<string,string> last_key; ///< last key sent to (or on) peer
+    bool full;             ///< full scan?
+    MonitorDBStore::Synchronizer synchronizer;   ///< iterator
 
-  /**
-   * @defgroup Synchronization_Roles
-   * @{
-   */
-  /**
-   * The monitor has no role in any on-going synchronization.
-   */
-  static const uint8_t SYNC_ROLE_NONE      = 0x0;
-  /**
-   * The monitor is the Leader in at least one synchronization.
-   */
-  static const uint8_t SYNC_ROLE_LEADER            = 0x1;
-  /**
-   * The monitor is the Provider in at least one synchronization.
-   */
-  static const uint8_t SYNC_ROLE_PROVIDER   = 0x2;
-  /**
-   * The monitor is a requester in the on-going synchronization.
-   */
-  static const uint8_t SYNC_ROLE_REQUESTER  = 0x4;
+    SyncProvider() : cookie(0), last_committed(0), full(false) {}
 
-  /**
-   * The monitor's current role in on-going synchronizations, if any.
-   *
-   * A monitor can either be part of no synchronization at all, in which case
-   * @p sync_role shall hold the value @p SYNC_ROLE_NONE, or it can be part of
-   * an on-going synchronization, in which case it may be playing either one or
-   * two roles at the same time:
-   *
-   *  - If the monitor is the sync requester (i.e., be the one synchronizing
-   *    against some other monitor), the @p sync_role field will hold only the
-   *    @p SYNC_ROLE_REQUESTER value.
-   *  - Otherwise, the monitor can be either a sync leader, or a sync provider,
-   *    or both, in which case @p sync_role will hold a binary OR of both
-   *    @p SYNC_ROLE_LEADER and @p SYNC_ROLE_PROVIDER.
-   */
-  uint8_t sync_role;
-  /**
-   * @}
-   */
-  /**
-   * @defgroup Leader-specific
-   * @{
-   */
-  /**
-   * Guarantee mutual exclusion access to the @p trim_timeouts map.
-   *
-   * We need this mutex specially when we have a monitor starting a sync with
-   * the leader and another one finishing or aborting an on-going sync, that
-   * happens to be the last on-going trim on the map. Given that we will
-   * enable the Paxos trim once we deplete the @p trim_timeouts map, we must
-   * then ensure that we either add the new sync start to the map before
-   * removing the one just finishing, or that we remove the finishing one
-   * first and enable the trim before we add the new one. If we fail to do
-   * this, nasty repercussions could follow.
-   */
-  Mutex trim_lock;
-  /**
-   * Map holding all on-going syncs' timeouts.
-   *
-   * An on-going sync leads to the Paxos trim to be suspended, and this map
-   * will associate entities to the timeouts to be triggered if the monitor
-   * being synchronized fails to check-in with the leader, letting him know
-   * that the sync is still in effect and that in no circumstances should the
-   * Paxos trim be enabled.
-   */
-  map<entity_inst_t, Context*> trim_timeouts;
-  map<entity_inst_t, uint8_t> trim_entities_states;
-  /**
-   * Map associating monitors to a sync state.
-   *
-   * This map is used by both the Leader and the Sync Provider, and has the
-   * sole objective of keeping track of the state each monitor's sync process
-   * is in.
-   */
-  map<entity_inst_t, uint8_t> sync_entities_states;
-  /**
-   * Timer that will enable the Paxos trim.
-   *
-   * This timer is set after the @p trim_timeouts map is depleted, and once
-   * fired it will enable the Paxos trim (if still disabled). By setting
-   * this timer, we avoid a scenario in which a monitor has just finished
-   * synchronizing, but because the Paxos trim had been disabled for a long,
-   * long time and a lot of trims were proposed in the timespan of the monitor
-   * finishing its sync and actually joining the cluster, the monitor happens
-   * to be out-of-sync yet again. Backing off enabling the Paxos trim will
-   * allow the other monitor to join the cluster before actually trimming.
-   */
-  Context *trim_enable_timer;
-
-  /**
-   * Callback class responsible for finishing a monitor's sync session on the
-   * leader's side, because the said monitor failed to acknowledge its
-   * liveliness in a timely manner, thus being assumed as failed.
-   */
-  struct C_TrimTimeout : public Context {
-    Monitor *mon;
-    entity_inst_t entity;
-
-    C_TrimTimeout(Monitor *m, entity_inst_t& entity)
-      : mon(m), entity(entity) { }
-    void finish(int r) {
-      mon->sync_finish(entity);
+    void reset_timeout(CephContext *cct, int grace) {
+      timeout = ceph_clock_now(cct);
+      timeout += grace;
     }
   };
 
-  /**
-   * Callback class responsible for enabling the Paxos trim if there are no
-   * more on-going syncs.
-   */
-  struct C_TrimEnable : public Context {
-    Monitor *mon;
-
-    C_TrimEnable(Monitor *m) : mon(m) { }
-    void finish(int r) {
-      Mutex::Locker(mon->trim_lock);
-      // even if we are no longer the leader, we should re-enable trim if
-      // we have disabled it in the past. It doesn't mean we are going to
-      // do anything about it, but if we happen to become the leader
-      // sometime down the future, we sure want to have the trim enabled.
-      if (mon->trim_timeouts.empty())
-       mon->paxos->trim_enable();
-      mon->trim_enable_timer = NULL;
-    }
-  };
-
-  void sync_obtain_latest_monmap(bufferlist &bl);
-  void sync_store_init();
-  void sync_store_cleanup();
-  bool is_sync_on_going();
+  map<uint64_t, SyncProvider> sync_providers;  ///< cookie -> SyncProvider for those syncing from us
+  uint64_t sync_provider_count;   ///< counter for issued cookies to keep them unique
 
   /**
-   * Send a heartbeat message to another entity.
-   *
-   * The sent message may be a heartbeat reply if the @p reply parameter is
-   * set to true.
-   *
-   * This function is used both by the leader (always with @p reply = true),
-   * and by the sync requester (always with @p reply = false).
-   *
-   * @param other The target monitor's entity instance.
-   * @param reply Whether the message to be sent should be a heartbeat reply.
-   */
-  void sync_send_heartbeat(entity_inst_t &other, bool reply = false);
-  /**
-   * Handle a Sync Start request.
-   *
-   * Monitors wanting to synchronize with the cluster will have to first ask
-   * the leader to do so. The only objective with this is so that the we can
-   * gurantee that the leader won't trim the paxos state.
-   *
-   * The leader may not be the only one receiving this request. A sync provider
-   * may also receive it when it is taken as the point of entry onto the
-   * cluster. In this scenario, the provider must then forward this request to
-   * the leader, if he know of one, or assume himself as the leader for this
-   * sync purpose (this may happen if there is no formed quorum).
-   *
-   * @param m Sync message with operation type MMonSync::OP_START
-   */
-  void handle_sync_start(MMonSync *m);
-  /**
-   * Handle a Heartbeat sent by a sync requester.
-   *
-   * We use heartbeats as a way to guarantee that both the leader and the sync
-   * requester are still alive. Receiving this message means that the requester
-   * if still working on getting his store synchronized.
-   *
-   * @param m Sync message with operation type MMonSync::OP_HEARTBEAT
-   */
-  void handle_sync_heartbeat(MMonSync *m);
-  /**
-   * Handle a Sync Finish.
-   *
-   * A MMonSync::OP_FINISH is the way the sync requester has to inform the
-   * leader that he finished synchronizing his store.
-   *
-   * @param m Sync message with operation type MMonSync::OP_FINISH
+   * @} // requester state
    */
-  void handle_sync_finish(MMonSync *m);
-  /**
-   * Finish a given monitor's sync process on the leader's side.
-   *
-   * This means cleaning up the state referring to the monitor whose sync has
-   * finished (may it have been finished successfully, by receiving a message
-   * with type MMonSync::OP_FINISH, or due to the assumption that the said
-   * monitor failed).
-   *
-   * If we happen to know of no other monitor synchronizing, we may then enable
-   * the paxos trim.
-   *
-   * @param entity Entity instance of the monitor whose sync we are considering
-   *              as finished.
-   * @param abort If true, we consider this sync has finished due to an abort.
-   */
-  void sync_finish(entity_inst_t &entity, bool abort = false);
-  /**
-   * Abort a given monitor's sync process on the leader's side.
-   *
-   * This function is a wrapper for Monitor::sync_finish().
-   *
-   * @param entity Entity instance of the monitor whose sync we are aborting.
-   */
-  void sync_finish_abort(entity_inst_t &entity) {
-    sync_finish(entity, true);
-  }
-  /**
-   * @} // Leader-specific
-   */
-  /**
-   * @defgroup Synchronization Provider-specific
-   * @{
-   */
-  /**
-   * Represents a participant in a synchronization, along with its state.
-   *
-   * This class is used to track down all the sync requesters we are providing
-   * to. In such scenario, it won't be uncommon to have the @p synchronizer
-   * field set with a connection to the MonitorDBStore, the @p timeout field
-   * containing a timeout event and @p entity containing the entity instance
-   * of the monitor we are here representing.
-   *
-   * The sync requester will also use this class to represent both the sync
-   * leader and the sync provider.
-   */
-  struct SyncEntityImpl {
-
-    /**
-     * Store synchronization related Sync state.
-     */
-    enum {
-      /**
-       * No state whatsoever. We are not providing any sync suppport.
-       */
-      STATE_NONE   = 0,
-      /**
-       * This entity's sync effort is currently focused on reading and sharing
-       * our whole store state with @p entity. This means all the entries in
-       * the key/value space.
-       */
-      STATE_WHOLE  = 1,
-      /**
-       * This entity's sync effor is currently focused on reading and sharing
-       * our Paxos state with @p entity. This means all the Paxos-related
-       * key/value entries, such as the Paxos versions.
-       */
-      STATE_PAXOS  = 2
-    };
-
-    /**
-     * The entity instace of the monitor whose sync effort we are representing.
-     */
-    entity_inst_t entity;
-    /**
-     * Our Monitor.
-     */
-    Monitor *mon;
-    /**
-     * The Paxos version we are focusing on.
-     *
-     * @note This is not used at the moment. We are still assessing whether we
-     *      need it.
-     */
-    version_t version;
-    /**
-     * Timeout event. Its type and purpose varies depending on the situation.
-     */
-    Context *timeout;
-    /**
-     * Last key received during a sync effort.
-     *
-     * This field is mainly used by the sync requester to track the last
-     * received key, in case he needs to switch providers due to failure. The
-     * sync provider will also use this field whenever the requester specifies
-     * a last received key when requesting the provider to start sending his
-     * store chunks.
-     */
-    pair<string,string> last_received_key;
-    /**
-     * Hold the Store Synchronization related Sync State.
-     */
-    int sync_state;
-    /**
-     * The MonitorDBStore's chunk iterator instance we are currently using
-     * to obtain the store's chunks and pack them to the sync requester.
-     */
-    MonitorDBStore::Synchronizer synchronizer;
-    MonitorDBStore::Synchronizer paxos_synchronizer;
-    /* Should only be used for debugging purposes */
-    /**
-     * crc of the contents read from the store.
-     *
-     * @note may not always be available, as it is used only on specific
-     *      points in time during the sync process.
-     * @note depends on '--mon-sync-debug' being set.
-     */
-    __u32 crc;
-    /**
-     * Should be true if @p crc has been set.
-     */
-    bool crc_available;
-    /**
-     * Total synchronization attempts.
-     */
-    int attempts;
-
-    SyncEntityImpl(entity_inst_t &entity, Monitor *mon)
-      : entity(entity),
-       mon(mon),
-       version(0),
-       timeout(NULL),
-       sync_state(STATE_NONE),
-       crc(0),
-       crc_available(false),
-       attempts(0)
-    { }
-
-    /**
-     * Obtain current Sync State name.
-     *
-     * @returns Name of current sync state.
-     */
-    string get_state() {
-      switch (sync_state) {
-       case STATE_NONE: return "none";
-       case STATE_WHOLE: return "whole";
-       case STATE_PAXOS: return "paxos";
-       default: return "unknown";
-      }
-    }
-    /**
-     * Obtain the paxos version at which this sync started.
-     *
-     * @returns Paxos version at which this sync started
-     */
-    version_t get_version() {
-      return version;
-    }
-    /**
-     * Set a timeout event for this sync entity.
-     *
-     * @param event Timeout class to be called after @p fire_after seconds.
-     * @param fire_after Number of seconds until we fire the @p event event.
-     */
-    void set_timeout(Context *event, double fire_after) {
-      cancel_timeout();
-      timeout = event;
-      mon->timer.add_event_after(fire_after, timeout);
-    }
-    /**
-     * Cancel the currently set timeout, if any.
-     */
-    void cancel_timeout() {
-      if (timeout)
-       mon->timer.cancel_event(timeout);
-      timeout = NULL;
-    }
-    /**
-     * Initiate the required fields for obtaining chunks out of the
-     * MonitorDBStore.
-     *
-     * This function will initiate @p synchronizer with a chunk iterator whose
-     * scope is all the keys/values that belong to one of the sync targets
-     * (i.e., paxos services or paxos).
-     *
-     * Calling @p Monitor::sync_update() will be essential during the efforts
-     * of providing a correct store state to the requester, since we will need
-     * to eventually update the iterator in order to start packing the Paxos
-     * versions.
-     */
-    void sync_init() {
-      sync_state = STATE_WHOLE;
-      set<string> sync_targets = mon->get_sync_targets_names();
-
-      string prefix("paxos");
-      paxos_synchronizer = mon->store->get_synchronizer(prefix);
-      version = mon->paxos->get_version();
-      generic_dout(10) << __func__ << " version " << version << dendl;
-
-      synchronizer = mon->store->get_synchronizer(last_received_key,
-                                                 sync_targets);
-      sync_update();
-      assert(synchronizer->has_next_chunk());
-    }
-    /**
-     * Update the @p synchronizer chunk iterator, if needed.
-     *
-     * Whenever we reach the end of the iterator during @p STATE_WHOLE, we
-     * must update the @p synchronizer to an iterator focused on reading only
-     * Paxos versions. This is an essential part of the sync store approach,
-     * and it will guarantee that we end up with a consistent store.
-     */
-    void sync_update() {
-      assert(sync_state != STATE_NONE);
-      assert(synchronizer.use_count() != 0);
-
-      if (!synchronizer->has_next_chunk()) {
-       crc_set(synchronizer->crc());
-       if (sync_state == STATE_WHOLE) {
-          assert(paxos_synchronizer.use_count() != 0);
-         sync_state = STATE_PAXOS;
-          synchronizer = paxos_synchronizer;
-       }
-      }
-    }
+  entity_inst_t sync_provider;   ///< who we are syncing from
+  uint64_t sync_cookie;          ///< 0 if we are starting, non-zero otherwise
+  bool sync_full;                ///< true if we are a full sync, false for recent catch-up
+  version_t sync_start_version;  ///< last_committed at sync start
+  Context *sync_timeout_event;   ///< timeout event
 
-    /* For debug purposes only */
-    /**
-     * Check if we have a CRC available.
-     *
-     * @returns true if crc is available; false otherwise.
-     */
-    bool has_crc() {
-      return (g_conf->mon_sync_debug && crc_available);
-    }
-    /**
-     * Set @p crc to @p to_set
-     *
-     * @param to_set a crc value to set.
-     */
-    void crc_set(__u32 to_set) {
-      crc = to_set;
-      crc_available = true;
-    }
-    /**
-     * Get the current CRC value from @p crc
-     *
-     * @returns the currenct CRC value from @p crc
-     */
-    __u32 crc_get() {
-      return crc;
-    }
-    /**
-     * Clear the current CRC.
-     */
-    void crc_clear() {
-      crc_available = false;
-    }
-  };
-  typedef std::tr1::shared_ptr< SyncEntityImpl > SyncEntity;
-  /**
-   * Get a Monitor::SyncEntity instance.
-   *
-   * @param entity The monitor's entity instance that we want to associate
-   *              with this Monitor::SyncEntity.
-   * @param mon The Monitor.
-   *
-   * @returns A Monitor::SyncEntity
-   */
-  SyncEntity get_sync_entity(entity_inst_t &entity, Monitor *mon) {
-    return std::tr1::shared_ptr<SyncEntityImpl>(
-       new SyncEntityImpl(entity, mon));
-  }
-  /**
-   * Callback class responsible for dealing with the consequences of a sync
-   * process timing out.
-   */
   struct C_SyncTimeout : public Context {
     Monitor *mon;
-    entity_inst_t entity;
-
-    C_SyncTimeout(Monitor *mon, entity_inst_t &entity)
-      : mon(mon), entity(entity)
-    { }
-
+    C_SyncTimeout(Monitor *m) : mon(m) {}
     void finish(int r) {
-      mon->sync_timeout(entity);
+      mon->sync_timeout();
     }
   };
+
   /**
-   * Map containing all the monitor entities to whom we are acting as sync
-   * providers.
-   */
-  map<entity_inst_t, SyncEntity> sync_entities;
-  /**
-   * RNG used for the sync (currently only used to pick random monitors)
-   */
-  SimpleRNG sync_rng;
-  /**
-   * Obtain random monitor from the monmap.
-   *
-   * @param other Any monitor other than the one with rank @p other
-   * @returns The picked monitor's name.
-   */
-  int _pick_random_mon(int other = -1);
-  int _pick_random_quorum_mon(int other = -1);
-  /**
-   * Deal with the consequences of @p entity's sync timing out.
-   *
-   * @note Both the sync provider and the sync requester make use of this
-   *      function, since both use the @p Monitor::C_SyncTimeout callback.
-   *
-   * Being the sync provider, whenever a Monitor::C_SyncTimeout is triggered,
-   * we only have to clean up the sync requester's state we are maintaining.
-   *
-   * Being the sync requester, we will have to choose a new sync provider, and
-   * resume our sync from where it was left.
-   *
-   * @param entity Entity instance of the monitor whose sync has timed out.
-   */
-  void sync_timeout(entity_inst_t &entity);
-  /**
-   * Cleanup the state we, the provider, are keeping during @p entity's sync.
-   *
-   * @param entity Entity instance of the monitor whose sync state we are
-   *              cleaning up.
-   */
-  void sync_provider_cleanup(entity_inst_t &entity);
-  /**
-   * Handle a Sync Start Chunks request from a sync requester.
-   *
-   * This request will create the necessary state our the provider's end, and
-   * the provider will then be able to send chunks of his own store to the
-   * requester.
-   *
-   * @param m Sync message with operation type MMonSync::OP_START_CHUNKS
-   */
-  void handle_sync_start_chunks(MMonSync *m);
-  /**
-   * Handle a requester's reply to the last chunk we sent him.
-   *
-   * We will only send a new chunk to the sync requester once he has acked the
-   * reception of the last chunk we sent them.
-   *
-   * That's also how we will make sure that, on their end, they became aware
-   * that there are no more chunks to send (since we shall tag a message with
-   * MMonSync::FLAG_LAST when we are sending them the last chunk of all),
-   * allowing us to clean up the requester's state.
-   *
-   * @param m Sync message with operation type MMonSync::OP_CHUNK_REPLY
-   */
-  void handle_sync_chunk_reply(MMonSync *m);
-  /**
-   * Send a chunk to the sync entity represented by @p sync.
+   * Obtain the synchronization target prefixes in set form.
    *
-   * This function will send the next chunk available on the synchronizer. If
-   * it happens to be the last chunk, then the message shall be marked as
-   * such using MMonSync::FLAG_LAST.
+   * We consider a target prefix all those that are relevant when
+   * synchronizing two stores. That is, all those that hold paxos service's
+   * versions, as well as paxos versions, or any control keys such as the
+   * first or last committed version.
    *
-   * @param sync A Monitor::SyncEntity representing a sync requester monitor.
-   */
-  void sync_send_chunks(SyncEntity sync);
-  /**
-   * @} // Synchronization Provider-specific
-   */
-  /**
-   * @defgroup Synchronization Requester-specific
-   * @{
-   */
-  /**
-   * The state in which we (the sync leader, provider or requester) are in
-   * regard to our sync process (if we are the requester) or any entity that
-   * we may be leading or providing to.
-   */
-  enum {
-    /**
-     * We are not part of any synchronization effort, or it has not began yet.
-     */
-    SYNC_STATE_NONE   = 0,
-    /**
-     * We have started our role in the synchronization.
-     *
-     * This state may have multiple meanings, depending on which entity is
-     * employing it and within which context.
-     *
-     * For instance, the leader will consider a sync requester to enter
-     * SYNC_STATE_START whenever it receives a MMonSync::OP_START from the
-     * said requester. On the other hand, the provider will consider that the
-     * requester enters this state after receiving a MMonSync::OP_START_CHUNKS.
-     * The sync requester will enter this state as soon as it begins its sync
-     * efforts.
-     */
-    SYNC_STATE_START  = 1,
-    /**
-     * We are synchronizing chunks.
-     *
-     * This state is not used by the sync leader; only the sync requester and
-     * the sync provider will.
-     */
-    SYNC_STATE_CHUNKS = 2,
-    /**
-     * We are stopping the sync effort.
-     */
-    SYNC_STATE_STOP   = 3
-  };
-  /**
-   * The current sync state.
+   * Given the current design, this function should return the name of all and
+   * any available paxos service, plus the paxos name.
    *
-   * This field is only used by the sync requester, being the only one that
-   * will take this state as part of its global state. The sync leader and the
-   * sync provider will only associate sync states to other entities (i.e., to
-   * sync requesters), and those shall be kept in the @p sync_entities_states
-   * map.
-   */
-  int sync_state;
-  /**
-   * Callback class responsible for dealing with the consequences of the sync
-   * requester not receiving a MMonSync::OP_START_REPLY in a timely manner.
+   * @returns a set of strings referring to the prefixes being synchronized
    */
-  struct C_SyncStartTimeout : public Context {
-    Monitor *mon;
-
-    C_SyncStartTimeout(Monitor *mon)
-      : mon(mon)
-    { }
+  set<string> get_sync_targets_names();
 
-    void finish(int r) {
-      mon->sync_start_reply_timeout();
-    }
-  };
   /**
-   * Callback class responsible for retrying a Sync Start after a given
-   * backoff period, whenever the Sync Leader flags a MMonSync::OP_START_REPLY
-   * with the MMonSync::FLAG_RETRY flag.
+   * Reset the monitor's sync-related data structures and state, both
+   * for the requester- and provider-side.
    */
-  struct C_SyncStartRetry : public Context {
-    Monitor *mon;
-    entity_inst_t entity;
-
-    C_SyncStartRetry(Monitor *mon, entity_inst_t &entity)
-      : mon(mon), entity(entity)
-    { }
+  void sync_reset();
 
-    void finish(int r) {
-      mon->bootstrap();
-    }
-  };
   /**
-   * We use heartbeats to check if both the Leader and the Synchronization
-   * Requester are both still alive, so we can determine if we should continue
-   * with the synchronization process, granted that trim is disabled.
+   * Caled when a sync attempt times out (requester-side)
    */
-  struct C_HeartbeatTimeout : public Context {
-    Monitor *mon;
+  void sync_timeout();
 
-    C_HeartbeatTimeout(Monitor *mon)
-      : mon(mon)
-    { }
-
-    void finish(int r) {
-      mon->sync_requester_abort();
-    }
-  };
   /**
-   * Callback class responsible for sending a heartbeat message to the sync
-   * leader. We use this callback to keep an assynchronous heartbeat with
-   * the sync leader at predefined intervals.
+   * Get the latest monmap for backup purposes during sync
    */
-  struct C_HeartbeatInterval : public Context {
-    Monitor *mon;
-    entity_inst_t entity;
-
-    C_HeartbeatInterval(Monitor *mon, entity_inst_t &entity)
-      : mon(mon), entity(entity)
-    { }
+  void sync_obtain_latest_monmap(bufferlist &bl);
 
-    void finish(int r) {
-      mon->sync_leader->set_timeout(new C_HeartbeatTimeout(mon),
-                                   g_conf->mon_sync_heartbeat_timeout);
-      mon->sync_send_heartbeat(entity);
-    }
-  };
   /**
-   * Callback class responsible for dealing with the consequences of never
-   * receiving a reply to a MMonSync::OP_FINISH sent to the sync leader.
+   * check if a sync is in progress
    */
-  struct C_SyncFinishReplyTimeout : public Context {
-    Monitor *mon;
-
-    C_SyncFinishReplyTimeout(Monitor *mon)
-      : mon(mon)
-    { }
+  bool is_sync_on_going();
 
-    void finish(int r) {
-      mon->sync_finish_reply_timeout();
-    }
-  };
-  /**
-   * The entity we, the sync requester, consider to be our sync leader. If
-   * there is a formed quorum, the @p sync_leader should represent the actual
-   * cluster Leader; otherwise, it can be any monitor and will likely be the
-   * same as @p sync_provider.
-   */
-  SyncEntity sync_leader;
-  /**
-   * The entity we, the sync requester, are synchronizing against. This entity
-   * will be our source of store chunks, and we will ultimately obtain a store
-   * state equal (or very similar, maybe off by a couple of versions) as their
-   * own.
-   */
-  SyncEntity sync_provider;
-  /**
-   * Clean up the Sync Requester's state (both in-memory and in-store).
-   */
-  void sync_requester_cleanup();
-  /**
-   * Abort the current sync effort.
-   *
-   * This will be translated into a MMonSync::OP_ABORT sent to the sync leader
-   * and to the sync provider, and ultimately it will also involve calling
-   * @p Monitor::sync_requester_cleanup() to clean up our current sync state.
-   */
-  void sync_requester_abort();
-  /**
-   * Deal with a timeout while waiting for a MMonSync::OP_FINISH_REPLY.
-   *
-   * This will be assumed as a leader failure, and having been exposed to the
-   * side-effects of a new Leader being elected, we have no other choice but
-   * to abort our sync process and start fresh.
-   */
-  void sync_finish_reply_timeout();
-  /**
-   * Deal with a timeout while waiting for a MMonSync::OP_START_REPLY.
-   *
-   * This will be assumed as a leader failure. Since we didn't get to do
-   * much work (as we haven't even started our sync), we will simply bootstrap
-   * and start off fresh with a new sync leader.
-   */
-  void sync_start_reply_timeout();
   /**
    * Start the synchronization efforts.
    *
    * This function should be called whenever we find the need to synchronize
    * our store state with the remaining cluster.
    *
-   * Starting the sync process means that we will have to request the cluster
-   * Leader (if there is a formed quorum) to stop trimming the Paxos state and
-   * allow us to start synchronizing with the sync provider we picked.
    *
    * @param entity An entity instance referring to the sync provider we picked.
+   * @param whether to sycn the full store, or just pull recent paxos commits
    */
-  void sync_start(entity_inst_t &entity);
-  /**
-   * Request the provider to start sending the chunks of his store, in order
-   * for us to obtain a consistent store state similar to the one shared by
-   * the cluster.
-   *
-   * @param provider The SyncEntity representing the Sync Provider.
-   */
-  void sync_start_chunks(SyncEntity provider);
+  void sync_start(entity_inst_t &entity, bool full);
+
   /**
-   * Handle a MMonSync::OP_START_REPLY sent by the Sync Leader.
-   *
-   * Reception of this message may be twofold: if it was marked with the
-   * MMonSync::FLAG_RETRY flag, we must backoff for a while and retry starting
-   * the sync at a later time; otherwise, we have the green-light to request
-   * the Sync Provider to start sharing his chunks with us.
-   *
-   * @param m Sync message with operation type MMonSync::OP_START_REPLY
+   * force a sync on next mon restart
    */
-  void handle_sync_start_reply(MMonSync *m);
+  void sync_force(ostream& ss);
+
   /**
-   * Handle a Heartbeat reply sent by the Sync Leader.
+   * reset the sync timeout
    *
-   * We use heartbeats to keep the Sync Leader aware that we are keeping our
-   * sync efforts alive. We also use them to make sure our Sync Leader is
-   * still alive. If the Sync Leader fails, we will have to abort our on-going
-   * sync, or we could incurr in an inconsistent store state due to a trim on
-   * the Paxos state of the monitor provinding us with his store chunks.
-   *
-   * @param m Sync message with operation type MMonSync::OP_HEARTBEAT_REPLY
+   * This is used on the client to restart if things aren't progressing
    */
-  void handle_sync_heartbeat_reply(MMonSync *m);
+  void sync_reset_timeout();
+
   /**
-   * Handle a chunk sent by the Sync Provider.
+   * trim stale sync provider state
    *
-   * We will receive the Sync Provider's store in chunks. These are encoded
-   * in bufferlists containing a transaction that will be directly applied
-   * onto our MonitorDBStore.
-   *
-   * Whenever we receive such a message, we must reply to the Sync Provider,
-   * as a way of acknowledging the reception of its last chunk. If the message
-   * is tagged with a MMonSync::FLAG_LAST, we can then consider we have
-   * received all the chunks the Sync Provider had to offer, and finish our
-   * sync efforts with the Sync Leader.
-   *
-   * @param m Sync message with operation type MMonSync::OP_CHUNK
+   * If someone is syncing from us and hasn't talked to us recently, expire their state.
    */
-  void handle_sync_chunk(MMonSync *m);
+  void sync_trim_providers();
+
   /**
-   * Handle a reply sent by the Sync Leader to a MMonSync::OP_FINISH.
-   *
-   * As soon as we receive this message, we know we finally have a store state
-   * consistent with the remaining cluster (give or take a couple of versions).
-   * We may then bootstrap and attempt to join the other monitors in the
-   * cluster.
+   * Complete a sync
    *
-   * @param m Sync message with operation type MMonSync::OP_FINISH_REPLY
-   */
-  void handle_sync_finish_reply(MMonSync *m);
-  /**
-   * Stop our synchronization effort by sending a MMonSync::OP_FINISH to the
-   * Sync Leader.
+   * Finish up a sync after we've gotten all of the chunks.
    *
-   * Once we receive the last chunk from the Sync Provider, we are in
-   * conditions of officially finishing our sync efforts. With that purpose in
-   * mind, we must then send a MMonSync::OP_FINISH to the Leader, letting him
-   * know that we no longer require the Paxos state to be preserved.
+   * @param last_committed final last_committed value from provider
    */
-  void sync_stop();
+  void sync_finish(version_t last_committed);
+
   /**
-   * @} // Synchronization Requester-specific
+   * request the next chunk from the provider
    */
-  const string get_sync_state_name(int s) const {
-    switch (s) {
-    case SYNC_STATE_NONE: return "none";
-    case SYNC_STATE_START: return "start";
-    case SYNC_STATE_CHUNKS: return "chunks";
-    case SYNC_STATE_STOP: return "stop";
-    }
-    return "???";
-  }
+  void sync_get_next_chunk();
+
   /**
-   * Obtain a string describing the current Sync State.
+   * handle sync message
    *
-   * @returns A string describing the current Sync State, if any, or an empty
-   *         string if no sync (or sync effort we know of) is in progress.
+   * @param m Sync message with operation type MMonSync::OP_START_CHUNKS
    */
-  const string get_sync_state_name() const {
-    string sn;
-
-    if (sync_role == SYNC_ROLE_NONE)
-      return "";
-
-    sn.append(" sync(");
-
-    if (sync_role & SYNC_ROLE_LEADER)
-      sn.append(" leader");
-    if (sync_role & SYNC_ROLE_PROVIDER)
-      sn.append(" provider");
-    if (sync_role & SYNC_ROLE_REQUESTER)
-      sn.append(" requester");
+  void handle_sync(MMonSync *m);
 
-    sn.append(" state ");
-    sn.append(get_sync_state_name(sync_state));
+  void _sync_reply_no_cookie(MMonSync *m);
 
-    sn.append(" )");
+  void handle_sync_get_cookie(MMonSync *m);
+  void handle_sync_get_chunk(MMonSync *m);
+  void handle_sync_finish(MMonSync *m);
 
-    return sn;
-  }
+  void handle_sync_cookie(MMonSync *m);
+  void handle_sync_forward(MMonSync *m);
+  void handle_sync_chunk(MMonSync *m);
+  void handle_sync_no_cookie(MMonSync *m);
 
   /**
    * @} // Synchronization
@@ -1288,8 +550,6 @@ public:
   bool _allowed_command(MonSession *s, map<std::string, cmd_vartype>& cmd);
   void _mon_status(ostream& ss);
   void _quorum_status(ostream& ss);
-  void _sync_status(ostream& ss);
-  void _sync_force(ostream& ss);
   void _add_bootstrap_peer_hint(string cmd, string args, ostream& ss);
   void handle_command(class MMonCommand *m);
   void handle_route(MRoute *m);
index 48a1e609316ab222d8c98d8a115a61efc4aa2e64..cb860b15a9fc5e36be2593ff42c96fae971786dd 100644 (file)
@@ -251,7 +251,8 @@ class MonitorDBStore
     bool add_chunk_entry(Transaction &tx,
                         string &prefix,
                         string &key,
-                        bufferlist &value) {
+                        bufferlist &value,
+                        uint64_t max) {
       Transaction tmp;
       bufferlist tmp_bl;
       tmp.put(prefix, key, value);
@@ -262,7 +263,7 @@ class MonitorDBStore
 
       size_t len = tx_bl.length() + tmp_bl.length();
 
-      if (!tx.empty() && (len > g_conf->mon_sync_max_payload_size)) {
+      if (!tx.empty() && (len > max)) {
        return false;
       }
 
@@ -293,13 +294,7 @@ class MonitorDBStore
     virtual bool has_next_chunk() {
       return !done && _is_valid();
     }
-    virtual void get_chunk_tx(Transaction &tx) = 0;
-    virtual void get_chunk(bufferlist& bl) {
-      Transaction tx;
-      get_chunk_tx(tx);
-      if (!tx.empty())
-       tx.encode(bl);
-    }
+    virtual void get_chunk_tx(Transaction &tx, uint64_t max) = 0;
     virtual pair<string,string> get_next_key() = 0;
   };
   typedef std::tr1::shared_ptr<StoreIteratorImpl> Synchronizer;
@@ -327,7 +322,7 @@ class MonitorDBStore
      *                     differ from the one passed on to the function)
      * @param last_key[out] Last key in the chunk
      */
-    virtual void get_chunk_tx(Transaction &tx) {
+    virtual void get_chunk_tx(Transaction &tx, uint64_t max) {
       assert(done == false);
       assert(iter->valid() == true);
 
@@ -336,7 +331,7 @@ class MonitorDBStore
        string key(iter->raw_key().second);
        if (sync_prefixes.count(prefix)) {
          bufferlist value = iter->value();
-         if (!add_chunk_entry(tx, prefix, key, value))
+         if (!add_chunk_entry(tx, prefix, key, value, max))
            return;
        }
        iter->next();
index da00e400113c22fafdab51b0f396a9c215d19b0e..ea777c4912eea8493a12f6f10233f7fc6170c7bd 100644 (file)
@@ -43,36 +43,18 @@ MonitorDBStore *Paxos::get_store()
   return mon->store;
 }
 
-
-void Paxos::apply_version(MonitorDBStore::Transaction &tx, version_t v)
-{
-  bufferlist bl;
-  int err = get_store()->get(get_name(), v, bl);
-  assert(err == 0);
-  assert(bl.length());
-  decode_append_transaction(tx, bl);
-}
-
-void Paxos::reapply_all_versions()
+void Paxos::read_and_prepare_transactions(MonitorDBStore::Transaction *tx, version_t first, version_t last)
 {
-  version_t first = get_store()->get(get_name(), "first_committed");
-  version_t last = get_store()->get(get_name(), "last_committed");
   dout(10) << __func__ << " first " << first << " last " << last << dendl;
-
-  MonitorDBStore::Transaction tx;
   for (version_t v = first; v <= last; ++v) {
     dout(30) << __func__ << " apply version " << v << dendl;
-    apply_version(tx, v);
+    bufferlist bl;
+    int err = get_store()->get(get_name(), v, bl);
+    assert(err == 0);
+    assert(bl.length());
+    decode_append_transaction(*tx, bl);
   }
   dout(15) << __func__ << " total versions " << (last-first) << dendl;
-
-  dout(30) << __func__ << " tx dump:\n";
-  JSONFormatter f(true);
-  tx.dump(&f);
-  f.flush(*_dout);
-  *_dout << dendl;
-
-  get_store()->apply_transaction(tx);
 }
 
 void Paxos::init()
index 9be4713a28203d1f172410c42ba91d08d6eb9cbf..c8a37e2aa65f6ed0d544eea9930365c62f0685ff 100644 (file)
@@ -1022,8 +1022,7 @@ public:
 
   void dispatch(PaxosServiceMessage *m);
 
-  void reapply_all_versions();
-  void apply_version(MonitorDBStore::Transaction &tx, version_t v);
+  void read_and_prepare_transactions(MonitorDBStore::Transaction *tx, version_t from, version_t last);
 
   void init();
   /**