segment_manager/block.cc
transaction_manager.cc
transaction.cc
- journal.cc
cache.cc
extent_reader.cc
lba_manager.cc
seastore.cc
random_block_manager/nvme_manager.cc
random_block_manager/nvmedevice.cc
+ journal/segmented_journal.cc
+ journal.cc
../../../test/crimson/seastore/test_block.cc
${PROJECT_SOURCE_DIR}/src/os/Transaction.cc
)
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
-#include <iostream>
+#include "journal.h"
+#include "journal/segmented_journal.h"
-#include <boost/iterator/counting_iterator.hpp>
+namespace crimson::os::seastore::journal {
-#include "include/intarith.h"
-
-#include "crimson/common/config_proxy.h"
-#include "crimson/os/seastore/journal.h"
-#include "crimson/os/seastore/logging.h"
-#include "crimson/os/seastore/segment_cleaner.h"
-
-SET_SUBSYS(seastore_journal);
-/*
- * format:
- * - H<handle-addr> information
- *
- * levels:
- * - INFO: major initiation, closing, rolling and replay operations
- * - DEBUG: INFO details, major submit operations
- * - TRACE: DEBUG details
- */
-
-namespace crimson::os::seastore {
-
-segment_nonce_t generate_nonce(
- segment_seq_t seq,
- const seastore_meta_t &meta)
-{
- return ceph_crc32c(
- seq,
- reinterpret_cast<const unsigned char *>(meta.seastore_id.bytes()),
- sizeof(meta.seastore_id.uuid));
-}
-
-Journal::Journal(
- SegmentManager& segment_manager,
- ExtentReader& scanner)
- : journal_segment_manager(segment_manager),
- record_submitter(crimson::common::get_conf<uint64_t>(
- "seastore_journal_iodepth_limit"),
- crimson::common::get_conf<uint64_t>(
- "seastore_journal_batch_capacity"),
- crimson::common::get_conf<Option::size_t>(
- "seastore_journal_batch_flush_size"),
- crimson::common::get_conf<double>(
- "seastore_journal_batch_preferred_fullness"),
- journal_segment_manager),
- scanner(scanner)
-{
- register_metrics();
-}
-
-Journal::open_for_write_ret Journal::open_for_write()
-{
- LOG_PREFIX(Journal::open_for_write);
- INFO("device_id={}", journal_segment_manager.get_device_id());
- return journal_segment_manager.open();
-}
-
-Journal::close_ertr::future<> Journal::close()
-{
- LOG_PREFIX(Journal::close);
- INFO("closing");
- metrics.clear();
- return journal_segment_manager.close();
-}
-
-Journal::prep_replay_segments_fut
-Journal::prep_replay_segments(
- std::vector<std::pair<segment_id_t, segment_header_t>> segments)
-{
- LOG_PREFIX(Journal::prep_replay_segments);
- if (segments.empty()) {
- ERROR("no journal segments for replay");
- return crimson::ct_error::input_output_error::make();
- }
- std::sort(
- segments.begin(),
- segments.end(),
- [](const auto &lt, const auto &rt) {
- return lt.second.journal_segment_seq <
- rt.second.journal_segment_seq;
- });
-
- journal_segment_manager.set_segment_seq(
- segments.rbegin()->second.journal_segment_seq);
- std::for_each(
- segments.begin(),
- segments.end(),
- [this, FNAME](auto &seg)
- {
- if (seg.first != seg.second.physical_segment_id ||
- seg.first.device_id() != journal_segment_manager.get_device_id() ||
- seg.second.get_type() != segment_type_t::JOURNAL) {
- ERROR("illegal journal segment for replay -- {}", seg.second);
- ceph_abort();
- }
- });
-
- auto journal_tail = segments.rbegin()->second.journal_tail;
- segment_provider->update_journal_tail_committed(journal_tail);
- auto replay_from = journal_tail.offset;
- auto from = segments.begin();
- if (replay_from != P_ADDR_NULL) {
- from = std::find_if(
- segments.begin(),
- segments.end(),
- [&replay_from](const auto &seg) -> bool {
- auto& seg_addr = replay_from.as_seg_paddr();
- return seg.first == seg_addr.get_segment_id();
- });
- if (from->second.journal_segment_seq != journal_tail.segment_seq) {
- ERROR("journal_tail {} does not match {}",
- journal_tail, from->second);
- ceph_abort();
- }
- } else {
- replay_from = paddr_t::make_seg_paddr(
- from->first,
- journal_segment_manager.get_block_size());
- }
-
- auto num_segments = segments.end() - from;
- INFO("{} segments to replay, from {}",
- num_segments, replay_from);
- auto ret = replay_segments_t(num_segments);
- std::transform(
- from, segments.end(), ret.begin(),
- [this](const auto &p) {
- auto ret = journal_seq_t{
- p.second.journal_segment_seq,
- paddr_t::make_seg_paddr(
- p.first,
- journal_segment_manager.get_block_size())
- };
- return std::make_pair(ret, p.second);
- });
- ret[0].first.offset = replay_from;
- return prep_replay_segments_fut(
- prep_replay_segments_ertr::ready_future_marker{},
- std::move(ret));
-}
-
-Journal::replay_ertr::future<>
-Journal::replay_segment(
- journal_seq_t seq,
- segment_header_t header,
- delta_handler_t &handler)
-{
- LOG_PREFIX(Journal::replay_segment);
- INFO("starting at {} -- {}", seq, header);
- return seastar::do_with(
- scan_valid_records_cursor(seq),
- ExtentReader::found_record_handler_t([=, &handler](
- record_locator_t locator,
- const record_group_header_t& header,
- const bufferlist& mdbuf)
- -> ExtentReader::scan_valid_records_ertr::future<>
- {
- auto maybe_record_deltas_list = try_decode_deltas(
- header, mdbuf, locator.record_block_base);
- if (!maybe_record_deltas_list) {
- // This should be impossible; we already checked the crc on the mdbuf
- ERROR("unable to decode deltas for record {} at {}",
- header, locator);
- return crimson::ct_error::input_output_error::make();
- }
-
- return seastar::do_with(
- std::move(*maybe_record_deltas_list),
- [write_result=locator.write_result,
- this,
- FNAME,
- &handler](auto& record_deltas_list)
- {
- return crimson::do_for_each(
- record_deltas_list,
- [write_result,
- this,
- FNAME,
- &handler](record_deltas_t& record_deltas)
- {
- auto locator = record_locator_t{
- record_deltas.record_block_base,
- write_result
- };
- DEBUG("processing {} deltas at block_base {}",
- record_deltas.deltas.size(),
- locator);
- return crimson::do_for_each(
- record_deltas.deltas,
- [locator,
- this,
- FNAME,
- &handler](delta_info_t& delta)
- {
- /* The journal may validly contain deltas for extents in segments
- * that have since been released. We can detect those cases by
- * checking whether the segment in question currently has a
- * sequence number > the current journal segment seq. We can
- * safely skip these deltas because the extent must already
- * have been rewritten.
- */
- if (delta.paddr != P_ADDR_NULL) {
- auto& seg_addr = delta.paddr.as_seg_paddr();
- auto delta_paddr_segment_seq = segment_provider->get_seq(seg_addr.get_segment_id());
- auto delta_paddr_segment_type = segment_seq_to_type(delta_paddr_segment_seq);
- auto locator_segment_seq = locator.write_result.start_seq.segment_seq;
- if (delta_paddr_segment_type == segment_type_t::NULL_SEG ||
- (delta_paddr_segment_type == segment_type_t::JOURNAL &&
- delta_paddr_segment_seq > locator_segment_seq)) {
- SUBDEBUG(seastore_cache,
- "delta is obsolete, delta_paddr_segment_seq={}, locator_segment_seq={} -- {}",
- segment_seq_printer_t{delta_paddr_segment_seq},
- segment_seq_printer_t{locator_segment_seq},
- delta);
- return replay_ertr::now();
- }
- }
- return handler(locator, delta);
- });
- });
- });
- }),
- [=](auto &cursor, auto &dhandler) {
- return scanner.scan_valid_records(
- cursor,
- header.segment_nonce,
- std::numeric_limits<size_t>::max(),
- dhandler).safe_then([](auto){}
- ).handle_error(
- replay_ertr::pass_further{},
- crimson::ct_error::assert_all{
- "shouldn't meet with any other error other replay_ertr"
- }
- );
- }
- );
-}
-
-Journal::replay_ret Journal::replay(
- std::vector<std::pair<segment_id_t, segment_header_t>>&& segment_headers,
- delta_handler_t &&delta_handler)
-{
- LOG_PREFIX(Journal::replay);
- INFO("got {} segments", segment_headers.size());
- return seastar::do_with(
- std::move(delta_handler), replay_segments_t(),
- [this, segment_headers=std::move(segment_headers)]
- (auto &handler, auto &segments) mutable -> replay_ret
- {
- return prep_replay_segments(std::move(segment_headers)
- ).safe_then([this, &handler, &segments](auto replay_segs) mutable {
- segments = std::move(replay_segs);
- return crimson::do_for_each(segments, [this, &handler](auto i) mutable {
- return replay_segment(i.first, i.second, handler);
- });
- });
- });
-}
-
-void Journal::register_metrics()
-{
- LOG_PREFIX(Journal::register_metrics);
- DEBUG("");
- record_submitter.reset_stats();
- namespace sm = seastar::metrics;
- metrics.add_group(
- "journal",
- {
- sm::make_counter(
- "record_num",
- [this] {
- return record_submitter.get_record_batch_stats().num_io;
- },
- sm::description("total number of records submitted")
- ),
- sm::make_counter(
- "record_batch_num",
- [this] {
- return record_submitter.get_record_batch_stats().num_io_grouped;
- },
- sm::description("total number of records batched")
- ),
- sm::make_counter(
- "io_num",
- [this] {
- return record_submitter.get_io_depth_stats().num_io;
- },
- sm::description("total number of io submitted")
- ),
- sm::make_counter(
- "io_depth_num",
- [this] {
- return record_submitter.get_io_depth_stats().num_io_grouped;
- },
- sm::description("total number of io depth")
- ),
- sm::make_counter(
- "record_group_padding_bytes",
- [this] {
- return record_submitter.get_record_group_padding_bytes();
- },
- sm::description("bytes of metadata padding when write record groups")
- ),
- sm::make_counter(
- "record_group_metadata_bytes",
- [this] {
- return record_submitter.get_record_group_metadata_bytes();
- },
- sm::description("bytes of raw metadata when write record groups")
- ),
- sm::make_counter(
- "record_group_data_bytes",
- [this] {
- return record_submitter.get_record_group_data_bytes();
- },
- sm::description("bytes of data when write record groups")
- ),
- }
- );
-}
-
-Journal::JournalSegmentManager::JournalSegmentManager(
- SegmentManager& segment_manager)
- : segment_manager{segment_manager}
-{
- reset();
-}
-
-Journal::JournalSegmentManager::open_ret
-Journal::JournalSegmentManager::open()
-{
- return roll().safe_then([this] {
- return get_current_write_seq();
- });
-}
-
-Journal::JournalSegmentManager::close_ertr::future<>
-Journal::JournalSegmentManager::close()
-{
- LOG_PREFIX(JournalSegmentManager::close);
- if (current_journal_segment) {
- INFO("segment_id={}, seq={}, "
- "written_to={}, committed_to={}, nonce={}",
- current_journal_segment->get_segment_id(),
- get_segment_seq(),
- written_to,
- committed_to,
- current_segment_nonce);
- } else {
- INFO("no current journal segment");
- }
-
- return (
- current_journal_segment ?
- current_journal_segment->close() :
- Segment::close_ertr::now()
- ).handle_error(
- close_ertr::pass_further{},
- crimson::ct_error::assert_all{
- "Invalid error in JournalSegmentManager::close()"
- }
- ).finally([this] {
- reset();
- });
-}
-
-Journal::JournalSegmentManager::roll_ertr::future<>
-Journal::JournalSegmentManager::roll()
-{
- LOG_PREFIX(JournalSegmentManager::roll);
- auto old_segment_id = current_journal_segment ?
- current_journal_segment->get_segment_id() :
- NULL_SEG_ID;
- if (current_journal_segment) {
- INFO("closing segment {}, seq={}, "
- "written_to={}, committed_to={}, nonce={}",
- old_segment_id,
- get_segment_seq(),
- written_to,
- committed_to,
- current_segment_nonce);
- }
-
- return (
- current_journal_segment ?
- current_journal_segment->close() :
- Segment::close_ertr::now()
- ).safe_then([this] {
- return segment_provider->get_segment(
- get_device_id(), next_journal_segment_seq);
- }).safe_then([this](auto segment) {
- return segment_manager.open(segment);
- }).safe_then([this](auto sref) {
- current_journal_segment = sref;
- return initialize_segment(*current_journal_segment);
- }).safe_then([this, old_segment_id] {
- if (old_segment_id != NULL_SEG_ID) {
- segment_provider->close_segment(old_segment_id);
- }
- }).handle_error(
- roll_ertr::pass_further{},
- crimson::ct_error::assert_all{
- "Invalid error in JournalSegmentManager::roll"
- }
- );
-}
-
-Journal::JournalSegmentManager::write_ret
-Journal::JournalSegmentManager::write(ceph::bufferlist to_write)
-{
- LOG_PREFIX(JournalSegmentManager::write);
- auto write_length = to_write.length();
- auto write_start_seq = get_current_write_seq();
- TRACE("{}~{}", write_start_seq, write_length);
- assert(write_length > 0);
- assert((write_length % segment_manager.get_block_size()) == 0);
- assert(!needs_roll(write_length));
-
- auto write_start_offset = written_to;
- written_to += write_length;
- auto write_result = write_result_t{
- write_start_seq,
- static_cast<seastore_off_t>(write_length)
- };
- return current_journal_segment->write(
- write_start_offset, to_write
- ).handle_error(
- write_ertr::pass_further{},
- crimson::ct_error::assert_all{
- "Invalid error in JournalSegmentManager::write"
- }
- ).safe_then([write_result] {
- return write_result;
- });
-}
-
-void Journal::JournalSegmentManager::mark_committed(
- const journal_seq_t& new_committed_to)
-{
- LOG_PREFIX(JournalSegmentManager::mark_committed);
- TRACE("{} => {}", committed_to, new_committed_to);
- assert(committed_to == journal_seq_t() ||
- committed_to <= new_committed_to);
- committed_to = new_committed_to;
-}
-
-Journal::JournalSegmentManager::initialize_segment_ertr::future<>
-Journal::JournalSegmentManager::initialize_segment(Segment& segment)
-{
- LOG_PREFIX(JournalSegmentManager::initialize_segment);
- auto new_tail = segment_provider->get_journal_tail_target();
- // write out header
- ceph_assert(segment.get_write_ptr() == 0);
- bufferlist bl;
-
- segment_seq_t seq = next_journal_segment_seq++;
- current_segment_nonce = generate_nonce(
- seq, segment_manager.get_meta());
- auto header = segment_header_t{
- seq,
- segment.get_segment_id(),
- new_tail,
- current_segment_nonce};
- INFO("writing {} ...", header);
- ceph_assert(header.get_type() == segment_type_t::JOURNAL);
- encode(header, bl);
-
- bufferptr bp(
- ceph::buffer::create_page_aligned(
- segment_manager.get_block_size()));
- bp.zero();
- auto iter = bl.cbegin();
- iter.copy(bl.length(), bp.c_str());
- bl.clear();
- bl.append(bp);
-
- written_to = 0;
- return write(bl
- ).safe_then([this, new_tail](auto) {
- segment_provider->update_journal_tail_committed(new_tail);
- });
-}
-
-Journal::RecordBatch::add_pending_ret
-Journal::RecordBatch::add_pending(
- record_t&& record,
- OrderingHandle& handle,
- extent_len_t block_size)
-{
- LOG_PREFIX(RecordBatch::add_pending);
- auto new_size = get_encoded_length_after(record, block_size);
- auto dlength_offset = pending.size.dlength;
- TRACE("H{} batches={}, write_size={}, dlength_offset={} ...",
- (void*)&handle,
- pending.get_size() + 1,
- new_size.get_encoded_length(),
- dlength_offset);
- assert(state != state_t::SUBMITTING);
- assert(can_batch(record, block_size).value() == new_size);
-
- pending.push_back(
- std::move(record), block_size);
- assert(pending.size == new_size);
- if (state == state_t::EMPTY) {
- assert(!io_promise.has_value());
- io_promise = seastar::shared_promise<maybe_promise_result_t>();
- } else {
- assert(io_promise.has_value());
- }
- state = state_t::PENDING;
-
- return io_promise->get_shared_future(
- ).then([dlength_offset, FNAME, &handle
- ](auto maybe_promise_result) -> add_pending_ret {
- if (!maybe_promise_result.has_value()) {
- ERROR("H{} write failed", (void*)&handle);
- return crimson::ct_error::input_output_error::make();
- }
- auto write_result = maybe_promise_result->write_result;
- auto submit_result = record_locator_t{
- write_result.start_seq.offset.add_offset(
- maybe_promise_result->mdlength + dlength_offset),
- write_result
- };
- TRACE("H{} write finish with {}", (void*)&handle, submit_result);
- return add_pending_ret(
- add_pending_ertr::ready_future_marker{},
- submit_result);
- });
-}
-
-std::pair<ceph::bufferlist, record_group_size_t>
-Journal::RecordBatch::encode_batch(
- const journal_seq_t& committed_to,
- segment_nonce_t segment_nonce)
-{
- assert(state == state_t::PENDING);
- assert(pending.get_size() > 0);
- assert(io_promise.has_value());
-
- state = state_t::SUBMITTING;
- submitting_size = pending.get_size();
- auto gsize = pending.size;
- submitting_length = gsize.get_encoded_length();
- submitting_mdlength = gsize.get_mdlength();
- auto bl = encode_records(pending, committed_to, segment_nonce);
- // Note: pending is cleared here
- assert(bl.length() == (std::size_t)submitting_length);
- return std::make_pair(bl, gsize);
-}
-
-void Journal::RecordBatch::set_result(
- maybe_result_t maybe_write_result)
-{
- maybe_promise_result_t result;
- if (maybe_write_result.has_value()) {
- assert(maybe_write_result->length == submitting_length);
- result = promise_result_t{
- *maybe_write_result,
- submitting_mdlength
- };
- }
- assert(state == state_t::SUBMITTING);
- assert(io_promise.has_value());
-
- state = state_t::EMPTY;
- submitting_size = 0;
- submitting_length = 0;
- submitting_mdlength = 0;
- io_promise->set_value(result);
- io_promise.reset();
-}
-
-std::pair<ceph::bufferlist, record_group_size_t>
-Journal::RecordBatch::submit_pending_fast(
- record_t&& record,
- extent_len_t block_size,
- const journal_seq_t& committed_to,
- segment_nonce_t segment_nonce)
-{
- auto new_size = get_encoded_length_after(record, block_size);
- std::ignore = new_size;
- assert(state == state_t::EMPTY);
- assert(can_batch(record, block_size).value() == new_size);
-
- auto group = record_group_t(std::move(record), block_size);
- auto size = group.size;
- assert(size == new_size);
- auto bl = encode_records(group, committed_to, segment_nonce);
- assert(bl.length() == size.get_encoded_length());
- return std::make_pair(bl, size);
-}
-
-Journal::RecordSubmitter::RecordSubmitter(
- std::size_t io_depth,
- std::size_t batch_capacity,
- std::size_t batch_flush_size,
- double preferred_fullness,
- JournalSegmentManager& jsm)
- : io_depth_limit{io_depth},
- preferred_fullness{preferred_fullness},
- journal_segment_manager{jsm},
- batches(new RecordBatch[io_depth + 1])
-{
- LOG_PREFIX(RecordSubmitter);
- INFO("Journal::RecordSubmitter: io_depth_limit={}, "
- "batch_capacity={}, batch_flush_size={}, "
- "preferred_fullness={}",
- io_depth, batch_capacity,
- batch_flush_size, preferred_fullness);
- ceph_assert(io_depth > 0);
- ceph_assert(batch_capacity > 0);
- ceph_assert(preferred_fullness >= 0 &&
- preferred_fullness <= 1);
- free_batch_ptrs.reserve(io_depth + 1);
- for (std::size_t i = 0; i <= io_depth; ++i) {
- batches[i].initialize(i, batch_capacity, batch_flush_size);
- free_batch_ptrs.push_back(&batches[i]);
- }
- pop_free_batch();
-}
-
-Journal::RecordSubmitter::submit_ret
-Journal::RecordSubmitter::submit(
- record_t&& record,
- OrderingHandle& handle)
-{
- LOG_PREFIX(RecordSubmitter::submit);
- DEBUG("H{} {} start ...", (void*)&handle, record);
- assert(write_pipeline);
- auto expected_size = record_group_size_t(
- record.size,
- journal_segment_manager.get_block_size()
- ).get_encoded_length();
- auto max_record_length = journal_segment_manager.get_max_write_length();
- if (expected_size > max_record_length) {
- ERROR("H{} {} exceeds max record size {}",
- (void*)&handle, record, max_record_length);
- return crimson::ct_error::erange::make();
- }
-
- return do_submit(std::move(record), handle);
-}
-
-seastar::future<> Journal::RecordSubmitter::flush(OrderingHandle &handle)
-{
- LOG_PREFIX(RecordSubmitter::flush);
- DEBUG("H{} flush", (void*)&handle);
- return handle.enter(write_pipeline->device_submission
- ).then([this, &handle] {
- return handle.enter(write_pipeline->finalize);
- }).then([FNAME, &handle] {
- DEBUG("H{} flush done", (void*)&handle);
- });
-}
-
-void Journal::RecordSubmitter::update_state()
-{
- if (num_outstanding_io == 0) {
- state = state_t::IDLE;
- } else if (num_outstanding_io < io_depth_limit) {
- state = state_t::PENDING;
- } else if (num_outstanding_io == io_depth_limit) {
- state = state_t::FULL;
- } else {
- ceph_abort("fatal error: io-depth overflow");
- }
-}
-
-void Journal::RecordSubmitter::decrement_io_with_flush()
+JournalRef make_segmented(
+ SegmentManager &sm,
+ ExtentReader &reader,
+ SegmentProvider &provider)
{
- LOG_PREFIX(RecordSubmitter::decrement_io_with_flush);
- assert(num_outstanding_io > 0);
- --num_outstanding_io;
-#ifndef NDEBUG
- auto prv_state = state;
-#endif
- update_state();
-
- if (wait_submit_promise.has_value()) {
- DEBUG("wait resolved");
- assert(prv_state == state_t::FULL);
- wait_submit_promise->set_value();
- wait_submit_promise.reset();
- }
-
- if (!p_current_batch->is_empty()) {
- TRACE("flush");
- flush_current_batch();
- }
-}
-
-void Journal::RecordSubmitter::account_submission(
- std::size_t num,
- const record_group_size_t& size)
-{
- stats.record_group_padding_bytes +=
- (size.get_mdlength() - size.get_raw_mdlength());
- stats.record_group_metadata_bytes += size.get_raw_mdlength();
- stats.record_group_data_bytes += size.dlength;
-}
-
-void Journal::RecordSubmitter::finish_submit_batch(
- RecordBatch* p_batch,
- maybe_result_t maybe_result)
-{
- assert(p_batch->is_submitting());
- p_batch->set_result(maybe_result);
- free_batch_ptrs.push_back(p_batch);
- decrement_io_with_flush();
-}
-
-void Journal::RecordSubmitter::flush_current_batch()
-{
- LOG_PREFIX(RecordSubmitter::flush_current_batch);
- RecordBatch* p_batch = p_current_batch;
- assert(p_batch->is_pending());
- p_current_batch = nullptr;
- pop_free_batch();
-
- increment_io();
- auto num = p_batch->get_num_records();
- auto committed_to = journal_segment_manager.get_committed_to();
- auto [to_write, sizes] = p_batch->encode_batch(
- committed_to, journal_segment_manager.get_nonce());
- DEBUG("{} records, {}, committed_to={}, outstanding_io={} ...",
- num, sizes, committed_to, num_outstanding_io);
- account_submission(num, sizes);
- std::ignore = journal_segment_manager.write(to_write
- ).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) {
- TRACE("{} records, {}, write done with {}", num, sizes, write_result);
- finish_submit_batch(p_batch, write_result);
- }).handle_error(
- crimson::ct_error::all_same_way([this, p_batch, FNAME, num, sizes=sizes](auto e) {
- ERROR("{} records, {}, got error {}", num, sizes, e);
- finish_submit_batch(p_batch, std::nullopt);
- })
- ).handle_exception([this, p_batch, FNAME, num, sizes=sizes](auto e) {
- ERROR("{} records, {}, got exception {}", num, sizes, e);
- finish_submit_batch(p_batch, std::nullopt);
- });
-}
-
-Journal::RecordSubmitter::submit_pending_ret
-Journal::RecordSubmitter::submit_pending(
- record_t&& record,
- OrderingHandle& handle,
- bool flush)
-{
- LOG_PREFIX(RecordSubmitter::submit_pending);
- assert(!p_current_batch->is_submitting());
- stats.record_batch_stats.increment(
- p_current_batch->get_num_records() + 1);
- bool do_flush = (flush || state == state_t::IDLE);
- auto write_fut = [this, do_flush, FNAME, record=std::move(record), &handle]() mutable {
- if (do_flush && p_current_batch->is_empty()) {
- // fast path with direct write
- increment_io();
- auto committed_to = journal_segment_manager.get_committed_to();
- auto [to_write, sizes] = p_current_batch->submit_pending_fast(
- std::move(record),
- journal_segment_manager.get_block_size(),
- committed_to,
- journal_segment_manager.get_nonce());
- DEBUG("H{} fast submit {}, committed_to={}, outstanding_io={} ...",
- (void*)&handle, sizes, committed_to, num_outstanding_io);
- account_submission(1, sizes);
- return journal_segment_manager.write(to_write
- ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
- return record_locator_t{
- write_result.start_seq.offset.add_offset(mdlength),
- write_result
- };
- }).finally([this] {
- decrement_io_with_flush();
- });
- } else {
- // indirect write with or without the existing pending records
- auto write_fut = p_current_batch->add_pending(
- std::move(record),
- handle,
- journal_segment_manager.get_block_size());
- if (do_flush) {
- DEBUG("H{} added pending and flush", (void*)&handle);
- flush_current_batch();
- } else {
- DEBUG("H{} added with {} pending",
- (void*)&handle, p_current_batch->get_num_records());
- }
- return write_fut;
- }
- }();
- return handle.enter(write_pipeline->device_submission
- ).then([write_fut=std::move(write_fut)]() mutable {
- return std::move(write_fut);
- }).safe_then([this, FNAME, &handle](auto submit_result) {
- return handle.enter(write_pipeline->finalize
- ).then([this, FNAME, submit_result, &handle] {
- DEBUG("H{} finish with {}", (void*)&handle, submit_result);
- journal_segment_manager.mark_committed(
- submit_result.write_result.get_end_seq());
- return submit_result;
- });
- });
-}
-
-Journal::RecordSubmitter::do_submit_ret
-Journal::RecordSubmitter::do_submit(
- record_t&& record,
- OrderingHandle& handle)
-{
- LOG_PREFIX(RecordSubmitter::do_submit);
- TRACE("H{} outstanding_io={}/{} ...",
- (void*)&handle, num_outstanding_io, io_depth_limit);
- assert(!p_current_batch->is_submitting());
- if (state <= state_t::PENDING) {
- // can increment io depth
- assert(!wait_submit_promise.has_value());
- auto maybe_new_size = p_current_batch->can_batch(
- record, journal_segment_manager.get_block_size());
- if (!maybe_new_size.has_value() ||
- (maybe_new_size->get_encoded_length() >
- journal_segment_manager.get_max_write_length())) {
- TRACE("H{} flush", (void*)&handle);
- assert(p_current_batch->is_pending());
- flush_current_batch();
- return do_submit(std::move(record), handle);
- } else if (journal_segment_manager.needs_roll(
- maybe_new_size->get_encoded_length())) {
- if (p_current_batch->is_pending()) {
- TRACE("H{} flush and roll", (void*)&handle);
- flush_current_batch();
- } else {
- TRACE("H{} roll", (void*)&handle);
- }
- return journal_segment_manager.roll(
- ).safe_then([this, record=std::move(record), &handle]() mutable {
- return do_submit(std::move(record), handle);
- });
- } else {
- bool flush = (maybe_new_size->get_fullness() > preferred_fullness ?
- true : false);
- return submit_pending(std::move(record), handle, flush);
- }
- }
-
- assert(state == state_t::FULL);
- // cannot increment io depth
- auto maybe_new_size = p_current_batch->can_batch(
- record, journal_segment_manager.get_block_size());
- if (!maybe_new_size.has_value() ||
- (maybe_new_size->get_encoded_length() >
- journal_segment_manager.get_max_write_length()) ||
- journal_segment_manager.needs_roll(
- maybe_new_size->get_encoded_length())) {
- if (!wait_submit_promise.has_value()) {
- wait_submit_promise = seastar::promise<>();
- }
- DEBUG("H{} wait ...", (void*)&handle);
- return wait_submit_promise->get_future(
- ).then([this, record=std::move(record), &handle]() mutable {
- return do_submit(std::move(record), handle);
- });
- } else {
- return submit_pending(std::move(record), handle, false);
- }
+ return std::make_unique<SegmentedJournal>(sm, reader, provider);
}
}
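
With this change the concrete journal sits behind a factory, and the SegmentProvider becomes a constructor argument instead of being injected later through set_segment_provider(). A minimal sketch of the intended call site follows; it is not part of this patch -- build_journal and the argument names are illustrative, standing in for whatever instances the caller (e.g. TransactionManager) already owns.

// Sketch only -- illustrates the new factory, not code from this patch.
#include "crimson/os/seastore/journal.h"

namespace crimson::os::seastore {

// hypothetical helper; callers only ever see the abstract Journal
JournalRef build_journal(
  SegmentManager &sm,
  ExtentReader &reader,
  SegmentProvider &provider)
{
  return journal::make_segmented(sm, reader, provider);
}

}
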
#pragma once
-#include <boost/intrusive_ptr.hpp>
-#include <optional>
+#include <memory>
-#include <seastar/core/circular_buffer.hh>
-#include <seastar/core/future.hh>
-#include <seastar/core/metrics.hh>
-#include <seastar/core/shared_future.hh>
-
-#include "include/ceph_assert.h"
-#include "include/buffer.h"
-#include "include/denc.h"
-
-#include "crimson/os/seastore/extent_reader.h"
-#include "crimson/os/seastore/segment_manager.h"
#include "crimson/os/seastore/ordering_handle.h"
#include "crimson/os/seastore/seastore_types.h"
-#include "crimson/osd/exceptions.h"
namespace crimson::os::seastore {
+namespace nvme_device {
+class NVMeBlockDevice;
+}
+
+class SegmentManager;
+class ExtentReader;
class SegmentProvider;
-class SegmentedAllocator;
-/**
- * Manages stream of atomically written records to a SegmentManager.
- */
class Journal {
public:
- Journal(SegmentManager &segment_manager, ExtentReader& scanner);
-
- /**
- * Gets the current journal segment sequence.
- */
- segment_seq_t get_segment_seq() const {
- return journal_segment_manager.get_segment_seq();
- }
-
- /**
- * Sets the SegmentProvider.
- *
- * Not provided in constructor to allow the provider to not own
- * or construct the Journal (TransactionManager).
- *
- * Note, Journal does not own this ptr, user must ensure that
- * *provider outlives Journal.
- */
- void set_segment_provider(SegmentProvider *provider) {
- segment_provider = provider;
- journal_segment_manager.set_segment_provider(provider);
- }
-
/**
* initializes journal for new writes -- must run prior to calls
* to submit_record. Should be called after replay if not a new
crimson::ct_error::input_output_error
>;
using open_for_write_ret = open_for_write_ertr::future<journal_seq_t>;
- open_for_write_ret open_for_write();
+ virtual open_for_write_ret open_for_write() = 0;
- /**
- * close journal
- *
- * TODO: should probably flush and disallow further writes
- */
+ /// close journal
using close_ertr = crimson::errorator<
crimson::ct_error::input_output_error>;
- close_ertr::future<> close();
+ virtual close_ertr::future<> close() = 0;
/**
* submit_record
using submit_record_ret = submit_record_ertr::future<
record_locator_t
>;
- submit_record_ret submit_record(
+ virtual submit_record_ret submit_record(
record_t &&record,
OrderingHandle &handle
- ) {
- return record_submitter.submit(std::move(record), handle);
- }
+ ) = 0;
/**
* flush
* Note, flush() machinery must go through the same pipeline
* stages and locks as submit_record.
*/
- seastar::future<> flush(OrderingHandle &handle) {
- return record_submitter.flush(handle);
- }
+ virtual seastar::future<> flush(OrderingHandle &handle) = 0;
+
+ /// sets write pipeline reference
+ virtual void set_write_pipeline(WritePipeline *_write_pipeline) = 0;
/**
* Read deltas and pass to delta_handler
* record_block_start (argument to delta_handler) is the start of the
* of the first block in the record
*/
- using replay_ertr = SegmentManager::read_ertr;
+ using replay_ertr = crimson::errorator<
+ crimson::ct_error::input_output_error,
+ crimson::ct_error::invarg,
+ crimson::ct_error::enoent,
+ crimson::ct_error::erange>;
using replay_ret = replay_ertr::future<>;
using delta_handler_t = std::function<
replay_ret(const record_locator_t&,
const delta_info_t&)>;
- replay_ret replay(
- std::vector<std::pair<segment_id_t, segment_header_t>>&& segment_headers,
- delta_handler_t &&delta_handler);
-
- void set_write_pipeline(WritePipeline* write_pipeline) {
- record_submitter.set_write_pipeline(write_pipeline);
- }
-
-private:
- class JournalSegmentManager {
- public:
- JournalSegmentManager(SegmentManager&);
-
- using base_ertr = crimson::errorator<
- crimson::ct_error::input_output_error>;
- extent_len_t get_max_write_length() const {
- return segment_manager.get_segment_size() -
- p2align(ceph::encoded_sizeof_bounded<segment_header_t>(),
- size_t(segment_manager.get_block_size()));
- }
-
- device_id_t get_device_id() const {
- return segment_manager.get_device_id();
- }
-
- seastore_off_t get_block_size() const {
- return segment_manager.get_block_size();
- }
-
- segment_nonce_t get_nonce() const {
- return current_segment_nonce;
- }
-
- journal_seq_t get_committed_to() const {
- return committed_to;
- }
-
- segment_seq_t get_segment_seq() const {
- return next_journal_segment_seq - 1;
- }
-
- void set_segment_provider(SegmentProvider* provider) {
- segment_provider = provider;
- }
-
- void set_segment_seq(segment_seq_t current_seq) {
- next_journal_segment_seq = (current_seq + 1);
- }
-
- using open_ertr = base_ertr;
- using open_ret = open_ertr::future<journal_seq_t>;
- open_ret open();
-
- using close_ertr = base_ertr;
- close_ertr::future<> close();
-
- // returns true iff the current segment has insufficient space
- bool needs_roll(std::size_t length) const {
- auto write_capacity = current_journal_segment->get_write_capacity();
- return length + written_to > std::size_t(write_capacity);
- }
-
- // close the current segment and initialize next one
- using roll_ertr = base_ertr;
- roll_ertr::future<> roll();
-
- // write the buffer, return the write result
- // May be called concurrently, writes may complete in any order.
- using write_ertr = base_ertr;
- using write_ret = write_ertr::future<write_result_t>;
- write_ret write(ceph::bufferlist to_write);
-
- // mark write committed in order
- void mark_committed(const journal_seq_t& new_committed_to);
-
- private:
- journal_seq_t get_current_write_seq() const {
- assert(current_journal_segment);
- return journal_seq_t{
- get_segment_seq(),
- paddr_t::make_seg_paddr(current_journal_segment->get_segment_id(),
- written_to)
- };
- }
-
- void reset() {
- next_journal_segment_seq = 0;
- current_segment_nonce = 0;
- current_journal_segment.reset();
- written_to = 0;
- committed_to = {};
- }
-
- // prepare segment for writes, writes out segment header
- using initialize_segment_ertr = base_ertr;
- initialize_segment_ertr::future<> initialize_segment(Segment&);
-
- SegmentProvider* segment_provider;
- SegmentManager& segment_manager;
-
- segment_seq_t next_journal_segment_seq;
- segment_nonce_t current_segment_nonce;
-
- SegmentRef current_journal_segment;
- seastore_off_t written_to;
- // committed_to may be in a previous journal segment
- journal_seq_t committed_to;
- };
-
- class RecordBatch {
- enum class state_t {
- EMPTY = 0,
- PENDING,
- SUBMITTING
- };
-
- public:
- RecordBatch() = default;
- RecordBatch(RecordBatch&&) = delete;
- RecordBatch(const RecordBatch&) = delete;
- RecordBatch& operator=(RecordBatch&&) = delete;
- RecordBatch& operator=(const RecordBatch&) = delete;
-
- bool is_empty() const {
- return state == state_t::EMPTY;
- }
-
- bool is_pending() const {
- return state == state_t::PENDING;
- }
-
- bool is_submitting() const {
- return state == state_t::SUBMITTING;
- }
+ virtual replay_ret replay(
+ delta_handler_t &&delta_handler) = 0;
- std::size_t get_index() const {
- return index;
- }
-
- std::size_t get_num_records() const {
- return pending.get_size();
- }
-
- // return the expected write sizes if allows to batch,
- // otherwise, return nullopt
- std::optional<record_group_size_t> can_batch(
- const record_t& record,
- extent_len_t block_size) const {
- assert(state != state_t::SUBMITTING);
- if (pending.get_size() >= batch_capacity ||
- (pending.get_size() > 0 &&
- pending.size.get_encoded_length() > batch_flush_size)) {
- assert(state == state_t::PENDING);
- return std::nullopt;
- }
- return get_encoded_length_after(record, block_size);
- }
-
- void initialize(std::size_t i,
- std::size_t _batch_capacity,
- std::size_t _batch_flush_size) {
- ceph_assert(_batch_capacity > 0);
- index = i;
- batch_capacity = _batch_capacity;
- batch_flush_size = _batch_flush_size;
- pending.reserve(batch_capacity);
- }
-
- // Add to the batch, the future will be resolved after the batch is
- // written.
- //
- // Set write_result_t::write_length to 0 if the record is not the first one
- // in the batch.
- using add_pending_ertr = JournalSegmentManager::write_ertr;
- using add_pending_ret = add_pending_ertr::future<record_locator_t>;
- add_pending_ret add_pending(
- record_t&&,
- OrderingHandle&,
- extent_len_t block_size);
-
- // Encode the batched records for write.
- std::pair<ceph::bufferlist, record_group_size_t> encode_batch(
- const journal_seq_t& committed_to,
- segment_nonce_t segment_nonce);
-
- // Set the write result and reset for reuse
- using maybe_result_t = std::optional<write_result_t>;
- void set_result(maybe_result_t maybe_write_end_seq);
-
- // The fast path that is equivalent to submit a single record as a batch.
- //
- // Essentially, equivalent to the combined logic of:
- // add_pending(), encode_batch() and set_result() above without
- // the intervention of the shared io_promise.
- //
- // Note the current RecordBatch can be reused afterwards.
- std::pair<ceph::bufferlist, record_group_size_t> submit_pending_fast(
- record_t&&,
- extent_len_t block_size,
- const journal_seq_t& committed_to,
- segment_nonce_t segment_nonce);
-
- private:
- record_group_size_t get_encoded_length_after(
- const record_t& record,
- extent_len_t block_size) const {
- return pending.size.get_encoded_length_after(
- record.size, block_size);
- }
-
- state_t state = state_t::EMPTY;
- std::size_t index = 0;
- std::size_t batch_capacity = 0;
- std::size_t batch_flush_size = 0;
-
- record_group_t pending;
- std::size_t submitting_size = 0;
- seastore_off_t submitting_length = 0;
- seastore_off_t submitting_mdlength = 0;
-
- struct promise_result_t {
- write_result_t write_result;
- seastore_off_t mdlength;
- };
- using maybe_promise_result_t = std::optional<promise_result_t>;
- std::optional<seastar::shared_promise<maybe_promise_result_t> > io_promise;
- };
-
- class RecordSubmitter {
- enum class state_t {
- IDLE = 0, // outstanding_io == 0
- PENDING, // outstanding_io < io_depth_limit
- FULL // outstanding_io == io_depth_limit
- // OVERFLOW: outstanding_io > io_depth_limit is impossible
- };
-
- struct grouped_io_stats {
- uint64_t num_io = 0;
- uint64_t num_io_grouped = 0;
-
- void increment(uint64_t num_grouped_io) {
- ++num_io;
- num_io_grouped += num_grouped_io;
- }
- };
-
- public:
- RecordSubmitter(std::size_t io_depth,
- std::size_t batch_capacity,
- std::size_t batch_flush_size,
- double preferred_fullness,
- JournalSegmentManager&);
-
- grouped_io_stats get_record_batch_stats() const {
- return stats.record_batch_stats;
- }
-
- grouped_io_stats get_io_depth_stats() const {
- return stats.io_depth_stats;
- }
-
- uint64_t get_record_group_padding_bytes() const {
- return stats.record_group_padding_bytes;
- }
-
- uint64_t get_record_group_metadata_bytes() const {
- return stats.record_group_metadata_bytes;
- }
-
- uint64_t get_record_group_data_bytes() const {
- return stats.record_group_data_bytes;
- }
-
- void reset_stats() {
- stats = {};
- }
-
- void set_write_pipeline(WritePipeline *_write_pipeline) {
- write_pipeline = _write_pipeline;
- }
-
- using submit_ret = Journal::submit_record_ret;
- submit_ret submit(record_t&&, OrderingHandle&);
- seastar::future<> flush(OrderingHandle &handle);
-
- private:
- void update_state();
-
- void increment_io() {
- ++num_outstanding_io;
- stats.io_depth_stats.increment(num_outstanding_io);
- update_state();
- }
-
- void decrement_io_with_flush();
-
- void pop_free_batch() {
- assert(p_current_batch == nullptr);
- assert(!free_batch_ptrs.empty());
- p_current_batch = free_batch_ptrs.front();
- assert(p_current_batch->is_empty());
- assert(p_current_batch == &batches[p_current_batch->get_index()]);
- free_batch_ptrs.pop_front();
- }
-
- void account_submission(std::size_t, const record_group_size_t&);
-
- using maybe_result_t = RecordBatch::maybe_result_t;
- void finish_submit_batch(RecordBatch*, maybe_result_t);
-
- void flush_current_batch();
-
- using submit_pending_ertr = JournalSegmentManager::write_ertr;
- using submit_pending_ret = submit_pending_ertr::future<
- record_locator_t>;
- submit_pending_ret submit_pending(
- record_t&&, OrderingHandle &handle, bool flush);
-
- using do_submit_ret = submit_pending_ret;
- do_submit_ret do_submit(
- record_t&&, OrderingHandle&);
-
- state_t state = state_t::IDLE;
- std::size_t num_outstanding_io = 0;
- std::size_t io_depth_limit;
- double preferred_fullness;
-
- WritePipeline* write_pipeline = nullptr;
- JournalSegmentManager& journal_segment_manager;
- std::unique_ptr<RecordBatch[]> batches;
- std::size_t current_batch_index;
- // should not be nullptr after constructed
- RecordBatch* p_current_batch = nullptr;
- seastar::circular_buffer<RecordBatch*> free_batch_ptrs;
- std::optional<seastar::promise<> > wait_submit_promise;
-
- struct {
- grouped_io_stats record_batch_stats;
- grouped_io_stats io_depth_stats;
- uint64_t record_group_padding_bytes = 0;
- uint64_t record_group_metadata_bytes = 0;
- uint64_t record_group_data_bytes = 0;
- } stats;
- };
-
- SegmentProvider* segment_provider = nullptr;
- JournalSegmentManager journal_segment_manager;
- RecordSubmitter record_submitter;
- ExtentReader& scanner;
- seastar::metrics::metric_group metrics;
+ virtual ~Journal() {}
+};
+using JournalRef = std::unique_ptr<Journal>;
- /// return ordered vector of segments to replay
- using replay_segments_t = std::vector<
- std::pair<journal_seq_t, segment_header_t>>;
- using prep_replay_segments_ertr = crimson::errorator<
- crimson::ct_error::input_output_error
- >;
- using prep_replay_segments_fut = prep_replay_segments_ertr::future<
- replay_segments_t>;
- prep_replay_segments_fut prep_replay_segments(
- std::vector<std::pair<segment_id_t, segment_header_t>> segments);
+namespace journal {
- /// replays records starting at start through end of segment
- replay_ertr::future<>
- replay_segment(
- journal_seq_t start, ///< [in] starting addr, seq
- segment_header_t header, ///< [in] segment header
- delta_handler_t &delta_handler ///< [in] processes deltas in order
- );
+JournalRef make_segmented(
+ SegmentManager &sm,
+ ExtentReader &reader,
+ SegmentProvider &provider);
- void register_metrics();
-};
-using JournalRef = std::unique_ptr<Journal>;
+}
}
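
Note that replay() no longer takes the vector of segment headers: SegmentedJournal discovers the journal segments itself (see find_journal_segments() in the new segmented_journal.cc below), so a caller only supplies the delta handler. A minimal, illustrative sketch against the interface above; do_replay and the empty handler body are placeholders, not this patch's real replay wiring.

// Sketch only -- exercises the new replay() signature declared above.
// A real delta handler applies each delta to the caller's in-memory
// state instead of ignoring it.
Journal::replay_ret do_replay(Journal &journal)
{
  return journal.replay(
    [](const record_locator_t &locator,
       const delta_info_t &delta) -> Journal::replay_ret {
      // apply `delta`, which was written at `locator`
      return Journal::replay_ertr::now();
    });
}
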
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <iostream>
+
+#include <boost/iterator/counting_iterator.hpp>
+
+#include "include/intarith.h"
+
+#include "segmented_journal.h"
+
+#include "crimson/common/config_proxy.h"
+#include "crimson/os/seastore/logging.h"
+
+SET_SUBSYS(seastore_journal);
+
+/*
+ * format:
+ * - H<handle-addr> information
+ *
+ * levels:
+ * - INFO: major initiation, closing, rolling and replay operations
+ * - DEBUG: INFO details, major submit operations
+ * - TRACE: DEBUG details
+ */
+
+namespace crimson::os::seastore::journal {
+
+segment_nonce_t generate_nonce(
+ segment_seq_t seq,
+ const seastore_meta_t &meta)
+{
+ return ceph_crc32c(
+ seq,
+ reinterpret_cast<const unsigned char *>(meta.seastore_id.bytes()),
+ sizeof(meta.seastore_id.uuid));
+}
+
+SegmentedJournal::SegmentedJournal(
+ SegmentManager &segment_manager,
+ ExtentReader &scanner,
+ SegmentProvider &segment_provider)
+ : segment_provider(segment_provider),
+ journal_segment_manager(segment_manager, segment_provider),
+ record_submitter(crimson::common::get_conf<uint64_t>(
+ "seastore_journal_iodepth_limit"),
+ crimson::common::get_conf<uint64_t>(
+ "seastore_journal_batch_capacity"),
+ crimson::common::get_conf<Option::size_t>(
+ "seastore_journal_batch_flush_size"),
+ crimson::common::get_conf<double>(
+ "seastore_journal_batch_preferred_fullness"),
+ journal_segment_manager),
+ scanner(scanner)
+{
+ register_metrics();
+}
+
+SegmentedJournal::open_for_write_ret SegmentedJournal::open_for_write()
+{
+ LOG_PREFIX(Journal::open_for_write);
+ INFO("device_id={}", journal_segment_manager.get_device_id());
+ return journal_segment_manager.open();
+}
+
+SegmentedJournal::close_ertr::future<> SegmentedJournal::close()
+{
+ LOG_PREFIX(Journal::close);
+ INFO("closing");
+ metrics.clear();
+ return journal_segment_manager.close();
+}
+
+SegmentedJournal::prep_replay_segments_fut
+SegmentedJournal::prep_replay_segments(
+ std::vector<std::pair<segment_id_t, segment_header_t>> segments)
+{
+ LOG_PREFIX(Journal::prep_replay_segments);
+ if (segments.empty()) {
+ ERROR("no journal segments for replay");
+ return crimson::ct_error::input_output_error::make();
+ }
+ std::sort(
+ segments.begin(),
+ segments.end(),
+ [](const auto &lt, const auto &rt) {
+ return lt.second.journal_segment_seq <
+ rt.second.journal_segment_seq;
+ });
+
+ journal_segment_manager.set_segment_seq(
+ segments.rbegin()->second.journal_segment_seq);
+ std::for_each(
+ segments.begin(),
+ segments.end(),
+ [this, FNAME](auto &seg)
+ {
+ if (seg.first != seg.second.physical_segment_id ||
+ seg.first.device_id() != journal_segment_manager.get_device_id() ||
+ seg.second.get_type() != segment_type_t::JOURNAL) {
+ ERROR("illegal journal segment for replay -- {}", seg.second);
+ ceph_abort();
+ }
+ });
+
+ auto journal_tail = segments.rbegin()->second.journal_tail;
+ segment_provider.update_journal_tail_committed(journal_tail);
+ auto replay_from = journal_tail.offset;
+ auto from = segments.begin();
+ if (replay_from != P_ADDR_NULL) {
+ from = std::find_if(
+ segments.begin(),
+ segments.end(),
+ [&replay_from](const auto &seg) -> bool {
+ auto& seg_addr = replay_from.as_seg_paddr();
+ return seg.first == seg_addr.get_segment_id();
+ });
+ if (from->second.journal_segment_seq != journal_tail.segment_seq) {
+ ERROR("journal_tail {} does not match {}",
+ journal_tail, from->second);
+ ceph_abort();
+ }
+ } else {
+ replay_from = paddr_t::make_seg_paddr(
+ from->first,
+ journal_segment_manager.get_block_size());
+ }
+
+ auto num_segments = segments.end() - from;
+ INFO("{} segments to replay, from {}",
+ num_segments, replay_from);
+ auto ret = replay_segments_t(num_segments);
+ std::transform(
+ from, segments.end(), ret.begin(),
+ [this](const auto &p) {
+ auto ret = journal_seq_t{
+ p.second.journal_segment_seq,
+ paddr_t::make_seg_paddr(
+ p.first,
+ journal_segment_manager.get_block_size())
+ };
+ return std::make_pair(ret, p.second);
+ });
+ ret[0].first.offset = replay_from;
+ return prep_replay_segments_fut(
+ prep_replay_segments_ertr::ready_future_marker{},
+ std::move(ret));
+}
+
+SegmentedJournal::replay_ertr::future<>
+SegmentedJournal::replay_segment(
+ journal_seq_t seq,
+ segment_header_t header,
+ delta_handler_t &handler)
+{
+ LOG_PREFIX(Journal::replay_segment);
+ INFO("starting at {} -- {}", seq, header);
+ return seastar::do_with(
+ scan_valid_records_cursor(seq),
+ ExtentReader::found_record_handler_t([=, &handler](
+ record_locator_t locator,
+ const record_group_header_t& header,
+ const bufferlist& mdbuf)
+ -> ExtentReader::scan_valid_records_ertr::future<>
+ {
+ auto maybe_record_deltas_list = try_decode_deltas(
+ header, mdbuf, locator.record_block_base);
+ if (!maybe_record_deltas_list) {
+ // This should be impossible; we already checked the crc on the mdbuf
+ ERROR("unable to decode deltas for record {} at {}",
+ header, locator);
+ return crimson::ct_error::input_output_error::make();
+ }
+
+ return seastar::do_with(
+ std::move(*maybe_record_deltas_list),
+ [write_result=locator.write_result,
+ this,
+ FNAME,
+ &handler](auto& record_deltas_list)
+ {
+ return crimson::do_for_each(
+ record_deltas_list,
+ [write_result,
+ this,
+ FNAME,
+ &handler](record_deltas_t& record_deltas)
+ {
+ auto locator = record_locator_t{
+ record_deltas.record_block_base,
+ write_result
+ };
+ DEBUG("processing {} deltas at block_base {}",
+ record_deltas.deltas.size(),
+ locator);
+ return crimson::do_for_each(
+ record_deltas.deltas,
+ [locator,
+ this,
+ FNAME,
+ &handler](delta_info_t& delta)
+ {
+ /* The journal may validly contain deltas for extents in segments
+ * that have since been released. We can detect those cases by
+ * checking whether the segment in question currently has a
+ * sequence number > the current journal segment seq. We can
+ * safely skip these deltas because the extent must already
+ * have been rewritten.
+ */
+ if (delta.paddr != P_ADDR_NULL) {
+ auto& seg_addr = delta.paddr.as_seg_paddr();
+ auto delta_paddr_segment_seq = segment_provider.get_seq(seg_addr.get_segment_id());
+ auto delta_paddr_segment_type = segment_seq_to_type(delta_paddr_segment_seq);
+ auto locator_segment_seq = locator.write_result.start_seq.segment_seq;
+ if (delta_paddr_segment_type == segment_type_t::NULL_SEG ||
+ (delta_paddr_segment_type == segment_type_t::JOURNAL &&
+ delta_paddr_segment_seq > locator_segment_seq)) {
+ SUBDEBUG(seastore_cache,
+ "delta is obsolete, delta_paddr_segment_seq={}, locator_segment_seq={} -- {}",
+ segment_seq_printer_t{delta_paddr_segment_seq},
+ segment_seq_printer_t{locator_segment_seq},
+ delta);
+ return replay_ertr::now();
+ }
+ }
+ return handler(locator, delta);
+ });
+ });
+ });
+ }),
+ [=](auto &cursor, auto &dhandler) {
+ return scanner.scan_valid_records(
+ cursor,
+ header.segment_nonce,
+ std::numeric_limits<size_t>::max(),
+ dhandler).safe_then([](auto){}
+ ).handle_error(
+ replay_ertr::pass_further{},
+ crimson::ct_error::assert_all{
+ "shouldn't meet with any other error other replay_ertr"
+ }
+ );
+ }
+ );
+}
+
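+// Scan every segment on the journal device, read its header through the
+// ExtentReader, and collect the ones whose type is JOURNAL. Segments that
+// cannot be read because they are empty or unwritten (enoent/enodata) are
+// skipped; other I/O errors are passed further.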
+SegmentedJournal::find_journal_segments_ret
+SegmentedJournal::find_journal_segments()
+{
+ return seastar::do_with(
+ find_journal_segments_ret_bare{},
+ [this](auto &ret) -> find_journal_segments_ret {
+ return crimson::do_for_each(
+ boost::counting_iterator<device_segment_id_t>(0),
+ boost::counting_iterator<device_segment_id_t>(
+ journal_segment_manager.get_num_segments()),
+ [this, &ret](device_segment_id_t d_segment_id) {
+ segment_id_t segment_id{
+ journal_segment_manager.get_device_id(),
+ d_segment_id};
+ return scanner.read_segment_header(
+ segment_id
+ ).safe_then([segment_id, &ret](auto &&header) {
+ if (header.get_type() == segment_type_t::JOURNAL) {
+ ret.emplace_back(std::make_pair(segment_id, std::move(header)));
+ }
+ }).handle_error(
+ crimson::ct_error::enoent::handle([](auto) {
+ return find_journal_segments_ertr::now();
+ }),
+ crimson::ct_error::enodata::handle([](auto) {
+ return find_journal_segments_ertr::now();
+ }),
+ crimson::ct_error::input_output_error::pass_further{}
+ );
+ }).safe_then([&ret]() mutable {
+ return find_journal_segments_ret{
+ find_journal_segments_ertr::ready_future_marker{},
+ std::move(ret)};
+ });
+ });
+}
+
+SegmentedJournal::replay_ret SegmentedJournal::replay(
+ delta_handler_t &&delta_handler)
+{
+ LOG_PREFIX(Journal::replay);
+ return find_journal_segments(
+ ).safe_then([this, FNAME, delta_handler=std::move(delta_handler)]
+ (auto &&segment_headers) mutable -> replay_ret {
+ INFO("got {} segments", segment_headers.size());
+ return seastar::do_with(
+ std::move(delta_handler), replay_segments_t(),
+ [this, segment_headers=std::move(segment_headers)]
+ (auto &handler, auto &segments) mutable -> replay_ret {
+ return prep_replay_segments(std::move(segment_headers)
+ ).safe_then([this, &handler, &segments](auto replay_segs) mutable {
+ segments = std::move(replay_segs);
+ return crimson::do_for_each(segments, [this, &handler](auto i) mutable {
+ return replay_segment(i.first, i.second, handler);
+ });
+ });
+ });
+ });
+}
+
+void SegmentedJournal::register_metrics()
+{
+ LOG_PREFIX(Journal::register_metrics);
+ DEBUG("");
+ record_submitter.reset_stats();
+ namespace sm = seastar::metrics;
+ metrics.add_group(
+ "journal",
+ {
+ sm::make_counter(
+ "record_num",
+ [this] {
+ return record_submitter.get_record_batch_stats().num_io;
+ },
+ sm::description("total number of records submitted")
+ ),
+ sm::make_counter(
+ "record_batch_num",
+ [this] {
+ return record_submitter.get_record_batch_stats().num_io_grouped;
+ },
+ sm::description("total number of records batched")
+ ),
+ sm::make_counter(
+ "io_num",
+ [this] {
+ return record_submitter.get_io_depth_stats().num_io;
+ },
+ sm::description("total number of io submitted")
+ ),
+ sm::make_counter(
+ "io_depth_num",
+ [this] {
+ return record_submitter.get_io_depth_stats().num_io_grouped;
+ },
+ sm::description("total number of io depth")
+ ),
+ sm::make_counter(
+ "record_group_padding_bytes",
+ [this] {
+ return record_submitter.get_record_group_padding_bytes();
+ },
+ sm::description("bytes of metadata padding when write record groups")
+ ),
+ sm::make_counter(
+ "record_group_metadata_bytes",
+ [this] {
+ return record_submitter.get_record_group_metadata_bytes();
+ },
+ sm::description("bytes of raw metadata when write record groups")
+ ),
+ sm::make_counter(
+ "record_group_data_bytes",
+ [this] {
+ return record_submitter.get_record_group_data_bytes();
+ },
+ sm::description("bytes of data when write record groups")
+ ),
+ }
+ );
+}
+
+SegmentedJournal::JournalSegmentManager::JournalSegmentManager(
+ SegmentManager& segment_manager,
+ SegmentProvider& segment_provider)
+ : segment_provider{segment_provider}, segment_manager{segment_manager}
+{
+ reset();
+}
+
+SegmentedJournal::JournalSegmentManager::open_ret
+SegmentedJournal::JournalSegmentManager::open()
+{
+ return roll().safe_then([this] {
+ return get_current_write_seq();
+ });
+}
+
+SegmentedJournal::JournalSegmentManager::close_ertr::future<>
+SegmentedJournal::JournalSegmentManager::close()
+{
+ LOG_PREFIX(JournalSegmentManager::close);
+ if (current_journal_segment) {
+ INFO("segment_id={}, seq={}, "
+ "written_to={}, committed_to={}, nonce={}",
+ current_journal_segment->get_segment_id(),
+ get_segment_seq(),
+ written_to,
+ committed_to,
+ current_segment_nonce);
+ } else {
+ INFO("no current journal segment");
+ }
+
+ return (
+ current_journal_segment ?
+ current_journal_segment->close() :
+ Segment::close_ertr::now()
+ ).handle_error(
+ close_ertr::pass_further{},
+ crimson::ct_error::assert_all{
+ "Invalid error in JournalSegmentManager::close()"
+ }
+ ).finally([this] {
+ reset();
+ });
+}
+
+SegmentedJournal::JournalSegmentManager::roll_ertr::future<>
+SegmentedJournal::JournalSegmentManager::roll()
+{
+ LOG_PREFIX(JournalSegmentManager::roll);
+ auto old_segment_id = current_journal_segment ?
+ current_journal_segment->get_segment_id() :
+ NULL_SEG_ID;
+ if (current_journal_segment) {
+ INFO("closing segment {}, seq={}, "
+ "written_to={}, committed_to={}, nonce={}",
+ old_segment_id,
+ get_segment_seq(),
+ written_to,
+ committed_to,
+ current_segment_nonce);
+ }
+
+ return (
+ current_journal_segment ?
+ current_journal_segment->close() :
+ Segment::close_ertr::now()
+ ).safe_then([this] {
+ return segment_provider.get_segment(
+ get_device_id(), next_journal_segment_seq);
+ }).safe_then([this](auto segment) {
+ return segment_manager.open(segment);
+ }).safe_then([this](auto sref) {
+ current_journal_segment = sref;
+ return initialize_segment(*current_journal_segment);
+ }).safe_then([this, old_segment_id] {
+ if (old_segment_id != NULL_SEG_ID) {
+ segment_provider.close_segment(old_segment_id);
+ }
+ }).handle_error(
+ roll_ertr::pass_further{},
+ crimson::ct_error::assert_all{
+ "Invalid error in JournalSegmentManager::roll"
+ }
+ );
+}
+
+SegmentedJournal::JournalSegmentManager::write_ret
+SegmentedJournal::JournalSegmentManager::write(ceph::bufferlist to_write)
+{
+ LOG_PREFIX(JournalSegmentManager::write);
+ auto write_length = to_write.length();
+ auto write_start_seq = get_current_write_seq();
+ TRACE("{}~{}", write_start_seq, write_length);
+ assert(write_length > 0);
+ assert((write_length % segment_manager.get_block_size()) == 0);
+ assert(!needs_roll(write_length));
+
+ auto write_start_offset = written_to;
+ written_to += write_length;
+ auto write_result = write_result_t{
+ write_start_seq,
+ static_cast<seastore_off_t>(write_length)
+ };
+ return current_journal_segment->write(
+ write_start_offset, to_write
+ ).handle_error(
+ write_ertr::pass_further{},
+ crimson::ct_error::assert_all{
+ "Invalid error in JournalSegmentManager::write"
+ }
+ ).safe_then([write_result] {
+ return write_result;
+ });
+}
+
+void SegmentedJournal::JournalSegmentManager::mark_committed(
+ const journal_seq_t& new_committed_to)
+{
+ LOG_PREFIX(JournalSegmentManager::mark_committed);
+ TRACE("{} => {}", committed_to, new_committed_to);
+ assert(committed_to == journal_seq_t() ||
+ committed_to <= new_committed_to);
+ committed_to = new_committed_to;
+}
+
+SegmentedJournal::JournalSegmentManager::initialize_segment_ertr::future<>
+SegmentedJournal::JournalSegmentManager::initialize_segment(Segment& segment)
+{
+ LOG_PREFIX(JournalSegmentManager::initialize_segment);
+ auto new_tail = segment_provider.get_journal_tail_target();
+ // write out header
+ ceph_assert(segment.get_write_ptr() == 0);
+ bufferlist bl;
+
+ segment_seq_t seq = next_journal_segment_seq++;
+ current_segment_nonce = generate_nonce(
+ seq, segment_manager.get_meta());
+ auto header = segment_header_t{
+ seq,
+ segment.get_segment_id(),
+ new_tail,
+ current_segment_nonce};
+ INFO("writing {} ...", header);
+ ceph_assert(header.get_type() == segment_type_t::JOURNAL);
+ encode(header, bl);
+
+ bufferptr bp(
+ ceph::buffer::create_page_aligned(
+ segment_manager.get_block_size()));
+ bp.zero();
+ auto iter = bl.cbegin();
+ iter.copy(bl.length(), bp.c_str());
+ bl.clear();
+ bl.append(bp);
+
+ written_to = 0;
+ return write(bl
+ ).safe_then([this, new_tail](auto) {
+ segment_provider.update_journal_tail_committed(new_tail);
+ });
+}
+
+SegmentedJournal::RecordBatch::add_pending_ret
+SegmentedJournal::RecordBatch::add_pending(
+ record_t&& record,
+ OrderingHandle& handle,
+ extent_len_t block_size)
+{
+ LOG_PREFIX(RecordBatch::add_pending);
+ auto new_size = get_encoded_length_after(record, block_size);
+ auto dlength_offset = pending.size.dlength;
+ TRACE("H{} batches={}, write_size={}, dlength_offset={} ...",
+ (void*)&handle,
+ pending.get_size() + 1,
+ new_size.get_encoded_length(),
+ dlength_offset);
+ assert(state != state_t::SUBMITTING);
+ assert(can_batch(record, block_size).value() == new_size);
+
+ pending.push_back(
+ std::move(record), block_size);
+ assert(pending.size == new_size);
+ if (state == state_t::EMPTY) {
+ assert(!io_promise.has_value());
+ io_promise = seastar::shared_promise<maybe_promise_result_t>();
+ } else {
+ assert(io_promise.has_value());
+ }
+ state = state_t::PENDING;
+
+ return io_promise->get_shared_future(
+ ).then([dlength_offset, FNAME, &handle
+ ](auto maybe_promise_result) -> add_pending_ret {
+ if (!maybe_promise_result.has_value()) {
+ ERROR("H{} write failed", (void*)&handle);
+ return crimson::ct_error::input_output_error::make();
+ }
+ auto write_result = maybe_promise_result->write_result;
+ auto submit_result = record_locator_t{
+ write_result.start_seq.offset.add_offset(
+ maybe_promise_result->mdlength + dlength_offset),
+ write_result
+ };
+ TRACE("H{} write finish with {}", (void*)&handle, submit_result);
+ return add_pending_ret(
+ add_pending_ertr::ready_future_marker{},
+ submit_result);
+ });
+}
+
+std::pair<ceph::bufferlist, record_group_size_t>
+SegmentedJournal::RecordBatch::encode_batch(
+ const journal_seq_t& committed_to,
+ segment_nonce_t segment_nonce)
+{
+ assert(state == state_t::PENDING);
+ assert(pending.get_size() > 0);
+ assert(io_promise.has_value());
+
+ state = state_t::SUBMITTING;
+ submitting_size = pending.get_size();
+ auto gsize = pending.size;
+ submitting_length = gsize.get_encoded_length();
+ submitting_mdlength = gsize.get_mdlength();
+ auto bl = encode_records(pending, committed_to, segment_nonce);
+ // Note: pending is cleared here
+ assert(bl.length() == (std::size_t)submitting_length);
+ return std::make_pair(bl, gsize);
+}
+
+void SegmentedJournal::RecordBatch::set_result(
+ maybe_result_t maybe_write_result)
+{
+ maybe_promise_result_t result;
+ if (maybe_write_result.has_value()) {
+ assert(maybe_write_result->length == submitting_length);
+ result = promise_result_t{
+ *maybe_write_result,
+ submitting_mdlength
+ };
+ }
+ assert(state == state_t::SUBMITTING);
+ assert(io_promise.has_value());
+
+ state = state_t::EMPTY;
+ submitting_size = 0;
+ submitting_length = 0;
+ submitting_mdlength = 0;
+ io_promise->set_value(result);
+ io_promise.reset();
+}
+
+std::pair<ceph::bufferlist, record_group_size_t>
+SegmentedJournal::RecordBatch::submit_pending_fast(
+ record_t&& record,
+ extent_len_t block_size,
+ const journal_seq_t& committed_to,
+ segment_nonce_t segment_nonce)
+{
+ auto new_size = get_encoded_length_after(record, block_size);
+ std::ignore = new_size;
+ assert(state == state_t::EMPTY);
+ assert(can_batch(record, block_size).value() == new_size);
+
+ auto group = record_group_t(std::move(record), block_size);
+ auto size = group.size;
+ assert(size == new_size);
+ auto bl = encode_records(group, committed_to, segment_nonce);
+ assert(bl.length() == size.get_encoded_length());
+ return std::make_pair(bl, size);
+}
+
+SegmentedJournal::RecordSubmitter::RecordSubmitter(
+ std::size_t io_depth,
+ std::size_t batch_capacity,
+ std::size_t batch_flush_size,
+ double preferred_fullness,
+ JournalSegmentManager& jsm)
+ : io_depth_limit{io_depth},
+ preferred_fullness{preferred_fullness},
+ journal_segment_manager{jsm},
+ batches(new RecordBatch[io_depth + 1])
+{
+ LOG_PREFIX(RecordSubmitter);
+ INFO("Journal::RecordSubmitter: io_depth_limit={}, "
+ "batch_capacity={}, batch_flush_size={}, "
+ "preferred_fullness={}",
+ io_depth, batch_capacity,
+ batch_flush_size, preferred_fullness);
+ ceph_assert(io_depth > 0);
+ ceph_assert(batch_capacity > 0);
+ ceph_assert(preferred_fullness >= 0 &&
+ preferred_fullness <= 1);
+ free_batch_ptrs.reserve(io_depth + 1);
+ for (std::size_t i = 0; i <= io_depth; ++i) {
+ batches[i].initialize(i, batch_capacity, batch_flush_size);
+ free_batch_ptrs.push_back(&batches[i]);
+ }
+ pop_free_batch();
+}
+
+SegmentedJournal::RecordSubmitter::submit_ret
+SegmentedJournal::RecordSubmitter::submit(
+ record_t&& record,
+ OrderingHandle& handle)
+{
+ LOG_PREFIX(RecordSubmitter::submit);
+ DEBUG("H{} {} start ...", (void*)&handle, record);
+ assert(write_pipeline);
+ auto expected_size = record_group_size_t(
+ record.size,
+ journal_segment_manager.get_block_size()
+ ).get_encoded_length();
+ auto max_record_length = journal_segment_manager.get_max_write_length();
+ if (expected_size > max_record_length) {
+ ERROR("H{} {} exceeds max record size {}",
+ (void*)&handle, record, max_record_length);
+ return crimson::ct_error::erange::make();
+ }
+
+ return do_submit(std::move(record), handle);
+}
+
+seastar::future<> SegmentedJournal::RecordSubmitter::flush(OrderingHandle &handle)
+{
+ LOG_PREFIX(RecordSubmitter::flush);
+ DEBUG("H{} flush", (void*)&handle);
+ return handle.enter(write_pipeline->device_submission
+ ).then([this, &handle] {
+ return handle.enter(write_pipeline->finalize);
+ }).then([FNAME, &handle] {
+ DEBUG("H{} flush done", (void*)&handle);
+ });
+}
+
+void SegmentedJournal::RecordSubmitter::update_state()
+{
+ if (num_outstanding_io == 0) {
+ state = state_t::IDLE;
+ } else if (num_outstanding_io < io_depth_limit) {
+ state = state_t::PENDING;
+ } else if (num_outstanding_io == io_depth_limit) {
+ state = state_t::FULL;
+ } else {
+ ceph_abort("fatal error: io-depth overflow");
+ }
+}
+
+void SegmentedJournal::RecordSubmitter::decrement_io_with_flush()
+{
+ LOG_PREFIX(RecordSubmitter::decrement_io_with_flush);
+ assert(num_outstanding_io > 0);
+ --num_outstanding_io;
+#ifndef NDEBUG
+ auto prv_state = state;
+#endif
+ update_state();
+
+ if (wait_submit_promise.has_value()) {
+ DEBUG("wait resolved");
+ assert(prv_state == state_t::FULL);
+ wait_submit_promise->set_value();
+ wait_submit_promise.reset();
+ }
+
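+  // An IO slot was just freed, so push out whatever has accumulated in the
+  // current batch.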
+ if (!p_current_batch->is_empty()) {
+ TRACE("flush");
+ flush_current_batch();
+ }
+}
+
+void SegmentedJournal::RecordSubmitter::account_submission(
+ std::size_t num,
+ const record_group_size_t& size)
+{
+ stats.record_group_padding_bytes +=
+ (size.get_mdlength() - size.get_raw_mdlength());
+ stats.record_group_metadata_bytes += size.get_raw_mdlength();
+ stats.record_group_data_bytes += size.dlength;
+}
+
+void SegmentedJournal::RecordSubmitter::finish_submit_batch(
+ RecordBatch* p_batch,
+ maybe_result_t maybe_result)
+{
+ assert(p_batch->is_submitting());
+ p_batch->set_result(maybe_result);
+ free_batch_ptrs.push_back(p_batch);
+ decrement_io_with_flush();
+}
+
+void SegmentedJournal::RecordSubmitter::flush_current_batch()
+{
+ LOG_PREFIX(RecordSubmitter::flush_current_batch);
+ RecordBatch* p_batch = p_current_batch;
+ assert(p_batch->is_pending());
+ p_current_batch = nullptr;
+ pop_free_batch();
+
+ increment_io();
+ auto num = p_batch->get_num_records();
+ auto committed_to = journal_segment_manager.get_committed_to();
+ auto [to_write, sizes] = p_batch->encode_batch(
+ committed_to, journal_segment_manager.get_nonce());
+ DEBUG("{} records, {}, committed_to={}, outstanding_io={} ...",
+ num, sizes, committed_to, num_outstanding_io);
+ account_submission(num, sizes);
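+  // The write future is intentionally detached; waiters on this batch are
+  // resolved through finish_submit_batch() -> set_result() instead.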
+ std::ignore = journal_segment_manager.write(to_write
+ ).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) {
+ TRACE("{} records, {}, write done with {}", num, sizes, write_result);
+ finish_submit_batch(p_batch, write_result);
+ }).handle_error(
+ crimson::ct_error::all_same_way([this, p_batch, FNAME, num, sizes=sizes](auto e) {
+ ERROR("{} records, {}, got error {}", num, sizes, e);
+ finish_submit_batch(p_batch, std::nullopt);
+ })
+ ).handle_exception([this, p_batch, FNAME, num, sizes=sizes](auto e) {
+ ERROR("{} records, {}, got exception {}", num, sizes, e);
+ finish_submit_batch(p_batch, std::nullopt);
+ });
+}
+
+SegmentedJournal::RecordSubmitter::submit_pending_ret
+SegmentedJournal::RecordSubmitter::submit_pending(
+ record_t&& record,
+ OrderingHandle& handle,
+ bool flush)
+{
+ LOG_PREFIX(RecordSubmitter::submit_pending);
+ assert(!p_current_batch->is_submitting());
+ stats.record_batch_stats.increment(
+ p_current_batch->get_num_records() + 1);
+ bool do_flush = (flush || state == state_t::IDLE);
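+  // When IDLE there is no outstanding IO whose completion would flush the
+  // batch later (see decrement_io_with_flush), so write it out immediately.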
+ auto write_fut = [this, do_flush, FNAME, record=std::move(record), &handle]() mutable {
+ if (do_flush && p_current_batch->is_empty()) {
+ // fast path with direct write
+ increment_io();
+ auto committed_to = journal_segment_manager.get_committed_to();
+ auto [to_write, sizes] = p_current_batch->submit_pending_fast(
+ std::move(record),
+ journal_segment_manager.get_block_size(),
+ committed_to,
+ journal_segment_manager.get_nonce());
+ DEBUG("H{} fast submit {}, committed_to={}, outstanding_io={} ...",
+ (void*)&handle, sizes, committed_to, num_outstanding_io);
+ account_submission(1, sizes);
+ return journal_segment_manager.write(to_write
+ ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
+ return record_locator_t{
+ write_result.start_seq.offset.add_offset(mdlength),
+ write_result
+ };
+ }).finally([this] {
+ decrement_io_with_flush();
+ });
+ } else {
+ // indirect write with or without the existing pending records
+ auto write_fut = p_current_batch->add_pending(
+ std::move(record),
+ handle,
+ journal_segment_manager.get_block_size());
+ if (do_flush) {
+ DEBUG("H{} added pending and flush", (void*)&handle);
+ flush_current_batch();
+ } else {
+ DEBUG("H{} added with {} pending",
+ (void*)&handle, p_current_batch->get_num_records());
+ }
+ return write_fut;
+ }
+ }();
+ return handle.enter(write_pipeline->device_submission
+ ).then([write_fut=std::move(write_fut)]() mutable {
+ return std::move(write_fut);
+ }).safe_then([this, FNAME, &handle](auto submit_result) {
+ return handle.enter(write_pipeline->finalize
+ ).then([this, FNAME, submit_result, &handle] {
+ DEBUG("H{} finish with {}", (void*)&handle, submit_result);
+ journal_segment_manager.mark_committed(
+ submit_result.write_result.get_end_seq());
+ return submit_result;
+ });
+ });
+}
+
+SegmentedJournal::RecordSubmitter::do_submit_ret
+SegmentedJournal::RecordSubmitter::do_submit(
+ record_t&& record,
+ OrderingHandle& handle)
+{
+ LOG_PREFIX(RecordSubmitter::do_submit);
+ TRACE("H{} outstanding_io={}/{} ...",
+ (void*)&handle, num_outstanding_io, io_depth_limit);
+ assert(!p_current_batch->is_submitting());
+ if (state <= state_t::PENDING) {
+ // can increment io depth
+ assert(!wait_submit_promise.has_value());
+ auto maybe_new_size = p_current_batch->can_batch(
+ record, journal_segment_manager.get_block_size());
+ if (!maybe_new_size.has_value() ||
+ (maybe_new_size->get_encoded_length() >
+ journal_segment_manager.get_max_write_length())) {
+ TRACE("H{} flush", (void*)&handle);
+ assert(p_current_batch->is_pending());
+ flush_current_batch();
+ return do_submit(std::move(record), handle);
+ } else if (journal_segment_manager.needs_roll(
+ maybe_new_size->get_encoded_length())) {
+ if (p_current_batch->is_pending()) {
+ TRACE("H{} flush and roll", (void*)&handle);
+ flush_current_batch();
+ } else {
+ TRACE("H{} roll", (void*)&handle);
+ }
+ return journal_segment_manager.roll(
+ ).safe_then([this, record=std::move(record), &handle]() mutable {
+ return do_submit(std::move(record), handle);
+ });
+ } else {
+      bool flush = (maybe_new_size->get_fullness() > preferred_fullness);
+ return submit_pending(std::move(record), handle, flush);
+ }
+ }
+
+ assert(state == state_t::FULL);
+ // cannot increment io depth
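+  // Either the record still fits into the current batch (keep batching), or
+  // we must wait for an outstanding write to complete before flushing/rolling.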
+ auto maybe_new_size = p_current_batch->can_batch(
+ record, journal_segment_manager.get_block_size());
+ if (!maybe_new_size.has_value() ||
+ (maybe_new_size->get_encoded_length() >
+ journal_segment_manager.get_max_write_length()) ||
+ journal_segment_manager.needs_roll(
+ maybe_new_size->get_encoded_length())) {
+ if (!wait_submit_promise.has_value()) {
+ wait_submit_promise = seastar::promise<>();
+ }
+ DEBUG("H{} wait ...", (void*)&handle);
+ return wait_submit_promise->get_future(
+ ).then([this, record=std::move(record), &handle]() mutable {
+ return do_submit(std::move(record), handle);
+ });
+ } else {
+ return submit_pending(std::move(record), handle, false);
+ }
+}
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <boost/intrusive_ptr.hpp>
+#include <optional>
+
+#include <seastar/core/circular_buffer.hh>
+#include <seastar/core/future.hh>
+#include <seastar/core/metrics.hh>
+#include <seastar/core/shared_future.hh>
+
+#include "include/ceph_assert.h"
+#include "include/buffer.h"
+#include "include/denc.h"
+
+#include "crimson/os/seastore/segment_cleaner.h"
+#include "crimson/os/seastore/journal.h"
+#include "crimson/os/seastore/extent_reader.h"
+#include "crimson/os/seastore/segment_manager.h"
+#include "crimson/os/seastore/ordering_handle.h"
+#include "crimson/os/seastore/seastore_types.h"
+#include "crimson/osd/exceptions.h"
+
+namespace crimson::os::seastore::journal {
+
+/**
+ * Manages a stream of atomically written records to a SegmentManager.
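+ *
+ * Records are batched by RecordSubmitter and written into journal segments
+ * that are opened, rolled and closed by JournalSegmentManager; both are
+ * defined below.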
+ */
+class SegmentedJournal : public Journal {
+public:
+ SegmentedJournal(
+ SegmentManager &segment_manager,
+ ExtentReader& scanner,
+ SegmentProvider& cleaner);
+ ~SegmentedJournal() {}
+
+ open_for_write_ret open_for_write() final;
+
+ close_ertr::future<> close() final;
+
+ submit_record_ret submit_record(
+ record_t &&record,
+ OrderingHandle &handle
+ ) final {
+ return record_submitter.submit(std::move(record), handle);
+ }
+
+ seastar::future<> flush(OrderingHandle &handle) final {
+ return record_submitter.flush(handle);
+ }
+
+ replay_ret replay(delta_handler_t &&delta_handler) final;
+
+ void set_write_pipeline(WritePipeline* write_pipeline) final {
+ record_submitter.set_write_pipeline(write_pipeline);
+ }
+
+private:
+ class JournalSegmentManager {
+ public:
+ JournalSegmentManager(SegmentManager&, SegmentProvider&);
+
+ using base_ertr = crimson::errorator<
+ crimson::ct_error::input_output_error>;
+ extent_len_t get_max_write_length() const {
+ return segment_manager.get_segment_size() -
+ p2align(ceph::encoded_sizeof_bounded<segment_header_t>(),
+ size_t(segment_manager.get_block_size()));
+ }
+
+ device_id_t get_device_id() const {
+ return segment_manager.get_device_id();
+ }
+
+ device_segment_id_t get_num_segments() const {
+ return segment_manager.get_num_segments();
+ }
+
+ seastore_off_t get_block_size() const {
+ return segment_manager.get_block_size();
+ }
+
+ segment_nonce_t get_nonce() const {
+ return current_segment_nonce;
+ }
+
+ journal_seq_t get_committed_to() const {
+ return committed_to;
+ }
+
+ segment_seq_t get_segment_seq() const {
+ return next_journal_segment_seq - 1;
+ }
+
+ void set_segment_seq(segment_seq_t current_seq) {
+ next_journal_segment_seq = (current_seq + 1);
+ }
+
+ using open_ertr = base_ertr;
+ using open_ret = open_ertr::future<journal_seq_t>;
+ open_ret open();
+
+ using close_ertr = base_ertr;
+ close_ertr::future<> close();
+
+ // returns true iff the current segment has insufficient space
+ bool needs_roll(std::size_t length) const {
+ auto write_capacity = current_journal_segment->get_write_capacity();
+ return length + written_to > std::size_t(write_capacity);
+ }
+
+ // close the current segment and initialize next one
+ using roll_ertr = base_ertr;
+ roll_ertr::future<> roll();
+
+ // write the buffer, return the write result
+    // May be called concurrently; writes may complete in any order.
+ using write_ertr = base_ertr;
+ using write_ret = write_ertr::future<write_result_t>;
+ write_ret write(ceph::bufferlist to_write);
+
+ // mark write committed in order
+ void mark_committed(const journal_seq_t& new_committed_to);
+
+ private:
+ journal_seq_t get_current_write_seq() const {
+ assert(current_journal_segment);
+ return journal_seq_t{
+ get_segment_seq(),
+ paddr_t::make_seg_paddr(current_journal_segment->get_segment_id(),
+ written_to)
+ };
+ }
+
+ void reset() {
+ next_journal_segment_seq = 0;
+ current_segment_nonce = 0;
+ current_journal_segment.reset();
+ written_to = 0;
+ committed_to = {};
+ }
+
+    // prepare segment for writes; writes out the segment header
+ using initialize_segment_ertr = base_ertr;
+ initialize_segment_ertr::future<> initialize_segment(Segment&);
+
+ SegmentProvider& segment_provider;
+ SegmentManager& segment_manager;
+
+ segment_seq_t next_journal_segment_seq;
+ segment_nonce_t current_segment_nonce;
+
+ SegmentRef current_journal_segment;
+ seastore_off_t written_to;
+ // committed_to may be in a previous journal segment
+ journal_seq_t committed_to;
+ };
+
+ class RecordBatch {
+ enum class state_t {
+ EMPTY = 0,
+ PENDING,
+ SUBMITTING
+ };
+
+ public:
+ RecordBatch() = default;
+ RecordBatch(RecordBatch&&) = delete;
+ RecordBatch(const RecordBatch&) = delete;
+ RecordBatch& operator=(RecordBatch&&) = delete;
+ RecordBatch& operator=(const RecordBatch&) = delete;
+
+ bool is_empty() const {
+ return state == state_t::EMPTY;
+ }
+
+ bool is_pending() const {
+ return state == state_t::PENDING;
+ }
+
+ bool is_submitting() const {
+ return state == state_t::SUBMITTING;
+ }
+
+ std::size_t get_index() const {
+ return index;
+ }
+
+ std::size_t get_num_records() const {
+ return pending.get_size();
+ }
+
+    // return the expected write sizes if the record can be batched,
+    // otherwise return nullopt
+ std::optional<record_group_size_t> can_batch(
+ const record_t& record,
+ extent_len_t block_size) const {
+ assert(state != state_t::SUBMITTING);
+ if (pending.get_size() >= batch_capacity ||
+ (pending.get_size() > 0 &&
+ pending.size.get_encoded_length() > batch_flush_size)) {
+ assert(state == state_t::PENDING);
+ return std::nullopt;
+ }
+ return get_encoded_length_after(record, block_size);
+ }
+
+ void initialize(std::size_t i,
+ std::size_t _batch_capacity,
+ std::size_t _batch_flush_size) {
+ ceph_assert(_batch_capacity > 0);
+ index = i;
+ batch_capacity = _batch_capacity;
+ batch_flush_size = _batch_flush_size;
+ pending.reserve(batch_capacity);
+ }
+
+    // Add to the batch; the future will be resolved after the batch is
+    // written.
+ //
+ // Set write_result_t::write_length to 0 if the record is not the first one
+ // in the batch.
+ using add_pending_ertr = JournalSegmentManager::write_ertr;
+ using add_pending_ret = add_pending_ertr::future<record_locator_t>;
+ add_pending_ret add_pending(
+ record_t&&,
+ OrderingHandle&,
+ extent_len_t block_size);
+
+ // Encode the batched records for write.
+ std::pair<ceph::bufferlist, record_group_size_t> encode_batch(
+ const journal_seq_t& committed_to,
+ segment_nonce_t segment_nonce);
+
+ // Set the write result and reset for reuse
+ using maybe_result_t = std::optional<write_result_t>;
+ void set_result(maybe_result_t maybe_write_end_seq);
+
+    // The fast path that is equivalent to submitting a single record as a batch.
+ //
+ // Essentially, equivalent to the combined logic of:
+ // add_pending(), encode_batch() and set_result() above without
+ // the intervention of the shared io_promise.
+ //
+ // Note the current RecordBatch can be reused afterwards.
+ std::pair<ceph::bufferlist, record_group_size_t> submit_pending_fast(
+ record_t&&,
+ extent_len_t block_size,
+ const journal_seq_t& committed_to,
+ segment_nonce_t segment_nonce);
+
+ private:
+ record_group_size_t get_encoded_length_after(
+ const record_t& record,
+ extent_len_t block_size) const {
+ return pending.size.get_encoded_length_after(
+ record.size, block_size);
+ }
+
+ state_t state = state_t::EMPTY;
+ std::size_t index = 0;
+ std::size_t batch_capacity = 0;
+ std::size_t batch_flush_size = 0;
+
+ record_group_t pending;
+ std::size_t submitting_size = 0;
+ seastore_off_t submitting_length = 0;
+ seastore_off_t submitting_mdlength = 0;
+
+ struct promise_result_t {
+ write_result_t write_result;
+ seastore_off_t mdlength;
+ };
+ using maybe_promise_result_t = std::optional<promise_result_t>;
+ std::optional<seastar::shared_promise<maybe_promise_result_t> > io_promise;
+ };
+
+ class RecordSubmitter {
+ enum class state_t {
+ IDLE = 0, // outstanding_io == 0
+ PENDING, // outstanding_io < io_depth_limit
+ FULL // outstanding_io == io_depth_limit
+ // OVERFLOW: outstanding_io > io_depth_limit is impossible
+ };
+
+ struct grouped_io_stats {
+ uint64_t num_io = 0;
+ uint64_t num_io_grouped = 0;
+
+ void increment(uint64_t num_grouped_io) {
+ ++num_io;
+ num_io_grouped += num_grouped_io;
+ }
+ };
+
+ public:
+ RecordSubmitter(std::size_t io_depth,
+ std::size_t batch_capacity,
+ std::size_t batch_flush_size,
+ double preferred_fullness,
+ JournalSegmentManager&);
+
+ grouped_io_stats get_record_batch_stats() const {
+ return stats.record_batch_stats;
+ }
+
+ grouped_io_stats get_io_depth_stats() const {
+ return stats.io_depth_stats;
+ }
+
+ uint64_t get_record_group_padding_bytes() const {
+ return stats.record_group_padding_bytes;
+ }
+
+ uint64_t get_record_group_metadata_bytes() const {
+ return stats.record_group_metadata_bytes;
+ }
+
+ uint64_t get_record_group_data_bytes() const {
+ return stats.record_group_data_bytes;
+ }
+
+ void reset_stats() {
+ stats = {};
+ }
+
+ void set_write_pipeline(WritePipeline *_write_pipeline) {
+ write_pipeline = _write_pipeline;
+ }
+
+ using submit_ret = Journal::submit_record_ret;
+ submit_ret submit(record_t&&, OrderingHandle&);
+ seastar::future<> flush(OrderingHandle &handle);
+
+ private:
+ void update_state();
+
+ void increment_io() {
+ ++num_outstanding_io;
+ stats.io_depth_stats.increment(num_outstanding_io);
+ update_state();
+ }
+
+ void decrement_io_with_flush();
+
+ void pop_free_batch() {
+ assert(p_current_batch == nullptr);
+ assert(!free_batch_ptrs.empty());
+ p_current_batch = free_batch_ptrs.front();
+ assert(p_current_batch->is_empty());
+ assert(p_current_batch == &batches[p_current_batch->get_index()]);
+ free_batch_ptrs.pop_front();
+ }
+
+ void account_submission(std::size_t, const record_group_size_t&);
+
+ using maybe_result_t = RecordBatch::maybe_result_t;
+ void finish_submit_batch(RecordBatch*, maybe_result_t);
+
+ void flush_current_batch();
+
+ using submit_pending_ertr = JournalSegmentManager::write_ertr;
+ using submit_pending_ret = submit_pending_ertr::future<
+ record_locator_t>;
+ submit_pending_ret submit_pending(
+ record_t&&, OrderingHandle &handle, bool flush);
+
+ using do_submit_ret = submit_pending_ret;
+ do_submit_ret do_submit(
+ record_t&&, OrderingHandle&);
+
+ state_t state = state_t::IDLE;
+ std::size_t num_outstanding_io = 0;
+ std::size_t io_depth_limit;
+ double preferred_fullness;
+
+ WritePipeline* write_pipeline = nullptr;
+ JournalSegmentManager& journal_segment_manager;
+ std::unique_ptr<RecordBatch[]> batches;
+ std::size_t current_batch_index;
+    // should not be nullptr after construction
+ RecordBatch* p_current_batch = nullptr;
+ seastar::circular_buffer<RecordBatch*> free_batch_ptrs;
+ std::optional<seastar::promise<> > wait_submit_promise;
+
+ struct {
+ grouped_io_stats record_batch_stats;
+ grouped_io_stats io_depth_stats;
+ uint64_t record_group_padding_bytes = 0;
+ uint64_t record_group_metadata_bytes = 0;
+ uint64_t record_group_data_bytes = 0;
+ } stats;
+ };
+
+ SegmentProvider& segment_provider;
+ JournalSegmentManager journal_segment_manager;
+ RecordSubmitter record_submitter;
+ ExtentReader& scanner;
+ seastar::metrics::metric_group metrics;
+
+ /// read journal segment headers from scanner
+ using find_journal_segments_ertr = crimson::errorator<
+ crimson::ct_error::input_output_error>;
+ using find_journal_segments_ret_bare = std::vector<
+ std::pair<segment_id_t, segment_header_t>>;
+ using find_journal_segments_ret = find_journal_segments_ertr::future<
+ find_journal_segments_ret_bare>;
+ find_journal_segments_ret find_journal_segments();
+
+ /// return ordered vector of segments to replay
+ using replay_segments_t = std::vector<
+ std::pair<journal_seq_t, segment_header_t>>;
+ using prep_replay_segments_ertr = crimson::errorator<
+ crimson::ct_error::input_output_error
+ >;
+ using prep_replay_segments_fut = prep_replay_segments_ertr::future<
+ replay_segments_t>;
+ prep_replay_segments_fut prep_replay_segments(
+ std::vector<std::pair<segment_id_t, segment_header_t>> segments);
+
+ /// replays records starting at start through end of segment
+ replay_ertr::future<>
+ replay_segment(
+ journal_seq_t start, ///< [in] starting addr, seq
+ segment_header_t header, ///< [in] segment header
+ delta_handler_t &delta_handler ///< [in] processes deltas in order
+ );
+
+ void register_metrics();
+};
+
+}
std::move(scanner),
false /* detailed */);
- auto journal = std::make_unique<Journal>(*sm, scanner_ref);
+ auto journal = journal::make_segmented(*sm, scanner_ref, *segment_cleaner);
auto epm = std::make_unique<ExtentPlacementManager>();
auto cache = std::make_unique<Cache>(scanner_ref, *epm);
auto lba_manager = lba_manager::create_lba_manager(*sm, *cache);
- journal->set_segment_provider(&*segment_cleaner);
-
auto tm = std::make_unique<TransactionManager>(
*sm,
std::move(segment_cleaner),
});
}
-SegmentCleaner::init_segments_ret SegmentCleaner::init_segments() {
- logger().debug("SegmentCleaner::init_segments: {} segments", segments.size());
- return seastar::do_with(
- std::vector<std::pair<segment_id_t, segment_header_t>>(),
- [this](auto& segment_set) {
- return crimson::do_for_each(
- segments.begin(),
- segments.end(),
- [this, &segment_set](auto& it) {
- auto segment_id = it.first;
- return scanner->read_segment_header(
- segment_id
- ).safe_then([&segment_set, segment_id, this](auto header) {
- logger().debug(
- "ExtentReader::init_segments: segment_id={} -- {}",
+SegmentCleaner::mount_ret SegmentCleaner::mount(
+ device_id_t pdevice_id,
+ std::vector<SegmentManager*>& sms)
+{
+ logger().debug(
+ "SegmentCleaner::mount: {} segment managers", sms.size());
+ init_complete = false;
+ stats = {};
+ journal_tail_target = journal_seq_t{};
+ journal_tail_committed = journal_seq_t{};
+ journal_head = journal_seq_t{};
+ journal_device_id = pdevice_id;
+
+ space_tracker.reset(
+ detailed ?
+ (SpaceTrackerI*)new SpaceTrackerDetailed(
+ sms) :
+ (SpaceTrackerI*)new SpaceTrackerSimple(
+ sms));
+
+ segments.clear();
+ for (auto sm : sms) {
+ // sms is a vector that is indexed by device id and
+ // always has "max_device" elements, some of which
+ // may be null.
+ if (!sm) {
+ continue;
+ }
+ segments.add_segment_manager(*sm);
+ stats.empty_segments += sm->get_num_segments();
+ }
+ metrics.clear();
+ register_metrics();
+
+ logger().debug("SegmentCleaner::mount: {} segments", segments.size());
+ return crimson::do_for_each(
+ segments.begin(),
+ segments.end(),
+ [this](auto& it) {
+ auto segment_id = it.first;
+ return scanner->read_segment_header(
+ segment_id
+ ).safe_then([segment_id, this](auto header) {
+ logger().debug(
+ "ExtentReader::mount: segment_id={} -- {}",
+ segment_id, header);
+ auto s_type = header.get_type();
+ if (s_type == segment_type_t::NULL_SEG) {
+ logger().error(
+ "ExtentReader::mount: got null segment, segment_id={} -- {}",
segment_id, header);
- auto s_type = header.get_type();
- if (s_type == segment_type_t::NULL_SEG) {
- logger().error(
- "ExtentReader::init_segments: got null segment, segment_id={} -- {}",
- segment_id, header);
- ceph_abort();
- }
- if (s_type == segment_type_t::JOURNAL) {
- segment_set.emplace_back(std::make_pair(segment_id, std::move(header)));
- }
- init_mark_segment_closed(
- segment_id,
- header.journal_segment_seq);
- }).handle_error(
- crimson::ct_error::enoent::handle([](auto) {
- return init_segments_ertr::now();
- }),
- crimson::ct_error::enodata::handle([](auto) {
- return init_segments_ertr::now();
- }),
- crimson::ct_error::input_output_error::pass_further{}
- );
- }).safe_then([&segment_set] {
- return seastar::make_ready_future<
- std::vector<std::pair<segment_id_t, segment_header_t>>>(
- std::move(segment_set));
- });
+ ceph_abort();
+ }
+ init_mark_segment_closed(
+ segment_id,
+ header.journal_segment_seq);
+ }).handle_error(
+ crimson::ct_error::enoent::handle([](auto) {
+ return mount_ertr::now();
+ }),
+ crimson::ct_error::enodata::handle([](auto) {
+ return mount_ertr::now();
+ }),
+ crimson::ct_error::input_output_error::pass_further{}
+ );
});
}
#include "crimson/common/log.h"
#include "crimson/os/seastore/cached_extent.h"
-#include "crimson/os/seastore/journal.h"
+#include "crimson/os/seastore/extent_reader.h"
#include "crimson/os/seastore/seastore_types.h"
#include "crimson/os/seastore/segment_manager.h"
#include "crimson/os/seastore/transaction.h"
ExtentReaderRef&& scanner,
bool detailed = false);
- void mount(device_id_t pdevice_id, std::vector<SegmentManager*>& sms) {
- crimson::get_logger(ceph_subsys_seastore_cleaner).debug(
- "SegmentCleaner::mount: {} segment managers", sms.size());
- init_complete = false;
- stats = {};
- journal_tail_target = journal_seq_t{};
- journal_tail_committed = journal_seq_t{};
- journal_head = journal_seq_t{};
- journal_device_id = pdevice_id;
-
- space_tracker.reset(
- detailed ?
- (SpaceTrackerI*)new SpaceTrackerDetailed(
- sms) :
- (SpaceTrackerI*)new SpaceTrackerSimple(
- sms));
-
- segments.clear();
- for (auto sm : sms) {
- // sms is a vector that is indexed by device id and
- // always has "max_device" elements, some of which
- // may be null.
- if (!sm) {
- continue;
- }
- segments.add_segment_manager(*sm);
- stats.empty_segments += sm->get_num_segments();
- }
- metrics.clear();
- register_metrics();
- }
-
- using init_segments_ertr = crimson::errorator<
+ using mount_ertr = crimson::errorator<
crimson::ct_error::input_output_error>;
- using init_segments_ret_bare =
- std::vector<std::pair<segment_id_t, segment_header_t>>;
- using init_segments_ret = init_segments_ertr::future<init_segments_ret_bare>;
- init_segments_ret init_segments();
+ using mount_ret = mount_ertr::future<>;
+ mount_ret mount(device_id_t pdevice_id, std::vector<SegmentManager*>& sms);
get_segment_ret get_segment(
device_id_t id, segment_seq_t seq) final;
{
LOG_PREFIX(TransactionManager::mkfs);
INFO("enter");
- segment_cleaner->mount(
+ return segment_cleaner->mount(
segment_manager.get_device_id(),
- scanner.get_segment_managers());
- return journal->open_for_write().safe_then([this, FNAME](auto addr) {
+ scanner.get_segment_managers()
+ ).safe_then([this] {
+ return journal->open_for_write();
+ }).safe_then([this, FNAME](auto addr) {
segment_cleaner->init_mkfs(addr);
return with_transaction_intr(
Transaction::src_t::MUTATE,
LOG_PREFIX(TransactionManager::mount);
INFO("enter");
cache->init();
- segment_cleaner->mount(
+ return segment_cleaner->mount(
segment_manager.get_device_id(),
- scanner.get_segment_managers());
- return segment_cleaner->init_segments().safe_then(
- [this](auto&& segments) {
+ scanner.get_segment_managers()
+ ).safe_then([this] {
return journal->replay(
- std::move(segments),
[this](const auto &offsets, const auto &e) {
- auto start_seq = offsets.write_result.start_seq;
- segment_cleaner->update_journal_tail_target(
- cache->get_oldest_dirty_from().value_or(start_seq));
- return cache->replay_delta(
- start_seq,
- offsets.record_block_base,
- e);
- });
+ auto start_seq = offsets.write_result.start_seq;
+ segment_cleaner->update_journal_tail_target(
+ cache->get_oldest_dirty_from().value_or(start_seq));
+ return cache->replay_delta(
+ start_seq,
+ offsets.record_block_base,
+ e);
+ });
}).safe_then([this] {
return journal->open_for_write();
}).safe_then([this, FNAME](auto addr) {
SegmentCleaner::config_t::get_default(),
std::move(scanner),
false /* detailed */);
- std::vector<SegmentManager*> sms;
- segment_cleaner->mount(segment_manager->get_device_id(), sms);
- auto journal = std::make_unique<Journal>(*segment_manager, *scanner);
+ auto journal = journal::make_segmented(
+ *segment_manager, *scanner, *segment_cleaner);
auto epm = std::make_unique<ExtentPlacementManager>();
auto cache = std::make_unique<Cache>(scanner_ref, *epm);
auto lba_manager = lba_manager::create_lba_manager(*segment_manager, *cache);
*segment_cleaner,
*segment_manager));
- journal->set_segment_provider(&*segment_cleaner);
-
tm = std::make_unique<TransactionManager>(
*segment_manager,
std::move(segment_cleaner),
seastar::future<> set_up_fut() final {
segment_manager = segment_manager::create_test_ephemeral();
scanner.reset(new ExtentReader());
- journal.reset(new Journal(*segment_manager, *scanner));
+ auto& scanner_ref = *scanner.get();
+ journal = journal::make_segmented(
+ *segment_manager, scanner_ref, *this);
epm.reset(new ExtentPlacementManager());
- cache.reset(new Cache(*scanner, *epm));
+ cache.reset(new Cache(scanner_ref, *epm));
block_size = segment_manager->get_block_size();
next = segment_id_t{segment_manager->get_device_id(), 0};
- scanner->add_segment_manager(segment_manager.get());
- journal->set_segment_provider(this);
+ scanner_ref.add_segment_manager(segment_manager.get());
journal->set_write_pipeline(&pipeline);
return segment_manager->init(
struct journal_test_t : seastar_test_suite_t, SegmentProvider {
segment_manager::EphemeralSegmentManagerRef segment_manager;
WritePipeline pipeline;
- std::unique_ptr<Journal> journal;
+ JournalRef journal;
std::vector<record_validator_t> records;
block_size = segment_manager->get_block_size();
scanner.reset(new ExtentReader());
next = segment_id_t(segment_manager->get_device_id(), 0);
- journal.reset(new Journal(*segment_manager, *scanner));
-
- journal->set_segment_provider(this);
+ journal = journal::make_segmented(*segment_manager, *scanner, *this);
journal->set_write_pipeline(&pipeline);
scanner->add_segment_manager(segment_manager.get());
return segment_manager->init(
auto replay(T &&f) {
return journal->close(
).safe_then([this, f=std::move(f)]() mutable {
- journal.reset(new Journal(*segment_manager, *scanner));
- journal->set_segment_provider(this);
+ journal = journal::make_segmented(
+ *segment_manager, *scanner, *this);
journal->set_write_pipeline(&pipeline);
- return seastar::do_with(
- std::vector<std::pair<segment_id_t, segment_header_t>>(),
- [this](auto& segments) {
- return crimson::do_for_each(
- boost::make_counting_iterator(device_segment_id_t{0}),
- boost::make_counting_iterator(device_segment_id_t{
- segment_manager->get_num_segments()}),
- [this, &segments](auto segment_id) {
- return scanner->read_segment_header(segment_id_t{0, segment_id})
- .safe_then([&segments, segment_id](auto header) {
- if (header.get_type() == segment_type_t::JOURNAL) {
- segments.emplace_back(
- std::make_pair(
- segment_id_t{0, segment_id},
- std::move(header)
- ));
- }
- return seastar::now();
- }).handle_error(
- crimson::ct_error::enoent::handle([](auto) {
- return SegmentCleaner::init_segments_ertr::now();
- }),
- crimson::ct_error::enodata::handle([](auto) {
- return SegmentCleaner::init_segments_ertr::now();
- }),
- crimson::ct_error::input_output_error::pass_further{}
- );
- }).safe_then([&segments] {
- return seastar::make_ready_future<
- std::vector<std::pair<segment_id_t, segment_header_t>>>(
- std::move(segments));
- });
- }).safe_then([this, f=std::move(f)](auto&& segments) mutable {
- return journal->replay(
- std::move(segments),
- std::forward<T>(std::move(f)));
- }).safe_then([this] {
- return journal->open_for_write();
- });
+ return journal->replay(std::forward<T>(std::move(f)));
+ }).safe_then([this] {
+ return journal->open_for_write();
});
}
SegmentCleaner::config_t::get_default(),
std::move(scanner),
true);
- auto journal = std::make_unique<Journal>(segment_manager, scanner_ref);
+ auto journal = journal::make_segmented(
+ segment_manager,
+ scanner_ref,
+ *segment_cleaner);
auto epm = std::make_unique<ExtentPlacementManager>();
auto cache = std::make_unique<Cache>(scanner_ref, *epm);
auto lba_manager = lba_manager::create_lba_manager(segment_manager, *cache);
*segment_cleaner,
segment_manager));
- journal->set_segment_provider(&*segment_cleaner);
-
return std::make_unique<TransactionManager>(
segment_manager,
std::move(segment_cleaner),