]> git.apps.os.sepia.ceph.com Git - s3-tests.git/commitdiff
BucketPolicy: add tests for ConfirmRemoveSelfBucketAccess header 567/head
authorSeena Fallah <seenafallah@gmail.com>
Wed, 22 May 2024 15:57:25 +0000 (17:57 +0200)
committerSeena Fallah <seenafallah@gmail.com>
Wed, 19 Mar 2025 09:29:21 +0000 (10:29 +0100)
Refrence: https://github.com/ceph/ceph/pull/57629
Signed-off-by: Seena Fallah <seenafallah@gmail.com>
s3tests_boto3/functional/__init__.py
s3tests_boto3/functional/test_s3.py

index 5fd58913f47e6fb642518a16611a66c998465d7c..08f884bee112dd75812da656dfb4ec6024673bd0 100644 (file)
@@ -804,3 +804,24 @@ def get_restore_debug_interval():
 
 def get_read_through_days():
     return config.read_through_restore_days
+
+def create_iam_user_s3client(client):
+    prefix = get_iam_path_prefix()
+
+    # generate random name
+    randname = ''.join(
+        random.choice(string.ascii_lowercase + string.digits)
+        for c in range(8)
+    )
+    name = make_iam_name(randname)
+
+    user = client.create_user(UserName=name, Path=prefix)
+
+    # create s3 access and secret keys
+    keys = client.create_access_key(UserName=user['User']['UserName'])
+
+    # create s3 client
+    return get_iam_s3client(
+        aws_access_key_id=keys['AccessKey']['AccessKeyId'],
+        aws_secret_access_key=keys['AccessKey']['SecretAccessKey'],
+    )
index 2e86d44310c3ee84b1e8963d2816fe5dca46921f..b6e3c5c63df4e7bd0731ffb1cf0ae06d95d46281 100644 (file)
@@ -28,11 +28,8 @@ import dateutil.parser
 import ssl
 from collections import namedtuple
 from collections import defaultdict
-from io import StringIO
 from io import BytesIO
 
-from email.header import decode_header
-
 from .utils import assert_raises
 from .utils import generate_random
 from .utils import _get_status_and_error_code
@@ -40,6 +37,8 @@ from .utils import _get_status
 
 from .policy import Policy, Statement, make_json_policy
 
+from .iam import iam_root
+
 from . import (
     configfile,
     setup_teardown,
@@ -68,6 +67,7 @@ from . import (
     get_alt_user_id,
     get_alt_email,
     get_alt_client,
+    get_iam_root_client,
     get_tenant_client,
     get_v2_tenant_client,
     get_tenant_iam_client,
@@ -90,6 +90,7 @@ from . import (
     get_lc_debug_interval,
     get_restore_debug_interval,
     get_read_through_days,
+    create_iam_user_s3client,
     )
 
 
@@ -10848,6 +10849,87 @@ def test_bucketv2_policy():
     response = alt_client.list_objects_v2(Bucket=bucket_name)
     assert len(response['Contents']) == 1
 
+@pytest.mark.bucket_policy
+@pytest.mark.iam_account
+@pytest.mark.iam_user
+def test_bucket_policy_deny_self_denied_policy(iam_root):
+    root_client = get_iam_root_client(service_name="s3")
+    bucket_name = get_new_bucket(root_client)
+
+    resource1 = "arn:aws:s3:::" + bucket_name
+    resource2 = "arn:aws:s3:::" + bucket_name + "/*"
+    policy_document = json.dumps(
+    {
+        "Version": "2012-10-17",
+        "Statement": [{
+        "Effect": "Deny",
+        "Principal": "*",
+        "Action": [
+            "s3:PutBucketPolicy",
+            "s3:GetBucketPolicy",
+            "s3:DeleteBucketPolicy",
+        ],
+        "Resource": [
+            "{}".format(resource1),
+            "{}".format(resource2)
+          ]
+        }]
+     })
+
+    root_client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
+
+    # non-root account should not be able to get, put or delete bucket policy
+    root_alt_client = create_iam_user_s3client(iam_root)
+    check_access_denied(root_alt_client.get_bucket_policy, Bucket=bucket_name)
+    check_access_denied(root_alt_client.delete_bucket_policy, Bucket=bucket_name)
+    check_access_denied(root_alt_client.put_bucket_policy, Bucket=bucket_name, Policy=policy_document)
+
+    # root account should be able to get, put or delete bucket policy
+    response = root_client.get_bucket_policy(Bucket=bucket_name)
+    assert response['Policy'] == policy_document
+    root_client.delete_bucket_policy(Bucket=bucket_name)
+    root_client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
+
+@pytest.mark.bucket_policy
+@pytest.mark.iam_account
+@pytest.mark.iam_user
+def test_bucket_policy_deny_self_denied_policy_confirm_header(iam_root):
+    root_client = get_iam_root_client(service_name="s3")
+    bucket_name = get_new_bucket(root_client)
+
+    resource1 = "arn:aws:s3:::" + bucket_name
+    resource2 = "arn:aws:s3:::" + bucket_name + "/*"
+    policy_document = json.dumps(
+    {
+        "Version": "2012-10-17",
+        "Statement": [{
+        "Effect": "Deny",
+        "Principal": "*",
+        "Action": [
+            "s3:PutBucketPolicy",
+            "s3:GetBucketPolicy",
+            "s3:DeleteBucketPolicy",
+        ],
+        "Resource": [
+            "{}".format(resource1),
+            "{}".format(resource2)
+          ]
+        }]
+     })
+
+    root_client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document, ConfirmRemoveSelfBucketAccess=True)
+
+    # non-root account should not be able to get, put or delete bucket policy
+    root_alt_client = create_iam_user_s3client(iam_root)
+    check_access_denied(root_alt_client.get_bucket_policy, Bucket=bucket_name)
+    check_access_denied(root_alt_client.delete_bucket_policy, Bucket=bucket_name)
+    check_access_denied(root_alt_client.put_bucket_policy, Bucket=bucket_name, Policy=policy_document)
+
+    # root account should not be able to get, put or delete bucket policy
+    check_access_denied(root_client.get_bucket_policy, Bucket=bucket_name)
+    check_access_denied(root_client.delete_bucket_policy, Bucket=bucket_name)
+    check_access_denied(root_client.put_bucket_policy, Bucket=bucket_name, Policy=policy_document)
+
 @pytest.mark.bucket_policy
 def test_bucket_policy_acl():
     bucket_name = get_new_bucket()