]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/multisite-notification: Add support to replicate bucket notifications in multisit...
authorkchheda3 <kchheda3@bloomberg.net>
Tue, 5 Dec 2023 17:26:48 +0000 (12:26 -0500)
committerCasey Bodley <cbodley@redhat.com>
Tue, 5 Mar 2024 17:55:24 +0000 (12:55 -0500)
Signed-off-by: kchheda3 <kchheda3@bloomberg.net>
src/rgw/driver/rados/rgw_bucket.cc
src/rgw/driver/rados/rgw_notify.cc
src/rgw/driver/rados/rgw_notify.h
src/rgw/rgw_admin.cc
src/rgw/rgw_common.h
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/rgw/rgw_rest_pubsub.cc

index d0eacf81e8c121a32600461af843acc531dbdd33..9f5565531829474f8013358ad23e13d14f14bfae 100644 (file)
@@ -1337,13 +1337,24 @@ static int bucket_stats(rgw::sal::Driver* driver,
   }
 
   // bucket notifications
-  RGWPubSub ps(driver, tenant_name);
   rgw_pubsub_bucket_topics result;
-  const RGWPubSub::Bucket b(ps, bucket.get());
-  ret = b.get_topics(dpp, result, y);
-  if (ret < 0 && ret != -ENOENT) {
-    cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
-    return -ret;
+  if (driver->get_zone()->get_zonegroup().supports_feature(
+          rgw::zone_features::notification_v2)) {
+    ret = get_bucket_notifications(dpp, bucket.get(), result);
+    if (ret < 0) {
+      cerr << "ERROR: could not get topics: " << cpp_strerror(-ret)
+           << std::endl;
+      return -ret;
+    }
+  } else {
+    RGWPubSub ps(driver, tenant_name);
+    const RGWPubSub::Bucket b(ps, bucket.get());
+    ret = b.get_topics(dpp, result, y);
+    if (ret < 0 && ret != -ENOENT) {
+      cerr << "ERROR: could not get topics: " << cpp_strerror(-ret)
+           << std::endl;
+      return -ret;
+    }
   }
   result.dump(formatter);
 
index 4a2cf8271651a211a9cb55227e314b819a745f4a..e5d5dd602f598bcb7049a4d9318650ebca51c787 100644 (file)
@@ -16,6 +16,7 @@
 #include "rgw_pubsub_push.h"
 #include "rgw_zone_features.h"
 #include "rgw_perf_counters.h"
+#include "services/svc_zone.h"
 #include "common/dout.h"
 #include <chrono>
 
@@ -980,13 +981,35 @@ static inline bool notification_match(reservation_t& res,
                      reservation_t& res,
                      const RGWObjTags* req_tags)
 {
-  const RGWPubSub ps(res.store, res.user_tenant);
-  const RGWPubSub::Bucket ps_bucket(ps, res.bucket);
   rgw_pubsub_bucket_topics bucket_topics;
-  auto rc = ps_bucket.get_topics(res.dpp, bucket_topics, res.yield);
-  if (rc < 0) {
-    // failed to fetch bucket topics
-    return rc;
+  if (do_all_zonegroups_support_notification_v2(
+          res.store->svc()->zone->get_current_period().get_map().zonegroups)) {
+    auto ret = 0;
+    if (!res.s) {
+      //  for non S3-request caller (e.g., lifecycle, ObjectSync), bucket attrs
+      //  are not loaded, so force to reload the bucket, that reloads the attr.
+      // for non S3-request caller, res.s is nullptr
+      ret = res.bucket->load_bucket(dpp, res.yield);
+      if (ret < 0) {
+        ldpp_dout(dpp, 1)
+            << "ERROR: failed to reload bucket: '" << res.bucket->get_name()
+            << "' to get bucket notification attrs with error ret= " << ret
+            << dendl;
+        return ret;
+      }
+    }
+    ret = get_bucket_notifications(dpp, res.bucket, bucket_topics);
+    if (ret < 0) {
+      return ret;
+    }
+  } else {
+    const RGWPubSub ps(res.store, res.user_tenant);
+    const RGWPubSub::Bucket ps_bucket(ps, res.bucket);
+    auto rc = ps_bucket.get_topics(res.dpp, bucket_topics, res.yield);
+    if (rc < 0) {
+      // failed to fetch bucket topics
+      return rc;
+    }
   }
   for (const auto& bucket_topic : bucket_topics.topics) {
     const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second;
@@ -1027,7 +1050,31 @@ static inline bool notification_match(reservation_t& res,
         return ret;
       }
     }
-    res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id);
+    // load the topic,if there is change in topic config while it's stored in
+    // notification.
+    rgw_pubsub_topic result;
+    const RGWPubSub ps(
+        res.store, res.user_tenant,
+        &res.store->svc()->zone->get_current_period().get_map().zonegroups);
+    auto ret = ps.get_topic(res.dpp, topic_cfg.name, result, res.yield);
+    if (ret < 0) {
+      ldpp_dout(res.dpp, 1)
+          << "INFO: failed to load topic: " << topic_cfg.name
+          << ". error: " << ret
+          << " while storing the persistent notification event" << dendl;
+      if (ret == -ENOENT) {
+        // either the topic is deleted but the corresponding notification still
+        // exist or in v2 mode the notification could have synced first but
+        // topic is not synced yet.
+        return 0;
+      }
+      ldpp_dout(res.dpp, 1)
+          << "WARN: Using the stored topic from bucket notification struct."
+          << dendl;
+      res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id);
+    } else {
+      res.topics.emplace_back(topic_filter.s3_id, result, res_id);
+    }
   }
   return 0;
 }
