]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: cleanups to Socket.h/cc
authorYingxin Cheng <yingxin.cheng@intel.com>
Mon, 20 Feb 2023 01:41:58 +0000 (09:41 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Sun, 25 Jun 2023 03:57:19 +0000 (11:57 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/Socket.cc
src/crimson/net/Socket.h
src/crimson/net/SocketMessenger.cc
src/test/crimson/test_socket.cc

index 6434a407f22a56c6b2c3f75a8d12eedcf181e7ad..842304c68f4bf8407c7be25b89f6fc3c766e9169 100644 (file)
@@ -19,6 +19,9 @@ seastar::logger& logger() {
   return crimson::get_logger(ceph_subsys_ms);
 }
 
+using tmp_buf = Socket::tmp_buf;
+using packet = Socket::packet;
+
 // an input_stream consumer that reads buffer segments into a bufferlist up to
 // the given number of remaining bytes
 struct bufferlist_consumer {
@@ -28,7 +31,6 @@ struct bufferlist_consumer {
   bufferlist_consumer(bufferlist& bl, size_t& remaining)
     : bl(bl), remaining(remaining) {}
 
-  using tmp_buf = seastar::temporary_buffer<char>;
   using consumption_result_type = typename seastar::input_stream<char>::consumption_result_type;
 
   // consume some or all of a buffer segment
@@ -59,9 +61,58 @@ struct bufferlist_consumer {
   };
 };
 
+seastar::future<> inject_delay()
+{
+  if (float delay_period = local_conf()->ms_inject_internal_delays;
+      delay_period) {
+    logger().debug("Socket::inject_delay: sleep for {}", delay_period);
+    return seastar::sleep(
+      std::chrono::milliseconds((int)(delay_period * 1000.0)));
+  }
+  return seastar::now();
+}
+
+void inject_failure()
+{
+  if (local_conf()->ms_inject_socket_failures) {
+    uint64_t rand =
+      ceph::util::generate_random_number<uint64_t>(1, RAND_MAX);
+    if (rand % local_conf()->ms_inject_socket_failures == 0) {
+      logger().warn("Socket::inject_failure: injecting socket failure");
+      throw std::system_error(make_error_code(
+        error::negotiation_failure));
+    }
+  }
+}
+
 } // anonymous namespace
 
-seastar::future<bufferlist> Socket::read(size_t bytes)
+Socket::Socket(
+    seastar::connected_socket &&_socket,
+    side_t _side,
+    uint16_t e_port,
+    construct_tag)
+  : sid{seastar::this_shard_id()},
+    socket(std::move(_socket)),
+    in(socket.input()),
+    // the default buffer size 8192 is too small that may impact our write
+    // performance. see seastar::net::connected_socket::output()
+    out(socket.output(65536)),
+    socket_is_shutdown(false),
+    side(_side),
+    ephemeral_port(e_port)
+{
+}
+
+Socket::~Socket()
+{
+#ifndef NDEBUG
+  assert(closed);
+#endif
+}
+
+seastar::future<bufferlist>
+Socket::read(size_t bytes)
 {
 #ifdef UNIT_TESTS_BUILT
   return try_trap_pre(next_trap_read).then([bytes, this] {
@@ -81,9 +132,9 @@ seastar::future<bufferlist> Socket::read(size_t bytes)
       });
     });
 #ifdef UNIT_TESTS_BUILT
-  }).then([this] (auto buf) {
+  }).then([this](auto buf) {
     return try_trap_post(next_trap_read
-    ).then([buf = std::move(buf)] () mutable {
+    ).then([buf = std::move(buf)]() mutable {
       return std::move(buf);
     });
   });
@@ -104,20 +155,70 @@ Socket::read_exactly(size_t bytes) {
       }
       inject_failure();
       return inject_delay(
-      ).then([buf = std::move(buf)] () mutable {
+      ).then([buf = std::move(buf)]() mutable {
         return seastar::make_ready_future<tmp_buf>(std::move(buf));
       });
     });
 #ifdef UNIT_TESTS_BUILT
