BlueStore::BlueStore(CephContext *cct, const string& path)
: ObjectStore(cct, path),
- throttle_bytes(cct, "bluestore_throttle_bytes",
- cct->_conf->bluestore_throttle_bytes),
- throttle_deferred_bytes(cct, "bluestore_throttle_deferred_bytes",
- cct->_conf->bluestore_throttle_bytes +
- cct->_conf->bluestore_throttle_deferred_bytes),
+ bsthrottle(cct),
finisher(cct, "commit_finisher", "cfin"),
kv_sync_thread(this),
kv_finalize_thread(this),
const string& path,
uint64_t _min_alloc_size)
: ObjectStore(cct, path),
- throttle_bytes(cct, "bluestore_throttle_bytes",
- cct->_conf->bluestore_throttle_bytes),
- throttle_deferred_bytes(cct, "bluestore_throttle_deferred_bytes",
- cct->_conf->bluestore_throttle_bytes +
- cct->_conf->bluestore_throttle_deferred_bytes),
+ bsthrottle(cct),
finisher(cct, "commit_finisher", "cfin"),
kv_sync_thread(this),
kv_finalize_thread(this),
_set_throttle_params();
}
}
- if (changed.count("bluestore_throttle_bytes")) {
- throttle_bytes.reset_max(conf->bluestore_throttle_bytes);
- throttle_deferred_bytes.reset_max(
- conf->bluestore_throttle_bytes + conf->bluestore_throttle_deferred_bytes);
- }
- if (changed.count("bluestore_throttle_deferred_bytes")) {
- throttle_deferred_bytes.reset_max(
- conf->bluestore_throttle_bytes + conf->bluestore_throttle_deferred_bytes);
+ if (changed.count("bluestore_throttle_bytes") ||
+ changed.count("bluestore_throttle_deferred_bytes")) {
+ bsthrottle.reset_throttle(conf);
}
if (changed.count("bluestore_max_defer_interval")) {
if (bdev) {
<< " " << txc->get_state_name() << dendl;
switch (txc->state) {
case TransContext::STATE_PREPARE:
- txc->log_state_latency(logger, l_bluestore_state_prepare_lat);
+ bsthrottle.log_state_latency(*txc, logger, l_bluestore_state_prepare_lat);
if (txc->ioc.has_pending_aios()) {
txc->state = TransContext::STATE_AIO_WAIT;
txc->had_ios = true;
case TransContext::STATE_AIO_WAIT:
{
- utime_t lat = txc->log_state_latency(logger, l_bluestore_state_aio_wait_lat);
+ utime_t lat = bsthrottle.log_state_latency(
+ *txc, logger, l_bluestore_state_aio_wait_lat);
if (lat >= cct->_conf->bluestore_log_op_age) {
dout(0) << __func__ << " slow aio_wait, txc = " << txc
<< ", latency = " << lat
if (txc->had_ios) {
++txc->osr->txc_with_unstable_io;
}
- txc->log_state_latency(logger, l_bluestore_state_io_done_lat);
+ bsthrottle.log_state_latency(*txc, logger, l_bluestore_state_io_done_lat);
txc->state = TransContext::STATE_KV_QUEUED;
if (cct->_conf->bluestore_sync_submit_transaction) {
if (txc->last_nid >= nid_max ||
dout(20) << __func__ << " DEBUG randomly forcing submit via kv thread"
<< dendl;
} else {
- txc->state = TransContext::STATE_KV_SUBMITTED;
- int r = cct->_conf->bluestore_debug_omit_kv_commit ? 0 : db->submit_transaction(txc->t);
- ceph_assert(r == 0);
- _txc_applied_kv(txc);
+ _txc_apply_kv(txc, true);
}
}
{
// ** fall-thru **
case TransContext::STATE_KV_DONE:
- txc->log_state_latency(logger, l_bluestore_state_kv_done_lat);
+ bsthrottle.log_state_latency(*txc, logger, l_bluestore_state_kv_done_lat);
if (txc->deferred_txn) {
txc->state = TransContext::STATE_DEFERRED_QUEUED;
_deferred_queue(txc);
break;
case TransContext::STATE_DEFERRED_CLEANUP:
- txc->log_state_latency(logger, l_bluestore_state_deferred_cleanup_lat);
+ bsthrottle.log_state_latency(*txc, logger, l_bluestore_state_deferred_cleanup_lat);
txc->state = TransContext::STATE_FINISHING;
// ** fall-thru **
case TransContext::STATE_FINISHING:
- txc->log_state_latency(logger, l_bluestore_state_finishing_lat);
+ bsthrottle.log_state_latency(*txc, logger, l_bluestore_state_finishing_lat);
_txc_finish(txc);
return;
_txc_update_store_statfs(txc);
}
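+// Called both inline from _txc_state_proc() (sync_submit_transaction=true)
+// and from _kv_sync_thread() (false): owns the STATE_KV_QUEUED ->
+// STATE_KV_SUBMITTED transition and the kv submit for both paths.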
-void BlueStore::_txc_applied_kv(TransContext *txc)
+void BlueStore::_txc_apply_kv(TransContext *txc, bool sync_submit_transaction)
{
+ ceph_assert(txc->state == TransContext::STATE_KV_QUEUED);
+ txc->state = TransContext::STATE_KV_SUBMITTED;
+ {
+ int r = cct->_conf->bluestore_debug_omit_kv_commit ? 0 : db->submit_transaction(txc->t);
+ ceph_assert(r == 0);
+ }
+
for (auto ls : { &txc->onodes, &txc->modified_objects }) {
for (auto& o : *ls) {
dout(20) << __func__ << " onode " << o << " had " << o->flushing_count
void BlueStore::_txc_committed_kv(TransContext *txc)
{
dout(20) << __func__ << " txc " << txc << dendl;
+ bsthrottle.complete_kv(*txc);
{
std::lock_guard l(txc->osr->qlock);
txc->state = TransContext::STATE_KV_DONE;
finisher.queue(txc->oncommits);
}
}
- txc->log_state_latency(logger, l_bluestore_state_kv_committing_lat);
+ bsthrottle.log_state_latency(*txc, logger, l_bluestore_state_kv_committing_lat);
log_latency_fn(
__func__,
l_bluestore_commit_lat,
osr->qcond.notify_all();
}
}
+
while (!releasing_txc.empty()) {
// release to allocator only after all preceding txc's have also
// finished any deferred writes that potentially land in these
auto txc = &releasing_txc.front();
_txc_release_alloc(txc);
releasing_txc.pop_front();
- txc->log_state_latency(logger, l_bluestore_state_done_lat);
+ bsthrottle.log_state_latency(*txc, logger, l_bluestore_state_done_lat);
+ bsthrottle.complete(*txc);
delete txc;
}
<< dendl;
}
}
- }
+}
void BlueStore::_txc_release_alloc(TransContext *txc)
{
}
for (auto txc : kv_committing) {
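+ // log kv_queued latency up front: it applies both to txcs submitted
+ // here and to those already submitted synchronously.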
+ bsthrottle.log_state_latency(*txc, logger, l_bluestore_state_kv_queued_lat);
if (txc->state == TransContext::STATE_KV_QUEUED) {
- txc->log_state_latency(logger, l_bluestore_state_kv_queued_lat);
- int r = cct->_conf->bluestore_debug_omit_kv_commit ? 0 : db->submit_transaction(txc->t);
- ceph_assert(r == 0);
- _txc_applied_kv(txc);
+ _txc_apply_kv(txc, false);
--txc->osr->kv_committing_serially;
- txc->state = TransContext::STATE_KV_SUBMITTED;
if (txc->osr->kv_submitted_waiters) {
} else {
ceph_assert(txc->state == TransContext::STATE_KV_SUBMITTED);
- txc->log_state_latency(logger, l_bluestore_state_kv_queued_lat);
}
if (txc->had_ios) {
--txc->osr->txc_with_unstable_io;
// iteration there will already be ops awake. otherwise, we
// end up going to sleep, and then wake up when the very first
// transaction is ready for commit.
- throttle_bytes.put(costs);
+ bsthrottle.release_kv_throttle(costs);
if (bluefs &&
after_flush - bluefs_last_balance >
if (!deferred_aggressive) {
if (deferred_queue_size >= deferred_batch_ops.load() ||
- throttle_deferred_bytes.past_midpoint()) {
+ bsthrottle.should_submit_deferred()) {
deferred_try_submit();
}
}
deferred_lock.unlock();
for (auto& txc : b->txcs) {
- txc.log_state_latency(logger, l_bluestore_state_deferred_queued_lat);
+ bsthrottle.log_state_latency(txc, logger, l_bluestore_state_deferred_queued_lat);
}
uint64_t start = 0, pos = 0;
bufferlist bl;
{
for (auto& i : b->txcs) {
TransContext *txc = &i;
- txc->log_state_latency(logger, l_bluestore_state_deferred_aio_wait_lat);
+ bsthrottle.log_state_latency(*txc, logger, l_bluestore_state_deferred_aio_wait_lat);
txc->state = TransContext::STATE_DEFERRED_CLEANUP;
costs += txc->cost;
}
}
- throttle_deferred_bytes.put(costs);
+ bsthrottle.release_deferred_throttle(costs);
}
{
handle->suspend_tp_timeout();
auto tstart = mono_clock::now();
- throttle_bytes.get(txc->cost);
- if (txc->deferred_txn) {
+
+ if (!bsthrottle.try_start_transaction(
+ *db,
+ *txc,
+ tstart)) {
// ensure we do not block here because of deferred writes
- if (!throttle_deferred_bytes.get_or_fail(txc->cost)) {
- dout(10) << __func__ << " failed get throttle_deferred_bytes, aggressive"
- << dendl;
- ++deferred_aggressive;
- deferred_try_submit();
- {
- // wake up any previously finished deferred events
- std::lock_guard l(kv_lock);
- if (!kv_sync_in_progress) {
- kv_sync_in_progress = true;
- kv_cond.notify_one();
- }
+ dout(10) << __func__ << " failed get throttle_deferred_bytes, aggressive"
+ << dendl;
+ ++deferred_aggressive;
+ deferred_try_submit();
+ {
+ // wake up any previously finished deferred events
+ std::lock_guard l(kv_lock);
+ if (!kv_sync_in_progress) {
+ kv_sync_in_progress = true;
+ kv_cond.notify_one();
}
- throttle_deferred_bytes.get(txc->cost);
- --deferred_aggressive;
- }
+ }
+ bsthrottle.finish_start_transaction(*db, *txc, tstart);
+ --deferred_aggressive;
}
auto tend = mono_clock::now();
}
+utime_t BlueStore::BlueStoreThrottle::log_state_latency(
+ TransContext &txc, PerfCounters *logger, int state)
+{
+ utime_t now = ceph_clock_now();
+ utime_t lat = now - txc.last_stamp;
+ logger->tinc(state, lat);
+#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
+ if (state >= l_bluestore_state_prepare_lat && state <= l_bluestore_state_done_lat) {
+ double usecs = (now.to_nsec() - txc.last_stamp.to_nsec()) / 1000;
+ OID_ELAPSED("", usecs, txc.get_state_latency_name(state));
+ }
+#endif
+ txc.last_stamp = now;
+ return lat;
+}
+
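+// Always charges throttle_bytes; for deferred txns, additionally
+// try-acquires throttle_deferred_bytes without blocking. The db and
+// start_throttle_acquire arguments are unused here for now (assumed
+// to be groundwork for future tracing).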
+bool BlueStore::BlueStoreThrottle::try_start_transaction(
+ KeyValueDB &db,
+ TransContext &txc,
+ mono_clock::time_point start_throttle_acquire)
+{
+ throttle_bytes.get(txc.cost);
+
+ return !txc.deferred_txn || throttle_deferred_bytes.get_or_fail(txc.cost);
+}
+
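+// Blocking counterpart to try_start_transaction(); called after the
+// caller has kicked deferred submission, to wait on
+// throttle_deferred_bytes.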
+void BlueStore::BlueStoreThrottle::finish_start_transaction(
+ KeyValueDB &db,
+ TransContext &txc,
+ mono_clock::time_point start_throttle_acquire)
+{
+ ceph_assert(txc.deferred_txn);
+ throttle_deferred_bytes.get(txc.cost);
+}
+
// DB key value Histogram
#define KEY_SLAB 32
#define VALUE_SLAB 64
}
#endif
- utime_t log_state_latency(PerfCounters *logger, int state) {
- utime_t lat, now = ceph_clock_now();
- lat = now - last_stamp;
- logger->tinc(state, lat);
-#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
- if (state >= l_bluestore_state_prepare_lat && state <= l_bluestore_state_done_lat) {
- double usecs = (now.to_nsec()-last_stamp.to_nsec())/1000;
- OID_ELAPSED("", usecs, get_state_latency_name(state));
- }
-#endif
- last_stamp = now;
- return lat;
- }
-
CollectionRef ch;
OpSequencerRef osr; // this should be ch->osr
boost::intrusive::list_member_hook<> sequencer_item;
}
};
+
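+ /// Groups the bluestore_throttle_bytes / bluestore_throttle_deferred_bytes
+ /// throttles with the per-state latency logging that previously lived on
+ /// TransContext.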
+ class BlueStoreThrottle {
+ Throttle throttle_bytes; ///< submit to commit
+ Throttle throttle_deferred_bytes; ///< submit to deferred complete
+
+ public:
+ BlueStoreThrottle(CephContext *cct) :
+ throttle_bytes(cct, "bluestore_throttle_bytes",
+ cct->_conf->bluestore_throttle_bytes),
+ throttle_deferred_bytes(cct, "bluestore_throttle_deferred_bytes",
+ cct->_conf->bluestore_throttle_bytes +
+ cct->_conf->bluestore_throttle_deferred_bytes)
+ {}
+
+ // No-op hooks invoked from _txc_committed_kv() and _txc_finish();
+ // declared here so those call sites compile. The empty bodies are an
+ // assumption (placeholders for future accounting/tracing).
+ void complete_kv(TransContext &txc) {}
+ void complete(TransContext &txc) {}
+
+ utime_t log_state_latency(
+ TransContext &txc, PerfCounters *logger, int state);
+ bool try_start_transaction(
+ KeyValueDB &db,
+ TransContext &txc,
+ mono_clock::time_point);
+ void finish_start_transaction(
+ KeyValueDB &db,
+ TransContext &txc,
+ mono_clock::time_point);
+ void release_kv_throttle(uint64_t cost) {
+ throttle_bytes.put(cost);
+ }
+ void release_deferred_throttle(uint64_t cost) {
+ throttle_deferred_bytes.put(cost);
+ }
+ bool should_submit_deferred() {
+ return throttle_deferred_bytes.past_midpoint();
+ }
+ void reset_throttle(const ConfigProxy &conf) {
+ throttle_bytes.reset_max(conf->bluestore_throttle_bytes);
+ throttle_deferred_bytes.reset_max(
+ conf->bluestore_throttle_bytes +
+ conf->bluestore_throttle_deferred_bytes);
+ }
+ } bsthrottle;
+
typedef boost::intrusive::list<
TransContext,
boost::intrusive::member_hook<
std::atomic<uint64_t> blobid_last = {0};
std::atomic<uint64_t> blobid_max = {0};
- Throttle throttle_bytes; ///< submit to commit
- Throttle throttle_deferred_bytes; ///< submit to deferred complete
-
interval_set<uint64_t> bluefs_extents; ///< block extents owned by bluefs
interval_set<uint64_t> bluefs_extents_reclaiming; ///< currently reclaiming
private:
void _txc_finish_io(TransContext *txc);
void _txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t);
- void _txc_applied_kv(TransContext *txc);
+ void _txc_apply_kv(TransContext *txc, bool sync_submit_transaction);
void _txc_committed_kv(TransContext *txc);
void _txc_finish(TransContext *txc);
void _txc_release_alloc(TransContext *txc);