From d2bdb7a482cf07b9fb5456ecd7474cbad6408252 Mon Sep 17 00:00:00 2001 From: sage Date: Sat, 19 Aug 2006 15:17:33 +0000 Subject: [PATCH] *** empty log message *** git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@806 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/mon/Elector.cc | 218 ++++++++++++++++++++++ ceph/mon/Elector.h | 161 ++++++++++++++++ ceph/mon/Monitor.cc | 435 +++++++++----------------------------------- ceph/mon/Monitor.h | 48 +---- 4 files changed, 474 insertions(+), 388 deletions(-) create mode 100644 ceph/mon/Elector.cc create mode 100644 ceph/mon/Elector.h diff --git a/ceph/mon/Elector.cc b/ceph/mon/Elector.cc new file mode 100644 index 0000000000000..ff2e319fc0ff6 --- /dev/null +++ b/ceph/mon/Elector.cc @@ -0,0 +1,218 @@ + +#include "Elector.h" +#include "Monitor.h" + +#include "common/Timer.h" + +#include "messages/MMonElectionRefresh.h" +#include "messages/MMonElectionStatus.h" +#include "messages/MMonElectionAck.h" +#include "messages/MMonElectionCollect.h" + +#include "config.h" +#undef dout +#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << "mon" << whoami << " " +#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << "mon" << whoami << " " + + + +class C_Elect_ReadTimer : public Context { + Elector *mon; +public: + C_Elect_ReadTimer(Elector *m) : mon(m){} + void finish(int r) { + mon->read_timer(); + } +}; + +void Elector::read_timer() +{ + lock.Lock(); + { + read_num++; + status_msg_count = 0; + old_views = views; // TODO deep copy + for (int i=0; imessenger->send_message(new MMonElectionCollect(read_num), + MSG_ADDR_MON(processes[i])); + } + } + lock.Unlock(); +}; + +class C_Elect_TripTimer : public Context { + Elector *mon; +public: + C_Elect_TripTimer(Elector *m) : mon(m){} + void finish(int r) { + mon->trip_timer(); + } +}; + +void Elector::trip_timer() +{ + lock.Lock(); + { + views[whoami].expired = true; + registry[whoami].epoch.s_num++; + dout(1) << "Process " << whoami + << " timed out (" << m->ackMsgCount << "/" << (m->f + 1) + << ") ... increasing epoch. Now epoch is " + << m->registry[whoami]->epoch->s_num + << endl; + } + lock.Unlock(); +}; + + + +class C_Elect_RefreshTimer : public Context { + Elector *mon; +public: + C_Elect_RefreshTimer(Elector *m) : mon(m) {} + void finish(int r) { + mon->refresh_timer(); + } +}; + +void Elector::refresh_timer() +{ + lock.Lock(); + { + ack_msg_count = 0; + refresh_num++; + MMonElectionRefresh *msg = new MMonElectionRefresh(whoami, registry[whoami], refresh_num); + for (int i=0; imessenger->send_message(msg, MSG_ADDR_MON(processes[i])); + } + + // Start the trip timer + round_trip_timer = new C_Elect_TripTimer(this); + g_timer.add_event_after(trip_delta, m->round_trip_timer); + } + lock.Unlock(); +}; + + + +////////////////////////// + + +void Elector::dispatch(Message *m) +{ + lock.Lock(); + { + switch (m->get_type()) { + case MSG_MON_ELECTION_ACK: + handle_ack(m); + break; + + case MSG_MON_ELECTION_STATUS: + handle_status(m); + break; + + case MSG_MON_ELECTION_COLLECT: + handle_collect(m); + break; + + case MSG_MON_ELECTION_REFRESH: + handle_refresh(m); + break; + + default: + assert(0); + } + } + lock.Unlock(); +} + +void Elector::handle_ack(MMonElectionAck* msg) +{ + assert(refresh_num >= msg->get_refresh_num()); + + if (refresh_num > msg->get_refresh_num()) { + // we got the message too late... discard it + return; + } + ack_msg_count++; + if (ack_msg_count >= f + 1) { + dout(5) << "P" << p_id << ": Received _f+1 acks, increase freshness" << endl; + g_timer.cancel_event(round_trip_task); + round_trip_timer->cancel(); + registry[p_id]->freshness++; + } + + delete msg; +} + +void Elector::handle_collect(MMonElectionCollect* msg) +{ + messenger->send_message(new MMonElectionStatus(whoami, + msg->getSenderId(), + msg->getReadNum(), + registry), + msg->get_source()); + delete msg; +} + +void Elector::handle_refresh(MMonElectionRefresh* msg) +{ + if (this->registry[msg->p]->isLesser(msg->state)) { + // update local data + registry[msg->p] = msg->state; + + // reply to msg + messenger->send_message(new MMonElectionAck(whoami, + msg->p, + msg->refresh_num), + msg->get_source()); + } + + delete msg; +} + + +void Elector::handle_status(MMonElectionStatus* msg) +{ + if (read_num != msg->read_num) { + dout(1) << _processId << ":HANDLING:" << msg.getType() + << ":DISCARDED B/C OF READNUM(" << read_num << ":" + << msg->read_num << ")" + << endl; + return; + } + for (int i=0; iprocesses->size(); i++) { + int r = processes[i]; + // Put in the view the max value between then new state and the stored one + if ( msg->registry[r]->isGreater(views[r]->state) ) { + views[r]->state = msg->registry[r]; + } + } + + status_msg_count++; + if (status_msg_count >= processes.size() - _f) { // Responses from quorum collected + for (int i=0; iviews[r]->state->isGreater(this->old_views[r]->state) ) + { + dout(5) << this->whoami << ":Other process (" << r << ") has expired" << endl; + this->views[r]->expired = true; + } + if (this->views[r]->state->e_num->isGreater(this->old_views[r]->state->e_num)) + { + this->views[r]->expired = false; + } + } + Epoch leader_epoch = get_min_epoch(); + this->leader_id = leader_epoch->p_id; + dout(1) << this->whoami << " thinks leader has ID: " << this->leader_id << endl; + + // Restarts the timer for the next iteration + g_timer.add_event_after(main_delta + trip_delta, new C_Elect_ReadTimer(this)); + } +} + + + + diff --git a/ceph/mon/Elector.h b/ceph/mon/Elector.h new file mode 100644 index 0000000000000..084e59d92d57c --- /dev/null +++ b/ceph/mon/Elector.h @@ -0,0 +1,161 @@ +// -*- mode:C++; tab-width:4; c-basic-offset:2; indent-tabs-mode:t -*- +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#ifndef __MON_ELECTOR_H +#define __MON_ELECTOR_H + +#include +using namespace std; + +#include "include/types.h" +#include "msg/Message.h" + + +class Monitor; + + +class Elector { + public: + + //// sub-classes + + // Epoch + class Epoch { + public: + int p_id; + int s_num; + + Epoch(int p_id=0, int s_num=0) { + this->p_id = p_id; + this->s_num = s_num; + } + + bool operator>(Epoch& e) { + if (s_num == e.s_num) + return (p_id > e.p_id); + else + return (s_num > e.s_num); + } + bool operator<(Epoch& e) { + if (s_num == e.s_num) + return (p_id < e.p_id); + else + return (s_num < e.s_num); + } + bool operator==(Epoch& e) { + return ((s_num == e.s_num) && (p_id > e.p_id)); + } + }; + + + // State + class State { + public: + Epoch epoch; + int freshness; + + State() {}; + State (Epoch& e, int f) : + epoch(e), freshness(f) {} + + bool operator>(State& e) { + if (epoch == e.epoch) + return (freshness > e.freshness); + else + return epoch > e.epoch; + } + bool operator<(State& e) { + if (epoch == e.epoch) + return (freshness < e.freshness); + else + return epoch < e.epoch; + } + bool operator==(State& e) { + return ( (epoch == e.epoch) && (freshness == e.freshness) ); + } + }; + + + class View { + public: + State state; + bool expired; + View(State& s, bool e) : state(s), expired(e) {} + }; + + + /////////////// + private: + Monitor *mon; + int whoami; + Mutex lock; + + // used during refresh phase + int ack_msg_count; + int refresh_num; + + // used during read phase + int read_num; + int status_msg_count; + + // the leader process id + int leader_id; + // f-accessible + int f; + + // the processes that compose the group + // vector processes; + // parameters for the process + int main_delta; + int trip_delta; + + // state variables + map registry; + map views; + map old_views; + + // get the minimum epoch in the view map + Epoch get_min_epoch() { + assert(!views.empty()); + Epoch min = views[0].state.epoch; + for (unsigned i=1; iget_epoch():0) << " " -class C_OM_PingTick : public Context { -public: - Messenger *msgr; - C_OM_PingTick(Messenger *m) : msgr(m) {} - void finish(int r) { - msgr->send_message(new MPing, MSG_ADDR_MON(0)); - } -}; - -class C_OM_Faker : public Context { +class C_Mon_Tick : public Context { + Monitor *mon; public: - Monitor *om; - C_OM_Faker(Monitor *m) { - this->om = m; - } + C_Mon_Tick(Monitor *m) : mon(m) {} void finish(int r) { - om->fake_reorg(); + mon->tick(); } }; -class C_OM_FakeOSDFailure : public Context { +class C_Mon_FakeOSDFailure : public Context { Monitor *mon; int osd; bool down; public: - C_OM_FakeOSDFailure(Monitor *m, int o, bool d) : mon(m), osd(o), down(d) {} + C_Mon_FakeOSDFailure(Monitor *m, int o, bool d) : mon(m), osd(o), down(d) {} void finish(int r) { mon->fake_osd_failure(osd,down); } }; -class C_Elect_ReadTimer : public Context { - Monitor *mon; -public: - C_Elect_ReadTimer(Monitor *m) : mon(m){} - void finish(int r) { - m->read_num++; - m->status_msg_count = 0; - m->old_views = views; // TODO deep copy - for (int i=0; iprocesses->size(); i++) - { - m->send_message(new MMonElectionCollect(m->read_num), ###address###, ###port###); - } - } -}; - -class C_Elect_TripTimer : public Context { - Monitor *mon; -public: - C_Elect_TripTimer(Monitor *m) : mon(m){} - void finish(int r) { - m->views[whoami]->expired = true; - m->registry[whoami]->e_num->s_num++; - dout(1) << "Process " << whoami << " timed out (" << m->ackMsgCount << "/" << (m->f + 1) << ") ... increasing epoch. Now epoch is " << m->registry[whoami]->e_num->s_num; - } -}; - -class C_Elect_RefreshTimer : public Context { - Monitor *mon; -public: - C_Elect_TripTimer(Monitor *m) : mon(m){} - void finish(int r) { - m->ack_msg_count = 0; - m->refresh_num++; - MMonElectionRefresh msg = new MMonElectionRefresh(whoami, m->registry[whoami], m->refresh_num); - for (int i=0; iprocesses->size(); i++) - { - m->send_message(msg, ###i-ma dest###, ###port###); - } - // Start the trip timer - m->round_trip_timer = new C_Elect_TripTimer(this); - g_timer.add_event_after(trip_delta, m->round_trip_timer); - } -}; - void Monitor::fake_reorg() { @@ -200,13 +141,13 @@ void Monitor::init() i != g_fake_osd_down.end(); i++) { dout(0) << "will fake osd" << i->first << " DOWN after " << i->second << endl; - g_timer.add_event_after(i->second, new C_OM_FakeOSDFailure(this, i->first, 1)); + g_timer.add_event_after(i->second, new C_Mon_FakeOSDFailure(this, i->first, 1)); } for (map::iterator i = g_fake_osd_out.begin(); i != g_fake_osd_out.end(); i++) { dout(0) << "will fake osd" << i->first << " OUT after " << i->second << endl; - g_timer.add_event_after(i->second, new C_OM_FakeOSDFailure(this, i->first, 0)); + g_timer.add_event_after(i->second, new C_Mon_FakeOSDFailure(this, i->first, 0)); } } @@ -215,58 +156,50 @@ void Monitor::init() messenger->set_dispatcher(this); // start ticker - g_timer.add_event_after(g_conf.mon_tick_interval, new C_OM_PingTick(messenger)); + g_timer.add_event_after(g_conf.mon_tick_interval, new C_Mon_Tick(this)); } void Monitor::dispatch(Message *m) { - switch (m->get_type()) { - case MSG_FAILURE: - handle_failure((MFailure*)m); - break; - - case MSG_PING_ACK: - handle_ping_ack((MPingAck*)m); - break; - - case MSG_OSD_GETMAP: - handle_osd_getmap((MOSDGetMap*)m); - return; - - case MSG_OSD_BOOT: - handle_osd_boot((MOSDBoot*)m); - return; - - case MSG_SHUTDOWN: - handle_shutdown(m); - return; - - case MSG_PING: - tick(); - delete m; - return; - - case MSG_MON_ELECTION_ACK: - handle_msg_ack(m); - return; - - case MSG_MON_ELECTION_STATUS: - handle_msg_status(m); - return; - - case MSG_MON_ELECTION_COLLECT: - handle_msg_collect(m); - return; - - case MSG_MON_ELECTION_REFRESH: - handle_msg_refresh(m); - return; - - default: - dout(0) << "unknown message " << *m << endl; - assert(0); + lock.Lock(); + { + switch (m->get_type()) { + case MSG_FAILURE: + handle_failure((MFailure*)m); + break; + + case MSG_PING_ACK: + handle_ping_ack((MPingAck*)m); + break; + + case MSG_OSD_GETMAP: + handle_osd_getmap((MOSDGetMap*)m); + break; + + case MSG_OSD_BOOT: + handle_osd_boot((MOSDBoot*)m); + break; + + case MSG_SHUTDOWN: + handle_shutdown(m); + break; + + // elector messages + case MSG_MON_ELECTION_ACK: + case MSG_MON_ELECTION_STATUS: + case MSG_MON_ELECTION_COLLECT: + case MSG_MON_ELECTION_REFRESH: + elector.dispatch(m); + break; + + + default: + dout(0) << "unknown message " << *m << endl; + assert(0); + } } + lock.Unlock(); } @@ -320,16 +253,20 @@ void Monitor::handle_failure(MFailure *m) void Monitor::fake_osd_failure(int osd, bool down) { - if (down) { - dout(1) << "fake_osd_failure DOWN osd" << osd << endl; - pending.new_down[osd] = osdmap->osd_inst[osd]; - } else { + lock.Lock(); + { + if (down) { + dout(1) << "fake_osd_failure DOWN osd" << osd << endl; + pending.new_down[osd] = osdmap->osd_inst[osd]; + } else { dout(1) << "fake_osd_failure OUT osd" << osd << endl; pending.new_out.push_back(osd); + } + accept_pending(); + bcast_latest_osd_map_osd(); + bcast_latest_osd_map_mds(); } - accept_pending(); - bcast_latest_osd_map_osd(); - bcast_latest_osd_map_mds(); + lock.Unlock(); } @@ -521,242 +458,40 @@ void Monitor::bcast_latest_osd_map_osd() void Monitor::tick() { - dout(10) << "tick" << endl; - - // mark down osds out? - utime_t now = g_clock.now(); - list mark_out; - for (map::iterator i = pending_out.begin(); - i != pending_out.end(); - i++) { - utime_t down = now; - down -= i->second; - - if (down.sec() >= g_conf.mon_osd_down_out_interval) { - dout(10) << "tick marking osd" << i->first << " OUT after " << down << " sec" << endl; - mark_out.push_back(i->first); - } - } - for (list::iterator i = mark_out.begin(); - i != mark_out.end(); - i++) { - pending_out.erase(*i); - pending.new_out.push_back( *i ); - accept_pending(); - } - - // next! - g_timer.add_event_after(g_conf.mon_tick_interval, new C_OM_PingTick(messenger)); -} - -/***************************************** - HANDLERS FOR MSG OF THE ELECTION ALGORITHM -******************************************/ - void Monitor::handle_ack_msg(MMonElectionAck* msg) - { - assert(this->refresh_num >= msg->get_refresh_num()); - - if (this->refresh_num > msg->get_refresh_num()) - { - // we got the message too late... discard it - return; - } - this->ack_msg_count++; - if (this->ack_msg_count >= this->f + 1) - { - dout(5) << "P" << this->p_id << ": Received _f+1 acks, increase freshness" << endl; - g_timer.cancel_event(this->round_trip_task); - this->round_trip_timer->cancel(); - this->registry[this->p_id]->freshness++; - } - } - - void Monitor::handle_collect_msg(MMonElectionCollect* msg) - { - messenger->send_message(new MMonElectionStatus(this->whoami, - msg->getSenderId(), - msg->getReadNum(), - this->registry), - msg->get_source(), - msg->get_source_port()); - } - - void Monitor::handle_refresh_msg(MMonElectionRefresh* msg) - { - if (this->registry[msg->p]->isLesser(msg->state)) - { - // update local data - this->registry[msg->p] = msg->state; - // reply to msg - send(new MMonElectionAck(this->whoami, - msg->p, - msg->refresh_num), - msg->get_source(), - msg->get_source_port()); - } - } - - void Monitor::handle_status_msg(MMonElectionStatus* msg) - { - if (this->read_num != msg->read_num) - { - dout(1) << _processId << ":HANDLING:" << msg.getType() << ":DISCARDED B/C OF READNUM(" << this->read_num << ":" << msg->read_num << ")" << endl; - return; - } - for (int i=0; iprocesses->size(); i++) - { - int r = processes[i]; - // Put in the view the max value between then new state and the stored one - if ( msg->registry[r]->isGreater(this->views[r]->state) ) - { - this->views[r]->state = msg->registry[r]; - } - } - - this->status_msg_count++; - if (this->status_msg_count >= this->processes->size() - _f) // Responses from quorum collected - { - for (int i=0; iprocesses->size(); i++) - { - int r = processes[i]; - // Check if r has refreshed its epoch number - if (! this->views[r]->state->isGreater(this->old_views[r]->state) ) - { - dout(5) << this->whoami << ":Other process (" << r << ") has expired" << endl; - this->views[r]->expired = true; - } - if (this->views[r]->state->e_num->isGreater(this->old_views[r]->state->e_num)) - { - this->views[r]->expired = false; - } - } - Epoch leader_epoch = get_min_epoch(); - this->leader_id = leader_epoch->p_id; - dout(1) << this->whoami << " thinks leader has ID: " << this->leader_id << endl; - - // Restarts the timer for the next iteration - g_timer.add_event_after(main_delta + trip_delta, new C_Elect_ReadTimer(this)); - } - } - - // return the minimum epoch in the this->view map - private Monitor::get_min_epoch() - { - Epoch min = new Epoch(-1, -1); - for (int i=0; iviews->size(); i++) - { - View v = this->views[i]; - if (v->state->e_num->isLesser(min) && !v->expired) - { - min = v->state->e_num; - } - } - return min; - } - - void Monitor::start_read_timer() - { - - } -/***************************************** - CLASSES NEEDED FOR THE ELECTION ALGORITHM -******************************************/ -class Epoch { - public: - int p_id; - int s_num; - - Epoch(int p_id, int s_num) { - this->p_id = p_id; - this->s_num = s_num; - } - - bool operator>(Epoch *e); - bool operator<(Epoch *e); - bool operator==(Epoch *e); - -}; - -bool Epoch::operator>(Epoch *e) -{ - if (this->s_num == e->s_num) + lock.Lock(); { - return (this->p_id > e->p_id); - } - else - { - return (this->s_num > e->s_num); - } -} - -bool Epoch::operator<(Epoch *e) -{ - if (this->s_num == e->s_num) - { - return (this->p_id < e->p_id); - } - else - { - return (this->s_num < e->s_num); + dout(10) << "tick" << endl; + + // mark down osds out? + utime_t now = g_clock.now(); + list mark_out; + for (map::iterator i = pending_out.begin(); + i != pending_out.end(); + i++) { + utime_t down = now; + down -= i->second; + + if (down.sec() >= g_conf.mon_osd_down_out_interval) { + dout(10) << "tick marking osd" << i->first << " OUT after " << down << " sec" << endl; + mark_out.push_back(i->first); + } + } + for (list::iterator i = mark_out.begin(); + i != mark_out.end(); + i++) { + pending_out.erase(*i); + pending.new_out.push_back( *i ); + accept_pending(); + } + + // next! + g_timer.add_event_after(g_conf.mon_tick_interval, new C_Mon_Tick(this)); } + lock.Unlock(); } -bool Epoch::operator==(Epoch *e) -{ - return ((this->s_num == e->s_num) && (this->p_id > e->p_id)); -} -class State { - public: - Epoch *e_num; - int freshness; - - State (Epoch *e_num, int freshness) - { - this->e_num = e_num; - this->freshness = freshness; - } - - bool operator>(State *e); - bool operator<(State *e); - bool operator==(State *e); -}; -bool State::operator>(State *e) -{ - if (this->e_num == e->e_num) - { - return (this->freshness > e->freshness); - } - else - { - return this->e_num > e->e_num; - } -} -bool State::operator<(State *e) -{ - if (this->e_num == e->e_num) - { - return (this->freshness < e->freshness); - } - else - { - return this->e_num < e->e_num; - } -} -bool State::operator==(State *e) -{ - return ( (this->e_num == e->e_num) && (this->freshness == e->freshness) ); -} -class View { - public: - State *p_state; - bool expired; - View (State* p_state, bool expired) { - this->p_state = p_state; - this->expired = expired; - } -}; diff --git a/ceph/mon/Monitor.h b/ceph/mon/Monitor.h index 3f8e1cf795a53..77cf502f99620 100644 --- a/ceph/mon/Monitor.h +++ b/ceph/mon/Monitor.h @@ -26,10 +26,19 @@ using namespace std; #include "osd/OSDMap.h" +#include "Elector.h" + + class Monitor : public Dispatcher { +protected: // me int whoami; Messenger *messenger; + Mutex lock; + + // elector + Elector elector; + friend class Elector; // maps OSDMap *osdmap; @@ -57,43 +66,12 @@ class Monitor : public Dispatcher { void get_min_epoch(); void start_read_timer(); - /******************************************* - * Variables used by the election algorithm * - *******************************************/ - // used during refresh phase - int ack_msg_count; - int refresh_num; - - // used during read phase - int read_num; - int status_msg_count; - - // the leader process id - int leader_id; - // f-accessible - int f; - - // the processes that compose the group -// vector processes; - // parameters for the process - int main_delta; - int trip_delta; - - // state variables - hash_map registry; - hash_map views; - hash_map old_views; - - // round trip timer - round_trip_task - /************************************************ - * END> Variables used by the election algorithm * - *************************************************/ public: Monitor(int w, Messenger *m) : whoami(w), messenger(m), + elector(this, w), osdmap(0) { } @@ -109,12 +87,6 @@ class Monitor : public Dispatcher { void handle_ping_ack(class MPingAck *m); - // handles for election messages - void handle_ack_msg(class MMonElectionAck); - void handle_collect_msg(class MMonElectionCollect); - void handle_refresh_msg(class MMonElectionRefresh); - void handle_status_msg(class MMoneElectionStatus); - // hack void fake_osd_failure(int osd, bool down); void fake_reorg(); -- 2.39.5