os/filestore/DBObjectMap.cc
os/filestore/FileJournal.cc
os/filestore/FileStore.cc
+ os/filestore/JournalThrottle.cc
os/filestore/GenericFileStoreBackend.cc
os/filestore/JournalingObjectStore.cc
os/filestore/HashIndex.cc
OPTION(journal_write_header_frequency, OPT_U64, 0)
OPTION(journal_max_write_bytes, OPT_INT, 10 << 20)
OPTION(journal_max_write_entries, OPT_INT, 100)
-OPTION(journal_queue_max_ops, OPT_INT, 300)
-OPTION(journal_queue_max_bytes, OPT_INT, 32 << 20)
+
+/// Target range for journal fullness
+OPTION(journal_throttle_low_threshhold, OPT_DOUBLE, 0.5)
+OPTION(journal_throttle_high_threshhold, OPT_DOUBLE, 0.8)
+
+/// Multiple over expected at high_threshhold (probably don't need to change)
+OPTION(journal_throttle_high_multiple, OPT_DOUBLE, 2)
+/// Multiple over expected at max (probably don't need to change)
+OPTION(journal_throttle_max_multiple, OPT_DOUBLE, 10)
+
OPTION(journal_align_min_size, OPT_INT, 64 << 10) // align data payloads >= this.
OPTION(journal_replay_from, OPT_INT, 0)
OPTION(journal_zero_on_create, OPT_BOOL, false)
os/filestore/DBObjectMap.cc \
os/filestore/FileJournal.cc \
os/filestore/FileStore.cc \
+ os/filestore/JournalThrottle.cc \
os/filestore/GenericFileStoreBackend.cc \
os/filestore/HashIndex.cc \
os/filestore/IndexManager.cc \
os/filestore/DBObjectMap.h \
os/filestore/FileJournal.h \
os/filestore/FileStore.h \
+ os/filestore/JournalThrottle.h \
os/filestore/FDCache.h \
os/filestore/GenericFileStoreBackend.h \
os/filestore/HashIndex.h \
enum {
l_os_first = 84000,
- l_os_jq_max_ops,
l_os_jq_ops,
- l_os_j_ops,
- l_os_jq_max_bytes,
l_os_jq_bytes,
+ l_os_j_ops,
l_os_j_bytes,
l_os_j_lat,
l_os_j_wr,
// throw out what we have so far
full_state = FULL_FULL;
while (!writeq_empty()) {
- put_throttle(1, peek_write().orig_len);
+ complete_write(1, peek_write().orig_len);
pop_write();
}
print_header(header);
if (write_stop) {
dout(20) << "write_thread_entry full and stopping, throw out queue and finish up" << dendl;
while (!writeq_empty()) {
- put_throttle(1, peek_write().orig_len);
+ complete_write(1, peek_write().orig_len);
pop_write();
}
print_header(header);
#else
do_write(bl);
#endif
- put_throttle(orig_ops, orig_bytes);
+ complete_write(orig_ops, orig_bytes);
}
dout(10) << "write_thread_entry finish" << dendl;
<< " (" << oncommit << ")" << dendl;
assert(e.length() > 0);
- throttle_ops.take(1);
- throttle_bytes.take(orig_len);
if (osd_op)
osd_op->mark_event("commit_queued_for_journal_write");
if (logger) {
- logger->set(l_os_jq_max_ops, throttle_ops.get_max());
- logger->set(l_os_jq_max_bytes, throttle_bytes.get_max());
- logger->set(l_os_jq_ops, throttle_ops.get_current());
- logger->set(l_os_jq_bytes, throttle_bytes.get_current());
+ logger->inc(l_os_jq_bytes, orig_len);
+ logger->inc(l_os_jq_ops, 1);
+ }
+
+ throttle.register_throttle_seq(seq, e.length());
+ if (logger) {
+ logger->inc(l_os_j_ops, 1);
+ logger->inc(l_os_j_bytes, e.length());
}
{
{
assert(write_lock.is_locked());
Mutex::Locker locker(writeq_lock);
+ if (logger) {
+ logger->dec(l_os_jq_bytes, writeq.front().orig_len);
+ logger->dec(l_os_jq_ops, 1);
+ }
writeq.pop_front();
}
void FileJournal::batch_pop_write(list<write_item> &items)
{
assert(write_lock.is_locked());
- Mutex::Locker locker(writeq_lock);
- writeq.swap(items);
+ { // inner scope bounds the lifetime of `locker`; the counter loop below sits outside it
+ Mutex::Locker locker(writeq_lock);
+ writeq.swap(items); // exchanges the entire queue contents with the caller's list
+ }
+ for (auto &&i : items) { // each item handed to the caller is subtracted from the queue counters
+ if (logger) {
+ logger->dec(l_os_jq_bytes, i.orig_len);
+ logger->dec(l_os_jq_ops, 1);
+ }
+ }
}
void FileJournal::batch_unpop_write(list<write_item> &items)
{
assert(write_lock.is_locked());
+ for (auto &&i : items) { // items going back onto the queue are re-added to the counters (inverse of the dec in batch_pop_write)
+ if (logger) {
+ logger->inc(l_os_jq_bytes, i.orig_len);
+ logger->inc(l_os_jq_ops, 1);
+ }
+ }
Mutex::Locker locker(writeq_lock);
writeq.splice(writeq.begin(), items);
}
{
Mutex::Locker locker(write_lock);
+ auto released = throttle.flush(seq);
+ if (logger) {
+ logger->dec(l_os_j_ops, released.first);
+ logger->dec(l_os_j_bytes, released.second);
+ }
+
if (seq < last_committed_seq) {
dout(5) << "committed_thru " << seq << " < last_committed_seq " << last_committed_seq << dendl;
assert(seq >= last_committed_seq);
dout(15) << " dropping committed but unwritten seq " << peek_write().seq
<< " len " << peek_write().bl.length()
<< dendl;
- put_throttle(1, peek_write().orig_len);
+ complete_write(1, peek_write().orig_len);
pop_write();
}
}
-void FileJournal::put_throttle(uint64_t ops, uint64_t bytes)
+void FileJournal::complete_write(uint64_t ops, uint64_t bytes) // replaces put_throttle(); the new body only logs the completed ops/bytes at debug level 5
{
- uint64_t new_ops = throttle_ops.put(ops);
- uint64_t new_bytes = throttle_bytes.put(bytes);
- dout(5) << "put_throttle finished " << ops << " ops and "
- << bytes << " bytes, now "
- << new_ops << " ops and " << new_bytes << " bytes"
- << dendl;
-
- if (logger) {
- logger->inc(l_os_j_ops, ops);
- logger->inc(l_os_j_bytes, bytes);
- logger->set(l_os_jq_ops, new_ops);
- logger->set(l_os_jq_bytes, new_bytes);
- logger->set(l_os_jq_max_ops, throttle_ops.get_max());
- logger->set(l_os_jq_max_bytes, throttle_bytes.get_max());
- }
+ dout(5) << __func__ << " finished " << ops << " ops and "
+ << bytes << " bytes" << dendl;
}
int FileJournal::make_writeable()
{
dout(10) << __func__ << dendl;
- int r = _open(true);
+ int r = set_throttle_params(); // apply the throttle config first; a rejected config aborts before _open()
+ if (r < 0)
+ return r; // NOTE(review): set_throttle_params() reads header.max_size before _open(true) runs here - confirm the header is already populated
+
+ r = _open(true);
if (r < 0)
return r;
read_pos = 0;
must_write_header = true;
+
start_writer();
return 0;
}
+int FileJournal::set_throttle_params()
+{ // Pushes the current g_conf throttle settings into `throttle`; returns 0 if accepted, -EINVAL otherwise.
+ stringstream ss;
+ bool valid = throttle.set_params(
+ g_conf->journal_throttle_low_threshhold,
+ g_conf->journal_throttle_high_threshhold,
+ g_conf->filestore_expected_throughput_bytes,
+ g_conf->journal_throttle_high_multiple,
+ g_conf->journal_throttle_max_multiple,
+ header.max_size - get_top(), // passed as set_params()'s throttle_max argument
+ &ss); // receives the explanation when the params are rejected
+
+ if (!valid) {
+ derr << "tried to set invalid params: "
+ << ss.str()
+ << dendl;
+ }
+ return valid ? 0 : -EINVAL;
+}
+
+const char** FileJournal::get_tracked_conf_keys() const
+{ // Returns the config keys that handle_conf_change() checks for; every key listed is one set_throttle_params() reads.
+ static const char *KEYS[] = {
+ "journal_throttle_low_threshhold",
+ "journal_throttle_high_threshhold",
+ "journal_throttle_high_multiple",
+ "journal_throttle_max_multiple",
+ "filestore_expected_throughput_bytes",
+ NULL}; // NULL sentinel: handle_conf_change() iterates until it reaches it
+ return KEYS;
+}
+
void FileJournal::wrap_read_bl(
off64_t pos,
int64_t olen,
&ss);
if (result == SUCCESS) {
journalq.push_back( pair<uint64_t,off64_t>(seq, pos));
+ uint64_t amount_to_take =
+ next_pos > pos ?
+ next_pos - pos :
+ (header.max_size - pos) + (next_pos - get_top());
+ throttle.take(amount_to_take);
+ throttle.register_throttle_seq(next_seq, amount_to_take);
+ if (logger) {
+ logger->inc(l_os_j_ops, 1);
+ logger->inc(l_os_j_bytes, amount_to_take);
+ }
if (next_seq > seq) {
return false;
} else {
return SUCCESS;
}
-void FileJournal::throttle()
+void FileJournal::reserve_throttle_and_backoff(uint64_t count) // count: amount to reserve; FileStore passes the encoded entry's length (tbl.length())
{
- if (throttle_ops.wait(g_conf->journal_queue_max_ops))
- dout(2) << "throttle: waited for ops" << dendl;
- if (throttle_bytes.wait(g_conf->journal_queue_max_bytes))
- dout(2) << "throttle: waited for bytes" << dendl;
+ throttle.get(count); // JournalThrottle::get() is documented as waiting as necessary; the returned wait duration is discarded
}
void FileJournal::get_header(
#include "common/Mutex.h"
#include "common/Thread.h"
#include "common/Throttle.h"
+#include "JournalThrottle.h"
+
#ifdef HAVE_LIBAIO
# include <libaio.h>
*
* Lock ordering is write_lock > aio_lock > (completions_lock | finisher_lock)
*/
-class FileJournal : public Journal {
+class FileJournal :
+ public Journal,
+ public md_config_obs_t {
public:
/// Protected by finisher_lock
struct completion_item {
// throttle
- Throttle throttle_ops, throttle_bytes;
+ int set_throttle_params();
+ const char** get_tracked_conf_keys() const override;
+ void handle_conf_change(
+ const struct md_config_t *conf,
+ const std::set <std::string> &changed) override { // re-applies the throttle params if any tracked key appears in `changed`; `conf` is not used
+ for (const char **i = get_tracked_conf_keys();
+ *i; // the tracked-key array is NULL-terminated
+ ++i) {
+ if (changed.count(string(*i))) {
+ set_throttle_params(); // NOTE(review): the return value (-EINVAL on invalid params) is ignored here
+ return; // a single re-application reads every tracked key, so stop at the first match
+ }
+ }
+ }
- void put_throttle(uint64_t ops, uint64_t bytes);
+ void complete_write(uint64_t ops, uint64_t bytes);
+ JournalThrottle throttle;
// write thread
Mutex write_lock;
full_state(FULL_NOTFULL),
fd(-1),
writing_seq(0),
- throttle_ops(g_ceph_context, "journal_ops", g_conf->journal_queue_max_ops),
- throttle_bytes(g_ceph_context, "journal_bytes", g_conf->journal_queue_max_bytes),
+ throttle(g_conf->filestore_caller_concurrency),
write_lock("FileJournal::write_lock", false, true, false, g_ceph_context),
write_stop(true),
aio_stop(true),
aio = false;
}
#endif
+
+ g_conf->add_observer(this);
}
~FileJournal() {
assert(fd == -1);
delete[] zero_buf;
+ g_conf->remove_observer(this);
}
int check();
void flush();
- void throttle();
+ void reserve_throttle_and_backoff(uint64_t count);
bool is_writeable() {
return read_pos == 0;
// initialize logger
PerfCountersBuilder plb(g_ceph_context, internal_name, l_os_first, l_os_last);
- plb.add_u64(l_os_jq_max_ops, "journal_queue_max_ops", "Max operations in journal queue");
plb.add_u64(l_os_jq_ops, "journal_queue_ops", "Operations in journal queue");
plb.add_u64_counter(l_os_j_ops, "journal_ops", "Total journal entries written");
- plb.add_u64(l_os_jq_max_bytes, "journal_queue_max_bytes", "Max data in journal queue");
plb.add_u64(l_os_jq_bytes, "journal_queue_bytes", "Size of journal queue");
plb.add_u64_counter(l_os_j_bytes, "journal_bytes", "Total operations size in journal");
plb.add_time_avg(l_os_j_lat, "journal_latency", "Average journal queue completing latency");
if (journal && journal->is_writeable() && !m_filestore_journal_trailing) {
Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);
+
+ //prepare and encode transactions data out of lock
+ bufferlist tbl;
+ int orig_len = journal->prepare_entry(o->tls, &tbl);
+
if (handle)
handle->suspend_tp_timeout();
op_queue_reserve_throttle(o);
+ journal->reserve_throttle_and_backoff(tbl.length());
if (handle)
handle->reset_tp_timeout();
- journal->throttle();
- //prepare and encode transactions data out of lock
- bufferlist tbl;
- int orig_len = journal->prepare_entry(o->tls, &tbl);
uint64_t op_num = submit_manager.op_submit_start();
o->op = op_num;
virtual void close() = 0; ///< close an open journal
virtual void flush() = 0;
- virtual void throttle() = 0;
+
+ /**
+ * reserve_throttle_and_backoff
+ *
+ * Implementation may throttle or backoff based on ops
+ * reserved here but not yet released using committed_thru.
+ */
+ virtual void reserve_throttle_and_backoff(uint64_t count) = 0;
virtual int dump(ostream& out) { return -EOPNOTSUPP; }
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "JournalThrottle.h"
+#include "include/assert.h"
+
+bool JournalThrottle::set_params(
+ double _low_threshhold,
+ double _high_threshhold,
+ double _expected_throughput,
+ double _high_multiple,
+ double _max_multiple,
+ uint64_t _throttle_max,
+ std::ostream *errstream)
+{ // Pure pass-through: every argument is forwarded unchanged to the wrapped BackoffThrottle, whose result is returned.
+ return throttle.set_params(
+ _low_threshhold,
+ _high_threshhold,
+ _expected_throughput,
+ _high_multiple,
+ _max_multiple,
+ _throttle_max,
+ errstream);
+}
+
+std::chrono::duration<double> JournalThrottle::get(uint64_t c)
+{ // Delegates to the wrapped BackoffThrottle; journaled_ops is not touched here.
+ return throttle.get(c);
+}
+
+uint64_t JournalThrottle::take(uint64_t c)
+{ // Delegates to the wrapped BackoffThrottle; journaled_ops is not touched here.
+ return throttle.take(c);
+}
+
+void JournalThrottle::register_throttle_seq(uint64_t seq, uint64_t c)
+{ // Records that `c` units of throttle are held for `seq`; the throttle itself is not modified here.
+ locker l(lock); // journaled_ops is accessed with `lock` held, here and in flush()
+ journaled_ops.push_back(std::make_pair(seq, c)); // appended at the back; flush() pops from the front while seq <= its argument
+}
+
+// Releases the throttle held by queued entries whose seq is <= mono_id.
+// Entries are only popped from the front of journaled_ops, so the scan stops
+// at the first entry with a larger seq.
+// Returns <number of entries released, total amount released>.
+std::pair<uint64_t, uint64_t> JournalThrottle::flush(uint64_t mono_id)
+{
+ uint64_t to_put_bytes = 0;
+ uint64_t to_put_ops = 0;
+ {
+ locker l(lock);
+ while (!journaled_ops.empty() &&
+ journaled_ops.front().first <= mono_id) {
+ to_put_bytes += journaled_ops.front().second;
+ to_put_ops++;
+ journaled_ops.pop_front();
+ }
+ }
+ // put() is issued after the scope of `l` has ended, so `lock` is not held across it
+ throttle.put(to_put_bytes);
+ // qualified like the std::make_pair in register_throttle_seq(): with built-in
+ // argument types ADL cannot find make_pair, so the unqualified call only
+ // compiled when some included header leaked a using-directive
+ return std::make_pair(to_put_ops, to_put_bytes);
+}
+
+uint64_t JournalThrottle::get_current()
+{ // Returns the wrapped BackoffThrottle's get_current() value unchanged.
+ return throttle.get_current();
+}
+
+uint64_t JournalThrottle::get_max()
+{ // Returns the wrapped BackoffThrottle's get_max() value unchanged.
+ return throttle.get_max();
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_THROTTLE_H
+#define CEPH_JOURNAL_THROTTLE_H
+
+#include "common/Throttle.h"
+
+#include <list>
+#include <deque>
+#include <condition_variable>
+#include <thread>
+#include <vector>
+#include <chrono>
+#include <iostream>
+
+/**
+ * JournalThrottle
+ *
+ * Throttle designed to implement dynamic throttling as the journal fills
+ * up. The goal is to not delay ops at all when the journal is relatively
+ * empty, delay ops somewhat as the journal begins to fill (with the delay
+ * getting linearly longer as the journal fills up to a high water mark),
+ * and to delay much more aggressively (though still linearly with usage)
+ * until we hit the max value.
+ *
+ * The implementation simply wraps BackoffThrottle with a queue of
+ * journaled but not synced ops.
+ *
+ * The usage pattern is as follows:
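+ * 1) Call get(bytes) to reserve throttle, waiting as necessary (FileStore
+ * does this alongside taking the op_queue_throttle); journal replay uses
+ * take(bytes) instead, which does not wait
+ * 2) Call register_throttle_seq(seq, bytes) to tie the reserved amount to
+ * the op's seq
+ * 3) Once the journal is flushed, call flush(max_op_id_flushed) to release
+ * the throttle held by every registered seq <= max_op_id_flushed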
+ */
+class JournalThrottle {
+ BackoffThrottle throttle;
+
+ std::mutex lock;
+ /// deque<seq, amount> filled by register_throttle_seq() and drained by flush(); accessed with lock held
+ std::deque<std::pair<uint64_t, uint64_t> > journaled_ops;
+ using locker = std::unique_lock<std::mutex>;
+
+public:
+ /**
+ * set_params
+ *
+ * Sets params. If the params are invalid, returns false
+ * and populates errstream (if non-null) with a user comprehensible
+ * explanation.
+ *
+ * All arguments are forwarded unchanged to BackoffThrottle::set_params.
+ */
+ bool set_params(
+ double low_threshhold,
+ double high_threshhold,
+ double expected_throughput,
+ double high_multiple,
+ double max_multiple,
+ uint64_t throttle_max,
+ std::ostream *errstream);
+
+ /**
+ * get
+ *
+ * Gets the specified amount of throttle, waiting as necessary.
+ * The amount is not associated with any seq until
+ * register_throttle_seq() is called.
+ *
+ * @param c [in] amount to take
+ * @return duration waited
+ */
+ std::chrono::duration<double> get(uint64_t c);
+
+ /**
+ * take
+ *
+ * Takes specified throttle without waiting
+ *
+ * @param c [in] amount to take
+ * @return value returned by BackoffThrottle::take
+ */
+ uint64_t take(uint64_t c);
+
+ /**
+ * register_throttle_seq
+ *
+ * Registers a sequence number with an amount of throttle to
+ * release upon flush()
+ *
+ * @param seq [in] sequence number the throttle is held for
+ * @param c [in] amount of throttle to release once seq is flushed
+ */
+ void register_throttle_seq(uint64_t seq, uint64_t c);
+
+
+ /**
+ * Releases throttle held by ids <= mono_id
+ *
+ * @param mono_id [in] id up to which to flush
+ * @returns pair<ops_flushed, bytes_flushed>
+ */
+ std::pair<uint64_t, uint64_t> flush(uint64_t mono_id);
+
+ uint64_t get_current();
+ uint64_t get_max();
+
+ JournalThrottle(
+ unsigned expected_concurrency ///< [in] determines size of conds; forwarded to the BackoffThrottle constructor
+ ) : throttle(expected_concurrency) {}
+};
+
+#endif