]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/topic: add rgwrados::topic interface for topic metadata
authorCasey Bodley <cbodley@redhat.com>
Wed, 10 Jan 2024 20:33:25 +0000 (15:33 -0500)
committerCasey Bodley <cbodley@redhat.com>
Wed, 10 Apr 2024 13:18:06 +0000 (09:18 -0400)
add a new interface for topic metadata that doesn't depend on metadata
backends. this low-level interface is used by both RadosStore and the
topic metadata handler

remove Driver::delete_bucket_topic_mapping() from sal because the omap
object is deleted internally by rgwrados::topic::remove()

remove the RGWRados::topics_pool_ctx member

Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit 6d1d036afb2d1624674fef43f2e70ef3b3ae2859)

13 files changed:
src/rgw/CMakeLists.txt
src/rgw/driver/rados/rgw_rados.cc
src/rgw/driver/rados/rgw_rados.h
src/rgw/driver/rados/rgw_sal_rados.cc
src/rgw/driver/rados/rgw_sal_rados.h
src/rgw/driver/rados/rgw_service.cc
src/rgw/driver/rados/rgw_service.h
src/rgw/driver/rados/topic.cc [new file with mode: 0644]
src/rgw/driver/rados/topic.h [new file with mode: 0644]
src/rgw/rgw_pubsub.cc
src/rgw/rgw_sal.h
src/rgw/rgw_sal_filter.h
src/rgw/rgw_sal_store.h

index 2987b70b3826f4fb372a97d40e5cd2890673f324..031fc47bfc296aaed100d26aff6e9634796fce20 100644 (file)
@@ -194,7 +194,8 @@ set(librgw_common_srcs
   driver/rados/rgw_trim_mdlog.cc
   driver/rados/rgw_user.cc
   driver/rados/rgw_zone.cc
-  driver/rados/sync_fairness.cc)
+  driver/rados/sync_fairness.cc
+  driver/rados/topic.cc)
 
 list(APPEND librgw_common_srcs
   driver/immutable_config/store.cc
index 8552c29cf476f0a992e062882cab91b8c500b612..b74f2d0798d51c6192a90904770b39bc22111e55 100644 (file)
@@ -1212,10 +1212,6 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y)
   if (ret < 0)
     return ret;
 
-  ret = open_topics_pool_ctx(dpp);
-  if (ret < 0)
-    return ret;
-
   pools_initialized = true;
 
   if (use_gc) {
@@ -1447,12 +1443,6 @@ int RGWRados::open_notif_pool_ctx(const DoutPrefixProvider *dpp)
   return rgw_init_ioctx(dpp, get_rados_handle(), svc.zone->get_zone_params().notif_pool, notif_pool_ctx, true, true);
 }
 
