From: Haomai Wang Date: Tue, 18 Feb 2014 14:46:22 +0000 (+0800) Subject: Move perf counter and add op queue reserve throttle X-Git-Tag: v0.78~130^2~19 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=2b9e893a31ac56a95a4acb70b120d80df41ab535;p=ceph.git Move perf counter and add op queue reserve throttle The perf counter of FileStore can shared with other ObjectStore backend, so move it to ObjectStore and adjust position to let other KeyValueStore refer to. Signed-off-by: Haomai Wang --- diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 28f4acf07e4..a9ea73d8a0c 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -650,6 +650,9 @@ OPTION(journal_dio, OPT_BOOL, true) OPTION(journal_aio, OPT_BOOL, true) OPTION(journal_force_aio, OPT_BOOL, false) +OPTION(keyvaluestore_queue_max_ops, OPT_INT, 50) +OPTION(keyvaluestore_queue_max_bytes, OPT_INT, 100 << 20) + // max bytes to search ahead in journal searching for corruption OPTION(journal_max_corrupt_search, OPT_U64, 10<<20) OPTION(journal_block_align, OPT_BOOL, true) diff --git a/src/os/FileStore.h b/src/os/FileStore.h index d30eeba74e0..9ee9999a1c3 100644 --- a/src/os/FileStore.h +++ b/src/os/FileStore.h @@ -63,33 +63,6 @@ static const __SWORD_TYPE XFS_SUPER_MAGIC(0x58465342); static const __SWORD_TYPE ZFS_SUPER_MAGIC(0x2fc12fc1); #endif -enum { - l_os_first = 84000, - l_os_jq_max_ops, - l_os_jq_ops, - l_os_j_ops, - l_os_jq_max_bytes, - l_os_jq_bytes, - l_os_j_bytes, - l_os_j_lat, - l_os_j_wr, - l_os_j_wr_bytes, - l_os_oq_max_ops, - l_os_oq_ops, - l_os_ops, - l_os_oq_max_bytes, - l_os_oq_bytes, - l_os_bytes, - l_os_apply_lat, - l_os_committing, - l_os_commit, - l_os_commit_len, - l_os_commit_lat, - l_os_j_full, - l_os_queue_lat, - l_os_last, -}; - enum fs_types { FS_TYPE_NONE = 0, diff --git a/src/os/KeyValueStore.cc b/src/os/KeyValueStore.cc index 02fa3ee1f61..b77e7fc6fbc 100644 --- a/src/os/KeyValueStore.cc +++ b/src/os/KeyValueStore.cc @@ -500,6 +500,7 @@ KeyValueStore::KeyValueStore(const std::string &base, lock("KeyValueStore::lock"), default_osr("default"), op_queue_len(0), op_queue_bytes(0), + op_throttle_lock("KeyValueStore::op_throttle_lock"), op_finisher(g_ceph_context), op_tp(g_ceph_context, "KeyValueStore::op_tp", g_conf->filestore_op_threads, "keyvaluestore_op_threads"), @@ -507,6 +508,8 @@ KeyValueStore::KeyValueStore(const std::string &base, g_conf->filestore_op_thread_suicide_timeout, &op_tp), logger(NULL), read_error_lock("KeyValueStore::read_error_lock"), + m_keyvaluestore_queue_max_ops(g_conf->keyvaluestore_queue_max_ops), + m_keyvaluestore_queue_max_bytes(g_conf->keyvaluestore_queue_max_bytes), m_fail_eio(g_conf->filestore_fail_eio), do_update(do_update) { @@ -519,7 +522,17 @@ KeyValueStore::KeyValueStore(const std::string &base, current_op_seq_fn = sss.str(); // initialize logger - PerfCountersBuilder plb(g_ceph_context, internal_name, 0, 1); + PerfCountersBuilder plb(g_ceph_context, internal_name, l_os_commit_lat, l_os_last); + + plb.add_u64(l_os_oq_max_ops, "op_queue_max_ops"); + plb.add_u64(l_os_oq_ops, "op_queue_ops"); + plb.add_u64_counter(l_os_ops, "ops"); + plb.add_u64(l_os_oq_max_bytes, "op_queue_max_bytes"); + plb.add_u64(l_os_oq_bytes, "op_queue_bytes"); + plb.add_u64_counter(l_os_bytes, "bytes"); + plb.add_time_avg(l_os_apply_lat, "apply_latency"); + plb.add_time_avg(l_os_queue_lat, "queue_transaction_latency_avg"); + logger = plb.create_perf_counters(); g_ceph_context->get_perfcounters_collection()->add(logger); @@ -951,6 +964,7 @@ int KeyValueStore::queue_transactions(Sequencer *posr, list &tls, } Op *o = build_op(tls, ondisk, onreadable, onreadable_sync, osd_op); + op_queue_reserve_throttle(o, handle); uint64_t op = submit_manager.op_submit_start(); o->op = op; dout(5) << "queue_transactions (trailing journal) " << op << " " @@ -997,12 +1011,62 @@ void KeyValueStore::queue_op(OpSequencer *osr, Op *o) osr->queue(o); + logger->inc(l_os_ops); + logger->inc(l_os_bytes, o->bytes); + dout(5) << "queue_op " << o << " seq " << o->op << " " << *osr << " " << o->bytes << " bytes" << " (queue has " << op_queue_len << " ops and " << op_queue_bytes << " bytes)" << dendl; op_wq.queue(osr); } +void KeyValueStore::op_queue_reserve_throttle(Op *o, ThreadPool::TPHandle *handle) +{ + uint64_t max_ops = m_keyvaluestore_queue_max_ops; + uint64_t max_bytes = m_keyvaluestore_queue_max_bytes; + + logger->set(l_os_oq_max_ops, max_ops); + logger->set(l_os_oq_max_bytes, max_bytes); + + utime_t start = ceph_clock_now(g_ceph_context); + { + Mutex::Locker l(op_throttle_lock); + while ((max_ops && (op_queue_len + 1) > max_ops) || + (max_bytes && op_queue_bytes // let single large ops through! + && (op_queue_bytes + o->bytes) > max_bytes)) { + dout(2) << "waiting " << op_queue_len + 1 << " > " << max_ops + << " ops || " << op_queue_bytes + o->bytes << " > " << max_bytes + << dendl; + if (handle) + handle->suspend_tp_timeout(); + op_throttle_cond.Wait(op_throttle_lock); + if (handle) + handle->reset_tp_timeout(); + } + + op_queue_len++; + op_queue_bytes += o->bytes; + } + utime_t end = ceph_clock_now(g_ceph_context); + logger->tinc(l_os_queue_lat, end - start); + + logger->set(l_os_oq_ops, op_queue_len); + logger->set(l_os_oq_bytes, op_queue_bytes); +} + +void KeyValueStore::op_queue_release_throttle(Op *o) +{ + { + Mutex::Locker l(op_throttle_lock); + op_queue_len--; + op_queue_bytes -= o->bytes; + op_throttle_cond.Signal(); + } + + logger->set(l_os_oq_ops, op_queue_len); + logger->set(l_os_oq_bytes, op_queue_bytes); +} + void KeyValueStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle) { // inject a stall? @@ -1040,9 +1104,11 @@ void KeyValueStore::_finish_op(OpSequencer *osr) dout(10) << "_finish_op " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << dendl; osr->apply_lock.Unlock(); // locked in _do_op + op_queue_release_throttle(o); utime_t lat = ceph_clock_now(g_ceph_context); lat -= o->start; + logger->tinc(l_os_apply_lat, lat); if (o->onreadable_sync) { o->onreadable_sync->complete(0); diff --git a/src/os/KeyValueStore.h b/src/os/KeyValueStore.h index 27f0f953562..c22185ffd24 100644 --- a/src/os/KeyValueStore.h +++ b/src/os/KeyValueStore.h @@ -302,6 +302,8 @@ class KeyValueStore : public ObjectStore, Sequencer default_osr; deque op_queue; uint64_t op_queue_len, op_queue_bytes; + Cond op_throttle_cond; + Mutex op_throttle_lock; Finisher op_finisher; ThreadPool op_tp; @@ -341,11 +343,13 @@ class KeyValueStore : public ObjectStore, } } op_wq; - void _do_op(OpSequencer *osr, ThreadPool::TPHandle &handle); - void _finish_op(OpSequencer *osr); Op *build_op(list& tls, Context *ondisk, Context *onreadable, Context *onreadable_sync, TrackedOpRef osd_op); void queue_op(OpSequencer *osr, Op *o); + void op_queue_reserve_throttle(Op *o, ThreadPool::TPHandle *handle = NULL); + void _do_op(OpSequencer *osr, ThreadPool::TPHandle &handle); + void op_queue_release_throttle(Op *o); + void _finish_op(OpSequencer *osr); PerfCounters *logger; @@ -550,6 +554,8 @@ class KeyValueStore : public ObjectStore, const std::set &changed); std::string m_osd_rollback_to_cluster_snap; + int m_keyvaluestore_queue_max_ops; + int m_keyvaluestore_queue_max_bytes; bool m_fail_eio; int do_update; diff --git a/src/os/ObjectStore.h b/src/os/ObjectStore.h index 1b75ecb2650..7216310da10 100644 --- a/src/os/ObjectStore.h +++ b/src/os/ObjectStore.h @@ -41,6 +41,34 @@ namespace ceph { class Formatter; } +enum { + l_os_first = 84000, + l_os_jq_max_ops, + l_os_jq_ops, + l_os_j_ops, + l_os_jq_max_bytes, + l_os_jq_bytes, + l_os_j_bytes, + l_os_j_lat, + l_os_j_wr, + l_os_j_wr_bytes, + l_os_j_full, + l_os_committing, + l_os_commit, + l_os_commit_len, + l_os_commit_lat, + l_os_oq_max_ops, + l_os_oq_ops, + l_os_ops, + l_os_oq_max_bytes, + l_os_oq_bytes, + l_os_bytes, + l_os_apply_lat, + l_os_queue_lat, + l_os_last, +}; + + /* * low-level interface to the local OSD file system */