]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncConnection: fix delay state using dispatch_queue
authorHaomai Wang <haomai@xsky.com>
Fri, 6 May 2016 16:37:39 +0000 (00:37 +0800)
committerHaomai Wang <haomai@xsky.com>
Mon, 9 May 2016 01:37:24 +0000 (09:37 +0800)
Signed-off-by: Haomai Wang <haomai@xsky.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h

index e67d5b52e7e12ce466484016b324ce0bb01181fa..4c568311ed63b3d050eae905f105312e2ea845d8 100644 (file)
@@ -146,7 +146,7 @@ void AsyncConnection::maybe_start_delay_thread()
   if (!delay_state &&
       async_msgr->cct->_conf->ms_inject_delay_type.find(ceph_entity_type_name(peer_type)) != string::npos) {
     ldout(msgr->cct, 1) << __func__ << " setting up a delay queue" << dendl;
-    delay_state = new DelayedDelivery(async_msgr, center);
+    delay_state = new DelayedDelivery(async_msgr, center, dispatch_queue, conn_id);
   }
 }
 
@@ -2398,24 +2398,28 @@ void AsyncConnection::DelayedDelivery::do_request(int id)
     delay_queue.pop_front();
   }
   if (msgr->ms_can_fast_dispatch(m)) {
-    msgr->ms_fast_dispatch(m);
+    dispatch_queue->fast_dispatch(m);
   } else {
-    msgr->ms_deliver_dispatch(m);
+    dispatch_queue->enqueue(m, m->get_priority(), conn_id);
   }
 }
 
 class C_flush_messages : public EventCallback {
   std::deque<std::pair<utime_t, Message*> > delay_queue;
   AsyncMessenger *msgr;
+  DispatchQueue *dispatch_queue;
+  uint64_t conn_id;
  public:
-  C_flush_messages(std::deque<std::pair<utime_t, Message*> > &&q, AsyncMessenger *m): delay_queue(std::move(q)), msgr(m) {}
+  C_flush_messages(std::deque<std::pair<utime_t, Message*> > &&q, AsyncMessenger *m,
+                   DispatchQueue *dq, uint64_t cid)
+      : delay_queue(std::move(q)), msgr(m), dispatch_queue(dq), conn_id(cid) {}
   void do_request(int id) {
     while (!delay_queue.empty()) {
       Message *m = delay_queue.front().second;
       if (msgr->ms_can_fast_dispatch(m)) {
-        msgr->ms_fast_dispatch(m);
+        dispatch_queue->fast_dispatch(m);
       } else {
-        msgr->ms_deliver_dispatch(m);
+        dispatch_queue->enqueue(m, m->get_priority(), conn_id);
       }
       delay_queue.pop_front();
     }
@@ -2425,7 +2429,8 @@ class C_flush_messages : public EventCallback {
 
 void AsyncConnection::DelayedDelivery::flush() {
   Mutex::Locker l(delay_lock);
-  center->dispatch_event_external(new C_flush_messages(std::move(delay_queue), msgr));
+  center->dispatch_event_external(
+      new C_flush_messages(std::move(delay_queue), msgr, dispatch_queue, conn_id));
   for (auto i : register_time_events)
     center->delete_time_event(i);
   register_time_events.clear();
index 2550283a1459b49b787d872fdf609d8078d62a3f..170f108757846b8af7b7b3d93dce39243349c17c 100644 (file)
@@ -140,12 +140,13 @@ class AsyncConnection : public Connection {
     AsyncMessenger *msgr;
     EventCenter *center;
     DispatchQueue *dispatch_queue;
+    uint64_t conn_id;
 
    public:
     explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c,
-                             DispatchQueue *q)
+                             DispatchQueue *q, uint64_t cid)
       : delay_lock("AsyncConnection::DelayedDelivery::delay_lock"),
-        msgr(omsgr), center(c), dispatch_queue(q) { }
+        msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid) { }
     ~DelayedDelivery() {
       assert(register_time_events.empty());
       assert(delay_queue.empty());