There will be two kinds of segments to be scanned: those created by the journal
and those created by the extent placement manager. We need a common module to scan
the extents of both kinds of segments.
Signed-off-by: Xuehan Xu <xxhdx1985126@gmail.com>
transaction.cc
journal.cc
cache.cc
+ scanner.cc
lba_manager.cc
segment_cleaner.cc
lba_manager/btree/btree_lba_manager.cc
sizeof(meta.seastore_id.uuid));
}
-Journal::Journal(SegmentManager &segment_manager)
- : segment_manager(segment_manager) {}
+Journal::Journal(SegmentManager &segment_manager, Scanner& scanner)
+ : segment_manager(segment_manager), scanner(scanner) {}
Journal::initialize_segment_ertr::future<segment_seq_t>
});
}
-bool Journal::validate_metadata(const bufferlist &bl)
-{
- auto bliter = bl.cbegin();
- auto test_crc = bliter.crc32c(
- ceph::encoded_sizeof_bounded<record_header_t>(),
- -1);
- ceph_le32 recorded_crc_le;
- decode(recorded_crc_le, bliter);
- uint32_t recorded_crc = recorded_crc_le;
- test_crc = bliter.crc32c(
- bliter.get_remaining(),
- test_crc);
- return test_crc == recorded_crc;
-}
-
-Journal::read_validate_data_ret Journal::read_validate_data(
- paddr_t record_base,
- const record_header_t &header)
-{
- return segment_manager.read(
- record_base.add_offset(header.mdlength),
- header.dlength
- ).safe_then([=, &header](auto bptr) {
- bufferlist bl;
- bl.append(bptr);
- return bl.crc32c(-1) == header.data_crc;
- });
-}
-
Journal::write_record_ret Journal::write_record(
record_size_t rsize,
record_t &&record,
);
}
-Journal::read_segment_header_ret
-Journal::read_segment_header(segment_id_t segment)
-{
- return segment_manager.read(
- paddr_t{segment, 0},
- segment_manager.get_block_size()
- ).handle_error(
- read_segment_header_ertr::pass_further{},
- crimson::ct_error::assert_all{
- "Invalid error in Journal::read_segment_header"
- }
- ).safe_then([=](bufferptr bptr) -> read_segment_header_ret {
- logger().debug("segment {} bptr size {}", segment, bptr.length());
-
- segment_header_t header;
- bufferlist bl;
- bl.push_back(bptr);
-
- logger().debug(
- "Journal::read_segment_header: segment {} block crc {}",
- segment,
- bl.begin().crc32c(segment_manager.get_block_size(), 0));
-
- auto bp = bl.cbegin();
- try {
- decode(header, bp);
- } catch (ceph::buffer::error &e) {
- logger().debug(
- "Journal::read_segment_header: segment {} unable to decode "
- "header, skipping",
- segment);
- return crimson::ct_error::enodata::make();
- }
- logger().debug(
- "Journal::read_segment_header: segment {} header {}",
- segment,
- header);
- return read_segment_header_ret(
- read_segment_header_ertr::ready_future_marker{},
- header);
- });
-}
-
Journal::open_for_write_ret Journal::open_for_write()
{
return roll_journal_segment().safe_then([this](auto seq) {
});
}
-Journal::find_replay_segments_fut Journal::find_replay_segments()
+Journal::prep_replay_segments_fut
+Journal::prep_replay_segments(
+ std::vector<std::pair<segment_id_t, segment_header_t>> segments)
{
- return seastar::do_with(
- std::vector<std::pair<segment_id_t, segment_header_t>>(),
- [this](auto &&segments) mutable {
- return crimson::do_for_each(
- boost::make_counting_iterator(segment_id_t{0}),
- boost::make_counting_iterator(segment_manager.get_num_segments()),
- [this, &segments](auto i) {
- return read_segment_header(i
- ).safe_then([this, &segments, i](auto header) mutable {
- if (generate_nonce(
- header.journal_segment_seq,
- segment_manager.get_meta()) != header.segment_nonce) {
- logger().debug(
- "find_replay_segments: nonce mismatch segment {} header {}",
- i,
- header);
- assert(0 == "impossible");
- return find_replay_segments_ertr::now();
- }
-
- segments.emplace_back(i, std::move(header));
- return find_replay_segments_ertr::now();
- }).handle_error(
- crimson::ct_error::enoent::handle([i](auto) {
- logger().debug(
- "find_replay_segments: segment {} not available for read",
- i);
- return find_replay_segments_ertr::now();
- }),
- crimson::ct_error::enodata::handle([i](auto) {
- logger().debug(
- "find_replay_segments: segment {} header undecodable",
- i);
- return find_replay_segments_ertr::now();
- }),
- find_replay_segments_ertr::pass_further{},
- crimson::ct_error::assert_all{
- "Invalid error in Journal::find_replay_segments"
- }
- );
- }).safe_then([this, &segments]() mutable -> find_replay_segments_fut {
- logger().debug(
- "find_replay_segments: have {} segments",
- segments.size());
- if (segments.empty()) {
- return crimson::ct_error::input_output_error::make();
- }
- std::sort(
- segments.begin(),
- segments.end(),
- [](const auto <, const auto &rt) {
- return lt.second.journal_segment_seq <
- rt.second.journal_segment_seq;
- });
-
- next_journal_segment_seq =
- segments.rbegin()->second.journal_segment_seq + 1;
- std::for_each(
- segments.begin(),
- segments.end(),
- [this](auto &seg) {
- segment_provider->init_mark_segment_closed(
- seg.first,
- seg.second.journal_segment_seq);
- });
+ logger().debug(
+ "prep_replay_segments: have {} segments",
+ segments.size());
+ if (segments.empty()) {
+ return crimson::ct_error::input_output_error::make();
+ }
+ std::sort(
+ segments.begin(),
+ segments.end(),
+ [](const auto <, const auto &rt) {
+ return lt.second.journal_segment_seq <
+ rt.second.journal_segment_seq;
+ });
- auto journal_tail = segments.rbegin()->second.journal_tail;
- segment_provider->update_journal_tail_committed(journal_tail);
- auto replay_from = journal_tail.offset;
- logger().debug(
- "Journal::find_replay_segments: journal_tail={}",
- journal_tail);
- auto from = segments.begin();
- if (replay_from != P_ADDR_NULL) {
- from = std::find_if(
- segments.begin(),
- segments.end(),
- [&replay_from](const auto &seg) -> bool {
- return seg.first == replay_from.segment;
- });
- if (from->second.journal_segment_seq != journal_tail.segment_seq) {
- logger().error(
- "find_replay_segments: journal_tail {} does not match {}",
- journal_tail,
- from->second);
- assert(0 == "invalid");
- }
- } else {
- replay_from = paddr_t{
- from->first,
- (segment_off_t)segment_manager.get_block_size()};
- }
- auto ret = replay_segments_t(segments.end() - from);
- std::transform(
- from, segments.end(), ret.begin(),
- [this](const auto &p) {
- auto ret = journal_seq_t{
- p.second.journal_segment_seq,
- paddr_t{
- p.first,
- (segment_off_t)segment_manager.get_block_size()}};
- logger().debug(
- "Journal::find_replay_segments: replaying from {}",
- ret);
- return std::make_pair(ret, p.second);
- });
- ret[0].first.offset = replay_from;
- return find_replay_segments_fut(
- find_replay_segments_ertr::ready_future_marker{},
- std::move(ret));
- });
+ next_journal_segment_seq =
+ segments.rbegin()->second.journal_segment_seq + 1;
+ std::for_each(
+ segments.begin(),
+ segments.end(),
+ [this](auto &seg) {
+ segment_provider->init_mark_segment_closed(
+ seg.first,
+ seg.second.journal_segment_seq);
});
-}
-Journal::read_validate_record_metadata_ret Journal::read_validate_record_metadata(
- paddr_t start,
- segment_nonce_t nonce)
-{
- auto block_size = segment_manager.get_block_size();
- if (start.offset + block_size > (int64_t)segment_manager.get_segment_size()) {
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::nullopt);
+ auto journal_tail = segments.rbegin()->second.journal_tail;
+ segment_provider->update_journal_tail_committed(journal_tail);
+ auto replay_from = journal_tail.offset;
+ logger().debug(
+ "Journal::prep_replay_segments: journal_tail={}",
+ journal_tail);
+ auto from = segments.begin();
+ if (replay_from != P_ADDR_NULL) {
+ from = std::find_if(
+ segments.begin(),
+ segments.end(),
+ [&replay_from](const auto &seg) -> bool {
+ return seg.first == replay_from.segment;
+ });
+ if (from->second.journal_segment_seq != journal_tail.segment_seq) {
+ logger().error(
+ "prep_replay_segments: journal_tail {} does not match {}",
+ journal_tail,
+ from->second);
+ assert(0 == "invalid");
+ }
+ } else {
+ replay_from = paddr_t{
+ from->first,
+ (segment_off_t)segment_manager.get_block_size()};
}
- return segment_manager.read(start, block_size
- ).safe_then(
- [=](bufferptr bptr) mutable
- -> read_validate_record_metadata_ret {
- logger().debug("read_validate_record_metadata: reading {}", start);
- auto block_size = segment_manager.get_block_size();
- bufferlist bl;
- bl.append(bptr);
- auto bp = bl.cbegin();
- record_header_t header;
- try {
- decode(header, bp);
- } catch (ceph::buffer::error &e) {
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::nullopt);
- }
- if (header.segment_nonce != nonce) {
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::nullopt);
- }
- if (header.mdlength > (extent_len_t)block_size) {
- if (start.offset + header.mdlength >
- (int64_t)segment_manager.get_segment_size()) {
- return crimson::ct_error::input_output_error::make();
- }
- return segment_manager.read(
- {start.segment, start.offset + (segment_off_t)block_size},
- header.mdlength - block_size).safe_then(
- [header=std::move(header), bl=std::move(bl)](
- auto &&bptail) mutable {
- bl.push_back(bptail);
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::make_pair(std::move(header), std::move(bl)));
- });
- } else {
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::make_pair(std::move(header), std::move(bl))
- );
- }
- }).safe_then([=](auto p) {
- if (p && validate_metadata(p->second)) {
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::move(*p)
- );
- } else {
- return read_validate_record_metadata_ret(
- read_validate_record_metadata_ertr::ready_future_marker{},
- std::nullopt);
- }
+ auto ret = replay_segments_t(segments.end() - from);
+ std::transform(
+ from, segments.end(), ret.begin(),
+ [this](const auto &p) {
+ auto ret = journal_seq_t{
+ p.second.journal_segment_seq,
+ paddr_t{
+ p.first,
+ (segment_off_t)segment_manager.get_block_size()}};
+ logger().debug(
+ "Journal::prep_replay_segments: replaying from {}",
+ ret);
+ return std::make_pair(ret, p.second);
});
+ ret[0].first.offset = replay_from;
+ return prep_replay_segments_fut(
+ prep_replay_segments_ertr::ready_future_marker{},
+ std::move(ret));
}
std::optional<std::vector<delta_info_t>> Journal::try_decode_deltas(
return deltas;
}
-std::optional<std::vector<extent_info_t>> Journal::try_decode_extent_infos(
- record_header_t header,
- const bufferlist &bl)
-{
- auto bliter = bl.cbegin();
- bliter += ceph::encoded_sizeof_bounded<record_header_t>();
- bliter += sizeof(checksum_t) /* crc */;
- logger().debug("{}: decoding {} extents", __func__, header.extents);
- std::vector<extent_info_t> extent_infos(header.extents);
- for (auto &&i : extent_infos) {
- try {
- decode(i, bliter);
- } catch (ceph::buffer::error &e) {
- return std::nullopt;
- }
- }
- return extent_infos;
-}
-
Journal::replay_ertr::future<>
Journal::replay_segment(
journal_seq_t seq,
logger().debug("replay_segment: starting at {}", seq);
return seastar::do_with(
scan_valid_records_cursor(seq.offset),
- found_record_handler_t(
+ Scanner::found_record_handler_t(
[=, &handler](paddr_t base,
const record_header_t &header,
const bufferlist &mdbuf) {
});
}),
[=](auto &cursor, auto &dhandler) {
- return scan_valid_records(
+ return scanner.scan_valid_records(
cursor,
header.segment_nonce,
std::numeric_limits<size_t>::max(),
- dhandler).safe_then([](auto){});
+ dhandler).safe_then([](auto){}
+ ).handle_error(
+ replay_ertr::pass_further{},
+ crimson::ct_error::assert_all{
+ "shouldn't meet with any other error other replay_ertr"
+ }
+ );;
});
}
-Journal::replay_ret Journal::replay(delta_handler_t &&delta_handler)
+Journal::replay_ret Journal::replay(
+ std::vector<std::pair<segment_id_t, segment_header_t>>&& segment_headers,
+ delta_handler_t &&delta_handler)
{
return seastar::do_with(
std::move(delta_handler), replay_segments_t(),
- [this](auto &handler, auto &segments) mutable -> replay_ret {
- return find_replay_segments().safe_then(
+ [this, segment_headers=std::move(segment_headers)]
+ (auto &handler, auto &segments) mutable -> replay_ret {
+ return prep_replay_segments(std::move(segment_headers)).safe_then(
[this, &handler, &segments](auto replay_segs) mutable {
logger().debug("replay: found {} segments", replay_segs.size());
segments = std::move(replay_segs);
});
}
-Journal::scan_extents_ret Journal::scan_extents(
- scan_extents_cursor &cursor,
- extent_len_t bytes_to_read)
-{
- auto ret = std::make_unique<scan_extents_ret_bare>();
- auto* extents = ret.get();
- return read_segment_header(cursor.get_offset().segment
- ).handle_error(
- scan_extents_ertr::pass_further{},
- crimson::ct_error::assert_all{
- "Invalid error in Journal::scan_extents"
- }
- ).safe_then([bytes_to_read, extents, &cursor, this](auto segment_header) {
- auto segment_nonce = segment_header.segment_nonce;
- return seastar::do_with(
- found_record_handler_t(
- [extents, this](
- paddr_t base,
- const record_header_t &header,
- const bufferlist &mdbuf) mutable {
-
- auto infos = try_decode_extent_infos(
- header,
- mdbuf);
- if (!infos) {
- // This should be impossible, we did check the crc on the mdbuf
- logger().error(
- "Journal::scan_extents unable to decode extents for record {}",
- base);
- assert(infos);
- }
-
- paddr_t extent_offset = base.add_offset(header.mdlength);
- for (const auto &i : *infos) {
- extents->emplace_back(extent_offset, i);
- extent_offset.offset += i.len;
- }
- return scan_extents_ertr::now();
- }),
- [=, &cursor](auto &dhandler) {
- return scan_valid_records(
- cursor,
- segment_nonce,
- bytes_to_read,
- dhandler).discard_result();
- });
- }).safe_then([ret=std::move(ret)] {
- return std::move(*ret);
- });
-}
-
-Journal::scan_valid_records_ret Journal::scan_valid_records(
- scan_valid_records_cursor &cursor,
- segment_nonce_t nonce,
- size_t budget,
- found_record_handler_t &handler)
-{
- if (cursor.offset.offset == 0) {
- cursor.offset.offset = segment_manager.get_block_size();
- }
- auto retref = std::make_unique<size_t>(0);
- auto budget_used = *retref;
- return crimson::repeat(
- [=, &cursor, &budget_used, &handler]() mutable
- -> scan_valid_records_ertr::future<seastar::stop_iteration> {
- return [=, &handler, &cursor, &budget_used] {
- if (!cursor.last_valid_header_found) {
- return read_validate_record_metadata(cursor.offset, nonce
- ).safe_then([=, &cursor](auto md) {
- logger().debug(
- "Journal::scan_valid_records: read complete {}",
- cursor.offset);
- if (!md) {
- logger().debug(
- "Journal::scan_valid_records: found invalid header at {}, presumably at end",
- cursor.offset);
- cursor.last_valid_header_found = true;
- return scan_valid_records_ertr::now();
- } else {
- logger().debug(
- "Journal::scan_valid_records: valid record read at {}",
- cursor.offset);
- cursor.last_committed = paddr_t{
- cursor.offset.segment,
- md->first.committed_to};
- cursor.pending_records.emplace_back(
- cursor.offset,
- md->first,
- md->second);
- cursor.offset.offset +=
- md->first.dlength + md->first.mdlength;
- return scan_valid_records_ertr::now();
- }
- }).safe_then([=, &cursor, &budget_used, &handler] {
- return crimson::repeat(
- [=, &budget_used, &cursor, &handler] {
- logger().debug(
- "Journal::scan_valid_records: valid record read, processing queue");
- if (cursor.pending_records.empty()) {
- /* This is only possible if the segment is empty.
- * A record's last_commited must be prior to its own
- * location since it itself cannot yet have been committed
- * at its own time of submission. Thus, the most recently
- * read record must always fall after cursor.last_committed */
- return scan_valid_records_ertr::make_ready_future<
- seastar::stop_iteration>(seastar::stop_iteration::yes);
- }
- auto &next = cursor.pending_records.front();
- if (next.offset > cursor.last_committed) {
- return scan_valid_records_ertr::make_ready_future<
- seastar::stop_iteration>(seastar::stop_iteration::yes);
- }
- budget_used +=
- next.header.dlength + next.header.mdlength;
- return handler(
- next.offset,
- next.header,
- next.mdbuffer
- ).safe_then([&cursor] {
- cursor.pending_records.pop_front();
- return scan_valid_records_ertr::make_ready_future<
- seastar::stop_iteration>(seastar::stop_iteration::no);
- });
- });
- });
- } else {
- assert(!cursor.pending_records.empty());
- auto &next = cursor.pending_records.front();
- return read_validate_data(next.offset, next.header
- ).safe_then([=, &budget_used, &next, &cursor, &handler](auto valid) {
- if (!valid) {
- cursor.pending_records.clear();
- return scan_valid_records_ertr::now();
- }
- budget_used +=
- next.header.dlength + next.header.mdlength;
- return handler(
- next.offset,
- next.header,
- next.mdbuffer
- ).safe_then([&cursor] {
- cursor.pending_records.pop_front();
- return scan_valid_records_ertr::now();
- });
- });
- }
- }().safe_then([=, &budget_used, &cursor] {
- if (cursor.is_complete() || budget_used >= budget) {
- return seastar::stop_iteration::yes;
- } else {
- return seastar::stop_iteration::no;
- }
- });
- }).safe_then([retref=std::move(retref)]() mutable -> scan_valid_records_ret {
- return scan_valid_records_ret(
- scan_valid_records_ertr::ready_future_marker{},
- std::move(*retref));
- });
-}
-
-
}
#include "include/denc.h"
#include "crimson/common/log.h"
+#include "crimson/os/seastore/scanner.h"
#include "crimson/os/seastore/segment_manager.h"
#include "crimson/os/seastore/ordering_handle.h"
#include "crimson/os/seastore/seastore_types.h"
*/
class Journal {
public:
- Journal(SegmentManager &segment_manager);
+ Journal(SegmentManager &segment_manager, Scanner& scanner);
/**
* Sets the SegmentProvider.
replay_ret(journal_seq_t seq,
paddr_t record_block_base,
const delta_info_t&)>;
- replay_ret replay(delta_handler_t &&delta_handler);
-
- /**
- * scan_extents
- *
- * Scans records beginning at addr until the first record boundary after
- * addr + bytes_to_read.
- *
- * Returns list<extent, extent_info>
- * cursor.is_complete() will be true when no further extents exist in segment.
- */
- class scan_valid_records_cursor;
- using scan_extents_cursor = scan_valid_records_cursor;
- using scan_extents_ertr = SegmentManager::read_ertr;
- using scan_extents_ret_bare = std::list<std::pair<paddr_t, extent_info_t>>;
- using scan_extents_ret = scan_extents_ertr::future<scan_extents_ret_bare>;
- scan_extents_ret scan_extents(
- scan_extents_cursor &cursor,
- extent_len_t bytes_to_read
- );
+ replay_ret replay(
+ std::vector<std::pair<segment_id_t, segment_header_t>>&& segment_headers,
+ delta_handler_t &&delta_handler);
void set_write_pipeline(WritePipeline *_write_pipeline) {
write_pipeline = _write_pipeline;
segment_off_t written_to = 0;
segment_off_t committed_to = 0;
+ Scanner& scanner;
WritePipeline *write_pipeline = nullptr;
void reset_soft_state() {
initialize_segment_ertr::future<segment_seq_t> initialize_segment(
Segment &segment);
- /// validate embedded metadata checksum
- static bool validate_metadata(const bufferlist &bl);
-
- /// read and validate data
- using read_validate_data_ertr = SegmentManager::read_ertr;
- using read_validate_data_ret = read_validate_data_ertr::future<bool>;
- read_validate_data_ret read_validate_data(
- paddr_t record_base,
- const record_header_t &header ///< caller must ensure lifetime through
- /// future resolution
- );
-
/// do record write
using write_record_ertr = crimson::errorator<
/// returns true iff current segment has insufficient space
bool needs_roll(segment_off_t length) const;
- using read_segment_header_ertr = crimson::errorator<
- crimson::ct_error::enoent,
- crimson::ct_error::enodata,
- crimson::ct_error::input_output_error
- >;
- using read_segment_header_ret = read_segment_header_ertr::future<
- segment_header_t>;
- read_segment_header_ret read_segment_header(segment_id_t segment);
-
/// return ordered vector of segments to replay
using replay_segments_t = std::vector<
std::pair<journal_seq_t, segment_header_t>>;
- using find_replay_segments_ertr = crimson::errorator<
+ using prep_replay_segments_ertr = crimson::errorator<
crimson::ct_error::input_output_error
>;
- using find_replay_segments_fut = find_replay_segments_ertr::future<
+ using prep_replay_segments_fut = prep_replay_segments_ertr::future<
replay_segments_t>;
- find_replay_segments_fut find_replay_segments();
+ prep_replay_segments_fut prep_replay_segments(
+ std::vector<std::pair<segment_id_t, segment_header_t>> segments);
/// attempts to decode deltas from bl, return nullopt if unsuccessful
std::optional<std::vector<delta_info_t>> try_decode_deltas(
record_header_t header,
const bufferlist &bl);
- /// attempts to decode extent infos from bl, return nullopt if unsuccessful
- std::optional<std::vector<extent_info_t>> try_decode_extent_infos(
- record_header_t header,
- const bufferlist &bl);
-
- /// read record metadata for record starting at start
- using read_validate_record_metadata_ertr = replay_ertr;
- using read_validate_record_metadata_ret =
- read_validate_record_metadata_ertr::future<
- std::optional<std::pair<record_header_t, bufferlist>>
- >;
- read_validate_record_metadata_ret read_validate_record_metadata(
- paddr_t start,
- segment_nonce_t nonce);
-
-public:
- /// scan segment for end incrementally
- struct scan_valid_records_cursor {
- bool last_valid_header_found = false;
- paddr_t offset;
- paddr_t last_committed;
-
- struct found_record_t {
- paddr_t offset;
- record_header_t header;
- bufferlist mdbuffer;
-
- found_record_t(
- paddr_t offset,
- const record_header_t &header,
- const bufferlist &mdbuffer)
- : offset(offset), header(header), mdbuffer(mdbuffer) {}
- };
- std::deque<found_record_t> pending_records;
-
- bool is_complete() const {
- return last_valid_header_found && pending_records.empty();
- }
-
- paddr_t get_offset() const {
- return offset;
- }
-
- scan_valid_records_cursor(
- paddr_t offset)
- : offset(offset) {}
- };
private:
- using scan_valid_records_ertr = SegmentManager::read_ertr;
- using scan_valid_records_ret = scan_valid_records_ertr::future<
- size_t>;
- using found_record_handler_t = std::function<
- scan_valid_records_ertr::future<>(
- paddr_t record_block_base,
- // callee may assume header and bl will remain valid until
- // returned future resolves
- const record_header_t &header,
- const bufferlist &bl)>;
- scan_valid_records_ret scan_valid_records(
- scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call
- segment_nonce_t nonce, ///< [in] nonce for segment
- size_t budget, ///< [in] max budget to use
- found_record_handler_t &handler ///< [in] handler for records
- ); ///< @return used budget
-
/// replays records starting at start through end of segment
replay_ertr::future<>
replay_segment(
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#include "crimson/os/seastore/segment_manager.h"
+#include "crimson/os/seastore/scanner.h"
+#include "crimson/common/log.h"
+
+namespace {
+ // File-local accessor for the logger registered under ceph_subsys_seastore.
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_seastore);
+ }
+}
+
+namespace crimson::os::seastore {
+
+// Reads one block at offset 0 of `segment` and decodes it as a
+// segment_header_t.
+//
+// Read errors that are part of read_segment_header_ertr are passed on to the
+// caller; any other read error trips the assert_all handler. A block that
+// cannot be decoded as a header is reported as enodata.
+Scanner::read_segment_header_ret
+Scanner::read_segment_header(segment_id_t segment)
+{
+ return segment_manager.read(
+ paddr_t{segment, 0},
+ segment_manager.get_block_size()
+ ).handle_error(
+ read_segment_header_ertr::pass_further{},
+ crimson::ct_error::assert_all{
+ "Invalid error in Scanner::read_segment_header"
+ }
+ ).safe_then([=](bufferptr bptr) -> read_segment_header_ret {
+ logger().debug("segment {} bptr size {}", segment, bptr.length());
+
+ segment_header_t header;
+ bufferlist bl;
+ bl.push_back(bptr);
+
+ // Debug aid only: the block crc is logged, it is not checked against anything.
+ logger().debug(
+ "Scanner::read_segment_header: segment {} block crc {}",
+ segment,
+ bl.begin().crc32c(segment_manager.get_block_size(), 0));
+
+ auto bp = bl.cbegin();
+ try {
+ decode(header, bp);
+ } catch (ceph::buffer::error &e) {
+ // An undecodable first block is reported as enodata rather than as an
+ // I/O failure, so callers can handle it separately.
+ logger().debug(
+ "Scanner::read_segment_header: segment {} unable to decode "
+ "header, skipping",
+ segment);
+ return crimson::ct_error::enodata::make();
+ }
+ logger().debug(
+ "Scanner::read_segment_header: segment {} header {}",
+ segment,
+ header);
+ return read_segment_header_ret(
+ read_segment_header_ertr::ready_future_marker{},
+ header);
+ });
+}
+// Collects (address, extent_info) pairs for the records found in the segment
+// that `cursor` points into.
+//
+// The segment header is read first to obtain the segment nonce; then
+// scan_valid_records() is run with `bytes_to_read` as its budget and a handler
+// that decodes the extent infos out of each record's metadata buffer.
+//
+// `cursor` is captured by reference by the continuations below, so the caller
+// has to keep it alive until the returned future resolves.
+Scanner::scan_extents_ret Scanner::scan_extents(
+ scan_extents_cursor &cursor,
+ extent_len_t bytes_to_read)
+{
+ // `ret` owns the result list; `extents` is a non-owning pointer to it used by
+ // the record handler. `ret` is moved into the final continuation, which keeps
+ // the list alive for the whole chain.
+ auto ret = std::make_unique<scan_extents_ret_bare>();
+ auto* extents = ret.get();
+ return read_segment_header(cursor.get_offset().segment
+ ).handle_error(
+ scan_extents_ertr::pass_further{},
+ crimson::ct_error::assert_all{
+ "Invalid error in Scanner::scan_extents"
+ }
+ ).safe_then([bytes_to_read, extents, &cursor, this](auto segment_header) {
+ auto segment_nonce = segment_header.segment_nonce;
+ return seastar::do_with(
+ found_record_handler_t(
+ [extents, this](
+ paddr_t base,
+ const record_header_t &header,
+ const bufferlist &mdbuf) mutable {
+
+ auto infos = try_decode_extent_infos(
+ header,
+ mdbuf);
+ if (!infos) {
+ // This should be impossible, we did check the crc on the mdbuf
+ // NOTE(review): if assert() is compiled out (NDEBUG), the loop below
+ // dereferences an empty optional -- confirm the build configuration.
+ logger().error(
+ "Scanner::scan_extents unable to decode extents for record {}",
+ base);
+ assert(infos);
+ }
+
+ // Extent addresses start right after the record's metadata and advance
+ // by each extent's length in turn.
+ paddr_t extent_offset = base.add_offset(header.mdlength);
+ for (const auto &i : *infos) {
+ extents->emplace_back(extent_offset, i);
+ extent_offset.offset += i.len;
+ }
+ return scan_extents_ertr::now();
+ }),
+ [=, &cursor](auto &dhandler) {
+ // The value produced by scan_valid_records is discarded here.
+ return scan_valid_records(
+ cursor,
+ segment_nonce,
+ bytes_to_read,
+ dhandler).discard_result();
+ });
+ }).safe_then([ret=std::move(ret)] {
+ return std::move(*ret);
+ });
+}
+
+/**
+ * scan_valid_records
+ *
+ * Walks the records of the segment addressed by cursor, invoking handler on
+ * each record that is considered valid, until either the cursor is complete
+ * or at least budget bytes (metadata + data length) have been handed to the
+ * handler.
+ *
+ * While valid record headers keep being found, a record is handed to the
+ * handler once its offset is not past cursor.last_committed. After the last
+ * valid header has been found, each remaining pending record is handed over
+ * only if its data checksum validates; the first failure drops all remaining
+ * pending records.
+ *
+ * cursor and handler are captured by reference by the continuations, so the
+ * caller must keep both alive until the returned future resolves.
+ *
+ * @param cursor  [in, out] scan position and pending records, updated in place
+ * @param nonce   [in] nonce record headers must carry to be considered valid
+ * @param budget  [in] number of bytes after which scanning stops
+ * @param handler [in] callback invoked for every accepted record
+ * @return number of bytes (mdlength + dlength) handed to handler
+ */
+Scanner::scan_valid_records_ret Scanner::scan_valid_records(
+  scan_valid_records_cursor &cursor,
+  segment_nonce_t nonce,
+  size_t budget,
+  found_record_handler_t &handler)
+{
+  if (cursor.offset.offset == 0) {
+    // Offset 0 is where read_segment_header() reads the segment header from,
+    // so scanning starts one block into the segment.
+    cursor.offset.offset = segment_manager.get_block_size();
+  }
+  // The counter lives on the heap and is owned by retref, which is moved into
+  // the final continuation. budget_used must therefore be a *reference* to
+  // that counter: a by-value copy would be a stack local that the
+  // continuations (which capture it by reference) outlive, and the value
+  // returned through retref would never be updated.
+  auto retref = std::make_unique<size_t>(0);
+  auto &budget_used = *retref;
+  return crimson::repeat(
+    [=, &cursor, &budget_used, &handler]() mutable
+    -> scan_valid_records_ertr::future<seastar::stop_iteration> {
+      return [=, &handler, &cursor, &budget_used] {
+        if (!cursor.last_valid_header_found) {
+          return read_validate_record_metadata(cursor.offset, nonce
+          ).safe_then([=, &cursor](auto md) {
+            logger().debug(
+              "Scanner::scan_valid_records: read complete {}",
+              cursor.offset);
+            if (!md) {
+              logger().debug(
+                "Scanner::scan_valid_records: found invalid header at {}, presumably at end",
+                cursor.offset);
+              cursor.last_valid_header_found = true;
+              return scan_valid_records_ertr::now();
+            } else {
+              logger().debug(
+                "Scanner::scan_valid_records: valid record read at {}",
+                cursor.offset);
+              cursor.last_committed = paddr_t{
+                cursor.offset.segment,
+                md->first.committed_to};
+              cursor.pending_records.emplace_back(
+                cursor.offset,
+                md->first,
+                md->second);
+              // Advance past this record's metadata and data.
+              cursor.offset.offset +=
+                md->first.dlength + md->first.mdlength;
+              return scan_valid_records_ertr::now();
+            }
+          }).safe_then([=, &cursor, &budget_used, &handler] {
+            // Drain every pending record that is not past last_committed.
+            return crimson::repeat(
+              [=, &budget_used, &cursor, &handler] {
+                logger().debug(
+                  "Scanner::scan_valid_records: valid record read, processing queue");
+                if (cursor.pending_records.empty()) {
+                  /* This is only possible if the segment is empty.
+                   * A record's last_committed must be prior to its own
+                   * location since it itself cannot yet have been committed
+                   * at its own time of submission.  Thus, the most recently
+                   * read record must always fall after cursor.last_committed */
+                  return scan_valid_records_ertr::make_ready_future<
+                    seastar::stop_iteration>(seastar::stop_iteration::yes);
+                }
+                auto &next = cursor.pending_records.front();
+                if (next.offset > cursor.last_committed) {
+                  return scan_valid_records_ertr::make_ready_future<
+                    seastar::stop_iteration>(seastar::stop_iteration::yes);
+                }
+                budget_used +=
+                  next.header.dlength + next.header.mdlength;
+                return handler(
+                  next.offset,
+                  next.header,
+                  next.mdbuffer
+                ).safe_then([&cursor] {
+                  cursor.pending_records.pop_front();
+                  return scan_valid_records_ertr::make_ready_future<
+                    seastar::stop_iteration>(seastar::stop_iteration::no);
+                });
+              });
+          });
+        } else {
+          // No more headers to read: the remaining pending records are only
+          // accepted if their data checksum validates.
+          assert(!cursor.pending_records.empty());
+          auto &next = cursor.pending_records.front();
+          return read_validate_data(next.offset, next.header
+          ).safe_then([=, &budget_used, &next, &cursor, &handler](auto valid) {
+            if (!valid) {
+              // First invalid record: drop it and everything queued after it.
+              cursor.pending_records.clear();
+              return scan_valid_records_ertr::now();
+            }
+            budget_used +=
+              next.header.dlength + next.header.mdlength;
+            return handler(
+              next.offset,
+              next.header,
+              next.mdbuffer
+            ).safe_then([&cursor] {
+              cursor.pending_records.pop_front();
+              return scan_valid_records_ertr::now();
+            });
+          });
+        }
+      }().safe_then([=, &budget_used, &cursor] {
+        if (cursor.is_complete() || budget_used >= budget) {
+          return seastar::stop_iteration::yes;
+        } else {
+          return seastar::stop_iteration::no;
+        }
+      });
+    }).safe_then([retref=std::move(retref)]() mutable -> scan_valid_records_ret {
+      return scan_valid_records_ret(
+        scan_valid_records_ertr::ready_future_marker{},
+        std::move(*retref));
+    });
+}
+
+// Reads and validates the metadata of the record starting at `start`.
+//
+// Resolves to std::nullopt when no valid record is found there: a full block
+// does not fit before the end of the segment, the header cannot be decoded,
+// the header's nonce differs from `nonce`, or validate_metadata() rejects the
+// buffer. Otherwise resolves to the decoded header paired with the metadata
+// buffer. Fails with input_output_error when the header's mdlength would run
+// past the end of the segment.
+Scanner::read_validate_record_metadata_ret
+Scanner::read_validate_record_metadata(
+ paddr_t start,
+ segment_nonce_t nonce)
+{
+ auto block_size = segment_manager.get_block_size();
+ // Not even one block left in the segment: nothing to read here.
+ if (start.offset + block_size > (int64_t)segment_manager.get_segment_size()) {
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::nullopt);
+ }
+ // Read a single block first; it holds the record header.
+ return segment_manager.read(start, block_size
+ ).safe_then(
+ [=](bufferptr bptr) mutable
+ -> read_validate_record_metadata_ret {
+ logger().debug("read_validate_record_metadata: reading {}", start);
+ // Shadows the block_size captured from the enclosing scope.
+ auto block_size = segment_manager.get_block_size();
+ bufferlist bl;
+ bl.append(bptr);
+ auto bp = bl.cbegin();
+ record_header_t header;
+ try {
+ decode(header, bp);
+ } catch (ceph::buffer::error &e) {
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::nullopt);
+ }
+ if (header.segment_nonce != nonce) {
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::nullopt);
+ }
+ // Metadata longer than the block already read: fetch the remainder and
+ // append it to the buffer.
+ if (header.mdlength > (extent_len_t)block_size) {
+ if (start.offset + header.mdlength >
+ (int64_t)segment_manager.get_segment_size()) {
+ return crimson::ct_error::input_output_error::make();
+ }
+ return segment_manager.read(
+ {start.segment, start.offset + (segment_off_t)block_size},
+ header.mdlength - block_size).safe_then(
+ [header=std::move(header), bl=std::move(bl)](
+ auto &&bptail) mutable {
+ bl.push_back(bptail);
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::make_pair(std::move(header), std::move(bl)));
+ });
+ } else {
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::make_pair(std::move(header), std::move(bl))
+ );
+ }
+ }).safe_then([=](auto p) {
+ // Final gate: the metadata checksum has to match.
+ if (p && validate_metadata(p->second)) {
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::move(*p)
+ );
+ } else {
+ return read_validate_record_metadata_ret(
+ read_validate_record_metadata_ertr::ready_future_marker{},
+ std::nullopt);
+ }
+ });
+}
+
+/// Decodes the header.extents extent infos stored in a record's metadata
+/// buffer; yields std::nullopt as soon as one of them fails to decode.
+std::optional<std::vector<extent_info_t>>
+Scanner::try_decode_extent_infos(
+  record_header_t header,
+  const bufferlist &bl)
+{
+  // The extent infos sit behind the encoded record header and the crc.
+  auto pos = bl.cbegin();
+  pos += ceph::encoded_sizeof_bounded<record_header_t>();
+  pos += sizeof(checksum_t);
+
+  logger().debug("{}: decoding {} extents", __func__, header.extents);
+
+  std::vector<extent_info_t> decoded(header.extents);
+  try {
+    for (auto &info : decoded) {
+      decode(info, pos);
+    }
+  } catch (ceph::buffer::error &) {
+    return std::nullopt;
+  }
+  return decoded;
+}
+
+/// Reads header.dlength bytes starting header.mdlength past record_base and
+/// resolves to true iff their crc32c (seeded with -1) equals header.data_crc.
+/// header is captured by reference and must stay valid until the returned
+/// future resolves.
+Scanner::read_validate_data_ret
+Scanner::read_validate_data(
+  paddr_t record_base,
+  const record_header_t &header)
+{
+  const auto data_start = record_base.add_offset(header.mdlength);
+  return segment_manager.read(
+    data_start,
+    header.dlength
+  ).safe_then([&header](auto data_ptr) {
+    bufferlist data;
+    data.append(data_ptr);
+    const auto computed_crc = data.crc32c(-1);
+    return computed_crc == header.data_crc;
+  });
+}
+
+/**
+ * validate_metadata
+ *
+ * Recomputes the crc32c of the record metadata held in bl: first over the
+ * encoded record_header_t (seeded with -1), then, after decoding the stored
+ * checksum (which is thereby excluded), over all remaining bytes.
+ *
+ * @return true iff the recomputed checksum equals the stored one
+ */
+bool Scanner::validate_metadata(const bufferlist &bl)
+{
+  auto cursor = bl.cbegin();
+  const auto header_len = ceph::encoded_sizeof_bounded<record_header_t>();
+  const auto header_crc = cursor.crc32c(header_len, -1);
+
+  // The stored checksum sits right after the header.
+  ceph_le32 stored_crc_le;
+  decode(stored_crc_le, cursor);
+  const uint32_t stored_crc = stored_crc_le;
+
+  const auto computed_crc = cursor.crc32c(cursor.get_remaining(), header_crc);
+  return computed_crc == stored_crc;
+}
+
+} // namespace crimson::os::seastore
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#pragma once
+
+#include "crimson/common/errorator.h"
+#include "crimson/os/seastore/seastore_types.h"
+#include "crimson/os/seastore/segment_manager.h"
+
+namespace crimson::os::seastore {
+
+class SegmentCleaner;
+
+/**
+ * Scanner
+ *
+ * Reads segment headers and scans the records stored in segments through a
+ * SegmentManager.
+ */
+class Scanner {
+public:
+  using read_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error,
+    crimson::ct_error::invarg,
+    crimson::ct_error::enoent,
+    crimson::ct_error::erange>;
+
+  /// segment_manager is held by reference and must outlive the Scanner
+  explicit Scanner(SegmentManager& segment_manager)
+    : segment_manager(segment_manager) {}
+
+  /// read the header of the given segment
+  using read_segment_header_ertr = crimson::errorator<
+    crimson::ct_error::enoent,
+    crimson::ct_error::enodata,
+    crimson::ct_error::input_output_error
+    >;
+  using read_segment_header_ret = read_segment_header_ertr::future<
+    segment_header_t>;
+  read_segment_header_ret read_segment_header(segment_id_t segment);
+
+  /**
+   * scan_extents
+   *
+   * Scans records beginning at the cursor position until the first record
+   * boundary after that position + bytes_to_read.
+   *
+   * Returns a list of (paddr_t, extent_info_t) pairs.
+   * cursor.is_complete() will be true when no further extents exist in segment.
+   */
+  using scan_extents_cursor = scan_valid_records_cursor;
+  using scan_extents_ertr = read_ertr::extend<crimson::ct_error::enodata>;
+  using scan_extents_ret_bare = std::list<std::pair<paddr_t, extent_info_t>>;
+  using scan_extents_ret = scan_extents_ertr::future<scan_extents_ret_bare>;
+  scan_extents_ret scan_extents(
+    scan_extents_cursor &cursor,
+    extent_len_t bytes_to_read
+  );
+
+  /**
+   * scan_valid_records
+   *
+   * Invokes handler for records found from cursor onwards; see the
+   * per-parameter notes below.
+   */
+  using scan_valid_records_ertr = read_ertr::extend<crimson::ct_error::enodata>;
+  using scan_valid_records_ret = scan_valid_records_ertr::future<
+    size_t>;
+  using found_record_handler_t = std::function<
+    scan_valid_records_ertr::future<>(
+      paddr_t record_block_base,
+      // callee may assume header and bl will remain valid until
+      // returned future resolves
+      const record_header_t &header,
+      const bufferlist &bl)>;
+  scan_valid_records_ret scan_valid_records(
+    scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call
+    segment_nonce_t nonce,             ///< [in] nonce for segment
+    size_t budget,                     ///< [in] max budget to use
+    found_record_handler_t &handler    ///< [in] handler for records
+  ); ///< @return used budget
+
+private:
+  SegmentManager& segment_manager;
+
+  /// read record metadata for record starting at start; resolves to
+  /// std::nullopt when the header cannot be decoded, the nonce does not
+  /// match, or the metadata checksum does not validate
+  using read_validate_record_metadata_ertr = read_ertr;
+  using read_validate_record_metadata_ret =
+    read_validate_record_metadata_ertr::future<
+      std::optional<std::pair<record_header_t, bufferlist>>
+    >;
+  read_validate_record_metadata_ret read_validate_record_metadata(
+    paddr_t start,
+    segment_nonce_t nonce);
+
+  /// attempts to decode extent infos from bl, return nullopt if unsuccessful
+  std::optional<std::vector<extent_info_t>> try_decode_extent_infos(
+    record_header_t header,
+    const bufferlist &bl);
+
+  /// read the record's data and validate it against header.data_crc
+  using read_validate_data_ertr = read_ertr;
+  using read_validate_data_ret = read_validate_data_ertr::future<bool>;
+  read_validate_data_ret read_validate_data(
+    paddr_t record_base,
+    const record_header_t &header  ///< caller must ensure lifetime through
+                                   ///  future resolution
+  );
+
+  /// validate embedded metadata checksum
+  static bool validate_metadata(const bufferlist &bl);
+
+};
+
+using ScannerRef = std::unique_ptr<Scanner>;
+
+} // namespace crimson::os::seastore
segment_manager::block::BlockSegmentManager
>(device + "/block");
+ auto scanner = std::make_unique<Scanner>(*sm);
+ auto& scanner_ref = *scanner.get();
auto segment_cleaner = std::make_unique<SegmentCleaner>(
SegmentCleaner::config_t::get_default(),
+ std::move(scanner),
false /* detailed */);
- auto journal = std::make_unique<Journal>(*sm);
+ auto journal = std::make_unique<Journal>(*sm, scanner_ref);
auto cache = std::make_unique<Cache>(*sm);
auto lba_manager = lba_manager::create_lba_manager(*sm, *cache);
segment_off_t committed_to,
segment_nonce_t current_segment_nonce = 0);
+/**
+ * scan_valid_records_cursor
+ *
+ * State for scanning a segment for its end incrementally: the current
+ * position plus the records found so far that are still pending.
+ */
+struct scan_valid_records_cursor {
+  bool last_valid_header_found = false;
+  paddr_t offset;
+  paddr_t last_committed;
+
+  /// a found record: its address, header and metadata buffer
+  struct found_record_t {
+    paddr_t offset;
+    record_header_t header;
+    bufferlist mdbuffer;
+
+    found_record_t(
+      paddr_t offset,
+      const record_header_t &header,
+      const bufferlist &mdbuffer)
+      : offset(offset),
+        header(header),
+        mdbuffer(mdbuffer) {}
+  };
+  std::deque<found_record_t> pending_records;
+
+  /// start a scan at offset
+  scan_valid_records_cursor(paddr_t offset)
+    : offset(offset) {}
+
+  /// current scan position
+  paddr_t get_offset() const {
+    return offset;
+  }
+
+  /// true once the last valid header was found and no records are pending
+  bool is_complete() const {
+    if (!last_valid_header_found) {
+      return false;
+    }
+    return pending_records.empty();
+  }
+};
+
}
WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::seastore_meta_t)
segment_usage[id].dump_usage(block_size);
}
-SegmentCleaner::SegmentCleaner(config_t config, bool detailed)
+SegmentCleaner::SegmentCleaner(
+ config_t config,
+ ScannerRef&& scr, ///< scanner handed over by the caller
+ bool detailed)
 : detailed(detailed),
 config(config),
