From: Piotr Dałek Date: Tue, 5 Apr 2016 07:37:23 +0000 (+0200) Subject: msg/async: add missing DelayedDelivery and delay injection X-Git-Tag: v10.2.3~136^2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=9b199d0b19220bf4adb9b0754f493e7c1ad78a4e;p=ceph.git msg/async: add missing DelayedDelivery and delay injection Delay injection was missing from a few spots, also, DelayedDelivery was added. Fixes: http://tracker.ceph.com/issues/15372 Signed-off-by: Piotr Dałek (cherry picked from commit 49a0c9981bd4bf61b520ece8fb8adfdf7439185b) --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 5e066372d2c..07204783a13 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -176,9 +176,9 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off) } AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c, PerfCounters *p) - : Connection(cct, m), async_msgr(m), logger(p), global_seq(0), connect_seq(0), peer_global_seq(0), - out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(0), sd(-1), port(-1), - write_lock("AsyncConnection::write_lock"), can_write(WriteStatus::NOWRITE), + : Connection(cct, m), delay_thread(NULL), async_msgr(m), logger(p), global_seq(0), connect_seq(0), + peer_global_seq(0), out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(0), sd(-1), + port(-1), write_lock("AsyncConnection::write_lock"), can_write(WriteStatus::NOWRITE), open_write(false), keepalive(false), lock("AsyncConnection::lock"), recv_buf(NULL), recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)), recv_start(0), recv_end(0), got_bad_auth(false), authorizer(NULL), replacing(false), @@ -207,6 +207,18 @@ AsyncConnection::~AsyncConnection() delete[] recv_buf; if (state_buffer) delete[] state_buffer; + if (delay_thread) + delete delay_thread; +} + +void 
AsyncConnection::maybe_start_delay_thread() +{ + if (!delay_thread && + async_msgr->cct->_conf->ms_inject_delay_type.find(ceph_entity_type_name(peer_type)) != string::npos) { + lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Connection " << this << dendl; + delay_thread = new DelayedDelivery(this, async_msgr); + delay_thread->create("ms_async_delay"); + } } /* return -1 means `fd` occurs error or closed, it should be closed @@ -495,6 +507,16 @@ ssize_t AsyncConnection::read_until(unsigned len, char *p) return len - state_offset; } +void AsyncConnection::inject_delay() { + if (async_msgr->cct->_conf->ms_inject_internal_delays) { + ldout(async_msgr->cct, 10) << __func__ << " sleep for " << + async_msgr->cct->_conf->ms_inject_internal_delays << dendl; + utime_t t; + t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays); + t.sleep(); + } +} + void AsyncConnection::process() { ssize_t r = 0; @@ -911,7 +933,15 @@ void AsyncConnection::process() state = STATE_OPEN; async_msgr->ms_fast_preprocess(message); - if (async_msgr->ms_can_fast_dispatch(message)) { + if (delay_thread) { + utime_t release; + if (rand() % 10000 < async_msgr->cct->_conf->ms_inject_delay_probability * 10000.0) { + release = message->get_recv_stamp(); + release += async_msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0; + ldout(async_msgr->cct, 1) << "queue_received will delay until " << release << " on " << message << " " << *message << dendl; + } + delay_thread->queue(release, message); + } else if (async_msgr->ms_can_fast_dispatch(message)) { lock.Unlock(); async_msgr->ms_fast_dispatch(message); lock.Lock(); @@ -1347,7 +1377,7 @@ ssize_t AsyncConnection::_process_connection() if (is_queued()) center->dispatch_event_external(write_handler); write_lock.Unlock(); - + maybe_start_delay_thread(); break; } @@ -1487,7 +1517,7 @@ ssize_t AsyncConnection::_process_connection() r = read_until(sizeof(newly_acked_seq), state_buffer); if (r < 0) { 
ldout(async_msgr->cct, 1) << __func__ << " read ack seq failed" << dendl; - goto fail; + goto fail_registered; } else if (r > 0) { break; } @@ -1509,6 +1539,7 @@ ssize_t AsyncConnection::_process_connection() if (is_queued()) center->dispatch_event_external(write_handler); write_lock.Unlock(); + maybe_start_delay_thread(); break; } @@ -1521,6 +1552,10 @@ ssize_t AsyncConnection::_process_connection() return 0; +fail_registered: + ldout(async_msgr->cct, 10) << "accept fault after register" << dendl; + inject_delay(); + fail: return -1; } @@ -1655,13 +1690,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis lock.Unlock(); AsyncConnectionRef existing = async_msgr->lookup_conn(peer_addr); - if (async_msgr->cct->_conf->ms_inject_internal_delays) { - ldout(msgr->cct, 10) << __func__ << " sleep for " - << async_msgr->cct->_conf->ms_inject_internal_delays << dendl; - utime_t t; - t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays); - t.sleep(); - } + inject_delay(); lock.Lock(); if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) { @@ -1796,12 +1825,12 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis replace: ldout(async_msgr->cct, 10) << __func__ << " accept replacing " << existing << dendl; - if (async_msgr->cct->_conf->ms_inject_internal_delays) { - ldout(msgr->cct, 10) << __func__ << " sleep for " - << async_msgr->cct->_conf->ms_inject_internal_delays << dendl; - utime_t t; - t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays); - t.sleep(); + inject_delay(); + if (existing->delay_thread) { + existing->delay_thread->steal_for_pipe(this); + delay_thread = existing->delay_thread; + existing->delay_thread = NULL; + delay_thread->flush(); } if (existing->policy.lossy) { @@ -1911,14 +1940,8 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis // it's safe that here we don't acquire Connection's lock r = async_msgr->accept_conn(this); - if 
(async_msgr->cct->_conf->ms_inject_internal_delays) { - ldout(msgr->cct, 10) << __func__ << " sleep for " - << async_msgr->cct->_conf->ms_inject_internal_delays << dendl; - utime_t t; - t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays); - t.sleep(); - } - + inject_delay(); + lock.Lock(); replacing = false; if (r < 0) { @@ -1953,15 +1976,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis fail_registered: ldout(async_msgr->cct, 10) << __func__ << " accept fault after register" << dendl; - - if (async_msgr->cct->_conf->ms_inject_internal_delays) { - ldout(async_msgr->cct, 10) << __func__ << " sleep for " - << async_msgr->cct->_conf->ms_inject_internal_delays - << dendl; - utime_t t; - t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays); - t.sleep(); - } + inject_delay(); fail: ldout(async_msgr->cct, 10) << __func__ << " failed to accept." << dendl; @@ -2147,6 +2162,8 @@ void AsyncConnection::fault() ldout(async_msgr->cct, 1) << __func__ << " on lossy channel, failing" << dendl; center->dispatch_event_external(reset_handler); _stop(); + if (delay_thread) + delay_thread->discard(); return ; } @@ -2160,6 +2177,9 @@ void AsyncConnection::fault() can_write = WriteStatus::NOWRITE; open_write = false; + // queue delayed items immediately + if (delay_thread) + delay_thread->flush(); // requeue sent items requeue_sent(); recv_start = recv_end = 0; @@ -2219,6 +2239,8 @@ void AsyncConnection::was_session_reset() ldout(async_msgr->cct,10) << __func__ << " started" << dendl; assert(lock.is_locked()); Mutex::Locker l(write_lock); + if (delay_thread) + delay_thread->discard(); discard_out_queue(); center->dispatch_event_external(remote_reset_handler); @@ -2238,6 +2260,14 @@ void AsyncConnection::was_session_reset() void AsyncConnection::_stop() { assert(lock.is_locked()); + if (delay_thread && delay_thread->is_started()) { + ldout(msgr->cct, 20) << "joining delay_thread" << dendl; + if 
(delay_thread->is_flushing()) { + delay_thread->wait_for_flush(); + } + delay_thread->stop(); + delay_thread->join(); + } if (state == STATE_CLOSED) return ; @@ -2263,6 +2293,7 @@ void AsyncConnection::_stop() center->delete_time_event(*it); // Make sure in-queue events will been processed center->dispatch_event_external(EventCallbackRef(new C_clean_handler(this))); + } void AsyncConnection::prepare_send_message(uint64_t features, Message *m, bufferlist &bl) @@ -2402,6 +2433,77 @@ void AsyncConnection::handle_ack(uint64_t seq) } } +void AsyncConnection::DelayedDelivery::discard() +{ + Mutex::Locker l(delay_lock); + while (!delay_queue.empty()) { + Message *m = delay_queue.front().second; + // TODO: what to use here? + //pipe->async_msgr->dispatch_throttle_release(m->get_dispatch_throttle_size()); + m->put(); + delay_queue.pop_front(); + } +} + +void AsyncConnection::DelayedDelivery::flush() +{ + Mutex::Locker l(delay_lock); + flush_count = delay_queue.size(); + delay_cond.Signal(); +} + +void *AsyncConnection::DelayedDelivery::entry() +{ + Mutex::Locker locker(delay_lock); + + while (!stop_delayed_delivery) { + if (delay_queue.empty()) { + delay_cond.Wait(delay_lock); + continue; + } + utime_t release = delay_queue.front().first; + Message *m = delay_queue.front().second; + string delay_msg_type = connection->async_msgr->cct->_conf->ms_inject_delay_msg_type; + if (!flush_count && + (release > ceph_clock_now(connection->async_msgr->cct) && + (delay_msg_type.empty() || m->get_type_name() == delay_msg_type))) { + delay_cond.WaitUntil(delay_lock, release); + continue; + } + delay_queue.pop_front(); + if (flush_count > 0) { + --flush_count; + active_flush = true; + } + if (connection->async_msgr->ms_can_fast_dispatch(m)) { + if (!stop_fast_dispatching_flag) { + delay_dispatching = true; + delay_lock.Unlock(); + connection->async_msgr->ms_fast_dispatch(m); + delay_lock.Lock(); + delay_dispatching = false; + if (stop_fast_dispatching_flag) { + // we need to let the 
stopping thread proceed + delay_cond.Signal(); + delay_lock.Unlock(); + delay_lock.Lock(); + } + } + } else { + connection->center->dispatch_event_external(EventCallbackRef(new C_handle_dispatch(connection->async_msgr, m))); + } + active_flush = false; + } + return NULL; +} + +void AsyncConnection::DelayedDelivery::stop_fast_dispatching() { + Mutex::Locker l(delay_lock); + stop_fast_dispatching_flag = true; + while (delay_dispatching) + delay_cond.Wait(delay_lock); +} + void AsyncConnection::send_keepalive() { ldout(async_msgr->cct, 10) << __func__ << " started." << dendl; diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 28cf751a412..c141c7b597a 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -77,6 +77,7 @@ class AsyncConnection : public Connection { void handle_ack(uint64_t seq); void _send_keepalive_or_ack(bool ack=false, utime_t *t=NULL); ssize_t write_message(Message *m, bufferlist& bl, bool more); + void inject_delay(); ssize_t _reply_accept(char tag, ceph_msg_connect &connect, ceph_msg_connect_reply &reply, bufferlist &authorizer_reply) { bufferlist reply_bl; @@ -88,8 +89,10 @@ class AsyncConnection : public Connection { reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length()); } ssize_t r = try_send(reply_bl); - if (r < 0) + if (r < 0) { + inject_delay(); return -1; + } state = STATE_ACCEPTING_WAIT_CONNECT_MSG; return 0; @@ -123,10 +126,77 @@ class AsyncConnection : public Connection { assert(write_lock.is_locked()); return !out_q.empty(); } + + /** + * The DelayedDelivery is for injecting delays into Message delivery off + * the socket. It is only enabled if delays are requested, and if they + * are then it pulls Messages off the DelayQueue and puts them into the + * AsyncMessenger event queue. 
+ * This is a nearly direct copy & paste from SimpleMessenger, and as + * such, there was a problem during AsyncConnection shutdown, fixed by + * checking whether delay_thread isn't already stopped and refraining + * from stopping it again. + */ + class DelayedDelivery: public Thread { + AsyncConnection *connection; + std::deque< pair<utime_t,Message*> > delay_queue; + Mutex delay_lock; + Cond delay_cond; + int flush_count; + AsyncMessenger *msgr; + bool active_flush; + bool stop_delayed_delivery; + bool delay_dispatching; // we are in fast dispatch now + bool stop_fast_dispatching_flag; // we need to stop fast dispatching + + public: + explicit DelayedDelivery(AsyncConnection *p, AsyncMessenger *omsgr) + : connection(p), + delay_lock("AsyncConnection::DelayedDelivery::delay_lock"), flush_count(0), + msgr(omsgr), + active_flush(false), + stop_delayed_delivery(false), + delay_dispatching(false), + stop_fast_dispatching_flag(false) { } + ~DelayedDelivery() { discard(); } + void *entry(); + void queue(utime_t release, Message *m) { + Mutex::Locker l(delay_lock); + delay_queue.push_back(make_pair(release, m)); + delay_cond.Signal(); + } + void discard(); + void flush(); + bool is_flushing() { + Mutex::Locker l(delay_lock); + return flush_count > 0 || active_flush; + } + void wait_for_flush() { + Mutex::Locker l(delay_lock); + while (flush_count > 0 || active_flush) + delay_cond.Wait(delay_lock); + } + void stop() { + delay_lock.Lock(); + stop_delayed_delivery = true; + delay_cond.Signal(); + delay_lock.Unlock(); + } + void steal_for_pipe(AsyncConnection *new_owner) { + Mutex::Locker l(delay_lock); + connection = new_owner; + } + /** + * We need to stop fast dispatching before we need to stop putting + * normal messages into the DispatchQueue. + */ + void stop_fast_dispatching(); + } *delay_thread; public: AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c, PerfCounters *p); ~AsyncConnection(); + void maybe_start_delay_thread(); ostream& _conn_prefix(std::ostream *_dout);