#include "messages/MOSDMap.h"
#include "messages/MOSDGetMap.h"
#include "messages/MOSDBoot.h"
+#include "messages/MMonElectionRefresh.h"
+#include "messages/MMonElectionStatus.h"
+#include "messages/MMonElectionAck.h"
+#include "messages/MMonElectionCollect.h"
#include "common/Timer.h"
#include "common/Clock.h"
void Monitor::dispatch(Message *m)
{
+<<<<<<< Monitor.cc
+ switch (m->get_type()) {
+ case MSG_FAILURE:
+ handle_failure((MFailure*)m);
+ break;
+
+ case MSG_PING_ACK:
+ handle_ping_ack((MPingAck*)m);
+ break;
+
+ case MSG_OSD_GETMAP:
+ handle_osd_getmap((MOSDGetMap*)m);
+ return;
+
+ case MSG_OSD_BOOT:
+ handle_osd_boot((MOSDBoot*)m);
+ return;
+
+ case MSG_SHUTDOWN:
+ handle_shutdown(m);
+ return;
+
+ case MSG_PING:
+ tick();
+ delete m;
+ return;
+
+ case MSG_MON_ELECTION_ACK:
+ handle_msg_ack(m);
+ return;
+
+ case MSG_MON_ELECTION_STATUS:
+ handle_msg_status(m);
+ return;
+
+ case MSG_MON_ELECTION_COLLECT:
+ handle_msg_collect(m);
+ return;
+
+ case MSG_MON_ELECTION_REFRESH:
+ handle_msg_refresh(m);
+ return;
+
+ default:
+ dout(0) << "unknown message " << *m << endl;
+ assert(0);
+=======
lock.Lock();
{
switch (m->get_type()) {
dout(0) << "unknown message " << *m << endl;
assert(0);
}
+>>>>>>> 1.6
}
lock.Unlock();
}
void Monitor::tick()
{
+<<<<<<< Monitor.cc
+ dout(10) << "tick" << endl;
+
+ // mark down osds out?
+ utime_t now = g_clock.now();
+ list<int> mark_out;
+ for (map<int,utime_t>::iterator i = pending_out.begin();
+ i != pending_out.end();
+ i++) {
+ utime_t down = now;
+ down -= i->second;
+
+ if (down.sec() >= g_conf.mon_osd_down_out_interval) {
+ dout(10) << "tick marking osd" << i->first << " OUT after " << down << " sec" << endl;
+ mark_out.push_back(i->first);
+ }
+ }
+ for (list<int>::iterator i = mark_out.begin();
+ i != mark_out.end();
+ i++) {
+ pending_out.erase(*i);
+ pending.new_out.push_back( *i );
+ accept_pending();
+ }
+
+ // next!
+ g_timer.add_event_after(g_conf.mon_tick_interval, new C_OM_PingTick(messenger));
+}
+
+/*****************************************
+ HANDLERS FOR MSG OF THE ELECTION ALGORITHM
+******************************************/
+ void Monitor::handle_ack_msg(MMonElectionAck* msg)
+ {
+ assert(this->refresh_num >= msg->get_refresh_num());
+
+ if (this->refresh_num > msg->get_refresh_num())
+ {
+ // we got the message too late... discard it
+ return;
+ }
+ this->ack_msg_count++;
+ if (this->ack_msg_count >= this->f + 1)
+ {
+ dout(5) << "P" << this->p_id << ": Received _f+1 acks, increase freshness" << endl;
+ this->round_trip_timer->cancel();
+ this->registry[this->p_id]->freshness++;
+ }
+ }
+
+ void Monitor::handle_collect_msg(MMonElectionCollect* msg)
+ {
+ messenger->send_message(new MMonElectionStatus(this->whoami,
+ msg->getSenderId(),
+ msg->getReadNum(),
+ this->registry),
+ msg->get_source(),
+ msg->get_source_port());
+ }
+
+ void Monitor::handle_refresh_msg(MMonElectionRefresh* msg)
+ {
+ if (this->registry[msg->p]->isLesser(msg->state))
+ {
+ // update local data
+ this->registry[msg->p] = msg->state;
+ // reply to msg
+ send(new MMonElectionAck(this->whoami,
+ msg->p,
+ msg->refresh_num),
+ msg->get_source(),
+ msg->get_source_port());
+ }
+ }
+
+ void Monitor::handle_status_msg(MMonElectionStatus* msg)
+ {
+ if (this->read_num != msg->read_num)
+ {
+ dout(1) << _processId << ":HANDLING:" << msg.getType() << ":DISCARDED B/C OF READNUM(" << this->read_num << ":" << msg->read_num << ")" << endl;
+ return;
+ }
+ for (int i=0; i<this->processes->size(); i++)
+ {
+ int r = processes[i];
+ // Put in the view the max value between then new state and the stored one
+ if ( msg->registry[r]->isGreater(this->views[r]->state) )
+ {
+ this->views[r]->state = msg->registry[r];
+ }
+ }
+
+ this->status_msg_count++;
+ if (this->status_msg_count >= this->processes->size() - _f) // Responses from quorum collected
+ {
+ for (int i=0; i<this->processes->size(); i++)
+ {
+ int r = processes[i];
+ // Check if r has refreshed its epoch number
+ if (! this->views[r]->state->isGreater(this->old_views[r]->state) )
+ {
+ dout(5) << this->whoami << ":Other process (" << r << ") has expired" << endl;
+ this->views[r]->expired = true;
+ }
+ if (this->views[r]->state->e_num->isGreater(this->old_views[r]->state->e_num))
+ {
+ this->views[r]->expired = false;
+ }
+ }
+ Epoch leader_epoch = get_min_epoch();
+ this->leader_id = leader_epoch->p_id;
+ dout(1) << this->whoami << " thinks leader has ID: " << this->leader_id << endl;
+
+ // Restarts the timer for the next iteration
+ startReadTimer();
+ }
+ }
+
+ // return the minimum epoch in the this->view map
+ private Monitor::get_min_epoch()
+ {
+ Epoch min = null;
+ for (View v : views)
+ {
+ if (min == null ||
+ (Epoch.compare(v.getState().getEpochNum(), min) < 0 && !v.isExpired()))
+ {
+ min = v.getState().getEpochNum();
+ }
+ }
+ return min;
+ }
+
+/*****************************************
+ CLASSES NEEDED FOR THE ELECTION ALGORITHM
+******************************************/
+class Epoch {
+ public:
+ int p_id;
+ int s_num;
+
+ Epoch(int p_id, int s_num) {
+ this->p_id = p_id;
+ this->s_num = s_num;
+ }
+
+ bool isGreater(Epoch *e);
+ bool isLesser(Epoch *e);
+ bool isEqual(Epoch *e);
+
+};
+
+bool Epoch::isGreater(Epoch *e)
+{
+ if (this->s_num == e->s_num)
+ {
+ return (this->p_id > e->p_id);
+ }
+ else
+ {
+ return (this->s_num > e->s_num);
+ }
+}
+
+bool Epoch::isLesser(Epoch *e)
+{
+ if (this->s_num == e->s_num)
+ {
+ return (this->p_id < e->p_id);
+ }
+ else
+ {
+ return (this->s_num < e->s_num);
+ }
+}
+
+bool Epoch::isEqual(Epoch *e)
+{
+ return ((this->s_num == e->s_num) && (this->p_id > e->p_id));
+}
+
+class State {
+ public:
+ Epoch *e_num;
+ int freshness;
+
+ State (Epoch *e_num, int freshness)
+ {
+ this->e_num = e_num;
+ this->freshness = freshness;
+ }
+
+ bool isGreater(State *e);
+ bool isLesser(State *e);
+ bool isEqual(State *e);
+};
+
+bool State::isGreater(State *e)
+{
+ if (this->e_num->isEqual(e->e_num))
+ {
+ return (this->freshness > e->freshness);
+ }
+ else
+ {
+ return this->e_num->isGreater(e->e_num);
+ }
+}
+
+bool State::isLesser(State *e)
+{
+ if (this->e_num->isEqual(e->e_num))
+ {
+ return (this->freshness < e->freshness);
+ }
+ else
+ {
+ return this->e_num->isLesser(e->e_num);
+=======
lock.Lock();
{
dout(10) << "tick" << endl;
// next!
g_timer.add_event_after(g_conf.mon_tick_interval, new C_Mon_Tick(this));
+>>>>>>> 1.6
}
+<<<<<<< Monitor.cc
+=======
lock.Unlock();
+>>>>>>> 1.6
}
+
+bool State::isEqual(State *e)
+{
+ return (this->e_num->isEqual(e->e_num) && (this->freshness == e->freshness));
+}
+
+class View {
+ public:
+ State *p_state;
+ bool expired;
+ View (State* p_state, bool expired) {
+ this->p_state = p_state;
+ this->expired = expired;
+ }
+};
void bcast_latest_osd_map_mds();
void bcast_latest_osd_map_osd();
+ /*******************************************
+ * Variables used by the election algorithm *
+ *******************************************/
+ // used during refresh phase
+ int ack_msg_count;
+ int refresh_num;
+
+ // used during read phase
+ int read_num;
+ int status_msg_count;
+
+ // the leader process id
+ int leader_id;
+ // f-accessible
+ int f;
+
+ // the processes that compose the group
+// vector<int> processes;
+ // parameters for the process
+ int main_delta;
+ int trip_delta;
+
+ // state variables
+ hash_map<int, state> registry;
+ hash_map<int, view> views;
+ hash_map<int, view> old_views;
+ /************************************************
+ * END> Variables used by the election algorithm *
+ *************************************************/
public:
Monitor(int w, Messenger *m) :
void handle_osd_getmap(class MOSDGetMap *m);
void handle_ping_ack(class MPingAck *m);
+<<<<<<< Monitor.h
+
+ // handles for election messages
+ void handle_ack_msg(class MMonElectionAck);
+ void handle_collect_msg(class MMonElectionCollect);
+ void handle_refresh_msg(class MMonElectionRefresh);
+ void handle_status_msg(class MMoneElectionStatus);
+
+=======
void tick(); // periodic stuff. check state, take actions
+>>>>>>> 1.4
// hack
void fake_osd_failure(int osd, bool down);