connect_seq(0),
peer_global_seq(0),
message_seq(0),
+ reconnecting(false),
replacing(false),
can_write(false),
bannerExchangeCallback(nullptr),
connection->fault();
reset_recv_state();
+ if (state == ACCEPTING_SESSION && !reconnecting) {
+ // if the connection is generating a new session and fails during this
+ // process, we need to clean the cookie. Otherwise, the connection might
+ // later try to reconnect to a non-existing session, and the other side
+ // will force a SESSION_RESET, which will cause a drop of the out_queue.
+ cookie = 0;
+ }
+ reconnecting = false;
+
if (connection->policy.standby && out_queue.empty() && !keepalive &&
state != WAIT) {
ldout(cct, 1) << __func__ << " with nothing to send, going to standby"
CtPtr ProtocolV2::ready() {
ldout(cct, 25) << __func__ << dendl;
+ reconnecting = false;
replacing = false;
// make sure no pending tick timer
ldout(cct, 1) << __func__ << " reconnect to existing=" << existing << dendl;
+ reconnecting = true;
+
// everything looks good
exproto->connect_seq = reconnect.connect_seq();
exproto->message_seq = reconnect.msg_seq();
- return reuse_connection(existing, exproto, true);
+ return reuse_connection(existing, exproto);
}
CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) {
if (connection->policy.resetcheck) {
exproto->reset_session();
}
- return reuse_connection(existing, exproto, false);
+ return reuse_connection(existing, exproto);
}
if (exproto->state == READY || exproto->state == STANDBY) {
ldout(cct, 1) << __func__ << " existing=" << existing
<< " is READY/STANDBY, lets reuse it" << dendl;
- return reuse_connection(existing, exproto, false);
+ return reuse_connection(existing, exproto);
}
// Looks like a connection race: server and client are both connecting to
ldout(cct, 1) << __func__
<< " connection race detected, replacing existing="
<< existing << " socket by this connection's socket" << dendl;
- return reuse_connection(existing, exproto, false);
+ return reuse_connection(existing, exproto);
} else {
// the existing connection wins
ldout(cct, 1)
}
CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
- ProtocolV2 *exproto, bool reconnect) {
+ ProtocolV2 *exproto) {
ldout(cct, 20) << __func__ << " existing=" << existing
- << " reconnect=" << reconnect << dendl;
+ << " reconnect=" << reconnecting << dendl;
connection->inject_delay();
ceph_assert(!connection->delay_state);
}
exproto->reset_recv_state();
- if (!reconnect) {
+ if (!reconnecting) {
exproto->peer_name = peer_name;
exproto->peer_global_seq = peer_global_seq;
exproto->connection_features = connection_features;
connection->dispatch_queue->queue_reset(connection);
exproto->can_write = false;
+ exproto->reconnecting = reconnecting;
exproto->replacing = true;
exproto->session_security = session_security;
exproto->auth_method = auth_method;
ceph_assert(connection->recv_start == connection->recv_end);
auto deactivate_existing = std::bind(
- [existing, new_worker, new_center, exproto,
- reconnect](ConnectedSocket &cs) mutable {
+ [existing, new_worker, new_center, exproto](ConnectedSocket &cs) mutable {
// we need to delete time event in original thread
{
std::lock_guard<std::mutex> l(existing->lock);
// events in existing->center's queue. Then if we mark down
// `existing`, it will execute in another thread and clean up
// connection. Previous event will result in segment fault
- auto transfer_existing = [existing, exproto, reconnect]() mutable {
+ auto transfer_existing = [existing, exproto]() mutable {
std::lock_guard<std::mutex> l(existing->lock);
if (exproto->state == CLOSED) return;
ceph_assert(exproto->state == NONE);
existing->state = AsyncConnection::STATE_CONNECTION_ESTABLISHED;
existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE,
existing->read_handler);
- if (!reconnect) {
+ if (!exproto->reconnecting) {
exproto->run_continuation(exproto->send_server_ident());
} else {
exproto->run_continuation(exproto->send_reconnect_ok());