From 7400c86073e3e1ee3d79ce9067af2d3729efa9b6 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Fri, 21 Jul 2023 11:17:32 +0800 Subject: [PATCH] test/crimson/test_messenger: move all connection state checks to the protocol level Signed-off-by: Yingxin Cheng (cherry picked from commit 933745cd815f7cc81438a89b3ba9ac5fc2ef61b7) --- src/crimson/net/Connection.h | 8 +++- src/crimson/net/ProtocolV2.cc | 7 +++ src/crimson/net/ProtocolV2.h | 8 ++++ src/crimson/net/SocketConnection.cc | 15 +++++- src/crimson/net/SocketConnection.h | 8 +++- src/crimson/net/io_handler.cc | 7 --- src/test/crimson/test_messenger.cc | 73 ++++++++++++++++------------- 7 files changed, 80 insertions(+), 46 deletions(-) diff --git a/src/crimson/net/Connection.h b/src/crimson/net/Connection.h index 4339e41f0c2..7141e20f476 100644 --- a/src/crimson/net/Connection.h +++ b/src/crimson/net/Connection.h @@ -121,9 +121,13 @@ class Connection : public seastar::enable_shared_from_this { virtual void print(std::ostream& out) const = 0; #ifdef UNIT_TESTS_BUILT - virtual bool is_closed() const = 0; + virtual bool is_protocol_ready() const = 0; - virtual bool is_closed_clean() const = 0; + virtual bool is_protocol_standby() const = 0; + + virtual bool is_protocol_closed() const = 0; + + virtual bool is_protocol_closed_clean() const = 0; virtual bool peer_wins() const = 0; #endif diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 045022b353c..8bb9f7b6821 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -2112,6 +2112,13 @@ void ProtocolV2::execute_ready() // I'm not responsible to shutdown the socket at READY is_socket_valid = false; trigger_state(state_t::READY, io_state_t::open); +#ifdef UNIT_TESTS_BUILT + if (conn.interceptor) { + // FIXME: doesn't support cross-core + conn.interceptor->register_conn_ready( + conn.get_local_shared_foreign_from_this()); + } +#endif } // STANDBY state diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index a9aa4ecdf76..dd7a1e7039b 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -53,6 +53,14 @@ public: seastar::future<> close_clean_yielded(); #ifdef UNIT_TESTS_BUILT + bool is_ready() const { + return state == state_t::READY; + } + + bool is_standby() const { + return state == state_t::STANDBY; + } + bool is_closed_clean() const { return closed_clean; } diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index 908977da36b..57e5c12c1ae 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -49,13 +49,24 @@ bool SocketConnection::is_connected() const } #ifdef UNIT_TESTS_BUILT -bool SocketConnection::is_closed() const +bool SocketConnection::is_protocol_ready() const +{ + assert(seastar::this_shard_id() == msgr_sid); + return protocol->is_ready(); +} + +bool SocketConnection::is_protocol_standby() const { + assert(seastar::this_shard_id() == msgr_sid); + return protocol->is_standby(); +} + +bool SocketConnection::is_protocol_closed() const { assert(seastar::this_shard_id() == msgr_sid); return protocol->is_closed(); } -bool SocketConnection::is_closed_clean() const +bool SocketConnection::is_protocol_closed_clean() const { assert(seastar::this_shard_id() == msgr_sid); return protocol->is_closed_clean(); diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index 102e1795831..823d6c574da 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -171,9 +171,13 @@ private: void set_socket(Socket *s); #ifdef UNIT_TESTS_BUILT - bool is_closed_clean() const override; + bool is_protocol_ready() const override; - bool is_closed() const override; + bool is_protocol_standby() const override; + + bool is_protocol_closed_clean() const override; + + bool is_protocol_closed() const override; // peer wins if myaddr > peeraddr bool peer_wins() const override; diff --git a/src/crimson/net/io_handler.cc b/src/crimson/net/io_handler.cc index 8b774e678b7..15d5509dc16 100644 --- a/src/crimson/net/io_handler.cc +++ b/src/crimson/net/io_handler.cc @@ -293,13 +293,6 @@ void IOHandler::do_set_io_state( ceph_assert_always(protocol_is_connected == true); assign_frame_assembler(std::move(fa)); dispatch_in = true; -#ifdef UNIT_TESTS_BUILT - if (conn.interceptor) { - // FIXME: doesn't support cross-core - conn.interceptor->register_conn_ready( - conn.get_local_shared_foreign_from_this()); - } -#endif } else if (prv_state == io_state_t::open) { // from open ceph_assert_always(protocol_is_connected == true); diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index 8f55412c9af..4dbb90bceb9 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -668,7 +668,7 @@ using ConnResults = std::vector; struct TestInterceptor : public Interceptor { std::map> breakpoints; std::map breakpoints_counter; - std::map conns; + std::map conns; ConnResults results; std::optional signal; @@ -697,7 +697,7 @@ struct TestInterceptor : public Interceptor { breakpoints[bp][round] = bp_action_t::STALL; } - ConnResult* find_result(ConnectionRef conn) { + ConnResult* find_result(Connection *conn) { auto it = conns.find(conn); if (it == conns.end()) { return nullptr; @@ -725,7 +725,7 @@ struct TestInterceptor : public Interceptor { private: void register_conn(ConnectionRef conn) override { - auto result = find_result(conn); + auto result = find_result(&*conn); if (result != nullptr) { logger().error("The connection [{}] {} already exists when register {}", result->index, *result->conn, *conn); @@ -733,13 +733,13 @@ struct TestInterceptor : public Interceptor { } unsigned index = results.size(); results.emplace_back(conn, index); - conns[conn] = index; + conns[&*conn] = index; notify(); logger().info("[{}] {} new connection registered", index, *conn); } void register_conn_closed(ConnectionRef conn) override { - auto result = find_result(conn); + auto result = find_result(&*conn); if (result == nullptr) { logger().error("Untracked closed connection: {}", *conn); ceph_abort(); @@ -753,19 +753,19 @@ struct TestInterceptor : public Interceptor { } void register_conn_ready(ConnectionRef conn) override { - auto result = find_result(conn); + auto result = find_result(&*conn); if (result == nullptr) { logger().error("Untracked ready connection: {}", *conn); ceph_abort(); } - ceph_assert(conn->is_connected()); + ceph_assert(conn->is_protocol_ready()); notify(); logger().info("[{}] {} ready", result->index, *conn); } void register_conn_replaced(ConnectionRef conn) override { - auto result = find_result(conn); + auto result = find_result(&*conn); if (result == nullptr) { logger().error("Untracked replaced connection: {}", *conn); ceph_abort(); @@ -778,7 +778,7 @@ struct TestInterceptor : public Interceptor { bp_action_t intercept(ConnectionRef conn, Breakpoint bp) override { ++breakpoints_counter[bp].counter; - auto result = find_result(conn); + auto result = find_result(&*conn); if (result == nullptr) { logger().error("Untracked intercepted connection: {}, at breakpoint {}({})", *conn, bp, breakpoints_counter[bp].counter); @@ -848,19 +848,19 @@ class FailoverSuite : public Dispatcher { TestInterceptor interceptor; unsigned tracked_index = 0; - ConnectionRef tracked_conn; + Connection *tracked_conn; unsigned pending_send = 0; unsigned pending_peer_receive = 0; unsigned pending_receive = 0; std::optional> ms_dispatch(ConnectionRef c, MessageRef m) override { - auto result = interceptor.find_result(c); + auto result = interceptor.find_result(&*c); if (result == nullptr) { logger().error("Untracked ms dispatched connection: {}", *c); ceph_abort(); } - if (tracked_conn != c) { + if (tracked_conn != &*c) { logger().error("[{}] {} got op, but doesn't match tracked_conn [{}] {}", result->index, *c, tracked_index, *tracked_conn); ceph_abort(); @@ -883,14 +883,14 @@ class FailoverSuite : public Dispatcher { seastar::shard_id prv_shard, bool is_replace) override { assert(prv_shard == seastar::this_shard_id()); - auto result = interceptor.find_result(conn); + auto result = interceptor.find_result(&*conn); if (result == nullptr) { logger().error("Untracked accepted connection: {}", *conn); ceph_abort(); } if (tracked_conn && - !tracked_conn->is_closed() && + !tracked_conn->is_protocol_closed() && tracked_conn != conn) { logger().error("[{}] {} got accepted, but there's already traced_conn [{}] {}", result->index, *conn, tracked_index, *tracked_conn); @@ -909,7 +909,7 @@ class FailoverSuite : public Dispatcher { ConnectionRef conn, seastar::shard_id prv_shard) override { assert(prv_shard == seastar::this_shard_id()); - auto result = interceptor.find_result(conn); + auto result = interceptor.find_result(&*conn); if (result == nullptr) { logger().error("Untracked connected connection: {}", *conn); ceph_abort(); @@ -928,7 +928,7 @@ class FailoverSuite : public Dispatcher { } void ms_handle_reset(ConnectionRef conn, bool is_replace) override { - auto result = interceptor.find_result(conn); + auto result = interceptor.find_result(&*conn); if (result == nullptr) { logger().error("Untracked reset connection: {}", *conn); ceph_abort(); @@ -949,7 +949,7 @@ class FailoverSuite : public Dispatcher { } void ms_handle_remote_reset(ConnectionRef conn) override { - auto result = interceptor.find_result(conn); + auto result = interceptor.find_result(&*conn); if (result == nullptr) { logger().error("Untracked remotely reset connection: {}", *conn); ceph_abort(); @@ -1015,11 +1015,11 @@ class FailoverSuite : public Dispatcher { unsigned pending_establish = 0; unsigned replaced_conns = 0; for (auto& result : interceptor.results) { - if (result.conn->is_closed_clean()) { + if (result.conn->is_protocol_closed_clean()) { if (result.state == conn_state_t::replaced) { ++replaced_conns; } - } else if (result.conn->is_connected()) { + } else if (result.conn->is_protocol_ready()) { if (tracked_conn != result.conn || tracked_index != result.index) { throw std::runtime_error(fmt::format( "The connected connection [{}] {} doesn't" @@ -1049,15 +1049,22 @@ class FailoverSuite : public Dispatcher { do_wait = true; } } - if (wait_received && - (pending_send || pending_peer_receive || pending_receive)) { - if (pending_conns || pending_establish) { - logger().info("[Test] wait_ready(): wait for pending_send={}," - " pending_peer_receive={}, pending_receive={}," - " pending {}/{} ready/establish connections ...", - pending_send, pending_peer_receive, pending_receive, - pending_conns, pending_establish); - do_wait = true; + if (wait_received) { + if (pending_send || pending_peer_receive || pending_receive) { + if (pending_conns || pending_establish) { + logger().info("[Test] wait_ready(): wait for pending_send={}," + " pending_peer_receive={}, pending_receive={}," + " pending {}/{} ready/establish connections ...", + pending_send, pending_peer_receive, pending_receive, + pending_conns, pending_establish); + do_wait = true; + } else { + // If there are pending messages, stop waiting if there are + // no longer pending connections. + } + } else { + // Stop waiting if there are no pending messages. Pending connections + // should not be important. } } if (num_replaced > 0) { @@ -1162,11 +1169,11 @@ class FailoverSuite : public Dispatcher { seastar::future<> connect_peer() { logger().info("[Test] connect_peer({})", test_peer_addr); auto conn = test_msgr->connect(test_peer_addr, entity_name_t::TYPE_OSD); - auto result = interceptor.find_result(conn); + auto result = interceptor.find_result(&*conn); ceph_assert(result != nullptr); if (tracked_conn) { - if (tracked_conn->is_closed()) { + if (tracked_conn->is_protocol_closed()) { ceph_assert(tracked_conn != conn); logger().info("[Test] this is a new session replacing an closed one"); } else { @@ -1247,7 +1254,7 @@ class FailoverSuite : public Dispatcher { bool is_standby() { ceph_assert(tracked_conn); - return !(tracked_conn->is_connected() || tracked_conn->is_closed()); + return tracked_conn->is_protocol_standby(); } }; @@ -1481,7 +1488,7 @@ class FailoverSuitePeer : public Dispatcher { assert(prv_shard == seastar::this_shard_id()); logger().info("[TestPeer] got accept from Test"); ceph_assert(!tracked_conn || - tracked_conn->is_closed() || + tracked_conn->is_protocol_closed() || tracked_conn == conn); tracked_conn = conn; std::ignore = flush_pending_send(); @@ -1543,7 +1550,7 @@ class FailoverSuitePeer : public Dispatcher { logger().info("[TestPeer] connect_peer({})", test_addr_decoded); auto new_tracked_conn = peer_msgr->connect(test_addr_decoded, entity_name_t::TYPE_OSD); if (tracked_conn) { - if (tracked_conn->is_closed()) { + if (tracked_conn->is_protocol_closed()) { ceph_assert(tracked_conn != new_tracked_conn); logger().info("[TestPeer] this is a new session" " replacing an closed one"); -- 2.39.5