// proposer
void Paxos::propose(version_t v, bufferlist& value)
{
-//todo high rf
+//todo high rf: what si this?!?
}
void Paxos::handle_last(MMonPaxos *m)
{
-//todo high rf
- dout(10) << "handle_last " << *m << endl;
+ dout(10) << "handle_last - one more last collected " << *m << endl;
+ num_last++;
+ // use == instead of > so that if we receive additional LAST messages, we will not do this again
+ if (num_last == (unsigned)(mon->monmap->num_mon / 2)+1){
+ dout(5) << "handle_last - got majority" << endl;
+ // bcast to everyone else - begin
+ num_accepts=0;
+ for (int i=0; i<mon->monmap->num_mon; ++i) {
+ // we only set up our value if we get the majority, so for now we don't save it
+ if (i == whoami) continue;
+ // todo high rf: where is the data we want to send?
+ mon->messenger->send_message(new MMonPaxos(MMonPaxos::OP_BEGIN, whoami, pn, DATA_TO_SEND),
+ MSG_ADDR_MON(i), mon->monmap->get_inst(i));
+ }
+ }
delete m;
}
void Paxos::handle_accept(MMonPaxos *m)
{
-//todo high rf
dout(10) << "handle_accept " << *m << endl;
+ num_accepts++;
+ if (num_accepts == (unsigned)(mon->monmap->num_mon / 2)+1){
+ dout(5) << "handle_accept - bcast commit messages" << *m << endl;
+ for (int i=0; i<mon->monmap->num_mon; ++i) {
+ mon->messenger->send_message(new MMonPaxos(MMonPaxos::OP_COMMIT, whoami, pn, DATA_TO_COMMIT),
+ MSG_ADDR_MON(i), mon->monmap->get_inst(i));
+ }
+ }
+
delete m;
}
void Paxos::handle_ack(MMonPaxos *m)
{
-//todo high rf
+//todo high rf: Do we have to do anything here?!?
dout(10) << "handle_ack " << *m << endl;
delete m;
}
void Paxos::handle_old_round(MMonPaxos *m)
{
-//todo high rf
+//todo high rf: should we get a new number (higher than the one returned by the other process) and try again?
dout(10) << "handle_old_round " << *m << endl;
delete m;
}
// accepter
void Paxos::handle_collect(MMonPaxos *m)
{
-//todo high rf
- // ...
-
+ if (m->pn > last) {
+ dout(10) << "handle_collect - replied LAST" << *m << endl;
+ mon->messenger->send_message(new MMonPaxos(MMonPaxos::OP_LAST, whoami, last));
+ last = m->pn;
+ }
+ else {
+ dout(10) << "handle_collect - replied OLDROUND" << *m << endl;
+ mon->messenger->send_message(new MMonPaxos(MMonPaxos::OP_OLDROUND, whoami, last));
+ }
+
delete m;
}
// learner
void Paxos::handle_success(MMonPaxos *m)
{
- //todo high rf
+ dout(10) << "handle_success - commit results" << endl;
+ //todo high rf: copy from TEMP_RES TO RES
+ mon->messenger->send_message(new MMonPaxos(MMonPaxos::OP_ACK, whoaim));
delete m;
}
void Paxos::handle_begin(MMonPaxos *m)
{
- //todo high rf
+ if (m->pn >= last) {
+ dout(10) << "handle_begin - replied ACCEPT" << *m << endl;
+ // todo high rf: save the new monitor map to TEMP_RES
+ mon->messenger->send_message(new MMonPaxos(MMonPaxos::OP_ACCEPT, whoami, last));
+ last = m->pn;
+ }
+ else {
+ dout(10) << "handle_begin - replied OLDROUND" << *m << endl;
+ mon->messenger->send_message(new MMonPaxos(MMonPaxos::OP_OLDROUND, whoami, last));
+ }
delete m;
}
void Paxos::leader_start()
{
- dout(10) << "i am the leader" << endl;
+ dout(10) << "i am the leader, start paxos" << endl;
- // .. do something else too
+ // reset the number of lasts received
+ num_lasts = 0;
version_t pn = get_new_proposal_number();
for (int i=0; i<mon->monmap->num_mon; ++i) {
if (i == whoami) continue;