++m_complete_tid;
}
}
+
+uint64_t TokenBucketThrottle::Bucket::get(uint64_t c) {
+ if (0 == max) {
+ return 0;
+ }
+
+ uint64_t got = 0;
+ if (remain >= c) {
+ // There is enough token in bucket, take c.
+ got = c;
+ remain -= c;
+ } else {
+ // There is not enough, take all remain.
+ got = remain;
+ remain = 0;
+ }
+ return got;
+}
+
+uint64_t TokenBucketThrottle::Bucket::put(uint64_t c) {
+ if (0 == max) {
+ return 0;
+ }
+ if (c) {
+ // put c tokens into bucket
+ uint64_t current = remain;
+ if ((current + c) <= (uint64_t)max) {
+ remain += c;
+ } else {
+ remain = (uint64_t)max;
+ }
+ }
+ return remain;
+}
+
+void TokenBucketThrottle::Bucket::set_max(uint64_t m) {
+ if (remain > m)
+ remain = m;
+ max = m;
+}
+
+TokenBucketThrottle::TokenBucketThrottle(
+ CephContext *cct,
+ uint64_t capacity,
+ uint64_t avg,
+ SafeTimer *timer,
+ Mutex *timer_lock)
+ : m_cct(cct), m_throttle(m_cct, "token_bucket_throttle", capacity),
+ m_avg(avg), m_timer(timer), m_timer_lock(timer_lock),
+ m_lock("token_bucket_throttle_lock")
+{
+ Mutex::Locker timer_locker(*m_timer_lock);
+ schedule_timer();
+}
+
+TokenBucketThrottle::~TokenBucketThrottle()
+{
+ // cancel the timer events.
+ {
+ Mutex::Locker timer_locker(*m_timer_lock);
+ cancel_timer();
+ }
+
+ list<Blocker> tmp_blockers;
+ {
+ Mutex::Locker blockers_lock(m_lock);
+ tmp_blockers.splice(tmp_blockers.begin(), m_blockers, m_blockers.begin(), m_blockers.end());
+ }
+
+ for (auto b : tmp_blockers) {
+ b.ctx->complete(0);
+ }
+}
+
+void TokenBucketThrottle::set_max(uint64_t m) {
+ Mutex::Locker lock(m_lock);
+ m_throttle.set_max(m);
+}
+
+void TokenBucketThrottle::set_average(uint64_t avg) {
+ m_avg = avg;
+}
+
+void TokenBucketThrottle::add_tokens() {
+ list<Blocker> tmp_blockers;
+ {
+ // put m_avg tokens into bucket.
+ Mutex::Locker lock(m_lock);
+ m_throttle.put(m_avg);
+ // check the m_blockers from head to tail, if blocker can get
+ // enough tokens, let it go.
+ while (!m_blockers.empty()) {
+ Blocker blocker = m_blockers.front();
+ uint64_t got = m_throttle.get(blocker.tokens_requested);
+ if (got == blocker.tokens_requested) {
+ // got enough tokens for front.
+ tmp_blockers.splice(tmp_blockers.end(), m_blockers, m_blockers.begin());
+ } else {
+ // there is no more tokens.
+ blocker.tokens_requested -= got;
+ break;
+ }
+ }
+ }
+
+ for (auto b : tmp_blockers) {
+ b.ctx->complete(0);
+ }
+}
+
+void TokenBucketThrottle::schedule_timer() {
+ add_tokens();
+
+ m_token_ctx = new FunctionContext(
+ [this](int r) {
+ schedule_timer();
+ });
+
+ m_timer->add_event_after(1, m_token_ctx);
+}
+
+void TokenBucketThrottle::cancel_timer() {
+ m_timer->cancel_event(m_token_ctx);
+}
#include <mutex>
#include "include/Context.h"
+#include "common/Timer.h"
#include "common/convenience.h"
#include "common/perf_counters.h"
uint32_t waiters = 0;
};
+
+class TokenBucketThrottle {
+
+ struct Bucket {
+ CephContext *cct;
+ const std::string name;
+ std::atomic<uint64_t> remain = { 0 }, max = { 0 };
+
+ Bucket(CephContext *cct, const std::string& n, uint64_t m)
+ : cct(cct), name(n),
+ remain(m), max(m)
+ {
+ }
+
+ uint64_t get(uint64_t c);
+ uint64_t put(uint64_t c);
+ void set_max(uint64_t m);
+ };
+
+ struct Blocker {
+ uint64_t tokens_requested;
+ Context *ctx;
+
+ Blocker(uint64_t _tokens_requested, Context* _ctx)
+ : tokens_requested(_tokens_requested), ctx(_ctx) {}
+ };
+
+ CephContext *m_cct;
+ Bucket m_throttle;
+ uint64_t m_avg = 0;
+ SafeTimer *m_timer;
+ Mutex *m_timer_lock;
+ FunctionContext *m_token_ctx = nullptr;
+ list<Blocker> m_blockers;
+ Mutex m_lock;
+
+public:
+ TokenBucketThrottle(CephContext *cct, uint64_t capacity, uint64_t avg,
+ SafeTimer *timer, Mutex *timer_lock);
+
+ ~TokenBucketThrottle();
+
+ template <typename T, typename I, void(T::*MF)(int, I*)>
+ bool get(uint64_t c, T *handler, I *item) {
+ if (0 == m_throttle.max)
+ return false;
+
+ bool waited = false;
+
+ Mutex::Locker lock(m_lock);
+ uint64_t got = m_throttle.get(c);
+ if (got < c) {
+ // Not enough tokens, add a blocker for it.
+ Context *ctx = new FunctionContext([this, handler, item](int r) {
+ (handler->*MF)(r, item);
+ });
+ m_blockers.emplace_back(c - got, ctx);
+ waited = true;
+ }
+ return waited;
+ }
+
+
+ void set_max(uint64_t m);
+ void set_average(uint64_t avg);
+
+private:
+ void add_tokens();
+ void schedule_timer();
+ void cancel_timer();
+};
+
#endif