]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/net: drop the template from FixedCPUServerSocket::accept()
authorYingxin Cheng <yingxin.cheng@intel.com>
Fri, 17 Feb 2023 09:03:02 +0000 (17:03 +0800)
committerMatan Breizman <mbreizma@redhat.com>
Wed, 11 Oct 2023 11:38:30 +0000 (11:38 +0000)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
(cherry picked from commit 91c5ddc7ef06b1eaabddf06efc6ab8ed4859e27d)

src/crimson/net/Socket.h
src/crimson/net/SocketMessenger.cc
src/test/crimson/test_socket.cc

index b6125eb8a02a03a60fd39b0d79632e3cdc2f7d49..5b8f6517dd2a8b4c72fd07ead37173b2d2eca039 100644 (file)
@@ -206,6 +206,9 @@ class FixedCPUServerSocket
   entity_addr_t addr;
   std::optional<seastar::server_socket> listener;
   seastar::gate shutdown_gate;
+  using accept_func_t =
+    std::function<seastar::future<>(SocketRef, entity_addr_t)>;
+  accept_func_t fn_accept;
 
   using sharded_service_t = seastar::sharded<FixedCPUServerSocket>;
   std::unique_ptr<sharded_service_t> service;
@@ -238,24 +241,19 @@ public:
 
   listen_ertr::future<> listen(entity_addr_t addr);
 
-  // fn_accept should be a nothrow function of type
-  // seastar::future<>(SocketRef, entity_addr_t)
-  template <typename Func>
-  seastar::future<> accept(Func&& fn_accept) {
+  seastar::future<> accept(accept_func_t &&_fn_accept) {
     assert(seastar::this_shard_id() == cpu);
-    logger().trace("FixedCPUServerSocket({})::accept()...", addr);
-    return container().invoke_on_all(
-        [fn_accept = std::move(fn_accept)] (auto& ss) mutable {
+    logger().debug("FixedCPUServerSocket({})::accept()...", addr);
+    return container().invoke_on_all([_fn_accept](auto &ss) {
       assert(ss.listener);
+      ss.fn_accept = _fn_accept;
       // gate accepting
       // FixedCPUServerSocket::shutdown() will drain the continuations in the gate
       // so ignore the returned future
-      std::ignore = seastar::with_gate(ss.shutdown_gate,
-          [&ss, fn_accept = std::move(fn_accept)] () mutable {
-        return seastar::keep_doing([&ss, fn_accept = std::move(fn_accept)] () mutable {
-          return ss.listener->accept().then(
-              [&ss, fn_accept = std::move(fn_accept)]
-              (seastar::accept_result accept_result) mutable {
+      std::ignore = seastar::with_gate(ss.shutdown_gate, [&ss] {
+        return seastar::keep_doing([&ss] {
+          return ss.listener->accept(
+          ).then([&ss](seastar::accept_result accept_result) {
             // assert seastar::listen_options::set_fixed_cpu() works
             assert(seastar::this_shard_id() == ss.cpu);
             auto [socket, paddr] = std::move(accept_result);
@@ -265,31 +263,44 @@ public:
             SocketRef _socket = std::make_unique<Socket>(
                 std::move(socket), Socket::side_t::acceptor,
                 peer_addr.get_port(), Socket::construct_tag{});
-            std::ignore = seastar::with_gate(ss.shutdown_gate,
-                [socket = std::move(_socket), peer_addr,
-                 &ss, fn_accept = std::move(fn_accept)] () mutable {
-              logger().trace("FixedCPUServerSocket({})::accept(): "
-                             "accepted peer {}", ss.addr, peer_addr);
-              return fn_accept(std::move(socket), peer_addr
-              ).handle_exception([&ss, peer_addr] (auto eptr) {
+            logger().debug("FixedCPUServerSocket({})::accept(): "
+                           "accepted peer {}, socket {}",
+                           ss.addr, peer_addr, fmt::ptr(_socket));
+            std::ignore = seastar::with_gate(
+                ss.shutdown_gate,
+                [socket=std::move(_socket), peer_addr, &ss]() mutable {
+              return ss.fn_accept(std::move(socket), peer_addr
+              ).handle_exception([&ss, peer_addr](auto eptr) {
+                const char *e_what;
+                try {
+                  std::rethrow_exception(eptr);
+                } catch (std::exception &e) {
+                  e_what = e.what();
+                }
                 logger().error("FixedCPUServerSocket({})::accept(): "
                                "fn_accept(s, {}) got unexpected exception {}",
-                               ss.addr, peer_addr, eptr);
+                               ss.addr, peer_addr, e_what);
                 ceph_abort();
               });
             });
           });
-        }).handle_exception_type([&ss] (const std::system_error& e) {
+        }).handle_exception_type([&ss](const std::system_error& e) {
           if (e.code() == std::errc::connection_aborted ||
               e.code() == std::errc::invalid_argument) {
-            logger().trace("FixedCPUServerSocket({})::accept(): stopped ({})",
-                           ss.addr, e);
+            logger().debug("FixedCPUServerSocket({})::accept(): stopped ({})",
+                           ss.addr, e.what());
           } else {
             throw;
           }
-        }).handle_exception([&ss] (auto eptr) {
+        }).handle_exception([&ss](auto eptr) {
+          const char *e_what;
+          try {
+            std::rethrow_exception(eptr);
+          } catch (std::exception &e) {
+            e_what = e.what();
+          }
           logger().error("FixedCPUServerSocket({})::accept(): "
-                         "got unexpected exception {}", ss.addr, eptr);
+                         "got unexpected exception {}", ss.addr, e_what);
           ceph_abort();
         });
       });
index a112b50800d4a0f6c6daca0127aa209496485112..8a83c1d59cefffdf6f2cad09a54dbb736aceb9ea 100644 (file)
@@ -214,7 +214,7 @@ seastar::future<> SocketMessenger::start(
     ceph_assert(get_myaddr().is_msgr2());
     ceph_assert(get_myaddr().get_port() > 0);
 
-    return listener->accept([this] (SocketRef socket, entity_addr_t peer_addr) {
+    return listener->accept([this](SocketRef socket, entity_addr_t peer_addr) {
       assert(seastar::this_shard_id() == master_sid);
       assert(get_myaddr().is_msgr2());
       SocketConnectionRef conn =
index 5fd596a09282100141cd2013d403e6aa32ad728d..4861f0c91fa789760b433efea7b4ceffcf7acf1b 100644 (file)
@@ -115,10 +115,12 @@ future<> test_accept() {
   return FixedCPUServerSocket::create().then([] (auto pss) {
     auto saddr = get_server_addr();
     return pss->listen(saddr).safe_then([pss] {
-      return pss->accept([] (auto socket, auto paddr) {
+      return pss->accept([](auto socket, auto paddr) {
         // simple accept
-        return seastar::sleep(100ms).then([socket = std::move(socket)] () mutable {
-          return socket->close().finally([cleanup = std::move(socket)] {});
+        return seastar::sleep(100ms
+        ).then([socket = std::move(socket)]() mutable {
+          return socket->close(
+          ).finally([cleanup = std::move(socket)] {});
         });
       });
     }, listen_ertr::all_same_way(