From: Sage Weil Date: Wed, 8 Mar 2017 19:51:39 +0000 (-0500) Subject: os/bluestore: separate _txc_finish_kv into _txc_{applied,committed}_kv X-Git-Tag: v12.0.1~12^2~35 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=eff1e83145d05a8f455204c18a66b9dca9ccfd78;p=ceph-ci.git os/bluestore: separate _txc_finish_kv into _txc_{applied,committed}_kv We can unblock flush()ing threads as soon as we have applied to the kv db, while the callbacks must wait until we have committed. Move methods around a bit to better match the execution order. Signed-off-by: Sage Weil --- diff --git a/src/os/bluestore/BlueStore.cc b/src/os/bluestore/BlueStore.cc index af1b51b8a0a..d33755ab059 100644 --- a/src/os/bluestore/BlueStore.cc +++ b/src/os/bluestore/BlueStore.cc @@ -7150,6 +7150,7 @@ void BlueStore::_txc_state_proc(TransContext *txc) txc->state = TransContext::STATE_KV_SUBMITTED; int r = db->submit_transaction(txc->t); assert(r == 0); + _txc_applied_kv(txc); } } { @@ -7165,7 +7166,7 @@ void BlueStore::_txc_state_proc(TransContext *txc) case TransContext::STATE_KV_SUBMITTED: txc->log_state_latency(logger, l_bluestore_state_kv_committing_lat); txc->state = TransContext::STATE_KV_DONE; - _txc_finish_kv(txc); + _txc_committed_kv(txc); // ** fall-thru ** case TransContext::STATE_KV_DONE: @@ -7337,47 +7338,6 @@ void BlueStore::_txc_write_nodes(TransContext *txc, KeyValueDB::Transaction t) } } -void BlueStore::_txc_finish_kv(TransContext *txc) -{ - dout(20) << __func__ << " txc " << txc << dendl; - - for (auto ls : { &txc->onodes, &txc->modified_objects }) { - for (auto& o : *ls) { - std::lock_guard l(o->flush_lock); - dout(20) << __func__ << " onode " << o << " had " << o->flush_txns - << dendl; - assert(o->flush_txns.count(txc)); - o->flush_txns.erase(txc); - o->flushing_count--; - if (o->flush_txns.empty()) { - o->flush_cond.notify_all(); - } - } - ls->clear(); // clear out refs - } - - // warning: we're calling onreadable_sync inside the sequencer lock - if (txc->onreadable_sync) { - txc->onreadable_sync->complete(0); - txc->onreadable_sync = NULL; - } - unsigned n = txc->osr->parent->shard_hint.hash_to_shard(m_finisher_num); - if (txc->oncommit) { - logger->tinc(l_bluestore_commit_lat, ceph_clock_now() - txc->start); - finishers[n]->queue(txc->oncommit); - txc->oncommit = NULL; - } - if (txc->onreadable) { - finishers[n]->queue(txc->onreadable); - txc->onreadable = NULL; - } - - if (!txc->oncommits.empty()) { - finishers[n]->queue(txc->oncommits); - } - _op_queue_release_throttle(txc); -} - void BlueStore::BSPerfTracker::update_from_perfcounters( PerfCounters &logger) { @@ -7389,32 +7349,6 @@ void BlueStore::BSPerfTracker::update_from_perfcounters( l_bluestore_commit_lat)); } -void BlueStore::_txc_finish(TransContext *txc) -{ - dout(20) << __func__ << " " << txc << " onodes " << txc->onodes << dendl; - assert(txc->state == TransContext::STATE_FINISHING); - - for (auto& sb : txc->shared_blobs_written) { - sb->bc.finish_write(sb->get_cache(), txc->seq); - } - txc->shared_blobs_written.clear(); - - while (!txc->removed_collections.empty()) { - _queue_reap_collection(txc->removed_collections.front()); - txc->removed_collections.pop_front(); - } - - _op_queue_release_deferred_throttle(txc); - - OpSequencerRef osr = txc->osr; - { - std::lock_guard l(osr->qlock); - txc->state = TransContext::STATE_DONE; - } - - _osr_reap_done(osr.get()); -} - void BlueStore::_osr_reap_done(OpSequencer *osr) { CollectionRef c; @@ -7501,6 +7435,76 @@ void BlueStore::_txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t) _txc_update_store_statfs(txc); } +void BlueStore::_txc_applied_kv(TransContext *txc) +{ + for (auto ls : { &txc->onodes, &txc->modified_objects }) { + for (auto& o : *ls) { + std::lock_guard l(o->flush_lock); + dout(20) << __func__ << " onode " << o << " had " << o->flush_txns + << dendl; + assert(o->flush_txns.count(txc)); + o->flush_txns.erase(txc); + o->flushing_count--; + if (o->flush_txns.empty()) { + o->flush_cond.notify_all(); + } + } + ls->clear(); // clear out refs + } +} + +void BlueStore::_txc_committed_kv(TransContext *txc) +{ + dout(20) << __func__ << " txc " << txc << dendl; + + // warning: we're calling onreadable_sync inside the sequencer lock + if (txc->onreadable_sync) { + txc->onreadable_sync->complete(0); + txc->onreadable_sync = NULL; + } + unsigned n = txc->osr->parent->shard_hint.hash_to_shard(m_finisher_num); + if (txc->oncommit) { + logger->tinc(l_bluestore_commit_lat, ceph_clock_now() - txc->start); + finishers[n]->queue(txc->oncommit); + txc->oncommit = NULL; + } + if (txc->onreadable) { + finishers[n]->queue(txc->onreadable); + txc->onreadable = NULL; + } + + if (!txc->oncommits.empty()) { + finishers[n]->queue(txc->oncommits); + } + _op_queue_release_throttle(txc); +} + +void BlueStore::_txc_finish(TransContext *txc) +{ + dout(20) << __func__ << " " << txc << " onodes " << txc->onodes << dendl; + assert(txc->state == TransContext::STATE_FINISHING); + + for (auto& sb : txc->shared_blobs_written) { + sb->bc.finish_write(sb->get_cache(), txc->seq); + } + txc->shared_blobs_written.clear(); + + while (!txc->removed_collections.empty()) { + _queue_reap_collection(txc->removed_collections.front()); + txc->removed_collections.pop_front(); + } + + _op_queue_release_deferred_throttle(txc); + + OpSequencerRef osr = txc->osr; + { + std::lock_guard l(osr->qlock); + txc->state = TransContext::STATE_DONE; + } + + _osr_reap_done(osr.get()); +} + void BlueStore::_txc_release_alloc(TransContext *txc) { // update allocator with full released set @@ -7580,6 +7584,7 @@ void BlueStore::_kv_sync_thread() txc->log_state_latency(logger, l_bluestore_state_kv_queued_lat); int r = db->submit_transaction(txc->t); assert(r == 0); + _txc_applied_kv(txc); --txc->osr->kv_committing_serially; txc->state = TransContext::STATE_KV_SUBMITTED; } diff --git a/src/os/bluestore/BlueStore.h b/src/os/bluestore/BlueStore.h index d9e0ebd00fa..991dfcae14b 100644 --- a/src/os/bluestore/BlueStore.h +++ b/src/os/bluestore/BlueStore.h @@ -1860,9 +1860,10 @@ public: private: void _txc_finish_io(TransContext *txc); void _txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t); - void _txc_release_alloc(TransContext *txc); - void _txc_finish_kv(TransContext *txc); + void _txc_applied_kv(TransContext *txc); + void _txc_committed_kv(TransContext *txc); void _txc_finish(TransContext *txc); + void _txc_release_alloc(TransContext *txc); void _osr_reap_done(OpSequencer *osr);