From: Haomai Wang Date: Thu, 8 Jan 2015 13:45:53 +0000 (+0800) Subject: AsyncMessenger: wait for dispatch event done X-Git-Tag: v0.93~247^2~17 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e7db9114893bfbe58a24e14d8db742023fcbc1d8;p=ceph.git AsyncMessenger: wait for dispatch event done In order to avoid a deadlock like: 1. mark_down_all runs while holding the lock 2. ms_dispatch_reset 3. get_connection wants to take the lock 4. deadlock. We signal a WorkerPool barrier to wait until all in-queue events are done. Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 659eeca4795c..e1d803e7e39b 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -2111,6 +2111,7 @@ void AsyncConnection::mark_down() stopping.set(1); Mutex::Locker l(lock); _stop(); + center->dispatch_event_external(reset_handler); } void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp) diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 5e8594a43239..902a684e2bb0 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -326,7 +326,9 @@ void *Worker::entry() *******************/ const string WorkerPool::name = "AsyncMessenger::WorkerPool"; -WorkerPool::WorkerPool(CephContext *c): cct(c), seq(0), started(false) +WorkerPool::WorkerPool(CephContext *c): cct(c), seq(0), started(false), + barrier_lock("WorkerPool::WorkerPool::barrier_lock"), + barrier_count(0) { assert(cct->_conf->ms_async_op_threads > 0); for (int i = 0; i < cct->_conf->ms_async_op_threads; ++i) { @@ -365,6 +367,24 @@ void WorkerPool::start() } } +void WorkerPool::barrier() +{ + ldout(cct, 10) << __func__ << " started." 
<< dendl; + pthread_t cur = pthread_self(); + uint64_t send = 0; + for (vector::iterator it = workers.begin(); it != workers.end(); ++it) { + assert(cur != (*it)->center.get_owner()); + (*it)->center.dispatch_event_external(EventCallbackRef(new C_barrier(this))); + barrier_count.inc(); + } + ldout(cct, 10) << __func__ << " wait for " << barrier_count.read() << " barrier" << dendl; + Mutex::Locker l(barrier_lock); + while (barrier_count.read()) + barrier_cond.Wait(barrier_lock); + + ldout(cct, 10) << __func__ << " end." << dendl; +} + /******************* * AsyncMessenger @@ -406,11 +426,12 @@ void AsyncMessenger::ready() int AsyncMessenger::shutdown() { ldout(cct,10) << __func__ << " " << get_myaddr() << dendl; - mark_down_all(); // break ref cycles on the loopback connection processor.stop(); + mark_down_all(); local_connection->set_priv(NULL); + pool->barrier(); lock.Lock(); stop_cond.Signal(); lock.Unlock(); @@ -644,7 +665,6 @@ void AsyncMessenger::mark_down_all() AsyncConnectionRef p = *q; ldout(cct, 5) << __func__ << " accepting_conn " << p << dendl; p->mark_down(); - ms_deliver_handle_reset(p.get()); } accepting_conns.clear(); @@ -654,7 +674,6 @@ void AsyncMessenger::mark_down_all() ldout(cct, 5) << __func__ << " mark down " << it->first << " " << p << dendl; conns.erase(it); p->mark_down(); - ms_deliver_handle_reset(p.get()); } while (!deleted_conns.empty()) { @@ -674,7 +693,6 @@ void AsyncMessenger::mark_down(const entity_addr_t& addr) if (p) { ldout(cct, 1) << __func__ << " " << addr << " -- " << p << dendl; p->mark_down(); - ms_deliver_handle_reset(p.get()); } else { ldout(cct, 1) << __func__ << " " << addr << " -- connection dne" << dendl; } diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index 8cee7f902a0f..f4bb34a8c915 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -86,7 +86,21 @@ class WorkerPool: CephContext::AssociatedSingletonObject { vector coreids; // Used to indicate whether 
thread started bool started; - + Mutex barrier_lock; + Cond barrier_cond; + atomic_t barrier_count; + + class C_barrier : public EventCallback { + WorkerPool *pool; + public: + C_barrier(WorkerPool *p): pool(p) {} + void do_request(int id) { + Mutex::Locker l(pool->barrier_lock); + pool->barrier_count.dec(); + pool->barrier_cond.Signal(); + } + }; + friend class C_barrier; public: WorkerPool(CephContext *c); virtual ~WorkerPool(); @@ -99,6 +113,7 @@ class WorkerPool: CephContext::AssociatedSingletonObject { return -1; return coreids[id % coreids.size()]; } + void barrier(); // uniq name for CephContext to distinguish differnt object static const string name; };