From: xinxin shu Date: Thu, 23 Jul 2015 02:21:57 +0000 (+0800) Subject: use throttle framework to throttle ops/bytes for keyvaluestore X-Git-Tag: v9.1.0~375^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=1cc9cc850814720083b2235fce71714bba15023d;p=ceph.git use throttle framework to throttle ops/bytes for keyvaluestore Signed-off-by: xinxin shu --- diff --git a/src/os/KeyValueStore.cc b/src/os/KeyValueStore.cc index 390113470eac..5abe3186c810 100644 --- a/src/os/KeyValueStore.cc +++ b/src/os/KeyValueStore.cc @@ -528,8 +528,8 @@ KeyValueStore::KeyValueStore(const std::string &base, collections_lock("KeyValueStore::collections_lock"), lock("KeyValueStore::lock"), default_osr("default"), - op_queue_len(0), op_queue_bytes(0), - op_throttle_lock("KeyValueStore::op_throttle_lock"), + throttle_ops(g_ceph_context, "keyvaluestore_ops", g_conf->keyvaluestore_queue_max_ops), + throttle_bytes(g_ceph_context, "keyvaluestore_bytes", g_conf->keyvaluestore_queue_max_bytes), op_finisher(g_ceph_context), op_tp(g_ceph_context, "KeyValueStore::op_tp", g_conf->keyvaluestore_op_threads, "keyvaluestore_op_threads"), @@ -1101,8 +1101,8 @@ void KeyValueStore::queue_op(OpSequencer *osr, Op *o) perf_logger->inc(l_os_bytes, o->bytes); dout(5) << "queue_op " << o << " seq " << o->op << " " << *osr << " " - << o->bytes << " bytes" << " (queue has " << op_queue_len - << " ops and " << op_queue_bytes << " bytes)" << dendl; + << o->bytes << " bytes" << " (queue has " << throttle_ops.get_current() + << " ops and " << throttle_bytes.get_current() << " bytes)" << dendl; op_wq.queue(osr); } @@ -1115,42 +1115,32 @@ void KeyValueStore::op_queue_reserve_throttle(Op *o, ThreadPool::TPHandle *handl perf_logger->set(l_os_oq_max_bytes, max_bytes); utime_t start = ceph_clock_now(g_ceph_context); - { - Mutex::Locker l(op_throttle_lock); - while ((max_ops && (op_queue_len + 1) > max_ops) || - (max_bytes && op_queue_bytes // let single large ops through! - && (op_queue_bytes + o->bytes) > max_bytes)) { - dout(2) << "waiting " << op_queue_len + 1 << " > " << max_ops - << " ops || " << op_queue_bytes + o->bytes << " > " << max_bytes - << dendl; - if (handle) - handle->suspend_tp_timeout(); - op_throttle_cond.Wait(op_throttle_lock); - if (handle) - handle->reset_tp_timeout(); - } + if (handle) + handle->suspend_tp_timeout(); + if (throttle_ops.should_wait(1) || + (throttle_bytes.get_current() // let single large ops through! + && throttle_bytes.should_wait(o->bytes))) { + dout(2) << "waiting " << throttle_ops.get_current() + 1 << " > " << max_ops << " ops || " + << throttle_bytes.get_current() + o->bytes << " > " << max_bytes << dendl; + } + throttle_ops.get(); + throttle_bytes.get(o->bytes); + if (handle) + handle->reset_tp_timeout(); - op_queue_len++; - op_queue_bytes += o->bytes; - } utime_t end = ceph_clock_now(g_ceph_context); perf_logger->tinc(l_os_queue_lat, end - start); - perf_logger->set(l_os_oq_ops, op_queue_len); - perf_logger->set(l_os_oq_bytes, op_queue_bytes); + perf_logger->set(l_os_oq_ops, throttle_ops.get_current()); + perf_logger->set(l_os_oq_bytes, throttle_bytes.get_current()); } void KeyValueStore::op_queue_release_throttle(Op *o) { - { - Mutex::Locker l(op_throttle_lock); - op_queue_len--; - op_queue_bytes -= o->bytes; - op_throttle_cond.Signal(); - } - - perf_logger->set(l_os_oq_ops, op_queue_len); - perf_logger->set(l_os_oq_bytes, op_queue_bytes); + throttle_ops.put(); + throttle_bytes.put(o->bytes); + perf_logger->set(l_os_oq_ops, throttle_ops.get_current()); + perf_logger->set(l_os_oq_bytes, throttle_bytes.get_current()); } void KeyValueStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle) diff --git a/src/os/KeyValueStore.h b/src/os/KeyValueStore.h index 3c857420b5dd..fcb6f9395788 100644 --- a/src/os/KeyValueStore.h +++ b/src/os/KeyValueStore.h @@ -416,9 +416,7 @@ class KeyValueStore : public ObjectStore, Sequencer default_osr; deque op_queue; - uint64_t op_queue_len, op_queue_bytes; - Cond op_throttle_cond; - Mutex op_throttle_lock; + Throttle throttle_ops, throttle_bytes; Finisher op_finisher; ThreadPool op_tp;