]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Throttle: add a new TokenBucketThrottle
authorDongsheng Yang <dongsheng.yang@easystack.cn>
Thu, 3 Aug 2017 07:21:23 +0000 (15:21 +0800)
committerDongsheng Yang <dongsheng.yang@easystack.cn>
Wed, 8 Nov 2017 02:34:17 +0000 (10:34 +0800)
Signed-off-by: Dongsheng Yang <dongsheng.yang@easystack.cn>
src/common/Throttle.cc
src/common/Throttle.h

index d0d3f612e33c4acefb8617b36503e5f34df624d6..9b30ef3ed3c4eaadca92e98b26c23a0909330a08 100644 (file)
@@ -650,3 +650,127 @@ void OrderedThrottle::complete_pending_ops(UNIQUE_LOCK_T(m_lock)& l) {
     ++m_complete_tid;
   }
 }
+
+uint64_t TokenBucketThrottle::Bucket::get(uint64_t c) {
+  if (0 == max) {
+    return 0;
+  }
+
+  uint64_t got = 0;
+  if (remain >= c) {
+    // There is enough token in bucket, take c.
+    got = c;
+    remain -= c;
+  } else {
+    // There is not enough, take all remain.
+    got = remain;
+    remain = 0;
+  }
+  return got;
+}
+
+uint64_t TokenBucketThrottle::Bucket::put(uint64_t c) {
+  if (0 == max) {
+    return 0;
+  }
+  if (c) {
+    // put c tokens into bucket
+    uint64_t current = remain;
+    if ((current + c) <= (uint64_t)max) {
+      remain += c;
+    } else {
+      remain = (uint64_t)max;
+    }
+  }
+  return remain;
+}
+
+void TokenBucketThrottle::Bucket::set_max(uint64_t m) {
+  if (remain > m)
+    remain = m;
+  max = m;
+}
+
+TokenBucketThrottle::TokenBucketThrottle(
+    CephContext *cct,
+    uint64_t capacity,
+    uint64_t avg,
+    SafeTimer *timer,
+    Mutex *timer_lock)
+  : m_cct(cct), m_throttle(m_cct, "token_bucket_throttle", capacity),
+    m_avg(avg), m_timer(timer), m_timer_lock(timer_lock),
+    m_lock("token_bucket_throttle_lock")
+{
+  Mutex::Locker timer_locker(*m_timer_lock);
+  schedule_timer();
+}
+
+TokenBucketThrottle::~TokenBucketThrottle()
+{
+  // cancel the timer events.
+  {
+    Mutex::Locker timer_locker(*m_timer_lock);
+    cancel_timer();
+  }
+
+  list<Blocker> tmp_blockers;
+  {
+    Mutex::Locker blockers_lock(m_lock);
+    tmp_blockers.splice(tmp_blockers.begin(), m_blockers, m_blockers.begin(), m_blockers.end());
+  }
+
+  for (auto b : tmp_blockers) {
+    b.ctx->complete(0);
+  }
+}
+
+void TokenBucketThrottle::set_max(uint64_t m) {
+  Mutex::Locker lock(m_lock);
+  m_throttle.set_max(m);
+}
+
+void TokenBucketThrottle::set_average(uint64_t avg) {
+  m_avg = avg;
+}
+
+void TokenBucketThrottle::add_tokens() {
+  list<Blocker> tmp_blockers;
+  {
+    // put m_avg tokens into bucket.
+    Mutex::Locker lock(m_lock);
+    m_throttle.put(m_avg);
+    // check the m_blockers from head to tail, if blocker can get
+    // enough tokens, let it go.
+    while (!m_blockers.empty()) {
+      Blocker blocker = m_blockers.front();
+      uint64_t got = m_throttle.get(blocker.tokens_requested);
+      if (got == blocker.tokens_requested) {
+       // got enough tokens for front.
+       tmp_blockers.splice(tmp_blockers.end(), m_blockers, m_blockers.begin());
+      } else {
+       // there is no more tokens.
+       blocker.tokens_requested -= got;
+       break;
+      }
+    }
+  }
+
+  for (auto b : tmp_blockers) {
+    b.ctx->complete(0);
+  }
+}
+
+void TokenBucketThrottle::schedule_timer() {
+  add_tokens();
+
+  m_token_ctx = new FunctionContext(
+      [this](int r) {
+        schedule_timer();
+      });
+
+  m_timer->add_event_after(1, m_token_ctx);
+}
+
+void TokenBucketThrottle::cancel_timer() {
+  m_timer->cancel_event(m_token_ctx);
+}
index 162e597a655bb3f15372efb01e2b01d7129afb60..9081c006b77476cadc6f37fad9d431c6bc3609f3 100644 (file)
@@ -13,6 +13,7 @@
 #include <mutex>
 
 #include "include/Context.h"
+#include "common/Timer.h"
 #include "common/convenience.h"
 #include "common/perf_counters.h"
 
@@ -330,4 +331,76 @@ private:
   uint32_t waiters = 0;
 };
 
+
+class TokenBucketThrottle {
+
+  struct Bucket {
+    CephContext *cct;
+    const std::string name;
+    std::atomic<uint64_t> remain = { 0 }, max = { 0 };
+
+    Bucket(CephContext *cct, const std::string& n, uint64_t m)
+      : cct(cct), name(n),
+       remain(m), max(m)
+    {
+    }
+
+    uint64_t get(uint64_t c);
+    uint64_t put(uint64_t c);
+    void set_max(uint64_t m);
+  };
+
+  struct Blocker {
+    uint64_t tokens_requested;
+    Context *ctx;
+
+    Blocker(uint64_t _tokens_requested, Context* _ctx)
+      : tokens_requested(_tokens_requested), ctx(_ctx) {}
+  };
+
+  CephContext *m_cct;
+  Bucket m_throttle;
+  uint64_t m_avg = 0;
+  SafeTimer *m_timer;
+  Mutex *m_timer_lock;
+  FunctionContext *m_token_ctx = nullptr;
+  list<Blocker> m_blockers;
+  Mutex m_lock;
+
+public:
+  TokenBucketThrottle(CephContext *cct, uint64_t capacity, uint64_t avg,
+                   SafeTimer *timer, Mutex *timer_lock);
+  
+  ~TokenBucketThrottle();
+  
+  template <typename T, typename I, void(T::*MF)(int, I*)>
+  bool get(uint64_t c, T *handler, I *item) {
+    if (0 == m_throttle.max)
+      return false;
+  
+    bool waited = false;
+  
+    Mutex::Locker lock(m_lock);
+    uint64_t got = m_throttle.get(c);
+    if (got < c) {
+      // Not enough tokens, add a blocker for it.
+      Context *ctx = new FunctionContext([this, handler, item](int r) {
+       (handler->*MF)(r, item);
+        });
+      m_blockers.emplace_back(c - got, ctx);
+      waited = true;
+    }
+    return waited;
+  }
+  
+  
+  void set_max(uint64_t m);
+  void set_average(uint64_t avg);
+
+private:
+  void add_tokens();
+  void schedule_timer();
+  void cancel_timer();
+};
+
 #endif