]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async/AsyncMessenger: make sure all connections closed then shutdown dq 9781/head
authorHaomai Wang <haomai@xsky.com>
Mon, 27 Jun 2016 15:23:20 +0000 (23:23 +0800)
committerHaomai Wang <haomai@xsky.com>
Tue, 28 Jun 2016 14:29:59 +0000 (22:29 +0800)
1. ensure stop accepter before shutdown dispatcherqueue
2. ensure we don't generate new item after dispatcher queue shutdown

Signed-off-by: Haomai Wang <haomai@xsky.com>
src/msg/async/AsyncConnection.h
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h

index 36c8f2d7de10d299061ea235b67d60cef3c4f77d..af1747951c0c269fa069c74cd9588d68824a799a 100644 (file)
@@ -367,9 +367,9 @@ class AsyncConnection : public Connection {
   void process();
   void wakeup_from(uint64_t id);
   void local_deliver();
-  void stop() {
+  void stop(bool queue_reset) {
     lock.Lock();
-    bool need_queue_reset = (state != STATE_CLOSED);
+    bool need_queue_reset = (state != STATE_CLOSED) && queue_reset;
     lock.Unlock();
     mark_down();
     if (need_queue_reset)
index c2c9d7081779a12e3c020edb25c38aa61e681774..50d48c3ecb0d63f415857317d67bdd6ebcd6f98a 100644 (file)
@@ -577,16 +577,17 @@ int AsyncMessenger::shutdown()
 {
   ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;
 
-  // break ref cycles on the loopback connection
-  processor.stop();
   mark_down_all();
-  dispatch_queue.shutdown();
+  // break ref cycles on the loopback connection
   local_connection->set_priv(NULL);
-  pool->barrier();
+   // done!  clean up.
+  processor.stop();
+  did_bind = false;
   lock.Lock();
   stop_cond.Signal();
-  lock.Unlock();
   stopped = true;
+  lock.Unlock();
+  pool->barrier();
   return 0;
 }
 
@@ -659,15 +660,7 @@ void AsyncMessenger::wait()
 
   lock.Unlock();
 
-  // done!  clean up.
-  ldout(cct,20) << __func__ << ": stopping processor thread" << dendl;
-  processor.stop();
-  did_bind = false;
-  ldout(cct,20) << __func__ << ": stopped processor thread" << dendl;
-
-  // close all connections
-  mark_down_all();
-
+  dispatch_queue.shutdown();
   if (dispatch_queue.is_started()) {
     ldout(cct, 10) << __func__ << ": waiting for dispatch queue" << dendl;
     dispatch_queue.wait();
@@ -675,6 +668,9 @@ void AsyncMessenger::wait()
     ldout(cct, 10) << __func__ << ": dispatch queue is stopped" << dendl;
   }
 
+  // close all connections
+  shutdown_connections(false);
+
   ldout(cct, 10) << __func__ << ": done." << dendl;
   ldout(cct, 1) << __func__ << " complete." << dendl;
   started = false;
@@ -815,7 +811,7 @@ int AsyncMessenger::send_keepalive(Connection *con)
   return 0;
 }
 
-void AsyncMessenger::mark_down_all()
+void AsyncMessenger::shutdown_connections(bool queue_reset)
 {
   ldout(cct,1) << __func__ << " " << dendl;
   lock.Lock();
@@ -823,7 +819,7 @@ void AsyncMessenger::mark_down_all()
        q != accepting_conns.end(); ++q) {
     AsyncConnectionRef p = *q;
     ldout(cct, 5) << __func__ << " accepting_conn " << p.get() << dendl;
-    p->stop();
+    p->stop(queue_reset);
   }
   accepting_conns.clear();
 
@@ -833,7 +829,7 @@ void AsyncMessenger::mark_down_all()
     ldout(cct, 5) << __func__ << " mark down " << it->first << " " << p << dendl;
     conns.erase(it);
     p->get_perf_counter()->dec(l_msgr_active_connections);
-    p->stop();
+    p->stop(queue_reset);
   }
 
   {
index e54d3cd139c39dc831b6ba638ee7f21f4049c835..61f8f3b76d1285c59186a6a117bc673b971e1dce 100644 (file)
@@ -176,8 +176,10 @@ public:
   ConnectionRef get_connection(const entity_inst_t& dest) override;
   ConnectionRef get_loopback_connection() override;
   int send_keepalive(Connection *con);
-  void mark_down(const entity_addr_t& addr) override;
-  void mark_down_all() override;
+  virtual void mark_down(const entity_addr_t& addr) override;
+  virtual void mark_down_all() override {
+    shutdown_connections(true);
+  }
   /** @} // Connection Management */
 
   /**
@@ -348,6 +350,8 @@ private:
     ms_deliver_handle_fast_connect(local_connection.get());
   }
 
+  void shutdown_connections(bool queue_reset);
+
 public:
 
   /// con used for sending messages to ourselves