From: Yingxin Cheng Date: Thu, 24 Mar 2022 06:31:03 +0000 (+0800) Subject: crimson/os/seastore: rename ExtentReader to SegmentManagerGroup X-Git-Tag: v18.0.0~1075^2~6 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=1daf02b583e49eb7203931309c5384da9bf3b044;p=ceph-ci.git crimson/os/seastore: rename ExtentReader to SegmentManagerGroup Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/os/seastore/CMakeLists.txt b/src/crimson/os/seastore/CMakeLists.txt index a10d1c677d8..2e3ec4a5fe0 100644 --- a/src/crimson/os/seastore/CMakeLists.txt +++ b/src/crimson/os/seastore/CMakeLists.txt @@ -7,7 +7,6 @@ set(crimson_seastore_srcs transaction_manager.cc transaction.cc cache.cc - extent_reader.cc lba_manager.cc segment_cleaner.cc lba_manager/btree/btree_lba_manager.cc @@ -40,6 +39,7 @@ set(crimson_seastore_srcs journal/segment_allocator.cc journal.cc device.cc + segment_manager_group.cc ../../../test/crimson/seastore/test_block.cc ${PROJECT_SOURCE_DIR}/src/os/Transaction.cc ) diff --git a/src/crimson/os/seastore/extent_reader.cc b/src/crimson/os/seastore/extent_reader.cc deleted file mode 100644 index a2f3c10ab3a..00000000000 --- a/src/crimson/os/seastore/extent_reader.cc +++ /dev/null @@ -1,383 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- -// vim: ts=8 sw=2 smarttab expandtab - -#include "crimson/os/seastore/extent_reader.h" - -#include "crimson/os/seastore/logging.h" - -SET_SUBSYS(seastore_journal); - -namespace crimson::os::seastore { - -ExtentReader::read_segment_tail_ret -ExtentReader::read_segment_tail(segment_id_t segment) -{ - auto& segment_manager = *segment_managers[segment.device_id()]; - return segment_manager.read( - paddr_t::make_seg_paddr( - segment, - segment_manager.get_segment_size() - - segment_manager.get_rounded_tail_length()), - segment_manager.get_rounded_tail_length() - ).handle_error( - read_segment_header_ertr::pass_further{}, - crimson::ct_error::assert_all{ - "Invalid error in ExtentReader::read_segment_tail" - } - ).safe_then([=, &segment_manager](bufferptr bptr) -> read_segment_tail_ret { - LOG_PREFIX(ExtentReader::read_segment_tail); - DEBUG("segment {} bptr size {}", segment, bptr.length()); - - segment_tail_t tail; - bufferlist bl; - bl.push_back(bptr); - - DEBUG("segment {} block crc {}", - segment, - bl.begin().crc32c(segment_manager.get_block_size(), 0)); - - auto bp = bl.cbegin(); - try { - decode(tail, bp); - } catch (ceph::buffer::error &e) { - DEBUG("segment {} unable to decode tail, skipping -- {}", - segment, e); - return crimson::ct_error::enodata::make(); - } - DEBUG("segment {} tail {}", segment, tail); - return read_segment_tail_ret( - read_segment_tail_ertr::ready_future_marker{}, - tail); - }); -} - -ExtentReader::read_segment_header_ret -ExtentReader::read_segment_header(segment_id_t segment) -{ - auto& segment_manager = *segment_managers[segment.device_id()]; - return segment_manager.read( - paddr_t::make_seg_paddr(segment, 0), - segment_manager.get_block_size() - ).handle_error( - read_segment_header_ertr::pass_further{}, - crimson::ct_error::assert_all{ - "Invalid error in ExtentReader::read_segment_header" - } - ).safe_then([=, &segment_manager](bufferptr bptr) -> read_segment_header_ret { - LOG_PREFIX(ExtentReader::read_segment_header); - DEBUG("segment {} bptr size {}", segment, bptr.length()); - - segment_header_t header; - bufferlist bl; - bl.push_back(bptr); - - DEBUG("segment {} block crc {}", - segment, - bl.begin().crc32c(segment_manager.get_block_size(), 0)); - - auto bp = bl.cbegin(); - try { - decode(header, bp); - } catch (ceph::buffer::error &e) { - DEBUG("segment {} unable to decode header, skipping -- {}", - segment, e); - return crimson::ct_error::enodata::make(); - } - DEBUG("segment {} header {}", segment, header); - return read_segment_header_ret( - read_segment_header_ertr::ready_future_marker{}, - header); - }); -} - -ExtentReader::scan_extents_ret ExtentReader::scan_extents( - scan_extents_cursor &cursor, - extent_len_t bytes_to_read) -{ - auto ret = std::make_unique(); - auto* extents = ret.get(); - return read_segment_header(cursor.get_segment_id() - ).handle_error( - scan_extents_ertr::pass_further{}, - crimson::ct_error::assert_all{ - "Invalid error in ExtentReader::scan_extents" - } - ).safe_then([bytes_to_read, extents, &cursor, this](auto segment_header) { - auto segment_nonce = segment_header.segment_nonce; - return seastar::do_with( - found_record_handler_t([extents]( - record_locator_t locator, - const record_group_header_t& header, - const bufferlist& mdbuf) mutable -> scan_valid_records_ertr::future<> - { - LOG_PREFIX(ExtentReader::scan_extents); - DEBUG("decoding {} records", header.records); - auto maybe_record_extent_infos = try_decode_extent_infos(header, mdbuf); - if (!maybe_record_extent_infos) { - // This should be impossible, we did check the crc on the mdbuf - ERROR("unable to decode extents for record {}", - locator.record_block_base); - return crimson::ct_error::input_output_error::make(); - } - - paddr_t extent_offset = locator.record_block_base; - for (auto& r: *maybe_record_extent_infos) { - DEBUG("decoded {} extents", r.extent_infos.size()); - for (const auto &i : r.extent_infos) { - extents->emplace_back( - extent_offset, - std::pair( - {r.header.commit_time, - r.header.commit_type}, - i)); - auto& seg_addr = extent_offset.as_seg_paddr(); - seg_addr.set_segment_off( - seg_addr.get_segment_off() + i.len); - } - } - return scan_extents_ertr::now(); - }), - [bytes_to_read, segment_nonce, &cursor, this](auto &dhandler) { - return scan_valid_records( - cursor, - segment_nonce, - bytes_to_read, - dhandler - ).discard_result(); - } - ); - }).safe_then([ret=std::move(ret)] { - return std::move(*ret); - }); -} - -ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records( - scan_valid_records_cursor &cursor, - segment_nonce_t nonce, - size_t budget, - found_record_handler_t &handler) -{ - LOG_PREFIX(ExtentReader::scan_valid_records); - auto& segment_manager = - *segment_managers[cursor.get_segment_id().device_id()]; - if (cursor.get_segment_offset() == 0) { - INFO("start to scan segment {}", cursor.get_segment_id()); - cursor.increment_seq(segment_manager.get_block_size()); - } - DEBUG("starting at {}, budget={}", cursor, budget); - auto retref = std::make_unique(0); - auto &budget_used = *retref; - return crimson::repeat( - [=, &cursor, &budget_used, &handler]() mutable - -> scan_valid_records_ertr::future { - return [=, &handler, &cursor, &budget_used] { - if (!cursor.last_valid_header_found) { - return read_validate_record_metadata(cursor.seq.offset, nonce - ).safe_then([=, &cursor](auto md) { - if (!md) { - cursor.last_valid_header_found = true; - if (cursor.is_complete()) { - INFO("complete at {}, invalid record group metadata", - cursor); - } else { - DEBUG("found invalid record group metadata at {}, " - "processing {} pending record groups", - cursor.seq, - cursor.pending_record_groups.size()); - } - return scan_valid_records_ertr::now(); - } else { - auto& [header, md_bl] = *md; - DEBUG("found valid {} at {}", header, cursor.seq); - cursor.emplace_record_group(header, std::move(md_bl)); - return scan_valid_records_ertr::now(); - } - }).safe_then([=, &cursor, &budget_used, &handler] { - DEBUG("processing committed record groups until {}, {} pending", - cursor.last_committed, - cursor.pending_record_groups.size()); - return crimson::repeat( - [=, &budget_used, &cursor, &handler] { - if (cursor.pending_record_groups.empty()) { - /* This is only possible if the segment is empty. - * A record's last_commited must be prior to its own - * location since it itself cannot yet have been committed - * at its own time of submission. Thus, the most recently - * read record must always fall after cursor.last_committed */ - return scan_valid_records_ertr::make_ready_future< - seastar::stop_iteration>(seastar::stop_iteration::yes); - } - auto &next = cursor.pending_record_groups.front(); - journal_seq_t next_seq = {cursor.seq.segment_seq, next.offset}; - if (cursor.last_committed == JOURNAL_SEQ_NULL || - next_seq > cursor.last_committed) { - return scan_valid_records_ertr::make_ready_future< - seastar::stop_iteration>(seastar::stop_iteration::yes); - } - return consume_next_records(cursor, handler, budget_used - ).safe_then([] { - return scan_valid_records_ertr::make_ready_future< - seastar::stop_iteration>(seastar::stop_iteration::no); - }); - }); - }); - } else { - assert(!cursor.pending_record_groups.empty()); - auto &next = cursor.pending_record_groups.front(); - return read_validate_data(next.offset, next.header - ).safe_then([this, FNAME, &budget_used, &cursor, &handler, &next](auto valid) { - if (!valid) { - INFO("complete at {}, invalid record group data at {}, {}", - cursor, next.offset, next.header); - cursor.pending_record_groups.clear(); - return scan_valid_records_ertr::now(); - } - return consume_next_records(cursor, handler, budget_used); - }); - } - }().safe_then([=, &budget_used, &cursor] { - if (cursor.is_complete() || budget_used >= budget) { - DEBUG("finish at {}, budget_used={}, budget={}", - cursor, budget_used, budget); - return seastar::stop_iteration::yes; - } else { - return seastar::stop_iteration::no; - } - }); - }).safe_then([retref=std::move(retref)]() mutable -> scan_valid_records_ret { - return scan_valid_records_ret( - scan_valid_records_ertr::ready_future_marker{}, - std::move(*retref)); - }); -} - -ExtentReader::read_validate_record_metadata_ret -ExtentReader::read_validate_record_metadata( - paddr_t start, - segment_nonce_t nonce) -{ - LOG_PREFIX(ExtentReader::read_validate_record_metadata); - auto& seg_addr = start.as_seg_paddr(); - auto& segment_manager = *segment_managers[seg_addr.get_segment_id().device_id()]; - auto block_size = segment_manager.get_block_size(); - auto segment_size = static_cast(segment_manager.get_segment_size()); - if (seg_addr.get_segment_off() + block_size > segment_size) { - DEBUG("failed -- record group header block {}~4096 > segment_size {}", start, segment_size); - return read_validate_record_metadata_ret( - read_validate_record_metadata_ertr::ready_future_marker{}, - std::nullopt); - } - TRACE("reading record group header block {}~4096", start); - return segment_manager.read(start, block_size - ).safe_then([=, &segment_manager](bufferptr bptr) mutable - -> read_validate_record_metadata_ret { - auto block_size = static_cast( - segment_manager.get_block_size()); - bufferlist bl; - bl.append(bptr); - auto maybe_header = try_decode_records_header(bl, nonce); - if (!maybe_header.has_value()) { - return read_validate_record_metadata_ret( - read_validate_record_metadata_ertr::ready_future_marker{}, - std::nullopt); - } - auto& seg_addr = start.as_seg_paddr(); - auto& header = *maybe_header; - if (header.mdlength < block_size || - header.mdlength % block_size != 0 || - header.dlength % block_size != 0 || - (header.committed_to != JOURNAL_SEQ_NULL && - header.committed_to.offset.as_seg_paddr().get_segment_off() % block_size != 0) || - (seg_addr.get_segment_off() + header.mdlength + header.dlength > segment_size)) { - ERROR("failed, invalid record group header {}", start); - return crimson::ct_error::input_output_error::make(); - } - if (header.mdlength == block_size) { - return read_validate_record_metadata_ret( - read_validate_record_metadata_ertr::ready_future_marker{}, - std::make_pair(std::move(header), std::move(bl)) - ); - } - - auto rest_start = paddr_t::make_seg_paddr( - seg_addr.get_segment_id(), - seg_addr.get_segment_off() + (seastore_off_t)block_size - ); - auto rest_len = header.mdlength - block_size; - TRACE("reading record group header rest {}~{}", rest_start, rest_len); - return segment_manager.read(rest_start, rest_len - ).safe_then([header=std::move(header), bl=std::move(bl) - ](auto&& bptail) mutable { - bl.push_back(bptail); - return read_validate_record_metadata_ret( - read_validate_record_metadata_ertr::ready_future_marker{}, - std::make_pair(std::move(header), std::move(bl))); - }); - }).safe_then([](auto p) { - if (p && validate_records_metadata(p->second)) { - return read_validate_record_metadata_ret( - read_validate_record_metadata_ertr::ready_future_marker{}, - std::move(*p) - ); - } else { - return read_validate_record_metadata_ret( - read_validate_record_metadata_ertr::ready_future_marker{}, - std::nullopt); - } - }); -} - -ExtentReader::read_validate_data_ret -ExtentReader::read_validate_data( - paddr_t record_base, - const record_group_header_t &header) -{ - LOG_PREFIX(ExtentReader::read_validate_data); - auto& segment_manager = *segment_managers[record_base.get_device_id()]; - auto data_addr = record_base.add_offset(header.mdlength); - TRACE("reading record group data blocks {}~{}", data_addr, header.dlength); - return segment_manager.read( - data_addr, - header.dlength - ).safe_then([=, &header](auto bptr) { - bufferlist bl; - bl.append(bptr); - return validate_records_data(header, bl); - }); -} - -ExtentReader::consume_record_group_ertr::future<> -ExtentReader::consume_next_records( - scan_valid_records_cursor& cursor, - found_record_handler_t& handler, - std::size_t& budget_used) -{ - LOG_PREFIX(ExtentReader::consume_next_records); - auto& next = cursor.pending_record_groups.front(); - auto total_length = next.header.dlength + next.header.mdlength; - budget_used += total_length; - auto locator = record_locator_t{ - next.offset.add_offset(next.header.mdlength), - write_result_t{ - journal_seq_t{ - cursor.seq.segment_seq, - next.offset - }, - static_cast(total_length) - } - }; - DEBUG("processing {} at {}, budget_used={}", - next.header, locator, budget_used); - return handler( - locator, - next.header, - next.mdbuffer - ).safe_then([FNAME, &cursor] { - cursor.pop_record_group(); - if (cursor.is_complete()) { - INFO("complete at {}, no more record group", cursor); - } - }); -} - -} // namespace crimson::os::seastore diff --git a/src/crimson/os/seastore/extent_reader.h b/src/crimson/os/seastore/extent_reader.h deleted file mode 100644 index b32ae81e64b..00000000000 --- a/src/crimson/os/seastore/extent_reader.h +++ /dev/null @@ -1,127 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- -// vim: ts=8 sw=2 smarttab expandtab - -#pragma once - -#include "crimson/common/errorator.h" -#include "crimson/os/seastore/seastore_types.h" -#include "crimson/os/seastore/segment_manager.h" -#include "crimson/os/seastore/logging.h" - -namespace crimson::os::seastore { - -class SegmentCleaner; -class TransactionManager; - -class ExtentReader { -public: - std::vector& get_segment_managers() { - return segment_managers; - } - - using read_ertr = SegmentManager::read_ertr; - ExtentReader() { - segment_managers.resize(DEVICE_ID_MAX, nullptr); - } - using read_segment_header_ertr = crimson::errorator< - crimson::ct_error::enoent, - crimson::ct_error::enodata, - crimson::ct_error::input_output_error - >; - using read_segment_header_ret = read_segment_header_ertr::future< - segment_header_t>; - read_segment_header_ret read_segment_header(segment_id_t segment); - - using read_segment_tail_ertr = read_segment_header_ertr; - using read_segment_tail_ret = read_segment_tail_ertr::future< - segment_tail_t>; - read_segment_tail_ret read_segment_tail(segment_id_t segment); - - struct commit_info_t { - mod_time_point_t commit_time; - record_commit_type_t commit_type; - }; - - /** - * scan_extents - * - * Scans records beginning at addr until the first record boundary after - * addr + bytes_to_read. - * - * Returns list - * cursor.is_complete() will be true when no further extents exist in segment. - */ - using scan_extents_cursor = scan_valid_records_cursor; - using scan_extents_ertr = read_ertr::extend; - using scan_extents_ret_bare = - std::list>>; - using scan_extents_ret = scan_extents_ertr::future; - scan_extents_ret scan_extents( - scan_extents_cursor &cursor, - extent_len_t bytes_to_read - ); - - using scan_valid_records_ertr = read_ertr::extend; - using scan_valid_records_ret = scan_valid_records_ertr::future< - size_t>; - using found_record_handler_t = std::function< - scan_valid_records_ertr::future<>( - record_locator_t record_locator, - // callee may assume header and bl will remain valid until - // returned future resolves - const record_group_header_t &header, - const bufferlist &mdbuf)>; - scan_valid_records_ret scan_valid_records( - scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call - segment_nonce_t nonce, ///< [in] nonce for segment - size_t budget, ///< [in] max budget to use - found_record_handler_t &handler ///< [in] handler for records - ); ///< @return used budget - - using release_ertr = SegmentManager::release_ertr; - release_ertr::future<> release_segment(segment_id_t id) { - assert(segment_managers[id.device_id()] != nullptr); - return segment_managers[id.device_id()]->release(id); - } - - void add_segment_manager(SegmentManager* segment_manager) { - ceph_assert(!segment_managers[segment_manager->get_device_id()]); - segment_managers[segment_manager->get_device_id()] = segment_manager; - } - - void reset() { - segment_managers.clear(); - segment_managers.resize(DEVICE_ID_MAX, nullptr); - } - -private: - std::vector segment_managers; - /// read record metadata for record starting at start - using read_validate_record_metadata_ertr = read_ertr; - using read_validate_record_metadata_ret = - read_validate_record_metadata_ertr::future< - std::optional> - >; - read_validate_record_metadata_ret read_validate_record_metadata( - paddr_t start, - segment_nonce_t nonce); - - /// read and validate data - using read_validate_data_ertr = read_ertr; - using read_validate_data_ret = read_validate_data_ertr::future; - read_validate_data_ret read_validate_data( - paddr_t record_base, - const record_group_header_t &header ///< caller must ensure lifetime through - /// future resolution - ); - - using consume_record_group_ertr = scan_valid_records_ertr; - consume_record_group_ertr::future<> consume_next_records( - scan_valid_records_cursor& cursor, - found_record_handler_t& handler, - std::size_t& budget_used); -}; - -using ExtentReaderRef = std::unique_ptr; - -} // namespace crimson::os::seastore diff --git a/src/crimson/os/seastore/journal.cc b/src/crimson/os/seastore/journal.cc index 56e582390dc..2ed4d49739a 100644 --- a/src/crimson/os/seastore/journal.cc +++ b/src/crimson/os/seastore/journal.cc @@ -8,10 +8,10 @@ namespace crimson::os::seastore::journal { JournalRef make_segmented( SegmentManager &sm, - ExtentReader &reader, + SegmentManagerGroup &sms, SegmentProvider &provider) { - return std::make_unique(sm, reader, provider); + return std::make_unique(sm, sms, provider); } } diff --git a/src/crimson/os/seastore/journal.h b/src/crimson/os/seastore/journal.h index 77ecc34df93..b0448d7e950 100644 --- a/src/crimson/os/seastore/journal.h +++ b/src/crimson/os/seastore/journal.h @@ -16,7 +16,7 @@ class NVMeBlockDevice; } class SegmentManager; -class ExtentReader; +class SegmentManagerGroup; class SegmentProvider; class Journal { @@ -93,7 +93,7 @@ namespace journal { JournalRef make_segmented( SegmentManager &sm, - ExtentReader &reader, + SegmentManagerGroup &sms, SegmentProvider &provider); } diff --git a/src/crimson/os/seastore/journal/segmented_journal.cc b/src/crimson/os/seastore/journal/segmented_journal.cc index e4a29d795ef..18b29e50fab 100644 --- a/src/crimson/os/seastore/journal/segmented_journal.cc +++ b/src/crimson/os/seastore/journal/segmented_journal.cc @@ -28,7 +28,7 @@ namespace crimson::os::seastore::journal { SegmentedJournal::SegmentedJournal( SegmentManager &segment_manager, - ExtentReader &scanner, + SegmentManagerGroup &sms, SegmentProvider &segment_provider) : segment_provider(segment_provider), segment_seq_allocator( @@ -47,7 +47,7 @@ SegmentedJournal::SegmentedJournal( crimson::common::get_conf( "seastore_journal_batch_preferred_fullness"), journal_segment_allocator), - scanner(scanner) + sms(sms) { } @@ -150,12 +150,12 @@ SegmentedJournal::replay_segment( INFO("starting at {} -- {}", seq, header); return seastar::do_with( scan_valid_records_cursor(seq), - ExtentReader::found_record_handler_t( + SegmentManagerGroup::found_record_handler_t( [s_type=header.type, &handler, this]( record_locator_t locator, const record_group_header_t& header, const bufferlist& mdbuf) - -> ExtentReader::scan_valid_records_ertr::future<> + -> SegmentManagerGroup::scan_valid_records_ertr::future<> { LOG_PREFIX(Journal::replay_segment); auto maybe_record_deltas_list = try_decode_deltas( @@ -233,7 +233,7 @@ SegmentedJournal::replay_segment( }); }), [=](auto &cursor, auto &dhandler) { - return scanner.scan_valid_records( + return sms.scan_valid_records( cursor, header.segment_nonce, std::numeric_limits::max(), @@ -262,7 +262,7 @@ SegmentedJournal::find_journal_segments() segment_id_t segment_id{ journal_segment_allocator.get_device_id(), d_segment_id}; - return scanner.read_segment_header( + return sms.read_segment_header( segment_id ).safe_then([segment_id, &ret](auto &&header) { if (header.get_type() == segment_type_t::JOURNAL) { diff --git a/src/crimson/os/seastore/journal/segmented_journal.h b/src/crimson/os/seastore/journal/segmented_journal.h index e571583bcb9..973d04fd6f1 100644 --- a/src/crimson/os/seastore/journal/segmented_journal.h +++ b/src/crimson/os/seastore/journal/segmented_journal.h @@ -11,7 +11,7 @@ #include "crimson/os/seastore/segment_cleaner.h" #include "crimson/os/seastore/journal.h" -#include "crimson/os/seastore/extent_reader.h" +#include "crimson/os/seastore/segment_manager_group.h" #include "crimson/os/seastore/ordering_handle.h" #include "crimson/os/seastore/seastore_types.h" #include "crimson/osd/exceptions.h" @@ -26,7 +26,7 @@ class SegmentedJournal : public Journal { public: SegmentedJournal( SegmentManager &segment_manager, - ExtentReader& scanner, + SegmentManagerGroup& sms, SegmentProvider& cleaner); ~SegmentedJournal() {} @@ -56,10 +56,10 @@ private: SegmentSeqAllocatorRef segment_seq_allocator; SegmentAllocator journal_segment_allocator; RecordSubmitter record_submitter; - ExtentReader& scanner; + SegmentManagerGroup& sms; WritePipeline* write_pipeline = nullptr; - /// read journal segment headers from scanner + /// read journal segment headers from sms using find_journal_segments_ertr = crimson::errorator< crimson::ct_error::input_output_error>; using find_journal_segments_ret_bare = std::vector< diff --git a/src/crimson/os/seastore/segment_cleaner.cc b/src/crimson/os/seastore/segment_cleaner.cc index f629f7e8181..085fe74994f 100644 --- a/src/crimson/os/seastore/segment_cleaner.cc +++ b/src/crimson/os/seastore/segment_cleaner.cc @@ -178,11 +178,11 @@ void SpaceTrackerSimple::dump_usage(segment_id_t id) const SegmentCleaner::SegmentCleaner( config_t config, - ExtentReaderRef&& scr, + SegmentManagerGroupRef&& sm_group, bool detailed) : detailed(detailed), config(config), - scanner(std::move(scr)), + sm_group(std::move(sm_group)), ool_segment_seq_allocator( new SegmentSeqAllocator(segment_type_t::OOL)), gc_process(*this) @@ -391,7 +391,7 @@ SegmentCleaner::gc_reclaim_space_ret SegmentCleaner::gc_reclaim_space() return seastar::now(); } scan_cursor = - std::make_unique( + std::make_unique( next); logger().debug( "SegmentCleaner::do_gc: starting gc on segment {}", @@ -400,7 +400,7 @@ SegmentCleaner::gc_reclaim_space_ret SegmentCleaner::gc_reclaim_space() ceph_assert(!scan_cursor->is_complete()); } - return scanner->scan_extents( + return sm_group->scan_extents( *scan_cursor, config.reclaim_bytes_stride ).safe_then([this](auto &&_extents) { @@ -505,7 +505,7 @@ SegmentCleaner::gc_reclaim_space_ret SegmentCleaner::gc_reclaim_space() SegmentCleaner::mount_ret SegmentCleaner::mount( device_id_t pdevice_id) { - auto& sms = scanner->get_segment_managers(); + auto& sms = sm_group->get_segment_managers(); logger().debug( "SegmentCleaner::mount: {} segment managers", sms.size()); init_complete = false; @@ -545,20 +545,20 @@ SegmentCleaner::mount_ret SegmentCleaner::mount( segments.end(), [this, &segment_set](auto& it) { auto segment_id = it.first; - return scanner->read_segment_header( + return sm_group->read_segment_header( segment_id ).safe_then([segment_id, this, &segment_set](auto header) { logger().debug( - "ExtentReader::mount: segment_id={} -- {}", + "SegmentCleaner::mount: segment_id={} -- {}", segment_id, header); auto s_type = header.get_type(); if (s_type == segment_type_t::NULL_SEG) { logger().error( - "ExtentReader::mount: got null segment, segment_id={} -- {}", + "SegmentCleaner::mount: got null segment, segment_id={} -- {}", segment_id, header); ceph_abort(); } - return scanner->read_segment_tail( + return sm_group->read_segment_tail( segment_id ).safe_then([this, segment_id, &segment_set, header](auto tail) -> scan_extents_ertr::future<> { @@ -608,7 +608,7 @@ SegmentCleaner::scan_extents_ret SegmentCleaner::scan_nonfull_segment( { if (header.get_type() == segment_type_t::OOL) { logger().info( - "ExtentReader::init_segments: out-of-line segment {}", + "SegmentCleaner::scan_nonfull_segment: out-of-line segment {}", segment_id); return seastar::do_with( scan_valid_records_cursor({ @@ -616,11 +616,11 @@ SegmentCleaner::scan_extents_ret SegmentCleaner::scan_nonfull_segment( paddr_t::make_seg_paddr(segment_id, 0)}), [this, segment_id, header](auto& cursor) { return seastar::do_with( - ExtentReader::found_record_handler_t([this, segment_id]( + SegmentManagerGroup::found_record_handler_t([this, segment_id]( record_locator_t locator, const record_group_header_t& header, const bufferlist& mdbuf - ) mutable -> ExtentReader::scan_valid_records_ertr::future<> { + ) mutable -> SegmentManagerGroup::scan_valid_records_ertr::future<> { LOG_PREFIX(SegmentCleaner::scan_nonfull_segment); DEBUG("decodeing {} records", header.records); auto maybe_headers = try_decode_record_headers(header, mdbuf); @@ -634,7 +634,7 @@ SegmentCleaner::scan_extents_ret SegmentCleaner::scan_nonfull_segment( mod_time_point_t ctime = header.commit_time; auto commit_type = header.commit_type; if (!ctime) { - ERROR("Scanner::init_segments: extent {} 0 commit_time", + ERROR("SegmentCleaner::scan_nonfull_segment: extent {} 0 commit_time", ctime); ceph_abort("0 commit_time"); } @@ -654,7 +654,7 @@ SegmentCleaner::scan_extents_ret SegmentCleaner::scan_nonfull_segment( return seastar::now(); }), [&cursor, header, segment_id, this](auto& handler) { - return scanner->scan_valid_records( + return sm_group->scan_valid_records( cursor, header.segment_nonce, segments[segment_id.device_id()]->segment_size, @@ -670,7 +670,7 @@ SegmentCleaner::scan_extents_ret SegmentCleaner::scan_nonfull_segment( }); } else if (header.get_type() == segment_type_t::JOURNAL) { logger().info( - "ExtentReader::init_segments: journal segment {}", + "SEgmentCleaner::scan_nonfull_segment: journal segment {}", segment_id); segment_set.emplace_back(std::make_pair(segment_id, std::move(header))); } else { @@ -690,7 +690,7 @@ SegmentCleaner::maybe_release_segment(Transaction &t) if (to_release != NULL_SEG_ID) { LOG_PREFIX(SegmentCleaner::maybe_release_segment); INFOT("releasing segment {}", t, to_release); - return scanner->release_segment(to_release + return sm_group->release_segment(to_release ).safe_then([this, to_release] { stats.segments_released++; mark_empty(to_release); diff --git a/src/crimson/os/seastore/segment_cleaner.h b/src/crimson/os/seastore/segment_cleaner.h index e703b19793d..df7c225ef8a 100644 --- a/src/crimson/os/seastore/segment_cleaner.h +++ b/src/crimson/os/seastore/segment_cleaner.h @@ -12,9 +12,9 @@ #include "crimson/common/log.h" #include "crimson/os/seastore/cached_extent.h" -#include "crimson/os/seastore/extent_reader.h" #include "crimson/os/seastore/seastore_types.h" #include "crimson/os/seastore/segment_manager.h" +#include "crimson/os/seastore/segment_manager_group.h" #include "crimson/os/seastore/transaction.h" #include "crimson/os/seastore/segment_seq_allocator.h" @@ -667,7 +667,7 @@ private: const bool detailed; const config_t config; - ExtentReaderRef scanner; + SegmentManagerGroupRef sm_group; SpaceTrackerIRef space_tracker; segment_info_set_t segments; @@ -716,7 +716,7 @@ private: public: SegmentCleaner( config_t config, - ExtentReaderRef&& scanner, + SegmentManagerGroupRef&& sm_group, bool detailed = false); SegmentSeqAllocator& get_ool_segment_seq_allocator() { @@ -766,7 +766,7 @@ public: return segments[id].get_type(); } - using release_ertr = ExtentReader::release_ertr; + using release_ertr = SegmentManagerGroup::release_ertr; release_ertr::future<> maybe_release_segment(Transaction &t); void adjust_segment_util(double old_usage, double new_usage) { @@ -968,7 +968,7 @@ private: // GC status helpers std::unique_ptr< - ExtentReader::scan_extents_cursor + SegmentManagerGroup::scan_extents_cursor > scan_cursor; /** @@ -1046,7 +1046,7 @@ private: } gc_process; using gc_ertr = work_ertr::extend_ertr< - ExtentReader::scan_extents_ertr + SegmentManagerGroup::scan_extents_ertr >; gc_cycle_ret do_gc_cycle(); @@ -1278,7 +1278,7 @@ private: using scan_extents_ret_bare = std::vector>; - using scan_extents_ertr = ExtentReader::scan_extents_ertr; + using scan_extents_ertr = SegmentManagerGroup::scan_extents_ertr; using scan_extents_ret = scan_extents_ertr::future<>; scan_extents_ret scan_nonfull_segment( const segment_header_t& header, diff --git a/src/crimson/os/seastore/segment_manager_group.cc b/src/crimson/os/seastore/segment_manager_group.cc new file mode 100644 index 00000000000..3e53021d0e9 --- /dev/null +++ b/src/crimson/os/seastore/segment_manager_group.cc @@ -0,0 +1,385 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab expandtab + +#include "crimson/os/seastore/segment_manager_group.h" + +#include "crimson/os/seastore/logging.h" + +SET_SUBSYS(seastore_journal); + +namespace crimson::os::seastore { + +SegmentManagerGroup::read_segment_tail_ret +SegmentManagerGroup::read_segment_tail(segment_id_t segment) +{ + auto& segment_manager = *segment_managers[segment.device_id()]; + return segment_manager.read( + paddr_t::make_seg_paddr( + segment, + segment_manager.get_segment_size() - + segment_manager.get_rounded_tail_length()), + segment_manager.get_rounded_tail_length() + ).handle_error( + read_segment_header_ertr::pass_further{}, + crimson::ct_error::assert_all{ + "Invalid error in SegmentManagerGroup::read_segment_tail" + } + ).safe_then([=, &segment_manager](bufferptr bptr) -> read_segment_tail_ret { + LOG_PREFIX(SegmentManagerGroup::read_segment_tail); + DEBUG("segment {} bptr size {}", segment, bptr.length()); + + segment_tail_t tail; + bufferlist bl; + bl.push_back(bptr); + + DEBUG("segment {} block crc {}", + segment, + bl.begin().crc32c(segment_manager.get_block_size(), 0)); + + auto bp = bl.cbegin(); + try { + decode(tail, bp); + } catch (ceph::buffer::error &e) { + DEBUG("segment {} unable to decode tail, skipping -- {}", + segment, e); + return crimson::ct_error::enodata::make(); + } + DEBUG("segment {} tail {}", segment, tail); + return read_segment_tail_ret( + read_segment_tail_ertr::ready_future_marker{}, + tail); + }); +} + +SegmentManagerGroup::read_segment_header_ret +SegmentManagerGroup::read_segment_header(segment_id_t segment) +{ + auto& segment_manager = *segment_managers[segment.device_id()]; + return segment_manager.read( + paddr_t::make_seg_paddr(segment, 0), + segment_manager.get_block_size() + ).handle_error( + read_segment_header_ertr::pass_further{}, + crimson::ct_error::assert_all{ + "Invalid error in SegmentManagerGroup::read_segment_header" + } + ).safe_then([=, &segment_manager](bufferptr bptr) -> read_segment_header_ret { + LOG_PREFIX(SegmentManagerGroup::read_segment_header); + DEBUG("segment {} bptr size {}", segment, bptr.length()); + + segment_header_t header; + bufferlist bl; + bl.push_back(bptr); + + DEBUG("segment {} block crc {}", + segment, + bl.begin().crc32c(segment_manager.get_block_size(), 0)); + + auto bp = bl.cbegin(); + try { + decode(header, bp); + } catch (ceph::buffer::error &e) { + DEBUG("segment {} unable to decode header, skipping -- {}", + segment, e); + return crimson::ct_error::enodata::make(); + } + DEBUG("segment {} header {}", segment, header); + return read_segment_header_ret( + read_segment_header_ertr::ready_future_marker{}, + header); + }); +} + +SegmentManagerGroup::scan_extents_ret +SegmentManagerGroup::scan_extents( + scan_extents_cursor &cursor, + extent_len_t bytes_to_read) +{ + auto ret = std::make_unique(); + auto* extents = ret.get(); + return read_segment_header(cursor.get_segment_id() + ).handle_error( + scan_extents_ertr::pass_further{}, + crimson::ct_error::assert_all{ + "Invalid error in SegmentManagerGroup::scan_extents" + } + ).safe_then([bytes_to_read, extents, &cursor, this](auto segment_header) { + auto segment_nonce = segment_header.segment_nonce; + return seastar::do_with( + found_record_handler_t([extents]( + record_locator_t locator, + const record_group_header_t& header, + const bufferlist& mdbuf) mutable -> scan_valid_records_ertr::future<> + { + LOG_PREFIX(SegmentManagerGroup::scan_extents); + DEBUG("decoding {} records", header.records); + auto maybe_record_extent_infos = try_decode_extent_infos(header, mdbuf); + if (!maybe_record_extent_infos) { + // This should be impossible, we did check the crc on the mdbuf + ERROR("unable to decode extents for record {}", + locator.record_block_base); + return crimson::ct_error::input_output_error::make(); + } + + paddr_t extent_offset = locator.record_block_base; + for (auto& r: *maybe_record_extent_infos) { + DEBUG("decoded {} extents", r.extent_infos.size()); + for (const auto &i : r.extent_infos) { + extents->emplace_back( + extent_offset, + std::pair( + {r.header.commit_time, + r.header.commit_type}, + i)); + auto& seg_addr = extent_offset.as_seg_paddr(); + seg_addr.set_segment_off( + seg_addr.get_segment_off() + i.len); + } + } + return scan_extents_ertr::now(); + }), + [bytes_to_read, segment_nonce, &cursor, this](auto &dhandler) { + return scan_valid_records( + cursor, + segment_nonce, + bytes_to_read, + dhandler + ).discard_result(); + } + ); + }).safe_then([ret=std::move(ret)] { + return std::move(*ret); + }); +} + +SegmentManagerGroup::scan_valid_records_ret +SegmentManagerGroup::scan_valid_records( + scan_valid_records_cursor &cursor, + segment_nonce_t nonce, + size_t budget, + found_record_handler_t &handler) +{ + LOG_PREFIX(SegmentManagerGroup::scan_valid_records); + auto& segment_manager = + *segment_managers[cursor.get_segment_id().device_id()]; + if (cursor.get_segment_offset() == 0) { + INFO("start to scan segment {}", cursor.get_segment_id()); + cursor.increment_seq(segment_manager.get_block_size()); + } + DEBUG("starting at {}, budget={}", cursor, budget); + auto retref = std::make_unique(0); + auto &budget_used = *retref; + return crimson::repeat( + [=, &cursor, &budget_used, &handler]() mutable + -> scan_valid_records_ertr::future { + return [=, &handler, &cursor, &budget_used] { + if (!cursor.last_valid_header_found) { + return read_validate_record_metadata(cursor.seq.offset, nonce + ).safe_then([=, &cursor](auto md) { + if (!md) { + cursor.last_valid_header_found = true; + if (cursor.is_complete()) { + INFO("complete at {}, invalid record group metadata", + cursor); + } else { + DEBUG("found invalid record group metadata at {}, " + "processing {} pending record groups", + cursor.seq, + cursor.pending_record_groups.size()); + } + return scan_valid_records_ertr::now(); + } else { + auto& [header, md_bl] = *md; + DEBUG("found valid {} at {}", header, cursor.seq); + cursor.emplace_record_group(header, std::move(md_bl)); + return scan_valid_records_ertr::now(); + } + }).safe_then([=, &cursor, &budget_used, &handler] { + DEBUG("processing committed record groups until {}, {} pending", + cursor.last_committed, + cursor.pending_record_groups.size()); + return crimson::repeat( + [=, &budget_used, &cursor, &handler] { + if (cursor.pending_record_groups.empty()) { + /* This is only possible if the segment is empty. + * A record's last_commited must be prior to its own + * location since it itself cannot yet have been committed + * at its own time of submission. Thus, the most recently + * read record must always fall after cursor.last_committed */ + return scan_valid_records_ertr::make_ready_future< + seastar::stop_iteration>(seastar::stop_iteration::yes); + } + auto &next = cursor.pending_record_groups.front(); + journal_seq_t next_seq = {cursor.seq.segment_seq, next.offset}; + if (cursor.last_committed == JOURNAL_SEQ_NULL || + next_seq > cursor.last_committed) { + return scan_valid_records_ertr::make_ready_future< + seastar::stop_iteration>(seastar::stop_iteration::yes); + } + return consume_next_records(cursor, handler, budget_used + ).safe_then([] { + return scan_valid_records_ertr::make_ready_future< + seastar::stop_iteration>(seastar::stop_iteration::no); + }); + }); + }); + } else { + assert(!cursor.pending_record_groups.empty()); + auto &next = cursor.pending_record_groups.front(); + return read_validate_data(next.offset, next.header + ).safe_then([this, FNAME, &budget_used, &cursor, &handler, &next](auto valid) { + if (!valid) { + INFO("complete at {}, invalid record group data at {}, {}", + cursor, next.offset, next.header); + cursor.pending_record_groups.clear(); + return scan_valid_records_ertr::now(); + } + return consume_next_records(cursor, handler, budget_used); + }); + } + }().safe_then([=, &budget_used, &cursor] { + if (cursor.is_complete() || budget_used >= budget) { + DEBUG("finish at {}, budget_used={}, budget={}", + cursor, budget_used, budget); + return seastar::stop_iteration::yes; + } else { + return seastar::stop_iteration::no; + } + }); + }).safe_then([retref=std::move(retref)]() mutable -> scan_valid_records_ret { + return scan_valid_records_ret( + scan_valid_records_ertr::ready_future_marker{}, + std::move(*retref)); + }); +} + +SegmentManagerGroup::read_validate_record_metadata_ret +SegmentManagerGroup::read_validate_record_metadata( + paddr_t start, + segment_nonce_t nonce) +{ + LOG_PREFIX(SegmentManagerGroup::read_validate_record_metadata); + auto& seg_addr = start.as_seg_paddr(); + auto& segment_manager = *segment_managers[seg_addr.get_segment_id().device_id()]; + auto block_size = segment_manager.get_block_size(); + auto segment_size = static_cast(segment_manager.get_segment_size()); + if (seg_addr.get_segment_off() + block_size > segment_size) { + DEBUG("failed -- record group header block {}~4096 > segment_size {}", start, segment_size); + return read_validate_record_metadata_ret( + read_validate_record_metadata_ertr::ready_future_marker{}, + std::nullopt); + } + TRACE("reading record group header block {}~4096", start); + return segment_manager.read(start, block_size + ).safe_then([=, &segment_manager](bufferptr bptr) mutable + -> read_validate_record_metadata_ret { + auto block_size = static_cast( + segment_manager.get_block_size()); + bufferlist bl; + bl.append(bptr); + auto maybe_header = try_decode_records_header(bl, nonce); + if (!maybe_header.has_value()) { + return read_validate_record_metadata_ret( + read_validate_record_metadata_ertr::ready_future_marker{}, + std::nullopt); + } + auto& seg_addr = start.as_seg_paddr(); + auto& header = *maybe_header; + if (header.mdlength < block_size || + header.mdlength % block_size != 0 || + header.dlength % block_size != 0 || + (header.committed_to != JOURNAL_SEQ_NULL && + header.committed_to.offset.as_seg_paddr().get_segment_off() % block_size != 0) || + (seg_addr.get_segment_off() + header.mdlength + header.dlength > segment_size)) { + ERROR("failed, invalid record group header {}", start); + return crimson::ct_error::input_output_error::make(); + } + if (header.mdlength == block_size) { + return read_validate_record_metadata_ret( + read_validate_record_metadata_ertr::ready_future_marker{}, + std::make_pair(std::move(header), std::move(bl)) + ); + } + + auto rest_start = paddr_t::make_seg_paddr( + seg_addr.get_segment_id(), + seg_addr.get_segment_off() + (seastore_off_t)block_size + ); + auto rest_len = header.mdlength - block_size; + TRACE("reading record group header rest {}~{}", rest_start, rest_len); + return segment_manager.read(rest_start, rest_len + ).safe_then([header=std::move(header), bl=std::move(bl) + ](auto&& bptail) mutable { + bl.push_back(bptail); + return read_validate_record_metadata_ret( + read_validate_record_metadata_ertr::ready_future_marker{}, + std::make_pair(std::move(header), std::move(bl))); + }); + }).safe_then([](auto p) { + if (p && validate_records_metadata(p->second)) { + return read_validate_record_metadata_ret( + read_validate_record_metadata_ertr::ready_future_marker{}, + std::move(*p) + ); + } else { + return read_validate_record_metadata_ret( + read_validate_record_metadata_ertr::ready_future_marker{}, + std::nullopt); + } + }); +} + +SegmentManagerGroup::read_validate_data_ret +SegmentManagerGroup::read_validate_data( + paddr_t record_base, + const record_group_header_t &header) +{ + LOG_PREFIX(SegmentManagerGroup::read_validate_data); + auto& segment_manager = *segment_managers[record_base.get_device_id()]; + auto data_addr = record_base.add_offset(header.mdlength); + TRACE("reading record group data blocks {}~{}", data_addr, header.dlength); + return segment_manager.read( + data_addr, + header.dlength + ).safe_then([=, &header](auto bptr) { + bufferlist bl; + bl.append(bptr); + return validate_records_data(header, bl); + }); +} + +SegmentManagerGroup::consume_record_group_ertr::future<> +SegmentManagerGroup::consume_next_records( + scan_valid_records_cursor& cursor, + found_record_handler_t& handler, + std::size_t& budget_used) +{ + LOG_PREFIX(SegmentManagerGroup::consume_next_records); + auto& next = cursor.pending_record_groups.front(); + auto total_length = next.header.dlength + next.header.mdlength; + budget_used += total_length; + auto locator = record_locator_t{ + next.offset.add_offset(next.header.mdlength), + write_result_t{ + journal_seq_t{ + cursor.seq.segment_seq, + next.offset + }, + static_cast(total_length) + } + }; + DEBUG("processing {} at {}, budget_used={}", + next.header, locator, budget_used); + return handler( + locator, + next.header, + next.mdbuffer + ).safe_then([FNAME, &cursor] { + cursor.pop_record_group(); + if (cursor.is_complete()) { + INFO("complete at {}, no more record group", cursor); + } + }); +} + +} // namespace crimson::os::seastore diff --git a/src/crimson/os/seastore/segment_manager_group.h b/src/crimson/os/seastore/segment_manager_group.h new file mode 100644 index 00000000000..522eed1557d --- /dev/null +++ b/src/crimson/os/seastore/segment_manager_group.h @@ -0,0 +1,123 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab expandtab + +#pragma once + +#include "crimson/common/errorator.h" +#include "crimson/os/seastore/seastore_types.h" +#include "crimson/os/seastore/segment_manager.h" + +namespace crimson::os::seastore { + +class SegmentManagerGroup { +public: + std::vector& get_segment_managers() { + return segment_managers; + } + + using read_ertr = SegmentManager::read_ertr; + SegmentManagerGroup() { + segment_managers.resize(DEVICE_ID_MAX, nullptr); + } + using read_segment_header_ertr = crimson::errorator< + crimson::ct_error::enoent, + crimson::ct_error::enodata, + crimson::ct_error::input_output_error + >; + using read_segment_header_ret = read_segment_header_ertr::future< + segment_header_t>; + read_segment_header_ret read_segment_header(segment_id_t segment); + + using read_segment_tail_ertr = read_segment_header_ertr; + using read_segment_tail_ret = read_segment_tail_ertr::future< + segment_tail_t>; + read_segment_tail_ret read_segment_tail(segment_id_t segment); + + struct commit_info_t { + mod_time_point_t commit_time; + record_commit_type_t commit_type; + }; + + /** + * scan_extents + * + * Scans records beginning at addr until the first record boundary after + * addr + bytes_to_read. + * + * Returns list + * cursor.is_complete() will be true when no further extents exist in segment. + */ + using scan_extents_cursor = scan_valid_records_cursor; + using scan_extents_ertr = read_ertr::extend; + using scan_extents_ret_bare = + std::list>>; + using scan_extents_ret = scan_extents_ertr::future; + scan_extents_ret scan_extents( + scan_extents_cursor &cursor, + extent_len_t bytes_to_read + ); + + using scan_valid_records_ertr = read_ertr::extend; + using scan_valid_records_ret = scan_valid_records_ertr::future< + size_t>; + using found_record_handler_t = std::function< + scan_valid_records_ertr::future<>( + record_locator_t record_locator, + // callee may assume header and bl will remain valid until + // returned future resolves + const record_group_header_t &header, + const bufferlist &mdbuf)>; + scan_valid_records_ret scan_valid_records( + scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call + segment_nonce_t nonce, ///< [in] nonce for segment + size_t budget, ///< [in] max budget to use + found_record_handler_t &handler ///< [in] handler for records + ); ///< @return used budget + + using release_ertr = SegmentManager::release_ertr; + release_ertr::future<> release_segment(segment_id_t id) { + assert(segment_managers[id.device_id()] != nullptr); + return segment_managers[id.device_id()]->release(id); + } + + void add_segment_manager(SegmentManager* segment_manager) { + ceph_assert(!segment_managers[segment_manager->get_device_id()]); + segment_managers[segment_manager->get_device_id()] = segment_manager; + } + + void reset() { + segment_managers.clear(); + segment_managers.resize(DEVICE_ID_MAX, nullptr); + } + +private: + std::vector segment_managers; + /// read record metadata for record starting at start + using read_validate_record_metadata_ertr = read_ertr; + using read_validate_record_metadata_ret = + read_validate_record_metadata_ertr::future< + std::optional> + >; + read_validate_record_metadata_ret read_validate_record_metadata( + paddr_t start, + segment_nonce_t nonce); + + /// read and validate data + using read_validate_data_ertr = read_ertr; + using read_validate_data_ret = read_validate_data_ertr::future; + read_validate_data_ret read_validate_data( + paddr_t record_base, + const record_group_header_t &header ///< caller must ensure lifetime through + /// future resolution + ); + + using consume_record_group_ertr = scan_valid_records_ertr; + consume_record_group_ertr::future<> consume_next_records( + scan_valid_records_cursor& cursor, + found_record_handler_t& handler, + std::size_t& budget_used); +}; + +using SegmentManagerGroupRef = std::unique_ptr; + +} // namespace crimson::os::seastore diff --git a/src/crimson/os/seastore/transaction_manager.cc b/src/crimson/os/seastore/transaction_manager.cc index b922aedb552..76800702723 100644 --- a/src/crimson/os/seastore/transaction_manager.cc +++ b/src/crimson/os/seastore/transaction_manager.cc @@ -27,13 +27,13 @@ TransactionManager::TransactionManager( CacheRef _cache, LBAManagerRef _lba_manager, ExtentPlacementManagerRef&& epm, - ExtentReader& scanner) + SegmentManagerGroup& sms) : segment_cleaner(std::move(_segment_cleaner)), cache(std::move(_cache)), lba_manager(std::move(_lba_manager)), journal(std::move(_journal)), epm(std::move(epm)), - scanner(scanner) + sms(sms) { segment_cleaner->set_extent_callback(this); journal->set_write_pipeline(&write_pipeline); @@ -156,7 +156,7 @@ TransactionManager::close_ertr::future<> TransactionManager::close() { cache->dump_contents(); return journal->close(); }).safe_then([this] { - scanner.reset(); + sms.reset(); return epm->close(); }).safe_then([FNAME] { INFO("completed"); @@ -545,16 +545,16 @@ TransactionManagerRef make_transaction_manager( Device &device, bool detailed) { - auto scanner = std::make_unique(); - auto& scanner_ref = *scanner.get(); + auto sms = std::make_unique(); + auto& sms_ref = *sms.get(); auto segment_cleaner = std::make_unique( SegmentCleaner::config_t::get_default(), - std::move(scanner), + std::move(sms), detailed); ceph_assert(device.get_device_type() == device_type_t::SEGMENTED); auto sm = dynamic_cast(&device); ceph_assert(sm != nullptr); - auto journal = journal::make_segmented(*sm, scanner_ref, *segment_cleaner); + auto journal = journal::make_segmented(*sm, sms_ref, *segment_cleaner); auto epm = std::make_unique(); auto cache = std::make_unique(*epm); auto lba_manager = lba_manager::create_lba_manager(*cache); @@ -565,7 +565,7 @@ TransactionManagerRef make_transaction_manager( std::move(cache), std::move(lba_manager), std::move(epm), - scanner_ref); + sms_ref); } } diff --git a/src/crimson/os/seastore/transaction_manager.h b/src/crimson/os/seastore/transaction_manager.h index cc54ba2fb15..2e5385025b2 100644 --- a/src/crimson/os/seastore/transaction_manager.h +++ b/src/crimson/os/seastore/transaction_manager.h @@ -28,6 +28,7 @@ #include "crimson/os/seastore/journal.h" #include "crimson/os/seastore/extent_placement_manager.h" #include "crimson/os/seastore/device.h" +#include "crimson/os/seastore/segment_manager_group.h" namespace crimson::os::seastore { class Journal; @@ -69,7 +70,7 @@ public: CacheRef cache, LBAManagerRef lba_manager, ExtentPlacementManagerRef&& epm, - ExtentReader& scanner); + SegmentManagerGroup& sms); /// Writes initial metadata to disk using mkfs_ertr = base_ertr; @@ -548,7 +549,7 @@ public: *segment_cleaner, *sm, segment_cleaner->get_ool_segment_seq_allocator())); - scanner.add_segment_manager(sm); + sms.add_segment_manager(sm); } ~TransactionManager(); @@ -561,7 +562,7 @@ private: LBAManagerRef lba_manager; JournalRef journal; ExtentPlacementManagerRef epm; - ExtentReader& scanner; + SegmentManagerGroup& sms; WritePipeline write_pipeline; diff --git a/src/test/crimson/seastore/test_btree_lba_manager.cc b/src/test/crimson/seastore/test_btree_lba_manager.cc index 63261111b60..66f405006e9 100644 --- a/src/test/crimson/seastore/test_btree_lba_manager.cc +++ b/src/test/crimson/seastore/test_btree_lba_manager.cc @@ -28,7 +28,7 @@ struct btree_test_base : public seastar_test_suite_t, SegmentProvider { segment_manager::EphemeralSegmentManagerRef segment_manager; - ExtentReaderRef scanner; + SegmentManagerGroupRef sms; JournalRef journal; ExtentPlacementManagerRef epm; CacheRef cache; @@ -98,16 +98,16 @@ struct btree_test_base : virtual LBAManager::mkfs_ret test_structure_setup(Transaction &t) = 0; seastar::future<> set_up_fut() final { segment_manager = segment_manager::create_test_ephemeral(); - scanner.reset(new ExtentReader()); - auto& scanner_ref = *scanner.get(); + sms.reset(new SegmentManagerGroup()); + auto& sms_ref = *sms.get(); journal = journal::make_segmented( - *segment_manager, scanner_ref, *this); + *segment_manager, sms_ref, *this); epm.reset(new ExtentPlacementManager()); cache.reset(new Cache(*epm)); block_size = segment_manager->get_block_size(); next = segment_id_t{segment_manager->get_device_id(), 0}; - scanner_ref.add_segment_manager(segment_manager.get()); + sms_ref.add_segment_manager(segment_manager.get()); epm->add_device(segment_manager.get(), true); journal->set_write_pipeline(&pipeline); @@ -148,7 +148,7 @@ struct btree_test_base : }).safe_then([this] { test_structure_reset(); segment_manager.reset(); - scanner.reset(); + sms.reset(); journal.reset(); epm.reset(); cache.reset(); diff --git a/src/test/crimson/seastore/test_seastore_journal.cc b/src/test/crimson/seastore/test_seastore_journal.cc index 3599dc16033..0daae06f734 100644 --- a/src/test/crimson/seastore/test_seastore_journal.cc +++ b/src/test/crimson/seastore/test_seastore_journal.cc @@ -76,7 +76,7 @@ struct journal_test_t : seastar_test_suite_t, SegmentProvider { seastore_off_t block_size; - ExtentReaderRef scanner; + SegmentManagerGroupRef sms; segment_id_t next; @@ -125,11 +125,11 @@ struct journal_test_t : seastar_test_suite_t, SegmentProvider { seastar::future<> set_up_fut() final { segment_manager = segment_manager::create_test_ephemeral(); block_size = segment_manager->get_block_size(); - scanner.reset(new ExtentReader()); + sms.reset(new SegmentManagerGroup()); next = segment_id_t(segment_manager->get_device_id(), 0); - journal = journal::make_segmented(*segment_manager, *scanner, *this); + journal = journal::make_segmented(*segment_manager, *sms, *this); journal->set_write_pipeline(&pipeline); - scanner->add_segment_manager(segment_manager.get()); + sms->add_segment_manager(segment_manager.get()); return segment_manager->init( ).safe_then([this] { return journal->open_for_write(); @@ -144,7 +144,7 @@ struct journal_test_t : seastar_test_suite_t, SegmentProvider { return journal->close( ).safe_then([this] { segment_manager.reset(); - scanner.reset(); + sms.reset(); journal.reset(); }).handle_error( crimson::ct_error::all_same_way([](auto e) { @@ -158,7 +158,7 @@ struct journal_test_t : seastar_test_suite_t, SegmentProvider { return journal->close( ).safe_then([this, f=std::move(f)]() mutable { journal = journal::make_segmented( - *segment_manager, *scanner, *this); + *segment_manager, *sms, *this); journal->set_write_pipeline(&pipeline); return journal->replay(std::forward(std::move(f))); }).safe_then([this] {