driver/rados/rgw_trim_mdlog.cc
driver/rados/rgw_user.cc
driver/rados/rgw_zone.cc
- driver/rados/sync_fairness.cc)
+ driver/rados/sync_fairness.cc
+ driver/rados/topic.cc)
list(APPEND librgw_common_srcs
driver/immutable_config/store.cc
if (ret < 0)
return ret;
- ret = open_topics_pool_ctx(dpp);
- if (ret < 0)
- return ret;
-
pools_initialized = true;
if (use_gc) {
return rgw_init_ioctx(dpp, get_rados_handle(), svc.zone->get_zone_params().notif_pool, notif_pool_ctx, true, true);
}
-int RGWRados::open_topics_pool_ctx(const DoutPrefixProvider* dpp) {
- return rgw_init_ioctx(dpp, get_rados_handle(),
- svc.zone->get_zone_params().topics_pool,
- topics_pool_ctx, true, true);
-}
-
int RGWRados::open_pool_ctx(const DoutPrefixProvider *dpp, const rgw_pool& pool, librados::IoCtx& io_ctx,
bool mostly_omap, bool bulk)
{
int open_objexp_pool_ctx(const DoutPrefixProvider *dpp);
int open_reshard_pool_ctx(const DoutPrefixProvider *dpp);
int open_notif_pool_ctx(const DoutPrefixProvider *dpp);
- int open_topics_pool_ctx(const DoutPrefixProvider* dpp);
int open_pool_ctx(const DoutPrefixProvider *dpp, const rgw_pool& pool, librados::IoCtx& io_ctx,
bool mostly_omap, bool bulk);
librados::IoCtx objexp_pool_ctx;
librados::IoCtx reshard_pool_ctx;
librados::IoCtx notif_pool_ctx; // .rgw.notif
- librados::IoCtx topics_pool_ctx; // .rgw.meta:topics
bool pools_initialized{false};
librados::IoCtx& get_notif_pool_ctx() {
return notif_pool_ctx;
}
-
- librados::IoCtx& get_topics_pool_ctx() { return topics_pool_ctx; }
void set_context(CephContext *_cct) {
cct = _cct;
#include "cls/rgw/cls_rgw_client.h"
#include "rgw_pubsub.h"
+#include "topic.h"
#define dout_subsys ceph_subsys_rgw
rgw_pubsub_topic& topic,
RGWObjVersionTracker* objv_tracker,
optional_yield y,
- const DoutPrefixProvider* dpp) {
- bufferlist bl;
- auto mtime = ceph::real_clock::zero();
- RGWSI_MBSObj_GetParams params(&bl, nullptr, &mtime);
- std::unique_ptr<RGWSI_MetaBackend::Context> ctx(
- svc()->topic->svc.meta_be->alloc_ctx());
- ctx->init(svc()->topic->get_be_handler());
- const int ret = svc()->topic->svc.meta_be->get(
- ctx.get(), get_topic_metadata_key(tenant, topic_name),
- params, objv_tracker, y, dpp);
- if (ret < 0) {
- return ret;
- }
-
- auto iter = bl.cbegin();
- try {
- decode(topic, iter);
- } catch (buffer::error& err) {
- ldpp_dout(dpp, 20) << " failed to decode topic: " << topic_name
- << ". error: " << err.what() << dendl;
- return -EIO;
- }
- return 0;
+ const DoutPrefixProvider* dpp)
+{
+ const RGWZoneParams& zone = svc()->zone->get_zone_params();
+ const std::string key = get_topic_metadata_key(tenant, topic_name);
+ return rgwrados::topic::read(dpp, y, *svc()->sysobj, svc()->cache,
+ zone, key, topic, *ctl()->meta.topic_cache,
+ nullptr, objv_tracker);
}
-int RadosStore::write_topic_v2(const rgw_pubsub_topic& topic,
- RGWObjVersionTracker* objv_tracker,
+int RadosStore::write_topic_v2(const rgw_pubsub_topic& topic, bool exclusive,
+ RGWObjVersionTracker& objv_tracker,
optional_yield y,
- const DoutPrefixProvider* dpp) {
- bufferlist bl;
- encode(topic, bl);
- RGWSI_MBSObj_PutParams params(bl, nullptr, ceph::real_clock::zero(),
- /*exclusive*/ false);
- std::unique_ptr<RGWSI_MetaBackend::Context> ctx(
- svc()->topic->svc.meta_be->alloc_ctx());
- ctx->init(svc()->topic->get_be_handler());
- return svc()->topic->svc.meta_be->put(
- ctx.get(), get_topic_metadata_key(topic.user.tenant, topic.name),
- params, objv_tracker, y, dpp);
+ const DoutPrefixProvider* dpp)
+{
+ const RGWZoneParams& zone = svc()->zone->get_zone_params();
+ return rgwrados::topic::write(dpp, y, *svc()->sysobj, svc()->mdlog, zone,
+ topic, objv_tracker, {}, exclusive);
}
int RadosStore::remove_topic_v2(const std::string& topic_name,
const std::string& tenant,
- RGWObjVersionTracker* objv_tracker,
+ RGWObjVersionTracker& objv_tracker,
optional_yield y,
- const DoutPrefixProvider* dpp) {
- RGWSI_MBSObj_RemoveParams params;
- std::unique_ptr<RGWSI_MetaBackend::Context> ctx(
- svc()->topic->svc.meta_be->alloc_ctx());
- ctx->init(svc()->topic->get_be_handler());
- return svc()->topic->svc.meta_be->remove(ctx.get(),
- get_topic_metadata_key(tenant, topic_name),
- params, objv_tracker, y, dpp);
+ const DoutPrefixProvider* dpp)
+{
+ const RGWZoneParams& zone = svc()->zone->get_zone_params();
+ const std::string key = get_topic_metadata_key(tenant, topic_name);
+ return rgwrados::topic::remove(dpp, y, *svc()->sysobj, svc()->mdlog,
+ zone, key, objv_tracker);
}
int RadosStore::remove_bucket_mapping_from_topics(
bool add_mapping,
optional_yield y,
const DoutPrefixProvider* dpp) {
- bufferlist empty_bl;
- librados::ObjectWriteOperation op;
+ librados::Rados& rados = *getRados()->get_rados_handle();
+ const RGWZoneParams& zone = svc()->zone->get_zone_params();
+ const std::string key = get_topic_metadata_key(topic.user.tenant, topic.name);
int ret = 0;
if (add_mapping) {
- std::map<std::string, bufferlist> mapping{{bucket_key, empty_bl}};
- op.omap_set(mapping);
+ ret = rgwrados::topic::link_bucket(dpp, y, rados, zone, key, bucket_key);
} else {
- std::set<std::string> to_rm{{bucket_key}};
- op.omap_rm_keys(to_rm);
+ ret = rgwrados::topic::unlink_bucket(dpp, y, rados, zone, key, bucket_key);
}
- ret = rgw_rados_operate(dpp, rados->get_topics_pool_ctx(),
- get_bucket_topic_mapping_oid(topic), &op, y);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to " << (add_mapping ? "add" : "remove")
<< " topic bucket mapping for bucket: " << bucket_key
int RadosStore::get_bucket_topic_mapping(const rgw_pubsub_topic& topic,
std::set<std::string>& bucket_keys,
optional_yield y,
- const DoutPrefixProvider* dpp) {
- constexpr auto max_chunk = 1024U;
- std::string start_after;
- bool more = true;
- int rval;
- while (more) {
- librados::ObjectReadOperation op;
- std::set<std::string> curr_keys;
- op.omap_get_keys2(start_after, max_chunk, &curr_keys, &more, &rval);
- const auto ret =
- rgw_rados_operate(dpp, rados->get_topics_pool_ctx(),
- get_bucket_topic_mapping_oid(topic), &op, nullptr, y);
- if (ret == -ENOENT) {
- // mapping object was not created - nothing to do
- return 0;
- }
+ const DoutPrefixProvider* dpp)
+{
+ librados::Rados& rados = *getRados()->get_rados_handle();
+ const RGWZoneParams& zone = svc()->zone->get_zone_params();
+ const std::string key = get_topic_metadata_key(topic.user.tenant, topic.name);
+ constexpr int max_chunk = 1024;
+ std::string marker;
+
+ do {
+ int ret = rgwrados::topic::list_buckets(dpp, y, rados, zone, key, marker,
+ max_chunk, bucket_keys, marker);
if (ret < 0) {
- // TODO: do we need to check on rval as well as ret?
ldpp_dout(dpp, 1)
<< "ERROR: failed to read bucket topic mapping object for topic: "
<< topic.name << ", ret= " << ret << dendl;
return ret;
}
- if (more) {
- if (curr_keys.empty()) {
- return -EINVAL; // something wrong.
- }
- start_after = *curr_keys.rbegin();
- }
- bucket_keys.merge(curr_keys);
- }
- return 0;
-}
+ } while (!marker.empty());
-int RadosStore::delete_bucket_topic_mapping(const rgw_pubsub_topic& topic,
- optional_yield y,
- const DoutPrefixProvider* dpp) {
- librados::ObjectWriteOperation op;
- op.remove();
- const int ret =
- rgw_rados_operate(dpp, rados->get_topics_pool_ctx(),
- get_bucket_topic_mapping_oid(topic), &op, y);
- if (ret < 0 && ret != -ENOENT) {
- ldpp_dout(dpp, 1)
- << "ERROR: failed removing bucket topic mapping omap for topic: "
- << topic.name << ", ret=" << ret << dendl;
- return ret;
- }
- ldpp_dout(dpp, 20)
- << "Successfully deleted topic bucket mapping omap for topic: "
- << topic.name << dendl;
return 0;
}
RGWObjVersionTracker* objv_tracker,
optional_yield y,
const DoutPrefixProvider* dpp) override;
- int write_topic_v2(const rgw_pubsub_topic& topic,
- RGWObjVersionTracker* objv_tracker,
+ int write_topic_v2(const rgw_pubsub_topic& topic, bool exclusive,
+ RGWObjVersionTracker& objv_tracker,
optional_yield y,
const DoutPrefixProvider* dpp) override;
int remove_topic_v2(const std::string& topic_name,
const std::string& tenant,
- RGWObjVersionTracker* objv_tracker,
+ RGWObjVersionTracker& objv_tracker,
optional_yield y,
const DoutPrefixProvider* dpp) override;
int update_bucket_topic_mapping(const rgw_pubsub_topic& topic,
std::set<std::string>& bucket_keys,
optional_yield y,
const DoutPrefixProvider* dpp) override;
- int delete_bucket_topic_mapping(const rgw_pubsub_topic& topic,
- optional_yield y,
- const DoutPrefixProvider* dpp) override;
virtual RGWLC* get_rgwlc(void) override { return rados->get_lc(); }
virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return rados->get_cr_registry(); }
#include "rgw_user.h"
#include "rgw_role.h"
#include "rgw_pubsub.h"
+#include "topic.h"
#define dout_subsys ceph_subsys_rgw
bucket_meta_handler->init(svc.bucket, bucket.get());
bi_meta_handler->init(svc.zone, svc.bucket, svc.bi);
+ meta.topic_cache = std::make_unique<RGWChainedCacheImpl<rgwrados::topic::cache_entry>>();
+ meta.topic_cache->init(svc.cache);
+
+ meta.topic = rgwrados::topic::create_metadata_handler(
+ *svc.sysobj, svc.cache, *svc.mdlog, svc.zone->get_zone_params(),
+ *meta.topic_cache);
+
RGWOTPMetadataHandlerBase *otp_handler = static_cast<RGWOTPMetadataHandlerBase *>(meta.otp.get());
otp_handler->init(svc.zone, svc.meta_be_otp, svc.otp);
meta.otp = _ctl.meta.otp.get();
meta.role = _ctl.meta.role.get();
meta.topic = _ctl.meta.topic.get();
+ meta.topic_cache = _ctl.meta.topic_cache.get();
user = _ctl.user.get();
bucket = _ctl.bucket.get();
struct RGWServices_Def;
+namespace rgwrados::topic { struct cache_entry; }
+
class RGWServiceInstance
{
friend struct RGWServices_Def;
class RGWBucketCtl;
class RGWOTPCtl;
+template <class T>
+class RGWChainedCacheImpl;
+
struct RGWCtlDef {
struct _meta {
std::unique_ptr<RGWMetadataManager> mgr;
std::unique_ptr<RGWMetadataHandler> role;
std::unique_ptr<RGWMetadataHandler> topic;
+ std::unique_ptr<RGWChainedCacheImpl<rgwrados::topic::cache_entry>> topic_cache;
+
_meta();
~_meta();
} meta;
RGWMetadataHandler *otp{nullptr};
RGWMetadataHandler *role{nullptr};
RGWMetadataHandler* topic{nullptr};
+
+ RGWChainedCacheImpl<rgwrados::topic::cache_entry>* topic_cache{nullptr};
} meta;
RGWUserCtl *user{nullptr};
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright contributors to the Ceph project
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "topic.h"
+#include "common/errno.h"
+#include "rgw_common.h"
+#include "rgw_metadata.h"
+#include "rgw_metadata_lister.h"
+#include "rgw_pubsub.h"
+#include "rgw_rados.h"
+#include "rgw_string.h"
+#include "rgw_tools.h"
+#include "rgw_zone.h"
+#include "svc_mdlog.h"
+#include "svc_sys_obj_cache.h"
+
+namespace rgwrados::topic {
+
+static const std::string oid_prefix = "topic.";
+static constexpr std::string_view buckets_oid_prefix = "buckets.";
+
+static rgw_raw_obj get_topic_obj(const RGWZoneParams& zone,
+ std::string_view metadata_key)
+{
+ std::string oid = string_cat_reserve(oid_prefix, metadata_key);
+ return {zone.topics_pool, std::move(oid)};
+}
+
+static rgw_raw_obj get_buckets_obj(const RGWZoneParams& zone,
+ std::string_view metadata_key)
+{
+ std::string oid = string_cat_reserve(buckets_oid_prefix, metadata_key);
+ return {zone.topics_pool, std::move(oid)};
+}
+
+
+int read(const DoutPrefixProvider* dpp, optional_yield y,
+ RGWSI_SysObj& sysobj, RGWSI_SysObj_Cache* cache_svc,
+ const RGWZoneParams& zone, const std::string& topic_key,
+ rgw_pubsub_topic& info, RGWChainedCacheImpl<cache_entry>& cache,
+ ceph::real_time* pmtime, RGWObjVersionTracker* pobjv)
+{
+ if (auto e = cache.find(topic_key)) {
+ if (pmtime) {
+ *pmtime = e->mtime;
+ }
+ if (pobjv) {
+ *pobjv = std::move(e->objv);
+ }
+ info = std::move(e->info);
+ return 0;
+ }
+
+ const rgw_raw_obj obj = get_topic_obj(zone, topic_key);
+
+ bufferlist bl;
+ cache_entry entry;
+ rgw_cache_entry_info cache_info;
+ int r = rgw_get_system_obj(&sysobj, obj.pool, obj.oid, bl, &entry.objv,
+ &entry.mtime, y, dpp, nullptr, &cache_info);
+ if (r < 0) {
+ return r;
+ }
+
+ try {
+ auto p = bl.cbegin();
+ decode(entry.info, p);
+ } catch (const buffer::error&) {
+ return -EIO;
+ }
+
+ cache.put(dpp, cache_svc, topic_key, &entry, {&cache_info});
+
+ if (pmtime) {
+ *pmtime = entry.mtime;
+ }
+ if (pobjv) {
+ *pobjv = std::move(entry.objv);
+ }
+ info = std::move(entry.info);
+ return 0;
+}
+
+int write(const DoutPrefixProvider* dpp, optional_yield y,
+ RGWSI_SysObj& sysobj, RGWSI_MDLog* mdlog, const RGWZoneParams& zone,
+ const rgw_pubsub_topic& info, RGWObjVersionTracker& objv,
+ ceph::real_time mtime, bool exclusive)
+{
+ const std::string topic_key = get_topic_metadata_key(info.user.tenant, info.name);
+ const rgw_raw_obj obj = get_topic_obj(zone, topic_key);
+
+ bufferlist bl;
+ encode(info, bl);
+
+ int r = rgw_put_system_obj(dpp, &sysobj, obj.pool, obj.oid,
+ bl, exclusive, &objv, mtime, y);
+ if (r < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to write topic obj " << obj.oid
+ << " with: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ // record in the mdlog on success
+ if (mdlog) {
+ return mdlog->complete_entry(dpp, y, "topic", topic_key, &objv);
+ }
+ return 0;
+}
+
+int remove(const DoutPrefixProvider* dpp, optional_yield y,
+ RGWSI_SysObj& sysobj, RGWSI_MDLog* mdlog, const RGWZoneParams& zone,
+ const std::string& topic_key, RGWObjVersionTracker& objv)
+{
+ // delete topic info
+ const rgw_raw_obj topic = get_topic_obj(zone, topic_key);
+ int r = rgw_delete_system_obj(dpp, &sysobj, topic.pool, topic.oid, &objv, y);
+ if (r < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to remove topic obj "
+ << topic.oid << " with: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ // delete the buckets object
+ const rgw_raw_obj buckets = get_buckets_obj(zone, topic_key);
+ r = rgw_delete_system_obj(dpp, &sysobj, buckets.pool,
+ buckets.oid, nullptr, y);
+ if (r < 0) {
+ ldpp_dout(dpp, 20) << "WARNING: failed to remove topic buckets obj "
+ << buckets.oid << " with: " << cpp_strerror(r) << dendl;
+ } // not fatal
+
+ // record in the mdlog on success
+ if (mdlog) {
+ return mdlog->complete_entry(dpp, y, "topic", topic_key, &objv);
+ }
+ return 0;
+}
+
+
+int link_bucket(const DoutPrefixProvider* dpp, optional_yield y,
+ librados::Rados& rados, const RGWZoneParams& zone,
+ const std::string& topic_key,
+ const std::string& bucket_key)
+{
+ const rgw_raw_obj obj = get_buckets_obj(zone, topic_key);
+
+ rgw_rados_ref ref;
+ int r = rgw_get_rados_ref(dpp, &rados, obj, &ref);
+ if (r < 0) {
+ return r;
+ }
+
+ librados::ObjectWriteOperation op;
+ op.omap_set({{bucket_key, bufferlist{}}});
+
+ return rgw_rados_operate(dpp, ref.ioctx, ref.obj.oid, &op, y);
+}
+
+int unlink_bucket(const DoutPrefixProvider* dpp, optional_yield y,
+ librados::Rados& rados, const RGWZoneParams& zone,
+ const std::string& topic_key,
+ const std::string& bucket_key)
+{
+ const rgw_raw_obj obj = get_buckets_obj(zone, topic_key);
+
+ rgw_rados_ref ref;
+ int r = rgw_get_rados_ref(dpp, &rados, obj, &ref);
+ if (r < 0) {
+ return r;
+ }
+
+ librados::ObjectWriteOperation op;
+ op.omap_rm_keys({{bucket_key}});
+
+ return rgw_rados_operate(dpp, ref.ioctx, ref.obj.oid, &op, y);
+}
+
+int list_buckets(const DoutPrefixProvider* dpp, optional_yield y,
+ librados::Rados& rados, const RGWZoneParams& zone,
+ const std::string& topic_key,
+ const std::string& marker, int max_items,
+ std::set<std::string>& bucket_keys,
+ std::string& next_marker)
+{
+ const rgw_raw_obj obj = get_buckets_obj(zone, topic_key);
+
+ rgw_rados_ref ref;
+ int r = rgw_get_rados_ref(dpp, &rados, obj, &ref);
+ if (r < 0) {
+ return r;
+ }
+
+ librados::ObjectReadOperation op;
+ std::set<std::string> keys;
+ bool more = false;
+ int rval = 0;
+ op.omap_get_keys2(marker, max_items, &keys, &more, &rval);
+ r = rgw_rados_operate(dpp, ref.ioctx, ref.obj.oid, &op, nullptr, y);
+ if (r == -ENOENT) {
+ return 0;
+ }
+ if (r < 0) {
+ return r;
+ }
+ if (rval < 0) {
+ return rval;
+ }
+
+ if (more && !keys.empty()) {
+ next_marker = *keys.rbegin();
+ } else {
+ next_marker.clear();
+ }
+ bucket_keys.merge(std::move(keys));
+
+ return 0;
+}
+
+
+class MetadataObject : public RGWMetadataObject {
+ rgw_pubsub_topic info;
+public:
+ MetadataObject(const rgw_pubsub_topic& info, const obj_version& v, real_time m)
+ : RGWMetadataObject(v, m), info(info) {}
+
+ void dump(Formatter *f) const override {
+ info.dump(f);
+ }
+
+ rgw_pubsub_topic& get_topic_info() {
+ return info;
+ }
+};
+
+class MetadataLister : public RGWMetadataLister {
+ public:
+ using RGWMetadataLister::RGWMetadataLister;
+
+ virtual void filter_transform(std::vector<std::string>& oids,
+ std::list<std::string>& keys) {
+ // remove the oid prefix from keys
+ constexpr auto trim = [] (const std::string& oid) {
+ return oid.substr(oid_prefix.size());
+ };
+ std::transform(oids.begin(), oids.end(),
+ std::back_inserter(keys),
+ trim);
+ }
+};
+
+class MetadataHandler : public RGWMetadataHandler {
+ RGWSI_SysObj& sysobj;
+ RGWSI_SysObj_Cache* cache_svc;
+ RGWSI_MDLog& mdlog;
+ const RGWZoneParams& zone;
+ RGWChainedCacheImpl<cache_entry>& cache;
+ public:
+ MetadataHandler(RGWSI_SysObj& sysobj, RGWSI_SysObj_Cache* cache_svc,
+ RGWSI_MDLog& mdlog, const RGWZoneParams& zone,
+ RGWChainedCacheImpl<cache_entry>& cache)
+ : sysobj(sysobj), cache_svc(cache_svc), mdlog(mdlog),
+ zone(zone), cache(cache)
+ {}
+
+ std::string get_type() final { return "topic"; }
+
+ RGWMetadataObject* get_meta_obj(JSONObj *jo,
+ const obj_version& objv,
+ const ceph::real_time& mtime) override
+ {
+ rgw_pubsub_topic info;
+
+ try {
+ info.decode_json(jo);
+ } catch (JSONDecoder:: err& e) {
+ return nullptr;
+ }
+
+ return new MetadataObject(info, objv, mtime);
+ }
+
+ int get(std::string& entry, RGWMetadataObject** obj,
+ optional_yield y, const DoutPrefixProvider* dpp) override
+ {
+ cache_entry e;
+ int ret = read(dpp, y, sysobj, cache_svc, zone, entry,
+ e.info, cache, &e.mtime, &e.objv);
+ if (ret < 0) {
+ return ret;
+ }
+
+ *obj = new MetadataObject(e.info, e.objv.read_version, e.mtime);
+ return 0;
+ }
+
+ int put(std::string& entry, RGWMetadataObject* obj,
+ RGWObjVersionTracker& objv_tracker,
+ optional_yield y, const DoutPrefixProvider* dpp,
+ RGWMDLogSyncType type, bool from_remote_zone) override
+ {
+ auto robj = static_cast<MetadataObject*>(obj);
+ auto& info = robj->get_topic_info();
+ auto mtime = robj->get_mtime();
+
+ constexpr bool exclusive = false;
+ int ret = write(dpp, y, sysobj, &mdlog, zone, info,
+ objv_tracker, mtime, exclusive);
+ return ret < 0 ? ret : STATUS_APPLIED;
+ }
+
+ int remove(std::string& entry, RGWObjVersionTracker& objv_tracker,
+ optional_yield y, const DoutPrefixProvider *dpp) override
+ {
+ return topic::remove(dpp, y, sysobj, &mdlog, zone, entry, objv_tracker);
+ }
+
+ int mutate(const std::string& entry, const ceph::real_time& mtime,
+ RGWObjVersionTracker* objv_tracker, optional_yield y,
+ const DoutPrefixProvider* dpp, RGWMDLogStatus op_type,
+ std::function<int()> f) override
+ {
+ return -ENOTSUP; // unused
+ }
+
+ int list_keys_init(const DoutPrefixProvider* dpp,
+ const std::string& marker,
+ void** phandle) override
+ {
+ const auto& pool = zone.topics_pool;
+ auto lister = std::make_unique<MetadataLister>(sysobj.get_pool(pool));
+ int ret = lister->init(dpp, marker, oid_prefix);
+ if (ret < 0) {
+ return ret;
+ }
+ *phandle = lister.release(); // release ownership
+ return 0;
+ }
+
+ int list_keys_next(const DoutPrefixProvider* dpp,
+ void* handle, int max,
+ std::list<std::string>& keys,
+ bool* truncated) override
+ {
+ auto lister = static_cast<RGWMetadataLister*>(handle);
+ return lister->get_next(dpp, max, keys, truncated);
+ }
+
+ void list_keys_complete(void *handle) override
+ {
+ delete static_cast<RGWMetadataLister*>(handle);
+ }
+
+ std::string get_marker(void *handle) override
+ {
+ auto lister = static_cast<RGWMetadataLister*>(handle);
+ return lister->get_marker();
+ }
+};
+
+
+auto create_metadata_handler(RGWSI_SysObj& sysobj,
+ RGWSI_SysObj_Cache* cache_svc,
+ RGWSI_MDLog& mdlog, const RGWZoneParams& zone,
+ RGWChainedCacheImpl<cache_entry>& cache)
+ -> std::unique_ptr<RGWMetadataHandler>
+{
+ return std::make_unique<MetadataHandler>(sysobj, cache_svc, mdlog,
+ zone, cache);
+}
+
+} // rgwrados::topic
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright contributors to the Ceph project
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <memory>
+#include <set>
+#include <string>
+#include "include/rados/librados_fwd.hpp"
+#include "common/ceph_time.h"
+#include "rgw_pubsub.h"
+
+class DoutPrefixProvider;
+class optional_yield;
+class RGWMetadataHandler;
+class RGWObjVersionTracker;
+class RGWSI_MDLog;
+class RGWSI_SysObj;
+class RGWSI_SysObj_Cache;
+class RGWZoneParams;
+
+template <typename T> class RGWChainedCacheImpl;
+
+// Rados interface for v2 topic metadata
+namespace rgwrados::topic {
+
+struct cache_entry {
+ rgw_pubsub_topic info;
+ RGWObjVersionTracker objv;
+ ceph::real_time mtime;
+};
+
+/// Read topic info by metadata key.
+int read(const DoutPrefixProvider* dpp, optional_yield y,
+ RGWSI_SysObj& sysobj, RGWSI_SysObj_Cache* cache_svc,
+ const RGWZoneParams& zone, const std::string& topic_key,
+ rgw_pubsub_topic& info, RGWChainedCacheImpl<cache_entry>& cache,
+ ceph::real_time* pmtime = nullptr,
+ RGWObjVersionTracker* pobjv = nullptr);
+
+/// Write or overwrite topic info.
+int write(const DoutPrefixProvider* dpp, optional_yield y,
+ RGWSI_SysObj& sysobj, RGWSI_MDLog* mdlog, const RGWZoneParams& zone,
+ const rgw_pubsub_topic& info, RGWObjVersionTracker& objv,
+ ceph::real_time mtime, bool exclusive);
+
+/// Remove a topic by metadata key.
+int remove(const DoutPrefixProvider* dpp, optional_yield y,
+ RGWSI_SysObj& sysobj, RGWSI_MDLog* mdlog,
+ const RGWZoneParams& zone, const std::string& topic_key,
+ RGWObjVersionTracker& objv);
+
+
+/// Add a bucket key to the topic's list of buckets.
+int link_bucket(const DoutPrefixProvider* dpp, optional_yield y,
+ librados::Rados& rados, const RGWZoneParams& zone,
+ const std::string& topic_key,
+ const std::string& bucket_key);
+
+/// Remove a bucket key from the topic's list of buckets.
+int unlink_bucket(const DoutPrefixProvider* dpp, optional_yield y,
+ librados::Rados& rados, const RGWZoneParams& zone,
+ const std::string& topic_key,
+ const std::string& bucket_key);
+
+/// List the bucket keys associated with a given topic.
+int list_buckets(const DoutPrefixProvider* dpp, optional_yield y,
+ librados::Rados& rados, const RGWZoneParams& zone,
+ const std::string& topic_key,
+ const std::string& marker, int max_items,
+ std::set<std::string>& bucket_keys,
+ std::string& next_marker);
+
+
+/// Topic metadata handler factory.
+auto create_metadata_handler(RGWSI_SysObj& sysobj,
+ RGWSI_SysObj_Cache* cache_svc,
+ RGWSI_MDLog& mdlog, const RGWZoneParams& zone,
+ RGWChainedCacheImpl<cache_entry>& cache)
+ -> std::unique_ptr<RGWMetadataHandler>;
+
+} // rgwrados::topic
const rgw_pubsub_topic& topic,
optional_yield y) const {
RGWObjVersionTracker objv_tracker;
- auto ret = driver->write_topic_v2(topic, &objv_tracker, y, dpp);
+ objv_tracker.generate_new_write_ver(dpp->get_cct());
+ constexpr bool exclusive = false;
+ auto ret = driver->write_topic_v2(topic, exclusive, objv_tracker, y, dpp);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to write topic info: ret=" << ret
<< dendl;
<< dendl;
return 0;
}
- ret = driver->remove_topic_v2(name, tenant, &objv_tracker, y, dpp);
+ ret = driver->remove_topic_v2(name, tenant, objv_tracker, y, dpp);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to remove topic info: ret=" << ret
<< dendl;
return ret;
}
- ret = driver->delete_bucket_topic_mapping(topic, y, dpp);
return ret;
}
RGWObjVersionTracker* objv_tracker,
optional_yield y,
const DoutPrefixProvider* dpp) = 0;
- /** Write topic info and (optionally) @a objv_tracker into the config */
- virtual int write_topic_v2(const rgw_pubsub_topic& topic,
- RGWObjVersionTracker* objv_tracker,
+ /** Write topic info and @a objv_tracker into the config */
+ virtual int write_topic_v2(const rgw_pubsub_topic& topic, bool exclusive,
+ RGWObjVersionTracker& objv_tracker,
optional_yield y,
const DoutPrefixProvider* dpp) = 0;
/** Remove the topic config, optionally a specific version */
virtual int remove_topic_v2(const std::string& topic_name,
const std::string& tenant,
- RGWObjVersionTracker* objv_tracker,
+ RGWObjVersionTracker& objv_tracker,
optional_yield y,
const DoutPrefixProvider* dpp) = 0;
/** Update the bucket-topic mapping in the store, if |add_mapping|=true then
std::set<std::string>& bucket_keys,
optional_yield y,
const DoutPrefixProvider* dpp) = 0;
- /** Remove the bucket-topic mapping from the backend store. */
- virtual int delete_bucket_topic_mapping(const rgw_pubsub_topic& topic,
- optional_yield y,
- const DoutPrefixProvider* dpp) = 0;
/** Get access to the lifecycle management thread */
virtual RGWLC* get_rgwlc(void) = 0;
/** Get access to the coroutine registry. Used to create new coroutine managers */
const DoutPrefixProvider* dpp) override {
return next->read_topic_v2(topic_name, tenant, topic, objv_tracker, y, dpp);
}
- int write_topic_v2(const rgw_pubsub_topic& topic,
- RGWObjVersionTracker* objv_tracker,
+ int write_topic_v2(const rgw_pubsub_topic& topic, bool exclusive,
+ RGWObjVersionTracker& objv_tracker,
optional_yield y,
const DoutPrefixProvider* dpp) override {
- return next->write_topic_v2(topic, objv_tracker, y, dpp);
+ return next->write_topic_v2(topic, exclusive, objv_tracker, y, dpp);
}
int remove_topic_v2(const std::string& topic_name,
const std::string& tenant,
- RGWObjVersionTracker* objv_tracker,
+ RGWObjVersionTracker& objv_tracker,
optional_yield y,
const DoutPrefixProvider* dpp) override {
return next->remove_topic_v2(topic_name, tenant, objv_tracker, y, dpp);
const DoutPrefixProvider* dpp) override {
return next->get_bucket_topic_mapping(topic, bucket_keys, y, dpp);
}
- int delete_bucket_topic_mapping(const rgw_pubsub_topic& topic,
- optional_yield y,
- const DoutPrefixProvider* dpp) override {
- return next->delete_bucket_topic_mapping(topic, y, dpp);
- }
virtual RGWLC* get_rgwlc(void) override;
virtual RGWCoroutinesManagerRegistry* get_cr_registry() override;
const DoutPrefixProvider* dpp) override {
return -EOPNOTSUPP;
}
- int write_topic_v2(const rgw_pubsub_topic& topic,
- RGWObjVersionTracker* objv_tracker,
+ int write_topic_v2(const rgw_pubsub_topic& topic, bool exclusive,
+ RGWObjVersionTracker& objv_tracker,
optional_yield y,
const DoutPrefixProvider* dpp) override {
return -EOPNOTSUPP;
}
int remove_topic_v2(const std::string& topic_name,
const std::string& tenant,
- RGWObjVersionTracker* objv_tracker,
+ RGWObjVersionTracker& objv_tracker,
optional_yield y,
const DoutPrefixProvider* dpp) override {
return -EOPNOTSUPP;
const DoutPrefixProvider* dpp) override {
return -EOPNOTSUPP;
}
- int delete_bucket_topic_mapping(const rgw_pubsub_topic& topic,
- optional_yield y,
- const DoutPrefixProvider* dpp) override {
- return -EOPNOTSUPP;
- }
};
class StoreUser : public User {