random_block_manager/avlallocator.cc
journal/segmented_journal.cc
journal/segment_allocator.cc
+ journal/record_submitter.cc
journal.cc
device.cc
segment_manager_group.cc
#include "crimson/os/seastore/async_cleaner.h"
#include "crimson/os/seastore/cached_extent.h"
#include "crimson/os/seastore/journal/segment_allocator.h"
+#include "crimson/os/seastore/journal/record_submitter.h"
#include "crimson/os/seastore/transaction.h"
#include "crimson/os/seastore/random_block_manager.h"
#include "crimson/os/seastore/random_block_manager/block_rb_manager.h"
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#include "record_submitter.h"
+
+#include <fmt/format.h>
+#include <fmt/os.h>
+
+#include "crimson/os/seastore/logging.h"
+#include "crimson/os/seastore/async_cleaner.h"
+
+SET_SUBSYS(seastore_journal);
+
+namespace crimson::os::seastore::journal {
+
+RecordBatch::add_pending_ret
+RecordBatch::add_pending(
+ const std::string& name,
+ record_t&& record,
+ extent_len_t block_size)
+{
+ LOG_PREFIX(RecordBatch::add_pending);
+ auto new_size = get_encoded_length_after(record, block_size);
+ auto dlength_offset = pending.size.dlength;
+ TRACE("{} batches={}, write_size={}, dlength_offset={} ...",
+ name,
+ pending.get_size() + 1,
+ new_size.get_encoded_length(),
+ dlength_offset);
+ assert(state != state_t::SUBMITTING);
+ assert(evaluate_submit(record.size, block_size).submit_size == new_size);
+
+ pending.push_back(
+ std::move(record), block_size);
+ assert(pending.size == new_size);
+ if (state == state_t::EMPTY) {
+ assert(!io_promise.has_value());
+ io_promise = seastar::shared_promise<maybe_promise_result_t>();
+ } else {
+ assert(io_promise.has_value());
+ }
+ state = state_t::PENDING;
+
+ return io_promise->get_shared_future(
+ ).then([dlength_offset, FNAME, &name
+ ](auto maybe_promise_result) -> add_pending_ret {
+ if (!maybe_promise_result.has_value()) {
+ ERROR("{} write failed", name);
+ return crimson::ct_error::input_output_error::make();
+ }
+ auto write_result = maybe_promise_result->write_result;
+ auto submit_result = record_locator_t{
+ write_result.start_seq.offset.add_offset(
+ maybe_promise_result->mdlength + dlength_offset),
+ write_result
+ };
+ TRACE("{} write finish with {}", name, submit_result);
+ return add_pending_ret(
+ add_pending_ertr::ready_future_marker{},
+ submit_result);
+ });
+}
+
+std::pair<ceph::bufferlist, record_group_size_t>
+RecordBatch::encode_batch(
+ const journal_seq_t& committed_to,
+ segment_nonce_t segment_nonce)
+{
+ assert(state == state_t::PENDING);
+ assert(pending.get_size() > 0);
+ assert(io_promise.has_value());
+
+ state = state_t::SUBMITTING;
+ submitting_size = pending.get_size();
+ auto gsize = pending.size;
+ submitting_length = gsize.get_encoded_length();
+ submitting_mdlength = gsize.get_mdlength();
+ auto bl = encode_records(pending, committed_to, segment_nonce);
+ // Note: pending is cleared here
+ assert(bl.length() == submitting_length);
+ return std::make_pair(bl, gsize);
+}
+
+void RecordBatch::set_result(
+ maybe_result_t maybe_write_result)
+{
+ maybe_promise_result_t result;
+ if (maybe_write_result.has_value()) {
+ assert(maybe_write_result->length == submitting_length);
+ result = promise_result_t{
+ *maybe_write_result,
+ submitting_mdlength
+ };
+ }
+ assert(state == state_t::SUBMITTING);
+ assert(io_promise.has_value());
+
+ state = state_t::EMPTY;
+ submitting_size = 0;
+ submitting_length = 0;
+ submitting_mdlength = 0;
+ io_promise->set_value(result);
+ io_promise.reset();
+}
+
+std::pair<ceph::bufferlist, record_group_size_t>
+RecordBatch::submit_pending_fast(
+ record_t&& record,
+ extent_len_t block_size,
+ const journal_seq_t& committed_to,
+ segment_nonce_t segment_nonce)
+{
+ auto new_size = get_encoded_length_after(record, block_size);
+ std::ignore = new_size;
+ assert(state == state_t::EMPTY);
+ assert(evaluate_submit(record.size, block_size).submit_size == new_size);
+
+ auto group = record_group_t(std::move(record), block_size);
+ auto size = group.size;
+ assert(size == new_size);
+ auto bl = encode_records(group, committed_to, segment_nonce);
+ assert(bl.length() == size.get_encoded_length());
+ return std::make_pair(std::move(bl), size);
+}
+
+RecordSubmitter::RecordSubmitter(
+ std::size_t io_depth,
+ std::size_t batch_capacity,
+ std::size_t batch_flush_size,
+ double preferred_fullness,
+ SegmentAllocator& sa)
+ : io_depth_limit{io_depth},
+ preferred_fullness{preferred_fullness},
+ segment_allocator{sa},
+ batches(new RecordBatch[io_depth + 1])
+{
+ LOG_PREFIX(RecordSubmitter);
+ INFO("{} io_depth_limit={}, batch_capacity={}, batch_flush_size={}, "
+ "preferred_fullness={}",
+ get_name(), io_depth, batch_capacity,
+ batch_flush_size, preferred_fullness);
+ ceph_assert(io_depth > 0);
+ ceph_assert(batch_capacity > 0);
+ ceph_assert(preferred_fullness >= 0 &&
+ preferred_fullness <= 1);
+ free_batch_ptrs.reserve(io_depth + 1);
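+  // io_depth + 1 batches in total: one serves as the current batch while up
+  // to io_depth batches can be in flight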
+ for (std::size_t i = 0; i <= io_depth; ++i) {
+ batches[i].initialize(i, batch_capacity, batch_flush_size);
+ free_batch_ptrs.push_back(&batches[i]);
+ }
+ pop_free_batch();
+}
+
+bool RecordSubmitter::is_available() const
+{
+ auto ret = !wait_available_promise.has_value() &&
+ !has_io_error;
+#ifndef NDEBUG
+ if (ret) {
+ // unconditional invariants
+ ceph_assert(segment_allocator.can_write());
+ ceph_assert(p_current_batch != nullptr);
+ ceph_assert(!p_current_batch->is_submitting());
+ // the current batch accepts a further write
+ ceph_assert(!p_current_batch->needs_flush());
+ if (!p_current_batch->is_empty()) {
+ auto submit_length =
+ p_current_batch->get_submit_size().get_encoded_length();
+ ceph_assert(!segment_allocator.needs_roll(submit_length));
+ }
+ // I'm not rolling
+ }
+#endif
+ return ret;
+}
+
+RecordSubmitter::wa_ertr::future<>
+RecordSubmitter::wait_available()
+{
+ LOG_PREFIX(RecordSubmitter::wait_available);
+ assert(!is_available());
+ if (has_io_error) {
+    ERROR("{} I/O failed before wait", get_name());
+ return crimson::ct_error::input_output_error::make();
+ }
+ return wait_available_promise->get_shared_future(
+ ).then([FNAME, this]() -> wa_ertr::future<> {
+ if (has_io_error) {
+      ERROR("{} I/O failed after wait", get_name());
+ return crimson::ct_error::input_output_error::make();
+ }
+ return wa_ertr::now();
+ });
+}
+
+RecordSubmitter::action_t
+RecordSubmitter::check_action(
+ const record_size_t& rsize) const
+{
+ assert(is_available());
+ auto eval = p_current_batch->evaluate_submit(
+ rsize, segment_allocator.get_block_size());
+ if (segment_allocator.needs_roll(eval.submit_size.get_encoded_length())) {
+ return action_t::ROLL;
+ } else if (eval.is_full) {
+ return action_t::SUBMIT_FULL;
+ } else {
+ return action_t::SUBMIT_NOT_FULL;
+ }
+}
+
+RecordSubmitter::roll_segment_ertr::future<>
+RecordSubmitter::roll_segment()
+{
+ LOG_PREFIX(RecordSubmitter::roll_segment);
+ ceph_assert(p_current_batch->needs_flush() ||
+ is_available());
+ // #1 block concurrent submissions due to rolling
+ wait_available_promise = seastar::shared_promise<>();
+ ceph_assert(!wait_unfull_flush_promise.has_value());
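+  // flush or wait out the current pending batch first; the actual roll runs
+  // in the background and resolves wait_available_promise when it finishes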
+ return [FNAME, this] {
+ if (p_current_batch->is_pending()) {
+ if (state == state_t::FULL) {
+ DEBUG("{} wait flush ...", get_name());
+ wait_unfull_flush_promise = seastar::promise<>();
+ return wait_unfull_flush_promise->get_future();
+ } else { // IDLE/PENDING
+ DEBUG("{} flush", get_name());
+ flush_current_batch();
+ return seastar::now();
+ }
+ } else {
+ assert(p_current_batch->is_empty());
+ return seastar::now();
+ }
+ }().then_wrapped([FNAME, this](auto fut) {
+ if (fut.failed()) {
+ ERROR("{} rolling is skipped unexpectedly, available", get_name());
+ has_io_error = true;
+ wait_available_promise->set_value();
+ wait_available_promise.reset();
+ return roll_segment_ertr::now();
+ } else {
+ // start rolling in background
+ std::ignore = segment_allocator.roll(
+ ).safe_then([FNAME, this] {
+ // good
+ DEBUG("{} rolling done, available", get_name());
+ assert(!has_io_error);
+ wait_available_promise->set_value();
+ wait_available_promise.reset();
+ }).handle_error(
+ crimson::ct_error::all_same_way([FNAME, this](auto e) {
+ ERROR("{} got error {}, available", get_name(), e);
+ has_io_error = true;
+ wait_available_promise->set_value();
+ wait_available_promise.reset();
+ })
+ ).handle_exception([FNAME, this](auto e) {
+ ERROR("{} got exception {}, available", get_name(), e);
+ has_io_error = true;
+ wait_available_promise->set_value();
+ wait_available_promise.reset();
+ });
+ // wait for background rolling
+ return wait_available();
+ }
+ });
+}
+
+RecordSubmitter::submit_ret
+RecordSubmitter::submit(
+ record_t&& record,
+ bool with_atomic_roll_segment)
+{
+ LOG_PREFIX(RecordSubmitter::submit);
+ ceph_assert(is_available());
+ assert(check_action(record.size) != action_t::ROLL);
+ segment_allocator.get_provider().update_modify_time(
+ segment_allocator.get_segment_id(),
+ record.modify_time,
+ record.extents.size());
+ auto eval = p_current_batch->evaluate_submit(
+ record.size, segment_allocator.get_block_size());
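+  // Flush heuristics: flush right away when the io depth is idle, when the
+  // batch would exceed preferred_fullness, or when it would hit the batch
+  // limits (see RecordBatch::needs_flush()).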
+ bool needs_flush = (
+ state == state_t::IDLE ||
+ eval.submit_size.get_fullness() > preferred_fullness ||
+ // RecordBatch::needs_flush()
+ eval.is_full ||
+ p_current_batch->get_num_records() + 1 >=
+ p_current_batch->get_batch_capacity());
+ if (p_current_batch->is_empty() &&
+ needs_flush &&
+ state != state_t::FULL) {
+ // fast path with direct write
+ increment_io();
+ auto [to_write, sizes] = p_current_batch->submit_pending_fast(
+ std::move(record),
+ segment_allocator.get_block_size(),
+ committed_to,
+ segment_allocator.get_nonce());
+ DEBUG("{} fast submit {}, committed_to={}, outstanding_io={} ...",
+ get_name(), sizes, committed_to, num_outstanding_io);
+ account_submission(1, sizes);
+ return segment_allocator.write(std::move(to_write)
+ ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
+ return record_locator_t{
+ write_result.start_seq.offset.add_offset(mdlength),
+ write_result
+ };
+ }).finally([this] {
+ decrement_io_with_flush();
+ });
+ }
+ // indirect batched write
+ auto write_fut = p_current_batch->add_pending(
+ get_name(),
+ std::move(record),
+ segment_allocator.get_block_size());
+ if (needs_flush) {
+ if (state == state_t::FULL) {
+ // #2 block concurrent submissions due to lack of resource
+ DEBUG("{} added with {} pending, outstanding_io={}, unavailable, wait flush ...",
+ get_name(),
+ p_current_batch->get_num_records(),
+ num_outstanding_io);
+ if (with_atomic_roll_segment) {
+ // wait_available_promise and wait_unfull_flush_promise
+ // need to be delegated to the follow-up atomic roll_segment();
+ assert(p_current_batch->is_pending());
+ } else {
+ wait_available_promise = seastar::shared_promise<>();
+ ceph_assert(!wait_unfull_flush_promise.has_value());
+ wait_unfull_flush_promise = seastar::promise<>();
+ // flush and mark available in background
+ std::ignore = wait_unfull_flush_promise->get_future(
+ ).finally([FNAME, this] {
+ DEBUG("{} flush done, available", get_name());
+ wait_available_promise->set_value();
+ wait_available_promise.reset();
+ });
+ }
+ } else {
+ DEBUG("{} added pending, flush", get_name());
+ flush_current_batch();
+ }
+ } else {
+ // will flush later
+ DEBUG("{} added with {} pending, outstanding_io={}",
+ get_name(),
+ p_current_batch->get_num_records(),
+ num_outstanding_io);
+ assert(!p_current_batch->needs_flush());
+ }
+ return write_fut;
+}
+
+RecordSubmitter::open_ret
+RecordSubmitter::open(bool is_mkfs)
+{
+ return segment_allocator.open(is_mkfs
+ ).safe_then([this](journal_seq_t ret) {
+ LOG_PREFIX(RecordSubmitter::open);
+ DEBUG("{} register metrics", get_name());
+ stats = {};
+ namespace sm = seastar::metrics;
+ std::vector<sm::label_instance> label_instances;
+ label_instances.push_back(sm::label_instance("submitter", get_name()));
+ metrics.add_group(
+ "journal",
+ {
+ sm::make_counter(
+ "record_num",
+ stats.record_batch_stats.num_io,
+ sm::description("total number of records submitted"),
+ label_instances
+ ),
+ sm::make_counter(
+ "record_batch_num",
+ stats.record_batch_stats.num_io_grouped,
+ sm::description("total number of records batched"),
+ label_instances
+ ),
+ sm::make_counter(
+ "io_num",
+ stats.io_depth_stats.num_io,
+ sm::description("total number of io submitted"),
+ label_instances
+ ),
+ sm::make_counter(
+ "io_depth_num",
+ stats.io_depth_stats.num_io_grouped,
+ sm::description("total number of io depth"),
+ label_instances
+ ),
+ sm::make_counter(
+ "record_group_padding_bytes",
+ stats.record_group_padding_bytes,
+ sm::description("bytes of metadata padding when write record groups"),
+ label_instances
+ ),
+ sm::make_counter(
+ "record_group_metadata_bytes",
+ stats.record_group_metadata_bytes,
+ sm::description("bytes of raw metadata when write record groups"),
+ label_instances
+ ),
+ sm::make_counter(
+ "record_group_data_bytes",
+ stats.record_group_data_bytes,
+ sm::description("bytes of data when write record groups"),
+ label_instances
+ ),
+ }
+ );
+ return ret;
+ });
+}
+
+RecordSubmitter::close_ertr::future<>
+RecordSubmitter::close()
+{
+ ceph_assert(state == state_t::IDLE);
+ ceph_assert(num_outstanding_io == 0);
+ committed_to = JOURNAL_SEQ_NULL;
+ ceph_assert(p_current_batch != nullptr);
+ ceph_assert(p_current_batch->is_empty());
+ ceph_assert(!wait_available_promise.has_value());
+ has_io_error = false;
+ ceph_assert(!wait_unfull_flush_promise.has_value());
+ metrics.clear();
+ return segment_allocator.close();
+}
+
+void RecordSubmitter::update_state()
+{
+ if (num_outstanding_io == 0) {
+ state = state_t::IDLE;
+ } else if (num_outstanding_io < io_depth_limit) {
+ state = state_t::PENDING;
+ } else if (num_outstanding_io == io_depth_limit) {
+ state = state_t::FULL;
+ } else {
+ ceph_abort("fatal error: io-depth overflow");
+ }
+}
+
+void RecordSubmitter::decrement_io_with_flush()
+{
+ LOG_PREFIX(RecordSubmitter::decrement_io_with_flush);
+ assert(num_outstanding_io > 0);
+ auto prv_state = state;
+ --num_outstanding_io;
+ update_state();
+
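+  // leaving FULL may unblock a flush that was waiting on the io depth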
+ if (prv_state == state_t::FULL) {
+ if (wait_unfull_flush_promise.has_value()) {
+ DEBUG("{} flush, resolve wait_unfull_flush_promise", get_name());
+ assert(!p_current_batch->is_empty());
+ assert(wait_available_promise.has_value());
+ flush_current_batch();
+ wait_unfull_flush_promise->set_value();
+ wait_unfull_flush_promise.reset();
+ return;
+ }
+ } else {
+ ceph_assert(!wait_unfull_flush_promise.has_value());
+ }
+
+ auto needs_flush = (
+ !p_current_batch->is_empty() && (
+ state == state_t::IDLE ||
+ p_current_batch->get_submit_size().get_fullness() > preferred_fullness ||
+ p_current_batch->needs_flush()
+ ));
+ if (needs_flush) {
+ DEBUG("{} flush", get_name());
+ flush_current_batch();
+ }
+}
+
+void RecordSubmitter::account_submission(
+ std::size_t num,
+ const record_group_size_t& size)
+{
+ stats.record_group_padding_bytes +=
+ (size.get_mdlength() - size.get_raw_mdlength());
+ stats.record_group_metadata_bytes += size.get_raw_mdlength();
+ stats.record_group_data_bytes += size.dlength;
+ stats.record_batch_stats.increment(num);
+}
+
+void RecordSubmitter::finish_submit_batch(
+ RecordBatch* p_batch,
+ maybe_result_t maybe_result)
+{
+ assert(p_batch->is_submitting());
+ p_batch->set_result(maybe_result);
+ free_batch_ptrs.push_back(p_batch);
+ decrement_io_with_flush();
+}
+
+void RecordSubmitter::flush_current_batch()
+{
+ LOG_PREFIX(RecordSubmitter::flush_current_batch);
+ RecordBatch* p_batch = p_current_batch;
+ assert(p_batch->is_pending());
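+  // detach the current batch for writing and switch to a free one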
+ p_current_batch = nullptr;
+ pop_free_batch();
+
+ increment_io();
+ auto num = p_batch->get_num_records();
+ auto [to_write, sizes] = p_batch->encode_batch(
+ committed_to, segment_allocator.get_nonce());
+ DEBUG("{} {} records, {}, committed_to={}, outstanding_io={} ...",
+ get_name(), num, sizes, committed_to, num_outstanding_io);
+ account_submission(num, sizes);
+ std::ignore = segment_allocator.write(std::move(to_write)
+ ).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) {
+ TRACE("{} {} records, {}, write done with {}",
+ get_name(), num, sizes, write_result);
+ finish_submit_batch(p_batch, write_result);
+ }).handle_error(
+ crimson::ct_error::all_same_way([this, p_batch, FNAME, num, sizes=sizes](auto e) {
+ ERROR("{} {} records, {}, got error {}",
+ get_name(), num, sizes, e);
+ finish_submit_batch(p_batch, std::nullopt);
+ })
+ ).handle_exception([this, p_batch, FNAME, num, sizes=sizes](auto e) {
+ ERROR("{} {} records, {}, got exception {}",
+ get_name(), num, sizes, e);
+ finish_submit_batch(p_batch, std::nullopt);
+ });
+}
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#pragma once
+
+#include <optional>
+#include <seastar/core/circular_buffer.hh>
+#include <seastar/core/metrics.hh>
+#include <seastar/core/shared_future.hh>
+
+#include "include/buffer.h"
+
+#include "crimson/common/errorator.h"
+#include "crimson/os/seastore/segment_manager_group.h"
+#include "crimson/os/seastore/segment_seq_allocator.h"
+#include "crimson/os/seastore/journal/segment_allocator.h"
+
+namespace crimson::os::seastore {
+ class SegmentProvider;
+ class JournalTrimmer;
+}
+
+namespace crimson::os::seastore::journal {
+
+/**
+ * RecordBatch
+ *
+ * Maintain a batch of records for submission.
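+ *
+ * Roughly, a batch cycles EMPTY -> PENDING -> SUBMITTING -> EMPTY:
+ * add_pending() moves it to PENDING, encode_batch() to SUBMITTING, and
+ * set_result() back to EMPTY so the batch can be reused.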
+ */
+class RecordBatch {
+ enum class state_t {
+ EMPTY = 0,
+ PENDING,
+ SUBMITTING
+ };
+
+public:
+ RecordBatch() = default;
+ RecordBatch(RecordBatch&&) = delete;
+ RecordBatch(const RecordBatch&) = delete;
+ RecordBatch& operator=(RecordBatch&&) = delete;
+ RecordBatch& operator=(const RecordBatch&) = delete;
+
+ bool is_empty() const {
+ return state == state_t::EMPTY;
+ }
+
+ bool is_pending() const {
+ return state == state_t::PENDING;
+ }
+
+ bool is_submitting() const {
+ return state == state_t::SUBMITTING;
+ }
+
+ std::size_t get_index() const {
+ return index;
+ }
+
+ std::size_t get_num_records() const {
+ return pending.get_size();
+ }
+
+ std::size_t get_batch_capacity() const {
+ return batch_capacity;
+ }
+
+ const record_group_size_t& get_submit_size() const {
+ assert(state != state_t::EMPTY);
+ return pending.size;
+ }
+
+ bool needs_flush() const {
+ assert(state != state_t::SUBMITTING);
+ assert(pending.get_size() <= batch_capacity);
+ if (state == state_t::EMPTY) {
+ return false;
+ } else {
+ assert(state == state_t::PENDING);
+ return (pending.get_size() >= batch_capacity ||
+ pending.size.get_encoded_length() > batch_flush_size);
+ }
+ }
+
+ struct evaluation_t {
+ record_group_size_t submit_size;
+ bool is_full;
+ };
+ evaluation_t evaluate_submit(
+ const record_size_t& rsize,
+ extent_len_t block_size) const {
+ assert(!needs_flush());
+ auto submit_size = pending.size.get_encoded_length_after(
+ rsize, block_size);
+ bool is_full = submit_size.get_encoded_length() > batch_flush_size;
+ return {submit_size, is_full};
+ }
+
+ void initialize(std::size_t i,
+ std::size_t _batch_capacity,
+ std::size_t _batch_flush_size) {
+ ceph_assert(_batch_capacity > 0);
+ index = i;
+ batch_capacity = _batch_capacity;
+ batch_flush_size = _batch_flush_size;
+ pending.reserve(batch_capacity);
+ }
+
+  // Add to the batch; the future will be resolved after the batch is
+ // written.
+ //
+ // Set write_result_t::write_length to 0 if the record is not the first one
+ // in the batch.
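+  //
+  // The resolved record_locator_t points at this record's data within the
+  // batched write: the group start offset plus mdlength plus the dlength of
+  // the records queued before it.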
+ using add_pending_ertr = SegmentAllocator::write_ertr;
+ using add_pending_ret = add_pending_ertr::future<record_locator_t>;
+ add_pending_ret add_pending(
+ const std::string& name,
+ record_t&&,
+ extent_len_t block_size);
+
+ // Encode the batched records for write.
+ std::pair<ceph::bufferlist, record_group_size_t> encode_batch(
+ const journal_seq_t& committed_to,
+ segment_nonce_t segment_nonce);
+
+ // Set the write result and reset for reuse
+ using maybe_result_t = std::optional<write_result_t>;
+ void set_result(maybe_result_t maybe_write_end_seq);
+
+  // The fast path, equivalent to submitting a single record as a batch.
+  //
+  // Essentially the combined logic of add_pending(), encode_batch() and
+  // set_result() above, without the intervention of the shared io_promise.
+  //
+  // Note that the current RecordBatch can be reused afterwards.
+ std::pair<ceph::bufferlist, record_group_size_t> submit_pending_fast(
+ record_t&&,
+ extent_len_t block_size,
+ const journal_seq_t& committed_to,
+ segment_nonce_t segment_nonce);
+
+private:
+ record_group_size_t get_encoded_length_after(
+ const record_t& record,
+ extent_len_t block_size) const {
+ return pending.size.get_encoded_length_after(
+ record.size, block_size);
+ }
+
+ state_t state = state_t::EMPTY;
+ std::size_t index = 0;
+ std::size_t batch_capacity = 0;
+ std::size_t batch_flush_size = 0;
+
+ record_group_t pending;
+ std::size_t submitting_size = 0;
+ extent_len_t submitting_length = 0;
+ extent_len_t submitting_mdlength = 0;
+
+ struct promise_result_t {
+ write_result_t write_result;
+ extent_len_t mdlength;
+ };
+ using maybe_promise_result_t = std::optional<promise_result_t>;
+ std::optional<seastar::shared_promise<maybe_promise_result_t> > io_promise;
+};
+
+/**
+ * RecordSubmitter
+ *
+ * Submit records concurrently, grouping them into RecordBatches and writing
+ * them out through the SegmentAllocator.
+ *
+ * Configurations and controls:
+ * - io_depth: the io-depth limit to SegmentAllocator;
+ * - batch_capacity: the number limit of records in a RecordBatch;
+ * - batch_flush_size: the bytes threshold to force flush a RecordBatch to
+ * control the maximum latency;
+ * - preferred_fullness: the fullness threshold to flush a RecordBatch;
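+ *
+ * A rough sketch of the expected caller loop (pseudocode, illustrative only):
+ *
+ *   repeat:
+ *     if (!is_available())              -> wait_available(), then retry
+ *     if (check_action(rsize) == ROLL)  -> roll_segment(), then retry
+ *     else                              -> submit(std::move(record))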
+ */
+class RecordSubmitter {
+ enum class state_t {
+ IDLE = 0, // outstanding_io == 0
+ PENDING, // outstanding_io < io_depth_limit
+ FULL // outstanding_io == io_depth_limit
+ // OVERFLOW: outstanding_io > io_depth_limit is impossible
+ };
+
+ struct grouped_io_stats {
+ uint64_t num_io = 0;
+ uint64_t num_io_grouped = 0;
+
+ void increment(uint64_t num_grouped_io) {
+ ++num_io;
+ num_io_grouped += num_grouped_io;
+ }
+ };
+
+ using base_ertr = crimson::errorator<
+ crimson::ct_error::input_output_error>;
+
+public:
+ RecordSubmitter(std::size_t io_depth,
+ std::size_t batch_capacity,
+ std::size_t batch_flush_size,
+ double preferred_fullness,
+ SegmentAllocator&);
+
+ const std::string& get_name() const {
+ return segment_allocator.get_name();
+ }
+
+ journal_seq_t get_committed_to() const {
+ return committed_to;
+ }
+
+  // whether it is currently available to submit a record
+ bool is_available() const;
+
+  // wait for availability if a submit is not currently possible; is_available()
+  // should be checked again once the future is resolved.
+ using wa_ertr = base_ertr;
+ wa_ertr::future<> wait_available();
+
+ // when available, check for the submit action
+ // according to the pending record size
+ enum class action_t {
+ ROLL,
+ SUBMIT_FULL,
+ SUBMIT_NOT_FULL
+ };
+ action_t check_action(const record_size_t&) const;
+
+ // when available, roll the segment if needed
+ using roll_segment_ertr = base_ertr;
+ roll_segment_ertr::future<> roll_segment();
+
+ // when available, submit the record if possible
+ using submit_ertr = base_ertr;
+ using submit_ret = submit_ertr::future<record_locator_t>;
+ submit_ret submit(record_t&&, bool with_atomic_roll_segment=false);
+
+ void update_committed_to(const journal_seq_t& new_committed_to) {
+ assert(new_committed_to != JOURNAL_SEQ_NULL);
+ assert(committed_to == JOURNAL_SEQ_NULL ||
+ committed_to <= new_committed_to);
+ committed_to = new_committed_to;
+ }
+
+ // open for write, generate the correct print name, and register metrics
+ using open_ertr = base_ertr;
+ using open_ret = open_ertr::future<journal_seq_t>;
+ open_ret open(bool is_mkfs);
+
+ using close_ertr = base_ertr;
+ close_ertr::future<> close();
+
+private:
+ void update_state();
+
+ void increment_io() {
+ ++num_outstanding_io;
+ stats.io_depth_stats.increment(num_outstanding_io);
+ update_state();
+ }
+
+ void decrement_io_with_flush();
+
+ void pop_free_batch() {
+ assert(p_current_batch == nullptr);
+ assert(!free_batch_ptrs.empty());
+ p_current_batch = free_batch_ptrs.front();
+ assert(p_current_batch->is_empty());
+ assert(p_current_batch == &batches[p_current_batch->get_index()]);
+ free_batch_ptrs.pop_front();
+ }
+
+ void account_submission(std::size_t, const record_group_size_t&);
+
+ using maybe_result_t = RecordBatch::maybe_result_t;
+ void finish_submit_batch(RecordBatch*, maybe_result_t);
+
+ void flush_current_batch();
+
+ state_t state = state_t::IDLE;
+ std::size_t num_outstanding_io = 0;
+ std::size_t io_depth_limit;
+ double preferred_fullness;
+
+ SegmentAllocator& segment_allocator;
+ // committed_to may be in a previous journal segment
+ journal_seq_t committed_to = JOURNAL_SEQ_NULL;
+
+ std::unique_ptr<RecordBatch[]> batches;
+  // should not be nullptr after construction
+ RecordBatch* p_current_batch = nullptr;
+ seastar::circular_buffer<RecordBatch*> free_batch_ptrs;
+
+  // blocked due to rolling or lack of resources
+ std::optional<seastar::shared_promise<> > wait_available_promise;
+ bool has_io_error = false;
+  // when a flush is needed but the io depth is full,
+  // wait for decrement_io_with_flush()
+ std::optional<seastar::promise<> > wait_unfull_flush_promise;
+
+ struct {
+ grouped_io_stats record_batch_stats;
+ grouped_io_stats io_depth_stats;
+ uint64_t record_group_padding_bytes = 0;
+ uint64_t record_group_metadata_bytes = 0;
+ uint64_t record_group_data_bytes = 0;
+ } stats;
+ seastar::metrics::metric_group metrics;
+};
+
+}
}
-RecordBatch::add_pending_ret
-RecordBatch::add_pending(
- const std::string& name,
- record_t&& record,
- extent_len_t block_size)
-{
- LOG_PREFIX(RecordBatch::add_pending);
- auto new_size = get_encoded_length_after(record, block_size);
- auto dlength_offset = pending.size.dlength;
- TRACE("{} batches={}, write_size={}, dlength_offset={} ...",
- name,
- pending.get_size() + 1,
- new_size.get_encoded_length(),
- dlength_offset);
- assert(state != state_t::SUBMITTING);
- assert(evaluate_submit(record.size, block_size).submit_size == new_size);
-
- pending.push_back(
- std::move(record), block_size);
- assert(pending.size == new_size);
- if (state == state_t::EMPTY) {
- assert(!io_promise.has_value());
- io_promise = seastar::shared_promise<maybe_promise_result_t>();
- } else {
- assert(io_promise.has_value());
- }
- state = state_t::PENDING;
-
- return io_promise->get_shared_future(
- ).then([dlength_offset, FNAME, &name
- ](auto maybe_promise_result) -> add_pending_ret {
- if (!maybe_promise_result.has_value()) {
- ERROR("{} write failed", name);
- return crimson::ct_error::input_output_error::make();
- }
- auto write_result = maybe_promise_result->write_result;
- auto submit_result = record_locator_t{
- write_result.start_seq.offset.add_offset(
- maybe_promise_result->mdlength + dlength_offset),
- write_result
- };
- TRACE("{} write finish with {}", name, submit_result);
- return add_pending_ret(
- add_pending_ertr::ready_future_marker{},
- submit_result);
- });
-}
-
-std::pair<ceph::bufferlist, record_group_size_t>
-RecordBatch::encode_batch(
- const journal_seq_t& committed_to,
- segment_nonce_t segment_nonce)
-{
- assert(state == state_t::PENDING);
- assert(pending.get_size() > 0);
- assert(io_promise.has_value());
-
- state = state_t::SUBMITTING;
- submitting_size = pending.get_size();
- auto gsize = pending.size;
- submitting_length = gsize.get_encoded_length();
- submitting_mdlength = gsize.get_mdlength();
- auto bl = encode_records(pending, committed_to, segment_nonce);
- // Note: pending is cleared here
- assert(bl.length() == submitting_length);
- return std::make_pair(bl, gsize);
-}
-
-void RecordBatch::set_result(
- maybe_result_t maybe_write_result)
-{
- maybe_promise_result_t result;
- if (maybe_write_result.has_value()) {
- assert(maybe_write_result->length == submitting_length);
- result = promise_result_t{
- *maybe_write_result,
- submitting_mdlength
- };
- }
- assert(state == state_t::SUBMITTING);
- assert(io_promise.has_value());
-
- state = state_t::EMPTY;
- submitting_size = 0;
- submitting_length = 0;
- submitting_mdlength = 0;
- io_promise->set_value(result);
- io_promise.reset();
-}
-
-std::pair<ceph::bufferlist, record_group_size_t>
-RecordBatch::submit_pending_fast(
- record_t&& record,
- extent_len_t block_size,
- const journal_seq_t& committed_to,
- segment_nonce_t segment_nonce)
-{
- auto new_size = get_encoded_length_after(record, block_size);
- std::ignore = new_size;
- assert(state == state_t::EMPTY);
- assert(evaluate_submit(record.size, block_size).submit_size == new_size);
-
- auto group = record_group_t(std::move(record), block_size);
- auto size = group.size;
- assert(size == new_size);
- auto bl = encode_records(group, committed_to, segment_nonce);
- assert(bl.length() == size.get_encoded_length());
- return std::make_pair(std::move(bl), size);
-}
-
-RecordSubmitter::RecordSubmitter(
- std::size_t io_depth,
- std::size_t batch_capacity,
- std::size_t batch_flush_size,
- double preferred_fullness,
- SegmentAllocator& sa)
- : io_depth_limit{io_depth},
- preferred_fullness{preferred_fullness},
- segment_allocator{sa},
- batches(new RecordBatch[io_depth + 1])
-{
- LOG_PREFIX(RecordSubmitter);
- INFO("{} io_depth_limit={}, batch_capacity={}, batch_flush_size={}, "
- "preferred_fullness={}",
- get_name(), io_depth, batch_capacity,
- batch_flush_size, preferred_fullness);
- ceph_assert(io_depth > 0);
- ceph_assert(batch_capacity > 0);
- ceph_assert(preferred_fullness >= 0 &&
- preferred_fullness <= 1);
- free_batch_ptrs.reserve(io_depth + 1);
- for (std::size_t i = 0; i <= io_depth; ++i) {
- batches[i].initialize(i, batch_capacity, batch_flush_size);
- free_batch_ptrs.push_back(&batches[i]);
- }
- pop_free_batch();
-}
-
-bool RecordSubmitter::is_available() const
-{
- auto ret = !wait_available_promise.has_value() &&
- !has_io_error;
-#ifndef NDEBUG
- if (ret) {
- // unconditional invariants
- ceph_assert(segment_allocator.can_write());
- ceph_assert(p_current_batch != nullptr);
- ceph_assert(!p_current_batch->is_submitting());
- // the current batch accepts a further write
- ceph_assert(!p_current_batch->needs_flush());
- if (!p_current_batch->is_empty()) {
- auto submit_length =
- p_current_batch->get_submit_size().get_encoded_length();
- ceph_assert(!segment_allocator.needs_roll(submit_length));
- }
- // I'm not rolling
- }
-#endif
- return ret;
-}
-
-RecordSubmitter::wa_ertr::future<>
-RecordSubmitter::wait_available()
-{
- LOG_PREFIX(RecordSubmitter::wait_available);
- assert(!is_available());
- if (has_io_error) {
- ERROR("{} I/O is failed before wait", get_name());
- return crimson::ct_error::input_output_error::make();
- }
- return wait_available_promise->get_shared_future(
- ).then([FNAME, this]() -> wa_ertr::future<> {
- if (has_io_error) {
- ERROR("{} I/O is failed after wait", get_name());
- return crimson::ct_error::input_output_error::make();
- }
- return wa_ertr::now();
- });
-}
-
-RecordSubmitter::action_t
-RecordSubmitter::check_action(
- const record_size_t& rsize) const
-{
- assert(is_available());
- auto eval = p_current_batch->evaluate_submit(
- rsize, segment_allocator.get_block_size());
- if (segment_allocator.needs_roll(eval.submit_size.get_encoded_length())) {
- return action_t::ROLL;
- } else if (eval.is_full) {
- return action_t::SUBMIT_FULL;
- } else {
- return action_t::SUBMIT_NOT_FULL;
- }
-}
-
-RecordSubmitter::roll_segment_ertr::future<>
-RecordSubmitter::roll_segment()
-{
- LOG_PREFIX(RecordSubmitter::roll_segment);
- ceph_assert(p_current_batch->needs_flush() ||
- is_available());
- // #1 block concurrent submissions due to rolling
- wait_available_promise = seastar::shared_promise<>();
- ceph_assert(!wait_unfull_flush_promise.has_value());
- return [FNAME, this] {
- if (p_current_batch->is_pending()) {
- if (state == state_t::FULL) {
- DEBUG("{} wait flush ...", get_name());
- wait_unfull_flush_promise = seastar::promise<>();
- return wait_unfull_flush_promise->get_future();
- } else { // IDLE/PENDING
- DEBUG("{} flush", get_name());
- flush_current_batch();
- return seastar::now();
- }
- } else {
- assert(p_current_batch->is_empty());
- return seastar::now();
- }
- }().then_wrapped([FNAME, this](auto fut) {
- if (fut.failed()) {
- ERROR("{} rolling is skipped unexpectedly, available", get_name());
- has_io_error = true;
- wait_available_promise->set_value();
- wait_available_promise.reset();
- return roll_segment_ertr::now();
- } else {
- // start rolling in background
- std::ignore = segment_allocator.roll(
- ).safe_then([FNAME, this] {
- // good
- DEBUG("{} rolling done, available", get_name());
- assert(!has_io_error);
- wait_available_promise->set_value();
- wait_available_promise.reset();
- }).handle_error(
- crimson::ct_error::all_same_way([FNAME, this](auto e) {
- ERROR("{} got error {}, available", get_name(), e);
- has_io_error = true;
- wait_available_promise->set_value();
- wait_available_promise.reset();
- })
- ).handle_exception([FNAME, this](auto e) {
- ERROR("{} got exception {}, available", get_name(), e);
- has_io_error = true;
- wait_available_promise->set_value();
- wait_available_promise.reset();
- });
- // wait for background rolling
- return wait_available();
- }
- });
-}
-
-RecordSubmitter::submit_ret
-RecordSubmitter::submit(
- record_t&& record,
- bool with_atomic_roll_segment)
-{
- LOG_PREFIX(RecordSubmitter::submit);
- ceph_assert(is_available());
- assert(check_action(record.size) != action_t::ROLL);
- segment_allocator.get_provider().update_modify_time(
- segment_allocator.get_segment_id(),
- record.modify_time,
- record.extents.size());
- auto eval = p_current_batch->evaluate_submit(
- record.size, segment_allocator.get_block_size());
- bool needs_flush = (
- state == state_t::IDLE ||
- eval.submit_size.get_fullness() > preferred_fullness ||
- // RecordBatch::needs_flush()
- eval.is_full ||
- p_current_batch->get_num_records() + 1 >=
- p_current_batch->get_batch_capacity());
- if (p_current_batch->is_empty() &&
- needs_flush &&
- state != state_t::FULL) {
- // fast path with direct write
- increment_io();
- auto [to_write, sizes] = p_current_batch->submit_pending_fast(
- std::move(record),
- segment_allocator.get_block_size(),
- committed_to,
- segment_allocator.get_nonce());
- DEBUG("{} fast submit {}, committed_to={}, outstanding_io={} ...",
- get_name(), sizes, committed_to, num_outstanding_io);
- account_submission(1, sizes);
- return segment_allocator.write(std::move(to_write)
- ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
- return record_locator_t{
- write_result.start_seq.offset.add_offset(mdlength),
- write_result
- };
- }).finally([this] {
- decrement_io_with_flush();
- });
- }
- // indirect batched write
- auto write_fut = p_current_batch->add_pending(
- get_name(),
- std::move(record),
- segment_allocator.get_block_size());
- if (needs_flush) {
- if (state == state_t::FULL) {
- // #2 block concurrent submissions due to lack of resource
- DEBUG("{} added with {} pending, outstanding_io={}, unavailable, wait flush ...",
- get_name(),
- p_current_batch->get_num_records(),
- num_outstanding_io);
- if (with_atomic_roll_segment) {
- // wait_available_promise and wait_unfull_flush_promise
- // need to be delegated to the follow-up atomic roll_segment();
- assert(p_current_batch->is_pending());
- } else {
- wait_available_promise = seastar::shared_promise<>();
- ceph_assert(!wait_unfull_flush_promise.has_value());
- wait_unfull_flush_promise = seastar::promise<>();
- // flush and mark available in background
- std::ignore = wait_unfull_flush_promise->get_future(
- ).finally([FNAME, this] {
- DEBUG("{} flush done, available", get_name());
- wait_available_promise->set_value();
- wait_available_promise.reset();
- });
- }
- } else {
- DEBUG("{} added pending, flush", get_name());
- flush_current_batch();
- }
- } else {
- // will flush later
- DEBUG("{} added with {} pending, outstanding_io={}",
- get_name(),
- p_current_batch->get_num_records(),
- num_outstanding_io);
- assert(!p_current_batch->needs_flush());
- }
- return write_fut;
-}
-
-RecordSubmitter::open_ret
-RecordSubmitter::open(bool is_mkfs)
-{
- return segment_allocator.open(is_mkfs
- ).safe_then([this](journal_seq_t ret) {
- LOG_PREFIX(RecordSubmitter::open);
- DEBUG("{} register metrics", get_name());
- stats = {};
- namespace sm = seastar::metrics;
- std::vector<sm::label_instance> label_instances;
- label_instances.push_back(sm::label_instance("submitter", get_name()));
- metrics.add_group(
- "journal",
- {
- sm::make_counter(
- "record_num",
- stats.record_batch_stats.num_io,
- sm::description("total number of records submitted"),
- label_instances
- ),
- sm::make_counter(
- "record_batch_num",
- stats.record_batch_stats.num_io_grouped,
- sm::description("total number of records batched"),
- label_instances
- ),
- sm::make_counter(
- "io_num",
- stats.io_depth_stats.num_io,
- sm::description("total number of io submitted"),
- label_instances
- ),
- sm::make_counter(
- "io_depth_num",
- stats.io_depth_stats.num_io_grouped,
- sm::description("total number of io depth"),
- label_instances
- ),
- sm::make_counter(
- "record_group_padding_bytes",
- stats.record_group_padding_bytes,
- sm::description("bytes of metadata padding when write record groups"),
- label_instances
- ),
- sm::make_counter(
- "record_group_metadata_bytes",
- stats.record_group_metadata_bytes,
- sm::description("bytes of raw metadata when write record groups"),
- label_instances
- ),
- sm::make_counter(
- "record_group_data_bytes",
- stats.record_group_data_bytes,
- sm::description("bytes of data when write record groups"),
- label_instances
- ),
- }
- );
- return ret;
- });
-}
-
-RecordSubmitter::close_ertr::future<>
-RecordSubmitter::close()
-{
- ceph_assert(state == state_t::IDLE);
- ceph_assert(num_outstanding_io == 0);
- committed_to = JOURNAL_SEQ_NULL;
- ceph_assert(p_current_batch != nullptr);
- ceph_assert(p_current_batch->is_empty());
- ceph_assert(!wait_available_promise.has_value());
- has_io_error = false;
- ceph_assert(!wait_unfull_flush_promise.has_value());
- metrics.clear();
- return segment_allocator.close();
-}
-
-void RecordSubmitter::update_state()
-{
- if (num_outstanding_io == 0) {
- state = state_t::IDLE;
- } else if (num_outstanding_io < io_depth_limit) {
- state = state_t::PENDING;
- } else if (num_outstanding_io == io_depth_limit) {
- state = state_t::FULL;
- } else {
- ceph_abort("fatal error: io-depth overflow");
- }
-}
-
-void RecordSubmitter::decrement_io_with_flush()
-{
- LOG_PREFIX(RecordSubmitter::decrement_io_with_flush);
- assert(num_outstanding_io > 0);
- auto prv_state = state;
- --num_outstanding_io;
- update_state();
-
- if (prv_state == state_t::FULL) {
- if (wait_unfull_flush_promise.has_value()) {
- DEBUG("{} flush, resolve wait_unfull_flush_promise", get_name());
- assert(!p_current_batch->is_empty());
- assert(wait_available_promise.has_value());
- flush_current_batch();
- wait_unfull_flush_promise->set_value();
- wait_unfull_flush_promise.reset();
- return;
- }
- } else {
- ceph_assert(!wait_unfull_flush_promise.has_value());
- }
-
- auto needs_flush = (
- !p_current_batch->is_empty() && (
- state == state_t::IDLE ||
- p_current_batch->get_submit_size().get_fullness() > preferred_fullness ||
- p_current_batch->needs_flush()
- ));
- if (needs_flush) {
- DEBUG("{} flush", get_name());
- flush_current_batch();
- }
-}
-
-void RecordSubmitter::account_submission(
- std::size_t num,
- const record_group_size_t& size)
-{
- stats.record_group_padding_bytes +=
- (size.get_mdlength() - size.get_raw_mdlength());
- stats.record_group_metadata_bytes += size.get_raw_mdlength();
- stats.record_group_data_bytes += size.dlength;
- stats.record_batch_stats.increment(num);
-}
-
-void RecordSubmitter::finish_submit_batch(
- RecordBatch* p_batch,
- maybe_result_t maybe_result)
-{
- assert(p_batch->is_submitting());
- p_batch->set_result(maybe_result);
- free_batch_ptrs.push_back(p_batch);
- decrement_io_with_flush();
-}
-
-void RecordSubmitter::flush_current_batch()
-{
- LOG_PREFIX(RecordSubmitter::flush_current_batch);
- RecordBatch* p_batch = p_current_batch;
- assert(p_batch->is_pending());
- p_current_batch = nullptr;
- pop_free_batch();
-
- increment_io();
- auto num = p_batch->get_num_records();
- auto [to_write, sizes] = p_batch->encode_batch(
- committed_to, segment_allocator.get_nonce());
- DEBUG("{} {} records, {}, committed_to={}, outstanding_io={} ...",
- get_name(), num, sizes, committed_to, num_outstanding_io);
- account_submission(num, sizes);
- std::ignore = segment_allocator.write(std::move(to_write)
- ).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) {
- TRACE("{} {} records, {}, write done with {}",
- get_name(), num, sizes, write_result);
- finish_submit_batch(p_batch, write_result);
- }).handle_error(
- crimson::ct_error::all_same_way([this, p_batch, FNAME, num, sizes=sizes](auto e) {
- ERROR("{} {} records, {}, got error {}",
- get_name(), num, sizes, e);
- finish_submit_batch(p_batch, std::nullopt);
- })
- ).handle_exception([this, p_batch, FNAME, num, sizes=sizes](auto e) {
- ERROR("{} {} records, {}, got exception {}",
- get_name(), num, sizes, e);
- finish_submit_batch(p_batch, std::nullopt);
- });
-}
-
}
JournalTrimmer *trimmer;
};
-/**
- * RecordBatch
- *
- * Maintain a batch of records for submit.
- */
-class RecordBatch {
- enum class state_t {
- EMPTY = 0,
- PENDING,
- SUBMITTING
- };
-
-public:
- RecordBatch() = default;
- RecordBatch(RecordBatch&&) = delete;
- RecordBatch(const RecordBatch&) = delete;
- RecordBatch& operator=(RecordBatch&&) = delete;
- RecordBatch& operator=(const RecordBatch&) = delete;
-
- bool is_empty() const {
- return state == state_t::EMPTY;
- }
-
- bool is_pending() const {
- return state == state_t::PENDING;
- }
-
- bool is_submitting() const {
- return state == state_t::SUBMITTING;
- }
-
- std::size_t get_index() const {
- return index;
- }
-
- std::size_t get_num_records() const {
- return pending.get_size();
- }
-
- std::size_t get_batch_capacity() const {
- return batch_capacity;
- }
-
- const record_group_size_t& get_submit_size() const {
- assert(state != state_t::EMPTY);
- return pending.size;
- }
-
- bool needs_flush() const {
- assert(state != state_t::SUBMITTING);
- assert(pending.get_size() <= batch_capacity);
- if (state == state_t::EMPTY) {
- return false;
- } else {
- assert(state == state_t::PENDING);
- return (pending.get_size() >= batch_capacity ||
- pending.size.get_encoded_length() > batch_flush_size);
- }
- }
-
- struct evaluation_t {
- record_group_size_t submit_size;
- bool is_full;
- };
- evaluation_t evaluate_submit(
- const record_size_t& rsize,
- extent_len_t block_size) const {
- assert(!needs_flush());
- auto submit_size = pending.size.get_encoded_length_after(
- rsize, block_size);
- bool is_full = submit_size.get_encoded_length() > batch_flush_size;
- return {submit_size, is_full};
- }
-
- void initialize(std::size_t i,
- std::size_t _batch_capacity,
- std::size_t _batch_flush_size) {
- ceph_assert(_batch_capacity > 0);
- index = i;
- batch_capacity = _batch_capacity;
- batch_flush_size = _batch_flush_size;
- pending.reserve(batch_capacity);
- }
-
- // Add to the batch, the future will be resolved after the batch is
- // written.
- //
- // Set write_result_t::write_length to 0 if the record is not the first one
- // in the batch.
- using add_pending_ertr = SegmentAllocator::write_ertr;
- using add_pending_ret = add_pending_ertr::future<record_locator_t>;
- add_pending_ret add_pending(
- const std::string& name,
- record_t&&,
- extent_len_t block_size);
-
- // Encode the batched records for write.
- std::pair<ceph::bufferlist, record_group_size_t> encode_batch(
- const journal_seq_t& committed_to,
- segment_nonce_t segment_nonce);
-
- // Set the write result and reset for reuse
- using maybe_result_t = std::optional<write_result_t>;
- void set_result(maybe_result_t maybe_write_end_seq);
-
- // The fast path that is equivalent to submit a single record as a batch.
- //
- // Essentially, equivalent to the combined logic of:
- // add_pending(), encode_batch() and set_result() above without
- // the intervention of the shared io_promise.
- //
- // Note the current RecordBatch can be reused afterwards.
- std::pair<ceph::bufferlist, record_group_size_t> submit_pending_fast(
- record_t&&,
- extent_len_t block_size,
- const journal_seq_t& committed_to,
- segment_nonce_t segment_nonce);
-
-private:
- record_group_size_t get_encoded_length_after(
- const record_t& record,
- extent_len_t block_size) const {
- return pending.size.get_encoded_length_after(
- record.size, block_size);
- }
-
- state_t state = state_t::EMPTY;
- std::size_t index = 0;
- std::size_t batch_capacity = 0;
- std::size_t batch_flush_size = 0;
-
- record_group_t pending;
- std::size_t submitting_size = 0;
- extent_len_t submitting_length = 0;
- extent_len_t submitting_mdlength = 0;
-
- struct promise_result_t {
- write_result_t write_result;
- extent_len_t mdlength;
- };
- using maybe_promise_result_t = std::optional<promise_result_t>;
- std::optional<seastar::shared_promise<maybe_promise_result_t> > io_promise;
-};
-
-/**
- * RecordSubmitter
- *
- * Submit records concurrently with RecordBatch with SegmentAllocator.
- *
- * Configurations and controls:
- * - io_depth: the io-depth limit to SegmentAllocator;
- * - batch_capacity: the number limit of records in a RecordBatch;
- * - batch_flush_size: the bytes threshold to force flush a RecordBatch to
- * control the maximum latency;
- * - preferred_fullness: the fullness threshold to flush a RecordBatch;
- */
-class RecordSubmitter {
- enum class state_t {
- IDLE = 0, // outstanding_io == 0
- PENDING, // outstanding_io < io_depth_limit
- FULL // outstanding_io == io_depth_limit
- // OVERFLOW: outstanding_io > io_depth_limit is impossible
- };
-
- struct grouped_io_stats {
- uint64_t num_io = 0;
- uint64_t num_io_grouped = 0;
-
- void increment(uint64_t num_grouped_io) {
- ++num_io;
- num_io_grouped += num_grouped_io;
- }
- };
-
- using base_ertr = crimson::errorator<
- crimson::ct_error::input_output_error>;
-
-public:
- RecordSubmitter(std::size_t io_depth,
- std::size_t batch_capacity,
- std::size_t batch_flush_size,
- double preferred_fullness,
- SegmentAllocator&);
-
- const std::string& get_name() const {
- return segment_allocator.get_name();
- }
-
- journal_seq_t get_committed_to() const {
- return committed_to;
- }
-
- // whether is available to submit a record
- bool is_available() const;
-
- // wait for available if cannot submit, should check is_available() again
- // when the future is resolved.
- using wa_ertr = base_ertr;
- wa_ertr::future<> wait_available();
-
- // when available, check for the submit action
- // according to the pending record size
- enum class action_t {
- ROLL,
- SUBMIT_FULL,
- SUBMIT_NOT_FULL
- };
- action_t check_action(const record_size_t&) const;
-
- // when available, roll the segment if needed
- using roll_segment_ertr = base_ertr;
- roll_segment_ertr::future<> roll_segment();
-
- // when available, submit the record if possible
- using submit_ertr = base_ertr;
- using submit_ret = submit_ertr::future<record_locator_t>;
- submit_ret submit(record_t&&, bool with_atomic_roll_segment=false);
-
- void update_committed_to(const journal_seq_t& new_committed_to) {
- assert(new_committed_to != JOURNAL_SEQ_NULL);
- assert(committed_to == JOURNAL_SEQ_NULL ||
- committed_to <= new_committed_to);
- committed_to = new_committed_to;
- }
-
- // open for write, generate the correct print name, and register metrics
- using open_ertr = base_ertr;
- using open_ret = open_ertr::future<journal_seq_t>;
- open_ret open(bool is_mkfs);
-
- using close_ertr = base_ertr;
- close_ertr::future<> close();
-
-private:
- void update_state();
-
- void increment_io() {
- ++num_outstanding_io;
- stats.io_depth_stats.increment(num_outstanding_io);
- update_state();
- }
-
- void decrement_io_with_flush();
-
- void pop_free_batch() {
- assert(p_current_batch == nullptr);
- assert(!free_batch_ptrs.empty());
- p_current_batch = free_batch_ptrs.front();
- assert(p_current_batch->is_empty());
- assert(p_current_batch == &batches[p_current_batch->get_index()]);
- free_batch_ptrs.pop_front();
- }
-
- void account_submission(std::size_t, const record_group_size_t&);
-
- using maybe_result_t = RecordBatch::maybe_result_t;
- void finish_submit_batch(RecordBatch*, maybe_result_t);
-
- void flush_current_batch();
-
- state_t state = state_t::IDLE;
- std::size_t num_outstanding_io = 0;
- std::size_t io_depth_limit;
- double preferred_fullness;
-
- SegmentAllocator& segment_allocator;
- // committed_to may be in a previous journal segment
- journal_seq_t committed_to = JOURNAL_SEQ_NULL;
-
- std::unique_ptr<RecordBatch[]> batches;
- // should not be nullptr after constructed
- RecordBatch* p_current_batch = nullptr;
- seastar::circular_buffer<RecordBatch*> free_batch_ptrs;
-
- // blocked for rolling or lack of resource
- std::optional<seastar::shared_promise<> > wait_available_promise;
- bool has_io_error = false;
- // when needs flush but io depth is full,
- // wait for decrement_io_with_flush()
- std::optional<seastar::promise<> > wait_unfull_flush_promise;
-
- struct {
- grouped_io_stats record_batch_stats;
- grouped_io_stats io_depth_stats;
- uint64_t record_group_padding_bytes = 0;
- uint64_t record_group_metadata_bytes = 0;
- uint64_t record_group_data_bytes = 0;
- } stats;
- seastar::metrics::metric_group metrics;
-};
-
}
#include "crimson/osd/exceptions.h"
#include "segment_allocator.h"
#include "crimson/os/seastore/segment_seq_allocator.h"
+#include "record_submitter.h"
namespace crimson::os::seastore::journal {
/**