From: Sage Weil Date: Tue, 15 Sep 2009 22:02:17 +0000 (-0700) Subject: msgr: change dispatcher interfaces; some monclient cleanups X-Git-Tag: v0.15~63 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=decde380575059a7724d0ce5d013cff500201cd3;p=ceph.git msgr: change dispatcher interfaces; some monclient cleanups --- diff --git a/src/ceph.cc b/src/ceph.cc index f336a2a16a0c..f66f8871b2dc 100644 --- a/src/ceph.cc +++ b/src/ceph.cc @@ -204,34 +204,28 @@ void handle_notify(MMonObserveNotify *notify) map_ver[notify->machine_id] = notify->ver; } -static void send_observe_requests(bool); -static bool is_timeout = false; +static void send_observe_requests(); class C_ObserverRefresh : public Context { public: bool newmon; C_ObserverRefresh(bool n) : newmon(n) {} void finish(int r) { - is_timeout = false; - send_observe_requests(newmon); + send_observe_requests(); } }; -static void send_observe_requests(bool newmon) +static void send_observe_requests() { - dout(1) << "send_observe_requests " << newmon << dendl; + dout(1) << "send_observe_requests " << dendl; - if (is_timeout) - return; - - int mon = mc.monmap.pick_mon(newmon); bool sent = false; for (int i=0; isend_message(m, mc.monmap.get_inst(mon)); + dout(1) << "mon" << " <- observe " << get_paxos_name(i) << dendl; + mc.send_mon_message(m); sent = true; } @@ -243,7 +237,7 @@ static void send_observe_requests(bool newmon) dout(1) << " refresh after " << seconds << " with same mon" << dendl; timer.add_event_after(seconds, new C_ObserverRefresh(false)); } else { - is_timeout = true; + //is_timeout = true; dout(1) << " refresh after " << retry_seconds << " with new mon" << dendl; timer.add_event_after(retry_seconds, new C_ObserverRefresh(true)); } @@ -274,15 +268,13 @@ Context *event = 0; void get_status(bool newmon) { - int mon = mc.monmap.pick_mon(newmon); - vector vcmd(2); vcmd[0] = prefix[which]; vcmd[1] = "stat"; MMonCommand *m = new MMonCommand(mc.monmap.fsid, last_seen_version); m->cmd.swap(vcmd); - messenger->send_message(m, mc.monmap.get_inst(mon)); + mc.send_mon_message(m); event = new C_Refresh; timer.add_event_after(.2, event); @@ -335,6 +327,17 @@ void handle_ack(MMonCommandAck *ack) } } +void send_command() +{ + MMonCommand *m = new MMonCommand(mc.monmap.fsid, last_seen_version); + m->cmd = pending_cmd; + m->get_data() = pending_bl; + + generic_dout(0) << "mon" << " <- " << pending_cmd << dendl; + mc.send_mon_message(m); +} + + class Admin : public Dispatcher { bool ms_dispatch(Message *m) { switch (m->get_type()) { @@ -356,37 +359,21 @@ class Admin : public Dispatcher { return true; } - void ms_handle_reset(const entity_addr_t& peer, entity_name_t last) { - if (observe) { - lock.Lock(); - send_observe_requests(true); - lock.Unlock(); - } - } -} dispatcher; - -void send_command(); + void ms_handle_failure(Message *m, const entity_addr_t& addr) {} -struct C_Resend : public Context { - void finish(int) { - mc.monmap.pick_mon(true); // pick a new mon - if (!reply) + bool ms_handle_reset(const entity_addr_t& peer) { + lock.Lock(); + if (observe) + send_observe_requests(); + if (pending_cmd.size()) send_command(); + lock.Unlock(); + return true; } -}; -void send_command() -{ - MMonCommand *m = new MMonCommand(mc.monmap.fsid, last_seen_version); - m->cmd = pending_cmd; - m->get_data() = pending_bl; - int mon = mc.monmap.pick_mon(); - generic_dout(0) << "mon" << mon << " <- " << pending_cmd << dendl; - messenger->send_message(m, mc.monmap.get_inst(mon)); + void ms_handle_remote_reset(const entity_addr_t& peer) {} - resend_event = new C_Resend; - timer.add_event_after(15.0, resend_event); -} +} dispatcher; int do_command(vector& cmd, bufferlist& bl, string& rs, bufferlist& rbl) { @@ -619,13 +606,14 @@ int main(int argc, const char **argv, const char *envp[]) SimpleMessenger rank; rank.bind(); messenger = rank.register_entity(entity_name_t::ADMIN()); - messenger->set_dispatcher(&dispatcher); + messenger->add_dispatcher_head(&dispatcher); rank.start(); rank.set_default_policy(SimpleMessenger::Policy::lossy_fail_after(1.0)); mc.set_messenger(messenger); - dispatcher.link_dispatcher(&mc); + mc.init(); + if (mc.get_monmap() < 0) return -1; @@ -636,7 +624,7 @@ int main(int argc, const char **argv, const char *envp[]) } if (observe) { lock.Lock(); - send_observe_requests(true); + send_observe_requests(); lock.Unlock(); } if (!watch && !observe) { diff --git a/src/client/Client.cc b/src/client/Client.cc index bdde2cbefe65..a6205e36fb6d 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -169,8 +169,6 @@ Client::~Client() if (osdmap) { delete osdmap; osdmap = 0; } if (mdsmap) { delete mdsmap; mdsmap = 0; } - unlink_dispatcher(monclient); - if (messenger) messenger->destroy(); } @@ -256,8 +254,7 @@ void Client::init() Mutex::Locker lock(client_lock); // ok! - messenger->set_dispatcher(this); - link_dispatcher(monclient); + messenger->add_dispatcher_head(this); monclient->init(); @@ -5570,21 +5567,22 @@ int Client::enumerate_layout(int fd, vector& result, // =============================== -void Client::ms_handle_failure(Message *m, const entity_inst_t& inst) +void Client::ms_handle_failure(Message *m, const entity_addr_t& addr) { - entity_name_t dest = inst.name; - dout(0) << "ms_handle_failure " << *m << " to " << inst << dendl; + dout(0) << "ms_handle_failure " << *m << " to " << addr << dendl; } -void Client::ms_handle_reset(const entity_addr_t& addr, entity_name_t last) +bool Client::ms_handle_reset(const entity_addr_t& addr) { dout(0) << "ms_handle_reset on " << addr << dendl; + return false; } -void Client::ms_handle_remote_reset(const entity_addr_t& addr, entity_name_t last) +void Client::ms_handle_remote_reset(const entity_addr_t& addr) { - dout(0) << "ms_handle_remote_reset on " << addr << ", last " << last << dendl; + dout(0) << "ms_handle_remote_reset on " << addr << dendl; +#if 0 if (last.is_mds()) { int mds = last.num(); dout(0) << "ms_handle_remote_reset on " << last << ", " << mds_sessions[mds].num_caps @@ -5610,5 +5608,5 @@ void Client::ms_handle_remote_reset(const entity_addr_t& addr, entity_name_t las } else objecter->ms_handle_remote_reset(addr, last); - +#endif } diff --git a/src/client/Client.h b/src/client/Client.h index 9119c489531a..17cb7aceeca4 100644 --- a/src/client/Client.h +++ b/src/client/Client.h @@ -976,6 +976,11 @@ protected: friend class SyntheticClient; bool ms_dispatch(Message *m); + bool ms_handle_reset(const entity_addr_t& peer); + void ms_handle_failure(Message *m, const entity_addr_t& peer); + void ms_handle_remote_reset(const entity_addr_t& peer); + + public: Client(Messenger *m, MonClient *mc); ~Client(); @@ -1225,11 +1230,6 @@ public: int ll_fsync(Fh *fh, bool syncdataonly); int ll_release(Fh *fh); int ll_statfs(vinodeno_t vino, struct statvfs *stbuf); - - // failure - void ms_handle_failure(Message*, const entity_inst_t& inst); - void ms_handle_reset(const entity_addr_t& addr, entity_name_t last); - void ms_handle_remote_reset(const entity_addr_t& addr, entity_name_t last); }; #endif diff --git a/src/common/LogClient.h b/src/common/LogClient.h index 4ecf66d82a1c..7eb5402721a8 100644 --- a/src/common/LogClient.h +++ b/src/common/LogClient.h @@ -36,6 +36,11 @@ class LogClient : public Dispatcher { bool is_synchronous; void _send_log(); + bool ms_handle_reset(const entity_addr_t& peer) { return false; } + void ms_handle_failure(Message *m, const entity_addr_t& peer) { } + void ms_handle_remote_reset(const entity_addr_t& peer) {} + + public: // -- log -- diff --git a/src/cosd.cc b/src/cosd.cc index f67c54ca024e..7f9aa23b6ef9 100644 --- a/src/cosd.cc +++ b/src/cosd.cc @@ -83,7 +83,7 @@ int main(int argc, const char **argv) MonClient mc; if (mc.build_initial_monmap() < 0) return -1; - if (mc.get_monmap() < 0) + if (mc.get_monmap_privately() < 0) return -1; if (mkfs) { diff --git a/src/dumpjournal.cc b/src/dumpjournal.cc index c6d9895bd1c3..2c0494cf90b4 100644 --- a/src/dumpjournal.cc +++ b/src/dumpjournal.cc @@ -62,6 +62,10 @@ class Dumper : public Dispatcher { } return true; } + bool ms_handle_reset(const entity_addr_t& peer) { return false; } + void ms_handle_failure(Message *m, const entity_addr_t& peer) { } + void ms_handle_remote_reset(const entity_addr_t& peer) {} + } dispatcher; @@ -92,7 +96,7 @@ int main(int argc, const char **argv, const char *envp[]) g_conf.daemonize = false; // not us! rank.start(); messenger = rank.register_entity(entity_name_t::ADMIN()); - messenger->set_dispatcher(&dispatcher); + messenger->add_dispatcher_head(&dispatcher); inodeno_t ino = MDS_INO_LOG_OFFSET + mds; unsigned pg_pool = CEPH_METADATA_RULE; diff --git a/src/librados.cc b/src/librados.cc index 56334ab1673a..f4497e2ef9e7 100644 --- a/src/librados.cc +++ b/src/librados.cc @@ -59,6 +59,10 @@ class RadosClient : public Dispatcher bool _dispatch(Message *m); bool ms_dispatch(Message *m); + bool ms_handle_reset(const entity_addr_t& peer) { return false; } + void ms_handle_failure(Message *m, const entity_addr_t& peer) { } + void ms_handle_remote_reset(const entity_addr_t& peer) {} + Objecter *objecter; Mutex lock; @@ -294,8 +298,6 @@ bool RadosClient::init() rank.start(1); - monclient.link_dispatcher(this); - objecter = new Objecter(messenger, &monclient, &osdmap, lock); if (!objecter) return false; diff --git a/src/mds/MDS.cc b/src/mds/MDS.cc index 97953a346693..20d9d3ce45f4 100644 --- a/src/mds/MDS.cc +++ b/src/mds/MDS.cc @@ -373,11 +373,11 @@ void MDS::send_message_client(Message *m, entity_inst_t clientinst) int MDS::init() { - messenger->set_dispatcher(this); + messenger->add_dispatcher_tail(this); + messenger->add_dispatcher_head(&logclient); // get monmap monc->set_messenger(messenger); - link_dispatcher(monc); monc->init(); monc->get_monmap(); @@ -397,9 +397,6 @@ int MDS::init() // schedule tick reset_tick(); - // i'm ready! - link_dispatcher(&logclient); - mds_lock.Unlock(); return 0; } @@ -1143,7 +1140,6 @@ void MDS::suicide() objecter->shutdown(); // shut down messenger - unlink_dispatcher(&logclient); messenger->shutdown(); monc->shutdown(); @@ -1399,21 +1395,22 @@ bool MDS::_dispatch(Message *m) -void MDS::ms_handle_failure(Message *m, const entity_inst_t& inst) +void MDS::ms_handle_failure(Message *m, const entity_addr_t& addr) { mds_lock.Lock(); - dout(0) << "ms_handle_failure to " << inst << " on " << *m << dendl; + dout(0) << "ms_handle_failure to " << addr << " on " << *m << dendl; mds_lock.Unlock(); } -void MDS::ms_handle_reset(const entity_addr_t& addr, entity_name_t last) +bool MDS::ms_handle_reset(const entity_addr_t& addr) { dout(0) << "ms_handle_reset on " << addr << dendl; + return false; } -void MDS::ms_handle_remote_reset(const entity_addr_t& addr, entity_name_t last) +void MDS::ms_handle_remote_reset(const entity_addr_t& addr) { dout(0) << "ms_handle_remote_reset on " << addr << dendl; - objecter->ms_handle_remote_reset(addr, last); + objecter->ms_handle_remote_reset(addr); } diff --git a/src/mds/MDS.h b/src/mds/MDS.h index eefa9df439b3..a0be1128da9b 100644 --- a/src/mds/MDS.h +++ b/src/mds/MDS.h @@ -303,7 +303,8 @@ class MDS : public Dispatcher { int get_req_rate() { return req_rate; } private: - virtual bool ms_dispatch(Message *m); + bool ms_dispatch(Message *m); + public: MDS(const char *n, Messenger *m, MonClient *mc); ~MDS(); @@ -360,9 +361,9 @@ class MDS : public Dispatcher { // messages bool _dispatch(Message *m); - void ms_handle_failure(Message *m, const entity_inst_t& inst); - void ms_handle_reset(const entity_addr_t& addr, entity_name_t last); - void ms_handle_remote_reset(const entity_addr_t& addr, entity_name_t last); + bool ms_handle_reset(const entity_addr_t& peer); + void ms_handle_failure(Message *m, const entity_addr_t& peer); + void ms_handle_remote_reset(const entity_addr_t& peer); // special message types void handle_mds_map(class MMDSMap *m); diff --git a/src/mon/MonClient.cc b/src/mon/MonClient.cc index 7f35bf3d3613..064cf2c1c698 100644 --- a/src/mon/MonClient.cc +++ b/src/mon/MonClient.cc @@ -102,11 +102,28 @@ int MonClient::build_initial_monmap() return -ENOENT; } + int MonClient::get_monmap() { dout(10) << "get_monmap" << dendl; Mutex::Locker l(monc_lock); + want_monmap = true; + _reopen_session(); + + while (want_monmap) + map_cond.Wait(monc_lock); + + dout(10) << "get_monmap done" << dendl; + + return 0; +} + +int MonClient::get_monmap_privately() +{ + dout(10) << "get_monmap_privately" << dendl; + Mutex::Locker l(monc_lock); + SimpleMessenger *rank = NULL; bool temp_msgr = false; if (!messenger) { @@ -114,17 +131,17 @@ int MonClient::get_monmap() rank->bind(); rank->set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fast_fail()); messenger = rank->register_entity(entity_name_t::CLIENT(-1)); - messenger->set_dispatcher(this); + messenger->add_dispatcher_head(this); rank->start(true); // do not daemonize! temp_msgr = true; } int attempt = 10; int i = 0; - srand(getpid()); + dout(10) << "have " << monmap.epoch << dendl; - + while (monmap.epoch == 0) { i = rand() % monmap.mon_inst.size(); dout(10) << "querying " << monmap.mon_inst[i] << dendl; @@ -143,7 +160,7 @@ int MonClient::get_monmap() messenger->destroy(); messenger = 0; } - + if (monmap.epoch) return 0; return -1; @@ -174,14 +191,27 @@ bool MonClient::ms_dispatch(Message *m) return false; } +void MonClient::_finish_hunting() +{ + if (hunting) { + dout(0) << "found new mon" << cur_mon << dendl; + hunting = false; + } +} + void MonClient::handle_monmap(MMonMap *m) { dout(10) << "handle_monmap " << *m << dendl; monc_lock.Lock(); + _finish_hunting(); + bufferlist::iterator p = m->monmapbl.begin(); ::decode(monmap, p); + map_cond.Signal(); + want_monmap = false; + monc_lock.Unlock(); delete m; } @@ -201,7 +231,7 @@ void MonClient::_send_mount() void MonClient::init() { dout(10) << "init" << dendl; - messenger->set_dispatcher(this); + messenger->add_dispatcher_head(this); Mutex::Locker l(monc_lock); timer.add_event_after(10.0, new C_Tick(this)); @@ -240,7 +270,7 @@ void MonClient::handle_mount_ack(MClientMountAck* m) { dout(10) << "handle_mount_ack " << *m << dendl; - hunting = false; + _finish_hunting(); // monmap bufferlist::iterator p = m->monmap_bl.begin(); @@ -259,42 +289,50 @@ void MonClient::handle_mount_ack(MClientMountAck* m) void MonClient::_send_mon_message(Message *m) { - int mon = monmap.pick_mon(); - messenger->send_message(m, monmap.mon_inst[mon]); + messenger->send_message(m, monmap.mon_inst[cur_mon]); } void MonClient::_pick_new_mon() { - int oldmon = monmap.pick_mon(); - messenger->mark_down(monmap.get_inst(oldmon).addr); - monmap.pick_mon(true); + if (cur_mon >= 0) + messenger->mark_down(monmap.get_inst(cur_mon).addr); + cur_mon = monmap.pick_mon(true); + dout(10) << "_pick_new_mon picked mon" << cur_mon << dendl; } -void MonClient::ms_handle_reset(const entity_addr_t& peer) +void MonClient::_reopen_session() { - dout(10) << "ms_handle_reset " << peer << dendl; - if (!hunting) { - dout(0) << "staring hunt for new mon" << dendl; - hunting = true; - _pick_new_mon(); - if (mounting) - _send_mount(); + dout(10) << "_reopen_session" << dendl; + _pick_new_mon(); + if (mounting) + _send_mount(); + if (!sub_have.empty()) _renew_subs(); + if (!mounting && sub_have.empty()) { + _send_mon_message(new MMonGetMap); } } +bool MonClient::ms_handle_reset(const entity_addr_t& peer) +{ + dout(10) << "ms_handle_reset " << peer << dendl; + if (hunting) + return true; + + dout(0) << "starting hunt for new mon" << dendl; + hunting = true; + _reopen_session(); + return false; +} + void MonClient::tick() { dout(10) << "tick" << dendl; if (hunting) { dout(0) << "continuing hunt" << dendl; - // try new monitor - _pick_new_mon(); - if (mounting) - _send_mount(); - _renew_subs(); + _reopen_session(); } else { // just renew as needed utime_t now = g_clock.now(); @@ -306,8 +344,6 @@ void MonClient::tick() } timer.add_event_after(10.0, new C_Tick(this)); - dout(10) << "tick done" << dendl; - } @@ -332,7 +368,7 @@ void MonClient::_renew_subs() void MonClient::handle_subscribe_ack(MMonSubscribeAck *m) { - hunting = false; + _finish_hunting(); if (sub_renew_sent != utime_t()) { sub_renew_after = sub_renew_sent; diff --git a/src/mon/MonClient.h b/src/mon/MonClient.h index 6fa102cb74a1..74edf8874d85 100644 --- a/src/mon/MonClient.h +++ b/src/mon/MonClient.h @@ -36,15 +36,20 @@ public: private: Messenger *messenger; + int cur_mon; + entity_addr_t my_addr; Mutex monc_lock; SafeTimer timer; bool ms_dispatch(Message *m); - void handle_monmap(MMonMap *m); + bool ms_handle_reset(const entity_addr_t& peer); - void ms_handle_reset(const entity_addr_t& peer); + void ms_handle_failure(Message *m, const entity_addr_t& peer) { } + void ms_handle_remote_reset(const entity_addr_t& peer) {} + + void handle_monmap(MMonMap *m); // monitor session @@ -59,6 +64,9 @@ private: }; void tick(); + // monclient + bool want_monmap; + // mount private: client_t clientid; @@ -67,6 +75,8 @@ private: Cond mount_cond, map_cond; utime_t mount_started; + void _finish_hunting(); + void _reopen_session(); void _pick_new_mon(); void _send_mon_message(Message *m); void _send_mount(); @@ -113,7 +123,7 @@ public: } public: - MonClient() : messenger(NULL), + MonClient() : messenger(NULL), cur_mon(-1), monc_lock("MonClient::monc_lock"), timer(monc_lock), hunting(false), @@ -127,6 +137,7 @@ public: int build_initial_monmap(); int get_monmap(); + int get_monmap_privately(); void send_mon_message(Message *m) { Mutex::Locker l(monc_lock); diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 9bedd6949af0..37c0ebd32a2f 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -125,8 +125,8 @@ void Monitor::init() logclient.set_synchronous(true); // i'm ready! - messenger->set_dispatcher(this); - link_dispatcher(&logclient); + messenger->add_dispatcher_tail(this); + messenger->add_dispatcher_head(&logclient); // start ticker reset_tick(); @@ -159,8 +159,6 @@ void Monitor::shutdown() timer.cancel_all(); timer.join(); - unlink_dispatcher(&logclient); - // die. messenger->shutdown(); } diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h index 09d26ffd0b60..83281b54ccc5 100644 --- a/src/mon/Monitor.h +++ b/src/mon/Monitor.h @@ -167,6 +167,9 @@ public: private: bool ms_dispatch(Message *m); + bool ms_handle_reset(const entity_addr_t& peer) { return false; } + void ms_handle_failure(Message *m, const entity_addr_t& peer) { } + void ms_handle_remote_reset(const entity_addr_t& peer) {} public: Monitor(int w, MonitorStore *s, Messenger *m, MonMap *map); diff --git a/src/msg/Dispatcher.h b/src/msg/Dispatcher.h index da7298a94bf8..a999a856f544 100644 --- a/src/msg/Dispatcher.h +++ b/src/msg/Dispatcher.h @@ -22,77 +22,26 @@ class Messenger; class Dispatcher { - Dispatcher *next; - - // how i receive messages - virtual bool ms_dispatch(Message *m) = 0; - bool _ms_deliver_dispatch(Message *m) { - bool ret = false; - if (next) - ret = next->_ms_deliver_dispatch(m); - if (!ret) - ret = ms_dispatch(m); - return ret; - } -public: - void ms_deliver_dispatch(Message *m) { - if (!_ms_deliver_dispatch(m)) { - generic_dout(0) << "unhandled message " << m << " " << *m - << " from " << m->get_orig_source_inst() - << dendl; - assert(0); - } - } - - void ms_deliver_handle_reset(const entity_addr_t& peer) { - ms_handle_reset(peer); - if (next) - next->ms_handle_reset(peer); - } - void ms_deliver_handle_remote_reset(const entity_addr_t& peer) { - ms_handle_remote_reset(peer); - if (next) - next->ms_handle_remote_reset(peer); - } - void ms_deliver_handle_failure(Message *m, const entity_addr_t& peer) { - ms_handle_failure(m, peer); - if (next) - next->ms_handle_failure(m, peer); - } - - public: virtual ~Dispatcher() { } - Dispatcher() : next(NULL) { } + Dispatcher() { } - virtual void link_dispatcher(Dispatcher *disp) { - if (!next) { - next = disp; - } else { - next->link_dispatcher(disp); - } - } - virtual void unlink_dispatcher(Dispatcher *disp) { - assert(next); - if (next == disp) - next = next->next; - else - next->unlink_dispatcher(disp); - } + // how i receive messages + virtual bool ms_dispatch(Message *m) = 0; // how i deal with transmission failures. - virtual void ms_handle_failure(Message *m, const entity_addr_t& addr) { } + virtual void ms_handle_failure(Message *m, const entity_addr_t& addr) = 0; /* * on any connection reset. * this indicates that the ordered+reliable delivery semantics have * been violated. messages may have been lost. */ - virtual void ms_handle_reset(const entity_addr_t& peer) { } + virtual bool ms_handle_reset(const entity_addr_t& peer) = 0; // on deliberate reset of connection by remote // implies incoming messages dropped; possibly/probably some of our previous outgoing too. - virtual void ms_handle_remote_reset(const entity_addr_t& peer) { } + virtual void ms_handle_remote_reset(const entity_addr_t& peer) = 0; }; #endif diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index 14136e8abc49..c41c2d0663c8 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -34,7 +34,7 @@ class Timer; class Messenger { private: - Dispatcher *dispatcher; + list dispatchers; protected: entity_name_t _my_name; @@ -43,8 +43,7 @@ protected: atomic_t nref; public: - Messenger(entity_name_t w) : dispatcher(0), - default_send_priority(CEPH_MSG_PRIO_DEFAULT), + Messenger(entity_name_t w) : default_send_priority(CEPH_MSG_PRIO_DEFAULT), nref(1) { _my_name = w; } @@ -77,20 +76,52 @@ protected: virtual int get_dispatch_queue_len() { return 0; }; // setup - void set_dispatcher(Dispatcher *d) { - if (!dispatcher) { - dispatcher = d; - ready(); - } + void add_dispatcher_head(Dispatcher *d) { + bool first = dispatchers.empty(); + dispatchers.push_front(d); + if (first) + ready(); } - Dispatcher *get_dispatcher() { return dispatcher; } + void add_dispatcher_tail(Dispatcher *d) { + bool first = dispatchers.empty(); + dispatchers.push_back(d); + if (first) + ready(); + } + virtual void ready() { } - bool is_ready() { return dispatcher != 0; } + bool is_ready() { return !dispatchers.empty(); } // dispatch incoming messages - virtual void dispatch(Message *m) { - assert(dispatcher); - dispatcher->ms_deliver_dispatch(m); + void ms_deliver_dispatch(Message *m) { + for (list::iterator p = dispatchers.begin(); + p != dispatchers.end(); + p++) + if ((*p)->ms_dispatch(m)) + return; + generic_dout(0) << "unhandled message " << m << " " << *m + << " from " << m->get_orig_source_inst() + << dendl; + assert(0); + } + void ms_deliver_handle_reset(const entity_addr_t& peer) { + for (list::iterator p = dispatchers.begin(); + p != dispatchers.end(); + p++) + if ((*p)->ms_handle_reset(peer)) + return; + } + void ms_deliver_handle_remote_reset(const entity_addr_t& peer) { + for (list::iterator p = dispatchers.begin(); + p != dispatchers.end(); + p++) + (*p)->ms_handle_remote_reset(peer); + } + void ms_deliver_handle_failure(Message *m, const entity_addr_t& peer) { + for (list::iterator p = dispatchers.begin(); + p != dispatchers.end(); + p++) + (*p)->ms_handle_failure(m, peer); } // shutdown diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 2c1955c3b0b7..ca7c092c260f 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -287,20 +287,20 @@ void SimpleMessenger::Endpoint::dispatch_entry() entity_addr_t a = remote_reset_q.front(); remote_reset_q.pop_front(); lock.Unlock(); - get_dispatcher()->ms_deliver_handle_remote_reset(a); + ms_deliver_handle_remote_reset(a); } else if ((long)m == BAD_RESET) { lock.Lock(); entity_addr_t a = reset_q.front(); reset_q.pop_front(); lock.Unlock(); - get_dispatcher()->ms_deliver_handle_reset(a); + ms_deliver_handle_reset(a); } else if ((long)m == BAD_FAILED) { lock.Lock(); m = failed_q.front().first; entity_addr_t a = failed_q.front().second; failed_q.pop_front(); lock.Unlock(); - get_dispatcher()->ms_deliver_handle_failure(m, a); + ms_deliver_handle_failure(m, a); m->put(); } else { dout(1) << "<== " << m->get_source_inst() @@ -312,7 +312,7 @@ void SimpleMessenger::Endpoint::dispatch_entry() << " " << m->get_footer().data_crc << ")" << " " << m << dendl; - dispatch(m); + ms_deliver_dispatch(m); dout(20) << "done calling dispatch on " << m << dendl; } } @@ -1170,7 +1170,7 @@ void SimpleMessenger::Pipe::fail() report_failures(); for (unsigned i=0; ilocal.size(); i++) - if (rank->local[i] && rank->local[i]->get_dispatcher()) + if (rank->local[i]) rank->local[i]->queue_reset(peer_addr); // unregister @@ -1188,7 +1188,7 @@ void SimpleMessenger::Pipe::was_session_reset() dout(10) << "was_session_reset" << dendl; report_failures(); for (unsigned i=0; ilocal.size(); i++) - if (rank->local[i] && rank->local[i]->get_dispatcher()) + if (rank->local[i]) rank->local[i]->queue_remote_reset(peer_addr); out_seq = 0; diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 62cbd8e9c2af..0d462a80e3b7 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -415,9 +415,10 @@ int OSD::init() // i'm ready! - messenger->set_dispatcher(this); - link_dispatcher(&logclient); - heartbeat_messenger->set_dispatcher(&heartbeat_dispatcher); + messenger->add_dispatcher_head(this); + messenger->add_dispatcher_head(&logclient); + + heartbeat_messenger->add_dispatcher_head(&heartbeat_dispatcher); monc->init(); @@ -533,7 +534,6 @@ int OSD::shutdown() } pg_map.clear(); - unlink_dispatcher(&logclient); messenger->shutdown(); if (heartbeat_messenger) heartbeat_messenger->shutdown(); @@ -1613,18 +1613,6 @@ void OSD::_dispatch(Message *m) } -void OSD::ms_handle_failure(Message *m, const entity_inst_t& inst) -{ - entity_name_t dest = inst.name; - - dout(1) << "ms_handle_failure " << inst << " on " << *m << dendl; - if (g_conf.ms_die_on_failure) - assert(0); -} - - - - void OSD::handle_scrub(MOSDScrub *m) { dout(10) << "handle_scrub " << *m << dendl; diff --git a/src/osd/OSD.h b/src/osd/OSD.h index b9052abf8968..5a1b88d3b775 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -207,6 +207,9 @@ public: bool ms_dispatch(Message *m) { return osd->heartbeat_dispatch(m); }; + bool ms_handle_reset(const entity_addr_t& peer) { return false; } + void ms_handle_failure(Message *m, const entity_addr_t& peer) { } + void ms_handle_remote_reset(const entity_addr_t& peer) {} public: OSD *osd; HeartbeatDispatcher(OSD *o) : osd(o) {} @@ -812,7 +815,11 @@ protected: } remove_wq; private: - virtual bool ms_dispatch(Message *m); + bool ms_dispatch(Message *m); + bool ms_handle_reset(const entity_addr_t& peer) { return false; } + void ms_handle_failure(Message *m, const entity_addr_t& peer) { } + void ms_handle_remote_reset(const entity_addr_t& peer) {} + public: OSD(int id, Messenger *m, Messenger *hbm, MonClient *mc, const char *dev = 0, const char *jdev = 0); ~OSD(); @@ -827,9 +834,6 @@ protected: int init(); int shutdown(); - // messages - virtual void ms_handle_failure(Message *m, const entity_inst_t& inst); - void reply_op_error(MOSDOp *op, int r); void handle_scrub(class MOSDScrub *m); diff --git a/src/osd/OSDMap.h b/src/osd/OSDMap.h index b223b2f5b067..013d30d38431 100644 --- a/src/osd/OSDMap.h +++ b/src/osd/OSDMap.h @@ -368,6 +368,14 @@ private: bool is_out(int osd) { return !exists(osd) || get_weight(osd) == CEPH_OSD_OUT; } bool is_in(int osd) { return exists(osd) && !is_out(osd); } + bool have_addr(const entity_addr_t& addr) const { + for (vector::const_iterator p = osd_addr.begin(); + p != osd_addr.end(); + p++) + if (*p == addr) + return true; + return false; + } bool have_inst(int osd) { return exists(osd) && is_up(osd); } diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 22d617f017d0..a1da74a30b5e 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -934,28 +934,10 @@ void Objecter::_sg_read_finish(vector& extents, vector } -void Objecter::ms_handle_remote_reset(const entity_addr_t& addr, entity_name_t dest) +void Objecter::ms_handle_remote_reset(const entity_addr_t& addr) { - entity_inst_t inst; - inst.name = dest; - inst.addr = addr; - - if (dest.is_osd()) { - if (!osdmap->have_inst(dest.num()) || - (osdmap->get_inst(dest.num()) != inst)) { - dout(0) << "ms_handle_remote_reset " << dest << " inst " << inst - << ", ignoring, already have newer osdmap" << dendl; - } else { - // kick requests - set changed_pgs; - dout(0) << "ms_handle_remote_reset " << dest << dendl; - scan_pgs_for(changed_pgs, dest.num()); - if (!changed_pgs.empty()) { - dout(0) << "ms_handle_remote_reset " << dest << " kicking " << changed_pgs << dendl; - kick_requests(changed_pgs); - } - } - } + if (osdmap->have_addr(addr)) + maybe_request_map(); } diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index 2bbe29d3343e..8c3f3e6eb9dd 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -697,7 +697,7 @@ public: } } - void ms_handle_remote_reset(const entity_addr_t& addr, entity_name_t dest); + void ms_handle_remote_reset(const entity_addr_t& addr); }; diff --git a/src/testmsgr.cc b/src/testmsgr.cc index 0359881034a5..368d34dd7b3a 100644 --- a/src/testmsgr.cc +++ b/src/testmsgr.cc @@ -57,8 +57,9 @@ class Admin : public Dispatcher { return true; } - void ms_handle_failure(Message *m, const entity_inst_t& inst) { - } + bool ms_handle_reset(const entity_addr_t& peer) { return false; } + void ms_handle_failure(Message *m, const entity_addr_t& peer) { } + void ms_handle_remote_reset(const entity_addr_t& peer) {} } dispatcher; @@ -92,7 +93,7 @@ int main(int argc, const char **argv, const char *envp[]) { // start monitor messenger = rank.register_entity(entity_name_t::MON(whoami)); messenger->set_default_send_priority(CEPH_MSG_PRIO_HIGH); - messenger->set_dispatcher(&dispatcher); + messenger->add_dispatcher_head(&dispatcher); rank.start();