From da772fa88db47703d491ebaa9bd709e62858b3b0 Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Fri, 14 Aug 2015 13:28:13 -0400 Subject: [PATCH] WorkQueue: PointerWQ drain no longer waits for other queues If another (independent) queue was processing, drain could block waiting. Instead, allow drain to exit quickly if no items are being processed and the queue is empty for the current WQ. Signed-off-by: Jason Dillaman (cherry picked from commit b118d7df1e34387b6e5649a5b205cf061598d0d4) --- src/common/WorkQueue.h | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/src/common/WorkQueue.h b/src/common/WorkQueue.h index 0a26b3c169962..7c3ccb5c5c269 100644 --- a/src/common/WorkQueue.h +++ b/src/common/WorkQueue.h @@ -354,13 +354,22 @@ public: class PointerWQ : public WorkQueue_ { public: PointerWQ(string n, time_t ti, time_t sti, ThreadPool* p) - : WorkQueue_(n, ti, sti), m_pool(p) { + : WorkQueue_(n, ti, sti), m_pool(p), m_processing(0) { m_pool->add_work_queue(this); } ~PointerWQ() { m_pool->remove_work_queue(this); + assert(m_processing == 0); } void drain() { + { + // if this queue is empty and not processing, don't wait for other + // queues to finish processing + Mutex::Locker l(m_pool->_lock); + if (m_processing == 0 && m_items.empty()) { + return; + } + } m_pool->drain(this); } void queue(T *item) { @@ -368,6 +377,10 @@ public: m_items.push_back(item); m_pool->_cond.SignalOne(); } + bool empty() { + Mutex::Locker l(m_pool->_lock); + return _empty(); + } protected: virtual void _clear() { assert(m_pool->_lock.is_locked()); @@ -383,6 +396,7 @@ public: return NULL; } + ++m_processing; T *item = m_items.front(); m_items.pop_front(); return item; @@ -391,6 +405,9 @@ public: process(reinterpret_cast(item)); } virtual void _void_process_finish(void *item) { + assert(m_pool->_lock.is_locked()); + assert(m_processing > 0); + --m_processing; } virtual void process(T *item) = 0; @@ -409,6 +426,7 @@ public: private: ThreadPool *m_pool; std::list m_items; + uint32_t m_processing; }; private: vector work_queues; -- 2.39.5