]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: add aborts when the state is inconsistent
authorYingxin Cheng <yingxin.cheng@intel.com>
Fri, 25 Nov 2022 01:51:50 +0000 (09:51 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 8 Feb 2023 06:07:41 +0000 (14:07 +0800)
To prevent unexpected event dispatching and state transitions.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/ProtocolV2.cc

index b4097c3f844683cbdad4dfcdfc6a851d63a11663..7028e158c25ee948dff219e13b355f696c8dde93 100644 (file)
@@ -259,7 +259,12 @@ void ProtocolV2::trigger_state(state_t _state, out_state_t _out_state, bool reen
   if (!reentrant && _state == state) {
     logger().error("{} is not allowed to re-trigger state {}",
                    conn, get_state_name(state));
-    ceph_assert(false);
+    ceph_abort();
+  }
+  if (state == state_t::CLOSING) {
+    logger().error("{} CLOSING is not allowed to trigger state {}",
+                   conn, get_state_name(_state));
+    ceph_abort();
   }
   logger().debug("{} TRIGGER {}, was {}",
                  conn, get_state_name(_state), get_state_name(state));
@@ -687,6 +692,11 @@ ProtocolV2::client_reconnect()
       case Tag::SESSION_RESET:
         return frame_assembler.read_frame_payload(
         ).then([this](auto payload) {
+          if (unlikely(state != state_t::CONNECTING)) {
+            logger().debug("{} triggered {} before reset_session()",
+                           conn, get_state_name(state));
+            abort_protocol();
+          }
           // handle_session_reset() logic
           auto reset = ResetFrame::Decode(payload->back());
           logger().warn("{} GOT ResetFrame: full={}", conn, reset.full());
@@ -967,6 +977,12 @@ ProtocolV2::reuse_connection(
     ProtocolV2* existing_proto, bool do_reset,
     bool reconnect, uint64_t conn_seq, uint64_t msg_seq)
 {
+  if (unlikely(state != state_t::ACCEPTING)) {
+    logger().debug("{} triggered {} before trigger_replacing()",
+                   conn, get_state_name(state));
+    abort_protocol();
+  }
+
   existing_proto->trigger_replacing(reconnect,
                                     do_reset,
                                     frame_assembler.to_replace(),
@@ -1036,6 +1052,11 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
     logger().warn("{} server_connect:"
                   " existing connection {} is a lossy channel. Close existing in favor of"
                   " this connection", conn, *existing_conn);
+    if (unlikely(state != state_t::ACCEPTING)) {
+      logger().debug("{} triggered {} before execute_establishing()",
+                     conn, get_state_name(state));
+      abort_protocol();
+    }
     execute_establishing(existing_conn);
     return seastar::make_ready_future<next_step_t>(next_step_t::ready);
   }
@@ -1166,6 +1187,11 @@ ProtocolV2::server_connect()
     if (existing_conn) {
       return handle_existing_connection(existing_conn);
     } else {
+      if (unlikely(state != state_t::ACCEPTING)) {
+        logger().debug("{} triggered {} before execute_establishing()",
+                       conn, get_state_name(state));
+        abort_protocol();
+      }
       execute_establishing(nullptr);
       return seastar::make_ready_future<next_step_t>(next_step_t::ready);
     }
@@ -1457,12 +1483,6 @@ seastar::future<> ProtocolV2::finish_auth()
 // ESTABLISHING
 
 void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
-  if (unlikely(state != state_t::ACCEPTING)) {
-    logger().debug("{} triggered {} before execute_establishing()",
-                   conn, get_state_name(state));
-    abort_protocol();
-  }
-
   auto accept_me = [this] {
     messenger.register_conn(
       seastar::static_pointer_cast<SocketConnection>(
@@ -1488,6 +1508,11 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
 
   dispatchers.ms_handle_accept(
       seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+  if (unlikely(state != state_t::ESTABLISHING)) {
+    logger().debug("{} triggered {} after ms_handle_accept() during execute_establishing()",
+                   conn, get_state_name(state));
+    abort_protocol();
+  }
 
   gated_execute("execute_establishing", [this] {
     return seastar::futurize_invoke([this] {
@@ -1726,6 +1751,12 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp, std::size_t m
 {
   return frame_assembler.read_frame_payload(
   ).then([this, throttle_stamp, msg_size](auto payload) {
+    if (unlikely(state != state_t::READY)) {
+      logger().debug("{} triggered {} during read_message()",
+                     conn, get_state_name(state));
+      abort_protocol();
+    }
+
     utime_t recv_stamp{seastar::lowres_system_clock::now()};
 
     // we need to get the size before std::moving segments data
@@ -1803,6 +1834,7 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp, std::size_t m
 
     // TODO: change MessageRef with seastar::shared_ptr
     auto msg_ref = MessageRef{message, false};
+    assert(state == state_t::READY);
     // throttle the reading process by the returned future
     return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
   });
@@ -1814,7 +1846,12 @@ void ProtocolV2::execute_ready(bool dispatch_connect)
   trigger_state(state_t::READY, out_state_t::open, false);
   if (dispatch_connect) {
     dispatchers.ms_handle_connect(
-       seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+      seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+    if (unlikely(state != state_t::READY)) {
+      logger().debug("{} triggered {} after ms_handle_connect() during execute_ready()",
+                     conn, get_state_name(state));
+      abort_protocol();
+    }
   }
 #ifdef UNIT_TESTS_BUILT
   if (conn.interceptor) {
@@ -2000,6 +2037,7 @@ void ProtocolV2::do_close(
 {
   if (closed) {
     // already closing
+    assert(state == state_t::CLOSING);
     return;
   }