From: Haomai Wang Date: Tue, 8 Mar 2016 05:59:50 +0000 (+0800) Subject: AsyncMessenger: make create/delete_file_event within event thread X-Git-Tag: ses5-milestone5~429^2~17 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=478dd900271bf6485d458d51f81852fba6079a67;p=ceph.git AsyncMessenger: make create/delete_file_event within event thread We are make each AsyncConnection/AsyncMessenger only modify its file event in event thread. So make sure create/delete_file_event aren't directly called. Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 196a43ae0303..b857976b5383 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -362,14 +362,21 @@ ssize_t AsyncConnection::_try_send(bool more) << " remaining bytes " << outcoming_bl.length() << dendl; if (!open_write && is_queued()) { - center->create_file_event(sd, EVENT_WRITABLE, write_handler); - open_write = true; + if (center->in_thread()) { + center->create_file_event(sd, EVENT_WRITABLE, write_handler); + open_write = true; + } else { + center->dispatch_event_external(write_handler); + } } if (open_write && !is_queued()) { - center->delete_file_event(sd, EVENT_WRITABLE); - open_write = false; - + if (center->in_thread()) { + center->delete_file_event(sd, EVENT_WRITABLE); + open_write = false; + } else { + center->dispatch_event_external(write_handler); + } if (state_after_send != STATE_NONE) center->dispatch_event_external(read_handler); } @@ -1333,6 +1340,7 @@ ssize_t AsyncConnection::_process_connection() net.set_socket_options(sd, async_msgr->cct->_conf->ms_tcp_nodelay, async_msgr->cct->_conf->ms_tcp_rcvbuf); net.set_priority(sd, async_msgr->get_socket_priority()); + center->create_file_event(sd, EVENT_READABLE, read_handler); bl.append(CEPH_BANNER, strlen(CEPH_BANNER)); @@ -1796,15 +1804,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis existing->is_reset_from_peer = true; } - // Now existing connection will be alive and the current connection will - // exchange socket with existing connection because we want to maintain - // original "connection_state" - if (existing->sd >= 0) - existing->center->delete_file_event(existing->sd, EVENT_READABLE|EVENT_WRITABLE); center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE); - existing->center->create_file_event(sd, EVENT_READABLE, existing->read_handler); - - reply.global_seq = existing->peer_global_seq; // Clean up output buffer existing->outcoming_bl.clear(); @@ -1815,7 +1815,13 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis existing->requeue_sent(); existing->reset_recv_state(); - swap(existing->sd, sd); + int new_fd = sd; + int pre_exist_fd = existing->sd; + std::swap(existing->sd, sd); + _stop(); + // queue a reset on the new connection, which we're dumping for the old + dispatch_queue->queue_reset(this); + ldout(async_msgr->cct, 1) << __func__ << " stop myself to swap existing" << dendl; existing->can_write = WriteStatus::NOWRITE; existing->open_write = false; existing->replacing = true; @@ -1827,17 +1833,22 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis assert(recv_start == recv_end); existing->write_lock.Unlock(); - - ldout(async_msgr->cct, 1) << __func__ << " stop myself to swap existing" << dendl; - _stop(); - // queue a reset on the new connection, which we're dumping for the old - dispatch_queue->queue_reset(this); - int new_fd = existing->sd; - center->submit_to(existing->center->get_id(), [existing, new_fd, connect, reply, authorizer_reply]() mutable { + // existing->sd now isn't registering any event while it's new, + // previous existing->sd now is closed, no event will notify + // existing(EventCenter*) from now. + center->submit_to(existing->center->get_id(), [existing, pre_exist_fd, new_fd, connect, reply, authorizer_reply]() mutable { Mutex::Locker l(existing->lock); if (new_fd != existing->sd) return ; + if (existing->state != STATE_ACCEPTING_WAIT_CONNECT_MSG) { + existing->fault(); + return ; + } + reply.global_seq = existing->peer_global_seq; + if (pre_exist_fd >= 0) + existing->center->delete_file_event(pre_exist_fd, EVENT_READABLE|EVENT_WRITABLE); + existing->center->create_file_event(new_fd, EVENT_READABLE, existing->read_handler); if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) { // handle error existing->fault(); @@ -1961,7 +1972,6 @@ void AsyncConnection::accept(int incoming) Mutex::Locker l(lock); sd = incoming; state = STATE_ACCEPTING; - center->create_file_event(sd, EVENT_READABLE, read_handler); // rescheduler connection in order to avoid lock dep center->dispatch_event_external(read_handler); } @@ -2231,8 +2241,6 @@ void AsyncConnection::_stop() ldout(async_msgr->cct, 1) << __func__ << dendl; Mutex::Locker l(write_lock); - if (sd >= 0) - center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE); reset_recv_state(); dispatch_queue->discard_queue(conn_id); @@ -2244,14 +2252,8 @@ void AsyncConnection::_stop() open_write = false; can_write = WriteStatus::CLOSED; state_offset = 0; - if (sd >= 0) { - shutdown_socket(); - ::close(sd); - } - sd = -1; // Make sure in-queue events will been processed center->dispatch_event_external(EventCallbackRef(new C_clean_handler(this))); - } void AsyncConnection::prepare_send_message(uint64_t features, Message *m, bufferlist &bl) diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 5769827b9d72..1f8379756cf8 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -393,6 +393,12 @@ class AsyncConnection : public Connection { center->delete_time_event(t); register_time_events.clear(); center->delete_time_event(last_tick_id); + if (sd >= 0) { + center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE); + shutdown_socket(); + ::close(sd); + sd = -1; + } delete read_handler; delete write_handler; delete wakeup_handler; diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 78251ff69966..b81a0781a313 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -244,7 +244,8 @@ int Processor::start(Worker *w) // start thread if (listen_sd >= 0) { worker = w; - w->center.create_file_event(listen_sd, EVENT_READABLE, listen_handler); + worker->center.submit_to(worker->center.get_id(), [this]() { + worker->center.create_file_event(listen_sd, EVENT_READABLE, listen_handler); }); } return 0; @@ -290,10 +291,12 @@ void Processor::stop() ldout(msgr->cct,10) << __func__ << dendl; if (listen_sd >= 0) { - worker->center.delete_file_event(listen_sd, EVENT_READABLE); - ::shutdown(listen_sd, SHUT_RDWR); - ::close(listen_sd); - listen_sd = -1; + worker->center.submit_to(worker->center.get_id(), [this]() { + worker->center.delete_file_event(listen_sd, EVENT_READABLE); + ::shutdown(listen_sd, SHUT_RDWR); + ::close(listen_sd); + listen_sd = -1; + }); } }