collections_lock("KeyValueStore::collections_lock"),
lock("KeyValueStore::lock"),
default_osr("default"),
- op_queue_len(0), op_queue_bytes(0),
- op_throttle_lock("KeyValueStore::op_throttle_lock"),
+ throttle_ops(g_ceph_context, "keyvaluestore_ops", g_conf->keyvaluestore_queue_max_ops),
+ throttle_bytes(g_ceph_context, "keyvaluestore_bytes", g_conf->keyvaluestore_queue_max_bytes),
op_finisher(g_ceph_context),
op_tp(g_ceph_context, "KeyValueStore::op_tp",
g_conf->keyvaluestore_op_threads, "keyvaluestore_op_threads"),
perf_logger->inc(l_os_bytes, o->bytes);
dout(5) << "queue_op " << o << " seq " << o->op << " " << *osr << " "
- << o->bytes << " bytes" << " (queue has " << op_queue_len
- << " ops and " << op_queue_bytes << " bytes)" << dendl;
+ << o->bytes << " bytes" << " (queue has " << throttle_ops.get_current()
+ << " ops and " << throttle_bytes.get_current() << " bytes)" << dendl;
op_wq.queue(osr);
}
perf_logger->set(l_os_oq_max_bytes, max_bytes);
utime_t start = ceph_clock_now(g_ceph_context);
- {
- Mutex::Locker l(op_throttle_lock);
- while ((max_ops && (op_queue_len + 1) > max_ops) ||
- (max_bytes && op_queue_bytes // let single large ops through!
- && (op_queue_bytes + o->bytes) > max_bytes)) {
- dout(2) << "waiting " << op_queue_len + 1 << " > " << max_ops
- << " ops || " << op_queue_bytes + o->bytes << " > " << max_bytes
- << dendl;
- if (handle)
- handle->suspend_tp_timeout();
- op_throttle_cond.Wait(op_throttle_lock);
- if (handle)
- handle->reset_tp_timeout();
- }
+ if (handle)
+ handle->suspend_tp_timeout();
+ if (throttle_ops.should_wait(1) ||
+ (throttle_bytes.get_current() // let single large ops through!
+ && throttle_bytes.should_wait(o->bytes))) {
+ dout(2) << "waiting " << throttle_ops.get_current() + 1 << " > " << max_ops << " ops || "
+ << throttle_bytes.get_current() + o->bytes << " > " << max_bytes << dendl;
+ }
+ throttle_ops.get();
+ throttle_bytes.get(o->bytes);
+ if (handle)
+ handle->reset_tp_timeout();
- op_queue_len++;
- op_queue_bytes += o->bytes;
- }
utime_t end = ceph_clock_now(g_ceph_context);
perf_logger->tinc(l_os_queue_lat, end - start);
- perf_logger->set(l_os_oq_ops, op_queue_len);
- perf_logger->set(l_os_oq_bytes, op_queue_bytes);
+ perf_logger->set(l_os_oq_ops, throttle_ops.get_current());
+ perf_logger->set(l_os_oq_bytes, throttle_bytes.get_current());
}
void KeyValueStore::op_queue_release_throttle(Op *o)
{
- {
- Mutex::Locker l(op_throttle_lock);
- op_queue_len--;
- op_queue_bytes -= o->bytes;
- op_throttle_cond.Signal();
- }
-
- perf_logger->set(l_os_oq_ops, op_queue_len);
- perf_logger->set(l_os_oq_bytes, op_queue_bytes);
+ throttle_ops.put();
+ throttle_bytes.put(o->bytes);
+ perf_logger->set(l_os_oq_ops, throttle_ops.get_current());
+ perf_logger->set(l_os_oq_bytes, throttle_bytes.get_current());
}
void KeyValueStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle)