From 469a9cda73dde8a1884d30a7231ade3b8b4ed7dd Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Wed, 7 Aug 2019 22:00:33 +0800 Subject: [PATCH] crimson/net: clean up, exsiting_conn and existing_proto Signed-off-by: Yingxin Cheng --- src/crimson/net/ProtocolV2.cc | 106 ++++++++++++++++++---------------- src/crimson/net/ProtocolV2.h | 2 +- 2 files changed, 56 insertions(+), 52 deletions(-) diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 54fcb588a2c..c048d1812a9 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -1024,40 +1024,42 @@ ProtocolV2::send_wait() } seastar::future -ProtocolV2::handle_existing_connection(SocketConnectionRef existing) +ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn) { // handle_existing_connection() logic - logger().trace("{} {}: {}", conn, __func__, *existing); + logger().trace("{} {}: {}", conn, __func__, *existing_conn); - ProtocolV2 *exproto = dynamic_cast(existing->protocol.get()); - ceph_assert(exproto); + ProtocolV2 *existing_proto = dynamic_cast( + existing_conn->protocol.get()); + ceph_assert(existing_proto); - if (exproto->state == state_t::CLOSING) { - logger().warn("{} existing connection {} already closed.", conn, *existing); + if (existing_proto->state == state_t::CLOSING) { + logger().warn("{} existing connection {} already closed.", conn, *existing_conn); return send_server_ident(); } - if (exproto->state == state_t::REPLACING) { + if (existing_proto->state == state_t::REPLACING) { logger().warn("{} racing replace happened while replacing existing connection {}", - conn, *existing); + conn, *existing_conn); return send_wait(); } - if (exproto->peer_global_seq > peer_global_seq) { + if (existing_proto->peer_global_seq > peer_global_seq) { logger().warn("{} this is a stale connection, because peer_global_seq({})" "< existing->peer_global_seq({}), close this connection" " in favor of existing connection {}", - conn, peer_global_seq, exproto->peer_global_seq, *existing); + conn, peer_global_seq, + existing_proto->peer_global_seq, *existing_conn); dispatch_reset(); abort_in_close(*this); } - if (existing->policy.lossy) { + if (existing_conn->policy.lossy) { // existing connection can be thrown out in favor of this one logger().warn("{} existing connection {} is a lossy channel. Close existing in favor of" - " this connection", conn, *existing); - exproto->dispatch_reset(); - exproto->close(); + " this connection", conn, *existing_conn); + existing_proto->dispatch_reset(); + existing_proto->close(); return send_server_ident(); } @@ -1136,17 +1138,17 @@ ProtocolV2::server_connect() // Looks good so far, let's check if there is already an existing connection // to this peer. - SocketConnectionRef existing = messenger.lookup_conn(conn.peer_addr); + SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr); - if (existing) { - if (existing->protocol->proto_type != proto_t::v2) { + if (existing_conn) { + if (existing_conn->protocol->proto_type != proto_t::v2) { logger().warn("{} existing connection {} proto version is {}, close existing", - conn, *existing, - static_cast(existing->protocol->proto_type)); + conn, *existing_conn, + static_cast(existing_conn->protocol->proto_type)); // should unregister the existing from msgr atomically - existing->close(); + existing_conn->close(); } else { - return handle_existing_connection(existing); + return handle_existing_connection(existing_conn); } } @@ -1221,9 +1223,9 @@ ProtocolV2::server_reconnect() ceph_assert(conn.peer_addr == conn.target_addr); peer_global_seq = reconnect.global_seq(); - SocketConnectionRef existing = messenger.lookup_conn(conn.peer_addr); + SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr); - if (!existing) { + if (!existing_conn) { // there is no existing connection therefore cannot reconnect to previous // session logger().warn("{} server_reconnect: no existing connection," @@ -1231,32 +1233,34 @@ ProtocolV2::server_reconnect() return send_reset(true); } - if (existing->protocol->proto_type != proto_t::v2) { + if (existing_conn->protocol->proto_type != proto_t::v2) { logger().warn("{} server_reconnect: existing connection {} proto version is {}," "close existing and reset client.", - conn, *existing, - static_cast(existing->protocol->proto_type)); - existing->close(); + conn, *existing_conn, + static_cast(existing_conn->protocol->proto_type)); + existing_conn->close(); return send_reset(true); } - ProtocolV2 *exproto = dynamic_cast(existing->protocol.get()); - ceph_assert(exproto); + ProtocolV2 *existing_proto = dynamic_cast( + existing_conn->protocol.get()); + ceph_assert(existing_proto); - if (exproto->state == state_t::REPLACING) { + if (existing_proto->state == state_t::REPLACING) { logger().warn("{} server_reconnect: racing replace happened while " " replacing existing connection {}, retry global.", - conn, *existing); - return send_retry_global(exproto->peer_global_seq); + conn, *existing_conn); + return send_retry_global(existing_proto->peer_global_seq); } - if (exproto->client_cookie != reconnect.client_cookie()) { + if (existing_proto->client_cookie != reconnect.client_cookie()) { logger().warn("{} server_reconnect:" " client_cookie mismatch with existing connection {}," " cc={} rcc={}. I must have reseted, reseting client.", - conn, *existing, exproto->client_cookie, reconnect.client_cookie()); + conn, *existing_conn, + existing_proto->client_cookie, reconnect.client_cookie()); return send_reset(conn.policy.resetcheck); - } else if (exproto->server_cookie == 0) { + } else if (existing_proto->server_cookie == 0) { // this happens when: // - a connects to b // - a sends client_ident @@ -1267,51 +1271,51 @@ ProtocolV2::server_reconnect() logger().warn("{} server_reconnect: I was a client and didn't received the" " server_ident with existing connection {}." " Asking peer to resume session establishment", - conn, *existing); + conn, *existing_conn); return send_reset(false); } - if (exproto->peer_global_seq > reconnect.global_seq()) { + if (existing_proto->peer_global_seq > reconnect.global_seq()) { logger().warn("{} server_reconnect: stale global_seq: exist_pgs={} peer_gs={}," " with existing connection {}," " ask client to retry global", - conn, exproto->peer_global_seq, reconnect.global_seq(), - *existing); - return send_retry_global(exproto->peer_global_seq); + conn, existing_proto->peer_global_seq, + reconnect.global_seq(), *existing_conn); + return send_retry_global(existing_proto->peer_global_seq); } - if (exproto->connect_seq > reconnect.connect_seq()) { + if (existing_proto->connect_seq > reconnect.connect_seq()) { logger().warn("{} server_reconnect: stale connect_seq exist_cs={} peer_cs={}," " with existing connection {}," " ask client to retry", - conn, exproto->connect_seq, reconnect.connect_seq(), - *existing); - return send_retry(exproto->connect_seq); + conn, existing_proto->connect_seq, reconnect.connect_seq(), + *existing_conn); + return send_retry(existing_proto->connect_seq); } - if (exproto->connect_seq == reconnect.connect_seq()) { + if (existing_proto->connect_seq == reconnect.connect_seq()) { // reconnect race: both peers are sending reconnect messages - if (existing->peer_addr > messenger.get_myaddrs().msgr2_addr() && - !existing->policy.server) { + if (existing_conn->peer_addr > messenger.get_myaddrs().msgr2_addr() && + !existing_conn->policy.server) { // the existing connection wins logger().warn("{} server_reconnect: reconnect race detected," " this connection loses to existing connection {}," - " ask client to wait", conn, *existing); + " ask client to wait", conn, *existing_conn); return send_wait(); } else { // this connection wins logger().warn("{} server_reconnect: reconnect race detected," " replacing existing connection {}" " socket by this connection's socket", - conn, *existing); + conn, *existing_conn); } } logger().warn("{} server_reconnect: reconnect to exsiting connection {}", - conn, *existing); + conn, *existing_conn); // everything looks good - exproto->connect_seq = reconnect.connect_seq(); + existing_proto->connect_seq = reconnect.connect_seq(); //exproto->message_seq = reconnect.msg_seq(); // TODO: lossless policy diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index a6435e088c1..79907ac93f2 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -127,7 +127,7 @@ class ProtocolV2 final : public Protocol { seastar::future send_wait(); - seastar::future handle_existing_connection(SocketConnectionRef existing); + seastar::future handle_existing_connection(SocketConnectionRef existing_conn); seastar::future server_connect(); seastar::future read_reconnect(); -- 2.39.5