]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: introduce IOHandler class for message and event dispatching
authorYingxin Cheng <yingxin.cheng@intel.com>
Wed, 7 Dec 2022 02:08:18 +0000 (10:08 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 8 Feb 2023 06:07:41 +0000 (14:07 +0800)
IOHandler also represents the Connection as ConnectionHandler.

ProtocolV2 and IOHandler will be finally running in different cores, as
ProtocolV2 will need to call IOHandler interfaces asynchronously. And
IOHandler will also notify ProtocolV2 through HandshakeListener
asynchronously.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/CMakeLists.txt
src/crimson/net/Protocol.cc [deleted file]
src/crimson/net/Protocol.h [deleted file]
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h
src/crimson/net/io_handler.cc [new file with mode: 0644]
src/crimson/net/io_handler.h [new file with mode: 0644]

index cd5d1fb0cd8a3d6ec7014760d3219537499277d9..68e1b64beb9db2deb083e22b18ab1d69684a5ac5 100644 (file)
@@ -176,11 +176,11 @@ set(crimson_net_srcs
   ${PROJECT_SOURCE_DIR}/src/msg/async/frames_v2.cc
   net/Errors.cc
   net/FrameAssemblerV2.cc
+  net/io_handler.cc
   net/Messenger.cc
   net/SocketConnection.cc
   net/SocketMessenger.cc
   net/Socket.cc
-  net/Protocol.cc
   net/ProtocolV2.cc
   net/chained_dispatchers.cc)
 add_library(crimson STATIC
diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc
deleted file mode 100644 (file)
index b470abf..0000000
+++ /dev/null
@@ -1,718 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include "Protocol.h"
-
-#include "auth/Auth.h"
-
-#include "crimson/common/formatter.h"
-#include "crimson/common/log.h"
-#include "crimson/net/Errors.h"
-#include "crimson/net/chained_dispatchers.h"
-#include "crimson/net/SocketConnection.h"
-#include "crimson/net/SocketMessenger.h"
-#include "msg/Message.h"
-
-using namespace ceph::msgr::v2;
-using crimson::common::local_conf;
-
-namespace {
-
-seastar::logger& logger() {
-  return crimson::get_logger(ceph_subsys_ms);
-}
-
-[[noreturn]] void abort_in_fault() {
-  throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
-}
-
-[[noreturn]] void abort_protocol() {
-  throw std::system_error(make_error_code(crimson::net::error::protocol_aborted));
-}
-
-std::size_t get_msg_size(const FrameAssembler &rx_frame_asm)
-{
-  ceph_assert(rx_frame_asm.get_num_segments() > 0);
-  size_t sum = 0;
-  // we don't include SegmentIndex::Msg::HEADER.
-  for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) {
-    sum += rx_frame_asm.get_segment_logical_len(idx);
-  }
-  return sum;
-}
-
-} // namespace anonymous
-
-namespace crimson::net {
-
-Protocol::Protocol(ChainedDispatchers& dispatchers,
-                   SocketConnection& conn)
-  : dispatchers(dispatchers),
-    conn(conn)
-{}
-
-Protocol::~Protocol()
-{
-  ceph_assert(gate.is_closed());
-  assert(!out_exit_dispatching);
-}
-
-ceph::bufferlist Protocol::sweep_out_pending_msgs_to_sent(
-  bool require_keepalive,
-  std::optional<utime_t> maybe_keepalive_ack,
-  bool require_ack)
-{
-  std::size_t num_msgs = out_pending_msgs.size();
-  ceph::bufferlist bl;
-
-  if (unlikely(require_keepalive)) {
-    auto keepalive_frame = KeepAliveFrame::Encode();
-    bl.append(frame_assembler->get_buffer(keepalive_frame));
-  }
-
-  if (unlikely(maybe_keepalive_ack.has_value())) {
-    auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack);
-    bl.append(frame_assembler->get_buffer(keepalive_ack_frame));
-  }
-
-  if (require_ack && num_msgs == 0u) {
-    auto ack_frame = AckFrame::Encode(get_in_seq());
-    bl.append(frame_assembler->get_buffer(ack_frame));
-  }
-
-  std::for_each(
-      out_pending_msgs.begin(),
-      out_pending_msgs.begin()+num_msgs,
-      [this, &bl](const MessageURef& msg) {
-    // set priority
-    msg->get_header().src = conn.messenger.get_myname();
-
-    msg->encode(conn.features, 0);
-
-    ceph_assert(!msg->get_seq() && "message already has seq");
-    msg->set_seq(++out_seq);
-
-    ceph_msg_header &header = msg->get_header();
-    ceph_msg_footer &footer = msg->get_footer();
-
-    ceph_msg_header2 header2{header.seq,        header.tid,
-                             header.type,       header.priority,
-                             header.version,
-                             ceph_le32(0),      header.data_off,
-                             ceph_le64(get_in_seq()),
-                             footer.flags,      header.compat_version,
-                             header.reserved};
-
-    auto message = MessageFrame::Encode(header2,
-        msg->get_payload(), msg->get_middle(), msg->get_data());
-    logger().debug("{} --> #{} === {} ({})",
-                  conn, msg->get_seq(), *msg, msg->get_type());
-    bl.append(frame_assembler->get_buffer(message));
-  });
-
-  if (!conn.policy.lossy) {
-    out_sent_msgs.insert(
-        out_sent_msgs.end(),
-        std::make_move_iterator(out_pending_msgs.begin()),
-        std::make_move_iterator(out_pending_msgs.end()));
-  }
-  out_pending_msgs.clear();
-  return bl;
-}
-
-seastar::future<> Protocol::send(MessageURef msg)
-{
-  if (io_state != io_state_t::drop) {
-    out_pending_msgs.push_back(std::move(msg));
-    notify_out_dispatch();
-  }
-  return seastar::now();
-}
-
-seastar::future<> Protocol::send_keepalive()
-{
-  if (!need_keepalive) {
-    need_keepalive = true;
-    notify_out_dispatch();
-  }
-  return seastar::now();
-}
-
-void Protocol::mark_down()
-{
-  ceph_assert_always(io_state != io_state_t::none);
-  need_dispatch_reset = false;
-  if (io_state == io_state_t::drop) {
-    return;
-  }
-
-  logger().info("{} mark_down() with {}",
-                conn, io_stat_printer{*this});
-  set_io_state(io_state_t::drop);
-  notify_mark_down();
-}
-
-void Protocol::print_io_stat(std::ostream &out) const
-{
-  out << "io_stat("
-      << "io_state=" << fmt::format("{}", io_state)
-      << ", in_seq=" << in_seq
-      << ", out_seq=" << out_seq
-      << ", out_pending_msgs_size=" << out_pending_msgs.size()
-      << ", out_sent_msgs_size=" << out_sent_msgs.size()
-      << ", need_ack=" << (ack_left > 0)
-      << ", need_keepalive=" << need_keepalive
-      << ", need_keepalive_ack=" << bool(next_keepalive_ack)
-      << ")";
-}
-
-void Protocol::set_io_state(
-    const Protocol::io_state_t &new_state,
-    FrameAssemblerV2Ref fa)
-{
-  ceph_assert_always(!(
-    (new_state == io_state_t::none && io_state != io_state_t::none) ||
-    (new_state == io_state_t::open && io_state == io_state_t::open) ||
-    (new_state != io_state_t::drop && io_state == io_state_t::drop)
-  ));
-
-  bool dispatch_in = false;
-  if (new_state == io_state_t::open) {
-    // to open
-    ceph_assert_always(protocol_is_connected == true);
-    assert(fa != nullptr);
-    ceph_assert_always(frame_assembler == nullptr);
-    frame_assembler = std::move(fa);
-    ceph_assert_always(frame_assembler->is_socket_valid());
-    dispatch_in = true;
-#ifdef UNIT_TESTS_BUILT
-    if (conn.interceptor) {
-      conn.interceptor->register_conn_ready(conn);
-    }
-#endif
-  } else if (io_state == io_state_t::open) {
-    // from open
-    ceph_assert_always(protocol_is_connected == true);
-    protocol_is_connected = false;
-    assert(fa == nullptr);
-    ceph_assert_always(frame_assembler->is_socket_valid());
-    frame_assembler->shutdown_socket();
-    if (out_dispatching) {
-      ceph_assert_always(!out_exit_dispatching.has_value());
-      out_exit_dispatching = seastar::promise<>();
-    }
-  } else {
-    assert(fa == nullptr);
-  }
-
-  if (io_state != new_state) {
-    io_state = new_state;
-    io_state_changed.set_value();
-    io_state_changed = seastar::promise<>();
-  }
-
-  /*
-   * not atomic below
-   */
-
-  if (dispatch_in) {
-    do_in_dispatch();
-  }
-}
-
-seastar::future<FrameAssemblerV2Ref> Protocol::wait_io_exit_dispatching()
-{
-  ceph_assert_always(io_state != io_state_t::open);
-  ceph_assert_always(frame_assembler != nullptr);
-  ceph_assert_always(!frame_assembler->is_socket_valid());
-  return seastar::when_all(
-    [this] {
-      if (out_exit_dispatching) {
-        return out_exit_dispatching->get_future();
-      } else {
-        return seastar::now();
-      }
-    }(),
-    [this] {
-      if (in_exit_dispatching) {
-        return in_exit_dispatching->get_future();
-      } else {
-        return seastar::now();
-      }
-    }()
-  ).discard_result().then([this] {
-    return std::move(frame_assembler);
-  });
-}
-
-void Protocol::reset_session(bool full)
-{
-  // reset in
-  in_seq = 0;
-  if (full) {
-    reset_out();
-    dispatch_remote_reset();
-  }
-}
-
-void Protocol::requeue_out_sent()
-{
-  assert(io_state != io_state_t::open);
-  if (out_sent_msgs.empty()) {
-    return;
-  }
-
-  out_seq -= out_sent_msgs.size();
-  logger().debug("{} requeue {} items, revert out_seq to {}",
-                 conn, out_sent_msgs.size(), out_seq);
-  for (MessageURef& msg : out_sent_msgs) {
-    msg->clear_payload();
-    msg->set_seq(0);
-  }
-  out_pending_msgs.insert(
-      out_pending_msgs.begin(),
-      std::make_move_iterator(out_sent_msgs.begin()),
-      std::make_move_iterator(out_sent_msgs.end()));
-  out_sent_msgs.clear();
-  notify_out_dispatch();
-}
-
-void Protocol::requeue_out_sent_up_to(seq_num_t seq)
-{
-  assert(io_state != io_state_t::open);
-  if (out_sent_msgs.empty() && out_pending_msgs.empty()) {
-    logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}",
-                   conn, out_seq, seq);
-    out_seq = seq;
-    return;
-  }
-  logger().debug("{} discarding sent msgs by seq {} (sent_len={}, out_seq={})",
-                 conn, seq, out_sent_msgs.size(), out_seq);
-  while (!out_sent_msgs.empty()) {
-    auto cur_seq = out_sent_msgs.front()->get_seq();
-    if (cur_seq == 0 || cur_seq > seq) {
-      break;
-    } else {
-      out_sent_msgs.pop_front();
-    }
-  }
-  requeue_out_sent();
-}
-
-void Protocol::reset_out()
-{
-  assert(io_state != io_state_t::open);
-  out_seq = 0;
-  out_pending_msgs.clear();
-  out_sent_msgs.clear();
-  need_keepalive = false;
-  next_keepalive_ack = std::nullopt;
-  ack_left = 0;
-}
-
-void Protocol::dispatch_accept()
-{
-  if (io_state == io_state_t::drop) {
-    return;
-  }
-  // protocol_is_connected can be from true to true here if the replacing is
-  // happening to a connected connection.
-  protocol_is_connected = true;
-  dispatchers.ms_handle_accept(
-    seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
-}
-
-void Protocol::dispatch_connect()
-{
-  if (io_state == io_state_t::drop) {
-    return;
-  }
-  ceph_assert_always(protocol_is_connected == false);
-  protocol_is_connected = true;
-  dispatchers.ms_handle_connect(
-    seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
-}
-
-void Protocol::dispatch_reset(bool is_replace)
-{
-  ceph_assert_always(io_state == io_state_t::drop);
-  if (!need_dispatch_reset) {
-    return;
-  }
-  need_dispatch_reset = false;
-  dispatchers.ms_handle_reset(
-    seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()),
-    is_replace);
-}
-
-void Protocol::dispatch_remote_reset()
-{
-  if (io_state == io_state_t::drop) {
-    return;
-  }
-  dispatchers.ms_handle_remote_reset(
-    seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
-}
-
-void Protocol::ack_out_sent(seq_num_t seq)
-{
-  if (conn.policy.lossy) {  // lossy connections don't keep sent messages
-    return;
-  }
-  while (!out_sent_msgs.empty() &&
-         out_sent_msgs.front()->get_seq() <= seq) {
-    logger().trace("{} got ack seq {} >= {}, pop {}",
-                   conn, seq, out_sent_msgs.front()->get_seq(),
-                   *out_sent_msgs.front());
-    out_sent_msgs.pop_front();
-  }
-}
-
-seastar::future<stop_t> Protocol::try_exit_out_dispatch() {
-  assert(!is_out_queued());
-  return frame_assembler->flush(
-  ).then([this] {
-    if (!is_out_queued()) {
-      // still nothing pending to send after flush,
-      // the dispatching can ONLY stop now
-      ceph_assert(out_dispatching);
-      out_dispatching = false;
-      if (unlikely(out_exit_dispatching.has_value())) {
-        out_exit_dispatching->set_value();
-        out_exit_dispatching = std::nullopt;
-        logger().info("{} do_out_dispatch: nothing queued at {},"
-                      " set out_exit_dispatching",
-                      conn, io_state);
-      }
-      return seastar::make_ready_future<stop_t>(stop_t::yes);
-    } else {
-      // something is pending to send during flushing
-      return seastar::make_ready_future<stop_t>(stop_t::no);
-    }
-  });
-}
-
-seastar::future<> Protocol::do_out_dispatch()
-{
-  return seastar::repeat([this] {
-    switch (io_state) {
-     case io_state_t::open: {
-      bool still_queued = is_out_queued();
-      if (unlikely(!still_queued)) {
-        return try_exit_out_dispatch();
-      }
-      auto to_ack = ack_left;
-      assert(to_ack == 0 || in_seq > 0);
-      // sweep all pending out with the concrete Protocol
-      return frame_assembler->write(
-        sweep_out_pending_msgs_to_sent(
-          need_keepalive, next_keepalive_ack, to_ack > 0)
-      ).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack] {
-        need_keepalive = false;
-        if (next_keepalive_ack == prv_keepalive_ack) {
-          next_keepalive_ack = std::nullopt;
-        }
-        assert(ack_left >= to_ack);
-        ack_left -= to_ack;
-        if (!is_out_queued()) {
-          return try_exit_out_dispatch();
-        } else {
-          // messages were enqueued during socket write
-          return seastar::make_ready_future<stop_t>(stop_t::no);
-        }
-      });
-     }
-     case io_state_t::delay:
-      // delay out dispatching until open
-      if (out_exit_dispatching) {
-        out_exit_dispatching->set_value();
-        out_exit_dispatching = std::nullopt;
-        logger().info("{} do_out_dispatch: delay and set out_exit_dispatching ...", conn);
-      } else {
-        logger().info("{} do_out_dispatch: delay ...", conn);
-      }
-      return io_state_changed.get_future(
-      ).then([] { return stop_t::no; });
-     case io_state_t::drop:
-      ceph_assert(out_dispatching);
-      out_dispatching = false;
-      if (out_exit_dispatching) {
-        out_exit_dispatching->set_value();
-        out_exit_dispatching = std::nullopt;
-        logger().info("{} do_out_dispatch: dropped and set out_exit_dispatching", conn);
-      } else {
-        logger().info("{} do_out_dispatch: dropped", conn);
-      }
-      return seastar::make_ready_future<stop_t>(stop_t::yes);
-     default:
-      ceph_assert(false);
-    }
-  }).handle_exception_type([this] (const std::system_error& e) {
-    if (e.code() != std::errc::broken_pipe &&
-        e.code() != std::errc::connection_reset &&
-        e.code() != error::negotiation_failure) {
-      logger().error("{} do_out_dispatch(): unexpected error at {} -- {}",
-                     conn, io_state, e);
-      ceph_abort();
-    }
-
-    if (io_state == io_state_t::open) {
-      logger().info("{} do_out_dispatch(): fault at {}, going to delay -- {}",
-                    conn, io_state, e);
-      std::exception_ptr eptr;
-      try {
-        throw e;
-      } catch(...) {
-        eptr = std::current_exception();
-      }
-      set_io_state(io_state_t::delay);
-      notify_out_fault("do_out_dispatch", eptr);
-    } else {
-      logger().info("{} do_out_dispatch(): fault at {} -- {}",
-                    conn, io_state, e);
-    }
-
-    return do_out_dispatch();
-  });
-}
-
-void Protocol::notify_out_dispatch()
-{
-  notify_out();
-  if (out_dispatching) {
-    // already dispatching
-    return;
-  }
-  out_dispatching = true;
-  switch (io_state) {
-   case io_state_t::open:
-     [[fallthrough]];
-   case io_state_t::delay:
-    assert(!gate.is_closed());
-    gate.dispatch_in_background("do_out_dispatch", conn, [this] {
-      return do_out_dispatch();
-    });
-    return;
-   case io_state_t::drop:
-    out_dispatching = false;
-    return;
-   default:
-    ceph_assert(false);
-  }
-}
-
-seastar::future<>
-Protocol::read_message(utime_t throttle_stamp, std::size_t msg_size)
-{
-  return frame_assembler->read_frame_payload(
-  ).then([this, throttle_stamp, msg_size](auto payload) {
-    if (unlikely(io_state != io_state_t::open)) {
-      logger().debug("{} triggered {} during read_message()",
-                     conn, io_state);
-      abort_protocol();
-    }
-
-    utime_t recv_stamp{seastar::lowres_system_clock::now()};
-
-    // we need to get the size before std::moving segments data
-    auto msg_frame = MessageFrame::Decode(*payload);
-    // XXX: paranoid copy just to avoid oops
-    ceph_msg_header2 current_header = msg_frame.header();
-
-    logger().trace("{} got {} + {} + {} byte message,"
-                   " envelope type={} src={} off={} seq={}",
-                   conn, msg_frame.front_len(), msg_frame.middle_len(),
-                   msg_frame.data_len(), current_header.type, conn.get_peer_name(),
-                   current_header.data_off, current_header.seq);
-
-    ceph_msg_header header{current_header.seq,
-                           current_header.tid,
-                           current_header.type,
-                           current_header.priority,
-                           current_header.version,
-                           ceph_le32(msg_frame.front_len()),
-                           ceph_le32(msg_frame.middle_len()),
-                           ceph_le32(msg_frame.data_len()),
-                           current_header.data_off,
-                           conn.get_peer_name(),
-                           current_header.compat_version,
-                           current_header.reserved,
-                           ceph_le32(0)};
-    ceph_msg_footer footer{ceph_le32(0), ceph_le32(0),
-                           ceph_le32(0), ceph_le64(0), current_header.flags};
-
-    auto conn_ref = seastar::static_pointer_cast<SocketConnection>(
-        conn.shared_from_this());
-    Message *message = decode_message(nullptr, 0, header, footer,
-        msg_frame.front(), msg_frame.middle(), msg_frame.data(), conn_ref);
-    if (!message) {
-      logger().warn("{} decode message failed", conn);
-      abort_in_fault();
-    }
-
-    // store reservation size in message, so we don't get confused
-    // by messages entering the dispatch queue through other paths.
-    message->set_dispatch_throttle_size(msg_size);
-
-    message->set_throttle_stamp(throttle_stamp);
-    message->set_recv_stamp(recv_stamp);
-    message->set_recv_complete_stamp(utime_t{seastar::lowres_system_clock::now()});
-
-    // check received seq#.  if it is old, drop the message.
-    // note that incoming messages may skip ahead.  this is convenient for the
-    // client side queueing because messages can't be renumbered, but the (kernel)
-    // client will occasionally pull a message out of the sent queue to send
-    // elsewhere.  in that case it doesn't matter if we "got" it or not.
-    uint64_t cur_seq = get_in_seq();
-    if (message->get_seq() <= cur_seq) {
-      logger().error("{} got old message {} <= {} {}, discarding",
-                     conn, message->get_seq(), cur_seq, *message);
-      if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) &&
-          local_conf()->ms_die_on_old_message) {
-        ceph_assert(0 == "old msgs despite reconnect_seq feature");
-      }
-      return seastar::now();
-    } else if (message->get_seq() > cur_seq + 1) {
-      logger().error("{} missed message? skipped from seq {} to {}",
-                     conn, cur_seq, message->get_seq());
-      if (local_conf()->ms_die_on_skipped_message) {
-        ceph_assert(0 == "skipped incoming seq");
-      }
-    }
-
-    // note last received message.
-    in_seq = message->get_seq();
-    if (conn.policy.lossy) {
-      logger().debug("{} <== #{} === {} ({})",
-                     conn,
-                     message->get_seq(),
-                     *message,
-                     message->get_type());
-    } else {
-      logger().debug("{} <== #{},{} === {} ({})",
-                     conn,
-                     message->get_seq(),
-                     current_header.ack_seq,
-                     *message,
-                     message->get_type());
-    }
-
-    // notify ack
-    if (!conn.policy.lossy) {
-      ++ack_left;
-      notify_out_dispatch();
-    }
-
-    ack_out_sent(current_header.ack_seq);
-
-    // TODO: change MessageRef with seastar::shared_ptr
-    auto msg_ref = MessageRef{message, false};
-    assert(io_state == io_state_t::open);
-    // throttle the reading process by the returned future
-    return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
-  });
-}
-
-void Protocol::do_in_dispatch()
-{
-  ceph_assert_always(!in_exit_dispatching.has_value());
-  in_exit_dispatching = seastar::promise<>();
-  gate.dispatch_in_background("do_in_dispatch", conn, [this] {
-    return seastar::keep_doing([this] {
-      return frame_assembler->read_main_preamble(
-      ).then([this](auto ret) {
-        switch (ret.tag) {
-          case Tag::MESSAGE: {
-            size_t msg_size = get_msg_size(*ret.rx_frame_asm);
-            return seastar::futurize_invoke([this] {
-              // throttle_message() logic
-              if (!conn.policy.throttler_messages) {
-                return seastar::now();
-              }
-              // TODO: message throttler
-              ceph_assert(false);
-              return seastar::now();
-            }).then([this, msg_size] {
-              // throttle_bytes() logic
-              if (!conn.policy.throttler_bytes) {
-                return seastar::now();
-              }
-              if (!msg_size) {
-                return seastar::now();
-              }
-              logger().trace("{} wants {} bytes from policy throttler {}/{}",
-                             conn, msg_size,
-                             conn.policy.throttler_bytes->get_current(),
-                             conn.policy.throttler_bytes->get_max());
-              return conn.policy.throttler_bytes->get(msg_size);
-            }).then([this, msg_size] {
-              // TODO: throttle_dispatch_queue() logic
-              utime_t throttle_stamp{seastar::lowres_system_clock::now()};
-              return read_message(throttle_stamp, msg_size);
-            });
-          }
-          case Tag::ACK:
-            return frame_assembler->read_frame_payload(
-            ).then([this](auto payload) {
-              // handle_message_ack() logic
-              auto ack = AckFrame::Decode(payload->back());
-              logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq());
-              ack_out_sent(ack.seq());
-            });
-          case Tag::KEEPALIVE2:
-            return frame_assembler->read_frame_payload(
-            ).then([this](auto payload) {
-              // handle_keepalive2() logic
-              auto keepalive_frame = KeepAliveFrame::Decode(payload->back());
-              logger().debug("{} GOT KeepAliveFrame: timestamp={}",
-                             conn, keepalive_frame.timestamp());
-              // notify keepalive ack
-              next_keepalive_ack = keepalive_frame.timestamp();
-              notify_out_dispatch();
-
-              last_keepalive = seastar::lowres_system_clock::now();
-            });
-          case Tag::KEEPALIVE2_ACK:
-            return frame_assembler->read_frame_payload(
-            ).then([this](auto payload) {
-              // handle_keepalive2_ack() logic
-              auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload->back());
-              auto _last_keepalive_ack =
-                seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()};
-              set_last_keepalive_ack(_last_keepalive_ack);
-              logger().debug("{} GOT KeepAliveFrameAck: timestamp={}",
-                             conn, _last_keepalive_ack);
-            });
-          default: {
-            logger().warn("{} do_in_dispatch() received unexpected tag: {}",
-                          conn, static_cast<uint32_t>(ret.tag));
-            abort_in_fault();
-          }
-        }
-      });
-    }).handle_exception([this](std::exception_ptr eptr) {
-      const char *e_what;
-      try {
-        std::rethrow_exception(eptr);
-      } catch (std::exception &e) {
-        e_what = e.what();
-      }
-
-      if (io_state == io_state_t::open) {
-        logger().info("{} do_in_dispatch(): fault at {}, going to delay -- {}",
-                      conn, io_state, e_what);
-        set_io_state(io_state_t::delay);
-        notify_out_fault("do_in_dispatch", eptr);
-      } else {
-        logger().info("{} do_in_dispatch(): fault at {} -- {}",
-                      conn, io_state, e_what);
-      }
-    }).finally([this] {
-      ceph_assert_always(in_exit_dispatching.has_value());
-      in_exit_dispatching->set_value();
-      in_exit_dispatching = std::nullopt;
-    });
-  });
-}
-
-} // namespace crimson::net
diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h
deleted file mode 100644 (file)
index 5bfdc71..0000000
+++ /dev/null
@@ -1,246 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#pragma once
-
-#include <seastar/core/gate.hh>
-#include <seastar/core/shared_future.hh>
-#include <seastar/util/later.hh>
-
-#include "crimson/common/gated.h"
-#include "crimson/common/log.h"
-#include "Fwd.h"
-#include "SocketConnection.h"
-#include "FrameAssemblerV2.h"
-
-namespace crimson::net {
-
-class Protocol {
-// public to SocketConnection
- public:
-  Protocol(Protocol&&) = delete;
-  virtual ~Protocol();
-
-  virtual seastar::future<> close_clean_yielded() = 0;
-
-#ifdef UNIT_TESTS_BUILT
-  virtual bool is_closed_clean() const = 0;
-
-  virtual bool is_closed() const = 0;
-
-#endif
-  virtual void start_connect(const entity_addr_t& peer_addr,
-                             const entity_name_t& peer_name) = 0;
-
-  virtual void start_accept(SocketRef&& socket,
-                            const entity_addr_t& peer_addr) = 0;
-
- protected:
-  Protocol(ChainedDispatchers& dispatchers,
-           SocketConnection& conn);
-
-  virtual void notify_out() = 0;
-
-  virtual void notify_out_fault(const char *where, std::exception_ptr) = 0;
-
-  virtual void notify_mark_down() = 0;
-
-// the write state-machine
- public:
-  using clock_t = seastar::lowres_system_clock;
-
-  bool is_connected() const {
-    return protocol_is_connected;
-  }
-
-  seastar::future<> send(MessageURef msg);
-
-  seastar::future<> send_keepalive();
-
-  clock_t::time_point get_last_keepalive() const {
-    return last_keepalive;
-  }
-
-  clock_t::time_point get_last_keepalive_ack() const {
-    return last_keepalive_ack;
-  }
-
-  void set_last_keepalive_ack(clock_t::time_point when) {
-    last_keepalive_ack = when;
-  }
-
-  void mark_down();
-
-  struct io_stat_printer {
-    const Protocol &protocol;
-  };
-  void print_io_stat(std::ostream &out) const;
-
-// TODO: encapsulate a SessionedSender class
- protected:
-  seastar::future<> close_io(
-      bool is_dispatch_reset,
-      bool is_replace) {
-    ceph_assert_always(io_state == io_state_t::drop);
-
-    if (is_dispatch_reset) {
-      dispatch_reset(is_replace);
-    }
-    assert(!gate.is_closed());
-    return gate.close();
-  }
-
-  /**
-   * io_state_t
-   *
-   * The io_state is changed with protocol state atomically, indicating the
-   * IOHandler behavior of the according protocol state.
-   */
-  enum class io_state_t : uint8_t {
-    none,
-    delay,
-    open,
-    drop
-  };
-  friend class fmt::formatter<io_state_t>;
-
-  void set_io_state(const io_state_t &new_state, FrameAssemblerV2Ref fa=nullptr);
-
-  seastar::future<FrameAssemblerV2Ref> wait_io_exit_dispatching();
-
-  void reset_session(bool full);
-
-  void requeue_out_sent_up_to(seq_num_t seq);
-
-  void requeue_out_sent();
-
-  bool is_out_queued_or_sent() const {
-    return is_out_queued() || !out_sent_msgs.empty();
-  }
-
-  seq_num_t get_in_seq() const {
-    return in_seq;
-  }
-
-  void dispatch_accept();
-
-  void dispatch_connect();
-
- private:
-  void dispatch_reset(bool is_replace);
-
-  void dispatch_remote_reset();
-
-  bool is_out_queued() const {
-    return (!out_pending_msgs.empty() ||
-            ack_left > 0 ||
-            need_keepalive ||
-            next_keepalive_ack.has_value());
-  }
-
-  void reset_out();
-
-  seastar::future<stop_t> try_exit_out_dispatch();
-
-  seastar::future<> do_out_dispatch();
-
-  ceph::bufferlist sweep_out_pending_msgs_to_sent(
-      bool require_keepalive,
-      std::optional<utime_t> maybe_keepalive_ack,
-      bool require_ack);
-
-  void notify_out_dispatch();
-
-  void ack_out_sent(seq_num_t seq);
-
-  seastar::future<> read_message(utime_t throttle_stamp, std::size_t msg_size);
-
-  void do_in_dispatch();
-
-private:
-  ChainedDispatchers &dispatchers;
-
-  SocketConnection &conn;
-
-  crimson::common::Gated gate;
-
-  FrameAssemblerV2Ref frame_assembler;
-
-  bool protocol_is_connected = false;
-
-  bool need_dispatch_reset = true;
-
-  io_state_t io_state = io_state_t::none;
-
-  // wait until current io_state changed
-  seastar::promise<> io_state_changed;
-
-  /*
-   * out states for writing
-   */
-
-  bool out_dispatching = false;
-
-  std::optional<seastar::promise<>> out_exit_dispatching;
-
-  /// the seq num of the last transmitted message
-  seq_num_t out_seq = 0;
-
-  // messages to be resent after connection gets reset
-  std::deque<MessageURef> out_pending_msgs;
-
-  // messages sent, but not yet acked by peer
-  std::deque<MessageURef> out_sent_msgs;
-
-  bool need_keepalive = false;
-
-  std::optional<utime_t> next_keepalive_ack = std::nullopt;
-
-  uint64_t ack_left = 0;
-
-  /*
-   * in states for reading
-   */
-
-  std::optional<seastar::promise<>> in_exit_dispatching;
-
-  /// the seq num of the last received message
-  seq_num_t in_seq = 0;
-
-  clock_t::time_point last_keepalive;
-
-  clock_t::time_point last_keepalive_ack;
-};
-
-inline std::ostream& operator<<(
-    std::ostream& out, Protocol::io_stat_printer stat) {
-  stat.protocol.print_io_stat(out);
-  return out;
-}
-
-} // namespace crimson::net
-
-template <>
-struct fmt::formatter<crimson::net::Protocol::io_state_t>
-  : fmt::formatter<std::string_view> {
-  template <typename FormatContext>
-  auto format(crimson::net::Protocol::io_state_t state, FormatContext& ctx) {
-    using enum crimson::net::Protocol::io_state_t;
-    std::string_view name;
-    switch (state) {
-    case none:
-      name = "none";
-      break;
-    case delay:
-      name = "delay";
-      break;
-    case open:
-      name = "open";
-      break;
-    case drop:
-      name = "drop";
-      break;
-    }
-    return formatter<string_view>::format(name, ctx);
-  }
-};
index b4a5767d7fa26d794973c9693e7287f0482b8f1b..0b9d4be2d257d33fce3d419e18b9820440449d82 100644 (file)
@@ -12,9 +12,9 @@
 #include "crimson/auth/AuthClient.h"
 #include "crimson/auth/AuthServer.h"
 #include "crimson/common/formatter.h"
