From 73a7ecfe2653d5539cc1395af7f2d7a2bc87ac5c Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Tue, 16 Jul 2024 14:36:22 +0800 Subject: [PATCH] crimson/os/seastore/record_submitter: refactor to make write base available Make sure the write base is available upon RecordSubmitter::submit(). Signed-off-by: Yingxin Cheng --- .../os/seastore/journal/record_submitter.cc | 35 ++++++++++++++----- .../os/seastore/journal/record_submitter.h | 17 +++++++-- 2 files changed, 42 insertions(+), 10 deletions(-) diff --git a/src/crimson/os/seastore/journal/record_submitter.cc b/src/crimson/os/seastore/journal/record_submitter.cc index 8d71cb994a321..e5fa90b4b22b0 100644 --- a/src/crimson/os/seastore/journal/record_submitter.cc +++ b/src/crimson/os/seastore/journal/record_submitter.cc @@ -17,7 +17,8 @@ RecordBatch::add_pending_ret RecordBatch::add_pending( const std::string& name, record_t&& record, - extent_len_t block_size) + extent_len_t block_size, + std::optional maybe_write_base) { LOG_PREFIX(RecordBatch::add_pending); auto new_size = get_encoded_length_after(record, block_size); @@ -36,10 +37,14 @@ RecordBatch::add_pending( if (state == state_t::EMPTY) { assert(!io_promise.has_value()); io_promise = seastar::shared_promise(); + assert(maybe_write_base.has_value()); + assert(!write_base.has_value()); + write_base = maybe_write_base; } else { assert(io_promise.has_value()); } state = state_t::PENDING; + assert(write_base.has_value()); return io_promise->get_shared_future( ).then([dlength_offset, FNAME, &name @@ -61,22 +66,25 @@ RecordBatch::add_pending( }); } -ceph::bufferlist RecordBatch::encode_batch( +RecordBatch::encode_ret_t RecordBatch::encode_batch( const journal_seq_t& committed_to, segment_nonce_t segment_nonce) { assert(state == state_t::PENDING); assert(pending.get_size() > 0); assert(io_promise.has_value()); + assert(write_base.has_value()); state = state_t::SUBMITTING; + auto _write_base = *write_base; + write_base.reset(); submitting_size = pending.get_size(); submitting_length = pending.size.get_encoded_length(); submitting_mdlength = pending.size.get_mdlength(); auto bl = encode_records(pending, committed_to, segment_nonce); // Note: pending is cleared here assert(bl.length() == submitting_length); - return bl; + return {_write_base, std::move(bl)}; } void RecordBatch::set_result( @@ -92,6 +100,7 @@ void RecordBatch::set_result( } assert(state == state_t::SUBMITTING); assert(io_promise.has_value()); + assert(!write_base.has_value()); state = state_t::EMPTY; submitting_size = 0; @@ -325,10 +334,19 @@ RecordSubmitter::submit( }); } // indirect batched write + std::optional maybe_write_base; + if (p_current_batch->is_empty()) { + maybe_write_base = journal_allocator.get_written_to(); + } else { + assert(p_current_batch->get_write_base().has_value()); + assert(*p_current_batch->get_write_base() == + journal_allocator.get_written_to()); + } auto write_fut = p_current_batch->add_pending( get_name(), std::move(record), - journal_allocator.get_block_size()); + journal_allocator.get_block_size(), + maybe_write_base); if (needs_flush) { if (state == state_t::FULL) { // #2 block concurrent submissions due to lack of resource @@ -531,15 +549,16 @@ void RecordSubmitter::flush_current_batch() account_submission(rg); assert(stats.record_batch_stats.num_io == stats.io_depth_stats.num_io); - auto to_write = p_batch->encode_batch( + auto encode_ret = p_batch->encode_batch( get_committed_to(), journal_allocator.get_nonce()); // Note: rg is cleared DEBUG("{} {} records, {}, committed_to={}, outstanding_io={} ...", get_name(), num, sizes, get_committed_to(), num_outstanding_io); + assert(encode_ret.write_base == journal_allocator.get_written_to()); write_result_t result{ - journal_allocator.get_written_to(), - to_write.length()}; - std::ignore = journal_allocator.write(std::move(to_write) + encode_ret.write_base, + encode_ret.bl.length()}; + std::ignore = journal_allocator.write(std::move(encode_ret.bl) ).safe_then([this, p_batch, FNAME, num, sizes, result] { TRACE("{} {} records, {}, write done with {}", get_name(), num, sizes, result); diff --git a/src/crimson/os/seastore/journal/record_submitter.h b/src/crimson/os/seastore/journal/record_submitter.h index 62047f6bdcd76..4b38b29207387 100644 --- a/src/crimson/os/seastore/journal/record_submitter.h +++ b/src/crimson/os/seastore/journal/record_submitter.h @@ -102,6 +102,10 @@ public: return pending.size; } + std::optional get_write_base() const { + return write_base; + } + bool needs_flush() const { assert(state != state_t::SUBMITTING); assert(pending.get_size() <= batch_capacity); @@ -147,15 +151,22 @@ public: // // Set write_result_t::write_length to 0 if the record is not the first one // in the batch. + // + // write_base must be assigned when the state is empty using add_pending_ertr = JournalAllocator::write_ertr; using add_pending_ret = add_pending_ertr::future; add_pending_ret add_pending( const std::string& name, record_t&&, - extent_len_t block_size); + extent_len_t block_size, + std::optional maybe_write_base); // Encode the batched records for write. - ceph::bufferlist encode_batch( + struct encode_ret_t { + journal_seq_t write_base; + ceph::bufferlist bl; + }; + encode_ret_t encode_batch( const journal_seq_t& committed_to, segment_nonce_t segment_nonce); @@ -188,6 +199,8 @@ private: std::size_t index = 0; std::size_t batch_capacity = 0; std::size_t batch_flush_size = 0; + // Valid at state_t::PENDING + std::optional write_base; record_group_t pending; std::size_t submitting_size = 0; -- 2.39.5