-  }).then([this] (auto buf) {
+  }).then([this](auto buf) {
     return try_trap_post(next_trap_read
-    ).then([buf = std::move(buf)] () mutable {
+    ).then([buf = std::move(buf)]() mutable {
       return std::move(buf);
     });
   });
 #endif
 }
 
+seastar::future<>
+Socket::write(packet &&buf)
+{
+#ifdef UNIT_TESTS_BUILT
+  return try_trap_pre(next_trap_write
+  ).then([buf = std::move(buf), this]() mutable {
+#endif
+    inject_failure();
+    return inject_delay(
+    ).then([buf = std::move(buf), this]() mutable {
+      return out.write(std::move(buf));
+    });
+#ifdef UNIT_TESTS_BUILT
+  }).then([this] {
+    return try_trap_post(next_trap_write);
+  });
+#endif
+}
+
+seastar::future<>
+Socket::flush()
+{
+  inject_failure();
+  return inject_delay().then([this] {
+    return out.flush();
+  });
+}
+
+seastar::future<>
+Socket::write_flush(packet &&buf)
+{
+#ifdef UNIT_TESTS_BUILT
+  return try_trap_pre(next_trap_write
+  ).then([buf = std::move(buf), this]() mutable {
+#endif
+    inject_failure();
+    return inject_delay(
+    ).then([buf = std::move(buf), this]() mutable {
+      return out.write(std::move(buf)
+      ).then([this] {
+        return out.flush();
+      });
+    });
+#ifdef UNIT_TESTS_BUILT
+  }).then([this] {
+    return try_trap_post(next_trap_write);
+  });
+#endif
+}
+
 void Socket::shutdown() {
   socket_is_shutdown = true;
   socket.shutdown_input();
@@ -127,17 +228,18 @@ void Socket::shutdown() {
 static inline seastar::future<>
 close_and_handle_errors(seastar::output_stream<char>& out)
 {
-  return out.close().handle_exception_type([] (const std::system_error& e) {
+  return out.close().handle_exception_type([](const std::system_error& e) {
     if (e.code() != std::errc::broken_pipe &&
         e.code() != std::errc::connection_reset) {
-      logger().error("Socket::close(): unexpected error {}", e);
+      logger().error("Socket::close(): unexpected error {}", e.what());
       ceph_abort();
     }
     // can happen when out is already shutdown, ignore
   });
 }
 
-seastar::future<> Socket::close() {
+seastar::future<>
+Socket::close() {
 #ifndef NDEBUG
   ceph_assert(!closed);
   closed = true;
@@ -148,39 +250,54 @@ seastar::future<> Socket::close() {
     close_and_handle_errors(out)
   ).then_unpack([] {
     return seastar::make_ready_future<>();
-  }).handle_exception([] (auto eptr) {
-    logger().error("Socket::close(): unexpected exception {}", eptr);
+  }).handle_exception([](auto eptr) {
+    const char *e_what;
+    try {
+      std::rethrow_exception(eptr);
+    } catch (std::exception &e) {
+      e_what = e.what();
+    }
+    logger().error("Socket::close(): unexpected exception {}", e_what);
     ceph_abort();
   });
 }
 
-seastar::future<> Socket::inject_delay () {
-  if (float delay_period = local_conf()->ms_inject_internal_delays;
-      delay_period) {
-    logger().debug("Socket::inject_delay: sleep for {}", delay_period);
-    return seastar::sleep(
-      std::chrono::milliseconds((int)(delay_period * 1000.0)));
-  }
-  return seastar::now();
+seastar::future<SocketRef>
+Socket::connect(const entity_addr_t &peer_addr)
+{
+  inject_failure();
+  return inject_delay(
+  ).then([peer_addr] {
+    return seastar::connect(peer_addr.in4_addr());
+  }).then([peer_addr](seastar::connected_socket socket) {
+    auto ret = std::make_unique<Socket>(
+      std::move(socket), side_t::connector, 0, construct_tag{});
+    logger().debug("Socket::connect(): connected to {}, socket {}",
+                   peer_addr, fmt::ptr(ret));
+    return ret;
+  });
 }
 
-void Socket::inject_failure()
-{
-  if (local_conf()->ms_inject_socket_failures) {
-    uint64_t rand =
-      ceph::util::generate_random_number<uint64_t>(1, RAND_MAX);
-    if (rand % local_conf()->ms_inject_socket_failures == 0) {
-      if (true) {
-        logger().warn("Socket::inject_failure: injecting socket failure");
-       throw std::system_error(make_error_code(
-         crimson::net::error::negotiation_failure));
-      }
+#ifdef UNIT_TESTS_BUILT
+void Socket::set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_) {
+  blocker = blocker_;
+  if (type == bp_type_t::READ) {
+    ceph_assert(next_trap_read == bp_action_t::CONTINUE);
+    next_trap_read = action;
+  } else { // type == bp_type_t::WRITE
+    if (next_trap_write == bp_action_t::CONTINUE) {
+      next_trap_write = action;
+    } else if (next_trap_write == bp_action_t::FAULT) {
+      // do_sweep_messages() may combine multiple write events into one socket write
+      ceph_assert(action == bp_action_t::FAULT || action == bp_action_t::CONTINUE);
+    } else {
+      ceph_abort();
     }
   }
 }
 
-#ifdef UNIT_TESTS_BUILT
-seastar::future<> Socket::try_trap_pre(bp_action_t& trap) {
+seastar::future<>
+Socket::try_trap_pre(bp_action_t& trap) {
   auto action = trap;
   trap = bp_action_t::CONTINUE;
   switch (action) {
@@ -188,7 +305,7 @@ seastar::future<> Socket::try_trap_pre(bp_action_t& trap) {
     break;
    case bp_action_t::FAULT:
     logger().info("[Test] got FAULT");
-    throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
+    throw std::system_error(make_error_code(error::negotiation_failure));
    case bp_action_t::BLOCK:
     logger().info("[Test] got BLOCK");
     return blocker->block();
@@ -201,7 +318,8 @@ seastar::future<> Socket::try_trap_pre(bp_action_t& trap) {
   return seastar::make_ready_future<>();
 }
 
-seastar::future<> Socket::try_trap_post(bp_action_t& trap) {
+seastar::future<>
+Socket::try_trap_post(bp_action_t& trap) {
   auto action = trap;
   trap = bp_action_t::CONTINUE;
   switch (action) {
@@ -216,94 +334,161 @@ seastar::future<> Socket::try_trap_post(bp_action_t& trap) {
   }
   return seastar::make_ready_future<>();
 }
+#endif
 
-void Socket::set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_) {
-  blocker = blocker_;
-  if (type == bp_type_t::READ) {
-    ceph_assert(next_trap_read == bp_action_t::CONTINUE);
-    next_trap_read = action;
-  } else { // type == bp_type_t::WRITE
-    if (next_trap_write == bp_action_t::CONTINUE) {
-      next_trap_write = action;
-    } else if (next_trap_write == bp_action_t::FAULT) {
-      // do_sweep_messages() may combine multiple write events into one socket write
-      ceph_assert(action == bp_action_t::FAULT || action == bp_action_t::CONTINUE);
-    } else {
-      ceph_abort();
-    }
-  }
+FixedCPUServerSocket::FixedCPUServerSocket(
+    seastar::shard_id cpu,
+    construct_tag)
+  : fixed_cpu{cpu}
+{
 }
-#endif
 
-crimson::net::listen_ertr::future<>
+FixedCPUServerSocket::~FixedCPUServerSocket()
+{
+  assert(!listener);
+  // detect whether user have called destroy() properly
+  ceph_assert(!service);
+}
+
+listen_ertr::future<>
 FixedCPUServerSocket::listen(entity_addr_t addr)
 {
-  assert(seastar::this_shard_id() == cpu);
-  logger().trace("FixedCPUServerSocket::listen({})...", addr);
-  return container().invoke_on_all([addr] (auto& ss) {
-    ss.addr = addr;
+  assert(seastar::this_shard_id() == fixed_cpu);
+  logger().debug("FixedCPUServerSocket({})::listen()...", addr);
+  return container().invoke_on_all([addr](auto& ss) {
+    ss.listen_addr = addr;
     seastar::socket_address s_addr(addr.in4_addr());
     seastar::listen_options lo;
     lo.reuse_address = true;
-    lo.set_fixed_cpu(ss.cpu);
+    lo.set_fixed_cpu(ss.fixed_cpu);
     ss.listener = seastar::listen(s_addr, lo);
   }).then([] {
     return listen_ertr::now();
   }).handle_exception_type(
-    [addr] (const std::system_error& e) -> listen_ertr::future<> {
+    [addr](const std::system_error& e) -> listen_ertr::future<> {
     if (e.code() == std::errc::address_in_use) {
-      logger().trace("FixedCPUServerSocket::listen({}): address in use", addr);
+      logger().debug("FixedCPUServerSocket({})::listen(): address in use", addr);
       return crimson::ct_error::address_in_use::make();
     } else if (e.code() == std::errc::address_not_available) {
-      logger().trace("FixedCPUServerSocket::listen({}): address not available",
+      logger().debug("FixedCPUServerSocket({})::listen(): address not available",
                      addr);
       return crimson::ct_error::address_not_available::make();
     }
-    logger().error("FixedCPUServerSocket::listen({}): "
-                   "got unexpeted error {}", addr, e);
+    logger().error("FixedCPUServerSocket({})::listen(): "
+                   "got unexpeted error {}", addr, e.what());
     ceph_abort();
   });
 }
 
-seastar::future<> FixedCPUServerSocket::shutdown()
+seastar::future<>
+FixedCPUServerSocket::accept(accept_func_t &&_fn_accept)
+{
+  assert(seastar::this_shard_id() == fixed_cpu);
+  logger().debug("FixedCPUServerSocket({})::accept()...", listen_addr);
+  return container().invoke_on_all([_fn_accept](auto &ss) {
+    assert(ss.listener);
+    ss.fn_accept = _fn_accept;
+    // gate accepting
+    // FixedCPUServerSocket::shutdown() will drain the continuations in the gate
+    // so ignore the returned future
+    std::ignore = seastar::with_gate(ss.shutdown_gate, [&ss] {
+      return seastar::keep_doing([&ss] {
+        return ss.listener->accept(
+        ).then([&ss](seastar::accept_result accept_result) {
+          // assert seastar::listen_options::set_fixed_cpu() works
+          assert(seastar::this_shard_id() == ss.fixed_cpu);
+          auto [socket, paddr] = std::move(accept_result);
+          entity_addr_t peer_addr;
+          peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
+          peer_addr.set_type(ss.listen_addr.get_type());
+          SocketRef _socket = std::make_unique<Socket>(
+              std::move(socket), Socket::side_t::acceptor,
+              peer_addr.get_port(), Socket::construct_tag{});
+          logger().debug("FixedCPUServerSocket({})::accept(): "
+                         "accepted peer {}, socket {}",
+                         ss.listen_addr, peer_addr, fmt::ptr(_socket));
+          std::ignore = seastar::with_gate(
+              ss.shutdown_gate,
+              [socket=std::move(_socket), peer_addr, &ss]() mutable {
+            return ss.fn_accept(std::move(socket), peer_addr
+            ).handle_exception([&ss, peer_addr](auto eptr) {
+              const char *e_what;
+              try {
+                std::rethrow_exception(eptr);
+              } catch (std::exception &e) {
+                e_what = e.what();
+              }
+              logger().error("FixedCPUServerSocket({})::accept(): "
+                             "fn_accept(s, {}) got unexpected exception {}",
+                             ss.listen_addr, peer_addr, e_what);
+              ceph_abort();
+            });
+          });
+        });
+      }).handle_exception_type([&ss](const std::system_error& e) {
+        if (e.code() == std::errc::connection_aborted ||
+            e.code() == std::errc::invalid_argument) {
+          logger().debug("FixedCPUServerSocket({})::accept(): stopped ({})",
+                         ss.listen_addr, e.what());
+        } else {
+          throw;
+        }
+      }).handle_exception([&ss](auto eptr) {
+        const char *e_what;
+        try {
+          std::rethrow_exception(eptr);
+        } catch (std::exception &e) {
+          e_what = e.what();
+        }
+        logger().error("FixedCPUServerSocket({})::accept(): "
+                       "got unexpected exception {}", ss.listen_addr, e_what);
+        ceph_abort();
+      });
+    });
+  });
+}
+
+seastar::future<>
+FixedCPUServerSocket::shutdown_destroy()
 {
-  assert(seastar::this_shard_id() == cpu);
-  logger().trace("FixedCPUServerSocket({})::shutdown()...", addr);
-  return container().invoke_on_all([] (auto& ss) {
+  assert(seastar::this_shard_id() == fixed_cpu);
+  logger().debug("FixedCPUServerSocket({})::shutdown_destroy()...", listen_addr);
+  // shutdown shards
+  return container().invoke_on_all([](auto& ss) {
     if (ss.listener) {
       ss.listener->abort_accept();
     }
     return ss.shutdown_gate.close();
   }).then([this] {
-    return reset();
-  });
-}
-
-seastar::future<> FixedCPUServerSocket::destroy()
-{
-  assert(seastar::this_shard_id() == cpu);
-  return shutdown().then([this] {
-    // we should only construct/stop shards on #0
-    return container().invoke_on(0, [] (auto& ss) {
+    // destroy shards
+    return container().invoke_on_all([](auto& ss) {
+      assert(ss.shutdown_gate.is_closed());
+      ss.listen_addr = entity_addr_t();
+      ss.listener.reset();
+    });
+  }).then([this] {
+    // stop the sharded service: we should only construct/stop shards on #0
+    return container().invoke_on(0, [](auto& ss) {
       assert(ss.service);
       return ss.service->stop().finally([cleanup = std::move(ss.service)] {});
     });
   });
 }
 
-seastar::future<FixedCPUServerSocket*> FixedCPUServerSocket::create()
+seastar::future<FixedCPUServerSocket*>
+FixedCPUServerSocket::create()
 {
-  auto cpu = seastar::this_shard_id();
-  // we should only construct/stop shards on #0
-  return seastar::smp::submit_to(0, [cpu] {
+  auto fixed_cpu = seastar::this_shard_id();
+  // start the sharded service: we should only construct/stop shards on #0
+  return seastar::smp::submit_to(0, [fixed_cpu] {
     auto service = std::make_unique<sharded_service_t>();
-    return service->start(cpu, construct_tag{}
-    ).then([service = std::move(service)] () mutable {
+    return service->start(fixed_cpu, construct_tag{}
+    ).then([service = std::move(service)]() mutable {
       auto p_shard = service.get();
       p_shard->local().service = std::move(service);
       return p_shard;
     });
-  }).then([] (auto p_shard) {
+  }).then([](auto p_shard) {
     return &p_shard->local();
   });
 }
index 5b8f6517dd2a8b4c72fd07ead37173b2d2eca039..f7393a9f34118f57355eaface1da0a405b191ebf 100644 (file)
@@ -23,96 +23,59 @@ namespace crimson::net {
 class Socket;
 using SocketRef = std::unique_ptr<Socket>;
 
-class Socket
-{
+class Socket {
   struct construct_tag {};
 
- public:
+public:
   // if acceptor side, peer is using a different port (ephemeral_port)
   // if connector side, I'm using a different port (ephemeral_port)
   enum class side_t {
     acceptor,
     connector
   };
+  Socket(seastar::connected_socket &&, side_t, uint16_t e_port, construct_tag);
 
-  Socket(seastar::connected_socket&& _socket, side_t _side, uint16_t e_port, construct_tag)
-    : sid{seastar::this_shard_id()},
-      socket(std::move(_socket)),
-      in(socket.input()),
-      // the default buffer size 8192 is too small that may impact our write
-      // performance. see seastar::net::connected_socket::output()
-      out(socket.output(65536)),
-      socket_is_shutdown(false),
-      side(_side),
-      ephemeral_port(e_port) {}
-
-  ~Socket() {
-#ifndef NDEBUG
-    assert(closed);
-#endif
-  }
+  ~Socket();
 
   Socket(Socket&& o) = delete;
 
-  static seastar::future<SocketRef>
-  connect(const entity_addr_t& peer_addr) {
-    inject_failure();
-    return inject_delay(
-    ).then([peer_addr] {
-      return seastar::connect(peer_addr.in4_addr());
-    }).then([] (seastar::connected_socket socket) {
-      return std::make_unique<Socket>(
-        std::move(socket), side_t::connector, 0, construct_tag{});
-    });
+  side_t get_side() const {
+    return side;
+  }
+
+  uint16_t get_ephemeral_port() const {
+    return ephemeral_port;
+  }
+
+  seastar::socket_address get_local_address() const {
+    return socket.local_address();
+  }
+
+  bool is_shutdown() const {
+    return socket_is_shutdown;
+  }
+
+  // learn my ephemeral_port as connector.
+  // unfortunately, there's no way to identify which port I'm using as
+  // connector with current seastar interface.
+  void learn_ephemeral_port_as_connector(uint16_t port) {
+    assert(side == side_t::connector &&
+           (ephemeral_port == 0 || ephemeral_port == port));
+    ephemeral_port = port;
   }
 
   /// read the requested number of bytes into a bufferlist
   seastar::future<bufferlist> read(size_t bytes);
+
   using tmp_buf = seastar::temporary_buffer<char>;
   using packet = seastar::net::packet;
   seastar::future<tmp_buf> read_exactly(size_t bytes);
 
-  seastar::future<> write(packet&& buf) {
-#ifdef UNIT_TESTS_BUILT
-    return try_trap_pre(next_trap_write
-    ).then([buf = std::move(buf), this] () mutable {
-#endif
-      inject_failure();
-      return inject_delay(
-      ).then([buf = std::move(buf), this] () mutable {
-        return out.write(std::move(buf));
-      });
-#ifdef UNIT_TESTS_BUILT
-    }).then([this] {
-      return try_trap_post(next_trap_write);
-    });
-#endif
-  }
-  seastar::future<> flush() {
-    inject_failure();
-    return inject_delay().then([this] {
-      return out.flush();
-    });
-  }
-  seastar::future<> write_flush(packet&& buf) {
-#ifdef UNIT_TESTS_BUILT
-    return try_trap_pre(next_trap_write).then([buf = std::move(buf), this] () mutable {
-#endif
-      inject_failure();
-      return inject_delay(
-      ).then([buf = std::move(buf), this] () mutable {
-        return out.write(std::move(buf)).then([this] { return out.flush(); });
-      });
-#ifdef UNIT_TESTS_BUILT
-    }).then([this] {
-      return try_trap_post(next_trap_write);
-    });
-#endif
-  }
+  seastar::future<> write(packet &&buf);
 
-  bool is_shutdown() const {
-    return socket_is_shutdown;
-  }
+  seastar::future<> flush();
+
+  seastar::future<> write_flush(packet &&buf);
 
   // preemptively disable further reads or writes, can only be shutdown once.
   void shutdown();
@@ -120,9 +83,12 @@ class Socket
   /// Socket can only be closed once.
   seastar::future<> close();
 
-  static seastar::future<> inject_delay();
+  static seastar::future<SocketRef>
+  connect(const entity_addr_t& peer_addr);
 
-  static void inject_failure();
+  /*
+   * test interfaces
+   */
 
   // shutdown for tests
   void force_shutdown() {
@@ -140,28 +106,7 @@ class Socket
     socket.shutdown_output();
   }
 
-  side_t get_side() const {
-    return side;
-  }
-
-  uint16_t get_ephemeral_port() const {
-    return ephemeral_port;
-  }
-
-  // learn my ephemeral_port as connector.
-  // unfortunately, there's no way to identify which port I'm using as
-  // connector with current seastar interface.
-  void learn_ephemeral_port_as_connector(uint16_t port) {
-    assert(side == side_t::connector &&
-           (ephemeral_port == 0 || ephemeral_port == port));
-    ephemeral_port = port;
-  }
-
-  seastar::socket_address get_local_address() const {
-    return socket.local_address();
-  }
-
- private:
+private:
   const seastar::shard_id sid;
   seastar::connected_socket socket;
   seastar::input_stream<char> in;
@@ -181,15 +126,17 @@ class Socket
   } r;
 
 #ifdef UNIT_TESTS_BUILT
- public:
+public:
   void set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_);
 
- private:
+private:
+  seastar::future<> try_trap_pre(bp_action_t& trap);
+
+  seastar::future<> try_trap_post(bp_action_t& trap);
+
   bp_action_t next_trap_read = bp_action_t::CONTINUE;
   bp_action_t next_trap_write = bp_action_t::CONTINUE;
   socket_blocker* blocker = nullptr;
-  seastar::future<> try_trap_pre(bp_action_t& trap);
-  seastar::future<> try_trap_post(bp_action_t& trap);
 
 #endif
   friend class FixedCPUServerSocket;
@@ -202,38 +149,12 @@ using listen_ertr = crimson::errorator<
 
 class FixedCPUServerSocket
     : public seastar::peering_sharded_service<FixedCPUServerSocket> {
-  const seastar::shard_id cpu;
-  entity_addr_t addr;
-  std::optional<seastar::server_socket> listener;
-  seastar::gate shutdown_gate;
-  using accept_func_t =
-    std::function<seastar::future<>(SocketRef, entity_addr_t)>;
-  accept_func_t fn_accept;
-
-  using sharded_service_t = seastar::sharded<FixedCPUServerSocket>;
-  std::unique_ptr<sharded_service_t> service;
-
   struct construct_tag {};
 
-  static seastar::logger& logger() {
-    return crimson::get_logger(ceph_subsys_ms);
-  }
-
-  seastar::future<> reset() {
-    return container().invoke_on_all([] (auto& ss) {
-      assert(ss.shutdown_gate.is_closed());
-      ss.addr = entity_addr_t();
-      ss.listener.reset();
-    });
-  }
-
 public:
-  FixedCPUServerSocket(seastar::shard_id cpu, construct_tag) : cpu{cpu} {}
-  ~FixedCPUServerSocket() {
-    assert(!listener);
-    // detect whether user have called destroy() properly
-    ceph_assert(!service);
-  }
+  FixedCPUServerSocket(seastar::shard_id cpu, construct_tag);
+
+  ~FixedCPUServerSocket();
 
   FixedCPUServerSocket(FixedCPUServerSocket&&) = delete;
   FixedCPUServerSocket(const FixedCPUServerSocket&) = delete;
@@ -241,75 +162,23 @@ public:
 
   listen_ertr::future<> listen(entity_addr_t addr);
 
-  seastar::future<> accept(accept_func_t &&_fn_accept) {
-    assert(seastar::this_shard_id() == cpu);
-    logger().debug("FixedCPUServerSocket({})::accept()...", addr);
-    return container().invoke_on_all([_fn_accept](auto &ss) {
-      assert(ss.listener);
-      ss.fn_accept = _fn_accept;
-      // gate accepting
-      // FixedCPUServerSocket::shutdown() will drain the continuations in the gate
-      // so ignore the returned future
-      std::ignore = seastar::with_gate(ss.shutdown_gate, [&ss] {
-        return seastar::keep_doing([&ss] {
-          return ss.listener->accept(
-          ).then([&ss](seastar::accept_result accept_result) {
-            // assert seastar::listen_options::set_fixed_cpu() works
-            assert(seastar::this_shard_id() == ss.cpu);
-            auto [socket, paddr] = std::move(accept_result);
-            entity_addr_t peer_addr;
-            peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
-            peer_addr.set_type(ss.addr.get_type());
-            SocketRef _socket = std::make_unique<Socket>(
-                std::move(socket), Socket::side_t::acceptor,
-                peer_addr.get_port(), Socket::construct_tag{});
-            logger().debug("FixedCPUServerSocket({})::accept(): "
-                           "accepted peer {}, socket {}",
-                           ss.addr, peer_addr, fmt::ptr(_socket));
-            std::ignore = seastar::with_gate(
-                ss.shutdown_gate,
-                [socket=std::move(_socket), peer_addr, &ss]() mutable {
-              return ss.fn_accept(std::move(socket), peer_addr
-              ).handle_exception([&ss, peer_addr](auto eptr) {
-                const char *e_what;
-                try {
-                  std::rethrow_exception(eptr);
-                } catch (std::exception &e) {
-                  e_what = e.what();
-                }
-                logger().error("FixedCPUServerSocket({})::accept(): "
-                               "fn_accept(s, {}) got unexpected exception {}",
-                               ss.addr, peer_addr, e_what);
-                ceph_abort();
-              });
-            });
-          });
-        }).handle_exception_type([&ss](const std::system_error& e) {
-          if (e.code() == std::errc::connection_aborted ||
-              e.code() == std::errc::invalid_argument) {
-            logger().debug("FixedCPUServerSocket({})::accept(): stopped ({})",
-                           ss.addr, e.what());
-          } else {
-            throw;
-          }
-        }).handle_exception([&ss](auto eptr) {
-          const char *e_what;
-          try {
-            std::rethrow_exception(eptr);
-          } catch (std::exception &e) {
-            e_what = e.what();
-          }
-          logger().error("FixedCPUServerSocket({})::accept(): "
-                         "got unexpected exception {}", ss.addr, e_what);
-          ceph_abort();
-        });
-      });
-    });
-  }
+  using accept_func_t =
+    std::function<seastar::future<>(SocketRef, entity_addr_t)>;
+  seastar::future<> accept(accept_func_t &&_fn_accept);
+
+  seastar::future<> shutdown_destroy();
 
-  seastar::future<> shutdown();
-  seastar::future<> destroy();
   static seastar::future<FixedCPUServerSocket*> create();
+
+private:
+  const seastar::shard_id fixed_cpu;
+  entity_addr_t listen_addr;
+  std::optional<seastar::server_socket> listener;
+  seastar::gate shutdown_gate;
+  accept_func_t fn_accept;
+
+  using sharded_service_t = seastar::sharded<FixedCPUServerSocket>;
+  std::unique_ptr<sharded_service_t> service;
 };
 
 } // namespace crimson::net
index 8a83c1d59cefffdf6f2cad09a54dbb736aceb9ea..f0607c9a44597fa4426fc209bfa1d8bac5c43063 100644 (file)
@@ -255,7 +255,7 @@ seastar::future<> SocketMessenger::shutdown()
     if (listener) {
       auto d_listener = listener;
       listener = nullptr;
-      return d_listener->destroy();
+      return d_listener->shutdown_destroy();
     } else {
       return seastar::now();
     }
index 4861f0c91fa789760b433efea7b4ceffcf7acf1b..d43f536db1be90fce6dc3a5974d91682ba1e57c1 100644 (file)
@@ -93,7 +93,7 @@ future<> test_bind_same() {
           // runtime error: member access within null pointer of type 'struct promise_base'
           return seastar::now();
         })).then([pss2] {
-          return pss2->destroy();
+          return pss2->shutdown_destroy();
         });
       });
     }, listen_ertr::all_same_way(
@@ -102,7 +102,7 @@ future<> test_bind_same() {
                    saddr);
       ceph_abort();
     })).then([pss1] {
-      return pss1->destroy();
+      return pss1->shutdown_destroy();
     }).handle_exception([] (auto eptr) {
       logger.error("test_bind_same() got unexpeted exception {}", eptr);
       ceph_abort();
@@ -143,7 +143,7 @@ future<> test_accept() {
     }).then([] {
       logger.info("test_accept() ok\n");
     }).then([pss] {
-      return pss->destroy();
+      return pss->shutdown_destroy();
     }).handle_exception([] (auto eptr) {
       logger.error("test_accept() got unexpeted exception {}", eptr);
       ceph_abort();
@@ -199,7 +199,7 @@ class SocketFactory {
     }).then([psf] {
       if (psf->pss) {
         return seastar::smp::submit_to(1u, [psf] {
-          return psf->pss->destroy();
+          return psf->pss->shutdown_destroy();
         });
       }
       return seastar::now();