+#include "crimson/common/log.h"
 
 #include "Errors.h"
-#include "SocketConnection.h"
 #include "SocketMessenger.h"
 
 #ifdef UNIT_TESTS_BUILT
@@ -23,6 +23,8 @@
 
 using namespace ceph::msgr::v2;
 using crimson::common::local_conf;
+using io_state_t = crimson::net::IOHandler::io_state_t;
+using io_stat_printer = crimson::net::IOHandler::io_stat_printer;
 
 namespace {
 
@@ -154,11 +156,11 @@ seastar::future<> ProtocolV2::Timer::backoff(double seconds)
   });
 }
 
-ProtocolV2::ProtocolV2(ChainedDispatchers& dispatchers,
-                       SocketConnection& conn)
-  : Protocol(dispatchers, conn),
-    conn{conn},
+ProtocolV2::ProtocolV2(SocketConnection& conn,
+                       IOHandler &io_handler)
+  : conn{conn},
     messenger{conn.messenger},
+    io_handler{io_handler},
     frame_assembler{FrameAssemblerV2::create(conn)},
     auth_meta{seastar::make_lw_shared<AuthConnectionMeta>()},
     protocol_timer{conn}
@@ -225,9 +227,9 @@ void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state, bool
   if (new_state == state_t::READY) {
     // I'm not responsible to shutdown the socket at READY
     is_socket_valid = false;
-    set_io_state(new_io_state, std::move(frame_assembler));
+    io_handler.set_io_state(new_io_state, std::move(frame_assembler));
   } else {
-    set_io_state(new_io_state, nullptr);
+    io_handler.set_io_state(new_io_state, nullptr);
   }
 
   /*
@@ -236,7 +238,7 @@ void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state, bool
 
   if (pre_state == state_t::READY) {
     gate.dispatch_in_background("exit_io", conn, [this] {
-      return wait_io_exit_dispatching(
+      return io_handler.wait_io_exit_dispatching(
       ).then([this](FrameAssemblerV2Ref fa) {
         frame_assembler = std::move(fa);
         exit_io->set_value();
@@ -308,20 +310,20 @@ void ProtocolV2::fault(
   }
 
   if (conn.policy.server ||
-      (conn.policy.standby && !is_out_queued_or_sent())) {
+      (conn.policy.standby && !io_handler.is_out_queued_or_sent())) {
     if (conn.policy.server) {
       logger().info("{} protocol {} {} fault as server, going to STANDBY {} -- {}",
                     conn,
                     get_state_name(state),
                     where,
-                    io_stat_printer{*this},
+                    io_stat_printer{io_handler},
                     e_what);
     } else {
       logger().info("{} protocol {} {} fault with nothing to send, going to STANDBY {} -- {}",
                     conn,
                     get_state_name(state),
                     where,
-                    io_stat_printer{*this},
+                    io_stat_printer{io_handler},
                     e_what);
     }
     execute_standby();
@@ -331,7 +333,7 @@ void ProtocolV2::fault(
                   conn,
                   get_state_name(state),
                   where,
-                  io_stat_printer{*this},
+                  io_stat_printer{io_handler},
                   e_what);
     execute_wait(false);
   } else {
@@ -341,7 +343,7 @@ void ProtocolV2::fault(
                   conn,
                   get_state_name(state),
                   where,
-                  io_stat_printer{*this},
+                  io_stat_printer{io_handler},
                   e_what);
     execute_connecting();
   }
@@ -355,7 +357,7 @@ void ProtocolV2::reset_session(bool full)
     client_cookie = generate_client_cookie();
     peer_global_seq = 0;
   }
-  do_reset_session(full);
+  io_handler.reset_session(full);
 }
 
 seastar::future<std::tuple<entity_type_t, entity_addr_t>>
@@ -633,7 +635,7 @@ ProtocolV2::client_connect()
         return frame_assembler->read_frame_payload(
         ).then([this](auto payload) {
           // handle_server_ident() logic
-          requeue_out_sent();
+          io_handler.requeue_out_sent();
           auto server_ident = ServerIdentFrame::Decode(payload->back());
           logger().debug("{} GOT ServerIdentFrame:"
                          " addrs={}, gid={}, gs={},"
@@ -709,12 +711,12 @@ ProtocolV2::client_reconnect()
                                           server_cookie,
                                           global_seq,
                                           connect_seq,
-                                          get_in_seq());
+                                          io_handler.get_in_seq());
   logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={},"
                  " server_cookie={}, gs={}, cs={}, in_seq={}",
                  conn, messenger.get_myaddrs(),
                  client_cookie, server_cookie,
-                 global_seq, connect_seq, get_in_seq());
+                 global_seq, connect_seq, io_handler.get_in_seq());
   return frame_assembler->write_flush_frame(reconnect).then([this] {
     return frame_assembler->read_main_preamble();
   }).then([this](auto ret) {
@@ -764,7 +766,7 @@ ProtocolV2::client_reconnect()
           auto reconnect_ok = ReconnectOkFrame::Decode(payload->back());
           logger().debug("{} GOT ReconnectOkFrame: msg_seq={}",
                          conn, reconnect_ok.msg_seq());
-          requeue_out_sent_up_to(reconnect_ok.msg_seq());
+          io_handler.requeue_out_sent_up_to(reconnect_ok.msg_seq());
           return seastar::make_ready_future<next_step_t>(next_step_t::ready);
         });
       default: {
@@ -872,8 +874,8 @@ void ProtocolV2::execute_connecting()
                           "client_cookie={}, server_cookie={}, {}",
                           conn, global_seq, peer_global_seq, connect_seq,
                           client_cookie, server_cookie,
-                          io_stat_printer{*this});
-            dispatch_connect();
+                          io_stat_printer{io_handler});
+            io_handler.dispatch_connect();
             if (unlikely(state != state_t::CONNECTING)) {
               logger().debug("{} triggered {} after ms_handle_connect(), abort",
                              conn, get_state_name(state));
@@ -1593,7 +1595,7 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
     accept_me();
   }
 
-  dispatch_accept();
+  io_handler.dispatch_accept();
   if (unlikely(state != state_t::ESTABLISHING)) {
     logger().debug("{} triggered {} after ms_handle_accept() during execute_establishing()",
                    conn, get_state_name(state));
@@ -1613,7 +1615,7 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
                     "client_cookie={}, server_cookie={}, {}",
                     conn, global_seq, peer_global_seq, connect_seq,
                     client_cookie, server_cookie,
-                    io_stat_printer{*this});
+                    io_stat_printer{io_handler});
       execute_ready();
     }).handle_exception([this](std::exception_ptr eptr) {
       fault(state_t::ESTABLISHING, "execute_establishing", eptr);
@@ -1633,8 +1635,8 @@ ProtocolV2::send_server_ident()
   logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq);
 
   // this is required for the case when this connection is being replaced
-  requeue_out_sent_up_to(0);
-  do_reset_session(false);
+  io_handler.requeue_out_sent_up_to(0);
+  io_handler.reset_session(false);
 
   if (!conn.policy.lossy) {
     server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
@@ -1699,7 +1701,7 @@ void ProtocolV2::trigger_replacing(bool reconnect,
        new_peer_global_seq,
        new_connect_seq, new_msg_seq] () mutable {
     ceph_assert_always(state == state_t::REPLACING);
-    dispatch_accept();
+    io_handler.dispatch_accept();
     // state may become CLOSING, close mover.socket and abort later
     return wait_exit_io(
     ).then([this] {
@@ -1742,9 +1744,9 @@ void ProtocolV2::trigger_replacing(bool reconnect,
       if (reconnect) {
         connect_seq = new_connect_seq;
         // send_reconnect_ok() logic
-        requeue_out_sent_up_to(new_msg_seq);
-        auto reconnect_ok = ReconnectOkFrame::Encode(get_in_seq());
-        logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, get_in_seq());
+        io_handler.requeue_out_sent_up_to(new_msg_seq);
+        auto reconnect_ok = ReconnectOkFrame::Encode(io_handler.get_in_seq());
+        logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, io_handler.get_in_seq());
         return frame_assembler->write_flush_frame(reconnect_ok);
       } else {
         client_cookie = new_client_cookie;
@@ -1769,7 +1771,7 @@ void ProtocolV2::trigger_replacing(bool reconnect,
                     conn, reconnect ? "reconnected" : "connected",
                     global_seq, peer_global_seq, connect_seq,
                     client_cookie, server_cookie,
-                    io_stat_printer{*this});
+                    io_stat_printer{io_handler});
       execute_ready();
     }).handle_exception([this](std::exception_ptr eptr) {
       fault(state_t::REPLACING, "trigger_replacing", eptr);
@@ -1933,7 +1935,7 @@ void ProtocolV2::do_close(
   }
   assert(!gate.is_closed());
   auto handshake_closed = gate.close();
-  auto io_closed = close_io(
+  auto io_closed = io_handler.close_io(
       is_dispatch_reset, is_replace);
 
   // asynchronous operations
index 820d8e5f0acfd3dc799fcbf85a72be3daf086456..b6f67b566510e53d1369eb346b56700efae35a62 100644 (file)
@@ -3,48 +3,60 @@
 
 #pragma once
 
+#include <seastar/core/shared_future.hh>
 #include <seastar/core/sleep.hh>
 
-#include "Protocol.h"
+#include "io_handler.h"
 
 namespace crimson::net {
 
-class ProtocolV2 final : public Protocol {
+class ProtocolV2 final : public HandshakeListener {
   using AuthConnectionMetaRef = seastar::lw_shared_ptr<AuthConnectionMeta>;
 
- public:
-  ProtocolV2(ChainedDispatchers& dispatchers,
-             SocketConnection& conn);
-  ~ProtocolV2() override;
+public:
+  ProtocolV2(SocketConnection &,
+             IOHandler &);
 
-// public to SocketConnection, but private to the others
- private:
-  seastar::future<> close_clean_yielded() override;
+  ~ProtocolV2() final;
 
-#ifdef UNIT_TESTS_BUILT
-  bool is_closed_clean() const override {
-    return closed_clean;
-  }
+  ProtocolV2(const ProtocolV2 &) = delete;
+  ProtocolV2(ProtocolV2 &&) = delete;
+  ProtocolV2 &operator=(const ProtocolV2 &) = delete;
+  ProtocolV2 &operator=(ProtocolV2 &&) = delete;
 
-  bool is_closed() const override {
-    return closed;
-  }
+/**
+ * as HandshakeListener
+ */
+private:
+  void notify_out() final;
 
