]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncMessenger: make create/delete_file_event within event thread
authorHaomai Wang <haomai@xsky.com>
Tue, 8 Mar 2016 05:59:50 +0000 (13:59 +0800)
committerHaomai Wang <haomai@xsky.com>
Tue, 12 Jul 2016 15:37:22 +0000 (23:37 +0800)
We are make each AsyncConnection/AsyncMessenger only modify its file event
in event thread. So make sure create/delete_file_event aren't directly called.

Signed-off-by: Haomai Wang <haomai@xsky.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h
src/msg/async/AsyncMessenger.cc

index 196a43ae03031f114e97a0a25038ee09e1df6592..b857976b5383b3d9fc28f923db2852049ed73df2 100644 (file)
@@ -362,14 +362,21 @@ ssize_t AsyncConnection::_try_send(bool more)
                              << " remaining bytes " << outcoming_bl.length() << dendl;
 
   if (!open_write && is_queued()) {
-    center->create_file_event(sd, EVENT_WRITABLE, write_handler);
-    open_write = true;
+    if (center->in_thread()) {
+      center->create_file_event(sd, EVENT_WRITABLE, write_handler);
+      open_write = true;
+    } else {
+      center->dispatch_event_external(write_handler);
+    }
   }
 
   if (open_write && !is_queued()) {
-    center->delete_file_event(sd, EVENT_WRITABLE);
-    open_write = false;
-
+    if (center->in_thread()) {
+      center->delete_file_event(sd, EVENT_WRITABLE);
+      open_write = false;
+    } else {
+      center->dispatch_event_external(write_handler);
+    }
     if (state_after_send != STATE_NONE)
       center->dispatch_event_external(read_handler);
   }
@@ -1333,6 +1340,7 @@ ssize_t AsyncConnection::_process_connection()
 
         net.set_socket_options(sd, async_msgr->cct->_conf->ms_tcp_nodelay, async_msgr->cct->_conf->ms_tcp_rcvbuf);
         net.set_priority(sd, async_msgr->get_socket_priority());
+        center->create_file_event(sd, EVENT_READABLE, read_handler);
 
         bl.append(CEPH_BANNER, strlen(CEPH_BANNER));
 
@@ -1796,15 +1804,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
       existing->is_reset_from_peer = true;
     }
 
-    // Now existing connection will be alive and the current connection will
-    // exchange socket with existing connection because we want to maintain
-    // original "connection_state"
-    if (existing->sd >= 0)
-      existing->center->delete_file_event(existing->sd, EVENT_READABLE|EVENT_WRITABLE);
     center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
-    existing->center->create_file_event(sd, EVENT_READABLE, existing->read_handler);
-
-    reply.global_seq = existing->peer_global_seq;
 
     // Clean up output buffer
     existing->outcoming_bl.clear();
@@ -1815,7 +1815,13 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
     existing->requeue_sent();
     existing->reset_recv_state();
 
-    swap(existing->sd, sd);
+    int new_fd = sd;
+    int pre_exist_fd = existing->sd;
+    std::swap(existing->sd, sd);
+    _stop();
+    // queue a reset on the new connection, which we're dumping for the old
+    dispatch_queue->queue_reset(this);
+    ldout(async_msgr->cct, 1) << __func__ << " stop myself to swap existing" << dendl;
     existing->can_write = WriteStatus::NOWRITE;
     existing->open_write = false;
     existing->replacing = true;
@@ -1827,17 +1833,22 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
     assert(recv_start == recv_end);
 
     existing->write_lock.Unlock();
