template <typename T, typename K>
class PrioritizedQueue {
int64_t total_priority;
+ int64_t max_tokens_per_subqueue;
+ int64_t min_cost;
template <class F>
static unsigned filter_list_pairs(
struct SubQueue {
private:
map<K, list<pair<unsigned, T> > > q;
- unsigned tokens;
+ unsigned tokens, max_tokens;
int64_t size;
typename map<K, list<pair<unsigned, T> > >::iterator cur;
public:
SubQueue(const SubQueue &other)
- : q(other.q), tokens(other.tokens), size(other.size),
+ : q(other.q),
+ tokens(other.tokens),
+ max_tokens(other.max_tokens),
+ size(other.size),
cur(q.begin()) {}
- SubQueue() : tokens(0), size(0), cur(q.begin()) {}
+ SubQueue()
+ : tokens(0),
+ max_tokens(0),
+ size(0) {}
+ void set_max_tokens(unsigned mt) {
+ max_tokens = mt;
+ }
+ unsigned get_max_tokens() const {
+ return max_tokens;
+ }
unsigned num_tokens() const {
return tokens;
}
void put_tokens(unsigned t) {
tokens += t;
+ if (tokens > max_tokens)
+ tokens = max_tokens;
}
void take_tokens(unsigned t) {
+ if (tokens > t)
tokens -= t;
+ else
+ tokens = 0;
}
void enqueue(K cl, unsigned cost, T item) {
q[cl].push_back(make_pair(cost, item));
if (p != queue.end())
return &p->second;
total_priority += priority;
- return &queue[priority];
+ SubQueue *sq = &queue[priority];
+ sq->set_max_tokens(max_tokens_per_subqueue);
+ return sq;
}
void remove_queue(unsigned priority) {
}
public:
- PrioritizedQueue() : total_priority(0) {}
+ PrioritizedQueue(unsigned max_per, unsigned min_c)
+ : total_priority(0),
+ max_tokens_per_subqueue(max_per),
+ min_cost(min_c)
+ {}
unsigned length() {
unsigned total = 0;
}
void enqueue(K cl, unsigned priority, unsigned cost, T item) {
+ if (cost < min_cost)
+ cost = min_cost;
create_queue(priority)->enqueue(cl, cost, item);
}
void enqueue_front(K cl, unsigned priority, unsigned cost, T item) {
+ if (cost < min_cost)
+ cost = min_cost;
create_queue(priority)->enqueue_front(cl, cost, item);
}
return ret;
}
+ // if there are multiple buckets/subqueues with sufficient tokens,
+ // we behave like a strict priority queue among all subqueues that
+ // are eligible to run.
for (typename map<unsigned, SubQueue>::iterator i = queue.begin();
i != queue.end();
++i) {
return ret;
}
}
+
+ // if no subqueues have sufficient tokens, we behave like a strict
+ // priority queue.
T ret = queue.rbegin()->second.front().second;
unsigned cost = queue.rbegin()->second.front().first;
queue.rbegin()->second.pop_front();
OPTION(ms_bind_port_max, OPT_INT, 7100)
OPTION(ms_rwthread_stack_bytes, OPT_U64, 1024 << 10)
OPTION(ms_tcp_read_timeout, OPT_U64, 900)
+OPTION(ms_pq_max_tokens_per_priority, OPT_U64, 4194304)
+OPTION(ms_pq_min_cost, OPT_U64, 65536)
OPTION(ms_inject_socket_failures, OPT_U64, 0)
OPTION(ms_inject_delay_type, OPT_STR, "") // "osd mds mon client" allowed
OPTION(ms_inject_delay_max, OPT_DOUBLE, 1) // seconds
OPTION(osd_map_cache_size, OPT_INT, 500)
OPTION(osd_map_message_max, OPT_INT, 100) // max maps per MOSDMap message
OPTION(osd_op_threads, OPT_INT, 2) // 0 == no threading
+OPTION(osd_op_pq_max_tokens_per_priority, OPT_U64, 4194304)
+OPTION(osd_op_pq_min_cost, OPT_U64, 65536)
OPTION(osd_disk_threads, OPT_INT, 1)
OPTION(osd_recovery_threads, OPT_INT, 1)
OPTION(osd_recover_clone_overlap, OPT_BOOL, true) // preserve clone_overlap during recovery/migration
: ThreadPool::WorkQueueVal<pair<PGRef, OpRequestRef>, PGRef >(
"OSD::OpWQ", ti, ti*10, tp),
qlock("OpWQ::qlock"),
- osd(o) {}
+ osd(o),
+ pqueue(o->cct->_conf->osd_op_pq_max_tokens_per_priority,
+ o->cct->_conf->osd_op_pq_min_cost)
+ {}
void _enqueue_front(pair<PGRef, OpRequestRef> item);
void _enqueue(pair<PGRef, OpRequestRef> item);