From: Radoslaw Zarzynski Date: Wed, 2 Oct 2019 11:06:25 +0000 (+0200) Subject: msg/async: reset the TX session handler in proper EventCenter. X-Git-Tag: v15.1.0~880^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8a86719b062cd3b4b6c64a1c027564c092600117;p=ceph.git msg/async: reset the TX session handler in proper EventCenter. Fixes: https://tracker.ceph.com/issues/42026 Signed-off-by: Radoslaw Zarzynski --- diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index 446772f3f4c..fe2158289a5 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -222,12 +222,27 @@ uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) { return count; } +// it's expected the `write_lock` is held while calling this method. void ProtocolV2::reset_recv_state() { - auth_meta.reset(new AuthConnectionMeta); - session_stream_handlers.tx.reset(nullptr); - session_stream_handlers.rx.reset(nullptr); - pre_auth.txbuf.clear(); - pre_auth.rxbuf.clear(); + ldout(cct, 5) << __func__ << dendl; + + // execute in the same thread that uses the rx/tx handlers. We need + // to do the warp because holding `write_lock` is not enough as + // `write_event()` unlocks it just before calling `write_message()`. + // `submit_to()` here is NOT blocking. + connection->center->submit_to(connection->center->get_id(), [this] { + ldout(cct, 5) << "reset_recv_state reseting crypto handlers" << dendl; + + // Possibly unnecessary. See the comment in `deactivate_existing`. + std::lock_guard l(connection->lock); + std::lock_guard wl(connection->write_lock); + + auth_meta.reset(new AuthConnectionMeta); + session_stream_handlers.rx.reset(nullptr); + session_stream_handlers.tx.reset(nullptr); + pre_auth.rxbuf.clear(); + pre_auth.txbuf.clear(); + }, /* nowait = */true); // clean read and write callbacks connection->pendingReadLen.reset(); @@ -2681,16 +2696,19 @@ CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing, } exproto->peer_global_seq = peer_global_seq; + ceph_assert(connection->center->in_thread()); auto temp_cs = std::move(connection->cs); EventCenter *new_center = connection->center; Worker *new_worker = connection->worker; + // we can steal the session_stream_handlers under the assumption + // this happens in the event center's thread as there should be + // no user outside its boundaries (simlarly to e.g. outgoing_bl). + auto temp_stream_handlers = std::move(session_stream_handlers); + exproto->auth_meta = auth_meta; ldout(messenger->cct, 5) << __func__ << " stop myself to swap existing" << dendl; - std::swap(exproto->session_stream_handlers, session_stream_handlers); - exproto->auth_meta = auth_meta; - // avoid _stop shutdown replacing socket // queue a reset on the new connection, which we're dumping for the old stop(); @@ -2711,14 +2729,25 @@ CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing, ceph_assert(connection->recv_start == connection->recv_end); auto deactivate_existing = std::bind( - [existing, new_worker, new_center, exproto](ConnectedSocket &cs) mutable { + [ existing, + new_worker, + new_center, + exproto, + temp_stream_handlers=std::move(temp_stream_handlers) + ](ConnectedSocket &cs) mutable { // we need to delete time event in original thread { std::lock_guard l(existing->lock); existing->write_lock.lock(); exproto->requeue_sent(); + // XXX: do we really need the locking for `outgoing_bl`? There is + // a comment just above its definition saying "lockfree, only used + // in own thread". I'm following lockfull schema just in the case. + // From performance point of view it should be fine – this happens + // far away from hot paths. existing->outgoing_bl.clear(); existing->open_write = false; + exproto->session_stream_handlers = std::move(temp_stream_handlers); existing->write_lock.unlock(); if (exproto->state == NONE) { existing->shutdown_socket();