]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr/async: fix forward declaration of DispatchQueue
authorRicardo Dias <rdias@suse.com>
Wed, 27 Jun 2018 10:58:36 +0000 (11:58 +0100)
committerRicardo Dias <rdias@suse.com>
Wed, 19 Sep 2018 12:40:23 +0000 (13:40 +0100)
Signed-off-by: Ricardo Dias <rdias@suse.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h

index 03bd542d4a43a702518c60353c8f5150a836bbfc..e631ff1033f037ff3e9c565c18ca58f67390eda2 100644 (file)
@@ -2392,6 +2392,26 @@ void AsyncConnection::DelayedDelivery::do_request(uint64_t id)
   }
 }
 
+void AsyncConnection::DelayedDelivery::discard() {
+  stop_dispatch = true;
+  center->submit_to(center->get_id(),
+                    [this]() mutable {
+                      std::lock_guard<std::mutex> l(delay_lock);
+                      while (!delay_queue.empty()) {
+                        Message *m = delay_queue.front();
+                        dispatch_queue->dispatch_throttle_release(
+                            m->get_dispatch_throttle_size());
+                        m->put();
+                        delay_queue.pop_front();
+                      }
+                      for (auto i : register_time_events)
+                        center->delete_time_event(i);
+                      register_time_events.clear();
+                      stop_dispatch = false;
+                    },
+                    true);
+}
+
 void AsyncConnection::DelayedDelivery::flush() {
   stop_dispatch = true;
   center->submit_to(
@@ -2546,6 +2566,14 @@ void AsyncConnection::handle_write()
   lock.unlock();
 }
 
+void AsyncConnection::stop(bool queue_reset) {
+  lock.lock();
+  bool need_queue_reset = (state != STATE_CLOSED) && queue_reset;
+  _stop();
+  lock.unlock();
+  if (need_queue_reset) dispatch_queue->queue_reset(this);
+}
+
 void AsyncConnection::wakeup_from(uint64_t id)
 {
   lock.lock();
index a7396f5895b451b1ba7143e67d3cdd5b0a523484..29db0a412f319a117c224ef4f21b711d5f0bfdde 100644 (file)
@@ -35,6 +35,7 @@
 #include "Stack.h"
 
 class AsyncMessenger;
+class DispatchQueue;
 class Worker;
 
 static const int ASYNC_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX);
@@ -162,22 +163,7 @@ class AsyncConnection : public Connection {
       delay_queue.push_back(m);
       register_time_events.insert(center->create_time_event(delay_period*1000000, this));
     }
-    void discard() {
-      stop_dispatch = true;
-      center->submit_to(center->get_id(), [this] () mutable {
-        std::lock_guard<std::mutex> l(delay_lock);
-        while (!delay_queue.empty()) {
-          Message *m = delay_queue.front();
-          dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size());
-          m->put();
-          delay_queue.pop_front();
-        }
-        for (auto i : register_time_events)
-          center->delete_time_event(i);
-        register_time_events.clear();
-        stop_dispatch = false;
-      }, true);
-    }
+    void discard();
     bool ready() const { return !stop_dispatch && delay_queue.empty() && register_time_events.empty(); }
     void flush();
   } *delay_state;
@@ -384,14 +370,7 @@ class AsyncConnection : public Connection {
   void wakeup_from(uint64_t id);
   void tick(uint64_t id);
   void local_deliver();
-  void stop(bool queue_reset) {
-    lock.lock();
-    bool need_queue_reset = (state != STATE_CLOSED) && queue_reset;
-    _stop();
-    lock.unlock();
-    if (need_queue_reset)
-      dispatch_queue->queue_reset(this);
-  }
+  void stop(bool queue_reset);
   void cleanup() {
     shutdown_socket();
     delete read_handler;