]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
WorkQueue: Add WorkQueueVal for passing params by val
authorSamuel Just <sam.just@inktank.com>
Fri, 28 Sep 2012 18:50:59 +0000 (11:50 -0700)
committerSamuel Just <sam.just@inktank.com>
Tue, 13 Nov 2012 18:45:00 +0000 (10:45 -0800)
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/common/WorkQueue.h

index ae69e1bf5ef996db0f18664301dcba1873ddd24a..2475b45d1ac0388916de49b095c51e1aae2375dd 100644 (file)
@@ -131,6 +131,84 @@ public:
     }
 
   };
+  template<typename T, typename U>
+  class WorkQueueVal : public WorkQueue_ {
+    Mutex _lock;
+    ThreadPool *pool;
+    list<U> to_process;
+    list<U> to_finish;
+    virtual void _enqueue(T) = 0;
+    virtual void _enqueue_front(T) = 0;
+    virtual bool _empty() = 0;
+    virtual U _dequeue() = 0;
+    virtual void _process(U) = 0;
+    virtual void _process_finish(U) {}
+
+    void *_void_dequeue() {
+      {
+       Mutex::Locker l(_lock);
+       if (_empty())
+         return 0;
+       U u = _dequeue();
+       to_process.push_back(u);
+      }
+      return ((void*)1); // Not used
+    }
+    void _void_process(void *) {
+      _lock.Lock();
+      assert(!to_process.empty());
+      U u = to_process.front();
+      to_process.pop_front();
+      _lock.Unlock();
+
+      _process(u);
+
+      _lock.Lock();
+      to_finish.push_back(u);
+      _lock.Unlock();
+    }
+
+    void _void_process_finish(void *) {
+      _lock.Lock();
+      assert(!to_finish.empty());
+      U u = to_finish.front();
+      to_finish.pop_front();
+      _lock.Unlock();
+
+      _process_finish(u);
+    }
+
+    void _clear() {}
+
+  public:
+    WorkQueueVal(string n, time_t ti, time_t sti, ThreadPool *p)
+      : WorkQueue_(n, ti, sti), _lock("WorkQueueVal::lock"), pool(p) {
+      pool->add_work_queue(this);
+    }
+    ~WorkQueueVal() {
+      pool->remove_work_queue(this);
+    }
+    void queue(T item) {
+      Mutex::Locker l(pool->_lock);
+      _enqueue(item);
+      pool->_cond.SignalOne();
+    }
+    void queue_front(T item) {
+      Mutex::Locker l(pool->_lock);
+      _enqueue_front(item);
+      pool->_cond.SignalOne();
+    }
+    void drain() {
+      pool->drain(this);
+    }
+  protected:
+    void lock() {
+      pool->lock();
+    }
+    void unlock() {
+      pool->unlock();
+    }
+  };
   template<class T>
   class WorkQueue : public WorkQueue_ {
     ThreadPool *pool;