]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/multisite-notification: Add omap object to store the mapping between bucket and...
authorkchheda3 <kchheda3@bloomberg.net>
Thu, 21 Dec 2023 22:23:05 +0000 (17:23 -0500)
committerCasey Bodley <cbodley@redhat.com>
Tue, 5 Mar 2024 17:55:24 +0000 (12:55 -0500)
Signed-off-by: kchheda3 <kchheda3@bloomberg.net>
15 files changed:
src/rgw/driver/rados/rgw_bucket.cc
src/rgw/driver/rados/rgw_notify.cc
src/rgw/driver/rados/rgw_rados.cc
src/rgw/driver/rados/rgw_rados.h
src/rgw/driver/rados/rgw_sal_rados.cc
src/rgw/driver/rados/rgw_sal_rados.h
src/rgw/rgw_admin.cc
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/rgw/rgw_rest_pubsub.cc
src/rgw/rgw_sal.h
src/rgw/rgw_sal_filter.h
src/rgw/rgw_sal_store.h
src/rgw/services/svc_topic_rados.cc
src/rgw/services/svc_topic_rados.h

index 9f5565531829474f8013358ad23e13d14f14bfae..ce869f399d6e2a656468b63db105ccd8a04cef0b 100644 (file)
@@ -1336,28 +1336,6 @@ static int bucket_stats(rgw::sal::Driver* driver,
     }
   }
 
-  // bucket notifications
-  rgw_pubsub_bucket_topics result;
-  if (driver->get_zone()->get_zonegroup().supports_feature(
-          rgw::zone_features::notification_v2)) {
-    ret = get_bucket_notifications(dpp, bucket.get(), result);
-    if (ret < 0) {
-      cerr << "ERROR: could not get topics: " << cpp_strerror(-ret)
-           << std::endl;
-      return -ret;
-    }
-  } else {
-    RGWPubSub ps(driver, tenant_name);
-    const RGWPubSub::Bucket b(ps, bucket.get());
-    ret = b.get_topics(dpp, result, y);
-    if (ret < 0 && ret != -ENOENT) {
-      cerr << "ERROR: could not get topics: " << cpp_strerror(-ret)
-           << std::endl;
-      return -ret;
-    }
-  }
-  result.dump(formatter);
-
   // TODO: bucket CORS
   // TODO: bucket LC
   formatter->close_section();
@@ -2132,6 +2110,92 @@ int RGWMetadataHandlerPut_Bucket::put_post(const DoutPrefixProvider *dpp)
   return ret;
 }
 
+int update_bucket_topic_mappings(const DoutPrefixProvider* dpp,
+                                 RGWBucketCompleteInfo* orig_bci,
+                                 RGWBucketCompleteInfo* current_bci,
+                                 rgw::sal::Driver* driver) {
+  const auto decode_attrs = [](const rgw::sal::Attrs& attrs,
+                               rgw_pubsub_bucket_topics& bucket_topics) -> int {
+    auto iter = attrs.find(RGW_ATTR_BUCKET_NOTIFICATION);
+    if (iter == attrs.end()) {
+      return 0;
+    }
+    try {
+      const auto& bl = iter->second;
+      auto biter = bl.cbegin();
+      bucket_topics.decode(biter);
+    } catch (buffer::error& err) {
+      return -EIO;
+    }
+    return 0;
+  };
+  std::string bucket_name;
+  std::string bucket_tenant;
+  rgw_pubsub_bucket_topics old_bucket_topics;
+  if (orig_bci) {
+    auto ret = decode_attrs(orig_bci->attrs, old_bucket_topics);
+    if (ret < 0) {
+      ldpp_dout(dpp, 1)
+          << "ERROR: failed to decode OLD bucket topics for bucket: "
+          << orig_bci->info.bucket.name << dendl;
+      return ret;
+    }
+    bucket_name = orig_bci->info.bucket.name;
+    bucket_tenant = orig_bci->info.bucket.tenant;
+  }
+  rgw_pubsub_bucket_topics current_bucket_topics;
+  if (current_bci) {
+    auto ret = decode_attrs(current_bci->attrs, current_bucket_topics);
+    if (ret < 0) {
+      ldpp_dout(dpp, 1)
+          << "ERROR: failed to decode current bucket topics for bucket: "
+          << current_bci->info.bucket.name << dendl;
+      return ret;
+    }
+    bucket_name = current_bci->info.bucket.name;
+    bucket_tenant = current_bci->info.bucket.tenant;
+  }
+  // fetch the list of subscribed topics stored inside old_bucket attrs.
+  std::unordered_map<std::string, rgw_pubsub_topic> old_topics;
+  for (const auto& [_, topic_filter] : old_bucket_topics.topics) {
+    old_topics[topic_filter.topic.name] = topic_filter.topic;
+  }
+  // fetch the list of subscribed topics stored inside current_bucket attrs.
+  std::unordered_map<std::string, rgw_pubsub_topic> current_topics;
+  for (const auto& [_, topic_filter] : current_bucket_topics.topics) {
+    current_topics[topic_filter.topic.name] = topic_filter.topic;
+  }
+  // traverse thru old topics and check if they are not in current, then delete
+  // the mapping, if present in both current and old then delete from current
+  // set as we do not need to update those mapping.
+  int ret = 0;
+  for (const auto& [topic_name, topic] : old_topics) {
+    auto it = current_topics.find(topic_name);
+    if (it == current_topics.end()) {
+      const auto op_ret = driver->update_bucket_topic_mapping(
+          topic, rgw_make_bucket_entry_name(bucket_tenant, bucket_name),
+          /*add_mapping=*/false, null_yield, dpp);
+      if (op_ret < 0) {
+        ret = op_ret;
+      }
+    } else {
+      // already that attr is present, so do not update the mapping.
+      current_topics.erase(it);
+    }
+  }
+  // traverse thru current topics and check if they are any present, then add
+  // the mapping.
+  for (const auto& [topic_name, topic] : current_topics) {
+    const auto op_ret = driver->update_bucket_topic_mapping(
+        topic, rgw_make_bucket_entry_name(bucket_tenant, bucket_name),
+        /*add_mapping=*/true, null_yield, dpp);
+    if (op_ret < 0) {
+      ret = op_ret;
+    }
+  }
+  return ret;
+}
+
 static void get_md5_digest(const RGWBucketEntryPoint *be, string& md5_digest) {
 
    char md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
@@ -2443,7 +2507,14 @@ public:
     if (ret < 0 && ret != -ENOENT)
       return ret;
 
-    return svc.bucket->remove_bucket_instance_info(ctx, entry, bci.info, &bci.info.objv_tracker, y, dpp);
+    ret = svc.bucket->remove_bucket_instance_info(
+        ctx, entry, bci.info, &bci.info.objv_tracker, y, dpp);
+    if (ret < 0)
+      return ret;
+    ret = update_bucket_topic_mappings(dpp, &bci, /*current_bci=*/nullptr,
+                                       driver);
+    // update_bucket_topic_mapping error is swallowed.
+    return 0;
   }
 
   int call(std::function<int(RGWSI_Bucket_BI_Ctx& ctx)> f) {
@@ -2648,6 +2719,21 @@ int RGWMetadataHandlerPut_BucketInstance::put_post(const DoutPrefixProvider *dpp
     }
   } /* update lc */
 