-#endif
+  void notify_out_fault(const char *, std::exception_ptr) final;
+
+  void notify_mark_down() final;
+
+/*
+* as ProtocolV2 to be called by SocketConnection
+*/
+public:
   void start_connect(const entity_addr_t& peer_addr,
-                     const entity_name_t& peer_name) override;
+                     const entity_name_t& peer_name);
 
   void start_accept(SocketRef&& socket,
-                    const entity_addr_t& peer_addr) override;
+                    const entity_addr_t& peer_addr);
 
- private:
-  void notify_out() override;
+  seastar::future<> close_clean_yielded();
 
-  void notify_out_fault(const char *, std::exception_ptr) override;
+#ifdef UNIT_TESTS_BUILT
+  bool is_closed_clean() const {
+    return closed_clean;
+  }
 
-  void notify_mark_down() override;
+  bool is_closed() const {
+    return closed;
+  }
 
- private:
+#endif
+private:
   seastar::future<> wait_exit_io() {
     if (exit_io.has_value()) {
       return exit_io->get_shared_future();
@@ -80,7 +92,7 @@ class ProtocolV2 final : public Protocol {
     return statenames[static_cast<int>(state)];
   }
 
-  void trigger_state(state_t state, io_state_t io_state, bool reentrant);
+  void trigger_state(state_t state, IOHandler::io_state_t io_state, bool reentrant);
 
   template <typename Func, typename T>
   void gated_execute(const char *what, T &who, Func &&func) {
@@ -196,11 +208,13 @@ class ProtocolV2 final : public Protocol {
   void do_close(bool is_dispatch_reset,
                 std::optional<std::function<void()>> f_accept_new=std::nullopt);
 
- private:
+private:
   SocketConnection &conn;
 
   SocketMessenger &messenger;
 
+  IOHandler &io_handler;
+
   bool has_socket = false;
 
   // the socket exists and it is not shutdown
@@ -254,6 +268,18 @@ class ProtocolV2 final : public Protocol {
   Timer protocol_timer;
 };
 
+struct create_handlers_ret {
+  std::unique_ptr<ConnectionHandler> io_handler;
+  std::unique_ptr<ProtocolV2> protocol;
+};
+inline create_handlers_ret create_handlers(ChainedDispatchers &dispatchers, SocketConnection &conn) {
+  std::unique_ptr<ConnectionHandler> io_handler = std::make_unique<IOHandler>(dispatchers, conn);
+  IOHandler &io_handler_concrete = static_cast<IOHandler&>(*io_handler);
+  auto protocol = std::make_unique<ProtocolV2>(conn, io_handler_concrete);
+  io_handler_concrete.set_handshake_listener(*protocol);
+  return {std::move(io_handler), std::move(protocol)};
+}
+
 } // namespace crimson::net
 
 #if FMT_VERSION >= 90000
index 5b3d806ed7e9a66bda0304362cd898beb3100b75..aa7fcc027790d7f4aa113bc1589bca1d82ed8434 100644 (file)
@@ -28,9 +28,11 @@ using crimson::common::local_conf;
 SocketConnection::SocketConnection(SocketMessenger& messenger,
                                    ChainedDispatchers& dispatchers)
   : core(messenger.shard_id()),
-    messenger(messenger),
-    protocol(std::make_unique<ProtocolV2>(dispatchers, *this))
+    messenger(messenger)
 {
+  auto ret = create_handlers(dispatchers, *this);
+  io_handler = std::move(ret.io_handler);
+  protocol = std::move(ret.protocol);
 #ifdef UNIT_TESTS_BUILT
   if (messenger.interceptor) {
     interceptor = messenger.interceptor;
@@ -44,7 +46,7 @@ SocketConnection::~SocketConnection() {}
 bool SocketConnection::is_connected() const
 {
   assert(seastar::this_shard_id() == shard_id());
-  return protocol->is_connected();
+  return io_handler->is_connected();
 }
 
 #ifdef UNIT_TESTS_BUILT
@@ -71,7 +73,7 @@ seastar::future<> SocketConnection::send(MessageURef msg)
   return seastar::smp::submit_to(
     shard_id(),
     [this, msg=std::move(msg)]() mutable {
-      return protocol->send(std::move(msg));
+      return io_handler->send(std::move(msg));
     });
 }
 
@@ -80,31 +82,31 @@ seastar::future<> SocketConnection::send_keepalive()
   return seastar::smp::submit_to(
     shard_id(),
     [this] {
-      return protocol->send_keepalive();
+      return io_handler->send_keepalive();
     });
 }
 
 SocketConnection::clock_t::time_point
 SocketConnection::get_last_keepalive() const
 {
-  return protocol->get_last_keepalive();
+  return io_handler->get_last_keepalive();
 }
 
 SocketConnection::clock_t::time_point
 SocketConnection::get_last_keepalive_ack() const
 {
-  return protocol->get_last_keepalive_ack();
+  return io_handler->get_last_keepalive_ack();
 }
 
 void SocketConnection::set_last_keepalive_ack(clock_t::time_point when)
 {
-  protocol->set_last_keepalive_ack(when);
+  io_handler->set_last_keepalive_ack(when);
 }
 
 void SocketConnection::mark_down()
 {
   assert(seastar::this_shard_id() == shard_id());
-  protocol->mark_down();
+  io_handler->mark_down();
 }
 
 void
index 94d98302bdf6e2ba1c2e4a4e52a797624fa97255..863968cac58d72cebd726e9e01c79aa8e5724125 100644 (file)
@@ -23,7 +23,7 @@
 
 namespace crimson::net {
 
-class Protocol;
+class ProtocolV2;
 class SocketMessenger;
 class SocketConnection;
 using SocketConnectionRef = seastar::shared_ptr<SocketConnection>;
@@ -32,10 +32,48 @@ using SocketConnectionRef = seastar::shared_ptr<SocketConnection>;
 class Interceptor;
 #endif
 
+/**
+ * ConnectionHandler
+ *
+ * The interface class to implement Connection, called by SocketConnection.
+ */
+class ConnectionHandler {
+public:
+  using clock_t = seastar::lowres_system_clock;
+
+  virtual ~ConnectionHandler() = default;
+
+  ConnectionHandler(const ConnectionHandler &) = delete;
+  ConnectionHandler(ConnectionHandler &&) = delete;
+  ConnectionHandler &operator=(const ConnectionHandler &) = delete;
+  ConnectionHandler &operator=(ConnectionHandler &&) = delete;
+
+  virtual bool is_connected() const = 0;
+
+  virtual seastar::future<> send(MessageURef) = 0;
+
+  virtual seastar::future<> send_keepalive() = 0;
+
+  virtual clock_t::time_point get_last_keepalive() const = 0;
+
+  virtual clock_t::time_point get_last_keepalive_ack() const = 0;
+
+  virtual void set_last_keepalive_ack(clock_t::time_point) = 0;
+
+  virtual void mark_down() = 0;
+
+protected:
+  ConnectionHandler() = default;
+};
+
 class SocketConnection : public Connection {
   const seastar::shard_id core;
+
   SocketMessenger& messenger;
-  std::unique_ptr<Protocol> protocol;
+
+  std::unique_ptr<ConnectionHandler> io_handler;
+
+  std::unique_ptr<ProtocolV2> protocol;
 
   SocketRef socket;
 
@@ -178,7 +216,7 @@ private:
   bool peer_wins() const;
 #endif
 
-  friend class Protocol;
+  friend class IOHandler;
   friend class ProtocolV2;
   friend class FrameAssemblerV2;
 };
diff --git a/src/crimson/net/io_handler.cc b/src/crimson/net/io_handler.cc
new file mode 100644 (file)
index 0000000..20bcdbd
--- /dev/null
@@ -0,0 +1,716 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "io_handler.h"
+
+#include "auth/Auth.h"
+
+#include "crimson/common/formatter.h"
+#include "crimson/common/log.h"
+#include "crimson/net/Errors.h"
+#include "crimson/net/chained_dispatchers.h"
+#include "crimson/net/SocketMessenger.h"
+#include "msg/Message.h"
+
+using namespace ceph::msgr::v2;
+using crimson::common::local_conf;
+
+namespace {
+
+seastar::logger& logger() {
+  return crimson::get_logger(ceph_subsys_ms);
+}
+
+[[noreturn]] void abort_in_fault() {
+  throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
+}
+
+[[noreturn]] void abort_protocol() {
+  throw std::system_error(make_error_code(crimson::net::error::protocol_aborted));
+}
+
+std::size_t get_msg_size(const FrameAssembler &rx_frame_asm)
+{
+  ceph_assert(rx_frame_asm.get_num_segments() > 0);
+  size_t sum = 0;
+  // we don't include SegmentIndex::Msg::HEADER.
+  for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) {
+    sum += rx_frame_asm.get_segment_logical_len(idx);
+  }
+  return sum;
+}
+
+} // namespace anonymous
+
+namespace crimson::net {
+
+IOHandler::IOHandler(ChainedDispatchers &dispatchers,
+                     SocketConnection &conn)
+  : dispatchers(dispatchers),
+    conn(conn)
+{}
+
+IOHandler::~IOHandler()
+{
+  ceph_assert(gate.is_closed());
+  assert(!out_exit_dispatching);
+}
+
+ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent(
+  bool require_keepalive,
+  std::optional<utime_t> maybe_keepalive_ack,
+  bool require_ack)
+{
+  std::size_t num_msgs = out_pending_msgs.size();
+  ceph::bufferlist bl;
+
+  if (unlikely(require_keepalive)) {
+    auto keepalive_frame = KeepAliveFrame::Encode();
+    bl.append(frame_assembler->get_buffer(keepalive_frame));
+  }
+
+  if (unlikely(maybe_keepalive_ack.has_value())) {
+    auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack);
+    bl.append(frame_assembler->get_buffer(keepalive_ack_frame));
+  }
+
+  if (require_ack && num_msgs == 0u) {
+    auto ack_frame = AckFrame::Encode(get_in_seq());
+    bl.append(frame_assembler->get_buffer(ack_frame));
+  }
+
+  std::for_each(
+      out_pending_msgs.begin(),
+      out_pending_msgs.begin()+num_msgs,
+      [this, &bl](const MessageURef& msg) {
+    // set priority
+    msg->get_header().src = conn.messenger.get_myname();
+
+    msg->encode(conn.features, 0);
+
+    ceph_assert(!msg->get_seq() && "message already has seq");
+    msg->set_seq(++out_seq);
+
+    ceph_msg_header &header = msg->get_header();
+    ceph_msg_footer &footer = msg->get_footer();
+
+    ceph_msg_header2 header2{header.seq,        header.tid,
+                             header.type,       header.priority,
+                             header.version,
+                             ceph_le32(0),      header.data_off,
+                             ceph_le64(get_in_seq()),
+                             footer.flags,      header.compat_version,
+                             header.reserved};
+
+    auto message = MessageFrame::Encode(header2,
+        msg->get_payload(), msg->get_middle(), msg->get_data());
+    logger().debug("{} --> #{} === {} ({})",
+                  conn, msg->get_seq(), *msg, msg->get_type());
+    bl.append(frame_assembler->get_buffer(message));
+  });
+
+  if (!conn.policy.lossy) {
+    out_sent_msgs.insert(
+        out_sent_msgs.end(),
+        std::make_move_iterator(out_pending_msgs.begin()),
+        std::make_move_iterator(out_pending_msgs.end()));
+  }
+  out_pending_msgs.clear();
+  return bl;
+}
+
+seastar::future<> IOHandler::send(MessageURef msg)
+{
+  if (io_state != io_state_t::drop) {
+    out_pending_msgs.push_back(std::move(msg));
+    notify_out_dispatch();
+  }
+  return seastar::now();
+}
+
+seastar::future<> IOHandler::send_keepalive()
+{
+  if (!need_keepalive) {
+    need_keepalive = true;
+    notify_out_dispatch();
+  }
+  return seastar::now();
+}
+
+void IOHandler::mark_down()
+{
+  ceph_assert_always(io_state != io_state_t::none);
+  need_dispatch_reset = false;
+  if (io_state == io_state_t::drop) {
+    return;
+  }
+
+  logger().info("{} mark_down() with {}",
+                conn, io_stat_printer{*this});
+  set_io_state(io_state_t::drop);
+  handshake_listener->notify_mark_down();
+}
+
+void IOHandler::print_io_stat(std::ostream &out) const
+{
+  out << "io_stat("
+      << "io_state=" << fmt::format("{}", io_state)
+      << ", in_seq=" << in_seq
+      << ", out_seq=" << out_seq
+      << ", out_pending_msgs_size=" << out_pending_msgs.size()
+      << ", out_sent_msgs_size=" << out_sent_msgs.size()
+      << ", need_ack=" << (ack_left > 0)
+      << ", need_keepalive=" << need_keepalive
+      << ", need_keepalive_ack=" << bool(next_keepalive_ack)
+      << ")";
+}
+
+void IOHandler::set_io_state(
+    const IOHandler::io_state_t &new_state,
+    FrameAssemblerV2Ref fa)
+{
+  ceph_assert_always(!(
+    (new_state == io_state_t::none && io_state != io_state_t::none) ||
+    (new_state == io_state_t::open && io_state == io_state_t::open) ||
+    (new_state != io_state_t::drop && io_state == io_state_t::drop)
+  ));
+
+  bool dispatch_in = false;
+  if (new_state == io_state_t::open) {
+    // to open
+    ceph_assert_always(protocol_is_connected == true);
+    assert(fa != nullptr);
+    ceph_assert_always(frame_assembler == nullptr);
+    frame_assembler = std::move(fa);
+    ceph_assert_always(frame_assembler->is_socket_valid());
+    dispatch_in = true;
+#ifdef UNIT_TESTS_BUILT
+    if (conn.interceptor) {
+      conn.interceptor->register_conn_ready(conn);
+    }
+#endif
+  } else if (io_state == io_state_t::open) {
+    // from open
+    ceph_assert_always(protocol_is_connected == true);
+    protocol_is_connected = false;
+    assert(fa == nullptr);
+    ceph_assert_always(frame_assembler->is_socket_valid());
+    frame_assembler->shutdown_socket();
+    if (out_dispatching) {
+      ceph_assert_always(!out_exit_dispatching.has_value());
+      out_exit_dispatching = seastar::promise<>();
+    }
+  } else {
+    assert(fa == nullptr);
+  }
+
+  if (io_state != new_state) {
+    io_state = new_state;
+    io_state_changed.set_value();
+    io_state_changed = seastar::promise<>();
+  }
+
+  /*
+   * not atomic below
+   */
+
+  if (dispatch_in) {
+    do_in_dispatch();
+  }
+}
+
+seastar::future<FrameAssemblerV2Ref> IOHandler::wait_io_exit_dispatching()
+{
+  ceph_assert_always(io_state != io_state_t::open);
+  ceph_assert_always(frame_assembler != nullptr);
+  ceph_assert_always(!frame_assembler->is_socket_valid());
+  return seastar::when_all(
+    [this] {
+      if (out_exit_dispatching) {
+        return out_exit_dispatching->get_future();
+      } else {
+        return seastar::now();
+      }
+    }(),
+    [this] {
+      if (in_exit_dispatching) {
+        return in_exit_dispatching->get_future();
+      } else {
+        return seastar::now();
+      }
+    }()
+  ).discard_result().then([this] {
+    return std::move(frame_assembler);
+  });
+}
+
+void IOHandler::reset_session(bool full)
+{
+  // reset in
+  in_seq = 0;
+  if (full) {
+    reset_out();
+    dispatch_remote_reset();
+  }
+}
+
+void IOHandler::requeue_out_sent()
+{
+  assert(io_state != io_state_t::open);
+  if (out_sent_msgs.empty()) {
+    return;
+  }
+
+  out_seq -= out_sent_msgs.size();
+  logger().debug("{} requeue {} items, revert out_seq to {}",
+                 conn, out_sent_msgs.size(), out_seq);
+  for (MessageURef& msg : out_sent_msgs) {
+    msg->clear_payload();
+    msg->set_seq(0);
+  }
+  out_pending_msgs.insert(
+      out_pending_msgs.begin(),
+      std::make_move_iterator(out_sent_msgs.begin()),
+      std::make_move_iterator(out_sent_msgs.end()));
+  out_sent_msgs.clear();
+  notify_out_dispatch();
+}
+
+void IOHandler::requeue_out_sent_up_to(seq_num_t seq)
+{
+  assert(io_state != io_state_t::open);
+  if (out_sent_msgs.empty() && out_pending_msgs.empty()) {
+    logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}",
+                   conn, out_seq, seq);
+    out_seq = seq;
+    return;
+  }
+  logger().debug("{} discarding sent msgs by seq {} (sent_len={}, out_seq={})",
+                 conn, seq, out_sent_msgs.size(), out_seq);
+  while (!out_sent_msgs.empty()) {
+    auto cur_seq = out_sent_msgs.front()->get_seq();
+    if (cur_seq == 0 || cur_seq > seq) {
+      break;
+    } else {
+      out_sent_msgs.pop_front();
+    }
+  }
+  requeue_out_sent();
+}
+
+void IOHandler::reset_out()
+{
+  assert(io_state != io_state_t::open);
+  out_seq = 0;
+  out_pending_msgs.clear();
+  out_sent_msgs.clear();
+  need_keepalive = false;
+  next_keepalive_ack = std::nullopt;
+  ack_left = 0;
+}
+
+void IOHandler::dispatch_accept()
+{
+  if (io_state == io_state_t::drop) {
+    return;
+  }
+  // protocol_is_connected can be from true to true here if the replacing is
+  // happening to a connected connection.
+  protocol_is_connected = true;
+  dispatchers.ms_handle_accept(
+    seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+}
+
+void IOHandler::dispatch_connect()
+{
+  if (io_state == io_state_t::drop) {
+    return;
+  }
+  ceph_assert_always(protocol_is_connected == false);
+  protocol_is_connected = true;
+  dispatchers.ms_handle_connect(
+    seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+}
+
+void IOHandler::dispatch_reset(bool is_replace)
+{
+  ceph_assert_always(io_state == io_state_t::drop);
+  if (!need_dispatch_reset) {
+    return;
+  }
+  need_dispatch_reset = false;
+  dispatchers.ms_handle_reset(
+    seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()),
+    is_replace);
+}
+
+void IOHandler::dispatch_remote_reset()
+{
+  if (io_state == io_state_t::drop) {
+    return;
+  }
+  dispatchers.ms_handle_remote_reset(
+    seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+}
+
+void IOHandler::ack_out_sent(seq_num_t seq)
+{
+  if (conn.policy.lossy) {  // lossy connections don't keep sent messages
+    return;
+  }
+  while (!out_sent_msgs.empty() &&
+         out_sent_msgs.front()->get_seq() <= seq) {
+    logger().trace("{} got ack seq {} >= {}, pop {}",
+                   conn, seq, out_sent_msgs.front()->get_seq(),
+                   *out_sent_msgs.front());
+    out_sent_msgs.pop_front();
+  }
+}
+
+seastar::future<stop_t> IOHandler::try_exit_out_dispatch() {
+  assert(!is_out_queued());
+  return frame_assembler->flush(
+  ).then([this] {
+    if (!is_out_queued()) {
+      // still nothing pending to send after flush,
+      // the dispatching can ONLY stop now
+      ceph_assert(out_dispatching);
+      out_dispatching = false;
+      if (unlikely(out_exit_dispatching.has_value())) {
+        out_exit_dispatching->set_value();
+        out_exit_dispatching = std::nullopt;
+        logger().info("{} do_out_dispatch: nothing queued at {},"
+                      " set out_exit_dispatching",
+                      conn, io_state);
+      }
+      return seastar::make_ready_future<stop_t>(stop_t::yes);
+    } else {
+      // something is pending to send during flushing
+      return seastar::make_ready_future<stop_t>(stop_t::no);
+    }
+  });
+}
+
+seastar::future<> IOHandler::do_out_dispatch()
+{
+  return seastar::repeat([this] {
+    switch (io_state) {
+     case io_state_t::open: {
+      bool still_queued = is_out_queued();
+      if (unlikely(!still_queued)) {
+        return try_exit_out_dispatch();
+      }
+      auto to_ack = ack_left;
+      assert(to_ack == 0 || in_seq > 0);
+      return frame_assembler->write(
+        sweep_out_pending_msgs_to_sent(
+          need_keepalive, next_keepalive_ack, to_ack > 0)
+      ).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack] {
+        need_keepalive = false;
+        if (next_keepalive_ack == prv_keepalive_ack) {
+          next_keepalive_ack = std::nullopt;
+        }
+        assert(ack_left >= to_ack);
+        ack_left -= to_ack;
+        if (!is_out_queued()) {
+          return try_exit_out_dispatch();
+        } else {
+          // messages were enqueued during socket write
+          return seastar::make_ready_future<stop_t>(stop_t::no);
+        }
+      });
+     }
+     case io_state_t::delay:
+      // delay out dispatching until open
+      if (out_exit_dispatching) {
+        out_exit_dispatching->set_value();
+        out_exit_dispatching = std::nullopt;
+        logger().info("{} do_out_dispatch: delay and set out_exit_dispatching ...", conn);
+      } else {
+        logger().info("{} do_out_dispatch: delay ...", conn);
+      }
+      return io_state_changed.get_future(
+      ).then([] { return stop_t::no; });
+     case io_state_t::drop:
+      ceph_assert(out_dispatching);
+      out_dispatching = false;
+      if (out_exit_dispatching) {
+        out_exit_dispatching->set_value();
+        out_exit_dispatching = std::nullopt;
+        logger().info("{} do_out_dispatch: dropped and set out_exit_dispatching", conn);
+      } else {
+        logger().info("{} do_out_dispatch: dropped", conn);
+      }
+      return seastar::make_ready_future<stop_t>(stop_t::yes);
+     default:
+      ceph_assert(false);
+    }
+  }).handle_exception_type([this] (const std::system_error& e) {
+    if (e.code() != std::errc::broken_pipe &&
+        e.code() != std::errc::connection_reset &&
+        e.code() != error::negotiation_failure) {
+      logger().error("{} do_out_dispatch(): unexpected error at {} -- {}",
+                     conn, io_state, e);
+      ceph_abort();
+    }
+
+    if (io_state == io_state_t::open) {
+      logger().info("{} do_out_dispatch(): fault at {}, going to delay -- {}",
+                    conn, io_state, e);
+      std::exception_ptr eptr;
+      try {
+        throw e;
+      } catch(...) {
+        eptr = std::current_exception();
+      }
+      set_io_state(io_state_t::delay);
+      handshake_listener->notify_out_fault("do_out_dispatch", eptr);
+    } else {
+      logger().info("{} do_out_dispatch(): fault at {} -- {}",
+                    conn, io_state, e);
+    }
+
+    return do_out_dispatch();
+  });
+}
+
+void IOHandler::notify_out_dispatch()
+{
+  handshake_listener->notify_out();
+  if (out_dispatching) {
+    // already dispatching
+    return;
+  }
+  out_dispatching = true;
+  switch (io_state) {
+   case io_state_t::open:
+     [[fallthrough]];
+   case io_state_t::delay:
+    assert(!gate.is_closed());
+    gate.dispatch_in_background("do_out_dispatch", conn, [this] {
+      return do_out_dispatch();
+    });
+    return;
+   case io_state_t::drop:
+    out_dispatching = false;
+    return;
+   default:
+    ceph_assert(false);
+  }
+}
+
+seastar::future<>
+IOHandler::read_message(utime_t throttle_stamp, std::size_t msg_size)
+{
+  return frame_assembler->read_frame_payload(
+  ).then([this, throttle_stamp, msg_size](auto payload) {
+    if (unlikely(io_state != io_state_t::open)) {
+      logger().debug("{} triggered {} during read_message()",
+                     conn, io_state);
+      abort_protocol();
+    }
+
+    utime_t recv_stamp{seastar::lowres_system_clock::now()};
+
+    // we need to get the size before std::moving segments data
+    auto msg_frame = MessageFrame::Decode(*payload);
+    // XXX: paranoid copy just to avoid oops
+    ceph_msg_header2 current_header = msg_frame.header();
+
+    logger().trace("{} got {} + {} + {} byte message,"
+                   " envelope type={} src={} off={} seq={}",
+                   conn, msg_frame.front_len(), msg_frame.middle_len(),
+                   msg_frame.data_len(), current_header.type, conn.get_peer_name(),
+                   current_header.data_off, current_header.seq);
+
+    ceph_msg_header header{current_header.seq,
+                           current_header.tid,
+                           current_header.type,
+                           current_header.priority,
+                           current_header.version,
+                           ceph_le32(msg_frame.front_len()),
+                           ceph_le32(msg_frame.middle_len()),
+                           ceph_le32(msg_frame.data_len()),
+                           current_header.data_off,
+                           conn.get_peer_name(),
+                           current_header.compat_version,
+                           current_header.reserved,
+                           ceph_le32(0)};
+    ceph_msg_footer footer{ceph_le32(0), ceph_le32(0),
+                           ceph_le32(0), ceph_le64(0), current_header.flags};
+
+    auto conn_ref = seastar::static_pointer_cast<SocketConnection>(
+        conn.shared_from_this());
+    Message *message = decode_message(nullptr, 0, header, footer,
+        msg_frame.front(), msg_frame.middle(), msg_frame.data(), conn_ref);
+    if (!message) {
+      logger().warn("{} decode message failed", conn);
+      abort_in_fault();
+    }
+
+    // store reservation size in message, so we don't get confused
+    // by messages entering the dispatch queue through other paths.
+    message->set_dispatch_throttle_size(msg_size);
+
+    message->set_throttle_stamp(throttle_stamp);
+    message->set_recv_stamp(recv_stamp);
+    message->set_recv_complete_stamp(utime_t{seastar::lowres_system_clock::now()});
+
+    // check received seq#.  if it is old, drop the message.
+    // note that incoming messages may skip ahead.  this is convenient for the
+    // client side queueing because messages can't be renumbered, but the (kernel)
+    // client will occasionally pull a message out of the sent queue to send
+    // elsewhere.  in that case it doesn't matter if we "got" it or not.
+    uint64_t cur_seq = get_in_seq();
+    if (message->get_seq() <= cur_seq) {
+      logger().error("{} got old message {} <= {} {}, discarding",
+                     conn, message->get_seq(), cur_seq, *message);
+      if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) &&
+          local_conf()->ms_die_on_old_message) {
+        ceph_assert(0 == "old msgs despite reconnect_seq feature");
+      }
+      return seastar::now();
+    } else if (message->get_seq() > cur_seq + 1) {
+      logger().error("{} missed message? skipped from seq {} to {}",
+                     conn, cur_seq, message->get_seq());
+      if (local_conf()->ms_die_on_skipped_message) {
+        ceph_assert(0 == "skipped incoming seq");
+      }
+    }
+
+    // note last received message.
+    in_seq = message->get_seq();
+    if (conn.policy.lossy) {
+      logger().debug("{} <== #{} === {} ({})",
+                     conn,
+                     message->get_seq(),
+                     *message,
+                     message->get_type());
+    } else {
+      logger().debug("{} <== #{},{} === {} ({})",
+                     conn,
+                     message->get_seq(),
+                     current_header.ack_seq,
+                     *message,
+                     message->get_type());
+    }
+
+    // notify ack
+    if (!conn.policy.lossy) {
+      ++ack_left;
+      notify_out_dispatch();
+    }
+
+    ack_out_sent(current_header.ack_seq);
+
+    // TODO: change MessageRef with seastar::shared_ptr
+    auto msg_ref = MessageRef{message, false};
+    assert(io_state == io_state_t::open);
+    // throttle the reading process by the returned future
+    return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
+  });
+}
+
+void IOHandler::do_in_dispatch()
+{
+  ceph_assert_always(!in_exit_dispatching.has_value());
+  in_exit_dispatching = seastar::promise<>();
+  gate.dispatch_in_background("do_in_dispatch", conn, [this] {
+    return seastar::keep_doing([this] {
+      return frame_assembler->read_main_preamble(
+      ).then([this](auto ret) {
+        switch (ret.tag) {
+          case Tag::MESSAGE: {
+            size_t msg_size = get_msg_size(*ret.rx_frame_asm);
+            return seastar::futurize_invoke([this] {
+              // throttle_message() logic
+              if (!conn.policy.throttler_messages) {
+                return seastar::now();
+              }
+              // TODO: message throttler
+              ceph_assert(false);
+              return seastar::now();
+            }).then([this, msg_size] {
+              // throttle_bytes() logic
+              if (!conn.policy.throttler_bytes) {
+                return seastar::now();
+              }
+              if (!msg_size) {
+                return seastar::now();
+              }
+              logger().trace("{} wants {} bytes from policy throttler {}/{}",
+                             conn, msg_size,
+                             conn.policy.throttler_bytes->get_current(),
+                             conn.policy.throttler_bytes->get_max());
+              return conn.policy.throttler_bytes->get(msg_size);
+            }).then([this, msg_size] {
+              // TODO: throttle_dispatch_queue() logic
+              utime_t throttle_stamp{seastar::lowres_system_clock::now()};
+              return read_message(throttle_stamp, msg_size);
+            });
+          }
+          case Tag::ACK:
+            return frame_assembler->read_frame_payload(
+            ).then([this](auto payload) {
+              // handle_message_ack() logic
+              auto ack = AckFrame::Decode(payload->back());
+              logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq());
+              ack_out_sent(ack.seq());
+            });
+          case Tag::KEEPALIVE2:
+            return frame_assembler->read_frame_payload(
+            ).then([this](auto payload) {
+              // handle_keepalive2() logic
+              auto keepalive_frame = KeepAliveFrame::Decode(payload->back());
+              logger().debug("{} GOT KeepAliveFrame: timestamp={}",
+                             conn, keepalive_frame.timestamp());
+              // notify keepalive ack
+              next_keepalive_ack = keepalive_frame.timestamp();
+              notify_out_dispatch();
+
+              last_keepalive = seastar::lowres_system_clock::now();
+            });
+          case Tag::KEEPALIVE2_ACK:
+            return frame_assembler->read_frame_payload(
+            ).then([this](auto payload) {
+              // handle_keepalive2_ack() logic
+              auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload->back());
+              auto _last_keepalive_ack =
+                seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()};
+              set_last_keepalive_ack(_last_keepalive_ack);
+              logger().debug("{} GOT KeepAliveFrameAck: timestamp={}",
+                             conn, _last_keepalive_ack);
+            });
+          default: {
+            logger().warn("{} do_in_dispatch() received unexpected tag: {}",
+                          conn, static_cast<uint32_t>(ret.tag));
+            abort_in_fault();
+          }
+        }
+      });
+    }).handle_exception([this](std::exception_ptr eptr) {
+      const char *e_what;
+      try {
+        std::rethrow_exception(eptr);
+      } catch (std::exception &e) {
+        e_what = e.what();
+      }
+
+      if (io_state == io_state_t::open) {
+        logger().info("{} do_in_dispatch(): fault at {}, going to delay -- {}",
+                      conn, io_state, e_what);
+        set_io_state(io_state_t::delay);
+        handshake_listener->notify_out_fault("do_in_dispatch", eptr);
+      } else {
+        logger().info("{} do_in_dispatch(): fault at {} -- {}",
+                      conn, io_state, e_what);
+      }
+    }).finally([this] {
+      ceph_assert_always(in_exit_dispatching.has_value());
+      in_exit_dispatching->set_value();
+      in_exit_dispatching = std::nullopt;
+    });
+  });
+}
+
+} // namespace crimson::net
diff --git a/src/crimson/net/io_handler.h b/src/crimson/net/io_handler.h
new file mode 100644 (file)
index 0000000..d7beb9c
--- /dev/null
@@ -0,0 +1,269 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/util/later.hh>
+
+#include "crimson/common/gated.h"
+#include "Fwd.h"
+#include "SocketConnection.h"
+#include "FrameAssemblerV2.h"
+
+namespace crimson::net {
+
+/**
+ * HandshakeListener
+ *
+ * The interface class for IOHandler to notify the ProtocolV2 for handshake.
+ *
+ * The notifications may be cross-core and asynchronous.
+ */
+class HandshakeListener {
+public:
+  virtual ~HandshakeListener() = default;
+
+  HandshakeListener(const HandshakeListener&) = delete;
+  HandshakeListener(HandshakeListener &&) = delete;
+  HandshakeListener &operator=(const HandshakeListener &) = delete;
+  HandshakeListener &operator=(HandshakeListener &&) = delete;
+
+  virtual void notify_out() = 0;
+
+  virtual void notify_out_fault(const char *where, std::exception_ptr) = 0;
+
+  virtual void notify_mark_down() = 0;
+
+protected:
+  HandshakeListener() = default;
+};
+
+/**
+ * IOHandler
+ *
+ * Implements the message read and write paths after the handshake, and also be
+ * responsible to dispatch events. It is supposed to be working on the same
+ * core with the underlying socket and the FrameAssemblerV2 class.
+ */
+class IOHandler final : public ConnectionHandler {
+public:
+  IOHandler(ChainedDispatchers &,
+            SocketConnection &);
+
+  ~IOHandler() final;
+
+  IOHandler(const IOHandler &) = delete;
+  IOHandler(IOHandler &&) = delete;
+  IOHandler &operator=(const IOHandler &) = delete;
+  IOHandler &operator=(IOHandler &&) = delete;
+
+/*
+ * as ConnectionHandler
+ */
+private:
+  bool is_connected() const final {
+    return protocol_is_connected;
+  }
+
+  seastar::future<> send(MessageURef msg) final;
+
+  seastar::future<> send_keepalive() final;
+
+  clock_t::time_point get_last_keepalive() const final {
+    return last_keepalive;
+  }
+
+  clock_t::time_point get_last_keepalive_ack() const final {
+    return last_keepalive_ack;
+  }
+
+  void set_last_keepalive_ack(clock_t::time_point when) final {
+    last_keepalive_ack = when;
+  }
+
+  void mark_down() final;
+
+/*
+ * as IOHandler to be called by ProtocolV2 handshake
+ *
+ * The calls may be cross-core and asynchronous
+ */
+public:
+  void set_handshake_listener(HandshakeListener &hl) {
+    ceph_assert_always(handshake_listener == nullptr);
+    handshake_listener = &hl;
+  }
+
+  struct io_stat_printer {
+    const IOHandler &io_handler;
+  };
+  void print_io_stat(std::ostream &out) const;
+
+  seastar::future<> close_io(
+      bool is_dispatch_reset,
+      bool is_replace) {
+    ceph_assert_always(io_state == io_state_t::drop);
+
+    if (is_dispatch_reset) {
+      dispatch_reset(is_replace);
+    }
+    assert(!gate.is_closed());
+    return gate.close();
+  }
+
+  /**
+   * io_state_t
+   *
+   * The io_state is changed with the protocol state, to control the
+   * io behavior accordingly.
+   */
+  enum class io_state_t : uint8_t {
+    none,  // no IO is possible as the connection is not available to the user yet.
+    delay, // IO is delayed until open.
+    open,  // Dispatch In and Out concurrently.
+    drop   // Drop IO as the connection is closed.
+  };
+  friend class fmt::formatter<io_state_t>;
+
+  void set_io_state(const io_state_t &new_state, FrameAssemblerV2Ref fa=nullptr);
+
+  seastar::future<FrameAssemblerV2Ref> wait_io_exit_dispatching();
+
+  void reset_session(bool full);
+
+  void requeue_out_sent_up_to(seq_num_t seq);
+
+  void requeue_out_sent();
+
+  bool is_out_queued_or_sent() const {
+    return is_out_queued() || !out_sent_msgs.empty();
+  }
+
+  seq_num_t get_in_seq() const {
+    return in_seq;
+  }
+
+  void dispatch_accept();
+
+  void dispatch_connect();
+
+ private:
+  void dispatch_reset(bool is_replace);
+
+  void dispatch_remote_reset();
+
+  bool is_out_queued() const {
+    return (!out_pending_msgs.empty() ||
+            ack_left > 0 ||
+            need_keepalive ||
+            next_keepalive_ack.has_value());
+  }
+
+  void reset_out();
+
+  seastar::future<stop_t> try_exit_out_dispatch();
+
+  seastar::future<> do_out_dispatch();
+
+  ceph::bufferlist sweep_out_pending_msgs_to_sent(
+      bool require_keepalive,
+      std::optional<utime_t> maybe_keepalive_ack,
+      bool require_ack);
+
+  void notify_out_dispatch();
+
+  void ack_out_sent(seq_num_t seq);
+
+  seastar::future<> read_message(utime_t throttle_stamp, std::size_t msg_size);
+
+  void do_in_dispatch();
+
+private:
+  ChainedDispatchers &dispatchers;
+
+  SocketConnection &conn;
+
+  HandshakeListener *handshake_listener = nullptr;
+
+  crimson::common::Gated gate;
+
+  FrameAssemblerV2Ref frame_assembler;
+
+  bool protocol_is_connected = false;
+
+  bool need_dispatch_reset = true;
+
+  io_state_t io_state = io_state_t::none;
+
+  // wait until current io_state changed
+  seastar::promise<> io_state_changed;
+
+  /*
+   * out states for writing
+   */
+
+  bool out_dispatching = false;
+
+  std::optional<seastar::promise<>> out_exit_dispatching;
+
+  /// the seq num of the last transmitted message
+  seq_num_t out_seq = 0;
+
+  // messages to be resent after connection gets reset
+  std::deque<MessageURef> out_pending_msgs;
+
+  // messages sent, but not yet acked by peer
+  std::deque<MessageURef> out_sent_msgs;
+
+  bool need_keepalive = false;
+
+  std::optional<utime_t> next_keepalive_ack = std::nullopt;
+
+  uint64_t ack_left = 0;
+
+  /*
+   * in states for reading
+   */
+
+  std::optional<seastar::promise<>> in_exit_dispatching;
+
+  /// the seq num of the last received message
+  seq_num_t in_seq = 0;
+
+  clock_t::time_point last_keepalive;
+
+  clock_t::time_point last_keepalive_ack;
+};
+
+inline std::ostream& operator<<(
+    std::ostream& out, IOHandler::io_stat_printer stat) {
+  stat.io_handler.print_io_stat(out);
+  return out;
+}
+
+} // namespace crimson::net
+
+template <>
+struct fmt::formatter<crimson::net::IOHandler::io_state_t>
+  : fmt::formatter<std::string_view> {
+  template <typename FormatContext>
+  auto format(crimson::net::IOHandler::io_state_t state, FormatContext& ctx) {
+    using enum crimson::net::IOHandler::io_state_t;
+    std::string_view name;
+    switch (state) {
+    case none:
+      name = "none";
+      break;
+    case delay:
+      name = "delay";
+      break;
+    case open:
+      name = "open";
+      break;
+    case drop:
+      name = "drop";
+      break;
+    }
+    return formatter<string_view>::format(name, ctx);
+  }
+};