return true;
}
+auto validate_topic_arn(const std::string& str, std::string& message)
+ -> boost::optional<rgw::ARN>
+{
+ if (str.empty()) {
+ message = "Missing required element TopicArn";
+ return boost::none;
+ }
+ auto arn = rgw::ARN::parse(str);
+ if (!arn || arn->resource.empty()) {
+ message = "Invalid value for TopicArn";
+ return boost::none;
+ }
+ return arn;
+}
+
bool topic_has_endpoint_secret(const rgw_pubsub_topic& topic) {
return topic.dest.stored_secret;
}
private:
bufferlist bl_post_body;
std::string topic_name;
+ rgw::ARN topic_arn;
+ std::optional<rgw_pubsub_topic> existing;
rgw_pubsub_dest dest;
- std::string topic_arn;
std::string opaque_data;
std::string policy_text;
// dest object only stores endpoint info
dest.arn_topic = topic_name;
// the topic ARN will be sent in the reply
- const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns,
+ topic_arn = rgw::ARN{rgw::Partition::aws, rgw::Service::sns,
driver->get_zone()->get_zonegroup().get_name(),
- s->user->get_tenant(), topic_name);
- topic_arn = arn.to_string();
+ s->user->get_tenant(), topic_name};
return 0;
}
explicit RGWPSCreateTopicOp(bufferlist bl_post_body)
: bl_post_body(std::move(bl_post_body)) {}
- int verify_permission(optional_yield y) override {
- auto ret = get_params();
+ int init_processing(optional_yield y) override {
+ int ret = get_params();
if (ret < 0) {
return ret;
}
+ // try to load existing topic for owner and policy
const RGWPubSub ps(driver, s->auth.identity->get_tenant(), *s->penv.site);
rgw_pubsub_topic result;
ret = ps.get_topic(this, topic_name, result, y, nullptr);
if (ret == -ENOENT) {
// topic not present
- return 0;
+ } else if (ret < 0) {
+ ldpp_dout(this, 1) << "failed to read topic '" << topic_name
+ << "', with error:" << ret << dendl;
+ return ret;
+ } else {
+ existing = std::move(result);
}
- if (ret == 0) {
- ret = verify_topic_owner_or_policy(
- s, result, driver->get_zone()->get_zonegroup().get_name(),
- rgw::IAM::snsCreateTopic);
- if (ret == 0)
- {
- return 0;
- }
+ return RGWOp::init_processing(y);
+ }
- ldpp_dout(this, 4) << "no permission to modify topic '" << topic_name
- << "', topic already exist." << dendl;
- return -EACCES;
+ int verify_permission(optional_yield y) override {
+ if (!existing) {
+ return 0;
}
- ldpp_dout(this, 4) << "failed to read topic '" << topic_name
- << "', with error:" << ret << dendl;
- return ret;
- }
+ return verify_topic_owner_or_policy(
+ s, *existing, driver->get_zone()->get_zonegroup().get_name(),
+ rgw::IAM::snsCreateTopic);
+ }
void pre_exec() override {
rgw_bucket_object_pre_exec(s);
const auto f = s->formatter;
f->open_object_section_in_ns("CreateTopicResponse", RGW_REST_SNS_XMLNS);
f->open_object_section("CreateTopicResult");
- encode_xml("TopicArn", topic_arn, f);
+ encode_xml("TopicArn", topic_arn.to_string(), f);
f->close_section(); // CreateTopicResult
f->open_object_section("ResponseMetadata");
encode_xml("RequestId", s->req_id, f);
}
}
const RGWPubSub ps(driver, s->auth.identity->get_tenant(), *s->penv.site);
- op_ret = ps.create_topic(this, topic_name, dest, topic_arn, opaque_data,
- s->owner.id, policy_text, y);
+ op_ret = ps.create_topic(this, topic_name, dest, topic_arn.to_string(),
+ opaque_data, s->owner.id, policy_text, y);
if (op_ret < 0) {
ldpp_dout(this, 4) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl;
return;
// POST
// Action=GetTopic&TopicArn=<topic-arn>
class RGWPSGetTopicOp : public RGWOp {
- private:
+ private:
+ rgw::ARN topic_arn;
std::string topic_name;
rgw_pubsub_topic result;
int get_params() {
- const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
-
- if (!topic_arn || topic_arn->resource.empty()) {
- ldpp_dout(this, 1) << "GetTopic Action 'TopicArn' argument is missing or invalid" << dendl;
- return -EINVAL;
+ auto arn = validate_topic_arn(s->info.args.get("TopicArn"), s->err.message);
+ if (!arn) {
+ return -EINVAL;
}
-
- topic_name = topic_arn->resource;
+ topic_arn = std::move(*arn);
+ topic_name = topic_arn.resource;
return 0;
}
- public:
+ public:
+ int init_processing(optional_yield y) override {
+ int ret = get_params();
+ if (ret < 0) {
+ return ret;
+ }
+ const RGWPubSub ps(driver, s->auth.identity->get_tenant(), *s->penv.site);
+ ret = ps.get_topic(this, topic_name, result, y, nullptr);
+ if (ret < 0) {
+ ldpp_dout(this, 4) << "failed to get topic '" << topic_name << "', ret=" << ret << dendl;
+ return ret;
+ }
+ if (topic_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
+ s->err.message = "Topic contains secrets that must be transmitted over a secure transport";
+ return -EPERM;
+ }
+ return RGWOp::init_processing(y);
+ }
+
int verify_permission(optional_yield y) override {
- return 0;
+ return verify_topic_owner_or_policy(
+ s, result, driver->get_zone()->get_zonegroup().get_name(),
+ rgw::IAM::snsGetTopicAttributes);
}
void pre_exec() override {
rgw_bucket_object_pre_exec(s);
};
void RGWPSGetTopicOp::execute(optional_yield y) {
- op_ret = get_params();
- if (op_ret < 0) {
- return;
- }
- const RGWPubSub ps(driver, s->auth.identity->get_tenant(), *s->penv.site);
- op_ret = ps.get_topic(this, topic_name, result, y, nullptr);
- if (op_ret < 0) {
- ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
- return;
- }
- if (topic_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
- ldpp_dout(this, 1) << "topic '" << topic_name << "' contain secret and cannot be sent over insecure transport" << dendl;
- op_ret = -EPERM;
- return;
- }
- op_ret = verify_topic_owner_or_policy(
- s, result, driver->get_zone()->get_zonegroup().get_name(),
- rgw::IAM::snsGetTopicAttributes);
- if (op_ret != 0) {
- ldpp_dout(this, 1) << "no permission to get topic '" << topic_name
- << "'" << dendl;
- return;
- }
- ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl;
+ ldpp_dout(this, 4) << "successfully got topic '" << topic_name << "'" << dendl;
}
// command (AWS compliant):
// POST
// Action=GetTopicAttributes&TopicArn=<topic-arn>
class RGWPSGetTopicAttributesOp : public RGWOp {
- private:
+ private:
+ rgw::ARN topic_arn;
std::string topic_name;
rgw_pubsub_topic result;
int get_params() {
- const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
-
- if (!topic_arn || topic_arn->resource.empty()) {
- ldpp_dout(this, 1) << "GetTopicAttribute Action 'TopicArn' argument is missing or invalid" << dendl;
- return -EINVAL;
+ auto arn = validate_topic_arn(s->info.args.get("TopicArn"), s->err.message);
+ if (!arn) {
+ return -EINVAL;
}
+ topic_arn = std::move(*arn);
+ topic_name = topic_arn.resource;
+ return 0;
+ }
- topic_name = topic_arn->resource;
+ public:
+ int init_processing(optional_yield y) override {
+ int ret = get_params();
+ if (ret < 0) {
+ return ret;
+ }
+ const RGWPubSub ps(driver, s->auth.identity->get_tenant(), *s->penv.site);
+ ret = ps.get_topic(this, topic_name, result, y, nullptr);
+ if (ret < 0) {
+ ldpp_dout(this, 4) << "failed to get topic '" << topic_name << "', ret=" << ret << dendl;
+ return ret;
+ }
+ if (topic_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
+ s->err.message = "Topic contains secrets that must be transmitted over a secure transport";
+ return -EPERM;
+ }
return 0;
}
- public:
int verify_permission(optional_yield y) override {
- return 0;
+ return verify_topic_owner_or_policy(
+ s, result, driver->get_zone()->get_zonegroup().get_name(),
+ rgw::IAM::snsGetTopicAttributes);
}
void pre_exec() override {
rgw_bucket_object_pre_exec(s);
};
void RGWPSGetTopicAttributesOp::execute(optional_yield y) {
- op_ret = get_params();
- if (op_ret < 0) {
- return;
- }
- const RGWPubSub ps(driver, s->auth.identity->get_tenant(), *s->penv.site);
- op_ret = ps.get_topic(this, topic_name, result, y, nullptr);
- if (op_ret < 0) {
- ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
- return;
- }
- if (topic_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
- ldpp_dout(this, 1) << "topic '" << topic_name << "' contain secret and cannot be sent over insecure transport" << dendl;
- op_ret = -EPERM;
- return;
- }
- op_ret = verify_topic_owner_or_policy(
- s, result, driver->get_zone()->get_zonegroup().get_name(),
- rgw::IAM::snsGetTopicAttributes);
- if (op_ret != 0) {
- ldpp_dout(this, 1) << "no permission to get topic '" << topic_name
- << "'" << dendl;
- return;
- }
- ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl;
+ ldpp_dout(this, 4) << "successfully got topic '" << topic_name << "'" << dendl;
}
// command (AWS compliant):
class RGWPSSetTopicAttributesOp : public RGWOp {
private:
bufferlist bl_post_body;
+ rgw::ARN topic_arn;
std::string topic_name;
- std::string topic_arn;
+ rgw_pubsub_topic result;
std::string opaque_data;
std::string policy_text;
rgw_pubsub_dest dest;
std::string attribute_name;
int get_params() {
- const auto arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
-
- if (!arn || arn->resource.empty()) {
- ldpp_dout(this, 1) << "SetTopicAttribute Action 'TopicArn' argument is "
- "missing or invalid"
- << dendl;
+ auto arn = validate_topic_arn(s->info.args.get("TopicArn"), s->err.message);
+ if (!arn) {
return -EINVAL;
}
- topic_arn = arn->to_string();
- topic_name = arn->resource;
+ topic_arn = std::move(*arn);
+ topic_name = topic_arn.resource;
+
attribute_name = s->info.args.get("AttributeName");
if (attribute_name.empty()) {
- ldpp_dout(this, 1)
- << "SetTopicAttribute Action 'AttributeName' argument is "
- "missing or invalid"
- << dendl;
+ s->err.message = "Missing required element AttributeName";
return -EINVAL;
}
return 0;
explicit RGWPSSetTopicAttributesOp(bufferlist bl_post_body)
: bl_post_body(std::move(bl_post_body)) {}
- int verify_permission(optional_yield y) override {
- auto ret = get_params();
+ int init_processing(optional_yield y) override {
+ int ret = get_params();
if (ret < 0) {
return ret;
}
- rgw_pubsub_topic result;
+
const RGWPubSub ps(driver, s->auth.identity->get_tenant(), *s->penv.site);
ret = ps.get_topic(this, topic_name, result, y, nullptr);
if (ret < 0) {
return ret;
}
topic_owner = result.owner;
- ret = verify_topic_owner_or_policy(
- s, result, driver->get_zone()->get_zonegroup().get_name(),
- rgw::IAM::snsSetTopicAttributes);
- if (ret != 0) {
- ldpp_dout(this, 1) << "no permission to set attributes for topic '" << topic_name
- << "'" << dendl;
+
+ ret = map_attributes(result);
+ if (ret < 0) {
return ret;
}
- return map_attributes(result);
+ return RGWOp::init_processing(y);
+ }
+
+ int verify_permission(optional_yield y) override {
+ return verify_topic_owner_or_policy(
+ s, result, driver->get_zone()->get_zonegroup().get_name(),
+ rgw::IAM::snsSetTopicAttributes);
}
void pre_exec() override { rgw_bucket_object_pre_exec(s); }
op_ret = rgw_forward_request_to_master(
this, *s->penv.site, s->user->get_id(), &bl_post_body, nullptr, s->info, y);
if (op_ret < 0) {
- ldpp_dout(this, 1)
+ ldpp_dout(this, 4)
<< "SetTopicAttributes forward_request_to_master returned ret = "
<< op_ret << dendl;
return;
if (!dest.push_endpoint.empty() && dest.persistent) {
op_ret = rgw::notify::add_persistent_topic(topic_name, s->yield);
if (op_ret < 0) {
- ldpp_dout(this, 1)
+ ldpp_dout(this, 4)
<< "SetTopicAttributes Action failed to create queue for "
"persistent topics. error:"
<< op_ret << dendl;
} else { // changing the persistent topic to non-persistent.
op_ret = rgw::notify::remove_persistent_topic(topic_name, s->yield);
if (op_ret != -ENOENT && op_ret < 0) {
- ldpp_dout(this, 1) << "SetTopicAttributes Action failed to remove queue "
+ ldpp_dout(this, 4) << "SetTopicAttributes Action failed to remove queue "
"for persistent topics. error:"
<< op_ret << dendl;
return;
}
}
const RGWPubSub ps(driver, s->auth.identity->get_tenant(), *s->penv.site);
- op_ret = ps.create_topic(this, topic_name, dest, topic_arn, opaque_data,
- topic_owner, policy_text, y);
+ op_ret = ps.create_topic(this, topic_name, dest, topic_arn.to_string(),
+ opaque_data, topic_owner, policy_text, y);
if (op_ret < 0) {
- ldpp_dout(this, 1) << "failed to SetAttributes for topic '" << topic_name
+ ldpp_dout(this, 4) << "failed to SetAttributes for topic '" << topic_name
<< "', ret=" << op_ret << dendl;
return;
}
class RGWPSDeleteTopicOp : public RGWOp {
private:
bufferlist bl_post_body;
+ rgw::ARN topic_arn;
std::string topic_name;
+ std::optional<rgw_pubsub_topic> topic;
int get_params() {
- const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
-
- if (!topic_arn || topic_arn->resource.empty()) {
- ldpp_dout(this, 1) << "DeleteTopic Action 'TopicArn' argument is missing or invalid" << dendl;
+ auto arn = validate_topic_arn(s->info.args.get("TopicArn"), s->err.message);
+ if (!arn) {
return -EINVAL;
}
-
- topic_name = topic_arn->resource;
+ topic_arn = std::move(*arn);
+ topic_name = topic_arn.resource;
return 0;
}
explicit RGWPSDeleteTopicOp(bufferlist bl_post_body)
: bl_post_body(std::move(bl_post_body)) {}
- int verify_permission(optional_yield) override {
- return 0;
+ int init_processing(optional_yield y) override {
+ int ret = get_params();
+ if (ret < 0) {
+ return ret;
+ }
+
+ const RGWPubSub ps(driver, s->auth.identity->get_tenant(), *s->penv.site);
+ rgw_pubsub_topic result;
+ ret = ps.get_topic(this, topic_name, result, y, nullptr);
+ if (ret == -ENOENT) {
+ // leave topic empty
+ } else if (ret < 0) {
+ ldpp_dout(this, 4) << "failed to get topic '" << topic_name
+ << "', ret=" << ret << dendl;
+ return ret;
+ } else {
+ topic = std::move(result);
+ }
+
+ return RGWOp::init_processing(y);
}
+
+ int verify_permission(optional_yield y) override {
+ if (!topic) {
+ return 0;
+ }
+ return verify_topic_owner_or_policy(
+ s, *topic, driver->get_zone()->get_zonegroup().get_name(),
+ rgw::IAM::snsDeleteTopic);
+ }
+
void pre_exec() override {
rgw_bucket_object_pre_exec(s);
}
};
void RGWPSDeleteTopicOp::execute(optional_yield y) {
- op_ret = get_params();
- if (op_ret < 0) {
- return;
- }
if (!driver->is_meta_master()) {
op_ret = rgw_forward_request_to_master(
this, *s->penv.site, s->user->get_id(), &bl_post_body, nullptr, s->info, y);
return;
}
}
- const RGWPubSub ps(driver, s->auth.identity->get_tenant(), *s->penv.site);
- rgw_pubsub_topic result;
- op_ret = ps.get_topic(this, topic_name, result, y, nullptr);
- if (op_ret == 0) {
- op_ret = verify_topic_owner_or_policy(
- s, result, driver->get_zone()->get_zonegroup().get_name(),
- rgw::IAM::snsDeleteTopic);
- if (op_ret < 0) {
- ldpp_dout(this, 1) << "no permission to remove topic '" << topic_name
- << "'" << dendl;
- return;
- }
- op_ret = ps.remove_topic(this, topic_name, y);
- if (op_ret < 0 && op_ret != -ENOENT) {
- ldpp_dout(this, 1) << "failed to remove topic '" << topic_name << ", ret=" << op_ret << dendl;
- return;
- }
- ldpp_dout(this, 1) << "successfully removed topic '" << topic_name << "'" << dendl;
- } else if (op_ret != -ENOENT) {
- ldpp_dout(this, 1) << "failed to fetch topic '" << topic_name
- << "' with error: " << op_ret << dendl;
+ if (!topic) {
return;
}
+
+ const RGWPubSub ps(driver, s->auth.identity->get_tenant(), *s->penv.site);
+ op_ret = ps.remove_topic(this, topic_name, y);
+ if (op_ret < 0 && op_ret != -ENOENT) {
+ ldpp_dout(this, 4) << "failed to remove topic '" << topic_name << ", ret=" << op_ret << dendl;
+ return;
+ }
+ ldpp_dout(this, 4) << "successfully removed topic '" << topic_name << "'" << dendl;
+
if (op_ret == -ENOENT) {
// its not an error if no topics exist, just a no-op
op_ret = 0;
// actual configuration is XML encoded in the body of the message
class RGWPSCreateNotifOp : public RGWDefaultResponseOp {
bufferlist data;
+ rgw_pubsub_s3_notifications configurations;
+ std::map<rgw::ARN, rgw_pubsub_topic> topics;
+
int verify_params() override {
bool exists;
const auto no_value = s->info.args.get("notification", &exists);
return 0;
}
public:
+ int init_processing(optional_yield y) override;
int verify_permission(optional_yield y) override;
void pre_exec() override {
RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; }
uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
-
void execute(optional_yield) override;
void execute_v2(optional_yield);
};
-void RGWPSCreateNotifOp::execute(optional_yield y) {
- if (rgw::all_zonegroups_support(*s->penv.site, rgw::zone_features::notification_v2)) {
- return execute_v2(y);
+int RGWPSCreateNotifOp::init_processing(optional_yield y)
+{
+ int ret = verify_params();
+ if (ret < 0) {
+ return ret;
}
- op_ret = verify_params();
- if (op_ret < 0) {
- return;
+
+ ret = get_params_from_body(configurations);
+ if (ret < 0) {
+ return ret;
}
- rgw_pubsub_s3_notifications configurations;
- op_ret = get_params_from_body(configurations);
- if (op_ret < 0) {
- return;
+ const RGWPubSub ps(driver, s->auth.identity->get_tenant(), *s->penv.site);
+
+ for (const auto& c : configurations.list) {
+ const auto& notif_name = c.id;
+ if (notif_name.empty()) {
+ s->err.message = "Missing required element Id";
+ return -EINVAL;
+ }
+ if (c.topic_arn.empty()) {
+ s->err.message = "Missing required element Topic";
+ return -EINVAL;
+ }
+
+ const auto arn = rgw::ARN::parse(c.topic_arn);
+ if (!arn || arn->resource.empty()) {
+ s->err.message = "Invalid Topic ARN";
+ return -EINVAL;
+ }
+ const auto& topic_name = arn->resource;
+
+ if (std::find(c.events.begin(), c.events.end(), rgw::notify::UnknownEvent) != c.events.end()) {
+ s->err.message = "Unknown Event type: " + notif_name;
+ return -EINVAL;
+ }
+
+ // load topic metadata if we haven't already
+ auto insert = topics.emplace(std::piecewise_construct,
+ std::forward_as_tuple(*arn),
+ std::forward_as_tuple());
+ if (insert.second) {
+ rgw_pubsub_topic& topic_info = insert.first->second;
+ ret = ps.get_topic(this, topic_name, topic_info, y, nullptr);
+ if (ret < 0) {
+ ldpp_dout(this, 4) << "failed to get topic '" << topic_name << "', ret=" << ret << dendl;
+ return ret;
+ }
+ }
}
+
+ return RGWOp::init_processing(y);
+}
+
+void RGWPSCreateNotifOp::execute(optional_yield y) {
if (!driver->is_meta_master()) {
op_ret = rgw_forward_request_to_master(
this, *s->penv.site, s->user->get_id(), &data, nullptr, s->info, y);
}
}
- std::unique_ptr<rgw::sal::Bucket> bucket;
- op_ret = driver->load_bucket(this, rgw_bucket(s->bucket_tenant, s->bucket_name),
- &bucket, y);
- if (op_ret < 0) {
- ldpp_dout(this, 4) << "failed to get bucket '" <<
- (s->bucket_tenant.empty() ? s->bucket_name : s->bucket_tenant + ":" + s->bucket_name) <<
- "' info, ret = " << op_ret << dendl;
- return;
+ if (rgw::all_zonegroups_support(*s->penv.site, rgw::zone_features::notification_v2)) {
+ return execute_v2(y);
}
const RGWPubSub ps(driver, s->auth.identity->get_tenant(), *s->penv.site);
- const RGWPubSub::Bucket b(ps, bucket.get());
+ const RGWPubSub::Bucket b(ps, s->bucket.get());
if(configurations.list.empty()) {
// get all topics on a bucket
for (const auto& c : configurations.list) {
const auto& notif_name = c.id;
- if (notif_name.empty()) {
- s->err.message = "Missing required element Id";
- op_ret = -EINVAL;
- return;
- }
- if (c.topic_arn.empty()) {
- s->err.message = "Missing required element Topic";
- op_ret = -EINVAL;
- return;
- }
const auto arn = rgw::ARN::parse(c.topic_arn);
- if (!arn || arn->resource.empty()) {
- s->err.message = "Invalid Topic ARN";
- op_ret = -EINVAL;
- return;
+ if (!arn) { // already validated above
+ continue;
}
+ const auto& topic_name = arn->resource;
- if (std::find(c.events.begin(), c.events.end(), rgw::notify::UnknownEvent) != c.events.end()) {
- s->err.message = "Unknown Event type: " + notif_name;
- op_ret = -EINVAL;
- return;
+ auto t = topics.find(*arn);
+ if (t == topics.end()) {
+ continue;
}
+ auto& topic_info = t->second;
- const auto topic_name = arn->resource;
-
- // get topic information. destination information is stored in the topic
- rgw_pubsub_topic topic_info;
- op_ret = ps.get_topic(this, topic_name, topic_info, y, nullptr);
- if (op_ret < 0) {
- ldpp_dout(this, 4) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
- return;
- }
- op_ret = verify_topic_owner_or_policy(
- s, topic_info, driver->get_zone()->get_zonegroup().get_name(),
- rgw::IAM::snsPublish);
- if (op_ret != 0) {
- ldpp_dout(this, 4) << "no permission to create notification for topic '"
- << topic_name << "'" << dendl;
- return;
- }
// make sure that full topic configuration match
// TODO: use ARN match function
}
int RGWPSCreateNotifOp::verify_permission(optional_yield y) {
+ // require s3:PutBucketNotification permission for the bucket
if (!verify_bucket_permission(this, s, rgw::IAM::s3PutBucketNotification)) {
return -EACCES;
}
+ // require sns:Publish permission for each topic
+ for (const auto& [arn, topic] : topics) {
+ int ret = verify_topic_owner_or_policy(
+ s, topic, driver->get_zone()->get_zonegroup().get_name(),
+ rgw::IAM::snsPublish);
+ if (ret < 0) {
+ ldpp_dout(this, 4) << "no permission to create notification for topic '"
+ << arn << "'" << dendl;
+ return ret;
+ }
+ }
return 0;
}
void RGWPSCreateNotifOp::execute_v2(optional_yield y) {
- op_ret = verify_params();
- if (op_ret < 0) {
- return;
- }
-
- rgw_pubsub_s3_notifications configurations;
- op_ret = get_params_from_body(configurations);
- if (op_ret < 0) {
- return;
- }
- if (!driver->is_meta_master()) {
- op_ret = rgw_forward_request_to_master(
- this, *s->penv.site, s->user->get_id(), &data, nullptr, s->info, y);
- if (op_ret < 0) {
- ldpp_dout(this, 1) << "CreateBucketNotification "
- "forward_request_to_master returned ret = "
- << op_ret << dendl;
- return;
- }
- }
-
if (const auto ret = driver->stat_topics_v1(s->bucket_tenant, y, this); ret != -ENOENT) {
ldpp_dout(this, 1) << "WARNING: " << (ret == 0 ? "topic migration in process" : "cannot determine topic migration status. ret = " + std::to_string(ret))
<< ". please try again later" << dendl;
return;
}
const RGWPubSub ps(driver, s->auth.identity->get_tenant(), *s->penv.site);
- std::unordered_map<std::string, rgw_pubsub_topic> topics;
for (const auto& c : configurations.list) {
const auto& notif_name = c.id;
- if (notif_name.empty()) {
- s->err.message = "Missing required element Id";
- op_ret = -EINVAL;
- return;
- }
- if (c.topic_arn.empty()) {
- s->err.message = "Missing required element Topic";
- op_ret = -EINVAL;
- return;
- }
const auto arn = rgw::ARN::parse(c.topic_arn);
- if (!arn || arn->resource.empty()) {
- s->err.message = "Invalid Topic ARN";
- op_ret = -EINVAL;
- return;
- }
-
- if (std::find(c.events.begin(), c.events.end(),
- rgw::notify::UnknownEvent) != c.events.end()) {
- s->err.message = "Unknown Event type: " + notif_name;
- op_ret = -EINVAL;
- return;
+ if (!arn) { // already validated above
+ continue;
}
const auto& topic_name = arn->resource;
- if (!topics.contains(topic_name)) {
- // get topic information. destination information is stored in the topic
- rgw_pubsub_topic topic_info;
- op_ret = ps.get_topic(this, topic_name, topic_info, y,nullptr);
- if (op_ret < 0) {
- ldpp_dout(this, 4) << "failed to get topic '" << topic_name
- << "', ret=" << op_ret << dendl;
- return;
- }
- op_ret = verify_topic_owner_or_policy(
- s, topic_info, driver->get_zone()->get_zonegroup().get_name(),
- rgw::IAM::snsPublish);
- if (op_ret != 0) {
- ldpp_dout(this, 4) << "failed to create notification for topic '"
- << topic_name << "' topic owned by other user"
- << dendl;
- return;
- }
- topics[topic_name] = std::move(topic_info);
+
+ auto t = topics.find(*arn);
+ if (t == topics.end()) {
+ continue;
}
+ auto& topic_info = t->second;
+
auto& topic_filter =
bucket_topics.topics[topic_to_unique(topic_name, notif_name)];
- topic_filter.topic = topics[topic_name];
+ topic_filter.topic = topic_info;
topic_filter.events = c.events;
topic_filter.s3_id = notif_name;
topic_filter.s3_filter = c.filter;
// command (extension to S3): DELETE /bucket?notification[=<notification-id>]
class RGWPSDeleteNotifOp : public RGWDefaultResponseOp {
- int get_params(std::string& notif_name) const {
+ std::string notif_name;
+ int get_params() {
bool exists;
notif_name = s->info.args.get("notification", &exists);
if (!exists) {
void execute_v2(optional_yield y);
public:
+ int init_processing(optional_yield y) override;
int verify_permission(optional_yield y) override;
void pre_exec() override {
void execute(optional_yield y) override;
};
-void RGWPSDeleteNotifOp::execute(optional_yield y) {
- if (rgw::all_zonegroups_support(*s->penv.site, rgw::zone_features::notification_v2)) {
- return execute_v2(y);
- }
- std::string notif_name;
- op_ret = get_params(notif_name);
- if (op_ret < 0) {
- return;
+int RGWPSDeleteNotifOp::init_processing(optional_yield y)
+{
+ int ret = get_params();
+ if (ret < 0) {
+ return ret;
}
+ return RGWOp::init_processing(y);
+}
+
+void RGWPSDeleteNotifOp::execute(optional_yield y) {
if (!driver->is_meta_master()) {
bufferlist indata;
op_ret = rgw_forward_request_to_master(
}
}
+ if (rgw::all_zonegroups_support(*s->penv.site, rgw::zone_features::notification_v2)) {
+ return execute_v2(y);
+ }
+
const RGWPubSub ps(driver, s->auth.identity->get_tenant(), *s->penv.site);
const RGWPubSub::Bucket b(ps, s->bucket.get());
}
void RGWPSDeleteNotifOp::execute_v2(optional_yield y) {
- std::string notif_name;
- op_ret = get_params(notif_name);
- if (op_ret < 0) {
- return;
- }
- if (!driver->is_meta_master()) {
- bufferlist indata;
- op_ret = rgw_forward_request_to_master(
- this, *s->penv.site, s->user->get_id(), &indata, nullptr, s->info, y);
- if (op_ret < 0) {
- ldpp_dout(this, 4) << "DeleteBucketNotification "
- "forward_request_to_master returned error ret= "
- << op_ret << dendl;
- return;
- }
- }
-
if (const auto ret = driver->stat_topics_v1(s->bucket_tenant, y, this); ret != -ENOENT) {
ldpp_dout(this, 4) << "WARNING: " << (ret == 0 ? "topic migration in process" : "cannot determine topic migration status. ret = " + std::to_string(ret))
<< ". please try again later" << dendl;