From ea9e0056bd62d7eb4ef61a6f4b69d941accddc4d Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Wed, 13 Mar 2019 22:48:12 +0800 Subject: [PATCH] crimson/net: protocolv2 handshake frame exchanges Signed-off-by: Yingxin Cheng --- src/crimson/net/Connection.h | 1 + src/crimson/net/ProtocolV2.cc | 633 ++++++++++++++++++++++++++++------ src/crimson/net/ProtocolV2.h | 6 + 3 files changed, 534 insertions(+), 106 deletions(-) diff --git a/src/crimson/net/Connection.h b/src/crimson/net/Connection.h index d69d32ab0a1..e644447297c 100644 --- a/src/crimson/net/Connection.h +++ b/src/crimson/net/Connection.h @@ -28,6 +28,7 @@ class Connection : public seastar::enable_shared_from_this { protected: entity_addr_t peer_addr; peer_type_t peer_type = -1; + int64_t peer_id = -1; public: uint64_t peer_global_id = 0; diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 505a67f8b3b..7a12511b1fd 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -4,6 +4,7 @@ #include "ProtocolV2.h" #include "include/msgr.h" +#include "include/random.h" #include "crimson/auth/AuthClient.h" #include "crimson/auth/AuthServer.h" @@ -323,6 +324,33 @@ void ProtocolV2::dispatch_reset() }); } +void ProtocolV2::reset_session(bool full) +{ + if (full) { + server_cookie = 0; + connect_seq = 0; + conn.in_seq = 0; + } else { + conn.out_seq = 0; + conn.in_seq = 0; + client_cookie = 0; + server_cookie = 0; + connect_seq = 0; + peer_global_seq = 0; + // TODO: + // discard_out_queue(); + // message_seq = 0; + // ack_left = 0; + seastar::with_gate(pending_dispatch, [this] { + return dispatcher.ms_handle_remote_reset( + seastar::static_pointer_cast(conn.shared_from_this())) + .handle_exception([this] (std::exception_ptr eptr) { + logger().error("{} ms_handle_remote_reset caust exception: {}", conn, eptr); + }); + }); + } +} + seastar::future ProtocolV2::banner_exchange() { // 1. prepare and send banner @@ -522,62 +550,205 @@ seastar::future<> ProtocolV2::client_auth(std::vector &allowed_methods seastar::future ProtocolV2::process_wait() { -//return read_frame_payload() -//.then([this] (bufferlist payload) { -// handle_wait() logic -// return false; -//}); - return seastar::make_ready_future(false); + return read_frame_payload() + .then([this] { + // handle_wait() logic + logger().debug("{} received WAIT (connection race)", conn); + WaitFrame::Decode(rx_segments_data.back()); + return false; + }); } seastar::future ProtocolV2::client_connect() { // send_client_ident() logic - // - // return read_main_preamble() - // .then([this] (Tag tag) { - // switch (tag) { - // case Tag::IDENT_MISSING_FEATURES: - // abort_in_fault(); - // case Tag::WAIT: - // return process_wait(); - // case Tag::SERVER_IDENT: - // - return seastar::make_ready_future(true); - // default: { - // unexpected_tag(tag, conn, "post_client_connect"); - // } - // } - // }); + if (!conn.policy.lossy && !client_cookie) { + client_cookie = ceph::util::generate_random_number(1, -1ll); + } + + // TODO: get socket address and learn(not supported by seastar) + entity_addr_t a; + a.u.sa.sa_family = AF_INET; + a.set_type(entity_addr_t::TYPE_MSGR2); + logger().debug("{} learn from addr {}", conn, a); + return messenger.learned_addr(a) + .then([this] { + uint64_t flags = 0; + if (conn.policy.lossy) { + flags |= CEPH_MSG_CONNECT_LOSSY; + } + + auto client_ident = ClientIdentFrame::Encode( + messenger.get_myaddrs(), + conn.target_addr, + messenger.get_myname().num(), + global_seq, + conn.policy.features_supported, + conn.policy.features_required | msgr2_required, flags, + client_cookie); + + logger().debug("{} sending identification: addrs={} target={} gid={}" + " global_seq={} features_supported={} features_required={}" + " flags={} cookie={}", + conn, messenger.get_myaddrs(), conn.target_addr, + messenger.get_myname().num(), global_seq, + conn.policy.features_supported, + conn.policy.features_required | msgr2_required, + flags, client_cookie); + return write_frame(client_ident); + }).then([this] { + return read_main_preamble(); + }).then([this] (Tag tag) { + switch (tag) { + case Tag::IDENT_MISSING_FEATURES: + return read_frame_payload() + .then([this] { + // handle_ident_missing_features() logic + auto ident_missing = IdentMissingFeaturesFrame::Decode(rx_segments_data.back()); + logger().error("{} client does not support all server features: {}", + conn, ident_missing.features()); + abort_in_fault(); + // won't be executed + return false; + }); + case Tag::WAIT: + return process_wait(); + case Tag::SERVER_IDENT: + return read_frame_payload() + .then([this] { + // handle_server_ident() logic + auto server_ident = ServerIdentFrame::Decode(rx_segments_data.back()); + logger().debug("{} received server identification:" + " addrs={} gid={} global_seq={}" + " features_supported={} features_required={}" + " flags={} cookie={}", + conn, + server_ident.addrs(), server_ident.gid(), + server_ident.global_seq(), + server_ident.supported_features(), + server_ident.required_features(), + server_ident.flags(), server_ident.cookie()); + + // is this who we intended to talk to? + // be a bit forgiving here, since we may be connecting based on addresses parsed out + // of mon_host or something. + if (!server_ident.addrs().contains(conn.target_addr)) { + logger().warn("{} peer identifies as {}, does not include {}", + conn, server_ident.addrs(), conn.target_addr); + abort_in_fault(); + } + + server_cookie = server_ident.cookie(); + + // TODO: change peer_addr to entity_addrvec_t + ceph_assert(conn.peer_addr == server_ident.addrs().front()); + peer_name = entity_name_t(conn.get_peer_type(), server_ident.gid()); + conn.set_features(server_ident.supported_features() & + conn.policy.features_supported); + peer_global_seq = server_ident.global_seq(); + + // TODO: lossless policy + ceph_assert(server_ident.flags() & CEPH_MSG_CONNECT_LOSSY); + conn.policy.lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY; + // TODO: backoff = utime_t(); + logger().debug("{} connect success {}, lossy={}, features={}", + conn, connect_seq, conn.policy.lossy, conn.features); + + return dispatcher.ms_handle_connect( + seastar::static_pointer_cast(conn.shared_from_this())) + .handle_exception([this] (std::exception_ptr eptr) { + logger().error("{} ms_handle_connect caust exception: {}", conn, eptr); + }); + }).then([this] { + return true; + }); + default: { + unexpected_tag(tag, conn, "post_client_connect"); + // won't be executed + return seastar::make_ready_future(false); + } + } + }); } seastar::future ProtocolV2::client_reconnect() { // send_reconnect() logic - // - // - // return read_main_preamble() - // .then([this] (Tag tag) { - // switch (tag) { - // case Tag::SESSION_RETRY_GLOBAL: - // - // return client_reconnect(); - // case Tag::SESSION_RETRY: - // - // return client_reconnect(); - // case Tag::SESSION_RESET: - // - // return client_connect(); - // case Tag::WAIT: - // return process_wait(); - // case Tag::SESSION_RECONNECT_OK: - // - return seastar::make_ready_future(true); - // default: { - // unexpected_tag(tag, conn, "post_client_reconnect"); - // } - // } - // }); + auto reconnect = ReconnectFrame::Encode(messenger.get_myaddrs(), + client_cookie, + server_cookie, + global_seq, + connect_seq, + conn.in_seq); + logger().debug("{} reconnect to session: client_cookie={}" + " server_cookie={} gs={} cs={} ms={}", + conn, client_cookie, server_cookie, + global_seq, connect_seq, conn.in_seq); + return write_frame(reconnect) + .then([this] { + return read_main_preamble(); + }).then([this] (Tag tag) { + switch (tag) { + case Tag::SESSION_RETRY_GLOBAL: + return read_frame_payload() + .then([this] { + // handle_session_retry_global() logic + auto retry = RetryGlobalFrame::Decode(rx_segments_data.back()); + global_seq = messenger.get_global_seq(retry.global_seq()); + logger().warn("{} received session retry global " + "global_seq={}, choose new gs={}", + conn, retry.global_seq(), global_seq); + return client_reconnect(); + }); + case Tag::SESSION_RETRY: + return read_frame_payload() + .then([this] { + // handle_session_retry() logic + auto retry = RetryFrame::Decode(rx_segments_data.back()); + connect_seq = retry.connect_seq() + 1; + logger().warn("{} received session retry connect_seq={}, inc to cs={}", + conn, retry.connect_seq(), connect_seq); + return client_reconnect(); + }); + case Tag::SESSION_RESET: + return read_frame_payload() + .then([this] { + // handle_session_reset() logic + auto reset = ResetFrame::Decode(rx_segments_data.back()); + logger().warn("{} received session reset full={}", reset.full()); + reset_session(reset.full()); + return client_connect(); + }); + case Tag::WAIT: + return process_wait(); + case Tag::SESSION_RECONNECT_OK: + return read_frame_payload() + .then([this] { + // handle_reconnect_ok() logic + auto reconnect_ok = ReconnectOkFrame::Decode(rx_segments_data.back()); + logger().debug("{} received reconnect ok:" + "sms={}, lossy={}, features={}", + conn, reconnect_ok.msg_seq(), + connect_seq, conn.policy.lossy, conn.features); + // TODO + // discard_requeued_up_to() + // backoff = utime_t(); + return dispatcher.ms_handle_connect( + seastar::static_pointer_cast( + conn.shared_from_this())) + .handle_exception([this] (std::exception_ptr eptr) { + logger().error("{} ms_handle_connect caust exception: {}", conn, eptr); + }); + }).then([this] { + return true; + }); + default: { + unexpected_tag(tag, conn, "post_client_reconnect"); + // won't be executed + return seastar::make_ready_future(false); + } + } + }); } void ProtocolV2::execute_connecting() @@ -609,9 +780,11 @@ void ProtocolV2::execute_connecting() }).then([this] { return client_auth(); }).then([this] { - if (1) { // TODO check connect or reconnect + if (!server_cookie) { + ceph_assert(connect_seq == 0); return client_connect(); } else { + ceph_assert(connect_seq > 0); // TODO: lossless policy ceph_assert(false); return client_reconnect(); @@ -725,85 +898,286 @@ seastar::future<> ProtocolV2::server_auth() seastar::future ProtocolV2::send_wait() { - // - // - return seastar::make_ready_future(false); + auto wait = WaitFrame::Encode(); + return write_frame(wait) + .then([this] { + return false; + }); } seastar::future ProtocolV2::handle_existing_connection(SocketConnectionRef existing) { + // handle_existing_connection() logic + logger().debug("{} {}: {}", conn, __func__, *existing); + + ProtocolV2 *exproto = dynamic_cast(existing->protocol.get()); + ceph_assert(exproto); + + if (exproto->state == state_t::CLOSING) { + logger().warn("{} existing {} already closed.", conn, *existing); + return send_server_ident() + .then([this] { + return true; + }); + } + + if (exproto->state == state_t::REPLACING) { + logger().warn("{} existing racing replace happened while replacing: {}", + conn, *existing); + return send_wait(); + } + + if (exproto->peer_global_seq > peer_global_seq) { + logger().warn("{} this is a stale connection, peer_global_seq={}" + "existing->peer_global_seq={} close this connection", + conn, peer_global_seq, exproto->peer_global_seq); + dispatch_reset(); + abort_in_close(); + } + + if (existing->policy.lossy) { + // existing connection can be thrown out in favor of this one + logger().warn("{} existing={} is a lossy channel. Close existing in favor of" + " this connection", conn, *existing); + exproto->dispatch_reset(); + exproto->close(); + return send_server_ident() + .then([this] { + return true; + }); + } + // TODO: lossless policy ceph_assert(false); } seastar::future ProtocolV2::server_connect() { - // handle_client_ident() logic - // - // - // - // - // return seastar::make_ready_future(false); - // - // return handle_existing_connection(existing); - // - return send_server_ident() + return read_frame_payload() + .then([this] { + // handle_client_ident() logic + auto client_ident = ClientIdentFrame::Decode(rx_segments_data.back()); + logger().debug("{} received client identification: addrs={} target={}" + " gid={} global_seq={} features_supported={}" + " features_required={} flags={} cookie={}", + conn, client_ident.addrs(), client_ident.target_addr(), + client_ident.gid(), client_ident.global_seq(), + client_ident.supported_features(), + client_ident.required_features(), + client_ident.flags(), client_ident.cookie()); + + if (client_ident.addrs().empty() || + client_ident.addrs().front() == entity_addr_t()) { + logger().error("{} oops, client_ident.addrs() is empty", conn); + abort_in_fault(); + } + if (!messenger.get_myaddrs().contains(client_ident.target_addr())) { + logger().error("{} peer is trying to reach {} which is not us ({})", + conn, client_ident.target_addr(), messenger.get_myaddrs()); + abort_in_fault(); + } + // TODO: change peer_addr to entity_addrvec_t + entity_addr_t paddr = client_ident.addrs().front(); + conn.peer_addr = conn.target_addr; + conn.peer_addr.set_type(paddr.get_type()); + conn.peer_addr.set_port(paddr.get_port()); + conn.peer_addr.set_nonce(paddr.get_nonce()); + logger().debug("{} got paddr={}, conn.peer_addr={}", conn, paddr, conn.peer_addr); + conn.target_addr = conn.peer_addr; + + peer_name = entity_name_t(conn.get_peer_type(), client_ident.gid()); + conn.peer_id = client_ident.gid(); + client_cookie = client_ident.cookie(); + + uint64_t feat_missing = + (conn.policy.features_required | msgr2_required) & + ~(uint64_t)client_ident.supported_features(); + if (feat_missing) { + logger().warn("{} peer missing required features {}", conn, feat_missing); + auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing); + return write_frame(ident_missing_features) .then([this] { - // goto ready - return true; + return false; }); + } + connection_features = + client_ident.supported_features() & conn.policy.features_supported; + + peer_global_seq = client_ident.global_seq(); + + // Looks good so far, let's check if there is already an existing connection + // to this peer. + + SocketConnectionRef existing = messenger.lookup_conn(conn.peer_addr); + + if (existing) { + if (existing->protocol->proto_type != 2) { + logger().warn("{} existing {} proto version is {}, close", + conn, *existing, existing->protocol->proto_type); + // should unregister the existing from msgr atomically + existing->close(); + } else { + return handle_existing_connection(existing); + } + } + + // if everything is OK reply with server identification + return send_server_ident() + .then([this] { + // goto ready + return true; + }); + }); } seastar::future ProtocolV2::read_reconnect() { - // return read_main_preamble() - // .then([this] (Tag tag) { - // expect_tag(Tag::SESSION_RECONNECT, tag, conn, "read_reconnect"); - return server_reconnect(); - // }); + return read_main_preamble() + .then([this] (Tag tag) { + expect_tag(Tag::SESSION_RECONNECT, tag, conn, "read_reconnect"); + return server_reconnect(); + }); } seastar::future ProtocolV2::send_retry(uint64_t connect_seq) { - // - // - return read_reconnect(); + auto retry = RetryFrame::Encode(connect_seq); + return write_frame(retry) + .then([this] { + return read_reconnect(); + }); } seastar::future ProtocolV2::send_retry_global(uint64_t global_seq) { - // - // - return read_reconnect(); + auto retry = RetryGlobalFrame::Encode(global_seq); + return write_frame(retry) + .then([this] { + return read_reconnect(); + }); } seastar::future ProtocolV2::send_reset(bool full) { - // - // - // return read_main_preamble() - // .then([this] (Tag tag) { - // expect_tag(Tag::CLIENT_IDENT, tag, conn, "post_send_reset"); - return server_connect(); - // }); + auto reset = ResetFrame::Encode(full); + return write_frame(reset) + .then([this] { + return read_main_preamble(); + }).then([this] (Tag tag) { + expect_tag(Tag::CLIENT_IDENT, tag, conn, "post_send_reset"); + return server_connect(); + }); } seastar::future ProtocolV2::server_reconnect() { - // handle_reconnect() logic - // - // - return send_reset(0); - // - // return send_retry_global(); - // - // return send_wait(); - // - // return send_retry(); - // - // return send_reset(); - // TODO: lossless policy - // return reuse_connection(existing, exproto); + return read_frame_payload() + .then([this] { + // handle_reconnect() logic + auto reconnect = ReconnectFrame::Decode(rx_segments_data.back()); + + logger().debug("{} received reconnect: client_cookie={} server_cookie={}" + " gs={} cs={} ms={}", + conn, reconnect.client_cookie(), reconnect.server_cookie(), + reconnect.global_seq(), reconnect.connect_seq(), + reconnect.msg_seq()); + + // can peer_addrs be changed on-the-fly? + if (conn.peer_addr != reconnect.addrs().front()) { + logger().error("{} peer identifies as {}, while conn.peer_addr={}", + conn, reconnect.addrs().front(), conn.peer_addr); + ceph_assert(false); + } + // TODO: change peer_addr to entity_addrvec_t + ceph_assert(conn.peer_addr == conn.target_addr); + peer_global_seq = reconnect.global_seq(); + + SocketConnectionRef existing = messenger.lookup_conn(conn.peer_addr); + + if (!existing) { + // there is no existing connection therefore cannot reconnect to previous + // session + logger().error("{} server_reconnect: no existing connection," + " reseting client", conn); + return send_reset(true); + } + + if (existing->protocol->proto_type != 2) { + logger().warn("{} server_reconnect: existing {} proto version is {}," + "close existing and resetting client.", conn, *existing); + existing->close(); + return send_reset(true); + } + + ProtocolV2 *exproto = dynamic_cast(existing->protocol.get()); + ceph_assert(exproto); + + if (exproto->state == state_t::REPLACING) { + logger().warn("{} server_reconnect: existing racing replace happened while " + " replacing, retry_global. existing={}", conn, *existing); + return send_retry_global(exproto->peer_global_seq); + } + + if (exproto->client_cookie != reconnect.client_cookie()) { + logger().warn("{} server_reconnect: existing={} client cookie mismatch," + " I must have reseted: cc={} rcc={}, reseting client.", + conn, *existing, exproto->client_cookie, reconnect.client_cookie()); + return send_reset(conn.policy.resetcheck); + } else if (exproto->server_cookie == 0) { + // this happens when: + // - a connects to b + // - a sends client_ident + // - b gets client_ident, sends server_ident and sets cookie X + // - connection fault + // - b reconnects to a with cookie X, connect_seq=1 + // - a has cookie==0 + logger().warn("{} server_reconnect: I was a client and didn't received the" + " server_ident. Asking peer to resume session establishment", + conn); + return send_reset(false); + } + + if (exproto->peer_global_seq > reconnect.global_seq()) { + logger().debug("{} server_reconnect: stale global_seq: sgs={} cgs={}," + " ask client to retry global", + conn, exproto->peer_global_seq, reconnect.global_seq()); + return send_retry_global(exproto->peer_global_seq); + } + + if (exproto->connect_seq > reconnect.connect_seq()) { + logger().debug("{} server_reconnect: stale connect_seq scs={} ccs={}," + " ask client to retry", + conn, exproto->connect_seq, reconnect.connect_seq()); + return send_retry(exproto->connect_seq); + } + + if (exproto->connect_seq == reconnect.connect_seq()) { + // reconnect race: both peers are sending reconnect messages + if (existing->peer_addr > messenger.get_myaddrs().msgr2_addr() && + !existing->policy.server) { + // the existing connection wins + logger().warn("{} server_reconnect: reconnect race detected," + " this connection loses to existing={}," + " ask client to wait", conn, *existing); + return send_wait(); + } else { + // this connection wins + logger().warn("{} server_reconnect: reconnect race detected," + " replacing existing={} socket by this connection's socket", + conn, *existing); + } + } + + logger().warn("{} server_reconnect: reconnect to exsiting={}", conn, *existing); + + // everything looks good + exproto->connect_seq = reconnect.connect_seq(); + //exproto->message_seq = reconnect.msg_seq(); + + // TODO: lossless policy + // return reuse_connection(existing, exproto); + ceph_assert(false); + }); } void ProtocolV2::execute_accepting() @@ -825,17 +1199,21 @@ void ProtocolV2::execute_accepting() conn.policy.standby, conn.policy.resetcheck); return server_auth(); }).then([this] { - // return read_main_preamble() - //}).then([this] (Tag tag) { - // switch (tag) { - // case Tag::CLIENT_IDENT: + return read_main_preamble(); + }).then([this] (Tag tag) { + switch (tag) { + case Tag::CLIENT_IDENT: return server_connect(); - // case Tag::SESSION_RECONNECT: - // return server_reconnect(); - // default: { - // unexpected_tag(tag, conn, "post_server_auth"); - // } - // } + case Tag::SESSION_RECONNECT: + // TODO: lossless policy + ceph_assert(false); + return server_reconnect(); + default: { + unexpected_tag(tag, conn, "post_server_auth"); + // won't be executed + return seastar::make_ready_future(false); + } + } }).then([this] (bool proceed_or_wait) { if (proceed_or_wait) { messenger.register_conn( @@ -896,9 +1274,52 @@ seastar::future<> ProtocolV2::finish_auth() seastar::future<> ProtocolV2::send_server_ident() { // send_server_ident() logic - // - return seastar::now(); + // this is required for the case when this connection is being replaced + // TODO + // out_seq = discard_requeued_up_to(out_seq, 0); + conn.in_seq = 0; + + if (!conn.policy.lossy) { + server_cookie = ceph::util::generate_random_number(1, -1ll); + } + + uint64_t flags = 0; + if (conn.policy.lossy) { + flags = flags | CEPH_MSG_CONNECT_LOSSY; + } + + // refered to async-conn v2: not assign gs to global_seq + uint64_t gs = messenger.get_global_seq(); + auto server_ident = ServerIdentFrame::Encode( + messenger.get_myaddrs(), + messenger.get_myname().num(), + gs, + conn.policy.features_supported, + conn.policy.features_required | msgr2_required, + flags, + server_cookie); + + logger().debug("{} sending server identification: addrs={} gid={}" + " global_seq={} features_supported={} features_required={}" + " flags={} cookie={}", + conn, messenger.get_myaddrs(), messenger.get_myname().num(), + gs, conn.policy.features_supported, + conn.policy.features_required | msgr2_required, + flags, server_cookie); + + conn.set_features(connection_features); + + // notify + seastar::with_gate(pending_dispatch, [this] { + return dispatcher.ms_handle_accept( + seastar::static_pointer_cast(conn.shared_from_this())) + .handle_exception([this] (std::exception_ptr eptr) { + logger().error("{} ms_handle_accept caust exception: {}", conn, eptr); + }); + }); + + return write_frame(server_ident); } // REPLACING state diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index 75a49c6cd30..08f935470fd 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -62,10 +62,15 @@ class ProtocolV2 final : public Protocol { void trigger_state(state_t state, write_state_t write_state, bool reentrant); + entity_name_t peer_name; uint64_t connection_features = 0; uint64_t peer_required_features = 0; + uint64_t client_cookie = 0; + uint64_t server_cookie = 0; uint64_t global_seq = 0; + uint64_t peer_global_seq = 0; + uint64_t connect_seq = 0; // TODO: Frame related implementations, probably to a separate class. private: @@ -94,6 +99,7 @@ class ProtocolV2 final : public Protocol { private: seastar::future<> fault(); void dispatch_reset(); + void reset_session(bool full); seastar::future banner_exchange(); // CONNECTING (client) -- 2.39.5