From: Samuel Just Date: Thu, 7 Jun 2012 23:38:08 +0000 (-0700) Subject: common/WorkQueue.h: add BatchWorkQueue X-Git-Tag: v0.50~109^2~2^2~29 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=3ca6359ce5beac6bc69117cff7d953a22b493e92;p=ceph.git common/WorkQueue.h: add BatchWorkQueue Rather than dispatching one item at a time to process, etc, BatchWorkQueue dispatches up to a configurable number of items. Signed-off-by: Samuel Just --- diff --git a/src/common/WorkQueue.h b/src/common/WorkQueue.h index b02ce75dcc21..ed2bf6c7b3af 100644 --- a/src/common/WorkQueue.h +++ b/src/common/WorkQueue.h @@ -47,6 +47,82 @@ class ThreadPool { }; public: + template + class BatchWorkQueue : public WorkQueue_ { + ThreadPool *pool; + const size_t batch_size; + + virtual bool _enqueue(T *) = 0; + virtual void _dequeue(T *) = 0; + virtual T *_dequeue() = 0; + virtual void _process(const list &) = 0; + virtual void _process_finish(const list &) {} + + void *_void_dequeue() { + list *out(new list); + while (out->size() < batch_size) { + T *val = _dequeue(); + if (!val) + break; + out->push_back(val); + } + if (out->size()) { + return (void *)out; + } else { + delete out; + return 0; + } + } + void _void_process(void *p) { + _process(*((list*)p)); + } + void _void_process_finish(void *p) { + _process_finish(*(list*)p); + delete (list *)p; + } + + public: + BatchWorkQueue(string n, time_t ti, time_t sti, ThreadPool* p, + size_t batch_size) : + WorkQueue_(n, ti, sti), pool(p), batch_size(batch_size) { + pool->add_work_queue(this); + } + ~BatchWorkQueue() { + pool->remove_work_queue(this); + } + + bool queue(T *item) { + pool->_lock.Lock(); + bool r = _enqueue(item); + pool->_cond.SignalOne(); + pool->_lock.Unlock(); + return r; + } + void dequeue(T *item) { + pool->_lock.Lock(); + _dequeue(item); + pool->_lock.Unlock(); + } + void clear() { + pool->_lock.Lock(); + _clear(); + pool->_lock.Unlock(); + } + + void lock() { + pool->lock(); + } + void unlock() { + pool->unlock(); + } + void kick() { + pool->kick(); + } + void drain() { + pool->drain(this); + } + + }; template class WorkQueue : public WorkQueue_ { ThreadPool *pool;