From: Shiyang Ruan Date: Fri, 28 Sep 2018 06:26:58 +0000 (+0800) Subject: TokenBucketThrottle: reduce the TokenBucket fill period. X-Git-Tag: v14.1.0~946^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=758223ee6c4b1285a13405c028d1befb33a4f167;p=ceph.git TokenBucketThrottle: reduce the TokenBucket fill period. TokenBucket shall put one token per 1/cir second (one schedule tick for QoS, set the minimum 50ms here). The cir is Committed Information Rate, which is defined by TokenBucket. Also means the IOPS(or BPS) limit we set. Signed-off-by: Shiyang Ruan --- diff --git a/src/common/Throttle.cc b/src/common/Throttle.cc index 9a4a10c1b15..8349347cec8 100644 --- a/src/common/Throttle.cc +++ b/src/common/Throttle.cc @@ -672,21 +672,23 @@ uint64_t TokenBucketThrottle::Bucket::put(uint64_t c) { if (0 == max) { return 0; } + if (c) { // put c tokens into bucket uint64_t current = remain; - if ((current + c) <= (uint64_t)max) { + if ((current + c) <= max) { remain += c; } else { - remain = (uint64_t)max; + remain = max; } } return remain; } void TokenBucketThrottle::Bucket::set_max(uint64_t m) { - if (remain > m || max == 0) + if (remain > m || 0 == m) { remain = m; + } max = m; } @@ -699,13 +701,9 @@ TokenBucketThrottle::TokenBucketThrottle( : m_cct(cct), m_throttle(m_cct, "token_bucket_throttle", capacity), m_avg(avg), m_timer(timer), m_timer_lock(timer_lock), m_lock("token_bucket_throttle_lock") -{ - std::lock_guard timer_locker(*m_timer_lock); - schedule_timer(); -} +{} -TokenBucketThrottle::~TokenBucketThrottle() -{ +TokenBucketThrottle::~TokenBucketThrottle() { // cancel the timer events. { std::lock_guard timer_locker(*m_timer_lock); @@ -723,33 +721,94 @@ TokenBucketThrottle::~TokenBucketThrottle() } } -void TokenBucketThrottle::set_max(uint64_t m) { +int TokenBucketThrottle::set_burst(uint64_t burst){ std::lock_guard lock(m_lock); - m_throttle.set_max(m); + if (0 < burst && burst < m_avg) { + // the burst should never less than the average. + return -EINVAL; + } else { + m_burst = burst; + } + // for the default configuration of burst. + m_throttle.set_max(0 == m_burst ? m_avg : m_burst); + return 0; +} + +int TokenBucketThrottle::set_average(uint64_t avg) { + { + std::lock_guard lock(m_lock); + m_avg = avg; + + if (0 < m_burst && m_burst < avg) { + // the burst should never less than the average. + return -EINVAL; + } else if (0 == avg) { + // The limit is not set, and no tokens will be put into the bucket. + // So, we can schedule the timer slowly, or even cancel it. + m_tick = 1000; + } else { + // calculate the tick(ms), don't less than the minimum. + m_tick = 1000 / avg; + if (m_tick < m_tick_min) { + m_tick = m_tick_min; + } + + // this is for the number(avg) can not be divisible. + m_ticks_per_second = 1000 / m_tick; + m_current_tick = 0; + + // for the default configuration of burst. + if (0 == m_burst) { + m_throttle.set_max(m_avg); + } + } + // turn millisecond to second + m_schedule_tick = m_tick / 1000.0; + } + + // The schedule period will be changed when the average rate is set. + { + std::lock_guard timer_locker(*m_timer_lock); + cancel_timer(); + schedule_timer(); + } + return 0; +} + +uint64_t TokenBucketThrottle::tokens_filled(double tick) { + return (0 == m_avg) ? 0 : (tick / m_ticks_per_second * m_avg); } -void TokenBucketThrottle::set_average(uint64_t avg) { - m_avg = avg; +uint64_t TokenBucketThrottle::tokens_this_tick() { + if (0 == m_avg) { + return 0; + } + if (m_current_tick >= m_ticks_per_second) { + m_current_tick = 0; + } + m_current_tick++; + + return tokens_filled(m_current_tick) - tokens_filled(m_current_tick - 1); } void TokenBucketThrottle::add_tokens() { list tmp_blockers; { - // put m_avg tokens into bucket. std::lock_guard lock(m_lock); - m_throttle.put(m_avg); + // put tokens into bucket. + m_throttle.put(tokens_this_tick()); // check the m_blockers from head to tail, if blocker can get // enough tokens, let it go. while (!m_blockers.empty()) { Blocker blocker = m_blockers.front(); uint64_t got = m_throttle.get(blocker.tokens_requested); if (got == blocker.tokens_requested) { - // got enough tokens for front. - tmp_blockers.splice(tmp_blockers.end(), m_blockers, m_blockers.begin()); + // got enough tokens for front. + tmp_blockers.splice(tmp_blockers.end(), m_blockers, m_blockers.begin()); } else { - // there is no more tokens. - blocker.tokens_requested -= got; - break; + // there is no more tokens. + blocker.tokens_requested -= got; + break; } } } @@ -760,14 +819,13 @@ void TokenBucketThrottle::add_tokens() { } void TokenBucketThrottle::schedule_timer() { - add_tokens(); - m_token_ctx = new FunctionContext( [this](int r) { schedule_timer(); }); + m_timer->add_event_after(m_schedule_tick, m_token_ctx); - m_timer->add_event_after(1, m_token_ctx); + add_tokens(); } void TokenBucketThrottle::cancel_timer() { diff --git a/src/common/Throttle.h b/src/common/Throttle.h index 82bf8264b37..96d55d13c2e 100644 --- a/src/common/Throttle.h +++ b/src/common/Throttle.h @@ -334,17 +334,15 @@ private: class TokenBucketThrottle { - struct Bucket { CephContext *cct; const std::string name; - std::atomic remain = { 0 }, max = { 0 }; - Bucket(CephContext *cct, const std::string& n, uint64_t m) - : cct(cct), name(n), - remain(m), max(m) - { - } + uint64_t remain; + uint64_t max; + + Bucket(CephContext *cct, const std::string &name, uint64_t m) + : cct(cct), name(name), remain(m), max(m) {} uint64_t get(uint64_t c); uint64_t put(uint64_t c); @@ -362,15 +360,55 @@ class TokenBucketThrottle { CephContext *m_cct; Bucket m_throttle; uint64_t m_avg = 0; + uint64_t m_burst = 0; SafeTimer *m_timer; Mutex *m_timer_lock; FunctionContext *m_token_ctx = nullptr; list m_blockers; Mutex m_lock; + // minimum of the filling period. + static const uint64_t m_tick_min = 50; + // tokens filling period, its unit is millisecond. + uint64_t m_tick = 0; + /** + * These variables are used to calculate how many tokens need to be put into + * the bucket within each tick. + * + * In actual use, the tokens to be put per tick(m_avg / m_ticks_per_second) + * may be a floating point number, but we need an 'uint64_t' to put into the + * bucket. + * + * For example, we set the value of rate to be 950, means 950 iops(or bps). + * + * In this case, the filling period(m_tick) should be 1000 / 950 = 1.052, + * which is too small for the SafeTimer. So we should set the period(m_tick) + * to be 50(m_tick_min), and 20 ticks in one second(m_ticks_per_second). + * The tokens filled in bucket per tick is 950 / 20 = 47.5, not an integer. + * + * To resolve this, we use a method called tokens_filled(m_current_tick) to + * calculate how many tokens will be put so far(until m_current_tick): + * + * tokens_filled = m_current_tick / m_ticks_per_second * m_avg + * + * And the difference between two ticks will be the result we expect. + * tokens in tick 0: (1 / 20 * 950) - (0 / 20 * 950) = 47 - 0 = 47 + * tokens in tick 1: (2 / 20 * 950) - (1 / 20 * 950) = 95 - 47 = 48 + * tokens in tick 2: (3 / 20 * 950) - (2 / 20 * 950) = 142 - 95 = 47 + * + * As a result, the tokens filled in one second will shown as this: + * tick | 1| 2| 3| 4| 5| 6| 7| 8| 9|10|11|12|13|14|15|16|17|18|19|20| + * tokens |47|48|47|48|47|48|47|48|47|48|47|48|47|48|47|48|47|48|47|48| + */ + uint64_t m_ticks_per_second = 0; + uint64_t m_current_tick = 0; + + // period for the bucket filling tokens, its unit is seconds. + double m_schedule_tick = 1.0; + public: TokenBucketThrottle(CephContext *cct, uint64_t capacity, uint64_t avg, - SafeTimer *timer, Mutex *timer_lock); + SafeTimer *timer, Mutex *timer_lock); ~TokenBucketThrottle(); @@ -384,7 +422,7 @@ public: template bool get(uint64_t c, T *handler, I *item, uint64_t flag) { - if (0 == m_throttle.max) + if (0 == m_avg) return false; bool wait = false; @@ -407,10 +445,12 @@ public: return wait; } - void set_max(uint64_t m); - void set_average(uint64_t avg); + int set_burst(uint64_t burst); + int set_average(uint64_t avg); private: + uint64_t tokens_filled(double tick); + uint64_t tokens_this_tick(); void add_tokens(); void schedule_timer(); void cancel_timer(); diff --git a/src/librbd/io/ImageRequestWQ.cc b/src/librbd/io/ImageRequestWQ.cc index 393211a0dca..deeef4531cb 100644 --- a/src/librbd/io/ImageRequestWQ.cc +++ b/src/librbd/io/ImageRequestWQ.cc @@ -624,7 +624,7 @@ void ImageRequestWQ::apply_qos_limit(uint64_t limit, const uint64_t flag) { } } ceph_assert(throttle != nullptr); - throttle->set_max(limit); + throttle->set_burst(limit); throttle->set_average(limit); if (limit) m_qos_enabled_flag |= flag;