]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: centralized write_event()
authorYingxin Cheng <yingxincheng@gmail.com>
Fri, 25 Jan 2019 08:36:27 +0000 (16:36 +0800)
committerYingxin Cheng <yingxincheng@gmail.com>
Fri, 22 Mar 2019 06:52:23 +0000 (14:52 +0800)
* introduce write_state_t to decouple write behaviors from states.
* replace `h.promise` with `state_changed`, with a more general way to
  change write behaviors according to state switches.
* centralize write_event() to dispatch writes in the open state.
* friendly interface for v1/v2 protocol abstraction.

Signed-off-by: Yingxin Cheng <yingxincheng@gmail.com>
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h

index a0268ef949f036396591c7342f2dcce47b262fe3..a3045b1dedc564bf4ba165342153de000f541695 100644 (file)
@@ -46,8 +46,7 @@ namespace {
 SocketConnection::SocketConnection(SocketMessenger& messenger,
                                    Dispatcher& dispatcher)
   : messenger(messenger),
-    dispatcher(dispatcher),
-    send_ready(h.promise.get_future())
+    dispatcher(dispatcher)
 {
   ceph_assert(&messenger.container().local() == &messenger);
 }
@@ -65,39 +64,85 @@ SocketConnection::get_messenger() const {
 seastar::future<bool> SocketConnection::is_connected()
 {
   return seastar::smp::submit_to(shard_id(), [this] {
-      return !send_ready.failed();
+      return write_state == write_state_t::open;
     });
 }
 
+//TODO(performance): batch messages in out_q instead of chaining individual write events
+//TODO: should discard all the pending messages when reset
+seastar::future<> SocketConnection::write_event(MessageRef msg)
+{
+  switch (write_state) {
+   case write_state_t::open:
+   case write_state_t::delay:
+    return seastar::with_gate(pending_dispatch, [this, msg=std::move(msg)] {
+      seastar::shared_future<> f = send_ready.then([this, msg = std::move(msg)] {
+        return seastar::repeat([this, msg=std::move(msg)] {
+          switch (write_state) {
+           case write_state_t::open:
+            return seastar::futurize_apply([this] {
+                if (m_keepalive) {
+                  return do_keepalive()
+                    .then([this] { m_keepalive = false; });
+                }
+                return seastar::now();
+              }).then([this] {
+                if (m_keepalive_ack) {
+                  return do_keepalive_ack()
+                    .then([this] { m_keepalive_ack = false; });
+                }
+                return seastar::now();
+              }).then([this, msg] {
+                if (msg) {
+                  return write_message(msg);
+                }
+                return seastar::now();
+              }).then([this] {
+                return socket->flush();
+              }).then([] {
+                return stop_t::yes;
+              }).handle_exception([this] (std::exception_ptr eptr) {
+                logger().warn("{} write_event fault: {}", *this, eptr);
+                close();
+                return stop_t::no;
+              });
+           case write_state_t::delay:
+            // delay all the writes until open
+            return state_changed.get_shared_future()
+              .then([] { return stop_t::no; });
+           case write_state_t::drop:
+            return seastar::make_ready_future<stop_t>(stop_t::yes);
+           default:
+            ceph_assert(false);
+          }
+        });
+      });
+      send_ready = f.get_future();
+      return f.get_future();
+    });
+   case write_state_t::drop:
+    return seastar::now();
+   default:
+    ceph_assert(false);
+  }
+}
+
 seastar::future<> SocketConnection::send(MessageRef msg)
 {
   logger().debug("{} --> {} === {}", messenger, get_peer_addr(), *msg);
   return seastar::smp::submit_to(shard_id(), [this, msg=std::move(msg)] {
-      if (state == state_t::closing)
-        return seastar::now();
-      return seastar::with_gate(pending_dispatch, [this, msg=std::move(msg)] {
-          return do_send(std::move(msg))
-            .handle_exception([this] (std::exception_ptr eptr) {
-              logger().warn("{} send fault: {}", *this, eptr);
-              close();
-            });
-        });
-    });
+    return write_event(msg);
+  });
 }
 
 seastar::future<> SocketConnection::keepalive()
 {
   return seastar::smp::submit_to(shard_id(), [this] {
-      if (state == state_t::closing)
-        return seastar::now();
-      return seastar::with_gate(pending_dispatch, [this] {
-          return do_keepalive()
-            .handle_exception([this] (std::exception_ptr eptr) {
-              logger().warn("{} keepalive fault: {}", *this, eptr);
-              close();
-            });
-        });
-    });
+    if (!m_keepalive) {
+      m_keepalive = true;
+      write_event();
+    }
+  });
 }
 
 seastar::future<> SocketConnection::close()
