]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
WorkQueue: new PointerWQ base class for ContextWQ 6633/head
authorJason Dillaman <dillaman@redhat.com>
Tue, 7 Jul 2015 16:11:13 +0000 (12:11 -0400)
committerAbhishek Varshney <abhishek.varshney@flipkart.com>
Wed, 18 Nov 2015 08:13:08 +0000 (13:43 +0530)
The existing work queues do not properly function if added to a running
thread pool.  librbd uses a singleton thread pool which requires
dynamically adding/removing work queues as images are opened and closed.

Fixes: #13636
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
(cherry picked from commit 3e78b18b09d75626ca2599bac3b9f9c9889507a5)

src/common/WorkQueue.h

index f0754de8e1992bb4974698ea7dbdc7ec02a31ff2..0a26b3c16996299bef0b94c29cf6df7c38638998 100644 (file)
@@ -18,6 +18,7 @@
 #include "Mutex.h"
 #include "Cond.h"
 #include "Thread.h"
+#include "include/unordered_map.h"
 #include "common/config_obs.h"
 #include "common/HeartbeatMap.h"
 
@@ -349,6 +350,66 @@ public:
 
   };
 
+  template<typename T>
+  class PointerWQ : public WorkQueue_ {
+  public:
+    PointerWQ(string n, time_t ti, time_t sti, ThreadPool* p)
+      : WorkQueue_(n, ti, sti), m_pool(p) {
+      m_pool->add_work_queue(this);
+    }
+    ~PointerWQ() {
+      m_pool->remove_work_queue(this);
+    }
+    void drain() {
+      m_pool->drain(this);
+    }
+    void queue(T *item) {
+      Mutex::Locker l(m_pool->_lock);
+      m_items.push_back(item);
+      m_pool->_cond.SignalOne();
+    }
+  protected:
+    virtual void _clear() {
+      assert(m_pool->_lock.is_locked());
+      m_items.clear();
+    }
+    virtual bool _empty() {
+      assert(m_pool->_lock.is_locked());
+      return m_items.empty();
+    }
+    virtual void *_void_dequeue() {
+      assert(m_pool->_lock.is_locked());
+      if (m_items.empty()) {
+        return NULL;
+      }
+
+      T *item = m_items.front();
+      m_items.pop_front();
+      return item;
+    }
+    virtual void _void_process(void *item, ThreadPool::TPHandle &handle) {
+      process(reinterpret_cast<T *>(item));
+    }
+    virtual void _void_process_finish(void *item) {
+    }
+
+    virtual void process(T *item) = 0;
+
+    T *front() {
+      assert(m_pool->_lock.is_locked());
+      if (m_items.empty()) {
+        return NULL;
+      }
+      return m_items.front();
+    }
+    void signal() {
+      Mutex::Locker pool_locker(m_pool->_lock);
+      m_pool->_cond.SignalOne();
+    }
+  private:
+    ThreadPool *m_pool;
+    std::list<T *> m_items;
+  };
 private:
   vector<WorkQueue_*> work_queues;
   int last_work_queue;
@@ -488,37 +549,44 @@ public:
 
 /// Work queue that asynchronously completes contexts (executes callbacks).
 /// @see Finisher
-class ContextWQ : public ThreadPool::WorkQueueVal<std::pair<Context *, int> > {
+class ContextWQ : public ThreadPool::PointerWQ<Context> {
 public:
   ContextWQ(const string &name, time_t ti, ThreadPool *tp)
-    : ThreadPool::WorkQueueVal<std::pair<Context *, int> >(name, ti, 0, tp) {}
+    : ThreadPool::PointerWQ<Context>(name, ti, 0, tp),
+      m_lock("ContextWQ::m_lock") {
+  }
 
   void queue(Context *ctx, int result = 0) {
-    ThreadPool::WorkQueueVal<std::pair<Context *, int> >::queue(
-      std::make_pair(ctx, result));
+    if (result != 0) {
+      Mutex::Locker locker(m_lock);
+      m_context_results[ctx] = result;
+    }
+    ThreadPool::PointerWQ<Context>::queue(ctx);
   }
-
 protected:
-  virtual void _enqueue(std::pair<Context *, int> item) {
-    _queue.push_back(item);
-  }
-  virtual void _enqueue_front(std::pair<Context *, int> item) {
-    _queue.push_front(item);
-  }
-  virtual bool _empty() {
-    return _queue.empty();
-  }
-  virtual std::pair<Context *, int> _dequeue() {
-    std::pair<Context *, int> item = _queue.front();
-    _queue.pop_front();
-    return item;
+  virtual void _clear() {
+    ThreadPool::PointerWQ<Context>::_clear();
+
+    Mutex::Locker locker(m_lock);
+    m_context_results.clear();
   }
-  virtual void _process(std::pair<Context *, int> item) {
-    item.first->complete(item.second);
+
+  virtual void process(Context *ctx) {
+    int result = 0;
+    {
+      Mutex::Locker locker(m_lock);
+      ceph::unordered_map<Context *, int>::iterator it =
+        m_context_results.find(ctx);
+      if (it != m_context_results.end()) {
+        result = it->second;
+        m_context_results.erase(it);
+      }
+    }
+    ctx->complete(result);
   }
-  using ThreadPool::WorkQueueVal<std::pair<Context *, int> >::_process;
 private:
-  list<std::pair<Context *, int> > _queue;
+  Mutex m_lock;
+  ceph::unordered_map<Context*, int> m_context_results;
 };
 
 class ShardedThreadPool {