From: Haomai Wang Date: Wed, 11 Feb 2015 09:37:34 +0000 (+0800) Subject: AsyncConnection: Use retry_global tag instead of retry_session X-Git-Tag: suse_latest~36^2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a640ca93307e92f8032881e35fdd74b60f0091d0;p=ceph.git AsyncConnection: Use retry_global tag instead of retry_session Via two qa lab tests, it's hard to use retry_session which will cause connect_seq increase, it will let reset_check harder. So use retry_global should avoid side effect for replacing. Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 55431f091a11..e043f778ead4 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -180,7 +180,7 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCente port(-1), lock("AsyncConnection::lock"), open_write(false), keepalive(false), recv_buf(NULL), recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)), recv_start(0), recv_end(0), stop_lock("AsyncConnection::stop_lock"), - got_bad_auth(false), authorizer(NULL), replacing(false), allow_session_reset(true), + got_bad_auth(false), authorizer(NULL), replacing(false), is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct), center(c) { read_handler.reset(new C_handle_read(this)); @@ -1472,10 +1472,10 @@ int AsyncConnection::handle_connect_reply(ceph_msg_connect &connect, ceph_msg_co } if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) { assert(reply.connect_seq > connect_seq); - connect_seq = reply.connect_seq; ldout(async_msgr->cct, 10) << __func__ << " connect got RETRY_SESSION " - << connect_seq << " -> " - << reply.connect_seq << dendl; + << connect_seq << " -> " + << reply.connect_seq << dendl; + connect_seq = reply.connect_seq; state = STATE_CONNECTING_SEND_CONNECT_MSG; } if (reply.tag == CEPH_MSGR_TAG_WAIT) { @@ -1656,11 +1656,10 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a assert(connect.connect_seq > existing->connect_seq); assert(connect.global_seq >= existing->peer_global_seq); if (policy.resetcheck && // RESETSESSION only used by servers; peers do not reset each other - existing->connect_seq == 0 && allow_session_reset) { + existing->connect_seq == 0) { ldout(async_msgr->cct, 0) << __func__ << " accept we reset (peer sent cseq " << connect.connect_seq << ", " << existing << ".cseq = " << existing->connect_seq << "), sending RESETSESSION" << dendl; - allow_session_reset = false; return _reply_accept(CEPH_MSGR_TAG_RESETSESSION, connect, reply, authorizer_reply); } @@ -1699,8 +1698,8 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a if (existing->replacing || existing->state == STATE_CLOSED) { ldout(async_msgr->cct, 1) << __func__ << " existing racing replace or mark_down happened while replacing." << " state=" << get_state_name(existing->state) << dendl; - reply.connect_seq = connect.connect_seq + 1; - r = _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply); + reply.global_seq = existing->peer_global_seq; + r = _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply); existing->lock.Unlock(); if (r < 0) goto fail; @@ -1729,7 +1728,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE); existing->center->create_file_event(sd, EVENT_READABLE, existing->read_handler); - reply.connect_seq = connect.connect_seq + 1; + reply.global_seq = existing->peer_global_seq; // Clean up output buffer existing->outcoming_bl.clear(); @@ -1743,7 +1742,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a // there should exist any buffer assert(recv_start == recv_end); - if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply) < 0) { + if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) { // handle error existing->center->dispatch_event_external(existing->write_handler); } @@ -1812,9 +1811,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a } lock.Lock(); - once_ready = true; replacing = false; - allow_session_reset = true; if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " existing race replacing process for addr=" << peer_addr << " just fail later one(this)" << dendl; @@ -1827,15 +1824,15 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a goto fail_registered; } - // notify - center->dispatch_event_external(accept_handler); - async_msgr->ms_deliver_handle_fast_accept(this); - - r = _try_send(reply_bl); if (r < 0) goto fail_registered; + // notify + center->dispatch_event_external(accept_handler); + async_msgr->ms_deliver_handle_fast_accept(this); + once_ready = true; + if (r == 0) { state = next_state; ldout(async_msgr->cct, 2) << __func__ << " accept write reply msg done" << dendl; @@ -2025,14 +2022,17 @@ void AsyncConnection::fault() state_offset = 0; replacing = false; outcoming_bl.clear(); + if (!once_ready && !is_queued() && + state >=STATE_ACCEPTING && state <= STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) { + ldout(async_msgr->cct, 0) << __func__ << " with nothing to send and in the half " + << "accept state just closed, state=" + << get_state_name(state) << dendl; + _stop(); + return ; + } if (policy.standby && !is_queued()) { - if (!once_ready) { - // a half connection, close - _stop(); - } else { - ldout(async_msgr->cct,0) << __func__ << " with nothing to send, going to standby" << dendl; - state = STATE_STANDBY; - } + ldout(async_msgr->cct,0) << __func__ << " with nothing to send, going to standby" << dendl; + state = STATE_STANDBY; return; } @@ -2072,13 +2072,13 @@ void AsyncConnection::was_session_reset() center->dispatch_event_external(remote_reset_handler); if (randomize_out_seq()) { - lsubdout(async_msgr->cct,ms,15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl; + ldout(async_msgr->cct, 15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl; } in_seq = 0; connect_seq = 0; in_seq_acked = 0; - allow_session_reset = true; + once_ready = false; } void AsyncConnection::_stop()