]> git-server-git.apps.pok.os.sepia.ceph.com Git - s3-tests.git/commitdiff
sts: test role policy around nonexistent objects 389/head
authorCasey Bodley <cbodley@redhat.com>
Wed, 7 Apr 2021 15:33:46 +0000 (11:33 -0400)
committerCasey Bodley <cbodley@redhat.com>
Wed, 7 Apr 2021 15:41:30 +0000 (11:41 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
s3tests_boto3/functional/test_sts.py

index fb4ab11cea1f2df276c4d6fd660056e068eff33a..78c6901cca475b2ba017e67c623f9048794d75af 100644 (file)
@@ -257,6 +257,89 @@ def test_assume_role_creds_expiry():
         s3bucket_error = e.response.get("Error", {}).get("Code")
     eq(s3bucket_error,'AccessDenied')
 
+@attr(resource='assume role')
+@attr(method='head')
+@attr(operation='check')
+@attr(assertion='HEAD fails with 403 when role policy denies s3:ListBucket')
+@attr('test_of_sts')
+def test_assume_role_deny_head_nonexistent():
+    # create a bucket with the normal s3 client
+    bucket_name = get_new_bucket_name()
+    get_client().create_bucket(Bucket=bucket_name)
+
+    iam_client=get_iam_client()
+    sts_client=get_sts_client()
+    sts_user_id=get_alt_user_id()
+    default_endpoint=get_config_endpoint()
+    role_session_name=get_parameter_name()
+
+    policy_document = '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"AWS":["arn:aws:iam:::user/'+sts_user_id+'"]},"Action":["sts:AssumeRole"]}]}'
+    (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
+    eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name)
+
+    # allow GetObject but deny ListBucket
+    role_policy = '{"Version":"2012-10-17","Statement":{"Effect":"Allow","Action":"s3:GetObject","Principal":"*","Resource":"arn:aws:s3:::*"}}'
+    (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
+    eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+
+    resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name)
+    eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+
+    s3_client = boto3.client('s3',
+               aws_access_key_id = resp['Credentials']['AccessKeyId'],
+               aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
+               aws_session_token = resp['Credentials']['SessionToken'],
+               endpoint_url=default_endpoint,
+               region_name='')
+    status=200
+    try:
+        s3_client.head_object(Bucket=bucket_name, Key='nonexistent')
+    except ClientError as e:
+        status = e.response['ResponseMetadata']['HTTPStatusCode']
+    eq(status,403)
+
+@attr(resource='assume role')
+@attr(method='head')
+@attr(operation='check')
+@attr(assertion='HEAD fails with 404 when role policy allows s3:ListBucket')
+@attr('test_of_sts')
+def test_assume_role_allow_head_nonexistent():
+    # create a bucket with the normal s3 client
+    bucket_name = get_new_bucket_name()
+    get_client().create_bucket(Bucket=bucket_name)
+
+    iam_client=get_iam_client()
+    sts_client=get_sts_client()
+    sts_user_id=get_alt_user_id()
+    default_endpoint=get_config_endpoint()
+    role_session_name=get_parameter_name()
+
+    policy_document = '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"AWS":["arn:aws:iam:::user/'+sts_user_id+'"]},"Action":["sts:AssumeRole"]}]}'
+    (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
+    eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name)
+
+    # allow GetObject and ListBucket
+    role_policy = '{"Version":"2012-10-17","Statement":{"Effect":"Allow","Action":["s3:GetObject","s3:ListBucket"],"Principal":"*","Resource":"arn:aws:s3:::*"}}'
+    (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
+    eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+
+    resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name)
+    eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+
+    s3_client = boto3.client('s3',
+               aws_access_key_id = resp['Credentials']['AccessKeyId'],
+               aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
+               aws_session_token = resp['Credentials']['SessionToken'],
+               endpoint_url=default_endpoint,
+               region_name='')
+    status=200
+    try:
+        s3_client.head_object(Bucket=bucket_name, Key='nonexistent')
+    except ClientError as e:
+        status = e.response['ResponseMetadata']['HTTPStatusCode']
+    eq(status,404)
+
+
 @attr(resource='assume role with web identity')
 @attr(method='get')
 @attr(operation='check')