From 6cacf1f7b25718b8d985d014eadcbcf3ce47aaec Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Thu, 8 Aug 2019 15:56:17 +0800 Subject: [PATCH] crimson/net: maintain the sent queue for lossless policy Signed-off-by: Yingxin Cheng --- src/crimson/net/Protocol.cc | 59 +++++++++++++++++++++++++++++ src/crimson/net/Protocol.h | 6 +++ src/crimson/net/ProtocolV1.cc | 12 +++--- src/crimson/net/ProtocolV2.cc | 18 +++------ src/crimson/net/ProtocolV2.h | 1 - src/crimson/net/SocketConnection.cc | 10 ----- src/crimson/net/SocketConnection.h | 11 ------ 7 files changed, 76 insertions(+), 41 deletions(-) diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc index 4af213471c6..b8dd99d688e 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/Protocol.cc @@ -96,6 +96,48 @@ void Protocol::notify_keepalive_ack(utime_t _keepalive_ack) write_event(); } +void Protocol::requeue_sent() +{ + assert(write_state != write_state_t::open); + if (conn.sent.empty()) { + return; + } + + conn.out_seq -= conn.sent.size(); + logger().debug("{} requeue {} items, revert out_seq to {}", + conn, conn.sent.size(), conn.out_seq); + for (MessageRef& msg : conn.sent) { + msg->clear_payload(); + msg->set_seq(0); + } + conn.out_q.insert(conn.out_q.begin(), + std::make_move_iterator(conn.sent.begin()), + std::make_move_iterator(conn.sent.end())); + conn.sent.clear(); +} + +void Protocol::requeue_up_to(seq_num_t seq) +{ + assert(write_state != write_state_t::open); + if (conn.sent.empty() && conn.out_q.empty()) { + logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}", + conn, conn.out_seq, seq); + conn.out_seq = seq; + return; + } + logger().debug("{} discarding sent items by seq {} (sent_len={}, out_seq={})", + conn, seq, conn.sent.size(), conn.out_seq); + while (!conn.sent.empty()) { + auto cur_seq = conn.sent.front()->get_seq(); + if (cur_seq == 0 || cur_seq > seq) { + break; + } else { + conn.sent.pop_front(); + } + } + requeue_sent(); +} + void Protocol::reset_write() { assert(write_state != write_state_t::open); @@ -106,6 +148,18 @@ void Protocol::reset_write() keepalive_ack = std::nullopt; } +void Protocol::ack_writes(seq_num_t seq) +{ + if (conn.policy.lossy) { // lossy connections don't keep sent messages + return; + } + while (!conn.sent.empty() && conn.sent.front()->get_seq() <= seq) { + logger().trace("{} got ack seq {} >= {}, pop {}", + conn, seq, conn.sent.front()->get_seq(), conn.sent.front()); + conn.sent.pop_front(); + } +} + seastar::future Protocol::do_write_dispatch_sweep() { switch (write_state) { @@ -118,6 +172,11 @@ seastar::future Protocol::do_write_dispatch_sweep() conn.pending_q.clear(); conn.pending_q.swap(conn.out_q); + if (!conn.policy.lossy) { + conn.sent.insert(conn.sent.end(), + conn.pending_q.begin(), + conn.pending_q.end()); + } // sweep all pending writes with the concrete Protocol return socket->write(do_sweep_messages( conn.pending_q, num_msgs, need_keepalive, keepalive_ack)) diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h index a41d1f20156..5f8a8d4b89b 100644 --- a/src/crimson/net/Protocol.h +++ b/src/crimson/net/Protocol.h @@ -102,6 +102,10 @@ class Protocol { void notify_keepalive_ack(utime_t keepalive_ack); + void requeue_up_to(seq_num_t seq); + + void requeue_sent(); + void reset_write(); bool is_queued() const { @@ -110,6 +114,8 @@ class Protocol { keepalive_ack.has_value()); } + void ack_writes(seq_num_t seq); + private: write_state_t write_state = write_state_t::none; // wait until current state changed diff --git a/src/crimson/net/ProtocolV1.cc b/src/crimson/net/ProtocolV1.cc index 55c8d0f7978..2de312faee5 100644 --- a/src/crimson/net/ProtocolV1.cc +++ b/src/crimson/net/ProtocolV1.cc @@ -459,12 +459,12 @@ seastar::future ProtocolV1::replace_existing( reply_tag = CEPH_MSGR_TAG_READY; } if (!existing->is_lossy()) { - // reset the in_seq if this is a hard reset from peer, - // otherwise we respect our original connection's value - conn.in_seq = is_reset_from_peer ? 0 : existing->rx_seq_num(); - // steal outgoing queue and out_seq - existing->requeue_sent(); - std::tie(conn.out_seq, conn.out_q) = existing->get_out_queue(); + // XXX: we decided not to support lossless connection in v1. as the + // client's default policy is + // Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX) which is + // lossy. And by the time + // will all be performed using v2 protocol. + ceph_abort("lossless policy not supported for v1"); } seastar::do_with( std::move(existing), diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 4b3506f3ba2..7dc055a3dfd 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -693,6 +693,7 @@ ProtocolV2::client_connect() case Tag::SERVER_IDENT: return read_frame_payload().then([this] { // handle_server_ident() logic + requeue_sent(); auto server_ident = ServerIdentFrame::Decode(rx_segments_data.back()); logger().debug("{} GOT ServerIdentFrame:" " addrs={}, gid={}, gs={}," @@ -815,8 +816,8 @@ ProtocolV2::client_reconnect() auto reconnect_ok = ReconnectOkFrame::Decode(rx_segments_data.back()); logger().debug("{} GOT ReconnectOkFrame: msg_seq={}", conn, reconnect_ok.msg_seq()); + requeue_up_to(reconnect_ok.msg_seq()); // TODO - // discard_requeued_up_to() // backoff = utime_t(); return dispatcher.ms_handle_connect( seastar::static_pointer_cast( @@ -1476,8 +1477,7 @@ ProtocolV2::send_server_ident() logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq); // this is required for the case when this connection is being replaced - // TODO - // out_seq = discard_requeued_up_to(out_seq, 0); + requeue_up_to(0); conn.in_seq = 0; if (!conn.policy.lossy) { @@ -1587,14 +1587,6 @@ ceph::bufferlist ProtocolV2::do_sweep_messages( return bl; } -void ProtocolV2::handle_message_ack(seq_num_t seq) { - if (conn.policy.lossy) { // lossy connections don't keep sent messages - return; - } - - // TODO: lossless policy -} - seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp) { return read_frame_payload() @@ -1672,7 +1664,7 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp) if (!conn.policy.lossy) { // ++ack_left; } - handle_message_ack(current_header.ack_seq); + ack_writes(current_header.ack_seq); // TODO: change MessageRef with seastar::shared_ptr auto msg_ref = MessageRef{message, false}; @@ -1728,7 +1720,7 @@ void ProtocolV2::execute_ready() // handle_message_ack() logic auto ack = AckFrame::Decode(rx_segments_data.back()); logger().debug("{} GOT AckFrame: seq={}", ack.seq()); - handle_message_ack(ack.seq()); + ack_writes(ack.seq()); }); case Tag::KEEPALIVE2: return read_frame_payload().then([this] { diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index 79907ac93f2..5c48fc71927 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -149,7 +149,6 @@ class ProtocolV2 final : public Protocol { // READY seastar::future<> read_message(utime_t throttle_stamp); - void handle_message_ack(seq_num_t seq); void execute_ready(); // STANDBY diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index da5423db1ed..bb66a3e524a 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -71,16 +71,6 @@ seastar::future<> SocketConnection::close() }); } -void SocketConnection::requeue_sent() -{ - out_seq -= sent.size(); - while (!sent.empty()) { - auto m = sent.front(); - sent.pop_front(); - out_q.push_back(std::move(m)); - } -} - bool SocketConnection::update_rx_seq(seq_num_t seq) { if (seq <= in_seq) { diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index 369f042ed99..a8c4942ed0e 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -101,10 +101,6 @@ class SocketConnection : public Connection { void start_accept(SocketFRef&& socket, const entity_addr_t& peer_addr); - seq_num_t rx_seq_num() const { - return in_seq; - } - bool is_server_side() const { return policy.server; } @@ -113,13 +109,6 @@ class SocketConnection : public Connection { return policy.lossy; } - /// move all messages in the sent list back into the queue - void requeue_sent(); - - std::tuple> get_out_queue() { - return {out_seq, std::move(out_q)}; - } - friend class Protocol; friend class ProtocolV1; friend class ProtocolV2; -- 2.39.5