From 8cfadb7e982a4d16e058dc73a3bd8b2d7b560434 Mon Sep 17 00:00:00 2001 From: kchheda3 Date: Thu, 5 Oct 2023 12:14:35 -0400 Subject: [PATCH] rgw/notification: Add support to set topic policy while topic creation Signed-off-by: kchheda3 --- doc/radosgw/notifications.rst | 77 ++++ src/common/options/rgw.yaml.in | 15 + src/rgw/rgw_auth_s3.cc | 1 + src/rgw/rgw_iam_policy.cc | 22 ++ src/rgw/rgw_iam_policy.h | 9 +- src/rgw/rgw_op_type.h | 1 + src/rgw/rgw_pubsub.cc | 8 +- src/rgw/rgw_pubsub.h | 11 +- src/rgw/rgw_rest_pubsub.cc | 394 ++++++++++++++++++-- src/test/rgw/bucket_notification/api.py | 15 +- src/test/rgw/bucket_notification/test_bn.py | 70 ++++ src/test/rgw/test_rgw_iam_policy.cc | 3 +- 12 files changed, 580 insertions(+), 46 deletions(-) diff --git a/doc/radosgw/notifications.rst b/doc/radosgw/notifications.rst index 16f4847c5dc..a38ad9e01bd 100644 --- a/doc/radosgw/notifications.rst +++ b/doc/radosgw/notifications.rst @@ -159,6 +159,7 @@ updating, use the name of an existing topic and different endpoint values). [&Attributes.entry.12.key=time_to_live&Attributes.entry.12.value=] [&Attributes.entry.13.key=max_retries&Attributes.entry.13.value=] [&Attributes.entry.14.key=retry_sleep_duration&Attributes.entry.14.value=] + [&Attributes.entry.14.key=Policy&Attributes.entry.14.value=] Request parameters: @@ -179,6 +180,25 @@ Request parameters: default value is taken from `rgw_topic_persistency_sleep_duration`. providing a value overrides the global value. zero value mean there is no delay between retries. +- Policy: This will control who can access the topic in addition to the owner of the topic. + The policy passed needs to be a JSON string similar to bucket policy. + For example, one can send a policy string as follows:: + + { + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Allow", + "Principal": {"AWS": ["arn:aws:iam::usfolks:user/fred:subuser"]}, + "Action": ["sns:GetTopicAttributes","sns:Publish"], + "Resource": ["arn:aws:sns:default::mytopic"], + }] + } + + Currently, we support only the following actions: + - sns:GetTopicAttributes To list or get existing topics + - sns:SetTopicAttributes To set attributes for the existing topic + - sns:DeleteTopic To delete the existing topic + - sns:Publish To be able to create/subscribe notification on existing topic - HTTP endpoint @@ -340,6 +360,7 @@ The response has the following format: - TopicArn: topic `ARN `_. - OpaqueData: The opaque data set on the topic. +- Policy: Any access permission set on the topic. Get Topic Information ````````````````````` @@ -393,6 +414,7 @@ The response has the following format: - TopicArn: topic `ARN `_. - OpaqueData: the opaque data set on the topic. +- Policy: Any access permission set on the topic. Delete Topic ```````````` @@ -463,6 +485,61 @@ The response has the following format: topic, the request must be made over HTTPS. The "topic list" request will otherwise be rejected. +Set Topic Attributes +```````````````````` + +:: + + POST + + Action=SetTopicAttributes + &TopicArn=&AttributeName=&AttributeValue= + +This allows to set/modify existing attributes on the specified topic. + +.. note:: + + - The AttributeName passed will either be updated or created (if not exist) with AttributeValue passed. + - Any unsupported AttributeName passed will result in error 400. + +The response has the following format: + +:: + + + + + + + +Valid AttributeName that can be passed: + + - push-endpoint: This is the URI of an endpoint to send push notifications to. + - OpaqueData: Opaque data is set in the topic configuration and added to all + notifications that are triggered by the topic. + - persistent: This indicates whether notifications to this endpoint are + persistent (=asynchronous) or not persistent. (This is "false" by default.) + - time_to_live: This will limit the time (in seconds) to retain the notifications. + - max_retries: This will limit the max retries before expiring notifications. + - retry_sleep_duration: This will control the frequency of retrying the notifications. + - Policy: This will control who can access the topic other than owner of the topic. + - verify-ssl: This indicates whether the server certificates must be validated by + the client. This is "true" by default. + - ``use-ssl``: If this is set to "true", a secure connection is used to + connect to the broker. This is "false" by default. + - cloudevents: This indicates whether the HTTP header should contain + attributes according to the `S3 CloudEvents Spec`_. + - amqp-exchange: The exchanges must exist and must be able to route messages + based on topics. + - amqp-ack-level: No end2end acknowledgement is required. Messages may persist in the + broker before being delivered to their final destinations. + - ``ca-location``: If this is provided and a secure connection is used, the + specified CA will be used instead of the default CA to authenticate the + broker. + - mechanism: may be provided together with user/password (default: ``PLAIN``). + - kafka-ack-level: No end2end acknowledgement is required. Messages may persist in the + broker before being delivered to their final destinations. + Notifications ~~~~~~~~~~~~~ diff --git a/src/common/options/rgw.yaml.in b/src/common/options/rgw.yaml.in index 8928e853e64..37453605260 100644 --- a/src/common/options/rgw.yaml.in +++ b/src/common/options/rgw.yaml.in @@ -3850,3 +3850,18 @@ options: services: - rgw with_legacy: true +- name: mandatory_topic_permissions + type: bool + level: basic + desc: Whether to validate user permissions to access notification topics. + long_desc: If true, all users (other then the owner of the topic) will need + to have a policy to access topics. + The topic policy can be set by owner via CreateTopic() or SetTopicAttribute(). + Following permissions can be granted "sns:Publish", "sns:GetTopicAttributes", + "sns:SetTopicAttributes" and "sns:DeleteTopic" via Policy. + NOTE that even if set to "false" topics will still follow the policies if set on them. + default: false + services: + - rgw + with_legacy: true + diff --git a/src/rgw/rgw_auth_s3.cc b/src/rgw/rgw_auth_s3.cc index a2def87040e..d06c8381338 100644 --- a/src/rgw/rgw_auth_s3.cc +++ b/src/rgw/rgw_auth_s3.cc @@ -482,6 +482,7 @@ bool is_non_s3_op(RGWOpType op_type) op_type == RGW_OP_PUBSUB_TOPIC_CREATE || op_type == RGW_OP_PUBSUB_TOPICS_LIST || op_type == RGW_OP_PUBSUB_TOPIC_GET || + op_type == RGW_OP_PUBSUB_TOPIC_SET || op_type == RGW_OP_PUBSUB_TOPIC_DELETE || op_type == RGW_OP_TAG_ROLE || op_type == RGW_OP_LIST_ROLE_TAGS || diff --git a/src/rgw/rgw_iam_policy.cc b/src/rgw/rgw_iam_policy.cc index 35aeb15fcdc..0bfcccff638 100644 --- a/src/rgw/rgw_iam_policy.cc +++ b/src/rgw/rgw_iam_policy.cc @@ -157,6 +157,10 @@ static const actpair actpairs[] = { "sts:AssumeRoleWithWebIdentity", stsAssumeRoleWithWebIdentity}, { "sts:GetSessionToken", stsGetSessionToken}, { "sts:TagSession", stsTagSession}, + { "sns:GetTopicAttributes", snsGetTopicAttributes}, + { "sns:DeleteTopic", snsDeleteTopic}, + { "sns:Publish", snsPublish}, + { "sns:SetTopicAttributes", snsSetTopicAttributes}, }; struct PolicyParser; @@ -600,6 +604,12 @@ bool ParseState::do_string(CephContext* cct, const char* s, size_t l) { if ((t->notaction & stsAllValue) == stsAllValue) { t->notaction[stsAll] = 1; } + if ((t->action & snsAllValue) == snsAllValue) { + t->action[snsAll] = 1; + } + if ((t->notaction & snsAllValue) == snsAllValue) { + t->notaction[snsAll] = 1; + } } } } else if (w->id == TokenID::Resource || w->id == TokenID::NotResource) { @@ -1452,6 +1462,18 @@ const char* action_bit_string(uint64_t action) { case stsTagSession: return "sts:TagSession"; + + case snsSetTopicAttributes: + return "sns:SetTopicAttributes"; + + case snsGetTopicAttributes: + return "sns:GetTopicAttributes"; + + case snsDeleteTopic: + return "sns:DeleteTopic"; + + case snsPublish: + return "sns:Publish"; } return "s3Invalid"; } diff --git a/src/rgw/rgw_iam_policy.h b/src/rgw/rgw_iam_policy.h index c60872850a3..e528d1515c7 100644 --- a/src/rgw/rgw_iam_policy.h +++ b/src/rgw/rgw_iam_policy.h @@ -141,8 +141,14 @@ static constexpr std::uint64_t stsGetSessionToken = iamAll + 3; static constexpr std::uint64_t stsTagSession = iamAll + 4; static constexpr std::uint64_t stsAll = iamAll + 5; +static constexpr std::uint64_t snsGetTopicAttributes = stsAll + 1; +static constexpr std::uint64_t snsDeleteTopic = stsAll + 2; +static constexpr std::uint64_t snsPublish = stsAll + 3; +static constexpr std::uint64_t snsSetTopicAttributes = stsAll + 4; +static constexpr std::uint64_t snsAll = stsAll + 5; + static constexpr std::uint64_t s3Count = s3All; -static constexpr std::uint64_t allCount = stsAll + 1; +static constexpr std::uint64_t allCount = snsAll + 1; using Action_t = std::bitset; using NotAction_t = Action_t; @@ -164,6 +170,7 @@ static const Action_t None(0); static const Action_t s3AllValue = set_cont_bits(0,s3All); static const Action_t iamAllValue = set_cont_bits(s3All+1,iamAll); static const Action_t stsAllValue = set_cont_bits(iamAll+1,stsAll); +static const Action_t snsAllValue = set_cont_bits(stsAll + 1, snsAll); static const Action_t allValue = set_cont_bits(0,allCount); namespace { diff --git a/src/rgw/rgw_op_type.h b/src/rgw/rgw_op_type.h index 375c7348b24..a7a68d4ade5 100644 --- a/src/rgw/rgw_op_type.h +++ b/src/rgw/rgw_op_type.h @@ -104,6 +104,7 @@ enum RGWOpType { RGW_OP_PUBSUB_TOPIC_CREATE, RGW_OP_PUBSUB_TOPICS_LIST, RGW_OP_PUBSUB_TOPIC_GET, + RGW_OP_PUBSUB_TOPIC_SET, RGW_OP_PUBSUB_TOPIC_DELETE, RGW_OP_PUBSUB_SUB_CREATE, RGW_OP_PUBSUB_SUB_GET, diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index 51c8b0281c6..14d85abe3b6 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -337,6 +337,7 @@ void rgw_pubsub_topic::dump(Formatter *f) const encode_json("dest", dest, f); encode_json("arn", arn, f); encode_json("opaqueData", opaque_data, f); + encode_json("policy", policy_text, f); } void rgw_pubsub_topic::dump_xml(Formatter *f) const @@ -346,6 +347,7 @@ void rgw_pubsub_topic::dump_xml(Formatter *f) const encode_xml("EndPoint", dest, f); encode_xml("TopicArn", arn, f); encode_xml("OpaqueData", opaque_data, f); + encode_xml("Policy", policy_text, f); } void encode_xml_key_value_entry(const std::string& key, const std::string& value, Formatter *f) { @@ -365,6 +367,7 @@ void rgw_pubsub_topic::dump_xml_as_attributes(Formatter *f) const encode_xml_key_value_entry("EndPoint", dest.to_json_str(), f); encode_xml_key_value_entry("TopicArn", arn, f); encode_xml_key_value_entry("OpaqueData", opaque_data, f); + encode_xml_key_value_entry("Policy", policy_text, f); f->close_section(); // Attributes } @@ -695,7 +698,9 @@ int RGWPubSub::create_topic(const DoutPrefixProvider* dpp, const std::string& name, const rgw_pubsub_dest& dest, const std::string& arn, const std::string& opaque_data, - const rgw_user& user, optional_yield y) const { + const rgw_user& user, + const std::string& policy_text, + optional_yield y) const { RGWObjVersionTracker objv_tracker; rgw_pubsub_topics topics; @@ -712,6 +717,7 @@ int RGWPubSub::create_topic(const DoutPrefixProvider* dpp, new_topic.dest = dest; new_topic.arn = arn; new_topic.opaque_data = opaque_data; + new_topic.policy_text = policy_text; ret = write_topics(dpp, topics, &objv_tracker, y); if (ret < 0) { diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h index 3a87d777d6c..ddc72f99b07 100644 --- a/src/rgw/rgw_pubsub.h +++ b/src/rgw/rgw_pubsub.h @@ -398,19 +398,21 @@ struct rgw_pubsub_topic { rgw_pubsub_dest dest; std::string arn; std::string opaque_data; + std::string policy_text; void encode(bufferlist& bl) const { - ENCODE_START(3, 1, bl); + ENCODE_START(4, 1, bl); encode(user, bl); encode(name, bl); encode(dest, bl); encode(arn, bl); encode(opaque_data, bl); + encode(policy_text, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { - DECODE_START(3, bl); + DECODE_START(4, bl); decode(user, bl); decode(name, bl); if (struct_v >= 2) { @@ -420,6 +422,9 @@ struct rgw_pubsub_topic { if (struct_v >= 3) { decode(opaque_data, bl); } + if (struct_v >= 4) { + decode(policy_text, bl); + } DECODE_FINISH(bl); } @@ -629,7 +634,7 @@ public: int create_topic(const DoutPrefixProvider* dpp, const std::string& name, const rgw_pubsub_dest& dest, const std::string& arn, const std::string& opaque_data, const rgw_user& user, - optional_yield y) const; + const std::string& policy_text, optional_yield y) const; // remove a topic according to its name // if the topic does not exists it is a no-op (considered success) // return 0 on success, error code otherwise diff --git a/src/rgw/rgw_rest_pubsub.cc b/src/rgw/rgw_rest_pubsub.cc index 9bc7a289763..da81711511c 100644 --- a/src/rgw/rgw_rest_pubsub.cc +++ b/src/rgw/rgw_rest_pubsub.cc @@ -67,6 +67,53 @@ bool topics_has_endpoint_secret(const rgw_pubsub_topics& topics) { return false; } +std::optional get_policy_from_text(req_state* const s, + std::string& policy_text) { + const auto bl = bufferlist::static_from_string(policy_text); + try { + return rgw::IAM::Policy( + s->cct, s->owner.get_id().tenant, bl, + s->cct->_conf.get_val("rgw_policy_reject_invalid_principals")); + } catch (rgw::IAM::PolicyParseException& e) { + ldout(s->cct, 1) << "failed to parse policy:' " << policy_text + << " ' with error: " << e.what() << dendl; + s->err.message = e.what(); + return std::nullopt; + } +} + +int verify_topic_owner_or_policy(req_state* const s, + const rgw_pubsub_topic& topic, + const std::string& zonegroup_name, + const uint64_t op) { + if (topic.user == s->owner.get_id()) { + return 0; + } + // no policy set. + if (topic.policy_text.empty()) { + // if mandatory_topic_permissions is true, then validate all users for + // permission. + if (s->cct->_conf->mandatory_topic_permissions) { + return -EACCES; + } else { + return 0; + } + } + // bufferlist::static_from_string wants non const string + std::string policy_text(topic.policy_text); + const auto p = get_policy_from_text(s, policy_text); + rgw::IAM::PolicyPrincipal princ_type = rgw::IAM::PolicyPrincipal::Other; + const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns, zonegroup_name, + s->user->get_tenant(), topic.name); + if (!p || p->eval(s->env, *s->auth.identity, op, arn, princ_type) != + rgw::IAM::Effect::Allow) { + ldout(s->cct, 1) << "topic_policy failed validation, topic_policy: " << p + << dendl; + return -EACCES; + } + return 0; +} + // command (AWS compliant): // POST // Action=CreateTopic&Name=[&OpaqueData=data][&push-endpoint=[&persistent][&=]] @@ -76,7 +123,8 @@ class RGWPSCreateTopicOp : public RGWOp { rgw_pubsub_dest dest; std::string topic_arn; std::string opaque_data; - + std::string policy_text; + int get_params() { topic_name = s->info.args.get("Name"); if (topic_name.empty()) { @@ -84,17 +132,32 @@ class RGWPSCreateTopicOp : public RGWOp { return -EINVAL; } + // Remove the args that are parsed, so the push_endpoint_args only contains + // necessary one's. opaque_data = s->info.args.get("OpaqueData"); + s->info.args.remove("OpaqueData"); dest.push_endpoint = s->info.args.get("push-endpoint"); + s->info.args.remove("push-endpoint"); s->info.args.get_bool("persistent", &dest.persistent, false); + s->info.args.remove("persistent"); s->info.args.get_int("time_to_live", reinterpret_cast(&dest.time_to_live), rgw::notify::DEFAULT_GLOBAL_VALUE); + s->info.args.remove("time_to_live"); s->info.args.get_int("max_retries", reinterpret_cast(&dest.max_retries), rgw::notify::DEFAULT_GLOBAL_VALUE); + s->info.args.remove("max_retries"); s->info.args.get_int("retry_sleep_duration", reinterpret_cast(&dest.retry_sleep_duration), rgw::notify::DEFAULT_GLOBAL_VALUE); + s->info.args.remove("retry_sleep_duration"); if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) { return -EINVAL; } + // Store topic Policy. + policy_text = url_decode(s->info.args.get("Policy"), true); + if (!policy_text.empty() && !get_policy_from_text(s, policy_text)) { + return -ERR_MALFORMED_DOC; + } + s->info.args.remove("Policy"); + for (const auto& param : s->info.args.get_params()) { if (param.first == "Action" || param.first == "Name" || param.first == "PayloadHash") { continue; @@ -106,14 +169,7 @@ class RGWPSCreateTopicOp : public RGWOp { // remove last separator dest.push_endpoint_args.pop_back(); } - if (!dest.push_endpoint.empty() && dest.persistent) { - const auto ret = rgw::notify::add_persistent_topic(topic_name, s->yield); - if (ret < 0) { - ldpp_dout(this, 1) << "CreateTopic Action failed to create queue for persistent topics. error:" << ret << dendl; - return ret; - } - } - + // dest object only stores endpoint info dest.arn_topic = topic_name; // the topic ARN will be sent in the reply @@ -125,9 +181,32 @@ class RGWPSCreateTopicOp : public RGWOp { } public: - int verify_permission(optional_yield) override { - return 0; - } + int verify_permission(optional_yield y) override { + auto ret = get_params(); + if (ret < 0) { + return ret; + } + + const RGWPubSub ps(driver, s->owner.get_id().tenant); + rgw_pubsub_topic result; + ret = ps.get_topic(this, topic_name, result, y); + if (ret == -ENOENT) { + // topic not present + return 0; + } + if (ret == 0) { + if (result.user == s->owner.get_id() || + !s->cct->_conf->mandatory_topic_permissions) { + return 0; + } + ldpp_dout(this, 1) << "failed to create topic '" << topic_name + << "', topic already exist." << dendl; + return -EPERM; + } + ldpp_dout(this, 1) << "failed to read topic '" << topic_name + << "', with error:" << ret << dendl; + return ret; + } void pre_exec() override { rgw_bucket_object_pre_exec(s); @@ -163,14 +242,18 @@ class RGWPSCreateTopicOp : public RGWOp { }; void RGWPSCreateTopicOp::execute(optional_yield y) { - op_ret = get_params(); - if (op_ret < 0) { - return; + if (!dest.push_endpoint.empty() && dest.persistent) { + op_ret = rgw::notify::add_persistent_topic(topic_name, s->yield); + if (op_ret < 0) { + ldpp_dout(this, 1) << "CreateTopic Action failed to create queue for " + "persistent topics. error:" + << op_ret << dendl; + return; + } } - const RGWPubSub ps(driver, s->owner.get_id().tenant); op_ret = ps.create_topic(this, topic_name, dest, topic_arn, opaque_data, - s->owner.get_id(), y); + s->owner.get_id(), policy_text, y); if (op_ret < 0) { ldpp_dout(this, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl; return; @@ -236,6 +319,15 @@ void RGWPSListTopicsOp::execute(optional_yield y) { op_ret = -EPERM; return; } + for (auto it = result.topics.cbegin(); it != result.topics.cend();) { + if (verify_topic_owner_or_policy( + s, it->second, driver->get_zone()->get_zonegroup().get_name(), + rgw::IAM::snsGetTopicAttributes) != 0) { + result.topics.erase(it++); + } else { + ++it; + } + } ldpp_dout(this, 20) << "successfully got topics" << dendl; } @@ -312,6 +404,14 @@ void RGWPSGetTopicOp::execute(optional_yield y) { op_ret = -EPERM; return; } + op_ret = verify_topic_owner_or_policy( + s, result, driver->get_zone()->get_zonegroup().get_name(), + rgw::IAM::snsGetTopicAttributes); + if (op_ret != 0) { + ldpp_dout(this, 1) << "failed to get topic '" << topic_name + << "', topic owned by other user" << dendl; + return; + } ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl; } @@ -388,9 +488,202 @@ void RGWPSGetTopicAttributesOp::execute(optional_yield y) { op_ret = -EPERM; return; } + op_ret = verify_topic_owner_or_policy( + s, result, driver->get_zone()->get_zonegroup().get_name(), + rgw::IAM::snsGetTopicAttributes); + if (op_ret != 0) { + ldpp_dout(this, 1) << "failed to get topic '" << topic_name + << "', topic owned by other user" << dendl; + return; + } ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl; } +// command (AWS compliant): +// POST +// Action=SetTopicAttributes&TopicArn=&AttributeName=&AttributeValue= +class RGWPSSetTopicAttributesOp : public RGWOp { + private: + std::string topic_name; + std::string topic_arn; + std::string opaque_data; + std::string policy_text; + rgw_pubsub_dest dest; + rgw_user topic_owner; + std::string attribute_name; + + int get_params() { + const auto arn = rgw::ARN::parse((s->info.args.get("TopicArn"))); + + if (!arn || arn->resource.empty()) { + ldpp_dout(this, 1) << "SetTopicAttribute Action 'TopicArn' argument is " + "missing or invalid" + << dendl; + return -EINVAL; + } + topic_arn = arn->to_string(); + topic_name = arn->resource; + attribute_name = s->info.args.get("AttributeName"); + if (attribute_name.empty()) { + ldpp_dout(this, 1) + << "SetTopicAttribute Action 'AttributeName' argument is " + "missing or invalid" + << dendl; + return -EINVAL; + } + return 0; + } + + int map_attributes(const rgw_pubsub_topic& topic) { + // update the default values that is stored in topic currently. + opaque_data = topic.opaque_data; + policy_text = topic.policy_text; + dest = topic.dest; + + if (attribute_name == "OpaqueData") { + opaque_data = s->info.args.get("AttributeValue"); + } else if (attribute_name == "persistent") { + s->info.args.get_bool("AttributeValue", &dest.persistent, false); + } else if (attribute_name == "time_to_live") { + s->info.args.get_int("AttributeValue", + reinterpret_cast(&dest.time_to_live), + rgw::notify::DEFAULT_GLOBAL_VALUE); + } else if (attribute_name == "max_retries") { + s->info.args.get_int("AttributeValue", + reinterpret_cast(&dest.max_retries), + rgw::notify::DEFAULT_GLOBAL_VALUE); + } else if (attribute_name == "retry_sleep_duration") { + s->info.args.get_int("AttributeValue", + reinterpret_cast(&dest.retry_sleep_duration), + rgw::notify::DEFAULT_GLOBAL_VALUE); + } else if (attribute_name == "push-endpoint") { + dest.push_endpoint = s->info.args.get("AttributeValue"); + if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) { + return -EINVAL; + } + } else if (attribute_name == "Policy") { + policy_text = url_decode(s->info.args.get("AttributeValue"), true); + if (!policy_text.empty() && !get_policy_from_text(s, policy_text)) { + return -ERR_MALFORMED_DOC; + } + } else { + // replace the push_endpoint_args if passed in SetAttribute. + const auto replace_str = [&](const std::string& param, + const std::string& val) { + auto& push_endpoint_args = dest.push_endpoint_args; + const std::string replaced_str = param + "=" + val; + const auto pos = push_endpoint_args.find(param); + if (pos == std::string::npos) { + dest.push_endpoint_args.append("&" + replaced_str); + return; + } + auto end_pos = dest.push_endpoint_args.find("&", pos); + end_pos = end_pos == std::string::npos ? push_endpoint_args.length() + : end_pos; + push_endpoint_args.replace(pos, end_pos - pos, replaced_str); + }; + const std::unordered_set push_endpoint_args = { + "verify-ssl", "use-ssl", "ca-location", "amqp-ack-level", + "amqp-exchange", "kafka-ack-level", "mechanism", "cloudevents"}; + if (push_endpoint_args.count(attribute_name) == 1) { + replace_str(attribute_name, s->info.args.get("AttributeValue")); + return 0; + } + ldpp_dout(this, 1) + << "SetTopicAttribute Action 'AttributeName' argument is " + "invalid: 'AttributeName' = " + << attribute_name << dendl; + return -EINVAL; + } + return 0; + } + + public: + int verify_permission(optional_yield y) override { + auto ret = get_params(); + if (ret < 0) { + return ret; + } + rgw_pubsub_topic result; + const RGWPubSub ps(driver, s->owner.get_id().tenant); + ret = ps.get_topic(this, topic_name, result, y); + if (ret < 0) { + ldpp_dout(this, 1) << "failed to get topic '" << topic_name + << "', ret=" << ret << dendl; + return ret; + } + topic_owner = result.user; + ret = verify_topic_owner_or_policy( + s, result, driver->get_zone()->get_zonegroup().get_name(), + rgw::IAM::snsSetTopicAttributes); + if (ret != 0) { + ldpp_dout(this, 1) << "failed to set attributes for topic '" << topic_name + << "', topic owned by other user" << dendl; + return ret; + } + + return map_attributes(result); + } + + void pre_exec() override { rgw_bucket_object_pre_exec(s); } + void execute(optional_yield) override; + + const char* name() const override { return "pubsub_topic_set"; } + RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_SET; } + uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; } + + void send_response() override { + if (op_ret) { + set_req_state_err(s, op_ret); + } + dump_errno(s); + end_header(s, this, "application/xml"); + + if (op_ret < 0) { + return; + } + + const auto f = s->formatter; + f->open_object_section_in_ns("SetTopicAttributesResponse", AWS_SNS_NS); + f->open_object_section("ResponseMetadata"); + encode_xml("RequestId", s->req_id, f); + f->close_section(); // ResponseMetadata + f->close_section(); // SetTopicAttributesResponse + rgw_flush_formatter_and_reset(s, f); + } +}; + +void RGWPSSetTopicAttributesOp::execute(optional_yield y) { + if (!dest.push_endpoint.empty() && dest.persistent) { + op_ret = rgw::notify::add_persistent_topic(topic_name, s->yield); + if (op_ret < 0) { + ldpp_dout(this, 1) + << "SetTopicAttributes Action failed to create queue for " + "persistent topics. error:" + << op_ret << dendl; + return; + } + } else { // changing the persistent topic to non-persistent. + op_ret = rgw::notify::remove_persistent_topic(topic_name, s->yield); + if (op_ret != -ENOENT && op_ret < 0) { + ldpp_dout(this, 1) << "SetTopicAttributes Action failed to remove queue " + "for persistent topics. error:" + << op_ret << dendl; + return; + } + } + const RGWPubSub ps(driver, s->owner.get_id().tenant); + op_ret = ps.create_topic(this, topic_name, dest, topic_arn, opaque_data, + topic_owner, policy_text, y); + if (op_ret < 0) { + ldpp_dout(this, 1) << "failed to SetAttributes for topic '" << topic_name + << "', ret=" << op_ret << dendl; + return; + } + ldpp_dout(this, 20) << "successfully set the attributes for topic '" + << topic_name << "'" << dendl; +} + // command (AWS compliant): // POST // Action=DeleteTopic&TopicArn= @@ -407,19 +700,6 @@ class RGWPSDeleteTopicOp : public RGWOp { } topic_name = topic_arn->resource; - - // upon deletion it is not known if topic is persistent or not - // will try to delete the persistent topic anyway - const auto ret = rgw::notify::remove_persistent_topic(topic_name, s->yield); - if (ret == -ENOENT) { - // topic was not persistent, or already deleted - return 0; - } - if (ret < 0) { - ldpp_dout(this, 1) << "DeleteTopic Action failed to remove queue for persistent topics. error:" << ret << dendl; - return ret; - } - return 0; } @@ -463,6 +743,35 @@ void RGWPSDeleteTopicOp::execute(optional_yield y) { return; } const RGWPubSub ps(driver, s->owner.get_id().tenant); + rgw_pubsub_topic result; + op_ret = ps.get_topic(this, topic_name, result, y); + if (op_ret == 0) { + op_ret = verify_topic_owner_or_policy( + s, result, driver->get_zone()->get_zonegroup().get_name(), + rgw::IAM::snsDeleteTopic); + if (op_ret != 0) { + ldpp_dout(this, 1) << "failed to remove topic '" << topic_name + << "' topic owned by other user" << dendl; + return; + } + } else { + ldpp_dout(this, 1) << "failed to fetch topic '" << topic_name + << "' with error: " << op_ret << dendl; + if (op_ret == -ENOENT) { + // its not an error if no topics exist, just a no-op + op_ret = 0; + } + return; + } + // upon deletion it is not known if topic is persistent or not + // will try to delete the persistent topic anyway + op_ret = rgw::notify::remove_persistent_topic(topic_name, s->yield); + if (op_ret != -ENOENT && op_ret < 0) { + ldpp_dout(this, 1) << "DeleteTopic Action failed to remove queue for " + "persistent topics. error:" + << op_ret << dendl; + return; + } op_ret = ps.remove_topic(this, topic_name, y); if (op_ret < 0) { ldpp_dout(this, 1) << "failed to remove topic '" << topic_name << ", ret=" << op_ret << dendl; @@ -473,12 +782,14 @@ void RGWPSDeleteTopicOp::execute(optional_yield y) { using op_generator = RGWOp*(*)(); static const std::unordered_map op_generators = { - {"CreateTopic", []() -> RGWOp* {return new RGWPSCreateTopicOp;}}, - {"DeleteTopic", []() -> RGWOp* {return new RGWPSDeleteTopicOp;}}, - {"ListTopics", []() -> RGWOp* {return new RGWPSListTopicsOp;}}, - {"GetTopic", []() -> RGWOp* {return new RGWPSGetTopicOp;}}, - {"GetTopicAttributes", []() -> RGWOp* {return new RGWPSGetTopicAttributesOp;}} -}; + {"CreateTopic", []() -> RGWOp* { return new RGWPSCreateTopicOp; }}, + {"DeleteTopic", []() -> RGWOp* { return new RGWPSDeleteTopicOp; }}, + {"ListTopics", []() -> RGWOp* { return new RGWPSListTopicsOp; }}, + {"GetTopic", []() -> RGWOp* { return new RGWPSGetTopicOp; }}, + {"GetTopicAttributes", + []() -> RGWOp* { return new RGWPSGetTopicAttributesOp; }}, + {"SetTopicAttributes", + []() -> RGWOp* { return new RGWPSSetTopicAttributesOp; }}}; bool RGWHandler_REST_PSTopic_AWS::action_exists(const req_state* s) { @@ -710,6 +1021,15 @@ void RGWPSCreateNotifOp::execute(optional_yield y) { ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl; return; } + op_ret = verify_topic_owner_or_policy( + s, topic_info, driver->get_zone()->get_zonegroup().get_name(), + rgw::IAM::snsPublish); + if (op_ret != 0) { + ldpp_dout(this, 1) << "failed to create notification for topic '" + << topic_name << "' topic owned by other user" + << dendl; + return; + } // make sure that full topic configuration match // TODO: use ARN match function @@ -722,7 +1042,7 @@ void RGWPSCreateNotifOp::execute(optional_yield y) { // ARN is cached to make the "GET" method faster op_ret = ps.create_topic(this, unique_topic_name, topic_info.dest, topic_info.arn, topic_info.opaque_data, - s->owner.get_id(), y); + s->owner.get_id(), topic_info.policy_text, y); if (op_ret < 0) { ldpp_dout(this, 1) << "failed to auto-generate unique topic '" << unique_topic_name << "', ret=" << op_ret << dendl; diff --git a/src/test/rgw/bucket_notification/api.py b/src/test/rgw/bucket_notification/api.py index fe38576fb35..30fce41a1bb 100644 --- a/src/test/rgw/bucket_notification/api.py +++ b/src/test/rgw/bucket_notification/api.py @@ -78,7 +78,7 @@ class PSTopicS3: POST ?Action=GetTopic&TopicArn= POST ?Action=DeleteTopic&TopicArn= """ - def __init__(self, conn, topic_name, region, endpoint_args=None, opaque_data=None): + def __init__(self, conn, topic_name, region, endpoint_args=None, opaque_data=None, policy_text=None): self.conn = conn self.topic_name = topic_name.strip() assert self.topic_name @@ -88,6 +88,8 @@ class PSTopicS3: self.attributes = {nvp[0] : nvp[1] for nvp in urlparse.parse_qsl(endpoint_args, keep_blank_values=True)} if opaque_data is not None: self.attributes['OpaqueData'] = opaque_data + if policy_text is not None: + self.attributes['Policy'] = policy_text protocol = 'https' if conn.is_secure else 'http' self.client = boto3.client('sns', endpoint_url=protocol+'://'+conn.host+':'+str(conn.port), @@ -96,9 +98,9 @@ class PSTopicS3: region_name=region, verify='./cert.pem') - def get_config(self): + def get_config(self, topic_arn=None): """get topic info""" - parameters = {'Action': 'GetTopic', 'TopicArn': self.topic_arn} + parameters = {'Action': 'GetTopic', 'TopicArn': (topic_arn if topic_arn is not None else self.topic_arn)} body = urlparse.urlencode(parameters) string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime()) content_type = 'application/x-www-form-urlencoded; charset=utf-8' @@ -131,6 +133,13 @@ class PSTopicS3: result = self.client.create_topic(Name=self.topic_name, Attributes=self.attributes) self.topic_arn = result['TopicArn'] return self.topic_arn + + def set_attributes(self, attribute_name, attribute_val, topic_arn=None): + """set topic attributes.""" + result = self.client.set_topic_attributes(TopicArn=( + topic_arn if topic_arn is not None else self.topic_arn), AttributeName=attribute_name, AttributeValue=attribute_val) + return result['ResponseMetadata']['HTTPStatusCode'] + def del_config(self, topic_arn=None): """delete topic""" diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index ffb8e488bbf..f5017885493 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -4283,6 +4283,76 @@ def test_ps_s3_multiple_topics_notification(): conn.delete_bucket(bucket_name) http_server.close() +@attr('basic_test') +def test_ps_s3_topic_permissions(): + """ test s3 topic set/get/delete permissions """ + conn1 = connection() + conn2 = another_user() + zonegroup = 'default' + bucket_name = gen_bucket_name() + topic_name = bucket_name + TOPIC_SUFFIX + topic_policy = json.dumps({ + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "Statement", + "Effect": "Deny", + "Principal": "*", + "Action": ["sns:Publish", "sns:SetTopicAttributes", "sns:GetTopicAttributes"], + "Resource": f"arn:aws:sns:{zonegroup}::{topic_name}" + } + ] + }) + # create s3 topic with DENY policy + endpoint_address = 'amqp://127.0.0.1:7001' + endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none' + topic_conf = PSTopicS3(conn1, topic_name, zonegroup, endpoint_args=endpoint_args, policy_text=topic_policy) + topic_arn = topic_conf.set_config() + + # 2nd user tries to fetch the topic + topic_conf2 = PSTopicS3(conn2, topic_name, zonegroup, endpoint_args=endpoint_args) + _, status = topic_conf2.get_config(topic_arn=topic_arn) + assert_equal(status, 403) + try: + # 2nd user tries to set the attribute + status = topic_conf2.set_attributes(attribute_name="persistent", attribute_val="false", topic_arn=topic_arn) + assert False, "'AccessDenied' error is expected" + except Exception as err: + print(err) + + # create bucket for conn2 and try publishing notification to topic + _ = conn2.create_bucket(bucket_name) + notification_name = bucket_name + NOTIFICATION_SUFFIX + topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, + 'Events': [] + }] + try: + s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list) + _, status = s3_notification_conf2.set_config() + assert False, "'AccessDenied' error is expected" + except ClientError as error: + assert_equal(error.response['Error']['Code'], 'AccessDenied') + + # Topic policy is now added by the 1st user to allow 2nd user. + topic_policy = topic_policy.replace("Deny", "Allow") + topic_conf = PSTopicS3(conn1, topic_name, zonegroup, endpoint_args=endpoint_args, policy_text=topic_policy) + topic_arn = topic_conf.set_config() + # 2nd user try to fetch topic again + _, status = topic_conf2.get_config(topic_arn=topic_arn) + assert_equal(status, 200) + # 2nd user tries to set the attribute again + status = topic_conf2.set_attributes(attribute_name="persistent", attribute_val="false", topic_arn=topic_arn) + assert_equal(status, 200) + # 2nd user tries to publish notification again + s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list) + _, status = s3_notification_conf2.set_config() + assert_equal(status, 200) + + # cleanup + s3_notification_conf2.del_config() + topic_conf.del_config() + # delete the bucket + conn2.delete_bucket(bucket_name) def kafka_security(security_type, mechanism='PLAIN'): """ test pushing kafka s3 notification securly to master """ diff --git a/src/test/rgw/test_rgw_iam_policy.cc b/src/test/rgw/test_rgw_iam_policy.cc index f4c3c6aff6f..c630646c912 100644 --- a/src/test/rgw/test_rgw_iam_policy.cc +++ b/src/test/rgw/test_rgw_iam_policy.cc @@ -94,6 +94,7 @@ using rgw::IAM::iamCreateRole; using rgw::IAM::iamDeleteRole; using rgw::IAM::iamAll; using rgw::IAM::stsAll; +using rgw::IAM::snsAll; using rgw::IAM::allCount; class FakeIdentity : public Identity { @@ -642,7 +643,7 @@ TEST_F(PolicyTest, Parse6) { EXPECT_TRUE(p->statements[0].noprinc.empty()); EXPECT_EQ(p->statements[0].effect, Effect::Allow); Action_t act; - for (auto i = 0U; i <= stsAll; i++) + for (auto i = 0U; i <= snsAll; i++) act[i] = 1; EXPECT_EQ(p->statements[0].action, act); EXPECT_EQ(p->statements[0].notaction, None); -- 2.39.5