]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
* more paxos work
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Thu, 14 Jun 2007 01:06:23 +0000 (01:06 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Thu, 14 Jun 2007 01:06:23 +0000 (01:06 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1418 29311d96-e01e-0410-9327-a35deaab8ce9

trunk/ceph/messages/MMonPaxos.h
trunk/ceph/mon/Elector.cc
trunk/ceph/mon/Elector.h
trunk/ceph/mon/Monitor.cc
trunk/ceph/mon/Monitor.h
trunk/ceph/mon/Paxos.cc
trunk/ceph/mon/Paxos.h
trunk/ceph/msg/FakeMessenger.cc

index a5236e84722705da93f03e4410c655f3227c2c51..f766c9254cb4abb045c347dc977aecd2b0f2bd86 100644 (file)
@@ -39,10 +39,10 @@ class MMonPaxos : public Message {
     }
   }
 
-  // which state machine?
-  int op;   
-  int machine_id;
-  
+  epoch_t epoch;   // monitor epoch
+  int op;          // paxos op
+  int machine_id;  // which state machine?
+
   version_t last_committed;  // i've committed to
   version_t pn_from;         // i promise to accept after
   version_t pn;              // with with proposal
@@ -52,9 +52,11 @@ class MMonPaxos : public Message {
   map<version_t,bufferlist> values;
 
   MMonPaxos() : Message(MSG_MON_PAXOS) {}
-  MMonPaxos(int o, int mid) : Message(MSG_MON_PAXOS),
-                             op(o), machine_id(mid),
-                             last_committed(0), pn_from(0), pn(0), old_accepted_pn(0) { }
+  MMonPaxos(epoch_t e, int o, int mid) : 
+    Message(MSG_MON_PAXOS),
+    epoch(e),
+    op(o), machine_id(mid),
+    last_committed(0), pn_from(0), pn(0), old_accepted_pn(0) { }
   
   virtual char *get_type_name() { return "paxos"; }
   
@@ -66,6 +68,7 @@ class MMonPaxos : public Message {
   }
 
   void encode_payload() {
+    ::_encode(epoch, payload);
     ::_encode(op, payload);
     ::_encode(machine_id, payload);
     ::_encode(last_committed, payload);
@@ -77,6 +80,7 @@ class MMonPaxos : public Message {
   }
   void decode_payload() {
     int off = 0;
+    ::_decode(epoch, payload, off);
     ::_decode(op, payload, off);
     ::_decode(machine_id, payload, off);
     ::_decode(last_committed, payload, off);
index 345a38b171e16aaf4212560023de2dbe8a8754f7..cdfce72bb0681cb8ebfd62b5ffe25755f00167a2 100644 (file)
@@ -33,6 +33,12 @@ void Elector::init()
   dout(1) << "init, last seen epoch " << epoch << endl;
 }
 
+void Elector::shutdown()
+{
+  if (expire_event)
+    mon->timer.cancel_event(expire_event);
+}
+
 void Elector::bump_epoch(epoch_t e) 
 {
   dout(10) << "bump_epoch " << epoch << " to " << e << endl;
@@ -94,15 +100,17 @@ void Elector::reset_timer(double plus)
   // set the timer
   cancel_timer();
   expire_event = new C_ElectionExpire(this);
-  g_timer.add_event_after(g_conf.mon_lease + plus,
-                         expire_event);
+  mon->timer.add_event_after(g_conf.mon_lease + plus,
+                            expire_event);
 }
 
 
 void Elector::cancel_timer()
 {
-  if (expire_event)
-    g_timer.cancel_event(expire_event);
+  if (expire_event) {
+    mon->timer.cancel_event(expire_event);
+    expire_event = 0;
+  }
 }
 
 void Elector::expire()
index 5a0edf70d1b8c333f2d9fcea9ec7f7683e0096b2..9bfd7cb644fc7770f4a6dafe63c756a122a7dde9 100644 (file)
@@ -69,19 +69,23 @@ class Elector {
   void handle_propose(class MMonElection *m);
   void handle_ack(class MMonElection *m);
   void handle_victory(class MMonElection *m);
-
   
  public:  
-  Elector(Monitor *m, int w) : mon(m), whoami(w) {
-    // initialize all those values!
-    // ...
-  }
+  Elector(Monitor *m, int w) : mon(m), whoami(w),
+                              expire_event(0),
+                              epoch(0),
+                              electing_me(false),
+                              leader_acked(-1) { }
 
   void init();
+  void shutdown();
+
+  void dispatch(Message *m);
+
   void call_election() {
     start();
   }
-  void dispatch(Message *m);
+
 };
 
 
index 8b4f73f41b366dda02a2eee278bba99fd967db4c..1d55b6c7d849608bcf38dec44f3f5d1caf96dd5a 100644 (file)
@@ -90,6 +90,8 @@ void Monitor::shutdown()
 {
   dout(1) << "shutdown" << endl;
 
+  elector.shutdown();
+
   // cancel all events
   cancel_tick();
   timer.cancel_all();
@@ -255,16 +257,28 @@ void Monitor::dispatch(Message *m)
 
       // paxos
     case MSG_MON_PAXOS:
-      // send it to the right paxos instance
-      switch (((MMonPaxos*)m)->machine_id) {
-      case PAXOS_TEST:
-       test_paxos.dispatch(m);
-       break;
-      case PAXOS_OSDMAP:
-       //...
-       
-      default:
-       assert(0);
+      {
+       MMonPaxos *pm = (MMonPaxos*)m;
+
+       // sanitize
+       if (pm->epoch > mon_epoch) 
+         assert(0);    //call_election();   // wtf
+       if (pm->epoch != mon_epoch) {
+         delete pm;
+         break;
+       }
+
+       // send it to the right paxos instance
+       switch (pm->machine_id) {
+       case PAXOS_TEST:
+         test_paxos.dispatch(m);
+         break;
+       case PAXOS_OSDMAP:
+         //...
+         
+       default:
+         assert(0);
+       }
       }
       break;
 
index 0f001e302a92e13cfffe29aa9721c8c99835285b..e7e77bca305a40d11d7fe2b58785881c19d0453b 100644 (file)
@@ -80,16 +80,14 @@ private:
   utime_t last_called_election;  // [starting] last time i called an election
   
 public:
-  // initiate election
-  void call_election();
-
-  // end election (called by Elector)
-  void win_election(epoch_t epoch, set<int>& q);
-  void lose_election(epoch_t epoch, int l);
-
+  epoch_t get_epoch() { return mon_epoch; }
   int get_leader() { return leader; }
   const set<int>& get_quorum() { return quorum; }
 
+  void call_election();  // initiate election
+  void win_election(epoch_t epoch, set<int>& q);  // end election (called by Elector)
+  void lose_election(epoch_t epoch, int l);       // end election (called by Elector)
+
 
   // -- paxos --
   Paxos test_paxos;
index 8f200717c1a40b68a43255fb9fe6b67e1d1819d3..a0875e072bba3fcad594de3ba5fd024809063d99 100644 (file)
@@ -51,7 +51,7 @@ void Paxos::collect(version_t oldpn)
        ++p) {
     if (*p == whoami) continue;
     
-    MMonPaxos *collect = new MMonPaxos(MMonPaxos::OP_COLLECT, machine_id);
+    MMonPaxos *collect = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COLLECT, machine_id);
     collect->last_committed = last_committed;
     collect->pn = accepted_pn;
     mon->messenger->send_message(collect, mon->monmap->get_inst(*p));
@@ -74,7 +74,7 @@ void Paxos::handle_collect(MMonPaxos *collect)
   state = STATE_RECOVERING;
 
   // reply
-  MMonPaxos *last = new MMonPaxos(MMonPaxos::OP_LAST, machine_id);
+  MMonPaxos *last = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LAST, machine_id);
   last->last_committed = last_committed;
   
   // do we have an accepted but uncommitted value?
@@ -135,7 +135,7 @@ void Paxos::handle_last(MMonPaxos *last)
   if (last->last_committed < last_committed) {
     // share committed values
     dout(10) << "sending commit to " << last->get_source() << endl;
-    MMonPaxos *commit = new MMonPaxos(MMonPaxos::OP_COMMIT, machine_id);
+    MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT, machine_id);
     for (version_t v = last->last_committed;
         v <= last_committed;
         v++) {
@@ -232,7 +232,7 @@ void Paxos::begin(bufferlist& v)
     if (*p == whoami) continue;
     
     dout(10) << " sending begin to mon" << *p << endl;
-    MMonPaxos *begin = new MMonPaxos(MMonPaxos::OP_BEGIN, machine_id);
+    MMonPaxos *begin = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_BEGIN, machine_id);
     begin->values[last_committed+1] = new_value;
     begin->last_committed = last_committed;
     begin->pn = accepted_pn;
@@ -264,7 +264,7 @@ void Paxos::handle_begin(MMonPaxos *begin)
   mon->store->put_bl_sn(begin->values[v], machine_name, v);
   
   // reply
-  MMonPaxos *accept = new MMonPaxos(MMonPaxos::OP_ACCEPT, machine_id);
+  MMonPaxos *accept = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_ACCEPT, machine_id);
   accept->pn = accepted_pn;
   accept->last_committed = last_committed;
   mon->messenger->send_message(accept, begin->get_source_inst());
@@ -325,7 +325,7 @@ void Paxos::commit()
     if (*p == whoami) continue;
 
     dout(10) << " sending commit to mon" << *p << endl;
-    MMonPaxos *commit = new MMonPaxos(MMonPaxos::OP_COMMIT, machine_id);
+    MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT, machine_id);
     commit->values[last_committed] = new_value;
     commit->pn = accepted_pn;
     
@@ -376,7 +376,7 @@ void Paxos::extend_lease()
        p != mon->get_quorum().end();
        ++p) {
     if (*p == whoami) continue;
-    MMonPaxos *lease = new MMonPaxos(MMonPaxos::OP_LEASE, machine_id);
+    MMonPaxos *lease = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE, machine_id);
     lease->last_committed = last_committed;
     lease->lease_timeout = lease_timeout;
     mon->messenger->send_message(lease, mon->monmap->get_inst(*p));
