From 64ac1de01f7f26c7af3da481ec522ca7dd188c55 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Thu, 28 Mar 2024 13:29:37 -0400 Subject: [PATCH] rgw/notify: publish functions use rgw_pubsub_dest::persistent_queue Signed-off-by: Casey Bodley --- src/rgw/driver/rados/rgw_notify.cc | 61 ++++++++++++++---------------- 1 file changed, 29 insertions(+), 32 deletions(-) diff --git a/src/rgw/driver/rados/rgw_notify.cc b/src/rgw/driver/rados/rgw_notify.cc index 0b235b5620ec5..11623dd48f8bb 100644 --- a/src/rgw/driver/rados/rgw_notify.cc +++ b/src/rgw/driver/rados/rgw_notify.cc @@ -1024,9 +1024,9 @@ int publish_reserve(const DoutPrefixProvider* dpp, return rc; } } - for (const auto& bucket_topic : bucket_topics.topics) { - const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second; - const rgw_pubsub_topic& topic_cfg = topic_filter.topic; + for (auto& bucket_topic : bucket_topics.topics) { + rgw_pubsub_topic_filter& topic_filter = bucket_topic.second; + rgw_pubsub_topic& topic_cfg = topic_filter.topic; for (auto& event_type : event_types) { if (!notification_match(res, topic_filter, event_type, req_tags)) { // notification does not apply to req_state @@ -1039,6 +1039,26 @@ int publish_reserve(const DoutPrefixProvider* dpp, << "') apply to event of type: '" << to_string(event_type) << "'" << dendl; + // reload the topic in case it changed since the notification was added + const RGWPubSub ps(res.store, res.user_tenant, site); + int ret = ps.get_topic(res.dpp, topic_cfg.dest.arn_topic, + topic_cfg, res.yield, nullptr); + if (ret < 0) { + ldpp_dout(res.dpp, 1) + << "INFO: failed to load topic: " << topic_cfg.dest.arn_topic + << ". error: " << ret + << " while reserving persistent notification event" << dendl; + if (ret == -ENOENT) { + // either the topic is deleted but the corresponding notification + // still exist or in v2 mode the notification could have synced first + // but topic is not synced yet. + return 0; + } + ldpp_dout(res.dpp, 1) + << "WARN: Using the stored topic from bucket notification struct." + << dendl; + } + cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID; if (topic_cfg.dest.persistent) { // TODO: take default reservation size from conf @@ -1047,7 +1067,7 @@ int publish_reserve(const DoutPrefixProvider* dpp, librados::ObjectWriteOperation op; bufferlist obl; int rval; - const auto& queue_name = topic_cfg.dest.arn_topic; + const auto& queue_name = topic_cfg.dest.persistent_queue; cls_2pc_queue_reserve(op, res.size, 1, &obl, &rval); auto ret = rgw_rados_operate( res.dpp, res.store->getRados()->get_notif_pool_ctx(), queue_name, @@ -1067,31 +1087,8 @@ int publish_reserve(const DoutPrefixProvider* dpp, return ret; } } - // load the topic,if there is change in topic config while it's stored in - // notification. - rgw_pubsub_topic result; - const RGWPubSub ps(res.store, res.user_tenant, site); - auto ret = - ps.get_topic(res.dpp, topic_cfg.dest.arn_topic, result, res.yield, nullptr); - if (ret < 0) { - ldpp_dout(res.dpp, 1) - << "INFO: failed to load topic: " << topic_cfg.name - << ". error: " << ret - << " while reserving persistent notification event" << dendl; - if (ret == -ENOENT) { - // either the topic is deleted but the corresponding notification - // still exist or in v2 mode the notification could have synced first - // but topic is not synced yet. - return 0; - } - ldpp_dout(res.dpp, 1) - << "WARN: Using the stored topic from bucket notification struct." - << dendl; - res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id, - event_type); - } else { - res.topics.emplace_back(topic_filter.s3_id, result, res_id, event_type); - } + + res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id, event_type); } } return 0; @@ -1127,7 +1124,7 @@ int publish_commit(rgw::sal::Object* obj, event_entry.retry_sleep_duration = topic.cfg.dest.retry_sleep_duration; bufferlist bl; encode(event_entry, bl); - const auto& queue_name = topic.cfg.dest.arn_topic; + const auto& queue_name = topic.cfg.dest.persistent_queue; if (bl.length() > res.size) { // try to make a larger reservation, fail only if this is not possible ldpp_dout(dpp, 5) << "WARNING: committed size: " << bl.length() @@ -1140,7 +1137,7 @@ int publish_commit(rgw::sal::Object* obj, cls_2pc_queue_abort(op, topic.res_id); auto ret = rgw_rados_operate( dpp, res.store->getRados()->get_notif_pool_ctx(), - topic.cfg.dest.arn_topic, &op, + queue_name, &op, res.yield); if (ret < 0) { ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: " @@ -1224,7 +1221,7 @@ int publish_abort(reservation_t& res) { // nothing to abort or already committed/aborted continue; } - const auto& queue_name = topic.cfg.dest.arn_topic; + const auto& queue_name = topic.cfg.dest.persistent_queue; librados::ObjectWriteOperation op; cls_2pc_queue_abort(op, topic.res_id); const auto ret = rgw_rados_operate( -- 2.39.5