]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
*** empty log message ***
authorsage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Sat, 19 Aug 2006 15:17:33 +0000 (15:17 +0000)
committersage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Sat, 19 Aug 2006 15:17:33 +0000 (15:17 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@806 29311d96-e01e-0410-9327-a35deaab8ce9

ceph/mon/Elector.cc [new file with mode: 0644]
ceph/mon/Elector.h [new file with mode: 0644]
ceph/mon/Monitor.cc
ceph/mon/Monitor.h

diff --git a/ceph/mon/Elector.cc b/ceph/mon/Elector.cc
new file mode 100644 (file)
index 0000000..ff2e319
--- /dev/null
@@ -0,0 +1,218 @@
+
+#include "Elector.h"
+#include "Monitor.h"
+
+#include "common/Timer.h"
+
+#include "messages/MMonElectionRefresh.h"
+#include "messages/MMonElectionStatus.h"
+#include "messages/MMonElectionAck.h"
+#include "messages/MMonElectionCollect.h"
+
+#include "config.h"
+#undef dout
+#define  dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << "mon" << whoami << " "
+#define  derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << "mon" << whoami << " "
+
+
+
+class C_Elect_ReadTimer : public Context {
+  Elector *mon;
+public:
+  C_Elect_ReadTimer(Elector *m) : mon(m){}
+  void finish(int r) {
+       mon->read_timer();
+  }
+};
+
+void Elector::read_timer()
+{
+  lock.Lock();
+  {
+    read_num++;
+    status_msg_count = 0;
+    old_views = views;   // TODO deep copy
+    for (int i=0; i<processes.size(); i++) {
+         mon->messenger->send_message(new MMonElectionCollect(read_num), 
+                                                                  MSG_ADDR_MON(processes[i]));
+       }
+  }
+  lock.Unlock();
+};
+
+class C_Elect_TripTimer : public Context {
+  Elector *mon;
+public:
+  C_Elect_TripTimer(Elector *m) : mon(m){}
+  void finish(int r) {
+       mon->trip_timer();
+  }
+};
+
+void Elector::trip_timer()
+{
+  lock.Lock();
+  {
+    views[whoami].expired = true;
+       registry[whoami].epoch.s_num++;
+    dout(1) << "Process " << whoami
+                       <<  " timed out (" << m->ackMsgCount << "/" << (m->f + 1)
+                       << ") ... increasing epoch. Now epoch is "
+                       << m->registry[whoami]->epoch->s_num
+                       << endl;
+  }
+  lock.Unlock();
+};
+
+
+
+class C_Elect_RefreshTimer : public Context {
+  Elector *mon;
+public:
+  C_Elect_RefreshTimer(Elector *m) : mon(m) {}
+  void finish(int r) {
+       mon->refresh_timer();
+  }
+};
+
+void Elector::refresh_timer()
+{
+  lock.Lock();
+  {
+       ack_msg_count = 0;
+       refresh_num++;
+       MMonElectionRefresh *msg = new MMonElectionRefresh(whoami, registry[whoami], refresh_num);
+       for (int i=0; i<processes.size(); i++) {
+         mon->messenger->send_message(msg, MSG_ADDR_MON(processes[i]));
+       }
+       
+       // Start the trip timer
+       round_trip_timer = new C_Elect_TripTimer(this);
+       g_timer.add_event_after(trip_delta, m->round_trip_timer);        
+  }
+  lock.Unlock();
+};
+
+
+
+//////////////////////////
+
+
+void Elector::dispatch(Message *m)
+{
+  lock.Lock();
+  {
+       switch (m->get_type()) {
+       case MSG_MON_ELECTION_ACK:
+         handle_ack(m);
+         break;
+       
+       case MSG_MON_ELECTION_STATUS:
+         handle_status(m);
+         break;
+       
+       case MSG_MON_ELECTION_COLLECT:
+         handle_collect(m);
+         break;
+       
+       case MSG_MON_ELECTION_REFRESH:
+         handle_refresh(m);
+         break;
+         
+       default:
+         assert(0);
+       }
+  }
+  lock.Unlock();
+}
+
+void Elector::handle_ack(MMonElectionAck* msg)
+{
+  assert(refresh_num >= msg->get_refresh_num());
+  
+  if (refresh_num > msg->get_refresh_num()) {
+       // we got the message too late... discard it
+       return;
+  }
+  ack_msg_count++;
+  if (ack_msg_count >= f + 1) {
+       dout(5) << "P" << p_id << ": Received _f+1 acks, increase freshness" << endl;
+       g_timer.cancel_event(round_trip_task);
+       round_trip_timer->cancel();
+       registry[p_id]->freshness++;         
+  }
+  
+  delete msg;
+}
+
+void Elector::handle_collect(MMonElectionCollect* msg)
+{
+  messenger->send_message(new MMonElectionStatus(whoami, 
+                                                                                                msg->getSenderId(), 
+                                                                                                msg->getReadNum(), 
+                                                                                                registry),
+                                                 msg->get_source());
+  delete msg;
+}
+
+void Elector::handle_refresh(MMonElectionRefresh* msg)
+{
+  if (this->registry[msg->p]->isLesser(msg->state)) {
+       // update local data
+       registry[msg->p] = msg->state;
+
+       // reply to msg
+       messenger->send_message(new MMonElectionAck(whoami, 
+                                                                                               msg->p, 
+                                                                                               msg->refresh_num), 
+                                                       msg->get_source());
+  }
+
+  delete msg;
+}
+
+
+void Elector::handle_status(MMonElectionStatus* msg)
+{
+  if (read_num != msg->read_num) {
+       dout(1) << _processId << ":HANDLING:" << msg.getType()
+                       << ":DISCARDED B/C OF READNUM(" << read_num << ":"
+                       << msg->read_num << ")" 
+                       << endl;
+       return;
+  }
+  for (int i=0; i<this->processes->size(); i++) {
+       int r = processes[i];
+       // Put in the view the max value between then new state and the stored one
+       if ( msg->registry[r]->isGreater(views[r]->state) ) {
+         views[r]->state = msg->registry[r];
+       }
+  }
+               
+  status_msg_count++;
+  if (status_msg_count >= processes.size() - _f) { // Responses from quorum collected
+       for (int i=0; i<processes.size(); i++) {
+         int r = processes[i];
+         // Check if r has refreshed its epoch number
+         if (! this->views[r]->state->isGreater(this->old_views[r]->state) )
+               {
+                 dout(5) << this->whoami << ":Other process (" << r << ") has expired" << endl;
+                 this->views[r]->expired = true;
+               }
+         if (this->views[r]->state->e_num->isGreater(this->old_views[r]->state->e_num))
+               {
+                 this->views[r]->expired = false;
+               }
+       }
+       Epoch leader_epoch = get_min_epoch();
+       this->leader_id = leader_epoch->p_id;
+       dout(1) << this->whoami << " thinks leader has ID: " << this->leader_id << endl;
+       
+       // Restarts the timer for the next iteration
+       g_timer.add_event_after(main_delta + trip_delta, new C_Elect_ReadTimer(this));
+  }
+}
+
+
+
+
diff --git a/ceph/mon/Elector.h b/ceph/mon/Elector.h
new file mode 100644 (file)
index 0000000..084e59d
--- /dev/null
@@ -0,0 +1,161 @@
+// -*- mode:C++; tab-width:4; c-basic-offset:2; indent-tabs-mode:t -*- 
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+
+#ifndef __MON_ELECTOR_H
+#define __MON_ELECTOR_H
+
+#include <map>
+using namespace std;
+
+#include "include/types.h"
+#include "msg/Message.h"
+
+
+class Monitor;
+
+
+class Elector {
+ public:
+
+  //// sub-classes
+
+  // Epoch
+  class Epoch {
+  public:
+    int p_id;
+    int s_num;
+       
+    Epoch(int p_id=0, int s_num=0) {
+      this->p_id = p_id;
+      this->s_num = s_num;
+    }
+       
+    bool operator>(Epoch& e) {
+         if (s_num == e.s_num)
+               return (p_id > e.p_id);
+         else
+               return (s_num > e.s_num);
+       }
+       bool operator<(Epoch& e) {
+         if (s_num == e.s_num)
+               return (p_id < e.p_id);
+         else
+               return (s_num < e.s_num);
+       }
+    bool operator==(Epoch& e) {
+         return ((s_num == e.s_num) && (p_id > e.p_id));
+       }
+  };
+
+
+  // State
+  class State {
+  public:
+    Epoch epoch;
+    int freshness;
+
+       State() {};
+    State (Epoch& e, int f) :
+         epoch(e), freshness(f) {}
+
+    bool operator>(State& e) {
+         if (epoch == e.epoch)
+               return (freshness > e.freshness);
+         else
+               return epoch > e.epoch;
+       }
+    bool operator<(State& e) {
+         if (epoch == e.epoch)
+               return (freshness < e.freshness);
+         else
+               return epoch < e.epoch;
+       }
+    bool operator==(State& e) {
+         return ( (epoch == e.epoch) && (freshness == e.freshness) );
+       }
+  };
+
+
+  class View {
+  public:
+    State state;
+    bool expired;
+    View(State& s, bool e) : state(s), expired(e) {}
+  };
+
+
+  ///////////////
+ private:
+  Monitor *mon;
+  int whoami;
+  Mutex lock;
+
+  // used during refresh phase
+  int ack_msg_count;
+  int refresh_num;
+  
+  // used during read phase
+  int read_num;
+  int status_msg_count;
+  
+  // the leader process id
+  int leader_id;
+  // f-accessible
+  int f;
+  
+  // the processes that compose the group
+  //   vector<int> processes;
+  // parameters for the process
+  int main_delta;
+  int trip_delta;
+  
+  // state variables
+  map<int, State> registry;
+  map<int, View>  views;
+  map<int, View>  old_views;
+
+  // get the minimum epoch in the view map
+  Epoch get_min_epoch() {
+       assert(!views.empty());
+       Epoch min = views[0].state.epoch;
+       for (unsigned i=1; i<views.size(); i++) {
+         if (views[i].state.epoch < min && !views[i].expired) {
+               min = views[i].state.epoch;
+         }
+       }
+       return min;
+  }
+  
+  // handlers for election messages
+  void handle_ack(class MMonElectionAck *m);
+  void handle_collect(class MMonElectionCollect *m);
+  void handle_refresh(class MMonElectionRefresh *m);
+  void handle_status(class MMoneElectionStatus *m);
+
+ public:  
+  Elector(Monitor *m, int w) : mon(m), whoami(w) {
+       // initialize all those values!
+       // ...
+  }
+
+  // timer methods
+  void read_timer();
+  void trip_timer();
+  void refresh_timer();
+  
+  void dispatch(Message *m);
+
+};
+
+#endif
index 45a5f8e8450f43487a1dcec32a4b9c70961fce3c..39a72d2431e5749934fd7ceccd8c03534e3d9ec6 100644 (file)
 #include "messages/MOSDMap.h"
 #include "messages/MOSDGetMap.h"
 #include "messages/MOSDBoot.h"
-#include "messages/MMonElectionRefresh.h"
-#include "messages/MMonElectionStatus.h"
-#include "messages/MMonElectionAck.h"
-#include "messages/MMonElectionCollect.h"
 
 #include "common/Timer.h"
 #include "common/Clock.h"
 #define  derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << "mon" << whoami << " e" << (osdmap ? osdmap->get_epoch():0) << " "
 
 
-class C_OM_PingTick : public Context {
-public:
-  Messenger *msgr;
-  C_OM_PingTick(Messenger *m) : msgr(m) {}
-  void finish(int r) {
-       msgr->send_message(new MPing, MSG_ADDR_MON(0));
-  }
-};
-
-class C_OM_Faker : public Context {
+class C_Mon_Tick : public Context {
+  Monitor *mon;
 public:
-  Monitor *om;
-  C_OM_Faker(Monitor *m) { 
-       this->om = m;
-  }
+  C_Mon_Tick(Monitor *m) : mon(m) {}
   void finish(int r) {
-       om->fake_reorg();
+       mon->tick();
   }
 };
 
-class C_OM_FakeOSDFailure : public Context {
+class C_Mon_FakeOSDFailure : public Context {
   Monitor *mon;
   int osd;
   bool down;
 public:
-  C_OM_FakeOSDFailure(Monitor *m, int o, bool d) : mon(m), osd(o), down(d) {}
+  C_Mon_FakeOSDFailure(Monitor *m, int o, bool d) : mon(m), osd(o), down(d) {}
   void finish(int r) {
        mon->fake_osd_failure(osd,down);
   }
 };
 
-class C_Elect_ReadTimer : public Context {
-  Monitor *mon;
-public:
-  C_Elect_ReadTimer(Monitor *m) : mon(m){}
-  void finish(int r) {
-    m->read_num++;
-    m->status_msg_count = 0;
-    m->old_views = views;   // TODO deep copy
-    for (int i=0; i<m->processes->size(); i++)
-    {
-      m->send_message(new MMonElectionCollect(m->read_num), ###address###, ###port###);
-    }
-  }
-};
-
-class C_Elect_TripTimer : public Context {
-  Monitor *mon;
-public:
-  C_Elect_TripTimer(Monitor *m) : mon(m){}
-  void finish(int r) {
-    m->views[whoami]->expired = true;
-               m->registry[whoami]->e_num->s_num++;
-    dout(1) << "Process " << whoami <<  " timed out (" << m->ackMsgCount << "/" << (m->f + 1) << ") ... increasing epoch. Now epoch is " << m->registry[whoami]->e_num->s_num;                                                                                                                                         
-  }
-};
-
-class C_Elect_RefreshTimer : public Context {
-  Monitor *mon;
-public:
-  C_Elect_TripTimer(Monitor *m) : mon(m){}
-  void finish(int r) {
-                       m->ack_msg_count = 0;
-                       m->refresh_num++;
-                       MMonElectionRefresh msg = new MMonElectionRefresh(whoami, m->registry[whoami], m->refresh_num);
-                       for (int i=0; i<m->processes->size(); i++)
-                       {
-                               m->send_message(msg, ###i-ma dest###, ###port###);
-                       }
-      // Start the trip timer
-      m->round_trip_timer = new C_Elect_TripTimer(this);
-           g_timer.add_event_after(trip_delta, m->round_trip_timer);        
-   }
-};
-
 
 void Monitor::fake_reorg() 
 {
@@ -200,13 +141,13 @@ void Monitor::init()
                 i != g_fake_osd_down.end();
                 i++) {
          dout(0) << "will fake osd" << i->first << " DOWN after " << i->second << endl;
-         g_timer.add_event_after(i->second, new C_OM_FakeOSDFailure(this, i->first, 1));
+         g_timer.add_event_after(i->second, new C_Mon_FakeOSDFailure(this, i->first, 1));
        }
        for (map<int,float>::iterator i = g_fake_osd_out.begin();
                 i != g_fake_osd_out.end();
                 i++) {
          dout(0) << "will fake osd" << i->first << " OUT after " << i->second << endl;
-         g_timer.add_event_after(i->second, new C_OM_FakeOSDFailure(this, i->first, 0));
+         g_timer.add_event_after(i->second, new C_Mon_FakeOSDFailure(this, i->first, 0));
        }
   }
 
@@ -215,58 +156,50 @@ void Monitor::init()
   messenger->set_dispatcher(this);
   
   // start ticker
-  g_timer.add_event_after(g_conf.mon_tick_interval, new C_OM_PingTick(messenger));
+  g_timer.add_event_after(g_conf.mon_tick_interval, new C_Mon_Tick(this));
 }
 
 
 void Monitor::dispatch(Message *m)
 {
-  switch (m->get_type()) {
-  case MSG_FAILURE:
-       handle_failure((MFailure*)m);
-       break;
-       
-  case MSG_PING_ACK:
-       handle_ping_ack((MPingAck*)m);
-       break;
-
-  case MSG_OSD_GETMAP:
-       handle_osd_getmap((MOSDGetMap*)m);
-       return;
-
-  case MSG_OSD_BOOT:
-       handle_osd_boot((MOSDBoot*)m);
-       return;
-
-  case MSG_SHUTDOWN:
-       handle_shutdown(m);
-       return;
-
-  case MSG_PING:
-       tick();
-       delete m;
-       return;
-
-  case MSG_MON_ELECTION_ACK:
-  handle_msg_ack(m);
-  return;
-  
-  case MSG_MON_ELECTION_STATUS:
-  handle_msg_status(m);
-  return;
-  
-  case MSG_MON_ELECTION_COLLECT:
-  handle_msg_collect(m);
-  return;
-  
-  case MSG_MON_ELECTION_REFRESH:
-  handle_msg_refresh(m);
-  return;
-  
-  default:
-       dout(0) << "unknown message " << *m << endl;
-       assert(0);
+  lock.Lock();
+  {
+       switch (m->get_type()) {
+       case MSG_FAILURE:
+         handle_failure((MFailure*)m);
+         break;
+         
+       case MSG_PING_ACK:
+         handle_ping_ack((MPingAck*)m);
+         break;
+         
+       case MSG_OSD_GETMAP:
+         handle_osd_getmap((MOSDGetMap*)m);
+         break;
+
+       case MSG_OSD_BOOT:
+         handle_osd_boot((MOSDBoot*)m);
+         break;
+         
+       case MSG_SHUTDOWN:
+         handle_shutdown(m);
+         break;
+
+         // elector messages
+       case MSG_MON_ELECTION_ACK:
+       case MSG_MON_ELECTION_STATUS:
+       case MSG_MON_ELECTION_COLLECT:
+       case MSG_MON_ELECTION_REFRESH:
+         elector.dispatch(m);
+         break;
+
+         
+       default:
+         dout(0) << "unknown message " << *m << endl;
+         assert(0);
+       }
   }
+  lock.Unlock();
 }
 
 
@@ -320,16 +253,20 @@ void Monitor::handle_failure(MFailure *m)
 
 void Monitor::fake_osd_failure(int osd, bool down) 
 {
-  if (down) {
-       dout(1) << "fake_osd_failure DOWN osd" << osd << endl;
-       pending.new_down[osd] = osdmap->osd_inst[osd];
-  } else {
+  lock.Lock();
+  {
+       if (down) {
+         dout(1) << "fake_osd_failure DOWN osd" << osd << endl;
+         pending.new_down[osd] = osdmap->osd_inst[osd];
+       } else {
        dout(1) << "fake_osd_failure OUT osd" << osd << endl;
        pending.new_out.push_back(osd);
+       }
+       accept_pending();
+       bcast_latest_osd_map_osd();
+       bcast_latest_osd_map_mds();
   }
-  accept_pending();
-  bcast_latest_osd_map_osd();
-  bcast_latest_osd_map_mds();
+  lock.Unlock();
 }
 
 
@@ -521,242 +458,40 @@ void Monitor::bcast_latest_osd_map_osd()
 
 void Monitor::tick()
 {
-  dout(10) << "tick" << endl;
-
-  // mark down osds out?
-  utime_t now = g_clock.now();
-  list<int> mark_out;
-  for (map<int,utime_t>::iterator i = pending_out.begin();
-          i != pending_out.end();
-          i++) {
-       utime_t down = now;
-       down -= i->second;
-
-       if (down.sec() >= g_conf.mon_osd_down_out_interval) {
-         dout(10) << "tick marking osd" << i->first << " OUT after " << down << " sec" << endl;
-         mark_out.push_back(i->first);
-       }
-  }
-  for (list<int>::iterator i = mark_out.begin();
-          i != mark_out.end();
-          i++) {
-       pending_out.erase(*i);
-       pending.new_out.push_back( *i );
-       accept_pending();
-  }
-  
-  // next!
-  g_timer.add_event_after(g_conf.mon_tick_interval, new C_OM_PingTick(messenger));
-}
-
-/*****************************************
- HANDLERS FOR MSG OF THE ELECTION ALGORITHM
-******************************************/
-  void Monitor::handle_ack_msg(MMonElectionAck* msg)
-  {
-    assert(this->refresh_num >= msg->get_refresh_num());
-    
-    if (this->refresh_num > msg->get_refresh_num())
-    {
-       // we got the message too late... discard it
-       return;
-    }
-    this->ack_msg_count++;
-    if (this->ack_msg_count >= this->f + 1)
-    {
-       dout(5) << "P" << this->p_id << ": Received _f+1 acks, increase freshness" << endl;
-       g_timer.cancel_event(this->round_trip_task);
-       this->round_trip_timer->cancel();
-       this->registry[this->p_id]->freshness++;         
-    }  
-  }
-    
-  void Monitor::handle_collect_msg(MMonElectionCollect* msg)
-  {
-    messenger->send_message(new MMonElectionStatus(this->whoami, 
-                                                                                                                                       msg->getSenderId(), 
-                                                                                                                                       msg->getReadNum(), 
-                                                                                                                                       this->registry),
-          msg->get_source(), 
-          msg->get_source_port());
-  }
-  
-  void Monitor::handle_refresh_msg(MMonElectionRefresh* msg)
-  {
-    if (this->registry[msg->p]->isLesser(msg->state))
-               {
-                       // update local data
-                       this->registry[msg->p] = msg->state;
-                       // reply to msg
-                       send(new MMonElectionAck(this->whoami, 
-                                                                                                                  msg->p, 
-                                                                                                            msg->refresh_num), 
-                                        msg->get_source(), 
-           msg->get_source_port());
-               }
-  }
-
-  void Monitor::handle_status_msg(MMonElectionStatus* msg)
-  {
-               if (this->read_num != msg->read_num)
-               {
-                       dout(1) << _processId << ":HANDLING:" << msg.getType() << ":DISCARDED B/C OF READNUM(" << this->read_num << ":" << msg->read_num << ")" << endl;
-                       return;
-               }
-               for (int i=0; i<this->processes->size(); i++)
-               {
-                       int r = processes[i];
-                       // Put in the view the max value between then new state and the stored one
-                       if ( msg->registry[r]->isGreater(this->views[r]->state) )
-                       {
-                               this->views[r]->state = msg->registry[r];
-                       }
-               }
-               
-               this->status_msg_count++;
-               if (this->status_msg_count >= this->processes->size() - _f)  // Responses from quorum collected
-               {
-                       for (int i=0; i<this->processes->size(); i++)
-                       {
-                               int r = processes[i];
-                               // Check if r has refreshed its epoch number
-                               if (! this->views[r]->state->isGreater(this->old_views[r]->state) )
-                               {
-                                       dout(5) << this->whoami << ":Other process (" << r << ") has expired" << endl;
-                                       this->views[r]->expired = true;
-                               }
-                               if (this->views[r]->state->e_num->isGreater(this->old_views[r]->state->e_num))
-                               {
-                                       this->views[r]->expired = false;
-                               }
-                       }
-                       Epoch leader_epoch = get_min_epoch();
-                       this->leader_id = leader_epoch->p_id;
-                       dout(1) << this->whoami << " thinks leader has ID: " << this->leader_id << endl;
-                       
-                       // Restarts the timer for the next iteration
-                       g_timer.add_event_after(main_delta + trip_delta, new C_Elect_ReadTimer(this));
-               }
-  }
-
-  // return the minimum epoch in the this->view map
-  private Monitor::get_min_epoch()
-  {
-       Epoch min = new Epoch(-1, -1);
-               for (int i=0; i<this->views->size(); i++)
-               {
-      View v = this->views[i];
-                       if (v->state->e_num->isLesser(min) && !v->expired)
-                       {
-                               min = v->state->e_num;
-                       }
-               }
-               return min;
-  }
-  
-  void Monitor::start_read_timer()
-  {
-         
-  }
-/*****************************************
- CLASSES NEEDED FOR THE ELECTION ALGORITHM
-******************************************/
-class Epoch {
-  public:
-    int p_id;
-    int s_num;
-
-    Epoch(int p_id, int s_num) {
-      this->p_id = p_id;
-      this->s_num = s_num;
-    }
-        
-    bool operator>(Epoch *e);
-    bool operator<(Epoch *e);
-    bool operator==(Epoch *e);
-
-};
-
-bool Epoch::operator>(Epoch *e)
-{
-  if (this->s_num == e->s_num)
+  lock.Lock();
   {
-    return (this->p_id > e->p_id);
-  }
-  else
-  {
-    return (this->s_num > e->s_num);
-  }
-}
-
-bool Epoch::operator<(Epoch *e)
-{
-  if (this->s_num == e->s_num)
-  {
-    return (this->p_id < e->p_id);
-  }
-  else
-  {
-    return (this->s_num < e->s_num);
+       dout(10) << "tick" << endl;
+       
+       // mark down osds out?
+       utime_t now = g_clock.now();
+       list<int> mark_out;
+       for (map<int,utime_t>::iterator i = pending_out.begin();
+                i != pending_out.end();
+                i++) {
+         utime_t down = now;
+         down -= i->second;
+         
+         if (down.sec() >= g_conf.mon_osd_down_out_interval) {
+               dout(10) << "tick marking osd" << i->first << " OUT after " << down << " sec" << endl;
+               mark_out.push_back(i->first);
+         }
+       }
+       for (list<int>::iterator i = mark_out.begin();
+                i != mark_out.end();
+                i++) {
+         pending_out.erase(*i);
+         pending.new_out.push_back( *i );
+         accept_pending();
+       }
+       
+       // next!
+       g_timer.add_event_after(g_conf.mon_tick_interval, new C_Mon_Tick(this));
   }
+  lock.Unlock();
 }
 
-bool Epoch::operator==(Epoch *e)
-{
-  return ((this->s_num == e->s_num) && (this->p_id > e->p_id));
-}
 
-class State {
-  public:
-    Epoch *e_num;
-    int freshness;
-
-    State (Epoch *e_num, int freshness)
-    {
-      this->e_num = e_num;
-      this->freshness = freshness;
-    }
-        
-    bool operator>(State *e);
-    bool operator<(State *e);
-    bool operator==(State *e);
-};
 
-bool State::operator>(State *e)
-{
-  if (this->e_num == e->e_num)
-  {
-    return (this->freshness > e->freshness);
-  }
-  else
-  {
-    return this->e_num > e->e_num;
-  }
-}
 
-bool State::operator<(State *e)
-{
-  if (this->e_num == e->e_num)
-  {
-    return (this->freshness < e->freshness);
-  }
-  else
-  {
-    return this->e_num < e->e_num;
-  }
-}
 
-bool State::operator==(State *e)
-{
-  return ( (this->e_num == e->e_num) && (this->freshness == e->freshness) );
-}
 
-class View {
-  public:
-    State *p_state;
-    bool expired;
-    View (State* p_state, bool expired) {
-      this->p_state = p_state;
-      this->expired = expired;
-    }    
-};
index 3f8e1cf795a53f1553f5c461faba087b4006924a..77cf502f99620d8d8b068bafcd0bdd77c8f4ec5c 100644 (file)
@@ -26,10 +26,19 @@ using namespace std;
 
 #include "osd/OSDMap.h"
 
+#include "Elector.h"
+
+
 class Monitor : public Dispatcher {
+protected:
   // me
   int whoami;
   Messenger *messenger;
+  Mutex lock;
+
+  // elector
+  Elector elector;
+  friend class Elector;
 
   // maps
   OSDMap *osdmap;
@@ -57,43 +66,12 @@ class Monitor : public Dispatcher {
   void get_min_epoch();
   void start_read_timer();
   
-  /*******************************************
-  * Variables used by the election algorithm *
-  *******************************************/
-  // used during refresh phase
-  int ack_msg_count;
-  int refresh_num;
-  
-  // used during read phase
-  int read_num;
-  int status_msg_count;
-  
-  // the leader process id
-  int leader_id;
-  // f-accessible
-  int f;
-  
-  // the processes that compose the group
-//   vector<int> processes;
-  // parameters for the process
-  int main_delta;
-  int trip_delta;
-  
-  // state variables
-  hash_map<int, state> registry;
-  hash_map<int, view> views;
-  hash_map<int, view> old_views;
-  
-  // round trip timer
-  round_trip_task
-  /************************************************
-  * END> Variables used by the election algorithm *
-  *************************************************/
 
  public:
   Monitor(int w, Messenger *m) : 
        whoami(w),
        messenger(m),
+       elector(this, w),
        osdmap(0) {
   }
 
@@ -109,12 +87,6 @@ class Monitor : public Dispatcher {
 
   void handle_ping_ack(class MPingAck *m);
   
-  // handles for election messages
-  void handle_ack_msg(class MMonElectionAck);
-  void handle_collect_msg(class MMonElectionCollect);
-  void handle_refresh_msg(class MMonElectionRefresh);
-  void handle_status_msg(class MMoneElectionStatus);
-  
   // hack
   void fake_osd_failure(int osd, bool down);
   void fake_reorg();