From: Somnath Roy Date: Tue, 20 May 2014 23:01:34 +0000 (-0700) Subject: ceph-common: The Sharded threadpool worker logic changed X-Git-Tag: v0.83~110^2~8 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=c24ef0074dbcf20b8cef15ff53e306cb72a5b05a;p=ceph.git ceph-common: The Sharded threadpool worker logic changed Now, the _process() of the derived queue is processing one request at a time and the outer loop is controlled by the sharded threadpool. The stop/pause/drain functionalities are controlled by the sharded TP. Signed-off-by: Somnath Roy --- diff --git a/src/common/WorkQueue.cc b/src/common/WorkQueue.cc index 16f79528873d..ea5678c5921d 100644 --- a/src/common/WorkQueue.cc +++ b/src/common/WorkQueue.cc @@ -256,7 +256,8 @@ void ThreadPool::drain(WorkQueue_* wq) } ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm, uint32_t pnum_threads): - cct(pcct_), name(nm), lockname(nm + "::lock"), shardedpool_lock(lockname.c_str()), num_threads(pnum_threads), wq(NULL) {} + cct(pcct_), name(nm), lockname(nm + "::lock"), shardedpool_lock(lockname.c_str()), num_threads(pnum_threads), + stop_threads(0), pause_threads(0),drain_threads(0), in_process(0), wq(NULL) {} void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index) { @@ -267,7 +268,25 @@ void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index) ss << name << " thread " << (void*)pthread_self(); heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str()); - wq->_process(thread_index, hb); + while (!stop_threads.read()) { + if(pause_threads.read()) { + shardedpool_lock.Lock(); + while(pause_threads.read()) { + cct->get_heartbeat_map()->reset_timeout(hb, 4, 0); + shardedpol_cond.WaitInterval(cct, shardedpool_lock, utime_t(2, 0)); + } + shardedpool_lock.Unlock(); + } + + wq->_process(thread_index, hb, in_process); + + if ((pause_threads.read() != 0) || (drain_threads.read() != 0)) { + shardedpool_lock.Lock(); + wait_cond.Signal(); + shardedpool_lock.Unlock(); + } + + } ldout(cct,10) << "sharded worker finish" << dendl; @@ -302,9 +321,9 @@ void ShardedThreadPool::start() void ShardedThreadPool::stop() { ldout(cct,10) << "stop" << dendl; + stop_threads.set(1); assert (wq != NULL); - wq->stop_threads_on_queue(); - + wq->return_waiting_threads(); for (vector::iterator p = threads_shardedpool.begin(); p != threads_shardedpool.end(); ++p) { @@ -318,32 +337,50 @@ void ShardedThreadPool::stop() void ShardedThreadPool::pause() { ldout(cct,10) << "pause" << dendl; + shardedpool_lock.Lock(); + pause_threads.set(1); assert (wq != NULL); - wq->pause_threads_on_queue(); + wq->return_waiting_threads(); + while (in_process.read()){ + wait_cond.Wait(shardedpool_lock); + } + shardedpool_lock.Unlock(); ldout(cct,10) << "paused" << dendl; } void ShardedThreadPool::pause_new() { ldout(cct,10) << "pause_new" << dendl; + shardedpool_lock.Lock(); + pause_threads.set(1); assert (wq != NULL); - wq->pause_new_threads_on_queue(); + wq->return_waiting_threads(); + shardedpool_lock.Unlock(); ldout(cct,10) << "paused_new" << dendl; } void ShardedThreadPool::unpause() { ldout(cct,10) << "unpause" << dendl; - assert (wq != NULL); - wq->unpause_threads_on_queue(); + shardedpool_lock.Lock(); + pause_threads.set(0); + shardedpol_cond.Signal(); + shardedpool_lock.Unlock(); ldout(cct,10) << "unpaused" << dendl; } void ShardedThreadPool::drain() { ldout(cct,10) << "drain" << dendl; + shardedpool_lock.Lock(); + drain_threads.set(1); assert (wq != NULL); - wq->drain_threads_on_queue(); + wq->return_waiting_threads(); + while (in_process.read() || !wq->is_all_shard_empty()){ + wait_cond.Wait(shardedpool_lock); + } + drain_threads.set(0); + shardedpool_lock.Unlock(); ldout(cct,10) << "drained" << dendl; } diff --git a/src/common/WorkQueue.h b/src/common/WorkQueue.h index 41ad6bd407e7..6311cd5e0495 100644 --- a/src/common/WorkQueue.h +++ b/src/common/WorkQueue.h @@ -436,48 +436,43 @@ class ShardedThreadPool { string lockname; Mutex shardedpool_lock; Cond shardedpol_cond; + Cond wait_cond; uint32_t num_threads; + atomic_t stop_threads; + atomic_t pause_threads; + atomic_t drain_threads; + atomic_t in_process; public: class baseShardedWQ { - + public: time_t timeout_interval, suicide_interval; - - protected: - atomic_t stop_threads; - atomic_t pause_threads; - atomic_t drain_threads; - atomic_t in_process; - - - public: - - baseShardedWQ(time_t ti, time_t sti):timeout_interval(ti), suicide_interval(sti) - ,stop_threads(0), pause_threads(0) - ,drain_threads(0), in_process(0) {} + baseShardedWQ(time_t ti, time_t sti):timeout_interval(ti), suicide_interval(sti) {} virtual ~baseShardedWQ() {} - virtual void _process(uint32_t thread_index, heartbeat_handle_d *hb) = 0; - virtual void stop_threads_on_queue() = 0; - virtual void pause_threads_on_queue() = 0; - virtual void pause_new_threads_on_queue() = 0; - virtual void unpause_threads_on_queue() = 0; - virtual void drain_threads_on_queue() = 0; - }; + virtual void _process(uint32_t thread_index, heartbeat_handle_d *hb, atomic_t& in_process ) = 0; + virtual void return_waiting_threads() = 0; + virtual bool is_all_shard_empty() = 0; + }; template class ShardedWQ: public baseShardedWQ { + + ShardedThreadPool* sharded_pool; + + protected: + virtual void _enqueue(T) = 0; + virtual void _enqueue_front(T) = 0; + public: - ShardedWQ(time_t ti, time_t sti, ShardedThreadPool* tp):baseShardedWQ(ti, sti) { + ShardedWQ(time_t ti, time_t sti, ShardedThreadPool* tp): baseShardedWQ(ti, sti), + sharded_pool(tp) { tp->set_wq(this); - } - - virtual void _enqueue(T) = 0; - virtual void _enqueue_front(T) = 0; + virtual ~ShardedWQ() {} void queue(T item) { _enqueue(item); @@ -485,6 +480,10 @@ public: void queue_front(T item) { _enqueue_front(item); } + void drain() { + sharded_pool->drain(); + } + }; private: @@ -502,6 +501,13 @@ private: }; vector threads_shardedpool; + void start_threads(); + void shardedthreadpool_worker(uint32_t thread_index); + void set_wq(baseShardedWQ* swq) { + wq = swq; + } + + public: @@ -509,10 +515,6 @@ public: ~ShardedThreadPool(){}; - void set_wq(baseShardedWQ* swq) { - wq = swq; - } - /// start thread pool thread void start(); /// stop thread pool thread @@ -526,11 +528,6 @@ public: /// wait for all work to complete void drain(); - void start_threads(); - void shardedthreadpool_worker(uint32_t thread_index); - - - };