]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/net: move message write path from ProtocolV2 to Protocol
authorYingxin Cheng <yingxin.cheng@intel.com>
Thu, 1 Dec 2022 02:34:56 +0000 (10:34 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 8 Feb 2023 06:07:41 +0000 (14:07 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/FrameAssemblerV2.h
src/crimson/net/Protocol.cc
src/crimson/net/Protocol.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h

index 5cea92440fe223ee9c76c4079509850d4ab90977..3165a048fc19dc4293f41998aeb927a36bc64152 100644 (file)
@@ -61,6 +61,9 @@ public:
    * socket maintainence interfaces
    */
 
+  // the socket exists and not shutdown
+  bool is_socket_valid() const;
+
   void set_socket(SocketRef &&);
 
   void learn_socket_ephemeral_port_as_connector(uint16_t port);
@@ -120,8 +123,6 @@ public:
 private:
   bool has_socket() const;
 
-  bool is_socket_valid() const;
-
   void log_main_preamble(const ceph::bufferlist &bl);
 
 #ifdef UNIT_TESTS_BUILT
index 58edd882561d3cbee8fa939b9d2c24d6eb2ba21b..3ec265d4b2338445c4d7d34216a06166a589945b 100644 (file)
@@ -9,14 +9,17 @@
 #include "crimson/net/Errors.h"
 #include "crimson/net/chained_dispatchers.h"
 #include "crimson/net/SocketConnection.h"
+#include "crimson/net/SocketMessenger.h"
 #include "msg/Message.h"
 
 namespace {
-  seastar::logger& logger() {
-    return crimson::get_logger(ceph_subsys_ms);
-  }
+
+seastar::logger& logger() {
+  return crimson::get_logger(ceph_subsys_ms);
 }
 
+} // namespace anonymous
+
 namespace crimson::net {
 
 Protocol::Protocol(ChainedDispatchers& dispatchers,
@@ -33,16 +36,58 @@ Protocol::~Protocol()
 }
 
 ceph::bufferlist Protocol::sweep_out_pending_msgs_to_sent(
-      size_t num_msgs,
-      bool require_keepalive,
-      std::optional<utime_t> maybe_keepalive_ack,
-      bool require_ack)
+  bool require_keepalive,
+  std::optional<utime_t> maybe_keepalive_ack,
+  bool require_ack)
 {
-  ceph::bufferlist bl = do_sweep_messages(out_pending_msgs,
-                                          num_msgs,
-                                          require_keepalive,
-                                          maybe_keepalive_ack,
-                                          require_ack);
+  std::size_t num_msgs = out_pending_msgs.size();
+  ceph::bufferlist bl;
+
+  if (unlikely(require_keepalive)) {
+    auto keepalive_frame = KeepAliveFrame::Encode();
+    bl.append(frame_assembler.get_buffer(keepalive_frame));
+  }
+
+  if (unlikely(maybe_keepalive_ack.has_value())) {
+    auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack);
+    bl.append(frame_assembler.get_buffer(keepalive_ack_frame));
+  }
+
+  if (require_ack && num_msgs == 0u) {
+    auto ack_frame = AckFrame::Encode(get_in_seq());
+    bl.append(frame_assembler.get_buffer(ack_frame));
+  }
+
+  std::for_each(
+      out_pending_msgs.begin(),
+      out_pending_msgs.begin()+num_msgs,
+      [this, &bl](const MessageURef& msg) {
+    // set priority
+    msg->get_header().src = conn.messenger.get_myname();
+
+    msg->encode(conn.features, 0);
+
+    ceph_assert(!msg->get_seq() && "message already has seq");
+    msg->set_seq(++out_seq);
+
+    ceph_msg_header &header = msg->get_header();
+    ceph_msg_footer &footer = msg->get_footer();
+
+    ceph_msg_header2 header2{header.seq,        header.tid,
+                             header.type,       header.priority,
+                             header.version,
+                             ceph_le32(0),      header.data_off,
+                             ceph_le64(get_in_seq()),
+                             footer.flags,      header.compat_version,
+                             header.reserved};
+
+    auto message = MessageFrame::Encode(header2,
+        msg->get_payload(), msg->get_middle(), msg->get_data());
+    logger().debug("{} --> #{} === {} ({})",
+                  conn, msg->get_seq(), *msg, msg->get_type());
+    bl.append(frame_assembler.get_buffer(message));
+  });
+
   if (!conn.policy.lossy) {
     out_sent_msgs.insert(
         out_sent_msgs.end(),
@@ -71,6 +116,35 @@ seastar::future<> Protocol::send_keepalive()
   return seastar::now();
 }
 
+void Protocol::set_out_state(
+    const Protocol::out_state_t &new_state)
+{
+  ceph_assert_always(!(
+    (new_state == out_state_t::none && out_state != out_state_t::none) ||
+    (new_state == out_state_t::open && out_state == out_state_t::open) ||
+    (new_state != out_state_t::drop && out_state == out_state_t::drop)
+  ));
+
+  if (out_state != out_state_t::open &&
+      new_state == out_state_t::open) {
+    // to open
+    ceph_assert_always(frame_assembler.is_socket_valid());
+  } else if (out_state == out_state_t::open &&
+             new_state != out_state_t::open) {
+    // from open
+    if (out_dispatching) {
+      ceph_assert_always(!out_exit_dispatching.has_value());
+      out_exit_dispatching = seastar::shared_promise<>();
+    }
+  }
+
+  if (out_state != new_state) {
+    out_state = new_state;
+    out_state_changed.set_value();
+    out_state_changed = seastar::shared_promise<>();
+  }
+}
+
 void Protocol::notify_keepalive_ack(utime_t keepalive_ack)
 {
   logger().trace("{} got keepalive ack {}", conn, keepalive_ack);
@@ -183,7 +257,6 @@ seastar::future<> Protocol::do_out_dispatch()
   return seastar::repeat([this] {
     switch (out_state) {
      case out_state_t::open: {
-      size_t num_msgs = out_pending_msgs.size();
       bool still_queued = is_out_queued();
       if (unlikely(!still_queued)) {
         return try_exit_out_dispatch();
@@ -193,7 +266,7 @@ seastar::future<> Protocol::do_out_dispatch()
       // sweep all pending out with the concrete Protocol
       return frame_assembler.write(
         sweep_out_pending_msgs_to_sent(
-          num_msgs, need_keepalive, next_keepalive_ack, to_ack > 0)
+          need_keepalive, next_keepalive_ack, to_ack > 0)
       ).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack] {
         need_keepalive = false;
         if (next_keepalive_ack == prv_keepalive_ack) {
@@ -243,22 +316,22 @@ seastar::future<> Protocol::do_out_dispatch()
       ceph_abort();
     }
 
-    std::exception_ptr eptr;
-    try {
-      throw e;
-    } catch(...) {
-      eptr = std::current_exception();
-    }
-    notify_out_fault(eptr);
-
     if (out_state == out_state_t::open) {
       logger().info("{} do_out_dispatch(): fault at {}, going to delay -- {}",
                     conn, out_state, e);
-      out_state = out_state_t::delay;
+      std::exception_ptr eptr;
+      try {
+        throw e;
+      } catch(...) {
+        eptr = std::current_exception();
+      }
+      set_out_state(out_state_t::delay);
+      notify_out_fault(eptr);
     } else {
       logger().info("{} do_out_dispatch(): fault at {} -- {}",
                     conn, out_state, e);
     }
+
     return do_out_dispatch();
   });
 }
index c7bc71630b9574af5f957665fc62a5948d8ebb76..4c82d9847dba7b0e918094e337aee0dcd2ec994f 100644 (file)
@@ -45,13 +45,6 @@ class Protocol {
   Protocol(ChainedDispatchers& dispatchers,
            SocketConnection& conn);
 
-  virtual ceph::bufferlist do_sweep_messages(
-      const std::deque<MessageURef>& msgs,
-      size_t num_msgs,
-      bool require_keepalive,
-      std::optional<utime_t> maybe_keepalive_ack,
-      bool require_ack) = 0;
-
   virtual void notify_out() = 0;
 
   virtual void notify_out_fault(std::exception_ptr) = 0;
@@ -110,18 +103,9 @@ class Protocol {
     open,
     drop
   };
-
   friend class fmt::formatter<out_state_t>;
-  void set_out_state(const out_state_t& state) {
-    if (out_state == out_state_t::open &&
-        state != out_state_t::open &&
-        out_dispatching) {
-      out_exit_dispatching = seastar::shared_promise<>();
-    }
-    out_state = state;
-    out_state_changed.set_value();
-    out_state_changed = seastar::shared_promise<>();
-  }
+
+  void set_out_state(const out_state_t &new_state);
 
   seastar::future<> wait_out_exit_dispatching() {
     if (out_exit_dispatching) {
@@ -162,10 +146,6 @@ class Protocol {
     in_seq = _in_seq;
   }
 
-  seq_num_t increment_out_seq() {
-    return ++out_seq;
-  }
-
   ChainedDispatchers& dispatchers;
 
   SocketConnection &conn;
@@ -185,7 +165,6 @@ class Protocol {
   seastar::future<> do_out_dispatch();
 
   ceph::bufferlist sweep_out_pending_msgs_to_sent(
-      size_t num_msgs,
       bool require_keepalive,
       std::optional<utime_t> maybe_keepalive_ack,
       bool require_ack);
index 0d5963edff65293932940734e22b7ca0724551b3..8bce2ee6a8458e4b4bca8ad2f7fdc5e12af6a796 100644 (file)
@@ -1766,61 +1766,6 @@ void ProtocolV2::trigger_replacing(bool reconnect,
 
 // READY state
 
-ceph::bufferlist ProtocolV2::do_sweep_messages(
-    const std::deque<MessageURef>& msgs,
-    size_t num_msgs,
-    bool require_keepalive,
-    std::optional<utime_t> maybe_keepalive_ack,
-    bool require_ack)
-{
-  ceph::bufferlist bl;
-
-  if (unlikely(require_keepalive)) {
-    auto keepalive_frame = KeepAliveFrame::Encode();
-    bl.append(frame_assembler.get_buffer(keepalive_frame));
-  }
-
-  if (unlikely(maybe_keepalive_ack.has_value())) {
-    auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack);
-    bl.append(frame_assembler.get_buffer(keepalive_ack_frame));
-  }
-
-  if (require_ack && num_msgs == 0u) {
-    auto ack_frame = AckFrame::Encode(get_in_seq());
-    bl.append(frame_assembler.get_buffer(ack_frame));
-  }
-
-  std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageURef& msg) {
-    // TODO: move to common code
-    // set priority
-    msg->get_header().src = messenger.get_myname();
-
-    msg->encode(conn.features, 0);
-
-    ceph_assert(!msg->get_seq() && "message already has seq");
-    msg->set_seq(increment_out_seq());
-
-    ceph_msg_header &header = msg->get_header();
-    ceph_msg_footer &footer = msg->get_footer();
-
-    ceph_msg_header2 header2{header.seq,        header.tid,
-                             header.type,       header.priority,
-                             header.version,
-                             ceph_le32(0),      header.data_off,
-                             ceph_le64(get_in_seq()),
-                             footer.flags,      header.compat_version,
-                             header.reserved};
-
-    auto message = MessageFrame::Encode(header2,
-        msg->get_payload(), msg->get_middle(), msg->get_data());
-    logger().debug("{} --> #{} === {} ({})",
-                  conn, msg->get_seq(), *msg, msg->get_type());
-    bl.append(frame_assembler.get_buffer(message));
-  });
-
-  return bl;
-}
-
 void ProtocolV2::notify_out_fault(std::exception_ptr eptr)
 {
   fault(state_t::READY, "notify_out_fault", eptr);
index 3b02b2f6a95b17d8ab7fd1d4c68e4c2abd55007b..5c7d369bb2a947c48b36a1ba1734da1839bc5639 100644 (file)
@@ -45,13 +45,6 @@ class ProtocolV2 final : public Protocol {
   void print_conn(std::ostream&) const final;
 
  private:
-  ceph::bufferlist do_sweep_messages(
-      const std::deque<MessageURef>& msgs,
-      size_t num_msgs,
-      bool require_keepalive,
-      std::optional<utime_t> keepalive_ack,
-      bool require_ack) override;
-
   void notify_out() override;
 
   void notify_out_fault(std::exception_ptr) override;