From bba25dbce6491dcc80887ac70e012616c9934c47 Mon Sep 17 00:00:00 2001 From: myoungwon oh Date: Wed, 5 Apr 2023 04:37:29 +0000 Subject: [PATCH] crimson/os/seastore/journal: introduce JournalAllocator to generalize SegmentAllocator Signed-off-by: Myoungwon Oh --- .../os/seastore/journal/record_submitter.cc | 45 +++++++------- .../os/seastore/journal/record_submitter.h | 41 ++++++++++-- .../os/seastore/journal/segment_allocator.h | 62 +++++++++---------- 3 files changed, 86 insertions(+), 62 deletions(-) diff --git a/src/crimson/os/seastore/journal/record_submitter.cc b/src/crimson/os/seastore/journal/record_submitter.cc index 09cac5b2671..5ca53b436d5 100644 --- a/src/crimson/os/seastore/journal/record_submitter.cc +++ b/src/crimson/os/seastore/journal/record_submitter.cc @@ -128,10 +128,10 @@ RecordSubmitter::RecordSubmitter( std::size_t batch_capacity, std::size_t batch_flush_size, double preferred_fullness, - SegmentAllocator& sa) + JournalAllocator& ja) : io_depth_limit{io_depth}, preferred_fullness{preferred_fullness}, - segment_allocator{sa}, + journal_allocator{ja}, batches(new RecordBatch[io_depth + 1]) { LOG_PREFIX(RecordSubmitter); @@ -158,7 +158,7 @@ bool RecordSubmitter::is_available() const #ifndef NDEBUG if (ret) { // unconditional invariants - ceph_assert(segment_allocator.can_write()); + ceph_assert(journal_allocator.can_write()); ceph_assert(p_current_batch != nullptr); ceph_assert(!p_current_batch->is_submitting()); // the current batch accepts a further write @@ -166,7 +166,7 @@ bool RecordSubmitter::is_available() const if (!p_current_batch->is_empty()) { auto submit_length = p_current_batch->get_submit_size().get_encoded_length(); - ceph_assert(!segment_allocator.needs_roll(submit_length)); + ceph_assert(!journal_allocator.needs_roll(submit_length)); } // I'm not rolling } @@ -199,8 +199,8 @@ RecordSubmitter::check_action( { assert(is_available()); auto eval = p_current_batch->evaluate_submit( - rsize, segment_allocator.get_block_size()); - if 
(segment_allocator.needs_roll(eval.submit_size.get_encoded_length())) { + rsize, journal_allocator.get_block_size()); + if (journal_allocator.needs_roll(eval.submit_size.get_encoded_length())) { return action_t::ROLL; } else if (eval.is_full) { return action_t::SUBMIT_FULL; @@ -242,7 +242,7 @@ RecordSubmitter::roll_segment() return roll_segment_ertr::now(); } else { // start rolling in background - std::ignore = segment_allocator.roll( + std::ignore = journal_allocator.roll( ).safe_then([FNAME, this] { // good DEBUG("{} rolling done, available", get_name()); @@ -276,12 +276,9 @@ RecordSubmitter::submit( LOG_PREFIX(RecordSubmitter::submit); ceph_assert(is_available()); assert(check_action(record.size) != action_t::ROLL); - segment_allocator.get_provider().update_modify_time( - segment_allocator.get_segment_id(), - record.modify_time, - record.extents.size()); + journal_allocator.update_modify_time(record); auto eval = p_current_batch->evaluate_submit( - record.size, segment_allocator.get_block_size()); + record.size, journal_allocator.get_block_size()); bool needs_flush = ( state == state_t::IDLE || eval.submit_size.get_fullness() > preferred_fullness || @@ -296,13 +293,13 @@ RecordSubmitter::submit( increment_io(); auto [to_write, sizes] = p_current_batch->submit_pending_fast( std::move(record), - segment_allocator.get_block_size(), - committed_to, - segment_allocator.get_nonce()); + journal_allocator.get_block_size(), + get_committed_to(), + journal_allocator.get_nonce()); DEBUG("{} fast submit {}, committed_to={}, outstanding_io={} ...", - get_name(), sizes, committed_to, num_outstanding_io); + get_name(), sizes, get_committed_to(), num_outstanding_io); account_submission(1, sizes); - return segment_allocator.write(std::move(to_write) + return journal_allocator.write(std::move(to_write) ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) { return record_locator_t{ write_result.start_seq.offset.add_offset(mdlength), @@ -316,7 +313,7 @@ 
RecordSubmitter::submit( auto write_fut = p_current_batch->add_pending( get_name(), std::move(record), - segment_allocator.get_block_size()); + journal_allocator.get_block_size()); if (needs_flush) { if (state == state_t::FULL) { // #2 block concurrent submissions due to lack of resource @@ -358,7 +355,7 @@ RecordSubmitter::submit( RecordSubmitter::open_ret RecordSubmitter::open(bool is_mkfs) { - return segment_allocator.open(is_mkfs + return journal_allocator.open(is_mkfs ).safe_then([this](journal_seq_t ret) { LOG_PREFIX(RecordSubmitter::open); DEBUG("{} register metrics", get_name()); @@ -420,16 +417,16 @@ RecordSubmitter::open(bool is_mkfs) RecordSubmitter::close_ertr::future<> RecordSubmitter::close() { + committed_to = JOURNAL_SEQ_NULL; ceph_assert(state == state_t::IDLE); ceph_assert(num_outstanding_io == 0); - committed_to = JOURNAL_SEQ_NULL; ceph_assert(p_current_batch != nullptr); ceph_assert(p_current_batch->is_empty()); ceph_assert(!wait_available_promise.has_value()); has_io_error = false; ceph_assert(!wait_unfull_flush_promise.has_value()); metrics.clear(); - return segment_allocator.close(); + return journal_allocator.close(); } void RecordSubmitter::update_state() @@ -511,11 +508,11 @@ void RecordSubmitter::flush_current_batch() increment_io(); auto num = p_batch->get_num_records(); auto [to_write, sizes] = p_batch->encode_batch( - committed_to, segment_allocator.get_nonce()); + get_committed_to(), journal_allocator.get_nonce()); DEBUG("{} {} records, {}, committed_to={}, outstanding_io={} ...", - get_name(), num, sizes, committed_to, num_outstanding_io); + get_name(), num, sizes, get_committed_to(), num_outstanding_io); account_submission(num, sizes); - std::ignore = segment_allocator.write(std::move(to_write) + std::ignore = journal_allocator.write(std::move(to_write) ).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) { TRACE("{} {} records, {}, write done with {}", get_name(), num, sizes, write_result); diff --git 
a/src/crimson/os/seastore/journal/record_submitter.h b/src/crimson/os/seastore/journal/record_submitter.h
index 118d5a7b5d7..eedd2dd8cfd 100644
--- a/src/crimson/os/seastore/journal/record_submitter.h
+++ b/src/crimson/os/seastore/journal/record_submitter.h
@@ -13,7 +13,6 @@
 #include "crimson/common/errorator.h"
 #include "crimson/os/seastore/segment_manager_group.h"
 #include "crimson/os/seastore/segment_seq_allocator.h"
-#include "crimson/os/seastore/journal/segment_allocator.h"
 
 namespace crimson::os::seastore {
   class SegmentProvider;
@@ -22,6 +21,38 @@ namespace crimson::os::seastore {
 
 namespace crimson::os::seastore::journal {
 
+class JournalAllocator {
+public:
+  using base_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error>;
+  virtual const std::string& get_name() const = 0;
+
+  virtual void update_modify_time(record_t& record) = 0;
+
+  virtual extent_len_t get_block_size() const = 0;
+
+  using close_ertr = base_ertr;
+  virtual close_ertr::future<> close() = 0;
+
+  virtual segment_nonce_t get_nonce() const = 0;
+
+  using write_ertr = base_ertr;
+  using write_ret = write_ertr::future<write_result_t>;
+  virtual write_ret write(ceph::bufferlist&& to_write) = 0;
+
+  virtual bool can_write() const = 0;
+
+  using roll_ertr = base_ertr;
+  virtual roll_ertr::future<> roll() = 0;
+
+  virtual bool needs_roll(std::size_t length) const = 0;
+
+  using open_ertr = base_ertr;
+  using open_ret = open_ertr::future<journal_seq_t>;
+  virtual open_ret open(bool is_mkfs) = 0;
+
+};
+
 /**
  * RecordBatch
  *
@@ -111,7 +142,7 @@ public:
   //
   // Set write_result_t::write_length to 0 if the record is not the first one
   // in the batch.
-  using add_pending_ertr = SegmentAllocator::write_ertr;
+  using add_pending_ertr = JournalAllocator::write_ertr;
   using add_pending_ret = add_pending_ertr::future<record_locator_t>;
   add_pending_ret add_pending(
     const std::string& name,
@@ -204,10 +235,10 @@ public:
     std::size_t batch_capacity,
     std::size_t batch_flush_size,
     double preferred_fullness,
-    SegmentAllocator&);
+    JournalAllocator&);
 
   const std::string& get_name() const {
-    return segment_allocator.get_name();
+    return journal_allocator.get_name();
   }
 
   journal_seq_t get_committed_to() const {
@@ -287,7 +318,7 @@ private:
 
   std::size_t io_depth_limit;
   double preferred_fullness;
-  SegmentAllocator& segment_allocator;
+  JournalAllocator& journal_allocator;
 
   // committed_to may be in a previous journal segment
   journal_seq_t committed_to = JOURNAL_SEQ_NULL;
diff --git a/src/crimson/os/seastore/journal/segment_allocator.h b/src/crimson/os/seastore/journal/segment_allocator.h
index 8cab895f8c7..292c23070ba 100644
--- a/src/crimson/os/seastore/journal/segment_allocator.h
+++ b/src/crimson/os/seastore/journal/segment_allocator.h
@@ -13,6 +13,8 @@
 #include "crimson/common/errorator.h"
 #include "crimson/os/seastore/segment_manager_group.h"
 #include "crimson/os/seastore/segment_seq_allocator.h"
+#include "crimson/os/seastore/journal/record_submitter.h"
+#include "crimson/os/seastore/async_cleaner.h"
 
 namespace crimson::os::seastore {
   class SegmentProvider;
@@ -26,27 +28,19 @@ namespace crimson::os::seastore::journal {
  *
  * Maintain an available segment for writes.
 */
-class SegmentAllocator {
-  using base_ertr = crimson::errorator<
-    crimson::ct_error::input_output_error>;
+class SegmentAllocator : public JournalAllocator {
 public:
+  // SegmentAllocator specific methods
   SegmentAllocator(JournalTrimmer *trimmer,
                    data_category_t category,
                    rewrite_gen_t gen,
                    SegmentProvider &sp,
                    SegmentSeqAllocator &ssa);
 
-  const std::string& get_name() const {
-    return print_name;
-  }
-
-  SegmentProvider &get_provider() {
-    return segment_provider;
-  }
-
-  extent_len_t get_block_size() const {
-    return sm_group.get_block_size();
+  segment_id_t get_segment_id() const {
+    assert(can_write());
+    return current_segment->get_segment_id();
   }
 
   extent_len_t get_max_write_length() const {
@@ -55,27 +49,27 @@ class SegmentAllocator {
            sm_group.get_rounded_tail_length();
   }
 
-  bool can_write() const {
-    return !!current_segment;
+ public:
+  // overriding methods
+  const std::string& get_name() const final {
+    return print_name;
   }
 
-  segment_id_t get_segment_id() const {
-    assert(can_write());
-    return current_segment->get_segment_id();
+  extent_len_t get_block_size() const final {
+    return sm_group.get_block_size();
   }
 
-  segment_nonce_t get_nonce() const {
-    assert(can_write());
-    return current_segment_nonce;
+  bool can_write() const final {
+    return !!current_segment;
   }
 
-  segment_off_t get_written_to() const {
+  segment_nonce_t get_nonce() const final {
     assert(can_write());
-    return written_to;
+    return current_segment_nonce;
   }
 
   // returns true iff the current segment has insufficient space
-  bool needs_roll(std::size_t length) const {
+  bool needs_roll(std::size_t length) const final {
     assert(can_write());
     assert(current_segment->get_write_capacity() ==
            sm_group.get_segment_size());
@@ -85,24 +79,26 @@ class SegmentAllocator {
   }
 
   // open for write and generate the correct print name
-  using open_ertr = base_ertr;
-  using open_ret = open_ertr::future<journal_seq_t>;
-  open_ret open(bool is_mkfs);
+  open_ret open(bool is_mkfs) final;
 
   // close the current segment and initialize next one
-  using roll_ertr = base_ertr;
-  roll_ertr::future<> roll();
+  roll_ertr::future<> roll() final;
 
   // write the buffer, return the write result
   //
   // May be called concurrently, but writes may complete in any order.
   // If rolling/opening, no write is allowed.
-  using write_ertr = base_ertr;
-  using write_ret = write_ertr::future<write_result_t>;
-  write_ret write(ceph::bufferlist&& to_write);
+  write_ret write(ceph::bufferlist&& to_write) final;
 
   using close_ertr = base_ertr;
-  close_ertr::future<> close();
+  close_ertr::future<> close() final;
+
+  void update_modify_time(record_t& record) final {
+    segment_provider.update_modify_time(
+      get_segment_id(),
+      record.modify_time,
+      record.extents.size());
+  }
 
 private:
   open_ret do_open(bool is_mkfs);
-- 
2.39.5