ldout(cct,10) << "unpause" << dendl;
shardedpool_lock.Lock();
pause_threads = false;
+ wq->stop_return_waiting_threads();
shardedpool_cond.Signal();
shardedpool_lock.Unlock();
ldout(cct,10) << "unpaused" << dendl;
wait_cond.Wait(shardedpool_lock);
}
drain_threads = false;
+ wq->stop_return_waiting_threads();
shardedpool_cond.Signal();
shardedpool_lock.Unlock();
ldout(cct,10) << "drained" << dendl;
virtual void _process(uint32_t thread_index, heartbeat_handle_d *hb ) = 0;
virtual void return_waiting_threads() = 0;
+ virtual void stop_return_waiting_threads() = 0;
virtual bool is_shard_empty(uint32_t thread_index) = 0;
};
// peek at spg_t
sdata->sdata_op_ordering_lock.Lock();
if (sdata->pqueue->empty()) {
- dout(20) << __func__ << " empty q, waiting" << dendl;
- osd->cct->get_heartbeat_map()->clear_timeout(hb);
sdata->sdata_lock.Lock();
- sdata->sdata_op_ordering_lock.Unlock();
- sdata->sdata_cond.Wait(sdata->sdata_lock);
- sdata->sdata_lock.Unlock();
- sdata->sdata_op_ordering_lock.Lock();
- if (sdata->pqueue->empty()) {
+ if (!sdata->stop_waiting) {
+ dout(20) << __func__ << " empty q, waiting" << dendl;
+ osd->cct->get_heartbeat_map()->clear_timeout(hb);
+ sdata->sdata_op_ordering_lock.Unlock();
+ sdata->sdata_cond.Wait(sdata->sdata_lock);
+ sdata->sdata_lock.Unlock();
+ sdata->sdata_op_ordering_lock.Lock();
+ if (sdata->pqueue->empty()) {
+ sdata->sdata_op_ordering_lock.Unlock();
+ return;
+ }
+ osd->cct->get_heartbeat_map()->reset_timeout(hb,
+ osd->cct->_conf->threadpool_default_timeout, 0);
+ } else {
+ dout(0) << __func__ << " need return immediately" << dendl;
+ sdata->sdata_lock.Unlock();
sdata->sdata_op_ordering_lock.Unlock();
return;
}
- osd->cct->get_heartbeat_map()->reset_timeout(hb,
- osd->cct->_conf->threadpool_default_timeout, 0);
}
OpQueueItem item = sdata->pqueue->dequeue();
if (osd->is_stopping()) {
/// priority queue
std::unique_ptr<OpQueue<OpQueueItem, uint64_t>> pqueue;
+ bool stop_waiting = false;
+
void _enqueue_front(OpQueueItem&& item, unsigned cutoff) {
unsigned priority = item.get_priority();
unsigned cost = item.get_cost();
ShardData* sdata = shard_list[i];
assert (NULL != sdata);
sdata->sdata_lock.Lock();
+ sdata->stop_waiting = true;
sdata->sdata_cond.Signal();
sdata->sdata_lock.Unlock();
}
}
+ void stop_return_waiting_threads() override {
+ for(uint32_t i = 0; i < num_shards; i++) {
+ ShardData* sdata = shard_list[i];
+ assert (NULL != sdata);
+ sdata->sdata_lock.Lock();
+ sdata->stop_waiting = false;
+ sdata->sdata_lock.Unlock();
+ }
+ }
+
void dump(Formatter *f) {
for(uint32_t i = 0; i < num_shards; i++) {
auto &&sdata = shard_list[i];