_lock.Unlock();
}
-ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm, uint32_t pnum_threads):
- cct(pcct_), name(nm), lockname(nm + "::lock"), shardedpool_lock(lockname.c_str()), num_threads(pnum_threads),
- stop_threads(0), pause_threads(0),drain_threads(0), num_paused(0), num_drained(0), wq(NULL) {}
+ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm,
+ uint32_t pnum_threads): cct(pcct_),name(nm),lockname(nm + "::lock"),
+ shardedpool_lock(lockname.c_str()),num_threads(pnum_threads),stop_threads(0),
+ pause_threads(0),drain_threads(0), num_paused(0), num_drained(0), wq(NULL) {}
void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
{
- assert (wq != NULL);
+ assert(wq != NULL);
ldout(cct,10) << "worker start" << dendl;
std::stringstream ss;
ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
threads_shardedpool.push_back(wt);
wt->create();
- thread_index ++;
+ thread_index++;
}
}
{
ldout(cct,10) << "stop" << dendl;
stop_threads.set(1);
- assert (wq != NULL);
+ assert(wq != NULL);
wq->return_waiting_threads();
for (vector<WorkThreadSharded*>::iterator p = threads_shardedpool.begin();
p != threads_shardedpool.end();
ldout(cct,10) << "pause" << dendl;
shardedpool_lock.Lock();
pause_threads.set(1);
- assert (wq != NULL);
+ assert(wq != NULL);
wq->return_waiting_threads();
while (num_threads != num_paused){
wait_cond.Wait(shardedpool_lock);
ldout(cct,10) << "pause_new" << dendl;
shardedpool_lock.Lock();
pause_threads.set(1);
- assert (wq != NULL);
+ assert(wq != NULL);
wq->return_waiting_threads();
shardedpool_lock.Unlock();
ldout(cct,10) << "paused_new" << dendl;
ldout(cct,10) << "drain" << dendl;
shardedpool_lock.Lock();
drain_threads.set(1);
- assert (wq != NULL);
+ assert(wq != NULL);
wq->return_waiting_threads();
while (num_threads != num_drained) {
wait_cond.Wait(shardedpool_lock);
public:
- class baseShardedWQ {
+ class BaseShardedWQ {
public:
time_t timeout_interval, suicide_interval;
- baseShardedWQ(time_t ti, time_t sti):timeout_interval(ti), suicide_interval(sti) {}
- virtual ~baseShardedWQ() {}
+ BaseShardedWQ(time_t ti, time_t sti):timeout_interval(ti), suicide_interval(sti) {}
+ virtual ~BaseShardedWQ() {}
virtual void _process(uint32_t thread_index, heartbeat_handle_d *hb ) = 0;
virtual void return_waiting_threads() = 0;
};
template <typename T>
- class ShardedWQ: public baseShardedWQ {
+ class ShardedWQ: public BaseShardedWQ {
ShardedThreadPool* sharded_pool;
public:
- ShardedWQ(time_t ti, time_t sti, ShardedThreadPool* tp): baseShardedWQ(ti, sti),
+ ShardedWQ(time_t ti, time_t sti, ShardedThreadPool* tp): BaseShardedWQ(ti, sti),
sharded_pool(tp) {
tp->set_wq(this);
}
private:
- baseShardedWQ* wq;
+ BaseShardedWQ* wq;
// threads
struct WorkThreadSharded : public Thread {
ShardedThreadPool *pool;
uint32_t thread_index;
- WorkThreadSharded(ShardedThreadPool *p, uint32_t pthread_index): pool(p),thread_index(pthread_index) {}
+ WorkThreadSharded(ShardedThreadPool *p, uint32_t pthread_index): pool(p),
+ thread_index(pthread_index) {}
void *entry() {
pool->shardedthreadpool_worker(thread_index);
return 0;
vector<WorkThreadSharded*> threads_shardedpool;
void start_threads();
void shardedthreadpool_worker(uint32_t thread_index);
- void set_wq(baseShardedWQ* swq) {
+ void set_wq(BaseShardedWQ* swq) {
wq = swq;
}
finished_lock("OSD::finished_lock"),
op_tracker(cct, cct->_conf->osd_enable_op_tracker),
test_ops_hook(NULL),
- op_shardedwq(cct->_conf->osd_op_num_shards, this, cct->_conf->osd_op_thread_timeout, &op_sharded_tp),
+ op_shardedwq(cct->_conf->osd_op_num_shards, this,
+ cct->_conf->osd_op_thread_timeout, &op_sharded_tp),
peering_wq(this, cct->_conf->osd_op_thread_timeout, &op_tp),
map_lock("OSD::map_lock"),
pg_map_lock("OSD::pg_map_lock"),
pair<PGRef, OpRequestRef> item = sdata->pqueue.dequeue();
sdata->pg_for_processing[&*(item.first)].push_back(item.second);
sdata->sdata_op_ordering_lock.Unlock();
- ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval, suicide_interval);
+ ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval,
+ suicide_interval);
(item.first)->lock_suspend_timeout(tp_handle);
}
-void OSD::ShardedOpWQ::_enqueue(pair <PGRef, OpRequestRef> item) {
+void OSD::ShardedOpWQ::_enqueue(pair<PGRef, OpRequestRef> item) {
uint32_t shard_index = (((item.first)->get_pgid().ps())% shard_list.size());
}
-void OSD::ShardedOpWQ::_enqueue_front(pair <PGRef, OpRequestRef> item) {
+void OSD::ShardedOpWQ::_enqueue_front(pair<PGRef, OpRequestRef> item) {
uint32_t shard_index = (((item.first)->get_pgid().ps())% shard_list.size());
snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD:ShardedOpWQ:", i);
char order_lock[32] = {0};
snprintf(order_lock, sizeof(order_lock), "%s.%d", "OSD:ShardedOpWQ:order:", i);
- ShardData* one_shard = new ShardData(lock_name, order_lock, osd->cct->_conf->osd_op_pq_max_tokens_per_priority,
- osd->cct->_conf->osd_op_pq_min_cost);
+ ShardData* one_shard = new ShardData(lock_name, order_lock,
+ osd->cct->_conf->osd_op_pq_max_tokens_per_priority,
+ osd->cct->_conf->osd_op_pq_min_cost);
shard_list.push_back(one_shard);
}
}