From 3e78b18b09d75626ca2599bac3b9f9c9889507a5 Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Tue, 7 Jul 2015 12:11:13 -0400 Subject: [PATCH] WorkQueue: new PointerWQ base class for ContextWQ The existing work queues do not properly function if added to a running thread pool. librbd uses a singleton thread pool which requires dynamically adding/removing work queues as images are opened and closed. Fixes: #13636 Signed-off-by: Jason Dillaman --- src/common/WorkQueue.h | 112 +++++++++++++++++++++++++++++++++-------- 1 file changed, 90 insertions(+), 22 deletions(-) diff --git a/src/common/WorkQueue.h b/src/common/WorkQueue.h index f0754de8e1992..0a26b3c169962 100644 --- a/src/common/WorkQueue.h +++ b/src/common/WorkQueue.h @@ -18,6 +18,7 @@ #include "Mutex.h" #include "Cond.h" #include "Thread.h" +#include "include/unordered_map.h" #include "common/config_obs.h" #include "common/HeartbeatMap.h" @@ -349,6 +350,66 @@ public: }; + template + class PointerWQ : public WorkQueue_ { + public: + PointerWQ(string n, time_t ti, time_t sti, ThreadPool* p) + : WorkQueue_(n, ti, sti), m_pool(p) { + m_pool->add_work_queue(this); + } + ~PointerWQ() { + m_pool->remove_work_queue(this); + } + void drain() { + m_pool->drain(this); + } + void queue(T *item) { + Mutex::Locker l(m_pool->_lock); + m_items.push_back(item); + m_pool->_cond.SignalOne(); + } + protected: + virtual void _clear() { + assert(m_pool->_lock.is_locked()); + m_items.clear(); + } + virtual bool _empty() { + assert(m_pool->_lock.is_locked()); + return m_items.empty(); + } + virtual void *_void_dequeue() { + assert(m_pool->_lock.is_locked()); + if (m_items.empty()) { + return NULL; + } + + T *item = m_items.front(); + m_items.pop_front(); + return item; + } + virtual void _void_process(void *item, ThreadPool::TPHandle &handle) { + process(reinterpret_cast(item)); + } + virtual void _void_process_finish(void *item) { + } + + virtual void process(T *item) = 0; + + T *front() { + assert(m_pool->_lock.is_locked()); + if (m_items.empty()) { + return NULL; + } + return m_items.front(); + } + void signal() { + Mutex::Locker pool_locker(m_pool->_lock); + m_pool->_cond.SignalOne(); + } + private: + ThreadPool *m_pool; + std::list m_items; + }; private: vector work_queues; int last_work_queue; @@ -488,37 +549,44 @@ public: /// Work queue that asynchronously completes contexts (executes callbacks). /// @see Finisher -class ContextWQ : public ThreadPool::WorkQueueVal > { +class ContextWQ : public ThreadPool::PointerWQ { public: ContextWQ(const string &name, time_t ti, ThreadPool *tp) - : ThreadPool::WorkQueueVal >(name, ti, 0, tp) {} + : ThreadPool::PointerWQ(name, ti, 0, tp), + m_lock("ContextWQ::m_lock") { + } void queue(Context *ctx, int result = 0) { - ThreadPool::WorkQueueVal >::queue( - std::make_pair(ctx, result)); + if (result != 0) { + Mutex::Locker locker(m_lock); + m_context_results[ctx] = result; + } + ThreadPool::PointerWQ::queue(ctx); } - protected: - virtual void _enqueue(std::pair item) { - _queue.push_back(item); - } - virtual void _enqueue_front(std::pair item) { - _queue.push_front(item); - } - virtual bool _empty() { - return _queue.empty(); - } - virtual std::pair _dequeue() { - std::pair item = _queue.front(); - _queue.pop_front(); - return item; + virtual void _clear() { + ThreadPool::PointerWQ::_clear(); + + Mutex::Locker locker(m_lock); + m_context_results.clear(); } - virtual void _process(std::pair item) { - item.first->complete(item.second); + + virtual void process(Context *ctx) { + int result = 0; + { + Mutex::Locker locker(m_lock); + ceph::unordered_map::iterator it = + m_context_results.find(ctx); + if (it != m_context_results.end()) { + result = it->second; + m_context_results.erase(it); + } + } + ctx->complete(result); } - using ThreadPool::WorkQueueVal >::_process; private: - list > _queue; + Mutex m_lock; + ceph::unordered_map m_context_results; }; class ShardedThreadPool { -- 2.39.5