From c41c44b2e9d6f3e1b47727917788296524e30596 Mon Sep 17 00:00:00 2001
From: Yingxin Cheng
Date: Thu, 8 Aug 2019 17:09:34 +0800
Subject: [PATCH] crimson/net: REPLACING state to resolve racing and retain session

Signed-off-by: Yingxin Cheng
---
Review notes (ignored by git-am, not part of the commit message):

What the patch does
- Adds ProtocolV2::reuse_connection(), which hands this connection's socket,
  auth metadata and stream handlers to an existing connection through
  trigger_replacing() and then aborts the current connection.
- Replaces the send_reconnect_ok() stub with trigger_replacing(), which moves
  the existing connection to REPLACING, waits for the write path and the
  previous execution to finish, swaps in the new socket, and then either
  writes a ReconnectOkFrame (reconnect) or sends the server ident (connect).
- Removes the "TODO: lossless policy" ceph_assert(false) placeholders in the
  connecting/accepting/reconnect paths.

Fixes applied on top of the original submission
- handle_existing_connection(): the "found previous client session" warning
  had two "{}" placeholders but no arguments; conn and *existing_conn are now
  passed, matching the sibling warnings.
- trigger_replacing(): the exception debug log had three "{}" placeholders but
  only two arguments; the current state name is now passed, and the follow-up
  message names trigger_replacing() instead of execute_replacing().
- Comment typo "exisiting" corrected in reuse_connection().

 src/crimson/net/Connection.h  |   1 +
 src/crimson/net/ProtocolV2.cc | 212 ++++++++++++++++++++++++++++------
 src/crimson/net/ProtocolV2.h  |  23 +++-
 3 files changed, 200 insertions(+), 36 deletions(-)

diff --git a/src/crimson/net/Connection.h b/src/crimson/net/Connection.h
index e4408c19bc1..0f8fb12204d 100644
--- a/src/crimson/net/Connection.h
+++ b/src/crimson/net/Connection.h
@@ -35,6 +35,7 @@ class Connection : public seastar::enable_shared_from_this<Connection> {

   void set_peer_type(entity_type_t peer_type) { peer_name._type = peer_type; }
   void set_peer_id(int64_t peer_id) { peer_name._num = peer_id; }
+  void set_peer_name(entity_name_t name) { peer_name = name; }

  public:
   uint64_t peer_global_id = 0;
diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc
index 892f6a2afbd..2cb7b1352c3 100644
--- a/src/crimson/net/ProtocolV2.cc
+++ b/src/crimson/net/ProtocolV2.cc
@@ -935,8 +935,6 @@ void ProtocolV2::execute_connecting()
             return client_connect();
           } else {
             ceph_assert(connect_seq > 0);
-            // TODO: lossless policy
-            ceph_assert(false);
             return client_reconnect();
           }
         }).then([this] (next_step_t next) {
@@ -1084,15 +1082,46 @@ ProtocolV2::send_wait()
   });
 }

+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::reuse_connection(
+    ProtocolV2* existing_proto, bool do_reset,
+    bool reconnect, uint64_t conn_seq, uint64_t msg_seq)
+{
+  existing_proto->trigger_replacing(reconnect,
+                                    do_reset,
+                                    std::move(socket),
+                                    std::move(auth_meta),
+                                    std::move(session_stream_handlers),
+                                    peer_global_seq,
+                                    client_cookie,
+                                    conn.get_peer_name(),
+                                    connection_features,
+                                    conn_seq,
+                                    msg_seq);
+  // close this connection because all the necessary information is delivered
+  // to the existing connection, and jump to error handling code to abort the
+  // current state.
+  abort_in_close(*this);
+  return seastar::make_ready_future<next_step_t>(next_step_t::none);
+}
+
 seastar::future<ProtocolV2::next_step_t>
 ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
 {
   // handle_existing_connection() logic
-  logger().trace("{} {}: {}", conn, __func__, *existing_conn);
-
   ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
       existing_conn->protocol.get());
   ceph_assert(existing_proto);
