From: Haomai Wang Date: Sat, 25 Jun 2016 05:25:51 +0000 (+0800) Subject: msg/simple: wait dispatch_queue until all pipes closed X-Git-Tag: v11.0.1~362^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=4ee47ab06228f7783877c2327ff70bcbc953c49a;p=ceph.git msg/simple: wait dispatch_queue until all pipes closed Currently we use dispatch_queue.wait to wait for SimpleMessenger shutdown, but we need to ensure that DispatchQueue can still process events after the Accepter is down; otherwise the accepter may continue to accept new connections, which may queue new items. So we can't rely on DispatchQueue alone to signal shutdown. Introduce stop_cond and a stopped flag to signal shutdown instead, as AsyncMessenger does. Fixes: http://tracker.ceph.com/issues/16472 Signed-off-by: Haomai Wang --- diff --git a/src/msg/simple/SimpleMessenger.cc b/src/msg/simple/SimpleMessenger.cc index 604b3967295d..4ef2fe139233 100644 --- a/src/msg/simple/SimpleMessenger.cc +++ b/src/msg/simple/SimpleMessenger.cc @@ -87,10 +87,15 @@ int SimpleMessenger::shutdown() { ldout(cct,10) << "shutdown " << get_myaddr() << dendl; mark_down_all(); - dispatch_queue.shutdown(); // break ref cycles on the loopback connection local_connection->set_priv(NULL); + + lock.Lock(); + stop_cond.Signal(); + stopped = true; + lock.Unlock(); + return 0; } @@ -312,6 +317,7 @@ int SimpleMessenger::start() assert(!started); started = true; + stopped = false; if (!did_bind) { my_inst.addr.nonce = nonce; @@ -525,14 +531,10 @@ void SimpleMessenger::wait() lock.Unlock(); return; } - lock.Unlock(); + if (!stopped) + stop_cond.Wait(lock); - if (dispatch_queue.is_started()) { - ldout(cct,10) << "wait: waiting for dispatch queue" << dendl; - dispatch_queue.wait(); - dispatch_queue.discard_local(); - ldout(cct,10) << "wait: dispatch queue is stopped" << dendl; - } + lock.Unlock(); // done! clean up. 
if (did_bind) { @@ -542,6 +544,14 @@ void SimpleMessenger::wait() ldout(cct,20) << "wait: stopped accepter thread" << dendl; } + dispatch_queue.shutdown(); + if (dispatch_queue.is_started()) { + ldout(cct,10) << "wait: waiting for dispatch queue" << dendl; + dispatch_queue.wait(); + dispatch_queue.discard_local(); + ldout(cct,10) << "wait: dispatch queue is stopped" << dendl; + } + if (reaper_started) { ldout(cct,20) << "wait: stopping reaper thread" << dendl; lock.Lock(); diff --git a/src/msg/simple/SimpleMessenger.h b/src/msg/simple/SimpleMessenger.h index 3dbdad0f8c63..2f4685bedc40 100644 --- a/src/msg/simple/SimpleMessenger.h +++ b/src/msg/simple/SimpleMessenger.h @@ -305,6 +305,9 @@ private: /// internal cluster protocol version, if any, for talking to entities of the same type. int cluster_protocol; + Cond stop_cond; + bool stopped = true; + bool reaper_started, reaper_stop; Cond reaper_cond;