]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: fix is_connected() to identify if handshake has completed 34332/head
authorYingxin Cheng <yingxin.cheng@intel.com>
Tue, 31 Mar 2020 08:07:13 +0000 (16:07 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 1 Apr 2020 03:49:08 +0000 (11:49 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/Protocol.cc
src/crimson/net/Protocol.h
src/crimson/net/ProtocolV1.cc
src/crimson/net/ProtocolV1.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h

index c95cf1d08b54734c9a302ea6684bdd8633866069..0cbbe4a49dbdae1f603e81c2d9eac1328a77020f 100644 (file)
@@ -35,11 +35,6 @@ Protocol::~Protocol()
   assert(!exit_open);
 }
 
-bool Protocol::is_connected() const
-{
-  return write_state == write_state_t::open;
-}
-
 void Protocol::close(bool dispatch_reset,
                      std::optional<std::function<void()>> f_accept_new)
 {
index a6c922c33f1314eba7e3490bf4388346f4c78859..290cb3fb8432b744b056411a7f3adea82537575c 100644 (file)
@@ -23,7 +23,7 @@ class Protocol {
   Protocol(Protocol&&) = delete;
   virtual ~Protocol();
 
-  bool is_connected() const;
+  virtual bool is_connected() const = 0;
 
 #ifdef UNIT_TESTS_BUILT
   bool is_closed_clean = false;
index a5a813a81a6f17544de897f2f628f298cf63ccbb..d0c677df9a55068637761ea23767b85c0d16accf 100644 (file)
@@ -131,6 +131,11 @@ ProtocolV1::ProtocolV1(Dispatcher& dispatcher,
 
 ProtocolV1::~ProtocolV1() {}
 
+bool ProtocolV1::is_connected() const
+{
+  return state == state_t::open;
+}
+
 // connecting state
 
 void ProtocolV1::reset_session()
@@ -368,12 +373,7 @@ void ProtocolV1::start_connect(const entity_addr_t& _peer_addr,
             return repeat_connect();
           });
         }).then([this] {
-          // notify the dispatcher and allow them to reject the connection
-          return dispatcher.ms_handle_connect(
-            seastar::static_pointer_cast<SocketConnection>(
-              conn.shared_from_this()));
-        }).then([this] {
-          execute_open();
+          execute_open(open_t::connected);
         }).handle_exception([this] (std::exception_ptr eptr) {
           // TODO: handle fault in the connecting state
           logger().warn("{} connecting fault: {}", conn, eptr);
@@ -656,16 +656,12 @@ void ProtocolV1::start_accept(SocketRef&& sock,
           return seastar::repeat([this] {
             return repeat_handle_connect();
           });
-        }).then([this] {
-          // notify the dispatcher and allow them to reject the connection
-          return dispatcher.ms_handle_accept(
-            seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
         }).then([this] {
           messenger.register_conn(
             seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
           messenger.unaccept_conn(
             seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
-          execute_open();
+          execute_open(open_t::accepted);
         }).handle_exception([this] (std::exception_ptr eptr) {
           // TODO: handle fault in the accepting state
           logger().warn("{} accepting fault: {}", conn, eptr);
@@ -890,12 +886,24 @@ seastar::future<> ProtocolV1::handle_tags()
     });
 }
 
-void ProtocolV1::execute_open()
+void ProtocolV1::execute_open(open_t type)
 {
   logger().trace("{} trigger open, was {}", conn, static_cast<int>(state));
   state = state_t::open;
   set_write_state(write_state_t::open);
 
+  if (type == open_t::connected) {
+    gated_dispatch("ms_handle_connect", [this] {
+      return dispatcher.ms_handle_connect(
+          seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+    });
+  } else { // type == open_t::accepted
+    gated_dispatch("ms_handle_accept", [this] {
+      return dispatcher.ms_handle_accept(
+          seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+    });
+  }
+
   gated_dispatch("execute_open", [this] {
       // start background processing of tags
       return handle_tags()
index 31a6ddc2eae213168c55b2aae7653c545fdacad8..8278230f779cb0a02005462a97e8e6003849aec3 100644 (file)
@@ -18,6 +18,8 @@ class ProtocolV1 final : public Protocol {
   ~ProtocolV1() override;
 
  private:
+  bool is_connected() const override;
+
   void start_connect(const entity_addr_t& peer_addr,
                      const entity_name_t& peer_name) override;
 
@@ -110,7 +112,12 @@ class ProtocolV1 final : public Protocol {
   seastar::future<> maybe_throttle();
   seastar::future<> read_message();
   seastar::future<> handle_tags();
-  void execute_open();
+
+  enum class open_t {
+    connected,
+    accepted
+  };
+  void execute_open(open_t type);
 
   // replacing
   // the number of connections initiated in this session, increment when a
index e823ece20e512d33cbb25d4fbaeee530d693edea..3f590e66b0d57a7113e18659fa298a992ea82cdf 100644 (file)
@@ -153,6 +153,12 @@ ProtocolV2::ProtocolV2(Dispatcher& dispatcher,
 
 ProtocolV2::~ProtocolV2() {}
 
+bool ProtocolV2::is_connected() const {
+  return state == state_t::READY ||
+         state == state_t::ESTABLISHING ||
+         state == state_t::REPLACING;
+}
+
 void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
                                const entity_name_t& _peer_name)
 {
@@ -937,17 +943,13 @@ void ProtocolV2::execute_connecting()
           }
           switch (next) {
            case next_step_t::ready: {
-            gated_dispatch("ms_handle_connect", [this] {
-              return dispatcher.ms_handle_connect(
-                  seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
-            });
             logger().info("{} connected:"
                           " gs={}, pgs={}, cs={}, client_cookie={},"
                           " server_cookie={}, in_seq={}, out_seq={}, out_q={}",
                           conn, global_seq, peer_global_seq, connect_seq,
                           client_cookie, server_cookie, conn.in_seq,
                           conn.out_seq, conn.out_q.size());
-            execute_ready();
+            execute_ready(true);
             break;
            }
            case next_step_t::wait: {
@@ -1669,7 +1671,7 @@ void ProtocolV2::execute_establishing(
                     conn, global_seq, peer_global_seq, connect_seq,
                     client_cookie, server_cookie, conn.in_seq,
                     conn.out_seq, conn.out_q.size());
-      execute_ready();
+      execute_ready(false);
     }).handle_exception([this] (std::exception_ptr eptr) {
       if (state != state_t::ESTABLISHING) {
         logger().info("{} execute_establishing() protocol aborted at {} -- {}",
@@ -1822,7 +1824,7 @@ void ProtocolV2::trigger_replacing(bool reconnect,
                     conn, reconnect ? "reconnected" : "connected",
                     global_seq, peer_global_seq, connect_seq, client_cookie,
                     server_cookie, conn.in_seq, conn.out_seq, conn.out_q.size());
-      execute_ready();
+      execute_ready(false);
     }).handle_exception([this] (std::exception_ptr eptr) {
       if (state != state_t::REPLACING) {
         logger().info("{} trigger_replacing(): protocol aborted at {} -- {}",
@@ -1985,10 +1987,16 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
   });
 }
 
-void ProtocolV2::execute_ready()
+void ProtocolV2::execute_ready(bool dispatch_connect)
 {
   assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
   trigger_state(state_t::READY, write_state_t::open, false);
+  if (dispatch_connect) {
+    gated_dispatch("ms_handle_connect", [this] {
+      return dispatcher.ms_handle_connect(
+          seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+    });
+  }
 #ifdef UNIT_TESTS_BUILT
   if (conn.interceptor) {
     conn.interceptor->register_conn_ready(conn);
index 5d3a6742181205500dc98131647190cafdee6b30..53d27a3603f8daf558705506e2f7bdd134ebf099 100644 (file)
@@ -19,6 +19,8 @@ class ProtocolV2 final : public Protocol {
   ~ProtocolV2() override;
 
  private:
+  bool is_connected() const override;
+
   void start_connect(const entity_addr_t& peer_addr,
                      const entity_name_t& peer_name) override;
 
@@ -204,7 +206,7 @@ class ProtocolV2 final : public Protocol {
 
   // READY
   seastar::future<> read_message(utime_t throttle_stamp);
-  void execute_ready();
+  void execute_ready(bool dispatch_connect);
 
   // STANDBY
   void execute_standby();