]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/pubsub: use rgw_pubsub_dest::persistent_queue for queue oid
authorCasey Bodley <cbodley@redhat.com>
Wed, 20 Mar 2024 18:16:15 +0000 (14:16 -0400)
committerCasey Bodley <cbodley@redhat.com>
Fri, 12 Apr 2024 19:34:30 +0000 (15:34 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit 4bac81a2bc540b110003b7e5a86c8670ea2a2cc8)

src/rgw/driver/rados/rgw_notify.cc
src/rgw/driver/rados/rgw_notify.h
src/rgw/driver/rados/topic.cc
src/rgw/rgw_admin.cc
src/rgw/rgw_pubsub.cc
src/rgw/rgw_rest_pubsub.cc

index 275bfc1d72345329ce64653c741b52a423e89740..ba3312e2cc2d91c5286786a0046ce43d32539a1e 100644 (file)
@@ -1226,18 +1226,19 @@ int publish_abort(reservation_t& res) {
   return 0;
 }
 
-int get_persistent_queue_stats_by_topic_name(const DoutPrefixProvider *dpp, librados::IoCtx &rados_ioctx,
-                                             const std::string &topic_name, rgw_topic_stats &stats, optional_yield y)
+int get_persistent_queue_stats(const DoutPrefixProvider *dpp, librados::IoCtx &rados_ioctx,
+                               const std::string &queue_name, rgw_topic_stats &stats, optional_yield y)
 {
+  // TODO: use optional_yield instead calling rados_ioctx.operate() synchronously
   cls_2pc_reservations reservations;
-  auto ret = cls_2pc_queue_list_reservations(rados_ioctx, topic_name, reservations);
+  auto ret = cls_2pc_queue_list_reservations(rados_ioctx, queue_name, reservations);
   if (ret < 0) {
     ldpp_dout(dpp, 1) << "ERROR: failed to read queue list reservation: " << ret << dendl;
     return ret;
   }
   stats.queue_reservations = reservations.size();
 
-  ret = cls_2pc_queue_get_topic_stats(rados_ioctx, topic_name, stats.queue_entries, stats.queue_size);
+  ret = cls_2pc_queue_get_topic_stats(rados_ioctx, queue_name, stats.queue_entries, stats.queue_size);
   if (ret < 0) {
     ldpp_dout(dpp, 1) << "ERROR: failed to get the queue size or the number of entries: " << ret << dendl;
     return ret;
index 87e9791053725dde952e9ada8f983a00d67d61f2..7014cda3ca35011ce93d151503545d931896c44b 100644 (file)
@@ -134,8 +134,8 @@ int publish_commit(rgw::sal::Object* obj,
 // cancel the reservation
 int publish_abort(reservation_t& reservation);
 
-int get_persistent_queue_stats_by_topic_name(const DoutPrefixProvider *dpp, librados::IoCtx &rados_ioctx,
-                                             const std::string &topic_name, rgw_topic_stats &stats, optional_yield y);
+int get_persistent_queue_stats(const DoutPrefixProvider *dpp, librados::IoCtx &rados_ioctx,
+                               const std::string &queue_name, rgw_topic_stats &stats, optional_yield y);
 
 }
 
index 3e43a67012e99a61766dc8db90f44829404f2a6b..7769649fbbfe767907fdfd9cb616b6eba6cf1c04 100644 (file)
@@ -352,11 +352,12 @@ class MetadataHandler : public RGWMetadataHandler {
     if (r < 0) {
       return r;
     }
-    if (!info.dest.push_endpoint.empty() && info.dest.persistent) {
-      r = rgw::notify::add_persistent_topic(info.name, y);
+    if (!info.dest.push_endpoint.empty() && info.dest.persistent &&
+        !info.dest.persistent_queue.empty()) {
+      r = rgw::notify::add_persistent_topic(info.dest.persistent_queue, y);
       if (r < 0) {
         ldpp_dout(dpp, 1) << "ERROR: failed to create queue for persistent topic "
-            << info.name << " with: " << cpp_strerror(r) << dendl;
+            << info.dest.persistent_queue << " with: " << cpp_strerror(r) << dendl;
         return r;
       }
     }
@@ -370,18 +371,29 @@ class MetadataHandler : public RGWMetadataHandler {
     std::string tenant;
     parse_topic_metadata_key(entry, tenant, name);
 
-    int r = topic::remove(dpp, y, sysobj, &mdlog, rados, zone,
-                          tenant, name, objv_tracker);
+    rgw_pubsub_topic info;
+    int r = read(dpp, y, sysobj, cache_svc, zone, entry,
+                 info, cache, nullptr, &objv_tracker);
     if (r < 0) {
       return r;
     }
 
-    // delete persistent topic queue. expect ENOENT for non-persistent topics
-    r = rgw::notify::remove_persistent_topic(name, y);
-    if (r < 0 && r != -ENOENT) {
-      ldpp_dout(dpp, 1) << "Failed to delete queue for persistent topic: "
-                        << name << " with error: " << r << dendl;
-    } // not fatal
+    r = topic::remove(dpp, y, sysobj, &mdlog, rados, zone,
+                      tenant, name, objv_tracker);
+    if (r < 0) {
+      return r;
+    }
+
+    const rgw_pubsub_dest& dest = info.dest;
+    if (!dest.push_endpoint.empty() && dest.persistent &&
+        !dest.persistent_queue.empty()) {
+      // delete persistent topic queue
+      r = rgw::notify::remove_persistent_topic(dest.persistent_queue, y);
+      if (r < 0 && r != -ENOENT) {
+        ldpp_dout(dpp, 1) << "Failed to delete queue for persistent topic: "
+                          << name << " with error: " << r << dendl;
+      } // not fatal
+    }
     return 0;
   }
 
index 127a225ea363e1ef940575700058268b8fe84ad2..b9dc45a7418592bee4c7796b65ffc7a98668e3a4 100644 (file)
@@ -11238,11 +11238,25 @@ next:
       cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
       return EINVAL;
     }
+    const std::string& account = !account_id.empty() ? account_id : tenant;
+    RGWPubSub ps(driver, account, *site);
+
+    rgw_pubsub_topic topic;
+    ret = ps.get_topic(dpp(), topic_name, topic, null_yield, nullptr);
+    if (ret < 0) {
+      cerr << "ERROR: could not get topic: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+
+    if (topic.dest.persistent_queue.empty()) {
+      cerr << "This topic does not have a persistent queue." << std::endl;
+      return ENOENT;
+    }
 
     rgw::notify::rgw_topic_stats stats;
-    ret = rgw::notify::get_persistent_queue_stats_by_topic_name(
-        dpp(), static_cast<rgw::sal::RadosStore *>(driver)->getRados()->get_notif_pool_ctx(), topic_name,
-        stats, null_yield);
+    ret = rgw::notify::get_persistent_queue_stats(
+        dpp(), static_cast<rgw::sal::RadosStore *>(driver)->getRados()->get_notif_pool_ctx(),
+        topic.dest.persistent_queue, stats, null_yield);
     if (ret < 0) {
       cerr << "ERROR: could not get persistent queue: " << cpp_strerror(-ret) << std::endl;
       return -ret;
index 641381e8b6b9f8d04cc5abb6335492eb9f6a62cc..e99d36017942f9f46a91ec679bd8563aab085e72 100644 (file)
@@ -1083,7 +1083,7 @@ int RGWPubSub::remove_topic_v2(const DoutPrefixProvider* dpp,
   const rgw_pubsub_dest& dest = topic.dest;
   if (!dest.push_endpoint.empty() && dest.persistent &&
       !dest.persistent_queue.empty()) {
-    ret = rgw::notify::remove_persistent_topic(topic.name, y);
+    ret = rgw::notify::remove_persistent_topic(dest.persistent_queue, y);
     if (ret < 0 && ret != -ENOENT) {
       ldpp_dout(dpp, 1) << "WARNING: failed to remove queue for "
           "persistent topic: " << cpp_strerror(ret) << dendl;
@@ -1130,7 +1130,7 @@ int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const std::string& na
 
   if (!dest.push_endpoint.empty() && dest.persistent &&
       !dest.persistent_queue.empty()) {
-    ret = rgw::notify::remove_persistent_topic(name, y);
+    ret = rgw::notify::remove_persistent_topic(dest.persistent_queue, y);
     if (ret < 0 && ret != -ENOENT) {
       ldpp_dout(dpp, 1) << "WARNING: failed to remove queue for "
           "persistent topic: " << cpp_strerror(ret) << dendl;
index ff5c824a6be6d87a26cdcc9a16a9ff5d86e77198..ca54d0c324f85f54a3075c4294e3aae971c4c254 100644 (file)
@@ -111,6 +111,11 @@ bool topics_has_endpoint_secret(const rgw_pubsub_topics& topics) {
     return false;
 }
 
+static bool topic_needs_queue(const rgw_pubsub_dest& dest)
+{
+  return !dest.push_endpoint.empty() && dest.persistent;
+}
+
 auto get_policy_from_text(req_state* const s, const std::string& policy_text)
   -> boost::optional<rgw::IAM::Policy>
 {
@@ -306,12 +311,6 @@ class RGWPSCreateTopicOp : public RGWOp {
     ret = ps.get_topic(this, topic_name, result, y, nullptr);
     if (ret == -ENOENT) {
       // topic not present
-
-      // initialize the persistent queue's location. this cannot change for
-      // existing topics. use ':' as the namespace delimiter because its
-      // inclusion in a TopicName would break ARNs
-      dest.persistent_queue = string_cat_reserve(
-          get_account_or_tenant(s->owner.id), ":", topic_name);
     } else if (ret < 0) {
       ldpp_dout(this, 1) << "failed to read topic '" << topic_name
           << "', with error:" << ret << dendl;
@@ -387,8 +386,16 @@ void RGWPSCreateTopicOp::execute(optional_yield y) {
       return;
     }
   }
-  if (!dest.push_endpoint.empty() && dest.persistent) {
-    op_ret = rgw::notify::add_persistent_topic(topic_name, s->yield);
+
+  // don't add a persistent queue if we already have one
+  const bool already_persistent = topic && topic_needs_queue(topic->dest);
+  if (!already_persistent && topic_needs_queue(dest)) {
+    // initialize the persistent queue's location, using ':' as the namespace
+    // delimiter because its inclusion in a TopicName would break ARNs
+    dest.persistent_queue = string_cat_reserve(
+        get_account_or_tenant(s->owner.id), ":", topic_name);
+
+    op_ret = rgw::notify::add_persistent_topic(dest.persistent_queue, s->yield);
     if (op_ret < 0) {
       ldpp_dout(this, 1) << "CreateTopic Action failed to create queue for "
                             "persistent topics. error:"
@@ -838,8 +845,15 @@ void RGWPSSetTopicAttributesOp::execute(optional_yield y) {
       return;
     }
   }
-  if (!dest.push_endpoint.empty() && dest.persistent) {
-    op_ret = rgw::notify::add_persistent_topic(topic_name, s->yield);
+  // don't add a persistent queue if we already have one
+  const bool already_persistent = topic_needs_queue(result.dest);
+  if (!already_persistent && topic_needs_queue(dest)) {
+    // initialize the persistent queue's location, using ':' as the namespace
+    // delimiter because its inclusion in a TopicName would break ARNs
+    dest.persistent_queue = string_cat_reserve(
+        get_account_or_tenant(s->owner.id), ":", topic_name);
+
+    op_ret = rgw::notify::add_persistent_topic(dest.persistent_queue, s->yield);
     if (op_ret < 0) {
       ldpp_dout(this, 4)
           << "SetTopicAttributes Action failed to create queue for "
@@ -847,8 +861,9 @@ void RGWPSSetTopicAttributesOp::execute(optional_yield y) {
           << op_ret << dendl;
       return;
     }
-  } else {  // changing the persistent topic to non-persistent.
-    op_ret = rgw::notify::remove_persistent_topic(topic_name, s->yield);
+  } else if (already_persistent) {
+    // changing the persistent topic to non-persistent.
+    op_ret = rgw::notify::remove_persistent_topic(result.dest.persistent_queue, s->yield);
     if (op_ret != -ENOENT && op_ret < 0) {
       ldpp_dout(this, 4) << "SetTopicAttributes Action failed to remove queue "
                             "for persistent topics. error:"