_lock.Unlock();
}
+ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm, uint32_t pnum_threads):
+ cct(pcct_), name(nm), lockname(nm + "::lock"), shardedpool_lock(lockname.c_str()), num_threads(pnum_threads), wq(NULL) {}
+
+void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
+{
+ assert (wq != NULL);
+ ldout(cct,10) << "worker start" << dendl;
+
+ std::stringstream ss;
+ ss << name << " thread " << (void*)pthread_self();
+ heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str());
+
+ wq->_process(thread_index, hb);
+
+ ldout(cct,10) << "sharded worker finish" << dendl;
+
+ cct->get_heartbeat_map()->remove_worker(hb);
+
+}
+
+void ShardedThreadPool::start_threads()
+{
+ assert(shardedpool_lock.is_locked());
+ int32_t thread_index = 0;
+ while (threads_shardedpool.size() < num_threads) {
+
+ WorkThreadSharded *wt = new WorkThreadSharded(this, thread_index);
+ ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
+ threads_shardedpool.push_back(wt);
+ wt->create();
+ thread_index ++;
+ }
+}
+
+void ShardedThreadPool::start()
+{
+ ldout(cct,10) << "start" << dendl;
+
+ shardedpool_lock.Lock();
+ start_threads();
+ shardedpool_lock.Unlock();
+ ldout(cct,15) << "started" << dendl;
+}
+
+void ShardedThreadPool::stop()
+{
+ ldout(cct,10) << "stop" << dendl;
+ assert (wq != NULL);
+ wq->stop_threads_on_queue();
+
+ for (vector<WorkThreadSharded*>::iterator p = threads_shardedpool.begin();
+ p != threads_shardedpool.end();
+ ++p) {
+ (*p)->join();
+ delete *p;
+ }
+ threads_shardedpool.clear();
+ ldout(cct,15) << "stopped" << dendl;
+}
+
+void ShardedThreadPool::pause()
+{
+ ldout(cct,10) << "pause" << dendl;
+ assert (wq != NULL);
+ wq->pause_threads_on_queue();
+ ldout(cct,10) << "paused" << dendl;
+}
+
+void ShardedThreadPool::pause_new()
+{
+ ldout(cct,10) << "pause_new" << dendl;
+ assert (wq != NULL);
+ wq->pause_new_threads_on_queue();
+ ldout(cct,10) << "paused_new" << dendl;
+}
+
+void ShardedThreadPool::unpause()
+{
+ ldout(cct,10) << "unpause" << dendl;
+ assert (wq != NULL);
+ wq->unpause_threads_on_queue();
+ ldout(cct,10) << "unpaused" << dendl;
+}
+
+void ShardedThreadPool::drain()
+{
+ ldout(cct,10) << "drain" << dendl;
+ assert (wq != NULL);
+ wq->drain_threads_on_queue();
+ ldout(cct,10) << "drained" << dendl;
+}
+
heartbeat_handle_d *hb;
time_t grace;
time_t suicide_grace;
+ public:
TPHandle(
CephContext *cct,
heartbeat_handle_d *hb,
time_t grace,
time_t suicide_grace)
: cct(cct), hb(hb), grace(grace), suicide_grace(suicide_grace) {}
- public:
void reset_tp_timeout();
void suspend_tp_timeout();
};
}
};
+class ShardedThreadPool {
+
+ CephContext *cct;
+ string name;
+ string lockname;
+ Mutex shardedpool_lock;
+ Cond shardedpol_cond;
+ uint32_t num_threads;
+
+public:
+
+ class baseShardedWQ {
+
+ public:
+ time_t timeout_interval, suicide_interval;
+
+ protected:
+ atomic_t stop_threads;
+ atomic_t pause_threads;
+ atomic_t drain_threads;
+ atomic_t in_process;
+
+
+ public:
+
+ baseShardedWQ(time_t ti, time_t sti):timeout_interval(ti), suicide_interval(sti)
+ ,stop_threads(0), pause_threads(0)
+ ,drain_threads(0), in_process(0) {}
+ virtual ~baseShardedWQ() {}
+ virtual void _process(uint32_t thread_index, heartbeat_handle_d *hb) = 0;
+ virtual void stop_threads_on_queue() = 0;
+ virtual void pause_threads_on_queue() = 0;
+ virtual void pause_new_threads_on_queue() = 0;
+ virtual void unpause_threads_on_queue() = 0;
+ virtual void drain_threads_on_queue() = 0;
+
+ };
+
+ template <typename T>
+ class ShardedWQ: public baseShardedWQ {
+
+ public:
+ ShardedWQ(time_t ti, time_t sti, ShardedThreadPool* tp):baseShardedWQ(ti, sti) {
+ tp->set_wq(this);
+
+ }
+
+ virtual void _enqueue(T) = 0;
+ virtual void _enqueue_front(T) = 0;
+
+ void queue(T item) {
+ _enqueue(item);
+ }
+ void queue_front(T item) {
+ _enqueue_front(item);
+ }
+ };
+
+private:
+
+ baseShardedWQ* wq;
+ // threads
+ struct WorkThreadSharded : public Thread {
+ ShardedThreadPool *pool;
+ uint32_t thread_index;
+ WorkThreadSharded(ShardedThreadPool *p, uint32_t pthread_index): pool(p),thread_index(pthread_index) {}
+ void *entry() {
+ pool->shardedthreadpool_worker(thread_index);
+ return 0;
+ }
+ };
+
+ vector<WorkThreadSharded*> threads_shardedpool;
+
+public:
+
+ ShardedThreadPool(CephContext *cct_, string nm, uint32_t pnum_threads);
+
+ ~ShardedThreadPool(){};
+
+ void set_wq(baseShardedWQ* swq) {
+ wq = swq;
+ }
+
+ /// start thread pool thread
+ void start();
+ /// stop thread pool thread
+ void stop();
+ /// pause thread pool (if it not already paused)
+ void pause();
+ /// pause initiation of new work
+ void pause_new();
+ /// resume work in thread pool. must match each pause() call 1:1 to resume.
+ void unpause();
+ /// wait for all work to complete
+ void drain();
+
+ void start_threads();
+ void shardedthreadpool_worker(uint32_t thread_index);
+
+
+
+};
+
+
#endif