oidc_arn = None
return (oidc_arn, oidc_error)
+def get_s3_resource_using_iam_creds():
+ iam_access_key = get_iam_access_key()
+ iam_secret_key = get_iam_secret_key()
+ default_endpoint = get_config_endpoint()
+
+ s3_res_iam_creds = boto3.resource('s3',
+ aws_access_key_id = iam_access_key,
+ aws_secret_access_key = iam_secret_key,
+ endpoint_url=default_endpoint,
+ region_name='',
+ )
+
+ return s3_res_iam_creds
+
@attr(resource='get session token')
@attr(method='get')
@attr(operation='check')
@attr(operation='check')
@attr(assertion='assuming role using web token using aws:RequestTag in trust policy')
@attr('webidentity_test')
+@attr('abac_test')
@attr('token_request_tag_trust_policy_test')
def test_assume_role_with_web_identity_with_request_tag():
check_webidentity()
@attr(operation='check')
@attr(assertion='assuming role using web token with aws:PrincipalTag in role policy')
@attr('webidentity_test')
+@attr('abac_test')
@attr('token_principal_tag_role_policy_test')
def test_assume_role_with_web_identity_with_principal_tag():
check_webidentity()
@attr(operation='check')
@attr(assertion='assuming role using web token with aws:PrincipalTag in role policy')
@attr('webidentity_test')
+@attr('abac_test')
@attr('token_principal_tag_role_policy_test')
def test_assume_role_with_web_identity_for_all_values():
check_webidentity()
@attr(operation='check')
@attr(assertion='assuming role using web token with aws:PrincipalTag in role policy')
@attr('webidentity_test')
+@attr('abac_test')
@attr('token_principal_tag_role_policy_test')
def test_assume_role_with_web_identity_for_all_values_deny():
check_webidentity()
@attr(operation='check')
@attr(assertion='assuming role using web token with aws:TagKeys in trust policy')
@attr('webidentity_test')
+@attr('abac_test')
@attr('token_tag_keys_test')
def test_assume_role_with_web_identity_tag_keys_trust_policy():
check_webidentity()
@attr(operation='check')
@attr(assertion='assuming role using web token with aws:TagKeys in role permission policy')
@attr('webidentity_test')
+@attr('abac_test')
@attr('token_tag_keys_test')
def test_assume_role_with_web_identity_tag_keys_role_policy():
check_webidentity()
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
+@attr(resource='assume role with web identity')
+@attr(method='put')
+@attr(operation='check')
+@attr(assertion='assuming role using web token with s3:ResourceTag in role permission policy')
+@attr('webidentity_test')
+@attr('abac_test')
+@attr('token_resource_tags_test')
+def test_assume_role_with_web_identity_resource_tag():
+ check_webidentity()
+ iam_client=get_iam_client()
+ sts_client=get_sts_client()
+ default_endpoint=get_config_endpoint()
+ role_session_name=get_parameter_name()
+ thumbprint=get_thumbprint()
+ user_token=get_user_token()
+ realm=get_realm_name()
+
+ s3_res_iam_creds = get_s3_resource_using_iam_creds()
+
+ s3_client_iam_creds = s3_res_iam_creds.meta.client
+
+ bucket_name = get_new_bucket_name()
+ s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name)
+ eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
+
+ bucket_tagging = s3_res_iam_creds.BucketTagging(bucket_name)
+ Set_Tag = bucket_tagging.put(Tagging={'TagSet':[{'Key':'Department', 'Value': 'Engineering'},{'Key':'Department', 'Value': 'Marketing'}]})
+
+ oidc_response = iam_client.create_open_id_connect_provider(
+ Url='http://localhost:8080/auth/realms/{}'.format(realm),
+ ThumbprintList=[
+ thumbprint,
+ ],
+ )
+
+ policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}"
+ (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
+ eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
+
+ role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"s3:ResourceTag/Department\":[\"Engineering\"]}}}}"
+ (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
+ eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+
+ resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
+ eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+
+ s3_client = boto3.client('s3',
+ aws_access_key_id = resp['Credentials']['AccessKeyId'],
+ aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
+ aws_session_token = resp['Credentials']['SessionToken'],
+ endpoint_url=default_endpoint,
+ region_name='',
+ )
+
+ bucket_body = 'this is a test file'
+ s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name, Key="test-1.txt")
+ eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200)
+
+ oidc_remove=iam_client.delete_open_id_connect_provider(
+ OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
+ )
+
+@attr(resource='assume role with web identity')
+@attr(method='put')
+@attr(operation='check')
+@attr(assertion='assuming role using web token with s3:ResourceTag with missing tags on bucket')
+@attr('webidentity_test')
+@attr('abac_test')
+@attr('token_resource_tags_test')
+def test_assume_role_with_web_identity_resource_tag_deny():
+ check_webidentity()
+ iam_client=get_iam_client()
+ sts_client=get_sts_client()
+ default_endpoint=get_config_endpoint()
+ role_session_name=get_parameter_name()
+ thumbprint=get_thumbprint()
+ user_token=get_user_token()
+ realm=get_realm_name()
+
+ s3_res_iam_creds = get_s3_resource_using_iam_creds()
+
+ s3_client_iam_creds = s3_res_iam_creds.meta.client
+
+ bucket_name = get_new_bucket_name()
+ s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name)
+ eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
+
+ oidc_response = iam_client.create_open_id_connect_provider(
+ Url='http://localhost:8080/auth/realms/{}'.format(realm),
+ ThumbprintList=[
+ thumbprint,
+ ],
+ )
+
+ policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}"
+ (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
+ eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
+
+ role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"s3:ResourceTag/Department\":[\"Engineering\"]}}}}"
+ (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
+ eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+
+ resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
+ eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+
+ s3_client = boto3.client('s3',
+ aws_access_key_id = resp['Credentials']['AccessKeyId'],
+ aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
+ aws_session_token = resp['Credentials']['SessionToken'],
+ endpoint_url=default_endpoint,
+ region_name='',
+ )
+
+ bucket_body = 'this is a test file'
+ try:
+ s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name, Key="test-1.txt")
+ except ClientError as e:
+ s3_put_obj_error = e.response.get("Error", {}).get("Code")
+ eq(s3_put_obj_error,'AccessDenied')
+
+ oidc_remove=iam_client.delete_open_id_connect_provider(
+ OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
+ )
+
+@attr(resource='assume role with web identity')
+@attr(method='put')
+@attr(operation='check')
+@attr(assertion='assuming role using web token with s3:ResourceTag with wrong resource tag in policy')
+@attr('webidentity_test')
+@attr('abac_test')
+@attr('token_resource_tags_test')
+def test_assume_role_with_web_identity_wrong_resource_tag_deny():
+ check_webidentity()
+ iam_client=get_iam_client()
+ sts_client=get_sts_client()
+ default_endpoint=get_config_endpoint()
+ role_session_name=get_parameter_name()
+ thumbprint=get_thumbprint()
+ user_token=get_user_token()
+ realm=get_realm_name()
+
+ s3_res_iam_creds = get_s3_resource_using_iam_creds()
+
+ s3_client_iam_creds = s3_res_iam_creds.meta.client
+
+ bucket_name = get_new_bucket_name()
+ s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name)
+ eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
+
+ bucket_tagging = s3_res_iam_creds.BucketTagging(bucket_name)
+ Set_Tag = bucket_tagging.put(Tagging={'TagSet':[{'Key':'Department', 'Value': 'WrongResourcetag'}]})
+
+ oidc_response = iam_client.create_open_id_connect_provider(
+ Url='http://localhost:8080/auth/realms/{}'.format(realm),
+ ThumbprintList=[
+ thumbprint,
+ ],
+ )
+
+ policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}"
+ (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
+ eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
+
+ role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"s3:ResourceTag/Department\":[\"Engineering\"]}}}}"
+ (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
+ eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+
+ resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
+ eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+
+ s3_client = boto3.client('s3',
+ aws_access_key_id = resp['Credentials']['AccessKeyId'],
+ aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
+ aws_session_token = resp['Credentials']['SessionToken'],
+ endpoint_url=default_endpoint,
+ region_name='',
+ )
+
+ bucket_body = 'this is a test file'
+ try:
+ s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name, Key="test-1.txt")
+ except ClientError as e:
+ s3_put_obj_error = e.response.get("Error", {}).get("Code")
+ eq(s3_put_obj_error,'AccessDenied')
+
+ oidc_remove=iam_client.delete_open_id_connect_provider(
+ OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
+ )
+
+@attr(resource='assume role with web identity')
+@attr(method='put')
+@attr(operation='check')
+@attr(assertion='assuming role using web token with s3:ResourceTag matching aws:PrincipalTag in role permission policy')
+@attr('webidentity_test')
+@attr('abac_test')
+@attr('token_resource_tags_test')
+def test_assume_role_with_web_identity_resource_tag_princ_tag():
+ check_webidentity()
+ iam_client=get_iam_client()
+ sts_client=get_sts_client()
+ default_endpoint=get_config_endpoint()
+ role_session_name=get_parameter_name()
+ thumbprint=get_thumbprint()
+ user_token=get_user_token()
+ realm=get_realm_name()
+
+ s3_res_iam_creds = get_s3_resource_using_iam_creds()
+
+ s3_client_iam_creds = s3_res_iam_creds.meta.client
+
+ bucket_name = get_new_bucket_name()
+ s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name)
+ eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
+
+ bucket_tagging = s3_res_iam_creds.BucketTagging(bucket_name)
+ Set_Tag = bucket_tagging.put(Tagging={'TagSet':[{'Key':'Department', 'Value': 'Engineering'}]})
+
+ oidc_response = iam_client.create_open_id_connect_provider(
+ Url='http://localhost:8080/auth/realms/{}'.format(realm),
+ ThumbprintList=[
+ thumbprint,
+ ],
+ )
+
+ policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}"
+ (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
+ eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
+
+ role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"s3:ResourceTag/Department\":[\"${aws:PrincipalTag/Department}\"]}}}}"
+ (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
+ eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+
+ resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
+ eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+
+ s3_client = boto3.client('s3',
+ aws_access_key_id = resp['Credentials']['AccessKeyId'],
+ aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
+ aws_session_token = resp['Credentials']['SessionToken'],
+ endpoint_url=default_endpoint,
+ region_name='',
+ )
+
+ bucket_body = 'this is a test file'
+ tags = 'Department=Engineering&Department=Marketing'
+ key = "test-1.txt"
+ s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name, Key=key, Tagging=tags)
+ eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200)
+
+ s3_get_obj = s3_client.get_object(Bucket=bucket_name, Key=key)
+ eq(s3_get_obj['ResponseMetadata']['HTTPStatusCode'],200)
+
+ oidc_remove=iam_client.delete_open_id_connect_provider(
+ OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
+ )
+
+@attr(resource='assume role with web identity')
+@attr(method='put')
+@attr(operation='check')
+@attr(assertion='assuming role using web token with s3:ResourceTag used to test copy object')
+@attr('webidentity_test')
+@attr('abac_test')
+@attr('token_resource_tags_test')
+def test_assume_role_with_web_identity_resource_tag_copy_obj():
+ check_webidentity()
+ iam_client=get_iam_client()
+ sts_client=get_sts_client()
+ default_endpoint=get_config_endpoint()
+ role_session_name=get_parameter_name()
+ thumbprint=get_thumbprint()
+ user_token=get_user_token()
+ realm=get_realm_name()
+
+ s3_res_iam_creds = get_s3_resource_using_iam_creds()
+
+ s3_client_iam_creds = s3_res_iam_creds.meta.client
+
+ #create two buckets and add same tags to both
+ bucket_name = get_new_bucket_name()
+ s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name)
+ eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
+
+ bucket_tagging = s3_res_iam_creds.BucketTagging(bucket_name)
+ Set_Tag = bucket_tagging.put(Tagging={'TagSet':[{'Key':'Department', 'Value': 'Engineering'}]})
+
+ copy_bucket_name = get_new_bucket_name()
+ s3bucket = s3_client_iam_creds.create_bucket(Bucket=copy_bucket_name)
+ eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
+
+ bucket_tagging = s3_res_iam_creds.BucketTagging(copy_bucket_name)
+ Set_Tag = bucket_tagging.put(Tagging={'TagSet':[{'Key':'Department', 'Value': 'Engineering'}]})
+
+ oidc_response = iam_client.create_open_id_connect_provider(
+ Url='http://localhost:8080/auth/realms/{}'.format(realm),
+ ThumbprintList=[
+ thumbprint,
+ ],
+ )
+
+ policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}"
+ (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
+ eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
+
+ role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"s3:ResourceTag/Department\":[\"${aws:PrincipalTag/Department}\"]}}}}"
+ (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
+ eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+
+ resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token)
+ eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+
+ s3_client = boto3.client('s3',
+ aws_access_key_id = resp['Credentials']['AccessKeyId'],
+ aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
+ aws_session_token = resp['Credentials']['SessionToken'],
+ endpoint_url=default_endpoint,
+ region_name='',
+ )
+
+ bucket_body = 'this is a test file'
+ tags = 'Department=Engineering'
+ key = "test-1.txt"
+ s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name, Key=key, Tagging=tags)
+ eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200)
+
+ #copy to same bucket
+ copy_source = {
+ 'Bucket': bucket_name,
+ 'Key': 'test-1.txt'
+ }
+
+ s3_client.copy(copy_source, bucket_name, "test-2.txt")
+
+ s3_get_obj = s3_client.get_object(Bucket=bucket_name, Key="test-2.txt")
+ eq(s3_get_obj['ResponseMetadata']['HTTPStatusCode'],200)
+
+ #copy to another bucket
+ copy_source = {
+ 'Bucket': bucket_name,
+ 'Key': 'test-1.txt'
+ }
+
+ s3_client.copy(copy_source, copy_bucket_name, "test-1.txt")
+
+ s3_get_obj = s3_client.get_object(Bucket=copy_bucket_name, Key="test-1.txt")
+ eq(s3_get_obj['ResponseMetadata']['HTTPStatusCode'],200)
+
+ oidc_remove=iam_client.delete_open_id_connect_provider(
+ OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
+ )
+