From 2a3f1174b8f2d939187527b7b167f2602281564d Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Thu, 12 Jun 2025 16:11:30 -0400 Subject: [PATCH] rgw/lc: pass optional_yield arguments to WorkQ functions Signed-off-by: Casey Bodley --- src/rgw/rgw_lc.cc | 79 ++++++++++++++++++++++++----------------------- 1 file changed, 40 insertions(+), 39 deletions(-) diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc index 4397c5b5502..aa6c713391b 100644 --- a/src/rgw/rgw_lc.cc +++ b/src/rgw/rgw_lc.cc @@ -537,7 +537,8 @@ struct lc_op_ctx { }; /* lc_op_ctx */ -static bool pass_size_limit_checks(const DoutPrefixProvider *dpp, lc_op_ctx& oc) { +static bool pass_size_limit_checks(const DoutPrefixProvider *dpp, + optional_yield y, lc_op_ctx& oc) { const auto& op = oc.op; if (op.size_gt || op.size_lt) { @@ -546,7 +547,7 @@ static bool pass_size_limit_checks(const DoutPrefixProvider *dpp, lc_op_ctx& oc) auto& o = oc.o; std::unique_ptr obj = bucket->get_object(o.key); - ret = obj->load_obj_state(dpp, null_yield, true); + ret = obj->load_obj_state(dpp, y, true); if (ret < 0) { return false; } @@ -571,6 +572,7 @@ static std::string lc_id = "rgw lifecycle"; static std::string lc_req_id = "0"; static void send_notification(const DoutPrefixProvider* dpp, + optional_yield y, rgw::sal::Driver* driver, rgw::sal::Object* obj, rgw::sal::Bucket* bucket, @@ -581,7 +583,7 @@ static void send_notification(const DoutPrefixProvider* dpp, // notification supported only for RADOS driver for now auto notify = driver->get_notification( dpp, obj, nullptr, event_types, bucket, lc_id, - const_cast(bucket->get_tenant()), lc_req_id, null_yield); + const_cast(bucket->get_tenant()), lc_req_id, y); int ret = notify->publish_reserve(dpp, nullptr); if (ret < 0) { @@ -622,7 +624,7 @@ static bool zonegroup_lc_check(const DoutPrefixProvider *dpp, rgw::sal::Zone* zo } static int remove_expired_obj(const DoutPrefixProvider* dpp, - lc_op_ctx& oc, + optional_yield y, lc_op_ctx& oc, bool remove_indeed, const rgw::notify::EventTypeList& event_types) { int ret{0}; @@ -645,7 +647,7 @@ static int remove_expired_obj(const DoutPrefixProvider* dpp, string etag; auto obj = oc.bucket->get_object(obj_key); - ret = obj->load_obj_state(dpp, null_yield, true); + ret = obj->load_obj_state(dpp, y, true); if (ret < 0) { /* for delete markers, we expect load_obj_state() to "fail" * with -ENOENT */ @@ -679,13 +681,13 @@ static int remove_expired_obj(const DoutPrefixProvider* dpp, uint32_t flags = (!remove_indeed || !zonegroup_lc_check(dpp, oc.driver->get_zone())) ? rgw::sal::FLAG_LOG_OP : 0; - ret = del_op->delete_obj(dpp, null_yield, flags); + ret = del_op->delete_obj(dpp, y, flags); if (ret < 0) { ldpp_dout(dpp, 1) << fmt::format("ERROR: {} failed, with error: {}", __func__, ret) << dendl; } else { if (have_notify) { - send_notification(dpp, driver, obj.get(), oc.bucket, etag, size, + send_notification(dpp, y, driver, obj.get(), oc.bucket, etag, size, version_id, event_types); } } @@ -955,7 +957,7 @@ int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target, params.ns = RGW_OBJ_NS_MULTIPART; params.access_list_filter = MultipartMetaFilter; - auto pf = [&](RGWLC::LCWorker *wk, WorkQ *wq, WorkItem &wi) { + auto pf = [&](RGWLC::LCWorker *wk, WorkQ *wq, WorkItem &wi, optional_yield y) { int ret{0}; auto wt = std::get>(wi); auto& [rule, obj] = wt; @@ -966,7 +968,7 @@ int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target, auto sal_obj = target->get_object(key); string etag; - ret = sal_obj->load_obj_state(this, null_yield, true); + ret = sal_obj->load_obj_state(this, y, true); if (ret < 0) { return ret; } @@ -976,10 +978,10 @@ int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target, } auto size = sal_obj->get_size(); - ret = mpu->abort(this, cct, null_yield); + ret = mpu->abort(this, cct, y); if (ret == 0) { const auto event_type = rgw::notify::ObjectExpirationAbortMPU; - send_notification(this, driver, sal_obj.get(), target, etag, size, + send_notification(this, y, driver, sal_obj.get(), target, etag, size, obj.key.instance, {event_type}); if (perfcounter) { perfcounter->inc(l_rgw_lc_abort_mpu, 1); @@ -1211,7 +1213,7 @@ public: is_expired = obj_has_expired(dpp, oc.cct, mtime, op.expiration, exp_time); } - auto size_check_p = pass_size_limit_checks(dpp, oc); + auto size_check_p = pass_size_limit_checks(dpp, y, oc); ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << (int)is_expired << " size_check_p: " @@ -1221,12 +1223,12 @@ public: return is_expired && size_check_p; } - int process(lc_op_ctx& oc) override { + int process(lc_op_ctx& oc, optional_yield y) override { auto& o = oc.o; int r; if (o.is_delete_marker()) { r = remove_expired_obj( - oc.dpp, oc, true, + oc.dpp, y, oc, true, {rgw::notify::ObjectExpirationDeleteMarker, rgw::notify::LifecycleExpirationDeleteMarkerCreated}); if (r < 0) { @@ -1241,7 +1243,7 @@ public: << " " << oc.wq->thr_name() << dendl; } else { /* ! o.is_delete_marker() */ - r = remove_expired_obj(oc.dpp, oc, !oc.bucket->versioning_enabled(), + r = remove_expired_obj(oc.dpp, y, oc, !oc.bucket->versioning_enabled(), {rgw::notify::ObjectExpirationCurrent, rgw::notify::LifecycleExpirationDelete}); if (r < 0) { @@ -1279,7 +1281,7 @@ public: int expiration = oc.op.noncur_expiration; bool is_expired = obj_has_expired(dpp, oc.cct, oc.effective_mtime, expiration, exp_time); - auto size_check_p = pass_size_limit_checks(dpp, oc); + auto size_check_p = pass_size_limit_checks(dpp, y, oc); auto newer_noncurrent_p = (oc.num_noncurrent > oc.op.newer_noncurrent); ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key << ": is_expired=" @@ -1291,12 +1293,12 @@ public: return is_expired && (oc.num_noncurrent > oc.op.newer_noncurrent) && size_check_p && - pass_object_lock_check(oc.driver, oc.obj.get(), dpp); + pass_object_lock_check(oc.driver, oc.obj.get(), dpp, y); } - int process(lc_op_ctx& oc) override { + int process(lc_op_ctx& oc, optional_yield y) override { auto& o = oc.o; - int r = remove_expired_obj(oc.dpp, oc, true, + int r = remove_expired_obj(oc.dpp, y, oc, true, {rgw::notify::LifecycleExpirationDelete, rgw::notify::ObjectExpirationNoncurrent}); if (r < 0) { @@ -1341,9 +1343,9 @@ public: return true; } - int process(lc_op_ctx& oc) override { + int process(lc_op_ctx& oc, optional_yield y) override { auto& o = oc.o; - int r = remove_expired_obj(oc.dpp, oc, true, + int r = remove_expired_obj(oc.dpp, y, oc, true, {rgw::notify::ObjectExpirationDeleteMarker, rgw::notify::LifecycleExpirationDeleteMarkerCreated}); if (r < 0) { @@ -1402,7 +1404,7 @@ public: is_expired = obj_has_expired(dpp, oc.cct, mtime, transition.days, exp_time); } - auto size_check_p = pass_size_limit_checks(dpp, oc); + auto size_check_p = pass_size_limit_checks(dpp, y, oc); ldpp_dout(oc.dpp, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << is_expired << " " << " size_check_p: " @@ -1427,19 +1429,19 @@ public: */ if (! oc.bucket->versioning_enabled()) { ret = - remove_expired_obj(oc.dpp, oc, true, {/* no delete notify expected */}); + remove_expired_obj(oc.dpp, y, oc, true, {/* no delete notify expected */}); ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") not versioned flags: " << oc.o.flags << dendl; } else { /* versioned */ if (oc.o.is_current() && !oc.o.is_delete_marker()) { - ret = remove_expired_obj(oc.dpp, oc, false, {/* no delete notify expected */}); + ret = remove_expired_obj(oc.dpp, y, oc, false, {/* no delete notify expected */}); ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") current & not delete_marker" << " versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl; } else { - ret = remove_expired_obj(oc.dpp, oc, true, + ret = remove_expired_obj(oc.dpp, y, oc, true, {/* no delete notify expected */}); ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") not current " @@ -1451,7 +1453,7 @@ public: return ret; } - int transition_obj_to_cloud(lc_op_ctx& oc) { + int transition_obj_to_cloud(lc_op_ctx& oc, optional_yield y) { int ret{0}; /* If CurrentVersion object & bucket has versioning enabled, remove it & * create delete marker */ @@ -1463,7 +1465,7 @@ public: auto& obj = oc.obj; string etag; - ret = obj->load_obj_state(oc.dpp, null_yield, true); + ret = obj->load_obj_state(oc.dpp, y, true); if (ret < 0) { return ret; } @@ -1475,8 +1477,7 @@ public: ret = oc.obj->transition_to_cloud(oc.bucket, oc.tier.get(), oc.o, oc.env.worker->get_cloud_targets(), - oc.cct, !delete_object, oc.dpp, - null_yield); + oc.cct, !delete_object, oc.dpp, y); if (ret < 0) { return ret; } else { @@ -1489,7 +1490,7 @@ public: } else { event_types.push_back(rgw::notify::ObjectTransitionNonCurrent); } - send_notification(oc.dpp, oc.driver, obj.get(), oc.bucket, etag, size, + send_notification(oc.dpp, y, oc.driver, obj.get(), oc.bucket, etag, size, oc.o.key.instance, event_types); } @@ -1504,7 +1505,7 @@ public: return 0; } - int process(lc_op_ctx& oc) override { + int process(lc_op_ctx& oc, optional_yield y) override { auto& o = oc.o; int r; @@ -1519,7 +1520,7 @@ public: auto& obj = oc.obj; string etag; - r = obj->load_obj_state(oc.dpp, null_yield, true); + r = obj->load_obj_state(oc.dpp, y, true); if (r < 0) { ldpp_dout(oc.dpp, 0) << fmt::format("ERROR: get_obj_state() failed on transition of object k={} error r={}", @@ -1546,7 +1547,7 @@ public: oc.driver->get_notification( oc.dpp, obj.get(), nullptr, event_types, bucket, lc_id, const_cast(oc.bucket->get_tenant()), lc_req_id, - null_yield); + y); auto version_id = oc.o.key.instance; r = notify->publish_reserve(oc.dpp, nullptr); @@ -1594,7 +1595,7 @@ public: uint32_t flags = !zonegroup_lc_check(oc.dpp, oc.driver->get_zone()) ? rgw::sal::FLAG_LOG_OP : 0; int r = oc.obj->transition(oc.bucket, target_placement, o.meta.mtime, - o.versioned_epoch, oc.dpp, null_yield, flags); + o.versioned_epoch, oc.dpp, y, flags); if (r < 0) { ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj " << oc.bucket << ":" << o.key @@ -1634,8 +1635,8 @@ protected: public: LCOpAction_CurrentTransition(const transition_action& _transition) : LCOpAction_Transition(_transition) {} - int process(lc_op_ctx& oc) override { - int r = LCOpAction_Transition::process(oc); + int process(lc_op_ctx& oc, optional_yield y) override { + int r = LCOpAction_Transition::process(oc, y); if (r == 0) { if (perfcounter) { perfcounter->inc(l_rgw_lc_transition_current, 1); @@ -1659,8 +1660,8 @@ public: const transition_action& _transition) : LCOpAction_Transition(_transition) {} - int process(lc_op_ctx& oc) override { - int r = LCOpAction_Transition::process(oc); + int process(lc_op_ctx& oc, optional_yield y) override { + int r = LCOpAction_Transition::process(oc, y); if (r == 0) { if (perfcounter) { perfcounter->inc(l_rgw_lc_transition_noncurrent, 1); @@ -1829,7 +1830,7 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker, /* fetch information for zone checks */ rgw::sal::Zone* zone = driver->get_zone(); - auto pf = [&bucket_name](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) { + auto pf = [&bucket_name](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi, optional_yield y) { auto wt = std::get>(wi); auto& [op_rule, o] = wt; -- 2.39.5