uncommitted_v = 0;
uncommitted_pn = 0;
uncommitted_value.clear();
+ peer_first_committed.clear();
+ peer_last_committed.clear();
// look for uncommitted value
if (mon->store->exists_bl_sn(machine_name, last_committed+1)) {
return;
}
- // share committed values?
- if (last->last_committed < last_committed) {
- // share committed values
- dout(10) << "sending commit to " << last->get_source() << dendl;
- MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT, machine_id);
- share_state(commit, last->first_committed, last->last_committed);
- mon->messenger->send_message(commit, last->get_source_inst());
- }
+ // note peer's last_committed, in case we learn a new commit and need to
+ // push it to them.
+ peer_last_committed[last->get_source().num()] = last->last_committed;
// did we receive a committed value?
store_state(last);
mon->timer.cancel_event(collect_timeout_event);
collect_timeout_event = 0;
+ // share committed values?
+ for (map<int,version_t>::iterator p = peer_last_committed.begin();
+ p != peer_last_committed.end();
+ ++p) {
+ if (p->second < last_committed) {
+ // share committed values
+ dout(10) << " sending commit to mon." << p->first << dendl;
+ MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT, machine_id);
+ share_state(commit, peer_first_committed[p->first], p->second);
+ mon->messenger->send_message(commit, mon->monmap->get_inst(p->first));
+ }
+ }
+ peer_first_committed.clear();
+ peer_last_committed.clear();
+
// almost...
state = STATE_ACTIVE;