From 6a20a68aaf26be39fca10ed10134b7863078a105 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Wed, 7 Dec 2022 10:08:18 +0800 Subject: [PATCH] crimson/net: introduce IOHandler class for message and event dispatching IOHandler also represents the Connection as ConnectionHandler. ProtocolV2 and IOHandler will be finally running in different cores, as ProtocolV2 will need to call IOHandler interfaces asynchronously. And IOHandler will also notify ProtocolV2 through HandshakeListener asynchronously. Signed-off-by: Yingxin Cheng --- src/crimson/CMakeLists.txt | 2 +- src/crimson/net/ProtocolV2.cc | 62 ++++----- src/crimson/net/ProtocolV2.h | 78 +++++++---- src/crimson/net/SocketConnection.cc | 20 +-- src/crimson/net/SocketConnection.h | 44 ++++++- .../net/{Protocol.cc => io_handler.cc} | 62 +++++---- src/crimson/net/{Protocol.h => io_handler.h} | 123 +++++++++++------- 7 files changed, 240 insertions(+), 151 deletions(-) rename src/crimson/net/{Protocol.cc => io_handler.cc} (94%) rename src/crimson/net/{Protocol.h => io_handler.h} (61%) diff --git a/src/crimson/CMakeLists.txt b/src/crimson/CMakeLists.txt index cd5d1fb0cd8a3..68e1b64beb9db 100644 --- a/src/crimson/CMakeLists.txt +++ b/src/crimson/CMakeLists.txt @@ -176,11 +176,11 @@ set(crimson_net_srcs ${PROJECT_SOURCE_DIR}/src/msg/async/frames_v2.cc net/Errors.cc net/FrameAssemblerV2.cc + net/io_handler.cc net/Messenger.cc net/SocketConnection.cc net/SocketMessenger.cc net/Socket.cc - net/Protocol.cc net/ProtocolV2.cc net/chained_dispatchers.cc) add_library(crimson STATIC diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index b4a5767d7fa26..0b9d4be2d257d 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -12,9 +12,9 @@ #include "crimson/auth/AuthClient.h" #include "crimson/auth/AuthServer.h" #include "crimson/common/formatter.h" +#include "crimson/common/log.h" #include "Errors.h" -#include "SocketConnection.h" #include "SocketMessenger.h" #ifdef UNIT_TESTS_BUILT @@ -23,6 +23,8 @@ using namespace ceph::msgr::v2; using crimson::common::local_conf; +using io_state_t = crimson::net::IOHandler::io_state_t; +using io_stat_printer = crimson::net::IOHandler::io_stat_printer; namespace { @@ -154,11 +156,11 @@ seastar::future<> ProtocolV2::Timer::backoff(double seconds) }); } -ProtocolV2::ProtocolV2(ChainedDispatchers& dispatchers, - SocketConnection& conn) - : Protocol(dispatchers, conn), - conn{conn}, +ProtocolV2::ProtocolV2(SocketConnection& conn, + IOHandler &io_handler) + : conn{conn}, messenger{conn.messenger}, + io_handler{io_handler}, frame_assembler{FrameAssemblerV2::create(conn)}, auth_meta{seastar::make_lw_shared()}, protocol_timer{conn} @@ -225,9 +227,9 @@ void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state, bool if (new_state == state_t::READY) { // I'm not responsible to shutdown the socket at READY is_socket_valid = false; - set_io_state(new_io_state, std::move(frame_assembler)); + io_handler.set_io_state(new_io_state, std::move(frame_assembler)); } else { - set_io_state(new_io_state, nullptr); + io_handler.set_io_state(new_io_state, nullptr); } /* @@ -236,7 +238,7 @@ void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state, bool if (pre_state == state_t::READY) { gate.dispatch_in_background("exit_io", conn, [this] { - return wait_io_exit_dispatching( + return io_handler.wait_io_exit_dispatching( ).then([this](FrameAssemblerV2Ref fa) { frame_assembler = std::move(fa); exit_io->set_value(); @@ -308,20 +310,20 @@ void ProtocolV2::fault( } if (conn.policy.server || - (conn.policy.standby && !is_out_queued_or_sent())) { + (conn.policy.standby && !io_handler.is_out_queued_or_sent())) { if (conn.policy.server) { logger().info("{} protocol {} {} fault as server, going to STANDBY {} -- {}", conn, get_state_name(state), where, - io_stat_printer{*this}, + io_stat_printer{io_handler}, e_what); } else { logger().info("{} protocol {} {} fault with nothing to send, going to STANDBY {} -- {}", conn, get_state_name(state), where, - io_stat_printer{*this}, + io_stat_printer{io_handler}, e_what); } execute_standby(); @@ -331,7 +333,7 @@ void ProtocolV2::fault( conn, get_state_name(state), where, - io_stat_printer{*this}, + io_stat_printer{io_handler}, e_what); execute_wait(false); } else { @@ -341,7 +343,7 @@ void ProtocolV2::fault( conn, get_state_name(state), where, - io_stat_printer{*this}, + io_stat_printer{io_handler}, e_what); execute_connecting(); } @@ -355,7 +357,7 @@ void ProtocolV2::reset_session(bool full) client_cookie = generate_client_cookie(); peer_global_seq = 0; } - do_reset_session(full); + io_handler.reset_session(full); } seastar::future> @@ -633,7 +635,7 @@ ProtocolV2::client_connect() return frame_assembler->read_frame_payload( ).then([this](auto payload) { // handle_server_ident() logic - requeue_out_sent(); + io_handler.requeue_out_sent(); auto server_ident = ServerIdentFrame::Decode(payload->back()); logger().debug("{} GOT ServerIdentFrame:" " addrs={}, gid={}, gs={}," @@ -709,12 +711,12 @@ ProtocolV2::client_reconnect() server_cookie, global_seq, connect_seq, - get_in_seq()); + io_handler.get_in_seq()); logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={}," " server_cookie={}, gs={}, cs={}, in_seq={}", conn, messenger.get_myaddrs(), client_cookie, server_cookie, - global_seq, connect_seq, get_in_seq()); + global_seq, connect_seq, io_handler.get_in_seq()); return frame_assembler->write_flush_frame(reconnect).then([this] { return frame_assembler->read_main_preamble(); }).then([this](auto ret) { @@ -764,7 +766,7 @@ ProtocolV2::client_reconnect() auto reconnect_ok = ReconnectOkFrame::Decode(payload->back()); logger().debug("{} GOT ReconnectOkFrame: msg_seq={}", conn, reconnect_ok.msg_seq()); - requeue_out_sent_up_to(reconnect_ok.msg_seq()); + io_handler.requeue_out_sent_up_to(reconnect_ok.msg_seq()); return seastar::make_ready_future(next_step_t::ready); }); default: { @@ -872,8 +874,8 @@ void ProtocolV2::execute_connecting() "client_cookie={}, server_cookie={}, {}", conn, global_seq, peer_global_seq, connect_seq, client_cookie, server_cookie, - io_stat_printer{*this}); - dispatch_connect(); + io_stat_printer{io_handler}); + io_handler.dispatch_connect(); if (unlikely(state != state_t::CONNECTING)) { logger().debug("{} triggered {} after ms_handle_connect(), abort", conn, get_state_name(state)); @@ -1593,7 +1595,7 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) { accept_me(); } - dispatch_accept(); + io_handler.dispatch_accept(); if (unlikely(state != state_t::ESTABLISHING)) { logger().debug("{} triggered {} after ms_handle_accept() during execute_establishing()", conn, get_state_name(state)); @@ -1613,7 +1615,7 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) { "client_cookie={}, server_cookie={}, {}", conn, global_seq, peer_global_seq, connect_seq, client_cookie, server_cookie, - io_stat_printer{*this}); + io_stat_printer{io_handler}); execute_ready(); }).handle_exception([this](std::exception_ptr eptr) { fault(state_t::ESTABLISHING, "execute_establishing", eptr); @@ -1633,8 +1635,8 @@ ProtocolV2::send_server_ident() logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq); // this is required for the case when this connection is being replaced - requeue_out_sent_up_to(0); - do_reset_session(false); + io_handler.requeue_out_sent_up_to(0); + io_handler.reset_session(false); if (!conn.policy.lossy) { server_cookie = ceph::util::generate_random_number(1, -1ll); @@ -1699,7 +1701,7 @@ void ProtocolV2::trigger_replacing(bool reconnect, new_peer_global_seq, new_connect_seq, new_msg_seq] () mutable { ceph_assert_always(state == state_t::REPLACING); - dispatch_accept(); + io_handler.dispatch_accept(); // state may become CLOSING, close mover.socket and abort later return wait_exit_io( ).then([this] { @@ -1742,9 +1744,9 @@ void ProtocolV2::trigger_replacing(bool reconnect, if (reconnect) { connect_seq = new_connect_seq; // send_reconnect_ok() logic - requeue_out_sent_up_to(new_msg_seq); - auto reconnect_ok = ReconnectOkFrame::Encode(get_in_seq()); - logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, get_in_seq()); + io_handler.requeue_out_sent_up_to(new_msg_seq); + auto reconnect_ok = ReconnectOkFrame::Encode(io_handler.get_in_seq()); + logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, io_handler.get_in_seq()); return frame_assembler->write_flush_frame(reconnect_ok); } else { client_cookie = new_client_cookie; @@ -1769,7 +1771,7 @@ void ProtocolV2::trigger_replacing(bool reconnect, conn, reconnect ? "reconnected" : "connected", global_seq, peer_global_seq, connect_seq, client_cookie, server_cookie, - io_stat_printer{*this}); + io_stat_printer{io_handler}); execute_ready(); }).handle_exception([this](std::exception_ptr eptr) { fault(state_t::REPLACING, "trigger_replacing", eptr); @@ -1933,7 +1935,7 @@ void ProtocolV2::do_close( } assert(!gate.is_closed()); auto handshake_closed = gate.close(); - auto io_closed = close_io( + auto io_closed = io_handler.close_io( is_dispatch_reset, is_replace); // asynchronous operations diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index 820d8e5f0acfd..b6f67b566510e 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -3,48 +3,60 @@ #pragma once +#include #include -#include "Protocol.h" +#include "io_handler.h" namespace crimson::net { -class ProtocolV2 final : public Protocol { +class ProtocolV2 final : public HandshakeListener { using AuthConnectionMetaRef = seastar::lw_shared_ptr; - public: - ProtocolV2(ChainedDispatchers& dispatchers, - SocketConnection& conn); - ~ProtocolV2() override; +public: + ProtocolV2(SocketConnection &, + IOHandler &); -// public to SocketConnection, but private to the others - private: - seastar::future<> close_clean_yielded() override; + ~ProtocolV2() final; -#ifdef UNIT_TESTS_BUILT - bool is_closed_clean() const override { - return closed_clean; - } + ProtocolV2(const ProtocolV2 &) = delete; + ProtocolV2(ProtocolV2 &&) = delete; + ProtocolV2 &operator=(const ProtocolV2 &) = delete; + ProtocolV2 &operator=(ProtocolV2 &&) = delete; - bool is_closed() const override { - return closed; - } +/** + * as HandshakeListener + */ +private: + void notify_out() final; -#endif + void notify_out_fault(const char *, std::exception_ptr) final; + + void notify_mark_down() final; + +/* +* as ProtocolV2 to be called by SocketConnection +*/ +public: void start_connect(const entity_addr_t& peer_addr, - const entity_name_t& peer_name) override; + const entity_name_t& peer_name); void start_accept(SocketRef&& socket, - const entity_addr_t& peer_addr) override; + const entity_addr_t& peer_addr); - private: - void notify_out() override; + seastar::future<> close_clean_yielded(); - void notify_out_fault(const char *, std::exception_ptr) override; +#ifdef UNIT_TESTS_BUILT + bool is_closed_clean() const { + return closed_clean; + } - void notify_mark_down() override; + bool is_closed() const { + return closed; + } - private: +#endif +private: seastar::future<> wait_exit_io() { if (exit_io.has_value()) { return exit_io->get_shared_future(); @@ -80,7 +92,7 @@ class ProtocolV2 final : public Protocol { return statenames[static_cast(state)]; } - void trigger_state(state_t state, io_state_t io_state, bool reentrant); + void trigger_state(state_t state, IOHandler::io_state_t io_state, bool reentrant); template void gated_execute(const char *what, T &who, Func &&func) { @@ -196,11 +208,13 @@ class ProtocolV2 final : public Protocol { void do_close(bool is_dispatch_reset, std::optional> f_accept_new=std::nullopt); - private: +private: SocketConnection &conn; SocketMessenger &messenger; + IOHandler &io_handler; + bool has_socket = false; // the socket exists and it is not shutdown @@ -254,6 +268,18 @@ class ProtocolV2 final : public Protocol { Timer protocol_timer; }; +struct create_handlers_ret { + std::unique_ptr io_handler; + std::unique_ptr protocol; +}; +inline create_handlers_ret create_handlers(ChainedDispatchers &dispatchers, SocketConnection &conn) { + std::unique_ptr io_handler = std::make_unique(dispatchers, conn); + IOHandler &io_handler_concrete = static_cast(*io_handler); + auto protocol = std::make_unique(conn, io_handler_concrete); + io_handler_concrete.set_handshake_listener(*protocol); + return {std::move(io_handler), std::move(protocol)}; +} + } // namespace crimson::net #if FMT_VERSION >= 90000 diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index 5b3d806ed7e9a..aa7fcc027790d 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -28,9 +28,11 @@ using crimson::common::local_conf; SocketConnection::SocketConnection(SocketMessenger& messenger, ChainedDispatchers& dispatchers) : core(messenger.shard_id()), - messenger(messenger), - protocol(std::make_unique(dispatchers, *this)) + messenger(messenger) { + auto ret = create_handlers(dispatchers, *this); + io_handler = std::move(ret.io_handler); + protocol = std::move(ret.protocol); #ifdef UNIT_TESTS_BUILT if (messenger.interceptor) { interceptor = messenger.interceptor; @@ -44,7 +46,7 @@ SocketConnection::~SocketConnection() {} bool SocketConnection::is_connected() const { assert(seastar::this_shard_id() == shard_id()); - return protocol->is_connected(); + return io_handler->is_connected(); } #ifdef UNIT_TESTS_BUILT @@ -71,7 +73,7 @@ seastar::future<> SocketConnection::send(MessageURef msg) return seastar::smp::submit_to( shard_id(), [this, msg=std::move(msg)]() mutable { - return protocol->send(std::move(msg)); + return io_handler->send(std::move(msg)); }); } @@ -80,31 +82,31 @@ seastar::future<> SocketConnection::send_keepalive() return seastar::smp::submit_to( shard_id(), [this] { - return protocol->send_keepalive(); + return io_handler->send_keepalive(); }); } SocketConnection::clock_t::time_point SocketConnection::get_last_keepalive() const { - return protocol->get_last_keepalive(); + return io_handler->get_last_keepalive(); } SocketConnection::clock_t::time_point SocketConnection::get_last_keepalive_ack() const { - return protocol->get_last_keepalive_ack(); + return io_handler->get_last_keepalive_ack(); } void SocketConnection::set_last_keepalive_ack(clock_t::time_point when) { - protocol->set_last_keepalive_ack(when); + io_handler->set_last_keepalive_ack(when); } void SocketConnection::mark_down() { assert(seastar::this_shard_id() == shard_id()); - protocol->mark_down(); + io_handler->mark_down(); } void diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index 94d98302bdf6e..863968cac58d7 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -23,7 +23,7 @@ namespace crimson::net { -class Protocol; +class ProtocolV2; class SocketMessenger; class SocketConnection; using SocketConnectionRef = seastar::shared_ptr; @@ -32,10 +32,48 @@ using SocketConnectionRef = seastar::shared_ptr; class Interceptor; #endif +/** + * ConnectionHandler + * + * The interface class to implement Connection, called by SocketConnection. + */ +class ConnectionHandler { +public: + using clock_t = seastar::lowres_system_clock; + + virtual ~ConnectionHandler() = default; + + ConnectionHandler(const ConnectionHandler &) = delete; + ConnectionHandler(ConnectionHandler &&) = delete; + ConnectionHandler &operator=(const ConnectionHandler &) = delete; + ConnectionHandler &operator=(ConnectionHandler &&) = delete; + + virtual bool is_connected() const = 0; + + virtual seastar::future<> send(MessageURef) = 0; + + virtual seastar::future<> send_keepalive() = 0; + + virtual clock_t::time_point get_last_keepalive() const = 0; + + virtual clock_t::time_point get_last_keepalive_ack() const = 0; + + virtual void set_last_keepalive_ack(clock_t::time_point) = 0; + + virtual void mark_down() = 0; + +protected: + ConnectionHandler() = default; +}; + class SocketConnection : public Connection { const seastar::shard_id core; + SocketMessenger& messenger; - std::unique_ptr protocol; + + std::unique_ptr io_handler; + + std::unique_ptr protocol; SocketRef socket; @@ -178,7 +216,7 @@ private: bool peer_wins() const; #endif - friend class Protocol; + friend class IOHandler; friend class ProtocolV2; friend class FrameAssemblerV2; }; diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/io_handler.cc similarity index 94% rename from src/crimson/net/Protocol.cc rename to src/crimson/net/io_handler.cc index b470abf5f69fb..20bcdbde88f52 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/io_handler.cc @@ -1,7 +1,7 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab -#include "Protocol.h" +#include "io_handler.h" #include "auth/Auth.h" @@ -9,7 +9,6 @@ #include "crimson/common/log.h" #include "crimson/net/Errors.h" #include "crimson/net/chained_dispatchers.h" -#include "crimson/net/SocketConnection.h" #include "crimson/net/SocketMessenger.h" #include "msg/Message.h" @@ -45,19 +44,19 @@ std::size_t get_msg_size(const FrameAssembler &rx_frame_asm) namespace crimson::net { -Protocol::Protocol(ChainedDispatchers& dispatchers, - SocketConnection& conn) +IOHandler::IOHandler(ChainedDispatchers &dispatchers, + SocketConnection &conn) : dispatchers(dispatchers), conn(conn) {} -Protocol::~Protocol() +IOHandler::~IOHandler() { ceph_assert(gate.is_closed()); assert(!out_exit_dispatching); } -ceph::bufferlist Protocol::sweep_out_pending_msgs_to_sent( +ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent( bool require_keepalive, std::optional maybe_keepalive_ack, bool require_ack) @@ -120,7 +119,7 @@ ceph::bufferlist Protocol::sweep_out_pending_msgs_to_sent( return bl; } -seastar::future<> Protocol::send(MessageURef msg) +seastar::future<> IOHandler::send(MessageURef msg) { if (io_state != io_state_t::drop) { out_pending_msgs.push_back(std::move(msg)); @@ -129,7 +128,7 @@ seastar::future<> Protocol::send(MessageURef msg) return seastar::now(); } -seastar::future<> Protocol::send_keepalive() +seastar::future<> IOHandler::send_keepalive() { if (!need_keepalive) { need_keepalive = true; @@ -138,7 +137,7 @@ seastar::future<> Protocol::send_keepalive() return seastar::now(); } -void Protocol::mark_down() +void IOHandler::mark_down() { ceph_assert_always(io_state != io_state_t::none); need_dispatch_reset = false; @@ -149,10 +148,10 @@ void Protocol::mark_down() logger().info("{} mark_down() with {}", conn, io_stat_printer{*this}); set_io_state(io_state_t::drop); - notify_mark_down(); + handshake_listener->notify_mark_down(); } -void Protocol::print_io_stat(std::ostream &out) const +void IOHandler::print_io_stat(std::ostream &out) const { out << "io_stat(" << "io_state=" << fmt::format("{}", io_state) @@ -166,8 +165,8 @@ void Protocol::print_io_stat(std::ostream &out) const << ")"; } -void Protocol::set_io_state( - const Protocol::io_state_t &new_state, +void IOHandler::set_io_state( + const IOHandler::io_state_t &new_state, FrameAssemblerV2Ref fa) { ceph_assert_always(!( @@ -220,7 +219,7 @@ void Protocol::set_io_state( } } -seastar::future Protocol::wait_io_exit_dispatching() +seastar::future IOHandler::wait_io_exit_dispatching() { ceph_assert_always(io_state != io_state_t::open); ceph_assert_always(frame_assembler != nullptr); @@ -245,7 +244,7 @@ seastar::future Protocol::wait_io_exit_dispatching() }); } -void Protocol::reset_session(bool full) +void IOHandler::reset_session(bool full) { // reset in in_seq = 0; @@ -255,7 +254,7 @@ void Protocol::reset_session(bool full) } } -void Protocol::requeue_out_sent() +void IOHandler::requeue_out_sent() { assert(io_state != io_state_t::open); if (out_sent_msgs.empty()) { @@ -277,7 +276,7 @@ void Protocol::requeue_out_sent() notify_out_dispatch(); } -void Protocol::requeue_out_sent_up_to(seq_num_t seq) +void IOHandler::requeue_out_sent_up_to(seq_num_t seq) { assert(io_state != io_state_t::open); if (out_sent_msgs.empty() && out_pending_msgs.empty()) { @@ -299,7 +298,7 @@ void Protocol::requeue_out_sent_up_to(seq_num_t seq) requeue_out_sent(); } -void Protocol::reset_out() +void IOHandler::reset_out() { assert(io_state != io_state_t::open); out_seq = 0; @@ -310,7 +309,7 @@ void Protocol::reset_out() ack_left = 0; } -void Protocol::dispatch_accept() +void IOHandler::dispatch_accept() { if (io_state == io_state_t::drop) { return; @@ -322,7 +321,7 @@ void Protocol::dispatch_accept() seastar::static_pointer_cast(conn.shared_from_this())); } -void Protocol::dispatch_connect() +void IOHandler::dispatch_connect() { if (io_state == io_state_t::drop) { return; @@ -333,7 +332,7 @@ void Protocol::dispatch_connect() seastar::static_pointer_cast(conn.shared_from_this())); } -void Protocol::dispatch_reset(bool is_replace) +void IOHandler::dispatch_reset(bool is_replace) { ceph_assert_always(io_state == io_state_t::drop); if (!need_dispatch_reset) { @@ -345,7 +344,7 @@ void Protocol::dispatch_reset(bool is_replace) is_replace); } -void Protocol::dispatch_remote_reset() +void IOHandler::dispatch_remote_reset() { if (io_state == io_state_t::drop) { return; @@ -354,7 +353,7 @@ void Protocol::dispatch_remote_reset() seastar::static_pointer_cast(conn.shared_from_this())); } -void Protocol::ack_out_sent(seq_num_t seq) +void IOHandler::ack_out_sent(seq_num_t seq) { if (conn.policy.lossy) { // lossy connections don't keep sent messages return; @@ -368,7 +367,7 @@ void Protocol::ack_out_sent(seq_num_t seq) } } -seastar::future Protocol::try_exit_out_dispatch() { +seastar::future IOHandler::try_exit_out_dispatch() { assert(!is_out_queued()); return frame_assembler->flush( ).then([this] { @@ -392,7 +391,7 @@ seastar::future Protocol::try_exit_out_dispatch() { }); } -seastar::future<> Protocol::do_out_dispatch() +seastar::future<> IOHandler::do_out_dispatch() { return seastar::repeat([this] { switch (io_state) { @@ -403,7 +402,6 @@ seastar::future<> Protocol::do_out_dispatch() } auto to_ack = ack_left; assert(to_ack == 0 || in_seq > 0); - // sweep all pending out with the concrete Protocol return frame_assembler->write( sweep_out_pending_msgs_to_sent( need_keepalive, next_keepalive_ack, to_ack > 0) @@ -466,7 +464,7 @@ seastar::future<> Protocol::do_out_dispatch() eptr = std::current_exception(); } set_io_state(io_state_t::delay); - notify_out_fault("do_out_dispatch", eptr); + handshake_listener->notify_out_fault("do_out_dispatch", eptr); } else { logger().info("{} do_out_dispatch(): fault at {} -- {}", conn, io_state, e); @@ -476,9 +474,9 @@ seastar::future<> Protocol::do_out_dispatch() }); } -void Protocol::notify_out_dispatch() +void IOHandler::notify_out_dispatch() { - notify_out(); + handshake_listener->notify_out(); if (out_dispatching) { // already dispatching return; @@ -502,7 +500,7 @@ void Protocol::notify_out_dispatch() } seastar::future<> -Protocol::read_message(utime_t throttle_stamp, std::size_t msg_size) +IOHandler::read_message(utime_t throttle_stamp, std::size_t msg_size) { return frame_assembler->read_frame_payload( ).then([this, throttle_stamp, msg_size](auto payload) { @@ -613,7 +611,7 @@ Protocol::read_message(utime_t throttle_stamp, std::size_t msg_size) }); } -void Protocol::do_in_dispatch() +void IOHandler::do_in_dispatch() { ceph_assert_always(!in_exit_dispatching.has_value()); in_exit_dispatching = seastar::promise<>(); @@ -702,7 +700,7 @@ void Protocol::do_in_dispatch() logger().info("{} do_in_dispatch(): fault at {}, going to delay -- {}", conn, io_state, e_what); set_io_state(io_state_t::delay); - notify_out_fault("do_in_dispatch", eptr); + handshake_listener->notify_out_fault("do_in_dispatch", eptr); } else { logger().info("{} do_in_dispatch(): fault at {} -- {}", conn, io_state, e_what); diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/io_handler.h similarity index 61% rename from src/crimson/net/Protocol.h rename to src/crimson/net/io_handler.h index 5bfdc71282bfb..d7beb9ccb6fdc 100644 --- a/src/crimson/net/Protocol.h +++ b/src/crimson/net/io_handler.h @@ -3,41 +3,30 @@ #pragma once -#include -#include #include #include "crimson/common/gated.h" -#include "crimson/common/log.h" #include "Fwd.h" #include "SocketConnection.h" #include "FrameAssemblerV2.h" namespace crimson::net { -class Protocol { -// public to SocketConnection - public: - Protocol(Protocol&&) = delete; - virtual ~Protocol(); - - virtual seastar::future<> close_clean_yielded() = 0; - -#ifdef UNIT_TESTS_BUILT - virtual bool is_closed_clean() const = 0; - - virtual bool is_closed() const = 0; - -#endif - virtual void start_connect(const entity_addr_t& peer_addr, - const entity_name_t& peer_name) = 0; - - virtual void start_accept(SocketRef&& socket, - const entity_addr_t& peer_addr) = 0; - - protected: - Protocol(ChainedDispatchers& dispatchers, - SocketConnection& conn); +/** + * HandshakeListener + * + * The interface class for IOHandler to notify the ProtocolV2 for handshake. + * + * The notifications may be cross-core and asynchronous. + */ +class HandshakeListener { +public: + virtual ~HandshakeListener() = default; + + HandshakeListener(const HandshakeListener&) = delete; + HandshakeListener(HandshakeListener &&) = delete; + HandshakeListener &operator=(const HandshakeListener &) = delete; + HandshakeListener &operator=(HandshakeListener &&) = delete; virtual void notify_out() = 0; @@ -45,39 +34,71 @@ class Protocol { virtual void notify_mark_down() = 0; -// the write state-machine - public: - using clock_t = seastar::lowres_system_clock; +protected: + HandshakeListener() = default; +}; - bool is_connected() const { +/** + * IOHandler + * + * Implements the message read and write paths after the handshake, and also be + * responsible to dispatch events. It is supposed to be working on the same + * core with the underlying socket and the FrameAssemblerV2 class. + */ +class IOHandler final : public ConnectionHandler { +public: + IOHandler(ChainedDispatchers &, + SocketConnection &); + + ~IOHandler() final; + + IOHandler(const IOHandler &) = delete; + IOHandler(IOHandler &&) = delete; + IOHandler &operator=(const IOHandler &) = delete; + IOHandler &operator=(IOHandler &&) = delete; + +/* + * as ConnectionHandler + */ +private: + bool is_connected() const final { return protocol_is_connected; } - seastar::future<> send(MessageURef msg); + seastar::future<> send(MessageURef msg) final; - seastar::future<> send_keepalive(); + seastar::future<> send_keepalive() final; - clock_t::time_point get_last_keepalive() const { + clock_t::time_point get_last_keepalive() const final { return last_keepalive; } - clock_t::time_point get_last_keepalive_ack() const { + clock_t::time_point get_last_keepalive_ack() const final { return last_keepalive_ack; } - void set_last_keepalive_ack(clock_t::time_point when) { + void set_last_keepalive_ack(clock_t::time_point when) final { last_keepalive_ack = when; } - void mark_down(); + void mark_down() final; + +/* + * as IOHandler to be called by ProtocolV2 handshake + * + * The calls may be cross-core and asynchronous + */ +public: + void set_handshake_listener(HandshakeListener &hl) { + ceph_assert_always(handshake_listener == nullptr); + handshake_listener = &hl; + } struct io_stat_printer { - const Protocol &protocol; + const IOHandler &io_handler; }; void print_io_stat(std::ostream &out) const; -// TODO: encapsulate a SessionedSender class - protected: seastar::future<> close_io( bool is_dispatch_reset, bool is_replace) { @@ -93,14 +114,14 @@ class Protocol { /** * io_state_t * - * The io_state is changed with protocol state atomically, indicating the - * IOHandler behavior of the according protocol state. + * The io_state is changed with the protocol state, to control the + * io behavior accordingly. */ enum class io_state_t : uint8_t { - none, - delay, - open, - drop + none, // no IO is possible as the connection is not available to the user yet. + delay, // IO is delayed until open. + open, // Dispatch In and Out concurrently. + drop // Drop IO as the connection is closed. }; friend class fmt::formatter; @@ -162,6 +183,8 @@ private: SocketConnection &conn; + HandshakeListener *handshake_listener = nullptr; + crimson::common::Gated gate; FrameAssemblerV2Ref frame_assembler; @@ -213,19 +236,19 @@ private: }; inline std::ostream& operator<<( - std::ostream& out, Protocol::io_stat_printer stat) { - stat.protocol.print_io_stat(out); + std::ostream& out, IOHandler::io_stat_printer stat) { + stat.io_handler.print_io_stat(out); return out; } } // namespace crimson::net template <> -struct fmt::formatter +struct fmt::formatter : fmt::formatter { template - auto format(crimson::net::Protocol::io_state_t state, FormatContext& ctx) { - using enum crimson::net::Protocol::io_state_t; + auto format(crimson::net::IOHandler::io_state_t state, FormatContext& ctx) { + using enum crimson::net::IOHandler::io_state_t; std::string_view name; switch (state) { case none: -- 2.39.5