From dd5ec4bafa8953cfbe7e7eba130361e98275d92a Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Thu, 2 May 2024 14:35:47 -0400 Subject: [PATCH] rgw/sal: add/remove_persistent_topic() interface Signed-off-by: Casey Bodley (cherry picked from commit 51ab34d8283dfd08d639860d98f055621baeed44) --- src/rgw/driver/rados/rgw_sal_rados.cc | 16 ++++++++++++++++ src/rgw/driver/rados/rgw_sal_rados.h | 6 ++++++ src/rgw/rgw_sal.h | 10 ++++++++++ src/rgw/rgw_sal_dbstore.cc | 14 ++++++++++++++ src/rgw/rgw_sal_dbstore.h | 7 +++++++ src/rgw/rgw_sal_filter.cc | 14 ++++++++++++++ src/rgw/rgw_sal_filter.h | 6 ++++++ 7 files changed, 73 insertions(+) diff --git a/src/rgw/driver/rados/rgw_sal_rados.cc b/src/rgw/driver/rados/rgw_sal_rados.cc index 64444e4f8b729..f1d7b18be4f27 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.cc +++ b/src/rgw/driver/rados/rgw_sal_rados.cc @@ -1726,6 +1726,22 @@ int RadosStore::list_account_topics(const DoutPrefixProvider* dpp, listing.topics, listing.next_marker); } +int RadosStore::add_persistent_topic(const DoutPrefixProvider* dpp, + optional_yield y, + const std::string& topic_queue) +{ + return rgw::notify::add_persistent_topic( + dpp, getRados()->get_notif_pool_ctx(), topic_queue, y); +} + +int RadosStore::remove_persistent_topic(const DoutPrefixProvider* dpp, + optional_yield y, + const std::string& topic_queue) +{ + return rgw::notify::remove_persistent_topic( + dpp, getRados()->get_notif_pool_ctx(), topic_queue, y); +} + int RadosStore::remove_bucket_mapping_from_topics( const rgw_pubsub_bucket_topics& bucket_topics, const std::string& bucket_key, diff --git a/src/rgw/driver/rados/rgw_sal_rados.h b/src/rgw/driver/rados/rgw_sal_rados.h index 78bd849717f60..9f5d021987ad3 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.h +++ b/src/rgw/driver/rados/rgw_sal_rados.h @@ -319,6 +319,12 @@ class RadosStore : public StoreDriver { std::string_view marker, uint32_t max_items, TopicList& listing) override; + int add_persistent_topic(const DoutPrefixProvider* dpp, + optional_yield y, + const std::string& topic_queue) override; + int remove_persistent_topic(const DoutPrefixProvider* dpp, + optional_yield y, + const std::string& topic_queue) override; int update_bucket_topic_mapping(const rgw_pubsub_topic& topic, const std::string& bucket_key, bool add_mapping, diff --git a/src/rgw/rgw_sal.h b/src/rgw/rgw_sal.h index e3a892b6992ce..cd29e9c742834 100644 --- a/src/rgw/rgw_sal.h +++ b/src/rgw/rgw_sal.h @@ -542,6 +542,16 @@ class Driver { std::string_view marker, uint32_t max_items, TopicList& listing) = 0; + + // TODO: backends should manage persistent topic queues internally on + // write_topic_v2()/remove_topic_v2() + virtual int add_persistent_topic(const DoutPrefixProvider* dpp, + optional_yield y, + const std::string& topic_queue) = 0; + virtual int remove_persistent_topic(const DoutPrefixProvider* dpp, + optional_yield y, + const std::string& topic_queue) = 0; + /** Update the bucket-topic mapping in the store, if |add_mapping|=true then * adding the |bucket_key| |topic| mapping to store, else delete the * |bucket_key| |topic| mapping from the store. The |bucket_key| is diff --git a/src/rgw/rgw_sal_dbstore.cc b/src/rgw/rgw_sal_dbstore.cc index 2ce6304646eac..874935f161974 100644 --- a/src/rgw/rgw_sal_dbstore.cc +++ b/src/rgw/rgw_sal_dbstore.cc @@ -1912,6 +1912,20 @@ namespace rgw::sal { return -ENOTSUP; } + int DBStore::add_persistent_topic(const DoutPrefixProvider* dpp, + optional_yield y, + const std::string& topic_queue) + { + return -ENOTSUP; + } + + int DBStore::remove_persistent_topic(const DoutPrefixProvider* dpp, + optional_yield y, + const std::string& topic_queue) + { + return -ENOTSUP; + } + RGWLC* DBStore::get_rgwlc(void) { return lc; } diff --git a/src/rgw/rgw_sal_dbstore.h b/src/rgw/rgw_sal_dbstore.h index 4770713e762a6..eaa5fda5060cc 100644 --- a/src/rgw/rgw_sal_dbstore.h +++ b/src/rgw/rgw_sal_dbstore.h @@ -887,6 +887,13 @@ public: uint32_t max_items, TopicList& listing) override; + int add_persistent_topic(const DoutPrefixProvider* dpp, + optional_yield y, + const std::string& topic_queue) override; + int remove_persistent_topic(const DoutPrefixProvider* dpp, + optional_yield y, + const std::string& topic_queue) override; + virtual RGWLC* get_rgwlc(void) override; virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return NULL; } virtual int log_usage(const DoutPrefixProvider *dpp, std::map& usage_info, optional_yield y) override; diff --git a/src/rgw/rgw_sal_filter.cc b/src/rgw/rgw_sal_filter.cc index 66a2466040ba5..d6d51b204612b 100644 --- a/src/rgw/rgw_sal_filter.cc +++ b/src/rgw/rgw_sal_filter.cc @@ -463,6 +463,20 @@ std::unique_ptr FilterDriver::get_notification( return std::make_unique(std::move(n)); } +int FilterDriver::add_persistent_topic(const DoutPrefixProvider* dpp, + optional_yield y, + const std::string& topic_queue) +{ + return next->add_persistent_topic(dpp, y, topic_queue); +} + +int FilterDriver::remove_persistent_topic(const DoutPrefixProvider* dpp, + optional_yield y, + const std::string& topic_queue) +{ + return next->remove_persistent_topic(dpp, y, topic_queue); +} + RGWLC* FilterDriver::get_rgwlc() { return next->get_rgwlc(); diff --git a/src/rgw/rgw_sal_filter.h b/src/rgw/rgw_sal_filter.h index 95d00960cbd2d..eaa8af2cb437d 100644 --- a/src/rgw/rgw_sal_filter.h +++ b/src/rgw/rgw_sal_filter.h @@ -353,6 +353,12 @@ public: return next->list_account_topics(dpp, y, account_id, marker, max_items, listing); } + int add_persistent_topic(const DoutPrefixProvider* dpp, + optional_yield y, + const std::string& topic_queue) override; + int remove_persistent_topic(const DoutPrefixProvider* dpp, + optional_yield y, + const std::string& topic_queue) override; int update_bucket_topic_mapping(const rgw_pubsub_topic& topic, const std::string& bucket_key, bool add_mapping, -- 2.39.5