).safe_then([bytes_to_read, extents, &cursor, this](auto segment_header) {
auto segment_nonce = segment_header.segment_nonce;
return seastar::do_with(
- found_record_handler_t(
- [extents, this](
- paddr_t base,
- const record_header_t &header,
- const bufferlist &mdbuf) mutable {
+ found_record_handler_t([extents, this](
+ paddr_t base,
+ const record_header_t& header,
+ const bufferlist& mdbuf) mutable -> scan_valid_records_ertr::future<>
+ {
+ auto maybe_record_extent_infos = try_decode_extent_infos(header, mdbuf);
+ if (!maybe_record_extent_infos) {
+ // This should be impossible, we did check the crc on the mdbuf
+ logger().error(
+ "ExtentReader::scan_extents: unable to decode extents for record {}",
+ base);
+ return crimson::ct_error::input_output_error::make();
+ }
- auto infos = try_decode_extent_infos(
- header,
- mdbuf);
- if (!infos) {
- // This should be impossible, we did check the crc on the mdbuf
- logger().error(
- "ExtentReader::scan_extents unable to decode extents for record {}",
- base);
- assert(infos);
- }
-
- paddr_t extent_offset = base.add_offset(header.mdlength);
- for (const auto &i : *infos) {
- extents->emplace_back(extent_offset, i);
- auto& seg_addr = extent_offset.as_seg_paddr();
- seg_addr.set_segment_off(
- seg_addr.get_segment_off() + i.len);
- }
- return scan_extents_ertr::now();
- }),
+ paddr_t extent_offset = base.add_offset(header.mdlength);
+ logger().debug("ExtentReader::scan_extents: decoded {} extents",
+ maybe_record_extent_infos->size());
+ for (const auto &i : *maybe_record_extent_infos) {
+ extents->emplace_back(extent_offset, i);
+ auto& seg_addr = extent_offset.as_seg_paddr();
+ seg_addr.set_segment_off(
+ seg_addr.get_segment_off() + i.len);
+ }
+ return scan_extents_ertr::now();
+ }),
[=, &cursor](auto &dhandler) {
- return scan_valid_records(
- cursor,
- segment_nonce,
- bytes_to_read,
- dhandler).discard_result();
- });
+ return scan_valid_records(
+ cursor,
+ segment_nonce,
+ bytes_to_read,
+ dhandler
+ ).discard_result();
+ }
+ );
}).safe_then([ret=std::move(ret)] {
return std::move(*ret);
});
cursor.last_valid_header_found = true;
return scan_valid_records_ertr::now();
} else {
- auto new_committed_to = md->first.committed_to;
+ auto& [header, md_bl] = *md;
+ auto new_committed_to = header.committed_to;
logger().debug(
"ExtentReader::scan_valid_records: valid record read at {}, now committed at {}",
cursor.seq,
cursor.last_committed = new_committed_to;
cursor.pending_records.emplace_back(
cursor.seq.offset,
- md->first,
- md->second);
- cursor.increment(md->first.dlength + md->first.mdlength);
+ header,
+ std::move(md_bl));
+ cursor.increment(header.dlength + header.mdlength);
ceph_assert(new_committed_to == journal_seq_t() ||
new_committed_to < cursor.seq);
return scan_valid_records_ertr::now();
return scan_valid_records_ertr::make_ready_future<
seastar::stop_iteration>(seastar::stop_iteration::yes);
}
- budget_used +=
- next.header.dlength + next.header.mdlength;
- return handler(
- next.offset,
- next.header,
- next.mdbuffer
- ).safe_then([&cursor] {
- cursor.pending_records.pop_front();
+ return consume_next_records(cursor, handler, budget_used
+ ).safe_then([] {
return scan_valid_records_ertr::make_ready_future<
seastar::stop_iteration>(seastar::stop_iteration::no);
});
assert(!cursor.pending_records.empty());
auto &next = cursor.pending_records.front();
return read_validate_data(next.offset, next.header
- ).safe_then([=, &budget_used, &next, &cursor, &handler](auto valid) {
+ ).safe_then([this, &budget_used, &cursor, &handler](auto valid) {
if (!valid) {
cursor.pending_records.clear();
return scan_valid_records_ertr::now();
}
- budget_used +=
- next.header.dlength + next.header.mdlength;
- return handler(
- next.offset,
- next.header,
- next.mdbuffer
- ).safe_then([&cursor] {
- cursor.pending_records.pop_front();
- return scan_valid_records_ertr::now();
- });
+ return consume_next_records(cursor, handler, budget_used);
});
}
}().safe_then([=, &budget_used, &cursor] {
logger().debug("read_validate_record_metadata: reading header block {}...",
start);
return segment_manager.read(start, block_size
- ).safe_then(
- [=, &segment_manager](bufferptr bptr) mutable
- -> read_validate_record_metadata_ret {
- auto block_size = segment_manager.get_block_size();
- bufferlist bl;
- bl.append(bptr);
- auto bp = bl.cbegin();
- record_header_t header;
- try {
- decode(header, bp);
- } catch (ceph::buffer::error &e) {
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::nullopt);
- }
- if (header.segment_nonce != nonce) {
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::nullopt);
- }
- auto& seg_addr = start.as_seg_paddr();
- if (header.mdlength > (extent_len_t)block_size) {
- if (seg_addr.get_segment_off() + header.mdlength >
- (int64_t)segment_manager.get_segment_size()) {
- return crimson::ct_error::input_output_error::make();
- }
- return segment_manager.read(
- paddr_t::make_seg_paddr(seg_addr.get_segment_id(),
- seg_addr.get_segment_off() + (segment_off_t)block_size),
- header.mdlength - block_size).safe_then(
- [header=std::move(header), bl=std::move(bl)](
- auto &&bptail) mutable {
- bl.push_back(bptail);
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::make_pair(std::move(header), std::move(bl)));
- });
- } else {
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::make_pair(std::move(header), std::move(bl))
- );
- }
- }).safe_then([=](auto p) {
- if (p && validate_metadata(p->second)) {
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::move(*p)
- );
- } else {
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::nullopt);
- }
+ ).safe_then([=, &segment_manager](bufferptr bptr) mutable
+ -> read_validate_record_metadata_ret {
+ auto block_size = static_cast<extent_len_t>(
+ segment_manager.get_block_size());
+ bufferlist bl;
+ bl.append(bptr);
+ auto bp = bl.cbegin();
+ record_header_t header;
+ try {
+ decode(header, bp);
+ } catch (ceph::buffer::error &e) {
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::nullopt);
+ }
+ if (header.segment_nonce != nonce) {
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::nullopt);
+ }
+ auto& seg_addr = start.as_seg_paddr();
+ if (seg_addr.get_segment_off() + header.mdlength >
+ (int64_t)segment_manager.get_segment_size()) {
+ logger().error("read_validate_record_metadata: failed, invalid header");
+ return crimson::ct_error::input_output_error::make();
+ }
+ if (header.mdlength == block_size) {
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::make_pair(std::move(header), std::move(bl))
+ );
+ }
+ return segment_manager.read(
+ paddr_t::make_seg_paddr(
+ seg_addr.get_segment_id(),
+ seg_addr.get_segment_off() + (segment_off_t)block_size
+ ),
+ header.mdlength - block_size
+ ).safe_then([header=std::move(header), bl=std::move(bl)
+ ](auto&& bptail) mutable {
+ bl.push_back(bptail);
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::make_pair(std::move(header), std::move(bl)));
});
+ }).safe_then([this](auto p) {
+ if (p && validate_metadata(p->second)) {
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::move(*p)
+ );
+ } else {
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::nullopt);
+ }
+ });
}
std::optional<std::vector<extent_info_t>>
return test_crc == recorded_crc;
}
+ExtentReader::consume_record_group_ertr::future<>
+ExtentReader::consume_next_records(
+ scan_valid_records_cursor& cursor,
+ found_record_handler_t& handler,
+ std::size_t& budget_used)
+{
+ auto& next = cursor.pending_records.front();
+ auto total_length = next.header.dlength + next.header.mdlength;
+ budget_used += total_length;
+ return handler(
+ next.offset,
+ next.header,
+ next.mdbuffer
+ ).safe_then([&cursor] {
+ cursor.pending_records.pop_front();
+ });
+}
+
} // namespace crimson::os::seastore
} else {
replay_from = paddr_t::make_seg_paddr(
from->first,
- (segment_off_t)journal_segment_manager.get_block_size());
+ journal_segment_manager.get_block_size());
}
auto ret = replay_segments_t(segments.end() - from);
std::transform(
p.second.journal_segment_seq,
paddr_t::make_seg_paddr(
p.first,
- (segment_off_t)journal_segment_manager.get_block_size())};
+ journal_segment_manager.get_block_size())
+ };
logger().debug(
"Journal::prep_replay_segments: replaying from {}",
ret);
logger().debug("Journal::replay_segment: starting at {}", seq);
return seastar::do_with(
scan_valid_records_cursor(seq),
- ExtentReader::found_record_handler_t(
- [=, &handler](paddr_t base,
- const record_header_t &header,
- const bufferlist &mdbuf) {
- auto deltas = try_decode_deltas(
- header,
- mdbuf);
- if (!deltas) {
- // This should be impossible, we did check the crc on the mdbuf
- logger().error(
- "Journal::replay_segment: unable to decode deltas for record {}",
- base);
- assert(deltas);
- }
+ ExtentReader::found_record_handler_t([=, &handler](
+ paddr_t base,
+ const record_header_t& header,
+ const bufferlist& mdbuf)
+ -> ExtentReader::scan_valid_records_ertr::future<>
+ {
+ auto maybe_record_deltas_list = try_decode_deltas(
+ header, mdbuf);
+ if (!maybe_record_deltas_list) {
+ // This should be impossible, we did check the crc on the mdbuf
+ logger().error(
+ "Journal::replay_segment: unable to decode deltas for record {}",
+ base);
+ return crimson::ct_error::input_output_error::make();
+ }
- return seastar::do_with(
- std::move(*deltas),
- [=](auto &deltas) {
- return crimson::do_for_each(
- deltas,
- [=](auto &delta) {
- /* The journal may validly contain deltas for extents in
- * since released segments. We can detect those cases by
- * checking whether the segment in question currently has a
- * sequence number > the current journal segment seq. We can
- * safetly skip these deltas because the extent must already
- * have been rewritten.
- *
- * Note, this comparison exploits the fact that
- * SEGMENT_SEQ_NULL is a large number.
- */
- auto& seg_addr = delta.paddr.as_seg_paddr();
- if (delta.paddr != P_ADDR_NULL &&
- (segment_provider->get_seq(seg_addr.get_segment_id()) >
- seq.segment_seq)) {
- return replay_ertr::now();
- } else {
- auto offsets = submit_result_t{
- base.add_offset(header.mdlength),
- write_result_t{
- journal_seq_t{seq.segment_seq, base},
- static_cast<segment_off_t>(header.mdlength + header.dlength)
- }
- };
- return handler(
- offsets,
- delta);
- }
- });
- });
- }),
+ return seastar::do_with(
+ std::move(*maybe_record_deltas_list),
+ [=](auto &deltas)
+ {
+ logger().debug("Journal::replay_segment: decoded {} deltas at base {}",
+ deltas.size(),
+ base);
+ return crimson::do_for_each(
+ deltas,
+ [=](auto &delta)
+ {
+ /* The journal may validly contain deltas for extents in
+ * since released segments. We can detect those cases by
+ * checking whether the segment in question currently has a
+ * sequence number > the current journal segment seq. We can
+ * safetly skip these deltas because the extent must already
+ * have been rewritten.
+ *
+ * Note, this comparison exploits the fact that
+ * SEGMENT_SEQ_NULL is a large number.
+ */
+ auto& seg_addr = delta.paddr.as_seg_paddr();
+ if (delta.paddr != P_ADDR_NULL &&
+ (segment_provider->get_seq(seg_addr.get_segment_id()) >
+ seq.segment_seq)) {
+ return replay_ertr::now();
+ } else {
+ auto offsets = submit_result_t{
+ base.add_offset(header.mdlength),
+ write_result_t{
+ journal_seq_t{seq.segment_seq, base},
+ static_cast<segment_off_t>(header.mdlength + header.dlength)
+ }
+ };
+ return handler(offsets, delta);
+ }
+ });
+ });
+ }),
[=](auto &cursor, auto &dhandler) {
return scanner.scan_valid_records(
cursor,
crimson::ct_error::assert_all{
"shouldn't meet with any other error other replay_ertr"
}
- );;
- });
+ );
+ }
+ );
}
Journal::replay_ret Journal::replay(