--- /dev/null
+
+#include "Elector.h"
+#include "Monitor.h"
+
+#include "common/Timer.h"
+
+#include "messages/MMonElectionRefresh.h"
+#include "messages/MMonElectionStatus.h"
+#include "messages/MMonElectionAck.h"
+#include "messages/MMonElectionCollect.h"
+
+#include "config.h"
+#undef dout
+#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << "mon" << whoami << " "
+#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << "mon" << whoami << " "
+
+
+
+class C_Elect_ReadTimer : public Context {
+ Elector *mon;
+public:
+ C_Elect_ReadTimer(Elector *m) : mon(m){}
+ void finish(int r) {
+ mon->read_timer();
+ }
+};
+
+void Elector::read_timer()
+{
+ lock.Lock();
+ {
+ read_num++;
+ status_msg_count = 0;
+ old_views = views; // TODO deep copy
+ for (int i=0; i<processes.size(); i++) {
+ mon->messenger->send_message(new MMonElectionCollect(read_num),
+ MSG_ADDR_MON(processes[i]));
+ }
+ }
+ lock.Unlock();
+};
+
+class C_Elect_TripTimer : public Context {
+ Elector *mon;
+public:
+ C_Elect_TripTimer(Elector *m) : mon(m){}
+ void finish(int r) {
+ mon->trip_timer();
+ }
+};
+
+void Elector::trip_timer()
+{
+ lock.Lock();
+ {
+ views[whoami].expired = true;
+ registry[whoami].epoch.s_num++;
+ dout(1) << "Process " << whoami
+ << " timed out (" << m->ackMsgCount << "/" << (m->f + 1)
+ << ") ... increasing epoch. Now epoch is "
+ << m->registry[whoami]->epoch->s_num
+ << endl;
+ }
+ lock.Unlock();
+};
+
+
+
+class C_Elect_RefreshTimer : public Context {
+ Elector *mon;
+public:
+ C_Elect_RefreshTimer(Elector *m) : mon(m) {}
+ void finish(int r) {
+ mon->refresh_timer();
+ }
+};
+
+void Elector::refresh_timer()
+{
+ lock.Lock();
+ {
+ ack_msg_count = 0;
+ refresh_num++;
+ MMonElectionRefresh *msg = new MMonElectionRefresh(whoami, registry[whoami], refresh_num);
+ for (int i=0; i<processes.size(); i++) {
+ mon->messenger->send_message(msg, MSG_ADDR_MON(processes[i]));
+ }
+
+ // Start the trip timer
+ round_trip_timer = new C_Elect_TripTimer(this);
+ g_timer.add_event_after(trip_delta, m->round_trip_timer);
+ }
+ lock.Unlock();
+};
+
+
+
+//////////////////////////
+
+
+void Elector::dispatch(Message *m)
+{
+ lock.Lock();
+ {
+ switch (m->get_type()) {
+ case MSG_MON_ELECTION_ACK:
+ handle_ack(m);
+ break;
+
+ case MSG_MON_ELECTION_STATUS:
+ handle_status(m);
+ break;
+
+ case MSG_MON_ELECTION_COLLECT:
+ handle_collect(m);
+ break;
+
+ case MSG_MON_ELECTION_REFRESH:
+ handle_refresh(m);
+ break;
+
+ default:
+ assert(0);
+ }
+ }
+ lock.Unlock();
+}
+
+void Elector::handle_ack(MMonElectionAck* msg)
+{
+ assert(refresh_num >= msg->get_refresh_num());
+
+ if (refresh_num > msg->get_refresh_num()) {
+ // we got the message too late... discard it
+ return;
+ }
+ ack_msg_count++;
+ if (ack_msg_count >= f + 1) {
+ dout(5) << "P" << p_id << ": Received _f+1 acks, increase freshness" << endl;
+ g_timer.cancel_event(round_trip_task);
+ round_trip_timer->cancel();
+ registry[p_id]->freshness++;
+ }
+
+ delete msg;
+}
+
+void Elector::handle_collect(MMonElectionCollect* msg)
+{
+ messenger->send_message(new MMonElectionStatus(whoami,
+ msg->getSenderId(),
+ msg->getReadNum(),
+ registry),
+ msg->get_source());
+ delete msg;
+}
+
+void Elector::handle_refresh(MMonElectionRefresh* msg)
+{
+ if (this->registry[msg->p]->isLesser(msg->state)) {
+ // update local data
+ registry[msg->p] = msg->state;
+
+ // reply to msg
+ messenger->send_message(new MMonElectionAck(whoami,
+ msg->p,
+ msg->refresh_num),
+ msg->get_source());
+ }
+
+ delete msg;
+}
+
+
+void Elector::handle_status(MMonElectionStatus* msg)
+{
+ if (read_num != msg->read_num) {
+ dout(1) << _processId << ":HANDLING:" << msg.getType()
+ << ":DISCARDED B/C OF READNUM(" << read_num << ":"
+ << msg->read_num << ")"
+ << endl;
+ return;
+ }
+ for (int i=0; i<this->processes->size(); i++) {
+ int r = processes[i];
+ // Put in the view the max value between then new state and the stored one
+ if ( msg->registry[r]->isGreater(views[r]->state) ) {
+ views[r]->state = msg->registry[r];
+ }
+ }
+
+ status_msg_count++;
+ if (status_msg_count >= processes.size() - _f) { // Responses from quorum collected
+ for (int i=0; i<processes.size(); i++) {
+ int r = processes[i];
+ // Check if r has refreshed its epoch number
+ if (! this->views[r]->state->isGreater(this->old_views[r]->state) )
+ {
+ dout(5) << this->whoami << ":Other process (" << r << ") has expired" << endl;
+ this->views[r]->expired = true;
+ }
+ if (this->views[r]->state->e_num->isGreater(this->old_views[r]->state->e_num))
+ {
+ this->views[r]->expired = false;
+ }
+ }
+ Epoch leader_epoch = get_min_epoch();
+ this->leader_id = leader_epoch->p_id;
+ dout(1) << this->whoami << " thinks leader has ID: " << this->leader_id << endl;
+
+ // Restarts the timer for the next iteration
+ g_timer.add_event_after(main_delta + trip_delta, new C_Elect_ReadTimer(this));
+ }
+}
+
+
+
+
--- /dev/null
+// -*- mode:C++; tab-width:4; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#ifndef __MON_ELECTOR_H
+#define __MON_ELECTOR_H
+
+#include <map>
+using namespace std;
+
+#include "include/types.h"
+#include "msg/Message.h"
+
+
+class Monitor;
+
+
+class Elector {
+ public:
+
+ //// sub-classes
+
+ // Epoch
+ class Epoch {
+ public:
+ int p_id;
+ int s_num;
+
+ Epoch(int p_id=0, int s_num=0) {
+ this->p_id = p_id;
+ this->s_num = s_num;
+ }
+
+ bool operator>(Epoch& e) {
+ if (s_num == e.s_num)
+ return (p_id > e.p_id);
+ else
+ return (s_num > e.s_num);
+ }
+ bool operator<(Epoch& e) {
+ if (s_num == e.s_num)
+ return (p_id < e.p_id);
+ else
+ return (s_num < e.s_num);
+ }
+ bool operator==(Epoch& e) {
+ return ((s_num == e.s_num) && (p_id > e.p_id));
+ }
+ };
+
+
+ // State
+ class State {
+ public:
+ Epoch epoch;
+ int freshness;
+
+ State() {};
+ State (Epoch& e, int f) :
+ epoch(e), freshness(f) {}
+
+ bool operator>(State& e) {
+ if (epoch == e.epoch)
+ return (freshness > e.freshness);
+ else
+ return epoch > e.epoch;
+ }
+ bool operator<(State& e) {
+ if (epoch == e.epoch)
+ return (freshness < e.freshness);
+ else
+ return epoch < e.epoch;
+ }
+ bool operator==(State& e) {
+ return ( (epoch == e.epoch) && (freshness == e.freshness) );
+ }
+ };
+
+
+ class View {
+ public:
+ State state;
+ bool expired;
+ View(State& s, bool e) : state(s), expired(e) {}
+ };
+
+
+ ///////////////
+ private:
+ Monitor *mon;
+ int whoami;
+ Mutex lock;
+
+ // used during refresh phase
+ int ack_msg_count;
+ int refresh_num;
+
+ // used during read phase
+ int read_num;
+ int status_msg_count;
+
+ // the leader process id
+ int leader_id;
+ // f-accessible
+ int f;
+
+ // the processes that compose the group
+ // vector<int> processes;
+ // parameters for the process
+ int main_delta;
+ int trip_delta;
+
+ // state variables
+ map<int, State> registry;
+ map<int, View> views;
+ map<int, View> old_views;
+
+ // get the minimum epoch in the view map
+ Epoch get_min_epoch() {
+ assert(!views.empty());
+ Epoch min = views[0].state.epoch;
+ for (unsigned i=1; i<views.size(); i++) {
+ if (views[i].state.epoch < min && !views[i].expired) {
+ min = views[i].state.epoch;
+ }
+ }
+ return min;
+ }
+
+ // handlers for election messages
+ void handle_ack(class MMonElectionAck *m);
+ void handle_collect(class MMonElectionCollect *m);
+ void handle_refresh(class MMonElectionRefresh *m);
+ void handle_status(class MMoneElectionStatus *m);
+
+ public:
+ Elector(Monitor *m, int w) : mon(m), whoami(w) {
+ // initialize all those values!
+ // ...
+ }
+
+ // timer methods
+ void read_timer();
+ void trip_timer();
+ void refresh_timer();
+
+ void dispatch(Message *m);
+
+};
+
+#endif
#include "messages/MOSDMap.h"
#include "messages/MOSDGetMap.h"
#include "messages/MOSDBoot.h"
-#include "messages/MMonElectionRefresh.h"
-#include "messages/MMonElectionStatus.h"
-#include "messages/MMonElectionAck.h"
-#include "messages/MMonElectionCollect.h"
#include "common/Timer.h"
#include "common/Clock.h"
#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << "mon" << whoami << " e" << (osdmap ? osdmap->get_epoch():0) << " "
-class C_OM_PingTick : public Context {
-public:
- Messenger *msgr;
- C_OM_PingTick(Messenger *m) : msgr(m) {}
- void finish(int r) {
- msgr->send_message(new MPing, MSG_ADDR_MON(0));
- }
-};
-
-class C_OM_Faker : public Context {
+class C_Mon_Tick : public Context {
+ Monitor *mon;
public:
- Monitor *om;
- C_OM_Faker(Monitor *m) {
- this->om = m;
- }
+ C_Mon_Tick(Monitor *m) : mon(m) {}
void finish(int r) {
- om->fake_reorg();
+ mon->tick();
}
};
-class C_OM_FakeOSDFailure : public Context {
+class C_Mon_FakeOSDFailure : public Context {
Monitor *mon;
int osd;
bool down;
public:
- C_OM_FakeOSDFailure(Monitor *m, int o, bool d) : mon(m), osd(o), down(d) {}
+ C_Mon_FakeOSDFailure(Monitor *m, int o, bool d) : mon(m), osd(o), down(d) {}
void finish(int r) {
mon->fake_osd_failure(osd,down);
}
};
-class C_Elect_ReadTimer : public Context {
- Monitor *mon;
-public:
- C_Elect_ReadTimer(Monitor *m) : mon(m){}
- void finish(int r) {
- m->read_num++;
- m->status_msg_count = 0;
- m->old_views = views; // TODO deep copy
- for (int i=0; i<m->processes->size(); i++)
- {
- m->send_message(new MMonElectionCollect(m->read_num), ###address###, ###port###);
- }
- }
-};
-
-class C_Elect_TripTimer : public Context {
- Monitor *mon;
-public:
- C_Elect_TripTimer(Monitor *m) : mon(m){}
- void finish(int r) {
- m->views[whoami]->expired = true;
- m->registry[whoami]->e_num->s_num++;
- dout(1) << "Process " << whoami << " timed out (" << m->ackMsgCount << "/" << (m->f + 1) << ") ... increasing epoch. Now epoch is " << m->registry[whoami]->e_num->s_num;
- }
-};
-
-class C_Elect_RefreshTimer : public Context {
- Monitor *mon;
-public:
- C_Elect_TripTimer(Monitor *m) : mon(m){}
- void finish(int r) {
- m->ack_msg_count = 0;
- m->refresh_num++;
- MMonElectionRefresh msg = new MMonElectionRefresh(whoami, m->registry[whoami], m->refresh_num);
- for (int i=0; i<m->processes->size(); i++)
- {
- m->send_message(msg, ###i-ma dest###, ###port###);
- }
- // Start the trip timer
- m->round_trip_timer = new C_Elect_TripTimer(this);
- g_timer.add_event_after(trip_delta, m->round_trip_timer);
- }
-};
-
void Monitor::fake_reorg()
{
i != g_fake_osd_down.end();
i++) {
dout(0) << "will fake osd" << i->first << " DOWN after " << i->second << endl;
- g_timer.add_event_after(i->second, new C_OM_FakeOSDFailure(this, i->first, 1));
+ g_timer.add_event_after(i->second, new C_Mon_FakeOSDFailure(this, i->first, 1));
}
for (map<int,float>::iterator i = g_fake_osd_out.begin();
i != g_fake_osd_out.end();
i++) {
dout(0) << "will fake osd" << i->first << " OUT after " << i->second << endl;
- g_timer.add_event_after(i->second, new C_OM_FakeOSDFailure(this, i->first, 0));
+ g_timer.add_event_after(i->second, new C_Mon_FakeOSDFailure(this, i->first, 0));
}
}
messenger->set_dispatcher(this);
// start ticker
- g_timer.add_event_after(g_conf.mon_tick_interval, new C_OM_PingTick(messenger));
+ g_timer.add_event_after(g_conf.mon_tick_interval, new C_Mon_Tick(this));
}
void Monitor::dispatch(Message *m)
{
- switch (m->get_type()) {
- case MSG_FAILURE:
- handle_failure((MFailure*)m);
- break;
-
- case MSG_PING_ACK:
- handle_ping_ack((MPingAck*)m);
- break;
-
- case MSG_OSD_GETMAP:
- handle_osd_getmap((MOSDGetMap*)m);
- return;
-
- case MSG_OSD_BOOT:
- handle_osd_boot((MOSDBoot*)m);
- return;
-
- case MSG_SHUTDOWN:
- handle_shutdown(m);
- return;
-
- case MSG_PING:
- tick();
- delete m;
- return;
-
- case MSG_MON_ELECTION_ACK:
- handle_msg_ack(m);
- return;
-
- case MSG_MON_ELECTION_STATUS:
- handle_msg_status(m);
- return;
-
- case MSG_MON_ELECTION_COLLECT:
- handle_msg_collect(m);
- return;
-
- case MSG_MON_ELECTION_REFRESH:
- handle_msg_refresh(m);
- return;
-
- default:
- dout(0) << "unknown message " << *m << endl;
- assert(0);
+ lock.Lock();
+ {
+ switch (m->get_type()) {
+ case MSG_FAILURE:
+ handle_failure((MFailure*)m);
+ break;
+
+ case MSG_PING_ACK:
+ handle_ping_ack((MPingAck*)m);
+ break;
+
+ case MSG_OSD_GETMAP:
+ handle_osd_getmap((MOSDGetMap*)m);
+ break;
+
+ case MSG_OSD_BOOT:
+ handle_osd_boot((MOSDBoot*)m);
+ break;
+
+ case MSG_SHUTDOWN:
+ handle_shutdown(m);
+ break;
+
+ // elector messages
+ case MSG_MON_ELECTION_ACK:
+ case MSG_MON_ELECTION_STATUS:
+ case MSG_MON_ELECTION_COLLECT:
+ case MSG_MON_ELECTION_REFRESH:
+ elector.dispatch(m);
+ break;
+
+
+ default:
+ dout(0) << "unknown message " << *m << endl;
+ assert(0);
+ }
}
+ lock.Unlock();
}
void Monitor::fake_osd_failure(int osd, bool down)
{
- if (down) {
- dout(1) << "fake_osd_failure DOWN osd" << osd << endl;
- pending.new_down[osd] = osdmap->osd_inst[osd];
- } else {
+ lock.Lock();
+ {
+ if (down) {
+ dout(1) << "fake_osd_failure DOWN osd" << osd << endl;
+ pending.new_down[osd] = osdmap->osd_inst[osd];
+ } else {
dout(1) << "fake_osd_failure OUT osd" << osd << endl;
pending.new_out.push_back(osd);
+ }
+ accept_pending();
+ bcast_latest_osd_map_osd();
+ bcast_latest_osd_map_mds();
}
- accept_pending();
- bcast_latest_osd_map_osd();
- bcast_latest_osd_map_mds();
+ lock.Unlock();
}
void Monitor::tick()
{
- dout(10) << "tick" << endl;
-
- // mark down osds out?
- utime_t now = g_clock.now();
- list<int> mark_out;
- for (map<int,utime_t>::iterator i = pending_out.begin();
- i != pending_out.end();
- i++) {
- utime_t down = now;
- down -= i->second;
-
- if (down.sec() >= g_conf.mon_osd_down_out_interval) {
- dout(10) << "tick marking osd" << i->first << " OUT after " << down << " sec" << endl;
- mark_out.push_back(i->first);
- }
- }
- for (list<int>::iterator i = mark_out.begin();
- i != mark_out.end();
- i++) {
- pending_out.erase(*i);
- pending.new_out.push_back( *i );
- accept_pending();
- }
-
- // next!
- g_timer.add_event_after(g_conf.mon_tick_interval, new C_OM_PingTick(messenger));
-}
-
-/*****************************************
- HANDLERS FOR MSG OF THE ELECTION ALGORITHM
-******************************************/
- void Monitor::handle_ack_msg(MMonElectionAck* msg)
- {
- assert(this->refresh_num >= msg->get_refresh_num());
-
- if (this->refresh_num > msg->get_refresh_num())
- {
- // we got the message too late... discard it
- return;
- }
- this->ack_msg_count++;
- if (this->ack_msg_count >= this->f + 1)
- {
- dout(5) << "P" << this->p_id << ": Received _f+1 acks, increase freshness" << endl;
- g_timer.cancel_event(this->round_trip_task);
- this->round_trip_timer->cancel();
- this->registry[this->p_id]->freshness++;
- }
- }
-
- void Monitor::handle_collect_msg(MMonElectionCollect* msg)
- {
- messenger->send_message(new MMonElectionStatus(this->whoami,
- msg->getSenderId(),
- msg->getReadNum(),
- this->registry),
- msg->get_source(),
- msg->get_source_port());
- }
-
- void Monitor::handle_refresh_msg(MMonElectionRefresh* msg)
- {
- if (this->registry[msg->p]->isLesser(msg->state))
- {
- // update local data
- this->registry[msg->p] = msg->state;
- // reply to msg
- send(new MMonElectionAck(this->whoami,
- msg->p,
- msg->refresh_num),
- msg->get_source(),
- msg->get_source_port());
- }
- }
-
- void Monitor::handle_status_msg(MMonElectionStatus* msg)
- {
- if (this->read_num != msg->read_num)
- {
- dout(1) << _processId << ":HANDLING:" << msg.getType() << ":DISCARDED B/C OF READNUM(" << this->read_num << ":" << msg->read_num << ")" << endl;
- return;
- }
- for (int i=0; i<this->processes->size(); i++)
- {
- int r = processes[i];
- // Put in the view the max value between then new state and the stored one
- if ( msg->registry[r]->isGreater(this->views[r]->state) )
- {
- this->views[r]->state = msg->registry[r];
- }
- }
-
- this->status_msg_count++;
- if (this->status_msg_count >= this->processes->size() - _f) // Responses from quorum collected
- {
- for (int i=0; i<this->processes->size(); i++)
- {
- int r = processes[i];
- // Check if r has refreshed its epoch number
- if (! this->views[r]->state->isGreater(this->old_views[r]->state) )
- {
- dout(5) << this->whoami << ":Other process (" << r << ") has expired" << endl;
- this->views[r]->expired = true;
- }
- if (this->views[r]->state->e_num->isGreater(this->old_views[r]->state->e_num))
- {
- this->views[r]->expired = false;
- }
- }
- Epoch leader_epoch = get_min_epoch();
- this->leader_id = leader_epoch->p_id;
- dout(1) << this->whoami << " thinks leader has ID: " << this->leader_id << endl;
-
- // Restarts the timer for the next iteration
- g_timer.add_event_after(main_delta + trip_delta, new C_Elect_ReadTimer(this));
- }
- }
-
- // return the minimum epoch in the this->view map
- private Monitor::get_min_epoch()
- {
- Epoch min = new Epoch(-1, -1);
- for (int i=0; i<this->views->size(); i++)
- {
- View v = this->views[i];
- if (v->state->e_num->isLesser(min) && !v->expired)
- {
- min = v->state->e_num;
- }
- }
- return min;
- }
-
- void Monitor::start_read_timer()
- {
-
- }
-/*****************************************
- CLASSES NEEDED FOR THE ELECTION ALGORITHM
-******************************************/
-class Epoch {
- public:
- int p_id;
- int s_num;
-
- Epoch(int p_id, int s_num) {
- this->p_id = p_id;
- this->s_num = s_num;
- }
-
- bool operator>(Epoch *e);
- bool operator<(Epoch *e);
- bool operator==(Epoch *e);
-
-};
-
-bool Epoch::operator>(Epoch *e)
-{
- if (this->s_num == e->s_num)
+ lock.Lock();
{
- return (this->p_id > e->p_id);
- }
- else
- {
- return (this->s_num > e->s_num);
- }
-}
-
-bool Epoch::operator<(Epoch *e)
-{
- if (this->s_num == e->s_num)
- {
- return (this->p_id < e->p_id);
- }
- else
- {
- return (this->s_num < e->s_num);
+ dout(10) << "tick" << endl;
+
+ // mark down osds out?
+ utime_t now = g_clock.now();
+ list<int> mark_out;
+ for (map<int,utime_t>::iterator i = pending_out.begin();
+ i != pending_out.end();
+ i++) {
+ utime_t down = now;
+ down -= i->second;
+
+ if (down.sec() >= g_conf.mon_osd_down_out_interval) {
+ dout(10) << "tick marking osd" << i->first << " OUT after " << down << " sec" << endl;
+ mark_out.push_back(i->first);
+ }
+ }
+ for (list<int>::iterator i = mark_out.begin();
+ i != mark_out.end();
+ i++) {
+ pending_out.erase(*i);
+ pending.new_out.push_back( *i );
+ accept_pending();
+ }
+
+ // next!
+ g_timer.add_event_after(g_conf.mon_tick_interval, new C_Mon_Tick(this));
}
+ lock.Unlock();
}
-bool Epoch::operator==(Epoch *e)
-{
- return ((this->s_num == e->s_num) && (this->p_id > e->p_id));
-}
-class State {
- public:
- Epoch *e_num;
- int freshness;
-
- State (Epoch *e_num, int freshness)
- {
- this->e_num = e_num;
- this->freshness = freshness;
- }
-
- bool operator>(State *e);
- bool operator<(State *e);
- bool operator==(State *e);
-};
-bool State::operator>(State *e)
-{
- if (this->e_num == e->e_num)
- {
- return (this->freshness > e->freshness);
- }
- else
- {
- return this->e_num > e->e_num;
- }
-}
-bool State::operator<(State *e)
-{
- if (this->e_num == e->e_num)
- {
- return (this->freshness < e->freshness);
- }
- else
- {
- return this->e_num < e->e_num;
- }
-}
-bool State::operator==(State *e)
-{
- return ( (this->e_num == e->e_num) && (this->freshness == e->freshness) );
-}
-class View {
- public:
- State *p_state;
- bool expired;
- View (State* p_state, bool expired) {
- this->p_state = p_state;
- this->expired = expired;
- }
-};