From: Somnath Roy
Date: Tue, 20 May 2014 23:20:54 +0000 (-0700)
Subject: OSD: Derived sharded queue implementation is changed
X-Git-Tag: v0.83~110^2~7
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b15bf6bbcb5872d3bce0573b924064fac2001173;p=ceph.git

OSD: Derived sharded queue implementation is changed

The threadpool-related operations (stop/pause/drain, etc.) are no longer
handled by the sharded queue. It now implements only op processing,
enqueueing, signaling of waiting threads, and reporting of shard queue
status. PG ordering is taken care of by introducing a map in each shard.

Signed-off-by: Somnath Roy
---

diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 978f302408b..aa947529164 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -966,6 +966,7 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   next_removal_seq(0),
   service(this)
 {
+  assert(cct->_conf->osd_op_num_sharded_pool_threads >= cct->_conf->osd_op_num_shards);
   monc->set_messenger(client_messenger);
   op_tracker.set_complaint_and_threshold(cct->_conf->osd_op_complaint_time,
     cct->_conf->osd_op_log_threshold);
@@ -7961,6 +7962,116 @@ void OSD::enqueue_op(PG *pg, OpRequestRef op)
   pg->queue_op(op);
 }
 
+void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb, atomic_t& in_process ) {
+
+  uint32_t shard_index = thread_index % num_shards;
+
+  ShardData* sdata = shard_list[shard_index];
+  assert(NULL != sdata);
+  sdata->sdata_op_ordering_lock.Lock();
+  if (sdata->pqueue.empty()) {
+    sdata->sdata_op_ordering_lock.Unlock();
+    osd->cct->get_heartbeat_map()->reset_timeout(hb, 4, 0);
+    sdata->sdata_lock.Lock();
+    sdata->sdata_cond.WaitInterval(osd->cct, sdata->sdata_lock, utime_t(2, 0));
+    sdata->sdata_lock.Unlock();
+    sdata->sdata_op_ordering_lock.Lock();
+    if(sdata->pqueue.empty()) {
+      sdata->sdata_op_ordering_lock.Unlock();
+      return;
+    }
+  }
+
+  pair<PGRef, OpRequestRef> item = sdata->pqueue.dequeue();
+  sdata->pg_for_processing[&*(item.first)].push_back(item.second);
+  sdata->sdata_op_ordering_lock.Unlock();
+  in_process.inc();
+  ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval, suicide_interval);
+
+  (item.first)->lock_suspend_timeout(tp_handle);
+
+  OpRequestRef op;
+  {
+    Mutex::Locker l(sdata->sdata_op_ordering_lock);
+    if (!sdata->pg_for_processing.count(&*(item.first))) {
+      (item.first)->unlock();
+      return;
+    }
+    assert(sdata->pg_for_processing[&*(item.first)].size());
+    op = sdata->pg_for_processing[&*(item.first)].front();
+    sdata->pg_for_processing[&*(item.first)].pop_front();
+    if (!(sdata->pg_for_processing[&*(item.first)].size()))
+      sdata->pg_for_processing.erase(&*(item.first));
+  }
+
+  lgeneric_subdout(osd->cct, osd, 30) << "dequeue status: ";
+  Formatter *f = new_formatter("json");
+  f->open_object_section("q");
+  dump(f);
+  f->close_section();
+  f->flush(*_dout);
+  delete f;
+  *_dout << dendl;
+
+  osd->dequeue_op(item.first, op, tp_handle);
+  (item.first)->unlock();
+  in_process.dec();
+
+}
+
+void OSD::ShardedOpWQ::_enqueue(pair<PGRef, OpRequestRef> item) {
+
+  uint32_t shard_index = (((item.first)->get_pgid().ps())% shard_list.size());
+
+  ShardData* sdata = shard_list[shard_index];
+  assert (NULL != sdata);
+  unsigned priority = item.second->get_req()->get_priority();
+  unsigned cost = item.second->get_req()->get_cost();
+  sdata->sdata_op_ordering_lock.Lock();
+
+  if (priority >= CEPH_MSG_PRIO_LOW)
+    sdata->pqueue.enqueue_strict(
+      item.second->get_req()->get_source_inst(), priority, item);
+  else
+    sdata->pqueue.enqueue(item.second->get_req()->get_source_inst(),
+      priority, cost, item);
+  sdata->sdata_op_ordering_lock.Unlock();
+
+  sdata->sdata_lock.Lock();
+  sdata->sdata_cond.SignalOne();
+  sdata->sdata_lock.Unlock();
+
+}
+
+void OSD::ShardedOpWQ::_enqueue_front(pair<PGRef, OpRequestRef> item) {
+
+  uint32_t shard_index = (((item.first)->get_pgid().ps())% shard_list.size());
+
+  ShardData* sdata = shard_list[shard_index];
+  assert (NULL != sdata);
+  sdata->sdata_op_ordering_lock.Lock();
+  if (sdata->pg_for_processing.count(&*(item.first))) {
+    sdata->pg_for_processing[&*(item.first)].push_front(item.second);
+    item.second = sdata->pg_for_processing[&*(item.first)].back();
+    sdata->pg_for_processing[&*(item.first)].pop_back();
+  }
+  unsigned priority = item.second->get_req()->get_priority();
+  unsigned cost = item.second->get_req()->get_cost();
+  if (priority >= CEPH_MSG_PRIO_LOW)
+    sdata->pqueue.enqueue_strict_front(
+      item.second->get_req()->get_source_inst(),priority, item);
+  else
+    sdata->pqueue.enqueue_front(item.second->get_req()->get_source_inst(),
+      priority, cost, item);
+
+  sdata->sdata_op_ordering_lock.Unlock();
+  sdata->sdata_lock.Lock();
+  sdata->sdata_cond.SignalOne();
+  sdata->sdata_lock.Unlock();
+
+}
+
+
 void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued)
 {
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 0e27d2161ac..08c1219c00e 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -1300,26 +1300,29 @@ private:
     struct ShardData {
       Mutex sdata_lock;
      Cond sdata_cond;
+      Mutex sdata_op_ordering_lock;
+      map<PG*, list<OpRequestRef> > pg_for_processing;
       PrioritizedQueue< pair<PGRef, OpRequestRef>, entity_inst_t> pqueue;
-      ShardData(string lock_name, uint64_t max_tok_per_prio, uint64_t min_cost):
+      ShardData(string lock_name, string ordering_lock, uint64_t max_tok_per_prio, uint64_t min_cost):
         sdata_lock(lock_name.c_str()),
+        sdata_op_ordering_lock(ordering_lock.c_str()),
         pqueue(max_tok_per_prio, min_cost) {}
     };
     vector<ShardData*> shard_list;
     OSD *osd;
     uint32_t num_shards;
-    Mutex opQ_lock;
-    Cond opQ_cond;
 
   public:
     ShardedOpWQ(uint32_t pnum_shards, OSD *o, time_t ti, ShardedThreadPool* tp):
       ShardedThreadPool::ShardedWQ < pair <PGRef, OpRequestRef> >(ti, ti*10, tp),
-      osd(o), num_shards(pnum_shards), opQ_lock("OSD::ShardedOpWQLock") {
+      osd(o), num_shards(pnum_shards) {
       for(uint32_t i = 0; i < num_shards; i++) {
         char lock_name[32] = {0};
-        snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD::ShardedOpWQ::", i);
-        ShardData* one_shard = new ShardData(lock_name, osd->cct->_conf->osd_op_pq_max_tokens_per_priority,
+        snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD:ShardedOpWQ:", i);
+        char order_lock[32] = {0};
+        snprintf(order_lock, sizeof(order_lock), "%s.%d", "OSD:ShardedOpWQ:order:", i);
+        ShardData* one_shard = new ShardData(lock_name, order_lock, osd->cct->_conf->osd_op_pq_max_tokens_per_priority,
          osd->cct->_conf->osd_op_pq_min_cost);
         shard_list.push_back(one_shard);
       }
@@ -1333,190 +1336,28 @@ private:
       }
     }
 
-    void _process(uint32_t thread_index, heartbeat_handle_d *hb ) {
-
-      uint32_t shard_index = thread_index % num_shards;
-
-      ShardData* sdata = shard_list[shard_index];
-
-      if (NULL != sdata) {
-
-        sdata->sdata_lock.Lock();
-
-        while (true) {
-
-          while(!sdata->pqueue.empty()) {
-
-            if (pause_threads.read() != 0){
-
-              break;
-
-            }
-
-            in_process.inc();
-            ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval, suicide_interval);
-            tp_handle.reset_tp_timeout();
-
-            pair<PGRef, OpRequestRef> item = sdata->pqueue.dequeue();
-
-            (item.first)->lock_suspend_timeout(tp_handle);
-            //unlocking after holding the PG lock as it should maintain the op order
-            sdata->sdata_lock.Unlock();
-            //Should it be within some config option ?
-            lgeneric_subdout(osd->cct, osd, 30) << "dequeue status: ";
-            Formatter *f = new_formatter("json");
-            f->open_object_section("q");
-            dump(f);
-            f->close_section();
-            f->flush(*_dout);
-            delete f;
-            *_dout << dendl;
-
-            osd->dequeue_op(item.first, item.second, tp_handle);
-            (item.first)->unlock();
-
-            sdata->sdata_lock.Lock();
-            in_process.dec();
-            if ((pause_threads.read() != 0) || (drain_threads.read() != 0)) {
-              opQ_lock.Lock();
-              opQ_cond.Signal();
-              opQ_lock.Unlock();
-            }
-          }
-
-          if (stop_threads.read() != 0){
-            break;
-          }
-
-          osd->cct->get_heartbeat_map()->reset_timeout(hb, 4, 0);
-          sdata->sdata_cond.WaitInterval(osd->cct, sdata->sdata_lock, utime_t(2, 0));
-
-        }
-        sdata->sdata_lock.Unlock();
-
-      } else {
-        assert(0);
-      }
-
-    }
-
-    void stop_threads_on_queue() {
-      stop_threads.set(1);
-      for(uint32_t i = 0; i < num_shards; i++) {
-        ShardData* sdata = shard_list[i];
-        if (NULL != sdata) {
-          sdata->sdata_lock.Lock();
-          sdata->sdata_cond.Signal();
-          sdata->sdata_lock.Unlock();
-        }
-      }
+    void _process(uint32_t thread_index, heartbeat_handle_d *hb, atomic_t& in_process );
+    void _enqueue(pair <PGRef, OpRequestRef> item);
+    void _enqueue_front(pair <PGRef, OpRequestRef> item);
 
-    }
-
-    void pause_threads_on_queue() {
-      pause_threads.set(1);
-      opQ_lock.Lock();
-      while (in_process.read()) {
-        opQ_cond.Wait(opQ_lock);
-      }
-      opQ_lock.Unlock();
-
-    }
-
-    void pause_new_threads_on_queue() {
-      pause_threads.set(1);
-
-    }
-
-    void unpause_threads_on_queue() {
-      pause_threads.set(0);
+    void return_waiting_threads() {
       for(uint32_t i = 0; i < num_shards; i++) {
         ShardData* sdata = shard_list[i];
-        if (NULL != sdata) {
-          sdata->sdata_lock.Lock();
-          sdata->sdata_cond.Signal();
-          sdata->sdata_lock.Unlock();
-        }
-      }
-
-    }
-
-    void drain_threads_on_queue() {
-      drain_threads.set(1);
-      opQ_lock.Lock();
-      for(uint32_t i = 0; i < num_shards; i++) {
-        if (!_empty(i)) {
-          opQ_cond.Wait(opQ_lock);
-        }
-      }
-      while (in_process.read()){
-        opQ_cond.Wait(opQ_lock);
-      }
-      opQ_lock.Unlock();
-
-      drain_threads.set(0);
-    }
-
-    void drain() {
-
-      drain_threads_on_queue();
-    }
-
-    void _enqueue(pair<PGRef, OpRequestRef> item) {
-
-      uint32_t shard_index = (((item.first)->get_pgid().ps())% shard_list.size());
-
-      ShardData* sdata = shard_list[shard_index];
-      if (NULL != sdata) {
-        unsigned priority = item.second->get_req()->get_priority();
-        unsigned cost = item.second->get_req()->get_cost();
-        sdata->sdata_lock.Lock();
-        if (priority >= CEPH_MSG_PRIO_LOW)
-          sdata->pqueue.enqueue_strict(
-            item.second->get_req()->get_source_inst(), priority, item);
-        else
-          sdata->pqueue.enqueue(item.second->get_req()->get_source_inst(),
-            priority, cost, item);
-
-
-        sdata->sdata_cond.SignalOne();
-        sdata->sdata_lock.Unlock();
-      } else {
-        assert(0);
-      }
-    }
-
-    void _enqueue_front(pair<PGRef, OpRequestRef> item) {
-
-      uint32_t shard_index = (((item.first)->get_pgid().ps())% shard_list.size());
-
-      ShardData* sdata = shard_list[shard_index];
-      if (NULL != sdata) {
-        unsigned priority = item.second->get_req()->get_priority();
-        unsigned cost = item.second->get_req()->get_cost();
+        assert (NULL != sdata);
         sdata->sdata_lock.Lock();
-        if (priority >= CEPH_MSG_PRIO_LOW)
-          sdata->pqueue.enqueue_strict_front(
-            item.second->get_req()->get_source_inst(),priority, item);
-        else
-          sdata->pqueue.enqueue_front(item.second->get_req()->get_source_inst(),
-            priority, cost, item);
-
-        sdata->sdata_cond.SignalOne();
+        sdata->sdata_cond.Signal();
         sdata->sdata_lock.Unlock();
-      } else {
-        assert(0);
       }
+
     }
 
-
     void dump(Formatter *f) {
       for(uint32_t i = 0; i < num_shards; i++) {
         ShardData* sdata = shard_list[i];
-        if (NULL != sdata) {
-          sdata->sdata_lock.Lock();
-          sdata->pqueue.dump(f);
-          sdata->sdata_lock.Unlock();
-        }
+        assert (NULL != sdata);
+        sdata->sdata_op_ordering_lock.Lock();
+        sdata->pqueue.dump(f);
+        sdata->sdata_op_ordering_lock.Unlock();
       }
     }
 
@@ -1530,44 +1371,58 @@ private:
 
     void dequeue(PG *pg, list<OpRequestRef> *dequeued = 0) {
       ShardData* sdata = NULL;
-      if (pg) {
-        uint32_t shard_index = pg->get_pgid().ps()% shard_list.size();
-        sdata = shard_list[shard_index];
-        if (!sdata) {
-          assert(0);
-        }
-      } else {
-        assert(0);
-      }
-
+      assert(pg != NULL);
+      uint32_t shard_index = pg->get_pgid().ps()% shard_list.size();
+      sdata = shard_list[shard_index];
+      assert(sdata != NULL);
       if (!dequeued) {
-        sdata->sdata_lock.Lock();
+        sdata->sdata_op_ordering_lock.Lock();
         sdata->pqueue.remove_by_filter(Pred(pg));
-        sdata->sdata_lock.Unlock();
+        sdata->pg_for_processing.erase(pg);
+        sdata->sdata_op_ordering_lock.Unlock();
       } else {
         list<pair<PGRef, OpRequestRef> > _dequeued;
-        sdata->sdata_lock.Lock();
+        sdata->sdata_op_ordering_lock.Lock();
         sdata->pqueue.remove_by_filter(Pred(pg), &_dequeued);
-        sdata->sdata_lock.Unlock();
         for (list<pair<PGRef, OpRequestRef> >::iterator i = _dequeued.begin();
           i != _dequeued.end(); ++i) {
           dequeued->push_back(i->second);
         }
+        if (sdata->pg_for_processing.count(pg)) {
+          dequeued->splice(
+            dequeued->begin(),
+            sdata->pg_for_processing[pg]);
+          sdata->pg_for_processing.erase(pg);
+        }
+        sdata->sdata_op_ordering_lock.Unlock();
       }
     }
 
-    bool _empty(uint32_t shard_index) {
-      ShardData* sdata = shard_list[shard_index];
-      if (NULL != sdata) {
-        sdata->sdata_lock.Lock();
-        bool is_empty = sdata->pqueue.empty();
-        sdata->sdata_lock.Unlock();
-        return is_empty;
+    bool is_all_shard_empty() {
+      bool is_empty = true;
+      for(uint32_t i = 0; i < num_shards; i++) {
+        ShardData* sdata = shard_list[i];
+        assert(NULL != sdata);
+        sdata->sdata_op_ordering_lock.Lock();
+        if (!sdata->pqueue.empty()) {
+          is_empty = false;
+          sdata->sdata_op_ordering_lock.Unlock();
+          break;
+        }
+        sdata->sdata_op_ordering_lock.Unlock();
      }
-      return true;
+      return is_empty;
    }
+
+    bool is_shard_empty(uint32_t shard_index) {
+
+      ShardData* sdata = shard_list[shard_index];
+      assert(NULL != sdata);
+      Mutex::Locker l(sdata->sdata_op_ordering_lock);
+      return sdata->pqueue.empty();
+    }
 
   } op_shardedwq;
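
For readers who want the queueing model outside the patch context, the sketch below is a minimal, standalone C++ illustration (not Ceph code: it uses std::mutex and std::deque in place of Ceph's Mutex/Cond and PrioritizedQueue, and the names Shard, ShardedQueue, and process_one are invented for the example) of the per-shard ordering map the commit message describes. An op is hashed to a shard by its pg id, and within a shard the pg_for_processing map keeps ops of the same PG in arrival order even though the shard lock is dropped between dequeueing an op and executing it.

// Standalone sketch, not part of the commit: simplified per-shard ordering queue.
#include <cstdint>
#include <deque>
#include <iostream>
#include <map>
#include <mutex>
#include <string>
#include <vector>

struct Op {
  uint64_t pgid;        // which placement group the op belongs to
  std::string name;
};

struct Shard {
  std::mutex ordering_lock;                              // cf. sdata_op_ordering_lock
  std::deque<Op> pqueue;                                 // stand-in for the PrioritizedQueue
  std::map<uint64_t, std::deque<Op> > pg_for_processing; // per-PG FIFO, keyed by pg id
};

class ShardedQueue {
  std::vector<Shard> shards;
public:
  explicit ShardedQueue(size_t n) : shards(n) {}

  // Hash the op to a shard by pg id, mirroring get_pgid().ps() % shard_list.size().
  void enqueue(const Op& op) {
    Shard& s = shards[op.pgid % shards.size()];
    std::lock_guard<std::mutex> l(s.ordering_lock);
    s.pqueue.push_back(op);
  }

  // One worker step for a shard: dequeue an item, park it in its PG's FIFO,
  // then take the front of that FIFO. Returns false when the shard is idle.
  bool process_one(size_t shard_index, Op* out) {
    Shard& s = shards[shard_index % shards.size()];
    uint64_t pg;
    {
      std::lock_guard<std::mutex> l(s.ordering_lock);
      if (s.pqueue.empty())
        return false;
      Op item = s.pqueue.front();
      s.pqueue.pop_front();
      s.pg_for_processing[item.pgid].push_back(item);
      pg = item.pgid;
    }
    // In the real queue the PG lock is taken here, outside the shard lock,
    // which is why the per-PG FIFO is needed to keep same-PG ops in order.
    {
      std::lock_guard<std::mutex> l(s.ordering_lock);
      std::map<uint64_t, std::deque<Op> >::iterator it = s.pg_for_processing.find(pg);
      if (it == s.pg_for_processing.end())
        return false;              // the PG was drained (e.g. dequeued) meanwhile
      *out = it->second.front();
      it->second.pop_front();
      if (it->second.empty())
        s.pg_for_processing.erase(it);
    }
    return true;
  }
};

int main() {
  ShardedQueue q(4);
  q.enqueue(Op{1, "op-a"});   // pg 1 -> shard 1
  q.enqueue(Op{1, "op-b"});   // pg 1 -> shard 1, must run after op-a
  q.enqueue(Op{5, "op-c"});   // pg 5 -> shard 1 as well (5 % 4 == 1)
  Op op;
  while (q.process_one(1, &op))
    std::cout << "pg " << op.pgid << " " << op.name << "\n";
  return 0;
}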