]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: reference count messenger
authorSage Weil <sage@newdream.net>
Thu, 20 Nov 2008 18:36:19 +0000 (10:36 -0800)
committerSage Weil <sage@newdream.net>
Thu, 20 Nov 2008 19:43:45 +0000 (11:43 -0800)
We want an explicit destroy() method, because the SimpleMessenger
needs to join the dispatch thread, and that can't happen just on
the last reference drop because that may happen in the dispatch
thread itself.

src/client/Client.cc
src/cmonctl.cc
src/mds/MDS.cc
src/mon/MonClient.cc
src/mon/Monitor.cc
src/msg/Messenger.h
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h
src/osd/OSD.cc

index 3dc30a85cb248e7e897827503c540baefbefb6a7..96d73cc57ca478f5f8ef338157aace9ce43ac533 100644 (file)
@@ -158,8 +158,8 @@ Client::~Client()
   if (objecter) { delete objecter; objecter = 0; }
   if (osdmap) { delete osdmap; osdmap = 0; }
   if (mdsmap) { delete mdsmap; mdsmap = 0; }
-
-  if (messenger) { delete messenger; messenger = 0; }
+  if (messenger)
+    messenger->destroy();
 }
 
 
@@ -5043,11 +5043,11 @@ void Client::ms_handle_failure(Message *m, const entity_inst_t& inst)
     dout(0) << "ms_handle_failure " << *m << " to " << inst 
             << ", resending to mon" << mon 
             << dendl;
-    messenger->send_message(m, monmap->get_inst(mon));
+    Message *n = decode_message(m->get_header(), m->get_footer(), m->get_payload(), m->get_data());
+    messenger->send_message(n, monmap->get_inst(mon));
   }
   else {
     dout(0) << "ms_handle_failure " << *m << " to " << inst << ", dropping" << dendl;
-    delete m;
   }
 }
 
index e532e7d8584f6987114dc5b30c245b93f4c0f68a..f75a3704d4aadf4a317021e1e75ba2b0dcf36533 100644 (file)
@@ -230,7 +230,7 @@ int main(int argc, const char **argv, const char *envp[]) {
 
   // wait for messenger to finish
   rank.wait();
-  
+  messenger->destroy();
   return 0;
 }
 
index a6ce88f0ec58251b63b207d038b331fa64e3349e..d9957b4c6237e6ee844338495e5ee1903c72e66d 100644 (file)
@@ -142,11 +142,12 @@ MDS::~MDS() {
 
   if (filer) { delete filer; filer = 0; }
   if (objecter) { delete objecter; objecter = 0; }
-  if (messenger) { delete messenger; messenger = NULL; }
 
   if (logger) { delete logger; logger = 0; }
   if (logger2) { delete logger2; logger2 = 0; }
-
+  
+  if (messenger)
+    messenger->destroy();
 }
 
 
@@ -1295,8 +1296,6 @@ void MDS::ms_handle_failure(Message *m, const entity_inst_t& inst)
 {
   mds_lock.Lock();
   dout(0) << "ms_handle_failure to " << inst << " on " << *m << dendl;
-  
-  delete m;
   mds_lock.Unlock();
 }
 
index 55275d65d6bdb4c05b79c4c5a8e16582f329f2e0..d044357752efc6f6950d0f78b3c235b42e2cbe80 100644 (file)
@@ -50,7 +50,7 @@ int MonClient::probe_mon(MonMap *pmonmap)
     dout(2) << "get_monmap got monmap epoch " << pmonmap->epoch << " fsid " << pmonmap->fsid << dendl;
   }
   msgr->shutdown();
-  //delete msgr;  // FIXME: we need proper reference counting in messenger
+  msgr->destroy();
   rank.wait();
 
   if (monmap_bl.length())
index a18d286d48abd621e1e18c2553aadbeabaa9f451..abbfa5d693a0b2e28504f788e5be5c62d02911c2 100644 (file)
@@ -90,7 +90,8 @@ Monitor::~Monitor()
   delete mdsmon;
   delete clientmon;
   delete pgmon;
-  delete messenger;
+  if (messenger)
+    messenger->destroy();
 }
 
 void Monitor::init()
