]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
first part of election algorithm
authorrferrett <rferrett@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 18 Aug 2006 23:13:50 +0000 (23:13 +0000)
committerrferrett <rferrett@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 18 Aug 2006 23:13:50 +0000 (23:13 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@804 29311d96-e01e-0410-9327-a35deaab8ce9

ceph/mon/Monitor.cc
ceph/mon/Monitor.h

index e81f2aadc433bef1dac9794c394666e5c1cb59dc..66bba63fef182ae592f06b043835138741b742c5 100644 (file)
 #include "messages/MOSDMap.h"
 #include "messages/MOSDGetMap.h"
 #include "messages/MOSDBoot.h"
+#include "messages/MMonElectionRefresh.h"
+#include "messages/MMonElectionStatus.h"
+#include "messages/MMonElectionAck.h"
+#include "messages/MMonElectionCollect.h"
 
 #include "common/Timer.h"
 #include "common/Clock.h"
@@ -129,6 +133,53 @@ void Monitor::init()
 
 void Monitor::dispatch(Message *m)
 {
+<<<<<<< Monitor.cc
+  switch (m->get_type()) {
+  case MSG_FAILURE:
+       handle_failure((MFailure*)m);
+       break;
+       
+  case MSG_PING_ACK:
+       handle_ping_ack((MPingAck*)m);
+       break;
+
+  case MSG_OSD_GETMAP:
+       handle_osd_getmap((MOSDGetMap*)m);
+       return;
+
+  case MSG_OSD_BOOT:
+       handle_osd_boot((MOSDBoot*)m);
+       return;
+
+  case MSG_SHUTDOWN:
+       handle_shutdown(m);
+       return;
+
+  case MSG_PING:
+       tick();
+       delete m;
+       return;
+
+  case MSG_MON_ELECTION_ACK:
+  handle_msg_ack(m);
+  return;
+  
+  case MSG_MON_ELECTION_STATUS:
+  handle_msg_status(m);
+  return;
+  
+  case MSG_MON_ELECTION_COLLECT:
+  handle_msg_collect(m);
+  return;
+  
+  case MSG_MON_ELECTION_REFRESH:
+  handle_msg_refresh(m);
+  return;
+  
+  default:
+       dout(0) << "unknown message " << *m << endl;
+       assert(0);
+=======
   lock.Lock();
   {
        switch (m->get_type()) {
@@ -161,6 +212,7 @@ void Monitor::dispatch(Message *m)
          dout(0) << "unknown message " << *m << endl;
          assert(0);
        }
+>>>>>>> 1.6
   }
   lock.Unlock();
 }
@@ -421,6 +473,225 @@ void Monitor::bcast_latest_osd_map_osd()
 
 void Monitor::tick()
 {
+<<<<<<< Monitor.cc
+  dout(10) << "tick" << endl;
+
+  // mark down osds out?
+  utime_t now = g_clock.now();
+  list<int> mark_out;
+  for (map<int,utime_t>::iterator i = pending_out.begin();
+          i != pending_out.end();
+          i++) {
+       utime_t down = now;
+       down -= i->second;
+
+       if (down.sec() >= g_conf.mon_osd_down_out_interval) {
+         dout(10) << "tick marking osd" << i->first << " OUT after " << down << " sec" << endl;
+         mark_out.push_back(i->first);
+       }
+  }
+  for (list<int>::iterator i = mark_out.begin();
+          i != mark_out.end();
+          i++) {
+       pending_out.erase(*i);
+       pending.new_out.push_back( *i );
+       accept_pending();
+  }
+  
+  // next!
+  g_timer.add_event_after(g_conf.mon_tick_interval, new C_OM_PingTick(messenger));
+}
+
+/*****************************************
+ HANDLERS FOR MSG OF THE ELECTION ALGORITHM
+******************************************/
+  void Monitor::handle_ack_msg(MMonElectionAck* msg)
+  {
+    assert(this->refresh_num >= msg->get_refresh_num());
+    
+    if (this->refresh_num > msg->get_refresh_num())
+    {
+       // we got the message too late... discard it
+       return;
+    }
+    this->ack_msg_count++;
+    if (this->ack_msg_count >= this->f + 1)
+    {
+       dout(5) << "P" << this->p_id << ": Received _f+1 acks, increase freshness" << endl;
+       this->round_trip_timer->cancel();
+       this->registry[this->p_id]->freshness++;         
+    }  
+  }
+    
+  void Monitor::handle_collect_msg(MMonElectionCollect* msg)
+  {
+    messenger->send_message(new MMonElectionStatus(this->whoami, 
+                                                                                                                                       msg->getSenderId(), 
+                                                                                                                                       msg->getReadNum(), 
+                                                                                                                                       this->registry),
+          msg->get_source(), 
+          msg->get_source_port());
+  }
+  
+  void Monitor::handle_refresh_msg(MMonElectionRefresh* msg)
+  {
+    if (this->registry[msg->p]->isLesser(msg->state))
+               {
+                       // update local data
+                       this->registry[msg->p] = msg->state;
+                       // reply to msg
+                       send(new MMonElectionAck(this->whoami, 
+                                                                                                                  msg->p, 
+                                                                                                            msg->refresh_num), 
+                                        msg->get_source(), 
+           msg->get_source_port());
+               }
+  }
+
+  void Monitor::handle_status_msg(MMonElectionStatus* msg)
+  {
+               if (this->read_num != msg->read_num)
+               {
+                       dout(1) << _processId << ":HANDLING:" << msg.getType() << ":DISCARDED B/C OF READNUM(" << this->read_num << ":" << msg->read_num << ")" << endl;
+                       return;
+               }
+               for (int i=0; i<this->processes->size(); i++)
+               {
+                       int r = processes[i];
+                       // Put in the view the max value between then new state and the stored one
+                       if ( msg->registry[r]->isGreater(this->views[r]->state) )
+                       {
+                               this->views[r]->state = msg->registry[r];
+                       }
+               }
+               
+               this->status_msg_count++;
+               if (this->status_msg_count >= this->processes->size() - _f)  // Responses from quorum collected
+               {
+                       for (int i=0; i<this->processes->size(); i++)
+                       {
+                               int r = processes[i];
+                               // Check if r has refreshed its epoch number
+                               if (! this->views[r]->state->isGreater(this->old_views[r]->state) )
+                               {
+                                       dout(5) << this->whoami << ":Other process (" << r << ") has expired" << endl;
+                                       this->views[r]->expired = true;
+                               }
+                               if (this->views[r]->state->e_num->isGreater(this->old_views[r]->state->e_num))
+                               {
+                                       this->views[r]->expired = false;
+                               }
+                       }
+                       Epoch leader_epoch = get_min_epoch();
+                       this->leader_id = leader_epoch->p_id;
+                       dout(1) << this->whoami << " thinks leader has ID: " << this->leader_id << endl;
+                       
+                       // Restarts the timer for the next iteration
+                       startReadTimer();
+               }
+  }
+  
+  // return the minimum epoch in the this->view map
+  private Monitor::get_min_epoch()
+  {
+       Epoch min = null;
+               for (View v : views)
+               {
+                       if (min == null || 
+                                       (Epoch.compare(v.getState().getEpochNum(), min) < 0 && !v.isExpired()))
+                       {
+                               min = v.getState().getEpochNum();
+                       }
+               }
+               return min;
+  }
+
+/*****************************************
+ CLASSES NEEDED FOR THE ELECTION ALGORITHM
+******************************************/
+class Epoch {
+  public:
+    int p_id;
+    int s_num;
+
+    Epoch(int p_id, int s_num) {
+      this->p_id = p_id;
+      this->s_num = s_num;
+    }
+        
+    bool isGreater(Epoch *e);
+    bool isLesser(Epoch *e);
+    bool isEqual(Epoch *e);
+
+};
+
+bool Epoch::isGreater(Epoch *e)
+{
+  if (this->s_num == e->s_num)
+  {
+    return (this->p_id > e->p_id);
+  }
+  else
+  {
+    return (this->s_num > e->s_num);
+  }
+}
+
+bool Epoch::isLesser(Epoch *e)
+{
+  if (this->s_num == e->s_num)
+  {
+    return (this->p_id < e->p_id);
+  }
+  else
+  {
+    return (this->s_num < e->s_num);
+  }
+}
+
+bool Epoch::isEqual(Epoch *e)
+{
+  return ((this->s_num == e->s_num) && (this->p_id > e->p_id));
+}
+
+class State {
+  public:
+    Epoch *e_num;
+    int freshness;
+
+    State (Epoch *e_num, int freshness)
+    {
+      this->e_num = e_num;
+      this->freshness = freshness;
+    }
+        
+    bool isGreater(State *e);
+    bool isLesser(State *e);
+    bool isEqual(State *e);
+};
+
+bool State::isGreater(State *e)
+{
+  if (this->e_num->isEqual(e->e_num))
+  {
+    return (this->freshness > e->freshness);
+  }
+  else
+  {
+    return this->e_num->isGreater(e->e_num);
+  }
+}
+
+bool State::isLesser(State *e)
+{
+  if (this->e_num->isEqual(e->e_num))
+  {
+    return (this->freshness < e->freshness);
+  }
+  else
+  {
+    return this->e_num->isLesser(e->e_num);
+=======
   lock.Lock();
   {
        dout(10) << "tick" << endl;
@@ -449,6 +720,25 @@ void Monitor::tick()
        
        // next!
        g_timer.add_event_after(g_conf.mon_tick_interval, new C_Mon_Tick(this));
+>>>>>>> 1.6
   }
+<<<<<<< Monitor.cc
+=======
   lock.Unlock();
+>>>>>>> 1.6
 }
+
+bool State::isEqual(State *e)
+{
+  return (this->e_num->isEqual(e->e_num) && (this->freshness == e->freshness));
+}
+
+class View {
+  public:
+    State *p_state;
+    bool expired;
+    View (State* p_state, bool expired) {
+      this->p_state = p_state;
+      this->expired = expired;
+    }    
+};
index f484adf22b386cd7737bae50bc9d5a15bd0445ea..3888d30d5085a52baac45c190658fb53f8bef2cc 100644 (file)
@@ -53,6 +53,35 @@ class Monitor : public Dispatcher {
   void bcast_latest_osd_map_mds();
   void bcast_latest_osd_map_osd();
 
+  /*******************************************
+  * Variables used by the election algorithm *
+  *******************************************/
+  // used during refresh phase
+  int ack_msg_count;
+  int refresh_num;
+  
+  // used during read phase
+  int read_num;
+  int status_msg_count;
+  
+  // the leader process id
+  int leader_id;
+  // f-accessible
+  int f;
+  
+  // the processes that compose the group
+//   vector<int> processes;
+  // parameters for the process
+  int main_delta;
+  int trip_delta;
+  
+  // state variables
+  hash_map<int, state> registry;
+  hash_map<int, view> views;
+  hash_map<int, view> old_views;
+  /************************************************
+  * END> Variables used by the election algorithm *
+  *************************************************/
 
  public:
   Monitor(int w, Messenger *m) : 
@@ -72,11 +101,21 @@ class Monitor : public Dispatcher {
   void handle_osd_getmap(class MOSDGetMap *m);
 
   void handle_ping_ack(class MPingAck *m);
+<<<<<<< Monitor.h
+  
+  // handles for election messages
+  void handle_ack_msg(class MMonElectionAck);
+  void handle_collect_msg(class MMonElectionCollect);
+  void handle_refresh_msg(class MMonElectionRefresh);
+  void handle_status_msg(class MMoneElectionStatus);
+  
+=======
 
   
   void tick();  // periodic stuff. check state, take actions
 
 
+>>>>>>> 1.4
   // hack
   void fake_osd_failure(int osd, bool down);