From aa206bc0614d4207df61ad6156b659a44d555851 Mon Sep 17 00:00:00 2001 From: ShreeJejurikar Date: Wed, 13 May 2026 21:23:49 +0530 Subject: [PATCH] test: verify Requester field is the assumed-role ARN Add a bucket logging test that asserts the standard access log Requester field is set to the assumed-role ARN (per AWS S3 spec) when a request is made with STS temporary credentials. The test creates an IAM role assumable by the alt user, assumes it via STS, performs an S3 PutObject with the temporary credentials, flushes the bucket log, and verifies the log record's Requester field matches arn:aws:sts:::assumed-role//. Tracker: https://tracker.ceph.com/issues/71742 Signed-off-by: Shree Jejurikar --- s3tests/functional/test_s3.py | 90 ++++++++++++++++++++++++++++++++++- 1 file changed, 89 insertions(+), 1 deletion(-) diff --git a/s3tests/functional/test_s3.py b/s3tests/functional/test_s3.py index c34c1e46..5fe104a3 100644 --- a/s3tests/functional/test_s3.py +++ b/s3tests/functional/test_s3.py @@ -38,7 +38,7 @@ from .utils import _get_status from .policy import Policy, Statement, make_json_policy -from .iam import iam_root +from .iam import iam_root, nuke_role from . import ( configfile, @@ -97,6 +97,9 @@ from . import ( get_restore_processor_period, get_read_through_days, create_iam_user_s3client, + get_iam_client, + get_sts_client, + get_parameter_name, ) @@ -15816,6 +15819,91 @@ def test_bucket_logging_request_id(): assert _verify_record_field(body, src_bucket_name, 'REST.PUT.OBJECT', key, "Standard", "RequestID", request_id) +@pytest.mark.bucket_logging +@pytest.mark.fails_on_aws +@pytest.mark.fails_on_dbstore +def test_bucket_logging_requester_assumed_role(): + """verify the standard access log Requester field contains the assumed-role ARN + for requests made with STS temporary credentials""" + iam_client = get_iam_client() + sts_client = get_sts_client() + alt_user_id = get_alt_user_id() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() + + # create an IAM role assumable by the alt user + trust_policy = json.dumps({ + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Allow", + "Principal": {"AWS": [f"arn:aws:iam:::user/{alt_user_id}"]}, + "Action": ["sts:AssumeRole"] + }] + }) + role_name = get_parameter_name() + role_response = iam_client.create_role(Path='/', RoleName=role_name, AssumeRolePolicyDocument=trust_policy) + role_arn = role_response['Role']['Arn'] + + try: + role_policy = json.dumps({ + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Allow", + "Action": "s3:*", + "Resource": "*" + }] + }) + iam_client.put_role_policy(RoleName=role_name, PolicyName='S3FullAccess', PolicyDocument=role_policy) + + # assume the role + resp = sts_client.assume_role(RoleArn=role_arn, RoleSessionName=role_session_name) + + assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 + creds = resp['Credentials'] + role_s3 = boto3.client( + 's3', + aws_access_key_id=creds['AccessKeyId'], + aws_secret_access_key=creds['SecretAccessKey'], + aws_session_token=creds['SessionToken'], + endpoint_url=default_endpoint, + region_name='') + + src_bucket_name = get_new_bucket_name() + log_bucket_name = get_new_bucket_name() + role_s3.create_bucket(Bucket=src_bucket_name) + role_s3.create_bucket(Bucket=log_bucket_name) + prefix = 'log/' + _set_log_bucket_policy_tenant(role_s3, "", log_bucket_name, "", alt_user_id, [src_bucket_name], [prefix]) + + logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': prefix} + role_s3.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={'LoggingEnabled': logging_enabled}) + + key = 'my-test-object' + role_s3.put_object(Bucket=src_bucket_name, Key=key, Body=randcontent()) + + _flush_logs(role_s3, src_bucket_name) + response = role_s3.list_objects_v2(Bucket=log_bucket_name) + log_keys = _get_keys(response) + assert len(log_keys) == 1 + + response = role_s3.get_object(Bucket=log_bucket_name, Key=log_keys[0]) + body = _get_body(response) + expected_substring = f'assumed-role/{role_name}/{role_session_name}' + found = False + for record in body.splitlines(): + if src_bucket_name not in record or 'REST.PUT.OBJECT' not in record or key not in record: + continue + parsed = _parse_log_record(record, "Standard") + requester = parsed['Requester'] + assert requester.startswith('arn:aws:sts::'), f"Requester should be an STS ARN, got: {requester}" + assert expected_substring in requester, f"Requester missing {expected_substring}, got: {requester}" + found = True + break + assert found, "no PUT log record found for the assumed-role request" + finally: + nuke_role(iam_client, role_name) + + def _bucket_logging_key_filter(log_type): src_bucket_name = get_new_bucket_name() src_bucket = get_new_bucket_resource(name=src_bucket_name) -- 2.47.3