]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
common/AsyncReserver: support preemption
authorSage Weil <sage@redhat.com>
Tue, 19 Sep 2017 20:25:05 +0000 (15:25 -0500)
committerSage Weil <sage@redhat.com>
Fri, 29 Sep 2017 16:53:59 +0000 (11:53 -0500)
If an (optional) preemption context is provided, use that to preempt
and existing reservation and grant a higher-priority one.

Signed-off-by: Sage Weil <sage@redhat.com>
(cherry picked from commit dbc002eaa90e952df1acf295a630443ac3ada418)

src/common/AsyncReserver.h
src/common/subsys.h
src/vstart.sh

index 49a7dc48ddf26d21a690962561d63f36aa031f96..d5c7a852ddf145f682d92c1ce0a442264e615563 100644 (file)
@@ -18,6 +18,8 @@
 #include "common/Finisher.h"
 #include "common/Formatter.h"
 
+#define rdout(x) lgeneric_subdout(cct,reserver,x)
+
 /**
  * Manages a configurable number of asyncronous reservations.
  *
@@ -33,24 +35,87 @@ class AsyncReserver {
   unsigned min_priority;
   Mutex lock;
 
-  map<unsigned, list<pair<T, Context*> > > queues;
-  map<T, pair<unsigned, typename list<pair<T, Context*> >::iterator > > queue_pointers;
-  set<T> in_progress;
+  struct Reservation {
+    T item;
+    unsigned prio = 0;
+    Context *grant = 0;
+    Context *preempt = 0;
+    Reservation() {}
+    Reservation(T i, unsigned pr, Context *g, Context *p = 0)
+      : item(i), prio(pr), grant(g), preempt(p) {}
+    void dump(Formatter *f) const {
+      f->dump_stream("item") << item;
+      f->dump_unsigned("prio", prio);
+      f->dump_bool("can_preempt", !!preempt);
+    }
+    friend ostream& operator<<(ostream& out, const Reservation& r) {
+      return out << r.item << "(prio " << r.prio << " grant " << r.grant
+                << " preempt " << r.preempt << ")";
+    }
+  };
+
+  map<unsigned, list<Reservation>> queues;
+  map<T, pair<unsigned, typename list<Reservation>::iterator>> queue_pointers;
+  map<T,Reservation> in_progress;
+  set<pair<unsigned,T>> preempt_by_prio;  ///< in_progress that can be preempted
+
+  void preempt_one() {
+    assert(!preempt_by_prio.empty());
+    auto q = in_progress.find(preempt_by_prio.begin()->second);
+    assert(q != in_progress.end());
+    Reservation victim = q->second;
+    rdout(10) << __func__ << " preempt " << victim << dendl;
+    f->queue(victim.preempt);
+    victim.preempt = nullptr;
+    in_progress.erase(q);
+    preempt_by_prio.erase(preempt_by_prio.begin());
+  }
 
   void do_queues() {
-    typename map<unsigned, list<pair<T, Context*> > >::reverse_iterator it;
-    for (it = queues.rbegin();
-         it != queues.rend() &&
-          in_progress.size() < max_allowed &&
-          it->first >= min_priority;
-         ++it) {
-      while (in_progress.size() < max_allowed &&
-             !it->second.empty()) {
-        pair<T, Context*> p = it->second.front();
-        queue_pointers.erase(p.first);
-        it->second.pop_front();
-        f->queue(p.second);
-        in_progress.insert(p.first);
+    rdout(20) << __func__ << ":\n";
+    JSONFormatter jf(true);
+    jf.open_object_section("queue");
+    _dump(&jf);
+    jf.close_section();
+    jf.flush(*_dout);
+    *_dout << dendl;
+
+    // in case min_priority was adjusted up or max_allowed was adjusted down
+    while (!preempt_by_prio.empty() &&
+          (in_progress.size() > max_allowed ||
+           preempt_by_prio.begin()->first < min_priority)) {
+      preempt_one();
+    }
+
+    while (!queues.empty()) {
+      // choose highest priority queue
+      auto it = queues.end();
+      --it;
+      assert(!it->second.empty());
+      if (it->first < min_priority) {
+       break;
+      }
+      if (in_progress.size() >= max_allowed &&
+         !preempt_by_prio.empty() &&
+         it->first > preempt_by_prio.begin()->first) {
+       preempt_one();
+      }
+      if (in_progress.size() >= max_allowed) {
+       break; // no room
+      }
+      // grant
+      Reservation p = it->second.front();
+      rdout(10) << __func__ << " grant " << p << dendl;
+      queue_pointers.erase(p.item);
+      it->second.pop_front();
+      if (it->second.empty()) {
+       queues.erase(it);
+      }
+      f->queue(p.grant);
+      p.grant = nullptr;
+      in_progress[p.item] = p;
+      if (p.preempt) {
+       preempt_by_prio.insert(make_pair(p.prio, p.item));
       }
     }
   }
@@ -80,27 +145,26 @@ public:
 
   void dump(Formatter *f) {
     Mutex::Locker l(lock);
+    _dump(f);
+  }
+  void _dump(Formatter *f) {
     f->dump_unsigned("max_allowed", max_allowed);
     f->dump_unsigned("min_priority", min_priority);
     f->open_array_section("queues");
-    for (typename map<unsigned, list<pair<T, Context*> > > ::const_iterator p =
-          queues.begin(); p != queues.end(); ++p) {
+    for (auto& p : queues) {
       f->open_object_section("queue");
-      f->dump_unsigned("priority", p->first);
+      f->dump_unsigned("priority", p.first);
       f->open_array_section("items");
-      for (typename list<pair<T, Context*> >::const_iterator q =
-            p->second.begin(); q != p->second.end(); ++q) {
-       f->dump_stream("item") << q->first;
+      for (auto& q : p.second) {
+       f->dump_object("item", q);
       }
       f->close_section();
       f->close_section();
     }
     f->close_section();
     f->open_array_section("in_progress");
-    for (typename set<T>::const_iterator p = in_progress.begin();
-        p != in_progress.end();
-        ++p) {
-      f->dump_stream("item") << *p;
+    for (auto& p : in_progress) {
+      f->dump_object("item", p.second);
     }
     f->close_section();
   }
@@ -116,13 +180,17 @@ public:
   void request_reservation(
     T item,                   ///< [in] reservation key
     Context *on_reserved,     ///< [in] callback to be called on reservation
-    unsigned prio
+    unsigned prio,            ///< [in] priority
+    Context *on_preempt = 0   ///< [in] callback to be called if we are preempted (optional)
     ) {
     Mutex::Locker l(lock);
+    Reservation r(item, prio, on_reserved, on_preempt);
+    rdout(10) << __func__ << " queue " << r << dendl;
     assert(!queue_pointers.count(item) &&
           !in_progress.count(item));
-    queues[prio].push_back(make_pair(item, on_reserved));
-    queue_pointers.insert(make_pair(item, make_pair(prio,--(queues[prio]).end())));
+    queues[prio].push_back(r);
+    queue_pointers.insert(make_pair(item,
+                                   make_pair(prio,--(queues[prio]).end())));
     do_queues();
   }
 
@@ -137,13 +205,31 @@ public:
     T item                   ///< [in] key for reservation to cancel
     ) {
     Mutex::Locker l(lock);
-    if (queue_pointers.count(item)) {
-      unsigned prio = queue_pointers[item].first;
-      delete queue_pointers[item].second->second;
-      queues[prio].erase(queue_pointers[item].second);
-      queue_pointers.erase(item);
+    auto i = queue_pointers.find(item);
+    if (i != queue_pointers.end()) {
+      unsigned prio = i->second.first;
+      const Reservation& r = *i->second.second;
+      rdout(10) << __func__ << " cancel " << r << " (was queued)" << dendl;
+      delete r.grant;
+      delete r.preempt;
+      queues[prio].erase(i->second.second);
+      if (queues[prio].empty()) {
+       queues.erase(prio);
+      }
+      queue_pointers.erase(i);
     } else {
-      in_progress.erase(item);
+      auto p = in_progress.find(item);
+      if (p != in_progress.end()) {
+       rdout(10) << __func__ << " cancel " << p->second
+                 << " (was in progress)" << dendl;
+       if (p->second.preempt) {
+         preempt_by_prio.erase(make_pair(p->second.prio, p->second.item));
+         delete p->second.preempt;
+       }
+       in_progress.erase(p);
+      } else {
+       rdout(10) << __func__ << " cancel " << item << " (not found)" << dendl;
+      }
     }
     do_queues();
   }
@@ -160,4 +246,5 @@ public:
   static const unsigned MAX_PRIORITY = (unsigned)-1;
 };
 
+#undef rdout
 #endif
index 105ca99b7e969f6843751268deacda59b7b8a458..6e6c26fa78f0b75590e998d2acf84979c20a75ed 100644 (file)
@@ -53,6 +53,7 @@ SUBSYS(tp, 0, 5)
 SUBSYS(auth, 1, 5)
 SUBSYS(crypto, 1, 5)
 SUBSYS(finisher, 1, 1)
+SUBSYS(reserver, 1, 1)
 SUBSYS(heartbeatmap, 1, 5)
 SUBSYS(perfcounter, 1, 5)
 SUBSYS(rgw, 1, 5)                 // log level for the Rados gateway
index 82ec76bf94ee49b1bac36894a54815cbc14f5126..4269e55da0073f14c712ac905cd494984a66be2b 100755 (executable)
@@ -776,6 +776,7 @@ else
         debug rocksdb = 10
         debug bdev = 20
         debug rgw = 20
+       debug reserver = 10
         debug objclass = 20'
     CMDSDEBUG='
         debug ms = 1