From: kchheda3 Date: Tue, 14 May 2024 18:57:55 +0000 (-0400) Subject: rgw/lifecycle-notification: Do not block lc processing for notification errors. X-Git-Tag: v19.1.1~50^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=113e500338d757a0c2dc73b409adf8d2b99e2aec;p=ceph.git rgw/lifecycle-notification: Do not block lc processing for notification errors. Currently if there is any error while calling publish_reserve the lc processing is cancelled for that object. This is different from behavior we have for replication events where the notification errors are not blocking replication. On similar note, lc being internal ceph processing, notification error's should not block the lc processing. Signed-off-by: kchheda3 (cherry picked from commit 87084c755f62870132a04eb810552fdcd1afeb0c) --- diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc index cfc9ce635488..cb1b92a04e7b 100644 --- a/src/rgw/rgw_lc.cc +++ b/src/rgw/rgw_lc.cc @@ -539,6 +539,35 @@ static bool pass_size_limit_checks(const DoutPrefixProvider *dpp, lc_op_ctx& oc) static std::string lc_id = "rgw lifecycle"; static std::string lc_req_id = "0"; +static void send_notification(const DoutPrefixProvider* dpp, + rgw::sal::Driver* driver, + rgw::sal::Object* obj, + rgw::sal::Bucket* bucket, + const std::string& etag, + uint64_t size, + const std::string& version_id, + const rgw::notify::EventTypeList& event_types) { + // notification supported only for RADOS driver for now + auto notify = driver->get_notification( + dpp, obj, nullptr, event_types, bucket, lc_id, + const_cast(bucket->get_tenant()), lc_req_id, null_yield); + + int ret = notify->publish_reserve(dpp, nullptr); + if (ret < 0) { + ldpp_dout(dpp, 1) << "ERROR: notify publish_reserve failed, with error: " + << ret << " for lc object: " << obj->get_name() + << " for event_types: " << event_types << dendl; + return; + } + ret = notify->publish_commit(dpp, size, ceph::real_clock::now(), etag, + version_id); + if (ret < 0) { + ldpp_dout(dpp, 5) << "WARNING: notify publish_commit failed, with error: " + << ret << " for lc object: " << obj->get_name() + << " for event_types: " << event_types << dendl; + } +} + /* do all zones in the zone group process LC? */ static bool zonegroup_lc_check(const DoutPrefixProvider *dpp, rgw::sal::Zone* zone) { @@ -572,7 +601,6 @@ static int remove_expired_obj(const DoutPrefixProvider* dpp, auto& meta = o.meta; int ret; auto version_id = obj_key.instance; // deep copy, so not cleared below - std::unique_ptr notify; /* per discussion w/Daniel, Casey,and Eric, we *do need* * a new sal object handle, based on the following decision @@ -595,6 +623,7 @@ static int remove_expired_obj(const DoutPrefixProvider* dpp, if (iter != obj_state->attrset.end()) { etag = rgw_bl_str(iter->second); } + auto size = obj->get_obj_size(); std::unique_ptr del_op = obj->get_delete_op(); @@ -605,20 +634,6 @@ static int remove_expired_obj(const DoutPrefixProvider* dpp, del_op->params.bucket_owner = bucket_info.owner; del_op->params.unmod_since = meta.mtime; - // notification supported only for RADOS driver for now - notify = driver->get_notification( - dpp, obj.get(), nullptr, event_types, oc.bucket, lc_id, - const_cast(oc.bucket->get_tenant()), lc_req_id, null_yield); - - ret = notify->publish_reserve(dpp, nullptr); - if ( ret < 0) { - ldpp_dout(dpp, 1) - << "ERROR: notify reservation failed, deferring delete of object k=" - << o.key - << dendl; - return ret; - } - uint32_t flags = (!remove_indeed || !zonegroup_lc_check(dpp, oc.driver->get_zone())) ? rgw::sal::FLAG_LOG_OP : 0; ret = del_op->delete_obj(dpp, null_yield, flags); @@ -626,14 +641,8 @@ static int remove_expired_obj(const DoutPrefixProvider* dpp, ldpp_dout(dpp, 1) << fmt::format("ERROR: {} failed, with error: {}", __func__, ret) << dendl; } else { - // send request to notification manager - int publish_ret = notify->publish_commit(dpp, obj_state->size, - ceph::real_clock::now(), - etag, - version_id); - if (publish_ret < 0) { - ldpp_dout(dpp, 5) << "WARNING: notify publish_commit failed, with error: " << publish_ret << dendl; - } + send_notification(dpp, driver, obj.get(), oc.bucket, etag, size, version_id, + event_types); } return ret; @@ -882,8 +891,6 @@ int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target, params.ns = RGW_OBJ_NS_MULTIPART; params.access_list_filter = MultipartMetaFilter; - const auto event_type = rgw::notify::ObjectExpirationAbortMPU; - auto pf = [&](RGWLC::LCWorker *wk, WorkQ *wq, WorkItem &wi) { int ret{0}; auto wt = boost::get>(wi); @@ -904,36 +911,13 @@ int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target, if (iter != obj_state->attrset.end()) { etag = rgw_bl_str(iter->second); } - - std::unique_ptr notify - = driver->get_notification( - this, sal_obj.get(), nullptr, {event_type}, target, lc_id, - const_cast(target->get_tenant()), lc_req_id, - null_yield); - auto version_id = obj.key.instance; - - ret = notify->publish_reserve(this, nullptr); - if (ret < 0) { - ldpp_dout(wk->get_lc(), 0) - << "ERROR: reserving persistent notification for " - "abort_multipart_upload, ret=" - << ret << ", thread:" << wq->thr_name() - << ", deferring mpu cleanup for meta:" << obj.key << dendl; - return ret; - } + auto size = sal_obj->get_obj_size(); ret = mpu->abort(this, cct, null_yield); if (ret == 0) { - int publish_ret = notify->publish_commit( - this, obj_state->size, - ceph::real_clock::now(), - etag, - version_id); - if (publish_ret < 0) { - ldpp_dout(wk->get_lc(), 5) - << "WARNING: notify publish_commit failed, with error: " << ret - << dendl; - } + const auto event_type = rgw::notify::ObjectExpirationAbortMPU; + send_notification(this, driver, sal_obj.get(), target, etag, size, + obj.key.instance, {event_type}); if (perfcounter) { perfcounter->inc(l_rgw_lc_abort_mpu, 1); } @@ -1429,6 +1413,7 @@ public: if (iter != obj_state->attrset.end()) { etag = rgw_bl_str(iter->second); } +<<<<<<< HEAD rgw::notify::EventTypeList event_types; if (bucket->versioned() && oc.o.is_current() && !oc.o.is_delete_marker()) { @@ -1445,15 +1430,7 @@ public: const_cast(oc.bucket->get_tenant()), lc_req_id, null_yield); auto version_id = oc.o.key.instance; - - ret = notify->publish_reserve(oc.dpp, nullptr); - if (ret < 0) { - ldpp_dout(oc.dpp, 1) - << "ERROR: notify reservation failed, deferring transition of object k=" - << oc.o.key - << dendl; - return ret; - } + auto size = obj->get_obj_size(); ret = oc.obj->transition_to_cloud(oc.bucket, oc.tier.get(), oc.o, oc.env.worker->get_cloud_targets(), @@ -1462,15 +1439,17 @@ public: if (ret < 0) { return ret; } else { - // send request to notification manager - int publish_ret = notify->publish_commit(oc.dpp, obj_state->size, - ceph::real_clock::now(), - etag, - version_id); - if (publish_ret < 0) { - ldpp_dout(oc.dpp, 5) << - "WARNING: notify publish_commit failed, with error: " << publish_ret << dendl; + rgw::notify::EventTypeList event_types; + if (bucket->versioned() && oc.o.is_current() && + !oc.o.is_delete_marker()) { + event_types.insert(event_types.end(), + {rgw::notify::ObjectTransitionCurrent, + rgw::notify::LifecycleTransition}); + } else { + event_types.push_back(rgw::notify::ObjectTransitionNonCurrent); } + send_notification(oc.dpp, oc.driver, obj.get(), oc.bucket, etag, size, + oc.o.key.instance, event_types); } if (delete_object) {