]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
use throttle framework to throttle ops/bytes for keyvaluestore
authorxinxin shu <xinxin.shu@intel.com>
Thu, 23 Jul 2015 02:21:57 +0000 (10:21 +0800)
committerxinxin shu <xinxin.shu@intel.com>
Fri, 31 Jul 2015 06:09:27 +0000 (14:09 +0800)
Signed-off-by: xinxin shu <xinxin.shu@intel.com>
src/os/KeyValueStore.cc
src/os/KeyValueStore.h

index 390113470eacce786ff47bcff2f4032a25cc5632..5abe3186c81021fe18d2c698840498019a0e98e6 100644 (file)
@@ -528,8 +528,8 @@ KeyValueStore::KeyValueStore(const std::string &base,
   collections_lock("KeyValueStore::collections_lock"),
   lock("KeyValueStore::lock"),
   default_osr("default"),
-  op_queue_len(0), op_queue_bytes(0),
-  op_throttle_lock("KeyValueStore::op_throttle_lock"),
+  throttle_ops(g_ceph_context, "keyvaluestore_ops", g_conf->keyvaluestore_queue_max_ops),
+  throttle_bytes(g_ceph_context, "keyvaluestore_bytes", g_conf->keyvaluestore_queue_max_bytes),
   op_finisher(g_ceph_context),
   op_tp(g_ceph_context, "KeyValueStore::op_tp",
         g_conf->keyvaluestore_op_threads, "keyvaluestore_op_threads"),
@@ -1101,8 +1101,8 @@ void KeyValueStore::queue_op(OpSequencer *osr, Op *o)
   perf_logger->inc(l_os_bytes, o->bytes);
 
   dout(5) << "queue_op " << o << " seq " << o->op << " " << *osr << " "
-          << o->bytes << " bytes" << "   (queue has " << op_queue_len
-          << " ops and " << op_queue_bytes << " bytes)" << dendl;
+          << o->bytes << " bytes" << "   (queue has " << throttle_ops.get_current()
+          << " ops and " << throttle_bytes.get_current() << " bytes)" << dendl;
   op_wq.queue(osr);
 }
 
@@ -1115,42 +1115,32 @@ void KeyValueStore::op_queue_reserve_throttle(Op *o, ThreadPool::TPHandle *handl
   perf_logger->set(l_os_oq_max_bytes, max_bytes);
 
   utime_t start = ceph_clock_now(g_ceph_context);
-  {
-    Mutex::Locker l(op_throttle_lock);
-    while ((max_ops && (op_queue_len + 1) > max_ops) ||
-           (max_bytes && op_queue_bytes      // let single large ops through!
-           && (op_queue_bytes + o->bytes) > max_bytes)) {
-      dout(2) << "waiting " << op_queue_len + 1 << " > " << max_ops
-              << " ops || " << op_queue_bytes + o->bytes << " > " << max_bytes
-              << dendl;
-      if (handle)
-        handle->suspend_tp_timeout();
-      op_throttle_cond.Wait(op_throttle_lock);
-      if (handle)
-        handle->reset_tp_timeout();
-    }
+  if (handle)
+    handle->suspend_tp_timeout();
+  if (throttle_ops.should_wait(1) ||
+    (throttle_bytes.get_current()      // let single large ops through!
+    && throttle_bytes.should_wait(o->bytes))) {
+    dout(2) << "waiting " << throttle_ops.get_current() + 1 << " > " << max_ops << " ops || " 
+      << throttle_bytes.get_current() + o->bytes << " > " << max_bytes << dendl;
+  }
+  throttle_ops.get();
+  throttle_bytes.get(o->bytes);
+  if (handle)
+    handle->reset_tp_timeout();
 
-    op_queue_len++;
-    op_queue_bytes += o->bytes;
-  }
   utime_t end = ceph_clock_now(g_ceph_context);
   perf_logger->tinc(l_os_queue_lat, end - start);
 
-  perf_logger->set(l_os_oq_ops, op_queue_len);
-  perf_logger->set(l_os_oq_bytes, op_queue_bytes);
+  perf_logger->set(l_os_oq_ops, throttle_ops.get_current());
+  perf_logger->set(l_os_oq_bytes, throttle_bytes.get_current());
 }
 
 void KeyValueStore::op_queue_release_throttle(Op *o)
 {
-  {
-    Mutex::Locker l(op_throttle_lock);
-    op_queue_len--;
-    op_queue_bytes -= o->bytes;
-    op_throttle_cond.Signal();
-  }
-
-  perf_logger->set(l_os_oq_ops, op_queue_len);
-  perf_logger->set(l_os_oq_bytes, op_queue_bytes);
+  throttle_ops.put();
+  throttle_bytes.put(o->bytes);
+  perf_logger->set(l_os_oq_ops, throttle_ops.get_current());
+  perf_logger->set(l_os_oq_bytes, throttle_bytes.get_current());
 }
 
 void KeyValueStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle)
index 3c857420b5dd5dfe73325beb042935a2b45b1558..fcb6f9395788ab32d81cfcea013efd9fdf3c8495 100644 (file)
@@ -416,9 +416,7 @@ class KeyValueStore : public ObjectStore,
 
   Sequencer default_osr;
   deque<OpSequencer*> op_queue;
-  uint64_t op_queue_len, op_queue_bytes;
-  Cond op_throttle_cond;
-  Mutex op_throttle_lock;
+  Throttle throttle_ops, throttle_bytes;
   Finisher op_finisher;
 
   ThreadPool op_tp;