From f54e563c694736defff285091de67b8878df1c7f Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Thu, 11 Feb 2016 15:03:49 -0800 Subject: [PATCH] Journal: replace the journal throttle with fullness backoff throttle The existing FileJournal::throttle_(ops|bytes) throttles overlap with the FileStore op queue throttles. It doesn't seem important whether pending ops are waiting on the journal or the backing fs, so the FileJournal ones are out. Instead, there is now a throttle which is taken in queue_transaction and released in _committed_thru (after sync) which reflects the current fullness of the journal and gradually delays ops as the journal fills up. The intention is to smooth out workloads on small journals. Signed-off-by: Samuel Just --- src/CMakeLists.txt | 1 + src/common/config_opts.h | 12 ++- src/os/Makefile.am | 2 + src/os/ObjectStore.h | 4 +- src/os/filestore/FileJournal.cc | 125 ++++++++++++++++++++-------- src/os/filestore/FileJournal.h | 32 +++++-- src/os/filestore/FileStore.cc | 12 +-- src/os/filestore/Journal.h | 9 +- src/os/filestore/JournalThrottle.cc | 67 +++++++++++++++ src/os/filestore/JournalThrottle.h | 101 ++++++++++++++++++++++ 10 files changed, 313 insertions(+), 52 deletions(-) create mode 100644 src/os/filestore/JournalThrottle.cc create mode 100644 src/os/filestore/JournalThrottle.h diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 73d927fe1746d..bddad65501781 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -634,6 +634,7 @@ set(libos_srcs os/filestore/DBObjectMap.cc os/filestore/FileJournal.cc os/filestore/FileStore.cc + os/filestore/JournalThrottle.cc os/filestore/GenericFileStoreBackend.cc os/filestore/JournalingObjectStore.cc os/filestore/HashIndex.cc diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 4fdcd578e2207..1b19ccfd5f8a4 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -1066,8 +1066,16 @@ OPTION(journal_block_align, OPT_BOOL, true) OPTION(journal_write_header_frequency, OPT_U64, 0) OPTION(journal_max_write_bytes, OPT_INT, 10 << 20) OPTION(journal_max_write_entries, OPT_INT, 100) -OPTION(journal_queue_max_ops, OPT_INT, 300) -OPTION(journal_queue_max_bytes, OPT_INT, 32 << 20) + +/// Target range for journal fullness +OPTION(journal_throttle_low_threshhold, OPT_DOUBLE, 0.5) +OPTION(journal_throttle_high_threshhold, OPT_DOUBLE, 0.8) + +/// Multiple over expected at high_threshhold (probably don't need to change) +OPTION(journal_throttle_high_multiple, OPT_DOUBLE, 2) +/// Multiple over expected at max (probably don't need to change) +OPTION(journal_throttle_max_multiple, OPT_DOUBLE, 10) + OPTION(journal_align_min_size, OPT_INT, 64 << 10) // align data payloads >= this. OPTION(journal_replay_from, OPT_INT, 0) OPTION(journal_zero_on_create, OPT_BOOL, false) diff --git a/src/os/Makefile.am b/src/os/Makefile.am index f5467ec7f221f..52bd2a52f288c 100644 --- a/src/os/Makefile.am +++ b/src/os/Makefile.am @@ -17,6 +17,7 @@ libos_a_SOURCES = \ os/filestore/DBObjectMap.cc \ os/filestore/FileJournal.cc \ os/filestore/FileStore.cc \ + os/filestore/JournalThrottle.cc \ os/filestore/GenericFileStoreBackend.cc \ os/filestore/HashIndex.cc \ os/filestore/IndexManager.cc \ @@ -75,6 +76,7 @@ noinst_HEADERS += \ os/filestore/DBObjectMap.h \ os/filestore/FileJournal.h \ os/filestore/FileStore.h \ + os/filestore/JournalThrottle.h \ os/filestore/FDCache.h \ os/filestore/GenericFileStoreBackend.h \ os/filestore/HashIndex.h \ diff --git a/src/os/ObjectStore.h b/src/os/ObjectStore.h index b0b952452ae8b..55350c01fd612 100644 --- a/src/os/ObjectStore.h +++ b/src/os/ObjectStore.h @@ -47,11 +47,9 @@ namespace ceph { enum { l_os_first = 84000, - l_os_jq_max_ops, l_os_jq_ops, - l_os_j_ops, - l_os_jq_max_bytes, l_os_jq_bytes, + l_os_j_ops, l_os_j_bytes, l_os_j_lat, l_os_j_wr, diff --git a/src/os/filestore/FileJournal.cc b/src/os/filestore/FileJournal.cc index 9ddcce9921a16..c464a373deb8d 100644 --- a/src/os/filestore/FileJournal.cc +++ b/src/os/filestore/FileJournal.cc @@ -912,7 +912,7 @@ int FileJournal::prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_ // throw out what we have so far full_state = FULL_FULL; while (!writeq_empty()) { - put_throttle(1, peek_write().orig_len); + complete_write(1, peek_write().orig_len); pop_write(); } print_header(header); @@ -1307,7 +1307,7 @@ void FileJournal::write_thread_entry() if (write_stop) { dout(20) << "write_thread_entry full and stopping, throw out queue and finish up" << dendl; while (!writeq_empty()) { - put_throttle(1, peek_write().orig_len); + complete_write(1, peek_write().orig_len); pop_write(); } print_header(header); @@ -1334,7 +1334,7 @@ void FileJournal::write_thread_entry() #else do_write(bl); #endif - put_throttle(orig_ops, orig_bytes); + complete_write(orig_ops, orig_bytes); } dout(10) << "write_thread_entry finish" << dendl; @@ -1655,15 +1655,17 @@ void FileJournal::submit_entry(uint64_t seq, bufferlist& e, uint32_t orig_len, << " (" << oncommit << ")" << dendl; assert(e.length() > 0); - throttle_ops.take(1); - throttle_bytes.take(orig_len); if (osd_op) osd_op->mark_event("commit_queued_for_journal_write"); if (logger) { - logger->set(l_os_jq_max_ops, throttle_ops.get_max()); - logger->set(l_os_jq_max_bytes, throttle_bytes.get_max()); - logger->set(l_os_jq_ops, throttle_ops.get_current()); - logger->set(l_os_jq_bytes, throttle_bytes.get_current()); + logger->inc(l_os_jq_bytes, orig_len); + logger->inc(l_os_jq_ops, 1); + } + + throttle.register_throttle_seq(seq, e.length()); + if (logger) { + logger->inc(l_os_j_ops, 1); + logger->inc(l_os_j_bytes, e.length()); } { @@ -1705,19 +1707,37 @@ void FileJournal::pop_write() { assert(write_lock.is_locked()); Mutex::Locker locker(writeq_lock); + if (logger) { + logger->dec(l_os_jq_bytes, writeq.front().orig_len); + logger->dec(l_os_jq_ops, 1); + } writeq.pop_front(); } void FileJournal::batch_pop_write(list &items) { assert(write_lock.is_locked()); - Mutex::Locker locker(writeq_lock); - writeq.swap(items); + { + Mutex::Locker locker(writeq_lock); + writeq.swap(items); + } + for (auto &&i : items) { + if (logger) { + logger->dec(l_os_jq_bytes, i.orig_len); + logger->dec(l_os_jq_ops, 1); + } + } } void FileJournal::batch_unpop_write(list &items) { assert(write_lock.is_locked()); + for (auto &&i : items) { + if (logger) { + logger->inc(l_os_jq_bytes, i.orig_len); + logger->inc(l_os_jq_ops, 1); + } + } Mutex::Locker locker(writeq_lock); writeq.splice(writeq.begin(), items); } @@ -1775,6 +1795,12 @@ void FileJournal::committed_thru(uint64_t seq) { Mutex::Locker locker(write_lock); + auto released = throttle.flush(seq); + if (logger) { + logger->dec(l_os_j_ops, released.first); + logger->dec(l_os_j_bytes, released.second); + } + if (seq < last_committed_seq) { dout(5) << "committed_thru " << seq << " < last_committed_seq " << last_committed_seq << dendl; assert(seq >= last_committed_seq); @@ -1831,7 +1857,7 @@ void FileJournal::committed_thru(uint64_t seq) dout(15) << " dropping committed but unwritten seq " << peek_write().seq << " len " << peek_write().bl.length() << dendl; - put_throttle(1, peek_write().orig_len); + complete_write(1, peek_write().orig_len); pop_write(); } @@ -1841,29 +1867,20 @@ void FileJournal::committed_thru(uint64_t seq) } -void FileJournal::put_throttle(uint64_t ops, uint64_t bytes) +void FileJournal::complete_write(uint64_t ops, uint64_t bytes) { - uint64_t new_ops = throttle_ops.put(ops); - uint64_t new_bytes = throttle_bytes.put(bytes); - dout(5) << "put_throttle finished " << ops << " ops and " - << bytes << " bytes, now " - << new_ops << " ops and " << new_bytes << " bytes" - << dendl; - - if (logger) { - logger->inc(l_os_j_ops, ops); - logger->inc(l_os_j_bytes, bytes); - logger->set(l_os_jq_ops, new_ops); - logger->set(l_os_jq_bytes, new_bytes); - logger->set(l_os_jq_max_ops, throttle_ops.get_max()); - logger->set(l_os_jq_max_bytes, throttle_bytes.get_max()); - } + dout(5) << __func__ << " finished " << ops << " ops and " + << bytes << " bytes" << dendl; } int FileJournal::make_writeable() { dout(10) << __func__ << dendl; - int r = _open(true); + int r = set_throttle_params(); + if (r < 0) + return r; + + r = _open(true); if (r < 0) return r; @@ -1874,10 +1891,43 @@ int FileJournal::make_writeable() read_pos = 0; must_write_header = true; + start_writer(); return 0; } +int FileJournal::set_throttle_params() +{ + stringstream ss; + bool valid = throttle.set_params( + g_conf->journal_throttle_low_threshhold, + g_conf->journal_throttle_high_threshhold, + g_conf->filestore_expected_throughput_bytes, + g_conf->journal_throttle_high_multiple, + g_conf->journal_throttle_max_multiple, + header.max_size - get_top(), + &ss); + + if (!valid) { + derr << "tried to set invalid params: " + << ss.str() + << dendl; + } + return valid ? 0 : -EINVAL; +} + +const char** FileJournal::get_tracked_conf_keys() const +{ + static const char *KEYS[] = { + "journal_throttle_low_threshhold", + "journal_throttle_high_threshhold", + "journal_throttle_high_multiple", + "journal_throttle_max_multiple", + "filestore_expected_throughput_bytes", + NULL}; + return KEYS; +} + void FileJournal::wrap_read_bl( off64_t pos, int64_t olen, @@ -1940,6 +1990,16 @@ bool FileJournal::read_entry( &ss); if (result == SUCCESS) { journalq.push_back( pair(seq, pos)); + uint64_t amount_to_take = + next_pos > pos ? + next_pos - pos : + (header.max_size - pos) + (next_pos - get_top()); + throttle.take(amount_to_take); + throttle.register_throttle_seq(next_seq, amount_to_take); + if (logger) { + logger->inc(l_os_j_ops, 1); + logger->inc(l_os_j_bytes, amount_to_take); + } if (next_seq > seq) { return false; } else { @@ -2057,12 +2117,9 @@ FileJournal::read_entry_result FileJournal::do_read_entry( return SUCCESS; } -void FileJournal::throttle() +void FileJournal::reserve_throttle_and_backoff(uint64_t count) { - if (throttle_ops.wait(g_conf->journal_queue_max_ops)) - dout(2) << "throttle: waited for ops" << dendl; - if (throttle_bytes.wait(g_conf->journal_queue_max_bytes)) - dout(2) << "throttle: waited for bytes" << dendl; + throttle.get(count); } void FileJournal::get_header( diff --git a/src/os/filestore/FileJournal.h b/src/os/filestore/FileJournal.h index 3c6742a420330..0c50e89133cc2 100644 --- a/src/os/filestore/FileJournal.h +++ b/src/os/filestore/FileJournal.h @@ -24,6 +24,8 @@ using std::deque; #include "common/Mutex.h" #include "common/Thread.h" #include "common/Throttle.h" +#include "JournalThrottle.h" + #ifdef HAVE_LIBAIO # include @@ -34,7 +36,9 @@ using std::deque; * * Lock ordering is write_lock > aio_lock > (completions_lock | finisher_lock) */ -class FileJournal : public Journal { +class FileJournal : + public Journal, + public md_config_obs_t { public: /// Protected by finisher_lock struct completion_item { @@ -295,9 +299,23 @@ private: // throttle - Throttle throttle_ops, throttle_bytes; + int set_throttle_params(); + const char** get_tracked_conf_keys() const override; + void handle_conf_change( + const struct md_config_t *conf, + const std::set &changed) override { + for (const char **i = get_tracked_conf_keys(); + *i; + ++i) { + if (changed.count(string(*i))) { + set_throttle_params(); + return; + } + } + } - void put_throttle(uint64_t ops, uint64_t bytes); + void complete_write(uint64_t ops, uint64_t bytes); + JournalThrottle throttle; // write thread Mutex write_lock; @@ -398,8 +416,7 @@ private: full_state(FULL_NOTFULL), fd(-1), writing_seq(0), - throttle_ops(g_ceph_context, "journal_ops", g_conf->journal_queue_max_ops), - throttle_bytes(g_ceph_context, "journal_bytes", g_conf->journal_queue_max_bytes), + throttle(g_conf->filestore_caller_concurrency), write_lock("FileJournal::write_lock", false, true, false, g_ceph_context), write_stop(true), aio_stop(true), @@ -416,10 +433,13 @@ private: aio = false; } #endif + + g_conf->add_observer(this); } ~FileJournal() { assert(fd == -1); delete[] zero_buf; + g_conf->remove_observer(this); } int check(); @@ -434,7 +454,7 @@ private: void flush(); - void throttle(); + void reserve_throttle_and_backoff(uint64_t count); bool is_writeable() { return read_pos == 0; diff --git a/src/os/filestore/FileStore.cc b/src/os/filestore/FileStore.cc index 4251cae213ba8..28ea4b3b46c5e 100644 --- a/src/os/filestore/FileStore.cc +++ b/src/os/filestore/FileStore.cc @@ -581,10 +581,8 @@ FileStore::FileStore(const std::string &base, const std::string &jdev, osflagbit // initialize logger PerfCountersBuilder plb(g_ceph_context, internal_name, l_os_first, l_os_last); - plb.add_u64(l_os_jq_max_ops, "journal_queue_max_ops", "Max operations in journal queue"); plb.add_u64(l_os_jq_ops, "journal_queue_ops", "Operations in journal queue"); plb.add_u64_counter(l_os_j_ops, "journal_ops", "Total journal entries written"); - plb.add_u64(l_os_jq_max_bytes, "journal_queue_max_bytes", "Max data in journal queue"); plb.add_u64(l_os_jq_bytes, "journal_queue_bytes", "Size of journal queue"); plb.add_u64_counter(l_os_j_bytes, "journal_bytes", "Total operations size in journal"); plb.add_time_avg(l_os_j_lat, "journal_latency", "Average journal queue completing latency"); @@ -1943,18 +1941,20 @@ int FileStore::queue_transactions(Sequencer *posr, vector& tls, if (journal && journal->is_writeable() && !m_filestore_journal_trailing) { Op *o = build_op(tls, onreadable, onreadable_sync, osd_op); + + //prepare and encode transactions data out of lock + bufferlist tbl; + int orig_len = journal->prepare_entry(o->tls, &tbl); + if (handle) handle->suspend_tp_timeout(); op_queue_reserve_throttle(o); + journal->reserve_throttle_and_backoff(tbl.length()); if (handle) handle->reset_tp_timeout(); - journal->throttle(); - //prepare and encode transactions data out of lock - bufferlist tbl; - int orig_len = journal->prepare_entry(o->tls, &tbl); uint64_t op_num = submit_manager.op_submit_start(); o->op = op_num; diff --git a/src/os/filestore/Journal.h b/src/os/filestore/Journal.h index 236e4315a35ca..ca30da4794feb 100644 --- a/src/os/filestore/Journal.h +++ b/src/os/filestore/Journal.h @@ -49,7 +49,14 @@ public: virtual void close() = 0; ///< close an open journal virtual void flush() = 0; - virtual void throttle() = 0; + + /** + * reserve_throttle_and_backoff + * + * Implementation may throttle or backoff based on ops + * reserved here but not yet released using committed_thru. + */ + virtual void reserve_throttle_and_backoff(uint64_t count) = 0; virtual int dump(ostream& out) { return -EOPNOTSUPP; } diff --git a/src/os/filestore/JournalThrottle.cc b/src/os/filestore/JournalThrottle.cc new file mode 100644 index 0000000000000..4a100c6b39fb1 --- /dev/null +++ b/src/os/filestore/JournalThrottle.cc @@ -0,0 +1,67 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "JournalThrottle.h" +#include "include/assert.h" + +bool JournalThrottle::set_params( + double _low_threshhold, + double _high_threshhold, + double _expected_throughput, + double _high_multiple, + double _max_multiple, + uint64_t _throttle_max, + std::ostream *errstream) +{ + return throttle.set_params( + _low_threshhold, + _high_threshhold, + _expected_throughput, + _high_multiple, + _max_multiple, + _throttle_max, + errstream); +} + +std::chrono::duration JournalThrottle::get(uint64_t c) +{ + return throttle.get(c); +} + +uint64_t JournalThrottle::take(uint64_t c) +{ + return throttle.take(c); +} + +void JournalThrottle::register_throttle_seq(uint64_t seq, uint64_t c) +{ + locker l(lock); + journaled_ops.push_back(std::make_pair(seq, c)); +} + +std::pair JournalThrottle::flush(uint64_t mono_id) +{ + uint64_t to_put_bytes = 0; + uint64_t to_put_ops = 0; + { + locker l(lock); + while (!journaled_ops.empty() && + journaled_ops.front().first <= mono_id) { + to_put_bytes += journaled_ops.front().second; + to_put_ops++; + journaled_ops.pop_front(); + } + } + throttle.put(to_put_bytes); + return make_pair(to_put_ops, to_put_bytes); +} + +uint64_t JournalThrottle::get_current() +{ + return throttle.get_current(); +} + +uint64_t JournalThrottle::get_max() +{ + return throttle.get_max(); +} diff --git a/src/os/filestore/JournalThrottle.h b/src/os/filestore/JournalThrottle.h new file mode 100644 index 0000000000000..8a7ce7261072c --- /dev/null +++ b/src/os/filestore/JournalThrottle.h @@ -0,0 +1,101 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_THROTTLE_H +#define CEPH_JOURNAL_THROTTLE_H + +#include "common/Throttle.h" + +#include +#include +#include +#include +#include +#include +#include + +/** + * JournalThrottle + * + * Throttle designed to implement dynamic throttling as the journal fills + * up. The goal is to not delay ops at all when the journal is relatively + * empty, delay ops somewhat as the journal begins to fill (with the delay + * getting linearly longer as the journal fills up to a high water mark), + * and to delay much more aggressively (though still linearly with usage) + * until we hit the max value. + * + * The implementation simply wraps BackoffThrottle with a queue of + * journaled but not synced ops. + * + * The usage pattern is as follows: + * 1) Call get(seq, bytes) before taking the op_queue_throttle + * 2) Once the journal is flushed, flush(max_op_id_flushed) + */ +class JournalThrottle { + BackoffThrottle throttle; + + std::mutex lock; + /// deque + std::deque > journaled_ops; + using locker = std::unique_lock; + +public: + /** + * set_params + * + * Sets params. If the params are invalid, returns false + * and populates errstream (if non-null) with a user compreshensible + * explanation. + */ + bool set_params( + double low_threshhold, + double high_threshhold, + double expected_throughput, + double high_multiple, + double max_multiple, + uint64_t throttle_max, + std::ostream *errstream); + + /** + * gets specified throttle for id mono_id, waiting as necessary + * + * @param c [in] amount to take + * @return duration waited + */ + std::chrono::duration get(uint64_t c); + + /** + * take + * + * Takes specified throttle without waiting + */ + uint64_t take(uint64_t c); + + /** + * register_throttle_seq + * + * Registers a sequence number with an amount of throttle to + * release upon flush() + * + * @param seq [in] seq + */ + void register_throttle_seq(uint64_t seq, uint64_t c); + + + /** + * Releases throttle held by ids <= mono_id + * + * @param mono_id [in] id up to which to flush + * @returns pair + */ + std::pair flush(uint64_t mono_id); + + uint64_t get_current(); + uint64_t get_max(); + + JournalThrottle( + unsigned expected_concurrency ///< [in] determines size of conds + ) : throttle(expected_concurrency) {} +}; + +#endif -- 2.39.5