<< " remaining bytes " << outcoming_bl.length() << dendl;
if (!open_write && is_queued()) {
- center->create_file_event(sd, EVENT_WRITABLE, write_handler);
- open_write = true;
+ if (center->in_thread()) {
+ center->create_file_event(sd, EVENT_WRITABLE, write_handler);
+ open_write = true;
+ } else {
+ center->dispatch_event_external(write_handler);
+ }
}
if (open_write && !is_queued()) {
- center->delete_file_event(sd, EVENT_WRITABLE);
- open_write = false;
-
+ if (center->in_thread()) {
+ center->delete_file_event(sd, EVENT_WRITABLE);
+ open_write = false;
+ } else {
+ center->dispatch_event_external(write_handler);
+ }
if (state_after_send != STATE_NONE)
center->dispatch_event_external(read_handler);
}
net.set_socket_options(sd, async_msgr->cct->_conf->ms_tcp_nodelay, async_msgr->cct->_conf->ms_tcp_rcvbuf);
net.set_priority(sd, async_msgr->get_socket_priority());
+ center->create_file_event(sd, EVENT_READABLE, read_handler);
bl.append(CEPH_BANNER, strlen(CEPH_BANNER));
existing->is_reset_from_peer = true;
}
- // Now existing connection will be alive and the current connection will
- // exchange socket with existing connection because we want to maintain
- // original "connection_state"
- if (existing->sd >= 0)
- existing->center->delete_file_event(existing->sd, EVENT_READABLE|EVENT_WRITABLE);
center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
- existing->center->create_file_event(sd, EVENT_READABLE, existing->read_handler);
-
- reply.global_seq = existing->peer_global_seq;
// Clean up output buffer
existing->outcoming_bl.clear();
existing->requeue_sent();
existing->reset_recv_state();
- swap(existing->sd, sd);
+ int new_fd = sd;
+ int pre_exist_fd = existing->sd;
+ std::swap(existing->sd, sd);
+ _stop();
+ // queue a reset on the new connection, which we're dumping for the old
+ dispatch_queue->queue_reset(this);
+ ldout(async_msgr->cct, 1) << __func__ << " stop myself to swap existing" << dendl;
existing->can_write = WriteStatus::NOWRITE;
existing->open_write = false;
existing->replacing = true;
assert(recv_start == recv_end);
existing->write_lock.Unlock();
-
- ldout(async_msgr->cct, 1) << __func__ << " stop myself to swap existing" << dendl;
- _stop();
- // queue a reset on the new connection, which we're dumping for the old
- dispatch_queue->queue_reset(this);
- int new_fd = existing->sd;
- center->submit_to(existing->center->get_id(), [existing, new_fd, connect, reply, authorizer_reply]() mutable {
+ // existing->sd now isn't registering any event while it's new,
+ // previous existing->sd now is closed, no event will notify
+ // existing(EventCenter*) from now.
+ center->submit_to(existing->center->get_id(), [existing, pre_exist_fd, new_fd, connect, reply, authorizer_reply]() mutable {
Mutex::Locker l(existing->lock);
if (new_fd != existing->sd)
return ;
+ if (existing->state != STATE_ACCEPTING_WAIT_CONNECT_MSG) {
+ existing->fault();
+ return ;
+ }
+ reply.global_seq = existing->peer_global_seq;
+ if (pre_exist_fd >= 0)
+ existing->center->delete_file_event(pre_exist_fd, EVENT_READABLE|EVENT_WRITABLE);
+ existing->center->create_file_event(new_fd, EVENT_READABLE, existing->read_handler);
if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) {
// handle error
existing->fault();
Mutex::Locker l(lock);
sd = incoming;
state = STATE_ACCEPTING;
- center->create_file_event(sd, EVENT_READABLE, read_handler);
// rescheduler connection in order to avoid lock dep
center->dispatch_event_external(read_handler);
}
ldout(async_msgr->cct, 1) << __func__ << dendl;
Mutex::Locker l(write_lock);
- if (sd >= 0)
- center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
reset_recv_state();
dispatch_queue->discard_queue(conn_id);
open_write = false;
can_write = WriteStatus::CLOSED;
state_offset = 0;
- if (sd >= 0) {
- shutdown_socket();
- ::close(sd);
- }
- sd = -1;
// Make sure in-queue events will been processed
center->dispatch_event_external(EventCallbackRef(new C_clean_handler(this)));
-
}
void AsyncConnection::prepare_send_message(uint64_t features, Message *m, bufferlist &bl)