]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
compiles, too
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 2 Mar 2007 04:35:56 +0000 (04:35 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 2 Mar 2007 04:35:56 +0000 (04:35 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1158 29311d96-e01e-0410-9327-a35deaab8ce9

branches/riccardo/monitor2/messages/MMonPaxos.h
branches/riccardo/monitor2/mon/Paxos.cc
branches/riccardo/monitor2/mon/Paxos.h

index 75f620c303f5ac67a543a4e31e0749c4be4aba97..692b943c8b9a9c0fc597b08cece2277aab7ce397 100644 (file)
@@ -22,11 +22,9 @@ class MMonPaxos : public Message {
   // op types
   const static int OP_COLLECT = 1;   // proposer: propose round
   const static int OP_LAST = 2;                 // voter:    accept proposed round
-  const static int OP_OLDROUND = 3;     // voter:    notify proposer he proposed an old round
   const static int OP_BEGIN = 4;        // proposer: value proposed for this round
   const static int OP_ACCEPT = 5;       // voter:    accept propsed value
-  const static int OP_SUCCESS = 7;   // proposer: notify learners of agreed value
-  const static int OP_ACK = 8;          // learner:  notify proposer that new value has been saved
+  const static int OP_COMMIT = 7;   // proposer: notify learners of agreed value
 
   // which state machine?
   int op;   
@@ -53,8 +51,8 @@ class MMonPaxos : public Message {
     payload.append((char*)&op, sizeof(op));
     payload.append((char*)&machine_id, sizeof(machine_id));
     payload.append((char*)&last_committed, sizeof(last_committed));
+    payload.append((char*)&old_accepted_pn, sizeof(old_accepted_pn));
     ::_encode(values, payload);
-    ::_encode(pns, payload);
   }
   void decode_payload() {
     int off = 0;
@@ -64,8 +62,9 @@ class MMonPaxos : public Message {
     off += sizeof(machine_id);
     payload.copy(off, sizeof(last_committed), (char*)&last_committed);
     off += sizeof(last_committed);
+    payload.copy(off, sizeof(old_accepted_pn), (char*)&old_accepted_pn);
+    off += sizeof(old_accepted_pn);
     ::_decode(values, payload, off);
-    ::_decode(pns, payload, off);
   }
 };
 
index 57b69f8a4ca764007bc6a8deae8d0dc04b1643c8..be3a952e3b16313501fe01cfa39a6cda6abed46a 100644 (file)
@@ -46,7 +46,7 @@ void Paxos::collect(version_t oldpn)
     
     MMonPaxos *collect = new MMonPaxos(MMonPaxos::OP_COLLECT, machine_id);
     collect->last_committed = last_committed;
-    collect->pn = my_pn;
+    collect->pn = accepted_pn;
     mon->messenger->send_message(collect, mon->monmap->get_inst(i));
   }
 }
@@ -56,13 +56,13 @@ void Paxos::handle_collect(MMonPaxos *collect)
   dout(10) << "handle_collect " << *collect << endl;
 
   // reply
-  MMonPaxos *reply = new MMonPaxos(MMonPaxos::OP_LAST, machine_id);
-  reply->last_committed = last_committed;
+  MMonPaxos *last = new MMonPaxos(MMonPaxos::OP_LAST, machine_id);
+  last->last_committed = last_committed;
   
   // do we have an accepted but uncommitted value?
   //  (it'll be at last_committed+1)
   bufferlist bl;
-  mon->store->get_bl(bl, machine_name, last_committed+1);
+  mon->store->get_bl_sn(bl, machine_name, last_committed+1);
   if (bl.length()) {
     dout(10) << "sharing our accepted but uncommitted value for " << last_committed+1 << endl;
     last->values[last_committed+1] = bl;
@@ -81,20 +81,20 @@ void Paxos::handle_collect(MMonPaxos *collect)
             << ", we already accepted " << accepted_pn << " from " << accepted_pn_from 
             << endl;
   }
-  reply->pn = accepted_pn;
-  reply->pn_from = accepted_pn_from;
+  last->pn = accepted_pn;
+  last->pn_from = accepted_pn_from;
 
   // and share whatever data we have
   for (version_t v = collect->last_committed;
        v <= last_committed;
        v++) {
-    mon->store->get_bl(reply->values[v], machine_name, v);
+    mon->store->get_bl_sn(last->values[v], machine_name, v);
     dout(10) << " sharing " << v << " " 
-            << reply->values[v].length() << " bytes" << endl;
+            << last->values[v].length() << " bytes" << endl;
   }
 
   // send reply
-  mon->messenger->send_message(reply, collect->get_source_inst());
+  mon->messenger->send_message(last, collect->get_source_inst());
   delete collect;
 }
 
@@ -104,14 +104,14 @@ void Paxos::handle_last(MMonPaxos *last)
   dout(10) << "handle_last " << *last << endl;
 
   // share committed values?
