From 08154649e60932d9ea01a8892dab9c2798f6e961 Mon Sep 17 00:00:00 2001
From: Yingxin Cheng
Date: Wed, 9 Mar 2022 21:44:30 +0800
Subject: [PATCH] crimson/os/seastore/journal: rework RecordSubmitter for reuse

* Redesign submission logic to support concurrent submits;
* Extract out journal specific write_pipeline and flush;
* Distinguish logs by name;

Signed-off-by: Yingxin Cheng
---
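
Notes (not part of the commit message): the reworked flow is easiest to read as a
retry loop over the new RecordSubmitter interface -- wait until available, ask
check_action() what to do, roll or submit, and re-evaluate. The stand-alone toy
below models only that decision loop; toy_record_submitter_t and its thresholds
are invented for illustration, and the availability gate (is_available() /
wait_available()) plus all futurized error handling are omitted.

// Toy model of the loop implemented by SegmentedJournal::do_submit_record():
// keep re-evaluating until the record can be submitted.
#include <cstddef>
#include <iostream>

enum class action_t { ROLL, SUBMIT_FULL, SUBMIT_NOT_FULL };

struct toy_record_submitter_t {
  std::size_t segment_left = 96;      // space left in the open segment (toy units)
  std::size_t batch_flush_size = 24;  // batched bytes that make a batch "full"
  std::size_t batched = 0;            // bytes currently batched

  action_t check_action(std::size_t rsize) const {
    if (batched + rsize > segment_left) {
      return action_t::ROLL;              // would overflow the segment: roll first
    } else if (batched + rsize > batch_flush_size) {
      return action_t::SUBMIT_FULL;       // submit and flush the batch now
    } else {
      return action_t::SUBMIT_NOT_FULL;   // submit, batch may keep growing
    }
  }
  void roll_segment() {
    // toy: pretend the pending batch was flushed before the segment rolled
    segment_left = 96;
    batched = 0;
  }
  void submit(std::size_t rsize) { batched += rsize; }
};

int main() {
  toy_record_submitter_t rs;
  for (std::size_t rsize : {10, 10, 10, 80}) {
    while (true) {                        // the do_submit_record() retry loop
      auto action = rs.check_action(rsize);
      if (action == action_t::ROLL) {
        std::cout << "roll\n";
        rs.roll_segment();
        continue;                         // re-evaluate against the fresh segment
      }
      std::cout << (action == action_t::SUBMIT_FULL ? "FULL\n" : "NOT_FULL\n");
      rs.submit(rsize);
      break;
    }
  }
}
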
 .../os/seastore/journal/segmented_journal.cc | 480 +++++++++++-------
 .../os/seastore/journal/segmented_journal.h  | 131 +++--
 2 files changed, 388 insertions(+), 223 deletions(-)

diff --git a/src/crimson/os/seastore/journal/segmented_journal.cc b/src/crimson/os/seastore/journal/segmented_journal.cc
index db4596d22296e..f60cd4fd5fbc4 100644
--- a/src/crimson/os/seastore/journal/segmented_journal.cc
+++ b/src/crimson/os/seastore/journal/segmented_journal.cc
@@ -304,6 +304,82 @@ SegmentedJournal::replay_ret SegmentedJournal::replay(
   });
 }
 
+seastar::future<> SegmentedJournal::flush(OrderingHandle &handle)
+{
+  LOG_PREFIX(SegmentedJournal::flush);
+  DEBUG("H{} flush ...", (void*)&handle);
+  assert(write_pipeline);
+  return handle.enter(write_pipeline->device_submission
+  ).then([this, &handle] {
+    return handle.enter(write_pipeline->finalize);
+  }).then([FNAME, &handle] {
+    DEBUG("H{} flush done", (void*)&handle);
+  });
+}
+
+SegmentedJournal::submit_record_ret
+SegmentedJournal::do_submit_record(
+  record_t &&record,
+  OrderingHandle &handle)
+{
+  LOG_PREFIX(SegmentedJournal::do_submit_record);
+  if (!record_submitter.is_available()) {
+    DEBUG("H{} wait ...", (void*)&handle);
+    return record_submitter.wait_available(
+    ).safe_then([this, record=std::move(record), &handle]() mutable {
+      return do_submit_record(std::move(record), handle);
+    });
+  }
+  auto action = record_submitter.check_action(record.size);
+  if (action == RecordSubmitter::action_t::ROLL) {
+    DEBUG("H{} roll, unavailable ...", (void*)&handle);
+    return record_submitter.roll_segment(
+    ).safe_then([this, record=std::move(record), &handle]() mutable {
+      return do_submit_record(std::move(record), handle);
+    });
+  } else { // SUBMIT_FULL/NOT_FULL
+    DEBUG("H{} submit {} ...",
+          (void*)&handle,
+          action == RecordSubmitter::action_t::SUBMIT_FULL ?
+          "FULL" : "NOT_FULL");
+    auto submit_fut = record_submitter.submit(std::move(record));
+    return handle.enter(write_pipeline->device_submission
+    ).then([submit_fut=std::move(submit_fut)]() mutable {
+      return std::move(submit_fut);
+    }).safe_then([FNAME, this, &handle](record_locator_t result) {
+      return handle.enter(write_pipeline->finalize
+      ).then([FNAME, this, result, &handle] {
+        DEBUG("H{} finish with {}", (void*)&handle, result);
+        auto new_committed_to = result.write_result.get_end_seq();
+        record_submitter.update_committed_to(new_committed_to);
+        return result;
+      });
+    });
+  }
+}
+
+SegmentedJournal::submit_record_ret
+SegmentedJournal::submit_record(
+  record_t &&record,
+  OrderingHandle &handle)
+{
+  LOG_PREFIX(SegmentedJournal::submit_record);
+  DEBUG("H{} {} start ...", (void*)&handle, record);
+  assert(write_pipeline);
+  auto expected_size = record_group_size_t(
+    record.size,
+    journal_segment_allocator.get_block_size()
+  ).get_encoded_length();
+  auto max_record_length = journal_segment_allocator.get_max_write_length();
+  if (expected_size > max_record_length) {
+    ERROR("H{} {} exceeds max record size {}",
+          (void*)&handle, record, max_record_length);
+    return crimson::ct_error::erange::make();
+  }
+
+  return do_submit_record(std::move(record), handle);
+}
+
 void SegmentedJournal::register_metrics()
 {
   LOG_PREFIX(Journal::register_metrics);
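
Aside (illustrative, not part of the diff): submit_record() is now the single
place that can fail synchronously, by rejecting a record whose encoded length
exceeds the allocator's maximum write length. The sketch below assumes, purely
for illustration, that the encoded length is the metadata and data lengths each
rounded up to the block size; the real record_group_size_t bookkeeping is more
detailed.

#include <cstdint>
#include <iostream>

// Hypothetical, simplified stand-in for record_group_size_t::get_encoded_length().
static std::uint64_t align_up(std::uint64_t n, std::uint64_t block) {
  return (n + block - 1) / block * block;
}

static std::uint64_t encoded_length(std::uint64_t mdlength,
                                    std::uint64_t dlength,
                                    std::uint64_t block_size) {
  // assumption: metadata and data are written block-aligned
  return align_up(mdlength, block_size) + align_up(dlength, block_size);
}

int main() {
  const std::uint64_t block_size = 4096;
  const std::uint64_t max_record_length = 16 << 20;  // example device limit
  std::uint64_t mdlength = 8000, dlength = 20 << 20;
  auto expected = encoded_length(mdlength, dlength, block_size);
  if (expected > max_record_length) {
    // submit_record() returns crimson::ct_error::erange here instead of printing
    std::cout << "reject: " << expected << " > " << max_record_length << "\n";
  } else {
    std::cout << "accept: " << expected << "\n";
  }
}
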
@@ -368,20 +444,20 @@ void SegmentedJournal::register_metrics()
 
 SegmentedJournal::RecordBatch::add_pending_ret
 SegmentedJournal::RecordBatch::add_pending(
+  const std::string& name,
   record_t&& record,
-  OrderingHandle& handle,
   extent_len_t block_size)
 {
   LOG_PREFIX(RecordBatch::add_pending);
   auto new_size = get_encoded_length_after(record, block_size);
   auto dlength_offset = pending.size.dlength;
-  TRACE("H{} batches={}, write_size={}, dlength_offset={} ...",
-        (void*)&handle,
+  TRACE("{} batches={}, write_size={}, dlength_offset={} ...",
+        name,
         pending.get_size() + 1,
         new_size.get_encoded_length(),
         dlength_offset);
   assert(state != state_t::SUBMITTING);
-  assert(can_batch(record, block_size).value() == new_size);
+  assert(evaluate_submit(record.size, block_size).submit_size == new_size);
   pending.push_back(
       std::move(record), block_size);
@@ -395,10 +471,10 @@ SegmentedJournal::RecordBatch::add_pending(
   state = state_t::PENDING;
   return io_promise->get_shared_future(
-  ).then([dlength_offset, FNAME, &handle
+  ).then([dlength_offset, FNAME, &name
          ](auto maybe_promise_result) -> add_pending_ret {
     if (!maybe_promise_result.has_value()) {
-      ERROR("H{} write failed", (void*)&handle);
+      ERROR("{} write failed", name);
       return crimson::ct_error::input_output_error::make();
     }
     auto write_result = maybe_promise_result->write_result;
@@ -407,7 +483,7 @@ SegmentedJournal::RecordBatch::add_pending(
         maybe_promise_result->mdlength + dlength_offset),
       write_result
     };
-    TRACE("H{} write finish with {}", (void*)&handle, submit_result);
+    TRACE("{} write finish with {}", name, submit_result);
     return add_pending_ret(
       add_pending_ertr::ready_future_marker{},
       submit_result);
@@ -466,7 +542,7 @@ SegmentedJournal::RecordBatch::submit_pending_fast(
   auto new_size = get_encoded_length_after(record, block_size);
   std::ignore = new_size;
   assert(state == state_t::EMPTY);
-  assert(can_batch(record, block_size).value() == new_size);
+  assert(evaluate_submit(record.size, block_size).submit_size == new_size);
 
   auto group = record_group_t(std::move(record), block_size);
   auto size = group.size;
@@ -481,17 +557,16 @@ SegmentedJournal::RecordSubmitter::RecordSubmitter(
   std::size_t batch_capacity,
   std::size_t batch_flush_size,
   double preferred_fullness,
-  SegmentAllocator& jsa)
+  SegmentAllocator& sa)
   : io_depth_limit{io_depth},
     preferred_fullness{preferred_fullness},
-    journal_segment_allocator{jsa},
+    segment_allocator{sa},
     batches(new RecordBatch[io_depth + 1])
 {
   LOG_PREFIX(RecordSubmitter);
-  INFO("Journal::RecordSubmitter: io_depth_limit={}, "
-       "batch_capacity={}, batch_flush_size={}, "
+  INFO("{} io_depth_limit={}, batch_capacity={}, batch_flush_size={}, "
        "preferred_fullness={}",
-       io_depth, batch_capacity,
+       get_name(), io_depth, batch_capacity,
        batch_flush_size, preferred_fullness);
   ceph_assert(io_depth > 0);
   ceph_assert(batch_capacity > 0);
@@ -505,40 +580,195 @@ SegmentedJournal::RecordSubmitter::RecordSubmitter(
   pop_free_batch();
 }
 
-SegmentedJournal::RecordSubmitter::submit_ret
-SegmentedJournal::RecordSubmitter::submit(
-  record_t&& record,
-  OrderingHandle& handle)
+bool SegmentedJournal::RecordSubmitter::is_available() const
 {
-  LOG_PREFIX(RecordSubmitter::submit);
-  DEBUG("H{} {} start ...", (void*)&handle, record);
-  assert(write_pipeline);
-  auto expected_size = record_group_size_t(
-    record.size,
-    journal_segment_allocator.get_block_size()
-  ).get_encoded_length();
-  auto max_record_length = journal_segment_allocator.get_max_write_length();
-  if (expected_size > max_record_length) {
-    ERROR("H{} {} exceeds max record size {}",
-          (void*)&handle, record, max_record_length);
-    return crimson::ct_error::erange::make();
+  auto ret = !wait_available_promise.has_value() &&
+             !has_io_error;
+#ifndef NDEBUG
+  if (ret) {
+    // invariants when available
+    ceph_assert(segment_allocator.can_write());
+    ceph_assert(p_current_batch != nullptr);
+    ceph_assert(!p_current_batch->is_submitting());
+    ceph_assert(!p_current_batch->needs_flush());
+    if (!p_current_batch->is_empty()) {
+      auto submit_length =
+        p_current_batch->get_submit_size().get_encoded_length();
+      ceph_assert(!segment_allocator.needs_roll(submit_length));
+    }
   }
+#endif
+  return ret;
+}
 
-  return do_submit(std::move(record), handle);
+SegmentedJournal::RecordSubmitter::wa_ertr::future<>
+SegmentedJournal::RecordSubmitter::wait_available()
+{
+  LOG_PREFIX(RecordSubmitter::wait_available);
+  assert(!is_available());
+  if (has_io_error) {
+    ERROR("{} I/O is failed before wait", get_name());
+    return crimson::ct_error::input_output_error::make();
+  }
+  return wait_available_promise->get_shared_future(
+  ).then([FNAME, this]() -> wa_ertr::future<> {
+    if (has_io_error) {
+      ERROR("{} I/O is failed after wait", get_name());
+      return crimson::ct_error::input_output_error::make();
+    }
+    return wa_ertr::now();
+  });
 }
 
-seastar::future<> SegmentedJournal::RecordSubmitter::flush(OrderingHandle &handle)
+SegmentedJournal::RecordSubmitter::action_t
+SegmentedJournal::RecordSubmitter::check_action(
+  const record_size_t& rsize) const
 {
-  LOG_PREFIX(RecordSubmitter::flush);
-  DEBUG("H{} flush", (void*)&handle);
-  return handle.enter(write_pipeline->device_submission
-  ).then([this, &handle] {
-    return handle.enter(write_pipeline->finalize);
-  }).then([FNAME, &handle] {
-    DEBUG("H{} flush done", (void*)&handle);
+  assert(is_available());
+  auto eval = p_current_batch->evaluate_submit(
+      rsize, segment_allocator.get_block_size());
+  if (segment_allocator.needs_roll(eval.submit_size.get_encoded_length())) {
+    return action_t::ROLL;
+  } else if (eval.is_full) {
+    return action_t::SUBMIT_FULL;
+  } else {
+    return action_t::SUBMIT_NOT_FULL;
+  }
+}
+
+SegmentedJournal::RecordSubmitter::roll_segment_ertr::future<>
+SegmentedJournal::RecordSubmitter::roll_segment()
+{
+  LOG_PREFIX(RecordSubmitter::roll_segment);
+  assert(is_available());
+  // #1 block concurrent submissions due to rolling
+  wait_available_promise = seastar::shared_promise<>();
+  assert(!wait_unfull_flush_promise.has_value());
+  return [FNAME, this] {
+    if (p_current_batch->is_pending()) {
+      if (state == state_t::FULL) {
+        DEBUG("{} wait flush ...", get_name());
+        wait_unfull_flush_promise = seastar::promise<>();
+        return wait_unfull_flush_promise->get_future();
+      } else { // IDLE/PENDING
+        DEBUG("{} flush", get_name());
+        flush_current_batch();
+        return seastar::now();
+      }
+    } else {
+      assert(p_current_batch->is_empty());
+      return seastar::now();
+    }
+  }().then_wrapped([FNAME, this](auto fut) {
+    if (fut.failed()) {
+      ERROR("{} rolling is skipped unexpectedly, available", get_name());
+      has_io_error = true;
+      wait_available_promise->set_value();
+      wait_available_promise.reset();
+      return roll_segment_ertr::now();
+    } else {
+      // start rolling in background
+      std::ignore = segment_allocator.roll(
+      ).safe_then([FNAME, this] {
+        // good
+        DEBUG("{} rolling done, available", get_name());
+        assert(!has_io_error);
+        wait_available_promise->set_value();
+        wait_available_promise.reset();
+      }).handle_error(
+        crimson::ct_error::all_same_way([FNAME, this](auto e) {
+          ERROR("{} got error {}, available", get_name(), e);
+          has_io_error = true;
+          wait_available_promise->set_value();
+          wait_available_promise.reset();
+        })
+      ).handle_exception([FNAME, this](auto e) {
+        ERROR("{} got exception {}, available", get_name(), e);
+        has_io_error = true;
+        wait_available_promise->set_value();
+        wait_available_promise.reset();
+      });
+      // wait for background rolling
+      return wait_available();
+    }
   });
 }
 
+SegmentedJournal::RecordSubmitter::submit_ret
+SegmentedJournal::RecordSubmitter::submit(record_t&& record)
+{
+  LOG_PREFIX(RecordSubmitter::submit);
+  assert(is_available());
+  assert(check_action(record.size) != action_t::ROLL);
+  auto eval = p_current_batch->evaluate_submit(
+      record.size, segment_allocator.get_block_size());
+  bool needs_flush = (
+      state == state_t::IDLE ||
+      eval.submit_size.get_fullness() > preferred_fullness ||
+      // RecordBatch::needs_flush()
+      eval.is_full ||
+      p_current_batch->get_num_records() + 1 >=
+        p_current_batch->get_batch_capacity());
+  if (p_current_batch->is_empty() &&
+      needs_flush &&
+      state != state_t::FULL) {
+    // fast path with direct write
+    increment_io();
+    auto [to_write, sizes] = p_current_batch->submit_pending_fast(
+      std::move(record),
+      segment_allocator.get_block_size(),
+      committed_to,
+      segment_allocator.get_nonce());
+    DEBUG("{} fast submit {}, committed_to={}, outstanding_io={} ...",
+          get_name(), sizes, committed_to, num_outstanding_io);
+    account_submission(1, sizes);
+    return segment_allocator.write(to_write
+    ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
+      return record_locator_t{
+        write_result.start_seq.offset.add_offset(mdlength),
+        write_result
+      };
+    }).finally([this] {
+      decrement_io_with_flush();
+    });
+  }
+  // indirect batched write
+  auto write_fut = p_current_batch->add_pending(
+    get_name(),
+    std::move(record),
+    segment_allocator.get_block_size());
+  if (needs_flush) {
+    if (state == state_t::FULL) {
+      // #2 block concurrent submissions due to lack of resource
+      DEBUG("{} added with {} pending, outstanding_io={}, unavailable, wait flush ...",
+            get_name(),
+            p_current_batch->get_num_records(),
+            num_outstanding_io);
+      wait_available_promise = seastar::shared_promise<>();
+      assert(!wait_unfull_flush_promise.has_value());
+      wait_unfull_flush_promise = seastar::promise<>();
+      // flush and mark available in background
+      std::ignore = wait_unfull_flush_promise->get_future(
+      ).finally([FNAME, this] {
+        DEBUG("{} flush done, available", get_name());
+        wait_available_promise->set_value();
+        wait_available_promise.reset();
+      });
+    } else {
+      DEBUG("{} added pending, flush", get_name());
+      flush_current_batch();
+    }
+  } else {
+    // will flush later
+    DEBUG("{} added with {} pending, outstanding_io={}",
+          get_name(),
+          p_current_batch->get_num_records(),
+          num_outstanding_io);
+    assert(!p_current_batch->needs_flush());
+  }
+  return write_fut;
+}
+
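
Aside (illustrative, not part of the diff): roll_segment() and the FULL branch of
submit() gate concurrent submitters on wait_available_promise -- every blocked
caller waits on one shared future, and whoever completes the roll or flush
resolves it once for all of them. The stand-alone sketch below shows the same
pattern with the standard library: std::promise/std::shared_future stand in for
seastar::shared_promise, and threads stand in for continuations that would all
run on the reactor.

#include <future>
#include <iostream>
#include <optional>
#include <thread>
#include <vector>

// One gate shared by all submitters, mirroring wait_available_promise.
std::optional<std::promise<void>> gate;
std::shared_future<void> gate_future;

bool is_available() { return !gate.has_value(); }

void block_submitters() {        // roll_segment(): "#1 block concurrent submissions"
  gate.emplace();
  gate_future = gate->get_future().share();
}

void release_submitters() {      // rolling done: wake every waiter at once
  gate->set_value();
  gate.reset();
}

int main() {
  block_submitters();
  std::vector<std::thread> submitters;
  for (int i = 0; i < 3; ++i) {
    // each submitter holds its own copy of the shared future and waits on it
    submitters.emplace_back([i, f = gate_future] {
      f.wait();
      std::cout << "submitter " << i << " resumed\n";
    });
  }
  // pretend the segment roll completed in the background
  release_submitters();
  for (auto& t : submitters) {
    t.join();
  }
}
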
 void SegmentedJournal::RecordSubmitter::update_state()
 {
   if (num_outstanding_io == 0) {
@@ -556,21 +786,32 @@ void SegmentedJournal::RecordSubmitter::decrement_io_with_flush()
 {
   LOG_PREFIX(RecordSubmitter::decrement_io_with_flush);
   assert(num_outstanding_io > 0);
-  --num_outstanding_io;
-#ifndef NDEBUG
   auto prv_state = state;
-#endif
+  --num_outstanding_io;
   update_state();
 
-  if (wait_submit_promise.has_value()) {
-    DEBUG("wait resolved");
-    assert(prv_state == state_t::FULL);
-    wait_submit_promise->set_value();
-    wait_submit_promise.reset();
+  if (prv_state == state_t::FULL) {
+    if (wait_unfull_flush_promise.has_value()) {
+      DEBUG("{} flush, resolve wait_unfull_flush_promise", get_name());
+      assert(!p_current_batch->is_empty());
+      assert(wait_available_promise.has_value());
+      flush_current_batch();
+      wait_unfull_flush_promise->set_value();
+      wait_unfull_flush_promise.reset();
+      return;
+    }
+  } else {
+    assert(!wait_unfull_flush_promise.has_value());
   }
 
-  if (!p_current_batch->is_empty()) {
-    TRACE("flush");
+  auto needs_flush = (
+      !p_current_batch->is_empty() && (
+        state == state_t::IDLE ||
+        p_current_batch->get_submit_size().get_fullness() > preferred_fullness ||
+        p_current_batch->needs_flush()
+      ));
+  if (needs_flush) {
+    DEBUG("{} flush", get_name());
     flush_current_batch();
   }
 }
@@ -606,149 +847,26 @@ void SegmentedJournal::RecordSubmitter::flush_current_batch()
   increment_io();
   auto num = p_batch->get_num_records();
   auto [to_write, sizes] = p_batch->encode_batch(
-    journal_committed_to, journal_segment_allocator.get_nonce());
-  DEBUG("{} records, {}, committed_to={}, outstanding_io={} ...",
-        num, sizes, journal_committed_to, num_outstanding_io);
+    committed_to, segment_allocator.get_nonce());
+  DEBUG("{} {} records, {}, committed_to={}, outstanding_io={} ...",
+        get_name(), num, sizes, committed_to, num_outstanding_io);
   account_submission(num, sizes);
-  std::ignore = journal_segment_allocator.write(to_write
+  std::ignore = segment_allocator.write(to_write
   ).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) {
-    TRACE("{} records, {}, write done with {}", num, sizes, write_result);
+    TRACE("{} {} records, {}, write done with {}",
+          get_name(), num, sizes, write_result);
     finish_submit_batch(p_batch, write_result);
   }).handle_error(
     crimson::ct_error::all_same_way([this, p_batch, FNAME, num, sizes=sizes](auto e) {
-      ERROR("{} records, {}, got error {}", num, sizes, e);
+      ERROR("{} {} records, {}, got error {}",
+            get_name(), num, sizes, e);
      finish_submit_batch(p_batch, std::nullopt);
    })
  ).handle_exception([this, p_batch, FNAME, num, sizes=sizes](auto e) {
-    ERROR("{} records, {}, got exception {}", num, sizes, e);
+    ERROR("{} {} records, {}, got exception {}",
+          get_name(), num, sizes, e);
    finish_submit_batch(p_batch, std::nullopt);
  });
 }
 
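
Aside (illustrative, not part of the diff): decrement_io_with_flush() is where
the io-depth bookkeeping feeds back into batching -- leaving FULL either resolves
wait_unfull_flush_promise or flushes an over-threshold batch. The toy below only
models the state bookkeeping, assuming the conventional mapping of outstanding
I/O count to IDLE/PENDING/FULL that update_state() and io_depth_limit imply; the
real transitions additionally trigger flushes and promise resolution.

#include <cassert>
#include <cstddef>
#include <iostream>

enum class state_t { IDLE, PENDING, FULL };

struct toy_io_state_t {
  std::size_t io_depth_limit = 2;
  std::size_t num_outstanding_io = 0;
  state_t state = state_t::IDLE;

  // assumed mapping, mirroring what update_state() implies
  void update_state() {
    if (num_outstanding_io == 0) {
      state = state_t::IDLE;
    } else if (num_outstanding_io < io_depth_limit) {
      state = state_t::PENDING;
    } else {
      assert(num_outstanding_io == io_depth_limit);
      state = state_t::FULL;
    }
  }
  void increment_io() { ++num_outstanding_io; update_state(); }
  void decrement_io() {
    assert(num_outstanding_io > 0);
    --num_outstanding_io;
    update_state();
  }
};

const char* to_str(state_t s) {
  switch (s) {
  case state_t::IDLE:    return "IDLE";
  case state_t::PENDING: return "PENDING";
  default:               return "FULL";
  }
}

int main() {
  toy_io_state_t s;
  s.increment_io();  std::cout << to_str(s.state) << "\n";  // PENDING
  s.increment_io();  std::cout << to_str(s.state) << "\n";  // FULL: submitters wait
  s.decrement_io();  std::cout << to_str(s.state) << "\n";  // PENDING: flush/wake now
  s.decrement_io();  std::cout << to_str(s.state) << "\n";  // IDLE
}
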
-SegmentedJournal::RecordSubmitter::submit_pending_ret
-SegmentedJournal::RecordSubmitter::submit_pending(
-  record_t&& record,
-  OrderingHandle& handle,
-  bool flush)
-{
-  LOG_PREFIX(RecordSubmitter::submit_pending);
-  assert(!p_current_batch->is_submitting());
-  stats.record_batch_stats.increment(
-    p_current_batch->get_num_records() + 1);
-  bool do_flush = (flush || state == state_t::IDLE);
-  auto write_fut = [this, do_flush, FNAME, record=std::move(record), &handle]() mutable {
-    if (do_flush && p_current_batch->is_empty()) {
-      // fast path with direct write
-      increment_io();
-      auto [to_write, sizes] = p_current_batch->submit_pending_fast(
-        std::move(record),
-        journal_segment_allocator.get_block_size(),
-        journal_committed_to,
-        journal_segment_allocator.get_nonce());
-      DEBUG("H{} fast submit {}, committed_to={}, outstanding_io={} ...",
-            (void*)&handle, sizes, journal_committed_to, num_outstanding_io);
-      account_submission(1, sizes);
-      return journal_segment_allocator.write(to_write
-      ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
-        return record_locator_t{
-          write_result.start_seq.offset.add_offset(mdlength),
-          write_result
-        };
-      }).finally([this] {
-        decrement_io_with_flush();
-      });
-    } else {
-      // indirect write with or without the existing pending records
-      auto write_fut = p_current_batch->add_pending(
-        std::move(record),
-        handle,
-        journal_segment_allocator.get_block_size());
-      if (do_flush) {
-        DEBUG("H{} added pending and flush", (void*)&handle);
-        flush_current_batch();
-      } else {
-        DEBUG("H{} added with {} pending",
-              (void*)&handle, p_current_batch->get_num_records());
-      }
-      return write_fut;
-    }
-  }();
-  return handle.enter(write_pipeline->device_submission
-  ).then([write_fut=std::move(write_fut)]() mutable {
-    return std::move(write_fut);
-  }).safe_then([this, FNAME, &handle](auto submit_result) {
-    return handle.enter(write_pipeline->finalize
-    ).then([this, FNAME, submit_result, &handle] {
-      DEBUG("H{} finish with {}", (void*)&handle, submit_result);
-      auto new_committed_to = submit_result.write_result.get_end_seq();
-      assert(journal_committed_to == JOURNAL_SEQ_NULL ||
-             journal_committed_to <= new_committed_to);
-      journal_committed_to = new_committed_to;
-      return submit_result;
-    });
-  });
-}
-
-SegmentedJournal::RecordSubmitter::do_submit_ret
-SegmentedJournal::RecordSubmitter::do_submit(
-  record_t&& record,
-  OrderingHandle& handle)
-{
-  LOG_PREFIX(RecordSubmitter::do_submit);
-  TRACE("H{} outstanding_io={}/{} ...",
-        (void*)&handle, num_outstanding_io, io_depth_limit);
-  assert(!p_current_batch->is_submitting());
-  if (state <= state_t::PENDING) {
-    // can increment io depth
-    assert(!wait_submit_promise.has_value());
-    auto maybe_new_size = p_current_batch->can_batch(
-        record, journal_segment_allocator.get_block_size());
-    if (!maybe_new_size.has_value() ||
-        (maybe_new_size->get_encoded_length() >
-         journal_segment_allocator.get_max_write_length())) {
-      TRACE("H{} flush", (void*)&handle);
-      assert(p_current_batch->is_pending());
-      flush_current_batch();
-      return do_submit(std::move(record), handle);
-    } else if (journal_segment_allocator.needs_roll(
-          maybe_new_size->get_encoded_length())) {
-      if (p_current_batch->is_pending()) {
-        TRACE("H{} flush and roll", (void*)&handle);
-        flush_current_batch();
-      } else {
-        TRACE("H{} roll", (void*)&handle);
-      }
-      return journal_segment_allocator.roll(
-      ).safe_then([this, record=std::move(record), &handle]() mutable {
-        return do_submit(std::move(record), handle);
-      });
-    } else {
-      bool flush = (maybe_new_size->get_fullness() > preferred_fullness ?
-                    true : false);
-      return submit_pending(std::move(record), handle, flush);
-    }
-  }
-
-  assert(state == state_t::FULL);
-  // cannot increment io depth
-  auto maybe_new_size = p_current_batch->can_batch(
-      record, journal_segment_allocator.get_block_size());
-  if (!maybe_new_size.has_value() ||
-      (maybe_new_size->get_encoded_length() >
-       journal_segment_allocator.get_max_write_length()) ||
-      journal_segment_allocator.needs_roll(
-        maybe_new_size->get_encoded_length())) {
-    if (!wait_submit_promise.has_value()) {
-      wait_submit_promise = seastar::promise<>();
-    }
-    DEBUG("H{} wait ...", (void*)&handle);
-    return wait_submit_promise->get_future(
-    ).then([this, record=std::move(record), &handle]() mutable {
-      return do_submit(std::move(record), handle);
-    });
-  } else {
-    return submit_pending(std::move(record), handle, false);
-  }
-}
-
 }
diff --git a/src/crimson/os/seastore/journal/segmented_journal.h b/src/crimson/os/seastore/journal/segmented_journal.h
index 17e4056e71569..d8cf773e50324 100644
--- a/src/crimson/os/seastore/journal/segmented_journal.h
+++ b/src/crimson/os/seastore/journal/segmented_journal.h
@@ -42,22 +42,22 @@ public:
 
   submit_record_ret submit_record(
     record_t &&record,
-    OrderingHandle &handle
-  ) final {
-    return record_submitter.submit(std::move(record), handle);
-  }
+    OrderingHandle &handle) final;
 
-  seastar::future<> flush(OrderingHandle &handle) final {
-    return record_submitter.flush(handle);
-  }
+  seastar::future<> flush(OrderingHandle &handle) final;
 
   replay_ret replay(delta_handler_t &&delta_handler) final;
 
-  void set_write_pipeline(WritePipeline* write_pipeline) final {
-    record_submitter.set_write_pipeline(write_pipeline);
+  void set_write_pipeline(WritePipeline *_write_pipeline) final {
+    write_pipeline = _write_pipeline;
   }
 
 private:
+  submit_record_ret do_submit_record(
+    record_t &&record,
+    OrderingHandle &handle
+  );
+
   class RecordBatch {
     enum class state_t {
       EMPTY = 0,
@@ -92,19 +92,39 @@ private:
       return pending.get_size();
     }
 
-    // return the expected write sizes if allows to batch,
-    // otherwise, return nullopt
-    std::optional<record_group_size_t> can_batch(
-      const record_t& record,
-      extent_len_t block_size) const {
+    std::size_t get_batch_capacity() const {
+      return batch_capacity;
+    }
+
+    const record_group_size_t& get_submit_size() const {
+      assert(state != state_t::EMPTY);
+      return pending.size;
+    }
+
+    bool needs_flush() const {
       assert(state != state_t::SUBMITTING);
-      if (pending.get_size() >= batch_capacity ||
-          (pending.get_size() > 0 &&
-           pending.size.get_encoded_length() > batch_flush_size)) {
+      assert(pending.get_size() <= batch_capacity);
+      if (state == state_t::EMPTY) {
+        return false;
+      } else {
         assert(state == state_t::PENDING);
-        return std::nullopt;
+        return (pending.get_size() >= batch_capacity ||
+                pending.size.get_encoded_length() > batch_flush_size);
       }
-      return get_encoded_length_after(record, block_size);
+    }
+
+    struct evaluation_t {
+      record_group_size_t submit_size;
+      bool is_full;
+    };
+    evaluation_t evaluate_submit(
+      const record_size_t& rsize,
+      extent_len_t block_size) const {
+      assert(!needs_flush());
+      auto submit_size = pending.size.get_encoded_length_after(
+          rsize, block_size);
+      bool is_full = submit_size.get_encoded_length() > batch_flush_size;
+      return {submit_size, is_full};
     }
 
     void initialize(std::size_t i,
@@ -125,8 +145,8 @@ private:
     using add_pending_ertr = SegmentAllocator::write_ertr;
    using add_pending_ret = add_pending_ertr::future<record_locator_t>;
    add_pending_ret add_pending(
+      const std::string& name,
      record_t&&,
-      OrderingHandle&,
      extent_len_t block_size);
 
    // Encode the batched records for write.
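
Aside (illustrative, not part of the diff): evaluate_submit() and needs_flush()
split the old can_batch() into two questions -- how big would the batch get if
this record joined it (and would that cross the flush threshold), and must the
current batch be flushed before anything more is accepted. The toy below uses
plain byte counts in place of record_group_size_t and ignores block alignment.

#include <cassert>
#include <cstddef>
#include <iostream>

struct toy_batch_t {
  std::size_t batch_capacity = 4;     // max records per batch
  std::size_t batch_flush_size = 64;  // encoded bytes that make a batch "full"
  std::size_t num_records = 0;
  std::size_t encoded_length = 0;     // stand-in for pending.size.get_encoded_length()

  // RecordBatch::needs_flush(): nothing more may be batched
  bool needs_flush() const {
    return num_records > 0 &&
           (num_records >= batch_capacity ||
            encoded_length > batch_flush_size);
  }

  struct evaluation_t {
    std::size_t submit_size;  // batch size if this record were added
    bool is_full;             // adding it would cross the flush threshold
  };
  // RecordBatch::evaluate_submit(): only valid while more can be batched
  evaluation_t evaluate_submit(std::size_t rsize) const {
    assert(!needs_flush());
    auto submit_size = encoded_length + rsize;  // toy: no block alignment
    return {submit_size, submit_size > batch_flush_size};
  }

  void add(std::size_t rsize) {
    ++num_records;
    encoded_length += rsize;
  }
};

int main() {
  toy_batch_t batch;
  for (std::size_t rsize : {20, 20, 40}) {
    auto eval = batch.evaluate_submit(rsize);
    std::cout << "submit_size=" << eval.submit_size
              << " is_full=" << eval.is_full << "\n";
    batch.add(rsize);
    if (batch.needs_flush()) {
      std::cout << "batch needs flush\n";
      batch = toy_batch_t{};  // pretend it was flushed
    }
  }
}
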
@@ -195,6 +215,9 @@ private:
      }
    };
 
+    using base_ertr = crimson::errorator<
+      crimson::ct_error::input_output_error>;
+
  public:
    RecordSubmitter(std::size_t io_depth,
                    std::size_t batch_capacity,
@@ -202,6 +225,10 @@ private:
                    double preferred_fullness,
                    SegmentAllocator&);
 
+    const std::string& get_name() const {
+      return segment_allocator.get_name();
+    }
+
    grouped_io_stats get_record_batch_stats() const {
      return stats.record_batch_stats;
    }
@@ -223,20 +250,45 @@ private:
    }
 
    journal_seq_t get_committed_to() const {
-      return journal_committed_to;
+      return committed_to;
    }
 
    void reset_stats() {
      stats = {};
    }
 
-    void set_write_pipeline(WritePipeline *_write_pipeline) {
-      write_pipeline = _write_pipeline;
-    }
+    // whether is available to submit a record
+    bool is_available() const;
 
-    using submit_ret = Journal::submit_record_ret;
-    submit_ret submit(record_t&&, OrderingHandle&);
-    seastar::future<> flush(OrderingHandle &handle);
+    // wait for available if cannot submit, should check is_available() again
+    // when the future is resolved.
+    using wa_ertr = base_ertr;
+    wa_ertr::future<> wait_available();
+
+    // when available, check for the submit action
+    // according to the pending record size
+    enum class action_t {
+      ROLL,
+      SUBMIT_FULL,
+      SUBMIT_NOT_FULL
+    };
+    action_t check_action(const record_size_t&) const;
+
+    // when available, roll the segment if needed
+    using roll_segment_ertr = base_ertr;
+    roll_segment_ertr::future<> roll_segment();
+
+    // when available, submit the record if possible
+    using submit_ertr = base_ertr;
+    using submit_ret = submit_ertr::future<record_locator_t>;
+    submit_ret submit(record_t&&);
+
+    void update_committed_to(const journal_seq_t& new_committed_to) {
+      assert(new_committed_to != JOURNAL_SEQ_NULL);
+      assert(committed_to == JOURNAL_SEQ_NULL ||
+             committed_to <= new_committed_to);
+      committed_to = new_committed_to;
+    }
 
  private:
    void update_state();
@@ -265,32 +317,26 @@ private:
 
    void flush_current_batch();
 
-    using submit_pending_ertr = SegmentAllocator::write_ertr;
-    using submit_pending_ret = submit_pending_ertr::future<
-      record_locator_t>;
-    submit_pending_ret submit_pending(
-      record_t&&, OrderingHandle &handle, bool flush);
-
-    using do_submit_ret = submit_pending_ret;
-    do_submit_ret do_submit(
-      record_t&&, OrderingHandle&);
-
    state_t state = state_t::IDLE;
    std::size_t num_outstanding_io = 0;
    std::size_t io_depth_limit;
    double preferred_fullness;
 
-    WritePipeline* write_pipeline = nullptr;
-    SegmentAllocator& journal_segment_allocator;
+    SegmentAllocator& segment_allocator;
    // committed_to may be in a previous journal segment
-    journal_seq_t journal_committed_to = JOURNAL_SEQ_NULL;
+    journal_seq_t committed_to = JOURNAL_SEQ_NULL;
 
    std::unique_ptr<RecordBatch[]> batches;
-    std::size_t current_batch_index;
    // should not be nullptr after constructed
    RecordBatch* p_current_batch = nullptr;
    seastar::circular_buffer<RecordBatch*> free_batch_ptrs;
-    std::optional<seastar::promise<>> wait_submit_promise;
+
+    // blocked for rolling or lack of resource
+    std::optional<seastar::shared_promise<>> wait_available_promise;
+    bool has_io_error = false;
+    // when needs flush but io depth is full,
+    // wait for decrement_io_with_flush()
+    std::optional<seastar::promise<>> wait_unfull_flush_promise;
 
    struct {
      grouped_io_stats record_batch_stats;
@@ -306,6 +352,7 @@ private:
  RecordSubmitter record_submitter;
  ExtentReader& scanner;
  seastar::metrics::metric_group metrics;
+  WritePipeline* write_pipeline = nullptr;
 
  /// read journal segment headers from scanner
  using find_journal_segments_ertr = crimson::errorator<
-- 
2.39.5
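
Aside (illustrative, not part of the patch): with the pipeline moved out of
RecordSubmitter, the journal itself now advances committed_to, and
update_committed_to() insists the sequence only moves forward. The toy below
assumes what the ordered write_pipeline is there to guarantee -- that the
finalize stage sees completions in submission order -- and shows the resulting
monotonic update; toy_seq_t is a stand-in for journal_seq_t.

#include <cassert>
#include <cstdint>
#include <iostream>
#include <optional>

using toy_seq_t = std::optional<std::uint64_t>;  // nullopt ~ JOURNAL_SEQ_NULL

struct toy_journal_t {
  toy_seq_t committed_to;  // starts as "null"

  // mirrors the contract of RecordSubmitter::update_committed_to()
  void update_committed_to(std::uint64_t new_committed_to) {
    assert(!committed_to.has_value() || *committed_to <= new_committed_to);
    committed_to = new_committed_to;
  }
};

int main() {
  toy_journal_t j;
  // device writes may complete out of order, but the finalize stage feeds the
  // end sequences back in submission order, so committed_to only advances
  for (std::uint64_t end_seq : {100, 140, 180}) {
    j.update_committed_to(end_seq);
    std::cout << "committed_to=" << *j.committed_to << "\n";
  }
}
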