From d224876ec58c9122bd82c6497a4e9a50c1405c8c Mon Sep 17 00:00:00 2001 From: Colin Patrick McCabe Date: Wed, 15 Jun 2011 14:42:35 -0700 Subject: [PATCH] ThreadPool: de-globalize Signed-off-by: Colin McCabe --- src/common/WorkQueue.cc | 28 ++++++++++++++-------------- src/common/WorkQueue.h | 7 +++++-- src/os/FileStore.cc | 2 +- src/osd/OSD.cc | 6 +++--- 4 files changed, 23 insertions(+), 20 deletions(-) diff --git a/src/common/WorkQueue.cc b/src/common/WorkQueue.cc index 97e176253b3a8..a7d6ae029b8ae 100644 --- a/src/common/WorkQueue.cc +++ b/src/common/WorkQueue.cc @@ -25,7 +25,7 @@ void ThreadPool::worker() { _lock.Lock(); - dout(10) << "worker start" << dendl; + ldout(cct,10) << "worker start" << dendl; while (!_stop) { if (!_pause && work_queues.size()) { WorkQueue_* wq; @@ -39,12 +39,12 @@ void ThreadPool::worker() void *item = wq->_void_dequeue(); if (item) { processing++; - dout(12) << "worker wq " << wq->name << " start processing " << item << dendl; + ldout(cct,12) << "worker wq " << wq->name << " start processing " << item << dendl; _lock.Unlock(); wq->_void_process(item); _lock.Lock(); wq->_void_process_finish(item); - dout(15) << "worker wq " << wq->name << " done processing " << item << dendl; + ldout(cct,15) << "worker wq " << wq->name << " done processing " << item << dendl; processing--; if (_pause || _draining) _wait_cond.Signal(); @@ -55,25 +55,25 @@ void ThreadPool::worker() if (did) continue; } - dout(15) << "worker waiting" << dendl; + ldout(cct,15) << "worker waiting" << dendl; _cond.Wait(_lock); } - dout(0) << "worker finish" << dendl; + ldout(cct,0) << "worker finish" << dendl; _lock.Unlock(); } void ThreadPool::start() { - dout(10) << "start" << dendl; + ldout(cct,10) << "start" << dendl; for (set::iterator p = _threads.begin(); p != _threads.end(); p++) (*p)->create(); - dout(15) << "started" << dendl; + ldout(cct,15) << "started" << dendl; } void ThreadPool::stop(bool clear_after) { - dout(10) << "stop" << dendl; + ldout(cct,10) << "stop" << dendl; _lock.Lock(); _stop = true; _cond.Signal(); @@ -86,25 +86,25 @@ void ThreadPool::stop(bool clear_after) for (unsigned i=0; i_clear(); _lock.Unlock(); - dout(15) << "stopped" << dendl; + ldout(cct,15) << "stopped" << dendl; } void ThreadPool::pause() { - dout(10) << "pause" << dendl; + ldout(cct,10) << "pause" << dendl; _lock.Lock(); assert(!_pause); _pause = true; while (processing) _wait_cond.Wait(_lock); _lock.Unlock(); - dout(15) << "paused" << dendl; + ldout(cct,15) << "paused" << dendl; } void ThreadPool::pause_new() { - dout(10) << "pause_new" << dendl; + ldout(cct,10) << "pause_new" << dendl; _lock.Lock(); assert(!_pause); _pause = true; @@ -113,7 +113,7 @@ void ThreadPool::pause_new() void ThreadPool::unpause() { - dout(10) << "unpause" << dendl; + ldout(cct,10) << "unpause" << dendl; _lock.Lock(); assert(_pause); _pause = false; @@ -123,7 +123,7 @@ void ThreadPool::unpause() void ThreadPool::drain(WorkQueue_* wq) { - dout(10) << "drain" << dendl; + ldout(cct,10) << "drain" << dendl; _lock.Lock(); _draining++; while (processing || (wq != NULL && !wq->_empty())) diff --git a/src/common/WorkQueue.h b/src/common/WorkQueue.h index 030d13a0c9238..1e14d18548233 100644 --- a/src/common/WorkQueue.h +++ b/src/common/WorkQueue.h @@ -19,7 +19,10 @@ #include "Cond.h" #include "Thread.h" +class CephContext; + class ThreadPool { + CephContext *cct; string name; Mutex _lock; Cond _cond; @@ -121,8 +124,8 @@ private: void worker(); public: - ThreadPool(string nm, int n=1) : - name(nm), + ThreadPool(CephContext *cct_, string nm, int n=1) : + cct(cct_), name(nm), _lock((new string(name + "::lock"))->c_str()), // deliberately leak this _stop(false), _pause(false), diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index 4d2d0267ebf92..fa0741b66d505 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -871,7 +871,7 @@ FileStore::FileStore(const std::string &base, const std::string &jdev) : timer(&g_ceph_context, sync_entry_timeo_lock), stop(false), sync_thread(this), op_queue_len(0), op_queue_bytes(0), op_finisher(&g_ceph_context), next_finish(0), - op_tp("FileStore::op_tp", g_conf->filestore_op_threads), op_wq(this, &op_tp), + op_tp(&g_ceph_context, "FileStore::op_tp", g_conf->filestore_op_threads), op_wq(this, &op_tp), flusher_queue_len(0), flusher_thread(this), logger(NULL) { diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 315c2dc4e450f..b1c4c8fb63de6 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -418,9 +418,9 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger, ceph_osd_feature_ro_compat, ceph_osd_feature_incompat), state(STATE_BOOTING), boot_epoch(0), up_epoch(0), - op_tp("OSD::op_tp", g_conf->osd_op_threads), - recovery_tp("OSD::recovery_tp", g_conf->osd_recovery_threads), - disk_tp("OSD::disk_tp", g_conf->osd_disk_threads), + op_tp(hbm->cct, "OSD::op_tp", g_conf->osd_op_threads), + recovery_tp(hbm->cct, "OSD::recovery_tp", g_conf->osd_recovery_threads), + disk_tp(hbm->cct, "OSD::disk_tp", g_conf->osd_disk_threads), heartbeat_lock("OSD::heartbeat_lock"), heartbeat_stop(false), heartbeat_epoch(0), heartbeat_messenger(hbm), -- 2.39.5