]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: add proper gating and fault handling for send/keepalive
authorYingxin <yingxin.cheng@intel.com>
Fri, 28 Dec 2018 02:51:25 +0000 (10:51 +0800)
committerYingxin Cheng <yingxincheng@gmail.com>
Fri, 4 Jan 2019 06:06:20 +0000 (14:06 +0800)
Signed-off-by: Yingxin <yingxin.cheng@intel.com>
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h

index a18e6c302a6124d669e5d797447deb18c044c84d..cf1d5cd591d752c5492314931b75ce44f1be6027 100644 (file)
@@ -54,9 +54,6 @@ SocketConnection::SocketConnection(SocketMessenger& messenger,
 SocketConnection::~SocketConnection()
 {
   ceph_assert(pending_dispatch.is_closed());
-  // errors were reported to callers of send()
-  ceph_assert(send_ready.available());
-  send_ready.ignore_ready_future();
 }
 
 ceph::net::Messenger*
@@ -69,6 +66,28 @@ bool SocketConnection::is_connected()
   return !send_ready.failed();
 }
 
+seastar::future<> SocketConnection::send(MessageRef msg)
+{
+  return seastar::with_gate(pending_dispatch, [this, msg=std::move(msg)] {
+      return do_send(std::move(msg))
+        .handle_exception([this] (std::exception_ptr eptr) {
+          logger().warn("{} send fault: {}", *this, eptr);
+          close();
+        });
+    });
+}
+
+seastar::future<> SocketConnection::keepalive()
+{
+  return seastar::with_gate(pending_dispatch, [this] {
+      return do_keepalive()
+        .handle_exception([this] (std::exception_ptr eptr) {
+          logger().warn("{} keepalive fault: {}", *this, eptr);
+          close();
+        });
+    });
+}
+
 void SocketConnection::read_tags_until_next_message()
 {
   seastar::repeat([this] {
@@ -260,9 +279,10 @@ seastar::future<> SocketConnection::write_message(MessageRef msg)
     });
 }
 
-seastar::future<> SocketConnection::send(MessageRef msg)
+seastar::future<> SocketConnection::do_send(MessageRef msg)
 {
   // chain the message after the last message is sent
+  // TODO: retry send for lossless connection
   seastar::shared_future<> f = send_ready.then(
     [this, msg = std::move(msg)] {
       return write_message(std::move(msg));
@@ -274,8 +294,9 @@ seastar::future<> SocketConnection::send(MessageRef msg)
   return f.get_future();
 }
 
-seastar::future<> SocketConnection::keepalive()
+seastar::future<> SocketConnection::do_keepalive()
 {
+  // TODO: retry keepalive for lossless connection
   seastar::shared_future<> f = send_ready.then([this] {
       k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec(
         ceph::coarse_real_clock::now());
index a48f5aff66b3217e3669060746278de15999aefd..ecb2a9df595b38492c9d82885d5f2a5b07e13216 100644 (file)
@@ -165,6 +165,9 @@ class SocketConnection : public Connection {
 
   void execute_open();
 
+  seastar::future<> do_send(MessageRef msg);
+  seastar::future<> do_keepalive();
+
  public:
   SocketConnection(SocketMessenger& messenger,
                    Dispatcher& dispatcher);