+  /* update bucket topic mapping */
+  {
+    auto* orig_obj = static_cast<RGWBucketInstanceMetadataObject*>(old_obj);
+    auto* orig_bci = (orig_obj ? &orig_obj->get_bci() : nullptr);
+    ret = update_bucket_topic_mappings(dpp, orig_bci, &bci, bihandler->driver);
+    if (ret < 0) {
+      ldpp_dout(dpp, 0) << __func__
+                        << " failed to apply bucket topic mapping for "
+                        << bci.info.bucket.name << dendl;
+      return ret;
+    }
+    ldpp_dout(dpp, 20) << __func__
+                       << " successfully applied bucket topic mapping for "
+                       << bci.info.bucket.name << dendl;
+  }
   return STATUS_APPLIED;
 }
 
index e5d5dd602f598bcb7049a4d9318650ebca51c787..e3ee61ba9df0d95c876dea119ef8ff3853b63a0e 100644 (file)
@@ -494,7 +494,8 @@ private:
           RGWPubSub ps(&rados_store, tenant_name);
 
           rgw_pubsub_topic topic;
-          auto ret_of_get_topic = ps.get_topic(this, queue_name, topic, optional_yield(io_context, yield));
+          auto ret_of_get_topic = ps.get_topic(this, queue_name, topic,
+                           optional_yield(io_context, yield), nullptr);
           if (ret_of_get_topic < 0) {
             // we can't migrate entries without topic info
             ldpp_dout(this, 1) << "ERROR: failed to fetch topic: " << queue_name << " error: "
@@ -1056,7 +1057,7 @@ static inline bool notification_match(reservation_t& res,
     const RGWPubSub ps(
         res.store, res.user_tenant,
         &res.store->svc()->zone->get_current_period().get_map().zonegroups);
-    auto ret = ps.get_topic(res.dpp, topic_cfg.name, result, res.yield);
+    auto ret = ps.get_topic(res.dpp, topic_cfg.name, result, res.yield, nullptr);
     if (ret < 0) {
       ldpp_dout(res.dpp, 1)
           << "INFO: failed to load topic: " << topic_cfg.name
index 5e111103c1c2663f4c68c135f63231b39dae2e2a..b8fcfa2f368fb04b4475695125e2f02ffaf4d4c3 100644 (file)
@@ -1211,6 +1211,10 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y)
   if (ret < 0)
     return ret;
 
+  ret = open_topics_pool_ctx(dpp);
+  if (ret < 0)
+    return ret;
+
   pools_initialized = true;
 
   if (use_gc) {
@@ -1446,6 +1450,12 @@ int RGWRados::open_notif_pool_ctx(const DoutPrefixProvider *dpp)
   return rgw_init_ioctx(dpp, get_rados_handle(), svc.zone->get_zone_params().notif_pool, notif_pool_ctx, true, true);
 }
 
+int RGWRados::open_topics_pool_ctx(const DoutPrefixProvider* dpp) {
+  return rgw_init_ioctx(dpp, get_rados_handle(),
+                        svc.zone->get_zone_params().topics_pool,
+                        topics_pool_ctx, true, true);
+}
+
 int RGWRados::open_pool_ctx(const DoutPrefixProvider *dpp, const rgw_pool& pool, librados::IoCtx& io_ctx,
                            bool mostly_omap, bool bulk)
 {
index 264f5eb4f33a1ef3f9e3963e74f81a3206f2c460..7e7a58480a6534d560fc3467150434dba79120cd 100644 (file)
@@ -358,6 +358,7 @@ class RGWRados
   int open_objexp_pool_ctx(const DoutPrefixProvider *dpp);
   int open_reshard_pool_ctx(const DoutPrefixProvider *dpp);
   int open_notif_pool_ctx(const DoutPrefixProvider *dpp);
+  int open_topics_pool_ctx(const DoutPrefixProvider* dpp);
 
   int open_pool_ctx(const DoutPrefixProvider *dpp, const rgw_pool& pool, librados::IoCtx&  io_ctx,
                    bool mostly_omap, bool bulk);
@@ -447,6 +448,7 @@ protected:
   librados::IoCtx objexp_pool_ctx;
   librados::IoCtx reshard_pool_ctx;
   librados::IoCtx notif_pool_ctx;     // .rgw.notif
+  librados::IoCtx topics_pool_ctx;  // .rgw.meta:topics
 
   bool pools_initialized{false};
 
@@ -533,6 +535,8 @@ public:
     return notif_pool_ctx;
   }
 
+  librados::IoCtx& get_topics_pool_ctx() { return topics_pool_ctx; }
+  
   void set_context(CephContext *_cct) {
     cct = _cct;
   }
index 608800c7e573edb077cb84d904c9c00e05f46601..2bde7d192e5ac296f408e59d93792a8732db2763 100644 (file)
@@ -325,6 +325,30 @@ int RadosBucket::remove(const DoutPrefixProvider* dpp,
       this, get_attrs(), merge_attrs);
   }
 
+  // remove bucket-topic mapping
+  auto iter = get_attrs().find(RGW_ATTR_BUCKET_NOTIFICATION);
+  if (iter != get_attrs().end()) {
+    rgw_pubsub_bucket_topics bucket_topics;
+    try {
+      const auto& bl = iter->second;
+      auto biter = bl.cbegin();
+      bucket_topics.decode(biter);
+    } catch (buffer::error& err) {
+      ldpp_dout(dpp, 1) << "ERROR: failed to decode bucket topics for bucket: "
+                        << get_name() << dendl;
+    }
+    if (!bucket_topics.topics.empty()) {
+      ret = store->remove_bucket_mapping_from_topics(
+          bucket_topics, rgw_make_bucket_entry_name(get_tenant(), get_name()),
+          y, dpp);
+      if (ret < 0) {
+        ldpp_dout(dpp, 1)
+            << "ERROR: unable to remove notifications from bucket "
+            << get_name() << ". ret=" << ret << dendl;
+      }
+    }
+  }
+
   ret = store->ctl()->bucket->sync_user_stats(dpp, info.owner, info, y, nullptr);
   if (ret < 0) {
      ldout(store->ctx(), 1) << "WARNING: failed sync user stats before bucket delete. ret=" <<  ret << dendl;
@@ -1172,6 +1196,114 @@ int RadosStore::remove_topic_v2(const std::string& topic_name,
                                            params, objv_tracker, y, dpp);
 }
 
+int RadosStore::remove_bucket_mapping_from_topics(
+    const rgw_pubsub_bucket_topics& bucket_topics,
+    const std::string& bucket_key,
+    optional_yield y,
+    const DoutPrefixProvider* dpp) {
+  // remove the bucket name from  the topic-bucket omap for each topic
+  // subscribed.
+  std::unordered_set<std::string> topics_mapping_to_remove;
+  int ret = 0;
+  for (const auto& [_, topic_filter] : bucket_topics.topics) {
+    if (!topics_mapping_to_remove.insert(topic_filter.topic.name).second) {
+      continue;  // already removed.
+    }
+    int op_ret = update_bucket_topic_mapping(topic_filter.topic, bucket_key,
+                                             /*add_mapping=*/false, y, dpp);
+    if (op_ret < 0) {
+      ret = op_ret;
+    }
+  }
+  return ret;
+}
+
+int RadosStore::update_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+                                            const std::string& bucket_key,
+                                            bool add_mapping,
+                                            optional_yield y,
+                                            const DoutPrefixProvider* dpp) {
+  bufferlist empty_bl;
+  librados::ObjectWriteOperation op;
+  int ret = 0;
+  if (add_mapping) {
+    std::map<std::string, bufferlist> mapping{{bucket_key, empty_bl}};
+    op.omap_set(mapping);
+  } else {
+    std::set<std::string> to_rm{{bucket_key}};
+    op.omap_rm_keys(to_rm);
+  }
+  ret = rgw_rados_operate(dpp, rados->get_topics_pool_ctx(),
+                          get_bucket_topic_mapping_oid(topic), &op, y);
+  if (ret < 0) {
+    ldpp_dout(dpp, 1) << "ERROR: failed to " << (add_mapping ? "add" : "remove")
+                      << " topic bucket mapping for bucket: " << bucket_key
+                      << " and topic: " << topic.name << " with ret:" << ret << dendl;
+    return ret;
+  }
+  ldpp_dout(dpp, 20) << "Successfully " << (add_mapping ? "added" : "removed")
+                     << " topic bucket mapping for bucket: " << bucket_key
+                     << " and topic: " << topic.name << dendl;
+  return ret;
+}
+
+int RadosStore::get_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+                                         std::set<std::string>& bucket_keys,
+                                         optional_yield y,
+                                         const DoutPrefixProvider* dpp) {
+  constexpr auto max_chunk = 1024U;
+  std::string start_after;
+  bool more = true;
+  int rval;
+  while (more) {
+    librados::ObjectReadOperation op;
+    std::set<std::string> curr_keys;
+    op.omap_get_keys2(start_after, max_chunk, &curr_keys, &more, &rval);
+    const auto ret =
+        rgw_rados_operate(dpp, rados->get_topics_pool_ctx(),
+                          get_bucket_topic_mapping_oid(topic), &op, nullptr, y);
+    if (ret == -ENOENT) {
+      // mapping object was not created - nothing to do
+      return 0;
+    }
+    if (ret < 0) {
+      // TODO: do we need to check on rval as well as ret?
+      ldpp_dout(dpp, 1)
+          << "ERROR: failed to read bucket topic mapping object for topic: "
+          << topic.name << ", ret= " << ret << dendl;
+      return ret;
+    }
+    if (more) {
+      if (curr_keys.empty()) {
+        return -EINVAL;  // something wrong.
+      }
+      start_after = *curr_keys.rbegin();
+    }
+    bucket_keys.merge(curr_keys);
+  }
+  return 0;
+}
+
+int RadosStore::delete_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+                                            optional_yield y,
+                                            const DoutPrefixProvider* dpp) {
+  librados::ObjectWriteOperation op;
+  op.remove();
+  const int ret =
+      rgw_rados_operate(dpp, rados->get_topics_pool_ctx(),
+                        get_bucket_topic_mapping_oid(topic), &op, y);
+  if (ret < 0 && ret != -ENOENT) {
+    ldpp_dout(dpp, 1)
+        << "ERROR: failed removing bucket topic mapping omap for topic: "
+        << topic.name << ", ret=" << ret << dendl;
+    return ret;
+  }
+  ldpp_dout(dpp, 20)
+      << "Successfully deleted topic bucket mapping omap for topic: "
+      << topic.name << dendl;
+  return 0;
+}
+
 int RadosStore::delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, optional_yield y)
 {
   return rados->delete_raw_obj(dpp, obj, y);
index 33db603d785acbff5822155f11cb9a82bd11d5bf..85612eec1a9ba58a078c5ea98eedee3bb5a28d63 100644 (file)
@@ -184,6 +184,23 @@ class RadosStore : public StoreDriver {
                         RGWObjVersionTracker* objv_tracker,
                         optional_yield y,
                         const DoutPrefixProvider* dpp) override;
+    int update_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+                                    const std::string& bucket_key,
+                                    bool add_mapping,
+                                    optional_yield y,
+                                    const DoutPrefixProvider* dpp) override;
+    int remove_bucket_mapping_from_topics(
+        const rgw_pubsub_bucket_topics& bucket_topics,
+        const std::string& bucket_key,
+        optional_yield y,
+        const DoutPrefixProvider* dpp) override;
+    int get_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+                                 std::set<std::string>& bucket_keys,
+                                 optional_yield y,
+                                 const DoutPrefixProvider* dpp) override;
+    int delete_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+                                    optional_yield y,
+                                    const DoutPrefixProvider* dpp) override;
     virtual RGWLC* get_rgwlc(void) override { return rados->get_lc(); }
     virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return rados->get_cr_registry(); }
 
