]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
WorkQueue: PointerWQ drain no longer waits for other queues
authorJason Dillaman <dillaman@redhat.com>
Fri, 14 Aug 2015 17:28:13 +0000 (13:28 -0400)
committerJason Dillaman <dillaman@redhat.com>
Thu, 11 Feb 2016 15:02:55 +0000 (10:02 -0500)
If another (independent) queue was processing, drain could
block waiting.  Instead, allow drain to exit quickly if
no items are being processed and the queue is empty for
the current WQ.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
(cherry picked from commit b118d7df1e34387b6e5649a5b205cf061598d0d4)

src/common/WorkQueue.h

index 0a26b3c16996299bef0b94c29cf6df7c38638998..7c3ccb5c5c2693c1ff0107f93f8fb15186aa79cd 100644 (file)
@@ -354,13 +354,22 @@ public:
   class PointerWQ : public WorkQueue_ {
   public:
     PointerWQ(string n, time_t ti, time_t sti, ThreadPool* p)
-      : WorkQueue_(n, ti, sti), m_pool(p) {
+      : WorkQueue_(n, ti, sti), m_pool(p), m_processing(0) {
       m_pool->add_work_queue(this);
     }
     ~PointerWQ() {
       m_pool->remove_work_queue(this);
+      assert(m_processing == 0);
     }
     void drain() {
+      {
+        // if this queue is empty and not processing, don't wait for other
+        // queues to finish processing
+        Mutex::Locker l(m_pool->_lock);
+        if (m_processing == 0 && m_items.empty()) {
+          return;
+        }
+      }
       m_pool->drain(this);
     }
     void queue(T *item) {
@@ -368,6 +377,10 @@ public:
       m_items.push_back(item);
       m_pool->_cond.SignalOne();
     }
+    bool empty() {
+      Mutex::Locker l(m_pool->_lock);
+      return _empty();
+    }
   protected:
     virtual void _clear() {
       assert(m_pool->_lock.is_locked());
@@ -383,6 +396,7 @@ public:
         return NULL;
       }
 
+      ++m_processing;
       T *item = m_items.front();
       m_items.pop_front();
       return item;
@@ -391,6 +405,9 @@ public:
       process(reinterpret_cast<T *>(item));
     }
     virtual void _void_process_finish(void *item) {
+      assert(m_pool->_lock.is_locked());
+      assert(m_processing > 0);
+      --m_processing;
     }
 
     virtual void process(T *item) = 0;
@@ -409,6 +426,7 @@ public:
   private:
     ThreadPool *m_pool;
     std::list<T *> m_items;
+    uint32_t m_processing;
   };
 private:
   vector<WorkQueue_*> work_queues;