From: kchheda3 Date: Mon, 27 Nov 2023 20:51:02 +0000 (-0500) Subject: rgw/multisite-notification: Add support to replicate topic creation in multisite... X-Git-Tag: testing/wip-batrick-testing-20240411.154038~266^2~20 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=c4a147d079cb9f268340e5f79ec5350a94296658;p=ceph-ci.git rgw/multisite-notification: Add support to replicate topic creation in multisite config Signed-off-by: kchheda3 --- diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 3c2f1423808..2987b70b382 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -53,6 +53,7 @@ set(librgw_common_srcs services/svc_sys_obj_cache.cc services/svc_sys_obj_core.cc services/svc_tier_rados.cc + services/svc_topic_rados.cc services/svc_user.cc services/svc_user_rados.cc services/svc_zone.cc diff --git a/src/rgw/driver/daos/rgw_sal_daos.h b/src/rgw/driver/daos/rgw_sal_daos.h index c5cfefc222d..479f5b84a5e 100644 --- a/src/rgw/driver/daos/rgw_sal_daos.h +++ b/src/rgw/driver/daos/rgw_sal_daos.h @@ -430,6 +430,9 @@ class DaosZoneGroup : public StoreZoneGroup { return std::make_unique(store, group); } const RGWZoneGroup& get_group() { return group; } + virtual bool supports_feature(std::string_view feature) const override { + return group.supports(feature); + } }; class DaosZone : public StoreZone { diff --git a/src/rgw/driver/motr/rgw_sal_motr.h b/src/rgw/driver/motr/rgw_sal_motr.h index 3cc3b37fa9a..9ff14a58307 100644 --- a/src/rgw/driver/motr/rgw_sal_motr.h +++ b/src/rgw/driver/motr/rgw_sal_motr.h @@ -474,6 +474,9 @@ public: virtual std::unique_ptr clone() override { return std::make_unique(store, group); } + virtual bool supports_feature(std::string_view feature) const override { + return group.supports(feature); + } friend class MotrZone; }; diff --git a/src/rgw/driver/rados/rgw_notify.cc b/src/rgw/driver/rados/rgw_notify.cc index 9baed182ed9..4a2cf827165 100644 --- a/src/rgw/driver/rados/rgw_notify.cc +++ b/src/rgw/driver/rados/rgw_notify.cc @@ -14,6 +14,7 @@ #include "rgw_sal_rados.h" #include "rgw_pubsub.h" #include "rgw_pubsub_push.h" +#include "rgw_zone_features.h" #include "rgw_perf_counters.h" #include "common/dout.h" #include diff --git a/src/rgw/driver/rados/rgw_rest_pubsub.h b/src/rgw/driver/rados/rgw_rest_pubsub.h index 27bde7a95d5..8b37992b918 100644 --- a/src/rgw/driver/rados/rgw_rest_pubsub.h +++ b/src/rgw/driver/rados/rgw_rest_pubsub.h @@ -34,5 +34,6 @@ public: int postauth_init(optional_yield) override { return 0; } int authorize(const DoutPrefixProvider* dpp, optional_yield y) override; static bool action_exists(const req_state* s); + static bool action_exists(const req_info& info); }; diff --git a/src/rgw/driver/rados/rgw_sal_rados.cc b/src/rgw/driver/rados/rgw_sal_rados.cc index e04e25842e2..608800c7e57 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.cc +++ b/src/rgw/driver/rados/rgw_sal_rados.cc @@ -62,6 +62,7 @@ #include "services/svc_role_rados.h" #include "services/svc_user.h" #include "services/svc_sys_obj_cache.h" +#include "services/svc_topic_rados.h" #include "cls/rgw/cls_rgw_client.h" #include "rgw_pubsub.h" @@ -1111,6 +1112,66 @@ int RadosStore::remove_topics(const std::string& tenant, RGWObjVersionTracker* o objv_tracker, y); } +int RadosStore::read_topic_v2(const std::string& topic_name, + const std::string& tenant, + rgw_pubsub_topic& topic, + RGWObjVersionTracker* objv_tracker, + optional_yield y, + const DoutPrefixProvider* dpp) { + bufferlist bl; + auto mtime = ceph::real_clock::zero(); + RGWSI_MBSObj_GetParams params(&bl, nullptr, &mtime); + std::unique_ptr ctx( + svc()->topic->svc.meta_be->alloc_ctx()); + ctx->init(svc()->topic->get_be_handler()); + const int ret = svc()->topic->svc.meta_be->get( + ctx.get(), get_topic_key(topic_name, tenant), params, objv_tracker, y, + dpp); + if (ret < 0) { + return ret; + } + + auto iter = bl.cbegin(); + try { + decode(topic, iter); + } catch (buffer::error& err) { + ldpp_dout(dpp, 20) << " failed to decode topic: " << topic_name + << ". error: " << err.what() << dendl; + return -EIO; + } + return 0; +} + +int RadosStore::write_topic_v2(const rgw_pubsub_topic& topic, + RGWObjVersionTracker* objv_tracker, + optional_yield y, + const DoutPrefixProvider* dpp) { + bufferlist bl; + encode(topic, bl); + RGWSI_MBSObj_PutParams params(bl, nullptr, ceph::real_clock::zero(), + /*exclusive*/ false); + std::unique_ptr ctx( + svc()->topic->svc.meta_be->alloc_ctx()); + ctx->init(svc()->topic->get_be_handler()); + return svc()->topic->svc.meta_be->put( + ctx.get(), get_topic_key(topic.name, topic.user.tenant), params, + objv_tracker, y, dpp); +} + +int RadosStore::remove_topic_v2(const std::string& topic_name, + const std::string& tenant, + RGWObjVersionTracker* objv_tracker, + optional_yield y, + const DoutPrefixProvider* dpp) { + RGWSI_MBSObj_RemoveParams params; + std::unique_ptr ctx( + svc()->topic->svc.meta_be->alloc_ctx()); + ctx->init(svc()->topic->get_be_handler()); + return svc()->topic->svc.meta_be->remove(ctx.get(), + get_topic_key(topic_name, tenant), + params, objv_tracker, y, dpp); +} + int RadosStore::delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, optional_yield y) { return rados->delete_raw_obj(dpp, obj, y); diff --git a/src/rgw/driver/rados/rgw_sal_rados.h b/src/rgw/driver/rados/rgw_sal_rados.h index 39a554f15c9..33db603d785 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.h +++ b/src/rgw/driver/rados/rgw_sal_rados.h @@ -85,6 +85,9 @@ public: virtual std::unique_ptr clone() override { return std::make_unique(store, group); } + virtual bool supports_feature(std::string_view feature) const override { + return group.supports(feature); + } const RGWZoneGroup& get_group() const { return group; } }; @@ -166,6 +169,21 @@ class RadosStore : public StoreDriver { optional_yield y, const DoutPrefixProvider *dpp) override; int remove_topics(const std::string& tenant, RGWObjVersionTracker* objv_tracker, optional_yield y, const DoutPrefixProvider *dpp) override; + int read_topic_v2(const std::string& topic_name, + const std::string& tenant, + rgw_pubsub_topic& topic, + RGWObjVersionTracker* objv_tracker, + optional_yield y, + const DoutPrefixProvider* dpp) override; + int write_topic_v2(const rgw_pubsub_topic& topic, + RGWObjVersionTracker* objv_tracker, + optional_yield y, + const DoutPrefixProvider* dpp) override; + int remove_topic_v2(const std::string& topic_name, + const std::string& tenant, + RGWObjVersionTracker* objv_tracker, + optional_yield y, + const DoutPrefixProvider* dpp) override; virtual RGWLC* get_rgwlc(void) override { return rados->get_lc(); } virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return rados->get_cr_registry(); } diff --git a/src/rgw/driver/rados/rgw_service.cc b/src/rgw/driver/rados/rgw_service.cc index 4be0738bae2..1f05495fb3d 100644 --- a/src/rgw/driver/rados/rgw_service.cc +++ b/src/rgw/driver/rados/rgw_service.cc @@ -26,6 +26,7 @@ #include "services/svc_sys_obj_core.h" #include "services/svc_user_rados.h" #include "services/svc_role_rados.h" +#include "services/svc_topic_rados.h" #include "common/errno.h" @@ -37,6 +38,7 @@ #include "rgw_sal_rados.h" #include "rgw_user.h" #include "rgw_role.h" +#include "rgw_pubsub.h" #define dout_subsys ceph_subsys_rgw @@ -80,6 +82,7 @@ int RGWServices_Def::init(CephContext *cct, role_rados = std::make_unique(cct); async_processor = std::make_unique( cct, cct->_conf->rgw_num_async_rados_threads); + topic_rados = std::make_unique(cct); if (have_cache) { sysobj_cache = std::make_unique(dpp, cct); @@ -124,7 +127,7 @@ int RGWServices_Def::init(CephContext *cct, user_rados->init(driver->getRados()->get_rados_handle(), zone.get(), sysobj.get(), sysobj_cache.get(), meta.get(), meta_be_sobj.get(), sync_modules.get()); role_rados->init(zone.get(), meta.get(), meta_be_sobj.get(), sysobj.get()); - + topic_rados->init(zone.get(), meta.get(), meta_be_sobj.get(), sysobj.get()); can_shutdown = true; int r = finisher->start(y, dpp); @@ -255,7 +258,12 @@ int RGWServices_Def::init(CephContext *cct, ldout(cct, 0) << "ERROR: failed to start role_rados service (" << cpp_strerror(-r) << dendl; return r; } - + r = topic_rados->start(y, dpp); + if (r < 0) { + ldout(cct, 0) << "ERROR: failed to start topic_rados service (" + << cpp_strerror(-r) << dendl; + return r; + } } /* cache or core services will be started by sysobj */ @@ -273,6 +281,7 @@ void RGWServices_Def::shutdown() return; } + topic_rados->shutdown(); role_rados->shutdown(); datalog_rados.reset(); user_rados->shutdown(); @@ -342,6 +351,7 @@ int RGWServices::do_init(CephContext *_cct, rgw::sal::RadosStore* driver, bool h user = _svc.user_rados.get(); role = _svc.role_rados.get(); async_processor = _svc.async_processor.get(); + topic = _svc.topic_rados.get(); return 0; } @@ -390,6 +400,7 @@ int RGWCtlDef::init(RGWServices& svc, rgw::sal::Driver* driver, const DoutPrefix meta.otp.reset(RGWOTPMetaHandlerAllocator::alloc()); meta.role = std::make_unique(driver, svc.role); + meta.topic = std::make_unique(driver, svc.topic); user.reset(new RGWUserCtl(svc.zone, svc.user, (RGWUserMetadataHandler *)meta.user.get())); bucket.reset(new RGWBucketCtl(svc.zone, @@ -436,6 +447,7 @@ int RGWCtl::init(RGWServices *_svc, rgw::sal::Driver* driver, const DoutPrefixPr meta.bucket_instance = _ctl.meta.bucket_instance.get(); meta.otp = _ctl.meta.otp.get(); meta.role = _ctl.meta.role.get(); + meta.topic = _ctl.meta.topic.get(); user = _ctl.user.get(); bucket = _ctl.bucket.get(); @@ -470,6 +482,12 @@ int RGWCtl::init(RGWServices *_svc, rgw::sal::Driver* driver, const DoutPrefixPr ldout(cct, 0) << "ERROR: failed to start init otp ctl (" << cpp_strerror(-r) << dendl; return r; } + r = meta.topic->attach(meta.mgr); + if (r < 0) { + ldout(cct, 0) << "ERROR: failed to start init topic ctl (" + << cpp_strerror(-r) << dendl; + return r; + } return 0; } diff --git a/src/rgw/driver/rados/rgw_service.h b/src/rgw/driver/rados/rgw_service.h index 9996b42e251..08873e6058e 100644 --- a/src/rgw/driver/rados/rgw_service.h +++ b/src/rgw/driver/rados/rgw_service.h @@ -78,6 +78,7 @@ class RGWSI_User_RADOS; class RGWDataChangesLog; class RGWSI_Role_RADOS; class RGWAsyncRadosProcessor; +class RGWSI_Topic_RADOS; struct RGWServices_Def { @@ -109,6 +110,7 @@ struct RGWServices_Def std::unique_ptr role_rados; std::unique_ptr async_processor; + std::unique_ptr topic_rados; RGWServices_Def(); ~RGWServices_Def(); @@ -153,6 +155,7 @@ struct RGWServices RGWSI_User *user{nullptr}; RGWSI_Role_RADOS *role{nullptr}; RGWAsyncRadosProcessor* async_processor; + RGWSI_Topic_RADOS* topic{nullptr}; int do_init(CephContext *cct, rgw::sal::RadosStore* store, bool have_cache, bool raw_storage, bool run_sync, optional_yield y, @@ -187,6 +190,7 @@ struct RGWCtlDef { std::unique_ptr user; std::unique_ptr otp; std::unique_ptr role; + std::unique_ptr topic; _meta(); ~_meta(); @@ -216,6 +220,7 @@ struct RGWCtl { RGWMetadataHandler *user{nullptr}; RGWMetadataHandler *otp{nullptr}; RGWMetadataHandler *role{nullptr}; + RGWMetadataHandler* topic{nullptr}; } meta; RGWUserCtl *user{nullptr}; diff --git a/src/rgw/driver/rados/rgw_sync.cc b/src/rgw/driver/rados/rgw_sync.cc index c5ea9f99ec5..a9ea2ecf549 100644 --- a/src/rgw/driver/rados/rgw_sync.cc +++ b/src/rgw/driver/rados/rgw_sync.cc @@ -886,6 +886,7 @@ public: append_section_from_set(all_sections, "bucket.instance"); append_section_from_set(all_sections, "bucket"); append_section_from_set(all_sections, "roles"); + append_section_from_set(all_sections, "topic"); std::move(all_sections.begin(), all_sections.end(), std::back_inserter(sections)); diff --git a/src/rgw/driver/rados/rgw_zone.h b/src/rgw/driver/rados/rgw_zone.h index f0dccdbc4e9..2eb3e725253 100644 --- a/src/rgw/driver/rados/rgw_zone.h +++ b/src/rgw/driver/rados/rgw_zone.h @@ -114,6 +114,7 @@ struct RGWZoneParams : RGWSystemMetaObj { rgw_pool otp_pool; rgw_pool oidc_pool; rgw_pool notif_pool; + rgw_pool topics_pool; RGWAccessKey system_key; @@ -150,7 +151,7 @@ struct RGWZoneParams : RGWSystemMetaObj { const std::string& get_compression_type(const rgw_placement_rule& placement_rule) const; void encode(bufferlist& bl) const override { - ENCODE_START(14, 1, bl); + ENCODE_START(15, 1, bl); encode(domain_root, bl); encode(control_pool, bl); encode(gc_pool, bl); @@ -176,11 +177,12 @@ struct RGWZoneParams : RGWSystemMetaObj { encode(tier_config, bl); encode(oidc_pool, bl); encode(notif_pool, bl); + encode(topics_pool, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) override { - DECODE_START(14, bl); + DECODE_START(15, bl); decode(domain_root, bl); decode(control_pool, bl); decode(gc_pool, bl); @@ -249,6 +251,11 @@ struct RGWZoneParams : RGWSystemMetaObj { } else { notif_pool = log_pool.name + ":notif"; } + if (struct_v >= 15) { + decode(topics_pool, bl); + } else { + topics_pool = name + ".rgw.meta:topics"; + } DECODE_FINISH(bl); } void dump(Formatter *f) const; diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 0be8dacf862..ced7b15120d 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -10634,10 +10634,15 @@ next: } if (opt_cmd == OPT::PUBSUB_TOPIC_LIST) { - RGWPubSub ps(driver, tenant); - + auto site = std::make_unique(); + ret = site->load(dpp(), null_yield, cfgstore.get()); + if (ret < 0) { + std::cerr << "Unable to initialize site config." << std::endl; + exit(1); + } + RGWPubSub ps(driver, tenant, &site->get_period()->get_map().zonegroups); rgw_pubsub_topics result; - int ret = ps.get_topics(dpp(), result, null_yield); + ret = ps.get_topics(dpp(), result, null_yield); if (ret < 0 && ret != -ENOENT) { cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl; return -ret; @@ -10661,8 +10666,13 @@ next: cerr << "ERROR: topic name was not provided (via --topic)" << std::endl; return EINVAL; } - - RGWPubSub ps(driver, tenant); + auto site = std::make_unique(); + ret = site->load(dpp(), null_yield, cfgstore.get()); + if (ret < 0) { + std::cerr << "Unable to initialize site config." << std::endl; + exit(1); + } + RGWPubSub ps(driver, tenant, &site->get_period()->get_map().zonegroups); rgw_pubsub_topic topic; ret = ps.get_topic(dpp(), topic_name, topic, null_yield); @@ -10723,7 +10733,13 @@ next: return -ret; } - RGWPubSub ps(driver, tenant); + auto site = std::make_unique(); + ret = site->load(dpp(), null_yield, cfgstore.get()); + if (ret < 0) { + std::cerr << "Unable to initialize site config." << std::endl; + exit(1); + } + RGWPubSub ps(driver, tenant, &site->get_period()->get_map().zonegroups); ret = ps.remove_topic(dpp(), topic_name, null_yield); if (ret < 0) { diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index 7031c2363f0..c563b959863 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -9,6 +9,7 @@ #include "rgw_xml.h" #include "rgw_arn.h" #include "rgw_pubsub_push.h" +#include "common/errno.h" #include #include @@ -371,6 +372,15 @@ void rgw_pubsub_topic::dump_xml_as_attributes(Formatter *f) const f->close_section(); // Attributes } +void rgw_pubsub_topic::decode_json(JSONObj* f) { + JSONDecoder::decode_json("user", user, f); + JSONDecoder::decode_json("name", name, f); + JSONDecoder::decode_json("dest", dest, f); + JSONDecoder::decode_json("arn", arn, f); + JSONDecoder::decode_json("opaqueData", opaque_data, f); + JSONDecoder::decode_json("policy", policy_text, f); +} + void encode_json(const char *name, const rgw::notify::EventTypeList& l, Formatter *f) { f->open_array_section(name); @@ -462,13 +472,74 @@ std::string rgw_pubsub_dest::to_json_str() const return ss.str(); } +void rgw_pubsub_dest::decode_json(JSONObj* f) { + using rgw::notify::DEFAULT_CONFIG; + using rgw::notify::DEFAULT_GLOBAL_VALUE; + JSONDecoder::decode_json("push_endpoint", push_endpoint, f); + JSONDecoder::decode_json("push_endpoint_args", push_endpoint_args, f); + JSONDecoder::decode_json("push_endpoint_topic", arn_topic, f); + JSONDecoder::decode_json("stored_secret", stored_secret, f); + JSONDecoder::decode_json("persistent", persistent, f); + std::string ttl; + JSONDecoder::decode_json("time_to_live", ttl, f); + time_to_live = ttl == DEFAULT_CONFIG ? DEFAULT_GLOBAL_VALUE : std::stoul(ttl); + + std::string max_retry; + JSONDecoder::decode_json("max_retries", max_retry, f); + max_retries = max_retry == DEFAULT_CONFIG ? DEFAULT_GLOBAL_VALUE + : std::stoul(max_retry); + + std::string sleep_dur; + JSONDecoder::decode_json("retry_sleep_duration", sleep_dur, f); + retry_sleep_duration = sleep_dur == DEFAULT_CONFIG ? DEFAULT_GLOBAL_VALUE + : std::stoul(sleep_dur); +} + RGWPubSub::RGWPubSub(rgw::sal::Driver* _driver, const std::string& _tenant) : driver(_driver), tenant(_tenant) {} -int RGWPubSub::read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result, +RGWPubSub::RGWPubSub(rgw::sal::Driver* _driver, + const std::string& _tenant, + const std::map* _zonegroups) + : driver(_driver), tenant(_tenant), zonegroups(_zonegroups) { + use_notification_v2 = do_all_zonegroups_support_notification_v2(*zonegroups); +} + +int RGWPubSub::read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result, RGWObjVersionTracker *objv_tracker, optional_yield y) const { + if (use_notification_v2) { + void* handle = NULL; + auto ret = + driver->meta_list_keys_init(dpp, "topic", std::string(), &handle); + if (ret < 0) { + return ret; + } + bool truncated; + int max = 1000; + do { + std::list topics; + ret = driver->meta_list_keys_next(dpp, handle, max, topics, &truncated); + if (ret < 0) { + ldpp_dout(dpp, 1) + << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl; + break; + } + for (auto& topic_name : topics) { + rgw_pubsub_topic topic; + int ret = get_topic(dpp, topic_name, topic, y); + if (ret < 0) { + ldpp_dout(dpp, 1) << "ERROR: failed to read topic '" << topic_name + << "' info: ret=" << ret << dendl; + continue; + } + result.topics[topic_name] = std::move(topic); + } + } while (truncated); + driver->meta_list_keys_complete(handle); + return ret; + } const int ret = driver->read_topics(tenant, result, objv_tracker, y, dpp); if (ret < 0) { ldpp_dout(dpp, 10) << "WARNING: failed to read topics info: ret=" << ret << dendl; @@ -514,6 +585,14 @@ int RGWPubSub::Bucket::write_topics(const DoutPrefixProvider *dpp, const rgw_pub int RGWPubSub::get_topic(const DoutPrefixProvider *dpp, const std::string& name, rgw_pubsub_topic& result, optional_yield y) const { + if (use_notification_v2) { + const int ret = driver->read_topic(name, tenant, result, nullptr, y, dpp); + if (ret < 0) { + ldpp_dout(dpp, 1) << "failed to read topic info for name: " << name + << " tenant: " << tenant << ", ret=" << ret << dendl; + } + return ret; + } rgw_pubsub_topics topics; const int ret = read_topics(dpp, topics, nullptr, y); if (ret < 0) { @@ -563,6 +642,15 @@ int RGWPubSub::Bucket::get_notification_by_id(const DoutPrefixProvider *dpp, con int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, const rgw::notify::EventTypeList& events, optional_yield y) const { return create_notification(dpp, topic_name, events, std::nullopt, "", y); + } +bool do_all_zonegroups_support_notification_v2( + std::map zonegroups) { + for (const auto& [_, zonegroup] : zonegroups) { + if (!zonegroup.supports(rgw::zone_features::notification_v2)) { + return false; + } + } + return true; } int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, @@ -694,6 +782,19 @@ int RGWPubSub::Bucket::remove_notifications(const DoutPrefixProvider *dpp, optio return 0; } +int RGWPubSub::create_topic(const DoutPrefixProvider* dpp, + const rgw_pubsub_topic& topic, + optional_yield y) const { + RGWObjVersionTracker objv_tracker; + const auto ret = driver->write_topic(topic, &objv_tracker, y, dpp); + if (ret < 0) { + ldpp_dout(dpp, 1) << "ERROR: failed to write topic info: ret=" << ret + << dendl; + } + + return ret; +} + int RGWPubSub::create_topic(const DoutPrefixProvider* dpp, const std::string& name, const rgw_pubsub_dest& dest, const std::string& arn, @@ -701,6 +802,16 @@ int RGWPubSub::create_topic(const DoutPrefixProvider* dpp, const rgw_user& user, const std::string& policy_text, optional_yield y) const { + if (use_notification_v2) { + rgw_pubsub_topic new_topic; + new_topic.user = user; + new_topic.name = name; + new_topic.dest = dest; + new_topic.arn = arn; + new_topic.opaque_data = opaque_data; + new_topic.policy_text = policy_text; + return create_topic(dpp, new_topic, y); + } RGWObjVersionTracker objv_tracker; rgw_pubsub_topics topics; @@ -728,8 +839,34 @@ int RGWPubSub::create_topic(const DoutPrefixProvider* dpp, return 0; } +int RGWPubSub::remove_topic_v2(const DoutPrefixProvider* dpp, + const std::string& name, + optional_yield y) const { + RGWObjVersionTracker objv_tracker; + rgw_pubsub_topic topic; + int ret = get_topic(dpp, name, topic, y); + if (ret < 0 && ret != -ENOENT) { + return ret; + } else if (ret == -ENOENT) { + // its not an error if no topics exist, just a no-op + ldpp_dout(dpp, 10) << "WARNING: topic name:" << name + << " does not exist, deletion is a no-op: ret=" << ret + << dendl; + return 0; + } + ret = driver->remove_topic(name, tenant, &objv_tracker, y, dpp); + if (ret < 0) { + ldpp_dout(dpp, 1) << "ERROR: failed to remove topic info: ret=" << ret + << dendl; + } + return ret; +} + int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const { + if (use_notification_v2) { + return remove_topic_v2(dpp, name, y); + } RGWObjVersionTracker objv_tracker; rgw_pubsub_topics topics; @@ -753,4 +890,3 @@ int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const std::string& na return 0; } - diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h index ddc72f99b07..4afc101c63c 100644 --- a/src/rgw/rgw_pubsub.h +++ b/src/rgw/rgw_pubsub.h @@ -389,6 +389,7 @@ struct rgw_pubsub_dest { void dump(Formatter *f) const; void dump_xml(Formatter *f) const; std::string to_json_str() const; + void decode_json(JSONObj* obj); }; WRITE_CLASS_ENCODER(rgw_pubsub_dest) @@ -435,6 +436,7 @@ struct rgw_pubsub_topic { void dump(Formatter *f) const; void dump_xml(Formatter *f) const; void dump_xml_as_attributes(Formatter *f) const; + void decode_json(JSONObj* obj); bool operator<(const rgw_pubsub_topic& t) const { return to_str().compare(t.to_str()); @@ -558,6 +560,8 @@ class RGWPubSub rgw::sal::Driver* const driver; const std::string tenant; + const std::map* zonegroups = nullptr; + bool use_notification_v2 = false; int read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result, RGWObjVersionTracker* objv_tracker, optional_yield y) const; @@ -567,6 +571,10 @@ class RGWPubSub public: RGWPubSub(rgw::sal::Driver* _driver, const std::string& tenant); + RGWPubSub(rgw::sal::Driver* _driver, + const std::string& _tenant, + const std::map* zonegroups); + class Bucket { friend class RGWPubSub; const RGWPubSub& ps; @@ -639,6 +647,18 @@ public: // if the topic does not exists it is a no-op (considered success) // return 0 on success, error code otherwise int remove_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const; + // remove a topic according to its name + // if the topic does not exists it is a no-op (considered success) + // return 0 on success, error code otherwise + int remove_topic_v2(const DoutPrefixProvider* dpp, + const std::string& name, + optional_yield y) const; + // create a topic with a name only + // if the topic already exists it is a no-op (considered success) + // return 0 on success, error code otherwise + int create_topic(const DoutPrefixProvider* dpp, + const rgw_pubsub_topic& topic, + optional_yield y) const; }; namespace rgw::notify { @@ -648,4 +668,7 @@ namespace rgw::notify { constexpr uint32_t DEFAULT_GLOBAL_VALUE = UINT32_MAX; // Used in case the topic is using the default global value for dumping in a formatter constexpr static const std::string_view DEFAULT_CONFIG{"None"}; -} \ No newline at end of file +} + +bool do_all_zonegroups_support_notification_v2( + std::map zonegroups); diff --git a/src/rgw/rgw_rest_pubsub.cc b/src/rgw/rgw_rest_pubsub.cc index 191f535d82b..7396fcfddd4 100644 --- a/src/rgw/rgw_rest_pubsub.cc +++ b/src/rgw/rgw_rest_pubsub.cc @@ -17,6 +17,7 @@ #include "services/svc_zone.h" #include "common/dout.h" #include "rgw_url.h" +#include "rgw_process_env.h" #define dout_context g_ceph_context #define dout_subsys ceph_subsys_rgw @@ -138,21 +139,13 @@ class RGWPSCreateTopicOp : public RGWOp { return -EINVAL; } - // Remove the args that are parsed, so the push_endpoint_args only contains - // necessary one's. opaque_data = s->info.args.get("OpaqueData"); - s->info.args.remove("OpaqueData"); dest.push_endpoint = s->info.args.get("push-endpoint"); - s->info.args.remove("push-endpoint"); s->info.args.get_bool("persistent", &dest.persistent, false); - s->info.args.remove("persistent"); s->info.args.get_int("time_to_live", reinterpret_cast(&dest.time_to_live), rgw::notify::DEFAULT_GLOBAL_VALUE); - s->info.args.remove("time_to_live"); s->info.args.get_int("max_retries", reinterpret_cast(&dest.max_retries), rgw::notify::DEFAULT_GLOBAL_VALUE); - s->info.args.remove("max_retries"); s->info.args.get_int("retry_sleep_duration", reinterpret_cast(&dest.retry_sleep_duration), rgw::notify::DEFAULT_GLOBAL_VALUE); - s->info.args.remove("retry_sleep_duration"); if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) { return -EINVAL; @@ -162,8 +155,19 @@ class RGWPSCreateTopicOp : public RGWOp { if (!policy_text.empty() && !get_policy_from_text(s, policy_text)) { return -ERR_MALFORMED_DOC; } - s->info.args.remove("Policy"); + // Remove the args that are parsed, so the push_endpoint_args only contains + // necessary one's which is parsed after this if. but only if master zone, + // else we do not remove as request is forwarded to master. + if (driver->is_meta_master()) { + s->info.args.remove("OpaqueData"); + s->info.args.remove("push-endpoint"); + s->info.args.remove("persistent"); + s->info.args.remove("time_to_live"); + s->info.args.remove("max_retries"); + s->info.args.remove("retry_sleep_duration"); + s->info.args.remove("Policy"); + } for (const auto& param : s->info.args.get_params()) { if (param.first == "Action" || param.first == "Name" || param.first == "PayloadHash") { continue; @@ -193,7 +197,8 @@ class RGWPSCreateTopicOp : public RGWOp { return ret; } - const RGWPubSub ps(driver, s->owner.id.tenant); + const RGWPubSub ps(driver, s->owner.id.tenant, + &s->penv.site->get_period()->get_map().zonegroups); rgw_pubsub_topic result; ret = ps.get_topic(this, topic_name, result, y); if (ret == -ENOENT) { @@ -252,6 +257,18 @@ class RGWPSCreateTopicOp : public RGWOp { }; void RGWPSCreateTopicOp::execute(optional_yield y) { + // master request will replicate the topic creation. + bufferlist indata; + if (!driver->is_meta_master()) { + op_ret = rgw_forward_request_to_master( + this, *s->penv.site, s->user->get_id(), &indata, nullptr, s->info, y); + if (op_ret < 0) { + ldpp_dout(this, 1) + << "CreateTopic forward_request_to_master returned ret = " << op_ret + << dendl; + return; + } + } if (!dest.push_endpoint.empty() && dest.persistent) { op_ret = rgw::notify::add_persistent_topic(topic_name, s->yield); if (op_ret < 0) { @@ -261,7 +278,8 @@ void RGWPSCreateTopicOp::execute(optional_yield y) { return; } } - const RGWPubSub ps(driver, s->owner.id.tenant); + const RGWPubSub ps(driver, s->owner.id.tenant, + &s->penv.site->get_period()->get_map().zonegroups); op_ret = ps.create_topic(this, topic_name, dest, topic_arn, opaque_data, s->owner.id, policy_text, y); if (op_ret < 0) { @@ -316,7 +334,8 @@ public: }; void RGWPSListTopicsOp::execute(optional_yield y) { - const RGWPubSub ps(driver, s->owner.id.tenant); + const RGWPubSub ps(driver, s->owner.id.tenant, + &s->penv.site->get_period()->get_map().zonegroups); op_ret = ps.get_topics(this, result, y); // if there are no topics it is not considered an error op_ret = op_ret == -ENOENT ? 0 : op_ret; @@ -403,7 +422,8 @@ void RGWPSGetTopicOp::execute(optional_yield y) { if (op_ret < 0) { return; } - const RGWPubSub ps(driver, s->owner.id.tenant); + const RGWPubSub ps(driver, s->owner.id.tenant, + &s->penv.site->get_period()->get_map().zonegroups); op_ret = ps.get_topic(this, topic_name, result, y); if (op_ret < 0) { ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl; @@ -487,7 +507,8 @@ void RGWPSGetTopicAttributesOp::execute(optional_yield y) { if (op_ret < 0) { return; } - const RGWPubSub ps(driver, s->owner.id.tenant); + const RGWPubSub ps(driver, s->owner.id.tenant, + &s->penv.site->get_period()->get_map().zonegroups); op_ret = ps.get_topic(this, topic_name, result, y); if (op_ret < 0) { ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl; @@ -615,7 +636,8 @@ class RGWPSSetTopicAttributesOp : public RGWOp { return ret; } rgw_pubsub_topic result; - const RGWPubSub ps(driver, s->owner.id.tenant); + const RGWPubSub ps(driver, s->owner.id.tenant, + &s->penv.site->get_period()->get_map().zonegroups); ret = ps.get_topic(this, topic_name, result, y); if (ret < 0) { ldpp_dout(this, 1) << "failed to get topic '" << topic_name @@ -664,6 +686,17 @@ class RGWPSSetTopicAttributesOp : public RGWOp { }; void RGWPSSetTopicAttributesOp::execute(optional_yield y) { + if (!driver->is_meta_master()) { + bufferlist indata; + op_ret = rgw_forward_request_to_master( + this, *s->penv.site, s->user->get_id(), &indata, nullptr, s->info, y); + if (op_ret < 0) { + ldpp_dout(this, 1) + << "SetTopicAttributes forward_request_to_master returned ret = " + << op_ret << dendl; + return; + } + } if (!dest.push_endpoint.empty() && dest.persistent) { op_ret = rgw::notify::add_persistent_topic(topic_name, s->yield); if (op_ret < 0) { @@ -682,7 +715,8 @@ void RGWPSSetTopicAttributesOp::execute(optional_yield y) { return; } } - const RGWPubSub ps(driver, s->owner.id.tenant); + const RGWPubSub ps(driver, s->owner.id.tenant, + &s->penv.site->get_period()->get_map().zonegroups); op_ret = ps.create_topic(this, topic_name, dest, topic_arn, opaque_data, topic_owner, policy_text, y); if (op_ret < 0) { @@ -752,7 +786,20 @@ void RGWPSDeleteTopicOp::execute(optional_yield y) { if (op_ret < 0) { return; } - const RGWPubSub ps(driver, s->owner.id.tenant); + if (!driver->is_meta_master()) { + bufferlist indata; + op_ret = rgw_forward_request_to_master( + this, *s->penv.site, s->user->get_id(), &indata, nullptr, s->info, y); + if (op_ret < 0) { + ldpp_dout(this, 1) + << "DeleteTopic forward_request_to_master returned ret = " << op_ret + << dendl; + return; + } + } + const RGWPubSub ps(driver, s->owner.id.tenant, + &s->penv.site->get_period()->get_map().zonegroups); + rgw_pubsub_topic result; op_ret = ps.get_topic(this, topic_name, result, y); if (op_ret == 0) { @@ -809,6 +856,13 @@ bool RGWHandler_REST_PSTopic_AWS::action_exists(const req_state* s) } return false; } +bool RGWHandler_REST_PSTopic_AWS::action_exists(const req_info& info) { + if (info.args.exists("Action")) { + const std::string action_name = info.args.get("Action"); + return op_generators.contains(action_name); + } + return false; +} RGWOp *RGWHandler_REST_PSTopic_AWS::op_post() { diff --git a/src/rgw/rgw_rest_s3.cc b/src/rgw/rgw_rest_s3.cc index bb43029f776..186fb109c2f 100644 --- a/src/rgw/rgw_rest_s3.cc +++ b/src/rgw/rgw_rest_s3.cc @@ -5206,8 +5206,12 @@ void parse_post_action(const std::string& post_body, req_state* s) } } } - const auto payload_hash = rgw::auth::s3::calc_v4_payload_hash(post_body); - s->info.args.append("PayloadHash", payload_hash); + // PayloadHash is present if request is fwd from secondary site in multisite + // environment, so then do not calculate and append. + if (!s->info.args.exists("PayloadHash")) { + const auto payload_hash = rgw::auth::s3::calc_v4_payload_hash(post_body); + s->info.args.append("PayloadHash", payload_hash); + } } RGWHandler_REST* RGWRESTMgr_S3::get_handler(rgw::sal::Driver* driver, @@ -5623,7 +5627,9 @@ AWSSignerV4::prepare(const DoutPrefixProvider *dpp, const char* exp_payload_hash = nullptr; string payload_hash; - if (is_non_s3_op) { + // if the request is related to topics (bucket notification), they are part of + // sns service and hence it's a no_s3_op, + if (is_non_s3_op || RGWHandler_REST_PSTopic_AWS::action_exists(info)) { //For non s3 ops, we need to calculate the payload hash payload_hash = info.args.get("PayloadHash"); exp_payload_hash = payload_hash.c_str(); diff --git a/src/rgw/rgw_sal.h b/src/rgw/rgw_sal.h index eb4d0c348fa..89ac23341d2 100644 --- a/src/rgw/rgw_sal.h +++ b/src/rgw/rgw_sal.h @@ -44,7 +44,7 @@ class RGWCompressionInfo; struct rgw_pubsub_topics; struct rgw_pubsub_bucket_topics; class RGWZonePlacementInfo; - +struct rgw_pubsub_topic; using RGWBucketListNameFilter = std::function; @@ -314,6 +314,24 @@ class Driver { /** Remove the topic config, optionally a specific version */ virtual int remove_topics(const std::string& tenant, RGWObjVersionTracker* objv_tracker, optional_yield y,const DoutPrefixProvider *dpp) = 0; + /** Read the topic config entry into data and (optionally) objv_tracker */ + virtual int read_topic_v2(const std::string& topic_name, + const std::string& tenant, + rgw_pubsub_topic& topic, + RGWObjVersionTracker* objv_tracker, + optional_yield y, + const DoutPrefixProvider* dpp) = 0; + /** Write topic info and (optionally) @a objv_tracker into the config */ + virtual int write_topic_v2(const rgw_pubsub_topic& topic, + RGWObjVersionTracker* objv_tracker, + optional_yield y, + const DoutPrefixProvider* dpp) = 0; + /** Remove the topic config, optionally a specific version */ + virtual int remove_topic_v2(const std::string& topic_name, + const std::string& tenant, + RGWObjVersionTracker* objv_tracker, + optional_yield y, + const DoutPrefixProvider* dpp) = 0; /** Get access to the lifecycle management thread */ virtual RGWLC* get_rgwlc(void) = 0; /** Get access to the coroutine registry. Used to create new coroutine managers */ @@ -1449,6 +1467,8 @@ public: virtual int list_zones(std::list& zone_ids) = 0; /** Clone a copy of this zonegroup. */ virtual std::unique_ptr clone() = 0; + /** Determine if zonegroup |feature| is supported.*/ + virtual bool supports_feature(std::string_view feature) const = 0; }; /** diff --git a/src/rgw/rgw_sal_dbstore.h b/src/rgw/rgw_sal_dbstore.h index 3c0c7c76519..140b2839662 100644 --- a/src/rgw/rgw_sal_dbstore.h +++ b/src/rgw/rgw_sal_dbstore.h @@ -259,6 +259,9 @@ protected: std::unique_ptrzg = std::make_unique(*group.get()); return std::make_unique(store, std::move(zg)); } + virtual bool supports_feature(std::string_view feature) const override { + return group->supports(feature); + } }; class DBZone : public StoreZone { diff --git a/src/rgw/rgw_sal_filter.h b/src/rgw/rgw_sal_filter.h index b5c4c4dfc68..71991378e32 100644 --- a/src/rgw/rgw_sal_filter.h +++ b/src/rgw/rgw_sal_filter.h @@ -75,6 +75,9 @@ public: std::unique_ptr nzg = next->clone(); return std::make_unique(std::move(nzg)); } + virtual bool supports_feature(std::string_view feature) const override { + return next->supports_feature(feature); + } }; class FilterZone : public Zone { @@ -194,6 +197,27 @@ public: optional_yield y, const DoutPrefixProvider *dpp) override { return next->remove_topics(tenant, objv_tracker, y, dpp); } + int read_topic_v2(const std::string& topic_name, + const std::string& tenant, + rgw_pubsub_topic& topic, + RGWObjVersionTracker* objv_tracker, + optional_yield y, + const DoutPrefixProvider* dpp) override { + return next->read_topic_v2(topic_name, tenant, topic, objv_tracker, y, dpp); + } + int write_topic_v2(const rgw_pubsub_topic& topic, + RGWObjVersionTracker* objv_tracker, + optional_yield y, + const DoutPrefixProvider* dpp) override { + return next->write_topic_v2(topic, objv_tracker, y, dpp); + } + int remove_topic_v2(const std::string& topic_name, + const std::string& tenant, + RGWObjVersionTracker* objv_tracker, + optional_yield y, + const DoutPrefixProvider* dpp) override { + return next->remove_topic_v2(topic_name, tenant, objv_tracker, y, dpp); + } virtual RGWLC* get_rgwlc(void) override; virtual RGWCoroutinesManagerRegistry* get_cr_registry() override; diff --git a/src/rgw/rgw_sal_store.h b/src/rgw/rgw_sal_store.h index 7c35258dd5d..eda0f08ede6 100644 --- a/src/rgw/rgw_sal_store.h +++ b/src/rgw/rgw_sal_store.h @@ -34,6 +34,27 @@ class StoreDriver : public Driver { optional_yield y, const DoutPrefixProvider *dpp) override {return -ENOENT;} int remove_topics(const std::string& tenant, RGWObjVersionTracker* objv_tracker, optional_yield y, const DoutPrefixProvider *dpp) override {return -ENOENT;} + int read_topic_v2(const std::string& topic_name, + const std::string& tenant, + rgw_pubsub_topic& topic, + RGWObjVersionTracker* objv_tracker, + optional_yield y, + const DoutPrefixProvider* dpp) override { + return -EOPNOTSUPP; + } + int write_topic_v2(const rgw_pubsub_topic& topic, + RGWObjVersionTracker* objv_tracker, + optional_yield y, + const DoutPrefixProvider* dpp) override { + return -ENOENT; + } + int remove_topic_v2(const std::string& topic_name, + const std::string& tenant, + RGWObjVersionTracker* objv_tracker, + optional_yield y, + const DoutPrefixProvider* dpp) override { + return -ENOENT; + } }; class StoreUser : public User { diff --git a/src/rgw/rgw_zone.cc b/src/rgw/rgw_zone.cc index aeb58e2f48f..ed438dead93 100644 --- a/src/rgw/rgw_zone.cc +++ b/src/rgw/rgw_zone.cc @@ -296,12 +296,12 @@ void RGWZoneParams::decode_json(JSONObj *obj) JSONDecoder::decode_json("user_swift_pool", user_swift_pool, obj); JSONDecoder::decode_json("user_uid_pool", user_uid_pool, obj); JSONDecoder::decode_json("otp_pool", otp_pool, obj); + JSONDecoder::decode_json("notif_pool", notif_pool, obj); + JSONDecoder::decode_json("topics_pool", topics_pool, obj); JSONDecoder::decode_json("system_key", system_key, obj); JSONDecoder::decode_json("placement_pools", placement_pools, obj); JSONDecoder::decode_json("tier_config", tier_config, obj); JSONDecoder::decode_json("realm_id", realm_id, obj); - JSONDecoder::decode_json("notif_pool", notif_pool, obj); - } void RGWZoneParams::dump(Formatter *f) const @@ -321,11 +321,12 @@ void RGWZoneParams::dump(Formatter *f) const encode_json("user_swift_pool", user_swift_pool, f); encode_json("user_uid_pool", user_uid_pool, f); encode_json("otp_pool", otp_pool, f); + encode_json("notif_pool", notif_pool, f); + encode_json("topics_pool", topics_pool, f); encode_json_plain("system_key", system_key, f); encode_json("placement_pools", placement_pools, f); encode_json("tier_config", tier_config, f); encode_json("realm_id", realm_id, f); - encode_json("notif_pool", notif_pool, f); } int RGWZoneParams::init(const DoutPrefixProvider *dpp, @@ -480,6 +481,7 @@ void add_zone_pools(const RGWZoneParams& info, pools.insert(info.reshard_pool); pools.insert(info.oidc_pool); pools.insert(info.notif_pool); + pools.insert(info.topics_pool); for (const auto& [pname, placement] : info.placement_pools) { pools.insert(placement.index_pool); @@ -584,6 +586,7 @@ int RGWZoneParams::fix_pool_names(const DoutPrefixProvider *dpp, optional_yield otp_pool = fix_zone_pool_dup(pools, name, ".rgw.otp", otp_pool); oidc_pool = fix_zone_pool_dup(pools, name, ".rgw.meta:oidc", oidc_pool); notif_pool = fix_zone_pool_dup(pools, name ,".rgw.log:notif", notif_pool); + topics_pool = fix_zone_pool_dup(pools, name, ".rgw.meta:topics", topics_pool); for(auto& iter : placement_pools) { iter.second.index_pool = fix_zone_pool_dup(pools, name, "." + default_bucket_index_pool_suffix, @@ -1245,6 +1248,8 @@ int init_zone_pool_names(const DoutPrefixProvider *dpp, optional_yield y, info.otp_pool = fix_zone_pool_dup(pools, info.name, ".rgw.otp", info.otp_pool); info.oidc_pool = fix_zone_pool_dup(pools, info.name, ".rgw.meta:oidc", info.oidc_pool); info.notif_pool = fix_zone_pool_dup(pools, info.name, ".rgw.log:notif", info.notif_pool); + info.topics_pool = + fix_zone_pool_dup(pools, info.name, ".rgw.meta:topics", info.topics_pool); for (auto& [pname, placement] : info.placement_pools) { placement.index_pool = fix_zone_pool_dup(pools, info.name, "." + default_bucket_index_pool_suffix, placement.index_pool); diff --git a/src/rgw/rgw_zone_features.h b/src/rgw/rgw_zone_features.h index 5e1a435d488..600460735a8 100644 --- a/src/rgw/rgw_zone_features.h +++ b/src/rgw/rgw_zone_features.h @@ -15,11 +15,13 @@ namespace rgw::zone_features { // zone feature names inline constexpr std::string_view resharding = "resharding"; inline constexpr std::string_view compress_encrypted = "compress-encrypted"; +inline constexpr std::string_view notification_v2 = "notification_v2"; // static list of features supported by this release inline constexpr std::initializer_list supported = { - resharding, - compress_encrypted, + resharding, + compress_encrypted, + notification_v2, }; inline constexpr bool supports(std::string_view feature) { @@ -33,7 +35,8 @@ inline constexpr bool supports(std::string_view feature) { // static list of features enabled by default on new zonegroups inline constexpr std::initializer_list enabled = { - resharding, + resharding, + notification_v2, }; diff --git a/src/rgw/services/svc_topic_rados.cc b/src/rgw/services/svc_topic_rados.cc new file mode 100644 index 00000000000..64c9106776c --- /dev/null +++ b/src/rgw/services/svc_topic_rados.cc @@ -0,0 +1,207 @@ +#include "svc_topic_rados.h" +#include "rgw_notify.h" +#include "rgw_tools.h" +#include "rgw_zone.h" +#include "svc_meta.h" +#include "svc_meta_be_sobj.h" +#include "svc_zone.h" + +#define dout_subsys ceph_subsys_rgw + +static std::string topic_oid_prefix = "topic."; +static constexpr char topic_tenant_delim[] = ":"; + +std::string get_topic_key(const std::string& topic_name, + const std::string& tenant) { + if (tenant.empty()) { + return topic_name; + } + return tenant + topic_tenant_delim + topic_name; +} + +void parse_topic_entry(const std::string& topic_entry, + std::string* tenant_name, + std::string* topic_name) { + // expected format: [tenant_name:]topic_name* + auto pos = topic_entry.find(topic_tenant_delim); + if (pos != std::string::npos) { + *tenant_name = topic_entry.substr(0, pos); + *topic_name = topic_entry.substr(pos + 1); + } else { + tenant_name->clear(); + *topic_name = topic_entry; + } +} +class RGWSI_Topic_Module : public RGWSI_MBSObj_Handler_Module { + RGWSI_Topic_RADOS::Svc& svc; + const std::string prefix; + + public: + RGWSI_Topic_Module(RGWSI_Topic_RADOS::Svc& _svc) + : RGWSI_MBSObj_Handler_Module("topic"), + svc(_svc), + prefix(topic_oid_prefix) {} + + void get_pool_and_oid(const std::string& key, + rgw_pool* pool, + std::string* oid) override { + if (pool) { + *pool = svc.zone->get_zone_params().topics_pool; + } + + if (oid) { + *oid = key_to_oid(key); + } + } + + bool is_valid_oid(const std::string& oid) override { + return boost::algorithm::starts_with(oid, prefix); + } + + std::string key_to_oid(const std::string& key) override { + return prefix + key; + } + + // This is called after `is_valid_oid` and is assumed to be a valid oid + std::string oid_to_key(const std::string& oid) override { + return oid.substr(prefix.size()); + } + + const std::string& get_oid_prefix() { return prefix; } +}; + +RGWSI_MetaBackend_Handler* RGWSI_Topic_RADOS::get_be_handler() { + return be_handler; +} + +void RGWSI_Topic_RADOS::init(RGWSI_Zone* _zone_svc, + RGWSI_Meta* _meta_svc, + RGWSI_MetaBackend* _meta_be_svc, + RGWSI_SysObj* _sysobj_svc) { + svc.zone = _zone_svc; + svc.meta = _meta_svc; + svc.meta_be = _meta_be_svc; + svc.sysobj = _sysobj_svc; +} + +int RGWSI_Topic_RADOS::do_start(optional_yield y, + const DoutPrefixProvider* dpp) { + int r = svc.meta->create_be_handler(RGWSI_MetaBackend::Type::MDBE_SOBJ, + &be_handler); + if (r < 0) { + ldout(ctx(), 0) << "ERROR: failed to create be_handler for Topics: r=" << r + << dendl; + return r; + } + + auto module = new RGWSI_Topic_Module(svc); + RGWSI_MetaBackend_Handler_SObj* bh = + static_cast(be_handler); + be_module.reset(module); + bh->set_module(module); + return 0; +} + +RGWTopicMetadataHandler::RGWTopicMetadataHandler(rgw::sal::Driver* driver, + RGWSI_Topic_RADOS* topic_svc) { + this->driver = driver; + this->topic_svc = topic_svc; + base_init(topic_svc->ctx(), topic_svc->get_be_handler()); +} + +RGWMetadataObject* RGWTopicMetadataHandler::get_meta_obj( + JSONObj* jo, const obj_version& objv, const ceph::real_time& mtime) { + rgw_pubsub_topic topic; + try { + topic.decode_json(jo); + } catch (JSONDecoder::err& e) { + return nullptr; + } + + return new RGWTopicMetadataObject(topic, objv, mtime, driver); +} + +int RGWTopicMetadataHandler::do_get(RGWSI_MetaBackend_Handler::Op* op, + std::string& entry, RGWMetadataObject** obj, + optional_yield y, + const DoutPrefixProvider* dpp) { + rgw_pubsub_topic result; + std::string topic_name; + std::string tenant; + parse_topic_entry(entry, &tenant, &topic_name); + RGWPubSub ps(driver, tenant, + &topic_svc->svc.zone->get_current_period().get_map().zonegroups); + int ret = ps.get_topic(dpp, topic_name, result, y); + if (ret < 0) { + return ret; + } + ceph::real_time mtime; + obj_version ver; + RGWTopicMetadataObject* rdo = + new RGWTopicMetadataObject(result, ver, mtime, driver); + *obj = rdo; + return 0; +} + +int RGWTopicMetadataHandler::do_remove(RGWSI_MetaBackend_Handler::Op* op, + std::string& entry, + RGWObjVersionTracker& objv_tracker, + optional_yield y, + const DoutPrefixProvider* dpp) { + auto ret = rgw::notify::remove_persistent_topic(entry, y); + if (ret != -ENOENT && ret < 0) { + return ret; + } + std::string topic_name; + std::string tenant; + parse_topic_entry(entry, &tenant, &topic_name); + RGWPubSub ps(driver, tenant, + &topic_svc->svc.zone->get_current_period().get_map().zonegroups); + return ps.remove_topic(dpp, topic_name, y); +} + +class RGWMetadataHandlerPut_Topic : public RGWMetadataHandlerPut_SObj { + RGWTopicMetadataHandler* rhandler; + RGWTopicMetadataObject* mdo; + + public: + RGWMetadataHandlerPut_Topic(RGWTopicMetadataHandler* handler, + RGWSI_MetaBackend_Handler::Op* op, + std::string& entry, RGWMetadataObject* obj, + RGWObjVersionTracker& objv_tracker, + optional_yield y, RGWMDLogSyncType type, + bool from_remote_zone) + : RGWMetadataHandlerPut_SObj(handler, op, entry, obj, objv_tracker, y, + type, from_remote_zone), + rhandler(handler) { + mdo = static_cast(obj); + } + + int put_checked(const DoutPrefixProvider* dpp) override { + auto& topic = mdo->get_topic_info(); + auto* driver = mdo->get_driver(); + auto ret = rgw::notify::add_persistent_topic(entry, y); + if (ret < 0) { + return ret; + } + RGWObjVersionTracker objv_tracker; + ret = driver->write_topic_v2(topic, &objv_tracker, y, dpp); + if (ret < 0) { + ldpp_dout(dpp, 1) << "ERROR: failed to write topic info: ret=" << ret + << dendl; + } + return ret; + } +}; + +int RGWTopicMetadataHandler::do_put(RGWSI_MetaBackend_Handler::Op* op, + std::string& entry, RGWMetadataObject* obj, + RGWObjVersionTracker& objv_tracker, + optional_yield y, + const DoutPrefixProvider* dpp, + RGWMDLogSyncType type, + bool from_remote_zone) { + RGWMetadataHandlerPut_Topic put_op(this, op, entry, obj, objv_tracker, y, + type, from_remote_zone); + return do_put_operate(&put_op, dpp); +} diff --git a/src/rgw/services/svc_topic_rados.h b/src/rgw/services/svc_topic_rados.h new file mode 100644 index 00000000000..e630a610e97 --- /dev/null +++ b/src/rgw/services/svc_topic_rados.h @@ -0,0 +1,96 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include "rgw_pubsub.h" +#include "rgw_service.h" +#include "svc_meta_be.h" + +class RGWSI_Topic_RADOS : public RGWServiceInstance { + public: + struct Svc { + RGWSI_Zone* zone{nullptr}; + RGWSI_Meta* meta{nullptr}; + RGWSI_MetaBackend* meta_be{nullptr}; + RGWSI_SysObj* sysobj{nullptr}; + } svc; + + RGWSI_Topic_RADOS(CephContext* cct) : RGWServiceInstance(cct) {} + ~RGWSI_Topic_RADOS() {} + + void init(RGWSI_Zone* _zone_svc, + RGWSI_Meta* _meta_svc, + RGWSI_MetaBackend* _meta_be_svc, + RGWSI_SysObj* _sysobj_svc); + + RGWSI_MetaBackend_Handler* get_be_handler(); + int do_start(optional_yield y, const DoutPrefixProvider* dpp) override; + + private: + RGWSI_MetaBackend_Handler* be_handler; + std::unique_ptr be_module; +}; + +class RGWTopicMetadataObject : public RGWMetadataObject { + rgw_pubsub_topic topic; + rgw::sal::Driver* driver; + + public: + RGWTopicMetadataObject() = default; + RGWTopicMetadataObject(rgw_pubsub_topic& topic, const obj_version& v, + real_time m, rgw::sal::Driver* driver) + : RGWMetadataObject(v, m), topic(topic), driver(driver) {} + + void dump(Formatter* f) const override { topic.dump(f); } + + rgw_pubsub_topic& get_topic_info() { return topic; } + + rgw::sal::Driver* get_driver() { return driver; } +}; +class RGWTopicMetadataHandler : public RGWMetadataHandler_GenericMetaBE { + public: + RGWTopicMetadataHandler(rgw::sal::Driver* driver, + RGWSI_Topic_RADOS* role_svc); + + std::string get_type() final { return "topic"; } + + RGWMetadataObject* get_meta_obj(JSONObj* jo, const obj_version& objv, + const ceph::real_time& mtime); + + int do_get(RGWSI_MetaBackend_Handler::Op* op, std::string& entry, + RGWMetadataObject** obj, optional_yield y, + const DoutPrefixProvider* dpp) final; + + int do_remove(RGWSI_MetaBackend_Handler::Op* op, std::string& entry, + RGWObjVersionTracker& objv_tracker, optional_yield y, + const DoutPrefixProvider* dpp) final; + + int do_put(RGWSI_MetaBackend_Handler::Op* op, std::string& entr, + RGWMetadataObject* obj, RGWObjVersionTracker& objv_tracker, + optional_yield y, const DoutPrefixProvider* dpp, + RGWMDLogSyncType type, bool from_remote_zone) override; + + private: + rgw::sal::Driver* driver; + RGWSI_Topic_RADOS* topic_svc; +}; + +std::string get_topic_key(const std::string& topic_name, + const std::string& tenant); + +void parse_topic_entry(const std::string& topic_entry, + std::string* tenant_name, + std::string* topic_name);