From 4f33f3c60d4fd34fc87be4c96e0720f264dafa37 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Thu, 18 Jan 2024 15:34:16 -0500 Subject: [PATCH] rgw/topic: metadata handler adds/removes persistent queues Signed-off-by: Casey Bodley (cherry picked from commit 633a5bef314b5fbc3087271d684f22f9beafb21a) --- src/rgw/driver/rados/topic.cc | 33 +++++++++++++++++++++++++++++---- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/src/rgw/driver/rados/topic.cc b/src/rgw/driver/rados/topic.cc index 3e409e0b07b3f..86ce6bb819b4d 100644 --- a/src/rgw/driver/rados/topic.cc +++ b/src/rgw/driver/rados/topic.cc @@ -18,6 +18,7 @@ #include "rgw_common.h" #include "rgw_metadata.h" #include "rgw_metadata_lister.h" +#include "rgw_notify.h" #include "rgw_pubsub.h" #include "rgw_rados.h" #include "rgw_string.h" @@ -315,15 +316,39 @@ class MetadataHandler : public RGWMetadataHandler { auto mtime = robj->get_mtime(); constexpr bool exclusive = false; - int ret = write(dpp, y, sysobj, &mdlog, zone, info, - objv_tracker, mtime, exclusive); - return ret < 0 ? ret : STATUS_APPLIED; + int r = write(dpp, y, sysobj, &mdlog, zone, info, + objv_tracker, mtime, exclusive); + if (r < 0) { + return r; + } + if (!info.dest.push_endpoint.empty() && info.dest.persistent) { + r = rgw::notify::add_persistent_topic(info.name, y); + if (r < 0) { + ldpp_dout(dpp, 1) << "ERROR: failed to create queue for persistent topic " + << info.name << " with: " << cpp_strerror(r) << dendl; + return r; + } + } + return STATUS_APPLIED; } int remove(std::string& entry, RGWObjVersionTracker& objv_tracker, optional_yield y, const DoutPrefixProvider *dpp) override { - return topic::remove(dpp, y, sysobj, &mdlog, zone, entry, objv_tracker); + int r = topic::remove(dpp, y, sysobj, &mdlog, zone, entry, objv_tracker); + if (r < 0) { + return r; + } + // delete persistent topic queue. expect ENOENT for non-persistent topics + std::string name; + std::string tenant; + parse_topic_metadata_key(entry, tenant, name); + r = rgw::notify::remove_persistent_topic(name, y); + if (r < 0 && r != -ENOENT) { + ldpp_dout(dpp, 1) << "Failed to delete queue for persistent topic: " + << name << " with error: " << r << dendl; + } // not fatal + return 0; } int mutate(const std::string& entry, const ceph::real_time& mtime, -- 2.39.5