{
logger().debug("replay_segment: starting at {}", seq);
return seastar::do_with(
- delta_scan_handler_t(
- [=, &handler](auto addr, auto base, const auto &delta) {
- /* The journal may validly contain deltas for extents in since released
- * segments. We can detect those cases by whether the segment in question
- * currently has a sequence number > the current journal segment seq.
- * We can safely skip these deltas because the extent must already have
- * been rewritten.
- *
- * Note, this comparison exploits the fact that SEGMENT_SEQ_NULL is
- * a large number.
- */
- if (delta.paddr != P_ADDR_NULL &&
- segment_provider->get_seq(delta.paddr.segment) > seq.segment_seq) {
- return replay_ertr::now();
- } else {
- return handler(
- journal_seq_t{seq.segment_seq, addr},
- base,
- delta);
+ scan_valid_records_cursor(seq.offset),
+ found_record_handler_t(
+ [=, &handler](paddr_t base,
+ const record_header_t &header,
+ const bufferlist &mdbuf) {
+ auto deltas = try_decode_deltas(
+ header,
+ mdbuf);
+ if (!deltas) {
+ // This should be impossible, we did check the crc on the mdbuf
+ logger().error(
+ "Journal::replay_segment unable to decode deltas for record {}",
+ base);
+ assert(deltas);
}
+
+ return seastar::do_with(
+ std::move(*deltas),
+ [=](auto &deltas) {
+ return crimson::do_for_each(
+ deltas,
+ [=](auto &delta) {
+ /* The journal may validly contain deltas for extents in
+ * since released segments. We can detect those cases by
+ * whether the segment in question currently has a sequence
+ * number > the current journal segment seq. We can safely
+ * skip these deltas because the extent must already have
+ * been rewritten.
+ *
+ * Note, this comparison exploits the fact that
+ * SEGMENT_SEQ_NULL is a large number.
+ */
+ if (delta.paddr != P_ADDR_NULL &&
+ (segment_provider->get_seq(delta.paddr.segment) >
+ seq.segment_seq)) {
+ return replay_ertr::now();
+ } else {
+ return handler(
+ journal_seq_t{seq.segment_seq, base},
+ base.add_offset(header.mdlength),
+ delta);
+ }
+ });
+ });
}),
- [=](auto &dhandler) {
- return scan_segment(
- seq.offset,
- segment_manager.get_segment_size(),
+ [=](auto &cursor, auto &dhandler) {
+ return scan_valid_records(
+ cursor,
header.segment_nonce,
- &dhandler,
- nullptr).safe_then([](auto){});
+ std::numeric_limits<size_t>::max(),
+ dhandler).safe_then([](auto){});
});
}