services/svc_sys_obj_cache.cc
services/svc_sys_obj_core.cc
services/svc_tier_rados.cc
+ services/svc_topic_rados.cc
services/svc_user.cc
services/svc_user_rados.cc
services/svc_zone.cc
return std::make_unique<DaosZoneGroup>(store, group);
}
const RGWZoneGroup& get_group() { return group; }
+ virtual bool supports_feature(std::string_view feature) const override {
+ return group.supports(feature);
+ }
};
class DaosZone : public StoreZone {
virtual std::unique_ptr<ZoneGroup> clone() override {
return std::make_unique<MotrZoneGroup>(store, group);
}
+ virtual bool supports_feature(std::string_view feature) const override {
+ return group.supports(feature);
+ }
friend class MotrZone;
};
#include "rgw_sal_rados.h"
#include "rgw_pubsub.h"
#include "rgw_pubsub_push.h"
+#include "rgw_zone_features.h"
#include "rgw_perf_counters.h"
#include "common/dout.h"
#include <chrono>
int postauth_init(optional_yield) override { return 0; }
int authorize(const DoutPrefixProvider* dpp, optional_yield y) override;
static bool action_exists(const req_state* s);
+ static bool action_exists(const req_info& info);
};
#include "services/svc_role_rados.h"
#include "services/svc_user.h"
#include "services/svc_sys_obj_cache.h"
+#include "services/svc_topic_rados.h"
#include "cls/rgw/cls_rgw_client.h"
#include "rgw_pubsub.h"
objv_tracker, y);
}
+int RadosStore::read_topic_v2(const std::string& topic_name,
+ const std::string& tenant,
+ rgw_pubsub_topic& topic,
+ RGWObjVersionTracker* objv_tracker,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) {
+ bufferlist bl;
+ auto mtime = ceph::real_clock::zero();
+ RGWSI_MBSObj_GetParams params(&bl, nullptr, &mtime);
+ std::unique_ptr<RGWSI_MetaBackend::Context> ctx(
+ svc()->topic->svc.meta_be->alloc_ctx());
+ ctx->init(svc()->topic->get_be_handler());
+ const int ret = svc()->topic->svc.meta_be->get(
+ ctx.get(), get_topic_key(topic_name, tenant), params, objv_tracker, y,
+ dpp);
+ if (ret < 0) {
+ return ret;
+ }
+
+ auto iter = bl.cbegin();
+ try {
+ decode(topic, iter);
+ } catch (buffer::error& err) {
+ ldpp_dout(dpp, 20) << " failed to decode topic: " << topic_name
+ << ". error: " << err.what() << dendl;
+ return -EIO;
+ }
+ return 0;
+}
+
+int RadosStore::write_topic_v2(const rgw_pubsub_topic& topic,
+ RGWObjVersionTracker* objv_tracker,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) {
+ bufferlist bl;
+ encode(topic, bl);
+ RGWSI_MBSObj_PutParams params(bl, nullptr, ceph::real_clock::zero(),
+ /*exclusive*/ false);
+ std::unique_ptr<RGWSI_MetaBackend::Context> ctx(
+ svc()->topic->svc.meta_be->alloc_ctx());
+ ctx->init(svc()->topic->get_be_handler());
+ return svc()->topic->svc.meta_be->put(
+ ctx.get(), get_topic_key(topic.name, topic.user.tenant), params,
+ objv_tracker, y, dpp);
+}
+
+int RadosStore::remove_topic_v2(const std::string& topic_name,
+ const std::string& tenant,
+ RGWObjVersionTracker* objv_tracker,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) {
+ RGWSI_MBSObj_RemoveParams params;
+ std::unique_ptr<RGWSI_MetaBackend::Context> ctx(
+ svc()->topic->svc.meta_be->alloc_ctx());
+ ctx->init(svc()->topic->get_be_handler());
+ return svc()->topic->svc.meta_be->remove(ctx.get(),
+ get_topic_key(topic_name, tenant),
+ params, objv_tracker, y, dpp);
+}
+
int RadosStore::delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, optional_yield y)
{
return rados->delete_raw_obj(dpp, obj, y);
virtual std::unique_ptr<ZoneGroup> clone() override {
return std::make_unique<RadosZoneGroup>(store, group);
}
+ virtual bool supports_feature(std::string_view feature) const override {
+ return group.supports(feature);
+ }
const RGWZoneGroup& get_group() const { return group; }
};
optional_yield y, const DoutPrefixProvider *dpp) override;
int remove_topics(const std::string& tenant, RGWObjVersionTracker* objv_tracker,
optional_yield y, const DoutPrefixProvider *dpp) override;
+ int read_topic_v2(const std::string& topic_name,
+ const std::string& tenant,
+ rgw_pubsub_topic& topic,
+ RGWObjVersionTracker* objv_tracker,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override;
+ int write_topic_v2(const rgw_pubsub_topic& topic,
+ RGWObjVersionTracker* objv_tracker,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override;
+ int remove_topic_v2(const std::string& topic_name,
+ const std::string& tenant,
+ RGWObjVersionTracker* objv_tracker,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override;
virtual RGWLC* get_rgwlc(void) override { return rados->get_lc(); }
virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return rados->get_cr_registry(); }
#include "services/svc_sys_obj_core.h"
#include "services/svc_user_rados.h"
#include "services/svc_role_rados.h"
+#include "services/svc_topic_rados.h"
#include "common/errno.h"
#include "rgw_sal_rados.h"
#include "rgw_user.h"
#include "rgw_role.h"
+#include "rgw_pubsub.h"
#define dout_subsys ceph_subsys_rgw
role_rados = std::make_unique<RGWSI_Role_RADOS>(cct);
async_processor = std::make_unique<RGWAsyncRadosProcessor>(
cct, cct->_conf->rgw_num_async_rados_threads);
+ topic_rados = std::make_unique<RGWSI_Topic_RADOS>(cct);
if (have_cache) {
sysobj_cache = std::make_unique<RGWSI_SysObj_Cache>(dpp, cct);
user_rados->init(driver->getRados()->get_rados_handle(), zone.get(), sysobj.get(), sysobj_cache.get(),
meta.get(), meta_be_sobj.get(), sync_modules.get());
role_rados->init(zone.get(), meta.get(), meta_be_sobj.get(), sysobj.get());
-
+ topic_rados->init(zone.get(), meta.get(), meta_be_sobj.get(), sysobj.get());
can_shutdown = true;
int r = finisher->start(y, dpp);
ldout(cct, 0) << "ERROR: failed to start role_rados service (" << cpp_strerror(-r) << dendl;
return r;
}
-
+ r = topic_rados->start(y, dpp);
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: failed to start topic_rados service ("
+ << cpp_strerror(-r) << dendl;
+ return r;
+ }
}
/* cache or core services will be started by sysobj */
return;
}
+ topic_rados->shutdown();
role_rados->shutdown();
datalog_rados.reset();
user_rados->shutdown();
user = _svc.user_rados.get();
role = _svc.role_rados.get();
async_processor = _svc.async_processor.get();
+ topic = _svc.topic_rados.get();
return 0;
}
meta.otp.reset(RGWOTPMetaHandlerAllocator::alloc());
meta.role = std::make_unique<rgw::sal::RGWRoleMetadataHandler>(driver, svc.role);
+ meta.topic = std::make_unique<RGWTopicMetadataHandler>(driver, svc.topic);
user.reset(new RGWUserCtl(svc.zone, svc.user, (RGWUserMetadataHandler *)meta.user.get()));
bucket.reset(new RGWBucketCtl(svc.zone,
meta.bucket_instance = _ctl.meta.bucket_instance.get();
meta.otp = _ctl.meta.otp.get();
meta.role = _ctl.meta.role.get();
+ meta.topic = _ctl.meta.topic.get();
user = _ctl.user.get();
bucket = _ctl.bucket.get();
ldout(cct, 0) << "ERROR: failed to start init otp ctl (" << cpp_strerror(-r) << dendl;
return r;
}
+ r = meta.topic->attach(meta.mgr);
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: failed to start init topic ctl ("
+ << cpp_strerror(-r) << dendl;
+ return r;
+ }
return 0;
}
class RGWDataChangesLog;
class RGWSI_Role_RADOS;
class RGWAsyncRadosProcessor;
+class RGWSI_Topic_RADOS;
struct RGWServices_Def
{
std::unique_ptr<RGWSI_Role_RADOS> role_rados;
std::unique_ptr<RGWAsyncRadosProcessor> async_processor;
+ std::unique_ptr<RGWSI_Topic_RADOS> topic_rados;
RGWServices_Def();
~RGWServices_Def();
RGWSI_User *user{nullptr};
RGWSI_Role_RADOS *role{nullptr};
RGWAsyncRadosProcessor* async_processor;
+ RGWSI_Topic_RADOS* topic{nullptr};
int do_init(CephContext *cct, rgw::sal::RadosStore* store, bool have_cache,
bool raw_storage, bool run_sync, optional_yield y,
std::unique_ptr<RGWMetadataHandler> user;
std::unique_ptr<RGWMetadataHandler> otp;
std::unique_ptr<RGWMetadataHandler> role;
+ std::unique_ptr<RGWMetadataHandler> topic;
_meta();
~_meta();
RGWMetadataHandler *user{nullptr};
RGWMetadataHandler *otp{nullptr};
RGWMetadataHandler *role{nullptr};
+ RGWMetadataHandler* topic{nullptr};
} meta;
RGWUserCtl *user{nullptr};
append_section_from_set(all_sections, "bucket.instance");
append_section_from_set(all_sections, "bucket");
append_section_from_set(all_sections, "roles");
+ append_section_from_set(all_sections, "topic");
std::move(all_sections.begin(), all_sections.end(),
std::back_inserter(sections));
rgw_pool otp_pool;
rgw_pool oidc_pool;
rgw_pool notif_pool;
+ rgw_pool topics_pool;
RGWAccessKey system_key;
const std::string& get_compression_type(const rgw_placement_rule& placement_rule) const;
void encode(bufferlist& bl) const override {
- ENCODE_START(14, 1, bl);
+ ENCODE_START(15, 1, bl);
encode(domain_root, bl);
encode(control_pool, bl);
encode(gc_pool, bl);
encode(tier_config, bl);
encode(oidc_pool, bl);
encode(notif_pool, bl);
+ encode(topics_pool, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) override {
- DECODE_START(14, bl);
+ DECODE_START(15, bl);
decode(domain_root, bl);
decode(control_pool, bl);
decode(gc_pool, bl);
} else {
notif_pool = log_pool.name + ":notif";
}
+ if (struct_v >= 15) {
+ decode(topics_pool, bl);
+ } else {
+ topics_pool = name + ".rgw.meta:topics";
+ }
DECODE_FINISH(bl);
}
void dump(Formatter *f) const;
}
if (opt_cmd == OPT::PUBSUB_TOPIC_LIST) {
- RGWPubSub ps(driver, tenant);
-
+ auto site = std::make_unique<rgw::SiteConfig>();
+ ret = site->load(dpp(), null_yield, cfgstore.get());
+ if (ret < 0) {
+ std::cerr << "Unable to initialize site config." << std::endl;
+ exit(1);
+ }
+ RGWPubSub ps(driver, tenant, &site->get_period()->get_map().zonegroups);
rgw_pubsub_topics result;
- int ret = ps.get_topics(dpp(), result, null_yield);
+ ret = ps.get_topics(dpp(), result, null_yield);
if (ret < 0 && ret != -ENOENT) {
cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
return -ret;
cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
return EINVAL;
}
-
- RGWPubSub ps(driver, tenant);
+ auto site = std::make_unique<rgw::SiteConfig>();
+ ret = site->load(dpp(), null_yield, cfgstore.get());
+ if (ret < 0) {
+ std::cerr << "Unable to initialize site config." << std::endl;
+ exit(1);
+ }
+ RGWPubSub ps(driver, tenant, &site->get_period()->get_map().zonegroups);
rgw_pubsub_topic topic;
ret = ps.get_topic(dpp(), topic_name, topic, null_yield);
return -ret;
}
- RGWPubSub ps(driver, tenant);
+ auto site = std::make_unique<rgw::SiteConfig>();
+ ret = site->load(dpp(), null_yield, cfgstore.get());
+ if (ret < 0) {
+ std::cerr << "Unable to initialize site config." << std::endl;
+ exit(1);
+ }
+ RGWPubSub ps(driver, tenant, &site->get_period()->get_map().zonegroups);
ret = ps.remove_topic(dpp(), topic_name, null_yield);
if (ret < 0) {
#include "rgw_xml.h"
#include "rgw_arn.h"
#include "rgw_pubsub_push.h"
+#include "common/errno.h"
#include <regex>
#include <algorithm>
f->close_section(); // Attributes
}
+void rgw_pubsub_topic::decode_json(JSONObj* f) {
+ JSONDecoder::decode_json("user", user, f);
+ JSONDecoder::decode_json("name", name, f);
+ JSONDecoder::decode_json("dest", dest, f);
+ JSONDecoder::decode_json("arn", arn, f);
+ JSONDecoder::decode_json("opaqueData", opaque_data, f);
+ JSONDecoder::decode_json("policy", policy_text, f);
+}
+
void encode_json(const char *name, const rgw::notify::EventTypeList& l, Formatter *f)
{
f->open_array_section(name);
return ss.str();
}
+void rgw_pubsub_dest::decode_json(JSONObj* f) {
+ using rgw::notify::DEFAULT_CONFIG;
+ using rgw::notify::DEFAULT_GLOBAL_VALUE;
+ JSONDecoder::decode_json("push_endpoint", push_endpoint, f);
+ JSONDecoder::decode_json("push_endpoint_args", push_endpoint_args, f);
+ JSONDecoder::decode_json("push_endpoint_topic", arn_topic, f);
+ JSONDecoder::decode_json("stored_secret", stored_secret, f);
+ JSONDecoder::decode_json("persistent", persistent, f);
+ std::string ttl;
+ JSONDecoder::decode_json("time_to_live", ttl, f);
+ time_to_live = ttl == DEFAULT_CONFIG ? DEFAULT_GLOBAL_VALUE : std::stoul(ttl);
+
+ std::string max_retry;
+ JSONDecoder::decode_json("max_retries", max_retry, f);
+ max_retries = max_retry == DEFAULT_CONFIG ? DEFAULT_GLOBAL_VALUE
+ : std::stoul(max_retry);
+
+ std::string sleep_dur;
+ JSONDecoder::decode_json("retry_sleep_duration", sleep_dur, f);
+ retry_sleep_duration = sleep_dur == DEFAULT_CONFIG ? DEFAULT_GLOBAL_VALUE
+ : std::stoul(sleep_dur);
+}
+
RGWPubSub::RGWPubSub(rgw::sal::Driver* _driver, const std::string& _tenant)
: driver(_driver), tenant(_tenant)
{}
-int RGWPubSub::read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result,
+RGWPubSub::RGWPubSub(rgw::sal::Driver* _driver,
+ const std::string& _tenant,
+ const std::map<std::string, RGWZoneGroup>* _zonegroups)
+ : driver(_driver), tenant(_tenant), zonegroups(_zonegroups) {
+ use_notification_v2 = do_all_zonegroups_support_notification_v2(*zonegroups);
+}
+
+int RGWPubSub::read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result,
RGWObjVersionTracker *objv_tracker, optional_yield y) const
{
+ if (use_notification_v2) {
+ void* handle = NULL;
+ auto ret =
+ driver->meta_list_keys_init(dpp, "topic", std::string(), &handle);
+ if (ret < 0) {
+ return ret;
+ }
+ bool truncated;
+ int max = 1000;
+ do {
+ std::list<std::string> topics;
+ ret = driver->meta_list_keys_next(dpp, handle, max, topics, &truncated);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1)
+ << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl;
+ break;
+ }
+ for (auto& topic_name : topics) {
+ rgw_pubsub_topic topic;
+ int ret = get_topic(dpp, topic_name, topic, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to read topic '" << topic_name
+ << "' info: ret=" << ret << dendl;
+ continue;
+ }
+ result.topics[topic_name] = std::move(topic);
+ }
+ } while (truncated);
+ driver->meta_list_keys_complete(handle);
+ return ret;
+ }
const int ret = driver->read_topics(tenant, result, objv_tracker, y, dpp);
if (ret < 0) {
ldpp_dout(dpp, 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
int RGWPubSub::get_topic(const DoutPrefixProvider *dpp, const std::string& name, rgw_pubsub_topic& result, optional_yield y) const
{
+ if (use_notification_v2) {
+ const int ret = driver->read_topic(name, tenant, result, nullptr, y, dpp);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "failed to read topic info for name: " << name
+ << " tenant: " << tenant << ", ret=" << ret << dendl;
+ }
+ return ret;
+ }
rgw_pubsub_topics topics;
const int ret = read_topics(dpp, topics, nullptr, y);
if (ret < 0) {
int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name,
const rgw::notify::EventTypeList& events, optional_yield y) const {
return create_notification(dpp, topic_name, events, std::nullopt, "", y);
+ }
+bool do_all_zonegroups_support_notification_v2(
+ std::map<std::string, RGWZoneGroup> zonegroups) {
+ for (const auto& [_, zonegroup] : zonegroups) {
+ if (!zonegroup.supports(rgw::zone_features::notification_v2)) {
+ return false;
+ }
+ }
+ return true;
}
int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name,
return 0;
}
+int RGWPubSub::create_topic(const DoutPrefixProvider* dpp,
+ const rgw_pubsub_topic& topic,
+ optional_yield y) const {
+ RGWObjVersionTracker objv_tracker;
+ const auto ret = driver->write_topic(topic, &objv_tracker, y, dpp);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to write topic info: ret=" << ret
+ << dendl;
+ }
+
+ return ret;
+}
+
int RGWPubSub::create_topic(const DoutPrefixProvider* dpp,
const std::string& name,
const rgw_pubsub_dest& dest, const std::string& arn,
const rgw_user& user,
const std::string& policy_text,
optional_yield y) const {
+ if (use_notification_v2) {
+ rgw_pubsub_topic new_topic;
+ new_topic.user = user;
+ new_topic.name = name;
+ new_topic.dest = dest;
+ new_topic.arn = arn;
+ new_topic.opaque_data = opaque_data;
+ new_topic.policy_text = policy_text;
+ return create_topic(dpp, new_topic, y);
+ }
RGWObjVersionTracker objv_tracker;
rgw_pubsub_topics topics;
return 0;
}
+int RGWPubSub::remove_topic_v2(const DoutPrefixProvider* dpp,
+ const std::string& name,
+ optional_yield y) const {
+ RGWObjVersionTracker objv_tracker;
+ rgw_pubsub_topic topic;
+ int ret = get_topic(dpp, name, topic, y);
+ if (ret < 0 && ret != -ENOENT) {
+ return ret;
+ } else if (ret == -ENOENT) {
+ // its not an error if no topics exist, just a no-op
+ ldpp_dout(dpp, 10) << "WARNING: topic name:" << name
+ << " does not exist, deletion is a no-op: ret=" << ret
+ << dendl;
+ return 0;
+ }
+ ret = driver->remove_topic(name, tenant, &objv_tracker, y, dpp);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to remove topic info: ret=" << ret
+ << dendl;
+ }
+ return ret;
+}
+
int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const
{
+ if (use_notification_v2) {
+ return remove_topic_v2(dpp, name, y);
+ }
RGWObjVersionTracker objv_tracker;
rgw_pubsub_topics topics;
return 0;
}
-
void dump(Formatter *f) const;
void dump_xml(Formatter *f) const;
std::string to_json_str() const;
+ void decode_json(JSONObj* obj);
};
WRITE_CLASS_ENCODER(rgw_pubsub_dest)
void dump(Formatter *f) const;
void dump_xml(Formatter *f) const;
void dump_xml_as_attributes(Formatter *f) const;
+ void decode_json(JSONObj* obj);
bool operator<(const rgw_pubsub_topic& t) const {
return to_str().compare(t.to_str());
rgw::sal::Driver* const driver;
const std::string tenant;
+ const std::map<std::string, RGWZoneGroup>* zonegroups = nullptr;
+ bool use_notification_v2 = false;
int read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result,
RGWObjVersionTracker* objv_tracker, optional_yield y) const;
public:
RGWPubSub(rgw::sal::Driver* _driver, const std::string& tenant);
+ RGWPubSub(rgw::sal::Driver* _driver,
+ const std::string& _tenant,
+ const std::map<std::string, RGWZoneGroup>* zonegroups);
+
class Bucket {
friend class RGWPubSub;
const RGWPubSub& ps;
// if the topic does not exists it is a no-op (considered success)
// return 0 on success, error code otherwise
int remove_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const;
+ // remove a topic according to its name
+ // if the topic does not exists it is a no-op (considered success)
+ // return 0 on success, error code otherwise
+ int remove_topic_v2(const DoutPrefixProvider* dpp,
+ const std::string& name,
+ optional_yield y) const;
+ // create a topic with a name only
+ // if the topic already exists it is a no-op (considered success)
+ // return 0 on success, error code otherwise
+ int create_topic(const DoutPrefixProvider* dpp,
+ const rgw_pubsub_topic& topic,
+ optional_yield y) const;
};
namespace rgw::notify {
constexpr uint32_t DEFAULT_GLOBAL_VALUE = UINT32_MAX;
// Used in case the topic is using the default global value for dumping in a formatter
constexpr static const std::string_view DEFAULT_CONFIG{"None"};
-}
\ No newline at end of file
+}
+
+bool do_all_zonegroups_support_notification_v2(
+ std::map<std::string, RGWZoneGroup> zonegroups);
#include "services/svc_zone.h"
#include "common/dout.h"
#include "rgw_url.h"
+#include "rgw_process_env.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
return -EINVAL;
}
- // Remove the args that are parsed, so the push_endpoint_args only contains
- // necessary one's.
opaque_data = s->info.args.get("OpaqueData");
- s->info.args.remove("OpaqueData");
dest.push_endpoint = s->info.args.get("push-endpoint");
- s->info.args.remove("push-endpoint");
s->info.args.get_bool("persistent", &dest.persistent, false);
- s->info.args.remove("persistent");
s->info.args.get_int("time_to_live", reinterpret_cast<int *>(&dest.time_to_live), rgw::notify::DEFAULT_GLOBAL_VALUE);
- s->info.args.remove("time_to_live");
s->info.args.get_int("max_retries", reinterpret_cast<int *>(&dest.max_retries), rgw::notify::DEFAULT_GLOBAL_VALUE);
- s->info.args.remove("max_retries");
s->info.args.get_int("retry_sleep_duration", reinterpret_cast<int *>(&dest.retry_sleep_duration), rgw::notify::DEFAULT_GLOBAL_VALUE);
- s->info.args.remove("retry_sleep_duration");
if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
return -EINVAL;
if (!policy_text.empty() && !get_policy_from_text(s, policy_text)) {
return -ERR_MALFORMED_DOC;
}
- s->info.args.remove("Policy");
+ // Remove the args that are parsed, so the push_endpoint_args only contains
+ // necessary one's which is parsed after this if. but only if master zone,
+ // else we do not remove as request is forwarded to master.
+ if (driver->is_meta_master()) {
+ s->info.args.remove("OpaqueData");
+ s->info.args.remove("push-endpoint");
+ s->info.args.remove("persistent");
+ s->info.args.remove("time_to_live");
+ s->info.args.remove("max_retries");
+ s->info.args.remove("retry_sleep_duration");
+ s->info.args.remove("Policy");
+ }
for (const auto& param : s->info.args.get_params()) {
if (param.first == "Action" || param.first == "Name" || param.first == "PayloadHash") {
continue;
return ret;
}
- const RGWPubSub ps(driver, s->owner.id.tenant);
+ const RGWPubSub ps(driver, s->owner.id.tenant,
+ &s->penv.site->get_period()->get_map().zonegroups);
rgw_pubsub_topic result;
ret = ps.get_topic(this, topic_name, result, y);
if (ret == -ENOENT) {
};
void RGWPSCreateTopicOp::execute(optional_yield y) {
+ // master request will replicate the topic creation.
+ bufferlist indata;
+ if (!driver->is_meta_master()) {
+ op_ret = rgw_forward_request_to_master(
+ this, *s->penv.site, s->user->get_id(), &indata, nullptr, s->info, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1)
+ << "CreateTopic forward_request_to_master returned ret = " << op_ret
+ << dendl;
+ return;
+ }
+ }
if (!dest.push_endpoint.empty() && dest.persistent) {
op_ret = rgw::notify::add_persistent_topic(topic_name, s->yield);
if (op_ret < 0) {
return;
}
}
- const RGWPubSub ps(driver, s->owner.id.tenant);
+ const RGWPubSub ps(driver, s->owner.id.tenant,
+ &s->penv.site->get_period()->get_map().zonegroups);
op_ret = ps.create_topic(this, topic_name, dest, topic_arn, opaque_data,
s->owner.id, policy_text, y);
if (op_ret < 0) {
};
void RGWPSListTopicsOp::execute(optional_yield y) {
- const RGWPubSub ps(driver, s->owner.id.tenant);
+ const RGWPubSub ps(driver, s->owner.id.tenant,
+ &s->penv.site->get_period()->get_map().zonegroups);
op_ret = ps.get_topics(this, result, y);
// if there are no topics it is not considered an error
op_ret = op_ret == -ENOENT ? 0 : op_ret;
if (op_ret < 0) {
return;
}
- const RGWPubSub ps(driver, s->owner.id.tenant);
+ const RGWPubSub ps(driver, s->owner.id.tenant,
+ &s->penv.site->get_period()->get_map().zonegroups);
op_ret = ps.get_topic(this, topic_name, result, y);
if (op_ret < 0) {
ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
if (op_ret < 0) {
return;
}
- const RGWPubSub ps(driver, s->owner.id.tenant);
+ const RGWPubSub ps(driver, s->owner.id.tenant,
+ &s->penv.site->get_period()->get_map().zonegroups);
op_ret = ps.get_topic(this, topic_name, result, y);
if (op_ret < 0) {
ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
return ret;
}
rgw_pubsub_topic result;
- const RGWPubSub ps(driver, s->owner.id.tenant);
+ const RGWPubSub ps(driver, s->owner.id.tenant,
+ &s->penv.site->get_period()->get_map().zonegroups);
ret = ps.get_topic(this, topic_name, result, y);
if (ret < 0) {
ldpp_dout(this, 1) << "failed to get topic '" << topic_name
};
void RGWPSSetTopicAttributesOp::execute(optional_yield y) {
+ if (!driver->is_meta_master()) {
+ bufferlist indata;
+ op_ret = rgw_forward_request_to_master(
+ this, *s->penv.site, s->user->get_id(), &indata, nullptr, s->info, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1)
+ << "SetTopicAttributes forward_request_to_master returned ret = "
+ << op_ret << dendl;
+ return;
+ }
+ }
if (!dest.push_endpoint.empty() && dest.persistent) {
op_ret = rgw::notify::add_persistent_topic(topic_name, s->yield);
if (op_ret < 0) {
return;
}
}
- const RGWPubSub ps(driver, s->owner.id.tenant);
+ const RGWPubSub ps(driver, s->owner.id.tenant,
+ &s->penv.site->get_period()->get_map().zonegroups);
op_ret = ps.create_topic(this, topic_name, dest, topic_arn, opaque_data,
topic_owner, policy_text, y);
if (op_ret < 0) {
if (op_ret < 0) {
return;
}
- const RGWPubSub ps(driver, s->owner.id.tenant);
+ if (!driver->is_meta_master()) {
+ bufferlist indata;
+ op_ret = rgw_forward_request_to_master(
+ this, *s->penv.site, s->user->get_id(), &indata, nullptr, s->info, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1)
+ << "DeleteTopic forward_request_to_master returned ret = " << op_ret
+ << dendl;
+ return;
+ }
+ }
+ const RGWPubSub ps(driver, s->owner.id.tenant,
+ &s->penv.site->get_period()->get_map().zonegroups);
+
rgw_pubsub_topic result;
op_ret = ps.get_topic(this, topic_name, result, y);
if (op_ret == 0) {
}
return false;
}
+bool RGWHandler_REST_PSTopic_AWS::action_exists(const req_info& info) {
+ if (info.args.exists("Action")) {
+ const std::string action_name = info.args.get("Action");
+ return op_generators.contains(action_name);
+ }
+ return false;
+}
RGWOp *RGWHandler_REST_PSTopic_AWS::op_post()
{
}
}
}
- const auto payload_hash = rgw::auth::s3::calc_v4_payload_hash(post_body);
- s->info.args.append("PayloadHash", payload_hash);
+ // PayloadHash is present if request is fwd from secondary site in multisite
+ // environment, so then do not calculate and append.
+ if (!s->info.args.exists("PayloadHash")) {
+ const auto payload_hash = rgw::auth::s3::calc_v4_payload_hash(post_body);
+ s->info.args.append("PayloadHash", payload_hash);
+ }
}
RGWHandler_REST* RGWRESTMgr_S3::get_handler(rgw::sal::Driver* driver,
const char* exp_payload_hash = nullptr;
string payload_hash;
- if (is_non_s3_op) {
+ // if the request is related to topics (bucket notification), they are part of
+ // sns service and hence it's a no_s3_op,
+ if (is_non_s3_op || RGWHandler_REST_PSTopic_AWS::action_exists(info)) {
//For non s3 ops, we need to calculate the payload hash
payload_hash = info.args.get("PayloadHash");
exp_payload_hash = payload_hash.c_str();
struct rgw_pubsub_topics;
struct rgw_pubsub_bucket_topics;
class RGWZonePlacementInfo;
-
+struct rgw_pubsub_topic;
using RGWBucketListNameFilter = std::function<bool (const std::string&)>;
/** Remove the topic config, optionally a specific version */
virtual int remove_topics(const std::string& tenant, RGWObjVersionTracker* objv_tracker,
optional_yield y,const DoutPrefixProvider *dpp) = 0;
+ /** Read the topic config entry into data and (optionally) objv_tracker */
+ virtual int read_topic_v2(const std::string& topic_name,
+ const std::string& tenant,
+ rgw_pubsub_topic& topic,
+ RGWObjVersionTracker* objv_tracker,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) = 0;
+ /** Write topic info and (optionally) @a objv_tracker into the config */
+ virtual int write_topic_v2(const rgw_pubsub_topic& topic,
+ RGWObjVersionTracker* objv_tracker,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) = 0;
+ /** Remove the topic config, optionally a specific version */
+ virtual int remove_topic_v2(const std::string& topic_name,
+ const std::string& tenant,
+ RGWObjVersionTracker* objv_tracker,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) = 0;
/** Get access to the lifecycle management thread */
virtual RGWLC* get_rgwlc(void) = 0;
/** Get access to the coroutine registry. Used to create new coroutine managers */
virtual int list_zones(std::list<std::string>& zone_ids) = 0;
/** Clone a copy of this zonegroup. */
virtual std::unique_ptr<ZoneGroup> clone() = 0;
+ /** Determine if zonegroup |feature| is supported.*/
+ virtual bool supports_feature(std::string_view feature) const = 0;
};
/**
std::unique_ptr<RGWZoneGroup>zg = std::make_unique<RGWZoneGroup>(*group.get());
return std::make_unique<DBZoneGroup>(store, std::move(zg));
}
+ virtual bool supports_feature(std::string_view feature) const override {
+ return group->supports(feature);
+ }
};
class DBZone : public StoreZone {
std::unique_ptr<ZoneGroup> nzg = next->clone();
return std::make_unique<FilterZoneGroup>(std::move(nzg));
}
+ virtual bool supports_feature(std::string_view feature) const override {
+ return next->supports_feature(feature);
+ }
};
class FilterZone : public Zone {
optional_yield y, const DoutPrefixProvider *dpp) override {
return next->remove_topics(tenant, objv_tracker, y, dpp);
}
+ int read_topic_v2(const std::string& topic_name,
+ const std::string& tenant,
+ rgw_pubsub_topic& topic,
+ RGWObjVersionTracker* objv_tracker,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override {
+ return next->read_topic_v2(topic_name, tenant, topic, objv_tracker, y, dpp);
+ }
+ int write_topic_v2(const rgw_pubsub_topic& topic,
+ RGWObjVersionTracker* objv_tracker,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override {
+ return next->write_topic_v2(topic, objv_tracker, y, dpp);
+ }
+ int remove_topic_v2(const std::string& topic_name,
+ const std::string& tenant,
+ RGWObjVersionTracker* objv_tracker,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override {
+ return next->remove_topic_v2(topic_name, tenant, objv_tracker, y, dpp);
+ }
virtual RGWLC* get_rgwlc(void) override;
virtual RGWCoroutinesManagerRegistry* get_cr_registry() override;
optional_yield y, const DoutPrefixProvider *dpp) override {return -ENOENT;}
int remove_topics(const std::string& tenant, RGWObjVersionTracker* objv_tracker,
optional_yield y, const DoutPrefixProvider *dpp) override {return -ENOENT;}
+ int read_topic_v2(const std::string& topic_name,
+ const std::string& tenant,
+ rgw_pubsub_topic& topic,
+ RGWObjVersionTracker* objv_tracker,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override {
+ return -EOPNOTSUPP;
+ }
+ int write_topic_v2(const rgw_pubsub_topic& topic,
+ RGWObjVersionTracker* objv_tracker,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override {
+ return -ENOENT;
+ }
+ int remove_topic_v2(const std::string& topic_name,
+ const std::string& tenant,
+ RGWObjVersionTracker* objv_tracker,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override {
+ return -ENOENT;
+ }
};
class StoreUser : public User {
JSONDecoder::decode_json("user_swift_pool", user_swift_pool, obj);
JSONDecoder::decode_json("user_uid_pool", user_uid_pool, obj);
JSONDecoder::decode_json("otp_pool", otp_pool, obj);
+ JSONDecoder::decode_json("notif_pool", notif_pool, obj);
+ JSONDecoder::decode_json("topics_pool", topics_pool, obj);
JSONDecoder::decode_json("system_key", system_key, obj);
JSONDecoder::decode_json("placement_pools", placement_pools, obj);
JSONDecoder::decode_json("tier_config", tier_config, obj);
JSONDecoder::decode_json("realm_id", realm_id, obj);
- JSONDecoder::decode_json("notif_pool", notif_pool, obj);
-
}
void RGWZoneParams::dump(Formatter *f) const
encode_json("user_swift_pool", user_swift_pool, f);
encode_json("user_uid_pool", user_uid_pool, f);
encode_json("otp_pool", otp_pool, f);
+ encode_json("notif_pool", notif_pool, f);
+ encode_json("topics_pool", topics_pool, f);
encode_json_plain("system_key", system_key, f);
encode_json("placement_pools", placement_pools, f);
encode_json("tier_config", tier_config, f);
encode_json("realm_id", realm_id, f);
- encode_json("notif_pool", notif_pool, f);
}
int RGWZoneParams::init(const DoutPrefixProvider *dpp,
pools.insert(info.reshard_pool);
pools.insert(info.oidc_pool);
pools.insert(info.notif_pool);
+ pools.insert(info.topics_pool);
for (const auto& [pname, placement] : info.placement_pools) {
pools.insert(placement.index_pool);
otp_pool = fix_zone_pool_dup(pools, name, ".rgw.otp", otp_pool);
oidc_pool = fix_zone_pool_dup(pools, name, ".rgw.meta:oidc", oidc_pool);
notif_pool = fix_zone_pool_dup(pools, name ,".rgw.log:notif", notif_pool);
+ topics_pool = fix_zone_pool_dup(pools, name, ".rgw.meta:topics", topics_pool);
for(auto& iter : placement_pools) {
iter.second.index_pool = fix_zone_pool_dup(pools, name, "." + default_bucket_index_pool_suffix,
info.otp_pool = fix_zone_pool_dup(pools, info.name, ".rgw.otp", info.otp_pool);
info.oidc_pool = fix_zone_pool_dup(pools, info.name, ".rgw.meta:oidc", info.oidc_pool);
info.notif_pool = fix_zone_pool_dup(pools, info.name, ".rgw.log:notif", info.notif_pool);
+ info.topics_pool =
+ fix_zone_pool_dup(pools, info.name, ".rgw.meta:topics", info.topics_pool);
for (auto& [pname, placement] : info.placement_pools) {
placement.index_pool = fix_zone_pool_dup(pools, info.name, "." + default_bucket_index_pool_suffix, placement.index_pool);
// zone feature names
inline constexpr std::string_view resharding = "resharding";
inline constexpr std::string_view compress_encrypted = "compress-encrypted";
+inline constexpr std::string_view notification_v2 = "notification_v2";
// static list of features supported by this release
inline constexpr std::initializer_list<std::string_view> supported = {
- resharding,
- compress_encrypted,
+ resharding,
+ compress_encrypted,
+ notification_v2,
};
inline constexpr bool supports(std::string_view feature) {
// static list of features enabled by default on new zonegroups
inline constexpr std::initializer_list<std::string_view> enabled = {
- resharding,
+ resharding,
+ notification_v2,
};
--- /dev/null
+#include "svc_topic_rados.h"
+#include "rgw_notify.h"
+#include "rgw_tools.h"
+#include "rgw_zone.h"
+#include "svc_meta.h"
+#include "svc_meta_be_sobj.h"
+#include "svc_zone.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+static std::string topic_oid_prefix = "topic.";
+static constexpr char topic_tenant_delim[] = ":";
+
+std::string get_topic_key(const std::string& topic_name,
+ const std::string& tenant) {
+ if (tenant.empty()) {
+ return topic_name;
+ }
+ return tenant + topic_tenant_delim + topic_name;
+}
+
+void parse_topic_entry(const std::string& topic_entry,
+ std::string* tenant_name,
+ std::string* topic_name) {
+ // expected format: [tenant_name:]topic_name*
+ auto pos = topic_entry.find(topic_tenant_delim);
+ if (pos != std::string::npos) {
+ *tenant_name = topic_entry.substr(0, pos);
+ *topic_name = topic_entry.substr(pos + 1);
+ } else {
+ tenant_name->clear();
+ *topic_name = topic_entry;
+ }
+}
+class RGWSI_Topic_Module : public RGWSI_MBSObj_Handler_Module {
+ RGWSI_Topic_RADOS::Svc& svc;
+ const std::string prefix;
+
+ public:
+ RGWSI_Topic_Module(RGWSI_Topic_RADOS::Svc& _svc)
+ : RGWSI_MBSObj_Handler_Module("topic"),
+ svc(_svc),
+ prefix(topic_oid_prefix) {}
+
+ void get_pool_and_oid(const std::string& key,
+ rgw_pool* pool,
+ std::string* oid) override {
+ if (pool) {
+ *pool = svc.zone->get_zone_params().topics_pool;
+ }
+
+ if (oid) {
+ *oid = key_to_oid(key);
+ }
+ }
+
+ bool is_valid_oid(const std::string& oid) override {
+ return boost::algorithm::starts_with(oid, prefix);
+ }
+
+ std::string key_to_oid(const std::string& key) override {
+ return prefix + key;
+ }
+
+ // This is called after `is_valid_oid` and is assumed to be a valid oid
+ std::string oid_to_key(const std::string& oid) override {
+ return oid.substr(prefix.size());
+ }
+
+ const std::string& get_oid_prefix() { return prefix; }
+};
+
+RGWSI_MetaBackend_Handler* RGWSI_Topic_RADOS::get_be_handler() {
+ return be_handler;
+}
+
+void RGWSI_Topic_RADOS::init(RGWSI_Zone* _zone_svc,
+ RGWSI_Meta* _meta_svc,
+ RGWSI_MetaBackend* _meta_be_svc,
+ RGWSI_SysObj* _sysobj_svc) {
+ svc.zone = _zone_svc;
+ svc.meta = _meta_svc;
+ svc.meta_be = _meta_be_svc;
+ svc.sysobj = _sysobj_svc;
+}
+
+int RGWSI_Topic_RADOS::do_start(optional_yield y,
+ const DoutPrefixProvider* dpp) {
+ int r = svc.meta->create_be_handler(RGWSI_MetaBackend::Type::MDBE_SOBJ,
+ &be_handler);
+ if (r < 0) {
+ ldout(ctx(), 0) << "ERROR: failed to create be_handler for Topics: r=" << r
+ << dendl;
+ return r;
+ }
+
+ auto module = new RGWSI_Topic_Module(svc);
+ RGWSI_MetaBackend_Handler_SObj* bh =
+ static_cast<RGWSI_MetaBackend_Handler_SObj*>(be_handler);
+ be_module.reset(module);
+ bh->set_module(module);
+ return 0;
+}
+
+RGWTopicMetadataHandler::RGWTopicMetadataHandler(rgw::sal::Driver* driver,
+ RGWSI_Topic_RADOS* topic_svc) {
+ this->driver = driver;
+ this->topic_svc = topic_svc;
+ base_init(topic_svc->ctx(), topic_svc->get_be_handler());
+}
+
+RGWMetadataObject* RGWTopicMetadataHandler::get_meta_obj(
+ JSONObj* jo, const obj_version& objv, const ceph::real_time& mtime) {
+ rgw_pubsub_topic topic;
+ try {
+ topic.decode_json(jo);
+ } catch (JSONDecoder::err& e) {
+ return nullptr;
+ }
+
+ return new RGWTopicMetadataObject(topic, objv, mtime, driver);
+}
+
+int RGWTopicMetadataHandler::do_get(RGWSI_MetaBackend_Handler::Op* op,
+ std::string& entry, RGWMetadataObject** obj,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) {
+ rgw_pubsub_topic result;
+ std::string topic_name;
+ std::string tenant;
+ parse_topic_entry(entry, &tenant, &topic_name);
+ RGWPubSub ps(driver, tenant,
+ &topic_svc->svc.zone->get_current_period().get_map().zonegroups);
+ int ret = ps.get_topic(dpp, topic_name, result, y);
+ if (ret < 0) {
+ return ret;
+ }
+ ceph::real_time mtime;
+ obj_version ver;
+ RGWTopicMetadataObject* rdo =
+ new RGWTopicMetadataObject(result, ver, mtime, driver);
+ *obj = rdo;
+ return 0;
+}
+
+int RGWTopicMetadataHandler::do_remove(RGWSI_MetaBackend_Handler::Op* op,
+ std::string& entry,
+ RGWObjVersionTracker& objv_tracker,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) {
+ auto ret = rgw::notify::remove_persistent_topic(entry, y);
+ if (ret != -ENOENT && ret < 0) {
+ return ret;
+ }
+ std::string topic_name;
+ std::string tenant;
+ parse_topic_entry(entry, &tenant, &topic_name);
+ RGWPubSub ps(driver, tenant,
+ &topic_svc->svc.zone->get_current_period().get_map().zonegroups);
+ return ps.remove_topic(dpp, topic_name, y);
+}
+
+class RGWMetadataHandlerPut_Topic : public RGWMetadataHandlerPut_SObj {
+ RGWTopicMetadataHandler* rhandler;
+ RGWTopicMetadataObject* mdo;
+
+ public:
+ RGWMetadataHandlerPut_Topic(RGWTopicMetadataHandler* handler,
+ RGWSI_MetaBackend_Handler::Op* op,
+ std::string& entry, RGWMetadataObject* obj,
+ RGWObjVersionTracker& objv_tracker,
+ optional_yield y, RGWMDLogSyncType type,
+ bool from_remote_zone)
+ : RGWMetadataHandlerPut_SObj(handler, op, entry, obj, objv_tracker, y,
+ type, from_remote_zone),
+ rhandler(handler) {
+ mdo = static_cast<RGWTopicMetadataObject*>(obj);
+ }
+
+ int put_checked(const DoutPrefixProvider* dpp) override {
+ auto& topic = mdo->get_topic_info();
+ auto* driver = mdo->get_driver();
+ auto ret = rgw::notify::add_persistent_topic(entry, y);
+ if (ret < 0) {
+ return ret;
+ }
+ RGWObjVersionTracker objv_tracker;
+ ret = driver->write_topic_v2(topic, &objv_tracker, y, dpp);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to write topic info: ret=" << ret
+ << dendl;
+ }
+ return ret;
+ }
+};
+
+int RGWTopicMetadataHandler::do_put(RGWSI_MetaBackend_Handler::Op* op,
+ std::string& entry, RGWMetadataObject* obj,
+ RGWObjVersionTracker& objv_tracker,
+ optional_yield y,
+ const DoutPrefixProvider* dpp,
+ RGWMDLogSyncType type,
+ bool from_remote_zone) {
+ RGWMetadataHandlerPut_Topic put_op(this, op, entry, obj, objv_tracker, y,
+ type, from_remote_zone);
+ return do_put_operate(&put_op, dpp);
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include "rgw_pubsub.h"
+#include "rgw_service.h"
+#include "svc_meta_be.h"
+
+class RGWSI_Topic_RADOS : public RGWServiceInstance {
+ public:
+ struct Svc {
+ RGWSI_Zone* zone{nullptr};
+ RGWSI_Meta* meta{nullptr};
+ RGWSI_MetaBackend* meta_be{nullptr};
+ RGWSI_SysObj* sysobj{nullptr};
+ } svc;
+
+ RGWSI_Topic_RADOS(CephContext* cct) : RGWServiceInstance(cct) {}
+ ~RGWSI_Topic_RADOS() {}
+
+ void init(RGWSI_Zone* _zone_svc,
+ RGWSI_Meta* _meta_svc,
+ RGWSI_MetaBackend* _meta_be_svc,
+ RGWSI_SysObj* _sysobj_svc);
+
+ RGWSI_MetaBackend_Handler* get_be_handler();
+ int do_start(optional_yield y, const DoutPrefixProvider* dpp) override;
+
+ private:
+ RGWSI_MetaBackend_Handler* be_handler;
+ std::unique_ptr<RGWSI_MetaBackend::Module> be_module;
+};
+
+class RGWTopicMetadataObject : public RGWMetadataObject {
+ rgw_pubsub_topic topic;
+ rgw::sal::Driver* driver;
+
+ public:
+ RGWTopicMetadataObject() = default;
+ RGWTopicMetadataObject(rgw_pubsub_topic& topic, const obj_version& v,
+ real_time m, rgw::sal::Driver* driver)
+ : RGWMetadataObject(v, m), topic(topic), driver(driver) {}
+
+ void dump(Formatter* f) const override { topic.dump(f); }
+
+ rgw_pubsub_topic& get_topic_info() { return topic; }
+
+ rgw::sal::Driver* get_driver() { return driver; }
+};
+class RGWTopicMetadataHandler : public RGWMetadataHandler_GenericMetaBE {
+ public:
+ RGWTopicMetadataHandler(rgw::sal::Driver* driver,
+ RGWSI_Topic_RADOS* role_svc);
+
+ std::string get_type() final { return "topic"; }
+
+ RGWMetadataObject* get_meta_obj(JSONObj* jo, const obj_version& objv,
+ const ceph::real_time& mtime);
+
+ int do_get(RGWSI_MetaBackend_Handler::Op* op, std::string& entry,
+ RGWMetadataObject** obj, optional_yield y,
+ const DoutPrefixProvider* dpp) final;
+
+ int do_remove(RGWSI_MetaBackend_Handler::Op* op, std::string& entry,
+ RGWObjVersionTracker& objv_tracker, optional_yield y,
+ const DoutPrefixProvider* dpp) final;
+
+ int do_put(RGWSI_MetaBackend_Handler::Op* op, std::string& entr,
+ RGWMetadataObject* obj, RGWObjVersionTracker& objv_tracker,
+ optional_yield y, const DoutPrefixProvider* dpp,
+ RGWMDLogSyncType type, bool from_remote_zone) override;
+
+ private:
+ rgw::sal::Driver* driver;
+ RGWSI_Topic_RADOS* topic_svc;
+};
+
+std::string get_topic_key(const std::string& topic_name,
+ const std::string& tenant);
+
+void parse_topic_entry(const std::string& topic_entry,
+ std::string* tenant_name,
+ std::string* topic_name);