@@ -279,7 +324,7 @@ seastar::future<> SocketConnection::write_message(MessageRef msg)
     bl.append((const char*)&old_footer, sizeof(old_footer));
   }
   // write as a seastar::net::packet
-  return socket->write_flush(std::move(bl));
+  return socket->write(std::move(bl));
   // TODO: lossless policy
   //  .then([this, msg = std::move(msg)] {
   //    if (!policy.lossy) {
@@ -288,35 +333,18 @@ seastar::future<> SocketConnection::write_message(MessageRef msg)
   //  });
 }
 
-seastar::future<> SocketConnection::do_send(MessageRef msg)
+seastar::future<> SocketConnection::do_keepalive()
 {
-  // chain the message after the last message is sent
-  // TODO: retry send for lossless connection
-  seastar::shared_future<> f = send_ready.then(
-    [this, msg = std::move(msg)] {
-      if (state == state_t::closing)
-        return seastar::now();
-      return write_message(std::move(msg));
-    });
-
-  // chain any later messages after this one completes
-  send_ready = f.get_future();
-  // allow the caller to wait on the same future
-  return f.get_future();
+  k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec(
+    ceph::coarse_real_clock::now());
+  logger().debug("{} write keepalive2 {}", *this, k.req.stamp.tv_sec);
+  return socket->write(make_static_packet(k.req));
 }
 
-seastar::future<> SocketConnection::do_keepalive()
+seastar::future<> SocketConnection::do_keepalive_ack()
 {
-  // TODO: retry keepalive for lossless connection
-  seastar::shared_future<> f = send_ready.then([this] {
-      if (state == state_t::closing)
-        return seastar::now();
-      k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec(
-        ceph::coarse_real_clock::now());
-      return socket->write_flush(make_static_packet(k.req));
-    });
-  send_ready = f.get_future();
-  return f.get_future();
+  logger().debug("{} write keepalive2 ack {}", *this, k.ack.stamp.tv_sec);
+  return socket->write(make_static_packet(k.ack));
 }
 
 seastar::future<> SocketConnection::do_close()
@@ -353,8 +381,13 @@ seastar::future<> SocketConnection::do_close()
     ceph_assert(state == state_t::connecting);
     close_ready = pending_dispatch.close().finally(std::move(cleanup));
   }
+
   logger().debug("{} trigger closing, was {}", *this, static_cast<int>(state));
   state = state_t::closing;
+  write_state = write_state_t::drop;
+  state_changed.set_value();
+  state_changed = seastar::shared_promise<>();
+
   return close_ready.get_future();
 }
 
@@ -575,12 +608,11 @@ SocketConnection::handle_keepalive2()
   return socket->read_exactly(sizeof(ceph_timespec))
     .then([this] (auto buf) {
       k.ack.stamp = *reinterpret_cast<const ceph_timespec*>(buf.get());
-      seastar::shared_future<> f = send_ready.then([this] {
-          logger().debug("{} keepalive2 {}", *this, k.ack.stamp.tv_sec);
-          return socket->write_flush(make_static_packet(k.ack));
-        });
-      send_ready = f.get_future();
-      return f.get_future();
+      logger().debug("{} got keepalive2 {}", *this, k.ack.stamp.tv_sec);
+      if (!m_keepalive_ack) {
+        m_keepalive_ack = true;
+        write_event();
+      }
     });
 }
 
@@ -591,7 +623,7 @@ SocketConnection::handle_keepalive2_ack()
     .then([this] (auto buf) {
       auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
       k.ack_stamp = *t;
-      logger().debug("{} keepalive2 ack {}", *this, t->tv_sec);
+      logger().debug("{} got keepalive2 ack {}", *this, t->tv_sec);
     });
 }
 
