From: kchheda3 Date: Tue, 5 Dec 2023 17:26:48 +0000 (-0500) Subject: rgw/multisite-notification: Add support to replicate bucket notifications in multisit... X-Git-Tag: testing/wip-root-testing-20240411.174241^2~37 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=573c7f11684bc97fcb9f6ffd56038bf137dc223e;p=ceph-ci.git rgw/multisite-notification: Add support to replicate bucket notifications in multisite config. Signed-off-by: kchheda3 (cherry picked from commit ab74e2cfe4e3738b85d38a132f835d1dca113258) --- diff --git a/src/rgw/driver/rados/rgw_bucket.cc b/src/rgw/driver/rados/rgw_bucket.cc index d0eacf81e8c..9f556553182 100644 --- a/src/rgw/driver/rados/rgw_bucket.cc +++ b/src/rgw/driver/rados/rgw_bucket.cc @@ -1337,13 +1337,24 @@ static int bucket_stats(rgw::sal::Driver* driver, } // bucket notifications - RGWPubSub ps(driver, tenant_name); rgw_pubsub_bucket_topics result; - const RGWPubSub::Bucket b(ps, bucket.get()); - ret = b.get_topics(dpp, result, y); - if (ret < 0 && ret != -ENOENT) { - cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl; - return -ret; + if (driver->get_zone()->get_zonegroup().supports_feature( + rgw::zone_features::notification_v2)) { + ret = get_bucket_notifications(dpp, bucket.get(), result); + if (ret < 0) { + cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) + << std::endl; + return -ret; + } + } else { + RGWPubSub ps(driver, tenant_name); + const RGWPubSub::Bucket b(ps, bucket.get()); + ret = b.get_topics(dpp, result, y); + if (ret < 0 && ret != -ENOENT) { + cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) + << std::endl; + return -ret; + } } result.dump(formatter); diff --git a/src/rgw/driver/rados/rgw_notify.cc b/src/rgw/driver/rados/rgw_notify.cc index 4a2cf827165..e5d5dd602f5 100644 --- a/src/rgw/driver/rados/rgw_notify.cc +++ b/src/rgw/driver/rados/rgw_notify.cc @@ -16,6 +16,7 @@ #include "rgw_pubsub_push.h" #include "rgw_zone_features.h" #include "rgw_perf_counters.h" +#include "services/svc_zone.h" #include "common/dout.h" #include @@ -980,13 +981,35 @@ static inline bool notification_match(reservation_t& res, reservation_t& res, const RGWObjTags* req_tags) { - const RGWPubSub ps(res.store, res.user_tenant); - const RGWPubSub::Bucket ps_bucket(ps, res.bucket); rgw_pubsub_bucket_topics bucket_topics; - auto rc = ps_bucket.get_topics(res.dpp, bucket_topics, res.yield); - if (rc < 0) { - // failed to fetch bucket topics - return rc; + if (do_all_zonegroups_support_notification_v2( + res.store->svc()->zone->get_current_period().get_map().zonegroups)) { + auto ret = 0; + if (!res.s) { + // for non S3-request caller (e.g., lifecycle, ObjectSync), bucket attrs + // are not loaded, so force to reload the bucket, that reloads the attr. + // for non S3-request caller, res.s is nullptr + ret = res.bucket->load_bucket(dpp, res.yield); + if (ret < 0) { + ldpp_dout(dpp, 1) + << "ERROR: failed to reload bucket: '" << res.bucket->get_name() + << "' to get bucket notification attrs with error ret= " << ret + << dendl; + return ret; + } + } + ret = get_bucket_notifications(dpp, res.bucket, bucket_topics); + if (ret < 0) { + return ret; + } + } else { + const RGWPubSub ps(res.store, res.user_tenant); + const RGWPubSub::Bucket ps_bucket(ps, res.bucket); + auto rc = ps_bucket.get_topics(res.dpp, bucket_topics, res.yield); + if (rc < 0) { + // failed to fetch bucket topics + return rc; + } } for (const auto& bucket_topic : bucket_topics.topics) { const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second; @@ -1027,7 +1050,31 @@ static inline bool notification_match(reservation_t& res, return ret; } } - res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id); + // load the topic,if there is change in topic config while it's stored in + // notification. + rgw_pubsub_topic result; + const RGWPubSub ps( + res.store, res.user_tenant, + &res.store->svc()->zone->get_current_period().get_map().zonegroups); + auto ret = ps.get_topic(res.dpp, topic_cfg.name, result, res.yield); + if (ret < 0) { + ldpp_dout(res.dpp, 1) + << "INFO: failed to load topic: " << topic_cfg.name + << ". error: " << ret + << " while storing the persistent notification event" << dendl; + if (ret == -ENOENT) { + // either the topic is deleted but the corresponding notification still + // exist or in v2 mode the notification could have synced first but + // topic is not synced yet. + return 0; + } + ldpp_dout(res.dpp, 1) + << "WARN: Using the stored topic from bucket notification struct." + << dendl; + res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id); + } else { + res.topics.emplace_back(topic_filter.s3_id, result, res_id); + } } return 0; } diff --git a/src/rgw/driver/rados/rgw_notify.h b/src/rgw/driver/rados/rgw_notify.h index 20f0c17e5bb..5117f9eecd8 100644 --- a/src/rgw/driver/rados/rgw_notify.h +++ b/src/rgw/driver/rados/rgw_notify.h @@ -63,7 +63,7 @@ struct reservation_t { size_t size; rgw::sal::Object* const object; rgw::sal::Object* const src_object; // may differ from object - rgw::sal::Bucket* const bucket; + rgw::sal::Bucket* bucket; const std::string* const object_name; boost::optional tagset; meta_map_t x_meta_map; // metadata cached by value diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index e25f188a732..ee843d92c10 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -10619,20 +10619,28 @@ next: return EINVAL; } - RGWPubSub ps(driver, tenant); - rgw_pubsub_bucket_topics result; int ret = init_bucket(tenant, bucket_name, bucket_id, &bucket); if (ret < 0) { cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl; return -ret; } - - const RGWPubSub::Bucket b(ps, bucket.get()); - ret = b.get_topics(dpp(), result, null_yield); - if (ret < 0 && ret != -ENOENT) { - cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl; - return -ret; + if (driver->get_zone()->get_zonegroup().supports_feature( + rgw::zone_features::notification_v2)) { + ret = get_bucket_notifications(dpp(), bucket.get(), result); + if (ret < 0) { + cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) + << std::endl; + return -ret; + } + } else { + RGWPubSub ps(driver, tenant); + const RGWPubSub::Bucket b(ps, bucket.get()); + ret = b.get_topics(dpp(), result, null_yield); + if (ret < 0 && ret != -ENOENT) { + cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl; + return -ret; + } } encode_json("result", result, formatter.get()); formatter->flush(cout); @@ -10704,24 +10712,30 @@ next: cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl; return -ret; } - - RGWPubSub ps(driver, tenant); - rgw_pubsub_bucket_topics bucket_topics; - const RGWPubSub::Bucket b(ps, bucket.get()); - ret = b.get_topics(dpp(), bucket_topics, null_yield); - if (ret < 0 && ret != -ENOENT) { - cerr << "ERROR: could not get bucket notifications: " << cpp_strerror(-ret) << std::endl; - return -ret; + if (driver->get_zone()->get_zonegroup().supports_feature( + rgw::zone_features::notification_v2)) { + ret = get_bucket_notifications(dpp(), bucket.get(), bucket_topics); + if (ret < 0) { + cerr << "ERROR: could not get bucket notifications: " + << cpp_strerror(-ret) << std::endl; + return -ret; + } + } else { + RGWPubSub ps(driver, tenant); + const RGWPubSub::Bucket b(ps, bucket.get()); + ret = b.get_topics(dpp(), bucket_topics, null_yield); + if (ret < 0 && ret != -ENOENT) { + cerr << "ERROR: could not get bucket notifications: " << cpp_strerror(-ret) << std::endl; + return -ret; + } } - - rgw_pubsub_topic_filter bucket_topic; - ret = b.get_notification_by_id(dpp(), notification_id, bucket_topic, null_yield); - if (ret < 0) { - cerr << "ERROR: could not get notification: " << cpp_strerror(-ret) << std::endl; - return -ret; + auto iter = find_unique_topic(bucket_topics, notification_id); + if (!iter) { + cerr << "ERROR: notification was not found" << std::endl; + return -ENOENT; } - encode_json("notification", bucket_topic, formatter.get()); + encode_json("notification", *iter, formatter.get()); formatter->flush(cout); } @@ -10730,7 +10744,10 @@ next: cerr << "ERROR: topic name was not provided (via --topic)" << std::endl; return EINVAL; } - + if (!driver->is_meta_master()) { + cerr << "ERROR: Run 'topic rm' from master zone " << std::endl; + return -EINVAL; + } ret = rgw::notify::remove_persistent_topic( dpp(), static_cast(driver)->getRados()->get_notif_pool_ctx(), topic_name, null_yield); if (ret < 0) { @@ -10758,28 +10775,37 @@ next: cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl; return EINVAL; } - + if (!driver->is_meta_master()) { + cerr << "ERROR: Run 'notification rm' from master zone " << std::endl; + return -EINVAL; + } int ret = init_bucket(tenant, bucket_name, bucket_id, &bucket); if (ret < 0) { cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl; return -ret; } - RGWPubSub ps(driver, tenant); + if (driver->get_zone()->get_zonegroup().supports_feature( + rgw::zone_features::notification_v2)) { + ret = remove_notification_v2(dpp(), driver, bucket.get(), notification_id, + null_yield); + } else { + RGWPubSub ps(driver, tenant); - rgw_pubsub_bucket_topics bucket_topics; - const RGWPubSub::Bucket b(ps, bucket.get()); - ret = b.get_topics(dpp(), bucket_topics, null_yield); - if (ret < 0 && ret != -ENOENT) { - cerr << "ERROR: could not get bucket notifications: " << cpp_strerror(-ret) << std::endl; - return -ret; - } + rgw_pubsub_bucket_topics bucket_topics; + const RGWPubSub::Bucket b(ps, bucket.get()); + ret = b.get_topics(dpp(), bucket_topics, null_yield); + if (ret < 0 && ret != -ENOENT) { + cerr << "ERROR: could not get bucket notifications: " << cpp_strerror(-ret) << std::endl; + return -ret; + } - rgw_pubsub_topic_filter bucket_topic; - if(notification_id.empty()) { - ret = b.remove_notifications(dpp(), null_yield); - } else { - ret = b.remove_notification_by_id(dpp(), notification_id, null_yield); + rgw_pubsub_topic_filter bucket_topic; + if(notification_id.empty()) { + ret = b.remove_notifications(dpp(), null_yield); + } else { + ret = b.remove_notification_by_id(dpp(), notification_id, null_yield); + } } } diff --git a/src/rgw/rgw_common.h b/src/rgw/rgw_common.h index c2d7abef35e..5dc4f883a17 100644 --- a/src/rgw/rgw_common.h +++ b/src/rgw/rgw_common.h @@ -172,6 +172,8 @@ using ceph::crypto::MD5; #define RGW_ATTR_TRACE RGW_ATTR_PREFIX "trace" +#define RGW_ATTR_BUCKET_NOTIFICATION RGW_ATTR_PREFIX "bucket-notification" + enum class RGWFormat : int8_t { BAD_FORMAT = -1, PLAIN = 0, diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index c563b959863..628f57901af 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -586,7 +586,7 @@ int RGWPubSub::Bucket::write_topics(const DoutPrefixProvider *dpp, const rgw_pub int RGWPubSub::get_topic(const DoutPrefixProvider *dpp, const std::string& name, rgw_pubsub_topic& result, optional_yield y) const { if (use_notification_v2) { - const int ret = driver->read_topic(name, tenant, result, nullptr, y, dpp); + const int ret = driver->read_topic_v2(name, tenant, result, nullptr, y, dpp); if (ret < 0) { ldpp_dout(dpp, 1) << "failed to read topic info for name: " << name << " tenant: " << tenant << ", ret=" << ret << dendl; @@ -610,39 +610,26 @@ int RGWPubSub::get_topic(const DoutPrefixProvider *dpp, const std::string& name, return 0; } -// from list of bucket topics, find the one that was auto-generated by a notification -auto find_unique_topic(const rgw_pubsub_bucket_topics &bucket_topics, const std::string ¬ification_id) { - auto it = std::find_if(bucket_topics.topics.begin(), bucket_topics.topics.end(), - [&](const auto& val) { return notification_id == val.second.s3_id; }); - return it != bucket_topics.topics.end() ? - std::optional>(it->second): - std::nullopt; -} - -int RGWPubSub::Bucket::get_notification_by_id(const DoutPrefixProvider *dpp, const std::string& notification_id, - rgw_pubsub_topic_filter& result, optional_yield y) const { - rgw_pubsub_bucket_topics bucket_topics; - const int ret = read_topics(dpp, bucket_topics, nullptr, y); - if (ret < 0) { - ldpp_dout(dpp, 1) << "ERROR: failed to read bucket_topics info: ret=" << ret << dendl; - return ret; +int get_bucket_notifications(const DoutPrefixProvider* dpp, + rgw::sal::Bucket* bucket, + rgw_pubsub_bucket_topics& bucket_topics) { + const rgw::sal::Attrs& attrs = bucket->get_attrs(); + auto iter = attrs.find(RGW_ATTR_BUCKET_NOTIFICATION); + if (iter == attrs.end()) { + return 0; } - - auto iter = find_unique_topic(bucket_topics, notification_id); - if (!iter) { - ldpp_dout(dpp, 1) << "ERROR: notification was not found" << dendl; - return -ENOENT; + try { + const auto& bl = iter->second; + auto biter = bl.cbegin(); + bucket_topics.decode(biter); + } catch (buffer::error& err) { + ldpp_dout(dpp, 1) << "ERROR: failed to decode bucket topics for bucket: " + << bucket->get_name() << dendl; + return -EIO; } - - result = iter->get(); return 0; } - -int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, - const rgw::notify::EventTypeList& events, optional_yield y) const { - return create_notification(dpp, topic_name, events, std::nullopt, "", y); - } bool do_all_zonegroups_support_notification_v2( std::map zonegroups) { for (const auto& [_, zonegroup] : zonegroups) { @@ -653,7 +640,85 @@ bool do_all_zonegroups_support_notification_v2( return true; } -int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, +std::string topic_to_unique(const std::string& topic, + const std::string& notification) { + return notification + "_" + topic; +} + +// from list of bucket topics, find the one that was auto-generated by a notification +std::optional find_unique_topic( + const rgw_pubsub_bucket_topics& bucket_topics, + const std::string& notification_id) { + auto it = std::find_if(bucket_topics.topics.begin(), bucket_topics.topics.end(), + [&](const auto& val) { return notification_id == val.second.s3_id; }); + if (it != bucket_topics.topics.end()) + return it->second; + return std::nullopt; +} + +int delete_all_notifications(const DoutPrefixProvider* dpp, + const rgw_pubsub_bucket_topics& bucket_topics, + std::map& attrs, + rgw::sal::Bucket* bucket, + rgw::sal::Driver* driver, + optional_yield y) { + auto iter = attrs.find(RGW_ATTR_BUCKET_NOTIFICATION); + if (iter == attrs.end()) { + return 0; + } + // delete all notifications of on a bucket + attrs.erase(iter); + const auto ret = bucket->merge_and_store_attrs(dpp, attrs, y); + if (ret < 0) { + ldpp_dout(dpp, 1) + << "Failed to remove RGW_ATTR_BUCKET_NOTIFICATION attr on bucket=" + << bucket->get_name() << " ret= " << ret << dendl; + } + return ret; +} + +int remove_notification_v2(const DoutPrefixProvider* dpp, + rgw::sal::Driver* driver, + rgw::sal::Bucket* bucket, + const std::string& notification_id, + optional_yield y) { + rgw_pubsub_bucket_topics bucket_topics; + auto ret = get_bucket_notifications(dpp, bucket, bucket_topics); + if (ret < 0) { + return -ret; + } + // no notifications on the bucket. + if (bucket_topics.topics.empty()) { + return 0; + } + rgw::sal::Attrs& attrs = bucket->get_attrs(); + if (notification_id.empty()) { + return delete_all_notifications(dpp, bucket_topics, attrs, bucket, driver, + y); + } + // delete a specific notification + const auto unique_topic = find_unique_topic(bucket_topics, notification_id); + if (!unique_topic) { + // notification to be removed is not found - considered success + ldpp_dout(dpp, 20) << "notification '" << notification_id + << "' already removed" << dendl; + return 0; + } + const auto& topic_name = unique_topic->topic.name; + bucket_topics.topics.erase(topic_to_unique(topic_name, notification_id)); + bufferlist bl; + bucket_topics.encode(bl); + attrs[RGW_ATTR_BUCKET_NOTIFICATION] = std::move(bl); + ret = bucket->merge_and_store_attrs(dpp, attrs, y); + if (ret < 0) { + ldpp_dout(dpp, 1) + << "Failed to store RGW_ATTR_BUCKET_NOTIFICATION on bucket=" + << bucket->get_name() << " returned err= " << ret << dendl; + } + return ret; +} + +int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) const { rgw_pubsub_topic topic_info; @@ -720,7 +785,7 @@ int RGWPubSub::Bucket::remove_notification_inner(const DoutPrefixProvider *dpp, ldpp_dout(dpp, 1) << "ERROR: notification was not found" << dendl; return -ENOENT; } - topic_name = std::make_unique(iter->get().topic.name); + topic_name = std::make_unique(iter->topic.name); } if (bucket_topics.topics.erase(*topic_name) == 0) { @@ -786,12 +851,11 @@ int RGWPubSub::create_topic(const DoutPrefixProvider* dpp, const rgw_pubsub_topic& topic, optional_yield y) const { RGWObjVersionTracker objv_tracker; - const auto ret = driver->write_topic(topic, &objv_tracker, y, dpp); + auto ret = driver->write_topic_v2(topic, &objv_tracker, y, dpp); if (ret < 0) { ldpp_dout(dpp, 1) << "ERROR: failed to write topic info: ret=" << ret << dendl; } - return ret; } @@ -848,13 +912,13 @@ int RGWPubSub::remove_topic_v2(const DoutPrefixProvider* dpp, if (ret < 0 && ret != -ENOENT) { return ret; } else if (ret == -ENOENT) { - // its not an error if no topics exist, just a no-op + // it's not an error if no topics exist, just a no-op ldpp_dout(dpp, 10) << "WARNING: topic name:" << name << " does not exist, deletion is a no-op: ret=" << ret << dendl; return 0; } - ret = driver->remove_topic(name, tenant, &objv_tracker, y, dpp); + ret = driver->remove_topic_v2(name, tenant, &objv_tracker, y, dpp); if (ret < 0) { ldpp_dout(dpp, 1) << "ERROR: failed to remove topic info: ret=" << ret << dendl; diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h index 4afc101c63c..46da8e045a9 100644 --- a/src/rgw/rgw_pubsub.h +++ b/src/rgw/rgw_pubsub.h @@ -672,3 +672,23 @@ namespace rgw::notify { bool do_all_zonegroups_support_notification_v2( std::map zonegroups); + +std::string topic_to_unique(const std::string& topic, + const std::string& notification); + +std::optional find_unique_topic( + const rgw_pubsub_bucket_topics& bucket_topics, + const std::string& notif_name); + +// Delete the bucket notification if |notification_id| is passed, else delete +// all the bucket notifications for the given |bucket| and update the topic +// bucket mapping. +int remove_notification_v2(const DoutPrefixProvider* dpp, + rgw::sal::Driver* driver, + rgw::sal::Bucket* bucket, + const std::string& notification_id, + optional_yield y); + +int get_bucket_notifications(const DoutPrefixProvider* dpp, + rgw::sal::Bucket* bucket, + rgw_pubsub_bucket_topics& bucket_topics); diff --git a/src/rgw/rgw_rest_pubsub.cc b/src/rgw/rgw_rest_pubsub.cc index 7396fcfddd4..f2e5439208f 100644 --- a/src/rgw/rgw_rest_pubsub.cc +++ b/src/rgw/rgw_rest_pubsub.cc @@ -894,29 +894,6 @@ int RGWHandler_REST_PSTopic_AWS::authorize(const DoutPrefixProvider* dpp, option return 0; } -namespace { -// return a unique topic by prefexing with the notification name: _ -std::string topic_to_unique(const std::string& topic, const std::string& notification) { - return notification + "_" + topic; -} - -// extract the topic from a unique topic of the form: _ -[[maybe_unused]] std::string unique_to_topic(const std::string& unique_topic, const std::string& notification) { - if (unique_topic.find(notification + "_") == std::string::npos) { - return ""; - } - return unique_topic.substr(notification.length() + 1); -} - -// from list of bucket topics, find the one that was auto-generated by a notification -auto find_unique_topic(const rgw_pubsub_bucket_topics& bucket_topics, const std::string& notif_name) { - auto it = std::find_if(bucket_topics.topics.begin(), bucket_topics.topics.end(), [&](const auto& val) { return notif_name == val.second.s3_id; }); - return it != bucket_topics.topics.end() ? - std::optional>(it->second): - std::nullopt; -} -} - int remove_notification_by_topic(const DoutPrefixProvider *dpp, const std::string& topic_name, const RGWPubSub::Bucket& b, optional_yield y, const RGWPubSub& ps) { int op_ret = b.remove_notification(dpp, topic_name, y); if (op_ret < 0) { @@ -944,6 +921,7 @@ int delete_all_notifications(const DoutPrefixProvider *dpp, const rgw_pubsub_buc // a "notification" and a subscription will be auto-generated // actual configuration is XML encoded in the body of the message class RGWPSCreateNotifOp : public RGWDefaultResponseOp { + bufferlist data; int verify_params() override { bool exists; const auto no_value = s->info.args.get("notification", &exists); @@ -965,7 +943,6 @@ class RGWPSCreateNotifOp : public RGWDefaultResponseOp { int get_params_from_body(rgw_pubsub_s3_notifications& configurations) { const auto max_size = s->cct->_conf->rgw_max_put_param_size; int r; - bufferlist data; std::tie(r, data) = read_all_input(s, max_size, false); if (r < 0) { @@ -1010,9 +987,14 @@ public: void execute(optional_yield) override; + void execute_v2(optional_yield); }; void RGWPSCreateNotifOp::execute(optional_yield y) { + if (do_all_zonegroups_support_notification_v2( + s->penv.site->get_period()->get_map().zonegroups)) { + return execute_v2(y); + } op_ret = verify_params(); if (op_ret < 0) { return; @@ -1023,6 +1005,16 @@ void RGWPSCreateNotifOp::execute(optional_yield y) { if (op_ret < 0) { return; } + if (!driver->is_meta_master()) { + op_ret = rgw_forward_request_to_master( + this, *s->penv.site, s->user->get_id(), &data, nullptr, s->info, y); + if (op_ret < 0) { + ldpp_dout(this, 1) << "CreateBucketNotification " + "forward_request_to_master returned ret = " + << op_ret << dendl; + return; + } + } std::unique_ptr bucket; op_ret = driver->load_bucket(this, rgw_bucket(s->bucket_tenant, s->bucket_name), @@ -1134,6 +1126,132 @@ int RGWPSCreateNotifOp::verify_permission(optional_yield y) { return 0; } +void RGWPSCreateNotifOp::execute_v2(optional_yield y) { + op_ret = verify_params(); + if (op_ret < 0) { + return; + } + + rgw_pubsub_s3_notifications configurations; + op_ret = get_params_from_body(configurations); + if (op_ret < 0) { + return; + } + if (!driver->is_meta_master()) { + op_ret = rgw_forward_request_to_master( + this, *s->penv.site, s->user->get_id(), &data, nullptr, s->info, y); + if (op_ret < 0) { + ldpp_dout(this, 1) << "CreateBucketNotification " + "forward_request_to_master returned ret = " + << op_ret << dendl; + return; + } + } + + std::unique_ptr bucket; + op_ret = driver->load_bucket(this, rgw_bucket(s->bucket_tenant, s->bucket_name), + &bucket, y); + if (op_ret < 0) { + ldpp_dout(this, 1) << "failed to get bucket '" + << (s->bucket_tenant.empty() + ? s->bucket_name + : s->bucket_tenant + ":" + s->bucket_name) + << "' info, ret = " << op_ret << dendl; + return; + } + if (configurations.list.empty()) { + op_ret = remove_notification_v2(this, driver, bucket.get(), + /*delete all notif=true*/ "", y); + return; + } + rgw_pubsub_bucket_topics bucket_topics; + op_ret = get_bucket_notifications(this, bucket.get(), bucket_topics); + if (op_ret < 0) { + ldpp_dout(this, 1) + << "failed to load existing bucket notification on bucket: " + << (s->bucket_tenant.empty() ? s->bucket_name + : s->bucket_tenant + ":" + s->bucket_name) + << "' , ret = " << op_ret << dendl; + return; + } + const RGWPubSub ps(driver, s->owner.id.tenant, + &s->penv.site->get_period()->get_map().zonegroups); + std::unordered_map topics; + const auto rgwbucket = rgw_bucket(s->bucket_tenant, s->bucket_name, ""); + for (const auto& c : configurations.list) { + const auto& notif_name = c.id; + if (notif_name.empty()) { + ldpp_dout(this, 1) << "missing notification id" << dendl; + op_ret = -EINVAL; + return; + } + if (c.topic_arn.empty()) { + ldpp_dout(this, 1) << "missing topic ARN in notification: '" << notif_name + << "'" << dendl; + op_ret = -EINVAL; + return; + } + + const auto arn = rgw::ARN::parse(c.topic_arn); + if (!arn || arn->resource.empty()) { + ldpp_dout(this, 1) << "topic ARN has invalid format: '" << c.topic_arn + << "' in notification: '" << notif_name << "'" + << dendl; + op_ret = -EINVAL; + return; + } + + if (std::find(c.events.begin(), c.events.end(), + rgw::notify::UnknownEvent) != c.events.end()) { + ldpp_dout(this, 1) << "unknown event type in notification: '" + << notif_name << "'" << dendl; + op_ret = -EINVAL; + return; + } + const auto& topic_name = arn->resource; + if (!topics.contains(topic_name)) { + // get topic information. destination information is stored in the topic + rgw_pubsub_topic topic_info; + op_ret = ps.get_topic(this, topic_name, topic_info, y); + if (op_ret < 0) { + ldpp_dout(this, 1) << "failed to get topic '" << topic_name + << "', ret=" << op_ret << dendl; + return; + } + op_ret = verify_topic_owner_or_policy( + s, topic_info, driver->get_zone()->get_zonegroup().get_name(), + rgw::IAM::snsPublish); + if (op_ret != 0) { + ldpp_dout(this, 1) << "failed to create notification for topic '" + << topic_name << "' topic owned by other user" + << dendl; + return; + } + topics[topic_name] = std::move(topic_info); + } + auto& topic_filter = + bucket_topics.topics[topic_to_unique(topic_name, notif_name)]; + topic_filter.topic = topics[topic_name]; + topic_filter.events = c.events; + topic_filter.s3_id = notif_name; + topic_filter.s3_filter = c.filter; + } + // finally store all the bucket notifications as attr. + bufferlist bl; + bucket_topics.encode(bl); + rgw::sal::Attrs& attrs = bucket->get_attrs(); + attrs[RGW_ATTR_BUCKET_NOTIFICATION] = std::move(bl); + op_ret = bucket->merge_and_store_attrs(this, attrs, y); + if (op_ret < 0) { + ldpp_dout(this, 1) + << "Failed to store RGW_ATTR_BUCKET_NOTIFICATION on bucket=" + << bucket->get_name() << " returned err= " << op_ret << dendl; + return; + } + ldpp_dout(this, 20) << "successfully created bucket notification for bucket: " + << bucket->get_name() << dendl; +} + // command (extension to S3): DELETE /bucket?notification[=] class RGWPSDeleteNotifOp : public RGWDefaultResponseOp { int get_params(std::string& notif_name) const { @@ -1149,8 +1267,9 @@ class RGWPSDeleteNotifOp : public RGWDefaultResponseOp { } return 0; } + void execute_v2(optional_yield y); -public: + public: int verify_permission(optional_yield y) override; void pre_exec() override { @@ -1165,11 +1284,26 @@ public: }; void RGWPSDeleteNotifOp::execute(optional_yield y) { + if (do_all_zonegroups_support_notification_v2( + s->penv.site->get_period()->get_map().zonegroups)) { + return execute_v2(y); + } std::string notif_name; op_ret = get_params(notif_name); if (op_ret < 0) { return; } + if (!driver->is_meta_master()) { + bufferlist indata; + op_ret = rgw_forward_request_to_master( + this, *s->penv.site, s->user->get_id(), &indata, nullptr, s->info, y); + if (op_ret < 0) { + ldpp_dout(this, 1) << "DeleteBucketNotification " + "forward_request_to_master returned error ret= " + << op_ret << dendl; + return; + } + } std::unique_ptr bucket; op_ret = driver->load_bucket(this, rgw_bucket(s->bucket_tenant, s->bucket_name), @@ -1196,7 +1330,7 @@ void RGWPSDeleteNotifOp::execute(optional_yield y) { // delete a specific notification const auto unique_topic = find_unique_topic(bucket_topics, notif_name); if (unique_topic) { - const auto unique_topic_name = unique_topic->get().topic.name; + const auto unique_topic_name = unique_topic->topic.name; op_ret = remove_notification_by_topic(this, unique_topic_name, b, y, ps); return; } @@ -1216,6 +1350,38 @@ int RGWPSDeleteNotifOp::verify_permission(optional_yield y) { return 0; } +void RGWPSDeleteNotifOp::execute_v2(optional_yield y) { + std::string notif_name; + op_ret = get_params(notif_name); + if (op_ret < 0) { + return; + } + if (!driver->is_meta_master()) { + bufferlist indata; + op_ret = rgw_forward_request_to_master( + this, *s->penv.site, s->user->get_id(), &indata, nullptr, s->info, y); + if (op_ret < 0) { + ldpp_dout(this, 1) << "DeleteBucketNotification " + "forward_request_to_master returned error ret= " + << op_ret << dendl; + return; + } + } + + std::unique_ptr bucket; + op_ret = driver->load_bucket(this, rgw_bucket(s->bucket_tenant, s->bucket_name), + &bucket, y); + if (op_ret < 0) { + ldpp_dout(this, 1) << "failed to get bucket '" + << (s->bucket_tenant.empty() + ? s->bucket_name + : s->bucket_tenant + ":" + s->bucket_name) + << "' info, ret = " << op_ret << dendl; + return; + } + op_ret = remove_notification_v2(this, driver, bucket.get(), notif_name, y); +} + // command (S3 compliant): GET /bucket?notification[=] class RGWPSListNotifsOp : public RGWOp { rgw_pubsub_s3_notifications notifications; @@ -1278,21 +1444,26 @@ void RGWPSListNotifsOp::execute(optional_yield y) { return; } - const RGWPubSub ps(driver, s->owner.id.tenant); - const RGWPubSub::Bucket b(ps, bucket.get()); - // get all topics on a bucket rgw_pubsub_bucket_topics bucket_topics; - op_ret = b.get_topics(this, bucket_topics, y); + if (do_all_zonegroups_support_notification_v2( + s->penv.site->get_period()->get_map().zonegroups)) { + op_ret = get_bucket_notifications(this, bucket.get(), bucket_topics); + } else { + const RGWPubSub ps(driver, s->owner.id.tenant); + const RGWPubSub::Bucket b(ps, bucket.get()); + op_ret = b.get_topics(this, bucket_topics, y); + } if (op_ret < 0) { - ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << s->bucket_name << "', ret=" << op_ret << dendl; + ldpp_dout(this, 1) << "failed to get list of topics from bucket '" + << s->bucket_name << "', ret=" << op_ret << dendl; return; } if (!notif_name.empty()) { // get info of a specific notification const auto unique_topic = find_unique_topic(bucket_topics, notif_name); if (unique_topic) { - notifications.list.emplace_back(unique_topic->get()); + notifications.list.emplace_back(*unique_topic); return; } op_ret = -ENOENT;