From ff57f47295d72ae848e02b34c79ded220a7cce8f Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Thu, 1 Dec 2022 10:34:56 +0800 Subject: [PATCH] crimson/net: move message write path from ProtocolV2 to Protocol Signed-off-by: Yingxin Cheng --- src/crimson/net/FrameAssemblerV2.h | 5 +- src/crimson/net/Protocol.cc | 119 +++++++++++++++++++++++------ src/crimson/net/Protocol.h | 25 +----- src/crimson/net/ProtocolV2.cc | 55 ------------- src/crimson/net/ProtocolV2.h | 7 -- 5 files changed, 101 insertions(+), 110 deletions(-) diff --git a/src/crimson/net/FrameAssemblerV2.h b/src/crimson/net/FrameAssemblerV2.h index 5cea92440fe..3165a048fc1 100644 --- a/src/crimson/net/FrameAssemblerV2.h +++ b/src/crimson/net/FrameAssemblerV2.h @@ -61,6 +61,9 @@ public: * socket maintainence interfaces */ + // the socket exists and not shutdown + bool is_socket_valid() const; + void set_socket(SocketRef &&); void learn_socket_ephemeral_port_as_connector(uint16_t port); @@ -120,8 +123,6 @@ public: private: bool has_socket() const; - bool is_socket_valid() const; - void log_main_preamble(const ceph::bufferlist &bl); #ifdef UNIT_TESTS_BUILT diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc index 58edd882561..3ec265d4b23 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/Protocol.cc @@ -9,14 +9,17 @@ #include "crimson/net/Errors.h" #include "crimson/net/chained_dispatchers.h" #include "crimson/net/SocketConnection.h" +#include "crimson/net/SocketMessenger.h" #include "msg/Message.h" namespace { - seastar::logger& logger() { - return crimson::get_logger(ceph_subsys_ms); - } + +seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_ms); } +} // namespace anonymous + namespace crimson::net { Protocol::Protocol(ChainedDispatchers& dispatchers, @@ -33,16 +36,58 @@ Protocol::~Protocol() } ceph::bufferlist Protocol::sweep_out_pending_msgs_to_sent( - size_t num_msgs, - bool require_keepalive, - std::optional maybe_keepalive_ack, - bool require_ack) + bool require_keepalive, + std::optional maybe_keepalive_ack, + bool require_ack) { - ceph::bufferlist bl = do_sweep_messages(out_pending_msgs, - num_msgs, - require_keepalive, - maybe_keepalive_ack, - require_ack); + std::size_t num_msgs = out_pending_msgs.size(); + ceph::bufferlist bl; + + if (unlikely(require_keepalive)) { + auto keepalive_frame = KeepAliveFrame::Encode(); + bl.append(frame_assembler.get_buffer(keepalive_frame)); + } + + if (unlikely(maybe_keepalive_ack.has_value())) { + auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack); + bl.append(frame_assembler.get_buffer(keepalive_ack_frame)); + } + + if (require_ack && num_msgs == 0u) { + auto ack_frame = AckFrame::Encode(get_in_seq()); + bl.append(frame_assembler.get_buffer(ack_frame)); + } + + std::for_each( + out_pending_msgs.begin(), + out_pending_msgs.begin()+num_msgs, + [this, &bl](const MessageURef& msg) { + // set priority + msg->get_header().src = conn.messenger.get_myname(); + + msg->encode(conn.features, 0); + + ceph_assert(!msg->get_seq() && "message already has seq"); + msg->set_seq(++out_seq); + + ceph_msg_header &header = msg->get_header(); + ceph_msg_footer &footer = msg->get_footer(); + + ceph_msg_header2 header2{header.seq, header.tid, + header.type, header.priority, + header.version, + ceph_le32(0), header.data_off, + ceph_le64(get_in_seq()), + footer.flags, header.compat_version, + header.reserved}; + + auto message = MessageFrame::Encode(header2, + msg->get_payload(), msg->get_middle(), msg->get_data()); + logger().debug("{} --> #{} === {} ({})", + conn, msg->get_seq(), *msg, msg->get_type()); + bl.append(frame_assembler.get_buffer(message)); + }); + if (!conn.policy.lossy) { out_sent_msgs.insert( out_sent_msgs.end(), @@ -71,6 +116,35 @@ seastar::future<> Protocol::send_keepalive() return seastar::now(); } +void Protocol::set_out_state( + const Protocol::out_state_t &new_state) +{ + ceph_assert_always(!( + (new_state == out_state_t::none && out_state != out_state_t::none) || + (new_state == out_state_t::open && out_state == out_state_t::open) || + (new_state != out_state_t::drop && out_state == out_state_t::drop) + )); + + if (out_state != out_state_t::open && + new_state == out_state_t::open) { + // to open + ceph_assert_always(frame_assembler.is_socket_valid()); + } else if (out_state == out_state_t::open && + new_state != out_state_t::open) { + // from open + if (out_dispatching) { + ceph_assert_always(!out_exit_dispatching.has_value()); + out_exit_dispatching = seastar::shared_promise<>(); + } + } + + if (out_state != new_state) { + out_state = new_state; + out_state_changed.set_value(); + out_state_changed = seastar::shared_promise<>(); + } +} + void Protocol::notify_keepalive_ack(utime_t keepalive_ack) { logger().trace("{} got keepalive ack {}", conn, keepalive_ack); @@ -183,7 +257,6 @@ seastar::future<> Protocol::do_out_dispatch() return seastar::repeat([this] { switch (out_state) { case out_state_t::open: { - size_t num_msgs = out_pending_msgs.size(); bool still_queued = is_out_queued(); if (unlikely(!still_queued)) { return try_exit_out_dispatch(); @@ -193,7 +266,7 @@ seastar::future<> Protocol::do_out_dispatch() // sweep all pending out with the concrete Protocol return frame_assembler.write( sweep_out_pending_msgs_to_sent( - num_msgs, need_keepalive, next_keepalive_ack, to_ack > 0) + need_keepalive, next_keepalive_ack, to_ack > 0) ).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack] { need_keepalive = false; if (next_keepalive_ack == prv_keepalive_ack) { @@ -243,22 +316,22 @@ seastar::future<> Protocol::do_out_dispatch() ceph_abort(); } - std::exception_ptr eptr; - try { - throw e; - } catch(...) { - eptr = std::current_exception(); - } - notify_out_fault(eptr); - if (out_state == out_state_t::open) { logger().info("{} do_out_dispatch(): fault at {}, going to delay -- {}", conn, out_state, e); - out_state = out_state_t::delay; + std::exception_ptr eptr; + try { + throw e; + } catch(...) { + eptr = std::current_exception(); + } + set_out_state(out_state_t::delay); + notify_out_fault(eptr); } else { logger().info("{} do_out_dispatch(): fault at {} -- {}", conn, out_state, e); } + return do_out_dispatch(); }); } diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h index c7bc71630b9..4c82d9847db 100644 --- a/src/crimson/net/Protocol.h +++ b/src/crimson/net/Protocol.h @@ -45,13 +45,6 @@ class Protocol { Protocol(ChainedDispatchers& dispatchers, SocketConnection& conn); - virtual ceph::bufferlist do_sweep_messages( - const std::deque& msgs, - size_t num_msgs, - bool require_keepalive, - std::optional maybe_keepalive_ack, - bool require_ack) = 0; - virtual void notify_out() = 0; virtual void notify_out_fault(std::exception_ptr) = 0; @@ -110,18 +103,9 @@ class Protocol { open, drop }; - friend class fmt::formatter; - void set_out_state(const out_state_t& state) { - if (out_state == out_state_t::open && - state != out_state_t::open && - out_dispatching) { - out_exit_dispatching = seastar::shared_promise<>(); - } - out_state = state; - out_state_changed.set_value(); - out_state_changed = seastar::shared_promise<>(); - } + + void set_out_state(const out_state_t &new_state); seastar::future<> wait_out_exit_dispatching() { if (out_exit_dispatching) { @@ -162,10 +146,6 @@ class Protocol { in_seq = _in_seq; } - seq_num_t increment_out_seq() { - return ++out_seq; - } - ChainedDispatchers& dispatchers; SocketConnection &conn; @@ -185,7 +165,6 @@ class Protocol { seastar::future<> do_out_dispatch(); ceph::bufferlist sweep_out_pending_msgs_to_sent( - size_t num_msgs, bool require_keepalive, std::optional maybe_keepalive_ack, bool require_ack); diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 0d5963edff6..8bce2ee6a84 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -1766,61 +1766,6 @@ void ProtocolV2::trigger_replacing(bool reconnect, // READY state -ceph::bufferlist ProtocolV2::do_sweep_messages( - const std::deque& msgs, - size_t num_msgs, - bool require_keepalive, - std::optional maybe_keepalive_ack, - bool require_ack) -{ - ceph::bufferlist bl; - - if (unlikely(require_keepalive)) { - auto keepalive_frame = KeepAliveFrame::Encode(); - bl.append(frame_assembler.get_buffer(keepalive_frame)); - } - - if (unlikely(maybe_keepalive_ack.has_value())) { - auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack); - bl.append(frame_assembler.get_buffer(keepalive_ack_frame)); - } - - if (require_ack && num_msgs == 0u) { - auto ack_frame = AckFrame::Encode(get_in_seq()); - bl.append(frame_assembler.get_buffer(ack_frame)); - } - - std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageURef& msg) { - // TODO: move to common code - // set priority - msg->get_header().src = messenger.get_myname(); - - msg->encode(conn.features, 0); - - ceph_assert(!msg->get_seq() && "message already has seq"); - msg->set_seq(increment_out_seq()); - - ceph_msg_header &header = msg->get_header(); - ceph_msg_footer &footer = msg->get_footer(); - - ceph_msg_header2 header2{header.seq, header.tid, - header.type, header.priority, - header.version, - ceph_le32(0), header.data_off, - ceph_le64(get_in_seq()), - footer.flags, header.compat_version, - header.reserved}; - - auto message = MessageFrame::Encode(header2, - msg->get_payload(), msg->get_middle(), msg->get_data()); - logger().debug("{} --> #{} === {} ({})", - conn, msg->get_seq(), *msg, msg->get_type()); - bl.append(frame_assembler.get_buffer(message)); - }); - - return bl; -} - void ProtocolV2::notify_out_fault(std::exception_ptr eptr) { fault(state_t::READY, "notify_out_fault", eptr); diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index 3b02b2f6a95..5c7d369bb2a 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -45,13 +45,6 @@ class ProtocolV2 final : public Protocol { void print_conn(std::ostream&) const final; private: - ceph::bufferlist do_sweep_messages( - const std::deque& msgs, - size_t num_msgs, - bool require_keepalive, - std::optional keepalive_ack, - bool require_ack) override; - void notify_out() override; void notify_out_fault(std::exception_ptr) override; -- 2.39.5