// else return < 0 means error
int AsyncConnection::_try_send(bufferlist send_bl, bool send)
{
+ assert(lock.is_locked());
if (send_bl.length()) {
if (outcoming_bl.length())
outcoming_bl.claim_append(send_bl);
if (existing == this)
existing = NULL;
if (existing) {
+ // There is no possible that existing connection will acquire this
+ // connection's lock
+ existing->lock.Lock();
+
+ if (existing->replacing || existing->state == STATE_CLOSED) {
+ ldout(async_msgr->cct, 1) << __func__ << " existing racing replace or mark_down happened while replacing."
+ << " state=" << get_state_name(existing->state) << dendl;
+ reply.global_seq = existing->peer_global_seq;
+ r = _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply);
+ existing->lock.Unlock();
+ if (r < 0)
+ goto fail;
+ return 0;
+ }
+
if (connect.global_seq < existing->peer_global_seq) {
ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing
<< ".gseq " << existing->peer_global_seq << " > "
<< connect.global_seq << ", RETRY_GLOBAL" << dendl;
reply.global_seq = existing->peer_global_seq; // so we can send it below..
+ existing->lock.Unlock();
return _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply);
} else {
ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing
<< existing->connect_seq << " > " << connect.connect_seq
<< ", RETRY_SESSION" << dendl;
reply.connect_seq = existing->connect_seq + 1;
+ existing->lock.Unlock();
return _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply);
}
<< ".cseq " << existing->connect_seq << " == "
<< connect.connect_seq << ", OPEN|STANDBY, RETRY_SESSION" << dendl;
reply.connect_seq = existing->connect_seq + 1;
+ existing->lock.Unlock();
return _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply);
}
assert(peer_addr > async_msgr->get_myaddr());
// make sure our outgoing connection will follow through
existing->send_keepalive();
+ existing->lock.Unlock();
return _reply_accept(CEPH_MSGR_TAG_WAIT, connect, reply, authorizer_reply);
}
}
ldout(async_msgr->cct, 0) << __func__ << " accept we reset (peer sent cseq "
<< connect.connect_seq << ", " << existing << ".cseq = "
<< existing->connect_seq << "), sending RESETSESSION" << dendl;
+ existing->lock.Unlock();
return _reply_accept(CEPH_MSGR_TAG_RESETSESSION, connect, reply, authorizer_reply);
}
t.sleep();
}
- // There is no possible that existing connection will acquire this lock
- existing->lock.Lock();
-
- if (existing->replacing || existing->state == STATE_CLOSED) {
- ldout(async_msgr->cct, 1) << __func__ << " existing racing replace or mark_down happened while replacing."
- << " state=" << get_state_name(existing->state) << dendl;
- reply.global_seq = existing->peer_global_seq;
- r = _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply);
- existing->lock.Unlock();
- if (r < 0)
- goto fail;
- return 0;
- }
-
if (existing->policy.lossy) {
// disconnect from the Connection
existing->center->dispatch_event_external(existing->reset_handler);