From e85c9f89df8a60c9a7fdc2cbc3403159b61ffc9c Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 20 Nov 2008 10:36:19 -0800 Subject: [PATCH] msgr: reference count messenger We want an explicit destroy() method, because the SimpleMessenger needs to join the dispatch thread, and that can't happen just on the last reference drop because that may happen in the dispatch thread itself. --- src/client/Client.cc | 8 ++++---- src/cmonctl.cc | 2 +- src/mds/MDS.cc | 7 +++---- src/mon/MonClient.cc | 2 +- src/mon/Monitor.cc | 3 ++- src/msg/Messenger.h | 21 +++++++++++++++++++-- src/msg/SimpleMessenger.cc | 12 +++++++++--- src/msg/SimpleMessenger.h | 6 +++++- src/osd/OSD.cc | 12 ++++++------ 9 files changed, 50 insertions(+), 23 deletions(-) diff --git a/src/client/Client.cc b/src/client/Client.cc index 3dc30a85cb248..96d73cc57ca47 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -158,8 +158,8 @@ Client::~Client() if (objecter) { delete objecter; objecter = 0; } if (osdmap) { delete osdmap; osdmap = 0; } if (mdsmap) { delete mdsmap; mdsmap = 0; } - - if (messenger) { delete messenger; messenger = 0; } + if (messenger) + messenger->destroy(); } @@ -5043,11 +5043,11 @@ void Client::ms_handle_failure(Message *m, const entity_inst_t& inst) dout(0) << "ms_handle_failure " << *m << " to " << inst << ", resending to mon" << mon << dendl; - messenger->send_message(m, monmap->get_inst(mon)); + Message *n = decode_message(m->get_header(), m->get_footer(), m->get_payload(), m->get_data()); + messenger->send_message(n, monmap->get_inst(mon)); } else { dout(0) << "ms_handle_failure " << *m << " to " << inst << ", dropping" << dendl; - delete m; } } diff --git a/src/cmonctl.cc b/src/cmonctl.cc index e532e7d8584f6..f75a3704d4aad 100644 --- a/src/cmonctl.cc +++ b/src/cmonctl.cc @@ -230,7 +230,7 @@ int main(int argc, const char **argv, const char *envp[]) { // wait for messenger to finish rank.wait(); - + messenger->destroy(); return 0; } diff --git a/src/mds/MDS.cc b/src/mds/MDS.cc index a6ce88f0ec582..d9957b4c6237e 100644 --- a/src/mds/MDS.cc +++ b/src/mds/MDS.cc @@ -142,11 +142,12 @@ MDS::~MDS() { if (filer) { delete filer; filer = 0; } if (objecter) { delete objecter; objecter = 0; } - if (messenger) { delete messenger; messenger = NULL; } if (logger) { delete logger; logger = 0; } if (logger2) { delete logger2; logger2 = 0; } - + + if (messenger) + messenger->destroy(); } @@ -1295,8 +1296,6 @@ void MDS::ms_handle_failure(Message *m, const entity_inst_t& inst) { mds_lock.Lock(); dout(0) << "ms_handle_failure to " << inst << " on " << *m << dendl; - - delete m; mds_lock.Unlock(); } diff --git a/src/mon/MonClient.cc b/src/mon/MonClient.cc index 55275d65d6bdb..d044357752efc 100644 --- a/src/mon/MonClient.cc +++ b/src/mon/MonClient.cc @@ -50,7 +50,7 @@ int MonClient::probe_mon(MonMap *pmonmap) dout(2) << "get_monmap got monmap epoch " << pmonmap->epoch << " fsid " << pmonmap->fsid << dendl; } msgr->shutdown(); - //delete msgr; // FIXME: we need proper reference counting in messenger + msgr->destroy(); rank.wait(); if (monmap_bl.length()) diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index a18d286d48abd..abbfa5d693a0b 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -90,7 +90,8 @@ Monitor::~Monitor() delete mdsmon; delete clientmon; delete pgmon; - delete messenger; + if (messenger) + messenger->destroy(); } void Monitor::init() diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index e5505fc024aba..817a9ed3e3d60 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -39,11 +39,28 @@ protected: entity_inst_t _myinst; int default_send_priority; + atomic_t nref; + public: - Messenger(entity_name_t w) : dispatcher(0), default_send_priority(CEPH_MSG_PRIO_DEFAULT) { + Messenger(entity_name_t w) : dispatcher(0), + default_send_priority(CEPH_MSG_PRIO_DEFAULT), + nref(1) { _myinst.name = w; } - virtual ~Messenger() { } + virtual ~Messenger() { + assert(nref.test() == 0); + } + + void get() { + nref.inc(); + } + void put() { + if (nref.dec() == 0) + delete this; + } + virtual void destroy() { + put(); + } // accessors entity_name_t get_myname() { return _myinst.name; } diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 7c48bb61b2d8b..76cf703385ab0 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -403,6 +403,7 @@ Rank::EntityMessenger *Rank::register_entity(entity_name_t name) local.resize(max_local); stopped.resize(max_local); + msgr->get(); local[erank] = msgr; stopped[erank] = false; msgr->_myinst.addr = rank_addr; @@ -427,9 +428,15 @@ void Rank::unregister_entity(EntityMessenger *msgr) dout(10) << "unregister_entity " << msgr->get_myname() << dendl; // remove from local directory. + assert(msgr->my_rank >= 0); + assert(local[msgr->my_rank] == msgr); local[msgr->my_rank] = 0; stopped[msgr->my_rank] = true; num_local--; + msgr->my_rank = -1; + + assert(msgr->nref.test() > 1); + msgr->put(); wait_cond.Signal(); @@ -639,14 +646,14 @@ void Rank::EntityMessenger::dispatch_entry() // deregister rank.unregister_entity(this); + put(); } void Rank::EntityMessenger::ready() { dout(10) << "ready " << get_myaddr() << dendl; assert(!dispatch_thread.is_started()); - - // start my dispatch thread + get(); dispatch_thread.create(); } @@ -666,7 +673,6 @@ int Rank::EntityMessenger::shutdown() cond.Signal(); lock.Unlock(); } - return 0; } diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 27374355dc91d..b9a107c8ab1ba 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -326,10 +326,14 @@ private: my_rank(r), need_addr(false), dispatch_thread(this) { } - ~EntityMessenger() { + ~EntityMessenger() { } + + void destroy() { // join dispatch thread if (dispatch_thread.is_started()) dispatch_thread.join(); + + Messenger::destroy(); } void ready(); diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 54fa4b7a15564..07a1289b0f0be 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -313,12 +313,12 @@ OSD::OSD(int id, Messenger *m, MonMap *mm, const char *dev) : OSD::~OSD() { - if (threadpool) { delete threadpool; threadpool = 0; } - if (osdmap) { delete osdmap; osdmap = 0; } - //if (monitor) { delete monitor; monitor = 0; } - if (messenger) { delete messenger; messenger = 0; } - if (logger) { delete logger; logger = 0; } - if (store) { delete store; store = 0; } + delete threadpool; + delete osdmap; + delete logger; + delete store; + if (messenger) + messenger->destroy(); } bool got_sigterm = false; -- 2.39.5