index 20f0c17e5bb5b1e5e065dce0dcb54b2de11a2966..5117f9eecd8ff63eb8c232e8bcca366260d0c478 100644 (file)
@@ -63,7 +63,7 @@ struct reservation_t {
   size_t size;
   rgw::sal::Object* const object;
   rgw::sal::Object* const src_object; // may differ from object
-  rgw::sal::Bucket* const bucket;
+  rgw::sal::Bucket* bucket;
   const std::string* const object_name;
   boost::optional<const RGWObjTags&> tagset;
   meta_map_t x_meta_map; // metadata cached by value
index ced7b15120d603534996c9fa3a61f432cde625a2..afb161c90aeb5c90028a10afa687dff62a635125 100644 (file)
@@ -10614,20 +10614,28 @@ next:
       return EINVAL;
     }
 
-    RGWPubSub ps(driver, tenant);
-
     rgw_pubsub_bucket_topics result;
     int ret = init_bucket(tenant, bucket_name, bucket_id, &bucket);
     if (ret < 0) {
       cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
       return -ret;
     }
-
-    const RGWPubSub::Bucket b(ps, bucket.get());
-    ret = b.get_topics(dpp(), result, null_yield);
-    if (ret < 0 && ret != -ENOENT) {
-      cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
-      return -ret;
+    if (driver->get_zone()->get_zonegroup().supports_feature(
+            rgw::zone_features::notification_v2)) {
+      ret = get_bucket_notifications(dpp(), bucket.get(), result);
+      if (ret < 0) {
+        cerr << "ERROR: could not get topics: " << cpp_strerror(-ret)
+             << std::endl;
+        return -ret;
+      }
+    } else {
+      RGWPubSub ps(driver, tenant);
+      const RGWPubSub::Bucket b(ps, bucket.get());
+      ret = b.get_topics(dpp(), result, null_yield);
+      if (ret < 0 && ret != -ENOENT) {
+        cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
+        return -ret;
+      }
     }
     encode_json("result", result, formatter.get());
     formatter->flush(cout);
@@ -10699,24 +10707,30 @@ next:
       cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
       return -ret;
     }
-
-    RGWPubSub ps(driver, tenant);
-
     rgw_pubsub_bucket_topics bucket_topics;
-    const RGWPubSub::Bucket b(ps, bucket.get());
-    ret = b.get_topics(dpp(), bucket_topics, null_yield);
-    if (ret < 0 && ret != -ENOENT) {
-      cerr << "ERROR: could not get bucket notifications: " << cpp_strerror(-ret) << std::endl;
-      return -ret;
+    if (driver->get_zone()->get_zonegroup().supports_feature(
+            rgw::zone_features::notification_v2)) {
+      ret = get_bucket_notifications(dpp(), bucket.get(), bucket_topics);
+      if (ret < 0) {
+        cerr << "ERROR: could not get bucket notifications: "
+             << cpp_strerror(-ret) << std::endl;
+        return -ret;
+      }
+    } else {
+      RGWPubSub ps(driver, tenant);
+      const RGWPubSub::Bucket b(ps, bucket.get());
+      ret = b.get_topics(dpp(), bucket_topics, null_yield);
+      if (ret < 0 && ret != -ENOENT) {
+        cerr << "ERROR: could not get bucket notifications: " << cpp_strerror(-ret) << std::endl;
+        return -ret;
+      }
     }
-
-    rgw_pubsub_topic_filter bucket_topic;
-    ret = b.get_notification_by_id(dpp(), notification_id, bucket_topic, null_yield);
-    if (ret < 0) {
-      cerr << "ERROR: could not get notification: " << cpp_strerror(-ret) << std::endl;
-      return -ret;
+    auto iter = find_unique_topic(bucket_topics, notification_id);
+    if (!iter) {
+      cerr << "ERROR: notification was not found" << std::endl;
+      return -ENOENT;
     }
-    encode_json("notification", bucket_topic, formatter.get());
+    encode_json("notification", *iter, formatter.get());
     formatter->flush(cout);
   }
 
@@ -10725,7 +10739,10 @@ next:
       cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
       return EINVAL;
     }
