]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: maintain the sent queue for lossless policy
authorYingxin Cheng <yingxin.cheng@intel.com>
Thu, 8 Aug 2019 07:56:17 +0000 (15:56 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Mon, 12 Aug 2019 09:17:43 +0000 (17:17 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/Protocol.cc
src/crimson/net/Protocol.h
src/crimson/net/ProtocolV1.cc
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h

index 4af213471c6b811182f86e57b9c9a397d2e4a903..b8dd99d688e2794717740559d734617f6e343798 100644 (file)
@@ -96,6 +96,48 @@ void Protocol::notify_keepalive_ack(utime_t _keepalive_ack)
   write_event();
 }
 
+void Protocol::requeue_sent()
+{
+  assert(write_state != write_state_t::open);
+  if (conn.sent.empty()) {
+    return;
+  }
+
+  conn.out_seq -= conn.sent.size();
+  logger().debug("{} requeue {} items, revert out_seq to {}",
+                 conn, conn.sent.size(), conn.out_seq);
+  for (MessageRef& msg : conn.sent) {
+    msg->clear_payload();
+    msg->set_seq(0);
+  }
+  conn.out_q.insert(conn.out_q.begin(),
+                    std::make_move_iterator(conn.sent.begin()),
+                    std::make_move_iterator(conn.sent.end()));
+  conn.sent.clear();
+}
+
+void Protocol::requeue_up_to(seq_num_t seq)
+{
+  assert(write_state != write_state_t::open);
+  if (conn.sent.empty() && conn.out_q.empty()) {
+    logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}",
+                   conn, conn.out_seq, seq);
+    conn.out_seq = seq;
+    return;
+  }
+  logger().debug("{} discarding sent items by seq {} (sent_len={}, out_seq={})",
+                 conn, seq, conn.sent.size(), conn.out_seq);
+  while (!conn.sent.empty()) {
+    auto cur_seq = conn.sent.front()->get_seq();
+    if (cur_seq == 0 || cur_seq > seq) {
+      break;
+    } else {
+      conn.sent.pop_front();
+    }
+  }
+  requeue_sent();
+}
+
 void Protocol::reset_write()
 {
   assert(write_state != write_state_t::open);
@@ -106,6 +148,18 @@ void Protocol::reset_write()
   keepalive_ack = std::nullopt;
 }
 
