From: Somnath Roy
Date: Tue, 10 Jun 2014 22:24:47 +0000 (-0700)
Subject: ShardedTP: Changes related to conforming to Ceph coding guidelines
X-Git-Tag: v0.83~110^2~3
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a0e48b8b6ea42f455e761dcae95f672e96a060c4;p=ceph.git

ShardedTP: Changes related to conforming to Ceph coding guidelines

Signed-off-by: Somnath Roy
---

diff --git a/src/common/WorkQueue.cc b/src/common/WorkQueue.cc
index 11cb526dc4ed..b31a246799ee 100644
--- a/src/common/WorkQueue.cc
+++ b/src/common/WorkQueue.cc
@@ -255,13 +255,14 @@ void ThreadPool::drain(WorkQueue_* wq)
   _lock.Unlock();
 }
 
-ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm, uint32_t pnum_threads):
-  cct(pcct_), name(nm), lockname(nm + "::lock"), shardedpool_lock(lockname.c_str()), num_threads(pnum_threads),
-  stop_threads(0), pause_threads(0),drain_threads(0), num_paused(0), num_drained(0), wq(NULL) {}
+ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm,
+  uint32_t pnum_threads): cct(pcct_),name(nm),lockname(nm + "::lock"),
+  shardedpool_lock(lockname.c_str()),num_threads(pnum_threads),stop_threads(0),
+  pause_threads(0),drain_threads(0), num_paused(0), num_drained(0), wq(NULL) {}
 
 void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
 {
-  assert (wq != NULL);
+  assert(wq != NULL);
   ldout(cct,10) << "worker start" << dendl;
 
   std::stringstream ss;
@@ -314,7 +315,7 @@ void ShardedThreadPool::start_threads()
     ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
     threads_shardedpool.push_back(wt);
     wt->create();
-    thread_index ++;
+    thread_index++;
   }
 }
 
@@ -332,7 +333,7 @@ void ShardedThreadPool::stop()
 {
   ldout(cct,10) << "stop" << dendl;
   stop_threads.set(1);
-  assert (wq != NULL);
+  assert(wq != NULL);
   wq->return_waiting_threads();
   for (vector<WorkThreadSharded*>::iterator p = threads_shardedpool.begin();
        p != threads_shardedpool.end();
@@ -349,7 +350,7 @@ void ShardedThreadPool::pause()
   ldout(cct,10) << "pause" << dendl;
   shardedpool_lock.Lock();
   pause_threads.set(1);
-  assert (wq != NULL);
+  assert(wq != NULL);
   wq->return_waiting_threads();
   while (num_threads != num_paused){
     wait_cond.Wait(shardedpool_lock);
@@ -363,7 +364,7 @@ void ShardedThreadPool::pause_new()
   ldout(cct,10) << "pause_new" << dendl;
   shardedpool_lock.Lock();
   pause_threads.set(1);
-  assert (wq != NULL);
+  assert(wq != NULL);
   wq->return_waiting_threads();
   shardedpool_lock.Unlock();
   ldout(cct,10) << "paused_new" << dendl;
@@ -384,7 +385,7 @@ void ShardedThreadPool::drain()
   ldout(cct,10) << "drain" << dendl;
   shardedpool_lock.Lock();
   drain_threads.set(1);
-  assert (wq != NULL);
+  assert(wq != NULL);
   wq->return_waiting_threads();
   while (num_threads != num_drained) {
     wait_cond.Wait(shardedpool_lock);
diff --git a/src/common/WorkQueue.h b/src/common/WorkQueue.h
index 8288c1890fc3..d7ae66032d47 100644
--- a/src/common/WorkQueue.h
+++ b/src/common/WorkQueue.h
@@ -446,12 +446,12 @@ class ShardedThreadPool {
 
 public:
 
-  class baseShardedWQ {
+  class BaseShardedWQ {
 
   public:
     time_t timeout_interval, suicide_interval;
-    baseShardedWQ(time_t ti, time_t sti):timeout_interval(ti), suicide_interval(sti) {}
-    virtual ~baseShardedWQ() {}
+    BaseShardedWQ(time_t ti, time_t sti):timeout_interval(ti), suicide_interval(sti) {}
+    virtual ~BaseShardedWQ() {}
     virtual void _process(uint32_t thread_index, heartbeat_handle_d *hb ) = 0;
     virtual void return_waiting_threads() = 0;
 
@@ -459,7 +459,7 @@ public:
   };
 
   template <typename T>
-  class ShardedWQ: public baseShardedWQ {
+  class ShardedWQ: public BaseShardedWQ {
 
     ShardedThreadPool* sharded_pool;
 
@@ -469,7 +469,7 @@ public:
 
   public:
 
-    ShardedWQ(time_t ti, time_t sti, ShardedThreadPool* tp): baseShardedWQ(ti, sti),
+    ShardedWQ(time_t ti, time_t sti, ShardedThreadPool* tp): BaseShardedWQ(ti, sti),
       sharded_pool(tp) {
       tp->set_wq(this);
     }
@@ -489,12 +489,13 @@ public:
 
 private:
 
-  baseShardedWQ* wq;
+  BaseShardedWQ* wq;
   // threads
   struct WorkThreadSharded : public Thread {
     ShardedThreadPool *pool;
     uint32_t thread_index;
-    WorkThreadSharded(ShardedThreadPool *p, uint32_t pthread_index): pool(p),thread_index(pthread_index) {}
+    WorkThreadSharded(ShardedThreadPool *p, uint32_t pthread_index): pool(p),
+      thread_index(pthread_index) {}
     void *entry() {
       pool->shardedthreadpool_worker(thread_index);
       return 0;
@@ -504,7 +505,7 @@ private:
   vector<WorkThreadSharded*> threads_shardedpool;
   void start_threads();
   void shardedthreadpool_worker(uint32_t thread_index);
-  void set_wq(baseShardedWQ* swq) {
+  void set_wq(BaseShardedWQ* swq) {
     wq = swq;
   }
 
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 6eb2df44932d..185b66716a90 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -941,7 +941,8 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   finished_lock("OSD::finished_lock"),
   op_tracker(cct, cct->_conf->osd_enable_op_tracker),
   test_ops_hook(NULL),
-  op_shardedwq(cct->_conf->osd_op_num_shards, this, cct->_conf->osd_op_thread_timeout, &op_sharded_tp),
+  op_shardedwq(cct->_conf->osd_op_num_shards, this,
+    cct->_conf->osd_op_thread_timeout, &op_sharded_tp),
   peering_wq(this, cct->_conf->osd_op_thread_timeout, &op_tp),
   map_lock("OSD::map_lock"),
   pg_map_lock("OSD::pg_map_lock"),
@@ -7984,7 +7985,8 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb )
   pair<PGRef, OpRequestRef> item = sdata->pqueue.dequeue();
   sdata->pg_for_processing[&*(item.first)].push_back(item.second);
   sdata->sdata_op_ordering_lock.Unlock();
-  ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval, suicide_interval);
+  ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval,
+    suicide_interval);
 
   (item.first)->lock_suspend_timeout(tp_handle);
 
@@ -8016,7 +8018,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb )
 
 }
 
-void OSD::ShardedOpWQ::_enqueue(pair <PGRef, OpRequestRef> item) {
+void OSD::ShardedOpWQ::_enqueue(pair<PGRef, OpRequestRef> item) {
 
   uint32_t shard_index = (((item.first)->get_pgid().ps())% shard_list.size());
 
@@ -8040,7 +8042,7 @@ void OSD::ShardedOpWQ::_enqueue(pair <PGRef, OpRequestRef> item) {
 
 }
 
-void OSD::ShardedOpWQ::_enqueue_front(pair <PGRef, OpRequestRef> item) {
+void OSD::ShardedOpWQ::_enqueue_front(pair<PGRef, OpRequestRef> item) {
 
   uint32_t shard_index = (((item.first)->get_pgid().ps())% shard_list.size());
 
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 08922bd0363e..6ac829f2c3fa 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -1322,8 +1322,9 @@ private:
       snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD:ShardedOpWQ:", i);
       char order_lock[32] = {0};
       snprintf(order_lock, sizeof(order_lock), "%s.%d", "OSD:ShardedOpWQ:order:", i);
-      ShardData* one_shard = new ShardData(lock_name, order_lock, osd->cct->_conf->osd_op_pq_max_tokens_per_priority,
-        osd->cct->_conf->osd_op_pq_min_cost);
+      ShardData* one_shard = new ShardData(lock_name, order_lock,
+        osd->cct->_conf->osd_op_pq_max_tokens_per_priority,
+        osd->cct->_conf->osd_op_pq_min_cost);
       shard_list.push_back(one_shard);
     }
   }
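
Note for readers following the series: the sharding rule that _enqueue() and
_enqueue_front() rely on above is simply placement-group seed modulo shard
count, so every op for a given PG always lands on the same shard (and stays
ordered there) while unrelated PGs stop contending on a single queue lock.
Below is a minimal standalone sketch of that rule; "Item" and "num_shards"
are made-up stand-ins, not the patch's real types (pair<PGRef, OpRequestRef>
and osd_op_num_shards).

  // Illustrative sketch only -- not part of this patch.
  #include <cstdint>
  #include <iostream>
  #include <vector>

  struct Item {
    uint32_t pg_seed;   // stands in for (item.first)->get_pgid().ps()
  };

  int main() {
    const uint32_t num_shards = 5;            // cf. osd_op_num_shards
    std::vector<std::vector<Item> > shards(num_shards);

    for (uint32_t seed = 0; seed < 12; ++seed) {
      Item item = { seed };
      // Same rule as _enqueue(): shard_index = ps() % shard_list.size().
      // Equal seeds always map to the same shard, preserving per-PG order.
      uint32_t shard_index = item.pg_seed % num_shards;
      shards[shard_index].push_back(item);
    }

    for (uint32_t i = 0; i < num_shards; ++i)
      std::cout << "shard " << i << ": " << shards[i].size() << " items\n";
    return 0;
  }

Because the mapping is deterministic, no cross-shard synchronization is
needed to keep a PG's ops in order; each shard serializes its own PGs.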