map_ver[notify->machine_id] = notify->ver;
}
-static void send_observe_requests(bool);
-static bool is_timeout = false;
+static void send_observe_requests();
class C_ObserverRefresh : public Context {
public:
bool newmon;
C_ObserverRefresh(bool n) : newmon(n) {}
void finish(int r) {
- is_timeout = false;
- send_observe_requests(newmon);
+ send_observe_requests();
}
};
-static void send_observe_requests(bool newmon)
+static void send_observe_requests()
{
- dout(1) << "send_observe_requests " << newmon << dendl;
+ dout(1) << "send_observe_requests " << dendl;
- if (is_timeout)
- return;
-
- int mon = mc.monmap.pick_mon(newmon);
bool sent = false;
for (int i=0; i<PAXOS_NUM; i++) {
if (registered.count(i))
continue;
MMonObserve *m = new MMonObserve(mc.monmap.fsid, i, map_ver[i]);
- dout(1) << "mon" << mon << " <- observe " << get_paxos_name(i) << dendl;
- messenger->send_message(m, mc.monmap.get_inst(mon));
+ dout(1) << "mon" << " <- observe " << get_paxos_name(i) << dendl;
+ mc.send_mon_message(m);
sent = true;
}
dout(1) << " refresh after " << seconds << " with same mon" << dendl;
timer.add_event_after(seconds, new C_ObserverRefresh(false));
} else {
- is_timeout = true;
+ //is_timeout = true;
dout(1) << " refresh after " << retry_seconds << " with new mon" << dendl;
timer.add_event_after(retry_seconds, new C_ObserverRefresh(true));
}
void get_status(bool newmon)
{
- int mon = mc.monmap.pick_mon(newmon);
-
vector<string> vcmd(2);
vcmd[0] = prefix[which];
vcmd[1] = "stat";
MMonCommand *m = new MMonCommand(mc.monmap.fsid, last_seen_version);
m->cmd.swap(vcmd);
- messenger->send_message(m, mc.monmap.get_inst(mon));
+ mc.send_mon_message(m);
event = new C_Refresh;
timer.add_event_after(.2, event);
}
}
+void send_command()
+{
+ MMonCommand *m = new MMonCommand(mc.monmap.fsid, last_seen_version);
+ m->cmd = pending_cmd;
+ m->get_data() = pending_bl;
+
+ generic_dout(0) << "mon" << " <- " << pending_cmd << dendl;
+ mc.send_mon_message(m);
+}
+
+
class Admin : public Dispatcher {
bool ms_dispatch(Message *m) {
switch (m->get_type()) {
return true;
}
- void ms_handle_reset(const entity_addr_t& peer, entity_name_t last) {
- if (observe) {
- lock.Lock();
- send_observe_requests(true);
- lock.Unlock();
- }
- }
-} dispatcher;
-
-void send_command();
+ void ms_handle_failure(Message *m, const entity_addr_t& addr) {}
-struct C_Resend : public Context {
- void finish(int) {
- mc.monmap.pick_mon(true); // pick a new mon
- if (!reply)
+ bool ms_handle_reset(const entity_addr_t& peer) {
+ lock.Lock();
+ if (observe)
+ send_observe_requests();
+ if (pending_cmd.size())
send_command();
+ lock.Unlock();
+ return true;
}
-};
-void send_command()
-{
- MMonCommand *m = new MMonCommand(mc.monmap.fsid, last_seen_version);
- m->cmd = pending_cmd;
- m->get_data() = pending_bl;
- int mon = mc.monmap.pick_mon();
- generic_dout(0) << "mon" << mon << " <- " << pending_cmd << dendl;
- messenger->send_message(m, mc.monmap.get_inst(mon));
+ void ms_handle_remote_reset(const entity_addr_t& peer) {}
- resend_event = new C_Resend;
- timer.add_event_after(15.0, resend_event);
-}
+} dispatcher;
int do_command(vector<string>& cmd, bufferlist& bl, string& rs, bufferlist& rbl)
{
SimpleMessenger rank;
rank.bind();
messenger = rank.register_entity(entity_name_t::ADMIN());
- messenger->set_dispatcher(&dispatcher);
+ messenger->add_dispatcher_head(&dispatcher);
rank.start();
rank.set_default_policy(SimpleMessenger::Policy::lossy_fail_after(1.0));
mc.set_messenger(messenger);
- dispatcher.link_dispatcher(&mc);
+ mc.init();
+
if (mc.get_monmap() < 0)
return -1;
}
if (observe) {
lock.Lock();
- send_observe_requests(true);
+ send_observe_requests();
lock.Unlock();
}
if (!watch && !observe) {
if (osdmap) { delete osdmap; osdmap = 0; }
if (mdsmap) { delete mdsmap; mdsmap = 0; }
- unlink_dispatcher(monclient);
-
if (messenger)
messenger->destroy();
}
Mutex::Locker lock(client_lock);
// ok!
- messenger->set_dispatcher(this);
- link_dispatcher(monclient);
+ messenger->add_dispatcher_head(this);
monclient->init();
// ===============================
-void Client::ms_handle_failure(Message *m, const entity_inst_t& inst)
+void Client::ms_handle_failure(Message *m, const entity_addr_t& addr)
{
- entity_name_t dest = inst.name;
- dout(0) << "ms_handle_failure " << *m << " to " << inst << dendl;
+ dout(0) << "ms_handle_failure " << *m << " to " << addr << dendl;
}
-void Client::ms_handle_reset(const entity_addr_t& addr, entity_name_t last)
+bool Client::ms_handle_reset(const entity_addr_t& addr)
{
dout(0) << "ms_handle_reset on " << addr << dendl;
+ return false;
}
-void Client::ms_handle_remote_reset(const entity_addr_t& addr, entity_name_t last)
+void Client::ms_handle_remote_reset(const entity_addr_t& addr)
{
- dout(0) << "ms_handle_remote_reset on " << addr << ", last " << last << dendl;
+ dout(0) << "ms_handle_remote_reset on " << addr << dendl;
+#if 0
if (last.is_mds()) {
int mds = last.num();
dout(0) << "ms_handle_remote_reset on " << last << ", " << mds_sessions[mds].num_caps
}
else
objecter->ms_handle_remote_reset(addr, last);
-
+#endif
}
friend class SyntheticClient;
bool ms_dispatch(Message *m);
+ bool ms_handle_reset(const entity_addr_t& peer);
+ void ms_handle_failure(Message *m, const entity_addr_t& peer);
+ void ms_handle_remote_reset(const entity_addr_t& peer);
+
+
public:
Client(Messenger *m, MonClient *mc);
~Client();
int ll_fsync(Fh *fh, bool syncdataonly);
int ll_release(Fh *fh);
int ll_statfs(vinodeno_t vino, struct statvfs *stbuf);
-
- // failure
- void ms_handle_failure(Message*, const entity_inst_t& inst);
- void ms_handle_reset(const entity_addr_t& addr, entity_name_t last);
- void ms_handle_remote_reset(const entity_addr_t& addr, entity_name_t last);
};
#endif
bool is_synchronous;
void _send_log();
+ bool ms_handle_reset(const entity_addr_t& peer) { return false; }
+ void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
+ void ms_handle_remote_reset(const entity_addr_t& peer) {}
+
+
public:
// -- log --
MonClient mc;
if (mc.build_initial_monmap() < 0)
return -1;
- if (mc.get_monmap() < 0)
+ if (mc.get_monmap_privately() < 0)
return -1;
if (mkfs) {
}
return true;
}
+ bool ms_handle_reset(const entity_addr_t& peer) { return false; }
+ void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
+ void ms_handle_remote_reset(const entity_addr_t& peer) {}
+
} dispatcher;
g_conf.daemonize = false; // not us!
rank.start();
messenger = rank.register_entity(entity_name_t::ADMIN());
- messenger->set_dispatcher(&dispatcher);
+ messenger->add_dispatcher_head(&dispatcher);
inodeno_t ino = MDS_INO_LOG_OFFSET + mds;
unsigned pg_pool = CEPH_METADATA_RULE;
bool _dispatch(Message *m);
bool ms_dispatch(Message *m);
+ bool ms_handle_reset(const entity_addr_t& peer) { return false; }
+ void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
+ void ms_handle_remote_reset(const entity_addr_t& peer) {}
+
Objecter *objecter;
Mutex lock;
rank.start(1);
- monclient.link_dispatcher(this);
-
objecter = new Objecter(messenger, &monclient, &osdmap, lock);
if (!objecter)
return false;
int MDS::init()
{
- messenger->set_dispatcher(this);
+ messenger->add_dispatcher_tail(this);
+ messenger->add_dispatcher_head(&logclient);
// get monmap
monc->set_messenger(messenger);
- link_dispatcher(monc);
monc->init();
monc->get_monmap();
// schedule tick
reset_tick();
- // i'm ready!
- link_dispatcher(&logclient);
-
mds_lock.Unlock();
return 0;
}
objecter->shutdown();
// shut down messenger
- unlink_dispatcher(&logclient);
messenger->shutdown();
monc->shutdown();
-void MDS::ms_handle_failure(Message *m, const entity_inst_t& inst)
+void MDS::ms_handle_failure(Message *m, const entity_addr_t& addr)
{
mds_lock.Lock();
- dout(0) << "ms_handle_failure to " << inst << " on " << *m << dendl;
+ dout(0) << "ms_handle_failure to " << addr << " on " << *m << dendl;
mds_lock.Unlock();
}
-void MDS::ms_handle_reset(const entity_addr_t& addr, entity_name_t last)
+bool MDS::ms_handle_reset(const entity_addr_t& addr)
{
dout(0) << "ms_handle_reset on " << addr << dendl;
+ return false;
}
-void MDS::ms_handle_remote_reset(const entity_addr_t& addr, entity_name_t last)
+void MDS::ms_handle_remote_reset(const entity_addr_t& addr)
{
dout(0) << "ms_handle_remote_reset on " << addr << dendl;
- objecter->ms_handle_remote_reset(addr, last);
+ objecter->ms_handle_remote_reset(addr);
}
int get_req_rate() { return req_rate; }
private:
- virtual bool ms_dispatch(Message *m);
+ bool ms_dispatch(Message *m);
+
public:
MDS(const char *n, Messenger *m, MonClient *mc);
~MDS();
// messages
bool _dispatch(Message *m);
- void ms_handle_failure(Message *m, const entity_inst_t& inst);
- void ms_handle_reset(const entity_addr_t& addr, entity_name_t last);
- void ms_handle_remote_reset(const entity_addr_t& addr, entity_name_t last);
+ bool ms_handle_reset(const entity_addr_t& peer);
+ void ms_handle_failure(Message *m, const entity_addr_t& peer);
+ void ms_handle_remote_reset(const entity_addr_t& peer);
// special message types
void handle_mds_map(class MMDSMap *m);
return -ENOENT;
}
+
int MonClient::get_monmap()
{
dout(10) << "get_monmap" << dendl;
Mutex::Locker l(monc_lock);
+ want_monmap = true;
+ _reopen_session();
+
+ while (want_monmap)
+ map_cond.Wait(monc_lock);
+
+ dout(10) << "get_monmap done" << dendl;
+
+ return 0;
+}
+
+int MonClient::get_monmap_privately()
+{
+ dout(10) << "get_monmap_privately" << dendl;
+ Mutex::Locker l(monc_lock);
+
SimpleMessenger *rank = NULL;
bool temp_msgr = false;
if (!messenger) {
rank->bind();
rank->set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fast_fail());
messenger = rank->register_entity(entity_name_t::CLIENT(-1));
- messenger->set_dispatcher(this);
+ messenger->add_dispatcher_head(this);
rank->start(true); // do not daemonize!
temp_msgr = true;
}
int attempt = 10;
int i = 0;
-
srand(getpid());
+
dout(10) << "have " << monmap.epoch << dendl;
-
+
while (monmap.epoch == 0) {
i = rand() % monmap.mon_inst.size();
dout(10) << "querying " << monmap.mon_inst[i] << dendl;
messenger->destroy();
messenger = 0;
}
-
+
if (monmap.epoch)
return 0;
return -1;
return false;
}
+void MonClient::_finish_hunting()
+{
+ if (hunting) {
+ dout(0) << "found new mon" << cur_mon << dendl;
+ hunting = false;
+ }
+}
+
void MonClient::handle_monmap(MMonMap *m)
{
dout(10) << "handle_monmap " << *m << dendl;
monc_lock.Lock();
+ _finish_hunting();
+
bufferlist::iterator p = m->monmapbl.begin();
::decode(monmap, p);
+
map_cond.Signal();
+ want_monmap = false;
+
monc_lock.Unlock();
delete m;
}
void MonClient::init()
{
dout(10) << "init" << dendl;
- messenger->set_dispatcher(this);
+ messenger->add_dispatcher_head(this);
Mutex::Locker l(monc_lock);
timer.add_event_after(10.0, new C_Tick(this));
{
dout(10) << "handle_mount_ack " << *m << dendl;
- hunting = false;
+ _finish_hunting();
// monmap
bufferlist::iterator p = m->monmap_bl.begin();
void MonClient::_send_mon_message(Message *m)
{
- int mon = monmap.pick_mon();
- messenger->send_message(m, monmap.mon_inst[mon]);
+ messenger->send_message(m, monmap.mon_inst[cur_mon]);
}
void MonClient::_pick_new_mon()
{
- int oldmon = monmap.pick_mon();
- messenger->mark_down(monmap.get_inst(oldmon).addr);
- monmap.pick_mon(true);
+ if (cur_mon >= 0)
+ messenger->mark_down(monmap.get_inst(cur_mon).addr);
+ cur_mon = monmap.pick_mon(true);
+ dout(10) << "_pick_new_mon picked mon" << cur_mon << dendl;
}
-void MonClient::ms_handle_reset(const entity_addr_t& peer)
+void MonClient::_reopen_session()
{
- dout(10) << "ms_handle_reset " << peer << dendl;
- if (!hunting) {
- dout(0) << "staring hunt for new mon" << dendl;
- hunting = true;
- _pick_new_mon();
- if (mounting)
- _send_mount();
+ dout(10) << "_reopen_session" << dendl;
+ _pick_new_mon();
+ if (mounting)
+ _send_mount();
+ if (!sub_have.empty())
_renew_subs();
+ if (!mounting && sub_have.empty()) {
+ _send_mon_message(new MMonGetMap);
}
}
+bool MonClient::ms_handle_reset(const entity_addr_t& peer)
+{
+ dout(10) << "ms_handle_reset " << peer << dendl;
+ if (hunting)
+ return true;
+
+ dout(0) << "starting hunt for new mon" << dendl;
+ hunting = true;
+ _reopen_session();
+ return false;
+}
+
void MonClient::tick()
{
dout(10) << "tick" << dendl;
if (hunting) {
dout(0) << "continuing hunt" << dendl;
- // try new monitor
- _pick_new_mon();
- if (mounting)
- _send_mount();
- _renew_subs();
+ _reopen_session();
} else {
// just renew as needed
utime_t now = g_clock.now();
}
timer.add_event_after(10.0, new C_Tick(this));
- dout(10) << "tick done" << dendl;
-
}
void MonClient::handle_subscribe_ack(MMonSubscribeAck *m)
{
- hunting = false;
+ _finish_hunting();
if (sub_renew_sent != utime_t()) {
sub_renew_after = sub_renew_sent;
private:
Messenger *messenger;
+ int cur_mon;
+
entity_addr_t my_addr;
Mutex monc_lock;
SafeTimer timer;
bool ms_dispatch(Message *m);
- void handle_monmap(MMonMap *m);
+ bool ms_handle_reset(const entity_addr_t& peer);
- void ms_handle_reset(const entity_addr_t& peer);
+ void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
+ void ms_handle_remote_reset(const entity_addr_t& peer) {}
+
+ void handle_monmap(MMonMap *m);
// monitor session
};
void tick();
+ // monclient
+ bool want_monmap;
+
// mount
private:
client_t clientid;
Cond mount_cond, map_cond;
utime_t mount_started;
+ void _finish_hunting();
+ void _reopen_session();
void _pick_new_mon();
void _send_mon_message(Message *m);
void _send_mount();
}
public:
- MonClient() : messenger(NULL),
+ MonClient() : messenger(NULL), cur_mon(-1),
monc_lock("MonClient::monc_lock"),
timer(monc_lock),
hunting(false),
int build_initial_monmap();
int get_monmap();
+ int get_monmap_privately();
void send_mon_message(Message *m) {
Mutex::Locker l(monc_lock);
logclient.set_synchronous(true);
// i'm ready!
- messenger->set_dispatcher(this);
- link_dispatcher(&logclient);
+ messenger->add_dispatcher_tail(this);
+ messenger->add_dispatcher_head(&logclient);
// start ticker
reset_tick();
timer.cancel_all();
timer.join();
- unlink_dispatcher(&logclient);
-
// die.
messenger->shutdown();
}
private:
bool ms_dispatch(Message *m);
+ bool ms_handle_reset(const entity_addr_t& peer) { return false; }
+ void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
+ void ms_handle_remote_reset(const entity_addr_t& peer) {}
public:
Monitor(int w, MonitorStore *s, Messenger *m, MonMap *map);
class Messenger;
class Dispatcher {
- Dispatcher *next;
-
- // how i receive messages
- virtual bool ms_dispatch(Message *m) = 0;
- bool _ms_deliver_dispatch(Message *m) {
- bool ret = false;
- if (next)
- ret = next->_ms_deliver_dispatch(m);
- if (!ret)
- ret = ms_dispatch(m);
- return ret;
- }
-public:
- void ms_deliver_dispatch(Message *m) {
- if (!_ms_deliver_dispatch(m)) {
- generic_dout(0) << "unhandled message " << m << " " << *m
- << " from " << m->get_orig_source_inst()
- << dendl;
- assert(0);
- }
- }
-
- void ms_deliver_handle_reset(const entity_addr_t& peer) {
- ms_handle_reset(peer);
- if (next)
- next->ms_handle_reset(peer);
- }
- void ms_deliver_handle_remote_reset(const entity_addr_t& peer) {
- ms_handle_remote_reset(peer);
- if (next)
- next->ms_handle_remote_reset(peer);
- }
- void ms_deliver_handle_failure(Message *m, const entity_addr_t& peer) {
- ms_handle_failure(m, peer);
- if (next)
- next->ms_handle_failure(m, peer);
- }
-
-
public:
virtual ~Dispatcher() { }
- Dispatcher() : next(NULL) { }
+ Dispatcher() { }
- virtual void link_dispatcher(Dispatcher *disp) {
- if (!next) {
- next = disp;
- } else {
- next->link_dispatcher(disp);
- }
- }
- virtual void unlink_dispatcher(Dispatcher *disp) {
- assert(next);
- if (next == disp)
- next = next->next;
- else
- next->unlink_dispatcher(disp);
- }
+ // how i receive messages
+ virtual bool ms_dispatch(Message *m) = 0;
// how i deal with transmission failures.
- virtual void ms_handle_failure(Message *m, const entity_addr_t& addr) { }
+ virtual void ms_handle_failure(Message *m, const entity_addr_t& addr) = 0;
/*
* on any connection reset.
* this indicates that the ordered+reliable delivery semantics have
* been violated. messages may have been lost.
*/
- virtual void ms_handle_reset(const entity_addr_t& peer) { }
+ virtual bool ms_handle_reset(const entity_addr_t& peer) = 0;
// on deliberate reset of connection by remote
// implies incoming messages dropped; possibly/probably some of our previous outgoing too.
- virtual void ms_handle_remote_reset(const entity_addr_t& peer) { }
+ virtual void ms_handle_remote_reset(const entity_addr_t& peer) = 0;
};
#endif
class Messenger {
private:
- Dispatcher *dispatcher;
+ list<Dispatcher*> dispatchers;
protected:
entity_name_t _my_name;
atomic_t nref;
public:
- Messenger(entity_name_t w) : dispatcher(0),
- default_send_priority(CEPH_MSG_PRIO_DEFAULT),
+ Messenger(entity_name_t w) : default_send_priority(CEPH_MSG_PRIO_DEFAULT),
nref(1) {
_my_name = w;
}
virtual int get_dispatch_queue_len() { return 0; };
// setup
- void set_dispatcher(Dispatcher *d) {
- if (!dispatcher) {
- dispatcher = d;
- ready();
- }
+ void add_dispatcher_head(Dispatcher *d) {
+ bool first = dispatchers.empty();
+ dispatchers.push_front(d);
+ if (first)
+ ready();
}
- Dispatcher *get_dispatcher() { return dispatcher; }
+ void add_dispatcher_tail(Dispatcher *d) {
+ bool first = dispatchers.empty();
+ dispatchers.push_back(d);
+ if (first)
+ ready();
+ }
+
virtual void ready() { }
- bool is_ready() { return dispatcher != 0; }
+ bool is_ready() { return !dispatchers.empty(); }
// dispatch incoming messages
- virtual void dispatch(Message *m) {
- assert(dispatcher);
- dispatcher->ms_deliver_dispatch(m);
+ void ms_deliver_dispatch(Message *m) {
+ for (list<Dispatcher*>::iterator p = dispatchers.begin();
+ p != dispatchers.end();
+ p++)
+ if ((*p)->ms_dispatch(m))
+ return;
+ generic_dout(0) << "unhandled message " << m << " " << *m
+ << " from " << m->get_orig_source_inst()
+ << dendl;
+ assert(0);
+ }
+ void ms_deliver_handle_reset(const entity_addr_t& peer) {
+ for (list<Dispatcher*>::iterator p = dispatchers.begin();
+ p != dispatchers.end();
+ p++)
+ if ((*p)->ms_handle_reset(peer))
+ return;
+ }
+ void ms_deliver_handle_remote_reset(const entity_addr_t& peer) {
+ for (list<Dispatcher*>::iterator p = dispatchers.begin();
+ p != dispatchers.end();
+ p++)
+ (*p)->ms_handle_remote_reset(peer);
+ }
+ void ms_deliver_handle_failure(Message *m, const entity_addr_t& peer) {
+ for (list<Dispatcher*>::iterator p = dispatchers.begin();
+ p != dispatchers.end();
+ p++)
+ (*p)->ms_handle_failure(m, peer);
}
// shutdown
entity_addr_t a = remote_reset_q.front();
remote_reset_q.pop_front();
lock.Unlock();
- get_dispatcher()->ms_deliver_handle_remote_reset(a);
+ ms_deliver_handle_remote_reset(a);
} else if ((long)m == BAD_RESET) {
lock.Lock();
entity_addr_t a = reset_q.front();
reset_q.pop_front();
lock.Unlock();
- get_dispatcher()->ms_deliver_handle_reset(a);
+ ms_deliver_handle_reset(a);
} else if ((long)m == BAD_FAILED) {
lock.Lock();
m = failed_q.front().first;
entity_addr_t a = failed_q.front().second;
failed_q.pop_front();
lock.Unlock();
- get_dispatcher()->ms_deliver_handle_failure(m, a);
+ ms_deliver_handle_failure(m, a);
m->put();
} else {
dout(1) << "<== " << m->get_source_inst()
<< " " << m->get_footer().data_crc << ")"
<< " " << m
<< dendl;
- dispatch(m);
+ ms_deliver_dispatch(m);
dout(20) << "done calling dispatch on " << m << dendl;
}
}
report_failures();
for (unsigned i=0; i<rank->local.size(); i++)
- if (rank->local[i] && rank->local[i]->get_dispatcher())
+ if (rank->local[i])
rank->local[i]->queue_reset(peer_addr);
// unregister
dout(10) << "was_session_reset" << dendl;
report_failures();
for (unsigned i=0; i<rank->local.size(); i++)
- if (rank->local[i] && rank->local[i]->get_dispatcher())
+ if (rank->local[i])
rank->local[i]->queue_remote_reset(peer_addr);
out_seq = 0;
// i'm ready!
- messenger->set_dispatcher(this);
- link_dispatcher(&logclient);
- heartbeat_messenger->set_dispatcher(&heartbeat_dispatcher);
+ messenger->add_dispatcher_head(this);
+ messenger->add_dispatcher_head(&logclient);
+
+ heartbeat_messenger->add_dispatcher_head(&heartbeat_dispatcher);
monc->init();
}
pg_map.clear();
- unlink_dispatcher(&logclient);
messenger->shutdown();
if (heartbeat_messenger)
heartbeat_messenger->shutdown();
}
-void OSD::ms_handle_failure(Message *m, const entity_inst_t& inst)
-{
- entity_name_t dest = inst.name;
-
- dout(1) << "ms_handle_failure " << inst << " on " << *m << dendl;
- if (g_conf.ms_die_on_failure)
- assert(0);
-}
-
-
-
-
void OSD::handle_scrub(MOSDScrub *m)
{
dout(10) << "handle_scrub " << *m << dendl;
bool ms_dispatch(Message *m) {
return osd->heartbeat_dispatch(m);
};
+ bool ms_handle_reset(const entity_addr_t& peer) { return false; }
+ void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
+ void ms_handle_remote_reset(const entity_addr_t& peer) {}
public:
OSD *osd;
HeartbeatDispatcher(OSD *o) : osd(o) {}
} remove_wq;
private:
- virtual bool ms_dispatch(Message *m);
+ bool ms_dispatch(Message *m);
+ bool ms_handle_reset(const entity_addr_t& peer) { return false; }
+ void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
+ void ms_handle_remote_reset(const entity_addr_t& peer) {}
+
public:
OSD(int id, Messenger *m, Messenger *hbm, MonClient *mc, const char *dev = 0, const char *jdev = 0);
~OSD();
int init();
int shutdown();
- // messages
- virtual void ms_handle_failure(Message *m, const entity_inst_t& inst);
-
void reply_op_error(MOSDOp *op, int r);
void handle_scrub(class MOSDScrub *m);
bool is_out(int osd) { return !exists(osd) || get_weight(osd) == CEPH_OSD_OUT; }
bool is_in(int osd) { return exists(osd) && !is_out(osd); }
+ bool have_addr(const entity_addr_t& addr) const {
+ for (vector<entity_addr_t>::const_iterator p = osd_addr.begin();
+ p != osd_addr.end();
+ p++)
+ if (*p == addr)
+ return true;
+ return false;
+ }
bool have_inst(int osd) {
return exists(osd) && is_up(osd);
}
}
-void Objecter::ms_handle_remote_reset(const entity_addr_t& addr, entity_name_t dest)
+void Objecter::ms_handle_remote_reset(const entity_addr_t& addr)
{
- entity_inst_t inst;
- inst.name = dest;
- inst.addr = addr;
-
- if (dest.is_osd()) {
- if (!osdmap->have_inst(dest.num()) ||
- (osdmap->get_inst(dest.num()) != inst)) {
- dout(0) << "ms_handle_remote_reset " << dest << " inst " << inst
- << ", ignoring, already have newer osdmap" << dendl;
- } else {
- // kick requests
- set<pg_t> changed_pgs;
- dout(0) << "ms_handle_remote_reset " << dest << dendl;
- scan_pgs_for(changed_pgs, dest.num());
- if (!changed_pgs.empty()) {
- dout(0) << "ms_handle_remote_reset " << dest << " kicking " << changed_pgs << dendl;
- kick_requests(changed_pgs);
- }
- }
- }
+ if (osdmap->have_addr(addr))
+ maybe_request_map();
}
}
}
- void ms_handle_remote_reset(const entity_addr_t& addr, entity_name_t dest);
+ void ms_handle_remote_reset(const entity_addr_t& addr);
};
return true;
}
- void ms_handle_failure(Message *m, const entity_inst_t& inst) {
- }
+ bool ms_handle_reset(const entity_addr_t& peer) { return false; }
+ void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
+ void ms_handle_remote_reset(const entity_addr_t& peer) {}
} dispatcher;
// start monitor
messenger = rank.register_entity(entity_name_t::MON(whoami));
messenger->set_default_send_priority(CEPH_MSG_PRIO_HIGH);
- messenger->set_dispatcher(&dispatcher);
+ messenger->add_dispatcher_head(&dispatcher);
rank.start();