+void Protocol::ack_writes(seq_num_t seq)
+{
+  if (conn.policy.lossy) {  // lossy connections don't keep sent messages
+    return;
+  }
+  while (!conn.sent.empty() && conn.sent.front()->get_seq() <= seq) {
+    logger().trace("{} got ack seq {} >= {}, pop {}",
+                   conn, seq, conn.sent.front()->get_seq(), conn.sent.front());
+    conn.sent.pop_front();
+  }
+}
+
 seastar::future<stop_t> Protocol::do_write_dispatch_sweep()
 {
   switch (write_state) {
@@ -118,6 +172,11 @@ seastar::future<stop_t> Protocol::do_write_dispatch_sweep()
 
     conn.pending_q.clear();
     conn.pending_q.swap(conn.out_q);
+    if (!conn.policy.lossy) {
+      conn.sent.insert(conn.sent.end(),
+                       conn.pending_q.begin(),
+                       conn.pending_q.end());
+    }
     // sweep all pending writes with the concrete Protocol
     return socket->write(do_sweep_messages(
         conn.pending_q, num_msgs, need_keepalive, keepalive_ack))
index a41d1f20156b5203e7652fa0e75378292168e345..5f8a8d4b89b3fa3cee654c26d37b9a47c8ae15dc 100644 (file)
@@ -102,6 +102,10 @@ class Protocol {
 
   void notify_keepalive_ack(utime_t keepalive_ack);
 
+  void requeue_up_to(seq_num_t seq);
+
+  void requeue_sent();
+
   void reset_write();
 
   bool is_queued() const {
@@ -110,6 +114,8 @@ class Protocol {
             keepalive_ack.has_value());
   }
 
+  void ack_writes(seq_num_t seq);
+
  private:
   write_state_t write_state = write_state_t::none;
   // wait until current state changed
index 55c8d0f7978d9746b0959acfa82ef7079b57e400..2de312faee5bef314e9fa7206afc82e1025896da 100644 (file)
@@ -459,12 +459,12 @@ seastar::future<stop_t> ProtocolV1::replace_existing(
     reply_tag = CEPH_MSGR_TAG_READY;
   }
   if (!existing->is_lossy()) {
-    // reset the in_seq if this is a hard reset from peer,
-    // otherwise we respect our original connection's value
-    conn.in_seq = is_reset_from_peer ? 0 : existing->rx_seq_num();
-    // steal outgoing queue and out_seq
-    existing->requeue_sent();
-    std::tie(conn.out_seq, conn.out_q) = existing->get_out_queue();
+    // XXX: we decided not to support lossless connection in v1. as the
+    // client's default policy is
+    // Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX) which is
+    // lossy. And by the time
+    // will all be performed using v2 protocol.
+    ceph_abort("lossless policy not supported for v1");
   }
   seastar::do_with(
     std::move(existing),
index 4b3506f3ba248dc93d07584c956084e5c68f0849..7dc055a3dfdcb608033874d17bb536fcf16f88b3 100644 (file)
@@ -693,6 +693,7 @@ ProtocolV2::client_connect()
       case Tag::SERVER_IDENT:
         return read_frame_payload().then([this] {
           // handle_server_ident() logic
+          requeue_sent();
           auto server_ident = ServerIdentFrame::Decode(rx_segments_data.back());
           logger().debug("{} GOT ServerIdentFrame:"
                          " addrs={}, gid={}, gs={},"
@@ -815,8 +816,8 @@ ProtocolV2::client_reconnect()
           auto reconnect_ok = ReconnectOkFrame::Decode(rx_segments_data.back());
           logger().debug("{} GOT ReconnectOkFrame: msg_seq={}",
                          conn, reconnect_ok.msg_seq());
+          requeue_up_to(reconnect_ok.msg_seq());
           // TODO
-          // discard_requeued_up_to()
           // backoff = utime_t();
           return dispatcher.ms_handle_connect(
               seastar::static_pointer_cast<SocketConnection>(
@@ -1476,8 +1477,7 @@ ProtocolV2::send_server_ident()
     logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq);
 
     // this is required for the case when this connection is being replaced
-    // TODO
-    // out_seq = discard_requeued_up_to(out_seq, 0);
+    requeue_up_to(0);
     conn.in_seq = 0;
 
     if (!conn.policy.lossy) {
@@ -1587,14 +1587,6 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
   return bl;
 }
 
-void ProtocolV2::handle_message_ack(seq_num_t seq) {
-  if (conn.policy.lossy) {  // lossy connections don't keep sent messages
-    return;
-  }
-
-  // TODO: lossless policy
-}
-
 seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
 {
   return read_frame_payload()
@@ -1672,7 +1664,7 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
     if (!conn.policy.lossy) {
       // ++ack_left;
     }
-    handle_message_ack(current_header.ack_seq);
+    ack_writes(current_header.ack_seq);
 
     // TODO: change MessageRef with seastar::shared_ptr
     auto msg_ref = MessageRef{message, false};
@@ -1728,7 +1720,7 @@ void ProtocolV2::execute_ready()
               // handle_message_ack() logic
               auto ack = AckFrame::Decode(rx_segments_data.back());
               logger().debug("{} GOT AckFrame: seq={}", ack.seq());
-              handle_message_ack(ack.seq());
+              ack_writes(ack.seq());
             });
           case Tag::KEEPALIVE2:
             return read_frame_payload().then([this] {
index 79907ac93f241762bf9bbac4ad1b272923f7e0f2..5c48fc71927850c60dc635396a5f8fc341ffef05 100644 (file)
@@ -149,7 +149,6 @@ class ProtocolV2 final : public Protocol {
 
   // READY
   seastar::future<> read_message(utime_t throttle_stamp);
-  void handle_message_ack(seq_num_t seq);
   void execute_ready();
 
   // STANDBY
index da5423db1edea5b364d06ff04ef4977fe9376752..bb66a3e524a1d05c142786f1dda3aa071138174e 100644 (file)
@@ -71,16 +71,6 @@ seastar::future<> SocketConnection::close()
     });
 }
 
-void SocketConnection::requeue_sent()
-{
-  out_seq -= sent.size();
-  while (!sent.empty()) {
-    auto m = sent.front();
-    sent.pop_front();
-    out_q.push_back(std::move(m));
-  }
-}
-
 bool SocketConnection::update_rx_seq(seq_num_t seq)
 {
   if (seq <= in_seq) {
index 369f042ed99b5edbee6821326b763683d6982f84..a8c4942ed0e626e996b9c5f7dbd6e24d47cda8fe 100644 (file)
@@ -101,10 +101,6 @@ class SocketConnection : public Connection {
   void start_accept(SocketFRef&& socket,
                     const entity_addr_t& peer_addr);
 
-  seq_num_t rx_seq_num() const {
-    return in_seq;
-  }
-
   bool is_server_side() const {
     return policy.server;
   }
@@ -113,13 +109,6 @@ class SocketConnection : public Connection {
     return policy.lossy;
   }
 
-  /// move all messages in the sent list back into the queue
-  void requeue_sent();
-
-  std::tuple<seq_num_t, std::deque<MessageRef>> get_out_queue() {
-    return {out_seq, std::move(out_q)};
-  }
-
   friend class Protocol;
   friend class ProtocolV1;
   friend class ProtocolV2;