transaction_manager.cc
transaction.cc
cache.cc
- extent_reader.cc
lba_manager.cc
segment_cleaner.cc
lba_manager/btree/btree_lba_manager.cc
journal/segment_allocator.cc
journal.cc
device.cc
+ segment_manager_group.cc
../../../test/crimson/seastore/test_block.cc
${PROJECT_SOURCE_DIR}/src/os/Transaction.cc
)
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
-// vim: ts=8 sw=2 smarttab expandtab
-
-#include "crimson/os/seastore/extent_reader.h"
-
-#include "crimson/os/seastore/logging.h"
-
-SET_SUBSYS(seastore_journal);
-
-namespace crimson::os::seastore {
-
-ExtentReader::read_segment_tail_ret
-ExtentReader::read_segment_tail(segment_id_t segment)
-{
- auto& segment_manager = *segment_managers[segment.device_id()];
- return segment_manager.read(
- paddr_t::make_seg_paddr(
- segment,
- segment_manager.get_segment_size() -
- segment_manager.get_rounded_tail_length()),
- segment_manager.get_rounded_tail_length()
- ).handle_error(
- read_segment_header_ertr::pass_further{},
- crimson::ct_error::assert_all{
- "Invalid error in ExtentReader::read_segment_tail"
- }
- ).safe_then([=, &segment_manager](bufferptr bptr) -> read_segment_tail_ret {
- LOG_PREFIX(ExtentReader::read_segment_tail);
- DEBUG("segment {} bptr size {}", segment, bptr.length());
-
- segment_tail_t tail;
- bufferlist bl;
- bl.push_back(bptr);
-
- DEBUG("segment {} block crc {}",
- segment,
- bl.begin().crc32c(segment_manager.get_block_size(), 0));
-
- auto bp = bl.cbegin();
- try {
- decode(tail, bp);
- } catch (ceph::buffer::error &e) {
- DEBUG("segment {} unable to decode tail, skipping -- {}",
- segment, e);
- return crimson::ct_error::enodata::make();
- }
- DEBUG("segment {} tail {}", segment, tail);
- return read_segment_tail_ret(
- read_segment_tail_ertr::ready_future_marker{},
- tail);
- });
-}
-
-ExtentReader::read_segment_header_ret
-ExtentReader::read_segment_header(segment_id_t segment)
-{
- auto& segment_manager = *segment_managers[segment.device_id()];
- return segment_manager.read(
- paddr_t::make_seg_paddr(segment, 0),
- segment_manager.get_block_size()
- ).handle_error(
- read_segment_header_ertr::pass_further{},
- crimson::ct_error::assert_all{
- "Invalid error in ExtentReader::read_segment_header"
- }
- ).safe_then([=, &segment_manager](bufferptr bptr) -> read_segment_header_ret {
- LOG_PREFIX(ExtentReader::read_segment_header);
- DEBUG("segment {} bptr size {}", segment, bptr.length());
-
- segment_header_t header;
- bufferlist bl;
- bl.push_back(bptr);
-
- DEBUG("segment {} block crc {}",
- segment,
- bl.begin().crc32c(segment_manager.get_block_size(), 0));
-
- auto bp = bl.cbegin();
- try {
- decode(header, bp);
- } catch (ceph::buffer::error &e) {
- DEBUG("segment {} unable to decode header, skipping -- {}",
- segment, e);
- return crimson::ct_error::enodata::make();
- }
- DEBUG("segment {} header {}", segment, header);
- return read_segment_header_ret(
- read_segment_header_ertr::ready_future_marker{},
- header);
- });
-}
-
-ExtentReader::scan_extents_ret ExtentReader::scan_extents(
- scan_extents_cursor &cursor,
- extent_len_t bytes_to_read)
-{
- auto ret = std::make_unique<scan_extents_ret_bare>();
- auto* extents = ret.get();
- return read_segment_header(cursor.get_segment_id()
- ).handle_error(
- scan_extents_ertr::pass_further{},
- crimson::ct_error::assert_all{
- "Invalid error in ExtentReader::scan_extents"
- }
- ).safe_then([bytes_to_read, extents, &cursor, this](auto segment_header) {
- auto segment_nonce = segment_header.segment_nonce;
- return seastar::do_with(
- found_record_handler_t([extents](
- record_locator_t locator,
- const record_group_header_t& header,
- const bufferlist& mdbuf) mutable -> scan_valid_records_ertr::future<>
- {
- LOG_PREFIX(ExtentReader::scan_extents);
- DEBUG("decoding {} records", header.records);
- auto maybe_record_extent_infos = try_decode_extent_infos(header, mdbuf);
- if (!maybe_record_extent_infos) {
- // This should be impossible, we did check the crc on the mdbuf
- ERROR("unable to decode extents for record {}",
- locator.record_block_base);
- return crimson::ct_error::input_output_error::make();
- }
-
- paddr_t extent_offset = locator.record_block_base;
- for (auto& r: *maybe_record_extent_infos) {
- DEBUG("decoded {} extents", r.extent_infos.size());
- for (const auto &i : r.extent_infos) {
- extents->emplace_back(
- extent_offset,
- std::pair<commit_info_t, extent_info_t>(
- {r.header.commit_time,
- r.header.commit_type},
- i));
- auto& seg_addr = extent_offset.as_seg_paddr();
- seg_addr.set_segment_off(
- seg_addr.get_segment_off() + i.len);
- }
- }
- return scan_extents_ertr::now();
- }),
- [bytes_to_read, segment_nonce, &cursor, this](auto &dhandler) {
- return scan_valid_records(
- cursor,
- segment_nonce,
- bytes_to_read,
- dhandler
- ).discard_result();
- }
- );
- }).safe_then([ret=std::move(ret)] {
- return std::move(*ret);
- });
-}
-
-ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
- scan_valid_records_cursor &cursor,
- segment_nonce_t nonce,
- size_t budget,
- found_record_handler_t &handler)
-{
- LOG_PREFIX(ExtentReader::scan_valid_records);
- auto& segment_manager =
- *segment_managers[cursor.get_segment_id().device_id()];
- if (cursor.get_segment_offset() == 0) {
- INFO("start to scan segment {}", cursor.get_segment_id());
- cursor.increment_seq(segment_manager.get_block_size());
- }
- DEBUG("starting at {}, budget={}", cursor, budget);
- auto retref = std::make_unique<size_t>(0);
- auto &budget_used = *retref;
- return crimson::repeat(
- [=, &cursor, &budget_used, &handler]() mutable
- -> scan_valid_records_ertr::future<seastar::stop_iteration> {
- return [=, &handler, &cursor, &budget_used] {
- if (!cursor.last_valid_header_found) {
- return read_validate_record_metadata(cursor.seq.offset, nonce
- ).safe_then([=, &cursor](auto md) {
- if (!md) {
- cursor.last_valid_header_found = true;
- if (cursor.is_complete()) {
- INFO("complete at {}, invalid record group metadata",
- cursor);
- } else {
- DEBUG("found invalid record group metadata at {}, "
- "processing {} pending record groups",
- cursor.seq,
- cursor.pending_record_groups.size());
- }
- return scan_valid_records_ertr::now();
- } else {
- auto& [header, md_bl] = *md;
- DEBUG("found valid {} at {}", header, cursor.seq);
- cursor.emplace_record_group(header, std::move(md_bl));
- return scan_valid_records_ertr::now();
- }
- }).safe_then([=, &cursor, &budget_used, &handler] {
- DEBUG("processing committed record groups until {}, {} pending",
- cursor.last_committed,
- cursor.pending_record_groups.size());
- return crimson::repeat(
- [=, &budget_used, &cursor, &handler] {
- if (cursor.pending_record_groups.empty()) {
- /* This is only possible if the segment is empty.
- * A record's last_commited must be prior to its own
- * location since it itself cannot yet have been committed
- * at its own time of submission. Thus, the most recently
- * read record must always fall after cursor.last_committed */
- return scan_valid_records_ertr::make_ready_future<
- seastar::stop_iteration>(seastar::stop_iteration::yes);
- }
- auto &next = cursor.pending_record_groups.front();
- journal_seq_t next_seq = {cursor.seq.segment_seq, next.offset};
- if (cursor.last_committed == JOURNAL_SEQ_NULL ||
- next_seq > cursor.last_committed) {
- return scan_valid_records_ertr::make_ready_future<
- seastar::stop_iteration>(seastar::stop_iteration::yes);
- }
- return consume_next_records(cursor, handler, budget_used
- ).safe_then([] {
- return scan_valid_records_ertr::make_ready_future<
- seastar::stop_iteration>(seastar::stop_iteration::no);
- });
- });
- });
- } else {
- assert(!cursor.pending_record_groups.empty());
- auto &next = cursor.pending_record_groups.front();
- return read_validate_data(next.offset, next.header
- ).safe_then([this, FNAME, &budget_used, &cursor, &handler, &next](auto valid) {
- if (!valid) {
- INFO("complete at {}, invalid record group data at {}, {}",
- cursor, next.offset, next.header);
- cursor.pending_record_groups.clear();
- return scan_valid_records_ertr::now();
- }
- return consume_next_records(cursor, handler, budget_used);
- });
- }
- }().safe_then([=, &budget_used, &cursor] {
- if (cursor.is_complete() || budget_used >= budget) {
- DEBUG("finish at {}, budget_used={}, budget={}",
- cursor, budget_used, budget);
- return seastar::stop_iteration::yes;
- } else {
- return seastar::stop_iteration::no;
- }
- });
- }).safe_then([retref=std::move(retref)]() mutable -> scan_valid_records_ret {
- return scan_valid_records_ret(
- scan_valid_records_ertr::ready_future_marker{},
- std::move(*retref));
- });
-}
-
-ExtentReader::read_validate_record_metadata_ret
-ExtentReader::read_validate_record_metadata(
- paddr_t start,
- segment_nonce_t nonce)
-{
- LOG_PREFIX(ExtentReader::read_validate_record_metadata);
- auto& seg_addr = start.as_seg_paddr();
- auto& segment_manager = *segment_managers[seg_addr.get_segment_id().device_id()];
- auto block_size = segment_manager.get_block_size();
- auto segment_size = static_cast<int64_t>(segment_manager.get_segment_size());
- if (seg_addr.get_segment_off() + block_size > segment_size) {
- DEBUG("failed -- record group header block {}~4096 > segment_size {}", start, segment_size);
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::nullopt);
- }
- TRACE("reading record group header block {}~4096", start);
- return segment_manager.read(start, block_size
- ).safe_then([=, &segment_manager](bufferptr bptr) mutable
- -> read_validate_record_metadata_ret {
- auto block_size = static_cast<extent_len_t>(
- segment_manager.get_block_size());
- bufferlist bl;
- bl.append(bptr);
- auto maybe_header = try_decode_records_header(bl, nonce);
- if (!maybe_header.has_value()) {
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::nullopt);
- }
- auto& seg_addr = start.as_seg_paddr();
- auto& header = *maybe_header;
- if (header.mdlength < block_size ||
- header.mdlength % block_size != 0 ||
- header.dlength % block_size != 0 ||
- (header.committed_to != JOURNAL_SEQ_NULL &&
- header.committed_to.offset.as_seg_paddr().get_segment_off() % block_size != 0) ||
- (seg_addr.get_segment_off() + header.mdlength + header.dlength > segment_size)) {
- ERROR("failed, invalid record group header {}", start);
- return crimson::ct_error::input_output_error::make();
- }
- if (header.mdlength == block_size) {
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::make_pair(std::move(header), std::move(bl))
- );
- }
-
- auto rest_start = paddr_t::make_seg_paddr(
- seg_addr.get_segment_id(),
- seg_addr.get_segment_off() + (seastore_off_t)block_size
- );
- auto rest_len = header.mdlength - block_size;
- TRACE("reading record group header rest {}~{}", rest_start, rest_len);
- return segment_manager.read(rest_start, rest_len
- ).safe_then([header=std::move(header), bl=std::move(bl)
- ](auto&& bptail) mutable {
- bl.push_back(bptail);
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::make_pair(std::move(header), std::move(bl)));
- });
- }).safe_then([](auto p) {
- if (p && validate_records_metadata(p->second)) {
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::move(*p)
- );
- } else {
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::nullopt);
- }
- });
-}
-
-ExtentReader::read_validate_data_ret
-ExtentReader::read_validate_data(
- paddr_t record_base,
- const record_group_header_t &header)
-{
- LOG_PREFIX(ExtentReader::read_validate_data);
- auto& segment_manager = *segment_managers[record_base.get_device_id()];
- auto data_addr = record_base.add_offset(header.mdlength);
- TRACE("reading record group data blocks {}~{}", data_addr, header.dlength);
- return segment_manager.read(
- data_addr,
- header.dlength
- ).safe_then([=, &header](auto bptr) {
- bufferlist bl;
- bl.append(bptr);
- return validate_records_data(header, bl);
- });
-}
-
-ExtentReader::consume_record_group_ertr::future<>
-ExtentReader::consume_next_records(
- scan_valid_records_cursor& cursor,
- found_record_handler_t& handler,
- std::size_t& budget_used)
-{
- LOG_PREFIX(ExtentReader::consume_next_records);
- auto& next = cursor.pending_record_groups.front();
- auto total_length = next.header.dlength + next.header.mdlength;
- budget_used += total_length;
- auto locator = record_locator_t{
- next.offset.add_offset(next.header.mdlength),
- write_result_t{
- journal_seq_t{
- cursor.seq.segment_seq,
- next.offset
- },
- static_cast<seastore_off_t>(total_length)
- }
- };
- DEBUG("processing {} at {}, budget_used={}",
- next.header, locator, budget_used);
- return handler(
- locator,
- next.header,
- next.mdbuffer
- ).safe_then([FNAME, &cursor] {
- cursor.pop_record_group();
- if (cursor.is_complete()) {
- INFO("complete at {}, no more record group", cursor);
- }
- });
-}
-
-} // namespace crimson::os::seastore
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
-// vim: ts=8 sw=2 smarttab expandtab
-
-#pragma once
-
-#include "crimson/common/errorator.h"
-#include "crimson/os/seastore/seastore_types.h"
-#include "crimson/os/seastore/segment_manager.h"
-#include "crimson/os/seastore/logging.h"
-
-namespace crimson::os::seastore {
-
-class SegmentCleaner;
-class TransactionManager;
-
-class ExtentReader {
-public:
- std::vector<SegmentManager*>& get_segment_managers() {
- return segment_managers;
- }
-
- using read_ertr = SegmentManager::read_ertr;
- ExtentReader() {
- segment_managers.resize(DEVICE_ID_MAX, nullptr);
- }
- using read_segment_header_ertr = crimson::errorator<
- crimson::ct_error::enoent,
- crimson::ct_error::enodata,
- crimson::ct_error::input_output_error
- >;
- using read_segment_header_ret = read_segment_header_ertr::future<
- segment_header_t>;
- read_segment_header_ret read_segment_header(segment_id_t segment);
-
- using read_segment_tail_ertr = read_segment_header_ertr;
- using read_segment_tail_ret = read_segment_tail_ertr::future<
- segment_tail_t>;
- read_segment_tail_ret read_segment_tail(segment_id_t segment);
-
- struct commit_info_t {
- mod_time_point_t commit_time;
- record_commit_type_t commit_type;
- };
-
- /**
- * scan_extents
- *
- * Scans records beginning at addr until the first record boundary after
- * addr + bytes_to_read.
- *
- * Returns list<extent, extent_info>
- * cursor.is_complete() will be true when no further extents exist in segment.
- */
- using scan_extents_cursor = scan_valid_records_cursor;
- using scan_extents_ertr = read_ertr::extend<crimson::ct_error::enodata>;
- using scan_extents_ret_bare =
- std::list<std::pair<paddr_t, std::pair<commit_info_t, extent_info_t>>>;
- using scan_extents_ret = scan_extents_ertr::future<scan_extents_ret_bare>;
- scan_extents_ret scan_extents(
- scan_extents_cursor &cursor,
- extent_len_t bytes_to_read
- );
-
- using scan_valid_records_ertr = read_ertr::extend<crimson::ct_error::enodata>;
- using scan_valid_records_ret = scan_valid_records_ertr::future<
- size_t>;
- using found_record_handler_t = std::function<
- scan_valid_records_ertr::future<>(
- record_locator_t record_locator,
- // callee may assume header and bl will remain valid until
- // returned future resolves
- const record_group_header_t &header,
- const bufferlist &mdbuf)>;
- scan_valid_records_ret scan_valid_records(
- scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call
- segment_nonce_t nonce, ///< [in] nonce for segment
- size_t budget, ///< [in] max budget to use
- found_record_handler_t &handler ///< [in] handler for records
- ); ///< @return used budget
-
- using release_ertr = SegmentManager::release_ertr;
- release_ertr::future<> release_segment(segment_id_t id) {
- assert(segment_managers[id.device_id()] != nullptr);
- return segment_managers[id.device_id()]->release(id);
- }
-
- void add_segment_manager(SegmentManager* segment_manager) {
- ceph_assert(!segment_managers[segment_manager->get_device_id()]);
- segment_managers[segment_manager->get_device_id()] = segment_manager;
- }
-
- void reset() {
- segment_managers.clear();
- segment_managers.resize(DEVICE_ID_MAX, nullptr);
- }
-
-private:
- std::vector<SegmentManager*> segment_managers;
- /// read record metadata for record starting at start
- using read_validate_record_metadata_ertr = read_ertr;
- using read_validate_record_metadata_ret =
- read_validate_record_metadata_ertr::future<
- std::optional<std::pair<record_group_header_t, bufferlist>>
- >;
- read_validate_record_metadata_ret read_validate_record_metadata(
- paddr_t start,
- segment_nonce_t nonce);
-
- /// read and validate data
- using read_validate_data_ertr = read_ertr;
- using read_validate_data_ret = read_validate_data_ertr::future<bool>;
- read_validate_data_ret read_validate_data(
- paddr_t record_base,
- const record_group_header_t &header ///< caller must ensure lifetime through
- /// future resolution
- );
-
- using consume_record_group_ertr = scan_valid_records_ertr;
- consume_record_group_ertr::future<> consume_next_records(
- scan_valid_records_cursor& cursor,
- found_record_handler_t& handler,
- std::size_t& budget_used);
-};
-
-using ExtentReaderRef = std::unique_ptr<ExtentReader>;
-
-} // namespace crimson::os::seastore
JournalRef make_segmented(
SegmentManager &sm,
- ExtentReader &reader,
+ SegmentManagerGroup &sms,
SegmentProvider &provider)
{
- return std::make_unique<SegmentedJournal>(sm, reader, provider);
+ return std::make_unique<SegmentedJournal>(sm, sms, provider);
}
}
}
class SegmentManager;
-class ExtentReader;
+class SegmentManagerGroup;
class SegmentProvider;
class Journal {
JournalRef make_segmented(
SegmentManager &sm,
- ExtentReader &reader,
+ SegmentManagerGroup &sms,
SegmentProvider &provider);
}
SegmentedJournal::SegmentedJournal(
SegmentManager &segment_manager,
- ExtentReader &scanner,
+ SegmentManagerGroup &sms,
SegmentProvider &segment_provider)
: segment_provider(segment_provider),
segment_seq_allocator(
crimson::common::get_conf<double>(
"seastore_journal_batch_preferred_fullness"),
journal_segment_allocator),
- scanner(scanner)
+ sms(sms)
{
}
INFO("starting at {} -- {}", seq, header);
return seastar::do_with(
scan_valid_records_cursor(seq),
- ExtentReader::found_record_handler_t(
+ SegmentManagerGroup::found_record_handler_t(
[s_type=header.type, &handler, this](
record_locator_t locator,
const record_group_header_t& header,
const bufferlist& mdbuf)
- -> ExtentReader::scan_valid_records_ertr::future<>
+ -> SegmentManagerGroup::scan_valid_records_ertr::future<>
{
LOG_PREFIX(Journal::replay_segment);
auto maybe_record_deltas_list = try_decode_deltas(
});
}),
[=](auto &cursor, auto &dhandler) {
- return scanner.scan_valid_records(
+ return sms.scan_valid_records(
cursor,
header.segment_nonce,
std::numeric_limits<size_t>::max(),
segment_id_t segment_id{
journal_segment_allocator.get_device_id(),
d_segment_id};
- return scanner.read_segment_header(
+ return sms.read_segment_header(
segment_id
).safe_then([segment_id, &ret](auto &&header) {
if (header.get_type() == segment_type_t::JOURNAL) {
#include "crimson/os/seastore/segment_cleaner.h"
#include "crimson/os/seastore/journal.h"
-#include "crimson/os/seastore/extent_reader.h"
+#include "crimson/os/seastore/segment_manager_group.h"
#include "crimson/os/seastore/ordering_handle.h"
#include "crimson/os/seastore/seastore_types.h"
#include "crimson/osd/exceptions.h"
public:
SegmentedJournal(
SegmentManager &segment_manager,
- ExtentReader& scanner,
+ SegmentManagerGroup& sms,
SegmentProvider& cleaner);
~SegmentedJournal() {}
SegmentSeqAllocatorRef segment_seq_allocator;
SegmentAllocator journal_segment_allocator;
RecordSubmitter record_submitter;
- ExtentReader& scanner;
+ SegmentManagerGroup& sms;
WritePipeline* write_pipeline = nullptr;
- /// read journal segment headers from scanner
+ /// read journal segment headers from sms
using find_journal_segments_ertr = crimson::errorator<
crimson::ct_error::input_output_error>;
using find_journal_segments_ret_bare = std::vector<
SegmentCleaner::SegmentCleaner(
config_t config,
- ExtentReaderRef&& scr,
+ SegmentManagerGroupRef&& sm_group,
bool detailed)
: detailed(detailed),
config(config),
- scanner(std::move(scr)),
+ sm_group(std::move(sm_group)),
ool_segment_seq_allocator(
new SegmentSeqAllocator(segment_type_t::OOL)),
gc_process(*this)
return seastar::now();
}
scan_cursor =
- std::make_unique<ExtentReader::scan_extents_cursor>(
+ std::make_unique<SegmentManagerGroup::scan_extents_cursor>(
next);
logger().debug(
"SegmentCleaner::do_gc: starting gc on segment {}",
ceph_assert(!scan_cursor->is_complete());
}
- return scanner->scan_extents(
+ return sm_group->scan_extents(
*scan_cursor,
config.reclaim_bytes_stride
).safe_then([this](auto &&_extents) {
SegmentCleaner::mount_ret SegmentCleaner::mount(
device_id_t pdevice_id)
{
- auto& sms = scanner->get_segment_managers();
+ auto& sms = sm_group->get_segment_managers();
logger().debug(
"SegmentCleaner::mount: {} segment managers", sms.size());
init_complete = false;
segments.end(),
[this, &segment_set](auto& it) {
auto segment_id = it.first;
- return scanner->read_segment_header(
+ return sm_group->read_segment_header(
segment_id
).safe_then([segment_id, this, &segment_set](auto header) {
logger().debug(
- "ExtentReader::mount: segment_id={} -- {}",
+ "SegmentCleaner::mount: segment_id={} -- {}",
segment_id, header);
auto s_type = header.get_type();
if (s_type == segment_type_t::NULL_SEG) {
logger().error(
- "ExtentReader::mount: got null segment, segment_id={} -- {}",
+ "SegmentCleaner::mount: got null segment, segment_id={} -- {}",
segment_id, header);
ceph_abort();
}
- return scanner->read_segment_tail(
+ return sm_group->read_segment_tail(
segment_id
).safe_then([this, segment_id, &segment_set, header](auto tail)
-> scan_extents_ertr::future<> {
{
if (header.get_type() == segment_type_t::OOL) {
logger().info(
- "ExtentReader::init_segments: out-of-line segment {}",
+ "SegmentCleaner::scan_nonfull_segment: out-of-line segment {}",
segment_id);
return seastar::do_with(
scan_valid_records_cursor({
paddr_t::make_seg_paddr(segment_id, 0)}),
[this, segment_id, header](auto& cursor) {
return seastar::do_with(
- ExtentReader::found_record_handler_t([this, segment_id](
+ SegmentManagerGroup::found_record_handler_t([this, segment_id](
record_locator_t locator,
const record_group_header_t& header,
const bufferlist& mdbuf
- ) mutable -> ExtentReader::scan_valid_records_ertr::future<> {
+ ) mutable -> SegmentManagerGroup::scan_valid_records_ertr::future<> {
LOG_PREFIX(SegmentCleaner::scan_nonfull_segment);
DEBUG("decodeing {} records", header.records);
auto maybe_headers = try_decode_record_headers(header, mdbuf);
mod_time_point_t ctime = header.commit_time;
auto commit_type = header.commit_type;
if (!ctime) {
- ERROR("Scanner::init_segments: extent {} 0 commit_time",
+ ERROR("SegmentCleaner::scan_nonfull_segment: extent {} 0 commit_time",
ctime);
ceph_abort("0 commit_time");
}
return seastar::now();
}),
[&cursor, header, segment_id, this](auto& handler) {
- return scanner->scan_valid_records(
+ return sm_group->scan_valid_records(
cursor,
header.segment_nonce,
segments[segment_id.device_id()]->segment_size,
});
} else if (header.get_type() == segment_type_t::JOURNAL) {
logger().info(
- "ExtentReader::init_segments: journal segment {}",
+ "SEgmentCleaner::scan_nonfull_segment: journal segment {}",
segment_id);
segment_set.emplace_back(std::make_pair(segment_id, std::move(header)));
} else {
if (to_release != NULL_SEG_ID) {
LOG_PREFIX(SegmentCleaner::maybe_release_segment);
INFOT("releasing segment {}", t, to_release);
- return scanner->release_segment(to_release
+ return sm_group->release_segment(to_release
).safe_then([this, to_release] {
stats.segments_released++;
mark_empty(to_release);
#include "crimson/common/log.h"
#include "crimson/os/seastore/cached_extent.h"
-#include "crimson/os/seastore/extent_reader.h"
#include "crimson/os/seastore/seastore_types.h"
#include "crimson/os/seastore/segment_manager.h"
+#include "crimson/os/seastore/segment_manager_group.h"
#include "crimson/os/seastore/transaction.h"
#include "crimson/os/seastore/segment_seq_allocator.h"
const bool detailed;
const config_t config;
- ExtentReaderRef scanner;
+ SegmentManagerGroupRef sm_group;
SpaceTrackerIRef space_tracker;
segment_info_set_t segments;
public:
SegmentCleaner(
config_t config,
- ExtentReaderRef&& scanner,
+ SegmentManagerGroupRef&& sm_group,
bool detailed = false);
SegmentSeqAllocator& get_ool_segment_seq_allocator() {
return segments[id].get_type();
}
- using release_ertr = ExtentReader::release_ertr;
+ using release_ertr = SegmentManagerGroup::release_ertr;
release_ertr::future<> maybe_release_segment(Transaction &t);
void adjust_segment_util(double old_usage, double new_usage) {
// GC status helpers
std::unique_ptr<
- ExtentReader::scan_extents_cursor
+ SegmentManagerGroup::scan_extents_cursor
> scan_cursor;
/**
} gc_process;
using gc_ertr = work_ertr::extend_ertr<
- ExtentReader::scan_extents_ertr
+ SegmentManagerGroup::scan_extents_ertr
>;
gc_cycle_ret do_gc_cycle();
using scan_extents_ret_bare =
std::vector<std::pair<segment_id_t, segment_header_t>>;
- using scan_extents_ertr = ExtentReader::scan_extents_ertr;
+ using scan_extents_ertr = SegmentManagerGroup::scan_extents_ertr;
using scan_extents_ret = scan_extents_ertr::future<>;
scan_extents_ret scan_nonfull_segment(
const segment_header_t& header,
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#include "crimson/os/seastore/segment_manager_group.h"
+
+#include "crimson/os/seastore/logging.h"
+
+SET_SUBSYS(seastore_journal);
+
+namespace crimson::os::seastore {
+
+SegmentManagerGroup::read_segment_tail_ret
+SegmentManagerGroup::read_segment_tail(segment_id_t segment)
+{
+ auto& segment_manager = *segment_managers[segment.device_id()];
+ return segment_manager.read(
+ paddr_t::make_seg_paddr(
+ segment,
+ segment_manager.get_segment_size() -
+ segment_manager.get_rounded_tail_length()),
+ segment_manager.get_rounded_tail_length()
+ ).handle_error(
+ read_segment_header_ertr::pass_further{},
+ crimson::ct_error::assert_all{
+ "Invalid error in SegmentManagerGroup::read_segment_tail"
+ }
+ ).safe_then([=, &segment_manager](bufferptr bptr) -> read_segment_tail_ret {
+ LOG_PREFIX(SegmentManagerGroup::read_segment_tail);
+ DEBUG("segment {} bptr size {}", segment, bptr.length());
+
+ segment_tail_t tail;
+ bufferlist bl;
+ bl.push_back(bptr);
+
+ DEBUG("segment {} block crc {}",
+ segment,
+ bl.begin().crc32c(segment_manager.get_block_size(), 0));
+
+ auto bp = bl.cbegin();
+ try {
+ decode(tail, bp);
+ } catch (ceph::buffer::error &e) {
+ DEBUG("segment {} unable to decode tail, skipping -- {}",
+ segment, e);
+ return crimson::ct_error::enodata::make();
+ }
+ DEBUG("segment {} tail {}", segment, tail);
+ return read_segment_tail_ret(
+ read_segment_tail_ertr::ready_future_marker{},
+ tail);
+ });
+}
+
+SegmentManagerGroup::read_segment_header_ret
+SegmentManagerGroup::read_segment_header(segment_id_t segment)
+{
+ auto& segment_manager = *segment_managers[segment.device_id()];
+ return segment_manager.read(
+ paddr_t::make_seg_paddr(segment, 0),
+ segment_manager.get_block_size()
+ ).handle_error(
+ read_segment_header_ertr::pass_further{},
+ crimson::ct_error::assert_all{
+ "Invalid error in SegmentManagerGroup::read_segment_header"
+ }
+ ).safe_then([=, &segment_manager](bufferptr bptr) -> read_segment_header_ret {
+ LOG_PREFIX(SegmentManagerGroup::read_segment_header);
+ DEBUG("segment {} bptr size {}", segment, bptr.length());
+
+ segment_header_t header;
+ bufferlist bl;
+ bl.push_back(bptr);
+
+ DEBUG("segment {} block crc {}",
+ segment,
+ bl.begin().crc32c(segment_manager.get_block_size(), 0));
+
+ auto bp = bl.cbegin();
+ try {
+ decode(header, bp);
+ } catch (ceph::buffer::error &e) {
+ DEBUG("segment {} unable to decode header, skipping -- {}",
+ segment, e);
+ return crimson::ct_error::enodata::make();
+ }
+ DEBUG("segment {} header {}", segment, header);
+ return read_segment_header_ret(
+ read_segment_header_ertr::ready_future_marker{},
+ header);
+ });
+}
+
+SegmentManagerGroup::scan_extents_ret
+SegmentManagerGroup::scan_extents(
+ scan_extents_cursor &cursor,
+ extent_len_t bytes_to_read)
+{
+ auto ret = std::make_unique<scan_extents_ret_bare>();
+ auto* extents = ret.get();
+ return read_segment_header(cursor.get_segment_id()
+ ).handle_error(
+ scan_extents_ertr::pass_further{},
+ crimson::ct_error::assert_all{
+ "Invalid error in SegmentManagerGroup::scan_extents"
+ }
+ ).safe_then([bytes_to_read, extents, &cursor, this](auto segment_header) {
+ auto segment_nonce = segment_header.segment_nonce;
+ return seastar::do_with(
+ found_record_handler_t([extents](
+ record_locator_t locator,
+ const record_group_header_t& header,
+ const bufferlist& mdbuf) mutable -> scan_valid_records_ertr::future<>
+ {
+ LOG_PREFIX(SegmentManagerGroup::scan_extents);
+ DEBUG("decoding {} records", header.records);
+ auto maybe_record_extent_infos = try_decode_extent_infos(header, mdbuf);
+ if (!maybe_record_extent_infos) {
+ // This should be impossible, we did check the crc on the mdbuf
+ ERROR("unable to decode extents for record {}",
+ locator.record_block_base);
+ return crimson::ct_error::input_output_error::make();
+ }
+
+ paddr_t extent_offset = locator.record_block_base;
+ for (auto& r: *maybe_record_extent_infos) {
+ DEBUG("decoded {} extents", r.extent_infos.size());
+ for (const auto &i : r.extent_infos) {
+ extents->emplace_back(
+ extent_offset,
+ std::pair<commit_info_t, extent_info_t>(
+ {r.header.commit_time,
+ r.header.commit_type},
+ i));
+ auto& seg_addr = extent_offset.as_seg_paddr();
+ seg_addr.set_segment_off(
+ seg_addr.get_segment_off() + i.len);
+ }
+ }
+ return scan_extents_ertr::now();
+ }),
+ [bytes_to_read, segment_nonce, &cursor, this](auto &dhandler) {
+ return scan_valid_records(
+ cursor,
+ segment_nonce,
+ bytes_to_read,
+ dhandler
+ ).discard_result();
+ }
+ );
+ }).safe_then([ret=std::move(ret)] {
+ return std::move(*ret);
+ });
+}
+
+SegmentManagerGroup::scan_valid_records_ret
+SegmentManagerGroup::scan_valid_records(
+ scan_valid_records_cursor &cursor,
+ segment_nonce_t nonce,
+ size_t budget,
+ found_record_handler_t &handler)
+{
+ LOG_PREFIX(SegmentManagerGroup::scan_valid_records);
+ auto& segment_manager =
+ *segment_managers[cursor.get_segment_id().device_id()];
+ if (cursor.get_segment_offset() == 0) {
+ INFO("start to scan segment {}", cursor.get_segment_id());
+ cursor.increment_seq(segment_manager.get_block_size());
+ }
+ DEBUG("starting at {}, budget={}", cursor, budget);
+ auto retref = std::make_unique<size_t>(0);
+ auto &budget_used = *retref;
+ return crimson::repeat(
+ [=, &cursor, &budget_used, &handler]() mutable
+ -> scan_valid_records_ertr::future<seastar::stop_iteration> {
+ return [=, &handler, &cursor, &budget_used] {
+ if (!cursor.last_valid_header_found) {
+ return read_validate_record_metadata(cursor.seq.offset, nonce
+ ).safe_then([=, &cursor](auto md) {
+ if (!md) {
+ cursor.last_valid_header_found = true;
+ if (cursor.is_complete()) {
+ INFO("complete at {}, invalid record group metadata",
+ cursor);
+ } else {
+ DEBUG("found invalid record group metadata at {}, "
+ "processing {} pending record groups",
+ cursor.seq,
+ cursor.pending_record_groups.size());
+ }
+ return scan_valid_records_ertr::now();
+ } else {
+ auto& [header, md_bl] = *md;
+ DEBUG("found valid {} at {}", header, cursor.seq);
+ cursor.emplace_record_group(header, std::move(md_bl));
+ return scan_valid_records_ertr::now();
+ }
+ }).safe_then([=, &cursor, &budget_used, &handler] {
+ DEBUG("processing committed record groups until {}, {} pending",
+ cursor.last_committed,
+ cursor.pending_record_groups.size());
+ return crimson::repeat(
+ [=, &budget_used, &cursor, &handler] {
+ if (cursor.pending_record_groups.empty()) {
+ /* This is only possible if the segment is empty.
+ * A record's last_commited must be prior to its own
+ * location since it itself cannot yet have been committed
+ * at its own time of submission. Thus, the most recently
+ * read record must always fall after cursor.last_committed */
+ return scan_valid_records_ertr::make_ready_future<
+ seastar::stop_iteration>(seastar::stop_iteration::yes);
+ }
+ auto &next = cursor.pending_record_groups.front();
+ journal_seq_t next_seq = {cursor.seq.segment_seq, next.offset};
+ if (cursor.last_committed == JOURNAL_SEQ_NULL ||
+ next_seq > cursor.last_committed) {
+ return scan_valid_records_ertr::make_ready_future<
+ seastar::stop_iteration>(seastar::stop_iteration::yes);
+ }
+ return consume_next_records(cursor, handler, budget_used
+ ).safe_then([] {
+ return scan_valid_records_ertr::make_ready_future<
+ seastar::stop_iteration>(seastar::stop_iteration::no);
+ });
+ });
+ });
+ } else {
+ assert(!cursor.pending_record_groups.empty());
+ auto &next = cursor.pending_record_groups.front();
+ return read_validate_data(next.offset, next.header
+ ).safe_then([this, FNAME, &budget_used, &cursor, &handler, &next](auto valid) {
+ if (!valid) {
+ INFO("complete at {}, invalid record group data at {}, {}",
+ cursor, next.offset, next.header);
+ cursor.pending_record_groups.clear();
+ return scan_valid_records_ertr::now();
+ }
+ return consume_next_records(cursor, handler, budget_used);
+ });
+ }
+ }().safe_then([=, &budget_used, &cursor] {
+ if (cursor.is_complete() || budget_used >= budget) {
+ DEBUG("finish at {}, budget_used={}, budget={}",
+ cursor, budget_used, budget);
+ return seastar::stop_iteration::yes;
+ } else {
+ return seastar::stop_iteration::no;
+ }
+ });
+ }).safe_then([retref=std::move(retref)]() mutable -> scan_valid_records_ret {
+ return scan_valid_records_ret(
+ scan_valid_records_ertr::ready_future_marker{},
+ std::move(*retref));
+ });
+}
+
+SegmentManagerGroup::read_validate_record_metadata_ret
+SegmentManagerGroup::read_validate_record_metadata(
+ paddr_t start,
+ segment_nonce_t nonce)
+{
+ LOG_PREFIX(SegmentManagerGroup::read_validate_record_metadata);
+ auto& seg_addr = start.as_seg_paddr();
+ auto& segment_manager = *segment_managers[seg_addr.get_segment_id().device_id()];
+ auto block_size = segment_manager.get_block_size();
+ auto segment_size = static_cast<int64_t>(segment_manager.get_segment_size());
+ if (seg_addr.get_segment_off() + block_size > segment_size) {
+ DEBUG("failed -- record group header block {}~4096 > segment_size {}", start, segment_size);
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::nullopt);
+ }
+ TRACE("reading record group header block {}~4096", start);
+ return segment_manager.read(start, block_size
+ ).safe_then([=, &segment_manager](bufferptr bptr) mutable
+ -> read_validate_record_metadata_ret {
+ auto block_size = static_cast<extent_len_t>(
+ segment_manager.get_block_size());
+ bufferlist bl;
+ bl.append(bptr);
+ auto maybe_header = try_decode_records_header(bl, nonce);
+ if (!maybe_header.has_value()) {
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::nullopt);
+ }
+ auto& seg_addr = start.as_seg_paddr();
+ auto& header = *maybe_header;
+ if (header.mdlength < block_size ||
+ header.mdlength % block_size != 0 ||
+ header.dlength % block_size != 0 ||
+ (header.committed_to != JOURNAL_SEQ_NULL &&
+ header.committed_to.offset.as_seg_paddr().get_segment_off() % block_size != 0) ||
+ (seg_addr.get_segment_off() + header.mdlength + header.dlength > segment_size)) {
+ ERROR("failed, invalid record group header {}", start);
+ return crimson::ct_error::input_output_error::make();
+ }
+ if (header.mdlength == block_size) {
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::make_pair(std::move(header), std::move(bl))
+ );
+ }
+
+ auto rest_start = paddr_t::make_seg_paddr(
+ seg_addr.get_segment_id(),
+ seg_addr.get_segment_off() + (seastore_off_t)block_size
+ );
+ auto rest_len = header.mdlength - block_size;
+ TRACE("reading record group header rest {}~{}", rest_start, rest_len);
+ return segment_manager.read(rest_start, rest_len
+ ).safe_then([header=std::move(header), bl=std::move(bl)
+ ](auto&& bptail) mutable {
+ bl.push_back(bptail);
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::make_pair(std::move(header), std::move(bl)));
+ });
+ }).safe_then([](auto p) {
+ if (p && validate_records_metadata(p->second)) {
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::move(*p)
+ );
+ } else {
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::nullopt);
+ }
+ });
+}
+
+SegmentManagerGroup::read_validate_data_ret
+SegmentManagerGroup::read_validate_data(
+ paddr_t record_base,
+ const record_group_header_t &header)
+{
+ LOG_PREFIX(SegmentManagerGroup::read_validate_data);
+ auto& segment_manager = *segment_managers[record_base.get_device_id()];
+ auto data_addr = record_base.add_offset(header.mdlength);
+ TRACE("reading record group data blocks {}~{}", data_addr, header.dlength);
+ return segment_manager.read(
+ data_addr,
+ header.dlength
+ ).safe_then([=, &header](auto bptr) {
+ bufferlist bl;
+ bl.append(bptr);
+ return validate_records_data(header, bl);
+ });
+}
+
+SegmentManagerGroup::consume_record_group_ertr::future<>
+SegmentManagerGroup::consume_next_records(
+ scan_valid_records_cursor& cursor,
+ found_record_handler_t& handler,
+ std::size_t& budget_used)
+{
+ LOG_PREFIX(SegmentManagerGroup::consume_next_records);
+ auto& next = cursor.pending_record_groups.front();
+ auto total_length = next.header.dlength + next.header.mdlength;
+ budget_used += total_length;
+ auto locator = record_locator_t{
+ next.offset.add_offset(next.header.mdlength),
+ write_result_t{
+ journal_seq_t{
+ cursor.seq.segment_seq,
+ next.offset
+ },
+ static_cast<seastore_off_t>(total_length)
+ }
+ };
+ DEBUG("processing {} at {}, budget_used={}",
+ next.header, locator, budget_used);
+ return handler(
+ locator,
+ next.header,
+ next.mdbuffer
+ ).safe_then([FNAME, &cursor] {
+ cursor.pop_record_group();
+ if (cursor.is_complete()) {
+ INFO("complete at {}, no more record group", cursor);
+ }
+ });
+}
+
+} // namespace crimson::os::seastore
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#pragma once
+
+#include "crimson/common/errorator.h"
+#include "crimson/os/seastore/seastore_types.h"
+#include "crimson/os/seastore/segment_manager.h"
+
+namespace crimson::os::seastore {
+
+class SegmentManagerGroup {
+public:
+ std::vector<SegmentManager*>& get_segment_managers() {
+ return segment_managers;
+ }
+
+ using read_ertr = SegmentManager::read_ertr;
+ SegmentManagerGroup() {
+ segment_managers.resize(DEVICE_ID_MAX, nullptr);
+ }
+ using read_segment_header_ertr = crimson::errorator<
+ crimson::ct_error::enoent,
+ crimson::ct_error::enodata,
+ crimson::ct_error::input_output_error
+ >;
+ using read_segment_header_ret = read_segment_header_ertr::future<
+ segment_header_t>;
+ read_segment_header_ret read_segment_header(segment_id_t segment);
+
+ using read_segment_tail_ertr = read_segment_header_ertr;
+ using read_segment_tail_ret = read_segment_tail_ertr::future<
+ segment_tail_t>;
+ read_segment_tail_ret read_segment_tail(segment_id_t segment);
+
+ struct commit_info_t {
+ mod_time_point_t commit_time;
+ record_commit_type_t commit_type;
+ };
+
+ /**
+ * scan_extents
+ *
+ * Scans records beginning at addr until the first record boundary after
+ * addr + bytes_to_read.
+ *
+ * Returns list<extent, extent_info>
+ * cursor.is_complete() will be true when no further extents exist in segment.
+ */
+ using scan_extents_cursor = scan_valid_records_cursor;
+ using scan_extents_ertr = read_ertr::extend<crimson::ct_error::enodata>;
+ using scan_extents_ret_bare =
+ std::list<std::pair<paddr_t, std::pair<commit_info_t, extent_info_t>>>;
+ using scan_extents_ret = scan_extents_ertr::future<scan_extents_ret_bare>;
+ scan_extents_ret scan_extents(
+ scan_extents_cursor &cursor,
+ extent_len_t bytes_to_read
+ );
+
+ using scan_valid_records_ertr = read_ertr::extend<crimson::ct_error::enodata>;
+ using scan_valid_records_ret = scan_valid_records_ertr::future<
+ size_t>;
+ using found_record_handler_t = std::function<
+ scan_valid_records_ertr::future<>(
+ record_locator_t record_locator,
+ // callee may assume header and bl will remain valid until
+ // returned future resolves
+ const record_group_header_t &header,
+ const bufferlist &mdbuf)>;
+ scan_valid_records_ret scan_valid_records(
+ scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call
+ segment_nonce_t nonce, ///< [in] nonce for segment
+ size_t budget, ///< [in] max budget to use
+ found_record_handler_t &handler ///< [in] handler for records
+ ); ///< @return used budget
+
+ using release_ertr = SegmentManager::release_ertr;
+ release_ertr::future<> release_segment(segment_id_t id) {
+ assert(segment_managers[id.device_id()] != nullptr);
+ return segment_managers[id.device_id()]->release(id);
+ }
+
+ void add_segment_manager(SegmentManager* segment_manager) {
+ ceph_assert(!segment_managers[segment_manager->get_device_id()]);
+ segment_managers[segment_manager->get_device_id()] = segment_manager;
+ }
+
+ void reset() {
+ segment_managers.clear();
+ segment_managers.resize(DEVICE_ID_MAX, nullptr);
+ }
+
+private:
+ std::vector<SegmentManager*> segment_managers;
+ /// read record metadata for record starting at start
+ using read_validate_record_metadata_ertr = read_ertr;
+ using read_validate_record_metadata_ret =
+ read_validate_record_metadata_ertr::future<
+ std::optional<std::pair<record_group_header_t, bufferlist>>
+ >;
+ read_validate_record_metadata_ret read_validate_record_metadata(
+ paddr_t start,
+ segment_nonce_t nonce);
+
+ /// read and validate data
+ using read_validate_data_ertr = read_ertr;
+ using read_validate_data_ret = read_validate_data_ertr::future<bool>;
+ read_validate_data_ret read_validate_data(
+ paddr_t record_base,
+ const record_group_header_t &header ///< caller must ensure lifetime through
+ /// future resolution
+ );
+
+ using consume_record_group_ertr = scan_valid_records_ertr;
+ consume_record_group_ertr::future<> consume_next_records(
+ scan_valid_records_cursor& cursor,
+ found_record_handler_t& handler,
+ std::size_t& budget_used);
+};
+
+using SegmentManagerGroupRef = std::unique_ptr<SegmentManagerGroup>;
+
+} // namespace crimson::os::seastore
CacheRef _cache,
LBAManagerRef _lba_manager,
ExtentPlacementManagerRef&& epm,
- ExtentReader& scanner)
+ SegmentManagerGroup& sms)
: segment_cleaner(std::move(_segment_cleaner)),
cache(std::move(_cache)),
lba_manager(std::move(_lba_manager)),
journal(std::move(_journal)),
epm(std::move(epm)),
- scanner(scanner)
+ sms(sms)
{
segment_cleaner->set_extent_callback(this);
journal->set_write_pipeline(&write_pipeline);
cache->dump_contents();
return journal->close();
}).safe_then([this] {
- scanner.reset();
+ sms.reset();
return epm->close();
}).safe_then([FNAME] {
INFO("completed");
Device &device,
bool detailed)
{
- auto scanner = std::make_unique<ExtentReader>();
- auto& scanner_ref = *scanner.get();
+ auto sms = std::make_unique<SegmentManagerGroup>();
+ auto& sms_ref = *sms.get();
auto segment_cleaner = std::make_unique<SegmentCleaner>(
SegmentCleaner::config_t::get_default(),
- std::move(scanner),
+ std::move(sms),
detailed);
ceph_assert(device.get_device_type() == device_type_t::SEGMENTED);
auto sm = dynamic_cast<SegmentManager*>(&device);
ceph_assert(sm != nullptr);
- auto journal = journal::make_segmented(*sm, scanner_ref, *segment_cleaner);
+ auto journal = journal::make_segmented(*sm, sms_ref, *segment_cleaner);
auto epm = std::make_unique<ExtentPlacementManager>();
auto cache = std::make_unique<Cache>(*epm);
auto lba_manager = lba_manager::create_lba_manager(*cache);
std::move(cache),
std::move(lba_manager),
std::move(epm),
- scanner_ref);
+ sms_ref);
}
}
#include "crimson/os/seastore/journal.h"
#include "crimson/os/seastore/extent_placement_manager.h"
#include "crimson/os/seastore/device.h"
+#include "crimson/os/seastore/segment_manager_group.h"
namespace crimson::os::seastore {
class Journal;
CacheRef cache,
LBAManagerRef lba_manager,
ExtentPlacementManagerRef&& epm,
- ExtentReader& scanner);
+ SegmentManagerGroup& sms);
/// Writes initial metadata to disk
using mkfs_ertr = base_ertr;
*segment_cleaner,
*sm,
segment_cleaner->get_ool_segment_seq_allocator()));
- scanner.add_segment_manager(sm);
+ sms.add_segment_manager(sm);
}
~TransactionManager();
LBAManagerRef lba_manager;
JournalRef journal;
ExtentPlacementManagerRef epm;
- ExtentReader& scanner;
+ SegmentManagerGroup& sms;
WritePipeline write_pipeline;
public seastar_test_suite_t, SegmentProvider {
segment_manager::EphemeralSegmentManagerRef segment_manager;
- ExtentReaderRef scanner;
+ SegmentManagerGroupRef sms;
JournalRef journal;
ExtentPlacementManagerRef epm;
CacheRef cache;
virtual LBAManager::mkfs_ret test_structure_setup(Transaction &t) = 0;
seastar::future<> set_up_fut() final {
segment_manager = segment_manager::create_test_ephemeral();
- scanner.reset(new ExtentReader());
- auto& scanner_ref = *scanner.get();
+ sms.reset(new SegmentManagerGroup());
+ auto& sms_ref = *sms.get();
journal = journal::make_segmented(
- *segment_manager, scanner_ref, *this);
+ *segment_manager, sms_ref, *this);
epm.reset(new ExtentPlacementManager());
cache.reset(new Cache(*epm));
block_size = segment_manager->get_block_size();
next = segment_id_t{segment_manager->get_device_id(), 0};
- scanner_ref.add_segment_manager(segment_manager.get());
+ sms_ref.add_segment_manager(segment_manager.get());
epm->add_device(segment_manager.get(), true);
journal->set_write_pipeline(&pipeline);
}).safe_then([this] {
test_structure_reset();
segment_manager.reset();
- scanner.reset();
+ sms.reset();
journal.reset();
epm.reset();
cache.reset();
seastore_off_t block_size;
- ExtentReaderRef scanner;
+ SegmentManagerGroupRef sms;
segment_id_t next;
seastar::future<> set_up_fut() final {
segment_manager = segment_manager::create_test_ephemeral();
block_size = segment_manager->get_block_size();
- scanner.reset(new ExtentReader());
+ sms.reset(new SegmentManagerGroup());
next = segment_id_t(segment_manager->get_device_id(), 0);
- journal = journal::make_segmented(*segment_manager, *scanner, *this);
+ journal = journal::make_segmented(*segment_manager, *sms, *this);
journal->set_write_pipeline(&pipeline);
- scanner->add_segment_manager(segment_manager.get());
+ sms->add_segment_manager(segment_manager.get());
return segment_manager->init(
).safe_then([this] {
return journal->open_for_write();
return journal->close(
).safe_then([this] {
segment_manager.reset();
- scanner.reset();
+ sms.reset();
journal.reset();
}).handle_error(
crimson::ct_error::all_same_way([](auto e) {
return journal->close(
).safe_then([this, f=std::move(f)]() mutable {
journal = journal::make_segmented(
- *segment_manager, *scanner, *this);
+ *segment_manager, *sms, *this);
journal->set_write_pipeline(&pipeline);
return journal->replay(std::forward<T>(std::move(f)));
}).safe_then([this] {