git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: close() with ms_handle_reset()
author: Yingxin Cheng <yingxin.cheng@intel.com>
Thu, 12 Mar 2020 04:45:38 +0000 (12:45 +0800)
committer: Yingxin Cheng <yingxin.cheng@intel.com>
Fri, 20 Mar 2020 08:07:48 +0000 (16:07 +0800)
* ms_handle_reset() should not be able to contaminate the messenger's
internal state while it is being updated atomically, so make it an
asynchronous event dispatched along with close();

* add is_closed_clean() for the messenger unit tests, because the reset
event now happens after the connection is closed.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/Connection.h
src/crimson/net/Protocol.cc
src/crimson/net/Protocol.h
src/crimson/net/ProtocolV1.cc
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h
src/test/crimson/test_messenger.cc

index d43d61b699e6488b2999e2415e2d59626d80e8d8..3beb8f42bb5698c491aa8eba5ee9ba961b692fd4 100644 (file)
@@ -95,6 +95,8 @@ class Connection : public seastar::enable_shared_from_this<Connection> {
 #ifdef UNIT_TESTS_BUILT
   virtual bool is_closed() const = 0;
 
+  virtual bool is_closed_clean() const = 0;
+
   virtual bool peer_wins() const = 0;
 #endif
 
index bf2633c1c221444988580273351ff48fb44b53c8..9c51c900342fed726bce11ca008b389fba85f302 100644 (file)
@@ -7,6 +7,7 @@
 
 #include "crimson/common/log.h"
 #include "crimson/net/Errors.h"
+#include "crimson/net/Dispatcher.h"
 #include "crimson/net/Socket.h"
 #include "crimson/net/SocketConnection.h"
 #include "msg/Message.h"
@@ -39,37 +40,60 @@ bool Protocol::is_connected() const
   return write_state == write_state_t::open;
 }
 
-seastar::future<> Protocol::close()
+void Protocol::close(bool dispatch_reset)
 {
   if (closed) {
     // already closing
     assert(close_ready.valid());
-    return close_ready.get_future();
+    return;
   }
 
   // unregister_conn() drops a reference, so hold another until completion
   auto cleanup = [conn_ref = conn.shared_from_this(), this] {
       logger().debug("{} closed!", conn);
+#ifdef UNIT_TESTS_BUILT
+      is_closed_clean = true;
+      if (conn.interceptor) {
+        conn.interceptor->register_conn_closed(conn);
+      }
+#endif
     };
 
-  trigger_close();
-
   // close_ready become valid only after state is state_t::closing
   assert(!close_ready.valid());
 
+  // atomic operations
+  trigger_close();
   if (socket) {
     socket->shutdown();
-    close_ready = pending_dispatch.close().finally([this] {
-      return socket->close();
-    }).finally(std::move(cleanup));
-  } else {
-    close_ready = pending_dispatch.close().finally(std::move(cleanup));
   }
-
   closed = true;
   set_write_state(write_state_t::drop);
+  auto gate_closed = pending_dispatch.close();
 
-  return close_ready.get_future();
+  // asynchronous operations
+  close_ready = seastar::when_all_succeed(
+    std::move(gate_closed).finally([this] {
+      if (socket) {
+        return socket->close();
+      }
+      return seastar::now();
+    }),
+    [this, dispatch_reset] {
+      if (dispatch_reset) {
+        // force ms_handle_reset() to be an asynchronous task to prevent
+        // internal state contamination.
+        return seastar::sleep(0s).then([this] {
+          return dispatcher.ms_handle_reset(
+              seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+        }).handle_exception([this] (std::exception_ptr eptr) {
+          logger().error("{} ms_handle_reset caught exception: {}", conn, eptr);
+          ceph_abort("unexpected exception from ms_handle_reset()");
+        });
+      }
+      return seastar::now();
+    }
+  ).finally(std::move(cleanup));
 }
 
 seastar::future<> Protocol::send(MessageRef msg)
index 4df2549c37c9a4c51dde917d111fc4daf58699dc..df9a12aa45e76535185afe4b5f20822a444f7ccc 100644 (file)
@@ -24,10 +24,17 @@ class Protocol {
 
   bool is_connected() const;
 
+#ifdef UNIT_TESTS_BUILT
+  bool is_closed_clean = false;
   bool is_closed() const { return closed; }
+#endif
 
   // Reentrant closing
-  seastar::future<> close();
+  void close(bool dispatch_reset);
+  seastar::future<> close_clean(bool dispatch_reset) {
+    close(dispatch_reset);
+    return close_ready.get_future();
+  }
 
   virtual void start_connect(const entity_addr_t& peer_addr,
                              const entity_type_t& peer_type) = 0;
index 3eac0d2c924312a4e7b82015901206de077ba537..becb7d6812563b83076b90e0190c3e4dddea316f 100644 (file)
@@ -377,7 +377,7 @@ void ProtocolV1::start_connect(const entity_addr_t& _peer_addr,
         }).handle_exception([this] (std::exception_ptr eptr) {
           // TODO: handle fault in the connecting state
           logger().warn("{} connecting fault: {}", conn, eptr);
-          (void) close();
+          close(false);
         });
     });
 }
