const delta_info_t &delta,
const journal_seq_t &dirty_tail,
const journal_seq_t &alloc_tail,
- sea_time_point &modify_time)
+ sea_time_point modify_time)
{
LOG_PREFIX(Cache::replay_delta);
assert(dirty_tail != JOURNAL_SEQ_NULL);
if (delta.type == extent_types_t::JOURNAL_TAIL) {
// this delta should have been dealt with during segment cleaner mounting
- return replay_delta_ertr::now();
+ return replay_delta_ertr::make_ready_future<bool>(false);
}
// replay alloc
if (journal_seq < alloc_tail) {
DEBUG("journal_seq {} < alloc_tail {}, don't replay {}",
journal_seq, alloc_tail, delta);
- return replay_delta_ertr::now();
+ return replay_delta_ertr::make_ready_future<bool>(false);
}
alloc_delta_t alloc_delta;
if (!backref_list.empty()) {
backref_batch_update(std::move(backref_list), journal_seq);
}
- return replay_delta_ertr::now();
+ return replay_delta_ertr::make_ready_future<bool>(true);
}
// replay dirty
if (journal_seq < dirty_tail) {
DEBUG("journal_seq {} < dirty_tail {}, don't replay {}",
journal_seq, dirty_tail, delta);
- return replay_delta_ertr::now();
+ return replay_delta_ertr::make_ready_future<bool>(false);
}
if (delta.type == extent_types_t::ROOT) {
journal_seq, record_base, delta, *root);
root->set_modify_time(modify_time);
add_extent(root);
- return replay_delta_ertr::now();
+ return replay_delta_ertr::make_ready_future<bool>(true);
} else {
auto _get_extent_if_cached = [this](paddr_t addr)
-> get_extent_ertr::future<CachedExtentRef> {
DEBUG("replay extent is not present, so delta is obsolete at {} {} -- {}",
journal_seq, record_base, delta);
assert(delta.pversion > 0);
- return;
+ return replay_delta_ertr::make_ready_future<bool>(true);
}
DEBUG("replay extent delta at {} {} ... -- {}, prv_extent={}",
journal_seq, record_base, delta, *extent);
}
mark_dirty(extent);
+ return replay_delta_ertr::make_ready_future<bool>(true);
});
}
}
* Intended for use in Journal::delta. For each delta, should decode delta,
* read relevant block from disk or cache (using correct type), and call
* CachedExtent::apply_delta marking the extent dirty.
+ *
+ * Returns whether the delta is applied.
*/
using replay_delta_ertr = crimson::errorator<
crimson::ct_error::input_output_error>;
- using replay_delta_ret = replay_delta_ertr::future<>;
+ using replay_delta_ret = replay_delta_ertr::future<bool>;
replay_delta_ret replay_delta(
journal_seq_t seq,
paddr_t record_block_base,
const delta_info_t &delta,
const journal_seq_t &dirty_tail,
const journal_seq_t &alloc_tail,
- sea_time_point &modify_time);
+ sea_time_point modify_time);
/**
* init_cached_extents
crimson::ct_error::erange>;
using replay_ret = replay_ertr::future<>;
using delta_handler_t = std::function<
- replay_ret(const record_locator_t&,
- const delta_info_t&,
- const journal_seq_t&, // dirty_tail
- const journal_seq_t&, // alloc_tail
- sea_time_point modify_time)>;
+ replay_ertr::future<bool>(
+ const record_locator_t&,
+ const delta_info_t&,
+ const journal_seq_t&, // dirty_tail
+ const journal_seq_t&, // alloc_tail
+ sea_time_point modify_time)>;
virtual replay_ret replay(
delta_handler_t &&delta_handler) = 0;
delta,
locator.write_result.start_seq,
locator.write_result.start_seq,
- modify_time);
+ modify_time).discard_result();
});
}).safe_then([]() {
return replay_ertr::make_ready_future<
SegmentedJournal::replay_segment(
journal_seq_t seq,
segment_header_t header,
- delta_handler_t &handler)
+ delta_handler_t &handler,
+ replay_stats_t &stats)
{
LOG_PREFIX(Journal::replay_segment);
INFO("starting at {} -- {}", seq, header);
return seastar::do_with(
scan_valid_records_cursor(seq),
SegmentManagerGroup::found_record_handler_t(
- [s_type=header.type, &handler, this](
+ [s_type=header.type, &handler, this, &stats](
record_locator_t locator,
const record_group_header_t& header,
const bufferlist& mdbuf)
-> SegmentManagerGroup::scan_valid_records_ertr::future<>
{
LOG_PREFIX(Journal::replay_segment);
+ ++stats.num_record_groups;
auto maybe_record_deltas_list = try_decode_deltas(
header, mdbuf, locator.record_block_base);
if (!maybe_record_deltas_list) {
s_type,
this,
FNAME,
- &handler](auto& record_deltas_list)
+ &handler,
+ &stats](auto& record_deltas_list)
{
return crimson::do_for_each(
record_deltas_list,
s_type,
this,
FNAME,
- &handler](record_deltas_t& record_deltas)
+ &handler,
+ &stats](record_deltas_t& record_deltas)
{
+ ++stats.num_records;
auto locator = record_locator_t{
record_deltas.record_block_base,
write_result
s_type,
this,
FNAME,
- &handler](auto &p)
+ &handler,
+ &stats](auto &p)
{
auto& modify_time = p.first;
auto& delta = p.second;
delta,
segment_provider.get_dirty_tail(),
segment_provider.get_alloc_tail(),
- modify_time);
+ modify_time
+ ).safe_then([&stats, delta_type=delta.type](bool is_applied) {
+ if (is_applied) {
+ // see Cache::replay_delta()
+ assert(delta_type != extent_types_t::JOURNAL_TAIL);
+ if (delta_type == extent_types_t::ALLOC_INFO) {
+ ++stats.num_alloc_deltas;
+ } else {
+ ++stats.num_dirty_deltas;
+ }
+ }
+ });
});
});
});
(auto &&segment_headers) mutable -> replay_ret {
INFO("got {} segments", segment_headers.size());
return seastar::do_with(
- std::move(delta_handler), replay_segments_t(),
- [this, segment_headers=std::move(segment_headers)]
- (auto &handler, auto &segments) mutable -> replay_ret {
+ std::move(delta_handler),
+ replay_segments_t(),
+ replay_stats_t(),
+ [this, segment_headers=std::move(segment_headers), FNAME]
+ (auto &handler, auto &segments, auto &stats) mutable -> replay_ret {
return prep_replay_segments(std::move(segment_headers)
- ).safe_then([this, &handler, &segments](auto replay_segs) mutable {
+ ).safe_then([this, &handler, &segments, &stats](auto replay_segs) mutable {
segments = std::move(replay_segs);
- return crimson::do_for_each(segments, [this, &handler](auto i) mutable {
- return replay_segment(i.first, i.second, handler);
+ return crimson::do_for_each(segments,[this, &handler, &stats](auto i) mutable {
+ return replay_segment(i.first, i.second, handler, stats);
});
- });
+ }).safe_then([&stats, FNAME] {
+ INFO("replay done, record_groups={}, records={}, "
+ "alloc_deltas={}, dirty_deltas={}",
+ stats.num_record_groups,
+ stats.num_records,
+ stats.num_alloc_deltas,
+ stats.num_dirty_deltas);
+ });
});
});
}
scan_last_segment_ertr::future<> scan_last_segment(
const segment_id_t&, const segment_header_t&);
+ struct replay_stats_t {
+ std::size_t num_record_groups = 0;
+ std::size_t num_records = 0;
+ std::size_t num_alloc_deltas = 0;
+ std::size_t num_dirty_deltas = 0;
+ };
+
/// replays records starting at start through end of segment
replay_ertr::future<>
replay_segment(
journal_seq_t start, ///< [in] starting addr, seq
segment_header_t header, ///< [in] segment header
- delta_handler_t &delta_handler ///< [in] processes deltas in order
+ delta_handler_t &delta_handler, ///< [in] processes deltas in order
+ replay_stats_t &stats ///< [out] replay stats
);
};
const auto &e,
const journal_seq_t &dirty_tail,
const journal_seq_t &alloc_tail,
- auto modify_time)
+ sea_time_point modify_time)
{
auto start_seq = offsets.write_result.start_seq;
return cache->replay_delta(
const auto &e,
auto &dirty_seq,
auto &alloc_seq,
- auto last_modified)
- -> Journal::replay_ret {
+ auto last_modified) {
bool found = false;
for (auto &i : entries) {
paddr_t base = offsets.write_result.start_seq.offset;
}
}
assert(found == true);
- return Journal::replay_ertr::now();
+ return Journal::replay_ertr::make_ready_future<bool>(true);
}).unsafe_get0();
}
delta_checker = std::nullopt;
advance();
}
- return Journal::replay_ertr::now();
+ return Journal::replay_ertr::make_ready_future<bool>(true);
}).unsafe_get0();
ASSERT_EQ(record_iter, records.end());
for (auto &i : records) {