#include "messages/MGenericMessage.h"
#include "messages/MMonCommand.h"
#include "messages/MMonCommandAck.h"
+#include "messages/MMonSync.h"
#include "messages/MMonProbe.h"
#include "messages/MMonJoin.h"
#include "messages/MMonPaxos.h"
leader(0),
quorum_features(0),
+ // trim & store sync
+ sync_role(SYNC_ROLE_NONE),
+ trim_lock("Monitor::trim_lock"),
+ trim_enable_timer(NULL),
+ sync_rng(getpid()),
+ sync_state(SYNC_STATE_NONE),
+ sync_leader(),
+ sync_provider(),
+
timecheck_round(0),
timecheck_event(NULL),
_mon_status(ss);
else if (command == "quorum_status")
_quorum_status(ss);
- else if (command.find("add_bootstrap_peer_hint") == 0)
+ else if (command == "sync_status")
+ _sync_status(ss);
+ else if (command == "sync_force") {
+ if (args != "--yes-i-really-mean-it") {
+ ss << "are you SURE? this will mean the monitor store will be erased "
+ "the next time the monitor is restarted. pass "
+ "'--yes-i-really-mean-it' if you really do.";
+ return;
+ }
+ _sync_force(ss);
+ } else if (command.find("add_bootstrap_peer_hint") == 0)
_add_bootstrap_peer_hint(command, args, ss);
else
assert(0 == "bad AdminSocket command binding");
}
}
- paxos->init();
- // init paxos
- for (int i = 0; i < PAXOS_NUM; ++i) {
- if (paxos->is_consistent()) {
- paxos_service[i]->update_from_paxos();
- } // else we don't do anything; handle_probe_reply will detect it's slurping
+ {
+ // We have a potentially inconsistent store state in hands. Get rid of it
+ // and start fresh.
+ bool clear_store = false;
+ if (store->get("mon_sync", "in_sync") > 0) {
+ dout(1) << __func__ << " clean up potentially inconsistent store state"
+ << dendl;
+ clear_store = true;
+ }
+
+ if (store->get("mon_sync", "force_sync") > 0) {
+ dout(1) << __func__ << " force sync by clearing store state" << dendl;
+ clear_store = true;
+ }
+
+ if (clear_store) {
+ set<string> sync_prefixes = get_sync_targets_names();
+ sync_prefixes.insert("mon_sync");
+ store->clear(sync_prefixes);
+ }
}
+ init_paxos();
+
// we need to bootstrap authentication keys so we can form an
// initial quorum.
if (authmon()->get_version() == 0) {
r = admin_socket->register_command("quorum_status", admin_hook,
"show current quorum status");
assert(r == 0);
+ r = admin_socket->register_command("sync_status", admin_hook,
+ "show current synchronization status");
+ assert(r == 0);
r = admin_socket->register_command("add_bootstrap_peer_hint", admin_hook,
"add peer address as potential bootstrap peer for cluster bringup");
assert(r == 0);
return 0;
}
+// Initialize paxos, then let each of the PAXOS_NUM paxos services refresh
+// itself from it.
+void Monitor::init_paxos()
+{
+  dout(10) << __func__ << dendl;
+
+  paxos->init();
+
+  // A service is only refreshed while paxos reports a consistent state; the
+  // check is repeated before every service.
+  for (int svc = 0; svc < PAXOS_NUM; ++svc) {
+    if (!paxos->is_consistent())
+      continue;
+    paxos_service[svc]->update_from_paxos();
+  }
+}
+
void Monitor::register_cluster_logger()
{
if (!cluster_logger_registered) {
AdminSocket* admin_socket = cct->get_admin_socket();
admin_socket->unregister_command("mon_status");
admin_socket->unregister_command("quorum_status");
+ admin_socket->unregister_command("sync_status");
delete admin_hook;
admin_hook = NULL;
}
messenger->mark_down_all();
}
- // reset
- state = STATE_PROBING;
+ reset_sync();
+
+ // reset
+ state = STATE_PROBING;
+
+ reset();
+
+ // singleton monitor?
+ if (monmap->size() == 1 && rank == 0) {
+ win_standalone_election();
+ return;
+ }
+
+ reset_probe_timeout();
+
+ // i'm outside the quorum
+ if (monmap->contains(name))
+ outside_quorum.insert(name);
+
+ // probe monitors
+ dout(10) << "probing other monitors" << dendl;
+ for (unsigned i = 0; i < monmap->size(); i++) {
+ if ((int)i != rank)
+ messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined),
+ monmap->get_inst(i));
+ }
+ for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
+ p != extra_probe_peers.end();
+ ++p) {
+ if (*p != messenger->get_myaddr()) {
+ entity_inst_t i;
+ i.name = entity_name_t::MON(-1);
+ i.addr = *p;
+ messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined), i);
+ }
+ }
+}
+
+// Admin-socket helper: parse `args` as a peer address and remember it as an
+// extra probe peer. The outcome (or the reason for refusing) is written to ss.
+void Monitor::_add_bootstrap_peer_hint(string cmd, string args, ostream& ss)
+{
+  dout(10) << "_add_bootstrap_peer_hint '" << cmd << "' '" << args << "'" << dendl;
+
+  entity_addr_t peer;
+  const char *parse_end = 0;
+  const bool parsed = peer.parse(args.c_str(), &parse_end);
+  if (!parsed) {
+    ss << "failed to parse addr '" << args << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
+    return;
+  }
+
+  // hints are only accepted while we are neither leader nor peon
+  const bool active = is_leader() || is_peon();
+  if (active) {
+    ss << "mon already active; ignoring bootstrap hint";
+    return;
+  }
+
+  // an address given without a port falls back to CEPH_MON_PORT
+  if (peer.get_port() == 0) {
+    peer.set_port(CEPH_MON_PORT);
+  }
+
+  extra_probe_peers.insert(peer);
+  ss << "adding peer " << peer << " to list: " << extra_probe_peers;
+}
+
+// Invoked from bootstrap(), and when a leader or peon moves to electing.
+// Ends the timecheck, forgets the quorum, and restarts paxos and all of its
+// services.
+void Monitor::reset()
+{
+  dout(10) << "reset" << dendl;
+
+  timecheck_finish();
+
+  leader_since = utime_t();
+
+  // only stamp the exit time if we actually were part of a quorum
+  const bool had_quorum = !quorum.empty();
+  if (had_quorum)
+    exited_quorum = ceph_clock_now(g_ceph_context);
+
+  quorum.clear();
+  outside_quorum.clear();
+
+  paxos->restart();
+
+  vector<PaxosService*>::iterator svc = paxos_service.begin();
+  while (svc != paxos_service.end()) {
+    (*svc)->restart();
+    ++svc;
+  }
+}
+
+// Build the set of names targeted by a sync: the paxos name plus the service
+// name of each of the PAXOS_NUM paxos services.
+set<string> Monitor::get_sync_targets_names() {
+  set<string> names;
+  names.insert(paxos->get_name());
+
+  int svc = 0;
+  while (svc < PAXOS_NUM) {
+    names.insert(paxos_service[svc]->get_service_name());
+    ++svc;
+  }
+
+  return names;
+}
+
+/**
+ * Drop any lingering sync/trim state we might still be holding.
+ *
+ * Cancels every pending trim timeout and every sync entity timeout, forgets
+ * all tracked entities and their states, releases the sync leader/provider
+ * handles, and returns to SYNC_STATE_NONE / SYNC_ROLE_NONE.
+ *
+ * @param abort when true, send OP_ABORT to each entity that still has a
+ *              non-null trim timeout
+ */
+void Monitor::reset_sync(bool abort)
+{
+  dout(10) << __func__ << dendl;
+
+  // trim side: cancel the timers and, if requested, tell the peers to abort
+  for (map<entity_inst_t,Context*>::iterator t = trim_timeouts.begin();
+       t != trim_timeouts.end(); ++t) {
+    Context *timeout = t->second;
+    if (timeout == NULL)
+      continue;
+
+    timer.cancel_event(timeout);
+    if (!abort)
+      continue;
+
+    MMonSync *abort_msg = new MMonSync(MMonSync::OP_ABORT);
+    entity_inst_t peer = t->first;
+    messenger->send_message(abort_msg, peer);
+  }
+  trim_timeouts.clear();
+
+  // sync entities: cancel whatever timeout each of them has armed
+  for (map<entity_inst_t,SyncEntity>::iterator s = sync_entities.begin();
+       s != sync_entities.end(); ++s) {
+    s->second->cancel_timeout();
+  }
+  sync_entities.clear();
+
+  sync_entities_states.clear();
+  trim_entities_states.clear();
+
+  sync_leader.reset();
+  sync_provider.reset();
+
+  sync_state = SYNC_STATE_NONE;
+  sync_role = SYNC_ROLE_NONE;
+}
+
+// leader
+
+// Send a sync heartbeat to `other`; when `reply` is set, an
+// OP_HEARTBEAT_REPLY is sent instead of an OP_HEARTBEAT.
+void Monitor::sync_send_heartbeat(entity_inst_t &other, bool reply)
+{
+  dout(10) << __func__ << " " << other << " reply(" << reply << ")" << dendl;
+
+  uint32_t op = MMonSync::OP_HEARTBEAT;
+  if (reply)
+    op = MMonSync::OP_HEARTBEAT_REPLY;
+
+  messenger->send_message(new MMonSync(op), other);
+}
+
+void Monitor::handle_sync_start(MMonSync *m) // handles a sync start request; m is released via m->put() on every path
+{
+ dout(10) << __func__ << " " << *m << dendl;
+ // Three outcomes: forward to the quorum leader, forward to our own sync leader, or answer with OP_START_REPLY ourselves.
+ /* If we are not the leader, then some monitor picked us as the point of
+ * entry to the quorum during its synchronization process. Therefore, we
+ * have an obligation of forwarding this message to leader, so the sender
+ * can start synchronizing.
+ */
+ if (!is_leader() && quorum.size() > 0) {
+ assert(!(sync_role & SYNC_ROLE_REQUESTER));
+ assert(!(sync_role & SYNC_ROLE_LEADER));
+ assert(!is_synchronizing());
+
+ entity_inst_t leader = monmap->get_inst(get_leader());
+ MMonSync *msg = new MMonSync(m);
+ // if the message has not been forwarded before (no REPLY_TO flag), record
+ // its sender as the reply-to target before passing it up the chain.
+ if (!(m->flags & MMonSync::FLAG_REPLY_TO)) {
+ msg->set_reply_to(m->get_source_inst());
+ }
+ dout(10) << __func__ << " forward " << *m
+ << " to leader at " << leader << dendl;
+ assert(g_conf->mon_sync_provider_kill_at != 1);
+ messenger->send_message(msg, leader);
+ assert(g_conf->mon_sync_provider_kill_at != 2);
+ m->put();
+ return;
+ }
+
+ // If we are synchronizing, then it means that we know someone who has a
+ // higher version than the one we have; and if someone attempted to sync
+ // from us, then that must mean they have a lower version than us.
+ // Therefore, they must be much more interested in synchronizing from the
+ // one we are trying to synchronize from than they are from us.
+ // Moreover, if we are already synchronizing under the REQUESTER role, then
+ // we must know someone who is in the quorum, either because we were lucky
+ // enough to contact them in the first place or we managed to contact
+ // someone who knew who they were. Therefore, just forward this request to
+ // our sync leader.
+ if (is_synchronizing()) {
+ assert(!(sync_role & SYNC_ROLE_LEADER));
+ assert(!(sync_role & SYNC_ROLE_PROVIDER));
+ assert(quorum.size() == 0);
+ assert(sync_leader.get() != NULL);
+
+ dout(10) << __func__ << " forward " << *m
+ << " to our sync leader at "
+ << sync_leader->entity << dendl;
+
+ MMonSync *msg = new MMonSync(m);
+ // if the message has not been forwarded before (no REPLY_TO flag), record
+ // its sender as the reply-to target before passing it up the chain.
+ if (!(m->flags & MMonSync::FLAG_REPLY_TO)) {
+ msg->set_reply_to(m->get_source_inst());
+ }
+ messenger->send_message(msg, sync_leader->entity);
+ m->put();
+ return;
+ }
+
+ // At this point we may or may not be the leader. If we are not the leader,
+ // it means that we are not in the quorum, but someone would still very much
+ // like to synchronize with us. In certain circumstances, letting them
+ // synchronize with us is by far our only option -- for instance, when we
+ // need them to form a quorum and they have started fresh or are severely
+ // out of date --, but it won't hurt to let them sync from us anyway: if
+ // they chose us, then they must have noticed that we had a higher version
+ // than they do, so it makes sense to let them try their luck and join the
+ // party.
+
+ Mutex::Locker l(trim_lock);
+ entity_inst_t other = // the requester: the reply-to target for forwarded messages, else the sender
+ (m->flags & MMonSync::FLAG_REPLY_TO ? m->reply_to : m->get_source_inst());
+
+ assert(g_conf->mon_sync_leader_kill_at != 1);
+
+ if (trim_timeouts.count(other) > 0) {
+ dout(1) << __func__ << " sync session already in progress for " << other
+ << dendl;
+
+ if (trim_entities_states[other] != SYNC_STATE_NONE) {
+ dout(1) << __func__ << " ignore stray message" << dendl;
+ m->put();
+ return;
+ }
+
+ dout(1) << __func__<< " destroying current state and creating new"
+ << dendl;
+
+ if (trim_timeouts[other])
+ timer.cancel_event(trim_timeouts[other]);
+ trim_timeouts.erase(other);
+ trim_entities_states.erase(other);
+ }
+
+ MMonSync *msg = new MMonSync(MMonSync::OP_START_REPLY);
+
+ if (((quorum.size() > 0) && paxos->should_trim())
+ || (trim_enable_timer != NULL)) {
+ msg->flags |= MMonSync::FLAG_RETRY; // a trim is due, or re-enabling trim is pending: ask the requester to retry later
+ } else {
+ trim_timeouts.insert(make_pair(other, new C_TrimTimeout(this, other)));
+ timer.add_event_after(g_conf->mon_sync_trim_timeout, trim_timeouts[other]);
+
+ trim_entities_states[other] = SYNC_STATE_START;
+ sync_role |= SYNC_ROLE_LEADER;
+
+ paxos->trim_disable();
+
+ // Is the one that contacted us in the quorum?
+ if (quorum.count(m->get_source().num())) {
+ // Then it must have been a forwarded message from someone else
+ assert(m->flags & MMonSync::FLAG_REPLY_TO);
+ // And must not be synchronizing. What sense would that make, eh?
+ assert(trim_timeouts.count(m->get_source_inst()) == 0);
+ // Set the provider as the one that contacted us. He's in the
+ // quorum, so he's up to the job (i.e., he's not synchronizing)
+ msg->set_reply_to(m->get_source_inst());
+ dout(10) << __func__ << " set provider to " << msg->reply_to << dendl;
+ } else if (quorum.size() > 0) {
+ // grab someone from the quorum and assign them as the sync provider
+ int n = _pick_random_quorum_mon(rank);
+ if (n >= 0) {
+ msg->set_reply_to(monmap->get_inst(n));
+ dout(10) << __func__ << " set quorum-based provider to "
+ << msg->reply_to << dendl;
+ } else {
+ assert(0 == "We shouldn't get here!");
+ }
+ } else {
+ // There is no quorum, so we must either be in a quorum-less cluster,
+ // or we must be mid-election. Either way, tell them it is okay to
+ // sync from us by not setting the reply-to field.
+ assert(!(msg->flags & MMonSync::FLAG_REPLY_TO));
+ }
+ }
+ messenger->send_message(msg, other);
+ m->put();
+
+ assert(g_conf->mon_sync_leader_kill_at != 2);
+}
+
+void Monitor::handle_sync_heartbeat(MMonSync *m) // sync-leader side: a heartbeat re-arms the sender's trim timeout and is answered with a heartbeat reply
+{
+ dout(10) << __func__ << " " << *m << dendl;
+
+ entity_inst_t other = m->get_source_inst();
+ if (!(sync_role & SYNC_ROLE_LEADER)
+ || !trim_entities_states.count(other)
+ || (trim_entities_states[other] != SYNC_STATE_START)) {
+ // stray message; ignore.
+ dout(1) << __func__ << " ignored stray message " << *m << dendl;
+ m->put();
+ return;
+ }
+
+ if (!is_leader() && (quorum.size() > 0)
+ && (trim_timeouts.count(other) > 0)) {
+ // we must have been the leader before, but we lost leadership to
+ // someone else.
+ sync_finish_abort(other);
+ m->put();
+ return;
+ }
+
+ assert(trim_timeouts.count(other) > 0);
+ // replace the pending trim timeout with a fresh one
+ if (trim_timeouts[other])
+ timer.cancel_event(trim_timeouts[other]);
+ trim_timeouts[other] = new C_TrimTimeout(this, other);
+ timer.add_event_after(g_conf->mon_sync_trim_timeout, trim_timeouts[other]);
+
+ assert(g_conf->mon_sync_leader_kill_at != 3);
+ sync_send_heartbeat(other, true);
+ assert(g_conf->mon_sync_leader_kill_at != 4);
+
+ m->put();
+}
+
+void Monitor::sync_finish(entity_inst_t &entity, bool abort) // drop the trim-side state kept for 'entity'; with abort set, also send it OP_ABORT
+{
+ dout(10) << __func__ << " entity(" << entity << ")" << dendl;
+
+ Mutex::Locker l(trim_lock);
+
+ if (!trim_timeouts.count(entity)) {
+ dout(1) << __func__ << " we know of no sync effort from "
+ << entity << " -- ignore it." << dendl;
+ return;
+ }
+
+ if (trim_timeouts[entity] != NULL)
+ timer.cancel_event(trim_timeouts[entity]);
+
+ trim_timeouts.erase(entity);
+ trim_entities_states.erase(entity);
+
+ if (abort) {
+ MMonSync *m = new MMonSync(MMonSync::OP_ABORT);
+ assert(g_conf->mon_sync_leader_kill_at != 5);
+ messenger->send_message(m, entity);
+ assert(g_conf->mon_sync_leader_kill_at != 6);
+ }
+
+ if (trim_timeouts.size() > 0) // other entities still have trim timeouts registered with us
+ return;
+
+ dout(10) << __func__ << " no longer a sync leader" << dendl;
+ sync_role &= ~SYNC_ROLE_LEADER;
+
+ // we may have been the leader, but by now we may no longer be.
+ // this can happen when we sync'ed a monitor that became the
+ // leader, or that same monitor simply came back to life and got
+ // elected as the new leader.
+ if (is_leader() && paxos->is_trim_disabled()) {
+ trim_enable_timer = new C_TrimEnable(this);
+ timer.add_event_after(30.0, trim_enable_timer); // NOTE(review): hard-coded delay (presumably seconds) -- consider a config option
+ }
+}
+
+void Monitor::handle_sync_finish(MMonSync *m) // sync-leader side: acknowledge the sender's finish message with OP_FINISH_REPLY and drop its trim state
+{
+ dout(10) << __func__ << " " << *m << dendl;
+
+ entity_inst_t other = m->get_source_inst();
+
+ if (!trim_timeouts.count(other) || !trim_entities_states.count(other)
+ || (trim_entities_states[other] != SYNC_STATE_START)) {
+ dout(1) << __func__ << " ignored stray message from " << other << dendl;
+ if (!trim_timeouts.count(other))
+ dout(1) << __func__ << " not on trim_timeouts" << dendl;
+ if (!trim_entities_states.count(other))
+ dout(1) << __func__ << " not on trim_entities_states" << dendl;
+ else if (trim_entities_states[other] != SYNC_STATE_START)
+ dout(1) << __func__ << " state " << trim_entities_states[other] << dendl;
+ m->put();
+ return;
+ }
+
+ // We may no longer be the leader. In such a case, we should just inform the
+ // other monitor that he should abort his sync. However, it appears that
+ // his sync has finished, so there is no use in scrapping the whole thing
+ // now. Therefore, just go along and acknowledge.
+ if (!is_leader()) {
+ dout(10) << __func__ << " We are no longer the leader; reply nonetheless"
+ << dendl;
+ }
+
+ MMonSync *msg = new MMonSync(MMonSync::OP_FINISH_REPLY);
+ assert(g_conf->mon_sync_leader_kill_at != 7);
+ messenger->send_message(msg, other);
+ assert(g_conf->mon_sync_leader_kill_at != 8);
+
+ sync_finish(other);
+ m->put();
+}
+
+// end of leader
+
+// synchronization provider
+
+/**
+ * Pick a random monitor name from the monmap.
+ *
+ * @param other rank to exclude from the pick; a negative value (or a rank
+ *              outside the monmap) excludes nothing
+ * @return name of the picked monitor. With a single-monitor monmap this is
+ *         always the name of rank 0, even if that rank is `other`.
+ */
+string Monitor::_pick_random_mon(int other)
+{
+  assert(monmap->size() > 0);
+  if (monmap->size() == 1)
+    return monmap->get_name(0);
+
+  int max = monmap->size();
+  // Excluding a rank leaves one candidate fewer. Draw from the reduced range
+  // and then step over `other`, so the result is always a valid rank. Drawing
+  // from the full range and then incrementing could yield monmap->size().
+  if (other >= 0 && other < max)
+    max--;
+  int n = sync_rng() % max;
+  if (other >= 0 && n >= other)
+    n++;
+  return monmap->get_name(n);
+}
+
+/**
+ * Pick a random monitor rank from the current quorum.
+ *
+ * @param other rank to avoid, or a negative value to allow any member
+ * @return -1 if the quorum is empty; otherwise a quorum member's rank. The
+ *         result differs from `other` whenever the quorum has another
+ *         member; a quorum consisting solely of `other` still yields `other`.
+ */
+int Monitor::_pick_random_quorum_mon(int other)
+{
+  assert(monmap->size() > 0);
+  if (quorum.size() == 0)
+    return -1;
+
+  // advance to a random member; n < quorum.size(), so p never reaches end()
+  set<int>::iterator p = quorum.begin();
+  for (int n = sync_rng() % quorum.size(); p != quorum.end() && n; ++p, --n);
+
+  if (other >= 0 && p != quorum.end() && *p == other) {
+    // Step to the next member. If `other` was the last one, wrap around to
+    // the first member: falling back to *quorum.rbegin() would hand back
+    // `other` itself.
+    ++p;
+    if (p == quorum.end())
+      p = quorum.begin();
+  }
+
+  return (p == quorum.end() ? *(quorum.rbegin()) : *p);
+}
+
+void Monitor::sync_timeout(entity_inst_t &entity) // sync timeout for 'entity' (presumably fired by C_SyncTimeout -- confirm): a requester retries with another provider or aborts; a provider drops the entity's session
+{
+ if (state == STATE_SYNCHRONIZING) {
+ assert(sync_role == SYNC_ROLE_REQUESTER);
+ assert(sync_state == SYNC_STATE_CHUNKS);
+
+ // we are a sync requester; our provider just timed out, so find another
+ // monitor to synchronize with.
+ dout(1) << __func__ << " " << sync_provider->entity << dendl;
+
+ sync_provider->attempts++;
+ if ((sync_provider->attempts > g_conf->mon_sync_max_retries)
+ || (monmap->size() == 2)) {
+ // We either tried too many times to sync, or there's just us and the
+ // monitor we were attempting to sync with.
+ // Therefore, just abort the whole sync and start off fresh whenever he
+ // (or somebody else) comes back.
+ sync_requester_abort();
+ return;
+ }
+
+ int i = 0;
+ string entity_name = monmap->get_name(entity.addr);
+ string debug_mon = g_conf->mon_sync_debug_provider;
+ string debug_fallback = g_conf->mon_sync_debug_provider_fallback;
+ while ((i++) < 2*monmap->size()) {
+ // we are trying to pick a random monitor, but we cannot do this forever.
+ // in case something goes awfully wrong, just stop doing it after a
+ // couple of attempts and try again later.
+ string new_mon = _pick_random_mon();
+ // debug overrides: when a fallback provider is configured it replaces the random pick
+ if (!debug_fallback.empty()) {
+ if (entity_name != debug_fallback)
+ new_mon = debug_fallback;
+ else if (!debug_mon.empty() && (entity_name != debug_mon))
+ new_mon = debug_mon;
+ }
+
+ if ((new_mon != name) && (new_mon != entity_name)) { // accept only a monitor that is neither us nor the one that timed out
+ sync_provider->entity = monmap->get_inst(new_mon);
+ sync_state = SYNC_STATE_START;
+ sync_start_chunks(sync_provider);
+ return;
+ }
+ }
+
+ assert(0 == "Unable to find a new monitor to connect to. Not cool.");
+ } else if (sync_role & SYNC_ROLE_PROVIDER) {
+ dout(10) << __func__ << " cleanup " << entity << dendl;
+ sync_provider_cleanup(entity);
+ return;
+ } else
+ assert(0 == "We should never reach this");
+}
+
+// Forget the provider-side session kept for `entity`, if any, and drop the
+// PROVIDER role once no sessions are left.
+void Monitor::sync_provider_cleanup(entity_inst_t &entity)
+{
+  dout(10) << __func__ << " " << entity << dendl;
+
+  map<entity_inst_t,SyncEntity>::iterator session = sync_entities.find(entity);
+  if (session != sync_entities.end()) {
+    session->second->cancel_timeout();
+    sync_entities.erase(session);
+    sync_entities_states.erase(entity);
+  }
+
+  if (!sync_entities.empty())
+    return;
+
+  dout(1) << __func__ << " no longer a sync provider" << dendl;
+  sync_role &= ~SYNC_ROLE_PROVIDER;
+}
+
+void Monitor::handle_sync_start_chunks(MMonSync *m) // provider side: open a sync session for the sender and send it the first chunk
+{
+ dout(10) << __func__ << " " << *m << dendl;
+ assert(!(sync_role & SYNC_ROLE_REQUESTER));
+
+ entity_inst_t other = m->get_source_inst();
+
+ // if we have a sync going on for this entity, just drop the message. If it
+ // was a stray message, we did the right thing. If it wasn't, then it means
+ // that we still have an old state of this entity, and that the said entity
+ // failed in the meantime and is now up again; therefore, just let the
+ // timeout timers fulfill their purpose and deal with state cleanup when
+ // they are triggered. Until then, no Sir, we won't accept your messages.
+ if (sync_entities.count(other) > 0) {
+ dout(1) << __func__ << " sync session already in progress for " << other
+ << " -- assumed as stray message." << dendl;
+ m->put();
+ return;
+ }
+
+ SyncEntity sync = get_sync_entity(other, this);
+ sync->version = paxos->get_version();
+ // a non-empty last_key in the request is recorded as the session's last received key
+ if (!m->last_key.first.empty() && !m->last_key.second.empty()) {
+ sync->last_received_key = m->last_key;
+ dout(10) << __func__ << " set last received key to ("
+ << sync->last_received_key.first << ","
+ << sync->last_received_key.second << ")" << dendl;
+ }
+
+ sync->sync_init();
+
+ sync_entities.insert(make_pair(other, sync));
+ sync_entities_states[other] = SYNC_STATE_START;
+ sync_role |= SYNC_ROLE_PROVIDER;
+
+ sync_send_chunks(sync);
+ m->put();
+}
+
+void Monitor::handle_sync_chunk_reply(MMonSync *m) // provider side: on a chunk ack, either close the session (FLAG_LAST) or send the next chunk
+{
+ dout(10) << __func__ << " " << *m << dendl;
+
+ entity_inst_t other = m->get_source_inst();
+ // only act on acks from entities with a session in SYNC_STATE_START
+ if (!(sync_role & SYNC_ROLE_PROVIDER)
+ || !sync_entities.count(other)
+ || (sync_entities_states[other] != SYNC_STATE_START)) {
+ dout(1) << __func__ << " ignored stray message from " << other << dendl;
+ m->put();
+ return;
+ }
+
+ if (m->flags & MMonSync::FLAG_LAST) {
+ // they acked the last chunk. Clean up.
+ sync_provider_cleanup(other);
+ m->put();
+ return;
+ }
+
+ sync_send_chunks(sync_entities[other]);
+ m->put();
+}
+
+void Monitor::sync_send_chunks(SyncEntity sync) // send the next chunk of 'sync' to its entity and re-arm the entity's sync timeout
+{
+ dout(10) << __func__ << " entity(" << sync->entity << ")" << dendl;
+
+ sync->cancel_timeout();
+ // callers must only get here while there is still a chunk to send
+ assert(sync->synchronizer.use_count() > 0);
+ assert(sync->synchronizer->has_next_chunk());
+
+ MMonSync *msg = new MMonSync(MMonSync::OP_CHUNK);
+
+ sync->synchronizer->get_chunk(msg->chunk_bl);
+ msg->last_key = sync->synchronizer->get_last_key();
+ dout(10) << __func__ << " last key ("
+ << msg->last_key.first << ","
+ << msg->last_key.second << ")" << dendl;
+
+ sync->sync_update();
+
+ if (sync->has_crc()) {
+ msg->flags |= MMonSync::FLAG_CRC;
+ msg->crc = sync->crc_get();
+ sync->crc_clear();
+ }
+
+ if (!sync->synchronizer->has_next_chunk()) { // nothing left after this chunk: mark it as the last one and attach the version
+ msg->flags |= MMonSync::FLAG_LAST;
+ msg->version = sync->get_version();
+ sync->synchronizer.reset();
+ }
+
+ sync->set_timeout(new C_SyncTimeout(this, sync->entity),
+ g_conf->mon_sync_timeout);
+ assert(g_conf->mon_sync_provider_kill_at != 3);
+ messenger->send_message(msg, sync->entity);
+ assert(g_conf->mon_sync_provider_kill_at != 4);
+
+ // kill the monitor as soon as we move into synchronizing the paxos versions.
+ // This is intended as debug.
+ if (sync->sync_state == SyncEntityImpl::STATE_PAXOS)
+ assert(g_conf->mon_sync_provider_kill_at != 5);
+
+
+}
+// end of synchronization provider
+
+// start of synchronization requester
+
+void Monitor::sync_requester_abort() // abort our own sync attempt: drop leader/provider state, clear the synced store data and bootstrap again
+{
+ dout(10) << __func__; // this log line is continued through *_dout below and only terminated by the dendl further down
+ assert(state == STATE_SYNCHRONIZING);
+ assert(sync_role == SYNC_ROLE_REQUESTER);
+
+ if (sync_leader.get() != NULL) {
+ *_dout << " " << sync_leader->entity;
+ sync_leader->cancel_timeout();
+ sync_leader.reset();
+ }
+
+ if (sync_provider.get() != NULL) {
+ *_dout << " " << sync_provider->entity;
+ sync_provider->cancel_timeout();
+ // only the provider is told about the abort
+ MMonSync *msg = new MMonSync(MMonSync::OP_ABORT);
+ messenger->send_message(msg, sync_provider->entity);
+
+ sync_provider.reset();
+ }
+ *_dout << " clearing potentially inconsistent store" << dendl;
+
+ // Given that we are explicitly aborting the whole sync process, we should
+ // play it safe and clear the store.
+ set<string> targets = get_sync_targets_names();
+ targets.insert("mon_sync");
+ store->clear(targets);
+
+ dout(1) << __func__ << " no longer a sync requester" << dendl;
+ sync_role = SYNC_ROLE_NONE;
+ sync_state = SYNC_STATE_NONE;
+
+ state = 0;
+
+ bootstrap();
+}
+
+/**
+ * Start Sync process
+ *
+ * Does nothing if we are already synchronizing as a requester. Otherwise:
+ * Reset any sync state held while acting as a leader for someone else;
+ * Move to STATE_SYNCHRONIZING / SYNC_ROLE_REQUESTER / SYNC_STATE_START;
+ * Clear the sync targets from the store and set the "in_sync" marker under
+ * the "mon_sync" prefix;
+ * Create SyncEntity instances for the leader and the provider;
+ * Send OP_START message to 'other', assumed to be (or to forward to) the leader;
+ * Set a timeout on the leader's reply
+ *
+ * @param other Synchronization provider to-be.
+ */
+void Monitor::sync_start(entity_inst_t &other)
+{
+ cancel_probe_timeout();
+
+ dout(10) << __func__ << " entity( " << other << " )" << dendl;
+ if ((state == STATE_SYNCHRONIZING) && (sync_role == SYNC_ROLE_REQUESTER)) {
+ dout(1) << __func__ << " already synchronizing; drop it" << dendl;
+ return;
+ }
+
+ // Looks like we are the acting leader for someone. Better force them to
+ // abort their endeavours. After all, if they are trying to sync from us,
+ // it means that we must have a higher paxos version than the one they
+ // have; however, if we are trying to sync as well, it must mean that
+ // someone has a higher version than the one we have. Everybody wins if
+ // we force them to cancel their sync and try again.
+ if (sync_role & SYNC_ROLE_LEADER) {
+ dout(10) << __func__ << " we are acting as a leader to someone; "
+ << "destroy their dreams" << dendl;
+
+ assert(trim_timeouts.size() > 0);
+ reset_sync(); // NOTE(review): OP_ABORT is only sent when reset_sync's 'abort' argument is true -- confirm its default matches the comment above
+ }
+
+ assert(sync_role == SYNC_ROLE_NONE);
+ assert(sync_state == SYNC_STATE_NONE);
+
+ state = STATE_SYNCHRONIZING;
+ sync_role = SYNC_ROLE_REQUESTER;
+ sync_state = SYNC_STATE_START;
+
+ // clear the underlying store, since we are starting a whole
+ // sync process from the bare beginning.
+ set<string> targets = get_sync_targets_names();
+ targets.insert("mon_sync");
+ store->clear(targets);
+ // mark the store as being mid-sync
+ MonitorDBStore::Transaction t;
+ t.put("mon_sync", "in_sync", 1);
+ store->apply_transaction(t);
+
+ // assume 'other' as the leader. We will update the leader once we receive
+ // a reply to the sync start.
+ entity_inst_t leader = other;
+ entity_inst_t provider = other;
+
+ if (!g_conf->mon_sync_debug_leader.empty()) {
+ leader = monmap->get_inst(g_conf->mon_sync_debug_leader);
+ dout(10) << __func__ << " assuming " << leader
+ << " as the leader for debug" << dendl;
+ }
+
+ if (!g_conf->mon_sync_debug_provider.empty()) {
+ provider = monmap->get_inst(g_conf->mon_sync_debug_provider);
+ dout(10) << __func__ << " assuming " << provider
+ << " as the provider for debug" << dendl;
+ }
+
+ sync_leader = get_sync_entity(leader, this);
+ sync_provider = get_sync_entity(provider, this);
+
+ // this message may bounce through 'other' (if 'other' is not the leader)
+ // in order to reach the leader. Therefore, set a higher timeout to allow
+ // breathing room for the reply message to reach us.
+ sync_leader->set_timeout(new C_SyncStartTimeout(this),
+ g_conf->mon_sync_trim_timeout*2);
+
+ MMonSync *m = new MMonSync(MMonSync::OP_START);
+ messenger->send_message(m, other);
+ assert(g_conf->mon_sync_requester_kill_at != 1);
+}
+
+void Monitor::sync_start_chunks(SyncEntity provider) // requester side: move to SYNC_STATE_CHUNKS and send OP_START_CHUNKS to 'provider'
+{
+ dout(10) << __func__ << " provider(" << provider->entity << ")" << dendl;
+
+ assert(sync_role == SYNC_ROLE_REQUESTER);
+ assert(sync_state == SYNC_STATE_START);
+
+ sync_state = SYNC_STATE_CHUNKS;
+
+ provider->set_timeout(new C_SyncTimeout(this, provider->entity),
+ g_conf->mon_sync_timeout);
+ MMonSync *msg = new MMonSync(MMonSync::OP_START_CHUNKS);
+ pair<string,string> last_key = provider->last_received_key;
+ if (!last_key.first.empty() && !last_key.second.empty()) // pass on the last key we received, when both halves are set
+ msg->last_key = last_key;
+
+ assert(g_conf->mon_sync_requester_kill_at != 4);
+ messenger->send_message(msg, provider->entity);
+ assert(g_conf->mon_sync_requester_kill_at != 5);
+}
+
+void Monitor::sync_start_reply_timeout() // requester side: no reply to our sync start arrived in time, so abort the sync attempt
+{
+ dout(10) << __func__ << dendl;
+
+ assert(state == STATE_SYNCHRONIZING);
+ assert(sync_role == SYNC_ROLE_REQUESTER);
+ assert(sync_state == SYNC_STATE_START);
+
+ // Restart the sync attempt. It's not as if we were going to lose a vast
+ // amount of work, and if we take into account that we are timing out while
+ // waiting for a reply from the Leader, it sure seems like the right path
+ // to take.
+ sync_requester_abort();
+}
+
+void Monitor::handle_sync_start_reply(MMonSync *m) // requester side: the leader answered our sync start; either back off (FLAG_RETRY) or start heartbeating and fetching chunks
+{
+ dout(10) << __func__ << " " << *m << dendl;
+
+ entity_inst_t other = m->get_source_inst();
+
+ if ((sync_role != SYNC_ROLE_REQUESTER)
+ || (sync_state != SYNC_STATE_START)) {
+ // If the leader has sent this message before we failed, there is no point
+ // in replying to it, as he has no idea that we actually received it. On
+ // the other hand, if he received one of our stray messages (because it was
+ // delivered once he got back up after failing) and replied accordingly,
+ // there is a chance that he did stop trimming on our behalf. However,
+ // we have no way to know it, and we really don't want to mess with his
+ // state if that is not the case. Therefore, just drop it and let the
+ // timeouts figure it out. Eventually.
+ dout(1) << __func__ << " stray message -- drop it." << dendl;
+ goto out;
+ }
+
+ assert(state == STATE_SYNCHRONIZING);
+ assert(sync_leader.get() != NULL);
+ assert(sync_provider.get() != NULL);
+
+ // We now know for sure who the leader is.
+ sync_leader->entity = other;
+ sync_leader->cancel_timeout();
+
+ if (m->flags & MMonSync::FLAG_RETRY) {
+ dout(10) << __func__ << " retrying sync at a later time" << dendl;
+ sync_role = SYNC_ROLE_NONE;
+ sync_state = SYNC_STATE_NONE;
+ sync_leader->set_timeout(new C_SyncStartRetry(this, sync_leader->entity),
+ g_conf->mon_sync_backoff_timeout);
+ goto out;
+ }
+
+ if (m->flags & MMonSync::FLAG_REPLY_TO) {
+ dout(10) << __func__ << " leader told us to use " << m->reply_to
+ << " as sync provider" << dendl;
+ sync_provider->entity = m->reply_to;
+ } else {
+ dout(10) << __func__ << " synchronizing from leader at " << other << dendl;
+ sync_provider->entity = other;
+ }
+
+ sync_leader->set_timeout(new C_HeartbeatTimeout(this),
+ g_conf->mon_sync_heartbeat_timeout);
+
+ assert(g_conf->mon_sync_requester_kill_at != 2);
+ sync_send_heartbeat(sync_leader->entity);
+ assert(g_conf->mon_sync_requester_kill_at != 3);
+
+ sync_start_chunks(sync_provider);
+out:
+ m->put();
+}
+
+void Monitor::handle_sync_heartbeat_reply(MMonSync *m) // requester side: a heartbeat reply from our sync leader replaces the leader timeout with a C_HeartbeatInterval
+{
+ dout(10) << __func__ << " " << *m << dendl;
+
+ entity_inst_t other = m->get_source_inst();
+ if ((sync_role != SYNC_ROLE_REQUESTER)
+ || (sync_state == SYNC_STATE_NONE)
+ || (sync_leader.get() == NULL)
+ || (other != sync_leader->entity)) {
+ dout(1) << __func__ << " stray message -- drop it." << dendl;
+ m->put();
+ return;
+ }
+ // the asserts below restate what the stray-message check above already established
+ assert(state == STATE_SYNCHRONIZING);
+ assert(sync_role == SYNC_ROLE_REQUESTER);
+ assert(sync_state != SYNC_STATE_NONE);
+
+ assert(sync_leader.get() != NULL);
+ assert(sync_leader->entity == other);
+
+ sync_leader->cancel_timeout();
+ sync_leader->set_timeout(new C_HeartbeatInterval(this, sync_leader->entity),
+ g_conf->mon_sync_heartbeat_interval);
+ m->put();
+}
+
+void Monitor::handle_sync_chunk(MMonSync *m) // requester side: ack a received chunk, apply it to the store, and stop the sync after the last chunk
+{
+ dout(10) << __func__ << " " << *m << dendl;
+
+ entity_inst_t other = m->get_source_inst();
+
+ if ((sync_role != SYNC_ROLE_REQUESTER)
+ || (sync_state != SYNC_STATE_CHUNKS)
+ || (sync_provider.get() == NULL)
+ || (other != sync_provider->entity)) {
+ dout(1) << __func__ << " stray message -- drop it." << dendl;
+ m->put();
+ return;
+ }
+
+ assert(state == STATE_SYNCHRONIZING);
+ assert(sync_role == SYNC_ROLE_REQUESTER);
+ assert(sync_state == SYNC_STATE_CHUNKS);
+
+ assert(sync_leader.get() != NULL);
+
+ assert(sync_provider.get() != NULL);
+ assert(other == sync_provider->entity);
+
+ sync_provider->cancel_timeout();
+
+ MonitorDBStore::Transaction tx;
+ tx.append_from_encoded(m->chunk_bl);
+
+ sync_provider->set_timeout(new C_SyncTimeout(this, sync_provider->entity),
+ g_conf->mon_sync_timeout);
+ sync_provider->last_received_key = m->last_key;
- reset();
+ MMonSync *msg = new MMonSync(MMonSync::OP_CHUNK_REPLY);
- // singleton monitor?
- if (monmap->size() == 1 && rank == 0) {
- win_standalone_election();
- return;
+ bool stop = false;
+ if (m->flags & MMonSync::FLAG_LAST) { // last chunk: record the provider's version as paxos' last_committed in the same transaction
+ msg->flags |= MMonSync::FLAG_LAST;
+ assert(m->version > 0);
+ tx.put(paxos->get_name(), "last_committed", m->version);
+ stop = true;
}
+ assert(g_conf->mon_sync_requester_kill_at != 8);
+ messenger->send_message(msg, sync_provider->entity); // NOTE(review): the ack is sent before the transaction is applied below
- reset_probe_timeout();
+ store->apply_transaction(tx);
- // i'm outside the quorum
- if (monmap->contains(name))
- outside_quorum.insert(name);
+ if (g_conf->mon_sync_debug && (m->flags & MMonSync::FLAG_CRC)) {
+ dout(10) << __func__ << " checking CRC" << dendl;
+ MonitorDBStore::Synchronizer sync;
+ if (m->flags & MMonSync::FLAG_LAST) {
+ dout(10) << __func__ << " checking CRC only for Paxos" << dendl;
+ string paxos_name("paxos"); // NOTE(review): literal "paxos" here, while paxos->get_name() is used above -- confirm they match
+ sync = store->get_synchronizer(paxos_name);
+ } else {
+ dout(10) << __func__ << " checking CRC for all prefixes" << dendl;
+ set<string> prefixes = get_sync_targets_names();
+ pair<string,string> empty_key;
+ sync = store->get_synchronizer(empty_key, prefixes);
+ }
- // probe monitors
- dout(10) << "probing other monitors" << dendl;
- for (unsigned i = 0; i < monmap->size(); i++) {
- if ((int)i != rank)
- messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined),
- monmap->get_inst(i));
- }
- for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
- p != extra_probe_peers.end();
- ++p) {
- if (*p != messenger->get_myaddr()) {
- entity_inst_t i;
- i.name = entity_name_t::MON(-1);
- i.addr = *p;
- messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined), i);
+ while (sync->has_next_chunk()) { // walk every chunk; the data itself is discarded, only the resulting crc is read
+ bufferlist bl;
+ sync->get_chunk(bl);
}
+ __u32 got_crc = sync->crc();
+ dout(10) << __func__ << " expected crc " << m->crc
+ << " got " << got_crc << dendl;
+
+ assert(m->crc == got_crc);
+ dout(10) << __func__ << " CRC matches" << dendl;
}
+
+ m->put();
+ if (stop)
+ sync_stop();
}
-void Monitor::_add_bootstrap_peer_hint(string cmd, string args, ostream& ss)
+void Monitor::sync_stop()  // Requester side: leave the chunk phase and send OP_FINISH to the sync leader.
{
- dout(10) << "_add_bootstrap_peer_hint '" << cmd << "' '" << args << "'" << dendl;
+ dout(10) << __func__ << dendl;
- entity_addr_t addr;
- const char *end = 0;
- if (!addr.parse(args.c_str(), &end)) {
- ss << "failed to parse addr '" << args << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
- return;
- }
+ assert(sync_role == SYNC_ROLE_REQUESTER);  // must be the requester ...
+ assert(sync_state == SYNC_STATE_CHUNKS);  // ... and still in the chunk-receiving state
- if (is_leader() || is_peon()) {
- ss << "mon already active; ignoring bootstrap hint";
+ sync_state = SYNC_STATE_STOP;  // handle_sync_finish_reply() only accepts a reply while in this state
+
+ sync_leader->cancel_timeout();  // cancel whatever timeout is pending on the leader; a new one is armed below
+ sync_provider->cancel_timeout();
+ sync_provider.reset();  // drop our handle to the provider; sync_provider is empty from here on
+
+ entity_inst_t leader = sync_leader->entity;
+
+ sync_leader->set_timeout(new C_SyncFinishReplyTimeout(this),
+ g_conf->mon_sync_timeout);  // NOTE(review): presumably fires sync_finish_reply_timeout() if no reply arrives -- confirm
+
+ MMonSync *msg = new MMonSync(MMonSync::OP_FINISH);
+ assert(g_conf->mon_sync_requester_kill_at != 9);  // trips (aborts) when mon_sync_requester_kill_at is 9, i.e. before the send
+ messenger->send_message(msg, leader);
+ assert(g_conf->mon_sync_requester_kill_at != 10);  // trips when the option is 10, i.e. right after the send
+}
+
+void Monitor::sync_finish_reply_timeout()  // Gives up on the current sync by calling sync_requester_abort().
+{  // NOTE(review): presumably invoked through C_SyncFinishReplyTimeout, which sync_stop() arms -- confirm
+ dout(10) << __func__ << dendl;
+ assert(state == STATE_SYNCHRONIZING);
+ assert(sync_leader.get() != NULL);
+ assert(sync_role == SYNC_ROLE_REQUESTER);
+ assert(sync_state == SYNC_STATE_STOP);  // STOP is the state sync_stop() sets before sending OP_FINISH
+
+ sync_requester_abort();
+}
+
+void Monitor::handle_sync_finish_reply(MMonSync *m)  // Requester side: handles OP_FINISH_REPLY; clears sync state and calls bootstrap().
+{
+ dout(10) << __func__ << " " << *m << dendl;
+ entity_inst_t other = m->get_source_inst();
+
+ if ((sync_role != SYNC_ROLE_REQUESTER)  // accept the reply only if we are a requester ...
+ || (sync_state != SYNC_STATE_STOP)  // ... in the STOP state ...
+ || (sync_leader.get() == NULL)
+ || (sync_leader->entity != other)) {  // ... and the sender is the leader we are tracking
+ dout(1) << __func__ << " stray message -- drop it." << dendl;
+ m->put();
return;
}
- if (addr.get_port() == 0)
- addr.set_port(CEPH_MON_PORT);
+ assert(sync_role == SYNC_ROLE_REQUESTER);  // these four asserts restate the guard above
+ assert(sync_state == SYNC_STATE_STOP);
- extra_probe_peers.insert(addr);
- ss << "adding peer " << addr << " to list: " << extra_probe_peers;
+ assert(sync_leader.get() != NULL);
+ assert(sync_leader->entity == other);
+
+ sync_role = SYNC_ROLE_NONE;  // we no longer take part in any sync
+ sync_state = SYNC_STATE_NONE;
+
+ sync_leader->cancel_timeout();
+ sync_leader.reset();
+
+ paxos->reapply_all_versions();
+
+ MonitorDBStore::Transaction t;
+ t.erase("mon_sync", "in_sync");  // drop the marker that start-up treats as "store potentially inconsistent, clear it"
+ store->apply_transaction(t);
+
+ init_paxos();
+
+ assert(g_conf->mon_sync_requester_kill_at != 11);  // trips (aborts) when mon_sync_requester_kill_at is 11
+
+ m->put();  // release the message before handing control to bootstrap()
+
+ bootstrap();
}
-// called by bootstrap(), or on leader|peon -> electing
-void Monitor::reset()
+void Monitor::handle_sync_abort(MMonSync *m)
{
- dout(10) << "reset" << dendl;
+ dout(10) << __func__ << " " << *m << dendl;
+ /* This function's responsibilities are manifold, and they depend on
+ * who we (the monitor) are and what our role in the sync is.
+ *
+ * If we are the sync requester (i.e., if we are synchronizing), it
+ * means that we *must* abort the current sync and bootstrap. This may
+ * be required if there was a leader change and we are talking to the
+ * wrong leader, which makes continuing with the current sync way too
+ * risky, given that a Paxos trim may be underway and we would run the
+ * risk of ending up with an inconsistent store state.
+ *
+ * If we are the sync provider, it means that the requester wants to
+ * abort his sync, either because he lost connectivity to the leader
+ * (i.e., his heartbeat timeout was triggered) or he became aware of a
+ * leader change.
+ *
+ * As a leader, we should never receive such a message though, unless we
+ * have just won an election, in which case we should have been a sync
+ * provider before. In such a case, we should behave as if we were a sync
+ * provider and clean up the requester's state.
+ */
+ entity_inst_t other = m->get_source_inst();
- timecheck_finish();
+ if ((sync_role == SYNC_ROLE_REQUESTER)
+ && (sync_leader.get() != NULL)
+ && (sync_leader->entity == other)) {  // requester case: honoured only when sent by the leader we are tracking
- leader_since = utime_t();
- if (!quorum.empty()) {
- exited_quorum = ceph_clock_now(g_ceph_context);
- }
- quorum.clear();
- outside_quorum.clear();
+ sync_requester_abort();
+ } else if ((sync_role & SYNC_ROLE_PROVIDER)
+ && (sync_entities.count(other) > 0)
+ && (sync_entities_states[other] == SYNC_STATE_START)) {  // provider case: sender is a known requester still in SYNC_STATE_START
- paxos->restart();
+ sync_provider_cleanup(other);
+ } else {
+ dout(1) << __func__ << " stray message -- drop it." << dendl;
+ }
+ m->put();  // the message is released on every path
+}
- for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); p++)
- (*p)->restart();
+/**
+ * Route a sync message to the handler that matches its operation type.
+ *
+ * Every known operation is forwarded, together with the message, to its
+ * dedicated handle_sync_*() function. An operation we do not know about is
+ * logged, the message is released and we assert out.
+ *
+ * @param m A sync-related message (i.e., of type MMonSync)
+ */
+void Monitor::handle_sync(MMonSync *m)
+{
+  dout(10) << __func__ << " " << *m << dendl;
+
+  switch (m->op) {
+  // requests
+  case MMonSync::OP_START:           handle_sync_start(m);           break;
+  case MMonSync::OP_HEARTBEAT:       handle_sync_heartbeat(m);       break;
+  case MMonSync::OP_START_CHUNKS:    handle_sync_start_chunks(m);    break;
+  case MMonSync::OP_CHUNK:           handle_sync_chunk(m);           break;
+  case MMonSync::OP_FINISH:          handle_sync_finish(m);          break;
+  case MMonSync::OP_ABORT:           handle_sync_abort(m);           break;
+
+  // replies
+  case MMonSync::OP_START_REPLY:     handle_sync_start_reply(m);     break;
+  case MMonSync::OP_HEARTBEAT_REPLY: handle_sync_heartbeat_reply(m); break;
+  case MMonSync::OP_CHUNK_REPLY:     handle_sync_chunk_reply(m);     break;
+  case MMonSync::OP_FINISH_REPLY:    handle_sync_finish_reply(m);    break;
+
+  default:
+    dout(0) << __func__ << " unknown op " << m->op << dendl;
+    m->put();
+    assert(0 == "unknown op");
+    break;
+  }
}
void Monitor::cancel_probe_timeout()
{
cancel_probe_timeout();
probe_timeout_event = new C_ProbeTimeout(this);
- double t = is_probing() ? g_conf->mon_probe_timeout : g_conf->mon_slurp_timeout;
+ double t = g_conf->mon_probe_timeout;
timer.add_event_after(t, probe_timeout_event);
dout(10) << "reset_probe_timeout " << probe_timeout_event << " after " << t << " seconds" << dendl;
}
void Monitor::probe_timeout(int r)
{
dout(4) << "probe_timeout " << probe_timeout_event << dendl;
- assert(is_probing() || is_slurping());
+ assert(is_probing() || is_synchronizing());
assert(probe_timeout_event);
probe_timeout_event = NULL;
bootstrap();
handle_probe_reply(m);
break;
- case MMonProbe::OP_SLURP:
- handle_probe_slurp(m);
- break;
-
- case MMonProbe::OP_SLURP_LATEST:
- handle_probe_slurp_latest(m);
- break;
-
- case MMonProbe::OP_DATA:
- handle_probe_data(m);
- break;
-
default:
m->put();
}
r->name = name;
r->quorum = quorum;
monmap->encode(r->monmap_bl, m->get_connection()->get_features());
- r->paxos_versions[paxos->get_name()] = paxos->get_version();
+ r->paxos_first_version = paxos->get_first_committed();
+ r->paxos_last_version = paxos->get_version();
messenger->send_message(r, m->get_connection());
// did we discover a peer here?
}
}
+ assert(paxos != NULL);
+
+ if (is_synchronizing()) {
+ dout(10) << " we are currently synchronizing, so that will continue."
+ << dendl;
+ m->put();
+ return;
+ }
+
+ entity_inst_t other = m->get_source_inst();
// is there an existing quorum?
if (m->quorum.size()) {
dout(10) << " existing quorum " << m->quorum << dendl;
- // do i need to catch up?
- bool ok = true;
- for (map<string,version_t>::iterator p = m->paxos_versions.begin();
- p != m->paxos_versions.end();
- ++p) {
- if (!paxos) {
- dout(0) << " peer has paxos machine " << p->first << " but i don't... weird" << dendl;
- continue; // weird!
- }
- if (paxos->is_slurping()) {
- dout(10) << " My paxos machine " << p->first
- << " is currently slurping, so that will continue. Peer has v "
- << p->second << dendl;
- ok = false;
- } else if (paxos->get_version() + g_conf->paxos_max_join_drift < p->second) {
- dout(10) << " peer paxos machine " << p->first << " v " << p->second
- << " vs my v " << paxos->get_version()
- << " (too far ahead)"
- << dendl;
- ok = false;
- } else {
- dout(10) << " peer paxos machine " << p->first << " v " << p->second
- << " vs my v " << paxos->get_version()
- << " (ok)"
- << dendl;
- }
+ if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) {
+ dout(10) << " peer paxos version " << m->paxos_last_version
+ << " vs my version " << paxos->get_version()
+ << " (too far ahead)"
+ << dendl;
+ sync_start(other);
+ m->put();
+ return;
}
- if (ok) {
- if (monmap->contains(name) &&
- !monmap->get_addr(name).is_blank_ip()) {
- // i'm part of the cluster; just initiate a new election
- start_election();
- } else {
- dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl;
- messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
- monmap->get_inst(*m->quorum.begin()));
- }
+ dout(10) << " peer paxos version " << m->paxos_last_version
+ << " vs my version " << paxos->get_version()
+ << " (ok)"
+ << dendl;
+
+ if (monmap->contains(name) &&
+ !monmap->get_addr(name).is_blank_ip()) {
+ // i'm part of the cluster; just initiate a new election
+ start_election();
} else {
- slurp_source = m->get_source_inst();
- slurp_versions = m->paxos_versions;
- slurp();
+ dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl;
+ messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
+ monmap->get_inst(*m->quorum.begin()));
}
} else {
- // not part of a quorum
- if (monmap->contains(m->name))
+ if (monmap->contains(m->name)) {
+ dout(10) << " mon." << m->name << " is outside the quorum" << dendl;
outside_quorum.insert(m->name);
- else
+ } else {
dout(10) << " mostly ignoring mon." << m->name << ", not part of monmap" << dendl;
+ m->put();
+ return;
+ }
+
+ if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) {
+ dout(10) << " peer paxos version " << m->paxos_last_version
+ << " vs my version " << paxos->get_version()
+ << " (too far ahead)"
+ << dendl;
+ sync_start(other);
+ m->put();
+ return;
+ }
unsigned need = monmap->size() / 2 + 1;
dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
-
if (outside_quorum.size() >= need) {
if (outside_quorum.count(name)) {
- dout(10) << " that's enough to form a new quorum, calling election" << dendl;
- start_election();
+ dout(10) << " that's enough to form a new quorum, calling election" << dendl;
+ start_election();
} else {
- dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl;
+ dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl;
}
} else {
- dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
- }
- }
- m->put();
-}
-
-/*
- * The whole slurp process is currently a bit of a hack. Given the
- * current storage model, we should be sharing code with Paxos to make
- * sure we copy the right content. But that model sucks and will
- * hopefully soon change, and it's less work to kludge around it here
- * than it is to make the current model clean.
- *
- * So: more or less duplicate the work of resyncing each paxos state
- * machine here. And move the monitor storage refactor stuff up the
- * todo list.
- *
- */
-
-void Monitor::slurp()
-{
- dout(10) << "slurp " << slurp_source << " " << slurp_versions << dendl;
-
- /*
- reset_probe_timeout();
-
- state = STATE_SLURPING;
-
- map<string,version_t>::iterator p = slurp_versions.begin();
- while (p != slurp_versions.end()) {
- Paxos *pax = get_paxos_by_name(p->first);
- if (!pax) {
- p++;
- continue;
- }
-
- dout(10) << " " << p->first << " v " << p->second << " vs my " << pax->get_version() << dendl;
- if (p->second > pax->get_version() ||
- pax->get_stashed_version() > pax->get_version()) {
- if (!pax->is_slurping()) {
- pax->start_slurping();
- }
- MMonProbe *m = new MMonProbe(monmap->fsid, MMonProbe::OP_SLURP, name, has_ever_joined);
- m->machine_name = p->first;
- m->oldest_version = pax->get_first_committed();
- m->newest_version = pax->get_version();
- messenger->send_message(m, slurp_source);
- return;
- }
-
- // latest?
- if (pax->get_first_committed() > 1 && // don't need it!
- pax->get_stashed_version() < pax->get_first_committed()) {
- if (!pax->is_slurping()) {
- pax->start_slurping();
- }
- MMonProbe *m = new MMonProbe(monmap->fsid, MMonProbe::OP_SLURP_LATEST, name, has_ever_joined);
- m->machine_name = p->first;
- m->oldest_version = pax->get_first_committed();
- m->newest_version = pax->get_version();
- messenger->send_message(m, slurp_source);
- return;
- }
-
- PaxosService *paxs = get_paxos_service_by_name(p->first);
- assert(paxs);
- paxs->update_from_paxos();
-
- pax->end_slurping();
-
- slurp_versions.erase(p++);
- }
-
- dout(10) << "done slurping" << dendl;
- bootstrap();
- */
-}
-
-MMonProbe *Monitor::fill_probe_data(MMonProbe *m, Paxos *pax)
-{
- dout(10) << __func__ << *m << dendl;
- /*
- MMonProbe *r = new MMonProbe(monmap->fsid, MMonProbe::OP_DATA, name, has_ever_joined);
- r->machine_name = m->machine_name;
- r->oldest_version = pax->get_first_committed();
- r->newest_version = pax->get_version();
-
- version_t v = MAX(pax->get_first_committed(), m->newest_version + 1);
- int len = 0;
- for (; v <= pax->get_version(); v++) {
- len += store->get(m->machine_name.c_str(), v,
- r->paxos_values[m->machine_name][v]);
-
- for (list<string>::iterator p = pax->extra_state_dirs.begin();
- p != pax->extra_state_dirs.end();
- ++p) {
- len += store->get(p->c_str(), v, r->paxos_values[*p][v]);
- }
- if (len >= g_conf->mon_slurp_bytes)
- break;
- }
-
- return r;
- */
- return NULL;
-}
-
-void Monitor::handle_probe_slurp(MMonProbe *m)
-{
- dout(10) << "handle_probe_slurp " << *m << dendl;
- /*
- Paxos *pax = get_paxos_by_name(m->machine_name);
- assert(pax);
-
- MMonProbe *r = fill_probe_data(m, pax);
- messenger->send_message(r, m->get_connection());
- */
- m->put();
-}
-
-void Monitor::handle_probe_slurp_latest(MMonProbe *m)
-{
- dout(10) << "handle_probe_slurp_latest " << *m << dendl;
- /*
- Paxos *pax = get_paxos_by_name(m->machine_name);
- assert(pax);
-
- MMonProbe *r = fill_probe_data(m, pax);
- r->latest_version = pax->get_stashed(r->latest_value);
-
- messenger->send_message(r, m->get_connection());
- */
- m->put();
-}
-
-void Monitor::handle_probe_data(MMonProbe *m)
-{
- dout(10) << "handle_probe_data " << *m << dendl;
- /*
- Paxos *pax = get_paxos_by_name(m->machine_name);
- assert(pax);
-
- // trim old cruft?
- if (m->oldest_version > pax->get_first_committed())
- pax->trim_to(m->oldest_version, true);
-
- // note new latest version?
- if (slurp_versions.count(m->machine_name))
- slurp_versions[m->machine_name] = m->newest_version;
-
- // store any new stuff
- if (m->paxos_values.size()) {
- map<string, map<version_t,bufferlist> >::iterator p;
-
- // bundle everything on a single transaction
- MonitorDBStore::Transaction t;
-
- for (p = m->paxos_values.begin(); p != m->paxos_values.end(); ++p) {
- store->put(&t, p->first.c_str(), p->second.begin(), p->second.end());
+ dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
}
-
- pax->last_committed = m->paxos_values.begin()->second.rbegin()->first;
- store->put(&t, m->machine_name.c_str(),
- "last_committed", pax->last_committed);
-
- store->apply_transaction(t);
- }
-
- // latest?
- if (m->latest_version) {
- pax->stash_latest(m->latest_version, m->latest_value);
}
-
m->put();
-
- slurp();
- */
}
void Monitor::start_election()
dout(10) << "lose_election, epoch " << epoch << " leader is mon" << leader
<< " quorum is " << quorum << " features are " << quorum_features << dendl;
+ // let everyone currently syncing know that we are no longer the leader and
+ // that they should all abort their on-going syncs
+ for (map<entity_inst_t,Context*>::iterator iter = trim_timeouts.begin();
+ iter != trim_timeouts.end();
+ ++iter) {
+ timer.cancel_event((*iter).second);
+ entity_inst_t entity = (*iter).first;
+ MMonSync *msg = new MMonSync(MMonSync::OP_ABORT);
+ messenger->send_message(msg, entity);
+ }
+ trim_timeouts.clear();
+ sync_role &= ~SYNC_ROLE_LEADER;
+
paxos->peon_init();
for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); p++)
(*p)->election_finished();
return false;
}
+/**
+ * Dump this monitor's view of the on-going store synchronizations as JSON.
+ *
+ * The output always carries the monitor state name and the paxos version,
+ * followed by up to three optional parts, depending on the roles we play:
+ *
+ *  - "trim": trim status and the monitors we hold trim timeouts for
+ *    (leader role);
+ *  - "on_going": the sync sessions tracked in @p sync_entities
+ *    (provider role);
+ *  - "leader" / "provider": who we are synchronizing from (requester role).
+ *
+ * Any mon_sync_*_kill_at debug option set to a positive value is dumped too.
+ *
+ * @param ss Stream the formatted output is flushed to.
+ */
+void Monitor::_sync_status(ostream& ss)
+{
+  JSONFormatter jf(true);
+  jf.open_object_section("sync_status");
+  jf.dump_string("state", get_state_name());
+  jf.dump_unsigned("paxos_version", paxos->get_version());
+
+  // sync_role is a bitmask (a monitor may be leader *and* provider at the
+  // same time), so test the bit instead of comparing for equality.
+  if (is_leader() || (sync_role & SYNC_ROLE_LEADER)) {
+    Mutex::Locker l(trim_lock);
+    jf.open_object_section("trim");
+    jf.dump_int("disabled", paxos->is_trim_disabled());
+    jf.dump_int("should_trim", paxos->should_trim());
+    if (!trim_timeouts.empty()) {
+      jf.open_array_section("mons");
+      for (map<entity_inst_t,Context*>::iterator it = trim_timeouts.begin();
+           it != trim_timeouts.end();
+           ++it) {
+        entity_inst_t e = (*it).first;
+        jf.dump_stream("mon") << e;
+        int s = -1;
+        if (trim_entities_states.count(e))
+          s = trim_entities_states[e];
+        jf.dump_stream("sync_state") << get_sync_state_name(s);
+      }
+      // close "mons"; without this the close below would end the array
+      // instead of "trim" and every later section would be off by one.
+      jf.close_section();
+    }
+    jf.close_section(); // trim
+  }
+
+  if (!sync_entities.empty() || (sync_role & SYNC_ROLE_PROVIDER)) {
+    jf.open_array_section("on_going");
+    for (map<entity_inst_t,SyncEntity>::iterator it = sync_entities.begin();
+         it != sync_entities.end();
+         ++it) {
+      entity_inst_t e = (*it).first;
+      jf.open_object_section("mon");
+      jf.dump_stream("addr") << e;
+      jf.dump_string("state", (*it).second->get_state());
+      int s = -1;
+      if (sync_entities_states.count(e))
+        s = sync_entities_states[e];
+      jf.dump_stream("sync_state") << get_sync_state_name(s);
+
+      jf.close_section(); // mon
+    }
+    jf.close_section(); // on_going
+  }
+
+  if (is_synchronizing() || (sync_role == SYNC_ROLE_REQUESTER)) {
+    // either handle may be unset at this point, hence the NULL checks
+    jf.open_object_section("leader");
+    SyncEntity sync_entity = sync_leader;
+    if (sync_entity.get() != NULL)
+      jf.dump_stream("addr") << sync_entity->entity;
+    jf.close_section();
+
+    jf.open_object_section("provider");
+    sync_entity = sync_provider;
+    if (sync_entity.get() != NULL)
+      jf.dump_stream("addr") << sync_entity->entity;
+    jf.close_section();
+  }
+
+  if (g_conf->mon_sync_leader_kill_at > 0)
+    jf.dump_int("leader_kill_at", g_conf->mon_sync_leader_kill_at);
+  if (g_conf->mon_sync_provider_kill_at > 0)
+    jf.dump_int("provider_kill_at", g_conf->mon_sync_provider_kill_at);
+  if (g_conf->mon_sync_requester_kill_at > 0)
+    jf.dump_int("requester_kill_at", g_conf->mon_sync_requester_kill_at);
+
+  jf.close_section(); // sync_status
+  jf.flush(ss);
+}
+
+/**
+ * Mark the store so that a full sync is forced on the next start.
+ *
+ * Writes the ("mon_sync", "force_sync") key to the store -- the same key the
+ * start-up code checks before clearing the store state -- and then reports
+ * the outcome as a small JSON object.
+ *
+ * @param ss Stream the formatted output is flushed to.
+ */
+void Monitor::_sync_force(ostream& ss)
+{
+  // persist the marker first
+  {
+    MonitorDBStore::Transaction t;
+    t.put("mon_sync", "force_sync", 1);
+    store->apply_transaction(t);
+  }
+
+  // then tell the caller what is going to happen
+  JSONFormatter f(true);
+  f.open_object_section("sync_force");
+  f.dump_int("ret", 0);
+  f.dump_stream("msg") << "forcing store sync the next time the monitor starts";
+  f.close_section();
+  f.flush(ss);
+}
+
void Monitor::_quorum_status(ostream& ss)
{
JSONFormatter jf(true);
jf.dump_string("mon", *p);
jf.close_section();
- if (is_slurping()) {
- jf.dump_stream("slurp_source") << slurp_source;
- jf.open_object_section("slurp_version");
- for (map<string,version_t>::iterator p = slurp_versions.begin(); p != slurp_versions.end(); ++p)
- jf.dump_int(p->first.c_str(), p->second);
- jf.close_section();
+ if (is_synchronizing()) {
+ jf.dump_stream("sync_leader") << sync_leader->entity;
+ jf.dump_stream("sync_provider") << sync_provider->entity;
}
jf.open_object_section("monmap");
_mon_status(ss);
rs = ss.str();
r = 0;
+ } else if (m->cmd[0] == "sync") {
+ if (!access_r) {
+ r = -EACCES;
+ rs = "access denied";
+ goto out;
+ }
+ if (m->cmd[1] == "status") {
+ stringstream ss;
+ _sync_status(ss);
+ rs = ss.str();
+ r = 0;
+ } else if (m->cmd[1] == "force") {
+ if (m->cmd.size() < 4 || m->cmd[2] != "--yes-i-really-mean-it"
+ || m->cmd[3] != "--i-know-what-i-am-doing") {
+ r = -EINVAL;
+ rs = "are you SURE? this will mean the monitor store will be "
+ "erased. pass '--yes-i-really-mean-it "
+ "--i-know-what-i-am-doing' if you really do.";
+ goto out;
+ }
+ stringstream ss;
+ _sync_force(ss);
+ rs = ss.str();
+ r = 0;
+ } else {
+ rs = "unknown command";
+ r = -EINVAL;
+ goto out;
+ }
} else if (m->cmd[0] == "heap") {
if (!access_all) {
r = -EACCES;
handle_probe((MMonProbe*)m);
break;
+ // Sync (i.e., the new slurp, but on steroids)
+ case MSG_MON_SYNC:
+ handle_sync((MMonSync*)m);
+ break;
+
// OSDs
case MSG_OSD_FAILURE:
case MSG_OSD_BOOT:
break;
}
+ if (state == STATE_SYNCHRONIZING) {
+ // we are synchronizing. These messages would do us no
+ // good, thus just drop them and ignore them.
+ dout(10) << __func__ << " ignore paxos msg from "
+ << pm->get_source_inst() << dendl;
+ pm->put();
+ break;
+ }
+
// sanitize
if (pm->epoch > get_epoch()) {
bootstrap();
m->put();
break;
}
- if (!is_probing() && !is_slurping()) {
+ if (!is_probing() && !is_synchronizing()) {
elector.dispatch(m);
} else {
m->put();
// ok go.
dout(11) << "tick" << dendl;
- if (!is_slurping()) {
- for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); p++) {
- (*p)->tick();
- }
+ for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); p++) {
+ (*p)->tick();
}
// trim sessions
#include "osd/OSDMap.h"
#include "common/LogClient.h"
+#include "common/SimpleRNG.h"
#include "auth/cephx/CephxKeyServer.h"
#include "auth/AuthMethodList.h"
#include "mon/MonitorDBStore.h"
#include <memory>
+#include <tr1/memory>
#include <errno.h>
class MMonGetMap;
class MMonGetVersion;
+class MMonSync;
class MMonProbe;
class MMonSubscribe;
class MAuthRotating;
private:
enum {
STATE_PROBING = 1,
- STATE_SLURPING,
+ STATE_SYNCHRONIZING,
STATE_ELECTING,
STATE_LEADER,
STATE_PEON,
static const char *get_state_name(int s) {
switch (s) {
case STATE_PROBING: return "probing";
- case STATE_SLURPING: return "slurping";
+ case STATE_SYNCHRONIZING: return "synchronizing";
case STATE_ELECTING: return "electing";
case STATE_LEADER: return "leader";
case STATE_PEON: return "peon";
}
}
- const char *get_state_name() const {
- return get_state_name(state);
+ /**
+  * Name of the monitor's current state, followed by the name of its current
+  * sync state.
+  *
+  * The result is returned by value: the combined name is built in a local
+  * string, so handing out its c_str() would leave the caller with a pointer
+  * into storage that is destroyed when this function returns.
+  */
+ string get_state_name() const {
+   string sn(get_state_name(state));
+   string sync_name(get_sync_state_name());
+   sn.append(sync_name);
+   return sn;
}
bool is_probing() const { return state == STATE_PROBING; }
- bool is_slurping() const { return state == STATE_SLURPING; }
+ bool is_synchronizing() const { return state == STATE_SYNCHRONIZING; }
bool is_electing() const { return state == STATE_ELECTING; }
bool is_leader() const { return state == STATE_LEADER; }
bool is_peon() const { return state == STATE_PEON; }
uint64_t quorum_features; ///< intersection of quorum member feature bits
set<string> outside_quorum;
- entity_inst_t slurp_source;
- map<string,version_t> slurp_versions;
+
+ /**
+ * @defgroup Synchronization
+ * @{
+ */
+ /**
+ * Obtain the synchronization target prefixes in set form.
+ *
+ * We consider a target prefix all those that are relevant when
+ * synchronizing two stores. That is, all those that hold paxos service's
+ * versions, as well as paxos versions, or any control keys such as the
+ * first or last committed version.
+ *
+ * Given the current design, this function should return the name of all and
+ * any available paxos service, plus the paxos name.
+ *
+ * @returns a set of strings referring to the prefixes being synchronized
+ */
+ set<string> get_sync_targets_names();
+ /**
+ * Handle a sync-related message
+ *
+ * This function will call the appropriate handling functions for each
+ * operation type.
+ *
+ * @param m A sync-related message (i.e., of type MMonSync)
+ */
+ void handle_sync(MMonSync *m);
+ /**
+ * Handle a sync-related message of operation type OP_ABORT.
+ *
+ * @param m A sync-related message of type OP_ABORT
+ */
+ void handle_sync_abort(MMonSync *m);
+ /**
+ * Reset the monitor's sync-related data structures and state.
+ */
+ void reset_sync(bool abort = false);
+
+ /**
+ * @defgroup Synchronization_Roles
+ * @{
+ */
+ /**
+ * The monitor has no role in any on-going synchronization.
+ */
+ static const uint8_t SYNC_ROLE_NONE = 0x0;
+ /**
+ * The monitor is the Leader in at least one synchronization.
+ */
+ static const uint8_t SYNC_ROLE_LEADER = 0x1;
+ /**
+ * The monitor is the Provider in at least one synchronization.
+ */
+ static const uint8_t SYNC_ROLE_PROVIDER = 0x2;
+ /**
+ * The monitor is a requester in the on-going synchronization.
+ */
+ static const uint8_t SYNC_ROLE_REQUESTER = 0x4;
+
+ /**
+ * The monitor's current role in on-going synchronizations, if any.
+ *
+ * A monitor can either be part of no synchronization at all, in which case
+ * @p sync_role shall hold the value @p SYNC_ROLE_NONE, or it can be part of
+ * an on-going synchronization, in which case it may be playing either one or
+ * two roles at the same time:
+ *
+ * - If the monitor is the sync requester (i.e., be the one synchronizing
+ * against some other monitor), the @p sync_role field will hold only the
+ * @p SYNC_ROLE_REQUESTER value.
+ * - Otherwise, the monitor can be either a sync leader, or a sync provider,
+ * or both, in which case @p sync_role will hold a binary OR of both
+ * @p SYNC_ROLE_LEADER and @p SYNC_ROLE_PROVIDER.
+ */
+ uint8_t sync_role;
+ /**
+ * @}
+ */
+ /**
+ * @defgroup Leader-specific
+ * @{
+ */
+ /**
+ * Guarantee mutual exclusion access to the @p trim_timeouts map.
+ *
+ * We need this mutex specially when we have a monitor starting a sync with
+ * the leader and another one finishing or aborting an on-going sync, that
+ * happens to be the last on-going trim on the map. Given that we will
+ * enable the Paxos trim once we deplete the @p trim_timeouts map, we must
+ * then ensure that we either add the new sync start to the map before
+ * removing the one just finishing, or that we remove the finishing one
+ * first and enable the trim before we add the new one. If we fail to do
+ * this, nasty repercussions could follow.
+ */
+ Mutex trim_lock;
+ /**
+ * Map holding all on-going syncs' timeouts.
+ *
+ * An on-going sync leads to the Paxos trim to be suspended, and this map
+ * will associate entities to the timeouts to be triggered if the monitor
+ * being synchronized fails to check-in with the leader, letting him know
+ * that the sync is still in effect and that in no circumstances should the
+ * Paxos trim be enabled.
+ */
+ map<entity_inst_t, Context*> trim_timeouts;
+ map<entity_inst_t, uint8_t> trim_entities_states;
+ /**
+ * Map associating monitors to a sync state.
+ *
+ * This map is used by both the Leader and the Sync Provider, and has the
+ * sole objective of keeping track of the state each monitor's sync process
+ * is in.
+ */
+ map<entity_inst_t, uint8_t> sync_entities_states;
+ /**
+ * Timer that will enable the Paxos trim.
+ *
+ * This timer is set after the @p trim_timeouts map is depleted, and once
+ * fired it will enable the Paxos trim (if still disabled). By setting
+ * this timer, we avoid a scenario in which a monitor has just finished
+ * synchronizing, but because the Paxos trim had been disabled for a long,
+ * long time and a lot of trims were proposed in the timespan of the monitor
+ * finishing its sync and actually joining the cluster, the monitor happens
+ * to be out-of-sync yet again. Backing off enabling the Paxos trim will
+ * allow the other monitor to join the cluster before actually trimming.
+ */
+ Context *trim_enable_timer;
+
+ /**
+ * Callback class responsible for finishing a monitor's sync session on the
+ * leader's side, because the said monitor failed to acknowledge its
+ * liveliness in a timely manner, thus being assumed as failed.
+ */
+ struct C_TrimTimeout : public Context {
+   Monitor *mon;          ///< monitor whose sync_finish() gets called
+   entity_inst_t entity;  ///< monitor whose sync session timed out
+
+   C_TrimTimeout(Monitor *m, entity_inst_t& who)
+     : mon(m),
+       entity(who)
+   { }
+
+   /// Timer callback: finish the session of @p entity on our side.
+   void finish(int r) {
+     mon->sync_finish(entity);
+   }
+ };
+
+ /**
+ * Callback class responsible for enabling the Paxos trim if there are no
+ * more on-going syncs.
+ */
+ struct C_TrimEnable : public Context {
+   Monitor *mon;  ///< monitor whose Paxos trim we may re-enable
+
+   C_TrimEnable(Monitor *m) : mon(m) { }
+
+   /// Timer callback: re-enable the Paxos trim if no sync is on-going.
+   void finish(int r) {
+     // The guard has to be a *named* object. An unnamed
+     // 'Mutex::Locker(mon->trim_lock);' is a temporary that is destroyed at
+     // the end of that very statement, i.e. the lock would be released
+     // before we ever look at trim_timeouts.
+     Mutex::Locker l(mon->trim_lock);
+     // even if we are no longer the leader, we should re-enable trim if
+     // we have disabled it in the past. It doesn't mean we are going to
+     // do anything about it, but if we happen to become the leader
+     // sometime down the future, we sure want to have the trim enabled.
+     if (mon->trim_timeouts.empty())
+       mon->paxos->trim_enable();
+     mon->trim_enable_timer = NULL;
+   }
+ };
+
+ /**
+ * Send a heartbeat message to another entity.
+ *
+ * The sent message may be a heartbeat reply if the @p reply parameter is
+ * set to true.
+ *
+ * This function is used both by the leader (always with @p reply = true),
+ * and by the sync requester (always with @p reply = false).
+ *
+ * @param other The target monitor's entity instance.
+ * @param reply Whether the message to be sent should be a heartbeat reply.
+ */
+ void sync_send_heartbeat(entity_inst_t &other, bool reply = false);
+ /**
+ * Handle a Sync Start request.
+ *
+ * Monitors wanting to synchronize with the cluster will have to first ask
+ * the leader to do so. The only objective of this is to guarantee that the
+ * leader won't trim the paxos state.
+ *
+ * The leader may not be the only one receiving this request. A sync provider
+ * may also receive it when it is taken as the point of entry onto the
+ * cluster. In this scenario, the provider must then forward this request to
+ * the leader, if he knows of one, or assume himself as the leader for this
+ * sync purpose (this may happen if there is no formed quorum).
+ *
+ * @param m Sync message with operation type MMonSync::OP_START
+ */
+ void handle_sync_start(MMonSync *m);
+ /**
+ * Handle a Heartbeat sent by a sync requester.
+ *
+ * We use heartbeats as a way to guarantee that both the leader and the sync
+ * requester are still alive. Receiving this message means that the requester
+ * is still working on getting his store synchronized.
+ *
+ * @param m Sync message with operation type MMonSync::OP_HEARTBEAT
+ */
+ void handle_sync_heartbeat(MMonSync *m);
+ /**
+ * Handle a Sync Finish.
+ *
+ * A MMonSync::OP_FINISH is the way the sync requester has to inform the
+ * leader that he finished synchronizing his store.
+ *
+ * @param m Sync message with operation type MMonSync::OP_FINISH
+ */
+ void handle_sync_finish(MMonSync *m);
+ /**
+ * Finish a given monitor's sync process on the leader's side.
+ *
+ * This means cleaning up the state referring to the monitor whose sync has
+ * finished (may it have been finished successfully, by receiving a message
+ * with type MMonSync::OP_FINISH, or due to the assumption that the said
+ * monitor failed).
+ *
+ * If we happen to know of no other monitor synchronizing, we may then enable
+ * the paxos trim.
+ *
+ * @param entity Entity instance of the monitor whose sync we are considering
+ * as finished.
+ * @param abort If true, we consider this sync has finished due to an abort.
+ */
+ void sync_finish(entity_inst_t &entity, bool abort = false);
+ /**
+ * Abort a given monitor's sync process on the leader's side.
+ *
+ * This function is a wrapper for Monitor::sync_finish().
+ *
+ * @param entity Entity instance of the monitor whose sync we are aborting.
+ */
+ void sync_finish_abort(entity_inst_t &entity) {
+   // same as a regular finish, but flagged as the result of an abort
+   const bool aborted = true;
+   sync_finish(entity, aborted);
+ }
+ /**
+ * @} // Leader-specific
+ */
+ /**
+ * @defgroup Synchronization Provider-specific
+ * @{
+ */
+ /**
+ * Represents a participant in a synchronization, along with its state.
+ *
+ * This class is used to track down all the sync requesters we are providing
+ * to. In such scenario, it won't be uncommon to have the @p synchronizer
+ * field set with a connection to the MonitorDBStore, the @p timeout field
+ * containing a timeout event and @p entity containing the entity instance
+ * of the monitor we are here representing.
+ *
+ * The sync requester will also use this class to represent both the sync
+ * leader and the sync provider.
+ */
+ struct SyncEntityImpl {
+
+   /**
+    * Store synchronization related Sync state.
+    */
+   enum {
+     /**
+      * No state whatsoever. We are not providing any sync support.
+      */
+     STATE_NONE = 0,
+     /**
+      * This entity's sync effort is currently focused on reading and sharing
+      * our whole store state with @p entity. This means all the entries in
+      * the key/value space.
+      */
+     STATE_WHOLE = 1,
+     /**
+      * This entity's sync effort is currently focused on reading and sharing
+      * our Paxos state with @p entity. This means all the Paxos-related
+      * key/value entries, such as the Paxos versions.
+      */
+     STATE_PAXOS = 2
+   };
+
+   /**
+    * The entity instance of the monitor whose sync effort we are representing.
+    */
+   entity_inst_t entity;
+   /**
+    * Our Monitor.
+    */
+   Monitor *mon;
+   /**
+    * The Paxos version we are focusing on.
+    *
+    * @note This is not used at the moment. We are still assessing whether we
+    *       need it.
+    */
+   version_t version;
+   /**
+    * Timeout event. Its type and purpose varies depending on the situation.
+    */
+   Context *timeout;
+   /**
+    * Last key received during a sync effort.
+    *
+    * This field is mainly used by the sync requester to track the last
+    * received key, in case he needs to switch providers due to failure. The
+    * sync provider will also use this field whenever the requester specifies
+    * a last received key when requesting the provider to start sending his
+    * store chunks.
+    */
+   pair<string,string> last_received_key;
+   /**
+    * Hold the Store Synchronization related Sync State.
+    */
+   int sync_state;
+   /**
+    * The MonitorDBStore's chunk iterator instance we are currently using
+    * to obtain the store's chunks and pack them to the sync requester.
+    */
+   MonitorDBStore::Synchronizer synchronizer;
+   /**
+    * Chunk iterator restricted to the "paxos" prefix; sync_update() swaps it
+    * into @p synchronizer once the STATE_WHOLE pass has been exhausted.
+    */
+   MonitorDBStore::Synchronizer paxos_synchronizer;
+   /* Should only be used for debugging purposes */
+   /**
+    * crc of the contents read from the store.
+    *
+    * Starts out as 0; @p crc_available tells whether it has actually been
+    * set through crc_set().
+    *
+    * @note may not always be available, as it is used only on specific
+    *       points in time during the sync process.
+    * @note depends on '--mon-sync-debug' being set.
+    */
+   __u32 crc;
+   /**
+    * Should be true if @p crc has been set.
+    */
+   bool crc_available;
+   /**
+    * Total synchronization attempts.
+    */
+   int attempts;
+
+   /**
+    * Build a sync entity for @p entity, owned by monitor @p mon.
+    *
+    * Every scalar member is given a defined initial value, including @p crc,
+    * so that crc_get() never reads an indeterminate value.
+    *
+    * @param entity Entity instance of the monitor being represented.
+    * @param mon The Monitor.
+    */
+   SyncEntityImpl(entity_inst_t &entity, Monitor *mon)
+     : entity(entity),
+       mon(mon),
+       version(0),
+       timeout(NULL),
+       sync_state(STATE_NONE),
+       crc(0),
+       crc_available(false),
+       attempts(0)
+   { }
+
+   /**
+    * Obtain current Sync State name.
+    *
+    * @returns Name of current sync state.
+    */
+   string get_state() const {
+     switch (sync_state) {
+     case STATE_NONE: return "none";
+     case STATE_WHOLE: return "whole";
+     case STATE_PAXOS: return "paxos";
+     default: return "unknown";
+     }
+   }
+   /**
+    * Obtain the paxos version at which this sync started.
+    *
+    * @returns Paxos version at which this sync started
+    */
+   version_t get_version() const {
+     return version;
+   }
+   /**
+    * Set a timeout event for this sync entity.
+    *
+    * Any previously set timeout is cancelled first.
+    *
+    * @param event Timeout class to be called after @p fire_after seconds.
+    * @param fire_after Number of seconds until we fire the @p event event.
+    */
+   void set_timeout(Context *event, double fire_after) {
+     cancel_timeout();
+     timeout = event;
+     mon->timer.add_event_after(fire_after, timeout);
+   }
+   /**
+    * Cancel the currently set timeout, if any.
+    */
+   void cancel_timeout() {
+     if (timeout)
+       mon->timer.cancel_event(timeout);
+     timeout = NULL;
+   }
+   /**
+    * Initiate the required fields for obtaining chunks out of the
+    * MonitorDBStore.
+    *
+    * This function will initiate @p synchronizer with a chunk iterator whose
+    * scope is all the keys/values that belong to one of the sync targets
+    * (i.e., paxos services or paxos).
+    *
+    * Calling sync_update() will be essential during the efforts of providing
+    * a correct store state to the requester, since we will need to
+    * eventually update the iterator in order to start packing the Paxos
+    * versions.
+    */
+   void sync_init() {
+     sync_state = STATE_WHOLE;
+     set<string> sync_targets = mon->get_sync_targets_names();
+
+     string prefix("paxos");
+     paxos_synchronizer = mon->store->get_synchronizer(prefix);
+     version = mon->paxos->get_version();
+     generic_dout(10) << __func__ << " version " << version << dendl;
+
+     synchronizer = mon->store->get_synchronizer(last_received_key,
+                                                 sync_targets);
+     // May already switch to the paxos iterator if the whole-store iterator
+     // has nothing to offer; whichever is active must have a chunk to send.
+     sync_update();
+     assert(synchronizer->has_next_chunk());
+   }
+   /**
+    * Update the @p synchronizer chunk iterator, if needed.
+    *
+    * Whenever we reach the end of the iterator during @p STATE_WHOLE, we
+    * must update the @p synchronizer to an iterator focused on reading only
+    * Paxos versions. This is an essential part of the sync store approach,
+    * and it will guarantee that we end up with a consistent store.
+    */
+   void sync_update() {
+     assert(sync_state != STATE_NONE);
+     assert(synchronizer.use_count() != 0);
+
+     if (!synchronizer->has_next_chunk()) {
+       // Record the crc of the iterator we just exhausted.
+       crc_set(synchronizer->crc());
+       if (sync_state == STATE_WHOLE) {
+         assert(paxos_synchronizer.use_count() != 0);
+         sync_state = STATE_PAXOS;
+         synchronizer = paxos_synchronizer;
+       }
+     }
+   }
+
+   /* For debug purposes only */
+   /**
+    * Check if we have a CRC available.
+    *
+    * @returns true if crc is available; false otherwise.
+    */
+   bool has_crc() const {
+     return (g_conf->mon_sync_debug && crc_available);
+   }
+   /**
+    * Set @p crc to @p to_set
+    *
+    * @param to_set a crc value to set.
+    */
+   void crc_set(__u32 to_set) {
+     crc = to_set;
+     crc_available = true;
+   }
+   /**
+    * Get the current CRC value from @p crc
+    *
+    * @returns the current CRC value from @p crc (0 if it was never set)
+    */
+   __u32 crc_get() const {
+     return crc;
+   }
+   /**
+    * Clear the current CRC.
+    *
+    * Only the availability flag is reset; the stored value is left as is.
+    */
+   void crc_clear() {
+     crc_available = false;
+   }
+ };
+ typedef std::tr1::shared_ptr< SyncEntityImpl > SyncEntity;
+ /**
+ * Get a Monitor::SyncEntity instance.
+ *
+ * @param entity The monitor's entity instance that we want to associate
+ * with this Monitor::SyncEntity.
+ * @param mon The Monitor.
+ *
+ * @returns A Monitor::SyncEntity
+ */
+ SyncEntity get_sync_entity(entity_inst_t &entity, Monitor *mon) {
+   // The freshly allocated impl is handed straight to the shared pointer.
+   SyncEntity sync_entity(new SyncEntityImpl(entity, mon));
+   return sync_entity;
+ }
+ /**
+ * Callback class responsible for dealing with the consequences of a sync
+ * process timing out.
+ */
+ struct C_SyncTimeout : public Context {
+   Monitor *mon;          // monitor notified when this event fires
+   entity_inst_t entity;  // copy of the entity whose sync is being timed
+
+   C_SyncTimeout(Monitor *m, entity_inst_t &e)
+     : mon(m), entity(e)
+   { }
+
+   // Hand the stored entity to Monitor::sync_timeout(); the result code is
+   // not looked at.
+   void finish(int) {
+     mon->sync_timeout(entity);
+   }
+ };
+ /**
+ * Map containing all the monitor entities to whom we are acting as sync
+ * providers.
+ */
+ map<entity_inst_t, SyncEntity> sync_entities;
+ /**
+ * RNG used for the sync (currently only used to pick random monitors)
+ */
+ SimpleRNG sync_rng;
+ /**
+ * Obtain random monitor from the monmap.
+ *
+ * @param other Any monitor other than the one with rank @p other
+ * @returns The picked monitor's name.
+ */
+ string _pick_random_mon(int other = -1);
+ int _pick_random_quorum_mon(int other = -1);
+ /**
+ * Deal with the consequences of @p entity's sync timing out.
+ *
+ * @note Both the sync provider and the sync requester make use of this
+ * function, since both use the @p Monitor::C_SyncTimeout callback.
+ *
+ * Being the sync provider, whenever a Monitor::C_SyncTimeout is triggered,
+ * we only have to clean up the sync requester's state we are maintaining.
+ *
+ * Being the sync requester, we will have to choose a new sync provider, and
+ * resume our sync from where it was left.
+ *
+ * @param entity Entity instance of the monitor whose sync has timed out.
+ */
+ void sync_timeout(entity_inst_t &entity);
+ /**
+ * Cleanup the state we, the provider, are keeping during @p entity's sync.
+ *
+ * @param entity Entity instance of the monitor whose sync state we are
+ * cleaning up.
+ */
+ void sync_provider_cleanup(entity_inst_t &entity);
+ /**
+ * Handle a Sync Start Chunks request from a sync requester.
+ *
+ * This request will create the necessary state on the provider's end, and
+ * the provider will then be able to send chunks of his own store to the
+ * requester.
+ *
+ * @param m Sync message with operation type MMonSync::OP_START_CHUNKS
+ */
+ void handle_sync_start_chunks(MMonSync *m);
+ /**
+ * Handle a requester's reply to the last chunk we sent him.
+ *
+ * We will only send a new chunk to the sync requester once he has acked the
+ * reception of the last chunk we sent them.
+ *
+ * That's also how we will make sure that, on their end, they became aware
+ * that there are no more chunks to send (since we shall tag a message with
+ * MMonSync::FLAG_LAST when we are sending them the last chunk of all),
+ * allowing us to clean up the requester's state.
+ *
+ * @param m Sync message with operation type MMonSync::OP_CHUNK_REPLY
+ */
+ void handle_sync_chunk_reply(MMonSync *m);
+ /**
+ * Send a chunk to the sync entity represented by @p sync.
+ *
+ * This function will send the next chunk available on the synchronizer. If
+ * it happens to be the last chunk, then the message shall be marked as
+ * such using MMonSync::FLAG_LAST.
+ *
+ * @param sync A Monitor::SyncEntity representing a sync requester monitor.
+ */
+ void sync_send_chunks(SyncEntity sync);
+ /**
+ * @} // Synchronization Provider-specific
+ */
+ /**
+ * @defgroup Synchronization Requester-specific
+ * @{
+ */
+ /**
+ * The state in which we (the sync leader, provider or requester) are in
+ * regard to our sync process (if we are the requester) or any entity that
+ * we may be leading or providing to.
+ */
+ enum {
+ /**
+ * We are not part of any synchronization effort, or it has not begun yet.
+ */
+ SYNC_STATE_NONE = 0,
+ /**
+ * We have started our role in the synchronization.
+ *
+ * This state may have multiple meanings, depending on which entity is
+ * employing it and within which context.
+ *
+ * For instance, the leader will consider a sync requester to enter
+ * SYNC_STATE_START whenever it receives a MMonSync::OP_START from the
+ * said requester. On the other hand, the provider will consider that the
+ * requester enters this state after receiving a MMonSync::OP_START_CHUNKS.
+ * The sync requester will enter this state as soon as it begins its sync
+ * efforts.
+ */
+ SYNC_STATE_START = 1,
+ /**
+ * We are synchronizing chunks.
+ *
+ * This state is not used by the sync leader; only the sync requester and
+ * the sync provider will.
+ */
+ SYNC_STATE_CHUNKS = 2,
+ /**
+ * We are stopping the sync effort.
+ */
+ SYNC_STATE_STOP = 3
+ };
+ /**
+ * The current sync state.
+ *
+ * This field is only used by the sync requester, being the only one that
+ * will take this state as part of its global state. The sync leader and the
+ * sync provider will only associate sync states to other entities (i.e., to
+ * sync requesters), and those shall be kept in the @p sync_entities_states
+ * map.
+ */
+ int sync_state;
+ /**
+ * Callback class responsible for dealing with the consequences of the sync
+ * requester not receiving a MMonSync::OP_START_REPLY in a timely manner.
+ */
+ struct C_SyncStartTimeout : public Context {
+   Monitor *mon;  // monitor notified when this event fires
+
+   C_SyncStartTimeout(Monitor *m) : mon(m) { }
+
+   // Report the expiry through Monitor::sync_start_reply_timeout(); the
+   // result code is not looked at.
+   void finish(int) {
+     mon->sync_start_reply_timeout();
+   }
+ };
+ /**
+ * Callback class responsible for retrying a Sync Start after a given
+ * backoff period, whenever the Sync Leader flags a MMonSync::OP_START_REPLY
+ * with the MMonSync::FLAG_RETRY flag.
+ */
+ struct C_SyncStartRetry : public Context {
+   Monitor *mon;          // monitor that gets bootstrapped on retry
+   entity_inst_t entity;  // copy of the given entity; finish() does not read it
+
+   C_SyncStartRetry(Monitor *m, entity_inst_t &e)
+     : mon(m), entity(e)
+   { }
+
+   // Retrying is done by bootstrapping the monitor again; the result code is
+   // not looked at.
+   void finish(int) {
+     mon->bootstrap();
+   }
+ };
+ /**
+ * We use heartbeats to check if the Leader and the Synchronization
+ * Requester are both still alive, so we can determine if we should continue
+ * with the synchronization process, granted that trim is disabled.
+ */
+ struct C_HeartbeatTimeout : public Context {
+   Monitor *mon;  // monitor whose sync gets aborted when this event fires
+
+   C_HeartbeatTimeout(Monitor *m) : mon(m) { }
+
+   // A heartbeat timeout aborts the requester's sync; the result code is not
+   // looked at.
+   void finish(int) {
+     mon->sync_requester_abort();
+   }
+ };
+ /**
+ * Callback class responsible for sending a heartbeat message to the sync
+ * leader. We use this callback to keep an asynchronous heartbeat with
+ * the sync leader at predefined intervals.
+ */
+ struct C_HeartbeatInterval : public Context {
+   Monitor *mon;          // monitor sending the heartbeat
+   entity_inst_t entity;  // copy of the entity the heartbeat is sent to
+
+   C_HeartbeatInterval(Monitor *m, entity_inst_t &e)
+     : mon(m), entity(e)
+   { }
+
+   // Arm the heartbeat timeout on the sync leader first, then send the
+   // heartbeat itself. The result code is not looked at.
+   void finish(int) {
+     Context *on_timeout = new C_HeartbeatTimeout(mon);
+     mon->sync_leader->set_timeout(on_timeout,
+                                   g_conf->mon_sync_heartbeat_timeout);
+     mon->sync_send_heartbeat(entity);
+   }
+ };
+ /**
+ * Callback class responsible for dealing with the consequences of never
+ * receiving a reply to a MMonSync::OP_FINISH sent to the sync leader.
+ */
+ struct C_SyncFinishReplyTimeout : public Context {
+   Monitor *mon;  // monitor notified when this event fires
+
+   C_SyncFinishReplyTimeout(Monitor *m) : mon(m) { }
+
+   // Report the expiry through Monitor::sync_finish_reply_timeout(); the
+   // result code is not looked at.
+   void finish(int) {
+     mon->sync_finish_reply_timeout();
+   }
+ };
+ /**
+ * The entity we, the sync requester, consider to be our sync leader. If
+ * there is a formed quorum, the @p sync_leader should represent the actual
+ * cluster Leader; otherwise, it can be any monitor and will likely be the
+ * same as @p sync_provider.
+ */
+ SyncEntity sync_leader;
+ /**
+ * The entity we, the sync requester, are synchronizing against. This entity
+ * will be our source of store chunks, and we will ultimately obtain a store
+ * state equal (or very similar, maybe off by a couple of versions) as their
+ * own.
+ */
+ SyncEntity sync_provider;
+ /**
+ * Clean up the Sync Requester's state (both in-memory and in-store).
+ */
+ void sync_requester_cleanup();
+ /**
+ * Abort the current sync effort.
+ *
+ * This will be translated into a MMonSync::OP_ABORT sent to the sync leader
+ * and to the sync provider, and ultimately it will also involve calling
+ * @p Monitor::sync_requester_cleanup() to clean up our current sync state.
+ */
+ void sync_requester_abort();
+ /**
+ * Deal with a timeout while waiting for a MMonSync::OP_FINISH_REPLY.
+ *
+ * This will be assumed as a leader failure, and having been exposed to the
+ * side-effects of a new Leader being elected, we have no other choice but
+ * to abort our sync process and start fresh.
+ */
+ void sync_finish_reply_timeout();
+ /**
+ * Deal with a timeout while waiting for a MMonSync::OP_START_REPLY.
+ *
+ * This will be assumed as a leader failure. Since we didn't get to do
+ * much work (as we haven't even started our sync), we will simply bootstrap
+ * and start off fresh with a new sync leader.
+ */
+ void sync_start_reply_timeout();
+ /**
+ * Start the synchronization efforts.
+ *
+ * This function should be called whenever we find the need to synchronize
+ * our store state with the remaining cluster.
+ *
+ * Starting the sync process means that we will have to request the cluster
+ * Leader (if there is a formed quorum) to stop trimming the Paxos state and
+ * allow us to start synchronizing with the sync provider we picked.
+ *
+ * @param entity An entity instance referring to the sync provider we picked.
+ */
+ void sync_start(entity_inst_t &entity);
+ /**
+ * Request the provider to start sending the chunks of his store, in order
+ * for us to obtain a consistent store state similar to the one shared by
+ * the cluster.
+ *
+ * @param provider The SyncEntity representing the Sync Provider.
+ */
+ void sync_start_chunks(SyncEntity provider);
+ /**
+ * Handle a MMonSync::OP_START_REPLY sent by the Sync Leader.
+ *
+ * Reception of this message may be twofold: if it was marked with the
+ * MMonSync::FLAG_RETRY flag, we must backoff for a while and retry starting
+ * the sync at a later time; otherwise, we have the green-light to request
+ * the Sync Provider to start sharing his chunks with us.
+ *
+ * @param m Sync message with operation type MMonSync::OP_START_REPLY
+ */
+ void handle_sync_start_reply(MMonSync *m);
+ /**
+ * Handle a Heartbeat reply sent by the Sync Leader.
+ *
+ * We use heartbeats to keep the Sync Leader aware that we are keeping our
+ * sync efforts alive. We also use them to make sure our Sync Leader is
+ * still alive. If the Sync Leader fails, we will have to abort our on-going
+ * sync, or we could incur an inconsistent store state due to a trim on
+ * the Paxos state of the monitor providing us with his store chunks.
+ *
+ * @param m Sync message with operation type MMonSync::OP_HEARTBEAT_REPLY
+ */
+ void handle_sync_heartbeat_reply(MMonSync *m);
+ /**
+ * Handle a chunk sent by the Sync Provider.
+ *
+ * We will receive the Sync Provider's store in chunks. These are encoded
+ * in bufferlists containing a transaction that will be directly applied
+ * onto our MonitorDBStore.
+ *
+ * Whenever we receive such a message, we must reply to the Sync Provider,
+ * as a way of acknowledging the reception of its last chunk. If the message
+ * is tagged with a MMonSync::FLAG_LAST, we can then consider we have
+ * received all the chunks the Sync Provider had to offer, and finish our
+ * sync efforts with the Sync Leader.
+ *
+ * @param m Sync message with operation type MMonSync::OP_CHUNK
+ */
+ void handle_sync_chunk(MMonSync *m);
+ /**
+ * Handle a reply sent by the Sync Leader to a MMonSync::OP_FINISH.
+ *
+ * As soon as we receive this message, we know we finally have a store state
+ * consistent with the remaining cluster (give or take a couple of versions).
+ * We may then bootstrap and attempt to join the other monitors in the
+ * cluster.
+ *
+ * @param m Sync message with operation type MMonSync::OP_FINISH_REPLY
+ */
+ void handle_sync_finish_reply(MMonSync *m);
+ /**
+ * Stop our synchronization effort by sending a MMonSync::OP_FINISH to the
+ * Sync Leader.
+ *
+ * Once we receive the last chunk from the Sync Provider, we are in
+ * conditions of officially finishing our sync efforts. With that purpose in
+ * mind, we must then send a MMonSync::OP_FINISH to the Leader, letting him
+ * know that we no longer require the Paxos state to be preserved.
+ */
+ void sync_stop();
+ /**
+ * @} // Synchronization Requester-specific
+ */
+ const string get_sync_state_name(int s) const {
+   // Values outside the known SYNC_STATE_* set keep the placeholder label.
+   const char *label = "???";
+   switch (s) {
+   case SYNC_STATE_NONE:
+     label = "none";
+     break;
+   case SYNC_STATE_START:
+     label = "start";
+     break;
+   case SYNC_STATE_CHUNKS:
+     label = "chunks";
+     break;
+   case SYNC_STATE_STOP:
+     label = "stop";
+     break;
+   }
+   return label;
+ }
+ /**
+ * Obtain a string describing the current Sync State.
+ *
+ * @returns A string describing the current Sync State, if any, or an empty
+ * string if no sync (or sync effort we know of) is in progress.
+ */
+ const string get_sync_state_name() const {
+   // No role at all means there is nothing to describe.
+   if (sync_role == SYNC_ROLE_NONE)
+     return "";
+
+   string desc(" sync(");
+
+   // Roles are bit flags, so several of them may be listed.
+   if (sync_role & SYNC_ROLE_LEADER)
+     desc += " leader";
+   if (sync_role & SYNC_ROLE_PROVIDER)
+     desc += " provider";
+   if (sync_role & SYNC_ROLE_REQUESTER)
+     desc += " requester";
+
+   desc += " state ";
+   desc += get_sync_state_name(sync_state);
+   desc += " )";
+
+   return desc;
+ }
+
+ /**
+ * @} // Synchronization
+ */
list<Context*> waitfor_quorum;
list<Context*> maybe_wait_for_quorum;
*/
- Context *probe_timeout_event; // for probing and slurping states
+ Context *probe_timeout_event; // for probing
struct C_ProbeTimeout : public Context {
Monitor *mon;
void cancel_probe_timeout();
void probe_timeout(int r);
- void slurp();
-
-
public:
epoch_t get_epoch();
int get_leader() { return leader; }
bool _allowed_command(MonSession *s, const vector<std::string>& cmd);
void _mon_status(ostream& ss);
void _quorum_status(ostream& ss);
+ void _sync_status(ostream& ss);
+ void _sync_force(ostream& ss);
void _add_bootstrap_peer_hint(string cmd, string args, ostream& ss);
void handle_command(class MMonCommand *m);
void handle_route(MRoute *m);
void reply_command(MMonCommand *m, int rc, const string &rs, version_t version);
void reply_command(MMonCommand *m, int rc, const string &rs, bufferlist& rdata, version_t version);
+ /**
+ * Handle Synchronization-related messages.
+ */
void handle_probe(MMonProbe *m);
/**
* Handle a Probe Operation, replying with our name, quorum and known versions.
*/
void handle_probe_probe(MMonProbe *m);
void handle_probe_reply(MMonProbe *m);
- void handle_probe_slurp(MMonProbe *m);
- void handle_probe_slurp_latest(MMonProbe *m);
- void handle_probe_data(MMonProbe *m);
- /**
- * Given an MMonProbe and associated Paxos machine, create a reply,
- * fill it with the missing Paxos states and current commit pointers
- *
- * @param m The incoming MMonProbe. We use this to determine the range
- * of paxos states to include in the reply.
- * @param pax The Paxos state machine which m is associated with.
- *
- * @returns A new MMonProbe message, initialized as OP_DATA, and filled
- * with the necessary Paxos states. */
- MMonProbe *fill_probe_data(MMonProbe *m, Paxos *pax);
// request routing
struct RoutedRequest {
int preinit();
int init();
+ void init_paxos();
void shutdown();
void tick();