From 5445f97d0582ec1550896edc10213e837a8e33b8 Mon Sep 17 00:00:00 2001 From: Radoslaw Zarzynski Date: Wed, 2 Oct 2019 13:06:25 +0200 Subject: [PATCH] msg/async: reset the TX session handler in proper EventCenter. Fixes: https://tracker.ceph.com/issues/42026 Signed-off-by: Radoslaw Zarzynski (cherry picked from commit 8a86719b062cd3b4b6c64a1c027564c092600117) --- src/msg/async/ProtocolV2.cc | 47 ++++++++++++++++++++++++++++++------- 1 file changed, 38 insertions(+), 9 deletions(-) diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index 678516f9f9638..94f9b134640cc 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -221,12 +221,27 @@ uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) { return count; } +// it's expected the `write_lock` is held while calling this method. void ProtocolV2::reset_recv_state() { - auth_meta.reset(new AuthConnectionMeta); - session_stream_handlers.tx.reset(nullptr); - session_stream_handlers.rx.reset(nullptr); - pre_auth.txbuf.clear(); - pre_auth.rxbuf.clear(); + ldout(cct, 5) << __func__ << dendl; + + // execute in the same thread that uses the rx/tx handlers. We need + // to do the warp because holding `write_lock` is not enough as + // `write_event()` unlocks it just before calling `write_message()`. + // `submit_to()` here is NOT blocking. + connection->center->submit_to(connection->center->get_id(), [this] { + ldout(cct, 5) << "reset_recv_state reseting crypto handlers" << dendl; + + // Possibly unnecessary. See the comment in `deactivate_existing`. + std::lock_guard l(connection->lock); + std::lock_guard wl(connection->write_lock); + + auth_meta.reset(new AuthConnectionMeta); + session_stream_handlers.rx.reset(nullptr); + session_stream_handlers.tx.reset(nullptr); + pre_auth.rxbuf.clear(); + pre_auth.txbuf.clear(); + }, /* nowait = */true); // clean read and write callbacks connection->pendingReadLen.reset(); @@ -2658,16 +2673,19 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing, } exproto->peer_global_seq = peer_global_seq; + ceph_assert(connection->center->in_thread()); auto temp_cs = std::move(connection->cs); EventCenter *new_center = connection->center; Worker *new_worker = connection->worker; + // we can steal the session_stream_handlers under the assumption + // this happens in the event center's thread as there should be + // no user outside its boundaries (simlarly to e.g. outgoing_bl). + auto temp_stream_handlers = std::move(session_stream_handlers); + exproto->auth_meta = auth_meta; ldout(messenger->cct, 5) << __func__ << " stop myself to swap existing" << dendl; - std::swap(exproto->session_stream_handlers, session_stream_handlers); - exproto->auth_meta = auth_meta; - // avoid _stop shutdown replacing socket // queue a reset on the new connection, which we're dumping for the old stop(); @@ -2688,14 +2706,25 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing, ceph_assert(connection->recv_start == connection->recv_end); auto deactivate_existing = std::bind( - [existing, new_worker, new_center, exproto](ConnectedSocket &cs) mutable { + [ existing, + new_worker, + new_center, + exproto, + temp_stream_handlers=std::move(temp_stream_handlers) + ](ConnectedSocket &cs) mutable { // we need to delete time event in original thread { std::lock_guard l(existing->lock); existing->write_lock.lock(); exproto->requeue_sent(); + // XXX: do we really need the locking for `outgoing_bl`? There is + // a comment just above its definition saying "lockfree, only used + // in own thread". I'm following lockfull schema just in the case. + // From performance point of view it should be fine – this happens + // far away from hot paths. existing->outgoing_bl.clear(); existing->open_write = false; + exproto->session_stream_handlers = std::move(temp_stream_handlers); existing->write_lock.unlock(); if (exproto->state == NONE) { existing->shutdown_socket(); -- 2.39.5