]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: enforce strict reset/accept order
authorYingxin Cheng <yingxin.cheng@intel.com>
Thu, 12 Mar 2020 07:59:53 +0000 (15:59 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Fri, 20 Mar 2020 08:07:48 +0000 (16:07 +0800)
When a new connection tries to replace the old one, the event order
should be like:
1. reset(old);
2. accept(new);

This means we cannot just reschedule the reset event asynchronously. And
we still need to make sure the internal state is integral when reset.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/Protocol.cc
src/crimson/net/Protocol.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h
src/test/crimson/test_messenger.cc

index 9c51c900342fed726bce11ca008b389fba85f302..2183543efb4756e692dd3f923e7d31d2dc309339 100644 (file)
@@ -40,7 +40,8 @@ bool Protocol::is_connected() const
   return write_state == write_state_t::open;
 }
 
-void Protocol::close(bool dispatch_reset)
+void Protocol::close(bool dispatch_reset,
+                     std::optional<std::function<void()>> f_accept_new)
 {
   if (closed) {
     // already closing
@@ -64,12 +65,25 @@ void Protocol::close(bool dispatch_reset)
 
   // atomic operations
   trigger_close();
+  if (f_accept_new) {
+    (*f_accept_new)();
+  }
   if (socket) {
     socket->shutdown();
   }
   closed = true;
   set_write_state(write_state_t::drop);
   auto gate_closed = pending_dispatch.close();
+  auto reset_dispatched = seastar::futurize_apply([this, dispatch_reset] {
+    if (dispatch_reset) {
+      return dispatcher.ms_handle_reset(
+          seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+    }
+    return seastar::now();
+  }).handle_exception([this] (std::exception_ptr eptr) {
+    logger().error("{} ms_handle_reset caught exception: {}", conn, eptr);
+    ceph_abort("unexpected exception from ms_handle_reset()");
+  });
 
   // asynchronous operations
   close_ready = seastar::when_all_succeed(
@@ -79,20 +93,7 @@ void Protocol::close(bool dispatch_reset)
       }
       return seastar::now();
     }),
-    [this, dispatch_reset] {
-      if (dispatch_reset) {
-        // force ms_handle_reset() to be an asynchronous task to prevent
-        // internal state contamination.
-        return seastar::sleep(0s).then([this] {
-          return dispatcher.ms_handle_reset(
-              seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
-        }).handle_exception([this] (std::exception_ptr eptr) {
-          logger().error("{} ms_handle_reset caught exception: {}", conn, eptr);
-          ceph_abort("unexpected exception from ms_handle_reset()");
-        });
-      }
-      return seastar::now();
-    }
+    std::move(reset_dispatched)
   ).finally(std::move(cleanup));
 }
 
index df9a12aa45e76535185afe4b5f20822a444f7ccc..bb73746e12b7e1277dfef5a17de9fdbe1d2f1748 100644 (file)
@@ -30,7 +30,7 @@ class Protocol {
 #endif
 
   // Reentrant closing
-  void close(bool dispatch_reset);
+  void close(bool dispatch_reset, std::optional<std::function<void()>> f_accept_new=std::nullopt);
   seastar::future<> close_clean(bool dispatch_reset) {
     close(dispatch_reset);
     return close_ready.get_future();
index 5169f78b50c5cbe37b830073fa990878b270fccd..c5732554221c1174a1a25996d9cdc86bc802aa62 100644 (file)
@@ -1158,14 +1158,7 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
     logger().warn("{} server_connect:"
                   " existing connection {} is a lossy channel. Close existing in favor of"
                   " this connection", conn, *existing_conn);
-    existing_proto->close(true);
-
-    if (unlikely(state != state_t::ACCEPTING)) {
-      logger().debug("{} triggered {} in execute_accepting()",
-                     conn, get_state_name(state));
-      abort_protocol();
-    }
-    execute_establishing();
+    execute_establishing(existing_conn, true);
     return seastar::make_ready_future<next_step_t>(next_step_t::ready);
   }
 
@@ -1289,6 +1282,7 @@ ProtocolV2::server_connect()
 
     SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr);
 
