OPTION(bluestore_max_bytes, OPT_U64, 64*1024*1024)
OPTION(bluestore_deferred_max_ops, OPT_U64, 512)
OPTION(bluestore_deferred_max_bytes, OPT_U64, 128*1024*1024)
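+// number of deferred txcs to accumulate before batching a submit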
+OPTION(bluestore_deferred_batch_ops, OPT_U64, 8)
OPTION(bluestore_nid_prealloc, OPT_INT, 1024)
OPTION(bluestore_blobid_prealloc, OPT_U64, 10240)
OPTION(bluestore_clone_cow, OPT_BOOL, true) // do copy-on-write for clones
s = osr_set;
}
- deferred_aggressive_cleanup = true;
+ deferred_aggressive = true;
+ {
+ // submit anything pending
+ std::lock_guard<std::mutex> l(deferred_lock);
+ _deferred_try_submit();
+ }
{
// wake up any previously finished deferred events
std::lock_guard<std::mutex> l(kv_lock);
dout(20) << __func__ << " drain " << osr << dendl;
osr->drain();
}
- deferred_aggressive_cleanup = false;
+ deferred_aggressive = false;
dout(10) << __func__ << " done" << dendl;
}
deferred_cleaning.pop_front();
}
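+ // not draining aggressively: only flush deferred IO once a full batch of txcs has accumulated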
+ if (!deferred_aggressive) {
+ std::lock_guard<std::mutex> l(deferred_lock);
+ if (deferred_queue_size >= (int)g_conf->bluestore_deferred_batch_ops) {
+ _deferred_try_submit();
+ }
+ }
+
// this is as good a place as any ...
_reap_collections();
deferred_queue.push_back(*txc->osr);
}
txc->osr->deferred_pending.push_back(*txc);
- if (txc->osr->deferred_running.empty()) {
+ ++deferred_queue_size;
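+ // while draining aggressively, submit immediately rather than waiting for the kv thread to batch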
+ if (deferred_aggressive &&
+ txc->osr->deferred_running.empty()) {
_deferred_try_submit(txc->osr.get());
}
}
void BlueStore::_deferred_try_submit()
{
- dout(20) << __func__ << " " << deferred_queue.size() << " osrs" << dendl;
+ dout(20) << __func__ << " " << deferred_queue.size() << " osrs, "
+ << deferred_queue_size << " txcs" << dendl;
for (auto& osr : deferred_queue) {
if (osr.deferred_running.empty()) {
_deferred_try_submit(&osr);
}
}
}

void BlueStore::_deferred_try_submit(OpSequencer *osr)
{
dout(10) << __func__ << " osr " << osr << " " << osr->deferred_pending.size()
<< " pending " << dendl;
+ assert(!osr->deferred_pending.empty());
assert(osr->deferred_running.empty());
- osr->deferred_pending.swap(osr->deferred_running);
+
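+ // deferred_queue_size counts queued txcs across all osrs; drop the ones moving to running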
+ deferred_queue_size -= osr->deferred_pending.size();
+ assert(deferred_queue_size >= 0);
+ osr->deferred_running.swap(osr->deferred_pending);
// attach all IO to the last in the batch
TransContext *last = &osr->deferred_running.back();
assert(txc->osr->deferred_txc == txc);
txc->osr->deferred_blocks.clear();
finished.swap(txc->osr->deferred_running);
- if (!txc->osr->deferred_pending.empty()) {
- _deferred_try_submit(txc->osr.get());
- } else {
+ if (txc->osr->deferred_pending.empty()) {
auto q = deferred_queue.iterator_to(*txc->osr);
deferred_queue.erase(q);
+ } else if (deferred_aggressive) {
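+ // keep resubmitting while draining; otherwise the kv thread will batch the remainder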
+ _deferred_try_submit(txc->osr.get());
}
}
// in the normal case, do not bother waking up the kv thread; it will
// catch us on the next commit anyway.
- if (deferred_aggressive_cleanup) {
+ if (deferred_aggressive) {
kv_cond.notify_one();
}
return 0;