});
}
+Journal::replay_ret CircularBoundedJournal::scan_valid_record_delta( // Reads records one after another starting at `tail` and hands every decoded delta to `delta_handler`.
+ cbj_delta_handler_t &&delta_handler, journal_seq_t tail) // delta_handler is invoked as (locator, delta, modify_time); tail is the journal position where the scan starts.
+{
+ LOG_PREFIX(CircularBoundedJournal::scan_valid_record_delta);
+ return seastar::do_with( // the four values below are the scan state shared by reference with every continuation
+ bool(false), // is_rolled: becomes true once the cursor has been reset to get_records_start()
+ rbm_abs_addr(get_rbm_addr(tail)), // cursor_addr: address of the next record to read, derived from `tail`
+ std::move(delta_handler), // d_handler: the caller's handler, moved into the scan state
+ segment_seq_t(NULL_SEG_SEQ), // expected_seq: stays NULL_SEG_SEQ until the first record has been decoded
+ [this, FNAME](auto &is_rolled, auto &cursor_addr, auto &d_handler, auto &expected_seq) {
+ return crimson::repeat( // one iteration per read attempt; the loop ends when an iteration yields stop_iteration::yes
+ [this, &is_rolled, &cursor_addr, &d_handler, &expected_seq, FNAME]() mutable
+ -> replay_ertr::future<seastar::stop_iteration> {
+ paddr_t record_paddr = convert_abs_addr_to_paddr(
+ cursor_addr,
+ get_device_id());
+ return read_record(record_paddr, expected_seq
+ ).safe_then([this, &is_rolled, &cursor_addr, &d_handler, &expected_seq, FNAME](auto ret)
+ -> replay_ertr::future<seastar::stop_iteration> {
+ if (!ret.has_value()) { // read_record returned no record for cursor_addr / expected_seq
+ if (expected_seq == NULL_SEG_SEQ || is_rolled) { // no record was ever decoded, or the cursor was already reset once: finish the scan
+ DEBUG("no more records, stop replaying");
+ return replay_ertr::make_ready_future<
+ seastar::stop_iteration>(seastar::stop_iteration::yes);
+ } else { // otherwise retry from the start of the record area with the next sequence number
+ cursor_addr = get_records_start();
+ ++expected_seq;
+ is_rolled = true;
+ return replay_ertr::make_ready_future<
+ seastar::stop_iteration>(seastar::stop_iteration::no);
+ }
+ }
+ auto [r_header, bl] = *ret; // r_header: record header, bl: buffer whose first mdlength bytes are sliced out below
+ bufferlist mdbuf;
+ mdbuf.substr_of(bl, 0, r_header.mdlength); // mdbuf views the leading r_header.mdlength bytes of bl
+ paddr_t record_block_base = paddr_t::make_blk_paddr(
+ get_device_id(), cursor_addr + r_header.mdlength); // block base is the address right after those mdlength bytes
+ auto maybe_record_deltas_list = try_decode_deltas(
+ r_header, mdbuf, record_block_base);
+ if (!maybe_record_deltas_list) {
+ // This should be impossible, we did check the crc on the mdbuf
+ ERROR("unable to decode deltas for record {} at {}",
+ r_header, record_block_base);
+ return crimson::ct_error::input_output_error::make(); // a decode failure ends the scan with an input/output error
+ }
+ DEBUG("{} at {}", r_header, cursor_addr);
+ auto write_result = write_result_t{ // built from the header's committed_to and the record buffer length; passed to the handler inside each locator
+ r_header.committed_to,
+ bl.length()
+ };
+ if (expected_seq == NULL_SEG_SEQ) { // the first decoded record fixes the sequence number used for later reads
+ expected_seq = r_header.committed_to.segment_seq;
+ } else {
+ assert(expected_seq == r_header.committed_to.segment_seq);
+ }
+ cursor_addr += bl.length(); // step over the whole record that was just read
+ if (cursor_addr >= get_journal_end()) { // reached the journal end: continue from the start of the record area
+ assert(cursor_addr == get_journal_end()); // asserts that the record ended exactly at the journal end
+ cursor_addr = get_records_start();
+ ++expected_seq;
+ is_rolled = true;
+ }
+ paddr_t addr = convert_abs_addr_to_paddr(
+ cursor_addr,
+ get_device_id());
+ set_written_to( // written_to is moved to the position following this record before its deltas are dispatched
+ journal_seq_t{expected_seq, addr});
+ return seastar::do_with( // the decoded list is moved into do_with so the handler calls below can refer to it
+ std::move(*maybe_record_deltas_list),
+ [write_result,
+ &d_handler,
+ FNAME](auto& record_deltas_list) {
+ return crimson::do_for_each( // outer loop: one record_deltas_t entry at a time
+ record_deltas_list,
+ [write_result,
+ &d_handler, FNAME](record_deltas_t& record_deltas) {
+ auto locator = record_locator_t{ // pairs this entry's block base with the write_result of the enclosing record
+ record_deltas.record_block_base,
+ write_result
+ };
+ DEBUG("processing {} deltas at block_base {}",
+ record_deltas.deltas.size(),
+ locator);
+ return crimson::do_for_each( // inner loop: one (modify_time, delta) pair at a time
+ record_deltas.deltas,
+ [locator,
+ &d_handler](auto& p) {
+ auto& modify_time = p.first;
+ auto& delta = p.second;
+ return d_handler(
+ locator,
+ delta,
+ modify_time).discard_result(); // the value produced by the handler is dropped here
+ });
+ }).safe_then([]() { // every delta of this record was handed over: ask repeat for another iteration
+ return replay_ertr::make_ready_future<
+ seastar::stop_iteration>(seastar::stop_iteration::no);
+ });
+ });
+ });
+ });
+ });
+}
+
Journal::replay_ret CircularBoundedJournal::replay(
delta_handler_t &&delta_handler)
{
open_for_mount_ertr::pass_further{},
crimson::ct_error::assert_all{
"Invalid error read_header"
- }).safe_then([this, FNAME, delta_handler=std::move(delta_handler)](auto p) mutable {
+ }).safe_then([this, FNAME, delta_handler=std::move(delta_handler)](auto p)
+ mutable {
auto &[head, bl] = *p;
header = head;
DEBUG("header : {}", header);
initialized = true;
- written_to.segment_seq = NULL_SEG_SEQ;
- auto tail = get_dirty_tail() <= get_alloc_tail() ?
- get_dirty_tail() : get_alloc_tail();
- set_written_to(tail);
return seastar::do_with(
- bool(false),
- rbm_abs_addr(get_rbm_addr(tail)),
std::move(delta_handler),
- segment_seq_t(NULL_SEG_SEQ),
- [this, FNAME](auto &is_rolled, auto &cursor_addr, auto &d_handler, auto &expected_seq) {
- return crimson::repeat(
- [this, &is_rolled, &cursor_addr, &d_handler, &expected_seq, FNAME]() mutable
- -> replay_ertr::future<seastar::stop_iteration> {
- paddr_t record_paddr = convert_abs_addr_to_paddr(
- cursor_addr,
- get_device_id());
- return read_record(record_paddr, expected_seq
- ).safe_then([this, &is_rolled, &cursor_addr, &d_handler, &expected_seq, FNAME](auto ret)
- -> replay_ertr::future<seastar::stop_iteration> {
- if (!ret.has_value()) {
- if (expected_seq == NULL_SEG_SEQ || is_rolled) {
- DEBUG("no more records, stop replaying");
- return replay_ertr::make_ready_future<
- seastar::stop_iteration>(seastar::stop_iteration::yes);
- } else {
- cursor_addr = get_records_start();
- ++expected_seq;
- is_rolled = true;
- return replay_ertr::make_ready_future<
- seastar::stop_iteration>(seastar::stop_iteration::no);
+ std::map<paddr_t, journal_seq_t>(),
+ [this](auto &d_handler, auto &map) {
+ auto build_paddr_seq_map = [&map](
+ const auto &offsets,
+ const auto &e,
+ sea_time_point modify_time)
+ {
+ if (e.type == extent_types_t::ALLOC_INFO) {
+ alloc_delta_t alloc_delta;
+ decode(alloc_delta, e.bl);
+ if (alloc_delta.op == alloc_delta_t::op_types_t::CLEAR) {
+ for (auto &alloc_blk : alloc_delta.alloc_blk_ranges) {
+ map[alloc_blk.paddr] = offsets.write_result.start_seq;
}
}
- auto [r_header, bl] = *ret;
- bufferlist mdbuf;
- mdbuf.substr_of(bl, 0, r_header.mdlength);
- paddr_t record_block_base = paddr_t::make_blk_paddr(
- get_device_id(), cursor_addr + r_header.mdlength);
- auto maybe_record_deltas_list = try_decode_deltas(
- r_header, mdbuf, record_block_base);
- if (!maybe_record_deltas_list) {
- // This should be impossible, we did check the crc on the mdbuf
- ERROR("unable to decode deltas for record {} at {}",
- r_header, record_block_base);
- return crimson::ct_error::input_output_error::make();
+ }
+ return replay_ertr::make_ready_future<bool>(true);
+ };
+ written_to.segment_seq = NULL_SEG_SEQ;
+ auto tail = get_dirty_tail() <= get_alloc_tail() ?
+ get_dirty_tail() : get_alloc_tail();
+ set_written_to(tail);
+ // The first pass to build the paddr->journal_seq_t map
+ // from extent allocations
+ return scan_valid_record_delta(std::move(build_paddr_seq_map), tail
+ ).safe_then([this, &map, &d_handler, tail]() {
+ auto call_d_handler_if_valid = [this, &map, &d_handler](
+ const auto &offsets,
+ const auto &e,
+ sea_time_point modify_time)
+ {
+ if (map.find(e.paddr) == map.end() ||
+ map[e.paddr] <= offsets.write_result.start_seq) {
+ return d_handler(
+ offsets,
+ e,
+ header.dirty_tail,
+ header.alloc_tail,
+ modify_time
+ );
}
- DEBUG("{} at {}", r_header, cursor_addr);
- auto write_result = write_result_t{
- r_header.committed_to,
- bl.length()
- };
- if (expected_seq == NULL_SEG_SEQ) {
- expected_seq = r_header.committed_to.segment_seq;
- } else {
- assert(expected_seq == r_header.committed_to.segment_seq);
- }
- cursor_addr += bl.length();
- if (cursor_addr >= get_journal_end()) {
- assert(cursor_addr == get_journal_end());
- cursor_addr = get_records_start();
- ++expected_seq;
- is_rolled = true;
- }
- paddr_t addr = convert_abs_addr_to_paddr(
- cursor_addr,
- get_device_id());
- set_written_to(
- journal_seq_t{expected_seq, addr});
- return seastar::do_with(
- std::move(*maybe_record_deltas_list),
- [this,
- write_result,
- &d_handler,
- FNAME](auto& record_deltas_list) {
- return crimson::do_for_each(
- record_deltas_list,
- [this,
- write_result,
- &d_handler, FNAME](record_deltas_t& record_deltas) {
- auto locator = record_locator_t{
- record_deltas.record_block_base,
- write_result
- };
- DEBUG("processing {} deltas at block_base {}",
- record_deltas.deltas.size(),
- locator);
- return crimson::do_for_each(
- record_deltas.deltas,
- [this,
- locator,
- &d_handler](auto& p) {
- auto& modify_time = p.first;
- auto& delta = p.second;
- return d_handler(
- locator,
- delta,
- header.dirty_tail,
- header.alloc_tail,
- modify_time).discard_result();
- });
- }).safe_then([]() {
- return replay_ertr::make_ready_future<
- seastar::stop_iteration>(seastar::stop_iteration::no);
- });
- });
- });
+ return replay_ertr::make_ready_future<bool>(true);
+ };
+ // The second pass to replay deltas
+ return scan_valid_record_delta(std::move(call_d_handler_if_valid), tail);
});
}).safe_then([this]() {
trimmer.update_journal_tails(