crimson/net: gather message buffers when send
authorYingxin Cheng <yingxincheng@gmail.com>
Thu, 25 Apr 2019 19:50:00 +0000 (03:50 +0800)
committerYingxin Cheng <yingxincheng@gmail.com>
Mon, 29 Apr 2019 07:58:03 +0000 (15:58 +0800)
Gather buffers from pending messages/keepalive and send them together.

Signed-off-by: Yingxin Cheng <yingxincheng@gmail.com>
src/crimson/net/Protocol.cc
src/crimson/net/Protocol.h
src/crimson/net/ProtocolV1.cc
src/crimson/net/ProtocolV1.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h

index 1248bbc9335395666bc0a356f5b3d643b68b6b0b..3e8538fa06e1df99b1a53341d1560442dbe6f5fb 100644 (file)
@@ -72,7 +72,7 @@ seastar::future<> Protocol::close()
 seastar::future<> Protocol::send(MessageRef msg)
 {
   if (write_state != write_state_t::drop) {
-    conn.out_q.push(std::move(msg));
+    conn.out_q.push_back(std::move(msg));
     write_event();
   }
   return seastar::now();
@@ -98,46 +98,49 @@ void Protocol::notify_keepalive_ack()
 seastar::future<stop_t> Protocol::do_write_dispatch_sweep()
 {
   switch (write_state) {
-   case write_state_t::open:
-    return seastar::futurize_apply([this] {
-      if (need_keepalive) {
-        return do_keepalive()
-        .then([this] { need_keepalive = false; });
-      }
-      return seastar::now();
-    }).then([this] {
-      if (need_keepalive_ack) {
-        return do_keepalive_ack()
-        .then([this] { need_keepalive_ack = false; });
+   case write_state_t::open: {
+    size_t num_msgs = conn.out_q.size();
+    // we must have something to write...
+    ceph_assert(num_msgs || need_keepalive || need_keepalive_ack);
+    Message* msg_ptr = nullptr;
+    if (likely(num_msgs)) {
+      msg_ptr = conn.out_q.front().get();
+    }
+    // sweep all pending writes with the concrete Protocol
+    return socket->write(do_sweep_messages(
+        conn.out_q, num_msgs, need_keepalive, need_keepalive_ack))
+    .then([this, msg_ptr, num_msgs] {
+      need_keepalive = false;
+      need_keepalive_ack = false;
+      if (likely(num_msgs && msg_ptr == conn.out_q.front().get())) {
+        // we have sent some messages successfully
+        // and the out_q was not reset during socket write
+        conn.out_q.erase(conn.out_q.begin(), conn.out_q.begin()+num_msgs);
       }
-      return seastar::now();
-    }).then([this] {
-      if (!conn.out_q.empty()){
-        MessageRef msg = conn.out_q.front();
-        return write_message(msg)
-        .then([this, msg] {
-          if (msg == conn.out_q.front()) {
-            conn.out_q.pop();
-          }
-          return stop_t::no;
-        });
-      } else {
+      if (conn.out_q.empty()) {
+        // good, we have nothing pending to send now.
         return socket->flush().then([this] {
-          if (!conn.out_q.empty()) {
-            return stop_t::no;
-          } else {
-            // the dispatching can only stop when out_q is empty
+          if (conn.out_q.empty() && !need_keepalive && !need_keepalive_ack) {
+            // still nothing pending to send after flush,
+            // the dispatching can ONLY stop now
             ceph_assert(write_dispatching);
             write_dispatching = false;
-            return stop_t::yes;
+            return seastar::make_ready_future<stop_t>(stop_t::yes);
+          } else {
+            // something is pending to send during flushing
+            return seastar::make_ready_future<stop_t>(stop_t::no);
           }
         });
+      } else {
+        // messages were enqueued during socket write
+        return seastar::make_ready_future<stop_t>(stop_t::no);
       }
     }).handle_exception([this] (std::exception_ptr eptr) {
       logger().warn("{} do_write_dispatch_sweep() fault: {}", conn, eptr);
       close();
-      return stop_t::no;
+      return seastar::make_ready_future<stop_t>(stop_t::no);
     });
+   }
    case write_state_t::delay: {
     // delay dispatching writes until open
     return state_changed.get_shared_future()
index 73c43b6f695bbd7780b65ad218b6e03f086137b0..78ffcb6f0df9126d4dfba0605a67c836eee92a78 100644 (file)
@@ -44,12 +44,11 @@ class Protocol {
 
   virtual void trigger_close() = 0;
 
-  // encode/write a message
-  virtual seastar::future<> write_message(MessageRef msg) = 0;
-
-  virtual seastar::future<> do_keepalive() = 0;
-
-  virtual seastar::future<> do_keepalive_ack() = 0;
+  virtual ceph::bufferlist do_sweep_messages(
+      const std::deque<MessageRef>& msgs,
+      size_t num_msgs,
+      bool require_keepalive,
+      bool require_keepalive_ack) = 0;
 
  public:
   const proto_t proto_type;
index 408f7f0d0fecce440c6eccad8513348639686016..5225de1f0fa66bf2ba8cbdc4d8bada0e61a26d31 100644 (file)
@@ -129,12 +129,12 @@ uint32_t get_proto_version(entity_type_t peer_type, bool connect)
   }
 }
 
-void discard_up_to(std::queue<MessageRef>* queue,
+void discard_up_to(std::deque<MessageRef>* queue,
                    ceph::net::seq_num_t seq)
 {
   while (!queue->empty() &&
          queue->front()->get_seq() < seq) {
-    queue->pop();
+    queue->pop_front();
   }
 }
 
@@ -651,62 +651,75 @@ void ProtocolV1::start_accept(SocketFRef&& sock,
 
 // open state
 
-seastar::future<> ProtocolV1::write_message(MessageRef msg)
+ceph::bufferlist ProtocolV1::do_sweep_messages(
+    const std::deque<MessageRef>& msgs,
+    size_t num_msgs,
+    bool require_keepalive,
+    bool require_keepalive_ack)
 {
-  msg->set_seq(++conn.out_seq);
-  auto& header = msg->get_header();
-  header.src = messenger.get_myname();
-  msg->encode(conn.features, messenger.get_crc_flags());
-  if (session_security) {
-    session_security->sign_message(msg.get());
-  }
-  bufferlist bl;
-  bl.append(CEPH_MSGR_TAG_MSG);
-  bl.append((const char*)&header, sizeof(header));
-  bl.append(msg->get_payload());
-  bl.append(msg->get_middle());
-  bl.append(msg->get_data());
-  auto& footer = msg->get_footer();
-  if (HAVE_FEATURE(conn.features, MSG_AUTH)) {
-    bl.append((const char*)&footer, sizeof(footer));
-  } else {
-    ceph_msg_footer_old old_footer;
-    if (messenger.get_crc_flags() & MSG_CRC_HEADER) {
-      old_footer.front_crc = footer.front_crc;
-      old_footer.middle_crc = footer.middle_crc;
-    } else {
-      old_footer.front_crc = old_footer.middle_crc = 0;
-    }
-    if (messenger.get_crc_flags() & MSG_CRC_DATA) {
-      old_footer.data_crc = footer.data_crc;
+  static const size_t RESERVE_MSG_SIZE = sizeof(CEPH_MSGR_TAG_MSG) +
+                                         sizeof(ceph_msg_header) +
+                                         sizeof(ceph_msg_footer);
+  static const size_t RESERVE_MSG_SIZE_OLD = sizeof(CEPH_MSGR_TAG_MSG) +
+                                             sizeof(ceph_msg_header) +
+                                             sizeof(ceph_msg_footer_old);
+
+  ceph::bufferlist bl;
+  if (likely(num_msgs)) {
+    if (HAVE_FEATURE(conn.features, MSG_AUTH)) {
+      bl.reserve(num_msgs * RESERVE_MSG_SIZE);
     } else {
-      old_footer.data_crc = 0;
+      bl.reserve(num_msgs * RESERVE_MSG_SIZE_OLD);
     }
-    old_footer.flags = footer.flags;
-    bl.append((const char*)&old_footer, sizeof(old_footer));
   }
-  // write as a seastar::net::packet
-  return socket->write(std::move(bl));
-  // TODO: lossless policy
-  //  .then([this, msg = std::move(msg)] {
-  //    if (!policy.lossy) {
-  //      sent.push(std::move(msg));
-  //    }
-  //  });
-}
 
-seastar::future<> ProtocolV1::do_keepalive()
-{
-  k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec(
-    ceph::coarse_real_clock::now());
-  logger().debug("{} write keepalive2 {}", conn, k.req.stamp.tv_sec);
-  return socket->write(make_static_packet(k.req));
-}
+  if (unlikely(require_keepalive)) {
+    k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec(
+      ceph::coarse_real_clock::now());
+    logger().debug("{} write keepalive2 {}", conn, k.req.stamp.tv_sec);
+    bl.append(create_static(k.req));
+  }
 
-seastar::future<> ProtocolV1::do_keepalive_ack()
-{
-  logger().debug("{} write keepalive2 ack {}", conn, k.ack.stamp.tv_sec);
-  return socket->write(make_static_packet(k.ack));
+  if (unlikely(require_keepalive_ack)) {
+    logger().debug("{} write keepalive2 ack {}", conn, k.ack.stamp.tv_sec);
+    bl.append(create_static(k.ack));
+  }
+
+  std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) {
+    msg->set_seq(++conn.out_seq);
+    auto& header = msg->get_header();
+    header.src = messenger.get_myname();
+    msg->encode(conn.features, messenger.get_crc_flags());
+    if (session_security) {
+      session_security->sign_message(msg.get());
+    }
+    bl.append(CEPH_MSGR_TAG_MSG);
+    bl.append((const char*)&header, sizeof(header));
+    bl.append(msg->get_payload());
+    bl.append(msg->get_middle());
+    bl.append(msg->get_data());
+    auto& footer = msg->get_footer();
+    if (HAVE_FEATURE(conn.features, MSG_AUTH)) {
+      bl.append((const char*)&footer, sizeof(footer));
+    } else {
+      ceph_msg_footer_old old_footer;
+      if (messenger.get_crc_flags() & MSG_CRC_HEADER) {
+        old_footer.front_crc = footer.front_crc;
+        old_footer.middle_crc = footer.middle_crc;
+      } else {
+        old_footer.front_crc = old_footer.middle_crc = 0;
+      }
+      if (messenger.get_crc_flags() & MSG_CRC_DATA) {
+        old_footer.data_crc = footer.data_crc;
+      } else {
+        old_footer.data_crc = 0;
+      }
+      old_footer.flags = footer.flags;
+      bl.append((const char*)&old_footer, sizeof(old_footer));
+    }
+  });
+
+  return bl;
 }
 
 seastar::future<> ProtocolV1::handle_keepalive2_ack()
index ac65c8f9957510884900e2716cae842d5871034b..5f6a75da781c21fe878f67cded6a41bcaca8454e 100644 (file)
@@ -26,11 +26,11 @@ class ProtocolV1 final : public Protocol {
 
   void trigger_close() override;
 
-  seastar::future<> write_message(MessageRef msg) override;
-
-  seastar::future<> do_keepalive() override;
-
-  seastar::future<> do_keepalive_ack() override;
+  ceph::bufferlist do_sweep_messages(
+      const std::deque<MessageRef>& msgs,
+      size_t num_msgs,
+      bool require_keepalive,
+      bool require_keepalive_ack) override;
 
  private:
   SocketMessenger &messenger;
index e127a8a23c79aa763abc9ab8decf4139f255ab40..877a07b6471f74f04784b7c9a9a2cabfae4d0063 100644 (file)
@@ -1342,46 +1342,54 @@ seastar::future<> ProtocolV2::send_reconnect_ok()
 
 // READY state
 
-seastar::future<> ProtocolV2::write_message(MessageRef msg)
+ceph::bufferlist ProtocolV2::do_sweep_messages(
+    const std::deque<MessageRef>& msgs,
+    size_t num_msgs,
+    bool require_keepalive,
+    bool require_keepalive_ack)
 {
-  // TODO: move to common code
-  // set priority
-  msg->get_header().src = messenger.get_myname();
-
-  msg->encode(conn.features, 0);
-
-  msg->set_seq(++conn.out_seq);
-  uint64_t ack_seq = conn.in_seq;
-  // ack_left = 0;
-
-  ceph_msg_header &header = msg->get_header();
-  ceph_msg_footer &footer = msg->get_footer();
-
-  ceph_msg_header2 header2{header.seq,        header.tid,
-                           header.type,       header.priority,
-                           header.version,
-                           0,                 header.data_off,
-                           ack_seq,
-                           footer.flags,      header.compat_version,
-                           header.reserved};
-
-  auto message = MessageFrame::Encode(header2,
-      msg->get_payload(), msg->get_middle(), msg->get_data());
-  logger().debug("{} write msg type={} off={} seq={}",
-                 conn, header2.type, header2.data_off, header2.seq);
-  return write_frame(message, false);
-}
+  ceph::bufferlist bl;
 
-seastar::future<> ProtocolV2::do_keepalive()
-{
-  auto keepalive_frame = KeepAliveFrame::Encode();
-  return write_frame(keepalive_frame, false);
-}
+  if (unlikely(require_keepalive)) {
+    auto keepalive_frame = KeepAliveFrame::Encode();
+    bl.append(keepalive_frame.get_buffer(session_stream_handlers));
+  }
 
-seastar::future<> ProtocolV2::do_keepalive_ack()
-{
-  auto keepalive_ack_frame = KeepAliveFrameAck::Encode(last_keepalive_ack_to_send);
-  return write_frame(keepalive_ack_frame, false);
+  if (unlikely(require_keepalive_ack)) {
+    auto keepalive_ack_frame = KeepAliveFrameAck::Encode(last_keepalive_ack_to_send);
+    bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
+  }
+
+  std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) {
+    // TODO: move to common code
+    // set priority
+    msg->get_header().src = messenger.get_myname();
+
+    msg->encode(conn.features, 0);
+
+    msg->set_seq(++conn.out_seq);
+    uint64_t ack_seq = conn.in_seq;
+    // ack_left = 0;
+
+    ceph_msg_header &header = msg->get_header();
+    ceph_msg_footer &footer = msg->get_footer();
+
+    ceph_msg_header2 header2{header.seq,        header.tid,
+                             header.type,       header.priority,
+                             header.version,
+                             0,                 header.data_off,
+                             ack_seq,
+                             footer.flags,      header.compat_version,
+                             header.reserved};
+
+    auto message = MessageFrame::Encode(header2,
+        msg->get_payload(), msg->get_middle(), msg->get_data());
+    logger().debug("{} write msg type={} off={} seq={}",
+                   conn, header2.type, header2.data_off, header2.seq);
+    bl.append(message.get_buffer(session_stream_handlers));
+  });
+
+  return bl;
 }
 
 void ProtocolV2::handle_message_ack(seq_num_t seq) {
index e176dc343fafbb45c4cea28981121bfce5197569..eb1bc65005d6282b6302b42e569dc1b684c53ce3 100644 (file)
@@ -25,11 +25,11 @@ class ProtocolV2 final : public Protocol {
 
   void trigger_close() override;
 
-  seastar::future<> write_message(MessageRef msg) override;
-
-  seastar::future<> do_keepalive() override;
-
-  seastar::future<> do_keepalive_ack() override;
+  ceph::bufferlist do_sweep_messages(
+      const std::deque<MessageRef>& msgs,
+      size_t num_msgs,
+      bool require_keepalive,
+      bool require_keepalive_ack) override;
 
  private:
   SocketMessenger &messenger;
index dc35c66a05e83244a8b43285a0e8fcf75a0bb1a1..a8f5cac1cb3e4bf989a4eebd427639fe022a6ebb 100644 (file)
@@ -81,8 +81,8 @@ void SocketConnection::requeue_sent()
   out_seq -= sent.size();
   while (!sent.empty()) {
     auto m = sent.front();
-    sent.pop();
-    out_q.push(std::move(m));
+    sent.pop_front();
+    out_q.push_back(std::move(m));
   }
 }
 
index e24c2633e1f5eb2051e19f424f39d1c4fa4e4f9b..482cbd723d1fe18cb43905cb7678a8eaa62d4605 100644 (file)
@@ -58,9 +58,9 @@ class SocketConnection : public Connection {
   bool update_rx_seq(seq_num_t seq);
 
   // messages to be resent after connection gets reset
-  std::queue<MessageRef> out_q;
+  std::deque<MessageRef> out_q;
   // messages sent, but not yet acked by peer
-  std::queue<MessageRef> sent;
+  std::deque<MessageRef> sent;
 
   // which of the peer_addrs we're connecting to (as client)
   // or should reconnect to (as peer)
@@ -115,7 +115,7 @@ class SocketConnection : public Connection {
   /// move all messages in the sent list back into the queue
   void requeue_sent();
 
-  std::tuple<seq_num_t, std::queue<MessageRef>> get_out_queue() {
+  std::tuple<seq_num_t, std::deque<MessageRef>> get_out_queue() {
     return {out_seq, std::move(out_q)};
   }