*/
int64_t get_max() const { return max.read(); }
+ /**
+  * return true if the number of taken slots is at or past the midpoint of max
+  */
+ bool past_midpoint() const {
+   return count.read() >= max.read() / 2;
+ }
+
/**
 * set the new max number, and wait until the number of taken slots drains
 * and drops below this limit.
if (!deferred_aggressive) {
  std::lock_guard<std::mutex> l(deferred_lock);
-   if (deferred_queue_size >= (int)g_conf->bluestore_deferred_batch_ops) {
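+   // also flush early when either deferred throttle is at least half full,
+   // so the deferred throttle get() further down is less likely to block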
+   if (deferred_queue_size >= (int)g_conf->bluestore_deferred_batch_ops ||
+       throttle_deferred_ops.past_midpoint() ||
+       throttle_deferred_bytes.past_midpoint()) {
    _deferred_try_submit();
  }
}
throttle_ops.get(txc->ops);
throttle_bytes.get(txc->bytes);
if (txc->deferred_txn) {
-   throttle_deferred_ops.get(txc->ops);
-   throttle_deferred_bytes.get(txc->bytes);
+   // avoid blocking on the deferred throttles where possible: if a
+   // non-blocking get fails, kick the deferred queue first, then fall
+   // back to a blocking get
+   if (!throttle_deferred_ops.get_or_fail(txc->ops)) {
+     deferred_try_submit();
+     throttle_deferred_ops.get(txc->ops);
+   }
+   if (!throttle_deferred_bytes.get_or_fail(txc->bytes)) {
+     deferred_try_submit();
+     throttle_deferred_bytes.get(txc->bytes);
+   }
}
if (handle)
bluestore_deferred_op_t *_get_deferred_op(TransContext *txc, OnodeRef o);
void _deferred_queue(TransContext *txc);
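+ // locking wrapper: take deferred_lock, then call _deferred_try_submit()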
+ void deferred_try_submit() {
+   std::lock_guard<std::mutex> l(deferred_lock);
+   _deferred_try_submit();
+ }
void _deferred_try_submit();
void _deferred_try_submit(OpSequencer *osr);
int _deferred_finish(TransContext *txc);