// op types
const static int OP_COLLECT = 1; // proposer: propose round
const static int OP_LAST = 2; // voter: accept proposed round
- const static int OP_OLDROUND = 3; // voter: notify proposer he proposed an old round
const static int OP_BEGIN = 4; // proposer: value proposed for this round
const static int OP_ACCEPT = 5; // voter: accept proposed value
- const static int OP_SUCCESS = 7; // proposer: notify learners of agreed value
- const static int OP_ACK = 8; // learner: notify proposer that new value has been saved
+ const static int OP_COMMIT = 7; // proposer: notify learners of agreed value
// which state machine?
int op;
payload.append((char*)&op, sizeof(op));
payload.append((char*)&machine_id, sizeof(machine_id));
payload.append((char*)&last_committed, sizeof(last_committed));
+ payload.append((char*)&old_accepted_pn, sizeof(old_accepted_pn));
::_encode(values, payload);
- ::_encode(pns, payload);
}
void decode_payload() {
int off = 0;
off += sizeof(machine_id);
payload.copy(off, sizeof(last_committed), (char*)&last_committed);
off += sizeof(last_committed);
+ payload.copy(off, sizeof(old_accepted_pn), (char*)&old_accepted_pn);
+ off += sizeof(old_accepted_pn);
::_decode(values, payload, off);
- ::_decode(pns, payload, off);
}
};
MMonPaxos *collect = new MMonPaxos(MMonPaxos::OP_COLLECT, machine_id);
collect->last_committed = last_committed;
- collect->pn = my_pn;
+ collect->pn = accepted_pn;
mon->messenger->send_message(collect, mon->monmap->get_inst(i));
}
}
dout(10) << "handle_collect " << *collect << endl;
// reply
- MMonPaxos *reply = new MMonPaxos(MMonPaxos::OP_LAST, machine_id);
- reply->last_committed = last_committed;
+ MMonPaxos *last = new MMonPaxos(MMonPaxos::OP_LAST, machine_id);
+ last->last_committed = last_committed;
// do we have an accepted but uncommitted value?
// (it'll be at last_committed+1)
bufferlist bl;
- mon->store->get_bl(bl, machine_name, last_committed+1);
+ mon->store->get_bl_sn(bl, machine_name, last_committed+1);
if (bl.length()) {
dout(10) << "sharing our accepted but uncommitted value for " << last_committed+1 << endl;
last->values[last_committed+1] = bl;
<< ", we already accepted " << accepted_pn << " from " << accepted_pn_from
<< endl;
}
- reply->pn = accepted_pn;
- reply->pn_from = accepted_pn_from;
+ last->pn = accepted_pn;
+ last->pn_from = accepted_pn_from;
// and share whatever data we have
for (version_t v = collect->last_committed;
v <= last_committed;
v++) {
- mon->store->get_bl(reply->values[v], machine_name, v);
+ mon->store->get_bl_sn(last->values[v], machine_name, v);
dout(10) << " sharing " << v << " "
- << reply->values[v].length() << " bytes" << endl;
+ << last->values[v].length() << " bytes" << endl;
}
// send reply
- mon->messenger->send_message(reply, collect->get_source_inst());
+ mon->messenger->send_message(last, collect->get_source_inst());
delete collect;
}
dout(10) << "handle_last " << *last << endl;
// share committed values?
- if (last->last_committed < last_commited) {
+ if (last->last_committed < last_committed) {
// share committed values
dout(10) << "sending commit to " << last->get_source() << endl;
MMonPaxos *commit = new MMonPaxos(MMonPaxos::OP_COMMIT, machine_id);
for (version_t v = last->last_committed;
v <= last_committed;
v++) {
- mon->store->get_bl(commit->values[v], machine_name, v);
+ mon->store->get_bl_sn(commit->values[v], machine_name, v);
dout(10) << "sharing " << v << " "
<< commit->values[v].length() << " bytes" << endl;
}
}
// did we receive committed value?
- if (last->last_committed > last_commited) {
+ if (last->last_committed > last_committed) {
for (version_t v = last_committed;
v <= last->last_committed;
v++) {
- mon->store->put_bl(last->values[v], machine_name, v);
+ mon->store->put_bl_sn(last->values[v], machine_name, v);
dout(10) << "committing " << v << " "
- << commit->values[v].length() << " bytes" << endl;
+ << last->values[v].length() << " bytes" << endl;
}
- last_commited = last->last_committed;
+ last_committed = last->last_committed;
mon->store->put_int(last_committed, machine_name, "last_commtted");
dout(10) << "last_committed now " << last_committed << endl;
}
// do they accept your pn?
- if (last->accepted_pn > accepted_pn) {
+ if (last->old_accepted_pn > accepted_pn) {
dout(10) << "uh oh, they have a higher pn than us. pick a new one." << endl;
- leader_start(last->accepted_pn);
+ collect(last->old_accepted_pn);
} else {
// they accepted our pn. great.
num_last++;
dout(10) << "great, they accepted out pn, we now have " << num_last << endl;
// did this person send back an accepted but uncommitted value?
- if (last->accepted_pn) {
+ if (last->old_accepted_pn &&
+ last->old_accepted_pn > old_accepted_pn) {
version_t v = last->last_committed+1;
- if (v > old_accepted_pn) {
- dout(10) << "we learned an old value for " << v << " pn " << last->old_pn;
- old_accepted_pn = old_accepted_pn;
- old_accepted_value = last->values[v];
- }
+ dout(10) << "we learned an old value for " << v << " pn " << last->old_accepted_pn;
+ old_accepted_pn = last->old_accepted_pn;
+ old_accepted_value = last->values[v];
}
-
+
// do we have a majority?
- if (num_last == mon->monmap->nummon/2+1) {
+ if (num_last == mon->monmap->num_mon/2+1) {
// do this once.
// did we learn an old value?
- if (!old_accepted_value.empty()) {
+ if (!old_accepted_value.length()) {
begin(old_accepted_value);
}
}
<< endl;
// we must already have a majority for this to work.
- assert(num_last > mon->monmap->nummon/2);
+ assert(num_last > mon->monmap->num_mon/2);
// and no value, yet.
- assert(new_value.empty());
+ assert(new_value.length() == 0);
// accept it ourselves
num_accepted = 1;
new_value = v;
- mon->store->put_bl(new_value, machine_name, last_committed+1);
+ mon->store->put_bl_sn(new_value, machine_name, last_committed+1);
// ask others to accept it too!
for (int i=0; i<mon->monmap->num_mon; ++i) {
}
// yes.
- dout(10) << "accepting value for " << v << " pn " << accepted_pn << endl;
version_t v = last_committed+1;
- mon->store->put_bl(begin->values[v], machine_name, v);
+ dout(10) << "accepting value for " << v << " pn " << accepted_pn << endl;
+ mon->store->put_bl_sn(begin->values[v], machine_name, v);
// reply
MMonPaxos *accept = new MMonPaxos(MMonPaxos::OP_ACCEPT, machine_id);
accept->pn = accepted_pn;
accept->last_committed = last_committed;
- mon->messenger->send_message(accept);
+ mon->messenger->send_message(accept, begin->get_source_inst());
delete begin;
}
dout(10) << "now " << num_accepted << " have accepted" << endl;
// new majority?
- if (num_accepted == mon->monmap->nummon/2+1) {
+ if (num_accepted == mon->monmap->num_mon/2+1) {
// yay, commit!
dout(10) << "we got a majority, committing too" << endl;
commit();
// commit locally.
last_committed = commit->last_committed;
- mon->store->put_bl(commit->values[last_committed], machine_name, last_committed);
+ mon->store->put_bl_sn(commit->values[last_committed], machine_name, last_committed);
mon->store->put_int(last_committed, machine_name, "last_committed");
delete commit;
}
-void Paxos::leader_start(version_t oldpn)
+void Paxos::leader_start()
{
dout(10) << "i am the leader, start paxos" << endl;
collect(0);