#include "os/Transaction.h"
#include "osd/osd_types.h"
+#include "crimson/common/coroutine.h"
#include "crimson/common/errorator.h"
#include "crimson/common/interruptible_future.h"
#include "crimson/common/type_helpers.h"
// osd_op_params are instantiated by every wr-like operation.
assert(osd_op_params || !want_mutate);
assert(obc);
- rep_op_fut_t maybe_mutated =
- interruptor::make_ready_future<rep_op_fut_tuple>(
- seastar::now(),
- interruptor::make_interruptible(osd_op_errorator::now()));
+
+ auto submitted = interruptor::now();
+ auto all_completed =
+ interruptor::make_interruptible(osd_op_errorator::now());
+
if (cloning_ctx) {
ceph_assert(want_mutate);
}
+
if (want_mutate) {
- maybe_mutated = flush_clone_metadata(
+ auto log_entries = co_await flush_clone_metadata(
prepare_transaction(ops),
snap_mapper,
osdriver,
- txn
- ).then_interruptible([mut_func=std::move(mut_func),
- this](auto&& log_entries) mutable {
- if (auto log_rit = log_entries.rbegin(); log_rit != log_entries.rend()) {
- ceph_assert(log_rit->version == osd_op_params->at_version);
- }
- return std::forward<MutFunc>(mut_func)(std::move(txn),
- std::move(obc),
- std::move(*osd_op_params),
- std::move(log_entries)
- ).then_interruptible([](auto p) {
- auto &submitted = std::get<0>(p);
- auto &all_completed = std::get<1>(p);
- return interruptor::make_ready_future<rep_op_fut_tuple>(
- std::move(submitted),
- osd_op_ierrorator::future<>(std::move(all_completed)));
- });
- });
+ txn);
+
+ if (auto log_rit = log_entries.rbegin(); log_rit != log_entries.rend()) {
+ ceph_assert(log_rit->version == osd_op_params->at_version);
+ }
+
+ auto [_submitted, _all_completed] = co_await mut_func(
+ std::move(txn),
+ std::move(obc),
+ std::move(*osd_op_params),
+ std::move(log_entries));
+
+ submitted = std::move(_submitted);
+ all_completed = std::move(_all_completed);
}
+
apply_stats();
- if (__builtin_expect(op_effects.empty(), true)) {
- return maybe_mutated;
- } else {
- return maybe_mutated.then_unpack_interruptible(
- // need extra ref pg due to apply_stats() which can be executed after
- // informing snap mapper
- [this, pg=this->pg](auto&& submitted, auto&& all_completed) mutable {
- return interruptor::make_ready_future<rep_op_fut_tuple>(
- std::move(submitted),
- all_completed.safe_then_interruptible([this, pg=std::move(pg)] {
- // let's do the cleaning of `op_effects` in destructor
- return interruptor::do_for_each(op_effects,
- [pg=std::move(pg)](auto& op_effect) {
- return op_effect->execute(pg);
- });
- }));
+ if (op_effects.size()) [[unlikely]] {
+ // need extra ref pg due to apply_stats() which can be executed after
+ // informing snap mapper
+ all_completed =
+ std::move(all_completed).safe_then_interruptible([this, pg=this->pg] {
+ // let's do the cleaning of `op_effects` in destructor
+ return interruptor::do_for_each(op_effects,
+ [pg=std::move(pg)](auto& op_effect) {
+ return op_effect->execute(pg);
+ });
});
}
+
+ co_return std::make_tuple(
+ std::move(submitted),
+ std::move(all_completed));
}
template <class Func>