-  if (last->last_committed < last_commited) {
+  if (last->last_committed < last_committed) {
     // share committed values
     dout(10) << "sending commit to " << last->get_source() << endl;
     MMonPaxos *commit = new MMonPaxos(MMonPaxos::OP_COMMIT, machine_id);
     for (version_t v = last->last_committed;
         v <= last_committed;
         v++) {
-      mon->store->get_bl(commit->values[v], machine_name, v);
+      mon->store->get_bl_sn(commit->values[v], machine_name, v);
       dout(10) << "sharing " << v << " " 
               << commit->values[v].length() << " bytes" << endl;
     }
@@ -119,43 +119,42 @@ void Paxos::handle_last(MMonPaxos *last)
   }
 
   // did we receive committed value?
-  if (last->last_committed > last_commited) {
+  if (last->last_committed > last_committed) {
     for (version_t v = last_committed;
         v <= last->last_committed;
         v++) {
-      mon->store->put_bl(last->values[v], machine_name, v);
+      mon->store->put_bl_sn(last->values[v], machine_name, v);
       dout(10) << "committing " << v << " " 
-              << commit->values[v].length() << " bytes" << endl;
+              << last->values[v].length() << " bytes" << endl;
     }
-    last_commited = last->last_committed;
+    last_committed = last->last_committed;
     mon->store->put_int(last_committed, machine_name, "last_commtted");
     dout(10) << "last_committed now " << last_committed << endl;
   }
       
   // do they accept your pn?
-  if (last->accepted_pn > accepted_pn) {
+  if (last->old_accepted_pn > accepted_pn) {
     dout(10) << "uh oh, they have a higher pn than us.  pick a new one." << endl;
-    leader_start(last->accepted_pn);
+    collect(last->old_accepted_pn);
   } else {
     // they accepted our pn.  great.
     num_last++;
     dout(10) << "great, they accepted out pn, we now have " << num_last << endl;
 
     // did this person send back an accepted but uncommitted value?
-    if (last->accepted_pn) {
+    if (last->old_accepted_pn &&
+       last->old_accepted_pn > old_accepted_pn) {
       version_t v = last->last_committed+1;
-      if (v > old_accepted_pn) {
-       dout(10) << "we learned an old value for " << v << " pn " << last->old_pn;
-       old_accepted_pn = old_accepted_pn;
-       old_accepted_value = last->values[v];
-      }
+      dout(10) << "we learned an old value for " << v << " pn " << last->old_accepted_pn;
+      old_accepted_pn = last->old_accepted_pn;
+      old_accepted_value = last->values[v];
     }
-
+    
     // do we have a majority?
-    if (num_last == mon->monmap->nummon/2+1) {
+    if (num_last == mon->monmap->num_mon/2+1) {
       // do this once.
       // did we learn an old value?
-      if (!old_accepted_value.empty()) {
+      if (!old_accepted_value.length()) {
        begin(old_accepted_value);
       }
     }
@@ -172,15 +171,15 @@ void Paxos::begin(bufferlist& v)
           << endl;
 
   // we must already have a majority for this to work.
-  assert(num_last > mon->monmap->nummon/2);
+  assert(num_last > mon->monmap->num_mon/2);
 
   // and no value, yet.
-  assert(new_value.empty());
+  assert(new_value.length() == 0);
 
   // accept it ourselves
   num_accepted = 1;
   new_value = v;
-  mon->store->put_bl(new_value, machine_name, last_committed+1);
+  mon->store->put_bl_sn(new_value, machine_name, last_committed+1);
 
   // ask others to accept it to!
   for (int i=0; i<mon->monmap->num_mon; ++i) {
@@ -207,15 +206,15 @@ void Paxos::handle_begin(MMonPaxos *begin)
   }
   
   // yes.
-  dout(10) << "accepting value for " << v << " pn " << accepted_pn << endl;
   version_t v = last_committed+1;
-  mon->store->put_bl(begin->values[v], machine_name, v);
+  dout(10) << "accepting value for " << v << " pn " << accepted_pn << endl;
+  mon->store->put_bl_sn(begin->values[v], machine_name, v);
   
   // reply
   MMonPaxos *accept = new MMonPaxos(MMonPaxos::OP_ACCEPT, machine_id);
   accept->pn = accepted_pn;
   accept->last_committed = last_committed;
-  mon->messenger->send_message(accept);
+  mon->messenger->send_message(accept, begin->get_source_inst());
   
   delete begin;
 }
@@ -241,7 +240,7 @@ void Paxos::handle_accept(MMonPaxos *accept)
   dout(10) << "now " << num_accepted << " have accepted" << endl;
 
   // new majority?
-  if (num_accepted == mon->monmap->nummon/2+1) {
+  if (num_accepted == mon->monmap->num_mon/2+1) {
     // yay, commit!
     dout(10) << "we got a majority, committing too" << endl;
     commit();
@@ -281,7 +280,7 @@ void Paxos::handle_commit(MMonPaxos *commit)
 
   // commit locally.
   last_committed = commit->last_committed;
-  mon->store->put_bl(commit->values[last_committed], machine_name, last_committed);
+  mon->store->put_bl_sn(commit->values[last_committed], machine_name, last_committed);
   mon->store->put_int(last_committed, machine_name, "last_committed");
   
   delete commit;
@@ -314,7 +313,7 @@ version_t Paxos::get_new_proposal_number(version_t gt)
 }
 
 
-void Paxos::leader_start(version_t oldpn)
+void Paxos::leader_start()
 {
   dout(10) << "i am the leader, start paxos" << endl;
   collect(0);
index d184d0d59653cdd96d8ff6f47a7f86ba622f20ce..c635cc78b7f8efc1a38caf31426eb0faf4d07370 100644 (file)
@@ -71,7 +71,7 @@ class Paxos {
   bufferlist new_value;
   int        num_accepted;
  
-  void collect(versiont_t oldpn);
+  void collect(version_t oldpn);
   void handle_collect(MMonPaxos*);
   void handle_last(MMonPaxos*);
   void begin(bufferlist& value);