port(-1), lock("AsyncConnection::lock"), open_write(false), keepalive(false), recv_buf(NULL),
recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
recv_start(0), recv_end(0), stop_lock("AsyncConnection::stop_lock"),
- got_bad_auth(false), authorizer(NULL), replacing(false), allow_session_reset(true),
+ got_bad_auth(false), authorizer(NULL), replacing(false),
is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct), center(c)
{
read_handler.reset(new C_handle_read(this));
}
if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) {
assert(reply.connect_seq > connect_seq);
- connect_seq = reply.connect_seq;
ldout(async_msgr->cct, 10) << __func__ << " connect got RETRY_SESSION "
- << connect_seq << " -> "
- << reply.connect_seq << dendl;
+ << connect_seq << " -> "
+ << reply.connect_seq << dendl;
+ connect_seq = reply.connect_seq;
state = STATE_CONNECTING_SEND_CONNECT_MSG;
}
if (reply.tag == CEPH_MSGR_TAG_WAIT) {
assert(connect.connect_seq > existing->connect_seq);
assert(connect.global_seq >= existing->peer_global_seq);
if (policy.resetcheck && // RESETSESSION only used by servers; peers do not reset each other
- existing->connect_seq == 0 && allow_session_reset) {
+ existing->connect_seq == 0) {
ldout(async_msgr->cct, 0) << __func__ << " accept we reset (peer sent cseq "
<< connect.connect_seq << ", " << existing << ".cseq = "
<< existing->connect_seq << "), sending RESETSESSION" << dendl;
- allow_session_reset = false;
return _reply_accept(CEPH_MSGR_TAG_RESETSESSION, connect, reply, authorizer_reply);
}
if (existing->replacing || existing->state == STATE_CLOSED) {
ldout(async_msgr->cct, 1) << __func__ << " existing racing replace or mark_down happened while replacing."
<< " state=" << get_state_name(existing->state) << dendl;
- reply.connect_seq = connect.connect_seq + 1;
- r = _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply);
+ reply.global_seq = existing->peer_global_seq;
+ r = _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply);
existing->lock.Unlock();
if (r < 0)
goto fail;
center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
existing->center->create_file_event(sd, EVENT_READABLE, existing->read_handler);
- reply.connect_seq = connect.connect_seq + 1;
+ reply.global_seq = existing->peer_global_seq;
// Clean up output buffer
existing->outcoming_bl.clear();
// there should exist any buffer
assert(recv_start == recv_end);
- if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply) < 0) {
+ if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) {
// handle error
existing->center->dispatch_event_external(existing->write_handler);
}
}
lock.Lock();
- once_ready = true;
replacing = false;
- allow_session_reset = true;
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " existing race replacing process for addr=" << peer_addr
<< " just fail later one(this)" << dendl;
goto fail_registered;
}
- // notify
- center->dispatch_event_external(accept_handler);
- async_msgr->ms_deliver_handle_fast_accept(this);
-
-
r = _try_send(reply_bl);
if (r < 0)
goto fail_registered;
+ // notify
+ center->dispatch_event_external(accept_handler);
+ async_msgr->ms_deliver_handle_fast_accept(this);
+ once_ready = true;
+
if (r == 0) {
state = next_state;
ldout(async_msgr->cct, 2) << __func__ << " accept write reply msg done" << dendl;
state_offset = 0;
replacing = false;
outcoming_bl.clear();
+ if (!once_ready && !is_queued() &&
+ state >=STATE_ACCEPTING && state <= STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
+ ldout(async_msgr->cct, 0) << __func__ << " with nothing to send and in the half "
+ << "accept state just closed, state="
+ << get_state_name(state) << dendl;
+ _stop();
+ return ;
+ }
if (policy.standby && !is_queued()) {
- if (!once_ready) {
- // a half connection, close
- _stop();
- } else {
- ldout(async_msgr->cct,0) << __func__ << " with nothing to send, going to standby" << dendl;
- state = STATE_STANDBY;
- }
+ ldout(async_msgr->cct,0) << __func__ << " with nothing to send, going to standby" << dendl;
+ state = STATE_STANDBY;
return;
}
center->dispatch_event_external(remote_reset_handler);
if (randomize_out_seq()) {
- lsubdout(async_msgr->cct,ms,15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
+ ldout(async_msgr->cct, 15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
}
in_seq = 0;
connect_seq = 0;
in_seq_acked = 0;
- allow_session_reset = true;
+ once_ready = false;
}
void AsyncConnection::_stop()