listing.topics, listing.next_marker);
}
+int RadosStore::add_persistent_topic(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ const std::string& topic_queue)
+{
+ return rgw::notify::add_persistent_topic(
+ dpp, getRados()->get_notif_pool_ctx(), topic_queue, y);
+}
+
+int RadosStore::remove_persistent_topic(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ const std::string& topic_queue)
+{
+ return rgw::notify::remove_persistent_topic(
+ dpp, getRados()->get_notif_pool_ctx(), topic_queue, y);
+}
+
int RadosStore::remove_bucket_mapping_from_topics(
const rgw_pubsub_bucket_topics& bucket_topics,
const std::string& bucket_key,
std::string_view marker,
uint32_t max_items,
TopicList& listing) override;
+ int add_persistent_topic(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ const std::string& topic_queue) override;
+ int remove_persistent_topic(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ const std::string& topic_queue) override;
int update_bucket_topic_mapping(const rgw_pubsub_topic& topic,
const std::string& bucket_key,
bool add_mapping,
std::string_view marker,
uint32_t max_items,
TopicList& listing) = 0;
+
+ // TODO: backends should manage persistent topic queues internally on
+ // write_topic_v2()/remove_topic_v2()
+ virtual int add_persistent_topic(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ const std::string& topic_queue) = 0;
+ virtual int remove_persistent_topic(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ const std::string& topic_queue) = 0;
+
/** Update the bucket-topic mapping in the store, if |add_mapping|=true then
* adding the |bucket_key| |topic| mapping to store, else delete the
* |bucket_key| |topic| mapping from the store. The |bucket_key| is
return -ENOTSUP;
}
+ int DBStore::add_persistent_topic(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ const std::string& topic_queue)
+ {
+ return -ENOTSUP;
+ }
+
+ int DBStore::remove_persistent_topic(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ const std::string& topic_queue)
+ {
+ return -ENOTSUP;
+ }
+
RGWLC* DBStore::get_rgwlc(void) {
return lc;
}
uint32_t max_items,
TopicList& listing) override;
+ int add_persistent_topic(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ const std::string& topic_queue) override;
+ int remove_persistent_topic(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ const std::string& topic_queue) override;
+
virtual RGWLC* get_rgwlc(void) override;
virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return NULL; }
virtual int log_usage(const DoutPrefixProvider *dpp, std::map<rgw_user_bucket, RGWUsageBatch>& usage_info, optional_yield y) override;
return std::make_unique<FilterNotification>(std::move(n));
}
+int FilterDriver::add_persistent_topic(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ const std::string& topic_queue)
+{
+ return next->add_persistent_topic(dpp, y, topic_queue);
+}
+
+int FilterDriver::remove_persistent_topic(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ const std::string& topic_queue)
+{
+ return next->remove_persistent_topic(dpp, y, topic_queue);
+}
+
RGWLC* FilterDriver::get_rgwlc()
{
return next->get_rgwlc();
return next->list_account_topics(dpp, y, account_id, marker,
max_items, listing);
}
+ int add_persistent_topic(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ const std::string& topic_queue) override;
+ int remove_persistent_topic(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ const std::string& topic_queue) override;
int update_bucket_topic_mapping(const rgw_pubsub_topic& topic,
const std::string& bucket_key,
bool add_mapping,