From a5a7eea27c65af2a18007d2a606b88f8be3b6634 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Thu, 4 Dec 2008 13:31:00 -0800 Subject: [PATCH] dispatcher: cascading dispatch infrastructure --- src/client/Client.cc | 8 ++++---- src/client/Client.h | 3 +-- src/cmonctl.cc | 7 +++++-- src/dumpjournal.cc | 5 ++++- src/mds/MDS.cc | 18 ++++++++++-------- src/mds/MDS.h | 6 +++--- src/mon/MonClient.cc | 6 +++++- src/mon/MonClient.h | 2 +- src/mon/Monitor.cc | 7 ++++--- src/mon/Monitor.h | 4 +++- src/mon/PaxosService.cc | 11 ++++++----- src/mon/PaxosService.h | 2 +- src/msg/Dispatcher.cc | 11 +++++++++++ src/msg/Dispatcher.h | 24 ++++++++++++++++++++++-- src/osd/OSD.cc | 14 +++++++------- src/osd/OSD.h | 14 ++++++++------ src/testmsgr.cc | 3 ++- 17 files changed, 97 insertions(+), 48 deletions(-) diff --git a/src/client/Client.cc b/src/client/Client.cc index c159a6b911651..47362dd013639 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -1082,7 +1082,7 @@ void Client::handle_client_reply(MClientReply *reply) // ------------------------ // incoming messages -void Client::dispatch(Message *m) +bool Client::dispatch_impl(Message *m) { client_lock.Lock(); @@ -1135,9 +1135,7 @@ void Client::dispatch(Message *m) break; default: - dout(10) << "dispatch doesn't recognize message type " << m->get_type() << dendl; - assert(0); // fail loudly - break; + return false; } // unmounting? @@ -1156,6 +1154,8 @@ void Client::dispatch(Message *m) } client_lock.Unlock(); + + return true; } diff --git a/src/client/Client.h b/src/client/Client.h index 1a0e8d49db5ad..81f2f0d642fef 100644 --- a/src/client/Client.h +++ b/src/client/Client.h @@ -806,6 +806,7 @@ protected: // friends friend class SyntheticClient; + bool dispatch_impl(Message *m); public: Client(Messenger *m, MonMap *mm); @@ -818,8 +819,6 @@ protected: void shutdown(); // messaging - void dispatch(Message *m); - void handle_mon_map(MMonMap *m); void handle_unmount(Message*); void handle_mds_map(class MMDSMap *m); diff --git a/src/cmonctl.cc b/src/cmonctl.cc index a7f96141d39ab..ad26580958528 100644 --- a/src/cmonctl.cc +++ b/src/cmonctl.cc @@ -134,12 +134,15 @@ void handle_ack(MMonCommandAck *ack) class Admin : public Dispatcher { - void dispatch(Message *m) { + bool dispatch_impl(Message *m) { switch (m->get_type()) { case MSG_MON_COMMAND_ACK: handle_ack((MMonCommandAck*)m); - break; + break; + default: + return false; } + return true; } } dispatcher; diff --git a/src/dumpjournal.cc b/src/dumpjournal.cc index a190a685ef6c4..09c54feb3c299 100644 --- a/src/dumpjournal.cc +++ b/src/dumpjournal.cc @@ -48,7 +48,7 @@ Objecter *objecter = 0; Journaler *journaler = 0; class Dumper : public Dispatcher { - void dispatch(Message *m) { + bool dispatch_impl(Message *m) { switch (m->get_type()) { case CEPH_MSG_OSD_OPREPLY: objecter->handle_osd_op_reply((MOSDOpReply *)m); @@ -56,7 +56,10 @@ class Dumper : public Dispatcher { case CEPH_MSG_OSD_MAP: objecter->handle_osd_map((MOSDMap*)m); break; + default: + return false; } + return true; } } dispatcher; diff --git a/src/mds/MDS.cc b/src/mds/MDS.cc index c644ddcc3cff5..e976fed12ad5d 100644 --- a/src/mds/MDS.cc +++ b/src/mds/MDS.cc @@ -1097,16 +1097,19 @@ void MDS::suicide() -void MDS::dispatch(Message *m) +bool MDS::dispatch_impl(Message *m) { + bool ret; mds_lock.Lock(); - _dispatch(m); + ret = _dispatch(m); mds_lock.Unlock(); + + return ret; } -void MDS::_dispatch(Message *m) +bool MDS::_dispatch(Message *m) { // from bad mds? if (m->get_source().is_mds()) { @@ -1126,7 +1129,7 @@ void MDS::_dispatch(Message *m) dout(5) << "got " << *m << " from down/old/bad/imposter mds " << m->get_source() << ", dropping" << dendl; delete m; - return; + return true; } } } @@ -1204,8 +1207,7 @@ void MDS::_dispatch(Message *m) break; default: - dout(1) << "MDS unknown messge " << m->get_type() << dendl; - assert(0); + return false; } } @@ -1214,7 +1216,7 @@ void MDS::_dispatch(Message *m) if (laggy) - return; + return true; // finish any triggered contexts @@ -1314,7 +1316,7 @@ void MDS::_dispatch(Message *m) stopping_done(); } } - + return true; } diff --git a/src/mds/MDS.h b/src/mds/MDS.h index 3d0ce1c1fe762..d89e2954526a5 100644 --- a/src/mds/MDS.h +++ b/src/mds/MDS.h @@ -217,7 +217,8 @@ class MDS : public Dispatcher { int get_req_rate() { return req_rate; } - + private: + virtual bool dispatch_impl(Message *m); public: MDS(int whoami, Messenger *m, MonMap *mm); ~MDS(); @@ -270,8 +271,7 @@ class MDS : public Dispatcher { void reset_beacon_killer(); // messages - virtual void dispatch(Message *m); - void _dispatch(Message *m); + bool _dispatch(Message *m); void ms_handle_failure(Message *m, const entity_inst_t& inst); void ms_handle_reset(const entity_addr_t& addr, entity_name_t last); diff --git a/src/mon/MonClient.cc b/src/mon/MonClient.cc index d044357752efc..fc7c51aaa7466 100644 --- a/src/mon/MonClient.cc +++ b/src/mon/MonClient.cc @@ -88,14 +88,18 @@ void MonClient::handle_monmap(MMonMap *m) monmap_lock.Unlock(); } -void MonClient::dispatch(Message *m) +bool MonClient::dispatch_impl(Message *m) { dout(10) << "dispatch " << *m << dendl; switch (m->get_type()) { case CEPH_MSG_MON_MAP: handle_monmap((MMonMap*)m); + break; + default: + return false; } delete m; + return true; } diff --git a/src/mon/MonClient.h b/src/mon/MonClient.h index ddacb3217f454..85b66798993b6 100644 --- a/src/mon/MonClient.h +++ b/src/mon/MonClient.h @@ -24,8 +24,8 @@ class MMonMap; class MonClient : public Dispatcher { int probe_mon(MonMap *pmonmap); void handle_monmap(MMonMap *m); + bool dispatch_impl(Message *m); public: - void dispatch(Message *m); int get_monmap(MonMap *pmonmap); }; diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 79d397266af19..1ea979aeab8cc 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -331,7 +331,7 @@ void Monitor::stop_cluster() } -void Monitor::dispatch(Message *m) +bool Monitor::dispatch_impl(Message *m) { lock.Lock(); { @@ -432,11 +432,12 @@ void Monitor::dispatch(Message *m) default: - dout(0) << "unknown message " << m << " " << *m << " from " << m->get_source_inst() << dendl; - assert(0); + return false; } } lock.Unlock(); + + return true; } void Monitor::handle_mon_get_map(MMonGetMap *m) diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h index 78048dda27c0e..187f2d5e08652 100644 --- a/src/mon/Monitor.h +++ b/src/mon/Monitor.h @@ -151,13 +151,15 @@ public: } }; + private: + bool dispatch_impl(Message *m); + public: Monitor(int w, MonitorStore *s, Messenger *m, MonMap *map); ~Monitor(); void init(); void shutdown(); - void dispatch(Message *m); void tick(); void stop_cluster(); diff --git a/src/mon/PaxosService.cc b/src/mon/PaxosService.cc index 8e95dfb92d600..069549e1e770f 100644 --- a/src/mon/PaxosService.cc +++ b/src/mon/PaxosService.cc @@ -36,7 +36,7 @@ const char *PaxosService::get_machine_name() } -void PaxosService::dispatch(Message *m) +bool PaxosService::dispatch_impl(Message *m) { dout(10) << "dispatch " << *m << " from " << m->get_orig_source_inst() << dendl; @@ -44,7 +44,7 @@ void PaxosService::dispatch(Message *m) if (!paxos->is_readable()) { dout(10) << " waiting for paxos -> readable" << dendl; paxos->wait_for_readable(new C_RetryMessage(this, m)); - return; + return true; } // make sure service has latest from paxos. @@ -52,21 +52,21 @@ void PaxosService::dispatch(Message *m) // preprocess if (preprocess_query(m)) - return; // easy! + return true; // easy! // leader? if (!mon->is_leader()) { // fw to leader dout(10) << " fw to leader mon" << mon->get_leader() << dendl; mon->messenger->forward_message(m, mon->monmap->get_inst(mon->get_leader())); - return; + return true; } // writeable? if (!paxos->is_writeable()) { dout(10) << " waiting for paxos -> writeable" << dendl; paxos->wait_for_writeable(new C_RetryMessage(this, m)); - return; + return true; } // update @@ -89,6 +89,7 @@ void PaxosService::dispatch(Message *m) dout(10) << " not proposing" << dendl; } } + return true; } bool PaxosService::should_propose(double& delay) diff --git a/src/mon/PaxosService.h b/src/mon/PaxosService.h index 10cb6fcd4e4c6..03499c317914a 100644 --- a/src/mon/PaxosService.h +++ b/src/mon/PaxosService.h @@ -71,6 +71,7 @@ protected: private: Context *proposal_timer; bool have_pending; + bool dispatch_impl(Message *m); public: PaxosService(Monitor *mn, Paxos *p) : mon(mn), paxos(p), @@ -80,7 +81,6 @@ public: const char *get_machine_name(); // i implement and you ignore - void dispatch(Message *m); void election_starting(); void election_finished(); void shutdown(); diff --git a/src/msg/Dispatcher.cc b/src/msg/Dispatcher.cc index 4fa04d7d4c92a..80dadb03f9528 100644 --- a/src/msg/Dispatcher.cc +++ b/src/msg/Dispatcher.cc @@ -26,3 +26,14 @@ int Dispatcher::send_message(Message *m, msg_addr_t dest, int dest_port) //return dis_messenger->send_message(m, dest, dest_port, MDS_PORT_SERVER); // on my port! } */ +void Dispatcher::dispatch(Message *m) { + if (!dispatch_impl(m)) { + if (next) { + next->dispatch(m); + } else { + dout(10) << "dispatch doesn't recognize message type " << m->get_type() << dendl; + assert(0); + } + } +} + diff --git a/src/msg/Dispatcher.h b/src/msg/Dispatcher.h index cbaf7aaaaa591..42ce2e7d8ae1b 100644 --- a/src/msg/Dispatcher.h +++ b/src/msg/Dispatcher.h @@ -21,11 +21,31 @@ class Messenger; class Dispatcher { + Dispatcher *next; + + // how i receive messages + virtual bool dispatch_impl(Message *m) = 0; public: virtual ~Dispatcher() { } + Dispatcher() : next(NULL) { } - // how i receive messages - virtual void dispatch(Message *m) = 0; + virtual void dispatch(Message *m) { + if (!dispatch_impl(m)) { + if (next) { + next->dispatch(m); + } else { + assert(0); + } + } + } + + virtual void add(Dispatcher *disp) { + if (!next) { + next = disp; + } else { + next->add(disp); + } + } // how i deal with transmission failures. virtual void ms_handle_failure(Message *m, const entity_inst_t& inst) { } diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 89732f65c8115..a3acdece834dc 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -1430,7 +1430,7 @@ void OSD::_share_map_outgoing(const entity_inst_t& inst) } -void OSD::heartbeat_dispatch(Message *m) +bool OSD::heartbeat_dispatch(Message *m) { dout(20) << "heartbeat_dispatch " << m << dendl; @@ -1446,13 +1446,13 @@ void OSD::heartbeat_dispatch(Message *m) break; default: - dout(1) << " got unknown message " << m->get_type() << dendl; - assert(0); + return false; } + return true; } -void OSD::dispatch(Message *m) +bool OSD::dispatch_impl(Message *m) { // lock! osd_lock.Lock(); @@ -1551,8 +1551,7 @@ void OSD::dispatch(Message *m) default: - dout(1) << " got unknown message " << m->get_type() << dendl; - assert(0); + return false; } } } @@ -1571,11 +1570,12 @@ void OSD::dispatch(Message *m) dispatch(waiting.front()); waiting.pop_front(); } - return; + return true; } finished_lock.Unlock(); osd_lock.Unlock(); + return true; } diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 47122ee24e595..398dcc265a03b 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -138,14 +138,16 @@ private: } heartbeat_thread; public: - void heartbeat_dispatch(Message *m); + bool heartbeat_dispatch(Message *m); struct HeartbeatDispatcher : public Dispatcher { + private: + bool dispatch_impl(Message *m) { + return osd->heartbeat_dispatch(m); + }; + public: OSD *osd; HeartbeatDispatcher(OSD *o) : osd(o) {} - void dispatch(Message *m) { - osd->heartbeat_dispatch(m); - }; } heartbeat_dispatcher; @@ -607,7 +609,8 @@ private: } } scrub_wq; - + private: + virtual bool dispatch_impl(Message *m); public: OSD(int id, Messenger *m, Messenger *hbm, MonMap *mm, const char *dev = 0); ~OSD(); @@ -623,7 +626,6 @@ private: int shutdown(); // messages - virtual void dispatch(Message *m); virtual void ms_handle_failure(Message *m, const entity_inst_t& inst); void reply_op_error(MOSDOp *op, int r); diff --git a/src/testmsgr.cc b/src/testmsgr.cc index 2ab1ed8b659b2..f9ea598292530 100644 --- a/src/testmsgr.cc +++ b/src/testmsgr.cc @@ -44,7 +44,7 @@ Cond cond; __u64 received = 0; class Admin : public Dispatcher { - void dispatch(Message *m) { + bool dispatch_impl(Message *m) { //cerr << "got ping from " << m->get_source() << std::endl; dout(0) << "got ping from " << m->get_source() << dendl; @@ -54,6 +54,7 @@ class Admin : public Dispatcher { lock.Unlock(); delete m; + return true; } void ms_handle_failure(Message *m, const entity_inst_t& inst) { -- 2.39.5