From f6c73e9f4cbd38a57445e35f56738f05a62767c0 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Mon, 27 Jun 2016 23:23:20 +0800 Subject: [PATCH] msg/async/AsyncMessenger: make sure all connections closed then shutdown dq 1. ensure stop accepter before shutdown dispatcherqueue 2. ensure we don't generate new item after dispatcher queue shutdown Signed-off-by: Haomai Wang --- src/msg/async/AsyncConnection.h | 4 ++-- src/msg/async/AsyncMessenger.cc | 30 +++++++++++++----------------- src/msg/async/AsyncMessenger.h | 8 ++++++-- 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 36c8f2d7de1..af1747951c0 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -367,9 +367,9 @@ class AsyncConnection : public Connection { void process(); void wakeup_from(uint64_t id); void local_deliver(); - void stop() { + void stop(bool queue_reset) { lock.Lock(); - bool need_queue_reset = (state != STATE_CLOSED); + bool need_queue_reset = (state != STATE_CLOSED) && queue_reset; lock.Unlock(); mark_down(); if (need_queue_reset) diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index c2c9d708177..50d48c3ecb0 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -577,16 +577,17 @@ int AsyncMessenger::shutdown() { ldout(cct,10) << __func__ << " " << get_myaddr() << dendl; - // break ref cycles on the loopback connection - processor.stop(); mark_down_all(); - dispatch_queue.shutdown(); + // break ref cycles on the loopback connection local_connection->set_priv(NULL); - pool->barrier(); + // done! clean up. + processor.stop(); + did_bind = false; lock.Lock(); stop_cond.Signal(); - lock.Unlock(); stopped = true; + lock.Unlock(); + pool->barrier(); return 0; } @@ -659,15 +660,7 @@ void AsyncMessenger::wait() lock.Unlock(); - // done! clean up. - ldout(cct,20) << __func__ << ": stopping processor thread" << dendl; - processor.stop(); - did_bind = false; - ldout(cct,20) << __func__ << ": stopped processor thread" << dendl; - - // close all connections - mark_down_all(); - + dispatch_queue.shutdown(); if (dispatch_queue.is_started()) { ldout(cct, 10) << __func__ << ": waiting for dispatch queue" << dendl; dispatch_queue.wait(); @@ -675,6 +668,9 @@ void AsyncMessenger::wait() ldout(cct, 10) << __func__ << ": dispatch queue is stopped" << dendl; } + // close all connections + shutdown_connections(false); + ldout(cct, 10) << __func__ << ": done." << dendl; ldout(cct, 1) << __func__ << " complete." << dendl; started = false; @@ -815,7 +811,7 @@ int AsyncMessenger::send_keepalive(Connection *con) return 0; } -void AsyncMessenger::mark_down_all() +void AsyncMessenger::shutdown_connections(bool queue_reset) { ldout(cct,1) << __func__ << " " << dendl; lock.Lock(); @@ -823,7 +819,7 @@ void AsyncMessenger::mark_down_all() q != accepting_conns.end(); ++q) { AsyncConnectionRef p = *q; ldout(cct, 5) << __func__ << " accepting_conn " << p.get() << dendl; - p->stop(); + p->stop(queue_reset); } accepting_conns.clear(); @@ -833,7 +829,7 @@ void AsyncMessenger::mark_down_all() ldout(cct, 5) << __func__ << " mark down " << it->first << " " << p << dendl; conns.erase(it); p->get_perf_counter()->dec(l_msgr_active_connections); - p->stop(); + p->stop(queue_reset); } { diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index e54d3cd139c..61f8f3b76d1 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -176,8 +176,10 @@ public: ConnectionRef get_connection(const entity_inst_t& dest) override; ConnectionRef get_loopback_connection() override; int send_keepalive(Connection *con); - void mark_down(const entity_addr_t& addr) override; - void mark_down_all() override; + virtual void mark_down(const entity_addr_t& addr) override; + virtual void mark_down_all() override { + shutdown_connections(true); + } /** @} // Connection Management */ /** @@ -348,6 +350,8 @@ private: ms_deliver_handle_fast_connect(local_connection.get()); } + void shutdown_connections(bool queue_reset); + public: /// con used for sending messages to ourselves -- 2.39.5