crimson/net: move close logic from Protocol to ProtocolV2
author     Yingxin Cheng <yingxin.cheng@intel.com>
           Thu, 3 Nov 2022 05:45:47 +0000 (13:45 +0800)
committer  Yingxin Cheng <yingxin.cheng@intel.com>
           Wed, 8 Feb 2023 06:07:41 +0000 (14:07 +0800)
Protocol class will be removed.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
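
Not part of this commit, but for context: a minimal Seastar sketch of the yield-before-close pattern that close_clean_yielded() relies on, so that do_close() only erases a connection from a container after seastar::parallel_for_each() has finished iterating over it. FakeConn, shutdown_all() and closed_fut below are illustrative stand-ins, not the real crimson::net types, and Seastar header locations may differ between versions.

// Illustrative sketch only; simplified stand-ins for crimson::net types.
#include <seastar/core/future.hh>
#include <seastar/core/later.hh>         // seastar::yield()
#include <seastar/core/loop.hh>          // seastar::parallel_for_each()
#include <seastar/core/shared_future.hh>
#include <memory>
#include <set>

struct FakeConn : std::enable_shared_from_this<FakeConn> {
  std::set<std::shared_ptr<FakeConn>>& registry;
  // resolved once the connection is fully closed; pre-resolved here to
  // keep the sketch short
  seastar::shared_future<> closed_fut{seastar::make_ready_future<>()};

  explicit FakeConn(std::set<std::shared_ptr<FakeConn>>& r) : registry(r) {}

  void do_close() {
    // synchronously unregisters the connection -- the step that must not
    // run while parallel_for_each() is still iterating over the registry
    registry.erase(shared_from_this());
  }

  seastar::future<> close_clean_yielded() {
    // yield() defers do_close() to a later task, so the erase happens only
    // after parallel_for_each() has finished walking the container
    return seastar::yield().then([this, ref = shared_from_this()] {
      do_close();
      return closed_fut.get_future();
    });
  }
};

// mirrors the shape of SocketMessenger::shutdown(): close all in parallel
seastar::future<> shutdown_all(std::set<std::shared_ptr<FakeConn>>& conns) {
  return seastar::parallel_for_each(conns, [](auto conn) {
    return conn->close_clean_yielded();
  });
}

In the actual patch, do_close() additionally handles dispatch_reset and the optional f_accept_new replacement callback; the sketch only captures the iteration-safety aspect explained in the comment inside close_clean_yielded() below.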
src/crimson/net/Protocol.cc
src/crimson/net/Protocol.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h
src/crimson/net/SocketMessenger.cc

index 27fd98f54aeb17ba2ccb943194463d1eac89e327..e38590a1aec86508ff503c3c7b69d057d6f9202b 100644 (file)
@@ -31,61 +31,6 @@ Protocol::~Protocol()
   assert(!out_exit_dispatching);
 }
 
-void Protocol::close(bool dispatch_reset,
-                     std::optional<std::function<void()>> f_accept_new)
-{
-  if (closed) {
-    // already closing
-    return;
-  }
-
-  bool is_replace = f_accept_new ? true : false;
-  logger().info("{} closing: reset {}, replace {}", conn,
-                dispatch_reset ? "yes" : "no",
-                is_replace ? "yes" : "no");
-
-  // atomic operations
-  closed = true;
-  trigger_close();
-  if (f_accept_new) {
-    (*f_accept_new)();
-  }
-  if (conn.socket) {
-    conn.socket->shutdown();
-  }
-  set_out_state(out_state_t::drop);
-  assert(!gate.is_closed());
-  auto gate_closed = gate.close();
-
-  if (dispatch_reset) {
-    dispatchers.ms_handle_reset(
-        seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()),
-        is_replace);
-  }
-
-  // asynchronous operations
-  assert(!close_ready.valid());
-  close_ready = std::move(gate_closed).then([this] {
-    if (conn.socket) {
-      return conn.socket->close();
-    } else {
-      return seastar::now();
-    }
-  }).then([this] {
-    logger().debug("{} closed!", conn);
-    on_closed();
-#ifdef UNIT_TESTS_BUILT
-    is_closed_clean = true;
-    if (conn.interceptor) {
-      conn.interceptor->register_conn_closed(conn);
-    }
-#endif
-  }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) {
-    logger().error("{} closing: close_ready got unexpected exception {}", conn, eptr);
-    ceph_abort();
-  });
-}
-
 ceph::bufferlist Protocol::sweep_out_pending_msgs_to_sent(
       size_t num_msgs,
       bool require_keepalive,
index c71b37f07c4d1a75a1e44ced2c1a2029bccb34ac..5260f05bae73aef89a91117723676bf433268a4a 100644 (file)
 namespace crimson::net {
 
 class Protocol {
+// public to SocketConnection
  public:
   Protocol(Protocol&&) = delete;
   virtual ~Protocol();
 
   virtual bool is_connected() const = 0;
 
+  virtual void close() = 0;
+
+  virtual seastar::future<> close_clean_yielded() = 0;
+
 #ifdef UNIT_TESTS_BUILT
-  bool is_closed_clean = false;
-  bool is_closed() const { return closed; }
-#endif
+  virtual bool is_closed_clean() const = 0;
 
-  // Reentrant closing
-  void close(bool dispatch_reset, std::optional<std::function<void()>> f_accept_new=std::nullopt);
-  seastar::future<> close_clean(bool dispatch_reset) {
-    // yield() so that close(dispatch_reset) can be called *after*
-    // close_clean() is applied to all connections in a container using
-    // seastar::parallel_for_each(). otherwise, we could erase a connection in
-    // the container when seastar::parallel_for_each() is still iterating in
-    // it. that'd lead to a segfault.
-    return seastar::yield(
-    ).then([this, dispatch_reset, conn_ref = conn.shared_from_this()] {
-      close(dispatch_reset);
-      // it can happen if close_clean() is called inside Dispatcher::ms_handle_reset()
-      // which will otherwise result in deadlock
-      assert(close_ready.valid());
-      return close_ready.get_future();
-    });
-  }
+  virtual bool is_closed() const = 0;
 
+#endif
   virtual void start_connect(const entity_addr_t& peer_addr,
                              const entity_name_t& peer_name) = 0;
 
@@ -56,8 +44,6 @@ class Protocol {
   Protocol(ChainedDispatchers& dispatchers,
            SocketConnection& conn);
 
-  virtual void trigger_close() = 0;
-
   virtual ceph::bufferlist do_sweep_messages(
       const std::deque<MessageURef>& msgs,
       size_t num_msgs,
@@ -67,13 +53,6 @@ class Protocol {
 
   virtual void notify_out() = 0;
 
-  virtual void on_closed() = 0;
-
- private:
-  bool closed = false;
-  // become valid only after closed == true
-  seastar::shared_future<> close_ready;
-
 // the write state-machine
  public:
   using clock_t = seastar::lowres_system_clock;
index 0c911afdc2729a05653c59bac4c888deef843564..794698b96e18d8242309dce19f0c27408c7ba8e5 100644 (file)
@@ -68,9 +68,9 @@ seastar::logger& logger() {
   throw std::system_error(make_error_code(crimson::net::error::protocol_aborted));
 }
 
-[[noreturn]] void abort_in_close(crimson::net::ProtocolV2& proto, bool dispatch_reset) {
-  proto.close(dispatch_reset);
-  abort_protocol();
+#define ABORT_IN_CLOSE(dispatch_reset) { \
+  do_close(dispatch_reset);              \
+  abort_protocol();                      \
 }
 
 inline void expect_tag(const Tag& expected,
@@ -377,7 +377,7 @@ void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr e
   if (conn.policy.lossy) {
     logger().info("{} {}: fault at {} on lossy channel, going to CLOSING -- {}",
                   conn, func_name, get_state_name(state), eptr);
-    close(true);
+    do_close(true);
   } else if (conn.policy.server ||
              (conn.policy.standby && !is_out_queued_or_sent())) {
     logger().info("{} {}: fault at {} with nothing to send, going to STANDBY -- {}",
@@ -486,13 +486,13 @@ ProtocolV2::banner_exchange(bool is_connect)
         logger().error("{} peer does not support all required features"
                        " required={} peer_supported={}",
                        conn, required_features, _peer_supported_features);
-        abort_in_close(*this, is_connect);
+        ABORT_IN_CLOSE(is_connect);
       }
       if ((supported_features & _peer_required_features) != _peer_required_features) {
         logger().error("{} we do not support all peer required features"
                        " peer_required={} supported={}",
                        conn, _peer_required_features, supported_features);
-        abort_in_close(*this, is_connect);
+        ABORT_IN_CLOSE(is_connect);
       }
       peer_supported_features = _peer_supported_features;
       bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
@@ -617,7 +617,7 @@ seastar::future<> ProtocolV2::client_auth(std::vector<uint32_t> &allowed_methods
     });
   } catch (const crimson::auth::error& e) {
     logger().error("{} get_initial_auth_request returned {}", conn, e.what());
-    abort_in_close(*this, true);
+    ABORT_IN_CLOSE(true);
     return seastar::now();
   }
 }
@@ -715,7 +715,7 @@ ProtocolV2::client_connect()
             logger().error("{} connection peer id ({}) does not match "
                            "what it should be ({}) during connecting, close",
                             conn, server_ident.gid(), conn.get_peer_id());
-            abort_in_close(*this, true);
+            ABORT_IN_CLOSE(true);
           }
           conn.set_peer_id(server_ident.gid());
           conn.set_features(server_ident.supported_features() &
@@ -865,7 +865,7 @@ void ProtocolV2::execute_connecting()
             logger().warn("{} connection peer type does not match what peer advertises {} != {}",
                           conn, ceph_entity_type_name(conn.get_peer_type()),
                           ceph_entity_type_name(_peer_type));
-            abort_in_close(*this, true);
+            ABORT_IN_CLOSE(true);
           }
           if (unlikely(state != state_t::CONNECTING)) {
             logger().debug("{} triggered {} during banner_exchange(), abort",
@@ -1090,7 +1090,7 @@ ProtocolV2::reuse_connection(
   // close this connection because all the necessary information is delivered
   // to the exisiting connection, and jump to error handling code to abort the
   // current state.
-  abort_in_close(*this, false);
+  ABORT_IN_CLOSE(false);
   return seastar::make_ready_future<next_step_t>(next_step_t::none);
 }
 
@@ -1520,7 +1520,7 @@ void ProtocolV2::execute_accepting()
         }).handle_exception([this] (std::exception_ptr eptr) {
           logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
                         conn, get_state_name(state), eptr);
-          close(false);
+          do_close(false);
         });
     });
 }
@@ -1580,7 +1580,7 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
 
   trigger_state(state_t::ESTABLISHING, out_state_t::delay, false);
   if (existing_conn) {
-    existing_conn->protocol->close(
+    static_cast<ProtocolV2*>(existing_conn->protocol.get())->do_close(
         true /* dispatch_reset */, std::move(accept_me));
     if (unlikely(state != state_t::ESTABLISHING)) {
       logger().warn("{} triggered {} during execute_establishing(), "
@@ -2083,19 +2083,59 @@ void ProtocolV2::execute_server_wait()
     }).handle_exception([this] (std::exception_ptr eptr) {
       logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}",
                     conn, get_state_name(state), eptr);
-      close(false);
+      do_close(false);
     });
   });
 }
 
 // CLOSING state
 
-void ProtocolV2::trigger_close()
+void ProtocolV2::close()
 {
+  do_close(false);
+}
+
+seastar::future<> ProtocolV2::close_clean_yielded()
+{
+  // yield() so that do_close() can be called *after* close_clean_yielded() is
+  // applied to all connections in a container using
+  // seastar::parallel_for_each(). otherwise, we could erase a connection in
+  // the container when seastar::parallel_for_each() is still iterating in it.
+  // that'd lead to a segfault.
+  return seastar::yield(
+  ).then([this, conn_ref = conn.shared_from_this()] {
+    do_close(false);
+    // it can happen if close_clean() is called inside Dispatcher::ms_handle_reset()
+    // which will otherwise result in deadlock
+    assert(closed_clean_fut.valid());
+    return closed_clean_fut.get_future();
+  });
+}
+
+void ProtocolV2::do_close(
+    bool dispatch_reset,
+    std::optional<std::function<void()>> f_accept_new)
+{
+  if (closed) {
+    // already closing
+    return;
+  }
+
+  bool is_replace = f_accept_new ? true : false;
+  logger().info("{} closing: reset {}, replace {}", conn,
+                dispatch_reset ? "yes" : "no",
+                is_replace ? "yes" : "no");
+
+  /*
+   * atomic operations
+   */
+
+  closed = true;
+
+  // trigger close
   messenger.closing_conn(
       seastar::static_pointer_cast<SocketConnection>(
         conn.shared_from_this()));
-
   if (state == state_t::ACCEPTING || state == state_t::SERVER_WAIT) {
     messenger.unaccept_conn(
       seastar::static_pointer_cast<SocketConnection>(
@@ -2108,16 +2148,48 @@ void ProtocolV2::trigger_close()
     // cannot happen
     ceph_assert(false);
   }
-
   protocol_timer.cancel();
   trigger_state(state_t::CLOSING, out_state_t::drop, false);
-}
 
-void ProtocolV2::on_closed()
-{
-  messenger.closed_conn(
-      seastar::static_pointer_cast<SocketConnection>(
-       conn.shared_from_this()));
+  if (f_accept_new) {
+    (*f_accept_new)();
+  }
+  if (conn.socket) {
+    conn.socket->shutdown();
+  }
+  assert(!gate.is_closed());
+  auto gate_closed = gate.close();
+
+  if (dispatch_reset) {
+    dispatchers.ms_handle_reset(
+        seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()),
+        is_replace);
+  }
+
+  // asynchronous operations
+  assert(!closed_clean_fut.valid());
+  closed_clean_fut = std::move(gate_closed).then([this] {
+    if (conn.socket) {
+      return conn.socket->close();
+    } else {
+      return seastar::now();
+    }
+  }).then([this] {
+    logger().debug("{} closed!", conn);
+    messenger.closed_conn(
+        seastar::static_pointer_cast<SocketConnection>(
+          conn.shared_from_this()));
+#ifdef UNIT_TESTS_BUILT
+    closed_clean = true;
+    if (conn.interceptor) {
+      conn.interceptor->register_conn_closed(conn);
+    }
+#endif
+  }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) {
+    logger().error("{} closing: closed_clean_fut got unexpected exception {}",
+                   conn, eptr);
+    ceph_abort();
+  });
 }
 
 void ProtocolV2::print_conn(std::ostream& out) const
index cfd5781ff0435ac75f246df82cc7811029232fcf..40c32136b705de2127b48c09e764c6a5a28984f3 100644 (file)
@@ -20,20 +20,34 @@ class ProtocolV2 final : public Protocol {
              SocketConnection& conn,
              SocketMessenger& messenger);
   ~ProtocolV2() override;
-  void print_conn(std::ostream&) const final;
 
+// public to SocketConnection, but private to the others
  private:
-  void on_closed() override;
   bool is_connected() const override;
 
+  void close() override;
+
+  seastar::future<> close_clean_yielded() override;
+
+#ifdef UNIT_TESTS_BUILT
+  bool is_closed_clean() const override {
+    return closed_clean;
+  }
+
+  bool is_closed() const override {
+    return closed;
+  }
+
+#endif
   void start_connect(const entity_addr_t& peer_addr,
                      const entity_name_t& peer_name) override;
 
   void start_accept(SocketRef&& socket,
                     const entity_addr_t& peer_addr) override;
 
-  void trigger_close() override;
+  void print_conn(std::ostream&) const final;
 
+ private:
   ceph::bufferlist do_sweep_messages(
       const std::deque<MessageURef>& msgs,
       size_t num_msgs,
@@ -48,6 +62,15 @@ class ProtocolV2 final : public Protocol {
 
   AuthConnectionMetaRef auth_meta;
 
+  bool closed = false;
+
+  // become valid only after closed == true
+  seastar::shared_future<> closed_clean_fut;
+
+#ifdef UNIT_TESTS_BUILT
+  bool closed_clean = false;
+
+#endif
   enum class state_t {
     NONE = 0,
     ACCEPTING,
@@ -229,6 +252,11 @@ class ProtocolV2 final : public Protocol {
 
   // SERVER_WAIT
   void execute_server_wait();
+
+  // CLOSING
+  // reentrant
+  void do_close(bool dispatch_reset,
+                std::optional<std::function<void()>> f_accept_new=std::nullopt);
 };
 
 } // namespace crimson::net
index bd7259c6e7c94babe8b95a9e1f74aec37285a466..9f989e21c2f7f55afae283b2944df4e2bd11fd5c 100644 (file)
@@ -57,7 +57,7 @@ bool SocketConnection::is_closed() const
 bool SocketConnection::is_closed_clean() const
 {
   assert(seastar::this_shard_id() == shard_id());
-  return protocol->is_closed_clean;
+  return protocol->is_closed_clean();
 }
 
 #endif
@@ -104,7 +104,7 @@ void SocketConnection::set_last_keepalive_ack(clock_t::time_point when)
 void SocketConnection::mark_down()
 {
   assert(seastar::this_shard_id() == shard_id());
-  protocol->close(false);
+  protocol->close();
 }
 
 void
@@ -122,9 +122,9 @@ SocketConnection::start_accept(SocketRef&& sock,
 }
 
 seastar::future<>
-SocketConnection::close_clean(bool dispatch_reset)
+SocketConnection::close_clean_yielded()
 {
-  return protocol->close_clean(dispatch_reset);
+  return protocol->close_clean_yielded();
 }
 
 seastar::shard_id SocketConnection::shard_id() const {
index ea18407e4591eb65574d3d2b8c355728a3ea38cd..76703c76337c4e90abdc5170e7094576cd006ab5 100644 (file)
@@ -120,7 +120,7 @@ class SocketConnection : public Connection {
   void start_accept(SocketRef&& socket,
                     const entity_addr_t& peer_addr);
 
-  seastar::future<> close_clean(bool dispatch_reset);
+  seastar::future<> close_clean_yielded();
 
   seastar::socket_address get_local_address() const;
 
index 6bfb0f341de29d857a11db30697e9908fccfa2c8..b5cb03932b84fd134692f16d08e1aa4e7f396c57 100644 (file)
@@ -262,16 +262,16 @@ seastar::future<> SocketMessenger::shutdown()
   // close all connections
   }).then([this] {
     return seastar::parallel_for_each(accepting_conns, [] (auto conn) {
-      return conn->close_clean(false);
+      return conn->close_clean_yielded();
     });
   }).then([this] {
     ceph_assert(accepting_conns.empty());
     return seastar::parallel_for_each(connections, [] (auto conn) {
-      return conn.second->close_clean(false);
+      return conn.second->close_clean_yielded();
     });
   }).then([this] {
     return seastar::parallel_for_each(closing_conns, [] (auto conn) {
-      return conn->close_clean(false);
+      return conn->close_clean_yielded();
     });
   }).then([this] {
     ceph_assert(connections.empty());