public:
// How many Ops are in the queue
virtual unsigned length() const = 0;
- // Ops will be removed f evaluates to true, f may have sideeffects
- virtual void remove_by_filter(
- std::function<bool (T)> f) = 0;
- // Ops of this priority should be deleted immediately
+ // Ops of this class should be deleted immediately. If out isn't
+ // nullptr then the removed items should end up in *out* in
+ // front-to-back order. The typical strategy is to visit items in
+ // the queue in *reverse* order and to use *push_front* to insert
+ // them into out.
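+ //
+ // A minimal sketch of that ordering strategy (assuming the matching
+ // items live in a std::list-like member q, which is then cleared):
+ //   for (auto i = q.rbegin(); i != q.rend(); ++i) {
+ //     if (out) out->push_front(*i);
+ //   }
+ //   q.clear();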
virtual void remove_by_class(K k, std::list<T> *out) = 0;
// Enqueue op in the back of the strict queue
virtual void enqueue_strict(K cl, unsigned priority, T item) = 0;
int64_t min_cost;
typedef std::list<std::pair<unsigned, T> > ListPairs;
- static unsigned filter_list_pairs(
- ListPairs *l,
- std::function<bool (T)> f) {
- unsigned ret = 0;
- for (typename ListPairs::iterator i = l->end();
- i != l->begin();
- ) {
- auto next = i;
- --next;
- if (f(next->second)) {
- ++ret;
- l->erase(next);
- } else {
- i = next;
- }
- }
- return ret;
- }
struct SubQueue {
private:
bool empty() const {
return q.empty();
}
- void remove_by_filter(
- std::function<bool (T)> f) {
- for (typename Classes::iterator i = q.begin();
- i != q.end();
- ) {
- size -= filter_list_pairs(&(i->second), f);
- if (i->second.empty()) {
- if (cur == i) {
- ++cur;
- }
- q.erase(i++);
- } else {
- ++i;
- }
- }
- if (cur == q.end())
- cur = q.begin();
- }
void remove_by_class(K k, std::list<T> *out) {
typename Classes::iterator i = q.find(k);
if (i == q.end()) {
return total;
}
- void remove_by_filter(
- std::function<bool (T)> f) final {
- for (typename SubQueues::iterator i = queue.begin();
- i != queue.end();
- ) {
- unsigned priority = i->first;
-
- i->second.remove_by_filter(f);
- if (i->second.empty()) {
- ++i;
- remove_queue(priority);
- } else {
- ++i;
- }
- }
- for (typename SubQueues::iterator i = high_queue.begin();
- i != high_queue.end();
- ) {
- i->second.remove_by_filter(f);
- if (i->second.empty()) {
- high_queue.erase(i++);
- } else {
- ++i;
- }
- }
- }
-
void remove_by_class(K k, std::list<T> *out = 0) final {
for (typename SubQueues::iterator i = queue.begin();
i != queue.end();
unsigned get_size() const {
return lp.size();
}
- unsigned filter_list_pairs(std::function<bool (T)>& f) {
- unsigned count = 0;
- // intrusive containers can't erase with a reverse_iterator
- // so we have to walk backwards on our own. Since there is
- // no iterator before begin, we have to test at the end.
- for (Lit i = --lp.end();; --i) {
- if (f(i->item)) {
- i = lp.erase_and_dispose(i, DelItem<ListPair>());
- ++count;
- }
- if (i == lp.begin()) {
- break;
- }
- }
- return count;
- }
unsigned filter_class(std::list<T>* out) {
unsigned count = 0;
for (Lit i = --lp.end();; --i) {
check_end();
return ret;
}
- unsigned filter_list_pairs(std::function<bool (T)>& f) {
- unsigned count = 0;
- // intrusive containers can't erase with a reverse_iterator
- // so we have to walk backwards on our own. Since there is
- // no iterator before begin, we have to test at the end.
- for (Kit i = klasses.begin(); i != klasses.end();) {
- count += i->filter_list_pairs(f);
- if (i->empty()) {
- if (next == i) {
- ++next;
- }
- i = klasses.erase_and_dispose(i, DelItem<Klass>());
- } else {
- ++i;
- }
- }
- check_end();
- return count;
- }
unsigned filter_class(K& cl, std::list<T>* out) {
unsigned count = 0;
Kit i = klasses.find(cl, MapKey<Klass, K>());
}
return ret;
}
- void filter_list_pairs(std::function<bool (T)>& f) {
- for (Sit i = queues.begin(); i != queues.end();) {
- size -= i->filter_list_pairs(f);
- if (i->empty()) {
- total_prio -= i->key;
- i = queues.erase_and_dispose(i, DelItem<SubQueue>());
- } else {
- ++i;
- }
- }
- }
void filter_class(K& cl, std::list<T>* out) {
for (Sit i = queues.begin(); i != queues.end();) {
size -= i->filter_class(cl, out);
unsigned length() const final {
return strict.size + normal.size;
}
- void remove_by_filter(std::function<bool (T)> f) final {
- strict.filter_list_pairs(f);
- normal.filter_list_pairs(f);
- }
void remove_by_class(K cl, std::list<T>* removed = 0) final {
strict.filter_class(cl, removed);
normal.filter_class(cl, removed);
OPTION(osd_op_num_shards, OPT_INT, 0)
OPTION(osd_op_num_shards_hdd, OPT_INT, 5)
OPTION(osd_op_num_shards_ssd, OPT_INT, 8)
-OPTION(osd_op_queue, OPT_STR, "wpq") // PrioritzedQueue (prio), Weighted Priority Queue (wpq), or debug_random
+
+// PrioritizedQueue (prio), Weighted Priority Queue (wpq; default),
+// mclock_opclass, mclock_client, or debug_random. "mclock_opclass"
+// and "mclock_client" are based on the mClock/dmClock algorithm
+// (Gulati et al. 2010). "mclock_opclass" prioritizes based on the
+// class the operation belongs to. "mclock_client" does the same but
+// also works to enforce fairness between clients. "debug_random"
+// chooses randomly among the other four with equal probability.
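+//
+// For example, one would typically select the op-class based mClock
+// queue by setting "osd op queue = mclock_opclass" in ceph.conf
+// (shown here purely as an illustration of the usual option syntax).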
+OPTION(osd_op_queue, OPT_STR, "wpq")
+
OPTION(osd_op_queue_cut_off, OPT_STR, "low") // Min priority to go to strict queue. (low, high, debug_random)
+// mClock priority queue parameters for five types of ops
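+// ("res" = reservation, "wgt" = weight, "lim" = limit in mClock
+// terms; roughly, a reservation is a guaranteed minimum share, a
+// limit is a hard maximum, and weights divide the remaining capacity
+// proportionally. A value of 0 generally means no reservation or no
+// limit, respectively.)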
+OPTION(osd_op_queue_mclock_client_op_res, OPT_DOUBLE, 1000.0)
+OPTION(osd_op_queue_mclock_client_op_wgt, OPT_DOUBLE, 500.0)
+OPTION(osd_op_queue_mclock_client_op_lim, OPT_DOUBLE, 0.0)
+OPTION(osd_op_queue_mclock_osd_subop_res, OPT_DOUBLE, 1000.0)
+OPTION(osd_op_queue_mclock_osd_subop_wgt, OPT_DOUBLE, 500.0)
+OPTION(osd_op_queue_mclock_osd_subop_lim, OPT_DOUBLE, 0.0)
+OPTION(osd_op_queue_mclock_snap_res, OPT_DOUBLE, 0.0)
+OPTION(osd_op_queue_mclock_snap_wgt, OPT_DOUBLE, 1.0)
+OPTION(osd_op_queue_mclock_snap_lim, OPT_DOUBLE, 0.001)
+OPTION(osd_op_queue_mclock_recov_res, OPT_DOUBLE, 0.0)
+OPTION(osd_op_queue_mclock_recov_wgt, OPT_DOUBLE, 1.0)
+OPTION(osd_op_queue_mclock_recov_lim, OPT_DOUBLE, 0.001)
+OPTION(osd_op_queue_mclock_scrub_res, OPT_DOUBLE, 0.0)
+OPTION(osd_op_queue_mclock_scrub_wgt, OPT_DOUBLE, 1.0)
+OPTION(osd_op_queue_mclock_scrub_lim, OPT_DOUBLE, 0.001)
+
OPTION(osd_ignore_stale_divergent_priors, OPT_BOOL, false) // do not assert on divergent_prior entries which aren't in the log and whose on-disk objects are newer
// Set to true for testing. Users should NOT set this.
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+
+#include <functional>
+#include <map>
+#include <list>
+#include <cmath>
+
+#include "common/Formatter.h"
+#include "common/OpQueue.h"
+
+#include "dmclock/src/dmclock_server.h"
+
+// the following is done to unclobber _ASSERT_H so it returns to the
+// way ceph likes it
+#include "include/assert.h"
+
+
+namespace ceph {
+
+ namespace dmc = crimson::dmclock;
+
+ template <typename T, typename K>
+ class mClockQueue : public OpQueue <T, K> {
+
+ using priority_t = unsigned;
+ using cost_t = unsigned;
+
+ typedef std::list<std::pair<cost_t, T> > ListPairs;
+
+ static unsigned filter_list_pairs(ListPairs *l,
+ std::function<bool (const T&)> f,
+ std::list<T>* out = nullptr) {
+ unsigned ret = 0;
+ for (typename ListPairs::iterator i = l->end();
+ i != l->begin();
+ /* no inc */
+ ) {
+ auto next = i;
+ --next;
+ if (f(next->second)) {
+ ++ret;
+ if (out) out->push_back(next->second);
+ l->erase(next);
+ } else {
+ i = next;
+ }
+ }
+ return ret;
+ }
+
+ struct SubQueue {
+ private:
+ typedef std::map<K, ListPairs> Classes;
+ // client-class to ordered queue
+ Classes q;
+
+ unsigned tokens, max_tokens;
+ int64_t size;
+
+ typename Classes::iterator cur;
+
+ public:
+
+ SubQueue(const SubQueue &other)
+ : q(other.q),
+ tokens(other.tokens),
+ max_tokens(other.max_tokens),
+ size(other.size),
+ cur(q.begin()) {}
+
+ SubQueue()
+ : tokens(0),
+ max_tokens(0),
+ size(0), cur(q.begin()) {}
+
+ void set_max_tokens(unsigned mt) {
+ max_tokens = mt;
+ }
+
+ unsigned get_max_tokens() const {
+ return max_tokens;
+ }
+
+ unsigned num_tokens() const {
+ return tokens;
+ }
+
+ void put_tokens(unsigned t) {
+ tokens += t;
+ if (tokens > max_tokens) {
+ tokens = max_tokens;
+ }
+ }
+
+ void take_tokens(unsigned t) {
+ if (tokens > t) {
+ tokens -= t;
+ } else {
+ tokens = 0;
+ }
+ }
+
+ void enqueue(K cl, cost_t cost, T item) {
+ q[cl].push_back(std::make_pair(cost, item));
+ if (cur == q.end())
+ cur = q.begin();
+ size++;
+ }
+
+ void enqueue_front(K cl, cost_t cost, T item) {
+ q[cl].push_front(std::make_pair(cost, item));
+ if (cur == q.end())
+ cur = q.begin();
+ size++;
+ }
+
+ std::pair<cost_t, T> front() const {
+ assert(!(q.empty()));
+ assert(cur != q.end());
+ return cur->second.front();
+ }
+
+ void pop_front() {
+ assert(!(q.empty()));
+ assert(cur != q.end());
+ cur->second.pop_front();
+ if (cur->second.empty()) {
+ auto i = cur;
+ ++cur;
+ q.erase(i);
+ } else {
+ ++cur;
+ }
+ if (cur == q.end()) {
+ cur = q.begin();
+ }
+ size--;
+ }
+
+ unsigned length() const {
+ assert(size >= 0);
+ return (unsigned)size;
+ }
+
+ bool empty() const {
+ return q.empty();
+ }
+
+ void remove_by_filter(std::function<bool (const T&)> f) {
+ for (typename Classes::iterator i = q.begin();
+ i != q.end();
+ /* no-inc */) {
+ size -= filter_list_pairs(&(i->second), f);
+ if (i->second.empty()) {
+ if (cur == i) {
+ ++cur;
+ }
+ i = q.erase(i);
+ } else {
+ ++i;
+ }
+ }
+ if (cur == q.end()) cur = q.begin();
+ }
+
+ void remove_by_class(K k, std::list<T> *out) {
+ typename Classes::iterator i = q.find(k);
+ if (i == q.end()) {
+ return;
+ }
+ size -= i->second.size();
+ if (i == cur) {
+ ++cur;
+ }
+ if (out) {
+ for (auto j = i->second.rbegin(); j != i->second.rend(); ++j) {
+ out->push_front(j->second);
+ }
+ }
+ q.erase(i);
+ if (cur == q.end()) cur = q.begin();
+ }
+
+ void dump(ceph::Formatter *f) const {
+ f->dump_int("size", size);
+ f->dump_int("num_keys", q.size());
+ }
+ };
+
+ using SubQueues = std::map<priority_t, SubQueue>;
+
+ SubQueues high_queue;
+
+ dmc::PullPriorityQueue<K,T> queue;
+
+    // when enqueue_front is called, rather than try to re-calculate
+    // the tags needed to put the item back into the mClock priority
+    // queue, we'll just keep a separate list from which we dequeue
+    // items first; only when it's empty do we use queue.
+ std::list<std::pair<K,T>> queue_front;
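+    // dequeue order, as implemented in dequeue() below, is:
+    // high_queue first, then queue_front, then the mClock queue.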
+
+ public:
+
+ mClockQueue(
+ const typename dmc::PullPriorityQueue<K,T>::ClientInfoFunc& info_func) :
+ queue(info_func, true)
+ {
+ // empty
+ }
+
+ unsigned length() const override final {
+ unsigned total = 0;
+ total += queue_front.size();
+ total += queue.request_count();
+ for (auto i = high_queue.cbegin(); i != high_queue.cend(); ++i) {
+ assert(i->second.length());
+ total += i->second.length();
+ }
+ return total;
+ }
+
+  // be sure to do things in reverse priority order and to use
+  // push_front so items end up on the list in front-to-back priority
+  // order
+ void remove_by_filter(std::function<bool (const T&)> filter_accum) {
+ queue.remove_by_req_filter(filter_accum, true);
+
+ for (auto i = queue_front.rbegin(); i != queue_front.rend(); /* no-inc */) {
+ if (filter_accum(i->second)) {
+ i = decltype(i){ queue_front.erase(std::next(i).base()) };
+ } else {
+ ++i;
+ }
+ }
+
+ for (typename SubQueues::iterator i = high_queue.begin();
+ i != high_queue.end();
+ /* no-inc */ ) {
+ i->second.remove_by_filter(filter_accum);
+ if (i->second.empty()) {
+ i = high_queue.erase(i);
+ } else {
+ ++i;
+ }
+ }
+ }
+
+ void remove_by_class(K k, std::list<T> *out = nullptr) override final {
+ if (out) {
+ queue.remove_by_client(k,
+ true,
+ [&out] (const T& t) { out->push_front(t); });
+ } else {
+ queue.remove_by_client(k, true);
+ }
+
+ for (auto i = queue_front.rbegin(); i != queue_front.rend(); /* no-inc */) {
+ if (k == i->first) {
+ if (nullptr != out) out->push_front(i->second);
+ i = decltype(i){ queue_front.erase(std::next(i).base()) };
+ } else {
+ ++i;
+ }
+ }
+
+ for (auto i = high_queue.begin(); i != high_queue.end(); /* no-inc */) {
+ i->second.remove_by_class(k, out);
+ if (i->second.empty()) {
+ i = high_queue.erase(i);
+ } else {
+ ++i;
+ }
+ }
+ }
+
+ void enqueue_strict(K cl, unsigned priority, T item) override final {
+ high_queue[priority].enqueue(cl, 0, item);
+ }
+
+ void enqueue_strict_front(K cl, unsigned priority, T item) override final {
+ high_queue[priority].enqueue_front(cl, 0, item);
+ }
+
+ void enqueue(K cl, unsigned priority, unsigned cost, T item) override final {
+ // priority is ignored
+ queue.add_request(item, cl, cost);
+ }
+
+ void enqueue_front(K cl,
+ unsigned priority,
+ unsigned cost,
+ T item) override final {
+ queue_front.emplace_front(std::pair<K,T>(cl, item));
+ }
+
+ bool empty() const override final {
+ return queue.empty() && high_queue.empty() && queue_front.empty();
+ }
+
+ T dequeue() override final {
+ assert(!empty());
+
+ if (!(high_queue.empty())) {
+ T ret = high_queue.rbegin()->second.front().second;
+ high_queue.rbegin()->second.pop_front();
+ if (high_queue.rbegin()->second.empty()) {
+ high_queue.erase(high_queue.rbegin()->first);
+ }
+ return ret;
+ }
+
+ if (!queue_front.empty()) {
+ T ret = queue_front.front().second;
+ queue_front.pop_front();
+ return ret;
+ }
+
+ auto pr = queue.pull_request();
+ assert(pr.is_retn());
+ auto& retn = pr.get_retn();
+ return *(retn.request);
+ }
+
+ void dump(ceph::Formatter *f) const override final {
+ f->open_array_section("high_queues");
+ for (typename SubQueues::const_iterator p = high_queue.begin();
+ p != high_queue.end();
+ ++p) {
+ f->open_object_section("subqueue");
+ f->dump_int("priority", p->first);
+ p->second.dump(f);
+ f->close_section();
+ }
+ f->close_section();
+
+ f->open_object_section("queue_front");
+ f->dump_int("size", queue_front.size());
+ f->close_section();
+
+ f->open_object_section("queue");
+ f->dump_int("size", queue.request_count());
+ f->close_section();
+ } // dump
+ };
+
+} // namespace ceph
osd_types.cc
ECUtil.cc
ExtentCache.cc
+ mClockOpClassQueue.cc
+ mClockClientQueue.cc
+ PGQueueable.cc
${CMAKE_SOURCE_DIR}/src/common/TrackedOp.cc
${osd_cyg_functions_src}
${osdc_osd_srcs})
$<TARGET_OBJECTS:cls_references_objs>
$<TARGET_OBJECTS:global_common_objs>
$<TARGET_OBJECTS:heap_profiler_objs>)
-target_link_libraries(osd ${LEVELDB_LIBRARIES} ${CMAKE_DL_LIBS} ${ALLOC_LIBS})
+target_link_libraries(osd ${LEVELDB_LIBRARIES} dmclock ${CMAKE_DL_LIBS} ${ALLOC_LIBS})
if(WITH_LTTNG)
add_dependencies(osd osd-tp pg-tp)
endif()
return *_dout << "osd." << whoami << " " << epoch << " ";
}
-void PGQueueable::RunVis::operator()(const OpRequestRef &op) {
- return osd->dequeue_op(pg, op, handle);
-}
-
-void PGQueueable::RunVis::operator()(const PGSnapTrim &op) {
- return pg->snap_trimmer(op.epoch_queued);
-}
-
-void PGQueueable::RunVis::operator()(const PGScrub &op) {
- return pg->scrub(op.epoch_queued, handle);
-}
-
-void PGQueueable::RunVis::operator()(const PGRecovery &op) {
- return osd->do_recovery(pg.get(), op.epoch_queued, op.reserved_pushes, handle);
-}
-
//Initial features in new superblock.
//Features here are also automatically upgraded
CompatSet OSD::get_osd_initial_compat_set() {
in_use.insert(out->begin(), out->end());
}
+
// =============================================================
#undef dout_context
}} // namespace ceph::osd_cmds
+
+std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q) {
+ switch(q) {
+ case OSD::io_queue::prioritized:
+ out << "prioritized";
+ break;
+ case OSD::io_queue::weightedpriority:
+ out << "weightedpriority";
+ break;
+ case OSD::io_queue::mclock_opclass:
+ out << "mclock_opclass";
+ break;
+ case OSD::io_queue::mclock_client:
+ out << "mclock_client";
+ break;
+ }
+ return out;
+}
#include "OpRequest.h"
#include "Session.h"
+#include "osd/PGQueueable.h"
+
#include <atomic>
#include <map>
#include <memory>
#include "common/sharedptr_registry.hpp"
#include "common/WeightedPriorityQueue.h"
#include "common/PrioritizedQueue.h"
+#include "osd/mClockOpClassQueue.h"
+#include "osd/mClockClientQueue.h"
#include "messages/MOSDOp.h"
#include "include/Spinlock.h"
#include "common/EventTrace.h"
class OSD;
-struct PGScrub {
- epoch_t epoch_queued;
- explicit PGScrub(epoch_t e) : epoch_queued(e) {}
- ostream &operator<<(ostream &rhs) {
- return rhs << "PGScrub";
- }
-};
-
-struct PGSnapTrim {
- epoch_t epoch_queued;
- explicit PGSnapTrim(epoch_t e) : epoch_queued(e) {}
- ostream &operator<<(ostream &rhs) {
- return rhs << "PGSnapTrim";
- }
-};
-
-struct PGRecovery {
- epoch_t epoch_queued;
- uint64_t reserved_pushes;
- PGRecovery(epoch_t e, uint64_t reserved_pushes)
- : epoch_queued(e), reserved_pushes(reserved_pushes) {}
- ostream &operator<<(ostream &rhs) {
- return rhs << "PGRecovery(epoch=" << epoch_queued
- << ", reserved_pushes: " << reserved_pushes << ")";
- }
-};
-
-class PGQueueable {
- typedef boost::variant<
- OpRequestRef,
- PGSnapTrim,
- PGScrub,
- PGRecovery
- > QVariant;
- QVariant qvariant;
- int cost;
- unsigned priority;
- utime_t start_time;
- entity_inst_t owner;
- epoch_t map_epoch; ///< an epoch we expect the PG to exist in
-
- struct RunVis : public boost::static_visitor<> {
- OSD *osd;
- PGRef &pg;
- ThreadPool::TPHandle &handle;
- RunVis(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle)
- : osd(osd), pg(pg), handle(handle) {}
- void operator()(const OpRequestRef &op);
- void operator()(const PGSnapTrim &op);
- void operator()(const PGScrub &op);
- void operator()(const PGRecovery &op);
- };
-
- struct StringifyVis : public boost::static_visitor<std::string> {
- std::string operator()(const OpRequestRef &op) {
- return stringify(op);
- }
- std::string operator()(const PGSnapTrim &op) {
- return "PGSnapTrim";
- }
- std::string operator()(const PGScrub &op) {
- return "PGScrub";
- }
- std::string operator()(const PGRecovery &op) {
- return "PGRecovery";
- }
- };
- friend ostream& operator<<(ostream& out, const PGQueueable& q) {
- StringifyVis v;
- return out << "PGQueueable(" << boost::apply_visitor(v, q.qvariant)
- << " prio " << q.priority << " cost " << q.cost
- << " e" << q.map_epoch << ")";
- }
-
-public:
- // cppcheck-suppress noExplicitConstructor
- PGQueueable(OpRequestRef op, epoch_t e)
- : qvariant(op), cost(op->get_req()->get_cost()),
- priority(op->get_req()->get_priority()),
- start_time(op->get_req()->get_recv_stamp()),
- owner(op->get_req()->get_source_inst()),
- map_epoch(e)
- {}
- PGQueueable(
- const PGSnapTrim &op, int cost, unsigned priority, utime_t start_time,
- const entity_inst_t &owner, epoch_t e)
- : qvariant(op), cost(cost), priority(priority), start_time(start_time),
- owner(owner), map_epoch(e) {}
- PGQueueable(
- const PGScrub &op, int cost, unsigned priority, utime_t start_time,
- const entity_inst_t &owner, epoch_t e)
- : qvariant(op), cost(cost), priority(priority), start_time(start_time),
- owner(owner), map_epoch(e) {}
- PGQueueable(
- const PGRecovery &op, int cost, unsigned priority, utime_t start_time,
- const entity_inst_t &owner, epoch_t e)
- : qvariant(op), cost(cost), priority(priority), start_time(start_time),
- owner(owner), map_epoch(e) {}
- const boost::optional<OpRequestRef> maybe_get_op() const {
- const OpRequestRef *op = boost::get<OpRequestRef>(&qvariant);
- return op ? OpRequestRef(*op) : boost::optional<OpRequestRef>();
- }
- uint64_t get_reserved_pushes() const {
- const PGRecovery *op = boost::get<PGRecovery>(&qvariant);
- return op ? op->reserved_pushes : 0;
- }
- void run(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle) {
- RunVis v(osd, pg, handle);
- boost::apply_visitor(v, qvariant);
- }
- unsigned get_priority() const { return priority; }
- int get_cost() const { return cost; }
- utime_t get_start_time() const { return start_time; }
- entity_inst_t get_owner() const { return owner; }
- epoch_t get_map_epoch() const { return map_epoch; }
-};
-
class OSDService {
public:
OSD *osd;
friend struct C_OpenPGs;
// -- op queue --
- enum io_queue {
+ enum class io_queue {
prioritized,
- weightedpriority
+ weightedpriority,
+ mclock_opclass,
+ mclock_client,
};
+ friend std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q);
+
const io_queue op_queue;
const unsigned int op_prio_cutoff;
* and already requeued the items.
*/
friend class PGQueueable;
+
class ShardedOpWQ
: public ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>>
{
: sdata_lock(lock_name.c_str(), false, true, false, cct),
sdata_op_ordering_lock(ordering_lock.c_str(), false, true,
false, cct) {
- if (opqueue == weightedpriority) {
+ if (opqueue == io_queue::weightedpriority) {
pqueue = std::unique_ptr
<WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
new WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
max_tok_per_prio, min_cost));
- } else if (opqueue == prioritized) {
+ } else if (opqueue == io_queue::prioritized) {
pqueue = std::unique_ptr
<PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
new PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
max_tok_per_prio, min_cost));
+ } else if (opqueue == io_queue::mclock_opclass) {
+ pqueue = std::unique_ptr
+ <ceph::mClockOpClassQueue>(new ceph::mClockOpClassQueue(cct));
+ } else if (opqueue == io_queue::mclock_client) {
+ pqueue = std::unique_ptr
+ <ceph::mClockClientQueue>(new ceph::mClockClientQueue(cct));
}
}
- };
+ }; // struct ShardData
vector<ShardData*> shard_list;
OSD *osd;
OSDMapRef get_osdmap() {
return osdmap;
}
- epoch_t get_osdmap_epoch() {
+ epoch_t get_osdmap_epoch() const {
return osdmap ? osdmap->get_epoch() : 0;
}
}
} remove_wq;
- private:
+private:
bool ms_can_fast_dispatch_any() const override { return true; }
bool ms_can_fast_dispatch(const Message *m) const override {
switch (m->get_type()) {
io_queue get_io_queue() const {
if (cct->_conf->osd_op_queue == "debug_random") {
+ static io_queue index_lookup[] = { io_queue::prioritized,
+ io_queue::weightedpriority,
+ io_queue::mclock_opclass,
+ io_queue::mclock_client };
srand(time(NULL));
- return (rand() % 2 < 1) ? prioritized : weightedpriority;
+ unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0]));
+ return index_lookup[which];
} else if (cct->_conf->osd_op_queue == "wpq") {
- return weightedpriority;
+ return io_queue::weightedpriority;
+ } else if (cct->_conf->osd_op_queue == "mclock_opclass") {
+ return io_queue::mclock_opclass;
+ } else if (cct->_conf->osd_op_queue == "mclock_client") {
+ return io_queue::mclock_client;
} else {
- return prioritized;
+ return io_queue::prioritized;
}
}
friend class OSDService;
};
+
+std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q);
+
+
//compatibility of the executable
extern const CompatSet::Feature ceph_osd_feature_compat[];
extern const CompatSet::Feature ceph_osd_feature_ro_compat[];
extern const CompatSet::Feature ceph_osd_feature_incompat[];
-#endif
+#endif // CEPH_OSD_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#include "PG.h"
+#include "PGQueueable.h"
+#include "OSD.h"
+
+
+void PGQueueable::RunVis::operator()(const OpRequestRef &op) {
+ osd->dequeue_op(pg, op, handle);
+}
+
+void PGQueueable::RunVis::operator()(const PGSnapTrim &op) {
+ pg->snap_trimmer(op.epoch_queued);
+}
+
+void PGQueueable::RunVis::operator()(const PGScrub &op) {
+ pg->scrub(op.epoch_queued, handle);
+}
+
+void PGQueueable::RunVis::operator()(const PGRecovery &op) {
+ osd->do_recovery(pg.get(), op.epoch_queued, op.reserved_pushes, handle);
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#pragma once
+
+#include <ostream>
+
+#include "include/types.h"
+#include "include/utime.h"
+#include "osd/OpRequest.h"
+#include "osd/PG.h"
+
+
+class OSD;
+
+
+struct PGScrub {
+ epoch_t epoch_queued;
+ explicit PGScrub(epoch_t e) : epoch_queued(e) {}
+ ostream &operator<<(ostream &rhs) {
+ return rhs << "PGScrub";
+ }
+};
+
+struct PGSnapTrim {
+ epoch_t epoch_queued;
+ explicit PGSnapTrim(epoch_t e) : epoch_queued(e) {}
+ ostream &operator<<(ostream &rhs) {
+ return rhs << "PGSnapTrim";
+ }
+};
+
+struct PGRecovery {
+ epoch_t epoch_queued;
+ uint64_t reserved_pushes;
+ PGRecovery(epoch_t e, uint64_t reserved_pushes)
+ : epoch_queued(e), reserved_pushes(reserved_pushes) {}
+ ostream &operator<<(ostream &rhs) {
+ return rhs << "PGRecovery(epoch=" << epoch_queued
+ << ", reserved_pushes: " << reserved_pushes << ")";
+ }
+};
+
+
+class PGQueueable {
+ typedef boost::variant<
+ OpRequestRef,
+ PGSnapTrim,
+ PGScrub,
+ PGRecovery
+ > QVariant;
+ QVariant qvariant;
+ int cost;
+ unsigned priority;
+ utime_t start_time;
+ entity_inst_t owner;
+ epoch_t map_epoch; ///< an epoch we expect the PG to exist in
+
+ struct RunVis : public boost::static_visitor<> {
+ OSD *osd;
+ PGRef &pg;
+ ThreadPool::TPHandle &handle;
+ RunVis(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle)
+ : osd(osd), pg(pg), handle(handle) {}
+ void operator()(const OpRequestRef &op);
+ void operator()(const PGSnapTrim &op);
+ void operator()(const PGScrub &op);
+ void operator()(const PGRecovery &op);
+ }; // struct RunVis
+
+ struct StringifyVis : public boost::static_visitor<std::string> {
+ std::string operator()(const OpRequestRef &op) {
+ return stringify(op);
+ }
+ std::string operator()(const PGSnapTrim &op) {
+ return "PGSnapTrim";
+ }
+ std::string operator()(const PGScrub &op) {
+ return "PGScrub";
+ }
+ std::string operator()(const PGRecovery &op) {
+ return "PGRecovery";
+ }
+ };
+
+ friend ostream& operator<<(ostream& out, const PGQueueable& q) {
+ StringifyVis v;
+ return out << "PGQueueable(" << boost::apply_visitor(v, q.qvariant)
+ << " prio " << q.priority << " cost " << q.cost
+ << " e" << q.map_epoch << ")";
+ }
+
+public:
+
+ PGQueueable(OpRequestRef op, epoch_t e)
+ : qvariant(op), cost(op->get_req()->get_cost()),
+ priority(op->get_req()->get_priority()),
+ start_time(op->get_req()->get_recv_stamp()),
+ owner(op->get_req()->get_source_inst()),
+ map_epoch(e)
+ {}
+ PGQueueable(
+ const PGSnapTrim &op, int cost, unsigned priority, utime_t start_time,
+ const entity_inst_t &owner, epoch_t e)
+ : qvariant(op), cost(cost), priority(priority), start_time(start_time),
+ owner(owner), map_epoch(e) {}
+ PGQueueable(
+ const PGScrub &op, int cost, unsigned priority, utime_t start_time,
+ const entity_inst_t &owner, epoch_t e)
+ : qvariant(op), cost(cost), priority(priority), start_time(start_time),
+ owner(owner), map_epoch(e) {}
+ PGQueueable(
+ const PGRecovery &op, int cost, unsigned priority, utime_t start_time,
+ const entity_inst_t &owner, epoch_t e)
+ : qvariant(op), cost(cost), priority(priority), start_time(start_time),
+ owner(owner), map_epoch(e) {}
+
+ const boost::optional<OpRequestRef> maybe_get_op() const {
+ const OpRequestRef *op = boost::get<OpRequestRef>(&qvariant);
+ return op ? OpRequestRef(*op) : boost::optional<OpRequestRef>();
+ }
+ uint64_t get_reserved_pushes() const {
+ const PGRecovery *op = boost::get<PGRecovery>(&qvariant);
+ return op ? op->reserved_pushes : 0;
+ }
+ void run(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle) {
+ RunVis v(osd, pg, handle);
+ boost::apply_visitor(v, qvariant);
+ }
+ unsigned get_priority() const { return priority; }
+ int get_cost() const { return cost; }
+ utime_t get_start_time() const { return start_time; }
+ entity_inst_t get_owner() const { return owner; }
+ epoch_t get_map_epoch() const { return map_epoch; }
+ const QVariant& get_variant() const { return qvariant; }
+}; // struct PGQueueable
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#include <memory>
+
+#include "osd/mClockClientQueue.h"
+#include "common/dout.h"
+
+
+namespace dmc = crimson::dmclock;
+
+
+#define dout_context cct
+#define dout_subsys ceph_subsys_osd
+#undef dout_prefix
+#define dout_prefix *_dout
+
+
+namespace ceph {
+
+ mClockClientQueue::mclock_op_tags_t::mclock_op_tags_t(CephContext *cct) :
+ client_op(cct->_conf->osd_op_queue_mclock_client_op_res,
+ cct->_conf->osd_op_queue_mclock_client_op_wgt,
+ cct->_conf->osd_op_queue_mclock_client_op_lim),
+ osd_subop(cct->_conf->osd_op_queue_mclock_osd_subop_res,
+ cct->_conf->osd_op_queue_mclock_osd_subop_wgt,
+ cct->_conf->osd_op_queue_mclock_osd_subop_lim),
+ snaptrim(cct->_conf->osd_op_queue_mclock_snap_res,
+ cct->_conf->osd_op_queue_mclock_snap_wgt,
+ cct->_conf->osd_op_queue_mclock_snap_lim),
+ recov(cct->_conf->osd_op_queue_mclock_recov_res,
+ cct->_conf->osd_op_queue_mclock_recov_wgt,
+ cct->_conf->osd_op_queue_mclock_recov_lim),
+ scrub(cct->_conf->osd_op_queue_mclock_scrub_res,
+ cct->_conf->osd_op_queue_mclock_scrub_wgt,
+ cct->_conf->osd_op_queue_mclock_scrub_lim)
+ {
+ dout(20) <<
+ "mClockClientQueue settings:: " <<
+ "client_op:" << client_op <<
+ "; osd_subop:" << osd_subop <<
+ "; snaptrim:" << snaptrim <<
+ "; recov:" << recov <<
+ "; scrub:" << scrub <<
+ dendl;
+ }
+
+
+ dmc::ClientInfo
+ mClockClientQueue::op_class_client_info_f(
+ const mClockClientQueue::InnerClient& client)
+ {
+ switch(client.second) {
+ case osd_op_type_t::client_op:
+ return mclock_op_tags->client_op;
+ case osd_op_type_t::osd_subop:
+ return mclock_op_tags->osd_subop;
+ case osd_op_type_t::bg_snaptrim:
+ return mclock_op_tags->snaptrim;
+ case osd_op_type_t::bg_recovery:
+ return mclock_op_tags->recov;
+ case osd_op_type_t::bg_scrub:
+ return mclock_op_tags->scrub;
+ default:
+ assert(0);
+ return dmc::ClientInfo(-1, -1, -1);
+ }
+ }
+
+
+ /*
+ * class mClockClientQueue
+ */
+
+ std::unique_ptr<mClockClientQueue::mclock_op_tags_t>
+ mClockClientQueue::mclock_op_tags(nullptr);
+
+ mClockClientQueue::pg_queueable_visitor_t
+ mClockClientQueue::pg_queueable_visitor;
+
+ mClockClientQueue::mClockClientQueue(CephContext *cct) :
+ queue(&mClockClientQueue::op_class_client_info_f)
+ {
+ // manage the singleton
+ if (!mclock_op_tags) {
+ mclock_op_tags.reset(new mclock_op_tags_t(cct));
+ }
+ }
+
+ mClockClientQueue::osd_op_type_t
+ mClockClientQueue::get_osd_op_type(const Request& request) {
+ osd_op_type_t type =
+ boost::apply_visitor(pg_queueable_visitor, request.second.get_variant());
+
+ // if we got client_op back then we need to distinguish between
+ // a client op and an osd subop.
+
+ if (osd_op_type_t::client_op != type) {
+ return type;
+ } else if (MSG_OSD_SUBOP ==
+ boost::get<OpRequestRef>(
+ request.second.get_variant())->get_req()->get_header().type) {
+ return osd_op_type_t::osd_subop;
+ } else {
+ return osd_op_type_t::client_op;
+ }
+ }
+
+ mClockClientQueue::InnerClient
+ inline mClockClientQueue::get_inner_client(const Client& cl,
+ const Request& request) {
+ return InnerClient(cl, get_osd_op_type(request));
+ }
+
+ // Formatted output of the queue
+ inline void mClockClientQueue::dump(ceph::Formatter *f) const {
+ queue.dump(f);
+ }
+
+ inline void mClockClientQueue::enqueue_strict(Client cl,
+ unsigned priority,
+ Request item) {
+ queue.enqueue_strict(get_inner_client(cl, item), priority, item);
+ }
+
+ // Enqueue op in the front of the strict queue
+ inline void mClockClientQueue::enqueue_strict_front(Client cl,
+ unsigned priority,
+ Request item) {
+ queue.enqueue_strict_front(get_inner_client(cl, item), priority, item);
+ }
+
+ // Enqueue op in the back of the regular queue
+ inline void mClockClientQueue::enqueue(Client cl,
+ unsigned priority,
+ unsigned cost,
+ Request item) {
+ queue.enqueue(get_inner_client(cl, item), priority, cost, item);
+ }
+
+ // Enqueue the op in the front of the regular queue
+ inline void mClockClientQueue::enqueue_front(Client cl,
+ unsigned priority,
+ unsigned cost,
+ Request item) {
+ queue.enqueue_front(get_inner_client(cl, item), priority, cost, item);
+ }
+
+ // Return an op to be dispatched
+ inline Request mClockClientQueue::dequeue() {
+ return queue.dequeue();
+ }
+} // namespace ceph
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#pragma once
+
+#include <ostream>
+
+#include "boost/variant.hpp"
+
+#include "common/config.h"
+#include "common/ceph_context.h"
+#include "osd/PGQueueable.h"
+
+#include "common/mClockPriorityQueue.h"
+
+
+namespace ceph {
+
+ using Request = std::pair<spg_t, PGQueueable>;
+ using Client = entity_inst_t;
+
+
+  // This class exists to bridge the ceph code, which treats the class
+  // as the client, and the queue, where the class is a (client,
+  // osd_op_type_t) pair. So this adapter class will transform calls
+ // appropriately.
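+  //
+  // For example, a PGScrub request from client C is queued internally
+  // under the InnerClient (C, osd_op_type_t::bg_scrub); see
+  // get_inner_client() and pg_queueable_visitor_t below.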
+ class mClockClientQueue : public OpQueue<Request, Client> {
+
+ enum class osd_op_type_t {
+ client_op, osd_subop, bg_snaptrim, bg_recovery, bg_scrub };
+
+ using InnerClient = std::pair<entity_inst_t,osd_op_type_t>;
+
+ using queue_t = mClockQueue<Request, InnerClient>;
+
+ queue_t queue;
+
+ struct mclock_op_tags_t {
+ crimson::dmclock::ClientInfo client_op;
+ crimson::dmclock::ClientInfo osd_subop;
+ crimson::dmclock::ClientInfo snaptrim;
+ crimson::dmclock::ClientInfo recov;
+ crimson::dmclock::ClientInfo scrub;
+
+ mclock_op_tags_t(CephContext *cct);
+ };
+
+ static std::unique_ptr<mclock_op_tags_t> mclock_op_tags;
+
+ public:
+
+ mClockClientQueue(CephContext *cct);
+
+ static crimson::dmclock::ClientInfo
+ op_class_client_info_f(const InnerClient& client);
+
+ inline unsigned length() const override final {
+ return queue.length();
+ }
+
+ // Ops of this class should be deleted immediately
+ inline void remove_by_class(Client cl,
+ std::list<Request> *out) override final {
+ queue.remove_by_filter(
+ [&cl, out] (const Request& r) -> bool {
+ if (cl == r.second.get_owner()) {
+ if (out) out->push_front(r);
+ return true;
+ } else {
+ return false;
+ }
+ });
+ }
+
+ void enqueue_strict(Client cl,
+ unsigned priority,
+ Request item) override final;
+
+ // Enqueue op in the front of the strict queue
+ void enqueue_strict_front(Client cl,
+ unsigned priority,
+ Request item) override final;
+
+ // Enqueue op in the back of the regular queue
+ void enqueue(Client cl,
+ unsigned priority,
+ unsigned cost,
+ Request item) override final;
+
+ // Enqueue the op in the front of the regular queue
+ void enqueue_front(Client cl,
+ unsigned priority,
+ unsigned cost,
+ Request item) override final;
+
+ // Return an op to be dispatched
+ Request dequeue() override final;
+
+ // Returns if the queue is empty
+ inline bool empty() const override final {
+ return queue.empty();
+ }
+
+ // Formatted output of the queue
+ void dump(ceph::Formatter *f) const override final;
+
+ protected:
+
+ struct pg_queueable_visitor_t : public boost::static_visitor<osd_op_type_t> {
+ osd_op_type_t operator()(const OpRequestRef& o) const {
+ // can't tell from the variant alone whether this is a client op or
+ // an osd subop; get_osd_op_type() makes that distinction
+ return osd_op_type_t::client_op;
+ }
+
+ osd_op_type_t operator()(const PGSnapTrim& o) const {
+ return osd_op_type_t::bg_snaptrim;
+ }
+
+ osd_op_type_t operator()(const PGScrub& o) const {
+ return osd_op_type_t::bg_scrub;
+ }
+
+ osd_op_type_t operator()(const PGRecovery& o) const {
+ return osd_op_type_t::bg_recovery;
+ }
+ }; // struct pg_queueable_visitor_t
+
+ static pg_queueable_visitor_t pg_queueable_visitor;
+
+ osd_op_type_t get_osd_op_type(const Request& request);
+ InnerClient get_inner_client(const Client& cl, const Request& request);
+ }; // class mClockClientQueue
+
+} // namespace ceph
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#include <memory>
+
+#include "osd/mClockOpClassQueue.h"
+#include "common/dout.h"
+
+
+namespace dmc = crimson::dmclock;
+
+
+#define dout_context cct
+#define dout_subsys ceph_subsys_osd
+#undef dout_prefix
+#define dout_prefix *_dout
+
+
+namespace ceph {
+
+ mClockOpClassQueue::mclock_op_tags_t::mclock_op_tags_t(CephContext *cct) :
+ client_op(cct->_conf->osd_op_queue_mclock_client_op_res,
+ cct->_conf->osd_op_queue_mclock_client_op_wgt,
+ cct->_conf->osd_op_queue_mclock_client_op_lim),
+ osd_subop(cct->_conf->osd_op_queue_mclock_osd_subop_res,
+ cct->_conf->osd_op_queue_mclock_osd_subop_wgt,
+ cct->_conf->osd_op_queue_mclock_osd_subop_lim),
+ snaptrim(cct->_conf->osd_op_queue_mclock_snap_res,
+ cct->_conf->osd_op_queue_mclock_snap_wgt,
+ cct->_conf->osd_op_queue_mclock_snap_lim),
+ recov(cct->_conf->osd_op_queue_mclock_recov_res,
+ cct->_conf->osd_op_queue_mclock_recov_wgt,
+ cct->_conf->osd_op_queue_mclock_recov_lim),
+ scrub(cct->_conf->osd_op_queue_mclock_scrub_res,
+ cct->_conf->osd_op_queue_mclock_scrub_wgt,
+ cct->_conf->osd_op_queue_mclock_scrub_lim)
+ {
+ dout(20) <<
+ "mClockOpClassQueue settings:: " <<
+ "client_op:" << client_op <<
+ "; osd_subop:" << osd_subop <<
+ "; snaptrim:" << snaptrim <<
+ "; recov:" << recov <<
+ "; scrub:" << scrub <<
+ dendl;
+ }
+
+
+ dmc::ClientInfo
+ mClockOpClassQueue::op_class_client_info_f(const osd_op_type_t& op_type) {
+ switch(op_type) {
+ case osd_op_type_t::client_op:
+ return mclock_op_tags->client_op;
+ case osd_op_type_t::osd_subop:
+ return mclock_op_tags->osd_subop;
+ case osd_op_type_t::bg_snaptrim:
+ return mclock_op_tags->snaptrim;
+ case osd_op_type_t::bg_recovery:
+ return mclock_op_tags->recov;
+ case osd_op_type_t::bg_scrub:
+ return mclock_op_tags->scrub;
+ default:
+ assert(0);
+ return dmc::ClientInfo(-1, -1, -1);
+ }
+ }
+
+ /*
+ * class mClockOpClassQueue
+ */
+
+ std::unique_ptr<mClockOpClassQueue::mclock_op_tags_t>
+ mClockOpClassQueue::mclock_op_tags(nullptr);
+
+ mClockOpClassQueue::pg_queueable_visitor_t
+ mClockOpClassQueue::pg_queueable_visitor;
+
+ mClockOpClassQueue::mClockOpClassQueue(CephContext *cct) :
+ queue(&mClockOpClassQueue::op_class_client_info_f)
+ {
+ // manage the singleton
+ if (!mclock_op_tags) {
+ mclock_op_tags.reset(new mclock_op_tags_t(cct));
+ }
+ }
+
+ mClockOpClassQueue::osd_op_type_t
+ mClockOpClassQueue::get_osd_op_type(const Request& request) {
+ osd_op_type_t type =
+ boost::apply_visitor(pg_queueable_visitor, request.second.get_variant());
+
+ // if we got client_op back then we need to distinguish between
+ // a client op and an osd subop.
+
+ if (osd_op_type_t::client_op != type) {
+ return type;
+ } else if (MSG_OSD_SUBOP ==
+ boost::get<OpRequestRef>(
+ request.second.get_variant())->get_req()->get_header().type) {
+ return osd_op_type_t::osd_subop;
+ } else {
+ return osd_op_type_t::client_op;
+ }
+ }
+
+ // Formatted output of the queue
+ void mClockOpClassQueue::dump(ceph::Formatter *f) const {
+ queue.dump(f);
+ }
+
+} // namespace ceph
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#pragma once
+
+#include <ostream>
+
+#include "boost/variant.hpp"
+
+#include "common/config.h"
+#include "common/ceph_context.h"
+#include "osd/PGQueueable.h"
+
+#include "common/mClockPriorityQueue.h"
+
+
+namespace ceph {
+
+ using Request = std::pair<spg_t, PGQueueable>;
+ using Client = entity_inst_t;
+
+
+ // This class exists to bridge the ceph code, which treats the class
+ // as the client, and the queue, where the class is
+  // osd_op_type_t. So this adapter class will transform calls
+ // appropriately.
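+  //
+  // For example, a PGScrub request is queued internally under
+  // osd_op_type_t::bg_scrub regardless of which client issued it; see
+  // get_osd_op_type() and pg_queueable_visitor_t below.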
+ class mClockOpClassQueue : public OpQueue<Request, Client> {
+
+ enum class osd_op_type_t {
+ client_op, osd_subop, bg_snaptrim, bg_recovery, bg_scrub };
+
+ using queue_t = mClockQueue<Request, osd_op_type_t>;
+
+ queue_t queue;
+
+ struct mclock_op_tags_t {
+ crimson::dmclock::ClientInfo client_op;
+ crimson::dmclock::ClientInfo osd_subop;
+ crimson::dmclock::ClientInfo snaptrim;
+ crimson::dmclock::ClientInfo recov;
+ crimson::dmclock::ClientInfo scrub;
+
+ mclock_op_tags_t(CephContext *cct);
+ };
+
+ static std::unique_ptr<mclock_op_tags_t> mclock_op_tags;
+
+ public:
+
+ mClockOpClassQueue(CephContext *cct);
+
+ static crimson::dmclock::ClientInfo
+ op_class_client_info_f(const osd_op_type_t& op_type);
+
+ inline unsigned length() const override final {
+ return queue.length();
+ }
+
+ // Ops of this class should be deleted immediately
+ inline void remove_by_class(Client cl,
+ std::list<Request> *out) override final {
+ queue.remove_by_filter(
+ [&cl, out] (const Request& r) -> bool {
+ if (cl == r.second.get_owner()) {
+ if (out) out->push_front(r);
+ return true;
+ } else {
+ return false;
+ }
+ });
+ }
+
+ inline void enqueue_strict(Client cl,
+ unsigned priority,
+ Request item) override final {
+ queue.enqueue_strict(get_osd_op_type(item), priority, item);
+ }
+
+ // Enqueue op in the front of the strict queue
+ inline void enqueue_strict_front(Client cl,
+ unsigned priority,
+ Request item) override final {
+ queue.enqueue_strict_front(get_osd_op_type(item), priority, item);
+ }
+
+ // Enqueue op in the back of the regular queue
+ inline void enqueue(Client cl,
+ unsigned priority,
+ unsigned cost,
+ Request item) override final {
+ queue.enqueue(get_osd_op_type(item), priority, cost, item);
+ }
+
+ // Enqueue the op in the front of the regular queue
+ inline void enqueue_front(Client cl,
+ unsigned priority,
+ unsigned cost,
+ Request item) override final {
+ queue.enqueue_front(get_osd_op_type(item), priority, cost, item);
+ }
+
+ // Returns if the queue is empty
+ inline bool empty() const override final {
+ return queue.empty();
+ }
+
+ // Return an op to be dispatched
+ inline Request dequeue() override final {
+ return queue.dequeue();
+ }
+
+ // Formatted output of the queue
+ void dump(ceph::Formatter *f) const override final;
+
+ protected:
+
+ struct pg_queueable_visitor_t : public boost::static_visitor<osd_op_type_t> {
+ osd_op_type_t operator()(const OpRequestRef& o) const {
+ // can't tell from the variant alone whether this is a client op or
+ // an osd subop; get_osd_op_type() makes that distinction
+ return osd_op_type_t::client_op;
+ }
+
+ osd_op_type_t operator()(const PGSnapTrim& o) const {
+ return osd_op_type_t::bg_snaptrim;
+ }
+
+ osd_op_type_t operator()(const PGScrub& o) const {
+ return osd_op_type_t::bg_scrub;
+ }
+
+ osd_op_type_t operator()(const PGRecovery& o) const {
+ return osd_op_type_t::bg_recovery;
+ }
+ }; // struct pg_queueable_visitor_t
+
+ static pg_queueable_visitor_t pg_queueable_visitor;
+
+ osd_op_type_t get_osd_op_type(const Request& request);
+ }; // class mClockOpClassQueue
+
+} // namespace ceph
add_ceph_unittest(unittest_prioritized_queue ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/unittest_prioritized_queue)
target_link_libraries(unittest_prioritized_queue global ${BLKID_LIBRARIES})
+# unittest_mclock_priority_queue
+add_executable(unittest_mclock_priority_queue EXCLUDE_FROM_ALL
+ test_mclock_priority_queue.cc
+ )
+add_ceph_unittest(unittest_mclock_priority_queue ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/unittest_mclock_priority_queue)
+target_link_libraries(unittest_mclock_priority_queue global ${BLKID_LIBRARIES} dmclock)
+
# unittest_str_map
add_executable(unittest_str_map
test_str_map.cc
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <thread>
+#include <chrono>
+#include <iostream>
+#include "gtest/gtest.h"
+#include "common/mClockPriorityQueue.h"
+
+
+struct Request {
+ int value;
+ Request() = default;
+ Request(const Request& o) = default;
+ Request(int value) :
+ value(value)
+ {}
+};
+
+
+struct Client {
+ int client_num;
+ Client() :
+ Client(-1)
+ {}
+ Client(int client_num) :
+ client_num(client_num)
+ {}
+ friend bool operator<(const Client& r1, const Client& r2) {
+ return r1.client_num < r2.client_num;
+ }
+ friend bool operator==(const Client& r1, const Client& r2) {
+ return r1.client_num == r2.client_num;
+ }
+};
+
+
+crimson::dmclock::ClientInfo client_info_func(const Client& c) {
+ static const crimson::dmclock::ClientInfo
+ the_info(10.0, 10.0, 10.0);
+ return the_info;
+}
+
+
+TEST(mClockPriorityQueue, Create)
+{
+ ceph::mClockQueue<Request,Client> q(&client_info_func);
+}
+
+
+TEST(mClockPriorityQueue, Sizes)
+{
+ ceph::mClockQueue<Request,Client> q(&client_info_func);
+
+ ASSERT_TRUE(q.empty());
+ ASSERT_EQ(0u, q.length());
+
+ Client c1(1);
+ Client c2(2);
+
+ q.enqueue_strict(c1, 1, Request(1));
+ q.enqueue_strict(c2, 2, Request(2));
+ q.enqueue_strict(c1, 2, Request(3));
+ q.enqueue(c2, 1, 0, Request(4));
+ q.enqueue(c1, 2, 0, Request(5));
+ q.enqueue_strict(c2, 1, Request(6));
+
+ ASSERT_FALSE(q.empty());
+ ASSERT_EQ(6u, q.length());
+
+
+ for (int i = 0; i < 6; ++i) {
+ (void) q.dequeue();
+ }
+
+ ASSERT_TRUE(q.empty());
+ ASSERT_EQ(0u, q.length());
+}
+
+
+TEST(mClockPriorityQueue, JustStrict)
+{
+ ceph::mClockQueue<Request,Client> q(&client_info_func);
+
+ Client c1(1);
+ Client c2(2);
+
+ q.enqueue_strict(c1, 1, Request(1));
+ q.enqueue_strict(c2, 2, Request(2));
+ q.enqueue_strict(c1, 2, Request(3));
+ q.enqueue_strict(c2, 1, Request(4));
+
+ Request r;
+
+ r = q.dequeue();
+ ASSERT_EQ(2, r.value);
+ r = q.dequeue();
+ ASSERT_EQ(3, r.value);
+ r = q.dequeue();
+ ASSERT_EQ(1, r.value);
+ r = q.dequeue();
+ ASSERT_EQ(4, r.value);
+}
+
+
+TEST(mClockPriorityQueue, StrictPriorities)
+{
+ ceph::mClockQueue<Request,Client> q(&client_info_func);
+
+ Client c1(1);
+ Client c2(2);
+
+ q.enqueue_strict(c1, 1, Request(1));
+ q.enqueue_strict(c2, 2, Request(2));
+ q.enqueue_strict(c1, 3, Request(3));
+ q.enqueue_strict(c2, 4, Request(4));
+
+ Request r;
+
+ r = q.dequeue();
+ ASSERT_EQ(4, r.value);
+ r = q.dequeue();
+ ASSERT_EQ(3, r.value);
+ r = q.dequeue();
+ ASSERT_EQ(2, r.value);
+ r = q.dequeue();
+ ASSERT_EQ(1, r.value);
+}
+
+
+TEST(mClockPriorityQueue, JustNotStrict)
+{
+ ceph::mClockQueue<Request,Client> q(&client_info_func);
+
+ Client c1(1);
+ Client c2(2);
+
+  // non-strict queue ignores priorities, but will divide between
+  // clients evenly and maintain order within each client
+ q.enqueue(c1, 1, 0, Request(1));
+ q.enqueue(c1, 2, 0, Request(2));
+ q.enqueue(c2, 3, 0, Request(3));
+ q.enqueue(c2, 4, 0, Request(4));
+
+ Request r1, r2;
+
+ r1 = q.dequeue();
+ ASSERT_TRUE(1 == r1.value || 3 == r1.value);
+
+ r2 = q.dequeue();
+ ASSERT_TRUE(1 == r2.value || 3 == r2.value);
+
+ ASSERT_NE(r1.value, r2.value);
+
+ r1 = q.dequeue();
+ ASSERT_TRUE(2 == r1.value || 4 == r1.value);
+
+ r2 = q.dequeue();
+ ASSERT_TRUE(2 == r2.value || 4 == r2.value);
+
+ ASSERT_NE(r1.value, r2.value);
+}
+
+
+TEST(mClockPriorityQueue, EnqueueFront)
+{
+ ceph::mClockQueue<Request,Client> q(&client_info_func);
+
+ Client c1(1);
+ Client c2(2);
+
+  // non-strict queue ignores priorities, but will divide between
+  // clients evenly and maintain order within each client
+ q.enqueue(c1, 1, 0, Request(1));
+ q.enqueue(c1, 2, 0, Request(2));
+ q.enqueue(c2, 3, 0, Request(3));
+ q.enqueue(c2, 4, 0, Request(4));
+ q.enqueue_strict(c2, 6, Request(6));
+ q.enqueue_strict(c1, 7, Request(7));
+
+ std::list<Request> reqs;
+
+ for (uint i = 0; i < 4; ++i) {
+ reqs.emplace_back(q.dequeue());
+ }
+
+ for (uint i = 0; i < 4; ++i) {
+ Request& r = reqs.front();
+ if (r.value > 5) {
+ q.enqueue_strict_front(r.value == 6 ? c2 : c1, r.value, r);
+ } else {
+ q.enqueue_front(r.value <= 2 ? c1 : c2, r.value, 0, r);
+ }
+ reqs.pop_front();
+ }
+
+ Request r;
+
+ r = q.dequeue();
+ ASSERT_EQ(7, r.value);
+
+ r = q.dequeue();
+ ASSERT_EQ(6, r.value);
+
+ r = q.dequeue();
+ ASSERT_TRUE(1 == r.value || 3 == r.value);
+
+ r = q.dequeue();
+ ASSERT_TRUE(1 == r.value || 3 == r.value);
+
+ r = q.dequeue();
+ ASSERT_TRUE(2 == r.value || 4 == r.value);
+
+ r = q.dequeue();
+ ASSERT_TRUE(2 == r.value || 4 == r.value);
+}
+
+
+TEST(mClockPriorityQueue, RemoveByClass)
+{
+ ceph::mClockQueue<Request,Client> q(&client_info_func);
+
+ Client c1(1);
+ Client c2(2);
+ Client c3(3);
+
+ q.enqueue(c1, 1, 0, Request(1));
+ q.enqueue(c2, 1, 0, Request(2));
+ q.enqueue(c3, 1, 0, Request(4));
+ q.enqueue_strict(c1, 2, Request(8));
+ q.enqueue_strict(c2, 1, Request(16));
+ q.enqueue_strict(c3, 3, Request(32));
+ q.enqueue(c3, 1, 0, Request(64));
+ q.enqueue(c2, 1, 0, Request(128));
+ q.enqueue(c1, 1, 0, Request(256));
+
+ int out_mask = 2 | 16 | 128;
+ int in_mask = 1 | 8 | 256;
+
+ std::list<Request> out;
+ q.remove_by_class(c2, &out);
+
+ ASSERT_EQ(3u, out.size());
+ while (!out.empty()) {
+ ASSERT_TRUE((out.front().value & out_mask) > 0) <<
+ "had value that was not expected after first removal";
+ out.pop_front();
+ }
+
+ ASSERT_EQ(6u, q.length()) << "after removal of three from client c2";
+
+ q.remove_by_class(c3);
+
+ ASSERT_EQ(3u, q.length()) << "after removal of three from client c3";
+ while (!q.empty()) {
+ Request r = q.dequeue();
+ ASSERT_TRUE((r.value & in_mask) > 0) <<
+ "had value that was not expected after two removals";
+ }
+}
+
+
+TEST(mClockPriorityQueue, RemoveByFilter)
+{
+ ceph::mClockQueue<Request,Client> q(&client_info_func);
+
+ Client c1(1);
+ Client c2(2);
+ Client c3(3);
+
+ q.enqueue(c1, 1, 0, Request(1));
+ q.enqueue(c2, 1, 0, Request(2));
+ q.enqueue(c3, 1, 0, Request(3));
+ q.enqueue_strict(c1, 2, Request(4));
+ q.enqueue_strict(c2, 1, Request(5));
+ q.enqueue_strict(c3, 3, Request(6));
+ q.enqueue(c3, 1, 0, Request(7));
+ q.enqueue(c2, 1, 0, Request(8));
+ q.enqueue(c1, 1, 0, Request(9));
+
+ std::list<Request> filtered;
+
+ q.remove_by_filter([&](const Request& r) -> bool {
+ if (r.value & 2) {
+ filtered.push_back(r);
+ return true;
+ } else {
+ return false;
+ }
+ });
+
+ ASSERT_EQ(4u, filtered.size()) <<
+ "filter should have removed four elements";
+ while (!filtered.empty()) {
+ ASSERT_TRUE((filtered.front().value & 2) > 0) <<
+ "expect this value to have been filtered out";
+ filtered.pop_front();
+ }
+
+ ASSERT_EQ(5u, q.length()) <<
+ "filter should have left five remaining elements";
+ while (!q.empty()) {
+ Request r = q.dequeue();
+ ASSERT_TRUE((r.value & 2) == 0) <<
+ "expect this value to have been left in";
+ }
+}
}
}
-template <typename T>
-struct Greater {
- const T rhs;
- std::list<T> *removed;
- explicit Greater(const T& v, std::list<T> *removed) : rhs(v), removed(removed)
- {}
- bool operator()(const T& lhs) {
- if (lhs > rhs) {
- if (removed)
- removed->push_back(lhs);
- return true;
- } else {
- return false;
- }
- }
-};
-
-TEST_F(PrioritizedQueueTest, remove_by_filter) {
- const unsigned min_cost = 1;
- const unsigned max_tokens_per_subqueue = 50;
- PQ pq(max_tokens_per_subqueue, min_cost);
-
- Greater<Item> pred(item_size/2, nullptr);
- unsigned num_to_remove = 0;
- for (unsigned i = 0; i < item_size; i++) {
- const Item& item = items[i];
- pq.enqueue(Klass(1), 0, 10, item);
- if (pred(item)) {
- num_to_remove++;
- }
- }
- std::list<Item> removed;
- Greater<Item> pred2(item_size/2, &removed);
- pq.remove_by_filter(pred2);
-
- // see if the removed items are expected ones.
- for (std::list<Item>::iterator it = removed.begin();
- it != removed.end();
- ++it) {
- const Item& item = *it;
- EXPECT_TRUE(pred(item));
- items.erase(remove(items.begin(), items.end(), item), items.end());
- }
- EXPECT_EQ(num_to_remove, removed.size());
- EXPECT_EQ(item_size - num_to_remove, pq.length());
- EXPECT_EQ(item_size - num_to_remove, items.size());
- // see if the remainder are expeceted also.
- while (!pq.empty()) {
- const Item item = pq.dequeue();
- EXPECT_FALSE(pred(item));
- items.erase(remove(items.begin(), items.end(), item), items.end());
- }
- EXPECT_TRUE(items.empty());
-}
TEST_F(PrioritizedQueueTest, remove_by_class) {
const unsigned min_cost = 1;
test_queue(rand() % 500 + 500, true);
}
-template <typename T>
-struct Greater {
- const T rhs;
- std::list<T> *removed;
- Greater(const T &v, std::list<T> *removed) : rhs(v), removed(removed) {}
- bool operator()(const T &lhs) {
- if (std::get<2>(lhs) > std::get<2>(rhs)) {
- if (removed)
- removed->push_back(lhs);
- return true;
- } else {
- return false;
- }
- }
-};
-
-TEST_F(WeightedPriorityQueueTest, wpq_test_remove_by_filter_null) {
- WQ wq(0, 0);
- LQ strictq, normq;
- unsigned num_items = 100;
- fill_queue(wq, strictq, normq, num_items);
- // Pick a value that we didn't enqueue
- Removed wq_removed;
- Greater<Item> pred(std::make_tuple(0, 0, 1 << 17), &wq_removed);
- wq.remove_by_filter(pred);
- EXPECT_EQ(0u, wq_removed.size());
-}
-
-TEST_F(WeightedPriorityQueueTest, wpq_test_remove_by_filter) {
- WQ wq(0, 0);
- LQ strictq, normq;
- unsigned num_items = 1000;
- fill_queue(wq, strictq, normq, num_items);
- Greater<Item> pred2(std::make_tuple(0, 0, (1 << 16) - (1 << 16)/10), nullptr);
- Removed r_strictq, r_normq;
- unsigned num_to_remove = 0;
- // Figure out from what has been queued what we
- // expect to be removed
- for (LQ::iterator pi = strictq.begin();
- pi != strictq.end(); ++pi) {
- for (KlassItem::iterator ki = pi->second.begin();
- ki != pi->second.end(); ++ki) {
- for (ItemList::iterator li = ki->second.begin();
- li != ki->second.end(); ++li) {
- if (pred2(li->second)) {
- ++num_to_remove;
- }
- }
- }
- }
- for (LQ::iterator pi = normq.begin();
- pi != normq.end(); ++pi) {
- for (KlassItem::iterator ki = pi->second.begin();
- ki != pi->second.end(); ++ki) {
- for (ItemList::iterator li = ki->second.begin();
- li != ki->second.end(); ++li) {
- if (pred2(li->second)) {
- ++num_to_remove;
- }
- }
- }
- }
- Removed wq_removed;
- Greater<Item> pred(
- std::make_tuple(0, 0, (1 << 16) - (1 << 16)/10),
- &wq_removed);
- wq.remove_by_filter(pred);
- // Check that what was removed was correct
- for (Removed::iterator it = wq_removed.begin();
- it != wq_removed.end(); ++it) {
- EXPECT_TRUE(pred2(*it));
- }
- EXPECT_EQ(num_to_remove, wq_removed.size());
- EXPECT_EQ(num_items - num_to_remove, wq.length());
- // Make sure that none were missed
- while (!(wq.empty())) {
- EXPECT_FALSE(pred(wq.dequeue()));
- }
-}
-
TEST_F(WeightedPriorityQueueTest, wpq_test_remove_by_class_null) {
WQ wq(0, 0);
LQ strictq, normq;
)
add_ceph_unittest(unittest_ec_transaction ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/unittest_ec_transaction)
target_link_libraries(unittest_ec_transaction osd global ${BLKID_LIBRARIES})
+
+# unittest_mclock_op_class_queue
+add_executable(unittest_mclock_op_class_queue
+ TestMClockOpClassQueue.cc
+)
+add_ceph_unittest(unittest_mclock_op_class_queue
+ ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/unittest_mclock_op_class_queue
+)
+target_link_libraries(unittest_mclock_op_class_queue
+ global osd dmclock
+)
+
+# unittest_mclock_client_queue
+add_executable(unittest_mclock_client_queue
+ TestMClockClientQueue.cc
+)
+add_ceph_unittest(unittest_mclock_client_queue
+ ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/unittest_mclock_client_queue
+)
+target_link_libraries(unittest_mclock_client_queue
+ global osd dmclock
+)
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#include <iostream>
+
+#include "gtest/gtest.h"
+#include "global/global_init.h"
+#include "common/common_init.h"
+
+#include "osd/mClockClientQueue.h"
+
+using namespace ceph;
+
+int main(int argc, char **argv) {
+ std::vector<const char*> args(argv, argv+argc);
+ auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_OSD,
+ CODE_ENVIRONMENT_UTILITY,
+ CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
+ common_init_finish(g_ceph_context);
+
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
+
+
+class MClockClientQueueTest : public testing::Test {
+public:
+ mClockClientQueue q;
+
+ entity_inst_t client1;
+ entity_inst_t client2;
+ entity_inst_t client3;
+
+ MClockClientQueueTest() :
+ q(g_ceph_context),
+ client1(entity_name_t(CEPH_ENTITY_TYPE_OSD, 1), entity_addr_t()),
+ client2(entity_name_t(CEPH_ENTITY_TYPE_OSD, 2), entity_addr_t()),
+ client3(entity_name_t(CEPH_ENTITY_TYPE_CLIENT, 1), entity_addr_t())
+ {}
+
+#if 0 // more work needed here
+ Request create_client_op(epoch_t e, const entity_inst_t& owner) {
+ return Request(spg_t(), PGQueueable(OpRequestRef(), e));
+ }
+#endif
+
+ Request create_snaptrim(epoch_t e, const entity_inst_t& owner) {
+ return Request(spg_t(),
+ PGQueueable(PGSnapTrim(e),
+ 12, 12,
+ utime_t(), owner, e));
+ }
+
+ Request create_scrub(epoch_t e, const entity_inst_t& owner) {
+ return Request(spg_t(),
+ PGQueueable(PGScrub(e),
+ 12, 12,
+ utime_t(), owner, e));
+ }
+
+ Request create_recovery(epoch_t e, const entity_inst_t& owner) {
+ return Request(spg_t(),
+ PGQueueable(PGRecovery(e, 64),
+ 12, 12,
+ utime_t(), owner, e));
+ }
+};
+
+
+TEST_F(MClockClientQueueTest, TestSize) {
+ ASSERT_TRUE(q.empty());
+ ASSERT_EQ(0u, q.length());
+
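+  // Enqueue five snap-trim requests: three through the normal mclock path, two strict.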
+ q.enqueue(client1, 12, 0, create_snaptrim(100, client1));
+ q.enqueue_strict(client2, 12, create_snaptrim(101, client2));
+ q.enqueue(client2, 12, 0, create_snaptrim(102, client2));
+ q.enqueue_strict(client3, 12, create_snaptrim(103, client3));
+ q.enqueue(client1, 12, 0, create_snaptrim(104, client1));
+
+ ASSERT_FALSE(q.empty());
+ ASSERT_EQ(5u, q.length());
+
+ std::list<Request> reqs;
+
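+  // Pull three requests off the queue.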
+ reqs.push_back(q.dequeue());
+ reqs.push_back(q.dequeue());
+ reqs.push_back(q.dequeue());
+
+ ASSERT_FALSE(q.empty());
+ ASSERT_EQ(2u, q.length());
+
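+  // Push them back at the front: one via enqueue_front, two via enqueue_strict_front.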
+ q.enqueue_front(client2, 12, 0, reqs.back());
+ reqs.pop_back();
+
+ q.enqueue_strict_front(client3, 12, reqs.back());
+ reqs.pop_back();
+
+ q.enqueue_strict_front(client2, 12, reqs.back());
+ reqs.pop_back();
+
+ ASSERT_FALSE(q.empty());
+ ASSERT_EQ(5u, q.length());
+
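+  // Drain the queue completely.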
+ for (int i = 0; i < 5; ++i) {
+ (void) q.dequeue();
+ }
+
+ ASSERT_TRUE(q.empty());
+ ASSERT_EQ(0u, q.length());
+}
+
+
+TEST_F(MClockClientQueueTest, TestEnqueue) {
+ q.enqueue(client1, 12, 0, create_snaptrim(100, client1));
+ q.enqueue(client2, 12, 0, create_snaptrim(101, client2));
+ q.enqueue(client2, 12, 0, create_snaptrim(102, client2));
+ q.enqueue(client3, 12, 0, create_snaptrim(103, client3));
+ q.enqueue(client1, 12, 0, create_snaptrim(104, client1));
+
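+  // Per-client scheduling: expect one request from each client first;
+  // the remaining two may come out in either order.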
+ Request r = q.dequeue();
+ ASSERT_EQ(100u, r.second.get_map_epoch());
+
+ r = q.dequeue();
+ ASSERT_EQ(101u, r.second.get_map_epoch());
+
+ r = q.dequeue();
+ ASSERT_EQ(103u, r.second.get_map_epoch());
+
+ r = q.dequeue();
+ ASSERT_TRUE(r.second.get_map_epoch() == 102u ||
+ r.second.get_map_epoch() == 104u);
+
+ r = q.dequeue();
+ ASSERT_TRUE(r.second.get_map_epoch() == 102u ||
+ r.second.get_map_epoch() == 104u);
+}
+
+
+TEST_F(MClockClientQueueTest, TestEnqueueStrict) {
+ q.enqueue_strict(client1, 12, create_snaptrim(100, client1));
+ q.enqueue_strict(client2, 13, create_snaptrim(101, client2));
+ q.enqueue_strict(client2, 16, create_snaptrim(102, client2));
+ q.enqueue_strict(client3, 14, create_snaptrim(103, client3));
+ q.enqueue_strict(client1, 15, create_snaptrim(104, client1));
+
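+  // Strict items dequeue in descending priority order: 16, 15, 14, 13, 12.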
+ Request r = q.dequeue();
+ ASSERT_EQ(102u, r.second.get_map_epoch());
+
+ r = q.dequeue();
+ ASSERT_EQ(104u, r.second.get_map_epoch());
+
+ r = q.dequeue();
+ ASSERT_EQ(103u, r.second.get_map_epoch());
+
+ r = q.dequeue();
+ ASSERT_EQ(101u, r.second.get_map_epoch());
+
+ r = q.dequeue();
+ ASSERT_EQ(100u, r.second.get_map_epoch());
+}
+
+
+TEST_F(MClockClientQueueTest, TestRemoveByClass) {
+ q.enqueue(client1, 12, 0, create_snaptrim(100, client1));
+ q.enqueue_strict(client2, 12, create_snaptrim(101, client2));
+ q.enqueue(client2, 12, 0, create_snaptrim(102, client2));
+ q.enqueue_strict(client3, 12, create_snaptrim(103, client3));
+ q.enqueue(client1, 12, 0, create_snaptrim(104, client1));
+
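+  // Removing client2 should pull out both of its requests (epochs 101 and 102).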
+ std::list<Request> filtered_out;
+ q.remove_by_class(client2, &filtered_out);
+
+ ASSERT_EQ(2u, filtered_out.size());
+ while (!filtered_out.empty()) {
+    auto e = filtered_out.front().second.get_map_epoch();
+ ASSERT_TRUE(e == 101 || e == 102);
+ filtered_out.pop_front();
+ }
+
+ ASSERT_EQ(3u, q.length());
+ Request r = q.dequeue();
+ ASSERT_EQ(103u, r.second.get_map_epoch());
+
+ r = q.dequeue();
+ ASSERT_EQ(100u, r.second.get_map_epoch());
+
+ r = q.dequeue();
+ ASSERT_EQ(104u, r.second.get_map_epoch());
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#include <iostream>
+
+#include "gtest/gtest.h"
+
+#include "global/global_context.h"
+#include "global/global_init.h"
+#include "common/common_init.h"
+
+#include "osd/mClockOpClassQueue.h"
+
+
+int main(int argc, char **argv) {
+ std::vector<const char*> args(argv, argv+argc);
+ auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_OSD,
+ CODE_ENVIRONMENT_UTILITY,
+ CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
+ common_init_finish(g_ceph_context);
+
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
+
+
+class MClockOpClassQueueTest : public testing::Test {
+public:
+ mClockOpClassQueue q;
+
+ entity_inst_t client1;
+ entity_inst_t client2;
+ entity_inst_t client3;
+
+ MClockOpClassQueueTest() :
+ q(g_ceph_context),
+ client1(entity_name_t(CEPH_ENTITY_TYPE_OSD, 1), entity_addr_t()),
+ client2(entity_name_t(CEPH_ENTITY_TYPE_OSD, 2), entity_addr_t()),
+ client3(entity_name_t(CEPH_ENTITY_TYPE_CLIENT, 1), entity_addr_t())
+ {}
+
+#if 0 // more work needed here
+ Request create_client_op(epoch_t e, const entity_inst_t& owner) {
+ return Request(spg_t(), PGQueueable(OpRequestRef(), e));
+ }
+#endif
+
+ Request create_snaptrim(epoch_t e, const entity_inst_t& owner) {
+ return Request(spg_t(),
+ PGQueueable(PGSnapTrim(e),
+ 12, 12,
+ utime_t(), owner, e));
+ }
+
+ Request create_scrub(epoch_t e, const entity_inst_t& owner) {
+ return Request(spg_t(),
+ PGQueueable(PGScrub(e),
+ 12, 12,
+ utime_t(), owner, e));
+ }
+
+ Request create_recovery(epoch_t e, const entity_inst_t& owner) {
+ return Request(spg_t(),
+ PGQueueable(PGRecovery(e, 64),
+ 12, 12,
+ utime_t(), owner, e));
+ }
+};
+
+
+TEST_F(MClockOpClassQueueTest, TestSize) {
+ ASSERT_TRUE(q.empty());
+ ASSERT_EQ(0u, q.length());
+
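+  // Enqueue five snap-trim requests: three normal, two strict.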
+ q.enqueue(client1, 12, 0, create_snaptrim(100, client1));
+ q.enqueue_strict(client2, 12, create_snaptrim(101, client2));
+ q.enqueue(client2, 12, 0, create_snaptrim(102, client2));
+ q.enqueue_strict(client3, 12, create_snaptrim(103, client3));
+ q.enqueue(client1, 12, 0, create_snaptrim(104, client1));
+
+ ASSERT_FALSE(q.empty());
+ ASSERT_EQ(5u, q.length());
+
+ std::list<Request> reqs;
+
+ reqs.push_back(q.dequeue());
+ reqs.push_back(q.dequeue());
+ reqs.push_back(q.dequeue());
+
+ ASSERT_FALSE(q.empty());
+ ASSERT_EQ(2u, q.length());
+
+ q.enqueue_front(client2, 12, 0, reqs.back());
+ reqs.pop_back();
+
+ q.enqueue_strict_front(client3, 12, reqs.back());
+ reqs.pop_back();
+
+ q.enqueue_strict_front(client2, 12, reqs.back());
+ reqs.pop_back();
+
+ ASSERT_FALSE(q.empty());
+ ASSERT_EQ(5u, q.length());
+
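+  // Drain the queue completely.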
+ for (int i = 0; i < 5; ++i) {
+ (void) q.dequeue();
+ }
+
+ ASSERT_TRUE(q.empty());
+ ASSERT_EQ(0u, q.length());
+}
+
+
+TEST_F(MClockOpClassQueueTest, TestEnqueue) {
+ q.enqueue(client1, 12, 0, create_snaptrim(100, client1));
+ q.enqueue(client2, 12, 0, create_snaptrim(101, client2));
+ q.enqueue(client2, 12, 0, create_snaptrim(102, client2));
+ q.enqueue(client3, 12, 0, create_snaptrim(103, client3));
+ q.enqueue(client1, 12, 0, create_snaptrim(104, client1));
+
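+  // All requests map to the same op class (snap trim), so they are
+  // expected back in enqueue order.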
+ Request r = q.dequeue();
+ ASSERT_EQ(100u, r.second.get_map_epoch());
+
+ r = q.dequeue();
+ ASSERT_EQ(101u, r.second.get_map_epoch());
+
+ r = q.dequeue();
+ ASSERT_EQ(102u, r.second.get_map_epoch());
+
+ r = q.dequeue();
+ ASSERT_EQ(103u, r.second.get_map_epoch());
+
+ r = q.dequeue();
+ ASSERT_EQ(104u, r.second.get_map_epoch());
+}
+
+
+TEST_F(MClockOpClassQueueTest, TestEnqueueStrict) {
+ q.enqueue_strict(client1, 12, create_snaptrim(100, client1));
+ q.enqueue_strict(client2, 13, create_snaptrim(101, client2));
+ q.enqueue_strict(client2, 16, create_snaptrim(102, client2));
+ q.enqueue_strict(client3, 14, create_snaptrim(103, client3));
+ q.enqueue_strict(client1, 15, create_snaptrim(104, client1));
+
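+  // Strict items dequeue in descending priority order: 16, 15, 14, 13, 12.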
+ Request r = q.dequeue();
+ ASSERT_EQ(102u, r.second.get_map_epoch());
+
+ r = q.dequeue();
+ ASSERT_EQ(104u, r.second.get_map_epoch());
+
+ r = q.dequeue();
+ ASSERT_EQ(103u, r.second.get_map_epoch());
+
+ r = q.dequeue();
+ ASSERT_EQ(101u, r.second.get_map_epoch());
+
+ r = q.dequeue();
+ ASSERT_EQ(100u, r.second.get_map_epoch());
+}
+
+
+TEST_F(MClockOpClassQueueTest, TestRemoveByClass) {
+ q.enqueue(client1, 12, 0, create_snaptrim(100, client1));
+ q.enqueue_strict(client2, 12, create_snaptrim(101, client2));
+ q.enqueue(client2, 12, 0, create_snaptrim(102, client2));
+ q.enqueue_strict(client3, 12, create_snaptrim(103, client3));
+ q.enqueue(client1, 12, 0, create_snaptrim(104, client1));
+
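+  // Removing client2 should pull out both of its requests (epochs 101 and 102).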
+ std::list<Request> filtered_out;
+ q.remove_by_class(client2, &filtered_out);
+
+ ASSERT_EQ(2u, filtered_out.size());
+ while (!filtered_out.empty()) {
+    auto e = filtered_out.front().second.get_map_epoch();
+ ASSERT_TRUE(e == 101 || e == 102);
+ filtered_out.pop_front();
+ }
+
+ ASSERT_EQ(3u, q.length());
+ Request r = q.dequeue();
+ ASSERT_EQ(103u, r.second.get_map_epoch());
+
+ r = q.dequeue();
+ ASSERT_EQ(100u, r.second.get_map_epoch());
+
+ r = q.dequeue();
+ ASSERT_EQ(104u, r.second.get_map_epoch());
+}