]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
wrote it out.. hmm..
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 2 Mar 2007 04:27:44 +0000 (04:27 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 2 Mar 2007 04:27:44 +0000 (04:27 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1157 29311d96-e01e-0410-9327-a35deaab8ce9

branches/riccardo/monitor2/messages/MMonPaxos.h
branches/riccardo/monitor2/mon/Paxos.cc
branches/riccardo/monitor2/mon/Paxos.h

index 10265374fc56dd2a8eb5e8e0c9f10dae980d153c..75f620c303f5ac67a543a4e31e0749c4be4aba97 100644 (file)
@@ -28,40 +28,33 @@ class MMonPaxos : public Message {
   const static int OP_SUCCESS = 7;   // proposer: notify learners of agreed value
   const static int OP_ACK = 8;          // learner:  notify proposer that new value has been saved
 
+  // which state machine?
   int op;   
   int machine_id;
-  version_t pn;
-  version_t v;
-  bufferlist value;
+  
+  version_t last_committed;  // i've committed to
+  version_t pn_from;         // i promise to accept after
+  version_t pn;              // with this proposal number
+  map<version_t,bufferlist> values;
+  version_t old_accepted_pn;     // previous pn, if we are a LAST with an uncommitted value
 
   MMonPaxos() : Message(MSG_MON_PAXOS) {}
-  MMonPaxos(int o, int mid, 
-           version_t _pn, version_t _v) : Message(MSG_MON_PAXOS),
-                                          op(o), machine_id(mid), 
-                                          pn(_pn), v(_v) {}
-  MMonPaxos(int o, int mid, 
-           version_t _pn, version_t _v, 
-           bufferlist& b) : Message(MSG_MON_PAXOS),
-                            op(o), machine_id(mid),
-                            pn(_pn), v(_v),
-                            value(b) {}
+  MMonPaxos(int o, int mid) : Message(MSG_MON_PAXOS),
+                             op(o), machine_id(mid) {}
   
   virtual char *get_type_name() { return "paxos"; }
   
   void print(ostream& out) {
     out << "paxos(op " << op
-       << ", machine " << machine_id
-       << ", proposal " << proposal 
-       << ", state " << n 
-       << ", " << value.length() << " bytes)";
+       << ", machine " << machine_id << ")";
   }
 
   void encode_payload() {
     payload.append((char*)&op, sizeof(op));
     payload.append((char*)&machine_id, sizeof(machine_id));
-    payload.append((char*)&proposal, sizeof(proposal));
-    payload.append((char*)&n, sizeof(n));
-    ::_encode(value, payload);
+    payload.append((char*)&last_committed, sizeof(last_committed));
+    ::_encode(values, payload);
+    ::_encode(pns, payload);
   }
   void decode_payload() {
     int off = 0;
@@ -69,11 +62,10 @@ class MMonPaxos : public Message {
     off += sizeof(op);
     payload.copy(off, sizeof(machine_id), (char*)&machine_id);
     off += sizeof(machine_id);
-    payload.copy(off, sizeof(proposal), (char*)&proposal);
-    off += sizeof(proposal);
-    payload.copy(off, sizeof(n), (char*)&n);
-    off += sizeof(n);
-    ::_decode(value, payload, off);
+    payload.copy(off, sizeof(last_committed), (char*)&last_committed);
+    off += sizeof(last_committed);
+    ::_decode(values, payload, off);
+    ::_decode(pns, payload, off);
   }
 };
 
index ea1ed59c1462a43ac1267bb6b0cfea4672efb60b..57b69f8a4ca764007bc6a8deae8d0dc04b1643c8 100644 (file)
 // PHASE 1
 
 // proposer
-void Paxos::leader_start()
-{
-  dout(10) << "i am the leader, start paxos" << endl;
   
+void Paxos::collect(version_t oldpn)
+{
   // reset the number of lasts received
-  last_num = 0;
-  my_pn = get_new_proposal_number();
-  last_pn = 0;
+  accepted_pn = get_new_proposal_number(MAX(accepted_pn, oldpn));
+  accepted_pn_from = last_committed;
+  num_last = 1;
+  old_accepted_pn = 0;
+  old_accepted_value.clear();
+
+  dout(10) << "collect with pn " << accepted_pn << endl;
 
   // send collect
   for (int i=0; i<mon->monmap->num_mon; ++i) {
     if (i == whoami) continue;
-    // todo high rf I pass the pn twice... what is the last parameter for?
-    mon->messenger->send_message(new MMonPaxos(MMonPaxos::OP_COLLECT, whoami, pn, 0),
-                                mon->monmap->get_inst(i));
+    
+    MMonPaxos *collect = new MMonPaxos(MMonPaxos::OP_COLLECT, machine_id);
+    collect->last_committed = last_committed;
+    collect->pn = my_pn;
+    mon->messenger->send_message(collect, mon->monmap->get_inst(i));
   }
 }
 
-void Paxos::handle_collect(MMonPaxos *m)
+void Paxos::handle_collect(MMonPaxos *collect)
 {
-  if (m->pn > accepted_pn[m->v]) {
-    dout(10) << "handle_collect - replied LAST" << *m << endl;
-    // send ack/last
-    mon->messenger->send_message(new MMonPaxos(MMonPaxos::OP_LAST, machine_id,
-                                              accepted_pn[m->v], m->v,
-                                              accepted_value[m->v]));
-    accepted_pn[m->v] = m->pn;
+  dout(10) << "handle_collect " << *collect << endl;
+
+  // reply
+  MMonPaxos *reply = new MMonPaxos(MMonPaxos::OP_LAST, machine_id);
+  reply->last_committed = last_committed;
+  
+  // do we have an accepted but uncommitted value?
+  //  (it'll be at last_committed+1)
+  bufferlist bl;
+  mon->store->get_bl(bl, machine_name, last_committed+1);
+  if (bl.length()) {
+    dout(10) << "sharing our accepted but uncommitted value for " << last_committed+1 << endl;
+    last->values[last_committed+1] = bl;
+    last->old_accepted_pn = accepted_pn;
   }
-  else {
-    dout(10) << "handle_collect - replied OLDROUND" << *m << endl;
-    // send nak/oldround
-    mon->messenger->send_message(new MMonPaxos(MMonPaxos::OP_LAST, machine_id,
-                                              accepted_pn[m->v], m->v, 
-                                              accepted_value[m->v]));
+
+  // can we accept this pn?
+  if (collect->pn > accepted_pn) {
+    // ok, accept it
+    accepted_pn = collect->pn;
+    accepted_pn_from = collect->pn_from;
+    dout(10) << "accepting " << accepted_pn << " from " << accepted_pn_from << endl;
+  } else {
+    // don't accept!
+    dout(10) << "NOT accepting " << collect->pn << " from " << collect->pn_from 
+            << ", we already accepted " << accepted_pn << " from " << accepted_pn_from 
+            << endl;
   }
-  
-  delete m;
+  reply->pn = accepted_pn;
+  reply->pn_from = accepted_pn_from;
+
+  // and share whatever data we have
+  for (version_t v = collect->last_committed;
+       v <= last_committed;
+       v++) {
+    mon->store->get_bl(reply->values[v], machine_name, v);
+    dout(10) << " sharing " << v << " " 
+            << reply->values[v].length() << " bytes" << endl;
+  }
+
+  // send reply
+  mon->messenger->send_message(reply, collect->get_source_inst());
+  delete collect;
 }
 
-void Paxos::handle_last(MMonPaxos *m)
-{
-  dout(10) << "handle_last " << *m << endl;
 
-  if (m->pn > last_pn) {
-    dout(10) << " accepter accepted pn " << m->pn << ", taking that value" << endl;
-    last_value = m->value;
+void Paxos::handle_last(MMonPaxos *last)
+{
+  dout(10) << "handle_last " << *last << endl;
+
+  // share committed values?
+  if (last->last_committed < last_commited) {
+    // share committed values
+    dout(10) << "sending commit to " << last->get_source() << endl;
+    MMonPaxos *commit = new MMonPaxos(MMonPaxos::OP_COMMIT, machine_id);
+    for (version_t v = last->last_committed;
+        v <= last_committed;
+        v++) {
+      mon->store->get_bl(commit->values[v], machine_name, v);
+      dout(10) << "sharing " << v << " " 
+              << commit->values[v].length() << " bytes" << endl;
+    }
+    mon->messenger->send_message(commit, last->get_source_inst());
   }
 
-  // majority?
-  bool had = have_majority();
-  num_last++;
-
-  if (!had && have_majority()) {
-    dout(5) << "handle_last - we just got a majority" << endl;
+  // did we receive committed value?
+  if (last->last_committed > last_commited) {
+    for (version_t v = last_committed;
+        v <= last->last_committed;
+        v++) {
+      mon->store->put_bl(last->values[v], machine_name, v);
+      dout(10) << "committing " << v << " " 
+              << commit->values[v].length() << " bytes" << endl;
+    }
+    last_commited = last->last_committed;
+    mon->store->put_int(last_committed, machine_name, "last_commtted");
+    dout(10) << "last_committed now " << last_committed << endl;
+  }
+      
+  // do they accept your pn?
+  if (last->accepted_pn > accepted_pn) {
+    dout(10) << "uh oh, they have a higher pn than us.  pick a new one." << endl;
+    leader_start(last->accepted_pn);
+  } else {
+    // they accepted our pn.  great.
+    num_last++;
+    dout(10) << "great, they accepted out pn, we now have " << num_last << endl;
+
+    // did this person send back an accepted but uncommitted value?
+    if (last->accepted_pn) {
+      version_t v = last->last_committed+1;
+      if (v > old_accepted_pn) {
+       dout(10) << "we learned an old value for " << v << " pn " << last->old_pn;
+       old_accepted_pn = old_accepted_pn;
+       old_accepted_value = last->values[v];
+      }
+    }
 
-    if (last_pn) {
-      // propose previous value
-      propose();
-    } else {
-      // it's unconstrained.
+    // do we have a majority?
+    if (num_last == mon->monmap->nummon/2+1) {
+      // do this once.
+      // did we learn an old value?
+      if (!old_accepted_value.empty()) {
+       begin(old_accepted_value);
+      }
     }
   }
 
+  delete last;
+}
+
+
+void Paxos::begin(bufferlist& v)
+{
+  dout(10) << "begin for " << last_committed+1 << " " 
+          << new_value.length() << " bytes"
+          << endl;
+
+  // we must already have a majority for this to work.
+  assert(num_last > mon->monmap->nummon/2);
 
+  // and no value, yet.
+  assert(new_value.empty());
 
+  // accept it ourselves
+  num_accepted = 1;
+  new_value = v;
+  mon->store->put_bl(new_value, machine_name, last_committed+1);
 
-  // use == instead of > so that if we receive additional LAST messages, we will not do this again
-  if (num_last == (unsigned)(mon->monmap->num_mon / 2)+1) {
-    dout(5) << "handle_last - got majority" << endl;
+  // ask others to accept it too!
+  for (int i=0; i<mon->monmap->num_mon; ++i) {
+    if (i == whoami) continue;
 
-    // propose
-    propose
+    dout(10) << " sending begin to mon" << i << endl;
+    MMonPaxos *begin = new MMonPaxos(MMonPaxos::OP_BEGIN, machine_id);
+    begin->values[last_committed+1] = new_value;
+    begin->pn = accepted_pn;
     
-    // bcast to everyone else - begin
-    num_accept = 0;
-    for (int i=0; i<mon->monmap->num_mon; ++i) {
-      if (i == whoami) continue;
-      // we only set up our value if we get the majority, so for now we don't save it
-      // todo high rf: where is the data we want to send?
-      mon->messenger->send_message(new MMonPaxos(MMonPaxos::OP_BEGIN, machine_id, 
-                                                pn, DATA_TO_SEND),
-                                  mon->monmap->get_inst(i));
-    }
+    mon->messenger->send_message(begin, mon->monmap->get_inst(i));
   }
-  delete m;
 }
 
-void Paxos::handle_oldround(MMonPaxos *m)
+void Paxos::handle_begin(MMonPaxos *begin)
 {
-  dout(10) << "handle_oldround " << *m << endl;
+  dout(10) << "handle_begin " << *begin << endl;
 
-  // the state is already constrained because an acceptor has
-  // accepted with a higher pn than ours.
+  // can we accept this?
+  if (begin->pn != accepted_pn) {
+    dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << endl;
+    delete begin;
+    return;
+  }
   
-  // in any case, we can propose for this state.
-
-  last_value = m->value;
-  last_pn = m->pn;
+  // yes.
+  dout(10) << "accepting value for " << v << " pn " << accepted_pn << endl;
+  version_t v = last_committed+1;
+  mon->store->put_bl(begin->values[v], machine_name, v);
   
-  // ?
-  if (have_majority())
-    propose(); 
-
-  delete m;
+  // reply
+  MMonPaxos *accept = new MMonPaxos(MMonPaxos::OP_ACCEPT, machine_id);
+  accept->pn = accepted_pn;
+  accept->last_committed = last_committed;
+  mon->messenger->send_message(accept);
+  
+  delete begin;
 }
 
 
-
-void Paxos::handle_accept(MMonPaxos *m)
+void Paxos::handle_accept(MMonPaxos *accept)
 {
-  dout(10) << "handle_accept " << *m << endl;
-  num_accepts++;
-  if (num_accepts == (unsigned)(mon->monmap->num_mon / 2)+1){
-    dout(5) << "handle_accept - bcast commit messages" << *m << endl;
-    for (int i=0; i<mon->monmap->num_mon; ++i) {
-      mon->messenger->send_message(new MMonPaxos(MMonPaxos::OP_COMMIT, 
-                                                whoami, 
-                                                pn, 
-                                                DATA_TO_COMMIT),
-                                  mon->monmap->get_inst(i));
-    }
-  }
-  
-  delete m;
+  dout(10) << "handle_accept " << *accept << endl;
   
-}
+  if (accept->pn != accepted_pn) {
+    // we accepted a higher pn, from some other leader
+    dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << endl;
+    delete accept;
+    return;
+  }
+  if (accept->last_committed != last_committed) {
+    dout(10) << " this is from an old round that's already committed, ignoring" << endl;
+    delete accept;
+    return;
+  }
 
+  num_accepted++;
+  dout(10) << "now " << num_accepted << " have accepted" << endl;
 
-void Paxos::propose(version_t v, bufferlist& value)
-{
-  last_value = value;
-  last_version = v;
+  // new majority?
+  if (num_accepted == mon->monmap->nummon/2+1) {
+    // yay, commit!
+    dout(10) << "we got a majority, committing too" << endl;
+    commit();
+  }  
 
-  // kick start whatever
-  // send some message
 }
-  
 
+void Paxos::commit()
+{
+  dout(10) << "commit " << last_committed+1 << endl;
 
+  // commit locally
+  last_committed++;
+  mon->store->put_int(last_committed, machine_name, "last_committed");
+
+  // tell everyone
+  for (int i=0; i<mon->monmap->num_mon; ++i) {
+    if (i == whoami) continue;
+
+    dout(10) << " sending commit to mon" << i << endl;
+    MMonPaxos *commit = new MMonPaxos(MMonPaxos::OP_COMMIT, machine_id);
+    commit->values[last_committed] = new_value;
+    commit->pn = accepted_pn;
+    
+    mon->messenger->send_message(commit, mon->monmap->get_inst(i));
+  }
+
+  // get ready for a new round.
+  new_value.clear();
 
-void Paxos::handle_ack(MMonPaxos *m)
-{
-//todo high rf: Do we have to do anything here?!?
-  dout(10) << "handle_ack " << *m << endl;
-  delete m;
 }
 
-void Paxos::handle_old_round(MMonPaxos *m)
+
+void Paxos::handle_commit(MMonPaxos *commit)
 {
-//todo high rf: should we get a new number (higher than the one returned by the other process) and try again?
-  dout(10) << "handle_old_round " << *m << endl;
-  delete m;
-}
+  dout(10) << "handle_commit on " << commit->last_committed << endl;
+
+  // commit locally.
+  last_committed = commit->last_committed;
+  mon->store->put_bl(commit->values[last_committed], machine_name, last_committed);
+  mon->store->put_int(last_committed, machine_name, "last_committed");
   
+  delete commit;
+}  
+
+
 
 /*
  * return a globally unique, monotonically increasing proposal number
@@ -208,57 +314,12 @@ version_t Paxos::get_new_proposal_number(version_t gt)
 }
 
 
-// ---------------------------------
-// accepter
-void Paxos::handle_collect(MMonPaxos *m)
-{
-  if (m->pn > accepted_pn[m->v]) {
-     dout(10) << "handle_collect - replied LAST" << *m << endl;
-     mon->messenger->send_message(new MMonPaxos(MMonPaxos::OP_LAST, machine_id,
-                                               m->pn, m->v));
-     accepted_pn[m->v] = m->pn;
-  }
-  else {
-    dout(10) << "handle_collect - replied OLDROUND" << *m << endl;
-    mon->messenger->send_message(new MMonPaxos(MMonPaxos::OP_OLDROUND, machine_id,
-                                              accepted_pn[m->v], m->v, 
-                                              accepted_value[m->v]));
-  }
-  
-  delete m;
-}
-
-
-
-
-// ---------------------------------
-// learner
-void Paxos::handle_success(MMonPaxos *m)
-{
-  dout(10) << "handle_success - commit results" << endl;
-  //todo high rf: copy from TEMP_RES TO RES
-  mon->messenger->send_message(new MMonPaxos(MMonPaxos::OP_ACK, whoaim));
-  delete m;
-}
-
-void Paxos::handle_begin(MMonPaxos *m)
+void Paxos::leader_start(version_t oldpn)
 {
-  if (m->pn >= last) {
-          dout(10) << "handle_begin - replied ACCEPT" << *m << endl;
-          // todo high rf: save the new monitor map to TEMP_RES
-          mon->messenger->send_message(new MMonPaxos(MMonPaxos::OP_ACCEPT, whoami, last));
-          last = m->pn;
-   }
-   else {
-          dout(10) << "handle_begin - replied OLDROUND" << *m << endl;
-          mon->messenger->send_message(new MMonPaxos(MMonPaxos::OP_OLDROUND, whoami, last));
-   }
-  delete m;
+  dout(10) << "i am the leader, start paxos" << endl;
+  collect(0);
 }
 
-// ---------------------------------
-
-
 
 void Paxos::dispatch(Message *m)
 {
@@ -279,10 +340,6 @@ void Paxos::dispatch(Message *m)
        handle_last(pm);
        break;
        
-      case MMonPaxos::OP_OLDROUND:
-       handle_old_round(pm);
-       break;
-       
       case MMonPaxos::OP_BEGIN:
        handle_begin(pm);
        break;
@@ -291,16 +348,12 @@ void Paxos::dispatch(Message *m)
        handle_accept(pm);
        break;          
        
-      case MMonPaxos::OP_SUCCESS:
-       handle_success(pm);
-       break;
-       
-      case MMonPaxos::OP_ACK:
-       handle_ack(pm);
+      case MMonPaxos::OP_COMMIT:
+       handle_commit(pm);
        break;
-       
-         default:
-           assert(0);
+
+      default:
+       assert(0);
       }
     }
     break;
index 622460d51e18dc4a04edcca04f6c90eb769f4c13..d184d0d59653cdd96d8ff6f47a7f86ba622f20ce 100644 (file)
 /*
 time---->
 
-ccccccccccccccccccaaa??????????????????????????????????????
-ccccccccccccccc????????????????????????????????????????????
-ccccccccccccccccccaaa??????????????????????????????????????
-ccccccccccccccccccaaa??????????????????????????????????????
-ccca???????????????????????????????????????????????????????
+cccccccccccccccccca????????????????????????????????????????
+cccccccccccccccccca????????????????????????????????????????
+cccccccccccccccccca???????????????????????????????????????? leader
+cccccccccccccccccc????????????????????????????????????????? 
+ccccc?????????????????????????????????????????????????????? 
 
-collect(v>2)
- last and values for each v>2
-  or
- oldround ...
+last_committed
 
+pn_from
+pn
 
-what values we\'ve accepted, and for at pn\'s
-what values we happen to know have been committed.
+a 12v 
+b 12v
+c 14v
+d
+e 12v
 
 
-dddddddddddddddd_??????????????????????????????????????????
 */
 
 #ifndef __MON_PAXOS_H
@@ -56,47 +57,31 @@ class Paxos {
   int machine_id;
   const char *machine_name;
 
-  // proposer
-  /*
-  version_t  last_version; // the last version i'm proposing
-  int        num_last;     // number of peers that have responded my collect
-  //version_t  last_pn;      // my proposal number
-  int        num_accept;   // number of peers that have accepted my propose
-
-  version_t  constrained_thru;
-  */
-  //map<version_t, version_t>  last_pn;
-
   // phase 1
-  version_t  my_pn;        // my pn
-  version_t  last_pn;      // the largest pn i've heard via a LAST
-  bufferlist last_value;   // the value i'm proposing
-  int        last_num;     // how many LAST's i've received
-  
-  bool have_majority() {
-    return num_accept > (mon->monmap->num_mon / 2)+1;
-  }
-
-  void propose(version_t v, bufferlist& value);
+  version_t last_committed;
+  version_t accepted_pn;
+  version_t accepted_pn_from;
   
+  // results from our last replies
+  int        num_last;
+  version_t  old_accepted_pn;
+  bufferlist old_accepted_value;
+
+  // phase 2
+  bufferlist new_value;
+  int        num_accepted;
+  void collect(versiont_t oldpn);
+  void handle_collect(MMonPaxos*);
   void handle_last(MMonPaxos*);
+  void begin(bufferlist& value);
+  void handle_begin(MMonPaxos*);
   void handle_accept(MMonPaxos*);
-  void handle_ack(MMonPaxos*);
-  void handle_old_round(MMonPaxos*);
-  
-  version_t get_new_proposal_number(version_t gt=0);
-  
-  // accepter
-  map<version_t, bufferlist> accepted_values;
-  map<version_t, version_t>  accepted_pn;
-
-  void handle_collect(MMonPaxos*);
+  void commit();
+  void handle_commit(MMonPaxos*);
 
-  // learner
-  void handle_success(MMonPaxos*);
-  void handle_begin(MMonPaxos*);
+  version_t get_new_proposal_number(version_t gt=0);
   
-
 public:
   Paxos(Monitor *m, int w,
        int mid,const char *mnm) : mon(m), whoami(w),