-
-    ldout(async_msgr->cct, 1) << __func__ << " stop myself to swap existing" << dendl;
-    _stop();
-    // queue a reset on the new connection, which we're dumping for the old
-    dispatch_queue->queue_reset(this);
-    int new_fd = existing->sd;
-    center->submit_to(existing->center->get_id(), [existing, new_fd, connect, reply, authorizer_reply]() mutable {
+    // existing->sd now isn't registering any event while it's new,
+    // previous existing->sd now is closed, no event will notify
+    // existing(EventCenter*) from now.
+    center->submit_to(existing->center->get_id(), [existing, pre_exist_fd, new_fd, connect, reply, authorizer_reply]() mutable {
       Mutex::Locker l(existing->lock);
       if (new_fd != existing->sd)
         return ;
 
+      if (existing->state != STATE_ACCEPTING_WAIT_CONNECT_MSG) {
+        existing->fault();
+        return ;
+      }
+      reply.global_seq = existing->peer_global_seq;
+      if (pre_exist_fd >= 0)
+        existing->center->delete_file_event(pre_exist_fd, EVENT_READABLE|EVENT_WRITABLE);
+      existing->center->create_file_event(new_fd, EVENT_READABLE, existing->read_handler);
       if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) {
         // handle error
         existing->fault();
@@ -1961,7 +1972,6 @@ void AsyncConnection::accept(int incoming)
   Mutex::Locker l(lock);
   sd = incoming;
   state = STATE_ACCEPTING;
-  center->create_file_event(sd, EVENT_READABLE, read_handler);
   // rescheduler connection in order to avoid lock dep
   center->dispatch_event_external(read_handler);
 }
@@ -2231,8 +2241,6 @@ void AsyncConnection::_stop()
 
   ldout(async_msgr->cct, 1) << __func__ << dendl;
   Mutex::Locker l(write_lock);
-  if (sd >= 0)
-    center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
 
   reset_recv_state();
   dispatch_queue->discard_queue(conn_id);
@@ -2244,14 +2252,8 @@ void AsyncConnection::_stop()
   open_write = false;
   can_write = WriteStatus::CLOSED;
   state_offset = 0;
-  if (sd >= 0) {
-    shutdown_socket();
-    ::close(sd);
-  }
-  sd = -1;
   // Make sure in-queue events will been processed
   center->dispatch_event_external(EventCallbackRef(new C_clean_handler(this)));
-
 }
 
 void AsyncConnection::prepare_send_message(uint64_t features, Message *m, bufferlist &bl)
index 5769827b9d720819205e858ff6f790377c19afad..1f8379756cf857083c708f6462df7b4db2c4099c 100644 (file)
@@ -393,6 +393,12 @@ class AsyncConnection : public Connection {
       center->delete_time_event(t);
     register_time_events.clear();
     center->delete_time_event(last_tick_id);
+    if (sd >= 0) {
+      center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
+      shutdown_socket();
+      ::close(sd);
+      sd = -1;
+    }
     delete read_handler;
     delete write_handler;
     delete wakeup_handler;
index 78251ff699666ee0c3b7fb4e39343681c73f6cfe..b81a0781a313a4b0763618b99c2f0c8b281c82e1 100644 (file)
@@ -244,7 +244,8 @@ int Processor::start(Worker *w)
   // start thread
   if (listen_sd >= 0) {
     worker = w;
-    w->center.create_file_event(listen_sd, EVENT_READABLE, listen_handler);
+    worker->center.submit_to(worker->center.get_id(), [this]() {
+      worker->center.create_file_event(listen_sd, EVENT_READABLE, listen_handler); });
   }
 
   return 0;
@@ -290,10 +291,12 @@ void Processor::stop()
   ldout(msgr->cct,10) << __func__ << dendl;
 
   if (listen_sd >= 0) {
-    worker->center.delete_file_event(listen_sd, EVENT_READABLE);
-    ::shutdown(listen_sd, SHUT_RDWR);
-    ::close(listen_sd);
-    listen_sd = -1;
+    worker->center.submit_to(worker->center.get_id(), [this]() {
+      worker->center.delete_file_event(listen_sd, EVENT_READABLE);
+      ::shutdown(listen_sd, SHUT_RDWR);
+      ::close(listen_sd);
+      listen_sd = -1;
+    });
   }
 }