subops.emplace_back(std::forward<Args>(args)...);
};
-seastar::future<> SnapTrimEvent::SubOpBlocker::wait_completion()
+SnapTrimEvent::interruptible_future<>
+SnapTrimEvent::SubOpBlocker::wait_completion()
{
- auto rng = subops | std::views::values;
- return seastar::when_all_succeed(
- std::begin(rng), std::end(rng)
- ).then([] (auto&&...) {
- return seastar::now();
+ return interruptor::do_for_each(subops, [](auto&& kv) {
+ return std::move(kv.second);
});
}
pg,
object,
snapid);
- subop_blocker.emplace_back(op->get_id(), std::move(fut));
+ subop_blocker.emplace_back(
+ op->get_id(),
+ std::move(fut).handle_error_interruptible(crimson::ct_error::assert_all{})
+ );
}
return enter_stage<interruptor>(
wait_subop
return pg->request_pg_pipeline;
}
-seastar::future<> SnapTrimObjSubEvent::start()
+SnapTrimObjSubEvent::remove_or_update_iertr::future<>
+SnapTrimObjSubEvent::start()
{
logger().debug("{}: start", *this);
});
}
-seastar::future<> SnapTrimObjSubEvent::with_pg(
+SnapTrimObjSubEvent::remove_or_update_iertr::future<>
+SnapTrimObjSubEvent::with_pg(
ShardServices &shard_services, Ref<PG> _pg)
{
- return interruptor::with_interruption([this] {
+ return enter_stage<interruptor>(
+ pp().wait_for_active
+ ).then_interruptible([this] {
+ return with_blocking_event<PGActivationBlocker::BlockingEvent,
+ interruptor>([this] (auto&& trigger) {
+ return pg->wait_for_active_blocker.wait(std::move(trigger));
+ });
+ }).then_interruptible([this] {
return enter_stage<interruptor>(
- pp().wait_for_active
- ).then_interruptible([this] {
- return with_blocking_event<PGActivationBlocker::BlockingEvent,
- interruptor>([this] (auto&& trigger) {
- return pg->wait_for_active_blocker.wait(std::move(trigger));
- });
- }).then_interruptible([this] {
- return enter_stage<interruptor>(
- pp().recover_missing);
- }).then_interruptible([this] {
- //return do_recover_missing(pg, get_target_oid());
- return seastar::now();
- }).then_interruptible([this] {
+ pp().recover_missing);
+ }).then_interruptible([] {
+ //return do_recover_missing(pg, get_target_oid());
+ return seastar::now();
+ }).then_interruptible([this] {
+ return enter_stage<interruptor>(
+ pp().get_obc);
+ }).then_interruptible([this] {
+ logger().debug("{}: getting obc for {}", *this, coid);
+ // end of commonality
+ // with_cone_obc lock both clone's and head's obcs
+ return pg->obc_loader.with_clone_obc<RWState::RWWRITE>(coid, [this](auto clone_obc) {
+ logger().debug("{}: got clone_obc={}", *this, fmt::ptr(clone_obc.get()));
return enter_stage<interruptor>(
- pp().get_obc);
- }).then_interruptible([this] {
- logger().debug("{}: getting obc for {}", *this, coid);
- // end of commonality
- // with_cone_obc lock both clone's and head's obcs
- return pg->obc_loader.with_clone_obc<RWState::RWWRITE>(coid, [this](auto clone_obc) {
- logger().debug("{}: got clone_obc={}", *this, clone_obc);
- return enter_stage<interruptor>(
- pp().process
- ).then_interruptible([this, clone_obc=std::move(clone_obc)]() mutable {
- logger().debug("{}: processing clone_obc={}", *this, clone_obc);
- return remove_or_update(
- clone_obc, clone_obc->head
- ).safe_then_unpack_interruptible([clone_obc, this]
- (auto&& txn, auto&& log_entries) mutable {
- auto [submitted, all_completed] = pg->submit_transaction(
- std::move(clone_obc),
- std::move(txn),
- std::move(osd_op_p),
- std::move(log_entries));
- return submitted.then_interruptible(
- [all_completed=std::move(all_completed), this] () mutable {
- return enter_stage<interruptor>(
- wait_repop
- ).then_interruptible([all_completed=std::move(all_completed)] () mutable {
- return std::move(all_completed);
- });
+ pp().process
+ ).then_interruptible([this, clone_obc=std::move(clone_obc)]() mutable {
+ logger().debug("{}: processing clone_obc={}", *this, fmt::ptr(clone_obc.get()));
+ return remove_or_update(
+ clone_obc, clone_obc->head
+ ).safe_then_unpack_interruptible([clone_obc, this]
+ (auto&& txn, auto&& log_entries) mutable {
+ auto [submitted, all_completed] = pg->submit_transaction(
+ std::move(clone_obc),
+ std::move(txn),
+ std::move(osd_op_p),
+ std::move(log_entries));
+ return submitted.then_interruptible(
+ [all_completed=std::move(all_completed), this] () mutable {
+ return enter_stage<interruptor>(
+ wait_repop
+ ).then_interruptible([all_completed=std::move(all_completed)] () mutable {
+ return std::move(all_completed);
});
});
});
- }).handle_error_interruptible(PG::load_obc_ertr::all_same_way([] {
- return seastar::now();
- }));
- }).then_interruptible([] {
- // end of commonality
+ });
+ }).handle_error_interruptible(PG::load_obc_ertr::all_same_way([] {
return seastar::now();
- });
- }, [this](std::exception_ptr eptr) {
- // TODO: better debug output
- logger().debug("{}: interrupted {}", *this, eptr);
- }, pg);
+ }));
+ });
}
void SnapTrimObjSubEvent::print(std::ostream &lhs) const
struct SubOpBlocker : crimson::BlockerT<SubOpBlocker> {
static constexpr const char* type_name = "CompoundOpBlocker";
- using id_done_t = std::pair<crimson::Operation::id_t, seastar::future<>>;
+ using id_done_t = std::pair<crimson::Operation::id_t,
+ interruptible_future<>>;
void dump_detail(Formatter *f) const final;
template <class... Args>
void emplace_back(Args&&... args);
- seastar::future<> wait_completion();
+ interruptible_future<> wait_completion();
private:
std::vector<id_done_t> subops;
} subop_blocker;
// cannot revisite a pipeline's stage it already saw.
class SnapTrimObjSubEvent : public PhasedOperationT<SnapTrimObjSubEvent> {
public:
+ using remove_or_update_ertr =
+ crimson::errorator<crimson::ct_error::enoent>;
+ using remove_or_update_iertr =
+ crimson::interruptible::interruptible_errorator<
+ IOInterruptCondition, remove_or_update_ertr>;
+
static constexpr OperationTypeCode type =
OperationTypeCode::snaptrimobj_subevent;
void print(std::ostream &) const final;
void dump_detail(ceph::Formatter* f) const final;
- seastar::future<> start();
- seastar::future<> with_pg(
+ remove_or_update_iertr::future<> start();
+ remove_or_update_iertr::future<> with_pg(
ShardServices &shard_services, Ref<PG> pg);
CommonPGPipeline& pp();
private:
object_stat_sum_t delta_stats;
- using remove_or_update_ertr =
- crimson::errorator<crimson::ct_error::enoent>;
- using remove_or_update_iertr =
- crimson::interruptible::interruptible_errorator<
- IOInterruptCondition, remove_or_update_ertr>;
-
remove_or_update_iertr::future<> remove_clone(
ObjectContextRef obc,
ceph::os::Transaction& txn,