return client_connect();
} else {
ceph_assert(connect_seq > 0);
- // TODO: lossless policy
- ceph_assert(false);
return client_reconnect();
}
}).then([this] (next_step_t next) {
});
}
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::reuse_connection(
+ ProtocolV2* existing_proto, bool do_reset,
+ bool reconnect, uint64_t conn_seq, uint64_t msg_seq)
+{
+ existing_proto->trigger_replacing(reconnect,
+ do_reset,
+ std::move(socket),
+ std::move(auth_meta),
+ std::move(session_stream_handlers),
+ peer_global_seq,
+ client_cookie,
+ conn.get_peer_name(),
+ connection_features,
+ conn_seq,
+ msg_seq);
+ // close this connection because all the necessary information is delivered
+ // to the exisiting connection, and jump to error handling code to abort the
+ // current state.
+ abort_in_close(*this);
+ return seastar::make_ready_future<next_step_t>(next_step_t::none);
+}
+
seastar::future<ProtocolV2::next_step_t>
ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
{
// handle_existing_connection() logic
- logger().trace("{} {}: {}", conn, __func__, *existing_conn);
-
ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
existing_conn->protocol.get());
ceph_assert(existing_proto);
+ logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) connecting,"
+ " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
+ conn, global_seq, peer_global_seq, connect_seq,
+ client_cookie, server_cookie,
+ existing_conn, get_state_name(existing_proto->state),
+ existing_proto->global_seq,
+ existing_proto->peer_global_seq,
+ existing_proto->connect_seq,
+ existing_proto->client_cookie,
+ existing_proto->server_cookie);
if (existing_proto->state == state_t::CLOSING) {
logger().warn("{} existing connection {} already closed.", conn, *existing_conn);
" in favor of existing connection {}",
conn, peer_global_seq,
existing_proto->peer_global_seq, *existing_conn);
- dispatch_reset();
- abort_in_close(*this);
+ abort_in_fault();
}
if (existing_conn->policy.lossy) {
return send_server_ident();
}
- // TODO: lossless policy
- ceph_assert(false);
+ if (existing_proto->server_cookie != 0) {
+ if (existing_proto->client_cookie != client_cookie) {
+ // Found previous session
+ // peer has reset and we're going to reuse the existing connection
+ // by replacing the socket
+ logger().warn("{} found previous session with existing {}, peer must have reset",
+ conn, *existing_conn);
+ return reuse_connection(existing_proto, conn.policy.resetcheck);
+ } else {
+ // session establishment interrupted between client_ident and server_ident,
+ // continuing...
+ logger().warn("{} found previous session with existing {}, continuing session establishment",
+ conn, *existing_conn);
+ return reuse_connection(existing_proto);
+ }
+ } else {
+ // Looks like a connection race: server and client are both connecting to
+ // each other at the same time.
+ if (existing_proto->client_cookie != client_cookie) {
+ if (conn.peer_addr < messenger.get_myaddr() || existing_conn->policy.server) {
+ // this connection wins
+ logger().warn("{} connection race detected and win, reusing existing {}",
+ conn, *existing_conn);
+ return reuse_connection(existing_proto);
+ } else {
+ // the existing connection wins
+ logger().warn("{} connection race detected and lose to existing {}",
+ conn, *existing_conn);
+ existing_conn->keepalive();
+ return send_wait();
+ }
+ } else {
+ logger().warn("{} found previous client session with existing {}, continuing session establishment");
+ return reuse_connection(existing_proto);
+ }
+ }
}
seastar::future<ProtocolV2::next_step_t>
ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
existing_conn->protocol.get());
ceph_assert(existing_proto);
+ logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) re-connecting,"
+ " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
+ conn, global_seq, peer_global_seq, reconnect.connect_seq(),
+ reconnect.client_cookie(), reconnect.server_cookie(),
+ existing_conn,
+ get_state_name(existing_proto->state),
+ existing_proto->global_seq,
+ existing_proto->peer_global_seq,
+ existing_proto->connect_seq,
+ existing_proto->client_cookie,
+ existing_proto->server_cookie);
if (existing_proto->state == state_t::REPLACING) {
logger().warn("{} server_reconnect: racing replace happened while "
if (existing_proto->client_cookie != reconnect.client_cookie()) {
logger().warn("{} server_reconnect:"
" client_cookie mismatch with existing connection {},"
- " cc={} rcc={}. I must have reseted, reseting client.",
+ " cc={} rcc={}. I must have reset, reseting client.",
conn, *existing_conn,
existing_proto->client_cookie, reconnect.client_cookie());
return send_reset(conn.policy.resetcheck);
conn, existing_proto->connect_seq, reconnect.connect_seq(),
*existing_conn);
return send_retry(existing_proto->connect_seq);
- }
-
- if (existing_proto->connect_seq == reconnect.connect_seq()) {
+ } else if (existing_proto->connect_seq == reconnect.connect_seq()) {
// reconnect race: both peers are sending reconnect messages
if (existing_conn->peer_addr > messenger.get_myaddrs().msgr2_addr() &&
!existing_conn->policy.server) {
" replacing existing connection {}"
" socket by this connection's socket",
conn, *existing_conn);
+ return reuse_connection(
+ existing_proto, false,
+ true, reconnect.connect_seq(), reconnect.msg_seq());
}
+ } else { // existing_proto->connect_seq < reconnect.connect_seq()
+ logger().warn("{} server_reconnect: stale exsiting connection {},"
+ " replacing", conn, *existing_conn);
+ return reuse_connection(
+ existing_proto, false,
+ true, reconnect.connect_seq(), reconnect.msg_seq());
}
-
- logger().warn("{} server_reconnect: reconnect to exsiting connection {}",
- conn, *existing_conn);
-
- // everything looks good
- existing_proto->connect_seq = reconnect.connect_seq();
- //exproto->message_seq = reconnect.msg_seq();
-
- // TODO: lossless policy
- // return reuse_connection(existing, exproto);
- ceph_assert(false);
});
}
case Tag::CLIENT_IDENT:
return server_connect();
case Tag::SESSION_RECONNECT:
- // TODO: lossless policy
- ceph_assert(false);
return server_reconnect();
default: {
unexpected_tag(tag, conn, "post_server_auth");
// REPLACING state
-seastar::future<> ProtocolV2::send_reconnect_ok()
+void ProtocolV2::trigger_replacing(bool reconnect,
+ bool do_reset,
+ SocketFRef&& new_socket,
+ AuthConnectionMetaRef&& new_auth_meta,
+ ceph::crypto::onwire::rxtx_t new_rxtx,
+ uint64_t new_peer_global_seq,
+ uint64_t new_client_cookie,
+ entity_name_t new_peer_name,
+ uint64_t new_conn_features,
+ uint64_t new_connect_seq,
+ uint64_t new_msg_seq)
{
- // send_reconnect_ok() logic
- // <prepare and send ReconnectOKFrame>
+ trigger_state(state_t::REPLACING, write_state_t::delay, false);
+ if (socket) {
+ socket->shutdown();
+ }
+ seastar::with_gate(pending_dispatch,
+ [this,
+ reconnect,
+ do_reset,
+ new_socket = std::move(new_socket),
+ new_auth_meta = std::move(new_auth_meta),
+ new_rxtx = std::move(new_rxtx),
+ new_client_cookie, new_peer_name,
+ new_conn_features, new_peer_global_seq,
+ new_connect_seq, new_msg_seq] () mutable {
+ return wait_write_exit().then([this, do_reset] {
+ if (do_reset) {
+ reset_session(true);
+ }
+ protocol_timer.cancel();
+ return std::move(execution_done);
+ }).then([this,
+ reconnect,
+ new_socket = std::move(new_socket),
+ new_auth_meta = std::move(new_auth_meta),
+ new_rxtx = std::move(new_rxtx),
+ new_client_cookie, new_peer_name,
+ new_conn_features, new_peer_global_seq,
+ new_connect_seq, new_msg_seq] () mutable {
+ if (state != state_t::REPLACING) {
+ return new_socket->close().then([sock = std::move(new_socket)] {
+ abort_protocol();
+ });
+ }
- return seastar::now();
+ if (socket) {
+ with_gate(pending_dispatch, [this, sock = std::move(socket)] () mutable {
+ return sock->close().then([sock = std::move(sock)] {});
+ });
+ }
+ socket = std::move(new_socket);
+ auth_meta = std::move(new_auth_meta);
+ session_stream_handlers = std::move(new_rxtx);
+ record_io = false;
+ peer_global_seq = new_peer_global_seq;
+
+ if (reconnect) {
+ connect_seq = new_connect_seq;
+ // send_reconnect_ok() logic
+ requeue_up_to(new_msg_seq);
+ auto reconnect_ok = ReconnectOkFrame::Encode(conn.in_seq);
+ logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, conn.in_seq);
+ return write_frame(reconnect_ok);
+ } else {
+ client_cookie = new_client_cookie;
+ conn.set_peer_name(new_peer_name);
+ connection_features = new_conn_features;
+ return send_server_ident().then([] (next_step_t next) {
+ assert(next == next_step_t::ready);
+ });
+ }
+ }).then([this] {
+ logger().info("{} reconnected(replaced): gs={}, pgs={}, cs={},"
+ " client_cookie={}, server_cookie={}, in_seq={}, out_seq={}",
+ conn, global_seq, peer_global_seq, connect_seq,
+ client_cookie, server_cookie, conn.in_seq, conn.out_seq);
+ execute_ready();
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ logger().debug("{} trigger_replacing(): got exception {} at state {}",
+ conn, eptr);
+ if (state != state_t::REPLACING) {
+ assert(state == state_t::CLOSING);
+ logger().debug("{} execute_replacing() protocol aborted", conn);
+ return;
+ }
+ fault(true);
+ });
+ });
}
// READY state
protocol_timer.cancel();
- if (!socket) {
- ceph_assert(state == state_t::CONNECTING);
- }
-
trigger_state(state_t::CLOSING, write_state_t::drop, false);
}