+ scanner(std::move(scr)), // moved into the scanner member
 gc_process(*this)
 {
 register_metrics();
 }
next.offset = 0;
scan_cursor =
- std::make_unique<ExtentCallbackInterface::scan_extents_cursor>(
+ std::make_unique<Scanner::scan_extents_cursor>(
next);
logger().debug(
"SegmentCleaner::do_gc: starting gc on segment {}",
ceph_assert(!scan_cursor->is_complete());
}
- return ecb->scan_extents(
+ return scanner->scan_extents(
*scan_cursor,
config.reclaim_bytes_stride
).safe_then([this](auto &&_extents) {
});
}
+/**
+ * init_segments
+ *
+ * Reads the header of every segment id in [0, num_segments). Segments whose
+ * header is marked out_of_line are passed to init_mark_segment_closed();
+ * all others are collected as journal segments. A header read failing with
+ * enoent or enodata skips that segment; input_output_error is passed on.
+ *
+ * @return (segment id, header) pairs of the journal segments
+ */
+SegmentCleaner::init_segments_ret SegmentCleaner::init_segments() {
+  return seastar::do_with(
+    std::vector<std::pair<segment_id_t, segment_header_t>>(),
+    [this](auto& segments) {
+    return crimson::do_for_each(
+      boost::make_counting_iterator(segment_id_t{0}),
+      boost::make_counting_iterator(segment_id_t{num_segments}),
+      [this, &segments](auto segment_id) {
+      return scanner->read_segment_header(segment_id)
+      .safe_then([&segments, segment_id, this](auto header) {
+        if (header.out_of_line) {
+          logger().debug("SegmentCleaner::init_segments: out-of-line segment {}", segment_id);
+          init_mark_segment_closed(
+            segment_id,
+            header.journal_segment_seq);
+        } else {
+          logger().debug("SegmentCleaner::init_segments: journal segment {}", segment_id);
+          // construct the pair in place
+          segments.emplace_back(segment_id, std::move(header));
+        }
+        return seastar::now();
+      }).handle_error(
+        // segments without a readable header are skipped
+        crimson::ct_error::enoent::handle([](auto) {
+          return init_segments_ertr::now();
+        }),
+        crimson::ct_error::enodata::handle([](auto) {
+          return init_segments_ertr::now();
+        }),
+        crimson::ct_error::input_output_error::pass_further{}
+      );
+    }).safe_then([&segments] {
+      return seastar::make_ready_future<
+        std::vector<std::pair<segment_id_t, segment_header_t>>>(
+          std::move(segments));
+    });
+  });
+}
+
}
return live_bytes_by_segment[segment];
}
public:
- SpaceTrackerSimple(size_t num_segments)
+ SpaceTrackerSimple(segment_id_t num_segments)
: live_bytes_by_segment(num_segments, 0) {}
int64_t allocate(
std::vector<SegmentMap> segment_usage;
public:
- SpaceTrackerDetailed(size_t num_segments, size_t segment_size, size_t block_size)
+ SpaceTrackerDetailed(segment_id_t num_segments, size_t segment_size, size_t block_size)
: block_size(block_size),
segment_size(segment_size),
segment_usage(num_segments, segment_size / block_size) {}
laddr_t laddr,
segment_off_t len) = 0;
- /**
- * scan_extents
- *
- * Interface shim for Journal::scan_extents
- */
- using scan_extents_cursor = Journal::scan_valid_records_cursor;
- using scan_extents_ertr = Journal::scan_extents_ertr;
- using scan_extents_ret = Journal::scan_extents_ret;
- virtual scan_extents_ret scan_extents(
- scan_extents_cursor &cursor,
- extent_len_t bytes_to_read) = 0;
-
/**
* release_segment
*
const bool detailed;
const config_t config;
- size_t num_segments = 0;
+ segment_id_t num_segments = 0;
size_t segment_size = 0;
size_t block_size = 0;
+ ScannerRef scanner;
+
SpaceTrackerIRef space_tracker;
std::vector<segment_info_t> segments;
size_t empty_segments;
std::optional<seastar::promise<>> blocked_io_wake;
public:
- SegmentCleaner(config_t config, bool detailed = false);
+ SegmentCleaner(
+ config_t config,
+ ScannerRef&& scanner,
+ bool detailed = false);
void mount(SegmentManager &sm) {
init_complete = false;
empty_segments = num_segments;
}
+ using init_segments_ertr = crimson::errorator<
+ crimson::ct_error::input_output_error>;
+ using init_segments_ret_bare =
+ std::vector<std::pair<segment_id_t, segment_header_t>>;
+ using init_segments_ret = init_segments_ertr::future<init_segments_ret_bare>;
+ init_segments_ret init_segments();
+
get_segment_ret get_segment() final;
void close_segment(segment_id_t segment) final;
// GC status helpers
std::unique_ptr<
- ExtentCallbackInterface::scan_extents_cursor
+ Scanner::scan_extents_cursor
> scan_cursor;
/**
} gc_process;
using gc_ertr = work_ertr::extend_ertr<
- ExtentCallbackInterface::scan_extents_ertr
+ Scanner::scan_extents_ertr
>;
gc_cycle_ret do_gc_cycle();
LOG_PREFIX(TransactionManager::mount);
cache->init();
segment_cleaner->mount(segment_manager);
- return journal->replay([this](auto seq, auto paddr, const auto &e) {
- return cache->replay_delta(seq, paddr, e);
+ return segment_cleaner->init_segments().safe_then(
+ [this](auto&& segments) {
+ return journal->replay(
+ std::move(segments),
+ [this](auto seq, auto paddr, const auto &e) {
+ return cache->replay_delta(seq, paddr, e);
+ });
}).safe_then([this] {
return journal->open_for_write();
}).safe_then([this, FNAME](auto addr) {
laddr_t laddr,
segment_off_t len) final;
- using scan_extents_cursor =
- SegmentCleaner::ExtentCallbackInterface::scan_extents_cursor;
- using scan_extents_ertr =
- SegmentCleaner::ExtentCallbackInterface::scan_extents_ertr;
- using scan_extents_ret =
- SegmentCleaner::ExtentCallbackInterface::scan_extents_ret;
- scan_extents_ret scan_extents(
- scan_extents_cursor &cursor,
- extent_len_t bytes_to_read) final {
- return journal->scan_extents(cursor, bytes_to_read);
- }
-
using release_segment_ret =
SegmentCleaner::ExtentCallbackInterface::release_segment_ret;
release_segment_ret release_segment(
struct btree_lba_manager_test :
public seastar_test_suite_t, SegmentProvider {
segment_manager::EphemeralSegmentManagerRef segment_manager;
+ ScannerRef scanner;
Journal journal;
Cache cache;
BtreeLBAManagerRef lba_manager;
btree_lba_manager_test()
: segment_manager(segment_manager::create_test_ephemeral()),
- journal(*segment_manager),
+ scanner(new Scanner(*segment_manager)),
+ journal(*segment_manager, *scanner),
cache(*segment_manager),
lba_manager(new BtreeLBAManager(*segment_manager, cache)),
block_size(segment_manager->get_block_size())
const segment_off_t block_size;
+ ScannerRef scanner;
+
journal_test_t()
: segment_manager(segment_manager::create_test_ephemeral()),
- block_size(segment_manager->get_block_size())
- {
- }
+ block_size(segment_manager->get_block_size()),
+ scanner(std::make_unique<Scanner>(*segment_manager))
+ {}
segment_id_t next = 0;
get_segment_ret get_segment() final {
void update_journal_tail_committed(journal_seq_t paddr) final {}
seastar::future<> set_up_fut() final {
- journal.reset(new Journal(*segment_manager));
+ journal.reset(new Journal(*segment_manager, *scanner));
journal->set_segment_provider(this);
journal->set_write_pipeline(&pipeline);
return segment_manager->init(
auto replay(T &&f) {
return journal->close(
).safe_then([this, f=std::move(f)]() mutable {
- journal.reset(new Journal(*segment_manager));
+ journal.reset(new Journal(*segment_manager, *scanner));
journal->set_segment_provider(this);
journal->set_write_pipeline(&pipeline);
- return journal->replay(std::forward<T>(std::move(f)));
- }).safe_then([this] {
- return journal->open_for_write();
+ return seastar::do_with(
+ std::vector<std::pair<segment_id_t, segment_header_t>>(),
+ [this](auto& segments) {
+ return crimson::do_for_each(
+ boost::make_counting_iterator(segment_id_t{0}),
+ boost::make_counting_iterator(segment_manager->get_num_segments()),
+ [this, &segments](auto segment_id) {
+ return scanner->read_segment_header(segment_id)
+ .safe_then([&segments, segment_id](auto header) {
+ if (!header.out_of_line) {
+ segments.emplace_back(std::make_pair(segment_id, std::move(header)));
+ }
+ return seastar::now();
+ }).handle_error(
+ crimson::ct_error::enoent::handle([](auto) {
+ return SegmentCleaner::init_segments_ertr::now();
+ }),
+ crimson::ct_error::enodata::handle([](auto) {
+ return SegmentCleaner::init_segments_ertr::now();
+ }),
+ crimson::ct_error::input_output_error::pass_further{}
+ );
+ }).safe_then([&segments] {
+ return seastar::make_ready_future<
+ std::vector<std::pair<segment_id_t, segment_header_t>>>(
+ std::move(segments));
+ });
+ }).safe_then([this, f=std::move(f)](auto&& segments) mutable {
+ return journal->replay(
+ std::move(segments),
+ std::forward<T>(std::move(f)));
+ }).safe_then([this] {
+ return journal->open_for_write();
+ });
});
}
auto get_transaction_manager(
SegmentManager &segment_manager) {
+ auto scanner = std::make_unique<Scanner>(segment_manager);
+ auto& scanner_ref = *scanner.get();
auto segment_cleaner = std::make_unique<SegmentCleaner>(
SegmentCleaner::config_t::get_default(),
+ std::move(scanner),
true);
- auto journal = std::make_unique<Journal>(segment_manager);
+ auto journal = std::make_unique<Journal>(segment_manager, scanner_ref);
auto cache = std::make_unique<Cache>(segment_manager);
auto lba_manager = lba_manager::create_lba_manager(segment_manager, *cache);