}
};
+ template<typename T, typename U>
+ class WorkQueueVal : public WorkQueue_ {
+ Mutex _lock;
+ ThreadPool *pool;
+ list<U> to_process;
+ list<U> to_finish;
+ virtual void _enqueue(T) = 0;
+ virtual void _enqueue_front(T) = 0;
+ virtual bool _empty() = 0;
+ virtual U _dequeue() = 0;
+ virtual void _process(U) = 0;
+ virtual void _process_finish(U) {}
+
+ void *_void_dequeue() {
+ {
+ Mutex::Locker l(_lock);
+ if (_empty())
+ return 0;
+ U u = _dequeue();
+ to_process.push_back(u);
+ }
+ return ((void*)1); // Not used
+ }
+ void _void_process(void *) {
+ _lock.Lock();
+ assert(!to_process.empty());
+ U u = to_process.front();
+ to_process.pop_front();
+ _lock.Unlock();
+
+ _process(u);
+
+ _lock.Lock();
+ to_finish.push_back(u);
+ _lock.Unlock();
+ }
+
+ void _void_process_finish(void *) {
+ _lock.Lock();
+ assert(!to_finish.empty());
+ U u = to_finish.front();
+ to_finish.pop_front();
+ _lock.Unlock();
+
+ _process_finish(u);
+ }
+
+ void _clear() {}
+
+ public:
+ WorkQueueVal(string n, time_t ti, time_t sti, ThreadPool *p)
+ : WorkQueue_(n, ti, sti), _lock("WorkQueueVal::lock"), pool(p) {
+ pool->add_work_queue(this);
+ }
+ ~WorkQueueVal() {
+ pool->remove_work_queue(this);
+ }
+ void queue(T item) {
+ Mutex::Locker l(pool->_lock);
+ _enqueue(item);
+ pool->_cond.SignalOne();
+ }
+ void queue_front(T item) {
+ Mutex::Locker l(pool->_lock);
+ _enqueue_front(item);
+ pool->_cond.SignalOne();
+ }
+ void drain() {
+ pool->drain(this);
+ }
+ protected:
+ void lock() {
+ pool->lock();
+ }
+ void unlock() {
+ pool->unlock();
+ }
+ };
template<class T>
class WorkQueue : public WorkQueue_ {
ThreadPool *pool;