From: Yingxin Cheng Date: Fri, 2 Dec 2022 08:28:36 +0000 (+0800) Subject: crimson/net: make it explict about the FrameAssemberV2 ownership X-Git-Tag: v18.1.0~375^2~15 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=4fc9db08da90d792ea9c5a6f175f0256bf293725;p=ceph-ci.git crimson/net: make it explict about the FrameAssemberV2 ownership FrameAssemblerV2 is owned by ProtocolV2 during handshake, and owned by Protocol during messaging. Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/net/FrameAssemblerV2.cc b/src/crimson/net/FrameAssemblerV2.cc index 6165e2e57dc..1b38263335a 100644 --- a/src/crimson/net/FrameAssemblerV2.cc +++ b/src/crimson/net/FrameAssemblerV2.cc @@ -289,4 +289,9 @@ void FrameAssemblerV2::log_main_preamble(const ceph::bufferlist &bl) (int)main_preamble->num_segments, main_preamble->crc); } +FrameAssemblerV2Ref FrameAssemblerV2::create(SocketConnection &conn) +{ + return std::make_unique(conn); +} + } // namespace crimson::net diff --git a/src/crimson/net/FrameAssemblerV2.h b/src/crimson/net/FrameAssemblerV2.h index 3165a048fc1..06c5cb25eee 100644 --- a/src/crimson/net/FrameAssemblerV2.h +++ b/src/crimson/net/FrameAssemblerV2.h @@ -12,6 +12,8 @@ namespace crimson::net { class SocketConnection; +class FrameAssemblerV2; +using FrameAssemblerV2Ref = std::unique_ptr; class FrameAssemblerV2 { public: @@ -120,6 +122,8 @@ public: return write_flush(std::move(bl)); } + static FrameAssemblerV2Ref create(SocketConnection &conn); + private: bool has_socket() const; diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc index 6bdcf2db5c2..e79795a7cd5 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/Protocol.cc @@ -48,8 +48,7 @@ namespace crimson::net { Protocol::Protocol(ChainedDispatchers& dispatchers, SocketConnection& conn) : dispatchers(dispatchers), - conn(conn), - frame_assembler(conn) + conn(conn) {} Protocol::~Protocol() @@ -68,17 +67,17 @@ ceph::bufferlist Protocol::sweep_out_pending_msgs_to_sent( if (unlikely(require_keepalive)) { auto keepalive_frame = KeepAliveFrame::Encode(); - bl.append(frame_assembler.get_buffer(keepalive_frame)); + bl.append(frame_assembler->get_buffer(keepalive_frame)); } if (unlikely(maybe_keepalive_ack.has_value())) { auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack); - bl.append(frame_assembler.get_buffer(keepalive_ack_frame)); + bl.append(frame_assembler->get_buffer(keepalive_ack_frame)); } if (require_ack && num_msgs == 0u) { auto ack_frame = AckFrame::Encode(get_in_seq()); - bl.append(frame_assembler.get_buffer(ack_frame)); + bl.append(frame_assembler->get_buffer(ack_frame)); } std::for_each( @@ -108,7 +107,7 @@ ceph::bufferlist Protocol::sweep_out_pending_msgs_to_sent( msg->get_payload(), msg->get_middle(), msg->get_data()); logger().debug("{} --> #{} === {} ({})", conn, msg->get_seq(), *msg, msg->get_type()); - bl.append(frame_assembler.get_buffer(message)); + bl.append(frame_assembler->get_buffer(message)); }); if (!conn.policy.lossy) { @@ -140,7 +139,8 @@ seastar::future<> Protocol::send_keepalive() } void Protocol::set_out_state( - const Protocol::out_state_t &new_state) + const Protocol::out_state_t &new_state, + FrameAssemblerV2Ref fa) { ceph_assert_always(!( (new_state == out_state_t::none && out_state != out_state_t::none) || @@ -149,25 +149,29 @@ void Protocol::set_out_state( )); bool dispatch_in = false; - if (out_state != out_state_t::open && - new_state == out_state_t::open) { + if (new_state == out_state_t::open) { // to open - ceph_assert_always(frame_assembler.is_socket_valid()); + assert(fa != nullptr); + ceph_assert_always(frame_assembler == nullptr); + frame_assembler = std::move(fa); + ceph_assert_always(frame_assembler->is_socket_valid()); dispatch_in = true; #ifdef UNIT_TESTS_BUILT if (conn.interceptor) { conn.interceptor->register_conn_ready(conn); } #endif - } else if (out_state == out_state_t::open && - new_state != out_state_t::open) { + } else if (out_state == out_state_t::open) { // from open - ceph_assert_always(frame_assembler.is_socket_valid()); - frame_assembler.shutdown_socket(); + assert(fa == nullptr); + ceph_assert_always(frame_assembler->is_socket_valid()); + frame_assembler->shutdown_socket(); if (out_dispatching) { ceph_assert_always(!out_exit_dispatching.has_value()); - out_exit_dispatching = seastar::shared_promise<>(); + out_exit_dispatching = seastar::promise<>(); } + } else { + assert(fa == nullptr); } if (out_state != new_state) { @@ -176,32 +180,38 @@ void Protocol::set_out_state( out_state_changed = seastar::promise<>(); } - // The above needs to be atomic + /* + * not atomic below + */ + if (dispatch_in) { do_in_dispatch(); } } -seastar::future<> Protocol::wait_io_exit_dispatching() +seastar::future Protocol::wait_io_exit_dispatching() { ceph_assert_always(out_state != out_state_t::open); - ceph_assert_always(!frame_assembler.is_socket_valid()); + ceph_assert_always(frame_assembler != nullptr); + ceph_assert_always(!frame_assembler->is_socket_valid()); return seastar::when_all( [this] { if (out_exit_dispatching) { - return out_exit_dispatching->get_shared_future(); + return out_exit_dispatching->get_future(); } else { return seastar::now(); } }(), [this] { if (in_exit_dispatching) { - return in_exit_dispatching->get_shared_future(); + return in_exit_dispatching->get_future(); } else { return seastar::now(); } }() - ).discard_result(); + ).discard_result().then([this] { + return std::move(frame_assembler); + }); } void Protocol::requeue_out_sent() @@ -275,7 +285,8 @@ void Protocol::ack_out_sent(seq_num_t seq) seastar::future Protocol::try_exit_out_dispatch() { assert(!is_out_queued()); - return frame_assembler.flush().then([this] { + return frame_assembler->flush( + ).then([this] { if (!is_out_queued()) { // still nothing pending to send after flush, // the dispatching can ONLY stop now @@ -308,7 +319,7 @@ seastar::future<> Protocol::do_out_dispatch() auto to_ack = ack_left; assert(to_ack == 0 || in_seq > 0); // sweep all pending out with the concrete Protocol - return frame_assembler.write( + return frame_assembler->write( sweep_out_pending_msgs_to_sent( need_keepalive, next_keepalive_ack, to_ack > 0) ).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack] { @@ -408,7 +419,7 @@ void Protocol::notify_out_dispatch() seastar::future<> Protocol::read_message(utime_t throttle_stamp, std::size_t msg_size) { - return frame_assembler.read_frame_payload( + return frame_assembler->read_frame_payload( ).then([this, throttle_stamp, msg_size](auto payload) { if (unlikely(out_state != out_state_t::open)) { logger().debug("{} triggered {} during read_message()", @@ -520,10 +531,10 @@ Protocol::read_message(utime_t throttle_stamp, std::size_t msg_size) void Protocol::do_in_dispatch() { ceph_assert_always(!in_exit_dispatching.has_value()); - in_exit_dispatching = seastar::shared_promise<>(); + in_exit_dispatching = seastar::promise<>(); gate.dispatch_in_background("do_in_dispatch", *this, [this] { return seastar::keep_doing([this] { - return frame_assembler.read_main_preamble( + return frame_assembler->read_main_preamble( ).then([this](auto ret) { switch (ret.tag) { case Tag::MESSAGE: { @@ -556,7 +567,7 @@ void Protocol::do_in_dispatch() }); } case Tag::ACK: - return frame_assembler.read_frame_payload( + return frame_assembler->read_frame_payload( ).then([this](auto payload) { // handle_message_ack() logic auto ack = AckFrame::Decode(payload->back()); @@ -564,7 +575,7 @@ void Protocol::do_in_dispatch() ack_out_sent(ack.seq()); }); case Tag::KEEPALIVE2: - return frame_assembler.read_frame_payload( + return frame_assembler->read_frame_payload( ).then([this](auto payload) { // handle_keepalive2() logic auto keepalive_frame = KeepAliveFrame::Decode(payload->back()); @@ -577,7 +588,7 @@ void Protocol::do_in_dispatch() last_keepalive = seastar::lowres_system_clock::now(); }); case Tag::KEEPALIVE2_ACK: - return frame_assembler.read_frame_payload( + return frame_assembler->read_frame_payload( ).then([this](auto payload) { // handle_keepalive2_ack() logic auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload->back()); diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h index 819c7aa9504..f1e7d2bb0da 100644 --- a/src/crimson/net/Protocol.h +++ b/src/crimson/net/Protocol.h @@ -105,9 +105,9 @@ class Protocol { }; friend class fmt::formatter; - void set_out_state(const out_state_t &new_state); + void set_out_state(const out_state_t &new_state, FrameAssemblerV2Ref fa=nullptr); - seastar::future<> wait_io_exit_dispatching(); + seastar::future wait_io_exit_dispatching(); void requeue_out_sent_up_to(seq_num_t seq); @@ -131,8 +131,6 @@ class Protocol { SocketConnection &conn; - FrameAssemblerV2 frame_assembler; - private: bool is_out_queued() const { return (!out_pending_msgs.empty() || @@ -160,6 +158,8 @@ class Protocol { crimson::common::Gated gate; + FrameAssemblerV2Ref frame_assembler; + /* * out states for writing */ @@ -171,10 +171,7 @@ class Protocol { bool out_dispatching = false; - // If another continuation is trying to close or replace socket when - // out_dispatching is true and out_state is open, it needs to wait for - // out_exit_dispatching until writing is stopped or failed. - std::optional> out_exit_dispatching; + std::optional> out_exit_dispatching; /// the seq num of the last transmitted message seq_num_t out_seq = 0; @@ -195,7 +192,7 @@ class Protocol { * in states for reading */ - std::optional> in_exit_dispatching; + std::optional> in_exit_dispatching; /// the seq num of the last received message seq_num_t in_seq = 0; diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 2337ecfbc38..8c3c2e54806 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -160,6 +160,7 @@ ProtocolV2::ProtocolV2(ChainedDispatchers& dispatchers, SocketMessenger& messenger) : Protocol(dispatchers, conn), messenger{messenger}, + frame_assembler{FrameAssemblerV2::create(conn)}, auth_meta{seastar::make_lw_shared()}, protocol_timer{conn} {} @@ -198,7 +199,7 @@ void ProtocolV2::start_accept(SocketRef&& new_socket, ceph_assert(state == state_t::NONE); // until we know better conn.target_addr = _peer_addr; - frame_assembler.set_socket(std::move(new_socket)); + frame_assembler->set_socket(std::move(new_socket)); has_socket = true; is_socket_valid = true; logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr); @@ -207,22 +208,49 @@ void ProtocolV2::start_accept(SocketRef&& new_socket, execute_accepting(); } -void ProtocolV2::trigger_state(state_t _state, out_state_t _out_state, bool reentrant) +void ProtocolV2::trigger_state(state_t new_state, out_state_t _out_state, bool reentrant) { - if (!reentrant && _state == state) { + if (!reentrant && new_state == state) { logger().error("{} is not allowed to re-trigger state {}", conn, get_state_name(state)); ceph_abort(); } if (state == state_t::CLOSING) { logger().error("{} CLOSING is not allowed to trigger state {}", - conn, get_state_name(_state)); + conn, get_state_name(new_state)); ceph_abort(); } logger().debug("{} TRIGGER {}, was {}", - conn, get_state_name(_state), get_state_name(state)); - state = _state; - set_out_state(_out_state); + conn, get_state_name(new_state), get_state_name(state)); + auto pre_state = state; + if (pre_state == state_t::READY) { + assert(!gate.is_closed()); + ceph_assert_always(!exit_io.has_value()); + exit_io = seastar::shared_promise<>(); + } + state = new_state; + if (new_state == state_t::READY) { + // I'm not responsible to shutdown the socket at READY + is_socket_valid = false; + set_out_state(_out_state, std::move(frame_assembler)); + } else { + set_out_state(_out_state, nullptr); + } + + /* + * not atomic below + */ + + if (pre_state == state_t::READY) { + gate.dispatch_in_background("exit_io", *this, [this] { + return wait_io_exit_dispatching( + ).then([this](FrameAssemblerV2Ref fa) { + frame_assembler = std::move(fa); + exit_io->set_value(); + exit_io = std::nullopt; + }); + }); + } } void ProtocolV2::fault( @@ -276,7 +304,7 @@ void ProtocolV2::fault( if (likely(has_socket)) { if (likely(is_socket_valid)) { ceph_assert_always(state != state_t::READY); - frame_assembler.shutdown_socket(); + frame_assembler->shutdown_socket(); is_socket_valid = false; } else { ceph_assert_always(state != state_t::ESTABLISHING); @@ -360,11 +388,11 @@ ProtocolV2::banner_exchange(bool is_connect) CEPH_MSGR2_REQUIRED_FEATURES, CEPH_BANNER_V2_PREFIX); INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE, bp_type_t::WRITE); - return frame_assembler.write_flush(std::move(bl)).then([this] { + return frame_assembler->write_flush(std::move(bl)).then([this] { // 2. read peer banner unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16); INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ, bp_type_t::READ); - return frame_assembler.read_exactly(banner_len); // or read exactly? + return frame_assembler->read_exactly(banner_len); // or read exactly? }).then([this] (auto bl) { // 3. process peer banner and read banner_payload unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX); @@ -394,7 +422,7 @@ ProtocolV2::banner_exchange(bool is_connect) } logger().debug("{} GOT banner: payload_len={}", conn, payload_len); INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ); - return frame_assembler.read(payload_len); + return frame_assembler->read(payload_len); }).then([this, is_connect] (bufferlist bl) { // 4. process peer banner_payload and send HelloFrame auto p = bl.cbegin(); @@ -428,20 +456,20 @@ ProtocolV2::banner_exchange(bool is_connect) } peer_supported_features = _peer_supported_features; bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1); - frame_assembler.set_is_rev1(is_rev1); + frame_assembler->set_is_rev1(is_rev1); auto hello = HelloFrame::Encode(messenger.get_mytype(), conn.target_addr); logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}", conn, ceph_entity_type_name(messenger.get_mytype()), conn.target_addr); - return frame_assembler.write_flush_frame(hello); + return frame_assembler->write_flush_frame(hello); }).then([this] { //5. read peer HelloFrame - return frame_assembler.read_main_preamble(); + return frame_assembler->read_main_preamble(); }).then([this](auto ret) { expect_tag(Tag::HELLO, ret.tag, conn, __func__); - return frame_assembler.read_frame_payload(); + return frame_assembler->read_frame_payload(); }).then([this](auto payload) { // 6. process peer HelloFrame auto hello = HelloFrame::Decode(payload->back()); @@ -457,11 +485,11 @@ ProtocolV2::banner_exchange(bool is_connect) seastar::future<> ProtocolV2::handle_auth_reply() { - return frame_assembler.read_main_preamble( + return frame_assembler->read_main_preamble( ).then([this](auto ret) { switch (ret.tag) { case Tag::AUTH_BAD_METHOD: - return frame_assembler.read_frame_payload( + return frame_assembler->read_frame_payload( ).then([this](auto payload) { // handle_auth_bad_method() logic auto bad_method = AuthBadMethodFrame::Decode(payload->back()); @@ -482,7 +510,7 @@ seastar::future<> ProtocolV2::handle_auth_reply() return client_auth(bad_method.allowed_methods()); }); case Tag::AUTH_REPLY_MORE: - return frame_assembler.read_frame_payload( + return frame_assembler->read_frame_payload( ).then([this](auto payload) { // handle_auth_reply_more() logic auto auth_more = AuthReplyMoreFrame::Decode(payload->back()); @@ -495,12 +523,12 @@ seastar::future<> ProtocolV2::handle_auth_reply() auto more_reply = AuthRequestMoreFrame::Encode(reply); logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}", conn, reply.length()); - return frame_assembler.write_flush_frame(more_reply); + return frame_assembler->write_flush_frame(more_reply); }).then([this] { return handle_auth_reply(); }); case Tag::AUTH_DONE: - return frame_assembler.read_frame_payload( + return frame_assembler->read_frame_payload( ).then([this](auto payload) { // handle_auth_done() logic auto auth_done = AuthDoneFrame::Decode(payload->back()); @@ -520,7 +548,7 @@ seastar::future<> ProtocolV2::handle_auth_reply() abort_in_fault(); } auth_meta->con_mode = auth_done.con_mode(); - frame_assembler.create_session_stream_handlers(*auth_meta, false); + frame_assembler->create_session_stream_handlers(*auth_meta, false); return finish_auth(); }); default: { @@ -544,7 +572,8 @@ seastar::future<> ProtocolV2::client_auth(std::vector &allowed_methods logger().debug("{} WRITE AuthRequestFrame: method={}," " preferred_modes={}, payload_len={}", conn, auth_method, preferred_modes, bl.length()); - return frame_assembler.write_flush_frame(frame).then([this] { + return frame_assembler->write_flush_frame(frame + ).then([this] { return handle_auth_reply(); }); } catch (const crimson::auth::error& e) { @@ -557,7 +586,7 @@ seastar::future<> ProtocolV2::client_auth(std::vector &allowed_methods seastar::future ProtocolV2::process_wait() { - return frame_assembler.read_frame_payload( + return frame_assembler->read_frame_payload( ).then([this](auto payload) { // handle_wait() logic logger().debug("{} GOT WaitFrame", conn); @@ -592,12 +621,13 @@ ProtocolV2::client_connect() conn.policy.features_supported, conn.policy.features_required | msgr2_required, flags, client_cookie); - return frame_assembler.write_flush_frame(client_ident).then([this] { - return frame_assembler.read_main_preamble(); + return frame_assembler->write_flush_frame(client_ident + ).then([this] { + return frame_assembler->read_main_preamble(); }).then([this](auto ret) { switch (ret.tag) { case Tag::IDENT_MISSING_FEATURES: - return frame_assembler.read_frame_payload( + return frame_assembler->read_frame_payload( ).then([this](auto payload) { // handle_ident_missing_features() logic auto ident_missing = IdentMissingFeaturesFrame::Decode(payload->back()); @@ -610,7 +640,7 @@ ProtocolV2::client_connect() case Tag::WAIT: return process_wait(); case Tag::SERVER_IDENT: - return frame_assembler.read_frame_payload( + return frame_assembler->read_frame_payload( ).then([this](auto payload) { // handle_server_ident() logic requeue_out_sent(); @@ -695,12 +725,12 @@ ProtocolV2::client_reconnect() conn, messenger.get_myaddrs(), client_cookie, server_cookie, global_seq, connect_seq, get_in_seq()); - return frame_assembler.write_flush_frame(reconnect).then([this] { - return frame_assembler.read_main_preamble(); + return frame_assembler->write_flush_frame(reconnect).then([this] { + return frame_assembler->read_main_preamble(); }).then([this](auto ret) { switch (ret.tag) { case Tag::SESSION_RETRY_GLOBAL: - return frame_assembler.read_frame_payload( + return frame_assembler->read_frame_payload( ).then([this](auto payload) { // handle_session_retry_global() logic auto retry = RetryGlobalFrame::Decode(payload->back()); @@ -711,7 +741,7 @@ ProtocolV2::client_reconnect() return client_reconnect(); }); case Tag::SESSION_RETRY: - return frame_assembler.read_frame_payload( + return frame_assembler->read_frame_payload( ).then([this](auto payload) { // handle_session_retry() logic auto retry = RetryFrame::Decode(payload->back()); @@ -722,7 +752,7 @@ ProtocolV2::client_reconnect() return client_reconnect(); }); case Tag::SESSION_RESET: - return frame_assembler.read_frame_payload( + return frame_assembler->read_frame_payload( ).then([this](auto payload) { if (unlikely(state != state_t::CONNECTING)) { logger().debug("{} triggered {} before reset_session()", @@ -738,7 +768,7 @@ ProtocolV2::client_reconnect() case Tag::WAIT: return process_wait(); case Tag::SESSION_RECONNECT_OK: - return frame_assembler.read_frame_payload( + return frame_assembler->read_frame_payload( ).then([this](auto payload) { // handle_reconnect_ok() logic auto reconnect_ok = ReconnectOkFrame::Decode(payload->back()); @@ -771,7 +801,8 @@ void ProtocolV2::execute_connecting() assert(server_cookie == 0); logger().debug("{} UPDATE: gs={} for connect", conn, global_seq); } - return wait_io_exit_dispatching().then([this] { + return wait_exit_io().then([this] { + ceph_assert_always(frame_assembler); if (unlikely(state != state_t::CONNECTING)) { logger().debug("{} triggered {} before Socket::connect()", conn, get_state_name(state)); @@ -789,14 +820,14 @@ void ProtocolV2::execute_connecting() }); } if (!has_socket) { - frame_assembler.set_socket(std::move(new_socket)); + frame_assembler->set_socket(std::move(new_socket)); has_socket = true; } else { gate.dispatch_in_background( "replace_socket_connecting", *this, [this, new_socket=std::move(new_socket)]() mutable { - return frame_assembler.replace_shutdown_socket(std::move(new_socket)); + return frame_assembler->replace_shutdown_socket(std::move(new_socket)); } ); } @@ -804,8 +835,8 @@ void ProtocolV2::execute_connecting() return seastar::now(); }).then([this] { auth_meta = seastar::make_lw_shared(); - frame_assembler.reset_handlers(); - frame_assembler.start_recording(); + frame_assembler->reset_handlers(); + frame_assembler->start_recording(); return banner_exchange(true); }).then([this] (auto&& ret) { auto [_peer_type, _my_addr_from_peer] = std::move(ret); @@ -820,7 +851,7 @@ void ProtocolV2::execute_connecting() conn, get_state_name(state)); abort_protocol(); } - frame_assembler.learn_socket_ephemeral_port_as_connector( + frame_assembler->learn_socket_ephemeral_port_as_connector( _my_addr_from_peer.get_port()); if (unlikely(_my_addr_from_peer.is_legacy())) { logger().warn("{} peer sent a legacy address for me: {}", @@ -865,7 +896,7 @@ void ProtocolV2::execute_connecting() case next_step_t::wait: { logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn); ceph_assert_always(is_socket_valid); - frame_assembler.shutdown_socket(); + frame_assembler->shutdown_socket(); is_socket_valid = false; execute_wait(true); break; @@ -894,7 +925,8 @@ seastar::future<> ProtocolV2::_auth_bad_method(int r) "allowed_methods={}, allowed_modes={})", conn, auth_meta->auth_method, cpp_strerror(r), allowed_methods, allowed_modes); - return frame_assembler.write_flush_frame(bad_method).then([this] { + return frame_assembler->write_flush_frame(bad_method + ).then([this] { return server_auth(); }); } @@ -920,9 +952,10 @@ seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, boo logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}", conn, conn.peer_global_id, ceph_con_mode_name(auth_meta->con_mode), reply.length()); - return frame_assembler.write_flush_frame(auth_done).then([this] { + return frame_assembler->write_flush_frame(auth_done + ).then([this] { ceph_assert(auth_meta); - frame_assembler.create_session_stream_handlers(*auth_meta, true); + frame_assembler->create_session_stream_handlers(*auth_meta, true); return finish_auth(); }); } @@ -931,11 +964,12 @@ seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, boo auto more = AuthReplyMoreFrame::Encode(reply); logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}", conn, reply.length()); - return frame_assembler.write_flush_frame(more).then([this] { - return frame_assembler.read_main_preamble(); + return frame_assembler->write_flush_frame(more + ).then([this] { + return frame_assembler->read_main_preamble(); }).then([this](auto ret) { expect_tag(Tag::AUTH_REQUEST_MORE, ret.tag, conn, __func__); - return frame_assembler.read_frame_payload(); + return frame_assembler->read_frame_payload(); }).then([this](auto payload) { auto auth_more = AuthRequestMoreFrame::Decode(payload->back()); logger().debug("{} GOT AuthRequestMoreFrame: payload_len={}", @@ -957,10 +991,10 @@ seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, boo seastar::future<> ProtocolV2::server_auth() { - return frame_assembler.read_main_preamble( + return frame_assembler->read_main_preamble( ).then([this](auto ret) { expect_tag(Tag::AUTH_REQUEST, ret.tag, conn, __func__); - return frame_assembler.read_frame_payload(); + return frame_assembler->read_frame_payload(); }).then([this](auto payload) { // handle_auth_request() logic auto request = AuthRequestFrame::Decode(payload->back()); @@ -999,7 +1033,8 @@ ProtocolV2::send_wait() { auto wait = WaitFrame::Encode(); logger().debug("{} WRITE WaitFrame", conn); - return frame_assembler.write_flush_frame(wait).then([] { + return frame_assembler->write_flush_frame(wait + ).then([] { return next_step_t::wait; }); } @@ -1017,7 +1052,7 @@ ProtocolV2::reuse_connection( existing_proto->trigger_replacing(reconnect, do_reset, - frame_assembler.to_replace(), + frame_assembler->to_replace(), std::move(auth_meta), peer_global_seq, client_cookie, @@ -1161,7 +1196,7 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn) seastar::future ProtocolV2::server_connect() { - return frame_assembler.read_frame_payload( + return frame_assembler->read_frame_payload( ).then([this](auto payload) { // handle_client_ident() logic auto client_ident = ClientIdentFrame::Decode(payload->back()); @@ -1213,7 +1248,8 @@ ProtocolV2::server_connect() auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing); logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)", conn, feat_missing); - return frame_assembler.write_flush_frame(ident_missing_features).then([] { + return frame_assembler->write_flush_frame(ident_missing_features + ).then([] { return next_step_t::wait; }); } @@ -1251,7 +1287,7 @@ ProtocolV2::server_connect() seastar::future ProtocolV2::read_reconnect() { - return frame_assembler.read_main_preamble( + return frame_assembler->read_main_preamble( ).then([this](auto ret) { expect_tag(Tag::SESSION_RECONNECT, ret.tag, conn, "read_reconnect"); return server_reconnect(); @@ -1263,7 +1299,8 @@ ProtocolV2::send_retry(uint64_t connect_seq) { auto retry = RetryFrame::Encode(connect_seq); logger().warn("{} WRITE RetryFrame: cs={}", conn, connect_seq); - return frame_assembler.write_flush_frame(retry).then([this] { + return frame_assembler->write_flush_frame(retry + ).then([this] { return read_reconnect(); }); } @@ -1273,7 +1310,8 @@ ProtocolV2::send_retry_global(uint64_t global_seq) { auto retry = RetryGlobalFrame::Encode(global_seq); logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn, global_seq); - return frame_assembler.write_flush_frame(retry).then([this] { + return frame_assembler->write_flush_frame(retry + ).then([this] { return read_reconnect(); }); } @@ -1283,8 +1321,9 @@ ProtocolV2::send_reset(bool full) { auto reset = ResetFrame::Encode(full); logger().warn("{} WRITE ResetFrame: full={}", conn, full); - return frame_assembler.write_flush_frame(reset).then([this] { - return frame_assembler.read_main_preamble(); + return frame_assembler->write_flush_frame(reset + ).then([this] { + return frame_assembler->read_main_preamble(); }).then([this](auto ret) { expect_tag(Tag::CLIENT_IDENT, ret.tag, conn, "post_send_reset"); return server_connect(); @@ -1294,7 +1333,7 @@ ProtocolV2::send_reset(bool full) seastar::future ProtocolV2::server_reconnect() { - return frame_assembler.read_frame_payload( + return frame_assembler->read_frame_payload( ).then([this](auto payload) { // handle_reconnect() logic auto reconnect = ReconnectFrame::Decode(payload->back()); @@ -1443,8 +1482,8 @@ void ProtocolV2::execute_accepting() return seastar::futurize_invoke([this] { INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED); auth_meta = seastar::make_lw_shared(); - frame_assembler.reset_handlers(); - frame_assembler.start_recording(); + frame_assembler->reset_handlers(); + frame_assembler->start_recording(); return banner_exchange(false); }).then([this] (auto&& ret) { auto [_peer_type, _my_addr_from_peer] = std::move(ret); @@ -1468,7 +1507,7 @@ void ProtocolV2::execute_accepting() messenger.learned_addr(_my_addr_from_peer, conn); return server_auth(); }).then([this] { - return frame_assembler.read_main_preamble(); + return frame_assembler->read_main_preamble(); }).then([this](auto ret) { switch (ret.tag) { case Tag::CLIENT_IDENT: @@ -1511,16 +1550,17 @@ seastar::future<> ProtocolV2::finish_auth() { ceph_assert(auth_meta); - auto records = frame_assembler.stop_recording(); + auto records = frame_assembler->stop_recording(); const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() : auth_meta->session_key.hmac_sha256(nullptr, records.rxbuf); auto sig_frame = AuthSignatureFrame::Encode(sig); logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn, sig); - return frame_assembler.write_flush_frame(sig_frame).then([this] { - return frame_assembler.read_main_preamble(); + return frame_assembler->write_flush_frame(sig_frame + ).then([this] { + return frame_assembler->read_main_preamble(); }).then([this](auto ret) { expect_tag(Tag::AUTH_SIGNATURE, ret.tag, conn, "post_finish_auth"); - return frame_assembler.read_frame_payload(); + return frame_assembler->read_frame_payload(); }).then([this, txbuf=std::move(records.txbuf)](auto payload) { // handle_auth_signature() logic auto sig_frame = AuthSignatureFrame::Decode(payload->back()); @@ -1634,7 +1674,7 @@ ProtocolV2::send_server_ident() conn.policy.features_required | msgr2_required, flags, server_cookie); - return frame_assembler.write_flush_frame(server_ident); + return frame_assembler->write_flush_frame(server_ident); } // REPLACING state @@ -1655,7 +1695,7 @@ void ProtocolV2::trigger_replacing(bool reconnect, ceph_assert_always(has_socket); ceph_assert_always(!mover.socket->is_shutdown()); if (is_socket_valid) { - frame_assembler.shutdown_socket(); + frame_assembler->shutdown_socket(); is_socket_valid = false; } gate.dispatch_in_background("trigger_replacing", *this, @@ -1672,8 +1712,9 @@ void ProtocolV2::trigger_replacing(bool reconnect, dispatchers.ms_handle_accept( seastar::static_pointer_cast(conn.shared_from_this())); // state may become CLOSING, close mover.socket and abort later - return wait_io_exit_dispatching( + return wait_exit_io( ).then([this] { + ceph_assert_always(frame_assembler); protocol_timer.cancel(); auto done = std::move(execution_done); execution_done = seastar::now(); @@ -1704,7 +1745,7 @@ void ProtocolV2::trigger_replacing(bool reconnect, "replace_frame_assembler", *this, [this, mover=std::move(mover)]() mutable { - return frame_assembler.replace_by(std::move(mover)); + return frame_assembler->replace_by(std::move(mover)); } ); is_socket_valid = true; @@ -1715,7 +1756,7 @@ void ProtocolV2::trigger_replacing(bool reconnect, requeue_out_sent_up_to(new_msg_seq); auto reconnect_ok = ReconnectOkFrame::Encode(get_in_seq()); logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, get_in_seq()); - return frame_assembler.write_flush_frame(reconnect_ok); + return frame_assembler->write_flush_frame(reconnect_ok); } else { client_cookie = new_client_cookie; assert(conn.get_peer_type() == new_peer_name.type()); @@ -1725,7 +1766,7 @@ void ProtocolV2::trigger_replacing(bool reconnect, conn.set_features(new_conn_features); peer_supported_features = new_peer_supported_features; bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1); - frame_assembler.set_is_rev1(is_rev1); + frame_assembler->set_is_rev1(is_rev1); return send_server_ident(); } }).then([this, reconnect] { @@ -1759,8 +1800,6 @@ void ProtocolV2::execute_ready() assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0)); protocol_timer.cancel(); ceph_assert_always(is_socket_valid); - // I'm not responsible to shutdown the socket at READY - is_socket_valid = false; trigger_state(state_t::READY, out_state_t::open, false); } @@ -1820,7 +1859,8 @@ void ProtocolV2::execute_server_wait() ceph_assert_always(is_socket_valid); trigger_state(state_t::SERVER_WAIT, out_state_t::none, false); gated_execute("execute_server_wait", [this] { - return frame_assembler.read_exactly(1).then([this] (auto bl) { + return frame_assembler->read_exactly(1 + ).then([this](auto bl) { logger().warn("{} SERVER_WAIT got read, abort", conn); abort_in_fault(); }).handle_exception([this] (std::exception_ptr eptr) { @@ -1899,7 +1939,7 @@ void ProtocolV2::do_close( (*f_accept_new)(); } if (is_socket_valid) { - frame_assembler.shutdown_socket(); + frame_assembler->shutdown_socket(); is_socket_valid = false; } assert(!gate.is_closed()); @@ -1917,8 +1957,10 @@ void ProtocolV2::do_close( closed_clean_fut = seastar::when_all( std::move(gate_closed), std::move(out_closed) ).discard_result().then([this] { + ceph_assert_always(!exit_io.has_value()); if (has_socket) { - return frame_assembler.close_shutdown_socket(); + ceph_assert_always(frame_assembler); + return frame_assembler->close_shutdown_socket(); } else { return seastar::now(); } diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index 206af1213d5..0b915540adc 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -49,6 +49,14 @@ class ProtocolV2 final : public Protocol { void notify_out_fault(const char *, std::exception_ptr) override; + seastar::future<> wait_exit_io() { + if (exit_io.has_value()) { + return exit_io->get_shared_future(); + } else { + return seastar::now(); + } + } + private: SocketMessenger &messenger; @@ -57,6 +65,10 @@ class ProtocolV2 final : public Protocol { // the socket exists and it is not shutdown bool is_socket_valid = false; + FrameAssemblerV2Ref frame_assembler; + + std::optional> exit_io; + AuthConnectionMetaRef auth_meta; crimson::common::Gated gate;