return deltas;
}
+std::optional<std::vector<extent_info_t>> Journal::try_decode_extent_infos(
+ record_header_t header,
+ bufferlist &bl)
+{
+ auto bliter = bl.cbegin();
+ bliter += ceph::encoded_sizeof_bounded<record_header_t>();
+ logger().debug("{}: decoding {} extents", __func__, header.extents);
+ std::vector<extent_info_t> extent_infos(header.extents);
+ for (auto &&i : extent_infos) {
+ try {
+ ::decode(i, bliter);
+ } catch (ceph::buffer::error &e) {
+ return std::nullopt;
+ }
+ }
+ return extent_infos;
+}
+
Journal::replay_ertr::future<>
Journal::replay_segment(
journal_seq_t seq,
- delta_handler_t &delta_handler)
+ delta_handler_t &handler)
{
logger().debug("replay_segment: starting at {}", seq);
return seastar::do_with(
- paddr_t(seq.offset),
- [=, &delta_handler](paddr_t ¤t) {
+ delta_scan_handler_t(
+ [&handler, seq](auto addr, auto base, const auto &delta) {
+ return handler(
+ journal_seq_t{seq.segment_seq, addr},
+ base,
+ delta);
+ }),
+ [this, seq](auto &dhandler) {
+ return scan_segment(
+ seq.offset,
+ segment_manager.get_segment_size(),
+ &dhandler,
+ nullptr).safe_then([](auto){});
+ });
+}
+
+Journal::replay_ret Journal::replay(delta_handler_t &&delta_handler)
+{
+ return seastar::do_with(
+ std::move(delta_handler), std::vector<journal_seq_t>(),
+ [this](auto &handler, auto &segments) mutable -> replay_ret {
+ return find_replay_segments().safe_then(
+ [this, &handler, &segments](auto replay_segs) mutable {
+ logger().debug("replay: found {} segments", replay_segs.size());
+ segments = std::move(replay_segs);
+ return crimson::do_for_each(segments, [this, &handler](auto i) mutable {
+ return replay_segment(i, handler);
+ });
+ });
+ });
+}
+
+Journal::scan_segment_ret Journal::scan_segment(
+ paddr_t addr,
+ extent_len_t bytes_to_read,
+ delta_scan_handler_t *delta_handler,
+ extent_handler_t *extent_info_handler)
+{
+ logger().debug("Journal::scan_segment: starting at {}", addr);
+ return seastar::do_with(
+ addr,
+ [=](paddr_t ¤t) {
return crimson::do_until(
- [=, ¤t, &delta_handler]() -> replay_ertr::future<bool> {
+ [=, ¤t]() -> scan_segment_ertr::future<bool> {
return read_record_metadata(current).safe_then
- ([=, ¤t, &delta_handler](auto p)
- -> replay_ertr::future<bool> {
+ ([=, ¤t](auto p)
+ -> scan_segment_ertr::future<bool> {
if (!p.has_value()) {
- return replay_ertr::make_ready_future<bool>(true);
+ current = P_ADDR_NULL;
+ return scan_segment_ertr::make_ready_future<bool>(true);
}
auto &[header, bl] = *p;
logger().debug(
- "replay_segment: next record offset {} mdlength {} dlength {}",
+ "Journal::scan_segment: next record offset {} mdlength {} dlength {}",
current,
header.mdlength,
header.dlength);
auto record_start = current;
current.offset += header.mdlength + header.dlength;
- auto deltas = try_decode_deltas(
- header,
- bl);
- if (!deltas) {
- return replay_ertr::make_ready_future<bool>(true);
- }
-
return seastar::do_with(
- std::move(*deltas),
- [=, &delta_handler](auto &deltas) {
- return crimson::do_for_each(
- deltas,
- [=, &delta_handler](auto &info) {
- return delta_handler(
- journal_seq_t{
- seq.segment_seq,
- record_start},
- record_start.add_offset(block_size),
- info);
- });
- }).safe_then([] {
- return replay_ertr::make_ready_future<bool>(false);
+ header,
+ bl,
+ [=, ¤t](auto &header, auto &bl) {
+ return scan_segment_ertr::now(
+ ).safe_then(
+ [=, ¤t, &header, &bl]()
+ -> scan_segment_ertr::future<> {
+ if (!delta_handler) {
+ return scan_segment_ertr::now();
+ }
+
+ auto deltas = try_decode_deltas(
+ header,
+ bl);
+ if (!deltas) {
+ logger().error(
+ "Journal::scan_segment unable to decode deltas for record {}",
+ addr);
+ return crimson::ct_error::input_output_error::make();
+ }
+
+ return seastar::do_with(
+ std::move(*deltas),
+ [=](auto &deltas) {
+ return crimson::do_for_each(
+ deltas,
+ [=](auto &info) {
+ return (*delta_handler)(
+ record_start,
+ record_start.add_offset(header.mdlength),
+ info);
+ });
+ });
+ }).safe_then(
+ [=, &header, &bl]() -> scan_segment_ertr::future<> {
+ if (!extent_info_handler) {
+ return scan_segment_ertr::now();
+ }
+
+ auto infos = try_decode_extent_infos(
+ header,
+ bl);
+ if (!infos) {
+ logger().error(
+ "Journal::scan_segment unable to decode extent infos for record {}",
+ addr);
+ return crimson::ct_error::input_output_error::make();
+ }
+
+ return seastar::do_with(
+ segment_off_t(0),
+ std::move(*infos),
+ [=](auto &pos, auto &deltas) {
+ return crimson::do_for_each(
+ deltas,
+ [=, &pos](auto &info) {
+ auto addr = record_start
+ .add_offset(header.mdlength)
+ .add_offset(pos);
+ pos += info.len;
+ return (*extent_info_handler)(
+ addr,
+ info);
+ });
+ });
+ return scan_segment_ertr::now();
+ });
+ }).safe_then([=, ¤t] {
+ if ((segment_off_t)(addr.offset + bytes_to_read)
+ <= current.offset) {
+ return scan_segment_ertr::make_ready_future<bool>(true);
+ } else {
+ return scan_segment_ertr::make_ready_future<bool>(false);
+ }
});
});
+ }).safe_then([this, ¤t] {
+ return scan_segment_ertr::make_ready_future<paddr_t>(current);
});
});
}
-Journal::replay_ret Journal::replay(delta_handler_t &&delta_handler)
-{
- return seastar::do_with(
- std::move(delta_handler), std::vector<journal_seq_t>(),
- [this](auto&& handler, auto&& segments) mutable -> replay_ret {
- return find_replay_segments().safe_then(
- [this, &handler, &segments](auto replay_segs) {
- logger().debug("replay: found {} segments", replay_segs.size());
- segments = std::move(replay_segs);
- return crimson::do_for_each(segments, [this, &handler](auto i) {
- return replay_segment(i, handler);
- });
- });
- });
-}
-
}
record_header_t header,
bufferlist &bl);
+ /// attempts to decode extent infos from bl, return nullopt if unsuccessful
+ std::optional<std::vector<extent_info_t>> try_decode_extent_infos(
+ record_header_t header,
+ bufferlist &bl);
+
+ /**
+ * scan_segment
+ *
+ * Scans bytes_to_read forward from addr to the first record after
+ * addr+bytes_to_read invoking delta_handler and extent_info_handler
+ * on deltas and extent_infos respectively. deltas, extent_infos
+ * will only be decoded if the corresponding handler is included.
+ */
+ using scan_segment_ertr = SegmentManager::read_ertr;
+ using scan_segment_ret = scan_segment_ertr::future<paddr_t>;
+ using delta_scan_handler_t = std::function<
+ replay_ret(paddr_t record_start,
+ paddr_t record_block_base,
+ const delta_info_t&)>;
+ using extent_handler_t = std::function<
+ scan_segment_ertr::future<>(paddr_t addr,
+ const extent_info_t &info)>;
+ scan_segment_ret scan_segment(
+ paddr_t addr,
+ extent_len_t bytes_to_read,
+ delta_scan_handler_t *delta_handler,
+ extent_handler_t *extent_info_handler
+ );
+
/// replays records starting at start through end of segment
replay_ertr::future<>
replay_segment(
- journal_seq_t start, ///< [in] starting addr, seq
- delta_handler_t &delta_handler ///< [in] processes deltas in order
+ journal_seq_t start, ///< [in] starting addr, seq
+ delta_handler_t &delta_handler ///< [in] processes deltas in order
);
+
};
}