]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/net: move mark_down() from ProtocolV2 to Protocol
authorYingxin Cheng <yingxin.cheng@intel.com>
Tue, 6 Dec 2022 01:15:28 +0000 (09:15 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 8 Feb 2023 06:07:41 +0000 (14:07 +0800)
Process mark_down in Protocol rather than in ProtocolV2 to prevent
further event dispatching after mark_down is called by user. Then notify
ProtocolV2 as the IO/socket core and handshake core can be different and
the notification can be asynchronous.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/Protocol.cc
src/crimson/net/Protocol.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h
src/crimson/net/SocketConnection.cc

index 12985bc7e001f0ce260340a006276d5fec4ca5a3..b38f72539b5a3032a43afe0590e0201e593b727f 100644 (file)
@@ -138,6 +138,34 @@ seastar::future<> Protocol::send_keepalive()
   return seastar::now();
 }
 
+void Protocol::mark_down()
+{
+  ceph_assert_always(out_state != out_state_t::none);
+  need_dispatch_reset = false;
+  if (out_state == out_state_t::drop) {
+    return;
+  }
+
+  logger().info("{} mark_down() with {}",
+                conn, io_stat_printer{*this});
+  set_out_state(out_state_t::drop);
+  notify_mark_down();
+}
+
+void Protocol::print_io_stat(std::ostream &out) const
+{
+  out << "io_stat("
+      << "out_state=" << fmt::format("{}", out_state)
+      << ", in_seq=" << in_seq
+      << ", out_seq=" << out_seq
+      << ", out_pending_msgs_size=" << out_pending_msgs.size()
+      << ", out_sent_msgs_size=" << out_sent_msgs.size()
+      << ", need_ack=" << (ack_left > 0)
+      << ", need_keepalive=" << need_keepalive
+      << ", need_keepalive_ack=" << bool(next_keepalive_ack)
+      << ")";
+}
+
 void Protocol::set_out_state(
     const Protocol::out_state_t &new_state,
     FrameAssemblerV2Ref fa)
@@ -274,6 +302,9 @@ void Protocol::reset_out()
 
 void Protocol::dispatch_accept()
 {
+  if (out_state == out_state_t::drop) {
+    return;
+  }
   // protocol_is_connected can be from true to true here if the replacing is
   // happening to a connected connection.
   protocol_is_connected = true;
@@ -283,6 +314,9 @@ void Protocol::dispatch_accept()
 
 void Protocol::dispatch_connect()
 {
+  if (out_state == out_state_t::drop) {
+    return;
+  }
   ceph_assert_always(protocol_is_connected == false);
   protocol_is_connected = true;
   dispatchers.ms_handle_connect(
@@ -291,6 +325,11 @@ void Protocol::dispatch_connect()
 
 void Protocol::dispatch_reset(bool is_replace)
 {
+  ceph_assert_always(out_state == out_state_t::drop);
+  if (!need_dispatch_reset) {
+    return;
+  }
+  need_dispatch_reset = false;
   dispatchers.ms_handle_reset(
     seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()),
     is_replace);
@@ -298,6 +337,9 @@ void Protocol::dispatch_reset(bool is_replace)
 
 void Protocol::dispatch_remote_reset()
 {
+  if (out_state == out_state_t::drop) {
+    return;
+  }
   dispatchers.ms_handle_remote_reset(
     seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
 }
index 4446e1781fbb1eeb4737f9ccaeadcd58494942d9..e6e6c956ea7e80cd10ce2db17146d8e98c3e2aed 100644 (file)
@@ -21,8 +21,6 @@ class Protocol {
   Protocol(Protocol&&) = delete;
   virtual ~Protocol();
 
-  virtual void close() = 0;
-
   virtual seastar::future<> close_clean_yielded() = 0;
 
 #ifdef UNIT_TESTS_BUILT
@@ -47,6 +45,8 @@ class Protocol {
 
   virtual void notify_out_fault(const char *where, std::exception_ptr) = 0;
 
+  virtual void notify_mark_down() = 0;
+
 // the write state-machine
  public:
   using clock_t = seastar::lowres_system_clock;
@@ -71,24 +71,17 @@ class Protocol {
     last_keepalive_ack = when;
   }
 
+  void mark_down();
+
   struct io_stat_printer {
     const Protocol &protocol;
   };
-  void print_io_stat(std::ostream &out) const {
-    out << "io_stat("
-        << "in_seq=" << in_seq
-        << ", out_seq=" << out_seq
-        << ", out_pending_msgs_size=" << out_pending_msgs.size()
-        << ", out_sent_msgs_size=" << out_sent_msgs.size()
-        << ", need_ack=" << (ack_left > 0)
-        << ", need_keepalive=" << need_keepalive
-        << ", need_keepalive_ack=" << bool(next_keepalive_ack)
-        << ")";
-  }
+  void print_io_stat(std::ostream &out) const;
 
 // TODO: encapsulate a SessionedSender class
  protected:
   seastar::future<> close_out() {
+    ceph_assert_always(out_state == out_state_t::drop);
     assert(!gate.is_closed());
     return gate.close();
   }
@@ -172,6 +165,8 @@ class Protocol {
 
   bool protocol_is_connected = false;
 
+  bool need_dispatch_reset = true;
+
   /*
    * out states for writing
    */
index 76e21de048463e2a3fe260b464f6dc3f94e86bc0..13be01e32ded9021647114c340aac9f441b5d275 100644 (file)
@@ -1862,7 +1862,7 @@ void ProtocolV2::execute_server_wait()
 
 // CLOSING state
 
-void ProtocolV2::close()
+void ProtocolV2::notify_mark_down()
 {
   do_close(false);
 }
index ec4d26468139fd84550990336c2023d3e9ce8bff..807c63de5038ef553a15e8fa24892f43686bf2dc 100644 (file)
@@ -19,8 +19,6 @@ class ProtocolV2 final : public Protocol {
 
 // public to SocketConnection, but private to the others
  private:
-  void close() override;
-
   seastar::future<> close_clean_yielded() override;
 
 #ifdef UNIT_TESTS_BUILT
@@ -46,6 +44,8 @@ class ProtocolV2 final : public Protocol {
 
   void notify_out_fault(const char *, std::exception_ptr) override;
 
+  void notify_mark_down() override;
+
   seastar::future<> wait_exit_io() {
     if (exit_io.has_value()) {
       return exit_io->get_shared_future();
index e4a2d7d789b6cdf3eee88a840c17145b48d17f8b..5b3d806ed7e9a66bda0304362cd898beb3100b75 100644 (file)
@@ -104,7 +104,7 @@ void SocketConnection::set_last_keepalive_ack(clock_t::time_point when)
 void SocketConnection::mark_down()
 {
   assert(seastar::this_shard_id() == shard_id());
-  protocol->close();
+  protocol->mark_down();
 }
 
 void