IOHandler also represents the Connection as ConnectionHandler.
ProtocolV2 and IOHandler will be finally running in different cores, as
ProtocolV2 will need to call IOHandler interfaces asynchronously. And
IOHandler will also notify ProtocolV2 through HandshakeListener
asynchronously.
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
${PROJECT_SOURCE_DIR}/src/msg/async/frames_v2.cc
net/Errors.cc
net/FrameAssemblerV2.cc
+ net/io_handler.cc
net/Messenger.cc
net/SocketConnection.cc
net/SocketMessenger.cc
net/Socket.cc
- net/Protocol.cc
net/ProtocolV2.cc
net/chained_dispatchers.cc)
add_library(crimson STATIC
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include "Protocol.h"
-
-#include "auth/Auth.h"
-
-#include "crimson/common/formatter.h"
-#include "crimson/common/log.h"
-#include "crimson/net/Errors.h"
-#include "crimson/net/chained_dispatchers.h"
-#include "crimson/net/SocketConnection.h"
-#include "crimson/net/SocketMessenger.h"
-#include "msg/Message.h"
-
-using namespace ceph::msgr::v2;
-using crimson::common::local_conf;
-
-namespace {
-
-seastar::logger& logger() {
- return crimson::get_logger(ceph_subsys_ms);
-}
-
-[[noreturn]] void abort_in_fault() {
- throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
-}
-
-[[noreturn]] void abort_protocol() {
- throw std::system_error(make_error_code(crimson::net::error::protocol_aborted));
-}
-
-std::size_t get_msg_size(const FrameAssembler &rx_frame_asm)
-{
- ceph_assert(rx_frame_asm.get_num_segments() > 0);
- size_t sum = 0;
- // we don't include SegmentIndex::Msg::HEADER.
- for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) {
- sum += rx_frame_asm.get_segment_logical_len(idx);
- }
- return sum;
-}
-
-} // namespace anonymous
-
-namespace crimson::net {
-
-Protocol::Protocol(ChainedDispatchers& dispatchers,
- SocketConnection& conn)
- : dispatchers(dispatchers),
- conn(conn)
-{}
-
-Protocol::~Protocol()
-{
- ceph_assert(gate.is_closed());
- assert(!out_exit_dispatching);
-}
-
-ceph::bufferlist Protocol::sweep_out_pending_msgs_to_sent(
- bool require_keepalive,
- std::optional<utime_t> maybe_keepalive_ack,
- bool require_ack)
-{
- std::size_t num_msgs = out_pending_msgs.size();
- ceph::bufferlist bl;
-
- if (unlikely(require_keepalive)) {
- auto keepalive_frame = KeepAliveFrame::Encode();
- bl.append(frame_assembler->get_buffer(keepalive_frame));
- }
-
- if (unlikely(maybe_keepalive_ack.has_value())) {
- auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack);
- bl.append(frame_assembler->get_buffer(keepalive_ack_frame));
- }
-
- if (require_ack && num_msgs == 0u) {
- auto ack_frame = AckFrame::Encode(get_in_seq());
- bl.append(frame_assembler->get_buffer(ack_frame));
- }
-
- std::for_each(
- out_pending_msgs.begin(),
- out_pending_msgs.begin()+num_msgs,
- [this, &bl](const MessageURef& msg) {
- // set priority
- msg->get_header().src = conn.messenger.get_myname();
-
- msg->encode(conn.features, 0);
-
- ceph_assert(!msg->get_seq() && "message already has seq");
- msg->set_seq(++out_seq);
-
- ceph_msg_header &header = msg->get_header();
- ceph_msg_footer &footer = msg->get_footer();
-
- ceph_msg_header2 header2{header.seq, header.tid,
- header.type, header.priority,
- header.version,
- ceph_le32(0), header.data_off,
- ceph_le64(get_in_seq()),
- footer.flags, header.compat_version,
- header.reserved};
-
- auto message = MessageFrame::Encode(header2,
- msg->get_payload(), msg->get_middle(), msg->get_data());
- logger().debug("{} --> #{} === {} ({})",
- conn, msg->get_seq(), *msg, msg->get_type());
- bl.append(frame_assembler->get_buffer(message));
- });
-
- if (!conn.policy.lossy) {
- out_sent_msgs.insert(
- out_sent_msgs.end(),
- std::make_move_iterator(out_pending_msgs.begin()),
- std::make_move_iterator(out_pending_msgs.end()));
- }
- out_pending_msgs.clear();
- return bl;
-}
-
-seastar::future<> Protocol::send(MessageURef msg)
-{
- if (io_state != io_state_t::drop) {
- out_pending_msgs.push_back(std::move(msg));
- notify_out_dispatch();
- }
- return seastar::now();
-}
-
-seastar::future<> Protocol::send_keepalive()
-{
- if (!need_keepalive) {
- need_keepalive = true;
- notify_out_dispatch();
- }
- return seastar::now();
-}
-
-void Protocol::mark_down()
-{
- ceph_assert_always(io_state != io_state_t::none);
- need_dispatch_reset = false;
- if (io_state == io_state_t::drop) {
- return;
- }
-
- logger().info("{} mark_down() with {}",
- conn, io_stat_printer{*this});
- set_io_state(io_state_t::drop);
- notify_mark_down();
-}
-
-void Protocol::print_io_stat(std::ostream &out) const
-{
- out << "io_stat("
- << "io_state=" << fmt::format("{}", io_state)
- << ", in_seq=" << in_seq
- << ", out_seq=" << out_seq
- << ", out_pending_msgs_size=" << out_pending_msgs.size()
- << ", out_sent_msgs_size=" << out_sent_msgs.size()
- << ", need_ack=" << (ack_left > 0)
- << ", need_keepalive=" << need_keepalive
- << ", need_keepalive_ack=" << bool(next_keepalive_ack)
- << ")";
-}
-
-void Protocol::set_io_state(
- const Protocol::io_state_t &new_state,
- FrameAssemblerV2Ref fa)
-{
- ceph_assert_always(!(
- (new_state == io_state_t::none && io_state != io_state_t::none) ||
- (new_state == io_state_t::open && io_state == io_state_t::open) ||
- (new_state != io_state_t::drop && io_state == io_state_t::drop)
- ));
-
- bool dispatch_in = false;
- if (new_state == io_state_t::open) {
- // to open
- ceph_assert_always(protocol_is_connected == true);
- assert(fa != nullptr);
- ceph_assert_always(frame_assembler == nullptr);
- frame_assembler = std::move(fa);
- ceph_assert_always(frame_assembler->is_socket_valid());
- dispatch_in = true;
-#ifdef UNIT_TESTS_BUILT
- if (conn.interceptor) {
- conn.interceptor->register_conn_ready(conn);
- }
-#endif
- } else if (io_state == io_state_t::open) {
- // from open
- ceph_assert_always(protocol_is_connected == true);
- protocol_is_connected = false;
- assert(fa == nullptr);
- ceph_assert_always(frame_assembler->is_socket_valid());
- frame_assembler->shutdown_socket();
- if (out_dispatching) {
- ceph_assert_always(!out_exit_dispatching.has_value());
- out_exit_dispatching = seastar::promise<>();
- }
- } else {
- assert(fa == nullptr);
- }
-
- if (io_state != new_state) {
- io_state = new_state;
- io_state_changed.set_value();
- io_state_changed = seastar::promise<>();
- }
-
- /*
- * not atomic below
- */
-
- if (dispatch_in) {
- do_in_dispatch();
- }
-}
-
-seastar::future<FrameAssemblerV2Ref> Protocol::wait_io_exit_dispatching()
-{
- ceph_assert_always(io_state != io_state_t::open);
- ceph_assert_always(frame_assembler != nullptr);
- ceph_assert_always(!frame_assembler->is_socket_valid());
- return seastar::when_all(
- [this] {
- if (out_exit_dispatching) {
- return out_exit_dispatching->get_future();
- } else {
- return seastar::now();
- }
- }(),
- [this] {
- if (in_exit_dispatching) {
- return in_exit_dispatching->get_future();
- } else {
- return seastar::now();
- }
- }()
- ).discard_result().then([this] {
- return std::move(frame_assembler);
- });
-}
-
-void Protocol::reset_session(bool full)
-{
- // reset in
- in_seq = 0;
- if (full) {
- reset_out();
- dispatch_remote_reset();
- }
-}
-
-void Protocol::requeue_out_sent()
-{
- assert(io_state != io_state_t::open);
- if (out_sent_msgs.empty()) {
- return;
- }
-
- out_seq -= out_sent_msgs.size();
- logger().debug("{} requeue {} items, revert out_seq to {}",
- conn, out_sent_msgs.size(), out_seq);
- for (MessageURef& msg : out_sent_msgs) {
- msg->clear_payload();
- msg->set_seq(0);
- }
- out_pending_msgs.insert(
- out_pending_msgs.begin(),
- std::make_move_iterator(out_sent_msgs.begin()),
- std::make_move_iterator(out_sent_msgs.end()));
- out_sent_msgs.clear();
- notify_out_dispatch();
-}
-
-void Protocol::requeue_out_sent_up_to(seq_num_t seq)
-{
- assert(io_state != io_state_t::open);
- if (out_sent_msgs.empty() && out_pending_msgs.empty()) {
- logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}",
- conn, out_seq, seq);
- out_seq = seq;
- return;
- }
- logger().debug("{} discarding sent msgs by seq {} (sent_len={}, out_seq={})",
- conn, seq, out_sent_msgs.size(), out_seq);
- while (!out_sent_msgs.empty()) {
- auto cur_seq = out_sent_msgs.front()->get_seq();
- if (cur_seq == 0 || cur_seq > seq) {
- break;
- } else {
- out_sent_msgs.pop_front();
- }
- }
- requeue_out_sent();
-}
-
-void Protocol::reset_out()
-{
- assert(io_state != io_state_t::open);
- out_seq = 0;
- out_pending_msgs.clear();
- out_sent_msgs.clear();
- need_keepalive = false;
- next_keepalive_ack = std::nullopt;
- ack_left = 0;
-}
-
-void Protocol::dispatch_accept()
-{
- if (io_state == io_state_t::drop) {
- return;
- }
- // protocol_is_connected can be from true to true here if the replacing is
- // happening to a connected connection.
- protocol_is_connected = true;
- dispatchers.ms_handle_accept(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
-}
-
-void Protocol::dispatch_connect()
-{
- if (io_state == io_state_t::drop) {
- return;
- }
- ceph_assert_always(protocol_is_connected == false);
- protocol_is_connected = true;
- dispatchers.ms_handle_connect(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
-}
-
-void Protocol::dispatch_reset(bool is_replace)
-{
- ceph_assert_always(io_state == io_state_t::drop);
- if (!need_dispatch_reset) {
- return;
- }
- need_dispatch_reset = false;
- dispatchers.ms_handle_reset(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()),
- is_replace);
-}
-
-void Protocol::dispatch_remote_reset()
-{
- if (io_state == io_state_t::drop) {
- return;
- }
- dispatchers.ms_handle_remote_reset(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
-}
-
-void Protocol::ack_out_sent(seq_num_t seq)
-{
- if (conn.policy.lossy) { // lossy connections don't keep sent messages
- return;
- }
- while (!out_sent_msgs.empty() &&
- out_sent_msgs.front()->get_seq() <= seq) {
- logger().trace("{} got ack seq {} >= {}, pop {}",
- conn, seq, out_sent_msgs.front()->get_seq(),
- *out_sent_msgs.front());
- out_sent_msgs.pop_front();
- }
-}
-
-seastar::future<stop_t> Protocol::try_exit_out_dispatch() {
- assert(!is_out_queued());
- return frame_assembler->flush(
- ).then([this] {
- if (!is_out_queued()) {
- // still nothing pending to send after flush,
- // the dispatching can ONLY stop now
- ceph_assert(out_dispatching);
- out_dispatching = false;
- if (unlikely(out_exit_dispatching.has_value())) {
- out_exit_dispatching->set_value();
- out_exit_dispatching = std::nullopt;
- logger().info("{} do_out_dispatch: nothing queued at {},"
- " set out_exit_dispatching",
- conn, io_state);
- }
- return seastar::make_ready_future<stop_t>(stop_t::yes);
- } else {
- // something is pending to send during flushing
- return seastar::make_ready_future<stop_t>(stop_t::no);
- }
- });
-}
-
-seastar::future<> Protocol::do_out_dispatch()
-{
- return seastar::repeat([this] {
- switch (io_state) {
- case io_state_t::open: {
- bool still_queued = is_out_queued();
- if (unlikely(!still_queued)) {
- return try_exit_out_dispatch();
- }
- auto to_ack = ack_left;
- assert(to_ack == 0 || in_seq > 0);
- // sweep all pending out with the concrete Protocol
- return frame_assembler->write(
- sweep_out_pending_msgs_to_sent(
- need_keepalive, next_keepalive_ack, to_ack > 0)
- ).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack] {
- need_keepalive = false;
- if (next_keepalive_ack == prv_keepalive_ack) {
- next_keepalive_ack = std::nullopt;
- }
- assert(ack_left >= to_ack);
- ack_left -= to_ack;
- if (!is_out_queued()) {
- return try_exit_out_dispatch();
- } else {
- // messages were enqueued during socket write
- return seastar::make_ready_future<stop_t>(stop_t::no);
- }
- });
- }
- case io_state_t::delay:
- // delay out dispatching until open
- if (out_exit_dispatching) {
- out_exit_dispatching->set_value();
- out_exit_dispatching = std::nullopt;
- logger().info("{} do_out_dispatch: delay and set out_exit_dispatching ...", conn);
- } else {
- logger().info("{} do_out_dispatch: delay ...", conn);
- }
- return io_state_changed.get_future(
- ).then([] { return stop_t::no; });
- case io_state_t::drop:
- ceph_assert(out_dispatching);
- out_dispatching = false;
- if (out_exit_dispatching) {
- out_exit_dispatching->set_value();
- out_exit_dispatching = std::nullopt;
- logger().info("{} do_out_dispatch: dropped and set out_exit_dispatching", conn);
- } else {
- logger().info("{} do_out_dispatch: dropped", conn);
- }
- return seastar::make_ready_future<stop_t>(stop_t::yes);
- default:
- ceph_assert(false);
- }
- }).handle_exception_type([this] (const std::system_error& e) {
- if (e.code() != std::errc::broken_pipe &&
- e.code() != std::errc::connection_reset &&
- e.code() != error::negotiation_failure) {
- logger().error("{} do_out_dispatch(): unexpected error at {} -- {}",
- conn, io_state, e);
- ceph_abort();
- }
-
- if (io_state == io_state_t::open) {
- logger().info("{} do_out_dispatch(): fault at {}, going to delay -- {}",
- conn, io_state, e);
- std::exception_ptr eptr;
- try {
- throw e;
- } catch(...) {
- eptr = std::current_exception();
- }
- set_io_state(io_state_t::delay);
- notify_out_fault("do_out_dispatch", eptr);
- } else {
- logger().info("{} do_out_dispatch(): fault at {} -- {}",
- conn, io_state, e);
- }
-
- return do_out_dispatch();
- });
-}
-
-void Protocol::notify_out_dispatch()
-{
- notify_out();
- if (out_dispatching) {
- // already dispatching
- return;
- }
- out_dispatching = true;
- switch (io_state) {
- case io_state_t::open:
- [[fallthrough]];
- case io_state_t::delay:
- assert(!gate.is_closed());
- gate.dispatch_in_background("do_out_dispatch", conn, [this] {
- return do_out_dispatch();
- });
- return;
- case io_state_t::drop:
- out_dispatching = false;
- return;
- default:
- ceph_assert(false);
- }
-}
-
-seastar::future<>
-Protocol::read_message(utime_t throttle_stamp, std::size_t msg_size)
-{
- return frame_assembler->read_frame_payload(
- ).then([this, throttle_stamp, msg_size](auto payload) {
- if (unlikely(io_state != io_state_t::open)) {
- logger().debug("{} triggered {} during read_message()",
- conn, io_state);
- abort_protocol();
- }
-
- utime_t recv_stamp{seastar::lowres_system_clock::now()};
-
- // we need to get the size before std::moving segments data
- auto msg_frame = MessageFrame::Decode(*payload);
- // XXX: paranoid copy just to avoid oops
- ceph_msg_header2 current_header = msg_frame.header();
-
- logger().trace("{} got {} + {} + {} byte message,"
- " envelope type={} src={} off={} seq={}",
- conn, msg_frame.front_len(), msg_frame.middle_len(),
- msg_frame.data_len(), current_header.type, conn.get_peer_name(),
- current_header.data_off, current_header.seq);
-
- ceph_msg_header header{current_header.seq,
- current_header.tid,
- current_header.type,
- current_header.priority,
- current_header.version,
- ceph_le32(msg_frame.front_len()),
- ceph_le32(msg_frame.middle_len()),
- ceph_le32(msg_frame.data_len()),
- current_header.data_off,
- conn.get_peer_name(),
- current_header.compat_version,
- current_header.reserved,
- ceph_le32(0)};
- ceph_msg_footer footer{ceph_le32(0), ceph_le32(0),
- ceph_le32(0), ceph_le64(0), current_header.flags};
-
- auto conn_ref = seastar::static_pointer_cast<SocketConnection>(
- conn.shared_from_this());
- Message *message = decode_message(nullptr, 0, header, footer,
- msg_frame.front(), msg_frame.middle(), msg_frame.data(), conn_ref);
- if (!message) {
- logger().warn("{} decode message failed", conn);
- abort_in_fault();
- }
-
- // store reservation size in message, so we don't get confused
- // by messages entering the dispatch queue through other paths.
- message->set_dispatch_throttle_size(msg_size);
-
- message->set_throttle_stamp(throttle_stamp);
- message->set_recv_stamp(recv_stamp);
- message->set_recv_complete_stamp(utime_t{seastar::lowres_system_clock::now()});
-
- // check received seq#. if it is old, drop the message.
- // note that incoming messages may skip ahead. this is convenient for the
- // client side queueing because messages can't be renumbered, but the (kernel)
- // client will occasionally pull a message out of the sent queue to send
- // elsewhere. in that case it doesn't matter if we "got" it or not.
- uint64_t cur_seq = get_in_seq();
- if (message->get_seq() <= cur_seq) {
- logger().error("{} got old message {} <= {} {}, discarding",
- conn, message->get_seq(), cur_seq, *message);
- if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) &&
- local_conf()->ms_die_on_old_message) {
- ceph_assert(0 == "old msgs despite reconnect_seq feature");
- }
- return seastar::now();
- } else if (message->get_seq() > cur_seq + 1) {
- logger().error("{} missed message? skipped from seq {} to {}",
- conn, cur_seq, message->get_seq());
- if (local_conf()->ms_die_on_skipped_message) {
- ceph_assert(0 == "skipped incoming seq");
- }
- }
-
- // note last received message.
- in_seq = message->get_seq();
- if (conn.policy.lossy) {
- logger().debug("{} <== #{} === {} ({})",
- conn,
- message->get_seq(),
- *message,
- message->get_type());
- } else {
- logger().debug("{} <== #{},{} === {} ({})",
- conn,
- message->get_seq(),
- current_header.ack_seq,
- *message,
- message->get_type());
- }
-
- // notify ack
- if (!conn.policy.lossy) {
- ++ack_left;
- notify_out_dispatch();
- }
-
- ack_out_sent(current_header.ack_seq);
-
- // TODO: change MessageRef with seastar::shared_ptr
- auto msg_ref = MessageRef{message, false};
- assert(io_state == io_state_t::open);
- // throttle the reading process by the returned future
- return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
- });
-}
-
-void Protocol::do_in_dispatch()
-{
- ceph_assert_always(!in_exit_dispatching.has_value());
- in_exit_dispatching = seastar::promise<>();
- gate.dispatch_in_background("do_in_dispatch", conn, [this] {
- return seastar::keep_doing([this] {
- return frame_assembler->read_main_preamble(
- ).then([this](auto ret) {
- switch (ret.tag) {
- case Tag::MESSAGE: {
- size_t msg_size = get_msg_size(*ret.rx_frame_asm);
- return seastar::futurize_invoke([this] {
- // throttle_message() logic
- if (!conn.policy.throttler_messages) {
- return seastar::now();
- }
- // TODO: message throttler
- ceph_assert(false);
- return seastar::now();
- }).then([this, msg_size] {
- // throttle_bytes() logic
- if (!conn.policy.throttler_bytes) {
- return seastar::now();
- }
- if (!msg_size) {
- return seastar::now();
- }
- logger().trace("{} wants {} bytes from policy throttler {}/{}",
- conn, msg_size,
- conn.policy.throttler_bytes->get_current(),
- conn.policy.throttler_bytes->get_max());
- return conn.policy.throttler_bytes->get(msg_size);
- }).then([this, msg_size] {
- // TODO: throttle_dispatch_queue() logic
- utime_t throttle_stamp{seastar::lowres_system_clock::now()};
- return read_message(throttle_stamp, msg_size);
- });
- }
- case Tag::ACK:
- return frame_assembler->read_frame_payload(
- ).then([this](auto payload) {
- // handle_message_ack() logic
- auto ack = AckFrame::Decode(payload->back());
- logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq());
- ack_out_sent(ack.seq());
- });
- case Tag::KEEPALIVE2:
- return frame_assembler->read_frame_payload(
- ).then([this](auto payload) {
- // handle_keepalive2() logic
- auto keepalive_frame = KeepAliveFrame::Decode(payload->back());
- logger().debug("{} GOT KeepAliveFrame: timestamp={}",
- conn, keepalive_frame.timestamp());
- // notify keepalive ack
- next_keepalive_ack = keepalive_frame.timestamp();
- notify_out_dispatch();
-
- last_keepalive = seastar::lowres_system_clock::now();
- });
- case Tag::KEEPALIVE2_ACK:
- return frame_assembler->read_frame_payload(
- ).then([this](auto payload) {
- // handle_keepalive2_ack() logic
- auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload->back());
- auto _last_keepalive_ack =
- seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()};
- set_last_keepalive_ack(_last_keepalive_ack);
- logger().debug("{} GOT KeepAliveFrameAck: timestamp={}",
- conn, _last_keepalive_ack);
- });
- default: {
- logger().warn("{} do_in_dispatch() received unexpected tag: {}",
- conn, static_cast<uint32_t>(ret.tag));
- abort_in_fault();
- }
- }
- });
- }).handle_exception([this](std::exception_ptr eptr) {
- const char *e_what;
- try {
- std::rethrow_exception(eptr);
- } catch (std::exception &e) {
- e_what = e.what();
- }
-
- if (io_state == io_state_t::open) {
- logger().info("{} do_in_dispatch(): fault at {}, going to delay -- {}",
- conn, io_state, e_what);
- set_io_state(io_state_t::delay);
- notify_out_fault("do_in_dispatch", eptr);
- } else {
- logger().info("{} do_in_dispatch(): fault at {} -- {}",
- conn, io_state, e_what);
- }
- }).finally([this] {
- ceph_assert_always(in_exit_dispatching.has_value());
- in_exit_dispatching->set_value();
- in_exit_dispatching = std::nullopt;
- });
- });
-}
-
-} // namespace crimson::net
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#pragma once
-
-#include <seastar/core/gate.hh>
-#include <seastar/core/shared_future.hh>
-#include <seastar/util/later.hh>
-
-#include "crimson/common/gated.h"
-#include "crimson/common/log.h"
-#include "Fwd.h"
-#include "SocketConnection.h"
-#include "FrameAssemblerV2.h"
-
-namespace crimson::net {
-
-class Protocol {
-// public to SocketConnection
- public:
- Protocol(Protocol&&) = delete;
- virtual ~Protocol();
-
- virtual seastar::future<> close_clean_yielded() = 0;
-
-#ifdef UNIT_TESTS_BUILT
- virtual bool is_closed_clean() const = 0;
-
- virtual bool is_closed() const = 0;
-
-#endif
- virtual void start_connect(const entity_addr_t& peer_addr,
- const entity_name_t& peer_name) = 0;
-
- virtual void start_accept(SocketRef&& socket,
- const entity_addr_t& peer_addr) = 0;
-
- protected:
- Protocol(ChainedDispatchers& dispatchers,
- SocketConnection& conn);
-
- virtual void notify_out() = 0;
-
- virtual void notify_out_fault(const char *where, std::exception_ptr) = 0;
-
- virtual void notify_mark_down() = 0;
-
-// the write state-machine
- public:
- using clock_t = seastar::lowres_system_clock;
-
- bool is_connected() const {
- return protocol_is_connected;
- }
-
- seastar::future<> send(MessageURef msg);
-
- seastar::future<> send_keepalive();
-
- clock_t::time_point get_last_keepalive() const {
- return last_keepalive;
- }
-
- clock_t::time_point get_last_keepalive_ack() const {
- return last_keepalive_ack;
- }
-
- void set_last_keepalive_ack(clock_t::time_point when) {
- last_keepalive_ack = when;
- }
-
- void mark_down();
-
- struct io_stat_printer {
- const Protocol &protocol;
- };
- void print_io_stat(std::ostream &out) const;
-
-// TODO: encapsulate a SessionedSender class
- protected:
- seastar::future<> close_io(
- bool is_dispatch_reset,
- bool is_replace) {
- ceph_assert_always(io_state == io_state_t::drop);
-
- if (is_dispatch_reset) {
- dispatch_reset(is_replace);
- }
- assert(!gate.is_closed());
- return gate.close();
- }
-
- /**
- * io_state_t
- *
- * The io_state is changed with protocol state atomically, indicating the
- * IOHandler behavior of the according protocol state.
- */
- enum class io_state_t : uint8_t {
- none,
- delay,
- open,
- drop
- };
- friend class fmt::formatter<io_state_t>;
-
- void set_io_state(const io_state_t &new_state, FrameAssemblerV2Ref fa=nullptr);
-
- seastar::future<FrameAssemblerV2Ref> wait_io_exit_dispatching();
-
- void reset_session(bool full);
-
- void requeue_out_sent_up_to(seq_num_t seq);
-
- void requeue_out_sent();
-
- bool is_out_queued_or_sent() const {
- return is_out_queued() || !out_sent_msgs.empty();
- }
-
- seq_num_t get_in_seq() const {
- return in_seq;
- }
-
- void dispatch_accept();
-
- void dispatch_connect();
-
- private:
- void dispatch_reset(bool is_replace);
-
- void dispatch_remote_reset();
-
- bool is_out_queued() const {
- return (!out_pending_msgs.empty() ||
- ack_left > 0 ||
- need_keepalive ||
- next_keepalive_ack.has_value());
- }
-
- void reset_out();
-
- seastar::future<stop_t> try_exit_out_dispatch();
-
- seastar::future<> do_out_dispatch();
-
- ceph::bufferlist sweep_out_pending_msgs_to_sent(
- bool require_keepalive,
- std::optional<utime_t> maybe_keepalive_ack,
- bool require_ack);
-
- void notify_out_dispatch();
-
- void ack_out_sent(seq_num_t seq);
-
- seastar::future<> read_message(utime_t throttle_stamp, std::size_t msg_size);
-
- void do_in_dispatch();
-
-private:
- ChainedDispatchers &dispatchers;
-
- SocketConnection &conn;
-
- crimson::common::Gated gate;
-
- FrameAssemblerV2Ref frame_assembler;
-
- bool protocol_is_connected = false;
-
- bool need_dispatch_reset = true;
-
- io_state_t io_state = io_state_t::none;
-
- // wait until current io_state changed
- seastar::promise<> io_state_changed;
-
- /*
- * out states for writing
- */
-
- bool out_dispatching = false;
-
- std::optional<seastar::promise<>> out_exit_dispatching;
-
- /// the seq num of the last transmitted message
- seq_num_t out_seq = 0;
-
- // messages to be resent after connection gets reset
- std::deque<MessageURef> out_pending_msgs;
-
- // messages sent, but not yet acked by peer
- std::deque<MessageURef> out_sent_msgs;
-
- bool need_keepalive = false;
-
- std::optional<utime_t> next_keepalive_ack = std::nullopt;
-
- uint64_t ack_left = 0;
-
- /*
- * in states for reading
- */
-
- std::optional<seastar::promise<>> in_exit_dispatching;
-
- /// the seq num of the last received message
- seq_num_t in_seq = 0;
-
- clock_t::time_point last_keepalive;
-
- clock_t::time_point last_keepalive_ack;
-};
-
-inline std::ostream& operator<<(
- std::ostream& out, Protocol::io_stat_printer stat) {
- stat.protocol.print_io_stat(out);
- return out;
-}
-
-} // namespace crimson::net
-
-template <>
-struct fmt::formatter<crimson::net::Protocol::io_state_t>
- : fmt::formatter<std::string_view> {
- template <typename FormatContext>
- auto format(crimson::net::Protocol::io_state_t state, FormatContext& ctx) {
- using enum crimson::net::Protocol::io_state_t;
- std::string_view name;
- switch (state) {
- case none:
- name = "none";
- break;
- case delay:
- name = "delay";
- break;
- case open:
- name = "open";
- break;
- case drop:
- name = "drop";
- break;
- }
- return formatter<string_view>::format(name, ctx);
- }
-};
#include "crimson/auth/AuthClient.h"
#include "crimson/auth/AuthServer.h"
#include "crimson/common/formatter.h"
+#include "crimson/common/log.h"
#include "Errors.h"
-#include "SocketConnection.h"
#include "SocketMessenger.h"
#ifdef UNIT_TESTS_BUILT
using namespace ceph::msgr::v2;
using crimson::common::local_conf;
+using io_state_t = crimson::net::IOHandler::io_state_t;
+using io_stat_printer = crimson::net::IOHandler::io_stat_printer;
namespace {
});
}
-ProtocolV2::ProtocolV2(ChainedDispatchers& dispatchers,
- SocketConnection& conn)
- : Protocol(dispatchers, conn),
- conn{conn},
+ProtocolV2::ProtocolV2(SocketConnection& conn,
+ IOHandler &io_handler)
+ : conn{conn},
messenger{conn.messenger},
+ io_handler{io_handler},
frame_assembler{FrameAssemblerV2::create(conn)},
auth_meta{seastar::make_lw_shared<AuthConnectionMeta>()},
protocol_timer{conn}
if (new_state == state_t::READY) {
// I'm not responsible to shutdown the socket at READY
is_socket_valid = false;
- set_io_state(new_io_state, std::move(frame_assembler));
+ io_handler.set_io_state(new_io_state, std::move(frame_assembler));
} else {
- set_io_state(new_io_state, nullptr);
+ io_handler.set_io_state(new_io_state, nullptr);
}
/*
if (pre_state == state_t::READY) {
gate.dispatch_in_background("exit_io", conn, [this] {
- return wait_io_exit_dispatching(
+ return io_handler.wait_io_exit_dispatching(
).then([this](FrameAssemblerV2Ref fa) {
frame_assembler = std::move(fa);
exit_io->set_value();
}
if (conn.policy.server ||
- (conn.policy.standby && !is_out_queued_or_sent())) {
+ (conn.policy.standby && !io_handler.is_out_queued_or_sent())) {
if (conn.policy.server) {
logger().info("{} protocol {} {} fault as server, going to STANDBY {} -- {}",
conn,
get_state_name(state),
where,
- io_stat_printer{*this},
+ io_stat_printer{io_handler},
e_what);
} else {
logger().info("{} protocol {} {} fault with nothing to send, going to STANDBY {} -- {}",
conn,
get_state_name(state),
where,
- io_stat_printer{*this},
+ io_stat_printer{io_handler},
e_what);
}
execute_standby();
conn,
get_state_name(state),
where,
- io_stat_printer{*this},
+ io_stat_printer{io_handler},
e_what);
execute_wait(false);
} else {
conn,
get_state_name(state),
where,
- io_stat_printer{*this},
+ io_stat_printer{io_handler},
e_what);
execute_connecting();
}
client_cookie = generate_client_cookie();
peer_global_seq = 0;
}
- do_reset_session(full);
+ io_handler.reset_session(full);
}
seastar::future<std::tuple<entity_type_t, entity_addr_t>>
return frame_assembler->read_frame_payload(
).then([this](auto payload) {
// handle_server_ident() logic
- requeue_out_sent();
+ io_handler.requeue_out_sent();
auto server_ident = ServerIdentFrame::Decode(payload->back());
logger().debug("{} GOT ServerIdentFrame:"
" addrs={}, gid={}, gs={},"
server_cookie,
global_seq,
connect_seq,
- get_in_seq());
+ io_handler.get_in_seq());
logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={},"
" server_cookie={}, gs={}, cs={}, in_seq={}",
conn, messenger.get_myaddrs(),
client_cookie, server_cookie,
- global_seq, connect_seq, get_in_seq());
+ global_seq, connect_seq, io_handler.get_in_seq());
return frame_assembler->write_flush_frame(reconnect).then([this] {
return frame_assembler->read_main_preamble();
}).then([this](auto ret) {
auto reconnect_ok = ReconnectOkFrame::Decode(payload->back());
logger().debug("{} GOT ReconnectOkFrame: msg_seq={}",
conn, reconnect_ok.msg_seq());
- requeue_out_sent_up_to(reconnect_ok.msg_seq());
+ io_handler.requeue_out_sent_up_to(reconnect_ok.msg_seq());
return seastar::make_ready_future<next_step_t>(next_step_t::ready);
});
default: {
"client_cookie={}, server_cookie={}, {}",
conn, global_seq, peer_global_seq, connect_seq,
client_cookie, server_cookie,
- io_stat_printer{*this});
- dispatch_connect();
+ io_stat_printer{io_handler});
+ io_handler.dispatch_connect();
if (unlikely(state != state_t::CONNECTING)) {
logger().debug("{} triggered {} after ms_handle_connect(), abort",
conn, get_state_name(state));
accept_me();
}
- dispatch_accept();
+ io_handler.dispatch_accept();
if (unlikely(state != state_t::ESTABLISHING)) {
logger().debug("{} triggered {} after ms_handle_accept() during execute_establishing()",
conn, get_state_name(state));
"client_cookie={}, server_cookie={}, {}",
conn, global_seq, peer_global_seq, connect_seq,
client_cookie, server_cookie,
- io_stat_printer{*this});
+ io_stat_printer{io_handler});
execute_ready();
}).handle_exception([this](std::exception_ptr eptr) {
fault(state_t::ESTABLISHING, "execute_establishing", eptr);
logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq);
// this is required for the case when this connection is being replaced
- requeue_out_sent_up_to(0);
- do_reset_session(false);
+ io_handler.requeue_out_sent_up_to(0);
+ io_handler.reset_session(false);
if (!conn.policy.lossy) {
server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
new_peer_global_seq,
new_connect_seq, new_msg_seq] () mutable {
ceph_assert_always(state == state_t::REPLACING);
- dispatch_accept();
+ io_handler.dispatch_accept();
// state may become CLOSING, close mover.socket and abort later
return wait_exit_io(
).then([this] {
if (reconnect) {
connect_seq = new_connect_seq;
// send_reconnect_ok() logic
- requeue_out_sent_up_to(new_msg_seq);
- auto reconnect_ok = ReconnectOkFrame::Encode(get_in_seq());
- logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, get_in_seq());
+ io_handler.requeue_out_sent_up_to(new_msg_seq);
+ auto reconnect_ok = ReconnectOkFrame::Encode(io_handler.get_in_seq());
+ logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, io_handler.get_in_seq());
return frame_assembler->write_flush_frame(reconnect_ok);
} else {
client_cookie = new_client_cookie;
conn, reconnect ? "reconnected" : "connected",
global_seq, peer_global_seq, connect_seq,
client_cookie, server_cookie,
- io_stat_printer{*this});
+ io_stat_printer{io_handler});
execute_ready();
}).handle_exception([this](std::exception_ptr eptr) {
fault(state_t::REPLACING, "trigger_replacing", eptr);
}
assert(!gate.is_closed());
auto handshake_closed = gate.close();
- auto io_closed = close_io(
+ auto io_closed = io_handler.close_io(
is_dispatch_reset, is_replace);
// asynchronous operations
#pragma once
+#include <seastar/core/shared_future.hh>
#include <seastar/core/sleep.hh>
-#include "Protocol.h"
+#include "io_handler.h"
namespace crimson::net {
-class ProtocolV2 final : public Protocol {
+class ProtocolV2 final : public HandshakeListener {
using AuthConnectionMetaRef = seastar::lw_shared_ptr<AuthConnectionMeta>;
- public:
- ProtocolV2(ChainedDispatchers& dispatchers,
- SocketConnection& conn);
- ~ProtocolV2() override;
+public:
+ ProtocolV2(SocketConnection &,
+ IOHandler &);
-// public to SocketConnection, but private to the others
- private:
- seastar::future<> close_clean_yielded() override;
+ ~ProtocolV2() final;
-#ifdef UNIT_TESTS_BUILT
- bool is_closed_clean() const override {
- return closed_clean;
- }
+ ProtocolV2(const ProtocolV2 &) = delete;
+ ProtocolV2(ProtocolV2 &&) = delete;
+ ProtocolV2 &operator=(const ProtocolV2 &) = delete;
+ ProtocolV2 &operator=(ProtocolV2 &&) = delete;
- bool is_closed() const override {
- return closed;
- }
+/**
+ * as HandshakeListener
+ */
+private:
+ void notify_out() final;
-#endif
+ void notify_out_fault(const char *, std::exception_ptr) final;
+
+ void notify_mark_down() final;
+
+/*
+* as ProtocolV2 to be called by SocketConnection
+*/
+public:
void start_connect(const entity_addr_t& peer_addr,
- const entity_name_t& peer_name) override;
+ const entity_name_t& peer_name);
void start_accept(SocketRef&& socket,
- const entity_addr_t& peer_addr) override;
+ const entity_addr_t& peer_addr);
- private:
- void notify_out() override;
+ seastar::future<> close_clean_yielded();
- void notify_out_fault(const char *, std::exception_ptr) override;
+#ifdef UNIT_TESTS_BUILT
+ bool is_closed_clean() const {
+ return closed_clean;
+ }
- void notify_mark_down() override;
+ bool is_closed() const {
+ return closed;
+ }
- private:
+#endif
+private:
seastar::future<> wait_exit_io() {
if (exit_io.has_value()) {
return exit_io->get_shared_future();
return statenames[static_cast<int>(state)];
}
- void trigger_state(state_t state, io_state_t io_state, bool reentrant);
+ void trigger_state(state_t state, IOHandler::io_state_t io_state, bool reentrant);
template <typename Func, typename T>
void gated_execute(const char *what, T &who, Func &&func) {
void do_close(bool is_dispatch_reset,
std::optional<std::function<void()>> f_accept_new=std::nullopt);
- private:
+private:
SocketConnection &conn;
SocketMessenger &messenger;
+ IOHandler &io_handler;
+
bool has_socket = false;
// the socket exists and it is not shutdown
Timer protocol_timer;
};
+struct create_handlers_ret {
+ std::unique_ptr<ConnectionHandler> io_handler;
+ std::unique_ptr<ProtocolV2> protocol;
+};
+inline create_handlers_ret create_handlers(ChainedDispatchers &dispatchers, SocketConnection &conn) {
+ std::unique_ptr<ConnectionHandler> io_handler = std::make_unique<IOHandler>(dispatchers, conn);
+ IOHandler &io_handler_concrete = static_cast<IOHandler&>(*io_handler);
+ auto protocol = std::make_unique<ProtocolV2>(conn, io_handler_concrete);
+ io_handler_concrete.set_handshake_listener(*protocol);
+ return {std::move(io_handler), std::move(protocol)};
+}
+
} // namespace crimson::net
#if FMT_VERSION >= 90000
SocketConnection::SocketConnection(SocketMessenger& messenger,
ChainedDispatchers& dispatchers)
: core(messenger.shard_id()),
- messenger(messenger),
- protocol(std::make_unique<ProtocolV2>(dispatchers, *this))
+ messenger(messenger)
{
+ auto ret = create_handlers(dispatchers, *this);
+ io_handler = std::move(ret.io_handler);
+ protocol = std::move(ret.protocol);
#ifdef UNIT_TESTS_BUILT
if (messenger.interceptor) {
interceptor = messenger.interceptor;
bool SocketConnection::is_connected() const
{
assert(seastar::this_shard_id() == shard_id());
- return protocol->is_connected();
+ return io_handler->is_connected();
}
#ifdef UNIT_TESTS_BUILT
return seastar::smp::submit_to(
shard_id(),
[this, msg=std::move(msg)]() mutable {
- return protocol->send(std::move(msg));
+ return io_handler->send(std::move(msg));
});
}
return seastar::smp::submit_to(
shard_id(),
[this] {
- return protocol->send_keepalive();
+ return io_handler->send_keepalive();
});
}
SocketConnection::clock_t::time_point
SocketConnection::get_last_keepalive() const
{
- return protocol->get_last_keepalive();
+ return io_handler->get_last_keepalive();
}
SocketConnection::clock_t::time_point
SocketConnection::get_last_keepalive_ack() const
{
- return protocol->get_last_keepalive_ack();
+ return io_handler->get_last_keepalive_ack();
}
void SocketConnection::set_last_keepalive_ack(clock_t::time_point when)
{
- protocol->set_last_keepalive_ack(when);
+ io_handler->set_last_keepalive_ack(when);
}
void SocketConnection::mark_down()
{
assert(seastar::this_shard_id() == shard_id());
- protocol->mark_down();
+ io_handler->mark_down();
}
void
namespace crimson::net {
-class Protocol;
+class ProtocolV2;
class SocketMessenger;
class SocketConnection;
using SocketConnectionRef = seastar::shared_ptr<SocketConnection>;
class Interceptor;
#endif
+/**
+ * ConnectionHandler
+ *
+ * The interface class to implement Connection, called by SocketConnection.
+ */
+class ConnectionHandler {
+public:
+ using clock_t = seastar::lowres_system_clock;
+
+ virtual ~ConnectionHandler() = default;
+
+ ConnectionHandler(const ConnectionHandler &) = delete;
+ ConnectionHandler(ConnectionHandler &&) = delete;
+ ConnectionHandler &operator=(const ConnectionHandler &) = delete;
+ ConnectionHandler &operator=(ConnectionHandler &&) = delete;
+
+ virtual bool is_connected() const = 0;
+
+ virtual seastar::future<> send(MessageURef) = 0;
+
+ virtual seastar::future<> send_keepalive() = 0;
+
+ virtual clock_t::time_point get_last_keepalive() const = 0;
+
+ virtual clock_t::time_point get_last_keepalive_ack() const = 0;
+
+ virtual void set_last_keepalive_ack(clock_t::time_point) = 0;
+
+ virtual void mark_down() = 0;
+
+protected:
+ ConnectionHandler() = default;
+};
+
class SocketConnection : public Connection {
const seastar::shard_id core;
+
SocketMessenger& messenger;
- std::unique_ptr<Protocol> protocol;
+
+ std::unique_ptr<ConnectionHandler> io_handler;
+
+ std::unique_ptr<ProtocolV2> protocol;
SocketRef socket;
bool peer_wins() const;
#endif
- friend class Protocol;
+ friend class IOHandler;
friend class ProtocolV2;
friend class FrameAssemblerV2;
};
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "io_handler.h"
+
+#include "auth/Auth.h"
+
+#include "crimson/common/formatter.h"
+#include "crimson/common/log.h"
+#include "crimson/net/Errors.h"
+#include "crimson/net/chained_dispatchers.h"
+#include "crimson/net/SocketMessenger.h"
+#include "msg/Message.h"
+
+using namespace ceph::msgr::v2;
+using crimson::common::local_conf;
+
+namespace {
+
+seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_ms);
+}
+
+[[noreturn]] void abort_in_fault() {
+ throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
+}
+
+[[noreturn]] void abort_protocol() {
+ throw std::system_error(make_error_code(crimson::net::error::protocol_aborted));
+}
+
+std::size_t get_msg_size(const FrameAssembler &rx_frame_asm)
+{
+ ceph_assert(rx_frame_asm.get_num_segments() > 0);
+ size_t sum = 0;
+ // we don't include SegmentIndex::Msg::HEADER.
+ for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) {
+ sum += rx_frame_asm.get_segment_logical_len(idx);
+ }
+ return sum;
+}
+
+} // namespace anonymous
+
+namespace crimson::net {
+
+IOHandler::IOHandler(ChainedDispatchers &dispatchers,
+ SocketConnection &conn)
+ : dispatchers(dispatchers),
+ conn(conn)
+{}
+
+IOHandler::~IOHandler()
+{
+ ceph_assert(gate.is_closed());
+ assert(!out_exit_dispatching);
+}
+
+ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent(
+ bool require_keepalive,
+ std::optional<utime_t> maybe_keepalive_ack,
+ bool require_ack)
+{
+ std::size_t num_msgs = out_pending_msgs.size();
+ ceph::bufferlist bl;
+
+ if (unlikely(require_keepalive)) {
+ auto keepalive_frame = KeepAliveFrame::Encode();
+ bl.append(frame_assembler->get_buffer(keepalive_frame));
+ }
+
+ if (unlikely(maybe_keepalive_ack.has_value())) {
+ auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack);
+ bl.append(frame_assembler->get_buffer(keepalive_ack_frame));
+ }
+
+ if (require_ack && num_msgs == 0u) {
+ auto ack_frame = AckFrame::Encode(get_in_seq());
+ bl.append(frame_assembler->get_buffer(ack_frame));
+ }
+
+ std::for_each(
+ out_pending_msgs.begin(),
+ out_pending_msgs.begin()+num_msgs,
+ [this, &bl](const MessageURef& msg) {
+ // set priority
+ msg->get_header().src = conn.messenger.get_myname();
+
+ msg->encode(conn.features, 0);
+
+ ceph_assert(!msg->get_seq() && "message already has seq");
+ msg->set_seq(++out_seq);
+
+ ceph_msg_header &header = msg->get_header();
+ ceph_msg_footer &footer = msg->get_footer();
+
+ ceph_msg_header2 header2{header.seq, header.tid,
+ header.type, header.priority,
+ header.version,
+ ceph_le32(0), header.data_off,
+ ceph_le64(get_in_seq()),
+ footer.flags, header.compat_version,
+ header.reserved};
+
+ auto message = MessageFrame::Encode(header2,
+ msg->get_payload(), msg->get_middle(), msg->get_data());
+ logger().debug("{} --> #{} === {} ({})",
+ conn, msg->get_seq(), *msg, msg->get_type());
+ bl.append(frame_assembler->get_buffer(message));
+ });
+
+ if (!conn.policy.lossy) {
+ out_sent_msgs.insert(
+ out_sent_msgs.end(),
+ std::make_move_iterator(out_pending_msgs.begin()),
+ std::make_move_iterator(out_pending_msgs.end()));
+ }
+ out_pending_msgs.clear();
+ return bl;
+}
+
+seastar::future<> IOHandler::send(MessageURef msg)
+{
+ if (io_state != io_state_t::drop) {
+ out_pending_msgs.push_back(std::move(msg));
+ notify_out_dispatch();
+ }
+ return seastar::now();
+}
+
+seastar::future<> IOHandler::send_keepalive()
+{
+ if (!need_keepalive) {
+ need_keepalive = true;
+ notify_out_dispatch();
+ }
+ return seastar::now();
+}
+
+void IOHandler::mark_down()
+{
+ ceph_assert_always(io_state != io_state_t::none);
+ need_dispatch_reset = false;
+ if (io_state == io_state_t::drop) {
+ return;
+ }
+
+ logger().info("{} mark_down() with {}",
+ conn, io_stat_printer{*this});
+ set_io_state(io_state_t::drop);
+ handshake_listener->notify_mark_down();
+}
+
+void IOHandler::print_io_stat(std::ostream &out) const
+{
+ out << "io_stat("
+ << "io_state=" << fmt::format("{}", io_state)
+ << ", in_seq=" << in_seq
+ << ", out_seq=" << out_seq
+ << ", out_pending_msgs_size=" << out_pending_msgs.size()
+ << ", out_sent_msgs_size=" << out_sent_msgs.size()
+ << ", need_ack=" << (ack_left > 0)
+ << ", need_keepalive=" << need_keepalive
+ << ", need_keepalive_ack=" << bool(next_keepalive_ack)
+ << ")";
+}
+
+void IOHandler::set_io_state(
+ const IOHandler::io_state_t &new_state,
+ FrameAssemblerV2Ref fa)
+{
+ ceph_assert_always(!(
+ (new_state == io_state_t::none && io_state != io_state_t::none) ||
+ (new_state == io_state_t::open && io_state == io_state_t::open) ||
+ (new_state != io_state_t::drop && io_state == io_state_t::drop)
+ ));
+
+ bool dispatch_in = false;
+ if (new_state == io_state_t::open) {
+ // to open
+ ceph_assert_always(protocol_is_connected == true);
+ assert(fa != nullptr);
+ ceph_assert_always(frame_assembler == nullptr);
+ frame_assembler = std::move(fa);
+ ceph_assert_always(frame_assembler->is_socket_valid());
+ dispatch_in = true;
+#ifdef UNIT_TESTS_BUILT
+ if (conn.interceptor) {
+ conn.interceptor->register_conn_ready(conn);
+ }
+#endif
+ } else if (io_state == io_state_t::open) {
+ // from open
+ ceph_assert_always(protocol_is_connected == true);
+ protocol_is_connected = false;
+ assert(fa == nullptr);
+ ceph_assert_always(frame_assembler->is_socket_valid());
+ frame_assembler->shutdown_socket();
+ if (out_dispatching) {
+ ceph_assert_always(!out_exit_dispatching.has_value());
+ out_exit_dispatching = seastar::promise<>();
+ }
+ } else {
+ assert(fa == nullptr);
+ }
+
+ if (io_state != new_state) {
+ io_state = new_state;
+ io_state_changed.set_value();
+ io_state_changed = seastar::promise<>();
+ }
+
+ /*
+ * not atomic below
+ */
+
+ if (dispatch_in) {
+ do_in_dispatch();
+ }
+}
+
+seastar::future<FrameAssemblerV2Ref> IOHandler::wait_io_exit_dispatching()
+{
+ ceph_assert_always(io_state != io_state_t::open);
+ ceph_assert_always(frame_assembler != nullptr);
+ ceph_assert_always(!frame_assembler->is_socket_valid());
+ return seastar::when_all(
+ [this] {
+ if (out_exit_dispatching) {
+ return out_exit_dispatching->get_future();
+ } else {
+ return seastar::now();
+ }
+ }(),
+ [this] {
+ if (in_exit_dispatching) {
+ return in_exit_dispatching->get_future();
+ } else {
+ return seastar::now();
+ }
+ }()
+ ).discard_result().then([this] {
+ return std::move(frame_assembler);
+ });
+}
+
+void IOHandler::reset_session(bool full)
+{
+ // reset in
+ in_seq = 0;
+ if (full) {
+ reset_out();
+ dispatch_remote_reset();
+ }
+}
+
+void IOHandler::requeue_out_sent()
+{
+ assert(io_state != io_state_t::open);
+ if (out_sent_msgs.empty()) {
+ return;
+ }
+
+ out_seq -= out_sent_msgs.size();
+ logger().debug("{} requeue {} items, revert out_seq to {}",
+ conn, out_sent_msgs.size(), out_seq);
+ for (MessageURef& msg : out_sent_msgs) {
+ msg->clear_payload();
+ msg->set_seq(0);
+ }
+ out_pending_msgs.insert(
+ out_pending_msgs.begin(),
+ std::make_move_iterator(out_sent_msgs.begin()),
+ std::make_move_iterator(out_sent_msgs.end()));
+ out_sent_msgs.clear();
+ notify_out_dispatch();
+}
+
+void IOHandler::requeue_out_sent_up_to(seq_num_t seq)
+{
+ assert(io_state != io_state_t::open);
+ if (out_sent_msgs.empty() && out_pending_msgs.empty()) {
+ logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}",
+ conn, out_seq, seq);
+ out_seq = seq;
+ return;
+ }
+ logger().debug("{} discarding sent msgs by seq {} (sent_len={}, out_seq={})",
+ conn, seq, out_sent_msgs.size(), out_seq);
+ while (!out_sent_msgs.empty()) {
+ auto cur_seq = out_sent_msgs.front()->get_seq();
+ if (cur_seq == 0 || cur_seq > seq) {
+ break;
+ } else {
+ out_sent_msgs.pop_front();
+ }
+ }
+ requeue_out_sent();
+}
+
+void IOHandler::reset_out()
+{
+ assert(io_state != io_state_t::open);
+ out_seq = 0;
+ out_pending_msgs.clear();
+ out_sent_msgs.clear();
+ need_keepalive = false;
+ next_keepalive_ack = std::nullopt;
+ ack_left = 0;
+}
+
+void IOHandler::dispatch_accept()
+{
+ if (io_state == io_state_t::drop) {
+ return;
+ }
+ // protocol_is_connected can be from true to true here if the replacing is
+ // happening to a connected connection.
+ protocol_is_connected = true;
+ dispatchers.ms_handle_accept(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+}
+
+void IOHandler::dispatch_connect()
+{
+ if (io_state == io_state_t::drop) {
+ return;
+ }
+ ceph_assert_always(protocol_is_connected == false);
+ protocol_is_connected = true;
+ dispatchers.ms_handle_connect(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+}
+
+void IOHandler::dispatch_reset(bool is_replace)
+{
+ ceph_assert_always(io_state == io_state_t::drop);
+ if (!need_dispatch_reset) {
+ return;
+ }
+ need_dispatch_reset = false;
+ dispatchers.ms_handle_reset(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()),
+ is_replace);
+}
+
+void IOHandler::dispatch_remote_reset()
+{
+ if (io_state == io_state_t::drop) {
+ return;
+ }
+ dispatchers.ms_handle_remote_reset(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+}
+
+void IOHandler::ack_out_sent(seq_num_t seq)
+{
+ if (conn.policy.lossy) { // lossy connections don't keep sent messages
+ return;
+ }
+ while (!out_sent_msgs.empty() &&
+ out_sent_msgs.front()->get_seq() <= seq) {
+ logger().trace("{} got ack seq {} >= {}, pop {}",
+ conn, seq, out_sent_msgs.front()->get_seq(),
+ *out_sent_msgs.front());
+ out_sent_msgs.pop_front();
+ }
+}
+
+seastar::future<stop_t> IOHandler::try_exit_out_dispatch() {
+ assert(!is_out_queued());
+ return frame_assembler->flush(
+ ).then([this] {
+ if (!is_out_queued()) {
+ // still nothing pending to send after flush,
+ // the dispatching can ONLY stop now
+ ceph_assert(out_dispatching);
+ out_dispatching = false;
+ if (unlikely(out_exit_dispatching.has_value())) {
+ out_exit_dispatching->set_value();
+ out_exit_dispatching = std::nullopt;
+ logger().info("{} do_out_dispatch: nothing queued at {},"
+ " set out_exit_dispatching",
+ conn, io_state);
+ }
+ return seastar::make_ready_future<stop_t>(stop_t::yes);
+ } else {
+ // something is pending to send during flushing
+ return seastar::make_ready_future<stop_t>(stop_t::no);
+ }
+ });
+}
+
+seastar::future<> IOHandler::do_out_dispatch()
+{
+ return seastar::repeat([this] {
+ switch (io_state) {
+ case io_state_t::open: {
+ bool still_queued = is_out_queued();
+ if (unlikely(!still_queued)) {
+ return try_exit_out_dispatch();
+ }
+ auto to_ack = ack_left;
+ assert(to_ack == 0 || in_seq > 0);
+ return frame_assembler->write(
+ sweep_out_pending_msgs_to_sent(
+ need_keepalive, next_keepalive_ack, to_ack > 0)
+ ).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack] {
+ need_keepalive = false;
+ if (next_keepalive_ack == prv_keepalive_ack) {
+ next_keepalive_ack = std::nullopt;
+ }
+ assert(ack_left >= to_ack);
+ ack_left -= to_ack;
+ if (!is_out_queued()) {
+ return try_exit_out_dispatch();
+ } else {
+ // messages were enqueued during socket write
+ return seastar::make_ready_future<stop_t>(stop_t::no);
+ }
+ });
+ }
+ case io_state_t::delay:
+ // delay out dispatching until open
+ if (out_exit_dispatching) {
+ out_exit_dispatching->set_value();
+ out_exit_dispatching = std::nullopt;
+ logger().info("{} do_out_dispatch: delay and set out_exit_dispatching ...", conn);
+ } else {
+ logger().info("{} do_out_dispatch: delay ...", conn);
+ }
+ return io_state_changed.get_future(
+ ).then([] { return stop_t::no; });
+ case io_state_t::drop:
+ ceph_assert(out_dispatching);
+ out_dispatching = false;
+ if (out_exit_dispatching) {
+ out_exit_dispatching->set_value();
+ out_exit_dispatching = std::nullopt;
+ logger().info("{} do_out_dispatch: dropped and set out_exit_dispatching", conn);
+ } else {
+ logger().info("{} do_out_dispatch: dropped", conn);
+ }
+ return seastar::make_ready_future<stop_t>(stop_t::yes);
+ default:
+ ceph_assert(false);
+ }
+ }).handle_exception_type([this] (const std::system_error& e) {
+ if (e.code() != std::errc::broken_pipe &&
+ e.code() != std::errc::connection_reset &&
+ e.code() != error::negotiation_failure) {
+ logger().error("{} do_out_dispatch(): unexpected error at {} -- {}",
+ conn, io_state, e);
+ ceph_abort();
+ }
+
+ if (io_state == io_state_t::open) {
+ logger().info("{} do_out_dispatch(): fault at {}, going to delay -- {}",
+ conn, io_state, e);
+ std::exception_ptr eptr;
+ try {
+ throw e;
+ } catch(...) {
+ eptr = std::current_exception();
+ }
+ set_io_state(io_state_t::delay);
+ handshake_listener->notify_out_fault("do_out_dispatch", eptr);
+ } else {
+ logger().info("{} do_out_dispatch(): fault at {} -- {}",
+ conn, io_state, e);
+ }
+
+ return do_out_dispatch();
+ });
+}
+
+void IOHandler::notify_out_dispatch()
+{
+ handshake_listener->notify_out();
+ if (out_dispatching) {
+ // already dispatching
+ return;
+ }
+ out_dispatching = true;
+ switch (io_state) {
+ case io_state_t::open:
+ [[fallthrough]];
+ case io_state_t::delay:
+ assert(!gate.is_closed());
+ gate.dispatch_in_background("do_out_dispatch", conn, [this] {
+ return do_out_dispatch();
+ });
+ return;
+ case io_state_t::drop:
+ out_dispatching = false;
+ return;
+ default:
+ ceph_assert(false);
+ }
+}
+
+seastar::future<>
+IOHandler::read_message(utime_t throttle_stamp, std::size_t msg_size)
+{
+ return frame_assembler->read_frame_payload(
+ ).then([this, throttle_stamp, msg_size](auto payload) {
+ if (unlikely(io_state != io_state_t::open)) {
+ logger().debug("{} triggered {} during read_message()",
+ conn, io_state);
+ abort_protocol();
+ }
+
+ utime_t recv_stamp{seastar::lowres_system_clock::now()};
+
+ // we need to get the size before std::moving segments data
+ auto msg_frame = MessageFrame::Decode(*payload);
+ // XXX: paranoid copy just to avoid oops
+ ceph_msg_header2 current_header = msg_frame.header();
+
+ logger().trace("{} got {} + {} + {} byte message,"
+ " envelope type={} src={} off={} seq={}",
+ conn, msg_frame.front_len(), msg_frame.middle_len(),
+ msg_frame.data_len(), current_header.type, conn.get_peer_name(),
+ current_header.data_off, current_header.seq);
+
+ ceph_msg_header header{current_header.seq,
+ current_header.tid,
+ current_header.type,
+ current_header.priority,
+ current_header.version,
+ ceph_le32(msg_frame.front_len()),
+ ceph_le32(msg_frame.middle_len()),
+ ceph_le32(msg_frame.data_len()),
+ current_header.data_off,
+ conn.get_peer_name(),
+ current_header.compat_version,
+ current_header.reserved,
+ ceph_le32(0)};
+ ceph_msg_footer footer{ceph_le32(0), ceph_le32(0),
+ ceph_le32(0), ceph_le64(0), current_header.flags};
+
+ auto conn_ref = seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this());
+ Message *message = decode_message(nullptr, 0, header, footer,
+ msg_frame.front(), msg_frame.middle(), msg_frame.data(), conn_ref);
+ if (!message) {
+ logger().warn("{} decode message failed", conn);
+ abort_in_fault();
+ }
+
+ // store reservation size in message, so we don't get confused
+ // by messages entering the dispatch queue through other paths.
+ message->set_dispatch_throttle_size(msg_size);
+
+ message->set_throttle_stamp(throttle_stamp);
+ message->set_recv_stamp(recv_stamp);
+ message->set_recv_complete_stamp(utime_t{seastar::lowres_system_clock::now()});
+
+ // check received seq#. if it is old, drop the message.
+ // note that incoming messages may skip ahead. this is convenient for the
+ // client side queueing because messages can't be renumbered, but the (kernel)
+ // client will occasionally pull a message out of the sent queue to send
+ // elsewhere. in that case it doesn't matter if we "got" it or not.
+ uint64_t cur_seq = get_in_seq();
+ if (message->get_seq() <= cur_seq) {
+ logger().error("{} got old message {} <= {} {}, discarding",
+ conn, message->get_seq(), cur_seq, *message);
+ if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) &&
+ local_conf()->ms_die_on_old_message) {
+ ceph_assert(0 == "old msgs despite reconnect_seq feature");
+ }
+ return seastar::now();
+ } else if (message->get_seq() > cur_seq + 1) {
+ logger().error("{} missed message? skipped from seq {} to {}",
+ conn, cur_seq, message->get_seq());
+ if (local_conf()->ms_die_on_skipped_message) {
+ ceph_assert(0 == "skipped incoming seq");
+ }
+ }
+
+ // note last received message.
+ in_seq = message->get_seq();
+ if (conn.policy.lossy) {
+ logger().debug("{} <== #{} === {} ({})",
+ conn,
+ message->get_seq(),
+ *message,
+ message->get_type());
+ } else {
+ logger().debug("{} <== #{},{} === {} ({})",
+ conn,
+ message->get_seq(),
+ current_header.ack_seq,
+ *message,
+ message->get_type());
+ }
+
+ // notify ack
+ if (!conn.policy.lossy) {
+ ++ack_left;
+ notify_out_dispatch();
+ }
+
+ ack_out_sent(current_header.ack_seq);
+
+ // TODO: change MessageRef with seastar::shared_ptr
+ auto msg_ref = MessageRef{message, false};
+ assert(io_state == io_state_t::open);
+ // throttle the reading process by the returned future
+ return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
+ });
+}
+
+void IOHandler::do_in_dispatch()
+{
+ ceph_assert_always(!in_exit_dispatching.has_value());
+ in_exit_dispatching = seastar::promise<>();
+ gate.dispatch_in_background("do_in_dispatch", conn, [this] {
+ return seastar::keep_doing([this] {
+ return frame_assembler->read_main_preamble(
+ ).then([this](auto ret) {
+ switch (ret.tag) {
+ case Tag::MESSAGE: {
+ size_t msg_size = get_msg_size(*ret.rx_frame_asm);
+ return seastar::futurize_invoke([this] {
+ // throttle_message() logic
+ if (!conn.policy.throttler_messages) {
+ return seastar::now();
+ }
+ // TODO: message throttler
+ ceph_assert(false);
+ return seastar::now();
+ }).then([this, msg_size] {
+ // throttle_bytes() logic
+ if (!conn.policy.throttler_bytes) {
+ return seastar::now();
+ }
+ if (!msg_size) {
+ return seastar::now();
+ }
+ logger().trace("{} wants {} bytes from policy throttler {}/{}",
+ conn, msg_size,
+ conn.policy.throttler_bytes->get_current(),
+ conn.policy.throttler_bytes->get_max());
+ return conn.policy.throttler_bytes->get(msg_size);
+ }).then([this, msg_size] {
+ // TODO: throttle_dispatch_queue() logic
+ utime_t throttle_stamp{seastar::lowres_system_clock::now()};
+ return read_message(throttle_stamp, msg_size);
+ });
+ }
+ case Tag::ACK:
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
+ // handle_message_ack() logic
+ auto ack = AckFrame::Decode(payload->back());
+ logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq());
+ ack_out_sent(ack.seq());
+ });
+ case Tag::KEEPALIVE2:
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
+ // handle_keepalive2() logic
+ auto keepalive_frame = KeepAliveFrame::Decode(payload->back());
+ logger().debug("{} GOT KeepAliveFrame: timestamp={}",
+ conn, keepalive_frame.timestamp());
+ // notify keepalive ack
+ next_keepalive_ack = keepalive_frame.timestamp();
+ notify_out_dispatch();
+
+ last_keepalive = seastar::lowres_system_clock::now();
+ });
+ case Tag::KEEPALIVE2_ACK:
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
+ // handle_keepalive2_ack() logic
+ auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload->back());
+ auto _last_keepalive_ack =
+ seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()};
+ set_last_keepalive_ack(_last_keepalive_ack);
+ logger().debug("{} GOT KeepAliveFrameAck: timestamp={}",
+ conn, _last_keepalive_ack);
+ });
+ default: {
+ logger().warn("{} do_in_dispatch() received unexpected tag: {}",
+ conn, static_cast<uint32_t>(ret.tag));
+ abort_in_fault();
+ }
+ }
+ });
+ }).handle_exception([this](std::exception_ptr eptr) {
+ const char *e_what;
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ e_what = e.what();
+ }
+
+ if (io_state == io_state_t::open) {
+ logger().info("{} do_in_dispatch(): fault at {}, going to delay -- {}",
+ conn, io_state, e_what);
+ set_io_state(io_state_t::delay);
+ handshake_listener->notify_out_fault("do_in_dispatch", eptr);
+ } else {
+ logger().info("{} do_in_dispatch(): fault at {} -- {}",
+ conn, io_state, e_what);
+ }
+ }).finally([this] {
+ ceph_assert_always(in_exit_dispatching.has_value());
+ in_exit_dispatching->set_value();
+ in_exit_dispatching = std::nullopt;
+ });
+ });
+}
+
+} // namespace crimson::net
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/util/later.hh>
+
+#include "crimson/common/gated.h"
+#include "Fwd.h"
+#include "SocketConnection.h"
+#include "FrameAssemblerV2.h"
+
+namespace crimson::net {
+
+/**
+ * HandshakeListener
+ *
+ * The interface class for IOHandler to notify the ProtocolV2 for handshake.
+ *
+ * The notifications may be cross-core and asynchronous.
+ */
+class HandshakeListener {
+public:
+ virtual ~HandshakeListener() = default;
+
+ HandshakeListener(const HandshakeListener&) = delete;
+ HandshakeListener(HandshakeListener &&) = delete;
+ HandshakeListener &operator=(const HandshakeListener &) = delete;
+ HandshakeListener &operator=(HandshakeListener &&) = delete;
+
+ virtual void notify_out() = 0;
+
+ virtual void notify_out_fault(const char *where, std::exception_ptr) = 0;
+
+ virtual void notify_mark_down() = 0;
+
+protected:
+ HandshakeListener() = default;
+};
+
+/**
+ * IOHandler
+ *
+ * Implements the message read and write paths after the handshake, and also be
+ * responsible to dispatch events. It is supposed to be working on the same
+ * core with the underlying socket and the FrameAssemblerV2 class.
+ */
+class IOHandler final : public ConnectionHandler {
+public:
+ IOHandler(ChainedDispatchers &,
+ SocketConnection &);
+
+ ~IOHandler() final;
+
+ IOHandler(const IOHandler &) = delete;
+ IOHandler(IOHandler &&) = delete;
+ IOHandler &operator=(const IOHandler &) = delete;
+ IOHandler &operator=(IOHandler &&) = delete;
+
+/*
+ * as ConnectionHandler
+ */
+private:
+ bool is_connected() const final {
+ return protocol_is_connected;
+ }
+
+ seastar::future<> send(MessageURef msg) final;
+
+ seastar::future<> send_keepalive() final;
+
+ clock_t::time_point get_last_keepalive() const final {
+ return last_keepalive;
+ }
+
+ clock_t::time_point get_last_keepalive_ack() const final {
+ return last_keepalive_ack;
+ }
+
+ void set_last_keepalive_ack(clock_t::time_point when) final {
+ last_keepalive_ack = when;
+ }
+
+ void mark_down() final;
+
+/*
+ * as IOHandler to be called by ProtocolV2 handshake
+ *
+ * The calls may be cross-core and asynchronous
+ */
+public:
+ void set_handshake_listener(HandshakeListener &hl) {
+ ceph_assert_always(handshake_listener == nullptr);
+ handshake_listener = &hl;
+ }
+
+ struct io_stat_printer {
+ const IOHandler &io_handler;
+ };
+ void print_io_stat(std::ostream &out) const;
+
+ seastar::future<> close_io(
+ bool is_dispatch_reset,
+ bool is_replace) {
+ ceph_assert_always(io_state == io_state_t::drop);
+
+ if (is_dispatch_reset) {
+ dispatch_reset(is_replace);
+ }
+ assert(!gate.is_closed());
+ return gate.close();
+ }
+
+ /**
+ * io_state_t
+ *
+ * The io_state is changed with the protocol state, to control the
+ * io behavior accordingly.
+ */
+ enum class io_state_t : uint8_t {
+ none, // no IO is possible as the connection is not available to the user yet.
+ delay, // IO is delayed until open.
+ open, // Dispatch In and Out concurrently.
+ drop // Drop IO as the connection is closed.
+ };
+ friend class fmt::formatter<io_state_t>;
+
+ void set_io_state(const io_state_t &new_state, FrameAssemblerV2Ref fa=nullptr);
+
+ seastar::future<FrameAssemblerV2Ref> wait_io_exit_dispatching();
+
+ void reset_session(bool full);
+
+ void requeue_out_sent_up_to(seq_num_t seq);
+
+ void requeue_out_sent();
+
+ bool is_out_queued_or_sent() const {
+ return is_out_queued() || !out_sent_msgs.empty();
+ }
+
+ seq_num_t get_in_seq() const {
+ return in_seq;
+ }
+
+ void dispatch_accept();
+
+ void dispatch_connect();
+
+ private:
+ void dispatch_reset(bool is_replace);
+
+ void dispatch_remote_reset();
+
+ bool is_out_queued() const {
+ return (!out_pending_msgs.empty() ||
+ ack_left > 0 ||
+ need_keepalive ||
+ next_keepalive_ack.has_value());
+ }
+
+ void reset_out();
+
+ seastar::future<stop_t> try_exit_out_dispatch();
+
+ seastar::future<> do_out_dispatch();
+
+ ceph::bufferlist sweep_out_pending_msgs_to_sent(
+ bool require_keepalive,
+ std::optional<utime_t> maybe_keepalive_ack,
+ bool require_ack);
+
+ void notify_out_dispatch();
+
+ void ack_out_sent(seq_num_t seq);
+
+ seastar::future<> read_message(utime_t throttle_stamp, std::size_t msg_size);
+
+ void do_in_dispatch();
+
+private:
+ ChainedDispatchers &dispatchers;
+
+ SocketConnection &conn;
+
+ HandshakeListener *handshake_listener = nullptr;
+
+ crimson::common::Gated gate;
+
+ FrameAssemblerV2Ref frame_assembler;
+
+ bool protocol_is_connected = false;
+
+ bool need_dispatch_reset = true;
+
+ io_state_t io_state = io_state_t::none;
+
+ // wait until current io_state changed
+ seastar::promise<> io_state_changed;
+
+ /*
+ * out states for writing
+ */
+
+ bool out_dispatching = false;
+
+ std::optional<seastar::promise<>> out_exit_dispatching;
+
+ /// the seq num of the last transmitted message
+ seq_num_t out_seq = 0;
+
+ // messages to be resent after connection gets reset
+ std::deque<MessageURef> out_pending_msgs;
+
+ // messages sent, but not yet acked by peer
+ std::deque<MessageURef> out_sent_msgs;
+
+ bool need_keepalive = false;
+
+ std::optional<utime_t> next_keepalive_ack = std::nullopt;
+
+ uint64_t ack_left = 0;
+
+ /*
+ * in states for reading
+ */
+
+ std::optional<seastar::promise<>> in_exit_dispatching;
+
+ /// the seq num of the last received message
+ seq_num_t in_seq = 0;
+
+ clock_t::time_point last_keepalive;
+
+ clock_t::time_point last_keepalive_ack;
+};
+
+inline std::ostream& operator<<(
+ std::ostream& out, IOHandler::io_stat_printer stat) {
+ stat.io_handler.print_io_stat(out);
+ return out;
+}
+
+} // namespace crimson::net
+
+template <>
+struct fmt::formatter<crimson::net::IOHandler::io_state_t>
+ : fmt::formatter<std::string_view> {
+ template <typename FormatContext>
+ auto format(crimson::net::IOHandler::io_state_t state, FormatContext& ctx) {
+ using enum crimson::net::IOHandler::io_state_t;
+ std::string_view name;
+ switch (state) {
+ case none:
+ name = "none";
+ break;
+ case delay:
+ name = "delay";
+ break;
+ case open:
+ name = "open";
+ break;
+ case drop:
+ name = "drop";
+ break;
+ }
+ return formatter<string_view>::format(name, ctx);
+ }
+};