From 87084c755f62870132a04eb810552fdcd1afeb0c Mon Sep 17 00:00:00 2001 From: kchheda3 Date: Tue, 14 May 2024 14:57:55 -0400 Subject: [PATCH] rgw/lifecycle-notification: Do not block lc processing for notification errors. Currently, if there is any error while calling publish_reserve, the lc processing is cancelled for that object. This is different from the behavior we have for replication events, where notification errors do not block replication. On a similar note, since lc is internal ceph processing, notification errors should not block the lc processing. Signed-off-by: kchheda3 --- src/rgw/rgw_lc.cc | 132 +++++++++++++++++----------------------------- 1 file changed, 47 insertions(+), 85 deletions(-) diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc index 49a85675bba..0d510d6642d 100644 --- a/src/rgw/rgw_lc.cc +++ b/src/rgw/rgw_lc.cc @@ -538,6 +538,35 @@ static bool pass_size_limit_checks(const DoutPrefixProvider *dpp, lc_op_ctx& oc) static std::string lc_id = "rgw lifecycle"; static std::string lc_req_id = "0"; +static void send_notification(const DoutPrefixProvider* dpp, + rgw::sal::Driver* driver, + rgw::sal::Object* obj, + rgw::sal::Bucket* bucket, + const std::string& etag, + uint64_t size, + const std::string& version_id, + const rgw::notify::EventTypeList& event_types) { + // notification supported only for RADOS driver for now + auto notify = driver->get_notification( + dpp, obj, nullptr, event_types, bucket, lc_id, + const_cast(bucket->get_tenant()), lc_req_id, null_yield); + + int ret = notify->publish_reserve(dpp, nullptr); + if (ret < 0) { + ldpp_dout(dpp, 1) << "ERROR: notify publish_reserve failed, with error: " + << ret << " for lc object: " << obj->get_name() + << " for event_types: " << event_types << dendl; + return; + } + ret = notify->publish_commit(dpp, size, ceph::real_clock::now(), etag, + version_id); + if (ret < 0) { + ldpp_dout(dpp, 5) << "WARNING: notify publish_commit failed, with error: " + << ret << " for lc object: " << 
obj->get_name() + << " for event_types: " << event_types << dendl; + } +} + /* do all zones in the zone group process LC? */ static bool zonegroup_lc_check(const DoutPrefixProvider *dpp, rgw::sal::Zone* zone) { @@ -571,7 +600,6 @@ static int remove_expired_obj(const DoutPrefixProvider* dpp, auto& meta = o.meta; int ret; auto version_id = obj_key.instance; // deep copy, so not cleared below - std::unique_ptr notify; /* per discussion w/Daniel, Casey,and Eric, we *do need* * a new sal object handle, based on the following decision @@ -593,6 +621,7 @@ static int remove_expired_obj(const DoutPrefixProvider* dpp, if (obj->get_attr(RGW_ATTR_ETAG, bl)) { etag = rgw_bl_str(bl); } + auto size = obj->get_size(); std::unique_ptr del_op = obj->get_delete_op(); @@ -603,20 +632,6 @@ static int remove_expired_obj(const DoutPrefixProvider* dpp, del_op->params.bucket_owner = bucket_info.owner; del_op->params.unmod_since = meta.mtime; - // notification supported only for RADOS driver for now - notify = driver->get_notification( - dpp, obj.get(), nullptr, event_types, oc.bucket, lc_id, - const_cast(oc.bucket->get_tenant()), lc_req_id, null_yield); - - ret = notify->publish_reserve(dpp, nullptr); - if ( ret < 0) { - ldpp_dout(dpp, 1) - << "ERROR: notify reservation failed, deferring delete of object k=" - << o.key - << dendl; - return ret; - } - uint32_t flags = (!remove_indeed || !zonegroup_lc_check(dpp, oc.driver->get_zone())) ? 
rgw::sal::FLAG_LOG_OP : 0; ret = del_op->delete_obj(dpp, null_yield, flags); @@ -624,14 +639,8 @@ static int remove_expired_obj(const DoutPrefixProvider* dpp, ldpp_dout(dpp, 1) << fmt::format("ERROR: {} failed, with error: {}", __func__, ret) << dendl; } else { - // send request to notification manager - int publish_ret = notify->publish_commit(dpp, obj->get_size(), - ceph::real_clock::now(), - etag, - version_id); - if (publish_ret < 0) { - ldpp_dout(dpp, 5) << "WARNING: notify publish_commit failed, with error: " << publish_ret << dendl; - } + send_notification(dpp, driver, obj.get(), oc.bucket, etag, size, version_id, + event_types); } return ret; @@ -880,8 +889,6 @@ int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target, params.ns = RGW_OBJ_NS_MULTIPART; params.access_list_filter = MultipartMetaFilter; - const auto event_type = rgw::notify::ObjectExpirationAbortMPU; - auto pf = [&](RGWLC::LCWorker *wk, WorkQ *wq, WorkItem &wi) { int ret{0}; auto wt = boost::get>(wi); @@ -901,36 +908,13 @@ int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target, if (sal_obj->get_attr(RGW_ATTR_ETAG, bl)) { etag = rgw_bl_str(bl); } - - std::unique_ptr notify - = driver->get_notification( - this, sal_obj.get(), nullptr, {event_type}, target, lc_id, - const_cast(target->get_tenant()), lc_req_id, - null_yield); - auto version_id = obj.key.instance; - - ret = notify->publish_reserve(this, nullptr); - if (ret < 0) { - ldpp_dout(wk->get_lc(), 0) - << "ERROR: reserving persistent notification for " - "abort_multipart_upload, ret=" - << ret << ", thread:" << wq->thr_name() - << ", deferring mpu cleanup for meta:" << obj.key << dendl; - return ret; - } + auto size = sal_obj->get_size(); ret = mpu->abort(this, cct, null_yield); if (ret == 0) { - int publish_ret = notify->publish_commit( - this, sal_obj->get_size(), - ceph::real_clock::now(), - etag, - version_id); - if (publish_ret < 0) { - ldpp_dout(wk->get_lc(), 5) - << "WARNING: notify publish_commit failed, with error: 
" << ret - << dendl; - } + const auto event_type = rgw::notify::ObjectExpirationAbortMPU; + send_notification(this, driver, sal_obj.get(), target, etag, size, + obj.key.instance, {event_type}); if (perfcounter) { perfcounter->inc(l_rgw_lc_abort_mpu, 1); } @@ -1420,31 +1404,7 @@ public: if (obj->get_attr(RGW_ATTR_ETAG, bl)) { etag = rgw_bl_str(bl); } - - rgw::notify::EventTypeList event_types; - if (bucket->versioned() && oc.o.is_current() && !oc.o.is_delete_marker()) { - event_types.insert(event_types.end(), - {rgw::notify::ObjectTransitionCurrent, - rgw::notify::LifecycleTransition}); - } else { - event_types.push_back(rgw::notify::ObjectTransitionNonCurrent); - } - - std::unique_ptr notify = - oc.driver->get_notification( - oc.dpp, obj.get(), nullptr, event_types, bucket, lc_id, - const_cast(oc.bucket->get_tenant()), lc_req_id, - null_yield); - auto version_id = oc.o.key.instance; - - ret = notify->publish_reserve(oc.dpp, nullptr); - if (ret < 0) { - ldpp_dout(oc.dpp, 1) - << "ERROR: notify reservation failed, deferring transition of object k=" - << oc.o.key - << dendl; - return ret; - } + auto size = obj->get_size(); ret = oc.obj->transition_to_cloud(oc.bucket, oc.tier.get(), oc.o, oc.env.worker->get_cloud_targets(), @@ -1453,15 +1413,17 @@ public: if (ret < 0) { return ret; } else { - // send request to notification manager - int publish_ret = notify->publish_commit(oc.dpp, obj->get_size(), - ceph::real_clock::now(), - etag, - version_id); - if (publish_ret < 0) { - ldpp_dout(oc.dpp, 5) << - "WARNING: notify publish_commit failed, with error: " << publish_ret << dendl; + rgw::notify::EventTypeList event_types; + if (bucket->versioned() && oc.o.is_current() && + !oc.o.is_delete_marker()) { + event_types.insert(event_types.end(), + {rgw::notify::ObjectTransitionCurrent, + rgw::notify::LifecycleTransition}); + } else { + event_types.push_back(rgw::notify::ObjectTransitionNonCurrent); } + send_notification(oc.dpp, oc.driver, obj.get(), oc.bucket, etag, size, + 
oc.o.key.instance, event_types); } if (delete_object) { -- 2.39.5