});
}
-RecordScanner::read_validate_record_metadata_ret
-CircularBoundedJournal::read_validate_record_metadata(
- scan_valid_records_cursor &cursor,
- segment_nonce_t nonce)
-{
- LOG_PREFIX(CircularBoundedJournal::read_validate_record_metadata);
- paddr_t start = cursor.seq.offset;
- return read_record(start, nonce
- ).safe_then([FNAME, &cursor, this](auto ret) {
- if (!ret.has_value()) {
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::nullopt);
- }
- auto [r_header, bl] = *ret;
- auto print_invalid = [FNAME](auto &r_header) {
- DEBUG("invalid header: {}", r_header);
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::nullopt);
- };
- if (cursor.seq.offset == convert_abs_addr_to_paddr(
- get_records_start(), get_device_id())) {
- if ((r_header.committed_to.segment_seq == NULL_SEG_SEQ &&
- cursor.seq.segment_seq != 0) ||
- r_header.committed_to.segment_seq != cursor.seq.segment_seq - 1) {
- return print_invalid(r_header);
- }
- } else if (r_header.committed_to.segment_seq != cursor.seq.segment_seq) {
- return print_invalid(r_header);
- }
-
- bufferlist mdbuf;
- mdbuf.substr_of(bl, 0, r_header.mdlength);
- DEBUG("header: {}", r_header);
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::make_pair(std::move(r_header), std::move(mdbuf)));
- });
-}
-
-RecordScanner::read_validate_data_ret CircularBoundedJournal::read_validate_data(
- paddr_t record_base,
- const record_group_header_t &header)
-{
- return read_record(record_base, header.segment_nonce
- ).safe_then([](auto ret) {
- // read_record would return non-empty value if the record is valid
- if (!ret.has_value()) {
- return read_validate_data_ret(
- read_validate_data_ertr::ready_future_marker{},
- false);
- }
- return read_validate_data_ertr::make_ready_future<bool>(true);
- });
-}
-
Journal::replay_ret CircularBoundedJournal::replay_segment(
cbj_delta_handler_t &handler, scan_valid_records_cursor& cursor)
{
});
}
+RecordScanner::read_ret CircularBoundedJournal::read(paddr_t start, size_t len)
+{
+ LOG_PREFIX(CircularBoundedJournal::read);
+ rbm_abs_addr addr = convert_paddr_to_abs_addr(start);
+ DEBUG("reading data from addr {} read length {}", addr, len);
+ auto bptr = bufferptr(ceph::buffer::create_page_aligned(len));
+ return cjs.read(addr, bptr
+ ).safe_then([bptr=std::move(bptr)]() {
+ return read_ret(
+ RecordScanner::read_ertr::ready_future_marker{},
+ std::move(bptr)
+ );
+ });
+}
+
+bool CircularBoundedJournal::is_record_segment_seq_invalid(
+ scan_valid_records_cursor &cursor,
+ record_group_header_t &r_header)
+{
+ LOG_PREFIX(CircularBoundedJournal::is_record_segment_seq_invalid);
+ auto print_invalid = [FNAME](auto &r_header) {
+ DEBUG("invalid header: {}", r_header);
+ return true;
+ };
+ if (cursor.seq.offset == convert_abs_addr_to_paddr(
+ get_records_start(), get_device_id())) {
+ if ((r_header.committed_to.segment_seq == NULL_SEG_SEQ &&
+ cursor.seq.segment_seq != 0) ||
+ r_header.committed_to.segment_seq != cursor.seq.segment_seq - 1) {
+ return print_invalid(r_header);
+ }
+ } else if (r_header.committed_to.segment_seq != cursor.seq.segment_seq) {
+ return print_invalid(r_header);
+ }
+ return false;
+}
Journal::replay_ret CircularBoundedJournal::replay(
delta_handler_t &&delta_handler)
cursor.seq.segment_seq += 1;
}
- void initialize_cursor(scan_valid_records_cursor& cursor) final {};
+ void initialize_cursor(scan_valid_records_cursor& cursor) final {
+ cursor.block_size = get_block_size();
+ };
Journal::replay_ret replay_segment(
cbj_delta_handler_t &handler, scan_valid_records_cursor& cursor);
- read_validate_record_metadata_ret read_validate_record_metadata(
- scan_valid_records_cursor &cursor,
- segment_nonce_t nonce) final;
+ read_ret read(paddr_t start, size_t len) final;
- read_validate_data_ret read_validate_data(
- paddr_t record_base,
- const record_group_header_t &header) final;
+ bool is_record_segment_seq_invalid(scan_valid_records_cursor &cursor,
+ record_group_header_t &h) final;
+
+ int64_t get_segment_end_offset(paddr_t addr) final {
+ return get_journal_end();
+ }
// Test interfaces
});
}
+RecordScanner::read_validate_record_metadata_ret
+RecordScanner::read_validate_record_metadata(
+ scan_valid_records_cursor &cursor,
+ segment_nonce_t nonce)
+{
+ LOG_PREFIX(RecordScanner::read_validate_record_metadata);
+ paddr_t start = cursor.seq.offset;
+ auto block_size = cursor.get_block_size();
+ if (get_segment_off(cursor.seq.offset) + block_size > get_segment_end_offset(cursor.seq.offset)) {
+ DEBUG("failed -- record group header block {}~4096 > segment_size {}",
+ start, get_segment_end_offset(cursor.seq.offset));
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::nullopt);
+ }
+ TRACE("reading record group header block {}~4096", start);
+ return read(start, block_size
+ ).safe_then([=](bufferptr bptr) mutable
+ -> read_validate_record_metadata_ret {
+ bufferlist bl;
+ bl.append(bptr);
+ auto maybe_header = try_decode_records_header(bl, nonce);
+ if (!maybe_header.has_value()) {
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::nullopt);
+ }
+
+ auto& header = *maybe_header;
+ if (header.mdlength < block_size ||
+ header.mdlength % block_size != 0 ||
+ header.dlength % block_size != 0 ||
+ (header.committed_to != JOURNAL_SEQ_NULL &&
+ get_segment_off(header.committed_to.offset) %
+ cursor.get_block_size() != 0) ||
+ (get_segment_off(cursor.seq.offset) + header.mdlength + header.dlength >
+ get_segment_end_offset(cursor.seq.offset))) {
+ ERROR("failed, invalid record group header {}", header);
+ return crimson::ct_error::input_output_error::make();
+ }
+
+ if (is_record_segment_seq_invalid(cursor, header)) {
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::nullopt);
+ }
+
+ if (header.mdlength == block_size) {
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::make_pair(std::move(header), std::move(bl))
+ );
+ }
+
+ paddr_t rest_start = cursor.seq.offset.add_offset(block_size);
+ auto rest_len = header.mdlength - block_size;
+ TRACE("reading record group header rest {}~{}", rest_start, rest_len);
+ return read(rest_start, rest_len
+ ).safe_then([header=std::move(header), bl=std::move(bl)
+ ](auto&& bptail) mutable {
+ bl.push_back(bptail);
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::make_pair(std::move(header), std::move(bl)));
+ });
+ }).safe_then([](auto p) {
+ if (p && validate_records_metadata(p->second)) {
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::move(*p)
+ );
+ } else {
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::nullopt);
+ }
+ });
+
+}
+
+RecordScanner::read_validate_data_ret RecordScanner::read_validate_data(
+ paddr_t record_base,
+ const record_group_header_t &header)
+{
+ LOG_PREFIX(RecordScanner::read_validate_data);
+ auto data_addr = record_base.add_offset(header.mdlength);
+ TRACE("reading record group data blocks {}~{}", data_addr, header.dlength);
+ return read(
+ data_addr,
+ header.dlength
+ ).safe_then([=, &header](auto bptr) {
+ bufferlist bl;
+ bl.append(bptr);
+ return validate_records_data(header, bl);
+ });
+}
+
RecordScanner::consume_record_group_ertr::future<>
RecordScanner::consume_next_records(
scan_valid_records_cursor& cursor,
found_record_handler_t &handler ///< [in] handler for records
); ///< @return used budget
+ device_off_t get_segment_off(paddr_t addr) const {
+ if (addr.get_addr_type() == paddr_types_t::SEGMENT) {
+ auto& seg_addr = addr.as_seg_paddr();
+ return seg_addr.get_segment_off();
+ }
+ assert(addr.get_addr_type() == paddr_types_t::RANDOM_BLOCK);
+ auto& blk_addr = addr.as_blk_paddr();
+ return blk_addr.get_device_off();
+ }
+
protected:
/// read record metadata for record starting at start
using read_validate_record_metadata_ertr = read_ertr;
read_validate_record_metadata_ertr::future<
std::optional<std::pair<record_group_header_t, bufferlist>>
>;
- virtual read_validate_record_metadata_ret read_validate_record_metadata(
+ read_validate_record_metadata_ret read_validate_record_metadata(
scan_valid_records_cursor &cursor,
- segment_nonce_t nonce) = 0;
+ segment_nonce_t nonce);
/// read and validate data
using read_validate_data_ertr = read_ertr;
using read_validate_data_ret = read_validate_data_ertr::future<bool>;
- virtual read_validate_data_ret read_validate_data(
+ read_validate_data_ret read_validate_data(
paddr_t record_base,
const record_group_header_t &header ///< caller must ensure lifetime through
/// future resolution
- ) = 0;
+ );
+
+ virtual bool is_record_segment_seq_invalid(scan_valid_records_cursor &cursor,
+ record_group_header_t &h) = 0;
+
+ virtual int64_t get_segment_end_offset(paddr_t addr) = 0;
+
+ using read_ret = read_ertr::future<bufferptr>;
+ virtual read_ret read(paddr_t start, size_t len) = 0;
using consume_record_group_ertr = scan_valid_records_ertr;
consume_record_group_ertr::future<> consume_next_records(
journal_seq_t seq;
journal_seq_t last_committed;
std::size_t num_consumed_records = 0;
+ extent_len_t block_size = 0;
struct found_record_group_t {
paddr_t offset;
return seq.offset.as_seg_paddr().get_segment_off();
}
+ extent_len_t get_block_size() const {
+ return block_size;
+ }
+
void increment_seq(segment_off_t off) {
seq.offset = seq.offset.add_offset(off);
}
INFO("start to scan segment {}", cursor.get_segment_id());
cursor.increment_seq(segment_manager.get_block_size());
}
+ cursor.block_size = segment_manager.get_block_size();
}
-SegmentManagerGroup::read_validate_record_metadata_ret
-SegmentManagerGroup::read_validate_record_metadata(
- scan_valid_records_cursor &cursor,
- segment_nonce_t nonce)
+SegmentManagerGroup::read_ret
+SegmentManagerGroup::read(paddr_t start, size_t len)
{
- LOG_PREFIX(SegmentManagerGroup::read_validate_record_metadata);
- paddr_t start = cursor.seq.offset;
- auto& seg_addr = start.as_seg_paddr();
- assert(has_device(seg_addr.get_segment_id().device_id()));
- auto& segment_manager = *segment_managers[seg_addr.get_segment_id().device_id()];
- auto block_size = segment_manager.get_block_size();
- auto segment_size = static_cast<int64_t>(segment_manager.get_segment_size());
- if (seg_addr.get_segment_off() + block_size > segment_size) {
- DEBUG("failed -- record group header block {}~4096 > segment_size {}", start, segment_size);
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::nullopt);
- }
- TRACE("reading record group header block {}~4096", start);
- return segment_manager.read(start, block_size
- ).safe_then([=, &segment_manager](bufferptr bptr) mutable
- -> read_validate_record_metadata_ret {
- auto block_size = segment_manager.get_block_size();
- bufferlist bl;
- bl.append(bptr);
- auto maybe_header = try_decode_records_header(bl, nonce);
- if (!maybe_header.has_value()) {
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::nullopt);
- }
- auto& seg_addr = start.as_seg_paddr();
- auto& header = *maybe_header;
- if (header.mdlength < block_size ||
- header.mdlength % block_size != 0 ||
- header.dlength % block_size != 0 ||
- (header.committed_to != JOURNAL_SEQ_NULL &&
- header.committed_to.offset.as_seg_paddr().get_segment_off() % block_size != 0) ||
- (seg_addr.get_segment_off() + header.mdlength + header.dlength > segment_size)) {
- ERROR("failed, invalid record group header {}", start);
- return crimson::ct_error::input_output_error::make();
- }
- if (header.mdlength == block_size) {
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::make_pair(std::move(header), std::move(bl))
- );
- }
-
- auto rest_start = paddr_t::make_seg_paddr(
- seg_addr.get_segment_id(),
- seg_addr.get_segment_off() + block_size
- );
- auto rest_len = header.mdlength - block_size;
- TRACE("reading record group header rest {}~{}", rest_start, rest_len);
- return segment_manager.read(rest_start, rest_len
- ).safe_then([header=std::move(header), bl=std::move(bl)
- ](auto&& bptail) mutable {
- bl.push_back(bptail);
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::make_pair(std::move(header), std::move(bl)));
- });
- }).safe_then([](auto p) {
- if (p && validate_records_metadata(p->second)) {
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::move(*p)
- );
- } else {
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::nullopt);
- }
- });
-}
-
-SegmentManagerGroup::read_validate_data_ret
-SegmentManagerGroup::read_validate_data(
- paddr_t record_base,
- const record_group_header_t &header)
-{
- LOG_PREFIX(SegmentManagerGroup::read_validate_data);
- assert(has_device(record_base.get_device_id()));
- auto& segment_manager = *segment_managers[record_base.get_device_id()];
- auto data_addr = record_base.add_offset(header.mdlength);
- TRACE("reading record group data blocks {}~{}", data_addr, header.dlength);
+ LOG_PREFIX(SegmentManagerGroup::read);
+ assert(has_device(start.get_device_id()));
+ auto& segment_manager = *segment_managers[start.get_device_id()];
+ TRACE("reading data {}~{}", start, len);
return segment_manager.read(
- data_addr,
- header.dlength
- ).safe_then([=, &header](auto bptr) {
- bufferlist bl;
- bl.append(bptr);
- return validate_records_data(header, bl);
+ start,
+ len
+ ).safe_then([](auto bptr) {
+ return read_ret(
+ read_ertr::ready_future_marker{},
+ std::move(bptr)
+ );
});
}
void initialize_cursor(scan_valid_records_cursor &cursor) final;
- read_validate_record_metadata_ret read_validate_record_metadata(
- scan_valid_records_cursor &cursor,
- segment_nonce_t nonce) final;
-
- read_validate_data_ret read_validate_data(
- paddr_t record_base,
- const record_group_header_t &header ///< caller must ensure lifetime through
- /// future resolution
- ) final;
+ read_ret read(paddr_t start, size_t len) final;
+
+ bool is_record_segment_seq_invalid(scan_valid_records_cursor &cursor,
+ record_group_header_t &header) final {
+ return false;
+ }
+
+ int64_t get_segment_end_offset(paddr_t addr) final {
+ auto& seg_addr = addr.as_seg_paddr();
+ auto& segment_manager = *segment_managers[seg_addr.get_segment_id().device_id()];
+ return static_cast<int64_t>(segment_manager.get_segment_size());
+ }
std::vector<SegmentManager*> segment_managers;
std::set<device_id_t> device_ids;