]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/net: pending_q to store the pending(sending) messages
authorYingxin Cheng <yingxin.cheng@intel.com>
Fri, 9 Aug 2019 09:26:09 +0000 (17:26 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Mon, 12 Aug 2019 09:02:52 +0000 (17:02 +0800)
We cannot left the pending messages in the out_q, because with lossless
policy, they can be partially sent and even acknowledged.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/Protocol.cc
src/crimson/net/SocketConnection.h

index bb0657f50475f21732dd9066cd483785a07f160e..81d7992641002dd482b2031a4adee60de1f57d48 100644 (file)
@@ -106,23 +106,16 @@ seastar::future<stop_t> Protocol::do_write_dispatch_sweep()
     assert(!open_write);
     open_write = true;
 
-    MessageRef front_msg;
-    if (likely(num_msgs)) {
-      front_msg = conn.out_q.front();
-    }
+    conn.pending_q.clear();
+    conn.pending_q.swap(conn.out_q);
     // sweep all pending writes with the concrete Protocol
     return socket->write(do_sweep_messages(
-        conn.out_q, num_msgs, need_keepalive, keepalive_ack))
-    .then([this, front_msg, num_msgs, prv_keepalive_ack=keepalive_ack] {
+        conn.pending_q, num_msgs, need_keepalive, keepalive_ack))
+    .then([this, prv_keepalive_ack=keepalive_ack] {
       need_keepalive = false;
       if (keepalive_ack == prv_keepalive_ack) {
         keepalive_ack = std::nullopt;
       }
-      if (likely(num_msgs && front_msg == conn.out_q.front())) {
-        // we have sent some messages successfully
-        // and the out_q was not reset during socket write
-        conn.out_q.erase(conn.out_q.begin(), conn.out_q.begin()+num_msgs);
-      }
       if (!is_queued()) {
         // good, we have nothing pending to send now.
         return socket->flush().then([this] {
index fc49cfee6ecd2348b9b7ae71b64cf6796368b304..369f042ed99b5edbee6821326b763683d6982f84 100644 (file)
@@ -63,6 +63,7 @@ class SocketConnection : public Connection {
 
   // messages to be resent after connection gets reset
   std::deque<MessageRef> out_q;
+  std::deque<MessageRef> pending_q;
   // messages sent, but not yet acked by peer
   std::deque<MessageRef> sent;