From ee36a4eb3e2c5f694efc079bd693908f2441eec9 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Tue, 26 Aug 2025 09:04:05 -0700 Subject: [PATCH] crimson/.../transaction_manager: convert do_submit_transaction to coroutine Signed-off-by: Samuel Just --- .../os/seastore/transaction_manager.cc | 118 ++++++++---------- 1 file changed, 53 insertions(+), 65 deletions(-) diff --git a/src/crimson/os/seastore/transaction_manager.cc b/src/crimson/os/seastore/transaction_manager.cc index 24f8df98332..9184a9a27da 100644 --- a/src/crimson/os/seastore/transaction_manager.cc +++ b/src/crimson/os/seastore/transaction_manager.cc @@ -562,79 +562,67 @@ TransactionManager::do_submit_transaction( { LOG_PREFIX(TransactionManager::do_submit_transaction); SUBDEBUGT(seastore_t, "start, entering ool_writes", tref); - return trans_intr::make_interruptible( + co_await trans_intr::make_interruptible( tref.get_handle().enter(write_pipeline.ool_writes_and_lba_updates) - ).then_interruptible([this, FNAME, &tref, - dispatch_result = std::move(dispatch_result)] { - return seastar::do_with(std::move(dispatch_result), - [this, FNAME, &tref](auto &dispatch_result) { - SUBTRACET(seastore_t, "write delayed ool extents", tref); - return epm->write_delayed_ool_extents(tref, dispatch_result.alloc_map - ).handle_error_interruptible( - crimson::ct_error::input_output_error::pass_further(), - crimson::ct_error::assert_all("invalid error") - ); - }); - }).si_then([&tref, FNAME, this] { - return seastar::do_with( - tref.get_valid_pre_alloc_list(), - [this, FNAME, &tref](auto &allocated_extents) { - return update_lba_mappings(tref, allocated_extents - ).si_then([this, FNAME, &tref, &allocated_extents] { - auto num_extents = allocated_extents.size(); - SUBTRACET(seastore_t, "process {} allocated extents", tref, num_extents); - return epm->write_preallocated_ool_extents(tref, allocated_extents - ).handle_error_interruptible( - crimson::ct_error::input_output_error::pass_further(), - crimson::ct_error::assert_all("invalid error") - ); - }); - }); - }).si_then([this, FNAME, &tref] { - SUBTRACET(seastore_t, "entering prepare", tref); - return tref.get_handle().enter(write_pipeline.prepare); - }).si_then([this, FNAME, &tref, trim_alloc_to=std::move(trim_alloc_to)]() mutable - -> submit_transaction_iertr::future<> { - if (trim_alloc_to && *trim_alloc_to != JOURNAL_SEQ_NULL) { - SUBTRACET(seastore_t, "trim backref_bufs to {}", tref, *trim_alloc_to); - cache->trim_backref_bufs(*trim_alloc_to); - } + ); - auto record = cache->prepare_record( - tref, - journal->get_trimmer().get_journal_head(), - journal->get_trimmer().get_dirty_tail()); + SUBTRACET(seastore_t, "write delayed ool extents", tref); + co_await epm->write_delayed_ool_extents( + tref, dispatch_result.alloc_map + ); - tref.get_handle().maybe_release_collection_lock(); - if (tref.get_src() == Transaction::src_t::MUTATE) { - --(shard_stats.processing_inlock_io_num); - ++(shard_stats.processing_postlock_io_num); - } + auto allocated_extents = tref.get_valid_pre_alloc_list(); + co_await update_lba_mappings(tref, allocated_extents); - SUBTRACET(seastore_t, "submitting record", tref); - return journal->submit_record( - std::move(record), - tref.get_handle(), - tref.get_src(), - [this, FNAME, &tref](record_locator_t submit_result) - { - SUBDEBUGT(seastore_t, "committed with {}", tref, submit_result); - auto start_seq = submit_result.write_result.start_seq; - journal->get_trimmer().set_journal_head(start_seq); - cache->complete_commit( - tref, - submit_result.record_block_base, - start_seq); - journal->get_trimmer().update_journal_tails( - cache->get_oldest_dirty_from().value_or(start_seq), - cache->get_oldest_backref_dirty_from().value_or(start_seq)); - }).safe_then([&tref] { - return tref.get_handle().complete(); + auto num_extents = allocated_extents.size(); + SUBTRACET(seastore_t, "process {} allocated extents", tref, num_extents); + co_await epm->write_preallocated_ool_extents(tref, allocated_extents); + + SUBTRACET(seastore_t, "entering prepare", tref); + co_await trans_intr::make_interruptible( + tref.get_handle().enter(write_pipeline.prepare) + ); + + if (trim_alloc_to && *trim_alloc_to != JOURNAL_SEQ_NULL) { + SUBTRACET(seastore_t, "trim backref_bufs to {}", tref, *trim_alloc_to); + cache->trim_backref_bufs(*trim_alloc_to); + } + + auto record = cache->prepare_record( + tref, + journal->get_trimmer().get_journal_head(), + journal->get_trimmer().get_dirty_tail()); + + tref.get_handle().maybe_release_collection_lock(); + if (tref.get_src() == Transaction::src_t::MUTATE) { + --(shard_stats.processing_inlock_io_num); + ++(shard_stats.processing_postlock_io_num); + } + + SUBTRACET(seastore_t, "submitting record", tref); + co_await journal->submit_record( + std::move(record), + tref.get_handle(), + tref.get_src(), + [this, FNAME, &tref](record_locator_t submit_result) { + SUBDEBUGT(seastore_t, "committed with {}", tref, submit_result); + auto start_seq = submit_result.write_result.start_seq; + journal->get_trimmer().set_journal_head(start_seq); + cache->complete_commit( + tref, + submit_result.record_block_base, + start_seq); + journal->get_trimmer().update_journal_tails( + cache->get_oldest_dirty_from().value_or(start_seq), + cache->get_oldest_backref_dirty_from().value_or(start_seq)); }).handle_error( submit_transaction_iertr::pass_further{}, crimson::ct_error::assert_all{"Hit error submitting to journal"} ); - }); + + co_await trans_intr::make_interruptible( + tref.get_handle().complete() + ); } seastar::future<> TransactionManager::flush(OrderingHandle &handle) -- 2.39.5