From e2dc952bfa405ae61daff556984cb9a8cabb82d7 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Mon, 24 Apr 2023 13:47:22 +0800 Subject: [PATCH] crimson/net: decouple tests from SocketConnection class Signed-off-by: Yingxin Cheng (cherry picked from commit 7797cabe2ab38eea30eca51614398badef8bada1) --- src/crimson/net/FrameAssemblerV2.cc | 4 ++- src/crimson/net/Fwd.h | 2 -- src/crimson/net/Interceptor.h | 10 +++---- src/crimson/net/ProtocolV2.cc | 17 +++++++---- src/crimson/net/SocketConnection.cc | 2 +- src/crimson/net/SocketConnection.h | 1 + src/crimson/net/io_handler.cc | 4 ++- src/test/crimson/test_messenger.cc | 45 ++++++++++++++--------------- 8 files changed, 46 insertions(+), 39 deletions(-) diff --git a/src/crimson/net/FrameAssemblerV2.cc b/src/crimson/net/FrameAssemblerV2.cc index 165ae18a1a0..c952bb1bd56 100644 --- a/src/crimson/net/FrameAssemblerV2.cc +++ b/src/crimson/net/FrameAssemblerV2.cc @@ -44,8 +44,10 @@ void FrameAssemblerV2::intercept_frame(Tag tag, bool is_write) assert(has_socket()); if (conn.interceptor) { auto type = is_write ? bp_type_t::WRITE : bp_type_t::READ; + // FIXME: doesn't support cross-core auto action = conn.interceptor->intercept( - conn, Breakpoint{tag, type}); + conn.get_local_shared_foreign_from_this(), + Breakpoint{tag, type}); socket->set_trap(type, action, &conn.interceptor->blocker); } } diff --git a/src/crimson/net/Fwd.h b/src/crimson/net/Fwd.h index 3eb57ef9781..1dc4809810d 100644 --- a/src/crimson/net/Fwd.h +++ b/src/crimson/net/Fwd.h @@ -39,8 +39,6 @@ using ConnectionLRef = seastar::shared_ptr; using ConnectionFRef = seastar::foreign_ptr; using ConnectionRef = ::crimson::local_shared_foreign_ptr; -class SocketConnection; - class Dispatcher; class ChainedDispatchers; constexpr std::size_t NUM_DISPATCHERS = 4u; diff --git a/src/crimson/net/Interceptor.h b/src/crimson/net/Interceptor.h index 41ec31f3755..764facf7be1 100644 --- a/src/crimson/net/Interceptor.h +++ b/src/crimson/net/Interceptor.h @@ -116,11 +116,11 @@ struct Breakpoint { struct Interceptor { socket_blocker blocker; virtual ~Interceptor() {} - virtual void register_conn(SocketConnection& conn) = 0; - virtual void register_conn_ready(SocketConnection& conn) = 0; - virtual void register_conn_closed(SocketConnection& conn) = 0; - virtual void register_conn_replaced(SocketConnection& conn) = 0; - virtual bp_action_t intercept(SocketConnection& conn, Breakpoint bp) = 0; + virtual void register_conn(ConnectionRef) = 0; + virtual void register_conn_ready(ConnectionRef) = 0; + virtual void register_conn_closed(ConnectionRef) = 0; + virtual void register_conn_replaced(ConnectionRef) = 0; + virtual bp_action_t intercept(ConnectionRef, Breakpoint bp) = 0; }; } // namespace crimson::net diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 543f2581b47..9d1956c3c11 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -111,7 +111,9 @@ void intercept(Breakpoint bp, Interceptor *interceptor, Socket *socket) { if (interceptor) { - auto action = interceptor->intercept(conn, Breakpoint(bp)); + auto action = interceptor->intercept( + conn.get_local_shared_foreign_from_this(), + Breakpoint(bp)); socket->set_trap(type, action, &interceptor->blocker); } } @@ -784,7 +786,8 @@ void ProtocolV2::execute_connecting() // supports CONTINUE/FAULT/BLOCK if (conn.interceptor) { auto action = conn.interceptor->intercept( - conn, {custom_bp_t::SOCKET_CONNECTING}); + conn.get_local_shared_foreign_from_this(), + {custom_bp_t::SOCKET_CONNECTING}); switch (action) { case bp_action_t::CONTINUE: return seastar::now(); @@ -796,6 +799,7 @@ void ProtocolV2::execute_connecting() return conn.interceptor->blocker.block(); default: ceph_abort("unexpected action from trap"); + return seastar::now(); } } else { return seastar::now(); @@ -1064,7 +1068,8 @@ ProtocolV2::reuse_connection( has_socket = false; #ifdef UNIT_TESTS_BUILT if (conn.interceptor) { - conn.interceptor->register_conn_replaced(conn); + conn.interceptor->register_conn_replaced( + conn.get_local_shared_foreign_from_this()); } #endif // close this connection because all the necessary information is delivered @@ -1485,7 +1490,8 @@ void ProtocolV2::execute_accepting() #ifdef UNIT_TESTS_BUILT if (conn.interceptor) { auto action = conn.interceptor->intercept( - conn, {custom_bp_t::SOCKET_ACCEPTED}); + conn.get_local_shared_foreign_from_this(), + {custom_bp_t::SOCKET_ACCEPTED}); switch (action) { case bp_action_t::CONTINUE: break; @@ -2002,7 +2008,8 @@ void ProtocolV2::do_close( #ifdef UNIT_TESTS_BUILT closed_clean = true; if (conn.interceptor) { - conn.interceptor->register_conn_closed(conn); + conn.interceptor->register_conn_closed( + conn.get_local_shared_foreign_from_this()); } #endif }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) { diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index f0eccfe2d43..1cbc3cf3fba 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -36,7 +36,7 @@ SocketConnection::SocketConnection(SocketMessenger& messenger, #ifdef UNIT_TESTS_BUILT if (messenger.interceptor) { interceptor = messenger.interceptor; - interceptor->register_conn(*this); + interceptor->register_conn(this->get_local_shared_foreign_from_this()); } #endif } diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index 718d1da318a..c986370fc01 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -25,6 +25,7 @@ namespace crimson::net { class ProtocolV2; class SocketMessenger; +class SocketConnection; using SocketConnectionRef = seastar::shared_ptr; #ifdef UNIT_TESTS_BUILT diff --git a/src/crimson/net/io_handler.cc b/src/crimson/net/io_handler.cc index 328ca724eb7..63e0c93c9fd 100644 --- a/src/crimson/net/io_handler.cc +++ b/src/crimson/net/io_handler.cc @@ -192,7 +192,9 @@ void IOHandler::set_io_state( dispatch_in = true; #ifdef UNIT_TESTS_BUILT if (conn.interceptor) { - conn.interceptor->register_conn_ready(conn); + // FIXME: doesn't support cross-core + conn.interceptor->register_conn_ready( + conn.get_local_shared_foreign_from_this()); } #endif } else if (io_state == io_state_t::open) { diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index 6fc9c1d7750..0107fa6ccc5 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -14,7 +14,6 @@ #include "crimson/net/Dispatcher.h" #include "crimson/net/Messenger.h" #include "crimson/net/Interceptor.h" -#include "crimson/net/SocketConnection.h" #include #include @@ -496,7 +495,6 @@ using crimson::net::Dispatcher; using crimson::net::Interceptor; using crimson::net::Messenger; using crimson::net::MessengerRef; -using crimson::net::SocketConnection; using crimson::net::SocketPolicy; using crimson::net::tag_bp_t; using namespace ceph::net::test; @@ -699,25 +697,24 @@ struct TestInterceptor : public Interceptor { } private: - void register_conn(SocketConnection& _conn) override { - auto conn = _conn.get_local_shared_foreign_from_this(); + void register_conn(ConnectionRef conn) override { auto result = find_result(conn); if (result != nullptr) { logger().error("The connection [{}] {} already exists when register {}", - result->index, *result->conn, _conn); + result->index, *result->conn, *conn); ceph_abort(); } unsigned index = results.size(); results.emplace_back(conn, index); conns[conn] = index; notify(); - logger().info("[{}] {} new connection registered", index, _conn); + logger().info("[{}] {} new connection registered", index, *conn); } - void register_conn_closed(SocketConnection& conn) override { - auto result = find_result(conn.get_local_shared_foreign_from_this()); + void register_conn_closed(ConnectionRef conn) override { + auto result = find_result(conn); if (result == nullptr) { - logger().error("Untracked closed connection: {}", conn); + logger().error("Untracked closed connection: {}", *conn); ceph_abort(); } @@ -725,39 +722,39 @@ struct TestInterceptor : public Interceptor { result->state = conn_state_t::closed; } notify(); - logger().info("[{}] {} closed({})", result->index, conn, result->state); + logger().info("[{}] {} closed({})", result->index, *conn, result->state); } - void register_conn_ready(SocketConnection& conn) override { - auto result = find_result(conn.get_local_shared_foreign_from_this()); + void register_conn_ready(ConnectionRef conn) override { + auto result = find_result(conn); if (result == nullptr) { - logger().error("Untracked ready connection: {}", conn); + logger().error("Untracked ready connection: {}", *conn); ceph_abort(); } - ceph_assert(conn.is_connected()); + ceph_assert(conn->is_connected()); notify(); - logger().info("[{}] {} ready", result->index, conn); + logger().info("[{}] {} ready", result->index, *conn); } - void register_conn_replaced(SocketConnection& conn) override { - auto result = find_result(conn.get_local_shared_foreign_from_this()); + void register_conn_replaced(ConnectionRef conn) override { + auto result = find_result(conn); if (result == nullptr) { - logger().error("Untracked replaced connection: {}", conn); + logger().error("Untracked replaced connection: {}", *conn); ceph_abort(); } result->state = conn_state_t::replaced; - logger().info("[{}] {} {}", result->index, conn, result->state); + logger().info("[{}] {} {}", result->index, *conn, result->state); } - bp_action_t intercept(SocketConnection& conn, Breakpoint bp) override { + bp_action_t intercept(ConnectionRef conn, Breakpoint bp) override { ++breakpoints_counter[bp].counter; - auto result = find_result(conn.get_local_shared_foreign_from_this()); + auto result = find_result(conn); if (result == nullptr) { logger().error("Untracked intercepted connection: {}, at breakpoint {}({})", - conn, bp, breakpoints_counter[bp].counter); + *conn, bp, breakpoints_counter[bp].counter); ceph_abort(); } @@ -786,13 +783,13 @@ struct TestInterceptor : public Interceptor { auto it_cnt = it_bp->second.find(breakpoints_counter[bp].counter); if (it_cnt != it_bp->second.end()) { logger().info("[{}] {} intercepted {}({}) => {}", - result->index, conn, bp, + result->index, *conn, bp, breakpoints_counter[bp].counter, it_cnt->second); return it_cnt->second; } } logger().info("[{}] {} intercepted {}({})", - result->index, conn, bp, breakpoints_counter[bp].counter); + result->index, *conn, bp, breakpoints_counter[bp].counter); return bp_action_t::CONTINUE; } }; -- 2.39.5