From: Yingxin Cheng Date: Tue, 1 Nov 2022 08:43:15 +0000 (+0800) Subject: crimson/net: move IO members into Protocol class X-Git-Tag: v18.1.0~375^2~32 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f26c1c09faadb8401cc2147f3d520c6680fcece3;p=ceph.git crimson/net: move IO members into Protocol class In order to introduce the cross-core IOHandler class. Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc index 734af8eda538..1adfe895455f 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/Protocol.cc @@ -92,24 +92,24 @@ ceph::bufferlist Protocol::sweep_messages_and_move_to_sent( std::optional keepalive_ack, bool require_ack) { - ceph::bufferlist bl = do_sweep_messages(conn.out_q, - num_msgs, - require_keepalive, - keepalive_ack, + ceph::bufferlist bl = do_sweep_messages(out_q, + num_msgs, + require_keepalive, + keepalive_ack, require_ack); if (!conn.policy.lossy) { - conn.sent.insert(conn.sent.end(), - std::make_move_iterator(conn.out_q.begin()), - std::make_move_iterator(conn.out_q.end())); + sent.insert(sent.end(), + std::make_move_iterator(out_q.begin()), + std::make_move_iterator(out_q.end())); } - conn.out_q.clear(); + out_q.clear(); return bl; } seastar::future<> Protocol::send(MessageURef msg) { if (write_state != write_state_t::drop) { - conn.out_q.push_back(std::move(msg)); + out_q.push_back(std::move(msg)); write_event(); } return seastar::now(); @@ -142,41 +142,41 @@ void Protocol::notify_ack() void Protocol::requeue_sent() { assert(write_state != write_state_t::open); - if (conn.sent.empty()) { + if (sent.empty()) { return; } - conn.out_seq -= conn.sent.size(); + out_seq -= sent.size(); logger().debug("{} requeue {} items, revert out_seq to {}", - conn, conn.sent.size(), conn.out_seq); - for (MessageURef& msg : conn.sent) { + conn, sent.size(), out_seq); + for (MessageURef& msg : sent) { msg->clear_payload(); msg->set_seq(0); } - conn.out_q.insert(conn.out_q.begin(), - std::make_move_iterator(conn.sent.begin()), - std::make_move_iterator(conn.sent.end())); - conn.sent.clear(); + out_q.insert(out_q.begin(), + std::make_move_iterator(sent.begin()), + std::make_move_iterator(sent.end())); + sent.clear(); write_event(); } void Protocol::requeue_up_to(seq_num_t seq) { assert(write_state != write_state_t::open); - if (conn.sent.empty() && conn.out_q.empty()) { + if (sent.empty() && out_q.empty()) { logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}", - conn, conn.out_seq, seq); - conn.out_seq = seq; + conn, out_seq, seq); + out_seq = seq; return; } logger().debug("{} discarding sent items by seq {} (sent_len={}, out_seq={})", - conn, seq, conn.sent.size(), conn.out_seq); - while (!conn.sent.empty()) { - auto cur_seq = conn.sent.front()->get_seq(); + conn, seq, sent.size(), out_seq); + while (!sent.empty()) { + auto cur_seq = sent.front()->get_seq(); if (cur_seq == 0 || cur_seq > seq) { break; } else { - conn.sent.pop_front(); + sent.pop_front(); } } requeue_sent(); @@ -185,9 +185,9 @@ void Protocol::requeue_up_to(seq_num_t seq) void Protocol::reset_write() { assert(write_state != write_state_t::open); - conn.out_seq = 0; - conn.out_q.clear(); - conn.sent.clear(); + out_seq = 0; + out_q.clear(); + sent.clear(); need_keepalive = false; keepalive_ack = std::nullopt; ack_left = 0; @@ -198,10 +198,10 @@ void Protocol::ack_writes(seq_num_t seq) if (conn.policy.lossy) { // lossy connections don't keep sent messages return; } - while (!conn.sent.empty() && conn.sent.front()->get_seq() <= seq) { + while (!sent.empty() && sent.front()->get_seq() <= seq) { logger().trace("{} got ack seq {} >= {}, pop {}", - conn, seq, conn.sent.front()->get_seq(), *conn.sent.front()); - conn.sent.pop_front(); + conn, seq, sent.front()->get_seq(), *sent.front()); + sent.pop_front(); } } @@ -233,13 +233,13 @@ seastar::future<> Protocol::do_write_dispatch_sweep() return seastar::repeat([this] { switch (write_state) { case write_state_t::open: { - size_t num_msgs = conn.out_q.size(); + size_t num_msgs = out_q.size(); bool still_queued = is_queued(); if (unlikely(!still_queued)) { return try_exit_sweep(); } auto acked = ack_left; - assert(acked == 0 || conn.in_seq > 0); + assert(acked == 0 || in_seq > 0); // sweep all pending writes with the concrete Protocol return conn.socket->write(sweep_messages_and_move_to_sent( num_msgs, need_keepalive, keepalive_ack, acked > 0) diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h index f819c4692083..268ffc996820 100644 --- a/src/crimson/net/Protocol.h +++ b/src/crimson/net/Protocol.h @@ -50,7 +50,8 @@ class Protocol { virtual void start_accept(SocketRef&& socket, const entity_addr_t& peer_addr) = 0; - virtual void print(std::ostream&) const = 0; + virtual void print_conn(std::ostream&) const = 0; + protected: Protocol(ChainedDispatchers& dispatchers, SocketConnection& conn); @@ -86,9 +87,39 @@ class Protocol { // the write state-machine public: + using clock_t = seastar::lowres_system_clock; + seastar::future<> send(MessageURef msg); + seastar::future<> keepalive(); + clock_t::time_point get_last_keepalive() const { + return last_keepalive; + } + + clock_t::time_point get_last_keepalive_ack() const { + return last_keepalive_ack; + } + + void set_last_keepalive_ack(clock_t::time_point when) { + last_keepalive_ack = when; + } + + struct io_stat_printer { + const Protocol &protocol; + }; + void print_io_stat(std::ostream &out) const { + out << "io_stat(" + << "in_seq=" << in_seq + << ", out_seq=" << out_seq + << ", out_q_size=" << out_q.size() + << ", sent_size=" << sent.size() + << ", need_ack=" << (ack_left > 0) + << ", need_keepalive=" << need_keepalive + << ", need_keepalive_ack=" << bool(keepalive_ack) + << ")"; + } + // TODO: encapsulate a SessionedSender class protected: // write_state is changed with state atomically, indicating the write @@ -129,21 +160,56 @@ class Protocol { void reset_write(); + void reset_read() { + in_seq = 0; + } + bool is_queued() const { - return (!conn.out_q.empty() || + return (!out_q.empty() || ack_left > 0 || need_keepalive || keepalive_ack.has_value()); } + bool is_queued_or_sent() const { + return is_queued() || !sent.empty(); + } + void ack_writes(seq_num_t seq); + + void set_last_keepalive(clock_t::time_point when) { + last_keepalive = when; + } + + seq_num_t get_in_seq() const { + return in_seq; + } + + void set_in_seq(seq_num_t _in_seq) { + in_seq = _in_seq; + } + + seq_num_t increment_out() { + return ++out_seq; + } + crimson::common::Gated gate; private: write_state_t write_state = write_state_t::none; + // wait until current state changed seastar::shared_promise<> state_changed; + /// the seq num of the last transmitted message + seq_num_t out_seq = 0; + + // messages to be resent after connection gets reset + std::deque out_q; + + // messages sent, but not yet acked by peer + std::deque sent; + bool need_keepalive = false; std::optional keepalive_ack = std::nullopt; uint64_t ack_left = 0; @@ -153,16 +219,28 @@ class Protocol { // it needs to wait for exit_open until writing is stopped or failed. std::optional> exit_open; + /// the seq num of the last received message + seq_num_t in_seq = 0; + + clock_t::time_point last_keepalive; + + clock_t::time_point last_keepalive_ack; + seastar::future try_exit_sweep(); seastar::future<> do_write_dispatch_sweep(); void write_event(); }; inline std::ostream& operator<<(std::ostream& out, const Protocol& proto) { - proto.print(out); + proto.print_conn(out); return out; } +inline std::ostream& operator<<( + std::ostream& out, Protocol::io_stat_printer stat) { + stat.protocol.print_io_stat(out); + return out; +} } // namespace crimson::net diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 11fdbbcd5c59..0e74ce827a59 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -379,8 +379,7 @@ void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr e conn, func_name, get_state_name(state), eptr); close(true); } else if (conn.policy.server || - (conn.policy.standby && - (!is_queued() && conn.sent.empty()))) { + (conn.policy.standby && !is_queued_or_sent())) { logger().info("{} {}: fault at {} with nothing to send, going to STANDBY -- {}", conn, func_name, get_state_name(state), eptr); execute_standby(); @@ -399,7 +398,7 @@ void ProtocolV2::reset_session(bool full) { server_cookie = 0; connect_seq = 0; - conn.in_seq = 0; + reset_read(); if (full) { client_cookie = generate_client_cookie(); peer_global_seq = 0; @@ -755,12 +754,12 @@ ProtocolV2::client_reconnect() server_cookie, global_seq, connect_seq, - conn.in_seq); + get_in_seq()); logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={}," - " server_cookie={}, gs={}, cs={}, msg_seq={}", + " server_cookie={}, gs={}, cs={}, in_seq={}", conn, messenger.get_myaddrs(), client_cookie, server_cookie, - global_seq, connect_seq, conn.in_seq); + global_seq, connect_seq, get_in_seq()); return write_frame(reconnect).then([this] { return read_main_preamble(); }).then([this] (Tag tag) { @@ -899,12 +898,11 @@ void ProtocolV2::execute_connecting() } switch (next) { case next_step_t::ready: { - logger().info("{} connected:" - " gs={}, pgs={}, cs={}, client_cookie={}," - " server_cookie={}, in_seq={}, out_seq={}, out_q={}", + logger().info("{} connected: gs={}, pgs={}, cs={}, " + "client_cookie={}, server_cookie={}, {}", conn, global_seq, peer_global_seq, connect_seq, - client_cookie, server_cookie, conn.in_seq, - conn.out_seq, conn.out_q.size()); + client_cookie, server_cookie, + io_stat_printer{*this}); execute_ready(true); break; } @@ -927,8 +925,7 @@ void ProtocolV2::execute_connecting() } if (conn.policy.server || - (conn.policy.standby && - (!is_queued() && conn.sent.empty()))) { + (conn.policy.standby && !is_queued_or_sent())) { logger().info("{} execute_connecting(): fault at {} with nothing to send," " going to STANDBY -- {}", conn, get_state_name(state), eptr); @@ -1607,11 +1604,11 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) { conn, get_state_name(state)); abort_protocol(); } - logger().info("{} established: gs={}, pgs={}, cs={}, client_cookie={}," - " server_cookie={}, in_seq={}, out_seq={}, out_q={}", + logger().info("{} established: gs={}, pgs={}, cs={}, " + "client_cookie={}, server_cookie={}, {}", conn, global_seq, peer_global_seq, connect_seq, - client_cookie, server_cookie, conn.in_seq, - conn.out_seq, conn.out_q.size()); + client_cookie, server_cookie, + io_stat_printer{*this}); execute_ready(false); }).handle_exception([this] (std::exception_ptr eptr) { if (state != state_t::ESTABLISHING) { @@ -1639,7 +1636,7 @@ ProtocolV2::send_server_ident() // this is required for the case when this connection is being replaced requeue_up_to(0); - conn.in_seq = 0; + reset_read(); if (!conn.policy.lossy) { server_cookie = ceph::util::generate_random_number(1, -1ll); @@ -1739,8 +1736,8 @@ void ProtocolV2::trigger_replacing(bool reconnect, connect_seq = new_connect_seq; // send_reconnect_ok() logic requeue_up_to(new_msg_seq); - auto reconnect_ok = ReconnectOkFrame::Encode(conn.in_seq); - logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, conn.in_seq); + auto reconnect_ok = ReconnectOkFrame::Encode(get_in_seq()); + logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, get_in_seq()); return write_frame(reconnect_ok); } else { client_cookie = new_client_cookie; @@ -1761,12 +1758,12 @@ void ProtocolV2::trigger_replacing(bool reconnect, conn, get_state_name(state)); abort_protocol(); } - logger().info("{} replaced ({}):" - " gs={}, pgs={}, cs={}, client_cookie={}, server_cookie={}," - " in_seq={}, out_seq={}, out_q={}", + logger().info("{} replaced ({}): gs={}, pgs={}, cs={}, " + "client_cookie={}, server_cookie={}, {}", conn, reconnect ? "reconnected" : "connected", - global_seq, peer_global_seq, connect_seq, client_cookie, - server_cookie, conn.in_seq, conn.out_seq, conn.out_q.size()); + global_seq, peer_global_seq, connect_seq, + client_cookie, server_cookie, + io_stat_printer{*this}); execute_ready(false); }).handle_exception([this] (std::exception_ptr eptr) { if (state != state_t::REPLACING) { @@ -1804,7 +1801,7 @@ ceph::bufferlist ProtocolV2::do_sweep_messages( } if (require_ack && num_msgs == 0u) { - auto ack_frame = AckFrame::Encode(conn.in_seq); + auto ack_frame = AckFrame::Encode(get_in_seq()); bl.append(ack_frame.get_buffer(tx_frame_asm)); INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE); } @@ -1817,7 +1814,7 @@ ceph::bufferlist ProtocolV2::do_sweep_messages( msg->encode(conn.features, 0); ceph_assert(!msg->get_seq() && "message already has seq"); - msg->set_seq(++conn.out_seq); + msg->set_seq(increment_out()); ceph_msg_header &header = msg->get_header(); ceph_msg_footer &footer = msg->get_footer(); @@ -1826,7 +1823,7 @@ ceph::bufferlist ProtocolV2::do_sweep_messages( header.type, header.priority, header.version, ceph_le32(0), header.data_off, - ceph_le64(conn.in_seq), + ceph_le64(get_in_seq()), footer.flags, header.compat_version, header.reserved}; @@ -1897,7 +1894,7 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp) // client side queueing because messages can't be renumbered, but the (kernel) // client will occasionally pull a message out of the sent queue to send // elsewhere. in that case it doesn't matter if we "got" it or not. - uint64_t cur_seq = conn.in_seq; + uint64_t cur_seq = get_in_seq(); if (message->get_seq() <= cur_seq) { logger().error("{} got old message {} <= {} {}, discarding", conn, message->get_seq(), cur_seq, *message); @@ -1915,7 +1912,7 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp) } // note last received message. - conn.in_seq = message->get_seq(); + set_in_seq(message->get_seq()); logger().debug("{} <== #{} === {} ({})", conn, message->get_seq(), *message, message->get_type()); notify_ack(); @@ -1990,16 +1987,17 @@ void ProtocolV2::execute_ready(bool dispatch_connect) logger().debug("{} GOT KeepAliveFrame: timestamp={}", conn, keepalive_frame.timestamp()); notify_keepalive_ack(keepalive_frame.timestamp()); - conn.set_last_keepalive(seastar::lowres_system_clock::now()); + set_last_keepalive(seastar::lowres_system_clock::now()); }); case Tag::KEEPALIVE2_ACK: return read_frame_payload().then([this] { // handle_keepalive2_ack() logic auto keepalive_ack_frame = KeepAliveFrameAck::Decode(rx_segments_data.back()); - conn.set_last_keepalive_ack( - seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()}); + auto _last_keepalive_ack = + seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()}; + set_last_keepalive_ack(_last_keepalive_ack); logger().debug("{} GOT KeepAliveFrameAck: timestamp={}", - conn, conn.last_keepalive_ack); + conn, _last_keepalive_ack); }); default: { unexpected_tag(tag, conn, "execute_ready"); @@ -2122,7 +2120,7 @@ void ProtocolV2::on_closed() conn.shared_from_this())); } -void ProtocolV2::print(std::ostream& out) const +void ProtocolV2::print_conn(std::ostream& out) const { out << conn; } diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index d86d4b5721bc..b580110834f9 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -20,7 +20,8 @@ class ProtocolV2 final : public Protocol { SocketConnection& conn, SocketMessenger& messenger); ~ProtocolV2() override; - void print(std::ostream&) const final; + void print_conn(std::ostream&) const final; + private: void on_closed() override; bool is_connected() const override; @@ -144,7 +145,7 @@ class ProtocolV2 final : public Protocol { private: void fault(bool backoff, const char* func_name, std::exception_ptr eptr); - void reset_session(bool full); + void reset_session(bool is_full); seastar::future> banner_exchange(bool is_connect); diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index 5ba2ea5c5666..e5419125ae97 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -84,29 +84,27 @@ seastar::future<> SocketConnection::keepalive() }); } -void SocketConnection::mark_down() +SocketConnection::clock_t::time_point +SocketConnection::get_last_keepalive() const { - assert(seastar::this_shard_id() == shard_id()); - protocol->close(false); + return protocol->get_last_keepalive(); } -bool SocketConnection::update_rx_seq(seq_num_t seq) +SocketConnection::clock_t::time_point +SocketConnection::get_last_keepalive_ack() const { - if (seq <= in_seq) { - if (HAVE_FEATURE(features, RECONNECT_SEQ) && - local_conf()->ms_die_on_old_message) { - ceph_abort_msg("old msgs despite reconnect_seq feature"); - } - return false; - } else if (seq > in_seq + 1) { - if (local_conf()->ms_die_on_skipped_message) { - ceph_abort_msg("skipped incoming seq"); - } - return false; - } else { - in_seq = seq; - return true; - } + return protocol->get_last_keepalive_ack(); +} + +void SocketConnection::set_last_keepalive_ack(clock_t::time_point when) +{ + protocol->set_last_keepalive_ack(when); +} + +void SocketConnection::mark_down() +{ + assert(seastar::this_shard_id() == shard_id()); + protocol->close(false); } void diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index 5a698919f28f..5e928de79db5 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -47,24 +47,10 @@ class SocketConnection : public Connection { // or should reconnect to (as peer) entity_addr_t target_addr; - clock_t::time_point last_keepalive; - - clock_t::time_point last_keepalive_ack; - uint64_t features = 0; ceph::net::Policy policy; - /// the seq num of the last transmitted message - seq_num_t out_seq = 0; - /// the seq num of the last received message - seq_num_t in_seq = 0; - - // messages to be resent after connection gets reset - std::deque out_q; - // messages sent, but not yet acked by peer - std::deque sent; - uint64_t peer_global_id = 0; std::unique_ptr user_private; @@ -98,17 +84,11 @@ class SocketConnection : public Connection { seastar::future<> keepalive() override; - clock_t::time_point get_last_keepalive() const override { - return last_keepalive; - } + clock_t::time_point get_last_keepalive() const override; - clock_t::time_point get_last_keepalive_ack() const override { - return last_keepalive_ack; - } + clock_t::time_point get_last_keepalive_ack() const override; - void set_last_keepalive_ack(clock_t::time_point when) override { - last_keepalive_ack = when; - } + void set_last_keepalive_ack(clock_t::time_point when) override; void mark_down() override; @@ -151,11 +131,6 @@ class SocketConnection : public Connection { private: seastar::shard_id shard_id() const; - /// update the seq num of last received message - /// @returns true if the @c seq is valid, and @c in_seq is updated, - /// false otherwise. - bool update_rx_seq(seq_num_t seq); - void set_peer_type(entity_type_t peer_type) { // it is not allowed to assign an unknown value when the current // value is known @@ -187,10 +162,6 @@ private: set_peer_id(name.num()); } - void set_last_keepalive(clock_t::time_point when) { - last_keepalive = when; - } - void set_features(uint64_t f) { features = f; }