});
}
+RecordScanner::read_validate_record_metadata_ret CircularBoundedJournal::read_validate_record_metadata(
+ scan_valid_records_cursor &cursor,
+ segment_nonce_t nonce)
+{
+ LOG_PREFIX(CircularBoundedJournal::read_validate_record_metadata);
+ paddr_t start = cursor.seq.offset;
+ return read_record(start, 0
+ ).safe_then([FNAME, &cursor](auto ret) {
+ if (!ret.has_value()) {
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::nullopt);
+ }
+ auto [r_header, bl] = *ret;
+ if ((cursor.last_committed != JOURNAL_SEQ_NULL &&
+ cursor.last_committed > r_header.committed_to) ||
+ r_header.committed_to.segment_seq != cursor.seq.segment_seq) {
+ DEBUG("invalid header: {}", r_header);
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::nullopt);
+ }
+
+ bufferlist mdbuf;
+ mdbuf.substr_of(bl, 0, r_header.mdlength);
+ DEBUG("header: {}", r_header);
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::make_pair(std::move(r_header), std::move(mdbuf)));
+ });
+}
+
+RecordScanner::read_validate_data_ret CircularBoundedJournal::read_validate_data(
+ paddr_t record_base,
+ const record_group_header_t &header)
+{
+ return read_record(record_base, header.segment_nonce
+ ).safe_then([](auto ret) {
+ // read_record would return non-empty value if the record is valid
+ if (!ret.has_value()) {
+ return read_validate_data_ret(
+ read_validate_data_ertr::ready_future_marker{},
+ false);
+ }
+ return read_validate_data_ertr::make_ready_future<bool>(true);
+ });
+}
+
+Journal::replay_ret CircularBoundedJournal::replay_segment(
+ cbj_delta_handler_t &handler, scan_valid_records_cursor& cursor)
+{
+ LOG_PREFIX(Journal::replay_segment);
+ return seastar::do_with(
+ RecordScanner::found_record_handler_t(
+ [this, &handler, FNAME](
+ record_locator_t locator,
+ const record_group_header_t& r_header,
+ const bufferlist& mdbuf)
+ -> RecordScanner::scan_valid_records_ertr::future<>
+ {
+ auto maybe_record_deltas_list = try_decode_deltas(
+ r_header, mdbuf, locator.record_block_base);
+ if (!maybe_record_deltas_list) {
+ // This should be impossible, we did check the crc on the mdbuf
+ ERROR("unable to decode deltas for record {} at {}",
+ r_header, locator.record_block_base);
+ return crimson::ct_error::input_output_error::make();
+ }
+ auto cursor_addr = convert_paddr_to_abs_addr(r_header.committed_to.offset);
+ DEBUG("{} at {}", r_header, cursor_addr);
+ auto write_result = write_result_t{
+ r_header.committed_to,
+ r_header.mdlength + r_header.dlength
+ };
+ auto expected_seq = r_header.committed_to.segment_seq;
+ cursor_addr += (r_header.mdlength + r_header.dlength);
+ if (cursor_addr >= get_journal_end()) {
+ cursor_addr = get_records_start();
+ ++expected_seq;
+ paddr_t addr = convert_abs_addr_to_paddr(
+ cursor_addr,
+ get_device_id());
+ write_result.start_seq.offset = addr;
+ write_result.start_seq.segment_seq = expected_seq;
+ }
+ paddr_t addr = convert_abs_addr_to_paddr(
+ cursor_addr,
+ get_device_id());
+ set_written_to(
+ journal_seq_t{expected_seq, addr});
+ return seastar::do_with(
+ std::move(*maybe_record_deltas_list),
+ [write_result,
+ &handler,
+ FNAME](auto& record_deltas_list) {
+ return crimson::do_for_each(
+ record_deltas_list,
+ [write_result,
+ &handler, FNAME](record_deltas_t& record_deltas) {
+ auto locator = record_locator_t{
+ record_deltas.record_block_base,
+ write_result
+ };
+ DEBUG("processing {} deltas at block_base {}",
+ record_deltas.deltas.size(),
+ locator);
+ return crimson::do_for_each(
+ record_deltas.deltas,
+ [locator,
+ &handler](auto& p) {
+ auto& modify_time = p.first;
+ auto& delta = p.second;
+ return handler(
+ locator,
+ delta,
+ modify_time).discard_result();
+ });
+ });
+ });
+ }),
+ [=, this, &cursor](auto &dhandler) {
+ return scan_valid_records(
+ cursor,
+ 0,
+ std::numeric_limits<size_t>::max(),
+ dhandler).safe_then([](auto){}
+ ).handle_error(
+ replay_ertr::pass_further{},
+ crimson::ct_error::assert_all{
+ "shouldn't meet with any other error other replay_ertr"
+ }
+ );
+ }
+ );
+}
+
+
Journal::replay_ret CircularBoundedJournal::scan_valid_record_delta(
- cbj_delta_handler_t &&delta_handler, journal_seq_t tail)
+ cbj_delta_handler_t &&handler, journal_seq_t tail)
{
- LOG_PREFIX(CircularBoundedJournal::scan_valid_record_delta);
+ LOG_PREFIX(Journal::scan_valid_record_delta);
+ INFO("starting at {} ", tail);
return seastar::do_with(
+ scan_valid_records_cursor(tail),
+ std::move(handler),
bool(false),
- rbm_abs_addr(get_rbm_addr(tail)),
- std::move(delta_handler),
- segment_seq_t(NULL_SEG_SEQ),
- [this, FNAME](auto &is_rolled, auto &cursor_addr, auto &d_handler, auto &expected_seq) {
- return crimson::repeat(
- [this, &is_rolled, &cursor_addr, &d_handler, &expected_seq, FNAME]() mutable
- -> replay_ertr::future<seastar::stop_iteration> {
- paddr_t record_paddr = convert_abs_addr_to_paddr(
- cursor_addr,
- get_device_id());
- return read_record(record_paddr, expected_seq
- ).safe_then([this, &is_rolled, &cursor_addr, &d_handler, &expected_seq, FNAME](auto ret)
- -> replay_ertr::future<seastar::stop_iteration> {
- if (!ret.has_value()) {
- if (expected_seq == NULL_SEG_SEQ || is_rolled) {
- DEBUG("no more records, stop replaying");
- return replay_ertr::make_ready_future<
- seastar::stop_iteration>(seastar::stop_iteration::yes);
- } else {
- cursor_addr = get_records_start();
- ++expected_seq;
- is_rolled = true;
- return replay_ertr::make_ready_future<
- seastar::stop_iteration>(seastar::stop_iteration::no);
- }
- }
- auto [r_header, bl] = *ret;
- bufferlist mdbuf;
- mdbuf.substr_of(bl, 0, r_header.mdlength);
- paddr_t record_block_base = paddr_t::make_blk_paddr(
- get_device_id(), cursor_addr + r_header.mdlength);
- auto maybe_record_deltas_list = try_decode_deltas(
- r_header, mdbuf, record_block_base);
- if (!maybe_record_deltas_list) {
- // This should be impossible, we did check the crc on the mdbuf
- ERROR("unable to decode deltas for record {} at {}",
- r_header, record_block_base);
- return crimson::ct_error::input_output_error::make();
- }
- DEBUG("{} at {}", r_header, cursor_addr);
- auto write_result = write_result_t{
- r_header.committed_to,
- bl.length()
- };
- if (expected_seq == NULL_SEG_SEQ) {
- expected_seq = r_header.committed_to.segment_seq;
- } else {
- assert(expected_seq == r_header.committed_to.segment_seq);
- }
- cursor_addr += bl.length();
- if (cursor_addr >= get_journal_end()) {
- assert(cursor_addr == get_journal_end());
- cursor_addr = get_records_start();
- ++expected_seq;
- paddr_t addr = convert_abs_addr_to_paddr(
- cursor_addr,
- get_device_id());
- write_result.start_seq.offset = addr;
- write_result.start_seq.segment_seq = expected_seq;
- is_rolled = true;
- }
- paddr_t addr = convert_abs_addr_to_paddr(
- cursor_addr,
- get_device_id());
- set_written_to(
- journal_seq_t{expected_seq, addr});
- return seastar::do_with(
- std::move(*maybe_record_deltas_list),
- [write_result,
- &d_handler,
- FNAME](auto& record_deltas_list) {
- return crimson::do_for_each(
- record_deltas_list,
- [write_result,
- &d_handler, FNAME](record_deltas_t& record_deltas) {
- auto locator = record_locator_t{
- record_deltas.record_block_base,
- write_result
- };
- DEBUG("processing {} deltas at block_base {}",
- record_deltas.deltas.size(),
- locator);
- return crimson::do_for_each(
- record_deltas.deltas,
- [locator,
- &d_handler](auto& p) {
- auto& modify_time = p.first;
- auto& delta = p.second;
- return d_handler(
- locator,
- delta,
- modify_time).discard_result();
- });
- }).safe_then([]() {
- return replay_ertr::make_ready_future<
- seastar::stop_iteration>(seastar::stop_iteration::no);
- });
- });
+ [this] (auto &cursor, auto &handler, auto &rolled) {
+ return crimson::repeat([this, &handler, &cursor, &rolled]()
+ -> replay_ertr::future<seastar::stop_iteration>
+ {
+ return replay_segment(handler, cursor
+ ).safe_then([this, &cursor, &rolled] {
+ if (!rolled) {
+ cursor.last_valid_header_found = false;
+ }
+ if (!cursor.is_complete()) {
+ try_read_rolled_header(cursor);
+ rolled = true;
+ return replay_ertr::make_ready_future<
+ seastar::stop_iteration>(seastar::stop_iteration::no);
+ }
+ return replay_ertr::make_ready_future<
+ seastar::stop_iteration>(seastar::stop_iteration::yes);
});
});
});
}
+
Journal::replay_ret CircularBoundedJournal::replay(
delta_handler_t &&delta_handler)
{