From b10f0bd3ccc919bfbe4d063bebabe651faf60812 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Mon, 5 Jun 2023 10:22:29 +0800 Subject: [PATCH] crimson/net: implement logic to move a io-handler to a new sid Note that it is inevitable that the user can mark down the connection while the protocol is trying to move the connection to another core. In that case, the implementation should tolerate the racing, which finally needs to cleanup resources correctly and dispatch reasonable events. Signed-off-by: Yingxin Cheng --- src/crimson/net/FrameAssemblerV2.cc | 8 ++ src/crimson/net/FrameAssemblerV2.h | 12 ++ src/crimson/net/ProtocolV2.cc | 150 ++++++++++++++++++---- src/crimson/net/ProtocolV2.h | 2 + src/crimson/net/io_handler.cc | 192 +++++++++++++++++++++++++--- src/crimson/net/io_handler.h | 21 ++- 6 files changed, 339 insertions(+), 46 deletions(-) diff --git a/src/crimson/net/FrameAssemblerV2.cc b/src/crimson/net/FrameAssemblerV2.cc index f76f67740546d..bb48138a81fb7 100644 --- a/src/crimson/net/FrameAssemblerV2.cc +++ b/src/crimson/net/FrameAssemblerV2.cc @@ -151,6 +151,14 @@ bool FrameAssemblerV2::is_socket_valid() const return has_socket() && !is_socket_shutdown; } +seastar::shard_id +FrameAssemblerV2::get_socket_shard_id() const +{ + assert(seastar::this_shard_id() == sid); + assert(is_socket_valid()); + return socket->get_shard_id(); +} + SocketFRef FrameAssemblerV2::move_socket() { assert(has_socket()); diff --git a/src/crimson/net/FrameAssemblerV2.h b/src/crimson/net/FrameAssemblerV2.h index a4494384ab107..e4af653812d7a 100644 --- a/src/crimson/net/FrameAssemblerV2.h +++ b/src/crimson/net/FrameAssemblerV2.h @@ -26,6 +26,16 @@ public: FrameAssemblerV2(FrameAssemblerV2 &&) = delete; + void set_shard_id(seastar::shard_id _sid) { + assert(seastar::this_shard_id() == sid); + clear(); + sid = _sid; + } + + seastar::shard_id get_shard_id() const { + return sid; + } + void set_is_rev1(bool is_rev1); void create_session_stream_handlers( @@ -67,6 +77,8 @@ public: // the socket exists and not shutdown bool is_socket_valid() const; + seastar::shard_id get_socket_shard_id() const; + void set_socket(SocketFRef &&); void learn_socket_ephemeral_port_as_connector(uint16_t port); diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index b3a25955483e9..6c64db82fd558 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -222,6 +222,9 @@ void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state) FrameAssemblerV2Ref fa; if (new_state == state_t::READY) { assert(new_io_state == io_state_t::open); + assert(io_handler.get_shard_id() == + frame_assembler->get_socket_shard_id()); + frame_assembler->set_shard_id(io_handler.get_shard_id()); fa = std::move(frame_assembler); } else { assert(new_io_state != io_state_t::open); @@ -229,11 +232,19 @@ void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state) io_handler.set_io_state(new_io_state, std::move(fa), need_notify_out); if (pre_state == state_t::READY) { + logger().debug("{} IOHandler::wait_io_exit_dispatching() start ...", conn); assert(new_io_state != io_state_t::open); gate.dispatch_in_background("exit_io", conn, [this] { - return io_handler.wait_io_exit_dispatching( - ).then([this](auto ret) { + return seastar::smp::submit_to( + io_handler.get_shard_id(), [this] { + return io_handler.wait_io_exit_dispatching(); + }).then([this](auto ret) { + logger().debug("{} IOHandler::wait_io_exit_dispatching() finish, {}", + conn, ret.io_states); frame_assembler = std::move(ret.frame_assembler); + assert(seastar::this_shard_id() == conn.get_messenger_shard_id()); + ceph_assert_always( + seastar::this_shard_id() == frame_assembler->get_shard_id()); ceph_assert_always(!frame_assembler->is_socket_valid()); io_states = ret.io_states; pr_exit_io->set_value(); @@ -896,18 +907,43 @@ void ProtocolV2::execute_connecting() } switch (next) { case next_step_t::ready: { - logger().info("{} connected: gs={}, pgs={}, cs={}, " - "client_cookie={}, server_cookie={}, {}", - conn, global_seq, peer_global_seq, connect_seq, - client_cookie, server_cookie, io_states); - io_handler.dispatch_connect(); if (unlikely(state != state_t::CONNECTING)) { - logger().debug("{} triggered {} after ms_handle_connect(), abort", + logger().debug("{} triggered {} before dispatch_connect(), abort", conn, get_state_name(state)); abort_protocol(); } - execute_ready(); - break; + + logger().info("{} connected: gs={}, pgs={}, cs={}, " + "client_cookie={}, server_cookie={}, {}, new_sid={}, " + "IOHandler::dispatch_connect()", + conn, global_seq, peer_global_seq, connect_seq, + client_cookie, server_cookie, io_states, + frame_assembler->get_socket_shard_id()); + + // set io_handler to a new shard + auto new_io_shard = frame_assembler->get_socket_shard_id(); + ConnectionFRef conn_fref = seastar::make_foreign( + conn.shared_from_this()); + ceph_assert_always(!pr_switch_io_shard.has_value()); + pr_switch_io_shard = seastar::shared_promise<>(); + return seastar::smp::submit_to( + io_handler.get_shard_id(), + [this, new_io_shard, conn_fref=std::move(conn_fref)]() mutable { + return io_handler.dispatch_connect( + new_io_shard, std::move(conn_fref)); + }).then([this, new_io_shard] { + ceph_assert_always(io_handler.get_shard_id() == new_io_shard); + pr_switch_io_shard->set_value(); + pr_switch_io_shard = std::nullopt; + // user can make changes + + if (unlikely(state != state_t::CONNECTING)) { + logger().debug("{} triggered {} after dispatch_connect(), abort", + conn, get_state_name(state)); + abort_protocol(); + } + execute_ready(); + }); } case next_step_t::wait: { logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn); @@ -915,7 +951,7 @@ void ProtocolV2::execute_connecting() frame_assembler->shutdown_socket(&gate); is_socket_valid = false; execute_wait(true); - break; + return seastar::now(); } default: { ceph_abort("impossible next step"); @@ -1634,6 +1670,13 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) { ceph_assert_always(is_socket_valid); trigger_state(state_t::ESTABLISHING, io_state_t::delay); if (existing_conn) { + logger().info("{} start establishing: gs={}, pgs={}, cs={}, " + "client_cookie={}, server_cookie={}, {}, new_sid={}, " + "close existing {}", + conn, global_seq, peer_global_seq, connect_seq, + client_cookie, server_cookie, + io_states, frame_assembler->get_socket_shard_id(), + *existing_conn); ProtocolV2 *existing_proto = dynamic_cast( existing_conn->protocol.get()); existing_proto->do_close( @@ -1646,19 +1689,42 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) { abort_protocol(); } } else { + logger().info("{} start establishing: gs={}, pgs={}, cs={}, " + "client_cookie={}, server_cookie={}, {}, new_sid={}, " + "no existing", + conn, global_seq, peer_global_seq, connect_seq, + client_cookie, server_cookie, io_states, + frame_assembler->get_socket_shard_id()); accept_me(); } - io_handler.dispatch_accept(); - if (unlikely(state != state_t::ESTABLISHING)) { - logger().debug("{} triggered {} after ms_handle_accept() during execute_establishing()", - conn, get_state_name(state)); - abort_protocol(); - } - gated_execute("execute_establishing", conn, [this] { ceph_assert_always(state == state_t::ESTABLISHING); - return seastar::futurize_invoke([this] { + + // set io_handler to a new shard + auto new_io_shard = frame_assembler->get_socket_shard_id(); + logger().debug("{} IOHandler::dispatch_accept({})", conn, new_io_shard); + ConnectionFRef conn_fref = seastar::make_foreign( + conn.shared_from_this()); + ceph_assert_always(!pr_switch_io_shard.has_value()); + pr_switch_io_shard = seastar::shared_promise<>(); + return seastar::smp::submit_to( + io_handler.get_shard_id(), + [this, new_io_shard, conn_fref=std::move(conn_fref)]() mutable { + return io_handler.dispatch_accept( + new_io_shard, std::move(conn_fref)); + }).then([this, new_io_shard] { + ceph_assert_always(io_handler.get_shard_id() == new_io_shard); + pr_switch_io_shard->set_value(); + pr_switch_io_shard = std::nullopt; + // user can make changes + + if (unlikely(state != state_t::ESTABLISHING)) { + logger().debug("{} triggered {} after dispatch_accept() during execute_establishing()", + conn, get_state_name(state)); + abort_protocol(); + } + return send_server_ident(); }).then([this] { if (unlikely(state != state_t::ESTABLISHING)) { @@ -1666,10 +1732,7 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) { conn, get_state_name(state)); abort_protocol(); } - logger().info("{} established: gs={}, pgs={}, cs={}, " - "client_cookie={}, server_cookie={}, {}", - conn, global_seq, peer_global_seq, connect_seq, - client_cookie, server_cookie, io_states); + logger().info("{} established, going to ready", conn); execute_ready(); }).handle_exception([this](std::exception_ptr eptr) { fault(state_t::ESTABLISHING, "execute_establishing", eptr); @@ -1741,6 +1804,12 @@ void ProtocolV2::trigger_replacing(bool reconnect, ceph_assert_always(state <= state_t::WAIT); ceph_assert_always(has_socket || state == state_t::CONNECTING); ceph_assert_always(!mover.socket->is_shutdown()); + + logger().info("{} start replacing ({}): pgs was {}, cs was {}, " + "client_cookie was {}, {}, new_sid={}", + conn, reconnect ? "reconnected" : "connected", + peer_global_seq, connect_seq, client_cookie, + io_states, mover.socket->get_shard_id()); trigger_state(state_t::REPLACING, io_state_t::delay); if (is_socket_valid) { frame_assembler->shutdown_socket(&gate); @@ -1759,15 +1828,46 @@ void ProtocolV2::trigger_replacing(bool reconnect, new_peer_global_seq, new_connect_seq, new_msg_seq] () mutable { ceph_assert_always(state == state_t::REPLACING); - io_handler.dispatch_accept(); - // state may become CLOSING, close mover.socket and abort later + auto new_io_shard = mover.socket->get_shard_id(); + // state may become CLOSING below, but we cannot abort the chain until + // mover.socket is correctly handled (closed or replaced). + return wait_exit_io( ).then([this] { + if (unlikely(state != state_t::REPLACING)) { + ceph_assert_always(state == state_t::CLOSING); + return seastar::now(); + } + ceph_assert_always(frame_assembler); protocol_timer.cancel(); auto done = std::move(execution_done); execution_done = seastar::now(); return done; + }).then([this, new_io_shard] { + if (unlikely(state != state_t::REPLACING)) { + ceph_assert_always(state == state_t::CLOSING); + return seastar::now(); + } + + // set io_handler to a new shard + // we should prevent parallel switching core attemps + logger().debug("{} IOHandler::dispatch_accept({})", conn, new_io_shard); + ConnectionFRef conn_fref = seastar::make_foreign( + conn.shared_from_this()); + ceph_assert_always(!pr_switch_io_shard.has_value()); + pr_switch_io_shard = seastar::shared_promise<>(); + return seastar::smp::submit_to( + io_handler.get_shard_id(), + [this, new_io_shard, conn_fref=std::move(conn_fref)]() mutable { + return io_handler.dispatch_accept( + new_io_shard, std::move(conn_fref)); + }).then([this, new_io_shard] { + ceph_assert_always(io_handler.get_shard_id() == new_io_shard); + pr_switch_io_shard->set_value(); + pr_switch_io_shard = std::nullopt; + // user can make changes + }); }).then([this, reconnect, do_reset, diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index b1767f7da0108..d74425344b4c6 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -227,6 +227,8 @@ private: FrameAssemblerV2Ref frame_assembler; + std::optional> pr_switch_io_shard; + std::optional> pr_exit_io; AuthConnectionMetaRef auth_meta; diff --git a/src/crimson/net/io_handler.cc b/src/crimson/net/io_handler.cc index 54ef67356ab0f..576d91b7a15e4 100644 --- a/src/crimson/net/io_handler.cc +++ b/src/crimson/net/io_handler.cc @@ -57,7 +57,9 @@ IOHandler::IOHandler(ChainedDispatchers &dispatchers, IOHandler::~IOHandler() { // close_io() must be finished - ceph_assert_always(shard_states->assert_closed_and_exit()); + ceph_assert_always(maybe_prv_shard_states == nullptr); + // should be true in the according shard + // ceph_assert_always(shard_states->assert_closed_and_exit()); assert(!conn_ref); } @@ -234,6 +236,19 @@ void IOHandler::print_io_stat(std::ostream &out) const << ")"; } +void IOHandler::assign_frame_assembler(FrameAssemblerV2Ref fa) +{ + assert(fa != nullptr); + ceph_assert_always(frame_assembler == nullptr); + frame_assembler = std::move(fa); + ceph_assert_always( + frame_assembler->get_shard_id() == get_shard_id()); + // should have been set through dispatch_accept/connect() + ceph_assert_always( + frame_assembler->get_socket_shard_id() == get_shard_id()); + ceph_assert_always(frame_assembler->is_socket_valid()); +} + void IOHandler::set_io_state( io_state_t new_state, FrameAssemblerV2Ref fa, @@ -241,20 +256,32 @@ void IOHandler::set_io_state( { ceph_assert_always(seastar::this_shard_id() == get_shard_id()); auto prv_state = get_io_state(); + logger().debug("{} got set_io_state(): prv_state={}, new_state={}, " + "fa={}, set_notify_out={}, at {}", + conn, prv_state, new_state, + fa ? "present" : "N/A", set_notify_out, + io_stat_printer{*this}); ceph_assert_always(!( (new_state == io_state_t::none && prv_state != io_state_t::none) || - (new_state == io_state_t::open && prv_state == io_state_t::open) || - (new_state != io_state_t::drop && prv_state == io_state_t::drop) + (new_state == io_state_t::open && prv_state == io_state_t::open) )); + if (prv_state == io_state_t::drop) { + // only possible due to a racing mark_down() from user + if (new_state == io_state_t::open) { + assign_frame_assembler(std::move(fa)); + frame_assembler->shutdown_socket(nullptr); + } else { + assert(fa == nullptr); + } + return; + } + bool dispatch_in = false; if (new_state == io_state_t::open) { // to open ceph_assert_always(protocol_is_connected == true); - assert(fa != nullptr); - ceph_assert_always(frame_assembler == nullptr); - frame_assembler = std::move(fa); - ceph_assert_always(frame_assembler->is_socket_valid()); + assign_frame_assembler(std::move(fa)); dispatch_in = true; #ifdef UNIT_TESTS_BUILT if (conn.interceptor) { @@ -301,13 +328,31 @@ void IOHandler::set_io_state( seastar::future IOHandler::wait_io_exit_dispatching() { + assert(seastar::this_shard_id() == get_shard_id()); + logger().debug("{} got wait_io_exit_dispatching()", conn); ceph_assert_always(get_io_state() != io_state_t::open); ceph_assert_always(frame_assembler != nullptr); ceph_assert_always(!frame_assembler->is_socket_valid()); - return shard_states->wait_io_exit_dispatching( - ).then([this] { + return seastar::futurize_invoke([this] { + // cannot be running in parallel with to_new_sid() + if (maybe_dropped_sid.has_value()) { + ceph_assert_always(get_io_state() == io_state_t::drop); + assert(shard_states->assert_closed_and_exit()); + auto prv_sid = *maybe_dropped_sid; + return seastar::smp::submit_to(prv_sid, [this] { + logger().debug("{} got wait_io_exit_dispatching from prv_sid", conn); + assert(maybe_prv_shard_states != nullptr); + return maybe_prv_shard_states->wait_io_exit_dispatching(); + }); + } else { + return shard_states->wait_io_exit_dispatching(); + } + }).then([this] { + logger().debug("{} finish wait_io_exit_dispatching at {}", + conn, io_stat_printer{*this}); ceph_assert_always(frame_assembler != nullptr); ceph_assert_always(!frame_assembler->is_socket_valid()); + frame_assembler->set_shard_id(conn.get_messenger_shard_id()); return exit_dispatching_ret{ std::move(frame_assembler), get_states()}; @@ -399,27 +444,127 @@ void IOHandler::discard_out_sent() out_sent_msgs.clear(); } -void IOHandler::dispatch_accept() +seastar::future<> +IOHandler::dispatch_accept( + seastar::shard_id new_sid, + ConnectionFRef conn_fref) { + ceph_assert_always(seastar::this_shard_id() == get_shard_id()); + logger().debug("{} got dispatch_accept({}) at {}", + conn, new_sid, io_stat_printer{*this}); if (get_io_state() == io_state_t::drop) { - return; + assert(!protocol_is_connected); + // it is possible that both io_handler and protocolv2 are + // trying to close each other from different cores simultaneously. + return to_new_sid(new_sid, std::move(conn_fref)); } // protocol_is_connected can be from true to true here if the replacing is // happening to a connected connection. protocol_is_connected = true; ceph_assert_always(conn_ref); - dispatchers.ms_handle_accept(conn_ref, get_shard_id()); + auto _conn_ref = conn_ref; + auto fut = to_new_sid(new_sid, std::move(conn_fref)); + dispatchers.ms_handle_accept(_conn_ref, new_sid); + return fut; } -void IOHandler::dispatch_connect() +seastar::future<> +IOHandler::dispatch_connect( + seastar::shard_id new_sid, + ConnectionFRef conn_fref) { + ceph_assert_always(seastar::this_shard_id() == get_shard_id()); + logger().debug("{} got dispatch_connect({}) at {}", + conn, new_sid, io_stat_printer{*this}); if (get_io_state() == io_state_t::drop) { - return; + assert(!protocol_is_connected); + // it is possible that both io_handler and protocolv2 are + // trying to close each other from different cores simultaneously. + return to_new_sid(new_sid, std::move(conn_fref)); } ceph_assert_always(protocol_is_connected == false); protocol_is_connected = true; ceph_assert_always(conn_ref); - dispatchers.ms_handle_connect(conn_ref, get_shard_id()); + auto _conn_ref = conn_ref; + auto fut = to_new_sid(new_sid, std::move(conn_fref)); + dispatchers.ms_handle_connect(_conn_ref, new_sid); + return fut; +} + +seastar::future<> +IOHandler::cleanup_prv_shard(seastar::shard_id prv_sid) +{ + assert(seastar::this_shard_id() == get_shard_id()); + return seastar::smp::submit_to(prv_sid, [this] { + logger().debug("{} got cleanup_prv_shard()", conn); + assert(maybe_prv_shard_states != nullptr); + auto ref_prv_states = std::move(maybe_prv_shard_states); + auto &prv_states = *ref_prv_states; + return prv_states.close( + ).then([ref_prv_states=std::move(ref_prv_states)] { + ceph_assert_always(ref_prv_states->assert_closed_and_exit()); + }); + }).then([this] { + ceph_assert_always(maybe_prv_shard_states == nullptr); + }); +} + +seastar::future<> +IOHandler::to_new_sid( + seastar::shard_id new_sid, + ConnectionFRef conn_fref) +{ + /* + * Note: + * - It must be called before user is aware of the new core (through dispatching); + * - Messenger must wait the returned future for futher operations to prevent racing; + * - In general, the below submitted continuation should be the first one from the prv sid + * to the new sid; + */ + + assert(seastar::this_shard_id() == get_shard_id()); + bool is_dropped = false; + if (get_io_state() == io_state_t::drop) { + is_dropped = true; + } + ceph_assert_always(get_io_state() != io_state_t::open); + + // apply the switching atomically + ceph_assert_always(conn_ref); + conn_ref.reset(); + auto prv_sid = get_shard_id(); + ceph_assert_always(maybe_prv_shard_states == nullptr); + maybe_prv_shard_states = std::move(shard_states); + shard_states = shard_states_t::create_from_previous( + *maybe_prv_shard_states, new_sid); + assert(new_sid == get_shard_id()); + + return seastar::smp::submit_to(new_sid, + [this, is_dropped, prv_sid, conn_fref=std::move(conn_fref)]() mutable { + logger().debug("{} see new_sid in io_handler(new_sid) from {}, is_dropped={}", + conn, prv_sid, is_dropped); + + ceph_assert_always(seastar::this_shard_id() == get_shard_id()); + ceph_assert_always(get_io_state() != io_state_t::open); + ceph_assert_always(!maybe_dropped_sid.has_value()); + + ceph_assert_always(!conn_ref); + conn_ref = make_local_shared_foreign(std::move(conn_fref)); + + if (is_dropped) { + // the follow up cleanups will be done in the prv_sid + ceph_assert_always(shard_states->assert_closed_and_exit()); + maybe_dropped_sid = prv_sid; + } else { + // may be at io_state_t::drop + // cleanup the prvious shard + shard_states->dispatch_in_background( + "cleanup_prv_sid", conn, [this, prv_sid] { + return cleanup_prv_shard(prv_sid); + }); + maybe_notify_out_dispatch(); + } + }); } void IOHandler::dispatch_reset(bool is_replace) @@ -806,8 +951,12 @@ void IOHandler::do_in_dispatch() seastar::future<> IOHandler::close_io(bool is_dispatch_reset, bool is_replace) { + ceph_assert_always(seastar::this_shard_id() == get_shard_id()); ceph_assert_always(get_io_state() == io_state_t::drop); + logger().debug("{} got close_io(reset={}, replace={})", + conn, is_dispatch_reset, is_replace); + if (is_dispatch_reset) { dispatch_reset(is_replace); } @@ -815,10 +964,17 @@ IOHandler::close_io(bool is_dispatch_reset, bool is_replace) ceph_assert_always(conn_ref); conn_ref.reset(); - return shard_states->close( - ).then([this] { + // cannot be running in parallel with to_new_sid() + if (maybe_dropped_sid.has_value()) { assert(shard_states->assert_closed_and_exit()); - }); + auto prv_sid = *maybe_dropped_sid; + return cleanup_prv_shard(prv_sid); + } else { + return shard_states->close( + ).then([this] { + assert(shard_states->assert_closed_and_exit()); + }); + } } /* diff --git a/src/crimson/net/io_handler.h b/src/crimson/net/io_handler.h index 07f4c1cb4262a..acb171bde282e 100644 --- a/src/crimson/net/io_handler.h +++ b/src/crimson/net/io_handler.h @@ -108,7 +108,7 @@ public: /* * as ConnectionHandler */ -private: +public: seastar::shard_id get_shard_id() const final { return shard_states->get_shard_id(); } @@ -196,9 +196,11 @@ public: void requeue_out_sent(); - void dispatch_accept(); + seastar::future<> dispatch_accept( + seastar::shard_id new_sid, ConnectionFRef); - void dispatch_connect(); + seastar::future<> dispatch_connect( + seastar::shard_id new_sid, ConnectionFRef); private: class shard_states_t; @@ -339,6 +341,8 @@ public: return shard_states->get_io_state(); } + void assign_frame_assembler(FrameAssemblerV2Ref); + seastar::future<> send_redirected(MessageFRef msg); seastar::future<> do_send(MessageFRef msg); @@ -347,6 +351,9 @@ public: seastar::future<> do_send_keepalive(); + seastar::future<> to_new_sid( + seastar::shard_id new_sid, ConnectionFRef); + void dispatch_reset(bool is_replace); void dispatch_remote_reset(); @@ -388,9 +395,17 @@ public: void do_in_dispatch(); + seastar::future<> cleanup_prv_shard(seastar::shard_id prv_sid); + private: shard_states_ref_t shard_states; + // drop was happening in the previous sid + std::optional maybe_dropped_sid; + + // the remaining states in the previous sid for cleanup, see to_new_sid() + shard_states_ref_t maybe_prv_shard_states; + ChainedDispatchers &dispatchers; SocketConnection &conn; -- 2.39.5