From 6f36b1ef49ec8301c6ece7b04329ca933556bd88 Mon Sep 17 00:00:00 2001 From: sageweil Date: Fri, 2 Mar 2007 04:35:56 +0000 Subject: [PATCH] compiles, too git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1158 29311d96-e01e-0410-9327-a35deaab8ce9 --- .../riccardo/monitor2/messages/MMonPaxos.h | 9 ++- branches/riccardo/monitor2/mon/Paxos.cc | 69 +++++++++---------- branches/riccardo/monitor2/mon/Paxos.h | 2 +- 3 files changed, 39 insertions(+), 41 deletions(-) diff --git a/branches/riccardo/monitor2/messages/MMonPaxos.h b/branches/riccardo/monitor2/messages/MMonPaxos.h index 75f620c303f5a..692b943c8b9a9 100644 --- a/branches/riccardo/monitor2/messages/MMonPaxos.h +++ b/branches/riccardo/monitor2/messages/MMonPaxos.h @@ -22,11 +22,9 @@ class MMonPaxos : public Message { // op types const static int OP_COLLECT = 1; // proposer: propose round const static int OP_LAST = 2; // voter: accept proposed round - const static int OP_OLDROUND = 3; // voter: notify proposer he proposed an old round const static int OP_BEGIN = 4; // proposer: value proposed for this round const static int OP_ACCEPT = 5; // voter: accept propsed value - const static int OP_SUCCESS = 7; // proposer: notify learners of agreed value - const static int OP_ACK = 8; // learner: notify proposer that new value has been saved + const static int OP_COMMIT = 7; // proposer: notify learners of agreed value // which state machine? int op; @@ -53,8 +51,8 @@ class MMonPaxos : public Message { payload.append((char*)&op, sizeof(op)); payload.append((char*)&machine_id, sizeof(machine_id)); payload.append((char*)&last_committed, sizeof(last_committed)); + payload.append((char*)&old_accepted_pn, sizeof(old_accepted_pn)); ::_encode(values, payload); - ::_encode(pns, payload); } void decode_payload() { int off = 0; @@ -64,8 +62,9 @@ class MMonPaxos : public Message { off += sizeof(machine_id); payload.copy(off, sizeof(last_committed), (char*)&last_committed); off += sizeof(last_committed); + payload.copy(off, sizeof(old_accepted_pn), (char*)&old_accepted_pn); + off += sizeof(old_accepted_pn); ::_decode(values, payload, off); - ::_decode(pns, payload, off); } }; diff --git a/branches/riccardo/monitor2/mon/Paxos.cc b/branches/riccardo/monitor2/mon/Paxos.cc index 57b69f8a4ca76..be3a952e3b163 100644 --- a/branches/riccardo/monitor2/mon/Paxos.cc +++ b/branches/riccardo/monitor2/mon/Paxos.cc @@ -46,7 +46,7 @@ void Paxos::collect(version_t oldpn) MMonPaxos *collect = new MMonPaxos(MMonPaxos::OP_COLLECT, machine_id); collect->last_committed = last_committed; - collect->pn = my_pn; + collect->pn = accepted_pn; mon->messenger->send_message(collect, mon->monmap->get_inst(i)); } } @@ -56,13 +56,13 @@ void Paxos::handle_collect(MMonPaxos *collect) dout(10) << "handle_collect " << *collect << endl; // reply - MMonPaxos *reply = new MMonPaxos(MMonPaxos::OP_LAST, machine_id); - reply->last_committed = last_committed; + MMonPaxos *last = new MMonPaxos(MMonPaxos::OP_LAST, machine_id); + last->last_committed = last_committed; // do we have an accepted but uncommitted value? // (it'll be at last_committed+1) bufferlist bl; - mon->store->get_bl(bl, machine_name, last_committed+1); + mon->store->get_bl_sn(bl, machine_name, last_committed+1); if (bl.length()) { dout(10) << "sharing our accepted but uncommitted value for " << last_committed+1 << endl; last->values[last_committed+1] = bl; @@ -81,20 +81,20 @@ void Paxos::handle_collect(MMonPaxos *collect) << ", we already accepted " << accepted_pn << " from " << accepted_pn_from << endl; } - reply->pn = accepted_pn; - reply->pn_from = accepted_pn_from; + last->pn = accepted_pn; + last->pn_from = accepted_pn_from; // and share whatever data we have for (version_t v = collect->last_committed; v <= last_committed; v++) { - mon->store->get_bl(reply->values[v], machine_name, v); + mon->store->get_bl_sn(last->values[v], machine_name, v); dout(10) << " sharing " << v << " " - << reply->values[v].length() << " bytes" << endl; + << last->values[v].length() << " bytes" << endl; } // send reply - mon->messenger->send_message(reply, collect->get_source_inst()); + mon->messenger->send_message(last, collect->get_source_inst()); delete collect; } @@ -104,14 +104,14 @@ void Paxos::handle_last(MMonPaxos *last) dout(10) << "handle_last " << *last << endl; // share committed values? - if (last->last_committed < last_commited) { + if (last->last_committed < last_committed) { // share committed values dout(10) << "sending commit to " << last->get_source() << endl; MMonPaxos *commit = new MMonPaxos(MMonPaxos::OP_COMMIT, machine_id); for (version_t v = last->last_committed; v <= last_committed; v++) { - mon->store->get_bl(commit->values[v], machine_name, v); + mon->store->get_bl_sn(commit->values[v], machine_name, v); dout(10) << "sharing " << v << " " << commit->values[v].length() << " bytes" << endl; } @@ -119,43 +119,42 @@ void Paxos::handle_last(MMonPaxos *last) } // did we receive committed value? - if (last->last_committed > last_commited) { + if (last->last_committed > last_committed) { for (version_t v = last_committed; v <= last->last_committed; v++) { - mon->store->put_bl(last->values[v], machine_name, v); + mon->store->put_bl_sn(last->values[v], machine_name, v); dout(10) << "committing " << v << " " - << commit->values[v].length() << " bytes" << endl; + << last->values[v].length() << " bytes" << endl; } - last_commited = last->last_committed; + last_committed = last->last_committed; mon->store->put_int(last_committed, machine_name, "last_commtted"); dout(10) << "last_committed now " << last_committed << endl; } // do they accept your pn? - if (last->accepted_pn > accepted_pn) { + if (last->old_accepted_pn > accepted_pn) { dout(10) << "uh oh, they have a higher pn than us. pick a new one." << endl; - leader_start(last->accepted_pn); + collect(last->old_accepted_pn); } else { // they accepted our pn. great. num_last++; dout(10) << "great, they accepted out pn, we now have " << num_last << endl; // did this person send back an accepted but uncommitted value? - if (last->accepted_pn) { + if (last->old_accepted_pn && + last->old_accepted_pn > old_accepted_pn) { version_t v = last->last_committed+1; - if (v > old_accepted_pn) { - dout(10) << "we learned an old value for " << v << " pn " << last->old_pn; - old_accepted_pn = old_accepted_pn; - old_accepted_value = last->values[v]; - } + dout(10) << "we learned an old value for " << v << " pn " << last->old_accepted_pn; + old_accepted_pn = last->old_accepted_pn; + old_accepted_value = last->values[v]; } - + // do we have a majority? - if (num_last == mon->monmap->nummon/2+1) { + if (num_last == mon->monmap->num_mon/2+1) { // do this once. // did we learn an old value? - if (!old_accepted_value.empty()) { + if (!old_accepted_value.length()) { begin(old_accepted_value); } } @@ -172,15 +171,15 @@ void Paxos::begin(bufferlist& v) << endl; // we must already have a majority for this to work. - assert(num_last > mon->monmap->nummon/2); + assert(num_last > mon->monmap->num_mon/2); // and no value, yet. - assert(new_value.empty()); + assert(new_value.length() == 0); // accept it ourselves num_accepted = 1; new_value = v; - mon->store->put_bl(new_value, machine_name, last_committed+1); + mon->store->put_bl_sn(new_value, machine_name, last_committed+1); // ask others to accept it to! for (int i=0; imonmap->num_mon; ++i) { @@ -207,15 +206,15 @@ void Paxos::handle_begin(MMonPaxos *begin) } // yes. - dout(10) << "accepting value for " << v << " pn " << accepted_pn << endl; version_t v = last_committed+1; - mon->store->put_bl(begin->values[v], machine_name, v); + dout(10) << "accepting value for " << v << " pn " << accepted_pn << endl; + mon->store->put_bl_sn(begin->values[v], machine_name, v); // reply MMonPaxos *accept = new MMonPaxos(MMonPaxos::OP_ACCEPT, machine_id); accept->pn = accepted_pn; accept->last_committed = last_committed; - mon->messenger->send_message(accept); + mon->messenger->send_message(accept, begin->get_source_inst()); delete begin; } @@ -241,7 +240,7 @@ void Paxos::handle_accept(MMonPaxos *accept) dout(10) << "now " << num_accepted << " have accepted" << endl; // new majority? - if (num_accepted == mon->monmap->nummon/2+1) { + if (num_accepted == mon->monmap->num_mon/2+1) { // yay, commit! dout(10) << "we got a majority, committing too" << endl; commit(); @@ -281,7 +280,7 @@ void Paxos::handle_commit(MMonPaxos *commit) // commit locally. last_committed = commit->last_committed; - mon->store->put_bl(commit->values[last_committed], machine_name, last_committed); + mon->store->put_bl_sn(commit->values[last_committed], machine_name, last_committed); mon->store->put_int(last_committed, machine_name, "last_committed"); delete commit; @@ -314,7 +313,7 @@ version_t Paxos::get_new_proposal_number(version_t gt) } -void Paxos::leader_start(version_t oldpn) +void Paxos::leader_start() { dout(10) << "i am the leader, start paxos" << endl; collect(0); diff --git a/branches/riccardo/monitor2/mon/Paxos.h b/branches/riccardo/monitor2/mon/Paxos.h index d184d0d59653c..c635cc78b7f8e 100644 --- a/branches/riccardo/monitor2/mon/Paxos.h +++ b/branches/riccardo/monitor2/mon/Paxos.h @@ -71,7 +71,7 @@ class Paxos { bufferlist new_value; int num_accepted; - void collect(versiont_t oldpn); + void collect(version_t oldpn); void handle_collect(MMonPaxos*); void handle_last(MMonPaxos*); void begin(bufferlist& value); -- 2.39.5