]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/multisite-notification: Add support to replicate topic creation in multisite...
authorkchheda3 <kchheda3@bloomberg.net>
Mon, 27 Nov 2023 20:51:02 +0000 (15:51 -0500)
committerCasey Bodley <cbodley@redhat.com>
Wed, 10 Apr 2024 13:18:06 +0000 (09:18 -0400)
Signed-off-by: kchheda3 <kchheda3@bloomberg.net>
(cherry picked from commit c4a147d079cb9f268340e5f79ec5350a94296658)

24 files changed:
src/rgw/CMakeLists.txt
src/rgw/driver/daos/rgw_sal_daos.h
src/rgw/driver/motr/rgw_sal_motr.h
src/rgw/driver/rados/rgw_notify.cc
src/rgw/driver/rados/rgw_rest_pubsub.h
src/rgw/driver/rados/rgw_sal_rados.cc
src/rgw/driver/rados/rgw_sal_rados.h
src/rgw/driver/rados/rgw_service.cc
src/rgw/driver/rados/rgw_service.h
src/rgw/driver/rados/rgw_sync.cc
src/rgw/driver/rados/rgw_zone.h
src/rgw/rgw_admin.cc
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/rgw/rgw_rest_pubsub.cc
src/rgw/rgw_rest_s3.cc
src/rgw/rgw_sal.h
src/rgw/rgw_sal_dbstore.h
src/rgw/rgw_sal_filter.h
src/rgw/rgw_sal_store.h
src/rgw/rgw_zone.cc
src/rgw/rgw_zone_features.h
src/rgw/services/svc_topic_rados.cc [new file with mode: 0644]
src/rgw/services/svc_topic_rados.h [new file with mode: 0644]

index 3c2f1423808b2639cc3fdfd4fda8e2a5ab7b025a..2987b70b3826f4fb372a97d40e5cd2890673f324 100644 (file)
@@ -53,6 +53,7 @@ set(librgw_common_srcs
   services/svc_sys_obj_cache.cc
   services/svc_sys_obj_core.cc
   services/svc_tier_rados.cc
+  services/svc_topic_rados.cc
   services/svc_user.cc
   services/svc_user_rados.cc
   services/svc_zone.cc
index c5cfefc222d101a029a0ab608ccfba189737385a..479f5b84a5e30e250c6c8cb6bb16bfc9e9fda53a 100644 (file)
@@ -430,6 +430,9 @@ class DaosZoneGroup : public StoreZoneGroup {
     return std::make_unique<DaosZoneGroup>(store, group);
   }
   const RGWZoneGroup& get_group() { return group; }
+  virtual bool supports_feature(std::string_view feature) const override {
+    return group.supports(feature);
+  }
 };
 
 class DaosZone : public StoreZone {
index 3cc3b37fa9ad0e4073a172540dc30727efc93f3f..9ff14a58307dcefad2b5e065c2f213a30f902129 100644 (file)
@@ -474,6 +474,9 @@ public:
   virtual std::unique_ptr<ZoneGroup> clone() override {
     return std::make_unique<MotrZoneGroup>(store, group);
   }
+  virtual bool supports_feature(std::string_view feature) const override {
+    return group.supports(feature);
+  }
   friend class MotrZone;
 };
 
index 9baed182ed9466beae6a8f52e7821afd7d6eedff..4a2cf8271651a211a9cb55227e314b819a745f4a 100644 (file)
@@ -14,6 +14,7 @@
 #include "rgw_sal_rados.h"
 #include "rgw_pubsub.h"
 #include "rgw_pubsub_push.h"
+#include "rgw_zone_features.h"
 #include "rgw_perf_counters.h"
 #include "common/dout.h"
 #include <chrono>
index 27bde7a95d5af781c8be28fb9b7f8162fc408c5b..8b37992b91887e028d97cb3333c7cfd3335674cb 100644 (file)
@@ -34,5 +34,6 @@ public:
   int postauth_init(optional_yield) override { return 0; }
   int authorize(const DoutPrefixProvider* dpp, optional_yield y) override;
   static bool action_exists(const req_state* s);
+  static bool action_exists(const req_info& info);
 };
 
index e04e25842e2b4c136a37793ce221d15b5e00d7a9..608800c7e573edb077cb84d904c9c00e05f46601 100644 (file)
@@ -62,6 +62,7 @@
 #include "services/svc_role_rados.h"
 #include "services/svc_user.h"
 #include "services/svc_sys_obj_cache.h"
+#include "services/svc_topic_rados.h"
 #include "cls/rgw/cls_rgw_client.h"
 
 #include "rgw_pubsub.h"
@@ -1111,6 +1112,66 @@ int RadosStore::remove_topics(const std::string& tenant, RGWObjVersionTracker* o
       objv_tracker, y);
 }
 
+int RadosStore::read_topic_v2(const std::string& topic_name,
+                              const std::string& tenant,
+                              rgw_pubsub_topic& topic,
+                              RGWObjVersionTracker* objv_tracker,
+                              optional_yield y,
+                              const DoutPrefixProvider* dpp) {
+  bufferlist bl;
+  auto mtime = ceph::real_clock::zero();
+  RGWSI_MBSObj_GetParams params(&bl, nullptr, &mtime);
+  std::unique_ptr<RGWSI_MetaBackend::Context> ctx(
+      svc()->topic->svc.meta_be->alloc_ctx());
+  ctx->init(svc()->topic->get_be_handler());
+  const int ret = svc()->topic->svc.meta_be->get(
+      ctx.get(), get_topic_key(topic_name, tenant), params, objv_tracker, y,
+      dpp);
+  if (ret < 0) {
+    return ret;
+  }
+
+  auto iter = bl.cbegin();
+  try {
+    decode(topic, iter);
+  } catch (buffer::error& err) {
+    ldpp_dout(dpp, 20) << " failed to decode topic: " << topic_name
+                       << ". error: " << err.what() << dendl;
+    return -EIO;
+  }
+  return 0;
+}
+
+int RadosStore::write_topic_v2(const rgw_pubsub_topic& topic,
+                               RGWObjVersionTracker* objv_tracker,
+                               optional_yield y,
+                               const DoutPrefixProvider* dpp) {
+  bufferlist bl;
+  encode(topic, bl);
+  RGWSI_MBSObj_PutParams params(bl, nullptr, ceph::real_clock::zero(),
+                                /*exclusive*/ false);
+  std::unique_ptr<RGWSI_MetaBackend::Context> ctx(
+      svc()->topic->svc.meta_be->alloc_ctx());
+  ctx->init(svc()->topic->get_be_handler());
+  return svc()->topic->svc.meta_be->put(
+      ctx.get(), get_topic_key(topic.name, topic.user.tenant), params,
+      objv_tracker, y, dpp);
+}
+
+int RadosStore::remove_topic_v2(const std::string& topic_name,
+                                const std::string& tenant,
+                                RGWObjVersionTracker* objv_tracker,
+                                optional_yield y,
+                                const DoutPrefixProvider* dpp) {
+  RGWSI_MBSObj_RemoveParams params;
+  std::unique_ptr<RGWSI_MetaBackend::Context> ctx(
+      svc()->topic->svc.meta_be->alloc_ctx());
+  ctx->init(svc()->topic->get_be_handler());
+  return svc()->topic->svc.meta_be->remove(ctx.get(),
+                                           get_topic_key(topic_name, tenant),
+                                           params, objv_tracker, y, dpp);
+}
+
 int RadosStore::delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, optional_yield y)
 {
   return rados->delete_raw_obj(dpp, obj, y);
index 39a554f15c95bca9de7dc5d751e523e69f6fa01d..33db603d785acbff5822155f11cb9a82bd11d5bf 100644 (file)
@@ -85,6 +85,9 @@ public:
   virtual std::unique_ptr<ZoneGroup> clone() override {
     return std::make_unique<RadosZoneGroup>(store, group);
   }
+  virtual bool supports_feature(std::string_view feature) const override {
+    return group.supports(feature);
+  }
   const RGWZoneGroup& get_group() const { return group; }
 };
 