+  logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) connecting,"
+                 " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
+                 conn, global_seq, peer_global_seq, connect_seq,
+                 client_cookie, server_cookie,
+                 existing_conn, get_state_name(existing_proto->state),
+                 existing_proto->global_seq,
+                 existing_proto->peer_global_seq,
+                 existing_proto->connect_seq,
+                 existing_proto->client_cookie,
+                 existing_proto->server_cookie);

   if (existing_proto->state == state_t::CLOSING) {
     logger().warn("{} existing connection {} already closed.", conn, *existing_conn);
@@ -1111,8 +1140,7 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
                   " in favor of existing connection {}",
                   conn, peer_global_seq,
                   existing_proto->peer_global_seq, *existing_conn);
-    dispatch_reset();
-    abort_in_close(*this);
+    abort_in_fault();
   }

   if (existing_conn->policy.lossy) {
@@ -1124,8 +1152,42 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
     return send_server_ident();
   }

-  // TODO: lossless policy
-  ceph_assert(false);
+  if (existing_proto->server_cookie != 0) {
+    if (existing_proto->client_cookie != client_cookie) {
+      // Found previous session
+      // peer has reset and we're going to reuse the existing connection
+      // by replacing the socket
+      logger().warn("{} found previous session with existing {}, peer must have reset",
+                    conn, *existing_conn);
+      return reuse_connection(existing_proto, conn.policy.resetcheck);
+    } else {
+      // session establishment interrupted between client_ident and server_ident,
+      // continuing...
+      logger().warn("{} found previous session with existing {}, continuing session establishment",
+                    conn, *existing_conn);
+      return reuse_connection(existing_proto);
+    }
+  } else {
+    // Looks like a connection race: server and client are both connecting to
+    // each other at the same time.
+    if (existing_proto->client_cookie != client_cookie) {
+      if (conn.peer_addr < messenger.get_myaddr() || existing_conn->policy.server) {
+        // this connection wins
+        logger().warn("{} connection race detected and win, reusing existing {}",
+                      conn, *existing_conn);
+        return reuse_connection(existing_proto);
+      } else {
+        // the existing connection wins
+        logger().warn("{} connection race detected and lose to existing {}",
+                      conn, *existing_conn);
+        existing_conn->keepalive();
+        return send_wait();
+      }
+    } else {
+      logger().warn("{} found previous client session with existing {}, continuing session establishment", conn, *existing_conn);
+      return reuse_connection(existing_proto);
+    }
+  }
 }

 seastar::future<ProtocolV2::next_step_t>
@@ -1306,6 +1368,17 @@ ProtocolV2::server_reconnect()
       ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
           existing_conn->protocol.get());
       ceph_assert(existing_proto);
+      logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) re-connecting,"
+                     " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
+                     conn, global_seq, peer_global_seq, reconnect.connect_seq(),
+                     reconnect.client_cookie(), reconnect.server_cookie(),
+                     existing_conn,
+                     get_state_name(existing_proto->state),
+                     existing_proto->global_seq,
+                     existing_proto->peer_global_seq,
+                     existing_proto->connect_seq,
+                     existing_proto->client_cookie,
+                     existing_proto->server_cookie);

       if (existing_proto->state == state_t::REPLACING) {
         logger().warn("{} server_reconnect: racing replace happened while "
@@ -1317,7 +1390,7 @@ ProtocolV2::server_reconnect()
       if (existing_proto->client_cookie != reconnect.client_cookie()) {
         logger().warn("{} server_reconnect:"
                       " client_cookie mismatch with existing connection {},"
-                      " cc={} rcc={}. I must have reseted, reseting client.",
+                      " cc={} rcc={}. I must have reset, reseting client.",
                       conn, *existing_conn,
                       existing_proto->client_cookie, reconnect.client_cookie());
         return send_reset(conn.policy.resetcheck);
@@ -1352,9 +1425,7 @@ ProtocolV2::server_reconnect()
                       conn, existing_proto->connect_seq,
                       reconnect.connect_seq(), *existing_conn);
         return send_retry(existing_proto->connect_seq);
-      }
-
-      if (existing_proto->connect_seq == reconnect.connect_seq()) {
+      } else if (existing_proto->connect_seq == reconnect.connect_seq()) {
         // reconnect race: both peers are sending reconnect messages
         if (existing_conn->peer_addr > messenger.get_myaddrs().msgr2_addr() &&
             !existing_conn->policy.server) {
@@ -1369,19 +1440,17 @@ ProtocolV2::server_reconnect()
                         " replacing existing connection {}"
                         " socket by this connection's socket",
                         conn, *existing_conn);
+          return reuse_connection(
+              existing_proto, false,
+              true, reconnect.connect_seq(), reconnect.msg_seq());
         }
+      } else { // existing_proto->connect_seq < reconnect.connect_seq()
+        logger().warn("{} server_reconnect: stale exsiting connection {},"
+                      " replacing", conn, *existing_conn);
+        return reuse_connection(
+            existing_proto, false,
+            true, reconnect.connect_seq(), reconnect.msg_seq());
       }
