return write_state == write_state_t::open;
}
-void Protocol::close(bool dispatch_reset)
+void Protocol::close(bool dispatch_reset,
+ std::optional<std::function<void()>> f_accept_new)
{
if (closed) {
// already closing
// atomic operations
trigger_close();
+ if (f_accept_new) {
+ (*f_accept_new)();
+ }
if (socket) {
socket->shutdown();
}
closed = true;
set_write_state(write_state_t::drop);
auto gate_closed = pending_dispatch.close();
+ auto reset_dispatched = seastar::futurize_apply([this, dispatch_reset] {
+ if (dispatch_reset) {
+ return dispatcher.ms_handle_reset(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ }
+ return seastar::now();
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ logger().error("{} ms_handle_reset caught exception: {}", conn, eptr);
+ ceph_abort("unexpected exception from ms_handle_reset()");
+ });
// asynchronous operations
close_ready = seastar::when_all_succeed(
}
return seastar::now();
}),
- [this, dispatch_reset] {
- if (dispatch_reset) {
- // force ms_handle_reset() to be an asynchronous task to prevent
- // internal state contamination.
- return seastar::sleep(0s).then([this] {
- return dispatcher.ms_handle_reset(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
- }).handle_exception([this] (std::exception_ptr eptr) {
- logger().error("{} ms_handle_reset caught exception: {}", conn, eptr);
- ceph_abort("unexpected exception from ms_handle_reset()");
- });
- }
- return seastar::now();
- }
+ std::move(reset_dispatched)
).finally(std::move(cleanup));
}
logger().warn("{} server_connect:"
" existing connection {} is a lossy channel. Close existing in favor of"
" this connection", conn, *existing_conn);
- existing_proto->close(true);
-
- if (unlikely(state != state_t::ACCEPTING)) {
- logger().debug("{} triggered {} in execute_accepting()",
- conn, get_state_name(state));
- abort_protocol();
- }
- execute_establishing();
+ execute_establishing(existing_conn, true);
return seastar::make_ready_future<next_step_t>(next_step_t::ready);
}
SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr);
+ bool dispatch_reset = true;
if (existing_conn) {
if (existing_conn->protocol->proto_type != proto_t::v2) {
logger().warn("{} existing connection {} proto version is {}, close existing",
static_cast<int>(existing_conn->protocol->proto_type));
// should unregister the existing from msgr atomically
// NOTE: this is following async messenger logic, but we may miss the reset event.
- (void) existing_conn->close();
+ dispatch_reset = false;
} else {
return handle_existing_connection(existing_conn);
}
}
- if (unlikely(state != state_t::ACCEPTING)) {
- logger().debug("{} triggered {} in execute_accepting()",
- conn, get_state_name(state));
- abort_protocol();
- }
- execute_establishing();
+ execute_establishing(existing_conn, dispatch_reset);
return seastar::make_ready_future<next_step_t>(next_step_t::ready);
});
}
// ESTABLISHING
-void ProtocolV2::execute_establishing() {
+void ProtocolV2::execute_establishing(
+ SocketConnectionRef existing_conn, bool dispatch_reset) {
+ if (unlikely(state != state_t::ACCEPTING)) {
+ logger().debug("{} triggered {} before execute_establishing()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+
+ auto accept_me = [this] {
+ messenger.register_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+ messenger.unaccept_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+ };
+
trigger_state(state_t::ESTABLISHING, write_state_t::delay, false);
+ if (existing_conn) {
+ existing_conn->protocol->close(dispatch_reset, std::move(accept_me));
+ } else {
+ accept_me();
+ }
+
(void) seastar::with_gate(pending_dispatch, [this] {
return dispatcher.ms_handle_accept(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
logger().error("{} ms_handle_accept caught exception: {}", conn, eptr);
ceph_abort("unexpected exception from ms_handle_accept()");
});
- messenger.register_conn(
- seastar::static_pointer_cast<SocketConnection>(
- conn.shared_from_this()));
- messenger.unaccept_conn(
- seastar::static_pointer_cast<SocketConnection>(
- conn.shared_from_this()));
+
execution_done = seastar::with_gate(pending_dispatch, [this] {
return seastar::futurize_apply([this] {
return send_server_ident();