]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/net: batch messages instead of chaining futures
authorYingxin Cheng <yingxincheng@gmail.com>
Tue, 29 Jan 2019 12:49:29 +0000 (20:49 +0800)
committerYingxin Cheng <yingxincheng@gmail.com>
Fri, 22 Mar 2019 06:52:23 +0000 (14:52 +0800)
Instead of chaining writes with send_ready, connection will batch
messages in out_q, and will reap them by write_events() in the open
state.

The performance of pingpong is 3.7 times better from observation.

Signed-off-by: Yingxin Cheng <yingxincheng@gmail.com>
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h

index a3045b1dedc564bf4ba165342153de000f541695..ff0c9c07b378d434f230c292ce1ea025ca0bff1a 100644 (file)
@@ -68,60 +68,73 @@ seastar::future<bool> SocketConnection::is_connected()
     });
 }
 
-//TODO(performance): batch messages in out_q instead of chaining individual write events
-//TODO: should discard all the pending messages when reset
-seastar::future<> SocketConnection::write_event(MessageRef msg)
+void SocketConnection::write_event()
 {
+  if (write_dispatching) {
+    // already dispatching
+    return;
+  }
+  write_dispatching = true;
   switch (write_state) {
    case write_state_t::open:
    case write_state_t::delay:
-    return seastar::with_gate(pending_dispatch, [this, msg=std::move(msg)] {
-      seastar::shared_future<> f = send_ready.then([this, msg = std::move(msg)] {
-        return seastar::repeat([this, msg=std::move(msg)] {
-          switch (write_state) {
-           case write_state_t::open:
-            return seastar::futurize_apply([this] {
-                if (m_keepalive) {
-                  return do_keepalive()
-                    .then([this] { m_keepalive = false; });
-                }
-                return seastar::now();
-              }).then([this] {
-                if (m_keepalive_ack) {
-                  return do_keepalive_ack()
-                    .then([this] { m_keepalive_ack = false; });
-                }
-                return seastar::now();
-              }).then([this, msg] {
-                if (msg) {
-                  return write_message(msg);
+    seastar::with_gate(pending_dispatch, [this] {
+      return seastar::repeat([this] {
+        switch (write_state) {
+         case write_state_t::open:
+          return seastar::futurize_apply([this] {
+            if (m_keepalive) {
+              return do_keepalive()
+              .then([this] { m_keepalive = false; });
+            }
+            return seastar::now();
+          }).then([this] {
+            if (m_keepalive_ack) {
+              return do_keepalive_ack()
+              .then([this] { m_keepalive_ack = false; });
+            }
+            return seastar::now();
+          }).then([this] {
+            if (!out_q.empty()){
+              MessageRef msg = out_q.front();
+              return write_message(msg)
+              .then([this, msg] {
+                if (msg == out_q.front()) {
+                  out_q.pop();
                 }
-                return seastar::now();
-              }).then([this] {
-                return socket->flush();
-              }).then([] {
-                return stop_t::yes;
-              }).handle_exception([this] (std::exception_ptr eptr) {
-                logger().warn("{} write_event fault: {}", *this, eptr);
-                close();
                 return stop_t::no;
               });
-           case write_state_t::delay:
-            // delay all the writes until open
-            return state_changed.get_shared_future()
-              .then([] { return stop_t::no; });
-           case write_state_t::drop:
-            return seastar::make_ready_future<stop_t>(stop_t::yes);
-           default:
-            ceph_assert(false);
-          }
-        });
+            } else {
+              return socket->flush()
+              .then([this] {
+                if (!out_q.empty()) {
+                  return stop_t::no;
+                } else {
+                  write_dispatching = false;
+                  return stop_t::yes;
+                }
+              });
+            }
+          }).handle_exception([this] (std::exception_ptr eptr) {
+            logger().warn("{} write_event fault: {}", *this, eptr);
+            close();
+            return stop_t::no;
+          });
+         case write_state_t::delay:
+          // delay dispatching writes until open
+          return state_changed.get_shared_future()
+          .then([] { return stop_t::no; });
+         case write_state_t::drop:
+          write_dispatching = false;
+          return seastar::make_ready_future<stop_t>(stop_t::yes);
+         default:
+          ceph_assert(false);
+        }
       });
-      send_ready = f.get_future();
-      return f.get_future();
     });
+    return;
    case write_state_t::drop:
-    return seastar::now();
+    write_dispatching = false;
    default:
     ceph_assert(false);
   }
@@ -131,7 +144,10 @@ seastar::future<> SocketConnection::send(MessageRef msg)
 {
   logger().debug("{} --> {} === {}", messenger, get_peer_addr(), *msg);
   return seastar::smp::submit_to(shard_id(), [this, msg=std::move(msg)] {
-    return write_event(msg);
+    if (write_state != write_state_t::drop) {
+      out_q.push(std::move(msg));
+      write_event();
+    }
   });
 }
 
index cc8ab2db49ca0fe6bd338614d28ffc835f149903..93794310c0c81f3dd00492ef6c9b2d6a427d97ce 100644 (file)
@@ -123,12 +123,8 @@ class SocketConnection : public Connection {
   seastar::future<> handle_tags();
   seastar::future<> handle_ack();
 
-  seastar::future<> write_event(MessageRef msg=nullptr);
-
-  /// becomes available when handshake completes, and when all previous messages
-  /// have been sent to the output stream. send() chains new messages as
-  /// continuations to this future to act as a queue
-  seastar::future<> send_ready = seastar::now();
+  bool write_dispatching = false;
+  void write_event();
 
   /// encode/write a message
   seastar::future<> write_message(MessageRef msg);