);
std::pair<txn_stage_t, sm::label_instance> labels_by_stage[] = {
- {txn_stage_t::COLLOCK_WAIT, sm::label_instance("stage", "collock_wait")},
- {txn_stage_t::THROTTLER_WAIT, sm::label_instance("stage", "throttler_wait")},
- {txn_stage_t::BUILD, sm::label_instance("stage", "build")},
- {txn_stage_t::SUBMIT, sm::label_instance("stage", "submit")},
+ {txn_stage_t::COLLOCK_WAIT, sm::label_instance("stage", "collock_wait")},
+ {txn_stage_t::THROTTLER_WAIT, sm::label_instance("stage", "throttler_wait")},
+ {txn_stage_t::BUILD, sm::label_instance("stage", "build")},
+ {txn_stage_t::SUBMIT_TOTAL, sm::label_instance("stage", "submit_total")},
+ {txn_stage_t::SUBMIT_RESERVE, sm::label_instance("stage", "submit_reserve")},
+ {txn_stage_t::SUBMIT_OOL_WRITE, sm::label_instance("stage", "submit_ool_write")},
+ {txn_stage_t::SUBMIT_LBA_UPDATE, sm::label_instance("stage", "submit_lba_update")},
+ {txn_stage_t::SUBMIT_PREPARE_ENTER, sm::label_instance("stage", "submit_prepare_enter")},
+ {txn_stage_t::SUBMIT_PREPARE_RECORD, sm::label_instance("stage", "submit_prepare_record")},
+ {txn_stage_t::SUBMIT_JOURNAL, sm::label_instance("stage", "submit_journal")},
};
for (auto& [stage, label] : labels_by_stage) {
auto idx = static_cast<std::size_t>(stage);
add_stage_latency_sample(txn_stage_t::COLLOCK_WAIT, collock_wait);
add_stage_latency_sample(txn_stage_t::THROTTLER_WAIT, throttler_wait);
add_stage_latency_sample(txn_stage_t::BUILD, ctx.build_time);
- add_stage_latency_sample(txn_stage_t::SUBMIT, ctx.submit_time);
+ add_stage_latency_sample(txn_stage_t::SUBMIT_TOTAL, ctx.submit_time);
+ {
+ auto& pd = ctx.transaction->get_phase_durations();
+ add_stage_latency_sample(txn_stage_t::SUBMIT_RESERVE, pd.reserve);
+ add_stage_latency_sample(txn_stage_t::SUBMIT_OOL_WRITE, pd.ool_write);
+ add_stage_latency_sample(txn_stage_t::SUBMIT_LBA_UPDATE, pd.lba_update);
+ add_stage_latency_sample(txn_stage_t::SUBMIT_PREPARE_ENTER, pd.prepare_enter);
+ add_stage_latency_sample(txn_stage_t::SUBMIT_PREPARE_RECORD, pd.prepare_record);
+ add_stage_latency_sample(txn_stage_t::SUBMIT_JOURNAL, pd.journal);
+ }
add_latency_sample(
op_type_t::DO_TRANSACTION,
std::chrono::steady_clock::now() - ctx.begin_timestamp);
COLLOCK_WAIT = 0, // waiting on the collection ordering_lock
THROTTLER_WAIT, // waiting for a throttler slot
BUILD, // building the transaction (_do_transaction_step loop)
- SUBMIT, // submit_transaction (pipeline + journal write)
+ SUBMIT_TOTAL, // the whole submit_transaction (pipeline + journal write)
+ // Sub-phases of submit_transaction:
+ SUBMIT_RESERVE, // enter(reserve_projected_usage) + epm reserve_projected_usage
+ SUBMIT_OOL_WRITE, // write_delayed + write_preallocated OOL extents (device I/O)
+ SUBMIT_LBA_UPDATE, // update_lba_mappings
+ SUBMIT_PREPARE_ENTER, // enter(prepare) pipeline stage (global OrderedExclusive wait)
+ SUBMIT_PREPARE_RECORD, // prepare_record (record encoding)
+ SUBMIT_JOURNAL, // journal->submit_record -- POST-lock (not part of the hold)
MAX
};
#pragma once
+#include <chrono>
#include <iostream>
#include <boost/intrusive/list.hpp>
return num_replays;
}
+ // Time spent in each sub-phase of submit_transaction, accumulated across retries
+ struct phase_durations_t {
+ std::chrono::steady_clock::duration reserve{0}; // enter reserve + epm reserve
+ std::chrono::steady_clock::duration ool_write{0}; // delayed + preallocated OOL writes
+ std::chrono::steady_clock::duration lba_update{0}; // update_lba_mappings
+ std::chrono::steady_clock::duration prepare_enter{0}; // enter(prepare) pipeline stage
+ std::chrono::steady_clock::duration prepare_record{0}; // prepare_record
+ std::chrono::steady_clock::duration journal{0}; // journal->submit_record (post-lock)
+ };
+ phase_durations_t &get_phase_durations() {
+ return phase_durations;
+ }
+
auto &get_handle() {
return handle;
}
std::size_t num_replays = 0;
+ phase_durations_t phase_durations;
+
bool has_reset = false;
OrderingHandle handle;
{
LOG_PREFIX(TransactionManager::submit_transaction);
SUBDEBUGT(seastore_t, "start, entering reserve_projected_usage", t);
+ auto reserve_start = std::chrono::steady_clock::now();
co_await trans_intr::make_interruptible(
t.get_handle().enter(write_pipeline.reserve_projected_usage)
);
projected_usage
)
);
+ t.get_phase_durations().reserve +=
+ std::chrono::steady_clock::now() - reserve_start;
auto release_usage = seastar::defer([this, FNAME, projected_usage, &t] {
SUBTRACET(seastore_t, "releasing projected_usage: {}", t, projected_usage);
epm->release_projected_usage(projected_usage);
);
SUBTRACET(seastore_t, "write delayed ool extents", tref);
+ auto ool_start = std::chrono::steady_clock::now();
co_await epm->write_delayed_ool_extents(
tref, dispatch_result.alloc_map
);
+ tref.get_phase_durations().ool_write +=
+ std::chrono::steady_clock::now() - ool_start;
auto allocated_extents = tref.get_valid_pre_alloc_list();
+ auto lba_start = std::chrono::steady_clock::now();
co_await update_lba_mappings(tref, allocated_extents);
+ tref.get_phase_durations().lba_update +=
+ std::chrono::steady_clock::now() - lba_start;
auto num_extents = allocated_extents.size();
SUBTRACET(seastore_t, "process {} allocated extents", tref, num_extents);
+ ool_start = std::chrono::steady_clock::now();
co_await epm->write_preallocated_ool_extents(tref, allocated_extents);
+ tref.get_phase_durations().ool_write +=
+ std::chrono::steady_clock::now() - ool_start;
SUBTRACET(seastore_t, "entering prepare", tref);
+ auto prepare_enter_start = std::chrono::steady_clock::now();
co_await trans_intr::make_interruptible(
tref.get_handle().enter(write_pipeline.prepare)
);
+ tref.get_phase_durations().prepare_enter +=
+ std::chrono::steady_clock::now() - prepare_enter_start;
while (tref.need_wait_visibility) {
co_await trans_intr::make_interruptible(seastar::yield());
cache->trim_backref_bufs(*trim_alloc_to);
}
+ auto prepare_record_start = std::chrono::steady_clock::now();
auto record = cache->prepare_record(
tref,
journal->get_trimmer().get_journal_head(),
journal->get_trimmer().get_dirty_tail());
+ tref.get_phase_durations().prepare_record +=
+ std::chrono::steady_clock::now() - prepare_record_start;
tref.get_handle().maybe_release_collection_lock();
if (tref.get_src() == Transaction::src_t::MUTATE) {
}
SUBTRACET(seastore_t, "submitting record", tref);
+ auto journal_start = std::chrono::steady_clock::now();
co_await journal->submit_record(
std::move(record),
tref.get_handle(),
submit_transaction_iertr::pass_further{},
crimson::ct_error::assert_all("Hit error submitting to journal")
);
+ tref.get_phase_durations().journal +=
+ std::chrono::steady_clock::now() - journal_start;
co_await trans_intr::make_interruptible(
tref.get_handle().complete()