-
-      logger().warn("{} server_reconnect: reconnect to exsiting connection {}",
-                    conn, *existing_conn);
-
-      // everything looks good
-      existing_proto->connect_seq = reconnect.connect_seq();
-      //exproto->message_seq = reconnect.msg_seq();
-
-      // TODO: lossless policy
-      // return reuse_connection(existing, exproto);
-      ceph_assert(false);
     });
 }

@@ -1421,8 +1490,6 @@ void ProtocolV2::execute_accepting()
             case Tag::CLIENT_IDENT:
               return server_connect();
             case Tag::SESSION_RECONNECT:
-              // TODO: lossless policy
-              ceph_assert(false);
               return server_reconnect();
             default: {
               unexpected_tag(tag, conn, "post_server_auth");
@@ -1557,12 +1624,95 @@ ProtocolV2::send_server_ident()

 // REPLACING state

-seastar::future<> ProtocolV2::send_reconnect_ok()
+void ProtocolV2::trigger_replacing(bool reconnect,
+                                   bool do_reset,
+                                   SocketFRef&& new_socket,
+                                   AuthConnectionMetaRef&& new_auth_meta,
+                                   ceph::crypto::onwire::rxtx_t new_rxtx,
+                                   uint64_t new_peer_global_seq,
+                                   uint64_t new_client_cookie,
+                                   entity_name_t new_peer_name,
+                                   uint64_t new_conn_features,
+                                   uint64_t new_connect_seq,
+                                   uint64_t new_msg_seq)
 {
-  // send_reconnect_ok() logic
-  //
+  trigger_state(state_t::REPLACING, write_state_t::delay, false);
+  if (socket) {
+    socket->shutdown();
+  }
+  seastar::with_gate(pending_dispatch,
+                     [this,
+                      reconnect,
+                      do_reset,
+                      new_socket = std::move(new_socket),
+                      new_auth_meta = std::move(new_auth_meta),
+                      new_rxtx = std::move(new_rxtx),
+                      new_client_cookie, new_peer_name,
+                      new_conn_features, new_peer_global_seq,
+                      new_connect_seq, new_msg_seq] () mutable {
+    return wait_write_exit().then([this, do_reset] {
+      if (do_reset) {
+        reset_session(true);
+      }
+      protocol_timer.cancel();
+      return std::move(execution_done);
+    }).then([this,
+             reconnect,
+             new_socket = std::move(new_socket),
+             new_auth_meta = std::move(new_auth_meta),
+             new_rxtx = std::move(new_rxtx),
+             new_client_cookie, new_peer_name,
+             new_conn_features, new_peer_global_seq,
+             new_connect_seq, new_msg_seq] () mutable {
+      if (state != state_t::REPLACING) {
+        return new_socket->close().then([sock = std::move(new_socket)] {
+          abort_protocol();
+        });
+      }

-  return seastar::now();
+      if (socket) {
+        with_gate(pending_dispatch, [this, sock = std::move(socket)] () mutable {
+          return sock->close().then([sock = std::move(sock)] {});
+        });
+      }
+      socket = std::move(new_socket);
+      auth_meta = std::move(new_auth_meta);
+      session_stream_handlers = std::move(new_rxtx);
+      record_io = false;
+      peer_global_seq = new_peer_global_seq;
+
+      if (reconnect) {
+        connect_seq = new_connect_seq;
+        // send_reconnect_ok() logic
+        requeue_up_to(new_msg_seq);
+        auto reconnect_ok = ReconnectOkFrame::Encode(conn.in_seq);
+        logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, conn.in_seq);
+        return write_frame(reconnect_ok);
+      } else {
+        client_cookie = new_client_cookie;
+        conn.set_peer_name(new_peer_name);
+        connection_features = new_conn_features;
+        return send_server_ident().then([] (next_step_t next) {
+          assert(next == next_step_t::ready);
+        });
+      }
+    }).then([this] {
+      logger().info("{} reconnected(replaced): gs={}, pgs={}, cs={},"
+                    " client_cookie={}, server_cookie={}, in_seq={}, out_seq={}",
+                    conn, global_seq, peer_global_seq, connect_seq,
+                    client_cookie, server_cookie, conn.in_seq, conn.out_seq);
+      execute_ready();
+    }).handle_exception([this] (std::exception_ptr eptr) {
+      logger().debug("{} trigger_replacing(): got exception {} at state {}",
+                     conn, eptr, get_state_name(state));
+      if (state != state_t::REPLACING) {
+        assert(state == state_t::CLOSING);
+        logger().debug("{} trigger_replacing() protocol aborted", conn);
+        return;
+      }
+      fault(true);
+    });
+  });
 }

 // READY state
