]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
throttle: add a BackoffThrottle implementation
authorSamuel Just <sjust@redhat.com>
Wed, 3 Feb 2016 03:00:26 +0000 (19:00 -0800)
committerSamuel Just <sjust@redhat.com>
Thu, 25 Feb 2016 19:11:44 +0000 (11:11 -0800)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/common/Throttle.cc
src/common/Throttle.h

index 469c8088de6019f00cfc9cb3d99bdae1994f07d2..3b909ee4112478c73363c2332e1568cc46d8753c 100644 (file)
@@ -2,6 +2,7 @@
 // vim: ts=8 sw=2 smarttab
 
 #include <errno.h>
+#include <thread>
 
 #include "common/Throttle.h"
 #include "common/dout.h"
@@ -237,6 +238,185 @@ int64_t Throttle::put(int64_t c)
   return count.read();
 }
 
+bool BackoffThrottle::set_params(
+  double _low_threshhold,
+  double _high_threshhold,
+  double _expected_throughput,
+  double _high_multiple,
+  double _max_multiple,
+  uint64_t _throttle_max,
+  ostream *errstream)
+{
+  bool valid = true;
+  if (_low_threshhold > _high_threshhold) {
+    valid = false;
+    if (errstream) {
+      *errstream << "low_threshhold (" << _low_threshhold
+                << ") > high_threshhold (" << _high_threshhold
+                << ")" << std::endl;
+    }
+  }
+
+  if (_high_multiple > _max_multiple) {
+    valid = false;
+    if (errstream) {
+      *errstream << "_high_multiple (" << _high_multiple
+                << ") > _max_multiple (" << _max_multiple
+                << ")" << std::endl;
+    }
+  }
+
+  if (_low_threshhold > 1 || _low_threshhold < 0) {
+    valid = false;
+    if (errstream) {
+      *errstream << "invalid low_threshhold (" << _low_threshhold << ")"
+                << std::endl;
+    }
+  }
+
+  if (_high_threshhold > 1 || _high_threshhold < 0) {
+    valid = false;
+    if (errstream) {
+      *errstream << "invalid high_threshhold (" << _high_threshhold << ")"
+                << std::endl;
+    }
+  }
+
+  if (_max_multiple < 0) {
+    valid = false;
+    if (errstream) {
+      *errstream << "invalid _max_multiple ("
+                << _max_multiple << ")"
+                << std::endl;
+    }
+  }
+
+  if (_high_multiple < 0) {
+    valid = false;
+    if (errstream) {
+      *errstream << "invalid _high_multiple ("
+                << _high_multiple << ")"
+                << std::endl;
+    }
+  }
+
+  if (_expected_throughput < 0) {
+    valid = false;
+    if (errstream) {
+      *errstream << "invalid _expected_throughput("
+                << _expected_throughput << ")"
+                << std::endl;
+    }
+  }
+
+  if (!valid)
+    return false;
+
+  locker l(lock);
+  low_threshhold = _low_threshhold;
+  high_threshhold = _high_threshhold;
+  high_delay_per_count = _high_multiple / _expected_throughput;
+  max_delay_per_count = _max_multiple / _expected_throughput;
+  max = _throttle_max;
+
+  if (high_threshhold - low_threshhold > 0) {
+    s0 = high_delay_per_count / (high_threshhold - low_threshhold);
+  } else {
+    low_threshhold = high_threshhold;
+    s0 = 0;
+  }
+
+  if (1 - high_threshhold > 0) {
+    s1 = (max_delay_per_count - high_delay_per_count)
+      / (1 - high_threshhold);
+  } else {
+    high_threshhold = 1;
+    s1 = 0;
+  }
+
+  _kick_waiters();
+  return true;
+}
+
+std::chrono::duration<double> BackoffThrottle::_get_delay(uint64_t c) const
+{
+  if (max == 0)
+    return std::chrono::duration<double>(0);
+
+  double r = ((double)current) / ((double)max);
+  if (r < low_threshhold) {
+    return std::chrono::duration<double>(0);
+  } else if (r < high_threshhold) {
+    return c * std::chrono::duration<double>(
+      (r - low_threshhold) * s0);
+  } else {
+    return c * std::chrono::duration<double>(
+      high_delay_per_count + ((r - high_threshhold) * s1));
+  }
+}
+
+std::chrono::duration<double> BackoffThrottle::get(uint64_t c)
+{
+  locker l(lock);
+  auto delay = _get_delay(c);
+
+  // fast path
+  if (delay == std::chrono::duration<double>(0) &&
+      waiters.empty() &&
+      ((max == 0) || (current == 0) || ((current + c) <= max))) {
+    current += c;
+    return std::chrono::duration<double>(0);
+  }
+
+  auto ticket = _push_waiter();
+
+  while (waiters.begin() != ticket) {
+    (*ticket)->wait(l);
+  }
+
+  auto start = std::chrono::system_clock::now();
+  delay = _get_delay(c);
+  while (((start + delay) > std::chrono::system_clock::now()) ||
+        !((max == 0) || (current == 0) || ((current + c) <= max))) {
+    assert(ticket == waiters.begin());
+    (*ticket)->wait_until(l, start + delay);
+    delay = _get_delay(c);
+  }
+  waiters.pop_front();
+  _kick_waiters();
+
+  current += c;
+  return std::chrono::system_clock::now() - start;
+}
+
+uint64_t BackoffThrottle::put(uint64_t c)
+{
+  locker l(lock);
+  assert(current >= c);
+  current -= c;
+  _kick_waiters();
+  return current;
+}
+
+uint64_t BackoffThrottle::take(uint64_t c)
+{
+  locker l(lock);
+  current += c;
+  return current;
+}
+
+uint64_t BackoffThrottle::get_current()
+{
+  locker l(lock);
+  return current;
+}
+
+uint64_t BackoffThrottle::get_max()
+{
+  locker l(lock);
+  return max;
+}
+
 SimpleThrottle::SimpleThrottle(uint64_t max, bool ignore_enoent)
   : m_lock("SimpleThrottle"),
     m_max(max),
