}
seastar::future<ProtocolV2::next_step_t>
-ProtocolV2::handle_existing_connection(SocketConnectionRef existing)
+ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
{
// handle_existing_connection() logic
- logger().trace("{} {}: {}", conn, __func__, *existing);
+ logger().trace("{} {}: {}", conn, __func__, *existing_conn);
- ProtocolV2 *exproto = dynamic_cast<ProtocolV2*>(existing->protocol.get());
- ceph_assert(exproto);
+ ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
+ existing_conn->protocol.get());
+ ceph_assert(existing_proto);
- if (exproto->state == state_t::CLOSING) {
- logger().warn("{} existing connection {} already closed.", conn, *existing);
+ if (existing_proto->state == state_t::CLOSING) {
+ logger().warn("{} existing connection {} already closed.", conn, *existing_conn);
return send_server_ident();
}
- if (exproto->state == state_t::REPLACING) {
+ if (existing_proto->state == state_t::REPLACING) {
logger().warn("{} racing replace happened while replacing existing connection {}",
- conn, *existing);
+ conn, *existing_conn);
return send_wait();
}
- if (exproto->peer_global_seq > peer_global_seq) {
+ if (existing_proto->peer_global_seq > peer_global_seq) {
logger().warn("{} this is a stale connection, because peer_global_seq({})"
"< existing->peer_global_seq({}), close this connection"
" in favor of existing connection {}",
- conn, peer_global_seq, exproto->peer_global_seq, *existing);
+ conn, peer_global_seq,
+ existing_proto->peer_global_seq, *existing_conn);
dispatch_reset();
abort_in_close(*this);
}
- if (existing->policy.lossy) {
+ if (existing_conn->policy.lossy) {
// existing connection can be thrown out in favor of this one
logger().warn("{} existing connection {} is a lossy channel. Close existing in favor of"
- " this connection", conn, *existing);
- exproto->dispatch_reset();
- exproto->close();
+ " this connection", conn, *existing_conn);
+ existing_proto->dispatch_reset();
+ existing_proto->close();
return send_server_ident();
}
// Looks good so far, let's check if there is already an existing connection
// to this peer.
- SocketConnectionRef existing = messenger.lookup_conn(conn.peer_addr);
+ SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr);
- if (existing) {
- if (existing->protocol->proto_type != proto_t::v2) {
+ if (existing_conn) {
+ if (existing_conn->protocol->proto_type != proto_t::v2) {
logger().warn("{} existing connection {} proto version is {}, close existing",
- conn, *existing,
- static_cast<int>(existing->protocol->proto_type));
+ conn, *existing_conn,
+ static_cast<int>(existing_conn->protocol->proto_type));
// should unregister the existing from msgr atomically
- existing->close();
+ existing_conn->close();
} else {
- return handle_existing_connection(existing);
+ return handle_existing_connection(existing_conn);
}
}
ceph_assert(conn.peer_addr == conn.target_addr);
peer_global_seq = reconnect.global_seq();
- SocketConnectionRef existing = messenger.lookup_conn(conn.peer_addr);
+ SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr);
- if (!existing) {
+ if (!existing_conn) {
// there is no existing connection therefore cannot reconnect to previous
// session
logger().warn("{} server_reconnect: no existing connection,"
return send_reset(true);
}
- if (existing->protocol->proto_type != proto_t::v2) {
+ if (existing_conn->protocol->proto_type != proto_t::v2) {
logger().warn("{} server_reconnect: existing connection {} proto version is {},"
"close existing and reset client.",
- conn, *existing,
- static_cast<int>(existing->protocol->proto_type));
- existing->close();
+ conn, *existing_conn,
+ static_cast<int>(existing_conn->protocol->proto_type));
+ existing_conn->close();
return send_reset(true);
}
- ProtocolV2 *exproto = dynamic_cast<ProtocolV2*>(existing->protocol.get());
- ceph_assert(exproto);
+ ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
+ existing_conn->protocol.get());
+ ceph_assert(existing_proto);
- if (exproto->state == state_t::REPLACING) {
+ if (existing_proto->state == state_t::REPLACING) {
logger().warn("{} server_reconnect: racing replace happened while "
" replacing existing connection {}, retry global.",
- conn, *existing);
- return send_retry_global(exproto->peer_global_seq);
+ conn, *existing_conn);
+ return send_retry_global(existing_proto->peer_global_seq);
}
- if (exproto->client_cookie != reconnect.client_cookie()) {
+ if (existing_proto->client_cookie != reconnect.client_cookie()) {
logger().warn("{} server_reconnect:"
" client_cookie mismatch with existing connection {},"
" cc={} rcc={}. I must have reseted, reseting client.",
- conn, *existing, exproto->client_cookie, reconnect.client_cookie());
+ conn, *existing_conn,
+ existing_proto->client_cookie, reconnect.client_cookie());
return send_reset(conn.policy.resetcheck);
- } else if (exproto->server_cookie == 0) {
+ } else if (existing_proto->server_cookie == 0) {
// this happens when:
// - a connects to b
// - a sends client_ident
logger().warn("{} server_reconnect: I was a client and didn't received the"
" server_ident with existing connection {}."
" Asking peer to resume session establishment",
- conn, *existing);
+ conn, *existing_conn);
return send_reset(false);
}
- if (exproto->peer_global_seq > reconnect.global_seq()) {
+ if (existing_proto->peer_global_seq > reconnect.global_seq()) {
logger().warn("{} server_reconnect: stale global_seq: exist_pgs={} peer_gs={},"
" with existing connection {},"
" ask client to retry global",
- conn, exproto->peer_global_seq, reconnect.global_seq(),
- *existing);
- return send_retry_global(exproto->peer_global_seq);
+ conn, existing_proto->peer_global_seq,
+ reconnect.global_seq(), *existing_conn);
+ return send_retry_global(existing_proto->peer_global_seq);
}
- if (exproto->connect_seq > reconnect.connect_seq()) {
+ if (existing_proto->connect_seq > reconnect.connect_seq()) {
logger().warn("{} server_reconnect: stale connect_seq exist_cs={} peer_cs={},"
" with existing connection {},"
" ask client to retry",
- conn, exproto->connect_seq, reconnect.connect_seq(),
- *existing);
- return send_retry(exproto->connect_seq);
+ conn, existing_proto->connect_seq, reconnect.connect_seq(),
+ *existing_conn);
+ return send_retry(existing_proto->connect_seq);
}
- if (exproto->connect_seq == reconnect.connect_seq()) {
+ if (existing_proto->connect_seq == reconnect.connect_seq()) {
// reconnect race: both peers are sending reconnect messages
- if (existing->peer_addr > messenger.get_myaddrs().msgr2_addr() &&
- !existing->policy.server) {
+ if (existing_conn->peer_addr > messenger.get_myaddrs().msgr2_addr() &&
+ !existing_conn->policy.server) {
// the existing connection wins
logger().warn("{} server_reconnect: reconnect race detected,"
" this connection loses to existing connection {},"
- " ask client to wait", conn, *existing);
+ " ask client to wait", conn, *existing_conn);
return send_wait();
} else {
// this connection wins
logger().warn("{} server_reconnect: reconnect race detected,"
" replacing existing connection {}"
" socket by this connection's socket",
- conn, *existing);
+ conn, *existing_conn);
}
}
logger().warn("{} server_reconnect: reconnect to exsiting connection {}",
- conn, *existing);
+ conn, *existing_conn);
// everything looks good
- exproto->connect_seq = reconnect.connect_seq();
+ existing_proto->connect_seq = reconnect.connect_seq();
//exproto->message_seq = reconnect.msg_seq();
// TODO: lossless policy