From e5dfd3dd7a53bf79f1bfc17b8a4a720aba08d7c3 Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Fri, 14 Aug 2015 13:28:13 -0400 Subject: [PATCH] WorkQueue: PointerWQ drain no longer waits for other queues If another (independent) queue was processing, drain could block waiting. Instead, allow drain to exit quickly if no items are being processed and the queue is empty for the current WQ. Signed-off-by: Jason Dillaman (cherry picked from commit b118d7df1e34387b6e5649a5b205cf061598d0d4) --- src/common/WorkQueue.h | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/src/common/WorkQueue.h b/src/common/WorkQueue.h index 7d453e6e25c3..1c2e475d1296 100644 --- a/src/common/WorkQueue.h +++ b/src/common/WorkQueue.h @@ -310,13 +310,22 @@ public: class PointerWQ : public WorkQueue_ { public: PointerWQ(string n, time_t ti, time_t sti, ThreadPool* p) - : WorkQueue_(n, ti, sti), m_pool(p) { + : WorkQueue_(n, ti, sti), m_pool(p), m_processing(0) { m_pool->add_work_queue(this); } ~PointerWQ() { m_pool->remove_work_queue(this); + assert(m_processing == 0); } void drain() { + { + // if this queue is empty and not processing, don't wait for other + // queues to finish processing + Mutex::Locker l(m_pool->_lock); + if (m_processing == 0 && m_items.empty()) { + return; + } + } m_pool->drain(this); } void queue(T *item) { @@ -324,6 +333,10 @@ public: m_items.push_back(item); m_pool->_cond.SignalOne(); } + bool empty() { + Mutex::Locker l(m_pool->_lock); + return _empty(); + } protected: virtual void _clear() { assert(m_pool->_lock.is_locked()); @@ -339,6 +352,7 @@ public: return NULL; } + ++m_processing; T *item = m_items.front(); m_items.pop_front(); return item; @@ -347,6 +361,9 @@ public: process(reinterpret_cast(item)); } virtual void _void_process_finish(void *item) { + assert(m_pool->_lock.is_locked()); + assert(m_processing > 0); + --m_processing; } virtual void process(T *item) = 0; @@ -365,6 +382,7 @@ public: private: ThreadPool *m_pool; std::list m_items; + uint32_t m_processing; }; private: vector work_queues; -- 2.47.3