]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncConnection: make delete_time_event async
authorHaomai Wang <haomai@xsky.com>
Wed, 25 May 2016 09:55:04 +0000 (17:55 +0800)
committerHaomai Wang <haomai@xsky.com>
Wed, 29 Jun 2016 04:14:04 +0000 (12:14 +0800)
Signed-off-by: Haomai Wang <haomai@xsky.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h

index 2e9eb39f05ec1d5f8698ec40a78e4c86a90e7b4f..4f8c0e3157a1291c05e15c3be8833b455eaceb82 100644 (file)
@@ -2447,16 +2447,10 @@ void AsyncConnection::DelayedDelivery::do_request(int id)
   }
 }
 
-class C_flush_messages : public EventCallback {
-  std::deque<std::pair<utime_t, Message*> > delay_queue;
-  AsyncMessenger *msgr;
-  DispatchQueue *dispatch_queue;
-  uint64_t conn_id;
- public:
-  C_flush_messages(std::deque<std::pair<utime_t, Message*> > &&q, AsyncMessenger *m,
-                   DispatchQueue *dq, uint64_t cid)
-      : delay_queue(std::move(q)), msgr(m), dispatch_queue(dq), conn_id(cid) {}
-  void do_request(int id) {
+void AsyncConnection::DelayedDelivery::flush() {
+  EventCenter::submit_to(
+      center->get_id(), [this] () mutable {
+    Mutex::Locker l(delay_lock);
     while (!delay_queue.empty()) {
       Message *m = delay_queue.front().second;
       if (msgr->ms_can_fast_dispatch(m)) {
@@ -2466,17 +2460,10 @@ class C_flush_messages : public EventCallback {
       }
       delay_queue.pop_front();
     }
-    delete this;
-  }
-};
-
-void AsyncConnection::DelayedDelivery::flush() {
-  Mutex::Locker l(delay_lock);
-  center->dispatch_event_external(
-      new C_flush_messages(std::move(delay_queue), msgr, dispatch_queue, conn_id));
-  for (auto i : register_time_events)
-    center->delete_time_event(i);
-  register_time_events.clear();
+    for (auto i : register_time_events)
+      center->delete_time_event(i);
+    register_time_events.clear();
+  }, true);
 }
 
 void AsyncConnection::send_keepalive()
index aac571a04976268b4c455f891b5ea6972f1c7eee..1a458c873e18bcb116872398581fd2cc32163e47 100644 (file)
@@ -160,16 +160,18 @@ class AsyncConnection : public Connection {
       register_time_events.insert(center->create_time_event(delay_period*1000000, this));
     }
     void discard() {
-      Mutex::Locker l(delay_lock);
-      while (!delay_queue.empty()) {
-        Message *m = delay_queue.front().second;
-        dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size());
-        m->put();
-        delay_queue.pop_front();
-      }
-      for (auto i : register_time_events)
-        center->delete_time_event(i);
-      register_time_events.clear();
+      EventCenter::submit_to(center->get_id(), [this] () mutable {
+        Mutex::Locker l(delay_lock);
+        while (!delay_queue.empty()) {
+          Message *m = delay_queue.front().second;
+          dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size());
+          m->put();
+          delay_queue.pop_front();
+        }
+        for (auto i : register_time_events)
+          center->delete_time_event(i);
+        register_time_events.clear();
+      }, true);
     }
     void flush();
   } *delay_state;