// Action=CreateTopic&Name=<topic-name>[&OpaqueData=data][&push-endpoint=<endpoint>[&persistent][&<arg1>=<value1>]]
class RGWPSCreateTopicOp : public RGWOp {
private:
- std::optional<RGWPubSub> ps;
std::string topic_name;
rgw_pubsub_dest dest;
std::string topic_arn;
return;
}
- ps.emplace(static_cast<rgw::sal::RadosStore*>(driver), s->owner.get_id().tenant);
- op_ret = ps->create_topic(this, topic_name, dest, topic_arn, opaque_data, y);
+ RGWPubSub ps(static_cast<rgw::sal::RadosStore*>(driver), s->owner.get_id().tenant);
+ op_ret = ps.create_topic(this, topic_name, dest, topic_arn, opaque_data, y);
if (op_ret < 0) {
ldpp_dout(this, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl;
return;
// Action=ListTopics
class RGWPSListTopicsOp : public RGWOp {
private:
- std::optional<RGWPubSub> ps;
rgw_pubsub_topics result;
public:
};
void RGWPSListTopicsOp::execute(optional_yield y) {
- ps.emplace(static_cast<rgw::sal::RadosStore*>(driver), s->owner.get_id().tenant);
- op_ret = ps->get_topics(&result);
+ RGWPubSub ps(static_cast<rgw::sal::RadosStore*>(driver), s->owner.get_id().tenant);
+ op_ret = ps.get_topics(&result);
// if there are no topics it is not considered an error
op_ret = op_ret == -ENOENT ? 0 : op_ret;
if (op_ret < 0) {
class RGWPSGetTopicOp : public RGWOp {
private:
std::string topic_name;
- std::optional<RGWPubSub> ps;
rgw_pubsub_topic result;
int get_params() {
if (op_ret < 0) {
return;
}
- ps.emplace(static_cast<rgw::sal::RadosStore*>(driver), s->owner.get_id().tenant);
- op_ret = ps->get_topic(topic_name, &result);
+ RGWPubSub ps(static_cast<rgw::sal::RadosStore*>(driver), s->owner.get_id().tenant);
+ op_ret = ps.get_topic(topic_name, &result);
if (op_ret < 0) {
ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
return;
class RGWPSGetTopicAttributesOp : public RGWOp {
private:
std::string topic_name;
- std::optional<RGWPubSub> ps;
rgw_pubsub_topic result;
int get_params() {
if (op_ret < 0) {
return;
}
- ps.emplace(static_cast<rgw::sal::RadosStore*>(driver), s->owner.get_id().tenant);
- op_ret = ps->get_topic(topic_name, &result);
+ RGWPubSub ps(static_cast<rgw::sal::RadosStore*>(driver), s->owner.get_id().tenant);
+ op_ret = ps.get_topic(topic_name, &result);
if (op_ret < 0) {
ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
return;
class RGWPSDeleteTopicOp : public RGWOp {
private:
std::string topic_name;
- std::optional<RGWPubSub> ps;
int get_params() {
const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
if (op_ret < 0) {
return;
}
- ps.emplace(static_cast<rgw::sal::RadosStore*>(driver), s->owner.get_id().tenant);
- op_ret = ps->remove_topic(this, topic_name, y);
+ RGWPubSub ps(static_cast<rgw::sal::RadosStore*>(driver), s->owner.get_id().tenant);
+ op_ret = ps.remove_topic(this, topic_name, y);
if (op_ret < 0) {
ldpp_dout(this, 1) << "failed to remove topic '" << topic_name << ", ret=" << op_ret << dendl;
return;
}
}
-int remove_notification_by_topic(const DoutPrefixProvider *dpp, const std::string& topic_name, const RGWPubSub::BucketRef& b, optional_yield y, RGWPubSub& ps) {
- int op_ret = b->remove_notification(dpp, topic_name, y);
+int remove_notification_by_topic(const DoutPrefixProvider *dpp, const std::string& topic_name, RGWPubSub::Bucket& b, optional_yield y, RGWPubSub& ps) {
+ int op_ret = b.remove_notification(dpp, topic_name, y);
if (op_ret < 0) {
ldpp_dout(dpp, 1) << "failed to remove notification of topic '" << topic_name << "', ret=" << op_ret << dendl;
}
return op_ret;
}
-int delete_all_notifications(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& bucket_topics, const RGWPubSub::BucketRef& b, optional_yield y, RGWPubSub& ps) {
+int delete_all_notifications(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& bucket_topics, RGWPubSub::Bucket& b, optional_yield y, RGWPubSub& ps) {
// delete all notifications of on a bucket
for (const auto& topic : bucket_topics.topics) {
const auto op_ret = remove_notification_by_topic(dpp, topic.first, b, y, ps);
// actual configuration is XML encoded in the body of the message
class RGWPSCreateNotifOp : public RGWDefaultResponseOp {
private:
- std::optional<RGWPubSub> ps;
std::string bucket_name;
RGWBucketInfo bucket_info;
rgw_pubsub_s3_notifications configurations;
return;
}
- ps.emplace(static_cast<rgw::sal::RadosStore*>(driver), s->owner.get_id().tenant);
- auto b = ps->get_bucket(bucket_info.bucket);
- ceph_assert(b);
+ RGWPubSub ps(static_cast<rgw::sal::RadosStore*>(driver), s->owner.get_id().tenant);
+ RGWPubSub::Bucket b(&ps, bucket_info.bucket);
if(configurations.list.empty()) {
// get all topics on a bucket
rgw_pubsub_bucket_topics bucket_topics;
- op_ret = b->get_topics(&bucket_topics);
+ op_ret = b.get_topics(&bucket_topics);
if (op_ret < 0) {
ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl;
return;
}
- op_ret = delete_all_notifications(this, bucket_topics, b, y, *ps);
+ op_ret = delete_all_notifications(this, bucket_topics, b, y, ps);
return;
}
// get topic information. destination information is stored in the topic
rgw_pubsub_topic topic_info;
- op_ret = ps->get_topic(topic_name, &topic_info);
+ op_ret = ps.get_topic(topic_name, &topic_info);
if (op_ret < 0) {
ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
return;
// generate the internal topic. destination is stored here for the "push-only" case
// when no subscription exists
// ARN is cached to make the "GET" method faster
- op_ret = ps->create_topic(this, unique_topic_name, topic_info.dest, topic_info.arn, topic_info.opaque_data, y);
+ op_ret = ps.create_topic(this, unique_topic_name, topic_info.dest, topic_info.arn, topic_info.opaque_data, y);
if (op_ret < 0) {
ldpp_dout(this, 1) << "failed to auto-generate unique topic '" << unique_topic_name <<
"', ret=" << op_ret << dendl;
ldpp_dout(this, 20) << "successfully auto-generated unique topic '" << unique_topic_name << "'" << dendl;
// generate the notification
rgw::notify::EventTypeList events;
- op_ret = b->create_notification(this, unique_topic_name, c.events, std::make_optional(c.filter), notif_name, y);
+ op_ret = b.create_notification(this, unique_topic_name, c.events, std::make_optional(c.filter), notif_name, y);
if (op_ret < 0) {
ldpp_dout(this, 1) << "failed to auto-generate notification for unique topic '" << unique_topic_name <<
"', ret=" << op_ret << dendl;
// rollback generated topic (ignore return value)
- ps->remove_topic(this, unique_topic_name, y);
+ ps.remove_topic(this, unique_topic_name, y);
return;
}
ldpp_dout(this, 20) << "successfully auto-generated notification for unique topic '" << unique_topic_name << "'" << dendl;
// command (extension to S3): DELETE /bucket?notification[=<notification-id>]
class RGWPSDeleteNotifOp : public RGWDefaultResponseOp {
private:
- std::optional<RGWPubSub> ps;
std::string bucket_name;
RGWBucketInfo bucket_info;
std::string notif_name;
return;
}
- ps.emplace(static_cast<rgw::sal::RadosStore*>(driver), s->owner.get_id().tenant);
- auto b = ps->get_bucket(bucket_info.bucket);
- ceph_assert(b);
+ RGWPubSub ps(static_cast<rgw::sal::RadosStore*>(driver), s->owner.get_id().tenant);
+ RGWPubSub::Bucket b(&ps, bucket_info.bucket);
// get all topics on a bucket
rgw_pubsub_bucket_topics bucket_topics;
- op_ret = b->get_topics(&bucket_topics);
+ op_ret = b.get_topics(&bucket_topics);
if (op_ret < 0) {
ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl;
return;
const auto unique_topic = find_unique_topic(bucket_topics, notif_name);
if (unique_topic) {
const auto unique_topic_name = unique_topic->get().topic.name;
- op_ret = remove_notification_by_topic(this, unique_topic_name, b, y, *ps);
+ op_ret = remove_notification_by_topic(this, unique_topic_name, b, y, ps);
return;
}
// notification to be removed is not found - considered success
return;
}
- op_ret = delete_all_notifications(this, bucket_topics, b, y, *ps);
+ op_ret = delete_all_notifications(this, bucket_topics, b, y, ps);
}
int RGWPSDeleteNotifOp::verify_permission(optional_yield y) {
private:
std::string bucket_name;
RGWBucketInfo bucket_info;
- std::optional<RGWPubSub> ps;
std::string notif_name;
rgw_pubsub_s3_notifications notifications;
};
void RGWPSListNotifsOp::execute(optional_yield y) {
- ps.emplace(static_cast<rgw::sal::RadosStore*>(driver), s->owner.get_id().tenant);
- auto b = ps->get_bucket(bucket_info.bucket);
- ceph_assert(b);
+ RGWPubSub ps(static_cast<rgw::sal::RadosStore*>(driver), s->owner.get_id().tenant);
+ RGWPubSub::Bucket b(&ps, bucket_info.bucket);
// get all topics on a bucket
rgw_pubsub_bucket_topics bucket_topics;
- op_ret = b->get_topics(&bucket_topics);
+ op_ret = b.get_topics(&bucket_topics);
if (op_ret < 0) {
ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl;
return;