From 41886c5420934dea85121c497bef370cfd290fc2 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 23 Apr 2015 14:51:51 -0700 Subject: [PATCH] os/newstore: throttle over entire write lifecycle Take a global throttle when we submit ops and release when they complete. The first throttles cover the period from submit to commit, while the wal ones also cover the async post-commit wal work. The configs are additive since the wal ones cover both periods; this should make them reasonably idiot-proof. Signed-off-by: Sage Weil --- src/common/config_opts.h | 7 +++++-- src/os/newstore/NewStore.cc | 33 ++++++++++++++++++++++++++------- src/os/newstore/NewStore.h | 31 ++++++++----------------------- 3 files changed, 39 insertions(+), 32 deletions(-) diff --git a/src/common/config_opts.h b/src/common/config_opts.h index e9020757aba8f..f39d6435fdb65 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -796,14 +796,17 @@ OPTION(newstore_backend, OPT_STR, "rocksdb") OPTION(newstore_fail_eio, OPT_BOOL, true) OPTION(newstore_sync_io, OPT_BOOL, false) // perform initial io synchronously OPTION(newstore_sync_transaction, OPT_BOOL, false) // perform kv txn synchronously +OPTION(newstore_sync_wal_apply, OPT_BOOL, true) // perform initial wal work synchronously (possibly in combination with aio so we only *queue* ios) OPTION(newstore_fsync_threads, OPT_INT, 16) // num threads calling fsync OPTION(newstore_fsync_thread_timeout, OPT_INT, 30) // thread timeout value OPTION(newstore_fsync_thread_suicide_timeout, OPT_INT, 120) // suicide timeout value OPTION(newstore_wal_threads, OPT_INT, 4) OPTION(newstore_wal_thread_timeout, OPT_INT, 30) OPTION(newstore_wal_thread_suicide_timeout, OPT_INT, 120) -OPTION(newstore_wal_max_ops, OPT_U64, 64) -OPTION(newstore_wal_max_bytes, OPT_U64, 64*1024*1024) +OPTION(newstore_max_ops, OPT_U64, 512) +OPTION(newstore_max_bytes, OPT_U64, 64*1024*1024) +OPTION(newstore_wal_max_ops, OPT_U64, 512) +OPTION(newstore_wal_max_bytes, OPT_U64, 128*1024*1024) OPTION(newstore_fid_prealloc, OPT_INT, 1024) OPTION(newstore_nid_prealloc, OPT_INT, 1024) OPTION(newstore_overlay_max_length, OPT_INT, 65536) diff --git a/src/os/newstore/NewStore.cc b/src/os/newstore/NewStore.cc index d501943b99310..a9ece6da05a50 100644 --- a/src/os/newstore/NewStore.cc +++ b/src/os/newstore/NewStore.cc @@ -584,6 +584,14 @@ NewStore::NewStore(CephContext *cct, const string& path) fid_lock("NewStore::fid_lock"), nid_lock("NewStore::nid_lock"), nid_max(0), + throttle_ops(cct, "newstore_max_ops", cct->_conf->newstore_max_ops), + throttle_bytes(cct, "newstore_max_bytes", cct->_conf->newstore_max_bytes), + throttle_wal_ops(cct, "newstore_wal_max_ops", + cct->_conf->newstore_max_ops + + cct->_conf->newstore_wal_max_ops), + throttle_wal_bytes(cct, "newstore_wal_max_bytes", + cct->_conf->newstore_max_bytes + + cct->_conf->newstore_wal_max_bytes), wal_lock("NewStore::wal_lock"), wal_seq(0), wal_tp(cct, @@ -2088,7 +2096,11 @@ void NewStore::_txc_state_proc(TransContext *txc) case TransContext::STATE_KV_DONE: if (txc->wal_txn) { txc->state = TransContext::STATE_WAL_QUEUED; - wal_wq.queue(txc); + if (g_conf->newstore_sync_wal_apply) { + _wal_apply(txc); + } else { + wal_wq.queue(txc); + } return; } txc->state = TransContext::STATE_FINISHING; @@ -2253,6 +2265,9 @@ void NewStore::_txc_finish_kv(TransContext *txc) finisher.queue(txc->oncommits.front()); txc->oncommits.pop_front(); } + + throttle_ops.put(txc->ops); + throttle_bytes.put(txc->bytes); } void NewStore::_txc_finish(TransContext *txc) @@ -2280,6 +2295,9 @@ void NewStore::_txc_finish(TransContext *txc) txc->removed_collections.pop_front(); } + throttle_wal_ops.put(txc->ops); + throttle_wal_bytes.put(txc->bytes); + OpSequencerRef osr = txc->osr; osr->qlock.Lock(); txc->state = TransContext::STATE_DONE; @@ -2402,8 +2420,6 @@ int NewStore::_wal_finish(TransContext *txc) wal_transaction_t& wt = *txc->wal_txn; dout(20) << __func__ << " txc " << " seq " << wt.seq << txc << dendl; - wal_wq.release_throttle(txc); - string key; get_wal_key(wt.seq, &key); KeyValueDB::Transaction cleanup = db->get_transaction(); @@ -2584,10 +2600,6 @@ int NewStore::queue_transactions( tls, &onreadable, &ondisk, &onreadable_sync); int r; - // throttle on wal work - wal_wq.throttle(g_conf->newstore_wal_max_ops, - g_conf->newstore_wal_max_bytes); - // set up the sequencer OpSequencer *osr; if (!posr) @@ -2610,12 +2622,19 @@ int NewStore::queue_transactions( for (list::iterator p = tls.begin(); p != tls.end(); ++p) { (*p)->set_osr(osr); + txc->ops += (*p)->get_num_ops(); + txc->bytes += (*p)->get_num_bytes(); _txc_add_transaction(txc, *p); } r = _txc_finalize(osr, txc); assert(r == 0); + throttle_ops.get(txc->ops); + throttle_bytes.get(txc->bytes); + throttle_wal_ops.get(txc->ops); + throttle_wal_bytes.get(txc->bytes); + // execute (start) _txc_state_proc(txc); return 0; diff --git a/src/os/newstore/NewStore.h b/src/os/newstore/NewStore.h index fbe1cf0423e7b..9f97122045fac 100644 --- a/src/os/newstore/NewStore.h +++ b/src/os/newstore/NewStore.h @@ -185,6 +185,8 @@ public: OpSequencerRef osr; boost::intrusive::list_member_hook<> sequencer_item; + uint64_t ops, bytes; + list fds; ///< these fds need to be synced set onodes; ///< these onodes need to be updated/written KeyValueDB::Transaction t; ///< then we will commit this @@ -210,6 +212,8 @@ public: TransContext(OpSequencer *o) : state(STATE_PREPARE), osr(o), + ops(0), + bytes(0), oncommit(NULL), onreadable(NULL), onreadable_sync(NULL), @@ -376,15 +380,11 @@ public: private: NewStore *store; wal_osr_queue_t wal_queue; - uint64_t ops, bytes; - Cond throttle_cond; public: WALWQ(NewStore *s, time_t ti, time_t sti, ThreadPool *tp) : ThreadPool::WorkQueue("NewStore::WALWQ", ti, sti, tp), - store(s), - ops(0), - bytes(0) { + store(s) { } bool _empty() { return wal_queue.empty(); @@ -394,8 +394,6 @@ public: wal_queue.push_back(*i->osr); } i->osr->wal_q.push_back(*i); - ++ops; - bytes += i->wal_txn->get_bytes(); return true; } void _dequeue(TransContext *p) { @@ -434,22 +432,6 @@ public: unlock(); drain(); } - - void throttle(uint64_t max_ops, uint64_t max_bytes) { - Mutex& lock = get_lock(); - Mutex::Locker l(lock); - while (ops > max_ops || bytes > max_bytes) { - throttle_cond.Wait(lock); - } - } - - void release_throttle(TransContext *txc) { - lock(); - --ops; - bytes -= txc->wal_txn->get_bytes(); - throttle_cond.Signal(); - unlock(); - } }; struct KVSyncThread : public Thread { @@ -495,6 +477,9 @@ private: uint64_t nid_last; uint64_t nid_max; + Throttle throttle_ops, throttle_bytes; ///< submit to commit + Throttle throttle_wal_ops, throttle_wal_bytes; ///< submit to wal complete + Mutex wal_lock; atomic64_t wal_seq; ThreadPool wal_tp; -- 2.39.5