From: Somnath Roy Date: Wed, 14 May 2014 22:50:25 +0000 (-0700) Subject: ceph-common: Implementation of the sharded threadpool. X-Git-Tag: v0.83~110^2~10 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8369c08e4c00a3777402e93f810d09f279aae376;p=ceph.git ceph-common: Implementation of the sharded threadpool. Threadpool will only be having a single work queue and internally the work queue will be having multiple storage data structures. Based on some logic (which is derived class implementation specific) the work queue will shard the requests among these storage structures. Each storage will be guarded by finer grained sunchronization objects. Sharded threadpool threads will be assigned to work on a shard based on some algorithm which is again derived class implementation specific. Signed-off-by: Somnath Roy --- diff --git a/src/common/WorkQueue.cc b/src/common/WorkQueue.cc index f47435bf3abc..16f79528873d 100644 --- a/src/common/WorkQueue.cc +++ b/src/common/WorkQueue.cc @@ -255,3 +255,95 @@ void ThreadPool::drain(WorkQueue_* wq) _lock.Unlock(); } +ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm, uint32_t pnum_threads): + cct(pcct_), name(nm), lockname(nm + "::lock"), shardedpool_lock(lockname.c_str()), num_threads(pnum_threads), wq(NULL) {} + +void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index) +{ + assert (wq != NULL); + ldout(cct,10) << "worker start" << dendl; + + std::stringstream ss; + ss << name << " thread " << (void*)pthread_self(); + heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str()); + + wq->_process(thread_index, hb); + + ldout(cct,10) << "sharded worker finish" << dendl; + + cct->get_heartbeat_map()->remove_worker(hb); + +} + +void ShardedThreadPool::start_threads() +{ + assert(shardedpool_lock.is_locked()); + int32_t thread_index = 0; + while (threads_shardedpool.size() < num_threads) { + + WorkThreadSharded *wt = new WorkThreadSharded(this, thread_index); + ldout(cct, 10) << "start_threads creating and starting " << wt << dendl; + threads_shardedpool.push_back(wt); + wt->create(); + thread_index ++; + } +} + +void ShardedThreadPool::start() +{ + ldout(cct,10) << "start" << dendl; + + shardedpool_lock.Lock(); + start_threads(); + shardedpool_lock.Unlock(); + ldout(cct,15) << "started" << dendl; +} + +void ShardedThreadPool::stop() +{ + ldout(cct,10) << "stop" << dendl; + assert (wq != NULL); + wq->stop_threads_on_queue(); + + for (vector::iterator p = threads_shardedpool.begin(); + p != threads_shardedpool.end(); + ++p) { + (*p)->join(); + delete *p; + } + threads_shardedpool.clear(); + ldout(cct,15) << "stopped" << dendl; +} + +void ShardedThreadPool::pause() +{ + ldout(cct,10) << "pause" << dendl; + assert (wq != NULL); + wq->pause_threads_on_queue(); + ldout(cct,10) << "paused" << dendl; +} + +void ShardedThreadPool::pause_new() +{ + ldout(cct,10) << "pause_new" << dendl; + assert (wq != NULL); + wq->pause_new_threads_on_queue(); + ldout(cct,10) << "paused_new" << dendl; +} + +void ShardedThreadPool::unpause() +{ + ldout(cct,10) << "unpause" << dendl; + assert (wq != NULL); + wq->unpause_threads_on_queue(); + ldout(cct,10) << "unpaused" << dendl; +} + +void ShardedThreadPool::drain() +{ + ldout(cct,10) << "drain" << dendl; + assert (wq != NULL); + wq->drain_threads_on_queue(); + ldout(cct,10) << "drained" << dendl; +} + diff --git a/src/common/WorkQueue.h b/src/common/WorkQueue.h index 71cf89605e7c..41ad6bd407e7 100644 --- a/src/common/WorkQueue.h +++ b/src/common/WorkQueue.h @@ -41,13 +41,13 @@ public: heartbeat_handle_d *hb; time_t grace; time_t suicide_grace; + public: TPHandle( CephContext *cct, heartbeat_handle_d *hb, time_t grace, time_t suicide_grace) : cct(cct), hb(hb), grace(grace), suicide_grace(suicide_grace) {} - public: void reset_tp_timeout(); void suspend_tp_timeout(); }; @@ -429,4 +429,109 @@ public: } }; +class ShardedThreadPool { + + CephContext *cct; + string name; + string lockname; + Mutex shardedpool_lock; + Cond shardedpol_cond; + uint32_t num_threads; + +public: + + class baseShardedWQ { + + public: + time_t timeout_interval, suicide_interval; + + protected: + atomic_t stop_threads; + atomic_t pause_threads; + atomic_t drain_threads; + atomic_t in_process; + + + public: + + baseShardedWQ(time_t ti, time_t sti):timeout_interval(ti), suicide_interval(sti) + ,stop_threads(0), pause_threads(0) + ,drain_threads(0), in_process(0) {} + virtual ~baseShardedWQ() {} + virtual void _process(uint32_t thread_index, heartbeat_handle_d *hb) = 0; + virtual void stop_threads_on_queue() = 0; + virtual void pause_threads_on_queue() = 0; + virtual void pause_new_threads_on_queue() = 0; + virtual void unpause_threads_on_queue() = 0; + virtual void drain_threads_on_queue() = 0; + + }; + + template + class ShardedWQ: public baseShardedWQ { + + public: + ShardedWQ(time_t ti, time_t sti, ShardedThreadPool* tp):baseShardedWQ(ti, sti) { + tp->set_wq(this); + + } + + virtual void _enqueue(T) = 0; + virtual void _enqueue_front(T) = 0; + + void queue(T item) { + _enqueue(item); + } + void queue_front(T item) { + _enqueue_front(item); + } + }; + +private: + + baseShardedWQ* wq; + // threads + struct WorkThreadSharded : public Thread { + ShardedThreadPool *pool; + uint32_t thread_index; + WorkThreadSharded(ShardedThreadPool *p, uint32_t pthread_index): pool(p),thread_index(pthread_index) {} + void *entry() { + pool->shardedthreadpool_worker(thread_index); + return 0; + } + }; + + vector threads_shardedpool; + +public: + + ShardedThreadPool(CephContext *cct_, string nm, uint32_t pnum_threads); + + ~ShardedThreadPool(){}; + + void set_wq(baseShardedWQ* swq) { + wq = swq; + } + + /// start thread pool thread + void start(); + /// stop thread pool thread + void stop(); + /// pause thread pool (if it not already paused) + void pause(); + /// pause initiation of new work + void pause_new(); + /// resume work in thread pool. must match each pause() call 1:1 to resume. + void unpause(); + /// wait for all work to complete + void drain(); + + void start_threads(); + void shardedthreadpool_worker(uint32_t thread_index); + + + +}; + + #endif