From b0a8b1cef4bf80fefd44537baf414da84acbf81a Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Tue, 2 Feb 2016 19:00:26 -0800 Subject: [PATCH] throttle: add a BackoffThrottle implementation Signed-off-by: Samuel Just --- src/common/Throttle.cc | 180 +++++++++++++++++++++++++++++++++++++++++ src/common/Throttle.h | 102 +++++++++++++++++++++++ 2 files changed, 282 insertions(+) diff --git a/src/common/Throttle.cc b/src/common/Throttle.cc index 469c8088de601..3b909ee411247 100644 --- a/src/common/Throttle.cc +++ b/src/common/Throttle.cc @@ -2,6 +2,7 @@ // vim: ts=8 sw=2 smarttab #include +#include #include "common/Throttle.h" #include "common/dout.h" @@ -237,6 +238,185 @@ int64_t Throttle::put(int64_t c) return count.read(); } +bool BackoffThrottle::set_params( + double _low_threshhold, + double _high_threshhold, + double _expected_throughput, + double _high_multiple, + double _max_multiple, + uint64_t _throttle_max, + ostream *errstream) +{ + bool valid = true; + if (_low_threshhold > _high_threshhold) { + valid = false; + if (errstream) { + *errstream << "low_threshhold (" << _low_threshhold + << ") > high_threshhold (" << _high_threshhold + << ")" << std::endl; + } + } + + if (_high_multiple > _max_multiple) { + valid = false; + if (errstream) { + *errstream << "_high_multiple (" << _high_multiple + << ") > _max_multiple (" << _max_multiple + << ")" << std::endl; + } + } + + if (_low_threshhold > 1 || _low_threshhold < 0) { + valid = false; + if (errstream) { + *errstream << "invalid low_threshhold (" << _low_threshhold << ")" + << std::endl; + } + } + + if (_high_threshhold > 1 || _high_threshhold < 0) { + valid = false; + if (errstream) { + *errstream << "invalid high_threshhold (" << _high_threshhold << ")" + << std::endl; + } + } + + if (_max_multiple < 0) { + valid = false; + if (errstream) { + *errstream << "invalid _max_multiple (" + << _max_multiple << ")" + << std::endl; + } + } + + if (_high_multiple < 0) { + valid = false; + if (errstream) { + *errstream << "invalid _high_multiple (" + << _high_multiple << ")" + << std::endl; + } + } + + if (_expected_throughput < 0) { + valid = false; + if (errstream) { + *errstream << "invalid _expected_throughput(" + << _expected_throughput << ")" + << std::endl; + } + } + + if (!valid) + return false; + + locker l(lock); + low_threshhold = _low_threshhold; + high_threshhold = _high_threshhold; + high_delay_per_count = _high_multiple / _expected_throughput; + max_delay_per_count = _max_multiple / _expected_throughput; + max = _throttle_max; + + if (high_threshhold - low_threshhold > 0) { + s0 = high_delay_per_count / (high_threshhold - low_threshhold); + } else { + low_threshhold = high_threshhold; + s0 = 0; + } + + if (1 - high_threshhold > 0) { + s1 = (max_delay_per_count - high_delay_per_count) + / (1 - high_threshhold); + } else { + high_threshhold = 1; + s1 = 0; + } + + _kick_waiters(); + return true; +} + +std::chrono::duration BackoffThrottle::_get_delay(uint64_t c) const +{ + if (max == 0) + return std::chrono::duration(0); + + double r = ((double)current) / ((double)max); + if (r < low_threshhold) { + return std::chrono::duration(0); + } else if (r < high_threshhold) { + return c * std::chrono::duration( + (r - low_threshhold) * s0); + } else { + return c * std::chrono::duration( + high_delay_per_count + ((r - high_threshhold) * s1)); + } +} + +std::chrono::duration BackoffThrottle::get(uint64_t c) +{ + locker l(lock); + auto delay = _get_delay(c); + + // fast path + if (delay == std::chrono::duration(0) && + waiters.empty() && + ((max == 0) || (current == 0) || ((current + c) <= max))) { + current += c; + return std::chrono::duration(0); + } + + auto ticket = _push_waiter(); + + while (waiters.begin() != ticket) { + (*ticket)->wait(l); + } + + auto start = std::chrono::system_clock::now(); + delay = _get_delay(c); + while (((start + delay) > std::chrono::system_clock::now()) || + !((max == 0) || (current == 0) || ((current + c) <= max))) { + assert(ticket == waiters.begin()); + (*ticket)->wait_until(l, start + delay); + delay = _get_delay(c); + } + waiters.pop_front(); + _kick_waiters(); + + current += c; + return std::chrono::system_clock::now() - start; +} + +uint64_t BackoffThrottle::put(uint64_t c) +{ + locker l(lock); + assert(current >= c); + current -= c; + _kick_waiters(); + return current; +} + +uint64_t BackoffThrottle::take(uint64_t c) +{ + locker l(lock); + current += c; + return current; +} + +uint64_t BackoffThrottle::get_current() +{ + locker l(lock); + return current; +} + +uint64_t BackoffThrottle::get_max() +{ + locker l(lock); + return max; +} + SimpleThrottle::SimpleThrottle(uint64_t max, bool ignore_enoent) : m_lock("SimpleThrottle"), m_max(max), diff --git a/src/common/Throttle.h b/src/common/Throttle.h index c04a9319e5602..f182c0c8797d6 100644 --- a/src/common/Throttle.h +++ b/src/common/Throttle.h @@ -8,6 +8,9 @@ #include "Cond.h" #include #include +#include +#include +#include #include "include/atomic.h" #include "include/Context.h" @@ -111,6 +114,105 @@ public: } }; +/** + * BackoffThrottle + * + * Creates a throttle which gradually induces delays when get() is called + * based on params low_threshhold, high_threshhold, expected_throughput, + * high_multiple, and max_multiple. + * + * In [0, low_threshhold), we want no delay. + * + * In [low_threshhold, high_threshhold), delays should be injected based + * on a line from 0 at low_threshhold to + * high_multiple * (1/expected_throughput) at high_threshhold. + * + * In [high_threshhold, 1), we want delays injected based on a line from + * (high_multiple * (1/expected_throughput)) at high_threshhold to + * (high_multiple * (1/expected_throughput)) + + * (max_multiple * (1/expected_throughput)) at 1. + * + * Let the current throttle ratio (current/max) be r, low_threshhold be l, + * high_threshhold be h, high_delay (high_multiple / expected_throughput) be e, + * and max_delay (max_muliple / expected_throughput) be m. + * + * delay = 0, r \in [0, l) + * delay = (r - l) * (e / (h - l)), r \in [l, h) + * delay = h + (r - h)((m - e)/(1 - h)) + */ +class BackoffThrottle { + std::mutex lock; + using locker = std::unique_lock; + + unsigned next_cond = 0; + + /// allocated once to avoid constantly allocating new ones + vector conds; + + /// pointers into conds + list waiters; + + std::list::iterator _push_waiter() { + unsigned next = next_cond++; + if (next_cond == conds.size()) + next_cond = 0; + return waiters.insert(waiters.end(), &(conds[next])); + } + + void _kick_waiters() { + if (!waiters.empty()) + waiters.front()->notify_all(); + } + + /// see above, values are in [0, 1]. + double low_threshhold = 0; + double high_threshhold = 1; + + /// see above, values are in seconds + double high_delay_per_count = 0; + double max_delay_per_count = 0; + + /// Filled in in set_params + double s0 = 0; ///< e / (h - l), l != h, 0 otherwise + double s1 = 0; ///< (m - e)/(1 - h), 1 != h, 0 otherwise + + /// max + uint64_t max = 0; + uint64_t current = 0; + + std::chrono::duration _get_delay(uint64_t c) const; + +public: + /** + * set_params + * + * Sets params. If the params are invalid, returns false + * and populates errstream (if non-null) with a user compreshensible + * explanation. + */ + bool set_params( + double low_threshhold, + double high_threshhold, + double expected_throughput, + double high_multiple, + double max_multiple, + uint64_t throttle_max, + ostream *errstream); + + std::chrono::duration get(uint64_t c = 1); + std::chrono::duration wait() { + return get(0); + } + uint64_t put(uint64_t c = 1); + uint64_t take(uint64_t c = 1); + uint64_t get_current(); + uint64_t get_max(); + + BackoffThrottle( + unsigned expected_concurrency ///< [in] determines size of conds + ) : conds(expected_concurrency) {} +}; + /** * @class SimpleThrottle -- 2.39.5