From: Samuel Just Date: Fri, 4 Sep 2020 07:46:19 +0000 (-0700) Subject: crimson/os/seastore/journal: generalize replay scan X-Git-Tag: v16.1.0~807^2~22 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8981c67aad9f59420659a4d47569befeabf2e148;p=ceph.git crimson/os/seastore/journal: generalize replay scan We'll need to do at least two forms of scan: - deltas for replay - extents for gc Most of the mechanics are common, however, so this patch hoists the common machinery into scan_segemnt. Signed-off-by: Samuel Just --- diff --git a/src/crimson/os/seastore/journal.cc b/src/crimson/os/seastore/journal.cc index 1489919b6c3b..1b6578c93b47 100644 --- a/src/crimson/os/seastore/journal.cc +++ b/src/crimson/os/seastore/journal.cc @@ -339,28 +339,87 @@ std::optional> Journal::try_decode_deltas( return deltas; } +std::optional> Journal::try_decode_extent_infos( + record_header_t header, + bufferlist &bl) +{ + auto bliter = bl.cbegin(); + bliter += ceph::encoded_sizeof_bounded(); + logger().debug("{}: decoding {} extents", __func__, header.extents); + std::vector extent_infos(header.extents); + for (auto &&i : extent_infos) { + try { + ::decode(i, bliter); + } catch (ceph::buffer::error &e) { + return std::nullopt; + } + } + return extent_infos; +} + Journal::replay_ertr::future<> Journal::replay_segment( journal_seq_t seq, - delta_handler_t &delta_handler) + delta_handler_t &handler) { logger().debug("replay_segment: starting at {}", seq); return seastar::do_with( - paddr_t(seq.offset), - [=, &delta_handler](paddr_t ¤t) { + delta_scan_handler_t( + [&handler, seq](auto addr, auto base, const auto &delta) { + return handler( + journal_seq_t{seq.segment_seq, addr}, + base, + delta); + }), + [this, seq](auto &dhandler) { + return scan_segment( + seq.offset, + segment_manager.get_segment_size(), + &dhandler, + nullptr).safe_then([](auto){}); + }); +} + +Journal::replay_ret Journal::replay(delta_handler_t &&delta_handler) +{ + return seastar::do_with( + std::move(delta_handler), std::vector(), + [this](auto &handler, auto &segments) mutable -> replay_ret { + return find_replay_segments().safe_then( + [this, &handler, &segments](auto replay_segs) mutable { + logger().debug("replay: found {} segments", replay_segs.size()); + segments = std::move(replay_segs); + return crimson::do_for_each(segments, [this, &handler](auto i) mutable { + return replay_segment(i, handler); + }); + }); + }); +} + +Journal::scan_segment_ret Journal::scan_segment( + paddr_t addr, + extent_len_t bytes_to_read, + delta_scan_handler_t *delta_handler, + extent_handler_t *extent_info_handler) +{ + logger().debug("Journal::scan_segment: starting at {}", addr); + return seastar::do_with( + addr, + [=](paddr_t ¤t) { return crimson::do_until( - [=, ¤t, &delta_handler]() -> replay_ertr::future { + [=, ¤t]() -> scan_segment_ertr::future { return read_record_metadata(current).safe_then - ([=, ¤t, &delta_handler](auto p) - -> replay_ertr::future { + ([=, ¤t](auto p) + -> scan_segment_ertr::future { if (!p.has_value()) { - return replay_ertr::make_ready_future(true); + current = P_ADDR_NULL; + return scan_segment_ertr::make_ready_future(true); } auto &[header, bl] = *p; logger().debug( - "replay_segment: next record offset {} mdlength {} dlength {}", + "Journal::scan_segment: next record offset {} mdlength {} dlength {}", current, header.mdlength, header.dlength); @@ -368,48 +427,87 @@ Journal::replay_segment( auto record_start = current; current.offset += header.mdlength + header.dlength; - auto deltas = try_decode_deltas( - header, - bl); - if (!deltas) { - return replay_ertr::make_ready_future(true); - } - return seastar::do_with( - std::move(*deltas), - [=, &delta_handler](auto &deltas) { - return crimson::do_for_each( - deltas, - [=, &delta_handler](auto &info) { - return delta_handler( - journal_seq_t{ - seq.segment_seq, - record_start}, - record_start.add_offset(block_size), - info); - }); - }).safe_then([] { - return replay_ertr::make_ready_future(false); + header, + bl, + [=, ¤t](auto &header, auto &bl) { + return scan_segment_ertr::now( + ).safe_then( + [=, ¤t, &header, &bl]() + -> scan_segment_ertr::future<> { + if (!delta_handler) { + return scan_segment_ertr::now(); + } + + auto deltas = try_decode_deltas( + header, + bl); + if (!deltas) { + logger().error( + "Journal::scan_segment unable to decode deltas for record {}", + addr); + return crimson::ct_error::input_output_error::make(); + } + + return seastar::do_with( + std::move(*deltas), + [=](auto &deltas) { + return crimson::do_for_each( + deltas, + [=](auto &info) { + return (*delta_handler)( + record_start, + record_start.add_offset(header.mdlength), + info); + }); + }); + }).safe_then( + [=, &header, &bl]() -> scan_segment_ertr::future<> { + if (!extent_info_handler) { + return scan_segment_ertr::now(); + } + + auto infos = try_decode_extent_infos( + header, + bl); + if (!infos) { + logger().error( + "Journal::scan_segment unable to decode extent infos for record {}", + addr); + return crimson::ct_error::input_output_error::make(); + } + + return seastar::do_with( + segment_off_t(0), + std::move(*infos), + [=](auto &pos, auto &deltas) { + return crimson::do_for_each( + deltas, + [=, &pos](auto &info) { + auto addr = record_start + .add_offset(header.mdlength) + .add_offset(pos); + pos += info.len; + return (*extent_info_handler)( + addr, + info); + }); + }); + return scan_segment_ertr::now(); + }); + }).safe_then([=, ¤t] { + if ((segment_off_t)(addr.offset + bytes_to_read) + <= current.offset) { + return scan_segment_ertr::make_ready_future(true); + } else { + return scan_segment_ertr::make_ready_future(false); + } }); }); + }).safe_then([this, ¤t] { + return scan_segment_ertr::make_ready_future(current); }); }); } -Journal::replay_ret Journal::replay(delta_handler_t &&delta_handler) -{ - return seastar::do_with( - std::move(delta_handler), std::vector(), - [this](auto&& handler, auto&& segments) mutable -> replay_ret { - return find_replay_segments().safe_then( - [this, &handler, &segments](auto replay_segs) { - logger().debug("replay: found {} segments", replay_segs.size()); - segments = std::move(replay_segs); - return crimson::do_for_each(segments, [this, &handler](auto i) { - return replay_segment(i, handler); - }); - }); - }); -} - } diff --git a/src/crimson/os/seastore/journal.h b/src/crimson/os/seastore/journal.h index f1bdaff3a2b0..bedfa3ab5638 100644 --- a/src/crimson/os/seastore/journal.h +++ b/src/crimson/os/seastore/journal.h @@ -273,12 +273,42 @@ private: record_header_t header, bufferlist &bl); + /// attempts to decode extent infos from bl, return nullopt if unsuccessful + std::optional> try_decode_extent_infos( + record_header_t header, + bufferlist &bl); + + /** + * scan_segment + * + * Scans bytes_to_read forward from addr to the first record after + * addr+bytes_to_read invoking delta_handler and extent_info_handler + * on deltas and extent_infos respectively. deltas, extent_infos + * will only be decoded if the corresponding handler is included. + */ + using scan_segment_ertr = SegmentManager::read_ertr; + using scan_segment_ret = scan_segment_ertr::future; + using delta_scan_handler_t = std::function< + replay_ret(paddr_t record_start, + paddr_t record_block_base, + const delta_info_t&)>; + using extent_handler_t = std::function< + scan_segment_ertr::future<>(paddr_t addr, + const extent_info_t &info)>; + scan_segment_ret scan_segment( + paddr_t addr, + extent_len_t bytes_to_read, + delta_scan_handler_t *delta_handler, + extent_handler_t *extent_info_handler + ); + /// replays records starting at start through end of segment replay_ertr::future<> replay_segment( - journal_seq_t start, ///< [in] starting addr, seq - delta_handler_t &delta_handler ///< [in] processes deltas in order + journal_seq_t start, ///< [in] starting addr, seq + delta_handler_t &delta_handler ///< [in] processes deltas in order ); + }; }