from .policy import Policy, Statement, make_json_policy
-from .iam import iam_root
+from .iam import iam_root, nuke_role
from . import (
configfile,
get_restore_processor_period,
get_read_through_days,
create_iam_user_s3client,
+ get_iam_client,
+ get_sts_client,
+ get_parameter_name,
)
assert _verify_record_field(body, src_bucket_name, 'REST.PUT.OBJECT', key, "Standard", "RequestID", request_id)
+@pytest.mark.bucket_logging
+@pytest.mark.fails_on_aws
+@pytest.mark.fails_on_dbstore
+def test_bucket_logging_requester_assumed_role():
+ """verify the standard access log Requester field contains the assumed-role ARN
+ for requests made with STS temporary credentials"""
+ iam_client = get_iam_client()
+ sts_client = get_sts_client()
+ alt_user_id = get_alt_user_id()
+ default_endpoint = get_config_endpoint()
+ role_session_name = get_parameter_name()
+
+ # create an IAM role assumable by the alt user
+ trust_policy = json.dumps({
+ "Version": "2012-10-17",
+ "Statement": [{
+ "Effect": "Allow",
+ "Principal": {"AWS": [f"arn:aws:iam:::user/{alt_user_id}"]},
+ "Action": ["sts:AssumeRole"]
+ }]
+ })
+ role_name = get_parameter_name()
+ role_response = iam_client.create_role(Path='/', RoleName=role_name, AssumeRolePolicyDocument=trust_policy)
+ role_arn = role_response['Role']['Arn']
+
+ try:
+ role_policy = json.dumps({
+ "Version": "2012-10-17",
+ "Statement": [{
+ "Effect": "Allow",
+ "Action": "s3:*",
+ "Resource": "*"
+ }]
+ })
+ iam_client.put_role_policy(RoleName=role_name, PolicyName='S3FullAccess', PolicyDocument=role_policy)
+
+ # assume the role
+ resp = sts_client.assume_role(RoleArn=role_arn, RoleSessionName=role_session_name)
+
+ assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
+ creds = resp['Credentials']
+ role_s3 = boto3.client(
+ 's3',
+ aws_access_key_id=creds['AccessKeyId'],
+ aws_secret_access_key=creds['SecretAccessKey'],
+ aws_session_token=creds['SessionToken'],
+ endpoint_url=default_endpoint,
+ region_name='')
+
+ src_bucket_name = get_new_bucket_name()
+ log_bucket_name = get_new_bucket_name()
+ role_s3.create_bucket(Bucket=src_bucket_name)
+ role_s3.create_bucket(Bucket=log_bucket_name)
+ prefix = 'log/'
+ _set_log_bucket_policy_tenant(role_s3, "", log_bucket_name, "", alt_user_id, [src_bucket_name], [prefix])
+
+ logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': prefix}
+ role_s3.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={'LoggingEnabled': logging_enabled})
+
+ key = 'my-test-object'
+ role_s3.put_object(Bucket=src_bucket_name, Key=key, Body=randcontent())
+
+ _flush_logs(role_s3, src_bucket_name)
+ response = role_s3.list_objects_v2(Bucket=log_bucket_name)
+ log_keys = _get_keys(response)
+ assert len(log_keys) == 1
+
+ response = role_s3.get_object(Bucket=log_bucket_name, Key=log_keys[0])
+ body = _get_body(response)
+ expected_substring = f'assumed-role/{role_name}/{role_session_name}'
+ found = False
+ for record in body.splitlines():
+ if src_bucket_name not in record or 'REST.PUT.OBJECT' not in record or key not in record:
+ continue
+ parsed = _parse_log_record(record, "Standard")
+ requester = parsed['Requester']
+ assert requester.startswith('arn:aws:sts::'), f"Requester should be an STS ARN, got: {requester}"
+ assert expected_substring in requester, f"Requester missing {expected_substring}, got: {requester}"
+ found = True
+ break
+ assert found, "no PUT log record found for the assumed-role request"
+ finally:
+ nuke_role(iam_client, role_name)
+
+
def _bucket_logging_key_filter(log_type):
src_bucket_name = get_new_bucket_name()
src_bucket = get_new_bucket_resource(name=src_bucket_name)