git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
test/crimson/test_messenger: implement multi-shard test_echo::test_state::Server
authorYingxin Cheng <yingxin.cheng@intel.com>
Fri, 7 Jul 2023 02:13:18 +0000 (10:13 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 9 Aug 2023 05:31:25 +0000 (13:31 +0800)
Also introduces ShardedGates.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/test/crimson/test_messenger.cc

index bba62cc7974d436c5a485c7419635cbfe7f7e3d7..ab98aa586bc4f19eb616a89016bd976ae7a9da31 100644 (file)
@@ -23,6 +23,7 @@
 #include <seastar/core/app-template.hh>
 #include <seastar/core/do_with.hh>
 #include <seastar/core/future-util.hh>
+#include <seastar/core/gate.hh>
 #include <seastar/core/reactor.hh>
 #include <seastar/core/sleep.hh>
 #include <seastar/core/with_timeout.hh>
@@ -52,22 +53,91 @@ static entity_addr_t get_server_addr() {
   return saddr;
 }
 
+template <typename T, typename... Args>
+seastar::future<T*> create_sharded(Args... args) {
+  // we should only construct/stop shards on #0
+  return seastar::smp::submit_to(0, [=] {
+    auto sharded_obj = seastar::make_lw_shared<seastar::sharded<T>>();
+    return sharded_obj->start(args...
+    ).then([sharded_obj] {
+      seastar::engine().at_exit([sharded_obj] {
+        return sharded_obj->stop().then([sharded_obj] {});
+      });
+      return sharded_obj.get();
+    });
+  }).then([](seastar::sharded<T> *ptr_shard) {
+    return &ptr_shard->local();
+  });
+}
+
+class ShardedGates
+  : public seastar::peering_sharded_service<ShardedGates> {
+public:
+  ShardedGates() = default;
+  ~ShardedGates() {
+    assert(gate.is_closed());
+  }
+
+  template <typename Func>
+  void dispatch_in_background(const char *what, Func &&f) {
+    std::ignore = seastar::with_gate(
+      container().local().gate, std::forward<Func>(f)
+    ).handle_exception([what](std::exception_ptr eptr) {
+      try {
+        std::rethrow_exception(eptr);
+      } catch (std::exception &e) {
+        logger().error("ShardedGates::dispatch_in_background: "
+                       "{} got exxception {}", what, e.what());
+      }
+    });
+  }
+
+  seastar::future<> close() {
+    return container().invoke_on_all([](auto &local) {
+      return local.gate.close();
+    });
+  }
+
+  static seastar::future<ShardedGates*> create() {
+    return create_sharded<ShardedGates>();
+  }
+
+  // seastar::future<> stop() is intentionally not implemented
+
+private:
+  seastar::gate gate;
+};
+
 static seastar::future<> test_echo(unsigned rounds,
                                    double keepalive_ratio)
 {
   struct test_state {
     struct Server final
         : public crimson::net::Dispatcher {
+      ShardedGates &gates;
       crimson::net::MessengerRef msgr;
       crimson::auth::DummyAuthClientServer dummy_auth;
 
+      Server(ShardedGates &gates) : gates{gates} {}
+
+      void ms_handle_accept(
+          crimson::net::ConnectionRef conn,
+          seastar::shard_id new_shard,
+          bool is_replace) override {
+        logger().info("server accepted {}", *conn);
+        ceph_assert(new_shard == seastar::this_shard_id());
+        ceph_assert(!is_replace);
+      }
+
       std::optional<seastar::future<>> ms_dispatch(
           crimson::net::ConnectionRef c, MessageRef m) override {
         if (verbose) {
           logger().info("server got {}", *m);
         }
         // reply with a pong
-        std::ignore = c->send(crimson::make_message<MPing>());
+        gates.dispatch_in_background("echo_send_pong", [c] {
+          return c->send(crimson::make_message<MPing>());
+        });
         return {seastar::now()};
       }
 
@@ -76,7 +146,7 @@ static seastar::future<> test_echo(unsigned rounds,
                              const uint64_t nonce,
                              const entity_addr_t& addr) {
         msgr = crimson::net::Messenger::create(
-            name, lname, nonce, true);
+            name, lname, nonce, false);
         msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
         msgr->set_auth_client(&dummy_auth);
         msgr->set_auth_server(&dummy_auth);
@@ -229,46 +299,51 @@ static seastar::future<> test_echo(unsigned rounds,
 
   logger().info("test_echo(rounds={}, keepalive_ratio={}):",
                 rounds, keepalive_ratio);
-  auto server1 = seastar::make_shared<test_state::Server>();
-  auto server2 = seastar::make_shared<test_state::Server>();
-  auto client1 = seastar::make_shared<test_state::Client>(rounds, keepalive_ratio);
-  auto client2 = seastar::make_shared<test_state::Client>(rounds, keepalive_ratio);
-  // start servers and clients
-  auto addr1 = get_server_addr();
-  auto addr2 = get_server_addr();
-  addr1.set_type(entity_addr_t::TYPE_MSGR2);
-  addr2.set_type(entity_addr_t::TYPE_MSGR2);
-  return seastar::when_all_succeed(
-      server1->init(entity_name_t::OSD(0), "server1", 1, addr1),
-      server2->init(entity_name_t::OSD(1), "server2", 2, addr2),
-      client1->init(entity_name_t::OSD(2), "client1", 3),
-      client2->init(entity_name_t::OSD(3), "client2", 4)
-  // dispatch pingpoing
-  ).then_unpack([client1, client2, server1, server2] {
+  return ShardedGates::create(
+  ).then([rounds, keepalive_ratio](auto *gates) {
+    auto server1 = seastar::make_shared<test_state::Server>(*gates);
+    auto server2 = seastar::make_shared<test_state::Server>(*gates);
+    auto client1 = seastar::make_shared<test_state::Client>(rounds, keepalive_ratio);
+    auto client2 = seastar::make_shared<test_state::Client>(rounds, keepalive_ratio);
+    // start servers and clients
+    auto addr1 = get_server_addr();
+    auto addr2 = get_server_addr();
+    addr1.set_type(entity_addr_t::TYPE_MSGR2);
+    addr2.set_type(entity_addr_t::TYPE_MSGR2);
     return seastar::when_all_succeed(
-        // test connecting in parallel, accepting in parallel
-        client1->dispatch_pingpong(server2->msgr->get_myaddr()),
-        client2->dispatch_pingpong(server1->msgr->get_myaddr()));
-  // shutdown
-  }).then_unpack([] {
-    return seastar::now();
-  }).then([client1] {
-    logger().info("client1 shutdown...");
-    return client1->shutdown();
-  }).then([client2] {
-    logger().info("client2 shutdown...");
-    return client2->shutdown();
-  }).then([server1] {
-    logger().info("server1 shutdown...");
-    return server1->shutdown();
-  }).then([server2] {
-    logger().info("server2 shutdown...");
-    return server2->shutdown();
-  }).then([] {
-    logger().info("test_echo() done!\n");
-  }).handle_exception([server1, server2, client1, client2] (auto eptr) {
-    logger().error("test_echo() failed: got exception {}", eptr);
-    throw;
+        server1->init(entity_name_t::OSD(0), "server1", 1, addr1),
+        server2->init(entity_name_t::OSD(1), "server2", 2, addr2),
+        client1->init(entity_name_t::OSD(2), "client1", 3),
+        client2->init(entity_name_t::OSD(3), "client2", 4)
+    // dispatch pingpong
+    ).then_unpack([client1, client2, server1, server2] {
+      return seastar::when_all_succeed(
+          // test connecting in parallel, accepting in parallel
+          client1->dispatch_pingpong(server2->msgr->get_myaddr()),
+          client2->dispatch_pingpong(server1->msgr->get_myaddr()));
+    // shutdown
+    }).then_unpack([] {
+      return seastar::now();
+    }).then([client1] {
+      logger().info("client1 shutdown...");
+      return client1->shutdown();
+    }).then([client2] {
+      logger().info("client2 shutdown...");
+      return client2->shutdown();
+    }).then([server1] {
+      logger().info("server1 shutdown...");
+      return server1->shutdown();
+    }).then([server2] {
+      logger().info("server2 shutdown...");
+      return server2->shutdown();
+    }).then([] {
+      logger().info("test_echo() done!\n");
+    }).handle_exception([server1, server2, client1, client2] (auto eptr) {
+      logger().error("test_echo() failed: got exception {}", eptr);
+      throw;
+    }).finally([gates] {
+      return gates->close();
+    });
   });
 }