@@ -663,7 +663,7 @@ void ProtocolV1::start_accept(SocketRef&& sock,
         }).handle_exception([this] (std::exception_ptr eptr) {
           // TODO: handle fault in the accepting state
           logger().warn("{} accepting fault: {}", conn, eptr);
-          (void) close();
+          close(false);
         });
     });
 }
@@ -901,16 +901,13 @@ void ProtocolV1::execute_open()
           logger().warn("{} open fault: {}", conn, e);
           if (e.code() == error::protocol_aborted ||
               e.code() == std::errc::connection_reset) {
-            return dispatcher.ms_handle_reset(
-                seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()))
-              .then([this] {
-                (void) close();
-              });
+            close(true);
+            return seastar::now();
           } else if (e.code() == error::read_eof) {
             return dispatcher.ms_handle_remote_reset(
                 seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()))
               .then([this] {
-                (void) close();
+                close(false);
               });
           } else {
             throw e;
@@ -918,7 +915,7 @@ void ProtocolV1::execute_open()
         }).handle_exception([this] (std::exception_ptr eptr) {
           // TODO: handle fault in the open state
           logger().warn("{} open fault: {}", conn, eptr);
-          (void) close();
+          close(false);
         });
     });
 }
index befbaa5862e4516e5cf90c80876df41ff037eb1e..8a42e89d01b2595885b9692a9fb50e226c85f3fc 100644 (file)
@@ -62,8 +62,8 @@ void abort_protocol() {
   throw std::system_error(make_error_code(crimson::net::error::protocol_aborted));
 }
 
-void abort_in_close(crimson::net::ProtocolV2& proto) {
-  (void) proto.close();
+void abort_in_close(crimson::net::ProtocolV2& proto, bool dispatch_reset) {
+  proto.close(dispatch_reset);
   abort_protocol();
 }
 
@@ -413,8 +413,7 @@ void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr e
   if (conn.policy.lossy) {
     logger().info("{} {}: fault at {} on lossy channel, going to CLOSING -- {}",
                   conn, func_name, get_state_name(state), eptr);
-    dispatch_reset();
-    (void) close();
+    close(true);
   } else if (conn.policy.server ||
              (conn.policy.standby &&
               (!is_queued() && conn.sent.empty()))) {
@@ -432,17 +431,6 @@ void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr e
   }
 }
 
-void ProtocolV2::dispatch_reset()
-{
-  (void) seastar::with_gate(pending_dispatch, [this] {
-    return dispatcher.ms_handle_reset(
-        seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
-  }).handle_exception([this] (std::exception_ptr eptr) {
-    logger().error("{} ms_handle_reset caught exception: {}", conn, eptr);
-    ceph_abort("unexpected exception from ms_handle_reset()");
-  });
-}
-
 void ProtocolV2::reset_session(bool full)
 {
   server_cookie = 0;
@@ -538,13 +526,13 @@ seastar::future<entity_type_t, entity_addr_t> ProtocolV2::banner_exchange()
         logger().error("{} peer does not support all required features"
                        " required={} peer_supported={}",
                        conn, required_features, peer_supported_features);
-        abort_in_close(*this);
+        abort_in_close(*this, false);
       }
       if ((supported_features & peer_required_features) != peer_required_features) {
         logger().error("{} we do not support all peer required features"
                        " peer_required={} supported={}",
                        conn, peer_required_features, supported_features);
-        abort_in_close(*this);
+        abort_in_close(*this, false);
       }
       this->peer_required_features = peer_required_features;
       if (this->peer_required_features == 0) {
@@ -668,8 +656,7 @@ seastar::future<> ProtocolV2::client_auth(std::vector<uint32_t> &allowed_methods
     });
   } catch (const crimson::auth::error& e) {
     logger().error("{} get_initial_auth_request returned {}", conn, e);
-    dispatch_reset();
-    abort_in_close(*this);
+    abort_in_close(*this, true);
     return seastar::now();
   }
 }
