From c8578821b46d06b0db4168e4b41cf1a98a752469 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Thu, 1 Dec 2022 09:47:36 +0800 Subject: [PATCH] crimson/net: move intercept_frame() from ProtocolV2 down to FrameAssemblerV2 Signed-off-by: Yingxin Cheng --- src/crimson/net/FrameAssemblerV2.cc | 21 ++++++ src/crimson/net/FrameAssemblerV2.h | 7 ++ src/crimson/net/ProtocolV2.cc | 99 ++++++++++------------------- src/crimson/net/ProtocolV2.h | 8 --- 4 files changed, 60 insertions(+), 75 deletions(-) diff --git a/src/crimson/net/FrameAssemblerV2.cc b/src/crimson/net/FrameAssemblerV2.cc index 034cf8ed4ca3a..6165e2e57dccf 100644 --- a/src/crimson/net/FrameAssemblerV2.cc +++ b/src/crimson/net/FrameAssemblerV2.cc @@ -6,6 +6,10 @@ #include "Errors.h" #include "SocketConnection.h" +#ifdef UNIT_TESTS_BUILT +#include "Interceptor.h" +#endif + using ceph::msgr::v2::FrameAssembler; using ceph::msgr::v2::FrameError; using ceph::msgr::v2::preamble_block_t; @@ -26,6 +30,20 @@ FrameAssemblerV2::FrameAssemblerV2(SocketConnection &_conn) : conn{_conn} {} +#ifdef UNIT_TESTS_BUILT +// should be consistent to intercept() in ProtocolV2.cc +void FrameAssemblerV2::intercept_frame(Tag tag, bool is_write) +{ + assert(has_socket()); + if (conn.interceptor) { + auto type = is_write ? bp_type_t::WRITE : bp_type_t::READ; + auto action = conn.interceptor->intercept( + conn, Breakpoint{tag, type}); + socket->set_trap(type, action, &conn.interceptor->blocker); + } +} +#endif + void FrameAssemblerV2::set_is_rev1(bool _is_rev1) { is_rev1 = _is_rev1; @@ -198,6 +216,9 @@ FrameAssemblerV2::read_main_preamble() try { rx_preamble.append(buffer::create(std::move(bl))); const Tag tag = rx_frame_asm.disassemble_preamble(rx_preamble); +#ifdef UNIT_TESTS_BUILT + intercept_frame(tag, false); +#endif return read_main_t{tag, &rx_frame_asm}; } catch (FrameError& e) { logger().warn("{} read_main_preamble: {}", conn, e.what()); diff --git a/src/crimson/net/FrameAssemblerV2.h b/src/crimson/net/FrameAssemblerV2.h index b3ee3e03a3cb6..5cea92440fe22 100644 --- a/src/crimson/net/FrameAssemblerV2.h +++ b/src/crimson/net/FrameAssemblerV2.h @@ -103,6 +103,9 @@ public: template ceph::bufferlist get_buffer(F &tx_frame) { +#ifdef UNIT_TESTS_BUILT + intercept_frame(F::tag, true); +#endif auto bl = tx_frame.get_buffer(tx_frame_asm); log_main_preamble(bl); return bl; @@ -121,6 +124,10 @@ private: void log_main_preamble(const ceph::bufferlist &bl); +#ifdef UNIT_TESTS_BUILT + void intercept_frame(ceph::msgr::v2::Tag, bool is_write); +#endif + SocketConnection &conn; Socket *socket = nullptr; diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 8514da381e884..74719ec3babf5 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -122,6 +122,7 @@ template auto ptr(const ::seastar::shared_ptr& p) -> const void* namespace crimson::net { #ifdef UNIT_TESTS_BUILT +// should be consistent to intercept_frame() in FrameAssemblerV2.cc void intercept(Breakpoint bp, bp_type_t type, Connection& conn, @@ -137,11 +138,6 @@ void intercept(Breakpoint bp, intercept({bp}, type, conn, \ conn.interceptor, conn.socket) -#define INTERCEPT_FRAME(tag, type) \ -intercept({static_cast(tag), type}, \ - type, conn, \ - conn.interceptor, conn.socket) - #define INTERCEPT_N_RW(bp) \ if (conn.interceptor) { \ auto action = conn.interceptor->intercept(conn, {bp}); \ @@ -153,7 +149,6 @@ if (conn.interceptor) { \ #else #define INTERCEPT_CUSTOM(bp, type) -#define INTERCEPT_FRAME(tag, type) #define INTERCEPT_N_RW(bp) #endif @@ -224,36 +219,6 @@ void ProtocolV2::start_accept(SocketRef&& new_socket, execute_accepting(); } -// TODO: Frame related implementations, probably to a separate class. - -seastar::future -ProtocolV2::read_main_preamble() -{ - return frame_assembler.read_main_preamble( -#ifdef UNIT_TESTS_BUILT - ).then([this](auto ret) { - INTERCEPT_FRAME(ret.tag, bp_type_t::READ); - return ret; - }); -#else - ); -#endif -} - -template -ceph::bufferlist ProtocolV2::get_buffer(F &tx_frame) -{ - INTERCEPT_FRAME(F::tag, bp_type_t::WRITE); - return frame_assembler.get_buffer(tx_frame); -} - -template -seastar::future<> ProtocolV2::write_flush_frame(F &tx_frame) -{ - INTERCEPT_FRAME(F::tag, bp_type_t::WRITE); - return frame_assembler.write_flush_frame(tx_frame); -} - void ProtocolV2::trigger_state(state_t _state, out_state_t _out_state, bool reentrant) { if (!reentrant && _state == state) { @@ -482,10 +447,10 @@ ProtocolV2::banner_exchange(bool is_connect) logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}", conn, ceph_entity_type_name(messenger.get_mytype()), conn.target_addr); - return write_flush_frame(hello); + return frame_assembler.write_flush_frame(hello); }).then([this] { //5. read peer HelloFrame - return read_main_preamble(); + return frame_assembler.read_main_preamble(); }).then([this](auto ret) { expect_tag(Tag::HELLO, ret.tag, conn, __func__); return frame_assembler.read_frame_payload(); @@ -504,7 +469,7 @@ ProtocolV2::banner_exchange(bool is_connect) seastar::future<> ProtocolV2::handle_auth_reply() { - return read_main_preamble( + return frame_assembler.read_main_preamble( ).then([this](auto ret) { switch (ret.tag) { case Tag::AUTH_BAD_METHOD: @@ -542,7 +507,7 @@ seastar::future<> ProtocolV2::handle_auth_reply() auto more_reply = AuthRequestMoreFrame::Encode(reply); logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}", conn, reply.length()); - return write_flush_frame(more_reply); + return frame_assembler.write_flush_frame(more_reply); }).then([this] { return handle_auth_reply(); }); @@ -591,7 +556,7 @@ seastar::future<> ProtocolV2::client_auth(std::vector &allowed_methods logger().debug("{} WRITE AuthRequestFrame: method={}," " preferred_modes={}, payload_len={}", conn, auth_method, preferred_modes, bl.length()); - return write_flush_frame(frame).then([this] { + return frame_assembler.write_flush_frame(frame).then([this] { return handle_auth_reply(); }); } catch (const crimson::auth::error& e) { @@ -639,8 +604,8 @@ ProtocolV2::client_connect() conn.policy.features_supported, conn.policy.features_required | msgr2_required, flags, client_cookie); - return write_flush_frame(client_ident).then([this] { - return read_main_preamble(); + return frame_assembler.write_flush_frame(client_ident).then([this] { + return frame_assembler.read_main_preamble(); }).then([this](auto ret) { switch (ret.tag) { case Tag::IDENT_MISSING_FEATURES: @@ -742,8 +707,8 @@ ProtocolV2::client_reconnect() conn, messenger.get_myaddrs(), client_cookie, server_cookie, global_seq, connect_seq, get_in_seq()); - return write_flush_frame(reconnect).then([this] { - return read_main_preamble(); + return frame_assembler.write_flush_frame(reconnect).then([this] { + return frame_assembler.read_main_preamble(); }).then([this](auto ret) { switch (ret.tag) { case Tag::SESSION_RETRY_GLOBAL: @@ -937,7 +902,7 @@ seastar::future<> ProtocolV2::_auth_bad_method(int r) "allowed_methods={}, allowed_modes={})", conn, auth_meta->auth_method, cpp_strerror(r), allowed_methods, allowed_modes); - return write_flush_frame(bad_method).then([this] { + return frame_assembler.write_flush_frame(bad_method).then([this] { return server_auth(); }); } @@ -963,7 +928,7 @@ seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, boo logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}", conn, conn.peer_global_id, ceph_con_mode_name(auth_meta->con_mode), reply.length()); - return write_flush_frame(auth_done).then([this] { + return frame_assembler.write_flush_frame(auth_done).then([this] { ceph_assert(auth_meta); frame_assembler.create_session_stream_handlers(*auth_meta, true); return finish_auth(); @@ -974,8 +939,8 @@ seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, boo auto more = AuthReplyMoreFrame::Encode(reply); logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}", conn, reply.length()); - return write_flush_frame(more).then([this] { - return read_main_preamble(); + return frame_assembler.write_flush_frame(more).then([this] { + return frame_assembler.read_main_preamble(); }).then([this](auto ret) { expect_tag(Tag::AUTH_REQUEST_MORE, ret.tag, conn, __func__); return frame_assembler.read_frame_payload(); @@ -1000,7 +965,7 @@ seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, boo seastar::future<> ProtocolV2::server_auth() { - return read_main_preamble( + return frame_assembler.read_main_preamble( ).then([this](auto ret) { expect_tag(Tag::AUTH_REQUEST, ret.tag, conn, __func__); return frame_assembler.read_frame_payload(); @@ -1042,7 +1007,7 @@ ProtocolV2::send_wait() { auto wait = WaitFrame::Encode(); logger().debug("{} WRITE WaitFrame", conn); - return write_flush_frame(wait).then([] { + return frame_assembler.write_flush_frame(wait).then([] { return next_step_t::wait; }); } @@ -1256,7 +1221,7 @@ ProtocolV2::server_connect() auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing); logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)", conn, feat_missing); - return write_flush_frame(ident_missing_features).then([] { + return frame_assembler.write_flush_frame(ident_missing_features).then([] { return next_step_t::wait; }); } @@ -1294,7 +1259,7 @@ ProtocolV2::server_connect() seastar::future ProtocolV2::read_reconnect() { - return read_main_preamble( + return frame_assembler.read_main_preamble( ).then([this](auto ret) { expect_tag(Tag::SESSION_RECONNECT, ret.tag, conn, "read_reconnect"); return server_reconnect(); @@ -1306,7 +1271,7 @@ ProtocolV2::send_retry(uint64_t connect_seq) { auto retry = RetryFrame::Encode(connect_seq); logger().warn("{} WRITE RetryFrame: cs={}", conn, connect_seq); - return write_flush_frame(retry).then([this] { + return frame_assembler.write_flush_frame(retry).then([this] { return read_reconnect(); }); } @@ -1316,7 +1281,7 @@ ProtocolV2::send_retry_global(uint64_t global_seq) { auto retry = RetryGlobalFrame::Encode(global_seq); logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn, global_seq); - return write_flush_frame(retry).then([this] { + return frame_assembler.write_flush_frame(retry).then([this] { return read_reconnect(); }); } @@ -1326,8 +1291,8 @@ ProtocolV2::send_reset(bool full) { auto reset = ResetFrame::Encode(full); logger().warn("{} WRITE ResetFrame: full={}", conn, full); - return write_flush_frame(reset).then([this] { - return read_main_preamble(); + return frame_assembler.write_flush_frame(reset).then([this] { + return frame_assembler.read_main_preamble(); }).then([this](auto ret) { expect_tag(Tag::CLIENT_IDENT, ret.tag, conn, "post_send_reset"); return server_connect(); @@ -1511,7 +1476,7 @@ void ProtocolV2::execute_accepting() messenger.learned_addr(_my_addr_from_peer, conn); return server_auth(); }).then([this] { - return read_main_preamble(); + return frame_assembler.read_main_preamble(); }).then([this](auto ret) { switch (ret.tag) { case Tag::CLIENT_IDENT: @@ -1559,8 +1524,8 @@ seastar::future<> ProtocolV2::finish_auth() auth_meta->session_key.hmac_sha256(nullptr, records.rxbuf); auto sig_frame = AuthSignatureFrame::Encode(sig); logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn, sig); - return write_flush_frame(sig_frame).then([this] { - return read_main_preamble(); + return frame_assembler.write_flush_frame(sig_frame).then([this] { + return frame_assembler.read_main_preamble(); }).then([this](auto ret) { expect_tag(Tag::AUTH_SIGNATURE, ret.tag, conn, "post_finish_auth"); return frame_assembler.read_frame_payload(); @@ -1677,7 +1642,7 @@ ProtocolV2::send_server_ident() conn.policy.features_required | msgr2_required, flags, server_cookie); - return write_flush_frame(server_ident); + return frame_assembler.write_flush_frame(server_ident); } // REPLACING state @@ -1760,7 +1725,7 @@ void ProtocolV2::trigger_replacing(bool reconnect, requeue_out_sent_up_to(new_msg_seq); auto reconnect_ok = ReconnectOkFrame::Encode(get_in_seq()); logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, get_in_seq()); - return write_flush_frame(reconnect_ok); + return frame_assembler.write_flush_frame(reconnect_ok); } else { client_cookie = new_client_cookie; assert(conn.get_peer_type() == new_peer_name.type()); @@ -1805,17 +1770,17 @@ ceph::bufferlist ProtocolV2::do_sweep_messages( if (unlikely(require_keepalive)) { auto keepalive_frame = KeepAliveFrame::Encode(); - bl.append(get_buffer(keepalive_frame)); + bl.append(frame_assembler.get_buffer(keepalive_frame)); } if (unlikely(maybe_keepalive_ack.has_value())) { auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack); - bl.append(get_buffer(keepalive_ack_frame)); + bl.append(frame_assembler.get_buffer(keepalive_ack_frame)); } if (require_ack && num_msgs == 0u) { auto ack_frame = AckFrame::Encode(get_in_seq()); - bl.append(get_buffer(ack_frame)); + bl.append(frame_assembler.get_buffer(ack_frame)); } std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageURef& msg) { @@ -1843,7 +1808,7 @@ ceph::bufferlist ProtocolV2::do_sweep_messages( msg->get_payload(), msg->get_middle(), msg->get_data()); logger().debug("{} --> #{} === {} ({})", conn, msg->get_seq(), *msg, msg->get_type()); - bl.append(get_buffer(message)); + bl.append(frame_assembler.get_buffer(message)); }); return bl; @@ -1983,7 +1948,7 @@ void ProtocolV2::execute_ready(bool dispatch_connect) gate.dispatch_in_background("execute_ready", *this, [this] { protocol_timer.cancel(); return seastar::keep_doing([this] { - return read_main_preamble( + return frame_assembler.read_main_preamble( ).then([this](auto ret) { switch (ret.tag) { case Tag::MESSAGE: { diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index b75c008f0d5e1..8b0d098a0fcc9 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -166,14 +166,6 @@ class ProtocolV2 final : public Protocol { Timer protocol_timer; private: - seastar::future read_main_preamble(); - - template - ceph::bufferlist get_buffer(F &tx_frame); - - template - seastar::future<> write_flush_frame(F &tx_frame); - void fault(state_t expected_state, const char *where, std::exception_ptr eptr); -- 2.39.5