From: Casey Bodley Date: Thu, 28 Mar 2024 17:29:37 +0000 (-0400) Subject: rgw/notify: publish functions use rgw_pubsub_dest::persistent_queue X-Git-Tag: v19.1.0~99^2~6 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=6bc65dd55b287e20c69b3e938e0f292cfcba1d3d;p=ceph.git rgw/notify: publish functions use rgw_pubsub_dest::persistent_queue Signed-off-by: Casey Bodley (cherry picked from commit 64ac1de01f7f26c7af3da481ec522ca7dd188c55) --- diff --git a/src/rgw/driver/rados/rgw_notify.cc b/src/rgw/driver/rados/rgw_notify.cc index ba3312e2cc2d9..27577bfe72823 100644 --- a/src/rgw/driver/rados/rgw_notify.cc +++ b/src/rgw/driver/rados/rgw_notify.cc @@ -1002,9 +1002,9 @@ int publish_reserve(const DoutPrefixProvider* dpp, return rc; } } - for (const auto& bucket_topic : bucket_topics.topics) { - const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second; - const rgw_pubsub_topic& topic_cfg = topic_filter.topic; + for (auto& bucket_topic : bucket_topics.topics) { + rgw_pubsub_topic_filter& topic_filter = bucket_topic.second; + rgw_pubsub_topic& topic_cfg = topic_filter.topic; for (auto& event_type : event_types) { if (!notification_match(res, topic_filter, event_type, req_tags)) { // notification does not apply to req_state @@ -1017,6 +1017,26 @@ int publish_reserve(const DoutPrefixProvider* dpp, << "') apply to event of type: '" << to_string(event_type) << "'" << dendl; + // reload the topic in case it changed since the notification was added + const RGWPubSub ps(res.store, res.user_tenant, site); + int ret = ps.get_topic(res.dpp, topic_cfg.dest.arn_topic, + topic_cfg, res.yield, nullptr); + if (ret < 0) { + ldpp_dout(res.dpp, 1) + << "INFO: failed to load topic: " << topic_cfg.dest.arn_topic + << ". error: " << ret + << " while reserving persistent notification event" << dendl; + if (ret == -ENOENT) { + // either the topic is deleted but the corresponding notification + // still exist or in v2 mode the notification could have synced first + // but topic is not synced yet. + return 0; + } + ldpp_dout(res.dpp, 1) + << "WARN: Using the stored topic from bucket notification struct." + << dendl; + } + cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID; if (topic_cfg.dest.persistent) { // TODO: take default reservation size from conf @@ -1025,7 +1045,7 @@ int publish_reserve(const DoutPrefixProvider* dpp, librados::ObjectWriteOperation op; bufferlist obl; int rval; - const auto& queue_name = topic_cfg.dest.arn_topic; + const auto& queue_name = topic_cfg.dest.persistent_queue; cls_2pc_queue_reserve(op, res.size, 1, &obl, &rval); auto ret = rgw_rados_operate( res.dpp, res.store->getRados()->get_notif_pool_ctx(), queue_name, @@ -1045,31 +1065,8 @@ int publish_reserve(const DoutPrefixProvider* dpp, return ret; } } - // load the topic,if there is change in topic config while it's stored in - // notification. - rgw_pubsub_topic result; - const RGWPubSub ps(res.store, res.user_tenant, site); - auto ret = - ps.get_topic(res.dpp, topic_cfg.dest.arn_topic, result, res.yield, nullptr); - if (ret < 0) { - ldpp_dout(res.dpp, 1) - << "INFO: failed to load topic: " << topic_cfg.name - << ". error: " << ret - << " while reserving persistent notification event" << dendl; - if (ret == -ENOENT) { - // either the topic is deleted but the corresponding notification - // still exist or in v2 mode the notification could have synced first - // but topic is not synced yet. - return 0; - } - ldpp_dout(res.dpp, 1) - << "WARN: Using the stored topic from bucket notification struct." - << dendl; - res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id, - event_type); - } else { - res.topics.emplace_back(topic_filter.s3_id, result, res_id, event_type); - } + + res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id, event_type); } } return 0; @@ -1105,7 +1102,7 @@ int publish_commit(rgw::sal::Object* obj, event_entry.retry_sleep_duration = topic.cfg.dest.retry_sleep_duration; bufferlist bl; encode(event_entry, bl); - const auto& queue_name = topic.cfg.dest.arn_topic; + const auto& queue_name = topic.cfg.dest.persistent_queue; if (bl.length() > res.size) { // try to make a larger reservation, fail only if this is not possible ldpp_dout(dpp, 5) << "WARNING: committed size: " << bl.length() @@ -1118,7 +1115,7 @@ int publish_commit(rgw::sal::Object* obj, cls_2pc_queue_abort(op, topic.res_id); auto ret = rgw_rados_operate( dpp, res.store->getRados()->get_notif_pool_ctx(), - topic.cfg.dest.arn_topic, &op, + queue_name, &op, res.yield); if (ret < 0) { ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: " @@ -1209,7 +1206,7 @@ int publish_abort(reservation_t& res) { // nothing to abort or already committed/aborted continue; } - const auto& queue_name = topic.cfg.dest.arn_topic; + const auto& queue_name = topic.cfg.dest.persistent_queue; librados::ObjectWriteOperation op; cls_2pc_queue_abort(op, topic.res_id); const auto ret = rgw_rados_operate(