return seastar::do_with(
rbm_abs_addr(get_applied_to()),
std::move(delta_handler),
- [this, FNAME](auto &cursor_addr, auto &d_handler) {
+ segment_seq_t(),
+ [this, FNAME](auto &cursor_addr, auto &d_handler, auto &last_seq) {
return crimson::repeat(
- [this, &cursor_addr, &d_handler, FNAME]() mutable
+ [this, &cursor_addr, &d_handler, &last_seq, FNAME]() mutable
-> replay_ertr::future<seastar::stop_iteration> {
paddr_t cursor_paddr = convert_abs_addr_to_paddr(
cursor_addr,
header.device_id);
return read_record(cursor_paddr
- ).safe_then([this, &cursor_addr, &d_handler, FNAME](auto ret) {
+ ).safe_then([this, &cursor_addr, &d_handler, &last_seq, FNAME](auto ret) {
+ if (!ret.has_value()) {
+ DEBUG("no more records");
+ return replay_ertr::make_ready_future<
+ seastar::stop_iteration>(seastar::stop_iteration::yes);
+ }
auto [r_header, bl] = *ret;
+ if (last_seq > r_header.committed_to.segment_seq) {
+ DEBUG("found invalide record. stop replaying");
+ return replay_ertr::make_ready_future<
+ seastar::stop_iteration>(seastar::stop_iteration::yes);
+ }
bufferlist mdbuf;
mdbuf.substr_of(bl, 0, r_header.mdlength);
paddr_t record_block_base = paddr_t::make_blk_paddr(
r_header.committed_to,
(seastore_off_t)bl.length()
};
+ set_last_committed_record_base(cursor_addr);
cursor_addr += bl.length();
+ set_written_to(cursor_addr);
+ last_seq = r_header.committed_to.segment_seq;
return seastar::do_with(
std::move(*maybe_record_deltas_list),
[write_result,
ceph::encoded_sizeof_bounded<record_group_header_t>() > header.end) {
cursor_addr = get_start_addr();
}
- if (cursor_addr == get_written_to()) {
- return replay_ertr::make_ready_future<
- seastar::stop_iteration>(seastar::stop_iteration::yes);
- }
return replay_ertr::make_ready_future<
seastar::stop_iteration>(seastar::stop_iteration::no);
});
void update_applied_to(rbm_abs_addr addr, uint32_t len) {
cbj->update_applied_to(addr, len);
}
+ void set_last_committed_record_base(rbm_abs_addr addr) {
+ cbj->set_last_committed_record_base(addr);
+ }
+ void set_written_to(rbm_abs_addr addr) {
+ cbj->set_written_to(addr);
+ }
};
TEST_F(cbjournal_test_t, submit_one_record)
replay();
});
}
+
+TEST_F(cbjournal_test_t, replay_after_reset)
+{
+ run_async([this] {
+ mkfs();
+ open();
+ record_t rec {
+ { generate_extent(1), generate_extent(2) },
+ { generate_delta(20), generate_delta(21) }
+ };
+ auto r_size = record_group_size_t(rec.size, block_size);
+ auto record_total_size = r_size.get_encoded_length();
+ submit_record(std::move(rec));
+ while (record_total_size <= get_available_size()) {
+ submit_record(
+ record_t {
+ { generate_extent(1), generate_extent(2) },
+ { generate_delta(20), generate_delta(21) }
+ });
+ }
+ auto old_written_to = get_written_to();
+ auto old_last_committed_record_base = get_last_committed_record_base();
+ set_written_to(4096);
+ set_last_committed_record_base(4096);
+ replay();
+ ASSERT_EQ(old_written_to, get_written_to());
+ ASSERT_EQ(old_last_committed_record_base,
+ get_last_committed_record_base());
+ });
+}