From: Haomai Wang Date: Thu, 26 May 2016 02:25:16 +0000 (+0800) Subject: AsyncConnection: ensure delay_state empty when building a new session X-Git-Tag: ses5-milestone5~574^2~7 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8256211e1afb636cd99b287f6fbdb94dba0cab55;p=ceph.git AsyncConnection: ensure delay_state empty when building a new session Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 4f8c0e3157a12..f002be603a163 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -1303,6 +1303,8 @@ ssize_t AsyncConnection::_process_connection() session_security.reset(); } + if (delay_state) + assert(delay_state->ready()); dispatch_queue->queue_connect(this); async_msgr->ms_deliver_handle_fast_connect(this); @@ -1478,6 +1480,8 @@ ssize_t AsyncConnection::_process_connection() state = STATE_OPEN; memset(&connect_msg, 0, sizeof(connect_msg)); + if (delay_state) + assert(delay_state->ready()); // make sure no pending tick timer center->delete_time_event(last_tick_id); last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler); @@ -2427,6 +2431,8 @@ void AsyncConnection::DelayedDelivery::do_request(int id) { Mutex::Locker l(delay_lock); register_time_events.erase(id); + if (stop_dispatch) + return ; if (delay_queue.empty()) return ; utime_t release = delay_queue.front().first; @@ -2448,6 +2454,7 @@ void AsyncConnection::DelayedDelivery::do_request(int id) } void AsyncConnection::DelayedDelivery::flush() { + stop_dispatch = true; EventCenter::submit_to( center->get_id(), [this] () mutable { Mutex::Locker l(delay_lock); @@ -2463,6 +2470,7 @@ void AsyncConnection::DelayedDelivery::flush() { for (auto i : register_time_events) center->delete_time_event(i); register_time_events.clear(); + stop_dispatch = false; }, true); } diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 1a458c873e18b..8bef56c38a05b 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -143,12 +143,14 @@ class AsyncConnection : public Connection { EventCenter *center; DispatchQueue *dispatch_queue; uint64_t conn_id; + std::atomic_bool stop_dispatch; public: explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c, DispatchQueue *q, uint64_t cid) : delay_lock("AsyncConnection::DelayedDelivery::delay_lock"), - msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid) { } + msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid), + stop_dispatch(false) { } ~DelayedDelivery() { assert(register_time_events.empty()); assert(delay_queue.empty()); @@ -160,6 +162,7 @@ class AsyncConnection : public Connection { register_time_events.insert(center->create_time_event(delay_period*1000000, this)); } void discard() { + stop_dispatch = true; EventCenter::submit_to(center->get_id(), [this] () mutable { Mutex::Locker l(delay_lock); while (!delay_queue.empty()) { @@ -171,8 +174,10 @@ class AsyncConnection : public Connection { for (auto i : register_time_events) center->delete_time_event(i); register_time_events.clear(); + stop_dispatch = false; }, true); } + bool ready() const { return !stop_dispatch && delay_queue.empty() && register_time_events.empty(); } void flush(); } *delay_state;