From ba454780f19bf98788e2b8382b49f3eace0781be Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Thu, 21 Oct 2021 16:01:46 +0800 Subject: [PATCH] crimson/os/seastore/journal: implement RecordSubmitter and RecordBatch To be able to batch records for write, meanwhile, still allows concurrent writes. The current change doesn't seem to impact write performance either to prefer batching or concurrent writes, so set the io_depth to 2 for demonstration. Signed-off-by: Yingxin Cheng --- src/common/options/crimson.yaml.in | 15 ++ src/crimson/os/seastore/journal.cc | 355 ++++++++++++++++++++++++++--- src/crimson/os/seastore/journal.h | 240 +++++++++++++++---- 3 files changed, 540 insertions(+), 70 deletions(-) diff --git a/src/common/options/crimson.yaml.in b/src/common/options/crimson.yaml.in index 516f8726bc5a2..bf24679cdfca2 100644 --- a/src/common/options/crimson.yaml.in +++ b/src/common/options/crimson.yaml.in @@ -45,3 +45,18 @@ options: level: dev desc: Initial number of segments for rewriting extents per device default: 6 +- name: seastore_journal_batch_capacity + type: uint + level: dev + desc: The number limit of records in a journal batch + default: 16 +- name: seastore_journal_batch_flush_size + type: size + level: dev + desc: The size threshold to force flush a journal batch + default: 16_M +- name: seastore_journal_iodepth_limit + type: uint + level: dev + desc: The io depth limit to submit journal records + default: 2 diff --git a/src/crimson/os/seastore/journal.cc b/src/crimson/os/seastore/journal.cc index f4b7d4ef03276..8a976b847d829 100644 --- a/src/crimson/os/seastore/journal.cc +++ b/src/crimson/os/seastore/journal.cc @@ -5,6 +5,7 @@ #include +#include "crimson/common/config_proxy.h" #include "crimson/os/seastore/journal.h" #include "include/intarith.h" @@ -53,38 +54,16 @@ Journal::Journal( SegmentManager& segment_manager, ExtentReader& scanner) : journal_segment_manager(segment_manager), + record_submitter(crimson::common::get_conf( + "seastore_journal_iodepth_limit"), + crimson::common::get_conf( + "seastore_journal_batch_capacity"), + crimson::common::get_conf( + "seastore_journal_batch_flush_size"), + journal_segment_manager), scanner(scanner) {} -Journal::write_record_ret Journal::write_record( - record_size_t rsize, - record_t &&record, - OrderingHandle &handle) -{ - ceph::bufferlist to_write = encode_record( - rsize, - std::move(record), - journal_segment_manager.get_block_size(), - journal_segment_manager.get_committed_to(), - journal_segment_manager.get_nonce()); - // Start write under the current exclusive stage, but wait for it - // in the device_submission concurrent stage to permit multiple - // overlapping writes. - auto write_fut = journal_segment_manager.write(to_write); - return handle.enter(write_pipeline->device_submission - ).then([write_fut = std::move(write_fut)]() mutable { - return std::move(write_fut); - }).safe_then([this, &handle, rsize](journal_seq_t write_start) { - return handle.enter(write_pipeline->finalize - ).then([this, write_start, rsize] { - auto committed_to = write_start; - committed_to.offset.offset += (rsize.mdlength + rsize.dlength); - journal_segment_manager.mark_committed(committed_to); - return write_start.offset; - }); - }); -} - Journal::prep_replay_segments_fut Journal::prep_replay_segments( std::vector> segments) @@ -413,4 +392,322 @@ Journal::JournalSegmentManager::initialize_segment(Segment& segment) }); } +Journal::RecordBatch::add_pending_ret +Journal::RecordBatch::add_pending( + record_t&& record, + const record_size_t& rsize) +{ + logger().debug( + "Journal::RecordBatch::add_pending: batches={}, write_size={}", + records.size() + 1, + get_encoded_length(rsize)); + assert(state != state_t::SUBMITTING); + assert(can_batch(rsize)); + + auto record_start_offset = encoded_length; + records.push_back(std::move(record)); + record_sizes.push_back(rsize); + auto new_encoded_length = get_encoded_length(rsize); + assert(new_encoded_length < MAX_SEG_OFF); + encoded_length = new_encoded_length; + if (state == state_t::EMPTY) { + assert(!io_promise.has_value()); + io_promise = seastar::shared_promise >(); + } else { + assert(io_promise.has_value()); + } + state = state_t::PENDING; + + return io_promise->get_shared_future( + ).then([record_start_offset + ](auto batch_write_start) -> add_pending_ret { + if (!batch_write_start.has_value()) { + return crimson::ct_error::input_output_error::make(); + } + auto record_write_start = batch_write_start.value(); + record_write_start.offset.offset += record_start_offset; + return add_pending_ret( + add_pending_ertr::ready_future_marker{}, + record_write_start); + }); +} + +ceph::bufferlist Journal::RecordBatch::encode_records( + size_t block_size, + segment_off_t committed_to, + segment_nonce_t segment_nonce) +{ + logger().debug( + "Journal::RecordBatch::encode_records: batches={}, committed_to={}", + records.size(), + committed_to); + assert(state == state_t::PENDING); + assert(records.size()); + assert(records.size() == record_sizes.size()); + assert(io_promise.has_value()); + + state = state_t::SUBMITTING; + ceph::bufferlist bl; + std::size_t i = 0; + do { + auto record_bl = encode_record( + record_sizes[i], + std::move(records[i]), + block_size, + committed_to, + segment_nonce); + bl.claim_append(record_bl); + } while ((++i) < records.size()); + assert(bl.length() == (std::size_t)encoded_length); + return bl; +} + +void Journal::RecordBatch::set_result( + std::optional batch_write_start) +{ + if (batch_write_start.has_value()) { + logger().debug( + "Journal::RecordBatch::set_result: batches={}, write_start {} => {}", + records.size(), + *batch_write_start, + batch_write_start->offset.offset + encoded_length); + } else { + logger().error( + "Journal::RecordBatch::set_result: batches={}, write is failed!", + records.size()); + } + assert(state == state_t::SUBMITTING); + assert(io_promise.has_value()); + + state = state_t::EMPTY; + encoded_length = 0; + records.clear(); + record_sizes.clear(); + io_promise->set_value(batch_write_start); + io_promise.reset(); +} + +ceph::bufferlist Journal::RecordBatch::submit_pending_fast( + record_t&& record, + const record_size_t& rsize, + size_t block_size, + segment_off_t committed_to, + segment_nonce_t segment_nonce) +{ + logger().debug( + "Journal::RecordBatch::submit_pending_fast: write_size={}", + get_encoded_length(rsize)); + assert(state == state_t::EMPTY); + assert(can_batch(rsize)); + + auto bl = encode_record( + rsize, + std::move(record), + block_size, + committed_to, + segment_nonce); + assert(bl.length() == get_encoded_length(rsize)); + return bl; +} + +Journal::RecordSubmitter::RecordSubmitter( + std::size_t io_depth, + std::size_t batch_capacity, + std::size_t batch_flush_size, + JournalSegmentManager& jsm) + : io_depth_limit{io_depth}, + journal_segment_manager{jsm}, + batches(new RecordBatch[io_depth + 1]) +{ + logger().info("Journal::RecordSubmitter: io_depth_limit={}, " + "batch_capacity={}, batch_flush_size={}", + io_depth, batch_capacity, batch_flush_size); + assert(io_depth > 0); + free_batch_ptrs.reserve(io_depth + 1); + for (std::size_t i = 0; i <= io_depth; ++i) { + batches[i].initialize(i, batch_capacity, batch_flush_size); + free_batch_ptrs.push_back(&batches[i]); + } + pop_free_batch(); +} + +Journal::RecordSubmitter::submit_ret +Journal::RecordSubmitter::submit( + record_t&& record, + OrderingHandle& handle) +{ + assert(write_pipeline); + auto rsize = get_encoded_record_length( + record, journal_segment_manager.get_block_size()); + auto total = rsize.mdlength + rsize.dlength; + auto max_record_length = journal_segment_manager.get_max_write_length(); + if (total > max_record_length) { + logger().error( + "Journal::RecordSubmitter::submit: record size {} exceeds max {}", + total, + max_record_length + ); + return crimson::ct_error::erange::make(); + } + + return do_submit(std::move(record), rsize, handle); +} + +void Journal::RecordSubmitter::update_state() +{ + if (num_outstanding_io == 0) { + state = state_t::IDLE; + } else if (num_outstanding_io < io_depth_limit) { + state = state_t::PENDING; + } else if (num_outstanding_io == io_depth_limit) { + state = state_t::FULL; + } else { + ceph_abort("fatal error: io-depth overflow"); + } +} + +void Journal::RecordSubmitter::finish_submit_batch( + RecordBatch* p_batch, + std::optional result) +{ + assert(p_batch->is_submitting()); + p_batch->set_result(result); + free_batch_ptrs.push_back(p_batch); + decrement_io_with_flush(); +} + +void Journal::RecordSubmitter::flush_current_batch() +{ + RecordBatch* p_batch = p_current_batch; + assert(p_batch->is_pending()); + p_current_batch = nullptr; + pop_free_batch(); + + increment_io(); + ceph::bufferlist to_write = p_batch->encode_records( + journal_segment_manager.get_block_size(), + journal_segment_manager.get_committed_to(), + journal_segment_manager.get_nonce()); + std::ignore = journal_segment_manager.write(to_write + ).safe_then([this, p_batch](journal_seq_t write_start) { + finish_submit_batch(p_batch, write_start); + }).handle_error( + crimson::ct_error::all_same_way([this, p_batch](auto e) { + logger().error( + "Journal::RecordSubmitter::flush_current_batch: got error {}", + e); + finish_submit_batch(p_batch, std::nullopt); + }) + ).handle_exception([this, p_batch](auto e) { + logger().error( + "Journal::RecordSubmitter::flush_current_batch: got exception {}", + e); + finish_submit_batch(p_batch, std::nullopt); + }); +} + +seastar::future> +Journal::RecordSubmitter::mark_record_committed_in_order( + OrderingHandle& handle, + const journal_seq_t& write_start_seq, + const record_size_t& rsize) +{ + return handle.enter(write_pipeline->finalize + ).then([this, write_start_seq, rsize] { + auto committed_to = write_start_seq; + committed_to.offset.offset += (rsize.mdlength + rsize.dlength); + journal_segment_manager.mark_committed(committed_to); + return std::make_pair( + write_start_seq.offset.add_offset(rsize.mdlength), + write_start_seq); + }); +} + +Journal::RecordSubmitter::submit_pending_ret +Journal::RecordSubmitter::submit_pending( + record_t&& record, + const record_size_t& rsize, + OrderingHandle& handle, + bool flush) +{ + assert(!p_current_batch->is_submitting()); + auto write_fut = [this, flush, record=std::move(record), &rsize]() mutable { + if (flush && p_current_batch->is_empty()) { + // fast path with direct write + increment_io(); + ceph::bufferlist to_write = p_current_batch->submit_pending_fast( + std::move(record), + rsize, + journal_segment_manager.get_block_size(), + journal_segment_manager.get_committed_to(), + journal_segment_manager.get_nonce()); + return journal_segment_manager.write(to_write + ).finally([this] { + decrement_io_with_flush(); + }); + } else { + // indirect write with or without the existing pending records + auto write_fut = p_current_batch->add_pending( + std::move(record), rsize); + if (flush) { + flush_current_batch(); + } + return write_fut; + } + }(); + return handle.enter(write_pipeline->device_submission + ).then([write_fut=std::move(write_fut)]() mutable { + return std::move(write_fut); + }).safe_then([this, &handle, rsize](journal_seq_t write_start) { + return mark_record_committed_in_order(handle, write_start, rsize); + }); +} + +Journal::RecordSubmitter::do_submit_ret +Journal::RecordSubmitter::do_submit( + record_t&& record, + const record_size_t& rsize, + OrderingHandle& handle) +{ + assert(!p_current_batch->is_submitting()); + if (state <= state_t::PENDING) { + // can increment io depth + assert(!wait_submit_promise.has_value()); + auto batched_size = p_current_batch->can_batch(rsize); + if (batched_size == 0 || + batched_size > journal_segment_manager.get_max_write_length()) { + assert(p_current_batch->is_pending()); + flush_current_batch(); + return do_submit(std::move(record), rsize, handle); + } else if (journal_segment_manager.needs_roll(batched_size)) { + if (p_current_batch->is_pending()) { + flush_current_batch(); + } + return journal_segment_manager.roll( + ).safe_then([this, record=std::move(record), rsize, &handle]() mutable { + return do_submit(std::move(record), rsize, handle); + }); + } else { + return submit_pending(std::move(record), rsize, handle, true); + } + } + + assert(state == state_t::FULL); + // cannot increment io depth + auto batched_size = p_current_batch->can_batch(rsize); + if (batched_size == 0 || + batched_size > journal_segment_manager.get_max_write_length() || + journal_segment_manager.needs_roll(batched_size)) { + if (!wait_submit_promise.has_value()) { + wait_submit_promise = seastar::promise<>(); + } + return wait_submit_promise->get_future( + ).then([this, record=std::move(record), rsize, &handle]() mutable { + return do_submit(std::move(record), rsize, handle); + }); + } else { + return submit_pending(std::move(record), rsize, handle, false); + } +} + } diff --git a/src/crimson/os/seastore/journal.h b/src/crimson/os/seastore/journal.h index 064020dcc1c5b..524b1977bc342 100644 --- a/src/crimson/os/seastore/journal.h +++ b/src/crimson/os/seastore/journal.h @@ -6,7 +6,9 @@ #include #include +#include #include +#include #include "include/ceph_assert.h" #include "include/buffer.h" @@ -92,35 +94,7 @@ public: record_t &&record, OrderingHandle &handle ) { - assert(write_pipeline); - auto rsize = get_encoded_record_length( - record, journal_segment_manager.get_block_size()); - auto total = rsize.mdlength + rsize.dlength; - auto max_record_length = journal_segment_manager.get_max_write_length(); - if (total > max_record_length) { - auto &logger = crimson::get_logger(ceph_subsys_seastore); - logger.error( - "Journal::submit_record: record size {} exceeds max {}", - total, - max_record_length - ); - return crimson::ct_error::erange::make(); - } - auto roll = journal_segment_manager.needs_roll(total) - ? journal_segment_manager.roll() - : JournalSegmentManager::roll_ertr::now(); - return roll.safe_then( - [this, rsize, record=std::move(record), &handle]() mutable { - auto seq = journal_segment_manager.get_segment_seq(); - return write_record( - rsize, std::move(record), - handle - ).safe_then([rsize, seq](auto addr) { - return std::make_pair( - addr.add_offset(rsize.mdlength), - journal_seq_t{seq, addr}); - }); - }); + return record_submitter.submit(std::move(record), handle); } /** @@ -139,8 +113,8 @@ public: std::vector>&& segment_headers, delta_handler_t &&delta_handler); - void set_write_pipeline(WritePipeline *_write_pipeline) { - write_pipeline = _write_pipeline; + void set_write_pipeline(WritePipeline* write_pipeline) { + record_submitter.set_write_pipeline(write_pipeline); } private: @@ -245,19 +219,203 @@ private: journal_seq_t committed_to; }; + class RecordBatch { + enum class state_t { + EMPTY = 0, + PENDING, + SUBMITTING + }; + + public: + RecordBatch() = default; + RecordBatch(RecordBatch&&) = delete; + RecordBatch(const RecordBatch&) = delete; + RecordBatch& operator=(RecordBatch&&) = delete; + RecordBatch& operator=(const RecordBatch&) = delete; + + bool is_empty() const { + return state == state_t::EMPTY; + } + + bool is_pending() const { + return state == state_t::PENDING; + } + + bool is_submitting() const { + return state == state_t::SUBMITTING; + } + + std::size_t get_index() const { + return index; + } + + std::size_t get_num_records() const { + return records.size(); + } + + // return the expected write size if allows to batch, + // otherwise, return 0 + std::size_t can_batch(const record_size_t& rsize) const { + assert(state != state_t::SUBMITTING); + if (records.size() >= batch_capacity || + static_cast(encoded_length) > batch_flush_size) { + assert(state == state_t::PENDING); + return 0; + } + return get_encoded_length(rsize); + } + + void initialize(std::size_t i, + std::size_t _batch_capacity, + std::size_t _batch_flush_size) { + ceph_assert(_batch_capacity > 0); + index = i; + batch_capacity = _batch_capacity; + batch_flush_size = _batch_flush_size; + records.reserve(batch_capacity); + record_sizes.reserve(batch_capacity); + } + + // Add to the batch, the future will be resolved after the batch is + // written. + using add_pending_ertr = JournalSegmentManager::write_ertr; + using add_pending_ret = JournalSegmentManager::write_ret; + add_pending_ret add_pending(record_t&&, const record_size_t&); + + // Encode the batched records for write. + ceph::bufferlist encode_records( + size_t block_size, + segment_off_t committed_to, + segment_nonce_t segment_nonce); + + // Set the write result and reset for reuse + void set_result(std::optional batch_write_start); + + // The fast path that is equivalent to submit a single record as a batch. + // + // Essentially, equivalent to the combined logic of: + // add_pending(), encode_records() and set_result() above without + // the intervention of the shared io_promise. + // + // Note the current RecordBatch can be reused afterwards. + ceph::bufferlist submit_pending_fast( + record_t&&, + const record_size_t&, + size_t block_size, + segment_off_t committed_to, + segment_nonce_t segment_nonce); + + private: + std::size_t get_encoded_length(const record_size_t& rsize) const { + auto ret = encoded_length + rsize.mdlength + rsize.dlength; + assert(ret > 0); + return ret; + } + + state_t state = state_t::EMPTY; + std::size_t index = 0; + std::size_t batch_capacity = 0; + std::size_t batch_flush_size = 0; + segment_off_t encoded_length = 0; + std::vector records; + std::vector record_sizes; + std::optional > > io_promise; + }; + + class RecordSubmitter { + enum class state_t { + IDLE = 0, // outstanding_io == 0 + PENDING, // outstanding_io < io_depth_limit + FULL // outstanding_io == io_depth_limit + // OVERFLOW: outstanding_io > io_depth_limit is impossible + }; + + public: + RecordSubmitter(std::size_t io_depth, + std::size_t batch_capacity, + std::size_t batch_flush_size, + JournalSegmentManager&); + + void set_write_pipeline(WritePipeline *_write_pipeline) { + write_pipeline = _write_pipeline; + } + + using submit_ret = Journal::submit_record_ret; + submit_ret submit(record_t&&, OrderingHandle&); + + private: + void update_state(); + + void increment_io() { + ++num_outstanding_io; + update_state(); + } + + void decrement_io_with_flush() { + assert(num_outstanding_io > 0); + --num_outstanding_io; +#ifndef NDEBUG + auto prv_state = state; +#endif + update_state(); + + if (wait_submit_promise.has_value()) { + assert(prv_state == state_t::FULL); + wait_submit_promise->set_value(); + wait_submit_promise.reset(); + } + + if (!p_current_batch->is_empty()) { + flush_current_batch(); + } + } + + void pop_free_batch() { + assert(p_current_batch == nullptr); + assert(!free_batch_ptrs.empty()); + p_current_batch = free_batch_ptrs.front(); + assert(p_current_batch->is_empty()); + assert(p_current_batch == &batches[p_current_batch->get_index()]); + free_batch_ptrs.pop_front(); + } + + void finish_submit_batch(RecordBatch*, std::optional); + + void flush_current_batch(); + + seastar::future> + mark_record_committed_in_order( + OrderingHandle&, const journal_seq_t&, const record_size_t&); + + using submit_pending_ertr = JournalSegmentManager::write_ertr; + using submit_pending_ret = submit_pending_ertr::future< + std::pair >; + submit_pending_ret submit_pending( + record_t&&, const record_size_t&, OrderingHandle &handle, bool flush); + + using do_submit_ret = submit_pending_ret; + do_submit_ret do_submit( + record_t&&, const record_size_t&, OrderingHandle&); + + state_t state = state_t::IDLE; + std::size_t num_outstanding_io = 0; + std::size_t io_depth_limit; + + WritePipeline* write_pipeline = nullptr; + JournalSegmentManager& journal_segment_manager; + std::unique_ptr batches; + std::size_t current_batch_index; + // should not be nullptr after constructed + RecordBatch* p_current_batch = nullptr; + seastar::circular_buffer free_batch_ptrs; + std::optional > wait_submit_promise; + }; + SegmentProvider* segment_provider = nullptr; JournalSegmentManager journal_segment_manager; + RecordSubmitter record_submitter; ExtentReader& scanner; - WritePipeline *write_pipeline = nullptr; - - /// do record write - using write_record_ertr = crimson::errorator< - crimson::ct_error::input_output_error>; - using write_record_ret = write_record_ertr::future; - write_record_ret write_record( - record_size_t rsize, - record_t &&record, - OrderingHandle &handle); /// return ordered vector of segments to replay using replay_segments_t = std::vector< -- 2.39.5