]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: limit IO per second by TokenBucketThrottle 17032/head
authorDongsheng Yang <dongsheng.yang@easystack.cn>
Tue, 12 Sep 2017 16:39:50 +0000 (00:39 +0800)
committerDongsheng Yang <dongsheng.yang@easystack.cn>
Tue, 14 Nov 2017 01:19:00 +0000 (09:19 +0800)
Signed-off-by: Dongsheng Yang <dongsheng.yang@easystack.cn>
qa/suites/rbd/thrash/workloads/rbd_fsx_rate_limit.yaml [new file with mode: 0644]
src/common/options.cc
src/librbd/ImageCtx.cc
src/librbd/ImageCtx.h
src/librbd/io/ImageRequest.h
src/librbd/io/ImageRequestWQ.cc
src/librbd/io/ImageRequestWQ.h
src/test/librbd/io/test_mock_ImageRequestWQ.cc

diff --git a/qa/suites/rbd/thrash/workloads/rbd_fsx_rate_limit.yaml b/qa/suites/rbd/thrash/workloads/rbd_fsx_rate_limit.yaml
new file mode 100644 (file)
index 0000000..6e07e59
--- /dev/null
@@ -0,0 +1,9 @@
+tasks:
+- rbd_fsx:
+    clients: [client.0]
+    ops: 6000
+overrides:
+  ceph:
+    conf:
+      client:
+        rbd qos iops limit: 50
index 111ab63502ac7bdf14c959a9ffcb1736cefced2f..738be272f4e40bba6c79779ee5416429349004d4 100644 (file)
@@ -5419,6 +5419,10 @@ static std::vector<Option> get_rbd_options() {
     Option("rbd_journal_max_concurrent_object_sets", Option::TYPE_INT, Option::LEVEL_ADVANCED)
     .set_default(0)
     .set_description("maximum number of object sets a journal client can be behind before it is automatically unregistered"),
+
+    Option("rbd_qos_iops_limit", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+    .set_default(0)
+    .set_description("the desired limit of IO operations per second"),
   });
 }
 
index 2008a07a7695b3eaeda6544a42fe5093ac6b379f..c068d84ee10684e13fc00eab6acd7aeaba06e8ff 100644 (file)
@@ -1003,7 +1003,8 @@ struct C_InvalidateCache : public Context {
         "rbd_journal_max_concurrent_object_sets", false)(
         "rbd_mirroring_resync_after_disconnect", false)(
         "rbd_mirroring_replay_delay", false)(
-        "rbd_skip_partial_discard", false);
+        "rbd_skip_partial_discard", false)(
+       "rbd_qos_iops_limit", false);
 
     md_config_t local_config_t;
     std::map<std::string, bufferlist> res;
