#include "Cond.h"
#include "Thread.h"
-template<class T>
-class WorkQueue {
-
+class WorkThreadPool {
Mutex _lock;
- Cond cond;
+ Cond _cond;
bool _stop, _pause;
+ Cond _wait_cond;
+
+ struct _WorkQueue {
+ string name;
+ _WorkQueue(string n) : name(n) {}
+ virtual bool _try_process() = 0;
+ virtual void _clear() = 0;
+ };
+
+public:
+ template<class T>
+ class WorkQueue : public _WorkQueue {
+ WorkThreadPool *pool;
+
+ virtual bool _enqueue(T *) = 0;
+ virtual void _dequeue(T *) = 0;
+ virtual T *_dequeue() = 0;
+ virtual void _process(T *) = 0;
+ virtual void _clear() = 0;
+
+ public:
+ WorkQueue(string n, WorkThreadPool *p) : _WorkQueue(n), pool(p) {
+ pool->add_work_queue(this);
+ }
+ ~WorkQueue() {
+ pool->remove_work_queue(this);
+ }
+
+ bool queue(T *item) {
+ pool->_lock.Lock();
+ bool r = _enqueue(item);
+ pool->_cond.SignalOne();
+ pool->_lock.Unlock();
+ return r;
+ }
+ void dequeue(T *item) {
+ pool->_lock.Lock();
+ _dequeue(item);
+ pool->_lock.Unlock();
+ }
+ void clear() {
+ pool->_lock.Lock();
+ _clear();
+ pool->_lock.Unlock();
+ }
+
+ bool _try_process() {
+ T *item = _dequeue();
+ if (item) {
+ pool->_lock.Unlock();
+ _process(item);
+ pool->_lock.Lock();
+ return true;
+ }
+ return false;
+ }
+
+ void lock() {
+ pool->lock();
+ }
+ void unlock() {
+ pool->unlock();
+ }
+ void _kick() {
+ pool->_kick();
+ }
+
+ };
+
+private:
+ vector<_WorkQueue*> work_queues;
+ int last_work_queue;
+
+
+ // threads
+ struct WorkThread : public Thread {
+ WorkThreadPool *pool;
+ WorkThread(WorkThreadPool *p) : pool(p) {}
+ void *entry() {
+ pool->entry();
+ return 0;
+ }
+ };
+
+ set<WorkThread*> _threads;
int processing;
- Cond wait_cond;
+
void entry() {
_lock.Lock();
+ //generic_dout(0) << "entry start" << dendl;
while (!_stop) {
- if (!_pause) {
- T *item = _dequeue();
- if (item) {
+ if (!_pause && work_queues.size()) {
+ _WorkQueue *wq;
+ int tries = work_queues.size();
+ bool did = false;
+ while (tries--) {
+ last_work_queue++;
+ last_work_queue %= work_queues.size();
+ wq = work_queues[last_work_queue];
+
processing++;
- _lock.Unlock();
- _process(item);
- _lock.Lock();
+ //generic_dout(0) << "entry trying wq " << wq->name << dendl;
+ did = wq->_try_process();
processing--;
- if (_pause)
- wait_cond.Signal();
- continue;
+ //if (did) generic_dout(0) << "entry did wq " << wq->name << dendl;
+ if (did && _pause)
+ _wait_cond.Signal();
+ if (did)
+ break;
}
+ if (did)
+ continue;
}
- cond.Wait(_lock);
+ //generic_dout(0) << "entry waiting" << dendl;
+ _cond.Wait(_lock);
}
+ //generic_dout(0) << "entry finish" << dendl;
_lock.Unlock();
}
- struct WorkThread : public Thread {
- WorkQueue *wq;
- WorkThread(WorkQueue *q) : wq(q) {}
- void *entry() {
- wq->entry();
- return 0;
- }
- } thread;
-
public:
- WorkQueue(string name) :
+ WorkThreadPool(string name, int n=1) :
_lock((new string(name + "::lock"))->c_str()), // deliberately leak this
- _stop(false), _pause(false),
- processing(0),
- thread(this) {}
+ _stop(false),
+ _pause(false),
+ last_work_queue(0),
+ processing(0) {
+ set_num_threads(n);
+ }
+ ~WorkThreadPool() {
+ for (set<WorkThread*>::iterator p = _threads.begin();
+ p != _threads.end();
+ p++)
+ delete *p;
+ }
+
+ void add_work_queue(_WorkQueue *wq) {
+ work_queues.push_back(wq);
+ }
+ void remove_work_queue(_WorkQueue *wq) {
+ unsigned i = 0;
+ while (work_queues[i] != wq)
+ i++;
+ for (i++; i < work_queues.size(); i++)
+ work_queues[i-1] = work_queues[i];
+ assert(i == work_queues.size());
+ work_queues.resize(i-1);
+ }
- virtual bool _enqueue(T *) = 0;
- virtual void _dequeue(T *) = 0;
- virtual T *_dequeue() = 0;
- virtual void _process(T *) = 0;
- virtual void _clear() = 0;
+ void set_num_threads(unsigned n) {
+ while (_threads.size() < n) {
+ WorkThread *t = new WorkThread(this);
+ _threads.insert(t);
+ }
+ }
void start() {
- thread.create();
+ for (set<WorkThread*>::iterator p = _threads.begin();
+ p != _threads.end();
+ p++)
+ (*p)->create();
}
void stop(bool clear_after=true) {
_lock.Lock();
_stop = true;
- cond.Signal();
+ _cond.Signal();
_lock.Unlock();
- thread.join();
- if (clear_after)
- clear();
+ for (set<WorkThread*>::iterator p = _threads.begin();
+ p != _threads.end();
+ p++)
+ (*p)->join();
+ _lock.Lock();
+ for (unsigned i=0; i<work_queues.size(); i++)
+ work_queues[i]->_clear();
+ _lock.Unlock();
}
void kick() {
_lock.Lock();
- cond.Signal();
+ _cond.Signal();
_lock.Unlock();
}
void _kick() {
assert(_lock.is_locked());
- cond.Signal();
+ _cond.Signal();
}
void lock() {
_lock.Lock();
assert(!_pause);
_pause = true;
while (processing)
- wait_cond.Wait(_lock);
+ _wait_cond.Wait(_lock);
_lock.Unlock();
}
void pause_new() {
_lock.Lock();
assert(_pause);
_pause = false;
- cond.Signal();
- _lock.Unlock();
- }
-
- bool queue(T *item) {
- _lock.Lock();
- bool r = _enqueue(item);
- cond.Signal();
- _lock.Unlock();
- return r;
- }
- void dequeue(T *item) {
- _lock.Lock();
- _dequeue(item);
- _lock.Unlock();
- }
- void clear() {
- _lock.Lock();
- _clear();
+ _cond.Signal();
_lock.Unlock();
}
};
+
#endif
whoami(id), dev_name(dev),
boot_epoch(0), last_active_epoch(0),
state(STATE_BOOTING),
+ recovery_tp("OSD::recovery_tp", 1),
+ disk_tp("OSD::disk_tp", 2),
heartbeat_lock("OSD::heartbeat_lock"),
heartbeat_stop(false), heartbeat_epoch(0),
heartbeat_messenger(hbm),
tid_lock("OSD::tid_lock"),
num_pulling(0),
recovery_ops_active(0),
- recovery_wq(this),
+ recovery_wq(this, &recovery_tp),
remove_list_lock("OSD::remove_list_lock"),
- snap_trim_wq(this),
- scrub_wq(this)
+ snap_trim_wq(this, &disk_tp),
+ scrub_wq(this, &disk_tp)
{
osdmap = 0;
// announce to monitor i exist and have booted.
do_mon_report();
- recovery_wq.start();
- scrub_wq.start();
- snap_trim_wq.start();
+ recovery_tp.add_work_queue(&recovery_wq);
+ recovery_tp.start();
+
+ disk_tp.add_work_queue(&scrub_wq);
+ disk_tp.add_work_queue(&snap_trim_wq);
+ disk_tp.start();
// start the heartbeat
heartbeat_thread.create();
delete threadpool;
threadpool = 0;
- recovery_wq.stop();
- dout(10) << "recovery wq stopped" << dendl;
- scrub_wq.stop();
- dout(10) << "scrub wq stopped" << dendl;
- snap_trim_wq.stop();
- dout(10) << "snap trim wq stopped" << dendl;
+ recovery_tp.stop();
+ dout(10) << "recovery tp stopped" << dendl;
+ disk_tp.stop();
+ dout(10) << "disk tp stopped" << dendl;
// tell pgs we're shutting down
for (hash_map<pg_t, PG*>::iterator p = pg_map.begin();
}
// periodically kick recovery work queue
- recovery_wq.kick();
+ recovery_tp.kick();
map_lock.get_read();
if (pg->is_primary()) {
if (m->repair)
pg->state_set(PG_STATE_REPAIR);
- if (!pg->is_scrubbing())
+ if (!pg->is_scrubbing()) {
+ dout(10) << "queueing " << *pg << " for scrub" << dendl;
scrub_wq.queue(pg);
+ }
}
}
} else {
if (pg->is_primary()) {
if (m->repair)
pg->state_set(PG_STATE_REPAIR);
- if (!pg->is_scrubbing())
+ if (!pg->is_scrubbing()) {
+ dout(10) << "queueing " << *pg << " for scrub" << dendl;
scrub_wq.queue(pg);
+ }
}
}
}
state = STATE_ACTIVE;
wait_for_no_ops();
- recovery_wq.pause();
- scrub_wq.pause_new(); // _process() may be waiting for a replica message
- snap_trim_wq.pause();
+ recovery_tp.pause();
+ disk_tp.pause_new(); // _process() may be waiting for a replica message
map_lock.get_write();
assert(osd_lock.is_locked());
map_lock.put_write();
- recovery_wq.unpause();
- scrub_wq.unpause();
- snap_trim_wq.unpause();
+ recovery_tp.unpause();
+ disk_tp.unpause();
//if (osdmap->get_epoch() == 1) store->sync(); // in case of early death, blah
private:
+
+ WorkThreadPool recovery_tp;
+ WorkThreadPool disk_tp;
+
+
+
// -- heartbeat --
Mutex heartbeat_lock;
Cond heartbeat_cond;
map<int, MOSDPGInfo*>* info_map,
int& created);
+
+
// -- pg recovery --
xlist<PG*> recovery_queue;
utime_t defer_recovery_until;
int recovery_ops_active;
- struct RecoveryWQ : public WorkQueue<PG> {
+ struct RecoveryWQ : public WorkThreadPool::WorkQueue<PG> {
OSD *osd;
- RecoveryWQ(OSD *o) : WorkQueue<PG>("OSD::RecoveryWQ"), osd(o) {}
+ RecoveryWQ(OSD *o, WorkThreadPool *tp) : WorkThreadPool::WorkQueue<PG>("OSD::RecoveryWQ", tp), osd(o) {}
bool _enqueue(PG *pg) {
if (!pg->recovery_item.get_xlist()) {
// -- snap trimming --
xlist<PG*> snap_trim_queue;
- struct SnapTrimWQ : public WorkQueue<PG> {
+ struct SnapTrimWQ : public WorkThreadPool::WorkQueue<PG> {
OSD *osd;
- SnapTrimWQ(OSD *o) : WorkQueue<PG>("OSD::SnapTrimWQ"), osd(o) {}
+ SnapTrimWQ(OSD *o, WorkThreadPool *tp) : WorkThreadPool::WorkQueue<PG>("OSD::SnapTrimWQ", tp), osd(o) {}
bool _enqueue(PG *pg) {
if (pg->snap_trim_item.is_on_xlist())
// -- scrubbing --
xlist<PG*> scrub_queue;
- struct ScrubWQ : public WorkQueue<PG> {
+ struct ScrubWQ : public WorkThreadPool::WorkQueue<PG> {
OSD *osd;
- ScrubWQ(OSD *o) : WorkQueue<PG>("OSD::ScrubWQ"), osd(o) {}
+ ScrubWQ(OSD *o, WorkThreadPool *tp) : WorkThreadPool::WorkQueue<PG>("OSD::ScrubWQ", tp), osd(o) {}
bool _enqueue(PG *pg) {
if (pg->scrub_item.is_on_xlist())