From 23c0883d4f23cc249e3dd8f3837149ffc4e19602 Mon Sep 17 00:00:00 2001 From: Ricardo Dias Date: Thu, 24 Jan 2019 09:48:44 +0000 Subject: [PATCH] msg/async: msgr2: clean cookie if connection failed in ACCEPT_SESSION Signed-off-by: Ricardo Dias --- src/msg/async/ProtocolV2.cc | 35 ++++++++++++++++++++++++----------- src/msg/async/ProtocolV2.h | 3 ++- 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index ec8e7e534b3..7084ca8ea84 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -423,6 +423,7 @@ ProtocolV2::ProtocolV2(AsyncConnection *connection) connect_seq(0), peer_global_seq(0), message_seq(0), + reconnecting(false), replacing(false), can_write(false), bannerExchangeCallback(nullptr), @@ -643,6 +644,15 @@ CtPtr ProtocolV2::_fault() { connection->fault(); reset_recv_state(); + if (state == ACCEPTING_SESSION && !reconnecting) { + // if the connection is generating a new session and fails during this + // process, we need to clean the cookie. Otherwise, the connection might + // later try to reconnect to a non-existing session, and the other side + // will force a SESSION_RESET, which will cause a drop of the out_queue. + cookie = 0; + } + reconnecting = false; + if (connection->policy.standby && out_queue.empty() && !keepalive && state != WAIT) { ldout(cct, 1) << __func__ << " with nothing to send, going to standby" @@ -1488,6 +1498,7 @@ CtPtr ProtocolV2::handle_ident(char *payload, uint32_t length) { CtPtr ProtocolV2::ready() { ldout(cct, 25) << __func__ << dendl; + reconnecting = false; replacing = false; // make sure no pending tick timer @@ -2710,11 +2721,13 @@ CtPtr ProtocolV2::handle_reconnect(char *payload, uint32_t length) { ldout(cct, 1) << __func__ << " reconnect to existing=" << existing << dendl; + reconnecting = true; + // everything looks good exproto->connect_seq = reconnect.connect_seq(); exproto->message_seq = reconnect.msg_seq(); - return reuse_connection(existing, exproto, true); + return reuse_connection(existing, exproto); } CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) { @@ -2772,13 +2785,13 @@ CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) { if (connection->policy.resetcheck) { exproto->reset_session(); } - return reuse_connection(existing, exproto, false); + return reuse_connection(existing, exproto); } if (exproto->state == READY || exproto->state == STANDBY) { ldout(cct, 1) << __func__ << " existing=" << existing << " is READY/STANDBY, lets reuse it" << dendl; - return reuse_connection(existing, exproto, false); + return reuse_connection(existing, exproto); } // Looks like a connection race: server and client are both connecting to @@ -2790,7 +2803,7 @@ CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) { ldout(cct, 1) << __func__ << " connection race detected, replacing existing=" << existing << " socket by this connection's socket" << dendl; - return reuse_connection(existing, exproto, false); + return reuse_connection(existing, exproto); } else { // the existing connection wins ldout(cct, 1) @@ -2813,9 +2826,9 @@ CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) { } CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing, - ProtocolV2 *exproto, bool reconnect) { + ProtocolV2 *exproto) { ldout(cct, 20) << __func__ << " existing=" << existing - << " reconnect=" << reconnect << dendl; + << " reconnect=" << reconnecting << dendl; connection->inject_delay(); @@ -2829,7 +2842,7 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing, ceph_assert(!connection->delay_state); } exproto->reset_recv_state(); - if (!reconnect) { + if (!reconnecting) { exproto->peer_name = peer_name; exproto->peer_global_seq = peer_global_seq; exproto->connection_features = connection_features; @@ -2848,6 +2861,7 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing, connection->dispatch_queue->queue_reset(connection); exproto->can_write = false; + exproto->reconnecting = reconnecting; exproto->replacing = true; exproto->session_security = session_security; exproto->auth_method = auth_method; @@ -2864,8 +2878,7 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing, ceph_assert(connection->recv_start == connection->recv_end); auto deactivate_existing = std::bind( - [existing, new_worker, new_center, exproto, - reconnect](ConnectedSocket &cs) mutable { + [existing, new_worker, new_center, exproto](ConnectedSocket &cs) mutable { // we need to delete time event in original thread { std::lock_guard l(existing->lock); @@ -2899,7 +2912,7 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing, // events in existing->center's queue. Then if we mark down // `existing`, it will execute in another thread and clean up // connection. Previous event will result in segment fault - auto transfer_existing = [existing, exproto, reconnect]() mutable { + auto transfer_existing = [existing, exproto]() mutable { std::lock_guard l(existing->lock); if (exproto->state == CLOSED) return; ceph_assert(exproto->state == NONE); @@ -2908,7 +2921,7 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing, existing->state = AsyncConnection::STATE_CONNECTION_ESTABLISHED; existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE, existing->read_handler); - if (!reconnect) { + if (!exproto->reconnecting) { exproto->run_continuation(exproto->send_server_ident()); } else { exproto->run_continuation(exproto->send_reconnect_ok()); diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h index f289b95fcb7..d6721ffcf3d 100644 --- a/src/msg/async/ProtocolV2.h +++ b/src/msg/async/ProtocolV2.h @@ -87,6 +87,7 @@ private: uint64_t connect_seq; uint64_t peer_global_seq; uint64_t message_seq; + bool reconnecting; bool replacing; bool can_write; std::map>> out_queue; @@ -252,7 +253,7 @@ private: Ct *handle_reconnect(char *payload, uint32_t length); Ct *handle_existing_connection(AsyncConnectionRef existing); Ct *reuse_connection(AsyncConnectionRef existing, - ProtocolV2 *exproto, bool reconnect); + ProtocolV2 *exproto); Ct *send_server_ident(); Ct *send_reconnect_ok(); Ct *server_ready(); -- 2.39.5