From: Matan Breizman Date: Wed, 21 Aug 2024 13:37:13 +0000 (+0000) Subject: crimson/osd/ops_executer: flush_changes_n_do_ops_effects to use X-Git-Tag: testing/wip-pdonnell-testing-20240916.200549-debug~52^2~4 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=dbf7425c89e6ae854e3fd47b5e05eaf076ba6245;p=ceph-ci.git crimson/osd/ops_executer: flush_changes_n_do_ops_effects to use coroutines Signed-off-by: Matan Breizman --- diff --git a/src/crimson/osd/ops_executer.h b/src/crimson/osd/ops_executer.h index 812e2468266..0398e30540d 100644 --- a/src/crimson/osd/ops_executer.h +++ b/src/crimson/osd/ops_executer.h @@ -21,6 +21,7 @@ #include "os/Transaction.h" #include "osd/osd_types.h" +#include "crimson/common/coroutine.h" #include "crimson/common/errorator.h" #include "crimson/common/interruptible_future.h" #include "crimson/common/type_helpers.h" @@ -518,57 +519,54 @@ OpsExecuter::flush_changes_n_do_ops_effects( // osd_op_params are instantiated by every wr-like operation. assert(osd_op_params || !want_mutate); assert(obc); - rep_op_fut_t maybe_mutated = - interruptor::make_ready_future( - seastar::now(), - interruptor::make_interruptible(osd_op_errorator::now())); + + auto submitted = interruptor::now(); + auto all_completed = + interruptor::make_interruptible(osd_op_errorator::now()); + if (cloning_ctx) { ceph_assert(want_mutate); } + if (want_mutate) { - maybe_mutated = flush_clone_metadata( + auto log_entries = co_await flush_clone_metadata( prepare_transaction(ops), snap_mapper, osdriver, - txn - ).then_interruptible([mut_func=std::move(mut_func), - this](auto&& log_entries) mutable { - if (auto log_rit = log_entries.rbegin(); log_rit != log_entries.rend()) { - ceph_assert(log_rit->version == osd_op_params->at_version); - } - return std::forward(mut_func)(std::move(txn), - std::move(obc), - std::move(*osd_op_params), - std::move(log_entries) - ).then_interruptible([](auto p) { - auto &submitted = std::get<0>(p); - auto &all_completed = std::get<1>(p); - return interruptor::make_ready_future( - std::move(submitted), - osd_op_ierrorator::future<>(std::move(all_completed))); - }); - }); + txn); + + if (auto log_rit = log_entries.rbegin(); log_rit != log_entries.rend()) { + ceph_assert(log_rit->version == osd_op_params->at_version); + } + + auto [_submitted, _all_completed] = co_await mut_func( + std::move(txn), + std::move(obc), + std::move(*osd_op_params), + std::move(log_entries)); + + submitted = std::move(_submitted); + all_completed = std::move(_all_completed); } + apply_stats(); - if (__builtin_expect(op_effects.empty(), true)) { - return maybe_mutated; - } else { - return maybe_mutated.then_unpack_interruptible( - // need extra ref pg due to apply_stats() which can be executed after - // informing snap mapper - [this, pg=this->pg](auto&& submitted, auto&& all_completed) mutable { - return interruptor::make_ready_future( - std::move(submitted), - all_completed.safe_then_interruptible([this, pg=std::move(pg)] { - // let's do the cleaning of `op_effects` in destructor - return interruptor::do_for_each(op_effects, - [pg=std::move(pg)](auto& op_effect) { - return op_effect->execute(pg); - }); - })); + if (op_effects.size()) [[unlikely]] { + // need extra ref pg due to apply_stats() which can be executed after + // informing snap mapper + all_completed = + std::move(all_completed).safe_then_interruptible([this, pg=this->pg] { + // let's do the cleaning of `op_effects` in destructor + return interruptor::do_for_each(op_effects, + [pg=std::move(pg)](auto& op_effect) { + return op_effect->execute(pg); + }); }); } + + co_return std::make_tuple( + std::move(submitted), + std::move(all_completed)); } template