From f7b622b231d8dbf3f677e4cff4be7d77afcb00d8 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Fri, 25 Nov 2022 10:08:52 +0800 Subject: [PATCH] crimson/net: refactor socket managements Previously, the socket state is transparent to the protocol and implicitly handled. Move the responsibilities into protocol for finer controls to further decouple the IO and handshake. Also, refactor the fault handling and make the in/out message dispatching more symmetric. Signed-off-by: Yingxin Cheng --- src/crimson/net/FrameAssemblerV2.cc | 51 +++--- src/crimson/net/FrameAssemblerV2.h | 12 +- src/crimson/net/Protocol.cc | 11 +- src/crimson/net/Protocol.h | 2 + src/crimson/net/ProtocolV2.cc | 260 +++++++++++++++++++--------- src/crimson/net/ProtocolV2.h | 19 +- src/crimson/net/Socket.cc | 3 +- src/crimson/net/Socket.h | 12 ++ 8 files changed, 255 insertions(+), 115 deletions(-) diff --git a/src/crimson/net/FrameAssemblerV2.cc b/src/crimson/net/FrameAssemblerV2.cc index cf419897460..034cf8ed4ca 100644 --- a/src/crimson/net/FrameAssemblerV2.cc +++ b/src/crimson/net/FrameAssemblerV2.cc @@ -50,7 +50,7 @@ void FrameAssemblerV2::reset_handlers() FrameAssemblerV2::mover_t FrameAssemblerV2::to_replace() { - assert(has_socket()); + assert(is_socket_valid()); socket = nullptr; return mover_t{ std::move(conn.socket), @@ -58,14 +58,14 @@ FrameAssemblerV2::to_replace() std::move(session_comp_handlers)}; } -void FrameAssemblerV2::replace_by(FrameAssemblerV2::mover_t &&mover) +seastar::future<> FrameAssemblerV2::replace_by(FrameAssemblerV2::mover_t &&mover) { - set_socket(std::move(mover.socket)); record_io = false; rxbuf.clear(); txbuf.clear(); session_stream_handlers = std::move(mover.session_stream_handlers); session_comp_handlers = std::move(mover.session_comp_handlers); + return replace_shutdown_socket(std::move(mover.socket)); } void FrameAssemblerV2::start_recording() @@ -89,13 +89,17 @@ bool FrameAssemblerV2::has_socket() const return socket != nullptr; } -void FrameAssemblerV2::set_socket(SocketRef &&_socket) +bool FrameAssemblerV2::is_socket_valid() const +{ + return has_socket() && !socket->is_shutdown(); +} + +void FrameAssemblerV2::set_socket(SocketRef &&new_socket) { assert(!has_socket()); - ceph_assert_always(!conn.socket); - socket = _socket.get(); - conn.socket = std::move(_socket); - assert(has_socket()); + socket = new_socket.get(); + conn.socket = std::move(new_socket); + assert(is_socket_valid()); } void FrameAssemblerV2::learn_socket_ephemeral_port_as_connector(uint16_t port) @@ -106,23 +110,26 @@ void FrameAssemblerV2::learn_socket_ephemeral_port_as_connector(uint16_t port) void FrameAssemblerV2::shutdown_socket() { - if (has_socket()) { - socket->shutdown(); - } + assert(is_socket_valid()); + socket->shutdown(); } -seastar::future<> FrameAssemblerV2::reset_and_close_socket(bool do_reset) +seastar::future<> FrameAssemblerV2::replace_shutdown_socket(SocketRef &&new_socket) { - if (!has_socket()) { - return seastar::now(); - } - if (do_reset) { - socket = nullptr; - return conn.socket->close( - ).then([sock = std::move(conn.socket)] {}); - } else { - return socket->close(); - } + assert(has_socket()); + assert(socket->is_shutdown()); + socket = nullptr; + auto old_socket = std::move(conn.socket); + set_socket(std::move(new_socket)); + return old_socket->close( + ).then([sock = std::move(old_socket)] {}); +} + +seastar::future<> FrameAssemblerV2::close_shutdown_socket() +{ + assert(has_socket()); + assert(socket->is_shutdown()); + return socket->close(); } seastar::future diff --git a/src/crimson/net/FrameAssemblerV2.h b/src/crimson/net/FrameAssemblerV2.h index f72eeeeae45..b3ee3e03a3c 100644 --- a/src/crimson/net/FrameAssemblerV2.h +++ b/src/crimson/net/FrameAssemblerV2.h @@ -43,7 +43,7 @@ public: mover_t to_replace(); - void replace_by(mover_t &&); + seastar::future<> replace_by(mover_t &&); /* * auth signature interfaces @@ -61,15 +61,15 @@ public: * socket maintainence interfaces */ - bool has_socket() const; - void set_socket(SocketRef &&); void learn_socket_ephemeral_port_as_connector(uint16_t port); void shutdown_socket(); - seastar::future<> reset_and_close_socket(bool do_reset=true); + seastar::future<> replace_shutdown_socket(SocketRef &&); + + seastar::future<> close_shutdown_socket(); /* * socket read and write interfaces @@ -115,6 +115,10 @@ public: } private: + bool has_socket() const; + + bool is_socket_valid() const; + void log_main_preamble(const ceph::bufferlist &bl); SocketConnection &conn; diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc index f8bb3b7e932..58edd882561 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/Protocol.cc @@ -242,8 +242,15 @@ seastar::future<> Protocol::do_out_dispatch() conn, out_state, e); ceph_abort(); } - ceph_assert_always(frame_assembler.has_socket()); - frame_assembler.shutdown_socket(); + + std::exception_ptr eptr; + try { + throw e; + } catch(...) { + eptr = std::current_exception(); + } + notify_out_fault(eptr); + if (out_state == out_state_t::open) { logger().info("{} do_out_dispatch(): fault at {}, going to delay -- {}", conn, out_state, e); diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h index c352683dcb7..c7bc71630b9 100644 --- a/src/crimson/net/Protocol.h +++ b/src/crimson/net/Protocol.h @@ -54,6 +54,8 @@ class Protocol { virtual void notify_out() = 0; + virtual void notify_out_fault(std::exception_ptr) = 0; + // the write state-machine public: using clock_t = seastar::lowres_system_clock; diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 083978dd96f..8514da381e8 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -193,7 +193,6 @@ void ProtocolV2::start_connect(const entity_addr_t& _peer_addr, const entity_name_t& _peer_name) { ceph_assert(state == state_t::NONE); - ceph_assert(!frame_assembler.has_socket()); ceph_assert(!gate.is_closed()); conn.peer_addr = _peer_addr; conn.target_addr = _peer_addr; @@ -210,14 +209,15 @@ void ProtocolV2::start_connect(const entity_addr_t& _peer_addr, execute_connecting(); } -void ProtocolV2::start_accept(SocketRef&& sock, +void ProtocolV2::start_accept(SocketRef&& new_socket, const entity_addr_t& _peer_addr) { ceph_assert(state == state_t::NONE); - ceph_assert(!frame_assembler.has_socket()); // until we know better conn.target_addr = _peer_addr; - frame_assembler.set_socket(std::move(sock)); + frame_assembler.set_socket(std::move(new_socket)); + has_socket = true; + is_socket_valid = true; logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr); messenger.accept_conn( seastar::static_pointer_cast(conn.shared_from_this())); @@ -272,24 +272,103 @@ void ProtocolV2::trigger_state(state_t _state, out_state_t _out_state, bool reen set_out_state(_out_state); } -void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr eptr) +void ProtocolV2::fault( + state_t expected_state, + const char *where, + std::exception_ptr eptr) { - if (conn.policy.lossy) { - logger().info("{} {}: fault at {} on lossy channel, going to CLOSING -- {}", - conn, func_name, get_state_name(state), eptr); + assert(expected_state == state_t::CONNECTING || + expected_state == state_t::ESTABLISHING || + expected_state == state_t::REPLACING || + expected_state == state_t::READY); + const char *e_what; + try { + std::rethrow_exception(eptr); + } catch (std::exception &e) { + e_what = e.what(); + } + + if (state != expected_state) { + logger().info("{} protocol {} {} is aborted at inconsistent {} -- {}", + conn, + get_state_name(expected_state), + where, + get_state_name(state), + e_what); +#ifndef NDEBUG + if (expected_state == state_t::REPLACING) { + assert(state == state_t::CLOSING); + } else if (expected_state == state_t::READY) { + assert(state == state_t::CLOSING || + state == state_t::REPLACING || + state == state_t::CONNECTING || + state == state_t::STANDBY); + } else { + assert(state == state_t::CLOSING || + state == state_t::REPLACING); + } +#endif + return; + } + assert(state == expected_state); + + if (state != state_t::CONNECTING && conn.policy.lossy) { + // socket will be shutdown in do_close() + logger().info("{} protocol {} {} fault on lossy channel, going to CLOSING -- {}", + conn, get_state_name(state), where, e_what); do_close(true); - } else if (conn.policy.server || - (conn.policy.standby && !is_out_queued_or_sent())) { - logger().info("{} {}: fault at {} with nothing to send, going to STANDBY -- {}", - conn, func_name, get_state_name(state), eptr); + return; + } + + if (likely(has_socket)) { + if (likely(is_socket_valid)) { + frame_assembler.shutdown_socket(); + is_socket_valid = false; + } else { + ceph_assert_always(state == state_t::CONNECTING || + state == state_t::REPLACING); + } + } else { // !has_socket + ceph_assert_always(state == state_t::CONNECTING); + assert(!is_socket_valid); + } + + if (conn.policy.server || + (conn.policy.standby && !is_out_queued_or_sent())) { + if (conn.policy.server) { + logger().info("{} protocol {} {} fault as server, going to STANDBY {} -- {}", + conn, + get_state_name(state), + where, + io_stat_printer{*this}, + e_what); + } else { + logger().info("{} protocol {} {} fault with nothing to send, going to STANDBY {} -- {}", + conn, + get_state_name(state), + where, + io_stat_printer{*this}, + e_what); + } execute_standby(); - } else if (backoff) { - logger().info("{} {}: fault at {}, going to WAIT -- {}", - conn, func_name, get_state_name(state), eptr); + } else if (state == state_t::CONNECTING || + state == state_t::REPLACING) { + logger().info("{} protocol {} {} fault, going to WAIT {} -- {}", + conn, + get_state_name(state), + where, + io_stat_printer{*this}, + e_what); execute_wait(false); } else { - logger().info("{} {}: fault at {}, going to CONNECTING -- {}", - conn, func_name, get_state_name(state), eptr); + assert(state == state_t::READY || + state == state_t::ESTABLISHING); + logger().info("{} protocol {} {} fault, going to CONNECTING {} -- {}", + conn, + get_state_name(state), + where, + io_stat_printer{*this}, + e_what); execute_connecting(); } } @@ -725,8 +804,8 @@ ProtocolV2::client_reconnect() void ProtocolV2::execute_connecting() { + ceph_assert_always(!is_socket_valid); trigger_state(state_t::CONNECTING, out_state_t::delay, false); - frame_assembler.shutdown_socket(); gated_execute("execute_connecting", [this] { global_seq = messenger.get_global_seq(); assert(client_cookie != 0); @@ -739,28 +818,39 @@ void ProtocolV2::execute_connecting() assert(server_cookie == 0); logger().debug("{} UPDATE: gs={} for connect", conn, global_seq); } - return wait_out_exit_dispatching().then([this] { + return seastar::when_all( + wait_out_exit_dispatching(), + wait_in_exit_dispatching() + ).discard_result().then([this] { if (unlikely(state != state_t::CONNECTING)) { logger().debug("{} triggered {} before Socket::connect()", conn, get_state_name(state)); abort_protocol(); } - gate.dispatch_in_background( - "reset_close_socket_connecting", - *this, - [this] { return frame_assembler.reset_and_close_socket(); }); INTERCEPT_N_RW(custom_bp_t::SOCKET_CONNECTING); return Socket::connect(conn.peer_addr); - }).then([this](SocketRef sock) { + }).then([this](SocketRef new_socket) { logger().debug("{} socket connected", conn); if (unlikely(state != state_t::CONNECTING)) { logger().debug("{} triggered {} during Socket::connect()", conn, get_state_name(state)); - return sock->close().then([sock = std::move(sock)] { + return new_socket->close().then([sock=std::move(new_socket)] { abort_protocol(); }); } - frame_assembler.set_socket(std::move(sock)); + if (!has_socket) { + frame_assembler.set_socket(std::move(new_socket)); + has_socket = true; + } else { + gate.dispatch_in_background( + "replace_socket_connecting", + *this, + [this, new_socket=std::move(new_socket)]() mutable { + return frame_assembler.replace_shutdown_socket(std::move(new_socket)); + } + ); + } + is_socket_valid = true; return seastar::now(); }).then([this] { auth_meta = seastar::make_lw_shared(); @@ -817,6 +907,9 @@ void ProtocolV2::execute_connecting() } case next_step_t::wait: { logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn); + ceph_assert_always(is_socket_valid); + frame_assembler.shutdown_socket(); + is_socket_valid = false; execute_wait(true); break; } @@ -824,26 +917,8 @@ void ProtocolV2::execute_connecting() ceph_abort("impossible next step"); } } - }).handle_exception([this] (std::exception_ptr eptr) { - if (state != state_t::CONNECTING) { - logger().info("{} execute_connecting(): protocol aborted at {} -- {}", - conn, get_state_name(state), eptr); - assert(state == state_t::CLOSING || - state == state_t::REPLACING); - return; - } - - if (conn.policy.server || - (conn.policy.standby && !is_out_queued_or_sent())) { - logger().info("{} execute_connecting(): fault at {} with nothing to send," - " going to STANDBY -- {}", - conn, get_state_name(state), eptr); - execute_standby(); - } else { - logger().info("{} execute_connecting(): fault at {}, going to WAIT -- {}", - conn, get_state_name(state), eptr); - execute_wait(false); - } + }).handle_exception([this](std::exception_ptr eptr) { + fault(state_t::CONNECTING, "execute_connecting", eptr); }); }); } @@ -994,6 +1069,9 @@ ProtocolV2::reuse_connection( peer_supported_features, conn_seq, msg_seq); + ceph_assert_always(has_socket && is_socket_valid); + is_socket_valid = false; + has_socket = false; #ifdef UNIT_TESTS_BUILT if (conn.interceptor) { conn.interceptor->register_conn_replaced(conn); @@ -1402,6 +1480,7 @@ ProtocolV2::server_reconnect() void ProtocolV2::execute_accepting() { + assert(is_socket_valid); trigger_state(state_t::ACCEPTING, out_state_t::none, false); gate.dispatch_in_background("execute_accepting", *this, [this] { return seastar::futurize_invoke([this] { @@ -1513,6 +1592,7 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) { conn.shared_from_this())); }; + ceph_assert_always(is_socket_valid); trigger_state(state_t::ESTABLISHING, out_state_t::delay, false); if (existing_conn) { static_cast(existing_conn->protocol.get())->do_close( @@ -1550,15 +1630,8 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) { client_cookie, server_cookie, io_stat_printer{*this}); execute_ready(false); - }).handle_exception([this] (std::exception_ptr eptr) { - if (state != state_t::ESTABLISHING) { - logger().info("{} execute_establishing() protocol aborted at {} -- {}", - conn, get_state_name(state), eptr); - assert(state == state_t::CLOSING || - state == state_t::REPLACING); - return; - } - fault(false, "execute_establishing()", eptr); + }).handle_exception([this](std::exception_ptr eptr) { + fault(state_t::ESTABLISHING, "execute_establishing", eptr); }); }); } @@ -1622,7 +1695,12 @@ void ProtocolV2::trigger_replacing(bool reconnect, uint64_t new_msg_seq) { trigger_state(state_t::REPLACING, out_state_t::delay, false); - frame_assembler.shutdown_socket(); + ceph_assert_always(has_socket); + ceph_assert_always(!mover.socket->is_shutdown()); + if (is_socket_valid) { + frame_assembler.shutdown_socket(); + is_socket_valid = false; + } gate.dispatch_in_background("trigger_replacing", *this, [this, reconnect, @@ -1637,8 +1715,10 @@ void ProtocolV2::trigger_replacing(bool reconnect, dispatchers.ms_handle_accept( seastar::static_pointer_cast(conn.shared_from_this())); // state may become CLOSING, close mover.socket and abort later - return wait_out_exit_dispatching( - ).then([this] { + return seastar::when_all( + wait_out_exit_dispatching(), + wait_in_exit_dispatching() + ).discard_result().then([this] { protocol_timer.cancel(); auto done = std::move(execution_done); execution_done = seastar::now(); @@ -1663,13 +1743,16 @@ void ProtocolV2::trigger_replacing(bool reconnect, }); } - gate.dispatch_in_background( - "reset_close_socket_replacing", - *this, - [this] { return frame_assembler.reset_and_close_socket(); }); auth_meta = std::move(new_auth_meta); peer_global_seq = new_peer_global_seq; - frame_assembler.replace_by(std::move(mover)); + gate.dispatch_in_background( + "replace_frame_assembler", + *this, + [this, mover=std::move(mover)]() mutable { + return frame_assembler.replace_by(std::move(mover)); + } + ); + is_socket_valid = true; if (reconnect) { connect_seq = new_connect_seq; @@ -1703,14 +1786,8 @@ void ProtocolV2::trigger_replacing(bool reconnect, client_cookie, server_cookie, io_stat_printer{*this}); execute_ready(false); - }).handle_exception([this] (std::exception_ptr eptr) { - if (state != state_t::REPLACING) { - logger().info("{} trigger_replacing(): protocol aborted at {} -- {}", - conn, get_state_name(state), eptr); - assert(state == state_t::CLOSING); - return; - } - fault(true, "trigger_replacing()", eptr); + }).handle_exception([this](std::exception_ptr eptr) { + fault(state_t::REPLACING, "trigger_replacing", eptr); }); }); } @@ -1772,6 +1849,11 @@ ceph::bufferlist ProtocolV2::do_sweep_messages( return bl; } +void ProtocolV2::notify_out_fault(std::exception_ptr eptr) +{ + fault(state_t::READY, "notify_out_fault", eptr); +} + seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp, std::size_t msg_size) { return frame_assembler.read_frame_payload( @@ -1879,6 +1961,7 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp, std::size_t m void ProtocolV2::execute_ready(bool dispatch_connect) { + ceph_assert_always(is_socket_valid); assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0)); trigger_state(state_t::READY, out_state_t::open, false); if (dispatch_connect) { @@ -1895,7 +1978,9 @@ void ProtocolV2::execute_ready(bool dispatch_connect) conn.interceptor->register_conn_ready(conn); } #endif - gated_execute("execute_ready", [this] { + ceph_assert_always(!in_exit_dispatching.has_value()); + in_exit_dispatching = seastar::shared_promise<>(); + gate.dispatch_in_background("execute_ready", *this, [this] { protocol_timer.cancel(); return seastar::keep_doing([this] { return read_main_preamble( @@ -1965,15 +2050,12 @@ void ProtocolV2::execute_ready(bool dispatch_connect) } } }); - }).handle_exception([this] (std::exception_ptr eptr) { - if (state != state_t::READY) { - logger().info("{} execute_ready(): protocol aborted at {} -- {}", - conn, get_state_name(state), eptr); - assert(state == state_t::REPLACING || - state == state_t::CLOSING); - return; - } - fault(false, "execute_ready()", eptr); + }).handle_exception([this](std::exception_ptr eptr) { + fault(state_t::READY, "execute_ready", eptr); + }).finally([this] { + ceph_assert_always(in_exit_dispatching.has_value()); + in_exit_dispatching->set_value(); + in_exit_dispatching = std::nullopt; }); }); } @@ -1982,8 +2064,8 @@ void ProtocolV2::execute_ready(bool dispatch_connect) void ProtocolV2::execute_standby() { + ceph_assert_always(!is_socket_valid); trigger_state(state_t::STANDBY, out_state_t::delay, false); - frame_assembler.shutdown_socket(); } void ProtocolV2::notify_out() @@ -1999,8 +2081,8 @@ void ProtocolV2::notify_out() void ProtocolV2::execute_wait(bool max_backoff) { + ceph_assert_always(!is_socket_valid); trigger_state(state_t::WAIT, out_state_t::delay, false); - frame_assembler.shutdown_socket(); gated_execute("execute_wait", [this, max_backoff] { double backoff = protocol_timer.last_dur(); if (max_backoff) { @@ -2031,6 +2113,7 @@ void ProtocolV2::execute_wait(bool max_backoff) void ProtocolV2::execute_server_wait() { + ceph_assert_always(is_socket_valid); trigger_state(state_t::SERVER_WAIT, out_state_t::none, false); gated_execute("execute_server_wait", [this] { return frame_assembler.read_exactly(1).then([this] (auto bl) { @@ -2111,7 +2194,10 @@ void ProtocolV2::do_close( if (f_accept_new) { (*f_accept_new)(); } - frame_assembler.shutdown_socket(); + if (is_socket_valid) { + frame_assembler.shutdown_socket(); + is_socket_valid = false; + } assert(!gate.is_closed()); auto gate_closed = gate.close(); auto out_closed = close_out(); @@ -2127,7 +2213,11 @@ void ProtocolV2::do_close( closed_clean_fut = seastar::when_all( std::move(gate_closed), std::move(out_closed) ).discard_result().then([this] { - return frame_assembler.reset_and_close_socket(false); + if (has_socket) { + return frame_assembler.close_shutdown_socket(); + } else { + return seastar::now(); + } }).then([this] { logger().debug("{} closed!", conn); messenger.closed_conn( diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index d9216570b7a..b75c008f0d5 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -54,9 +54,16 @@ class ProtocolV2 final : public Protocol { void notify_out() override; + void notify_out_fault(std::exception_ptr) override; + private: SocketMessenger &messenger; + bool has_socket = false; + + // the socket exists and it is not shutdown + bool is_socket_valid = false; + AuthConnectionMetaRef auth_meta; crimson::common::Gated gate; @@ -108,6 +115,14 @@ class ProtocolV2 final : public Protocol { uint64_t peer_global_seq = 0; uint64_t connect_seq = 0; + std::optional> in_exit_dispatching; + seastar::future<> wait_in_exit_dispatching() { + if (in_exit_dispatching.has_value()) { + return in_exit_dispatching->get_shared_future(); + } + return seastar::now(); + } + seastar::future<> execution_done = seastar::now(); template @@ -159,7 +174,9 @@ class ProtocolV2 final : public Protocol { template seastar::future<> write_flush_frame(F &tx_frame); - void fault(bool backoff, const char* func_name, std::exception_ptr eptr); + void fault(state_t expected_state, + const char *where, + std::exception_ptr eptr); void reset_session(bool is_full); seastar::future> diff --git a/src/crimson/net/Socket.cc b/src/crimson/net/Socket.cc index d58fb3987b3..304f4dc16be 100644 --- a/src/crimson/net/Socket.cc +++ b/src/crimson/net/Socket.cc @@ -119,6 +119,7 @@ Socket::read_exactly(size_t bytes) { } void Socket::shutdown() { + socket_is_shutdown = true; socket.shutdown_input(); socket.shutdown_output(); } @@ -210,7 +211,7 @@ seastar::future<> Socket::try_trap_post(bp_action_t& trap) { break; case bp_action_t::STALL: logger().info("[Test] got STALL and block"); - shutdown(); + force_shutdown(); return blocker->block(); default: ceph_abort("unexpected action from trap"); diff --git a/src/crimson/net/Socket.h b/src/crimson/net/Socket.h index a533d3180b2..b6125eb8a02 100644 --- a/src/crimson/net/Socket.h +++ b/src/crimson/net/Socket.h @@ -42,6 +42,7 @@ class Socket // the default buffer size 8192 is too small that may impact our write // performance. see seastar::net::connected_socket::output() out(socket.output(65536)), + socket_is_shutdown(false), side(_side), ephemeral_port(e_port) {} @@ -109,6 +110,10 @@ class Socket #endif } + bool is_shutdown() const { + return socket_is_shutdown; + } + // preemptively disable further reads or writes, can only be shutdown once. void shutdown(); @@ -119,6 +124,12 @@ class Socket static void inject_failure(); + // shutdown for tests + void force_shutdown() { + socket.shutdown_input(); + socket.shutdown_output(); + } + // shutdown input_stream only, for tests void force_shutdown_in() { socket.shutdown_input(); @@ -155,6 +166,7 @@ class Socket seastar::connected_socket socket; seastar::input_stream in; seastar::output_stream out; + bool socket_is_shutdown; side_t side; uint16_t ephemeral_port; -- 2.39.5