}
// bucket notifications
- RGWPubSub ps(driver, tenant_name);
rgw_pubsub_bucket_topics result;
- const RGWPubSub::Bucket b(ps, bucket.get());
- ret = b.get_topics(dpp, result, y);
- if (ret < 0 && ret != -ENOENT) {
- cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
- return -ret;
+ if (driver->get_zone()->get_zonegroup().supports_feature(
+ rgw::zone_features::notification_v2)) {
+ ret = get_bucket_notifications(dpp, bucket.get(), result);
+ if (ret < 0) {
+ cerr << "ERROR: could not get topics: " << cpp_strerror(-ret)
+ << std::endl;
+ return -ret;
+ }
+ } else {
+ RGWPubSub ps(driver, tenant_name);
+ const RGWPubSub::Bucket b(ps, bucket.get());
+ ret = b.get_topics(dpp, result, y);
+ if (ret < 0 && ret != -ENOENT) {
+ cerr << "ERROR: could not get topics: " << cpp_strerror(-ret)
+ << std::endl;
+ return -ret;
+ }
}
result.dump(formatter);
#include "rgw_pubsub_push.h"
#include "rgw_zone_features.h"
#include "rgw_perf_counters.h"
+#include "services/svc_zone.h"
#include "common/dout.h"
#include <chrono>
reservation_t& res,
const RGWObjTags* req_tags)
{
- const RGWPubSub ps(res.store, res.user_tenant);
- const RGWPubSub::Bucket ps_bucket(ps, res.bucket);
rgw_pubsub_bucket_topics bucket_topics;
- auto rc = ps_bucket.get_topics(res.dpp, bucket_topics, res.yield);
- if (rc < 0) {
- // failed to fetch bucket topics
- return rc;
+ if (do_all_zonegroups_support_notification_v2(
+ res.store->svc()->zone->get_current_period().get_map().zonegroups)) {
+ auto ret = 0;
+ if (!res.s) {
+ // for non S3-request caller (e.g., lifecycle, ObjectSync), bucket attrs
+ // are not loaded, so force to reload the bucket, that reloads the attr.
+ // for non S3-request caller, res.s is nullptr
+ ret = res.bucket->load_bucket(dpp, res.yield);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1)
+ << "ERROR: failed to reload bucket: '" << res.bucket->get_name()
+ << "' to get bucket notification attrs with error ret= " << ret
+ << dendl;
+ return ret;
+ }
+ }
+ ret = get_bucket_notifications(dpp, res.bucket, bucket_topics);
+ if (ret < 0) {
+ return ret;
+ }
+ } else {
+ const RGWPubSub ps(res.store, res.user_tenant);
+ const RGWPubSub::Bucket ps_bucket(ps, res.bucket);
+ auto rc = ps_bucket.get_topics(res.dpp, bucket_topics, res.yield);
+ if (rc < 0) {
+ // failed to fetch bucket topics
+ return rc;
+ }
}
for (const auto& bucket_topic : bucket_topics.topics) {
const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second;
return ret;
}
}
- res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id);
+ // load the topic,if there is change in topic config while it's stored in
+ // notification.
+ rgw_pubsub_topic result;
+ const RGWPubSub ps(
+ res.store, res.user_tenant,
+ &res.store->svc()->zone->get_current_period().get_map().zonegroups);
+ auto ret = ps.get_topic(res.dpp, topic_cfg.name, result, res.yield);
+ if (ret < 0) {
+ ldpp_dout(res.dpp, 1)
+ << "INFO: failed to load topic: " << topic_cfg.name
+ << ". error: " << ret
+ << " while storing the persistent notification event" << dendl;
+ if (ret == -ENOENT) {
+ // either the topic is deleted but the corresponding notification still
+ // exist or in v2 mode the notification could have synced first but
+ // topic is not synced yet.
+ return 0;
+ }
+ ldpp_dout(res.dpp, 1)
+ << "WARN: Using the stored topic from bucket notification struct."
+ << dendl;
+ res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id);
+ } else {
+ res.topics.emplace_back(topic_filter.s3_id, result, res_id);
+ }
}
return 0;
}
size_t size;
rgw::sal::Object* const object;
rgw::sal::Object* const src_object; // may differ from object
- rgw::sal::Bucket* const bucket;
+ rgw::sal::Bucket* bucket;
const std::string* const object_name;
boost::optional<const RGWObjTags&> tagset;
meta_map_t x_meta_map; // metadata cached by value
return EINVAL;
}
- RGWPubSub ps(driver, tenant);
-
rgw_pubsub_bucket_topics result;
int ret = init_bucket(tenant, bucket_name, bucket_id, &bucket);
if (ret < 0) {
cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
-
- const RGWPubSub::Bucket b(ps, bucket.get());
- ret = b.get_topics(dpp(), result, null_yield);
- if (ret < 0 && ret != -ENOENT) {
- cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
- return -ret;
+ if (driver->get_zone()->get_zonegroup().supports_feature(
+ rgw::zone_features::notification_v2)) {
+ ret = get_bucket_notifications(dpp(), bucket.get(), result);
+ if (ret < 0) {
+ cerr << "ERROR: could not get topics: " << cpp_strerror(-ret)
+ << std::endl;
+ return -ret;
+ }
+ } else {
+ RGWPubSub ps(driver, tenant);
+ const RGWPubSub::Bucket b(ps, bucket.get());
+ ret = b.get_topics(dpp(), result, null_yield);
+ if (ret < 0 && ret != -ENOENT) {
+ cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
}
encode_json("result", result, formatter.get());
formatter->flush(cout);
cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
-
- RGWPubSub ps(driver, tenant);
-
rgw_pubsub_bucket_topics bucket_topics;
- const RGWPubSub::Bucket b(ps, bucket.get());
- ret = b.get_topics(dpp(), bucket_topics, null_yield);
- if (ret < 0 && ret != -ENOENT) {
- cerr << "ERROR: could not get bucket notifications: " << cpp_strerror(-ret) << std::endl;
- return -ret;
+ if (driver->get_zone()->get_zonegroup().supports_feature(
+ rgw::zone_features::notification_v2)) {
+ ret = get_bucket_notifications(dpp(), bucket.get(), bucket_topics);
+ if (ret < 0) {
+ cerr << "ERROR: could not get bucket notifications: "
+ << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+ } else {
+ RGWPubSub ps(driver, tenant);
+ const RGWPubSub::Bucket b(ps, bucket.get());
+ ret = b.get_topics(dpp(), bucket_topics, null_yield);
+ if (ret < 0 && ret != -ENOENT) {
+ cerr << "ERROR: could not get bucket notifications: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
}
-
- rgw_pubsub_topic_filter bucket_topic;
- ret = b.get_notification_by_id(dpp(), notification_id, bucket_topic, null_yield);
- if (ret < 0) {
- cerr << "ERROR: could not get notification: " << cpp_strerror(-ret) << std::endl;
- return -ret;
+ auto iter = find_unique_topic(bucket_topics, notification_id);
+ if (!iter) {
+ cerr << "ERROR: notification was not found" << std::endl;
+ return -ENOENT;
}
- encode_json("notification", bucket_topic, formatter.get());
+ encode_json("notification", *iter, formatter.get());
formatter->flush(cout);
}
cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
return EINVAL;
}
-
+ if (!driver->is_meta_master()) {
+ cerr << "ERROR: Run 'topic rm' from master zone " << std::endl;
+ return -EINVAL;
+ }
ret = rgw::notify::remove_persistent_topic(
dpp(), static_cast<rgw::sal::RadosStore*>(driver)->getRados()->get_notif_pool_ctx(), topic_name, null_yield);
if (ret < 0) {
cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl;
return EINVAL;
}
-
+ if (!driver->is_meta_master()) {
+ cerr << "ERROR: Run 'notification rm' from master zone " << std::endl;
+ return -EINVAL;
+ }
int ret = init_bucket(tenant, bucket_name, bucket_id, &bucket);
if (ret < 0) {
cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
- RGWPubSub ps(driver, tenant);
+ if (driver->get_zone()->get_zonegroup().supports_feature(
+ rgw::zone_features::notification_v2)) {
+ ret = remove_notification_v2(dpp(), driver, bucket.get(), notification_id,
+ null_yield);
+ } else {
+ RGWPubSub ps(driver, tenant);
- rgw_pubsub_bucket_topics bucket_topics;
- const RGWPubSub::Bucket b(ps, bucket.get());
- ret = b.get_topics(dpp(), bucket_topics, null_yield);
- if (ret < 0 && ret != -ENOENT) {
- cerr << "ERROR: could not get bucket notifications: " << cpp_strerror(-ret) << std::endl;
- return -ret;
- }
+ rgw_pubsub_bucket_topics bucket_topics;
+ const RGWPubSub::Bucket b(ps, bucket.get());
+ ret = b.get_topics(dpp(), bucket_topics, null_yield);
+ if (ret < 0 && ret != -ENOENT) {
+ cerr << "ERROR: could not get bucket notifications: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
- rgw_pubsub_topic_filter bucket_topic;
- if(notification_id.empty()) {
- ret = b.remove_notifications(dpp(), null_yield);
- } else {
- ret = b.remove_notification_by_id(dpp(), notification_id, null_yield);
+ rgw_pubsub_topic_filter bucket_topic;
+ if(notification_id.empty()) {
+ ret = b.remove_notifications(dpp(), null_yield);
+ } else {
+ ret = b.remove_notification_by_id(dpp(), notification_id, null_yield);
+ }
}
}
#define RGW_ATTR_TRACE RGW_ATTR_PREFIX "trace"
+#define RGW_ATTR_BUCKET_NOTIFICATION RGW_ATTR_PREFIX "bucket-notification"
+
enum class RGWFormat : int8_t {
BAD_FORMAT = -1,
PLAIN = 0,
int RGWPubSub::get_topic(const DoutPrefixProvider *dpp, const std::string& name, rgw_pubsub_topic& result, optional_yield y) const
{
if (use_notification_v2) {
- const int ret = driver->read_topic(name, tenant, result, nullptr, y, dpp);
+ const int ret = driver->read_topic_v2(name, tenant, result, nullptr, y, dpp);
if (ret < 0) {
ldpp_dout(dpp, 1) << "failed to read topic info for name: " << name
<< " tenant: " << tenant << ", ret=" << ret << dendl;
return 0;
}
-// from list of bucket topics, find the one that was auto-generated by a notification
-auto find_unique_topic(const rgw_pubsub_bucket_topics &bucket_topics, const std::string ¬ification_id) {
- auto it = std::find_if(bucket_topics.topics.begin(), bucket_topics.topics.end(),
- [&](const auto& val) { return notification_id == val.second.s3_id; });
- return it != bucket_topics.topics.end() ?
- std::optional<std::reference_wrapper<const rgw_pubsub_topic_filter>>(it->second):
- std::nullopt;
-}
-
-int RGWPubSub::Bucket::get_notification_by_id(const DoutPrefixProvider *dpp, const std::string& notification_id,
- rgw_pubsub_topic_filter& result, optional_yield y) const {
- rgw_pubsub_bucket_topics bucket_topics;
- const int ret = read_topics(dpp, bucket_topics, nullptr, y);
- if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to read bucket_topics info: ret=" << ret << dendl;
- return ret;
+int get_bucket_notifications(const DoutPrefixProvider* dpp,
+ rgw::sal::Bucket* bucket,
+ rgw_pubsub_bucket_topics& bucket_topics) {
+ const rgw::sal::Attrs& attrs = bucket->get_attrs();
+ auto iter = attrs.find(RGW_ATTR_BUCKET_NOTIFICATION);
+ if (iter == attrs.end()) {
+ return 0;
}
-
- auto iter = find_unique_topic(bucket_topics, notification_id);
- if (!iter) {
- ldpp_dout(dpp, 1) << "ERROR: notification was not found" << dendl;
- return -ENOENT;
+ try {
+ const auto& bl = iter->second;
+ auto biter = bl.cbegin();
+ bucket_topics.decode(biter);
+ } catch (buffer::error& err) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to decode bucket topics for bucket: "
+ << bucket->get_name() << dendl;
+ return -EIO;
}
-
- result = iter->get();
return 0;
}
-
-int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name,
- const rgw::notify::EventTypeList& events, optional_yield y) const {
- return create_notification(dpp, topic_name, events, std::nullopt, "", y);
- }
bool do_all_zonegroups_support_notification_v2(
std::map<std::string, RGWZoneGroup> zonegroups) {
for (const auto& [_, zonegroup] : zonegroups) {
return true;
}
-int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name,
+std::string topic_to_unique(const std::string& topic,
+ const std::string& notification) {
+ return notification + "_" + topic;
+}
+
+// from list of bucket topics, find the one that was auto-generated by a notification
+std::optional<rgw_pubsub_topic_filter> find_unique_topic(
+ const rgw_pubsub_bucket_topics& bucket_topics,
+ const std::string& notification_id) {
+ auto it = std::find_if(bucket_topics.topics.begin(), bucket_topics.topics.end(),
+ [&](const auto& val) { return notification_id == val.second.s3_id; });
+ if (it != bucket_topics.topics.end())
+ return it->second;
+ return std::nullopt;
+}
+
+int delete_all_notifications(const DoutPrefixProvider* dpp,
+ const rgw_pubsub_bucket_topics& bucket_topics,
+ std::map<std::string, bufferlist>& attrs,
+ rgw::sal::Bucket* bucket,
+ rgw::sal::Driver* driver,
+ optional_yield y) {
+ auto iter = attrs.find(RGW_ATTR_BUCKET_NOTIFICATION);
+ if (iter == attrs.end()) {
+ return 0;
+ }
+ // delete all notifications of on a bucket
+ attrs.erase(iter);
+ const auto ret = bucket->merge_and_store_attrs(dpp, attrs, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1)
+ << "Failed to remove RGW_ATTR_BUCKET_NOTIFICATION attr on bucket="
+ << bucket->get_name() << " ret= " << ret << dendl;
+ }
+ return ret;
+}
+
+int remove_notification_v2(const DoutPrefixProvider* dpp,
+ rgw::sal::Driver* driver,
+ rgw::sal::Bucket* bucket,
+ const std::string& notification_id,
+ optional_yield y) {
+ rgw_pubsub_bucket_topics bucket_topics;
+ auto ret = get_bucket_notifications(dpp, bucket, bucket_topics);
+ if (ret < 0) {
+ return -ret;
+ }
+ // no notifications on the bucket.
+ if (bucket_topics.topics.empty()) {
+ return 0;
+ }
+ rgw::sal::Attrs& attrs = bucket->get_attrs();
+ if (notification_id.empty()) {
+ return delete_all_notifications(dpp, bucket_topics, attrs, bucket, driver,
+ y);
+ }
+ // delete a specific notification
+ const auto unique_topic = find_unique_topic(bucket_topics, notification_id);
+ if (!unique_topic) {
+ // notification to be removed is not found - considered success
+ ldpp_dout(dpp, 20) << "notification '" << notification_id
+ << "' already removed" << dendl;
+ return 0;
+ }
+ const auto& topic_name = unique_topic->topic.name;
+ bucket_topics.topics.erase(topic_to_unique(topic_name, notification_id));
+ bufferlist bl;
+ bucket_topics.encode(bl);
+ attrs[RGW_ATTR_BUCKET_NOTIFICATION] = std::move(bl);
+ ret = bucket->merge_and_store_attrs(dpp, attrs, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1)
+ << "Failed to store RGW_ATTR_BUCKET_NOTIFICATION on bucket="
+ << bucket->get_name() << " returned err= " << ret << dendl;
+ }
+ return ret;
+}
+
+int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name,
const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) const {
rgw_pubsub_topic topic_info;
ldpp_dout(dpp, 1) << "ERROR: notification was not found" << dendl;
return -ENOENT;
}
- topic_name = std::make_unique<std::string>(iter->get().topic.name);
+ topic_name = std::make_unique<std::string>(iter->topic.name);
}
if (bucket_topics.topics.erase(*topic_name) == 0) {
const rgw_pubsub_topic& topic,
optional_yield y) const {
RGWObjVersionTracker objv_tracker;
- const auto ret = driver->write_topic(topic, &objv_tracker, y, dpp);
+ auto ret = driver->write_topic_v2(topic, &objv_tracker, y, dpp);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to write topic info: ret=" << ret
<< dendl;
}
-
return ret;
}
if (ret < 0 && ret != -ENOENT) {
return ret;
} else if (ret == -ENOENT) {
- // its not an error if no topics exist, just a no-op
+ // it's not an error if no topics exist, just a no-op
ldpp_dout(dpp, 10) << "WARNING: topic name:" << name
<< " does not exist, deletion is a no-op: ret=" << ret
<< dendl;
return 0;
}
- ret = driver->remove_topic(name, tenant, &objv_tracker, y, dpp);
+ ret = driver->remove_topic_v2(name, tenant, &objv_tracker, y, dpp);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to remove topic info: ret=" << ret
<< dendl;
bool do_all_zonegroups_support_notification_v2(
std::map<std::string, RGWZoneGroup> zonegroups);
+
+std::string topic_to_unique(const std::string& topic,
+ const std::string& notification);
+
+std::optional<rgw_pubsub_topic_filter> find_unique_topic(
+ const rgw_pubsub_bucket_topics& bucket_topics,
+ const std::string& notif_name);
+
+// Delete the bucket notification if |notification_id| is passed, else delete
+// all the bucket notifications for the given |bucket| and update the topic
+// bucket mapping.
+int remove_notification_v2(const DoutPrefixProvider* dpp,
+ rgw::sal::Driver* driver,
+ rgw::sal::Bucket* bucket,
+ const std::string& notification_id,
+ optional_yield y);
+
+int get_bucket_notifications(const DoutPrefixProvider* dpp,
+ rgw::sal::Bucket* bucket,
+ rgw_pubsub_bucket_topics& bucket_topics);
return 0;
}
-namespace {
-// return a unique topic by prefexing with the notification name: <notification>_<topic>
-std::string topic_to_unique(const std::string& topic, const std::string& notification) {
- return notification + "_" + topic;
-}
-
-// extract the topic from a unique topic of the form: <notification>_<topic>
-[[maybe_unused]] std::string unique_to_topic(const std::string& unique_topic, const std::string& notification) {
- if (unique_topic.find(notification + "_") == std::string::npos) {
- return "";
- }
- return unique_topic.substr(notification.length() + 1);
-}
-
-// from list of bucket topics, find the one that was auto-generated by a notification
-auto find_unique_topic(const rgw_pubsub_bucket_topics& bucket_topics, const std::string& notif_name) {
- auto it = std::find_if(bucket_topics.topics.begin(), bucket_topics.topics.end(), [&](const auto& val) { return notif_name == val.second.s3_id; });
- return it != bucket_topics.topics.end() ?
- std::optional<std::reference_wrapper<const rgw_pubsub_topic_filter>>(it->second):
- std::nullopt;
-}
-}
-
int remove_notification_by_topic(const DoutPrefixProvider *dpp, const std::string& topic_name, const RGWPubSub::Bucket& b, optional_yield y, const RGWPubSub& ps) {
int op_ret = b.remove_notification(dpp, topic_name, y);
if (op_ret < 0) {
// a "notification" and a subscription will be auto-generated
// actual configuration is XML encoded in the body of the message
class RGWPSCreateNotifOp : public RGWDefaultResponseOp {
+ bufferlist data;
int verify_params() override {
bool exists;
const auto no_value = s->info.args.get("notification", &exists);
int get_params_from_body(rgw_pubsub_s3_notifications& configurations) {
const auto max_size = s->cct->_conf->rgw_max_put_param_size;
int r;
- bufferlist data;
std::tie(r, data) = read_all_input(s, max_size, false);
if (r < 0) {
void execute(optional_yield) override;
+ void execute_v2(optional_yield);
};
void RGWPSCreateNotifOp::execute(optional_yield y) {
+ if (do_all_zonegroups_support_notification_v2(
+ s->penv.site->get_period()->get_map().zonegroups)) {
+ return execute_v2(y);
+ }
op_ret = verify_params();
if (op_ret < 0) {
return;
if (op_ret < 0) {
return;
}
+ if (!driver->is_meta_master()) {
+ op_ret = rgw_forward_request_to_master(
+ this, *s->penv.site, s->user->get_id(), &data, nullptr, s->info, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "CreateBucketNotification "
+ "forward_request_to_master returned ret = "
+ << op_ret << dendl;
+ return;
+ }
+ }
std::unique_ptr<rgw::sal::Bucket> bucket;
op_ret = driver->load_bucket(this, rgw_bucket(s->bucket_tenant, s->bucket_name),
return 0;
}
+void RGWPSCreateNotifOp::execute_v2(optional_yield y) {
+ op_ret = verify_params();
+ if (op_ret < 0) {
+ return;
+ }
+
+ rgw_pubsub_s3_notifications configurations;
+ op_ret = get_params_from_body(configurations);
+ if (op_ret < 0) {
+ return;
+ }
+ if (!driver->is_meta_master()) {
+ op_ret = rgw_forward_request_to_master(
+ this, *s->penv.site, s->user->get_id(), &data, nullptr, s->info, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "CreateBucketNotification "
+ "forward_request_to_master returned ret = "
+ << op_ret << dendl;
+ return;
+ }
+ }
+
+ std::unique_ptr<rgw::sal::Bucket> bucket;
+ op_ret = driver->load_bucket(this, rgw_bucket(s->bucket_tenant, s->bucket_name),
+ &bucket, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to get bucket '"
+ << (s->bucket_tenant.empty()
+ ? s->bucket_name
+ : s->bucket_tenant + ":" + s->bucket_name)
+ << "' info, ret = " << op_ret << dendl;
+ return;
+ }
+ if (configurations.list.empty()) {
+ op_ret = remove_notification_v2(this, driver, bucket.get(),
+ /*delete all notif=true*/ "", y);
+ return;
+ }
+ rgw_pubsub_bucket_topics bucket_topics;
+ op_ret = get_bucket_notifications(this, bucket.get(), bucket_topics);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1)
+ << "failed to load existing bucket notification on bucket: "
+ << (s->bucket_tenant.empty() ? s->bucket_name
+ : s->bucket_tenant + ":" + s->bucket_name)
+ << "' , ret = " << op_ret << dendl;
+ return;
+ }
+ const RGWPubSub ps(driver, s->owner.id.tenant,
+ &s->penv.site->get_period()->get_map().zonegroups);
+ std::unordered_map<std::string, rgw_pubsub_topic> topics;
+ const auto rgwbucket = rgw_bucket(s->bucket_tenant, s->bucket_name, "");
+ for (const auto& c : configurations.list) {
+ const auto& notif_name = c.id;
+ if (notif_name.empty()) {
+ ldpp_dout(this, 1) << "missing notification id" << dendl;
+ op_ret = -EINVAL;
+ return;
+ }
+ if (c.topic_arn.empty()) {
+ ldpp_dout(this, 1) << "missing topic ARN in notification: '" << notif_name
+ << "'" << dendl;
+ op_ret = -EINVAL;
+ return;
+ }
+
+ const auto arn = rgw::ARN::parse(c.topic_arn);
+ if (!arn || arn->resource.empty()) {
+ ldpp_dout(this, 1) << "topic ARN has invalid format: '" << c.topic_arn
+ << "' in notification: '" << notif_name << "'"
+ << dendl;
+ op_ret = -EINVAL;
+ return;
+ }
+
+ if (std::find(c.events.begin(), c.events.end(),
+ rgw::notify::UnknownEvent) != c.events.end()) {
+ ldpp_dout(this, 1) << "unknown event type in notification: '"
+ << notif_name << "'" << dendl;
+ op_ret = -EINVAL;
+ return;
+ }
+ const auto& topic_name = arn->resource;
+ if (!topics.contains(topic_name)) {
+ // get topic information. destination information is stored in the topic
+ rgw_pubsub_topic topic_info;
+ op_ret = ps.get_topic(this, topic_name, topic_info, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to get topic '" << topic_name
+ << "', ret=" << op_ret << dendl;
+ return;
+ }
+ op_ret = verify_topic_owner_or_policy(
+ s, topic_info, driver->get_zone()->get_zonegroup().get_name(),
+ rgw::IAM::snsPublish);
+ if (op_ret != 0) {
+ ldpp_dout(this, 1) << "failed to create notification for topic '"
+ << topic_name << "' topic owned by other user"
+ << dendl;
+ return;
+ }
+ topics[topic_name] = std::move(topic_info);
+ }
+ auto& topic_filter =
+ bucket_topics.topics[topic_to_unique(topic_name, notif_name)];
+ topic_filter.topic = topics[topic_name];
+ topic_filter.events = c.events;
+ topic_filter.s3_id = notif_name;
+ topic_filter.s3_filter = c.filter;
+ }
+ // finally store all the bucket notifications as attr.
+ bufferlist bl;
+ bucket_topics.encode(bl);
+ rgw::sal::Attrs& attrs = bucket->get_attrs();
+ attrs[RGW_ATTR_BUCKET_NOTIFICATION] = std::move(bl);
+ op_ret = bucket->merge_and_store_attrs(this, attrs, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1)
+ << "Failed to store RGW_ATTR_BUCKET_NOTIFICATION on bucket="
+ << bucket->get_name() << " returned err= " << op_ret << dendl;
+ return;
+ }
+ ldpp_dout(this, 20) << "successfully created bucket notification for bucket: "
+ << bucket->get_name() << dendl;
+}
+
// command (extension to S3): DELETE /bucket?notification[=<notification-id>]
class RGWPSDeleteNotifOp : public RGWDefaultResponseOp {
int get_params(std::string& notif_name) const {
}
return 0;
}
+ void execute_v2(optional_yield y);
-public:
+ public:
int verify_permission(optional_yield y) override;
void pre_exec() override {
};
void RGWPSDeleteNotifOp::execute(optional_yield y) {
+ if (do_all_zonegroups_support_notification_v2(
+ s->penv.site->get_period()->get_map().zonegroups)) {
+ return execute_v2(y);
+ }
std::string notif_name;
op_ret = get_params(notif_name);
if (op_ret < 0) {
return;
}
+ if (!driver->is_meta_master()) {
+ bufferlist indata;
+ op_ret = rgw_forward_request_to_master(
+ this, *s->penv.site, s->user->get_id(), &indata, nullptr, s->info, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "DeleteBucketNotification "
+ "forward_request_to_master returned error ret= "
+ << op_ret << dendl;
+ return;
+ }
+ }
std::unique_ptr<rgw::sal::Bucket> bucket;
op_ret = driver->load_bucket(this, rgw_bucket(s->bucket_tenant, s->bucket_name),
// delete a specific notification
const auto unique_topic = find_unique_topic(bucket_topics, notif_name);
if (unique_topic) {
- const auto unique_topic_name = unique_topic->get().topic.name;
+ const auto unique_topic_name = unique_topic->topic.name;
op_ret = remove_notification_by_topic(this, unique_topic_name, b, y, ps);
return;
}
return 0;
}
+void RGWPSDeleteNotifOp::execute_v2(optional_yield y) {
+ std::string notif_name;
+ op_ret = get_params(notif_name);
+ if (op_ret < 0) {
+ return;
+ }
+ if (!driver->is_meta_master()) {
+ bufferlist indata;
+ op_ret = rgw_forward_request_to_master(
+ this, *s->penv.site, s->user->get_id(), &indata, nullptr, s->info, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "DeleteBucketNotification "
+ "forward_request_to_master returned error ret= "
+ << op_ret << dendl;
+ return;
+ }
+ }
+
+ std::unique_ptr<rgw::sal::Bucket> bucket;
+ op_ret = driver->load_bucket(this, rgw_bucket(s->bucket_tenant, s->bucket_name),
+ &bucket, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to get bucket '"
+ << (s->bucket_tenant.empty()
+ ? s->bucket_name
+ : s->bucket_tenant + ":" + s->bucket_name)
+ << "' info, ret = " << op_ret << dendl;
+ return;
+ }
+ op_ret = remove_notification_v2(this, driver, bucket.get(), notif_name, y);
+}
+
// command (S3 compliant): GET /bucket?notification[=<notification-id>]
class RGWPSListNotifsOp : public RGWOp {
rgw_pubsub_s3_notifications notifications;
return;
}
- const RGWPubSub ps(driver, s->owner.id.tenant);
- const RGWPubSub::Bucket b(ps, bucket.get());
-
// get all topics on a bucket
rgw_pubsub_bucket_topics bucket_topics;
- op_ret = b.get_topics(this, bucket_topics, y);
+ if (do_all_zonegroups_support_notification_v2(
+ s->penv.site->get_period()->get_map().zonegroups)) {
+ op_ret = get_bucket_notifications(this, bucket.get(), bucket_topics);
+ } else {
+ const RGWPubSub ps(driver, s->owner.id.tenant);
+ const RGWPubSub::Bucket b(ps, bucket.get());
+ op_ret = b.get_topics(this, bucket_topics, y);
+ }
if (op_ret < 0) {
- ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << s->bucket_name << "', ret=" << op_ret << dendl;
+ ldpp_dout(this, 1) << "failed to get list of topics from bucket '"
+ << s->bucket_name << "', ret=" << op_ret << dendl;
return;
}
if (!notif_name.empty()) {
// get info of a specific notification
const auto unique_topic = find_unique_topic(bucket_topics, notif_name);
if (unique_topic) {
- notifications.list.emplace_back(unique_topic->get());
+ notifications.list.emplace_back(*unique_topic);
return;
}
op_ret = -ENOENT;