From 96b3a622c44d61b8c86a82c3f8fdaf75f3b070e6 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Mon, 10 Jul 2023 15:18:22 +0800 Subject: [PATCH] test/crimson/test_messenger: implement multi-shard test_echo::test_state::Client Signed-off-by: Yingxin Cheng (cherry picked from commit 9b5182779d76043c428e664f201acca5986aab7c) --- src/test/crimson/test_messenger.cc | 214 ++++++++++++++++++----------- 1 file changed, 130 insertions(+), 84 deletions(-) diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index 6351eedef0a20..1f2d4628dccfc 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -166,8 +166,73 @@ static seastar::future<> test_echo(unsigned rounds, } }; - struct Client final - : public crimson::net::Dispatcher { + class Client final + : public crimson::net::Dispatcher, + public seastar::peering_sharded_service { + public: + Client(seastar::shard_id primary_sid, + unsigned rounds, + double keepalive_ratio, + ShardedGates *gates) + : primary_sid{primary_sid}, + keepalive_dist(std::bernoulli_distribution{keepalive_ratio}), + rounds(rounds), + gates{*gates} {} + + seastar::future<> init(const entity_name_t& name, + const std::string& lname, + const uint64_t nonce) { + assert(seastar::this_shard_id() == primary_sid); + msgr = crimson::net::Messenger::create( + name, lname, nonce, false); + msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0)); + msgr->set_auth_client(&dummy_auth); + msgr->set_auth_server(&dummy_auth); + return msgr->start({this}); + } + + seastar::future<> shutdown() { + assert(seastar::this_shard_id() == primary_sid); + ceph_assert(msgr); + msgr->stop(); + return msgr->shutdown(); + } + + seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr) { + assert(seastar::this_shard_id() == primary_sid); + mono_time start_time = mono_clock::now(); + auto conn = msgr->connect(peer_addr, entity_name_t::TYPE_OSD); + return seastar::futurize_invoke([this, conn] { + return do_dispatch_pingpong(conn); + }).then([] { + // 500ms should be enough to establish the connection + return seastar::sleep(500ms); + }).then([this, conn, start_time] { + return container().invoke_on( + conn->get_shard_id(), + [pconn=&*conn, start_time](auto &local) { + assert(pconn->is_connected()); + auto session = local.find_session(pconn); + std::chrono::duration dur_handshake = session->connected_time - start_time; + std::chrono::duration dur_pingpong = session->finish_time - session->connected_time; + logger().info("{}: handshake {}, pingpong {}", + *pconn, dur_handshake.count(), dur_pingpong.count()); + }).then([conn] {}); + }); + } + + static seastar::future create( + unsigned rounds, + double keepalive_ratio, + ShardedGates *gates) { + return create_sharded( + seastar::this_shard_id(), + rounds, + keepalive_ratio, + gates); + } + + private: struct PingSession : public seastar::enable_shared_from_this { unsigned count = 0u; mono_time connected_time; @@ -175,38 +240,22 @@ static seastar::future<> test_echo(unsigned rounds, }; using PingSessionRef = seastar::shared_ptr; - unsigned rounds; - std::bernoulli_distribution keepalive_dist; - crimson::net::MessengerRef msgr; - std::map> pending_conns; - std::map sessions; - crimson::auth::DummyAuthClientServer dummy_auth; - - Client(unsigned rounds, double keepalive_ratio) - : rounds(rounds), - keepalive_dist(std::bernoulli_distribution{keepalive_ratio}) {} - - PingSessionRef find_session(crimson::net::ConnectionRef c) { - auto found = sessions.find(c); - if (found == sessions.end()) { - ceph_assert(false); - } - return found->second; - } - void ms_handle_connect( crimson::net::ConnectionRef conn, seastar::shard_id prv_shard) override { + auto &local = container().local(); assert(prv_shard == seastar::this_shard_id()); auto session = seastar::make_shared(); - auto [i, added] = sessions.emplace(conn, session); + auto [i, added] = local.sessions.emplace(&*conn, session); std::ignore = i; ceph_assert(added); session->connected_time = mono_clock::now(); } + std::optional> ms_dispatch( crimson::net::ConnectionRef c, MessageRef m) override { - auto session = find_session(c); + auto &local = container().local(); + auto session = local.find_session(&*c); ++(session->count); if (verbose) { logger().info("client ms_dispatch {}", session->count); @@ -215,47 +264,27 @@ static seastar::future<> test_echo(unsigned rounds, if (session->count == rounds) { logger().info("{}: finished receiving {} pongs", *c, session->count); session->finish_time = mono_clock::now(); - auto found = pending_conns.find(c); - ceph_assert(found != pending_conns.end()); - found->second.set_value(); + gates.dispatch_in_background("echo_notify_done", [c, this] { + return container().invoke_on(primary_sid, [pconn=&*c](auto &local) { + auto found = local.pending_conns.find(pconn); + ceph_assert(found != local.pending_conns.end()); + found->second.set_value(); + }).then([c] {}); + }); } return {seastar::now()}; } - seastar::future<> init(const entity_name_t& name, - const std::string& lname, - const uint64_t nonce) { - msgr = crimson::net::Messenger::create( - name, lname, nonce, true); - msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0)); - msgr->set_auth_client(&dummy_auth); - msgr->set_auth_server(&dummy_auth); - return msgr->start({this}); - } - - seastar::future<> shutdown() { - ceph_assert(msgr); - msgr->stop(); - return msgr->shutdown(); - } - - seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr) { - mono_time start_time = mono_clock::now(); - auto conn = msgr->connect(peer_addr, entity_name_t::TYPE_OSD); - return seastar::futurize_invoke([this, conn] { - return do_dispatch_pingpong(conn); - }).then([this, conn, start_time] { - auto session = find_session(conn); - std::chrono::duration dur_handshake = session->connected_time - start_time; - std::chrono::duration dur_pingpong = session->finish_time - session->connected_time; - logger().info("{}: handshake {}, pingpong {}", - *conn, dur_handshake.count(), dur_pingpong.count()); - }); + PingSessionRef find_session(crimson::net::Connection *c) { + auto found = sessions.find(c); + if (found == sessions.end()) { + ceph_assert(false); + } + return found->second; } - private: seastar::future<> do_dispatch_pingpong(crimson::net::ConnectionRef conn) { - auto [i, added] = pending_conns.emplace(conn, seastar::promise<>()); + auto [i, added] = pending_conns.emplace(&*conn, seastar::promise<>()); std::ignore = i; ceph_assert(added); return seastar::do_with(0u, 0u, @@ -271,29 +300,43 @@ static seastar::future<> test_echo(unsigned rounds, }, [this, conn, &count_ping, &count_keepalive] { return seastar::repeat([this, conn, &count_ping, &count_keepalive] { - if (keepalive_dist(rng)) { - return conn->send_keepalive() - .then([&count_keepalive] { - count_keepalive += 1; - return seastar::make_ready_future( - seastar::stop_iteration::no); - }); - } else { - return conn->send(crimson::make_message()) - .then([&count_ping] { - count_ping += 1; - return seastar::make_ready_future( - seastar::stop_iteration::yes); - }); - } - }); + if (keepalive_dist(rng)) { + return conn->send_keepalive( + ).then([&count_keepalive] { + count_keepalive += 1; + return seastar::make_ready_future( + seastar::stop_iteration::no); + }); + } else { + return conn->send(crimson::make_message() + ).then([&count_ping] { + count_ping += 1; + return seastar::make_ready_future( + seastar::stop_iteration::yes); + }); + } + }); }).then([this, conn] { - auto found = pending_conns.find(conn); + auto found = pending_conns.find(&*conn); + assert(found != pending_conns.end()); return found->second.get_future(); } ); }); } + + private: + // primary shard only + const seastar::shard_id primary_sid; + std::bernoulli_distribution keepalive_dist; + crimson::net::MessengerRef msgr; + std::map> pending_conns; + crimson::auth::DummyAuthClientServer dummy_auth; + + // per shard + const unsigned rounds; + std::map sessions; + ShardedGates &gates; }; }; @@ -301,10 +344,13 @@ static seastar::future<> test_echo(unsigned rounds, rounds, keepalive_ratio); return ShardedGates::create( ).then([rounds, keepalive_ratio](auto *gates) { + return seastar::when_all_succeed( + test_state::Client::create(rounds, keepalive_ratio, gates), + test_state::Client::create(rounds, keepalive_ratio, gates), + seastar::make_ready_future(gates)); + }).then_unpack([](auto *client1, auto *client2, auto *gates) { auto server1 = seastar::make_shared(*gates); auto server2 = seastar::make_shared(*gates); - auto client1 = seastar::make_shared(rounds, keepalive_ratio); - auto client2 = seastar::make_shared(rounds, keepalive_ratio); // start servers and clients auto addr1 = get_server_addr(); auto addr2 = get_server_addr(); @@ -318,13 +364,13 @@ static seastar::future<> test_echo(unsigned rounds, // dispatch pingpoing ).then_unpack([client1, client2, server1, server2] { return seastar::when_all_succeed( - // test connecting in parallel, accepting in parallel - client1->dispatch_pingpong(server2->msgr->get_myaddr()), - client2->dispatch_pingpong(server1->msgr->get_myaddr())); + // test connecting in parallel, accepting in parallel + client1->dispatch_pingpong(server1->msgr->get_myaddr()), + client1->dispatch_pingpong(server2->msgr->get_myaddr()), + client2->dispatch_pingpong(server1->msgr->get_myaddr()), + client2->dispatch_pingpong(server2->msgr->get_myaddr())); // shutdown - }).then_unpack([] { - return seastar::now(); - }).then([client1] { + }).then_unpack([client1] { logger().info("client1 shutdown..."); return client1->shutdown(); }).then([client2] { @@ -338,10 +384,10 @@ static seastar::future<> test_echo(unsigned rounds, return server2->shutdown(); }).then([] { logger().info("test_echo() done!\n"); - }).handle_exception([server1, server2, client1, client2] (auto eptr) { + }).handle_exception([](auto eptr) { logger().error("test_echo() failed: got exception {}", eptr); throw; - }).finally([gates] { + }).finally([gates, server1, server2] { return gates->close(); }); }); -- 2.39.5