int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &authorizer_bl,
bufferlist &authorizer_reply)
{
- int r;
+ int r = 0;
ceph_msg_connect_reply reply;
bufferlist reply_bl;
uint64_t existing_seq = -1;
ldout(async_msgr->cct, 10) << __func__ << " accept: setting up session_security." << dendl;
// existing?
+ lock.Unlock();
AsyncConnectionRef existing = async_msgr->lookup_conn(peer_addr);
+ lock.Lock();
+ if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
+ ldout(async_msgr->cct, 1) << __func__ << " state changed while accept, it must be mark_down, state="
+ << get_state_name(state) << dendl;
+ assert(state == STATE_CLOSED);
+ goto fail;
+ }
+
if (existing) {
if (connect.global_seq < existing->peer_global_seq) {
ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing
}
ldout(async_msgr->cct, 10) << __func__ << " accept replacing " << existing << dendl;
- // In order to avoid dead lock, here need to lock in ordering.
- // It may be another thread access this connection between unlock and lock
- // call, this is rely to EventCenter to guarantee only one thread can access
- // one connection.
- lock.Unlock();
- if (existing->sd > sd) {
- existing->lock.Lock();
- lock.Lock();
- } else {
- lock.Lock();
- existing->lock.Lock();
+ // There is no possible that existing connection will acquire this lock
+ existing->lock.Lock();
+
+ if (existing->replacing || existing->state == STATE_CLOSED) {
+ ldout(async_msgr->cct, 1) << __func__ << " existing racing replace or mark_down happened while replacing."
+ << " state=" << get_state_name(existing->state) << dendl;
+ reply.connect_seq = existing->connect_seq + 1;
+ r = _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply);
+ existing->lock.Unlock();
+ if (r < 0) {
+ goto fail;
+ }
+ return r;
}
+
// Here we use "_stop" instead of "mark_down" because "mark_down" is a async
// operation, but now we need ensure all variables in `existing` is cleaned up
// and we will reuse it next.
existing->outcoming_bl.clear();
existing->requeue_sent();
reply.connect_seq = connect.connect_seq + 1;
- if (_reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply) < 0)
+ if (_reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply) < 0) {
+ existing->lock.Unlock();
goto fail;
+ }
// Now existing connection will be alive and the current connection will
// exchange socket with existing connection because we want to maintain
// original "connection_state"
center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
- center->create_file_event(sd, EVENT_READABLE, existing->read_handler);
+ existing->center->create_file_event(sd, EVENT_READABLE, existing->read_handler);
+
+ existing->center->delete_file_event(existing->sd, EVENT_READABLE|EVENT_WRITABLE);
+ center->create_file_event(existing->sd, EVENT_READABLE, read_handler);
swap(existing->sd, sd);
existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
get_auth_session_handler(async_msgr->cct, connect.authorizer_protocol,
session_key, get_features()));
- // notify
- center->dispatch_event_external(accept_handler);
- async_msgr->ms_deliver_handle_fast_accept(this);
-
- // ok!
- async_msgr->accept_conn(this);
-
reply_bl.append((char*)&reply, sizeof(reply));
if (reply.authorizer_len)
discard_requeued_up_to(0);
}
- r = _try_send(reply_bl);
+ // if replacing, this con is alreadly accepted.
+ lock.Unlock();
+ r = async_msgr->accept_conn(this);
+ lock.Lock();
if (r < 0) {
+ ldout(async_msgr->cct, 1) << __func__ << " existing race replacing process for addr=" << peer_addr
+ << " just fail later one(this)" << dendl;
goto fail;
}
+ if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
+ ldout(async_msgr->cct, 1) << __func__ << " state changed while accept_conn, it must be mark_down, state="
+ << get_state_name(state) << dendl;
+ assert(state == STATE_CLOSED);
+ goto fail;
+ }
+
+ // notify
+ center->dispatch_event_external(accept_handler);
+ async_msgr->ms_deliver_handle_fast_accept(this);
+
+ r = _try_send(reply_bl);
+ if (r < 0)
+ goto fail;
if (r == 0) {
state = next_state;