}
);
+ std::pair<txn_stage_t, sm::label_instance> labels_by_stage[] = {
+ {txn_stage_t::COLLOCK_WAIT, sm::label_instance("stage", "collock_wait")},
+ {txn_stage_t::THROTTLER_WAIT, sm::label_instance("stage", "throttler_wait")},
+ {txn_stage_t::BUILD, sm::label_instance("stage", "build")},
+ {txn_stage_t::SUBMIT, sm::label_instance("stage", "submit")},
+ };
+ for (auto& [stage, label] : labels_by_stage) {
+ auto idx = static_cast<std::size_t>(stage);
+ auto& hist = stats.stage_lat[idx];
+ hist.buckets.resize(STAGE_LAT_BUCKETS_US.size());
+ for (std::size_t i = 0; i < STAGE_LAT_BUCKETS_US.size(); ++i) {
+ hist.buckets[i].upper_bound = STAGE_LAT_BUCKETS_US[i];
+ hist.buckets[i].count = 0;
+ }
+ metrics.add_group(
+ "seastore",
+ {
+ sm::make_histogram(
+ "do_transaction_stage_lat",
+ [this, idx]() -> seastar::metrics::histogram& {
+ return stats.stage_lat[idx];
+ },
+ sm::description("per-stage latency (microseconds) of do_transaction"),
+ {label, sm::label_instance("shard_store_index", std::to_string(store_index))}
+ )
+ }
+ );
+ }
+
metrics.add_group(
"seastore",
{
--(shard_stats.starting_io_num);
++(shard_stats.waiting_collock_io_num);
+ auto t_pre_collock = std::chrono::steady_clock::now();
co_await ctx.transaction->get_handle().take_collection_lock(
static_cast<SeastoreCollection&>(*(ctx.ch)).ordering_lock
);
+ auto collock_wait = std::chrono::steady_clock::now() - t_pre_collock;
assert(shard_stats.waiting_collock_io_num);
--(shard_stats.waiting_collock_io_num);
++(shard_stats.waiting_throttler_io_num);
+ auto t_pre_throttler = std::chrono::steady_clock::now();
co_await throttler.get(1);
+ auto throttler_wait = std::chrono::steady_clock::now() - t_pre_throttler;
assert(shard_stats.waiting_throttler_io_num);
--(shard_stats.waiting_throttler_io_num);
const size_t total_ops = ctx.ext_transaction.get_num_ops();
size_t current_op = 0;
+ auto build_start = std::chrono::steady_clock::now();
while (ctx.iter.have_op()) {
current_op++;
co_await _do_transaction_step(
ctx, ctx.ch, onodes, ctx.iter);
}
+ ctx.build_time += std::chrono::steady_clock::now() - build_start;
DEBUGT("completed all {} ops for cid={}",
t, total_ops, ctx.ch->get_cid());
+ auto submit_start = std::chrono::steady_clock::now();
co_await transaction_manager->submit_transaction(*ctx.transaction);
+ ctx.submit_time += std::chrono::steady_clock::now() - submit_start;
})
).handle_error(
crimson::ct_error::all_same_way([FNAME, &ctx](auto e) {
DEBUGT("done", *ctx.transaction);
add_conflict_replay_sample(ctx.transaction->get_num_replays());
+ add_stage_latency_sample(txn_stage_t::COLLOCK_WAIT, collock_wait);
+ add_stage_latency_sample(txn_stage_t::THROTTLER_WAIT, throttler_wait);
+ add_stage_latency_sample(txn_stage_t::BUILD, ctx.build_time);
+ add_stage_latency_sample(txn_stage_t::SUBMIT, ctx.submit_time);
add_latency_sample(
op_type_t::DO_TRANSACTION,
std::chrono::steady_clock::now() - ctx.begin_timestamp);
MAX
};
+enum class txn_stage_t : uint8_t { // do_transaction stages that each get their own latency histogram; values double as array indices
+ COLLOCK_WAIT = 0, // waiting on the collection ordering_lock
+ THROTTLER_WAIT, // waiting for a throttler slot
+ BUILD, // building the transaction (_do_transaction_step loop)
+ SUBMIT, // submit_transaction (pipeline + journal write)
+ MAX // sentinel: number of stages, used to size per-stage arrays; not a stage itself
+};
+
class SeastoreCollection final : public FuturizedCollection {
public:
template <typename... T>
ceph::os::Transaction::iterator iter;
std::chrono::steady_clock::time_point begin_timestamp = std::chrono::steady_clock::now();
+ std::chrono::steady_clock::duration build_time{0};
+ std::chrono::steady_clock::duration submit_time{0};
+
void reset_preserve_handle(TransactionManager &tm) {
tm.reset_transaction_preserve_handle(*transaction);
iter = ext_transaction.begin();
// Buckets for the per-transaction conflict/replay distribution.
static constexpr std::size_t REPLAY_BUCKETS = 16;
+ static constexpr auto STAGE_MAX = static_cast<std::size_t>(txn_stage_t::MAX); // number of txn_stage_t stages; sizes stats.stage_lat
+ // Inclusive upper bounds (microseconds, ascending) of the per-stage do_transaction latency histogram buckets; a sample above the last bound matches no bucket
+ static constexpr std::array<uint64_t, 14> STAGE_LAT_BUCKETS_US = {
+ 250, 500, 1000, 1500, 2000, 3000, 5000, 7500,
+ 10000, 15000, 20000, 30000, 50000, 100000
+ };
+
struct { // backing storage for the histograms updated by the add_*_sample helpers
std::array<seastar::metrics::histogram, LAT_MAX> op_lat; // presumably one latency histogram per op type (see get_latency) — TODO confirm
seastar::metrics::histogram conflict_replays; // per-transaction conflict/replay distribution
+ std::array<seastar::metrics::histogram, STAGE_MAX> stage_lat; // do_transaction stage latencies, indexed by static_cast<std::size_t>(txn_stage_t)
} stats;
seastar::metrics::histogram& get_latency(
hist.sample_sum += num_replays;
}
+ // Account one do_transaction stage duration, in whole microseconds, in that
+ // stage's histogram. Exactly one bucket is bumped: the first whose
+ // upper_bound is >= the sample (counts are per-bucket, not cumulative). A
+ // sample above the top bound touches no bucket, yet it is still reflected in
+ // sample_count and sample_sum.
+ void add_stage_latency_sample(txn_stage_t stage,
+     std::chrono::steady_clock::duration dur) {
+   auto& stage_hist = stats.stage_lat[static_cast<std::size_t>(stage)];
+   // The buckets are only laid out by register_metrics(); when that did not
+   // run (store inactive) there is nothing to record into.
+   if (!stage_hist.buckets.empty()) {
+     const auto micros =
+       std::chrono::duration_cast<std::chrono::microseconds>(dur).count();
+     const auto micros_as_double = static_cast<double>(micros);
+     // Walk the ascending bounds until one is large enough to hold the sample.
+     auto slot = stage_hist.buckets.begin();
+     const auto last = stage_hist.buckets.end();
+     while (slot != last && !(micros_as_double <= slot->upper_bound)) {
+       ++slot;
+     }
+     if (slot != last) {
+       ++slot->count;
+     }
+     ++stage_hist.sample_count;
+     stage_hist.sample_sum += micros;
+   }
+ }
+
/*
* omaptree interfaces
*/