From c54bbe3e0f604d48b2740389e091eff854ceb87f Mon Sep 17 00:00:00 2001
From: Yingxin Cheng <yingxin.cheng@intel.com>
Date: Wed, 9 Mar 2022 22:57:26 +0800
Subject: [PATCH] crimson/os/seastore: move RecordSubmitter to
 segment_allocator.h

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
---
 .../os/seastore/journal/segment_allocator.cc | 427 ++++++++++++++++++
 .../os/seastore/journal/segment_allocator.h  | 310 +++++++++++++
 .../os/seastore/journal/segmented_journal.cc | 427 ------------------
 .../os/seastore/journal/segmented_journal.h  | 294 ------------
 4 files changed, 737 insertions(+), 721 deletions(-)

diff --git a/src/crimson/os/seastore/journal/segment_allocator.cc b/src/crimson/os/seastore/journal/segment_allocator.cc
index a8146b4e462..01ac3e01c93 100644
--- a/src/crimson/os/seastore/journal/segment_allocator.cc
+++ b/src/crimson/os/seastore/journal/segment_allocator.cc
@@ -260,4 +260,431 @@ SegmentAllocator::close_segment(bool is_rolling)
   );
 }
 
+RecordBatch::add_pending_ret
+RecordBatch::add_pending(
+  const std::string& name,
+  record_t&& record,
+  extent_len_t block_size)
+{
+  LOG_PREFIX(RecordBatch::add_pending);
+  auto new_size = get_encoded_length_after(record, block_size);
+  auto dlength_offset = pending.size.dlength;
+  TRACE("{} batches={}, write_size={}, dlength_offset={} ...",
+        name,
+        pending.get_size() + 1,
+        new_size.get_encoded_length(),
+        dlength_offset);
+  assert(state != state_t::SUBMITTING);
+  assert(evaluate_submit(record.size, block_size).submit_size == new_size);
+
+  pending.push_back(
+      std::move(record), block_size);
+  assert(pending.size == new_size);
+  if (state == state_t::EMPTY) {
+    assert(!io_promise.has_value());
+    io_promise = seastar::shared_promise<maybe_promise_result_t>();
+  } else {
+    assert(io_promise.has_value());
+  }
+  state = state_t::PENDING;
+
+  return io_promise->get_shared_future(
+  ).then([dlength_offset, FNAME, &name
+         ](auto maybe_promise_result) -> add_pending_ret {
+    if (!maybe_promise_result.has_value()) {
+      ERROR("{} write failed", name);
+      return crimson::ct_error::input_output_error::make();
+    }
+    auto write_result = maybe_promise_result->write_result;
+    auto submit_result = record_locator_t{
+      write_result.start_seq.offset.add_offset(
+          maybe_promise_result->mdlength + dlength_offset),
+      write_result
+    };
+    TRACE("{} write finish with {}", name, submit_result);
+    return add_pending_ret(
+      add_pending_ertr::ready_future_marker{},
+      submit_result);
+  });
+}
+
+std::pair<ceph::bufferlist, record_group_size_t>
+RecordBatch::encode_batch(
+  const journal_seq_t& committed_to,
+  segment_nonce_t segment_nonce)
+{
+  assert(state == state_t::PENDING);
+  assert(pending.get_size() > 0);
+  assert(io_promise.has_value());
+
+  state = state_t::SUBMITTING;
+  submitting_size = pending.get_size();
+  auto gsize = pending.size;
+  submitting_length = gsize.get_encoded_length();
+  submitting_mdlength = gsize.get_mdlength();
+  auto bl = encode_records(pending, committed_to, segment_nonce);
+  // Note: pending is cleared here
+  assert(bl.length() == (std::size_t)submitting_length);
+  return std::make_pair(bl, gsize);
+}
+
+void RecordBatch::set_result(
+  maybe_result_t maybe_write_result)
+{
+  maybe_promise_result_t result;
+  if (maybe_write_result.has_value()) {
+    assert(maybe_write_result->length == submitting_length);
+    result = promise_result_t{
+      *maybe_write_result,
+      submitting_mdlength
+    };
+  }
+  assert(state == state_t::SUBMITTING);
+  assert(io_promise.has_value());
+
+  state = state_t::EMPTY;
+  submitting_size = 0;
+  submitting_length = 0;
+  submitting_mdlength = 0;
+  io_promise->set_value(result);
+  io_promise.reset();
+}
+
+std::pair<ceph::bufferlist, record_group_size_t>
+RecordBatch::submit_pending_fast( + record_t&& record, + extent_len_t block_size, + const journal_seq_t& committed_to, + segment_nonce_t segment_nonce) +{ + auto new_size = get_encoded_length_after(record, block_size); + std::ignore = new_size; + assert(state == state_t::EMPTY); + assert(evaluate_submit(record.size, block_size).submit_size == new_size); + + auto group = record_group_t(std::move(record), block_size); + auto size = group.size; + assert(size == new_size); + auto bl = encode_records(group, committed_to, segment_nonce); + assert(bl.length() == size.get_encoded_length()); + return std::make_pair(bl, size); +} + +RecordSubmitter::RecordSubmitter( + std::size_t io_depth, + std::size_t batch_capacity, + std::size_t batch_flush_size, + double preferred_fullness, + SegmentAllocator& sa) + : io_depth_limit{io_depth}, + preferred_fullness{preferred_fullness}, + segment_allocator{sa}, + batches(new RecordBatch[io_depth + 1]) +{ + LOG_PREFIX(RecordSubmitter); + INFO("{} io_depth_limit={}, batch_capacity={}, batch_flush_size={}, " + "preferred_fullness={}", + get_name(), io_depth, batch_capacity, + batch_flush_size, preferred_fullness); + ceph_assert(io_depth > 0); + ceph_assert(batch_capacity > 0); + ceph_assert(preferred_fullness >= 0 && + preferred_fullness <= 1); + free_batch_ptrs.reserve(io_depth + 1); + for (std::size_t i = 0; i <= io_depth; ++i) { + batches[i].initialize(i, batch_capacity, batch_flush_size); + free_batch_ptrs.push_back(&batches[i]); + } + pop_free_batch(); +} + +bool RecordSubmitter::is_available() const +{ + auto ret = !wait_available_promise.has_value() && + !has_io_error; +#ifndef NDEBUG + if (ret) { + // invariants when available + ceph_assert(segment_allocator.can_write()); + ceph_assert(p_current_batch != nullptr); + ceph_assert(!p_current_batch->is_submitting()); + ceph_assert(!p_current_batch->needs_flush()); + if (!p_current_batch->is_empty()) { + auto submit_length = + p_current_batch->get_submit_size().get_encoded_length(); + ceph_assert(!segment_allocator.needs_roll(submit_length)); + } + } +#endif + return ret; +} + +RecordSubmitter::wa_ertr::future<> +RecordSubmitter::wait_available() +{ + LOG_PREFIX(RecordSubmitter::wait_available); + assert(!is_available()); + if (has_io_error) { + ERROR("{} I/O is failed before wait", get_name()); + return crimson::ct_error::input_output_error::make(); + } + return wait_available_promise->get_shared_future( + ).then([FNAME, this]() -> wa_ertr::future<> { + if (has_io_error) { + ERROR("{} I/O is failed after wait", get_name()); + return crimson::ct_error::input_output_error::make(); + } + return wa_ertr::now(); + }); +} + +RecordSubmitter::action_t +RecordSubmitter::check_action( + const record_size_t& rsize) const +{ + assert(is_available()); + auto eval = p_current_batch->evaluate_submit( + rsize, segment_allocator.get_block_size()); + if (segment_allocator.needs_roll(eval.submit_size.get_encoded_length())) { + return action_t::ROLL; + } else if (eval.is_full) { + return action_t::SUBMIT_FULL; + } else { + return action_t::SUBMIT_NOT_FULL; + } +} + +RecordSubmitter::roll_segment_ertr::future<> +RecordSubmitter::roll_segment() +{ + LOG_PREFIX(RecordSubmitter::roll_segment); + assert(is_available()); + // #1 block concurrent submissions due to rolling + wait_available_promise = seastar::shared_promise<>(); + assert(!wait_unfull_flush_promise.has_value()); + return [FNAME, this] { + if (p_current_batch->is_pending()) { + if (state == state_t::FULL) { + DEBUG("{} wait flush ...", get_name()); + 
wait_unfull_flush_promise = seastar::promise<>(); + return wait_unfull_flush_promise->get_future(); + } else { // IDLE/PENDING + DEBUG("{} flush", get_name()); + flush_current_batch(); + return seastar::now(); + } + } else { + assert(p_current_batch->is_empty()); + return seastar::now(); + } + }().then_wrapped([FNAME, this](auto fut) { + if (fut.failed()) { + ERROR("{} rolling is skipped unexpectedly, available", get_name()); + has_io_error = true; + wait_available_promise->set_value(); + wait_available_promise.reset(); + return roll_segment_ertr::now(); + } else { + // start rolling in background + std::ignore = segment_allocator.roll( + ).safe_then([FNAME, this] { + // good + DEBUG("{} rolling done, available", get_name()); + assert(!has_io_error); + wait_available_promise->set_value(); + wait_available_promise.reset(); + }).handle_error( + crimson::ct_error::all_same_way([FNAME, this](auto e) { + ERROR("{} got error {}, available", get_name(), e); + has_io_error = true; + wait_available_promise->set_value(); + wait_available_promise.reset(); + }) + ).handle_exception([FNAME, this](auto e) { + ERROR("{} got exception {}, available", get_name(), e); + has_io_error = true; + wait_available_promise->set_value(); + wait_available_promise.reset(); + }); + // wait for background rolling + return wait_available(); + } + }); +} + +RecordSubmitter::submit_ret +RecordSubmitter::submit(record_t&& record) +{ + LOG_PREFIX(RecordSubmitter::submit); + assert(is_available()); + assert(check_action(record.size) != action_t::ROLL); + auto eval = p_current_batch->evaluate_submit( + record.size, segment_allocator.get_block_size()); + bool needs_flush = ( + state == state_t::IDLE || + eval.submit_size.get_fullness() > preferred_fullness || + // RecordBatch::needs_flush() + eval.is_full || + p_current_batch->get_num_records() + 1 >= + p_current_batch->get_batch_capacity()); + if (p_current_batch->is_empty() && + needs_flush && + state != state_t::FULL) { + // fast path with direct write + increment_io(); + auto [to_write, sizes] = p_current_batch->submit_pending_fast( + std::move(record), + segment_allocator.get_block_size(), + committed_to, + segment_allocator.get_nonce()); + DEBUG("{} fast submit {}, committed_to={}, outstanding_io={} ...", + get_name(), sizes, committed_to, num_outstanding_io); + account_submission(1, sizes); + return segment_allocator.write(to_write + ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) { + return record_locator_t{ + write_result.start_seq.offset.add_offset(mdlength), + write_result + }; + }).finally([this] { + decrement_io_with_flush(); + }); + } + // indirect batched write + auto write_fut = p_current_batch->add_pending( + get_name(), + std::move(record), + segment_allocator.get_block_size()); + if (needs_flush) { + if (state == state_t::FULL) { + // #2 block concurrent submissions due to lack of resource + DEBUG("{} added with {} pending, outstanding_io={}, unavailable, wait flush ...", + get_name(), + p_current_batch->get_num_records(), + num_outstanding_io); + wait_available_promise = seastar::shared_promise<>(); + assert(!wait_unfull_flush_promise.has_value()); + wait_unfull_flush_promise = seastar::promise<>(); + // flush and mark available in background + std::ignore = wait_unfull_flush_promise->get_future( + ).finally([FNAME, this] { + DEBUG("{} flush done, available", get_name()); + wait_available_promise->set_value(); + wait_available_promise.reset(); + }); + } else { + DEBUG("{} added pending, flush", get_name()); + flush_current_batch(); + } + } 
else { + // will flush later + DEBUG("{} added with {} pending, outstanding_io={}", + get_name(), + p_current_batch->get_num_records(), + num_outstanding_io); + assert(!p_current_batch->needs_flush()); + } + return write_fut; +} + +void RecordSubmitter::update_state() +{ + if (num_outstanding_io == 0) { + state = state_t::IDLE; + } else if (num_outstanding_io < io_depth_limit) { + state = state_t::PENDING; + } else if (num_outstanding_io == io_depth_limit) { + state = state_t::FULL; + } else { + ceph_abort("fatal error: io-depth overflow"); + } +} + +void RecordSubmitter::decrement_io_with_flush() +{ + LOG_PREFIX(RecordSubmitter::decrement_io_with_flush); + assert(num_outstanding_io > 0); + auto prv_state = state; + --num_outstanding_io; + update_state(); + + if (prv_state == state_t::FULL) { + if (wait_unfull_flush_promise.has_value()) { + DEBUG("{} flush, resolve wait_unfull_flush_promise", get_name()); + assert(!p_current_batch->is_empty()); + assert(wait_available_promise.has_value()); + flush_current_batch(); + wait_unfull_flush_promise->set_value(); + wait_unfull_flush_promise.reset(); + return; + } + } else { + assert(!wait_unfull_flush_promise.has_value()); + } + + auto needs_flush = ( + !p_current_batch->is_empty() && ( + state == state_t::IDLE || + p_current_batch->get_submit_size().get_fullness() > preferred_fullness || + p_current_batch->needs_flush() + )); + if (needs_flush) { + DEBUG("{} flush", get_name()); + flush_current_batch(); + } +} + +void RecordSubmitter::account_submission( + std::size_t num, + const record_group_size_t& size) +{ + stats.record_group_padding_bytes += + (size.get_mdlength() - size.get_raw_mdlength()); + stats.record_group_metadata_bytes += size.get_raw_mdlength(); + stats.record_group_data_bytes += size.dlength; +} + +void RecordSubmitter::finish_submit_batch( + RecordBatch* p_batch, + maybe_result_t maybe_result) +{ + assert(p_batch->is_submitting()); + p_batch->set_result(maybe_result); + free_batch_ptrs.push_back(p_batch); + decrement_io_with_flush(); +} + +void RecordSubmitter::flush_current_batch() +{ + LOG_PREFIX(RecordSubmitter::flush_current_batch); + RecordBatch* p_batch = p_current_batch; + assert(p_batch->is_pending()); + p_current_batch = nullptr; + pop_free_batch(); + + increment_io(); + auto num = p_batch->get_num_records(); + auto [to_write, sizes] = p_batch->encode_batch( + committed_to, segment_allocator.get_nonce()); + DEBUG("{} {} records, {}, committed_to={}, outstanding_io={} ...", + get_name(), num, sizes, committed_to, num_outstanding_io); + account_submission(num, sizes); + std::ignore = segment_allocator.write(to_write + ).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) { + TRACE("{} {} records, {}, write done with {}", + get_name(), num, sizes, write_result); + finish_submit_batch(p_batch, write_result); + }).handle_error( + crimson::ct_error::all_same_way([this, p_batch, FNAME, num, sizes=sizes](auto e) { + ERROR("{} {} records, {}, got error {}", + get_name(), num, sizes, e); + finish_submit_batch(p_batch, std::nullopt); + }) + ).handle_exception([this, p_batch, FNAME, num, sizes=sizes](auto e) { + ERROR("{} {} records, {}, got exception {}", + get_name(), num, sizes, e); + finish_submit_batch(p_batch, std::nullopt); + }); +} + } diff --git a/src/crimson/os/seastore/journal/segment_allocator.h b/src/crimson/os/seastore/journal/segment_allocator.h index 98266f3fd09..a09c1667b56 100644 --- a/src/crimson/os/seastore/journal/segment_allocator.h +++ b/src/crimson/os/seastore/journal/segment_allocator.h @@ 
-3,6 +3,10 @@
 
 #pragma once
 
+#include <optional>
+#include <seastar/core/circular_buffer.hh>
+#include <seastar/core/shared_future.hh>
+
 #include "include/buffer.h"
 
 #include "crimson/common/errorator.h"
@@ -138,4 +142,310 @@ class SegmentAllocator {
   seastore_off_t written_to;
 };
 
+/**
+ * RecordBatch
+ *
+ * Maintain a batch of records for submit.
+ */
+class RecordBatch {
+  enum class state_t {
+    EMPTY = 0,
+    PENDING,
+    SUBMITTING
+  };
+
+public:
+  RecordBatch() = default;
+  RecordBatch(RecordBatch&&) = delete;
+  RecordBatch(const RecordBatch&) = delete;
+  RecordBatch& operator=(RecordBatch&&) = delete;
+  RecordBatch& operator=(const RecordBatch&) = delete;
+
+  bool is_empty() const {
+    return state == state_t::EMPTY;
+  }
+
+  bool is_pending() const {
+    return state == state_t::PENDING;
+  }
+
+  bool is_submitting() const {
+    return state == state_t::SUBMITTING;
+  }
+
+  std::size_t get_index() const {
+    return index;
+  }
+
+  std::size_t get_num_records() const {
+    return pending.get_size();
+  }
+
+  std::size_t get_batch_capacity() const {
+    return batch_capacity;
+  }
+
+  const record_group_size_t& get_submit_size() const {
+    assert(state != state_t::EMPTY);
+    return pending.size;
+  }
+
+  bool needs_flush() const {
+    assert(state != state_t::SUBMITTING);
+    assert(pending.get_size() <= batch_capacity);
+    if (state == state_t::EMPTY) {
+      return false;
+    } else {
+      assert(state == state_t::PENDING);
+      return (pending.get_size() >= batch_capacity ||
+              pending.size.get_encoded_length() > batch_flush_size);
+    }
+  }
+
+  struct evaluation_t {
+    record_group_size_t submit_size;
+    bool is_full;
+  };
+  evaluation_t evaluate_submit(
+      const record_size_t& rsize,
+      extent_len_t block_size) const {
+    assert(!needs_flush());
+    auto submit_size = pending.size.get_encoded_length_after(
+        rsize, block_size);
+    bool is_full = submit_size.get_encoded_length() > batch_flush_size;
+    return {submit_size, is_full};
+  }
+
+  void initialize(std::size_t i,
+                  std::size_t _batch_capacity,
+                  std::size_t _batch_flush_size) {
+    ceph_assert(_batch_capacity > 0);
+    index = i;
+    batch_capacity = _batch_capacity;
+    batch_flush_size = _batch_flush_size;
+    pending.reserve(batch_capacity);
+  }
+
+  // Add to the batch, the future will be resolved after the batch is
+  // written.
+  //
+  // Set write_result_t::write_length to 0 if the record is not the first one
+  // in the batch.
+  using add_pending_ertr = SegmentAllocator::write_ertr;
+  using add_pending_ret = add_pending_ertr::future<record_locator_t>;
+  add_pending_ret add_pending(
+      const std::string& name,
+      record_t&&,
+      extent_len_t block_size);
+
+  // Encode the batched records for write.
+  std::pair<ceph::bufferlist, record_group_size_t> encode_batch(
+      const journal_seq_t& committed_to,
+      segment_nonce_t segment_nonce);
+
+  // Set the write result and reset for reuse
+  using maybe_result_t = std::optional<write_result_t>;
+  void set_result(maybe_result_t maybe_write_end_seq);
+
+  // The fast path that is equivalent to submit a single record as a batch.
+  //
+  // Essentially, equivalent to the combined logic of:
+  // add_pending(), encode_batch() and set_result() above without
+  // the intervention of the shared io_promise.
+  //
+  // Note the current RecordBatch can be reused afterwards.
+  std::pair<ceph::bufferlist, record_group_size_t> submit_pending_fast(
+      record_t&&,
+      extent_len_t block_size,
+      const journal_seq_t& committed_to,
+      segment_nonce_t segment_nonce);
+
+private:
+  record_group_size_t get_encoded_length_after(
+      const record_t& record,
+      extent_len_t block_size) const {
+    return pending.size.get_encoded_length_after(
+        record.size, block_size);
+  }
+
+  state_t state = state_t::EMPTY;
+  std::size_t index = 0;
+  std::size_t batch_capacity = 0;
+  std::size_t batch_flush_size = 0;
+
+  record_group_t pending;
+  std::size_t submitting_size = 0;
+  seastore_off_t submitting_length = 0;
+  seastore_off_t submitting_mdlength = 0;
+
+  struct promise_result_t {
+    write_result_t write_result;
+    seastore_off_t mdlength;
+  };
+  using maybe_promise_result_t = std::optional<promise_result_t>;
+  std::optional<seastar::shared_promise<maybe_promise_result_t>> io_promise;
+};
+
+/**
+ * RecordSubmitter
+ *
+ * Submit records concurrently with RecordBatch with SegmentAllocator.
+ *
+ * Configurations and controls:
+ * - io_depth: the io-depth limit to SegmentAllocator;
+ * - batch_capacity: the number limit of records in a RecordBatch;
+ * - batch_flush_size: the bytes threshold to force flush a RecordBatch to
+ *   control the maximum latency;
+ * - preferred_fullness: the fullness threshold to flush a RecordBatch;
+ */
+class RecordSubmitter {
+  enum class state_t {
+    IDLE = 0, // outstanding_io == 0
+    PENDING,  // outstanding_io <  io_depth_limit
+    FULL      // outstanding_io == io_depth_limit
+    // OVERFLOW: outstanding_io > io_depth_limit is impossible
+  };
+
+  struct grouped_io_stats {
+    uint64_t num_io = 0;
+    uint64_t num_io_grouped = 0;
+
+    void increment(uint64_t num_grouped_io) {
+      ++num_io;
+      num_io_grouped += num_grouped_io;
+    }
+  };
+
+  using base_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error>;
+
+public:
+  RecordSubmitter(std::size_t io_depth,
+                  std::size_t batch_capacity,
+                  std::size_t batch_flush_size,
+                  double preferred_fullness,
+                  SegmentAllocator&);
+
+  const std::string& get_name() const {
+    return segment_allocator.get_name();
+  }
+
+  grouped_io_stats get_record_batch_stats() const {
+    return stats.record_batch_stats;
+  }
+
+  grouped_io_stats get_io_depth_stats() const {
+    return stats.io_depth_stats;
+  }
+
+  uint64_t get_record_group_padding_bytes() const {
+    return stats.record_group_padding_bytes;
+  }
+
+  uint64_t get_record_group_metadata_bytes() const {
+    return stats.record_group_metadata_bytes;
+  }
+
+  uint64_t get_record_group_data_bytes() const {
+    return stats.record_group_data_bytes;
+  }
+
+  journal_seq_t get_committed_to() const {
+    return committed_to;
+  }
+
+  void reset_stats() {
+    stats = {};
+  }
+
+  // whether is available to submit a record
+  bool is_available() const;
+
+  // wait for available if cannot submit, should check is_available() again
+  // when the future is resolved.
+  using wa_ertr = base_ertr;
+  wa_ertr::future<> wait_available();
+
+  // when available, check for the submit action
+  // according to the pending record size
+  enum class action_t {
+    ROLL,
+    SUBMIT_FULL,
+    SUBMIT_NOT_FULL
+  };
+  action_t check_action(const record_size_t&) const;
+
+  // when available, roll the segment if needed
+  using roll_segment_ertr = base_ertr;
+  roll_segment_ertr::future<> roll_segment();
+
+  // when available, submit the record if possible
+  using submit_ertr = base_ertr;
+  using submit_ret = submit_ertr::future<record_locator_t>;
+  submit_ret submit(record_t&&);
+
+  void update_committed_to(const journal_seq_t& new_committed_to) {
+    assert(new_committed_to != JOURNAL_SEQ_NULL);
+    assert(committed_to == JOURNAL_SEQ_NULL ||
+           committed_to <= new_committed_to);
+    committed_to = new_committed_to;
+  }
+
+private:
+  void update_state();
+
+  void increment_io() {
+    ++num_outstanding_io;
+    stats.io_depth_stats.increment(num_outstanding_io);
+    update_state();
+  }
+
+  void decrement_io_with_flush();
+
+  void pop_free_batch() {
+    assert(p_current_batch == nullptr);
+    assert(!free_batch_ptrs.empty());
+    p_current_batch = free_batch_ptrs.front();
+    assert(p_current_batch->is_empty());
+    assert(p_current_batch == &batches[p_current_batch->get_index()]);
+    free_batch_ptrs.pop_front();
+  }
+
+  void account_submission(std::size_t, const record_group_size_t&);
+
+  using maybe_result_t = RecordBatch::maybe_result_t;
+  void finish_submit_batch(RecordBatch*, maybe_result_t);
+
+  void flush_current_batch();
+
+  state_t state = state_t::IDLE;
+  std::size_t num_outstanding_io = 0;
+  std::size_t io_depth_limit;
+  double preferred_fullness;
+
+  SegmentAllocator& segment_allocator;
+  // committed_to may be in a previous journal segment
+  journal_seq_t committed_to = JOURNAL_SEQ_NULL;
+
+  std::unique_ptr<RecordBatch[]> batches;
+  // should not be nullptr after constructed
+  RecordBatch* p_current_batch = nullptr;
+  seastar::circular_buffer<RecordBatch*> free_batch_ptrs;
+
+  // blocked for rolling or lack of resource
+  std::optional<seastar::shared_promise<>> wait_available_promise;
+  bool has_io_error = false;
+  // when needs flush but io depth is full,
+  // wait for decrement_io_with_flush()
+  std::optional<seastar::promise<>> wait_unfull_flush_promise;
+
+  struct {
+    grouped_io_stats record_batch_stats;
+    grouped_io_stats io_depth_stats;
+    uint64_t record_group_padding_bytes = 0;
+    uint64_t record_group_metadata_bytes = 0;
+    uint64_t record_group_data_bytes = 0;
+  } stats;
+};
+
 }
diff --git a/src/crimson/os/seastore/journal/segmented_journal.cc b/src/crimson/os/seastore/journal/segmented_journal.cc
index f60cd4fd5fb..175193e8867 100644
--- a/src/crimson/os/seastore/journal/segmented_journal.cc
+++ b/src/crimson/os/seastore/journal/segmented_journal.cc
@@ -442,431 +442,4 @@ void SegmentedJournal::register_metrics()
   );
 }
 
-SegmentedJournal::RecordBatch::add_pending_ret
-SegmentedJournal::RecordBatch::add_pending(
-  const std::string& name,
-  record_t&& record,
-  extent_len_t block_size)
-{
-  LOG_PREFIX(RecordBatch::add_pending);
-  auto new_size = get_encoded_length_after(record, block_size);
-  auto dlength_offset = pending.size.dlength;
-  TRACE("{} batches={}, write_size={}, dlength_offset={} ...",
-        name,
-        pending.get_size() + 1,
-        new_size.get_encoded_length(),
-        dlength_offset);
-  assert(state != state_t::SUBMITTING);
-  assert(evaluate_submit(record.size, block_size).submit_size == new_size);
-
-  pending.push_back(
-      std::move(record), block_size);
-  assert(pending.size == new_size);
-  if (state == state_t::EMPTY) {
-    assert(!io_promise.has_value());
-    io_promise = seastar::shared_promise<maybe_promise_result_t>();
-  } else {
-    assert(io_promise.has_value());
-  }
-  state = state_t::PENDING;
-
-  return io_promise->get_shared_future(
-  ).then([dlength_offset, FNAME, &name
-         ](auto maybe_promise_result) -> add_pending_ret {
-    if (!maybe_promise_result.has_value()) {
-      ERROR("{} write failed", name);
-      return crimson::ct_error::input_output_error::make();
-    }
-    auto write_result = maybe_promise_result->write_result;
-    auto submit_result = record_locator_t{
-      write_result.start_seq.offset.add_offset(
-          maybe_promise_result->mdlength + dlength_offset),
-      write_result
-    };
-    TRACE("{} write finish with {}", name, submit_result);
-    return add_pending_ret(
-      add_pending_ertr::ready_future_marker{},
-      submit_result);
-  });
-}
-
-std::pair<ceph::bufferlist, record_group_size_t>
-SegmentedJournal::RecordBatch::encode_batch(
-  const journal_seq_t& committed_to,
-  segment_nonce_t segment_nonce)
-{
-  assert(state == state_t::PENDING);
-  assert(pending.get_size() > 0);
-  assert(io_promise.has_value());
-
-  state = state_t::SUBMITTING;
-  submitting_size = pending.get_size();
-  auto gsize = pending.size;
-  submitting_length = gsize.get_encoded_length();
-  submitting_mdlength = gsize.get_mdlength();
-  auto bl = encode_records(pending, committed_to, segment_nonce);
-  // Note: pending is cleared here
-  assert(bl.length() == (std::size_t)submitting_length);
-  return std::make_pair(bl, gsize);
-}
-
-void SegmentedJournal::RecordBatch::set_result(
-  maybe_result_t maybe_write_result)
-{
-  maybe_promise_result_t result;
-  if (maybe_write_result.has_value()) {
-    assert(maybe_write_result->length == submitting_length);
-    result = promise_result_t{
-      *maybe_write_result,
-      submitting_mdlength
-    };
-  }
-  assert(state == state_t::SUBMITTING);
-  assert(io_promise.has_value());
-
-  state = state_t::EMPTY;
-  submitting_size = 0;
-  submitting_length = 0;
-  submitting_mdlength = 0;
-  io_promise->set_value(result);
-  io_promise.reset();
-}
-
-std::pair<ceph::bufferlist, record_group_size_t>
-SegmentedJournal::RecordBatch::submit_pending_fast(
-  record_t&& record,
-  extent_len_t block_size,
-  const journal_seq_t& committed_to,
-  segment_nonce_t segment_nonce)
-{
-  auto new_size = get_encoded_length_after(record, block_size);
-  std::ignore = new_size;
-  assert(state == state_t::EMPTY);
-  assert(evaluate_submit(record.size, block_size).submit_size == new_size);
-
-  auto group = record_group_t(std::move(record), block_size);
-  auto size = group.size;
-  assert(size == new_size);
-  auto bl = encode_records(group, committed_to, segment_nonce);
-  assert(bl.length() == size.get_encoded_length());
-  return std::make_pair(bl, size);
-}
-
-SegmentedJournal::RecordSubmitter::RecordSubmitter(
-  std::size_t io_depth,
-  std::size_t batch_capacity,
-  std::size_t batch_flush_size,
-  double preferred_fullness,
-  SegmentAllocator& sa)
-  : io_depth_limit{io_depth},
-    preferred_fullness{preferred_fullness},
-    segment_allocator{sa},
-    batches(new RecordBatch[io_depth + 1])
-{
-  LOG_PREFIX(RecordSubmitter);
-  INFO("{} io_depth_limit={}, batch_capacity={}, batch_flush_size={}, "
-       "preferred_fullness={}",
-       get_name(), io_depth, batch_capacity,
-       batch_flush_size, preferred_fullness);
-  ceph_assert(io_depth > 0);
-  ceph_assert(batch_capacity > 0);
-  ceph_assert(preferred_fullness >= 0 &&
-              preferred_fullness <= 1);
-  free_batch_ptrs.reserve(io_depth + 1);
-  for (std::size_t i = 0; i <= io_depth; ++i) {
-    batches[i].initialize(i, batch_capacity, batch_flush_size);
-    free_batch_ptrs.push_back(&batches[i]);
-  }
-  pop_free_batch();
-}
-
-bool SegmentedJournal::RecordSubmitter::is_available() const
-{
-  auto ret =
!wait_available_promise.has_value() && - !has_io_error; -#ifndef NDEBUG - if (ret) { - // invariants when available - ceph_assert(segment_allocator.can_write()); - ceph_assert(p_current_batch != nullptr); - ceph_assert(!p_current_batch->is_submitting()); - ceph_assert(!p_current_batch->needs_flush()); - if (!p_current_batch->is_empty()) { - auto submit_length = - p_current_batch->get_submit_size().get_encoded_length(); - ceph_assert(!segment_allocator.needs_roll(submit_length)); - } - } -#endif - return ret; -} - -SegmentedJournal::RecordSubmitter::wa_ertr::future<> -SegmentedJournal::RecordSubmitter::wait_available() -{ - LOG_PREFIX(RecordSubmitter::wait_available); - assert(!is_available()); - if (has_io_error) { - ERROR("{} I/O is failed before wait", get_name()); - return crimson::ct_error::input_output_error::make(); - } - return wait_available_promise->get_shared_future( - ).then([FNAME, this]() -> wa_ertr::future<> { - if (has_io_error) { - ERROR("{} I/O is failed after wait", get_name()); - return crimson::ct_error::input_output_error::make(); - } - return wa_ertr::now(); - }); -} - -SegmentedJournal::RecordSubmitter::action_t -SegmentedJournal::RecordSubmitter::check_action( - const record_size_t& rsize) const -{ - assert(is_available()); - auto eval = p_current_batch->evaluate_submit( - rsize, segment_allocator.get_block_size()); - if (segment_allocator.needs_roll(eval.submit_size.get_encoded_length())) { - return action_t::ROLL; - } else if (eval.is_full) { - return action_t::SUBMIT_FULL; - } else { - return action_t::SUBMIT_NOT_FULL; - } -} - -SegmentedJournal::RecordSubmitter::roll_segment_ertr::future<> -SegmentedJournal::RecordSubmitter::roll_segment() -{ - LOG_PREFIX(RecordSubmitter::roll_segment); - assert(is_available()); - // #1 block concurrent submissions due to rolling - wait_available_promise = seastar::shared_promise<>(); - assert(!wait_unfull_flush_promise.has_value()); - return [FNAME, this] { - if (p_current_batch->is_pending()) { - if (state == state_t::FULL) { - DEBUG("{} wait flush ...", get_name()); - wait_unfull_flush_promise = seastar::promise<>(); - return wait_unfull_flush_promise->get_future(); - } else { // IDLE/PENDING - DEBUG("{} flush", get_name()); - flush_current_batch(); - return seastar::now(); - } - } else { - assert(p_current_batch->is_empty()); - return seastar::now(); - } - }().then_wrapped([FNAME, this](auto fut) { - if (fut.failed()) { - ERROR("{} rolling is skipped unexpectedly, available", get_name()); - has_io_error = true; - wait_available_promise->set_value(); - wait_available_promise.reset(); - return roll_segment_ertr::now(); - } else { - // start rolling in background - std::ignore = segment_allocator.roll( - ).safe_then([FNAME, this] { - // good - DEBUG("{} rolling done, available", get_name()); - assert(!has_io_error); - wait_available_promise->set_value(); - wait_available_promise.reset(); - }).handle_error( - crimson::ct_error::all_same_way([FNAME, this](auto e) { - ERROR("{} got error {}, available", get_name(), e); - has_io_error = true; - wait_available_promise->set_value(); - wait_available_promise.reset(); - }) - ).handle_exception([FNAME, this](auto e) { - ERROR("{} got exception {}, available", get_name(), e); - has_io_error = true; - wait_available_promise->set_value(); - wait_available_promise.reset(); - }); - // wait for background rolling - return wait_available(); - } - }); -} - -SegmentedJournal::RecordSubmitter::submit_ret -SegmentedJournal::RecordSubmitter::submit(record_t&& record) -{ - 
LOG_PREFIX(RecordSubmitter::submit); - assert(is_available()); - assert(check_action(record.size) != action_t::ROLL); - auto eval = p_current_batch->evaluate_submit( - record.size, segment_allocator.get_block_size()); - bool needs_flush = ( - state == state_t::IDLE || - eval.submit_size.get_fullness() > preferred_fullness || - // RecordBatch::needs_flush() - eval.is_full || - p_current_batch->get_num_records() + 1 >= - p_current_batch->get_batch_capacity()); - if (p_current_batch->is_empty() && - needs_flush && - state != state_t::FULL) { - // fast path with direct write - increment_io(); - auto [to_write, sizes] = p_current_batch->submit_pending_fast( - std::move(record), - segment_allocator.get_block_size(), - committed_to, - segment_allocator.get_nonce()); - DEBUG("{} fast submit {}, committed_to={}, outstanding_io={} ...", - get_name(), sizes, committed_to, num_outstanding_io); - account_submission(1, sizes); - return segment_allocator.write(to_write - ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) { - return record_locator_t{ - write_result.start_seq.offset.add_offset(mdlength), - write_result - }; - }).finally([this] { - decrement_io_with_flush(); - }); - } - // indirect batched write - auto write_fut = p_current_batch->add_pending( - get_name(), - std::move(record), - segment_allocator.get_block_size()); - if (needs_flush) { - if (state == state_t::FULL) { - // #2 block concurrent submissions due to lack of resource - DEBUG("{} added with {} pending, outstanding_io={}, unavailable, wait flush ...", - get_name(), - p_current_batch->get_num_records(), - num_outstanding_io); - wait_available_promise = seastar::shared_promise<>(); - assert(!wait_unfull_flush_promise.has_value()); - wait_unfull_flush_promise = seastar::promise<>(); - // flush and mark available in background - std::ignore = wait_unfull_flush_promise->get_future( - ).finally([FNAME, this] { - DEBUG("{} flush done, available", get_name()); - wait_available_promise->set_value(); - wait_available_promise.reset(); - }); - } else { - DEBUG("{} added pending, flush", get_name()); - flush_current_batch(); - } - } else { - // will flush later - DEBUG("{} added with {} pending, outstanding_io={}", - get_name(), - p_current_batch->get_num_records(), - num_outstanding_io); - assert(!p_current_batch->needs_flush()); - } - return write_fut; -} - -void SegmentedJournal::RecordSubmitter::update_state() -{ - if (num_outstanding_io == 0) { - state = state_t::IDLE; - } else if (num_outstanding_io < io_depth_limit) { - state = state_t::PENDING; - } else if (num_outstanding_io == io_depth_limit) { - state = state_t::FULL; - } else { - ceph_abort("fatal error: io-depth overflow"); - } -} - -void SegmentedJournal::RecordSubmitter::decrement_io_with_flush() -{ - LOG_PREFIX(RecordSubmitter::decrement_io_with_flush); - assert(num_outstanding_io > 0); - auto prv_state = state; - --num_outstanding_io; - update_state(); - - if (prv_state == state_t::FULL) { - if (wait_unfull_flush_promise.has_value()) { - DEBUG("{} flush, resolve wait_unfull_flush_promise", get_name()); - assert(!p_current_batch->is_empty()); - assert(wait_available_promise.has_value()); - flush_current_batch(); - wait_unfull_flush_promise->set_value(); - wait_unfull_flush_promise.reset(); - return; - } - } else { - assert(!wait_unfull_flush_promise.has_value()); - } - - auto needs_flush = ( - !p_current_batch->is_empty() && ( - state == state_t::IDLE || - p_current_batch->get_submit_size().get_fullness() > preferred_fullness || - p_current_batch->needs_flush() - 
));
-  if (needs_flush) {
-    DEBUG("{} flush", get_name());
-    flush_current_batch();
-  }
-}
-
-void SegmentedJournal::RecordSubmitter::account_submission(
-  std::size_t num,
-  const record_group_size_t& size)
-{
-  stats.record_group_padding_bytes +=
-    (size.get_mdlength() - size.get_raw_mdlength());
-  stats.record_group_metadata_bytes += size.get_raw_mdlength();
-  stats.record_group_data_bytes += size.dlength;
-}
-
-void SegmentedJournal::RecordSubmitter::finish_submit_batch(
-  RecordBatch* p_batch,
-  maybe_result_t maybe_result)
-{
-  assert(p_batch->is_submitting());
-  p_batch->set_result(maybe_result);
-  free_batch_ptrs.push_back(p_batch);
-  decrement_io_with_flush();
-}
-
-void SegmentedJournal::RecordSubmitter::flush_current_batch()
-{
-  LOG_PREFIX(RecordSubmitter::flush_current_batch);
-  RecordBatch* p_batch = p_current_batch;
-  assert(p_batch->is_pending());
-  p_current_batch = nullptr;
-  pop_free_batch();
-
-  increment_io();
-  auto num = p_batch->get_num_records();
-  auto [to_write, sizes] = p_batch->encode_batch(
-      committed_to, segment_allocator.get_nonce());
-  DEBUG("{} {} records, {}, committed_to={}, outstanding_io={} ...",
-        get_name(), num, sizes, committed_to, num_outstanding_io);
-  account_submission(num, sizes);
-  std::ignore = segment_allocator.write(to_write
-  ).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) {
-    TRACE("{} {} records, {}, write done with {}",
-          get_name(), num, sizes, write_result);
-    finish_submit_batch(p_batch, write_result);
-  }).handle_error(
-    crimson::ct_error::all_same_way([this, p_batch, FNAME, num, sizes=sizes](auto e) {
-      ERROR("{} {} records, {}, got error {}",
-            get_name(), num, sizes, e);
-      finish_submit_batch(p_batch, std::nullopt);
-    })
-  ).handle_exception([this, p_batch, FNAME, num, sizes=sizes](auto e) {
-    ERROR("{} {} records, {}, got exception {}",
-          get_name(), num, sizes, e);
-    finish_submit_batch(p_batch, std::nullopt);
-  });
-}
-
 }
diff --git a/src/crimson/os/seastore/journal/segmented_journal.h b/src/crimson/os/seastore/journal/segmented_journal.h
index d8cf773e503..1ab33dd9281 100644
--- a/src/crimson/os/seastore/journal/segmented_journal.h
+++ b/src/crimson/os/seastore/journal/segmented_journal.h
@@ -3,13 +3,8 @@
 
 #pragma once
 
-#include <memory>
-#include <optional>
-
-#include <seastar/core/circular_buffer.hh>
 #include <seastar/core/future.hh>
 #include <seastar/core/metrics.hh>
-#include <seastar/core/shared_future.hh>
 
 #include "include/ceph_assert.h"
 #include "include/buffer.h"
@@ -58,295 +53,6 @@ private:
     OrderingHandle &handle
   );
 
-  class RecordBatch {
-    enum class state_t {
-      EMPTY = 0,
-      PENDING,
-      SUBMITTING
-    };
-
-  public:
-    RecordBatch() = default;
-    RecordBatch(RecordBatch&&) = delete;
-    RecordBatch(const RecordBatch&) = delete;
-    RecordBatch& operator=(RecordBatch&&) = delete;
-    RecordBatch& operator=(const RecordBatch&) = delete;
-
-    bool is_empty() const {
-      return state == state_t::EMPTY;
-    }
-
-    bool is_pending() const {
-      return state == state_t::PENDING;
-    }
-
-    bool is_submitting() const {
-      return state == state_t::SUBMITTING;
-    }
-
-    std::size_t get_index() const {
-      return index;
-    }
-
-    std::size_t get_num_records() const {
-      return pending.get_size();
-    }
-
-    std::size_t get_batch_capacity() const {
-      return batch_capacity;
-    }
-
-    const record_group_size_t& get_submit_size() const {
-      assert(state != state_t::EMPTY);
-      return pending.size;
-    }
-
-    bool needs_flush() const {
-      assert(state != state_t::SUBMITTING);
-      assert(pending.get_size() <= batch_capacity);
-      if (state == state_t::EMPTY) {
-        return false;
-      } else {
-        assert(state == state_t::PENDING);
-        return (pending.get_size() >= batch_capacity ||
-                pending.size.get_encoded_length() > batch_flush_size);
-      }
-    }
-
-    struct evaluation_t {
-      record_group_size_t submit_size;
-      bool is_full;
-    };
-    evaluation_t evaluate_submit(
-        const record_size_t& rsize,
-        extent_len_t block_size) const {
-      assert(!needs_flush());
-      auto submit_size = pending.size.get_encoded_length_after(
-          rsize, block_size);
-      bool is_full = submit_size.get_encoded_length() > batch_flush_size;
-      return {submit_size, is_full};
-    }
-
-    void initialize(std::size_t i,
-                    std::size_t _batch_capacity,
-                    std::size_t _batch_flush_size) {
-      ceph_assert(_batch_capacity > 0);
-      index = i;
-      batch_capacity = _batch_capacity;
-      batch_flush_size = _batch_flush_size;
-      pending.reserve(batch_capacity);
-    }
-
-    // Add to the batch, the future will be resolved after the batch is
-    // written.
-    //
-    // Set write_result_t::write_length to 0 if the record is not the first one
-    // in the batch.
-    using add_pending_ertr = SegmentAllocator::write_ertr;
-    using add_pending_ret = add_pending_ertr::future<record_locator_t>;
-    add_pending_ret add_pending(
-        const std::string& name,
-        record_t&&,
-        extent_len_t block_size);
-
-    // Encode the batched records for write.
-    std::pair<ceph::bufferlist, record_group_size_t> encode_batch(
-        const journal_seq_t& committed_to,
-        segment_nonce_t segment_nonce);
-
-    // Set the write result and reset for reuse
-    using maybe_result_t = std::optional<write_result_t>;
-    void set_result(maybe_result_t maybe_write_end_seq);
-
-    // The fast path that is equivalent to submit a single record as a batch.
-    //
-    // Essentially, equivalent to the combined logic of:
-    // add_pending(), encode_batch() and set_result() above without
-    // the intervention of the shared io_promise.
-    //
-    // Note the current RecordBatch can be reused afterwards.
-    std::pair<ceph::bufferlist, record_group_size_t> submit_pending_fast(
-        record_t&&,
-        extent_len_t block_size,
-        const journal_seq_t& committed_to,
-        segment_nonce_t segment_nonce);
-
-  private:
-    record_group_size_t get_encoded_length_after(
-        const record_t& record,
-        extent_len_t block_size) const {
-      return pending.size.get_encoded_length_after(
-          record.size, block_size);
-    }
-
-    state_t state = state_t::EMPTY;
-    std::size_t index = 0;
-    std::size_t batch_capacity = 0;
-    std::size_t batch_flush_size = 0;
-
-    record_group_t pending;
-    std::size_t submitting_size = 0;
-    seastore_off_t submitting_length = 0;
-    seastore_off_t submitting_mdlength = 0;
-
-    struct promise_result_t {
-      write_result_t write_result;
-      seastore_off_t mdlength;
-    };
-    using maybe_promise_result_t = std::optional<promise_result_t>;
-    std::optional<seastar::shared_promise<maybe_promise_result_t>> io_promise;
-  };
-
-  class RecordSubmitter {
-    enum class state_t {
-      IDLE = 0, // outstanding_io == 0
-      PENDING,  // outstanding_io <  io_depth_limit
-      FULL      // outstanding_io == io_depth_limit
-      // OVERFLOW: outstanding_io > io_depth_limit is impossible
-    };
-
-    struct grouped_io_stats {
-      uint64_t num_io = 0;
-      uint64_t num_io_grouped = 0;
-
-      void increment(uint64_t num_grouped_io) {
-        ++num_io;
-        num_io_grouped += num_grouped_io;
-      }
-    };
-
-    using base_ertr = crimson::errorator<
-      crimson::ct_error::input_output_error>;
-
-  public:
-    RecordSubmitter(std::size_t io_depth,
-                    std::size_t batch_capacity,
-                    std::size_t batch_flush_size,
-                    double preferred_fullness,
-                    SegmentAllocator&);
-
-    const std::string& get_name() const {
-      return segment_allocator.get_name();
-    }
-
-    grouped_io_stats get_record_batch_stats() const {
-      return stats.record_batch_stats;
-    }
-
-    grouped_io_stats get_io_depth_stats() const {
-      return stats.io_depth_stats;
-    }
-
-    uint64_t get_record_group_padding_bytes() const {
-      return stats.record_group_padding_bytes;
-    }
-
-    uint64_t get_record_group_metadata_bytes() const {
-      return stats.record_group_metadata_bytes;
-    }
-
-    uint64_t get_record_group_data_bytes() const {
-      return stats.record_group_data_bytes;
-    }
-
-    journal_seq_t get_committed_to() const {
-      return committed_to;
-    }
-
-    void reset_stats() {
-      stats = {};
-    }
-
-    // whether is available to submit a record
-    bool is_available() const;
-
-    // wait for available if cannot submit, should check is_available() again
-    // when the future is resolved.
-    using wa_ertr = base_ertr;
-    wa_ertr::future<> wait_available();
-
-    // when available, check for the submit action
-    // according to the pending record size
-    enum class action_t {
-      ROLL,
-      SUBMIT_FULL,
-      SUBMIT_NOT_FULL
-    };
-    action_t check_action(const record_size_t&) const;
-
-    // when available, roll the segment if needed
-    using roll_segment_ertr = base_ertr;
-    roll_segment_ertr::future<> roll_segment();
-
-    // when available, submit the record if possible
-    using submit_ertr = base_ertr;
-    using submit_ret = submit_ertr::future<record_locator_t>;
-    submit_ret submit(record_t&&);
-
-    void update_committed_to(const journal_seq_t& new_committed_to) {
-      assert(new_committed_to != JOURNAL_SEQ_NULL);
-      assert(committed_to == JOURNAL_SEQ_NULL ||
-             committed_to <= new_committed_to);
-      committed_to = new_committed_to;
-    }
-
-  private:
-    void update_state();
-
-    void increment_io() {
-      ++num_outstanding_io;
-      stats.io_depth_stats.increment(num_outstanding_io);
-      update_state();
-    }
-
-    void decrement_io_with_flush();
-
-    void pop_free_batch() {
-      assert(p_current_batch == nullptr);
-      assert(!free_batch_ptrs.empty());
-      p_current_batch = free_batch_ptrs.front();
-      assert(p_current_batch->is_empty());
-      assert(p_current_batch == &batches[p_current_batch->get_index()]);
-      free_batch_ptrs.pop_front();
-    }
-
-    void account_submission(std::size_t, const record_group_size_t&);
-
-    using maybe_result_t = RecordBatch::maybe_result_t;
-    void finish_submit_batch(RecordBatch*, maybe_result_t);
-
-    void flush_current_batch();
-
-    state_t state = state_t::IDLE;
-    std::size_t num_outstanding_io = 0;
-    std::size_t io_depth_limit;
-    double preferred_fullness;
-
-    SegmentAllocator& segment_allocator;
-    // committed_to may be in a previous journal segment
-    journal_seq_t committed_to = JOURNAL_SEQ_NULL;
-
-    std::unique_ptr<RecordBatch[]> batches;
-    // should not be nullptr after constructed
-    RecordBatch* p_current_batch = nullptr;
-    seastar::circular_buffer<RecordBatch*> free_batch_ptrs;
-
-    // blocked for rolling or lack of resource
-    std::optional<seastar::shared_promise<>> wait_available_promise;
-    bool has_io_error = false;
-    // when needs flush but io depth is full,
-    // wait for decrement_io_with_flush()
-    std::optional<seastar::promise<>> wait_unfull_flush_promise;
-
-    struct {
-      grouped_io_stats record_batch_stats;
-      grouped_io_stats io_depth_stats;
-      uint64_t record_group_padding_bytes = 0;
-      uint64_t record_group_metadata_bytes = 0;
-      uint64_t record_group_data_bytes = 0;
-    } stats;
-  };
-
   SegmentProvider& segment_provider;
   SegmentAllocator journal_segment_allocator;
   RecordSubmitter record_submitter;
-- 
2.39.5
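
Usage note (editorial, not part of the patch): the "when available, ..." comments on
the moved RecordSubmitter API describe a check-then-act protocol. The sketch below
illustrates one submission round; submit_once() is a hypothetical helper written for
illustration only. The genuine driver is SegmentedJournal::submit_record(), which also
loops on wait_available() and re-evaluates check_action() after a roll.

// Editorial sketch against the RecordSubmitter API above; error handling
// and the retry loop are simplified.
RecordSubmitter::submit_ret
submit_once(RecordSubmitter& submitter, record_t&& record)
{
  // Caller has already looped on wait_available() until is_available().
  assert(submitter.is_available());
  if (submitter.check_action(record.size) == RecordSubmitter::action_t::ROLL) {
    // The record would not fit into the current segment: flush any pending
    // batch, roll to a new segment, then submit.
    return submitter.roll_segment(
    ).safe_then([&submitter, record=std::move(record)]() mutable {
      // Simplified: a real caller re-checks availability and the action here.
      return submitter.submit(std::move(record));
    });
  }
  // SUBMIT_FULL / SUBMIT_NOT_FULL: hand the record to the current batch.
  // The batch is written either inline (fast path) or flushed later by
  // decrement_io_with_flush(); the future resolves with the record_locator_t.
  return submitter.submit(std::move(record));
}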