From: Yingxin Cheng Date: Thu, 3 Aug 2023 07:26:54 +0000 (+0800) Subject: crimson/net: rework interceptor to support cross-core interceptions X-Git-Tag: v19.0.0~627^2~6 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=6be5e630ed79e09e0b77c481856f56a6b27ea562;p=ceph.git crimson/net: rework interceptor to support cross-core interceptions Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/net/FrameAssemblerV2.cc b/src/crimson/net/FrameAssemblerV2.cc index bb48138a81fb7..273a6350d71ed 100644 --- a/src/crimson/net/FrameAssemblerV2.cc +++ b/src/crimson/net/FrameAssemblerV2.cc @@ -6,10 +6,6 @@ #include "Errors.h" #include "SocketConnection.h" -#ifdef UNIT_TESTS_BUILT -#include "Interceptor.h" -#endif - using ceph::msgr::v2::FrameAssembler; using ceph::msgr::v2::FrameError; using ceph::msgr::v2::preamble_block_t; @@ -43,23 +39,23 @@ FrameAssemblerV2::~FrameAssemblerV2() #ifdef UNIT_TESTS_BUILT // should be consistent to intercept() in ProtocolV2.cc -void FrameAssemblerV2::intercept_frame(Tag tag, bool is_write) +seastar::future<> FrameAssemblerV2::intercept_frames( + std::vector bps, + bp_type_t type) { assert(seastar::this_shard_id() == sid); assert(has_socket()); - if (conn.interceptor) { - auto type = is_write ? bp_type_t::WRITE : bp_type_t::READ; - // FIXME: doesn't support cross-core - auto action = conn.interceptor->intercept( - conn.get_local_shared_foreign_from_this(), - Breakpoint{tag, type}); - // tolerate leaking future in tests - std::ignore = seastar::smp::submit_to( + if (!conn.interceptor) { + return seastar::now(); + } + return conn.interceptor->intercept(conn, bps + ).then([this, type](bp_action_t action) { + return seastar::smp::submit_to( socket->get_shard_id(), [this, type, action] { socket->set_trap(type, action, &conn.interceptor->blocker); }); - } + }); } #endif @@ -361,17 +357,22 @@ FrameAssemblerV2::read_main_preamble() return read_exactly( rx_frame_asm.get_preamble_onwire_len() ).then([this](auto bptr) { + rx_preamble.append(std::move(bptr)); + Tag tag; try { - rx_preamble.append(std::move(bptr)); - const Tag tag = rx_frame_asm.disassemble_preamble(rx_preamble); -#ifdef UNIT_TESTS_BUILT - intercept_frame(tag, false); -#endif - return read_main_t{tag, &rx_frame_asm}; + tag = rx_frame_asm.disassemble_preamble(rx_preamble); } catch (FrameError& e) { logger().warn("{} read_main_preamble: {}", conn, e.what()); throw std::system_error(make_error_code(crimson::net::error::negotiation_failure)); } +#ifdef UNIT_TESTS_BUILT + return intercept_frame(tag, false + ).then([this, tag] { + return read_main_t{tag, &rx_frame_asm}; + }); +#else + return read_main_t{tag, &rx_frame_asm}; +#endif }); } template seastar::future FrameAssemblerV2::read_main_preamble(); diff --git a/src/crimson/net/FrameAssemblerV2.h b/src/crimson/net/FrameAssemblerV2.h index e4af653812d7a..9c89c144e80a1 100644 --- a/src/crimson/net/FrameAssemblerV2.h +++ b/src/crimson/net/FrameAssemblerV2.h @@ -10,6 +10,10 @@ #include "crimson/common/gated.h" #include "crimson/net/Socket.h" +#ifdef UNIT_TESTS_BUILT +#include "Interceptor.h" +#endif + namespace crimson::net { class SocketConnection; @@ -131,9 +135,6 @@ public: template ceph::bufferlist get_buffer(F &tx_frame) { assert(seastar::this_shard_id() == sid); -#ifdef UNIT_TESTS_BUILT - intercept_frame(F::tag, true); -#endif auto bl = tx_frame.get_buffer(tx_frame_asm); log_main_preamble(bl); return bl; @@ -143,12 +144,56 @@ public: seastar::future<> write_flush_frame(F &tx_frame) { assert(seastar::this_shard_id() == sid); auto bl = get_buffer(tx_frame); +#ifdef UNIT_TESTS_BUILT + return intercept_frame(F::tag, true + ).then([this, bl=std::move(bl)]() mutable { + return write_flush(std::move(bl)); + }); +#else return write_flush(std::move(bl)); +#endif } static FrameAssemblerV2Ref create(SocketConnection &conn); +#ifdef UNIT_TESTS_BUILT + seastar::future<> intercept_frames( + std::vector tags, + bool is_write) { + auto type = is_write ? bp_type_t::WRITE : bp_type_t::READ; + std::vector bps; + for (auto &tag : tags) { + bps.emplace_back(Breakpoint{tag, type}); + } + return intercept_frames(bps, type); + } + + seastar::future<> intercept_frame( + ceph::msgr::v2::Tag tag, + bool is_write) { + auto type = is_write ? bp_type_t::WRITE : bp_type_t::READ; + std::vector bps; + bps.emplace_back(Breakpoint{tag, type}); + return intercept_frames(bps, type); + } + + seastar::future<> intercept_frame( + custom_bp_t bp, + bool is_write) { + auto type = is_write ? bp_type_t::WRITE : bp_type_t::READ; + std::vector bps; + bps.emplace_back(Breakpoint{bp}); + return intercept_frames(bps, type); + } +#endif + private: +#ifdef UNIT_TESTS_BUILT + seastar::future<> intercept_frames( + std::vector bps, + bp_type_t type); +#endif + bool has_socket() const; SocketFRef move_socket(); @@ -157,10 +202,6 @@ private: void log_main_preamble(const ceph::bufferlist &bl); -#ifdef UNIT_TESTS_BUILT - void intercept_frame(ceph::msgr::v2::Tag, bool is_write); -#endif - SocketConnection &conn; SocketFRef socket; diff --git a/src/crimson/net/Interceptor.h b/src/crimson/net/Interceptor.h index 764facf7be109..921175cbf9417 100644 --- a/src/crimson/net/Interceptor.h +++ b/src/crimson/net/Interceptor.h @@ -120,7 +120,9 @@ struct Interceptor { virtual void register_conn_ready(ConnectionRef) = 0; virtual void register_conn_closed(ConnectionRef) = 0; virtual void register_conn_replaced(ConnectionRef) = 0; - virtual bp_action_t intercept(ConnectionRef, Breakpoint bp) = 0; + + virtual seastar::future + intercept(Connection&, std::vector bp) = 0; }; } // namespace crimson::net diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 8bb9f7b682183..0992c74ec390c 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -17,10 +17,6 @@ #include "Errors.h" #include "SocketMessenger.h" -#ifdef UNIT_TESTS_BUILT -#include "Interceptor.h" -#endif - using namespace ceph::msgr::v2; using crimson::common::local_conf; @@ -101,28 +97,6 @@ inline uint64_t generate_client_cookie() { namespace crimson::net { -#ifdef UNIT_TESTS_BUILT -// should be consistent to intercept_frame() in FrameAssemblerV2.cc -void intercept(Breakpoint bp, - bp_type_t type, - SocketConnection& conn, - Interceptor *interceptor, - Socket *socket) { - if (interceptor) { - auto action = interceptor->intercept( - conn.get_local_shared_foreign_from_this(), - Breakpoint(bp)); - socket->set_trap(type, action, &interceptor->blocker); - } -} - -#define INTERCEPT_CUSTOM(bp, type) \ -intercept({bp}, type, conn, \ - conn.interceptor, conn.socket) -#else -#define INTERCEPT_CUSTOM(bp, type) -#endif - seastar::future<> ProtocolV2::Timer::backoff(double seconds) { logger().warn("{} waiting {} seconds ...", conn, seconds); @@ -439,101 +413,121 @@ ProtocolV2::banner_exchange(bool is_connect) CRIMSON_MSGR2_SUPPORTED_FEATURES, CEPH_MSGR2_REQUIRED_FEATURES, CEPH_BANNER_V2_PREFIX); - INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE, bp_type_t::WRITE); - return frame_assembler->write_flush(std::move(bl)).then([this] { - // 2. read peer banner - unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16); - INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ, bp_type_t::READ); +#ifdef UNIT_TESTS_BUILT + return frame_assembler->intercept_frame(custom_bp_t::BANNER_WRITE, true + ).then([this, bl=std::move(bl)]() mutable { + return frame_assembler->write_flush(std::move(bl)); + } +#else + return frame_assembler->write_flush(std::move(bl) +#endif + ).then([this] { + // 2. read peer banner + unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16); +#ifdef UNIT_TESTS_BUILT + return frame_assembler->intercept_frame(custom_bp_t::BANNER_READ, false + ).then([this, banner_len] { return frame_assembler->read_exactly(banner_len); - }).then([this](auto bptr) { - // 3. process peer banner and read banner_payload - unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX); - logger().debug("{} RECV({}) banner: \"{}\"", - conn, bptr.length(), - std::string(bptr.c_str(), banner_prefix_len)); - - if (memcmp(bptr.c_str(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) { - if (memcmp(bptr.c_str(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) { - logger().warn("{} peer is using V1 protocol", conn); - } else { - logger().warn("{} peer sent bad banner", conn); - } - abort_in_fault(); + }); +#else + return frame_assembler->read_exactly(banner_len); +#endif + }).then([this](auto bptr) { + // 3. process peer banner and read banner_payload + unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX); + logger().debug("{} RECV({}) banner: \"{}\"", + conn, bptr.length(), + std::string(bptr.c_str(), banner_prefix_len)); + + if (memcmp(bptr.c_str(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) { + if (memcmp(bptr.c_str(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) { + logger().warn("{} peer is using V1 protocol", conn); + } else { + logger().warn("{} peer sent bad banner", conn); } + abort_in_fault(); + } - bptr.set_offset(bptr.offset() + banner_prefix_len); - bptr.set_length(bptr.length() - banner_prefix_len); - assert(bptr.length() == sizeof(ceph_le16)); - - uint16_t payload_len; - bufferlist buf; - buf.append(std::move(bptr)); - auto ti = buf.cbegin(); - try { - decode(payload_len, ti); - } catch (const buffer::error &e) { - logger().warn("{} decode banner payload len failed", conn); - abort_in_fault(); - } - logger().debug("{} GOT banner: payload_len={}", conn, payload_len); - INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ); + bptr.set_offset(bptr.offset() + banner_prefix_len); + bptr.set_length(bptr.length() - banner_prefix_len); + assert(bptr.length() == sizeof(ceph_le16)); + + uint16_t payload_len; + bufferlist buf; + buf.append(std::move(bptr)); + auto ti = buf.cbegin(); + try { + decode(payload_len, ti); + } catch (const buffer::error &e) { + logger().warn("{} decode banner payload len failed", conn); + abort_in_fault(); + } + logger().debug("{} GOT banner: payload_len={}", conn, payload_len); +#ifdef UNIT_TESTS_BUILT + return frame_assembler->intercept_frame( + custom_bp_t::BANNER_PAYLOAD_READ, false + ).then([this, payload_len] { return frame_assembler->read(payload_len); - }).then([this, is_connect] (bufferlist bl) { - // 4. process peer banner_payload and send HelloFrame - auto p = bl.cbegin(); - uint64_t _peer_supported_features; - uint64_t _peer_required_features; - try { - decode(_peer_supported_features, p); - decode(_peer_required_features, p); - } catch (const buffer::error &e) { - logger().warn("{} decode banner payload failed", conn); - abort_in_fault(); - } - logger().debug("{} RECV({}) banner features: supported={} required={}", - conn, bl.length(), - _peer_supported_features, _peer_required_features); - - // Check feature bit compatibility - uint64_t supported_features = CRIMSON_MSGR2_SUPPORTED_FEATURES; - uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES; - if ((required_features & _peer_supported_features) != required_features) { - logger().error("{} peer does not support all required features" - " required={} peer_supported={}", - conn, required_features, _peer_supported_features); - ABORT_IN_CLOSE(is_connect); - } - if ((supported_features & _peer_required_features) != _peer_required_features) { - logger().error("{} we do not support all peer required features" - " peer_required={} supported={}", - conn, _peer_required_features, supported_features); - ABORT_IN_CLOSE(is_connect); - } - peer_supported_features = _peer_supported_features; - bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1); - frame_assembler->set_is_rev1(is_rev1); - - auto hello = HelloFrame::Encode(messenger.get_mytype(), - conn.target_addr); - logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}", - conn, ceph_entity_type_name(messenger.get_mytype()), - conn.target_addr); - return frame_assembler->write_flush_frame(hello); - }).then([this] { - //5. read peer HelloFrame - return frame_assembler->read_main_preamble(); - }).then([this](auto ret) { - expect_tag(Tag::HELLO, ret.tag, conn, "read_hello_frame"); - return frame_assembler->read_frame_payload(); - }).then([this](auto payload) { - // 6. process peer HelloFrame - auto hello = HelloFrame::Decode(payload->back()); - logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}", - conn, ceph_entity_type_name(hello.entity_type()), - hello.peer_addr()); - return seastar::make_ready_future>( - std::make_tuple(hello.entity_type(), hello.peer_addr())); }); +#else + return frame_assembler->read(payload_len); +#endif + }).then([this, is_connect] (bufferlist bl) { + // 4. process peer banner_payload and send HelloFrame + auto p = bl.cbegin(); + uint64_t _peer_supported_features; + uint64_t _peer_required_features; + try { + decode(_peer_supported_features, p); + decode(_peer_required_features, p); + } catch (const buffer::error &e) { + logger().warn("{} decode banner payload failed", conn); + abort_in_fault(); + } + logger().debug("{} RECV({}) banner features: supported={} required={}", + conn, bl.length(), + _peer_supported_features, _peer_required_features); + + // Check feature bit compatibility + uint64_t supported_features = CRIMSON_MSGR2_SUPPORTED_FEATURES; + uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES; + if ((required_features & _peer_supported_features) != required_features) { + logger().error("{} peer does not support all required features" + " required={} peer_supported={}", + conn, required_features, _peer_supported_features); + ABORT_IN_CLOSE(is_connect); + } + if ((supported_features & _peer_required_features) != _peer_required_features) { + logger().error("{} we do not support all peer required features" + " peer_required={} supported={}", + conn, _peer_required_features, supported_features); + ABORT_IN_CLOSE(is_connect); + } + peer_supported_features = _peer_supported_features; + bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1); + frame_assembler->set_is_rev1(is_rev1); + + auto hello = HelloFrame::Encode(messenger.get_mytype(), + conn.target_addr); + logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}", + conn, ceph_entity_type_name(messenger.get_mytype()), + conn.target_addr); + return frame_assembler->write_flush_frame(hello); + }).then([this] { + //5. read peer HelloFrame + return frame_assembler->read_main_preamble(); + }).then([this](auto ret) { + expect_tag(Tag::HELLO, ret.tag, conn, "read_hello_frame"); + return frame_assembler->read_frame_payload(); + }).then([this](auto payload) { + // 6. process peer HelloFrame + auto hello = HelloFrame::Decode(payload->back()); + logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}", + conn, ceph_entity_type_name(hello.entity_type()), + hello.peer_addr()); + return seastar::make_ready_future>( + std::make_tuple(hello.entity_type(), hello.peer_addr())); + }); } // CONNECTING state @@ -883,177 +877,177 @@ void ProtocolV2::execute_connecting() ceph_assert_always(!is_socket_valid); trigger_state(state_t::CONNECTING, io_state_t::delay); gated_execute("execute_connecting", conn, [this] { - global_seq = messenger.get_global_seq(); - assert(client_cookie != 0); - if (!conn.policy.lossy && server_cookie != 0) { - ++connect_seq; - logger().debug("{} UPDATE: gs={}, cs={} for reconnect", - conn, global_seq, connect_seq); - } else { // conn.policy.lossy || server_cookie == 0 - assert(connect_seq == 0); - assert(server_cookie == 0); - logger().debug("{} UPDATE: gs={} for connect", conn, global_seq); - } - return wait_exit_io().then([this] { + global_seq = messenger.get_global_seq(); + assert(client_cookie != 0); + if (!conn.policy.lossy && server_cookie != 0) { + ++connect_seq; + logger().debug("{} UPDATE: gs={}, cs={} for reconnect", + conn, global_seq, connect_seq); + } else { // conn.policy.lossy || server_cookie == 0 + assert(connect_seq == 0); + assert(server_cookie == 0); + logger().debug("{} UPDATE: gs={} for connect", conn, global_seq); + } + return wait_exit_io().then([this] { #ifdef UNIT_TESTS_BUILT - // process custom_bp_t::SOCKET_CONNECTING - // supports CONTINUE/FAULT/BLOCK - if (conn.interceptor) { - auto action = conn.interceptor->intercept( - conn.get_local_shared_foreign_from_this(), - {custom_bp_t::SOCKET_CONNECTING}); - switch (action) { - case bp_action_t::CONTINUE: - return seastar::now(); - case bp_action_t::FAULT: - logger().info("[Test] got FAULT"); - abort_in_fault(); - case bp_action_t::BLOCK: - logger().info("[Test] got BLOCK"); - return conn.interceptor->blocker.block(); - default: - ceph_abort("unexpected action from trap"); - return seastar::now(); - } - } else { - return seastar::now(); - } - }).then([this] { -#endif - ceph_assert_always(frame_assembler); - if (unlikely(state != state_t::CONNECTING)) { - logger().debug("{} triggered {} before Socket::connect()", - conn, get_state_name(state)); - abort_protocol(); - } - return Socket::connect(conn.peer_addr); - }).then([this](SocketRef _new_socket) { - logger().debug("{} socket connected", conn); - if (unlikely(state != state_t::CONNECTING)) { - logger().debug("{} triggered {} during Socket::connect()", - conn, get_state_name(state)); - return _new_socket->close().then([sock=std::move(_new_socket)] { - abort_protocol(); - }); - } - SocketFRef new_socket = seastar::make_foreign(std::move(_new_socket)); - if (!has_socket) { - frame_assembler->set_socket(std::move(new_socket)); - has_socket = true; - } else { - gate.dispatch_in_background( - "replace_socket_connecting", - conn, - [this, new_socket=std::move(new_socket)]() mutable { - return frame_assembler->replace_shutdown_socket(std::move(new_socket)); - } - ); - } - is_socket_valid = true; + // process custom_bp_t::SOCKET_CONNECTING + // supports CONTINUE/FAULT/BLOCK + if (!conn.interceptor) { + return seastar::now(); + } + return conn.interceptor->intercept( + conn, {Breakpoint{custom_bp_t::SOCKET_CONNECTING}} + ).then([this](bp_action_t action) { + switch (action) { + case bp_action_t::CONTINUE: return seastar::now(); - }).then([this] { - auth_meta = seastar::make_lw_shared(); - frame_assembler->reset_handlers(); - frame_assembler->start_recording(); - return banner_exchange(true); - }).then([this] (auto&& ret) { - auto [_peer_type, _my_addr_from_peer] = std::move(ret); - if (conn.get_peer_type() != _peer_type) { - logger().warn("{} connection peer type does not match what peer advertises {} != {}", - conn, ceph_entity_type_name(conn.get_peer_type()), - ceph_entity_type_name(_peer_type)); - ABORT_IN_CLOSE(true); - } - if (unlikely(state != state_t::CONNECTING)) { - logger().debug("{} triggered {} during banner_exchange(), abort", - conn, get_state_name(state)); - abort_protocol(); - } - frame_assembler->learn_socket_ephemeral_port_as_connector( - _my_addr_from_peer.get_port()); - if (unlikely(_my_addr_from_peer.is_legacy())) { - logger().warn("{} peer sent a legacy address for me: {}", - conn, _my_addr_from_peer); - throw std::system_error( - make_error_code(crimson::net::error::bad_peer_address)); - } - _my_addr_from_peer.set_type(entity_addr_t::TYPE_MSGR2); - messenger.learned_addr(_my_addr_from_peer, conn); - return client_auth(); - }).then([this] { - if (server_cookie == 0) { - ceph_assert(connect_seq == 0); - return client_connect(); - } else { - ceph_assert(connect_seq > 0); - return client_reconnect(); + case bp_action_t::FAULT: + logger().info("[Test] got FAULT"); + abort_in_fault(); + case bp_action_t::BLOCK: + logger().info("[Test] got BLOCK"); + return conn.interceptor->blocker.block(); + default: + ceph_abort("unexpected action from trap"); + return seastar::now(); + } + });; + }).then([this] { +#endif + ceph_assert_always(frame_assembler); + if (unlikely(state != state_t::CONNECTING)) { + logger().debug("{} triggered {} before Socket::connect()", + conn, get_state_name(state)); + abort_protocol(); + } + return Socket::connect(conn.peer_addr); + }).then([this](SocketRef _new_socket) { + logger().debug("{} socket connected", conn); + if (unlikely(state != state_t::CONNECTING)) { + logger().debug("{} triggered {} during Socket::connect()", + conn, get_state_name(state)); + return _new_socket->close().then([sock=std::move(_new_socket)] { + abort_protocol(); + }); + } + SocketFRef new_socket = seastar::make_foreign(std::move(_new_socket)); + if (!has_socket) { + frame_assembler->set_socket(std::move(new_socket)); + has_socket = true; + } else { + gate.dispatch_in_background( + "replace_socket_connecting", + conn, + [this, new_socket=std::move(new_socket)]() mutable { + return frame_assembler->replace_shutdown_socket(std::move(new_socket)); } - }).then([this] (next_step_t next) { + ); + } + is_socket_valid = true; + return seastar::now(); + }).then([this] { + auth_meta = seastar::make_lw_shared(); + frame_assembler->reset_handlers(); + frame_assembler->start_recording(); + return banner_exchange(true); + }).then([this] (auto&& ret) { + auto [_peer_type, _my_addr_from_peer] = std::move(ret); + if (conn.get_peer_type() != _peer_type) { + logger().warn("{} connection peer type does not match what peer advertises {} != {}", + conn, ceph_entity_type_name(conn.get_peer_type()), + ceph_entity_type_name(_peer_type)); + ABORT_IN_CLOSE(true); + } + if (unlikely(state != state_t::CONNECTING)) { + logger().debug("{} triggered {} during banner_exchange(), abort", + conn, get_state_name(state)); + abort_protocol(); + } + frame_assembler->learn_socket_ephemeral_port_as_connector( + _my_addr_from_peer.get_port()); + if (unlikely(_my_addr_from_peer.is_legacy())) { + logger().warn("{} peer sent a legacy address for me: {}", + conn, _my_addr_from_peer); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + _my_addr_from_peer.set_type(entity_addr_t::TYPE_MSGR2); + messenger.learned_addr(_my_addr_from_peer, conn); + return client_auth(); + }).then([this] { + if (server_cookie == 0) { + ceph_assert(connect_seq == 0); + return client_connect(); + } else { + ceph_assert(connect_seq > 0); + return client_reconnect(); + } + }).then([this] (next_step_t next) { + if (unlikely(state != state_t::CONNECTING)) { + logger().debug("{} triggered {} at the end of execute_connecting()", + conn, get_state_name(state)); + abort_protocol(); + } + switch (next) { + case next_step_t::ready: { + if (unlikely(state != state_t::CONNECTING)) { + logger().debug("{} triggered {} before dispatch_connect(), abort", + conn, get_state_name(state)); + abort_protocol(); + } + + auto cc_seq = crosscore.prepare_submit(); + // there are 2 hops with dispatch_connect() + crosscore.prepare_submit(); + logger().info("{} connected: gs={}, pgs={}, cs={}, " + "client_cookie={}, server_cookie={}, {}, new_sid={}, " + "send {} IOHandler::dispatch_connect()", + conn, global_seq, peer_global_seq, connect_seq, + client_cookie, server_cookie, io_states, + frame_assembler->get_socket_shard_id(), cc_seq); + + // set io_handler to a new shard + auto new_io_shard = frame_assembler->get_socket_shard_id(); + ConnectionFRef conn_fref = seastar::make_foreign( + conn.shared_from_this()); + ceph_assert_always(!pr_switch_io_shard.has_value()); + pr_switch_io_shard = seastar::shared_promise<>(); + return seastar::smp::submit_to( + io_handler.get_shard_id(), + [this, cc_seq, new_io_shard, + conn_fref=std::move(conn_fref)]() mutable { + return io_handler.dispatch_connect( + cc_seq, new_io_shard, std::move(conn_fref)); + }).then([this, new_io_shard] { + ceph_assert_always(io_handler.get_shard_id() == new_io_shard); + pr_switch_io_shard->set_value(); + pr_switch_io_shard = std::nullopt; + // user can make changes + if (unlikely(state != state_t::CONNECTING)) { - logger().debug("{} triggered {} at the end of execute_connecting()", + logger().debug("{} triggered {} after dispatch_connect(), abort", conn, get_state_name(state)); abort_protocol(); } - switch (next) { - case next_step_t::ready: { - if (unlikely(state != state_t::CONNECTING)) { - logger().debug("{} triggered {} before dispatch_connect(), abort", - conn, get_state_name(state)); - abort_protocol(); - } - - auto cc_seq = crosscore.prepare_submit(); - // there are 2 hops with dispatch_connect() - crosscore.prepare_submit(); - logger().info("{} connected: gs={}, pgs={}, cs={}, " - "client_cookie={}, server_cookie={}, {}, new_sid={}, " - "send {} IOHandler::dispatch_connect()", - conn, global_seq, peer_global_seq, connect_seq, - client_cookie, server_cookie, io_states, - frame_assembler->get_socket_shard_id(), cc_seq); - - // set io_handler to a new shard - auto new_io_shard = frame_assembler->get_socket_shard_id(); - ConnectionFRef conn_fref = seastar::make_foreign( - conn.shared_from_this()); - ceph_assert_always(!pr_switch_io_shard.has_value()); - pr_switch_io_shard = seastar::shared_promise<>(); - return seastar::smp::submit_to( - io_handler.get_shard_id(), - [this, cc_seq, new_io_shard, - conn_fref=std::move(conn_fref)]() mutable { - return io_handler.dispatch_connect( - cc_seq, new_io_shard, std::move(conn_fref)); - }).then([this, new_io_shard] { - ceph_assert_always(io_handler.get_shard_id() == new_io_shard); - pr_switch_io_shard->set_value(); - pr_switch_io_shard = std::nullopt; - // user can make changes - - if (unlikely(state != state_t::CONNECTING)) { - logger().debug("{} triggered {} after dispatch_connect(), abort", - conn, get_state_name(state)); - abort_protocol(); - } - execute_ready(); - }); - } - case next_step_t::wait: { - logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn); - ceph_assert_always(is_socket_valid); - frame_assembler->shutdown_socket(&gate); - is_socket_valid = false; - execute_wait(true); - return seastar::now(); - } - default: { - ceph_abort("impossible next step"); - } - } - }).handle_exception([this](std::exception_ptr eptr) { - fault(state_t::CONNECTING, "execute_connecting", eptr); + execute_ready(); }); + } + case next_step_t::wait: { + logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn); + ceph_assert_always(is_socket_valid); + frame_assembler->shutdown_socket(&gate); + is_socket_valid = false; + execute_wait(true); + return seastar::now(); + } + default: { + ceph_abort("impossible next step"); + } + } + }).handle_exception([this](std::exception_ptr eptr) { + fault(state_t::CONNECTING, "execute_connecting", eptr); }); + }); } // ACCEPTING state @@ -1629,90 +1623,87 @@ void ProtocolV2::execute_accepting() assert(is_socket_valid); trigger_state(state_t::ACCEPTING, io_state_t::none); gate.dispatch_in_background("execute_accepting", conn, [this] { - return seastar::futurize_invoke([this] { + return seastar::futurize_invoke([this] { #ifdef UNIT_TESTS_BUILT - if (conn.interceptor) { - auto action = conn.interceptor->intercept( - conn.get_local_shared_foreign_from_this(), - {custom_bp_t::SOCKET_ACCEPTED}); - switch (action) { - case bp_action_t::CONTINUE: - break; - case bp_action_t::FAULT: - logger().info("[Test] got FAULT"); - abort_in_fault(); - default: - ceph_abort("unexpected action from trap"); - } - } -#endif - auth_meta = seastar::make_lw_shared(); - frame_assembler->reset_handlers(); - frame_assembler->start_recording(); - return banner_exchange(false); - }).then([this] (auto&& ret) { - auto [_peer_type, _my_addr_from_peer] = std::move(ret); - ceph_assert(conn.get_peer_type() == 0); - conn.set_peer_type(_peer_type); - - conn.policy = messenger.get_policy(_peer_type); - logger().info("{} UPDATE: peer_type={}," - " policy(lossy={} server={} standby={} resetcheck={})", - conn, ceph_entity_type_name(_peer_type), - conn.policy.lossy, conn.policy.server, - conn.policy.standby, conn.policy.resetcheck); - if (!messenger.get_myaddr().is_blank_ip() && - (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() || - messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce())) { - logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}", - conn, _my_addr_from_peer, messenger.get_myaddr()); - throw std::system_error( - make_error_code(crimson::net::error::bad_peer_address)); - } - messenger.learned_addr(_my_addr_from_peer, conn); - return server_auth(); - }).then([this] { - return frame_assembler->read_main_preamble(); - }).then([this](auto ret) { - switch (ret.tag) { - case Tag::CLIENT_IDENT: - return server_connect(); - case Tag::SESSION_RECONNECT: - return server_reconnect(); - default: { - unexpected_tag(ret.tag, conn, "post_server_auth"); - return seastar::make_ready_future(next_step_t::none); - } - } - }).then([this] (next_step_t next) { - switch (next) { - case next_step_t::ready: - assert(state != state_t::ACCEPTING); - break; - case next_step_t::wait: - if (unlikely(state != state_t::ACCEPTING)) { - logger().debug("{} triggered {} at the end of execute_accepting()", - conn, get_state_name(state)); - abort_protocol(); - } - logger().info("{} execute_accepting(): going to SERVER_WAIT", conn); - execute_server_wait(); - break; - default: - ceph_abort("impossible next step"); - } - }).handle_exception([this](std::exception_ptr eptr) { - const char *e_what; - try { - std::rethrow_exception(eptr); - } catch (std::exception &e) { - e_what = e.what(); - } - logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}", - conn, get_state_name(state), e_what); - do_close(false); + if (conn.interceptor) { + // only notify socket accepted + gate.dispatch_in_background( + "test_intercept_socket_accepted", conn, [this] { + return conn.interceptor->intercept( + conn, {Breakpoint{custom_bp_t::SOCKET_ACCEPTED}} + ).then([](bp_action_t action) { + ceph_assert(action == bp_action_t::CONTINUE); + }); }); + } +#endif + auth_meta = seastar::make_lw_shared(); + frame_assembler->reset_handlers(); + frame_assembler->start_recording(); + return banner_exchange(false); + }).then([this] (auto&& ret) { + auto [_peer_type, _my_addr_from_peer] = std::move(ret); + ceph_assert(conn.get_peer_type() == 0); + conn.set_peer_type(_peer_type); + + conn.policy = messenger.get_policy(_peer_type); + logger().info("{} UPDATE: peer_type={}," + " policy(lossy={} server={} standby={} resetcheck={})", + conn, ceph_entity_type_name(_peer_type), + conn.policy.lossy, conn.policy.server, + conn.policy.standby, conn.policy.resetcheck); + if (!messenger.get_myaddr().is_blank_ip() && + (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() || + messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce())) { + logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}", + conn, _my_addr_from_peer, messenger.get_myaddr()); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + messenger.learned_addr(_my_addr_from_peer, conn); + return server_auth(); + }).then([this] { + return frame_assembler->read_main_preamble(); + }).then([this](auto ret) { + switch (ret.tag) { + case Tag::CLIENT_IDENT: + return server_connect(); + case Tag::SESSION_RECONNECT: + return server_reconnect(); + default: { + unexpected_tag(ret.tag, conn, "post_server_auth"); + return seastar::make_ready_future(next_step_t::none); + } + } + }).then([this] (next_step_t next) { + switch (next) { + case next_step_t::ready: + assert(state != state_t::ACCEPTING); + break; + case next_step_t::wait: + if (unlikely(state != state_t::ACCEPTING)) { + logger().debug("{} triggered {} at the end of execute_accepting()", + conn, get_state_name(state)); + abort_protocol(); + } + logger().info("{} execute_accepting(): going to SERVER_WAIT", conn); + execute_server_wait(); + break; + default: + ceph_abort("impossible next step"); + } + }).handle_exception([this](std::exception_ptr eptr) { + const char *e_what; + try { + std::rethrow_exception(eptr); + } catch (std::exception &e) { + e_what = e.what(); + } + logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}", + conn, get_state_name(state), e_what); + do_close(false); }); + }); } // CONNECTING or ACCEPTING state diff --git a/src/crimson/net/io_handler.cc b/src/crimson/net/io_handler.cc index 15d5509dc160a..c414c48e12f8e 100644 --- a/src/crimson/net/io_handler.cc +++ b/src/crimson/net/io_handler.cc @@ -63,7 +63,12 @@ IOHandler::~IOHandler() assert(!conn_ref); } -ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent( +#ifdef UNIT_TESTS_BUILT +IOHandler::sweep_ret +#else +ceph::bufferlist +#endif +IOHandler::sweep_out_pending_msgs_to_sent( bool require_keepalive, std::optional maybe_keepalive_ack, bool require_ack) @@ -71,25 +76,45 @@ ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent( std::size_t num_msgs = out_pending_msgs.size(); ceph::bufferlist bl; +#ifdef UNIT_TESTS_BUILT + std::vector tags; +#endif + if (unlikely(require_keepalive)) { auto keepalive_frame = KeepAliveFrame::Encode(); bl.append(frame_assembler->get_buffer(keepalive_frame)); +#ifdef UNIT_TESTS_BUILT + auto tag = KeepAliveFrame::tag; + tags.push_back(tag); +#endif } if (unlikely(maybe_keepalive_ack.has_value())) { auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack); bl.append(frame_assembler->get_buffer(keepalive_ack_frame)); +#ifdef UNIT_TESTS_BUILT + auto tag = KeepAliveFrameAck::tag; + tags.push_back(tag); +#endif } if (require_ack && num_msgs == 0u) { auto ack_frame = AckFrame::Encode(in_seq); bl.append(frame_assembler->get_buffer(ack_frame)); +#ifdef UNIT_TESTS_BUILT + auto tag = AckFrame::tag; + tags.push_back(tag); +#endif } std::for_each( out_pending_msgs.begin(), out_pending_msgs.begin()+num_msgs, - [this, &bl](const MessageFRef& msg) { + [this, &bl +#ifdef UNIT_TESTS_BUILT + , &tags +#endif + ](const MessageFRef& msg) { // set priority msg->get_header().src = conn.messenger.get_myname(); @@ -114,6 +139,10 @@ ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent( logger().debug("{} --> #{} === {} ({})", conn, msg->get_seq(), *msg, msg->get_type()); bl.append(frame_assembler->get_buffer(message)); +#ifdef UNIT_TESTS_BUILT + auto tag = MessageFrame::tag; + tags.push_back(tag); +#endif }); if (!conn.policy.lossy) { @@ -123,7 +152,12 @@ ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent( std::make_move_iterator(out_pending_msgs.end())); } out_pending_msgs.clear(); + +#ifdef UNIT_TESTS_BUILT + return sweep_ret{std::move(bl), tags}; +#else return bl; +#endif } seastar::future<> IOHandler::send(MessageFRef msg) @@ -791,9 +825,18 @@ IOHandler::do_out_dispatch(shard_states_t &ctx) auto to_ack = ack_left; assert(to_ack == 0 || in_seq > 0); ack_left = 0; - return frame_assembler->write( - sweep_out_pending_msgs_to_sent( - require_keepalive, maybe_keepalive_ack, to_ack > 0) +#ifdef UNIT_TESTS_BUILT + auto ret = sweep_out_pending_msgs_to_sent( + require_keepalive, maybe_keepalive_ack, to_ack > 0); + return frame_assembler->intercept_frames(ret.tags, true + ).then([this, bl=std::move(ret.bl)]() mutable { + return frame_assembler->write(std::move(bl)); + } +#else + auto bl = sweep_out_pending_msgs_to_sent( + require_keepalive, maybe_keepalive_ack, to_ack > 0); + return frame_assembler->write(std::move(bl) +#endif ).then([this, &ctx] { if (ctx.get_io_state() != io_state_t::open) { return frame_assembler->flush( diff --git a/src/crimson/net/io_handler.h b/src/crimson/net/io_handler.h index 843e565672abe..f53c2ba646847 100644 --- a/src/crimson/net/io_handler.h +++ b/src/crimson/net/io_handler.h @@ -3,6 +3,8 @@ #pragma once +#include + #include #include @@ -475,7 +477,16 @@ public: seastar::future<> do_out_dispatch(shard_states_t &ctx); - ceph::bufferlist sweep_out_pending_msgs_to_sent( +#ifdef UNIT_TESTS_BUILT + struct sweep_ret { + ceph::bufferlist bl; + std::vector tags; + }; + sweep_ret +#else + ceph::bufferlist +#endif + sweep_out_pending_msgs_to_sent( bool require_keepalive, std::optional maybe_keepalive_ack, bool require_ack); diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index 8b1ca07edb920..1359b5aeb62ea 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -775,49 +775,68 @@ struct TestInterceptor : public Interceptor { logger().info("[{}] {} {}", result->index, *conn, result->state); } - bp_action_t intercept(ConnectionRef conn, Breakpoint bp) override { - ++breakpoints_counter[bp].counter; + seastar::future + intercept(Connection &_conn, std::vector bps) override { + assert(bps.size() >= 1); + + std::vector actions; + for (const Breakpoint &bp : bps) { + ++breakpoints_counter[bp].counter; + + auto result = find_result(&*conn); + if (result == nullptr) { + logger().error("Untracked intercepted connection: {}, at breakpoint {}({})", + *conn, bp, breakpoints_counter[bp].counter); + ceph_abort(); + } - auto result = find_result(&*conn); - if (result == nullptr) { - logger().error("Untracked intercepted connection: {}, at breakpoint {}({})", - *conn, bp, breakpoints_counter[bp].counter); - ceph_abort(); - } + if (bp == custom_bp_t::SOCKET_CONNECTING) { + ++result->connect_attempts; + logger().info("[Test] connect_attempts={}", result->connect_attempts); + } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::WRITE}) { + ++result->client_connect_attempts; + logger().info("[Test] client_connect_attempts={}", result->client_connect_attempts); + } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::WRITE}) { + ++result->client_reconnect_attempts; + logger().info("[Test] client_reconnect_attempts={}", result->client_reconnect_attempts); + } else if (bp == custom_bp_t::SOCKET_ACCEPTED) { + ++result->accept_attempts; + logger().info("[Test] accept_attempts={}", result->accept_attempts); + } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::READ}) { + ++result->server_connect_attempts; + logger().info("[Test] server_connect_attemps={}", result->server_connect_attempts); + } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::READ}) { + ++result->server_reconnect_attempts; + logger().info("[Test] server_reconnect_attempts={}", result->server_reconnect_attempts); + } - if (bp == custom_bp_t::SOCKET_CONNECTING) { - ++result->connect_attempts; - logger().info("[Test] connect_attempts={}", result->connect_attempts); - } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::WRITE}) { - ++result->client_connect_attempts; - logger().info("[Test] client_connect_attempts={}", result->client_connect_attempts); - } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::WRITE}) { - ++result->client_reconnect_attempts; - logger().info("[Test] client_reconnect_attempts={}", result->client_reconnect_attempts); - } else if (bp == custom_bp_t::SOCKET_ACCEPTED) { - ++result->accept_attempts; - logger().info("[Test] accept_attempts={}", result->accept_attempts); - } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::READ}) { - ++result->server_connect_attempts; - logger().info("[Test] server_connect_attemps={}", result->server_connect_attempts); - } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::READ}) { - ++result->server_reconnect_attempts; - logger().info("[Test] server_reconnect_attempts={}", result->server_reconnect_attempts); + auto it_bp = breakpoints.find(bp); + if (it_bp != breakpoints.end()) { + auto it_cnt = it_bp->second.find(breakpoints_counter[bp].counter); + if (it_cnt != it_bp->second.end()) { + logger().info("[{}] {} intercepted {}({}) => {}", + result->index, *conn, bp, + breakpoints_counter[bp].counter, it_cnt->second); + actions.emplace_back(it_cnt->second); + continue; + } + } + logger().info("[{}] {} intercepted {}({})", + result->index, *conn, bp, breakpoints_counter[bp].counter); + actions.emplace_back(bp_action_t::CONTINUE); } - auto it_bp = breakpoints.find(bp); - if (it_bp != breakpoints.end()) { - auto it_cnt = it_bp->second.find(breakpoints_counter[bp].counter); - if (it_cnt != it_bp->second.end()) { - logger().info("[{}] {} intercepted {}({}) => {}", - result->index, *conn, bp, - breakpoints_counter[bp].counter, it_cnt->second); - return it_cnt->second; + bp_action_t action = bp_action_t::CONTINUE; + for (bp_action_t &a : actions) { + if (a != bp_action_t::CONTINUE) { + if (action == bp_action_t::CONTINUE) { + action = a; + } else { + ceph_abort("got multiple incompatible actions"); + } } } - logger().info("[{}] {} intercepted {}({})", - result->index, *conn, bp, breakpoints_counter[bp].counter); - return bp_action_t::CONTINUE; + return seastar::make_ready_future(action); } };