]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Journal: replace the journal throttle with fullness backoff throttle
authorSamuel Just <sjust@redhat.com>
Thu, 11 Feb 2016 23:03:49 +0000 (15:03 -0800)
committerSamuel Just <sjust@redhat.com>
Thu, 25 Feb 2016 19:11:46 +0000 (11:11 -0800)
The existing FileJournal::throttle_(ops|bytes) throttles overlap with
the FileStore op queue throttles.  It doesn't seem important whether
pending ops are waiting on the journal or the backing fs, so the
FileJournal ones are out.  Instead, there is now a throttle which
is taken in queue_transactions and released in committed_thru
(after sync) which reflects the current fullness of the journal
and gradually delays ops as the journal fills up.  The intention is
to smooth out workloads on small journals.

Signed-off-by: Samuel Just <sjust@redhat.com>
src/CMakeLists.txt
src/common/config_opts.h
src/os/Makefile.am
src/os/ObjectStore.h
src/os/filestore/FileJournal.cc
src/os/filestore/FileJournal.h
src/os/filestore/FileStore.cc
src/os/filestore/Journal.h
src/os/filestore/JournalThrottle.cc [new file with mode: 0644]
src/os/filestore/JournalThrottle.h [new file with mode: 0644]

index 73d927fe1746d2e1c87d6c421c5ac648f3b1fa3a..bddad655017813d848841d6d3f64f5efb0ac74a0 100644 (file)
@@ -634,6 +634,7 @@ set(libos_srcs
   os/filestore/DBObjectMap.cc
   os/filestore/FileJournal.cc
   os/filestore/FileStore.cc
+  os/filestore/JournalThrottle.cc
   os/filestore/GenericFileStoreBackend.cc
   os/filestore/JournalingObjectStore.cc
   os/filestore/HashIndex.cc
index 4fdcd578e2207936a306939b14ff4272554b09ab..1b19ccfd5f8a4d58e45e3a71c82abf1bc0a91eaa 100644 (file)
@@ -1066,8 +1066,16 @@ OPTION(journal_block_align, OPT_BOOL, true)
 OPTION(journal_write_header_frequency, OPT_U64, 0)
 OPTION(journal_max_write_bytes, OPT_INT, 10 << 20)
 OPTION(journal_max_write_entries, OPT_INT, 100)
-OPTION(journal_queue_max_ops, OPT_INT, 300)
-OPTION(journal_queue_max_bytes, OPT_INT, 32 << 20)
+
+/// Target range for journal fullness
+OPTION(journal_throttle_low_threshhold, OPT_DOUBLE, 0.5)
+OPTION(journal_throttle_high_threshhold, OPT_DOUBLE, 0.8)
+
+/// Multiple over expected at high_threshhold (probably don't need to change)
+OPTION(journal_throttle_high_multiple, OPT_DOUBLE, 2)
+/// Multiple over expected at max (probably don't need to change)
+OPTION(journal_throttle_max_multiple, OPT_DOUBLE, 10)
+
 OPTION(journal_align_min_size, OPT_INT, 64 << 10)  // align data payloads >= this.
 OPTION(journal_replay_from, OPT_INT, 0)
 OPTION(journal_zero_on_create, OPT_BOOL, false)
index f5467ec7f221fa7f055a5cd50254007d223156ae..52bd2a52f288c7ada76e33e7f72104a11effff64 100644 (file)
@@ -17,6 +17,7 @@ libos_a_SOURCES = \
        os/filestore/DBObjectMap.cc \
        os/filestore/FileJournal.cc \
        os/filestore/FileStore.cc \
+       os/filestore/JournalThrottle.cc \
        os/filestore/GenericFileStoreBackend.cc \
        os/filestore/HashIndex.cc \
        os/filestore/IndexManager.cc \
@@ -75,6 +76,7 @@ noinst_HEADERS += \
        os/filestore/DBObjectMap.h \
        os/filestore/FileJournal.h \
        os/filestore/FileStore.h \
+       os/filestore/JournalThrottle.h \
        os/filestore/FDCache.h \
        os/filestore/GenericFileStoreBackend.h \
        os/filestore/HashIndex.h \
index b0b952452ae8bcbcd4fe5d4417708b58e0177eca..55350c01fd612eb0831d2dff35707260846c9cb0 100644 (file)
@@ -47,11 +47,9 @@ namespace ceph {
 
 enum {
   l_os_first = 84000,
-  l_os_jq_max_ops,
   l_os_jq_ops,
-  l_os_j_ops,
-  l_os_jq_max_bytes,
   l_os_jq_bytes,
+  l_os_j_ops,
   l_os_j_bytes,
   l_os_j_lat,
   l_os_j_wr,
index 9ddcce9921a16933f6ed188d1a1ba77f06d269a7..c464a373deb8d6a8025ec76a10555136183287ad 100644 (file)
@@ -912,7 +912,7 @@ int FileJournal::prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_
           // throw out what we have so far
           full_state = FULL_FULL;
           while (!writeq_empty()) {
-            put_throttle(1, peek_write().orig_len);
+            complete_write(1, peek_write().orig_len);
             pop_write();
           }
           print_header(header);
@@ -1307,7 +1307,7 @@ void FileJournal::write_thread_entry()
       if (write_stop) {
        dout(20) << "write_thread_entry full and stopping, throw out queue and finish up" << dendl;
        while (!writeq_empty()) {
-         put_throttle(1, peek_write().orig_len);
+         complete_write(1, peek_write().orig_len);
          pop_write();
        }
        print_header(header);
@@ -1334,7 +1334,7 @@ void FileJournal::write_thread_entry()
 #else
     do_write(bl);
 #endif
-    put_throttle(orig_ops, orig_bytes);
+    complete_write(orig_ops, orig_bytes);
   }
 
   dout(10) << "write_thread_entry finish" << dendl;
@@ -1655,15 +1655,17 @@ void FileJournal::submit_entry(uint64_t seq, bufferlist& e, uint32_t orig_len,
          << " (" << oncommit << ")" << dendl;
   assert(e.length() > 0);
 
-  throttle_ops.take(1);
-  throttle_bytes.take(orig_len);
   if (osd_op)
     osd_op->mark_event("commit_queued_for_journal_write");
   if (logger) {
-    logger->set(l_os_jq_max_ops, throttle_ops.get_max());
-    logger->set(l_os_jq_max_bytes, throttle_bytes.get_max());
-    logger->set(l_os_jq_ops, throttle_ops.get_current());
-    logger->set(l_os_jq_bytes, throttle_bytes.get_current());
+    logger->inc(l_os_jq_bytes, orig_len);
+    logger->inc(l_os_jq_ops, 1);
+  }
+
+  throttle.register_throttle_seq(seq, e.length());
+  if (logger) {
+    logger->inc(l_os_j_ops, 1);
+    logger->inc(l_os_j_bytes, e.length());
   }
 
   {
@@ -1705,19 +1707,37 @@ void FileJournal::pop_write()
 {
   assert(write_lock.is_locked());
   Mutex::Locker locker(writeq_lock);
+  if (logger) {
+    logger->dec(l_os_jq_bytes, writeq.front().orig_len);
+    logger->dec(l_os_jq_ops, 1);
+  }
   writeq.pop_front();
 }
 
 void FileJournal::batch_pop_write(list<write_item> &items)
 {
   assert(write_lock.is_locked());
-  Mutex::Locker locker(writeq_lock);
-  writeq.swap(items);
+  {
+    Mutex::Locker locker(writeq_lock);
+    writeq.swap(items);
+  }
+  for (auto &&i : items) {
+    if (logger) {
+      logger->dec(l_os_jq_bytes, i.orig_len);
+      logger->dec(l_os_jq_ops, 1);
+    }
+  }
 }
 
 void FileJournal::batch_unpop_write(list<write_item> &items)
 {
   assert(write_lock.is_locked());
+  for (auto &&i : items) {
+    if (logger) {
+      logger->inc(l_os_jq_bytes, i.orig_len);
+      logger->inc(l_os_jq_ops, 1);
+    }
+  }
   Mutex::Locker locker(writeq_lock);
   writeq.splice(writeq.begin(), items);
 }
@@ -1775,6 +1795,12 @@ void FileJournal::committed_thru(uint64_t seq)
 {
   Mutex::Locker locker(write_lock);
 
+  auto released = throttle.flush(seq);
+  if (logger) {
+    logger->dec(l_os_j_ops, released.first);
+    logger->dec(l_os_j_bytes, released.second);
+  }
+
   if (seq < last_committed_seq) {
     dout(5) << "committed_thru " << seq << " < last_committed_seq " << last_committed_seq << dendl;
     assert(seq >= last_committed_seq);
@@ -1831,7 +1857,7 @@ void FileJournal::committed_thru(uint64_t seq)
     dout(15) << " dropping committed but unwritten seq " << peek_write().seq
             << " len " << peek_write().bl.length()
             << dendl;
-    put_throttle(1, peek_write().orig_len);
+    complete_write(1, peek_write().orig_len);
     pop_write();
   }
 
@@ -1841,29 +1867,20 @@ void FileJournal::committed_thru(uint64_t seq)
 }
 
 
-void FileJournal::put_throttle(uint64_t ops, uint64_t bytes)
+void FileJournal::complete_write(uint64_t ops, uint64_t bytes)
 {
-  uint64_t new_ops = throttle_ops.put(ops);
-  uint64_t new_bytes = throttle_bytes.put(bytes);
-  dout(5) << "put_throttle finished " << ops << " ops and "
-          << bytes << " bytes, now "
-          << new_ops << " ops and " << new_bytes << " bytes"
-          << dendl;
-
-  if (logger) {
-    logger->inc(l_os_j_ops, ops);
-    logger->inc(l_os_j_bytes, bytes);
-    logger->set(l_os_jq_ops, new_ops);
-    logger->set(l_os_jq_bytes, new_bytes);
-    logger->set(l_os_jq_max_ops, throttle_ops.get_max());
-    logger->set(l_os_jq_max_bytes, throttle_bytes.get_max());
-  }
+  dout(5) << __func__ << " finished " << ops << " ops and "
+         << bytes << " bytes" << dendl;
 }
 
 int FileJournal::make_writeable()
 {
   dout(10) << __func__ << dendl;
-  int r = _open(true);
+  int r = set_throttle_params();
+  if (r < 0)
+    return r;
+
+  r = _open(true);
   if (r < 0)
     return r;
 
@@ -1874,10 +1891,43 @@ int FileJournal::make_writeable()
   read_pos = 0;
 
   must_write_header = true;
+
   start_writer();
   return 0;
 }
 
+int FileJournal::set_throttle_params()
+{
+  stringstream ss;
+  bool valid = throttle.set_params(
+    g_conf->journal_throttle_low_threshhold,
+    g_conf->journal_throttle_high_threshhold,
+    g_conf->filestore_expected_throughput_bytes,
+    g_conf->journal_throttle_high_multiple,
+    g_conf->journal_throttle_max_multiple,
+    header.max_size - get_top(),
+    &ss);
+
+  if (!valid) {
+    derr << "tried to set invalid params: "
+        << ss.str()
+        << dendl;
+  }
+  return valid ? 0 : -EINVAL;
+}
+
+const char** FileJournal::get_tracked_conf_keys() const
+{
+  static const char *KEYS[] = {
+    "journal_throttle_low_threshhold",
+    "journal_throttle_high_threshhold",
+    "journal_throttle_high_multiple",
+    "journal_throttle_max_multiple",
+    "filestore_expected_throughput_bytes",
+    NULL};
+  return KEYS;
+}
+
 void FileJournal::wrap_read_bl(
   off64_t pos,
   int64_t olen,
@@ -1940,6 +1990,16 @@ bool FileJournal::read_entry(
     &ss);
   if (result == SUCCESS) {
     journalq.push_back( pair<uint64_t,off64_t>(seq, pos));
+    uint64_t amount_to_take =
+      next_pos > pos ?
+      next_pos - pos :
+      (header.max_size - pos) + (next_pos - get_top());
+    throttle.take(amount_to_take);
+    throttle.register_throttle_seq(next_seq, amount_to_take);
+    if (logger) {
+      logger->inc(l_os_j_ops, 1);
+      logger->inc(l_os_j_bytes, amount_to_take);
+    }
     if (next_seq > seq) {
       return false;
     } else {
@@ -2057,12 +2117,9 @@ FileJournal::read_entry_result FileJournal::do_read_entry(
   return SUCCESS;
 }
 
-void FileJournal::throttle()
+void FileJournal::reserve_throttle_and_backoff(uint64_t count)
 {
-  if (throttle_ops.wait(g_conf->journal_queue_max_ops))
-    dout(2) << "throttle: waited for ops" << dendl;
-  if (throttle_bytes.wait(g_conf->journal_queue_max_bytes))
-    dout(2) << "throttle: waited for bytes" << dendl;
+  throttle.get(count);
 }
 
 void FileJournal::get_header(
index 3c6742a420330e4b3a0523608bdc960697b7556d..0c50e89133cc2eb2febecb6990e298d93c4cda27 100644 (file)
@@ -24,6 +24,8 @@ using std::deque;
 #include "common/Mutex.h"
 #include "common/Thread.h"
 #include "common/Throttle.h"
+#include "JournalThrottle.h"
+
 
 #ifdef HAVE_LIBAIO
 # include <libaio.h>
@@ -34,7 +36,9 @@ using std::deque;
  *
  * Lock ordering is write_lock > aio_lock > (completions_lock | finisher_lock)
  */
-class FileJournal : public Journal {
+class FileJournal :
+  public Journal,
+  public md_config_obs_t {
 public:
   /// Protected by finisher_lock
   struct completion_item {
@@ -295,9 +299,23 @@ private:
 
 
   // throttle
-  Throttle throttle_ops, throttle_bytes;
+  int set_throttle_params();
+  const char** get_tracked_conf_keys() const override;
+  void handle_conf_change(
+    const struct md_config_t *conf,
+    const std::set <std::string> &changed) override {
+    for (const char **i = get_tracked_conf_keys();
+        *i;
+        ++i) {
+      if (changed.count(string(*i))) {
+       set_throttle_params();
+       return;
+      }
+    }
+  }
 
-  void put_throttle(uint64_t ops, uint64_t bytes);
+  void complete_write(uint64_t ops, uint64_t bytes);
+  JournalThrottle throttle;
 
   // write thread
   Mutex write_lock;
@@ -398,8 +416,7 @@ private:
     full_state(FULL_NOTFULL),
     fd(-1),
     writing_seq(0),
-    throttle_ops(g_ceph_context, "journal_ops", g_conf->journal_queue_max_ops),
-    throttle_bytes(g_ceph_context, "journal_bytes", g_conf->journal_queue_max_bytes),
+    throttle(g_conf->filestore_caller_concurrency),
     write_lock("FileJournal::write_lock", false, true, false, g_ceph_context),
     write_stop(true),
     aio_stop(true),
@@ -416,10 +433,13 @@ private:
         aio = false;
       }
 #endif
+
+      g_conf->add_observer(this);
   }
   ~FileJournal() {
     assert(fd == -1);
     delete[] zero_buf;
+    g_conf->remove_observer(this);
   }
 
   int check();
@@ -434,7 +454,7 @@ private:
 
   void flush();
 
-  void throttle();
+  void reserve_throttle_and_backoff(uint64_t count);
 
   bool is_writeable() {
     return read_pos == 0;
index 4251cae213ba852aa00df07494d3fe10dedb78ab..28ea4b3b46c5e31ae98649f709c1f07014432418 100644 (file)
@@ -581,10 +581,8 @@ FileStore::FileStore(const std::string &base, const std::string &jdev, osflagbit
   // initialize logger
   PerfCountersBuilder plb(g_ceph_context, internal_name, l_os_first, l_os_last);
 
-  plb.add_u64(l_os_jq_max_ops, "journal_queue_max_ops", "Max operations in journal queue");
   plb.add_u64(l_os_jq_ops, "journal_queue_ops", "Operations in journal queue");
   plb.add_u64_counter(l_os_j_ops, "journal_ops", "Total journal entries written");
-  plb.add_u64(l_os_jq_max_bytes, "journal_queue_max_bytes", "Max data in journal queue");
   plb.add_u64(l_os_jq_bytes, "journal_queue_bytes", "Size of journal queue");
   plb.add_u64_counter(l_os_j_bytes, "journal_bytes", "Total operations size in journal");
   plb.add_time_avg(l_os_j_lat, "journal_latency", "Average journal queue completing latency");
@@ -1943,18 +1941,20 @@ int FileStore::queue_transactions(Sequencer *posr, vector<Transaction>& tls,
 
   if (journal && journal->is_writeable() && !m_filestore_journal_trailing) {
     Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);
+
+    //prepare and encode transactions data out of lock
+    bufferlist tbl;
+    int orig_len = journal->prepare_entry(o->tls, &tbl);
+
     if (handle)
       handle->suspend_tp_timeout();
 
     op_queue_reserve_throttle(o);
+    journal->reserve_throttle_and_backoff(tbl.length());
 
     if (handle)
       handle->reset_tp_timeout();
 
-    journal->throttle();
-    //prepare and encode transactions data out of lock
-    bufferlist tbl;
-    int orig_len = journal->prepare_entry(o->tls, &tbl);
     uint64_t op_num = submit_manager.op_submit_start();
     o->op = op_num;
 
index 236e4315a35ca2d73f37734758a9abfbc1b052ed..ca30da4794feb0748b14f1c9d9ff133e3a4c2a9e 100644 (file)
@@ -49,7 +49,14 @@ public:
   virtual void close() = 0;  ///< close an open journal
 
   virtual void flush() = 0;
-  virtual void throttle() = 0;
+
+  /**
+   * reserve_throttle_and_backoff
+   *
+   * Implementation may throttle or backoff based on ops
+   * reserved here but not yet released using committed_thru.
+   */
+  virtual void reserve_throttle_and_backoff(uint64_t count) = 0;
 
   virtual int dump(ostream& out) { return -EOPNOTSUPP; }
 
diff --git a/src/os/filestore/JournalThrottle.cc b/src/os/filestore/JournalThrottle.cc
new file mode 100644 (file)
index 0000000..4a100c6
--- /dev/null
@@ -0,0 +1,67 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "JournalThrottle.h"
+#include "include/assert.h"
+
+bool JournalThrottle::set_params(
+  double _low_threshhold,
+  double _high_threshhold,
+  double _expected_throughput,
+  double _high_multiple,
+  double _max_multiple,
+  uint64_t _throttle_max,
+  std::ostream *errstream)
+{
+  return throttle.set_params(
+    _low_threshhold,
+    _high_threshhold,
+    _expected_throughput,
+    _high_multiple,
+    _max_multiple,
+    _throttle_max,
+    errstream);
+}
+
+std::chrono::duration<double> JournalThrottle::get(uint64_t c)
+{
+  return throttle.get(c);
+}
+
+uint64_t JournalThrottle::take(uint64_t c)
+{
+  return throttle.take(c);
+}
+
+void JournalThrottle::register_throttle_seq(uint64_t seq, uint64_t c)
+{
+  locker l(lock);
+  journaled_ops.push_back(std::make_pair(seq, c));
+}
+
+std::pair<uint64_t, uint64_t> JournalThrottle::flush(uint64_t mono_id)
+{
+  uint64_t to_put_bytes = 0;
+  uint64_t to_put_ops = 0;
+  {
+    locker l(lock);
+    while (!journaled_ops.empty() &&
+          journaled_ops.front().first <= mono_id) {
+      to_put_bytes += journaled_ops.front().second;
+      to_put_ops++;
+      journaled_ops.pop_front();
+    }
+  }
+  throttle.put(to_put_bytes);
+  return make_pair(to_put_ops, to_put_bytes);
+}
+
+uint64_t JournalThrottle::get_current()
+{
+  return throttle.get_current();
+}
+
+uint64_t JournalThrottle::get_max()
+{
+  return throttle.get_max();
+}
diff --git a/src/os/filestore/JournalThrottle.h b/src/os/filestore/JournalThrottle.h
new file mode 100644 (file)
index 0000000..8a7ce72
--- /dev/null
@@ -0,0 +1,101 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_THROTTLE_H
+#define CEPH_JOURNAL_THROTTLE_H
+
+#include "common/Throttle.h"
+
+#include <list>
+#include <deque>
+#include <condition_variable>
+#include <thread>
+#include <vector>
+#include <chrono>
+#include <iostream>
+
+/**
+ * JournalThrottle
+ *
+ * Throttle designed to implement dynamic throttling as the journal fills
+ * up.  The goal is to not delay ops at all when the journal is relatively
+ * empty, delay ops somewhat as the journal begins to fill (with the delay
+ * getting linearly longer as the journal fills up to a high water mark),
+ * and to delay much more aggressively (though still linearly with usage)
+ * until we hit the max value.
+ *
+ * The implementation simply wraps BackoffThrottle with a queue of
+ * journaled but not synced ops.
+ *
+ * The usage pattern is as follows:
+ * 1) Call get(bytes), then register_throttle_seq(seq, bytes) for the op
+ * 2) Once the journal is flushed, flush(max_op_id_flushed)
+ */
+class JournalThrottle {
+  BackoffThrottle throttle;
+
+  std::mutex lock;
+  /// deque<id, count>
+  std::deque<std::pair<uint64_t, uint64_t> > journaled_ops;
+  using locker = std::unique_lock<std::mutex>;
+
+public:
+  /**
+   * set_params
+   *
+   * Sets params.  If the params are invalid, returns false
+   * and populates errstream (if non-null) with a user comprehensible
+   * explanation.
+   */
+  bool set_params(
+    double low_threshhold,
+    double high_threshhold,
+    double expected_throughput,
+    double high_multiple,
+    double max_multiple,
+    uint64_t throttle_max,
+    std::ostream *errstream);
+
+  /**
+   * Gets the specified amount of throttle, waiting as necessary
+   *
+   * @param c [in] amount to take
+   * @return duration waited
+   */
+  std::chrono::duration<double> get(uint64_t c);
+
+  /**
+   * take
+   *
+   * Takes specified throttle without waiting
+   */
+  uint64_t take(uint64_t c);
+
+  /**
+   * register_throttle_seq
+   *
+   * Registers a sequence number with an amount of throttle to
+   * release upon flush()
+   *
+   * @param seq [in] seq
+   */
+  void register_throttle_seq(uint64_t seq, uint64_t c);
+
+
+  /**
+   * Releases throttle held by ids <= mono_id
+   *
+   * @param mono_id [in] id up to which to flush
+   * @returns pair<ops_flushed, bytes_flushed>
+   */
+  std::pair<uint64_t, uint64_t> flush(uint64_t mono_id);
+
+  uint64_t get_current();
+  uint64_t get_max();
+
+  JournalThrottle(
+    unsigned expected_concurrency ///< [in] determines size of conds
+    ) : throttle(expected_concurrency) {}
+};
+
+#endif