From 4ac5acd4c7e9ab0b5b015d03ad297a912bdbd0bd Mon Sep 17 00:00:00 2001 From: rferrett Date: Fri, 18 Aug 2006 23:13:50 +0000 Subject: [PATCH] first part of election algorithm git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@804 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/mon/Monitor.cc | 290 ++++++++++++++++++++++++++++++++++++++++++++ ceph/mon/Monitor.h | 39 ++++++ 2 files changed, 329 insertions(+) diff --git a/ceph/mon/Monitor.cc b/ceph/mon/Monitor.cc index e81f2aadc433b..66bba63fef182 100644 --- a/ceph/mon/Monitor.cc +++ b/ceph/mon/Monitor.cc @@ -27,6 +27,10 @@ #include "messages/MOSDMap.h" #include "messages/MOSDGetMap.h" #include "messages/MOSDBoot.h" +#include "messages/MMonElectionRefresh.h" +#include "messages/MMonElectionStatus.h" +#include "messages/MMonElectionAck.h" +#include "messages/MMonElectionCollect.h" #include "common/Timer.h" #include "common/Clock.h" @@ -129,6 +133,53 @@ void Monitor::init() void Monitor::dispatch(Message *m) { +<<<<<<< Monitor.cc + switch (m->get_type()) { + case MSG_FAILURE: + handle_failure((MFailure*)m); + break; + + case MSG_PING_ACK: + handle_ping_ack((MPingAck*)m); + break; + + case MSG_OSD_GETMAP: + handle_osd_getmap((MOSDGetMap*)m); + return; + + case MSG_OSD_BOOT: + handle_osd_boot((MOSDBoot*)m); + return; + + case MSG_SHUTDOWN: + handle_shutdown(m); + return; + + case MSG_PING: + tick(); + delete m; + return; + + case MSG_MON_ELECTION_ACK: + handle_msg_ack(m); + return; + + case MSG_MON_ELECTION_STATUS: + handle_msg_status(m); + return; + + case MSG_MON_ELECTION_COLLECT: + handle_msg_collect(m); + return; + + case MSG_MON_ELECTION_REFRESH: + handle_msg_refresh(m); + return; + + default: + dout(0) << "unknown message " << *m << endl; + assert(0); +======= lock.Lock(); { switch (m->get_type()) { @@ -161,6 +212,7 @@ void Monitor::dispatch(Message *m) dout(0) << "unknown message " << *m << endl; assert(0); } +>>>>>>> 1.6 } lock.Unlock(); } @@ -421,6 +473,225 @@ void Monitor::bcast_latest_osd_map_osd() void Monitor::tick() { +<<<<<<< Monitor.cc + dout(10) << "tick" << endl; + + // mark down osds out? + utime_t now = g_clock.now(); + list mark_out; + for (map::iterator i = pending_out.begin(); + i != pending_out.end(); + i++) { + utime_t down = now; + down -= i->second; + + if (down.sec() >= g_conf.mon_osd_down_out_interval) { + dout(10) << "tick marking osd" << i->first << " OUT after " << down << " sec" << endl; + mark_out.push_back(i->first); + } + } + for (list::iterator i = mark_out.begin(); + i != mark_out.end(); + i++) { + pending_out.erase(*i); + pending.new_out.push_back( *i ); + accept_pending(); + } + + // next! + g_timer.add_event_after(g_conf.mon_tick_interval, new C_OM_PingTick(messenger)); +} + +/***************************************** + HANDLERS FOR MSG OF THE ELECTION ALGORITHM +******************************************/ + void Monitor::handle_ack_msg(MMonElectionAck* msg) + { + assert(this->refresh_num >= msg->get_refresh_num()); + + if (this->refresh_num > msg->get_refresh_num()) + { + // we got the message too late... discard it + return; + } + this->ack_msg_count++; + if (this->ack_msg_count >= this->f + 1) + { + dout(5) << "P" << this->p_id << ": Received _f+1 acks, increase freshness" << endl; + this->round_trip_timer->cancel(); + this->registry[this->p_id]->freshness++; + } + } + + void Monitor::handle_collect_msg(MMonElectionCollect* msg) + { + messenger->send_message(new MMonElectionStatus(this->whoami, + msg->getSenderId(), + msg->getReadNum(), + this->registry), + msg->get_source(), + msg->get_source_port()); + } + + void Monitor::handle_refresh_msg(MMonElectionRefresh* msg) + { + if (this->registry[msg->p]->isLesser(msg->state)) + { + // update local data + this->registry[msg->p] = msg->state; + // reply to msg + send(new MMonElectionAck(this->whoami, + msg->p, + msg->refresh_num), + msg->get_source(), + msg->get_source_port()); + } + } + + void Monitor::handle_status_msg(MMonElectionStatus* msg) + { + if (this->read_num != msg->read_num) + { + dout(1) << _processId << ":HANDLING:" << msg.getType() << ":DISCARDED B/C OF READNUM(" << this->read_num << ":" << msg->read_num << ")" << endl; + return; + } + for (int i=0; iprocesses->size(); i++) + { + int r = processes[i]; + // Put in the view the max value between then new state and the stored one + if ( msg->registry[r]->isGreater(this->views[r]->state) ) + { + this->views[r]->state = msg->registry[r]; + } + } + + this->status_msg_count++; + if (this->status_msg_count >= this->processes->size() - _f) // Responses from quorum collected + { + for (int i=0; iprocesses->size(); i++) + { + int r = processes[i]; + // Check if r has refreshed its epoch number + if (! this->views[r]->state->isGreater(this->old_views[r]->state) ) + { + dout(5) << this->whoami << ":Other process (" << r << ") has expired" << endl; + this->views[r]->expired = true; + } + if (this->views[r]->state->e_num->isGreater(this->old_views[r]->state->e_num)) + { + this->views[r]->expired = false; + } + } + Epoch leader_epoch = get_min_epoch(); + this->leader_id = leader_epoch->p_id; + dout(1) << this->whoami << " thinks leader has ID: " << this->leader_id << endl; + + // Restarts the timer for the next iteration + startReadTimer(); + } + } + + // return the minimum epoch in the this->view map + private Monitor::get_min_epoch() + { + Epoch min = null; + for (View v : views) + { + if (min == null || + (Epoch.compare(v.getState().getEpochNum(), min) < 0 && !v.isExpired())) + { + min = v.getState().getEpochNum(); + } + } + return min; + } + +/***************************************** + CLASSES NEEDED FOR THE ELECTION ALGORITHM +******************************************/ +class Epoch { + public: + int p_id; + int s_num; + + Epoch(int p_id, int s_num) { + this->p_id = p_id; + this->s_num = s_num; + } + + bool isGreater(Epoch *e); + bool isLesser(Epoch *e); + bool isEqual(Epoch *e); + +}; + +bool Epoch::isGreater(Epoch *e) +{ + if (this->s_num == e->s_num) + { + return (this->p_id > e->p_id); + } + else + { + return (this->s_num > e->s_num); + } +} + +bool Epoch::isLesser(Epoch *e) +{ + if (this->s_num == e->s_num) + { + return (this->p_id < e->p_id); + } + else + { + return (this->s_num < e->s_num); + } +} + +bool Epoch::isEqual(Epoch *e) +{ + return ((this->s_num == e->s_num) && (this->p_id > e->p_id)); +} + +class State { + public: + Epoch *e_num; + int freshness; + + State (Epoch *e_num, int freshness) + { + this->e_num = e_num; + this->freshness = freshness; + } + + bool isGreater(State *e); + bool isLesser(State *e); + bool isEqual(State *e); +}; + +bool State::isGreater(State *e) +{ + if (this->e_num->isEqual(e->e_num)) + { + return (this->freshness > e->freshness); + } + else + { + return this->e_num->isGreater(e->e_num); + } +} + +bool State::isLesser(State *e) +{ + if (this->e_num->isEqual(e->e_num)) + { + return (this->freshness < e->freshness); + } + else + { + return this->e_num->isLesser(e->e_num); +======= lock.Lock(); { dout(10) << "tick" << endl; @@ -449,6 +720,25 @@ void Monitor::tick() // next! g_timer.add_event_after(g_conf.mon_tick_interval, new C_Mon_Tick(this)); +>>>>>>> 1.6 } +<<<<<<< Monitor.cc +======= lock.Unlock(); +>>>>>>> 1.6 } + +bool State::isEqual(State *e) +{ + return (this->e_num->isEqual(e->e_num) && (this->freshness == e->freshness)); +} + +class View { + public: + State *p_state; + bool expired; + View (State* p_state, bool expired) { + this->p_state = p_state; + this->expired = expired; + } +}; diff --git a/ceph/mon/Monitor.h b/ceph/mon/Monitor.h index f484adf22b386..3888d30d5085a 100644 --- a/ceph/mon/Monitor.h +++ b/ceph/mon/Monitor.h @@ -53,6 +53,35 @@ class Monitor : public Dispatcher { void bcast_latest_osd_map_mds(); void bcast_latest_osd_map_osd(); + /******************************************* + * Variables used by the election algorithm * + *******************************************/ + // used during refresh phase + int ack_msg_count; + int refresh_num; + + // used during read phase + int read_num; + int status_msg_count; + + // the leader process id + int leader_id; + // f-accessible + int f; + + // the processes that compose the group +// vector processes; + // parameters for the process + int main_delta; + int trip_delta; + + // state variables + hash_map registry; + hash_map views; + hash_map old_views; + /************************************************ + * END> Variables used by the election algorithm * + *************************************************/ public: Monitor(int w, Messenger *m) : @@ -72,11 +101,21 @@ class Monitor : public Dispatcher { void handle_osd_getmap(class MOSDGetMap *m); void handle_ping_ack(class MPingAck *m); +<<<<<<< Monitor.h + + // handles for election messages + void handle_ack_msg(class MMonElectionAck); + void handle_collect_msg(class MMonElectionCollect); + void handle_refresh_msg(class MMonElectionRefresh); + void handle_status_msg(class MMoneElectionStatus); + +======= void tick(); // periodic stuff. check state, take actions +>>>>>>> 1.4 // hack void fake_osd_failure(int osd, bool down); -- 2.39.5