From a2fa546d02cfe2a910413acdec5ef11dbfacb359 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Thu, 20 Jun 2019 17:45:06 -0700 Subject: [PATCH] bluestore: consolidate state transitions and throttles into BlueStoreThrottle This will make it easier to have a single place to emit trace information. Signed-off-by: Samuel Just --- src/os/bluestore/BlueStore.cc | 144 ++++++++++++++++++++-------------- src/os/bluestore/BlueStore.h | 62 ++++++++++----- 2 files changed, 129 insertions(+), 77 deletions(-) diff --git a/src/os/bluestore/BlueStore.cc b/src/os/bluestore/BlueStore.cc index ee9556a483746..4c3110eff44ee 100644 --- a/src/os/bluestore/BlueStore.cc +++ b/src/os/bluestore/BlueStore.cc @@ -4103,11 +4103,7 @@ void BlueStore::handle_discard(interval_set& to_release) BlueStore::BlueStore(CephContext *cct, const string& path) : ObjectStore(cct, path), - throttle_bytes(cct, "bluestore_throttle_bytes", - cct->_conf->bluestore_throttle_bytes), - throttle_deferred_bytes(cct, "bluestore_throttle_deferred_bytes", - cct->_conf->bluestore_throttle_bytes + - cct->_conf->bluestore_throttle_deferred_bytes), + bsthrottle(cct), finisher(cct, "commit_finisher", "cfin"), kv_sync_thread(this), kv_finalize_thread(this), @@ -4122,11 +4118,7 @@ BlueStore::BlueStore(CephContext *cct, const string& path, uint64_t _min_alloc_size) : ObjectStore(cct, path), - throttle_bytes(cct, "bluestore_throttle_bytes", - cct->_conf->bluestore_throttle_bytes), - throttle_deferred_bytes(cct, "bluestore_throttle_deferred_bytes", - cct->_conf->bluestore_throttle_bytes + - cct->_conf->bluestore_throttle_deferred_bytes), + bsthrottle(cct), finisher(cct, "commit_finisher", "cfin"), kv_sync_thread(this), kv_finalize_thread(this), @@ -4249,14 +4241,9 @@ void BlueStore::handle_conf_change(const ConfigProxy& conf, _set_throttle_params(); } } - if (changed.count("bluestore_throttle_bytes")) { - throttle_bytes.reset_max(conf->bluestore_throttle_bytes); - throttle_deferred_bytes.reset_max( - conf->bluestore_throttle_bytes + conf->bluestore_throttle_deferred_bytes); - } - if (changed.count("bluestore_throttle_deferred_bytes")) { - throttle_deferred_bytes.reset_max( - conf->bluestore_throttle_bytes + conf->bluestore_throttle_deferred_bytes); + if (changed.count("bluestore_throttle_bytes") || + changed.count("bluestore_throttle_deferred_bytes")) { + bsthrottle.reset_throttle(conf); } if (changed.count("bluestore_max_defer_interval")) { if (bdev) { @@ -10937,7 +10924,7 @@ void BlueStore::_txc_state_proc(TransContext *txc) << " " << txc->get_state_name() << dendl; switch (txc->state) { case TransContext::STATE_PREPARE: - txc->log_state_latency(logger, l_bluestore_state_prepare_lat); + bsthrottle.log_state_latency(*txc, logger, l_bluestore_state_prepare_lat); if (txc->ioc.has_pending_aios()) { txc->state = TransContext::STATE_AIO_WAIT; txc->had_ios = true; @@ -10948,7 +10935,8 @@ void BlueStore::_txc_state_proc(TransContext *txc) case TransContext::STATE_AIO_WAIT: { - utime_t lat = txc->log_state_latency(logger, l_bluestore_state_aio_wait_lat); + utime_t lat = bsthrottle.log_state_latency( + *txc, logger, l_bluestore_state_aio_wait_lat); if (lat >= cct->_conf->bluestore_log_op_age) { dout(0) << __func__ << " slow aio_wait, txc = " << txc << ", latency = " << lat @@ -10964,7 +10952,7 @@ void BlueStore::_txc_state_proc(TransContext *txc) if (txc->had_ios) { ++txc->osr->txc_with_unstable_io; } - txc->log_state_latency(logger, l_bluestore_state_io_done_lat); + bsthrottle.log_state_latency(*txc, logger, l_bluestore_state_io_done_lat); txc->state = TransContext::STATE_KV_QUEUED; if (cct->_conf->bluestore_sync_submit_transaction) { if (txc->last_nid >= nid_max || @@ -10988,10 +10976,7 @@ void BlueStore::_txc_state_proc(TransContext *txc) dout(20) << __func__ << " DEBUG randomly forcing submit via kv thread" << dendl; } else { - txc->state = TransContext::STATE_KV_SUBMITTED; - int r = cct->_conf->bluestore_debug_omit_kv_commit ? 0 : db->submit_transaction(txc->t); - ceph_assert(r == 0); - _txc_applied_kv(txc); + _txc_apply_kv(txc, true); } } { @@ -11015,7 +11000,7 @@ void BlueStore::_txc_state_proc(TransContext *txc) // ** fall-thru ** case TransContext::STATE_KV_DONE: - txc->log_state_latency(logger, l_bluestore_state_kv_done_lat); + bsthrottle.log_state_latency(*txc, logger, l_bluestore_state_kv_done_lat); if (txc->deferred_txn) { txc->state = TransContext::STATE_DEFERRED_QUEUED; _deferred_queue(txc); @@ -11025,12 +11010,12 @@ void BlueStore::_txc_state_proc(TransContext *txc) break; case TransContext::STATE_DEFERRED_CLEANUP: - txc->log_state_latency(logger, l_bluestore_state_deferred_cleanup_lat); + bsthrottle.log_state_latency(*txc, logger, l_bluestore_state_deferred_cleanup_lat); txc->state = TransContext::STATE_FINISHING; // ** fall-thru ** case TransContext::STATE_FINISHING: - txc->log_state_latency(logger, l_bluestore_state_finishing_lat); + bsthrottle.log_state_latency(*txc, logger, l_bluestore_state_finishing_lat); _txc_finish(txc); return; @@ -11184,8 +11169,15 @@ void BlueStore::_txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t) _txc_update_store_statfs(txc); } -void BlueStore::_txc_applied_kv(TransContext *txc) +void BlueStore::_txc_apply_kv(TransContext *txc, bool sync_submit_transaction) { + ceph_assert(txc->state == TransContext::STATE_KV_QUEUED); + txc->state = TransContext::STATE_KV_SUBMITTED; + { + int r = cct->_conf->bluestore_debug_omit_kv_commit ? 0 : db->submit_transaction(txc->t); + ceph_assert(r == 0); + } + for (auto ls : { &txc->onodes, &txc->modified_objects }) { for (auto& o : *ls) { dout(20) << __func__ << " onode " << o << " had " << o->flushing_count @@ -11201,6 +11193,7 @@ void BlueStore::_txc_applied_kv(TransContext *txc) void BlueStore::_txc_committed_kv(TransContext *txc) { dout(20) << __func__ << " txc " << txc << dendl; + bsthrottle.complete_kv(*txc); { std::lock_guard l(txc->osr->qlock); txc->state = TransContext::STATE_KV_DONE; @@ -11210,7 +11203,7 @@ void BlueStore::_txc_committed_kv(TransContext *txc) finisher.queue(txc->oncommits); } } - txc->log_state_latency(logger, l_bluestore_state_kv_committing_lat); + bsthrottle.log_state_latency(*txc, logger, l_bluestore_state_kv_committing_lat); log_latency_fn( __func__, l_bluestore_commit_lat, @@ -11276,6 +11269,7 @@ void BlueStore::_txc_finish(TransContext *txc) osr->qcond.notify_all(); } } + while (!releasing_txc.empty()) { // release to allocator only after all preceding txc's have also // finished any deferred writes that potentially land in these @@ -11283,7 +11277,8 @@ void BlueStore::_txc_finish(TransContext *txc) auto txc = &releasing_txc.front(); _txc_release_alloc(txc); releasing_txc.pop_front(); - txc->log_state_latency(logger, l_bluestore_state_done_lat); + bsthrottle.log_state_latency(*txc, logger, l_bluestore_state_done_lat); + bsthrottle.complete(*txc); delete txc; } @@ -11302,7 +11297,7 @@ void BlueStore::_txc_finish(TransContext *txc) << dendl; } } - } +} void BlueStore::_txc_release_alloc(TransContext *txc) { @@ -11636,11 +11631,9 @@ void BlueStore::_kv_sync_thread() } for (auto txc : kv_committing) { + throttle.log_state_latency(*txc, logger, l_bluestore_state_kv_queued_lat); if (txc->state == TransContext::STATE_KV_QUEUED) { - txc->log_state_latency(logger, l_bluestore_state_kv_queued_lat); - int r = cct->_conf->bluestore_debug_omit_kv_commit ? 0 : db->submit_transaction(txc->t); - ceph_assert(r == 0); - _txc_applied_kv(txc); + _txc_apply_kv(txc, false); --txc->osr->kv_committing_serially; txc->state = TransContext::STATE_KV_SUBMITTED; if (txc->osr->kv_submitted_waiters) { @@ -11650,7 +11643,6 @@ void BlueStore::_kv_sync_thread() } else { ceph_assert(txc->state == TransContext::STATE_KV_SUBMITTED); - txc->log_state_latency(logger, l_bluestore_state_kv_queued_lat); } if (txc->had_ios) { --txc->osr->txc_with_unstable_io; @@ -11663,7 +11655,7 @@ void BlueStore::_kv_sync_thread() // iteration there will already be ops awake. otherwise, we // end up going to sleep, and then wake up when the very first // transaction is ready for commit. - throttle_bytes.put(costs); + bsthrottle.release_kv_throttle(costs); if (bluefs && after_flush - bluefs_last_balance > @@ -11831,7 +11823,7 @@ void BlueStore::_kv_finalize_thread() if (!deferred_aggressive) { if (deferred_queue_size >= deferred_batch_ops.load() || - throttle_deferred_bytes.past_midpoint()) { + bsthrottle.should_submit_deferred()) { deferred_try_submit(); } } @@ -11940,7 +11932,7 @@ void BlueStore::_deferred_submit_unlock(OpSequencer *osr) deferred_lock.unlock(); for (auto& txc : b->txcs) { - txc.log_state_latency(logger, l_bluestore_state_deferred_queued_lat); + bsthrottle.log_state_latency(txc, logger, l_bluestore_state_deferred_queued_lat); } uint64_t start = 0, pos = 0; bufferlist bl; @@ -12018,12 +12010,12 @@ void BlueStore::_deferred_aio_finish(OpSequencer *osr) { for (auto& i : b->txcs) { TransContext *txc = &i; - txc->log_state_latency(logger, l_bluestore_state_deferred_aio_wait_lat); + bsthrottle.log_state_latency(*txc, logger, l_bluestore_state_deferred_aio_wait_lat); txc->state = TransContext::STATE_DEFERRED_CLEANUP; costs += txc->cost; } } - throttle_deferred_bytes.put(costs); + bsthrottle.release_deferred_throttle(costs); } { @@ -12132,25 +12124,26 @@ int BlueStore::queue_transactions( handle->suspend_tp_timeout(); auto tstart = mono_clock::now(); - throttle_bytes.get(txc->cost); - if (txc->deferred_txn) { + + if (!bsthrottle.try_start_transaction( + *db, + *txc, + tstart)) { // ensure we do not block here because of deferred writes - if (!throttle_deferred_bytes.get_or_fail(txc->cost)) { - dout(10) << __func__ << " failed get throttle_deferred_bytes, aggressive" - << dendl; - ++deferred_aggressive; - deferred_try_submit(); - { - // wake up any previously finished deferred events - std::lock_guard l(kv_lock); - if (!kv_sync_in_progress) { - kv_sync_in_progress = true; - kv_cond.notify_one(); - } + dout(10) << __func__ << " failed get throttle_deferred_bytes, aggressive" + << dendl; + ++deferred_aggressive; + deferred_try_submit(); + { + // wake up any previously finished deferred events + std::lock_guard l(kv_lock); + if (!kv_sync_in_progress) { + kv_sync_in_progress = true; + kv_cond.notify_one(); } - throttle_deferred_bytes.get(txc->cost); - --deferred_aggressive; - } + } + bsthrottle.finish_start_transaction(*db, *txc, tstart); + --deferred_aggressive; } auto tend = mono_clock::now(); @@ -14749,6 +14742,39 @@ void BlueStore::log_latency_fn( } +utime_t BlueStore::BlueStoreThrottle::log_state_latency( + TransContext &txc, PerfCounters *logger, int state) +{ + utime_t now = ceph_clock_now(); + utime_t lat = now - txc.last_stamp; + logger->tinc(state, lat); + txc.last_stamp = now; + return lat; +} + +bool BlueStore::BlueStoreThrottle::try_start_transaction( + KeyValueDB &db, + TransContext &txc, + mono_clock::time_point start_throttle_acquire) +{ + throttle_bytes.get(txc.cost); + + if (!txc.deferred_txn || throttle_deferred_bytes.get_or_fail(txc.cost)) { + return true; + } else { + return false; + } +} + +void BlueStore::BlueStoreThrottle::finish_start_transaction( + KeyValueDB &db, + TransContext &txc, + mono_clock::time_point start_throttle_acquire) +{ + ceph_assert(txc.deferred_txn); + throttle_deferred_bytes.get(txc.cost); +} + // DB key value Histogram #define KEY_SLAB 32 #define VALUE_SLAB 64 diff --git a/src/os/bluestore/BlueStore.h b/src/os/bluestore/BlueStore.h index 8976c03b0278b..a06c7633c2c51 100644 --- a/src/os/bluestore/BlueStore.h +++ b/src/os/bluestore/BlueStore.h @@ -1512,20 +1512,6 @@ public: } #endif - utime_t log_state_latency(PerfCounters *logger, int state) { - utime_t lat, now = ceph_clock_now(); - lat = now - last_stamp; - logger->tinc(state, lat); -#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) - if (state >= l_bluestore_state_prepare_lat && state <= l_bluestore_state_done_lat) { - double usecs = (now.to_nsec()-last_stamp.to_nsec())/1000; - OID_ELAPSED("", usecs, get_state_latency_name(state)); - } -#endif - last_stamp = now; - return lat; - } - CollectionRef ch; OpSequencerRef osr; // this should be ch->osr boost::intrusive::list_member_hook<> sequencer_item; @@ -1598,6 +1584,49 @@ public: } }; + + class BlueStoreThrottle { + Throttle throttle_bytes; ///< submit to commit + Throttle throttle_deferred_bytes; ///< submit to deferred complete + + + public: + BlueStoreThrottle(CephContext *cct) : + throttle_bytes(cct, "bluestore_throttle_bytes", + cct->_conf->bluestore_throttle_bytes), + throttle_deferred_bytes(cct, "bluestore_throttle_deferred_bytes", + cct->_conf->bluestore_throttle_bytes + + cct->_conf->bluestore_throttle_deferred_bytes) + {} + + + utime_t log_state_latency( + TransContext &txc, PerfCounters *logger, int state); + bool try_start_transaction( + KeyValueDB &db, + TransContext &txc, + mono_clock::time_point); + void finish_start_transaction( + KeyValueDB &db, + TransContext &txc, + mono_clock::time_point); + void release_kv_throttle(uint64_t cost) { + throttle_bytes.put(cost); + } + void release_deferred_throttle(uint64_t cost) { + throttle_deferred_bytes.put(cost); + } + bool should_submit_deferred() { + return throttle_deferred_bytes.past_midpoint(); + } + void reset_throttle(const ConfigProxy &conf) { + throttle_bytes.reset_max(conf->bluestore_throttle_bytes); + throttle_deferred_bytes.reset_max( + conf->bluestore_throttle_bytes + + conf->bluestore_throttle_deferred_bytes); + } + } bsthrottle; + typedef boost::intrusive::list< TransContext, boost::intrusive::member_hook< @@ -1838,9 +1867,6 @@ private: std::atomic blobid_last = {0}; std::atomic blobid_max = {0}; - Throttle throttle_bytes; ///< submit to commit - Throttle throttle_deferred_bytes; ///< submit to deferred complete - interval_set bluefs_extents; ///< block extents owned by bluefs interval_set bluefs_extents_reclaiming; ///< currently reclaiming @@ -2202,7 +2228,7 @@ public: private: void _txc_finish_io(TransContext *txc); void _txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t); - void _txc_applied_kv(TransContext *txc); + void _txc_apply_kv(TransContext *txc, bool sync_submit_transaction); void _txc_committed_kv(TransContext *txc); void _txc_finish(TransContext *txc); void _txc_release_alloc(TransContext *txc); -- 2.39.5