From: Kefu Chai Date: Sat, 18 Aug 2018 10:04:20 +0000 (+0800) Subject: crimson/net: add keepalive support to Connection X-Git-Tag: v14.0.1~526^2~1 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=06dab8f1ad34377c768ae51fcddb524e3bdfe5ad;p=ceph.git crimson/net: add keepalive support to Connection * do not mix buffered writes and zero-copy writes. /root/ceph/src/seastar/include/seastar/core/iostream-impl.hh:114: seastar::future<> seastar::output_stream::write(seastar::net::packet) [with CharType = char]: Assertion `!_end && "Mixing buffered writes and zero-copy writes not supported yet"' failed. Aborting on shard 0. Signed-off-by: Kefu Chai --- diff --git a/src/crimson/net/Connection.h b/src/crimson/net/Connection.h index 3cbe0c1f837b7..e5e2a11f92e55 100644 --- a/src/crimson/net/Connection.h +++ b/src/crimson/net/Connection.h @@ -59,6 +59,10 @@ class Connection : public boost::intrusive_ref_counter send(MessageRef msg) = 0; + /// send a keepalive message over a connection that has completed its + /// handshake + virtual seastar::future<> keepalive() = 0; + /// close the connection and cancel any any pending futures from read/send virtual seastar::future<> close() = 0; diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index 1edaf1f90987c..a9d8162ab5ca4 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -15,6 +15,7 @@ #include #include #include +#include #include "Config.h" #include "Messenger.h" @@ -312,6 +313,21 @@ seastar::future<> SocketConnection::send(MessageRef msg) return f.get_future(); } +seastar::future<> SocketConnection::keepalive() +{ + seastar::shared_future<> f = send_ready.then([this] { + k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec( + ceph::coarse_real_clock::now()); + seastar::net::packet msg{reinterpret_cast(&k.req), + sizeof(k.req)}; + return out.write(std::move(msg)); + }).then([this] { + return out.flush(); + }); + send_ready = f.get_future(); + return f.get_future(); +} + seastar::future<> SocketConnection::close() { // unregister_conn() drops a reference, so hold another until completion @@ -539,14 +555,11 @@ SocketConnection::handle_keepalive2() { return in.read_exactly(sizeof(ceph_timespec)) .then([this] (auto buf) { - auto t = reinterpret_cast(buf.get()); - k.reply_stamp = *t; - std::cout << "keepalive2 " << t->tv_sec << std::endl; - char tag = CEPH_MSGR_TAG_KEEPALIVE2_ACK; - return out.write(reinterpret_cast(&tag), sizeof(tag)); - }).then([this] { - out.write(reinterpret_cast(&k.reply_stamp), - sizeof(k.reply_stamp)); + k.ack.stamp = *reinterpret_cast(buf.get()); + std::cout << "keepalive2 " << k.ack.stamp.tv_sec << std::endl; + seastar::net::packet msg{reinterpret_cast(&k.ack), + sizeof(k.ack)}; + return out.write(std::move(msg)); }).then([this] { return out.flush(); }); diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index e5d8f2b949494..01c8cb532db38 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -128,7 +128,14 @@ class SocketConnection : public Connection { static void discard_up_to(std::queue*, seq_num_t); struct Keepalive { - ceph_timespec reply_stamp; + struct { + const char tag = CEPH_MSGR_TAG_KEEPALIVE2; + ceph_timespec stamp; + } __attribute__((packed)) req; + struct { + const char tag = CEPH_MSGR_TAG_KEEPALIVE2_ACK; + ceph_timespec stamp; + } __attribute__((packed)) ack; ceph_timespec ack_stamp; } k; @@ -152,6 +159,8 @@ class SocketConnection : public Connection { seastar::future<> send(MessageRef msg) override; + seastar::future<> keepalive() override; + seastar::future<> close() override; uint32_t connect_seq() const override {