@@ -799,12 +831,16 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr,
                                 const entity_type_t& _peer_type)
 {
   ceph_assert(state == state_t::none);
+  logger().debug("{} trigger connecting, was {}", *this, static_cast<int>(state));
+  state = state_t::connecting;
+  write_state = write_state_t::delay;
+  state_changed.set_value();
+  state_changed = seastar::shared_promise<>();
+
   ceph_assert(!socket);
   peer_addr = _peer_addr;
   peer_type = _peer_type;
   messenger.register_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
-  logger().debug("{} trigger connecting, was {}", *this, static_cast<int>(state));
-  state = state_t::connecting;
   seastar::with_gate(pending_dispatch, [this] {
       return seastar::connect(peer_addr.in4_addr())
         .then([this](seastar::connected_socket fd) {
@@ -847,7 +883,6 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr,
         }).handle_exception([this] (std::exception_ptr eptr) {
           // TODO: handle fault in the connecting state
           logger().warn("{} connecting fault: {}", *this, eptr);
-          h.promise.set_value();
           close();
         });
     });
@@ -858,6 +893,12 @@ SocketConnection::start_accept(seastar::foreign_ptr<std::unique_ptr<Socket>>&& s
                                const entity_addr_t& _peer_addr)
 {
   ceph_assert(state == state_t::none);
+  logger().debug("{} trigger accepting, was {}", *this, static_cast<int>(state));
+  state = state_t::accepting;
+  write_state = write_state_t::delay;
+  state_changed.set_value();
+  state_changed = seastar::shared_promise<>();
+
   ceph_assert(!socket);
   peer_addr.u = _peer_addr.u;
   peer_addr.set_port(0);
@@ -865,8 +906,6 @@ SocketConnection::start_accept(seastar::foreign_ptr<std::unique_ptr<Socket>>&& s
   socket_port = _peer_addr.get_port();
   socket = std::move(sock);
   messenger.accept_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
-  logger().debug("{} trigger accepting, was {}", *this, static_cast<int>(state));
-  state = state_t::accepting;
   seastar::with_gate(pending_dispatch, [this, _peer_addr] {
       // encode/send server's handshake header
       bufferlist bl;
@@ -899,7 +938,6 @@ SocketConnection::start_accept(seastar::foreign_ptr<std::unique_ptr<Socket>>&& s
         }).handle_exception([this] (std::exception_ptr eptr) {
           // TODO: handle fault in the accepting state
           logger().warn("{} accepting fault: {}", *this, eptr);
-          h.promise.set_value();
           close();
         });
     });
@@ -910,8 +948,10 @@ SocketConnection::execute_open()
 {
   logger().debug("{} trigger open, was {}", *this, static_cast<int>(state));
   state = state_t::open;
-  // satisfy the handshake's promise
-  h.promise.set_value();
+  write_state = write_state_t::open;
+  state_changed.set_value();
+  state_changed = seastar::shared_promise<>();
+
   seastar::with_gate(pending_dispatch, [this] {
       // start background processing of tags
       return handle_tags()
index 9fd3bdf294362f9541d3ffc165c67a80707bef30..cc8ab2db49ca0fe6bd338614d28ffc835f149903 100644 (file)
@@ -61,6 +61,18 @@ class SocketConnection : public Connection {
     closing
   };
   state_t state = state_t::none;
+  // wait until current state changed
+  seastar::shared_promise<> state_changed;
+
+  // write_state is changed with state atomically, indicating the write
+  // behavior of the according state.
+  enum class write_state_t {
+    none,
+    delay,
+    open,
+    drop
+  };
+  write_state_t write_state = write_state_t::none;
 
   /// become valid only when state is state_t::closing
   seastar::shared_future<> close_ready;
@@ -74,7 +86,6 @@ class SocketConnection : public Connection {
     uint32_t connect_seq = 0;
     uint32_t peer_global_seq = 0;
     uint32_t global_seq;
-    seastar::promise<> promise;
   } h;
 
   /// server side of handshake negotiation
@@ -112,10 +123,12 @@ class SocketConnection : public Connection {
   seastar::future<> handle_tags();
   seastar::future<> handle_ack();
 
+  seastar::future<> write_event(MessageRef msg=nullptr);
+
   /// becomes available when handshake completes, and when all previous messages
   /// have been sent to the output stream. send() chains new messages as
   /// continuations to this future to act as a queue
-  seastar::future<> send_ready;
+  seastar::future<> send_ready = seastar::now();
 
   /// encode/write a message
   seastar::future<> write_message(MessageRef msg);
@@ -156,13 +169,15 @@ class SocketConnection : public Connection {
     } __attribute__((packed)) ack;
     ceph_timespec ack_stamp;
   } k;
+  bool m_keepalive = false;
+  bool m_keepalive_ack = false;
 
   seastar::future<> fault();
 
   void execute_open();
 
-  seastar::future<> do_send(MessageRef msg);
   seastar::future<> do_keepalive();
+  seastar::future<> do_keepalive_ack();
   seastar::future<> do_close();
 
  public: