From 50bc62a9da8f18fae1cff86ce8c91f6125a96225 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Wed, 26 Apr 2023 17:27:03 +0800 Subject: [PATCH] crimson/net: allow FrameAssemblerV2 to check and shutdown a foreign socket Signed-off-by: Yingxin Cheng --- src/crimson/net/FrameAssemblerV2.cc | 47 ++++++++++++++++++++++++----- src/crimson/net/FrameAssemblerV2.h | 9 +++++- src/crimson/net/ProtocolV2.cc | 9 +++--- src/crimson/net/io_handler.cc | 2 +- 4 files changed, 53 insertions(+), 14 deletions(-) diff --git a/src/crimson/net/FrameAssemblerV2.cc b/src/crimson/net/FrameAssemblerV2.cc index 4a56829493a07..4608f83da533e 100644 --- a/src/crimson/net/FrameAssemblerV2.cc +++ b/src/crimson/net/FrameAssemblerV2.cc @@ -135,7 +135,12 @@ bool FrameAssemblerV2::has_socket() const bool FrameAssemblerV2::is_socket_valid() const { assert(seastar::this_shard_id() == sid); - return has_socket() && !socket->is_shutdown(); +#ifndef NDEBUG + if (has_socket() && socket->get_shard_id() == sid) { + assert(socket->is_shutdown() == is_socket_shutdown); + } +#endif + return has_socket() && !is_socket_shutdown; } SocketFRef FrameAssemblerV2::move_socket() @@ -152,6 +157,7 @@ void FrameAssemblerV2::set_socket(SocketFRef &&new_socket) assert(new_socket); socket = std::move(new_socket); conn.set_socket(socket.get()); + is_socket_shutdown = false; assert(is_socket_valid()); } @@ -162,30 +168,55 @@ void FrameAssemblerV2::learn_socket_ephemeral_port_as_connector(uint16_t port) socket->learn_ephemeral_port_as_connector(port); } -void FrameAssemblerV2::shutdown_socket() +template +void FrameAssemblerV2::shutdown_socket(crimson::common::Gated *gate) { assert(seastar::this_shard_id() == sid); assert(is_socket_valid()); - socket->shutdown(); + is_socket_shutdown = true; + if constexpr (may_cross_core) { + assert(conn.get_messenger_shard_id() == sid); + assert(gate); + gate->dispatch_in_background("shutdown_socket", conn, [this] { + return seastar::smp::submit_to( + socket->get_shard_id(), [this] { + socket->shutdown(); + }); + }); + } else { + assert(socket->get_shard_id() == sid); + assert(!gate); + socket->shutdown(); + } } +template void FrameAssemblerV2::shutdown_socket(crimson::common::Gated *); +template void FrameAssemblerV2::shutdown_socket(crimson::common::Gated *); seastar::future<> FrameAssemblerV2::replace_shutdown_socket(SocketFRef &&new_socket) { assert(seastar::this_shard_id() == sid); assert(has_socket()); - assert(socket->is_shutdown()); + assert(!is_socket_valid()); auto old_socket = move_socket(); + auto old_socket_shard_id = old_socket->get_shard_id(); set_socket(std::move(new_socket)); - return old_socket->close( - ).then([sock = std::move(old_socket)] {}); + return seastar::smp::submit_to( + old_socket_shard_id, + [old_socket = std::move(old_socket)]() mutable { + return old_socket->close( + ).then([sock = std::move(old_socket)] {}); + }); } seastar::future<> FrameAssemblerV2::close_shutdown_socket() { assert(seastar::this_shard_id() == sid); assert(has_socket()); - assert(socket->is_shutdown()); - return socket->close(); + assert(!is_socket_valid()); + return seastar::smp::submit_to( + socket->get_shard_id(), [this] { + return socket->close(); + }); } seastar::future diff --git a/src/crimson/net/FrameAssemblerV2.h b/src/crimson/net/FrameAssemblerV2.h index d31e9c4b78d86..0cc495574c2fe 100644 --- a/src/crimson/net/FrameAssemblerV2.h +++ b/src/crimson/net/FrameAssemblerV2.h @@ -7,6 +7,7 @@ #include "msg/async/crypto_onwire.h" #include "msg/async/compression_onwire.h" +#include "crimson/common/gated.h" #include "crimson/net/Socket.h" namespace crimson::net { @@ -70,7 +71,9 @@ public: void learn_socket_ephemeral_port_as_connector(uint16_t port); - void shutdown_socket(); + // if may_cross_core == true, gate is required for cross-core shutdown + template + void shutdown_socket(crimson::common::Gated *gate); seastar::future<> replace_shutdown_socket(SocketFRef &&); @@ -141,6 +144,10 @@ private: SocketFRef socket; + // checking Socket::is_shutdown() synchronously is impossible when sid is + // different from the socket sid. + bool is_socket_shutdown = false; + seastar::shard_id sid; /* diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index ee8f1a99906f0..ae532acb08374 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -225,6 +225,7 @@ void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state, bool return io_handler.wait_io_exit_dispatching( ).then([this](FrameAssemblerV2Ref fa) { frame_assembler = std::move(fa); + ceph_assert_always(!frame_assembler->is_socket_valid()); exit_io->set_value(); exit_io = std::nullopt; }); @@ -283,7 +284,7 @@ void ProtocolV2::fault( if (likely(has_socket)) { if (likely(is_socket_valid)) { ceph_assert_always(state != state_t::READY); - frame_assembler->shutdown_socket(); + frame_assembler->shutdown_socket(&gate); is_socket_valid = false; } else { ceph_assert_always(state != state_t::ESTABLISHING); @@ -899,7 +900,7 @@ void ProtocolV2::execute_connecting() case next_step_t::wait: { logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn); ceph_assert_always(is_socket_valid); - frame_assembler->shutdown_socket(); + frame_assembler->shutdown_socket(&gate); is_socket_valid = false; execute_wait(true); break; @@ -1723,7 +1724,7 @@ void ProtocolV2::trigger_replacing(bool reconnect, ceph_assert_always(!mover.socket->is_shutdown()); trigger_state(state_t::REPLACING, io_state_t::delay, false); if (is_socket_valid) { - frame_assembler->shutdown_socket(); + frame_assembler->shutdown_socket(&gate); is_socket_valid = false; } gate.dispatch_in_background( @@ -1981,7 +1982,7 @@ void ProtocolV2::do_close( (*f_accept_new)(); } if (is_socket_valid) { - frame_assembler->shutdown_socket(); + frame_assembler->shutdown_socket(&gate); is_socket_valid = false; } assert(!gate.is_closed()); diff --git a/src/crimson/net/io_handler.cc b/src/crimson/net/io_handler.cc index 70e6650c699cb..0c0cdc76a2cda 100644 --- a/src/crimson/net/io_handler.cc +++ b/src/crimson/net/io_handler.cc @@ -203,7 +203,7 @@ void IOHandler::set_io_state( protocol_is_connected = false; assert(fa == nullptr); ceph_assert_always(frame_assembler->is_socket_valid()); - frame_assembler->shutdown_socket(); + frame_assembler->shutdown_socket(nullptr); if (out_dispatching) { ceph_assert_always(!out_exit_dispatching.has_value()); out_exit_dispatching = seastar::promise<>(); -- 2.39.5