// vim: ts=8 sw=2 smarttab
#include <errno.h>
+#include <thread>
#include "common/Throttle.h"
#include "common/dout.h"
return count.read();
}
+bool BackoffThrottle::set_params(
+ double _low_threshhold,
+ double _high_threshhold,
+ double _expected_throughput,
+ double _high_multiple,
+ double _max_multiple,
+ uint64_t _throttle_max,
+ ostream *errstream)
+{
+ bool valid = true;
+ if (_low_threshhold > _high_threshhold) {
+ valid = false;
+ if (errstream) {
+ *errstream << "low_threshhold (" << _low_threshhold
+ << ") > high_threshhold (" << _high_threshhold
+ << ")" << std::endl;
+ }
+ }
+
+ if (_high_multiple > _max_multiple) {
+ valid = false;
+ if (errstream) {
+ *errstream << "_high_multiple (" << _high_multiple
+ << ") > _max_multiple (" << _max_multiple
+ << ")" << std::endl;
+ }
+ }
+
+ if (_low_threshhold > 1 || _low_threshhold < 0) {
+ valid = false;
+ if (errstream) {
+ *errstream << "invalid low_threshhold (" << _low_threshhold << ")"
+ << std::endl;
+ }
+ }
+
+ if (_high_threshhold > 1 || _high_threshhold < 0) {
+ valid = false;
+ if (errstream) {
+ *errstream << "invalid high_threshhold (" << _high_threshhold << ")"
+ << std::endl;
+ }
+ }
+
+ if (_max_multiple < 0) {
+ valid = false;
+ if (errstream) {
+ *errstream << "invalid _max_multiple ("
+ << _max_multiple << ")"
+ << std::endl;
+ }
+ }
+
+ if (_high_multiple < 0) {
+ valid = false;
+ if (errstream) {
+ *errstream << "invalid _high_multiple ("
+ << _high_multiple << ")"
+ << std::endl;
+ }
+ }
+
+ if (_expected_throughput < 0) {
+ valid = false;
+ if (errstream) {
+ *errstream << "invalid _expected_throughput("
+ << _expected_throughput << ")"
+ << std::endl;
+ }
+ }
+
+ if (!valid)
+ return false;
+
+ locker l(lock);
+ low_threshhold = _low_threshhold;
+ high_threshhold = _high_threshhold;
+ high_delay_per_count = _high_multiple / _expected_throughput;
+ max_delay_per_count = _max_multiple / _expected_throughput;
+ max = _throttle_max;
+
+ if (high_threshhold - low_threshhold > 0) {
+ s0 = high_delay_per_count / (high_threshhold - low_threshhold);
+ } else {
+ low_threshhold = high_threshhold;
+ s0 = 0;
+ }
+
+ if (1 - high_threshhold > 0) {
+ s1 = (max_delay_per_count - high_delay_per_count)
+ / (1 - high_threshhold);
+ } else {
+ high_threshhold = 1;
+ s1 = 0;
+ }
+
+ _kick_waiters();
+ return true;
+}
+
+std::chrono::duration<double> BackoffThrottle::_get_delay(uint64_t c) const
+{
+ if (max == 0)
+ return std::chrono::duration<double>(0);
+
+ double r = ((double)current) / ((double)max);
+ if (r < low_threshhold) {
+ return std::chrono::duration<double>(0);
+ } else if (r < high_threshhold) {
+ return c * std::chrono::duration<double>(
+ (r - low_threshhold) * s0);
+ } else {
+ return c * std::chrono::duration<double>(
+ high_delay_per_count + ((r - high_threshhold) * s1));
+ }
+}
+
+std::chrono::duration<double> BackoffThrottle::get(uint64_t c)
+{
+ locker l(lock);
+ auto delay = _get_delay(c);
+
+ // fast path
+ if (delay == std::chrono::duration<double>(0) &&
+ waiters.empty() &&
+ ((max == 0) || (current == 0) || ((current + c) <= max))) {
+ current += c;
+ return std::chrono::duration<double>(0);
+ }
+
+ auto ticket = _push_waiter();
+
+ while (waiters.begin() != ticket) {
+ (*ticket)->wait(l);
+ }
+
+ auto start = std::chrono::system_clock::now();
+ delay = _get_delay(c);
+ while (((start + delay) > std::chrono::system_clock::now()) ||
+ !((max == 0) || (current == 0) || ((current + c) <= max))) {
+ assert(ticket == waiters.begin());
+ (*ticket)->wait_until(l, start + delay);
+ delay = _get_delay(c);
+ }
+ waiters.pop_front();
+ _kick_waiters();
+
+ current += c;
+ return std::chrono::system_clock::now() - start;
+}
+
+uint64_t BackoffThrottle::put(uint64_t c)
+{
+ locker l(lock);
+ assert(current >= c);
+ current -= c;
+ _kick_waiters();
+ return current;
+}
+
+uint64_t BackoffThrottle::take(uint64_t c)
+{
+ locker l(lock);
+ current += c;
+ return current;
+}
+
+uint64_t BackoffThrottle::get_current()
+{
+ locker l(lock);
+ return current;
+}
+
+uint64_t BackoffThrottle::get_max()
+{
+ locker l(lock);
+ return max;
+}
+
SimpleThrottle::SimpleThrottle(uint64_t max, bool ignore_enoent)
: m_lock("SimpleThrottle"),
m_max(max),
#include "Cond.h"
#include <list>
#include <map>
+#include <iostream>
+#include <condition_variable>
+#include <chrono>
#include "include/atomic.h"
#include "include/Context.h"
}
};
+/**
+ * BackoffThrottle
+ *
+ * Creates a throttle which gradually induces delays when get() is called
+ * based on params low_threshhold, high_threshhold, expected_throughput,
+ * high_multiple, and max_multiple.
+ *
+ * In [0, low_threshhold), we want no delay.
+ *
+ * In [low_threshhold, high_threshhold), delays should be injected based
+ * on a line from 0 at low_threshhold to
+ * high_multiple * (1/expected_throughput) at high_threshhold.
+ *
+ * In [high_threshhold, 1), we want delays injected based on a line from
+ * (high_multiple * (1/expected_throughput)) at high_threshhold to
+ * (high_multiple * (1/expected_throughput)) +
+ * (max_multiple * (1/expected_throughput)) at 1.
+ *
+ * Let the current throttle ratio (current/max) be r, low_threshhold be l,
+ * high_threshhold be h, high_delay (high_multiple / expected_throughput) be e,
+ * and max_delay (max_muliple / expected_throughput) be m.
+ *
+ * delay = 0, r \in [0, l)
+ * delay = (r - l) * (e / (h - l)), r \in [l, h)
+ * delay = h + (r - h)((m - e)/(1 - h))
+ */
+class BackoffThrottle {
+ std::mutex lock;
+ using locker = std::unique_lock<std::mutex>;
+
+ unsigned next_cond = 0;
+
+ /// allocated once to avoid constantly allocating new ones
+ vector<std::condition_variable> conds;
+
+ /// pointers into conds
+ list<std::condition_variable*> waiters;
+
+ std::list<std::condition_variable*>::iterator _push_waiter() {
+ unsigned next = next_cond++;
+ if (next_cond == conds.size())
+ next_cond = 0;
+ return waiters.insert(waiters.end(), &(conds[next]));
+ }
+
+ void _kick_waiters() {
+ if (!waiters.empty())
+ waiters.front()->notify_all();
+ }
+
+ /// see above, values are in [0, 1].
+ double low_threshhold = 0;
+ double high_threshhold = 1;
+
+ /// see above, values are in seconds
+ double high_delay_per_count = 0;
+ double max_delay_per_count = 0;
+
+ /// Filled in in set_params
+ double s0 = 0; ///< e / (h - l), l != h, 0 otherwise
+ double s1 = 0; ///< (m - e)/(1 - h), 1 != h, 0 otherwise
+
+ /// max
+ uint64_t max = 0;
+ uint64_t current = 0;
+
+ std::chrono::duration<double> _get_delay(uint64_t c) const;
+
+public:
+ /**
+ * set_params
+ *
+ * Sets params. If the params are invalid, returns false
+ * and populates errstream (if non-null) with a user compreshensible
+ * explanation.
+ */
+ bool set_params(
+ double low_threshhold,
+ double high_threshhold,
+ double expected_throughput,
+ double high_multiple,
+ double max_multiple,
+ uint64_t throttle_max,
+ ostream *errstream);
+
+ std::chrono::duration<double> get(uint64_t c = 1);
+ std::chrono::duration<double> wait() {
+ return get(0);
+ }
+ uint64_t put(uint64_t c = 1);
+ uint64_t take(uint64_t c = 1);
+ uint64_t get_current();
+ uint64_t get_max();
+
+ BackoffThrottle(
+ unsigned expected_concurrency ///< [in] determines size of conds
+ ) : conds(expected_concurrency) {}
+};
+
/**
* @class SimpleThrottle