);
}
+RecordBatch::add_pending_ret
+RecordBatch::add_pending(
+ const std::string& name,
+ record_t&& record,
+ extent_len_t block_size)
+{
+ LOG_PREFIX(RecordBatch::add_pending);
+ auto new_size = get_encoded_length_after(record, block_size);
+ auto dlength_offset = pending.size.dlength;
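+ // dlength_offset is where this record's data will start within the group,
+ // i.e. the accumulated dlength of the records already pending.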
+ TRACE("{} batches={}, write_size={}, dlength_offset={} ...",
+ name,
+ pending.get_size() + 1,
+ new_size.get_encoded_length(),
+ dlength_offset);
+ assert(state != state_t::SUBMITTING);
+ assert(evaluate_submit(record.size, block_size).submit_size == new_size);
+
+ pending.push_back(
+ std::move(record), block_size);
+ assert(pending.size == new_size);
+ if (state == state_t::EMPTY) {
+ assert(!io_promise.has_value());
+ io_promise = seastar::shared_promise<maybe_promise_result_t>();
+ } else {
+ assert(io_promise.has_value());
+ }
+ state = state_t::PENDING;
+
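+ // every record in the batch shares io_promise; each resolves to its own
+ // locator computed from its dlength_offset within the group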
+ return io_promise->get_shared_future(
+ ).then([dlength_offset, FNAME, &name
+ ](auto maybe_promise_result) -> add_pending_ret {
+ if (!maybe_promise_result.has_value()) {
+ ERROR("{} write failed", name);
+ return crimson::ct_error::input_output_error::make();
+ }
+ auto write_result = maybe_promise_result->write_result;
+ auto submit_result = record_locator_t{
+ write_result.start_seq.offset.add_offset(
+ maybe_promise_result->mdlength + dlength_offset),
+ write_result
+ };
+ TRACE("{} write finish with {}", name, submit_result);
+ return add_pending_ret(
+ add_pending_ertr::ready_future_marker{},
+ submit_result);
+ });
+}
+
+std::pair<ceph::bufferlist, record_group_size_t>
+RecordBatch::encode_batch(
+ const journal_seq_t& committed_to,
+ segment_nonce_t segment_nonce)
+{
+ assert(state == state_t::PENDING);
+ assert(pending.get_size() > 0);
+ assert(io_promise.has_value());
+
+ state = state_t::SUBMITTING;
+ submitting_size = pending.get_size();
+ auto gsize = pending.size;
+ submitting_length = gsize.get_encoded_length();
+ submitting_mdlength = gsize.get_mdlength();
+ auto bl = encode_records(pending, committed_to, segment_nonce);
+ // Note: pending is cleared here
+ assert(bl.length() == (std::size_t)submitting_length);
+ return std::make_pair(bl, gsize);
+}
+
+void RecordBatch::set_result(
+ maybe_result_t maybe_write_result)
+{
+ maybe_promise_result_t result;
+ if (maybe_write_result.has_value()) {
+ assert(maybe_write_result->length == submitting_length);
+ result = promise_result_t{
+ *maybe_write_result,
+ submitting_mdlength
+ };
+ }
+ assert(state == state_t::SUBMITTING);
+ assert(io_promise.has_value());
+
+ state = state_t::EMPTY;
+ submitting_size = 0;
+ submitting_length = 0;
+ submitting_mdlength = 0;
+ io_promise->set_value(result);
+ io_promise.reset();
+}
+
+std::pair<ceph::bufferlist, record_group_size_t>
+RecordBatch::submit_pending_fast(
+ record_t&& record,
+ extent_len_t block_size,
+ const journal_seq_t& committed_to,
+ segment_nonce_t segment_nonce)
+{
+ auto new_size = get_encoded_length_after(record, block_size);
+ std::ignore = new_size;
+ assert(state == state_t::EMPTY);
+ assert(evaluate_submit(record.size, block_size).submit_size == new_size);
+
+ auto group = record_group_t(std::move(record), block_size);
+ auto size = group.size;
+ assert(size == new_size);
+ auto bl = encode_records(group, committed_to, segment_nonce);
+ assert(bl.length() == size.get_encoded_length());
+ return std::make_pair(bl, size);
+}
+
+RecordSubmitter::RecordSubmitter(
+ std::size_t io_depth,
+ std::size_t batch_capacity,
+ std::size_t batch_flush_size,
+ double preferred_fullness,
+ SegmentAllocator& sa)
+ : io_depth_limit{io_depth},
+ preferred_fullness{preferred_fullness},
+ segment_allocator{sa},
+ batches(new RecordBatch[io_depth + 1])
+{
+ LOG_PREFIX(RecordSubmitter);
+ INFO("{} io_depth_limit={}, batch_capacity={}, batch_flush_size={}, "
+ "preferred_fullness={}",
+ get_name(), io_depth, batch_capacity,
+ batch_flush_size, preferred_fullness);
+ ceph_assert(io_depth > 0);
+ ceph_assert(batch_capacity > 0);
+ ceph_assert(preferred_fullness >= 0 &&
+ preferred_fullness <= 1);
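+ // allocate io_depth + 1 batches so that pop_free_batch() can always hand
+ // out a fresh current batch while up to io_depth batches are submitting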
+ free_batch_ptrs.reserve(io_depth + 1);
+ for (std::size_t i = 0; i <= io_depth; ++i) {
+ batches[i].initialize(i, batch_capacity, batch_flush_size);
+ free_batch_ptrs.push_back(&batches[i]);
+ }
+ pop_free_batch();
+}
+
+bool RecordSubmitter::is_available() const
+{
+ auto ret = !wait_available_promise.has_value() &&
+ !has_io_error;
+#ifndef NDEBUG
+ if (ret) {
+ // invariants when available
+ ceph_assert(segment_allocator.can_write());
+ ceph_assert(p_current_batch != nullptr);
+ ceph_assert(!p_current_batch->is_submitting());
+ ceph_assert(!p_current_batch->needs_flush());
+ if (!p_current_batch->is_empty()) {
+ auto submit_length =
+ p_current_batch->get_submit_size().get_encoded_length();
+ ceph_assert(!segment_allocator.needs_roll(submit_length));
+ }
+ }
+#endif
+ return ret;
+}
+
+RecordSubmitter::wa_ertr::future<>
+RecordSubmitter::wait_available()
+{
+ LOG_PREFIX(RecordSubmitter::wait_available);
+ assert(!is_available());
+ if (has_io_error) {
+ ERROR("{} I/O is failed before wait", get_name());
+ return crimson::ct_error::input_output_error::make();
+ }
+ return wait_available_promise->get_shared_future(
+ ).then([FNAME, this]() -> wa_ertr::future<> {
+ if (has_io_error) {
+ ERROR("{} I/O is failed after wait", get_name());
+ return crimson::ct_error::input_output_error::make();
+ }
+ return wa_ertr::now();
+ });
+}
+
+RecordSubmitter::action_t
+RecordSubmitter::check_action(
+ const record_size_t& rsize) const
+{
+ assert(is_available());
+ auto eval = p_current_batch->evaluate_submit(
+ rsize, segment_allocator.get_block_size());
+ if (segment_allocator.needs_roll(eval.submit_size.get_encoded_length())) {
+ return action_t::ROLL;
+ } else if (eval.is_full) {
+ return action_t::SUBMIT_FULL;
+ } else {
+ return action_t::SUBMIT_NOT_FULL;
+ }
+}
+
+RecordSubmitter::roll_segment_ertr::future<>
+RecordSubmitter::roll_segment()
+{
+ LOG_PREFIX(RecordSubmitter::roll_segment);
+ assert(is_available());
+ // #1 block concurrent submissions due to rolling
+ wait_available_promise = seastar::shared_promise<>();
+ assert(!wait_unfull_flush_promise.has_value());
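+ // flush the pending batch to the current segment before rolling; if the
+ // io-depth is FULL, wait for decrement_io_with_flush() to flush it first.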
+ return [FNAME, this] {
+ if (p_current_batch->is_pending()) {
+ if (state == state_t::FULL) {
+ DEBUG("{} wait flush ...", get_name());
+ wait_unfull_flush_promise = seastar::promise<>();
+ return wait_unfull_flush_promise->get_future();
+ } else { // IDLE/PENDING
+ DEBUG("{} flush", get_name());
+ flush_current_batch();
+ return seastar::now();
+ }
+ } else {
+ assert(p_current_batch->is_empty());
+ return seastar::now();
+ }
+ }().then_wrapped([FNAME, this](auto fut) {
+ if (fut.failed()) {
+ ERROR("{} rolling is skipped unexpectedly, available", get_name());
+ has_io_error = true;
+ wait_available_promise->set_value();
+ wait_available_promise.reset();
+ return roll_segment_ertr::now();
+ } else {
+ // start rolling in background
+ std::ignore = segment_allocator.roll(
+ ).safe_then([FNAME, this] {
+ // good
+ DEBUG("{} rolling done, available", get_name());
+ assert(!has_io_error);
+ wait_available_promise->set_value();
+ wait_available_promise.reset();
+ }).handle_error(
+ crimson::ct_error::all_same_way([FNAME, this](auto e) {
+ ERROR("{} got error {}, available", get_name(), e);
+ has_io_error = true;
+ wait_available_promise->set_value();
+ wait_available_promise.reset();
+ })
+ ).handle_exception([FNAME, this](auto e) {
+ ERROR("{} got exception {}, available", get_name(), e);
+ has_io_error = true;
+ wait_available_promise->set_value();
+ wait_available_promise.reset();
+ });
+ // wait for background rolling
+ return wait_available();
+ }
+ });
+}
+
+RecordSubmitter::submit_ret
+RecordSubmitter::submit(record_t&& record)
+{
+ LOG_PREFIX(RecordSubmitter::submit);
+ assert(is_available());
+ assert(check_action(record.size) != action_t::ROLL);
+ auto eval = p_current_batch->evaluate_submit(
+ record.size, segment_allocator.get_block_size());
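+ // flush immediately if there is no outstanding io (IDLE), if the batch
+ // would exceed preferred_fullness, or if it would become full by record
+ // count or encoded size (mirroring RecordBatch::needs_flush()).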
+ bool needs_flush = (
+ state == state_t::IDLE ||
+ eval.submit_size.get_fullness() > preferred_fullness ||
+ // RecordBatch::needs_flush()
+ eval.is_full ||
+ p_current_batch->get_num_records() + 1 >=
+ p_current_batch->get_batch_capacity());
+ if (p_current_batch->is_empty() &&
+ needs_flush &&
+ state != state_t::FULL) {
+ // fast path with direct write
+ increment_io();
+ auto [to_write, sizes] = p_current_batch->submit_pending_fast(
+ std::move(record),
+ segment_allocator.get_block_size(),
+ committed_to,
+ segment_allocator.get_nonce());
+ DEBUG("{} fast submit {}, committed_to={}, outstanding_io={} ...",
+ get_name(), sizes, committed_to, num_outstanding_io);
+ account_submission(1, sizes);
+ return segment_allocator.write(to_write
+ ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
+ return record_locator_t{
+ write_result.start_seq.offset.add_offset(mdlength),
+ write_result
+ };
+ }).finally([this] {
+ decrement_io_with_flush();
+ });
+ }
+ // indirect batched write
+ auto write_fut = p_current_batch->add_pending(
+ get_name(),
+ std::move(record),
+ segment_allocator.get_block_size());
+ if (needs_flush) {
+ if (state == state_t::FULL) {
+ // #2 block concurrent submissions due to lack of resource
+ DEBUG("{} added with {} pending, outstanding_io={}, unavailable, wait flush ...",
+ get_name(),
+ p_current_batch->get_num_records(),
+ num_outstanding_io);
+ wait_available_promise = seastar::shared_promise<>();
+ assert(!wait_unfull_flush_promise.has_value());
+ wait_unfull_flush_promise = seastar::promise<>();
+ // flush and mark available in background
+ std::ignore = wait_unfull_flush_promise->get_future(
+ ).finally([FNAME, this] {
+ DEBUG("{} flush done, available", get_name());
+ wait_available_promise->set_value();
+ wait_available_promise.reset();
+ });
+ } else {
+ DEBUG("{} added pending, flush", get_name());
+ flush_current_batch();
+ }
+ } else {
+ // will flush later
+ DEBUG("{} added with {} pending, outstanding_io={}",
+ get_name(),
+ p_current_batch->get_num_records(),
+ num_outstanding_io);
+ assert(!p_current_batch->needs_flush());
+ }
+ return write_fut;
+}
+
+void RecordSubmitter::update_state()
+{
+ if (num_outstanding_io == 0) {
+ state = state_t::IDLE;
+ } else if (num_outstanding_io < io_depth_limit) {
+ state = state_t::PENDING;
+ } else if (num_outstanding_io == io_depth_limit) {
+ state = state_t::FULL;
+ } else {
+ ceph_abort("fatal error: io-depth overflow");
+ }
+}
+
+void RecordSubmitter::decrement_io_with_flush()
+{
+ LOG_PREFIX(RecordSubmitter::decrement_io_with_flush);
+ assert(num_outstanding_io > 0);
+ auto prv_state = state;
+ --num_outstanding_io;
+ update_state();
+
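+ // leaving the FULL state may unblock a waiter (see #1/#2) that deferred
+ // its flush until an io slot became available; flush and resolve it here.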
+ if (prv_state == state_t::FULL) {
+ if (wait_unfull_flush_promise.has_value()) {
+ DEBUG("{} flush, resolve wait_unfull_flush_promise", get_name());
+ assert(!p_current_batch->is_empty());
+ assert(wait_available_promise.has_value());
+ flush_current_batch();
+ wait_unfull_flush_promise->set_value();
+ wait_unfull_flush_promise.reset();
+ return;
+ }
+ } else {
+ assert(!wait_unfull_flush_promise.has_value());
+ }
+
+ auto needs_flush = (
+ !p_current_batch->is_empty() && (
+ state == state_t::IDLE ||
+ p_current_batch->get_submit_size().get_fullness() > preferred_fullness ||
+ p_current_batch->needs_flush()
+ ));
+ if (needs_flush) {
+ DEBUG("{} flush", get_name());
+ flush_current_batch();
+ }
+}
+
+void RecordSubmitter::account_submission(
+ std::size_t num,
+ const record_group_size_t& size)
+{
+ stats.record_group_padding_bytes +=
+ (size.get_mdlength() - size.get_raw_mdlength());
+ stats.record_group_metadata_bytes += size.get_raw_mdlength();
+ stats.record_group_data_bytes += size.dlength;
+}
+
+void RecordSubmitter::finish_submit_batch(
+ RecordBatch* p_batch,
+ maybe_result_t maybe_result)
+{
+ assert(p_batch->is_submitting());
+ p_batch->set_result(maybe_result);
+ free_batch_ptrs.push_back(p_batch);
+ decrement_io_with_flush();
+}
+
+void RecordSubmitter::flush_current_batch()
+{
+ LOG_PREFIX(RecordSubmitter::flush_current_batch);
+ RecordBatch* p_batch = p_current_batch;
+ assert(p_batch->is_pending());
+ p_current_batch = nullptr;
+ pop_free_batch();
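+ // hand the flushed batch over to the in-flight write; a free batch
+ // becomes the new current batch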
+
+ increment_io();
+ auto num = p_batch->get_num_records();
+ auto [to_write, sizes] = p_batch->encode_batch(
+ committed_to, segment_allocator.get_nonce());
+ DEBUG("{} {} records, {}, committed_to={}, outstanding_io={} ...",
+ get_name(), num, sizes, committed_to, num_outstanding_io);
+ account_submission(num, sizes);
+ std::ignore = segment_allocator.write(to_write
+ ).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) {
+ TRACE("{} {} records, {}, write done with {}",
+ get_name(), num, sizes, write_result);
+ finish_submit_batch(p_batch, write_result);
+ }).handle_error(
+ crimson::ct_error::all_same_way([this, p_batch, FNAME, num, sizes=sizes](auto e) {
+ ERROR("{} {} records, {}, got error {}",
+ get_name(), num, sizes, e);
+ finish_submit_batch(p_batch, std::nullopt);
+ })
+ ).handle_exception([this, p_batch, FNAME, num, sizes=sizes](auto e) {
+ ERROR("{} {} records, {}, got exception {}",
+ get_name(), num, sizes, e);
+ finish_submit_batch(p_batch, std::nullopt);
+ });
+}
+
}
#pragma once
+#include <optional>
+#include <seastar/core/circular_buffer.hh>
+#include <seastar/core/shared_future.hh>
+
#include "include/buffer.h"
#include "crimson/common/errorator.h"
seastore_off_t written_to;
};
+/**
+ * RecordBatch
+ *
+ * Maintain a batch of records for submission.
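+ *
+ * A batch transitions EMPTY -> PENDING on the first add_pending(),
+ * PENDING -> SUBMITTING on encode_batch(), and back to EMPTY on
+ * set_result(), after which it can be reused.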
+ */
+class RecordBatch {
+ enum class state_t {
+ EMPTY = 0,
+ PENDING,
+ SUBMITTING
+ };
+
+public:
+ RecordBatch() = default;
+ RecordBatch(RecordBatch&&) = delete;
+ RecordBatch(const RecordBatch&) = delete;
+ RecordBatch& operator=(RecordBatch&&) = delete;
+ RecordBatch& operator=(const RecordBatch&) = delete;
+
+ bool is_empty() const {
+ return state == state_t::EMPTY;
+ }
+
+ bool is_pending() const {
+ return state == state_t::PENDING;
+ }
+
+ bool is_submitting() const {
+ return state == state_t::SUBMITTING;
+ }
+
+ std::size_t get_index() const {
+ return index;
+ }
+
+ std::size_t get_num_records() const {
+ return pending.get_size();
+ }
+
+ std::size_t get_batch_capacity() const {
+ return batch_capacity;
+ }
+
+ const record_group_size_t& get_submit_size() const {
+ assert(state != state_t::EMPTY);
+ return pending.size;
+ }
+
+ bool needs_flush() const {
+ assert(state != state_t::SUBMITTING);
+ assert(pending.get_size() <= batch_capacity);
+ if (state == state_t::EMPTY) {
+ return false;
+ } else {
+ assert(state == state_t::PENDING);
+ return (pending.get_size() >= batch_capacity ||
+ pending.size.get_encoded_length() > batch_flush_size);
+ }
+ }
+
+ struct evaluation_t {
+ record_group_size_t submit_size;
+ bool is_full;
+ };
+ evaluation_t evaluate_submit(
+ const record_size_t& rsize,
+ extent_len_t block_size) const {
+ assert(!needs_flush());
+ auto submit_size = pending.size.get_encoded_length_after(
+ rsize, block_size);
+ bool is_full = submit_size.get_encoded_length() > batch_flush_size;
+ return {submit_size, is_full};
+ }
+
+ void initialize(std::size_t i,
+ std::size_t _batch_capacity,
+ std::size_t _batch_flush_size) {
+ ceph_assert(_batch_capacity > 0);
+ index = i;
+ batch_capacity = _batch_capacity;
+ batch_flush_size = _batch_flush_size;
+ pending.reserve(batch_capacity);
+ }
+
+ // Add to the batch; the returned future will be resolved after the batch
+ // is written.
+ //
+ // Set write_result_t::write_length to 0 if the record is not the first one
+ // in the batch.
+ using add_pending_ertr = SegmentAllocator::write_ertr;
+ using add_pending_ret = add_pending_ertr::future<record_locator_t>;
+ add_pending_ret add_pending(
+ const std::string& name,
+ record_t&&,
+ extent_len_t block_size);
+
+ // Encode the batched records for write.
+ std::pair<ceph::bufferlist, record_group_size_t> encode_batch(
+ const journal_seq_t& committed_to,
+ segment_nonce_t segment_nonce);
+
+ // Set the write result and reset for reuse
+ using maybe_result_t = std::optional<write_result_t>;
+ void set_result(maybe_result_t maybe_write_result);
+
+ // The fast path, equivalent to submitting a single record as a batch.
+ //
+ // Essentially the combined logic of add_pending(), encode_batch() and
+ // set_result() above, without going through the shared io_promise.
+ //
+ // Note the current RecordBatch can be reused afterwards.
+ std::pair<ceph::bufferlist, record_group_size_t> submit_pending_fast(
+ record_t&&,
+ extent_len_t block_size,
+ const journal_seq_t& committed_to,
+ segment_nonce_t segment_nonce);
+
+private:
+ record_group_size_t get_encoded_length_after(
+ const record_t& record,
+ extent_len_t block_size) const {
+ return pending.size.get_encoded_length_after(
+ record.size, block_size);
+ }
+
+ state_t state = state_t::EMPTY;
+ std::size_t index = 0;
+ std::size_t batch_capacity = 0;
+ std::size_t batch_flush_size = 0;
+
+ record_group_t pending;
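+ // captured by encode_batch() for use in set_result(), as pending is
+ // consumed by encoding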
+ std::size_t submitting_size = 0;
+ seastore_off_t submitting_length = 0;
+ seastore_off_t submitting_mdlength = 0;
+
+ struct promise_result_t {
+ write_result_t write_result;
+ seastore_off_t mdlength;
+ };
+ using maybe_promise_result_t = std::optional<promise_result_t>;
+ std::optional<seastar::shared_promise<maybe_promise_result_t> > io_promise;
+};
+
+/**
+ * RecordSubmitter
+ *
+ * Submit records concurrently, grouping them into RecordBatches and writing
+ * them out through SegmentAllocator.
+ *
+ * Configurations and controls:
+ * - io_depth: the limit on outstanding writes to SegmentAllocator;
+ * - batch_capacity: the maximum number of records in a RecordBatch;
+ * - batch_flush_size: the size threshold (in bytes) that forces a
+ *   RecordBatch to be flushed, bounding the maximum latency;
+ * - preferred_fullness: the fullness threshold above which a RecordBatch
+ *   is flushed;
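+ *
+ * Typical usage (sketch): when is_available(), call check_action() with the
+ * record size; on ROLL, roll_segment() first, otherwise submit() the record.
+ * When unavailable, wait_available() and then re-check is_available().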
+ */
+class RecordSubmitter {
+ enum class state_t {
+ IDLE = 0, // outstanding_io == 0
+ PENDING, // outstanding_io < io_depth_limit
+ FULL // outstanding_io == io_depth_limit
+ // OVERFLOW: outstanding_io > io_depth_limit is impossible
+ };
+
+ struct grouped_io_stats {
+ uint64_t num_io = 0;
+ uint64_t num_io_grouped = 0;
+
+ void increment(uint64_t num_grouped_io) {
+ ++num_io;
+ num_io_grouped += num_grouped_io;
+ }
+ };
+
+ using base_ertr = crimson::errorator<
+ crimson::ct_error::input_output_error>;
+
+public:
+ RecordSubmitter(std::size_t io_depth,
+ std::size_t batch_capacity,
+ std::size_t batch_flush_size,
+ double preferred_fullness,
+ SegmentAllocator&);
+
+ const std::string& get_name() const {
+ return segment_allocator.get_name();
+ }
+
+ grouped_io_stats get_record_batch_stats() const {
+ return stats.record_batch_stats;
+ }
+
+ grouped_io_stats get_io_depth_stats() const {
+ return stats.io_depth_stats;
+ }
+
+ uint64_t get_record_group_padding_bytes() const {
+ return stats.record_group_padding_bytes;
+ }
+
+ uint64_t get_record_group_metadata_bytes() const {
+ return stats.record_group_metadata_bytes;
+ }
+
+ uint64_t get_record_group_data_bytes() const {
+ return stats.record_group_data_bytes;
+ }
+
+ journal_seq_t get_committed_to() const {
+ return committed_to;
+ }
+
+ void reset_stats() {
+ stats = {};
+ }
+
+ // whether the submitter is available to accept a record
+ bool is_available() const;
+
+ // wait to become available if a record cannot be submitted; is_available()
+ // should be checked again once the future is resolved.
+ using wa_ertr = base_ertr;
+ wa_ertr::future<> wait_available();
+
+ // when available, check for the submit action
+ // according to the pending record size
+ enum class action_t {
+ ROLL,
+ SUBMIT_FULL,
+ SUBMIT_NOT_FULL
+ };
+ action_t check_action(const record_size_t&) const;
+
+ // when available, roll the segment if needed
+ using roll_segment_ertr = base_ertr;
+ roll_segment_ertr::future<> roll_segment();
+
+ // when available, submit the record if possible
+ using submit_ertr = base_ertr;
+ using submit_ret = submit_ertr::future<record_locator_t>;
+ submit_ret submit(record_t&&);
+
+ void update_committed_to(const journal_seq_t& new_committed_to) {
+ assert(new_committed_to != JOURNAL_SEQ_NULL);
+ assert(committed_to == JOURNAL_SEQ_NULL ||
+ committed_to <= new_committed_to);
+ committed_to = new_committed_to;
+ }
+
+private:
+ void update_state();
+
+ void increment_io() {
+ ++num_outstanding_io;
+ stats.io_depth_stats.increment(num_outstanding_io);
+ update_state();
+ }
+
+ void decrement_io_with_flush();
+
+ void pop_free_batch() {
+ assert(p_current_batch == nullptr);
+ assert(!free_batch_ptrs.empty());
+ p_current_batch = free_batch_ptrs.front();
+ assert(p_current_batch->is_empty());
+ assert(p_current_batch == &batches[p_current_batch->get_index()]);
+ free_batch_ptrs.pop_front();
+ }
+
+ void account_submission(std::size_t, const record_group_size_t&);
+
+ using maybe_result_t = RecordBatch::maybe_result_t;
+ void finish_submit_batch(RecordBatch*, maybe_result_t);
+
+ void flush_current_batch();
+
+ state_t state = state_t::IDLE;
+ std::size_t num_outstanding_io = 0;
+ std::size_t io_depth_limit;
+ double preferred_fullness;
+
+ SegmentAllocator& segment_allocator;
+ // committed_to may be in a previous journal segment
+ journal_seq_t committed_to = JOURNAL_SEQ_NULL;
+
+ std::unique_ptr<RecordBatch[]> batches;
+ // should not be nullptr after construction
+ RecordBatch* p_current_batch = nullptr;
+ seastar::circular_buffer<RecordBatch*> free_batch_ptrs;
+
+ // blocked on rolling or on lack of io resources
+ std::optional<seastar::shared_promise<> > wait_available_promise;
+ bool has_io_error = false;
+ // set when a flush is needed but the io depth is full;
+ // resolved by decrement_io_with_flush()
+ std::optional<seastar::promise<> > wait_unfull_flush_promise;
+
+ struct {
+ grouped_io_stats record_batch_stats;
+ grouped_io_stats io_depth_stats;
+ uint64_t record_group_padding_bytes = 0;
+ uint64_t record_group_metadata_bytes = 0;
+ uint64_t record_group_data_bytes = 0;
+ } stats;
+};
+
}
);
}
-SegmentedJournal::RecordBatch::add_pending_ret
-SegmentedJournal::RecordBatch::add_pending(
- const std::string& name,
- record_t&& record,
- extent_len_t block_size)
-{
- LOG_PREFIX(RecordBatch::add_pending);
- auto new_size = get_encoded_length_after(record, block_size);
- auto dlength_offset = pending.size.dlength;
- TRACE("{} batches={}, write_size={}, dlength_offset={} ...",
- name,
- pending.get_size() + 1,
- new_size.get_encoded_length(),
- dlength_offset);
- assert(state != state_t::SUBMITTING);
- assert(evaluate_submit(record.size, block_size).submit_size == new_size);
-
- pending.push_back(
- std::move(record), block_size);
- assert(pending.size == new_size);
- if (state == state_t::EMPTY) {
- assert(!io_promise.has_value());
- io_promise = seastar::shared_promise<maybe_promise_result_t>();
- } else {
- assert(io_promise.has_value());
- }
- state = state_t::PENDING;
-
- return io_promise->get_shared_future(
- ).then([dlength_offset, FNAME, &name
- ](auto maybe_promise_result) -> add_pending_ret {
- if (!maybe_promise_result.has_value()) {
- ERROR("{} write failed", name);
- return crimson::ct_error::input_output_error::make();
- }
- auto write_result = maybe_promise_result->write_result;
- auto submit_result = record_locator_t{
- write_result.start_seq.offset.add_offset(
- maybe_promise_result->mdlength + dlength_offset),
- write_result
- };
- TRACE("{} write finish with {}", name, submit_result);
- return add_pending_ret(
- add_pending_ertr::ready_future_marker{},
- submit_result);
- });
-}
-
-std::pair<ceph::bufferlist, record_group_size_t>
-SegmentedJournal::RecordBatch::encode_batch(
- const journal_seq_t& committed_to,
- segment_nonce_t segment_nonce)
-{
- assert(state == state_t::PENDING);
- assert(pending.get_size() > 0);
- assert(io_promise.has_value());
-
- state = state_t::SUBMITTING;
- submitting_size = pending.get_size();
- auto gsize = pending.size;
- submitting_length = gsize.get_encoded_length();
- submitting_mdlength = gsize.get_mdlength();
- auto bl = encode_records(pending, committed_to, segment_nonce);
- // Note: pending is cleared here
- assert(bl.length() == (std::size_t)submitting_length);
- return std::make_pair(bl, gsize);
-}
-
-void SegmentedJournal::RecordBatch::set_result(
- maybe_result_t maybe_write_result)
-{
- maybe_promise_result_t result;
- if (maybe_write_result.has_value()) {
- assert(maybe_write_result->length == submitting_length);
- result = promise_result_t{
- *maybe_write_result,
- submitting_mdlength
- };
- }
- assert(state == state_t::SUBMITTING);
- assert(io_promise.has_value());
-
- state = state_t::EMPTY;
- submitting_size = 0;
- submitting_length = 0;
- submitting_mdlength = 0;
- io_promise->set_value(result);
- io_promise.reset();
-}
-
-std::pair<ceph::bufferlist, record_group_size_t>
-SegmentedJournal::RecordBatch::submit_pending_fast(
- record_t&& record,
- extent_len_t block_size,
- const journal_seq_t& committed_to,
- segment_nonce_t segment_nonce)
-{
- auto new_size = get_encoded_length_after(record, block_size);
- std::ignore = new_size;
- assert(state == state_t::EMPTY);
- assert(evaluate_submit(record.size, block_size).submit_size == new_size);
-
- auto group = record_group_t(std::move(record), block_size);
- auto size = group.size;
- assert(size == new_size);
- auto bl = encode_records(group, committed_to, segment_nonce);
- assert(bl.length() == size.get_encoded_length());
- return std::make_pair(bl, size);
-}
-
-SegmentedJournal::RecordSubmitter::RecordSubmitter(
- std::size_t io_depth,
- std::size_t batch_capacity,
- std::size_t batch_flush_size,
- double preferred_fullness,
- SegmentAllocator& sa)
- : io_depth_limit{io_depth},
- preferred_fullness{preferred_fullness},
- segment_allocator{sa},
- batches(new RecordBatch[io_depth + 1])
-{
- LOG_PREFIX(RecordSubmitter);
- INFO("{} io_depth_limit={}, batch_capacity={}, batch_flush_size={}, "
- "preferred_fullness={}",
- get_name(), io_depth, batch_capacity,
- batch_flush_size, preferred_fullness);
- ceph_assert(io_depth > 0);
- ceph_assert(batch_capacity > 0);
- ceph_assert(preferred_fullness >= 0 &&
- preferred_fullness <= 1);
- free_batch_ptrs.reserve(io_depth + 1);
- for (std::size_t i = 0; i <= io_depth; ++i) {
- batches[i].initialize(i, batch_capacity, batch_flush_size);
- free_batch_ptrs.push_back(&batches[i]);
- }
- pop_free_batch();
-}
-
-bool SegmentedJournal::RecordSubmitter::is_available() const
-{
- auto ret = !wait_available_promise.has_value() &&
- !has_io_error;
-#ifndef NDEBUG
- if (ret) {
- // invariants when available
- ceph_assert(segment_allocator.can_write());
- ceph_assert(p_current_batch != nullptr);
- ceph_assert(!p_current_batch->is_submitting());
- ceph_assert(!p_current_batch->needs_flush());
- if (!p_current_batch->is_empty()) {
- auto submit_length =
- p_current_batch->get_submit_size().get_encoded_length();
- ceph_assert(!segment_allocator.needs_roll(submit_length));
- }
- }
-#endif
- return ret;
-}
-
-SegmentedJournal::RecordSubmitter::wa_ertr::future<>
-SegmentedJournal::RecordSubmitter::wait_available()
-{
- LOG_PREFIX(RecordSubmitter::wait_available);
- assert(!is_available());
- if (has_io_error) {
- ERROR("{} I/O is failed before wait", get_name());
- return crimson::ct_error::input_output_error::make();
- }
- return wait_available_promise->get_shared_future(
- ).then([FNAME, this]() -> wa_ertr::future<> {
- if (has_io_error) {
- ERROR("{} I/O is failed after wait", get_name());
- return crimson::ct_error::input_output_error::make();
- }
- return wa_ertr::now();
- });
-}
-
-SegmentedJournal::RecordSubmitter::action_t
-SegmentedJournal::RecordSubmitter::check_action(
- const record_size_t& rsize) const
-{
- assert(is_available());
- auto eval = p_current_batch->evaluate_submit(
- rsize, segment_allocator.get_block_size());
- if (segment_allocator.needs_roll(eval.submit_size.get_encoded_length())) {
- return action_t::ROLL;
- } else if (eval.is_full) {
- return action_t::SUBMIT_FULL;
- } else {
- return action_t::SUBMIT_NOT_FULL;
- }
-}
-
-SegmentedJournal::RecordSubmitter::roll_segment_ertr::future<>
-SegmentedJournal::RecordSubmitter::roll_segment()
-{
- LOG_PREFIX(RecordSubmitter::roll_segment);
- assert(is_available());
- // #1 block concurrent submissions due to rolling
- wait_available_promise = seastar::shared_promise<>();
- assert(!wait_unfull_flush_promise.has_value());
- return [FNAME, this] {
- if (p_current_batch->is_pending()) {
- if (state == state_t::FULL) {
- DEBUG("{} wait flush ...", get_name());
- wait_unfull_flush_promise = seastar::promise<>();
- return wait_unfull_flush_promise->get_future();
- } else { // IDLE/PENDING
- DEBUG("{} flush", get_name());
- flush_current_batch();
- return seastar::now();
- }
- } else {
- assert(p_current_batch->is_empty());
- return seastar::now();
- }
- }().then_wrapped([FNAME, this](auto fut) {
- if (fut.failed()) {
- ERROR("{} rolling is skipped unexpectedly, available", get_name());
- has_io_error = true;
- wait_available_promise->set_value();
- wait_available_promise.reset();
- return roll_segment_ertr::now();
- } else {
- // start rolling in background
- std::ignore = segment_allocator.roll(
- ).safe_then([FNAME, this] {
- // good
- DEBUG("{} rolling done, available", get_name());
- assert(!has_io_error);
- wait_available_promise->set_value();
- wait_available_promise.reset();
- }).handle_error(
- crimson::ct_error::all_same_way([FNAME, this](auto e) {
- ERROR("{} got error {}, available", get_name(), e);
- has_io_error = true;
- wait_available_promise->set_value();
- wait_available_promise.reset();
- })
- ).handle_exception([FNAME, this](auto e) {
- ERROR("{} got exception {}, available", get_name(), e);
- has_io_error = true;
- wait_available_promise->set_value();
- wait_available_promise.reset();
- });
- // wait for background rolling
- return wait_available();
- }
- });
-}
-
-SegmentedJournal::RecordSubmitter::submit_ret
-SegmentedJournal::RecordSubmitter::submit(record_t&& record)
-{
- LOG_PREFIX(RecordSubmitter::submit);
- assert(is_available());
- assert(check_action(record.size) != action_t::ROLL);
- auto eval = p_current_batch->evaluate_submit(
- record.size, segment_allocator.get_block_size());
- bool needs_flush = (
- state == state_t::IDLE ||
- eval.submit_size.get_fullness() > preferred_fullness ||
- // RecordBatch::needs_flush()
- eval.is_full ||
- p_current_batch->get_num_records() + 1 >=
- p_current_batch->get_batch_capacity());
- if (p_current_batch->is_empty() &&
- needs_flush &&
- state != state_t::FULL) {
- // fast path with direct write
- increment_io();
- auto [to_write, sizes] = p_current_batch->submit_pending_fast(
- std::move(record),
- segment_allocator.get_block_size(),
- committed_to,
- segment_allocator.get_nonce());
- DEBUG("{} fast submit {}, committed_to={}, outstanding_io={} ...",
- get_name(), sizes, committed_to, num_outstanding_io);
- account_submission(1, sizes);
- return segment_allocator.write(to_write
- ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
- return record_locator_t{
- write_result.start_seq.offset.add_offset(mdlength),
- write_result
- };
- }).finally([this] {
- decrement_io_with_flush();
- });
- }
- // indirect batched write
- auto write_fut = p_current_batch->add_pending(
- get_name(),
- std::move(record),
- segment_allocator.get_block_size());
- if (needs_flush) {
- if (state == state_t::FULL) {
- // #2 block concurrent submissions due to lack of resource
- DEBUG("{} added with {} pending, outstanding_io={}, unavailable, wait flush ...",
- get_name(),
- p_current_batch->get_num_records(),
- num_outstanding_io);
- wait_available_promise = seastar::shared_promise<>();
- assert(!wait_unfull_flush_promise.has_value());
- wait_unfull_flush_promise = seastar::promise<>();
- // flush and mark available in background
- std::ignore = wait_unfull_flush_promise->get_future(
- ).finally([FNAME, this] {
- DEBUG("{} flush done, available", get_name());
- wait_available_promise->set_value();
- wait_available_promise.reset();
- });
- } else {
- DEBUG("{} added pending, flush", get_name());
- flush_current_batch();
- }
- } else {
- // will flush later
- DEBUG("{} added with {} pending, outstanding_io={}",
- get_name(),
- p_current_batch->get_num_records(),
- num_outstanding_io);
- assert(!p_current_batch->needs_flush());
- }
- return write_fut;
-}
-
-void SegmentedJournal::RecordSubmitter::update_state()
-{
- if (num_outstanding_io == 0) {
- state = state_t::IDLE;
- } else if (num_outstanding_io < io_depth_limit) {
- state = state_t::PENDING;
- } else if (num_outstanding_io == io_depth_limit) {
- state = state_t::FULL;
- } else {
- ceph_abort("fatal error: io-depth overflow");
- }
-}
-
-void SegmentedJournal::RecordSubmitter::decrement_io_with_flush()
-{
- LOG_PREFIX(RecordSubmitter::decrement_io_with_flush);
- assert(num_outstanding_io > 0);
- auto prv_state = state;
- --num_outstanding_io;
- update_state();
-
- if (prv_state == state_t::FULL) {
- if (wait_unfull_flush_promise.has_value()) {
- DEBUG("{} flush, resolve wait_unfull_flush_promise", get_name());
- assert(!p_current_batch->is_empty());
- assert(wait_available_promise.has_value());
- flush_current_batch();
- wait_unfull_flush_promise->set_value();
- wait_unfull_flush_promise.reset();
- return;
- }
- } else {
- assert(!wait_unfull_flush_promise.has_value());
- }
-
- auto needs_flush = (
- !p_current_batch->is_empty() && (
- state == state_t::IDLE ||
- p_current_batch->get_submit_size().get_fullness() > preferred_fullness ||
- p_current_batch->needs_flush()
- ));
- if (needs_flush) {
- DEBUG("{} flush", get_name());
- flush_current_batch();
- }
-}
-
-void SegmentedJournal::RecordSubmitter::account_submission(
- std::size_t num,
- const record_group_size_t& size)
-{
- stats.record_group_padding_bytes +=
- (size.get_mdlength() - size.get_raw_mdlength());
- stats.record_group_metadata_bytes += size.get_raw_mdlength();
- stats.record_group_data_bytes += size.dlength;
-}
-
-void SegmentedJournal::RecordSubmitter::finish_submit_batch(
- RecordBatch* p_batch,
- maybe_result_t maybe_result)
-{
- assert(p_batch->is_submitting());
- p_batch->set_result(maybe_result);
- free_batch_ptrs.push_back(p_batch);
- decrement_io_with_flush();
-}
-
-void SegmentedJournal::RecordSubmitter::flush_current_batch()
-{
- LOG_PREFIX(RecordSubmitter::flush_current_batch);
- RecordBatch* p_batch = p_current_batch;
- assert(p_batch->is_pending());
- p_current_batch = nullptr;
- pop_free_batch();
-
- increment_io();
- auto num = p_batch->get_num_records();
- auto [to_write, sizes] = p_batch->encode_batch(
- committed_to, segment_allocator.get_nonce());
- DEBUG("{} {} records, {}, committed_to={}, outstanding_io={} ...",
- get_name(), num, sizes, committed_to, num_outstanding_io);
- account_submission(num, sizes);
- std::ignore = segment_allocator.write(to_write
- ).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) {
- TRACE("{} {} records, {}, write done with {}",
- get_name(), num, sizes, write_result);
- finish_submit_batch(p_batch, write_result);
- }).handle_error(
- crimson::ct_error::all_same_way([this, p_batch, FNAME, num, sizes=sizes](auto e) {
- ERROR("{} {} records, {}, got error {}",
- get_name(), num, sizes, e);
- finish_submit_batch(p_batch, std::nullopt);
- })
- ).handle_exception([this, p_batch, FNAME, num, sizes=sizes](auto e) {
- ERROR("{} {} records, {}, got exception {}",
- get_name(), num, sizes, e);
- finish_submit_batch(p_batch, std::nullopt);
- });
-}
-
}
#pragma once
-#include <boost/intrusive_ptr.hpp>
-#include <optional>
-
-#include <seastar/core/circular_buffer.hh>
#include <seastar/core/future.hh>
#include <seastar/core/metrics.hh>
-#include <seastar/core/shared_future.hh>
#include "include/ceph_assert.h"
#include "include/buffer.h"
OrderingHandle &handle
);
- class RecordBatch {
- enum class state_t {
- EMPTY = 0,
- PENDING,
- SUBMITTING
- };
-
- public:
- RecordBatch() = default;
- RecordBatch(RecordBatch&&) = delete;
- RecordBatch(const RecordBatch&) = delete;
- RecordBatch& operator=(RecordBatch&&) = delete;
- RecordBatch& operator=(const RecordBatch&) = delete;
-
- bool is_empty() const {
- return state == state_t::EMPTY;
- }
-
- bool is_pending() const {
- return state == state_t::PENDING;
- }
-
- bool is_submitting() const {
- return state == state_t::SUBMITTING;
- }
-
- std::size_t get_index() const {
- return index;
- }
-
- std::size_t get_num_records() const {
- return pending.get_size();
- }
-
- std::size_t get_batch_capacity() const {
- return batch_capacity;
- }
-
- const record_group_size_t& get_submit_size() const {
- assert(state != state_t::EMPTY);
- return pending.size;
- }
-
- bool needs_flush() const {
- assert(state != state_t::SUBMITTING);
- assert(pending.get_size() <= batch_capacity);
- if (state == state_t::EMPTY) {
- return false;
- } else {
- assert(state == state_t::PENDING);
- return (pending.get_size() >= batch_capacity ||
- pending.size.get_encoded_length() > batch_flush_size);
- }
- }
-
- struct evaluation_t {
- record_group_size_t submit_size;
- bool is_full;
- };
- evaluation_t evaluate_submit(
- const record_size_t& rsize,
- extent_len_t block_size) const {
- assert(!needs_flush());
- auto submit_size = pending.size.get_encoded_length_after(
- rsize, block_size);
- bool is_full = submit_size.get_encoded_length() > batch_flush_size;
- return {submit_size, is_full};
- }
-
- void initialize(std::size_t i,
- std::size_t _batch_capacity,
- std::size_t _batch_flush_size) {
- ceph_assert(_batch_capacity > 0);
- index = i;
- batch_capacity = _batch_capacity;
- batch_flush_size = _batch_flush_size;
- pending.reserve(batch_capacity);
- }
-
- // Add to the batch, the future will be resolved after the batch is
- // written.
- //
- // Set write_result_t::write_length to 0 if the record is not the first one
- // in the batch.
- using add_pending_ertr = SegmentAllocator::write_ertr;
- using add_pending_ret = add_pending_ertr::future<record_locator_t>;
- add_pending_ret add_pending(
- const std::string& name,
- record_t&&,
- extent_len_t block_size);
-
- // Encode the batched records for write.
- std::pair<ceph::bufferlist, record_group_size_t> encode_batch(
- const journal_seq_t& committed_to,
- segment_nonce_t segment_nonce);
-
- // Set the write result and reset for reuse
- using maybe_result_t = std::optional<write_result_t>;
- void set_result(maybe_result_t maybe_write_end_seq);
-
- // The fast path that is equivalent to submit a single record as a batch.
- //
- // Essentially, equivalent to the combined logic of:
- // add_pending(), encode_batch() and set_result() above without
- // the intervention of the shared io_promise.
- //
- // Note the current RecordBatch can be reused afterwards.
- std::pair<ceph::bufferlist, record_group_size_t> submit_pending_fast(
- record_t&&,
- extent_len_t block_size,
- const journal_seq_t& committed_to,
- segment_nonce_t segment_nonce);
-
- private:
- record_group_size_t get_encoded_length_after(
- const record_t& record,
- extent_len_t block_size) const {
- return pending.size.get_encoded_length_after(
- record.size, block_size);
- }
-
- state_t state = state_t::EMPTY;
- std::size_t index = 0;
- std::size_t batch_capacity = 0;
- std::size_t batch_flush_size = 0;
-
- record_group_t pending;
- std::size_t submitting_size = 0;
- seastore_off_t submitting_length = 0;
- seastore_off_t submitting_mdlength = 0;
-
- struct promise_result_t {
- write_result_t write_result;
- seastore_off_t mdlength;
- };
- using maybe_promise_result_t = std::optional<promise_result_t>;
- std::optional<seastar::shared_promise<maybe_promise_result_t> > io_promise;
- };
-
- class RecordSubmitter {
- enum class state_t {
- IDLE = 0, // outstanding_io == 0
- PENDING, // outstanding_io < io_depth_limit
- FULL // outstanding_io == io_depth_limit
- // OVERFLOW: outstanding_io > io_depth_limit is impossible
- };
-
- struct grouped_io_stats {
- uint64_t num_io = 0;
- uint64_t num_io_grouped = 0;
-
- void increment(uint64_t num_grouped_io) {
- ++num_io;
- num_io_grouped += num_grouped_io;
- }
- };
-
- using base_ertr = crimson::errorator<
- crimson::ct_error::input_output_error>;
-
- public:
- RecordSubmitter(std::size_t io_depth,
- std::size_t batch_capacity,
- std::size_t batch_flush_size,
- double preferred_fullness,
- SegmentAllocator&);
-
- const std::string& get_name() const {
- return segment_allocator.get_name();
- }
-
- grouped_io_stats get_record_batch_stats() const {
- return stats.record_batch_stats;
- }
-
- grouped_io_stats get_io_depth_stats() const {
- return stats.io_depth_stats;
- }
-
- uint64_t get_record_group_padding_bytes() const {
- return stats.record_group_padding_bytes;
- }
-
- uint64_t get_record_group_metadata_bytes() const {
- return stats.record_group_metadata_bytes;
- }
-
- uint64_t get_record_group_data_bytes() const {
- return stats.record_group_data_bytes;
- }
-
- journal_seq_t get_committed_to() const {
- return committed_to;
- }
-
- void reset_stats() {
- stats = {};
- }
-
- // whether is available to submit a record
- bool is_available() const;
-
- // wait for available if cannot submit, should check is_available() again
- // when the future is resolved.
- using wa_ertr = base_ertr;
- wa_ertr::future<> wait_available();
-
- // when available, check for the submit action
- // according to the pending record size
- enum class action_t {
- ROLL,
- SUBMIT_FULL,
- SUBMIT_NOT_FULL
- };
- action_t check_action(const record_size_t&) const;
-
- // when available, roll the segment if needed
- using roll_segment_ertr = base_ertr;
- roll_segment_ertr::future<> roll_segment();
-
- // when available, submit the record if possible
- using submit_ertr = base_ertr;
- using submit_ret = submit_ertr::future<record_locator_t>;
- submit_ret submit(record_t&&);
-
- void update_committed_to(const journal_seq_t& new_committed_to) {
- assert(new_committed_to != JOURNAL_SEQ_NULL);
- assert(committed_to == JOURNAL_SEQ_NULL ||
- committed_to <= new_committed_to);
- committed_to = new_committed_to;
- }
-
- private:
- void update_state();
-
- void increment_io() {
- ++num_outstanding_io;
- stats.io_depth_stats.increment(num_outstanding_io);
- update_state();
- }
-
- void decrement_io_with_flush();
-
- void pop_free_batch() {
- assert(p_current_batch == nullptr);
- assert(!free_batch_ptrs.empty());
- p_current_batch = free_batch_ptrs.front();
- assert(p_current_batch->is_empty());
- assert(p_current_batch == &batches[p_current_batch->get_index()]);
- free_batch_ptrs.pop_front();
- }
-
- void account_submission(std::size_t, const record_group_size_t&);
-
- using maybe_result_t = RecordBatch::maybe_result_t;
- void finish_submit_batch(RecordBatch*, maybe_result_t);
-
- void flush_current_batch();
-
- state_t state = state_t::IDLE;
- std::size_t num_outstanding_io = 0;
- std::size_t io_depth_limit;
- double preferred_fullness;
-
- SegmentAllocator& segment_allocator;
- // committed_to may be in a previous journal segment
- journal_seq_t committed_to = JOURNAL_SEQ_NULL;
-
- std::unique_ptr<RecordBatch[]> batches;
- // should not be nullptr after constructed
- RecordBatch* p_current_batch = nullptr;
- seastar::circular_buffer<RecordBatch*> free_batch_ptrs;
-
- // blocked for rolling or lack of resource
- std::optional<seastar::shared_promise<> > wait_available_promise;
- bool has_io_error = false;
- // when needs flush but io depth is full,
- // wait for decrement_io_with_flush()
- std::optional<seastar::promise<> > wait_unfull_flush_promise;
-
- struct {
- grouped_io_stats record_batch_stats;
- grouped_io_stats io_depth_stats;
- uint64_t record_group_padding_bytes = 0;
- uint64_t record_group_metadata_bytes = 0;
- uint64_t record_group_data_bytes = 0;
- } stats;
- };
-
SegmentProvider& segment_provider;
SegmentAllocator journal_segment_allocator;
RecordSubmitter record_submitter;