From: Dongsheng Yang Date: Thu, 3 Aug 2017 07:21:23 +0000 (+0800) Subject: Throttle: add a new TokenBucketThrottle X-Git-Tag: v13.0.1~220^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8366ebceb54c138ff33523e467ae655d6c0fc194;p=ceph.git Throttle: add a new TokenBucketThrottle Signed-off-by: Dongsheng Yang --- diff --git a/src/common/Throttle.cc b/src/common/Throttle.cc index d0d3f612e33..9b30ef3ed3c 100644 --- a/src/common/Throttle.cc +++ b/src/common/Throttle.cc @@ -650,3 +650,127 @@ void OrderedThrottle::complete_pending_ops(UNIQUE_LOCK_T(m_lock)& l) { ++m_complete_tid; } } + +uint64_t TokenBucketThrottle::Bucket::get(uint64_t c) { + if (0 == max) { + return 0; + } + + uint64_t got = 0; + if (remain >= c) { + // There is enough token in bucket, take c. + got = c; + remain -= c; + } else { + // There is not enough, take all remain. + got = remain; + remain = 0; + } + return got; +} + +uint64_t TokenBucketThrottle::Bucket::put(uint64_t c) { + if (0 == max) { + return 0; + } + if (c) { + // put c tokens into bucket + uint64_t current = remain; + if ((current + c) <= (uint64_t)max) { + remain += c; + } else { + remain = (uint64_t)max; + } + } + return remain; +} + +void TokenBucketThrottle::Bucket::set_max(uint64_t m) { + if (remain > m) + remain = m; + max = m; +} + +TokenBucketThrottle::TokenBucketThrottle( + CephContext *cct, + uint64_t capacity, + uint64_t avg, + SafeTimer *timer, + Mutex *timer_lock) + : m_cct(cct), m_throttle(m_cct, "token_bucket_throttle", capacity), + m_avg(avg), m_timer(timer), m_timer_lock(timer_lock), + m_lock("token_bucket_throttle_lock") +{ + Mutex::Locker timer_locker(*m_timer_lock); + schedule_timer(); +} + +TokenBucketThrottle::~TokenBucketThrottle() +{ + // cancel the timer events. + { + Mutex::Locker timer_locker(*m_timer_lock); + cancel_timer(); + } + + list tmp_blockers; + { + Mutex::Locker blockers_lock(m_lock); + tmp_blockers.splice(tmp_blockers.begin(), m_blockers, m_blockers.begin(), m_blockers.end()); + } + + for (auto b : tmp_blockers) { + b.ctx->complete(0); + } +} + +void TokenBucketThrottle::set_max(uint64_t m) { + Mutex::Locker lock(m_lock); + m_throttle.set_max(m); +} + +void TokenBucketThrottle::set_average(uint64_t avg) { + m_avg = avg; +} + +void TokenBucketThrottle::add_tokens() { + list tmp_blockers; + { + // put m_avg tokens into bucket. + Mutex::Locker lock(m_lock); + m_throttle.put(m_avg); + // check the m_blockers from head to tail, if blocker can get + // enough tokens, let it go. + while (!m_blockers.empty()) { + Blocker blocker = m_blockers.front(); + uint64_t got = m_throttle.get(blocker.tokens_requested); + if (got == blocker.tokens_requested) { + // got enough tokens for front. + tmp_blockers.splice(tmp_blockers.end(), m_blockers, m_blockers.begin()); + } else { + // there is no more tokens. + blocker.tokens_requested -= got; + break; + } + } + } + + for (auto b : tmp_blockers) { + b.ctx->complete(0); + } +} + +void TokenBucketThrottle::schedule_timer() { + add_tokens(); + + m_token_ctx = new FunctionContext( + [this](int r) { + schedule_timer(); + }); + + m_timer->add_event_after(1, m_token_ctx); +} + +void TokenBucketThrottle::cancel_timer() { + m_timer->cancel_event(m_token_ctx); +} diff --git a/src/common/Throttle.h b/src/common/Throttle.h index 162e597a655..9081c006b77 100644 --- a/src/common/Throttle.h +++ b/src/common/Throttle.h @@ -13,6 +13,7 @@ #include #include "include/Context.h" +#include "common/Timer.h" #include "common/convenience.h" #include "common/perf_counters.h" @@ -330,4 +331,76 @@ private: uint32_t waiters = 0; }; + +class TokenBucketThrottle { + + struct Bucket { + CephContext *cct; + const std::string name; + std::atomic remain = { 0 }, max = { 0 }; + + Bucket(CephContext *cct, const std::string& n, uint64_t m) + : cct(cct), name(n), + remain(m), max(m) + { + } + + uint64_t get(uint64_t c); + uint64_t put(uint64_t c); + void set_max(uint64_t m); + }; + + struct Blocker { + uint64_t tokens_requested; + Context *ctx; + + Blocker(uint64_t _tokens_requested, Context* _ctx) + : tokens_requested(_tokens_requested), ctx(_ctx) {} + }; + + CephContext *m_cct; + Bucket m_throttle; + uint64_t m_avg = 0; + SafeTimer *m_timer; + Mutex *m_timer_lock; + FunctionContext *m_token_ctx = nullptr; + list m_blockers; + Mutex m_lock; + +public: + TokenBucketThrottle(CephContext *cct, uint64_t capacity, uint64_t avg, + SafeTimer *timer, Mutex *timer_lock); + + ~TokenBucketThrottle(); + + template + bool get(uint64_t c, T *handler, I *item) { + if (0 == m_throttle.max) + return false; + + bool waited = false; + + Mutex::Locker lock(m_lock); + uint64_t got = m_throttle.get(c); + if (got < c) { + // Not enough tokens, add a blocker for it. + Context *ctx = new FunctionContext([this, handler, item](int r) { + (handler->*MF)(r, item); + }); + m_blockers.emplace_back(c - got, ctx); + waited = true; + } + return waited; + } + + + void set_max(uint64_t m); + void set_average(uint64_t avg); + +private: + void add_tokens(); + void schedule_timer(); + void cancel_timer(); +}; + #endif