index e5505fc024aba51042655e1f7310503374bac407..817a9ed3e3d60f02b21ad0c77bc9b865fc3ae41a 100644 (file)
@@ -39,11 +39,28 @@ protected:
   entity_inst_t _myinst;
   int default_send_priority;
 
+  atomic_t nref;
+
  public:
-  Messenger(entity_name_t w) : dispatcher(0), default_send_priority(CEPH_MSG_PRIO_DEFAULT) {
+  Messenger(entity_name_t w) : dispatcher(0),
+                              default_send_priority(CEPH_MSG_PRIO_DEFAULT),
+                              nref(1) {
     _myinst.name = w;
   }
-  virtual ~Messenger() { }
+  virtual ~Messenger() {
+    assert(nref.test() == 0);
+  }
+
+  void get() {
+    nref.inc();
+  }
+  void put() {
+    if (nref.dec() == 0)
+      delete this;
+  }
+  virtual void destroy() {
+    put();
+  }
   
   // accessors
   entity_name_t get_myname() { return _myinst.name; }
index 7c48bb61b2d8be087d7710d77a87f8a29ad19966..76cf703385ab0b9a37ba077625245f6f60436c59 100644 (file)
@@ -403,6 +403,7 @@ Rank::EntityMessenger *Rank::register_entity(entity_name_t name)
   local.resize(max_local);
   stopped.resize(max_local);
 
+  msgr->get();
   local[erank] = msgr;
   stopped[erank] = false;
   msgr->_myinst.addr = rank_addr;
@@ -427,9 +428,15 @@ void Rank::unregister_entity(EntityMessenger *msgr)
   dout(10) << "unregister_entity " << msgr->get_myname() << dendl;
   
   // remove from local directory.
+  assert(msgr->my_rank >= 0);
+  assert(local[msgr->my_rank] == msgr);
   local[msgr->my_rank] = 0;
   stopped[msgr->my_rank] = true;
   num_local--;
+  msgr->my_rank = -1;
+
+  assert(msgr->nref.test() > 1);
+  msgr->put();
 
   wait_cond.Signal();
 
@@ -639,14 +646,14 @@ void Rank::EntityMessenger::dispatch_entry()
 
   // deregister
   rank.unregister_entity(this);
+  put();
 }
 
 void Rank::EntityMessenger::ready()
 {
   dout(10) << "ready " << get_myaddr() << dendl;
   assert(!dispatch_thread.is_started());
-  
-  // start my dispatch thread
+  get();
   dispatch_thread.create();
 }
 
@@ -666,7 +673,6 @@ int Rank::EntityMessenger::shutdown()
     cond.Signal();
     lock.Unlock();
   }
-
   return 0;
 }
 
index 27374355dc91d1e743ebc4f4fc2e23c92b132eb6..b9a107c8ab1ba2f4329886b94073e53e39c65938 100644 (file)
@@ -326,10 +326,14 @@ private:
       my_rank(r),
       need_addr(false),
       dispatch_thread(this) { }
-    ~EntityMessenger() {
+    ~EntityMessenger() { }
+
+    void destroy() {
       // join dispatch thread
       if (dispatch_thread.is_started())
        dispatch_thread.join();
+
+      Messenger::destroy();
     }
 
     void ready();
index 54fa4b7a1556469c4b100f196dc2dd6164a6800d..07a1289b0f0be77e2780596d4eaa0c9a918a90bb 100644 (file)
@@ -313,12 +313,12 @@ OSD::OSD(int id, Messenger *m, MonMap *mm, const char *dev) :
 
 OSD::~OSD()
 {
-  if (threadpool) { delete threadpool; threadpool = 0; }
-  if (osdmap) { delete osdmap; osdmap = 0; }
-  //if (monitor) { delete monitor; monitor = 0; }
-  if (messenger) { delete messenger; messenger = 0; }
-  if (logger) { delete logger; logger = 0; }
-  if (store) { delete store; store = 0; }
+  delete threadpool;
+  delete osdmap;
+  delete logger;
+  delete store;
+  if (messenger) 
+    messenger->destroy();
 }
 
 bool got_sigterm = false;