From: Yingxin Date: Wed, 21 Nov 2018 21:24:31 +0000 (+0800) Subject: crimson/net: encapsulate protocol implementations with open state X-Git-Tag: v14.1.0~574^2~5 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=b22cf59c835b0e836f45186d1f3a69342d272376;p=ceph-ci.git crimson/net: encapsulate protocol implementations with open state Signed-off-by: Yingxin --- diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index 1359dc89008..23a9e3de251 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -809,10 +809,6 @@ SocketConnection::start_connect() return repeat_connect(); }); // TODO: handle errors for state_t::connecting - }).then([this] { - state = state_t::open; - // start background processing of tags - read_tags_until_next_message(); }).then_wrapped([this] (auto fut) { // satisfy the handshake's promise fut.forward_to(std::move(h.promise)); @@ -842,8 +838,7 @@ SocketConnection::connect(const entity_addr_t& _peer_addr, .finally([this] { close(); }); // TODO: retry on fault }).then([this] { - // dispatch replies on this connection - dispatch(); + execute_open(); }); }); } @@ -874,10 +869,6 @@ SocketConnection::start_accept() return repeat_handle_connect(); }); // TODO: handle errors for state_t::accepting - }).then([this] { - state = state_t::open; - // start background processing of tags - read_tags_until_next_message(); }).then_wrapped([this] (auto fut) { // satisfy the handshake's promise fut.forward_to(std::move(h.promise)); @@ -906,36 +897,37 @@ SocketConnection::accept(seastar::connected_socket&& fd, return seastar::make_exception_future<>(eptr) .finally([this] { close(); }); }).then([this] { - // dispatch messages until the connection closes or the dispatch - // queue shuts down - dispatch(); + execute_open(); }); }); } void -SocketConnection::dispatch() +SocketConnection::execute_open() { + state = state_t::open; seastar::with_gate(pending_dispatch, [this] { - return seastar::keep_doing([=] { + // start background processing of tags + read_tags_until_next_message(); + return seastar::keep_doing([this] { return read_message() - .then([=] (MessageRef msg) { + .then([this] (MessageRef msg) { // start dispatch, ignoring exceptions from the application layer - seastar::with_gate(messenger.pending_dispatch, [=, msg = std::move(msg)] { + seastar::with_gate(messenger.pending_dispatch, [this, msg = std::move(msg)] { return dispatcher.ms_dispatch(this, std::move(msg)) .handle_exception([] (std::exception_ptr eptr) {}); }); // return immediately to start on the next message return seastar::now(); }); - }).handle_exception_type([=] (const std::system_error& e) { + }).handle_exception_type([this] (const std::system_error& e) { if (e.code() == error::connection_aborted || e.code() == error::connection_reset) { - return seastar::with_gate(messenger.pending_dispatch, [=] { + return seastar::with_gate(messenger.pending_dispatch, [this] { return dispatcher.ms_handle_reset(this); }); } else if (e.code() == error::read_eof) { - return seastar::with_gate(messenger.pending_dispatch, [=] { + return seastar::with_gate(messenger.pending_dispatch, [this] { return dispatcher.ms_handle_remote_reset(this); }); } else { diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index bf249d8e46a..b69d78d3c63 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -153,7 +153,7 @@ class SocketConnection : public Connection { seastar::future<> fault(); - void dispatch(); + void execute_open(); /// start a handshake from the client's perspective, /// only call when SocketConnection first construct