index afb161c90aeb5c90028a10afa687dff62a635125..bbd9231893fb1f1a61e303714c9d6462f1785cff 100644 (file)
@@ -1168,6 +1168,15 @@ static void show_reshard_status(
   formatter->flush(cout);
 }
 
+static void show_topics_info_v2(const rgw_pubsub_topic& topic,
+                                std::set<std::string> subscribed_buckets,
+                                Formatter* formatter) {
+  formatter->open_object_section("topic");
+  topic.dump(formatter);
+  encode_json("subscribed_buckets", subscribed_buckets, formatter);
+  formatter->close_section();
+}
+
 class StoreDestructor {
   rgw::sal::Driver* driver;
 public:
@@ -10665,7 +10674,24 @@ next:
         }
       }
     }
-    encode_json("result", result, formatter.get());
+    if (driver->get_zone()->get_zonegroup().supports_feature(
+            rgw::zone_features::notification_v2)) {
+      Formatter::ObjectSection top_section(*formatter, "result");
+      Formatter::ArraySection s(*formatter, "topics");
+      for (const auto& [_, topic] : result.topics) {
+        std::set<std::string> subscribed_buckets;
+        ret = driver->get_bucket_topic_mapping(topic, subscribed_buckets,
+                                               null_yield, dpp());
+        if (ret < 0) {
+          cerr << "failed to fetch bucket topic mapping info for topic: "
+               << topic.name << ", ret=" << ret << std::endl;
+        } else {
+          show_topics_info_v2(topic, subscribed_buckets, formatter.get());
+        }
+      }
+    } else {
+      encode_json("result", result, formatter.get());
+    }
     formatter->flush(cout);
   }
 
