return 0;
}
-int get_persistent_queue_stats_by_topic_name(const DoutPrefixProvider *dpp, librados::IoCtx &rados_ioctx,
- const std::string &topic_name, rgw_topic_stats &stats, optional_yield y)
+int get_persistent_queue_stats(const DoutPrefixProvider *dpp, librados::IoCtx &rados_ioctx,
+ const std::string &queue_name, rgw_topic_stats &stats, optional_yield y)
{
+ // TODO: use optional_yield instead calling rados_ioctx.operate() synchronously
cls_2pc_reservations reservations;
- auto ret = cls_2pc_queue_list_reservations(rados_ioctx, topic_name, reservations);
+ auto ret = cls_2pc_queue_list_reservations(rados_ioctx, queue_name, reservations);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to read queue list reservation: " << ret << dendl;
return ret;
}
stats.queue_reservations = reservations.size();
- ret = cls_2pc_queue_get_topic_stats(rados_ioctx, topic_name, stats.queue_entries, stats.queue_size);
+ ret = cls_2pc_queue_get_topic_stats(rados_ioctx, queue_name, stats.queue_entries, stats.queue_size);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to get the queue size or the number of entries: " << ret << dendl;
return ret;
// cancel the reservation
int publish_abort(reservation_t& reservation);
-int get_persistent_queue_stats_by_topic_name(const DoutPrefixProvider *dpp, librados::IoCtx &rados_ioctx,
- const std::string &topic_name, rgw_topic_stats &stats, optional_yield y);
+int get_persistent_queue_stats(const DoutPrefixProvider *dpp, librados::IoCtx &rados_ioctx,
+ const std::string &queue_name, rgw_topic_stats &stats, optional_yield y);
}
if (r < 0) {
return r;
}
- if (!info.dest.push_endpoint.empty() && info.dest.persistent) {
- r = rgw::notify::add_persistent_topic(info.name, y);
+ if (!info.dest.push_endpoint.empty() && info.dest.persistent &&
+ !info.dest.persistent_queue.empty()) {
+ r = rgw::notify::add_persistent_topic(info.dest.persistent_queue, y);
if (r < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to create queue for persistent topic "
- << info.name << " with: " << cpp_strerror(r) << dendl;
+ << info.dest.persistent_queue << " with: " << cpp_strerror(r) << dendl;
return r;
}
}
std::string tenant;
parse_topic_metadata_key(entry, tenant, name);
- int r = topic::remove(dpp, y, sysobj, &mdlog, rados, zone,
- tenant, name, objv_tracker);
+ rgw_pubsub_topic info;
+ int r = read(dpp, y, sysobj, cache_svc, zone, entry,
+ info, cache, nullptr, &objv_tracker);
if (r < 0) {
return r;
}
- // delete persistent topic queue. expect ENOENT for non-persistent topics
- r = rgw::notify::remove_persistent_topic(name, y);
- if (r < 0 && r != -ENOENT) {
- ldpp_dout(dpp, 1) << "Failed to delete queue for persistent topic: "
- << name << " with error: " << r << dendl;
- } // not fatal
+ r = topic::remove(dpp, y, sysobj, &mdlog, rados, zone,
+ tenant, name, objv_tracker);
+ if (r < 0) {
+ return r;
+ }
+
+ const rgw_pubsub_dest& dest = info.dest;
+ if (!dest.push_endpoint.empty() && dest.persistent &&
+ !dest.persistent_queue.empty()) {
+ // delete persistent topic queue
+ r = rgw::notify::remove_persistent_topic(dest.persistent_queue, y);
+ if (r < 0 && r != -ENOENT) {
+ ldpp_dout(dpp, 1) << "Failed to delete queue for persistent topic: "
+ << name << " with error: " << r << dendl;
+ } // not fatal
+ }
return 0;
}
cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
return EINVAL;
}
+ const std::string& account = !account_id.empty() ? account_id : tenant;
+ RGWPubSub ps(driver, account, *site);
+
+ rgw_pubsub_topic topic;
+ ret = ps.get_topic(dpp(), topic_name, topic, null_yield, nullptr);
+ if (ret < 0) {
+ cerr << "ERROR: could not get topic: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+
+ if (topic.dest.persistent_queue.empty()) {
+ cerr << "This topic does not have a persistent queue." << std::endl;
+ return ENOENT;
+ }
rgw::notify::rgw_topic_stats stats;
- ret = rgw::notify::get_persistent_queue_stats_by_topic_name(
- dpp(), static_cast<rgw::sal::RadosStore *>(driver)->getRados()->get_notif_pool_ctx(), topic_name,
- stats, null_yield);
+ ret = rgw::notify::get_persistent_queue_stats(
+ dpp(), static_cast<rgw::sal::RadosStore *>(driver)->getRados()->get_notif_pool_ctx(),
+ topic.dest.persistent_queue, stats, null_yield);
if (ret < 0) {
cerr << "ERROR: could not get persistent queue: " << cpp_strerror(-ret) << std::endl;
return -ret;
const rgw_pubsub_dest& dest = topic.dest;
if (!dest.push_endpoint.empty() && dest.persistent &&
!dest.persistent_queue.empty()) {
- ret = rgw::notify::remove_persistent_topic(topic.name, y);
+ ret = rgw::notify::remove_persistent_topic(dest.persistent_queue, y);
if (ret < 0 && ret != -ENOENT) {
ldpp_dout(dpp, 1) << "WARNING: failed to remove queue for "
"persistent topic: " << cpp_strerror(ret) << dendl;
if (!dest.push_endpoint.empty() && dest.persistent &&
!dest.persistent_queue.empty()) {
- ret = rgw::notify::remove_persistent_topic(name, y);
+ ret = rgw::notify::remove_persistent_topic(dest.persistent_queue, y);
if (ret < 0 && ret != -ENOENT) {
ldpp_dout(dpp, 1) << "WARNING: failed to remove queue for "
"persistent topic: " << cpp_strerror(ret) << dendl;
return false;
}
+static bool topic_needs_queue(const rgw_pubsub_dest& dest)
+{
+ return !dest.push_endpoint.empty() && dest.persistent;
+}
+
auto get_policy_from_text(req_state* const s, const std::string& policy_text)
-> boost::optional<rgw::IAM::Policy>
{
ret = ps.get_topic(this, topic_name, result, y, nullptr);
if (ret == -ENOENT) {
// topic not present
-
- // initialize the persistent queue's location. this cannot change for
- // existing topics. use ':' as the namespace delimiter because its
- // inclusion in a TopicName would break ARNs
- dest.persistent_queue = string_cat_reserve(
- get_account_or_tenant(s->owner.id), ":", topic_name);
} else if (ret < 0) {
ldpp_dout(this, 1) << "failed to read topic '" << topic_name
<< "', with error:" << ret << dendl;
return;
}
}
- if (!dest.push_endpoint.empty() && dest.persistent) {
- op_ret = rgw::notify::add_persistent_topic(topic_name, s->yield);
+
+ // don't add a persistent queue if we already have one
+ const bool already_persistent = topic && topic_needs_queue(topic->dest);
+ if (!already_persistent && topic_needs_queue(dest)) {
+ // initialize the persistent queue's location, using ':' as the namespace
+ // delimiter because its inclusion in a TopicName would break ARNs
+ dest.persistent_queue = string_cat_reserve(
+ get_account_or_tenant(s->owner.id), ":", topic_name);
+
+ op_ret = rgw::notify::add_persistent_topic(dest.persistent_queue, s->yield);
if (op_ret < 0) {
ldpp_dout(this, 1) << "CreateTopic Action failed to create queue for "
"persistent topics. error:"
return;
}
}
- if (!dest.push_endpoint.empty() && dest.persistent) {
- op_ret = rgw::notify::add_persistent_topic(topic_name, s->yield);
+ // don't add a persistent queue if we already have one
+ const bool already_persistent = topic_needs_queue(result.dest);
+ if (!already_persistent && topic_needs_queue(dest)) {
+ // initialize the persistent queue's location, using ':' as the namespace
+ // delimiter because its inclusion in a TopicName would break ARNs
+ dest.persistent_queue = string_cat_reserve(
+ get_account_or_tenant(s->owner.id), ":", topic_name);
+
+ op_ret = rgw::notify::add_persistent_topic(dest.persistent_queue, s->yield);
if (op_ret < 0) {
ldpp_dout(this, 4)
<< "SetTopicAttributes Action failed to create queue for "
<< op_ret << dendl;
return;
}
- } else { // changing the persistent topic to non-persistent.
- op_ret = rgw::notify::remove_persistent_topic(topic_name, s->yield);
+ } else if (already_persistent) {
+ // changing the persistent topic to non-persistent.
+ op_ret = rgw::notify::remove_persistent_topic(result.dest.persistent_queue, s->yield);
if (op_ret != -ENOENT && op_ret < 0) {
ldpp_dout(this, 4) << "SetTopicAttributes Action failed to remove queue "
"for persistent topics. error:"