class PointerWQ : public WorkQueue_ {
public:
PointerWQ(string n, time_t ti, time_t sti, ThreadPool* p)
- : WorkQueue_(n, ti, sti), m_pool(p) {
+ : WorkQueue_(n, ti, sti), m_pool(p), m_processing(0) {
m_pool->add_work_queue(this);
}
~PointerWQ() {
m_pool->remove_work_queue(this);
+ assert(m_processing == 0);
}
void drain() {
+ {
+ // if this queue is empty and not processing, don't wait for other
+ // queues to finish processing
+ Mutex::Locker l(m_pool->_lock);
+ if (m_processing == 0 && m_items.empty()) {
+ return;
+ }
+ }
m_pool->drain(this);
}
void queue(T *item) {
m_items.push_back(item);
m_pool->_cond.SignalOne();
}
+ bool empty() {
+ Mutex::Locker l(m_pool->_lock);
+ return _empty();
+ }
protected:
virtual void _clear() {
assert(m_pool->_lock.is_locked());
return NULL;
}
+ ++m_processing;
T *item = m_items.front();
m_items.pop_front();
return item;
process(reinterpret_cast<T *>(item));
}
virtual void _void_process_finish(void *item) {
+ assert(m_pool->_lock.is_locked());
+ assert(m_processing > 0);
+ --m_processing;
}
virtual void process(T *item) = 0;
private:
ThreadPool *m_pool;
std::list<T *> m_items;
+ uint32_t m_processing;
};
private:
vector<WorkQueue_*> work_queues;