From e6349ec2e0ff65d78e2606843660cf4e5841d5d5 Mon Sep 17 00:00:00 2001 From: Colin Patrick McCabe Date: Thu, 9 Jun 2011 16:08:14 -0700 Subject: [PATCH] Dispatcher should take a CephContext argument Signed-off-by: Colin McCabe --- src/cfuse.cc | 2 +- src/client/Client.cc | 2 +- src/cmds.cc | 4 ++-- src/cmon.cc | 2 +- src/common/LogClient.cc | 7 +++++++ src/common/LogClient.h | 4 +--- src/cosd.cc | 6 +++--- src/csyn.cc | 2 +- src/dumpjournal.cc | 11 +++++++++-- src/libceph.cc | 2 +- src/librados.cc | 5 +++-- src/mds/Dumper.h | 1 + src/mds/MDS.cc | 1 + src/mds/Resetter.h | 1 + src/mon/MonClient.cc | 21 ++++++++++++++++++++- src/mon/MonClient.h | 19 +------------------ src/mon/Monitor.cc | 1 + src/msg/Dispatcher.h | 7 ++++++- src/msg/Messenger.h | 6 ++++-- src/msg/SimpleMessenger.h | 4 ++-- src/osd/OSD.cc | 1 + src/osd/OSD.h | 5 ++++- src/testmsgr.cc | 8 +++++++- src/tools/common.cc | 5 +++-- 24 files changed, 82 insertions(+), 45 deletions(-) diff --git a/src/cfuse.cc b/src/cfuse.cc index 84ea77e898cd0..9b80df0c5d327 100644 --- a/src/cfuse.cc +++ b/src/cfuse.cc @@ -90,7 +90,7 @@ int main(int argc, const char **argv, const char *envp[]) { return -1; // start up network - SimpleMessenger *messenger = new SimpleMessenger(); + SimpleMessenger *messenger = new SimpleMessenger(&g_ceph_context); messenger->register_entity(entity_name_t::CLIENT()); Client *client = new Client(messenger, &mc); if (filer_flags) { diff --git a/src/client/Client.cc b/src/client/Client.cc index 19316668a587e..ecc41f33bb04c 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -109,7 +109,7 @@ void client_flush_set_callback(void *p, ObjectCacher::ObjectSet *oset) // cons/des Client::Client(Messenger *m, MonClient *mc) - : timer(client_lock), client_lock("Client::client_lock"), + : Dispatcher(m->cct), timer(client_lock), client_lock("Client::client_lock"), filer_flags(0) { // which client am i? diff --git a/src/cmds.cc b/src/cmds.cc index 0b77d466cb573..3f7f3e05e00e4 100644 --- a/src/cmds.cc +++ b/src/cmds.cc @@ -62,7 +62,7 @@ void usage() static int do_cmds_special_action(const std::string &action, const std::string &dump_file, int rank) { - SimpleMessenger *messenger = new SimpleMessenger(); + SimpleMessenger *messenger = new SimpleMessenger(&g_ceph_context); messenger->bind(getpid()); MonClient mc(&g_ceph_context); if (mc.build_initial_monmap() < 0) @@ -197,7 +197,7 @@ int main(int argc, const char **argv) usage(); } - SimpleMessenger *messenger = new SimpleMessenger(); + SimpleMessenger *messenger = new SimpleMessenger(&g_ceph_context); messenger->bind(getpid()); cout << "starting " << g_conf->name << " at " << messenger->get_ms_addr() << std::endl; diff --git a/src/cmon.cc b/src/cmon.cc index 0326e6c094fdb..0995e7fb2cdbb 100644 --- a/src/cmon.cc +++ b/src/cmon.cc @@ -242,7 +242,7 @@ int main(int argc, const char **argv) } // bind - SimpleMessenger *messenger = new SimpleMessenger(); + SimpleMessenger *messenger = new SimpleMessenger(&g_ceph_context); int rank = monmap.get_rank(g_conf->name.get_id()); cout << "starting " << g_conf->name << " rank " << rank diff --git a/src/common/LogClient.cc b/src/common/LogClient.cc index b1683b0f84cf3..865f932d5d04e 100644 --- a/src/common/LogClient.cc +++ b/src/common/LogClient.cc @@ -60,6 +60,13 @@ static inline int clog_type_to_syslog_prio(clog_type t) } } +LogClient::LogClient(Messenger *m, MonMap *mm, MonClient *mc, enum logclient_flag_t flags) : + Dispatcher(m->cct), + messenger(m), monmap(mm), monc(mc), is_mon(flags & FLAG_MON), + log_lock("LogClient::log_lock"), last_log_sent(0), last_log(0) +{ +} + LogClientTemp::LogClientTemp(clog_type type_, LogClient &parent_) : type(type_), parent(parent_) { diff --git a/src/common/LogClient.h b/src/common/LogClient.h index a956b52019970..3b3997a729834 100644 --- a/src/common/LogClient.h +++ b/src/common/LogClient.h @@ -56,9 +56,7 @@ public: FLAG_MON = 0x1, }; - LogClient(Messenger *m, MonMap *mm, MonClient *mc, enum logclient_flag_t flags) : - messenger(m), monmap(mm), monc(mc), is_mon(flags & FLAG_MON), - log_lock("LogClient::log_lock"), last_log_sent(0), last_log(0) { } + LogClient(Messenger *m, MonMap *mm, MonClient *mc, enum logclient_flag_t flags); void handle_log_ack(MLogAck *m); diff --git a/src/cosd.cc b/src/cosd.cc index bdf042e626764..73b79dd0470b9 100644 --- a/src/cosd.cc +++ b/src/cosd.cc @@ -225,9 +225,9 @@ int main(int argc, const char **argv) cluster_addr_set = false; } - SimpleMessenger *client_messenger = new SimpleMessenger(); - SimpleMessenger *cluster_messenger = new SimpleMessenger(); - SimpleMessenger *messenger_hb = new SimpleMessenger(); + SimpleMessenger *client_messenger = new SimpleMessenger(&g_ceph_context); + SimpleMessenger *cluster_messenger = new SimpleMessenger(&g_ceph_context); + SimpleMessenger *messenger_hb = new SimpleMessenger(&g_ceph_context); client_messenger->bind(g_conf->public_addr, getpid()); cluster_messenger->bind(g_conf->cluster_addr, getpid()); diff --git a/src/csyn.cc b/src/csyn.cc index da045ee6f120f..1354fb5d0e4e9 100644 --- a/src/csyn.cc +++ b/src/csyn.cc @@ -65,7 +65,7 @@ int main(int argc, const char **argv, char *envp[]) cout << "csyn: starting " << g_conf->num_client << " syn client(s)" << std::endl; for (int i=0; inum_client; i++) { - messengers[i] = new SimpleMessenger(); + messengers[i] = new SimpleMessenger(&g_ceph_context); messengers[i]->register_entity(entity_name_t(entity_name_t::TYPE_CLIENT,-1)); messengers[i]->bind(i * 1000000 + getpid()); mclients[i] = new MonClient(&g_ceph_context); diff --git a/src/dumpjournal.cc b/src/dumpjournal.cc index b403a1b0cf0a7..45e5ccd52a952 100644 --- a/src/dumpjournal.cc +++ b/src/dumpjournal.cc @@ -51,6 +51,12 @@ SafeTimer *obj_timer = 0; SafeTimer *jnl_timer = 0; class Dumper : public Dispatcher { +public: + Dumper() + : Dispatcher(&g_ceph_context) + { + } +private: bool ms_dispatch(Message *m) { switch (m->get_type()) { case CEPH_MSG_OSD_OPREPLY: @@ -67,7 +73,7 @@ class Dumper : public Dispatcher { bool ms_handle_reset(Connection *con) { return false; } void ms_handle_remote_reset(Connection *con) {} -} dispatcher; +}; void usage() @@ -94,10 +100,11 @@ int main(int argc, const char **argv, const char *envp[]) return -1; // start up network - SimpleMessenger *messenger = new SimpleMessenger(); + SimpleMessenger *messenger = new SimpleMessenger(&g_ceph_context); messenger->bind(getpid()); messenger->register_entity(entity_name_t::CLIENT()); messenger->start(); + Dumper dispatcher; messenger->add_dispatcher_head(&dispatcher); inodeno_t ino = MDS_INO_LOG_OFFSET + mds; diff --git a/src/libceph.cc b/src/libceph.cc index 14e512d6155f5..80d23a3bf8b1a 100644 --- a/src/libceph.cc +++ b/src/libceph.cc @@ -80,7 +80,7 @@ public: } //network connection - messenger = new SimpleMessenger(); + messenger = new SimpleMessenger(cct); if (!messenger->register_entity(entity_name_t::CLIENT())) { messenger->destroy(); messenger = NULL; diff --git a/src/librados.cc b/src/librados.cc index 62371cd4208b5..ef52a55780b9c 100644 --- a/src/librados.cc +++ b/src/librados.cc @@ -403,7 +403,8 @@ private: SafeTimer timer; public: - RadosClient(CephContext *cct_) : cct(cct_), conf(cct_->_conf), + RadosClient(CephContext *cct_) : Dispatcher(cct_), + cct(cct_), conf(cct_->_conf), state(DISCONNECTED), monclient(cct_), messenger(NULL), objecter(NULL), lock("radosclient"), timer(lock), max_watch_cookie(0) @@ -701,7 +702,7 @@ connect() goto out; err = -ENOMEM; - messenger = new SimpleMessenger(); + messenger = new SimpleMessenger(cct); if (!messenger) goto out; diff --git a/src/mds/Dumper.h b/src/mds/Dumper.h index 6ce45d60de78c..e07b171d09b2c 100644 --- a/src/mds/Dumper.h +++ b/src/mds/Dumper.h @@ -49,6 +49,7 @@ public: * build_initial_monmap(). */ Dumper(SimpleMessenger *messenger_, MonClient *monc_) : + Dispatcher(messenger_->cct), messenger(messenger_), monc(monc_), lock("Dumper::lock"), timer(lock) diff --git a/src/mds/MDS.cc b/src/mds/MDS.cc index d4dbd03786c26..15018a5076341 100644 --- a/src/mds/MDS.cc +++ b/src/mds/MDS.cc @@ -85,6 +85,7 @@ // cons/des MDS::MDS(const std::string &n, Messenger *m, MonClient *mc) : + Dispatcher(m->cct), mds_lock("MDS::mds_lock"), timer(mds_lock), name(n), diff --git a/src/mds/Resetter.h b/src/mds/Resetter.h index ad4f670ec708a..15fb0eb9df01e 100644 --- a/src/mds/Resetter.h +++ b/src/mds/Resetter.h @@ -47,6 +47,7 @@ public: * build_initial_monmap(). */ Resetter(SimpleMessenger *messenger_, MonClient *monc_) : + Dispatcher(messenger_->cct), messenger(messenger_), monc(monc_), lock("Resetter::lock"), timer(lock) diff --git a/src/mon/MonClient.cc b/src/mon/MonClient.cc index 02e3a0ea9ef86..531202c6482d6 100644 --- a/src/mon/MonClient.cc +++ b/src/mon/MonClient.cc @@ -41,6 +41,25 @@ #undef dout_prefix #define dout_prefix *_dout << "monclient" << (hunting ? "(hunting)":"") << ": " +MonClient::MonClient(CephContext *cct_) : + Dispatcher(cct_), + cct(cct_), + state(MC_STATE_NONE), + messenger(NULL), + cur_con(NULL), + monc_lock("MonClient::monc_lock"), + timer(monc_lock), + log_client(NULL), + hunting(true), + want_monmap(true), + want_keys(0), global_id(0), + authenticate_err(0), + auth(NULL), + keyring(NULL), + rotating_secrets(NULL) +{ +} + MonClient::~MonClient() { delete auth; @@ -190,7 +209,7 @@ int MonClient::get_monmap_privately() bool temp_msgr = false; SimpleMessenger* smessenger = NULL; if (!messenger) { - messenger = smessenger = new SimpleMessenger(); + messenger = smessenger = new SimpleMessenger(cct); smessenger->register_entity(entity_name_t::CLIENT(-1)); messenger->add_dispatcher_head(this); smessenger->start_with_nonce(getpid()); diff --git a/src/mon/MonClient.h b/src/mon/MonClient.h index c2b83ab25fb16..e6b7943ade7b3 100644 --- a/src/mon/MonClient.h +++ b/src/mon/MonClient.h @@ -166,24 +166,7 @@ public: RotatingKeyRing *rotating_secrets; public: - MonClient(CephContext *cct_) : - cct(cct_), - state(MC_STATE_NONE), - messenger(NULL), - cur_con(NULL), - monc_lock("MonClient::monc_lock"), - timer(monc_lock), - log_client(NULL), - hunting(true), - want_monmap(true), - want_keys(0), global_id(0), - authenticate_err(0), - auth(NULL), - keyring(NULL), - rotating_secrets(NULL) - { - } - + MonClient(CephContext *cct_); ~MonClient(); int init(); diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 256942846b941..f7347aa10e20f 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -86,6 +86,7 @@ const CompatSet::Feature ceph_mon_feature_incompat[] = { CEPH_MON_FEATURE_INCOMPAT_BASE , CompatSet::Feature(0, "")}; Monitor::Monitor(string nm, MonitorStore *s, Messenger *m, MonMap *map) : + Dispatcher(m->cct), name(nm), rank(-1), messenger(m), diff --git a/src/msg/Dispatcher.h b/src/msg/Dispatcher.h index 21fc913a8d739..5742dca9aacc6 100644 --- a/src/msg/Dispatcher.h +++ b/src/msg/Dispatcher.h @@ -24,8 +24,11 @@ class Messenger; class Dispatcher { public: + Dispatcher(CephContext *cct_) + : cct(cct_) + { + } virtual ~Dispatcher() { } - Dispatcher() { } // how i receive messages virtual bool ms_dispatch(Message *m) = 0; @@ -51,6 +54,8 @@ public: virtual bool ms_verify_authorizer(Connection *con, int peer_type, int protocol, bufferlist& authorizer, bufferlist& authorizer_reply, bool& isvalid) { return false; }; +private: + CephContext *cct; }; #endif diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index 1171e1c3b647b..a61278b0b4239 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -44,8 +44,10 @@ protected: atomic_t nref; public: - Messenger(entity_name_t w) : default_send_priority(CEPH_MSG_PRIO_DEFAULT), - nref(1) { + CephContext *cct; + Messenger(CephContext *cct_, entity_name_t w) + : default_send_priority(CEPH_MSG_PRIO_DEFAULT), nref(1), cct(cct_) + { _my_name = w; } diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 2d165c22defd1..ac76df221947c 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -553,8 +553,8 @@ private: int timeout; public: - SimpleMessenger() : - Messenger(entity_name_t()), + SimpleMessenger(CephContext *cct) : + Messenger(cct, entity_name_t()), accepter(this), lock("SimpleMessenger::lock"), started(false), did_bind(false), dispatch_throttler(g_conf->ms_dispatch_throttle_bytes), need_addr(true), diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 2f405d3df9f17..0b3411af13d9c 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -400,6 +400,7 @@ int OSD::peek_meta(const std::string &dev, std::string& magic, OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger, Messenger *hbm, MonClient *mc, const std::string &dev, const std::string &jdev) : + Dispatcher(hbm->cct), osd_lock("OSD::osd_lock"), timer(osd_lock), cluster_messenger(internal_messenger), diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 7576f2c2b0a38..906bfcb727622 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -269,7 +269,10 @@ public: void ms_handle_remote_reset(Connection *con) {} public: OSD *osd; - HeartbeatDispatcher(OSD *o) : osd(o) {} + HeartbeatDispatcher(OSD *o) + : Dispatcher(&g_ceph_context), osd(o) + { + } } heartbeat_dispatcher; diff --git a/src/testmsgr.cc b/src/testmsgr.cc index 0f9cf37a06886..cf7b182c0bca5 100644 --- a/src/testmsgr.cc +++ b/src/testmsgr.cc @@ -45,6 +45,12 @@ Cond cond; uint64_t received = 0; class Admin : public Dispatcher { +public: + Admin() + : Dispatcher(&g_ceph_context) + { + } +private: bool ms_dispatch(Message *m) { //cerr << "got ping from " << m->get_source() << std::endl; @@ -86,7 +92,7 @@ int main(int argc, const char **argv, const char *envp[]) { int whoami = mc.monmap.get_rank(args[0]); assert(whoami >= 0); g_conf->public_addr = mc.monmap.get_addr(whoami); - SimpleMessenger *rank = new SimpleMessenger(); + SimpleMessenger *rank = new SimpleMessenger(&g_ceph_context); int err = rank->bind(getpid()); if (err < 0) return 1; diff --git a/src/tools/common.cc b/src/tools/common.cc index 5e42bfd7f4987..2de08841f0dc8 100644 --- a/src/tools/common.cc +++ b/src/tools/common.cc @@ -280,7 +280,8 @@ static void send_command(CephToolCtx *ctx) class Admin : public Dispatcher { public: Admin(CephToolCtx *ctx_) - : ctx(ctx_) + : Dispatcher(&g_ceph_context), + ctx(ctx_) { } @@ -538,7 +539,7 @@ CephToolCtx* ceph_tool_common_init(ceph_tool_mode_t mode, bool concise) tok = tok_init(NULL); // start up network - messenger = new SimpleMessenger(); + messenger = new SimpleMessenger(&g_ceph_context); messenger->register_entity(entity_name_t::CLIENT()); messenger->start_with_nonce(getpid()); ctx->dispatcher = new Admin(ctx.get()); -- 2.39.5