" this connection", conn, *existing_conn);
existing_proto->dispatch_reset();
existing_proto->close();
- return send_server_ident();
+
+ if (unlikely(state != state_t::ACCEPTING)) {
+ logger().debug("{} triggered {} in execute_accepting()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ execute_establishing();
+ return seastar::make_ready_future<next_step_t>(next_step_t::ready);
}
if (existing_proto->server_cookie != 0) {
}
}
- // TODO: atomically register & unaccept the connecton with lookup_conn()
-
- // if everything is OK reply with server identification
- return send_server_ident();
+ if (unlikely(state != state_t::ACCEPTING)) {
+ logger().debug("{} triggered {} in execute_accepting()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ execute_establishing();
+ return seastar::make_ready_future<next_step_t>(next_step_t::ready);
});
}
void ProtocolV2::execute_accepting()
{
- // TODO: change to write_state_t::none
- trigger_state(state_t::ACCEPTING, write_state_t::delay, false);
+ trigger_state(state_t::ACCEPTING, write_state_t::none, false);
seastar::with_gate(pending_dispatch, [this] {
return seastar::futurize_apply([this] {
INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
}
}
}).then([this] (next_step_t next) {
- if (unlikely(state != state_t::ACCEPTING)) {
- logger().debug("{} triggered {} at the end of execute_accepting()",
- conn, get_state_name(state));
- abort_protocol();
- }
switch (next) {
- case next_step_t::ready: {
- seastar::with_gate(pending_dispatch, [this] {
- return dispatcher.ms_handle_accept(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
- }).handle_exception([this] (std::exception_ptr eptr) {
- logger().error("{} ms_handle_accept caught exception: {}", conn, eptr);
- ceph_abort("unexpected exception from ms_handle_accept()");
- });
- messenger.register_conn(
- seastar::static_pointer_cast<SocketConnection>(
- conn.shared_from_this()));
- messenger.unaccept_conn(
- seastar::static_pointer_cast<SocketConnection>(
- conn.shared_from_this()));
- logger().info("{} accepted: gs={}, pgs={}, cs={},"
- " client_cookie={}, server_cookie={}, in_seq={}, out_seq={}",
- conn, global_seq, peer_global_seq, connect_seq,
- client_cookie, server_cookie, conn.in_seq, conn.out_seq);
- execute_ready();
+ case next_step_t::ready:
+ assert(state != state_t::ACCEPTING);
break;
- }
- case next_step_t::wait: {
+ case next_step_t::wait:
+ if (unlikely(state != state_t::ACCEPTING)) {
+ logger().debug("{} triggered {} at the end of execute_accepting()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
logger().info("{} execute_accepting(): going to SERVER_WAIT", conn);
execute_server_wait();
break;
- }
- default: {
+ default:
ceph_abort("impossible next step");
- }
}
}).handle_exception([this] (std::exception_ptr eptr) {
logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
});
}
-// ACCEPTING or REPLACING state
+// ESTABLISHING
-seastar::future<ProtocolV2::next_step_t>
+void ProtocolV2::execute_establishing() {
+ trigger_state(state_t::ESTABLISHING, write_state_t::delay, false);
+ seastar::with_gate(pending_dispatch, [this] {
+ return dispatcher.ms_handle_accept(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ logger().error("{} ms_handle_accept caught exception: {}", conn, eptr);
+ ceph_abort("unexpected exception from ms_handle_accept()");
+ });
+ messenger.register_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+ messenger.unaccept_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+ logger().info("{} accepted: gs={}, pgs={}, cs={},"
+ " client_cookie={}, server_cookie={}, in_seq={}, out_seq={}",
+ conn, global_seq, peer_global_seq, connect_seq,
+ client_cookie, server_cookie, conn.in_seq, conn.out_seq);
+ execution_done = seastar::with_gate(pending_dispatch, [this] {
+ return seastar::futurize_apply([this] {
+ return send_server_ident();
+ }).then([this] {
+ if (unlikely(state != state_t::ESTABLISHING)) {
+ logger().debug("{} triggered {} at the end of execute_establishing()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ execute_ready();
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ if (state != state_t::ESTABLISHING) {
+ logger().info("{} execute_establishing() protocol aborted at {} -- {}",
+ conn, get_state_name(state), eptr);
+ assert(state == state_t::CLOSING ||
+ state == state_t::REPLACING);
+ return;
+ }
+ fault(false, "execute_establishing()", eptr);
+ });
+ });
+}
+
+// ESTABLISHING or REPLACING state
+
+seastar::future<>
ProtocolV2::send_server_ident()
{
// send_server_ident() logic
conn.set_features(connection_features);
return write_frame(server_ident);
- }).then([] {
- return next_step_t::ready;
});
}
if (socket) {
socket->shutdown();
}
+ if (!reconnect && new_client_cookie != client_cookie) {
+ seastar::with_gate(pending_dispatch, [this] {
+ return dispatcher.ms_handle_accept(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ logger().error("{} ms_handle_accept caught exception: {}", conn, eptr);
+ ceph_abort("unexpected exception from ms_handle_accept()");
+ });
+ }
seastar::with_gate(pending_dispatch,
[this,
reconnect,
client_cookie = new_client_cookie;
conn.set_peer_name(new_peer_name);
connection_features = new_conn_features;
- return send_server_ident().then([] (next_step_t next) {
- assert(next == next_step_t::ready);
- });
+ return send_server_ident();
}
}).then([this] {
if (unlikely(state != state_t::REPLACING)) {
messenger.unaccept_conn(
seastar::static_pointer_cast<SocketConnection>(
conn.shared_from_this()));
- } else if (state >= state_t::CONNECTING && state < state_t::CLOSING) {
+ } else if (state >= state_t::ESTABLISHING && state < state_t::CLOSING) {
messenger.unregister_conn(
seastar::static_pointer_cast<SocketConnection>(
conn.shared_from_this()));