]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
os/bluestore: separate _txc_finish_kv into _txc_{applied,committed}_kv
authorSage Weil <sage@redhat.com>
Wed, 8 Mar 2017 19:51:39 +0000 (14:51 -0500)
committerSage Weil <sage@redhat.com>
Tue, 21 Mar 2017 18:56:27 +0000 (13:56 -0500)
We can unblock flush()ing threads as soon as we have applied to the kv db,
while the callbacks must wait until we have committed.

Move methods around a bit to better match the execution order.

Signed-off-by: Sage Weil <sage@redhat.com>
src/os/bluestore/BlueStore.cc
src/os/bluestore/BlueStore.h

index af1b51b8a0a99d5e8d6c44b413611ca7e7f06853..d33755ab0596fd7fad23c6948255e070d6b93942 100644 (file)
@@ -7150,6 +7150,7 @@ void BlueStore::_txc_state_proc(TransContext *txc)
          txc->state = TransContext::STATE_KV_SUBMITTED;
          int r = db->submit_transaction(txc->t);
          assert(r == 0);
+         _txc_applied_kv(txc);
        }
       }
       {
@@ -7165,7 +7166,7 @@ void BlueStore::_txc_state_proc(TransContext *txc)
     case TransContext::STATE_KV_SUBMITTED:
       txc->log_state_latency(logger, l_bluestore_state_kv_committing_lat);
       txc->state = TransContext::STATE_KV_DONE;
-      _txc_finish_kv(txc);
+      _txc_committed_kv(txc);
       // ** fall-thru **
 
     case TransContext::STATE_KV_DONE:
@@ -7337,47 +7338,6 @@ void BlueStore::_txc_write_nodes(TransContext *txc, KeyValueDB::Transaction t)
   }
 }
 
-void BlueStore::_txc_finish_kv(TransContext *txc)
-{
-  dout(20) << __func__ << " txc " << txc << dendl;
-
-  for (auto ls : { &txc->onodes, &txc->modified_objects }) {
-    for (auto& o : *ls) {
-      std::lock_guard<std::mutex> l(o->flush_lock);
-      dout(20) << __func__ << " onode " << o << " had " << o->flush_txns
-              << dendl;
-      assert(o->flush_txns.count(txc));
-      o->flush_txns.erase(txc);
-      o->flushing_count--;
-      if (o->flush_txns.empty()) {
-       o->flush_cond.notify_all();
-      }
-    }
-    ls->clear();  // clear out refs
-  }
-
-  // warning: we're calling onreadable_sync inside the sequencer lock
-  if (txc->onreadable_sync) {
-    txc->onreadable_sync->complete(0);
-    txc->onreadable_sync = NULL;
-  }
-  unsigned n = txc->osr->parent->shard_hint.hash_to_shard(m_finisher_num);
-  if (txc->oncommit) {
-    logger->tinc(l_bluestore_commit_lat, ceph_clock_now() - txc->start);
-    finishers[n]->queue(txc->oncommit);
-    txc->oncommit = NULL;
-  }
-  if (txc->onreadable) {
-    finishers[n]->queue(txc->onreadable);
-    txc->onreadable = NULL;
-  }
-
-  if (!txc->oncommits.empty()) {
-    finishers[n]->queue(txc->oncommits);
-  }
-  _op_queue_release_throttle(txc);
-}
-
 void BlueStore::BSPerfTracker::update_from_perfcounters(
   PerfCounters &logger)
 {
@@ -7389,32 +7349,6 @@ void BlueStore::BSPerfTracker::update_from_perfcounters(
       l_bluestore_commit_lat));
 }
 
-void BlueStore::_txc_finish(TransContext *txc)
-{
-  dout(20) << __func__ << " " << txc << " onodes " << txc->onodes << dendl;
-  assert(txc->state == TransContext::STATE_FINISHING);
-
-  for (auto& sb : txc->shared_blobs_written) {
-    sb->bc.finish_write(sb->get_cache(), txc->seq);
-  }
-  txc->shared_blobs_written.clear();
-
-  while (!txc->removed_collections.empty()) {
-    _queue_reap_collection(txc->removed_collections.front());
-    txc->removed_collections.pop_front();
-  }
-
-  _op_queue_release_deferred_throttle(txc);
-
-  OpSequencerRef osr = txc->osr;
-  {
-    std::lock_guard<std::mutex> l(osr->qlock);
-    txc->state = TransContext::STATE_DONE;
-  }
-
-  _osr_reap_done(osr.get());
-}
-
 void BlueStore::_osr_reap_done(OpSequencer *osr)
 {
   CollectionRef c;
@@ -7501,6 +7435,76 @@ void BlueStore::_txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t)
   _txc_update_store_statfs(txc);
 }
 
