From 6d1d036afb2d1624674fef43f2e70ef3b3ae2859 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Wed, 10 Jan 2024 15:33:25 -0500 Subject: [PATCH] rgw/topic: add rgwrados::topic interface for topic metadata add a new interface for topic metadata that doesn't depend on metadata backends. this low-level interface is used by both RadosStore and the topic metadata handler remove Driver::delete_bucket_topic_mapping() from sal because the omap object is deleted internally by rgwrados::topic::remove() remove the RGWRados::topics_pool_ctx member Signed-off-by: Casey Bodley --- src/rgw/CMakeLists.txt | 3 +- src/rgw/driver/rados/rgw_rados.cc | 10 - src/rgw/driver/rados/rgw_rados.h | 4 - src/rgw/driver/rados/rgw_sal_rados.cc | 136 +++------ src/rgw/driver/rados/rgw_sal_rados.h | 9 +- src/rgw/driver/rados/rgw_service.cc | 9 + src/rgw/driver/rados/rgw_service.h | 9 + src/rgw/driver/rados/topic.cc | 383 ++++++++++++++++++++++++++ src/rgw/driver/rados/topic.h | 94 +++++++ src/rgw/rgw_pubsub.cc | 7 +- src/rgw/rgw_sal.h | 12 +- src/rgw/rgw_sal_filter.h | 13 +- src/rgw/rgw_sal_store.h | 11 +- 13 files changed, 554 insertions(+), 146 deletions(-) create mode 100644 src/rgw/driver/rados/topic.cc create mode 100644 src/rgw/driver/rados/topic.h diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 2987b70b382..031fc47bfc2 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -194,7 +194,8 @@ set(librgw_common_srcs driver/rados/rgw_trim_mdlog.cc driver/rados/rgw_user.cc driver/rados/rgw_zone.cc - driver/rados/sync_fairness.cc) + driver/rados/sync_fairness.cc + driver/rados/topic.cc) list(APPEND librgw_common_srcs driver/immutable_config/store.cc diff --git a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc index 6084c21ef8e..1d8b13b9297 100644 --- a/src/rgw/driver/rados/rgw_rados.cc +++ b/src/rgw/driver/rados/rgw_rados.cc @@ -1211,10 +1211,6 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y) if (ret < 0) return ret; - ret = open_topics_pool_ctx(dpp); - if (ret < 0) - return ret; - pools_initialized = true; if (use_gc) { @@ -1446,12 +1442,6 @@ int RGWRados::open_notif_pool_ctx(const DoutPrefixProvider *dpp) return rgw_init_ioctx(dpp, get_rados_handle(), svc.zone->get_zone_params().notif_pool, notif_pool_ctx, true, true); } -int RGWRados::open_topics_pool_ctx(const DoutPrefixProvider* dpp) { - return rgw_init_ioctx(dpp, get_rados_handle(), - svc.zone->get_zone_params().topics_pool, - topics_pool_ctx, true, true); -} - int RGWRados::open_pool_ctx(const DoutPrefixProvider *dpp, const rgw_pool& pool, librados::IoCtx& io_ctx, bool mostly_omap, bool bulk) { diff --git a/src/rgw/driver/rados/rgw_rados.h b/src/rgw/driver/rados/rgw_rados.h index f43b1b4e531..3d7776b0fa0 100644 --- a/src/rgw/driver/rados/rgw_rados.h +++ b/src/rgw/driver/rados/rgw_rados.h @@ -359,7 +359,6 @@ class RGWRados int open_objexp_pool_ctx(const DoutPrefixProvider *dpp); int open_reshard_pool_ctx(const DoutPrefixProvider *dpp); int open_notif_pool_ctx(const DoutPrefixProvider *dpp); - int open_topics_pool_ctx(const DoutPrefixProvider* dpp); int open_pool_ctx(const DoutPrefixProvider *dpp, const rgw_pool& pool, librados::IoCtx& io_ctx, bool mostly_omap, bool bulk); @@ -449,7 +448,6 @@ protected: librados::IoCtx objexp_pool_ctx; librados::IoCtx reshard_pool_ctx; librados::IoCtx notif_pool_ctx; // .rgw.notif - librados::IoCtx topics_pool_ctx; // .rgw.meta:topics bool pools_initialized{false}; @@ -535,8 +533,6 @@ public: librados::IoCtx& get_notif_pool_ctx() { return notif_pool_ctx; } - - librados::IoCtx& get_topics_pool_ctx() { return topics_pool_ctx; } void set_context(CephContext *_cct) { cct = _cct; diff --git a/src/rgw/driver/rados/rgw_sal_rados.cc b/src/rgw/driver/rados/rgw_sal_rados.cc index 0e6cbf96aea..dcec859cc49 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.cc +++ b/src/rgw/driver/rados/rgw_sal_rados.cc @@ -66,6 +66,7 @@ #include "cls/rgw/cls_rgw_client.h" #include "rgw_pubsub.h" +#include "topic.h" #define dout_subsys ceph_subsys_rgw @@ -1141,59 +1142,35 @@ int RadosStore::read_topic_v2(const std::string& topic_name, rgw_pubsub_topic& topic, RGWObjVersionTracker* objv_tracker, optional_yield y, - const DoutPrefixProvider* dpp) { - bufferlist bl; - auto mtime = ceph::real_clock::zero(); - RGWSI_MBSObj_GetParams params(&bl, nullptr, &mtime); - std::unique_ptr ctx( - svc()->topic->svc.meta_be->alloc_ctx()); - ctx->init(svc()->topic->get_be_handler()); - const int ret = svc()->topic->svc.meta_be->get( - ctx.get(), get_topic_metadata_key(tenant, topic_name), - params, objv_tracker, y, dpp); - if (ret < 0) { - return ret; - } - - auto iter = bl.cbegin(); - try { - decode(topic, iter); - } catch (buffer::error& err) { - ldpp_dout(dpp, 20) << " failed to decode topic: " << topic_name - << ". error: " << err.what() << dendl; - return -EIO; - } - return 0; + const DoutPrefixProvider* dpp) +{ + const RGWZoneParams& zone = svc()->zone->get_zone_params(); + const std::string key = get_topic_metadata_key(tenant, topic_name); + return rgwrados::topic::read(dpp, y, *svc()->sysobj, svc()->cache, + zone, key, topic, *ctl()->meta.topic_cache, + nullptr, objv_tracker); } -int RadosStore::write_topic_v2(const rgw_pubsub_topic& topic, - RGWObjVersionTracker* objv_tracker, +int RadosStore::write_topic_v2(const rgw_pubsub_topic& topic, bool exclusive, + RGWObjVersionTracker& objv_tracker, optional_yield y, - const DoutPrefixProvider* dpp) { - bufferlist bl; - encode(topic, bl); - RGWSI_MBSObj_PutParams params(bl, nullptr, ceph::real_clock::zero(), - /*exclusive*/ false); - std::unique_ptr ctx( - svc()->topic->svc.meta_be->alloc_ctx()); - ctx->init(svc()->topic->get_be_handler()); - return svc()->topic->svc.meta_be->put( - ctx.get(), get_topic_metadata_key(topic.user.tenant, topic.name), - params, objv_tracker, y, dpp); + const DoutPrefixProvider* dpp) +{ + const RGWZoneParams& zone = svc()->zone->get_zone_params(); + return rgwrados::topic::write(dpp, y, *svc()->sysobj, svc()->mdlog, zone, + topic, objv_tracker, {}, exclusive); } int RadosStore::remove_topic_v2(const std::string& topic_name, const std::string& tenant, - RGWObjVersionTracker* objv_tracker, + RGWObjVersionTracker& objv_tracker, optional_yield y, - const DoutPrefixProvider* dpp) { - RGWSI_MBSObj_RemoveParams params; - std::unique_ptr ctx( - svc()->topic->svc.meta_be->alloc_ctx()); - ctx->init(svc()->topic->get_be_handler()); - return svc()->topic->svc.meta_be->remove(ctx.get(), - get_topic_metadata_key(tenant, topic_name), - params, objv_tracker, y, dpp); + const DoutPrefixProvider* dpp) +{ + const RGWZoneParams& zone = svc()->zone->get_zone_params(); + const std::string key = get_topic_metadata_key(tenant, topic_name); + return rgwrados::topic::remove(dpp, y, *svc()->sysobj, svc()->mdlog, + zone, key, objv_tracker); } int RadosStore::remove_bucket_mapping_from_topics( @@ -1223,18 +1200,15 @@ int RadosStore::update_bucket_topic_mapping(const rgw_pubsub_topic& topic, bool add_mapping, optional_yield y, const DoutPrefixProvider* dpp) { - bufferlist empty_bl; - librados::ObjectWriteOperation op; + librados::Rados& rados = *getRados()->get_rados_handle(); + const RGWZoneParams& zone = svc()->zone->get_zone_params(); + const std::string key = get_topic_metadata_key(topic.user.tenant, topic.name); int ret = 0; if (add_mapping) { - std::map mapping{{bucket_key, empty_bl}}; - op.omap_set(mapping); + ret = rgwrados::topic::link_bucket(dpp, y, rados, zone, key, bucket_key); } else { - std::set to_rm{{bucket_key}}; - op.omap_rm_keys(to_rm); + ret = rgwrados::topic::unlink_bucket(dpp, y, rados, zone, key, bucket_key); } - ret = rgw_rados_operate(dpp, rados->get_topics_pool_ctx(), - get_bucket_topic_mapping_oid(topic), &op, y); if (ret < 0) { ldpp_dout(dpp, 1) << "ERROR: failed to " << (add_mapping ? "add" : "remove") << " topic bucket mapping for bucket: " << bucket_key @@ -1250,57 +1224,25 @@ int RadosStore::update_bucket_topic_mapping(const rgw_pubsub_topic& topic, int RadosStore::get_bucket_topic_mapping(const rgw_pubsub_topic& topic, std::set& bucket_keys, optional_yield y, - const DoutPrefixProvider* dpp) { - constexpr auto max_chunk = 1024U; - std::string start_after; - bool more = true; - int rval; - while (more) { - librados::ObjectReadOperation op; - std::set curr_keys; - op.omap_get_keys2(start_after, max_chunk, &curr_keys, &more, &rval); - const auto ret = - rgw_rados_operate(dpp, rados->get_topics_pool_ctx(), - get_bucket_topic_mapping_oid(topic), &op, nullptr, y); - if (ret == -ENOENT) { - // mapping object was not created - nothing to do - return 0; - } + const DoutPrefixProvider* dpp) +{ + librados::Rados& rados = *getRados()->get_rados_handle(); + const RGWZoneParams& zone = svc()->zone->get_zone_params(); + const std::string key = get_topic_metadata_key(topic.user.tenant, topic.name); + constexpr int max_chunk = 1024; + std::string marker; + + do { + int ret = rgwrados::topic::list_buckets(dpp, y, rados, zone, key, marker, + max_chunk, bucket_keys, marker); if (ret < 0) { - // TODO: do we need to check on rval as well as ret? ldpp_dout(dpp, 1) << "ERROR: failed to read bucket topic mapping object for topic: " << topic.name << ", ret= " << ret << dendl; return ret; } - if (more) { - if (curr_keys.empty()) { - return -EINVAL; // something wrong. - } - start_after = *curr_keys.rbegin(); - } - bucket_keys.merge(curr_keys); - } - return 0; -} + } while (!marker.empty()); -int RadosStore::delete_bucket_topic_mapping(const rgw_pubsub_topic& topic, - optional_yield y, - const DoutPrefixProvider* dpp) { - librados::ObjectWriteOperation op; - op.remove(); - const int ret = - rgw_rados_operate(dpp, rados->get_topics_pool_ctx(), - get_bucket_topic_mapping_oid(topic), &op, y); - if (ret < 0 && ret != -ENOENT) { - ldpp_dout(dpp, 1) - << "ERROR: failed removing bucket topic mapping omap for topic: " - << topic.name << ", ret=" << ret << dendl; - return ret; - } - ldpp_dout(dpp, 20) - << "Successfully deleted topic bucket mapping omap for topic: " - << topic.name << dendl; return 0; } diff --git a/src/rgw/driver/rados/rgw_sal_rados.h b/src/rgw/driver/rados/rgw_sal_rados.h index e15c754d1b4..1eccb89dad3 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.h +++ b/src/rgw/driver/rados/rgw_sal_rados.h @@ -170,13 +170,13 @@ class RadosStore : public StoreDriver { RGWObjVersionTracker* objv_tracker, optional_yield y, const DoutPrefixProvider* dpp) override; - int write_topic_v2(const rgw_pubsub_topic& topic, - RGWObjVersionTracker* objv_tracker, + int write_topic_v2(const rgw_pubsub_topic& topic, bool exclusive, + RGWObjVersionTracker& objv_tracker, optional_yield y, const DoutPrefixProvider* dpp) override; int remove_topic_v2(const std::string& topic_name, const std::string& tenant, - RGWObjVersionTracker* objv_tracker, + RGWObjVersionTracker& objv_tracker, optional_yield y, const DoutPrefixProvider* dpp) override; int update_bucket_topic_mapping(const rgw_pubsub_topic& topic, @@ -193,9 +193,6 @@ class RadosStore : public StoreDriver { std::set& bucket_keys, optional_yield y, const DoutPrefixProvider* dpp) override; - int delete_bucket_topic_mapping(const rgw_pubsub_topic& topic, - optional_yield y, - const DoutPrefixProvider* dpp) override; virtual RGWLC* get_rgwlc(void) override { return rados->get_lc(); } virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return rados->get_cr_registry(); } diff --git a/src/rgw/driver/rados/rgw_service.cc b/src/rgw/driver/rados/rgw_service.cc index 0c0e2bbea65..b264a7c2e77 100644 --- a/src/rgw/driver/rados/rgw_service.cc +++ b/src/rgw/driver/rados/rgw_service.cc @@ -39,6 +39,7 @@ #include "rgw_user.h" #include "rgw_role.h" #include "rgw_pubsub.h" +#include "topic.h" #define dout_subsys ceph_subsys_rgw @@ -416,6 +417,13 @@ int RGWCtlDef::init(RGWServices& svc, rgw::sal::Driver* driver, const DoutPrefix bucket_meta_handler->init(svc.bucket, bucket.get()); bi_meta_handler->init(svc.zone, svc.bucket, svc.bi); + meta.topic_cache = std::make_unique>(); + meta.topic_cache->init(svc.cache); + + meta.topic = rgwrados::topic::create_metadata_handler( + *svc.sysobj, svc.cache, *svc.mdlog, svc.zone->get_zone_params(), + *meta.topic_cache); + RGWOTPMetadataHandlerBase *otp_handler = static_cast(meta.otp.get()); otp_handler->init(svc.zone, svc.meta_be_otp, svc.otp); @@ -449,6 +457,7 @@ int RGWCtl::init(RGWServices *_svc, rgw::sal::Driver* driver, const DoutPrefixPr meta.otp = _ctl.meta.otp.get(); meta.role = _ctl.meta.role.get(); meta.topic = _ctl.meta.topic.get(); + meta.topic_cache = _ctl.meta.topic_cache.get(); user = _ctl.user.get(); bucket = _ctl.bucket.get(); diff --git a/src/rgw/driver/rados/rgw_service.h b/src/rgw/driver/rados/rgw_service.h index 03b37f1ad5a..617516b8a02 100644 --- a/src/rgw/driver/rados/rgw_service.h +++ b/src/rgw/driver/rados/rgw_service.h @@ -17,6 +17,8 @@ class RadosStore; struct RGWServices_Def; +namespace rgwrados::topic { struct cache_entry; } + class RGWServiceInstance { friend struct RGWServices_Def; @@ -186,6 +188,9 @@ class RGWUserCtl; class RGWBucketCtl; class RGWOTPCtl; +template +class RGWChainedCacheImpl; + struct RGWCtlDef { struct _meta { std::unique_ptr mgr; @@ -196,6 +201,8 @@ struct RGWCtlDef { std::unique_ptr role; std::unique_ptr topic; + std::unique_ptr> topic_cache; + _meta(); ~_meta(); } meta; @@ -225,6 +232,8 @@ struct RGWCtl { RGWMetadataHandler *otp{nullptr}; RGWMetadataHandler *role{nullptr}; RGWMetadataHandler* topic{nullptr}; + + RGWChainedCacheImpl* topic_cache{nullptr}; } meta; RGWUserCtl *user{nullptr}; diff --git a/src/rgw/driver/rados/topic.cc b/src/rgw/driver/rados/topic.cc new file mode 100644 index 00000000000..3e409e0b07b --- /dev/null +++ b/src/rgw/driver/rados/topic.cc @@ -0,0 +1,383 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +/* + * Ceph - scalable distributed file system + * + * Copyright contributors to the Ceph project + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "topic.h" +#include "common/errno.h" +#include "rgw_common.h" +#include "rgw_metadata.h" +#include "rgw_metadata_lister.h" +#include "rgw_pubsub.h" +#include "rgw_rados.h" +#include "rgw_string.h" +#include "rgw_tools.h" +#include "rgw_zone.h" +#include "svc_mdlog.h" +#include "svc_sys_obj_cache.h" + +namespace rgwrados::topic { + +static const std::string oid_prefix = "topic."; +static constexpr std::string_view buckets_oid_prefix = "buckets."; + +static rgw_raw_obj get_topic_obj(const RGWZoneParams& zone, + std::string_view metadata_key) +{ + std::string oid = string_cat_reserve(oid_prefix, metadata_key); + return {zone.topics_pool, std::move(oid)}; +} + +static rgw_raw_obj get_buckets_obj(const RGWZoneParams& zone, + std::string_view metadata_key) +{ + std::string oid = string_cat_reserve(buckets_oid_prefix, metadata_key); + return {zone.topics_pool, std::move(oid)}; +} + + +int read(const DoutPrefixProvider* dpp, optional_yield y, + RGWSI_SysObj& sysobj, RGWSI_SysObj_Cache* cache_svc, + const RGWZoneParams& zone, const std::string& topic_key, + rgw_pubsub_topic& info, RGWChainedCacheImpl& cache, + ceph::real_time* pmtime, RGWObjVersionTracker* pobjv) +{ + if (auto e = cache.find(topic_key)) { + if (pmtime) { + *pmtime = e->mtime; + } + if (pobjv) { + *pobjv = std::move(e->objv); + } + info = std::move(e->info); + return 0; + } + + const rgw_raw_obj obj = get_topic_obj(zone, topic_key); + + bufferlist bl; + cache_entry entry; + rgw_cache_entry_info cache_info; + int r = rgw_get_system_obj(&sysobj, obj.pool, obj.oid, bl, &entry.objv, + &entry.mtime, y, dpp, nullptr, &cache_info); + if (r < 0) { + return r; + } + + try { + auto p = bl.cbegin(); + decode(entry.info, p); + } catch (const buffer::error&) { + return -EIO; + } + + cache.put(dpp, cache_svc, topic_key, &entry, {&cache_info}); + + if (pmtime) { + *pmtime = entry.mtime; + } + if (pobjv) { + *pobjv = std::move(entry.objv); + } + info = std::move(entry.info); + return 0; +} + +int write(const DoutPrefixProvider* dpp, optional_yield y, + RGWSI_SysObj& sysobj, RGWSI_MDLog* mdlog, const RGWZoneParams& zone, + const rgw_pubsub_topic& info, RGWObjVersionTracker& objv, + ceph::real_time mtime, bool exclusive) +{ + const std::string topic_key = get_topic_metadata_key(info.user.tenant, info.name); + const rgw_raw_obj obj = get_topic_obj(zone, topic_key); + + bufferlist bl; + encode(info, bl); + + int r = rgw_put_system_obj(dpp, &sysobj, obj.pool, obj.oid, + bl, exclusive, &objv, mtime, y); + if (r < 0) { + ldpp_dout(dpp, 1) << "ERROR: failed to write topic obj " << obj.oid + << " with: " << cpp_strerror(r) << dendl; + return r; + } + + // record in the mdlog on success + if (mdlog) { + return mdlog->complete_entry(dpp, y, "topic", topic_key, &objv); + } + return 0; +} + +int remove(const DoutPrefixProvider* dpp, optional_yield y, + RGWSI_SysObj& sysobj, RGWSI_MDLog* mdlog, const RGWZoneParams& zone, + const std::string& topic_key, RGWObjVersionTracker& objv) +{ + // delete topic info + const rgw_raw_obj topic = get_topic_obj(zone, topic_key); + int r = rgw_delete_system_obj(dpp, &sysobj, topic.pool, topic.oid, &objv, y); + if (r < 0) { + ldpp_dout(dpp, 1) << "ERROR: failed to remove topic obj " + << topic.oid << " with: " << cpp_strerror(r) << dendl; + return r; + } + + // delete the buckets object + const rgw_raw_obj buckets = get_buckets_obj(zone, topic_key); + r = rgw_delete_system_obj(dpp, &sysobj, buckets.pool, + buckets.oid, nullptr, y); + if (r < 0) { + ldpp_dout(dpp, 20) << "WARNING: failed to remove topic buckets obj " + << buckets.oid << " with: " << cpp_strerror(r) << dendl; + } // not fatal + + // record in the mdlog on success + if (mdlog) { + return mdlog->complete_entry(dpp, y, "topic", topic_key, &objv); + } + return 0; +} + + +int link_bucket(const DoutPrefixProvider* dpp, optional_yield y, + librados::Rados& rados, const RGWZoneParams& zone, + const std::string& topic_key, + const std::string& bucket_key) +{ + const rgw_raw_obj obj = get_buckets_obj(zone, topic_key); + + rgw_rados_ref ref; + int r = rgw_get_rados_ref(dpp, &rados, obj, &ref); + if (r < 0) { + return r; + } + + librados::ObjectWriteOperation op; + op.omap_set({{bucket_key, bufferlist{}}}); + + return rgw_rados_operate(dpp, ref.ioctx, ref.obj.oid, &op, y); +} + +int unlink_bucket(const DoutPrefixProvider* dpp, optional_yield y, + librados::Rados& rados, const RGWZoneParams& zone, + const std::string& topic_key, + const std::string& bucket_key) +{ + const rgw_raw_obj obj = get_buckets_obj(zone, topic_key); + + rgw_rados_ref ref; + int r = rgw_get_rados_ref(dpp, &rados, obj, &ref); + if (r < 0) { + return r; + } + + librados::ObjectWriteOperation op; + op.omap_rm_keys({{bucket_key}}); + + return rgw_rados_operate(dpp, ref.ioctx, ref.obj.oid, &op, y); +} + +int list_buckets(const DoutPrefixProvider* dpp, optional_yield y, + librados::Rados& rados, const RGWZoneParams& zone, + const std::string& topic_key, + const std::string& marker, int max_items, + std::set& bucket_keys, + std::string& next_marker) +{ + const rgw_raw_obj obj = get_buckets_obj(zone, topic_key); + + rgw_rados_ref ref; + int r = rgw_get_rados_ref(dpp, &rados, obj, &ref); + if (r < 0) { + return r; + } + + librados::ObjectReadOperation op; + std::set keys; + bool more = false; + int rval = 0; + op.omap_get_keys2(marker, max_items, &keys, &more, &rval); + r = rgw_rados_operate(dpp, ref.ioctx, ref.obj.oid, &op, nullptr, y); + if (r == -ENOENT) { + return 0; + } + if (r < 0) { + return r; + } + if (rval < 0) { + return rval; + } + + if (more && !keys.empty()) { + next_marker = *keys.rbegin(); + } else { + next_marker.clear(); + } + bucket_keys.merge(std::move(keys)); + + return 0; +} + + +class MetadataObject : public RGWMetadataObject { + rgw_pubsub_topic info; +public: + MetadataObject(const rgw_pubsub_topic& info, const obj_version& v, real_time m) + : RGWMetadataObject(v, m), info(info) {} + + void dump(Formatter *f) const override { + info.dump(f); + } + + rgw_pubsub_topic& get_topic_info() { + return info; + } +}; + +class MetadataLister : public RGWMetadataLister { + public: + using RGWMetadataLister::RGWMetadataLister; + + virtual void filter_transform(std::vector& oids, + std::list& keys) { + // remove the oid prefix from keys + constexpr auto trim = [] (const std::string& oid) { + return oid.substr(oid_prefix.size()); + }; + std::transform(oids.begin(), oids.end(), + std::back_inserter(keys), + trim); + } +}; + +class MetadataHandler : public RGWMetadataHandler { + RGWSI_SysObj& sysobj; + RGWSI_SysObj_Cache* cache_svc; + RGWSI_MDLog& mdlog; + const RGWZoneParams& zone; + RGWChainedCacheImpl& cache; + public: + MetadataHandler(RGWSI_SysObj& sysobj, RGWSI_SysObj_Cache* cache_svc, + RGWSI_MDLog& mdlog, const RGWZoneParams& zone, + RGWChainedCacheImpl& cache) + : sysobj(sysobj), cache_svc(cache_svc), mdlog(mdlog), + zone(zone), cache(cache) + {} + + std::string get_type() final { return "topic"; } + + RGWMetadataObject* get_meta_obj(JSONObj *jo, + const obj_version& objv, + const ceph::real_time& mtime) override + { + rgw_pubsub_topic info; + + try { + info.decode_json(jo); + } catch (JSONDecoder:: err& e) { + return nullptr; + } + + return new MetadataObject(info, objv, mtime); + } + + int get(std::string& entry, RGWMetadataObject** obj, + optional_yield y, const DoutPrefixProvider* dpp) override + { + cache_entry e; + int ret = read(dpp, y, sysobj, cache_svc, zone, entry, + e.info, cache, &e.mtime, &e.objv); + if (ret < 0) { + return ret; + } + + *obj = new MetadataObject(e.info, e.objv.read_version, e.mtime); + return 0; + } + + int put(std::string& entry, RGWMetadataObject* obj, + RGWObjVersionTracker& objv_tracker, + optional_yield y, const DoutPrefixProvider* dpp, + RGWMDLogSyncType type, bool from_remote_zone) override + { + auto robj = static_cast(obj); + auto& info = robj->get_topic_info(); + auto mtime = robj->get_mtime(); + + constexpr bool exclusive = false; + int ret = write(dpp, y, sysobj, &mdlog, zone, info, + objv_tracker, mtime, exclusive); + return ret < 0 ? ret : STATUS_APPLIED; + } + + int remove(std::string& entry, RGWObjVersionTracker& objv_tracker, + optional_yield y, const DoutPrefixProvider *dpp) override + { + return topic::remove(dpp, y, sysobj, &mdlog, zone, entry, objv_tracker); + } + + int mutate(const std::string& entry, const ceph::real_time& mtime, + RGWObjVersionTracker* objv_tracker, optional_yield y, + const DoutPrefixProvider* dpp, RGWMDLogStatus op_type, + std::function f) override + { + return -ENOTSUP; // unused + } + + int list_keys_init(const DoutPrefixProvider* dpp, + const std::string& marker, + void** phandle) override + { + const auto& pool = zone.topics_pool; + auto lister = std::make_unique(sysobj.get_pool(pool)); + int ret = lister->init(dpp, marker, oid_prefix); + if (ret < 0) { + return ret; + } + *phandle = lister.release(); // release ownership + return 0; + } + + int list_keys_next(const DoutPrefixProvider* dpp, + void* handle, int max, + std::list& keys, + bool* truncated) override + { + auto lister = static_cast(handle); + return lister->get_next(dpp, max, keys, truncated); + } + + void list_keys_complete(void *handle) override + { + delete static_cast(handle); + } + + std::string get_marker(void *handle) override + { + auto lister = static_cast(handle); + return lister->get_marker(); + } +}; + + +auto create_metadata_handler(RGWSI_SysObj& sysobj, + RGWSI_SysObj_Cache* cache_svc, + RGWSI_MDLog& mdlog, const RGWZoneParams& zone, + RGWChainedCacheImpl& cache) + -> std::unique_ptr +{ + return std::make_unique(sysobj, cache_svc, mdlog, + zone, cache); +} + +} // rgwrados::topic diff --git a/src/rgw/driver/rados/topic.h b/src/rgw/driver/rados/topic.h new file mode 100644 index 00000000000..3799d001ec7 --- /dev/null +++ b/src/rgw/driver/rados/topic.h @@ -0,0 +1,94 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +/* + * Ceph - scalable distributed file system + * + * Copyright contributors to the Ceph project + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include +#include +#include +#include "include/rados/librados_fwd.hpp" +#include "common/ceph_time.h" +#include "rgw_pubsub.h" + +class DoutPrefixProvider; +class optional_yield; +class RGWMetadataHandler; +class RGWObjVersionTracker; +class RGWSI_MDLog; +class RGWSI_SysObj; +class RGWSI_SysObj_Cache; +class RGWZoneParams; + +template class RGWChainedCacheImpl; + +// Rados interface for v2 topic metadata +namespace rgwrados::topic { + +struct cache_entry { + rgw_pubsub_topic info; + RGWObjVersionTracker objv; + ceph::real_time mtime; +}; + +/// Read topic info by metadata key. +int read(const DoutPrefixProvider* dpp, optional_yield y, + RGWSI_SysObj& sysobj, RGWSI_SysObj_Cache* cache_svc, + const RGWZoneParams& zone, const std::string& topic_key, + rgw_pubsub_topic& info, RGWChainedCacheImpl& cache, + ceph::real_time* pmtime = nullptr, + RGWObjVersionTracker* pobjv = nullptr); + +/// Write or overwrite topic info. +int write(const DoutPrefixProvider* dpp, optional_yield y, + RGWSI_SysObj& sysobj, RGWSI_MDLog* mdlog, const RGWZoneParams& zone, + const rgw_pubsub_topic& info, RGWObjVersionTracker& objv, + ceph::real_time mtime, bool exclusive); + +/// Remove a topic by metadata key. +int remove(const DoutPrefixProvider* dpp, optional_yield y, + RGWSI_SysObj& sysobj, RGWSI_MDLog* mdlog, + const RGWZoneParams& zone, const std::string& topic_key, + RGWObjVersionTracker& objv); + + +/// Add a bucket key to the topic's list of buckets. +int link_bucket(const DoutPrefixProvider* dpp, optional_yield y, + librados::Rados& rados, const RGWZoneParams& zone, + const std::string& topic_key, + const std::string& bucket_key); + +/// Remove a bucket key from the topic's list of buckets. +int unlink_bucket(const DoutPrefixProvider* dpp, optional_yield y, + librados::Rados& rados, const RGWZoneParams& zone, + const std::string& topic_key, + const std::string& bucket_key); + +/// List the bucket keys associated with a given topic. +int list_buckets(const DoutPrefixProvider* dpp, optional_yield y, + librados::Rados& rados, const RGWZoneParams& zone, + const std::string& topic_key, + const std::string& marker, int max_items, + std::set& bucket_keys, + std::string& next_marker); + + +/// Topic metadata handler factory. +auto create_metadata_handler(RGWSI_SysObj& sysobj, + RGWSI_SysObj_Cache* cache_svc, + RGWSI_MDLog& mdlog, const RGWZoneParams& zone, + RGWChainedCacheImpl& cache) + -> std::unique_ptr; + +} // rgwrados::topic diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index 474a7c23163..18c604978e8 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -945,7 +945,9 @@ int RGWPubSub::create_topic(const DoutPrefixProvider* dpp, const rgw_pubsub_topic& topic, optional_yield y) const { RGWObjVersionTracker objv_tracker; - auto ret = driver->write_topic_v2(topic, &objv_tracker, y, dpp); + objv_tracker.generate_new_write_ver(dpp->get_cct()); + constexpr bool exclusive = false; + auto ret = driver->write_topic_v2(topic, exclusive, objv_tracker, y, dpp); if (ret < 0) { ldpp_dout(dpp, 1) << "ERROR: failed to write topic info: ret=" << ret << dendl; @@ -1012,13 +1014,12 @@ int RGWPubSub::remove_topic_v2(const DoutPrefixProvider* dpp, << dendl; return 0; } - ret = driver->remove_topic_v2(name, tenant, &objv_tracker, y, dpp); + ret = driver->remove_topic_v2(name, tenant, objv_tracker, y, dpp); if (ret < 0) { ldpp_dout(dpp, 1) << "ERROR: failed to remove topic info: ret=" << ret << dendl; return ret; } - ret = driver->delete_bucket_topic_mapping(topic, y, dpp); return ret; } diff --git a/src/rgw/rgw_sal.h b/src/rgw/rgw_sal.h index f1bc455835f..060cfc1e351 100644 --- a/src/rgw/rgw_sal.h +++ b/src/rgw/rgw_sal.h @@ -321,15 +321,15 @@ class Driver { RGWObjVersionTracker* objv_tracker, optional_yield y, const DoutPrefixProvider* dpp) = 0; - /** Write topic info and (optionally) @a objv_tracker into the config */ - virtual int write_topic_v2(const rgw_pubsub_topic& topic, - RGWObjVersionTracker* objv_tracker, + /** Write topic info and @a objv_tracker into the config */ + virtual int write_topic_v2(const rgw_pubsub_topic& topic, bool exclusive, + RGWObjVersionTracker& objv_tracker, optional_yield y, const DoutPrefixProvider* dpp) = 0; /** Remove the topic config, optionally a specific version */ virtual int remove_topic_v2(const std::string& topic_name, const std::string& tenant, - RGWObjVersionTracker* objv_tracker, + RGWObjVersionTracker& objv_tracker, optional_yield y, const DoutPrefixProvider* dpp) = 0; /** Update the bucket-topic mapping in the store, if |add_mapping|=true then @@ -356,10 +356,6 @@ class Driver { std::set& bucket_keys, optional_yield y, const DoutPrefixProvider* dpp) = 0; - /** Remove the bucket-topic mapping from the backend store. */ - virtual int delete_bucket_topic_mapping(const rgw_pubsub_topic& topic, - optional_yield y, - const DoutPrefixProvider* dpp) = 0; /** Get access to the lifecycle management thread */ virtual RGWLC* get_rgwlc(void) = 0; /** Get access to the coroutine registry. Used to create new coroutine managers */ diff --git a/src/rgw/rgw_sal_filter.h b/src/rgw/rgw_sal_filter.h index 8bb704ce17c..83832922f90 100644 --- a/src/rgw/rgw_sal_filter.h +++ b/src/rgw/rgw_sal_filter.h @@ -202,15 +202,15 @@ public: const DoutPrefixProvider* dpp) override { return next->read_topic_v2(topic_name, tenant, topic, objv_tracker, y, dpp); } - int write_topic_v2(const rgw_pubsub_topic& topic, - RGWObjVersionTracker* objv_tracker, + int write_topic_v2(const rgw_pubsub_topic& topic, bool exclusive, + RGWObjVersionTracker& objv_tracker, optional_yield y, const DoutPrefixProvider* dpp) override { - return next->write_topic_v2(topic, objv_tracker, y, dpp); + return next->write_topic_v2(topic, exclusive, objv_tracker, y, dpp); } int remove_topic_v2(const std::string& topic_name, const std::string& tenant, - RGWObjVersionTracker* objv_tracker, + RGWObjVersionTracker& objv_tracker, optional_yield y, const DoutPrefixProvider* dpp) override { return next->remove_topic_v2(topic_name, tenant, objv_tracker, y, dpp); @@ -237,11 +237,6 @@ public: const DoutPrefixProvider* dpp) override { return next->get_bucket_topic_mapping(topic, bucket_keys, y, dpp); } - int delete_bucket_topic_mapping(const rgw_pubsub_topic& topic, - optional_yield y, - const DoutPrefixProvider* dpp) override { - return next->delete_bucket_topic_mapping(topic, y, dpp); - } virtual RGWLC* get_rgwlc(void) override; virtual RGWCoroutinesManagerRegistry* get_cr_registry() override; diff --git a/src/rgw/rgw_sal_store.h b/src/rgw/rgw_sal_store.h index b34276a9daa..f0ac762554e 100644 --- a/src/rgw/rgw_sal_store.h +++ b/src/rgw/rgw_sal_store.h @@ -42,15 +42,15 @@ class StoreDriver : public Driver { const DoutPrefixProvider* dpp) override { return -EOPNOTSUPP; } - int write_topic_v2(const rgw_pubsub_topic& topic, - RGWObjVersionTracker* objv_tracker, + int write_topic_v2(const rgw_pubsub_topic& topic, bool exclusive, + RGWObjVersionTracker& objv_tracker, optional_yield y, const DoutPrefixProvider* dpp) override { return -EOPNOTSUPP; } int remove_topic_v2(const std::string& topic_name, const std::string& tenant, - RGWObjVersionTracker* objv_tracker, + RGWObjVersionTracker& objv_tracker, optional_yield y, const DoutPrefixProvider* dpp) override { return -EOPNOTSUPP; @@ -75,11 +75,6 @@ class StoreDriver : public Driver { const DoutPrefixProvider* dpp) override { return -EOPNOTSUPP; } - int delete_bucket_topic_mapping(const rgw_pubsub_topic& topic, - optional_yield y, - const DoutPrefixProvider* dpp) override { - return -EOPNOTSUPP; - } }; class StoreUser : public User { -- 2.39.5