#include "rgw_common.h"
#include "rgw_metadata.h"
#include "rgw_metadata_lister.h"
+#include "rgw_notify.h"
#include "rgw_pubsub.h"
#include "rgw_rados.h"
#include "rgw_string.h"
auto mtime = robj->get_mtime();
constexpr bool exclusive = false;
- int ret = write(dpp, y, sysobj, &mdlog, zone, info,
- objv_tracker, mtime, exclusive);
- return ret < 0 ? ret : STATUS_APPLIED;
+ int r = write(dpp, y, sysobj, &mdlog, zone, info,
+ objv_tracker, mtime, exclusive);
+ if (r < 0) {
+ return r;
+ }
+ if (!info.dest.push_endpoint.empty() && info.dest.persistent) {
+ r = rgw::notify::add_persistent_topic(info.name, y);
+ if (r < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to create queue for persistent topic "
+ << info.name << " with: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+ }
+ return STATUS_APPLIED;
}
int remove(std::string& entry, RGWObjVersionTracker& objv_tracker,
optional_yield y, const DoutPrefixProvider *dpp) override
{
- return topic::remove(dpp, y, sysobj, &mdlog, zone, entry, objv_tracker);
+ int r = topic::remove(dpp, y, sysobj, &mdlog, zone, entry, objv_tracker);
+ if (r < 0) {
+ return r;
+ }
+ // delete persistent topic queue. expect ENOENT for non-persistent topics
+ std::string name;
+ std::string tenant;
+ parse_topic_metadata_key(entry, tenant, name);
+ r = rgw::notify::remove_persistent_topic(name, y);
+ if (r < 0 && r != -ENOENT) {
+ ldpp_dout(dpp, 1) << "Failed to delete queue for persistent topic: "
+ << name << " with error: " << r << dendl;
+ } // not fatal
+ return 0;
}
int mutate(const std::string& entry, const ceph::real_time& mtime,