From 33ea286d0cedf44b7b3c70bc9f35c7f88fcc0361 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 19 Sep 2017 15:25:05 -0500 Subject: [PATCH] common/AsyncReserver: support preemption If an (optional) preemption context is provided, use that to preempt an existing reservation and grant a higher-priority one. Signed-off-by: Sage Weil (cherry picked from commit dbc002eaa90e952df1acf295a630443ac3ada418) --- src/common/AsyncReserver.h | 157 ++++++++++++++++++++++++++++--------- src/common/subsys.h | 1 + src/vstart.sh | 1 + 3 files changed, 124 insertions(+), 35 deletions(-) diff --git a/src/common/AsyncReserver.h b/src/common/AsyncReserver.h index 49a7dc48ddf26..d5c7a852ddf14 100644 --- a/src/common/AsyncReserver.h +++ b/src/common/AsyncReserver.h @@ -18,6 +18,8 @@ #include "common/Finisher.h" #include "common/Formatter.h" +#define rdout(x) lgeneric_subdout(cct,reserver,x) + /** * Manages a configurable number of asyncronous reservations. * @@ -33,24 +35,87 @@ class AsyncReserver { unsigned min_priority; Mutex lock; - map<unsigned, list<pair<T, Context*> > > queues; - map<T, pair<unsigned, typename list<pair<T, Context*> >::iterator > > queue_pointers; - set<T> in_progress; + struct Reservation { + T item; + unsigned prio = 0; + Context *grant = 0; + Context *preempt = 0; + Reservation() {} + Reservation(T i, unsigned pr, Context *g, Context *p = 0) + : item(i), prio(pr), grant(g), preempt(p) {} + void dump(Formatter *f) const { + f->dump_stream("item") << item; + f->dump_unsigned("prio", prio); + f->dump_bool("can_preempt", !!preempt); + } + friend ostream& operator<<(ostream& out, const Reservation& r) { + return out << r.item << "(prio " << r.prio << " grant " << r.grant + << " preempt " << r.preempt << ")"; + } + }; + + map<unsigned, list<Reservation>> queues; + map<T, pair<unsigned, typename list<Reservation>::iterator>> queue_pointers; + map<T, Reservation> in_progress; + set<pair<unsigned, T>> preempt_by_prio; ///< in_progress that can be preempted + + void preempt_one() { + assert(!preempt_by_prio.empty()); + auto q = in_progress.find(preempt_by_prio.begin()->second); + assert(q != in_progress.end()); + Reservation victim = q->second; + rdout(10) << 
__func__ << " preempt " << victim << dendl; + f->queue(victim.preempt); + victim.preempt = nullptr; + in_progress.erase(q); + preempt_by_prio.erase(preempt_by_prio.begin()); + } void do_queues() { - typename map<unsigned, list<pair<T, Context*> > >::reverse_iterator it; - for (it = queues.rbegin(); - it != queues.rend() && - in_progress.size() < max_allowed && - it->first >= min_priority; - ++it) { - while (in_progress.size() < max_allowed && - !it->second.empty()) { - pair<T, Context*> p = it->second.front(); - queue_pointers.erase(p.first); - it->second.pop_front(); - f->queue(p.second); - in_progress.insert(p.first); + rdout(20) << __func__ << ":\n"; + JSONFormatter jf(true); + jf.open_object_section("queue"); + _dump(&jf); + jf.close_section(); + jf.flush(*_dout); + *_dout << dendl; + + // in case min_priority was adjusted up or max_allowed was adjusted down + while (!preempt_by_prio.empty() && + (in_progress.size() > max_allowed || + preempt_by_prio.begin()->first < min_priority)) { + preempt_one(); + } + + while (!queues.empty()) { + // choose highest priority queue + auto it = queues.end(); + --it; + assert(!it->second.empty()); + if (it->first < min_priority) { + break; + } + if (in_progress.size() >= max_allowed && + !preempt_by_prio.empty() && + it->first > preempt_by_prio.begin()->first) { + preempt_one(); + } + if (in_progress.size() >= max_allowed) { + break; // no room + } + // grant + Reservation p = it->second.front(); + rdout(10) << __func__ << " grant " << p << dendl; + queue_pointers.erase(p.item); + it->second.pop_front(); + if (it->second.empty()) { + queues.erase(it); + } + f->queue(p.grant); + p.grant = nullptr; + in_progress[p.item] = p; + if (p.preempt) { + preempt_by_prio.insert(make_pair(p.prio, p.item)); } } } @@ -80,27 +145,26 @@ public: void dump(Formatter *f) { Mutex::Locker l(lock); + _dump(f); + } + void _dump(Formatter *f) { f->dump_unsigned("max_allowed", max_allowed); f->dump_unsigned("min_priority", min_priority); f->open_array_section("queues"); - for (typename map<unsigned, list<pair<T, Context*> > > 
::const_iterator p = - queues.begin(); p != queues.end(); ++p) { + for (auto& p : queues) { f->open_object_section("queue"); - f->dump_unsigned("priority", p->first); + f->dump_unsigned("priority", p.first); f->open_array_section("items"); - for (typename list<pair<T, Context*> >::const_iterator q = - p->second.begin(); q != p->second.end(); ++q) { - f->dump_stream("item") << q->first; + for (auto& q : p.second) { + f->dump_object("item", q); } f->close_section(); f->close_section(); } f->close_section(); f->open_array_section("in_progress"); - for (typename set<T>::const_iterator p = in_progress.begin(); - p != in_progress.end(); - ++p) { - f->dump_stream("item") << *p; + for (auto& p : in_progress) { + f->dump_object("item", p.second); } f->close_section(); } @@ -116,13 +180,17 @@ public: void request_reservation( T item, ///< [in] reservation key Context *on_reserved, ///< [in] callback to be called on reservation - unsigned prio + unsigned prio, ///< [in] priority + Context *on_preempt = 0 ///< [in] callback to be called if we are preempted (optional) ) { Mutex::Locker l(lock); + Reservation r(item, prio, on_reserved, on_preempt); + rdout(10) << __func__ << " queue " << r << dendl; assert(!queue_pointers.count(item) && !in_progress.count(item)); - queues[prio].push_back(make_pair(item, on_reserved)); - queue_pointers.insert(make_pair(item, make_pair(prio,--(queues[prio]).end()))); + queues[prio].push_back(r); + queue_pointers.insert(make_pair(item, + make_pair(prio,--(queues[prio]).end()))); do_queues(); } @@ -137,13 +205,31 @@ public: T item ///< [in] key for reservation to cancel ) { Mutex::Locker l(lock); - if (queue_pointers.count(item)) { - unsigned prio = queue_pointers[item].first; - delete queue_pointers[item].second->second; - queues[prio].erase(queue_pointers[item].second); - queue_pointers.erase(item); + auto i = queue_pointers.find(item); + if (i != queue_pointers.end()) { + unsigned prio = i->second.first; + const Reservation& r = *i->second.second; + rdout(10) << 
__func__ << " cancel " << r << " (was queued)" << dendl; + delete r.grant; + delete r.preempt; + queues[prio].erase(i->second.second); + if (queues[prio].empty()) { + queues.erase(prio); + } + queue_pointers.erase(i); } else { - in_progress.erase(item); + auto p = in_progress.find(item); + if (p != in_progress.end()) { + rdout(10) << __func__ << " cancel " << p->second + << " (was in progress)" << dendl; + if (p->second.preempt) { + preempt_by_prio.erase(make_pair(p->second.prio, p->second.item)); + delete p->second.preempt; + } + in_progress.erase(p); + } else { + rdout(10) << __func__ << " cancel " << item << " (not found)" << dendl; + } } do_queues(); } @@ -160,4 +246,5 @@ public: static const unsigned MAX_PRIORITY = (unsigned)-1; }; +#undef rdout #endif diff --git a/src/common/subsys.h b/src/common/subsys.h index 105ca99b7e969..6e6c26fa78f0b 100644 --- a/src/common/subsys.h +++ b/src/common/subsys.h @@ -53,6 +53,7 @@ SUBSYS(tp, 0, 5) SUBSYS(auth, 1, 5) SUBSYS(crypto, 1, 5) SUBSYS(finisher, 1, 1) +SUBSYS(reserver, 1, 1) SUBSYS(heartbeatmap, 1, 5) SUBSYS(perfcounter, 1, 5) SUBSYS(rgw, 1, 5) // log level for the Rados gateway diff --git a/src/vstart.sh b/src/vstart.sh index 82ec76bf94ee4..4269e55da0073 100755 --- a/src/vstart.sh +++ b/src/vstart.sh @@ -776,6 +776,7 @@ else debug rocksdb = 10 debug bdev = 20 debug rgw = 20 + debug reserver = 10 debug objclass = 20' CMDSDEBUG=' debug ms = 1 -- 2.39.5