if (0 == max) {
return 0;
}
+
if (c) {
// put c tokens into bucket
uint64_t current = remain;
- if ((current + c) <= (uint64_t)max) {
+ if ((current + c) <= max) {
remain += c;
} else {
- remain = (uint64_t)max;
+ remain = max;
}
}
return remain;
}
void TokenBucketThrottle::Bucket::set_max(uint64_t m) {
- if (remain > m || max == 0)
+ if (remain > m || 0 == m) {
remain = m;
+ }
max = m;
}
: m_cct(cct), m_throttle(m_cct, "token_bucket_throttle", capacity),
m_avg(avg), m_timer(timer), m_timer_lock(timer_lock),
m_lock("token_bucket_throttle_lock")
-{
- std::lock_guard<Mutex> timer_locker(*m_timer_lock);
- schedule_timer();
-}
+{}
-TokenBucketThrottle::~TokenBucketThrottle()
-{
+TokenBucketThrottle::~TokenBucketThrottle() {
// cancel the timer events.
{
std::lock_guard<Mutex> timer_locker(*m_timer_lock);
}
}
-void TokenBucketThrottle::set_max(uint64_t m) {
+int TokenBucketThrottle::set_burst(uint64_t burst){
std::lock_guard<Mutex> lock(m_lock);
- m_throttle.set_max(m);
+ if (0 < burst && burst < m_avg) {
+ // the burst should never less than the average.
+ return -EINVAL;
+ } else {
+ m_burst = burst;
+ }
+ // for the default configuration of burst.
+ m_throttle.set_max(0 == m_burst ? m_avg : m_burst);
+ return 0;
+}
+
+int TokenBucketThrottle::set_average(uint64_t avg) {
+ {
+ std::lock_guard<Mutex> lock(m_lock);
+ m_avg = avg;
+
+ if (0 < m_burst && m_burst < avg) {
+ // the burst should never less than the average.
+ return -EINVAL;
+ } else if (0 == avg) {
+ // The limit is not set, and no tokens will be put into the bucket.
+ // So, we can schedule the timer slowly, or even cancel it.
+ m_tick = 1000;
+ } else {
+ // calculate the tick(ms), don't less than the minimum.
+ m_tick = 1000 / avg;
+ if (m_tick < m_tick_min) {
+ m_tick = m_tick_min;
+ }
+
+ // this is for the number(avg) can not be divisible.
+ m_ticks_per_second = 1000 / m_tick;
+ m_current_tick = 0;
+
+ // for the default configuration of burst.
+ if (0 == m_burst) {
+ m_throttle.set_max(m_avg);
+ }
+ }
+ // turn millisecond to second
+ m_schedule_tick = m_tick / 1000.0;
+ }
+
+ // The schedule period will be changed when the average rate is set.
+ {
+ std::lock_guard<Mutex> timer_locker(*m_timer_lock);
+ cancel_timer();
+ schedule_timer();
+ }
+ return 0;
+}
+
+uint64_t TokenBucketThrottle::tokens_filled(double tick) {
+ return (0 == m_avg) ? 0 : (tick / m_ticks_per_second * m_avg);
}
-void TokenBucketThrottle::set_average(uint64_t avg) {
- m_avg = avg;
+uint64_t TokenBucketThrottle::tokens_this_tick() {
+ if (0 == m_avg) {
+ return 0;
+ }
+ if (m_current_tick >= m_ticks_per_second) {
+ m_current_tick = 0;
+ }
+ m_current_tick++;
+
+ return tokens_filled(m_current_tick) - tokens_filled(m_current_tick - 1);
}
void TokenBucketThrottle::add_tokens() {
list<Blocker> tmp_blockers;
{
- // put m_avg tokens into bucket.
std::lock_guard<Mutex> lock(m_lock);
- m_throttle.put(m_avg);
+ // put tokens into bucket.
+ m_throttle.put(tokens_this_tick());
// check the m_blockers from head to tail, if blocker can get
// enough tokens, let it go.
while (!m_blockers.empty()) {
Blocker blocker = m_blockers.front();
uint64_t got = m_throttle.get(blocker.tokens_requested);
if (got == blocker.tokens_requested) {
- // got enough tokens for front.
- tmp_blockers.splice(tmp_blockers.end(), m_blockers, m_blockers.begin());
+ // got enough tokens for front.
+ tmp_blockers.splice(tmp_blockers.end(), m_blockers, m_blockers.begin());
} else {
- // there is no more tokens.
- blocker.tokens_requested -= got;
- break;
+ // there is no more tokens.
+ blocker.tokens_requested -= got;
+ break;
}
}
}
}
void TokenBucketThrottle::schedule_timer() {
- add_tokens();
-
m_token_ctx = new FunctionContext(
[this](int r) {
schedule_timer();
});
+ m_timer->add_event_after(m_schedule_tick, m_token_ctx);
- m_timer->add_event_after(1, m_token_ctx);
+ add_tokens();
}
void TokenBucketThrottle::cancel_timer() {
class TokenBucketThrottle {
-
struct Bucket {
CephContext *cct;
const std::string name;
- std::atomic<uint64_t> remain = { 0 }, max = { 0 };
- Bucket(CephContext *cct, const std::string& n, uint64_t m)
- : cct(cct), name(n),
- remain(m), max(m)
- {
- }
+ uint64_t remain;
+ uint64_t max;
+
+ Bucket(CephContext *cct, const std::string &name, uint64_t m)
+ : cct(cct), name(name), remain(m), max(m) {}
uint64_t get(uint64_t c);
uint64_t put(uint64_t c);
CephContext *m_cct;
Bucket m_throttle;
uint64_t m_avg = 0;
+ uint64_t m_burst = 0;
SafeTimer *m_timer;
Mutex *m_timer_lock;
FunctionContext *m_token_ctx = nullptr;
list<Blocker> m_blockers;
Mutex m_lock;
+ // minimum of the filling period.
+ static const uint64_t m_tick_min = 50;
+ // tokens filling period, its unit is millisecond.
+ uint64_t m_tick = 0;
+ /**
+ * These variables are used to calculate how many tokens need to be put into
+ * the bucket within each tick.
+ *
+ * In actual use, the tokens to be put per tick(m_avg / m_ticks_per_second)
+ * may be a floating point number, but we need an 'uint64_t' to put into the
+ * bucket.
+ *
+ * For example, we set the value of rate to be 950, means 950 iops(or bps).
+ *
+ * In this case, the filling period(m_tick) should be 1000 / 950 = 1.052,
+ * which is too small for the SafeTimer. So we should set the period(m_tick)
+ * to be 50(m_tick_min), and 20 ticks in one second(m_ticks_per_second).
+ * The tokens filled in bucket per tick is 950 / 20 = 47.5, not an integer.
+ *
+ * To resolve this, we use a method called tokens_filled(m_current_tick) to
+ * calculate how many tokens will be put so far(until m_current_tick):
+ *
+ * tokens_filled = m_current_tick / m_ticks_per_second * m_avg
+ *
+ * And the difference between two ticks will be the result we expect.
+ * tokens in tick 0: (1 / 20 * 950) - (0 / 20 * 950) = 47 - 0 = 47
+ * tokens in tick 1: (2 / 20 * 950) - (1 / 20 * 950) = 95 - 47 = 48
+ * tokens in tick 2: (3 / 20 * 950) - (2 / 20 * 950) = 142 - 95 = 47
+ *
+ * As a result, the tokens filled in one second will shown as this:
+ * tick | 1| 2| 3| 4| 5| 6| 7| 8| 9|10|11|12|13|14|15|16|17|18|19|20|
+ * tokens |47|48|47|48|47|48|47|48|47|48|47|48|47|48|47|48|47|48|47|48|
+ */
+ uint64_t m_ticks_per_second = 0;
+ uint64_t m_current_tick = 0;
+
+ // period for the bucket filling tokens, its unit is seconds.
+ double m_schedule_tick = 1.0;
+
public:
TokenBucketThrottle(CephContext *cct, uint64_t capacity, uint64_t avg,
- SafeTimer *timer, Mutex *timer_lock);
+ SafeTimer *timer, Mutex *timer_lock);
~TokenBucketThrottle();
template <typename T, typename I, void(T::*MF)(int, I*, uint64_t)>
bool get(uint64_t c, T *handler, I *item, uint64_t flag) {
- if (0 == m_throttle.max)
+ if (0 == m_avg)
return false;
bool wait = false;
return wait;
}
- void set_max(uint64_t m);
- void set_average(uint64_t avg);
+ int set_burst(uint64_t burst);
+ int set_average(uint64_t avg);
private:
+ uint64_t tokens_filled(double tick);
+ uint64_t tokens_this_tick();
void add_tokens();
void schedule_timer();
void cancel_timer();