@@ -10683,12 +10709,19 @@ next:
     RGWPubSub ps(driver, tenant, &site->get_period()->get_map().zonegroups);
 
     rgw_pubsub_topic topic;
-    ret = ps.get_topic(dpp(), topic_name, topic, null_yield);
+    std::set<std::string> subscribed_buckets;
+    ret =
+        ps.get_topic(dpp(), topic_name, topic, null_yield, &subscribed_buckets);
     if (ret < 0) {
       cerr << "ERROR: could not get topic: " << cpp_strerror(-ret) << std::endl;
       return -ret;
     }
-    encode_json("topic", topic, formatter.get());
+    if (driver->get_zone()->get_zonegroup().supports_feature(
+            rgw::zone_features::notification_v2)) {
+      show_topics_info_v2(topic, subscribed_buckets, formatter.get());
+    } else {
+      encode_json("topic", topic, formatter.get());
+    }
     formatter->flush(cout);
   }
 
index 628f57901afed84b208418e71f171e9b03cdb26b..dec38ee87e878b3f92d6044f82475a0494eef116 100644 (file)
@@ -9,7 +9,9 @@
 #include "rgw_xml.h"
 #include "rgw_arn.h"
 #include "rgw_pubsub_push.h"
+#include "rgw_bucket.h"
 #include "common/errno.h"
+#include "svc_topic_rados.h"
 #include <regex>
 #include <algorithm>
 
@@ -369,6 +371,7 @@ void rgw_pubsub_topic::dump_xml_as_attributes(Formatter *f) const
   encode_xml_key_value_entry("TopicArn", arn, f);
   encode_xml_key_value_entry("OpaqueData", opaque_data, f);
   encode_xml_key_value_entry("Policy", policy_text, f);
+  std::ostringstream stream;
   f->close_section(); // Attributes
 }
 
@@ -526,15 +529,20 @@ int RGWPubSub::read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& res
                 << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl;
             break;
       }
