From: Yingxin Cheng Date: Fri, 20 Dec 2024 08:23:26 +0000 (+0800) Subject: crimson/os/seastore/journal: hide RBM specific finish_commit() X-Git-Tag: v20.0.0~477^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=4a08c37da5603d4c65033bf33603d5f402f1de81;p=ceph.git crimson/os/seastore/journal: hide RBM specific finish_commit() By introducing a callback upon completing submission. Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/os/seastore/journal.h b/src/crimson/os/seastore/journal.h index a5c9029c43cb..298935bd22e2 100644 --- a/src/crimson/os/seastore/journal.h +++ b/src/crimson/os/seastore/journal.h @@ -59,13 +59,13 @@ public: crimson::ct_error::erange, crimson::ct_error::input_output_error >; - using submit_record_ret = submit_record_ertr::future< - record_locator_t - >; - virtual submit_record_ret submit_record( + using on_submission_func_t = std::function< + void(record_locator_t)>; + virtual submit_record_ertr::future<> submit_record( record_t &&record, - OrderingHandle &handle - ) = 0; + OrderingHandle &handle, + transaction_type_t t_src, + on_submission_func_t &&on_submission) = 0; /** * flush @@ -101,9 +101,6 @@ public: virtual replay_ret replay( delta_handler_t &&delta_handler) = 0; - virtual seastar::future<> finish_commit( - transaction_type_t type) = 0; - virtual ~Journal() {} virtual backend_type_t get_type() = 0; diff --git a/src/crimson/os/seastore/journal/circular_bounded_journal.cc b/src/crimson/os/seastore/journal/circular_bounded_journal.cc index 9ee8b1b997f0..41ff8318aba0 100644 --- a/src/crimson/os/seastore/journal/circular_bounded_journal.cc +++ b/src/crimson/os/seastore/journal/circular_bounded_journal.cc @@ -58,35 +58,52 @@ CircularBoundedJournal::close_ertr::future<> CircularBoundedJournal::close() return record_submitter.close(); } -CircularBoundedJournal::submit_record_ret +CircularBoundedJournal::submit_record_ertr::future<> CircularBoundedJournal::submit_record( record_t &&record, - OrderingHandle &handle) + OrderingHandle &handle, + transaction_type_t t_src, + on_submission_func_t &&on_submission) { LOG_PREFIX(CircularBoundedJournal::submit_record); DEBUG("H{} {} start ...", (void*)&handle, record); assert(write_pipeline); - return do_submit_record(std::move(record), handle); + return do_submit_record( + std::move(record), handle, std::move(on_submission) + ).safe_then([this, t_src] { + if (is_trim_transaction(t_src)) { + return update_journal_tail( + trimmer.get_dirty_tail(), + trimmer.get_alloc_tail()); + } else { + return seastar::now(); + } + }); } -CircularBoundedJournal::submit_record_ret +CircularBoundedJournal::submit_record_ertr::future<> CircularBoundedJournal::do_submit_record( record_t &&record, - OrderingHandle &handle) + OrderingHandle &handle, + on_submission_func_t &&on_submission) { LOG_PREFIX(CircularBoundedJournal::do_submit_record); if (!record_submitter.is_available()) { DEBUG("H{} wait ...", (void*)&handle); return record_submitter.wait_available( - ).safe_then([this, record=std::move(record), &handle]() mutable { - return do_submit_record(std::move(record), handle); + ).safe_then([this, record=std::move(record), &handle, + on_submission=std::move(on_submission)]() mutable { + return do_submit_record( + std::move(record), handle, std::move(on_submission)); }); } auto action = record_submitter.check_action(record.size); if (action == RecordSubmitter::action_t::ROLL) { return record_submitter.roll_segment( - ).safe_then([this, record=std::move(record), &handle]() mutable { - return do_submit_record(std::move(record), handle); + ).safe_then([this, record=std::move(record), &handle, + on_submission=std::move(on_submission)]() mutable { + return do_submit_record( + std::move(record), handle, std::move(on_submission)); }); } @@ -99,13 +116,16 @@ CircularBoundedJournal::do_submit_record( return handle.enter(write_pipeline->device_submission ).then([submit_fut=std::move(submit_ret.future)]() mutable { return std::move(submit_fut); - }).safe_then([FNAME, this, &handle](record_locator_t result) { + }).safe_then([FNAME, this, &handle, on_submission=std::move(on_submission) + ](record_locator_t result) mutable { return handle.enter(write_pipeline->finalize - ).then([FNAME, this, result, &handle] { + ).then([FNAME, this, result, &handle, + on_submission=std::move(on_submission)] { DEBUG("H{} finish with {}", (void*)&handle, result); auto new_committed_to = result.write_result.get_end_seq(); record_submitter.update_committed_to(new_committed_to); - return result; + std::invoke(on_submission, result); + return seastar::now(); }); }); } @@ -392,13 +412,4 @@ Journal::replay_ret CircularBoundedJournal::replay( }); } -seastar::future<> CircularBoundedJournal::finish_commit(transaction_type_t type) { - if (is_trim_transaction(type)) { - return update_journal_tail( - trimmer.get_dirty_tail(), - trimmer.get_alloc_tail()); - } - return seastar::now(); -} - } diff --git a/src/crimson/os/seastore/journal/circular_bounded_journal.h b/src/crimson/os/seastore/journal/circular_bounded_journal.h index 874bd8dc086d..16278df6cfe1 100644 --- a/src/crimson/os/seastore/journal/circular_bounded_journal.h +++ b/src/crimson/os/seastore/journal/circular_bounded_journal.h @@ -80,9 +80,11 @@ public: return backend_type_t::RANDOM_BLOCK; } - submit_record_ret submit_record( + submit_record_ertr::future<> submit_record( record_t &&record, - OrderingHandle &handle + OrderingHandle &handle, + transaction_type_t t_src, + on_submission_func_t &&on_submission ) final; seastar::future<> flush( @@ -148,8 +150,6 @@ public: return cjs.get_records_start(); } - seastar::future<> finish_commit(transaction_type_t type) final; - using cbj_delta_handler_t = std::function< replay_ertr::future( const record_locator_t&, @@ -160,7 +160,10 @@ public: cbj_delta_handler_t &&delta_handler, journal_seq_t tail); - submit_record_ret do_submit_record(record_t &&record, OrderingHandle &handle); + submit_record_ertr::future<> do_submit_record( + record_t &&record, + OrderingHandle &handle, + on_submission_func_t &&on_submission); void try_read_rolled_header(scan_valid_records_cursor &cursor) { paddr_t addr = convert_abs_addr_to_paddr( diff --git a/src/crimson/os/seastore/journal/segmented_journal.cc b/src/crimson/os/seastore/journal/segmented_journal.cc index 6be2ad4936a0..67c0b3fb8ac0 100644 --- a/src/crimson/os/seastore/journal/segmented_journal.cc +++ b/src/crimson/os/seastore/journal/segmented_journal.cc @@ -368,25 +368,30 @@ seastar::future<> SegmentedJournal::flush(OrderingHandle &handle) }); } -SegmentedJournal::submit_record_ret +SegmentedJournal::submit_record_ertr::future<> SegmentedJournal::do_submit_record( record_t &&record, - OrderingHandle &handle) + OrderingHandle &handle, + on_submission_func_t &&on_submission) { LOG_PREFIX(SegmentedJournal::do_submit_record); if (!record_submitter.is_available()) { DEBUG("H{} wait ...", (void*)&handle); return record_submitter.wait_available( - ).safe_then([this, record=std::move(record), &handle]() mutable { - return do_submit_record(std::move(record), handle); + ).safe_then([this, record=std::move(record), &handle, + on_submission=std::move(on_submission)]() mutable { + return do_submit_record( + std::move(record), handle, std::move(on_submission)); }); } auto action = record_submitter.check_action(record.size); if (action == RecordSubmitter::action_t::ROLL) { DEBUG("H{} roll, unavailable ...", (void*)&handle); return record_submitter.roll_segment( - ).safe_then([this, record=std::move(record), &handle]() mutable { - return do_submit_record(std::move(record), handle); + ).safe_then([this, record=std::move(record), &handle, + on_submission=std::move(on_submission)]() mutable { + return do_submit_record( + std::move(record), handle, std::move(on_submission)); }); } else { // SUBMIT_FULL/NOT_FULL DEBUG("H{} submit {} ...", @@ -398,22 +403,27 @@ SegmentedJournal::do_submit_record( return handle.enter(write_pipeline->device_submission ).then([submit_fut=std::move(submit_ret.future)]() mutable { return std::move(submit_fut); - }).safe_then([FNAME, this, &handle](record_locator_t result) { + }).safe_then([FNAME, this, &handle, on_submission=std::move(on_submission) + ](record_locator_t result) mutable { return handle.enter(write_pipeline->finalize - ).then([FNAME, this, result, &handle] { + ).then([FNAME, this, result, &handle, + on_submission=std::move(on_submission)] { DEBUG("H{} finish with {}", (void*)&handle, result); auto new_committed_to = result.write_result.get_end_seq(); record_submitter.update_committed_to(new_committed_to); - return result; + std::invoke(on_submission, result); + return seastar::now(); }); }); } } -SegmentedJournal::submit_record_ret +SegmentedJournal::submit_record_ertr::future<> SegmentedJournal::submit_record( record_t &&record, - OrderingHandle &handle) + OrderingHandle &handle, + transaction_type_t t_src, + on_submission_func_t &&on_submission) { LOG_PREFIX(SegmentedJournal::submit_record); DEBUG("H{} {} start ...", (void*)&handle, record); @@ -429,7 +439,8 @@ SegmentedJournal::submit_record( return crimson::ct_error::erange::make(); } - return do_submit_record(std::move(record), handle); + return do_submit_record( + std::move(record), handle, std::move(on_submission)); } } diff --git a/src/crimson/os/seastore/journal/segmented_journal.h b/src/crimson/os/seastore/journal/segmented_journal.h index 891de7ec3069..3f51de70fb35 100644 --- a/src/crimson/os/seastore/journal/segmented_journal.h +++ b/src/crimson/os/seastore/journal/segmented_journal.h @@ -44,9 +44,11 @@ public: close_ertr::future<> close() final; - submit_record_ret submit_record( + submit_record_ertr::future<> submit_record( record_t &&record, - OrderingHandle &handle) final; + OrderingHandle &handle, + transaction_type_t t_src, + on_submission_func_t &&on_submission) final; seastar::future<> flush(OrderingHandle &handle) final; @@ -59,9 +61,6 @@ public: backend_type_t get_type() final { return backend_type_t::SEGMENTED; } - seastar::future<> finish_commit(transaction_type_t type) { - return seastar::now(); - } bool is_checksum_needed() final { // segmented journal always requires checksum @@ -69,10 +68,10 @@ public: } private: - submit_record_ret do_submit_record( + submit_record_ertr::future<> do_submit_record( record_t &&record, - OrderingHandle &handle - ); + OrderingHandle &handle, + on_submission_func_t &&on_submission); SegmentSeqAllocatorRef segment_seq_allocator; SegmentAllocator journal_segment_allocator; diff --git a/src/crimson/os/seastore/transaction_manager.cc b/src/crimson/os/seastore/transaction_manager.cc index 94e9b3b9ab15..753bd5d6ff62 100644 --- a/src/crimson/os/seastore/transaction_manager.cc +++ b/src/crimson/os/seastore/transaction_manager.cc @@ -461,8 +461,12 @@ TransactionManager::do_submit_transaction( } SUBTRACET(seastore_t, "submitting record", tref); - return journal->submit_record(std::move(record), tref.get_handle() - ).safe_then([this, FNAME, &tref](auto submit_result) mutable { + return journal->submit_record( + std::move(record), + tref.get_handle(), + tref.get_src(), + [this, FNAME, &tref](record_locator_t submit_result) + { SUBDEBUGT(seastore_t, "committed with {}", tref, submit_result); auto start_seq = submit_result.write_result.start_seq; journal->get_trimmer().set_journal_head(start_seq); @@ -473,10 +477,8 @@ TransactionManager::do_submit_transaction( journal->get_trimmer().update_journal_tails( cache->get_oldest_dirty_from().value_or(start_seq), cache->get_oldest_backref_dirty_from().value_or(start_seq)); - return journal->finish_commit(tref.get_src() - ).then([&tref] { - return tref.get_handle().complete(); - }); + }).safe_then([&tref] { + return tref.get_handle().complete(); }).handle_error( submit_transaction_iertr::pass_further{}, crimson::ct_error::assert_all{"Hit error submitting to journal"} diff --git a/src/test/crimson/seastore/test_btree_lba_manager.cc b/src/test/crimson/seastore/test_btree_lba_manager.cc index 9988df3a1246..8b1f7435c87b 100644 --- a/src/test/crimson/seastore/test_btree_lba_manager.cc +++ b/src/test/crimson/seastore/test_btree_lba_manager.cc @@ -112,14 +112,22 @@ struct btree_test_base : seastar::future<> submit_transaction(TransactionRef t) { auto record = cache->prepare_record(*t, JOURNAL_SEQ_NULL, JOURNAL_SEQ_NULL); - return journal->submit_record(std::move(record), t->get_handle()).safe_then( - [this, t=std::move(t)](auto submit_result) mutable { - cache->complete_commit( - *t, + return seastar::do_with( + std::move(t), [this, record=std::move(record)](auto& _t) mutable { + auto& t = *_t; + return journal->submit_record( + std::move(record), + t.get_handle(), + t.get_src(), + [this, &t](auto submit_result) { + cache->complete_commit( + t, submit_result.record_block_base, submit_result.write_result.start_seq); - complete_commit(*t); - }).handle_error(crimson::ct_error::assert_all{}); + complete_commit(t); + } + ).handle_error(crimson::ct_error::assert_all{}); + }); } virtual LBAManager::mkfs_ret test_structure_setup(Transaction &t) = 0; diff --git a/src/test/crimson/seastore/test_cbjournal.cc b/src/test/crimson/seastore/test_cbjournal.cc index d00a0f42729a..47a08d68cbb4 100644 --- a/src/test/crimson/seastore/test_cbjournal.cc +++ b/src/test/crimson/seastore/test_cbjournal.cc @@ -181,15 +181,20 @@ struct cbjournal_test_t : public seastar_test_suite_t, JournalTrimmer auto submit_record(record_t&& record) { entries.push_back(record); + entry_validator_t& back = entries.back(); OrderingHandle handle = get_dummy_ordering_handle(); - auto [addr, w_result] = cbj->submit_record( - std::move(record), - handle).unsafe_get(); - entries.back().seq = w_result.start_seq; - entries.back().entries = 1; - entries.back().magic = cbj->get_cjs().get_cbj_header().magic; - logger().debug("submit entry to addr {}", entries.back().seq); - return convert_paddr_to_abs_addr(entries.back().seq.offset); + cbj->submit_record( + std::move(record), + handle, + transaction_type_t::MUTATE, + [this, &back](auto locator) { + back.seq = locator.write_result.start_seq; + back.entries = 1; + back.magic = cbj->get_cjs().get_cbj_header().magic; + logger().debug("submit entry to addr {}", back.seq); + } + ).unsafe_get(); + return convert_paddr_to_abs_addr(back.seq.offset); } seastar::future<> tear_down_fut() final { diff --git a/src/test/crimson/seastore/test_seastore_journal.cc b/src/test/crimson/seastore/test_seastore_journal.cc index 2eb791b1d46a..04a99319b117 100644 --- a/src/test/crimson/seastore/test_seastore_journal.cc +++ b/src/test/crimson/seastore/test_seastore_journal.cc @@ -233,12 +233,17 @@ struct journal_test_t : seastar_test_suite_t, SegmentProvider, JournalTrimmer { auto submit_record(T&&... _record) { auto record{std::forward(_record)...}; records.push_back(record); + record_validator_t& back = records.back(); OrderingHandle handle = get_dummy_ordering_handle(); - auto [addr, _] = journal->submit_record( + journal->submit_record( std::move(record), - handle).unsafe_get(); - records.back().record_final_offset = addr; - return addr; + handle, + transaction_type_t::MUTATE, + [&back](auto locator) { + back.record_final_offset = locator.record_block_base; + } + ).unsafe_get(); + return back.record_final_offset; } extent_t generate_extent(size_t blocks) {