From: Haomai Wang Date: Fri, 13 Mar 2015 04:21:32 +0000 (+0800) Subject: AsyncConnection: Lock existing's lock in advance avoid existing's state changed X-Git-Tag: v9.0.0~137^2~6 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f7a1fdb5637d0435544df5b9c77eae0aa7d35672;p=ceph.git AsyncConnection: Lock existing's lock in advance avoid existing's state changed Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index bda361a4440..8db6849302b 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -272,6 +272,7 @@ int AsyncConnection::do_sendmsg(struct msghdr &msg, int len, bool more) // else return < 0 means error int AsyncConnection::_try_send(bufferlist send_bl, bool send) { + assert(lock.is_locked()); if (send_bl.length()) { if (outcoming_bl.length()) outcoming_bl.claim_append(send_bl); @@ -1587,11 +1588,27 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a if (existing == this) existing = NULL; if (existing) { + // There is no possible that existing connection will acquire this + // connection's lock + existing->lock.Lock(); + + if (existing->replacing || existing->state == STATE_CLOSED) { + ldout(async_msgr->cct, 1) << __func__ << " existing racing replace or mark_down happened while replacing." + << " state=" << get_state_name(existing->state) << dendl; + reply.global_seq = existing->peer_global_seq; + r = _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply); + existing->lock.Unlock(); + if (r < 0) + goto fail; + return 0; + } + if (connect.global_seq < existing->peer_global_seq) { ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing << ".gseq " << existing->peer_global_seq << " > " << connect.global_seq << ", RETRY_GLOBAL" << dendl; reply.global_seq = existing->peer_global_seq; // so we can send it below.. + existing->lock.Unlock(); return _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply); } else { ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing @@ -1625,6 +1642,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a << existing->connect_seq << " > " << connect.connect_seq << ", RETRY_SESSION" << dendl; reply.connect_seq = existing->connect_seq + 1; + existing->lock.Unlock(); return _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply); } @@ -1639,6 +1657,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a << ".cseq " << existing->connect_seq << " == " << connect.connect_seq << ", OPEN|STANDBY, RETRY_SESSION" << dendl; reply.connect_seq = existing->connect_seq + 1; + existing->lock.Unlock(); return _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply); } @@ -1657,6 +1676,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a assert(peer_addr > async_msgr->get_myaddr()); // make sure our outgoing connection will follow through existing->send_keepalive(); + existing->lock.Unlock(); return _reply_accept(CEPH_MSGR_TAG_WAIT, connect, reply, authorizer_reply); } } @@ -1668,6 +1688,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a ldout(async_msgr->cct, 0) << __func__ << " accept we reset (peer sent cseq " << connect.connect_seq << ", " << existing << ".cseq = " << existing->connect_seq << "), sending RESETSESSION" << dendl; + existing->lock.Unlock(); return _reply_accept(CEPH_MSGR_TAG_RESETSESSION, connect, reply, authorizer_reply); } @@ -1700,20 +1721,6 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a t.sleep(); } - // There is no possible that existing connection will acquire this lock - existing->lock.Lock(); - - if (existing->replacing || existing->state == STATE_CLOSED) { - ldout(async_msgr->cct, 1) << __func__ << " existing racing replace or mark_down happened while replacing." - << " state=" << get_state_name(existing->state) << dendl; - reply.global_seq = existing->peer_global_seq; - r = _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply); - existing->lock.Unlock(); - if (r < 0) - goto fail; - return 0; - } - if (existing->policy.lossy) { // disconnect from the Connection existing->center->dispatch_event_external(existing->reset_handler);