#include <memory>
#include <optional>
+#include <type_traits>
#include <boost/intrusive_ptr.hpp>
#include <boost/smart_ptr/intrusive_ref_counter.hpp>
#include <boost/smart_ptr/local_shared_ptr.hpp>
+#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/future.hh>
#include <seastar/core/shared_future.hh>
namespace ceph::osd {
class OpsExecuter {
+ // an operation can be divided into two stages: main and effect-exposing
+ // one. The former is performed immediately on call to `do_osd_op()` while
+ // the later on `submit_changes()` – after successfully processing main
+ // stages of all involved operations. When any stage fails, none of all
+ // scheduled effect-exposing stages will be executed.
+ // when operation requires this division, `with_effect()` should be used.
+ struct effect_t {
+ virtual seastar::future<> execute() = 0;
+ virtual ~effect_t() = default;
+ };
+
PGBackend::cached_os_t os;
PG& pg;
PGBackend& backend;
size_t num_read = 0; ///< count read ops
size_t num_write = 0; ///< count update ops
+ // this gizmo could be wrapped in std::optional for the sake of lazy
+ // initialization. we don't need it for ops that doesn't have effect
+ // TODO: verify the init overhead of chunked_fifo
+ seastar::chunked_fifo<std::unique_ptr<effect_t>> op_effects;
+
+ template <class Context, class MainFunc, class EffectFunc>
+ auto with_effect(Context&& ctx, MainFunc&& main_func, EffectFunc&& effect_func);
+
seastar::future<> do_op_call(class OSDOp& osd_op);
template <class Func>
seastar::future<> do_osd_op(class OSDOp& osd_op);
- template <typename Func> seastar::future<> submit_changes(Func&& f) && {
- return std::forward<Func>(f)(std::move(txn), std::move(os));
- }
+ template <typename Func>
+ seastar::future<> submit_changes(Func&& f) &&;
};
+template <class Context, class MainFunc, class EffectFunc>
+auto OpsExecuter::with_effect(
+ Context&& ctx,
+ MainFunc&& main_func,
+ EffectFunc&& effect_func)
+{
+ using context_t = std::decay_t<Context>;
+ // the language offers implicit conversion to pointer-to-function for
+ // lambda only when it's closureless
+ static_assert(std::is_convertible_v<EffectFunc,
+ seastar::future<> (*)(context_t&&)>,
+ "with_effect function is not allowed to capture");
+ struct task_t final : effect_t {
+ context_t ctx;
+ EffectFunc effect_func;
+
+ task_t(Context&& ctx, EffectFunc&& effect_func)
+ : ctx(std::move(ctx)), effect_func(std::move(effect_func)) {}
+ seastar::future<> execute() final {
+ return std::move(effect_func)(std::move(ctx));
+ }
+ };
+ auto task =
+ std::make_unique<task_t>(std::move(ctx), std::move(effect_func));
+ auto& ctx_ref = task->ctx;
+ op_effects.emplace_back(std::move(task));
+ return std::forward<MainFunc>(main_func)(ctx_ref);
+}
+
+template <typename Func>
+seastar::future<> OpsExecuter::submit_changes(Func&& f) && {
+ return std::forward<Func>(f)(std::move(txn), std::move(os)).then(
+ // NOTE: this lambda could be scheduled conditionally (if_then?)
+ [this] {
+ return seastar::do_until(
+ [this] { return op_effects.empty(); },
+ [this] {
+ auto fut = op_effects.front()->execute();
+ op_effects.pop_front();
+ return fut;
+ });
+ });
+}
+
} // namespace ceph::osd