From 73a8562acf585706e9f069f35c069bbee1b28daa Mon Sep 17 00:00:00 2001 From: Jianpeng Ma Date: Fri, 27 Oct 2017 19:11:36 +0800 Subject: [PATCH] msg/async: remove useless pararmeter in DelayedDelivery. Now DelayedDelivery is a timer rather than a thread. So there is no case be called ahead. Signed-off-by: Jianpeng Ma --- src/msg/async/AsyncConnection.cc | 20 +++++--------------- src/msg/async/AsyncConnection.h | 8 ++++---- 2 files changed, 9 insertions(+), 19 deletions(-) diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index cf2f7517b45..a2d4fc15bc7 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -745,15 +745,13 @@ void AsyncConnection::process() auto fast_dispatch_time = ceph::mono_clock::now(); logger->tinc(l_msgr_running_recv_time, fast_dispatch_time - recv_start_time); if (delay_state) { - utime_t release = message->get_recv_stamp(); double delay_period = 0; if (rand() % 10000 < async_msgr->cct->_conf->ms_inject_delay_probability * 10000.0) { delay_period = async_msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0; - release += delay_period; - ldout(async_msgr->cct, 1) << "queue_received will delay until " << release << " on " - << message << " " << *message << dendl; + ldout(async_msgr->cct, 1) << "queue_received will delay after " << (ceph_clock_now() + delay_period) + << " on " << message << " " << *message << dendl; } - delay_state->queue(delay_period, release, message); + delay_state->queue(delay_period, message); } else if (async_msgr->ms_can_fast_dispatch(message)) { lock.unlock(); dispatch_queue->fast_dispatch(message); @@ -2288,15 +2286,7 @@ void AsyncConnection::DelayedDelivery::do_request(uint64_t id) return ; if (delay_queue.empty()) return ; - utime_t release = delay_queue.front().first; - m = delay_queue.front().second; - string delay_msg_type = msgr->cct->_conf->ms_inject_delay_msg_type; - utime_t now = ceph_clock_now(); - if ((release > now && - (delay_msg_type.empty() || m->get_type_name() == delay_msg_type))) { - utime_t t = release - now; - t.sleep(); - } + m = delay_queue.front(); delay_queue.pop_front(); } if (msgr->ms_can_fast_dispatch(m)) { @@ -2312,7 +2302,7 @@ void AsyncConnection::DelayedDelivery::flush() { center->get_id(), [this] () mutable { std::lock_guard l(delay_lock); while (!delay_queue.empty()) { - Message *m = delay_queue.front().second; + Message *m = delay_queue.front(); if (msgr->ms_can_fast_dispatch(m)) { dispatch_queue->fast_dispatch(m); } else { diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 89606b005f4..4ad5ddbed11 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -139,7 +139,7 @@ class AsyncConnection : public Connection { */ class DelayedDelivery : public EventCallback { std::set register_time_events; // need to delete it if stop - std::deque > delay_queue; + std::deque delay_queue; std::mutex delay_lock; AsyncMessenger *msgr; EventCenter *center; @@ -158,9 +158,9 @@ class AsyncConnection : public Connection { } void set_center(EventCenter *c) { center = c; } void do_request(uint64_t id) override; - void queue(double delay_period, utime_t release, Message *m) { + void queue(double delay_period, Message *m) { std::lock_guard l(delay_lock); - delay_queue.push_back(std::make_pair(release, m)); + delay_queue.push_back(m); register_time_events.insert(center->create_time_event(delay_period*1000000, this)); } void discard() { @@ -168,7 +168,7 @@ class AsyncConnection : public Connection { center->submit_to(center->get_id(), [this] () mutable { std::lock_guard l(delay_lock); while (!delay_queue.empty()) { - Message *m = delay_queue.front().second; + Message *m = delay_queue.front(); dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size()); m->put(); delay_queue.pop_front(); -- 2.39.5