const static int OP_SUCCESS = 7; // proposer: notify learners of agreed value
const static int OP_ACK = 8; // learner: notify proposer that new value has been saved
+ // which state machine?
int op;
int machine_id;
- version_t pn;
- version_t v;
- bufferlist value;
+
+ version_t last_committed; // i've committed to
+ version_t pn_from; // i promise to accept after
+ version_t pn; // with with proposal
+ map<version_t,bufferlist> values;
+ version_t old_accepted_pn; // previous pn, if we are a LAST with an uncommitted value
MMonPaxos() : Message(MSG_MON_PAXOS) {}
- MMonPaxos(int o, int mid,
- version_t _pn, version_t _v) : Message(MSG_MON_PAXOS),
- op(o), machine_id(mid),
- pn(_pn), v(_v) {}
- MMonPaxos(int o, int mid,
- version_t _pn, version_t _v,
- bufferlist& b) : Message(MSG_MON_PAXOS),
- op(o), machine_id(mid),
- pn(_pn), v(_v),
- value(b) {}
+ MMonPaxos(int o, int mid) : Message(MSG_MON_PAXOS),
+ op(o), machine_id(mid) {}
virtual char *get_type_name() { return "paxos"; }
void print(ostream& out) {
out << "paxos(op " << op
- << ", machine " << machine_id
- << ", proposal " << proposal
- << ", state " << n
- << ", " << value.length() << " bytes)";
+ << ", machine " << machine_id << ")";
}
void encode_payload() {
payload.append((char*)&op, sizeof(op));
payload.append((char*)&machine_id, sizeof(machine_id));
- payload.append((char*)&proposal, sizeof(proposal));
- payload.append((char*)&n, sizeof(n));
- ::_encode(value, payload);
+ payload.append((char*)&last_committed, sizeof(last_committed));
+ ::_encode(values, payload);
+ ::_encode(pns, payload);
}
void decode_payload() {
int off = 0;
off += sizeof(op);
payload.copy(off, sizeof(machine_id), (char*)&machine_id);
off += sizeof(machine_id);
- payload.copy(off, sizeof(proposal), (char*)&proposal);
- off += sizeof(proposal);
- payload.copy(off, sizeof(n), (char*)&n);
- off += sizeof(n);
- ::_decode(value, payload, off);
+ payload.copy(off, sizeof(last_committed), (char*)&last_committed);
+ off += sizeof(last_committed);
+ ::_decode(values, payload, off);
+ ::_decode(pns, payload, off);
}
};
// PHASE 1
// proposer
-void Paxos::leader_start()
-{
- dout(10) << "i am the leader, start paxos" << endl;
+void Paxos::collect(version_t oldpn)
+{
// reset the number of lasts received
- last_num = 0;
- my_pn = get_new_proposal_number();
- last_pn = 0;
+ accepted_pn = get_new_proposal_number(MAX(accepted_pn, oldpn));
+ accepted_pn_from = last_committed;
+ num_last = 1;
+ old_accepted_pn = 0;
+ old_accepted_value.clear();
+
+ dout(10) << "collect with pn " << accepted_pn << endl;
// send collect
for (int i=0; i<mon->monmap->num_mon; ++i) {
if (i == whoami) continue;
- // todo high rf I pass the pn twice... what is the last parameter for?
- mon->messenger->send_message(new MMonPaxos(MMonPaxos::OP_COLLECT, whoami, pn, 0),
- mon->monmap->get_inst(i));
+
+ MMonPaxos *collect = new MMonPaxos(MMonPaxos::OP_COLLECT, machine_id);
+ collect->last_committed = last_committed;
+ collect->pn = my_pn;
+ mon->messenger->send_message(collect, mon->monmap->get_inst(i));
}
}
-void Paxos::handle_collect(MMonPaxos *m)
+void Paxos::handle_collect(MMonPaxos *collect)
{
- if (m->pn > accepted_pn[m->v]) {
- dout(10) << "handle_collect - replied LAST" << *m << endl;
- // send ack/last
- mon->messenger->send_message(new MMonPaxos(MMonPaxos::OP_LAST, machine_id,
- accepted_pn[m->v], m->v,
- accepted_value[m->v]));
- accepted_pn[m->v] = m->pn;
+ dout(10) << "handle_collect " << *collect << endl;
+
+ // reply
+ MMonPaxos *reply = new MMonPaxos(MMonPaxos::OP_LAST, machine_id);
+ reply->last_committed = last_committed;
+
+ // do we have an accepted but uncommitted value?
+ // (it'll be at last_committed+1)
+ bufferlist bl;
+ mon->store->get_bl(bl, machine_name, last_committed+1);
+ if (bl.length()) {
+ dout(10) << "sharing our accepted but uncommitted value for " << last_committed+1 << endl;
+ last->values[last_committed+1] = bl;
+ last->old_accepted_pn = accepted_pn;
}
- else {
- dout(10) << "handle_collect - replied OLDROUND" << *m << endl;
- // send nak/oldround
- mon->messenger->send_message(new MMonPaxos(MMonPaxos::OP_LAST, machine_id,
- accepted_pn[m->v], m->v,
- accepted_value[m->v]));
+
+ // can we accept this pn?
+ if (collect->pn > accepted_pn) {
+ // ok, accept it
+ accepted_pn = collect->pn;
+ accepted_pn_from = collect->pn_from;
+ dout(10) << "accepting " << accepted_pn << " from " << accepted_pn_from << endl;
+ } else {
+ // don't accept!
+ dout(10) << "NOT accepting " << collect->pn << " from " << collect->pn_from
+ << ", we already accepted " << accepted_pn << " from " << accepted_pn_from
+ << endl;
}
-
- delete m;
+ reply->pn = accepted_pn;
+ reply->pn_from = accepted_pn_from;
+
+ // and share whatever data we have
+ for (version_t v = collect->last_committed;
+ v <= last_committed;
+ v++) {
+ mon->store->get_bl(reply->values[v], machine_name, v);
+ dout(10) << " sharing " << v << " "
+ << reply->values[v].length() << " bytes" << endl;
+ }
+
+ // send reply
+ mon->messenger->send_message(reply, collect->get_source_inst());
+ delete collect;
}
-void Paxos::handle_last(MMonPaxos *m)
-{
- dout(10) << "handle_last " << *m << endl;
- if (m->pn > last_pn) {
- dout(10) << " accepter accepted pn " << m->pn << ", taking that value" << endl;
- last_value = m->value;
+void Paxos::handle_last(MMonPaxos *last)
+{
+ dout(10) << "handle_last " << *last << endl;
+
+ // share committed values?
+ if (last->last_committed < last_commited) {
+ // share committed values
+ dout(10) << "sending commit to " << last->get_source() << endl;
+ MMonPaxos *commit = new MMonPaxos(MMonPaxos::OP_COMMIT, machine_id);
+ for (version_t v = last->last_committed;
+ v <= last_committed;
+ v++) {
+ mon->store->get_bl(commit->values[v], machine_name, v);
+ dout(10) << "sharing " << v << " "
+ << commit->values[v].length() << " bytes" << endl;
+ }
+ mon->messenger->send_message(commit, last->get_source_inst());
}
- // majority?
- bool had = have_majority();
- num_last++;
-
- if (!had && have_majority()) {
- dout(5) << "handle_last - we just got a majority" << endl;
+ // did we receive committed value?
+ if (last->last_committed > last_commited) {
+ for (version_t v = last_committed;
+ v <= last->last_committed;
+ v++) {
+ mon->store->put_bl(last->values[v], machine_name, v);
+ dout(10) << "committing " << v << " "
+ << commit->values[v].length() << " bytes" << endl;
+ }
+ last_commited = last->last_committed;
+ mon->store->put_int(last_committed, machine_name, "last_commtted");
+ dout(10) << "last_committed now " << last_committed << endl;
+ }
+
+ // do they accept your pn?
+ if (last->accepted_pn > accepted_pn) {
+ dout(10) << "uh oh, they have a higher pn than us. pick a new one." << endl;
+ leader_start(last->accepted_pn);
+ } else {
+ // they accepted our pn. great.
+ num_last++;
+ dout(10) << "great, they accepted out pn, we now have " << num_last << endl;
+
+ // did this person send back an accepted but uncommitted value?
+ if (last->accepted_pn) {
+ version_t v = last->last_committed+1;
+ if (v > old_accepted_pn) {
+ dout(10) << "we learned an old value for " << v << " pn " << last->old_pn;
+ old_accepted_pn = old_accepted_pn;
+ old_accepted_value = last->values[v];
+ }
+ }
- if (last_pn) {
- // propose previous value
- propose();
- } else {
- // it's unconstrained.
+ // do we have a majority?
+ if (num_last == mon->monmap->nummon/2+1) {
+ // do this once.
+ // did we learn an old value?
+ if (!old_accepted_value.empty()) {
+ begin(old_accepted_value);
+ }
}
}
+ delete last;
+}
+
+
+void Paxos::begin(bufferlist& v)
+{
+ dout(10) << "begin for " << last_committed+1 << " "
+ << new_value.length() << " bytes"
+ << endl;
+
+ // we must already have a majority for this to work.
+ assert(num_last > mon->monmap->nummon/2);
+ // and no value, yet.
+ assert(new_value.empty());
+ // accept it ourselves
+ num_accepted = 1;
+ new_value = v;
+ mon->store->put_bl(new_value, machine_name, last_committed+1);
- // use == instead of > so that if we receive additional LAST messages, we will not do this again
- if (num_last == (unsigned)(mon->monmap->num_mon / 2)+1) {
- dout(5) << "handle_last - got majority" << endl;
+ // ask others to accept it to!
+ for (int i=0; i<mon->monmap->num_mon; ++i) {
+ if (i == whoami) continue;
- // propose
- propose
+ dout(10) << " sending begin to mon" << i << endl;
+ MMonPaxos *begin = new MMonPaxos(MMonPaxos::OP_BEGIN, machine_id);
+ begin->values[last_committed+1] = new_value;
+ begin->pn = accepted_pn;
- // bcast to everyone else - begin
- num_accept = 0;
- for (int i=0; i<mon->monmap->num_mon; ++i) {
- if (i == whoami) continue;
- // we only set up our value if we get the majority, so for now we don't save it
- // todo high rf: where is the data we want to send?
- mon->messenger->send_message(new MMonPaxos(MMonPaxos::OP_BEGIN, machine_id,
- pn, DATA_TO_SEND),
- mon->monmap->get_inst(i));
- }
+ mon->messenger->send_message(begin, mon->monmap->get_inst(i));
}
- delete m;
}
-void Paxos::handle_oldround(MMonPaxos *m)
+void Paxos::handle_begin(MMonPaxos *begin)
{
- dout(10) << "handle_oldround " << *m << endl;
+ dout(10) << "handle_begin " << *begin << endl;
- // the state is already constrained because an acceptor has
- // accepted with a higher pn than ours.
+ // can we accept this?
+ if (begin->pn != accepted_pn) {
+ dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << endl;
+ delete begin;
+ return;
+ }
- // in any case, we can propose for this state.
-
- last_value = m->value;
- last_pn = m->pn;
+ // yes.
+ dout(10) << "accepting value for " << v << " pn " << accepted_pn << endl;
+ version_t v = last_committed+1;
+ mon->store->put_bl(begin->values[v], machine_name, v);
- // ?
- if (have_majority())
- propose();
-
- delete m;
+ // reply
+ MMonPaxos *accept = new MMonPaxos(MMonPaxos::OP_ACCEPT, machine_id);
+ accept->pn = accepted_pn;
+ accept->last_committed = last_committed;
+ mon->messenger->send_message(accept);
+
+ delete begin;
}
-
-void Paxos::handle_accept(MMonPaxos *m)
+void Paxos::handle_accept(MMonPaxos *accept)
{
- dout(10) << "handle_accept " << *m << endl;
- num_accepts++;
- if (num_accepts == (unsigned)(mon->monmap->num_mon / 2)+1){
- dout(5) << "handle_accept - bcast commit messages" << *m << endl;
- for (int i=0; i<mon->monmap->num_mon; ++i) {
- mon->messenger->send_message(new MMonPaxos(MMonPaxos::OP_COMMIT,
- whoami,
- pn,
- DATA_TO_COMMIT),
- mon->monmap->get_inst(i));
- }
- }
-
- delete m;
+ dout(10) << "handle_accept " << *accept << endl;
-}
+ if (accept->pn != accepted_pn) {
+ // we accepted a higher pn, from some other leader
+ dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << endl;
+ delete accept;
+ return;
+ }
+ if (accept->last_committed != last_committed) {
+ dout(10) << " this is from an old round that's already committed, ignoring" << endl;
+ delete accept;
+ return;
+ }
+ num_accepted++;
+ dout(10) << "now " << num_accepted << " have accepted" << endl;
-void Paxos::propose(version_t v, bufferlist& value)
-{
- last_value = value;
- last_version = v;
+ // new majority?
+ if (num_accepted == mon->monmap->nummon/2+1) {
+ // yay, commit!
+ dout(10) << "we got a majority, committing too" << endl;
+ commit();
+ }
- // kick start whatever
- // send some message
}
-
+void Paxos::commit()
+{
+ dout(10) << "commit " << last_committed+1 << endl;
+ // commit locally
+ last_committed++;
+ mon->store->put_int(last_committed, machine_name, "last_committed");
+
+ // tell everyone
+ for (int i=0; i<mon->monmap->num_mon; ++i) {
+ if (i == whoami) continue;
+
+ dout(10) << " sending commit to mon" << i << endl;
+ MMonPaxos *commit = new MMonPaxos(MMonPaxos::OP_COMMIT, machine_id);
+ commit->values[last_committed] = new_value;
+ commit->pn = accepted_pn;
+
+ mon->messenger->send_message(commit, mon->monmap->get_inst(i));
+ }
+
+ // get ready for a new round.
+ new_value.clear();
-void Paxos::handle_ack(MMonPaxos *m)
-{
-//todo high rf: Do we have to do anything here?!?
- dout(10) << "handle_ack " << *m << endl;
- delete m;
}
-void Paxos::handle_old_round(MMonPaxos *m)
+
+void Paxos::handle_commit(MMonPaxos *commit)
{
-//todo high rf: should we get a new number (higher than the one returned by the other process) and try again?
- dout(10) << "handle_old_round " << *m << endl;
- delete m;
-}
+ dout(10) << "handle_commit on " << commit->last_committed << endl;
+
+ // commit locally.
+ last_committed = commit->last_committed;
+ mon->store->put_bl(commit->values[last_committed], machine_name, last_committed);
+ mon->store->put_int(last_committed, machine_name, "last_committed");
+ delete commit;
+}
+
+
/*
* return a globally unique, monotonically increasing proposal number
}
-// ---------------------------------
-// accepter
-void Paxos::handle_collect(MMonPaxos *m)
-{
- if (m->pn > accepted_pn[m->v]) {
- dout(10) << "handle_collect - replied LAST" << *m << endl;
- mon->messenger->send_message(new MMonPaxos(MMonPaxos::OP_LAST, machine_id,
- m->pn, m->v));
- accepted_pn[m->v] = m->pn;
- }
- else {
- dout(10) << "handle_collect - replied OLDROUND" << *m << endl;
- mon->messenger->send_message(new MMonPaxos(MMonPaxos::OP_OLDROUND, machine_id,
- accepted_pn[m->v], m->v,
- accepted_value[m->v]));
- }
-
- delete m;
-}
-
-
-
-
-// ---------------------------------
-// learner
-void Paxos::handle_success(MMonPaxos *m)
-{
- dout(10) << "handle_success - commit results" << endl;
- //todo high rf: copy from TEMP_RES TO RES
- mon->messenger->send_message(new MMonPaxos(MMonPaxos::OP_ACK, whoaim));
- delete m;
-}
-
-void Paxos::handle_begin(MMonPaxos *m)
+void Paxos::leader_start(version_t oldpn)
{
- if (m->pn >= last) {
- dout(10) << "handle_begin - replied ACCEPT" << *m << endl;
- // todo high rf: save the new monitor map to TEMP_RES
- mon->messenger->send_message(new MMonPaxos(MMonPaxos::OP_ACCEPT, whoami, last));
- last = m->pn;
- }
- else {
- dout(10) << "handle_begin - replied OLDROUND" << *m << endl;
- mon->messenger->send_message(new MMonPaxos(MMonPaxos::OP_OLDROUND, whoami, last));
- }
- delete m;
+ dout(10) << "i am the leader, start paxos" << endl;
+ collect(0);
}
-// ---------------------------------
-
-
void Paxos::dispatch(Message *m)
{
handle_last(pm);
break;
- case MMonPaxos::OP_OLDROUND:
- handle_old_round(pm);
- break;
-
case MMonPaxos::OP_BEGIN:
handle_begin(pm);
break;
handle_accept(pm);
break;
- case MMonPaxos::OP_SUCCESS:
- handle_success(pm);
- break;
-
- case MMonPaxos::OP_ACK:
- handle_ack(pm);
+ case MMonPaxos::OP_COMMIT:
+ handle_commit(pm);
break;
-
- default:
- assert(0);
+
+ default:
+ assert(0);
}
}
break;
/*
time---->
-ccccccccccccccccccaaa??????????????????????????????????????
-ccccccccccccccc????????????????????????????????????????????
-ccccccccccccccccccaaa??????????????????????????????????????
-ccccccccccccccccccaaa??????????????????????????????????????
-ccca???????????????????????????????????????????????????????
+cccccccccccccccccca????????????????????????????????????????
+cccccccccccccccccca????????????????????????????????????????
+cccccccccccccccccca???????????????????????????????????????? leader
+cccccccccccccccccc?????????????????????????????????????????
+ccccc??????????????????????????????????????????????????????
-collect(v>2)
- last and values for each v>2
- or
- oldround ...
+last_committed
+pn_from
+pn
-what values we\'ve accepted, and for at pn\'s
-what values we happen to know have been committed.
+a 12v
+b 12v
+c 14v
+d
+e 12v
-dddddddddddddddd_??????????????????????????????????????????
*/
#ifndef __MON_PAXOS_H
int machine_id;
const char *machine_name;
- // proposer
- /*
- version_t last_version; // the last version i'm proposing
- int num_last; // number of peers that have responded my collect
- //version_t last_pn; // my proposal number
- int num_accept; // number of peers that have accepted my propose
-
- version_t constrained_thru;
- */
- //map<version_t, version_t> last_pn;
-
// phase 1
- version_t my_pn; // my pn
- version_t last_pn; // the largest pn i've heard via a LAST
- bufferlist last_value; // the value i'm proposing
- int last_num; // how many LAST's i've received
-
- bool have_majority() {
- return num_accept > (mon->monmap->num_mon / 2)+1;
- }
-
- void propose(version_t v, bufferlist& value);
+ version_t last_committed;
+ version_t accepted_pn;
+ version_t accepted_pn_from;
+ // results from our last replies
+ int num_last;
+ version_t old_accepted_pn;
+ bufferlist old_accepted_value;
+
+ // phase 2
+ bufferlist new_value;
+ int num_accepted;
+
+ void collect(versiont_t oldpn);
+ void handle_collect(MMonPaxos*);
void handle_last(MMonPaxos*);
+ void begin(bufferlist& value);
+ void handle_begin(MMonPaxos*);
void handle_accept(MMonPaxos*);
- void handle_ack(MMonPaxos*);
- void handle_old_round(MMonPaxos*);
-
- version_t get_new_proposal_number(version_t gt=0);
-
- // accepter
- map<version_t, bufferlist> accepted_values;
- map<version_t, version_t> accepted_pn;
-
- void handle_collect(MMonPaxos*);
+ void commit();
+ void handle_commit(MMonPaxos*);
- // learner
- void handle_success(MMonPaxos*);
- void handle_begin(MMonPaxos*);
+ version_t get_new_proposal_number(version_t gt=0);
-
public:
Paxos(Monitor *m, int w,
int mid,const char *mnm) : mon(m), whoami(w),