From 341009db418874a3b0c7732bac190c68e1b720ee Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Wed, 20 Mar 2024 14:16:15 -0400 Subject: [PATCH] rgw/pubsub: use rgw_pubsub_dest::persistent_queue for queue oid Signed-off-by: Casey Bodley (cherry picked from commit 4bac81a2bc540b110003b7e5a86c8670ea2a2cc8) --- src/rgw/driver/rados/rgw_notify.cc | 9 ++++--- src/rgw/driver/rados/rgw_notify.h | 4 +-- src/rgw/driver/rados/topic.cc | 34 +++++++++++++++++--------- src/rgw/rgw_admin.cc | 20 ++++++++++++--- src/rgw/rgw_pubsub.cc | 4 +-- src/rgw/rgw_rest_pubsub.cc | 39 +++++++++++++++++++++--------- 6 files changed, 76 insertions(+), 34 deletions(-) diff --git a/src/rgw/driver/rados/rgw_notify.cc b/src/rgw/driver/rados/rgw_notify.cc index 275bfc1d72345..ba3312e2cc2d9 100644 --- a/src/rgw/driver/rados/rgw_notify.cc +++ b/src/rgw/driver/rados/rgw_notify.cc @@ -1226,18 +1226,19 @@ int publish_abort(reservation_t& res) { return 0; } -int get_persistent_queue_stats_by_topic_name(const DoutPrefixProvider *dpp, librados::IoCtx &rados_ioctx, - const std::string &topic_name, rgw_topic_stats &stats, optional_yield y) +int get_persistent_queue_stats(const DoutPrefixProvider *dpp, librados::IoCtx &rados_ioctx, + const std::string &queue_name, rgw_topic_stats &stats, optional_yield y) { + // TODO: use optional_yield instead calling rados_ioctx.operate() synchronously cls_2pc_reservations reservations; - auto ret = cls_2pc_queue_list_reservations(rados_ioctx, topic_name, reservations); + auto ret = cls_2pc_queue_list_reservations(rados_ioctx, queue_name, reservations); if (ret < 0) { ldpp_dout(dpp, 1) << "ERROR: failed to read queue list reservation: " << ret << dendl; return ret; } stats.queue_reservations = reservations.size(); - ret = cls_2pc_queue_get_topic_stats(rados_ioctx, topic_name, stats.queue_entries, stats.queue_size); + ret = cls_2pc_queue_get_topic_stats(rados_ioctx, queue_name, stats.queue_entries, stats.queue_size); if (ret < 0) { ldpp_dout(dpp, 1) << "ERROR: failed to get the queue size or the number of entries: " << ret << dendl; return ret; diff --git a/src/rgw/driver/rados/rgw_notify.h b/src/rgw/driver/rados/rgw_notify.h index 87e9791053725..7014cda3ca350 100644 --- a/src/rgw/driver/rados/rgw_notify.h +++ b/src/rgw/driver/rados/rgw_notify.h @@ -134,8 +134,8 @@ int publish_commit(rgw::sal::Object* obj, // cancel the reservation int publish_abort(reservation_t& reservation); -int get_persistent_queue_stats_by_topic_name(const DoutPrefixProvider *dpp, librados::IoCtx &rados_ioctx, - const std::string &topic_name, rgw_topic_stats &stats, optional_yield y); +int get_persistent_queue_stats(const DoutPrefixProvider *dpp, librados::IoCtx &rados_ioctx, + const std::string &queue_name, rgw_topic_stats &stats, optional_yield y); } diff --git a/src/rgw/driver/rados/topic.cc b/src/rgw/driver/rados/topic.cc index 3e43a67012e99..7769649fbbfe7 100644 --- a/src/rgw/driver/rados/topic.cc +++ b/src/rgw/driver/rados/topic.cc @@ -352,11 +352,12 @@ class MetadataHandler : public RGWMetadataHandler { if (r < 0) { return r; } - if (!info.dest.push_endpoint.empty() && info.dest.persistent) { - r = rgw::notify::add_persistent_topic(info.name, y); + if (!info.dest.push_endpoint.empty() && info.dest.persistent && + !info.dest.persistent_queue.empty()) { + r = rgw::notify::add_persistent_topic(info.dest.persistent_queue, y); if (r < 0) { ldpp_dout(dpp, 1) << "ERROR: failed to create queue for persistent topic " - << info.name << " with: " << cpp_strerror(r) << dendl; + << info.dest.persistent_queue << " with: " << cpp_strerror(r) << dendl; return r; } } @@ -370,18 +371,29 @@ class MetadataHandler : public RGWMetadataHandler { std::string tenant; parse_topic_metadata_key(entry, tenant, name); - int r = topic::remove(dpp, y, sysobj, &mdlog, rados, zone, - tenant, name, objv_tracker); + rgw_pubsub_topic info; + int r = read(dpp, y, sysobj, cache_svc, zone, entry, + info, cache, nullptr, &objv_tracker); if (r < 0) { return r; } - // delete persistent topic queue. expect ENOENT for non-persistent topics - r = rgw::notify::remove_persistent_topic(name, y); - if (r < 0 && r != -ENOENT) { - ldpp_dout(dpp, 1) << "Failed to delete queue for persistent topic: " - << name << " with error: " << r << dendl; - } // not fatal + r = topic::remove(dpp, y, sysobj, &mdlog, rados, zone, + tenant, name, objv_tracker); + if (r < 0) { + return r; + } + + const rgw_pubsub_dest& dest = info.dest; + if (!dest.push_endpoint.empty() && dest.persistent && + !dest.persistent_queue.empty()) { + // delete persistent topic queue + r = rgw::notify::remove_persistent_topic(dest.persistent_queue, y); + if (r < 0 && r != -ENOENT) { + ldpp_dout(dpp, 1) << "Failed to delete queue for persistent topic: " + << name << " with error: " << r << dendl; + } // not fatal + } return 0; } diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 127a225ea363e..b9dc45a741859 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -11238,11 +11238,25 @@ next: cerr << "ERROR: topic name was not provided (via --topic)" << std::endl; return EINVAL; } + const std::string& account = !account_id.empty() ? account_id : tenant; + RGWPubSub ps(driver, account, *site); + + rgw_pubsub_topic topic; + ret = ps.get_topic(dpp(), topic_name, topic, null_yield, nullptr); + if (ret < 0) { + cerr << "ERROR: could not get topic: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + + if (topic.dest.persistent_queue.empty()) { + cerr << "This topic does not have a persistent queue." << std::endl; + return ENOENT; + } rgw::notify::rgw_topic_stats stats; - ret = rgw::notify::get_persistent_queue_stats_by_topic_name( - dpp(), static_cast(driver)->getRados()->get_notif_pool_ctx(), topic_name, - stats, null_yield); + ret = rgw::notify::get_persistent_queue_stats( + dpp(), static_cast(driver)->getRados()->get_notif_pool_ctx(), + topic.dest.persistent_queue, stats, null_yield); if (ret < 0) { cerr << "ERROR: could not get persistent queue: " << cpp_strerror(-ret) << std::endl; return -ret; diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index 641381e8b6b9f..e99d36017942f 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -1083,7 +1083,7 @@ int RGWPubSub::remove_topic_v2(const DoutPrefixProvider* dpp, const rgw_pubsub_dest& dest = topic.dest; if (!dest.push_endpoint.empty() && dest.persistent && !dest.persistent_queue.empty()) { - ret = rgw::notify::remove_persistent_topic(topic.name, y); + ret = rgw::notify::remove_persistent_topic(dest.persistent_queue, y); if (ret < 0 && ret != -ENOENT) { ldpp_dout(dpp, 1) << "WARNING: failed to remove queue for " "persistent topic: " << cpp_strerror(ret) << dendl; @@ -1130,7 +1130,7 @@ int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const std::string& na if (!dest.push_endpoint.empty() && dest.persistent && !dest.persistent_queue.empty()) { - ret = rgw::notify::remove_persistent_topic(name, y); + ret = rgw::notify::remove_persistent_topic(dest.persistent_queue, y); if (ret < 0 && ret != -ENOENT) { ldpp_dout(dpp, 1) << "WARNING: failed to remove queue for " "persistent topic: " << cpp_strerror(ret) << dendl; diff --git a/src/rgw/rgw_rest_pubsub.cc b/src/rgw/rgw_rest_pubsub.cc index ff5c824a6be6d..ca54d0c324f85 100644 --- a/src/rgw/rgw_rest_pubsub.cc +++ b/src/rgw/rgw_rest_pubsub.cc @@ -111,6 +111,11 @@ bool topics_has_endpoint_secret(const rgw_pubsub_topics& topics) { return false; } +static bool topic_needs_queue(const rgw_pubsub_dest& dest) +{ + return !dest.push_endpoint.empty() && dest.persistent; +} + auto get_policy_from_text(req_state* const s, const std::string& policy_text) -> boost::optional { @@ -306,12 +311,6 @@ class RGWPSCreateTopicOp : public RGWOp { ret = ps.get_topic(this, topic_name, result, y, nullptr); if (ret == -ENOENT) { // topic not present - - // initialize the persistent queue's location. this cannot change for - // existing topics. use ':' as the namespace delimiter because its - // inclusion in a TopicName would break ARNs - dest.persistent_queue = string_cat_reserve( - get_account_or_tenant(s->owner.id), ":", topic_name); } else if (ret < 0) { ldpp_dout(this, 1) << "failed to read topic '" << topic_name << "', with error:" << ret << dendl; @@ -387,8 +386,16 @@ void RGWPSCreateTopicOp::execute(optional_yield y) { return; } } - if (!dest.push_endpoint.empty() && dest.persistent) { - op_ret = rgw::notify::add_persistent_topic(topic_name, s->yield); + + // don't add a persistent queue if we already have one + const bool already_persistent = topic && topic_needs_queue(topic->dest); + if (!already_persistent && topic_needs_queue(dest)) { + // initialize the persistent queue's location, using ':' as the namespace + // delimiter because its inclusion in a TopicName would break ARNs + dest.persistent_queue = string_cat_reserve( + get_account_or_tenant(s->owner.id), ":", topic_name); + + op_ret = rgw::notify::add_persistent_topic(dest.persistent_queue, s->yield); if (op_ret < 0) { ldpp_dout(this, 1) << "CreateTopic Action failed to create queue for " "persistent topics. error:" @@ -838,8 +845,15 @@ void RGWPSSetTopicAttributesOp::execute(optional_yield y) { return; } } - if (!dest.push_endpoint.empty() && dest.persistent) { - op_ret = rgw::notify::add_persistent_topic(topic_name, s->yield); + // don't add a persistent queue if we already have one + const bool already_persistent = topic_needs_queue(result.dest); + if (!already_persistent && topic_needs_queue(dest)) { + // initialize the persistent queue's location, using ':' as the namespace + // delimiter because its inclusion in a TopicName would break ARNs + dest.persistent_queue = string_cat_reserve( + get_account_or_tenant(s->owner.id), ":", topic_name); + + op_ret = rgw::notify::add_persistent_topic(dest.persistent_queue, s->yield); if (op_ret < 0) { ldpp_dout(this, 4) << "SetTopicAttributes Action failed to create queue for " @@ -847,8 +861,9 @@ void RGWPSSetTopicAttributesOp::execute(optional_yield y) { << op_ret << dendl; return; } - } else { // changing the persistent topic to non-persistent. - op_ret = rgw::notify::remove_persistent_topic(topic_name, s->yield); + } else if (already_persistent) { + // changing the persistent topic to non-persistent. + op_ret = rgw::notify::remove_persistent_topic(result.dest.persistent_queue, s->yield); if (op_ret != -ENOENT && op_ret < 0) { ldpp_dout(this, 4) << "SetTopicAttributes Action failed to remove queue " "for persistent topics. error:" -- 2.39.5