}
}
+void Paxos::store_state(MMonPaxos *m)
+{
+ bool big_sync = m->values.size() > 5;
+
+ // stash?
+ if (m->latest_version && m->latest_version > last_committed) {
+ dout(10) << "store_state got stash version " << m->latest_version << ", zapping old states" << dendl;
+ stash_latest(m->latest_version, m->latest_value);
+
+ while (first_committed <= last_committed) {
+ dout(10) << "store_state trim " << first_committed << dendl;
+ mon->store->erase_sn(machine_name, first_committed);
+ first_committed++;
+ }
+ last_committed = m->latest_version;
+ first_committed = last_committed;
+ mon->store->put_int(first_committed, machine_name, "first_committed");
+ }
+
+ for (map<version_t,bufferlist>::iterator p = m->values.begin();
+ p != m->values.end();
+ ++p) {
+ if (p->first <= last_committed)
+ continue;
+ if (p->first > last_committed + 1)
+ break;
+ last_committed = p->first;
+ dout(10) << "store_state got " << last_committed << " (" << p->second.length() << " bytes)" << dendl;
+ mon->store->put_bl_sn(p->second, machine_name, last_committed, !big_sync);
+ }
+
+ mon->store->put_int(last_committed, machine_name, "last_committed");
+ if (big_sync)
+ mon->store->sync();
+}
+
// leader
void Paxos::handle_last(MMonPaxos *last)
}
// did we receive a committed value?
- if (last->last_committed > last_committed) {
- /* hmm.
- if (last->latest_version) {
- last_committed = last->latest_value;
- dout(10) << "stashing latest full value " << last_committed << dendl;
- stash_latest(last_committed, last->latest_value);
- }
- */
- bool big_sync = last->last_committed - last_committed > 5;
- for (version_t v = last_committed+1;
- v <= last->last_committed;
- v++) {
- mon->store->put_bl_sn(last->values[v], machine_name, v, !big_sync);
- dout(10) << "committing " << v << " "
- << last->values[v].length() << " bytes" << dendl;
- }
- if (big_sync)
- mon->store->sync();
- last_committed = last->last_committed;
- mon->store->put_int(last_committed, machine_name, "last_committed");
- dout(10) << "last_committed now " << last_committed << dendl;
- }
+ store_state(last);
// do they accept your pn?
if (last->pn > accepted_pn) {
}
+
+
void Paxos::handle_commit(MMonPaxos *commit)
{
dout(10) << "handle_commit on " << commit->last_committed << dendl;
return;
}
- // commit locally.
- bool big_sync = commit->values.size() > 2;
-
- // stash?
- if (commit->latest_version) {
- dout(10) << "got stash version " << commit->latest_version << ", zapping old states" << dendl;
- stash_latest(commit->latest_version, commit->latest_value);
-
- while (first_committed <= last_committed) {
- dout(10) << "trim " << first_committed << dendl;
- mon->store->erase_sn(machine_name, first_committed);
- first_committed++;
- }
- last_committed = commit->latest_version;
- first_committed = last_committed;
- mon->store->put_int(first_committed, machine_name, "first_committed");
- }
-
- for (map<version_t,bufferlist>::iterator p = commit->values.begin();
- p != commit->values.end();
- ++p) {
- assert(p->first <= last_committed+1);
- if (p->first == last_committed+1) {
- last_committed = p->first;
- dout(10) << " storing " << last_committed << " (" << p->second.length() << " bytes)" << dendl;
- mon->store->put_bl_sn(p->second, machine_name, last_committed, !big_sync);
- }
- }
- if (big_sync)
- mon->store->sync();
- mon->store->put_int(last_committed, machine_name, "last_committed");
+ store_state(commit);
commit->put();