From 06845d0bf6d6fbbb9916df818be0aca10b7f8ae0 Mon Sep 17 00:00:00 2001
From: Somnath Roy <somnath.roy@sandisk.com>
Date: Wed, 14 May 2014 16:13:05 -0700
Subject: [PATCH] OSD: Sharded Op worker queue implementation for handling
 OSD ops

This is the client-side implementation of the sharded thread
pool/sharded workQ. The op_wq class is removed, and OSD ops now go
through the sharded workqueue model driven by the sharded threadpool.

The derived ShardedOpWQ implementation has a data structure called
ShardData, which has its own lock/cond and storage. ShardedOpWQ holds a
vector of these, and the size of the vector is a config option. During
an enqueue operation on the queue, ops are sharded across the ShardData
instances based on pg hash % number of shards. Similarly, in the
_process function the sharded thread pool's threads are divided across
the ShardData instances based on thread index % number of shards.

Signed-off-by: Somnath Roy <somnath.roy@sandisk.com>
---
 src/common/config_opts.h |   2 +
 src/osd/OSD.cc           |  96 ++----------
 src/osd/OSD.h            | 327 ++++++++++++++++++++++++++++++++-------
 3 files changed, 287 insertions(+), 138 deletions(-)
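
Note for reviewers: the two mod operations described above are the whole
routing scheme. A standalone sketch of the rule (hypothetical helper
names; the in-tree logic lives in ShardedOpWQ in OSD.h below):

    #include <cstdint>

    // Producer side: every op for a given PG hashes to one fixed shard,
    // so ops for the same PG stay ordered within a single queue.
    uint32_t shard_for_pg(uint32_t pg_hash, uint32_t num_shards) {
      return pg_hash % num_shards;
    }

    // Consumer side: each pool thread serves one fixed shard for its
    // whole lifetime; with more threads than shards, several threads
    // share a shard and contend only on that shard's lock.
    uint32_t shard_for_thread(uint32_t thread_index, uint32_t num_shards) {
      return thread_index % num_shards;
    }
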
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index d003c4b948d29..8d28e8a9a42d1 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -466,6 +466,8 @@ OPTION(osd_op_pq_min_cost, OPT_U64, 65536)
 OPTION(osd_disk_threads, OPT_INT, 1)
 OPTION(osd_recovery_threads, OPT_INT, 1)
 OPTION(osd_recover_clone_overlap, OPT_BOOL, true) // preserve clone_overlap during recovery/migration
+OPTION(osd_op_num_sharded_pool_threads, OPT_INT, 10)
+OPTION(osd_op_num_shards, OPT_INT, 5)
 // Only use clone_overlap for recovery if there are fewer than
 // osd_recover_clone_overlap_limit entries in the overlap set
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 91ec596b72109..978f302408b8b 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -181,7 +181,7 @@ OSDService::OSDService(OSD *osd) :
   logger(osd->logger),
   recoverystate_perf(osd->recoverystate_perf),
   monc(osd->monc),
-  op_wq(osd->op_wq),
+  op_wq(osd->op_shardedwq),
   peering_wq(osd->peering_wq),
   recovery_wq(osd->recovery_wq),
   snap_trim_wq(osd->snap_trim_wq),
@@ -923,6 +923,7 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   osd_compat(get_osd_compat_set()),
   state_lock(), state(STATE_INITIALIZING),
   op_tp(cct, "OSD::op_tp", cct->_conf->osd_op_threads, "osd_op_threads"),
+  op_sharded_tp(cct, "OSD::op_sharded_tp", cct->_conf->osd_op_num_sharded_pool_threads),
   recovery_tp(cct, "OSD::recovery_tp", cct->_conf->osd_recovery_threads, "osd_recovery_threads"),
   disk_tp(cct, "OSD::disk_tp", cct->_conf->osd_disk_threads, "osd_disk_threads"),
   command_tp(cct, "OSD::command_tp", 1),
@@ -940,7 +941,7 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   finished_lock("OSD::finished_lock"),
   op_tracker(cct, cct->_conf->osd_enable_op_tracker),
   test_ops_hook(NULL),
-  op_wq(this, cct->_conf->osd_op_thread_timeout, &op_tp),
+  op_shardedwq(cct->_conf->osd_op_num_shards, this, cct->_conf->osd_op_thread_timeout, &op_sharded_tp),
   peering_wq(this, cct->_conf->osd_op_thread_timeout, &op_tp),
   map_lock("OSD::map_lock"),
   pg_map_lock("OSD::pg_map_lock"),
@@ -1052,7 +1053,7 @@ bool OSD::asok_command(string command, cmdmap_t& cmdmap, string format,
     op_tracker.dump_historic_ops(f);
   } else if (command == "dump_op_pq_state") {
     f->open_object_section("pq");
-    op_wq.dump(f);
+    op_shardedwq.dump(f);
     f->close_section();
   } else if (command == "dump_blacklist") {
     list<pair<utime_t, entity_addr_t> > bl;
@@ -1281,6 +1282,7 @@ int OSD::init()
   monc->set_log_client(&clog);

   op_tp.start();
+  op_sharded_tp.start();
   recovery_tp.start();
   disk_tp.start();
   command_tp.start();
@@ -1581,6 +1583,7 @@ void OSD::suicide(int exitcode)
   derr << " pausing thread pools" << dendl;
   op_tp.pause();
+  op_sharded_tp.pause();
   disk_tp.pause();
   recovery_tp.pause();
   command_tp.pause();
@@ -1630,7 +1633,7 @@ int OSD::shutdown()
   }

   // finish ops
-  op_wq.drain(); // should already be empty except for lagard PGs
+  op_shardedwq.drain(); // should already be empty except for laggard PGs
   {
     Mutex::Locker l(finished_lock);
     finished.clear(); // zap waiters (bleh, this is messy)
@@ -1673,6 +1676,10 @@ int OSD::shutdown()
   op_tp.stop();
   dout(10) << "op tp stopped" << dendl;

+  op_sharded_tp.drain();
+  op_sharded_tp.stop();
+  dout(10) << "op sharded tp stopped" << dendl;
+
   command_tp.drain();
   command_tp.stop();
   dout(10) << "command tp stopped" << dendl;
@@ -1714,7 +1721,6 @@ int OSD::shutdown()
     Mutex::Locker l(pg_stat_queue_lock);
     assert(pg_stat_queue.empty());
   }
-  peering_wq.clear();

   // Remove PGs
 #ifdef PG_DEBUG_REFS
@@ -7955,88 +7961,10 @@ void OSD::enqueue_op(PG *pg, OpRequestRef op)
   pg->queue_op(op);
 }

-void OSD::OpWQ::_enqueue(pair<PGRef, OpRequestRef> item)
-{
-  unsigned priority = item.second->get_req()->get_priority();
-  unsigned cost = item.second->get_req()->get_cost();
-  if (priority >= CEPH_MSG_PRIO_LOW)
-    pqueue.enqueue_strict(
-      item.second->get_req()->get_source_inst(),
-      priority, item);
-  else
-    pqueue.enqueue(item.second->get_req()->get_source_inst(),
-      priority, cost, item);
-  osd->logger->set(l_osd_opq, pqueue.length());
-}
-
-void OSD::OpWQ::_enqueue_front(pair<PGRef, OpRequestRef> item)
-{
-  Mutex::Locker l(qlock);
-  if (pg_for_processing.count(&*(item.first))) {
-    pg_for_processing[&*(item.first)].push_front(item.second);
-    item.second = pg_for_processing[&*(item.first)].back();
-    pg_for_processing[&*(item.first)].pop_back();
-  }
-  unsigned priority = item.second->get_req()->get_priority();
-  unsigned cost = item.second->get_req()->get_cost();
-  if (priority >= CEPH_MSG_PRIO_LOW)
-    pqueue.enqueue_strict_front(
-      item.second->get_req()->get_source_inst(),
-      priority, item);
-  else
-    pqueue.enqueue_front(item.second->get_req()->get_source_inst(),
-      priority, cost, item);
-  osd->logger->set(l_osd_opq, pqueue.length());
-}
-
-PGRef OSD::OpWQ::_dequeue()
-{
-  assert(!pqueue.empty());
-  PGRef pg;
-  {
-    Mutex::Locker l(qlock);
-    pair<PGRef, OpRequestRef> ret = pqueue.dequeue();
-    pg = ret.first;
-    pg_for_processing[&*pg].push_back(ret.second);
-  }
-  osd->logger->set(l_osd_opq, pqueue.length());
-  return pg;
-}
-
-void OSD::OpWQ::_process(PGRef pg, ThreadPool::TPHandle &handle)
-{
-  pg->lock_suspend_timeout(handle);
-  OpRequestRef op;
-  {
-    Mutex::Locker l(qlock);
-    if (!pg_for_processing.count(&*pg)) {
-      pg->unlock();
-      return;
-    }
-    assert(pg_for_processing[&*pg].size());
-    op = pg_for_processing[&*pg].front();
-    pg_for_processing[&*pg].pop_front();
-    if (!(pg_for_processing[&*pg].size()))
-      pg_for_processing.erase(&*pg);
-  }
-
-  lgeneric_subdout(osd->cct, osd, 30) << "dequeue status: ";
-  Formatter *f = new_formatter("json");
-  f->open_object_section("q");
-  dump(f);
-  f->close_section();
-  f->flush(*_dout);
-  delete f;
-  *_dout << dendl;
-
-  osd->dequeue_op(pg, op, handle);
-  pg->unlock();
-}
-
 void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued)
 {
-  osd->op_wq.dequeue(pg, dequeued);
+  osd->op_shardedwq.dequeue(pg, dequeued);
 }

 /*
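
Note: the two new options pair as threads over shards; with the defaults
above (10 and 5), every shard is served by two worker threads. An
illustrative ceph.conf override (the values here are made up for the
example):

    [osd]
        osd op num shards = 8
        osd op num sharded pool threads = 16
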
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 0f5f05560c144..0e27d2161ac93 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -321,7 +321,7 @@ public:
   PerfCounters *&logger;
   PerfCounters *&recoverystate_perf;
   MonClient *&monc;
-  ThreadPool::WorkQueueVal<pair<PGRef, OpRequestRef>, PGRef> &op_wq;
+  ShardedThreadPool::ShardedWQ < pair<PGRef, OpRequestRef> > &op_wq;
   ThreadPool::BatchWorkQueue<PG> &peering_wq;
   ThreadPool::WorkQueue<PG> &recovery_wq;
   ThreadPool::WorkQueue<PG> &snap_trim_wq;
@@ -1087,6 +1087,7 @@ public:
 private:
   ThreadPool op_tp;
+  ShardedThreadPool op_sharded_tp;
   ThreadPool recovery_tp;
   ThreadPool disk_tp;
   ThreadPool command_tp;
@@ -1293,65 +1294,283 @@ private:

   // -- op queue --
-  struct OpWQ: public ThreadPool::WorkQueueVal<pair<PGRef, OpRequestRef>,
-                                               PGRef > {
-    Mutex qlock;
-    map<PG*, list<OpRequestRef> > pg_for_processing;
+
+  class ShardedOpWQ: public ShardedThreadPool::ShardedWQ < pair<PGRef, OpRequestRef> > {
+
+    struct ShardData {
+      Mutex sdata_lock;
+      Cond sdata_cond;
+      PrioritizedQueue< pair<PGRef, OpRequestRef>, entity_inst_t> pqueue;
+      ShardData(string lock_name, uint64_t max_tok_per_prio, uint64_t min_cost):
+        sdata_lock(lock_name.c_str()),
+        pqueue(max_tok_per_prio, min_cost) {}
+    };
+
+    vector<ShardData*> shard_list;
     OSD *osd;
-    PrioritizedQueue<pair<PGRef, OpRequestRef>, entity_inst_t > pqueue;
-    OpWQ(OSD *o, time_t ti, ThreadPool *tp)
-      : ThreadPool::WorkQueueVal<pair<PGRef, OpRequestRef>, PGRef >(
-        "OSD::OpWQ", ti, ti*10, tp),
-        qlock("OpWQ::qlock"),
-        osd(o),
-        pqueue(o->cct->_conf->osd_op_pq_max_tokens_per_priority,
-               o->cct->_conf->osd_op_pq_min_cost)
-    {}
+    uint32_t num_shards;
+    Mutex opQ_lock;
+    Cond opQ_cond;
+
+  public:
+    ShardedOpWQ(uint32_t pnum_shards, OSD *o, time_t ti, ShardedThreadPool* tp):
+      ShardedThreadPool::ShardedWQ < pair<PGRef, OpRequestRef> >(ti, ti*10, tp),
+      osd(o), num_shards(pnum_shards), opQ_lock("OSD::ShardedOpWQLock") {
+      for (uint32_t i = 0; i < num_shards; i++) {
+        char lock_name[32] = {0};
+        snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD::ShardedOpWQ::", i);
+        ShardData* one_shard = new ShardData(lock_name,
+          osd->cct->_conf->osd_op_pq_max_tokens_per_priority,
+          osd->cct->_conf->osd_op_pq_min_cost);
+        shard_list.push_back(one_shard);
+      }
+    }

-    void dump(Formatter *f) {
-      lock();
-      pqueue.dump(f);
-      unlock();
-    }
+    ~ShardedOpWQ() {
+      while (!shard_list.empty()) {
+        delete shard_list.back();
+        shard_list.pop_back();
+      }
+    }
+
+    void _process(uint32_t thread_index, heartbeat_handle_d *hb) {
+      uint32_t shard_index = thread_index % num_shards;
+      ShardData* sdata = shard_list[shard_index];
+      if (NULL != sdata) {
+        sdata->sdata_lock.Lock();
+        while (true) {
+          while (!sdata->pqueue.empty()) {
+            if (pause_threads.read() != 0) {
+              break;
+            }

-    void _enqueue_front(pair<PGRef, OpRequestRef> item);
-    void _enqueue(pair<PGRef, OpRequestRef> item);
-    PGRef _dequeue();
+            in_process.inc();
+            ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval,
+                                           suicide_interval);
+            tp_handle.reset_tp_timeout();
+
+            pair<PGRef, OpRequestRef> item = sdata->pqueue.dequeue();
+
+            (item.first)->lock_suspend_timeout(tp_handle);
+            // unlocking only after taking the PG lock, to maintain op order
+            sdata->sdata_lock.Unlock();
+            // Should this be behind some config option?
+            lgeneric_subdout(osd->cct, osd, 30) << "dequeue status: ";
+            Formatter *f = new_formatter("json");
+            f->open_object_section("q");
+            dump(f);
+            f->close_section();
+            f->flush(*_dout);
+            delete f;
+            *_dout << dendl;
+
+            osd->dequeue_op(item.first, item.second, tp_handle);
+            (item.first)->unlock();
+
+            sdata->sdata_lock.Lock();
+            in_process.dec();
+            if ((pause_threads.read() != 0) || (drain_threads.read() != 0)) {
+              opQ_lock.Lock();
+              opQ_cond.Signal();
+              opQ_lock.Unlock();
+            }
+          }
+
+          if (stop_threads.read() != 0) {
+            break;
+          }
+
+          osd->cct->get_heartbeat_map()->reset_timeout(hb, 4, 0);
+          sdata->sdata_cond.WaitInterval(osd->cct, sdata->sdata_lock,
+                                         utime_t(2, 0));
+        }
+        sdata->sdata_lock.Unlock();
+      } else {
+        assert(0);
+      }
+    }
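
Note for reviewers: stripped of Ceph's wrappers, the per-shard worker
loop in _process() above has the following shape. This is a sketch in
std:: primitives only, not the in-tree code; the PrioritizedQueue is
reduced to a plain queue and the pause/stop/heartbeat machinery is
elided:

    #include <chrono>
    #include <condition_variable>
    #include <mutex>
    #include <queue>

    struct Item {};                    // stands in for pair<PGRef, OpRequestRef>

    std::mutex sdata_lock;             // per-shard lock
    std::condition_variable sdata_cond;
    std::queue<Item> pqueue;           // per-shard queue
    bool stop = false;                 // stop_threads in the real code

    void process_item(Item&) {}        // stands in for osd->dequeue_op(...)

    void worker_loop() {
      std::unique_lock<std::mutex> l(sdata_lock);
      while (!stop) {
        while (!pqueue.empty()) {
          Item item = pqueue.front();
          pqueue.pop();
          // The real code takes the PG lock *before* dropping the shard
          // lock, so two threads serving the same shard cannot reorder
          // ops that belong to one PG.
          l.unlock();
          process_item(item);          // runs without the shard lock held
          l.lock();
        }
        // Idle: sleep until a producer signals, waking periodically so
        // the heartbeat/suicide timeout can be reset.
        sdata_cond.wait_for(l, std::chrono::seconds(2));
      }
    }
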
-    struct Pred {
-      PG *pg;
-      Pred(PG *pg) : pg(pg) {}
-      bool operator()(const pair<PGRef, OpRequestRef> &op) {
-        return op.first == pg;
-      }
-    };
-    void dequeue(PG *pg, list<OpRequestRef> *dequeued = 0) {
-      lock();
-      if (!dequeued) {
-        pqueue.remove_by_filter(Pred(pg));
-        pg_for_processing.erase(pg);
-      } else {
-        list<pair<PGRef, OpRequestRef> > _dequeued;
-        pqueue.remove_by_filter(Pred(pg), &_dequeued);
-        for (list<pair<PGRef, OpRequestRef> >::iterator i = _dequeued.begin();
-             i != _dequeued.end();
-             ++i) {
-          dequeued->push_back(i->second);
-        }
-        if (pg_for_processing.count(pg)) {
-          dequeued->splice(
-            dequeued->begin(),
-            pg_for_processing[pg]);
-          pg_for_processing.erase(pg);
-        }
-      }
-      unlock();
-    }
-    bool _empty() {
-      return pqueue.empty();
-    }
-    void _process(PGRef pg, ThreadPool::TPHandle &handle);
-  } op_wq;
+
+    void stop_threads_on_queue() {
+      stop_threads.set(1);
+      for (uint32_t i = 0; i < num_shards; i++) {
+        ShardData* sdata = shard_list[i];
+        if (NULL != sdata) {
+          sdata->sdata_lock.Lock();
+          sdata->sdata_cond.Signal();
+          sdata->sdata_lock.Unlock();
+        }
+      }
+    }
+
+    void pause_threads_on_queue() {
+      pause_threads.set(1);
+      opQ_lock.Lock();
+      while (in_process.read()) {
+        opQ_cond.Wait(opQ_lock);
+      }
+      opQ_lock.Unlock();
+    }
+
+    void pause_new_threads_on_queue() {
+      pause_threads.set(1);
+    }
+
+    void unpause_threads_on_queue() {
+      pause_threads.set(0);
+      for (uint32_t i = 0; i < num_shards; i++) {
+        ShardData* sdata = shard_list[i];
+        if (NULL != sdata) {
+          sdata->sdata_lock.Lock();
+          sdata->sdata_cond.Signal();
+          sdata->sdata_lock.Unlock();
+        }
+      }
+    }
+
+    void drain_threads_on_queue() {
+      drain_threads.set(1);
+      opQ_lock.Lock();
+      for (uint32_t i = 0; i < num_shards; i++) {
+        if (!_empty(i)) {
+          opQ_cond.Wait(opQ_lock);
+        }
+      }
+      while (in_process.read()) {
+        opQ_cond.Wait(opQ_lock);
+      }
+      opQ_lock.Unlock();
+      drain_threads.set(0);
+    }
+
+    void drain() {
+      drain_threads_on_queue();
+    }
+
+    void _enqueue(pair<PGRef, OpRequestRef> item) {
+      uint32_t shard_index = ((item.first)->get_pgid().ps()) % shard_list.size();
+      ShardData* sdata = shard_list[shard_index];
+      if (NULL != sdata) {
+        unsigned priority = item.second->get_req()->get_priority();
+        unsigned cost = item.second->get_req()->get_cost();
+        sdata->sdata_lock.Lock();
+        if (priority >= CEPH_MSG_PRIO_LOW)
+          sdata->pqueue.enqueue_strict(
+            item.second->get_req()->get_source_inst(), priority, item);
+        else
+          sdata->pqueue.enqueue(item.second->get_req()->get_source_inst(),
+            priority, cost, item);
+        sdata->sdata_cond.SignalOne();
+        sdata->sdata_lock.Unlock();
+      } else {
+        assert(0);
+      }
+    }
+
+    void _enqueue_front(pair<PGRef, OpRequestRef> item) {
+      uint32_t shard_index = ((item.first)->get_pgid().ps()) % shard_list.size();
+      ShardData* sdata = shard_list[shard_index];
+      if (NULL != sdata) {
+        unsigned priority = item.second->get_req()->get_priority();
+        unsigned cost = item.second->get_req()->get_cost();
+        sdata->sdata_lock.Lock();
+        if (priority >= CEPH_MSG_PRIO_LOW)
+          sdata->pqueue.enqueue_strict_front(
+            item.second->get_req()->get_source_inst(), priority, item);
+        else
+          sdata->pqueue.enqueue_front(item.second->get_req()->get_source_inst(),
+            priority, cost, item);
+        sdata->sdata_cond.SignalOne();
+        sdata->sdata_lock.Unlock();
+      } else {
+        assert(0);
+      }
+    }
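
Note: pause_threads_on_queue() and drain_threads_on_queue() above depend
on the in_process counter that _process() increments and decrements
around each op. The handshake, reduced to std:: primitives (a sketch;
the patch uses Ceph's atomic_t plus opQ_lock/opQ_cond, not std::atomic):

    #include <atomic>
    #include <condition_variable>
    #include <mutex>

    std::atomic<int> pause_threads{0};  // set by the pauser, polled by workers
    std::atomic<int> in_process{0};     // ops currently being executed
    std::mutex opq_lock;
    std::condition_variable opq_cond;

    // Caller: raise the flag, then block until every in-flight op retires.
    void pause_queue() {
      pause_threads.store(1);
      std::unique_lock<std::mutex> l(opq_lock);
      opq_cond.wait(l, [] { return in_process.load() == 0; });
    }

    // Worker: after finishing an op, wake any waiting pauser/drainer.
    void op_finished() {
      in_process.fetch_sub(1);
      if (pause_threads.load() != 0) {
        std::lock_guard<std::mutex> g(opq_lock);
        opq_cond.notify_all();
      }
    }
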
+
+    void dump(Formatter *f) {
+      for (uint32_t i = 0; i < num_shards; i++) {
+        ShardData* sdata = shard_list[i];
+        if (NULL != sdata) {
+          sdata->sdata_lock.Lock();
+          sdata->pqueue.dump(f);
+          sdata->sdata_lock.Unlock();
+        }
+      }
+    }
+
+    struct Pred {
+      PG *pg;
+      Pred(PG *pg) : pg(pg) {}
+      bool operator()(const pair<PGRef, OpRequestRef> &op) {
+        return op.first == pg;
+      }
+    };
+
+    void dequeue(PG *pg, list<OpRequestRef> *dequeued = 0) {
+      ShardData* sdata = NULL;
+      if (pg) {
+        uint32_t shard_index = pg->get_pgid().ps() % shard_list.size();
+        sdata = shard_list[shard_index];
+        if (!sdata) {
+          assert(0);
+        }
+      } else {
+        assert(0);
+      }
+      if (!dequeued) {
+        sdata->sdata_lock.Lock();
+        sdata->pqueue.remove_by_filter(Pred(pg));
+        sdata->sdata_lock.Unlock();
+      } else {
+        list<pair<PGRef, OpRequestRef> > _dequeued;
+        sdata->sdata_lock.Lock();
+        sdata->pqueue.remove_by_filter(Pred(pg), &_dequeued);
+        sdata->sdata_lock.Unlock();
+        for (list<pair<PGRef, OpRequestRef> >::iterator i = _dequeued.begin();
+             i != _dequeued.end(); ++i) {
+          dequeued->push_back(i->second);
+        }
+      }
+    }
+
+    bool _empty(uint32_t shard_index) {
+      ShardData* sdata = shard_list[shard_index];
+      if (NULL != sdata) {
+        sdata->sdata_lock.Lock();
+        bool is_empty = sdata->pqueue.empty();
+        sdata->sdata_lock.Unlock();
+        return is_empty;
+      }
+      return true;
+    }
+
+  } op_shardedwq;
+
   void enqueue_op(PG *pg, OpRequestRef op);
   void dequeue_op(
-- 
2.39.5
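
Note: dequeue(pg) must compute the same shard that _enqueue() picked
(both use get_pgid().ps() % shard_list.size()), then strip that shard's
queue of every item owned by the PG. What remove_by_filter(Pred(pg))
accomplishes, distilled to std:: containers (illustrative only; the
in-tree version operates on the PrioritizedQueue):

    #include <list>
    #include <utility>

    struct PG {};
    struct OpRequestRef {};
    typedef std::pair<PG*, OpRequestRef> Item;

    void dequeue_pg(std::list<Item>& shard_queue, PG* pg,
                    std::list<OpRequestRef>* dequeued) {
      for (std::list<Item>::iterator i = shard_queue.begin();
           i != shard_queue.end(); ) {
        if (i->first == pg) {
          if (dequeued)
            dequeued->push_back(i->second);  // hand ops back for requeueing
          i = shard_queue.erase(i);
        } else {
          ++i;
        }
      }
    }
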