]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: allow FrameAssemblerV2 to check and shutdown a foreign socket
authorYingxin Cheng <yingxin.cheng@intel.com>
Wed, 26 Apr 2023 09:27:03 +0000 (17:27 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Sun, 25 Jun 2023 03:57:19 +0000 (11:57 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/FrameAssemblerV2.cc
src/crimson/net/FrameAssemblerV2.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/io_handler.cc

index 4a56829493a07f67878c300e1bd15cae882dad73..4608f83da533ed9aee89a3367bed5b2193c48143 100644 (file)
@@ -135,7 +135,12 @@ bool FrameAssemblerV2::has_socket() const
 bool FrameAssemblerV2::is_socket_valid() const
 {
   assert(seastar::this_shard_id() == sid);
-  return has_socket() && !socket->is_shutdown();
+#ifndef NDEBUG
+  if (has_socket() && socket->get_shard_id() == sid) {
+    assert(socket->is_shutdown() == is_socket_shutdown);
+  }
+#endif
+  return has_socket() && !is_socket_shutdown;
 }
 
 SocketFRef FrameAssemblerV2::move_socket()
@@ -152,6 +157,7 @@ void FrameAssemblerV2::set_socket(SocketFRef &&new_socket)
   assert(new_socket);
   socket = std::move(new_socket);
   conn.set_socket(socket.get());
+  is_socket_shutdown = false;
   assert(is_socket_valid());
 }
 
@@ -162,30 +168,55 @@ void FrameAssemblerV2::learn_socket_ephemeral_port_as_connector(uint16_t port)
   socket->learn_ephemeral_port_as_connector(port);
 }
 
-void FrameAssemblerV2::shutdown_socket()
+template <bool may_cross_core>
+void FrameAssemblerV2::shutdown_socket(crimson::common::Gated *gate)
 {
   assert(seastar::this_shard_id() == sid);
   assert(is_socket_valid());
-  socket->shutdown();
+  is_socket_shutdown = true;
+  if constexpr (may_cross_core) {
+    assert(conn.get_messenger_shard_id() == sid);
+    assert(gate);
+    gate->dispatch_in_background("shutdown_socket", conn, [this] {
+      return seastar::smp::submit_to(
+          socket->get_shard_id(), [this] {
+        socket->shutdown();
+      });
+    });
+  } else {
+    assert(socket->get_shard_id() == sid);
+    assert(!gate);
+    socket->shutdown();
+  }
 }
+template void FrameAssemblerV2::shutdown_socket<true>(crimson::common::Gated *);
+template void FrameAssemblerV2::shutdown_socket<false>(crimson::common::Gated *);
 
 seastar::future<> FrameAssemblerV2::replace_shutdown_socket(SocketFRef &&new_socket)
 {
   assert(seastar::this_shard_id() == sid);
   assert(has_socket());
-  assert(socket->is_shutdown());
+  assert(!is_socket_valid());
   auto old_socket = move_socket();
+  auto old_socket_shard_id = old_socket->get_shard_id();
   set_socket(std::move(new_socket));
-  return old_socket->close(
-  ).then([sock = std::move(old_socket)] {});
+  return seastar::smp::submit_to(
+      old_socket_shard_id,
+      [old_socket = std::move(old_socket)]() mutable {
+    return old_socket->close(
+    ).then([sock = std::move(old_socket)] {});
+  });
 }
 
 seastar::future<> FrameAssemblerV2::close_shutdown_socket()
 {
   assert(seastar::this_shard_id() == sid);
   assert(has_socket());
-  assert(socket->is_shutdown());
-  return socket->close();
+  assert(!is_socket_valid());
+  return seastar::smp::submit_to(
+      socket->get_shard_id(), [this] {
+    return socket->close();
+  });
 }
 
 seastar::future<ceph::bufferptr>
index d31e9c4b78d86e10673fda574a2a91c29e273db8..0cc495574c2fe7ca1e4bfcdaaa15d506be1746b3 100644 (file)
@@ -7,6 +7,7 @@
 #include "msg/async/crypto_onwire.h"
 #include "msg/async/compression_onwire.h"
 
+#include "crimson/common/gated.h"
 #include "crimson/net/Socket.h"
 
 namespace crimson::net {
@@ -70,7 +71,9 @@ public:
 
   void learn_socket_ephemeral_port_as_connector(uint16_t port);
 
-  void shutdown_socket();
+  // if may_cross_core == true, gate is required for cross-core shutdown
+  template <bool may_cross_core>
+  void shutdown_socket(crimson::common::Gated *gate);
 
   seastar::future<> replace_shutdown_socket(SocketFRef &&);
 
@@ -141,6 +144,10 @@ private:
 
   SocketFRef socket;
 
+  // checking Socket::is_shutdown() synchronously is impossible when sid is
+  // different from the socket sid.
+  bool is_socket_shutdown = false;
+
   seastar::shard_id sid;
 
   /*
index ee8f1a99906f0bc0c565ce741a8e30690212f806..ae532acb08374c047b8e881d175cd678ebce28b7 100644 (file)
@@ -225,6 +225,7 @@ void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state, bool
       return io_handler.wait_io_exit_dispatching(
       ).then([this](FrameAssemblerV2Ref fa) {
         frame_assembler = std::move(fa);
+        ceph_assert_always(!frame_assembler->is_socket_valid());
         exit_io->set_value();
         exit_io = std::nullopt;
       });
@@ -283,7 +284,7 @@ void ProtocolV2::fault(
   if (likely(has_socket)) {
     if (likely(is_socket_valid)) {
       ceph_assert_always(state != state_t::READY);
-      frame_assembler->shutdown_socket();
+      frame_assembler->shutdown_socket<true>(&gate);
       is_socket_valid = false;
     } else {
       ceph_assert_always(state != state_t::ESTABLISHING);
@@ -899,7 +900,7 @@ void ProtocolV2::execute_connecting()
            case next_step_t::wait: {
             logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn);
             ceph_assert_always(is_socket_valid);
-            frame_assembler->shutdown_socket();
+            frame_assembler->shutdown_socket<true>(&gate);
             is_socket_valid = false;
             execute_wait(true);
             break;
@@ -1723,7 +1724,7 @@ void ProtocolV2::trigger_replacing(bool reconnect,
   ceph_assert_always(!mover.socket->is_shutdown());
   trigger_state(state_t::REPLACING, io_state_t::delay, false);
   if (is_socket_valid) {
-    frame_assembler->shutdown_socket();
+    frame_assembler->shutdown_socket<true>(&gate);
     is_socket_valid = false;
   }
   gate.dispatch_in_background(
@@ -1981,7 +1982,7 @@ void ProtocolV2::do_close(
     (*f_accept_new)();
   }
   if (is_socket_valid) {
-    frame_assembler->shutdown_socket();
+    frame_assembler->shutdown_socket<true>(&gate);
     is_socket_valid = false;
   }
   assert(!gate.is_closed());
index 70e6650c699cb44110509b356dbb9e77045a409b..0c0cdc76a2cdac54e6ae677f72a6b5252c911030 100644 (file)
@@ -203,7 +203,7 @@ void IOHandler::set_io_state(
     protocol_is_connected = false;
     assert(fa == nullptr);
     ceph_assert_always(frame_assembler->is_socket_valid());
-    frame_assembler->shutdown_socket();
+    frame_assembler->shutdown_socket<false>(nullptr);
     if (out_dispatching) {
       ceph_assert_always(!out_exit_dispatching.has_value());
       out_exit_dispatching = seastar::promise<>();