]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncMessenger: wait for dispatch event done
authorHaomai Wang <haomaiwang@gmail.com>
Thu, 8 Jan 2015 13:45:53 +0000 (21:45 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Thu, 15 Jan 2015 19:07:11 +0000 (03:07 +0800)
In order to avoid deadlock like:
1. mark_down_all with holding lock
2. ms_dispatch_reset
3. get_connection want to get lock
4. deadlock

We signal a workerpool barrier to wait for all in-queue events done.

Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h

index 659eeca4795cf7532ef3aedf280b8cd869c69537..e1d803e7e39b4f990f1133f361e8a12d673a52af 100644 (file)
@@ -2111,6 +2111,7 @@ void AsyncConnection::mark_down()
   stopping.set(1);
   Mutex::Locker l(lock);
   _stop();
+  center->dispatch_event_external(reset_handler);
 }
 
 void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp)
index 5e8594a43239280b5a861184f63c250f1a6bcfbf..902a684e2bb0777a9778421ee8aebf23d1e08f93 100644 (file)
@@ -326,7 +326,9 @@ void *Worker::entry()
  *******************/
 const string WorkerPool::name = "AsyncMessenger::WorkerPool";
 
-WorkerPool::WorkerPool(CephContext *c): cct(c), seq(0), started(false)
+WorkerPool::WorkerPool(CephContext *c): cct(c), seq(0), started(false),
+                                        barrier_lock("WorkerPool::WorkerPool::barrier_lock"),
+                                        barrier_count(0)
 {
   assert(cct->_conf->ms_async_op_threads > 0);
   for (int i = 0; i < cct->_conf->ms_async_op_threads; ++i) {
@@ -365,6 +367,24 @@ void WorkerPool::start()
   }
 }
 
+void WorkerPool::barrier()
+{
+  ldout(cct, 10) << __func__ << " started." << dendl;
+  pthread_t cur = pthread_self();
+  uint64_t send = 0;
+  for (vector<Worker*>::iterator it = workers.begin(); it != workers.end(); ++it) {
+    assert(cur != (*it)->center.get_owner());
+    (*it)->center.dispatch_event_external(EventCallbackRef(new C_barrier(this)));
+    barrier_count.inc();
+  }
+  ldout(cct, 10) << __func__ << " wait for " << barrier_count.read() << " barrier" << dendl;
+  Mutex::Locker l(barrier_lock);
+  while (barrier_count.read())
+    barrier_cond.Wait(barrier_lock);
+
+  ldout(cct, 10) << __func__ << " end." << dendl;
+}
+
 
 /*******************
  * AsyncMessenger
@@ -406,11 +426,12 @@ void AsyncMessenger::ready()
 int AsyncMessenger::shutdown()
 {
   ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;
-  mark_down_all();
 
   // break ref cycles on the loopback connection
   processor.stop();
+  mark_down_all();
   local_connection->set_priv(NULL);
+  pool->barrier();
   lock.Lock();
   stop_cond.Signal();
   lock.Unlock();
@@ -644,7 +665,6 @@ void AsyncMessenger::mark_down_all()
     AsyncConnectionRef p = *q;
     ldout(cct, 5) << __func__ << " accepting_conn " << p << dendl;
     p->mark_down();
-    ms_deliver_handle_reset(p.get());
   }
   accepting_conns.clear();
 
@@ -654,7 +674,6 @@ void AsyncMessenger::mark_down_all()
     ldout(cct, 5) << __func__ << " mark down " << it->first << " " << p << dendl;
     conns.erase(it);
     p->mark_down();
-    ms_deliver_handle_reset(p.get());
   }
 
   while (!deleted_conns.empty()) {
@@ -674,7 +693,6 @@ void AsyncMessenger::mark_down(const entity_addr_t& addr)
   if (p) {
     ldout(cct, 1) << __func__ << " " << addr << " -- " << p << dendl;
     p->mark_down();
-    ms_deliver_handle_reset(p.get());
   } else {
     ldout(cct, 1) << __func__ << " " << addr << " -- connection dne" << dendl;
   }
index 8cee7f902a0ff1b0c454424a85e8e2683c5d9671..f4bb34a8c915da522c836c0b4522b72fb5ea94e9 100644 (file)
@@ -86,7 +86,21 @@ class WorkerPool: CephContext::AssociatedSingletonObject {
   vector<int> coreids;
   // Used to indicate whether thread started
   bool started;
-
+  Mutex barrier_lock;
+  Cond barrier_cond;
+  atomic_t barrier_count;
+
+  class C_barrier : public EventCallback {
+    WorkerPool *pool;
+   public:
+    C_barrier(WorkerPool *p): pool(p) {}
+    void do_request(int id) {
+      Mutex::Locker l(pool->barrier_lock);
+      pool->barrier_count.dec();
+      pool->barrier_cond.Signal();
+    }
+  };
+  friend class C_barrier;
  public:
   WorkerPool(CephContext *c);
   virtual ~WorkerPool();
@@ -99,6 +113,7 @@ class WorkerPool: CephContext::AssociatedSingletonObject {
       return -1;
     return coreids[id % coreids.size()];
   }
+  void barrier();
   // uniq name for CephContext to distinguish differnt object
   static const string name;
 };