crimson/net: use optional keepalive_ack to notify writes
authorYingxin Cheng <yingxincheng@gmail.com>
Thu, 25 Apr 2019 21:50:19 +0000 (05:50 +0800)
committerYingxin Cheng <yingxincheng@gmail.com>
Mon, 29 Apr 2019 07:58:03 +0000 (15:58 +0800)
Signed-off-by: Yingxin Cheng <yingxincheng@gmail.com>
src/crimson/net/Protocol.cc
src/crimson/net/Protocol.h
src/crimson/net/ProtocolV1.cc
src/crimson/net/ProtocolV1.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h

index 3e8538fa06e1df99b1a53341d1560442dbe6f5fb..3131f568324d8402661e6aae96b289b07fc9380b 100644 (file)
@@ -87,12 +87,11 @@ seastar::future<> Protocol::keepalive()
   return seastar::now();
 }
 
-void Protocol::notify_keepalive_ack()
+void Protocol::notify_keepalive_ack(utime_t _keepalive_ack)
 {
-  if (!need_keepalive_ack) {
-    need_keepalive_ack = true;
-    write_event();
-  }
+  logger().debug("{} got keepalive ack {}", conn, _keepalive_ack);
+  keepalive_ack = _keepalive_ack;
+  write_event();
 }
 
 seastar::future<stop_t> Protocol::do_write_dispatch_sweep()
