]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/notify: publish functions use rgw_pubsub_dest::persistent_queue
authorCasey Bodley <cbodley@redhat.com>
Thu, 28 Mar 2024 17:29:37 +0000 (13:29 -0400)
committerCasey Bodley <cbodley@redhat.com>
Fri, 12 Apr 2024 19:34:30 +0000 (15:34 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit 64ac1de01f7f26c7af3da481ec522ca7dd188c55)

src/rgw/driver/rados/rgw_notify.cc

index ba3312e2cc2d91c5286786a0046ce43d32539a1e..27577bfe7282327e8d68e8359bd0fedbbcf0c670 100644 (file)
@@ -1002,9 +1002,9 @@ int publish_reserve(const DoutPrefixProvider* dpp,
       return rc;
     }
   }
-  for (const auto& bucket_topic : bucket_topics.topics) {
-    const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second;
-    const rgw_pubsub_topic& topic_cfg = topic_filter.topic;
+  for (auto& bucket_topic : bucket_topics.topics) {
+    rgw_pubsub_topic_filter& topic_filter = bucket_topic.second;
+    rgw_pubsub_topic& topic_cfg = topic_filter.topic;
     for (auto& event_type : event_types) {
       if (!notification_match(res, topic_filter, event_type, req_tags)) {
         // notification does not apply to req_state
@@ -1017,6 +1017,26 @@ int publish_reserve(const DoutPrefixProvider* dpp,
           << "') apply to event of type: '" << to_string(event_type) << "'"
           << dendl;
 
+      // reload the topic in case it changed since the notification was added
+      const RGWPubSub ps(res.store, res.user_tenant, site);
+      int ret = ps.get_topic(res.dpp, topic_cfg.dest.arn_topic,
+                             topic_cfg, res.yield, nullptr);
+      if (ret < 0) {
+        ldpp_dout(res.dpp, 1)
+            << "INFO: failed to load topic: " << topic_cfg.dest.arn_topic
+            << ". error: " << ret
+            << " while reserving persistent notification event" << dendl;
+        if (ret == -ENOENT) {
+          // either the topic is deleted but the corresponding notification
+          // still exist or in v2 mode the notification could have synced first
+          // but topic is not synced yet.
+          return 0;
+        }
+        ldpp_dout(res.dpp, 1)
+            << "WARN: Using the stored topic from bucket notification struct."
+            << dendl;
+      }
+
       cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
       if (topic_cfg.dest.persistent) {
         // TODO: take default reservation size from conf
@@ -1025,7 +1045,7 @@ int publish_reserve(const DoutPrefixProvider* dpp,
         librados::ObjectWriteOperation op;
         bufferlist obl;
         int rval;
-        const auto& queue_name = topic_cfg.dest.arn_topic;
+        const auto& queue_name = topic_cfg.dest.persistent_queue;
         cls_2pc_queue_reserve(op, res.size, 1, &obl, &rval);
         auto ret = rgw_rados_operate(
             res.dpp, res.store->getRados()->get_notif_pool_ctx(), queue_name,
@@ -1045,31 +1065,8 @@ int publish_reserve(const DoutPrefixProvider* dpp,
           return ret;
         }
       }
-      // load the topic,if there is change in topic config while it's stored in
-      // notification.
-      rgw_pubsub_topic result;
-      const RGWPubSub ps(res.store, res.user_tenant, site);
-      auto ret =
-          ps.get_topic(res.dpp, topic_cfg.dest.arn_topic, result, res.yield, nullptr);
-      if (ret < 0) {
-        ldpp_dout(res.dpp, 1)
-            << "INFO: failed to load topic: " << topic_cfg.name
-            << ". error: " << ret
-            << " while reserving persistent notification event" << dendl;
-        if (ret == -ENOENT) {
-          // either the topic is deleted but the corresponding notification
-          // still exist or in v2 mode the notification could have synced first
-          // but topic is not synced yet.
-          return 0;
-        }
-        ldpp_dout(res.dpp, 1)
-            << "WARN: Using the stored topic from bucket notification struct."
-            << dendl;
-        res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id,
-                                event_type);
-      } else {
-        res.topics.emplace_back(topic_filter.s3_id, result, res_id, event_type);
-      }
+
+      res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id, event_type);
     }
   }
   return 0;
@@ -1105,7 +1102,7 @@ int publish_commit(rgw::sal::Object* obj,
       event_entry.retry_sleep_duration = topic.cfg.dest.retry_sleep_duration;
       bufferlist bl;
       encode(event_entry, bl);
-      const auto& queue_name = topic.cfg.dest.arn_topic;
+      const auto& queue_name = topic.cfg.dest.persistent_queue;
       if (bl.length() > res.size) {
         // try to make a larger reservation, fail only if this is not possible
         ldpp_dout(dpp, 5) << "WARNING: committed size: " << bl.length()
@@ -1118,7 +1115,7 @@ int publish_commit(rgw::sal::Object* obj,
         cls_2pc_queue_abort(op, topic.res_id);
         auto ret = rgw_rados_operate(
          dpp, res.store->getRados()->get_notif_pool_ctx(),
-         topic.cfg.dest.arn_topic, &op,
+         queue_name, &op,
          res.yield);
         if (ret < 0) {
           ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: "
@@ -1209,7 +1206,7 @@ int publish_abort(reservation_t& res) {
       // nothing to abort or already committed/aborted
       continue;
     }
-    const auto& queue_name = topic.cfg.dest.arn_topic;
+    const auto& queue_name = topic.cfg.dest.persistent_queue;
     librados::ObjectWriteOperation op;
     cls_2pc_queue_abort(op, topic.res_id);
     const auto ret = rgw_rados_operate(