// a "notification" and a subscription will be auto-generated
// actual configuration is XML encoded in the body of the message
class RGWPSCreateNotifOp : public RGWDefaultResponseOp {
- private:
- std::string bucket_name;
- std::unique_ptr<rgw::sal::Bucket> bucket;
- rgw_pubsub_s3_notifications configurations;
-
- int get_params() {
+ int verify_params() override {
bool exists;
const auto no_value = s->info.args.get("notification", &exists);
if (!exists) {
ldpp_dout(this, 1) << "request must be on a bucket" << dendl;
return -EINVAL;
}
- bucket_name = s->bucket_name;
return 0;
}
- public:
- int verify_permission(optional_yield y) override;
-
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
-
- const char* name() const override { return "pubsub_notification_create_s3"; }
- RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; }
- uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
-
- int get_params_from_body() {
+ int get_params_from_body(rgw_pubsub_s3_notifications& configurations) {
const auto max_size = s->cct->_conf->rgw_max_put_param_size;
int r;
bufferlist data;
}
return 0;
}
+public:
+ int verify_permission(optional_yield y) override;
+
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+
+ const char* name() const override { return "pubsub_notification_create_s3"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+
void execute(optional_yield) override;
};
void RGWPSCreateNotifOp::execute(optional_yield y) {
- op_ret = get_params_from_body();
+ op_ret = verify_params();
if (op_ret < 0) {
return;
}
+ rgw_pubsub_s3_notifications configurations;
+ op_ret = get_params_from_body(configurations);
+ if (op_ret < 0) {
+ return;
+ }
+
+ std::unique_ptr<rgw::sal::User> user = driver->get_user(s->owner.get_id());
+ std::unique_ptr<rgw::sal::Bucket> bucket;
+ op_ret = driver->get_bucket(this, user.get(), s->owner.get_id().tenant, s->bucket_name, &bucket, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to get bucket '" << s->bucket_name << "' info, ret = " << op_ret << dendl;
+ return;
+ }
+
const RGWPubSub ps(driver, s->owner.get_id().tenant);
const RGWPubSub::Bucket b(ps, bucket.get());
rgw_pubsub_bucket_topics bucket_topics;
op_ret = b.get_topics(this, bucket_topics, y);
if (op_ret < 0) {
- ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_name << "', ret=" << op_ret << dendl;
+ ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << s->bucket_name << "', ret=" << op_ret << dendl;
return;
}
}
int RGWPSCreateNotifOp::verify_permission(optional_yield y) {
- int ret = get_params();
- if (ret < 0) {
- return ret;
- }
-
- std::unique_ptr<rgw::sal::User> user = driver->get_user(s->owner.get_id());
- ret = driver->get_bucket(this, user.get(), s->owner.get_id().tenant, bucket_name, &bucket, y);
- if (ret < 0) {
- ldpp_dout(this, 1) << "failed to get bucket info, cannot verify ownership" << dendl;
- return ret;
+ if (!verify_bucket_permission(this, s, rgw::IAM::s3PutBucketNotification)) {
+ return -EACCES;
}
- if (bucket->get_info().owner != s->owner.get_id()) {
- ldpp_dout(this, 1) << "user doesn't own bucket, not allowed to create notification" << dendl;
- return -EPERM;
- }
return 0;
}
// command (extension to S3): DELETE /bucket?notification[=<notification-id>]
class RGWPSDeleteNotifOp : public RGWDefaultResponseOp {
- private:
- std::string bucket_name;
- std::unique_ptr<rgw::sal::Bucket> bucket;
- std::string notif_name;
-
- public:
- int verify_permission(optional_yield y) override;
-
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
-
- const char* name() const override { return "pubsub_notification_delete_s3"; }
- RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_DELETE; }
- uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
-
- int get_params() {
+ int get_params(std::string& notif_name) const {
bool exists;
notif_name = s->info.args.get("notification", &exists);
if (!exists) {
ldpp_dout(this, 1) << "request must be on a bucket" << dendl;
return -EINVAL;
}
- bucket_name = s->bucket_name;
return 0;
}
+public:
+ int verify_permission(optional_yield y) override;
+
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+
+ const char* name() const override { return "pubsub_notification_delete_s3"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_DELETE; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
+
void execute(optional_yield y) override;
};
void RGWPSDeleteNotifOp::execute(optional_yield y) {
- op_ret = get_params();
+ std::string notif_name;
+ op_ret = get_params(notif_name);
if (op_ret < 0) {
return;
}
+ std::unique_ptr<rgw::sal::User> user = driver->get_user(s->owner.get_id());
+ std::unique_ptr<rgw::sal::Bucket> bucket;
+ op_ret = driver->get_bucket(this, user.get(), s->owner.get_id().tenant, s->bucket_name, &bucket, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to get bucket '" << s->bucket_name << "' info, ret = " << op_ret << dendl;
+ return;
+ }
+
const RGWPubSub ps(driver, s->owner.get_id().tenant);
const RGWPubSub::Bucket b(ps, bucket.get());
rgw_pubsub_bucket_topics bucket_topics;
op_ret = b.get_topics(this, bucket_topics, y);
if (op_ret < 0) {
- ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_name << "', ret=" << op_ret << dendl;
+ ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << s->bucket_name << "', ret=" << op_ret << dendl;
return;
}
}
int RGWPSDeleteNotifOp::verify_permission(optional_yield y) {
- int ret = get_params();
- if (ret < 0) {
- return ret;
+ if (!verify_bucket_permission(this, s, rgw::IAM::s3PutBucketNotification)) {
+ return -EACCES;
}
- std::unique_ptr<rgw::sal::User> user = driver->get_user(s->owner.get_id());
- ret = driver->get_bucket(this, user.get(), s->owner.get_id().tenant, bucket_name, &bucket, y);
- if (ret < 0) {
- return ret;
- }
-
- if (bucket->get_info().owner != s->owner.get_id()) {
- ldpp_dout(this, 1) << "user doesn't own bucket, cannot remove notification" << dendl;
- return -EPERM;
- }
return 0;
}
// command (S3 compliant): GET /bucket?notification[=<notification-id>]
class RGWPSListNotifsOp : public RGWOp {
-private:
- std::string bucket_name;
- std::unique_ptr<rgw::sal::Bucket> bucket;
- std::string notif_name;
rgw_pubsub_s3_notifications notifications;
- int get_params() {
+ int get_params(std::string& notif_name) const {
bool exists;
notif_name = s->info.args.get("notification", &exists);
if (!exists) {
ldpp_dout(this, 1) << "request must be on a bucket" << dendl;
return -EINVAL;
}
- bucket_name = s->bucket_name;
return 0;
}
- public:
+public:
int verify_permission(optional_yield y) override;
void pre_exec() override {
};
void RGWPSListNotifsOp::execute(optional_yield y) {
+ std::string notif_name;
+ op_ret = get_params(notif_name);
+ if (op_ret < 0) {
+ return;
+ }
+
+ std::unique_ptr<rgw::sal::User> user = driver->get_user(s->owner.get_id());
+ std::unique_ptr<rgw::sal::Bucket> bucket;
+ op_ret = driver->get_bucket(this, user.get(), s->owner.get_id().tenant, s->bucket_name, &bucket, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to get bucket '" << s->bucket_name << "' info, ret = " << op_ret << dendl;
+ return;
+ }
+
const RGWPubSub ps(driver, s->owner.get_id().tenant);
const RGWPubSub::Bucket b(ps, bucket.get());
rgw_pubsub_bucket_topics bucket_topics;
op_ret = b.get_topics(this, bucket_topics, y);
if (op_ret < 0) {
- ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_name << "', ret=" << op_ret << dendl;
+ ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << s->bucket_name << "', ret=" << op_ret << dendl;
return;
}
if (!notif_name.empty()) {
}
int RGWPSListNotifsOp::verify_permission(optional_yield y) {
- int ret = get_params();
- if (ret < 0) {
- return ret;
- }
-
- std::unique_ptr<rgw::sal::User> user = driver->get_user(s->owner.get_id());
- ret = driver->get_bucket(this, user.get(), s->owner.get_id().tenant, bucket_name, &bucket, y);
- if (ret < 0) {
- return ret;
- }
-
- if (bucket->get_info().owner != s->owner.get_id()) {
- ldpp_dout(this, 1) << "user doesn't own bucket, cannot get notification list" << dendl;
- return -EPERM;
+ if (!verify_bucket_permission(this, s, rgw::IAM::s3GetBucketNotification)) {
+ return -EACCES;
}
return 0;
import os
import string
import boto
+from botocore.exceptions import ClientError
from http import server as http_server
from random import randint
import hashlib
return conn
+def another_user(tenant=None):
+ access_key = str(time.time())
+ secret_key = str(time.time())
+ uid = 'superman' + str(time.time())
+ if tenant:
+ _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'])
+ else:
+ _, result = admin(['user', 'create', '--uid', uid, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'])
+
+ assert_equal(result, 0)
+ conn = S3Connection(aws_access_key_id=access_key,
+ aws_secret_access_key=secret_key,
+ is_secure=False, port=get_config_port(), host=get_config_host(),
+ calling_format='boto.s3.connection.OrdinaryCallingFormat')
+ return conn
+
##############
# bucket notifications tests
##############
@attr('basic_test')
def test_ps_s3_topic_on_master():
""" test s3 topics set/get/delete on master """
-
- access_key = str(time.time())
- secret_key = str(time.time())
- uid = 'superman' + str(time.time())
tenant = 'kaboom'
- _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'])
- assert_equal(result, 0)
- conn = S3Connection(aws_access_key_id=access_key,
- aws_secret_access_key=secret_key,
- is_secure=False, port=get_config_port(), host=get_config_host(),
- calling_format='boto.s3.connection.OrdinaryCallingFormat')
+ conn = another_user(tenant)
zonegroup = 'default'
bucket_name = gen_bucket_name()
topic_name = bucket_name + TOPIC_SUFFIX
@attr('basic_test')
def test_ps_s3_topic_admin_on_master():
""" test s3 topics set/get/delete on master """
-
- access_key = str(time.time())
- secret_key = str(time.time())
- uid = 'superman' + str(time.time())
tenant = 'kaboom'
- _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'])
- assert_equal(result, 0)
- conn = S3Connection(aws_access_key_id=access_key,
- aws_secret_access_key=secret_key,
- is_secure=False, port=get_config_port(), host=get_config_host(),
- calling_format='boto.s3.connection.OrdinaryCallingFormat')
+ conn = another_user(tenant)
zonegroup = 'default'
bucket_name = gen_bucket_name()
topic_name = bucket_name + TOPIC_SUFFIX
# delete the bucket
conn.delete_bucket(bucket_name)
+@attr('basic_test')
+def test_ps_s3_notification_permissions():
+ """ test s3 notification set/get/delete permissions """
+ conn1 = connection()
+ conn2 = another_user()
+ zonegroup = 'default'
+ bucket_name = gen_bucket_name()
+ # create bucket
+ bucket = conn1.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+ # create s3 topic
+ endpoint_address = 'amqp://127.0.0.1:7001'
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
+ topic_conf = PSTopicS3(conn1, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+
+ # one user create a notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+ s3_notification_conf1 = PSNotificationS3(conn1, bucket_name, topic_conf_list)
+ _, status = s3_notification_conf1.set_config()
+ assert_equal(status, 200)
+ # another user try to fetch it
+ s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list)
+ try:
+ _, _ = s3_notification_conf2.get_config()
+ assert False, "'AccessDenied' error is expected"
+ except ClientError as error:
+ assert_equal(error.response['Error']['Code'], 'AccessDenied')
+ # other user try to delete the notification
+ _, status = s3_notification_conf2.del_config()
+ assert_equal(status, 403)
+
+ # bucket policy is added by the 1st user
+ client = boto3.client('s3',
+ endpoint_url='http://'+conn1.host+':'+str(conn1.port),
+ aws_access_key_id=conn1.aws_access_key_id,
+ aws_secret_access_key=conn1.aws_secret_access_key)
+ bucket_policy = json.dumps({
+ "Version": "2012-10-17",
+ "Statement": [
+ {
+ "Sid": "Statement",
+ "Effect": "Allow",
+ "Principal": "*",
+ "Action": ["s3:GetBucketNotification", "s3:PutBucketNotification"],
+ "Resource": f"arn:aws:s3:::{bucket_name}"
+ }
+ ]
+ })
+ response = client.put_bucket_policy(Bucket=bucket_name, Policy=bucket_policy)
+ assert_equal(int(response['ResponseMetadata']['HTTPStatusCode']/100), 2)
+ result = client.get_bucket_policy(Bucket=bucket_name)
+ print(result['Policy'])
+
+ # 2nd user try to fetch it again
+ _, status = s3_notification_conf2.get_config()
+ assert_equal(status, 200)
+
+ # 2nd user try to delete it again
+ result, status = s3_notification_conf2.del_config()
+ assert_equal(status, 200)
+
+ # 2nd user try to add another notification
+ topic_conf_list = [{'Id': notification_name+"2",
+ 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+ s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list)
+ result, status = s3_notification_conf2.set_config()
+ assert_equal(status, 200)
+
+ # cleanup
+ s3_notification_conf1.del_config()
+ s3_notification_conf2.del_config()
+ topic_conf.del_config()
+ # delete the bucket
+ conn1.delete_bucket(bucket_name)
@attr('amqp_test')
def test_ps_s3_notification_push_amqp_on_master():