]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/topic: metadata handler adds/removes persistent queues
authorCasey Bodley <cbodley@redhat.com>
Thu, 18 Jan 2024 20:34:16 +0000 (15:34 -0500)
committerCasey Bodley <cbodley@redhat.com>
Tue, 5 Mar 2024 17:55:27 +0000 (12:55 -0500)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/driver/rados/topic.cc

index 3e409e0b07b3f477e580afdda06eef293794592f..86ce6bb819b4d5faa132c7023c4eb24bc9d3befd 100644 (file)
@@ -18,6 +18,7 @@
 #include "rgw_common.h"
 #include "rgw_metadata.h"
 #include "rgw_metadata_lister.h"
+#include "rgw_notify.h"
 #include "rgw_pubsub.h"
 #include "rgw_rados.h"
 #include "rgw_string.h"
@@ -315,15 +316,39 @@ class MetadataHandler : public RGWMetadataHandler {
     auto mtime = robj->get_mtime();
 
     constexpr bool exclusive = false;
-    int ret = write(dpp, y, sysobj, &mdlog, zone, info,
-                    objv_tracker, mtime, exclusive);
-    return ret < 0 ? ret : STATUS_APPLIED;
+    int r = write(dpp, y, sysobj, &mdlog, zone, info,
+                  objv_tracker, mtime, exclusive);
+    if (r < 0) {
+      return r;
+    }
+    if (!info.dest.push_endpoint.empty() && info.dest.persistent) {
+      r = rgw::notify::add_persistent_topic(info.name, y);
+      if (r < 0) {
+        ldpp_dout(dpp, 1) << "ERROR: failed to create queue for persistent topic "
+            << info.name << " with: " << cpp_strerror(r) << dendl;
+        return r;
+      }
+    }
+    return STATUS_APPLIED;
   }
 
   int remove(std::string& entry, RGWObjVersionTracker& objv_tracker,
              optional_yield y, const DoutPrefixProvider *dpp) override
   {
-    return topic::remove(dpp, y, sysobj, &mdlog, zone, entry, objv_tracker);
+    int r = topic::remove(dpp, y, sysobj, &mdlog, zone, entry, objv_tracker);
+    if (r < 0) {
+      return r;
+    }
+    // delete persistent topic queue. expect ENOENT for non-persistent topics
+    std::string name;
+    std::string tenant;
+    parse_topic_metadata_key(entry, tenant, name);
+    r = rgw::notify::remove_persistent_topic(name, y);
+    if (r < 0 && r != -ENOENT) {
+      ldpp_dout(dpp, 1) << "Failed to delete queue for persistent topic: "
+                        << name << " with error: " << r << dendl;
+    } // not fatal
+    return 0;
   }
 
   int mutate(const std::string& entry, const ceph::real_time& mtime,