From: Xuehan Xu Date: Mon, 19 Jul 2021 07:38:12 +0000 (+0800) Subject: crimson/os/seastore: add Scanner to scan extents X-Git-Tag: v17.1.0~919^2~12 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=c239b5d1afe587aa8a8a13e9a7fe8b0ba986a686;p=ceph.git crimson/os/seastore: add Scanner to scan extents As there will be two kinds of segments to be scanned, those created by the journal and those created by the extent placement manager. We need a common module to scan extents of both of these two kinds of segments Signed-off-by: Xuehan Xu --- diff --git a/src/crimson/os/seastore/CMakeLists.txt b/src/crimson/os/seastore/CMakeLists.txt index cdd88ea28a9..146880a6599 100644 --- a/src/crimson/os/seastore/CMakeLists.txt +++ b/src/crimson/os/seastore/CMakeLists.txt @@ -7,6 +7,7 @@ add_library(crimson-seastore STATIC transaction.cc journal.cc cache.cc + scanner.cc lba_manager.cc segment_cleaner.cc lba_manager/btree/btree_lba_manager.cc diff --git a/src/crimson/os/seastore/journal.cc b/src/crimson/os/seastore/journal.cc index f4f4e09f7ee..e8cface6903 100644 --- a/src/crimson/os/seastore/journal.cc +++ b/src/crimson/os/seastore/journal.cc @@ -50,8 +50,8 @@ segment_nonce_t generate_nonce( sizeof(meta.seastore_id.uuid)); } -Journal::Journal(SegmentManager &segment_manager) - : segment_manager(segment_manager) {} +Journal::Journal(SegmentManager &segment_manager, Scanner& scanner) + : segment_manager(segment_manager), scanner(scanner) {} Journal::initialize_segment_ertr::future @@ -99,35 +99,6 @@ Journal::initialize_segment(Segment &segment) }); } -bool Journal::validate_metadata(const bufferlist &bl) -{ - auto bliter = bl.cbegin(); - auto test_crc = bliter.crc32c( - ceph::encoded_sizeof_bounded(), - -1); - ceph_le32 recorded_crc_le; - decode(recorded_crc_le, bliter); - uint32_t recorded_crc = recorded_crc_le; - test_crc = bliter.crc32c( - bliter.get_remaining(), - test_crc); - return test_crc == recorded_crc; -} - -Journal::read_validate_data_ret Journal::read_validate_data( - paddr_t record_base, - const record_header_t &header) -{ - return segment_manager.read( - record_base.add_offset(header.mdlength), - header.dlength - ).safe_then([=, &header](auto bptr) { - bufferlist bl; - bl.append(bptr); - return bl.crc32c(-1) == header.data_crc; - }); -} - Journal::write_record_ret Journal::write_record( record_size_t rsize, record_t &&record, @@ -215,49 +186,6 @@ Journal::roll_journal_segment() ); } -Journal::read_segment_header_ret -Journal::read_segment_header(segment_id_t segment) -{ - return segment_manager.read( - paddr_t{segment, 0}, - segment_manager.get_block_size() - ).handle_error( - read_segment_header_ertr::pass_further{}, - crimson::ct_error::assert_all{ - "Invalid error in Journal::read_segment_header" - } - ).safe_then([=](bufferptr bptr) -> read_segment_header_ret { - logger().debug("segment {} bptr size {}", segment, bptr.length()); - - segment_header_t header; - bufferlist bl; - bl.push_back(bptr); - - logger().debug( - "Journal::read_segment_header: segment {} block crc {}", - segment, - bl.begin().crc32c(segment_manager.get_block_size(), 0)); - - auto bp = bl.cbegin(); - try { - decode(header, bp); - } catch (ceph::buffer::error &e) { - logger().debug( - "Journal::read_segment_header: segment {} unable to decode " - "header, skipping", - segment); - return crimson::ct_error::enodata::make(); - } - logger().debug( - "Journal::read_segment_header: segment {} header {}", - segment, - header); - return read_segment_header_ret( - read_segment_header_ertr::ready_future_marker{}, - header); - }); -} - Journal::open_for_write_ret Journal::open_for_write() { return roll_journal_segment().safe_then([this](auto seq) { @@ -272,187 +200,79 @@ Journal::open_for_write_ret Journal::open_for_write() }); } -Journal::find_replay_segments_fut Journal::find_replay_segments() +Journal::prep_replay_segments_fut +Journal::prep_replay_segments( + std::vector> segments) { - return seastar::do_with( - std::vector>(), - [this](auto &&segments) mutable { - return crimson::do_for_each( - boost::make_counting_iterator(segment_id_t{0}), - boost::make_counting_iterator(segment_manager.get_num_segments()), - [this, &segments](auto i) { - return read_segment_header(i - ).safe_then([this, &segments, i](auto header) mutable { - if (generate_nonce( - header.journal_segment_seq, - segment_manager.get_meta()) != header.segment_nonce) { - logger().debug( - "find_replay_segments: nonce mismatch segment {} header {}", - i, - header); - assert(0 == "impossible"); - return find_replay_segments_ertr::now(); - } - - segments.emplace_back(i, std::move(header)); - return find_replay_segments_ertr::now(); - }).handle_error( - crimson::ct_error::enoent::handle([i](auto) { - logger().debug( - "find_replay_segments: segment {} not available for read", - i); - return find_replay_segments_ertr::now(); - }), - crimson::ct_error::enodata::handle([i](auto) { - logger().debug( - "find_replay_segments: segment {} header undecodable", - i); - return find_replay_segments_ertr::now(); - }), - find_replay_segments_ertr::pass_further{}, - crimson::ct_error::assert_all{ - "Invalid error in Journal::find_replay_segments" - } - ); - }).safe_then([this, &segments]() mutable -> find_replay_segments_fut { - logger().debug( - "find_replay_segments: have {} segments", - segments.size()); - if (segments.empty()) { - return crimson::ct_error::input_output_error::make(); - } - std::sort( - segments.begin(), - segments.end(), - [](const auto <, const auto &rt) { - return lt.second.journal_segment_seq < - rt.second.journal_segment_seq; - }); - - next_journal_segment_seq = - segments.rbegin()->second.journal_segment_seq + 1; - std::for_each( - segments.begin(), - segments.end(), - [this](auto &seg) { - segment_provider->init_mark_segment_closed( - seg.first, - seg.second.journal_segment_seq); - }); + logger().debug( + "prep_replay_segments: have {} segments", + segments.size()); + if (segments.empty()) { + return crimson::ct_error::input_output_error::make(); + } + std::sort( + segments.begin(), + segments.end(), + [](const auto <, const auto &rt) { + return lt.second.journal_segment_seq < + rt.second.journal_segment_seq; + }); - auto journal_tail = segments.rbegin()->second.journal_tail; - segment_provider->update_journal_tail_committed(journal_tail); - auto replay_from = journal_tail.offset; - logger().debug( - "Journal::find_replay_segments: journal_tail={}", - journal_tail); - auto from = segments.begin(); - if (replay_from != P_ADDR_NULL) { - from = std::find_if( - segments.begin(), - segments.end(), - [&replay_from](const auto &seg) -> bool { - return seg.first == replay_from.segment; - }); - if (from->second.journal_segment_seq != journal_tail.segment_seq) { - logger().error( - "find_replay_segments: journal_tail {} does not match {}", - journal_tail, - from->second); - assert(0 == "invalid"); - } - } else { - replay_from = paddr_t{ - from->first, - (segment_off_t)segment_manager.get_block_size()}; - } - auto ret = replay_segments_t(segments.end() - from); - std::transform( - from, segments.end(), ret.begin(), - [this](const auto &p) { - auto ret = journal_seq_t{ - p.second.journal_segment_seq, - paddr_t{ - p.first, - (segment_off_t)segment_manager.get_block_size()}}; - logger().debug( - "Journal::find_replay_segments: replaying from {}", - ret); - return std::make_pair(ret, p.second); - }); - ret[0].first.offset = replay_from; - return find_replay_segments_fut( - find_replay_segments_ertr::ready_future_marker{}, - std::move(ret)); - }); + next_journal_segment_seq = + segments.rbegin()->second.journal_segment_seq + 1; + std::for_each( + segments.begin(), + segments.end(), + [this](auto &seg) { + segment_provider->init_mark_segment_closed( + seg.first, + seg.second.journal_segment_seq); }); -} -Journal::read_validate_record_metadata_ret Journal::read_validate_record_metadata( - paddr_t start, - segment_nonce_t nonce) -{ - auto block_size = segment_manager.get_block_size(); - if (start.offset + block_size > (int64_t)segment_manager.get_segment_size()) { - return read_validate_record_metadata_ret( - read_validate_record_metadata_ertr::ready_future_marker{}, - std::nullopt); + auto journal_tail = segments.rbegin()->second.journal_tail; + segment_provider->update_journal_tail_committed(journal_tail); + auto replay_from = journal_tail.offset; + logger().debug( + "Journal::prep_replay_segments: journal_tail={}", + journal_tail); + auto from = segments.begin(); + if (replay_from != P_ADDR_NULL) { + from = std::find_if( + segments.begin(), + segments.end(), + [&replay_from](const auto &seg) -> bool { + return seg.first == replay_from.segment; + }); + if (from->second.journal_segment_seq != journal_tail.segment_seq) { + logger().error( + "prep_replay_segments: journal_tail {} does not match {}", + journal_tail, + from->second); + assert(0 == "invalid"); + } + } else { + replay_from = paddr_t{ + from->first, + (segment_off_t)segment_manager.get_block_size()}; } - return segment_manager.read(start, block_size - ).safe_then( - [=](bufferptr bptr) mutable - -> read_validate_record_metadata_ret { - logger().debug("read_validate_record_metadata: reading {}", start); - auto block_size = segment_manager.get_block_size(); - bufferlist bl; - bl.append(bptr); - auto bp = bl.cbegin(); - record_header_t header; - try { - decode(header, bp); - } catch (ceph::buffer::error &e) { - return read_validate_record_metadata_ret( - read_validate_record_metadata_ertr::ready_future_marker{}, - std::nullopt); - } - if (header.segment_nonce != nonce) { - return read_validate_record_metadata_ret( - read_validate_record_metadata_ertr::ready_future_marker{}, - std::nullopt); - } - if (header.mdlength > (extent_len_t)block_size) { - if (start.offset + header.mdlength > - (int64_t)segment_manager.get_segment_size()) { - return crimson::ct_error::input_output_error::make(); - } - return segment_manager.read( - {start.segment, start.offset + (segment_off_t)block_size}, - header.mdlength - block_size).safe_then( - [header=std::move(header), bl=std::move(bl)]( - auto &&bptail) mutable { - bl.push_back(bptail); - return read_validate_record_metadata_ret( - read_validate_record_metadata_ertr::ready_future_marker{}, - std::make_pair(std::move(header), std::move(bl))); - }); - } else { - return read_validate_record_metadata_ret( - read_validate_record_metadata_ertr::ready_future_marker{}, - std::make_pair(std::move(header), std::move(bl)) - ); - } - }).safe_then([=](auto p) { - if (p && validate_metadata(p->second)) { - return read_validate_record_metadata_ret( - read_validate_record_metadata_ertr::ready_future_marker{}, - std::move(*p) - ); - } else { - return read_validate_record_metadata_ret( - read_validate_record_metadata_ertr::ready_future_marker{}, - std::nullopt); - } + auto ret = replay_segments_t(segments.end() - from); + std::transform( + from, segments.end(), ret.begin(), + [this](const auto &p) { + auto ret = journal_seq_t{ + p.second.journal_segment_seq, + paddr_t{ + p.first, + (segment_off_t)segment_manager.get_block_size()}}; + logger().debug( + "Journal::prep_replay_segments: replaying from {}", + ret); + return std::make_pair(ret, p.second); }); + ret[0].first.offset = replay_from; + return prep_replay_segments_fut( + prep_replay_segments_ertr::ready_future_marker{}, + std::move(ret)); } std::optional> Journal::try_decode_deltas( @@ -475,25 +295,6 @@ std::optional> Journal::try_decode_deltas( return deltas; } -std::optional> Journal::try_decode_extent_infos( - record_header_t header, - const bufferlist &bl) -{ - auto bliter = bl.cbegin(); - bliter += ceph::encoded_sizeof_bounded(); - bliter += sizeof(checksum_t) /* crc */; - logger().debug("{}: decoding {} extents", __func__, header.extents); - std::vector extent_infos(header.extents); - for (auto &&i : extent_infos) { - try { - decode(i, bliter); - } catch (ceph::buffer::error &e) { - return std::nullopt; - } - } - return extent_infos; -} - Journal::replay_ertr::future<> Journal::replay_segment( journal_seq_t seq, @@ -503,7 +304,7 @@ Journal::replay_segment( logger().debug("replay_segment: starting at {}", seq); return seastar::do_with( scan_valid_records_cursor(seq.offset), - found_record_handler_t( + Scanner::found_record_handler_t( [=, &handler](paddr_t base, const record_header_t &header, const bufferlist &mdbuf) { @@ -548,20 +349,29 @@ Journal::replay_segment( }); }), [=](auto &cursor, auto &dhandler) { - return scan_valid_records( + return scanner.scan_valid_records( cursor, header.segment_nonce, std::numeric_limits::max(), - dhandler).safe_then([](auto){}); + dhandler).safe_then([](auto){} + ).handle_error( + replay_ertr::pass_further{}, + crimson::ct_error::assert_all{ + "shouldn't meet with any other error other replay_ertr" + } + );; }); } -Journal::replay_ret Journal::replay(delta_handler_t &&delta_handler) +Journal::replay_ret Journal::replay( + std::vector>&& segment_headers, + delta_handler_t &&delta_handler) { return seastar::do_with( std::move(delta_handler), replay_segments_t(), - [this](auto &handler, auto &segments) mutable -> replay_ret { - return find_replay_segments().safe_then( + [this, segment_headers=std::move(segment_headers)] + (auto &handler, auto &segments) mutable -> replay_ret { + return prep_replay_segments(std::move(segment_headers)).safe_then( [this, &handler, &segments](auto replay_segs) mutable { logger().debug("replay: found {} segments", replay_segs.size()); segments = std::move(replay_segs); @@ -572,165 +382,4 @@ Journal::replay_ret Journal::replay(delta_handler_t &&delta_handler) }); } -Journal::scan_extents_ret Journal::scan_extents( - scan_extents_cursor &cursor, - extent_len_t bytes_to_read) -{ - auto ret = std::make_unique(); - auto* extents = ret.get(); - return read_segment_header(cursor.get_offset().segment - ).handle_error( - scan_extents_ertr::pass_further{}, - crimson::ct_error::assert_all{ - "Invalid error in Journal::scan_extents" - } - ).safe_then([bytes_to_read, extents, &cursor, this](auto segment_header) { - auto segment_nonce = segment_header.segment_nonce; - return seastar::do_with( - found_record_handler_t( - [extents, this]( - paddr_t base, - const record_header_t &header, - const bufferlist &mdbuf) mutable { - - auto infos = try_decode_extent_infos( - header, - mdbuf); - if (!infos) { - // This should be impossible, we did check the crc on the mdbuf - logger().error( - "Journal::scan_extents unable to decode extents for record {}", - base); - assert(infos); - } - - paddr_t extent_offset = base.add_offset(header.mdlength); - for (const auto &i : *infos) { - extents->emplace_back(extent_offset, i); - extent_offset.offset += i.len; - } - return scan_extents_ertr::now(); - }), - [=, &cursor](auto &dhandler) { - return scan_valid_records( - cursor, - segment_nonce, - bytes_to_read, - dhandler).discard_result(); - }); - }).safe_then([ret=std::move(ret)] { - return std::move(*ret); - }); -} - -Journal::scan_valid_records_ret Journal::scan_valid_records( - scan_valid_records_cursor &cursor, - segment_nonce_t nonce, - size_t budget, - found_record_handler_t &handler) -{ - if (cursor.offset.offset == 0) { - cursor.offset.offset = segment_manager.get_block_size(); - } - auto retref = std::make_unique(0); - auto budget_used = *retref; - return crimson::repeat( - [=, &cursor, &budget_used, &handler]() mutable - -> scan_valid_records_ertr::future { - return [=, &handler, &cursor, &budget_used] { - if (!cursor.last_valid_header_found) { - return read_validate_record_metadata(cursor.offset, nonce - ).safe_then([=, &cursor](auto md) { - logger().debug( - "Journal::scan_valid_records: read complete {}", - cursor.offset); - if (!md) { - logger().debug( - "Journal::scan_valid_records: found invalid header at {}, presumably at end", - cursor.offset); - cursor.last_valid_header_found = true; - return scan_valid_records_ertr::now(); - } else { - logger().debug( - "Journal::scan_valid_records: valid record read at {}", - cursor.offset); - cursor.last_committed = paddr_t{ - cursor.offset.segment, - md->first.committed_to}; - cursor.pending_records.emplace_back( - cursor.offset, - md->first, - md->second); - cursor.offset.offset += - md->first.dlength + md->first.mdlength; - return scan_valid_records_ertr::now(); - } - }).safe_then([=, &cursor, &budget_used, &handler] { - return crimson::repeat( - [=, &budget_used, &cursor, &handler] { - logger().debug( - "Journal::scan_valid_records: valid record read, processing queue"); - if (cursor.pending_records.empty()) { - /* This is only possible if the segment is empty. - * A record's last_commited must be prior to its own - * location since it itself cannot yet have been committed - * at its own time of submission. Thus, the most recently - * read record must always fall after cursor.last_committed */ - return scan_valid_records_ertr::make_ready_future< - seastar::stop_iteration>(seastar::stop_iteration::yes); - } - auto &next = cursor.pending_records.front(); - if (next.offset > cursor.last_committed) { - return scan_valid_records_ertr::make_ready_future< - seastar::stop_iteration>(seastar::stop_iteration::yes); - } - budget_used += - next.header.dlength + next.header.mdlength; - return handler( - next.offset, - next.header, - next.mdbuffer - ).safe_then([&cursor] { - cursor.pending_records.pop_front(); - return scan_valid_records_ertr::make_ready_future< - seastar::stop_iteration>(seastar::stop_iteration::no); - }); - }); - }); - } else { - assert(!cursor.pending_records.empty()); - auto &next = cursor.pending_records.front(); - return read_validate_data(next.offset, next.header - ).safe_then([=, &budget_used, &next, &cursor, &handler](auto valid) { - if (!valid) { - cursor.pending_records.clear(); - return scan_valid_records_ertr::now(); - } - budget_used += - next.header.dlength + next.header.mdlength; - return handler( - next.offset, - next.header, - next.mdbuffer - ).safe_then([&cursor] { - cursor.pending_records.pop_front(); - return scan_valid_records_ertr::now(); - }); - }); - } - }().safe_then([=, &budget_used, &cursor] { - if (cursor.is_complete() || budget_used >= budget) { - return seastar::stop_iteration::yes; - } else { - return seastar::stop_iteration::no; - } - }); - }).safe_then([retref=std::move(retref)]() mutable -> scan_valid_records_ret { - return scan_valid_records_ret( - scan_valid_records_ertr::ready_future_marker{}, - std::move(*retref)); - }); -} - - } diff --git a/src/crimson/os/seastore/journal.h b/src/crimson/os/seastore/journal.h index e7552bd1316..625a102347e 100644 --- a/src/crimson/os/seastore/journal.h +++ b/src/crimson/os/seastore/journal.h @@ -14,6 +14,7 @@ #include "include/denc.h" #include "crimson/common/log.h" +#include "crimson/os/seastore/scanner.h" #include "crimson/os/seastore/segment_manager.h" #include "crimson/os/seastore/ordering_handle.h" #include "crimson/os/seastore/seastore_types.h" @@ -29,7 +30,7 @@ class SegmentedAllocator; */ class Journal { public: - Journal(SegmentManager &segment_manager); + Journal(SegmentManager &segment_manager, Scanner& scanner); /** * Sets the SegmentProvider. @@ -136,26 +137,9 @@ public: replay_ret(journal_seq_t seq, paddr_t record_block_base, const delta_info_t&)>; - replay_ret replay(delta_handler_t &&delta_handler); - - /** - * scan_extents - * - * Scans records beginning at addr until the first record boundary after - * addr + bytes_to_read. - * - * Returns list - * cursor.is_complete() will be true when no further extents exist in segment. - */ - class scan_valid_records_cursor; - using scan_extents_cursor = scan_valid_records_cursor; - using scan_extents_ertr = SegmentManager::read_ertr; - using scan_extents_ret_bare = std::list>; - using scan_extents_ret = scan_extents_ertr::future; - scan_extents_ret scan_extents( - scan_extents_cursor &cursor, - extent_len_t bytes_to_read - ); + replay_ret replay( + std::vector>&& segment_headers, + delta_handler_t &&delta_handler); void set_write_pipeline(WritePipeline *_write_pipeline) { write_pipeline = _write_pipeline; @@ -172,6 +156,7 @@ private: segment_off_t written_to = 0; segment_off_t committed_to = 0; + Scanner& scanner; WritePipeline *write_pipeline = nullptr; void reset_soft_state() { @@ -187,18 +172,6 @@ private: initialize_segment_ertr::future initialize_segment( Segment &segment); - /// validate embedded metadata checksum - static bool validate_metadata(const bufferlist &bl); - - /// read and validate data - using read_validate_data_ertr = SegmentManager::read_ertr; - using read_validate_data_ret = read_validate_data_ertr::future; - read_validate_data_ret read_validate_data( - paddr_t record_base, - const record_header_t &header ///< caller must ensure lifetime through - /// future resolution - ); - /// do record write using write_record_ertr = crimson::errorator< @@ -217,96 +190,24 @@ private: /// returns true iff current segment has insufficient space bool needs_roll(segment_off_t length) const; - using read_segment_header_ertr = crimson::errorator< - crimson::ct_error::enoent, - crimson::ct_error::enodata, - crimson::ct_error::input_output_error - >; - using read_segment_header_ret = read_segment_header_ertr::future< - segment_header_t>; - read_segment_header_ret read_segment_header(segment_id_t segment); - /// return ordered vector of segments to replay using replay_segments_t = std::vector< std::pair>; - using find_replay_segments_ertr = crimson::errorator< + using prep_replay_segments_ertr = crimson::errorator< crimson::ct_error::input_output_error >; - using find_replay_segments_fut = find_replay_segments_ertr::future< + using prep_replay_segments_fut = prep_replay_segments_ertr::future< replay_segments_t>; - find_replay_segments_fut find_replay_segments(); + prep_replay_segments_fut prep_replay_segments( + std::vector> segments); /// attempts to decode deltas from bl, return nullopt if unsuccessful std::optional> try_decode_deltas( record_header_t header, const bufferlist &bl); - /// attempts to decode extent infos from bl, return nullopt if unsuccessful - std::optional> try_decode_extent_infos( - record_header_t header, - const bufferlist &bl); - - /// read record metadata for record starting at start - using read_validate_record_metadata_ertr = replay_ertr; - using read_validate_record_metadata_ret = - read_validate_record_metadata_ertr::future< - std::optional> - >; - read_validate_record_metadata_ret read_validate_record_metadata( - paddr_t start, - segment_nonce_t nonce); - -public: - /// scan segment for end incrementally - struct scan_valid_records_cursor { - bool last_valid_header_found = false; - paddr_t offset; - paddr_t last_committed; - - struct found_record_t { - paddr_t offset; - record_header_t header; - bufferlist mdbuffer; - - found_record_t( - paddr_t offset, - const record_header_t &header, - const bufferlist &mdbuffer) - : offset(offset), header(header), mdbuffer(mdbuffer) {} - }; - std::deque pending_records; - - bool is_complete() const { - return last_valid_header_found && pending_records.empty(); - } - - paddr_t get_offset() const { - return offset; - } - - scan_valid_records_cursor( - paddr_t offset) - : offset(offset) {} - }; private: - using scan_valid_records_ertr = SegmentManager::read_ertr; - using scan_valid_records_ret = scan_valid_records_ertr::future< - size_t>; - using found_record_handler_t = std::function< - scan_valid_records_ertr::future<>( - paddr_t record_block_base, - // callee may assume header and bl will remain valid until - // returned future resolves - const record_header_t &header, - const bufferlist &bl)>; - scan_valid_records_ret scan_valid_records( - scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call - segment_nonce_t nonce, ///< [in] nonce for segment - size_t budget, ///< [in] max budget to use - found_record_handler_t &handler ///< [in] handler for records - ); ///< @return used budget - /// replays records starting at start through end of segment replay_ertr::future<> replay_segment( diff --git a/src/crimson/os/seastore/scanner.cc b/src/crimson/os/seastore/scanner.cc new file mode 100644 index 00000000000..df3394cc60f --- /dev/null +++ b/src/crimson/os/seastore/scanner.cc @@ -0,0 +1,336 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab expandtab + +#include "crimson/os/seastore/segment_manager.h" +#include "crimson/os/seastore/scanner.h" +#include "crimson/common/log.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_seastore); + } +} + +namespace crimson::os::seastore { + +Scanner::read_segment_header_ret +Scanner::read_segment_header(segment_id_t segment) +{ + return segment_manager.read( + paddr_t{segment, 0}, + segment_manager.get_block_size() + ).handle_error( + read_segment_header_ertr::pass_further{}, + crimson::ct_error::assert_all{ + "Invalid error in Scanner::read_segment_header" + } + ).safe_then([=](bufferptr bptr) -> read_segment_header_ret { + logger().debug("segment {} bptr size {}", segment, bptr.length()); + + segment_header_t header; + bufferlist bl; + bl.push_back(bptr); + + logger().debug( + "Scanner::read_segment_header: segment {} block crc {}", + segment, + bl.begin().crc32c(segment_manager.get_block_size(), 0)); + + auto bp = bl.cbegin(); + try { + decode(header, bp); + } catch (ceph::buffer::error &e) { + logger().debug( + "Scanner::read_segment_header: segment {} unable to decode " + "header, skipping", + segment); + return crimson::ct_error::enodata::make(); + } + logger().debug( + "Scanner::read_segment_header: segment {} header {}", + segment, + header); + return read_segment_header_ret( + read_segment_header_ertr::ready_future_marker{}, + header); + }); +} +Scanner::scan_extents_ret Scanner::scan_extents( + scan_extents_cursor &cursor, + extent_len_t bytes_to_read) +{ + auto ret = std::make_unique(); + auto* extents = ret.get(); + return read_segment_header(cursor.get_offset().segment + ).handle_error( + scan_extents_ertr::pass_further{}, + crimson::ct_error::assert_all{ + "Invalid error in Scanner::scan_extents" + } + ).safe_then([bytes_to_read, extents, &cursor, this](auto segment_header) { + auto segment_nonce = segment_header.segment_nonce; + return seastar::do_with( + found_record_handler_t( + [extents, this]( + paddr_t base, + const record_header_t &header, + const bufferlist &mdbuf) mutable { + + auto infos = try_decode_extent_infos( + header, + mdbuf); + if (!infos) { + // This should be impossible, we did check the crc on the mdbuf + logger().error( + "Scanner::scan_extents unable to decode extents for record {}", + base); + assert(infos); + } + + paddr_t extent_offset = base.add_offset(header.mdlength); + for (const auto &i : *infos) { + extents->emplace_back(extent_offset, i); + extent_offset.offset += i.len; + } + return scan_extents_ertr::now(); + }), + [=, &cursor](auto &dhandler) { + return scan_valid_records( + cursor, + segment_nonce, + bytes_to_read, + dhandler).discard_result(); + }); + }).safe_then([ret=std::move(ret)] { + return std::move(*ret); + }); +} + +Scanner::scan_valid_records_ret Scanner::scan_valid_records( + scan_valid_records_cursor &cursor, + segment_nonce_t nonce, + size_t budget, + found_record_handler_t &handler) +{ + if (cursor.offset.offset == 0) { + cursor.offset.offset = segment_manager.get_block_size(); + } + auto retref = std::make_unique(0); + auto budget_used = *retref; + return crimson::repeat( + [=, &cursor, &budget_used, &handler]() mutable + -> scan_valid_records_ertr::future { + return [=, &handler, &cursor, &budget_used] { + if (!cursor.last_valid_header_found) { + return read_validate_record_metadata(cursor.offset, nonce + ).safe_then([=, &cursor](auto md) { + logger().debug( + "Scanner::scan_valid_records: read complete {}", + cursor.offset); + if (!md) { + logger().debug( + "Scanner::scan_valid_records: found invalid header at {}, presumably at end", + cursor.offset); + cursor.last_valid_header_found = true; + return scan_valid_records_ertr::now(); + } else { + logger().debug( + "Scanner::scan_valid_records: valid record read at {}", + cursor.offset); + cursor.last_committed = paddr_t{ + cursor.offset.segment, + md->first.committed_to}; + cursor.pending_records.emplace_back( + cursor.offset, + md->first, + md->second); + cursor.offset.offset += + md->first.dlength + md->first.mdlength; + return scan_valid_records_ertr::now(); + } + }).safe_then([=, &cursor, &budget_used, &handler] { + return crimson::repeat( + [=, &budget_used, &cursor, &handler] { + logger().debug( + "Scanner::scan_valid_records: valid record read, processing queue"); + if (cursor.pending_records.empty()) { + /* This is only possible if the segment is empty. + * A record's last_commited must be prior to its own + * location since it itself cannot yet have been committed + * at its own time of submission. Thus, the most recently + * read record must always fall after cursor.last_committed */ + return scan_valid_records_ertr::make_ready_future< + seastar::stop_iteration>(seastar::stop_iteration::yes); + } + auto &next = cursor.pending_records.front(); + if (next.offset > cursor.last_committed) { + return scan_valid_records_ertr::make_ready_future< + seastar::stop_iteration>(seastar::stop_iteration::yes); + } + budget_used += + next.header.dlength + next.header.mdlength; + return handler( + next.offset, + next.header, + next.mdbuffer + ).safe_then([&cursor] { + cursor.pending_records.pop_front(); + return scan_valid_records_ertr::make_ready_future< + seastar::stop_iteration>(seastar::stop_iteration::no); + }); + }); + }); + } else { + assert(!cursor.pending_records.empty()); + auto &next = cursor.pending_records.front(); + return read_validate_data(next.offset, next.header + ).safe_then([=, &budget_used, &next, &cursor, &handler](auto valid) { + if (!valid) { + cursor.pending_records.clear(); + return scan_valid_records_ertr::now(); + } + budget_used += + next.header.dlength + next.header.mdlength; + return handler( + next.offset, + next.header, + next.mdbuffer + ).safe_then([&cursor] { + cursor.pending_records.pop_front(); + return scan_valid_records_ertr::now(); + }); + }); + } + }().safe_then([=, &budget_used, &cursor] { + if (cursor.is_complete() || budget_used >= budget) { + return seastar::stop_iteration::yes; + } else { + return seastar::stop_iteration::no; + } + }); + }).safe_then([retref=std::move(retref)]() mutable -> scan_valid_records_ret { + return scan_valid_records_ret( + scan_valid_records_ertr::ready_future_marker{}, + std::move(*retref)); + }); +} + +Scanner::read_validate_record_metadata_ret +Scanner::read_validate_record_metadata( + paddr_t start, + segment_nonce_t nonce) +{ + auto block_size = segment_manager.get_block_size(); + if (start.offset + block_size > (int64_t)segment_manager.get_segment_size()) { + return read_validate_record_metadata_ret( + read_validate_record_metadata_ertr::ready_future_marker{}, + std::nullopt); + } + return segment_manager.read(start, block_size + ).safe_then( + [=](bufferptr bptr) mutable + -> read_validate_record_metadata_ret { + logger().debug("read_validate_record_metadata: reading {}", start); + auto block_size = segment_manager.get_block_size(); + bufferlist bl; + bl.append(bptr); + auto bp = bl.cbegin(); + record_header_t header; + try { + decode(header, bp); + } catch (ceph::buffer::error &e) { + return read_validate_record_metadata_ret( + read_validate_record_metadata_ertr::ready_future_marker{}, + std::nullopt); + } + if (header.segment_nonce != nonce) { + return read_validate_record_metadata_ret( + read_validate_record_metadata_ertr::ready_future_marker{}, + std::nullopt); + } + if (header.mdlength > (extent_len_t)block_size) { + if (start.offset + header.mdlength > + (int64_t)segment_manager.get_segment_size()) { + return crimson::ct_error::input_output_error::make(); + } + return segment_manager.read( + {start.segment, start.offset + (segment_off_t)block_size}, + header.mdlength - block_size).safe_then( + [header=std::move(header), bl=std::move(bl)]( + auto &&bptail) mutable { + bl.push_back(bptail); + return read_validate_record_metadata_ret( + read_validate_record_metadata_ertr::ready_future_marker{}, + std::make_pair(std::move(header), std::move(bl))); + }); + } else { + return read_validate_record_metadata_ret( + read_validate_record_metadata_ertr::ready_future_marker{}, + std::make_pair(std::move(header), std::move(bl)) + ); + } + }).safe_then([=](auto p) { + if (p && validate_metadata(p->second)) { + return read_validate_record_metadata_ret( + read_validate_record_metadata_ertr::ready_future_marker{}, + std::move(*p) + ); + } else { + return read_validate_record_metadata_ret( + read_validate_record_metadata_ertr::ready_future_marker{}, + std::nullopt); + } + }); +} + +std::optional> +Scanner::try_decode_extent_infos( + record_header_t header, + const bufferlist &bl) +{ + auto bliter = bl.cbegin(); + bliter += ceph::encoded_sizeof_bounded(); + bliter += sizeof(checksum_t) /* crc */; + logger().debug("{}: decoding {} extents", __func__, header.extents); + std::vector extent_infos(header.extents); + for (auto &&i : extent_infos) { + try { + decode(i, bliter); + } catch (ceph::buffer::error &e) { + return std::nullopt; + } + } + return extent_infos; +} + +Scanner::read_validate_data_ret +Scanner::read_validate_data( + paddr_t record_base, + const record_header_t &header) +{ + return segment_manager.read( + record_base.add_offset(header.mdlength), + header.dlength + ).safe_then([=, &header](auto bptr) { + bufferlist bl; + bl.append(bptr); + return bl.crc32c(-1) == header.data_crc; + }); +} + +bool Scanner::validate_metadata(const bufferlist &bl) +{ + auto bliter = bl.cbegin(); + auto test_crc = bliter.crc32c( + ceph::encoded_sizeof_bounded(), + -1); + ceph_le32 recorded_crc_le; + decode(recorded_crc_le, bliter); + uint32_t recorded_crc = recorded_crc_le; + test_crc = bliter.crc32c( + bliter.get_remaining(), + test_crc); + return test_crc == recorded_crc; +} + +} // namespace crimson::os::seastore diff --git a/src/crimson/os/seastore/scanner.h b/src/crimson/os/seastore/scanner.h new file mode 100644 index 00000000000..36bec0858d4 --- /dev/null +++ b/src/crimson/os/seastore/scanner.h @@ -0,0 +1,102 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab expandtab + +#pragma once + +#include "crimson/common/errorator.h" +#include "crimson/os/seastore/seastore_types.h" +#include "crimson/os/seastore/segment_manager.h" + +namespace crimson::os::seastore { + +class SegmentCleaner; + +class Scanner { +public: + using read_ertr = crimson::errorator< + crimson::ct_error::input_output_error, + crimson::ct_error::invarg, + crimson::ct_error::enoent, + crimson::ct_error::erange>; + + Scanner(SegmentManager& segment_manager) + : segment_manager(segment_manager) {} + using read_segment_header_ertr = crimson::errorator< + crimson::ct_error::enoent, + crimson::ct_error::enodata, + crimson::ct_error::input_output_error + >; + using read_segment_header_ret = read_segment_header_ertr::future< + segment_header_t>; + read_segment_header_ret read_segment_header(segment_id_t segment); + + /** + * scan_extents + * + * Scans records beginning at addr until the first record boundary after + * addr + bytes_to_read. + * + * Returns list + * cursor.is_complete() will be true when no further extents exist in segment. + */ + using scan_extents_cursor = scan_valid_records_cursor; + using scan_extents_ertr = read_ertr::extend; + using scan_extents_ret_bare = std::list>; + using scan_extents_ret = scan_extents_ertr::future; + scan_extents_ret scan_extents( + scan_extents_cursor &cursor, + extent_len_t bytes_to_read + ); + + using scan_valid_records_ertr = read_ertr::extend; + using scan_valid_records_ret = scan_valid_records_ertr::future< + size_t>; + using found_record_handler_t = std::function< + scan_valid_records_ertr::future<>( + paddr_t record_block_base, + // callee may assume header and bl will remain valid until + // returned future resolves + const record_header_t &header, + const bufferlist &bl)>; + scan_valid_records_ret scan_valid_records( + scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call + segment_nonce_t nonce, ///< [in] nonce for segment + size_t budget, ///< [in] max budget to use + found_record_handler_t &handler ///< [in] handler for records + ); ///< @return used budget + +private: + SegmentManager& segment_manager; + + /// read record metadata for record starting at start + using read_validate_record_metadata_ertr = read_ertr; + using read_validate_record_metadata_ret = + read_validate_record_metadata_ertr::future< + std::optional> + >; + read_validate_record_metadata_ret read_validate_record_metadata( + paddr_t start, + segment_nonce_t nonce); + + /// attempts to decode extent infos from bl, return nullopt if unsuccessful + std::optional> try_decode_extent_infos( + record_header_t header, + const bufferlist &bl); + + /// read and validate data + using read_validate_data_ertr = read_ertr; + using read_validate_data_ret = read_validate_data_ertr::future; + read_validate_data_ret read_validate_data( + paddr_t record_base, + const record_header_t &header ///< caller must ensure lifetime through + /// future resolution + ); + + /// validate embedded metadata checksum + static bool validate_metadata(const bufferlist &bl); + +}; + +using ScannerRef = std::unique_ptr; + +} // namespace crimson::os::seastore diff --git a/src/crimson/os/seastore/seastore.cc b/src/crimson/os/seastore/seastore.cc index 3973da181b2..2aa4e6c3156 100644 --- a/src/crimson/os/seastore/seastore.cc +++ b/src/crimson/os/seastore/seastore.cc @@ -1173,11 +1173,14 @@ std::unique_ptr make_seastore( segment_manager::block::BlockSegmentManager >(device + "/block"); + auto scanner = std::make_unique(*sm); + auto& scanner_ref = *scanner.get(); auto segment_cleaner = std::make_unique( SegmentCleaner::config_t::get_default(), + std::move(scanner), false /* detailed */); - auto journal = std::make_unique(*sm); + auto journal = std::make_unique(*sm, scanner_ref); auto cache = std::make_unique(*sm); auto lba_manager = lba_manager::create_lba_manager(*sm, *cache); diff --git a/src/crimson/os/seastore/seastore_types.h b/src/crimson/os/seastore/seastore_types.h index 704f2bcea2f..465cd2dfb47 100644 --- a/src/crimson/os/seastore/seastore_types.h +++ b/src/crimson/os/seastore/seastore_types.h @@ -840,6 +840,38 @@ ceph::bufferlist encode_record( segment_off_t committed_to, segment_nonce_t current_segment_nonce = 0); +/// scan segment for end incrementally +struct scan_valid_records_cursor { + bool last_valid_header_found = false; + paddr_t offset; + paddr_t last_committed; + + struct found_record_t { + paddr_t offset; + record_header_t header; + bufferlist mdbuffer; + + found_record_t( + paddr_t offset, + const record_header_t &header, + const bufferlist &mdbuffer) + : offset(offset), header(header), mdbuffer(mdbuffer) {} + }; + std::deque pending_records; + + bool is_complete() const { + return last_valid_header_found && pending_records.empty(); + } + + paddr_t get_offset() const { + return offset; + } + + scan_valid_records_cursor( + paddr_t offset) + : offset(offset) {} +}; + } WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::seastore_meta_t) diff --git a/src/crimson/os/seastore/segment_cleaner.cc b/src/crimson/os/seastore/segment_cleaner.cc index 283b7eece89..82bf2d22fc4 100644 --- a/src/crimson/os/seastore/segment_cleaner.cc +++ b/src/crimson/os/seastore/segment_cleaner.cc @@ -144,9 +144,13 @@ void SpaceTrackerDetailed::dump_usage(segment_id_t id) const segment_usage[id].dump_usage(block_size); } -SegmentCleaner::SegmentCleaner(config_t config, bool detailed) +SegmentCleaner::SegmentCleaner( + config_t config, + ScannerRef&& scr, + bool detailed) : detailed(detailed), config(config), + scanner(std::move(scr)), gc_process(*this) { register_metrics(); @@ -304,7 +308,7 @@ SegmentCleaner::gc_reclaim_space_ret SegmentCleaner::gc_reclaim_space() } next.offset = 0; scan_cursor = - std::make_unique( + std::make_unique( next); logger().debug( "SegmentCleaner::do_gc: starting gc on segment {}", @@ -313,7 +317,7 @@ SegmentCleaner::gc_reclaim_space_ret SegmentCleaner::gc_reclaim_space() ceph_assert(!scan_cursor->is_complete()); } - return ecb->scan_extents( + return scanner->scan_extents( *scan_cursor, config.reclaim_bytes_stride ).safe_then([this](auto &&_extents) { @@ -372,4 +376,41 @@ SegmentCleaner::gc_reclaim_space_ret SegmentCleaner::gc_reclaim_space() }); } +SegmentCleaner::init_segments_ret SegmentCleaner::init_segments() { + return seastar::do_with( + std::vector>(), + [this](auto& segments) { + return crimson::do_for_each( + boost::make_counting_iterator(segment_id_t{0}), + boost::make_counting_iterator(segment_id_t{num_segments}), + [this, &segments](auto segment_id) { + return scanner->read_segment_header(segment_id) + .safe_then([&segments, segment_id, this](auto header) { + if (header.out_of_line) { + logger().debug("Scanner::init_segments: out-of-line segment {}", segment_id); + init_mark_segment_closed( + segment_id, + header.journal_segment_seq); + } else { + logger().debug("Scanner::init_segments: journal segment {}", segment_id); + segments.emplace_back(std::make_pair(segment_id, std::move(header))); + } + return seastar::now(); + }).handle_error( + crimson::ct_error::enoent::handle([](auto) { + return init_segments_ertr::now(); + }), + crimson::ct_error::enodata::handle([](auto) { + return init_segments_ertr::now(); + }), + crimson::ct_error::input_output_error::pass_further{} + ); + }).safe_then([&segments] { + return seastar::make_ready_future< + std::vector>>( + std::move(segments)); + }); + }); +} + } diff --git a/src/crimson/os/seastore/segment_cleaner.h b/src/crimson/os/seastore/segment_cleaner.h index a94a133301f..f79dee67751 100644 --- a/src/crimson/os/seastore/segment_cleaner.h +++ b/src/crimson/os/seastore/segment_cleaner.h @@ -108,7 +108,7 @@ class SpaceTrackerSimple : public SpaceTrackerI { return live_bytes_by_segment[segment]; } public: - SpaceTrackerSimple(size_t num_segments) + SpaceTrackerSimple(segment_id_t num_segments) : live_bytes_by_segment(num_segments, 0) {} int64_t allocate( @@ -190,7 +190,7 @@ class SpaceTrackerDetailed : public SpaceTrackerI { std::vector segment_usage; public: - SpaceTrackerDetailed(size_t num_segments, size_t segment_size, size_t block_size) + SpaceTrackerDetailed(segment_id_t num_segments, size_t segment_size, size_t block_size) : block_size(block_size), segment_size(segment_size), segment_usage(num_segments, segment_size / block_size) {} @@ -345,18 +345,6 @@ public: laddr_t laddr, segment_off_t len) = 0; - /** - * scan_extents - * - * Interface shim for Journal::scan_extents - */ - using scan_extents_cursor = Journal::scan_valid_records_cursor; - using scan_extents_ertr = Journal::scan_extents_ertr; - using scan_extents_ret = Journal::scan_extents_ret; - virtual scan_extents_ret scan_extents( - scan_extents_cursor &cursor, - extent_len_t bytes_to_read) = 0; - /** * release_segment * @@ -386,10 +374,12 @@ private: const bool detailed; const config_t config; - size_t num_segments = 0; + segment_id_t num_segments = 0; size_t segment_size = 0; size_t block_size = 0; + ScannerRef scanner; + SpaceTrackerIRef space_tracker; std::vector segments; size_t empty_segments; @@ -417,7 +407,10 @@ private: std::optional> blocked_io_wake; public: - SegmentCleaner(config_t config, bool detailed = false); + SegmentCleaner( + config_t config, + ScannerRef&& scanner, + bool detailed = false); void mount(SegmentManager &sm) { init_complete = false; @@ -444,6 +437,13 @@ public: empty_segments = num_segments; } + using init_segments_ertr = crimson::errorator< + crimson::ct_error::input_output_error>; + using init_segments_ret_bare = + std::vector>; + using init_segments_ret = init_segments_ertr::future; + init_segments_ret init_segments(); + get_segment_ret get_segment() final; void close_segment(segment_id_t segment) final; @@ -633,7 +633,7 @@ private: // GC status helpers std::unique_ptr< - ExtentCallbackInterface::scan_extents_cursor + Scanner::scan_extents_cursor > scan_cursor; /** @@ -711,7 +711,7 @@ private: } gc_process; using gc_ertr = work_ertr::extend_ertr< - ExtentCallbackInterface::scan_extents_ertr + Scanner::scan_extents_ertr >; gc_cycle_ret do_gc_cycle(); diff --git a/src/crimson/os/seastore/transaction_manager.cc b/src/crimson/os/seastore/transaction_manager.cc index 43e923ca0b4..3ef0f5af59b 100644 --- a/src/crimson/os/seastore/transaction_manager.cc +++ b/src/crimson/os/seastore/transaction_manager.cc @@ -63,8 +63,13 @@ TransactionManager::mount_ertr::future<> TransactionManager::mount() LOG_PREFIX(TransactionManager::mount); cache->init(); segment_cleaner->mount(segment_manager); - return journal->replay([this](auto seq, auto paddr, const auto &e) { - return cache->replay_delta(seq, paddr, e); + return segment_cleaner->init_segments().safe_then( + [this](auto&& segments) { + return journal->replay( + std::move(segments), + [this](auto seq, auto paddr, const auto &e) { + return cache->replay_delta(seq, paddr, e); + }); }).safe_then([this] { return journal->open_for_write(); }).safe_then([this, FNAME](auto addr) { diff --git a/src/crimson/os/seastore/transaction_manager.h b/src/crimson/os/seastore/transaction_manager.h index 03a0d32a875..f12aac95ff0 100644 --- a/src/crimson/os/seastore/transaction_manager.h +++ b/src/crimson/os/seastore/transaction_manager.h @@ -373,18 +373,6 @@ public: laddr_t laddr, segment_off_t len) final; - using scan_extents_cursor = - SegmentCleaner::ExtentCallbackInterface::scan_extents_cursor; - using scan_extents_ertr = - SegmentCleaner::ExtentCallbackInterface::scan_extents_ertr; - using scan_extents_ret = - SegmentCleaner::ExtentCallbackInterface::scan_extents_ret; - scan_extents_ret scan_extents( - scan_extents_cursor &cursor, - extent_len_t bytes_to_read) final { - return journal->scan_extents(cursor, bytes_to_read); - } - using release_segment_ret = SegmentCleaner::ExtentCallbackInterface::release_segment_ret; release_segment_ret release_segment( diff --git a/src/test/crimson/seastore/test_btree_lba_manager.cc b/src/test/crimson/seastore/test_btree_lba_manager.cc index 3c94f5226e5..84809db5aa4 100644 --- a/src/test/crimson/seastore/test_btree_lba_manager.cc +++ b/src/test/crimson/seastore/test_btree_lba_manager.cc @@ -27,6 +27,7 @@ using namespace crimson::os::seastore::lba_manager::btree; struct btree_lba_manager_test : public seastar_test_suite_t, SegmentProvider { segment_manager::EphemeralSegmentManagerRef segment_manager; + ScannerRef scanner; Journal journal; Cache cache; BtreeLBAManagerRef lba_manager; @@ -37,7 +38,8 @@ struct btree_lba_manager_test : btree_lba_manager_test() : segment_manager(segment_manager::create_test_ephemeral()), - journal(*segment_manager), + scanner(new Scanner(*segment_manager)), + journal(*segment_manager, *scanner), cache(*segment_manager), lba_manager(new BtreeLBAManager(*segment_manager, cache)), block_size(segment_manager->get_block_size()) diff --git a/src/test/crimson/seastore/test_seastore_journal.cc b/src/test/crimson/seastore/test_seastore_journal.cc index 2cc2439c7de..2be3cf94966 100644 --- a/src/test/crimson/seastore/test_seastore_journal.cc +++ b/src/test/crimson/seastore/test_seastore_journal.cc @@ -74,11 +74,13 @@ struct journal_test_t : seastar_test_suite_t, SegmentProvider { const segment_off_t block_size; + ScannerRef scanner; + journal_test_t() : segment_manager(segment_manager::create_test_ephemeral()), - block_size(segment_manager->get_block_size()) - { - } + block_size(segment_manager->get_block_size()), + scanner(std::make_unique(*segment_manager)) + {} segment_id_t next = 0; get_segment_ret get_segment() final { @@ -91,7 +93,7 @@ struct journal_test_t : seastar_test_suite_t, SegmentProvider { void update_journal_tail_committed(journal_seq_t paddr) final {} seastar::future<> set_up_fut() final { - journal.reset(new Journal(*segment_manager)); + journal.reset(new Journal(*segment_manager, *scanner)); journal->set_segment_provider(this); journal->set_write_pipeline(&pipeline); return segment_manager->init( @@ -108,12 +110,43 @@ struct journal_test_t : seastar_test_suite_t, SegmentProvider { auto replay(T &&f) { return journal->close( ).safe_then([this, f=std::move(f)]() mutable { - journal.reset(new Journal(*segment_manager)); + journal.reset(new Journal(*segment_manager, *scanner)); journal->set_segment_provider(this); journal->set_write_pipeline(&pipeline); - return journal->replay(std::forward(std::move(f))); - }).safe_then([this] { - return journal->open_for_write(); + return seastar::do_with( + std::vector>(), + [this](auto& segments) { + return crimson::do_for_each( + boost::make_counting_iterator(segment_id_t{0}), + boost::make_counting_iterator(segment_manager->get_num_segments()), + [this, &segments](auto segment_id) { + return scanner->read_segment_header(segment_id) + .safe_then([&segments, segment_id](auto header) { + if (!header.out_of_line) { + segments.emplace_back(std::make_pair(segment_id, std::move(header))); + } + return seastar::now(); + }).handle_error( + crimson::ct_error::enoent::handle([](auto) { + return SegmentCleaner::init_segments_ertr::now(); + }), + crimson::ct_error::enodata::handle([](auto) { + return SegmentCleaner::init_segments_ertr::now(); + }), + crimson::ct_error::input_output_error::pass_further{} + ); + }).safe_then([&segments] { + return seastar::make_ready_future< + std::vector>>( + std::move(segments)); + }); + }).safe_then([this, f=std::move(f)](auto&& segments) mutable { + return journal->replay( + std::move(segments), + std::forward(std::move(f))); + }).safe_then([this] { + return journal->open_for_write(); + }); }); } diff --git a/src/test/crimson/seastore/transaction_manager_test_state.h b/src/test/crimson/seastore/transaction_manager_test_state.h index a4abee89601..4d9eab79182 100644 --- a/src/test/crimson/seastore/transaction_manager_test_state.h +++ b/src/test/crimson/seastore/transaction_manager_test_state.h @@ -70,10 +70,13 @@ protected: auto get_transaction_manager( SegmentManager &segment_manager) { + auto scanner = std::make_unique(segment_manager); + auto& scanner_ref = *scanner.get(); auto segment_cleaner = std::make_unique( SegmentCleaner::config_t::get_default(), + std::move(scanner), true); - auto journal = std::make_unique(segment_manager); + auto journal = std::make_unique(segment_manager, scanner_ref); auto cache = std::make_unique(segment_manager); auto lba_manager = lba_manager::create_lba_manager(segment_manager, *cache);