git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async/AsyncConnection: swap eventcenter when replacing
author Haomai Wang <haomai@xsky.com>
Wed, 29 Jun 2016 08:26:29 +0000 (16:26 +0800)
committer Haomai Wang <haomai@xsky.com>
Tue, 12 Jul 2016 15:47:53 +0000 (23:47 +0800)
Previously we only exchanged the fd when replacing a connection. We will
introduce a DPDK plugin in the near future, and it needs every fd to be used
locally, unlike a kernel socket, which is shared by all cores.

So we also need to swap the EventCenter when replacing, so that each socket
is associated with its EventCenter.

Signed-off-by: Haomai Wang <haomai@xsky.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h

index b857976b5383b3d9fc28f923db2852049ed73df2..b1b54890d982efa938119c97bd9b38a7a0a86b7f 100644 (file)
@@ -82,7 +82,7 @@ class C_clean_handler : public EventCallback {
  public:
   explicit C_clean_handler(AsyncConnectionRef c): conn(c) {}
   void do_request(int id) {
-    conn->cleanup_handler();
+    conn->cleanup();
     delete this;
   }
 };
@@ -948,6 +948,12 @@ void AsyncConnection::process()
           break;
         }
 
+      case STATE_NONE:
+        {
+          ldout(async_msgr->cct, 20) << __func__ << " enter none state" << dendl;
+          break;
+        }
+
       case STATE_CLOSED:
         {
           if (sd >= 0)
@@ -1816,46 +1822,85 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
     existing->reset_recv_state();
 
     int new_fd = sd;
-    int pre_exist_fd = existing->sd;
-    std::swap(existing->sd, sd);
-    _stop();
+    EventCenter *new_center = center;
+    Worker *new_worker = worker;
+    // avoid _stop shutdown replacing socket
+    sd = -1;
     // queue a reset on the new connection, which we're dumping for the old
+    _stop();
     dispatch_queue->queue_reset(this);
     ldout(async_msgr->cct, 1) << __func__ << " stop myself to swap existing" << dendl;
     existing->can_write = WriteStatus::NOWRITE;
     existing->open_write = false;
     existing->replacing = true;
     existing->state_offset = 0;
-    existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
+    // avoid previous thread modify event
+    existing->state = STATE_NONE;
     // Discard existing prefetch buffer in `recv_buf`
     existing->recv_start = existing->recv_end = 0;
     // there shouldn't exist any buffer
     assert(recv_start == recv_end);
 
     existing->write_lock.Unlock();
-    // existing->sd now isn't registering any event while it's new,
-    // previous existing->sd now is closed, no event will notify
-    // existing(EventCenter*) from now.
-    center->submit_to(existing->center->get_id(), [existing, pre_exist_fd, new_fd, connect, reply, authorizer_reply]() mutable {
-      Mutex::Locker l(existing->lock);
-      if (new_fd != existing->sd)
-        return ;
-
-      if (existing->state != STATE_ACCEPTING_WAIT_CONNECT_MSG) {
-        existing->fault();
-        return ;
-      }
-      reply.global_seq = existing->peer_global_seq;
-      if (pre_exist_fd >= 0)
-        existing->center->delete_file_event(pre_exist_fd, EVENT_READABLE|EVENT_WRITABLE);
-      existing->center->create_file_event(new_fd, EVENT_READABLE, existing->read_handler);
-      if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) {
-        // handle error
-        existing->fault();
+    // new sd now isn't registered any event while origin events
+    // have been deleted.
+    // previous existing->sd now is still open, event will continue to
+    // notify previous existing->center from now.
+    // From now, no one will dispatch event to `existing`
+    // Note: we must use async dispatch instead of execute this inline
+    // even existing->center == center. Because we must ensure below
+    // event executed after all pending external events like
+    // "dispatch_state->queue"
+    existing->center->submit_to(
+        existing->center->get_id(),
+        [existing, new_fd, new_worker, new_center, connect, reply, authorizer_reply]() mutable {
+      // we need to delete time event in original thread
+      {
+        Mutex::Locker l(existing->lock);
+        if (existing->state == STATE_NONE) {
+          existing->shutdown_socket();
+          existing->sd = new_fd;
+          existing->worker->references--;
+          new_worker->references++;
+          existing->logger = new_worker->get_perf_counter();
+          existing->worker = new_worker;
+          existing->center = new_center;
+          if (existing->delay_state)
+            existing->delay_state->set_center(new_center);
+        } else if (existing->state == STATE_CLOSED) {
+          ::close(new_fd);
+          return ;
+        } else {
+          assert(0);
+        }
       }
+
+      // Before changing existing->center, it may already exists some events in existing->center's queue.
+      // Then if we mark down `existing`, it will execute in another thread and clean up connection.
+      // Previous event will result in segment fault
+      auto transfer_existing = [existing, new_fd, connect, reply, authorizer_reply]() mutable {
+        Mutex::Locker l(existing->lock);
+        if (existing->state == STATE_CLOSED)
+          return ;
+        assert(new_fd == existing->sd);
+        assert(existing->state == STATE_NONE);
+
+        existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
+        existing->center->create_file_event(existing->sd, EVENT_READABLE, existing->read_handler);
+        reply.global_seq = existing->peer_global_seq;
+        if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) {
+          // handle error
+          existing->fault();
+        }
+      };
+      if (existing->center->in_thread())
+        transfer_existing();
+      else
+        existing->center->submit_to(
+            existing->center->get_id(), std::move(transfer_existing), true);
     }, true);
-    existing->lock.Unlock();
 
+    existing->lock.Unlock();
     return 0;
   }
   existing->lock.Unlock();
@@ -2124,7 +2169,7 @@ int AsyncConnection::randomize_out_seq()
 
 void AsyncConnection::fault()
 {
-  if (state == STATE_CLOSED) {
+  if (state == STATE_CLOSED || state == STATE_NONE) {
     ldout(async_msgr->cct, 10) << __func__ << " connection is already closed" << dendl;
     return ;
   }
@@ -2579,7 +2624,7 @@ void AsyncConnection::handle_write()
     if (state == STATE_STANDBY && !policy.server && is_queued()) {
       ldout(async_msgr->cct, 10) << __func__ << " policy.server is false" << dendl;
       _connect();
-    } else if (sd >= 0 && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) {
+    } else if (sd >= 0 && state != STATE_NONE && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) {
       r = _try_send();
       if (r < 0) {
         ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl;
index 1f8379756cf857083c708f6462df7b4db2c4099c..4b9205926f8a3681ea57edab61ac0c2cd828d6bd 100644 (file)
@@ -156,6 +156,7 @@ class AsyncConnection : public Connection {
       assert(register_time_events.empty());
       assert(delay_queue.empty());
     }
+    void set_center(EventCenter *c) { center = c; }
     void do_request(int id) override;
     void queue(double delay_period, utime_t release, Message *m) {
       Mutex::Locker l(delay_lock);
@@ -388,17 +389,8 @@ class AsyncConnection : public Connection {
     if (need_queue_reset)
       dispatch_queue->queue_reset(this);
   }
-  void cleanup_handler() {
-    for (auto &&t : register_time_events)
-      center->delete_time_event(t);
-    register_time_events.clear();
-    center->delete_time_event(last_tick_id);
-    if (sd >= 0) {
-      center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
-      shutdown_socket();
-      ::close(sd);
-      sd = -1;
-    }
+  void cleanup() {
+    shutdown_socket();
     delete read_handler;
     delete write_handler;
     delete wakeup_handler;