lock("KeyValueStore::lock"),
default_osr("default"),
op_queue_len(0), op_queue_bytes(0),
+ op_throttle_lock("KeyValueStore::op_throttle_lock"),
op_finisher(g_ceph_context),
op_tp(g_ceph_context, "KeyValueStore::op_tp",
g_conf->filestore_op_threads, "keyvaluestore_op_threads"),
g_conf->filestore_op_thread_suicide_timeout, &op_tp),
logger(NULL),
read_error_lock("KeyValueStore::read_error_lock"),
+ m_keyvaluestore_queue_max_ops(g_conf->keyvaluestore_queue_max_ops),
+ m_keyvaluestore_queue_max_bytes(g_conf->keyvaluestore_queue_max_bytes),
m_fail_eio(g_conf->filestore_fail_eio),
do_update(do_update)
{
current_op_seq_fn = sss.str();
// initialize logger
- PerfCountersBuilder plb(g_ceph_context, internal_name, 0, 1);
+ PerfCountersBuilder plb(g_ceph_context, internal_name, l_os_commit_lat, l_os_last);
+
+ plb.add_u64(l_os_oq_max_ops, "op_queue_max_ops");
+ plb.add_u64(l_os_oq_ops, "op_queue_ops");
+ plb.add_u64_counter(l_os_ops, "ops");
+ plb.add_u64(l_os_oq_max_bytes, "op_queue_max_bytes");
+ plb.add_u64(l_os_oq_bytes, "op_queue_bytes");
+ plb.add_u64_counter(l_os_bytes, "bytes");
+ plb.add_time_avg(l_os_apply_lat, "apply_latency");
+ plb.add_time_avg(l_os_queue_lat, "queue_transaction_latency_avg");
+
logger = plb.create_perf_counters();
g_ceph_context->get_perfcounters_collection()->add(logger);
}
Op *o = build_op(tls, ondisk, onreadable, onreadable_sync, osd_op);
+ op_queue_reserve_throttle(o, handle);
uint64_t op = submit_manager.op_submit_start();
o->op = op;
dout(5) << "queue_transactions (trailing journal) " << op << " "
osr->queue(o);
+ logger->inc(l_os_ops);
+ logger->inc(l_os_bytes, o->bytes);
+
dout(5) << "queue_op " << o << " seq " << o->op << " " << *osr << " "
<< o->bytes << " bytes" << " (queue has " << op_queue_len
<< " ops and " << op_queue_bytes << " bytes)" << dendl;
op_wq.queue(osr);
}
+// Throttle admission to the op queue: block the calling thread until
+// adding |o| keeps the queue within the configured max-ops/max-bytes
+// limits, then charge the op against op_queue_len/op_queue_bytes.
+// A limit of 0 disables that particular check.  |handle|, when non-NULL,
+// is the caller's thread-pool handle; its heartbeat timeout is suspended
+// around the wait so a long throttle stall is not treated as a hung
+// worker thread.  Paired with op_queue_release_throttle().
+void KeyValueStore::op_queue_reserve_throttle(Op *o, ThreadPool::TPHandle *handle)
+{
+  // Snapshot the configured limits once, and export them so the perf
+  // counters always reflect the values this reservation was checked against.
+  uint64_t max_ops = m_keyvaluestore_queue_max_ops;
+  uint64_t max_bytes = m_keyvaluestore_queue_max_bytes;
+
+  logger->set(l_os_oq_max_ops, max_ops);
+  logger->set(l_os_oq_max_bytes, max_bytes);
+
+  utime_t start = ceph_clock_now(g_ceph_context);
+  {
+    Mutex::Locker l(op_throttle_lock);
+    // Wait until both enabled limits admit the op.  The bytes check is
+    // skipped while op_queue_bytes is 0 so a single op larger than
+    // max_bytes can still make progress rather than waiting forever.
+    while ((max_ops && (op_queue_len + 1) > max_ops) ||
+           (max_bytes && op_queue_bytes // let single large ops through!
+            && (op_queue_bytes + o->bytes) > max_bytes)) {
+      dout(2) << "waiting " << op_queue_len + 1 << " > " << max_ops
+              << " ops || " << op_queue_bytes + o->bytes << " > " << max_bytes
+              << dendl;
+      if (handle)
+        handle->suspend_tp_timeout();
+      op_throttle_cond.Wait(op_throttle_lock);
+      if (handle)
+        handle->reset_tp_timeout();
+    }
+
+    // Reserve our slot while still holding the lock.
+    op_queue_len++;
+    op_queue_bytes += o->bytes;
+  }
+  utime_t end = ceph_clock_now(g_ceph_context);
+  // Records time spent blocked on the throttle (plus the brief
+  // critical section above).
+  logger->tinc(l_os_queue_lat, end - start);
+
+  // NOTE(review): op_queue_len/op_queue_bytes are read here outside
+  // op_throttle_lock, so the exported gauge values may be slightly
+  // stale if another thread races with us.
+  logger->set(l_os_oq_ops, op_queue_len);
+  logger->set(l_os_oq_bytes, op_queue_bytes);
+}
+
+// Release the throttle reservation taken by op_queue_reserve_throttle()
+// once |o| has been applied: un-charge its cost from the queue totals
+// and wake one waiter blocked in the reserve path.
+void KeyValueStore::op_queue_release_throttle(Op *o)
+{
+  {
+    Mutex::Locker l(op_throttle_lock);
+    op_queue_len--;
+    op_queue_bytes -= o->bytes;
+    // Wake a single waiter; it re-checks both limits under the lock
+    // before proceeding.
+    op_throttle_cond.Signal();
+  }
+
+  // NOTE(review): read outside op_throttle_lock; the exported gauge
+  // values may be slightly stale under concurrent updates.
+  logger->set(l_os_oq_ops, op_queue_len);
+  logger->set(l_os_oq_bytes, op_queue_bytes);
+}
+
void KeyValueStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle)
{
// inject a stall?
dout(10) << "_finish_op " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << dendl;
osr->apply_lock.Unlock(); // locked in _do_op
+ op_queue_release_throttle(o);
utime_t lat = ceph_clock_now(g_ceph_context);
lat -= o->start;
+ logger->tinc(l_os_apply_lat, lat);
if (o->onreadable_sync) {
o->onreadable_sync->complete(0);
Sequencer default_osr;
deque<OpSequencer*> op_queue;
uint64_t op_queue_len, op_queue_bytes;
+ Cond op_throttle_cond;
+ Mutex op_throttle_lock;
Finisher op_finisher;
ThreadPool op_tp;
}
} op_wq;
- void _do_op(OpSequencer *osr, ThreadPool::TPHandle &handle);
- void _finish_op(OpSequencer *osr);
Op *build_op(list<Transaction*>& tls, Context *ondisk, Context *onreadable,
Context *onreadable_sync, TrackedOpRef osd_op);
void queue_op(OpSequencer *osr, Op *o);
+ void op_queue_reserve_throttle(Op *o, ThreadPool::TPHandle *handle = NULL);
+ void _do_op(OpSequencer *osr, ThreadPool::TPHandle &handle);
+ void op_queue_release_throttle(Op *o);
+ void _finish_op(OpSequencer *osr);
PerfCounters *logger;
const std::set <std::string> &changed);
std::string m_osd_rollback_to_cluster_snap;
+ int m_keyvaluestore_queue_max_ops;
+ int m_keyvaluestore_queue_max_bytes;
bool m_fail_eio;
int do_update;