}
return seastar::now();
},
- [] (auto&& ctx, ObjectContextRef obc) {
+ [] (auto&& ctx, ObjectContextRef obc, Ref<PG>) {
auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr);
if (emplaced) {
const auto& [cookie, entity] = ctx.key;
}
return seastar::now();
},
- [] (auto&& ctx, ObjectContextRef obc) {
+ [] (auto&& ctx, ObjectContextRef obc, Ref<PG>) {
if (auto nh = obc->watchers.extract(ctx.key); !nh.empty()) {
return seastar::do_with(std::move(nh.mapped()),
[ctx](auto&& watcher) {
ceph::encode(ctx.ninfo.notify_id, osd_op.outdata);
return seastar::now();
},
- [] (auto&& ctx, ObjectContextRef obc) {
+ [] (auto&& ctx, ObjectContextRef obc, Ref<PG>) {
auto alive_watchers = obc->watchers | boost::adaptors::map_values
| boost::adaptors::filtered(
[] (const auto& w) {
}
return watch_errorator::now();
},
- [] (auto&& ctx, ObjectContextRef obc) {
+ [] (auto&& ctx, ObjectContextRef obc, Ref<PG>) {
logger().info("notify_ack watch_cookie={}, notify_id={}",
ctx.watch_cookie, ctx.notify_id);
return seastar::do_for_each(obc->watchers,
// when operation requires this division, some variant of `with_effect()`
// should be used.
struct effect_t {
- virtual osd_op_errorator::future<> execute() = 0;
+ // an effect can affect PG, i.e. create a watch timeout
+ virtual osd_op_errorator::future<> execute(Ref<PG> pg) = 0;
virtual ~effect_t() = default;
};
execute_op(class OSDOp& osd_op);
template <typename MutFunc>
- osd_op_ierrorator::future<> flush_changes(MutFunc&& mut_func) &&;
+ osd_op_ierrorator::future<> flush_changes_n_do_ops_effects(
+ Ref<PG> pg,
+ MutFunc&& mut_func) &&;
const hobject_t &get_target() const {
return obc->obs.oi.soid;
// lambda only when it's closureless. We enforce this restriction due
// the fact that `flush_changes()` std::moves many executer's parts.
using allowed_effect_func_t =
- seastar::future<> (*)(context_t&&, ObjectContextRef);
+ seastar::future<> (*)(context_t&&, ObjectContextRef, Ref<PG>);
static_assert(std::is_convertible_v<EffectFunc, allowed_effect_func_t>,
"with_effect function is not allowed to capture");
struct task_t final : effect_t {
effect_func(std::move(effect_func)),
obc(std::move(obc)) {
}
- osd_op_errorator::future<> execute() final {
- return std::move(effect_func)(std::move(ctx), std::move(obc));
+ osd_op_errorator::future<> execute(Ref<PG> pg) final {
+ return std::move(effect_func)(std::move(ctx),
+ std::move(obc),
+ std::move(pg));
}
};
auto task =
}
template <typename MutFunc>
-OpsExecuter::osd_op_ierrorator::future<> OpsExecuter::flush_changes(
- MutFunc&& mut_func) &&
+OpsExecuter::osd_op_ierrorator::future<>
+OpsExecuter::flush_changes_n_do_ops_effects(Ref<PG> pg, MutFunc&& mut_func) &&
{
const bool want_mutate = !txn.empty();
// osd_op_params are instantiated by every wr-like operation.
if (__builtin_expect(op_effects.empty(), true)) {
return maybe_mutated;
} else {
- return maybe_mutated.safe_then_interruptible([this] {
+ return maybe_mutated.safe_then_interruptible([pg=std::move(pg),
+ this] () mutable {
// let's do the cleaning of `op_effects` in destructor
- return interruptor::do_for_each(op_effects, [] (auto& op_effect) {
- return op_effect->execute();
+ return interruptor::do_for_each(op_effects,
+ [pg=std::move(pg)] (auto& op_effect) {
+ return op_effect->execute(pg);
});
});
}