fdcache(g_ceph_context),
wbthrottle(g_ceph_context),
default_osr("default"),
- op_queue_len(0), op_queue_bytes(0),
- op_throttle_lock("FileStore::op_throttle_lock"),
+ throttle_ops(g_ceph_context, "filestore_ops",g_conf->filestore_queue_max_ops),
+ throttle_bytes(g_ceph_context, "filestore_bytes",g_conf->filestore_queue_max_bytes),
op_finisher(g_ceph_context),
op_tp(g_ceph_context, "FileStore::op_tp", g_conf->filestore_op_threads, "filestore_op_threads"),
op_wq(this, g_conf->filestore_op_thread_timeout,
dout(5) << "queue_op " << o << " seq " << o->op
<< " " << *osr
<< " " << o->bytes << " bytes"
- << " (queue has " << op_queue_len << " ops and " << op_queue_bytes << " bytes)"
+ << " (queue has " << throttle_ops.get_current() << " ops and " << throttle_bytes.get_current() << " bytes)"
<< dendl;
op_wq.queue(osr);
}
logger->set(l_os_oq_max_bytes, max_bytes);
utime_t start = ceph_clock_now(g_ceph_context);
- {
- Mutex::Locker l(op_throttle_lock);
- while ((max_ops && (op_queue_len + 1) > max_ops) ||
- (max_bytes && op_queue_bytes // let single large ops through!
- && (op_queue_bytes + o->bytes) > max_bytes)) {
- dout(2) << "waiting " << op_queue_len + 1 << " > " << max_ops << " ops || "
- << op_queue_bytes + o->bytes << " > " << max_bytes << dendl;
- if (handle)
- handle->suspend_tp_timeout();
- op_throttle_cond.Wait(op_throttle_lock);
- if (handle)
- handle->reset_tp_timeout();
- }
+ if (handle)
+ handle->suspend_tp_timeout();
+ if (throttle_ops.should_wait(1) ||
+ (throttle_bytes.get_current() // let single large ops through!
+ && throttle_bytes.should_wait(o->bytes))) {
+ dout(2) << "waiting " << throttle_ops.get_current() + 1 << " > " << max_ops << " ops || "
+ << throttle_bytes.get_current() + o->bytes << " > " << max_bytes << dendl;
+ }
+ throttle_ops.get();
+ throttle_bytes.get(o->bytes);
+ if (handle)
+ handle->reset_tp_timeout();
- op_queue_len++;
- op_queue_bytes += o->bytes;
- }
utime_t end = ceph_clock_now(g_ceph_context);
logger->tinc(l_os_queue_lat, end - start);
- logger->set(l_os_oq_ops, op_queue_len);
- logger->set(l_os_oq_bytes, op_queue_bytes);
+ logger->set(l_os_oq_ops, throttle_ops.get_current());
+ logger->set(l_os_oq_bytes, throttle_bytes.get_current());
}
void FileStore::op_queue_release_throttle(Op *o)
{
- {
- Mutex::Locker l(op_throttle_lock);
- op_queue_len--;
- op_queue_bytes -= o->bytes;
- op_throttle_cond.Signal();
- }
-
- logger->set(l_os_oq_ops, op_queue_len);
- logger->set(l_os_oq_bytes, op_queue_bytes);
+ throttle_ops.put();
+ throttle_bytes.put(o->bytes);
+ logger->set(l_os_oq_ops, throttle_ops.get_current());
+ logger->set(l_os_oq_bytes, throttle_bytes.get_current());
}
void FileStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle)