]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/net: move message read path from ProtocolV2 to Protocol
authorYingxin Cheng <yingxin.cheng@intel.com>
Thu, 1 Dec 2022 02:36:35 +0000 (10:36 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 8 Feb 2023 06:07:41 +0000 (14:07 +0800)
Also move socket shutdown ownership to Protocol at READY/open.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/Protocol.cc
src/crimson/net/Protocol.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h

index 3ec265d4b2338445c4d7d34216a06166a589945b..38b0217631fc1b77ecd2041e4ab4504830398033 100644 (file)
@@ -5,6 +5,7 @@
 
 #include "auth/Auth.h"
 
+#include "crimson/common/formatter.h"
 #include "crimson/common/log.h"
 #include "crimson/net/Errors.h"
 #include "crimson/net/chained_dispatchers.h"
 #include "crimson/net/SocketMessenger.h"
 #include "msg/Message.h"
 
+using namespace ceph::msgr::v2;
+using crimson::common::local_conf;
+
 namespace {
 
 seastar::logger& logger() {
   return crimson::get_logger(ceph_subsys_ms);
 }
 
+[[noreturn]] void abort_in_fault() {
+  throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
+}
+
+[[noreturn]] void abort_protocol() {
+  throw std::system_error(make_error_code(crimson::net::error::protocol_aborted));
+}
+
+std::size_t get_msg_size(const FrameAssembler &rx_frame_asm)
+{
+  ceph_assert(rx_frame_asm.get_num_segments() > 0);
+  size_t sum = 0;
+  // we don't include SegmentIndex::Msg::HEADER.
+  for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) {
+    sum += rx_frame_asm.get_segment_logical_len(idx);
+  }
+  return sum;
+}
+
 } // namespace anonymous
 
 namespace crimson::net {
@@ -125,13 +148,22 @@ void Protocol::set_out_state(
     (new_state != out_state_t::drop && out_state == out_state_t::drop)
   ));
 
+  bool dispatch_in = false;
   if (out_state != out_state_t::open &&
       new_state == out_state_t::open) {
     // to open
     ceph_assert_always(frame_assembler.is_socket_valid());
+    dispatch_in = true;
+#ifdef UNIT_TESTS_BUILT
+    if (conn.interceptor) {
+      conn.interceptor->register_conn_ready(conn);
+    }
+#endif
   } else if (out_state == out_state_t::open &&
              new_state != out_state_t::open) {
     // from open
+    ceph_assert_always(frame_assembler.is_socket_valid());
+    frame_assembler.shutdown_socket();
     if (out_dispatching) {
       ceph_assert_always(!out_exit_dispatching.has_value());
       out_exit_dispatching = seastar::shared_promise<>();
@@ -143,21 +175,33 @@ void Protocol::set_out_state(
     out_state_changed.set_value();
     out_state_changed = seastar::shared_promise<>();
   }
-}
 
-void Protocol::notify_keepalive_ack(utime_t keepalive_ack)
-{
-  logger().trace("{} got keepalive ack {}", conn, keepalive_ack);
-  next_keepalive_ack = keepalive_ack;
-  notify_out_dispatch();
+  // The above needs to be atomic
+  if (dispatch_in) {
+    do_in_dispatch();
+  }
 }
 
-void Protocol::notify_ack()
+seastar::future<> Protocol::wait_io_exit_dispatching()
 {
-  if (!conn.policy.lossy) {
-    ++ack_left;
-    notify_out_dispatch();
-  }
+  ceph_assert_always(out_state != out_state_t::open);
+  ceph_assert_always(!frame_assembler.is_socket_valid());
+  return seastar::when_all(
+    [this] {
+      if (out_exit_dispatching) {
+        return out_exit_dispatching->get_shared_future();
+      } else {
+        return seastar::now();
+      }
+    }(),
+    [this] {
+      if (in_exit_dispatching) {
+        return in_exit_dispatching->get_shared_future();
+      } else {
+        return seastar::now();
+      }
+    }()
+  ).discard_result();
 }
 
 void Protocol::requeue_out_sent()
@@ -326,7 +370,7 @@ seastar::future<> Protocol::do_out_dispatch()
         eptr = std::current_exception();
       }
       set_out_state(out_state_t::delay);
