register_metrics();
INFO("{} segments", segments.get_num_segments());
- return seastar::do_with(
- std::vector<std::pair<segment_id_t, segment_header_t>>(),
- [this, FNAME](auto& segment_set) {
- return crimson::do_for_each(
- segments.begin(),
- segments.end(),
- [this, FNAME, &segment_set](auto& it) {
- auto segment_id = it.first;
- return sm_group->read_segment_header(
- segment_id
- ).safe_then([segment_id, this, FNAME, &segment_set](auto header) {
- INFO("segment_id={} -- {}", segment_id, header);
- auto s_type = header.get_type();
- if (s_type == segment_type_t::NULL_SEG) {
- ERROR("got null segment, segment_id={} -- {}", segment_id, header);
- ceph_abort();
- }
- return sm_group->read_segment_tail(
- segment_id
- ).safe_then([this, segment_id, &segment_set, header](auto tail)
- -> scan_extents_ertr::future<> {
- if (tail.segment_nonce != header.segment_nonce) {
- return scan_nonfull_segment(header, segment_set, segment_id);
- }
- time_point last_modified(duration(tail.last_modified));
- time_point last_rewritten(duration(tail.last_rewritten));
- segments.update_last_modified_rewritten(
- segment_id, last_modified, last_rewritten);
- if (tail.get_type() == segment_type_t::JOURNAL) {
- update_journal_tail_committed(tail.journal_tail);
- update_journal_tail_target(
- tail.journal_tail,
- tail.alloc_replay_from);
- }
- init_mark_segment_closed(
- segment_id,
- header.segment_seq,
- header.type,
- header.category,
- header.generation);
- return seastar::now();
- }).handle_error(
- crimson::ct_error::enodata::handle(
- [this, header, segment_id, &segment_set](auto) {
- return scan_nonfull_segment(header, segment_set, segment_id);
- }),
- crimson::ct_error::pass_further_all{}
- );
- }).handle_error(
- crimson::ct_error::enoent::handle([](auto) {
- return mount_ertr::now();
- }),
- crimson::ct_error::enodata::handle([](auto) {
- return mount_ertr::now();
- }),
- crimson::ct_error::input_output_error::pass_further{},
- crimson::ct_error::assert_all{"unexpected error"}
- );
- });
+ return crimson::do_for_each(
+ segments.begin(),
+ segments.end(),
+ [this, FNAME](auto& it)
+ {
+ auto segment_id = it.first;
+ return sm_group->read_segment_header(
+ segment_id
+ ).safe_then([segment_id, this, FNAME](auto header) {
+ INFO("segment_id={} -- {}", segment_id, header);
+ auto s_type = header.get_type();
+ if (s_type == segment_type_t::NULL_SEG) {
+ ERROR("got null segment, segment_id={} -- {}", segment_id, header);
+ ceph_abort();
+ }
+ return sm_group->read_segment_tail(
+ segment_id
+ ).safe_then([this, segment_id, header](auto tail)
+ -> scan_extents_ertr::future<> {
+ if (tail.segment_nonce != header.segment_nonce) {
+ return scan_no_tail_segment(header, segment_id);
+ }
+ time_point last_modified(duration(tail.last_modified));
+ time_point last_rewritten(duration(tail.last_rewritten));
+ segments.update_last_modified_rewritten(
+ segment_id, last_modified, last_rewritten);
+ if (tail.get_type() == segment_type_t::JOURNAL) {
+ update_journal_tail_committed(tail.journal_tail);
+ update_journal_tail_target(
+ tail.journal_tail,
+ tail.alloc_replay_from);
+ }
+ init_mark_segment_closed(
+ segment_id,
+ header.segment_seq,
+ header.type,
+ header.category,
+ header.generation);
+ return seastar::now();
+ }).handle_error(
+ crimson::ct_error::enodata::handle(
+ [this, header, segment_id](auto) {
+ return scan_no_tail_segment(header, segment_id);
+ }),
+ crimson::ct_error::pass_further_all{}
+ );
+ }).handle_error(
+ crimson::ct_error::enoent::handle([](auto) {
+ return mount_ertr::now();
+ }),
+ crimson::ct_error::enodata::handle([](auto) {
+ return mount_ertr::now();
+ }),
+ crimson::ct_error::input_output_error::pass_further{},
+ crimson::ct_error::assert_all{"unexpected error"}
+ );
});
}
-AsyncCleaner::scan_extents_ret AsyncCleaner::scan_nonfull_segment(
+AsyncCleaner::scan_extents_ret AsyncCleaner::scan_no_tail_segment(
const segment_header_t& header,
- scan_extents_ret_bare& segment_set,
segment_id_t segment_id)
{
return seastar::do_with(
scan_valid_records_cursor({
segments[segment_id].seq,
- paddr_t::make_seg_paddr(segment_id, 0)}),
- [this, segment_id, segment_header=header](auto& cursor) {
- return seastar::do_with(
- SegmentManagerGroup::found_record_handler_t(
- [this, segment_id, segment_header](
- record_locator_t locator,
- const record_group_header_t& header,
- const bufferlist& mdbuf
- ) mutable -> SegmentManagerGroup::scan_valid_records_ertr::future<> {
- LOG_PREFIX(AsyncCleaner::scan_nonfull_segment);
- if (segment_header.get_type() == segment_type_t::OOL) {
- DEBUG("out-of-line segment {}, decodeing {} records",
- segment_id,
- header.records);
- auto maybe_headers = try_decode_record_headers(header, mdbuf);
- if (!maybe_headers) {
- ERROR("unable to decode record headers for record group {}",
- locator.record_block_base);
- return crimson::ct_error::input_output_error::make();
- }
-
- for (auto& header : *maybe_headers) {
- mod_time_point_t ctime = header.commit_time;
- auto commit_type = header.commit_type;
- if (!ctime) {
- ERROR("AsyncCleaner::scan_nonfull_segment: extent {} 0 commit_time",
- ctime);
- ceph_abort("0 commit_time");
- }
- time_point commit_time{duration(ctime)};
- assert(commit_type == record_commit_type_t::MODIFY
- || commit_type == record_commit_type_t::REWRITE);
- if (commit_type == record_commit_type_t::MODIFY) {
- segments.update_last_modified_rewritten(segment_id, commit_time, {});
- }
- if (commit_type == record_commit_type_t::REWRITE) {
- segments.update_last_modified_rewritten(segment_id, {}, commit_time);
- }
- }
- } else {
- DEBUG("inline segment {}, decodeing {} records",
- segment_id,
- header.records);
- auto maybe_record_deltas_list = try_decode_deltas(
- header, mdbuf, locator.record_block_base);
- if (!maybe_record_deltas_list) {
- ERROR("unable to decode deltas for record {} at {}",
- header, locator);
- return crimson::ct_error::input_output_error::make();
- }
- for (auto &record_deltas : *maybe_record_deltas_list) {
- for (auto &[ctime, delta] : record_deltas.deltas) {
- if (delta.type == extent_types_t::ALLOC_TAIL) {
- journal_seq_t seq;
- decode(seq, delta.bl);
- update_alloc_info_replay_from(seq);
- }
- }
- }
- }
- return seastar::now();
- }),
- [&cursor, segment_header, this](auto& handler) {
- return sm_group->scan_valid_records(
- cursor,
- segment_header.segment_nonce,
- segments.get_segment_size(),
- handler);
+ paddr_t::make_seg_paddr(segment_id, 0)
+ }),
+ SegmentManagerGroup::found_record_handler_t(
+ [this, segment_id, segment_header=header](
+ record_locator_t locator,
+ const record_group_header_t& header,
+ const bufferlist& mdbuf
+ ) mutable -> SegmentManagerGroup::scan_valid_records_ertr::future<>
+ {
+ LOG_PREFIX(AsyncCleaner::scan_no_tail_segment);
+ if (segment_header.get_type() == segment_type_t::OOL) {
+ DEBUG("out-of-line segment {}, decodeing {} records",
+ segment_id,
+ header.records);
+ auto maybe_headers = try_decode_record_headers(header, mdbuf);
+ if (!maybe_headers) {
+ ERROR("unable to decode record headers for record group {}",
+ locator.record_block_base);
+ return crimson::ct_error::input_output_error::make();
+ }
+
+ for (auto& header : *maybe_headers) {
+ mod_time_point_t ctime = header.commit_time;
+ auto commit_type = header.commit_type;
+ if (!ctime) {
+ ERROR("extent {} 0 commit_time",
+ ctime);
+ ceph_abort("0 commit_time");
+ }
+ time_point commit_time{duration(ctime)};
+ assert(commit_type == record_commit_type_t::MODIFY
+ || commit_type == record_commit_type_t::REWRITE);
+ if (commit_type == record_commit_type_t::MODIFY) {
+ segments.update_last_modified_rewritten(segment_id, commit_time, {});
+ }
+ if (commit_type == record_commit_type_t::REWRITE) {
+ segments.update_last_modified_rewritten(segment_id, {}, commit_time);
+ }
+ }
+ } else {
+ DEBUG("inline segment {}, decodeing {} records",
+ segment_id,
+ header.records);
+ auto maybe_record_deltas_list = try_decode_deltas(
+ header, mdbuf, locator.record_block_base);
+ if (!maybe_record_deltas_list) {
+ ERROR("unable to decode deltas for record {} at {}",
+ header, locator);
+ return crimson::ct_error::input_output_error::make();
+ }
+ for (auto &record_deltas : *maybe_record_deltas_list) {
+ for (auto &[ctime, delta] : record_deltas.deltas) {
+ if (delta.type == extent_types_t::ALLOC_TAIL) {
+ journal_seq_t seq;
+ decode(seq, delta.bl);
+ update_alloc_info_replay_from(seq);
+ }
+ }
+ }
}
- );
- }).safe_then([this, segment_id, header](auto) {
+ return seastar::now();
+ }),
+ [this, header](auto &cursor, auto &handler)
+ {
+ return sm_group->scan_valid_records(
+ cursor,
+ header.segment_nonce,
+ segments.get_segment_size(),
+ handler).discard_result();
+ }).safe_then([this, segment_id, header] {
init_mark_segment_closed(
segment_id,
header.segment_seq,
header.type,
header.category,
header.generation);
- return seastar::now();
});
}
});
}
-SegmentManagerGroup::scan_extents_ret
-SegmentManagerGroup::scan_extents(
- scan_extents_cursor &cursor,
- extent_len_t bytes_to_read)
-{
- auto ret = std::make_unique<scan_extents_ret_bare>();
- auto* extents = ret.get();
- return read_segment_header(cursor.get_segment_id()
- ).handle_error(
- scan_extents_ertr::pass_further{},
- crimson::ct_error::assert_all{
- "Invalid error in SegmentManagerGroup::scan_extents"
- }
- ).safe_then([bytes_to_read, extents, &cursor, this](auto segment_header) {
- auto segment_nonce = segment_header.segment_nonce;
- return seastar::do_with(
- found_record_handler_t([extents](
- record_locator_t locator,
- const record_group_header_t& header,
- const bufferlist& mdbuf) mutable -> scan_valid_records_ertr::future<>
- {
- LOG_PREFIX(SegmentManagerGroup::scan_extents);
- DEBUG("decoding {} records", header.records);
- auto maybe_record_extent_infos = try_decode_extent_infos(header, mdbuf);
- if (!maybe_record_extent_infos) {
- // This should be impossible, we did check the crc on the mdbuf
- ERROR("unable to decode extents for record {}",
- locator.record_block_base);
- return crimson::ct_error::input_output_error::make();
- }
-
- paddr_t extent_offset = locator.record_block_base;
- for (auto& r: *maybe_record_extent_infos) {
- DEBUG("decoded {} extents", r.extent_infos.size());
- for (const auto &i : r.extent_infos) {
- extents->emplace_back(
- extent_offset,
- std::pair<commit_info_t, extent_info_t>(
- {r.header.commit_time,
- r.header.commit_type},
- i));
- auto& seg_addr = extent_offset.as_seg_paddr();
- seg_addr.set_segment_off(
- seg_addr.get_segment_off() + i.len);
- }
- }
- return scan_extents_ertr::now();
- }),
- [bytes_to_read, segment_nonce, &cursor, this](auto &dhandler) {
- return scan_valid_records(
- cursor,
- segment_nonce,
- bytes_to_read,
- dhandler
- ).discard_result();
- }
- );
- }).safe_then([ret=std::move(ret)] {
- return std::move(*ret);
- });
-}
-
SegmentManagerGroup::scan_valid_records_ret
SegmentManagerGroup::scan_valid_records(
scan_valid_records_cursor &cursor,