BlueStore::TransContext *BlueStore::_txc_create(
Collection *c, OpSequencer *osr,
- list<Context*> *on_commits)
+ list<Context*> *on_commits,
+ TrackedOpRef osd_op)
{
TransContext *txc = new TransContext(cct, c, osr, on_commits);
txc->t = db->get_transaction();
+
+#ifdef WITH_BLKIN
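+  // start a child span only when the originating OSD op already carries a
+  // pg_trace, so tracing stays off unless it was enabled upstream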
+ if (osd_op && osd_op->pg_trace) {
+ txc->trace.init("TransContext", &trace_endpoint,
+ &osd_op->pg_trace);
+ txc->trace.event("txc create");
+ txc->trace.keyval("txc seq", txc->seq);
+ }
+#endif
+
osr->queue_new(txc);
dout(20) << __func__ << " osr " << osr << " = " << txc
<< " seq " << txc->seq << dendl;
while (true) {
dout(10) << __func__ << " txc " << txc
<< " " << txc->get_state_name() << dendl;
- switch (txc->state) {
+ switch (txc->get_state()) {
case TransContext::STATE_PREPARE:
throttle.log_state_latency(*txc, logger, l_bluestore_state_prepare_lat);
if (txc->ioc.has_pending_aios()) {
- txc->state = TransContext::STATE_AIO_WAIT;
+ txc->set_state(TransContext::STATE_AIO_WAIT);
+#ifdef WITH_BLKIN
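+      // record how many pending aios this txc is about to wait on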
+ if (txc->trace) {
+ txc->trace.keyval("pending aios", txc->ioc.num_pending.load());
+ }
+#endif
txc->had_ios = true;
_txc_aio_submit(txc);
return;
++txc->osr->txc_with_unstable_io;
}
throttle.log_state_latency(*txc, logger, l_bluestore_state_io_done_lat);
- txc->state = TransContext::STATE_KV_QUEUED;
+ txc->set_state(TransContext::STATE_KV_QUEUED);
if (cct->_conf->bluestore_sync_submit_transaction) {
if (txc->last_nid >= nid_max ||
txc->last_blobid >= blobid_max) {
kv_sync_in_progress = true;
kv_cond.notify_one();
}
- if (txc->state != TransContext::STATE_KV_SUBMITTED) {
+ if (txc->get_state() != TransContext::STATE_KV_SUBMITTED) {
kv_queue_unsubmitted.push_back(txc);
++txc->osr->kv_committing_serially;
}
case TransContext::STATE_KV_DONE:
throttle.log_state_latency(*txc, logger, l_bluestore_state_kv_done_lat);
if (txc->deferred_txn) {
- txc->state = TransContext::STATE_DEFERRED_QUEUED;
+ txc->set_state(TransContext::STATE_DEFERRED_QUEUED);
_deferred_queue(txc);
return;
}
- txc->state = TransContext::STATE_FINISHING;
+ txc->set_state(TransContext::STATE_FINISHING);
break;
case TransContext::STATE_DEFERRED_CLEANUP:
throttle.log_state_latency(*txc, logger, l_bluestore_state_deferred_cleanup_lat);
- txc->state = TransContext::STATE_FINISHING;
+ txc->set_state(TransContext::STATE_FINISHING);
// ** fall-thru **
case TransContext::STATE_FINISHING:
OpSequencer *osr = txc->osr.get();
std::lock_guard l(osr->qlock);
- txc->state = TransContext::STATE_IO_DONE;
+ txc->set_state(TransContext::STATE_IO_DONE);
txc->ioc.release_running_aios();
OpSequencer::q_list_t::iterator p = osr->q.iterator_to(*txc);
while (p != osr->q.begin()) {
--p;
- if (p->state < TransContext::STATE_IO_DONE) {
+ if (p->get_state() < TransContext::STATE_IO_DONE) {
dout(20) << __func__ << " " << txc << " blocked by " << &*p << " "
<< p->get_state_name() << dendl;
return;
}
- if (p->state > TransContext::STATE_IO_DONE) {
+ if (p->get_state() > TransContext::STATE_IO_DONE) {
++p;
break;
}
do {
_txc_state_proc(&*p++);
} while (p != osr->q.end() &&
- p->state == TransContext::STATE_IO_DONE);
+ p->get_state() == TransContext::STATE_IO_DONE);
if (osr->kv_submitted_waiters) {
osr->qcond.notify_all();
void BlueStore::_txc_apply_kv(TransContext *txc, bool sync_submit_transaction)
{
- ceph_assert(txc->state == TransContext::STATE_KV_QUEUED);
+ ceph_assert(txc->get_state() == TransContext::STATE_KV_QUEUED);
{
#if defined(WITH_LTTNG)
auto start = mono_clock::now();
#endif
+#ifdef WITH_BLKIN
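+    // mark the point where this txc's transaction is submitted to the db (async path)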
+ if (txc->trace) {
+ txc->trace.event("db async submit");
+ }
+#endif
+
int r = cct->_conf->bluestore_debug_omit_kv_commit ? 0 : db->submit_transaction(txc->t);
ceph_assert(r == 0);
- txc->state = TransContext::STATE_KV_SUBMITTED;
+ txc->set_state(TransContext::STATE_KV_SUBMITTED);
if (txc->osr->kv_submitted_waiters) {
std::lock_guard l(txc->osr->qlock);
txc->osr->qcond.notify_all();
throttle.complete_kv(*txc);
{
std::lock_guard l(txc->osr->qlock);
- txc->state = TransContext::STATE_KV_DONE;
+ txc->set_state(TransContext::STATE_KV_DONE);
if (txc->ch->commit_queue) {
txc->ch->commit_queue->queue(txc->oncommits);
} else {
void BlueStore::_txc_finish(TransContext *txc)
{
dout(20) << __func__ << " " << txc << " onodes " << txc->onodes << dendl;
- ceph_assert(txc->state == TransContext::STATE_FINISHING);
+ ceph_assert(txc->get_state() == TransContext::STATE_FINISHING);
for (auto& sb : txc->shared_blobs_written) {
sb->finish_write(txc->seq);
OpSequencer::q_list_t releasing_txc;
{
std::lock_guard l(osr->qlock);
- txc->state = TransContext::STATE_DONE;
+ txc->set_state(TransContext::STATE_DONE);
bool notify = false;
while (!osr->q.empty()) {
TransContext *txc = &osr->q.front();
dout(20) << __func__ << " txc " << txc << " " << txc->get_state_name()
<< dendl;
- if (txc->state != TransContext::STATE_DONE) {
- if (txc->state == TransContext::STATE_PREPARE &&
+ if (txc->get_state() != TransContext::STATE_DONE) {
+ if (txc->get_state() == TransContext::STATE_PREPARE &&
deferred_aggressive) {
// for _osr_drain_preceding()
notify = true;
}
- if (txc->state == TransContext::STATE_DEFERRED_QUEUED &&
+ if (txc->get_state() == TransContext::STATE_DEFERRED_QUEUED &&
osr->q.size() > g_conf()->bluestore_max_deferred_txc) {
submit_deferred = true;
}
for (auto txc : kv_committing) {
throttle.log_state_latency(*txc, logger, l_bluestore_state_kv_queued_lat);
- if (txc->state == TransContext::STATE_KV_QUEUED) {
+ if (txc->get_state() == TransContext::STATE_KV_QUEUED) {
_txc_apply_kv(txc, false);
--txc->osr->kv_committing_serially;
} else {
- ceph_assert(txc->state == TransContext::STATE_KV_SUBMITTED);
+ ceph_assert(txc->get_state() == TransContext::STATE_KV_SUBMITTED);
}
if (txc->had_ios) {
--txc->osr->txc_with_unstable_io;
int r = cct->_conf->bluestore_debug_omit_kv_commit ? 0 : db->submit_transaction_sync(synct);
ceph_assert(r == 0);
+#ifdef WITH_BLKIN
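+  // annotate every txc batched into this flush with the synchronous kv submit point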
+ for (auto txc : kv_committing) {
+ if (txc->trace) {
+ txc->trace.event("db sync submit");
+ txc->trace.keyval("kv_committing size", kv_committing.size());
+ }
+ }
+#endif
+
int committing_size = kv_committing.size();
int deferred_size = deferred_stable.size();
while (!kv_committed.empty()) {
TransContext *txc = kv_committed.front();
- ceph_assert(txc->state == TransContext::STATE_KV_SUBMITTED);
+ ceph_assert(txc->get_state() == TransContext::STATE_KV_SUBMITTED);
_txc_state_proc(txc);
kv_committed.pop_front();
}
for (auto& i : b->txcs) {
TransContext *txc = &i;
throttle.log_state_latency(*txc, logger, l_bluestore_state_deferred_aio_wait_lat);
- txc->state = TransContext::STATE_DEFERRED_CLEANUP;
+ txc->set_state(TransContext::STATE_DEFERRED_CLEANUP);
costs += txc->cost;
}
}
}
TransContext *txc = _txc_create(ch.get(), osr, nullptr);
txc->deferred_txn = deferred_txn;
- txc->state = TransContext::STATE_KV_DONE;
+ txc->set_state(TransContext::STATE_KV_DONE);
_txc_state_proc(txc);
}
out:
// prepare
TransContext *txc = _txc_create(static_cast<Collection*>(ch.get()), osr,
- &on_commit);
+ &on_commit, op);
// With HM-SMR drives (and ZNS SSDs) we want the I/O allocation and I/O
// submission to happen atomically because if I/O submission happens in a
}
_txc_finalize_kv(txc, txc->t);
+
+#ifdef WITH_BLKIN
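+  // the txc has been finalized/encoded into its kv transaction; record the milestone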
+ if (txc->trace) {
+ txc->trace.event("txc encode finished");
+ }
+#endif
+
if (handle)
handle->suspend_tp_timeout();
}
}
+#ifdef WITH_BLKIN
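+  // note the point where the txc has been applied, just before submit latency is logged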
+ if (txc->trace) {
+ txc->trace.event("txc applied");
+ }
+#endif
+
log_latency("submit_transact",
l_bluestore_submit_lat,
mono_clock::now() - start,
#include "BlueFS.h"
#include "common/EventTrace.h"
+#ifdef WITH_BLKIN
+#include "common/zipkin_trace.h"
+#endif
+
class Allocator;
class FreelistManager;
class BlueStoreRepairer;
STATE_DONE,
} state_t;
- state_t state = STATE_PREPARE;
-
const char *get_state_name() {
switch (state) {
case STATE_PREPARE: return "prepare";
}
#endif
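+  // all state transitions funnel through set_state() so each one can be
+  // emitted as a trace event when blkin tracing is active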
+ inline void set_state(state_t s) {
+ state = s;
+#ifdef WITH_BLKIN
+ if (trace) {
+ trace.event(get_state_name());
+ }
+#endif
+ }
+ inline state_t get_state() {
+ return state;
+ }
+
CollectionRef ch;
OpSequencerRef osr; // this should be ch->osr
boost::intrusive::list_member_hook<> sequencer_item;
bool tracing = false;
#endif
+#ifdef WITH_BLKIN
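+  // per-txc zipkin span; default-constructed (evaluates false) unless
+  // initialized from the OSD op's pg_trace in _txc_create()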
+ ZTracer::Trace trace;
+#endif
+
explicit TransContext(CephContext* cct, Collection *c, OpSequencer *o,
std::list<Context*> *on_commits)
: ch(c),
}
}
~TransContext() {
+#ifdef WITH_BLKIN
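+    // final trace event marking destruction of the txc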
+ if (trace) {
+ trace.event("txc destruct");
+ }
+#endif
delete deferred_txn;
}
void aio_finish(BlueStore *store) override {
store->txc_aio_finish(this);
}
+ private:
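+    // kept private so callers must go through get_state()/set_state()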
+ state_t state = STATE_PREPARE;
};
class BlueStoreThrottle {
// caller must hold qlock; q must not be empty
ceph_assert(!q.empty());
TransContext *txc = &q.back();
- if (txc->state >= TransContext::STATE_KV_SUBMITTED) {
+ if (txc->get_state() >= TransContext::STATE_KV_SUBMITTED) {
return true;
}
return false;
} else {
auto it = q.rbegin();
it++;
- if (it->state >= TransContext::STATE_KV_SUBMITTED) {
+ if (it->get_state() >= TransContext::STATE_KV_SUBMITTED) {
--kv_submitted_waiters;
return;
}
return true;
}
TransContext *txc = &q.back();
- if (txc->state >= TransContext::STATE_KV_DONE) {
+ if (txc->get_state() >= TransContext::STATE_KV_DONE) {
return true;
}
txc->oncommits.push_back(c);
void _resize_shards(bool interval_stats);
} mempool_thread;
+#ifdef WITH_BLKIN
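+  // endpoint ("BlueStore") stamped on every span this store emits to the collector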
+ ZTracer::Endpoint trace_endpoint {"0.0.0.0", 0, "BlueStore"};
+#endif
+
// --------------------------------------------------------
// private methods
friend void _dump_transaction(CephContext *cct, Transaction *t);
TransContext *_txc_create(Collection *c, OpSequencer *osr,
- std::list<Context*> *on_commits);
+ std::list<Context*> *on_commits,
+ TrackedOpRef osd_op=TrackedOpRef());
void _txc_update_store_statfs(TransContext *txc);
void _txc_add_transaction(TransContext *txc, Transaction *t);
void _txc_calc_cost(TransContext *txc);