int len = 0;
for (; v <= pax->get_version(); v++) {
len += store->get_bl_sn(r->paxos_values[m->machine_name][v], m->machine_name.c_str(), v);
+ r->gv[m->machine_name][v] = store->get_global_version(m->machine_name.c_str(), v);
for (list<string>::iterator p = pax->extra_state_dirs.begin();
p != pax->extra_state_dirs.end();
++p) {
for (map<string, map<version_t, bufferlist> >::iterator p = m->paxos_values.begin();
p != m->paxos_values.end();
++p) {
- store->put_bl_sn_map(p->first.c_str(), p->second.begin(), p->second.end());
+ store->put_bl_sn_map(p->first.c_str(), p->second.begin(), p->second.end(), &m->gv[p->first]);
}
pax->last_committed = m->paxos_values.begin()->second.rbegin()->first;
dout(10) << " sharing our accepted but uncommitted value for "
<< last_committed+1 << " (" << bl.length() << " bytes)" << dendl;
last->values[last_committed+1] = bl;
+ last->gv[last_committed+1] = mon->store->get_global_version(machine_name, last_committed+1);
last->uncommitted_pn = accepted_pn;
}
for ( ; v <= last_committed; v++) {
if (mon->store->exists_bl_sn(machine_name, v)) {
mon->store->get_bl_sn(m->values[v], machine_name, v);
+ m->gv[v] = mon->store->get_global_version(machine_name, v);
dout(10) << " sharing " << v << " ("
<< m->values[v].length() << " bytes)" << dendl;
}
dout(10) << "store_state [" << start->first << ".."
<< last_committed << "]" << dendl;
- mon->store->put_bl_sn_map(machine_name, start, end);
+ mon->store->put_bl_sn_map(machine_name, start, end, &m->gv);
mon->store->put_int(last_committed, machine_name, "last_committed");
mon->store->put_int(first_committed, machine_name, "first_committed");
}
accepted.insert(mon->rank);
new_value = v;
mon->store->put_bl_sn(new_value, machine_name, last_committed+1);
+ if (gv > 0)
+ mon->store->put_global_version(machine_name, last_committed+1, gv);
if (mon->get_quorum().size() == 1) {
// we're alone, take it easy
MMonPaxos *begin = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_BEGIN,
machine_id, ceph_clock_now(g_ceph_context));
begin->values[last_committed+1] = new_value;
+ begin->gv[last_committed+1] = gv;
begin->last_committed = last_committed;
begin->pn = accepted_pn;
// yes.
version_t v = last_committed+1;
- dout(10) << "accepting value for " << v << " pn " << accepted_pn << dendl;
+ dout(10) << "accepting value for " << v << " pn " << accepted_pn << " gv " << begin->gv[v] << dendl;
mon->store->put_bl_sn(begin->values[v], machine_name, v);
+ if (begin->gv.count(v) && begin->gv[v] > 0)
+ mon->store->put_global_version(machine_name, v, begin->gv[v]);
// reply
MMonPaxos *accept = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_ACCEPT,
MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT,
machine_id, ceph_clock_now(g_ceph_context));
commit->values[last_committed] = new_value;
+ commit->gv[last_committed] = mon->store->get_global_version(machine_name, last_committed);
commit->pn = accepted_pn;
commit->last_committed = last_committed;
-
+
mon->messenger->send_message(commit, mon->monmap->get_inst(*p));
}