From: Jason Dillaman Date: Fri, 14 Aug 2015 17:28:13 +0000 (-0400) Subject: WorkQueue: PointerWQ drain no longer waits for other queues X-Git-Tag: v0.94.7~40^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e5dfd3dd7a53bf79f1bfc17b8a4a720aba08d7c3;p=ceph.git WorkQueue: PointerWQ drain no longer waits for other queues If another (independent) queue was processing, drain could block waiting. Instead, allow drain to exit quickly if no items are being processed and the queue is empty for the current WQ. Signed-off-by: Jason Dillaman (cherry picked from commit b118d7df1e34387b6e5649a5b205cf061598d0d4) --- diff --git a/src/common/WorkQueue.h b/src/common/WorkQueue.h index 7d453e6e25c3..1c2e475d1296 100644 --- a/src/common/WorkQueue.h +++ b/src/common/WorkQueue.h @@ -310,13 +310,22 @@ public: class PointerWQ : public WorkQueue_ { public: PointerWQ(string n, time_t ti, time_t sti, ThreadPool* p) - : WorkQueue_(n, ti, sti), m_pool(p) { + : WorkQueue_(n, ti, sti), m_pool(p), m_processing(0) { m_pool->add_work_queue(this); } ~PointerWQ() { m_pool->remove_work_queue(this); + assert(m_processing == 0); } void drain() { + { + // if this queue is empty and not processing, don't wait for other + // queues to finish processing + Mutex::Locker l(m_pool->_lock); + if (m_processing == 0 && m_items.empty()) { + return; + } + } m_pool->drain(this); } void queue(T *item) { @@ -324,6 +333,10 @@ public: m_items.push_back(item); m_pool->_cond.SignalOne(); } + bool empty() { + Mutex::Locker l(m_pool->_lock); + return _empty(); + } protected: virtual void _clear() { assert(m_pool->_lock.is_locked()); @@ -339,6 +352,7 @@ public: return NULL; } + ++m_processing; T *item = m_items.front(); m_items.pop_front(); return item; @@ -347,6 +361,9 @@ public: process(reinterpret_cast(item)); } virtual void _void_process_finish(void *item) { + assert(m_pool->_lock.is_locked()); + assert(m_processing > 0); + --m_processing; } virtual void process(T *item) = 0; @@ -365,6 +382,7 @@ public: private: ThreadPool *m_pool; std::list m_items; + uint32_t m_processing; }; private: vector work_queues;