-      notify_out_fault(eptr);
+      notify_out_fault("do_out_dispatch", eptr);
     } else {
       logger().info("{} do_out_dispatch(): fault at {} -- {}",
                     conn, out_state, e);
@@ -361,4 +405,218 @@ void Protocol::notify_out_dispatch()
   }
 }
 
+seastar::future<>
+Protocol::read_message(utime_t throttle_stamp, std::size_t msg_size)
+{
+  return frame_assembler.read_frame_payload(
+  ).then([this, throttle_stamp, msg_size](auto payload) {
+    if (unlikely(out_state != out_state_t::open)) {
+      logger().debug("{} triggered {} during read_message()",
+                     conn, out_state);
+      abort_protocol();
+    }
+
+    utime_t recv_stamp{seastar::lowres_system_clock::now()};
+
+    // we need to get the size before std::moving segments data
+    auto msg_frame = MessageFrame::Decode(*payload);
+    // XXX: paranoid copy just to avoid oops
+    ceph_msg_header2 current_header = msg_frame.header();
+
+    logger().trace("{} got {} + {} + {} byte message,"
+                   " envelope type={} src={} off={} seq={}",
+                   conn, msg_frame.front_len(), msg_frame.middle_len(),
+                   msg_frame.data_len(), current_header.type, conn.get_peer_name(),
+                   current_header.data_off, current_header.seq);
+
+    ceph_msg_header header{current_header.seq,
+                           current_header.tid,
+                           current_header.type,
+                           current_header.priority,
+                           current_header.version,
+                           ceph_le32(msg_frame.front_len()),
+                           ceph_le32(msg_frame.middle_len()),
+                           ceph_le32(msg_frame.data_len()),
+                           current_header.data_off,
+                           conn.get_peer_name(),
+                           current_header.compat_version,
+                           current_header.reserved,
+                           ceph_le32(0)};
+    ceph_msg_footer footer{ceph_le32(0), ceph_le32(0),
+                           ceph_le32(0), ceph_le64(0), current_header.flags};
+
+    auto conn_ref = seastar::static_pointer_cast<SocketConnection>(
+        conn.shared_from_this());
+    Message *message = decode_message(nullptr, 0, header, footer,
+        msg_frame.front(), msg_frame.middle(), msg_frame.data(), conn_ref);
+    if (!message) {
+      logger().warn("{} decode message failed", conn);
+      abort_in_fault();
+    }
+
+    // store reservation size in message, so we don't get confused
+    // by messages entering the dispatch queue through other paths.
+    message->set_dispatch_throttle_size(msg_size);
+
+    message->set_throttle_stamp(throttle_stamp);
+    message->set_recv_stamp(recv_stamp);
+    message->set_recv_complete_stamp(utime_t{seastar::lowres_system_clock::now()});
+
+    // check received seq#.  if it is old, drop the message.
+    // note that incoming messages may skip ahead.  this is convenient for the
+    // client side queueing because messages can't be renumbered, but the (kernel)
+    // client will occasionally pull a message out of the sent queue to send
+    // elsewhere.  in that case it doesn't matter if we "got" it or not.
+    uint64_t cur_seq = get_in_seq();
+    if (message->get_seq() <= cur_seq) {
+      logger().error("{} got old message {} <= {} {}, discarding",
+                     conn, message->get_seq(), cur_seq, *message);
+      if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) &&
+          local_conf()->ms_die_on_old_message) {
+        ceph_assert(0 == "old msgs despite reconnect_seq feature");
+      }
+      return seastar::now();
+    } else if (message->get_seq() > cur_seq + 1) {
+      logger().error("{} missed message? skipped from seq {} to {}",
+                     conn, cur_seq, message->get_seq());
+      if (local_conf()->ms_die_on_skipped_message) {
+        ceph_assert(0 == "skipped incoming seq");
+      }
+    }
+
+    // note last received message.
+    in_seq = message->get_seq();
+    if (conn.policy.lossy) {
+      logger().debug("{} <== #{} === {} ({})",
+                     conn,
+                     message->get_seq(),
+                     *message,
+                     message->get_type());
+    } else {
+      logger().debug("{} <== #{},{} === {} ({})",
+                     conn,
+                     message->get_seq(),
+                     current_header.ack_seq,
+                     *message,
+                     message->get_type());
+    }
+
+    // notify ack
+    if (!conn.policy.lossy) {
+      ++ack_left;
+      notify_out_dispatch();
+    }
+
+    ack_out_sent(current_header.ack_seq);
+
+    // TODO: change MessageRef with seastar::shared_ptr
+    auto msg_ref = MessageRef{message, false};
+    assert(out_state == out_state_t::open);
+    // throttle the reading process by the returned future
+    return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
+  });
+}
+
+void Protocol::do_in_dispatch()
+{
+  ceph_assert_always(!in_exit_dispatching.has_value());
+  in_exit_dispatching = seastar::shared_promise<>();
+  gate.dispatch_in_background("do_in_dispatch", *this, [this] {
+    return seastar::keep_doing([this] {
+      return frame_assembler.read_main_preamble(
+      ).then([this](auto ret) {
+        switch (ret.tag) {
+          case Tag::MESSAGE: {
+            size_t msg_size = get_msg_size(*ret.rx_frame_asm);
+            return seastar::futurize_invoke([this] {
+              // throttle_message() logic
+              if (!conn.policy.throttler_messages) {
+                return seastar::now();
+              }
+              // TODO: message throttler
+              ceph_assert(false);
+              return seastar::now();
+            }).then([this, msg_size] {
+              // throttle_bytes() logic
+              if (!conn.policy.throttler_bytes) {
+                return seastar::now();
+              }
+              if (!msg_size) {
+                return seastar::now();
+              }
+              logger().trace("{} wants {} bytes from policy throttler {}/{}",
+                             conn, msg_size,
+                             conn.policy.throttler_bytes->get_current(),
+                             conn.policy.throttler_bytes->get_max());
+              return conn.policy.throttler_bytes->get(msg_size);
+            }).then([this, msg_size] {
+              // TODO: throttle_dispatch_queue() logic
+              utime_t throttle_stamp{seastar::lowres_system_clock::now()};
+              return read_message(throttle_stamp, msg_size);
+            });
+          }
+          case Tag::ACK:
+            return frame_assembler.read_frame_payload(
+            ).then([this](auto payload) {
+              // handle_message_ack() logic
+              auto ack = AckFrame::Decode(payload->back());
+              logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq());
+              ack_out_sent(ack.seq());
+            });
+          case Tag::KEEPALIVE2:
+            return frame_assembler.read_frame_payload(
+            ).then([this](auto payload) {
+              // handle_keepalive2() logic
+              auto keepalive_frame = KeepAliveFrame::Decode(payload->back());
+              logger().debug("{} GOT KeepAliveFrame: timestamp={}",
+                             conn, keepalive_frame.timestamp());
+              // notify keepalive ack
+              next_keepalive_ack = keepalive_frame.timestamp();
+              notify_out_dispatch();
+
+              last_keepalive = seastar::lowres_system_clock::now();
+            });
+          case Tag::KEEPALIVE2_ACK:
+            return frame_assembler.read_frame_payload(
+            ).then([this](auto payload) {
+              // handle_keepalive2_ack() logic
+              auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload->back());
+              auto _last_keepalive_ack =
+                seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()};
+              set_last_keepalive_ack(_last_keepalive_ack);
+              logger().debug("{} GOT KeepAliveFrameAck: timestamp={}",
+                             conn, _last_keepalive_ack);
+            });
+          default: {
+            logger().warn("{} do_in_dispatch() received unexpected tag: {}",
+                          conn, static_cast<uint32_t>(ret.tag));
+            abort_in_fault();
+          }
+        }
+      });
+    }).handle_exception([this](std::exception_ptr eptr) {
+      const char *e_what;
+      try {
+        std::rethrow_exception(eptr);
+      } catch (std::exception &e) {
+        e_what = e.what();
+      }
+
+      if (out_state == out_state_t::open) {
+        logger().info("{} do_in_dispatch(): fault at {}, going to delay -- {}",
+                      conn, out_state, e_what);
+        set_out_state(out_state_t::delay);
+        notify_out_fault("do_in_dispatch", eptr);
+      } else {
+        logger().info("{} do_in_dispatch(): fault at {} -- {}",
+                      conn, out_state, e_what);
+      }
+    }).finally([this] {
+      ceph_assert_always(in_exit_dispatching.has_value());
+      in_exit_dispatching->set_value();
+      in_exit_dispatching = std::nullopt;
+    });
+  });
+}
+
 } // namespace crimson::net
