/// Construct a sharded thread pool.
///
/// @param pcct_        Ceph context used for logging and heartbeats.
/// @param nm           Pool name; also used to derive the lock name.
/// @param pnum_threads Number of worker threads (one shard mapping per thread).
///
/// num_paused / num_drained count workers currently parked in the pause/drain
/// rendezvous loops; they are only read or written under shardedpool_lock.
/// (Resolved from unmerged diff markers: the post-patch initializer list adds
/// the two counters.)
ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm, uint32_t pnum_threads):
  cct(pcct_), name(nm), lockname(nm + "::lock"), shardedpool_lock(lockname.c_str()), num_threads(pnum_threads),
  stop_threads(0), pause_threads(0), drain_threads(0), in_process(0), num_paused(0), num_drained(0), wq(NULL) {}
void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
{
while (!stop_threads.read()) {
if(pause_threads.read()) {
shardedpool_lock.Lock();
+ ++num_paused;
+ wait_cond.Signal();
while(pause_threads.read()) {
cct->get_heartbeat_map()->reset_timeout(hb, 4, 0);
shardedpol_cond.WaitInterval(cct, shardedpool_lock, utime_t(2, 0));
}
+ --num_paused;
shardedpool_lock.Unlock();
}
-
- wq->_process(thread_index, hb, in_process);
-
- if ((pause_threads.read() != 0) || (drain_threads.read() != 0)) {
+ if (drain_threads.read()) {
shardedpool_lock.Lock();
- wait_cond.Signal();
+ if (wq->is_shard_empty(thread_index)) {
+ ++num_drained;
+ wait_cond.Signal();
+ while (drain_threads.read()) {
+ cct->get_heartbeat_map()->reset_timeout(hb, 4, 0);
+ shardedpol_cond.WaitInterval(cct, shardedpool_lock, utime_t(2, 0));
+ }
+ --num_drained;
+ }
shardedpool_lock.Unlock();
}
+ wq->_process(thread_index, hb);
+
}
ldout(cct,10) << "sharded worker finish" << dendl;
// NOTE(review): hunk fragment — the enclosing function header (this looks like
// the body of ShardedThreadPool::pause(), including the lock acquisition) is
// elided from this chunk.
pause_threads.set(1);
assert (wq != NULL);
wq->return_waiting_threads();
// Old code spun on the in_process op counter; the new code waits until every
// worker has checked in as paused (num_paused is incremented by the workers
// under shardedpool_lock, which the waiter presumably holds here — confirm).
- while (in_process.read()){
+ while (num_threads != num_paused){
wait_cond.Wait(shardedpool_lock);
}
shardedpool_lock.Unlock();
// NOTE(review): hunk fragment — this looks like the body of
// ShardedThreadPool::drain(); its header and lock acquisition are elided.
drain_threads.set(1);
assert (wq != NULL);
wq->return_waiting_threads();
// Old code polled in_process plus an all-shards emptiness scan; new code waits
// until every worker has parked itself as drained (each worker only does so
// once its own shard is empty — see the worker loop).
- while (in_process.read() || !wq->is_all_shard_empty()){
+ while (num_threads != num_drained) {
wait_cond.Wait(shardedpool_lock);
}
drain_threads.set(0);
atomic_t pause_threads;   // nonzero => workers park in the pause rendezvous
atomic_t drain_threads;   // nonzero => workers park once their shard is empty
atomic_t in_process;      // NOTE(review): no longer maintained by _process(); looks dead — confirm before removing
uint32_t num_paused;      // workers currently parked as paused (guarded by shardedpool_lock)
uint32_t num_drained;     // workers currently parked as drained (guarded by shardedpool_lock)
public:
baseShardedWQ(time_t ti, time_t sti):timeout_interval(ti), suicide_interval(sti) {}
virtual ~baseShardedWQ() {}
- virtual void _process(uint32_t thread_index, heartbeat_handle_d *hb, atomic_t& in_process ) = 0;
+ virtual void _process(uint32_t thread_index, heartbeat_handle_d *hb ) = 0;
virtual void return_waiting_threads() = 0;
- virtual bool is_all_shard_empty() = 0;
+ virtual bool is_shard_empty(uint32_t thread_index) = 0;
};
template <typename T>
pg->queue_op(op);
}
// NOTE(review): hunk fragment — the middle of this function (shard lookup,
// empty-queue wait/return path between L76 and L81) is elided from this chunk,
// so only the signature change and the in_process bookkeeping removal are
// visible here.
-void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb, atomic_t& in_process ) {
+void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb ) {
uint32_t shard_index = thread_index % num_shards;
return;
}
}
-
pair<PGRef, OpRequestRef> item = sdata->pqueue.dequeue();
sdata->pg_for_processing[&*(item.first)].push_back(item.second);
sdata->sdata_op_ordering_lock.Unlock();
// The per-op in_process counter is gone: drain no longer watches it, it waits
// for the per-worker drained rendezvous instead.
- in_process.inc();
ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval, suicide_interval);
(item.first)->lock_suspend_timeout(tp_handle);
osd->dequeue_op(item.first, op, tp_handle);
(item.first)->unlock();
- in_process.dec();
}
}
}
- void _process(uint32_t thread_index, heartbeat_handle_d *hb, atomic_t& in_process );
+ void _process(uint32_t thread_index, heartbeat_handle_d *hb);
void _enqueue(pair <PGRef, OpRequestRef> item);
void _enqueue_front(pair <PGRef, OpRequestRef> item);
}
}
-
- bool is_all_shard_empty() {
- bool is_empty = true;
- for(uint32_t i = 0; i < num_shards; i++) {
- ShardData* sdata = shard_list[i];
- assert(NULL != sdata);
- sdata->sdata_op_ordering_lock.Lock();
- if (!sdata->pqueue.empty()) {
- is_empty = false;
- sdata->sdata_op_ordering_lock.Unlock();
- break;
- }
- sdata->sdata_op_ordering_lock.Unlock();
- }
- return is_empty;
-
- }
- bool is_shard_empty(uint32_t shard_index) {
-
+ bool is_shard_empty(uint32_t thread_index) {
+ uint32_t shard_index = thread_index % num_shards;
ShardData* sdata = shard_list[shard_index];
assert(NULL != sdata);
Mutex::Locker l(sdata->sdata_op_ordering_lock);