From d65ba2d018b14d87252f50baa890350a6e7b38e3 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Thu, 3 Aug 2023 15:42:45 +0800 Subject: [PATCH] test/crimson/test_messenger: support cross-core protocol-level tests Signed-off-by: Yingxin Cheng --- src/test/crimson/test_messenger.cc | 407 +++++++++++++++++------------ 1 file changed, 239 insertions(+), 168 deletions(-) diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index 1359b5aeb62ea..3c6616cbd6a16 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -671,15 +671,18 @@ struct TestInterceptor : public Interceptor { std::map conns; ConnResults results; std::optional signal; + const seastar::shard_id primary_sid; + + TestInterceptor() : primary_sid{seastar::this_shard_id()} {} - TestInterceptor() = default; // only used for copy breakpoint configurations - TestInterceptor(const TestInterceptor& other) { + TestInterceptor(const TestInterceptor& other) : primary_sid{other.primary_sid} { assert(other.breakpoints_counter.empty()); assert(other.conns.empty()); assert(other.results.empty()); breakpoints = other.breakpoints; assert(!other.signal); + assert(seastar::this_shard_id() == primary_sid); } void make_fault(Breakpoint bp, unsigned round = 1) { @@ -698,6 +701,7 @@ struct TestInterceptor : public Interceptor { } ConnResult* find_result(Connection *conn) { + assert(seastar::this_shard_id() == primary_sid); auto it = conns.find(conn); if (it == conns.end()) { return nullptr; @@ -707,6 +711,7 @@ struct TestInterceptor : public Interceptor { } seastar::future<> wait() { + assert(seastar::this_shard_id() == primary_sid); assert(!signal); signal = seastar::abort_source(); return seastar::sleep_abortable(10s, *signal).then([] { @@ -717,6 +722,7 @@ struct TestInterceptor : public Interceptor { } void notify() { + assert(seastar::this_shard_id() == primary_sid); if (signal) { signal->request_abort(); signal = std::nullopt; @@ -778,65 +784,68 @@ struct TestInterceptor : public Interceptor { seastar::future intercept(Connection &_conn, std::vector bps) override { assert(bps.size() >= 1); + Connection *conn = &_conn; - std::vector actions; - for (const Breakpoint &bp : bps) { - ++breakpoints_counter[bp].counter; + return seastar::smp::submit_to(primary_sid, [conn, bps, this] { + std::vector actions; + for (const Breakpoint &bp : bps) { + ++breakpoints_counter[bp].counter; - auto result = find_result(&*conn); - if (result == nullptr) { - logger().error("Untracked intercepted connection: {}, at breakpoint {}({})", - *conn, bp, breakpoints_counter[bp].counter); - ceph_abort(); - } + auto result = find_result(&*conn); + if (result == nullptr) { + logger().error("Untracked intercepted connection: {}, at breakpoint {}({})", + *conn, bp, breakpoints_counter[bp].counter); + ceph_abort(); + } - if (bp == custom_bp_t::SOCKET_CONNECTING) { - ++result->connect_attempts; - logger().info("[Test] connect_attempts={}", result->connect_attempts); - } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::WRITE}) { - ++result->client_connect_attempts; - logger().info("[Test] client_connect_attempts={}", result->client_connect_attempts); - } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::WRITE}) { - ++result->client_reconnect_attempts; - logger().info("[Test] client_reconnect_attempts={}", result->client_reconnect_attempts); - } else if (bp == custom_bp_t::SOCKET_ACCEPTED) { - ++result->accept_attempts; - logger().info("[Test] accept_attempts={}", result->accept_attempts); - } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::READ}) { - ++result->server_connect_attempts; - logger().info("[Test] server_connect_attemps={}", result->server_connect_attempts); - } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::READ}) { - ++result->server_reconnect_attempts; - logger().info("[Test] server_reconnect_attempts={}", result->server_reconnect_attempts); - } + if (bp == custom_bp_t::SOCKET_CONNECTING) { + ++result->connect_attempts; + logger().info("[Test] connect_attempts={}", result->connect_attempts); + } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::WRITE}) { + ++result->client_connect_attempts; + logger().info("[Test] client_connect_attempts={}", result->client_connect_attempts); + } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::WRITE}) { + ++result->client_reconnect_attempts; + logger().info("[Test] client_reconnect_attempts={}", result->client_reconnect_attempts); + } else if (bp == custom_bp_t::SOCKET_ACCEPTED) { + ++result->accept_attempts; + logger().info("[Test] accept_attempts={}", result->accept_attempts); + } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::READ}) { + ++result->server_connect_attempts; + logger().info("[Test] server_connect_attemps={}", result->server_connect_attempts); + } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::READ}) { + ++result->server_reconnect_attempts; + logger().info("[Test] server_reconnect_attempts={}", result->server_reconnect_attempts); + } - auto it_bp = breakpoints.find(bp); - if (it_bp != breakpoints.end()) { - auto it_cnt = it_bp->second.find(breakpoints_counter[bp].counter); - if (it_cnt != it_bp->second.end()) { - logger().info("[{}] {} intercepted {}({}) => {}", - result->index, *conn, bp, - breakpoints_counter[bp].counter, it_cnt->second); - actions.emplace_back(it_cnt->second); - continue; + auto it_bp = breakpoints.find(bp); + if (it_bp != breakpoints.end()) { + auto it_cnt = it_bp->second.find(breakpoints_counter[bp].counter); + if (it_cnt != it_bp->second.end()) { + logger().info("[{}] {} intercepted {}({}) => {}", + result->index, *conn, bp, + breakpoints_counter[bp].counter, it_cnt->second); + actions.emplace_back(it_cnt->second); + continue; + } } + logger().info("[{}] {} intercepted {}({})", + result->index, *conn, bp, breakpoints_counter[bp].counter); + actions.emplace_back(bp_action_t::CONTINUE); } - logger().info("[{}] {} intercepted {}({})", - result->index, *conn, bp, breakpoints_counter[bp].counter); - actions.emplace_back(bp_action_t::CONTINUE); - } - bp_action_t action = bp_action_t::CONTINUE; - for (bp_action_t &a : actions) { - if (a != bp_action_t::CONTINUE) { - if (action == bp_action_t::CONTINUE) { - action = a; - } else { - ceph_abort("got multiple incompatible actions"); + bp_action_t action = bp_action_t::CONTINUE; + for (bp_action_t &a : actions) { + if (a != bp_action_t::CONTINUE) { + if (action == bp_action_t::CONTINUE) { + action = a; + } else { + ceph_abort("got multiple incompatible actions"); + } } } - } - return seastar::make_ready_future(action); + return seastar::make_ready_future(action); + }); } }; @@ -872,123 +881,158 @@ class FailoverSuite : public Dispatcher { unsigned pending_peer_receive = 0; unsigned pending_receive = 0; - std::optional> ms_dispatch(ConnectionRef conn, MessageRef m) override { - auto result = interceptor.find_result(&*conn); - if (result == nullptr) { - logger().error("Untracked ms dispatched connection: {}", *conn); - ceph_abort(); - } - - if (tracked_conn != &*conn) { - logger().warn("[{}] {} got op, but doesn't match tracked_conn [{}] {}", - result->index, *conn, tracked_index, *tracked_conn); - } else { - ceph_assert(result->index == tracked_index); - } + ShardedGates &gates; + const seastar::shard_id primary_sid; + std::optional> ms_dispatch( + ConnectionRef conn_ref, MessageRef m) override { ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); - ceph_assert(pending_receive > 0); - --pending_receive; - if (pending_receive == 0) { - interceptor.notify(); - } - logger().info("[Test] got op, left {} ops -- [{}] {}", - pending_receive, result->index, *conn); + Connection *conn = &*conn_ref; + gates.dispatch_in_background("TestSuite_ms_dispatch", + [this, conn, conn_ref] { + return seastar::smp::submit_to(primary_sid, [this, conn] { + auto result = interceptor.find_result(&*conn); + if (result == nullptr) { + logger().error("Untracked ms dispatched connection: {}", *conn); + ceph_abort(); + } + + if (tracked_conn != &*conn) { + logger().warn("[{}] {} got op, but doesn't match tracked_conn [{}] {}", + result->index, *conn, tracked_index, *tracked_conn); + } else { + ceph_assert(result->index == tracked_index); + } + + ceph_assert(pending_receive > 0); + --pending_receive; + if (pending_receive == 0) { + interceptor.notify(); + } + logger().info("[Test] got op, left {} ops -- [{}] {}", + pending_receive, result->index, *conn); + }).then([conn_ref] {}); + }); return {seastar::now()}; } void ms_handle_accept( - ConnectionRef conn, + ConnectionRef conn_ref, seastar::shard_id prv_shard, bool is_replace) override { - assert(prv_shard == seastar::this_shard_id()); - auto result = interceptor.find_result(&*conn); - if (result == nullptr) { - logger().error("Untracked accepted connection: {}", *conn); - ceph_abort(); - } + Connection *conn = &*conn_ref; + gates.dispatch_in_background("TestSuite_ms_dispatch", + [this, conn, conn_ref] { + return seastar::smp::submit_to(primary_sid, [this, conn] { + auto result = interceptor.find_result(&*conn); + if (result == nullptr) { + logger().error("Untracked accepted connection: {}", *conn); + ceph_abort(); + } - if (tracked_conn && - !tracked_conn->is_protocol_closed() && - tracked_conn != &*conn) { - logger().error("[{}] {} got accepted, but there's already a valid traced_conn [{}] {}", - result->index, *conn, tracked_index, *tracked_conn); - ceph_abort(); - } + if (tracked_conn && + !tracked_conn->is_protocol_closed() && + tracked_conn != &*conn) { + logger().error("[{}] {} got accepted, but there's already a valid traced_conn [{}] {}", + result->index, *conn, tracked_index, *tracked_conn); + ceph_abort(); + } - tracked_index = result->index; - tracked_conn = &*conn; - ++result->cnt_accept_dispatched; - logger().info("[Test] got accept (cnt_accept_dispatched={}), track [{}] {}", - result->cnt_accept_dispatched, result->index, *conn); - std::ignore = flush_pending_send(); + tracked_index = result->index; + tracked_conn = &*conn; + ++result->cnt_accept_dispatched; + logger().info("[Test] got accept (cnt_accept_dispatched={}), track [{}] {}", + result->cnt_accept_dispatched, result->index, *conn); + return flush_pending_send(); + }).then([conn_ref] {}); + }); } void ms_handle_connect( - ConnectionRef conn, + ConnectionRef conn_ref, seastar::shard_id prv_shard) override { - assert(prv_shard == seastar::this_shard_id()); - auto result = interceptor.find_result(&*conn); - if (result == nullptr) { - logger().error("Untracked connected connection: {}", *conn); - ceph_abort(); - } + Connection *conn = &*conn_ref; + gates.dispatch_in_background("TestSuite_ms_dispatch", + [this, conn, conn_ref] { + return seastar::smp::submit_to(primary_sid, [this, conn] { + auto result = interceptor.find_result(&*conn); + if (result == nullptr) { + logger().error("Untracked connected connection: {}", *conn); + ceph_abort(); + } - if (tracked_conn && - !tracked_conn->is_protocol_closed() && - tracked_conn != &*conn) { - logger().error("[{}] {} got connected, but there's already a avlid tracked_conn [{}] {}", - result->index, *conn, tracked_index, *tracked_conn); - ceph_abort(); - } + if (tracked_conn && + !tracked_conn->is_protocol_closed() && + tracked_conn != &*conn) { + logger().error("[{}] {} got connected, but there's already a avlid tracked_conn [{}] {}", + result->index, *conn, tracked_index, *tracked_conn); + ceph_abort(); + } - if (tracked_conn == &*conn) { - ceph_assert(result->index == tracked_index); - } + if (tracked_conn == &*conn) { + ceph_assert(result->index == tracked_index); + } - ++result->cnt_connect_dispatched; - logger().info("[Test] got connected (cnt_connect_dispatched={}) -- [{}] {}", - result->cnt_connect_dispatched, result->index, *conn); + ++result->cnt_connect_dispatched; + logger().info("[Test] got connected (cnt_connect_dispatched={}) -- [{}] {}", + result->cnt_connect_dispatched, result->index, *conn); + }).then([conn_ref] {}); + }); } - void ms_handle_reset(ConnectionRef conn, bool is_replace) override { - auto result = interceptor.find_result(&*conn); - if (result == nullptr) { - logger().error("Untracked reset connection: {}", *conn); - ceph_abort(); - } + void ms_handle_reset( + ConnectionRef conn_ref, + bool is_replace) override { + Connection *conn = &*conn_ref; + gates.dispatch_in_background("TestSuite_ms_dispatch", + [this, conn, conn_ref] { + return seastar::smp::submit_to(primary_sid, [this, conn] { + auto result = interceptor.find_result(&*conn); + if (result == nullptr) { + logger().error("Untracked reset connection: {}", *conn); + ceph_abort(); + } - if (tracked_conn != &*conn) { - logger().warn("[{}] {} got reset, but doesn't match tracked_conn [{}] {}", - result->index, *conn, tracked_index, *tracked_conn); - } else { - ceph_assert(result->index == tracked_index); - tracked_index = 0; - tracked_conn = nullptr; - } + if (tracked_conn != &*conn) { + logger().warn("[{}] {} got reset, but doesn't match tracked_conn [{}] {}", + result->index, *conn, tracked_index, *tracked_conn); + } else { + ceph_assert(result->index == tracked_index); + tracked_index = 0; + tracked_conn = nullptr; + } - ++result->cnt_reset_dispatched; - logger().info("[Test] got reset (cnt_reset_dispatched={}), untrack [{}] {}", - result->cnt_reset_dispatched, result->index, *conn); + ++result->cnt_reset_dispatched; + logger().info("[Test] got reset (cnt_reset_dispatched={}), untrack [{}] {}", + result->cnt_reset_dispatched, result->index, *conn); + }).then([conn_ref] {}); + }); } - void ms_handle_remote_reset(ConnectionRef conn) override { - auto result = interceptor.find_result(&*conn); - if (result == nullptr) { - logger().error("Untracked remotely reset connection: {}", *conn); - ceph_abort(); - } + void ms_handle_remote_reset( + ConnectionRef conn_ref) override { + Connection *conn = &*conn_ref; + gates.dispatch_in_background("TestSuite_ms_dispatch", + [this, conn, conn_ref] { + return seastar::smp::submit_to(primary_sid, [this, conn] { + auto result = interceptor.find_result(&*conn); + if (result == nullptr) { + logger().error("Untracked remotely reset connection: {}", *conn); + ceph_abort(); + } - if (tracked_conn != &*conn) { - logger().warn("[{}] {} got remotely reset, but doesn't match tracked_conn [{}] {}", - result->index, *conn, tracked_index, *tracked_conn); - } else { - ceph_assert(result->index == tracked_index); - } + if (tracked_conn != &*conn) { + logger().warn("[{}] {} got remotely reset, but doesn't match tracked_conn [{}] {}", + result->index, *conn, tracked_index, *tracked_conn); + } else { + ceph_assert(result->index == tracked_index); + } - ++result->cnt_remote_reset_dispatched; - logger().info("[Test] got remote reset (cnt_remote_reset_dispatched={}) -- [{}] {}", - result->cnt_remote_reset_dispatched, result->index, *conn); + ++result->cnt_remote_reset_dispatched; + logger().info("[Test] got remote reset (cnt_remote_reset_dispatched={}) -- [{}] {}", + result->cnt_remote_reset_dispatched, result->index, *conn); + }).then([conn_ref] {}); + }); } private: @@ -1037,6 +1081,7 @@ class FailoverSuite : public Dispatcher { seastar::future<> wait_ready(unsigned num_ready_conns, unsigned num_replaced, bool wait_received) { + assert(seastar::this_shard_id() == primary_sid); unsigned pending_conns = 0; unsigned pending_establish = 0; unsigned replaced_conns = 0; @@ -1116,10 +1161,13 @@ class FailoverSuite : public Dispatcher { public: FailoverSuite(MessengerRef test_msgr, entity_addr_t test_peer_addr, - const TestInterceptor& interceptor) + const TestInterceptor& interceptor, + ShardedGates &gates) : test_msgr(test_msgr), test_peer_addr(test_peer_addr), - interceptor(interceptor) { } + interceptor(interceptor), + gates{gates}, + primary_sid{seastar::this_shard_id()} { } entity_addr_t get_addr() const { return test_msgr->get_myaddr(); @@ -1170,14 +1218,17 @@ class FailoverSuite : public Dispatcher { create(entity_addr_t test_addr, SocketPolicy test_policy, entity_addr_t test_peer_addr, - const TestInterceptor& interceptor) { + const TestInterceptor& interceptor, + ShardedGates &gates) { auto suite = std::make_unique( Messenger::create( entity_name_t::OSD(TEST_OSD), "Test", TEST_NONCE, true), - test_peer_addr, interceptor); + test_peer_addr, + interceptor, + gates); return suite->init(test_addr, test_policy ).then([suite = std::move(suite)] () mutable { return std::move(suite); @@ -1188,6 +1239,7 @@ class FailoverSuite : public Dispatcher { public: seastar::future<> connect_peer() { logger().info("[Test] connect_peer({})", test_peer_addr); + assert(seastar::this_shard_id() == primary_sid); auto conn = test_msgr->connect(test_peer_addr, entity_name_t::TYPE_OSD); auto result = interceptor.find_result(&*conn); ceph_assert(result != nullptr); @@ -1212,6 +1264,7 @@ class FailoverSuite : public Dispatcher { } seastar::future<> send_peer() { + assert(seastar::this_shard_id() == primary_sid); if (tracked_conn) { logger().info("[Test] send_peer()"); ceph_assert(!tracked_conn->is_protocol_closed()); @@ -1226,6 +1279,7 @@ class FailoverSuite : public Dispatcher { seastar::future<> keepalive_peer() { logger().info("[Test] keepalive_peer()"); + assert(seastar::this_shard_id() == primary_sid); ceph_assert(tracked_conn); ceph_assert(!tracked_conn->is_protocol_closed()); return tracked_conn->send_keepalive(); @@ -1233,6 +1287,7 @@ class FailoverSuite : public Dispatcher { seastar::future<> try_send_peer() { logger().info("[Test] try_send_peer()"); + assert(seastar::this_shard_id() == primary_sid); ceph_assert(tracked_conn); ceph_assert(!tracked_conn->is_protocol_closed()); return send_op(false); @@ -1240,21 +1295,28 @@ class FailoverSuite : public Dispatcher { seastar::future<> markdown() { logger().info("[Test] markdown() in 100ms ..."); + assert(seastar::this_shard_id() == primary_sid); ceph_assert(tracked_conn); // sleep to propagate potential remaining acks return seastar::sleep(100ms ).then([this] { - tracked_conn->mark_down(); + return seastar::smp::submit_to( + tracked_conn->get_shard_id(), [tracked_conn=tracked_conn] { + assert(tracked_conn->get_shard_id() == seastar::this_shard_id()); + tracked_conn->mark_down(); + }); }); } seastar::future<> wait_blocked() { logger().info("[Test] wait_blocked() ..."); + assert(seastar::this_shard_id() == primary_sid); return interceptor.blocker.wait_blocked(); } void unblock() { logger().info("[Test] unblock()"); + assert(seastar::this_shard_id() == primary_sid); return interceptor.blocker.unblock(); } @@ -1277,6 +1339,7 @@ class FailoverSuite : public Dispatcher { } bool is_standby() { + assert(seastar::this_shard_id() == primary_sid); ceph_assert(tracked_conn); return tracked_conn->is_protocol_standby(); } @@ -1417,26 +1480,34 @@ class FailoverTest : public Dispatcher { logger().info("\n\n[{}]", name); ceph_assert(!test_suite); SocketPolicy test_policy_ = to_socket_policy(test_policy); - return FailoverSuite::create( - test_addr, test_policy_, test_peer_addr, interceptor - ).then([this, peer_policy, f = std::move(f)] (auto suite) mutable { - ceph_assert(suite->get_addr() == test_addr); - test_suite.swap(suite); - return start_peer(peer_policy).then([this, f = std::move(f)] { - return f(*test_suite); - }).then([this] { - test_suite->post_check(); - logger().info("\n[SUCCESS]"); - }).handle_exception([this] (auto eptr) { - logger().info("\n[FAIL: {}]", eptr); - test_suite->dump_results(); - throw; - }).then([this] { - return stop_peer(); - }).then([this] { - return test_suite->shutdown().then([this] { - test_suite.reset(); + return ShardedGates::create( + ).then([this, test_policy_, peer_policy, interceptor, + f=std::move(f)](auto *gates) mutable { + return FailoverSuite::create( + test_addr, test_policy_, test_peer_addr, interceptor, *gates + ).then([this, peer_policy, f = std::move(f)](auto suite) mutable { + ceph_assert(suite->get_addr() == test_addr); + test_suite.swap(suite); + return start_peer(peer_policy + ).then([this, f = std::move(f)] { + return f(*test_suite); + }).then([this] { + test_suite->post_check(); + logger().info("\n[SUCCESS]"); + }).handle_exception([this](auto eptr) { + logger().info("\n[FAIL: {}]", eptr); + test_suite->dump_results(); + throw; + }).then([this] { + return stop_peer(); + }).then([this] { + return test_suite->shutdown( + ).then([this] { + test_suite.reset(); + }); }); + }).then([gates] { + return gates->close(); }); }); } -- 2.39.5