public:
explicit C_clean_handler(AsyncConnectionRef c): conn(c) {}
void do_request(int id) {
- conn->cleanup_handler();
+ conn->cleanup();
delete this;
}
};
break;
}
+ case STATE_NONE:
+ {
+ ldout(async_msgr->cct, 20) << __func__ << " enter none state" << dendl;
+ break;
+ }
+
case STATE_CLOSED:
{
if (sd >= 0)
existing->reset_recv_state();
int new_fd = sd;
- int pre_exist_fd = existing->sd;
- std::swap(existing->sd, sd);
- _stop();
+ EventCenter *new_center = center;
+ Worker *new_worker = worker;
+ // avoid _stop shutdown replacing socket
+ sd = -1;
// queue a reset on the new connection, which we're dumping for the old
+ _stop();
dispatch_queue->queue_reset(this);
ldout(async_msgr->cct, 1) << __func__ << " stop myself to swap existing" << dendl;
existing->can_write = WriteStatus::NOWRITE;
existing->open_write = false;
existing->replacing = true;
existing->state_offset = 0;
- existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
+ // avoid previous thread modify event
+ existing->state = STATE_NONE;
// Discard existing prefetch buffer in `recv_buf`
existing->recv_start = existing->recv_end = 0;
// there shouldn't exist any buffer
assert(recv_start == recv_end);
existing->write_lock.Unlock();
- // existing->sd now isn't registering any event while it's new,
- // previous existing->sd now is closed, no event will notify
- // existing(EventCenter*) from now.
- center->submit_to(existing->center->get_id(), [existing, pre_exist_fd, new_fd, connect, reply, authorizer_reply]() mutable {
- Mutex::Locker l(existing->lock);
- if (new_fd != existing->sd)
- return ;
-
- if (existing->state != STATE_ACCEPTING_WAIT_CONNECT_MSG) {
- existing->fault();
- return ;
- }
- reply.global_seq = existing->peer_global_seq;
- if (pre_exist_fd >= 0)
- existing->center->delete_file_event(pre_exist_fd, EVENT_READABLE|EVENT_WRITABLE);
- existing->center->create_file_event(new_fd, EVENT_READABLE, existing->read_handler);
- if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) {
- // handle error
- existing->fault();
+ // new sd now isn't registered any event while origin events
+ // have been deleted.
+ // previous existing->sd now is still open, event will continue to
+ // notify previous existing->center from now.
+ // From now, no one will dispatch event to `existing`
+ // Note: we must use async dispatch instead of execute this inline
+ // even existing->center == center. Because we must ensure below
+ // event executed after all pending external events like
+ // "dispatch_state->queue"
+ existing->center->submit_to(
+ existing->center->get_id(),
+ [existing, new_fd, new_worker, new_center, connect, reply, authorizer_reply]() mutable {
+ // we need to delete time event in original thread
+ {
+ Mutex::Locker l(existing->lock);
+ if (existing->state == STATE_NONE) {
+ existing->shutdown_socket();
+ existing->sd = new_fd;
+ existing->worker->references--;
+ new_worker->references++;
+ existing->logger = new_worker->get_perf_counter();
+ existing->worker = new_worker;
+ existing->center = new_center;
+ if (existing->delay_state)
+ existing->delay_state->set_center(new_center);
+ } else if (existing->state == STATE_CLOSED) {
+ ::close(new_fd);
+ return ;
+ } else {
+ assert(0);
+ }
}
+
+ // Before changing existing->center, it may already exists some events in existing->center's queue.
+ // Then if we mark down `existing`, it will execute in another thread and clean up connection.
+ // Previous event will result in segment fault
+ auto transfer_existing = [existing, new_fd, connect, reply, authorizer_reply]() mutable {
+ Mutex::Locker l(existing->lock);
+ if (existing->state == STATE_CLOSED)
+ return ;
+ assert(new_fd == existing->sd);
+ assert(existing->state == STATE_NONE);
+
+ existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
+ existing->center->create_file_event(existing->sd, EVENT_READABLE, existing->read_handler);
+ reply.global_seq = existing->peer_global_seq;
+ if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) {
+ // handle error
+ existing->fault();
+ }
+ };
+ if (existing->center->in_thread())
+ transfer_existing();
+ else
+ existing->center->submit_to(
+ existing->center->get_id(), std::move(transfer_existing), true);
}, true);
- existing->lock.Unlock();
+ existing->lock.Unlock();
return 0;
}
existing->lock.Unlock();
void AsyncConnection::fault()
{
- if (state == STATE_CLOSED) {
+ if (state == STATE_CLOSED || state == STATE_NONE) {
ldout(async_msgr->cct, 10) << __func__ << " connection is already closed" << dendl;
return ;
}
if (state == STATE_STANDBY && !policy.server && is_queued()) {
ldout(async_msgr->cct, 10) << __func__ << " policy.server is false" << dendl;
_connect();
- } else if (sd >= 0 && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) {
+ } else if (sd >= 0 && state != STATE_NONE && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) {
r = _try_send();
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl;