From: Yingxin Cheng Date: Thu, 12 Mar 2020 15:38:22 +0000 (+0800) Subject: crimson/net: cleanup dispatches with connection gate X-Git-Tag: v16.0.0~8^2~6 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=6c51c7aa8248eec4e09be51efb2741954cfcb78e;p=ceph-ci.git crimson/net: cleanup dispatches with connection gate Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc index 2183543efb4..8ae7bb1d3af 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/Protocol.cc @@ -314,13 +314,8 @@ void Protocol::write_event() case write_state_t::open: [[fallthrough]]; case write_state_t::delay: - (void) seastar::with_gate(pending_dispatch, [this] { - return do_write_dispatch_sweep( - ).handle_exception([this] (std::exception_ptr eptr) { - logger().error("{} do_write_dispatch_sweep(): unexpected exception {}", - conn, eptr); - ceph_abort(); - }); + gated_dispatch("do_write_dispatch_sweep", [this] { + return do_write_dispatch_sweep(); }); return; case write_state_t::drop: diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h index bb73746e12b..c5f8a5b7286 100644 --- a/src/crimson/net/Protocol.h +++ b/src/crimson/net/Protocol.h @@ -6,6 +6,7 @@ #include #include +#include "crimson/common/log.h" #include "Fwd.h" #include "SocketConnection.h" @@ -62,17 +63,27 @@ class Protocol { const proto_t proto_type; protected: + template + void gated_dispatch(const char* what, Func&& func) { + (void) seastar::with_gate(pending_dispatch, std::forward(func) + ).handle_exception([this, what] (std::exception_ptr eptr) { + crimson::get_logger(ceph_subsys_ms).error( + "{} gated_dispatch() {} caught exception: {}", conn, what, eptr); + ceph_abort("unexpected exception from gated_dispatch()"); + }); + } + Dispatcher &dispatcher; SocketConnection &conn; SocketRef socket; - seastar::gate pending_dispatch; AuthConnectionMetaRef auth_meta; private: bool closed = false; // become valid only after closed == true seastar::shared_future<> close_ready; + seastar::gate pending_dispatch; // the write state-machine public: diff --git a/src/crimson/net/ProtocolV1.cc b/src/crimson/net/ProtocolV1.cc index db18a550445..26344b0fb4f 100644 --- a/src/crimson/net/ProtocolV1.cc +++ b/src/crimson/net/ProtocolV1.cc @@ -318,7 +318,7 @@ void ProtocolV1::start_connect(const entity_addr_t& _peer_addr, conn.policy = messenger.get_policy(_peer_type); messenger.register_conn( seastar::static_pointer_cast(conn.shared_from_this())); - (void) seastar::with_gate(pending_dispatch, [this] { + gated_dispatch("start_connect", [this] { return Socket::connect(conn.peer_addr) .then([this](SocketRef sock) { socket = std::move(sock); @@ -618,7 +618,7 @@ void ProtocolV1::start_accept(SocketRef&& sock, socket = std::move(sock); messenger.accept_conn( seastar::static_pointer_cast(conn.shared_from_this())); - (void) seastar::with_gate(pending_dispatch, [this] { + gated_dispatch("start_accept", [this] { // stop learning my_addr before sending it out, so it won't change return messenger.learned_addr(messenger.get_myaddr(), conn).then([this] { // encode/send server's handshake header @@ -848,15 +848,11 @@ seastar::future<> ProtocolV1::read_message() } // start dispatch, ignoring exceptions from the application layer - (void) seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] { - logger().debug("{} <== #{} === {} ({})", - conn, msg->get_seq(), *msg, msg->get_type()); - return dispatcher.ms_dispatch(&conn, std::move(msg)) - .handle_exception([this] (std::exception_ptr eptr) { - logger().error("{} ms_dispatch caught exception: {}", conn, eptr); - ceph_assert(false); - }); - }); + gated_dispatch("ms_dispatch", [this, msg = std::move(msg_ref)] { + logger().debug("{} <== #{} === {} ({})", + conn, msg->get_seq(), *msg, msg->get_type()); + return dispatcher.ms_dispatch(&conn, std::move(msg)); + }); }); } @@ -895,7 +891,7 @@ void ProtocolV1::execute_open() state = state_t::open; set_write_state(write_state_t::open); - (void) seastar::with_gate(pending_dispatch, [this] { + gated_dispatch("execute_open", [this] { // start background processing of tags return handle_tags() .handle_exception_type([this] (const std::system_error& e) { diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index c5732554221..cc752f43a1c 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -440,12 +440,9 @@ void ProtocolV2::reset_session(bool full) client_cookie = generate_client_cookie(); peer_global_seq = 0; reset_write(); - (void) seastar::with_gate(pending_dispatch, [this] { + gated_dispatch("ms_handle_remote_reset", [this] { return dispatcher.ms_handle_remote_reset( seastar::static_pointer_cast(conn.shared_from_this())); - }).handle_exception([this] (std::exception_ptr eptr) { - logger().error("{} ms_handle_remote_reset caught exception: {}", conn, eptr); - ceph_abort("unexpected exception from ms_handle_remote_reset()"); }); } } @@ -850,7 +847,7 @@ void ProtocolV2::execute_connecting() if (socket) { socket->shutdown(); } - execution_done = seastar::with_gate(pending_dispatch, [this] { + gated_execute("execute_connecting", [this] { // we don't know my socket_port yet conn.set_ephemeral_port(0, SocketConnection::side_t::none); return messenger.get_global_seq().then([this] (auto gs) { @@ -874,7 +871,8 @@ void ProtocolV2::execute_connecting() abort_protocol(); } if (socket) { - (void) with_gate(pending_dispatch, [sock = std::move(socket)] () mutable { + gated_dispatch("close_sockect_connecting", + [sock = std::move(socket)] () mutable { return sock->close().then([sock = std::move(sock)] {}); }); } @@ -932,12 +930,9 @@ void ProtocolV2::execute_connecting() } switch (next) { case next_step_t::ready: { - (void) seastar::with_gate(pending_dispatch, [this] { + gated_dispatch("ms_handle_connect", [this] { return dispatcher.ms_handle_connect( seastar::static_pointer_cast(conn.shared_from_this())); - }).handle_exception([this] (std::exception_ptr eptr) { - logger().error("{} ms_handle_connect caught exception: {}", conn, eptr); - ceph_abort("unexpected exception from ms_handle_connect()"); }); logger().info("{} connected:" " gs={}, pgs={}, cs={}, client_cookie={}," @@ -1488,7 +1483,7 @@ ProtocolV2::server_reconnect() void ProtocolV2::execute_accepting() { trigger_state(state_t::ACCEPTING, write_state_t::none, false); - (void) seastar::with_gate(pending_dispatch, [this] { + gated_dispatch("execute_accepting", [this] { return seastar::futurize_apply([this] { INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED); auth_meta = seastar::make_lw_shared(); @@ -1615,15 +1610,12 @@ void ProtocolV2::execute_establishing( accept_me(); } - (void) seastar::with_gate(pending_dispatch, [this] { + gated_dispatch("ms_handle_accept_establishing", [this] { return dispatcher.ms_handle_accept( seastar::static_pointer_cast(conn.shared_from_this())); - }).handle_exception([this] (std::exception_ptr eptr) { - logger().error("{} ms_handle_accept caught exception: {}", conn, eptr); - ceph_abort("unexpected exception from ms_handle_accept()"); }); - execution_done = seastar::with_gate(pending_dispatch, [this] { + gated_execute("execute_establishing", [this] { return seastar::futurize_apply([this] { return send_server_ident(); }).then([this] { @@ -1716,23 +1708,20 @@ void ProtocolV2::trigger_replacing(bool reconnect, if (socket) { socket->shutdown(); } - (void) seastar::with_gate(pending_dispatch, [this] { + gated_dispatch("ms_handle_accept_replacing", [this] { return dispatcher.ms_handle_accept( seastar::static_pointer_cast(conn.shared_from_this())); - }).handle_exception([this] (std::exception_ptr eptr) { - logger().error("{} ms_handle_accept caught exception: {}", conn, eptr); - ceph_abort("unexpected exception from ms_handle_accept()"); }); - (void) seastar::with_gate(pending_dispatch, - [this, - reconnect, - do_reset, - new_socket = std::move(new_socket), - new_auth_meta = std::move(new_auth_meta), - new_rxtx = std::move(new_rxtx), - new_client_cookie, new_peer_name, - new_conn_features, new_peer_global_seq, - new_connect_seq, new_msg_seq] () mutable { + gated_dispatch("trigger_replacing", + [this, + reconnect, + do_reset, + new_socket = std::move(new_socket), + new_auth_meta = std::move(new_auth_meta), + new_rxtx = std::move(new_rxtx), + new_client_cookie, new_peer_name, + new_conn_features, new_peer_global_seq, + new_connect_seq, new_msg_seq] () mutable { return wait_write_exit().then([this, do_reset] { if (do_reset) { reset_session(true); @@ -1754,7 +1743,8 @@ void ProtocolV2::trigger_replacing(bool reconnect, } if (socket) { - (void) with_gate(pending_dispatch, [sock = std::move(socket)] () mutable { + gated_dispatch("close_socket_replacing", + [sock = std::move(socket)] () mutable { return sock->close().then([sock = std::move(sock)] {}); }); } @@ -1946,11 +1936,8 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp) // TODO: change MessageRef with seastar::shared_ptr auto msg_ref = MessageRef{message, false}; - (void) seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] { + gated_dispatch("ms_dispatch", [this, msg = std::move(msg_ref)] { return dispatcher.ms_dispatch(&conn, std::move(msg)); - }).handle_exception([this] (std::exception_ptr eptr) { - logger().error("{} ms_dispatch caught exception: {}", conn, eptr); - ceph_abort("unexpected exception from ms_dispatch()"); }); }); } @@ -1964,7 +1951,7 @@ void ProtocolV2::execute_ready() conn.interceptor->register_conn_ready(conn); } #endif - execution_done = seastar::with_gate(pending_dispatch, [this] { + gated_execute("execute_ready", [this] { protocol_timer.cancel(); return seastar::keep_doing([this] { return read_main_preamble() @@ -2070,8 +2057,7 @@ void ProtocolV2::execute_wait(bool max_backoff) if (socket) { socket->shutdown(); } - execution_done = seastar::with_gate(pending_dispatch, - [this, max_backoff] { + gated_execute("execute_wait", [this, max_backoff] { double backoff = protocol_timer.last_dur(); if (max_backoff) { backoff = conf.ms_max_backoff; @@ -2102,7 +2088,7 @@ void ProtocolV2::execute_wait(bool max_backoff) void ProtocolV2::execute_server_wait() { trigger_state(state_t::SERVER_WAIT, write_state_t::delay, false); - execution_done = seastar::with_gate(pending_dispatch, [this] { + gated_execute("execute_server_wait", [this] { return read_exactly(1).then([this] (auto bl) { logger().warn("{} SERVER_WAIT got read, abort", conn); abort_in_fault(); diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index 4aa7b276070..f98bf3d4366 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -80,6 +80,14 @@ class ProtocolV2 final : public Protocol { seastar::shared_future<> execution_done = seastar::now(); + template + void gated_execute(const char* what, Func&& func) { + gated_dispatch(what, [this, &func] { + execution_done = seastar::futurize_apply(std::forward(func)); + return execution_done.get_future(); + }); + } + class Timer { double last_dur_ = 0.0; const SocketConnection& conn;