From 4515c34977f8c54d459b0140305e24c9bb29a455 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Wed, 26 Jul 2023 12:52:57 +0800 Subject: [PATCH] test/crimson/test_messenger: relax the ordering checks to tracked_conn Signed-off-by: Yingxin Cheng --- src/test/crimson/test_messenger.cc | 117 ++++++++++++++++------------- 1 file changed, 66 insertions(+), 51 deletions(-) diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index 4dbb90bceb970..8b1ca07edb920 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -848,24 +848,24 @@ class FailoverSuite : public Dispatcher { TestInterceptor interceptor; unsigned tracked_index = 0; - Connection *tracked_conn; + Connection *tracked_conn = nullptr; unsigned pending_send = 0; unsigned pending_peer_receive = 0; unsigned pending_receive = 0; - std::optional> ms_dispatch(ConnectionRef c, MessageRef m) override { - auto result = interceptor.find_result(&*c); + std::optional> ms_dispatch(ConnectionRef conn, MessageRef m) override { + auto result = interceptor.find_result(&*conn); if (result == nullptr) { - logger().error("Untracked ms dispatched connection: {}", *c); + logger().error("Untracked ms dispatched connection: {}", *conn); ceph_abort(); } - if (tracked_conn != &*c) { - logger().error("[{}] {} got op, but doesn't match tracked_conn [{}] {}", - result->index, *c, tracked_index, *tracked_conn); - ceph_abort(); + if (tracked_conn != &*conn) { + logger().warn("[{}] {} got op, but doesn't match tracked_conn [{}] {}", + result->index, *conn, tracked_index, *tracked_conn); + } else { + ceph_assert(result->index == tracked_index); } - ceph_assert(result->index == tracked_index); ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); ceph_assert(pending_receive > 0); @@ -874,7 +874,7 @@ class FailoverSuite : public Dispatcher { interceptor.notify(); } logger().info("[Test] got op, left {} ops -- [{}] {}", - pending_receive, result->index, *c); + pending_receive, 
result->index, *conn); return {seastar::now()}; } @@ -891,14 +891,14 @@ class FailoverSuite : public Dispatcher { if (tracked_conn && !tracked_conn->is_protocol_closed() && - tracked_conn != conn) { - logger().error("[{}] {} got accepted, but there's already traced_conn [{}] {}", + tracked_conn != &*conn) { + logger().error("[{}] {} got accepted, but there's already a valid tracked_conn [{}] {}", result->index, *conn, tracked_index, *tracked_conn); ceph_abort(); } tracked_index = result->index; - tracked_conn = conn; + tracked_conn = &*conn; ++result->cnt_accept_dispatched; logger().info("[Test] got accept (cnt_accept_dispatched={}), track [{}] {}", result->cnt_accept_dispatched, result->index, *conn); @@ -915,12 +915,17 @@ class FailoverSuite : public Dispatcher { ceph_abort(); } - if (tracked_conn != conn) { - logger().error("[{}] {} got connected, but doesn't match tracked_conn [{}] {}", + if (tracked_conn && + !tracked_conn->is_protocol_closed() && + tracked_conn != &*conn) { + logger().error("[{}] {} got connected, but there's already a valid tracked_conn [{}] {}", result->index, *conn, tracked_index, *tracked_conn); ceph_abort(); } - ceph_assert(result->index == tracked_index); + + if (tracked_conn == &*conn) { + ceph_assert(result->index == tracked_index); + } ++result->cnt_connect_dispatched; logger().info("[Test] got connected (cnt_connect_dispatched={}) -- [{}] {}", @@ -934,15 +939,15 @@ class FailoverSuite : public Dispatcher { ceph_abort(); } - if (tracked_conn != conn) { - logger().error("[{}] {} got reset, but doesn't match tracked_conn [{}] {}", - result->index, *conn, tracked_index, *tracked_conn); - ceph_abort(); + if (tracked_conn != &*conn) { + logger().warn("[{}] {} got reset, but doesn't match tracked_conn [{}] {}", + result->index, *conn, tracked_index, *tracked_conn); + } else { + ceph_assert(result->index == tracked_index); + tracked_index = 0; + tracked_conn = nullptr; } - ceph_assert(result->index == tracked_index); - tracked_index = 0; - 
tracked_conn = nullptr; ++result->cnt_reset_dispatched; logger().info("[Test] got reset (cnt_reset_dispatched={}), untrack [{}] {}", result->cnt_reset_dispatched, result->index, *conn); @@ -955,12 +960,12 @@ class FailoverSuite : public Dispatcher { ceph_abort(); } - if (tracked_conn != conn) { - logger().error("[{}] {} got remotely reset, but doesn't match tracked_conn [{}] {}", - result->index, *conn, tracked_index, *tracked_conn); - ceph_abort(); + if (tracked_conn != &*conn) { + logger().warn("[{}] {} got remotely reset, but doesn't match tracked_conn [{}] {}", + result->index, *conn, tracked_index, *tracked_conn); + } else { + ceph_assert(result->index == tracked_index); } - ceph_assert(result->index == tracked_index); ++result->cnt_remote_reset_dispatched; logger().info("[Test] got remote reset (cnt_remote_reset_dispatched={}) -- [{}] {}", @@ -984,6 +989,7 @@ class FailoverSuite : public Dispatcher { seastar::future<> send_op(bool expect_reply=true) { ceph_assert(tracked_conn); + ceph_assert(!tracked_conn->is_protocol_closed()); if (expect_reply) { ++pending_peer_receive; } @@ -1000,6 +1006,7 @@ class FailoverSuite : public Dispatcher { logger().info("[Test] flush sending {} ops", pending_send); } ceph_assert(tracked_conn); + ceph_assert(!tracked_conn->is_protocol_closed()); return seastar::do_until( [this] { return pending_send == 0; }, [this] { @@ -1020,12 +1027,6 @@ class FailoverSuite : public Dispatcher { ++replaced_conns; } } else if (result.conn->is_protocol_ready()) { - if (tracked_conn != result.conn || tracked_index != result.index) { - throw std::runtime_error(fmt::format( - "The connected connection [{}] {} doesn't" - " match the tracked connection [{}] {}", - result.index, *result.conn, tracked_index, *tracked_conn)); - } if (pending_send == 0 && pending_peer_receive == 0 && pending_receive == 0) { result.state = conn_state_t::established; } else { @@ -1174,18 +1175,19 @@ class FailoverSuite : public Dispatcher { if (tracked_conn) { if 
(tracked_conn->is_protocol_closed()) { - ceph_assert(tracked_conn != conn); - logger().info("[Test] this is a new session replacing an closed one"); + logger().info("[Test] this is a new session" + " replacing an closed one"); + ceph_assert(tracked_conn != &*conn); } else { - ceph_assert(tracked_index == result->index); - ceph_assert(tracked_conn == conn); logger().info("[Test] this is not a new session"); + ceph_assert(tracked_index == result->index); + ceph_assert(tracked_conn == &*conn); } } else { logger().info("[Test] this is a new session"); } tracked_index = result->index; - tracked_conn = conn; + tracked_conn = &*conn; return flush_pending_send(); } @@ -1193,6 +1195,7 @@ class FailoverSuite : public Dispatcher { seastar::future<> send_peer() { if (tracked_conn) { logger().info("[Test] send_peer()"); + ceph_assert(!tracked_conn->is_protocol_closed()); ceph_assert(!pending_send); return send_op(); } else { @@ -1205,12 +1208,14 @@ class FailoverSuite : public Dispatcher { seastar::future<> keepalive_peer() { logger().info("[Test] keepalive_peer()"); ceph_assert(tracked_conn); + ceph_assert(!tracked_conn->is_protocol_closed()); return tracked_conn->send_keepalive(); } seastar::future<> try_send_peer() { logger().info("[Test] try_send_peer()"); ceph_assert(tracked_conn); + ceph_assert(!tracked_conn->is_protocol_closed()); return send_op(false); } @@ -1473,10 +1478,9 @@ class FailoverSuitePeer : public Dispatcher { ConnectionRef tracked_conn; unsigned pending_send = 0; - std::optional> ms_dispatch(ConnectionRef c, MessageRef m) override { + std::optional> ms_dispatch(ConnectionRef conn, MessageRef m) override { logger().info("[TestPeer] got op from Test"); ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); - ceph_assert(tracked_conn == c); std::ignore = op_callback(); return {seastar::now()}; } @@ -1487,17 +1491,19 @@ class FailoverSuitePeer : public Dispatcher { bool is_replace) override { assert(prv_shard == seastar::this_shard_id()); logger().info("[TestPeer] got 
accept from Test"); - ceph_assert(!tracked_conn || - tracked_conn->is_protocol_closed() || - tracked_conn == conn); + + if (tracked_conn && + !tracked_conn->is_protocol_closed() && + tracked_conn != conn) { + logger().error("[TestPeer] {} got accepted, but there's already a valid tracked_conn {}", + *conn, *tracked_conn); + } tracked_conn = conn; std::ignore = flush_pending_send(); } void ms_handle_reset(ConnectionRef conn, bool is_replace) override { logger().info("[TestPeer] got reset from Test"); - ceph_assert(tracked_conn == conn); - tracked_conn = nullptr; } private: @@ -1516,6 +1522,11 @@ class FailoverSuitePeer : public Dispatcher { seastar::future<> send_op() { ceph_assert(tracked_conn); + if (tracked_conn->is_protocol_closed()) { + logger().error("[TestPeer] send op but the connection is closed -- {}", + *tracked_conn); + } + pg_t pgid; object_locator_t oloc; hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(), @@ -1539,7 +1550,8 @@ class FailoverSuitePeer : public Dispatcher { public: FailoverSuitePeer(MessengerRef peer_msgr, cb_t op_callback) - : peer_msgr(peer_msgr), op_callback(op_callback) { } + : peer_msgr(peer_msgr), + op_callback(op_callback) { } seastar::future<> shutdown() { peer_msgr->stop(); @@ -1548,26 +1560,29 @@ class FailoverSuitePeer : public Dispatcher { seastar::future<> connect_peer(entity_addr_t test_addr_decoded) { logger().info("[TestPeer] connect_peer({})", test_addr_decoded); - auto new_tracked_conn = peer_msgr->connect(test_addr_decoded, entity_name_t::TYPE_OSD); + auto conn = peer_msgr->connect(test_addr_decoded, entity_name_t::TYPE_OSD); + if (tracked_conn) { if (tracked_conn->is_protocol_closed()) { - ceph_assert(tracked_conn != new_tracked_conn); logger().info("[TestPeer] this is a new session" " replacing an closed one"); + ceph_assert(tracked_conn != conn); } else { - ceph_assert(tracked_conn == new_tracked_conn); logger().info("[TestPeer] this is not a new session"); + ceph_assert(tracked_conn == conn); } } else { 
logger().info("[TestPeer] this is a new session"); } - tracked_conn = new_tracked_conn; + tracked_conn = conn; + return flush_pending_send(); } seastar::future<> send_peer() { if (tracked_conn) { logger().info("[TestPeer] send_peer()"); + ceph_assert(!pending_send); return send_op(); } else { ++pending_send; -- 2.39.5