From fb480fd0adf0e961c9a006e1784aa9d4f766fd65 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Wed, 25 May 2016 17:55:04 +0800 Subject: [PATCH] AsyncConnection: make delete_time_event async Signed-off-by: Haomai Wang --- src/msg/async/AsyncConnection.cc | 29 ++++++++--------------------- src/msg/async/AsyncConnection.h | 22 ++++++++++++---------- 2 files changed, 20 insertions(+), 31 deletions(-) diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 2e9eb39f05ec1..4f8c0e3157a12 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -2447,16 +2447,10 @@ void AsyncConnection::DelayedDelivery::do_request(int id) } } -class C_flush_messages : public EventCallback { - std::deque > delay_queue; - AsyncMessenger *msgr; - DispatchQueue *dispatch_queue; - uint64_t conn_id; - public: - C_flush_messages(std::deque > &&q, AsyncMessenger *m, - DispatchQueue *dq, uint64_t cid) - : delay_queue(std::move(q)), msgr(m), dispatch_queue(dq), conn_id(cid) {} - void do_request(int id) { +void AsyncConnection::DelayedDelivery::flush() { + EventCenter::submit_to( + center->get_id(), [this] () mutable { + Mutex::Locker l(delay_lock); while (!delay_queue.empty()) { Message *m = delay_queue.front().second; if (msgr->ms_can_fast_dispatch(m)) { @@ -2466,17 +2460,10 @@ class C_flush_messages : public EventCallback { } delay_queue.pop_front(); } - delete this; - } -}; - -void AsyncConnection::DelayedDelivery::flush() { - Mutex::Locker l(delay_lock); - center->dispatch_event_external( - new C_flush_messages(std::move(delay_queue), msgr, dispatch_queue, conn_id)); - for (auto i : register_time_events) - center->delete_time_event(i); - register_time_events.clear(); + for (auto i : register_time_events) + center->delete_time_event(i); + register_time_events.clear(); + }, true); } void AsyncConnection::send_keepalive() diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index aac571a049762..1a458c873e18b 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -160,16 +160,18 @@ class AsyncConnection : public Connection { register_time_events.insert(center->create_time_event(delay_period*1000000, this)); } void discard() { - Mutex::Locker l(delay_lock); - while (!delay_queue.empty()) { - Message *m = delay_queue.front().second; - dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size()); - m->put(); - delay_queue.pop_front(); - } - for (auto i : register_time_events) - center->delete_time_event(i); - register_time_events.clear(); + EventCenter::submit_to(center->get_id(), [this] () mutable { + Mutex::Locker l(delay_lock); + while (!delay_queue.empty()) { + Message *m = delay_queue.front().second; + dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size()); + m->put(); + delay_queue.pop_front(); + } + for (auto i : register_time_events) + center->delete_time_event(i); + register_time_events.clear(); + }, true); } void flush(); } *delay_state; -- 2.39.5