conn.policy = messenger.get_policy(_peer_type);
messenger.register_conn(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
- (void) seastar::with_gate(pending_dispatch, [this] {
+ gated_dispatch("start_connect", [this] {
return Socket::connect(conn.peer_addr)
.then([this](SocketRef sock) {
socket = std::move(sock);
socket = std::move(sock);
messenger.accept_conn(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
- (void) seastar::with_gate(pending_dispatch, [this] {
+ gated_dispatch("start_accept", [this] {
// stop learning my_addr before sending it out, so it won't change
return messenger.learned_addr(messenger.get_myaddr(), conn).then([this] {
// encode/send server's handshake header
}
// start dispatch, ignoring exceptions from the application layer
- (void) seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] {
- logger().debug("{} <== #{} === {} ({})",
- conn, msg->get_seq(), *msg, msg->get_type());
- return dispatcher.ms_dispatch(&conn, std::move(msg))
- .handle_exception([this] (std::exception_ptr eptr) {
- logger().error("{} ms_dispatch caught exception: {}", conn, eptr);
- ceph_assert(false);
- });
- });
+ gated_dispatch("ms_dispatch", [this, msg = std::move(msg_ref)] {
+ logger().debug("{} <== #{} === {} ({})",
+ conn, msg->get_seq(), *msg, msg->get_type());
+ return dispatcher.ms_dispatch(&conn, std::move(msg));
+ });
});
}
state = state_t::open;
set_write_state(write_state_t::open);
- (void) seastar::with_gate(pending_dispatch, [this] {
+ gated_dispatch("execute_open", [this] {
// start background processing of tags
return handle_tags()
.handle_exception_type([this] (const std::system_error& e) {
client_cookie = generate_client_cookie();
peer_global_seq = 0;
reset_write();
- (void) seastar::with_gate(pending_dispatch, [this] {
+ gated_dispatch("ms_handle_remote_reset", [this] {
return dispatcher.ms_handle_remote_reset(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
- }).handle_exception([this] (std::exception_ptr eptr) {
- logger().error("{} ms_handle_remote_reset caught exception: {}", conn, eptr);
- ceph_abort("unexpected exception from ms_handle_remote_reset()");
});
}
}
if (socket) {
socket->shutdown();
}
- execution_done = seastar::with_gate(pending_dispatch, [this] {
+ gated_execute("execute_connecting", [this] {
// we don't know my socket_port yet
conn.set_ephemeral_port(0, SocketConnection::side_t::none);
return messenger.get_global_seq().then([this] (auto gs) {
abort_protocol();
}
if (socket) {
- (void) with_gate(pending_dispatch, [sock = std::move(socket)] () mutable {
+ gated_dispatch("close_sockect_connecting",
+ [sock = std::move(socket)] () mutable {
return sock->close().then([sock = std::move(sock)] {});
});
}
}
switch (next) {
case next_step_t::ready: {
- (void) seastar::with_gate(pending_dispatch, [this] {
+ gated_dispatch("ms_handle_connect", [this] {
return dispatcher.ms_handle_connect(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
- }).handle_exception([this] (std::exception_ptr eptr) {
- logger().error("{} ms_handle_connect caught exception: {}", conn, eptr);
- ceph_abort("unexpected exception from ms_handle_connect()");
});
logger().info("{} connected:"
" gs={}, pgs={}, cs={}, client_cookie={},"
void ProtocolV2::execute_accepting()
{
trigger_state(state_t::ACCEPTING, write_state_t::none, false);
- (void) seastar::with_gate(pending_dispatch, [this] {
+ gated_dispatch("execute_accepting", [this] {
return seastar::futurize_apply([this] {
INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
accept_me();
}
- (void) seastar::with_gate(pending_dispatch, [this] {
+ gated_dispatch("ms_handle_accept_establishing", [this] {
return dispatcher.ms_handle_accept(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
- }).handle_exception([this] (std::exception_ptr eptr) {
- logger().error("{} ms_handle_accept caught exception: {}", conn, eptr);
- ceph_abort("unexpected exception from ms_handle_accept()");
});
- execution_done = seastar::with_gate(pending_dispatch, [this] {
+ gated_execute("execute_establishing", [this] {
return seastar::futurize_apply([this] {
return send_server_ident();
}).then([this] {
if (socket) {
socket->shutdown();
}
- (void) seastar::with_gate(pending_dispatch, [this] {
+ gated_dispatch("ms_handle_accept_replacing", [this] {
return dispatcher.ms_handle_accept(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
- }).handle_exception([this] (std::exception_ptr eptr) {
- logger().error("{} ms_handle_accept caught exception: {}", conn, eptr);
- ceph_abort("unexpected exception from ms_handle_accept()");
});
- (void) seastar::with_gate(pending_dispatch,
- [this,
- reconnect,
- do_reset,
- new_socket = std::move(new_socket),
- new_auth_meta = std::move(new_auth_meta),
- new_rxtx = std::move(new_rxtx),
- new_client_cookie, new_peer_name,
- new_conn_features, new_peer_global_seq,
- new_connect_seq, new_msg_seq] () mutable {
+ gated_dispatch("trigger_replacing",
+ [this,
+ reconnect,
+ do_reset,
+ new_socket = std::move(new_socket),
+ new_auth_meta = std::move(new_auth_meta),
+ new_rxtx = std::move(new_rxtx),
+ new_client_cookie, new_peer_name,
+ new_conn_features, new_peer_global_seq,
+ new_connect_seq, new_msg_seq] () mutable {
return wait_write_exit().then([this, do_reset] {
if (do_reset) {
reset_session(true);
}
if (socket) {
- (void) with_gate(pending_dispatch, [sock = std::move(socket)] () mutable {
+ gated_dispatch("close_socket_replacing",
+ [sock = std::move(socket)] () mutable {
return sock->close().then([sock = std::move(sock)] {});
});
}
// TODO: change MessageRef with seastar::shared_ptr
auto msg_ref = MessageRef{message, false};
- (void) seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] {
+ gated_dispatch("ms_dispatch", [this, msg = std::move(msg_ref)] {
return dispatcher.ms_dispatch(&conn, std::move(msg));
- }).handle_exception([this] (std::exception_ptr eptr) {
- logger().error("{} ms_dispatch caught exception: {}", conn, eptr);
- ceph_abort("unexpected exception from ms_dispatch()");
});
});
}
conn.interceptor->register_conn_ready(conn);
}
#endif
- execution_done = seastar::with_gate(pending_dispatch, [this] {
+ gated_execute("execute_ready", [this] {
protocol_timer.cancel();
return seastar::keep_doing([this] {
return read_main_preamble()
if (socket) {
socket->shutdown();
}
- execution_done = seastar::with_gate(pending_dispatch,
- [this, max_backoff] {
+ gated_execute("execute_wait", [this, max_backoff] {
double backoff = protocol_timer.last_dur();
if (max_backoff) {
backoff = conf.ms_max_backoff;
void ProtocolV2::execute_server_wait()
{
trigger_state(state_t::SERVER_WAIT, write_state_t::delay, false);
- execution_done = seastar::with_gate(pending_dispatch, [this] {
+ gated_execute("execute_server_wait", [this] {
return read_exactly(1).then([this] (auto bl) {
logger().warn("{} SERVER_WAIT got read, abort", conn);
abort_in_fault();