@@ -915,8 +902,7 @@ void ProtocolV2::execute_connecting()
             logger().warn("{} connection peer type does not match what peer advertises {} != {}",
                           conn, ceph_entity_type_name(conn.get_peer_type()),
                           ceph_entity_type_name(_peer_type));
-            dispatch_reset();
-            abort_in_close(*this);
+            abort_in_close(*this, true);
           }
           conn.set_ephemeral_port(_my_addr_from_peer.get_port(),
                                   SocketConnection::side_t::connector);
@@ -1128,7 +1114,7 @@ ProtocolV2::reuse_connection(
   // close this connection because all the necessary information is delivered
   // to the exisiting connection, and jump to error handling code to abort the
   // current state.
-  abort_in_close(*this);
+  abort_in_close(*this, false);
   return seastar::make_ready_future<next_step_t>(next_step_t::none);
 }
 
@@ -1172,8 +1158,7 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
     logger().warn("{} server_connect:"
                   " existing connection {} is a lossy channel. Close existing in favor of"
                   " this connection", conn, *existing_conn);
-    existing_proto->dispatch_reset();
-    (void) existing_proto->close();
+    existing_proto->close(true);
 
     if (unlikely(state != state_t::ACCEPTING)) {
       logger().debug("{} triggered {} in execute_accepting()",
@@ -1573,7 +1558,7 @@ void ProtocolV2::execute_accepting()
         }).handle_exception([this] (std::exception_ptr eptr) {
           logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
                         conn, get_state_name(state), eptr);
-          (void) close();
+          close(false);
         });
     });
 }
@@ -2116,7 +2101,7 @@ void ProtocolV2::execute_server_wait()
     }).handle_exception([this] (std::exception_ptr eptr) {
       logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}",
                     conn, get_state_name(state), eptr);
-      (void) close();
+      close(false);
     });
   });
 }
@@ -2141,11 +2126,6 @@ void ProtocolV2::trigger_close()
   protocol_timer.cancel();
 
   trigger_state(state_t::CLOSING, write_state_t::drop, false);
-#ifdef UNIT_TESTS_BUILT
-  if (conn.interceptor) {
-    conn.interceptor->register_conn_closed(conn);
-  }
-#endif
 }
 
 } // namespace crimson::net
index 0e8f2ff90e823f2d8f73ba96bec5034ac7499297..54db2722d752b032ab2c3615f403e2bcbbdedc4d 100644 (file)
@@ -124,7 +124,6 @@ class ProtocolV2 final : public Protocol {
 
  private:
   void fault(bool backoff, const char* func_name, std::exception_ptr eptr);
-  void dispatch_reset();
   void reset_session(bool full);
   seastar::future<entity_type_t, entity_addr_t> banner_exchange();
 
index 4a73034e922628f29c92423be9e7a312cf49cd9b..5bf2c30c4a8614a11eaaf157175e21af4af947c9 100644 (file)
@@ -63,6 +63,12 @@ bool SocketConnection::is_closed() const
   return protocol->is_closed();
 }
 
+bool SocketConnection::is_closed_clean() const
+{
+  assert(seastar::engine().cpu_id() == shard_id());
+  return protocol->is_closed_clean;
+}
+
 #endif
 bool SocketConnection::peer_wins() const
 {
@@ -84,7 +90,7 @@ seastar::future<> SocketConnection::keepalive()
 seastar::future<> SocketConnection::close()
 {
   assert(seastar::engine().cpu_id() == shard_id());
-  return protocol->close();
+  return protocol->close_clean(false);
 }
 
 bool SocketConnection::update_rx_seq(seq_num_t seq)
index 503d4e55fb049819b0902715ba6d50b348f2d930..de814c9418b413953854068d89086f45523c1e06 100644 (file)
@@ -77,6 +77,8 @@ class SocketConnection : public Connection {
   bool is_connected() const override;
 
 #ifdef UNIT_TESTS_BUILT
+  bool is_closed_clean() const override;
+
   bool is_closed() const override;
 
   bool peer_wins() const override;
index 68d6da744dd0684f9fa3e10ddb13acfe717268a2..cc8e96856075d7da6c75d55ee9fdd43a138214f2 100644 (file)
@@ -932,7 +932,7 @@ class FailoverSuite : public Dispatcher {
     unsigned pending_establish = 0;
     unsigned replaced_conns = 0;
     for (auto& result : interceptor.results) {
-      if (result.conn->is_closed()) {
+      if (result.conn->is_closed_clean()) {
         if (result.state == conn_state_t::replaced) {
           ++replaced_conns;
         }