]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: send AckFrame for lossless policy
authorYingxin Cheng <yingxin.cheng@intel.com>
Thu, 8 Aug 2019 08:10:54 +0000 (16:10 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Mon, 12 Aug 2019 09:18:15 +0000 (17:18 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/Protocol.cc
src/crimson/net/Protocol.h
src/crimson/net/ProtocolV1.cc
src/crimson/net/ProtocolV1.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h

index b8dd99d688e2794717740559d734617f6e343798..2a70b9f4b3ecf686db596d43435968948976ef79 100644 (file)
@@ -96,6 +96,14 @@ void Protocol::notify_keepalive_ack(utime_t _keepalive_ack)
   write_event();
 }
 
+void Protocol::notify_ack()
+{
+  if (!conn.policy.lossy) {
+    ++ack_left;
+    write_event();
+  }
+}
+
 void Protocol::requeue_sent()
 {
   assert(write_state != write_state_t::open);
@@ -146,6 +154,7 @@ void Protocol::reset_write()
   conn.sent.clear();
   need_keepalive = false;
   keepalive_ack = std::nullopt;
+  ack_left = 0;
 }
 
 void Protocol::ack_writes(seq_num_t seq)
@@ -177,14 +186,18 @@ seastar::future<stop_t> Protocol::do_write_dispatch_sweep()
                        conn.pending_q.begin(),
                        conn.pending_q.end());
     }
