]> git-server-git.apps.pok.os.sepia.ceph.com Git - s3-tests.git/commitdiff
test: verify Requester field is the assumed-role ARN 749/head
authorShreeJejurikar <shreemj8@gmail.com>
Wed, 13 May 2026 15:53:49 +0000 (21:23 +0530)
committerShreeJejurikar <shreemj8@gmail.com>
Thu, 14 May 2026 09:23:09 +0000 (14:53 +0530)
Add a bucket logging test that asserts the standard access log Requester
field is set to the assumed-role ARN (per AWS S3 spec) when a request is
made with STS temporary credentials.

The test creates an IAM role assumable by the alt user, assumes it via
STS, performs an S3 PutObject with the temporary credentials, flushes the
bucket log, and verifies the log record's Requester field matches
arn:aws:sts:::assumed-role/<role>/<session>.

Tracker: https://tracker.ceph.com/issues/71742
Signed-off-by: Shree Jejurikar <shree.jejurikar@gmail.com>
s3tests/functional/test_s3.py

index c34c1e46946105c721784c1172e35e70aa32b9cf..5fe104a3856e01d2992af1e561a69f5f84c07422 100644 (file)
@@ -38,7 +38,7 @@ from .utils import _get_status
 
 from .policy import Policy, Statement, make_json_policy
 
-from .iam import iam_root
+from .iam import iam_root, nuke_role
 
 from . import (
     configfile,
@@ -97,6 +97,9 @@ from . import (
     get_restore_processor_period,
     get_read_through_days,
     create_iam_user_s3client,
+    get_iam_client,
+    get_sts_client,
+    get_parameter_name,
     )
 
 
@@ -15816,6 +15819,91 @@ def test_bucket_logging_request_id():
         assert _verify_record_field(body, src_bucket_name, 'REST.PUT.OBJECT', key, "Standard", "RequestID", request_id)
 
 
+@pytest.mark.bucket_logging
+@pytest.mark.fails_on_aws
+@pytest.mark.fails_on_dbstore
+def test_bucket_logging_requester_assumed_role():
+    """verify the standard access log Requester field contains the assumed-role ARN
+    for requests made with STS temporary credentials"""
+    iam_client = get_iam_client()
+    sts_client = get_sts_client()
+    alt_user_id = get_alt_user_id()
+    default_endpoint = get_config_endpoint()
+    role_session_name = get_parameter_name()
+
+    # create an IAM role assumable by the alt user
+    trust_policy = json.dumps({
+        "Version": "2012-10-17",
+        "Statement": [{
+            "Effect": "Allow",
+            "Principal": {"AWS": [f"arn:aws:iam:::user/{alt_user_id}"]},
+            "Action": ["sts:AssumeRole"]
+        }]
+    })
+    role_name = get_parameter_name()
+    role_response = iam_client.create_role(Path='/', RoleName=role_name, AssumeRolePolicyDocument=trust_policy)
+    role_arn = role_response['Role']['Arn']
+
+    try:
+        role_policy = json.dumps({
+            "Version": "2012-10-17",
+            "Statement": [{
+                "Effect": "Allow",
+                "Action": "s3:*",
+                "Resource": "*"
+            }]
+        })
+        iam_client.put_role_policy(RoleName=role_name, PolicyName='S3FullAccess', PolicyDocument=role_policy)
+
+        # assume the role
+        resp = sts_client.assume_role(RoleArn=role_arn, RoleSessionName=role_session_name)
+
+        assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
+        creds = resp['Credentials']
+        role_s3 = boto3.client(
+            's3',
+            aws_access_key_id=creds['AccessKeyId'],
+            aws_secret_access_key=creds['SecretAccessKey'],
+            aws_session_token=creds['SessionToken'],
+            endpoint_url=default_endpoint,
+            region_name='')
+
+        src_bucket_name = get_new_bucket_name()
+        log_bucket_name = get_new_bucket_name()
+        role_s3.create_bucket(Bucket=src_bucket_name)
+        role_s3.create_bucket(Bucket=log_bucket_name)
+        prefix = 'log/'
+        _set_log_bucket_policy_tenant(role_s3, "", log_bucket_name, "", alt_user_id, [src_bucket_name], [prefix])
+
+        logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': prefix}
+        role_s3.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={'LoggingEnabled': logging_enabled})
+
+        key = 'my-test-object'
+        role_s3.put_object(Bucket=src_bucket_name, Key=key, Body=randcontent())
+
+        _flush_logs(role_s3, src_bucket_name)
+        response = role_s3.list_objects_v2(Bucket=log_bucket_name)
+        log_keys = _get_keys(response)
+        assert len(log_keys) == 1
+
+        response = role_s3.get_object(Bucket=log_bucket_name, Key=log_keys[0])
+        body = _get_body(response)
+        expected_substring = f'assumed-role/{role_name}/{role_session_name}'
+        found = False
+        for record in body.splitlines():
+            if src_bucket_name not in record or 'REST.PUT.OBJECT' not in record or key not in record:
+                continue
+            parsed = _parse_log_record(record, "Standard")
+            requester = parsed['Requester']
+            assert requester.startswith('arn:aws:sts::'), f"Requester should be an STS ARN, got: {requester}"
+            assert expected_substring in requester, f"Requester missing {expected_substring}, got: {requester}"
+            found = True
+            break
+        assert found, "no PUT log record found for the assumed-role request"
+    finally:
+        nuke_role(iam_client, role_name)
+
+
 def _bucket_logging_key_filter(log_type):
     src_bucket_name = get_new_bucket_name()
     src_bucket = get_new_bucket_resource(name=src_bucket_name)