[&Attributes.entry.12.key=time_to_live&Attributes.entry.12.value=<seconds to live>]
[&Attributes.entry.13.key=max_retries&Attributes.entry.13.value=<retries number>]
[&Attributes.entry.14.key=retry_sleep_duration&Attributes.entry.14.value=<sleep seconds>]
+ [&Attributes.entry.14.key=Policy&Attributes.entry.14.value=<policy-JSON-string>]
Request parameters:
default value is taken from `rgw_topic_persistency_sleep_duration`.
providing a value overrides the global value.
zero value mean there is no delay between retries.
+- Policy: This will control who can access the topic in addition to the owner of the topic.
+ The policy passed needs to be a JSON string similar to bucket policy.
+ For example, one can send a policy string as follows::
+
+ {
+ "Version": "2012-10-17",
+ "Statement": [{
+ "Effect": "Allow",
+ "Principal": {"AWS": ["arn:aws:iam::usfolks:user/fred:subuser"]},
+ "Action": ["sns:GetTopicAttributes","sns:Publish"],
+ "Resource": ["arn:aws:sns:default::mytopic"],
+ }]
+ }
+
+ Currently, we support only the following actions:
+ - sns:GetTopicAttributes To list or get existing topics
+ - sns:SetTopicAttributes To set attributes for the existing topic
+ - sns:DeleteTopic To delete the existing topic
+ - sns:Publish To be able to create/subscribe notification on existing topic
- HTTP endpoint
- TopicArn: topic `ARN
<https://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html>`_.
- OpaqueData: The opaque data set on the topic.
+- Policy: Any access permission set on the topic.
Get Topic Information
`````````````````````
- TopicArn: topic `ARN
<https://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html>`_.
- OpaqueData: the opaque data set on the topic.
+- Policy: Any access permission set on the topic.
Delete Topic
````````````
topic, the request must be made over HTTPS. The "topic list" request will
otherwise be rejected.
+Set Topic Attributes
+````````````````````
+
+::
+
+ POST
+
+ Action=SetTopicAttributes
+ &TopicArn=<topic-arn>&AttributeName=<attribute-name>&AttributeValue=<attribute-value>
+
+This allows to set/modify existing attributes on the specified topic.
+
+.. note::
+
+ - The AttributeName passed will either be updated or created (if not exist) with AttributeValue passed.
+ - Any unsupported AttributeName passed will result in error 400.
+
+The response has the following format:
+
+::
+
+ <SetTopicAttributesResponse xmlns="https://sns.amazonaws.com/doc/2010-03-31/">
+ <ResponseMetadata>
+ <RequestId></RequestId>
+ </ResponseMetadata>
+ </SetTopicAttributesResponse>
+
+Valid AttributeName that can be passed:
+
+ - push-endpoint: This is the URI of an endpoint to send push notifications to.
+ - OpaqueData: Opaque data is set in the topic configuration and added to all
+ notifications that are triggered by the topic.
+ - persistent: This indicates whether notifications to this endpoint are
+ persistent (=asynchronous) or not persistent. (This is "false" by default.)
+ - time_to_live: This will limit the time (in seconds) to retain the notifications.
+ - max_retries: This will limit the max retries before expiring notifications.
+ - retry_sleep_duration: This will control the frequency of retrying the notifications.
+ - Policy: This will control who can access the topic other than owner of the topic.
+ - verify-ssl: This indicates whether the server certificates must be validated by
+ the client. This is "true" by default.
+ - ``use-ssl``: If this is set to "true", a secure connection is used to
+ connect to the broker. This is "false" by default.
+ - cloudevents: This indicates whether the HTTP header should contain
+ attributes according to the `S3 CloudEvents Spec`_.
+ - amqp-exchange: The exchanges must exist and must be able to route messages
+ based on topics.
+ - amqp-ack-level: No end2end acknowledgement is required. Messages may persist in the
+ broker before being delivered to their final destinations.
+ - ``ca-location``: If this is provided and a secure connection is used, the
+ specified CA will be used instead of the default CA to authenticate the
+ broker.
+ - mechanism: may be provided together with user/password (default: ``PLAIN``).
+ - kafka-ack-level: No end2end acknowledgement is required. Messages may persist in the
+ broker before being delivered to their final destinations.
+
Notifications
~~~~~~~~~~~~~
return false;
}
+std::optional<rgw::IAM::Policy> get_policy_from_text(req_state* const s,
+ std::string& policy_text) {
+ const auto bl = bufferlist::static_from_string(policy_text);
+ try {
+ return rgw::IAM::Policy(
+ s->cct, s->owner.get_id().tenant, bl,
+ s->cct->_conf.get_val<bool>("rgw_policy_reject_invalid_principals"));
+ } catch (rgw::IAM::PolicyParseException& e) {
+ ldout(s->cct, 1) << "failed to parse policy:' " << policy_text
+ << " ' with error: " << e.what() << dendl;
+ s->err.message = e.what();
+ return std::nullopt;
+ }
+}
+
+int verify_topic_owner_or_policy(req_state* const s,
+ const rgw_pubsub_topic& topic,
+ const std::string& zonegroup_name,
+ const uint64_t op) {
+ if (topic.user == s->owner.get_id()) {
+ return 0;
+ }
+ // no policy set.
+ if (topic.policy_text.empty()) {
+ // if mandatory_topic_permissions is true, then validate all users for
+ // permission.
+ if (s->cct->_conf->mandatory_topic_permissions) {
+ return -EACCES;
+ } else {
+ return 0;
+ }
+ }
+ // bufferlist::static_from_string wants non const string
+ std::string policy_text(topic.policy_text);
+ const auto p = get_policy_from_text(s, policy_text);
+ rgw::IAM::PolicyPrincipal princ_type = rgw::IAM::PolicyPrincipal::Other;
+ const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns, zonegroup_name,
+ s->user->get_tenant(), topic.name);
+ if (!p || p->eval(s->env, *s->auth.identity, op, arn, princ_type) !=
+ rgw::IAM::Effect::Allow) {
+ ldout(s->cct, 1) << "topic_policy failed validation, topic_policy: " << p
+ << dendl;
+ return -EACCES;
+ }
+ return 0;
+}
+
// command (AWS compliant):
// POST
// Action=CreateTopic&Name=<topic-name>[&OpaqueData=data][&push-endpoint=<endpoint>[&persistent][&<arg1>=<value1>]]
rgw_pubsub_dest dest;
std::string topic_arn;
std::string opaque_data;
-
+ std::string policy_text;
+
int get_params() {
topic_name = s->info.args.get("Name");
if (topic_name.empty()) {
return -EINVAL;
}
+ // Remove the args that are parsed, so the push_endpoint_args only contains
+ // necessary one's.
opaque_data = s->info.args.get("OpaqueData");
+ s->info.args.remove("OpaqueData");
dest.push_endpoint = s->info.args.get("push-endpoint");
+ s->info.args.remove("push-endpoint");
s->info.args.get_bool("persistent", &dest.persistent, false);
+ s->info.args.remove("persistent");
s->info.args.get_int("time_to_live", reinterpret_cast<int *>(&dest.time_to_live), rgw::notify::DEFAULT_GLOBAL_VALUE);
+ s->info.args.remove("time_to_live");
s->info.args.get_int("max_retries", reinterpret_cast<int *>(&dest.max_retries), rgw::notify::DEFAULT_GLOBAL_VALUE);
+ s->info.args.remove("max_retries");
s->info.args.get_int("retry_sleep_duration", reinterpret_cast<int *>(&dest.retry_sleep_duration), rgw::notify::DEFAULT_GLOBAL_VALUE);
+ s->info.args.remove("retry_sleep_duration");
if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
return -EINVAL;
}
+ // Store topic Policy.
+ policy_text = url_decode(s->info.args.get("Policy"), true);
+ if (!policy_text.empty() && !get_policy_from_text(s, policy_text)) {
+ return -ERR_MALFORMED_DOC;
+ }
+ s->info.args.remove("Policy");
+
for (const auto& param : s->info.args.get_params()) {
if (param.first == "Action" || param.first == "Name" || param.first == "PayloadHash") {
continue;
// remove last separator
dest.push_endpoint_args.pop_back();
}
- if (!dest.push_endpoint.empty() && dest.persistent) {
- const auto ret = rgw::notify::add_persistent_topic(topic_name, s->yield);
- if (ret < 0) {
- ldpp_dout(this, 1) << "CreateTopic Action failed to create queue for persistent topics. error:" << ret << dendl;
- return ret;
- }
- }
-
+
// dest object only stores endpoint info
dest.arn_topic = topic_name;
// the topic ARN will be sent in the reply
}
public:
- int verify_permission(optional_yield) override {
- return 0;
- }
+ int verify_permission(optional_yield y) override {
+ auto ret = get_params();
+ if (ret < 0) {
+ return ret;
+ }
+
+ const RGWPubSub ps(driver, s->owner.get_id().tenant);
+ rgw_pubsub_topic result;
+ ret = ps.get_topic(this, topic_name, result, y);
+ if (ret == -ENOENT) {
+ // topic not present
+ return 0;
+ }
+ if (ret == 0) {
+ if (result.user == s->owner.get_id() ||
+ !s->cct->_conf->mandatory_topic_permissions) {
+ return 0;
+ }
+ ldpp_dout(this, 1) << "failed to create topic '" << topic_name
+ << "', topic already exist." << dendl;
+ return -EPERM;
+ }
+ ldpp_dout(this, 1) << "failed to read topic '" << topic_name
+ << "', with error:" << ret << dendl;
+ return ret;
+ }
void pre_exec() override {
rgw_bucket_object_pre_exec(s);
};
void RGWPSCreateTopicOp::execute(optional_yield y) {
- op_ret = get_params();
- if (op_ret < 0) {
- return;
+ if (!dest.push_endpoint.empty() && dest.persistent) {
+ op_ret = rgw::notify::add_persistent_topic(topic_name, s->yield);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "CreateTopic Action failed to create queue for "
+ "persistent topics. error:"
+ << op_ret << dendl;
+ return;
+ }
}
-
const RGWPubSub ps(driver, s->owner.get_id().tenant);
op_ret = ps.create_topic(this, topic_name, dest, topic_arn, opaque_data,
- s->owner.get_id(), y);
+ s->owner.get_id(), policy_text, y);
if (op_ret < 0) {
ldpp_dout(this, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl;
return;
op_ret = -EPERM;
return;
}
+ for (auto it = result.topics.cbegin(); it != result.topics.cend();) {
+ if (verify_topic_owner_or_policy(
+ s, it->second, driver->get_zone()->get_zonegroup().get_name(),
+ rgw::IAM::snsGetTopicAttributes) != 0) {
+ result.topics.erase(it++);
+ } else {
+ ++it;
+ }
+ }
ldpp_dout(this, 20) << "successfully got topics" << dendl;
}
op_ret = -EPERM;
return;
}
+ op_ret = verify_topic_owner_or_policy(
+ s, result, driver->get_zone()->get_zonegroup().get_name(),
+ rgw::IAM::snsGetTopicAttributes);
+ if (op_ret != 0) {
+ ldpp_dout(this, 1) << "failed to get topic '" << topic_name
+ << "', topic owned by other user" << dendl;
+ return;
+ }
ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl;
}
op_ret = -EPERM;
return;
}
+ op_ret = verify_topic_owner_or_policy(
+ s, result, driver->get_zone()->get_zonegroup().get_name(),
+ rgw::IAM::snsGetTopicAttributes);
+ if (op_ret != 0) {
+ ldpp_dout(this, 1) << "failed to get topic '" << topic_name
+ << "', topic owned by other user" << dendl;
+ return;
+ }
ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl;
}
+// command (AWS compliant):
+// POST
+// Action=SetTopicAttributes&TopicArn=<topic-arn>&AttributeName=<attribute-name>&AttributeValue=<attribute-value>
+class RGWPSSetTopicAttributesOp : public RGWOp {
+ private:
+ std::string topic_name;
+ std::string topic_arn;
+ std::string opaque_data;
+ std::string policy_text;
+ rgw_pubsub_dest dest;
+ rgw_user topic_owner;
+ std::string attribute_name;
+
+ int get_params() {
+ const auto arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
+
+ if (!arn || arn->resource.empty()) {
+ ldpp_dout(this, 1) << "SetTopicAttribute Action 'TopicArn' argument is "
+ "missing or invalid"
+ << dendl;
+ return -EINVAL;
+ }
+ topic_arn = arn->to_string();
+ topic_name = arn->resource;
+ attribute_name = s->info.args.get("AttributeName");
+ if (attribute_name.empty()) {
+ ldpp_dout(this, 1)
+ << "SetTopicAttribute Action 'AttributeName' argument is "
+ "missing or invalid"
+ << dendl;
+ return -EINVAL;
+ }
+ return 0;
+ }
+
+ int map_attributes(const rgw_pubsub_topic& topic) {
+ // update the default values that is stored in topic currently.
+ opaque_data = topic.opaque_data;
+ policy_text = topic.policy_text;
+ dest = topic.dest;
+
+ if (attribute_name == "OpaqueData") {
+ opaque_data = s->info.args.get("AttributeValue");
+ } else if (attribute_name == "persistent") {
+ s->info.args.get_bool("AttributeValue", &dest.persistent, false);
+ } else if (attribute_name == "time_to_live") {
+ s->info.args.get_int("AttributeValue",
+ reinterpret_cast<int*>(&dest.time_to_live),
+ rgw::notify::DEFAULT_GLOBAL_VALUE);
+ } else if (attribute_name == "max_retries") {
+ s->info.args.get_int("AttributeValue",
+ reinterpret_cast<int*>(&dest.max_retries),
+ rgw::notify::DEFAULT_GLOBAL_VALUE);
+ } else if (attribute_name == "retry_sleep_duration") {
+ s->info.args.get_int("AttributeValue",
+ reinterpret_cast<int*>(&dest.retry_sleep_duration),
+ rgw::notify::DEFAULT_GLOBAL_VALUE);
+ } else if (attribute_name == "push-endpoint") {
+ dest.push_endpoint = s->info.args.get("AttributeValue");
+ if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
+ return -EINVAL;
+ }
+ } else if (attribute_name == "Policy") {
+ policy_text = url_decode(s->info.args.get("AttributeValue"), true);
+ if (!policy_text.empty() && !get_policy_from_text(s, policy_text)) {
+ return -ERR_MALFORMED_DOC;
+ }
+ } else {
+ // replace the push_endpoint_args if passed in SetAttribute.
+ const auto replace_str = [&](const std::string& param,
+ const std::string& val) {
+ auto& push_endpoint_args = dest.push_endpoint_args;
+ const std::string replaced_str = param + "=" + val;
+ const auto pos = push_endpoint_args.find(param);
+ if (pos == std::string::npos) {
+ dest.push_endpoint_args.append("&" + replaced_str);
+ return;
+ }
+ auto end_pos = dest.push_endpoint_args.find("&", pos);
+ end_pos = end_pos == std::string::npos ? push_endpoint_args.length()
+ : end_pos;
+ push_endpoint_args.replace(pos, end_pos - pos, replaced_str);
+ };
+ const std::unordered_set<std::string> push_endpoint_args = {
+ "verify-ssl", "use-ssl", "ca-location", "amqp-ack-level",
+ "amqp-exchange", "kafka-ack-level", "mechanism", "cloudevents"};
+ if (push_endpoint_args.count(attribute_name) == 1) {
+ replace_str(attribute_name, s->info.args.get("AttributeValue"));
+ return 0;
+ }
+ ldpp_dout(this, 1)
+ << "SetTopicAttribute Action 'AttributeName' argument is "
+ "invalid: 'AttributeName' = "
+ << attribute_name << dendl;
+ return -EINVAL;
+ }
+ return 0;
+ }
+
+ public:
+ int verify_permission(optional_yield y) override {
+ auto ret = get_params();
+ if (ret < 0) {
+ return ret;
+ }
+ rgw_pubsub_topic result;
+ const RGWPubSub ps(driver, s->owner.get_id().tenant);
+ ret = ps.get_topic(this, topic_name, result, y);
+ if (ret < 0) {
+ ldpp_dout(this, 1) << "failed to get topic '" << topic_name
+ << "', ret=" << ret << dendl;
+ return ret;
+ }
+ topic_owner = result.user;
+ ret = verify_topic_owner_or_policy(
+ s, result, driver->get_zone()->get_zonegroup().get_name(),
+ rgw::IAM::snsSetTopicAttributes);
+ if (ret != 0) {
+ ldpp_dout(this, 1) << "failed to set attributes for topic '" << topic_name
+ << "', topic owned by other user" << dendl;
+ return ret;
+ }
+
+ return map_attributes(result);
+ }
+
+ void pre_exec() override { rgw_bucket_object_pre_exec(s); }
+ void execute(optional_yield) override;
+
+ const char* name() const override { return "pubsub_topic_set"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_SET; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+
+ void send_response() override {
+ if (op_ret) {
+ set_req_state_err(s, op_ret);
+ }
+ dump_errno(s);
+ end_header(s, this, "application/xml");
+
+ if (op_ret < 0) {
+ return;
+ }
+
+ const auto f = s->formatter;
+ f->open_object_section_in_ns("SetTopicAttributesResponse", AWS_SNS_NS);
+ f->open_object_section("ResponseMetadata");
+ encode_xml("RequestId", s->req_id, f);
+ f->close_section(); // ResponseMetadata
+ f->close_section(); // SetTopicAttributesResponse
+ rgw_flush_formatter_and_reset(s, f);
+ }
+};
+
+void RGWPSSetTopicAttributesOp::execute(optional_yield y) {
+ if (!dest.push_endpoint.empty() && dest.persistent) {
+ op_ret = rgw::notify::add_persistent_topic(topic_name, s->yield);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1)
+ << "SetTopicAttributes Action failed to create queue for "
+ "persistent topics. error:"
+ << op_ret << dendl;
+ return;
+ }
+ } else { // changing the persistent topic to non-persistent.
+ op_ret = rgw::notify::remove_persistent_topic(topic_name, s->yield);
+ if (op_ret != -ENOENT && op_ret < 0) {
+ ldpp_dout(this, 1) << "SetTopicAttributes Action failed to remove queue "
+ "for persistent topics. error:"
+ << op_ret << dendl;
+ return;
+ }
+ }
+ const RGWPubSub ps(driver, s->owner.get_id().tenant);
+ op_ret = ps.create_topic(this, topic_name, dest, topic_arn, opaque_data,
+ topic_owner, policy_text, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to SetAttributes for topic '" << topic_name
+ << "', ret=" << op_ret << dendl;
+ return;
+ }
+ ldpp_dout(this, 20) << "successfully set the attributes for topic '"
+ << topic_name << "'" << dendl;
+}
+
// command (AWS compliant):
// POST
// Action=DeleteTopic&TopicArn=<topic-arn>
}
topic_name = topic_arn->resource;
-
- // upon deletion it is not known if topic is persistent or not
- // will try to delete the persistent topic anyway
- const auto ret = rgw::notify::remove_persistent_topic(topic_name, s->yield);
- if (ret == -ENOENT) {
- // topic was not persistent, or already deleted
- return 0;
- }
- if (ret < 0) {
- ldpp_dout(this, 1) << "DeleteTopic Action failed to remove queue for persistent topics. error:" << ret << dendl;
- return ret;
- }
-
return 0;
}
return;
}
const RGWPubSub ps(driver, s->owner.get_id().tenant);
+ rgw_pubsub_topic result;
+ op_ret = ps.get_topic(this, topic_name, result, y);
+ if (op_ret == 0) {
+ op_ret = verify_topic_owner_or_policy(
+ s, result, driver->get_zone()->get_zonegroup().get_name(),
+ rgw::IAM::snsDeleteTopic);
+ if (op_ret != 0) {
+ ldpp_dout(this, 1) << "failed to remove topic '" << topic_name
+ << "' topic owned by other user" << dendl;
+ return;
+ }
+ } else {
+ ldpp_dout(this, 1) << "failed to fetch topic '" << topic_name
+ << "' with error: " << op_ret << dendl;
+ if (op_ret == -ENOENT) {
+ // its not an error if no topics exist, just a no-op
+ op_ret = 0;
+ }
+ return;
+ }
+ // upon deletion it is not known if topic is persistent or not
+ // will try to delete the persistent topic anyway
+ op_ret = rgw::notify::remove_persistent_topic(topic_name, s->yield);
+ if (op_ret != -ENOENT && op_ret < 0) {
+ ldpp_dout(this, 1) << "DeleteTopic Action failed to remove queue for "
+ "persistent topics. error:"
+ << op_ret << dendl;
+ return;
+ }
op_ret = ps.remove_topic(this, topic_name, y);
if (op_ret < 0) {
ldpp_dout(this, 1) << "failed to remove topic '" << topic_name << ", ret=" << op_ret << dendl;
using op_generator = RGWOp*(*)();
static const std::unordered_map<std::string, op_generator> op_generators = {
- {"CreateTopic", []() -> RGWOp* {return new RGWPSCreateTopicOp;}},
- {"DeleteTopic", []() -> RGWOp* {return new RGWPSDeleteTopicOp;}},
- {"ListTopics", []() -> RGWOp* {return new RGWPSListTopicsOp;}},
- {"GetTopic", []() -> RGWOp* {return new RGWPSGetTopicOp;}},
- {"GetTopicAttributes", []() -> RGWOp* {return new RGWPSGetTopicAttributesOp;}}
-};
+ {"CreateTopic", []() -> RGWOp* { return new RGWPSCreateTopicOp; }},
+ {"DeleteTopic", []() -> RGWOp* { return new RGWPSDeleteTopicOp; }},
+ {"ListTopics", []() -> RGWOp* { return new RGWPSListTopicsOp; }},
+ {"GetTopic", []() -> RGWOp* { return new RGWPSGetTopicOp; }},
+ {"GetTopicAttributes",
+ []() -> RGWOp* { return new RGWPSGetTopicAttributesOp; }},
+ {"SetTopicAttributes",
+ []() -> RGWOp* { return new RGWPSSetTopicAttributesOp; }}};
bool RGWHandler_REST_PSTopic_AWS::action_exists(const req_state* s)
{
ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
return;
}
+ op_ret = verify_topic_owner_or_policy(
+ s, topic_info, driver->get_zone()->get_zonegroup().get_name(),
+ rgw::IAM::snsPublish);
+ if (op_ret != 0) {
+ ldpp_dout(this, 1) << "failed to create notification for topic '"
+ << topic_name << "' topic owned by other user"
+ << dendl;
+ return;
+ }
// make sure that full topic configuration match
// TODO: use ARN match function
// ARN is cached to make the "GET" method faster
op_ret = ps.create_topic(this, unique_topic_name, topic_info.dest,
topic_info.arn, topic_info.opaque_data,
- s->owner.get_id(), y);
+ s->owner.get_id(), topic_info.policy_text, y);
if (op_ret < 0) {
ldpp_dout(this, 1) << "failed to auto-generate unique topic '" << unique_topic_name <<
"', ret=" << op_ret << dendl;