-
+    if (!driver->is_meta_master()) {
+      cerr << "ERROR: Run 'topic rm' from master zone " << std::endl;
+      return -EINVAL;
+    }
     ret = rgw::notify::remove_persistent_topic(
         dpp(), static_cast<rgw::sal::RadosStore*>(driver)->getRados()->get_notif_pool_ctx(), topic_name, null_yield);
     if (ret < 0) {
@@ -10753,28 +10770,37 @@ next:
       cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl;
       return EINVAL;
     }
-
+    if (!driver->is_meta_master()) {
+      cerr << "ERROR: Run 'notification rm' from master zone " << std::endl;
+      return -EINVAL;
+    }
     int ret = init_bucket(tenant, bucket_name, bucket_id, &bucket);
     if (ret < 0) {
       cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
       return -ret;
     }
 
-    RGWPubSub ps(driver, tenant);
+    if (driver->get_zone()->get_zonegroup().supports_feature(
+            rgw::zone_features::notification_v2)) {
+      ret = remove_notification_v2(dpp(), driver, bucket.get(), notification_id,
+                                   null_yield);
+    } else {
+      RGWPubSub ps(driver, tenant);
 
-    rgw_pubsub_bucket_topics bucket_topics;
-    const RGWPubSub::Bucket b(ps, bucket.get());
-    ret = b.get_topics(dpp(), bucket_topics, null_yield);
-    if (ret < 0 && ret != -ENOENT) {
-      cerr << "ERROR: could not get bucket notifications: " << cpp_strerror(-ret) << std::endl;
-      return -ret;
-    }
+      rgw_pubsub_bucket_topics bucket_topics;
+      const RGWPubSub::Bucket b(ps, bucket.get());
+      ret = b.get_topics(dpp(), bucket_topics, null_yield);
+      if (ret < 0 && ret != -ENOENT) {
+        cerr << "ERROR: could not get bucket notifications: " << cpp_strerror(-ret) << std::endl;
+        return -ret;
+      }
 
-    rgw_pubsub_topic_filter bucket_topic;
-    if(notification_id.empty()) {
-      ret = b.remove_notifications(dpp(), null_yield);
-    } else {
-      ret = b.remove_notification_by_id(dpp(), notification_id, null_yield);
+      rgw_pubsub_topic_filter bucket_topic;
+      if(notification_id.empty()) {
+        ret = b.remove_notifications(dpp(), null_yield);
+      } else {
+        ret = b.remove_notification_by_id(dpp(), notification_id, null_yield);
+      }
     }
   }
 
index 5bc5c6de2b5ba119831356c5d6f961b6c466fc07..9b4dbfda9de4cca46f761fd43c758bd14331eb12 100644 (file)
@@ -171,6 +171,8 @@ using ceph::crypto::MD5;
 
 #define RGW_ATTR_TRACE RGW_ATTR_PREFIX "trace"
 
