From ec6acfc352b1bd57b7e0d87114bbf51f2c0190b9 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Fri, 7 Jul 2023 10:13:18 +0800 Subject: [PATCH] test/crimson/test_messenger: implement multi-shard test_echo::test_state::Server Also introduces ShardedGates. Signed-off-by: Yingxin Cheng --- src/test/crimson/test_messenger.cc | 157 +++++++++++++++++++++-------- 1 file changed, 116 insertions(+), 41 deletions(-) diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index bba62cc7974d4..ab98aa586bc4f 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -52,22 +53,91 @@ static entity_addr_t get_server_addr() { return saddr; } +template +seastar::future create_sharded(Args... args) { + // we should only construct/stop shards on #0 + return seastar::smp::submit_to(0, [=] { + auto sharded_obj = seastar::make_lw_shared>(); + return sharded_obj->start(args... + ).then([sharded_obj] { + seastar::engine().at_exit([sharded_obj] { + return sharded_obj->stop().then([sharded_obj] {}); + }); + return sharded_obj.get(); + }); + }).then([](seastar::sharded *ptr_shard) { + return &ptr_shard->local(); + }); +} + +class ShardedGates + : public seastar::peering_sharded_service { +public: + ShardedGates() = default; + ~ShardedGates() { + assert(gate.is_closed()); + } + + template + void dispatch_in_background(const char *what, Func &&f) { + std::ignore = seastar::with_gate( + container().local().gate, std::forward(f) + ).handle_exception([what](std::exception_ptr eptr) { + try { + std::rethrow_exception(eptr); + } catch (std::exception &e) { + logger().error("ShardedGates::dispatch_in_background: " + "{} got exxception {}", what, e.what()); + } + }); + } + + seastar::future<> close() { + return container().invoke_on_all([](auto &local) { + return local.gate.close(); + }); + } + + static seastar::future create() { + return create_sharded(); + } + + // seastar::future<> stop() is intentially not implemented + +private: + seastar::gate gate; +}; + static seastar::future<> test_echo(unsigned rounds, double keepalive_ratio) { struct test_state { struct Server final : public crimson::net::Dispatcher { + ShardedGates &gates; crimson::net::MessengerRef msgr; crimson::auth::DummyAuthClientServer dummy_auth; + Server(ShardedGates &gates) : gates{gates} {} + + void ms_handle_accept( + crimson::net::ConnectionRef conn, + seastar::shard_id new_shard, + bool is_replace) override { + logger().info("server accepted {}", *conn); + ceph_assert(new_shard == seastar::this_shard_id()); + ceph_assert(!is_replace); + } + std::optional> ms_dispatch( crimson::net::ConnectionRef c, MessageRef m) override { if (verbose) { logger().info("server got {}", *m); } // reply with a pong - std::ignore = c->send(crimson::make_message()); + gates.dispatch_in_background("echo_send_pong", [c] { + return c->send(crimson::make_message()); + }); return {seastar::now()}; } @@ -76,7 +146,7 @@ static seastar::future<> test_echo(unsigned rounds, const uint64_t nonce, const entity_addr_t& addr) { msgr = crimson::net::Messenger::create( - name, lname, nonce, true); + name, lname, nonce, false); msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0)); msgr->set_auth_client(&dummy_auth); msgr->set_auth_server(&dummy_auth); @@ -229,46 +299,51 @@ static seastar::future<> test_echo(unsigned rounds, logger().info("test_echo(rounds={}, keepalive_ratio={}):", rounds, keepalive_ratio); - auto server1 = seastar::make_shared(); - auto server2 = seastar::make_shared(); - auto client1 = seastar::make_shared(rounds, keepalive_ratio); - auto client2 = seastar::make_shared(rounds, keepalive_ratio); - // start servers and clients - auto addr1 = get_server_addr(); - auto addr2 = get_server_addr(); - addr1.set_type(entity_addr_t::TYPE_MSGR2); - addr2.set_type(entity_addr_t::TYPE_MSGR2); - return seastar::when_all_succeed( - server1->init(entity_name_t::OSD(0), "server1", 1, addr1), - server2->init(entity_name_t::OSD(1), "server2", 2, addr2), - client1->init(entity_name_t::OSD(2), "client1", 3), - client2->init(entity_name_t::OSD(3), "client2", 4) - // dispatch pingpoing - ).then_unpack([client1, client2, server1, server2] { + return ShardedGates::create( + ).then([rounds, keepalive_ratio](auto *gates) { + auto server1 = seastar::make_shared(*gates); + auto server2 = seastar::make_shared(*gates); + auto client1 = seastar::make_shared(rounds, keepalive_ratio); + auto client2 = seastar::make_shared(rounds, keepalive_ratio); + // start servers and clients + auto addr1 = get_server_addr(); + auto addr2 = get_server_addr(); + addr1.set_type(entity_addr_t::TYPE_MSGR2); + addr2.set_type(entity_addr_t::TYPE_MSGR2); return seastar::when_all_succeed( - // test connecting in parallel, accepting in parallel - client1->dispatch_pingpong(server2->msgr->get_myaddr()), - client2->dispatch_pingpong(server1->msgr->get_myaddr())); - // shutdown - }).then_unpack([] { - return seastar::now(); - }).then([client1] { - logger().info("client1 shutdown..."); - return client1->shutdown(); - }).then([client2] { - logger().info("client2 shutdown..."); - return client2->shutdown(); - }).then([server1] { - logger().info("server1 shutdown..."); - return server1->shutdown(); - }).then([server2] { - logger().info("server2 shutdown..."); - return server2->shutdown(); - }).then([] { - logger().info("test_echo() done!\n"); - }).handle_exception([server1, server2, client1, client2] (auto eptr) { - logger().error("test_echo() failed: got exception {}", eptr); - throw; + server1->init(entity_name_t::OSD(0), "server1", 1, addr1), + server2->init(entity_name_t::OSD(1), "server2", 2, addr2), + client1->init(entity_name_t::OSD(2), "client1", 3), + client2->init(entity_name_t::OSD(3), "client2", 4) + // dispatch pingpoing + ).then_unpack([client1, client2, server1, server2] { + return seastar::when_all_succeed( + // test connecting in parallel, accepting in parallel + client1->dispatch_pingpong(server2->msgr->get_myaddr()), + client2->dispatch_pingpong(server1->msgr->get_myaddr())); + // shutdown + }).then_unpack([] { + return seastar::now(); + }).then([client1] { + logger().info("client1 shutdown..."); + return client1->shutdown(); + }).then([client2] { + logger().info("client2 shutdown..."); + return client2->shutdown(); + }).then([server1] { + logger().info("server1 shutdown..."); + return server1->shutdown(); + }).then([server2] { + logger().info("server2 shutdown..."); + return server2->shutdown(); + }).then([] { + logger().info("test_echo() done!\n"); + }).handle_exception([server1, server2, client1, client2] (auto eptr) { + logger().error("test_echo() failed: got exception {}", eptr); + throw; + }).finally([gates] { + return gates->close(); + }); }); } -- 2.39.5