From: Radoslaw Zarzynski Date: Tue, 13 Dec 2022 18:30:28 +0000 (+0000) Subject: crimson/osd: drop the nested interruptors in snap trimming X-Git-Tag: v18.1.0~260^2~7 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=877c272c2f21938dde604e35c51a7ce604a9f556;p=ceph.git crimson/osd: drop the nested interruptors in snap trimming `crimson::interruptible` does not support that. See the `DISABLED_nested_interruptors` unit test. Signed-off-by: Radoslaw Zarzynski --- diff --git a/src/crimson/osd/osd_operations/snaptrim_event.cc b/src/crimson/osd/osd_operations/snaptrim_event.cc index 505092bcb53c..1e7dcda761af 100644 --- a/src/crimson/osd/osd_operations/snaptrim_event.cc +++ b/src/crimson/osd/osd_operations/snaptrim_event.cc @@ -48,13 +48,11 @@ void SnapTrimEvent::SubOpBlocker::emplace_back(Args&&... args) subops.emplace_back(std::forward(args)...); }; -seastar::future<> SnapTrimEvent::SubOpBlocker::wait_completion() +SnapTrimEvent::interruptible_future<> +SnapTrimEvent::SubOpBlocker::wait_completion() { - auto rng = subops | std::views::values; - return seastar::when_all_succeed( - std::begin(rng), std::end(rng) - ).then([] (auto&&...) { - return seastar::now(); + return interruptor::do_for_each(subops, [](auto&& kv) { + return std::move(kv.second); }); } @@ -149,7 +147,10 @@ seastar::future SnapTrimEvent::with_pg( pg, object, snapid); - subop_blocker.emplace_back(op->get_id(), std::move(fut)); + subop_blocker.emplace_back( + op->get_id(), + std::move(fut).handle_error_interruptible(crimson::ct_error::assert_all{}) + ); } return enter_stage( wait_subop @@ -194,7 +195,8 @@ CommonPGPipeline& SnapTrimObjSubEvent::pp() return pg->request_pg_pipeline; } -seastar::future<> SnapTrimObjSubEvent::start() +SnapTrimObjSubEvent::remove_or_update_iertr::future<> +SnapTrimObjSubEvent::start() { logger().debug("{}: start", *this); @@ -478,66 +480,59 @@ SnapTrimObjSubEvent::remove_or_update( }); } -seastar::future<> SnapTrimObjSubEvent::with_pg( +SnapTrimObjSubEvent::remove_or_update_iertr::future<> +SnapTrimObjSubEvent::with_pg( ShardServices &shard_services, Ref _pg) { - return interruptor::with_interruption([this] { + return enter_stage( + pp().wait_for_active + ).then_interruptible([this] { + return with_blocking_event([this] (auto&& trigger) { + return pg->wait_for_active_blocker.wait(std::move(trigger)); + }); + }).then_interruptible([this] { return enter_stage( - pp().wait_for_active - ).then_interruptible([this] { - return with_blocking_event([this] (auto&& trigger) { - return pg->wait_for_active_blocker.wait(std::move(trigger)); - }); - }).then_interruptible([this] { - return enter_stage( - pp().recover_missing); - }).then_interruptible([this] { - //return do_recover_missing(pg, get_target_oid()); - return seastar::now(); - }).then_interruptible([this] { + pp().recover_missing); + }).then_interruptible([] { + //return do_recover_missing(pg, get_target_oid()); + return seastar::now(); + }).then_interruptible([this] { + return enter_stage( + pp().get_obc); + }).then_interruptible([this] { + logger().debug("{}: getting obc for {}", *this, coid); + // end of commonality + // with_cone_obc lock both clone's and head's obcs + return pg->obc_loader.with_clone_obc(coid, [this](auto clone_obc) { + logger().debug("{}: got clone_obc={}", *this, fmt::ptr(clone_obc.get())); return enter_stage( - pp().get_obc); - }).then_interruptible([this] { - logger().debug("{}: getting obc for {}", *this, coid); - // end of commonality - // with_cone_obc lock both clone's and head's obcs - return pg->obc_loader.with_clone_obc(coid, [this](auto clone_obc) { - logger().debug("{}: got clone_obc={}", *this, clone_obc); - return enter_stage( - pp().process - ).then_interruptible([this, clone_obc=std::move(clone_obc)]() mutable { - logger().debug("{}: processing clone_obc={}", *this, clone_obc); - return remove_or_update( - clone_obc, clone_obc->head - ).safe_then_unpack_interruptible([clone_obc, this] - (auto&& txn, auto&& log_entries) mutable { - auto [submitted, all_completed] = pg->submit_transaction( - std::move(clone_obc), - std::move(txn), - std::move(osd_op_p), - std::move(log_entries)); - return submitted.then_interruptible( - [all_completed=std::move(all_completed), this] () mutable { - return enter_stage( - wait_repop - ).then_interruptible([all_completed=std::move(all_completed)] () mutable { - return std::move(all_completed); - }); + pp().process + ).then_interruptible([this, clone_obc=std::move(clone_obc)]() mutable { + logger().debug("{}: processing clone_obc={}", *this, fmt::ptr(clone_obc.get())); + return remove_or_update( + clone_obc, clone_obc->head + ).safe_then_unpack_interruptible([clone_obc, this] + (auto&& txn, auto&& log_entries) mutable { + auto [submitted, all_completed] = pg->submit_transaction( + std::move(clone_obc), + std::move(txn), + std::move(osd_op_p), + std::move(log_entries)); + return submitted.then_interruptible( + [all_completed=std::move(all_completed), this] () mutable { + return enter_stage( + wait_repop + ).then_interruptible([all_completed=std::move(all_completed)] () mutable { + return std::move(all_completed); }); }); }); - }).handle_error_interruptible(PG::load_obc_ertr::all_same_way([] { - return seastar::now(); - })); - }).then_interruptible([] { - // end of commonality + }); + }).handle_error_interruptible(PG::load_obc_ertr::all_same_way([] { return seastar::now(); - }); - }, [this](std::exception_ptr eptr) { - // TODO: better debug output - logger().debug("{}: interrupted {}", *this, eptr); - }, pg); + })); + }); } void SnapTrimObjSubEvent::print(std::ostream &lhs) const diff --git a/src/crimson/osd/osd_operations/snaptrim_event.h b/src/crimson/osd/osd_operations/snaptrim_event.h index 902faf9c5840..13b42723ea28 100644 --- a/src/crimson/osd/osd_operations/snaptrim_event.h +++ b/src/crimson/osd/osd_operations/snaptrim_event.h @@ -54,14 +54,15 @@ private: struct SubOpBlocker : crimson::BlockerT { static constexpr const char* type_name = "CompoundOpBlocker"; - using id_done_t = std::pair>; + using id_done_t = std::pair>; void dump_detail(Formatter *f) const final; template void emplace_back(Args&&... args); - seastar::future<> wait_completion(); + interruptible_future<> wait_completion(); private: std::vector subops; } subop_blocker; @@ -106,6 +107,12 @@ public: // cannot revisite a pipeline's stage it already saw. class SnapTrimObjSubEvent : public PhasedOperationT { public: + using remove_or_update_ertr = + crimson::errorator; + using remove_or_update_iertr = + crimson::interruptible::interruptible_errorator< + IOInterruptCondition, remove_or_update_ertr>; + static constexpr OperationTypeCode type = OperationTypeCode::snaptrimobj_subevent; @@ -120,8 +127,8 @@ public: void print(std::ostream &) const final; void dump_detail(ceph::Formatter* f) const final; - seastar::future<> start(); - seastar::future<> with_pg( + remove_or_update_iertr::future<> start(); + remove_or_update_iertr::future<> with_pg( ShardServices &shard_services, Ref pg); CommonPGPipeline& pp(); @@ -129,12 +136,6 @@ public: private: object_stat_sum_t delta_stats; - using remove_or_update_ertr = - crimson::errorator; - using remove_or_update_iertr = - crimson::interruptible::interruptible_errorator< - IOInterruptCondition, remove_or_update_ertr>; - remove_or_update_iertr::future<> remove_clone( ObjectContextRef obc, ceph::os::Transaction& txn,