@@ -166,6 +169,21 @@ class RadosStore : public StoreDriver {
        optional_yield y, const DoutPrefixProvider *dpp) override;
     int remove_topics(const std::string& tenant, RGWObjVersionTracker* objv_tracker,
         optional_yield y, const DoutPrefixProvider *dpp) override;
+    int read_topic_v2(const std::string& topic_name,
+                      const std::string& tenant,
+                      rgw_pubsub_topic& topic,
+                      RGWObjVersionTracker* objv_tracker,
+                      optional_yield y,
+                      const DoutPrefixProvider* dpp) override;
+    int write_topic_v2(const rgw_pubsub_topic& topic,
+                       RGWObjVersionTracker* objv_tracker,
+                       optional_yield y,
+                       const DoutPrefixProvider* dpp) override;
+    int remove_topic_v2(const std::string& topic_name,
+                        const std::string& tenant,
+                        RGWObjVersionTracker* objv_tracker,
+                        optional_yield y,
+                        const DoutPrefixProvider* dpp) override;
     virtual RGWLC* get_rgwlc(void) override { return rados->get_lc(); }
     virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return rados->get_cr_registry(); }
 
index 4be0738bae2bf0ab0e24eaabd7e24b606463b9a9..1f05495fb3d4617e06847ee97da5c768db9e392e 100644 (file)
@@ -26,6 +26,7 @@
 #include "services/svc_sys_obj_core.h"
 #include "services/svc_user_rados.h"
 #include "services/svc_role_rados.h"
+#include "services/svc_topic_rados.h"
 
 #include "common/errno.h"
 
@@ -37,6 +38,7 @@
 #include "rgw_sal_rados.h"
 #include "rgw_user.h"
 #include "rgw_role.h"
+#include "rgw_pubsub.h"
 
 #define dout_subsys ceph_subsys_rgw
 
