]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
TokenBucketThrottle: reduce the TokenBucket fill period.
authorShiyang Ruan <ruansy.fnst@cn.fujitsu.com>
Fri, 28 Sep 2018 06:26:58 +0000 (14:26 +0800)
committerShiyang Ruan <ruansy.fnst@cn.fujitsu.com>
Thu, 1 Nov 2018 05:06:56 +0000 (13:06 +0800)
TokenBucket shall put one token per 1/cir second (one schedule tick for QoS, set the minimum 50ms here).
The cir is Committed Information Rate, which is defined by TokenBucket. Also means the IOPS(or BPS) limit we set.

Signed-off-by: Shiyang Ruan <ruansy.fnst@cn.fujitsu.com>
src/common/Throttle.cc
src/common/Throttle.h
src/librbd/io/ImageRequestWQ.cc

index 9a4a10c1b1558a03ab5ab56ecc12d261f91be911..8349347cec8cbd915609ec0fc8279764d79ad549 100644 (file)
@@ -672,21 +672,23 @@ uint64_t TokenBucketThrottle::Bucket::put(uint64_t c) {
   if (0 == max) {
     return 0;
   }
+
   if (c) {
     // put c tokens into bucket
     uint64_t current = remain;
-    if ((current + c) <= (uint64_t)max) {
+    if ((current + c) <= max) {
       remain += c;
     } else {
-      remain = (uint64_t)max;
+      remain = max;
     }
   }
   return remain;
 }
 
 void TokenBucketThrottle::Bucket::set_max(uint64_t m) {
-  if (remain > m || max == 0)
+  if (remain > m || 0 == m) {
     remain = m;
+  }
   max = m;
 }
 
@@ -699,13 +701,9 @@ TokenBucketThrottle::TokenBucketThrottle(
   : m_cct(cct), m_throttle(m_cct, "token_bucket_throttle", capacity),
     m_avg(avg), m_timer(timer), m_timer_lock(timer_lock),
     m_lock("token_bucket_throttle_lock")
-{
-  std::lock_guard<Mutex> timer_locker(*m_timer_lock);
-  schedule_timer();
-}
+{}
 
-TokenBucketThrottle::~TokenBucketThrottle()
-{
+TokenBucketThrottle::~TokenBucketThrottle() {
   // cancel the timer events.
   {
     std::lock_guard<Mutex> timer_locker(*m_timer_lock);
@@ -723,33 +721,94 @@ TokenBucketThrottle::~TokenBucketThrottle()
   }
 }
 
-void TokenBucketThrottle::set_max(uint64_t m) {
+int TokenBucketThrottle::set_burst(uint64_t burst){
   std::lock_guard<Mutex> lock(m_lock);
-  m_throttle.set_max(m);
+  if (0 < burst && burst < m_avg) {
+    // the burst should never less than the average.
+    return -EINVAL;
+  } else {
+    m_burst = burst;
+  }
+  // for the default configuration of burst.
+  m_throttle.set_max(0 == m_burst ? m_avg : m_burst);
+  return 0;
+}
+
+int TokenBucketThrottle::set_average(uint64_t avg) {
+  {
+    std::lock_guard<Mutex> lock(m_lock);
+    m_avg = avg;
+
+    if (0 < m_burst && m_burst < avg) {
+      // the burst should never less than the average.
+      return -EINVAL;
+    } else if (0 == avg) {
+      // The limit is not set, and no tokens will be put into the bucket.
+      // So, we can schedule the timer slowly, or even cancel it.
+      m_tick = 1000;
+    } else {
+      // calculate the tick(ms), don't less than the minimum.
+      m_tick = 1000 / avg;
+      if (m_tick < m_tick_min) {
+        m_tick = m_tick_min;
+      }
+
+      // this is for the number(avg) can not be divisible.
+      m_ticks_per_second = 1000 / m_tick;
+      m_current_tick = 0;
+
+      // for the default configuration of burst.
+      if (0 == m_burst) {
+        m_throttle.set_max(m_avg);
+      }
+    }
+    // turn millisecond to second
+    m_schedule_tick = m_tick / 1000.0;
+  }
+
+  // The schedule period will be changed when the average rate is set.
+  {
+    std::lock_guard<Mutex> timer_locker(*m_timer_lock);
+    cancel_timer();
+    schedule_timer();
+  }
+  return 0;
+}
+
+uint64_t TokenBucketThrottle::tokens_filled(double tick) {
+  return (0 == m_avg) ? 0 : (tick / m_ticks_per_second * m_avg);
 }
 
-void TokenBucketThrottle::set_average(uint64_t avg) {
-  m_avg = avg;
+uint64_t TokenBucketThrottle::tokens_this_tick() {
+  if (0 == m_avg) {
+    return 0;
+  }
+  if (m_current_tick >= m_ticks_per_second) {
+    m_current_tick = 0;
+  }
+  m_current_tick++;
+
+  return tokens_filled(m_current_tick) - tokens_filled(m_current_tick - 1);
 }
 
 void TokenBucketThrottle::add_tokens() {
   list<Blocker> tmp_blockers;
   {
-    // put m_avg tokens into bucket.
     std::lock_guard<Mutex> lock(m_lock);
-    m_throttle.put(m_avg);
+    // put tokens into bucket.
+    m_throttle.put(tokens_this_tick());
     // check the m_blockers from head to tail, if blocker can get
     // enough tokens, let it go.
     while (!m_blockers.empty()) {
       Blocker blocker = m_blockers.front();
       uint64_t got = m_throttle.get(blocker.tokens_requested);
       if (got == blocker.tokens_requested) {
-       // got enough tokens for front.
-       tmp_blockers.splice(tmp_blockers.end(), m_blockers, m_blockers.begin());
+        // got enough tokens for front.
+        tmp_blockers.splice(tmp_blockers.end(), m_blockers, m_blockers.begin());
       } else {
-       // there is no more tokens.
-       blocker.tokens_requested -= got;
-       break;
+        // there is no more tokens.
+        blocker.tokens_requested -= got;
+        break;
       }
     }
   }
@@ -760,14 +819,13 @@ void TokenBucketThrottle::add_tokens() {
 }
 
 void TokenBucketThrottle::schedule_timer() {
-  add_tokens();
-
   m_token_ctx = new FunctionContext(
       [this](int r) {
         schedule_timer();
       });
+  m_timer->add_event_after(m_schedule_tick, m_token_ctx);
 
-  m_timer->add_event_after(1, m_token_ctx);
+  add_tokens();
 }
 
 void TokenBucketThrottle::cancel_timer() {
index 82bf8264b377dfa812ef4379061a513e14369443..96d55d13c2eee64621d33d6fe5d9b4c28f9dc9b3 100644 (file)
@@ -334,17 +334,15 @@ private:
 
 
 class TokenBucketThrottle {
-
   struct Bucket {
     CephContext *cct;
     const std::string name;
-    std::atomic<uint64_t> remain = { 0 }, max = { 0 };
 
-    Bucket(CephContext *cct, const std::string& n, uint64_t m)
-      : cct(cct), name(n),
-       remain(m), max(m)
-    {
-    }
+    uint64_t remain;
+    uint64_t max;
+
+    Bucket(CephContext *cct, const std::string &name, uint64_t m)
+      : cct(cct), name(name), remain(m), max(m) {}
 
     uint64_t get(uint64_t c);
     uint64_t put(uint64_t c);
@@ -362,15 +360,55 @@ class TokenBucketThrottle {
   CephContext *m_cct;
   Bucket m_throttle;
   uint64_t m_avg = 0;
+  uint64_t m_burst = 0;
   SafeTimer *m_timer;
   Mutex *m_timer_lock;
   FunctionContext *m_token_ctx = nullptr;
   list<Blocker> m_blockers;
   Mutex m_lock;
 
+  // minimum of the filling period.
+  static const uint64_t m_tick_min = 50;
+  // tokens filling period, its unit is millisecond.
+  uint64_t m_tick = 0;
+  /**
+   * These variables are used to calculate how many tokens need to be put into
+   * the bucket within each tick.
+   *
+   * In actual use, the tokens to be put per tick(m_avg / m_ticks_per_second)
+   * may be a floating point number, but we need an 'uint64_t' to put into the
+   * bucket.
+   *
+   * For example, we set the value of rate to be 950, means 950 iops(or bps).
+   *
+   * In this case, the filling period(m_tick) should be 1000 / 950 = 1.052,
+   * which is too small for the SafeTimer. So we should set the period(m_tick)
+   * to be 50(m_tick_min), and 20 ticks in one second(m_ticks_per_second).
+   * The tokens filled in bucket per tick is 950 / 20 = 47.5, not an integer.
+   *
+   * To resolve this, we use a method called tokens_filled(m_current_tick) to
+   * calculate how many tokens will be put so far(until m_current_tick):
+   *
+   *   tokens_filled = m_current_tick / m_ticks_per_second * m_avg
+   *
+   * And the difference between two ticks will be the result we expect.
+   *   tokens in tick 0: (1 / 20 * 950) - (0 / 20 * 950) =  47 -   0 = 47
+   *   tokens in tick 1: (2 / 20 * 950) - (1 / 20 * 950) =  95 -  47 = 48
+   *   tokens in tick 2: (3 / 20 * 950) - (2 / 20 * 950) = 142 -  95 = 47
+   *
+   * As a result, the tokens filled in one second will shown as this:
+   *   tick    | 1| 2| 3| 4| 5| 6| 7| 8| 9|10|11|12|13|14|15|16|17|18|19|20|
+   *   tokens  |47|48|47|48|47|48|47|48|47|48|47|48|47|48|47|48|47|48|47|48|
+   */
+  uint64_t m_ticks_per_second = 0;
+  uint64_t m_current_tick = 0;
+
+  // period for the bucket filling tokens, its unit is seconds.
+  double m_schedule_tick = 1.0;
+
 public:
   TokenBucketThrottle(CephContext *cct, uint64_t capacity, uint64_t avg,
-                   SafeTimer *timer, Mutex *timer_lock);
+                      SafeTimer *timer, Mutex *timer_lock);
   
   ~TokenBucketThrottle();
 
@@ -384,7 +422,7 @@ public:
   
   template <typename T, typename I, void(T::*MF)(int, I*, uint64_t)>
   bool get(uint64_t c, T *handler, I *item, uint64_t flag) {
-    if (0 == m_throttle.max)
+    if (0 == m_avg)
       return false;
   
     bool wait = false;
@@ -407,10 +445,12 @@ public:
     return wait;
   }
   
-  void set_max(uint64_t m);
-  void set_average(uint64_t avg);
+  int set_burst(uint64_t burst);
+  int set_average(uint64_t avg);
 
 private:
+  uint64_t tokens_filled(double tick);
+  uint64_t tokens_this_tick();
   void add_tokens();
   void schedule_timer();
   void cancel_timer();
index 393211a0dca87bb384861242349bd89a34693cc0..deeef4531cb57191320adb2efeacea702917ad26 100644 (file)
@@ -624,7 +624,7 @@ void ImageRequestWQ<I>::apply_qos_limit(uint64_t limit, const uint64_t flag) {
     }
   }
   ceph_assert(throttle != nullptr);
-  throttle->set_max(limit);
+  throttle->set_burst(limit);
   throttle->set_average(limit);
   if (limit)
     m_qos_enabled_flag |= flag;