_lock.Unlock();
}
-void ThreadPool::drain()
+void ThreadPool::drain(_WorkQueue *wq)
{
dout(10) << "drain" << dendl;
_lock.Lock();
_draining = true;
- while (processing)
+ while (processing || (wq != NULL && !wq->_empty()))
_wait_cond.Wait(_lock);
_draining = false;
_lock.Unlock();
_WorkQueue(string n) : name(n) {}
virtual ~_WorkQueue() {}
virtual void _clear() = 0;
+ virtual bool _empty() = 0;
virtual void *_void_dequeue() = 0;
virtual void _void_process(void *) = 0;
virtual void _void_process_finish(void *) = 0;
virtual T *_dequeue() = 0;
virtual void _process(T *) = 0;
virtual void _process_finish(T *) {}
- virtual void _clear() = 0;
void *_void_dequeue() {
return (void *)_dequeue();
void _kick() {
pool->_kick();
}
+ void drain() {
+ pool->drain(this);
+ }
};
void pause();
void pause_new();
void unpause();
- void drain();
+ void drain(_WorkQueue *wq = 0);
};
void FileStore::_flush_op_queue()
{
- dout(10) << "_flush_op_queue draining for op tp" << dendl;
- op_tp.drain();
+ dout(10) << "_flush_op_queue draining op tp" << dendl;
+ op_wq.drain();
dout(10) << "_flush_op_queue waiting for apply finisher" << dendl;
op_finisher.wait_for_empty();
}
void _dequeue(PG *pg) {
assert(0);
}
+ bool _empty() {
+ return osd->op_queue.empty();
+ }
PG *_dequeue() {
if (osd->op_queue.empty())
return NULL;
OSD *osd;
BacklogWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::BacklogWQ", tp), osd(o) {}
+ bool _empty() {
+ return osd->backlog_queue.empty();
+ }
bool _enqueue(PG *pg) {
if (!pg->backlog_item.get_xlist()) {
pg->get();
OSD *osd;
RecoveryWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::RecoveryWQ", tp), osd(o) {}
+ bool _empty() {
+ return osd->recovery_queue.empty();
+ }
bool _enqueue(PG *pg) {
if (!pg->recovery_item.get_xlist()) {
pg->get();
OSD *osd;
SnapTrimWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::SnapTrimWQ", tp), osd(o) {}
+ bool _empty() {
+ return osd->snap_trim_queue.empty();
+ }
bool _enqueue(PG *pg) {
if (pg->snap_trim_item.is_on_xlist())
return false;
OSD *osd;
ScrubWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::ScrubWQ", tp), osd(o) {}
+ bool _empty() {
+ return osd->scrub_queue.empty();
+ }
bool _enqueue(PG *pg) {
if (pg->scrub_item.is_on_xlist())
return false;
OSD *osd;
RemoveWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::RemoveWQ", tp), osd(o) {}
+ bool _empty() {
+ return osd->remove_queue.empty();
+ }
bool _enqueue(PG *pg) {
if (pg->remove_item.is_on_xlist())
return false;