-int RGWRados::open_topics_pool_ctx(const DoutPrefixProvider* dpp) {
-  return rgw_init_ioctx(dpp, get_rados_handle(),
-                        svc.zone->get_zone_params().topics_pool,
-                        topics_pool_ctx, true, true);
-}
-
 int RGWRados::open_pool_ctx(const DoutPrefixProvider *dpp, const rgw_pool& pool, librados::IoCtx& io_ctx,
                            bool mostly_omap, bool bulk)
 {
index f43b1b4e5314416d14148e58e7b76b79ba7e100e..3d7776b0fa0d04c087e95b2eb9fdad9f849806dd 100644 (file)
@@ -359,7 +359,6 @@ class RGWRados
   int open_objexp_pool_ctx(const DoutPrefixProvider *dpp);
   int open_reshard_pool_ctx(const DoutPrefixProvider *dpp);
   int open_notif_pool_ctx(const DoutPrefixProvider *dpp);
-  int open_topics_pool_ctx(const DoutPrefixProvider* dpp);
 
   int open_pool_ctx(const DoutPrefixProvider *dpp, const rgw_pool& pool, librados::IoCtx&  io_ctx,
                    bool mostly_omap, bool bulk);
@@ -449,7 +448,6 @@ protected:
   librados::IoCtx objexp_pool_ctx;
   librados::IoCtx reshard_pool_ctx;
   librados::IoCtx notif_pool_ctx;     // .rgw.notif
-  librados::IoCtx topics_pool_ctx;  // .rgw.meta:topics
 
   bool pools_initialized{false};
 
@@ -535,8 +533,6 @@ public:
   librados::IoCtx& get_notif_pool_ctx() {
     return notif_pool_ctx;
   }
-
-  librados::IoCtx& get_topics_pool_ctx() { return topics_pool_ctx; }
   
   void set_context(CephContext *_cct) {
     cct = _cct;
index 0e6cbf96aea852689ea238776ae045f16324b1b0..dcec859cc498e66bc820fab4f6251e16e8aa902d 100644 (file)
@@ -66,6 +66,7 @@
 #include "cls/rgw/cls_rgw_client.h"
 
 #include "rgw_pubsub.h"
+#include "topic.h"
 
 #define dout_subsys ceph_subsys_rgw
 
@@ -1141,59 +1142,35 @@ int RadosStore::read_topic_v2(const std::string& topic_name,
                               rgw_pubsub_topic& topic,
                               RGWObjVersionTracker* objv_tracker,
                               optional_yield y,
-                              const DoutPrefixProvider* dpp) {
-  bufferlist bl;
-  auto mtime = ceph::real_clock::zero();
-  RGWSI_MBSObj_GetParams params(&bl, nullptr, &mtime);
-  std::unique_ptr<RGWSI_MetaBackend::Context> ctx(
-      svc()->topic->svc.meta_be->alloc_ctx());
-  ctx->init(svc()->topic->get_be_handler());
-  const int ret = svc()->topic->svc.meta_be->get(
-      ctx.get(), get_topic_metadata_key(tenant, topic_name),
-      params, objv_tracker, y, dpp);
-  if (ret < 0) {
-    return ret;
-  }
-
-  auto iter = bl.cbegin();
-  try {
-    decode(topic, iter);
-  } catch (buffer::error& err) {
-    ldpp_dout(dpp, 20) << " failed to decode topic: " << topic_name
-                       << ". error: " << err.what() << dendl;
-    return -EIO;
-  }
-  return 0;
+                              const DoutPrefixProvider* dpp)
+{
+  const RGWZoneParams& zone = svc()->zone->get_zone_params();
+  const std::string key = get_topic_metadata_key(tenant, topic_name);
+  return rgwrados::topic::read(dpp, y, *svc()->sysobj, svc()->cache,
+                               zone, key, topic, *ctl()->meta.topic_cache,
+                               nullptr, objv_tracker);
 }
 
-int RadosStore::write_topic_v2(const rgw_pubsub_topic& topic,
-                               RGWObjVersionTracker* objv_tracker,
+int RadosStore::write_topic_v2(const rgw_pubsub_topic& topic, bool exclusive,
+                               RGWObjVersionTracker& objv_tracker,
                                optional_yield y,
-                               const DoutPrefixProvider* dpp) {
-  bufferlist bl;
-  encode(topic, bl);
-  RGWSI_MBSObj_PutParams params(bl, nullptr, ceph::real_clock::zero(),
-                                /*exclusive*/ false);
-  std::unique_ptr<RGWSI_MetaBackend::Context> ctx(
-      svc()->topic->svc.meta_be->alloc_ctx());
-  ctx->init(svc()->topic->get_be_handler());
-  return svc()->topic->svc.meta_be->put(
-      ctx.get(), get_topic_metadata_key(topic.user.tenant, topic.name),
-      params, objv_tracker, y, dpp);
+                               const DoutPrefixProvider* dpp)
+{
+  const RGWZoneParams& zone = svc()->zone->get_zone_params();
+  return rgwrados::topic::write(dpp, y, *svc()->sysobj, svc()->mdlog, zone,
+                                topic, objv_tracker, {}, exclusive);
 }
 
 int RadosStore::remove_topic_v2(const std::string& topic_name,
                                 const std::string& tenant,
-                                RGWObjVersionTracker* objv_tracker,
+                                RGWObjVersionTracker& objv_tracker,
                                 optional_yield y,
-                                const DoutPrefixProvider* dpp) {
-  RGWSI_MBSObj_RemoveParams params;
-  std::unique_ptr<RGWSI_MetaBackend::Context> ctx(
-      svc()->topic->svc.meta_be->alloc_ctx());
-  ctx->init(svc()->topic->get_be_handler());
-  return svc()->topic->svc.meta_be->remove(ctx.get(),
-                                           get_topic_metadata_key(tenant, topic_name),
-                                           params, objv_tracker, y, dpp);
+                                const DoutPrefixProvider* dpp)
+{
+  const RGWZoneParams& zone = svc()->zone->get_zone_params();
+  const std::string key = get_topic_metadata_key(tenant, topic_name);
+  return rgwrados::topic::remove(dpp, y, *svc()->sysobj, svc()->mdlog,
+                                 zone, key, objv_tracker);
 }
 
 int RadosStore::remove_bucket_mapping_from_topics(
@@ -1223,18 +1200,15 @@ int RadosStore::update_bucket_topic_mapping(const rgw_pubsub_topic& topic,
                                             bool add_mapping,
                                             optional_yield y,
                                             const DoutPrefixProvider* dpp) {
-  bufferlist empty_bl;
-  librados::ObjectWriteOperation op;
+  librados::Rados& rados = *getRados()->get_rados_handle();
+  const RGWZoneParams& zone = svc()->zone->get_zone_params();
+  const std::string key = get_topic_metadata_key(topic.user.tenant, topic.name);
   int ret = 0;
   if (add_mapping) {
-    std::map<std::string, bufferlist> mapping{{bucket_key, empty_bl}};
-    op.omap_set(mapping);
+    ret = rgwrados::topic::link_bucket(dpp, y, rados, zone, key, bucket_key);
   } else {
-    std::set<std::string> to_rm{{bucket_key}};
-    op.omap_rm_keys(to_rm);
+    ret = rgwrados::topic::unlink_bucket(dpp, y, rados, zone, key, bucket_key);
   }
-  ret = rgw_rados_operate(dpp, rados->get_topics_pool_ctx(),
-                          get_bucket_topic_mapping_oid(topic), &op, y);
   if (ret < 0) {
     ldpp_dout(dpp, 1) << "ERROR: failed to " << (add_mapping ? "add" : "remove")
                       << " topic bucket mapping for bucket: " << bucket_key
@@ -1250,57 +1224,25 @@ int RadosStore::update_bucket_topic_mapping(const rgw_pubsub_topic& topic,
 int RadosStore::get_bucket_topic_mapping(const rgw_pubsub_topic& topic,
                                          std::set<std::string>& bucket_keys,
                                          optional_yield y,
-                                         const DoutPrefixProvider* dpp) {
-  constexpr auto max_chunk = 1024U;
-  std::string start_after;
-  bool more = true;
-  int rval;
-  while (more) {
-    librados::ObjectReadOperation op;
-    std::set<std::string> curr_keys;
-    op.omap_get_keys2(start_after, max_chunk, &curr_keys, &more, &rval);
-    const auto ret =
-        rgw_rados_operate(dpp, rados->get_topics_pool_ctx(),
-                          get_bucket_topic_mapping_oid(topic), &op, nullptr, y);
-    if (ret == -ENOENT) {
-      // mapping object was not created - nothing to do
-      return 0;
-    }
+                                         const DoutPrefixProvider* dpp)
+{
+  librados::Rados& rados = *getRados()->get_rados_handle();
+  const RGWZoneParams& zone = svc()->zone->get_zone_params();
+  const std::string key = get_topic_metadata_key(topic.user.tenant, topic.name);
+  constexpr int max_chunk = 1024;
+  std::string marker;
+
+  do {
+    int ret = rgwrados::topic::list_buckets(dpp, y, rados, zone, key, marker,
+                                            max_chunk, bucket_keys, marker);
     if (ret < 0) {
-      // TODO: do we need to check on rval as well as ret?
       ldpp_dout(dpp, 1)
           << "ERROR: failed to read bucket topic mapping object for topic: "
           << topic.name << ", ret= " << ret << dendl;
       return ret;
     }
-    if (more) {
-      if (curr_keys.empty()) {
-        return -EINVAL;  // something wrong.
-      }
-      start_after = *curr_keys.rbegin();
-    }
-    bucket_keys.merge(curr_keys);
-  }
-  return 0;
-}
+  } while (!marker.empty());
 
-int RadosStore::delete_bucket_topic_mapping(const rgw_pubsub_topic& topic,
-                                            optional_yield y,
-                                            const DoutPrefixProvider* dpp) {
-  librados::ObjectWriteOperation op;
-  op.remove();
-  const int ret =
-      rgw_rados_operate(dpp, rados->get_topics_pool_ctx(),
-                        get_bucket_topic_mapping_oid(topic), &op, y);
-  if (ret < 0 && ret != -ENOENT) {
-    ldpp_dout(dpp, 1)
-        << "ERROR: failed removing bucket topic mapping omap for topic: "
-        << topic.name << ", ret=" << ret << dendl;
-    return ret;
-  }
-  ldpp_dout(dpp, 20)
-      << "Successfully deleted topic bucket mapping omap for topic: "
-      << topic.name << dendl;
   return 0;
 }
 
index e15c754d1b491e73cf30d02560f6372a2bd2cd75..1eccb89dad33a858e0372aff351d3b850cc0bcbc 100644 (file)
@@ -170,13 +170,13 @@ class RadosStore : public StoreDriver {
                       RGWObjVersionTracker* objv_tracker,
                       optional_yield y,
                       const DoutPrefixProvider* dpp) override;
-    int write_topic_v2(const rgw_pubsub_topic& topic,
-                       RGWObjVersionTracker* objv_tracker,
+    int write_topic_v2(const rgw_pubsub_topic& topic, bool exclusive,
+                       RGWObjVersionTracker& objv_tracker,
                        optional_yield y,
                        const DoutPrefixProvider* dpp) override;
     int remove_topic_v2(const std::string& topic_name,
                         const std::string& tenant,
-                        RGWObjVersionTracker* objv_tracker,
+                        RGWObjVersionTracker& objv_tracker,
                         optional_yield y,
                         const DoutPrefixProvider* dpp) override;
     int update_bucket_topic_mapping(const rgw_pubsub_topic& topic,
@@ -193,9 +193,6 @@ class RadosStore : public StoreDriver {
                                  std::set<std::string>& bucket_keys,
                                  optional_yield y,
                                  const DoutPrefixProvider* dpp) override;
-    int delete_bucket_topic_mapping(const rgw_pubsub_topic& topic,
-                                    optional_yield y,
-                                    const DoutPrefixProvider* dpp) override;
     virtual RGWLC* get_rgwlc(void) override { return rados->get_lc(); }
     virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return rados->get_cr_registry(); }
 
index 0c0e2bbea659d0c23620321ccee597c23a81c929..b264a7c2e776874ac80da3fd4faca63db1c68cdd 100644 (file)
@@ -39,6 +39,7 @@
 #include "rgw_user.h"
 #include "rgw_role.h"
 #include "rgw_pubsub.h"
+#include "topic.h"
 
 #define dout_subsys ceph_subsys_rgw
 
@@ -416,6 +417,13 @@ int RGWCtlDef::init(RGWServices& svc, rgw::sal::Driver* driver, const DoutPrefix
   bucket_meta_handler->init(svc.bucket, bucket.get());
   bi_meta_handler->init(svc.zone, svc.bucket, svc.bi);
 
+  meta.topic_cache = std::make_unique<RGWChainedCacheImpl<rgwrados::topic::cache_entry>>();
+  meta.topic_cache->init(svc.cache);
+
+  meta.topic = rgwrados::topic::create_metadata_handler(
+      *svc.sysobj, svc.cache, *svc.mdlog, svc.zone->get_zone_params(),
+      *meta.topic_cache);
+
   RGWOTPMetadataHandlerBase *otp_handler = static_cast<RGWOTPMetadataHandlerBase *>(meta.otp.get());
   otp_handler->init(svc.zone, svc.meta_be_otp, svc.otp);
 
@@ -449,6 +457,7 @@ int RGWCtl::init(RGWServices *_svc, rgw::sal::Driver* driver, const DoutPrefixPr
   meta.otp = _ctl.meta.otp.get();
   meta.role = _ctl.meta.role.get();
   meta.topic = _ctl.meta.topic.get();
+  meta.topic_cache = _ctl.meta.topic_cache.get();
 
   user = _ctl.user.get();
   bucket = _ctl.bucket.get();
index 03b37f1ad5a200cfe97039bc7ca41ab5c334ae0d..617516b8a0289f54a4b90d842ff6a621f762c847 100644 (file)
@@ -17,6 +17,8 @@ class RadosStore;
 
 struct RGWServices_Def;
 
+namespace rgwrados::topic { struct cache_entry; }
+
 class RGWServiceInstance
 {
   friend struct RGWServices_Def;
@@ -186,6 +188,9 @@ class RGWUserCtl;
 class RGWBucketCtl;
 class RGWOTPCtl;
 
+template <class T>
+class RGWChainedCacheImpl;
+
 struct RGWCtlDef {
   struct _meta {
     std::unique_ptr<RGWMetadataManager> mgr;
@@ -196,6 +201,8 @@ struct RGWCtlDef {
     std::unique_ptr<RGWMetadataHandler> role;
     std::unique_ptr<RGWMetadataHandler> topic;
 
+    std::unique_ptr<RGWChainedCacheImpl<rgwrados::topic::cache_entry>> topic_cache;
+
     _meta();
     ~_meta();
   } meta;
@@ -225,6 +232,8 @@ struct RGWCtl {
     RGWMetadataHandler *otp{nullptr};
     RGWMetadataHandler *role{nullptr};
     RGWMetadataHandler* topic{nullptr};
+
+    RGWChainedCacheImpl<rgwrados::topic::cache_entry>* topic_cache{nullptr};
   } meta;
 
   RGWUserCtl *user{nullptr};
diff --git a/src/rgw/driver/rados/topic.cc b/src/rgw/driver/rados/topic.cc
new file mode 100644 (file)
index 0000000..3e409e0
--- /dev/null
@@ -0,0 +1,383 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright contributors to the Ceph project
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "topic.h"
+#include "common/errno.h"
+#include "rgw_common.h"
+#include "rgw_metadata.h"
+#include "rgw_metadata_lister.h"
+#include "rgw_pubsub.h"
+#include "rgw_rados.h"
+#include "rgw_string.h"
+#include "rgw_tools.h"
+#include "rgw_zone.h"
+#include "svc_mdlog.h"
+#include "svc_sys_obj_cache.h"
+
+namespace rgwrados::topic {
+
+static const std::string oid_prefix = "topic.";
+static constexpr std::string_view buckets_oid_prefix = "buckets.";
+
+static rgw_raw_obj get_topic_obj(const RGWZoneParams& zone,
+                                 std::string_view metadata_key)
+{
+  std::string oid = string_cat_reserve(oid_prefix, metadata_key);
+  return {zone.topics_pool, std::move(oid)};
+}
+
+static rgw_raw_obj get_buckets_obj(const RGWZoneParams& zone,
+                                   std::string_view metadata_key)
+{
+  std::string oid = string_cat_reserve(buckets_oid_prefix, metadata_key);
+  return {zone.topics_pool, std::move(oid)};
+}
+
+
+int read(const DoutPrefixProvider* dpp, optional_yield y,
+         RGWSI_SysObj& sysobj, RGWSI_SysObj_Cache* cache_svc,
+         const RGWZoneParams& zone, const std::string& topic_key,
+         rgw_pubsub_topic& info, RGWChainedCacheImpl<cache_entry>& cache,
+         ceph::real_time* pmtime, RGWObjVersionTracker* pobjv)
+{
+  if (auto e = cache.find(topic_key)) {
+    if (pmtime) {
+      *pmtime = e->mtime;
+    }
+    if (pobjv) {
+      *pobjv = std::move(e->objv);
+    }
+    info = std::move(e->info);
+    return 0;
+  }
+
+  const rgw_raw_obj obj = get_topic_obj(zone, topic_key);
+
+  bufferlist bl;
+  cache_entry entry;
+  rgw_cache_entry_info cache_info;
+  int r = rgw_get_system_obj(&sysobj, obj.pool, obj.oid, bl, &entry.objv,
+                             &entry.mtime, y, dpp, nullptr, &cache_info);
+  if (r < 0) {
+    return r;
+  }
+
+  try {
+    auto p = bl.cbegin();
+    decode(entry.info, p);
+  } catch (const buffer::error&) {
+    return -EIO;
+  }
+
+  cache.put(dpp, cache_svc, topic_key, &entry, {&cache_info});
+
+  if (pmtime) {
+    *pmtime = entry.mtime;
+  }
+  if (pobjv) {
+    *pobjv = std::move(entry.objv);
+  }
+  info = std::move(entry.info);
+  return 0;
+}
+
+int write(const DoutPrefixProvider* dpp, optional_yield y,
+          RGWSI_SysObj& sysobj, RGWSI_MDLog* mdlog, const RGWZoneParams& zone,
+          const rgw_pubsub_topic& info, RGWObjVersionTracker& objv,
+          ceph::real_time mtime, bool exclusive)
+{
+  const std::string topic_key = get_topic_metadata_key(info.user.tenant, info.name);
+  const rgw_raw_obj obj = get_topic_obj(zone, topic_key);
+
+  bufferlist bl;
+  encode(info, bl);
+
+  int r = rgw_put_system_obj(dpp, &sysobj, obj.pool, obj.oid,
+                             bl, exclusive, &objv, mtime, y);
+  if (r < 0) {
+    ldpp_dout(dpp, 1) << "ERROR: failed to write topic obj " << obj.oid
+        << " with: " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  // record in the mdlog on success
+  if (mdlog) {
+    return mdlog->complete_entry(dpp, y, "topic", topic_key, &objv);
+  }
+  return 0;
+}
+
+int remove(const DoutPrefixProvider* dpp, optional_yield y,
+           RGWSI_SysObj& sysobj, RGWSI_MDLog* mdlog, const RGWZoneParams& zone,
+           const std::string& topic_key, RGWObjVersionTracker& objv)
+{
+  // delete topic info
+  const rgw_raw_obj topic = get_topic_obj(zone, topic_key);
+  int r = rgw_delete_system_obj(dpp, &sysobj, topic.pool, topic.oid, &objv, y);
+  if (r < 0) {
+    ldpp_dout(dpp, 1) << "ERROR: failed to remove topic obj "
+        << topic.oid << " with: " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  // delete the buckets object
+  const rgw_raw_obj buckets = get_buckets_obj(zone, topic_key);
+  r = rgw_delete_system_obj(dpp, &sysobj, buckets.pool,
+                            buckets.oid, nullptr, y);
+  if (r < 0) {
+    ldpp_dout(dpp, 20) << "WARNING: failed to remove topic buckets obj "
+        << buckets.oid << " with: " << cpp_strerror(r) << dendl;
+  } // not fatal
+
+  // record in the mdlog on success
+  if (mdlog) {
+    return mdlog->complete_entry(dpp, y, "topic", topic_key, &objv);
+  }
+  return 0;
+}
+
+
+int link_bucket(const DoutPrefixProvider* dpp, optional_yield y,
+                librados::Rados& rados, const RGWZoneParams& zone,
+                const std::string& topic_key,
+                const std::string& bucket_key)
+{
+  const rgw_raw_obj obj = get_buckets_obj(zone, topic_key);
+
+  rgw_rados_ref ref;
+  int r = rgw_get_rados_ref(dpp, &rados, obj, &ref);
+  if (r < 0) {
+    return r;
+  }
+
+  librados::ObjectWriteOperation op;
+  op.omap_set({{bucket_key, bufferlist{}}});
+
+  return rgw_rados_operate(dpp, ref.ioctx, ref.obj.oid, &op, y);
+}
+
+int unlink_bucket(const DoutPrefixProvider* dpp, optional_yield y,
+                  librados::Rados& rados, const RGWZoneParams& zone,
+                  const std::string& topic_key,
+                  const std::string& bucket_key)
+{
+  const rgw_raw_obj obj = get_buckets_obj(zone, topic_key);
+
+  rgw_rados_ref ref;
+  int r = rgw_get_rados_ref(dpp, &rados, obj, &ref);
+  if (r < 0) {
+    return r;
+  }
+
+  librados::ObjectWriteOperation op;
+  op.omap_rm_keys({{bucket_key}});
+
+  return rgw_rados_operate(dpp, ref.ioctx, ref.obj.oid, &op, y);
+}
+
+int list_buckets(const DoutPrefixProvider* dpp, optional_yield y,
+                 librados::Rados& rados, const RGWZoneParams& zone,
+                 const std::string& topic_key,
+                 const std::string& marker, int max_items,
+                 std::set<std::string>& bucket_keys,
+                 std::string& next_marker)
+{
+  const rgw_raw_obj obj = get_buckets_obj(zone, topic_key);
+
+  rgw_rados_ref ref;
+  int r = rgw_get_rados_ref(dpp, &rados, obj, &ref);
+  if (r < 0) {
+    return r;
+  }
+
+  librados::ObjectReadOperation op;
+  std::set<std::string> keys;
+  bool more = false;
+  int rval = 0;
+  op.omap_get_keys2(marker, max_items, &keys, &more, &rval);
+  r = rgw_rados_operate(dpp, ref.ioctx, ref.obj.oid, &op, nullptr, y);
+  if (r == -ENOENT) {
+    return 0;
+  }
+  if (r < 0) {
+    return r;
+  }
+  if (rval < 0) {
+    return rval;
+  }
+
+  if (more && !keys.empty()) {
+    next_marker = *keys.rbegin();
+  } else {
+    next_marker.clear();
+  }
+  bucket_keys.merge(std::move(keys));
+
+  return 0;
+}
+
+
+class MetadataObject : public RGWMetadataObject {
+  rgw_pubsub_topic info;
+public:
+  MetadataObject(const rgw_pubsub_topic& info, const obj_version& v, real_time m)
+    : RGWMetadataObject(v, m), info(info) {}
+
+  void dump(Formatter *f) const override {
+    info.dump(f);
+  }
+
+  rgw_pubsub_topic& get_topic_info() {
+    return info;
+  }
+};
+
+class MetadataLister : public RGWMetadataLister {
+ public:
+  using RGWMetadataLister::RGWMetadataLister;
+
+  virtual void filter_transform(std::vector<std::string>& oids,
+                                std::list<std::string>& keys) {
+    // remove the oid prefix from keys
+    constexpr auto trim = [] (const std::string& oid) {
+      return oid.substr(oid_prefix.size());
+    };
+    std::transform(oids.begin(), oids.end(),
+                   std::back_inserter(keys),
+                   trim);
+  }
+};
+
+class MetadataHandler : public RGWMetadataHandler {
+  RGWSI_SysObj& sysobj;
+  RGWSI_SysObj_Cache* cache_svc;
+  RGWSI_MDLog& mdlog;
+  const RGWZoneParams& zone;
+  RGWChainedCacheImpl<cache_entry>& cache;
+ public:
+  MetadataHandler(RGWSI_SysObj& sysobj, RGWSI_SysObj_Cache* cache_svc,
+                  RGWSI_MDLog& mdlog, const RGWZoneParams& zone,
+                  RGWChainedCacheImpl<cache_entry>& cache)
+    : sysobj(sysobj), cache_svc(cache_svc), mdlog(mdlog),
+      zone(zone), cache(cache)
+  {}
+
+  std::string get_type() final { return "topic";  }
+
+  RGWMetadataObject* get_meta_obj(JSONObj *jo,
+                                  const obj_version& objv,
+                                  const ceph::real_time& mtime) override
+  {
+    rgw_pubsub_topic info;
+
+    try {
+      info.decode_json(jo);
+    } catch (JSONDecoder:: err& e) {
+      return nullptr;
+    }
+
+    return new MetadataObject(info, objv, mtime);
+  }
+
+  int get(std::string& entry, RGWMetadataObject** obj,
+          optional_yield y, const DoutPrefixProvider* dpp) override
+  {
+    cache_entry e;
+    int ret = read(dpp, y, sysobj, cache_svc, zone, entry,
+                   e.info, cache, &e.mtime, &e.objv);
+    if (ret < 0) {
+      return ret;
+    }
+
+    *obj = new MetadataObject(e.info, e.objv.read_version, e.mtime);
+    return 0;
+  }
+
+  int put(std::string& entry, RGWMetadataObject* obj,
+          RGWObjVersionTracker& objv_tracker,
+          optional_yield y, const DoutPrefixProvider* dpp,
+          RGWMDLogSyncType type, bool from_remote_zone) override
+  {
+    auto robj = static_cast<MetadataObject*>(obj);
+    auto& info = robj->get_topic_info();
+    auto mtime = robj->get_mtime();
+
+    constexpr bool exclusive = false;
+    int ret = write(dpp, y, sysobj, &mdlog, zone, info,
+                    objv_tracker, mtime, exclusive);
+    return ret < 0 ? ret : STATUS_APPLIED;
+  }
+
+  int remove(std::string& entry, RGWObjVersionTracker& objv_tracker,
+             optional_yield y, const DoutPrefixProvider *dpp) override
+  {
+    return topic::remove(dpp, y, sysobj, &mdlog, zone, entry, objv_tracker);
+  }
+
+  int mutate(const std::string& entry, const ceph::real_time& mtime,
+             RGWObjVersionTracker* objv_tracker, optional_yield y,
+             const DoutPrefixProvider* dpp, RGWMDLogStatus op_type,
+             std::function<int()> f) override
+  {
+    return -ENOTSUP; // unused
+  }
+
+  int list_keys_init(const DoutPrefixProvider* dpp,
+                     const std::string& marker,
+                     void** phandle) override
+  {
+    const auto& pool = zone.topics_pool;
+    auto lister = std::make_unique<MetadataLister>(sysobj.get_pool(pool));
+    int ret = lister->init(dpp, marker, oid_prefix);
+    if (ret < 0) {
+      return ret;
+    }
+    *phandle = lister.release(); // release ownership
+    return 0;
+  }
+
+  int list_keys_next(const DoutPrefixProvider* dpp,
+                     void* handle, int max,
+                     std::list<std::string>& keys,
+                     bool* truncated) override
+  {
+    auto lister = static_cast<RGWMetadataLister*>(handle);
+    return lister->get_next(dpp, max, keys, truncated);
+  }
+
+  void list_keys_complete(void *handle) override
+  {
+    delete static_cast<RGWMetadataLister*>(handle);
+  }
+
+  std::string get_marker(void *handle) override
+  {
+    auto lister = static_cast<RGWMetadataLister*>(handle);
+    return lister->get_marker();
+  }
+};
+
+
+auto create_metadata_handler(RGWSI_SysObj& sysobj,
+                             RGWSI_SysObj_Cache* cache_svc,
+                             RGWSI_MDLog& mdlog, const RGWZoneParams& zone,
+                             RGWChainedCacheImpl<cache_entry>& cache)
+    -> std::unique_ptr<RGWMetadataHandler>
+{
+  return std::make_unique<MetadataHandler>(sysobj, cache_svc, mdlog,
+                                           zone, cache);
+}
+
+} // rgwrados::topic
diff --git a/src/rgw/driver/rados/topic.h b/src/rgw/driver/rados/topic.h
new file mode 100644 (file)
index 0000000..3799d00
--- /dev/null
@@ -0,0 +1,94 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright contributors to the Ceph project
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <memory>
+#include <set>
+#include <string>
+#include "include/rados/librados_fwd.hpp"
+#include "common/ceph_time.h"
+#include "rgw_pubsub.h"
+
+class DoutPrefixProvider;
+class optional_yield;
+class RGWMetadataHandler;
+class RGWObjVersionTracker;
+class RGWSI_MDLog;
+class RGWSI_SysObj;
+class RGWSI_SysObj_Cache;
+class RGWZoneParams;
+
+template <typename T> class RGWChainedCacheImpl;
+
+// Rados interface for v2 topic metadata
+namespace rgwrados::topic {
+
+struct cache_entry {
+  rgw_pubsub_topic info;
+  RGWObjVersionTracker objv;
+  ceph::real_time mtime;
+};
+
+/// Read topic info by metadata key.
+int read(const DoutPrefixProvider* dpp, optional_yield y,
+         RGWSI_SysObj& sysobj, RGWSI_SysObj_Cache* cache_svc,
+         const RGWZoneParams& zone, const std::string& topic_key,
+         rgw_pubsub_topic& info, RGWChainedCacheImpl<cache_entry>& cache,
+         ceph::real_time* pmtime = nullptr,
+         RGWObjVersionTracker* pobjv = nullptr);
+
+/// Write or overwrite topic info.
+int write(const DoutPrefixProvider* dpp, optional_yield y,
+          RGWSI_SysObj& sysobj, RGWSI_MDLog* mdlog, const RGWZoneParams& zone,
+          const rgw_pubsub_topic& info, RGWObjVersionTracker& objv,
+          ceph::real_time mtime, bool exclusive);
+
+/// Remove a topic by metadata key.
+int remove(const DoutPrefixProvider* dpp, optional_yield y,
+           RGWSI_SysObj& sysobj, RGWSI_MDLog* mdlog,
+           const RGWZoneParams& zone, const std::string& topic_key,
+           RGWObjVersionTracker& objv);
+
+
+/// Add a bucket key to the topic's list of buckets.
+int link_bucket(const DoutPrefixProvider* dpp, optional_yield y,
+                librados::Rados& rados, const RGWZoneParams& zone,
+                const std::string& topic_key,
+                const std::string& bucket_key);
+
+/// Remove a bucket key from the topic's list of buckets.
+int unlink_bucket(const DoutPrefixProvider* dpp, optional_yield y,
+                  librados::Rados& rados, const RGWZoneParams& zone,
+                  const std::string& topic_key,
+                  const std::string& bucket_key);
+
+/// List the bucket keys associated with a given topic.
+int list_buckets(const DoutPrefixProvider* dpp, optional_yield y,
+                 librados::Rados& rados, const RGWZoneParams& zone,
+                 const std::string& topic_key,
+                 const std::string& marker, int max_items,
+                 std::set<std::string>& bucket_keys,
+                 std::string& next_marker);
+
+
+/// Topic metadata handler factory.
+auto create_metadata_handler(RGWSI_SysObj& sysobj,
+                             RGWSI_SysObj_Cache* cache_svc,
+                             RGWSI_MDLog& mdlog, const RGWZoneParams& zone,
+                             RGWChainedCacheImpl<cache_entry>& cache)
+    -> std::unique_ptr<RGWMetadataHandler>;
+
+} // rgwrados::topic
index 474a7c23163427ed8a0ef412d9fe2f088071517d..18c604978e8203855dfa5b791cf42ce736e3754a 100644 (file)
@@ -945,7 +945,9 @@ int RGWPubSub::create_topic(const DoutPrefixProvider* dpp,
                             const rgw_pubsub_topic& topic,
                             optional_yield y) const {
   RGWObjVersionTracker objv_tracker;
-  auto ret = driver->write_topic_v2(topic, &objv_tracker, y, dpp);
+  objv_tracker.generate_new_write_ver(dpp->get_cct());
+  constexpr bool exclusive = false;
+  auto ret = driver->write_topic_v2(topic, exclusive, objv_tracker, y, dpp);
   if (ret < 0) {
     ldpp_dout(dpp, 1) << "ERROR: failed to write topic info: ret=" << ret
                       << dendl;
@@ -1012,13 +1014,12 @@ int RGWPubSub::remove_topic_v2(const DoutPrefixProvider* dpp,
                        << dendl;
     return 0;
   }
-  ret = driver->remove_topic_v2(name, tenant, &objv_tracker, y, dpp);
+  ret = driver->remove_topic_v2(name, tenant, objv_tracker, y, dpp);
   if (ret < 0) {
     ldpp_dout(dpp, 1) << "ERROR: failed to remove topic info: ret=" << ret
                       << dendl;
     return ret;
   }
-  ret = driver->delete_bucket_topic_mapping(topic, y, dpp);
   return ret;
 }
 
index f1bc455835f6b1c709f01b8cb0f714b2fcddfa1b..060cfc1e3513bad94b81ec8c1994cdd083c72f15 100644 (file)
@@ -321,15 +321,15 @@ class Driver {
                               RGWObjVersionTracker* objv_tracker,
                               optional_yield y,
                               const DoutPrefixProvider* dpp) = 0;
-    /** Write topic info and (optionally) @a objv_tracker into the config */
-    virtual int write_topic_v2(const rgw_pubsub_topic& topic,
-                               RGWObjVersionTracker* objv_tracker,
+    /** Write topic info and @a objv_tracker into the config */
+    virtual int write_topic_v2(const rgw_pubsub_topic& topic, bool exclusive,
+                               RGWObjVersionTracker& objv_tracker,
                                optional_yield y,
                                const DoutPrefixProvider* dpp) = 0;
     /** Remove the topic config, optionally a specific version */
     virtual int remove_topic_v2(const std::string& topic_name,
                                 const std::string& tenant,
-                                RGWObjVersionTracker* objv_tracker,
+                                RGWObjVersionTracker& objv_tracker,
                                 optional_yield y,
                                 const DoutPrefixProvider* dpp) = 0;
     /** Update the bucket-topic mapping in the store, if |add_mapping|=true then
@@ -356,10 +356,6 @@ class Driver {
                                          std::set<std::string>& bucket_keys,
                                          optional_yield y,
                                          const DoutPrefixProvider* dpp) = 0;
-    /** Remove the bucket-topic mapping from the backend store. */
-    virtual int delete_bucket_topic_mapping(const rgw_pubsub_topic& topic,
-                                            optional_yield y,
-                                            const DoutPrefixProvider* dpp) = 0;
     /** Get access to the lifecycle management thread */
     virtual RGWLC* get_rgwlc(void) = 0;
     /** Get access to the coroutine registry.  Used to create new coroutine managers */
index 8bb704ce17c1b2f94cdf37748faf128da9d457f4..83832922f90ddd53cfb33e17644cf9e13f23a869 100644 (file)
@@ -202,15 +202,15 @@ public:
                     const DoutPrefixProvider* dpp) override {
     return next->read_topic_v2(topic_name, tenant, topic, objv_tracker, y, dpp);
   }
-  int write_topic_v2(const rgw_pubsub_topic& topic,
-                     RGWObjVersionTracker* objv_tracker,
+  int write_topic_v2(const rgw_pubsub_topic& topic, bool exclusive,
+                     RGWObjVersionTracker& objv_tracker,
                      optional_yield y,
                      const DoutPrefixProvider* dpp) override {
-    return next->write_topic_v2(topic, objv_tracker, y, dpp);
+    return next->write_topic_v2(topic, exclusive, objv_tracker, y, dpp);
   }
   int remove_topic_v2(const std::string& topic_name,
                       const std::string& tenant,
-                      RGWObjVersionTracker* objv_tracker,
+                      RGWObjVersionTracker& objv_tracker,
                       optional_yield y,
                       const DoutPrefixProvider* dpp) override {
     return next->remove_topic_v2(topic_name, tenant, objv_tracker, y, dpp);
@@ -237,11 +237,6 @@ public:
                                const DoutPrefixProvider* dpp) override {
     return next->get_bucket_topic_mapping(topic, bucket_keys, y, dpp);
   }
-  int delete_bucket_topic_mapping(const rgw_pubsub_topic& topic,
-                                  optional_yield y,
-                                  const DoutPrefixProvider* dpp) override {
-    return next->delete_bucket_topic_mapping(topic, y, dpp);
-  }
   virtual RGWLC* get_rgwlc(void) override;
   virtual RGWCoroutinesManagerRegistry* get_cr_registry() override;
 
index b34276a9daaf8fe2512a028932939fd1ba7fde80..f0ac762554e64a9ec1f414e7b21b9c3c9e8e2d5e 100644 (file)
@@ -42,15 +42,15 @@ class StoreDriver : public Driver {
                       const DoutPrefixProvider* dpp) override {
       return -EOPNOTSUPP;
     }
-    int write_topic_v2(const rgw_pubsub_topic& topic,
-                       RGWObjVersionTracker* objv_tracker,
+    int write_topic_v2(const rgw_pubsub_topic& topic, bool exclusive,
+                       RGWObjVersionTracker& objv_tracker,
                        optional_yield y,
                        const DoutPrefixProvider* dpp) override {
       return -EOPNOTSUPP;
     }
     int remove_topic_v2(const std::string& topic_name,
                         const std::string& tenant,
-                        RGWObjVersionTracker* objv_tracker,
+                        RGWObjVersionTracker& objv_tracker,
                         optional_yield y,
                         const DoutPrefixProvider* dpp) override {
       return -EOPNOTSUPP;
@@ -75,11 +75,6 @@ class StoreDriver : public Driver {
                                  const DoutPrefixProvider* dpp) override {
       return -EOPNOTSUPP;
     }
-    int delete_bucket_topic_mapping(const rgw_pubsub_topic& topic,
-                                    optional_yield y,
-                                    const DoutPrefixProvider* dpp) override {
-      return -EOPNOTSUPP;
-    }
 };
 
 class StoreUser : public User {