+#define RGW_ATTR_BUCKET_NOTIFICATION RGW_ATTR_PREFIX "bucket-notification"
+
 enum class RGWFormat : int8_t {
   BAD_FORMAT = -1,
   PLAIN = 0,
index c563b959863ae6716a38b8b2ac4d75954f0835a5..628f57901afed84b208418e71f171e9b03cdb26b 100644 (file)
@@ -586,7 +586,7 @@ int RGWPubSub::Bucket::write_topics(const DoutPrefixProvider *dpp, const rgw_pub
 int RGWPubSub::get_topic(const DoutPrefixProvider *dpp, const std::string& name, rgw_pubsub_topic& result, optional_yield y) const
 {
   if (use_notification_v2) {
-    const int ret = driver->read_topic(name, tenant, result, nullptr, y, dpp);
+    const int ret = driver->read_topic_v2(name, tenant, result, nullptr, y, dpp);
     if (ret < 0) {
       ldpp_dout(dpp, 1) << "failed to read topic info for name: " << name
                         << " tenant: " << tenant << ", ret=" << ret << dendl;
@@ -610,39 +610,26 @@ int RGWPubSub::get_topic(const DoutPrefixProvider *dpp, const std::string& name,
   return 0;
 }
 
-// from list of bucket topics, find the one that was auto-generated by a notification
-auto find_unique_topic(const rgw_pubsub_bucket_topics &bucket_topics, const std::string &notification_id) {
-  auto it = std::find_if(bucket_topics.topics.begin(), bucket_topics.topics.end(),
-                         [&](const auto& val) { return notification_id == val.second.s3_id; });
-  return it != bucket_topics.topics.end() ?
-         std::optional<std::reference_wrapper<const rgw_pubsub_topic_filter>>(it->second):
-         std::nullopt;
-}
-
-int RGWPubSub::Bucket::get_notification_by_id(const DoutPrefixProvider *dpp, const std::string& notification_id,
-                                              rgw_pubsub_topic_filter& result, optional_yield y) const {
-  rgw_pubsub_bucket_topics bucket_topics;
-  const int ret = read_topics(dpp, bucket_topics, nullptr, y);
-  if (ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to read bucket_topics info: ret=" << ret << dendl;
-    return ret;
+int get_bucket_notifications(const DoutPrefixProvider* dpp,
+                             rgw::sal::Bucket* bucket,
+                             rgw_pubsub_bucket_topics& bucket_topics) {
+  const rgw::sal::Attrs& attrs = bucket->get_attrs();
+  auto iter = attrs.find(RGW_ATTR_BUCKET_NOTIFICATION);
+  if (iter == attrs.end()) {
+    return 0;
   }
-
-  auto iter = find_unique_topic(bucket_topics, notification_id);
-  if (!iter) {
-    ldpp_dout(dpp, 1) << "ERROR: notification was not found" << dendl;
-    return -ENOENT;
+  try {
+    const auto& bl = iter->second;
+    auto biter = bl.cbegin();
+    bucket_topics.decode(biter);
+  } catch (buffer::error& err) {
+    ldpp_dout(dpp, 1) << "ERROR: failed to decode bucket topics for bucket: "
+                      << bucket->get_name() << dendl;
+    return -EIO;
   }
-
-  result = iter->get();
   return 0;
 }
 
-
-int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, 
-    const rgw::notify::EventTypeList& events, optional_yield y) const {
-  return create_notification(dpp, topic_name, events, std::nullopt, "", y);
-    }
 bool do_all_zonegroups_support_notification_v2(
     std::map<std::string, RGWZoneGroup> zonegroups) {
   for (const auto& [_, zonegroup] : zonegroups) {
@@ -653,7 +640,85 @@ bool do_all_zonegroups_support_notification_v2(
   return true;
 }
 
-int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, 
+std::string topic_to_unique(const std::string& topic,
+                            const std::string& notification) {
+  return notification + "_" + topic;
+}
+
+// from list of bucket topics, find the one that was auto-generated by a notification
+std::optional<rgw_pubsub_topic_filter> find_unique_topic(
+    const rgw_pubsub_bucket_topics& bucket_topics,
+    const std::string& notification_id) {
+  auto it = std::find_if(bucket_topics.topics.begin(), bucket_topics.topics.end(),
+                         [&](const auto& val) { return notification_id == val.second.s3_id; });
+  if (it != bucket_topics.topics.end())
+    return it->second;
+  return std::nullopt;
+}
+
+int delete_all_notifications(const DoutPrefixProvider* dpp,
+                             const rgw_pubsub_bucket_topics& bucket_topics,
+                             std::map<std::string, bufferlist>& attrs,
+                             rgw::sal::Bucket* bucket,
+                             rgw::sal::Driver* driver,
+                             optional_yield y) {
+  auto iter = attrs.find(RGW_ATTR_BUCKET_NOTIFICATION);
+  if (iter == attrs.end()) {
+    return 0;
+  }
+  // delete all notifications of on a bucket
+  attrs.erase(iter);
+  const auto ret = bucket->merge_and_store_attrs(dpp, attrs, y);
+  if (ret < 0) {
+    ldpp_dout(dpp, 1)
+        << "Failed to remove RGW_ATTR_BUCKET_NOTIFICATION attr on bucket="
+        << bucket->get_name() << " ret= " << ret << dendl;
+  }
+  return ret;
+}
+
+int remove_notification_v2(const DoutPrefixProvider* dpp,
+                           rgw::sal::Driver* driver,
+                           rgw::sal::Bucket* bucket,
+                           const std::string& notification_id,
+                           optional_yield y) {
+  rgw_pubsub_bucket_topics bucket_topics;
+  auto ret = get_bucket_notifications(dpp, bucket, bucket_topics);
+  if (ret < 0) {
+    return -ret;
+  }
+  // no notifications on the bucket.
+  if (bucket_topics.topics.empty()) {
+    return 0;
+  }
+  rgw::sal::Attrs& attrs = bucket->get_attrs();
+  if (notification_id.empty()) {
+    return delete_all_notifications(dpp, bucket_topics, attrs, bucket, driver,
+                                    y);
+  }
+  // delete a specific notification
+  const auto unique_topic = find_unique_topic(bucket_topics, notification_id);
+  if (!unique_topic) {
+    // notification to be removed is not found - considered success
+    ldpp_dout(dpp, 20) << "notification '" << notification_id
+                       << "' already removed" << dendl;
+    return 0;
+  }
+  const auto& topic_name = unique_topic->topic.name;
+  bucket_topics.topics.erase(topic_to_unique(topic_name, notification_id));
+  bufferlist bl;
+  bucket_topics.encode(bl);
+  attrs[RGW_ATTR_BUCKET_NOTIFICATION] = std::move(bl);
+  ret = bucket->merge_and_store_attrs(dpp, attrs, y);
+  if (ret < 0) {
+    ldpp_dout(dpp, 1)
+        << "Failed to store RGW_ATTR_BUCKET_NOTIFICATION on bucket="
+        << bucket->get_name() << " returned err= " << ret << dendl;
+  }
+  return ret;
+}
+
+int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name,
     const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) const {
   rgw_pubsub_topic topic_info;
 
@@ -720,7 +785,7 @@ int RGWPubSub::Bucket::remove_notification_inner(const DoutPrefixProvider *dpp,
       ldpp_dout(dpp, 1) << "ERROR: notification was not found" << dendl;
       return -ENOENT;
     }
-    topic_name = std::make_unique<std::string>(iter->get().topic.name);
+    topic_name = std::make_unique<std::string>(iter->topic.name);
   }
 
   if (bucket_topics.topics.erase(*topic_name) == 0) {
@@ -786,12 +851,11 @@ int RGWPubSub::create_topic(const DoutPrefixProvider* dpp,
                             const rgw_pubsub_topic& topic,
                             optional_yield y) const {
   RGWObjVersionTracker objv_tracker;
-  const auto ret = driver->write_topic(topic, &objv_tracker, y, dpp);
+  auto ret = driver->write_topic_v2(topic, &objv_tracker, y, dpp);
   if (ret < 0) {
     ldpp_dout(dpp, 1) << "ERROR: failed to write topic info: ret=" << ret
                       << dendl;
   }
-
   return ret;
 }
 
@@ -848,13 +912,13 @@ int RGWPubSub::remove_topic_v2(const DoutPrefixProvider* dpp,
   if (ret < 0 && ret != -ENOENT) {
     return ret;
   } else if (ret == -ENOENT) {
-    // its not an error if no topics exist, just a no-op
+    // it's not an error if no topics exist, just a no-op
     ldpp_dout(dpp, 10) << "WARNING: topic name:" << name
                        << " does not exist, deletion is a no-op: ret=" << ret
                        << dendl;
     return 0;
   }
-  ret = driver->remove_topic(name, tenant, &objv_tracker, y, dpp);
+  ret = driver->remove_topic_v2(name, tenant, &objv_tracker, y, dpp);
   if (ret < 0) {
     ldpp_dout(dpp, 1) << "ERROR: failed to remove topic info: ret=" << ret
                       << dendl;
index 4afc101c63ccc36f4e03b77772bd0ee640d2c5b2..46da8e045a9f57400b5a024d666f4c4aac4386d0 100644 (file)
@@ -672,3 +672,23 @@ namespace rgw::notify {
 
 bool do_all_zonegroups_support_notification_v2(
     std::map<std::string, RGWZoneGroup> zonegroups);
+
+std::string topic_to_unique(const std::string& topic,
+                            const std::string& notification);
+
+std::optional<rgw_pubsub_topic_filter> find_unique_topic(
+    const rgw_pubsub_bucket_topics& bucket_topics,
+    const std::string& notif_name);
+
+// Delete the bucket notification if |notification_id| is passed, else delete
+// all the bucket notifications for the given |bucket| and update the topic
+// bucket mapping.
+int remove_notification_v2(const DoutPrefixProvider* dpp,
+                           rgw::sal::Driver* driver,
+                           rgw::sal::Bucket* bucket,
+                           const std::string& notification_id,
+                           optional_yield y);
+
+int get_bucket_notifications(const DoutPrefixProvider* dpp,
+                             rgw::sal::Bucket* bucket,
+                             rgw_pubsub_bucket_topics& bucket_topics);
index 7396fcfddd4197cd9624fa0c2647781ead45cbbf..f2e5439208feaa87e06ff07c64526266e9e13160 100644 (file)
@@ -894,29 +894,6 @@ int RGWHandler_REST_PSTopic_AWS::authorize(const DoutPrefixProvider* dpp, option
   return 0;
 }
 
-namespace {
-// return a unique topic by prefexing with the notification name: <notification>_<topic>
-std::string topic_to_unique(const std::string& topic, const std::string& notification) {
-  return notification + "_" + topic;
-}
-
-// extract the topic from a unique topic of the form: <notification>_<topic>
-[[maybe_unused]] std::string unique_to_topic(const std::string& unique_topic, const std::string& notification) {
-  if (unique_topic.find(notification + "_") == std::string::npos) {
-    return "";
-  }
-  return unique_topic.substr(notification.length() + 1);
-}
-
-// from list of bucket topics, find the one that was auto-generated by a notification
-auto find_unique_topic(const rgw_pubsub_bucket_topics& bucket_topics, const std::string& notif_name) {
-    auto it = std::find_if(bucket_topics.topics.begin(), bucket_topics.topics.end(), [&](const auto& val) { return notif_name == val.second.s3_id; });
-    return it != bucket_topics.topics.end() ?
-        std::optional<std::reference_wrapper<const rgw_pubsub_topic_filter>>(it->second):
-        std::nullopt;
-}
-}
-
 int remove_notification_by_topic(const DoutPrefixProvider *dpp, const std::string& topic_name, const RGWPubSub::Bucket& b, optional_yield y, const RGWPubSub& ps) {
   int op_ret = b.remove_notification(dpp, topic_name, y);
   if (op_ret < 0) {
@@ -944,6 +921,7 @@ int delete_all_notifications(const DoutPrefixProvider *dpp, const rgw_pubsub_buc
 // a "notification" and a subscription will be auto-generated
 // actual configuration is XML encoded in the body of the message
 class RGWPSCreateNotifOp : public RGWDefaultResponseOp {
+  bufferlist data;
   int verify_params() override {
     bool exists;
     const auto no_value = s->info.args.get("notification", &exists);
@@ -965,7 +943,6 @@ class RGWPSCreateNotifOp : public RGWDefaultResponseOp {
   int get_params_from_body(rgw_pubsub_s3_notifications& configurations) {
     const auto max_size = s->cct->_conf->rgw_max_put_param_size;
     int r;
-    bufferlist data;
     std::tie(r, data) = read_all_input(s, max_size, false);
 
     if (r < 0) {
@@ -1010,9 +987,14 @@ public:
 
 
   void execute(optional_yield) override;
+  void execute_v2(optional_yield);
 };
 
 void RGWPSCreateNotifOp::execute(optional_yield y) {
+  if (do_all_zonegroups_support_notification_v2(
+          s->penv.site->get_period()->get_map().zonegroups)) {
+    return execute_v2(y);
+  }
   op_ret = verify_params();
   if (op_ret < 0) {
     return;
@@ -1023,6 +1005,16 @@ void RGWPSCreateNotifOp::execute(optional_yield y) {
   if (op_ret < 0) {
     return;
   }
+  if (!driver->is_meta_master()) {
+    op_ret = rgw_forward_request_to_master(
+        this, *s->penv.site, s->user->get_id(), &data, nullptr, s->info, y);
+    if (op_ret < 0) {
+      ldpp_dout(this, 1) << "CreateBucketNotification "
+                            "forward_request_to_master returned ret = "
+                         << op_ret << dendl;
+      return;
+    }
+  }
 
   std::unique_ptr<rgw::sal::Bucket> bucket;
   op_ret = driver->load_bucket(this, rgw_bucket(s->bucket_tenant, s->bucket_name),
@@ -1134,6 +1126,132 @@ int RGWPSCreateNotifOp::verify_permission(optional_yield y) {
   return 0;
 }
 
+void RGWPSCreateNotifOp::execute_v2(optional_yield y) {
+  op_ret = verify_params();
+  if (op_ret < 0) {
+    return;
+  }
+
+  rgw_pubsub_s3_notifications configurations;
+  op_ret = get_params_from_body(configurations);
+  if (op_ret < 0) {
+    return;
+  }
+  if (!driver->is_meta_master()) {
+    op_ret = rgw_forward_request_to_master(
+        this, *s->penv.site, s->user->get_id(), &data, nullptr, s->info, y);
+    if (op_ret < 0) {
+      ldpp_dout(this, 1) << "CreateBucketNotification "
+                            "forward_request_to_master returned ret = "
+                         << op_ret << dendl;
+      return;
+    }
+  }
+
+  std::unique_ptr<rgw::sal::Bucket> bucket;
+  op_ret = driver->load_bucket(this, rgw_bucket(s->bucket_tenant, s->bucket_name),
+                               &bucket, y);
+  if (op_ret < 0) {
+    ldpp_dout(this, 1) << "failed to get bucket '"
+                       << (s->bucket_tenant.empty()
+                               ? s->bucket_name
+                               : s->bucket_tenant + ":" + s->bucket_name)
+                       << "' info, ret = " << op_ret << dendl;
+    return;
+  }
+  if (configurations.list.empty()) {
+    op_ret = remove_notification_v2(this, driver, bucket.get(),
+                                    /*delete all notif=true*/ "", y);
+    return;
+  }
+  rgw_pubsub_bucket_topics bucket_topics;
+  op_ret = get_bucket_notifications(this, bucket.get(), bucket_topics);
+  if (op_ret < 0) {
+    ldpp_dout(this, 1)
+        << "failed to load existing bucket notification on bucket: "
+        << (s->bucket_tenant.empty() ? s->bucket_name
+                                     : s->bucket_tenant + ":" + s->bucket_name)
+        << "' , ret = " << op_ret << dendl;
+    return;
+  }
+  const RGWPubSub ps(driver, s->owner.id.tenant,
+                     &s->penv.site->get_period()->get_map().zonegroups);
+  std::unordered_map<std::string, rgw_pubsub_topic> topics;
+  const auto rgwbucket = rgw_bucket(s->bucket_tenant, s->bucket_name, "");
+  for (const auto& c : configurations.list) {
+    const auto& notif_name = c.id;
+    if (notif_name.empty()) {
+      ldpp_dout(this, 1) << "missing notification id" << dendl;
+      op_ret = -EINVAL;
+      return;
+    }
+    if (c.topic_arn.empty()) {
+      ldpp_dout(this, 1) << "missing topic ARN in notification: '" << notif_name
+                         << "'" << dendl;
+      op_ret = -EINVAL;
+      return;
+    }
+
+    const auto arn = rgw::ARN::parse(c.topic_arn);
+    if (!arn || arn->resource.empty()) {
+      ldpp_dout(this, 1) << "topic ARN has invalid format: '" << c.topic_arn
+                         << "' in notification: '" << notif_name << "'"
+                         << dendl;
+      op_ret = -EINVAL;
+      return;
+    }
+
+    if (std::find(c.events.begin(), c.events.end(),
+                  rgw::notify::UnknownEvent) != c.events.end()) {
+      ldpp_dout(this, 1) << "unknown event type in notification: '"
+                         << notif_name << "'" << dendl;
+      op_ret = -EINVAL;
+      return;
+    }
+    const auto& topic_name = arn->resource;
+    if (!topics.contains(topic_name)) {
+      // get topic information. destination information is stored in the topic
+      rgw_pubsub_topic topic_info;
+      op_ret = ps.get_topic(this, topic_name, topic_info, y);
+      if (op_ret < 0) {
+        ldpp_dout(this, 1) << "failed to get topic '" << topic_name
+                           << "', ret=" << op_ret << dendl;
+        return;
+      }
+      op_ret = verify_topic_owner_or_policy(
+          s, topic_info, driver->get_zone()->get_zonegroup().get_name(),
+          rgw::IAM::snsPublish);
+      if (op_ret != 0) {
+        ldpp_dout(this, 1) << "failed to create notification for topic '"
+                           << topic_name << "' topic owned by other user"
+                           << dendl;
+        return;
+      }
+      topics[topic_name] = std::move(topic_info);
+    }
+    auto& topic_filter =
+        bucket_topics.topics[topic_to_unique(topic_name, notif_name)];
+    topic_filter.topic = topics[topic_name];
+    topic_filter.events = c.events;
+    topic_filter.s3_id = notif_name;
+    topic_filter.s3_filter = c.filter;
+  }
+  // finally store all the bucket notifications as attr.
+  bufferlist bl;
+  bucket_topics.encode(bl);
+  rgw::sal::Attrs& attrs = bucket->get_attrs();
+  attrs[RGW_ATTR_BUCKET_NOTIFICATION] = std::move(bl);
+  op_ret = bucket->merge_and_store_attrs(this, attrs, y);
+  if (op_ret < 0) {
+    ldpp_dout(this, 1)
+        << "Failed to store RGW_ATTR_BUCKET_NOTIFICATION on bucket="
+        << bucket->get_name() << " returned err= " << op_ret << dendl;
+    return;
+  }
+  ldpp_dout(this, 20) << "successfully created bucket notification for bucket: "
+                      << bucket->get_name() << dendl;
+}
+
 // command (extension to S3): DELETE /bucket?notification[=<notification-id>]
 class RGWPSDeleteNotifOp : public RGWDefaultResponseOp {
   int get_params(std::string& notif_name) const {
@@ -1149,8 +1267,9 @@ class RGWPSDeleteNotifOp : public RGWDefaultResponseOp {
     }
     return 0;
   }
+  void execute_v2(optional_yield y);
 
-public:
+ public:
   int verify_permission(optional_yield y) override;
 
   void pre_exec() override {
@@ -1165,11 +1284,26 @@ public:
 };
 
 void RGWPSDeleteNotifOp::execute(optional_yield y) {
+  if (do_all_zonegroups_support_notification_v2(
+          s->penv.site->get_period()->get_map().zonegroups)) {
+    return execute_v2(y);
+  }
   std::string notif_name;
   op_ret = get_params(notif_name);
   if (op_ret < 0) {
     return;
   }
+  if (!driver->is_meta_master()) {
+    bufferlist indata;
+    op_ret = rgw_forward_request_to_master(
+        this, *s->penv.site, s->user->get_id(), &indata, nullptr, s->info, y);
+    if (op_ret < 0) {
+      ldpp_dout(this, 1) << "DeleteBucketNotification "
+                            "forward_request_to_master returned error ret= "
+                         << op_ret << dendl;
+      return;
+    }
+  }
 
   std::unique_ptr<rgw::sal::Bucket> bucket;
   op_ret = driver->load_bucket(this, rgw_bucket(s->bucket_tenant, s->bucket_name),
@@ -1196,7 +1330,7 @@ void RGWPSDeleteNotifOp::execute(optional_yield y) {
     // delete a specific notification
     const auto unique_topic = find_unique_topic(bucket_topics, notif_name);
     if (unique_topic) {
-      const auto unique_topic_name = unique_topic->get().topic.name;
+      const auto unique_topic_name = unique_topic->topic.name;
       op_ret = remove_notification_by_topic(this, unique_topic_name, b, y, ps);
       return;
     }
@@ -1216,6 +1350,38 @@ int RGWPSDeleteNotifOp::verify_permission(optional_yield y) {
   return 0;
 }
 
+void RGWPSDeleteNotifOp::execute_v2(optional_yield y) {
+  std::string notif_name;
+  op_ret = get_params(notif_name);
+  if (op_ret < 0) {
+    return;
+  }
+  if (!driver->is_meta_master()) {
+    bufferlist indata;
+    op_ret = rgw_forward_request_to_master(
+        this, *s->penv.site, s->user->get_id(), &indata, nullptr, s->info, y);
+    if (op_ret < 0) {
+      ldpp_dout(this, 1) << "DeleteBucketNotification "
+                            "forward_request_to_master returned error ret= "
+                         << op_ret << dendl;
+      return;
+    }
+  }
+
+  std::unique_ptr<rgw::sal::Bucket> bucket;
+  op_ret = driver->load_bucket(this, rgw_bucket(s->bucket_tenant, s->bucket_name),
+                               &bucket, y);
+  if (op_ret < 0) {
+    ldpp_dout(this, 1) << "failed to get bucket '"
+                       << (s->bucket_tenant.empty()
+                               ? s->bucket_name
+                               : s->bucket_tenant + ":" + s->bucket_name)
+                       << "' info, ret = " << op_ret << dendl;
+    return;
+  }
+  op_ret = remove_notification_v2(this, driver, bucket.get(), notif_name, y);
+}
+
 // command (S3 compliant): GET /bucket?notification[=<notification-id>]
 class RGWPSListNotifsOp : public RGWOp {
   rgw_pubsub_s3_notifications notifications;
@@ -1278,21 +1444,26 @@ void RGWPSListNotifsOp::execute(optional_yield y) {
     return;
   }
 
-  const RGWPubSub ps(driver, s->owner.id.tenant);
-  const RGWPubSub::Bucket b(ps, bucket.get());
-  
   // get all topics on a bucket
   rgw_pubsub_bucket_topics bucket_topics;
-  op_ret = b.get_topics(this, bucket_topics, y);
+  if (do_all_zonegroups_support_notification_v2(
+          s->penv.site->get_period()->get_map().zonegroups)) {
+    op_ret = get_bucket_notifications(this, bucket.get(), bucket_topics);
+  } else {
+    const RGWPubSub ps(driver, s->owner.id.tenant);
+    const RGWPubSub::Bucket b(ps, bucket.get());
+    op_ret = b.get_topics(this, bucket_topics, y);
+  }
   if (op_ret < 0) {
-    ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << s->bucket_name << "', ret=" << op_ret << dendl;
+    ldpp_dout(this, 1) << "failed to get list of topics from bucket '"
+                       << s->bucket_name << "', ret=" << op_ret << dendl;
     return;
   }
   if (!notif_name.empty()) {
     // get info of a specific notification
     const auto unique_topic = find_unique_topic(bucket_topics, notif_name);
     if (unique_topic) {
-      notifications.list.emplace_back(unique_topic->get());
+      notifications.list.emplace_back(*unique_topic);
       return;
     }
     op_ret = -ENOENT;