]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: refactor socket managements
authorYingxin Cheng <yingxin.cheng@intel.com>
Fri, 25 Nov 2022 02:08:52 +0000 (10:08 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 8 Feb 2023 06:07:41 +0000 (14:07 +0800)
Previously, the socket state is transparent to the protocol and
implicitly handled. Move the responsibilities into protocol for finer
controls to further decouple the IO and handshake.

Also, refactor the fault handling and make the in/out message
dispatching more symmetric.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/FrameAssemblerV2.cc
src/crimson/net/FrameAssemblerV2.h
src/crimson/net/Protocol.cc
src/crimson/net/Protocol.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h
src/crimson/net/Socket.cc
src/crimson/net/Socket.h

index cf41989746070b7c4f17895f4f27b79892f2bef7..034cf8ed4ca3a2228fa54d75671fa3a3a630fc58 100644 (file)
@@ -50,7 +50,7 @@ void FrameAssemblerV2::reset_handlers()
 FrameAssemblerV2::mover_t
 FrameAssemblerV2::to_replace()
 {
-  assert(has_socket());
+  assert(is_socket_valid());
   socket = nullptr;
   return mover_t{
       std::move(conn.socket),
@@ -58,14 +58,14 @@ FrameAssemblerV2::to_replace()
       std::move(session_comp_handlers)};
 }
 
-void FrameAssemblerV2::replace_by(FrameAssemblerV2::mover_t &&mover)
+seastar::future<> FrameAssemblerV2::replace_by(FrameAssemblerV2::mover_t &&mover)
 {
-  set_socket(std::move(mover.socket));
   record_io = false;
   rxbuf.clear();
   txbuf.clear();
   session_stream_handlers = std::move(mover.session_stream_handlers);
   session_comp_handlers = std::move(mover.session_comp_handlers);
+  return replace_shutdown_socket(std::move(mover.socket));
 }
 
 void FrameAssemblerV2::start_recording()
@@ -89,13 +89,17 @@ bool FrameAssemblerV2::has_socket() const
   return socket != nullptr;
 }
 
-void FrameAssemblerV2::set_socket(SocketRef &&_socket)
+bool FrameAssemblerV2::is_socket_valid() const
+{
+  return has_socket() && !socket->is_shutdown();
+}
+
+void FrameAssemblerV2::set_socket(SocketRef &&new_socket)
 {
   assert(!has_socket());
-  ceph_assert_always(!conn.socket);
-  socket = _socket.get();
-  conn.socket = std::move(_socket);
-  assert(has_socket());
+  socket = new_socket.get();
+  conn.socket = std::move(new_socket);
+  assert(is_socket_valid());
 }
 
 void FrameAssemblerV2::learn_socket_ephemeral_port_as_connector(uint16_t port)
@@ -106,23 +110,26 @@ void FrameAssemblerV2::learn_socket_ephemeral_port_as_connector(uint16_t port)
 
 void FrameAssemblerV2::shutdown_socket()
 {
-  if (has_socket()) {
-    socket->shutdown();
-  }
+  assert(is_socket_valid());
+  socket->shutdown();
 }
 
-seastar::future<> FrameAssemblerV2::reset_and_close_socket(bool do_reset)
+seastar::future<> FrameAssemblerV2::replace_shutdown_socket(SocketRef &&new_socket)
 {
-  if (!has_socket()) {
-    return seastar::now();
-  }
-  if (do_reset) {
-    socket = nullptr;
-    return conn.socket->close(
-    ).then([sock = std::move(conn.socket)] {});
-  } else {
-    return socket->close();
-  }
+  assert(has_socket());
+  assert(socket->is_shutdown());
+  socket = nullptr;
+  auto old_socket = std::move(conn.socket);
+  set_socket(std::move(new_socket));
+  return old_socket->close(
+  ).then([sock = std::move(old_socket)] {});
+}
+
+seastar::future<> FrameAssemblerV2::close_shutdown_socket()
+{
+  assert(has_socket());
+  assert(socket->is_shutdown());
+  return socket->close();
 }
 
 seastar::future<Socket::tmp_buf>
