]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: add keepalive support to Connection
authorKefu Chai <kchai@redhat.com>
Sat, 18 Aug 2018 10:04:20 +0000 (18:04 +0800)
committerKefu Chai <kchai@redhat.com>
Tue, 21 Aug 2018 08:22:27 +0000 (16:22 +0800)
* do not mix buffered writes and zero-copy writes.

/root/ceph/src/seastar/include/seastar/core/iostream-impl.hh:114:
seastar::future<> seastar::output_stream::write(seastar::net::packet)
[with CharType = char]: Assertion `!_end && "Mixing buffered writes and
zero-copy writes not supported yet"' failed.
Aborting on shard 0.

Signed-off-by: Kefu Chai <kchai@redhat.com>
src/crimson/net/Connection.h
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h

index 3cbe0c1f837b7605454c517440110233d49764e5..e5e2a11f92e557464fe5ef59c37b4362699bafed 100644 (file)
@@ -59,6 +59,10 @@ class Connection : public boost::intrusive_ref_counter<Connection,
   /// send a message over a connection that has completed its handshake
   virtual seastar::future<> send(MessageRef msg) = 0;
 
+  /// send a keepalive message over a connection that has completed its
+  /// handshake
+  virtual seastar::future<> keepalive() = 0;
+
   /// close the connection and cancel any any pending futures from read/send
   virtual seastar::future<> close() = 0;
 
index 1edaf1f90987ca1c755b2a64dbf6b654a6c0e411..a9d8162ab5ca4cf82cdca03d7219e6a70a135bd0 100644 (file)
@@ -15,6 +15,7 @@
 #include <algorithm>
 #include <seastar/core/shared_future.hh>
 #include <seastar/core/sleep.hh>
+#include <seastar/net/packet.hh>
 
 #include "Config.h"
 #include "Messenger.h"
@@ -312,6 +313,21 @@ seastar::future<> SocketConnection::send(MessageRef msg)
   return f.get_future();
 }
 
+seastar::future<> SocketConnection::keepalive()
+{
+  seastar::shared_future<> f = send_ready.then([this] {
+      k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec(
+        ceph::coarse_real_clock::now());
+      seastar::net::packet msg{reinterpret_cast<const char*>(&k.req),
+                               sizeof(k.req)};
+      return out.write(std::move(msg));
+    }).then([this] {
+      return out.flush();
+    });
+  send_ready = f.get_future();
+  return f.get_future();
+}
+
 seastar::future<> SocketConnection::close()
 {
   // unregister_conn() drops a reference, so hold another until completion
@@ -539,14 +555,11 @@ SocketConnection::handle_keepalive2()
 {
   return in.read_exactly(sizeof(ceph_timespec))
     .then([this] (auto buf) {
-      auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
-      k.reply_stamp = *t;
-      std::cout << "keepalive2 " << t->tv_sec << std::endl;
-      char tag = CEPH_MSGR_TAG_KEEPALIVE2_ACK;
-      return out.write(reinterpret_cast<const char*>(&tag), sizeof(tag));
-    }).then([this] {
-      out.write(reinterpret_cast<const char*>(&k.reply_stamp),
-                sizeof(k.reply_stamp));
+      k.ack.stamp = *reinterpret_cast<const ceph_timespec*>(buf.get());
+      std::cout << "keepalive2 " << k.ack.stamp.tv_sec << std::endl;
+      seastar::net::packet msg{reinterpret_cast<const char*>(&k.ack),
+                               sizeof(k.ack)};
+      return out.write(std::move(msg));
     }).then([this] {
       return out.flush();
     });
index e5d8f2b949494b48c3103cd6ef358a95318b066b..01c8cb532db3803862954aa9d3b39d158db30549 100644 (file)
@@ -128,7 +128,14 @@ class SocketConnection : public Connection {
   static void discard_up_to(std::queue<MessageRef>*, seq_num_t);
 
   struct Keepalive {
-    ceph_timespec reply_stamp;
+    struct {
+      const char tag = CEPH_MSGR_TAG_KEEPALIVE2;
+      ceph_timespec stamp;
+    } __attribute__((packed)) req;
+    struct {
+      const char tag = CEPH_MSGR_TAG_KEEPALIVE2_ACK;
+      ceph_timespec stamp;
+    } __attribute__((packed)) ack;
     ceph_timespec ack_stamp;
   } k;
 
@@ -152,6 +159,8 @@ class SocketConnection : public Connection {
 
   seastar::future<> send(MessageRef msg) override;
 
+  seastar::future<> keepalive() override;
+
   seastar::future<> close() override;
 
   uint32_t connect_seq() const override {