]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: change the static IS_FIXED_CPU to runtime
authorYingxin Cheng <yingxin.cheng@intel.com>
Wed, 26 Apr 2023 08:13:48 +0000 (16:13 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Sun, 25 Jun 2023 03:57:19 +0000 (11:57 +0800)
Otherwise the messenger implementation needs to be templated.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/Socket.cc
src/crimson/net/Socket.h
src/crimson/net/SocketMessenger.cc
src/crimson/net/SocketMessenger.h
src/test/crimson/test_socket.cc

index 342053a3615dbb5dc5ee388c7a55f4bea677b5e9..1b8e863f7209acb78090602cf84e1c6962b8751f 100644 (file)
@@ -340,27 +340,23 @@ Socket::try_trap_post(bp_action_t& trap) {
 }
 #endif
 
-#define SERVER_SOCKET ShardedServerSocket<IS_FIXED_CPU>
-
-template <bool IS_FIXED_CPU>
-SERVER_SOCKET::ShardedServerSocket(
+ShardedServerSocket::ShardedServerSocket(
     seastar::shard_id sid,
+    bool is_fixed_cpu,
     construct_tag)
-  : primary_sid{sid}
+  : primary_sid{sid}, is_fixed_cpu{is_fixed_cpu}
 {
 }
 
-template <bool IS_FIXED_CPU>
-SERVER_SOCKET::~ShardedServerSocket()
+ShardedServerSocket::~ShardedServerSocket()
 {
   assert(!listener);
   // detect whether user have called destroy() properly
   ceph_assert_always(!service);
 }
 
-template <bool IS_FIXED_CPU>
 listen_ertr::future<>
-SERVER_SOCKET::listen(entity_addr_t addr)
+ShardedServerSocket::listen(entity_addr_t addr)
 {
   ceph_assert_always(seastar::this_shard_id() == primary_sid);
   logger().debug("ShardedServerSocket({})::listen()...", addr);
@@ -369,7 +365,7 @@ SERVER_SOCKET::listen(entity_addr_t addr)
     seastar::socket_address s_addr(addr.in4_addr());
     seastar::listen_options lo;
     lo.reuse_address = true;
-    if constexpr (IS_FIXED_CPU) {
+    if (ss.is_fixed_cpu) {
       lo.set_fixed_cpu(ss.primary_sid);
     }
     ss.listener = seastar::listen(s_addr, lo);
@@ -391,9 +387,8 @@ SERVER_SOCKET::listen(entity_addr_t addr)
   });
 }
 
-template <bool IS_FIXED_CPU>
 seastar::future<>
-SERVER_SOCKET::accept(accept_func_t &&_fn_accept)
+ShardedServerSocket::accept(accept_func_t &&_fn_accept)
 {
   ceph_assert_always(seastar::this_shard_id() == primary_sid);
   logger().debug("ShardedServerSocket({})::accept()...", listen_addr);
@@ -407,10 +402,12 @@ SERVER_SOCKET::accept(accept_func_t &&_fn_accept)
       return seastar::keep_doing([&ss] {
         return ss.listener->accept(
         ).then([&ss](seastar::accept_result accept_result) {
-          if constexpr (IS_FIXED_CPU) {
+#ifndef NDEBUG
+          if (ss.is_fixed_cpu) {
             // see seastar::listen_options::set_fixed_cpu()
             ceph_assert_always(seastar::this_shard_id() == ss.primary_sid);
           }
+#endif
           auto [socket, paddr] = std::move(accept_result);
           entity_addr_t peer_addr;
           peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
@@ -419,8 +416,8 @@ SERVER_SOCKET::accept(accept_func_t &&_fn_accept)
               std::move(socket), Socket::side_t::acceptor,
               peer_addr.get_port(), Socket::construct_tag{});
           logger().debug("ShardedServerSocket({})::accept(): "
-                         "accepted peer {}, socket {}",
-                         ss.listen_addr, peer_addr, fmt::ptr(_socket));
+                         "accepted peer {}, socket {}, is_fixed = {}",
+                         ss.listen_addr, peer_addr, fmt::ptr(_socket), ss.is_fixed_cpu);
           std::ignore = seastar::with_gate(
               ss.shutdown_gate,
               [socket=std::move(_socket), peer_addr, &ss]() mutable {
@@ -462,9 +459,8 @@ SERVER_SOCKET::accept(accept_func_t &&_fn_accept)
   });
 }
 
-template <bool IS_FIXED_CPU>
 seastar::future<>
-SERVER_SOCKET::shutdown_destroy()
+ShardedServerSocket::shutdown_destroy()
 {
   assert(seastar::this_shard_id() == primary_sid);
   logger().debug("ShardedServerSocket({})::shutdown_destroy()...", listen_addr);
@@ -490,15 +486,14 @@ SERVER_SOCKET::shutdown_destroy()
   });
 }
 
-template <bool IS_FIXED_CPU>
-seastar::future<SERVER_SOCKET*>
-SERVER_SOCKET::create()
+seastar::future<ShardedServerSocket*>
+ShardedServerSocket::create(bool is_fixed_cpu)
 {
   auto primary_sid = seastar::this_shard_id();
   // start the sharded service: we should only construct/stop shards on #0
-  return seastar::smp::submit_to(0, [primary_sid] {
+  return seastar::smp::submit_to(0, [primary_sid, is_fixed_cpu] {
     auto service = std::make_unique<sharded_service_t>();
-    return service->start(primary_sid, construct_tag{}
+    return service->start(primary_sid, is_fixed_cpu, construct_tag{}
     ).then([service = std::move(service)]() mutable {
       auto p_shard = service.get();
       p_shard->local().service = std::move(service);
@@ -509,7 +504,4 @@ SERVER_SOCKET::create()
   });
 }
 
-template class ShardedServerSocket<true>;
-template class ShardedServerSocket<false>;
-
 } // namespace crimson::net
index 58a4484aa87c1252be4507f73610e0577f38431c..6241c9fbad265bcdf3f4245c6fd5d26d97360a14 100644 (file)
@@ -144,20 +144,22 @@ using listen_ertr = crimson::errorator<
   crimson::ct_error::address_not_available // https://techoverflow.net/2021/08/06/how-i-fixed-python-oserror-errno-99-cannot-assign-requested-address/
   >;
 
-template <bool IS_FIXED_CPU>
 class ShardedServerSocket
-    : public seastar::peering_sharded_service<ShardedServerSocket<IS_FIXED_CPU> > {
+    : public seastar::peering_sharded_service<ShardedServerSocket> {
   struct construct_tag {};
 
 public:
-  ShardedServerSocket(seastar::shard_id sid, construct_tag);
+  ShardedServerSocket(seastar::shard_id sid, bool is_fixed_cpu, construct_tag);
 
   ~ShardedServerSocket();
 
   ShardedServerSocket(ShardedServerSocket&&) = delete;
   ShardedServerSocket(const ShardedServerSocket&) = delete;
+  ShardedServerSocket& operator=(ShardedServerSocket&&) = delete;
   ShardedServerSocket& operator=(const ShardedServerSocket&) = delete;
 
+  bool is_fixed() const { return is_fixed_cpu; }
+
   listen_ertr::future<> listen(entity_addr_t addr);
 
   using accept_func_t =
@@ -166,11 +168,12 @@ public:
 
   seastar::future<> shutdown_destroy();
 
-  static seastar::future<ShardedServerSocket*> create();
+  static seastar::future<ShardedServerSocket*> create(bool is_fixed_cpu);
 
 private:
-  // the fixed CPU if IS_FIXED_CPU is true
+  // the fixed CPU if is_fixed_cpu is true
   const seastar::shard_id primary_sid;
+  const bool is_fixed_cpu;
   entity_addr_t listen_addr;
   std::optional<seastar::server_socket> listener;
   seastar::gate shutdown_gate;
index b3856e6f9a352968df7a5125bf0ceeb67b48b800..97795652388f95b38738a4979550e649b271c104 100644 (file)
@@ -92,7 +92,7 @@ SocketMessenger::do_listen(const entity_addrvec_t& addrs)
   set_myaddrs(addrs);
   return seastar::futurize_invoke([this] {
     if (!listener) {
-      return ShardedServerSocket<true>::create(
+      return ShardedServerSocket::create(true
       ).then([this] (auto _listener) {
         listener = _listener;
       });
@@ -218,6 +218,7 @@ seastar::future<> SocketMessenger::start(
     ceph_assert(get_myaddr().get_port() > 0);
 
     return listener->accept([this](SocketRef socket, entity_addr_t peer_addr) {
+      assert(listener->is_fixed());
       assert(seastar::this_shard_id() == sid);
       assert(get_myaddr().is_msgr2());
       SocketConnectionRef conn =
index 940894ce1b3d131ff80de60ad7b27199f54ed7de..6e749abac76ac52dd6433ae9e210cef5ea0a66e6 100644 (file)
@@ -29,7 +29,6 @@
 
 namespace crimson::net {
 
-template <bool IS_FIXED_CPU>
 class ShardedServerSocket;
 
 class SocketMessenger final : public Messenger {
@@ -169,7 +168,7 @@ private:
   crimson::auth::AuthClient* auth_client = nullptr;
   crimson::auth::AuthServer* auth_server = nullptr;
 
-  ShardedServerSocket<true> *listener = nullptr;
+  ShardedServerSocket *listener = nullptr;
   ChainedDispatchers dispatchers;
   std::map<entity_addr_t, SocketConnectionRef> connections;
   std::set<SocketConnectionRef> accepting_conns;
index 4ca75c6e961911be6d21aae2e55fa6c36286d696..8df0f1be747ea1770209f879654497ab1ab844da 100644 (file)
@@ -70,14 +70,14 @@ future<> test_refused() {
   });
 }
 
-template <bool IS_FIXED_CPU>
-future<> test_bind_same() {
+future<> test_bind_same(bool is_fixed_cpu) {
   logger().info("test_bind_same()...");
-  return ShardedServerSocket<IS_FIXED_CPU>::create().then([](auto pss1) {
+  return ShardedServerSocket::create(is_fixed_cpu
+  ).then([is_fixed_cpu](auto pss1) {
     auto saddr = get_server_addr();
-    return pss1->listen(saddr).safe_then([saddr] {
+    return pss1->listen(saddr).safe_then([saddr, is_fixed_cpu] {
       // try to bind the same address
-      return ShardedServerSocket<IS_FIXED_CPU>::create(
+      return ShardedServerSocket::create(is_fixed_cpu
       ).then([saddr](auto pss2) {
         return pss2->listen(saddr).safe_then([] {
           logger().error("test_bind_same() should raise address_in_use");
@@ -112,10 +112,9 @@ future<> test_bind_same() {
   });
 }
 
-template <bool IS_FIXED_CPU>
-future<> test_accept() {
+future<> test_accept(bool is_fixed_cpu) {
   logger().info("test_accept()");
-  return ShardedServerSocket<IS_FIXED_CPU>::create(
+  return ShardedServerSocket::create(is_fixed_cpu
   ).then([](auto pss) {
     auto saddr = get_server_addr();
     return pss->listen(saddr
@@ -157,27 +156,29 @@ future<> test_accept() {
   });
 }
 
-template <bool IS_FIXED_CPU>
 class SocketFactory {
   static constexpr seastar::shard_id CLIENT_CPU = 0u;
   SocketRef client_socket;
   seastar::promise<> server_connected;
 
   static constexpr seastar::shard_id SERVER_CPU = 1u;
-  ShardedServerSocket<IS_FIXED_CPU> *pss = nullptr;
+  ShardedServerSocket *pss = nullptr;
 
   seastar::shard_id server_socket_CPU;
   SocketRef server_socket;
 
  public:
   template <typename FuncC, typename FuncS>
-  static future<> dispatch_sockets(FuncC&& cb_client, FuncS&& cb_server) {
+  static future<> dispatch_sockets(
+      bool is_fixed_cpu,
+      FuncC&& cb_client,
+      FuncS&& cb_server) {
     ceph_assert_always(seastar::this_shard_id() == CLIENT_CPU);
     auto owner = std::make_unique<SocketFactory>();
     auto psf = owner.get();
     auto saddr = get_server_addr();
-    return seastar::smp::submit_to(SERVER_CPU, [psf, saddr] {
-      return ShardedServerSocket<IS_FIXED_CPU>::create(
+    return seastar::smp::submit_to(SERVER_CPU, [psf, saddr, is_fixed_cpu] {
+      return ShardedServerSocket::create(is_fixed_cpu
       ).then([psf, saddr](auto pss) {
         psf->pss = pss;
         return pss->listen(saddr
@@ -201,7 +202,7 @@ class SocketFactory {
             logger().info("dispatch_sockets(): accepted at shard {}",
                           seastar::this_shard_id());
             psf->server_socket_CPU = seastar::this_shard_id();
-            if constexpr (IS_FIXED_CPU) {
+            if (psf->pss->is_fixed()) {
               ceph_assert_always(SERVER_CPU == seastar::this_shard_id());
             }
             psf->server_socket = std::move(socket);
@@ -426,10 +427,10 @@ class Connection {
   }
 };
 
-template <bool IS_FIXED_CPU>
-future<> test_read_write() {
+future<> test_read_write(bool is_fixed_cpu) {
   logger().info("test_read_write()...");
-  return SocketFactory<IS_FIXED_CPU>::dispatch_sockets(
+  return SocketFactory::dispatch_sockets(
+    is_fixed_cpu,
     [](auto cs) { return Connection::dispatch_rw_bounded(cs, 128); },
     [](auto ss) { return Connection::dispatch_rw_bounded(ss, 128); }
   ).then([] {
@@ -440,10 +441,10 @@ future<> test_read_write() {
   });
 }
 
-template <bool IS_FIXED_CPU>
-future<> test_unexpected_down() {
+future<> test_unexpected_down(bool is_fixed_cpu) {
   logger().info("test_unexpected_down()...");
-  return SocketFactory<IS_FIXED_CPU>::dispatch_sockets(
+  return SocketFactory::dispatch_sockets(
+    is_fixed_cpu,
     [](auto cs) {
       return Connection::dispatch_rw_bounded(cs, 128, true
         ).handle_exception_type([](const std::system_error& e) {
@@ -460,10 +461,10 @@ future<> test_unexpected_down() {
   });
 }
 
-template <bool IS_FIXED_CPU>
-future<> test_shutdown_propagated() {
+future<> test_shutdown_propagated(bool is_fixed_cpu) {
   logger().info("test_shutdown_propagated()...");
-  return SocketFactory<IS_FIXED_CPU>::dispatch_sockets(
+  return SocketFactory::dispatch_sockets(
+    is_fixed_cpu,
     [](auto cs) {
       logger().debug("test_shutdown_propagated() shutdown client socket");
       cs->shutdown();
@@ -478,10 +479,10 @@ future<> test_shutdown_propagated() {
   });
 }
 
-template <bool IS_FIXED_CPU>
-future<> test_preemptive_down() {
+future<> test_preemptive_down(bool is_fixed_cpu) {
   logger().info("test_preemptive_down()...");
-  return SocketFactory<IS_FIXED_CPU>::dispatch_sockets(
+  return SocketFactory::dispatch_sockets(
+    is_fixed_cpu,
     [](auto cs) { return Connection::dispatch_rw_unbounded(cs, true); },
     [](auto ss) { return Connection::dispatch_rw_unbounded(ss); }
   ).then([] {
@@ -492,19 +493,18 @@ future<> test_preemptive_down() {
   });
 }
 
-template <bool IS_FIXED_CPU>
-future<> do_test_with_type() {
-  return test_bind_same<IS_FIXED_CPU>(
-  ).then([] {
-    return test_accept<IS_FIXED_CPU>();
-  }).then([] {
-    return test_read_write<IS_FIXED_CPU>();
-  }).then([] {
-    return test_unexpected_down<IS_FIXED_CPU>();
-  }).then([] {
-    return test_shutdown_propagated<IS_FIXED_CPU>();
-  }).then([] {
-    return test_preemptive_down<IS_FIXED_CPU>();
+future<> do_test_with_type(bool is_fixed_cpu) {
+  return test_bind_same(is_fixed_cpu
+  ).then([is_fixed_cpu] {
+    return test_accept(is_fixed_cpu);
+  }).then([is_fixed_cpu] {
+    return test_read_write(is_fixed_cpu);
+  }).then([is_fixed_cpu] {
+    return test_unexpected_down(is_fixed_cpu);
+  }).then([is_fixed_cpu] {
+    return test_shutdown_propagated(is_fixed_cpu);
+  }).then([is_fixed_cpu] {
+    return test_preemptive_down(is_fixed_cpu);
   });
 }
 
@@ -527,9 +527,9 @@ seastar::future<int> do_test(seastar::app_template& app)
   }).then([] {
     return test_refused();
   }).then([] {
-    return do_test_with_type<true>();
+    return do_test_with_type(true);
   }).then([] {
-    return do_test_with_type<false>();
+    return do_test_with_type(false);
   }).then([] {
     logger().info("All tests succeeded");
     // Seastar has bugs to have events undispatched during shutdown,