From eb2775d46900e565163323a672f3b5be5c3093c1 Mon Sep 17 00:00:00 2001 From: Yingxin Date: Fri, 28 Dec 2018 10:51:25 +0800 Subject: [PATCH] crimson/net: add proper gating and fault handling for send/keepalive Signed-off-by: Yingxin --- src/crimson/net/SocketConnection.cc | 31 ++++++++++++++++++++++++----- src/crimson/net/SocketConnection.h | 3 +++ 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index a18e6c302a612..cf1d5cd591d75 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -54,9 +54,6 @@ SocketConnection::SocketConnection(SocketMessenger& messenger, SocketConnection::~SocketConnection() { ceph_assert(pending_dispatch.is_closed()); - // errors were reported to callers of send() - ceph_assert(send_ready.available()); - send_ready.ignore_ready_future(); } ceph::net::Messenger* @@ -69,6 +66,28 @@ bool SocketConnection::is_connected() return !send_ready.failed(); } +seastar::future<> SocketConnection::send(MessageRef msg) +{ + return seastar::with_gate(pending_dispatch, [this, msg=std::move(msg)] { + return do_send(std::move(msg)) + .handle_exception([this] (std::exception_ptr eptr) { + logger().warn("{} send fault: {}", *this, eptr); + close(); + }); + }); +} + +seastar::future<> SocketConnection::keepalive() +{ + return seastar::with_gate(pending_dispatch, [this] { + return do_keepalive() + .handle_exception([this] (std::exception_ptr eptr) { + logger().warn("{} keepalive fault: {}", *this, eptr); + close(); + }); + }); +} + void SocketConnection::read_tags_until_next_message() { seastar::repeat([this] { @@ -260,9 +279,10 @@ seastar::future<> SocketConnection::write_message(MessageRef msg) }); } -seastar::future<> SocketConnection::send(MessageRef msg) +seastar::future<> SocketConnection::do_send(MessageRef msg) { // chain the message after the last message is sent + // TODO: retry send for lossless connection seastar::shared_future<> f = send_ready.then( [this, msg = std::move(msg)] { return write_message(std::move(msg)); @@ -274,8 +294,9 @@ seastar::future<> SocketConnection::send(MessageRef msg) return f.get_future(); } -seastar::future<> SocketConnection::keepalive() +seastar::future<> SocketConnection::do_keepalive() { + // TODO: retry keepalive for lossless connection seastar::shared_future<> f = send_ready.then([this] { k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec( ceph::coarse_real_clock::now()); diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index a48f5aff66b32..ecb2a9df595b3 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -165,6 +165,9 @@ class SocketConnection : public Connection { void execute_open(); + seastar::future<> do_send(MessageRef msg); + seastar::future<> do_keepalive(); + public: SocketConnection(SocketMessenger& messenger, Dispatcher& dispatcher); -- 2.39.5