]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
common/WorkQueue.h: add BatchWorkQueue
authorSamuel Just <sam.just@inktank.com>
Thu, 7 Jun 2012 23:38:08 +0000 (16:38 -0700)
committerSamuel Just <sam.just@inktank.com>
Thu, 5 Jul 2012 17:15:01 +0000 (10:15 -0700)
Rather than dispatching one item at a time to process, etc,
BatchWorkQueue dispatches up to a configurable number of
items.

Signed-off-by: Samuel Just <sam.just@inktank.com>
src/common/WorkQueue.h

index b02ce75dcc21ca20f6b1da0675015e4b3704bff0..ed2bf6c7b3af56aa448b9f63a9edff55649a95f3 100644 (file)
@@ -47,6 +47,82 @@ class ThreadPool {
   };  
 
 public:
+  template<class T>
+  class BatchWorkQueue : public WorkQueue_ {
+    ThreadPool *pool;
+    const size_t batch_size;
+
+    virtual bool _enqueue(T *) = 0;
+    virtual void _dequeue(T *) = 0;
+    virtual T *_dequeue() = 0;
+    virtual void _process(const list<T*> &) = 0;
+    virtual void _process_finish(const list<T*> &) {}
+
+    void *_void_dequeue() {
+      list<T*> *out(new list<T*>);
+      while (out->size() < batch_size) {
+       T *val = _dequeue();
+       if (!val)
+         break;
+       out->push_back(val);
+      }
+      if (out->size()) {
+       return (void *)out;
+      } else {
+       delete out;
+       return 0;
+      }
+    }
+    void _void_process(void *p) {
+      _process(*((list<T*>*)p));
+    }
+    void _void_process_finish(void *p) {
+      _process_finish(*(list<T*>*)p);
+      delete (list<T*> *)p;
+    }
+
+  public:
+    BatchWorkQueue(string n, time_t ti, time_t sti, ThreadPool* p,
+                  size_t batch_size) :
+      WorkQueue_(n, ti, sti), pool(p), batch_size(batch_size) {
+      pool->add_work_queue(this);
+    }
+    ~BatchWorkQueue() {
+      pool->remove_work_queue(this);
+    }
+
+    bool queue(T *item) {
+      pool->_lock.Lock();
+      bool r = _enqueue(item);
+      pool->_cond.SignalOne();
+      pool->_lock.Unlock();
+      return r;
+    }
+    void dequeue(T *item) {
+      pool->_lock.Lock();
+      _dequeue(item);
+      pool->_lock.Unlock();
+    }
+    void clear() {
+      pool->_lock.Lock();
+      _clear();
+      pool->_lock.Unlock();
+    }
+
+    void lock() {
+      pool->lock();
+    }
+    void unlock() {
+      pool->unlock();
+    }
+    void kick() {
+      pool->kick();
+    }
+    void drain() {
+      pool->drain(this);
+    }
+
+  };
   template<class T>
   class WorkQueue : public WorkQueue_ {
     ThreadPool *pool;