From ad9f8dbc8867ac3fc7a67bebc92ea444923a44f7 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 9 Feb 2010 10:02:41 -0800 Subject: [PATCH] workqueue: make drain() wait for processing AND queue --- src/common/WorkQueue.cc | 4 ++-- src/common/WorkQueue.h | 7 +++++-- src/os/FileStore.cc | 4 ++-- src/os/FileStore.h | 3 +++ src/osd/OSD.h | 18 ++++++++++++++++++ 5 files changed, 30 insertions(+), 6 deletions(-) diff --git a/src/common/WorkQueue.cc b/src/common/WorkQueue.cc index dde01bae40179..62508125a8c90 100644 --- a/src/common/WorkQueue.cc +++ b/src/common/WorkQueue.cc @@ -121,12 +121,12 @@ void ThreadPool::unpause() _lock.Unlock(); } -void ThreadPool::drain() +void ThreadPool::drain(_WorkQueue *wq) { dout(10) << "drain" << dendl; _lock.Lock(); _draining = true; - while (processing) + while (processing || (wq != NULL && !wq->_empty())) _wait_cond.Wait(_lock); _draining = false; _lock.Unlock(); diff --git a/src/common/WorkQueue.h b/src/common/WorkQueue.h index e94a032ff8588..c24d32e2972f8 100644 --- a/src/common/WorkQueue.h +++ b/src/common/WorkQueue.h @@ -31,6 +31,7 @@ class ThreadPool { _WorkQueue(string n) : name(n) {} virtual ~_WorkQueue() {} virtual void _clear() = 0; + virtual bool _empty() = 0; virtual void *_void_dequeue() = 0; virtual void _void_process(void *) = 0; virtual void _void_process_finish(void *) = 0; @@ -46,7 +47,6 @@ public: virtual T *_dequeue() = 0; virtual void _process(T *) = 0; virtual void _process_finish(T *) {} - virtual void _clear() = 0; void *_void_dequeue() { return (void *)_dequeue(); @@ -93,6 +93,9 @@ public: void _kick() { pool->_kick(); } + void drain() { + pool->drain(this); + } }; @@ -177,7 +180,7 @@ public: void pause(); void pause_new(); void unpause(); - void drain(); + void drain(_WorkQueue *wq = 0); }; diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index c19655b91b11d..febf4407b387f 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -1533,8 +1533,8 @@ void FileStore::sync(Context *onsafe) void FileStore::_flush_op_queue() { - dout(10) << "_flush_op_queue draining for op tp" << dendl; - op_tp.drain(); + dout(10) << "_flush_op_queue draining op tp" << dendl; + op_wq.drain(); dout(10) << "_flush_op_queue waiting for apply finisher" << dendl; op_finisher.wait_for_empty(); } diff --git a/src/os/FileStore.h b/src/os/FileStore.h index 4cc0a534957c7..9595c070b27b0 100644 --- a/src/os/FileStore.h +++ b/src/os/FileStore.h @@ -113,6 +113,9 @@ class FileStore : public JournalingObjectStore { void _dequeue(Op *o) { assert(0); } + bool _empty() { + return store->op_queue.empty(); + } Op *_dequeue() { if (store->op_queue.empty()) return NULL; diff --git a/src/osd/OSD.h b/src/osd/OSD.h index ece5afcdc2347..206ff66cb8311 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -367,6 +367,9 @@ private: void _dequeue(PG *pg) { assert(0); } + bool _empty() { + return osd->op_queue.empty(); + } PG *_dequeue() { if (osd->op_queue.empty()) return NULL; @@ -618,6 +621,9 @@ protected: OSD *osd; BacklogWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue("OSD::BacklogWQ", tp), osd(o) {} + bool _empty() { + return osd->backlog_queue.empty(); + } bool _enqueue(PG *pg) { if (!pg->backlog_item.get_xlist()) { pg->get(); @@ -666,6 +672,9 @@ protected: OSD *osd; RecoveryWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue("OSD::RecoveryWQ", tp), osd(o) {} + bool _empty() { + return osd->recovery_queue.empty(); + } bool _enqueue(PG *pg) { if (!pg->recovery_item.get_xlist()) { pg->get(); @@ -737,6 +746,9 @@ protected: OSD *osd; SnapTrimWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue("OSD::SnapTrimWQ", tp), osd(o) {} + bool _empty() { + return osd->snap_trim_queue.empty(); + } bool _enqueue(PG *pg) { if (pg->snap_trim_item.is_on_xlist()) return false; @@ -769,6 +781,9 @@ protected: OSD *osd; ScrubWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue("OSD::ScrubWQ", tp), osd(o) {} + bool _empty() { + return osd->scrub_queue.empty(); + } bool _enqueue(PG *pg) { if (pg->scrub_item.is_on_xlist()) return false; @@ -807,6 +822,9 @@ protected: OSD *osd; RemoveWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue("OSD::RemoveWQ", tp), osd(o) {} + bool _empty() { + return osd->remove_queue.empty(); + } bool _enqueue(PG *pg) { if (pg->remove_item.is_on_xlist()) return false; -- 2.39.5