From: Sage Weil Date: Wed, 10 Dec 2008 19:13:00 +0000 (-0800) Subject: osd: shared threadpool for multiple work queues X-Git-Tag: v0.6~112 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=ec4da946b17af349371847d965ebd92a279739e6;p=ceph.git osd: shared threadpool for multiple work queues --- diff --git a/src/common/WorkQueue.h b/src/common/WorkQueue.h index c14d3af86dd..d8787c5062d 100644 --- a/src/common/WorkQueue.h +++ b/src/common/WorkQueue.h @@ -19,78 +19,195 @@ #include "Cond.h" #include "Thread.h" -template -class WorkQueue { - +class WorkThreadPool { Mutex _lock; - Cond cond; + Cond _cond; bool _stop, _pause; + Cond _wait_cond; + + struct _WorkQueue { + string name; + _WorkQueue(string n) : name(n) {} + virtual bool _try_process() = 0; + virtual void _clear() = 0; + }; + +public: + template + class WorkQueue : public _WorkQueue { + WorkThreadPool *pool; + + virtual bool _enqueue(T *) = 0; + virtual void _dequeue(T *) = 0; + virtual T *_dequeue() = 0; + virtual void _process(T *) = 0; + virtual void _clear() = 0; + + public: + WorkQueue(string n, WorkThreadPool *p) : _WorkQueue(n), pool(p) { + pool->add_work_queue(this); + } + ~WorkQueue() { + pool->remove_work_queue(this); + } + + bool queue(T *item) { + pool->_lock.Lock(); + bool r = _enqueue(item); + pool->_cond.SignalOne(); + pool->_lock.Unlock(); + return r; + } + void dequeue(T *item) { + pool->_lock.Lock(); + _dequeue(item); + pool->_lock.Unlock(); + } + void clear() { + pool->_lock.Lock(); + _clear(); + pool->_lock.Unlock(); + } + + bool _try_process() { + T *item = _dequeue(); + if (item) { + pool->_lock.Unlock(); + _process(item); + pool->_lock.Lock(); + return true; + } + return false; + } + + void lock() { + pool->lock(); + } + void unlock() { + pool->unlock(); + } + void _kick() { + pool->_kick(); + } + + }; + +private: + vector<_WorkQueue*> work_queues; + int last_work_queue; + + + // threads + struct WorkThread : public Thread { + WorkThreadPool *pool; + WorkThread(WorkThreadPool *p) : pool(p) {} + void *entry() { + pool->entry(); + return 0; + } + }; + + set _threads; int processing; - Cond wait_cond; + void entry() { _lock.Lock(); + //generic_dout(0) << "entry start" << dendl; while (!_stop) { - if (!_pause) { - T *item = _dequeue(); - if (item) { + if (!_pause && work_queues.size()) { + _WorkQueue *wq; + int tries = work_queues.size(); + bool did = false; + while (tries--) { + last_work_queue++; + last_work_queue %= work_queues.size(); + wq = work_queues[last_work_queue]; + processing++; - _lock.Unlock(); - _process(item); - _lock.Lock(); + //generic_dout(0) << "entry trying wq " << wq->name << dendl; + did = wq->_try_process(); processing--; - if (_pause) - wait_cond.Signal(); - continue; + //if (did) generic_dout(0) << "entry did wq " << wq->name << dendl; + if (did && _pause) + _wait_cond.Signal(); + if (did) + break; } + if (did) + continue; } - cond.Wait(_lock); + //generic_dout(0) << "entry waiting" << dendl; + _cond.Wait(_lock); } + //generic_dout(0) << "entry finish" << dendl; _lock.Unlock(); } - struct WorkThread : public Thread { - WorkQueue *wq; - WorkThread(WorkQueue *q) : wq(q) {} - void *entry() { - wq->entry(); - return 0; - } - } thread; - public: - WorkQueue(string name) : + WorkThreadPool(string name, int n=1) : _lock((new string(name + "::lock"))->c_str()), // deliberately leak this - _stop(false), _pause(false), - processing(0), - thread(this) {} + _stop(false), + _pause(false), + last_work_queue(0), + processing(0) { + set_num_threads(n); + } + ~WorkThreadPool() { + for (set::iterator p = _threads.begin(); + p != _threads.end(); + p++) + delete *p; + } + + void add_work_queue(_WorkQueue *wq) { + work_queues.push_back(wq); + } + void remove_work_queue(_WorkQueue *wq) { + unsigned i = 0; + while (work_queues[i] != wq) + i++; + for (i++; i < work_queues.size(); i++) + work_queues[i-1] = work_queues[i]; + assert(i == work_queues.size()); + work_queues.resize(i-1); + } - virtual bool _enqueue(T *) = 0; - virtual void _dequeue(T *) = 0; - virtual T *_dequeue() = 0; - virtual void _process(T *) = 0; - virtual void _clear() = 0; + void set_num_threads(unsigned n) { + while (_threads.size() < n) { + WorkThread *t = new WorkThread(this); + _threads.insert(t); + } + } void start() { - thread.create(); + for (set::iterator p = _threads.begin(); + p != _threads.end(); + p++) + (*p)->create(); } void stop(bool clear_after=true) { _lock.Lock(); _stop = true; - cond.Signal(); + _cond.Signal(); _lock.Unlock(); - thread.join(); - if (clear_after) - clear(); + for (set::iterator p = _threads.begin(); + p != _threads.end(); + p++) + (*p)->join(); + _lock.Lock(); + for (unsigned i=0; i_clear(); + _lock.Unlock(); } void kick() { _lock.Lock(); - cond.Signal(); + _cond.Signal(); _lock.Unlock(); } void _kick() { assert(_lock.is_locked()); - cond.Signal(); + _cond.Signal(); } void lock() { _lock.Lock(); @@ -104,7 +221,7 @@ public: assert(!_pause); _pause = true; while (processing) - wait_cond.Wait(_lock); + _wait_cond.Wait(_lock); _lock.Unlock(); } void pause_new() { @@ -118,29 +235,12 @@ public: _lock.Lock(); assert(_pause); _pause = false; - cond.Signal(); - _lock.Unlock(); - } - - bool queue(T *item) { - _lock.Lock(); - bool r = _enqueue(item); - cond.Signal(); - _lock.Unlock(); - return r; - } - void dequeue(T *item) { - _lock.Lock(); - _dequeue(item); - _lock.Unlock(); - } - void clear() { - _lock.Lock(); - _clear(); + _cond.Signal(); _lock.Unlock(); } }; + #endif diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 631c98385cd..ddafa660a71 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -260,6 +260,8 @@ OSD::OSD(int id, Messenger *m, Messenger *hbm, MonMap *mm, const char *dev) : whoami(id), dev_name(dev), boot_epoch(0), last_active_epoch(0), state(STATE_BOOTING), + recovery_tp("OSD::recovery_tp", 1), + disk_tp("OSD::disk_tp", 2), heartbeat_lock("OSD::heartbeat_lock"), heartbeat_stop(false), heartbeat_epoch(0), heartbeat_messenger(hbm), @@ -280,10 +282,10 @@ OSD::OSD(int id, Messenger *m, Messenger *hbm, MonMap *mm, const char *dev) : tid_lock("OSD::tid_lock"), num_pulling(0), recovery_ops_active(0), - recovery_wq(this), + recovery_wq(this, &recovery_tp), remove_list_lock("OSD::remove_list_lock"), - snap_trim_wq(this), - scrub_wq(this) + snap_trim_wq(this, &disk_tp), + scrub_wq(this, &disk_tp) { osdmap = 0; @@ -439,9 +441,12 @@ int OSD::init() // announce to monitor i exist and have booted. do_mon_report(); - recovery_wq.start(); - scrub_wq.start(); - snap_trim_wq.start(); + recovery_tp.add_work_queue(&recovery_wq); + recovery_tp.start(); + + disk_tp.add_work_queue(&scrub_wq); + disk_tp.add_work_queue(&snap_trim_wq); + disk_tp.start(); // start the heartbeat heartbeat_thread.create(); @@ -487,12 +492,10 @@ int OSD::shutdown() delete threadpool; threadpool = 0; - recovery_wq.stop(); - dout(10) << "recovery wq stopped" << dendl; - scrub_wq.stop(); - dout(10) << "scrub wq stopped" << dendl; - snap_trim_wq.stop(); - dout(10) << "snap trim wq stopped" << dendl; + recovery_tp.stop(); + dout(10) << "recovery tp stopped" << dendl; + disk_tp.stop(); + dout(10) << "disk tp stopped" << dendl; // tell pgs we're shutting down for (hash_map::iterator p = pg_map.begin(); @@ -1161,7 +1164,7 @@ void OSD::tick() } // periodically kick recovery work queue - recovery_wq.kick(); + recovery_tp.kick(); map_lock.get_read(); @@ -1642,8 +1645,10 @@ void OSD::handle_scrub(MOSDScrub *m) if (pg->is_primary()) { if (m->repair) pg->state_set(PG_STATE_REPAIR); - if (!pg->is_scrubbing()) + if (!pg->is_scrubbing()) { + dout(10) << "queueing " << *pg << " for scrub" << dendl; scrub_wq.queue(pg); + } } } } else { @@ -1655,8 +1660,10 @@ void OSD::handle_scrub(MOSDScrub *m) if (pg->is_primary()) { if (m->repair) pg->state_set(PG_STATE_REPAIR); - if (!pg->is_scrubbing()) + if (!pg->is_scrubbing()) { + dout(10) << "queueing " << *pg << " for scrub" << dendl; scrub_wq.queue(pg); + } } } } @@ -1724,9 +1731,8 @@ void OSD::handle_osd_map(MOSDMap *m) state = STATE_ACTIVE; wait_for_no_ops(); - recovery_wq.pause(); - scrub_wq.pause_new(); // _process() may be waiting for a replica message - snap_trim_wq.pause(); + recovery_tp.pause(); + disk_tp.pause_new(); // _process() may be waiting for a replica message map_lock.get_write(); assert(osd_lock.is_locked()); @@ -1920,9 +1926,8 @@ void OSD::handle_osd_map(MOSDMap *m) map_lock.put_write(); - recovery_wq.unpause(); - scrub_wq.unpause(); - snap_trim_wq.unpause(); + recovery_tp.unpause(); + disk_tp.unpause(); //if (osdmap->get_epoch() == 1) store->sync(); // in case of early death, blah diff --git a/src/osd/OSD.h b/src/osd/OSD.h index fe6ad22b714..3a384be2f6c 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -114,6 +114,12 @@ public: private: + + WorkThreadPool recovery_tp; + WorkThreadPool disk_tp; + + + // -- heartbeat -- Mutex heartbeat_lock; Cond heartbeat_cond; @@ -462,14 +468,16 @@ private: map* info_map, int& created); + + // -- pg recovery -- xlist recovery_queue; utime_t defer_recovery_until; int recovery_ops_active; - struct RecoveryWQ : public WorkQueue { + struct RecoveryWQ : public WorkThreadPool::WorkQueue { OSD *osd; - RecoveryWQ(OSD *o) : WorkQueue("OSD::RecoveryWQ"), osd(o) {} + RecoveryWQ(OSD *o, WorkThreadPool *tp) : WorkThreadPool::WorkQueue("OSD::RecoveryWQ", tp), osd(o) {} bool _enqueue(PG *pg) { if (!pg->recovery_item.get_xlist()) { @@ -543,9 +551,9 @@ private: // -- snap trimming -- xlist snap_trim_queue; - struct SnapTrimWQ : public WorkQueue { + struct SnapTrimWQ : public WorkThreadPool::WorkQueue { OSD *osd; - SnapTrimWQ(OSD *o) : WorkQueue("OSD::SnapTrimWQ"), osd(o) {} + SnapTrimWQ(OSD *o, WorkThreadPool *tp) : WorkThreadPool::WorkQueue("OSD::SnapTrimWQ", tp), osd(o) {} bool _enqueue(PG *pg) { if (pg->snap_trim_item.is_on_xlist()) @@ -575,9 +583,9 @@ private: // -- scrubbing -- xlist scrub_queue; - struct ScrubWQ : public WorkQueue { + struct ScrubWQ : public WorkThreadPool::WorkQueue { OSD *osd; - ScrubWQ(OSD *o) : WorkQueue("OSD::ScrubWQ"), osd(o) {} + ScrubWQ(OSD *o, WorkThreadPool *tp) : WorkThreadPool::WorkQueue("OSD::ScrubWQ", tp), osd(o) {} bool _enqueue(PG *pg) { if (pg->scrub_item.is_on_xlist())