BlueStore::BlueStore(CephContext *cct, const string& path)
: ObjectStore(cct, path),
- throttle_ops(cct, "bluestore_max_ops", cct->_conf->bluestore_max_ops),
throttle_bytes(cct, "bluestore_max_bytes", cct->_conf->bluestore_max_bytes),
- throttle_deferred_ops(cct, "bluestore_deferred_max_ops",
- cct->_conf->bluestore_max_ops +
- cct->_conf->bluestore_deferred_max_ops),
throttle_deferred_bytes(cct, "bluestore_deferred_max_bytes",
cct->_conf->bluestore_max_bytes +
cct->_conf->bluestore_deferred_max_bytes),
const string& path,
uint64_t _min_alloc_size)
: ObjectStore(cct, path),
- throttle_ops(cct, "bluestore_max_ops", cct->_conf->bluestore_max_ops),
throttle_bytes(cct, "bluestore_max_bytes", cct->_conf->bluestore_max_bytes),
- throttle_deferred_ops(cct, "bluestore_deferred_max_ops",
- cct->_conf->bluestore_max_ops +
- cct->_conf->bluestore_deferred_max_ops),
throttle_deferred_bytes(cct, "bluestore_deferred_max_bytes",
cct->_conf->bluestore_max_bytes +
cct->_conf->bluestore_deferred_max_bytes),
_set_alloc_sizes();
}
}
- if (changed.count("bluestore_max_ops")) {
- throttle_ops.reset_max(conf->bluestore_max_ops);
- throttle_deferred_ops.reset_max(
- conf->bluestore_max_ops + conf->bluestore_deferred_max_ops);
+ if (changed.count("bluestore_throttle_cost_per_io") ||
+ changed.count("bluestore_throttle_cost_per_io_hdd") ||
+ changed.count("bluestore_throttle_cost_per_io_ssd")) {
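+    // these options can change at runtime, but choosing between the
+    // hdd and ssd defaults requires an open bdev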
+ if (bdev) {
+ _set_throttle_params();
+ }
}
if (changed.count("bluestore_max_bytes")) {
throttle_bytes.reset_max(conf->bluestore_max_bytes);
throttle_deferred_bytes.reset_max(
conf->bluestore_max_bytes + conf->bluestore_deferred_max_bytes);
}
- if (changed.count("bluestore_deferred_max_ops")) {
- throttle_deferred_ops.reset_max(
- conf->bluestore_max_ops + conf->bluestore_deferred_max_ops);
- }
  if (changed.count("bluestore_deferred_max_bytes")) {
    throttle_deferred_bytes.reset_max(
      conf->bluestore_max_bytes + conf->bluestore_deferred_max_bytes);
  }
<< dendl;
}
+void BlueStore::_set_throttle_params()
+{
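+  // a nonzero bluestore_throttle_cost_per_io overrides the
+  // hdd/ssd-specific defaults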
+ if (cct->_conf->bluestore_throttle_cost_per_io) {
+ throttle_cost_per_io = cct->_conf->bluestore_throttle_cost_per_io;
+ } else {
+ assert(bdev);
+ if (bdev->is_rotational()) {
+ throttle_cost_per_io = cct->_conf->bluestore_throttle_cost_per_io_hdd;
+ } else {
+ throttle_cost_per_io = cct->_conf->bluestore_throttle_cost_per_io_ssd;
+ }
+ }
+
+ dout(10) << __func__ << " throttle_cost_per_io " << throttle_cost_per_io
+ << dendl;
+}
+
void BlueStore::_init_logger()
{
PerfCountersBuilder b(cct, "bluestore",
<< std::dec << dendl;
}
_set_alloc_sizes();
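+  // the bdev is open by the time we get here, so we know whether it
+  // is rotational and can pick the right per-io cost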
+ _set_throttle_params();
return 0;
}
return txc;
}
+void BlueStore::_txc_calc_cost(TransContext *txc)
+{
+  // this is about the simplest model for transaction cost you can
+  // imagine: a fixed overhead of at least one "io", plus a
+  // configurable cost per "io" (with different hdd and ssd
+  // defaults), added to the transaction's byte count.
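+  //
+  // for example, a txc with one pending aio carrying two iovecs and
+  // 8192 bytes of data, with throttle_cost_per_io = 4000, would cost
+  // (1 + 2) * 4000 + 8192 = 20192.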
+ int ios = 1; // one "io" for the kv commit
+ for (auto& p : txc->ioc.pending_aios) {
+ ios += p.iov.size();
+ }
+ txc->cost = ios * throttle_cost_per_io + txc->bytes;
+ dout(10) << __func__ << " " << txc << " cost " << txc->cost << " ("
+ << ios << " ios * " << throttle_cost_per_io << " + " << txc->bytes
+ << " bytes)" << dendl;
+}
+
void BlueStore::_txc_update_store_statfs(TransContext *txc)
{
if (txc->statfs_delta.is_empty())
// iteration there will already be ops awake. otherwise, we
// end up going to sleep, and then wake up when the very first
// transaction is ready for commit.
- throttle_ops.put(txc->ops);
- throttle_bytes.put(txc->bytes);
+ throttle_bytes.put(txc->cost);
}
PExtentVector bluefs_gift_extents;
if (!deferred_aggressive) {
std::lock_guard<std::mutex> l(deferred_lock);
if (deferred_queue_size >= (int)g_conf->bluestore_deferred_batch_ops ||
- throttle_deferred_ops.past_midpoint() ||
throttle_deferred_bytes.past_midpoint()) {
_deferred_try_submit();
}
TransContext *txc = &i;
txc->state = TransContext::STATE_DEFERRED_CLEANUP;
txc->osr->qcond.notify_all();
- throttle_deferred_ops.put(txc->ops);
- throttle_deferred_bytes.put(txc->bytes);
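+    // release the deferred budget only now that the deferred io has
+    // actually completed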
+ throttle_deferred_bytes.put(txc->cost);
deferred_done_queue.push_back(txc);
}
finished.clear();
for (vector<Transaction>::iterator p = tls.begin(); p != tls.end(); ++p) {
(*p).set_osr(osr);
- txc->ops += (*p).get_num_ops();
txc->bytes += (*p).get_num_bytes();
_txc_add_transaction(txc, &(*p));
}
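+  // compute the cost now that the transactions have contributed
+  // their ios and bytes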
+ _txc_calc_cost(txc);
_txc_write_nodes(txc, txc->t);
handle->suspend_tp_timeout();
utime_t tstart = ceph_clock_now();
- throttle_ops.get(txc->ops);
- throttle_bytes.get(txc->bytes);
+ throttle_bytes.get(txc->cost);
if (txc->deferred_txn) {
// ensure we do not block here because of deferred writes
- if (!throttle_deferred_ops.get_or_fail(txc->ops)) {
- deferred_try_submit();
- throttle_deferred_ops.get(txc->ops);
- }
- if (!throttle_deferred_bytes.get_or_fail(txc->bytes)) {
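+    // if the deferred budget is exhausted, kick the queued deferred
+    // ios along and then block until our reservation is available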
+ if (!throttle_deferred_bytes.get_or_fail(txc->cost)) {
deferred_try_submit();
- throttle_deferred_bytes.get(txc->bytes);
+ throttle_deferred_bytes.get(txc->cost);
}
}
utime_t tend = ceph_clock_now();
void _set_csum();
void _set_compression();
+ void _set_throttle_params();
class TransContext;
OpSequencerRef osr;
boost::intrusive::list_member_hook<> sequencer_item;
- uint64_t ops = 0, bytes = 0;
+ uint64_t bytes = 0, cost = 0;
set<OnodeRef> onodes; ///< these need to be updated/written
set<OnodeRef> modified_objects; ///< objects we modified (and need a ref)
std::atomic<uint64_t> blobid_last = {0};
std::atomic<uint64_t> blobid_max = {0};
- Throttle throttle_ops, throttle_bytes; ///< submit to commit
- Throttle throttle_deferred_ops, throttle_deferred_bytes; ///< submit to deferred complete
+ Throttle throttle_bytes; ///< submit to commit
+ Throttle throttle_deferred_bytes; ///< submit to deferred complete
interval_set<uint64_t> bluefs_extents; ///< block extents owned by bluefs
interval_set<uint64_t> bluefs_extents_reclaiming; ///< currently reclaiming
uint64_t max_alloc_size = 0; ///< maximum allocation unit (power of 2)
+ uint64_t throttle_cost_per_io = 0; ///< approx cost per io, in bytes
+
std::atomic<Compressor::CompressionMode> comp_mode = {Compressor::COMP_NONE}; ///< compression mode
CompressorRef compressor;
std::atomic<uint64_t> comp_min_blob_size = {0};
TransContext *_txc_create(OpSequencer *osr);
void _txc_update_store_statfs(TransContext *txc);
void _txc_add_transaction(TransContext *txc, Transaction *t);
+ void _txc_calc_cost(TransContext *txc);
void _txc_write_nodes(TransContext *txc, KeyValueDB::Transaction t);
void _txc_state_proc(TransContext *txc);
void _txc_aio_submit(TransContext *txc);