+void BlueStore::_txc_applied_kv(TransContext *txc)
+{
+  for (auto ls : { &txc->onodes, &txc->modified_objects }) {
+    for (auto& o : *ls) {
+      std::lock_guard<std::mutex> l(o->flush_lock);
+      dout(20) << __func__ << " onode " << o << " had " << o->flush_txns
+              << dendl;
+      assert(o->flush_txns.count(txc));
+      o->flush_txns.erase(txc);
+      o->flushing_count--;
+      if (o->flush_txns.empty()) {
+       o->flush_cond.notify_all();
+      }
+    }
+    ls->clear();  // clear out refs
+  }
+}
+
+void BlueStore::_txc_committed_kv(TransContext *txc)
+{
+  dout(20) << __func__ << " txc " << txc << dendl;
+
+  // warning: we're calling onreadable_sync inside the sequencer lock
+  if (txc->onreadable_sync) {
+    txc->onreadable_sync->complete(0);
+    txc->onreadable_sync = NULL;
+  }
+  unsigned n = txc->osr->parent->shard_hint.hash_to_shard(m_finisher_num);
+  if (txc->oncommit) {
+    logger->tinc(l_bluestore_commit_lat, ceph_clock_now() - txc->start);
+    finishers[n]->queue(txc->oncommit);
+    txc->oncommit = NULL;
+  }
+  if (txc->onreadable) {
+    finishers[n]->queue(txc->onreadable);
+    txc->onreadable = NULL;
+  }
+
+  if (!txc->oncommits.empty()) {
+    finishers[n]->queue(txc->oncommits);
+  }
+  _op_queue_release_throttle(txc);
+}
+
+void BlueStore::_txc_finish(TransContext *txc)
+{
+  dout(20) << __func__ << " " << txc << " onodes " << txc->onodes << dendl;
+  assert(txc->state == TransContext::STATE_FINISHING);
+
+  for (auto& sb : txc->shared_blobs_written) {
+    sb->bc.finish_write(sb->get_cache(), txc->seq);
+  }
+  txc->shared_blobs_written.clear();
+
+  while (!txc->removed_collections.empty()) {
+    _queue_reap_collection(txc->removed_collections.front());
+    txc->removed_collections.pop_front();
+  }
+
+  _op_queue_release_deferred_throttle(txc);
+
+  OpSequencerRef osr = txc->osr;
+  {
+    std::lock_guard<std::mutex> l(osr->qlock);
+    txc->state = TransContext::STATE_DONE;
+  }
+
+  _osr_reap_done(osr.get());
+}
+
 void BlueStore::_txc_release_alloc(TransContext *txc)
 {
   // update allocator with full released set
@@ -7580,6 +7584,7 @@ void BlueStore::_kv_sync_thread()
        txc->log_state_latency(logger, l_bluestore_state_kv_queued_lat);
        int r = db->submit_transaction(txc->t);
        assert(r == 0);
+       _txc_applied_kv(txc);
        --txc->osr->kv_committing_serially;
        txc->state = TransContext::STATE_KV_SUBMITTED;
       }
index d9e0ebd00faef26540d1b50a288a8c9c71283d7f..991dfcae14ba1c19890909be8c86276085e0165b 100644 (file)
@@ -1860,9 +1860,10 @@ public:
 private:
   void _txc_finish_io(TransContext *txc);
   void _txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t);
-  void _txc_release_alloc(TransContext *txc);
-  void _txc_finish_kv(TransContext *txc);
+  void _txc_applied_kv(TransContext *txc);
+  void _txc_committed_kv(TransContext *txc);
   void _txc_finish(TransContext *txc);
+  void _txc_release_alloc(TransContext *txc);
 
   void _osr_reap_done(OpSequencer *osr);