++deferred_aggressive; // FIXME: maybe osr-local aggressive flag?
{
// submit anything pending
- deferred_lock.lock();
+ osr->deferred_lock.lock();
if (osr->deferred_pending && !osr->deferred_running) {
_deferred_submit_unlock(osr);
} else {
- deferred_lock.unlock();
+ osr->deferred_lock.unlock();
}
}
{
++deferred_aggressive; // FIXME: maybe osr-local aggressive flag?
{
// submit anything pending
- deferred_lock.lock();
+ osr->deferred_lock.lock();
if (osr->deferred_pending && !osr->deferred_running) {
_deferred_submit_unlock(osr);
} else {
- deferred_lock.unlock();
+ osr->deferred_lock.unlock();
}
}
{
void BlueStore::_deferred_queue(TransContext *txc)
{
dout(20) << __func__ << " txc " << txc << " osr " << txc->osr << dendl;
- deferred_lock.lock();
- if (!txc->osr->deferred_pending &&
- !txc->osr->deferred_running) {
- deferred_queue.push_back(*txc->osr);
- }
- if (!txc->osr->deferred_pending) {
- txc->osr->deferred_pending = new DeferredBatch(cct, txc->osr.get());
+
+ DeferredBatch *tmp;
+ {
+ txc->osr->deferred_lock.lock();
+ if (!txc->osr->deferred_pending) {
+ tmp = new DeferredBatch(cct, txc->osr.get());
+ } else {
+ tmp = txc->osr->deferred_pending;
+ txc->osr->deferred_pending = nullptr;
+ }
+ txc->osr->deferred_lock.unlock();
}
- ++deferred_queue_size;
- txc->osr->deferred_pending->txcs.push_back(*txc);
+
+ tmp->txcs.push_back(*txc);
bluestore_deferred_transaction_t& wt = *txc->deferred_txn;
for (auto opi = wt.ops.begin(); opi != wt.ops.end(); ++opi) {
const auto& op = *opi;
ceph_assert(op.op == bluestore_deferred_op_t::OP_WRITE);
bufferlist::const_iterator p = op.data.begin();
for (auto e : op.extents) {
- txc->osr->deferred_pending->prepare_write(
- cct, wt.seq, e.offset, e.length, p);
+ tmp->prepare_write(cct, wt.seq, e.offset, e.length, p);
}
}
- if (deferred_aggressive &&
- !txc->osr->deferred_running) {
- _deferred_submit_unlock(txc->osr.get());
- } else {
- deferred_lock.unlock();
+
+ {
+ txc->osr->deferred_lock.lock();
+ ++deferred_queue_size;
+ txc->osr->deferred_pending = tmp;
+ if (!txc->osr->deferred_running && (tmp->txcs.size() == 1)) {
+ deferred_lock.lock();
+ deferred_queue.push_back(*txc->osr);
+ deferred_lock.unlock();
+ }
+
+ if (deferred_aggressive &&
+ !txc->osr->deferred_running) {
+ _deferred_submit_unlock(txc->osr.get());
+ } else {
+ txc->osr->deferred_lock.unlock();
+ }
}
-}
+
+ }
void BlueStore::deferred_try_submit()
{
dout(20) << __func__ << " " << deferred_queue.size() << " osrs, "
<< deferred_queue_size << " txcs" << dendl;
- std::lock_guard l(deferred_lock);
vector<OpSequencerRef> osrs;
- osrs.reserve(deferred_queue.size());
- for (auto& osr : deferred_queue) {
- osrs.push_back(&osr);
+
+ {
+ std::lock_guard l(deferred_lock);
+ osrs.reserve(deferred_queue.size());
+ for (auto& osr : deferred_queue) {
+ osrs.push_back(&osr);
+ }
}
+
for (auto& osr : osrs) {
+ osr->deferred_lock.lock();
if (osr->deferred_pending) {
if (!osr->deferred_running) {
_deferred_submit_unlock(osr.get());
- deferred_lock.lock();
} else {
+ osr->deferred_lock.unlock();
dout(20) << __func__ << " osr " << osr << " already has running"
<< dendl;
}
} else {
+ osr->deferred_lock.unlock();
dout(20) << __func__ << " osr " << osr << " has no pending" << dendl;
}
}
- deferred_last_submitted = ceph_clock_now();
+ {
+ std::lock_guard l(deferred_lock);
+ deferred_last_submitted = ceph_clock_now();
+ }
}
void BlueStore::_deferred_submit_unlock(OpSequencer *osr)
osr->deferred_running = osr->deferred_pending;
osr->deferred_pending = nullptr;
- deferred_lock.unlock();
+ osr->deferred_lock.unlock();
for (auto& txc : b->txcs) {
throttle.log_state_latency(txc, logger, l_bluestore_state_deferred_queued_lat);
DeferredBatch *b = osr->deferred_running;
{
- deferred_lock.lock();
+ osr->deferred_lock.lock();
ceph_assert(osr->deferred_running == b);
osr->deferred_running = nullptr;
if (!osr->deferred_pending) {
dout(20) << __func__ << " dequeueing" << dendl;
- auto q = deferred_queue.iterator_to(*osr);
- deferred_queue.erase(q);
- deferred_lock.unlock();
+ {
+ deferred_lock.lock();
+ auto q = deferred_queue.iterator_to(*osr);
+ deferred_queue.erase(q);
+ deferred_lock.unlock();
+ }
+ osr->deferred_lock.unlock();
} else {
- deferred_lock.unlock();
+ osr->deferred_lock.unlock();
if (deferred_aggressive) {
dout(20) << __func__ << " queuing async deferred_try_submit" << dendl;
finisher.queue(new C_DeferredTrySubmit(this));