OPTION(filestore_journal_trailing, OPT_BOOL, false)
OPTION(filestore_queue_max_ops, OPT_INT, 50)
OPTION(filestore_queue_max_bytes, OPT_INT, 100 << 20)
-OPTION(filestore_queue_committing_max_ops, OPT_INT, 500) // this is ON TOP of filestore_queue_max_*
-OPTION(filestore_queue_committing_max_bytes, OPT_INT, 100 << 20) // "
+
+OPTION(filestore_caller_concurrency, OPT_INT, 10)
+
+/// Expected filestore throughput in B/s
+OPTION(filestore_expected_throughput_bytes, OPT_DOUBLE, 100 << 20)
+/// Expected filestore throughput in ops/s
+OPTION(filestore_expected_throughput_ops, OPT_DOUBLE, 100)
+
+/// Filestore max delay multiple (probably don't need to change)
+OPTION(filestore_queue_max_delay_multiple, OPT_DOUBLE, 10)
+/// Filestore max delay multiple (probably don't need to change)
+OPTION(filestore_queue_high_delay_multiple, OPT_DOUBLE, 2)
+
+/// Use above to inject delays intended to keep the op queue between low and high
+OPTION(filestore_queue_low_threshhold, OPT_DOUBLE, 0.2)
+OPTION(filestore_queue_high_threshhold, OPT_DOUBLE, 0.8)
+
OPTION(filestore_op_threads, OPT_INT, 2)
OPTION(filestore_op_thread_timeout, OPT_INT, 60)
OPTION(filestore_op_thread_suicide_timeout, OPT_INT, 180)
fdcache(g_ceph_context),
wbthrottle(g_ceph_context),
next_osr_id(0),
- throttle_ops(g_ceph_context, "filestore_ops", g_conf->filestore_queue_max_ops),
- throttle_bytes(g_ceph_context, "filestore_bytes", g_conf->filestore_queue_max_bytes),
+ throttle_ops(g_conf->filestore_caller_concurrency),
+ throttle_bytes(g_conf->filestore_caller_concurrency),
m_ondisk_finisher_num(g_conf->filestore_ondisk_finisher_threads),
m_apply_finisher_num(g_conf->filestore_apply_finisher_threads),
op_tp(g_ceph_context, "FileStore::op_tp", "tp_fstore_op", g_conf->filestore_op_threads, "filestore_op_threads"),
m_journal_force_aio(g_conf->journal_force_aio),
m_osd_rollback_to_cluster_snap(g_conf->osd_rollback_to_cluster_snap),
m_osd_use_stale_snap(g_conf->osd_use_stale_snap),
- m_filestore_queue_max_ops(g_conf->filestore_queue_max_ops),
- m_filestore_queue_max_bytes(g_conf->filestore_queue_max_bytes),
- m_filestore_queue_committing_max_ops(g_conf->filestore_queue_committing_max_ops),
- m_filestore_queue_committing_max_bytes(g_conf->filestore_queue_committing_max_bytes),
m_filestore_do_dump(false),
m_filestore_dump_fmt(true),
m_filestore_sloppy_crc(g_conf->filestore_sloppy_crc),
dout(5) << "basedir " << basedir << " journal " << journalpath << dendl;
+ ret = set_throttle_params();
+ if (ret != 0)
+ goto done;
+
// make sure global base dir exists
if (::access(basedir.c_str(), R_OK | W_OK)) {
ret = -errno;
op_wq.queue(osr);
}
-void FileStore::op_queue_reserve_throttle(Op *o, ThreadPool::TPHandle *handle)
+void FileStore::op_queue_reserve_throttle(Op *o)
{
- // Do not call while holding the journal lock!
- uint64_t max_ops = m_filestore_queue_max_ops;
- uint64_t max_bytes = m_filestore_queue_max_bytes;
-
- if (backend->can_checkpoint() && is_committing()) {
- max_ops += m_filestore_queue_committing_max_ops;
- max_bytes += m_filestore_queue_committing_max_bytes;
- }
-
- logger->set(l_os_oq_max_ops, max_ops);
- logger->set(l_os_oq_max_bytes, max_bytes);
-
- if (handle)
- handle->suspend_tp_timeout();
- if (throttle_ops.should_wait(1) ||
- (throttle_bytes.get_current() // let single large ops through!
- && throttle_bytes.should_wait(o->bytes))) {
- dout(2) << "waiting " << throttle_ops.get_current() + 1 << " > " << max_ops << " ops || "
- << throttle_bytes.get_current() + o->bytes << " > " << max_bytes << dendl;
- }
throttle_ops.get();
throttle_bytes.get(o->bytes);
- if (handle)
- handle->reset_tp_timeout();
logger->set(l_os_oq_ops, throttle_ops.get_current());
logger->set(l_os_oq_bytes, throttle_bytes.get_current());
if (journal && journal->is_writeable() && !m_filestore_journal_trailing) {
Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);
- op_queue_reserve_throttle(o, handle);
+ if (handle)
+ handle->suspend_tp_timeout();
+
+ op_queue_reserve_throttle(o);
+
+ if (handle)
+ handle->reset_tp_timeout();
+
journal->throttle();
//prepare and encode transactions data out of lock
bufferlist tbl;
Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);
dout(5) << __func__ << " (no journal) " << o << " " << tls << dendl;
- op_queue_reserve_throttle(o, handle);
+ if (handle)
+ handle->suspend_tp_timeout();
+
+ op_queue_reserve_throttle(o);
+
+ if (handle)
+ handle->reset_tp_timeout();
uint64_t op_num = submit_manager.op_submit_start();
o->op = op_num;
"filestore_max_sync_interval",
"filestore_queue_max_ops",
"filestore_queue_max_bytes",
- "filestore_queue_committing_max_ops",
- "filestore_queue_committing_max_bytes",
+ "filestore_queue_max_ops",
+ "filestore_expected_throughput_bytes",
+ "filestore_expected_throughput_ops",
+ "filestore_queue_low_threshhold",
+ "filestore_queue_high_threshhold",
+ "filestore_queue_high_delay_multiple",
+ "filestore_queue_max_delay_multiple",
"filestore_commit_timeout",
"filestore_dump_file",
"filestore_kill_at",
Mutex::Locker l(lock);
set_xattr_limits_via_conf();
}
+
+ if (changed.count("filestore_queue_max_bytes") ||
+ changed.count("filestore_queue_max_ops") ||
+ changed.count("filestore_expected_throughput_bytes") ||
+ changed.count("filestore_expected_throughput_ops") ||
+ changed.count("filestore_queue_low_threshhold") ||
+ changed.count("filestore_queue_high_threshhold") ||
+ changed.count("filestore_queue_high_delay_multiple") ||
+ changed.count("filestore_queue_max_delay_multiple")) {
+ Mutex::Locker l(lock);
+ set_throttle_params();
+ }
+
if (changed.count("filestore_min_sync_interval") ||
changed.count("filestore_max_sync_interval") ||
- changed.count("filestore_queue_max_ops") ||
- changed.count("filestore_queue_max_bytes") ||
- changed.count("filestore_queue_committing_max_ops") ||
- changed.count("filestore_queue_committing_max_bytes") ||
changed.count("filestore_kill_at") ||
changed.count("filestore_fail_eio") ||
changed.count("filestore_sloppy_crc") ||
Mutex::Locker l(lock);
m_filestore_min_sync_interval = conf->filestore_min_sync_interval;
m_filestore_max_sync_interval = conf->filestore_max_sync_interval;
- m_filestore_queue_max_ops = conf->filestore_queue_max_ops;
- m_filestore_queue_max_bytes = conf->filestore_queue_max_bytes;
- m_filestore_queue_committing_max_ops = conf->filestore_queue_committing_max_ops;
- m_filestore_queue_committing_max_bytes = conf->filestore_queue_committing_max_bytes;
m_filestore_kill_at.set(conf->filestore_kill_at);
m_filestore_fail_eio = conf->filestore_fail_eio;
m_filestore_fadvise = conf->filestore_fadvise;
m_filestore_sloppy_crc = conf->filestore_sloppy_crc;
m_filestore_sloppy_crc_block_size = conf->filestore_sloppy_crc_block_size;
m_filestore_max_alloc_hint_size = conf->filestore_max_alloc_hint_size;
- throttle_ops.reset_max(conf->filestore_queue_max_ops);
- throttle_bytes.reset_max(conf->filestore_queue_max_bytes);
}
if (changed.count("filestore_commit_timeout")) {
Mutex::Locker l(sync_entry_timeo_lock);
}
}
+int FileStore::set_throttle_params()
+{
+ stringstream ss;
+ bool valid = throttle_bytes.set_params(
+ g_conf->filestore_queue_low_threshhold,
+ g_conf->filestore_queue_high_threshhold,
+ g_conf->filestore_expected_throughput_bytes,
+ g_conf->filestore_queue_high_delay_multiple,
+ g_conf->filestore_queue_max_delay_multiple,
+ g_conf->filestore_queue_max_bytes,
+ &ss);
+
+ valid &= throttle_ops.set_params(
+ g_conf->filestore_queue_low_threshhold,
+ g_conf->filestore_queue_high_threshhold,
+ g_conf->filestore_expected_throughput_ops,
+ g_conf->filestore_queue_high_delay_multiple,
+ g_conf->filestore_queue_max_delay_multiple,
+ g_conf->filestore_queue_max_ops,
+ &ss);
+
+ logger->set(l_os_oq_max_ops, throttle_ops.get_max());
+ logger->set(l_os_oq_max_bytes, throttle_bytes.get_max());
+
+ if (!valid) {
+ derr << "tried to set invalid params: "
+ << ss.str()
+ << dendl;
+ }
+ return valid ? 0 : -EINVAL;
+}
+
void FileStore::dump_start(const std::string& file)
{
dout(10) << "dump_start " << file << dendl;
atomic_t next_osr_id;
deque<OpSequencer*> op_queue;
- Throttle throttle_ops, throttle_bytes;
+ BackoffThrottle throttle_ops, throttle_bytes;
const int m_ondisk_finisher_num;
const int m_apply_finisher_num;
vector<Finisher*> ondisk_finishers;
Context *onreadable, Context *onreadable_sync,
TrackedOpRef osd_op);
void queue_op(OpSequencer *osr, Op *o);
- void op_queue_reserve_throttle(Op *o, ThreadPool::TPHandle *handle = NULL);
+ void op_queue_reserve_throttle(Op *o);
void op_queue_release_throttle(Op *o);
void _journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk);
friend struct C_JournaledAhead;
virtual const char** get_tracked_conf_keys() const;
virtual void handle_conf_change(const struct md_config_t *conf,
const std::set <std::string> &changed);
+ int set_throttle_params();
float m_filestore_commit_timeout;
bool m_filestore_journal_parallel;
bool m_filestore_journal_trailing;
bool m_journal_dio, m_journal_aio, m_journal_force_aio;
std::string m_osd_rollback_to_cluster_snap;
bool m_osd_use_stale_snap;
- int m_filestore_queue_max_ops;
- int m_filestore_queue_max_bytes;
- int m_filestore_queue_committing_max_ops;
- int m_filestore_queue_committing_max_bytes;
bool m_filestore_do_dump;
std::ofstream m_filestore_dump;
JSONFormatter m_filestore_dump_fmt;