From: Sage Weil Date: Wed, 10 Dec 2008 20:15:00 +0000 (-0800) Subject: workqueue: non-inline worker, control methods; debugging X-Git-Tag: v0.6~109 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=642420aa4411f95db78a09d72c83c95e0b450675;p=ceph.git workqueue: non-inline worker, control methods; debugging --- diff --git a/src/Makefile.am b/src/Makefile.am index f3bb88331ec..e7308d1d767 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -181,6 +181,7 @@ libcommon_a_SOURCES = \ common/sctp_crc32.c\ common/assert.cc \ common/debug.cc \ + common/WorkQueue.cc \ mon/MonMap.cc \ mon/MonClient.cc \ osd/OSDMap.cc \ diff --git a/src/common/WorkQueue.cc b/src/common/WorkQueue.cc new file mode 100644 index 00000000000..5ce590b77a3 --- /dev/null +++ b/src/common/WorkQueue.cc @@ -0,0 +1,120 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "WorkQueue.h" + +#include "config.h" + +#define DOUT_SUBSYS tp +#undef dout_prefix +#define dout_prefix *_dout << dbeginl << pthread_self() << " " << name << " " + + +void ThreadPool::worker() +{ + _lock.Lock(); + dout(10) << "worker start" << dendl; + while (!_stop) { + if (!_pause && work_queues.size()) { + _WorkQueue *wq; + int tries = work_queues.size(); + bool did = false; + while (tries--) { + last_work_queue++; + last_work_queue %= work_queues.size(); + wq = work_queues[last_work_queue]; + + void *item = wq->_void_dequeue(); + if (item) { + processing++; + dout(12) << "worker wq " << wq->name << " start processing " << item << dendl; + _lock.Unlock(); + wq->_void_process(item); + _lock.Lock(); + dout(15) << "worker wq " << wq->name << " done processing " << item << dendl; + processing--; + if (_pause) + _wait_cond.Signal(); + did = true; + break; + } + } + if (did) + continue; + } + dout(15) << "worker waiting" << dendl; + _cond.Wait(_lock); + } + dout(0) << "worker finish" << dendl; + _lock.Unlock(); +} + +void ThreadPool::start() +{ + dout(10) << "start" << dendl; + for (set::iterator p = _threads.begin(); + p != _threads.end(); + p++) + (*p)->create(); + dout(15) << "started" << dendl; +} +void ThreadPool::stop(bool clear_after) +{ + dout(10) << "stop" << dendl; + _lock.Lock(); + _stop = true; + _cond.Signal(); + _lock.Unlock(); + for (set::iterator p = _threads.begin(); + p != _threads.end(); + p++) + (*p)->join(); + _lock.Lock(); + for (unsigned i=0; i_clear(); + _lock.Unlock(); + dout(15) << "stopped" << dendl; +} + + +void ThreadPool::pause() +{ + dout(10) << "pause" << dendl; + _lock.Lock(); + assert(!_pause); + _pause = true; + while (processing) + _wait_cond.Wait(_lock); + _lock.Unlock(); + dout(15) << "paused" << dendl; +} + +void ThreadPool::pause_new() +{ + dout(10) << "pause_new" << dendl; + _lock.Lock(); + assert(!_pause); + _pause = true; + _lock.Unlock(); +} + +void ThreadPool::unpause() +{ + dout(10) << "unpause" << dendl; + _lock.Lock(); + assert(_pause); + _pause = false; + _cond.Signal(); + _lock.Unlock(); +} diff --git a/src/common/WorkQueue.h b/src/common/WorkQueue.h index d8787c5062d..e275198e50e 100644 --- a/src/common/WorkQueue.h +++ b/src/common/WorkQueue.h @@ -19,7 +19,8 @@ #include "Cond.h" #include "Thread.h" -class WorkThreadPool { +class ThreadPool { + string name; Mutex _lock; Cond _cond; bool _stop, _pause; @@ -28,14 +29,15 @@ class WorkThreadPool { struct _WorkQueue { string name; _WorkQueue(string n) : name(n) {} - virtual bool _try_process() = 0; virtual void _clear() = 0; + virtual void *_void_dequeue() = 0; + virtual void _void_process(void *) = 0; }; public: template class WorkQueue : public _WorkQueue { - WorkThreadPool *pool; + ThreadPool *pool; virtual bool _enqueue(T *) = 0; virtual void _dequeue(T *) = 0; @@ -43,8 +45,15 @@ public: virtual void _process(T *) = 0; virtual void _clear() = 0; + void *_void_dequeue() { + return (void *)_dequeue(); + } + void _void_process(void *p) { + _process((T *)p); + } + public: - WorkQueue(string n, WorkThreadPool *p) : _WorkQueue(n), pool(p) { + WorkQueue(string n, ThreadPool *p) : _WorkQueue(n), pool(p) { pool->add_work_queue(this); } ~WorkQueue() { @@ -69,17 +78,6 @@ public: pool->_lock.Unlock(); } - bool _try_process() { - T *item = _dequeue(); - if (item) { - pool->_lock.Unlock(); - _process(item); - pool->_lock.Lock(); - return true; - } - return false; - } - void lock() { pool->lock(); } @@ -99,10 +97,10 @@ private: // threads struct WorkThread : public Thread { - WorkThreadPool *pool; - WorkThread(WorkThreadPool *p) : pool(p) {} + ThreadPool *pool; + WorkThread(ThreadPool *p) : pool(p) {} void *entry() { - pool->entry(); + pool->worker(); return 0; } }; @@ -110,42 +108,11 @@ private: set _threads; int processing; - - void entry() { - _lock.Lock(); - //generic_dout(0) << "entry start" << dendl; - while (!_stop) { - if (!_pause && work_queues.size()) { - _WorkQueue *wq; - int tries = work_queues.size(); - bool did = false; - while (tries--) { - last_work_queue++; - last_work_queue %= work_queues.size(); - wq = work_queues[last_work_queue]; - - processing++; - //generic_dout(0) << "entry trying wq " << wq->name << dendl; - did = wq->_try_process(); - processing--; - //if (did) generic_dout(0) << "entry did wq " << wq->name << dendl; - if (did && _pause) - _wait_cond.Signal(); - if (did) - break; - } - if (did) - continue; - } - //generic_dout(0) << "entry waiting" << dendl; - _cond.Wait(_lock); - } - //generic_dout(0) << "entry finish" << dendl; - _lock.Unlock(); - } + void worker(); public: - WorkThreadPool(string name, int n=1) : + ThreadPool(string nm, int n=1) : + name(nm), _lock((new string(name + "::lock"))->c_str()), // deliberately leak this _stop(false), _pause(false), @@ -153,7 +120,7 @@ public: processing(0) { set_num_threads(n); } - ~WorkThreadPool() { + ~ThreadPool() { for (set::iterator p = _threads.begin(); p != _threads.end(); p++) @@ -180,26 +147,6 @@ public: } } - void start() { - for (set::iterator p = _threads.begin(); - p != _threads.end(); - p++) - (*p)->create(); - } - void stop(bool clear_after=true) { - _lock.Lock(); - _stop = true; - _cond.Signal(); - _lock.Unlock(); - for (set::iterator p = _threads.begin(); - p != _threads.end(); - p++) - (*p)->join(); - _lock.Lock(); - for (unsigned i=0; i_clear(); - _lock.Unlock(); - } void kick() { _lock.Lock(); _cond.Signal(); @@ -216,29 +163,11 @@ public: _lock.Unlock(); } - void pause() { - _lock.Lock(); - assert(!_pause); - _pause = true; - while (processing) - _wait_cond.Wait(_lock); - _lock.Unlock(); - } - void pause_new() { - _lock.Lock(); - assert(!_pause); - _pause = true; - _lock.Unlock(); - } - - void unpause() { - _lock.Lock(); - assert(_pause); - _pause = false; - _cond.Signal(); - _lock.Unlock(); - } - + void start(); + void stop(bool clear_after=true); + void pause(); + void pause_new(); + void unpause(); }; diff --git a/src/config.cc b/src/config.cc index 673000fc854..9ef1d9d6cc2 100644 --- a/src/config.cc +++ b/src/config.cc @@ -241,6 +241,7 @@ md_config_t g_conf = { debug_ms: 0, debug_mon: 1, debug_paxos: 0, + debug_tp: 0, debug_after: 0, @@ -808,6 +809,11 @@ void parse_config_options(std::vector& args, bool open) g_conf.debug_paxos = atoi(args[++i]); else g_debug_after_conf.debug_paxos = atoi(args[++i]); + else if (strcmp(args[i], "--debug_tp") == 0) + if (!g_conf.debug_after) + g_conf.debug_tp = atoi(args[++i]); + else + g_debug_after_conf.debug_tp = atoi(args[++i]); else if (strcmp(args[i], "--debug_after") == 0) { g_conf.debug_after = atoi(args[++i]); diff --git a/src/config.h b/src/config.h index d51a70e054a..992a44afced 100644 --- a/src/config.h +++ b/src/config.h @@ -104,6 +104,7 @@ struct md_config_t { int debug_ms; int debug_mon; int debug_paxos; + int debug_tp; int debug_after; diff --git a/src/include/buffer.h b/src/include/buffer.h index 5319ad83cb6..7beba30eacf 100644 --- a/src/include/buffer.h +++ b/src/include/buffer.h @@ -41,8 +41,13 @@ void *valloc(size_t); #endif #include +#include #include #include +#include + +using std::istream; +using std::string; #include "atomic.h" #include "page.h" diff --git a/src/os/FileStore.h b/src/os/FileStore.h index 27cacff9414..9f6fabc1369 100644 --- a/src/os/FileStore.h +++ b/src/os/FileStore.h @@ -18,7 +18,6 @@ #include "ObjectStore.h" #include "JournalingObjectStore.h" -#include "common/ThreadPool.h" #include "common/Mutex.h" #include "Fake.h" diff --git a/src/osd/OSD.h b/src/osd/OSD.h index a8c48f8dfcd..33c557523fd 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -19,7 +19,6 @@ #include "common/Mutex.h" #include "common/RWLock.h" -#include "common/ThreadPool.h" #include "common/Timer.h" #include "common/WorkQueue.h" #include "common/LogClient.h" @@ -114,9 +113,9 @@ public: private: - WorkThreadPool op_tp; - WorkThreadPool recovery_tp; - WorkThreadPool disk_tp; + ThreadPool op_tp; + ThreadPool recovery_tp; + ThreadPool disk_tp; @@ -271,9 +270,9 @@ private: // -- op queue -- deque op_queue; - struct OpWQ : public WorkThreadPool::WorkQueue { + struct OpWQ : public ThreadPool::WorkQueue { OSD *osd; - OpWQ(OSD *o, WorkThreadPool *tp) : WorkThreadPool::WorkQueue("OSD::OpWQ", tp), osd(o) {} + OpWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue("OSD::OpWQ", tp), osd(o) {} bool _enqueue(PG *pg) { pg->get(); @@ -502,9 +501,9 @@ private: utime_t defer_recovery_until; int recovery_ops_active; - struct RecoveryWQ : public WorkThreadPool::WorkQueue { + struct RecoveryWQ : public ThreadPool::WorkQueue { OSD *osd; - RecoveryWQ(OSD *o, WorkThreadPool *tp) : WorkThreadPool::WorkQueue("OSD::RecoveryWQ", tp), osd(o) {} + RecoveryWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue("OSD::RecoveryWQ", tp), osd(o) {} bool _enqueue(PG *pg) { if (!pg->recovery_item.get_xlist()) { @@ -578,9 +577,9 @@ private: // -- snap trimming -- xlist snap_trim_queue; - struct SnapTrimWQ : public WorkThreadPool::WorkQueue { + struct SnapTrimWQ : public ThreadPool::WorkQueue { OSD *osd; - SnapTrimWQ(OSD *o, WorkThreadPool *tp) : WorkThreadPool::WorkQueue("OSD::SnapTrimWQ", tp), osd(o) {} + SnapTrimWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue("OSD::SnapTrimWQ", tp), osd(o) {} bool _enqueue(PG *pg) { if (pg->snap_trim_item.is_on_xlist()) @@ -610,9 +609,9 @@ private: // -- scrubbing -- xlist scrub_queue; - struct ScrubWQ : public WorkThreadPool::WorkQueue { + struct ScrubWQ : public ThreadPool::WorkQueue { OSD *osd; - ScrubWQ(OSD *o, WorkThreadPool *tp) : WorkThreadPool::WorkQueue("OSD::ScrubWQ", tp), osd(o) {} + ScrubWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue("OSD::ScrubWQ", tp), osd(o) {} bool _enqueue(PG *pg) { if (pg->scrub_item.is_on_xlist())