From: Robert LeBlanc
Date: Tue, 26 Jan 2016 00:45:24 +0000 (+0000)
Subject: osd: Add runtime config option to select which queue to use and the
X-Git-Tag: v10.0.4~71^2~2
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=795fd8f267526bd44c5acfd8fa41c58da6b2dc92;p=ceph.git

osd: Add runtime config option to select which queue to use and the
priority of the cutoff between the strict queue and the normal queue.

osd_op_queue takes prio, wpq, or debug_random to select PrioritizedQueue,
WeightedPriorityQueue, or to pick one of the two at random for testing.
The default is PrioritizedQueue.

osd_op_queue_cut_off takes low, high, or debug_random; ops with priority
greater than or equal to the cutoff (64 for low, 196 for high) are queued
in the strict queue rather than the normal queue. It is advantageous to
use the high option so that repops are queued proportionally with regular
ops, which helps prevent I/O starvation when an OSD has many non-primary
PGs. Clients that access a primary PG on such a busy OSD are then not
blocked indefinitely by the OSD sending all repops to the front of the
line. The default is low.

The defaults preserve the original behavior.

Signed-Off-By: Robert LeBlanc
---
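As an illustration of how the new options would be consumed, an operator could
set them in the [osd] section of ceph.conf before starting the OSDs. The snippet
below is only a sketch showing the non-default values discussed above (wpq and
high); it is not part of the change itself:

    [osd]
    osd_op_queue = wpq
    osd_op_queue_cut_off = high
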
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index f7d6fd0d287..7e1bf405d54 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -624,6 +624,8 @@ OPTION(osd_recovery_threads, OPT_INT, 1)
 OPTION(osd_recover_clone_overlap, OPT_BOOL, true)  // preserve clone_overlap during recovery/migration
 OPTION(osd_op_num_threads_per_shard, OPT_INT, 2)
 OPTION(osd_op_num_shards, OPT_INT, 5)
+OPTION(osd_op_queue, OPT_STR, "prio") // PrioritizedQueue (prio), Weighted Priority Queue (wpq), or debug_random
+OPTION(osd_op_queue_cut_off, OPT_STR, "low") // Min priority to go to strict queue. (low, high, debug_random)
 // Set to true for testing. Users should NOT set this.
 // If set to true even after reading enough shards to
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 1d0ca9c57f2..db8386f5baa 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -1583,6 +1583,8 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   op_tracker(cct, cct->_conf->osd_enable_op_tracker,
                   cct->_conf->osd_num_op_tracker_shard),
   test_ops_hook(NULL),
+  op_queue(get_io_queue()),
+  op_prio_cutoff(get_io_prio_cut()),
   op_shardedwq(
     cct->_conf->osd_op_num_shards,
     this,
@@ -1939,6 +1941,8 @@ int OSD::init()
   load_pgs();
 
   dout(2) << "superblock: i am osd." << superblock.whoami << dendl;
+  dout(0) << "using " << op_queue << " op queue with priority op cut off at " <<
+    op_prio_cutoff << "." << dendl;
 
   create_logger();
 
@@ -8277,19 +8281,19 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb )
   ShardData* sdata = shard_list[shard_index];
   assert(NULL != sdata);
   sdata->sdata_op_ordering_lock.Lock();
-  if (sdata->pqueue.empty()) {
+  if (sdata->pqueue->empty()) {
     sdata->sdata_op_ordering_lock.Unlock();
     osd->cct->get_heartbeat_map()->reset_timeout(hb, 4, 0);
     sdata->sdata_lock.Lock();
     sdata->sdata_cond.WaitInterval(osd->cct, sdata->sdata_lock, utime_t(2, 0));
     sdata->sdata_lock.Unlock();
     sdata->sdata_op_ordering_lock.Lock();
-    if(sdata->pqueue.empty()) {
+    if(sdata->pqueue->empty()) {
       sdata->sdata_op_ordering_lock.Unlock();
       return;
     }
   }
-  pair<PGRef, PGQueueable> item = sdata->pqueue.dequeue();
+  pair<PGRef, PGQueueable> item = sdata->pqueue->dequeue();
   sdata->pg_for_processing[&*(item.first)].push_back(item.second);
   sdata->sdata_op_ordering_lock.Unlock();
   ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval,
@@ -8359,11 +8363,11 @@ void OSD::ShardedOpWQ::_enqueue(pair<PGRef, PGQueueable> item) {
   unsigned cost = item.second.get_cost();
   sdata->sdata_op_ordering_lock.Lock();
-  if (priority >= CEPH_MSG_PRIO_LOW)
-    sdata->pqueue.enqueue_strict(
+  if (priority >= osd->op_prio_cutoff)
+    sdata->pqueue->enqueue_strict(
       item.second.get_owner(), priority, item);
   else
-    sdata->pqueue.enqueue(
+    sdata->pqueue->enqueue(
       item.second.get_owner(), priority, cost, item);
 
   sdata->sdata_op_ordering_lock.Unlock();
@@ -8388,12 +8392,12 @@ void OSD::ShardedOpWQ::_enqueue_front(pair<PGRef, PGQueueable> item) {
   }
   unsigned priority = item.second.get_priority();
   unsigned cost = item.second.get_cost();
-  if (priority >= CEPH_MSG_PRIO_LOW)
-    sdata->pqueue.enqueue_strict_front(
+  if (priority >= osd->op_prio_cutoff)
+    sdata->pqueue->enqueue_strict_front(
       item.second.get_owner(), priority, item);
   else
-    sdata->pqueue.enqueue_front(
+    sdata->pqueue->enqueue_front(
       item.second.get_owner(), priority, cost, item);
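The _enqueue()/_enqueue_front() changes above reduce to a single comparison: an
item whose priority is at or above the configured cutoff goes into the strict
queue and is drained ahead of everything else, while lower-priority items share
the normal queue proportionally. The following is only a rough, self-contained
sketch of that split, using toy types instead of the real OpQueue/PGQueueable
interfaces:

    #include <deque>
    #include <map>

    // Toy shard: two priority-indexed queues standing in for the real OpQueue.
    struct ToyShard {
      std::map<unsigned, std::deque<int> > strict;  // always served first
      std::map<unsigned, std::deque<int> > normal;  // shared proportionally by priority
    };

    // Mirrors the decision in _enqueue(): priority >= cutoff bypasses the fair queue.
    // With the low cutoff (64) repops end up in 'strict'; with the high cutoff (196)
    // they compete with client ops in 'normal', which is what the commit recommends.
    void toy_enqueue(ToyShard &shard, unsigned cutoff, unsigned priority, int item)
    {
      if (priority >= cutoff)
        shard.strict[priority].push_back(item);
      else
        shard.normal[priority].push_back(item);
    }
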
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 367d236c059..556c931c526 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -52,7 +52,9 @@ using namespace std;
 #include "common/shared_cache.hpp"
 #include "common/simple_cache.hpp"
 #include "common/sharedptr_registry.hpp"
+#include "common/WeightedPriorityQueue.h"
 #include "common/PrioritizedQueue.h"
+#include "common/OpQueue.h"
 
 #include "messages/MOSDOp.h"
 #include "include/Spinlock.h"
@@ -1617,6 +1619,11 @@ private:
   friend struct C_CompleteSplits;
 
   // -- op queue --
+  enum io_queue {
+    prioritized,
+    weightedpriority};
+  const io_queue op_queue;
+  const unsigned int op_prio_cutoff;
 
   friend class PGQueueable;
   class ShardedOpWQ: public ShardedThreadPool::ShardedWQ < pair <PGRef, PGQueueable> > {
@@ -1626,13 +1633,25 @@ private:
       Mutex sdata_lock;
       Cond sdata_cond;
       Mutex sdata_op_ordering_lock;
       map<PG*, list<PGQueueable> > pg_for_processing;
-      PrioritizedQueue< pair<PGRef, PGQueueable>, entity_inst_t> pqueue;
+      std::unique_ptr<OpQueue< pair<PGRef, PGQueueable>, entity_inst_t>> pqueue;
       ShardData(
        string lock_name, string ordering_lock,
-       uint64_t max_tok_per_prio, uint64_t min_cost, CephContext *cct)
+       uint64_t max_tok_per_prio, uint64_t min_cost, CephContext *cct,
+       io_queue opqueue)
        : sdata_lock(lock_name.c_str(), false, true, false, cct),
-         sdata_op_ordering_lock(ordering_lock.c_str(), false, true, false, cct),
-         pqueue(max_tok_per_prio, min_cost) {}
+         sdata_op_ordering_lock(ordering_lock.c_str(), false, true, false, cct) {
+       if (opqueue == weightedpriority) {
+         pqueue = std::unique_ptr
+           <WeightedPriorityQueue< pair<PGRef, PGQueueable>, entity_inst_t>>(
+             new WeightedPriorityQueue< pair<PGRef, PGQueueable>, entity_inst_t>(
+               max_tok_per_prio, min_cost));
+       } else if (opqueue == prioritized) {
+         pqueue = std::unique_ptr
+           <PrioritizedQueue< pair<PGRef, PGQueueable>, entity_inst_t>>(
+             new PrioritizedQueue< pair<PGRef, PGQueueable>, entity_inst_t>(
+               max_tok_per_prio, min_cost));
+       }
+      }
     };
     vector<ShardData*> shard_list;
@@ -1653,7 +1672,7 @@ private:
       ShardData* one_shard = new ShardData(
        lock_name, order_lock,
        osd->cct->_conf->osd_op_pq_max_tokens_per_priority,
-       osd->cct->_conf->osd_op_pq_min_cost, osd->cct);
+       osd->cct->_conf->osd_op_pq_min_cost, osd->cct, osd->op_queue);
       shard_list.push_back(one_shard);
     }
   }
@@ -1687,7 +1706,7 @@ private:
       assert (NULL != sdata);
       sdata->sdata_op_ordering_lock.Lock();
       f->open_object_section(lock_name);
-      sdata->pqueue.dump(f);
+      sdata->pqueue->dump(f);
       f->close_section();
       sdata->sdata_op_ordering_lock.Unlock();
     }
@@ -1708,7 +1727,7 @@ private:
       sdata = shard_list[shard_index];
       assert(sdata != NULL);
       sdata->sdata_op_ordering_lock.Lock();
-      sdata->pqueue.remove_by_filter(Pred(pg));
+      sdata->pqueue->remove_by_filter(Pred(pg), 0);
       sdata->pg_for_processing.erase(pg);
       sdata->sdata_op_ordering_lock.Unlock();
     }
@@ -1722,7 +1741,7 @@ private:
       assert(dequeued);
       list<pair<PGRef, PGQueueable> > _dequeued;
       sdata->sdata_op_ordering_lock.Lock();
-      sdata->pqueue.remove_by_filter(Pred(pg), &_dequeued);
+      sdata->pqueue->remove_by_filter(Pred(pg), &_dequeued);
       for (list<pair<PGRef, PGQueueable> >::iterator i = _dequeued.begin();
           i != _dequeued.end(); ++i) {
        boost::optional<OpRequestRef> mop = i->second.maybe_get_op();
@@ -1749,7 +1768,7 @@ private:
       ShardData* sdata = shard_list[shard_index];
       assert(NULL != sdata);
       Mutex::Locker l(sdata->sdata_op_ordering_lock);
-      return sdata->pqueue.empty();
+      return sdata->pqueue->empty();
     }
   } op_shardedwq;
@@ -2310,6 +2329,28 @@ protected:
   bool ms_handle_reset(Connection *con);
   void ms_handle_remote_reset(Connection *con) {}
 
+  io_queue get_io_queue() const {
+    if (cct->_conf->osd_op_queue == "debug_random") {
+      srand(time(NULL));
+      return (rand() % 2 < 1) ? prioritized : weightedpriority;
+    } else if (cct->_conf->osd_op_queue == "wpq") {
+      return weightedpriority;
+    } else {
+      return prioritized;
+    }
+  }
+
+  unsigned int get_io_prio_cut() const {
+    if (cct->_conf->osd_op_queue_cut_off == "debug_random") {
+      srand(time(NULL));
+      return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
+    } else if (cct->_conf->osd_op_queue_cut_off == "low") {
+      return CEPH_MSG_PRIO_LOW;
+    } else {
+      return CEPH_MSG_PRIO_HIGH;
+    }
+  }
+
 public:
   /* internal and external can point to the same messenger, they will still
    * be cleaned up properly*/
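On the OSD.h side the change is essentially a small factory: each shard holds a
std::unique_ptr to an abstract queue and constructs one of the two concrete
implementations exactly once, when the shard is created, from the enum that
get_io_queue() derives from the config string. Because op_queue and
op_prio_cutoff are const members set in the OSD constructor, the selection is
made once per OSD start. A stripped-down sketch of the same pattern, using
invented Toy* names rather than the real OpQueue hierarchy:

    #include <memory>
    #include <string>

    // Stand-in for the abstract queue interface the shard keeps a pointer to.
    struct ToyQueue {
      virtual ~ToyQueue() {}
      virtual const char *name() const = 0;
    };

    struct ToyPrioritized : public ToyQueue {
      const char *name() const { return "prio"; }
    };

    struct ToyWeighted : public ToyQueue {
      const char *name() const { return "wpq"; }
    };

    // Mirrors get_io_queue() plus the ShardData constructor: the option string is
    // parsed once and the chosen implementation lives behind the common interface.
    // (debug_random, which picks one of the two at random for testing, is omitted.)
    std::unique_ptr<ToyQueue> make_queue(const std::string &osd_op_queue)
    {
      if (osd_op_queue == "wpq")
        return std::unique_ptr<ToyQueue>(new ToyWeighted());
      return std::unique_ptr<ToyQueue>(new ToyPrioritized());  // "prio" is the default
    }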