From 357aa0334436da79065dc67b270ff78f8899493f Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 21 May 2010 16:17:34 -0700 Subject: [PATCH] paxos: recover using stashed latest when state histories don't overlap If we don't have incremental states to catch up, jump to the latest. --- src/mon/Paxos.cc | 86 ++++++++++++++++++++++++++++-------------------- src/mon/Paxos.h | 2 ++ 2 files changed, 53 insertions(+), 35 deletions(-) diff --git a/src/mon/Paxos.cc b/src/mon/Paxos.cc index 119aa482e40cc..91e96ca99a955 100644 --- a/src/mon/Paxos.cc +++ b/src/mon/Paxos.cc @@ -91,6 +91,7 @@ void Paxos::collect(version_t oldpn) MMonPaxos *collect = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COLLECT, machine_id); collect->last_committed = last_committed; + collect->first_committed = first_committed; collect->pn = accepted_pn; mon->messenger->send_message(collect, mon->monmap->get_inst(*p)); } @@ -114,6 +115,7 @@ void Paxos::handle_collect(MMonPaxos *collect) // reply MMonPaxos *last = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LAST, machine_id); last->last_committed = last_committed; + last->first_committed = first_committed; // do we have an accepted but uncommitted value? // (it'll be at last_committed+1) @@ -144,37 +146,42 @@ void Paxos::handle_collect(MMonPaxos *collect) last->pn_from = accepted_pn_from; // and share whatever data we have - if (collect->last_committed < last_committed) { - bufferlist bl; - version_t l = get_latest(bl); - assert(l <= last_committed); + if (collect->last_committed < last_committed) + share_state(last, collect->first_committed, collect->last_committed); - version_t v = collect->last_committed; + // send reply + mon->messenger->send_message(last, collect->get_source_inst()); + collect->put(); +} - // start with a stashed full copy? - /* hmm. - if (l > v + 10) { - last->latest_value.claim(bl); - last->latest_version = l; - v = l; - } - */ +void Paxos::share_state(MMonPaxos *m, version_t peer_first_committed, version_t peer_last_committed) +{ + assert(peer_last_committed < last_committed); - // include (remaining) incrementals - for (v++; - v <= last_committed; - v++) { - if (mon->store->exists_bl_sn(machine_name, v)) { - mon->store->get_bl_sn(last->values[v], machine_name, v); - dout(10) << " sharing " << v << " (" - << last->values[v].length() << " bytes)" << dendl; - } + dout(10) << "share_state peer has fc " << peer_first_committed << " lc " << peer_last_committed << dendl; + version_t v = peer_last_committed; + + // start with a stashed full copy? + if (peer_last_committed < first_committed) { + bufferlist bl; + version_t l = get_latest(bl); + assert(l <= last_committed); + dout(10) << "share_state starting with latest " << l << " (" << bl.length() << " bytes)" << dendl; + m->latest_value.claim(bl); + m->latest_version = l; + v = l; + } + + // include (remaining) incrementals + for (v++; + v <= last_committed; + v++) { + if (mon->store->exists_bl_sn(machine_name, v)) { + mon->store->get_bl_sn(m->values[v], machine_name, v); + dout(10) << " sharing " << v << " (" + << m->values[v].length() << " bytes)" << dendl; } } - - // send reply - mon->messenger->send_message(last, collect->get_source_inst()); - collect->put(); } @@ -194,13 +201,7 @@ void Paxos::handle_last(MMonPaxos *last) // share committed values dout(10) << "sending commit to " << last->get_source() << dendl; MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT, machine_id); - for (version_t v = last->last_committed+1; - v <= last_committed; - v++) { - mon->store->get_bl_sn(commit->values[v], machine_name, v); - dout(10) << " sharing " << v << " (" - << commit->values[v].length() << " bytes)" << dendl; - } + share_state(commit, last->first_committed, last->last_committed); commit->last_committed = last_committed; mon->messenger->send_message(commit, last->get_source_inst()); } @@ -494,6 +495,22 @@ void Paxos::handle_commit(MMonPaxos *commit) // commit locally. bool big_sync = commit->values.size() > 2; + + // stash? + if (commit->latest_version) { + dout(10) << "got stash version " << commit->latest_version << ", zapping old states" << dendl; + stash_latest(commit->latest_version, commit->latest_value); + + while (first_committed <= last_committed) { + dout(10) << "trim " << first_committed << dendl; + mon->store->erase_sn(machine_name, first_committed); + first_committed++; + } + last_committed = commit->latest_version; + first_committed = last_committed; + mon->store->put_int(first_committed, machine_name, "first_committed"); + } + for (map::iterator p = commit->values.begin(); p != commit->values.end(); ++p) { @@ -533,8 +550,7 @@ void Paxos::extend_lease() MMonPaxos *lease = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE, machine_id); lease->last_committed = last_committed; lease->lease_expire = lease_expire; - if (mon->is_full_quorum()) - lease->first_committed = first_committed; + lease->first_committed = first_committed; mon->messenger->send_message(lease, mon->monmap->get_inst(*p)); } diff --git a/src/mon/Paxos.h b/src/mon/Paxos.h index 3ca31c5f30dff..9c999ac1cefc8 100644 --- a/src/mon/Paxos.h +++ b/src/mon/Paxos.h @@ -248,6 +248,8 @@ public: void leader_init(); void peon_init(); + void share_state(MMonPaxos *m, version_t first_committed, version_t last_committed); + // -- service interface -- void wait_for_active(Context *c) { waiting_for_active.push_back(c); -- 2.39.5