};
public:
+ template<class T>
+ class BatchWorkQueue : public WorkQueue_ {
+ ThreadPool *pool;
+ const size_t batch_size;
+
+ virtual bool _enqueue(T *) = 0;
+ virtual void _dequeue(T *) = 0;
+ virtual T *_dequeue() = 0;
+ virtual void _process(const list<T*> &) = 0;
+ virtual void _process_finish(const list<T*> &) {}
+
+ void *_void_dequeue() {
+ list<T*> *out(new list<T*>);
+ while (out->size() < batch_size) {
+ T *val = _dequeue();
+ if (!val)
+ break;
+ out->push_back(val);
+ }
+ if (out->size()) {
+ return (void *)out;
+ } else {
+ delete out;
+ return 0;
+ }
+ }
+ void _void_process(void *p) {
+ _process(*((list<T*>*)p));
+ }
+ void _void_process_finish(void *p) {
+ _process_finish(*(list<T*>*)p);
+ delete (list<T*> *)p;
+ }
+
+ public:
+ BatchWorkQueue(string n, time_t ti, time_t sti, ThreadPool* p,
+ size_t batch_size) :
+ WorkQueue_(n, ti, sti), pool(p), batch_size(batch_size) {
+ pool->add_work_queue(this);
+ }
+ ~BatchWorkQueue() {
+ pool->remove_work_queue(this);
+ }
+
+ bool queue(T *item) {
+ pool->_lock.Lock();
+ bool r = _enqueue(item);
+ pool->_cond.SignalOne();
+ pool->_lock.Unlock();
+ return r;
+ }
+ void dequeue(T *item) {
+ pool->_lock.Lock();
+ _dequeue(item);
+ pool->_lock.Unlock();
+ }
+ void clear() {
+ pool->_lock.Lock();
+ _clear();
+ pool->_lock.Unlock();
+ }
+
+ void lock() {
+ pool->lock();
+ }
+ void unlock() {
+ pool->unlock();
+ }
+ void kick() {
+ pool->kick();
+ }
+ void drain() {
+ pool->drain(this);
+ }
+
+ };
template<class T>
class WorkQueue : public WorkQueue_ {
ThreadPool *pool;