int r = 0;
ceph_msg_connect_reply reply;
bufferlist reply_bl;
- uint64_t existing_seq = -1;
bool is_reset_from_peer = false;
- char reply_tag = 0;
memset(&reply, 0, sizeof(reply));
reply.protocol_version = async_msgr->get_proto_version(peer_type, false);
assert(0);
replace:
- // if it is a hard reset from peer, we don't need a round-trip to negotiate in/out sequence
- if ((connect.features & CEPH_FEATURE_RECONNECT_SEQ) && !is_reset_from_peer) {
- reply_tag = CEPH_MSGR_TAG_SEQ;
- existing_seq = existing->in_seq;
- }
ldout(async_msgr->cct, 10) << __func__ << " accept replacing " << existing << dendl;
if (async_msgr->cct->_conf->ms_inject_internal_delays) {
reply.connect_seq = existing->connect_seq + 1;
r = _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply);
existing->lock.Unlock();
- if (r < 0) {
+ if (r < 0)
goto fail;
- }
- return r;
+ return 0;
}
- // Here we use "_stop" instead of "mark_down" because "mark_down" is a async
- // operation, but now we need ensure all variables in `existing` is cleaned up
- // and we will reuse it next.
- existing->_stop(true);
if (existing->policy.lossy) {
// disconnect from the Connection
center->dispatch_event_external(EventCallbackRef(new C_handle_reset(async_msgr, existing)));
- existing->discard_out_queue();
+ existing->_stop();
} else {
// queue a reset on the new connection, which we're dumping for the old
center->dispatch_event_external(reset_handler);
if (is_reset_from_peer)
existing->in_seq = 0;
- // Clean up output buffer
- existing->outcoming_bl.clear();
- existing->requeue_sent();
- reply.connect_seq = connect.connect_seq + 1;
- if (_reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply) < 0) {
- existing->lock.Unlock();
- goto fail;
- }
-
// Now existing connection will be alive and the current connection will
// exchange socket with existing connection because we want to maintain
// original "connection_state"
+ existing->center->delete_file_event(existing->sd, EVENT_READABLE|EVENT_WRITABLE);
center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
existing->center->create_file_event(sd, EVENT_READABLE, existing->read_handler);
- existing->center->delete_file_event(existing->sd, EVENT_READABLE|EVENT_WRITABLE);
- center->create_file_event(existing->sd, EVENT_READABLE, read_handler);
+ reply.connect_seq = connect.connect_seq + 1;
+
+ // Clean up output buffer
+ existing->outcoming_bl.clear();
+ existing->requeue_sent();
swap(existing->sd, sd);
- existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
existing->open_write = false;
existing->replacing = true;
+ existing->state_offset = 0;
+ existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
+ // there should exist any buffer
+ assert(recv_start == recv_end);
+
+ if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply) < 0) {
+ // handle error
+ existing->center->dispatch_event_external(existing->write_handler);
+ }
+
_stop();
existing->lock.Unlock();
return 0;
existing->lock.Unlock();
open:
- replacing = false;
connect_seq = connect.connect_seq + 1;
peer_global_seq = connect.global_seq;
ldout(async_msgr->cct, 10) << __func__ << " accept success, connect_seq = "
- << connect_seq << ", sending READY" << dendl;
+ << connect_seq << ", sending READY" << dendl;
+
+ int next_state;
+
+ // if it is a hard reset from peer(in_seq == 0), we don't need a round-trip to negotiate in/out sequence
+ if ((connect.features & CEPH_FEATURE_RECONNECT_SEQ) && in_seq) {
+ reply.tag = CEPH_MSGR_TAG_SEQ;
+ next_state = STATE_ACCEPTING_WAIT_SEQ;
+ } else {
+ reply.tag = CEPH_MSGR_TAG_READY;
+ next_state = STATE_ACCEPTING_READY;
+ discard_requeued_up_to(0);
+ }
// send READY reply
- reply.tag = (reply_tag ? reply_tag : CEPH_MSGR_TAG_READY);
reply.features = policy.features_supported;
reply.global_seq = async_msgr->get_global_seq();
reply.connect_seq = connect_seq;
if (reply.authorizer_len)
reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
- int next_state;
+ if (reply.tag == CEPH_MSGR_TAG_SEQ)
+ reply_bl.append((char*)&in_seq, sizeof(in_seq));
- if (reply_tag == CEPH_MSGR_TAG_SEQ) {
- reply_bl.append((char*)&existing_seq, sizeof(existing_seq));
- next_state = STATE_ACCEPTING_WAIT_SEQ;
- } else {
- next_state = STATE_ACCEPTING_READY;
- discard_requeued_up_to(0);
- }
-
- // if replacing, this con is alreadly accepted.
lock.Unlock();
+ // Because "replacing" will prevent other connections preempt this addr,
+ // it's safe that here we don't acquire Connection's lock
r = async_msgr->accept_conn(this);
if (async_msgr->cct->_conf->ms_inject_internal_delays) {
}
lock.Lock();
+ replacing = false;
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " existing race replacing process for addr=" << peer_addr
<< " just fail later one(this)" << dendl;
center->dispatch_event_external(accept_handler);
async_msgr->ms_deliver_handle_fast_accept(this);
+
r = _try_send(reply_bl);
if (r < 0)
goto fail_registered;
if (policy.lossy && state != STATE_CONNECTING) {
ldout(async_msgr->cct, 10) << __func__ << " on lossy channel, failing" << dendl;
center->dispatch_event_external(reset_handler);
- discard_out_queue();
_stop();
return ;
}
requeue_sent();
recv_start = recv_end = 0;
state_offset = 0;
+ replacing = false;
outcoming_bl.clear();
if (policy.standby && !is_queued()) {
ldout(async_msgr->cct,0) << __func__ << " with nothing to send, going to standby" << dendl;
}
// *note: `async` is true only happen when replacing connection process
-void AsyncConnection::_stop(bool replacing)
+void AsyncConnection::_stop()
{
assert(lock.is_locked());
ldout(async_msgr->cct, 10) << __func__ << dendl;
if (sd > 0)
center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
- if (!replacing) {
- discard_out_queue();
- async_msgr->unregister_conn(this);
- }
+ discard_out_queue();
+ async_msgr->unregister_conn(this);
if (async_msgr->cct->_conf->ms_inject_internal_delays) {
ldout(msgr->cct, 10) << __func__ << " sleep for "
t.sleep();
}
+ state = STATE_CLOSED;
shutdown_socket();
open_write = false;
state_offset = 0;
- state = STATE_CLOSED;
if (sd > 0)
::close(sd);
sd = -1;