From: Ricardo Dias Date: Wed, 27 Jun 2018 10:58:36 +0000 (+0100) Subject: msgr/async: fix forward declaration of DispatchQueue X-Git-Tag: v14.0.1~230^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=fed335ff473fa49a21fc69889008425c207460ed;p=ceph.git msgr/async: fix forward declaration of DispatchQueue Signed-off-by: Ricardo Dias --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 03bd542d4a43..e631ff1033f0 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -2392,6 +2392,26 @@ void AsyncConnection::DelayedDelivery::do_request(uint64_t id) } } +void AsyncConnection::DelayedDelivery::discard() { + stop_dispatch = true; + center->submit_to(center->get_id(), + [this]() mutable { + std::lock_guard l(delay_lock); + while (!delay_queue.empty()) { + Message *m = delay_queue.front(); + dispatch_queue->dispatch_throttle_release( + m->get_dispatch_throttle_size()); + m->put(); + delay_queue.pop_front(); + } + for (auto i : register_time_events) + center->delete_time_event(i); + register_time_events.clear(); + stop_dispatch = false; + }, + true); +} + void AsyncConnection::DelayedDelivery::flush() { stop_dispatch = true; center->submit_to( @@ -2546,6 +2566,14 @@ void AsyncConnection::handle_write() lock.unlock(); } +void AsyncConnection::stop(bool queue_reset) { + lock.lock(); + bool need_queue_reset = (state != STATE_CLOSED) && queue_reset; + _stop(); + lock.unlock(); + if (need_queue_reset) dispatch_queue->queue_reset(this); +} + void AsyncConnection::wakeup_from(uint64_t id) { lock.lock(); diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index a7396f5895b4..29db0a412f31 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -35,6 +35,7 @@ #include "Stack.h" class AsyncMessenger; +class DispatchQueue; class Worker; static const int ASYNC_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX); @@ -162,22 +163,7 @@ class AsyncConnection : public Connection { delay_queue.push_back(m); register_time_events.insert(center->create_time_event(delay_period*1000000, this)); } - void discard() { - stop_dispatch = true; - center->submit_to(center->get_id(), [this] () mutable { - std::lock_guard l(delay_lock); - while (!delay_queue.empty()) { - Message *m = delay_queue.front(); - dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size()); - m->put(); - delay_queue.pop_front(); - } - for (auto i : register_time_events) - center->delete_time_event(i); - register_time_events.clear(); - stop_dispatch = false; - }, true); - } + void discard(); bool ready() const { return !stop_dispatch && delay_queue.empty() && register_time_events.empty(); } void flush(); } *delay_state; @@ -384,14 +370,7 @@ class AsyncConnection : public Connection { void wakeup_from(uint64_t id); void tick(uint64_t id); void local_deliver(); - void stop(bool queue_reset) { - lock.lock(); - bool need_queue_reset = (state != STATE_CLOSED) && queue_reset; - _stop(); - lock.unlock(); - if (need_queue_reset) - dispatch_queue->queue_reset(this); - } + void stop(bool queue_reset); void cleanup() { shutdown_socket(); delete read_handler;