From: Ricardo Dias Date: Thu, 8 Nov 2018 17:15:03 +0000 (+0000) Subject: msg/async: msgr2: implement reconnect X-Git-Tag: v14.1.0~271^2~49 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=2063fa9ecf5bfc105e1b54b0c8cc89094e23e69e;p=ceph-ci.git msg/async: msgr2: implement reconnect Signed-off-by: Ricardo Dias --- diff --git a/src/msg/async/Protocol.h b/src/msg/async/Protocol.h index 4eba36371a5..ddf8bafe363 100644 --- a/src/msg/async/Protocol.h +++ b/src/msg/async/Protocol.h @@ -46,7 +46,7 @@ public: } }; -#define CONTINUATION_DECL(C, F, ...) \ +#define CONTINUATION_DECL(C, F, ...) \ std::unique_ptr> F##_cont_ = \ std::make_unique>(&C::F); \ CtFun *F##_cont = F##_cont_.get() @@ -64,7 +64,16 @@ public: } \ } -#define READ_HANDLER_CONTINUATION_DECL(C, F) CONTINUATION_DECL(C, F, char*, int) +#define CONTINUATION_RUN2(I, CT) \ + { \ + Ct::type> *_cont = CT; \ + while (_cont) { \ + _cont = _cont->call(I); \ + } \ + } + +#define READ_HANDLER_CONTINUATION_DECL(C, F) \ + CONTINUATION_DECL(C, F, char *, int) #define WRITE_HANDLER_CONTINUATION_DECL(C, F) CONTINUATION_DECL(C, F, int) ////////////////////////////////////////////////////////////////////// diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index 9323e71f2c8..b8d29b641a5 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -54,9 +54,11 @@ ProtocolV2::ProtocolV2(AsyncConnection *connection) state(NONE), peer_required_features(0), cookie(0), + connect_seq(0), + peer_global_seq(0), message_seq(0), + replacing(false), can_write(false), - connect_seq(0), bannerExchangeCallback(nullptr), next_frame_len(0), keepalive(false) { @@ -95,6 +97,30 @@ void ProtocolV2::discard_out_queue() { out_queue.clear(); } +void ProtocolV2::reset_session() { + ldout(cct, 20) << __func__ << dendl; + + std::lock_guard l(connection->write_lock); + if (connection->delay_state) { + connection->delay_state->discard(); + } + + connection->dispatch_queue->discard_queue(connection->conn_id); + discard_out_queue(); + connection->outcoming_bl.clear(); + + connection->dispatch_queue->queue_remote_reset(connection); + + out_seq = 0; + in_seq = 0; + cookie = 0; + connect_seq = 0; + peer_global_seq = 0; + message_seq = 0; + ack_left = 0; + can_write = false; +} + void ProtocolV2::stop() { ldout(cct, 2) << __func__ << dendl; if (state == CLOSED) { @@ -131,6 +157,28 @@ void ProtocolV2::requeue_sent() { } } +uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) { + ldout(cct, 10) << __func__ << " " << seq << dendl; + std::lock_guard l(connection->write_lock); + if (out_queue.count(CEPH_MSG_PRIO_HIGHEST) == 0) { + return seq; + } + list > &rq = out_queue[CEPH_MSG_PRIO_HIGHEST]; + uint64_t count = out_seq; + while (!rq.empty()) { + pair p = rq.front(); + if (p.second->get_seq() == 0 || p.second->get_seq() > seq) break; + ldout(cct, 10) << __func__ << " " << *(p.second) << " for resend seq " + << p.second->get_seq() << " <= " << seq << ", discarding" + << dendl; + p.second->put(); + rq.pop_front(); + count++; + } + if (rq.empty()) out_queue.erase(CEPH_MSG_PRIO_HIGHEST); + return count; +} + void ProtocolV2::reset_recv_state() { // clean read and write callbacks connection->pendingReadLen.reset(); @@ -190,15 +238,16 @@ CtPtr ProtocolV2::_fault() { requeue_sent(); if (out_queue.empty() && state >= START_ACCEPT && - state <= ACCEPTED_CLIENT_IDENT) { + state <= ACCEPTING_SESSION) { ldout(cct, 10) << __func__ << " with nothing to send and in the half " << " accept state just closed" << dendl; + connection->write_lock.unlock(); stop(); connection->dispatch_queue->queue_reset(connection); - connection->write_lock.unlock(); return nullptr; } + replacing = false; connection->fault(); reset_recv_state(); @@ -297,7 +346,7 @@ void ProtocolV2::send_message(Message *m) { out_queue[m->get_priority()].emplace_back(std::move(bl), m); ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m << dendl; - if (can_write) { + if ((!replacing && can_write) || state == STANDBY) { connection->center->dispatch_event_external(connection->write_handler); } } @@ -520,6 +569,9 @@ void ProtocolV2::write_event() { connection->write_lock.lock(); if (state == STANDBY && !connection->policy.server && is_queued()) { ldout(cct, 10) << __func__ << " policy.server is false" << dendl; + if (cookie) { // only increment connect_seq if there is a session + connect_seq++; + } connection->_connect(); } else if (connection->cs && state != NONE && state != CLOSED && state != START_CONNECT) { @@ -537,7 +589,9 @@ void ProtocolV2::write_event() { } } -bool ProtocolV2::is_queued() { return false; } +bool ProtocolV2::is_queued() { + return !out_queue.empty() || connection->is_queued(); +} CtPtr ProtocolV2::read(CONTINUATION_PARAM(next, ProtocolV2, char *, int), int len, char *buffer) { @@ -650,7 +704,7 @@ unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX); ceph_assert(state == ACCEPTING); connection->policy = messenger->get_policy(peer_type); - ldout(cct, 10) << __func__ << " accept of host_type " << peer_type + ldout(cct, 10) << __func__ << " accept of host_type " << (int)peer_type << ", policy.lossy=" << connection->policy.lossy << " policy.server=" << connection->policy.server << " policy.standby=" << connection->policy.standby @@ -692,6 +746,17 @@ unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX); this->peer_required_features = peer_required_features; + if (cct->_conf->ms_inject_internal_delays && + cct->_conf->ms_inject_socket_failures) { + if (rand() % cct->_conf->ms_inject_socket_failures == 0) { + ldout(cct, 10) << __func__ << " sleep for " + << cct->_conf->ms_inject_internal_delays << dendl; + utime_t t; + t.set_from_double(cct->_conf->ms_inject_internal_delays); + t.sleep(); + } + } + CtPtr callback; callback = bannerExchangeCallback; bannerExchangeCallback = nullptr; @@ -731,10 +796,18 @@ CtPtr ProtocolV2::handle_read_frame_length_and_tag(char *buffer, int r) { case Tag::AUTH_DONE: case Tag::IDENT: case Tag::IDENT_MISSING_FEATURES: + case Tag::SESSION_RECONNECT: + case Tag::SESSION_RETRY: + case Tag::SESSION_RETRY_GLOBAL: + case Tag::SESSION_RECONNECT_OK: case Tag::KEEPALIVE2: case Tag::KEEPALIVE2_ACK: case Tag::ACK: return READ(next_frame_len, handle_frame_payload); + case Tag::SESSION_RESET: + return handle_session_reset(); + case Tag::WAIT: + return handle_wait(); case Tag::MESSAGE: return handle_message(); } @@ -766,6 +839,14 @@ CtPtr ProtocolV2::handle_frame_payload(char *buffer, int r) { return handle_ident(buffer, next_frame_len); case Tag::IDENT_MISSING_FEATURES: return handle_ident_missing_features(buffer, next_frame_len); + case Tag::SESSION_RECONNECT: + return handle_reconnect(buffer, next_frame_len); + case Tag::SESSION_RETRY: + return handle_session_retry(buffer, next_frame_len); + case Tag::SESSION_RETRY_GLOBAL: + return handle_session_retry_global(buffer, next_frame_len); + case Tag::SESSION_RECONNECT_OK: + return handle_reconnect_ok(buffer, next_frame_len); case Tag::KEEPALIVE2: return handle_keepalive2(buffer, next_frame_len); case Tag::KEEPALIVE2_ACK: @@ -857,8 +938,9 @@ CtPtr ProtocolV2::ready() { } } - state = READY; + connection->maybe_start_delay_thread(); + state = READY; return CONTINUE(read_frame); } @@ -1330,6 +1412,8 @@ CtPtr ProtocolV2::start_client_banner_exchange() { ldout(cct, 20) << __func__ << dendl; state = CONNECTING; + global_seq = messenger->get_global_seq(); + return _banner_exchange(CONTINUATION(post_client_banner_exchange)); } @@ -1409,7 +1493,13 @@ CtPtr ProtocolV2::handle_auth_done(char *payload, uint32_t length) { ldout(cct, 1) << __func__ << " authentication done," << " flags=" << auth_done.flags << dendl; - return send_client_ident(); + if (!cookie) { + ceph_assert(connect_seq == 0); + return send_client_ident(); + } else { // reconnecting to previous session + ceph_assert(connect_seq > 0); + return send_reconnect(); + } } CtPtr ProtocolV2::send_client_ident() { @@ -1420,21 +1510,21 @@ CtPtr ProtocolV2::send_client_ident() { flags |= CEPH_MSG_CONNECT_LOSSY; } - cookie = ceph::util::generate_random_number(0, -1ll); - - IdentFrame ident(messenger->get_myaddrs(), messenger->get_myname().num(), - connection->policy.features_supported, - connection->policy.features_required, flags, cookie); + ClientIdentFrame client_ident(messenger->get_myaddrs(), + messenger->get_myname().num(), global_seq, + connection->policy.features_supported, + connection->policy.features_required, flags); ldout(cct, 5) << __func__ << " sending identification: " - << "addrs: " << ident.addrs << " gid: " << ident.gid - << " features_supported: " << std::hex - << ident.supported_features - << " features_required: " << ident.required_features - << " flags: " << ident.flags << " cookie: " << std::dec - << ident.cookie << dendl; - - bufferlist &bl = ident.get_buffer(); + << "addrs=" << messenger->get_myaddrs() + << " gid=" << messenger->get_myname().num() + << " global_seq=" << global_seq + << " features_supported=" << std::hex + << connection->policy.features_supported + << " features_required=" << connection->policy.features_required + << " flags=" << flags << std::dec << dendl; + + bufferlist &bl = client_ident.get_buffer(); return WRITE(bl, handle_client_ident_write); } @@ -1450,6 +1540,31 @@ CtPtr ProtocolV2::handle_client_ident_write(int r) { return CONTINUE(read_frame); } +CtPtr ProtocolV2::send_reconnect() { + ldout(cct, 20) << __func__ << dendl; + + ReconnectFrame reconnect(messenger->get_myaddrs(), cookie, global_seq, + connect_seq, in_seq); + + ldout(cct, 5) << __func__ << " reconnect to session: cookie=" << cookie + << " gs=" << global_seq << " cs=" << connect_seq + << " ms=" << in_seq << dendl; + bufferlist &bl = reconnect.get_buffer(); + return WRITE(bl, handle_reconnect_write); +} + +CtPtr ProtocolV2::handle_reconnect_write(int r) { + ldout(cct, 20) << __func__ << " r=" << r << dendl; + + if (r < 0) { + ldout(cct, 1) << __func__ << " reconnect write failed r=" << r << " (" + << cpp_strerror(r) << ")" << dendl; + return _fault(); + } + + return CONTINUE(read_frame); +} + CtPtr ProtocolV2::handle_ident_missing_features(char *payload, uint32_t length) { ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; @@ -1462,23 +1577,106 @@ CtPtr ProtocolV2::handle_ident_missing_features(char *payload, return _fault(); } +CtPtr ProtocolV2::handle_session_reset() { + ldout(cct, 20) << __func__ << dendl; + + ldout(cct, 1) << __func__ << " received session reset" << dendl; + reset_session(); + + return send_client_ident(); +} + +CtPtr ProtocolV2::handle_session_retry(char *payload, uint32_t length) { + ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; + + RetryFrame retry(payload, length); + connect_seq = retry.connect_seq + 1; + + ldout(cct, 1) << __func__ + << " received session retry connect_seq=" << retry.connect_seq + << ", inc to cs=" << connect_seq << dendl; + + return send_reconnect(); +} + +CtPtr ProtocolV2::handle_session_retry_global(char *payload, uint32_t length) { + ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; + + RetryGlobalFrame retry(payload, length); + global_seq = messenger->get_global_seq(retry.global_seq); + + ldout(cct, 1) << __func__ << " received session retry global global_seq=" + << retry.global_seq << ", choose new gs=" << global_seq + << dendl; + + return send_reconnect(); +} + +CtPtr ProtocolV2::handle_wait() { + ldout(cct, 20) << __func__ << dendl; + ldout(cct, 1) << __func__ << " received WAIT (connection race)" << dendl; + state = WAIT; + return _fault(); +} + +CtPtr ProtocolV2::handle_reconnect_ok(char *payload, uint32_t length) { + ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; + + ReconnectOkFrame reconnect_ok(payload, length); + ldout(cct, 5) << __func__ + << " reconnect accepted: sms=" << reconnect_ok.msg_seq << dendl; + + out_seq = discard_requeued_up_to(out_seq, reconnect_ok.msg_seq); + + backoff = utime_t(); + ldout(cct, 10) << __func__ << " reconnect success " << connect_seq + << ", lossy = " << connection->policy.lossy << ", features " + << connection->get_features() << dendl; + + if (connection->delay_state) { + ceph_assert(connection->delay_state->ready()); + } + + connection->dispatch_queue->queue_connect(connection); + messenger->ms_deliver_handle_fast_connect(connection); + + return ready(); +} + CtPtr ProtocolV2::handle_server_ident(char *payload, uint32_t length) { ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; - IdentFrame server_ident(payload, length); + ServerIdentFrame server_ident(payload, length); ldout(cct, 5) << __func__ << " received server identification: " - << "addrs: " << server_ident.addrs - << " gid: " << server_ident.gid - << " features_supported: " << std::hex + << "addrs=" << server_ident.addrs << " gid=" << server_ident.gid + << " global_seq=" << server_ident.global_seq + << " features_supported=" << std::hex << server_ident.supported_features - << " features_required: " << server_ident.required_features - << " flags: " << server_ident.flags << " cookie: " << std::dec + << " features_required=" << server_ident.required_features + << " flags=" << server_ident.flags << " cookie=" << std::dec << server_ident.cookie << dendl; + cookie = server_ident.cookie; + connection->set_peer_addrs(server_ident.addrs); connection->peer_global_id = server_ident.gid; - connection->set_features(server_ident.required_features & + connection->set_features(server_ident.supported_features & connection->policy.features_supported); + peer_global_seq = server_ident.global_seq; + + connection->policy.lossy = server_ident.flags & CEPH_MSG_CONNECT_LOSSY; + + backoff = utime_t(); + ldout(cct, 10) << __func__ << " connect success " << connect_seq + << ", lossy = " << connection->policy.lossy << ", features " + << connection->get_features() << dendl; + + if (connection->delay_state) { + ceph_assert(connection->delay_state->ready()); + } + + connection->dispatch_queue->queue_connect(connection); + messenger->ms_deliver_handle_fast_connect(connection); return ready(); } @@ -1614,16 +1812,15 @@ CtPtr ProtocolV2::handle_auth_done_write(int r) { CtPtr ProtocolV2::handle_client_ident(char *payload, uint32_t length) { ldout(cct, 20) << __func__ << " payload_len=" << std::dec << length << dendl; - IdentFrame client_ident(payload, length); + ClientIdentFrame client_ident(payload, length); ldout(cct, 5) << __func__ << " received client identification: " - << "addrs: " << client_ident.addrs - << " gid: " << client_ident.gid - << " features_supported: " << std::hex + << "addrs=" << client_ident.addrs << " gid=" << client_ident.gid + << " global_seq=" << client_ident.global_seq + << " features_supported=" << std::hex << client_ident.supported_features - << " features_required: " << client_ident.required_features - << " flags: " << client_ident.flags << " cookie: " << std::dec - << client_ident.cookie << dendl; + << " features_required=" << client_ident.required_features + << " flags=" << client_ident.flags << std::dec << dendl; if (client_ident.addrs.empty()) { connection->set_peer_addr(connection->target_addr); @@ -1631,6 +1828,7 @@ CtPtr ProtocolV2::handle_client_ident(char *payload, uint32_t length) { // Should we check if one of the ident.addrs match connection->target_addr // as we do in ProtocolV1? connection->set_peer_addrs(client_ident.addrs); + connection->target_addr = client_ident.addrs.msgr2_addr(); } uint64_t feat_missing = connection->policy.features_required & @@ -1644,27 +1842,404 @@ CtPtr ProtocolV2::handle_client_ident(char *payload, uint32_t length) { return WRITE(bl, handle_ident_missing_features_write); } - state = ACCEPTED_CLIENT_IDENT; + connection_features = + client_ident.supported_features & connection->policy.features_supported; + + state = ACCEPTING_SESSION; + peer_global_seq = client_ident.global_seq; + + // Looks good so far, let's check if there is already an existing connection + // to this peer. + + connection->lock.unlock(); + AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs); + + connection->inject_delay(); + + connection->lock.lock(); + if (state != ACCEPTING_SESSION) { + ldout(cct, 1) << __func__ + << " state changed while accept, it must be mark_down" + << dendl; + ceph_assert(state == CLOSED); + return _fault(); + } + + if (existing) { + return handle_existing_connection(existing); + } // if everything is OK reply with server identification - connection->peer_global_id = client_ident.gid; - cookie = client_ident.cookie; + return send_server_ident(); +} + +CtPtr ProtocolV2::handle_ident_missing_features_write(int r) { + ldout(cct, 20) << __func__ << " r=" << r << dendl; + + if (r < 0) { + ldout(cct, 1) << __func__ << " ident missing features write failed r=" << r + << " (" << cpp_strerror(r) << ")" << dendl; + return _fault(); + } + + return CONTINUE(read_frame); +} + +CtPtr ProtocolV2::handle_reconnect(char *payload, uint32_t length) { + ldout(cct, 20) << __func__ << " payload_len=" << std::dec << length << dendl; + + ReconnectFrame reconnect(payload, length); + + ldout(cct, 5) << __func__ + << " received reconnect: cookie=" << reconnect.cookie + << " gs=" << reconnect.global_seq + << " cs=" << reconnect.connect_seq + << " ms=" << reconnect.msg_seq << dendl; + + if (reconnect.addrs.empty()) { + connection->set_peer_addr(connection->target_addr); + } else { + // Should we check if one of the ident.addrs match connection->target_addr + // as we do in ProtocolV1? + connection->set_peer_addrs(reconnect.addrs); + connection->target_addr = reconnect.addrs.msgr2_addr(); + } + + connection->lock.unlock(); + AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs); + + connection->inject_delay(); + + connection->lock.lock(); + if (state != ACCEPTING) { + ldout(cct, 1) << __func__ + << " state changed while accept, it must be mark_down" + << dendl; + ceph_assert(state == CLOSED); + return _fault(); + } + + ResetFrame reset; + bufferlist &bl = reset.get_buffer(); + + if (!existing) { + // there is no existing connection therefore cannot reconnect to previous + // session + ldout(cct, 0) << __func__ + << " no existing connection exists, reseting client" << dendl; + return WRITE(bl, handle_session_reset_write); + } + + std::lock_guard l(existing->lock); + + ProtocolV2 *exproto = dynamic_cast(existing->protocol.get()); + if (!exproto) { + ldout(cct, 1) << __func__ << " existing=" << existing << dendl; + ceph_assert(false); + } + + if (exproto->state == CLOSED) { + ldout(cct, 5) << __func__ << " existing " << existing + << " already closed. Reseting client" << dendl; + return WRITE(bl, handle_session_reset_write); + } + + if (exproto->replacing) { + ldout(cct, 1) << __func__ + << " existing racing replace happened while replacing." + << " existing=" << existing << dendl; + RetryGlobalFrame retry(exproto->peer_global_seq); + bufferlist &bl = retry.get_buffer(); + return WRITE(bl, handle_session_retry_write); + } + + if (!exproto->cookie) { + // server connection was reseted, reset client + ldout(cct, 5) << __func__ << " no cookie set, reseting client" << dendl; + return WRITE(bl, handle_session_reset_write); + } else if (exproto->cookie != reconnect.cookie) { + ldout(cct, 5) << __func__ << " cookie mismatch sc=" << exproto->cookie + << " cc=" << reconnect.cookie << ", reseting client" << dendl; + return WRITE(bl, handle_session_reset_write); + } + + if (exproto->peer_global_seq > reconnect.global_seq) { + ldout(cct, 5) << __func__ + << " stale global_seq: sgs=" << exproto->peer_global_seq + << " cgs=" << reconnect.global_seq + << ", ask client to retry global" << dendl; + RetryGlobalFrame retry(exproto->peer_global_seq); + bufferlist &bl = retry.get_buffer(); + return WRITE(bl, handle_session_retry_write); + } + + if (exproto->connect_seq >= reconnect.connect_seq) { + ldout(cct, 5) << __func__ + << " stale connect_seq scs=" << exproto->connect_seq + << " ccs=" << reconnect.connect_seq + << " , ask client to retry" << dendl; + RetryFrame retry(exproto->connect_seq); + bufferlist &bl = retry.get_buffer(); + return WRITE(bl, handle_session_retry_write); + } + + // everything looks good + exproto->connect_seq = reconnect.connect_seq; + exproto->message_seq = reconnect.msg_seq; + + return reuse_connection(existing, exproto, true); +} + +CtPtr ProtocolV2::handle_session_reset_write(int r) { + ldout(cct, 20) << __func__ << " r=" << r << dendl; + + if (r < 0) { + ldout(cct, 1) << __func__ << " session reset write failed r=" << r << " (" + << cpp_strerror(r) << ")" << dendl; + return _fault(); + } + + return CONTINUE(read_frame); +} + +CtPtr ProtocolV2::handle_session_retry_write(int r) { + ldout(cct, 20) << __func__ << " r=" << r << dendl; + + if (r < 0) { + ldout(cct, 1) << __func__ << " session retry write failed r=" << r << " (" + << cpp_strerror(r) << ")" << dendl; + return _fault(); + } + + return CONTINUE(read_frame); +} + +CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) { + ldout(cct, 20) << __func__ << " existing=" << existing << dendl; + + std::lock_guard l(existing->lock); + + ProtocolV2 *exproto = dynamic_cast(existing->protocol.get()); + if (!exproto) { + ldout(cct, 1) << __func__ << " existing=" << existing << dendl; + ceph_assert(false); + } + + if (exproto->state == CLOSED) { + ldout(cct, 1) << __func__ << " existing " << existing << " already closed." + << dendl; + return send_server_ident(); + } + + if (exproto->replacing) { + ldout(cct, 1) << __func__ + << " existing racing replace happened while replacing." + << " existing=" << existing << dendl; + WaitFrame wait; + bufferlist &bl = wait.get_buffer(); + return WRITE(bl, handle_wait_write); + } + + if (existing->policy.lossy) { + // existing connection can be thrown out in favor of this one + ldout(cct, 1) + << __func__ + << " accept replacing existing (lossy) channel (new one lossy=" + << connection->policy.lossy << ")" << dendl; + existing->protocol->stop(); + existing->dispatch_queue->queue_reset(existing.get()); + return send_server_ident(); + } + + if (exproto->cookie) { // Found previous session + // peer has reseted and we're going to reuse the existing connection + // by replacing the communication socket + if (connection->policy.resetcheck) { + exproto->reset_session(); + } + return reuse_connection(existing, exproto, false); + } + + if (exproto->state == READY || exproto->state == STANDBY) { + ldout(cct, 10) << __func__ + << " existing connection is READY/STANDBY, lets reuse it" + << dendl; + return reuse_connection(existing, exproto, false); + } + + // Looks like a connection race: server and client are both connecting to + // each other at the same time. + if (connection->peer_addrs->msgr2_addr() < + messenger->get_myaddrs().msgr2_addr() || + existing->policy.server) { + // this connection wins + ldout(cct, 10) << __func__ + << " connection race detected, replacing existing=" + << existing << " socket by this connection's socket" + << dendl; + return reuse_connection(existing, exproto, false); + } else { + // the existing connection wins + ldout(cct, 10) << __func__ + << " connection race detected, this connection loses" + << dendl; + ceph_assert(connection->peer_addrs->msgr2_addr() > + messenger->get_myaddrs().msgr2_addr()); + WaitFrame wait; + bufferlist &bl = wait.get_buffer(); + return WRITE(bl, handle_wait_write); + } +} + +CtPtr ProtocolV2::handle_wait_write(int r) { + ldout(cct, 20) << __func__ << " r=" << r << dendl; + + if (r < 0) { + ldout(cct, 1) << __func__ << " wait write failed r=" << r << " (" + << cpp_strerror(r) << ")" << dendl; + return _fault(); + } + + return CONTINUE(read_frame); +} + +CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing, + ProtocolV2 *exproto, bool reconnect) { + ldout(cct, 20) << __func__ << " existing=" << existing + << " reconnect=" << reconnect << dendl; + + connection->inject_delay(); + + std::lock_guard l(existing->write_lock); + + connection->center->delete_file_event(connection->cs.fd(), + EVENT_READABLE | EVENT_WRITABLE); + + if (existing->delay_state) { + existing->delay_state->flush(); + ceph_assert(!connection->delay_state); + } + exproto->reset_recv_state(); + if (!reconnect) { + exproto->peer_global_seq = peer_global_seq; + exproto->connection_features = connection_features; + } + + auto temp_cs = std::move(connection->cs); + EventCenter *new_center = connection->center; + Worker *new_worker = connection->worker; + // avoid _stop shutdown replacing socket + // queue a reset on the new connection, which we're dumping for the old + stop(); + + connection->dispatch_queue->queue_reset(connection); + ldout(messenger->cct, 1) << __func__ << " stop myself to swap existing" + << dendl; + exproto->can_write = false; + exproto->replacing = true; + existing->state_offset = 0; + // avoid previous thread modify event + exproto->state = NONE; + existing->state = AsyncConnection::STATE_NONE; + // Discard existing prefetch buffer in `recv_buf` + existing->recv_start = existing->recv_end = 0; + // there shouldn't exist any buffer + ceph_assert(connection->recv_start == connection->recv_end); + + auto deactivate_existing = std::bind( + [existing, new_worker, new_center, exproto, + reconnect](ConnectedSocket &cs) mutable { + // we need to delete time event in original thread + { + std::lock_guard l(existing->lock); + existing->write_lock.lock(); + exproto->requeue_sent(); + existing->outcoming_bl.clear(); + existing->open_write = false; + existing->write_lock.unlock(); + if (exproto->state == NONE) { + existing->shutdown_socket(); + existing->cs = std::move(cs); + existing->worker->references--; + new_worker->references++; + existing->logger = new_worker->get_perf_counter(); + existing->worker = new_worker; + existing->center = new_center; + if (existing->delay_state) + existing->delay_state->set_center(new_center); + } else if (exproto->state == CLOSED) { + auto back_to_close = std::bind( + [](ConnectedSocket &cs) mutable { cs.close(); }, std::move(cs)); + new_center->submit_to(new_center->get_id(), + std::move(back_to_close), true); + return; + } else { + ceph_abort(); + } + } + + // Before changing existing->center, it may already exists some + // events in existing->center's queue. Then if we mark down + // `existing`, it will execute in another thread and clean up + // connection. Previous event will result in segment fault + auto transfer_existing = [existing, exproto, reconnect]() mutable { + std::lock_guard l(existing->lock); + if (exproto->state == CLOSED) return; + ceph_assert(exproto->state == NONE); + + exproto->state = ACCEPTING_SESSION; + existing->state = AsyncConnection::STATE_CONNECTION_ESTABLISHED; + existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE, + existing->read_handler); + if (!reconnect) { + CONTINUATION_RUN2(exproto, exproto->send_server_ident()) + } else { + CONTINUATION_RUN2(exproto, exproto->send_reconnect_ok()) + } + }; + if (existing->center->in_thread()) + transfer_existing(); + else + existing->center->submit_to(existing->center->get_id(), + std::move(transfer_existing), true); + }, + std::move(temp_cs)); + + existing->center->submit_to(existing->center->get_id(), + std::move(deactivate_existing), true); + return nullptr; +} + +CtPtr ProtocolV2::send_server_ident() { + ldout(cct, 20) << __func__ << dendl; + + // this is required for the case when this connection is being replaced + out_seq = discard_requeued_up_to(out_seq, 0); + in_seq = 0; + + if (!connection->policy.lossy) { + cookie = ceph::util::generate_random_number(0, -1ll); + } uint64_t flags = 0; if (connection->policy.lossy) { flags = flags | CEPH_MSG_CONNECT_LOSSY; } - IdentFrame ident(messenger->get_myaddrs(), messenger->get_myname().num(), - connection->policy.features_supported, - connection->policy.features_required, flags, cookie); + + uint64_t gs = messenger->get_global_seq(); + ServerIdentFrame server_ident( + messenger->get_myaddrs(), messenger->get_myname().num(), gs, + connection->policy.features_supported, + connection->policy.features_required, flags, cookie); ldout(cct, 5) << __func__ << " sending identification: " - << "addrs: " << ident.addrs << " gid: " << ident.gid - << " features_supported: " << std::hex - << ident.supported_features - << " features_required: " << ident.required_features - << " flags: " << ident.flags << " cookie: " << std::dec - << ident.cookie << dendl; + << "addrs=" << messenger->get_myaddrs() + << " gid=" << messenger->get_myname().num() + << " global_seq=" << gs << " features_supported=" << std::hex + << connection->policy.features_supported + << " features_required=" << connection->policy.features_required + << " flags=" << flags << " cookie=" << std::dec << cookie + << dendl; connection->lock.unlock(); // Because "replacing" will prevent other connections preempt this addr, @@ -1674,35 +2249,84 @@ CtPtr ProtocolV2::handle_client_ident(char *payload, uint32_t length) { connection->inject_delay(); connection->lock.lock(); + replacing = false; if (r < 0) { ldout(cct, 1) << __func__ << " existing race replacing process for addr = " - << connection->peer_addrs.msgr2_addr() + << connection->peer_addrs->msgr2_addr() << " just fail later one(this)" << dendl; - ldout(cct, 10) << "accept fault after register" << dendl; connection->inject_delay(); return _fault(); } - if (state != ACCEPTED_CLIENT_IDENT) { + if (state != ACCEPTING_SESSION) { ldout(cct, 1) << __func__ << " state changed while accept_conn, it must be mark_down" << dendl; ceph_assert(state == CLOSED || state == NONE); - ldout(cct, 10) << "accept fault after register" << dendl; connection->inject_delay(); return _fault(); } - bufferlist &bl = ident.get_buffer(); - return WRITE(bl, handle_send_server_ident_write); + connection->set_features(connection_features); + + // notify + connection->dispatch_queue->queue_accept(connection); + messenger->ms_deliver_handle_fast_accept(connection); + + bufferlist &bl = server_ident.get_buffer(); + return WRITE(bl, handle_server_ident_write); } -CtPtr ProtocolV2::handle_ident_missing_features_write(int r) { +CtPtr ProtocolV2::handle_server_ident_write(int r) { ldout(cct, 20) << __func__ << " r=" << r << dendl; if (r < 0) { - ldout(cct, 1) << __func__ << " ident missing features write failed r=" << r - << " (" << cpp_strerror(r) << ")" << dendl; + ldout(cct, 1) << __func__ << " server ident write failed r=" << r << " (" + << cpp_strerror(r) << ")" << dendl; + connection->inject_delay(); + return _fault(); + } + + if (connection->delay_state) { + ceph_assert(connection->delay_state->ready()); + } + + return ready(); +} + +CtPtr ProtocolV2::send_reconnect_ok() { + ldout(cct, 20) << __func__ << dendl; + + out_seq = discard_requeued_up_to(out_seq, message_seq); + + uint64_t ms = in_seq; + ReconnectOkFrame reconnect_ok(ms); + + ldout(cct, 5) << __func__ << " sending reconnect_ok: msg_seq=" << ms << dendl; + + connection->lock.unlock(); + // Because "replacing" will prevent other connections preempt this addr, + // it's safe that here we don't acquire Connection's lock + ssize_t r = messenger->accept_conn(connection); + + connection->inject_delay(); + + connection->lock.lock(); + replacing = false; + + if (r < 0) { + ldout(cct, 1) << __func__ << " existing race replacing process for addr = " + << connection->peer_addrs->msgr2_addr() + << " just fail later one(this)" << dendl; + connection->inject_delay(); + return _fault(); + } + if (state != ACCEPTING_SESSION) { + ldout(cct, 1) << __func__ + << " state changed while accept_conn, it must be mark_down" + << dendl; + ceph_assert(state == CLOSED || state == NONE); + connection->inject_delay(); return _fault(); } @@ -1710,17 +2334,23 @@ CtPtr ProtocolV2::handle_ident_missing_features_write(int r) { connection->dispatch_queue->queue_accept(connection); messenger->ms_deliver_handle_fast_accept(connection); - return CONTINUE(read_frame); + bufferlist &bl = reconnect_ok.get_buffer(); + return WRITE(bl, handle_reconnect_ok_write); } -CtPtr ProtocolV2::handle_send_server_ident_write(int r) { +CtPtr ProtocolV2::handle_reconnect_ok_write(int r) { ldout(cct, 20) << __func__ << " r=" << r << dendl; if (r < 0) { - ldout(cct, 1) << __func__ << " server ident write failed r=" << r << " (" + ldout(cct, 1) << __func__ << " reconnect ok write failed r=" << r << " (" << cpp_strerror(r) << ")" << dendl; + connection->inject_delay(); return _fault(); } + if (connection->delay_state) { + ceph_assert(connection->delay_state->ready()); + } + return ready(); } diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h index 616239d4fd9..bd8ff9e4191 100644 --- a/src/msg/async/ProtocolV2.h +++ b/src/msg/async/ProtocolV2.h @@ -14,7 +14,7 @@ private: CONNECTING, START_ACCEPT, ACCEPTING, - ACCEPTED_CLIENT_IDENT, + ACCEPTING_SESSION, READY, THROTTLE_MESSAGE, THROTTLE_BYTES, @@ -32,7 +32,7 @@ private: "CONNECTING", "START_ACCEPT", "ACCEPTING", - "ACCEPTED_CLIENT_IDENT", + "ACCEPTING_SESSION", "READY", "THROTTLE_MESSAGE", "THROTTLE_BYTES", @@ -45,7 +45,6 @@ private: return statenames[state]; } -public: enum class Tag : uint32_t { AUTH_REQUEST, AUTH_BAD_METHOD, @@ -54,6 +53,12 @@ public: AUTH_DONE, IDENT, IDENT_MISSING_FEATURES, + SESSION_RECONNECT, + SESSION_RESET, + SESSION_RETRY, + SESSION_RETRY_GLOBAL, + SESSION_RECONNECT_OK, + WAIT, MESSAGE, KEEPALIVE2, KEEPALIVE2_ACK, @@ -93,12 +98,9 @@ public: bufferlist auth_payload; AuthRequestFrame(uint32_t method, bufferlist &auth_payload) - : Frame(Tag::AUTH_REQUEST), - method(method), - len(auth_payload.length()), - auth_payload(auth_payload) { + : Frame(Tag::AUTH_REQUEST) { encode(method, payload, 0); - encode(len, payload, 0); + encode(auth_payload.length(), payload, 0); payload.claim_append(auth_payload); } @@ -115,12 +117,10 @@ public: std::vector<__u32> allowed_methods; AuthBadMethodFrame(uint32_t method, std::vector<__u32> methods) - : Frame(Tag::AUTH_BAD_METHOD), - method(method), - allowed_methods(methods) { + : Frame(Tag::AUTH_BAD_METHOD) { encode(method, payload, 0); - encode((uint32_t)allowed_methods.size(), payload, 0); - for (const auto &a_meth : allowed_methods) { + encode((uint32_t)methods.size(), payload, 0); + for (const auto &a_meth : methods) { encode(a_meth, payload, 0); } } @@ -141,9 +141,7 @@ public: std::string error_msg; AuthBadAuthFrame(uint32_t error_code, std::string error_msg) - : Frame(Tag::AUTH_BAD_AUTH), - error_code(error_code), - error_msg(error_msg) { + : Frame(Tag::AUTH_BAD_AUTH) { encode(error_code, payload, 0); encode(error_msg, payload, 0); } @@ -159,11 +157,8 @@ public: uint32_t len; bufferlist auth_payload; - AuthMoreFrame(bufferlist &auth_payload) - : Frame(Tag::AUTH_MORE), - len(auth_payload.length()), - auth_payload(auth_payload) { - encode(len, payload, 0); + AuthMoreFrame(bufferlist &auth_payload) : Frame(Tag::AUTH_MORE) { + encode(auth_payload.length(), payload, 0); payload.claim_append(auth_payload); } @@ -177,7 +172,7 @@ public: struct AuthDoneFrame : public Frame { uint64_t flags; - AuthDoneFrame(uint64_t flags) : Frame(Tag::AUTH_DONE), flags(flags) { + AuthDoneFrame(uint64_t flags) : Frame(Tag::AUTH_DONE) { encode(flags, payload, 0); } @@ -186,42 +181,165 @@ public: } }; - struct IdentFrame : public SignedEncryptedFrame { + struct ClientIdentFrame : public SignedEncryptedFrame { entity_addrvec_t addrs; int64_t gid; + uint64_t global_seq; uint64_t supported_features; // CEPH_FEATURE_* uint64_t required_features; // CEPH_FEATURE_* uint64_t flags; // CEPH_MSG_CONNECT_* - uint64_t cookie; - IdentFrame(entity_addrvec_t addrs, int64_t gid, uint64_t supported_features, - uint64_t required_features, uint64_t flags, uint64_t cookie) - : SignedEncryptedFrame(Tag::IDENT), - addrs(addrs), - gid(gid), - supported_features(supported_features), - required_features(required_features), - flags(flags), - cookie(cookie) { + ClientIdentFrame(const entity_addrvec_t &addrs, int64_t gid, + uint64_t global_seq, uint64_t supported_features, + uint64_t required_features, uint64_t flags) + : SignedEncryptedFrame(Tag::IDENT) { encode(addrs, payload, -1ll); encode(gid, payload, -1ll); + encode(global_seq, payload, -1ll); encode(supported_features, payload, -1ll); encode(required_features, payload, -1ll); encode(flags, payload, -1ll); + } + + ClientIdentFrame(char *payload, uint32_t length) : SignedEncryptedFrame() { + bufferlist bl; + bl.push_back(buffer::create_static(length, payload)); + try { + auto ti = bl.cbegin(); + decode_frame(ti); + } catch (const buffer::error &e) { + } + } + + ClientIdentFrame() : SignedEncryptedFrame() {} + + protected: + void decode_frame(ceph::buffer::list::const_iterator &ti) { + decode(addrs, ti); + decode(gid, ti); + decode(global_seq, ti); + decode(supported_features, ti); + decode(required_features, ti); + decode(flags, ti); + } + }; + + struct ServerIdentFrame : public ClientIdentFrame { + uint64_t cookie; + + ServerIdentFrame(const entity_addrvec_t &addrs, int64_t gid, + uint64_t global_seq, uint64_t supported_features, + uint64_t required_features, uint64_t flags, + uint64_t cookie) + : ClientIdentFrame(addrs, gid, global_seq, supported_features, + required_features, flags) { encode(cookie, payload, -1ll); } - IdentFrame(char *payload, uint32_t length) : SignedEncryptedFrame() { + ServerIdentFrame(char *payload, uint32_t length) : ClientIdentFrame() { + bufferlist bl; + bl.push_back(buffer::create_static(length, payload)); + try { + auto ti = bl.cbegin(); + ClientIdentFrame::decode_frame(ti); + decode(cookie, ti); + } catch (const buffer::error &e) { + } + } + }; + + struct ReconnectFrame : public SignedEncryptedFrame { + entity_addrvec_t addrs; + uint64_t cookie; + uint64_t global_seq; + uint64_t connect_seq; + uint64_t msg_seq; + + ReconnectFrame(const entity_addrvec_t &addrs, uint64_t cookie, + uint64_t global_seq, uint64_t connect_seq, uint64_t msg_seq) + : SignedEncryptedFrame(Tag::SESSION_RECONNECT) { + encode(addrs, payload, -1ll); + encode(cookie, payload, 0); + encode(global_seq, payload, 0); + encode(connect_seq, payload, 0); + encode(msg_seq, payload, 0); + } + + ReconnectFrame(char *payload, uint32_t length) : SignedEncryptedFrame() { bufferlist bl; bl.push_back(buffer::create_static(length, payload)); try { auto ti = bl.cbegin(); decode(addrs, ti); - decode(gid, ti); - decode(supported_features, ti); - decode(required_features, ti); - decode(flags, ti); decode(cookie, ti); + decode(global_seq, ti); + decode(connect_seq, ti); + decode(msg_seq, ti); + } catch (const buffer::error &e) { + } + } + }; + + struct ResetFrame : public SignedEncryptedFrame { + ResetFrame() : SignedEncryptedFrame(Tag::SESSION_RESET) {} + }; + + struct RetryFrame : public SignedEncryptedFrame { + uint64_t connect_seq; + + RetryFrame(uint64_t connect_seq) + : SignedEncryptedFrame(Tag::SESSION_RETRY) { + encode(connect_seq, payload); + } + + RetryFrame(char *payload, uint32_t length) : SignedEncryptedFrame() { + bufferlist bl; + bl.push_back(buffer::create_static(length, payload)); + try { + auto ti = bl.cbegin(); + decode(connect_seq, ti); + } catch (const buffer::error &e) { + } + } + }; + + struct RetryGlobalFrame : public SignedEncryptedFrame { + uint64_t global_seq; + + RetryGlobalFrame(uint64_t global_seq) + : SignedEncryptedFrame(Tag::SESSION_RETRY_GLOBAL) { + encode(global_seq, payload); + } + + RetryGlobalFrame(char *payload, uint32_t length) : SignedEncryptedFrame() { + bufferlist bl; + bl.push_back(buffer::create_static(length, payload)); + try { + auto ti = bl.cbegin(); + decode(global_seq, ti); + } catch (const buffer::error &e) { + } + } + }; + + struct WaitFrame : public SignedEncryptedFrame { + WaitFrame() : SignedEncryptedFrame(Tag::WAIT) {} + }; + + struct ReconnectOkFrame : public SignedEncryptedFrame { + uint64_t msg_seq; + + ReconnectOkFrame(uint64_t msg_seq) + : SignedEncryptedFrame(Tag::SESSION_RECONNECT_OK) { + encode(msg_seq, payload, 0); + } + + ReconnectOkFrame(char *payload, uint32_t length) : SignedEncryptedFrame() { + bufferlist bl; + bl.push_back(buffer::create_static(length, payload)); + try { + auto ti = bl.cbegin(); + decode(msg_seq, ti); } catch (const buffer::error &e) { } } @@ -317,12 +435,16 @@ public: char *temp_buffer; State state; uint64_t peer_required_features; + uint64_t connection_features; uint64_t cookie; + uint64_t global_seq; + uint64_t connect_seq; + uint64_t peer_global_seq; uint64_t message_seq; + bool replacing; bool can_write; std::map>> out_queue; std::list sent; - __u32 connect_seq; std::atomic out_seq{0}; std::atomic in_seq{0}; std::atomic ack_left{0}; @@ -351,9 +473,11 @@ public: bufferlist &bl); void requeue_sent(); + uint64_t discard_requeued_up_to(uint64_t out_seq, uint64_t seq); void reset_recv_state(); Ct *_fault(); void discard_out_queue(); + void reset_session(); void prepare_send_message(uint64_t features, Message *m, bufferlist &bl); Message *_get_next_outgoing(bufferlist *bl); ssize_t write_message(Message *m, bufferlist &bl, bool more); @@ -434,6 +558,7 @@ private: CONTINUATION_DECL(ProtocolV2, post_client_banner_exchange); WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_auth_request_write); WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_client_ident_write); + WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_reconnect_write); Ct *start_client_banner_exchange(); Ct *post_client_banner_exchange(); @@ -444,7 +569,14 @@ private: Ct *handle_auth_done(char *payload, uint32_t length); Ct *send_client_ident(); Ct *handle_client_ident_write(int r); + Ct *send_reconnect(); + Ct *handle_reconnect_write(int r); Ct *handle_ident_missing_features(char *payload, uint32_t length); + Ct *handle_session_reset(); + Ct *handle_session_retry(char *payload, uint32_t length); + Ct *handle_session_retry_global(char *payload, uint32_t length); + Ct *handle_wait(); + Ct *handle_reconnect_ok(char *payload, uint32_t length); Ct *handle_server_ident(char *payload, uint32_t length); // Server Protocol @@ -455,7 +587,11 @@ private: WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_auth_done_write); WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_ident_missing_features_write); - WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_send_server_ident_write); + WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_session_reset_write); + WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_session_retry_write); + WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_wait_write); + WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_server_ident_write); + WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_reconnect_ok_write); Ct *start_server_banner_exchange(); Ct *post_server_banner_exchange(); @@ -465,7 +601,17 @@ private: Ct *handle_auth_done_write(int r); Ct *handle_client_ident(char *payload, uint32_t length); Ct *handle_ident_missing_features_write(int r); - Ct *handle_send_server_ident_write(int r); + Ct *handle_reconnect(char *payload, uint32_t length); + Ct *handle_session_reset_write(int r); + Ct *handle_session_retry_write(int r); + Ct *handle_existing_connection(AsyncConnectionRef existing); + Ct *handle_wait_write(int r); + Ct *reuse_connection(AsyncConnectionRef existing, + ProtocolV2 *exproto, bool reconnect); + Ct *send_server_ident(); + Ct *handle_server_ident_write(int r); + Ct *send_reconnect_ok(); + Ct *handle_reconnect_ok_write(int r); }; #endif /* _MSG_ASYNC_PROTOCOL_V2_ */