bluestore: consolidate state transitions and throttles into BlueStoreThrottle
author     Samuel Just <sjust@redhat.com>
           Fri, 21 Jun 2019 00:45:06 +0000 (17:45 -0700)
committer  Samuel Just <sjust@redhat.com>
           Fri, 27 Sep 2019 22:55:18 +0000 (15:55 -0700)
This will make it easier to have a single place to emit trace information.

Signed-off-by: Samuel Just <sjust@redhat.com>
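
[Editorial note] The refactor reads as a small facade: queue_transactions, _kv_sync_thread, _kv_finalize_thread and _deferred_aio_finish no longer touch throttle_bytes and throttle_deferred_bytes directly; they go through one BlueStoreThrottle object that also owns the per-state latency stamps. Below is a self-contained sketch of that acquire/release flow, not the Ceph sources: SimpleThrottle, Txn and StoreThrottle are invented stand-ins for Throttle, TransContext and BlueStoreThrottle, and the blocking behaviour of Throttle::get() is deliberately simplified away.

#include <atomic>
#include <cstdint>
#include <iostream>

// Minimal byte-count throttle. get() charges unconditionally here; the real
// Throttle::get() blocks once the budget is exhausted.
class SimpleThrottle {
  std::atomic<std::uint64_t> max_, cur_{0};
public:
  explicit SimpleThrottle(std::uint64_t max) : max_(max) {}
  void get(std::uint64_t c) { cur_ += c; }
  bool get_or_fail(std::uint64_t c) {
    std::uint64_t expected = cur_.load();
    while (expected + c <= max_.load()) {
      if (cur_.compare_exchange_weak(expected, expected + c))
        return true;
    }
    return false;
  }
  void put(std::uint64_t c) { cur_ -= c; }
  bool past_midpoint() const { return cur_.load() >= max_.load() / 2; }
};

struct Txn { std::uint64_t cost; bool deferred; };

// Facade owning both budgets, shaped like the BlueStoreThrottle class below.
class StoreThrottle {
  SimpleThrottle bytes_;           // submit to commit
  SimpleThrottle deferred_bytes_;  // submit to deferred complete
public:
  StoreThrottle(std::uint64_t b, std::uint64_t d)
    : bytes_(b), deferred_bytes_(b + d) {}

  // Always charge the kv budget; for deferred txns, try the deferred budget
  // without blocking and tell the caller whether it succeeded.
  bool try_start_transaction(const Txn &t) {
    bytes_.get(t.cost);
    return !t.deferred || deferred_bytes_.get_or_fail(t.cost);
  }
  // Called only after the caller has kicked deferred submission; the real
  // code blocks here until the deferred budget frees up.
  void finish_start_transaction(const Txn &t) { deferred_bytes_.get(t.cost); }

  void release_kv_throttle(std::uint64_t c) { bytes_.put(c); }
  void release_deferred_throttle(std::uint64_t c) { deferred_bytes_.put(c); }
  bool should_submit_deferred() const { return deferred_bytes_.past_midpoint(); }
};

int main() {
  StoreThrottle throttle(64 << 20, 128 << 20);
  Txn t{4 << 20, /*deferred=*/true};
  if (!throttle.try_start_transaction(t)) {
    // queue_transactions(): force deferred submission, wake the kv thread,
    // then wait for the deferred budget.
    throttle.finish_start_transaction(t);
  }
  // Counterparts in _kv_sync_thread() and _deferred_aio_finish():
  throttle.release_kv_throttle(t.cost);
  throttle.release_deferred_throttle(t.cost);
  std::cout << "submit deferred batch? " << throttle.should_submit_deferred() << "\n";
}

The non-blocking get_or_fail followed by an unconditional get mirrors the two-step path in queue_transactions: try_start_transaction refuses to sleep on the deferred budget, the caller first forces deferred submission and wakes the kv sync thread, and only finish_start_transaction is allowed to wait.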
src/os/bluestore/BlueStore.cc
src/os/bluestore/BlueStore.h

diff --git a/src/os/bluestore/BlueStore.cc b/src/os/bluestore/BlueStore.cc
index ee9556a483746364d209a30be7d07fd2251b34c1..4c3110eff44eea38ea4241bc17411c723b378d81 100644
@@ -4103,11 +4103,7 @@ void BlueStore::handle_discard(interval_set<uint64_t>& to_release)
 
 BlueStore::BlueStore(CephContext *cct, const string& path)
   : ObjectStore(cct, path),
-    throttle_bytes(cct, "bluestore_throttle_bytes",
-                  cct->_conf->bluestore_throttle_bytes),
-    throttle_deferred_bytes(cct, "bluestore_throttle_deferred_bytes",
-                      cct->_conf->bluestore_throttle_bytes +
-                      cct->_conf->bluestore_throttle_deferred_bytes),
+    bsthrottle(cct),
     finisher(cct, "commit_finisher", "cfin"),
     kv_sync_thread(this),
     kv_finalize_thread(this),
@@ -4122,11 +4118,7 @@ BlueStore::BlueStore(CephContext *cct,
   const string& path,
   uint64_t _min_alloc_size)
   : ObjectStore(cct, path),
-    throttle_bytes(cct, "bluestore_throttle_bytes",
-                  cct->_conf->bluestore_throttle_bytes),
-    throttle_deferred_bytes(cct, "bluestore_throttle_deferred_bytes",
-                      cct->_conf->bluestore_throttle_bytes +
-                      cct->_conf->bluestore_throttle_deferred_bytes),
+    bsthrottle(cct),
     finisher(cct, "commit_finisher", "cfin"),
     kv_sync_thread(this),
     kv_finalize_thread(this),
@@ -4249,14 +4241,9 @@ void BlueStore::handle_conf_change(const ConfigProxy& conf,
       _set_throttle_params();
     }
   }
-  if (changed.count("bluestore_throttle_bytes")) {
-    throttle_bytes.reset_max(conf->bluestore_throttle_bytes);
-    throttle_deferred_bytes.reset_max(
-      conf->bluestore_throttle_bytes + conf->bluestore_throttle_deferred_bytes);
-  }
-  if (changed.count("bluestore_throttle_deferred_bytes")) {
-    throttle_deferred_bytes.reset_max(
-      conf->bluestore_throttle_bytes + conf->bluestore_throttle_deferred_bytes);
+  if (changed.count("bluestore_throttle_bytes") ||
+      changed.count("bluestore_throttle_deferred_bytes")) {
+    bsthrottle.reset_throttle(conf);
   }
   if (changed.count("bluestore_max_defer_interval")) {
     if (bdev) {
@@ -10937,7 +10924,7 @@ void BlueStore::_txc_state_proc(TransContext *txc)
             << " " << txc->get_state_name() << dendl;
     switch (txc->state) {
     case TransContext::STATE_PREPARE:
-      txc->log_state_latency(logger, l_bluestore_state_prepare_lat);
+      bsthrottle.log_state_latency(*txc, logger, l_bluestore_state_prepare_lat);
       if (txc->ioc.has_pending_aios()) {
        txc->state = TransContext::STATE_AIO_WAIT;
        txc->had_ios = true;
@@ -10948,7 +10935,8 @@ void BlueStore::_txc_state_proc(TransContext *txc)
 
     case TransContext::STATE_AIO_WAIT:
       {
-       utime_t lat = txc->log_state_latency(logger, l_bluestore_state_aio_wait_lat);
+       utime_t lat = bsthrottle.log_state_latency(
+         *txc, logger, l_bluestore_state_aio_wait_lat);
        if (lat >= cct->_conf->bluestore_log_op_age) {
          dout(0) << __func__ << " slow aio_wait, txc = " << txc
                  << ", latency = " << lat
@@ -10964,7 +10952,7 @@ void BlueStore::_txc_state_proc(TransContext *txc)
       if (txc->had_ios) {
        ++txc->osr->txc_with_unstable_io;
       }
-      txc->log_state_latency(logger, l_bluestore_state_io_done_lat);
+      bsthrottle.log_state_latency(*txc, logger, l_bluestore_state_io_done_lat);
       txc->state = TransContext::STATE_KV_QUEUED;
       if (cct->_conf->bluestore_sync_submit_transaction) {
        if (txc->last_nid >= nid_max ||
@@ -10988,10 +10976,7 @@ void BlueStore::_txc_state_proc(TransContext *txc)
          dout(20) << __func__ << " DEBUG randomly forcing submit via kv thread"
                   << dendl;
        } else {
-         txc->state = TransContext::STATE_KV_SUBMITTED;
-         int r = cct->_conf->bluestore_debug_omit_kv_commit ? 0 : db->submit_transaction(txc->t);
-         ceph_assert(r == 0);
-         _txc_applied_kv(txc);
+         _txc_apply_kv(txc, true);
        }
       }
       {
@@ -11015,7 +11000,7 @@ void BlueStore::_txc_state_proc(TransContext *txc)
       // ** fall-thru **
 
     case TransContext::STATE_KV_DONE:
-      txc->log_state_latency(logger, l_bluestore_state_kv_done_lat);
+      bsthrottle.log_state_latency(*txc, logger, l_bluestore_state_kv_done_lat);
       if (txc->deferred_txn) {
        txc->state = TransContext::STATE_DEFERRED_QUEUED;
        _deferred_queue(txc);
@@ -11025,12 +11010,12 @@ void BlueStore::_txc_state_proc(TransContext *txc)
       break;
 
     case TransContext::STATE_DEFERRED_CLEANUP:
-      txc->log_state_latency(logger, l_bluestore_state_deferred_cleanup_lat);
+      bsthrottle.log_state_latency(*txc, logger, l_bluestore_state_deferred_cleanup_lat);
       txc->state = TransContext::STATE_FINISHING;
       // ** fall-thru **
 
     case TransContext::STATE_FINISHING:
-      txc->log_state_latency(logger, l_bluestore_state_finishing_lat);
+      bsthrottle.log_state_latency(*txc, logger, l_bluestore_state_finishing_lat);
       _txc_finish(txc);
       return;
 
@@ -11184,8 +11169,15 @@ void BlueStore::_txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t)
   _txc_update_store_statfs(txc);
 }
 
-void BlueStore::_txc_applied_kv(TransContext *txc)
+void BlueStore::_txc_apply_kv(TransContext *txc, bool sync_submit_transaction)
 {
+  ceph_assert(txc->state == TransContext::STATE_KV_QUEUED);
+  txc->state = TransContext::STATE_KV_SUBMITTED;
+  {
+    int r = cct->_conf->bluestore_debug_omit_kv_commit ? 0 : db->submit_transaction(txc->t);
+    ceph_assert(r == 0);
+  }
+
   for (auto ls : { &txc->onodes, &txc->modified_objects }) {
     for (auto& o : *ls) {
       dout(20) << __func__ << " onode " << o << " had " << o->flushing_count
@@ -11201,6 +11193,7 @@ void BlueStore::_txc_applied_kv(TransContext *txc)
 void BlueStore::_txc_committed_kv(TransContext *txc)
 {
   dout(20) << __func__ << " txc " << txc << dendl;
+  bsthrottle.complete_kv(*txc);
   {
     std::lock_guard l(txc->osr->qlock);
     txc->state = TransContext::STATE_KV_DONE;
@@ -11210,7 +11203,7 @@ void BlueStore::_txc_committed_kv(TransContext *txc)
       finisher.queue(txc->oncommits);
     }
   }
-  txc->log_state_latency(logger, l_bluestore_state_kv_committing_lat);
+  bsthrottle.log_state_latency(*txc, logger, l_bluestore_state_kv_committing_lat);
   log_latency_fn(
     __func__,
     l_bluestore_commit_lat,
@@ -11276,6 +11269,7 @@ void BlueStore::_txc_finish(TransContext *txc)
       osr->qcond.notify_all();
     }
   }
+
   while (!releasing_txc.empty()) {
     // release to allocator only after all preceding txc's have also
     // finished any deferred writes that potentially land in these
@@ -11283,7 +11277,8 @@ void BlueStore::_txc_finish(TransContext *txc)
     auto txc = &releasing_txc.front();
     _txc_release_alloc(txc);
     releasing_txc.pop_front();
-    txc->log_state_latency(logger, l_bluestore_state_done_lat);
+    bsthrottle.log_state_latency(*txc, logger, l_bluestore_state_done_lat);
+    bsthrottle.complete(*txc);
     delete txc;
   }
 
@@ -11302,7 +11297,7 @@ void BlueStore::_txc_finish(TransContext *txc)
               << dendl;
     }
   }
- }
+}
 
 void BlueStore::_txc_release_alloc(TransContext *txc)
 {
@@ -11636,11 +11631,9 @@ void BlueStore::_kv_sync_thread()
       }
 
       for (auto txc : kv_committing) {
+       bsthrottle.log_state_latency(*txc, logger, l_bluestore_state_kv_queued_lat);
        if (txc->state == TransContext::STATE_KV_QUEUED) {
-         txc->log_state_latency(logger, l_bluestore_state_kv_queued_lat);
-         int r = cct->_conf->bluestore_debug_omit_kv_commit ? 0 : db->submit_transaction(txc->t);
-         ceph_assert(r == 0);
-         _txc_applied_kv(txc);
+         _txc_apply_kv(txc, false);
          --txc->osr->kv_committing_serially;
          txc->state = TransContext::STATE_KV_SUBMITTED;
          if (txc->osr->kv_submitted_waiters) {
@@ -11650,7 +11643,6 @@ void BlueStore::_kv_sync_thread()
 
        } else {
          ceph_assert(txc->state == TransContext::STATE_KV_SUBMITTED);
-         txc->log_state_latency(logger, l_bluestore_state_kv_queued_lat);
        }
        if (txc->had_ios) {
          --txc->osr->txc_with_unstable_io;
@@ -11663,7 +11655,7 @@ void BlueStore::_kv_sync_thread()
       // iteration there will already be ops awake.  otherwise, we
       // end up going to sleep, and then wake up when the very first
       // transaction is ready for commit.
-      throttle_bytes.put(costs);
+      bsthrottle.release_kv_throttle(costs);
 
       if (bluefs &&
          after_flush - bluefs_last_balance >
@@ -11831,7 +11823,7 @@ void BlueStore::_kv_finalize_thread()
 
       if (!deferred_aggressive) {
        if (deferred_queue_size >= deferred_batch_ops.load() ||
-           throttle_deferred_bytes.past_midpoint()) {
+           bsthrottle.should_submit_deferred()) {
          deferred_try_submit();
        }
       }
@@ -11940,7 +11932,7 @@ void BlueStore::_deferred_submit_unlock(OpSequencer *osr)
   deferred_lock.unlock();
 
   for (auto& txc : b->txcs) {
-    txc.log_state_latency(logger, l_bluestore_state_deferred_queued_lat);
+    bsthrottle.log_state_latency(txc, logger, l_bluestore_state_deferred_queued_lat);
   }
   uint64_t start = 0, pos = 0;
   bufferlist bl;
@@ -12018,12 +12010,12 @@ void BlueStore::_deferred_aio_finish(OpSequencer *osr)
     {
       for (auto& i : b->txcs) {
        TransContext *txc = &i;
-       txc->log_state_latency(logger, l_bluestore_state_deferred_aio_wait_lat);
+       bsthrottle.log_state_latency(*txc, logger, l_bluestore_state_deferred_aio_wait_lat);
        txc->state = TransContext::STATE_DEFERRED_CLEANUP;
        costs += txc->cost;
       }
     }
-    throttle_deferred_bytes.put(costs);
+    bsthrottle.release_deferred_throttle(costs);
   }
 
   {
@@ -12132,25 +12124,26 @@ int BlueStore::queue_transactions(
     handle->suspend_tp_timeout();
 
   auto tstart = mono_clock::now();
-  throttle_bytes.get(txc->cost);
-  if (txc->deferred_txn) {
+
+  if (!bsthrottle.try_start_transaction(
+       *db,
+       *txc,
+       tstart)) {
     // ensure we do not block here because of deferred writes
-    if (!throttle_deferred_bytes.get_or_fail(txc->cost)) {
-      dout(10) << __func__ << " failed get throttle_deferred_bytes, aggressive"
-              << dendl;
-      ++deferred_aggressive;
-      deferred_try_submit();
-      {
-       // wake up any previously finished deferred events
-       std::lock_guard l(kv_lock);
-       if (!kv_sync_in_progress) {
-         kv_sync_in_progress = true;
-         kv_cond.notify_one();
-       }
+    dout(10) << __func__ << " failed get throttle_deferred_bytes, aggressive"
+            << dendl;
+    ++deferred_aggressive;
+    deferred_try_submit();
+    {
+      // wake up any previously finished deferred events
+      std::lock_guard l(kv_lock);
+      if (!kv_sync_in_progress) {
+       kv_sync_in_progress = true;
+       kv_cond.notify_one();
       }
-      throttle_deferred_bytes.get(txc->cost);
-      --deferred_aggressive;
-   }
+    }
+    bsthrottle.finish_start_transaction(*db, *txc, tstart);
+    --deferred_aggressive;
   }
   auto tend = mono_clock::now();
 
@@ -14749,6 +14742,39 @@ void BlueStore::log_latency_fn(
 }
 
 
+utime_t BlueStore::BlueStoreThrottle::log_state_latency(
+  TransContext &txc, PerfCounters *logger, int state)
+{
+  utime_t now = ceph_clock_now();
+  utime_t lat = now - txc.last_stamp;
+  logger->tinc(state, lat);
+  txc.last_stamp = now;
+  return lat;
+}
+
+bool BlueStore::BlueStoreThrottle::try_start_transaction(
+  KeyValueDB &db,
+  TransContext &txc,
+  mono_clock::time_point start_throttle_acquire)
+{
+  throttle_bytes.get(txc.cost);
+
+  if (!txc.deferred_txn || throttle_deferred_bytes.get_or_fail(txc.cost)) {
+    return true;
+  } else {
+    return false;
+  }
+}
+
+void BlueStore::BlueStoreThrottle::finish_start_transaction(
+  KeyValueDB &db,
+  TransContext &txc,
+  mono_clock::time_point start_throttle_acquire)
+{
+  ceph_assert(txc.deferred_txn);
+  throttle_deferred_bytes.get(txc.cost);
+}
+
 // DB key value Histogram
 #define KEY_SLAB 32
 #define VALUE_SLAB 64
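
[Editorial note] The other half of the consolidation is the state-latency bookkeeping: log_state_latency moved off TransContext and onto the throttle object, which is what gives the commit its single place to later emit trace events. The sketch below is hedged and hypothetical (StateClock and TxnState are invented names; std::chrono stands in for ceph_clock_now and PerfCounters::tinc), but it shows the same pattern of stamping a transaction's last_stamp at every state transition and accumulating the interval per state.

#include <array>
#include <chrono>
#include <cstddef>
#include <cstdio>

enum TxnState { STATE_PREPARE, STATE_AIO_WAIT, STATE_KV_QUEUED, STATE_COUNT };

struct Txn {
  // Plays the role of TransContext::last_stamp.
  std::chrono::steady_clock::time_point last_stamp =
      std::chrono::steady_clock::now();
};

class StateClock {
  // Accumulated time per state, analogous to logger->tinc(state, lat).
  std::array<std::chrono::nanoseconds, STATE_COUNT> totals{};
public:
  std::chrono::nanoseconds log_state_latency(Txn &txn, TxnState state) {
    auto now = std::chrono::steady_clock::now();
    auto lat = std::chrono::duration_cast<std::chrono::nanoseconds>(
        now - txn.last_stamp);
    totals[state] += lat;   // one central spot to also emit a trace event
    txn.last_stamp = now;
    return lat;
  }
  void dump() const {
    for (std::size_t i = 0; i < totals.size(); ++i)
      std::printf("state %zu: %lld ns\n", i,
                  static_cast<long long>(totals[i].count()));
  }
};

int main() {
  StateClock clock;
  Txn txn;
  clock.log_state_latency(txn, STATE_PREPARE);   // prepare -> aio_wait
  clock.log_state_latency(txn, STATE_AIO_WAIT);  // aio_wait -> kv_queued
  clock.dump();
}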
diff --git a/src/os/bluestore/BlueStore.h b/src/os/bluestore/BlueStore.h
index 8976c03b0278b627fd83a5dbecdce657346d6781..a06c7633c2c51b9b8058be1e0b02c54d4ba490cf 100644
@@ -1512,20 +1512,6 @@ public:
     }
 #endif
 
-    utime_t log_state_latency(PerfCounters *logger, int state) {
-      utime_t lat, now = ceph_clock_now();
-      lat = now - last_stamp;
-      logger->tinc(state, lat);
-#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
-      if (state >= l_bluestore_state_prepare_lat && state <= l_bluestore_state_done_lat) {
-        double usecs = (now.to_nsec()-last_stamp.to_nsec())/1000;
-        OID_ELAPSED("", usecs, get_state_latency_name(state));
-      }
-#endif
-      last_stamp = now;
-      return lat;
-    }
-
     CollectionRef ch;
     OpSequencerRef osr;  // this should be ch->osr
     boost::intrusive::list_member_hook<> sequencer_item;
@@ -1598,6 +1584,49 @@ public:
     }
   };
 
+
+  class BlueStoreThrottle {
+    Throttle throttle_bytes;           ///< submit to commit
+    Throttle throttle_deferred_bytes;  ///< submit to deferred complete
+
+
+  public:
+    BlueStoreThrottle(CephContext *cct) :
+      throttle_bytes(cct, "bluestore_throttle_bytes",
+                    cct->_conf->bluestore_throttle_bytes),
+      throttle_deferred_bytes(cct, "bluestore_throttle_deferred_bytes",
+                             cct->_conf->bluestore_throttle_bytes +
+                             cct->_conf->bluestore_throttle_deferred_bytes)
+    {}
+
+
+    utime_t log_state_latency(
+      TransContext &txc, PerfCounters *logger, int state);
+    bool try_start_transaction(
+      KeyValueDB &db,
+      TransContext &txc,
+      mono_clock::time_point);
+    void finish_start_transaction(
+      KeyValueDB &db,
+      TransContext &txc,
+      mono_clock::time_point);
+    void release_kv_throttle(uint64_t cost) {
+      throttle_bytes.put(cost);
+    }
+    void release_deferred_throttle(uint64_t cost) {
+      throttle_deferred_bytes.put(cost);
+    }
+    bool should_submit_deferred() {
+      return throttle_deferred_bytes.past_midpoint();
+    }
+    void reset_throttle(const ConfigProxy &conf) {
+      throttle_bytes.reset_max(conf->bluestore_throttle_bytes);
+      throttle_deferred_bytes.reset_max(
+       conf->bluestore_throttle_bytes +
+       conf->bluestore_throttle_deferred_bytes);
+    }
+  } bsthrottle;
+
   typedef boost::intrusive::list<
     TransContext,
     boost::intrusive::member_hook<
@@ -1838,9 +1867,6 @@ private:
   std::atomic<uint64_t> blobid_last = {0};
   std::atomic<uint64_t> blobid_max = {0};
 
-  Throttle throttle_bytes;          ///< submit to commit
-  Throttle throttle_deferred_bytes;  ///< submit to deferred complete
-
   interval_set<uint64_t> bluefs_extents;  ///< block extents owned by bluefs
   interval_set<uint64_t> bluefs_extents_reclaiming; ///< currently reclaiming
 
@@ -2202,7 +2228,7 @@ public:
 private:
   void _txc_finish_io(TransContext *txc);
   void _txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t);
-  void _txc_applied_kv(TransContext *txc);
+  void _txc_apply_kv(TransContext *txc, bool sync_submit_transaction);
   void _txc_committed_kv(TransContext *txc);
   void _txc_finish(TransContext *txc);
   void _txc_release_alloc(TransContext *txc);
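
[Editorial note] Finally, reset_throttle keeps the two maxima coupled: the deferred limit is always bluestore_throttle_bytes plus bluestore_throttle_deferred_bytes, which is why handle_conf_change can collapse its two option-specific branches into a single call. A minimal sketch of that invariant, assuming a hypothetical Conf struct in place of ConfigProxy:

#include <cstdint>
#include <iostream>

// Hypothetical stand-in for ConfigProxy; only the two options used here.
struct Conf {
  std::uint64_t bluestore_throttle_bytes = 64 << 20;
  std::uint64_t bluestore_throttle_deferred_bytes = 128 << 20;
};

struct CoupledThrottle {
  std::uint64_t bytes_max = 0;
  std::uint64_t deferred_max = 0;

  // Mirrors BlueStoreThrottle::reset_throttle(): the deferred maximum always
  // covers submit-to-commit plus submit-to-deferred-complete, so a change to
  // either option moves both limits together.
  void reset_throttle(const Conf &conf) {
    bytes_max = conf.bluestore_throttle_bytes;
    deferred_max = conf.bluestore_throttle_bytes +
                   conf.bluestore_throttle_deferred_bytes;
  }
};

int main() {
  Conf conf;
  CoupledThrottle t;
  t.reset_throttle(conf);                     // construction-time values
  conf.bluestore_throttle_bytes = 256 << 20;  // runtime config change
  t.reset_throttle(conf);                     // handle_conf_change() path
  std::cout << t.bytes_max << " " << t.deferred_max << "\n";
}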