++deferred_aggressive; // FIXME: maybe osr-local aggressive flag?
{
// submit anything pending
- std::lock_guard<std::mutex> l(deferred_lock);
+ deferred_lock.lock();
if (osr->deferred_pending) {
- _deferred_submit(osr);
+ _deferred_submit_unlock(osr);
+ } else {
+ deferred_lock.unlock();
}
}
{
++deferred_aggressive;
{
// submit anything pending
- std::lock_guard<std::mutex> l(deferred_lock);
- _deferred_try_submit();
+ deferred_try_submit();
}
{
// wake up any previously finished deferred events
deferred_stable.clear();
if (!deferred_aggressive) {
- std::lock_guard<std::mutex> l(deferred_lock);
if (deferred_queue_size >= deferred_batch_ops.load() ||
throttle_deferred_bytes.past_midpoint()) {
- _deferred_try_submit();
+ deferred_try_submit();
}
}
void BlueStore::_deferred_queue(TransContext *txc)
{
dout(20) << __func__ << " txc " << txc << " osr " << txc->osr << dendl;
- std::lock_guard<std::mutex> l(deferred_lock);
+ deferred_lock.lock();
if (!txc->osr->deferred_pending &&
!txc->osr->deferred_running) {
deferred_queue.push_back(*txc->osr);
}
if (deferred_aggressive &&
!txc->osr->deferred_running) {
- _deferred_submit(txc->osr.get());
+ _deferred_submit_unlock(txc->osr.get());
+ } else {
+ deferred_lock.unlock();
}
}
-void BlueStore::_deferred_try_submit()
+void BlueStore::deferred_try_submit()
{
dout(20) << __func__ << " " << deferred_queue.size() << " osrs, "
<< deferred_queue_size << " txcs" << dendl;
+ std::lock_guard<std::mutex> l(deferred_lock);
+ vector<OpSequencerRef> osrs;
+ osrs.reserve(deferred_queue.size());
for (auto& osr : deferred_queue) {
- if (!osr.deferred_running) {
- _deferred_submit(&osr);
+ osrs.push_back(&osr);
+ }
+ for (auto& osr : osrs) {
+ if (!osr->deferred_running) {
+ _deferred_submit_unlock(osr.get());
+ deferred_lock.lock();
}
}
}
-void BlueStore::_deferred_submit(OpSequencer *osr)
+void BlueStore::_deferred_submit_unlock(OpSequencer *osr)
{
dout(10) << __func__ << " osr " << osr
<< " " << osr->deferred_pending->iomap.size() << " ios pending "
bl.claim_append(i->second.bl);
++i;
}
+
+ // demote to deferred_submit_lock, then drop that too
+ std::lock_guard<std::mutex> l(deferred_submit_lock);
+ deferred_lock.unlock();
bdev->aio_submit(&b->ioc);
}
auto q = deferred_queue.iterator_to(*osr);
deferred_queue.erase(q);
} else if (deferred_aggressive) {
- _deferred_submit(osr);
+ dout(20) << __func__ << " queuing async deferred_try_submit" << dendl;
+ finishers[0]->queue(new FunctionContext([&](int) {
+ deferred_try_submit();
+ }));
}
}
interval_set<uint64_t> bluefs_extents; ///< block extents owned by bluefs
interval_set<uint64_t> bluefs_extents_reclaiming; ///< currently reclaiming
- std::mutex deferred_lock;
+ std::mutex deferred_lock, deferred_submit_lock;
std::atomic<uint64_t> deferred_seq = {0};
deferred_osr_queue_t deferred_queue; ///< osr's with deferred io pending
int deferred_queue_size = 0; ///< num txc's queued across all osrs
bluestore_deferred_op_t *_get_deferred_op(TransContext *txc, OnodeRef o);
void _deferred_queue(TransContext *txc);
- void deferred_try_submit() {
- std::lock_guard<std::mutex> l(deferred_lock);
- _deferred_try_submit();
- }
- void _deferred_try_submit();
- void _deferred_submit(OpSequencer *osr);
+ void deferred_try_submit();
+ void _deferred_submit_unlock(OpSequencer *osr);
void _deferred_aio_finish(OpSequencer *osr);
int _deferred_replay();