]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
WorkQueue: PointerWQ drain no longer waits for other queues
authorJason Dillaman <dillaman@redhat.com>
Fri, 14 Aug 2015 17:28:13 +0000 (13:28 -0400)
committerJason Dillaman <dillaman@redhat.com>
Wed, 3 Feb 2016 03:00:46 +0000 (22:00 -0500)
If another (independent) queue was processing, drain could
block waiting.  Instead, allow drain to exit quickly if
no items are being processed and the queue is empty for
the current WQ.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
(cherry picked from commit b118d7df1e34387b6e5649a5b205cf061598d0d4)

src/common/WorkQueue.h

index 7d453e6e25c35f56eef3cbdc48c07ef508cd45b9..1c2e475d1296556bc4713ab9036eff07bcf7fdb3 100644 (file)
@@ -310,13 +310,22 @@ public:
   class PointerWQ : public WorkQueue_ {
   public:
     PointerWQ(string n, time_t ti, time_t sti, ThreadPool* p)
-      : WorkQueue_(n, ti, sti), m_pool(p) {
+      : WorkQueue_(n, ti, sti), m_pool(p), m_processing(0) {
       m_pool->add_work_queue(this);
     }
     ~PointerWQ() {
       m_pool->remove_work_queue(this);
+      assert(m_processing == 0);
     }
     void drain() {
+      {
+        // if this queue is empty and not processing, don't wait for other
+        // queues to finish processing
+        Mutex::Locker l(m_pool->_lock);
+        if (m_processing == 0 && m_items.empty()) {
+          return;
+        }
+      }
       m_pool->drain(this);
     }
     void queue(T *item) {
@@ -324,6 +333,10 @@ public:
       m_items.push_back(item);
       m_pool->_cond.SignalOne();
     }
+    bool empty() {
+      Mutex::Locker l(m_pool->_lock);
+      return _empty();
+    }
   protected:
     virtual void _clear() {
       assert(m_pool->_lock.is_locked());
@@ -339,6 +352,7 @@ public:
         return NULL;
       }
 
+      ++m_processing;
       T *item = m_items.front();
       m_items.pop_front();
       return item;
@@ -347,6 +361,9 @@ public:
       process(reinterpret_cast<T *>(item));
     }
     virtual void _void_process_finish(void *item) {
+      assert(m_pool->_lock.is_locked());
+      assert(m_processing > 0);
+      --m_processing;
     }
 
     virtual void process(T *item) = 0;
@@ -365,6 +382,7 @@ public:
   private:
     ThreadPool *m_pool;
     std::list<T *> m_items;
+    uint32_t m_processing;
   };
 private:
   vector<WorkQueue_*> work_queues;