From 6e3363b20e590cd9df89f2caebe71867b94cc291 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 21 Jan 2013 15:29:28 -0800 Subject: [PATCH] common/PrioritizedQueue: add min cost, max tokens per bucket Two problems. First, we need to cap the tokens per bucket. Otherwise, a stream of items at one priority over time will indefinitely inflate the tokens available at another priority. The cap should represent how "bursty" we allow a given bucket to be. Start with 4MB for now. Second, set a floor on the item cost. Otherwise, we can have an infinite queue of 0 cost items that start over queues. More realistically, we need to balance the overhead of processing small items with the cost of large items. I.e., a 4 KB item is not 1/1000th as expensive as a 4MB item. Signed-off-by: Sage Weil --- src/common/PrioritizedQueue.h | 45 +++++++++++++++++++++++++++++++---- src/common/config_opts.h | 4 ++++ src/msg/DispatchQueue.h | 2 ++ src/osd/OSD.h | 5 +++- 4 files changed, 50 insertions(+), 6 deletions(-) diff --git a/src/common/PrioritizedQueue.h b/src/common/PrioritizedQueue.h index 6ada06601246b..072eaebf2ee8b 100644 --- a/src/common/PrioritizedQueue.h +++ b/src/common/PrioritizedQueue.h @@ -45,6 +45,8 @@ template class PrioritizedQueue { int64_t total_priority; + int64_t max_tokens_per_subqueue; + int64_t min_cost; template static unsigned filter_list_pairs( @@ -76,22 +78,39 @@ class PrioritizedQueue { struct SubQueue { private: map > > q; - unsigned tokens; + unsigned tokens, max_tokens; int64_t size; typename map > >::iterator cur; public: SubQueue(const SubQueue &other) - : q(other.q), tokens(other.tokens), size(other.size), + : q(other.q), + tokens(other.tokens), + max_tokens(other.max_tokens), + size(other.size), cur(q.begin()) {} - SubQueue() : tokens(0), size(0), cur(q.begin()) {} + SubQueue() + : tokens(0), + max_tokens(0), + size(0) {} + void set_max_tokens(unsigned mt) { + max_tokens = mt; + } + unsigned get_max_tokens() const { + return max_tokens; + } unsigned num_tokens() const { return tokens; } void put_tokens(unsigned t) { tokens += t; + if (tokens > max_tokens) + tokens = max_tokens; } void take_tokens(unsigned t) { + if (tokens > t) tokens -= t; + else + tokens = 0; } void enqueue(K cl, unsigned cost, T item) { q[cl].push_back(make_pair(cost, item)); @@ -175,7 +194,9 @@ class PrioritizedQueue { if (p != queue.end()) return &p->second; total_priority += priority; - return &queue[priority]; + SubQueue *sq = &queue[priority]; + sq->set_max_tokens(max_tokens_per_subqueue); + return sq; } void remove_queue(unsigned priority) { @@ -196,7 +217,11 @@ class PrioritizedQueue { } public: - PrioritizedQueue() : total_priority(0) {} + PrioritizedQueue(unsigned max_per, unsigned min_c) + : total_priority(0), + max_tokens_per_subqueue(max_per), + min_cost(min_c) + {} unsigned length() { unsigned total = 0; @@ -276,10 +301,14 @@ public: } void enqueue(K cl, unsigned priority, unsigned cost, T item) { + if (cost < min_cost) + cost = min_cost; create_queue(priority)->enqueue(cl, cost, item); } void enqueue_front(K cl, unsigned priority, unsigned cost, T item) { + if (cost < min_cost) + cost = min_cost; create_queue(priority)->enqueue_front(cl, cost, item); } @@ -300,6 +329,9 @@ public: return ret; } + // if there are multiple buckets/subqueues with sufficient tokens, + // we behave like a strict priority queue among all subqueues that + // are eligible to run. for (typename map::iterator i = queue.begin(); i != queue.end(); ++i) { @@ -315,6 +347,9 @@ public: return ret; } } + + // if no subqueues have sufficient tokens, we behave like a strict + // priority queue. T ret = queue.rbegin()->second.front().second; unsigned cost = queue.rbegin()->second.front().first; queue.rbegin()->second.pop_front(); diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 32bd41c6ce30d..7eb3e94fadcce 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -105,6 +105,8 @@ OPTION(ms_bind_port_min, OPT_INT, 6800) OPTION(ms_bind_port_max, OPT_INT, 7100) OPTION(ms_rwthread_stack_bytes, OPT_U64, 1024 << 10) OPTION(ms_tcp_read_timeout, OPT_U64, 900) +OPTION(ms_pq_max_tokens_per_priority, OPT_U64, 4194304) +OPTION(ms_pq_min_cost, OPT_U64, 65536) OPTION(ms_inject_socket_failures, OPT_U64, 0) OPTION(ms_inject_delay_type, OPT_STR, "") // "osd mds mon client" allowed OPTION(ms_inject_delay_max, OPT_DOUBLE, 1) // seconds @@ -315,6 +317,8 @@ OPTION(osd_map_dedup, OPT_BOOL, true) OPTION(osd_map_cache_size, OPT_INT, 500) OPTION(osd_map_message_max, OPT_INT, 100) // max maps per MOSDMap message OPTION(osd_op_threads, OPT_INT, 2) // 0 == no threading +OPTION(osd_op_pq_max_tokens_per_priority, OPT_U64, 4194304) +OPTION(osd_op_pq_min_cost, OPT_U64, 65536) OPTION(osd_disk_threads, OPT_INT, 1) OPTION(osd_recovery_threads, OPT_INT, 1) OPTION(osd_recover_clone_overlap, OPT_BOOL, true) // preserve clone_overlap during recovery/migration diff --git a/src/msg/DispatchQueue.h b/src/msg/DispatchQueue.h index ea44c165d56d6..884e026934262 100644 --- a/src/msg/DispatchQueue.h +++ b/src/msg/DispatchQueue.h @@ -151,6 +151,8 @@ class DispatchQueue { DispatchQueue(CephContext *cct, SimpleMessenger *msgr) : cct(cct), msgr(msgr), lock("SimpleMessenger::DispatchQeueu::lock"), + mqueue(cct->_conf->ms_pq_max_tokens_per_priority, + cct->_conf->ms_pq_min_cost), next_pipe_id(1), dispatch_thread(this), stop(false) diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 3d5e51523ea01..1af32d7388385 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -642,7 +642,10 @@ private: : ThreadPool::WorkQueueVal, PGRef >( "OSD::OpWQ", ti, ti*10, tp), qlock("OpWQ::qlock"), - osd(o) {} + osd(o), + pqueue(o->cct->_conf->osd_op_pq_max_tokens_per_priority, + o->cct->_conf->osd_op_pq_min_cost) + {} void _enqueue_front(pair item); void _enqueue(pair item); -- 2.39.5