]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: notify and update io_states from io_handler to protocol asynchronously
authorYingxin Cheng <yingxin.cheng@intel.com>
Wed, 31 May 2023 08:57:41 +0000 (16:57 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Sun, 25 Jun 2023 03:57:19 +0000 (11:57 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h
src/crimson/net/io_handler.cc
src/crimson/net/io_handler.h

index 1ba97617f58aa012823bc0b9090f83d3bc75c71f..99a48536de5abdebc9b37b07cb6be6e91f6977a8 100644 (file)
@@ -23,8 +23,6 @@
 
 using namespace ceph::msgr::v2;
 using crimson::common::local_conf;
-using io_state_t = crimson::net::IOHandler::io_state_t;
-using io_stat_printer = crimson::net::IOHandler::io_stat_printer;
 
 namespace {
 
@@ -148,7 +146,9 @@ ProtocolV2::ProtocolV2(SocketConnection& conn,
     frame_assembler{FrameAssemblerV2::create(conn)},
     auth_meta{seastar::make_lw_shared<AuthConnectionMeta>()},
     protocol_timer{conn}
-{}
+{
+  io_states = io_handler.get_states();
+}
 
 ProtocolV2::~ProtocolV2() {}
 
@@ -207,13 +207,21 @@ void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state, bool
     ceph_assert_always(!exit_io.has_value());
     exit_io = seastar::shared_promise<>();
   }
+
+  bool need_notify_out;
+  if (new_state == state_t::STANDBY && !conn.policy.server) {
+    need_notify_out = true;
+  } else {
+    need_notify_out = false;
+  }
+
   state = new_state;
   if (new_state == state_t::READY) {
     // I'm not responsible to shutdown the socket at READY
     is_socket_valid = false;
-    io_handler.set_io_state(new_io_state, std::move(frame_assembler));
+    io_handler.set_io_state(new_io_state, std::move(frame_assembler), need_notify_out);
   } else {
-    io_handler.set_io_state(new_io_state, nullptr);
+    io_handler.set_io_state(new_io_state, nullptr, need_notify_out);
   }
 
   /*
@@ -223,9 +231,10 @@ void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state, bool
   if (pre_state == state_t::READY) {
     gate.dispatch_in_background("exit_io", conn, [this] {
       return io_handler.wait_io_exit_dispatching(
-      ).then([this](FrameAssemblerV2Ref fa) {
-        frame_assembler = std::move(fa);
+      ).then([this](auto ret) {
+        frame_assembler = std::move(ret.frame_assembler);
         ceph_assert_always(!frame_assembler->is_socket_valid());
+        io_states = ret.io_states;
         exit_io->set_value();
         exit_io = std::nullopt;
       });
@@ -295,20 +304,20 @@ void ProtocolV2::fault(
   }
 
   if (conn.policy.server ||
-      (conn.policy.standby && !io_handler.is_out_queued_or_sent())) {
+      (conn.policy.standby && !io_states.is_out_queued_or_sent())) {
     if (conn.policy.server) {
       logger().info("{} protocol {} {} fault as server, going to STANDBY {} -- {}",
                     conn,
                     get_state_name(state),
                     where,
-                    io_stat_printer{io_handler},
+                    io_states,
                     e_what);
     } else {
       logger().info("{} protocol {} {} fault with nothing to send, going to STANDBY {} -- {}",
                     conn,
                     get_state_name(state),
                     where,
-                    io_stat_printer{io_handler},
+                    io_states,
                     e_what);
     }
     execute_standby();
@@ -318,7 +327,7 @@ void ProtocolV2::fault(
                   conn,
                   get_state_name(state),
                   where,
-                  io_stat_printer{io_handler},
+                  io_states,
                   e_what);
     execute_wait(false);
   } else {
@@ -328,7 +337,7 @@ void ProtocolV2::fault(
                   conn,
                   get_state_name(state),
                   where,
-                  io_stat_printer{io_handler},
+                  io_states,
                   e_what);
     execute_connecting();
   }
@@ -342,6 +351,7 @@ void ProtocolV2::reset_session(bool full)
     client_cookie = generate_client_cookie();
     peer_global_seq = 0;
   }
+  io_states.reset_session(full);
   io_handler.reset_session(full);
 }
 
@@ -623,6 +633,7 @@ ProtocolV2::client_connect()
         return frame_assembler->read_frame_payload(
         ).then([this](auto payload) {
           // handle_server_ident() logic
+          io_states.requeue_out_sent();
           io_handler.requeue_out_sent();
           auto server_ident = ServerIdentFrame::Decode(payload->back());
           logger().debug("{} GOT ServerIdentFrame:"
@@ -699,12 +710,12 @@ ProtocolV2::client_reconnect()
                                           server_cookie,
                                           global_seq,
                                           connect_seq,
-                                          io_handler.get_in_seq());
+                                          io_states.in_seq);
   logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={},"
                  " server_cookie={}, gs={}, cs={}, in_seq={}",
                  conn, messenger.get_myaddrs(),
                  client_cookie, server_cookie,
-                 global_seq, connect_seq, io_handler.get_in_seq());
+                 global_seq, connect_seq, io_states.in_seq);
   return frame_assembler->write_flush_frame(reconnect).then([this] {
     return frame_assembler->read_main_preamble();
   }).then([this](auto ret) {
@@ -754,6 +765,7 @@ ProtocolV2::client_reconnect()
           auto reconnect_ok = ReconnectOkFrame::Decode(payload->back());
           logger().debug("{} GOT ReconnectOkFrame: msg_seq={}",
                          conn, reconnect_ok.msg_seq());
+          io_states.requeue_out_sent_up_to();
           io_handler.requeue_out_sent_up_to(reconnect_ok.msg_seq());
           return seastar::make_ready_future<next_step_t>(next_step_t::ready);
         });
@@ -886,8 +898,7 @@ void ProtocolV2::execute_connecting()
             logger().info("{} connected: gs={}, pgs={}, cs={}, "
                           "client_cookie={}, server_cookie={}, {}",
                           conn, global_seq, peer_global_seq, connect_seq,
-                          client_cookie, server_cookie,
-                          io_stat_printer{io_handler});
+                          client_cookie, server_cookie, io_states);
             io_handler.dispatch_connect();
             if (unlikely(state != state_t::CONNECTING)) {
               logger().debug("{} triggered {} after ms_handle_connect(), abort",
@@ -1653,8 +1664,7 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
       logger().info("{} established: gs={}, pgs={}, cs={}, "
                     "client_cookie={}, server_cookie={}, {}",
                     conn, global_seq, peer_global_seq, connect_seq,
-                    client_cookie, server_cookie,
-                    io_stat_printer{io_handler});
+                    client_cookie, server_cookie, io_states);
       execute_ready();
     }).handle_exception([this](std::exception_ptr eptr) {
       fault(state_t::ESTABLISHING, "execute_establishing", eptr);
@@ -1674,6 +1684,7 @@ ProtocolV2::send_server_ident()
   logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq);
 
   // this is required for the case when this connection is being replaced
+  io_states.reset_peer_state();
   io_handler.reset_peer_state();
 
   if (!conn.policy.lossy) {
@@ -1783,9 +1794,10 @@ void ProtocolV2::trigger_replacing(bool reconnect,
       if (reconnect) {
         connect_seq = new_connect_seq;
         // send_reconnect_ok() logic
+        io_states.requeue_out_sent_up_to();
         io_handler.requeue_out_sent_up_to(new_msg_seq);
-        auto reconnect_ok = ReconnectOkFrame::Encode(io_handler.get_in_seq());
-        logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, io_handler.get_in_seq());
+        auto reconnect_ok = ReconnectOkFrame::Encode(io_states.in_seq);
+        logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, io_states.in_seq);
         return frame_assembler->write_flush_frame(reconnect_ok);
       } else {
         client_cookie = new_client_cookie;
@@ -1809,8 +1821,7 @@ void ProtocolV2::trigger_replacing(bool reconnect,
                     "client_cookie={}, server_cookie={}, {}",
                     conn, reconnect ? "reconnected" : "connected",
                     global_seq, peer_global_seq, connect_seq,
-                    client_cookie, server_cookie,
-                    io_stat_printer{io_handler});
+                    client_cookie, server_cookie, io_states);
       execute_ready();
     }).handle_exception([this](std::exception_ptr eptr) {
       fault(state_t::REPLACING, "trigger_replacing", eptr);
@@ -1820,8 +1831,12 @@ void ProtocolV2::trigger_replacing(bool reconnect,
 
 // READY state
 
-void ProtocolV2::notify_out_fault(const char *where, std::exception_ptr eptr)
+void ProtocolV2::notify_out_fault(
+    const char *where,
+    std::exception_ptr eptr,
+    io_handler_state _io_states)
 {
+  io_states = _io_states;
   fault(state_t::READY, where, eptr);
 }
 
@@ -1843,6 +1858,7 @@ void ProtocolV2::execute_standby()
 
 void ProtocolV2::notify_out()
 {
+  io_states.is_out_queued = true;
   if (unlikely(state == state_t::STANDBY && !conn.policy.server)) {
     logger().info("{} notify_out(): at {}, going to CONNECTING",
                   conn, get_state_name(state));
index f81ffdbfbc69a81c1eeac59630294dfb7ed31a20..2aa9496ef83e529da026235f38aa61976f00cb38 100644 (file)
@@ -30,7 +30,7 @@ public:
 private:
   void notify_out() final;
 
-  void notify_out_fault(const char *, std::exception_ptr) final;
+  void notify_out_fault(const char *where, std::exception_ptr, io_handler_state) final;
 
   void notify_mark_down() final;
 
@@ -57,6 +57,8 @@ public:
 
 #endif
 private:
+  using io_state_t = IOHandler::io_state_t;
+
   seastar::future<> wait_exit_io() {
     if (exit_io.has_value()) {
       return exit_io->get_shared_future();
@@ -92,7 +94,7 @@ private:
     return statenames[static_cast<int>(state)];
   }
 
-  void trigger_state(state_t state, IOHandler::io_state_t io_state, bool reentrant);
+  void trigger_state(state_t state, io_state_t io_state, bool reentrant);
 
   template <typename Func, typename T>
   void gated_execute(const char *what, T &who, Func &&func) {
@@ -215,6 +217,9 @@ private:
 
   IOHandler &io_handler;
 
+  // asynchronously populated from io_handler
+  io_handler_state io_states;
+
   bool has_socket = false;
 
   // the socket exists and it is not shutdown
index 952158c5d31cd222f7e65bf5b5884a32517cd1ea..cbd16013e0708078f1d86fc9929995c3a11606f3 100644 (file)
@@ -78,7 +78,7 @@ ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent(
   }
 
   if (require_ack && num_msgs == 0u) {
-    auto ack_frame = AckFrame::Encode(get_in_seq());
+    auto ack_frame = AckFrame::Encode(in_seq);
     bl.append(frame_assembler->get_buffer(ack_frame));
   }
 
@@ -101,7 +101,7 @@ ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent(
                              header.type,       header.priority,
                              header.version,
                              ceph_le32(0),      header.data_off,
-                             ceph_le64(get_in_seq()),
+                             ceph_le64(in_seq),
                              footer.flags,      header.compat_version,
                              header.reserved};
 
@@ -172,8 +172,9 @@ void IOHandler::print_io_stat(std::ostream &out) const
 }
 
 void IOHandler::set_io_state(
-    const IOHandler::io_state_t &new_state,
-    FrameAssemblerV2Ref fa)
+    io_state_t new_state,
+    FrameAssemblerV2Ref fa,
+    bool set_notify_out)
 {
   ceph_assert_always(!(
     (new_state == io_state_t::none && io_state != io_state_t::none) ||
@@ -212,6 +213,16 @@ void IOHandler::set_io_state(
     assert(fa == nullptr);
   }
 
+  if (new_state == io_state_t::delay) {
+    need_notify_out = set_notify_out;
+    if (need_notify_out) {
+      maybe_notify_out_dispatch();
+    }
+  } else {
+    assert(set_notify_out == false);
+    need_notify_out = false;
+  }
+
   if (io_state != new_state) {
     io_state = new_state;
     io_state_changed.set_value();
@@ -227,7 +238,8 @@ void IOHandler::set_io_state(
   }
 }
 
-seastar::future<FrameAssemblerV2Ref> IOHandler::wait_io_exit_dispatching()
+seastar::future<IOHandler::exit_dispatching_ret>
+IOHandler::wait_io_exit_dispatching()
 {
   ceph_assert_always(io_state != io_state_t::open);
   ceph_assert_always(frame_assembler != nullptr);
@@ -248,7 +260,9 @@ seastar::future<FrameAssemblerV2Ref> IOHandler::wait_io_exit_dispatching()
       }
     }()
   ).discard_result().then([this] {
-    return std::move(frame_assembler);
+    return exit_dispatching_ret{
+      std::move(frame_assembler),
+      get_states()};
   });
 }
 
@@ -289,7 +303,7 @@ void IOHandler::requeue_out_sent()
       std::make_move_iterator(out_sent_msgs.begin()),
       std::make_move_iterator(out_sent_msgs.end()));
   out_sent_msgs.clear();
-  notify_out_dispatch();
+  maybe_notify_out_dispatch();
 }
 
 void IOHandler::requeue_out_sent_up_to(seq_num_t seq)
@@ -487,7 +501,9 @@ seastar::future<> IOHandler::do_out_dispatch()
         eptr = std::current_exception();
       }
       set_io_state(io_state_t::delay);
-      handshake_listener->notify_out_fault("do_out_dispatch", eptr);
+      auto states = get_states();
+      handshake_listener->notify_out_fault(
+          "do_out_dispatch", eptr, states);
     } else {
       logger().info("{} do_out_dispatch(): fault at {} -- {}",
                     conn, io_state, e.what());
@@ -497,9 +513,18 @@ seastar::future<> IOHandler::do_out_dispatch()
   });
 }
 
+void IOHandler::maybe_notify_out_dispatch()
+{
+  if (is_out_queued()) {
+    notify_out_dispatch();
+  }
+}
+
 void IOHandler::notify_out_dispatch()
 {
-  handshake_listener->notify_out();
+  if (need_notify_out) {
+    handshake_listener->notify_out();
+  }
   if (out_dispatching) {
     // already dispatching
     return;
@@ -587,7 +612,7 @@ IOHandler::read_message(utime_t throttle_stamp, std::size_t msg_size)
     // client side queueing because messages can't be renumbered, but the (kernel)
     // client will occasionally pull a message out of the sent queue to send
     // elsewhere.  in that case it doesn't matter if we "got" it or not.
-    uint64_t cur_seq = get_in_seq();
+    uint64_t cur_seq = in_seq;
     if (message->get_seq() <= cur_seq) {
       logger().error("{} got old message {} <= {} {}, discarding",
                      conn, message->get_seq(), cur_seq, *message);
@@ -726,7 +751,9 @@ void IOHandler::do_in_dispatch()
         logger().info("{} do_in_dispatch(): fault at {}, going to delay -- {}",
                       conn, io_state, e_what);
         set_io_state(io_state_t::delay);
-        handshake_listener->notify_out_fault("do_in_dispatch", eptr);
+        auto states = get_states();
+        handshake_listener->notify_out_fault(
+            "do_in_dispatch", eptr, states);
       } else {
         logger().info("{} do_in_dispatch(): fault at {} -- {}",
                       conn, io_state, e_what);
index 2b48c8ab170128e05e59aa1f2b72c20aafc82c72..db82de5160ec89ab60d28d1c6ab2eb2c30e9f9f5 100644 (file)
 
 namespace crimson::net {
 
+/**
+ * io_handler_state
+ *
+ * It is required to populate the states from IOHandler to ProtocolV2
+ * asynchronously.
+ */
+struct io_handler_state {
+  seq_num_t in_seq;
+  bool is_out_queued;
+  bool has_out_sent;
+
+  bool is_out_queued_or_sent() const {
+    return is_out_queued || has_out_sent;
+  }
+
+  /*
+   * should be consistent with the accroding interfaces in IOHandler
+   */
+
+  void reset_session(bool full) {
+    in_seq = 0;
+    if (full) {
+      is_out_queued = false;
+      has_out_sent = false;
+    }
+  }
+
+  void reset_peer_state() {
+    in_seq = 0;
+    is_out_queued = is_out_queued_or_sent();
+    has_out_sent = false;
+  }
+
+  void requeue_out_sent_up_to() {
+    // noop since the information is insufficient
+  }
+
+  void requeue_out_sent() {
+    if (has_out_sent) {
+      has_out_sent = false;
+      is_out_queued = true;
+    }
+  }
+};
+
 /**
  * HandshakeListener
  *
- * The interface class for IOHandler to notify the ProtocolV2 for handshake.
+ * The interface class for IOHandler to notify the ProtocolV2.
  *
  * The notifications may be cross-core and asynchronous.
  */
@@ -30,7 +75,10 @@ public:
 
   virtual void notify_out() = 0;
 
-  virtual void notify_out_fault(const char *where, std::exception_ptr) = 0;
+  virtual void notify_out_fault(
+      const char *where,
+      std::exception_ptr,
+      io_handler_state) = 0;
 
   virtual void notify_mark_down() = 0;
 
@@ -102,6 +150,10 @@ public:
     handshake_listener = &hl;
   }
 
+  io_handler_state get_states() const {
+    return {in_seq, is_out_queued(), has_out_sent()};
+  }
+
   struct io_stat_printer {
     const IOHandler &io_handler;
   };
@@ -137,9 +189,16 @@ public:
   };
   friend class fmt::formatter<io_state_t>;
 
-  void set_io_state(const io_state_t &new_state, FrameAssemblerV2Ref fa=nullptr);
+  void set_io_state(
+      io_state_t new_state,
+      FrameAssemblerV2Ref fa = nullptr,
+      bool set_notify_out = false);
 
-  seastar::future<FrameAssemblerV2Ref> wait_io_exit_dispatching();
+  struct exit_dispatching_ret {
+    FrameAssemblerV2Ref frame_assembler;
+    io_handler_state io_states;
+  };
+  seastar::future<exit_dispatching_ret> wait_io_exit_dispatching();
 
   void reset_session(bool full);
 
@@ -149,14 +208,6 @@ public:
 
   void requeue_out_sent();
 
-  bool is_out_queued_or_sent() const {
-    return is_out_queued() || !out_sent_msgs.empty();
-  }
-
-  seq_num_t get_in_seq() const {
-    return in_seq;
-  }
-
   void dispatch_accept();
 
   void dispatch_connect();
@@ -188,6 +239,8 @@ public:
       std::optional<utime_t> maybe_keepalive_ack,
       bool require_ack);
 
+  void maybe_notify_out_dispatch();
+
   void notify_out_dispatch();
 
   void ack_out_sent(seq_num_t seq);
@@ -244,6 +297,8 @@ private:
 
   uint64_t ack_left = 0;
 
+  bool need_notify_out = false;
+
   /*
    * in states for reading
    */
@@ -266,6 +321,23 @@ inline std::ostream& operator<<(
 
 } // namespace crimson::net
 
+template <>
+struct fmt::formatter<crimson::net::io_handler_state> {
+  constexpr auto parse(format_parse_context& ctx) {
+    return ctx.begin();
+  }
+
+  template <typename FormatContext>
+  auto format(crimson::net::io_handler_state state, FormatContext& ctx) {
+    return fmt::format_to(
+        ctx.out(),
+        "io(in_seq={}, is_out_queued={}, has_out_sent={})",
+        state.in_seq,
+        state.is_out_queued,
+        state.has_out_sent);
+  }
+};
+
 template <>
 struct fmt::formatter<crimson::net::IOHandler::io_state_t>
   : fmt::formatter<std::string_view> {