index c04a9319e5602d52aeb774ed248c2b6213c734da..f182c0c8797d69bd56074f97d4a47df18d412e15 100644 (file)
@@ -8,6 +8,9 @@
 #include "Cond.h"
 #include <list>
 #include <map>
+#include <iostream>
+#include <condition_variable>
+#include <chrono>
 #include "include/atomic.h"
 #include "include/Context.h"
 
@@ -111,6 +114,105 @@ public:
   }
 };
 
+/**
+ * BackoffThrottle
+ *
+ * Creates a throttle which gradually induces delays when get() is called
+ * based on params low_threshhold, high_threshhold, expected_throughput,
+ * high_multiple, and max_multiple.
+ *
+ * In [0, low_threshhold), we want no delay.
+ *
+ * In [low_threshhold, high_threshhold), delays should be injected based
+ * on a line from 0 at low_threshhold to
+ * high_multiple * (1/expected_throughput) at high_threshhold.
+ *
+ * In [high_threshhold, 1), we want delays injected based on a line from
+ * (high_multiple * (1/expected_throughput)) at high_threshhold to
+ * (high_multiple * (1/expected_throughput)) +
+ * (max_multiple * (1/expected_throughput)) at 1.
+ *
+ * Let the current throttle ratio (current/max) be r, low_threshhold be l,
+ * high_threshhold be h, high_delay (high_multiple / expected_throughput) be e,
+ * and max_delay (max_muliple / expected_throughput) be m.
+ *
+ * delay = 0, r \in [0, l)
+ * delay = (r - l) * (e / (h - l)), r \in [l, h)
+ * delay = h + (r - h)((m - e)/(1 - h))
+ */
+class BackoffThrottle {
+  std::mutex lock;
+  using locker = std::unique_lock<std::mutex>;
+
+  unsigned next_cond = 0;
+
+  /// allocated once to avoid constantly allocating new ones
+  vector<std::condition_variable> conds;
+
+  /// pointers into conds
+  list<std::condition_variable*> waiters;
+
+  std::list<std::condition_variable*>::iterator _push_waiter() {
+    unsigned next = next_cond++;
+    if (next_cond == conds.size())
+      next_cond = 0;
+    return waiters.insert(waiters.end(), &(conds[next]));
+  }
+
+  void _kick_waiters() {
+    if (!waiters.empty())
+      waiters.front()->notify_all();
+  }
+
+  /// see above, values are in [0, 1].
+  double low_threshhold = 0;
+  double high_threshhold = 1;
+
+  /// see above, values are in seconds
+  double high_delay_per_count = 0;
+  double max_delay_per_count = 0;
+
+  /// Filled in in set_params
+  double s0 = 0; ///< e / (h - l), l != h, 0 otherwise
+  double s1 = 0; ///< (m - e)/(1 - h), 1 != h, 0 otherwise
+
+  /// max
+  uint64_t max = 0;
+  uint64_t current = 0;
+
+  std::chrono::duration<double> _get_delay(uint64_t c) const;
+
+public:
+  /**
+   * set_params
+   *
+   * Sets params.  If the params are invalid, returns false
+   * and populates errstream (if non-null) with a user compreshensible
+   * explanation.
+   */
+  bool set_params(
+    double low_threshhold,
+    double high_threshhold,
+    double expected_throughput,
+    double high_multiple,
+    double max_multiple,
+    uint64_t throttle_max,
+    ostream *errstream);
+
+  std::chrono::duration<double> get(uint64_t c = 1);
+  std::chrono::duration<double> wait() {
+    return get(0);
+  }
+  uint64_t put(uint64_t c = 1);
+  uint64_t take(uint64_t c = 1);
+  uint64_t get_current();
+  uint64_t get_max();
+
+  BackoffThrottle(
+    unsigned expected_concurrency ///< [in] determines size of conds
+    ) : conds(expected_concurrency) {}
+};
+
 
 /**
  * @class SimpleThrottle