From 0142e5440dc1e3e6370f61747881f04edee730b8 Mon Sep 17 00:00:00 2001 From: Yingxin Date: Thu, 22 Nov 2018 04:39:37 +0800 Subject: [PATCH] crimson/net: dispatch events in SocketConnection * move dispatch(), and exception handling logics in accept() and connect() from SocketMessenger into SocketConnection, so we can manage the state transition in the same class and at the same abstraction level. * gate the dangling futures in SocketConnection, because the connection's smart_ptr won't be hold by messenger any more during exception handling. * don't return close() inside SocketConnection to prevent recursive gating -- dead lock. Signed-off-by: Yingxin --- src/crimson/net/SocketConnection.cc | 118 ++++++++++++++++++++++++---- src/crimson/net/SocketConnection.h | 26 ++++-- src/crimson/net/SocketMessenger.cc | 66 +--------------- src/crimson/net/SocketMessenger.h | 6 +- 4 files changed, 126 insertions(+), 90 deletions(-) diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index a78bd5864bb..480e416525a 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -26,6 +26,7 @@ #include "crimson/common/log.h" #include "Config.h" +#include "Dispatcher.h" #include "Errors.h" #include "SocketMessenger.h" @@ -43,15 +44,18 @@ namespace { } SocketConnection::SocketConnection(SocketMessenger& messenger, - const entity_addr_t& my_addr) + const entity_addr_t& my_addr, + Dispatcher& dispatcher) : Connection(my_addr), messenger(messenger), + dispatcher(dispatcher), send_ready(h.promise.get_future()) { } SocketConnection::~SocketConnection() { + ceph_assert(pending_dispatch.is_closed()); // errors were reported to callers of send() ceph_assert(send_ready.available()); send_ready.ignore_ready_future(); @@ -310,10 +314,13 @@ seastar::future<> SocketConnection::close() assert(!close_ready.valid()); if (socket) { - close_ready = socket->close().finally(std::move(cleanup)); + close_ready = socket->close() + .then([this] { + return pending_dispatch.close(); + }).finally(std::move(cleanup)); } else { ceph_assert(state == state_t::connecting); - close_ready = seastar::now(); + close_ready = pending_dispatch.close().finally(std::move(cleanup)); } state = state_t::closing; return close_ready.get_future(); @@ -765,15 +772,8 @@ SocketConnection::repeat_connect() } seastar::future<> -SocketConnection::start_connect(const entity_addr_t& _peer_addr, - const entity_type_t& _peer_type) +SocketConnection::start_connect() { - ceph_assert(state == state_t::none); - ceph_assert(!socket); - peer_addr = _peer_addr; - peer_type = _peer_type; - messenger.register_conn(this); - state = state_t::connecting; return seastar::connect(peer_addr.in4_addr()) .then([this](seastar::connected_socket fd) { if (state == state_t::closing) { @@ -819,16 +819,39 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr, }); } -seastar::future<> -SocketConnection::start_accept(seastar::connected_socket&& fd, - const entity_addr_t& _peer_addr) +void +SocketConnection::connect(const entity_addr_t& _peer_addr, + const entity_type_t& _peer_type) { ceph_assert(state == state_t::none); ceph_assert(!socket); peer_addr = _peer_addr; - socket.emplace(std::move(fd)); - messenger.accept_conn(this); - state = state_t::accepting; + peer_type = _peer_type; + messenger.register_conn(this); + state = state_t::connecting; + seastar::with_gate(pending_dispatch, [this] { + return start_connect() + .then([this] { + // notify the dispatcher and allow them to reject the connection + return seastar::with_gate(messenger.pending_dispatch, [this] { + return dispatcher.ms_handle_connect(this); + }); + }).handle_exception([this] (std::exception_ptr eptr) { + // close the connection before returning errors + return seastar::make_exception_future<>(eptr) + .finally([this] { close(); }); + // TODO: retry on fault + }).then([this] { + // dispatch replies on this connection + dispatch() + .handle_exception([] (std::exception_ptr eptr) {}); + }); + }); +} + +seastar::future<> +SocketConnection::start_accept() +{ // encode/send server's handshake header bufferlist bl; bl.append(buffer::create_static(banner_size, banner)); @@ -862,6 +885,67 @@ SocketConnection::start_accept(seastar::connected_socket&& fd, }); } +seastar::future<> +SocketConnection::accept(seastar::connected_socket&& fd, + const entity_addr_t& _peer_addr) +{ + ceph_assert(state == state_t::none); + ceph_assert(!socket); + peer_addr = _peer_addr; + socket.emplace(std::move(fd)); + messenger.accept_conn(this); + state = state_t::accepting; + return seastar::with_gate(pending_dispatch, [this] { + return start_accept() + .then([this] { + // notify the dispatcher and allow them to reject the connection + return seastar::with_gate(messenger.pending_dispatch, [=] { + return dispatcher.ms_handle_accept(this); + }); + }).handle_exception([this] (std::exception_ptr eptr) { + // close the connection before returning errors + return seastar::make_exception_future<>(eptr) + .finally([this] { close(); }); + }).then([this] { + // dispatch messages until the connection closes or the dispatch + // queue shuts down + return dispatch(); + }); + }); +} + +seastar::future<> +SocketConnection::dispatch() +{ + return seastar::with_gate(pending_dispatch, [this] { + return seastar::keep_doing([=] { + return read_message() + .then([=] (MessageRef msg) { + // start dispatch, ignoring exceptions from the application layer + seastar::with_gate(messenger.pending_dispatch, [=, msg = std::move(msg)] { + return dispatcher.ms_dispatch(this, std::move(msg)) + .handle_exception([] (std::exception_ptr eptr) {}); + }); + // return immediately to start on the next message + return seastar::now(); + }); + }).handle_exception_type([=] (const std::system_error& e) { + if (e.code() == error::connection_aborted || + e.code() == error::connection_reset) { + return seastar::with_gate(messenger.pending_dispatch, [=] { + return dispatcher.ms_handle_reset(this); + }); + } else if (e.code() == error::read_eof) { + return seastar::with_gate(messenger.pending_dispatch, [=] { + return dispatcher.ms_handle_remote_reset(this); + }); + } else { + throw e; + } + }); + }); +} + seastar::future<> SocketConnection::fault() { if (policy.lossy) { diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index 3054744450d..3413b4d054e 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include @@ -36,6 +37,8 @@ using SocketConnectionRef = boost::intrusive_ptr; class SocketConnection : public Connection { SocketMessenger& messenger; std::optional socket; + Dispatcher& dispatcher; + seastar::gate pending_dispatch; enum class state_t { none, @@ -150,9 +153,19 @@ class SocketConnection : public Connection { seastar::future<> fault(); + seastar::future<> dispatch(); + + /// start a handshake from the client's perspective, + /// only call when SocketConnection first construct + seastar::future<> start_connect(); + /// start a handshake from the server's perspective, + /// only call when SocketConnection first construct + seastar::future<> start_accept(); + public: SocketConnection(SocketMessenger& messenger, - const entity_addr_t& my_addr); + const entity_addr_t& my_addr, + Dispatcher& dispatcher); ~SocketConnection(); Messenger* get_messenger() const override; @@ -170,13 +183,10 @@ class SocketConnection : public Connection { seastar::future<> close() override; public: - /// complete a handshake from the client's perspective - seastar::future<> start_connect(const entity_addr_t& peer_addr, - const entity_type_t& peer_type); - - /// complete a handshake from the server's perspective - seastar::future<> start_accept(seastar::connected_socket&& socket, - const entity_addr_t& peer_addr); + void connect(const entity_addr_t& peer_addr, + const entity_type_t& peer_type); + seastar::future<> accept(seastar::connected_socket&& socket, + const entity_addr_t& peer_addr); /// read a message from a connection that has completed its handshake seastar::future read_message(); diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index 827267f238d..779f5fe2e5f 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -40,35 +40,6 @@ void SocketMessenger::bind(const entity_addr_t& addr) listener = seastar::listen(address, lo); } -seastar::future<> SocketMessenger::dispatch(SocketConnectionRef conn) -{ - return seastar::keep_doing([=] { - return conn->read_message() - .then([=] (MessageRef msg) { - // start dispatch, ignoring exceptions from the application layer - seastar::with_gate(pending_dispatch, [=, msg = std::move(msg)] { - return dispatcher->ms_dispatch(conn, std::move(msg)) - .handle_exception([] (std::exception_ptr eptr) {}); - }); - // return immediately to start on the next message - return seastar::now(); - }); - }).handle_exception_type([=] (const std::system_error& e) { - if (e.code() == error::connection_aborted || - e.code() == error::connection_reset) { - return seastar::with_gate(pending_dispatch, [=] { - return dispatcher->ms_handle_reset(conn); - }); - } else if (e.code() == error::read_eof) { - return seastar::with_gate(pending_dispatch, [=] { - return dispatcher->ms_handle_remote_reset(conn); - }); - } else { - throw e; - } - }); -} - seastar::future<> SocketMessenger::accept(seastar::connected_socket socket, seastar::socket_address paddr) { @@ -76,23 +47,9 @@ seastar::future<> SocketMessenger::accept(seastar::connected_socket socket, entity_addr_t peer_addr; peer_addr.set_type(entity_addr_t::TYPE_DEFAULT); peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); - SocketConnectionRef conn = new SocketConnection(*this, get_myaddr()); + SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher); // initiate the handshake - return conn->start_accept(std::move(socket), peer_addr) - .then([this, conn] { - // notify the dispatcher and allow them to reject the connection - return seastar::with_gate(pending_dispatch, [=] { - return dispatcher->ms_handle_accept(conn); - }); - }).handle_exception([conn] (std::exception_ptr eptr) { - // close the connection before returning errors - return seastar::make_exception_future<>(eptr) - .finally([conn] { return conn->close(); }); - }).then([this, conn] { - // dispatch messages until the connection closes or the dispatch - // queue shuts down - return dispatch(std::move(conn)); - }); + return conn->accept(std::move(socket), peer_addr); } seastar::future<> SocketMessenger::start(Dispatcher *disp) @@ -127,23 +84,8 @@ SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& pe if (auto found = lookup_conn(peer_addr); found) { return found; } - SocketConnectionRef conn = new SocketConnection(*this, get_myaddr()); - conn->start_connect(peer_addr, peer_type) - .then([this, conn] { - // notify the dispatcher and allow them to reject the connection - return seastar::with_gate(pending_dispatch, [this, conn] { - return dispatcher->ms_handle_connect(conn); - }); - }).handle_exception([conn] (std::exception_ptr eptr) { - // close the connection before returning errors - return seastar::make_exception_future<>(eptr) - .finally([conn] { return conn->close(); }); - // TODO: retry on fault - }).then([this, conn] { - // dispatch replies on this connection - dispatch(conn) - .handle_exception([] (std::exception_ptr eptr) {}); - }); + SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher); + conn->connect(peer_addr, peer_type); return conn; } diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h index d2ef0b6456d..077f8a2716e 100644 --- a/src/crimson/net/SocketMessenger.h +++ b/src/crimson/net/SocketMessenger.h @@ -36,9 +36,6 @@ class SocketMessenger final : public Messenger { std::set accepting_conns; using Throttle = ceph::thread::Throttle; ceph::net::PolicySet policy_set; - seastar::gate pending_dispatch; - - seastar::future<> dispatch(SocketConnectionRef conn); seastar::future<> accept(seastar::connected_socket socket, seastar::socket_address paddr); @@ -65,6 +62,9 @@ class SocketMessenger final : public Messenger { bool force_new) override; public: + // TODO: change to per-connection messenger gate + seastar::gate pending_dispatch; + void set_default_policy(const SocketPolicy& p); void set_policy(entity_type_t peer_type, const SocketPolicy& p); void set_policy_throttler(entity_type_t peer_type, Throttle* throttle); -- 2.39.5