@@ -1064,6 +1065,7 @@ struct C_InvalidateCache : public Context {
     ASSIGN_OPTION(mirroring_replay_delay, int64_t);
     ASSIGN_OPTION(skip_partial_discard, bool);
     ASSIGN_OPTION(blkin_trace_all, bool);
+    ASSIGN_OPTION(qos_iops_limit, uint64_t);
 
     if (thread_safe) {
       ASSIGN_OPTION(journal_pool, std::string);
@@ -1072,6 +1074,8 @@ struct C_InvalidateCache : public Context {
     if (sparse_read_threshold_bytes == 0) {
       sparse_read_threshold_bytes = get_object_size();
     }
+
+    io_work_queue->apply_qos_iops_limit(qos_iops_limit);
   }
 
   ExclusiveLock<ImageCtx> *ImageCtx::create_exclusive_lock() {
index ed77e339d64421377a23c59703f674ea3874f159..e5a6a3d91fc31d4a3e12e655914fe53f12f0b029 100644 (file)
@@ -198,6 +198,7 @@ namespace librbd {
     int mirroring_replay_delay;
     bool skip_partial_discard;
     bool blkin_trace_all;
+    uint64_t qos_iops_limit;
 
     LibrbdAdminSocketHook *asok_hook;
 
index 0e7f32b1fac92003dc6f4a0bafd97b29a603bc3a..1abada16b5844a3be7723f3276b8980dfc984a79 100644 (file)
@@ -99,6 +99,14 @@ public:
     return m_trace;
   }
 
+  bool was_throttled() {
+    return m_throttled;
+  }
+
+  void set_throttled() {
+    m_throttled = true;
+  }
+
 protected:
   typedef std::list<ObjectRequestHandle *> ObjectRequests;
 
@@ -107,6 +115,7 @@ protected:
   Extents m_image_extents;
   ZTracer::Trace m_trace;
   bool m_bypass_image_cache = false;
+  bool m_throttled = false;
 
   ImageRequest(ImageCtxT &image_ctx, AioCompletion *aio_comp,
                Extents &&image_extents, const char *trace_name,
index 7f479f45e96cb7a589eb79d1eb87201d7f6a0d26..9970f8fe3725e09af7ea7af312610545321b93e6 100644 (file)
@@ -69,9 +69,21 @@ ImageRequestWQ<I>::ImageRequestWQ(I *image_ctx, const string &name,
     m_lock(util::unique_lock_name("ImageRequestWQ<I>::m_lock", this)) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 5) << "ictx=" << image_ctx << dendl;
+
+  SafeTimer *timer;
+  Mutex *timer_lock;
+  ImageCtx::get_timer_instance(cct, &timer, &timer_lock);
+
+  iops_throttle = new TokenBucketThrottle(
+      cct, 0, 0, timer, timer_lock);
   this->register_work_queue();
 }
 
+template <typename I>
+ImageRequestWQ<I>::~ImageRequestWQ() {
+  delete iops_throttle;
+}
+
 template <typename I>
 ssize_t ImageRequestWQ<I>::read(uint64_t off, uint64_t len,
                                ReadResult &&read_result, int op_flags) {
@@ -541,6 +553,21 @@ void ImageRequestWQ<I>::set_require_lock(Direction direction, bool enabled) {
   }
 }
 
+template <typename I>
+void ImageRequestWQ<I>::apply_qos_iops_limit(uint64_t limit) {
+  iops_throttle->set_max(limit);
+  iops_throttle->set_average(limit);
+}
+
+template <typename I>
+void ImageRequestWQ<I>::handle_iops_throttle_ready(int r, ImageRequest<I> *item) {
+  assert(m_io_blockers.load() > 0);
+  --m_io_blockers;
+  item->set_throttled();
+  this->requeue(item);
+  this->signal();
+}
+
 template <typename I>
 void *ImageRequestWQ<I>::_void_dequeue() {
   CephContext *cct = m_image_ctx.cct;
@@ -574,6 +601,14 @@ void *ImageRequestWQ<I>::_void_dequeue() {
     ThreadPool::PointerWQ<ImageRequest<I> >::_void_dequeue());
   assert(peek_item == item);
 
+  if (!item->was_throttled() &&
+       iops_throttle->get<ImageRequestWQ<I>, ImageRequest<I>,
+           &ImageRequestWQ<I>::handle_iops_throttle_ready>(1, this, item)) {
+    // io was queued into blockers list and wait for tokens.
+    ++m_io_blockers;
+    return nullptr;
+  }
+
   if (lock_required) {
     this->get_pool_lock().Unlock();
     m_image_ctx.owner_lock.get_read();
index 5c5fc911c0f74197f8ed1c0cca138a15dc14b2fa..fd6bec30dcd8b3992fb48707f38f27b969348067 100644 (file)
@@ -6,6 +6,7 @@
 
 #include "include/Context.h"
 #include "common/RWLock.h"
+#include "common/Throttle.h"
 #include "common/WorkQueue.h"
 #include "librbd/io/Types.h"
 
@@ -28,6 +29,7 @@ class ImageRequestWQ
 public:
   ImageRequestWQ(ImageCtxT *image_ctx, const string &name, time_t ti,
                  ThreadPool *tp);
+  ~ImageRequestWQ();
 
   ssize_t read(uint64_t off, uint64_t len, ReadResult &&read_result,
                int op_flags);
@@ -70,6 +72,8 @@ public:
 
   void set_require_lock(Direction direction, bool enabled);
 
+  void apply_qos_iops_limit(uint64_t limit);
+
 protected:
   void *_void_dequeue() override;
   void process(ImageRequest<ImageCtxT> *req) override;
@@ -93,6 +97,8 @@ private:
   std::atomic<unsigned> m_in_flight_writes { 0 };
   std::atomic<unsigned> m_io_blockers { 0 };
 
+  TokenBucketThrottle *iops_throttle;
+
   bool m_shutdown = false;
   Context *m_on_shutdown = nullptr;
 
@@ -119,6 +125,8 @@ private:
   void handle_acquire_lock(int r, ImageRequest<ImageCtxT> *req);
   void handle_refreshed(int r, ImageRequest<ImageCtxT> *req);
   void handle_blocked_writes(int r);
+
+  void handle_iops_throttle_ready(int r, ImageRequest<ImageCtxT> *item);
 };
 
 } // namespace io
index 5222c0eb6c687eaff9c150cf84f542f9bfbf8ca7..f58d7a603eb2b403bfb502a1b8b1454c3d6b377a 100644 (file)
@@ -44,6 +44,8 @@ struct ImageRequest<librbd::MockTestImageCtx> {
   MOCK_CONST_METHOD0(start_op, void());
   MOCK_CONST_METHOD0(send, void());
   MOCK_CONST_METHOD1(fail, void(int));
+  MOCK_CONST_METHOD0(was_throttled, bool());
+  MOCK_CONST_METHOD0(set_throttled, void());
 
   ImageRequest() {
     s_instance = this;