_lock.Lock();
dout(15) << "worker wq " << wq->name << " done processing " << item << dendl;
processing--;
- if (_pause)
+ if (_pause || _draining)
_wait_cond.Signal();
did = true;
break;
_cond.Signal();
_lock.Unlock();
}
+
+void ThreadPool::drain()
+{
+ dout(10) << "drain" << dendl;
+ _lock.Lock();
+ _draining = true;
+ while (processing)
+ _wait_cond.Wait(_lock);
+ _draining = false;
+ _lock.Unlock();
+}
+
string name;
Mutex _lock;
Cond _cond;
- bool _stop, _pause;
+ bool _stop, _pause, _draining;
Cond _wait_cond;
struct _WorkQueue {
name(nm),
_lock((new string(name + "::lock"))->c_str()), // deliberately leak this
_stop(false),
- _pause(false),
+ _pause(false), _draining(false),
last_work_queue(0),
processing(0) {
set_num_threads(n);
void pause();
void pause_new();
void unpause();
+ void drain();
};