]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Move perf counter and add op queue reserve throttle
authorHaomai Wang <haomaiwang@gmail.com>
Tue, 18 Feb 2014 14:46:22 +0000 (22:46 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Sat, 22 Feb 2014 13:04:59 +0000 (21:04 +0800)
The perf counter of FileStore can shared with other ObjectStore backend, so move
it to ObjectStore and adjust position to let other KeyValueStore refer to.

Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/common/config_opts.h
src/os/FileStore.h
src/os/KeyValueStore.cc
src/os/KeyValueStore.h
src/os/ObjectStore.h

index 28f4acf07e45ff649ff018bfba89fb2d89279bad..a9ea73d8a0c5f4f4ff5516254b1c77ca4c02d79c 100644 (file)
@@ -650,6 +650,9 @@ OPTION(journal_dio, OPT_BOOL, true)
 OPTION(journal_aio, OPT_BOOL, true)
 OPTION(journal_force_aio, OPT_BOOL, false)
 
+OPTION(keyvaluestore_queue_max_ops, OPT_INT, 50)
+OPTION(keyvaluestore_queue_max_bytes, OPT_INT, 100 << 20)
+
 // max bytes to search ahead in journal searching for corruption
 OPTION(journal_max_corrupt_search, OPT_U64, 10<<20)
 OPTION(journal_block_align, OPT_BOOL, true)
index d30eeba74e0e544307c65b28874eff8b543aa536..9ee9999a1c37a7db02e1aefae87a82613eb98237 100644 (file)
@@ -63,33 +63,6 @@ static const __SWORD_TYPE XFS_SUPER_MAGIC(0x58465342);
 static const __SWORD_TYPE ZFS_SUPER_MAGIC(0x2fc12fc1);
 #endif
 
-enum {
-  l_os_first = 84000,
-  l_os_jq_max_ops,
-  l_os_jq_ops,
-  l_os_j_ops,
-  l_os_jq_max_bytes,
-  l_os_jq_bytes,
-  l_os_j_bytes,
-  l_os_j_lat,
-  l_os_j_wr,
-  l_os_j_wr_bytes,
-  l_os_oq_max_ops,
-  l_os_oq_ops,
-  l_os_ops,
-  l_os_oq_max_bytes,
-  l_os_oq_bytes,
-  l_os_bytes,
-  l_os_apply_lat,
-  l_os_committing,
-  l_os_commit,
-  l_os_commit_len,
-  l_os_commit_lat,
-  l_os_j_full,
-  l_os_queue_lat,
-  l_os_last,
-};
-
 
 enum fs_types {
   FS_TYPE_NONE = 0,
index 02fa3ee1f618d3eb1e80c0bd583ad0a7e9e1055a..b77e7fc6fbc39631b1fe73b50901ea330cd0271d 100644 (file)
@@ -500,6 +500,7 @@ KeyValueStore::KeyValueStore(const std::string &base,
   lock("KeyValueStore::lock"),
   default_osr("default"),
   op_queue_len(0), op_queue_bytes(0),
+  op_throttle_lock("KeyValueStore::op_throttle_lock"),
   op_finisher(g_ceph_context),
   op_tp(g_ceph_context, "KeyValueStore::op_tp",
         g_conf->filestore_op_threads, "keyvaluestore_op_threads"),
@@ -507,6 +508,8 @@ KeyValueStore::KeyValueStore(const std::string &base,
         g_conf->filestore_op_thread_suicide_timeout, &op_tp),
   logger(NULL),
   read_error_lock("KeyValueStore::read_error_lock"),
+  m_keyvaluestore_queue_max_ops(g_conf->keyvaluestore_queue_max_ops),
+  m_keyvaluestore_queue_max_bytes(g_conf->keyvaluestore_queue_max_bytes),
   m_fail_eio(g_conf->filestore_fail_eio),
   do_update(do_update)
 {
@@ -519,7 +522,17 @@ KeyValueStore::KeyValueStore(const std::string &base,
   current_op_seq_fn = sss.str();
 
   // initialize logger
-  PerfCountersBuilder plb(g_ceph_context, internal_name, 0, 1);
+  PerfCountersBuilder plb(g_ceph_context, internal_name, l_os_commit_lat, l_os_last);
+
+  plb.add_u64(l_os_oq_max_ops, "op_queue_max_ops");
+  plb.add_u64(l_os_oq_ops, "op_queue_ops");
+  plb.add_u64_counter(l_os_ops, "ops");
+  plb.add_u64(l_os_oq_max_bytes, "op_queue_max_bytes");
+  plb.add_u64(l_os_oq_bytes, "op_queue_bytes");
+  plb.add_u64_counter(l_os_bytes, "bytes");
+  plb.add_time_avg(l_os_apply_lat, "apply_latency");
+  plb.add_time_avg(l_os_queue_lat, "queue_transaction_latency_avg");
+
   logger = plb.create_perf_counters();
 
   g_ceph_context->get_perfcounters_collection()->add(logger);
@@ -951,6 +964,7 @@ int KeyValueStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
   }
 
   Op *o = build_op(tls, ondisk, onreadable, onreadable_sync, osd_op);
+  op_queue_reserve_throttle(o, handle);
   uint64_t op = submit_manager.op_submit_start();
   o->op = op;
   dout(5) << "queue_transactions (trailing journal) " << op << " "
@@ -997,12 +1011,62 @@ void KeyValueStore::queue_op(OpSequencer *osr, Op *o)
 
   osr->queue(o);
 
+  logger->inc(l_os_ops);
+  logger->inc(l_os_bytes, o->bytes);
+
   dout(5) << "queue_op " << o << " seq " << o->op << " " << *osr << " "
           << o->bytes << " bytes" << "   (queue has " << op_queue_len
           << " ops and " << op_queue_bytes << " bytes)" << dendl;
   op_wq.queue(osr);
 }
 
+void KeyValueStore::op_queue_reserve_throttle(Op *o, ThreadPool::TPHandle *handle)
+{
+  uint64_t max_ops = m_keyvaluestore_queue_max_ops;
+  uint64_t max_bytes = m_keyvaluestore_queue_max_bytes;
+
+  logger->set(l_os_oq_max_ops, max_ops);
+  logger->set(l_os_oq_max_bytes, max_bytes);
+
+  utime_t start = ceph_clock_now(g_ceph_context);
+  {
+    Mutex::Locker l(op_throttle_lock);
+    while ((max_ops && (op_queue_len + 1) > max_ops) ||
+           (max_bytes && op_queue_bytes      // let single large ops through!
+           && (op_queue_bytes + o->bytes) > max_bytes)) {
+      dout(2) << "waiting " << op_queue_len + 1 << " > " << max_ops
+              << " ops || " << op_queue_bytes + o->bytes << " > " << max_bytes
+              << dendl;
+      if (handle)
+        handle->suspend_tp_timeout();
+      op_throttle_cond.Wait(op_throttle_lock);
+      if (handle)
+        handle->reset_tp_timeout();
+    }
+
+    op_queue_len++;
+    op_queue_bytes += o->bytes;
+  }
+  utime_t end = ceph_clock_now(g_ceph_context);
+  logger->tinc(l_os_queue_lat, end - start);
+
+  logger->set(l_os_oq_ops, op_queue_len);
+  logger->set(l_os_oq_bytes, op_queue_bytes);
+}
+
+void KeyValueStore::op_queue_release_throttle(Op *o)
+{
+  {
+    Mutex::Locker l(op_throttle_lock);
+    op_queue_len--;
+    op_queue_bytes -= o->bytes;
+    op_throttle_cond.Signal();
+  }
+
+  logger->set(l_os_oq_ops, op_queue_len);
+  logger->set(l_os_oq_bytes, op_queue_bytes);
+}
+
 void KeyValueStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle)
 {
   // inject a stall?
@@ -1040,9 +1104,11 @@ void KeyValueStore::_finish_op(OpSequencer *osr)
 
   dout(10) << "_finish_op " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << dendl;
   osr->apply_lock.Unlock();  // locked in _do_op
+  op_queue_release_throttle(o);
 
   utime_t lat = ceph_clock_now(g_ceph_context);
   lat -= o->start;
+  logger->tinc(l_os_apply_lat, lat);
 
   if (o->onreadable_sync) {
     o->onreadable_sync->complete(0);
index 27f0f953562cf294fc1213e4b7f4b3a8adea4cca..c22185ffd244975bc38d7a3b424a6ad006e9d146 100644 (file)
@@ -302,6 +302,8 @@ class KeyValueStore : public ObjectStore,
   Sequencer default_osr;
   deque<OpSequencer*> op_queue;
   uint64_t op_queue_len, op_queue_bytes;
+  Cond op_throttle_cond;
+  Mutex op_throttle_lock;
   Finisher op_finisher;
 
   ThreadPool op_tp;
@@ -341,11 +343,13 @@ class KeyValueStore : public ObjectStore,
     }
   } op_wq;
 
-  void _do_op(OpSequencer *osr, ThreadPool::TPHandle &handle);
-  void _finish_op(OpSequencer *osr);
   Op *build_op(list<Transaction*>& tls, Context *ondisk, Context *onreadable,
                Context *onreadable_sync, TrackedOpRef osd_op);
   void queue_op(OpSequencer *osr, Op *o);
+  void op_queue_reserve_throttle(Op *o, ThreadPool::TPHandle *handle = NULL);
+  void _do_op(OpSequencer *osr, ThreadPool::TPHandle &handle);
+  void op_queue_release_throttle(Op *o);
+  void _finish_op(OpSequencer *osr);
 
   PerfCounters *logger;
 
@@ -550,6 +554,8 @@ class KeyValueStore : public ObjectStore,
                                   const std::set <std::string> &changed);
 
   std::string m_osd_rollback_to_cluster_snap;
+  int m_keyvaluestore_queue_max_ops;
+  int m_keyvaluestore_queue_max_bytes;
   bool m_fail_eio;
 
   int do_update;
index 1b75ecb265012c986a35cc3f18a509763bfb2d7e..7216310da1047d93eccc923a4d29998c06470b8d 100644 (file)
@@ -41,6 +41,34 @@ namespace ceph {
   class Formatter;
 }
 
+enum {
+  l_os_first = 84000,
+  l_os_jq_max_ops,
+  l_os_jq_ops,
+  l_os_j_ops,
+  l_os_jq_max_bytes,
+  l_os_jq_bytes,
+  l_os_j_bytes,
+  l_os_j_lat,
+  l_os_j_wr,
+  l_os_j_wr_bytes,
+  l_os_j_full,
+  l_os_committing,
+  l_os_commit,
+  l_os_commit_len,
+  l_os_commit_lat,
+  l_os_oq_max_ops,
+  l_os_oq_ops,
+  l_os_ops,
+  l_os_oq_max_bytes,
+  l_os_oq_bytes,
+  l_os_bytes,
+  l_os_apply_lat,
+  l_os_queue_lat,
+  l_os_last,
+};
+
+
 /*
  * low-level interface to the local OSD file system
  */