}
}
- // bucket notifications
- rgw_pubsub_bucket_topics result;
- if (driver->get_zone()->get_zonegroup().supports_feature(
- rgw::zone_features::notification_v2)) {
- ret = get_bucket_notifications(dpp, bucket.get(), result);
- if (ret < 0) {
- cerr << "ERROR: could not get topics: " << cpp_strerror(-ret)
- << std::endl;
- return -ret;
- }
- } else {
- RGWPubSub ps(driver, tenant_name);
- const RGWPubSub::Bucket b(ps, bucket.get());
- ret = b.get_topics(dpp, result, y);
- if (ret < 0 && ret != -ENOENT) {
- cerr << "ERROR: could not get topics: " << cpp_strerror(-ret)
- << std::endl;
- return -ret;
- }
- }
- result.dump(formatter);
-
// TODO: bucket CORS
// TODO: bucket LC
formatter->close_section();
return ret;
}
+int update_bucket_topic_mappings(const DoutPrefixProvider* dpp,
+ RGWBucketCompleteInfo* orig_bci,
+ RGWBucketCompleteInfo* current_bci,
+ rgw::sal::Driver* driver) {
+ const auto decode_attrs = [](const rgw::sal::Attrs& attrs,
+ rgw_pubsub_bucket_topics& bucket_topics) -> int {
+ auto iter = attrs.find(RGW_ATTR_BUCKET_NOTIFICATION);
+ if (iter == attrs.end()) {
+ return 0;
+ }
+ try {
+ const auto& bl = iter->second;
+ auto biter = bl.cbegin();
+ bucket_topics.decode(biter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+ return 0;
+ };
+ std::string bucket_name;
+ std::string bucket_tenant;
+ rgw_pubsub_bucket_topics old_bucket_topics;
+ if (orig_bci) {
+ auto ret = decode_attrs(orig_bci->attrs, old_bucket_topics);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1)
+ << "ERROR: failed to decode OLD bucket topics for bucket: "
+ << orig_bci->info.bucket.name << dendl;
+ return ret;
+ }
+ bucket_name = orig_bci->info.bucket.name;
+ bucket_tenant = orig_bci->info.bucket.tenant;
+ }
+ rgw_pubsub_bucket_topics current_bucket_topics;
+ if (current_bci) {
+ auto ret = decode_attrs(current_bci->attrs, current_bucket_topics);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1)
+ << "ERROR: failed to decode current bucket topics for bucket: "
+ << current_bci->info.bucket.name << dendl;
+ return ret;
+ }
+ bucket_name = current_bci->info.bucket.name;
+ bucket_tenant = current_bci->info.bucket.tenant;
+ }
+ // fetch the list of subscribed topics stored inside old_bucket attrs.
+ std::unordered_map<std::string, rgw_pubsub_topic> old_topics;
+ for (const auto& [_, topic_filter] : old_bucket_topics.topics) {
+ old_topics[topic_filter.topic.name] = topic_filter.topic;
+ }
+ // fetch the list of subscribed topics stored inside current_bucket attrs.
+ std::unordered_map<std::string, rgw_pubsub_topic> current_topics;
+ for (const auto& [_, topic_filter] : current_bucket_topics.topics) {
+ current_topics[topic_filter.topic.name] = topic_filter.topic;
+ }
+ // traverse thru old topics and check if they are not in current, then delete
+ // the mapping, if present in both current and old then delete from current
+ // set as we do not need to update those mapping.
+ int ret = 0;
+ for (const auto& [topic_name, topic] : old_topics) {
+ auto it = current_topics.find(topic_name);
+ if (it == current_topics.end()) {
+ const auto op_ret = driver->update_bucket_topic_mapping(
+ topic, rgw_make_bucket_entry_name(bucket_tenant, bucket_name),
+ /*add_mapping=*/false, null_yield, dpp);
+ if (op_ret < 0) {
+ ret = op_ret;
+ }
+ } else {
+ // already that attr is present, so do not update the mapping.
+ current_topics.erase(it);
+ }
+ }
+ // traverse thru current topics and check if they are any present, then add
+ // the mapping.
+ for (const auto& [topic_name, topic] : current_topics) {
+ const auto op_ret = driver->update_bucket_topic_mapping(
+ topic, rgw_make_bucket_entry_name(bucket_tenant, bucket_name),
+ /*add_mapping=*/true, null_yield, dpp);
+ if (op_ret < 0) {
+ ret = op_ret;
+ }
+ }
+ return ret;
+}
+
static void get_md5_digest(const RGWBucketEntryPoint *be, string& md5_digest) {
char md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
if (ret < 0 && ret != -ENOENT)
return ret;
- return svc.bucket->remove_bucket_instance_info(ctx, entry, bci.info, &bci.info.objv_tracker, y, dpp);
+ ret = svc.bucket->remove_bucket_instance_info(
+ ctx, entry, bci.info, &bci.info.objv_tracker, y, dpp);
+ if (ret < 0)
+ return ret;
+ ret = update_bucket_topic_mappings(dpp, &bci, /*current_bci=*/nullptr,
+ driver);
+ // update_bucket_topic_mapping error is swallowed.
+ return 0;
}
int call(std::function<int(RGWSI_Bucket_BI_Ctx& ctx)> f) {
}
} /* update lc */
+ /* update bucket topic mapping */
+ {
+ auto* orig_obj = static_cast<RGWBucketInstanceMetadataObject*>(old_obj);
+ auto* orig_bci = (orig_obj ? &orig_obj->get_bci() : nullptr);
+ ret = update_bucket_topic_mappings(dpp, orig_bci, &bci, bihandler->driver);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << __func__
+ << " failed to apply bucket topic mapping for "
+ << bci.info.bucket.name << dendl;
+ return ret;
+ }
+ ldpp_dout(dpp, 20) << __func__
+ << " successfully applied bucket topic mapping for "
+ << bci.info.bucket.name << dendl;
+ }
return STATUS_APPLIED;
}
RGWPubSub ps(&rados_store, tenant_name);
rgw_pubsub_topic topic;
- auto ret_of_get_topic = ps.get_topic(this, queue_name, topic, optional_yield(io_context, yield));
+ auto ret_of_get_topic = ps.get_topic(this, queue_name, topic,
+ optional_yield(io_context, yield), nullptr);
if (ret_of_get_topic < 0) {
// we can't migrate entries without topic info
ldpp_dout(this, 1) << "ERROR: failed to fetch topic: " << queue_name << " error: "
const RGWPubSub ps(
res.store, res.user_tenant,
&res.store->svc()->zone->get_current_period().get_map().zonegroups);
- auto ret = ps.get_topic(res.dpp, topic_cfg.name, result, res.yield);
+ auto ret = ps.get_topic(res.dpp, topic_cfg.name, result, res.yield, nullptr);
if (ret < 0) {
ldpp_dout(res.dpp, 1)
<< "INFO: failed to load topic: " << topic_cfg.name
if (ret < 0)
return ret;
+ ret = open_topics_pool_ctx(dpp);
+ if (ret < 0)
+ return ret;
+
pools_initialized = true;
if (use_gc) {
return rgw_init_ioctx(dpp, get_rados_handle(), svc.zone->get_zone_params().notif_pool, notif_pool_ctx, true, true);
}
+int RGWRados::open_topics_pool_ctx(const DoutPrefixProvider* dpp) {
+ return rgw_init_ioctx(dpp, get_rados_handle(),
+ svc.zone->get_zone_params().topics_pool,
+ topics_pool_ctx, true, true);
+}
+
int RGWRados::open_pool_ctx(const DoutPrefixProvider *dpp, const rgw_pool& pool, librados::IoCtx& io_ctx,
bool mostly_omap, bool bulk)
{
int open_objexp_pool_ctx(const DoutPrefixProvider *dpp);
int open_reshard_pool_ctx(const DoutPrefixProvider *dpp);
int open_notif_pool_ctx(const DoutPrefixProvider *dpp);
+ int open_topics_pool_ctx(const DoutPrefixProvider* dpp);
int open_pool_ctx(const DoutPrefixProvider *dpp, const rgw_pool& pool, librados::IoCtx& io_ctx,
bool mostly_omap, bool bulk);
librados::IoCtx objexp_pool_ctx;
librados::IoCtx reshard_pool_ctx;
librados::IoCtx notif_pool_ctx; // .rgw.notif
+ librados::IoCtx topics_pool_ctx; // .rgw.meta:topics
bool pools_initialized{false};
return notif_pool_ctx;
}
+ librados::IoCtx& get_topics_pool_ctx() { return topics_pool_ctx; }
+
void set_context(CephContext *_cct) {
cct = _cct;
}
this, get_attrs(), merge_attrs);
}
+ // remove bucket-topic mapping
+ auto iter = get_attrs().find(RGW_ATTR_BUCKET_NOTIFICATION);
+ if (iter != get_attrs().end()) {
+ rgw_pubsub_bucket_topics bucket_topics;
+ try {
+ const auto& bl = iter->second;
+ auto biter = bl.cbegin();
+ bucket_topics.decode(biter);
+ } catch (buffer::error& err) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to decode bucket topics for bucket: "
+ << get_name() << dendl;
+ }
+ if (!bucket_topics.topics.empty()) {
+ ret = store->remove_bucket_mapping_from_topics(
+ bucket_topics, rgw_make_bucket_entry_name(get_tenant(), get_name()),
+ y, dpp);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1)
+ << "ERROR: unable to remove notifications from bucket "
+ << get_name() << ". ret=" << ret << dendl;
+ }
+ }
+ }
+
ret = store->ctl()->bucket->sync_user_stats(dpp, info.owner, info, y, nullptr);
if (ret < 0) {
ldout(store->ctx(), 1) << "WARNING: failed sync user stats before bucket delete. ret=" << ret << dendl;
params, objv_tracker, y, dpp);
}
+int RadosStore::remove_bucket_mapping_from_topics(
+ const rgw_pubsub_bucket_topics& bucket_topics,
+ const std::string& bucket_key,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) {
+ // remove the bucket name from the topic-bucket omap for each topic
+ // subscribed.
+ std::unordered_set<std::string> topics_mapping_to_remove;
+ int ret = 0;
+ for (const auto& [_, topic_filter] : bucket_topics.topics) {
+ if (!topics_mapping_to_remove.insert(topic_filter.topic.name).second) {
+ continue; // already removed.
+ }
+ int op_ret = update_bucket_topic_mapping(topic_filter.topic, bucket_key,
+ /*add_mapping=*/false, y, dpp);
+ if (op_ret < 0) {
+ ret = op_ret;
+ }
+ }
+ return ret;
+}
+
+int RadosStore::update_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+ const std::string& bucket_key,
+ bool add_mapping,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) {
+ bufferlist empty_bl;
+ librados::ObjectWriteOperation op;
+ int ret = 0;
+ if (add_mapping) {
+ std::map<std::string, bufferlist> mapping{{bucket_key, empty_bl}};
+ op.omap_set(mapping);
+ } else {
+ std::set<std::string> to_rm{{bucket_key}};
+ op.omap_rm_keys(to_rm);
+ }
+ ret = rgw_rados_operate(dpp, rados->get_topics_pool_ctx(),
+ get_bucket_topic_mapping_oid(topic), &op, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to " << (add_mapping ? "add" : "remove")
+ << " topic bucket mapping for bucket: " << bucket_key
+ << " and topic: " << topic.name << " with ret:" << ret << dendl;
+ return ret;
+ }
+ ldpp_dout(dpp, 20) << "Successfully " << (add_mapping ? "added" : "removed")
+ << " topic bucket mapping for bucket: " << bucket_key
+ << " and topic: " << topic.name << dendl;
+ return ret;
+}
+
+int RadosStore::get_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+ std::set<std::string>& bucket_keys,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) {
+ constexpr auto max_chunk = 1024U;
+ std::string start_after;
+ bool more = true;
+ int rval;
+ while (more) {
+ librados::ObjectReadOperation op;
+ std::set<std::string> curr_keys;
+ op.omap_get_keys2(start_after, max_chunk, &curr_keys, &more, &rval);
+ const auto ret =
+ rgw_rados_operate(dpp, rados->get_topics_pool_ctx(),
+ get_bucket_topic_mapping_oid(topic), &op, nullptr, y);
+ if (ret == -ENOENT) {
+ // mapping object was not created - nothing to do
+ return 0;
+ }
+ if (ret < 0) {
+ // TODO: do we need to check on rval as well as ret?
+ ldpp_dout(dpp, 1)
+ << "ERROR: failed to read bucket topic mapping object for topic: "
+ << topic.name << ", ret= " << ret << dendl;
+ return ret;
+ }
+ if (more) {
+ if (curr_keys.empty()) {
+ return -EINVAL; // something wrong.
+ }
+ start_after = *curr_keys.rbegin();
+ }
+ bucket_keys.merge(curr_keys);
+ }
+ return 0;
+}
+
+int RadosStore::delete_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) {
+ librados::ObjectWriteOperation op;
+ op.remove();
+ const int ret =
+ rgw_rados_operate(dpp, rados->get_topics_pool_ctx(),
+ get_bucket_topic_mapping_oid(topic), &op, y);
+ if (ret < 0 && ret != -ENOENT) {
+ ldpp_dout(dpp, 1)
+ << "ERROR: failed removing bucket topic mapping omap for topic: "
+ << topic.name << ", ret=" << ret << dendl;
+ return ret;
+ }
+ ldpp_dout(dpp, 20)
+ << "Successfully deleted topic bucket mapping omap for topic: "
+ << topic.name << dendl;
+ return 0;
+}
+
int RadosStore::delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, optional_yield y)
{
return rados->delete_raw_obj(dpp, obj, y);
RGWObjVersionTracker* objv_tracker,
optional_yield y,
const DoutPrefixProvider* dpp) override;
+ int update_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+ const std::string& bucket_key,
+ bool add_mapping,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override;
+ int remove_bucket_mapping_from_topics(
+ const rgw_pubsub_bucket_topics& bucket_topics,
+ const std::string& bucket_key,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override;
+ int get_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+ std::set<std::string>& bucket_keys,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override;
+ int delete_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override;
virtual RGWLC* get_rgwlc(void) override { return rados->get_lc(); }
virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return rados->get_cr_registry(); }
formatter->flush(cout);
}
+static void show_topics_info_v2(const rgw_pubsub_topic& topic,
+ std::set<std::string> subscribed_buckets,
+ Formatter* formatter) {
+ formatter->open_object_section("topic");
+ topic.dump(formatter);
+ encode_json("subscribed_buckets", subscribed_buckets, formatter);
+ formatter->close_section();
+}
+
class StoreDestructor {
rgw::sal::Driver* driver;
public:
}
}
}
- encode_json("result", result, formatter.get());
+ if (driver->get_zone()->get_zonegroup().supports_feature(
+ rgw::zone_features::notification_v2)) {
+ Formatter::ObjectSection top_section(*formatter, "result");
+ Formatter::ArraySection s(*formatter, "topics");
+ for (const auto& [_, topic] : result.topics) {
+ std::set<std::string> subscribed_buckets;
+ ret = driver->get_bucket_topic_mapping(topic, subscribed_buckets,
+ null_yield, dpp());
+ if (ret < 0) {
+ cerr << "failed to fetch bucket topic mapping info for topic: "
+ << topic.name << ", ret=" << ret << std::endl;
+ } else {
+ show_topics_info_v2(topic, subscribed_buckets, formatter.get());
+ }
+ }
+ } else {
+ encode_json("result", result, formatter.get());
+ }
formatter->flush(cout);
}
RGWPubSub ps(driver, tenant, &site->get_period()->get_map().zonegroups);
rgw_pubsub_topic topic;
- ret = ps.get_topic(dpp(), topic_name, topic, null_yield);
+ std::set<std::string> subscribed_buckets;
+ ret =
+ ps.get_topic(dpp(), topic_name, topic, null_yield, &subscribed_buckets);
if (ret < 0) {
cerr << "ERROR: could not get topic: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
- encode_json("topic", topic, formatter.get());
+ if (driver->get_zone()->get_zonegroup().supports_feature(
+ rgw::zone_features::notification_v2)) {
+ show_topics_info_v2(topic, subscribed_buckets, formatter.get());
+ } else {
+ encode_json("topic", topic, formatter.get());
+ }
formatter->flush(cout);
}
#include "rgw_xml.h"
#include "rgw_arn.h"
#include "rgw_pubsub_push.h"
+#include "rgw_bucket.h"
#include "common/errno.h"
+#include "svc_topic_rados.h"
#include <regex>
#include <algorithm>
encode_xml_key_value_entry("TopicArn", arn, f);
encode_xml_key_value_entry("OpaqueData", opaque_data, f);
encode_xml_key_value_entry("Policy", policy_text, f);
+ std::ostringstream stream;
f->close_section(); // Attributes
}
<< "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl;
break;
}
- for (auto& topic_name : topics) {
- rgw_pubsub_topic topic;
- int ret = get_topic(dpp, topic_name, topic, y);
- if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to read topic '" << topic_name
- << "' info: ret=" << ret << dendl;
- continue;
- }
- result.topics[topic_name] = std::move(topic);
+ for (auto& topic_entry : topics) {
+ std::string topic_name;
+ std::string topic_tenant;
+ parse_topic_entry(topic_entry, &topic_tenant, &topic_name);
+ if (tenant != topic_tenant) {
+ continue;
+ }
+ rgw_pubsub_topic topic;
+ const auto op_ret = get_topic(dpp, topic_name, topic, y, nullptr);
+ if (op_ret < 0) {
+ ret = op_ret;
+ continue;
+ }
+ result.topics[topic_name] = std::move(topic);
}
} while (truncated);
driver->meta_list_keys_complete(handle);
return 0;
}
-int RGWPubSub::get_topic(const DoutPrefixProvider *dpp, const std::string& name, rgw_pubsub_topic& result, optional_yield y) const
-{
+int RGWPubSub::get_topic(const DoutPrefixProvider* dpp,
+ const std::string& name,
+ rgw_pubsub_topic& result,
+ optional_yield y,
+ std::set<std::string>* subscribed_buckets) const {
if (use_notification_v2) {
- const int ret = driver->read_topic_v2(name, tenant, result, nullptr, y, dpp);
+ int ret = driver->read_topic_v2(name, tenant, result, nullptr, y, dpp);
if (ret < 0) {
ldpp_dout(dpp, 1) << "failed to read topic info for name: " << name
<< " tenant: " << tenant << ", ret=" << ret << dendl;
+ return ret;
+ }
+ if (subscribed_buckets) {
+ ret =
+ driver->get_bucket_topic_mapping(result, *subscribed_buckets, y, dpp);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1)
+ << "failed to fetch bucket topic mapping info for topic: " << name
+ << " tenant: " << tenant << ", ret=" << ret << dendl;
+ }
}
return ret;
}
return std::nullopt;
}
-int delete_all_notifications(const DoutPrefixProvider* dpp,
- const rgw_pubsub_bucket_topics& bucket_topics,
- std::map<std::string, bufferlist>& attrs,
- rgw::sal::Bucket* bucket,
- rgw::sal::Driver* driver,
- optional_yield y) {
+int store_bucket_attrs_and_update_mapping(
+ const DoutPrefixProvider* dpp,
+ rgw::sal::Driver* driver,
+ rgw::sal::Bucket* bucket,
+ rgw_pubsub_bucket_topics& bucket_topics,
+ const rgw_pubsub_topic& topic,
+ optional_yield y) {
+ rgw::sal::Attrs& attrs = bucket->get_attrs();
+ if (!bucket_topics.topics.empty()) {
+ bufferlist bl;
+ bucket_topics.encode(bl);
+ attrs[RGW_ATTR_BUCKET_NOTIFICATION] = std::move(bl);
+ } else {
+ auto it = attrs.find(RGW_ATTR_BUCKET_NOTIFICATION);
+ if (it != attrs.end()) {
+ attrs.erase(it);
+ }
+ }
+ auto ret = bucket->merge_and_store_attrs(dpp, attrs, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1)
+ << "Failed to store RGW_ATTR_BUCKET_NOTIFICATION on bucket="
+ << bucket->get_name() << " returned err= " << ret << dendl;
+ return ret;
+ }
+ if (bucket_topics.topics.empty()) {
+ // remove the bucket name from the topic-bucket omap
+ auto op_ret = driver->update_bucket_topic_mapping(
+ topic,
+ rgw_make_bucket_entry_name(bucket->get_tenant(), bucket->get_name()),
+ /*add_mapping=*/false, y, dpp);
+ if (op_ret < 0) {
+ // TODO: should the error be reported, as attrs are already deleted.
+ // ret = op_ret;
+ }
+ }
+ return ret;
+}
+
+int delete_notification_attrs(const DoutPrefixProvider* dpp,
+ rgw::sal::Bucket* bucket,
+ optional_yield y) {
+ auto& attrs = bucket->get_attrs();
auto iter = attrs.find(RGW_ATTR_BUCKET_NOTIFICATION);
if (iter == attrs.end()) {
return 0;
}
// delete all notifications of on a bucket
attrs.erase(iter);
- const auto ret = bucket->merge_and_store_attrs(dpp, attrs, y);
+ auto ret = bucket->merge_and_store_attrs(dpp, attrs, y);
if (ret < 0) {
ldpp_dout(dpp, 1)
<< "Failed to remove RGW_ATTR_BUCKET_NOTIFICATION attr on bucket="
if (bucket_topics.topics.empty()) {
return 0;
}
- rgw::sal::Attrs& attrs = bucket->get_attrs();
+ // delete all notifications
if (notification_id.empty()) {
- return delete_all_notifications(dpp, bucket_topics, attrs, bucket, driver,
- y);
+ ret = delete_notification_attrs(dpp, bucket, y);
+ if (ret < 0) {
+ return ret;
+ }
+ int op_ret = driver->remove_bucket_mapping_from_topics(
+ bucket_topics,
+ rgw_make_bucket_entry_name(bucket->get_tenant(), bucket->get_name()), y,
+ dpp);
+ if (op_ret < 0) {
+ // TODO: should the error be reported, as attrs are already deleted.
+ // ret = op_ret;
+ }
+ return ret;
}
+
// delete a specific notification
const auto unique_topic = find_unique_topic(bucket_topics, notification_id);
if (!unique_topic) {
}
const auto& topic_name = unique_topic->topic.name;
bucket_topics.topics.erase(topic_to_unique(topic_name, notification_id));
- bufferlist bl;
- bucket_topics.encode(bl);
- attrs[RGW_ATTR_BUCKET_NOTIFICATION] = std::move(bl);
- ret = bucket->merge_and_store_attrs(dpp, attrs, y);
- if (ret < 0) {
- ldpp_dout(dpp, 1)
- << "Failed to store RGW_ATTR_BUCKET_NOTIFICATION on bucket="
- << bucket->get_name() << " returned err= " << ret << dendl;
- }
- return ret;
+ return store_bucket_attrs_and_update_mapping(
+ dpp, driver, bucket, bucket_topics, unique_topic->topic, y);
}
int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name,
const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) const {
rgw_pubsub_topic topic_info;
- int ret = ps.get_topic(dpp, topic_name, topic_info, y);
+ int ret = ps.get_topic(dpp, topic_name, topic_info, y, nullptr);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to read topic '" << topic_name << "' info: ret=" << ret << dendl;
return ret;
optional_yield y) const {
RGWObjVersionTracker objv_tracker;
rgw_pubsub_topic topic;
- int ret = get_topic(dpp, name, topic, y);
+ int ret = get_topic(dpp, name, topic, y, nullptr);
if (ret < 0 && ret != -ENOENT) {
return ret;
} else if (ret == -ENOENT) {
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to remove topic info: ret=" << ret
<< dendl;
+ return ret;
}
+ ret = driver->delete_bucket_topic_mapping(topic, y, dpp);
return ret;
}
int get_topics(const DoutPrefixProvider *dpp, rgw_pubsub_bucket_topics& result, optional_yield y) const {
return read_topics(dpp, result, nullptr, y);
}
- // get a bucket_topic with by its name and populate it into "result"
- // return -ENOENT if the topic does not exists
- // return 0 on success, error code otherwise
- int get_notification_by_id(const DoutPrefixProvider *dpp, const std::string& notification_id, rgw_pubsub_topic_filter& result, optional_yield y) const;
// adds a topic + filter (event list, and possibly name metadata or tags filters) to a bucket
// assigning a notification name is optional (needed for S3 compatible notifications)
// if the topic already exist on the bucket, the filter event list may be updated
// for S3 compliant notifications the version with: s3_filter and notif_name should be used
// return -ENOENT if the topic does not exists
// return 0 on success, error code otherwise
- int create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name,
- const rgw::notify::EventTypeList& events, optional_yield y) const;
- int create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name,
+ int create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name,
const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) const;
// remove a topic and filter from bucket
// if the topic does not exists on the bucket it is a no-op (considered success)
return read_topics(dpp, result, nullptr, y);
}
// get a topic with by its name and populate it into "result"
- // return -ENOENT if the topic does not exists
- // return 0 on success, error code otherwise
- int get_topic(const DoutPrefixProvider *dpp, const std::string& name, rgw_pubsub_topic& result, optional_yield y) const;
+ // return -ENOENT if the topic does not exists
+ // return 0 on success, error code otherwise.
+ // if |subscribed_buckets| valid, then for notification_v2 read the bucket
+ // topic mapping object.
+ int get_topic(const DoutPrefixProvider* dpp,
+ const std::string& name,
+ rgw_pubsub_topic& result,
+ optional_yield y,
+ std::set<std::string>* subscribed_buckets) const;
// create a topic with a name only
// if the topic already exists it is a no-op (considered success)
// return 0 on success, error code otherwise
const RGWPubSub ps(driver, s->owner.id.tenant,
&s->penv.site->get_period()->get_map().zonegroups);
rgw_pubsub_topic result;
- ret = ps.get_topic(this, topic_name, result, y);
+ ret = ps.get_topic(this, topic_name, result, y, nullptr);
if (ret == -ENOENT) {
// topic not present
return 0;
}
const RGWPubSub ps(driver, s->owner.id.tenant,
&s->penv.site->get_period()->get_map().zonegroups);
- op_ret = ps.get_topic(this, topic_name, result, y);
+ op_ret = ps.get_topic(this, topic_name, result, y, nullptr);
if (op_ret < 0) {
ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
return;
}
const RGWPubSub ps(driver, s->owner.id.tenant,
&s->penv.site->get_period()->get_map().zonegroups);
- op_ret = ps.get_topic(this, topic_name, result, y);
+ op_ret = ps.get_topic(this, topic_name, result, y, nullptr);
if (op_ret < 0) {
ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
return;
rgw_pubsub_topic result;
const RGWPubSub ps(driver, s->owner.id.tenant,
&s->penv.site->get_period()->get_map().zonegroups);
- ret = ps.get_topic(this, topic_name, result, y);
+ ret = ps.get_topic(this, topic_name, result, y, nullptr);
if (ret < 0) {
ldpp_dout(this, 1) << "failed to get topic '" << topic_name
<< "', ret=" << ret << dendl;
&s->penv.site->get_period()->get_map().zonegroups);
rgw_pubsub_topic result;
- op_ret = ps.get_topic(this, topic_name, result, y);
+ op_ret = ps.get_topic(this, topic_name, result, y, nullptr);
if (op_ret == 0) {
op_ret = verify_topic_owner_or_policy(
s, result, driver->get_zone()->get_zonegroup().get_name(),
const auto topic_name = arn->resource;
// get topic information. destination information is stored in the topic
- rgw_pubsub_topic topic_info;
- op_ret = ps.get_topic(this, topic_name, topic_info, y);
+ rgw_pubsub_topic topic_info;
+ op_ret = ps.get_topic(this, topic_name, topic_info, y, nullptr);
if (op_ret < 0) {
ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
return;
const RGWPubSub ps(driver, s->owner.id.tenant,
&s->penv.site->get_period()->get_map().zonegroups);
std::unordered_map<std::string, rgw_pubsub_topic> topics;
- const auto rgwbucket = rgw_bucket(s->bucket_tenant, s->bucket_name, "");
for (const auto& c : configurations.list) {
const auto& notif_name = c.id;
if (notif_name.empty()) {
if (!topics.contains(topic_name)) {
// get topic information. destination information is stored in the topic
rgw_pubsub_topic topic_info;
- op_ret = ps.get_topic(this, topic_name, topic_info, y);
+ op_ret = ps.get_topic(this, topic_name, topic_info, y,nullptr);
if (op_ret < 0) {
ldpp_dout(this, 1) << "failed to get topic '" << topic_name
<< "', ret=" << op_ret << dendl;
<< bucket->get_name() << " returned err= " << op_ret << dendl;
return;
}
+ for (const auto& [_, topic] : topics) {
+ const auto ret = driver->update_bucket_topic_mapping(
+ topic,
+ rgw_make_bucket_entry_name(bucket->get_tenant(), bucket->get_name()),
+ /*add_mapping=*/true, y, this);
+ if (ret < 0) {
+ ldpp_dout(this, 1) << "Failed to remove topic mapping on bucket="
+ << bucket->get_name() << " ret= " << ret << dendl;
+ // error should be reported ??
+ // op_ret = ret;
+ }
+ }
ldpp_dout(this, 20) << "successfully created bucket notification for bucket: "
<< bucket->get_name() << dendl;
}
RGWObjVersionTracker* objv_tracker,
optional_yield y,
const DoutPrefixProvider* dpp) = 0;
+ /** Update the bucket-topic mapping in the store, if |add_mapping|=true then
+ * adding the |bucket_key| |topic| mapping to store, else delete the
+ * |bucket_key| |topic| mapping from the store. The |bucket_key| is
+ * in the format |tenant_name + "/" + bucket_name| if tenant is not empty
+ * else |bucket_name|*/
+ virtual int update_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+ const std::string& bucket_key,
+ bool add_mapping,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) = 0;
+ /** Remove the |bucket_key| from bucket-topic mapping in the store, for all
+ the topics under |bucket_topics|*/
+ virtual int remove_bucket_mapping_from_topics(
+ const rgw_pubsub_bucket_topics& bucket_topics,
+ const std::string& bucket_key,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) = 0;
+ /** Get the bucket-topic mapping from the backend store. The |bucket_keys|
+ * are in the format |tenant_name + "/" + bucket_name| if tenant is not
+ * empty else |bucket_name|*/
+ virtual int get_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+ std::set<std::string>& bucket_keys,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) = 0;
+ /** Remove the bucket-topic mapping from the backend store. */
+ virtual int delete_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) = 0;
/** Get access to the lifecycle management thread */
virtual RGWLC* get_rgwlc(void) = 0;
/** Get access to the coroutine registry. Used to create new coroutine managers */
const DoutPrefixProvider* dpp) override {
return next->remove_topic_v2(topic_name, tenant, objv_tracker, y, dpp);
}
-
+ int update_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+ const std::string& bucket_key,
+ bool add_mapping,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override {
+ return next->update_bucket_topic_mapping(topic, bucket_key, add_mapping, y,
+ dpp);
+ }
+ int remove_bucket_mapping_from_topics(
+ const rgw_pubsub_bucket_topics& bucket_topics,
+ const std::string& bucket_key,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override {
+ return next->remove_bucket_mapping_from_topics(bucket_topics, bucket_key, y,
+ dpp);
+ }
+ int get_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+ std::set<std::string>& bucket_keys,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override {
+ return next->get_bucket_topic_mapping(topic, bucket_keys, y, dpp);
+ }
+ int delete_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override {
+ return next->delete_bucket_topic_mapping(topic, y, dpp);
+ }
virtual RGWLC* get_rgwlc(void) override;
virtual RGWCoroutinesManagerRegistry* get_cr_registry() override;
RGWObjVersionTracker* objv_tracker,
optional_yield y,
const DoutPrefixProvider* dpp) override {
- return -ENOENT;
+ return -EOPNOTSUPP;
}
int remove_topic_v2(const std::string& topic_name,
const std::string& tenant,
RGWObjVersionTracker* objv_tracker,
optional_yield y,
const DoutPrefixProvider* dpp) override {
- return -ENOENT;
+ return -EOPNOTSUPP;
+ }
+ int update_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+ const std::string& bucket_key,
+ bool add_mapping,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override {
+ return -EOPNOTSUPP;
+ }
+ int remove_bucket_mapping_from_topics(
+ const rgw_pubsub_bucket_topics& bucket_topics,
+ const std::string& bucket_key,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override {
+ return -EOPNOTSUPP;
+ }
+ int get_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+ std::set<std::string>& bucket_keys,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override {
+ return -EOPNOTSUPP;
+ }
+ int delete_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override {
+ return -EOPNOTSUPP;
}
};
static std::string topic_oid_prefix = "topic.";
static constexpr char topic_tenant_delim[] = ":";
+static std::string bucket_topic_oid_prefix = "buckets.";
std::string get_topic_key(const std::string& topic_name,
const std::string& tenant) {
*topic_name = topic_entry;
}
}
+
+std::string get_bucket_topic_mapping_oid(const rgw_pubsub_topic& topic) {
+ return bucket_topic_oid_prefix + get_topic_key(topic.name, topic.user.tenant);
+}
+
class RGWSI_Topic_Module : public RGWSI_MBSObj_Handler_Module {
RGWSI_Topic_RADOS::Svc& svc;
const std::string prefix;
parse_topic_entry(entry, &tenant, &topic_name);
RGWPubSub ps(driver, tenant,
&topic_svc->svc.zone->get_current_period().get_map().zonegroups);
- int ret = ps.get_topic(dpp, topic_name, result, y);
+ int ret = ps.get_topic(dpp, topic_name, result, y, nullptr);
if (ret < 0) {
return ret;
}
void parse_topic_entry(const std::string& topic_entry,
std::string* tenant_name,
std::string* topic_name);
+
+std::string get_bucket_topic_mapping_oid(const rgw_pubsub_topic& topic);
\ No newline at end of file