From 85ed24afb83d7b33d56868df119e8be857b93670 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 6 Sep 2012 13:39:34 -0700 Subject: [PATCH] mon: share global version with proposal, collect, commits, slurp, etc. Share the global version through all of the paxos messages that share data, and when slurping. This ensures that committed values get the gv's associated with them, and that proposed values, if they are later committed by a new leader, will retain their original gv, and that new gv's will only be allocated after that -- once paxos settles and things go writeable. Signed-off-by: Sage Weil --- src/mon/Monitor.cc | 3 ++- src/mon/Paxos.cc | 14 +++++++++++--- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index adbd8708af9d9..f5f6ca1123ede 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -897,6 +897,7 @@ MMonProbe *Monitor::fill_probe_data(MMonProbe *m, Paxos *pax) int len = 0; for (; v <= pax->get_version(); v++) { len += store->get_bl_sn(r->paxos_values[m->machine_name][v], m->machine_name.c_str(), v); + r->gv[m->machine_name][v] = store->get_global_version(m->machine_name.c_str(), v); for (list::iterator p = pax->extra_state_dirs.begin(); p != pax->extra_state_dirs.end(); ++p) { @@ -955,7 +956,7 @@ void Monitor::handle_probe_data(MMonProbe *m) for (map >::iterator p = m->paxos_values.begin(); p != m->paxos_values.end(); ++p) { - store->put_bl_sn_map(p->first.c_str(), p->second.begin(), p->second.end()); + store->put_bl_sn_map(p->first.c_str(), p->second.begin(), p->second.end(), &m->gv[p->first]); } pax->last_committed = m->paxos_values.begin()->second.rbegin()->first; diff --git a/src/mon/Paxos.cc b/src/mon/Paxos.cc index f0f4cd0cc16c1..df636d8f71be6 100644 --- a/src/mon/Paxos.cc +++ b/src/mon/Paxos.cc @@ -151,6 +151,7 @@ void Paxos::handle_collect(MMonPaxos *collect) dout(10) << " sharing our accepted but uncommitted value for " << last_committed+1 << " (" << bl.length() << " bytes)" << dendl; last->values[last_committed+1] = bl; + last->gv[last_committed+1] = mon->store->get_global_version(machine_name, last_committed+1); last->uncommitted_pn = accepted_pn; } @@ -184,6 +185,7 @@ void Paxos::share_state(MMonPaxos *m, version_t peer_first_committed, for ( ; v <= last_committed; v++) { if (mon->store->exists_bl_sn(machine_name, v)) { mon->store->get_bl_sn(m->values[v], machine_name, v); + m->gv[v] = mon->store->get_global_version(machine_name, v); dout(10) << " sharing " << v << " (" << m->values[v].length() << " bytes)" << dendl; } @@ -243,7 +245,7 @@ void Paxos::store_state(MMonPaxos *m) dout(10) << "store_state [" << start->first << ".." << last_committed << "]" << dendl; - mon->store->put_bl_sn_map(machine_name, start, end); + mon->store->put_bl_sn_map(machine_name, start, end, &m->gv); mon->store->put_int(last_committed, machine_name, "last_committed"); mon->store->put_int(first_committed, machine_name, "first_committed"); } @@ -383,6 +385,8 @@ void Paxos::begin(bufferlist& v, version_t gv) accepted.insert(mon->rank); new_value = v; mon->store->put_bl_sn(new_value, machine_name, last_committed+1); + if (gv > 0) + mon->store->put_global_version(machine_name, last_committed+1, gv); if (mon->get_quorum().size() == 1) { // we're alone, take it easy @@ -409,6 +413,7 @@ void Paxos::begin(bufferlist& v, version_t gv) MMonPaxos *begin = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_BEGIN, machine_id, ceph_clock_now(g_ceph_context)); begin->values[last_committed+1] = new_value; + begin->gv[last_committed+1] = gv; begin->last_committed = last_committed; begin->pn = accepted_pn; @@ -440,8 +445,10 @@ void Paxos::handle_begin(MMonPaxos *begin) // yes. version_t v = last_committed+1; - dout(10) << "accepting value for " << v << " pn " << accepted_pn << dendl; + dout(10) << "accepting value for " << v << " pn " << accepted_pn << " gv " << begin->gv[v] << dendl; mon->store->put_bl_sn(begin->values[v], machine_name, v); + if (begin->gv.count(v) && begin->gv[v] > 0) + mon->store->put_global_version(machine_name, v, begin->gv[v]); // reply MMonPaxos *accept = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_ACCEPT, @@ -546,9 +553,10 @@ void Paxos::commit() MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT, machine_id, ceph_clock_now(g_ceph_context)); commit->values[last_committed] = new_value; + commit->gv[last_committed] = mon->store->get_global_version(machine_name, last_committed); commit->pn = accepted_pn; commit->last_committed = last_committed; - + mon->messenger->send_message(commit, mon->monmap->get_inst(*p)); } -- 2.39.5