-      for (auto& topic_name : topics) {
-            rgw_pubsub_topic topic;
-            int ret = get_topic(dpp, topic_name, topic, y);
-            if (ret < 0) {
-              ldpp_dout(dpp, 1) << "ERROR: failed to read topic '" << topic_name
-                                << "' info: ret=" << ret << dendl;
-              continue;
-            }
-            result.topics[topic_name] = std::move(topic);
+      for (auto& topic_entry : topics) {
+        std::string topic_name;
+        std::string topic_tenant;
+        parse_topic_entry(topic_entry, &topic_tenant, &topic_name);
+        if (tenant != topic_tenant) {
+          continue;
+        }
+        rgw_pubsub_topic topic;
+        const auto op_ret = get_topic(dpp, topic_name, topic, y, nullptr);
+        if (op_ret < 0) {
+          ret = op_ret;
+          continue;
+        }
+        result.topics[topic_name] = std::move(topic);
       }
     } while (truncated);
     driver->meta_list_keys_complete(handle);
@@ -583,13 +591,26 @@ int RGWPubSub::Bucket::write_topics(const DoutPrefixProvider *dpp, const rgw_pub
   return 0;
 }
 
-int RGWPubSub::get_topic(const DoutPrefixProvider *dpp, const std::string& name, rgw_pubsub_topic& result, optional_yield y) const
-{
+int RGWPubSub::get_topic(const DoutPrefixProvider* dpp,
+                         const std::string& name,
+                         rgw_pubsub_topic& result,
+                         optional_yield y,
+                         std::set<std::string>* subscribed_buckets) const {
   if (use_notification_v2) {
-    const int ret = driver->read_topic_v2(name, tenant, result, nullptr, y, dpp);
+    int ret = driver->read_topic_v2(name, tenant, result, nullptr, y, dpp);
     if (ret < 0) {
       ldpp_dout(dpp, 1) << "failed to read topic info for name: " << name
                         << " tenant: " << tenant << ", ret=" << ret << dendl;
+      return ret;
+    }
+    if (subscribed_buckets) {
+      ret =
+          driver->get_bucket_topic_mapping(result, *subscribed_buckets, y, dpp);
+      if (ret < 0) {
+        ldpp_dout(dpp, 1)
+            << "failed to fetch bucket topic mapping info for topic: " << name
+            << " tenant: " << tenant << ", ret=" << ret << dendl;
+      }
     }
     return ret;
   }
@@ -656,19 +677,56 @@ std::optional<rgw_pubsub_topic_filter> find_unique_topic(
   return std::nullopt;
 }
 
-int delete_all_notifications(const DoutPrefixProvider* dpp,
-                             const rgw_pubsub_bucket_topics& bucket_topics,
-                             std::map<std::string, bufferlist>& attrs,
-                             rgw::sal::Bucket* bucket,
-                             rgw::sal::Driver* driver,
-                             optional_yield y) {
+int store_bucket_attrs_and_update_mapping(
+    const DoutPrefixProvider* dpp,
+    rgw::sal::Driver* driver,
+    rgw::sal::Bucket* bucket,
+    rgw_pubsub_bucket_topics& bucket_topics,
+    const rgw_pubsub_topic& topic,
+    optional_yield y) {
+  rgw::sal::Attrs& attrs = bucket->get_attrs();
+  if (!bucket_topics.topics.empty()) {
+    bufferlist bl;
+    bucket_topics.encode(bl);
+    attrs[RGW_ATTR_BUCKET_NOTIFICATION] = std::move(bl);
+  } else {
+    auto it = attrs.find(RGW_ATTR_BUCKET_NOTIFICATION);
+    if (it != attrs.end()) {
+      attrs.erase(it);
+    }
+  }
+  auto ret = bucket->merge_and_store_attrs(dpp, attrs, y);
+  if (ret < 0) {
+    ldpp_dout(dpp, 1)
+        << "Failed to store RGW_ATTR_BUCKET_NOTIFICATION on bucket="
+        << bucket->get_name() << " returned err= " << ret << dendl;
+    return ret;
+  }
+  if (bucket_topics.topics.empty()) {
+    // remove the bucket name from  the topic-bucket omap
+    auto op_ret = driver->update_bucket_topic_mapping(
+        topic,
+        rgw_make_bucket_entry_name(bucket->get_tenant(), bucket->get_name()),
+        /*add_mapping=*/false, y, dpp);
+    if (op_ret < 0) {
+      // TODO: should the error be reported, as attrs are already deleted.
+      // ret = op_ret;
+    }
+  }
+  return ret;
+}
+
+int delete_notification_attrs(const DoutPrefixProvider* dpp,
+                              rgw::sal::Bucket* bucket,
+                              optional_yield y) {
+  auto& attrs = bucket->get_attrs();
   auto iter = attrs.find(RGW_ATTR_BUCKET_NOTIFICATION);
   if (iter == attrs.end()) {
     return 0;
   }
   // delete all notifications of on a bucket
   attrs.erase(iter);
-  const auto ret = bucket->merge_and_store_attrs(dpp, attrs, y);
+  auto ret = bucket->merge_and_store_attrs(dpp, attrs, y);
   if (ret < 0) {
     ldpp_dout(dpp, 1)
         << "Failed to remove RGW_ATTR_BUCKET_NOTIFICATION attr on bucket="
@@ -691,11 +749,23 @@ int remove_notification_v2(const DoutPrefixProvider* dpp,
   if (bucket_topics.topics.empty()) {
     return 0;
   }
-  rgw::sal::Attrs& attrs = bucket->get_attrs();
+  // delete all notifications
   if (notification_id.empty()) {
-    return delete_all_notifications(dpp, bucket_topics, attrs, bucket, driver,
-                                    y);
+    ret = delete_notification_attrs(dpp, bucket, y);
+    if (ret < 0) {
+      return ret;
+    }
+    int op_ret = driver->remove_bucket_mapping_from_topics(
+        bucket_topics,
+        rgw_make_bucket_entry_name(bucket->get_tenant(), bucket->get_name()), y,
+        dpp);
+    if (op_ret < 0) {
+      // TODO: should the error be reported, as attrs are already deleted.
+      // ret = op_ret;
+    }
+    return ret;
   }
+
   // delete a specific notification
   const auto unique_topic = find_unique_topic(bucket_topics, notification_id);
   if (!unique_topic) {
@@ -706,23 +776,15 @@ int remove_notification_v2(const DoutPrefixProvider* dpp,
   }
   const auto& topic_name = unique_topic->topic.name;
   bucket_topics.topics.erase(topic_to_unique(topic_name, notification_id));
-  bufferlist bl;
-  bucket_topics.encode(bl);
-  attrs[RGW_ATTR_BUCKET_NOTIFICATION] = std::move(bl);
-  ret = bucket->merge_and_store_attrs(dpp, attrs, y);
-  if (ret < 0) {
-    ldpp_dout(dpp, 1)
-        << "Failed to store RGW_ATTR_BUCKET_NOTIFICATION on bucket="
-        << bucket->get_name() << " returned err= " << ret << dendl;
-  }
-  return ret;
+  return store_bucket_attrs_and_update_mapping(
+      dpp, driver, bucket, bucket_topics, unique_topic->topic, y);
 }
 
 int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name,
     const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) const {
   rgw_pubsub_topic topic_info;
 
-  int ret = ps.get_topic(dpp, topic_name, topic_info, y);
+  int ret = ps.get_topic(dpp, topic_name, topic_info, y, nullptr);
   if (ret < 0) {
     ldpp_dout(dpp, 1) << "ERROR: failed to read topic '" << topic_name << "' info: ret=" << ret << dendl;
     return ret;
@@ -908,7 +970,7 @@ int RGWPubSub::remove_topic_v2(const DoutPrefixProvider* dpp,
                                optional_yield y) const {
   RGWObjVersionTracker objv_tracker;
   rgw_pubsub_topic topic;
-  int ret = get_topic(dpp, name, topic, y);
+  int ret = get_topic(dpp, name, topic, y, nullptr);
   if (ret < 0 && ret != -ENOENT) {
     return ret;
   } else if (ret == -ENOENT) {
@@ -922,7 +984,9 @@ int RGWPubSub::remove_topic_v2(const DoutPrefixProvider* dpp,
   if (ret < 0) {
     ldpp_dout(dpp, 1) << "ERROR: failed to remove topic info: ret=" << ret
                       << dendl;
+    return ret;
   }
+  ret = driver->delete_bucket_topic_mapping(topic, y, dpp);
   return ret;
 }
 
index 46da8e045a9f57400b5a024d666f4c4aac4386d0..8509d86e2255b4f739ebe83518e310120dfd73e5 100644 (file)
@@ -602,19 +602,13 @@ public:
     int get_topics(const DoutPrefixProvider *dpp, rgw_pubsub_bucket_topics& result, optional_yield y) const {
       return read_topics(dpp, result, nullptr, y);
     }
-    // get a bucket_topic with by its name and populate it into "result"
-    // return -ENOENT if the topic does not exists
-    // return 0 on success, error code otherwise
-    int get_notification_by_id(const DoutPrefixProvider *dpp, const std::string& notification_id, rgw_pubsub_topic_filter& result, optional_yield y) const;
     // adds a topic + filter (event list, and possibly name metadata or tags filters) to a bucket
     // assigning a notification name is optional (needed for S3 compatible notifications)
     // if the topic already exist on the bucket, the filter event list may be updated
     // for S3 compliant notifications the version with: s3_filter and notif_name should be used
     // return -ENOENT if the topic does not exists
     // return 0 on success, error code otherwise
-    int create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, 
-        const rgw::notify::EventTypeList& events, optional_yield y) const;
-    int create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, 
+    int create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name,
         const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) const;
     // remove a topic and filter from bucket
     // if the topic does not exists on the bucket it is a no-op (considered success)
@@ -633,9 +627,15 @@ public:
     return read_topics(dpp, result, nullptr, y);
   }
   // get a topic with by its name and populate it into "result"
-  // return -ENOENT if the topic does not exists 
-  // return 0 on success, error code otherwise
-  int get_topic(const DoutPrefixProvider *dpp, const std::string& name, rgw_pubsub_topic& result, optional_yield y) const;
+  // return -ENOENT if the topic does not exists
+  // return 0 on success, error code otherwise.
+  // if |subscribed_buckets| valid, then for notification_v2 read the bucket
+  // topic mapping object.
+  int get_topic(const DoutPrefixProvider* dpp,
+                const std::string& name,
+                rgw_pubsub_topic& result,
+                optional_yield y,
+                std::set<std::string>* subscribed_buckets) const;
   // create a topic with a name only
   // if the topic already exists it is a no-op (considered success)
   // return 0 on success, error code otherwise
index f2e5439208feaa87e06ff07c64526266e9e13160..38943736e4514b6bbdc0598818ae5011abf11946 100644 (file)
@@ -200,7 +200,7 @@ class RGWPSCreateTopicOp : public RGWOp {
     const RGWPubSub ps(driver, s->owner.id.tenant,
                        &s->penv.site->get_period()->get_map().zonegroups);
     rgw_pubsub_topic result;
-    ret = ps.get_topic(this, topic_name, result, y);
+    ret = ps.get_topic(this, topic_name, result, y, nullptr);
     if (ret == -ENOENT) {
       // topic not present
       return 0;
@@ -424,7 +424,7 @@ void RGWPSGetTopicOp::execute(optional_yield y) {
   }
   const RGWPubSub ps(driver, s->owner.id.tenant,
                      &s->penv.site->get_period()->get_map().zonegroups);
-  op_ret = ps.get_topic(this, topic_name, result, y);
+  op_ret = ps.get_topic(this, topic_name, result, y, nullptr);
   if (op_ret < 0) {
     ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
     return;
@@ -509,7 +509,7 @@ void RGWPSGetTopicAttributesOp::execute(optional_yield y) {
   }
   const RGWPubSub ps(driver, s->owner.id.tenant,
                      &s->penv.site->get_period()->get_map().zonegroups);
-  op_ret = ps.get_topic(this, topic_name, result, y);
+  op_ret = ps.get_topic(this, topic_name, result, y, nullptr);
   if (op_ret < 0) {
     ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
     return;
@@ -638,7 +638,7 @@ class RGWPSSetTopicAttributesOp : public RGWOp {
     rgw_pubsub_topic result;
     const RGWPubSub ps(driver, s->owner.id.tenant,
                        &s->penv.site->get_period()->get_map().zonegroups);
-    ret = ps.get_topic(this, topic_name, result, y);
+    ret = ps.get_topic(this, topic_name, result, y, nullptr);
     if (ret < 0) {
       ldpp_dout(this, 1) << "failed to get topic '" << topic_name
                          << "', ret=" << ret << dendl;
@@ -801,7 +801,7 @@ void RGWPSDeleteTopicOp::execute(optional_yield y) {
                      &s->penv.site->get_period()->get_map().zonegroups);
 
   rgw_pubsub_topic result;
-  op_ret = ps.get_topic(this, topic_name, result, y);
+  op_ret = ps.get_topic(this, topic_name, result, y, nullptr);
   if (op_ret == 0) {
     op_ret = verify_topic_owner_or_policy(
         s, result, driver->get_zone()->get_zonegroup().get_name(),
@@ -1071,8 +1071,8 @@ void RGWPSCreateNotifOp::execute(optional_yield y) {
     const auto topic_name = arn->resource;
 
     // get topic information. destination information is stored in the topic
-    rgw_pubsub_topic topic_info;  
-    op_ret = ps.get_topic(this, topic_name, topic_info, y);
+    rgw_pubsub_topic topic_info;
+    op_ret = ps.get_topic(this, topic_name, topic_info, y, nullptr);
     if (op_ret < 0) {
       ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
       return;
@@ -1177,7 +1177,6 @@ void RGWPSCreateNotifOp::execute_v2(optional_yield y) {
   const RGWPubSub ps(driver, s->owner.id.tenant,
                      &s->penv.site->get_period()->get_map().zonegroups);
   std::unordered_map<std::string, rgw_pubsub_topic> topics;
-  const auto rgwbucket = rgw_bucket(s->bucket_tenant, s->bucket_name, "");
   for (const auto& c : configurations.list) {
     const auto& notif_name = c.id;
     if (notif_name.empty()) {
@@ -1212,7 +1211,7 @@ void RGWPSCreateNotifOp::execute_v2(optional_yield y) {
     if (!topics.contains(topic_name)) {
       // get topic information. destination information is stored in the topic
       rgw_pubsub_topic topic_info;
-      op_ret = ps.get_topic(this, topic_name, topic_info, y);
+      op_ret = ps.get_topic(this, topic_name, topic_info, y,nullptr);
       if (op_ret < 0) {
         ldpp_dout(this, 1) << "failed to get topic '" << topic_name
                            << "', ret=" << op_ret << dendl;
@@ -1248,6 +1247,18 @@ void RGWPSCreateNotifOp::execute_v2(optional_yield y) {
         << bucket->get_name() << " returned err= " << op_ret << dendl;
     return;
   }
+  for (const auto& [_, topic] : topics) {
+    const auto ret = driver->update_bucket_topic_mapping(
+        topic,
+        rgw_make_bucket_entry_name(bucket->get_tenant(), bucket->get_name()),
+        /*add_mapping=*/true, y, this);
+    if (ret < 0) {
+      ldpp_dout(this, 1) << "Failed to remove topic mapping on bucket="
+                         << bucket->get_name() << " ret= " << ret << dendl;
+      // error should be reported ??
+      // op_ret = ret;
+    }
+  }
   ldpp_dout(this, 20) << "successfully created bucket notification for bucket: "
                       << bucket->get_name() << dendl;
 }
index 89ac23341d23daec9f90762fb2acb606353dbcab..4dc3c9dc83651ede345fa627f5e3021720da1c0f 100644 (file)
@@ -332,6 +332,34 @@ class Driver {
                                 RGWObjVersionTracker* objv_tracker,
                                 optional_yield y,
                                 const DoutPrefixProvider* dpp) = 0;
+    /** Update the bucket-topic mapping in the store, if |add_mapping|=true then
+     * adding the |bucket_key| |topic| mapping to store, else delete the
+     * |bucket_key| |topic| mapping from the store.  The |bucket_key| is
+     * in the format |tenant_name + "/" + bucket_name| if tenant is not empty
+     * else |bucket_name|*/
+    virtual int update_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+                                            const std::string& bucket_key,
+                                            bool add_mapping,
+                                            optional_yield y,
+                                            const DoutPrefixProvider* dpp) = 0;
+    /** Remove the |bucket_key| from bucket-topic mapping in the store, for all
+    the topics under |bucket_topics|*/
+    virtual int remove_bucket_mapping_from_topics(
+        const rgw_pubsub_bucket_topics& bucket_topics,
+        const std::string& bucket_key,
+        optional_yield y,
+        const DoutPrefixProvider* dpp) = 0;
+    /** Get the bucket-topic mapping from the backend store. The |bucket_keys|
+     * are in the format |tenant_name + "/" + bucket_name| if tenant is not
+     * empty else |bucket_name|*/
+    virtual int get_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+                                         std::set<std::string>& bucket_keys,
+                                         optional_yield y,
+                                         const DoutPrefixProvider* dpp) = 0;
+    /** Remove the bucket-topic mapping from the backend store. */
+    virtual int delete_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+                                            optional_yield y,
+                                            const DoutPrefixProvider* dpp) = 0;
     /** Get access to the lifecycle management thread */
     virtual RGWLC* get_rgwlc(void) = 0;
     /** Get access to the coroutine registry.  Used to create new coroutine managers */
index 71991378e32143bd4dd8b69831770eb7a93c5f17..d5bf9afe248cc418ef92703b1b91d518978b3f76 100644 (file)
@@ -218,7 +218,33 @@ public:
                       const DoutPrefixProvider* dpp) override {
     return next->remove_topic_v2(topic_name, tenant, objv_tracker, y, dpp);
   }
-
+  int update_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+                                  const std::string& bucket_key,
+                                  bool add_mapping,
+                                  optional_yield y,
+                                  const DoutPrefixProvider* dpp) override {
+    return next->update_bucket_topic_mapping(topic, bucket_key, add_mapping, y,
+                                             dpp);
+  }
+  int remove_bucket_mapping_from_topics(
+      const rgw_pubsub_bucket_topics& bucket_topics,
+      const std::string& bucket_key,
+      optional_yield y,
+      const DoutPrefixProvider* dpp) override {
+    return next->remove_bucket_mapping_from_topics(bucket_topics, bucket_key, y,
+                                                   dpp);
+  }
+  int get_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+                               std::set<std::string>& bucket_keys,
+                               optional_yield y,
+                               const DoutPrefixProvider* dpp) override {
+    return next->get_bucket_topic_mapping(topic, bucket_keys, y, dpp);
+  }
+  int delete_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+                                  optional_yield y,
+                                  const DoutPrefixProvider* dpp) override {
+    return next->delete_bucket_topic_mapping(topic, y, dpp);
+  }
   virtual RGWLC* get_rgwlc(void) override;
   virtual RGWCoroutinesManagerRegistry* get_cr_registry() override;
 
index eda0f08ede6f0122d590309681858cb67c461fcf..b34276a9daaf8fe2512a028932939fd1ba7fde80 100644 (file)
@@ -46,14 +46,39 @@ class StoreDriver : public Driver {
                        RGWObjVersionTracker* objv_tracker,
                        optional_yield y,
                        const DoutPrefixProvider* dpp) override {
-      return -ENOENT;
+      return -EOPNOTSUPP;
     }
     int remove_topic_v2(const std::string& topic_name,
                         const std::string& tenant,
                         RGWObjVersionTracker* objv_tracker,
                         optional_yield y,
                         const DoutPrefixProvider* dpp) override {
-      return -ENOENT;
+      return -EOPNOTSUPP;
+    }
+    int update_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+                                    const std::string& bucket_key,
+                                    bool add_mapping,
+                                    optional_yield y,
+                                    const DoutPrefixProvider* dpp) override {
+      return -EOPNOTSUPP;
+    }
+    int remove_bucket_mapping_from_topics(
+        const rgw_pubsub_bucket_topics& bucket_topics,
+        const std::string& bucket_key,
+        optional_yield y,
+        const DoutPrefixProvider* dpp) override {
+      return -EOPNOTSUPP;
+    }
+    int get_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+                                 std::set<std::string>& bucket_keys,
+                                 optional_yield y,
+                                 const DoutPrefixProvider* dpp) override {
+      return -EOPNOTSUPP;
+    }
+    int delete_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+                                    optional_yield y,
+                                    const DoutPrefixProvider* dpp) override {
+      return -EOPNOTSUPP;
     }
 };
 
index 64c9106776cc049e8e643242b2f10a0f631b2282..9e0b75d46647b8ddcadadf5a427d74c2fba4309e 100644 (file)
@@ -10,6 +10,7 @@
 
 static std::string topic_oid_prefix = "topic.";
 static constexpr char topic_tenant_delim[] = ":";
+static std::string bucket_topic_oid_prefix = "buckets.";
 
 std::string get_topic_key(const std::string& topic_name,
                           const std::string& tenant) {
@@ -32,6 +33,11 @@ void parse_topic_entry(const std::string& topic_entry,
     *topic_name = topic_entry;
   }
 }
+
+std::string get_bucket_topic_mapping_oid(const rgw_pubsub_topic& topic) {
+  return bucket_topic_oid_prefix + get_topic_key(topic.name, topic.user.tenant);
+}
+
 class RGWSI_Topic_Module : public RGWSI_MBSObj_Handler_Module {
   RGWSI_Topic_RADOS::Svc& svc;
   const std::string prefix;
@@ -131,7 +137,7 @@ int RGWTopicMetadataHandler::do_get(RGWSI_MetaBackend_Handler::Op* op,
   parse_topic_entry(entry, &tenant, &topic_name);
   RGWPubSub ps(driver, tenant,
                &topic_svc->svc.zone->get_current_period().get_map().zonegroups);
-  int ret = ps.get_topic(dpp, topic_name, result, y);
+  int ret = ps.get_topic(dpp, topic_name, result, y, nullptr);
   if (ret < 0) {
     return ret;
   }
index e630a610e974e23b92323dbd5e814e9db985c9bf..bc4e35373459ee73048b4d25c56b6f833bdc7c62 100644 (file)
@@ -94,3 +94,5 @@ std::string get_topic_key(const std::string& topic_name,
 void parse_topic_entry(const std::string& topic_entry,
                        std::string* tenant_name,
                        std::string* topic_name);
+
+std::string get_bucket_topic_mapping_oid(const rgw_pubsub_topic& topic);
\ No newline at end of file