return rc;
}
}
- for (const auto& bucket_topic : bucket_topics.topics) {
- const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second;
- const rgw_pubsub_topic& topic_cfg = topic_filter.topic;
+ for (auto& bucket_topic : bucket_topics.topics) {
+ rgw_pubsub_topic_filter& topic_filter = bucket_topic.second;
+ rgw_pubsub_topic& topic_cfg = topic_filter.topic;
for (auto& event_type : event_types) {
if (!notification_match(res, topic_filter, event_type, req_tags)) {
// notification does not apply to req_state
<< "') apply to event of type: '" << to_string(event_type) << "'"
<< dendl;
+ // reload the topic in case it changed since the notification was added
+ const RGWPubSub ps(res.store, res.user_tenant, site);
+ int ret = ps.get_topic(res.dpp, topic_cfg.dest.arn_topic,
+ topic_cfg, res.yield, nullptr);
+ if (ret < 0) {
+ ldpp_dout(res.dpp, 1)
+ << "INFO: failed to load topic: " << topic_cfg.dest.arn_topic
+ << ". error: " << ret
+ << " while reserving persistent notification event" << dendl;
+ if (ret == -ENOENT) {
+ // either the topic is deleted but the corresponding notification
+ // still exist or in v2 mode the notification could have synced first
+ // but topic is not synced yet.
+ return 0;
+ }
+ ldpp_dout(res.dpp, 1)
+ << "WARN: Using the stored topic from bucket notification struct."
+ << dendl;
+ }
+
cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
if (topic_cfg.dest.persistent) {
// TODO: take default reservation size from conf
librados::ObjectWriteOperation op;
bufferlist obl;
int rval;
- const auto& queue_name = topic_cfg.dest.arn_topic;
+ const auto& queue_name = topic_cfg.dest.persistent_queue;
cls_2pc_queue_reserve(op, res.size, 1, &obl, &rval);
auto ret = rgw_rados_operate(
res.dpp, res.store->getRados()->get_notif_pool_ctx(), queue_name,
return ret;
}
}
- // load the topic,if there is change in topic config while it's stored in
- // notification.
- rgw_pubsub_topic result;
- const RGWPubSub ps(res.store, res.user_tenant, site);
- auto ret =
- ps.get_topic(res.dpp, topic_cfg.dest.arn_topic, result, res.yield, nullptr);
- if (ret < 0) {
- ldpp_dout(res.dpp, 1)
- << "INFO: failed to load topic: " << topic_cfg.name
- << ". error: " << ret
- << " while reserving persistent notification event" << dendl;
- if (ret == -ENOENT) {
- // either the topic is deleted but the corresponding notification
- // still exist or in v2 mode the notification could have synced first
- // but topic is not synced yet.
- return 0;
- }
- ldpp_dout(res.dpp, 1)
- << "WARN: Using the stored topic from bucket notification struct."
- << dendl;
- res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id,
- event_type);
- } else {
- res.topics.emplace_back(topic_filter.s3_id, result, res_id, event_type);
- }
+
+ res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id, event_type);
}
}
return 0;
event_entry.retry_sleep_duration = topic.cfg.dest.retry_sleep_duration;
bufferlist bl;
encode(event_entry, bl);
- const auto& queue_name = topic.cfg.dest.arn_topic;
+ const auto& queue_name = topic.cfg.dest.persistent_queue;
if (bl.length() > res.size) {
// try to make a larger reservation, fail only if this is not possible
ldpp_dout(dpp, 5) << "WARNING: committed size: " << bl.length()
cls_2pc_queue_abort(op, topic.res_id);
auto ret = rgw_rados_operate(
dpp, res.store->getRados()->get_notif_pool_ctx(),
- topic.cfg.dest.arn_topic, &op,
+ queue_name, &op,
res.yield);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: "
// nothing to abort or already committed/aborted
continue;
}
- const auto& queue_name = topic.cfg.dest.arn_topic;
+ const auto& queue_name = topic.cfg.dest.persistent_queue;
librados::ObjectWriteOperation op;
cls_2pc_queue_abort(op, topic.res_id);
const auto ret = rgw_rados_operate(