MMonPaxos *collect = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COLLECT, machine_id);
collect->last_committed = last_committed;
+ collect->first_committed = first_committed;
collect->pn = accepted_pn;
mon->messenger->send_message(collect, mon->monmap->get_inst(*p));
}
// reply
MMonPaxos *last = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LAST, machine_id);
last->last_committed = last_committed;
+ last->first_committed = first_committed;
// do we have an accepted but uncommitted value?
// (it'll be at last_committed+1)
last->pn_from = accepted_pn_from;
// and share whatever data we have
- if (collect->last_committed < last_committed) {
- bufferlist bl;
- version_t l = get_latest(bl);
- assert(l <= last_committed);
+ if (collect->last_committed < last_committed)
+ share_state(last, collect->first_committed, collect->last_committed);
- version_t v = collect->last_committed;
+ // send reply
+ mon->messenger->send_message(last, collect->get_source_inst());
+ collect->put();
+}
- // start with a stashed full copy?
- /* hmm.
- if (l > v + 10) {
- last->latest_value.claim(bl);
- last->latest_version = l;
- v = l;
- }
- */
+void Paxos::share_state(MMonPaxos *m, version_t peer_first_committed, version_t peer_last_committed)
+{
+ assert(peer_last_committed < last_committed);
- // include (remaining) incrementals
- for (v++;
- v <= last_committed;
- v++) {
- if (mon->store->exists_bl_sn(machine_name, v)) {
- mon->store->get_bl_sn(last->values[v], machine_name, v);
- dout(10) << " sharing " << v << " ("
- << last->values[v].length() << " bytes)" << dendl;
- }
+ dout(10) << "share_state peer has fc " << peer_first_committed << " lc " << peer_last_committed << dendl;
+ version_t v = peer_last_committed;
+
+ // start with a stashed full copy?
+ if (peer_last_committed < first_committed) {
+ bufferlist bl;
+ version_t l = get_latest(bl);
+ assert(l <= last_committed);
+ dout(10) << "share_state starting with latest " << l << " (" << bl.length() << " bytes)" << dendl;
+ m->latest_value.claim(bl);
+ m->latest_version = l;
+ v = l;
+ }
+
+ // include (remaining) incrementals
+ for (v++;
+ v <= last_committed;
+ v++) {
+ if (mon->store->exists_bl_sn(machine_name, v)) {
+ mon->store->get_bl_sn(m->values[v], machine_name, v);
+ dout(10) << " sharing " << v << " ("
+ << m->values[v].length() << " bytes)" << dendl;
}
}
-
- // send reply
- mon->messenger->send_message(last, collect->get_source_inst());
- collect->put();
}
// share committed values
dout(10) << "sending commit to " << last->get_source() << dendl;
MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT, machine_id);
- for (version_t v = last->last_committed+1;
- v <= last_committed;
- v++) {
- mon->store->get_bl_sn(commit->values[v], machine_name, v);
- dout(10) << " sharing " << v << " ("
- << commit->values[v].length() << " bytes)" << dendl;
- }
+ share_state(commit, last->first_committed, last->last_committed);
commit->last_committed = last_committed;
mon->messenger->send_message(commit, last->get_source_inst());
}
// commit locally.
bool big_sync = commit->values.size() > 2;
+
+ // stash?
+ if (commit->latest_version) {
+ dout(10) << "got stash version " << commit->latest_version << ", zapping old states" << dendl;
+ stash_latest(commit->latest_version, commit->latest_value);
+
+ while (first_committed <= last_committed) {
+ dout(10) << "trim " << first_committed << dendl;
+ mon->store->erase_sn(machine_name, first_committed);
+ first_committed++;
+ }
+ last_committed = commit->latest_version;
+ first_committed = last_committed;
+ mon->store->put_int(first_committed, machine_name, "first_committed");
+ }
+
for (map<version_t,bufferlist>::iterator p = commit->values.begin();
p != commit->values.end();
++p) {
MMonPaxos *lease = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE, machine_id);
lease->last_committed = last_committed;
lease->lease_expire = lease_expire;
- if (mon->is_full_quorum())
- lease->first_committed = first_committed;
+ lease->first_committed = first_committed;
mon->messenger->send_message(lease, mon->monmap->get_inst(*p));
}