From 36324e9fabd9c40216a4f4113a94347b291c3a3b Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Mon, 5 Jun 2023 10:57:37 +0800 Subject: [PATCH] crimson/net: misc cleanups with logs around cross-core Signed-off-by: Yingxin Cheng (cherry picked from commit 77268679631adaad2408eac62d3d599e2acf6059) --- src/crimson/net/ProtocolV2.cc | 14 ++++++++++++++ src/crimson/net/io_handler.cc | 20 ++++++++++++++++++++ src/crimson/net/io_handler.h | 2 ++ 3 files changed, 36 insertions(+) diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index e1ffef2908894..15d3d565dd778 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -155,6 +155,7 @@ ProtocolV2::~ProtocolV2() {} void ProtocolV2::start_connect(const entity_addr_t& _peer_addr, const entity_name_t& _peer_name) { + assert(seastar::this_shard_id() == conn.get_messenger_shard_id()); ceph_assert(state == state_t::NONE); ceph_assert(!gate.is_closed()); conn.peer_addr = _peer_addr; @@ -175,6 +176,7 @@ void ProtocolV2::start_connect(const entity_addr_t& _peer_addr, void ProtocolV2::start_accept(SocketFRef&& new_socket, const entity_addr_t& _peer_addr) { + assert(seastar::this_shard_id() == conn.get_messenger_shard_id()); ceph_assert(state == state_t::NONE); // until we know better conn.target_addr = _peer_addr; @@ -813,7 +815,10 @@ ProtocolV2::client_reconnect() // handle_session_reset() logic auto reset = ResetFrame::Decode(payload->back()); logger().warn("{} GOT ResetFrame: full={}", conn, reset.full()); + reset_session(reset.full()); + // user can make changes + return client_connect(); }); case Tag::WAIT: @@ -1960,9 +1965,12 @@ void ProtocolV2::trigger_replacing(bool reconnect, new_connect_seq, new_msg_seq] () mutable { if (state == state_t::REPLACING && do_reset) { reset_session(true); + // user can make changes } if (unlikely(state != state_t::REPLACING)) { + logger().debug("{} triggered {} in the middle of trigger_replacing(), abort", + conn, get_state_name(state)); ceph_assert_always(state == state_t::CLOSING); return mover.socket->close( ).then([sock = std::move(mover.socket)] { @@ -2038,7 +2046,9 @@ void ProtocolV2::notify_out_fault( std::exception_ptr eptr, io_handler_state _io_states) { + assert(seastar::this_shard_id() == conn.get_messenger_shard_id()); io_states = _io_states; + logger().debug("{} got notify_out_fault(): io_states={}", conn, io_states); fault(state_t::READY, where, eptr); } @@ -2062,6 +2072,8 @@ void ProtocolV2::execute_standby() void ProtocolV2::notify_out() { + assert(seastar::this_shard_id() == conn.get_messenger_shard_id()); + logger().debug("{} got notify_out(): at {}", conn, get_state_name(state)); io_states.is_out_queued = true; if (unlikely(state == state_t::STANDBY && !conn.policy.server)) { logger().info("{} notify_out(): at {}, going to CONNECTING", @@ -2137,6 +2149,8 @@ void ProtocolV2::execute_server_wait() void ProtocolV2::notify_mark_down() { + assert(seastar::this_shard_id() == conn.get_messenger_shard_id()); + logger().debug("{} got notify_mark_down()", conn); do_close(false); } diff --git a/src/crimson/net/io_handler.cc b/src/crimson/net/io_handler.cc index 7072c4d0b2ef8..f04ffff3a0065 100644 --- a/src/crimson/net/io_handler.cc +++ b/src/crimson/net/io_handler.cc @@ -367,6 +367,8 @@ IOHandler::wait_io_exit_dispatching() void IOHandler::reset_session(bool full) { + assert(seastar::this_shard_id() == get_shard_id()); + logger().debug("{} got reset_session({})", conn, full); assert(get_io_state() != io_state_t::open); reset_in(); if (full) { @@ -377,6 +379,8 @@ void IOHandler::reset_session(bool full) void IOHandler::reset_peer_state() { + assert(seastar::this_shard_id() == get_shard_id()); + logger().debug("{} got reset_peer_state()", conn); assert(get_io_state() != io_state_t::open); reset_in(); requeue_out_sent_up_to(0); @@ -385,6 +389,7 @@ void IOHandler::reset_peer_state() void IOHandler::requeue_out_sent() { + assert(seastar::this_shard_id() == get_shard_id()); assert(get_io_state() != io_state_t::open); if (out_sent_msgs.empty()) { return; @@ -407,6 +412,7 @@ void IOHandler::requeue_out_sent() void IOHandler::requeue_out_sent_up_to(seq_num_t seq) { + assert(seastar::this_shard_id() == get_shard_id()); assert(get_io_state() != io_state_t::open); if (out_sent_msgs.empty() && out_pending_msgs.empty()) { logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}", @@ -470,7 +476,10 @@ IOHandler::dispatch_accept( ceph_assert_always(conn_ref); auto _conn_ref = conn_ref; auto fut = to_new_sid(new_sid, std::move(conn_fref)); + dispatchers.ms_handle_accept(_conn_ref, new_sid); + // user can make changes + return fut; } @@ -493,7 +502,10 @@ IOHandler::dispatch_connect( ceph_assert_always(conn_ref); auto _conn_ref = conn_ref; auto fut = to_new_sid(new_sid, std::move(conn_fref)); + dispatchers.ms_handle_connect(_conn_ref, new_sid); + // user can make changes + return fut; } @@ -581,7 +593,9 @@ void IOHandler::dispatch_reset(bool is_replace) } need_dispatch_reset = false; ceph_assert_always(conn_ref); + dispatchers.ms_handle_reset(conn_ref, is_replace); + // user can make changes } void IOHandler::dispatch_remote_reset() @@ -590,7 +604,9 @@ void IOHandler::dispatch_remote_reset() return; } ceph_assert_always(conn_ref); + dispatchers.ms_handle_remote_reset(conn_ref); + // user can make changes } void IOHandler::ack_out_sent(seq_num_t seq) @@ -712,6 +728,7 @@ IOHandler::do_out_dispatch(shard_states_t &ctx) void IOHandler::maybe_notify_out_dispatch() { + ceph_assert_always(seastar::this_shard_id() == get_shard_id()); if (is_out_queued()) { notify_out_dispatch(); } @@ -719,6 +736,7 @@ void IOHandler::maybe_notify_out_dispatch() void IOHandler::notify_out_dispatch() { + ceph_assert_always(seastar::this_shard_id() == get_shard_id()); assert(is_out_queued()); if (need_notify_out) { logger().debug("{} send notify_out()", conn); @@ -853,8 +871,10 @@ IOHandler::read_message( assert(ctx.get_io_state() == io_state_t::open); assert(get_io_state() == io_state_t::open); ceph_assert_always(conn_ref); + // throttle the reading process by the returned future return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref)); + // user can make changes }); } diff --git a/src/crimson/net/io_handler.h b/src/crimson/net/io_handler.h index ecdd02f157335..f478f14296300 100644 --- a/src/crimson/net/io_handler.h +++ b/src/crimson/net/io_handler.h @@ -151,11 +151,13 @@ public: */ void set_handshake_listener(HandshakeListener &hl) { + assert(seastar::this_shard_id() == get_shard_id()); ceph_assert_always(handshake_listener == nullptr); handshake_listener = &hl; } io_handler_state get_states() const { + assert(seastar::this_shard_id() == get_shard_id()); return {in_seq, is_out_queued(), has_out_sent()}; } -- 2.39.5