});
}
-CircularBoundedJournal::read_record_ret
-CircularBoundedJournal::return_record(record_group_header_t& header, bufferlist bl)
-{
- LOG_PREFIX(CircularBoundedJournal::return_record);
- DEBUG("record size {}", bl.length());
- assert(bl.length() == header.mdlength + header.dlength);
- bufferlist md_bl, data_bl;
- md_bl.substr_of(bl, 0, header.mdlength);
- data_bl.substr_of(bl, header.mdlength, header.dlength);
- if (validate_records_metadata(md_bl) &&
- validate_records_data(header, data_bl)) {
- return read_record_ret(
- read_record_ertr::ready_future_marker{},
- std::make_pair(header, std::move(bl)));
- } else {
- DEBUG("invalid matadata");
- return read_record_ret(
- read_record_ertr::ready_future_marker{},
- std::nullopt);
- }
-}
-
-CircularBoundedJournal::read_record_ret
-CircularBoundedJournal::read_record(paddr_t off, segment_nonce_t magic)
-{
- LOG_PREFIX(CircularBoundedJournal::read_record);
- rbm_abs_addr addr = convert_paddr_to_abs_addr(off);
- auto read_length = get_block_size();
- assert(addr + read_length <= get_journal_end());
- DEBUG("reading record from abs addr {} read length {}", addr, read_length);
- auto bptr = bufferptr(ceph::buffer::create_page_aligned(read_length));
- return cjs.read(addr, bptr
- ).safe_then([this, addr, bptr, magic, FNAME]() mutable
- -> read_record_ret {
- record_group_header_t h;
- bufferlist bl;
- bl.append(bptr);
- auto bp = bl.cbegin();
- try {
- decode(h, bp);
- } catch (ceph::buffer::error &e) {
- return read_record_ret(
- read_record_ertr::ready_future_marker{},
- std::nullopt);
- }
- if (h.mdlength < get_block_size() ||
- h.mdlength % get_block_size() != 0 ||
- h.dlength % get_block_size() != 0 ||
- addr + h.mdlength + h.dlength > get_journal_end() ||
- h.segment_nonce != magic) {
- return read_record_ret(
- read_record_ertr::ready_future_marker{},
- std::nullopt);
- }
- auto record_size = h.mdlength + h.dlength;
- if (record_size > get_block_size()) {
- auto next_addr = addr + get_block_size();
- auto next_length = record_size - get_block_size();
- auto next_bptr = bufferptr(ceph::buffer::create_page_aligned(next_length));
- DEBUG("reading record part 2 from abs addr {} read length {}",
- next_addr, next_length);
- return cjs.read(next_addr, next_bptr
- ).safe_then([this, h, next_bptr=std::move(next_bptr), bl=std::move(bl)]() mutable {
- bl.append(next_bptr);
- return return_record(h, bl);
- });
- } else {
- assert(record_size == get_block_size());
- return return_record(h, bl);
- }
- });
-}
-
seastar::future<> CircularBoundedJournal::finish_commit(transaction_type_t type) {
if (is_trim_transaction(type)) {
return update_journal_tail(
return cjs.get_alloc_tail();
}
- using read_ertr = crimson::errorator<
- crimson::ct_error::input_output_error,
- crimson::ct_error::invarg,
- crimson::ct_error::enoent,
- crimson::ct_error::erange>;
- using read_record_ertr = read_ertr;
- using read_record_ret = read_record_ertr::future<
- std::optional<std::pair<record_group_header_t, bufferlist>>
- >;
- /*
- * read_record
- *
- * read record from given address
- *
- * @param paddr_t to read
- * @param expected_seq
- *
- */
- read_record_ret read_record(paddr_t offset, segment_nonce_t magic);
-
- read_record_ret return_record(record_group_header_t& header, bufferlist bl);
-
void set_write_pipeline(WritePipeline *_write_pipeline) final {
write_pipeline = _write_pipeline;
}
return cjs;
}
+ read_validate_record_metadata_ret test_read_validate_record_metadata(
+ scan_valid_records_cursor &cursor,
+ segment_nonce_t nonce)
+ {
+ return read_validate_record_metadata(cursor, nonce);
+ }
+
+ void test_initialize_cursor(scan_valid_records_cursor &cursor)
+ {
+ initialize_cursor(cursor);
+ }
+
private:
JournalTrimmer &trimmer;
std::string path;
struct entry_validator_t {
bufferlist bl;
int entries;
- journal_seq_t last_seq;
record_t record;
- rbm_abs_addr addr = 0;
segment_nonce_t magic = 0;
+ journal_seq_t seq;
template <typename... T>
entry_validator_t(T&&... entry) : record(std::forward<T>(entry)...) {}
++iter_delta;
}
}
-
void validate(CircularBoundedJournal &cbj) {
rbm_abs_addr offset = 0;
+ auto cursor = scan_valid_records_cursor(seq);
+ cbj.test_initialize_cursor(cursor);
for (int i = 0; i < entries; i++) {
- paddr_t paddr = convert_abs_addr_to_paddr(
- addr + offset,
- cbj.get_device_id());
- auto [header, buf] = *(cbj.read_record(paddr, magic).unsafe_get0());
- auto record = decode_record(buf);
+ paddr_t paddr = seq.offset.add_offset(offset);
+ cursor.seq.offset = paddr;
+ auto md = cbj.test_read_validate_record_metadata(
+ cursor, magic).unsafe_get0();
+ assert(md);
+ auto& [header, md_bl] = *md;
+ auto dbuf = cbj.read(
+ paddr.add_offset(header.mdlength),
+ header.dlength).unsafe_get0();
+
+ bufferlist bl;
+ bl.append(md_bl);
+ bl.append(dbuf);
+ auto record = decode_record(bl);
validate(*record);
offset += header.mdlength + header.dlength;
+ cursor.last_committed = header.committed_to;
}
}
+ rbm_abs_addr get_abs_addr() {
+ return convert_paddr_to_abs_addr(seq.offset);
+ }
+
bool validate_delta(bufferlist bl) {
for (auto &&block : record.deltas) {
if (bl.begin().crc32c(bl.length(), 1) ==
auto [addr, w_result] = cbj->submit_record(
std::move(record),
handle).unsafe_get0();
- entries.back().addr =
- convert_paddr_to_abs_addr(w_result.start_seq.offset);
+ entries.back().seq = w_result.start_seq;
entries.back().entries = 1;
entries.back().magic = cbj->get_cjs().get_cbj_header().magic;
- logger().debug("submit entry to addr {}", entries.back().addr);
- return entries.back().addr;
+ logger().debug("submit entry to addr {}", entries.back().seq);
+ return convert_paddr_to_abs_addr(entries.back().seq.offset);
}
seastar::future<> tear_down_fut() final {
for (auto &i : entries) {
paddr_t base = offsets.write_result.start_seq.offset;
rbm_abs_addr addr = convert_paddr_to_abs_addr(base);
- if (addr == i.addr) {
- logger().debug(" compare addr: {} and i.addr {} ", base, i.addr);
+ if (addr == i.get_abs_addr()) {
+ logger().debug(" compare addr: {} and i.addr {} ", base, i.get_abs_addr());
found = i.validate_delta(e.bl);
break;
}
});
}
- update_journal_tail(entries.back().addr, record_total_size);
+ update_journal_tail(entries.back().get_abs_addr(), record_total_size);
ASSERT_EQ(get_records_total_size(),
get_records_available_size());
uint64_t avail = get_records_available_size();
// forward 2 recod size here because 1 block is reserved between head and tail
- update_journal_tail(entries.front().addr, record_total_size * 2);
+ update_journal_tail(entries.front().get_abs_addr(), record_total_size * 2);
entries.erase(entries.begin());
entries.erase(entries.begin());
ASSERT_EQ(avail + (record_total_size * 2), get_records_available_size());
auto record_total_size = r_size.get_encoded_length();
submit_record(std::move(rec));
- update_journal_tail(entries.front().addr, record_total_size);
+ update_journal_tail(entries.front().get_abs_addr(), record_total_size);
cbj->get_cjs().write_header().unsafe_get0();
auto [update_header, update_buf2] = *(cbj->get_cjs().read_header().unsafe_get0());
cbj->close().unsafe_get0();
}
// will be appended at the begining of WAL
uint64_t avail = get_records_available_size();
- update_journal_tail(entries.front().addr, record_total_size * 2);
+ update_journal_tail(entries.front().get_abs_addr(), record_total_size * 2);
entries.erase(entries.begin());
entries.erase(entries.begin());
ASSERT_EQ(avail + (record_total_size * 2), get_records_available_size());
{ generate_delta(20), generate_delta(21) }
});
}
- update_journal_tail(entries.front().addr, record_total_size * 8);
+ update_journal_tail(entries.front().get_abs_addr(), record_total_size * 8);
for (int i = 0; i < 8; i++) {
entries.erase(entries.begin());
}