From: Haomai Wang Date: Fri, 6 May 2016 16:37:39 +0000 (+0800) Subject: AsyncConnection: fix delay state using dispatch_queue X-Git-Tag: v11.0.0~138^2~5 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e5109d578b9c2d38656da390022232f427aeb9ed;p=ceph.git AsyncConnection: fix delay state using dispatch_queue Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index e67d5b52e7e1..4c568311ed63 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -146,7 +146,7 @@ void AsyncConnection::maybe_start_delay_thread() if (!delay_state && async_msgr->cct->_conf->ms_inject_delay_type.find(ceph_entity_type_name(peer_type)) != string::npos) { ldout(msgr->cct, 1) << __func__ << " setting up a delay queue" << dendl; - delay_state = new DelayedDelivery(async_msgr, center); + delay_state = new DelayedDelivery(async_msgr, center, dispatch_queue, conn_id); } } @@ -2398,24 +2398,28 @@ void AsyncConnection::DelayedDelivery::do_request(int id) delay_queue.pop_front(); } if (msgr->ms_can_fast_dispatch(m)) { - msgr->ms_fast_dispatch(m); + dispatch_queue->fast_dispatch(m); } else { - msgr->ms_deliver_dispatch(m); + dispatch_queue->enqueue(m, m->get_priority(), conn_id); } } class C_flush_messages : public EventCallback { std::deque > delay_queue; AsyncMessenger *msgr; + DispatchQueue *dispatch_queue; + uint64_t conn_id; public: - C_flush_messages(std::deque > &&q, AsyncMessenger *m): delay_queue(std::move(q)), msgr(m) {} + C_flush_messages(std::deque > &&q, AsyncMessenger *m, + DispatchQueue *dq, uint64_t cid) + : delay_queue(std::move(q)), msgr(m), dispatch_queue(dq), conn_id(cid) {} void do_request(int id) { while (!delay_queue.empty()) { Message *m = delay_queue.front().second; if (msgr->ms_can_fast_dispatch(m)) { - msgr->ms_fast_dispatch(m); + dispatch_queue->fast_dispatch(m); } else { - msgr->ms_deliver_dispatch(m); + dispatch_queue->enqueue(m, m->get_priority(), conn_id); } delay_queue.pop_front(); } @@ -2425,7 +2429,8 @@ class C_flush_messages : public EventCallback { void AsyncConnection::DelayedDelivery::flush() { Mutex::Locker l(delay_lock); - center->dispatch_event_external(new C_flush_messages(std::move(delay_queue), msgr)); + center->dispatch_event_external( + new C_flush_messages(std::move(delay_queue), msgr, dispatch_queue, conn_id)); for (auto i : register_time_events) center->delete_time_event(i); register_time_events.clear(); diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 2550283a1459..170f10875784 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -140,12 +140,13 @@ class AsyncConnection : public Connection { AsyncMessenger *msgr; EventCenter *center; DispatchQueue *dispatch_queue; + uint64_t conn_id; public: explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c, - DispatchQueue *q) + DispatchQueue *q, uint64_t cid) : delay_lock("AsyncConnection::DelayedDelivery::delay_lock"), - msgr(omsgr), center(c), dispatch_queue(q) { } + msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid) { } ~DelayedDelivery() { assert(register_time_events.empty()); assert(delay_queue.empty());