]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/simple: wait dispatch_queue until all pipes closed 9930/head
authorHaomai Wang <haomai@xsky.com>
Sat, 25 Jun 2016 05:25:51 +0000 (13:25 +0800)
committerHaomai Wang <haomai@xsky.com>
Wed, 17 Aug 2016 08:59:36 +0000 (16:59 +0800)
Now we use dispatch_queue.wait to wait for SimpleMessenger shutdown,
but we need to ensure DispatchQueue can process event after Accepter down,
Otherwise accepter may continue to accept new connection which may queue
new item. so we can't rely on DispatchQueue now.

Introduce stop_cond and stop flag to indicate this function like
AsyncMessenger did

Fixes: http://tracker.ceph.com/issues/16472
Signed-off-by: Haomai Wang <haomai@xsky.com>
src/msg/simple/SimpleMessenger.cc
src/msg/simple/SimpleMessenger.h

index 604b3967295ddfde06c0f9c2267385347074a141..4ef2fe1392335eeac4ff806f596aed0e3b7f8348 100644 (file)
@@ -87,10 +87,15 @@ int SimpleMessenger::shutdown()
 {
   ldout(cct,10) << "shutdown " << get_myaddr() << dendl;
   mark_down_all();
-  dispatch_queue.shutdown();
 
   // break ref cycles on the loopback connection
   local_connection->set_priv(NULL);
+
+  lock.Lock();
+  stop_cond.Signal();
+  stopped = true;
+  lock.Unlock();
+
   return 0;
 }
 
@@ -312,6 +317,7 @@ int SimpleMessenger::start()
 
   assert(!started);
   started = true;
+  stopped = false;
 
   if (!did_bind) {
     my_inst.addr.nonce = nonce;
@@ -525,14 +531,10 @@ void SimpleMessenger::wait()
     lock.Unlock();
     return;
   }
-  lock.Unlock();
+  if (!stopped)
+    stop_cond.Wait(lock);
 
-  if (dispatch_queue.is_started()) {
-    ldout(cct,10) << "wait: waiting for dispatch queue" << dendl;
-    dispatch_queue.wait();
-    dispatch_queue.discard_local();
-    ldout(cct,10) << "wait: dispatch queue is stopped" << dendl;
-  }
+  lock.Unlock();
 
   // done!  clean up.
   if (did_bind) {
@@ -542,6 +544,14 @@ void SimpleMessenger::wait()
     ldout(cct,20) << "wait: stopped accepter thread" << dendl;
   }
 
+  dispatch_queue.shutdown();
+  if (dispatch_queue.is_started()) {
+    ldout(cct,10) << "wait: waiting for dispatch queue" << dendl;
+    dispatch_queue.wait();
+    dispatch_queue.discard_local();
+    ldout(cct,10) << "wait: dispatch queue is stopped" << dendl;
+  }
+
   if (reaper_started) {
     ldout(cct,20) << "wait: stopping reaper thread" << dendl;
     lock.Lock();
index 3dbdad0f8c635f0a7bbf1e57da5ca2119c8cdced..2f4685bedc404451e839145cb9654825a7cc49d6 100644 (file)
@@ -305,6 +305,9 @@ private:
   /// internal cluster protocol version, if any, for talking to entities of the same type.
   int cluster_protocol;
 
+  Cond  stop_cond;
+  bool stopped = true;
+
   bool reaper_started, reaper_stop;
   Cond reaper_cond;