From: Ricardo Dias Date: Mon, 18 Feb 2019 14:29:28 +0000 (+0000) Subject: msg/async: msgr2: added client and server cookie to protocol X-Git-Tag: v14.1.0~42^2~3 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=b564d808fd405f93206dfcfba483ca9f20909acf;p=ceph-ci.git msg/async: msgr2: added client and server cookie to protocol Signed-off-by: Ricardo Dias --- diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index ce2ef6490b5..678e9f1d9d8 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -325,10 +325,15 @@ struct SignedEncryptedFrame : public PayloadFrame { }; struct ClientIdentFrame - : public SignedEncryptedFrame { + : public SignedEncryptedFrame { // client cookie const ProtocolV2::Tag tag = ProtocolV2::Tag::CLIENT_IDENT; using SignedEncryptedFrame::SignedEncryptedFrame; @@ -339,6 +344,7 @@ struct ClientIdentFrame inline uint64_t &supported_features() { return get_val<4>(); } inline uint64_t &required_features() { return get_val<5>(); } inline uint64_t &flags() { return get_val<6>(); } + inline uint64_t &cookie() { return get_val<7>(); } }; struct ServerIdentFrame @@ -358,36 +364,36 @@ struct ServerIdentFrame }; struct ReconnectFrame - : public SignedEncryptedFrame { + : public SignedEncryptedFrame { // message sequence const ProtocolV2::Tag tag = ProtocolV2::Tag::SESSION_RECONNECT; using SignedEncryptedFrame::SignedEncryptedFrame; inline entity_addrvec_t &addrs() { return get_val<0>(); } - inline entity_addr_t &target_addr() { return get_val<1>(); } - inline uint64_t &cookie() { return get_val<2>(); } - inline int64_t &gid() { return get_val<3>(); } - inline uint64_t &global_seq() { return get_val<4>(); } - inline uint64_t &connect_seq() { return get_val<5>(); } - inline uint64_t &supported_features() { return get_val<6>(); } - inline uint64_t &required_features() { return get_val<7>(); } - inline uint64_t &msg_seq() { return get_val<8>(); } + inline uint64_t &client_cookie() { return get_val<1>(); } + inline uint64_t &server_cookie() { return get_val<2>(); } + inline uint64_t &global_seq() { return get_val<3>(); } + inline uint64_t &connect_seq() { return get_val<4>(); } + inline uint64_t &msg_seq() { return get_val<5>(); } }; -struct ResetFrame : public Frame { +struct ResetFrame : public SignedEncryptedFrame { const ProtocolV2::Tag tag = ProtocolV2::Tag::SESSION_RESET; + using SignedEncryptedFrame::SignedEncryptedFrame; + + inline bool &full() { return get_val<0>(); } }; struct RetryFrame : public SignedEncryptedFrame { const ProtocolV2::Tag tag = ProtocolV2::Tag::SESSION_RETRY; using SignedEncryptedFrame::SignedEncryptedFrame; - uint64_t connect_seq() { return get_val<0>(); } + inline uint64_t &connect_seq() { return get_val<0>(); } }; struct RetryGlobalFrame @@ -467,7 +473,8 @@ ProtocolV2::ProtocolV2(AsyncConnection *connection) temp_buffer(nullptr), state(NONE), peer_required_features(0), - cookie(0), + client_cookie(0), + server_cookie(0), global_seq(0), connect_seq(0), peer_global_seq(0), @@ -537,7 +544,8 @@ void ProtocolV2::reset_session() { out_seq = 0; in_seq = 0; - cookie = 0; + client_cookie = 0; + server_cookie = 0; connect_seq = 0; peer_global_seq = 0; message_seq = 0; @@ -681,13 +689,6 @@ CtPtr ProtocolV2::_fault() { connection->fault(); reset_recv_state(); - if (state == ACCEPTING_SESSION && !reconnecting) { - // if the connection is generating a new session and fails during this - // process, we need to clean the cookie. Otherwise, the connection might - // later try to reconnect to a non-existing session, and the other side - // will force a SESSION_RESET, which will cause a drop of the out_queue. - cookie = 0; - } reconnecting = false; if (connection->policy.standby && out_queue.empty() && !keepalive && @@ -735,6 +736,10 @@ CtPtr ProtocolV2::_fault() { backoff.set_from_double(cct->_conf->ms_max_backoff); } + if (server_cookie) { + connect_seq++; + } + global_seq = messenger->get_global_seq(); state = START_CONNECT; connection->state = AsyncConnection::STATE_CONNECTING; @@ -1050,7 +1055,7 @@ void ProtocolV2::write_event() { connection->write_lock.lock(); if (state == STANDBY && !connection->policy.server && is_queued()) { ldout(cct, 10) << __func__ << " policy.server is false" << dendl; - if (cookie) { // only increment connect_seq if there is a session + if (server_cookie) { // only increment connect_seq if there is a session connect_seq++; } connection->_connect(); @@ -1443,6 +1448,7 @@ CtPtr ProtocolV2::handle_read_frame_length_and_tag(char *buffer, int r) { case Tag::SERVER_IDENT: case Tag::IDENT_MISSING_FEATURES: case Tag::SESSION_RECONNECT: + case Tag::SESSION_RESET: case Tag::SESSION_RETRY: case Tag::SESSION_RETRY_GLOBAL: case Tag::SESSION_RECONNECT_OK: @@ -1450,8 +1456,6 @@ CtPtr ProtocolV2::handle_read_frame_length_and_tag(char *buffer, int r) { case Tag::KEEPALIVE2_ACK: case Tag::ACK: return READ(next_payload_len, handle_frame_payload); - case Tag::SESSION_RESET: - return handle_session_reset(); case Tag::WAIT: return handle_wait(); case Tag::MESSAGE: @@ -1503,6 +1507,8 @@ CtPtr ProtocolV2::handle_frame_payload(char *buffer, int r) { return handle_ident_missing_features(buffer, next_payload_len); case Tag::SESSION_RECONNECT: return handle_reconnect(buffer, next_payload_len); + case Tag::SESSION_RESET: + return handle_session_reset(buffer, next_payload_len); case Tag::SESSION_RETRY: return handle_session_retry(buffer, next_payload_len); case Tag::SESSION_RETRY_GLOBAL: @@ -1545,8 +1551,9 @@ CtPtr ProtocolV2::ready() { connection->maybe_start_delay_thread(); state = READY; - ldout(cct, 1) << __func__ << " entity=" << peer_name << " cookie=" << std::hex - << cookie << std::dec << " in_seq=" << in_seq + ldout(cct, 1) << __func__ << " entity=" << peer_name << " client_cookie=" + << std::hex << client_cookie << " server_cookie=" + << server_cookie << std::dec << " in_seq=" << in_seq << " out_seq=" << out_seq << dendl; return CONTINUE(read_frame); @@ -2201,7 +2208,7 @@ CtPtr ProtocolV2::handle_auth_done(char *payload, uint32_t length) { auth_meta->connection_secret, CEPH_FEATURE_MSG_AUTH | CEPH_FEATURE_CEPHX_V2)); - if (!cookie) { + if (!server_cookie) { ceph_assert(connect_seq == 0); return send_client_ident(); } else { // reconnecting to previous session @@ -2213,6 +2220,10 @@ CtPtr ProtocolV2::handle_auth_done(char *payload, uint32_t length) { CtPtr ProtocolV2::send_client_ident() { ldout(cct, 20) << __func__ << dendl; + if (!connection->policy.lossy && !client_cookie) { + client_cookie = ceph::util::generate_random_number(1, -1ll); + } + uint64_t flags = 0; if (connection->policy.lossy) { flags |= CEPH_MSG_CONNECT_LOSSY; @@ -2251,11 +2262,11 @@ CtPtr ProtocolV2::send_client_ident() { } ClientIdentFrame client_ident(this, messenger->get_myaddrs(), - connection->target_addr, + connection->target_addr, messenger->get_myname().num(), global_seq, connection->policy.features_supported, connection->policy.features_required | msgr2_required, - flags); + flags, client_cookie); ldout(cct, 5) << __func__ << " sending identification: " << "addrs=" << messenger->get_myaddrs() @@ -2264,8 +2275,9 @@ CtPtr ProtocolV2::send_client_ident() { << " features_supported=" << std::hex << connection->policy.features_supported << " features_required=" - << (connection->policy.features_required | msgr2_required) - << " flags=" << flags << std::dec << dendl; + << (connection->policy.features_required | msgr2_required) + << " flags=" << flags + << " cookie=" << client_cookie << std::dec << dendl; bufferlist &bl = client_ident.get_buffer(); return WRITE(bl, "client ident", read_frame); @@ -2275,20 +2287,18 @@ CtPtr ProtocolV2::send_reconnect() { ldout(cct, 20) << __func__ << dendl; ReconnectFrame reconnect(this, messenger->get_myaddrs(), - connection->target_addr, - cookie, - messenger->get_myname().num(), - global_seq, + client_cookie, + server_cookie, + global_seq, connect_seq, - connection->policy.features_supported, - connection->policy.features_required | msgr2_required, - in_seq); + in_seq); - ldout(cct, 5) << __func__ << " reconnect to session: cookie=" << cookie + ldout(cct, 5) << __func__ << " reconnect to session: client_cookie=" + << std::hex << client_cookie << " server_cookie=" + << server_cookie << std::dec << " gs=" << global_seq << " cs=" << connect_seq << " ms=" << in_seq << dendl; - bufferlist &bl = reconnect.get_buffer(); - return WRITE(bl, "reconnect", read_frame); + return WRITE(reconnect.get_buffer(), "reconnect", read_frame); } CtPtr ProtocolV2::handle_ident_missing_features(char *payload, @@ -2303,11 +2313,20 @@ CtPtr ProtocolV2::handle_ident_missing_features(char *payload, return _fault(); } -CtPtr ProtocolV2::handle_session_reset() { +CtPtr ProtocolV2::handle_session_reset(char *payload, uint32_t length) { ldout(cct, 20) << __func__ << dendl; - ldout(cct, 1) << __func__ << " received session reset" << dendl; - reset_session(); + ResetFrame reset(this, payload, length); + + ldout(cct, 1) << __func__ << " received session reset full=" << reset.full() + << dendl; + if (reset.full()) { + reset_session(); + } else { + server_cookie = 0; + connect_seq = 0; + in_seq = 0; + } return send_client_ident(); } @@ -2393,7 +2412,7 @@ CtPtr ProtocolV2::handle_server_ident(char *payload, uint32_t length) { return _fault(); } - cookie = server_ident.cookie(); + server_cookie = server_ident.cookie(); connection->set_peer_addrs(server_ident.addrs()); peer_name = entity_name_t(connection->get_peer_type(), server_ident.gid()); @@ -2528,15 +2547,17 @@ CtPtr ProtocolV2::handle_client_ident(char *payload, uint32_t length) { ClientIdentFrame client_ident(this, payload, length); - ldout(cct, 5) << __func__ << " received client identification: " - << "addrs=" << client_ident.addrs() - << " target=" << client_ident.target_addr() + ldout(cct, 5) << __func__ << " received client identification:" + << " addrs=" << client_ident.addrs() + << " target=" << client_ident.target_addr() << " gid=" << client_ident.gid() << " global_seq=" << client_ident.global_seq() << " features_supported=" << std::hex << client_ident.supported_features() << " features_required=" << client_ident.required_features() - << " flags=" << client_ident.flags() << std::dec << dendl; + << " flags=" << client_ident.flags() + << " cookie=" << client_ident.cookie() << std::dec << dendl; + if (client_ident.addrs().empty() || client_ident.addrs().front() == entity_addr_t()) { return _fault(); // a v2 peer should never do this @@ -2555,6 +2576,8 @@ CtPtr ProtocolV2::handle_client_ident(char *payload, uint32_t length) { peer_name = entity_name_t(connection->get_peer_type(), client_ident.gid()); connection->set_peer_id(client_ident.gid()); + client_cookie = client_ident.cookie(); + uint64_t feat_missing = (connection->policy.features_required | msgr2_required) & ~(uint64_t)client_ident.supported_features(); @@ -2613,44 +2636,20 @@ CtPtr ProtocolV2::handle_reconnect(char *payload, uint32_t length) { ReconnectFrame reconnect(this, payload, length); ldout(cct, 5) << __func__ - << " received reconnect: cookie=" << reconnect.cookie() - << " target_addr=" << reconnect.target_addr() - << " gid=" << reconnect.gid() + << " received reconnect:" + << " client_cookie=" << std::hex << reconnect.client_cookie() + << " server_cookie=" << reconnect.server_cookie() << std::dec << " gs=" << reconnect.global_seq() << " cs=" << reconnect.connect_seq() << " ms=" << reconnect.msg_seq() - << dendl; - - if (!messenger->get_myaddrs().contains(reconnect.target_addr())) { - ldout(cct,5) << __func__ << " peer is trying to reach " - << reconnect.target_addr() - << " which is not us (" << messenger->get_myaddrs() << ")" - << dendl; - return _fault(); - } + << dendl; // Should we check if one of the ident.addrs match connection->target_addr // as we do in ProtocolV1? connection->set_peer_addrs(reconnect.addrs()); connection->target_addr = connection->_infer_target_addr(reconnect.addrs()); - - peer_name = entity_name_t(connection->get_peer_type(), reconnect.gid()); - connection_features = reconnect.supported_features() & - connection->policy.features_supported; peer_global_seq = reconnect.global_seq(); - uint64_t feat_missing = - (connection->policy.features_required | msgr2_required) & - ~(uint64_t)reconnect.supported_features(); - if (feat_missing) { - ldout(cct, 1) << __func__ << " peer missing required features " << std::hex - << feat_missing << std::dec << dendl; - IdentMissingFeaturesFrame ident_missing_features(this, feat_missing); - - bufferlist &bl = ident_missing_features.get_buffer(); - return WRITE(bl, "ident missing features", read_frame); - } - connection->lock.unlock(); AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs); @@ -2674,15 +2673,13 @@ CtPtr ProtocolV2::handle_reconnect(char *payload, uint32_t length) { return _fault(); } - ResetFrame reset; - bufferlist &bl = reset.get_buffer(); - if (!existing) { // there is no existing connection therefore cannot reconnect to previous // session ldout(cct, 0) << __func__ << " no existing connection exists, reseting client" << dendl; - return WRITE(bl, "session reset", read_frame); + ResetFrame reset(this, true); + return WRITE(reset.get_buffer(), "session reset", read_frame); } std::lock_guard l(existing->lock); @@ -2696,7 +2693,8 @@ CtPtr ProtocolV2::handle_reconnect(char *payload, uint32_t length) { if (exproto->state == CLOSED) { ldout(cct, 5) << __func__ << " existing " << existing << " already closed. Reseting client" << dendl; - return WRITE(bl, "session reset", read_frame); + ResetFrame reset(this, true); + return WRITE(reset.get_buffer(), "session reset", read_frame); } if (exproto->replacing) { @@ -2708,29 +2706,28 @@ CtPtr ProtocolV2::handle_reconnect(char *payload, uint32_t length) { return WRITE(bl, "session retry", read_frame); } - if (connection->policy.resetcheck) { - if (!exproto->cookie) { - // server connection was reseted, reset client - ldout(cct, 5) << __func__ << " no cookie set, reseting client" << dendl; - return WRITE(bl, "session reset", read_frame); - } else if (exproto->cookie != reconnect.cookie()) { - ldout(cct, 5) << __func__ << " cookie mismatch sc=" << exproto->cookie - << " cc=" << reconnect.cookie() << ", reseting client" - << dendl; - return WRITE(bl, "session reset", read_frame); - } - } else { - if (exproto->cookie == 0) { - // this happens when: - // - a connects to b - // - a sends client_ident - // - b gets client_ident, sends server_ident and sets cookie X - // - connection fault - // - b reconnects to a with cookie X, connect_seq=1 - // - a has cookie==0 - ldout(cct, 5) << __func__ << " no cookie set, setting" << dendl; - exproto->cookie = reconnect.cookie(); - } + if (exproto->client_cookie != reconnect.client_cookie()) { + ldout(cct, 1) << __func__ << " existing=" << existing + << " client cookie mismatch, I must have reseted:" + << " cc=" << std::hex << exproto->client_cookie + << " rcc=" << reconnect.client_cookie() + << ", reseting client." + << dendl; + ResetFrame reset(this, connection->policy.resetcheck); + return WRITE(reset.get_buffer(), "session reset", read_frame); + } else if (exproto->server_cookie == 0) { + // this happens when: + // - a connects to b + // - a sends client_ident + // - b gets client_ident, sends server_ident and sets cookie X + // - connection fault + // - b reconnects to a with cookie X, connect_seq=1 + // - a has cookie==0 + ldout(cct, 1) << __func__ << " I was a client and didn't received the" + << " server_ident. Asking peer to resume session" + << " establishment" << dendl; + ResetFrame reset(this, false); + return WRITE(reset.get_buffer(), "session reset", read_frame); } if (exproto->peer_global_seq > reconnect.global_seq()) { @@ -2739,8 +2736,7 @@ CtPtr ProtocolV2::handle_reconnect(char *payload, uint32_t length) { << " cgs=" << reconnect.global_seq() << ", ask client to retry global" << dendl; RetryGlobalFrame retry(this, exproto->peer_global_seq); - bufferlist &bl = retry.get_buffer(); - return WRITE(bl, "session retry", read_frame); + return WRITE(retry.get_buffer(), "session retry", read_frame); } if (exproto->connect_seq >= reconnect.connect_seq()) { @@ -2749,8 +2745,7 @@ CtPtr ProtocolV2::handle_reconnect(char *payload, uint32_t length) { << " ccs=" << reconnect.connect_seq() << " , ask client to retry" << dendl; RetryFrame retry(this, exproto->connect_seq); - bufferlist &bl = retry.get_buffer(); - return WRITE(bl, "session retry", read_frame); + return WRITE(retry.get_buffer(), "session retry", read_frame); } ldout(cct, 1) << __func__ << " reconnect to existing=" << existing << dendl; @@ -2810,7 +2805,8 @@ CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) { return send_server_ident(); } - if (exproto->cookie) { + if (exproto->server_cookie && exproto->client_cookie && + exproto->client_cookie != client_cookie) { // Found previous session // peer has reseted and we're going to reuse the existing connection // by replacing the communication socket @@ -2822,6 +2818,14 @@ CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) { return reuse_connection(existing, exproto); } + if (exproto->client_cookie == client_cookie) { + // session establishment interrupted between client_ident and server_ident, + // continuing... + ldout(cct, 1) << __func__ << " found previous session existing=" << existing + << ", continuing session establishment." << dendl; + return reuse_connection(existing, exproto); + } + if (exproto->state == READY || exproto->state == STANDBY) { ldout(cct, 1) << __func__ << " existing=" << existing << " is READY/STANDBY, lets reuse it" << dendl; @@ -2877,6 +2881,8 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing, } exproto->reset_recv_state(); + exproto->client_cookie = client_cookie; + exproto->peer_name = peer_name; exproto->peer_global_seq = peer_global_seq; exproto->connection_features = connection_features; @@ -2979,7 +2985,7 @@ CtPtr ProtocolV2::send_server_ident() { in_seq = 0; if (!connection->policy.lossy) { - cookie = ceph::util::generate_random_number(0, -1ll); + server_cookie = ceph::util::generate_random_number(1, -1ll); } uint64_t flags = 0; @@ -2993,7 +2999,7 @@ CtPtr ProtocolV2::send_server_ident() { connection->policy.features_supported, connection->policy.features_required | msgr2_required, flags, - cookie); + server_cookie); ldout(cct, 5) << __func__ << " sending identification:" << " addrs=" << messenger->get_myaddrs() @@ -3001,8 +3007,8 @@ CtPtr ProtocolV2::send_server_ident() { << " global_seq=" << gs << " features_supported=" << std::hex << connection->policy.features_supported << " features_required=" - << (connection->policy.features_required | msgr2_required) - << " flags=" << flags << " cookie=" << std::dec << cookie + << (connection->policy.features_required | msgr2_required) + << " flags=" << flags << " cookie=" << std::dec << server_cookie << dendl; connection->lock.unlock(); diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h index 64bc10f4c1a..9edcd473da0 100644 --- a/src/msg/async/ProtocolV2.h +++ b/src/msg/async/ProtocolV2.h @@ -77,7 +77,8 @@ private: uint64_t peer_required_features; std::shared_ptr session_security; - uint64_t cookie; + uint64_t client_cookie; + uint64_t server_cookie; uint64_t global_seq; uint64_t connect_seq; uint64_t peer_global_seq; @@ -223,7 +224,7 @@ private: Ct *send_client_ident(); Ct *send_reconnect(); Ct *handle_ident_missing_features(char *payload, uint32_t length); - Ct *handle_session_reset(); + Ct *handle_session_reset(char *payload, uint32_t length); Ct *handle_session_retry(char *payload, uint32_t length); Ct *handle_session_retry_global(char *payload, uint32_t length); Ct *handle_wait();