index 4c82d9847dba7b0e918094e337aee0dcd2ec994f..5957956900b942aa3adfff1a7a55ee66a6bf5a80 100644 (file)
@@ -47,7 +47,7 @@ class Protocol {
 
   virtual void notify_out() = 0;
 
-  virtual void notify_out_fault(std::exception_ptr) = 0;
+  virtual void notify_out_fault(const char *where, std::exception_ptr) = 0;
 
 // the write state-machine
  public:
@@ -107,16 +107,7 @@ class Protocol {
 
   void set_out_state(const out_state_t &new_state);
 
-  seastar::future<> wait_out_exit_dispatching() {
-    if (out_exit_dispatching) {
-      return out_exit_dispatching->get_shared_future();
-    }
-    return seastar::now();
-  }
-
-  void notify_keepalive_ack(utime_t keepalive_ack);
-
-  void notify_ack();
+  seastar::future<> wait_io_exit_dispatching();
 
   void requeue_out_sent_up_to(seq_num_t seq);
 
@@ -132,20 +123,10 @@ class Protocol {
     return is_out_queued() || !out_sent_msgs.empty();
   }
 
-  void ack_out_sent(seq_num_t seq);
-
-  void set_last_keepalive(clock_t::time_point when) {
-    last_keepalive = when;
-  }
-
   seq_num_t get_in_seq() const {
     return in_seq;
   }
 
-  void set_in_seq(seq_num_t _in_seq) {
-    in_seq = _in_seq;
-  }
-
   ChainedDispatchers& dispatchers;
 
   SocketConnection &conn;
@@ -171,6 +152,12 @@ class Protocol {
 
   void notify_out_dispatch();
 
+  void ack_out_sent(seq_num_t seq);
+
+  seastar::future<> read_message(utime_t throttle_stamp, std::size_t msg_size);
+
+  void do_in_dispatch();
+
   crimson::common::Gated gate;
 
   /*
@@ -208,6 +195,8 @@ class Protocol {
    * in states for reading
    */
 
+  std::optional<seastar::shared_promise<>> in_exit_dispatching;
+
   /// the seq num of the last received message
   seq_num_t in_seq = 0;
 
index 8bce2ee6a8458e4b4bca8ad2f7fdc5e12af6a796..2337ecfbc38b5de32cb9851bb9c7da7410d4cb3c 100644 (file)
@@ -3,7 +3,6 @@
 
 #include "ProtocolV2.h"
 
-#include <seastar/core/lowres_clock.hh>
 #include <fmt/format.h>
 #include <fmt/ranges.h>
 #include "include/msgr.h"
@@ -99,17 +98,6 @@ inline uint64_t generate_client_cookie() {
       1, std::numeric_limits<uint64_t>::max());
 }
 
-std::size_t get_msg_size(const FrameAssembler &rx_frame_asm)
-{
-  ceph_assert(rx_frame_asm.get_num_segments() > 0);
-  size_t sum = 0;
-  // we don't include SegmentIndex::Msg::HEADER.
-  for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) {
-    sum += rx_frame_asm.get_segment_logical_len(idx);
-  }
-  return sum;
-}
-
 } // namespace anonymous
 
 namespace fmt {
@@ -287,11 +275,11 @@ void ProtocolV2::fault(
 
   if (likely(has_socket)) {
     if (likely(is_socket_valid)) {
+      ceph_assert_always(state != state_t::READY);
       frame_assembler.shutdown_socket();
       is_socket_valid = false;
     } else {
-      ceph_assert_always(state == state_t::CONNECTING ||
-                         state == state_t::REPLACING);
+      ceph_assert_always(state != state_t::ESTABLISHING);
     }
   } else { // !has_socket
     ceph_assert_always(state == state_t::CONNECTING);
@@ -783,10 +771,7 @@ void ProtocolV2::execute_connecting()
         assert(server_cookie == 0);
         logger().debug("{} UPDATE: gs={} for connect", conn, global_seq);
       }
-      return seastar::when_all(
-        wait_out_exit_dispatching(),
-        wait_in_exit_dispatching()
-      ).discard_result().then([this] {
+      return wait_io_exit_dispatching().then([this] {
           if (unlikely(state != state_t::CONNECTING)) {
             logger().debug("{} triggered {} before Socket::connect()",
                            conn, get_state_name(state));
@@ -1687,10 +1672,8 @@ void ProtocolV2::trigger_replacing(bool reconnect,
     dispatchers.ms_handle_accept(
         seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
     // state may become CLOSING, close mover.socket and abort later
-    return seastar::when_all(
-      wait_out_exit_dispatching(),
-      wait_in_exit_dispatching()
-    ).discard_result().then([this] {
+    return wait_io_exit_dispatching(
+    ).then([this] {
       protocol_timer.cancel();
       auto done = std::move(execution_done);
       execution_done = seastar::now();
@@ -1766,206 +1749,19 @@ void ProtocolV2::trigger_replacing(bool reconnect,
 
 // READY state
 
-void ProtocolV2::notify_out_fault(std::exception_ptr eptr)
-{
-  fault(state_t::READY, "notify_out_fault", eptr);
-}
-
-seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp, std::size_t msg_size)
+void ProtocolV2::notify_out_fault(const char *where, std::exception_ptr eptr)
 {
-  return frame_assembler.read_frame_payload(
-  ).then([this, throttle_stamp, msg_size](auto payload) {
-    if (unlikely(state != state_t::READY)) {
-      logger().debug("{} triggered {} during read_message()",
-                     conn, get_state_name(state));
-      abort_protocol();
-    }
-
-    utime_t recv_stamp{seastar::lowres_system_clock::now()};
-
-    // we need to get the size before std::moving segments data
-    auto msg_frame = MessageFrame::Decode(*payload);
-    // XXX: paranoid copy just to avoid oops
-    ceph_msg_header2 current_header = msg_frame.header();
-
-    logger().trace("{} got {} + {} + {} byte message,"
-                   " envelope type={} src={} off={} seq={}",
-                   conn, msg_frame.front_len(), msg_frame.middle_len(),
-                   msg_frame.data_len(), current_header.type, conn.get_peer_name(),
-                   current_header.data_off, current_header.seq);
-
-    ceph_msg_header header{current_header.seq,
-                           current_header.tid,
-                           current_header.type,
-                           current_header.priority,
-                           current_header.version,
-                           ceph_le32(msg_frame.front_len()),
-                           ceph_le32(msg_frame.middle_len()),
-                           ceph_le32(msg_frame.data_len()),
-                           current_header.data_off,
-                           conn.get_peer_name(),
-                           current_header.compat_version,
-                           current_header.reserved,
-                           ceph_le32(0)};
-    ceph_msg_footer footer{ceph_le32(0), ceph_le32(0),
-                           ceph_le32(0), ceph_le64(0), current_header.flags};
-
-    auto conn_ref = seastar::static_pointer_cast<SocketConnection>(
-        conn.shared_from_this());
-    Message *message = decode_message(nullptr, 0, header, footer,
-        msg_frame.front(), msg_frame.middle(), msg_frame.data(), conn_ref);
-    if (!message) {
-      logger().warn("{} decode message failed", conn);
-      abort_in_fault();
-    }
-
-    // store reservation size in message, so we don't get confused
-    // by messages entering the dispatch queue through other paths.
-    message->set_dispatch_throttle_size(msg_size);
-
-    message->set_throttle_stamp(throttle_stamp);
-    message->set_recv_stamp(recv_stamp);
-    message->set_recv_complete_stamp(utime_t{seastar::lowres_system_clock::now()});
-
-    // check received seq#.  if it is old, drop the message.
-    // note that incoming messages may skip ahead.  this is convenient for the
-    // client side queueing because messages can't be renumbered, but the (kernel)
-    // client will occasionally pull a message out of the sent queue to send
-    // elsewhere.  in that case it doesn't matter if we "got" it or not.
-    uint64_t cur_seq = get_in_seq();
-    if (message->get_seq() <= cur_seq) {
-      logger().error("{} got old message {} <= {} {}, discarding",
-                     conn, message->get_seq(), cur_seq, *message);
-      if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) &&
-          local_conf()->ms_die_on_old_message) {
-        ceph_assert(0 == "old msgs despite reconnect_seq feature");
-      }
-      return seastar::now();
-    } else if (message->get_seq() > cur_seq + 1) {
-      logger().error("{} missed message? skipped from seq {} to {}",
-                     conn, cur_seq, message->get_seq());
-      if (local_conf()->ms_die_on_skipped_message) {
-        ceph_assert(0 == "skipped incoming seq");
-      }
-    }
-
-    // note last received message.
-    set_in_seq(message->get_seq());
-    if (conn.policy.lossy) {
-      logger().debug("{} <== #{} === {} ({})",
-                     conn,
-                     message->get_seq(),
-                     *message,
-                     message->get_type());
-    } else {
-      logger().debug("{} <== #{},{} === {} ({})",
-                     conn,
-                     message->get_seq(),
-                     current_header.ack_seq,
-                     *message,
-                     message->get_type());
-    }
-    notify_ack();
-    ack_out_sent(current_header.ack_seq);
-
-    // TODO: change MessageRef with seastar::shared_ptr
-    auto msg_ref = MessageRef{message, false};
-    assert(state == state_t::READY);
-    // throttle the reading process by the returned future
-    return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
-  });
+  fault(state_t::READY, where, eptr);
 }
 
 void ProtocolV2::execute_ready()
 {
-  ceph_assert_always(is_socket_valid);
   assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
+  protocol_timer.cancel();
+  ceph_assert_always(is_socket_valid);
+  // I'm not responsible to shutdown the socket at READY
+  is_socket_valid = false;
   trigger_state(state_t::READY, out_state_t::open, false);
-#ifdef UNIT_TESTS_BUILT
-  if (conn.interceptor) {
-    conn.interceptor->register_conn_ready(conn);
-  }
-#endif
-  ceph_assert_always(!in_exit_dispatching.has_value());
-  in_exit_dispatching = seastar::shared_promise<>();
-  gate.dispatch_in_background("execute_ready", *this, [this] {
-    protocol_timer.cancel();
-    return seastar::keep_doing([this] {
-      return frame_assembler.read_main_preamble(
-      ).then([this](auto ret) {
-        switch (ret.tag) {
-          case Tag::MESSAGE: {
-            size_t msg_size = get_msg_size(*ret.rx_frame_asm);
-            return seastar::futurize_invoke([this] {
-              // throttle_message() logic
-              if (!conn.policy.throttler_messages) {
-                return seastar::now();
-              }
-              // TODO: message throttler
-              ceph_assert(false);
-              return seastar::now();
-            }).then([this, msg_size] {
-              // throttle_bytes() logic
-              if (!conn.policy.throttler_bytes) {
-                return seastar::now();
-              }
-              if (!msg_size) {
-                return seastar::now();
-              }
-              logger().trace("{} wants {} bytes from policy throttler {}/{}",
-                             conn, msg_size,
-                             conn.policy.throttler_bytes->get_current(),
-                             conn.policy.throttler_bytes->get_max());
-              return conn.policy.throttler_bytes->get(msg_size);
-            }).then([this, msg_size] {
-              // TODO: throttle_dispatch_queue() logic
-              utime_t throttle_stamp{seastar::lowres_system_clock::now()};
-              return read_message(throttle_stamp, msg_size);
-            });
-          }
-          case Tag::ACK:
-            return frame_assembler.read_frame_payload(
-            ).then([this](auto payload) {
-              // handle_message_ack() logic
-              auto ack = AckFrame::Decode(payload->back());
-              logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq());
-              ack_out_sent(ack.seq());
-            });
-          case Tag::KEEPALIVE2:
-            return frame_assembler.read_frame_payload(
-            ).then([this](auto payload) {
-              // handle_keepalive2() logic
-              auto keepalive_frame = KeepAliveFrame::Decode(payload->back());
-              logger().debug("{} GOT KeepAliveFrame: timestamp={}",
-                             conn, keepalive_frame.timestamp());
-              notify_keepalive_ack(keepalive_frame.timestamp());
-              set_last_keepalive(seastar::lowres_system_clock::now());
-            });
-          case Tag::KEEPALIVE2_ACK:
-            return frame_assembler.read_frame_payload(
-            ).then([this](auto payload) {
-              // handle_keepalive2_ack() logic
-              auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload->back());
-              auto _last_keepalive_ack =
-                seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()};
-              set_last_keepalive_ack(_last_keepalive_ack);
-              logger().debug("{} GOT KeepAliveFrameAck: timestamp={}",
-                             conn, _last_keepalive_ack);
-            });
-          default: {
-            unexpected_tag(ret.tag, conn, "execute_ready");
-            return seastar::now();
-          }
-        }
-      });
-    }).handle_exception([this](std::exception_ptr eptr) {
-      fault(state_t::READY, "execute_ready", eptr);
-    }).finally([this] {
-      ceph_assert_always(in_exit_dispatching.has_value());
-      in_exit_dispatching->set_value();
-      in_exit_dispatching = std::nullopt;
-    });
-  });
 }
 
 // STANDBY state
index 5c7d369bb2a947c48b36a1ba1734da1839bc5639..206af1213d5e3746984da282fc817c34ee71c59e 100644 (file)
@@ -47,7 +47,7 @@ class ProtocolV2 final : public Protocol {
  private:
   void notify_out() override;
 
-  void notify_out_fault(std::exception_ptr) override;
+  void notify_out_fault(const char *, std::exception_ptr) override;
 
  private:
   SocketMessenger &messenger;
@@ -108,14 +108,6 @@ class ProtocolV2 final : public Protocol {
   uint64_t peer_global_seq = 0;
   uint64_t connect_seq = 0;
 
-  std::optional<seastar::shared_promise<>> in_exit_dispatching;
-  seastar::future<> wait_in_exit_dispatching() {
-    if (in_exit_dispatching.has_value()) {
-      return in_exit_dispatching->get_shared_future();
-    }
-    return seastar::now();
-  }
-
   seastar::future<> execution_done = seastar::now();
 
   template <typename Func>
@@ -235,7 +227,6 @@ class ProtocolV2 final : public Protocol {
                          uint64_t new_msg_seq);
 
   // READY
-  seastar::future<> read_message(utime_t throttle_stamp, std::size_t msg_size);
   void execute_ready();
 
   // STANDBY