paddr_t paddr = convert_abs_addr_to_paddr(
get_written_to(),
header.device_id);
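+ // circulation_seq was never set (e.g. nothing was found on replay); start the circulation count at 0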
+ if (circulation_seq == NULL_SEG_SEQ) {
+ circulation_seq = 0;
+ }
return open_for_write_ret(
open_for_write_ertr::ready_future_marker{},
journal_seq_t{
- cur_segment_seq,
+ circulation_seq,
paddr
});
}
return open_for_write_ret(
open_for_write_ertr::ready_future_marker{},
journal_seq_t{
- cur_segment_seq,
+ circulation_seq,
paddr
});
});
{
LOG_PREFIX(CircularBoundedJournal::submit_record);
assert(write_pipeline);
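+ // circulation_seq must already be valid here, set by open_for_write() or by replay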
+ assert(circulation_seq != NULL_SEG_SEQ);
auto r_size = record_group_size_t(record.size, get_block_size());
auto encoded_size = r_size.get_encoded_length();
if (encoded_size > get_available_size()) {
if (encoded_size + get_written_to() > get_journal_end()) {
DEBUG("roll");
set_written_to(get_start_addr());
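+ // wrapping back to the start of the journal begins a new circulation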
+ ++circulation_seq;
if (encoded_size > get_available_size()) {
ERROR("rolled, record size {}, but available size {}",
encoded_size, get_available_size());
}
journal_seq_t j_seq {
- cur_segment_seq++,
+ circulation_seq,
convert_abs_addr_to_paddr(
get_written_to(),
header.device_id)};
assert(new_written_to == get_journal_end());
DEBUG("roll");
set_written_to(get_start_addr());
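+ // the write position reached the journal end: wrap and bump the circulation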
+ ++circulation_seq;
} else {
set_written_to(new_written_to);
}
LOG_PREFIX(CircularBoundedJournal::replay);
return open_device_read_header(
).safe_then([this, FNAME, delta_handler=std::move(delta_handler)] (auto) {
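+ // unknown until a record is actually read back during replay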
+ circulation_seq = NULL_SEG_SEQ;
set_written_to(get_journal_tail());
return seastar::do_with(
bool(false),
rbm_abs_addr(get_journal_tail()),
std::move(delta_handler),
- segment_seq_t(),
- [this, FNAME](auto &is_rolled, auto &cursor_addr, auto &d_handler, auto &last_seq) {
+ segment_seq_t(NULL_SEG_SEQ),
+ [this, FNAME](auto &is_rolled, auto &cursor_addr, auto &d_handler, auto &expected_seq) {
return crimson::repeat(
- [this, &is_rolled, &cursor_addr, &d_handler, &last_seq, FNAME]() mutable
+ [this, &is_rolled, &cursor_addr, &d_handler, &expected_seq, FNAME]() mutable
-> replay_ertr::future<seastar::stop_iteration> {
paddr_t record_paddr = convert_abs_addr_to_paddr(
cursor_addr,
header.device_id);
- return read_record(record_paddr, last_seq
- ).safe_then([this, &is_rolled, &cursor_addr, &d_handler, &last_seq, FNAME](auto ret)
+ return read_record(record_paddr, expected_seq
+ ).safe_then([this, &is_rolled, &cursor_addr, &d_handler, &expected_seq, FNAME](auto ret)
-> replay_ertr::future<seastar::stop_iteration> {
if (!ret.has_value()) {
- if (is_rolled) {
+ if (expected_seq == NULL_SEG_SEQ || is_rolled) {
DEBUG("no more records, stop replaying");
return replay_ertr::make_ready_future<
seastar::stop_iteration>(seastar::stop_iteration::yes);
} else {
cursor_addr = get_start_addr();
+ ++expected_seq;
is_rolled = true;
return replay_ertr::make_ready_future<
seastar::stop_iteration>(seastar::stop_iteration::no);
r_header.committed_to,
(seastore_off_t)bl.length()
};
- cur_segment_seq = r_header.committed_to.segment_seq + 1;
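+ // adopt the sequence of the first record read; every later record must match the expected value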
+ if (expected_seq == NULL_SEG_SEQ) {
+ expected_seq = r_header.committed_to.segment_seq;
+ } else {
+ assert(expected_seq == r_header.committed_to.segment_seq);
+ }
cursor_addr += bl.length();
if (cursor_addr >= get_journal_end()) {
assert(cursor_addr == get_journal_end());
cursor_addr = get_start_addr();
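+ // records past the wrap point belong to the next circulation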
+ ++expected_seq;
is_rolled = true;
}
set_written_to(cursor_addr);
- last_seq = r_header.committed_to.segment_seq;
+ circulation_seq = expected_seq;
return seastar::do_with(
std::move(*maybe_record_deltas_list),
[write_result,
}
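+ // a non-NULL expected_seq makes read_record reject records whose committed_to sequence differs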
CircularBoundedJournal::read_record_ret
-CircularBoundedJournal::read_record(paddr_t off, segment_seq_t last_seq)
+CircularBoundedJournal::read_record(paddr_t off, segment_seq_t expected_seq)
{
LOG_PREFIX(CircularBoundedJournal::read_record);
rbm_abs_addr addr = convert_paddr_to_abs_addr(off);
addr, read_length);
auto bptr = bufferptr(ceph::buffer::create_page_aligned(read_length));
return device->read(addr, bptr
- ).safe_then([this, addr, bptr, last_seq, FNAME]() mutable
+ ).safe_then([this, addr, bptr, expected_seq, FNAME]() mutable
-> read_record_ret {
record_group_header_t h;
bufferlist bl;
h.dlength % get_block_size() != 0 ||
addr + h.mdlength + h.dlength > get_journal_end() ||
h.committed_to.segment_seq == NULL_SEG_SEQ ||
- h.committed_to.segment_seq < last_seq) {
+ (expected_seq != NULL_SEG_SEQ &&
+ h.committed_to.segment_seq != expected_seq)) {
return read_record_ret(
read_record_ertr::ready_future_marker{},
std::nullopt);