index f72eeeeae4588c91e923f1f3b9615fd021fd4076..b3ee3e03a3cb6dba4e6af7c4598b1adf5a12bcba 100644 (file)
@@ -43,7 +43,7 @@ public:
 
   mover_t to_replace();
 
-  void replace_by(mover_t &&);
+  seastar::future<> replace_by(mover_t &&);
 
   /*
    * auth signature interfaces
@@ -61,15 +61,15 @@ public:
    * socket maintainence interfaces
    */
 
-  bool has_socket() const;
-
   void set_socket(SocketRef &&);
 
   void learn_socket_ephemeral_port_as_connector(uint16_t port);
 
   void shutdown_socket();
 
-  seastar::future<> reset_and_close_socket(bool do_reset=true);
+  seastar::future<> replace_shutdown_socket(SocketRef &&);
+
+  seastar::future<> close_shutdown_socket();
 
   /*
    * socket read and write interfaces
@@ -115,6 +115,10 @@ public:
   }
 
 private:
+  bool has_socket() const;
+
+  bool is_socket_valid() const;
+
   void log_main_preamble(const ceph::bufferlist &bl);
 
   SocketConnection &conn;
index f8bb3b7e932ae1295f31271c3208515c44e5c67c..58edd882561d3cbee8fa939b9d2c24d6eb2ba21b 100644 (file)
@@ -242,8 +242,15 @@ seastar::future<> Protocol::do_out_dispatch()
                      conn, out_state, e);
       ceph_abort();
     }
-    ceph_assert_always(frame_assembler.has_socket());
-    frame_assembler.shutdown_socket();
+
+    std::exception_ptr eptr;
+    try {
+      throw e;
+    } catch(...) {
+      eptr = std::current_exception();
+    }
+    notify_out_fault(eptr);
+
     if (out_state == out_state_t::open) {
       logger().info("{} do_out_dispatch(): fault at {}, going to delay -- {}",
                     conn, out_state, e);
index c352683dcb79c03246016950cf51d61bb7d43296..c7bc71630b9574af5f957665fc62a5948d8ebb76 100644 (file)
@@ -54,6 +54,8 @@ class Protocol {
 
   virtual void notify_out() = 0;
 
+  virtual void notify_out_fault(std::exception_ptr) = 0;
+
 // the write state-machine
  public:
   using clock_t = seastar::lowres_system_clock;
index 083978dd96ff4eb640f8c0c794f80958dc3bc6c4..8514da381e8840e15888f8644fda175bec4526a0 100644 (file)
@@ -193,7 +193,6 @@ void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
                                const entity_name_t& _peer_name)
 {
   ceph_assert(state == state_t::NONE);
-  ceph_assert(!frame_assembler.has_socket());
   ceph_assert(!gate.is_closed());
   conn.peer_addr = _peer_addr;
   conn.target_addr = _peer_addr;
@@ -210,14 +209,15 @@ void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
   execute_connecting();
 }
 
-void ProtocolV2::start_accept(SocketRef&& sock,
+void ProtocolV2::start_accept(SocketRef&& new_socket,
                               const entity_addr_t& _peer_addr)
 {
   ceph_assert(state == state_t::NONE);
-  ceph_assert(!frame_assembler.has_socket());
   // until we know better
   conn.target_addr = _peer_addr;
-  frame_assembler.set_socket(std::move(sock));
+  frame_assembler.set_socket(std::move(new_socket));
+  has_socket = true;
+  is_socket_valid = true;
   logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr);
   messenger.accept_conn(
     seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
@@ -272,24 +272,103 @@ void ProtocolV2::trigger_state(state_t _state, out_state_t _out_state, bool reen
   set_out_state(_out_state);
 }
 
-void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr eptr)
+void ProtocolV2::fault(
+    state_t expected_state,
+    const char *where,
+    std::exception_ptr eptr)
 {
-  if (conn.policy.lossy) {
-    logger().info("{} {}: fault at {} on lossy channel, going to CLOSING -- {}",
-                  conn, func_name, get_state_name(state), eptr);
+  assert(expected_state == state_t::CONNECTING ||
+         expected_state == state_t::ESTABLISHING ||
+         expected_state == state_t::REPLACING ||
+         expected_state == state_t::READY);
+  const char *e_what;
+  try {
+    std::rethrow_exception(eptr);
+  } catch (std::exception &e) {
+    e_what = e.what();
+  }
+
+  if (state != expected_state) {
+    logger().info("{} protocol {} {} is aborted at inconsistent {} -- {}",
+                  conn,
+                  get_state_name(expected_state),
+                  where,
+                  get_state_name(state),
+                  e_what);
+#ifndef NDEBUG
+    if (expected_state == state_t::REPLACING) {
+      assert(state == state_t::CLOSING);
+    } else if (expected_state == state_t::READY) {
+      assert(state == state_t::CLOSING ||
+             state == state_t::REPLACING ||
+             state == state_t::CONNECTING ||
+             state == state_t::STANDBY);
+    } else {
+      assert(state == state_t::CLOSING ||
+             state == state_t::REPLACING);
+    }
+#endif
+    return;
+  }
+  assert(state == expected_state);
+
+  if (state != state_t::CONNECTING && conn.policy.lossy) {
+    // socket will be shutdown in do_close()
+    logger().info("{} protocol {} {} fault on lossy channel, going to CLOSING -- {}",
+                  conn, get_state_name(state), where, e_what);
     do_close(true);
-  } else if (conn.policy.server ||
-             (conn.policy.standby && !is_out_queued_or_sent())) {
-    logger().info("{} {}: fault at {} with nothing to send, going to STANDBY -- {}",
-                  conn, func_name, get_state_name(state), eptr);
+    return;
+  }
+
+  if (likely(has_socket)) {
+    if (likely(is_socket_valid)) {
+      frame_assembler.shutdown_socket();
+      is_socket_valid = false;
+    } else {
+      ceph_assert_always(state == state_t::CONNECTING ||
+                         state == state_t::REPLACING);
+    }
+  } else { // !has_socket
+    ceph_assert_always(state == state_t::CONNECTING);
+    assert(!is_socket_valid);
+  }
+
+  if (conn.policy.server ||
+      (conn.policy.standby && !is_out_queued_or_sent())) {
+    if (conn.policy.server) {
+      logger().info("{} protocol {} {} fault as server, going to STANDBY {} -- {}",
+                    conn,
+                    get_state_name(state),
+                    where,
+                    io_stat_printer{*this},
+                    e_what);
+    } else {
+      logger().info("{} protocol {} {} fault with nothing to send, going to STANDBY {} -- {}",
+                    conn,
+                    get_state_name(state),
+                    where,
+                    io_stat_printer{*this},
+                    e_what);
+    }
     execute_standby();
-  } else if (backoff) {
-    logger().info("{} {}: fault at {}, going to WAIT -- {}",
-                  conn, func_name, get_state_name(state), eptr);
+  } else if (state == state_t::CONNECTING ||
+             state == state_t::REPLACING) {
+    logger().info("{} protocol {} {} fault, going to WAIT {} -- {}",
+                  conn,
+                  get_state_name(state),
+                  where,
+                  io_stat_printer{*this},
+                  e_what);
     execute_wait(false);
   } else {
-    logger().info("{} {}: fault at {}, going to CONNECTING -- {}",
-                  conn, func_name, get_state_name(state), eptr);
+    assert(state == state_t::READY ||
+           state == state_t::ESTABLISHING);
+    logger().info("{} protocol {} {} fault, going to CONNECTING {} -- {}",
+                  conn,
+                  get_state_name(state),
+                  where,
+                  io_stat_printer{*this},
+                  e_what);
     execute_connecting();
   }
 }
@@ -725,8 +804,8 @@ ProtocolV2::client_reconnect()
 
 void ProtocolV2::execute_connecting()
 {
+  ceph_assert_always(!is_socket_valid);
   trigger_state(state_t::CONNECTING, out_state_t::delay, false);
-  frame_assembler.shutdown_socket();
   gated_execute("execute_connecting", [this] {
       global_seq = messenger.get_global_seq();
       assert(client_cookie != 0);
@@ -739,28 +818,39 @@ void ProtocolV2::execute_connecting()
         assert(server_cookie == 0);
         logger().debug("{} UPDATE: gs={} for connect", conn, global_seq);
       }
-      return wait_out_exit_dispatching().then([this] {
+      return seastar::when_all(
+        wait_out_exit_dispatching(),
+        wait_in_exit_dispatching()
+      ).discard_result().then([this] {
           if (unlikely(state != state_t::CONNECTING)) {
             logger().debug("{} triggered {} before Socket::connect()",
                            conn, get_state_name(state));
             abort_protocol();
           }
-          gate.dispatch_in_background(
-              "reset_close_socket_connecting",
-              *this,
-              [this] { return frame_assembler.reset_and_close_socket(); });
           INTERCEPT_N_RW(custom_bp_t::SOCKET_CONNECTING);
           return Socket::connect(conn.peer_addr);
-        }).then([this](SocketRef sock) {
+        }).then([this](SocketRef new_socket) {
           logger().debug("{} socket connected", conn);
           if (unlikely(state != state_t::CONNECTING)) {
             logger().debug("{} triggered {} during Socket::connect()",
                            conn, get_state_name(state));
-            return sock->close().then([sock = std::move(sock)] {
+            return new_socket->close().then([sock=std::move(new_socket)] {
               abort_protocol();
             });
           }
-          frame_assembler.set_socket(std::move(sock));
+          if (!has_socket) {
+            frame_assembler.set_socket(std::move(new_socket));
+            has_socket = true;
+          } else {
+            gate.dispatch_in_background(
+              "replace_socket_connecting",
+              *this,
+              [this, new_socket=std::move(new_socket)]() mutable {
+                return frame_assembler.replace_shutdown_socket(std::move(new_socket));
+              }
+            );
+          }
+          is_socket_valid = true;
           return seastar::now();
         }).then([this] {
           auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
@@ -817,6 +907,9 @@ void ProtocolV2::execute_connecting()
            }
            case next_step_t::wait: {
             logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn);
+            ceph_assert_always(is_socket_valid);
+            frame_assembler.shutdown_socket();
+            is_socket_valid = false;
             execute_wait(true);
             break;
            }
@@ -824,26 +917,8 @@ void ProtocolV2::execute_connecting()
             ceph_abort("impossible next step");
            }
           }
-        }).handle_exception([this] (std::exception_ptr eptr) {
-          if (state != state_t::CONNECTING) {
-            logger().info("{} execute_connecting(): protocol aborted at {} -- {}",
-                          conn, get_state_name(state), eptr);
-            assert(state == state_t::CLOSING ||
-                   state == state_t::REPLACING);
-            return;
-          }
-
-          if (conn.policy.server ||
-              (conn.policy.standby && !is_out_queued_or_sent())) {
-            logger().info("{} execute_connecting(): fault at {} with nothing to send,"
-                          " going to STANDBY -- {}",
-                          conn, get_state_name(state), eptr);
-            execute_standby();
-          } else {
-            logger().info("{} execute_connecting(): fault at {}, going to WAIT -- {}",
-                          conn, get_state_name(state), eptr);
-            execute_wait(false);
-          }
+        }).handle_exception([this](std::exception_ptr eptr) {
+          fault(state_t::CONNECTING, "execute_connecting", eptr);
         });
     });
 }
@@ -994,6 +1069,9 @@ ProtocolV2::reuse_connection(
                                     peer_supported_features,
                                     conn_seq,
                                     msg_seq);
+  ceph_assert_always(has_socket && is_socket_valid);
+  is_socket_valid = false;
+  has_socket = false;
 #ifdef UNIT_TESTS_BUILT
   if (conn.interceptor) {
     conn.interceptor->register_conn_replaced(conn);
@@ -1402,6 +1480,7 @@ ProtocolV2::server_reconnect()
 
 void ProtocolV2::execute_accepting()
 {
+  assert(is_socket_valid);
   trigger_state(state_t::ACCEPTING, out_state_t::none, false);
   gate.dispatch_in_background("execute_accepting", *this, [this] {
       return seastar::futurize_invoke([this] {
@@ -1513,6 +1592,7 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
         conn.shared_from_this()));
   };
 
+  ceph_assert_always(is_socket_valid);
   trigger_state(state_t::ESTABLISHING, out_state_t::delay, false);
   if (existing_conn) {
     static_cast<ProtocolV2*>(existing_conn->protocol.get())->do_close(
@@ -1550,15 +1630,8 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
                     client_cookie, server_cookie,
                     io_stat_printer{*this});
       execute_ready(false);
-    }).handle_exception([this] (std::exception_ptr eptr) {
-      if (state != state_t::ESTABLISHING) {
-        logger().info("{} execute_establishing() protocol aborted at {} -- {}",
-                      conn, get_state_name(state), eptr);
-        assert(state == state_t::CLOSING ||
-               state == state_t::REPLACING);
-        return;
-      }
-      fault(false, "execute_establishing()", eptr);
+    }).handle_exception([this](std::exception_ptr eptr) {
+      fault(state_t::ESTABLISHING, "execute_establishing", eptr);
     });
   });
 }
@@ -1622,7 +1695,12 @@ void ProtocolV2::trigger_replacing(bool reconnect,
                                    uint64_t new_msg_seq)
 {
   trigger_state(state_t::REPLACING, out_state_t::delay, false);
-  frame_assembler.shutdown_socket();
+  ceph_assert_always(has_socket);
+  ceph_assert_always(!mover.socket->is_shutdown());
+  if (is_socket_valid) {
+    frame_assembler.shutdown_socket();
+    is_socket_valid = false;
+  }
   gate.dispatch_in_background("trigger_replacing", *this,
                  [this,
                   reconnect,
@@ -1637,8 +1715,10 @@ void ProtocolV2::trigger_replacing(bool reconnect,
     dispatchers.ms_handle_accept(
         seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
     // state may become CLOSING, close mover.socket and abort later
-    return wait_out_exit_dispatching(
-    ).then([this] {
+    return seastar::when_all(
+      wait_out_exit_dispatching(),
+      wait_in_exit_dispatching()
+    ).discard_result().then([this] {
       protocol_timer.cancel();
       auto done = std::move(execution_done);
       execution_done = seastar::now();
@@ -1663,13 +1743,16 @@ void ProtocolV2::trigger_replacing(bool reconnect,
         });
       }
 
-      gate.dispatch_in_background(
-          "reset_close_socket_replacing",
-          *this,
-          [this] { return frame_assembler.reset_and_close_socket(); });
       auth_meta = std::move(new_auth_meta);
       peer_global_seq = new_peer_global_seq;
-      frame_assembler.replace_by(std::move(mover));
+      gate.dispatch_in_background(
+        "replace_frame_assembler",
+        *this,
+        [this, mover=std::move(mover)]() mutable {
+          return frame_assembler.replace_by(std::move(mover));
+        }
+      );
+      is_socket_valid = true;
 
       if (reconnect) {
         connect_seq = new_connect_seq;
@@ -1703,14 +1786,8 @@ void ProtocolV2::trigger_replacing(bool reconnect,
                     client_cookie, server_cookie,
                     io_stat_printer{*this});
       execute_ready(false);
-    }).handle_exception([this] (std::exception_ptr eptr) {
-      if (state != state_t::REPLACING) {
-        logger().info("{} trigger_replacing(): protocol aborted at {} -- {}",
-                      conn, get_state_name(state), eptr);
-        assert(state == state_t::CLOSING);
-        return;
-      }
-      fault(true, "trigger_replacing()", eptr);
+    }).handle_exception([this](std::exception_ptr eptr) {
+      fault(state_t::REPLACING, "trigger_replacing", eptr);
     });
   });
 }
@@ -1772,6 +1849,11 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
   return bl;
 }
 
+void ProtocolV2::notify_out_fault(std::exception_ptr eptr)
+{
+  fault(state_t::READY, "notify_out_fault", eptr);
+}
+
 seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp, std::size_t msg_size)
 {
   return frame_assembler.read_frame_payload(
@@ -1879,6 +1961,7 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp, std::size_t m
 
 void ProtocolV2::execute_ready(bool dispatch_connect)
 {
+  ceph_assert_always(is_socket_valid);
   assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
   trigger_state(state_t::READY, out_state_t::open, false);
   if (dispatch_connect) {
@@ -1895,7 +1978,9 @@ void ProtocolV2::execute_ready(bool dispatch_connect)
     conn.interceptor->register_conn_ready(conn);
   }
 #endif
-  gated_execute("execute_ready", [this] {
+  ceph_assert_always(!in_exit_dispatching.has_value());
+  in_exit_dispatching = seastar::shared_promise<>();
+  gate.dispatch_in_background("execute_ready", *this, [this] {
     protocol_timer.cancel();
     return seastar::keep_doing([this] {
       return read_main_preamble(
@@ -1965,15 +2050,12 @@ void ProtocolV2::execute_ready(bool dispatch_connect)
           }
         }
       });
-    }).handle_exception([this] (std::exception_ptr eptr) {
-      if (state != state_t::READY) {
-        logger().info("{} execute_ready(): protocol aborted at {} -- {}",
-                      conn, get_state_name(state), eptr);
-        assert(state == state_t::REPLACING ||
-               state == state_t::CLOSING);
-        return;
-      }
-      fault(false, "execute_ready()", eptr);
+    }).handle_exception([this](std::exception_ptr eptr) {
+      fault(state_t::READY, "execute_ready", eptr);
+    }).finally([this] {
+      ceph_assert_always(in_exit_dispatching.has_value());
+      in_exit_dispatching->set_value();
+      in_exit_dispatching = std::nullopt;
     });
   });
 }
@@ -1982,8 +2064,8 @@ void ProtocolV2::execute_ready(bool dispatch_connect)
 
 void ProtocolV2::execute_standby()
 {
+  ceph_assert_always(!is_socket_valid);
   trigger_state(state_t::STANDBY, out_state_t::delay, false);
-  frame_assembler.shutdown_socket();
 }
 
 void ProtocolV2::notify_out()
@@ -1999,8 +2081,8 @@ void ProtocolV2::notify_out()
 
 void ProtocolV2::execute_wait(bool max_backoff)
 {
+  ceph_assert_always(!is_socket_valid);
   trigger_state(state_t::WAIT, out_state_t::delay, false);
-  frame_assembler.shutdown_socket();
   gated_execute("execute_wait", [this, max_backoff] {
     double backoff = protocol_timer.last_dur();
     if (max_backoff) {
@@ -2031,6 +2113,7 @@ void ProtocolV2::execute_wait(bool max_backoff)
 
 void ProtocolV2::execute_server_wait()
 {
+  ceph_assert_always(is_socket_valid);
   trigger_state(state_t::SERVER_WAIT, out_state_t::none, false);
   gated_execute("execute_server_wait", [this] {
     return frame_assembler.read_exactly(1).then([this] (auto bl) {
@@ -2111,7 +2194,10 @@ void ProtocolV2::do_close(
   if (f_accept_new) {
     (*f_accept_new)();
   }
-  frame_assembler.shutdown_socket();
+  if (is_socket_valid) {
+    frame_assembler.shutdown_socket();
+    is_socket_valid = false;
+  }
   assert(!gate.is_closed());
   auto gate_closed = gate.close();
   auto out_closed = close_out();
@@ -2127,7 +2213,11 @@ void ProtocolV2::do_close(
   closed_clean_fut = seastar::when_all(
       std::move(gate_closed), std::move(out_closed)
   ).discard_result().then([this] {
-    return frame_assembler.reset_and_close_socket(false);
+    if (has_socket) {
+      return frame_assembler.close_shutdown_socket();
+    } else {
+      return seastar::now();
+    }
   }).then([this] {
     logger().debug("{} closed!", conn);
     messenger.closed_conn(
index d9216570b7ace11b846b863582e005399deec843..b75c008f0d5e11bd566dc77474fb1ee8b6bd09cf 100644 (file)
@@ -54,9 +54,16 @@ class ProtocolV2 final : public Protocol {
 
   void notify_out() override;
 
+  void notify_out_fault(std::exception_ptr) override;
+
  private:
   SocketMessenger &messenger;
 
+  bool has_socket = false;
+
+  // the socket exists and it is not shutdown
+  bool is_socket_valid = false;
+
   AuthConnectionMetaRef auth_meta;
 
   crimson::common::Gated gate;
@@ -108,6 +115,14 @@ class ProtocolV2 final : public Protocol {
   uint64_t peer_global_seq = 0;
   uint64_t connect_seq = 0;
 
+  std::optional<seastar::shared_promise<>> in_exit_dispatching;
+  seastar::future<> wait_in_exit_dispatching() {
+    if (in_exit_dispatching.has_value()) {
+      return in_exit_dispatching->get_shared_future();
+    }
+    return seastar::now();
+  }
+
   seastar::future<> execution_done = seastar::now();
 
   template <typename Func>
@@ -159,7 +174,9 @@ class ProtocolV2 final : public Protocol {
   template <class F>
   seastar::future<> write_flush_frame(F &tx_frame);
 
-  void fault(bool backoff, const char* func_name, std::exception_ptr eptr);
+  void fault(state_t expected_state,
+             const char *where,
+             std::exception_ptr eptr);
 
   void reset_session(bool is_full);
   seastar::future<std::tuple<entity_type_t, entity_addr_t>>
index d58fb3987b3562cf223873491844bb1c882d98dc..304f4dc16be824d48a939235b1906ad86b93ca94 100644 (file)
@@ -119,6 +119,7 @@ Socket::read_exactly(size_t bytes) {
 }
 
 void Socket::shutdown() {
+  socket_is_shutdown = true;
   socket.shutdown_input();
   socket.shutdown_output();
 }
@@ -210,7 +211,7 @@ seastar::future<> Socket::try_trap_post(bp_action_t& trap) {
     break;
    case bp_action_t::STALL:
     logger().info("[Test] got STALL and block");
-    shutdown();
+    force_shutdown();
     return blocker->block();
    default:
     ceph_abort("unexpected action from trap");
index a533d3180b2daaa68e876981a0633d83a5ae5677..b6125eb8a02a03a60fd39b0d79632e3cdc2f7d49 100644 (file)
@@ -42,6 +42,7 @@ class Socket
       // the default buffer size 8192 is too small that may impact our write
       // performance. see seastar::net::connected_socket::output()
       out(socket.output(65536)),
+      socket_is_shutdown(false),
       side(_side),
       ephemeral_port(e_port) {}
 
@@ -109,6 +110,10 @@ class Socket
 #endif
   }
 
+  bool is_shutdown() const {
+    return socket_is_shutdown;
+  }
+
   // preemptively disable further reads or writes, can only be shutdown once.
   void shutdown();
 
@@ -119,6 +124,12 @@ class Socket
 
   static void inject_failure();
 
+  // shutdown for tests
+  void force_shutdown() {
+    socket.shutdown_input();
+    socket.shutdown_output();
+  }
+
   // shutdown input_stream only, for tests
   void force_shutdown_in() {
     socket.shutdown_input();
@@ -155,6 +166,7 @@ class Socket
   seastar::connected_socket socket;
   seastar::input_stream<char> in;
   seastar::output_stream<char> out;
+  bool socket_is_shutdown;
   side_t side;
   uint16_t ephemeral_port;