@@ -444,7 +444,7 @@ void Paxos::leader_init()
 {
   state = STATE_RECOVERING;
   lease_timeout = utime_t();
-  dout(10) << "leader_init -- i am the leader, starting paxos recovery" << endl;
+  dout(10) << "leader_init -- starting paxos recovery" << endl;
   collect(0);
 }
 
@@ -468,20 +468,13 @@ void Paxos::dispatch(Message *m)
     delete m;
     return;    
   }
-  
-  // from the proper leader?
-  if (mon->is_peon()) {
-    if (m->get_source().num() != mon->get_leader()) {
-      dout(5) << "dropping from non-leader " << m->get_source() << " " << *m << endl;
-      delete m;
-      return;
-    }
-  } 
-
-  assert(mon->is_peon() || mon->is_leader());
 
+  // check sanity
+  assert(mon->is_leader() || 
+        (mon->is_peon() && m->get_source().num() == mon->get_leader()));
+  
   switch (m->get_type()) {
-       
+    
   case MSG_MON_PAXOS:
     {
       MMonPaxos *pm = (MMonPaxos*)m;
@@ -492,27 +485,21 @@ void Paxos::dispatch(Message *m)
       case MMonPaxos::OP_COLLECT:
        handle_collect(pm);
        break;
-       
       case MMonPaxos::OP_LAST:
        handle_last(pm);
        break;
-       
       case MMonPaxos::OP_BEGIN:
        handle_begin(pm);
        break;
-       
       case MMonPaxos::OP_ACCEPT:
        handle_accept(pm);
        break;          
-       
       case MMonPaxos::OP_COMMIT:
        handle_commit(pm);
        break;
-       
       case MMonPaxos::OP_LEASE:
        handle_lease(pm);
        break;
-
       default:
        assert(0);
       }
index 63a7361b09b935c656a7c393a64f93214fe7f8de..43c1967ca7054cb6bab75e54504facd726bb0b0f 100644 (file)
@@ -35,6 +35,17 @@ e 12v
 
 */
 
+
+/*
+ * NOTE: This libary is based on the Paxos algorithm, but varies in a few key ways:
+ *  1- Only a single new value is generated at a time, simplifying the recovery logic.
+ *  2- Nodes track "committed" values, and share them generously (and trustingly)
+ *  3- A 'leasing' mechism is built-in, allowing nodes to determine when it is safe to 
+ *     "read" their copy of the last committed value.
+ *
+ * This provides a simple replication substrate that services can be built on top of.
+ */
+
 #ifndef __MON_PAXOS_H
 #define __MON_PAXOS_H
 
index b7f02bdb40624a43bbf1a76aa96ceb52ec2a995f..19f8f4320ed68a7f578beb96e30ee83bcf07475b 100644 (file)
@@ -71,17 +71,17 @@ void *fakemessenger_thread(void *ptr)
 {
   lock.Lock();
   while (1) {
+    if (fm_shutdown) break;
+    fakemessenger_do_loop_2();
+    
+    if (directory.empty()) break;
+    
     dout(20) << "thread waiting" << endl;
     if (fm_shutdown) break;
     awake = false;
     cond.Wait(lock);
     awake = true;
     dout(20) << "thread woke up" << endl;
-    if (fm_shutdown) break;
-
-    fakemessenger_do_loop_2();
-
-    if (directory.empty()) break;
   }
   lock.Unlock();