From b7c7dc0b26fe8e5b24037d10f87c943b7b3f4c9c Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Fri, 9 Aug 2019 17:26:09 +0800 Subject: [PATCH] crimson/net: pending_q to store the pending(sending) messages We cannot left the pending messages in the out_q, because with lossless policy, they can be partially sent and even acknowledged. Signed-off-by: Yingxin Cheng --- src/crimson/net/Protocol.cc | 15 ++++----------- src/crimson/net/SocketConnection.h | 1 + 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc index bb0657f5047..81d79926410 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/Protocol.cc @@ -106,23 +106,16 @@ seastar::future Protocol::do_write_dispatch_sweep() assert(!open_write); open_write = true; - MessageRef front_msg; - if (likely(num_msgs)) { - front_msg = conn.out_q.front(); - } + conn.pending_q.clear(); + conn.pending_q.swap(conn.out_q); // sweep all pending writes with the concrete Protocol return socket->write(do_sweep_messages( - conn.out_q, num_msgs, need_keepalive, keepalive_ack)) - .then([this, front_msg, num_msgs, prv_keepalive_ack=keepalive_ack] { + conn.pending_q, num_msgs, need_keepalive, keepalive_ack)) + .then([this, prv_keepalive_ack=keepalive_ack] { need_keepalive = false; if (keepalive_ack == prv_keepalive_ack) { keepalive_ack = std::nullopt; } - if (likely(num_msgs && front_msg == conn.out_q.front())) { - // we have sent some messages successfully - // and the out_q was not reset during socket write - conn.out_q.erase(conn.out_q.begin(), conn.out_q.begin()+num_msgs); - } if (!is_queued()) { // good, we have nothing pending to send now. return socket->flush().then([this] { diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index fc49cfee6ec..369f042ed99 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -63,6 +63,7 @@ class SocketConnection : public Connection { // messages to be resent after connection gets reset std::deque out_q; + std::deque pending_q; // messages sent, but not yet acked by peer std::deque sent; -- 2.39.5