+    auto acked = ack_left;
+    assert(acked == 0 || conn.in_seq > 0);
     // sweep all pending writes with the concrete Protocol
     return socket->write(do_sweep_messages(
-        conn.pending_q, num_msgs, need_keepalive, keepalive_ack))
-    .then([this, prv_keepalive_ack=keepalive_ack] {
+        conn.pending_q, num_msgs, need_keepalive, keepalive_ack, acked > 0)
+    ).then([this, prv_keepalive_ack=keepalive_ack, acked] {
       need_keepalive = false;
       if (keepalive_ack == prv_keepalive_ack) {
         keepalive_ack = std::nullopt;
       }
+      assert(ack_left >= acked);
+      ack_left -= acked;
       if (!is_queued()) {
         // good, we have nothing pending to send now.
         return socket->flush().then([this] {
index 5f8a8d4b89b3fa3cee654c26d37b9a47c8ae15dc..c3d127083d82e0bd51b9aeb55011eb830b703686 100644 (file)
@@ -44,7 +44,8 @@ class Protocol {
       const std::deque<MessageRef>& msgs,
       size_t num_msgs,
       bool require_keepalive,
-      std::optional<utime_t> keepalive_ack) = 0;
+      std::optional<utime_t> keepalive_ack,
+      bool require_ack) = 0;
 
  public:
   const proto_t proto_type;
@@ -102,6 +103,8 @@ class Protocol {
 
   void notify_keepalive_ack(utime_t keepalive_ack);
 
+  void notify_ack();
+
   void requeue_up_to(seq_num_t seq);
 
   void requeue_sent();
@@ -110,6 +113,7 @@ class Protocol {
 
   bool is_queued() const {
     return (!conn.out_q.empty() ||
+            ack_left > 0 ||
             need_keepalive ||
             keepalive_ack.has_value());
   }
@@ -123,6 +127,7 @@ class Protocol {
 
   bool need_keepalive = false;
   std::optional<utime_t> keepalive_ack = std::nullopt;
+  uint64_t ack_left = 0;
   bool write_dispatching = false;
   // Indicate if we are in the middle of writing.
   bool open_write = false;
index 2de312faee5bef314e9fa7206afc82e1025896da..9e2ba07c9b5ecfb9fe64e50bda85c7bdcfe34645 100644 (file)
@@ -680,7 +680,8 @@ ceph::bufferlist ProtocolV1::do_sweep_messages(
     const std::deque<MessageRef>& msgs,
     size_t num_msgs,
     bool require_keepalive,
-    std::optional<utime_t> _keepalive_ack)
+    std::optional<utime_t> _keepalive_ack,
+    bool require_ack)
 {
   static const size_t RESERVE_MSG_SIZE = sizeof(CEPH_MSGR_TAG_MSG) +
                                          sizeof(ceph_msg_header) +
@@ -711,6 +712,15 @@ ceph::bufferlist ProtocolV1::do_sweep_messages(
     bl.append(create_static(k.ack));
   }
 
+  if (require_ack) {
+    // XXX: we decided not to support lossless connection in v1. as the
+    // client's default policy is
+    // Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX) which is
+    // lossy. And by the time of crimson-osd's GA, the in-cluster communication
+    // will all be performed using v2 protocol.
+    ceph_abort("lossless policy not supported for v1");
+  }
+
   std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) {
     ceph_assert(!msg->get_seq() && "message already has seq");
     msg->set_seq(++conn.out_seq);
index 53539ca0f7f80312bc97cc9e385af388ff9907dd..834016e5ec723fb36c6a4652edaea773700c9c4c 100644 (file)
@@ -30,7 +30,8 @@ class ProtocolV1 final : public Protocol {
       const std::deque<MessageRef>& msgs,
       size_t num_msgs,
       bool require_keepalive,
-      std::optional<utime_t> keepalive_ack) override;
+      std::optional<utime_t> keepalive_ack,
+      bool require_ack) override;
 
  private:
   SocketMessenger &messenger;
index 7dc055a3dfdcb608033874d17bb536fcf16f88b3..76e0f150889445ad60e8e0e04f26f1e43fa8e545 100644 (file)
@@ -1540,7 +1540,8 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
     const std::deque<MessageRef>& msgs,
     size_t num_msgs,
     bool require_keepalive,
-    std::optional<utime_t> _keepalive_ack)
+    std::optional<utime_t> _keepalive_ack,
+    bool require_ack)
 {
   ceph::bufferlist bl;
 
@@ -1554,6 +1555,11 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
     bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
   }
 
+  if (require_ack && !num_msgs) {
+    auto ack_frame = AckFrame::Encode(conn.in_seq);
+    bl.append(ack_frame.get_buffer(session_stream_handlers));
+  }
+
   std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) {
     // TODO: move to common code
     // set priority
@@ -1563,8 +1569,6 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
 
     ceph_assert(!msg->get_seq() && "message already has seq");
     msg->set_seq(++conn.out_seq);
-    uint64_t ack_seq = conn.in_seq;
-    // ack_left = 0;
 
     ceph_msg_header &header = msg->get_header();
     ceph_msg_footer &footer = msg->get_footer();
@@ -1573,7 +1577,7 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
                              header.type,       header.priority,
                              header.version,
                              0,                 header.data_off,
-                             ack_seq,
+                             conn.in_seq,
                              footer.flags,      header.compat_version,
                              header.reserved};
 
@@ -1661,9 +1665,7 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
     conn.in_seq = message->get_seq();
     logger().debug("{} <== #{} === {} ({})",
                   conn, message->get_seq(), *message, message->get_type());
-    if (!conn.policy.lossy) {
-      // ++ack_left;
-    }
+    notify_ack();
     ack_writes(current_header.ack_seq);
 
     // TODO: change MessageRef with seastar::shared_ptr
index 5c48fc71927850c60dc635396a5f8fc341ffef05..3bd31af724896f08b43343b0ad79655822434ee8 100644 (file)
@@ -29,7 +29,8 @@ class ProtocolV2 final : public Protocol {
       const std::deque<MessageRef>& msgs,
       size_t num_msgs,
       bool require_keepalive,
-      std::optional<utime_t> keepalive_ack) override;
+      std::optional<utime_t> keepalive_ack,
+      bool require_ack) override;
 
  private:
   SocketMessenger &messenger;