From 3ca6359ce5beac6bc69117cff7d953a22b493e92 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Thu, 7 Jun 2012 16:38:08 -0700 Subject: [PATCH] common/WorkQueue.h: add BatchWorkQueue Rather than dispatching one item at a time to process, etc, BatchWorkQueue dispatches up to a configurable number of items. Signed-off-by: Samuel Just --- src/common/WorkQueue.h | 76 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/src/common/WorkQueue.h b/src/common/WorkQueue.h index b02ce75dcc21c..ed2bf6c7b3af5 100644 --- a/src/common/WorkQueue.h +++ b/src/common/WorkQueue.h @@ -47,6 +47,82 @@ class ThreadPool { }; public: + template + class BatchWorkQueue : public WorkQueue_ { + ThreadPool *pool; + const size_t batch_size; + + virtual bool _enqueue(T *) = 0; + virtual void _dequeue(T *) = 0; + virtual T *_dequeue() = 0; + virtual void _process(const list &) = 0; + virtual void _process_finish(const list &) {} + + void *_void_dequeue() { + list *out(new list); + while (out->size() < batch_size) { + T *val = _dequeue(); + if (!val) + break; + out->push_back(val); + } + if (out->size()) { + return (void *)out; + } else { + delete out; + return 0; + } + } + void _void_process(void *p) { + _process(*((list*)p)); + } + void _void_process_finish(void *p) { + _process_finish(*(list*)p); + delete (list *)p; + } + + public: + BatchWorkQueue(string n, time_t ti, time_t sti, ThreadPool* p, + size_t batch_size) : + WorkQueue_(n, ti, sti), pool(p), batch_size(batch_size) { + pool->add_work_queue(this); + } + ~BatchWorkQueue() { + pool->remove_work_queue(this); + } + + bool queue(T *item) { + pool->_lock.Lock(); + bool r = _enqueue(item); + pool->_cond.SignalOne(); + pool->_lock.Unlock(); + return r; + } + void dequeue(T *item) { + pool->_lock.Lock(); + _dequeue(item); + pool->_lock.Unlock(); + } + void clear() { + pool->_lock.Lock(); + _clear(); + pool->_lock.Unlock(); + } + + void lock() { + pool->lock(); + } + void unlock() { + pool->unlock(); + } + void kick() { + pool->kick(); + } + void drain() { + pool->drain(this); + } + + }; template class WorkQueue : public WorkQueue_ { ThreadPool *pool; -- 2.39.5