txc->state = TransContext::STATE_KV_SUBMITTED;
int r = db->submit_transaction(txc->t);
assert(r == 0);
+ _txc_applied_kv(txc);
}
}
{
case TransContext::STATE_KV_SUBMITTED:
txc->log_state_latency(logger, l_bluestore_state_kv_committing_lat);
txc->state = TransContext::STATE_KV_DONE;
- _txc_finish_kv(txc);
+ _txc_committed_kv(txc);
// ** fall-thru **
case TransContext::STATE_KV_DONE:
}
}
-void BlueStore::_txc_finish_kv(TransContext *txc)
-{
- dout(20) << __func__ << " txc " << txc << dendl;
-
- for (auto ls : { &txc->onodes, &txc->modified_objects }) {
- for (auto& o : *ls) {
- std::lock_guard<std::mutex> l(o->flush_lock);
- dout(20) << __func__ << " onode " << o << " had " << o->flush_txns
- << dendl;
- assert(o->flush_txns.count(txc));
- o->flush_txns.erase(txc);
- o->flushing_count--;
- if (o->flush_txns.empty()) {
- o->flush_cond.notify_all();
- }
- }
- ls->clear(); // clear out refs
- }
-
- // warning: we're calling onreadable_sync inside the sequencer lock
- if (txc->onreadable_sync) {
- txc->onreadable_sync->complete(0);
- txc->onreadable_sync = NULL;
- }
- unsigned n = txc->osr->parent->shard_hint.hash_to_shard(m_finisher_num);
- if (txc->oncommit) {
- logger->tinc(l_bluestore_commit_lat, ceph_clock_now() - txc->start);
- finishers[n]->queue(txc->oncommit);
- txc->oncommit = NULL;
- }
- if (txc->onreadable) {
- finishers[n]->queue(txc->onreadable);
- txc->onreadable = NULL;
- }
-
- if (!txc->oncommits.empty()) {
- finishers[n]->queue(txc->oncommits);
- }
- _op_queue_release_throttle(txc);
-}
-
void BlueStore::BSPerfTracker::update_from_perfcounters(
PerfCounters &logger)
{
l_bluestore_commit_lat));
}
-void BlueStore::_txc_finish(TransContext *txc)
-{
- dout(20) << __func__ << " " << txc << " onodes " << txc->onodes << dendl;
- assert(txc->state == TransContext::STATE_FINISHING);
-
- for (auto& sb : txc->shared_blobs_written) {
- sb->bc.finish_write(sb->get_cache(), txc->seq);
- }
- txc->shared_blobs_written.clear();
-
- while (!txc->removed_collections.empty()) {
- _queue_reap_collection(txc->removed_collections.front());
- txc->removed_collections.pop_front();
- }
-
- _op_queue_release_deferred_throttle(txc);
-
- OpSequencerRef osr = txc->osr;
- {
- std::lock_guard<std::mutex> l(osr->qlock);
- txc->state = TransContext::STATE_DONE;
- }
-
- _osr_reap_done(osr.get());
-}
-
void BlueStore::_osr_reap_done(OpSequencer *osr)
{
CollectionRef c;
_txc_update_store_statfs(txc);
}
+// Called once the txc's transaction has been submitted to the KV db:
+// detach this txc from every onode it touched so flush waiters can
+// make progress.  (Split out of the old _txc_finish_kv; the callback
+// half now lives in _txc_committed_kv.)
+void BlueStore::_txc_applied_kv(TransContext *txc)
+{
+ for (auto ls : { &txc->onodes, &txc->modified_objects }) {
+ for (auto& o : *ls) {
+ std::lock_guard<std::mutex> l(o->flush_lock);
+ dout(20) << __func__ << " onode " << o << " had " << o->flush_txns
+ << dendl;
+ assert(o->flush_txns.count(txc));
+ o->flush_txns.erase(txc);
+ o->flushing_count--;
+ // no txns left in flight for this onode: wake flush_cond waiters
+ if (o->flush_txns.empty()) {
+ o->flush_cond.notify_all();
+ }
+ }
+ ls->clear(); // clear out refs
+ }
+}
+
+// Called when the txc's KV transaction has committed: fire/queue the
+// completion callbacks and release the op throttle.  (Split out of the
+// old _txc_finish_kv; the onode flush bookkeeping now happens earlier,
+// in _txc_applied_kv.)
+void BlueStore::_txc_committed_kv(TransContext *txc)
+{
+ dout(20) << __func__ << " txc " << txc << dendl;
+
+ // warning: we're calling onreadable_sync inside the sequencer lock
+ if (txc->onreadable_sync) {
+ txc->onreadable_sync->complete(0);
+ txc->onreadable_sync = NULL;
+ }
+ // pick a finisher shard from the sequencer parent's shard hint --
+ // presumably so completions for one sequencer stay on one shard
+ unsigned n = txc->osr->parent->shard_hint.hash_to_shard(m_finisher_num);
+ if (txc->oncommit) {
+ // record end-to-end commit latency before handing off the callback
+ logger->tinc(l_bluestore_commit_lat, ceph_clock_now() - txc->start);
+ finishers[n]->queue(txc->oncommit);
+ txc->oncommit = NULL;
+ }
+ if (txc->onreadable) {
+ finishers[n]->queue(txc->onreadable);
+ txc->onreadable = NULL;
+ }
+
+ if (!txc->oncommits.empty()) {
+ finishers[n]->queue(txc->oncommits);
+ }
+ // commit is visible; release this txc's share of the op throttle
+ _op_queue_release_throttle(txc);
+}
+
+// Final stage of the txc state machine: tear down per-txc state and
+// move STATE_FINISHING -> STATE_DONE, then reap completed txcs from
+// the sequencer.
+void BlueStore::_txc_finish(TransContext *txc)
+{
+ dout(20) << __func__ << " " << txc << " onodes " << txc->onodes << dendl;
+ assert(txc->state == TransContext::STATE_FINISHING);
+
+ // complete buffer-cache writes on shared blobs this txc wrote
+ for (auto& sb : txc->shared_blobs_written) {
+ sb->bc.finish_write(sb->get_cache(), txc->seq);
+ }
+ txc->shared_blobs_written.clear();
+
+ // hand any collections removed by this txc to the reaper
+ while (!txc->removed_collections.empty()) {
+ _queue_reap_collection(txc->removed_collections.front());
+ txc->removed_collections.pop_front();
+ }
+
+ _op_queue_release_deferred_throttle(txc);
+
+ // state transition must happen under the sequencer's qlock
+ OpSequencerRef osr = txc->osr;
+ {
+ std::lock_guard<std::mutex> l(osr->qlock);
+ txc->state = TransContext::STATE_DONE;
+ }
+
+ _osr_reap_done(osr.get());
+}
+
void BlueStore::_txc_release_alloc(TransContext *txc)
{
// update allocator with full released set
txc->log_state_latency(logger, l_bluestore_state_kv_queued_lat);
int r = db->submit_transaction(txc->t);
assert(r == 0);
+ _txc_applied_kv(txc);
--txc->osr->kv_committing_serially;
txc->state = TransContext::STATE_KV_SUBMITTED;
}