]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
os/bluestore: add blkin traces to BlueStore 35416/head
authorYang Honggang <rtlinux@163.com>
Fri, 19 Jun 2020 16:17:40 +0000 (00:17 +0800)
committerYang Honggang <rtlinux@163.com>
Sat, 20 Jun 2020 02:05:37 +0000 (10:05 +0800)
Signed-off-by: Yang Honggang <yanghonggang@kuaishou.com>
CMakeLists.txt
src/os/bluestore/BlueStore.cc
src/os/bluestore/BlueStore.h

index aef81c3e428c145deeb8b29b9da879b9f0e6eb8c..574888b18fae5b6a9c24a3e6c12ea2702d3ef24c 100644 (file)
@@ -344,6 +344,7 @@ endif()
 
 option(WITH_BLKIN "Use blkin to emit LTTng tracepoints for Zipkin" OFF)
 if(WITH_BLKIN)
+  find_package(LTTngUST REQUIRED)
   set(BLKIN_LIBRARIES blkin ${LTTNGUST_LIBRARIES} lttng-ust-fork)
   include_directories(SYSTEM src/blkin/blkin-lib)
 endif(WITH_BLKIN)
index 7901a285822070a2335118ea691b3f655ba66835..4b7850b2e4c409c1c25d2bade2f6742540d99d5d 100644 (file)
@@ -11126,10 +11126,21 @@ void BlueStore::get_db_statistics(Formatter *f)
 
 BlueStore::TransContext *BlueStore::_txc_create(
   Collection *c, OpSequencer *osr,
-  list<Context*> *on_commits)
+  list<Context*> *on_commits,
+  TrackedOpRef osd_op)
 {
   TransContext *txc = new TransContext(cct, c, osr, on_commits);
   txc->t = db->get_transaction();
+
+#ifdef WITH_BLKIN
+  if (osd_op && osd_op->pg_trace) {
+    txc->trace.init("TransContext", &trace_endpoint,
+                    &osd_op->pg_trace);
+    txc->trace.event("txc create");
+    txc->trace.keyval("txc seq", txc->seq);
+  }
+#endif
+
   osr->queue_new(txc);
   dout(20) << __func__ << " osr " << osr << " = " << txc
           << " seq " << txc->seq << dendl;
@@ -11186,11 +11197,16 @@ void BlueStore::_txc_state_proc(TransContext *txc)
   while (true) {
     dout(10) << __func__ << " txc " << txc
             << " " << txc->get_state_name() << dendl;
-    switch (txc->state) {
+    switch (txc->get_state()) {
     case TransContext::STATE_PREPARE:
       throttle.log_state_latency(*txc, logger, l_bluestore_state_prepare_lat);
       if (txc->ioc.has_pending_aios()) {
-       txc->state = TransContext::STATE_AIO_WAIT;
+       txc->set_state(TransContext::STATE_AIO_WAIT);
+#ifdef WITH_BLKIN
+        if (txc->trace) {
+          txc->trace.keyval("pending aios", txc->ioc.num_pending.load());
+        }
+#endif
        txc->had_ios = true;
        _txc_aio_submit(txc);
        return;
@@ -11217,7 +11233,7 @@ void BlueStore::_txc_state_proc(TransContext *txc)
        ++txc->osr->txc_with_unstable_io;
       }
       throttle.log_state_latency(*txc, logger, l_bluestore_state_io_done_lat);
-      txc->state = TransContext::STATE_KV_QUEUED;
+      txc->set_state(TransContext::STATE_KV_QUEUED);
       if (cct->_conf->bluestore_sync_submit_transaction) {
        if (txc->last_nid >= nid_max ||
            txc->last_blobid >= blobid_max) {
@@ -11250,7 +11266,7 @@ void BlueStore::_txc_state_proc(TransContext *txc)
          kv_sync_in_progress = true;
          kv_cond.notify_one();
        }
-       if (txc->state != TransContext::STATE_KV_SUBMITTED) {
+       if (txc->get_state() != TransContext::STATE_KV_SUBMITTED) {
          kv_queue_unsubmitted.push_back(txc);
          ++txc->osr->kv_committing_serially;
        }
@@ -11266,16 +11282,16 @@ void BlueStore::_txc_state_proc(TransContext *txc)
     case TransContext::STATE_KV_DONE:
       throttle.log_state_latency(*txc, logger, l_bluestore_state_kv_done_lat);
       if (txc->deferred_txn) {
-       txc->state = TransContext::STATE_DEFERRED_QUEUED;
+       txc->set_state(TransContext::STATE_DEFERRED_QUEUED);
        _deferred_queue(txc);
        return;
       }
-      txc->state = TransContext::STATE_FINISHING;
+      txc->set_state(TransContext::STATE_FINISHING);
       break;
 
     case TransContext::STATE_DEFERRED_CLEANUP:
       throttle.log_state_latency(*txc, logger, l_bluestore_state_deferred_cleanup_lat);
-      txc->state = TransContext::STATE_FINISHING;
+      txc->set_state(TransContext::STATE_FINISHING);
       // ** fall-thru **
 
     case TransContext::STATE_FINISHING:
@@ -11303,17 +11319,17 @@ void BlueStore::_txc_finish_io(TransContext *txc)
 
   OpSequencer *osr = txc->osr.get();
   std::lock_guard l(osr->qlock);
-  txc->state = TransContext::STATE_IO_DONE;
+  txc->set_state(TransContext::STATE_IO_DONE);
   txc->ioc.release_running_aios();
   OpSequencer::q_list_t::iterator p = osr->q.iterator_to(*txc);
   while (p != osr->q.begin()) {
     --p;
-    if (p->state < TransContext::STATE_IO_DONE) {
+    if (p->get_state() < TransContext::STATE_IO_DONE) {
       dout(20) << __func__ << " " << txc << " blocked by " << &*p << " "
               << p->get_state_name() << dendl;
       return;
     }
-    if (p->state > TransContext::STATE_IO_DONE) {
+    if (p->get_state() > TransContext::STATE_IO_DONE) {
       ++p;
       break;
     }
@@ -11321,7 +11337,7 @@ void BlueStore::_txc_finish_io(TransContext *txc)
   do {
     _txc_state_proc(&*p++);
   } while (p != osr->q.end() &&
-          p->state == TransContext::STATE_IO_DONE);
+          p->get_state() == TransContext::STATE_IO_DONE);
 
   if (osr->kv_submitted_waiters) {
     osr->qcond.notify_all();
@@ -11435,15 +11451,21 @@ void BlueStore::_txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t)
 
 void BlueStore::_txc_apply_kv(TransContext *txc, bool sync_submit_transaction)
 {
-  ceph_assert(txc->state == TransContext::STATE_KV_QUEUED);
+  ceph_assert(txc->get_state() == TransContext::STATE_KV_QUEUED);
   {
 #if defined(WITH_LTTNG)
     auto start = mono_clock::now();
 #endif
 
+#ifdef WITH_BLKIN
+    if (txc->trace) {
+      txc->trace.event("db async submit");
+    }
+#endif
+
     int r = cct->_conf->bluestore_debug_omit_kv_commit ? 0 : db->submit_transaction(txc->t);
     ceph_assert(r == 0);
-    txc->state = TransContext::STATE_KV_SUBMITTED;
+    txc->set_state(TransContext::STATE_KV_SUBMITTED);
     if (txc->osr->kv_submitted_waiters) {
       std::lock_guard l(txc->osr->qlock);
       txc->osr->qcond.notify_all();
@@ -11480,7 +11502,7 @@ void BlueStore::_txc_committed_kv(TransContext *txc)
   throttle.complete_kv(*txc);
   {
     std::lock_guard l(txc->osr->qlock);
-    txc->state = TransContext::STATE_KV_DONE;
+    txc->set_state(TransContext::STATE_KV_DONE);
     if (txc->ch->commit_queue) {
       txc->ch->commit_queue->queue(txc->oncommits);
     } else {
@@ -11502,7 +11524,7 @@ void BlueStore::_txc_committed_kv(TransContext *txc)
 void BlueStore::_txc_finish(TransContext *txc)
 {
   dout(20) << __func__ << " " << txc << " onodes " << txc->onodes << dendl;
-  ceph_assert(txc->state == TransContext::STATE_FINISHING);
+  ceph_assert(txc->get_state() == TransContext::STATE_FINISHING);
 
   for (auto& sb : txc->shared_blobs_written) {
     sb->finish_write(txc->seq);
@@ -11520,19 +11542,19 @@ void BlueStore::_txc_finish(TransContext *txc)
   OpSequencer::q_list_t releasing_txc;
   {
     std::lock_guard l(osr->qlock);
-    txc->state = TransContext::STATE_DONE;
+    txc->set_state(TransContext::STATE_DONE);
     bool notify = false;
     while (!osr->q.empty()) {
       TransContext *txc = &osr->q.front();
       dout(20) << __func__ << "  txc " << txc << " " << txc->get_state_name()
               << dendl;
-      if (txc->state != TransContext::STATE_DONE) {
-       if (txc->state == TransContext::STATE_PREPARE &&
+      if (txc->get_state() != TransContext::STATE_DONE) {
+       if (txc->get_state() == TransContext::STATE_PREPARE &&
          deferred_aggressive) {
          // for _osr_drain_preceding()
           notify = true;
        }
-       if (txc->state == TransContext::STATE_DEFERRED_QUEUED &&
+       if (txc->get_state() == TransContext::STATE_DEFERRED_QUEUED &&
            osr->q.size() > g_conf()->bluestore_max_deferred_txc) {
          submit_deferred = true;
        }
@@ -11917,11 +11939,11 @@ void BlueStore::_kv_sync_thread()
 
       for (auto txc : kv_committing) {
        throttle.log_state_latency(*txc, logger, l_bluestore_state_kv_queued_lat);
-       if (txc->state == TransContext::STATE_KV_QUEUED) {
+       if (txc->get_state() == TransContext::STATE_KV_QUEUED) {
          _txc_apply_kv(txc, false);
          --txc->osr->kv_committing_serially;
        } else {
-         ceph_assert(txc->state == TransContext::STATE_KV_SUBMITTED);
+         ceph_assert(txc->get_state() == TransContext::STATE_KV_SUBMITTED);
        }
        if (txc->had_ios) {
          --txc->osr->txc_with_unstable_io;
@@ -11962,6 +11984,15 @@ void BlueStore::_kv_sync_thread()
       int r = cct->_conf->bluestore_debug_omit_kv_commit ? 0 : db->submit_transaction_sync(synct);
       ceph_assert(r == 0);
 
+#ifdef WITH_BLKIN
+      for (auto txc : kv_committing) {
+        if (txc->trace) {
+          txc->trace.event("db sync submit");
+          txc->trace.keyval("kv_committing size", kv_committing.size());
+        }
+      }
+#endif
+
       int committing_size = kv_committing.size();
       int deferred_size = deferred_stable.size();
 
@@ -12104,7 +12135,7 @@ void BlueStore::_kv_finalize_thread()
 
       while (!kv_committed.empty()) {
        TransContext *txc = kv_committed.front();
-       ceph_assert(txc->state == TransContext::STATE_KV_SUBMITTED);
+       ceph_assert(txc->get_state() == TransContext::STATE_KV_SUBMITTED);
        _txc_state_proc(txc);
        kv_committed.pop_front();
       }
@@ -12310,7 +12341,7 @@ void BlueStore::_deferred_aio_finish(OpSequencer *osr)
       for (auto& i : b->txcs) {
        TransContext *txc = &i;
        throttle.log_state_latency(*txc, logger, l_bluestore_state_deferred_aio_wait_lat);
-       txc->state = TransContext::STATE_DEFERRED_CLEANUP;
+       txc->set_state(TransContext::STATE_DEFERRED_CLEANUP);
        costs += txc->cost;
       }
     }
@@ -12362,7 +12393,7 @@ int BlueStore::_deferred_replay()
     }
     TransContext *txc = _txc_create(ch.get(), osr,  nullptr);
     txc->deferred_txn = deferred_txn;
-    txc->state = TransContext::STATE_KV_DONE;
+    txc->set_state(TransContext::STATE_KV_DONE);
     _txc_state_proc(txc);
   }
  out:
@@ -12398,7 +12429,7 @@ int BlueStore::queue_transactions(
 
   // prepare
   TransContext *txc = _txc_create(static_cast<Collection*>(ch.get()), osr,
-                                 &on_commit);
+                                 &on_commit, op);
 
   // With HM-SMR drives (and ZNS SSDs) we want the I/O allocation and I/O
   // submission to happen atomically because if I/O submission happens in a
@@ -12428,6 +12459,13 @@ int BlueStore::queue_transactions(
   }
 
   _txc_finalize_kv(txc, txc->t);
+
+#ifdef WITH_BLKIN
+  if (txc->trace) {
+    txc->trace.event("txc encode finished");
+  }
+#endif
+
   if (handle)
     handle->suspend_tp_timeout();
 
@@ -12479,6 +12517,12 @@ int BlueStore::queue_transactions(
     }
   }
 
+#ifdef WITH_BLKIN
+  if (txc->trace) {
+    txc->trace.event("txc applied");
+  }
+#endif
+
   log_latency("submit_transact",
     l_bluestore_submit_lat,
     mono_clock::now() - start,
index 95c55a04ff119a13785a96154770c7e6039bdcd3..d67c05709173a4debd5df079b8e4b3f57ddb2f3a 100644 (file)
 #include "BlueFS.h"
 #include "common/EventTrace.h"
 
+#ifdef WITH_BLKIN
+#include "common/zipkin_trace.h"
+#endif
+
 class Allocator;
 class FreelistManager;
 class BlueStoreRepairer;
@@ -1508,8 +1512,6 @@ public:
       STATE_DONE,
     } state_t;
 
-    state_t state = STATE_PREPARE;
-
     const char *get_state_name() {
       switch (state) {
       case STATE_PREPARE: return "prepare";
@@ -1545,6 +1547,18 @@ public:
     }
 #endif
 
+    inline void set_state(state_t s) {
+       state = s;
+#ifdef WITH_BLKIN
+       if (trace) {
+         trace.event(get_state_name());
+       } 
+#endif
+    }
+    inline state_t get_state() {
+      return state;
+    }
+
     CollectionRef ch;
     OpSequencerRef osr;  // this should be ch->osr
     boost::intrusive::list_member_hook<> sequencer_item;
@@ -1581,6 +1595,10 @@ public:
     bool tracing = false;
 #endif
 
+#ifdef WITH_BLKIN
+    ZTracer::Trace trace;
+#endif
+
     explicit TransContext(CephContext* cct, Collection *c, OpSequencer *o,
                          std::list<Context*> *on_commits)
       : ch(c),
@@ -1593,6 +1611,11 @@ public:
       }
     }
     ~TransContext() {
+#ifdef WITH_BLKIN
+      if (trace) {
+        trace.event("txc destruct");
+      }
+#endif
       delete deferred_txn;
     }
 
@@ -1619,6 +1642,8 @@ public:
     void aio_finish(BlueStore *store) override {
       store->txc_aio_finish(this);
     }
+  private:
+    state_t state = STATE_PREPARE;
   };
 
   class BlueStoreThrottle {
@@ -1822,7 +1847,7 @@ public:
       // caller must hold qlock & q.empty() must not empty
       ceph_assert(!q.empty());
       TransContext *txc = &q.back();
-      if (txc->state >= TransContext::STATE_KV_SUBMITTED) {
+      if (txc->get_state() >= TransContext::STATE_KV_SUBMITTED) {
        return true;
       }
       return false;
@@ -1858,7 +1883,7 @@ public:
        } else {
          auto it = q.rbegin();
          it++;
-         if (it->state >= TransContext::STATE_KV_SUBMITTED) {
+         if (it->get_state() >= TransContext::STATE_KV_SUBMITTED) {
            --kv_submitted_waiters;
            return;
           }
@@ -1874,7 +1899,7 @@ public:
        return true;
       }
       TransContext *txc = &q.back();
-      if (txc->state >= TransContext::STATE_KV_DONE) {
+      if (txc->get_state() >= TransContext::STATE_KV_DONE) {
        return true;
       }
       txc->oncommits.push_back(c);
@@ -2237,6 +2262,10 @@ private:
     void _resize_shards(bool interval_stats);
   } mempool_thread;
 
+#ifdef WITH_BLKIN
+  ZTracer::Endpoint trace_endpoint {"0.0.0.0", 0, "BlueStore"};
+#endif
+
   // --------------------------------------------------------
   // private methods
 
@@ -2348,7 +2377,8 @@ private:
   friend void _dump_transaction(CephContext *cct, Transaction *t);
 
   TransContext *_txc_create(Collection *c, OpSequencer *osr,
-                           std::list<Context*> *on_commits);
+                           std::list<Context*> *on_commits,
+                           TrackedOpRef osd_op=TrackedOpRef());
   void _txc_update_store_statfs(TransContext *txc);
   void _txc_add_transaction(TransContext *txc, Transaction *t);
   void _txc_calc_cost(TransContext *txc);