+    bool dispatch_reset = true;
     if (existing_conn) {
       if (existing_conn->protocol->proto_type != proto_t::v2) {
         logger().warn("{} existing connection {} proto version is {}, close existing",
@@ -1296,18 +1290,13 @@ ProtocolV2::server_connect()
                       static_cast<int>(existing_conn->protocol->proto_type));
         // should unregister the existing from msgr atomically
         // NOTE: this is following async messenger logic, but we may miss the reset event.
-        (void) existing_conn->close();
+        dispatch_reset = false;
       } else {
         return handle_existing_connection(existing_conn);
       }
     }
 
-    if (unlikely(state != state_t::ACCEPTING)) {
-      logger().debug("{} triggered {} in execute_accepting()",
-                     conn, get_state_name(state));
-      abort_protocol();
-    }
-    execute_establishing();
+    execute_establishing(existing_conn, dispatch_reset);
     return seastar::make_ready_future<next_step_t>(next_step_t::ready);
   });
 }
@@ -1602,8 +1591,30 @@ seastar::future<> ProtocolV2::finish_auth()
 
 // ESTABLISHING
 
-void ProtocolV2::execute_establishing() {
+void ProtocolV2::execute_establishing(
+    SocketConnectionRef existing_conn, bool dispatch_reset) {
+  if (unlikely(state != state_t::ACCEPTING)) {
+    logger().debug("{} triggered {} before execute_establishing()",
+                   conn, get_state_name(state));
+    abort_protocol();
+  }
+
+  auto accept_me = [this] {
+    messenger.register_conn(
+      seastar::static_pointer_cast<SocketConnection>(
+        conn.shared_from_this()));
+    messenger.unaccept_conn(
+      seastar::static_pointer_cast<SocketConnection>(
+        conn.shared_from_this()));
+  };
+
   trigger_state(state_t::ESTABLISHING, write_state_t::delay, false);
+  if (existing_conn) {
+    existing_conn->protocol->close(dispatch_reset, std::move(accept_me));
+  } else {
+    accept_me();
+  }
+
   (void) seastar::with_gate(pending_dispatch, [this] {
     return dispatcher.ms_handle_accept(
         seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
@@ -1611,12 +1622,7 @@ void ProtocolV2::execute_establishing() {
     logger().error("{} ms_handle_accept caught exception: {}", conn, eptr);
     ceph_abort("unexpected exception from ms_handle_accept()");
   });
-  messenger.register_conn(
-    seastar::static_pointer_cast<SocketConnection>(
-      conn.shared_from_this()));
-  messenger.unaccept_conn(
-    seastar::static_pointer_cast<SocketConnection>(
-      conn.shared_from_this()));
+
   execution_done = seastar::with_gate(pending_dispatch, [this] {
     return seastar::futurize_apply([this] {
       return send_server_ident();
index 7a64d410942ea9690c5bf2c879e70b094c286b19..4aa7b276070900fc823e291f32dec5a4101e9671 100644 (file)
@@ -173,7 +173,7 @@ class ProtocolV2 final : public Protocol {
   seastar::future<> finish_auth();
 
   // ESTABLISHING
-  void execute_establishing();
+  void execute_establishing(SocketConnectionRef existing_conn, bool dispatch_reset);
 
   // ESTABLISHING/REPLACING (server)
   seastar::future<> send_server_ident();
index cc8e96856075d7da6c75d55ee9fdd43a138214f2..1f30f947e336dbf34dee88c0b9fd38827b48892f 100644 (file)
@@ -1375,6 +1375,7 @@ class FailoverSuitePeer : public Dispatcher {
   }
 
   seastar::future<> ms_handle_accept(ConnectionRef conn) override {
+    logger().info("[TestPeer] got accept from Test");
     ceph_assert(!tracked_conn ||
                 tracked_conn->is_closed() ||
                 tracked_conn == conn);
@@ -1383,6 +1384,7 @@ class FailoverSuitePeer : public Dispatcher {
   }
 
   seastar::future<> ms_handle_reset(ConnectionRef conn) override {
+    logger().info("[TestPeer] got reset from Test");
     ceph_assert(tracked_conn == conn);
     tracked_conn = nullptr;
     return seastar::now();