]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net/socket: extend the sockets to multiple shards
authorYingxin Cheng <yingxin.cheng@intel.com>
Fri, 10 Mar 2023 02:22:56 +0000 (10:22 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Sun, 25 Jun 2023 03:57:19 +0000 (11:57 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/Socket.cc
src/crimson/net/Socket.h
src/crimson/net/SocketMessenger.cc
src/crimson/net/SocketMessenger.h
src/test/crimson/test_socket.cc

index 842304c68f4bf8407c7be25b89f6fc3c766e9169..a28211911c865b5503c4b10511e1355ec87bb8f1 100644 (file)
@@ -241,7 +241,7 @@ close_and_handle_errors(seastar::output_stream<char>& out)
 seastar::future<>
 Socket::close() {
 #ifndef NDEBUG
-  ceph_assert(!closed);
+  ceph_assert_always(!closed);
   closed = true;
 #endif
   return seastar::when_all_succeed(
@@ -282,14 +282,14 @@ Socket::connect(const entity_addr_t &peer_addr)
 void Socket::set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_) {
   blocker = blocker_;
   if (type == bp_type_t::READ) {
-    ceph_assert(next_trap_read == bp_action_t::CONTINUE);
+    ceph_assert_always(next_trap_read == bp_action_t::CONTINUE);
     next_trap_read = action;
   } else { // type == bp_type_t::WRITE
     if (next_trap_write == bp_action_t::CONTINUE) {
       next_trap_write = action;
     } else if (next_trap_write == bp_action_t::FAULT) {
       // do_sweep_messages() may combine multiple write events into one socket write
-      ceph_assert(action == bp_action_t::FAULT || action == bp_action_t::CONTINUE);
+      ceph_assert_always(action == bp_action_t::FAULT || action == bp_action_t::CONTINUE);
     } else {
       ceph_abort();
     }
@@ -336,67 +336,77 @@ Socket::try_trap_post(bp_action_t& trap) {
 }
 #endif
 
-FixedCPUServerSocket::FixedCPUServerSocket(
-    seastar::shard_id cpu,
+#define SERVER_SOCKET ShardedServerSocket<IS_FIXED_CPU>
+
+template <bool IS_FIXED_CPU>
+SERVER_SOCKET::ShardedServerSocket(
+    seastar::shard_id sid,
     construct_tag)
-  : fixed_cpu{cpu}
+  : primary_sid{sid}
 {
 }
 
-FixedCPUServerSocket::~FixedCPUServerSocket()
+template <bool IS_FIXED_CPU>
+SERVER_SOCKET::~ShardedServerSocket()
 {
   assert(!listener);
   // detect whether user have called destroy() properly
-  ceph_assert(!service);
+  ceph_assert_always(!service);
 }
 
+template <bool IS_FIXED_CPU>
 listen_ertr::future<>
-FixedCPUServerSocket::listen(entity_addr_t addr)
+SERVER_SOCKET::listen(entity_addr_t addr)
 {
-  assert(seastar::this_shard_id() == fixed_cpu);
-  logger().debug("FixedCPUServerSocket({})::listen()...", addr);
-  return container().invoke_on_all([addr](auto& ss) {
+  ceph_assert_always(seastar::this_shard_id() == primary_sid);
+  logger().debug("ShardedServerSocket({})::listen()...", addr);
+  return this->container().invoke_on_all([addr](auto& ss) {
     ss.listen_addr = addr;
     seastar::socket_address s_addr(addr.in4_addr());
     seastar::listen_options lo;
     lo.reuse_address = true;
-    lo.set_fixed_cpu(ss.fixed_cpu);
+    if constexpr (IS_FIXED_CPU) {
+      lo.set_fixed_cpu(ss.primary_sid);
+    }
     ss.listener = seastar::listen(s_addr, lo);
   }).then([] {
     return listen_ertr::now();
   }).handle_exception_type(
     [addr](const std::system_error& e) -> listen_ertr::future<> {
     if (e.code() == std::errc::address_in_use) {
-      logger().debug("FixedCPUServerSocket({})::listen(): address in use", addr);
+      logger().debug("ShardedServerSocket({})::listen(): address in use", addr);
       return crimson::ct_error::address_in_use::make();
     } else if (e.code() == std::errc::address_not_available) {
-      logger().debug("FixedCPUServerSocket({})::listen(): address not available",
+      logger().debug("ShardedServerSocket({})::listen(): address not available",
                      addr);
       return crimson::ct_error::address_not_available::make();
     }
-    logger().error("FixedCPUServerSocket({})::listen(): "
+    logger().error("ShardedServerSocket({})::listen(): "
                    "got unexpeted error {}", addr, e.what());
     ceph_abort();
   });
 }
 
+template <bool IS_FIXED_CPU>
 seastar::future<>
-FixedCPUServerSocket::accept(accept_func_t &&_fn_accept)
+SERVER_SOCKET::accept(accept_func_t &&_fn_accept)
 {
-  assert(seastar::this_shard_id() == fixed_cpu);
-  logger().debug("FixedCPUServerSocket({})::accept()...", listen_addr);
-  return container().invoke_on_all([_fn_accept](auto &ss) {
+  ceph_assert_always(seastar::this_shard_id() == primary_sid);
+  logger().debug("ShardedServerSocket({})::accept()...", listen_addr);
+  return this->container().invoke_on_all([_fn_accept](auto &ss) {
     assert(ss.listener);
     ss.fn_accept = _fn_accept;
     // gate accepting
-    // FixedCPUServerSocket::shutdown() will drain the continuations in the gate
+    // ShardedServerSocket::shutdown() will drain the continuations in the gate
     // so ignore the returned future
     std::ignore = seastar::with_gate(ss.shutdown_gate, [&ss] {
       return seastar::keep_doing([&ss] {
         return ss.listener->accept(
         ).then([&ss](seastar::accept_result accept_result) {
-          // assert seastar::listen_options::set_fixed_cpu() works
-          assert(seastar::this_shard_id() == ss.fixed_cpu);
+          if constexpr (IS_FIXED_CPU) {
+            // see seastar::listen_options::set_fixed_cpu()
+            ceph_assert_always(seastar::this_shard_id() == ss.primary_sid);
+          }
           auto [socket, paddr] = std::move(accept_result);
           entity_addr_t peer_addr;
           peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
@@ -404,7 +414,7 @@ FixedCPUServerSocket::accept(accept_func_t &&_fn_accept)
           SocketRef _socket = std::make_unique<Socket>(
               std::move(socket), Socket::side_t::acceptor,
               peer_addr.get_port(), Socket::construct_tag{});
-          logger().debug("FixedCPUServerSocket({})::accept(): "
+          logger().debug("ShardedServerSocket({})::accept(): "
                          "accepted peer {}, socket {}",
                          ss.listen_addr, peer_addr, fmt::ptr(_socket));
           std::ignore = seastar::with_gate(
@@ -418,7 +428,7 @@ FixedCPUServerSocket::accept(accept_func_t &&_fn_accept)
               } catch (std::exception &e) {
                 e_what = e.what();
               }
-              logger().error("FixedCPUServerSocket({})::accept(): "
+              logger().error("ShardedServerSocket({})::accept(): "
                              "fn_accept(s, {}) got unexpected exception {}",
                              ss.listen_addr, peer_addr, e_what);
               ceph_abort();
@@ -428,7 +438,7 @@ FixedCPUServerSocket::accept(accept_func_t &&_fn_accept)
       }).handle_exception_type([&ss](const std::system_error& e) {
         if (e.code() == std::errc::connection_aborted ||
             e.code() == std::errc::invalid_argument) {
-          logger().debug("FixedCPUServerSocket({})::accept(): stopped ({})",
+          logger().debug("ShardedServerSocket({})::accept(): stopped ({})",
                          ss.listen_addr, e.what());
         } else {
           throw;
@@ -440,7 +450,7 @@ FixedCPUServerSocket::accept(accept_func_t &&_fn_accept)
         } catch (std::exception &e) {
           e_what = e.what();
         }
-        logger().error("FixedCPUServerSocket({})::accept(): "
+        logger().error("ShardedServerSocket({})::accept(): "
                        "got unexpected exception {}", ss.listen_addr, e_what);
         ceph_abort();
       });
@@ -448,41 +458,43 @@ FixedCPUServerSocket::accept(accept_func_t &&_fn_accept)
   });
 }
 
+template <bool IS_FIXED_CPU>
 seastar::future<>
-FixedCPUServerSocket::shutdown_destroy()
+SERVER_SOCKET::shutdown_destroy()
 {
-  assert(seastar::this_shard_id() == fixed_cpu);
-  logger().debug("FixedCPUServerSocket({})::shutdown_destroy()...", listen_addr);
+  assert(seastar::this_shard_id() == primary_sid);
+  logger().debug("ShardedServerSocket({})::shutdown_destroy()...", listen_addr);
   // shutdown shards
-  return container().invoke_on_all([](auto& ss) {
+  return this->container().invoke_on_all([](auto& ss) {
     if (ss.listener) {
       ss.listener->abort_accept();
     }
     return ss.shutdown_gate.close();
   }).then([this] {
     // destroy shards
-    return container().invoke_on_all([](auto& ss) {
+    return this->container().invoke_on_all([](auto& ss) {
       assert(ss.shutdown_gate.is_closed());
       ss.listen_addr = entity_addr_t();
       ss.listener.reset();
     });
   }).then([this] {
     // stop the sharded service: we should only construct/stop shards on #0
-    return container().invoke_on(0, [](auto& ss) {
+    return this->container().invoke_on(0, [](auto& ss) {
       assert(ss.service);
       return ss.service->stop().finally([cleanup = std::move(ss.service)] {});
     });
   });
 }
 
-seastar::future<FixedCPUServerSocket*>
-FixedCPUServerSocket::create()
+template <bool IS_FIXED_CPU>
+seastar::future<SERVER_SOCKET*>
+SERVER_SOCKET::create()
 {
-  auto fixed_cpu = seastar::this_shard_id();
+  auto primary_sid = seastar::this_shard_id();
   // start the sharded service: we should only construct/stop shards on #0
-  return seastar::smp::submit_to(0, [fixed_cpu] {
+  return seastar::smp::submit_to(0, [primary_sid] {
     auto service = std::make_unique<sharded_service_t>();
-    return service->start(fixed_cpu, construct_tag{}
+    return service->start(primary_sid, construct_tag{}
     ).then([service = std::move(service)]() mutable {
       auto p_shard = service.get();
       p_shard->local().service = std::move(service);
@@ -493,4 +505,7 @@ FixedCPUServerSocket::create()
   });
 }
 
+template class ShardedServerSocket<true>;
+template class ShardedServerSocket<false>;
+
 } // namespace crimson::net
index f7393a9f34118f57355eaface1da0a405b191ebf..beb66c08bfb56b026f56f207fc964378e3181c19 100644 (file)
@@ -139,7 +139,7 @@ private:
   socket_blocker* blocker = nullptr;
 
 #endif
-  friend class FixedCPUServerSocket;
+  friend class ShardedServerSocket;
 };
 
 using listen_ertr = crimson::errorator<
@@ -147,18 +147,19 @@ using listen_ertr = crimson::errorator<
   crimson::ct_error::address_not_available // https://techoverflow.net/2021/08/06/how-i-fixed-python-oserror-errno-99-cannot-assign-requested-address/
   >;
 
-class FixedCPUServerSocket
-    : public seastar::peering_sharded_service<FixedCPUServerSocket> {
+template <bool IS_FIXED_CPU>
+class ShardedServerSocket
+    : public seastar::peering_sharded_service<ShardedServerSocket<IS_FIXED_CPU> > {
   struct construct_tag {};
 
 public:
-  FixedCPUServerSocket(seastar::shard_id cpu, construct_tag);
+  ShardedServerSocket(seastar::shard_id sid, construct_tag);
 
-  ~FixedCPUServerSocket();
+  ~ShardedServerSocket();
 
-  FixedCPUServerSocket(FixedCPUServerSocket&&) = delete;
-  FixedCPUServerSocket(const FixedCPUServerSocket&) = delete;
-  FixedCPUServerSocket& operator=(const FixedCPUServerSocket&) = delete;
+  ShardedServerSocket(ShardedServerSocket&&) = delete;
+  ShardedServerSocket(const ShardedServerSocket&) = delete;
+  ShardedServerSocket& operator=(const ShardedServerSocket&) = delete;
 
   listen_ertr::future<> listen(entity_addr_t addr);
 
@@ -168,16 +169,17 @@ public:
 
   seastar::future<> shutdown_destroy();
 
-  static seastar::future<FixedCPUServerSocket*> create();
+  static seastar::future<ShardedServerSocket*> create();
 
 private:
-  const seastar::shard_id fixed_cpu;
+  // the fixed CPU if IS_FIXED_CPU is true
+  const seastar::shard_id primary_sid;
   entity_addr_t listen_addr;
   std::optional<seastar::server_socket> listener;
   seastar::gate shutdown_gate;
   accept_func_t fn_accept;
 
-  using sharded_service_t = seastar::sharded<FixedCPUServerSocket>;
+  using sharded_service_t = seastar::sharded<ShardedServerSocket>;
   std::unique_ptr<sharded_service_t> service;
 };
 
index f0607c9a44597fa4426fc209bfa1d8bac5c43063..8bc7ebbbedc16cd408747426b118898ede4b4412 100644 (file)
@@ -91,7 +91,8 @@ SocketMessenger::do_listen(const entity_addrvec_t& addrs)
   set_myaddrs(addrs);
   return seastar::futurize_invoke([this] {
     if (!listener) {
-      return FixedCPUServerSocket::create().then([this] (auto _listener) {
+      return ShardedServerSocket<true>::create(
+      ).then([this] (auto _listener) {
         listener = _listener;
       });
     } else {
index 4eebaab3080021dce7072faebf54a22465e16c8c..60510666a12cc2e2f54145a8903afc3e4c59b5fd 100644 (file)
@@ -29,7 +29,8 @@
 
 namespace crimson::net {
 
-class FixedCPUServerSocket;
+template <bool IS_FIXED_CPU>
+class ShardedServerSocket;
 
 class SocketMessenger final : public Messenger {
   const seastar::shard_id master_sid;
@@ -42,7 +43,7 @@ class SocketMessenger final : public Messenger {
   crimson::auth::AuthClient* auth_client = nullptr;
   crimson::auth::AuthServer* auth_server = nullptr;
 
-  FixedCPUServerSocket* listener = nullptr;
+  ShardedServerSocket<true> *listener = nullptr;
   ChainedDispatchers dispatchers;
   std::map<entity_addr_t, SocketConnectionRef> connections;
   std::set<SocketConnectionRef> accepting_conns;
index 916583cc5c5f239be3341531e48ccf4e7e8e219c..423ae0cf2554dabe41747dc2c86fe474aa4415b6 100644 (file)
@@ -24,8 +24,8 @@ using namespace std::chrono_literals;
 using seastar::engine;
 using seastar::future;
 using crimson::net::error;
-using crimson::net::FixedCPUServerSocket;
 using crimson::net::listen_ertr;
+using crimson::net::ShardedServerSocket;
 using crimson::net::Socket;
 using crimson::net::SocketRef;
 using crimson::net::stop_t;
@@ -70,13 +70,14 @@ future<> test_refused() {
   });
 }
 
+template <bool IS_FIXED_CPU>
 future<> test_bind_same() {
   logger().info("test_bind_same()...");
-  return FixedCPUServerSocket::create().then([](auto pss1) {
+  return ShardedServerSocket<IS_FIXED_CPU>::create().then([](auto pss1) {
     auto saddr = get_server_addr();
     return pss1->listen(saddr).safe_then([saddr] {
       // try to bind the same address
-      return FixedCPUServerSocket::create(
+      return ShardedServerSocket<IS_FIXED_CPU>::create(
       ).then([saddr](auto pss2) {
         return pss2->listen(saddr).safe_then([] {
           logger().error("test_bind_same() should raise address_in_use");
@@ -111,9 +112,10 @@ future<> test_bind_same() {
   });
 }
 
+template <bool IS_FIXED_CPU>
 future<> test_accept() {
   logger().info("test_accept()");
-  return FixedCPUServerSocket::create(
+  return ShardedServerSocket<IS_FIXED_CPU>::create(
   ).then([](auto pss) {
     auto saddr = get_server_addr();
     return pss->listen(saddr
@@ -155,13 +157,16 @@ future<> test_accept() {
   });
 }
 
+template <bool IS_FIXED_CPU>
 class SocketFactory {
   static constexpr seastar::shard_id CLIENT_CPU = 0u;
   SocketRef client_socket;
   seastar::promise<> server_connected;
 
   static constexpr seastar::shard_id SERVER_CPU = 1u;
-  FixedCPUServerSocket *pss = nullptr;
+  ShardedServerSocket<IS_FIXED_CPU> *pss = nullptr;
+
+  seastar::shard_id server_socket_CPU;
   SocketRef server_socket;
 
  public:
@@ -172,7 +177,7 @@ class SocketFactory {
     auto psf = owner.get();
     auto saddr = get_server_addr();
     return seastar::smp::submit_to(SERVER_CPU, [psf, saddr] {
-      return FixedCPUServerSocket::create(
+      return ShardedServerSocket<IS_FIXED_CPU>::create(
       ).then([psf, saddr](auto pss) {
         psf->pss = pss;
         return pss->listen(saddr
@@ -195,7 +200,10 @@ class SocketFactory {
           return psf->pss->accept([psf](auto socket, auto paddr) {
             logger().info("dispatch_sockets(): accepted at shard {}",
                           seastar::this_shard_id());
-            ceph_assert_always(SERVER_CPU == seastar::this_shard_id());
+            psf->server_socket_CPU = seastar::this_shard_id();
+            if constexpr (IS_FIXED_CPU) {
+              ceph_assert_always(SERVER_CPU == seastar::this_shard_id());
+            }
             psf->server_socket = std::move(socket);
             return seastar::smp::submit_to(CLIENT_CPU, [psf] {
               psf->server_connected.set_value();
@@ -230,7 +238,7 @@ class SocketFactory {
             ceph_abort();
           });
         }),
-        seastar::smp::submit_to(SERVER_CPU,
+        seastar::smp::submit_to(psf->server_socket_CPU,
             [socket = psf->server_socket.get(), cb_server = std::move(cb_server)] {
           return cb_server(socket).then([socket] {
             logger().debug("closing server socket...");
@@ -416,9 +424,10 @@ class Connection {
   }
 };
 
+template <bool IS_FIXED_CPU>
 future<> test_read_write() {
   logger().info("test_read_write()...");
-  return SocketFactory::dispatch_sockets(
+  return SocketFactory<IS_FIXED_CPU>::dispatch_sockets(
     [](auto cs) { return Connection::dispatch_rw_bounded(cs, 128); },
     [](auto ss) { return Connection::dispatch_rw_bounded(ss, 128); }
   ).then([] {
@@ -429,9 +438,10 @@ future<> test_read_write() {
   });
 }
 
+template <bool IS_FIXED_CPU>
 future<> test_unexpected_down() {
   logger().info("test_unexpected_down()...");
-  return SocketFactory::dispatch_sockets(
+  return SocketFactory<IS_FIXED_CPU>::dispatch_sockets(
     [](auto cs) {
       return Connection::dispatch_rw_bounded(cs, 128, true
         ).handle_exception_type([](const std::system_error& e) {
@@ -448,9 +458,10 @@ future<> test_unexpected_down() {
   });
 }
 
+template <bool IS_FIXED_CPU>
 future<> test_shutdown_propagated() {
   logger().info("test_shutdown_propagated()...");
-  return SocketFactory::dispatch_sockets(
+  return SocketFactory<IS_FIXED_CPU>::dispatch_sockets(
     [](auto cs) {
       logger().debug("test_shutdown_propagated() shutdown client socket");
       cs->shutdown();
@@ -465,9 +476,10 @@ future<> test_shutdown_propagated() {
   });
 }
 
+template <bool IS_FIXED_CPU>
 future<> test_preemptive_down() {
   logger().info("test_preemptive_down()...");
-  return SocketFactory::dispatch_sockets(
+  return SocketFactory<IS_FIXED_CPU>::dispatch_sockets(
     [](auto cs) { return Connection::dispatch_rw_unbounded(cs, true); },
     [](auto ss) { return Connection::dispatch_rw_unbounded(ss); }
   ).then([] {
@@ -478,6 +490,22 @@ future<> test_preemptive_down() {
   });
 }
 
+template <bool IS_FIXED_CPU>
+future<> do_test_with_type() {
+  return test_bind_same<IS_FIXED_CPU>(
+  ).then([] {
+    return test_accept<IS_FIXED_CPU>();
+  }).then([] {
+    return test_read_write<IS_FIXED_CPU>();
+  }).then([] {
+    return test_unexpected_down<IS_FIXED_CPU>();
+  }).then([] {
+    return test_shutdown_propagated<IS_FIXED_CPU>();
+  }).then([] {
+    return test_preemptive_down<IS_FIXED_CPU>();
+  });
+}
+
 }
 
 seastar::future<int> do_test(seastar::app_template& app)
@@ -497,17 +525,9 @@ seastar::future<int> do_test(seastar::app_template& app)
   }).then([] {
     return test_refused();
   }).then([] {
-    return test_bind_same();
-  }).then([] {
-    return test_accept();
-  }).then([] {
-    return test_read_write();
-  }).then([] {
-    return test_unexpected_down();
-  }).then([] {
-    return test_shutdown_propagated();
+    return do_test_with_type<true>();
   }).then([] {
-    return test_preemptive_down();
+    return do_test_with_type<false>();
   }).then([] {
     logger().info("All tests succeeded");
     // Seastar has bugs to have events undispatched during shutdown,