From: Yingxin Date: Wed, 21 Nov 2018 21:09:20 +0000 (+0800) Subject: crimson/net: remove unecessary future dependencies for accept/dispatch X-Git-Tag: v14.1.0~574^2~6 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=60d5dd0c2491c8803ccd6eee925848c2eacf3b67;p=ceph-ci.git crimson/net: remove unecessary future dependencies for accept/dispatch Signed-off-by: Yingxin --- diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index 480e416525a..1359dc89008 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -843,8 +843,7 @@ SocketConnection::connect(const entity_addr_t& _peer_addr, // TODO: retry on fault }).then([this] { // dispatch replies on this connection - dispatch() - .handle_exception([] (std::exception_ptr eptr) {}); + dispatch(); }); }); } @@ -885,7 +884,7 @@ SocketConnection::start_accept() }); } -seastar::future<> +void SocketConnection::accept(seastar::connected_socket&& fd, const entity_addr_t& _peer_addr) { @@ -895,7 +894,7 @@ SocketConnection::accept(seastar::connected_socket&& fd, socket.emplace(std::move(fd)); messenger.accept_conn(this); state = state_t::accepting; - return seastar::with_gate(pending_dispatch, [this] { + seastar::with_gate(pending_dispatch, [this] { return start_accept() .then([this] { // notify the dispatcher and allow them to reject the connection @@ -909,15 +908,15 @@ SocketConnection::accept(seastar::connected_socket&& fd, }).then([this] { // dispatch messages until the connection closes or the dispatch // queue shuts down - return dispatch(); + dispatch(); }); }); } -seastar::future<> +void SocketConnection::dispatch() { - return seastar::with_gate(pending_dispatch, [this] { + seastar::with_gate(pending_dispatch, [this] { return seastar::keep_doing([=] { return read_message() .then([=] (MessageRef msg) { @@ -942,6 +941,8 @@ SocketConnection::dispatch() } else { throw e; } + }).handle_exception([] (std::exception_ptr eptr) { + // TODO: handle fault in the open state }); }); } diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index 3413b4d054e..bf249d8e46a 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -153,7 +153,7 @@ class SocketConnection : public Connection { seastar::future<> fault(); - seastar::future<> dispatch(); + void dispatch(); /// start a handshake from the client's perspective, /// only call when SocketConnection first construct @@ -185,8 +185,8 @@ class SocketConnection : public Connection { public: void connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type); - seastar::future<> accept(seastar::connected_socket&& socket, - const entity_addr_t& peer_addr); + void accept(seastar::connected_socket&& socket, + const entity_addr_t& peer_addr); /// read a message from a connection that has completed its handshake seastar::future read_message(); diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index 779f5fe2e5f..6ecf9d3ddef 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -40,18 +40,6 @@ void SocketMessenger::bind(const entity_addr_t& addr) listener = seastar::listen(address, lo); } -seastar::future<> SocketMessenger::accept(seastar::connected_socket socket, - seastar::socket_address paddr) -{ - // allocate the connection - entity_addr_t peer_addr; - peer_addr.set_type(entity_addr_t::TYPE_DEFAULT); - peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); - SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher); - // initiate the handshake - return conn->accept(std::move(socket), peer_addr); -} - seastar::future<> SocketMessenger::start(Dispatcher *disp) { dispatcher = disp; @@ -62,10 +50,13 @@ seastar::future<> SocketMessenger::start(Dispatcher *disp) return listener->accept() .then([this] (seastar::connected_socket socket, seastar::socket_address paddr) { - // start processing the connection - accept(std::move(socket), paddr) - .handle_exception([] (std::exception_ptr eptr) {}); + // allocate the connection + entity_addr_t peer_addr; + peer_addr.set_type(entity_addr_t::TYPE_DEFAULT); + peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); + SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher); // don't wait before accepting another + conn->accept(std::move(socket), peer_addr); }); }).handle_exception_type([this] (const std::system_error& e) { // stop gracefully on connection_aborted