@@ -80,6 +82,7 @@ int RGWServices_Def::init(CephContext *cct,
   role_rados = std::make_unique<RGWSI_Role_RADOS>(cct);
   async_processor = std::make_unique<RGWAsyncRadosProcessor>(
     cct, cct->_conf->rgw_num_async_rados_threads);
+  topic_rados = std::make_unique<RGWSI_Topic_RADOS>(cct);
 
   if (have_cache) {
     sysobj_cache = std::make_unique<RGWSI_SysObj_Cache>(dpp, cct);
@@ -124,7 +127,7 @@ int RGWServices_Def::init(CephContext *cct,
   user_rados->init(driver->getRados()->get_rados_handle(), zone.get(), sysobj.get(), sysobj_cache.get(),
                    meta.get(), meta_be_sobj.get(), sync_modules.get());
   role_rados->init(zone.get(), meta.get(), meta_be_sobj.get(), sysobj.get());
-
+  topic_rados->init(zone.get(), meta.get(), meta_be_sobj.get(), sysobj.get());
   can_shutdown = true;
 
   int r = finisher->start(y, dpp);
@@ -255,7 +258,12 @@ int RGWServices_Def::init(CephContext *cct,
       ldout(cct, 0) << "ERROR: failed to start role_rados service (" << cpp_strerror(-r) << dendl;
       return r;
     }
-
+    r = topic_rados->start(y, dpp);
+    if (r < 0) {
+      ldout(cct, 0) << "ERROR: failed to start topic_rados service ("
+                    << cpp_strerror(-r) << dendl;
+      return r;
+    }
   }
 
   /* cache or core services will be started by sysobj */
@@ -273,6 +281,7 @@ void RGWServices_Def::shutdown()
     return;
   }
 
+  topic_rados->shutdown();
   role_rados->shutdown();
   datalog_rados.reset();
   user_rados->shutdown();
@@ -342,6 +351,7 @@ int RGWServices::do_init(CephContext *_cct, rgw::sal::RadosStore* driver, bool h
   user = _svc.user_rados.get();
   role = _svc.role_rados.get();
   async_processor = _svc.async_processor.get();
+  topic = _svc.topic_rados.get();
 
   return 0;
 }
@@ -390,6 +400,7 @@ int RGWCtlDef::init(RGWServices& svc, rgw::sal::Driver* driver, const DoutPrefix
 
   meta.otp.reset(RGWOTPMetaHandlerAllocator::alloc());
   meta.role = std::make_unique<rgw::sal::RGWRoleMetadataHandler>(driver, svc.role);
+  meta.topic = std::make_unique<RGWTopicMetadataHandler>(driver, svc.topic);
 
   user.reset(new RGWUserCtl(svc.zone, svc.user, (RGWUserMetadataHandler *)meta.user.get()));
   bucket.reset(new RGWBucketCtl(svc.zone,
@@ -436,6 +447,7 @@ int RGWCtl::init(RGWServices *_svc, rgw::sal::Driver* driver, const DoutPrefixPr
   meta.bucket_instance = _ctl.meta.bucket_instance.get();
   meta.otp = _ctl.meta.otp.get();
   meta.role = _ctl.meta.role.get();
+  meta.topic = _ctl.meta.topic.get();
 
   user = _ctl.user.get();
   bucket = _ctl.bucket.get();
@@ -470,6 +482,12 @@ int RGWCtl::init(RGWServices *_svc, rgw::sal::Driver* driver, const DoutPrefixPr
     ldout(cct, 0) << "ERROR: failed to start init otp ctl (" << cpp_strerror(-r) << dendl;
     return r;
   }
+  r = meta.topic->attach(meta.mgr);
+  if (r < 0) {
+    ldout(cct, 0) << "ERROR: failed to start init topic ctl ("
+                  << cpp_strerror(-r) << dendl;
+    return r;
+  }
   return 0;
 }
 
index 9996b42e2514563aaf7afebdec1375f7d0659062..08873e6058e5ada2f41a05e01853f847079cbc03 100644 (file)
@@ -78,6 +78,7 @@ class RGWSI_User_RADOS;
 class RGWDataChangesLog;
 class RGWSI_Role_RADOS;
 class RGWAsyncRadosProcessor;
+class RGWSI_Topic_RADOS;
 
 struct RGWServices_Def
 {
@@ -109,6 +110,7 @@ struct RGWServices_Def
   std::unique_ptr<RGWSI_Role_RADOS> role_rados;
   std::unique_ptr<RGWAsyncRadosProcessor> async_processor;
 
+  std::unique_ptr<RGWSI_Topic_RADOS> topic_rados;
   RGWServices_Def();
   ~RGWServices_Def();
 
@@ -153,6 +155,7 @@ struct RGWServices
   RGWSI_User *user{nullptr};
   RGWSI_Role_RADOS *role{nullptr};
   RGWAsyncRadosProcessor* async_processor;
+  RGWSI_Topic_RADOS* topic{nullptr};
 
   int do_init(CephContext *cct, rgw::sal::RadosStore* store, bool have_cache,
              bool raw_storage, bool run_sync, optional_yield y,
@@ -187,6 +190,7 @@ struct RGWCtlDef {
     std::unique_ptr<RGWMetadataHandler> user;
     std::unique_ptr<RGWMetadataHandler> otp;
     std::unique_ptr<RGWMetadataHandler> role;
+    std::unique_ptr<RGWMetadataHandler> topic;
 
     _meta();
     ~_meta();
@@ -216,6 +220,7 @@ struct RGWCtl {
     RGWMetadataHandler *user{nullptr};
     RGWMetadataHandler *otp{nullptr};
     RGWMetadataHandler *role{nullptr};
+    RGWMetadataHandler* topic{nullptr};
   } meta;
 
   RGWUserCtl *user{nullptr};
index c5ea9f99ec5fa9517f19918fec6e34aaede5ecc8..a9ea2ecf5491d5dd6a9760442626c2c57ee14a72 100644 (file)
@@ -886,6 +886,7 @@ public:
     append_section_from_set(all_sections, "bucket.instance");
     append_section_from_set(all_sections, "bucket");
     append_section_from_set(all_sections, "roles");
+    append_section_from_set(all_sections, "topic");
 
     std::move(all_sections.begin(), all_sections.end(),
               std::back_inserter(sections));
index f0dccdbc4e94e86ed45523ea9cbfe55471e8d51f..2eb3e725253fa7f342db1a04c8507815312cfdc3 100644 (file)
@@ -114,6 +114,7 @@ struct RGWZoneParams : RGWSystemMetaObj {
   rgw_pool otp_pool;
   rgw_pool oidc_pool;
   rgw_pool notif_pool;
+  rgw_pool topics_pool;
 
   RGWAccessKey system_key;
 
@@ -150,7 +151,7 @@ struct RGWZoneParams : RGWSystemMetaObj {
   const std::string& get_compression_type(const rgw_placement_rule& placement_rule) const;
   
   void encode(bufferlist& bl) const override {
-    ENCODE_START(14, 1, bl);
+    ENCODE_START(15, 1, bl);
     encode(domain_root, bl);
     encode(control_pool, bl);
     encode(gc_pool, bl);
@@ -176,11 +177,12 @@ struct RGWZoneParams : RGWSystemMetaObj {
     encode(tier_config, bl);
     encode(oidc_pool, bl);
     encode(notif_pool, bl);
+    encode(topics_pool, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) override {
-    DECODE_START(14, bl);
+    DECODE_START(15, bl);
     decode(domain_root, bl);
     decode(control_pool, bl);
     decode(gc_pool, bl);
@@ -249,6 +251,11 @@ struct RGWZoneParams : RGWSystemMetaObj {
     } else {
       notif_pool = log_pool.name + ":notif";
     }
+    if (struct_v >= 15) {
+      decode(topics_pool, bl);
+    } else {
+      topics_pool = name + ".rgw.meta:topics";
+    }
     DECODE_FINISH(bl);
   }
   void dump(Formatter *f) const;
index 4d4e76c74e88ef40c38838ca12c8bcfbc7472b83..e25f188a732823917f8be8991c620ac2e349401c 100644 (file)
@@ -10639,10 +10639,15 @@ next:
   }
 
   if (opt_cmd == OPT::PUBSUB_TOPIC_LIST) {
-    RGWPubSub ps(driver, tenant);
-
+    auto site = std::make_unique<rgw::SiteConfig>();
+    ret = site->load(dpp(), null_yield, cfgstore.get());
+    if (ret < 0) {
+      std::cerr << "Unable to initialize site config." << std::endl;
+      exit(1);
+    }
+    RGWPubSub ps(driver, tenant, &site->get_period()->get_map().zonegroups);
     rgw_pubsub_topics result;
-    int ret = ps.get_topics(dpp(), result, null_yield);
+    ret = ps.get_topics(dpp(), result, null_yield);
     if (ret < 0 && ret != -ENOENT) {
       cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
       return -ret;
@@ -10666,8 +10671,13 @@ next:
       cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
       return EINVAL;
     }
-
-    RGWPubSub ps(driver, tenant);
+    auto site = std::make_unique<rgw::SiteConfig>();
+    ret = site->load(dpp(), null_yield, cfgstore.get());
+    if (ret < 0) {
+      std::cerr << "Unable to initialize site config." << std::endl;
+      exit(1);
+    }
+    RGWPubSub ps(driver, tenant, &site->get_period()->get_map().zonegroups);
 
     rgw_pubsub_topic topic;
     ret = ps.get_topic(dpp(), topic_name, topic, null_yield);
@@ -10728,7 +10738,13 @@ next:
       return -ret;
     }
 
-    RGWPubSub ps(driver, tenant);
+    auto site = std::make_unique<rgw::SiteConfig>();
+    ret = site->load(dpp(), null_yield, cfgstore.get());
+    if (ret < 0) {
+      std::cerr << "Unable to initialize site config." << std::endl;
+      exit(1);
+    }
+    RGWPubSub ps(driver, tenant, &site->get_period()->get_map().zonegroups);
 
     ret = ps.remove_topic(dpp(), topic_name, null_yield);
     if (ret < 0) {
index 7031c2363f0b8eea6a71b3804c3b15bd070fd000..c563b959863ae6716a38b8b2ac4d75954f0835a5 100644 (file)
@@ -9,6 +9,7 @@
 #include "rgw_xml.h"
 #include "rgw_arn.h"
 #include "rgw_pubsub_push.h"
+#include "common/errno.h"
 #include <regex>
 #include <algorithm>
 
@@ -371,6 +372,15 @@ void rgw_pubsub_topic::dump_xml_as_attributes(Formatter *f) const
   f->close_section(); // Attributes
 }
 
+void rgw_pubsub_topic::decode_json(JSONObj* f) {
+  JSONDecoder::decode_json("user", user, f);
+  JSONDecoder::decode_json("name", name, f);
+  JSONDecoder::decode_json("dest", dest, f);
+  JSONDecoder::decode_json("arn", arn, f);
+  JSONDecoder::decode_json("opaqueData", opaque_data, f);
+  JSONDecoder::decode_json("policy", policy_text, f);
+}
+
 void encode_json(const char *name, const rgw::notify::EventTypeList& l, Formatter *f)
 {
   f->open_array_section(name);
@@ -462,13 +472,74 @@ std::string rgw_pubsub_dest::to_json_str() const
   return ss.str();
 }
 
+void rgw_pubsub_dest::decode_json(JSONObj* f) {
+  using rgw::notify::DEFAULT_CONFIG;
+  using rgw::notify::DEFAULT_GLOBAL_VALUE;
+  JSONDecoder::decode_json("push_endpoint", push_endpoint, f);
+  JSONDecoder::decode_json("push_endpoint_args", push_endpoint_args, f);
+  JSONDecoder::decode_json("push_endpoint_topic", arn_topic, f);
+  JSONDecoder::decode_json("stored_secret", stored_secret, f);
+  JSONDecoder::decode_json("persistent", persistent, f);
+  std::string ttl;
+  JSONDecoder::decode_json("time_to_live", ttl, f);
+  time_to_live = ttl == DEFAULT_CONFIG ? DEFAULT_GLOBAL_VALUE : std::stoul(ttl);
+
+  std::string max_retry;
+  JSONDecoder::decode_json("max_retries", max_retry, f);
+  max_retries = max_retry == DEFAULT_CONFIG ? DEFAULT_GLOBAL_VALUE
+                                            : std::stoul(max_retry);
+
+  std::string sleep_dur;
+  JSONDecoder::decode_json("retry_sleep_duration", sleep_dur, f);
+  retry_sleep_duration = sleep_dur == DEFAULT_CONFIG ? DEFAULT_GLOBAL_VALUE
+                                                     : std::stoul(sleep_dur);
+}
+
 RGWPubSub::RGWPubSub(rgw::sal::Driver* _driver, const std::string& _tenant)
   : driver(_driver), tenant(_tenant)
 {}
 
-int RGWPubSub::read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result, 
+RGWPubSub::RGWPubSub(rgw::sal::Driver* _driver,
+                     const std::string& _tenant,
+                     const std::map<std::string, RGWZoneGroup>* _zonegroups)
+    : driver(_driver), tenant(_tenant), zonegroups(_zonegroups) {
+  use_notification_v2 = do_all_zonegroups_support_notification_v2(*zonegroups);
+}
+
+int RGWPubSub::read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result,
     RGWObjVersionTracker *objv_tracker, optional_yield y) const
 {
+  if (use_notification_v2) {
+    void* handle = NULL;
+    auto ret =
+        driver->meta_list_keys_init(dpp, "topic", std::string(), &handle);
+    if (ret < 0) {
+      return ret;
+    }
+    bool truncated;
+    int max = 1000;
+    do {
+      std::list<std::string> topics;
+      ret = driver->meta_list_keys_next(dpp, handle, max, topics, &truncated);
+      if (ret < 0) {
+            ldpp_dout(dpp, 1)
+                << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl;
+            break;
+      }
+      for (auto& topic_name : topics) {
+            rgw_pubsub_topic topic;
+            int ret = get_topic(dpp, topic_name, topic, y);
+            if (ret < 0) {
+              ldpp_dout(dpp, 1) << "ERROR: failed to read topic '" << topic_name
+                                << "' info: ret=" << ret << dendl;
+              continue;
+            }
+            result.topics[topic_name] = std::move(topic);
+      }
+    } while (truncated);
+    driver->meta_list_keys_complete(handle);
+    return ret;
+  }
   const int ret = driver->read_topics(tenant, result, objv_tracker, y, dpp);
   if (ret < 0) {
     ldpp_dout(dpp, 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
@@ -514,6 +585,14 @@ int RGWPubSub::Bucket::write_topics(const DoutPrefixProvider *dpp, const rgw_pub
 
 int RGWPubSub::get_topic(const DoutPrefixProvider *dpp, const std::string& name, rgw_pubsub_topic& result, optional_yield y) const
 {
+  if (use_notification_v2) {
+    const int ret = driver->read_topic(name, tenant, result, nullptr, y, dpp);
+    if (ret < 0) {
+      ldpp_dout(dpp, 1) << "failed to read topic info for name: " << name
+                        << " tenant: " << tenant << ", ret=" << ret << dendl;
+    }
+    return ret;
+  }
   rgw_pubsub_topics topics;
   const int ret = read_topics(dpp, topics, nullptr, y);
   if (ret < 0) {
@@ -563,6 +642,15 @@ int RGWPubSub::Bucket::get_notification_by_id(const DoutPrefixProvider *dpp, con
 int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, 
     const rgw::notify::EventTypeList& events, optional_yield y) const {
   return create_notification(dpp, topic_name, events, std::nullopt, "", y);
+    }
+bool do_all_zonegroups_support_notification_v2(
+    std::map<std::string, RGWZoneGroup> zonegroups) {
+  for (const auto& [_, zonegroup] : zonegroups) {
+    if (!zonegroup.supports(rgw::zone_features::notification_v2)) {
+      return false;
+    }
+  }
+  return true;
 }
 
 int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, 
@@ -694,6 +782,19 @@ int RGWPubSub::Bucket::remove_notifications(const DoutPrefixProvider *dpp, optio
   return 0;
 }
 
+int RGWPubSub::create_topic(const DoutPrefixProvider* dpp,
+                            const rgw_pubsub_topic& topic,
+                            optional_yield y) const {
+  RGWObjVersionTracker objv_tracker;
+  const auto ret = driver->write_topic(topic, &objv_tracker, y, dpp);
+  if (ret < 0) {
+    ldpp_dout(dpp, 1) << "ERROR: failed to write topic info: ret=" << ret
+                      << dendl;
+  }
+
+  return ret;
+}
+
 int RGWPubSub::create_topic(const DoutPrefixProvider* dpp,
                             const std::string& name,
                             const rgw_pubsub_dest& dest, const std::string& arn,
@@ -701,6 +802,16 @@ int RGWPubSub::create_topic(const DoutPrefixProvider* dpp,
                             const rgw_user& user,
                             const std::string& policy_text,
                             optional_yield y) const {
+  if (use_notification_v2) {
+    rgw_pubsub_topic new_topic;
+    new_topic.user = user;
+    new_topic.name = name;
+    new_topic.dest = dest;
+    new_topic.arn = arn;
+    new_topic.opaque_data = opaque_data;
+    new_topic.policy_text = policy_text;
+    return create_topic(dpp, new_topic, y);
+  }
   RGWObjVersionTracker objv_tracker;
   rgw_pubsub_topics topics;
 
@@ -728,8 +839,34 @@ int RGWPubSub::create_topic(const DoutPrefixProvider* dpp,
   return 0;
 }
 
+int RGWPubSub::remove_topic_v2(const DoutPrefixProvider* dpp,
+                               const std::string& name,
+                               optional_yield y) const {
+  RGWObjVersionTracker objv_tracker;
+  rgw_pubsub_topic topic;
+  int ret = get_topic(dpp, name, topic, y);
+  if (ret < 0 && ret != -ENOENT) {
+    return ret;
+  } else if (ret == -ENOENT) {
+    // its not an error if no topics exist, just a no-op
+    ldpp_dout(dpp, 10) << "WARNING: topic name:" << name
+                       << " does not exist, deletion is a no-op: ret=" << ret
+                       << dendl;
+    return 0;
+  }
+  ret = driver->remove_topic(name, tenant, &objv_tracker, y, dpp);
+  if (ret < 0) {
+    ldpp_dout(dpp, 1) << "ERROR: failed to remove topic info: ret=" << ret
+                      << dendl;
+  }
+  return ret;
+}
+
 int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const
 {
+  if (use_notification_v2) {
+    return remove_topic_v2(dpp, name, y);
+  }
   RGWObjVersionTracker objv_tracker;
   rgw_pubsub_topics topics;
 
@@ -753,4 +890,3 @@ int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const std::string& na
 
   return 0;
 }
-
index ddc72f99b076a062dd8599c2bddeb0b18d065f9d..4afc101c63ccc36f4e03b77772bd0ee640d2c5b2 100644 (file)
@@ -389,6 +389,7 @@ struct rgw_pubsub_dest {
   void dump(Formatter *f) const;
   void dump_xml(Formatter *f) const;
   std::string to_json_str() const;
+  void decode_json(JSONObj* obj);
 };
 WRITE_CLASS_ENCODER(rgw_pubsub_dest)
 
@@ -435,6 +436,7 @@ struct rgw_pubsub_topic {
   void dump(Formatter *f) const;
   void dump_xml(Formatter *f) const;
   void dump_xml_as_attributes(Formatter *f) const;
+  void decode_json(JSONObj* obj);
 
   bool operator<(const rgw_pubsub_topic& t) const {
     return to_str().compare(t.to_str());
@@ -558,6 +560,8 @@ class RGWPubSub
 
   rgw::sal::Driver* const driver;
   const std::string tenant;
+  const std::map<std::string, RGWZoneGroup>* zonegroups = nullptr;
+  bool use_notification_v2 = false;
 
   int read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result, 
       RGWObjVersionTracker* objv_tracker, optional_yield y) const;
@@ -567,6 +571,10 @@ class RGWPubSub
 public:
   RGWPubSub(rgw::sal::Driver* _driver, const std::string& tenant);
 
+ RGWPubSub(rgw::sal::Driver* _driver,
+           const std::string& _tenant,
+           const std::map<std::string, RGWZoneGroup>* zonegroups);
+
   class Bucket {
     friend class RGWPubSub;
     const RGWPubSub& ps;
@@ -639,6 +647,18 @@ public:
   // if the topic does not exists it is a no-op (considered success)
   // return 0 on success, error code otherwise
   int remove_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const;
+  // remove a topic according to its name
+  // if the topic does not exists it is a no-op (considered success)
+  // return 0 on success, error code otherwise
+  int remove_topic_v2(const DoutPrefixProvider* dpp,
+                      const std::string& name,
+                      optional_yield y) const;
+  // create a topic with a name only
+  // if the topic already exists it is a no-op (considered success)
+  // return 0 on success, error code otherwise
+  int create_topic(const DoutPrefixProvider* dpp,
+                   const rgw_pubsub_topic& topic,
+                   optional_yield y) const;
 };
 
 namespace rgw::notify {
@@ -648,4 +668,7 @@ namespace rgw::notify {
   constexpr uint32_t DEFAULT_GLOBAL_VALUE = UINT32_MAX;
   // Used in case the topic is using the default global value for dumping in a formatter
   constexpr static const std::string_view DEFAULT_CONFIG{"None"};
-}
\ No newline at end of file
+}
+
+bool do_all_zonegroups_support_notification_v2(
+    std::map<std::string, RGWZoneGroup> zonegroups);
index 191f535d82bd48c31792fd0b650ff7b196a77be4..7396fcfddd4197cd9624fa0c2647781ead45cbbf 100644 (file)
@@ -17,6 +17,7 @@
 #include "services/svc_zone.h"
 #include "common/dout.h"
 #include "rgw_url.h"
+#include "rgw_process_env.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
@@ -138,21 +139,13 @@ class RGWPSCreateTopicOp : public RGWOp {
       return -EINVAL;
     }
 
-    // Remove the args that are parsed, so the push_endpoint_args only contains
-    // necessary one's.
     opaque_data = s->info.args.get("OpaqueData");
-    s->info.args.remove("OpaqueData");
 
     dest.push_endpoint = s->info.args.get("push-endpoint");
-    s->info.args.remove("push-endpoint");
     s->info.args.get_bool("persistent", &dest.persistent, false);
-    s->info.args.remove("persistent");
     s->info.args.get_int("time_to_live", reinterpret_cast<int *>(&dest.time_to_live), rgw::notify::DEFAULT_GLOBAL_VALUE);
-    s->info.args.remove("time_to_live");
     s->info.args.get_int("max_retries", reinterpret_cast<int *>(&dest.max_retries), rgw::notify::DEFAULT_GLOBAL_VALUE);
-    s->info.args.remove("max_retries");
     s->info.args.get_int("retry_sleep_duration", reinterpret_cast<int *>(&dest.retry_sleep_duration), rgw::notify::DEFAULT_GLOBAL_VALUE);
-    s->info.args.remove("retry_sleep_duration");
 
     if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
       return -EINVAL;
@@ -162,8 +155,19 @@ class RGWPSCreateTopicOp : public RGWOp {
     if (!policy_text.empty() && !get_policy_from_text(s, policy_text)) {
       return -ERR_MALFORMED_DOC;
     }
-    s->info.args.remove("Policy");
 
+    // Remove the args that are parsed, so the push_endpoint_args only contains
+    // necessary one's which is parsed after this if. but only if master zone,
+    // else we do not remove as request is forwarded to master.
+    if (driver->is_meta_master()) {
+      s->info.args.remove("OpaqueData");
+      s->info.args.remove("push-endpoint");
+      s->info.args.remove("persistent");
+      s->info.args.remove("time_to_live");
+      s->info.args.remove("max_retries");
+      s->info.args.remove("retry_sleep_duration");
+      s->info.args.remove("Policy");
+    }
     for (const auto& param : s->info.args.get_params()) {
       if (param.first == "Action" || param.first == "Name" || param.first == "PayloadHash") {
         continue;
@@ -193,7 +197,8 @@ class RGWPSCreateTopicOp : public RGWOp {
       return ret;
     }
 
-    const RGWPubSub ps(driver, s->owner.id.tenant);
+    const RGWPubSub ps(driver, s->owner.id.tenant,
+                       &s->penv.site->get_period()->get_map().zonegroups);
     rgw_pubsub_topic result;
     ret = ps.get_topic(this, topic_name, result, y);
     if (ret == -ENOENT) {
@@ -252,6 +257,18 @@ class RGWPSCreateTopicOp : public RGWOp {
 };
 
 void RGWPSCreateTopicOp::execute(optional_yield y) {
+  // master request will replicate the topic creation.
+  bufferlist indata;
+  if (!driver->is_meta_master()) {
+    op_ret = rgw_forward_request_to_master(
+        this, *s->penv.site, s->user->get_id(), &indata, nullptr, s->info, y);
+    if (op_ret < 0) {
+      ldpp_dout(this, 1)
+          << "CreateTopic forward_request_to_master returned ret = " << op_ret
+          << dendl;
+      return;
+    }
+  }
   if (!dest.push_endpoint.empty() && dest.persistent) {
     op_ret = rgw::notify::add_persistent_topic(topic_name, s->yield);
     if (op_ret < 0) {
@@ -261,7 +278,8 @@ void RGWPSCreateTopicOp::execute(optional_yield y) {
       return;
     }
   }
-  const RGWPubSub ps(driver, s->owner.id.tenant);
+  const RGWPubSub ps(driver, s->owner.id.tenant,
+                     &s->penv.site->get_period()->get_map().zonegroups);
   op_ret = ps.create_topic(this, topic_name, dest, topic_arn, opaque_data,
                            s->owner.id, policy_text, y);
   if (op_ret < 0) {
@@ -316,7 +334,8 @@ public:
 };
 
 void RGWPSListTopicsOp::execute(optional_yield y) {
-  const RGWPubSub ps(driver, s->owner.id.tenant);
+  const RGWPubSub ps(driver, s->owner.id.tenant,
+                     &s->penv.site->get_period()->get_map().zonegroups);
   op_ret = ps.get_topics(this, result, y);
   // if there are no topics it is not considered an error
   op_ret = op_ret == -ENOENT ? 0 : op_ret;
@@ -403,7 +422,8 @@ void RGWPSGetTopicOp::execute(optional_yield y) {
   if (op_ret < 0) {
     return;
   }
-  const RGWPubSub ps(driver, s->owner.id.tenant);
+  const RGWPubSub ps(driver, s->owner.id.tenant,
+                     &s->penv.site->get_period()->get_map().zonegroups);
   op_ret = ps.get_topic(this, topic_name, result, y);
   if (op_ret < 0) {
     ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
@@ -487,7 +507,8 @@ void RGWPSGetTopicAttributesOp::execute(optional_yield y) {
   if (op_ret < 0) {
     return;
   }
-  const RGWPubSub ps(driver, s->owner.id.tenant);
+  const RGWPubSub ps(driver, s->owner.id.tenant,
+                     &s->penv.site->get_period()->get_map().zonegroups);
   op_ret = ps.get_topic(this, topic_name, result, y);
   if (op_ret < 0) {
     ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
@@ -615,7 +636,8 @@ class RGWPSSetTopicAttributesOp : public RGWOp {
       return ret;
     }
     rgw_pubsub_topic result;
-    const RGWPubSub ps(driver, s->owner.id.tenant);
+    const RGWPubSub ps(driver, s->owner.id.tenant,
+                       &s->penv.site->get_period()->get_map().zonegroups);
     ret = ps.get_topic(this, topic_name, result, y);
     if (ret < 0) {
       ldpp_dout(this, 1) << "failed to get topic '" << topic_name
@@ -664,6 +686,17 @@ class RGWPSSetTopicAttributesOp : public RGWOp {
 };
 
 void RGWPSSetTopicAttributesOp::execute(optional_yield y) {
+  if (!driver->is_meta_master()) {
+    bufferlist indata;
+    op_ret = rgw_forward_request_to_master(
+        this, *s->penv.site, s->user->get_id(), &indata, nullptr, s->info, y);
+    if (op_ret < 0) {
+      ldpp_dout(this, 1)
+          << "SetTopicAttributes forward_request_to_master returned ret = "
+          << op_ret << dendl;
+      return;
+    }
+  }
   if (!dest.push_endpoint.empty() && dest.persistent) {
     op_ret = rgw::notify::add_persistent_topic(topic_name, s->yield);
     if (op_ret < 0) {
@@ -682,7 +715,8 @@ void RGWPSSetTopicAttributesOp::execute(optional_yield y) {
       return;
     }
   }
-  const RGWPubSub ps(driver, s->owner.id.tenant);
+  const RGWPubSub ps(driver, s->owner.id.tenant,
+                     &s->penv.site->get_period()->get_map().zonegroups);
   op_ret = ps.create_topic(this, topic_name, dest, topic_arn, opaque_data,
                            topic_owner, policy_text, y);
   if (op_ret < 0) {
@@ -752,7 +786,20 @@ void RGWPSDeleteTopicOp::execute(optional_yield y) {
   if (op_ret < 0) {
     return;
   }
-  const RGWPubSub ps(driver, s->owner.id.tenant);
+  if (!driver->is_meta_master()) {
+    bufferlist indata;
+    op_ret = rgw_forward_request_to_master(
+        this, *s->penv.site, s->user->get_id(), &indata, nullptr, s->info, y);
+    if (op_ret < 0) {
+      ldpp_dout(this, 1)
+          << "DeleteTopic forward_request_to_master returned ret = " << op_ret
+          << dendl;
+      return;
+    }
+  }
+  const RGWPubSub ps(driver, s->owner.id.tenant,
+                     &s->penv.site->get_period()->get_map().zonegroups);
+
   rgw_pubsub_topic result;
   op_ret = ps.get_topic(this, topic_name, result, y);
   if (op_ret == 0) {
@@ -809,6 +856,13 @@ bool RGWHandler_REST_PSTopic_AWS::action_exists(const req_state* s)
   }
   return false;
 }
+bool RGWHandler_REST_PSTopic_AWS::action_exists(const req_info& info) {
+  if (info.args.exists("Action")) {
+    const std::string action_name = info.args.get("Action");
+    return op_generators.contains(action_name);
+  }
+  return false;
+}
 
 RGWOp *RGWHandler_REST_PSTopic_AWS::op_post()
 {
index efeed8f933d622d61346e0351ce59c38f0b034a4..4b7663caf8975e5c998e8dbaeba2844d235a3d22 100644 (file)
@@ -5204,8 +5204,12 @@ void parse_post_action(const std::string& post_body, req_state* s)
       }
     }
   }
-  const auto payload_hash = rgw::auth::s3::calc_v4_payload_hash(post_body);
-  s->info.args.append("PayloadHash", payload_hash);
+  // PayloadHash is present if request is fwd from secondary site in multisite
+  // environment, so then do not calculate and append.
+  if (!s->info.args.exists("PayloadHash")) {
+    const auto payload_hash = rgw::auth::s3::calc_v4_payload_hash(post_body);
+    s->info.args.append("PayloadHash", payload_hash);
+  }
 }
 
 RGWHandler_REST* RGWRESTMgr_S3::get_handler(rgw::sal::Driver* driver,
@@ -5627,7 +5631,9 @@ AWSSignerV4::prepare(const DoutPrefixProvider *dpp,
 
   const char* exp_payload_hash = nullptr;
   string payload_hash;
-  if (is_non_s3_op) {
+  // if the request is related to topics (bucket notification), they are part of
+  // sns service and hence it's a no_s3_op,
+  if (is_non_s3_op || RGWHandler_REST_PSTopic_AWS::action_exists(info)) {
     //For non s3 ops, we need to calculate the payload hash
     payload_hash = info.args.get("PayloadHash");
     exp_payload_hash = payload_hash.c_str();
index eb4d0c348fadc043c56a39cbaf69fe37618ff39d..89ac23341d23daec9f90762fb2acb606353dbcab 100644 (file)
@@ -44,7 +44,7 @@ class RGWCompressionInfo;
 struct rgw_pubsub_topics;
 struct rgw_pubsub_bucket_topics;
 class RGWZonePlacementInfo;
-
+struct rgw_pubsub_topic;
 
 using RGWBucketListNameFilter = std::function<bool (const std::string&)>;
 
@@ -314,6 +314,24 @@ class Driver {
     /** Remove the topic config, optionally a specific version */
     virtual int remove_topics(const std::string& tenant, RGWObjVersionTracker* objv_tracker,
         optional_yield y,const DoutPrefixProvider *dpp) = 0;
+    /** Read the topic config entry into data and (optionally) objv_tracker */
+    virtual int read_topic_v2(const std::string& topic_name,
+                              const std::string& tenant,
+                              rgw_pubsub_topic& topic,
+                              RGWObjVersionTracker* objv_tracker,
+                              optional_yield y,
+                              const DoutPrefixProvider* dpp) = 0;
+    /** Write topic info and (optionally) @a objv_tracker into the config */
+    virtual int write_topic_v2(const rgw_pubsub_topic& topic,
+                               RGWObjVersionTracker* objv_tracker,
+                               optional_yield y,
+                               const DoutPrefixProvider* dpp) = 0;
+    /** Remove the topic config, optionally a specific version */
+    virtual int remove_topic_v2(const std::string& topic_name,
+                                const std::string& tenant,
+                                RGWObjVersionTracker* objv_tracker,
+                                optional_yield y,
+                                const DoutPrefixProvider* dpp) = 0;
     /** Get access to the lifecycle management thread */
     virtual RGWLC* get_rgwlc(void) = 0;
     /** Get access to the coroutine registry.  Used to create new coroutine managers */
@@ -1449,6 +1467,8 @@ public:
   virtual int list_zones(std::list<std::string>& zone_ids) = 0;
   /** Clone a copy of this zonegroup. */
   virtual std::unique_ptr<ZoneGroup> clone() = 0;
+  /** Determine if zonegroup |feature| is supported.*/
+  virtual bool supports_feature(std::string_view feature) const = 0;
 };
 
 /**
index 3c0c7c765198bdcaf31b944068c4f3c341060b9d..140b28396626d906961979dfe222bbc55b02151e 100644 (file)
@@ -259,6 +259,9 @@ protected:
       std::unique_ptr<RGWZoneGroup>zg = std::make_unique<RGWZoneGroup>(*group.get());
       return std::make_unique<DBZoneGroup>(store, std::move(zg));
     }
+    virtual bool supports_feature(std::string_view feature) const override {
+      return group->supports(feature);
+    }
   };
 
   class DBZone : public StoreZone {
index b5c4c4dfc681394492d9b09632bc5df16704c0c3..71991378e32143bd4dd8b69831770eb7a93c5f17 100644 (file)
@@ -75,6 +75,9 @@ public:
     std::unique_ptr<ZoneGroup> nzg = next->clone();
     return std::make_unique<FilterZoneGroup>(std::move(nzg));
   }
+  virtual bool supports_feature(std::string_view feature) const override {
+    return next->supports_feature(feature);
+  }
 };
 
 class FilterZone : public Zone {
@@ -194,6 +197,27 @@ public:
       optional_yield y, const DoutPrefixProvider *dpp) override {
     return next->remove_topics(tenant, objv_tracker, y, dpp);
   }
+  int read_topic_v2(const std::string& topic_name,
+                    const std::string& tenant,
+                    rgw_pubsub_topic& topic,
+                    RGWObjVersionTracker* objv_tracker,
+                    optional_yield y,
+                    const DoutPrefixProvider* dpp) override {
+    return next->read_topic_v2(topic_name, tenant, topic, objv_tracker, y, dpp);
+  }
+  int write_topic_v2(const rgw_pubsub_topic& topic,
+                     RGWObjVersionTracker* objv_tracker,
+                     optional_yield y,
+                     const DoutPrefixProvider* dpp) override {
+    return next->write_topic_v2(topic, objv_tracker, y, dpp);
+  }
+  int remove_topic_v2(const std::string& topic_name,
+                      const std::string& tenant,
+                      RGWObjVersionTracker* objv_tracker,
+                      optional_yield y,
+                      const DoutPrefixProvider* dpp) override {
+    return next->remove_topic_v2(topic_name, tenant, objv_tracker, y, dpp);
+  }
 
   virtual RGWLC* get_rgwlc(void) override;
   virtual RGWCoroutinesManagerRegistry* get_cr_registry() override;
index 7c35258dd5dc149d16b8556ba17e4ce3d9a5ba1b..eda0f08ede6f0122d590309681858cb67c461fcf 100644 (file)
@@ -34,6 +34,27 @@ class StoreDriver : public Driver {
        optional_yield y, const DoutPrefixProvider *dpp) override {return -ENOENT;}
     int remove_topics(const std::string& tenant, RGWObjVersionTracker* objv_tracker,
         optional_yield y, const DoutPrefixProvider *dpp) override {return -ENOENT;}
+    int read_topic_v2(const std::string& topic_name,
+                      const std::string& tenant,
+                      rgw_pubsub_topic& topic,
+                      RGWObjVersionTracker* objv_tracker,
+                      optional_yield y,
+                      const DoutPrefixProvider* dpp) override {
+      return -EOPNOTSUPP;
+    }
+    int write_topic_v2(const rgw_pubsub_topic& topic,
+                       RGWObjVersionTracker* objv_tracker,
+                       optional_yield y,
+                       const DoutPrefixProvider* dpp) override {
+      return -ENOENT;
+    }
+    int remove_topic_v2(const std::string& topic_name,
+                        const std::string& tenant,
+                        RGWObjVersionTracker* objv_tracker,
+                        optional_yield y,
+                        const DoutPrefixProvider* dpp) override {
+      return -ENOENT;
+    }
 };
 
 class StoreUser : public User {
index aeb58e2f48fe513e55f8c21057733de4791f936e..ed438dead9307d1daab58e5fd3c21c830e5d7640 100644 (file)
@@ -296,12 +296,12 @@ void RGWZoneParams::decode_json(JSONObj *obj)
   JSONDecoder::decode_json("user_swift_pool", user_swift_pool, obj);
   JSONDecoder::decode_json("user_uid_pool", user_uid_pool, obj);
   JSONDecoder::decode_json("otp_pool", otp_pool, obj);
+  JSONDecoder::decode_json("notif_pool", notif_pool, obj);
+  JSONDecoder::decode_json("topics_pool", topics_pool, obj);
   JSONDecoder::decode_json("system_key", system_key, obj);
   JSONDecoder::decode_json("placement_pools", placement_pools, obj);
   JSONDecoder::decode_json("tier_config", tier_config, obj);
   JSONDecoder::decode_json("realm_id", realm_id, obj);
-  JSONDecoder::decode_json("notif_pool", notif_pool, obj);
-
 }
 
 void RGWZoneParams::dump(Formatter *f) const
@@ -321,11 +321,12 @@ void RGWZoneParams::dump(Formatter *f) const
   encode_json("user_swift_pool", user_swift_pool, f);
   encode_json("user_uid_pool", user_uid_pool, f);
   encode_json("otp_pool", otp_pool, f);
+  encode_json("notif_pool", notif_pool, f);
+  encode_json("topics_pool", topics_pool, f);
   encode_json_plain("system_key", system_key, f);
   encode_json("placement_pools", placement_pools, f);
   encode_json("tier_config", tier_config, f);
   encode_json("realm_id", realm_id, f);
-  encode_json("notif_pool", notif_pool, f);
 }
 
 int RGWZoneParams::init(const DoutPrefixProvider *dpp, 
@@ -480,6 +481,7 @@ void add_zone_pools(const RGWZoneParams& info,
   pools.insert(info.reshard_pool);
   pools.insert(info.oidc_pool);
   pools.insert(info.notif_pool);
+  pools.insert(info.topics_pool);
 
   for (const auto& [pname, placement] : info.placement_pools) {
     pools.insert(placement.index_pool);
@@ -584,6 +586,7 @@ int RGWZoneParams::fix_pool_names(const DoutPrefixProvider *dpp, optional_yield
   otp_pool = fix_zone_pool_dup(pools, name, ".rgw.otp", otp_pool);
   oidc_pool = fix_zone_pool_dup(pools, name, ".rgw.meta:oidc", oidc_pool);
   notif_pool = fix_zone_pool_dup(pools, name ,".rgw.log:notif", notif_pool);
+  topics_pool = fix_zone_pool_dup(pools, name, ".rgw.meta:topics", topics_pool);
 
   for(auto& iter : placement_pools) {
     iter.second.index_pool = fix_zone_pool_dup(pools, name, "." + default_bucket_index_pool_suffix,
@@ -1245,6 +1248,8 @@ int init_zone_pool_names(const DoutPrefixProvider *dpp, optional_yield y,
   info.otp_pool = fix_zone_pool_dup(pools, info.name, ".rgw.otp", info.otp_pool);
   info.oidc_pool = fix_zone_pool_dup(pools, info.name, ".rgw.meta:oidc", info.oidc_pool);
   info.notif_pool = fix_zone_pool_dup(pools, info.name, ".rgw.log:notif", info.notif_pool);
+  info.topics_pool =
+      fix_zone_pool_dup(pools, info.name, ".rgw.meta:topics", info.topics_pool);
 
   for (auto& [pname, placement] : info.placement_pools) {
     placement.index_pool = fix_zone_pool_dup(pools, info.name, "." + default_bucket_index_pool_suffix, placement.index_pool);
index 5e1a435d488e49c98bf5f2f29e1f62bcd043eeaf..600460735a8435f3b4c5d7098500a46b9c414443 100644 (file)
@@ -15,11 +15,13 @@ namespace rgw::zone_features {
 // zone feature names
 inline constexpr std::string_view resharding = "resharding";
 inline constexpr std::string_view compress_encrypted = "compress-encrypted";
+inline constexpr std::string_view notification_v2 = "notification_v2";
 
 // static list of features supported by this release
 inline constexpr std::initializer_list<std::string_view> supported = {
-  resharding,
-  compress_encrypted,
+    resharding,
+    compress_encrypted,
+    notification_v2,
 };
 
 inline constexpr bool supports(std::string_view feature) {
@@ -33,7 +35,8 @@ inline constexpr bool supports(std::string_view feature) {
 
 // static list of features enabled by default on new zonegroups
 inline constexpr std::initializer_list<std::string_view> enabled = {
-  resharding,
+    resharding,
+    notification_v2,
 };
 
 
diff --git a/src/rgw/services/svc_topic_rados.cc b/src/rgw/services/svc_topic_rados.cc
new file mode 100644 (file)
index 0000000..64c9106
--- /dev/null
@@ -0,0 +1,207 @@
+#include "svc_topic_rados.h"
+#include "rgw_notify.h"
+#include "rgw_tools.h"
+#include "rgw_zone.h"
+#include "svc_meta.h"
+#include "svc_meta_be_sobj.h"
+#include "svc_zone.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+static std::string topic_oid_prefix = "topic.";
+static constexpr char topic_tenant_delim[] = ":";
+
+std::string get_topic_key(const std::string& topic_name,
+                          const std::string& tenant) {
+  if (tenant.empty()) {
+    return topic_name;
+  }
+  return tenant + topic_tenant_delim + topic_name;
+}
+
+void parse_topic_entry(const std::string& topic_entry,
+                       std::string* tenant_name,
+                       std::string* topic_name) {
+  // expected format: [tenant_name:]topic_name*
+  auto pos = topic_entry.find(topic_tenant_delim);
+  if (pos != std::string::npos) {
+    *tenant_name = topic_entry.substr(0, pos);
+    *topic_name = topic_entry.substr(pos + 1);
+  } else {
+    tenant_name->clear();
+    *topic_name = topic_entry;
+  }
+}
+class RGWSI_Topic_Module : public RGWSI_MBSObj_Handler_Module {
+  RGWSI_Topic_RADOS::Svc& svc;
+  const std::string prefix;
+
+ public:
+  RGWSI_Topic_Module(RGWSI_Topic_RADOS::Svc& _svc)
+      : RGWSI_MBSObj_Handler_Module("topic"),
+        svc(_svc),
+        prefix(topic_oid_prefix) {}
+
+  void get_pool_and_oid(const std::string& key,
+                        rgw_pool* pool,
+                        std::string* oid) override {
+    if (pool) {
+      *pool = svc.zone->get_zone_params().topics_pool;
+    }
+
+    if (oid) {
+      *oid = key_to_oid(key);
+    }
+  }
+
+  bool is_valid_oid(const std::string& oid) override {
+    return boost::algorithm::starts_with(oid, prefix);
+  }
+
+  std::string key_to_oid(const std::string& key) override {
+    return prefix + key;
+  }
+
+  // This is called after `is_valid_oid` and is assumed to be a valid oid
+  std::string oid_to_key(const std::string& oid) override {
+    return oid.substr(prefix.size());
+  }
+
+  const std::string& get_oid_prefix() { return prefix; }
+};
+
+RGWSI_MetaBackend_Handler* RGWSI_Topic_RADOS::get_be_handler() {
+  return be_handler;
+}
+
+void RGWSI_Topic_RADOS::init(RGWSI_Zone* _zone_svc,
+                             RGWSI_Meta* _meta_svc,
+                             RGWSI_MetaBackend* _meta_be_svc,
+                             RGWSI_SysObj* _sysobj_svc) {
+  svc.zone = _zone_svc;
+  svc.meta = _meta_svc;
+  svc.meta_be = _meta_be_svc;
+  svc.sysobj = _sysobj_svc;
+}
+
+int RGWSI_Topic_RADOS::do_start(optional_yield y,
+                                const DoutPrefixProvider* dpp) {
+  int r = svc.meta->create_be_handler(RGWSI_MetaBackend::Type::MDBE_SOBJ,
+                                      &be_handler);
+  if (r < 0) {
+    ldout(ctx(), 0) << "ERROR: failed to create be_handler for Topics: r=" << r
+                    << dendl;
+    return r;
+  }
+
+  auto module = new RGWSI_Topic_Module(svc);
+  RGWSI_MetaBackend_Handler_SObj* bh =
+      static_cast<RGWSI_MetaBackend_Handler_SObj*>(be_handler);
+  be_module.reset(module);
+  bh->set_module(module);
+  return 0;
+}
+
+RGWTopicMetadataHandler::RGWTopicMetadataHandler(rgw::sal::Driver* driver,
+                                                 RGWSI_Topic_RADOS* topic_svc) {
+  this->driver = driver;
+  this->topic_svc = topic_svc;
+  base_init(topic_svc->ctx(), topic_svc->get_be_handler());
+}
+
+RGWMetadataObject* RGWTopicMetadataHandler::get_meta_obj(
+    JSONObj* jo, const obj_version& objv, const ceph::real_time& mtime) {
+  rgw_pubsub_topic topic;
+  try {
+    topic.decode_json(jo);
+  } catch (JSONDecoder::err& e) {
+    return nullptr;
+  }
+
+  return new RGWTopicMetadataObject(topic, objv, mtime, driver);
+}
+
+int RGWTopicMetadataHandler::do_get(RGWSI_MetaBackend_Handler::Op* op,
+                                    std::string& entry, RGWMetadataObject** obj,
+                                    optional_yield y,
+                                    const DoutPrefixProvider* dpp) {
+  rgw_pubsub_topic result;
+  std::string topic_name;
+  std::string tenant;
+  parse_topic_entry(entry, &tenant, &topic_name);
+  RGWPubSub ps(driver, tenant,
+               &topic_svc->svc.zone->get_current_period().get_map().zonegroups);
+  int ret = ps.get_topic(dpp, topic_name, result, y);
+  if (ret < 0) {
+    return ret;
+  }
+  ceph::real_time mtime;
+  obj_version ver;
+  RGWTopicMetadataObject* rdo =
+      new RGWTopicMetadataObject(result, ver, mtime, driver);
+  *obj = rdo;
+  return 0;
+}
+
+int RGWTopicMetadataHandler::do_remove(RGWSI_MetaBackend_Handler::Op* op,
+                                       std::string& entry,
+                                       RGWObjVersionTracker& objv_tracker,
+                                       optional_yield y,
+                                       const DoutPrefixProvider* dpp) {
+  auto ret = rgw::notify::remove_persistent_topic(entry, y);
+  if (ret != -ENOENT && ret < 0) {
+    return ret;
+  }
+  std::string topic_name;
+  std::string tenant;
+  parse_topic_entry(entry, &tenant, &topic_name);
+  RGWPubSub ps(driver, tenant,
+               &topic_svc->svc.zone->get_current_period().get_map().zonegroups);
+  return ps.remove_topic(dpp, topic_name, y);
+}
+
+class RGWMetadataHandlerPut_Topic : public RGWMetadataHandlerPut_SObj {
+  RGWTopicMetadataHandler* rhandler;
+  RGWTopicMetadataObject* mdo;
+
+ public:
+  RGWMetadataHandlerPut_Topic(RGWTopicMetadataHandler* handler,
+                              RGWSI_MetaBackend_Handler::Op* op,
+                              std::string& entry, RGWMetadataObject* obj,
+                              RGWObjVersionTracker& objv_tracker,
+                              optional_yield y, RGWMDLogSyncType type,
+                              bool from_remote_zone)
+      : RGWMetadataHandlerPut_SObj(handler, op, entry, obj, objv_tracker, y,
+                                   type, from_remote_zone),
+        rhandler(handler) {
+    mdo = static_cast<RGWTopicMetadataObject*>(obj);
+  }
+
+  int put_checked(const DoutPrefixProvider* dpp) override {
+    auto& topic = mdo->get_topic_info();
+    auto* driver = mdo->get_driver();
+    auto ret = rgw::notify::add_persistent_topic(entry, y);
+    if (ret < 0) {
+      return ret;
+    }
+    RGWObjVersionTracker objv_tracker;
+    ret = driver->write_topic_v2(topic, &objv_tracker, y, dpp);
+    if (ret < 0) {
+      ldpp_dout(dpp, 1) << "ERROR: failed to write topic info: ret=" << ret
+                        << dendl;
+    }
+    return ret;
+  }
+};
+
+int RGWTopicMetadataHandler::do_put(RGWSI_MetaBackend_Handler::Op* op,
+                                    std::string& entry, RGWMetadataObject* obj,
+                                    RGWObjVersionTracker& objv_tracker,
+                                    optional_yield y,
+                                    const DoutPrefixProvider* dpp,
+                                    RGWMDLogSyncType type,
+                                    bool from_remote_zone) {
+  RGWMetadataHandlerPut_Topic put_op(this, op, entry, obj, objv_tracker, y,
+                                     type, from_remote_zone);
+  return do_put_operate(&put_op, dpp);
+}
diff --git a/src/rgw/services/svc_topic_rados.h b/src/rgw/services/svc_topic_rados.h
new file mode 100644 (file)
index 0000000..e630a61
--- /dev/null
@@ -0,0 +1,96 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include "rgw_pubsub.h"
+#include "rgw_service.h"
+#include "svc_meta_be.h"
+
+class RGWSI_Topic_RADOS : public RGWServiceInstance {
+ public:
+  struct Svc {
+    RGWSI_Zone* zone{nullptr};
+    RGWSI_Meta* meta{nullptr};
+    RGWSI_MetaBackend* meta_be{nullptr};
+    RGWSI_SysObj* sysobj{nullptr};
+  } svc;
+
+  RGWSI_Topic_RADOS(CephContext* cct) : RGWServiceInstance(cct) {}
+  ~RGWSI_Topic_RADOS() {}
+
+  void init(RGWSI_Zone* _zone_svc,
+            RGWSI_Meta* _meta_svc,
+            RGWSI_MetaBackend* _meta_be_svc,
+            RGWSI_SysObj* _sysobj_svc);
+
+  RGWSI_MetaBackend_Handler* get_be_handler();
+  int do_start(optional_yield y, const DoutPrefixProvider* dpp) override;
+
+ private:
+  RGWSI_MetaBackend_Handler* be_handler;
+  std::unique_ptr<RGWSI_MetaBackend::Module> be_module;
+};
+
+class RGWTopicMetadataObject : public RGWMetadataObject {
+  rgw_pubsub_topic topic;
+  rgw::sal::Driver* driver;
+
+ public:
+  RGWTopicMetadataObject() = default;
+  RGWTopicMetadataObject(rgw_pubsub_topic& topic, const obj_version& v,
+                         real_time m, rgw::sal::Driver* driver)
+      : RGWMetadataObject(v, m), topic(topic), driver(driver) {}
+
+  void dump(Formatter* f) const override { topic.dump(f); }
+
+  rgw_pubsub_topic& get_topic_info() { return topic; }
+
+  rgw::sal::Driver* get_driver() { return driver; }
+};
+class RGWTopicMetadataHandler : public RGWMetadataHandler_GenericMetaBE {
+ public:
+  RGWTopicMetadataHandler(rgw::sal::Driver* driver,
+                          RGWSI_Topic_RADOS* role_svc);
+
+  std::string get_type() final { return "topic"; }
+
+  RGWMetadataObject* get_meta_obj(JSONObj* jo, const obj_version& objv,
+                                  const ceph::real_time& mtime);
+
+  int do_get(RGWSI_MetaBackend_Handler::Op* op, std::string& entry,
+             RGWMetadataObject** obj, optional_yield y,
+             const DoutPrefixProvider* dpp) final;
+
+  int do_remove(RGWSI_MetaBackend_Handler::Op* op, std::string& entry,
+                RGWObjVersionTracker& objv_tracker, optional_yield y,
+                const DoutPrefixProvider* dpp) final;
+
+  int do_put(RGWSI_MetaBackend_Handler::Op* op, std::string& entr,
+             RGWMetadataObject* obj, RGWObjVersionTracker& objv_tracker,
+             optional_yield y, const DoutPrefixProvider* dpp,
+             RGWMDLogSyncType type, bool from_remote_zone) override;
+
+ private:
+  rgw::sal::Driver* driver;
+  RGWSI_Topic_RADOS* topic_svc;
+};
+
+std::string get_topic_key(const std::string& topic_name,
+                          const std::string& tenant);
+
+void parse_topic_entry(const std::string& topic_entry,
+                       std::string* tenant_name,
+                       std::string* topic_name);