@@ -1876,10 +2026,6 @@ void ProtocolV2::trigger_close()

   protocol_timer.cancel();

-  if (!socket) {
-    ceph_assert(state == state_t::CONNECTING);
-  }
-
   trigger_state(state_t::CLOSING, write_state_t::drop, false);
 }

diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h
index b8e97ba80bf..ff8973fa968 100644
--- a/src/crimson/net/ProtocolV2.h
+++ b/src/crimson/net/ProtocolV2.h
@@ -47,7 +47,7 @@ class ProtocolV2 final : public Protocol {
     READY,
     STANDBY,
     WAIT,
-    REPLACING, // ?
+    REPLACING,
     CLOSING
   };
   state_t state = state_t::NONE;
@@ -60,7 +60,7 @@ class ProtocolV2 final : public Protocol {
                                       "READY",
                                       "STANDBY",
                                       "WAIT",
-                                      "REPLACING", // ?
+                                      "REPLACING",
                                       "CLOSING"};
     return statenames[static_cast<int>(state)];
   }
@@ -151,6 +151,11 @@ class ProtocolV2 final : public Protocol {
   seastar::future<> server_auth();

   seastar::future<next_step_t> send_wait();
+  seastar::future<next_step_t> reuse_connection(ProtocolV2* existing_proto,
+                                                bool do_reset=false,
+                                                bool reconnect=false,
+                                                uint64_t conn_seq=0,
+                                                uint64_t msg_seq=0);

   seastar::future<next_step_t> handle_existing_connection(SocketConnectionRef existing_conn);
   seastar::future<next_step_t> server_connect();
@@ -170,7 +175,19 @@ class ProtocolV2 final : public Protocol {
   seastar::future<next_step_t> send_server_ident();

   // REPLACING (server)
-  seastar::future<> send_reconnect_ok();
+  void trigger_replacing(bool reconnect,
+                         bool do_reset,
+                         SocketFRef&& new_socket,
+                         AuthConnectionMetaRef&& new_auth_meta,
+                         ceph::crypto::onwire::rxtx_t new_rxtx,
+                         uint64_t new_peer_global_seq,
+                         // !reconnect
+                         uint64_t new_client_cookie,
+                         entity_name_t new_peer_name,
+                         uint64_t new_conn_features,
+                         // reconnect
+                         uint64_t new_connect_seq,
+                         uint64_t new_msg_seq);

   // READY
   seastar::future<> read_message(utime_t throttle_stamp);
--
2.39.5