}
ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm, uint32_t pnum_threads):
- cct(pcct_), name(nm), lockname(nm + "::lock"), shardedpool_lock(lockname.c_str()), num_threads(pnum_threads), wq(NULL) {}
+ cct(pcct_), name(nm), lockname(nm + "::lock"), shardedpool_lock(lockname.c_str()), num_threads(pnum_threads),
+ stop_threads(0), pause_threads(0),drain_threads(0), in_process(0), wq(NULL) {}
void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
{
ss << name << " thread " << (void*)pthread_self();
heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str());
- wq->_process(thread_index, hb);
+ while (!stop_threads.read()) {
+ if(pause_threads.read()) {
+ shardedpool_lock.Lock();
+ while(pause_threads.read()) {
+ cct->get_heartbeat_map()->reset_timeout(hb, 4, 0);
+ shardedpol_cond.WaitInterval(cct, shardedpool_lock, utime_t(2, 0));
+ }
+ shardedpool_lock.Unlock();
+ }
+
+ wq->_process(thread_index, hb, in_process);
+
+ if ((pause_threads.read() != 0) || (drain_threads.read() != 0)) {
+ shardedpool_lock.Lock();
+ wait_cond.Signal();
+ shardedpool_lock.Unlock();
+ }
+
+ }
ldout(cct,10) << "sharded worker finish" << dendl;
void ShardedThreadPool::stop()
{
ldout(cct,10) << "stop" << dendl;
+ stop_threads.set(1);
assert (wq != NULL);
- wq->stop_threads_on_queue();
-
+ wq->return_waiting_threads();
for (vector<WorkThreadSharded*>::iterator p = threads_shardedpool.begin();
p != threads_shardedpool.end();
++p) {
void ShardedThreadPool::pause()
{
ldout(cct,10) << "pause" << dendl;
+ shardedpool_lock.Lock();
+ pause_threads.set(1);
assert (wq != NULL);
- wq->pause_threads_on_queue();
+ wq->return_waiting_threads();
+ while (in_process.read()){
+ wait_cond.Wait(shardedpool_lock);
+ }
+ shardedpool_lock.Unlock();
ldout(cct,10) << "paused" << dendl;
}
void ShardedThreadPool::pause_new()
{
ldout(cct,10) << "pause_new" << dendl;
+ shardedpool_lock.Lock();
+ pause_threads.set(1);
assert (wq != NULL);
- wq->pause_new_threads_on_queue();
+ wq->return_waiting_threads();
+ shardedpool_lock.Unlock();
ldout(cct,10) << "paused_new" << dendl;
}
void ShardedThreadPool::unpause()
{
ldout(cct,10) << "unpause" << dendl;
- assert (wq != NULL);
- wq->unpause_threads_on_queue();
+ shardedpool_lock.Lock();
+ pause_threads.set(0);
+ shardedpol_cond.Signal();
+ shardedpool_lock.Unlock();
ldout(cct,10) << "unpaused" << dendl;
}
void ShardedThreadPool::drain()
{
ldout(cct,10) << "drain" << dendl;
+ shardedpool_lock.Lock();
+ drain_threads.set(1);
assert (wq != NULL);
- wq->drain_threads_on_queue();
+ wq->return_waiting_threads();
+ while (in_process.read() || !wq->is_all_shard_empty()){
+ wait_cond.Wait(shardedpool_lock);
+ }
+ drain_threads.set(0);
+ shardedpool_lock.Unlock();
ldout(cct,10) << "drained" << dendl;
}
string lockname;
Mutex shardedpool_lock;
Cond shardedpol_cond;
+ Cond wait_cond;
uint32_t num_threads;
+ atomic_t stop_threads;
+ atomic_t pause_threads;
+ atomic_t drain_threads;
+ atomic_t in_process;
public:
class baseShardedWQ {
-
+
public:
time_t timeout_interval, suicide_interval;
-
- protected:
- atomic_t stop_threads;
- atomic_t pause_threads;
- atomic_t drain_threads;
- atomic_t in_process;
-
-
- public:
-
- baseShardedWQ(time_t ti, time_t sti):timeout_interval(ti), suicide_interval(sti)
- ,stop_threads(0), pause_threads(0)
- ,drain_threads(0), in_process(0) {}
+ baseShardedWQ(time_t ti, time_t sti):timeout_interval(ti), suicide_interval(sti) {}
virtual ~baseShardedWQ() {}
- virtual void _process(uint32_t thread_index, heartbeat_handle_d *hb) = 0;
- virtual void stop_threads_on_queue() = 0;
- virtual void pause_threads_on_queue() = 0;
- virtual void pause_new_threads_on_queue() = 0;
- virtual void unpause_threads_on_queue() = 0;
- virtual void drain_threads_on_queue() = 0;
- };
+ virtual void _process(uint32_t thread_index, heartbeat_handle_d *hb, atomic_t& in_process ) = 0;
+ virtual void return_waiting_threads() = 0;
+ virtual bool is_all_shard_empty() = 0;
+ };
template <typename T>
class ShardedWQ: public baseShardedWQ {
+
+ ShardedThreadPool* sharded_pool;
+
+ protected:
+ virtual void _enqueue(T) = 0;
+ virtual void _enqueue_front(T) = 0;
+
public:
- ShardedWQ(time_t ti, time_t sti, ShardedThreadPool* tp):baseShardedWQ(ti, sti) {
+ ShardedWQ(time_t ti, time_t sti, ShardedThreadPool* tp): baseShardedWQ(ti, sti),
+ sharded_pool(tp) {
tp->set_wq(this);
-
}
-
- virtual void _enqueue(T) = 0;
- virtual void _enqueue_front(T) = 0;
+ virtual ~ShardedWQ() {}
void queue(T item) {
_enqueue(item);
void queue_front(T item) {
_enqueue_front(item);
}
+ void drain() {
+ sharded_pool->drain();
+ }
+
};
private:
};
vector<WorkThreadSharded*> threads_shardedpool;
+ void start_threads();
+ void shardedthreadpool_worker(uint32_t thread_index);
+ void set_wq(baseShardedWQ* swq) {
+ wq = swq;
+ }
+
+
public:
~ShardedThreadPool(){};
- void set_wq(baseShardedWQ* swq) {
- wq = swq;
- }
-
/// start thread pool thread
void start();
/// stop thread pool thread
/// wait for all work to complete
void drain();
- void start_threads();
- void shardedthreadpool_worker(uint32_t thread_index);
-
-
-
};