trim the log every second (`mds_log_trim_upkeep_interval` config). Also,
a couple of configs govern how much time the MDS spends in trimming its
logs. These configs are `mds_log_trim_threshold` and `mds_log_trim_decay_rate`.
+* RGW: Notification topics are now owned by the user that created them.
+ By default, only the owner can read/write their topics. Topic policy documents
+ are now supported to grant these permissions to other users. Preexisting topics
+ are treated as if they have no owner, and any user can read/write them using the SNS API.
+ If such a topic is recreated with CreateTopic, the issuing user becomes the new owner.
+ For backward compatibility, all users still have permission to publish bucket
+ notifications to topics owned by other users. A new configuration parameter:
+ ``rgw_topic_require_publish_policy`` can be enabled to deny ``sns:Publish``
+ permissions unless explicitly granted by topic policy.
>=18.0.0
services:
- rgw
with_legacy: true
-- name: mandatory_topic_permissions
+- name: rgw_topic_require_publish_policy
type: bool
level: basic
- desc: Whether to validate user permissions to access notification topics.
+ desc: Whether to validate user permissions to publish notifications to topics.
long_desc: If true, all users (other then the owner of the topic) will need
- to have a policy to access topics.
+ to have a policy to publish notifications to topics.
The topic policy can be set by owner via CreateTopic() or SetTopicAttribute().
Following permissions can be granted "sns:Publish", "sns:GetTopicAttributes",
- "sns:SetTopicAttributes" and "sns:DeleteTopic" via Policy.
+ "sns:SetTopicAttributes", "sns:DeleteTopic" and "sns:CreateTopic" via Policy.
NOTE that even if set to "false" topics will still follow the policies if set on them.
default: false
services:
{ "sns:DeleteTopic", snsDeleteTopic},
{ "sns:Publish", snsPublish},
{ "sns:SetTopicAttributes", snsSetTopicAttributes},
+ { "sns:CreateTopic", snsCreateTopic},
};
struct PolicyParser;
case snsPublish:
return "sns:Publish";
+
+ case snsCreateTopic:
+ return "sns:CreateTopic";
}
return "s3Invalid";
}
static constexpr std::uint64_t snsDeleteTopic = stsAll + 2;
static constexpr std::uint64_t snsPublish = stsAll + 3;
static constexpr std::uint64_t snsSetTopicAttributes = stsAll + 4;
-static constexpr std::uint64_t snsAll = stsAll + 5;
+static constexpr std::uint64_t snsCreateTopic = stsAll + 5;
+static constexpr std::uint64_t snsAll = stsAll + 6;
static constexpr std::uint64_t s3Count = s3All;
static constexpr std::uint64_t allCount = snsAll + 1;
#include <algorithm>
#include <boost/tokenizer.hpp>
#include <optional>
+#include "rgw_iam_policy.h"
#include "rgw_rest_pubsub.h"
#include "rgw_pubsub_push.h"
#include "rgw_pubsub.h"
s->cct, s->owner.id.tenant, bl,
s->cct->_conf.get_val<bool>("rgw_policy_reject_invalid_principals"));
} catch (rgw::IAM::PolicyParseException& e) {
- ldout(s->cct, 1) << "failed to parse policy:' " << policy_text
- << " ' with error: " << e.what() << dendl;
+ ldout(s->cct, 1) << "failed to parse policy: '" << policy_text
+ << "' with error: " << e.what() << dendl;
s->err.message = e.what();
return std::nullopt;
}
}
// no policy set.
if (topic.policy_text.empty()) {
- // if mandatory_topic_permissions is true, then validate all users for
- // permission.
- if (s->cct->_conf->mandatory_topic_permissions) {
- return -EACCES;
- } else {
+ // if rgw_topic_require_publish_policy is "false" dont validate "publish" policies
+ if (op == rgw::IAM::snsPublish && !s->cct->_conf->rgw_topic_require_publish_policy) {
+ return 0;
+ }
+ if (topic.user.empty()) {
+ // if we don't know the original user and there is no policy
+ // we will not reject the request.
+ // this is for compatibility with versions that did not store the user in the topic
return 0;
}
+ s->err.message = "Topic was created by another user.";
+ return -EACCES;
}
// bufferlist::static_from_string wants non const string
std::string policy_text(topic.policy_text);
s->user->get_tenant(), topic.name);
if (!p || p->eval(s->env, *s->auth.identity, op, arn, princ_type) !=
rgw::IAM::Effect::Allow) {
- ldout(s->cct, 1) << "topic_policy failed validation, topic_policy: " << p
+ ldout(s->cct, 1) << "topic policy failed validation, topic policy: " << p
<< dendl;
return -EACCES;
}
return 0;
}
if (ret == 0) {
- if (result.user == s->owner.id ||
- !s->cct->_conf->mandatory_topic_permissions) {
+ ret = verify_topic_owner_or_policy(
+ s, result, driver->get_zone()->get_zonegroup().get_name(),
+ rgw::IAM::snsCreateTopic);
+ if (ret == 0)
+ {
return 0;
}
- ldpp_dout(this, 1) << "failed to create topic '" << topic_name
+
+ ldpp_dout(this, 1) << "no permission to modify topic '" << topic_name
<< "', topic already exist." << dendl;
- return -EPERM;
+ return -EACCES;
}
ldpp_dout(this, 1) << "failed to read topic '" << topic_name
<< "', with error:" << ret << dendl;
s, result, driver->get_zone()->get_zonegroup().get_name(),
rgw::IAM::snsGetTopicAttributes);
if (op_ret != 0) {
- ldpp_dout(this, 1) << "failed to get topic '" << topic_name
- << "', topic owned by other user" << dendl;
+ ldpp_dout(this, 1) << "no permission to get topic '" << topic_name
+ << "'" << dendl;
return;
}
ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl;
s, result, driver->get_zone()->get_zonegroup().get_name(),
rgw::IAM::snsGetTopicAttributes);
if (op_ret != 0) {
- ldpp_dout(this, 1) << "failed to get topic '" << topic_name
- << "', topic owned by other user" << dendl;
+ ldpp_dout(this, 1) << "no permission to get topic '" << topic_name
+ << "'" << dendl;
return;
}
ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl;
s, result, driver->get_zone()->get_zonegroup().get_name(),
rgw::IAM::snsSetTopicAttributes);
if (ret != 0) {
- ldpp_dout(this, 1) << "failed to set attributes for topic '" << topic_name
- << "', topic owned by other user" << dendl;
+ ldpp_dout(this, 1) << "no permission to set attributes for topic '" << topic_name
+ << "'" << dendl;
return ret;
}
s, result, driver->get_zone()->get_zonegroup().get_name(),
rgw::IAM::snsDeleteTopic);
if (op_ret != 0) {
- ldpp_dout(this, 1) << "failed to remove topic '" << topic_name
- << "' topic owned by other user" << dendl;
+ ldpp_dout(this, 1) << "no permission to remove topic '" << topic_name
+ << "'" << dendl;
return;
}
} else {
s, topic_info, driver->get_zone()->get_zonegroup().get_name(),
rgw::IAM::snsPublish);
if (op_ret != 0) {
- ldpp_dout(this, 1) << "failed to create notification for topic '"
- << topic_name << "' topic owned by other user"
- << dendl;
+ ldpp_dout(this, 1) << "no permission to create notification for topic '"
+ << topic_name << "'" << dendl;
return;
}
// make sure that full topic configuration match
conn.delete_bucket(bucket_name)
http_server.close()
+
@attr('basic_test')
def test_ps_s3_topic_permissions():
""" test s3 topic set/get/delete permissions """
"Sid": "Statement",
"Effect": "Deny",
"Principal": "*",
- "Action": ["sns:Publish", "sns:SetTopicAttributes", "sns:GetTopicAttributes"],
+ "Action": ["sns:Publish", "sns:SetTopicAttributes", "sns:GetTopicAttributes", "sns:DeleteTopic", "sns:CreateTopic"],
"Resource": f"arn:aws:sns:{zonegroup}::{topic_name}"
}
]
topic_conf = PSTopicS3(conn1, topic_name, zonegroup, endpoint_args=endpoint_args, policy_text=topic_policy)
topic_arn = topic_conf.set_config()
- # 2nd user tries to fetch the topic
topic_conf2 = PSTopicS3(conn2, topic_name, zonegroup, endpoint_args=endpoint_args)
+ try:
+ # 2nd user tries to override the topic
+ topic_arn = topic_conf2.set_config()
+ assert False, "'AccessDenied' error is expected"
+ except ClientError as err:
+ if 'Error' in err.response:
+ assert_equal(err.response['Error']['Code'], 'AccessDenied')
+ else:
+ assert_equal(err.response['Code'], 'AccessDenied')
+ except Exception as err:
+ print('unexpected error type: '+type(err).__name__)
+
+ # 2nd user tries to fetch the topic
_, status = topic_conf2.get_config(topic_arn=topic_arn)
assert_equal(status, 403)
+
try:
# 2nd user tries to set the attribute
status = topic_conf2.set_attributes(attribute_name="persistent", attribute_val="false", topic_arn=topic_arn)
except Exception as err:
print('unexpected error type: '+type(err).__name__)
+ try:
+ # 2nd user tries to delete the topic
+ status = topic_conf2.del_config(topic_arn=topic_arn)
+ assert False, "'AccessDenied' error is expected"
+ except ClientError as err:
+ if 'Error' in err.response:
+ assert_equal(err.response['Error']['Code'], 'AccessDenied')
+ else:
+ assert_equal(err.response['Code'], 'AccessDenied')
+ except Exception as err:
+ print('unexpected error type: '+type(err).__name__)
+
# Topic policy is now added by the 1st user to allow 2nd user.
topic_policy = topic_policy.replace("Deny", "Allow")
topic_conf = PSTopicS3(conn1, topic_name, zonegroup, endpoint_args=endpoint_args, policy_text=topic_policy)
s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list)
_, status = s3_notification_conf2.set_config()
assert_equal(status, 200)
+ # 2nd user tries to delete the topic again
+ status = topic_conf2.del_config(topic_arn=topic_arn)
+ assert_equal(status, 200)
+
+ # cleanup
+ s3_notification_conf2.del_config()
+ # delete the bucket
+ conn2.delete_bucket(bucket_name)
+
+
+@attr('basic_test')
+def test_ps_s3_topic_no_permissions():
+ """ test s3 topic set/get/delete permissions """
+ conn1 = connection()
+ conn2 = another_user()
+ zonegroup = 'default'
+ bucket_name = gen_bucket_name()
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # create s3 topic without policy
+ endpoint_address = 'amqp://127.0.0.1:7001'
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
+ topic_conf = PSTopicS3(conn1, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+
+ topic_conf2 = PSTopicS3(conn2, topic_name, zonegroup, endpoint_args=endpoint_args)
+ try:
+ # 2nd user tries to override the topic
+ topic_arn = topic_conf2.set_config()
+ assert False, "'AccessDenied' error is expected"
+ except ClientError as err:
+ if 'Error' in err.response:
+ assert_equal(err.response['Error']['Code'], 'AccessDenied')
+ else:
+ assert_equal(err.response['Code'], 'AccessDenied')
+ except Exception as err:
+ print('unexpected error type: '+type(err).__name__)
+
+ # 2nd user tries to fetch the topic
+ _, status = topic_conf2.get_config(topic_arn=topic_arn)
+ assert_equal(status, 403)
+
+ try:
+ # 2nd user tries to set the attribute
+ status = topic_conf2.set_attributes(attribute_name="persistent", attribute_val="false", topic_arn=topic_arn)
+ assert False, "'AccessDenied' error is expected"
+ except ClientError as err:
+ if 'Error' in err.response:
+ assert_equal(err.response['Error']['Code'], 'AccessDenied')
+ else:
+ assert_equal(err.response['Code'], 'AccessDenied')
+ except Exception as err:
+ print('unexpected error type: '+type(err).__name__)
+
+ # create bucket for conn2 publish notification to topic
+ # should be allowed based on the default value of rgw_topic_require_publish_policy=false
+ _ = conn2.create_bucket(bucket_name)
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+ s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list)
+ _, status = s3_notification_conf2.set_config()
+ assert_equal(status, 200)
+
+ try:
+ # 2nd user tries to delete the topic
+ status = topic_conf2.del_config(topic_arn=topic_arn)
+ assert False, "'AccessDenied' error is expected"
+ except ClientError as err:
+ if 'Error' in err.response:
+ assert_equal(err.response['Error']['Code'], 'AccessDenied')
+ else:
+ assert_equal(err.response['Code'], 'AccessDenied')
+ except Exception as err:
+ print('unexpected error type: '+type(err).__name__)
# cleanup
s3_notification_conf2.del_config()
# delete the bucket
conn2.delete_bucket(bucket_name)
+
def kafka_security(security_type, mechanism='PLAIN'):
""" test pushing kafka s3 notification securly to master """
conn = connection()