@@ -101,26 +100,28 @@ seastar::future<stop_t> Protocol::do_write_dispatch_sweep()
    case write_state_t::open: {
     size_t num_msgs = conn.out_q.size();
     // we must have something to write...
-    ceph_assert(num_msgs || need_keepalive || need_keepalive_ack);
+    ceph_assert(num_msgs || need_keepalive || keepalive_ack.has_value());
     Message* msg_ptr = nullptr;
     if (likely(num_msgs)) {
       msg_ptr = conn.out_q.front().get();
     }
     // sweep all pending writes with the concrete Protocol
     return socket->write(do_sweep_messages(
-        conn.out_q, num_msgs, need_keepalive, need_keepalive_ack))
-    .then([this, msg_ptr, num_msgs] {
+        conn.out_q, num_msgs, need_keepalive, keepalive_ack))
+    .then([this, msg_ptr, num_msgs, prv_keepalive_ack=keepalive_ack] {
       need_keepalive = false;
-      need_keepalive_ack = false;
+      if (keepalive_ack == prv_keepalive_ack) {
+        keepalive_ack = std::nullopt;
+      }
       if (likely(num_msgs && msg_ptr == conn.out_q.front().get())) {
         // we have sent some messages successfully
         // and the out_q was not reset during socket write
         conn.out_q.erase(conn.out_q.begin(), conn.out_q.begin()+num_msgs);
       }
-      if (conn.out_q.empty()) {
+      if (conn.out_q.empty() && !keepalive_ack.has_value()) {
         // good, we have nothing pending to send now.
         return socket->flush().then([this] {
-          if (conn.out_q.empty() && !need_keepalive && !need_keepalive_ack) {
+          if (conn.out_q.empty() && !need_keepalive && !keepalive_ack.has_value()) {
             // still nothing pending to send after flush,
             // the dispatching can ONLY stop now
             ceph_assert(write_dispatching);
index 78ffcb6f0df9126d4dfba0605a67c836eee92a78..5da963753509d83764f576e288497e9c2cbb65e0 100644 (file)
@@ -48,7 +48,7 @@ class Protocol {
       const std::deque<MessageRef>& msgs,
       size_t num_msgs,
       bool require_keepalive,
-      bool require_keepalive_ack) = 0;
+      std::optional<utime_t> keepalive_ack) = 0;
 
  public:
   const proto_t proto_type;
@@ -75,7 +75,7 @@ class Protocol {
     state_changed = seastar::shared_promise<>();
   }
 
-  void notify_keepalive_ack();
+  void notify_keepalive_ack(utime_t keepalive_ack);
 
  private:
   write_state_t write_state = write_state_t::none;
@@ -87,7 +87,7 @@ class Protocol {
   seastar::shared_future<> close_ready;
 
   bool need_keepalive = false;
-  bool need_keepalive_ack = false;
+  std::optional<utime_t> keepalive_ack = std::nullopt;
   bool write_dispatching = false;
   seastar::future<stop_t> do_write_dispatch_sweep();
   void write_event();
index 5225de1f0fa66bf2ba8cbdc4d8bada0e61a26d31..7b775d74b2c02742dd6c4d1a34139604bcb87666 100644 (file)
@@ -655,7 +655,7 @@ ceph::bufferlist ProtocolV1::do_sweep_messages(
     const std::deque<MessageRef>& msgs,
     size_t num_msgs,
     bool require_keepalive,
-    bool require_keepalive_ack)
+    std::optional<utime_t> _keepalive_ack)
 {
   static const size_t RESERVE_MSG_SIZE = sizeof(CEPH_MSGR_TAG_MSG) +
                                          sizeof(ceph_msg_header) +
@@ -680,8 +680,9 @@ ceph::bufferlist ProtocolV1::do_sweep_messages(
     bl.append(create_static(k.req));
   }
 
-  if (unlikely(require_keepalive_ack)) {
-    logger().debug("{} write keepalive2 ack {}", conn, k.ack.stamp.tv_sec);
+  if (unlikely(_keepalive_ack.has_value())) {
+    logger().debug("{} write keepalive2 ack {}", conn, *_keepalive_ack);
+    k.ack.stamp = ceph_timespec(*_keepalive_ack);
     bl.append(create_static(k.ack));
   }
 
@@ -736,9 +737,8 @@ seastar::future<> ProtocolV1::handle_keepalive2()
 {
   return socket->read_exactly(sizeof(ceph_timespec))
     .then([this] (auto buf) {
-      k.ack.stamp = *reinterpret_cast<const ceph_timespec*>(buf.get());
-      logger().debug("{} got keepalive2 {}", conn, k.ack.stamp.tv_sec);
-      notify_keepalive_ack();
+      utime_t ack{*reinterpret_cast<const ceph_timespec*>(buf.get())};
+      notify_keepalive_ack(ack);
     });
 }
 
index 5f6a75da781c21fe878f67cded6a41bcaca8454e..53539ca0f7f80312bc97cc9e385af388ff9907dd 100644 (file)
@@ -30,7 +30,7 @@ class ProtocolV1 final : public Protocol {
       const std::deque<MessageRef>& msgs,
       size_t num_msgs,
       bool require_keepalive,
-      bool require_keepalive_ack) override;
+      std::optional<utime_t> keepalive_ack) override;
 
  private:
   SocketMessenger &messenger;
index 877a07b6471f74f04784b7c9a9a2cabfae4d0063..315936140bd45333724ec9f6c0000c129741fdd4 100644 (file)
@@ -1346,7 +1346,7 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
     const std::deque<MessageRef>& msgs,
     size_t num_msgs,
     bool require_keepalive,
-    bool require_keepalive_ack)
+    std::optional<utime_t> _keepalive_ack)
 {
   ceph::bufferlist bl;
 
@@ -1355,8 +1355,8 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
     bl.append(keepalive_frame.get_buffer(session_stream_handlers));
   }
 
-  if (unlikely(require_keepalive_ack)) {
-    auto keepalive_ack_frame = KeepAliveFrameAck::Encode(last_keepalive_ack_to_send);
+  if (unlikely(_keepalive_ack.has_value())) {
+    auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*_keepalive_ack);
     bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
   }
 
@@ -1540,11 +1540,8 @@ void ProtocolV2::execute_ready()
             return read_frame_payload().then([this] {
               // handle_keepalive2() logic
               auto keepalive_frame = KeepAliveFrame::Decode(rx_segments_data.back());
-              last_keepalive_ack_to_send = keepalive_frame.timestamp();
-              logger().debug("{} got KEEPALIVE2 {}",
-                             conn, last_keepalive_ack_to_send);
+              notify_keepalive_ack(keepalive_frame.timestamp());
               conn.set_last_keepalive(seastar::lowres_system_clock::now());
-              notify_keepalive_ack();
             });
           case Tag::KEEPALIVE2_ACK:
             return read_frame_payload().then([this] {
index eb1bc65005d6282b6302b42e569dc1b684c53ce3..b8681c2199ded11fec6eb70594a952f02d27c4f5 100644 (file)
@@ -29,7 +29,7 @@ class ProtocolV2 final : public Protocol {
       const std::deque<MessageRef>& msgs,
       size_t num_msgs,
       bool require_keepalive,
-      bool require_keepalive_ack) override;
+      std::optional<utime_t> keepalive_ack) override;
 
  private:
   SocketMessenger &messenger;
@@ -72,8 +72,6 @@ class ProtocolV2 final : public Protocol {
   uint64_t peer_global_seq = 0;
   uint64_t connect_seq = 0;
 
-  utime_t last_keepalive_ack_to_send;
-
  // TODO: Frame related implementations, probably to a separate class.
  private:
   bool record_io = false;