]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
workqueue: make drain() wait for processing AND queue
authorSage Weil <sage@newdream.net>
Tue, 9 Feb 2010 18:02:41 +0000 (10:02 -0800)
committerSage Weil <sage@newdream.net>
Tue, 9 Feb 2010 18:02:41 +0000 (10:02 -0800)
src/common/WorkQueue.cc
src/common/WorkQueue.h
src/os/FileStore.cc
src/os/FileStore.h
src/osd/OSD.h

index dde01bae40179621e3f2f46274475d2e15731b24..62508125a8c905e27b8e14e92cb34ee4007eab0b 100644 (file)
@@ -121,12 +121,12 @@ void ThreadPool::unpause()
   _lock.Unlock();
 }
 
-void ThreadPool::drain()
+void ThreadPool::drain(_WorkQueue *wq)
 {
   dout(10) << "drain" << dendl;
   _lock.Lock();
   _draining = true;
-  while (processing)
+  while (processing || (wq != NULL && !wq->_empty()))
     _wait_cond.Wait(_lock);
   _draining = false;
   _lock.Unlock();
index e94a032ff8588c72b43e21b78029e838901f5518..c24d32e2972f8f4470f5154ab47eac113e035184 100644 (file)
@@ -31,6 +31,7 @@ class ThreadPool {
     _WorkQueue(string n) : name(n) {}
     virtual ~_WorkQueue() {}
     virtual void _clear() = 0;
+    virtual bool _empty() = 0;
     virtual void *_void_dequeue() = 0;
     virtual void _void_process(void *) = 0;
     virtual void _void_process_finish(void *) = 0;
@@ -46,7 +47,6 @@ public:
     virtual T *_dequeue() = 0;
     virtual void _process(T *) = 0;
     virtual void _process_finish(T *) {}
-    virtual void _clear() = 0;
     
     void *_void_dequeue() {
       return (void *)_dequeue();
@@ -93,6 +93,9 @@ public:
     void _kick() {
       pool->_kick();
     }
+    void drain() {
+      pool->drain(this);
+    }
 
   };
 
@@ -177,7 +180,7 @@ public:
   void pause();
   void pause_new();
   void unpause();
-  void drain();
+  void drain(_WorkQueue *wq = 0);
 };
 
 
index c19655b91b11d1c80585a9f0a1e34013f667168b..febf4407b387f5549b79c76e27889d8ec1759368 100644 (file)
@@ -1533,8 +1533,8 @@ void FileStore::sync(Context *onsafe)
 
 void FileStore::_flush_op_queue()
 {
-  dout(10) << "_flush_op_queue draining for op tp" << dendl;
-  op_tp.drain();
+  dout(10) << "_flush_op_queue draining op tp" << dendl;
+  op_wq.drain();
   dout(10) << "_flush_op_queue waiting for apply finisher" << dendl;
   op_finisher.wait_for_empty();
 }
index 4cc0a534957c72db41fe4dc83674076fc029245d..9595c070b27b0fd924c1da1ebdc39909299fb14a 100644 (file)
@@ -113,6 +113,9 @@ class FileStore : public JournalingObjectStore {
     void _dequeue(Op *o) {
       assert(0);
     }
+    bool _empty() {
+      return store->op_queue.empty();
+    }
     Op *_dequeue() {
       if (store->op_queue.empty())
        return NULL;
index ece5afcdc234730f2c367dac5512485ac35e5dd6..206ff66cb8311ed0c3b30581721e121b4bc2f75a 100644 (file)
@@ -367,6 +367,9 @@ private:
     void _dequeue(PG *pg) {
       assert(0);
     }
+    bool _empty() {
+      return osd->op_queue.empty();
+    }
     PG *_dequeue() {
       if (osd->op_queue.empty())
        return NULL;
@@ -618,6 +621,9 @@ protected:
     OSD *osd;
     BacklogWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::BacklogWQ", tp), osd(o) {}
 
+    bool _empty() {
+      return osd->backlog_queue.empty();
+    }
     bool _enqueue(PG *pg) {
       if (!pg->backlog_item.get_xlist()) {
        pg->get();
@@ -666,6 +672,9 @@ protected:
     OSD *osd;
     RecoveryWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::RecoveryWQ", tp), osd(o) {}
 
+    bool _empty() {
+      return osd->recovery_queue.empty();
+    }
     bool _enqueue(PG *pg) {
       if (!pg->recovery_item.get_xlist()) {
        pg->get();
@@ -737,6 +746,9 @@ protected:
     OSD *osd;
     SnapTrimWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::SnapTrimWQ", tp), osd(o) {}
 
+    bool _empty() {
+      return osd->snap_trim_queue.empty();
+    }
     bool _enqueue(PG *pg) {
       if (pg->snap_trim_item.is_on_xlist())
        return false;
@@ -769,6 +781,9 @@ protected:
     OSD *osd;
     ScrubWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::ScrubWQ", tp), osd(o) {}
 
+    bool _empty() {
+      return osd->scrub_queue.empty();
+    }
     bool _enqueue(PG *pg) {
       if (pg->scrub_item.is_on_xlist())
        return false;
@@ -807,6 +822,9 @@ protected:
     OSD *osd;
     RemoveWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::RemoveWQ", tp), osd(o) {}
 
+    bool _empty() {
+      return osd->remove_queue.empty();
+    }
     bool _enqueue(PG *pg) {
       if (pg->remove_item.is_on_xlist())
        return false;