From: Haomai Wang Date: Fri, 15 Apr 2016 03:43:42 +0000 (+0800) Subject: AsyncConnection: make delay message happen within original thread X-Git-Tag: v11.0.0~656^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=83f7db58aad2509e1a8742e862d4e8bbfd85c37c;p=ceph.git AsyncConnection: make delay message happen within original thread Fixes: http://tracker.ceph.com/issues/15503 Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 07204783a13..0d7ca05d5d2 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -176,7 +176,7 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off) } AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c, PerfCounters *p) - : Connection(cct, m), delay_thread(NULL), async_msgr(m), logger(p), global_seq(0), connect_seq(0), + : Connection(cct, m), delay_state(NULL), async_msgr(m), logger(p), global_seq(0), connect_seq(0), peer_global_seq(0), out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(0), sd(-1), port(-1), write_lock("AsyncConnection::write_lock"), can_write(WriteStatus::NOWRITE), open_write(false), keepalive(false), lock("AsyncConnection::lock"), recv_buf(NULL), @@ -207,17 +207,15 @@ AsyncConnection::~AsyncConnection() delete[] recv_buf; if (state_buffer) delete[] state_buffer; - if (delay_thread) - delete delay_thread; + assert(!delay_state); } void AsyncConnection::maybe_start_delay_thread() { - if (!delay_thread && + if (!delay_state && async_msgr->cct->_conf->ms_inject_delay_type.find(ceph_entity_type_name(peer_type)) != string::npos) { - lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Connection " << this << dendl; - delay_thread = new DelayedDelivery(this, async_msgr); - delay_thread->create("ms_async_delay"); + ldout(msgr->cct, 1) << __func__ << " setting up a delay queue" << dendl; + delay_state = new DelayedDelivery(async_msgr, center); } } @@ -932,15 +930,20 @@ void AsyncConnection::process() state = STATE_OPEN; + logger->inc(l_msgr_recv_messages); + logger->inc(l_msgr_recv_bytes, message_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer)); + async_msgr->ms_fast_preprocess(message); - if (delay_thread) { - utime_t release; + if (delay_state) { + utime_t release = message->get_recv_stamp(); + double delay_period = 0; if (rand() % 10000 < async_msgr->cct->_conf->ms_inject_delay_probability * 10000.0) { - release = message->get_recv_stamp(); - release += async_msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0; - ldout(async_msgr->cct, 1) << "queue_received will delay until " << release << " on " << message << " " << *message << dendl; + delay_period = async_msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0; + release += delay_period; + ldout(async_msgr->cct, 1) << "queue_received will delay until " << release << " on " + << message << " " << *message << dendl; } - delay_thread->queue(release, message); + delay_state->queue(delay_period, release, message); } else if (async_msgr->ms_can_fast_dispatch(message)) { lock.Unlock(); async_msgr->ms_fast_dispatch(message); @@ -948,9 +951,6 @@ void AsyncConnection::process() } else { center->dispatch_event_external(EventCallbackRef(new C_handle_dispatch(async_msgr, message))); } - logger->inc(l_msgr_recv_messages); - logger->inc(l_msgr_recv_bytes, message_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer)); - break; } @@ -1826,13 +1826,6 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis ldout(async_msgr->cct, 10) << __func__ << " accept replacing " << existing << dendl; inject_delay(); - if (existing->delay_thread) { - existing->delay_thread->steal_for_pipe(this); - delay_thread = existing->delay_thread; - existing->delay_thread = NULL; - delay_thread->flush(); - } - if (existing->policy.lossy) { // disconnect from the Connection existing->center->dispatch_event_external(existing->reset_handler); @@ -1862,6 +1855,10 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis // Clean up output buffer existing->outcoming_bl.clear(); + if (existing->delay_state) { + existing->delay_state->flush(); + assert(!delay_state); + } existing->requeue_sent(); swap(existing->sd, sd); @@ -2162,8 +2159,6 @@ void AsyncConnection::fault() ldout(async_msgr->cct, 1) << __func__ << " on lossy channel, failing" << dendl; center->dispatch_event_external(reset_handler); _stop(); - if (delay_thread) - delay_thread->discard(); return ; } @@ -2178,8 +2173,8 @@ void AsyncConnection::fault() open_write = false; // queue delayed items immediately - if (delay_thread) - delay_thread->flush(); + if (delay_state) + delay_state->flush(); // requeue sent items requeue_sent(); recv_start = recv_end = 0; @@ -2239,8 +2234,8 @@ void AsyncConnection::was_session_reset() ldout(async_msgr->cct,10) << __func__ << " started" << dendl; assert(lock.is_locked()); Mutex::Locker l(write_lock); - if (delay_thread) - delay_thread->discard(); + if (delay_state) + delay_state->discard(); discard_out_queue(); center->dispatch_event_external(remote_reset_handler); @@ -2260,17 +2255,12 @@ void AsyncConnection::was_session_reset() void AsyncConnection::_stop() { assert(lock.is_locked()); - if (delay_thread && delay_thread->is_started()) { - ldout(msgr->cct, 20) << "joining delay_thread" << dendl; - if (delay_thread->is_flushing()) { - delay_thread->wait_for_flush(); - } - delay_thread->stop(); - delay_thread->join(); - } if (state == STATE_CLOSED) return ; + if (delay_state) + delay_state->flush(); + ldout(async_msgr->cct, 1) << __func__ << dendl; Mutex::Locker l(write_lock); if (sd >= 0) @@ -2433,75 +2423,57 @@ void AsyncConnection::handle_ack(uint64_t seq) } } -void AsyncConnection::DelayedDelivery::discard() +void AsyncConnection::DelayedDelivery::do_request(int id) { - Mutex::Locker l(delay_lock); - while (!delay_queue.empty()) { - Message *m = delay_queue.front().second; - // TODO: what to use here? - //pipe->async_msgr->dispatch_throttle_release(m->get_dispatch_throttle_size()); - m->put(); + Message *m = nullptr; + { + Mutex::Locker l(delay_lock); + register_time_events.erase(id); + if (delay_queue.empty()) + return ; + utime_t release = delay_queue.front().first; + m = delay_queue.front().second; + string delay_msg_type = msgr->cct->_conf->ms_inject_delay_msg_type; + utime_t now = ceph_clock_now(msgr->cct); + if ((release > now && + (delay_msg_type.empty() || m->get_type_name() == delay_msg_type))) { + utime_t t = release - now; + t.sleep(); + } delay_queue.pop_front(); } + if (msgr->ms_can_fast_dispatch(m)) { + msgr->ms_fast_dispatch(m); + } else { + msgr->ms_deliver_dispatch(m); + } } -void AsyncConnection::DelayedDelivery::flush() -{ - Mutex::Locker l(delay_lock); - flush_count = delay_queue.size(); - delay_cond.Signal(); -} - -void *AsyncConnection::DelayedDelivery::entry() -{ - Mutex::Locker locker(delay_lock); - - while (!stop_delayed_delivery) { - if (delay_queue.empty()) { - delay_cond.Wait(delay_lock); - continue; - } - utime_t release = delay_queue.front().first; - Message *m = delay_queue.front().second; - string delay_msg_type = connection->async_msgr->cct->_conf->ms_inject_delay_msg_type; - if (!flush_count && - (release > ceph_clock_now(connection->async_msgr->cct) && - (delay_msg_type.empty() || m->get_type_name() == delay_msg_type))) { - delay_cond.WaitUntil(delay_lock, release); - continue; - } - delay_queue.pop_front(); - if (flush_count > 0) { - --flush_count; - active_flush = true; - } - if (connection->async_msgr->ms_can_fast_dispatch(m)) { - if (!stop_fast_dispatching_flag) { - delay_dispatching = true; - delay_lock.Unlock(); - connection->async_msgr->ms_fast_dispatch(m); - delay_lock.Lock(); - delay_dispatching = false; - if (stop_fast_dispatching_flag) { - // we need to let the stopping thread proceed - delay_cond.Signal(); - delay_lock.Unlock(); - delay_lock.Lock(); - } +class C_flush_messages : public EventCallback { + std::deque > delay_queue; + AsyncMessenger *msgr; + public: + C_flush_messages(std::deque > &&q, AsyncMessenger *m): delay_queue(std::move(q)), msgr(m) {} + void do_request(int id) { + while (!delay_queue.empty()) { + Message *m = delay_queue.front().second; + if (msgr->ms_can_fast_dispatch(m)) { + msgr->ms_fast_dispatch(m); + } else { + msgr->ms_deliver_dispatch(m); } - } else { - connection->center->dispatch_event_external(EventCallbackRef(new C_handle_dispatch(connection->async_msgr, m))); + delay_queue.pop_front(); } - active_flush = false; + delete this; } - return NULL; -} +}; -void AsyncConnection::DelayedDelivery::stop_fast_dispatching() { +void AsyncConnection::DelayedDelivery::flush() { Mutex::Locker l(delay_lock); - stop_fast_dispatching_flag = true; - while (delay_dispatching) - delay_cond.Wait(delay_lock); + center->dispatch_event_external(new C_flush_messages(std::move(delay_queue), msgr)); + for (auto i : register_time_events) + center->delete_time_event(i); + register_time_events.clear(); } void AsyncConnection::send_keepalive() diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index c141c7b597a..23577db36b4 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -126,72 +126,47 @@ class AsyncConnection : public Connection { assert(write_lock.is_locked()); return !out_q.empty(); } - + /** * The DelayedDelivery is for injecting delays into Message delivery off * the socket. It is only enabled if delays are requested, and if they * are then it pulls Messages off the DelayQueue and puts them into the * AsyncMessenger event queue. - * This is a nearly direct copy & paste from SimpleMessenger, and as - * such, there was a problem during AsyncConnection shutdown, fixed by - * checking whether delay_thread isn't already stopped and refraining - * from stopping it again. */ - class DelayedDelivery: public Thread { - AsyncConnection *connection; - std::deque< pair > delay_queue; + class DelayedDelivery : public EventCallback { + std::set register_time_events; // need to delete it if stop + std::deque > delay_queue; Mutex delay_lock; - Cond delay_cond; - int flush_count; AsyncMessenger *msgr; - bool active_flush; - bool stop_delayed_delivery; - bool delay_dispatching; // we are in fast dispatch now - bool stop_fast_dispatching_flag; // we need to stop fast dispatching + EventCenter *center; - public: - explicit DelayedDelivery(AsyncConnection *p, AsyncMessenger *omsgr) - : connection(p), - delay_lock("AsyncConnection::DelayedDelivery::delay_lock"), flush_count(0), - msgr(omsgr), - active_flush(false), - stop_delayed_delivery(false), - delay_dispatching(false), - stop_fast_dispatching_flag(false) { } - ~DelayedDelivery() { discard(); } - void *entry(); - void queue(utime_t release, Message *m) { - Mutex::Locker l(delay_lock); - delay_queue.push_back(make_pair(release, m)); - delay_cond.Signal(); + public: + explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c) + : delay_lock("AsyncConnection::DelayedDelivery::delay_lock"), + msgr(omsgr), center(c) { } + ~DelayedDelivery() { + assert(register_time_events.empty()); + assert(delay_queue.empty()); } - void discard(); - void flush(); - bool is_flushing() { + void do_request(int id) override; + void queue(double delay_period, utime_t release, Message *m) { Mutex::Locker l(delay_lock); - return flush_count > 0 || active_flush; + delay_queue.push_back(std::make_pair(release, m)); + register_time_events.insert(center->create_time_event(delay_period*1000000, this)); } - void wait_for_flush() { + void discard() { Mutex::Locker l(delay_lock); - while (flush_count > 0 || active_flush) - delay_cond.Wait(delay_lock); - } - void stop() { - delay_lock.Lock(); - stop_delayed_delivery = true; - delay_cond.Signal(); - delay_lock.Unlock(); - } - void steal_for_pipe(AsyncConnection *new_owner) { - Mutex::Locker l(delay_lock); - connection = new_owner; + while (!delay_queue.empty()) { + Message *m = delay_queue.front().second; + m->put(); + delay_queue.pop_front(); + } + for (auto i : register_time_events) + center->delete_time_event(i); + register_time_events.clear(); } - /** - * We need to stop fast dispatching before we need to stop putting - * normal messages into the DispatchQueue. - */ - void stop_fast_dispatching(); - } *delay_thread; + void flush(); + } *delay_state; public: AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c, PerfCounters *p); @@ -401,6 +376,10 @@ class AsyncConnection : public Connection { delete connect_handler; delete local_deliver_handler; delete wakeup_handler; + if (delay_state) { + delete delay_state; + delay_state = NULL; + } } PerfCounters *get_perf_counter() { return logger;