get_aud,
get_token,
get_realm_name,
- check_webidentity
+ check_webidentity,
+ get_iam_access_key,
+ get_iam_secret_key
)
log = logging.getLogger(__name__)
role_err = e.response['Code']
return (role_err,role_response)
+def get_s3_client_using_iam_creds():
+ iam_access_key = get_iam_access_key()
+ iam_secret_key = get_iam_secret_key()
+ default_endpoint = get_config_endpoint()
+
+ s3_client_iam_creds = boto3.client('s3',
+ aws_access_key_id = iam_access_key,
+ aws_secret_access_key = iam_secret_key,
+ endpoint_url=default_endpoint,
+ region_name='',
+ )
+
+ return s3_client_iam_creds
+
+def create_oidc_provider(iam_client, url, clientidlist, thumbprintlist):
+ oidc_arn = None
+ oidc_error = None
+ clientids = []
+ if clientidlist is None:
+ clientidlist=clientids
+ try:
+ oidc_response = iam_client.create_open_id_connect_provider(
+ Url=url,
+ ClientIDList=clientidlist,
+ ThumbprintList=thumbprintlist,
+ )
+ oidc_arn = oidc_response['OpenIDConnectProviderArn']
+ print (oidc_arn)
+ except ClientError as e:
+ oidc_error = e.response['Code']
+ print (oidc_error)
+ try:
+ oidc_error = None
+ print (url)
+ if url.startswith('http://'):
+ url = url[len('http://'):]
+ elif url.startswith('https://'):
+ url = url[len('https://'):]
+ elif url.startswith('www.'):
+ url = url[len('www.'):]
+ oidc_arn = 'arn:aws:iam:::oidc-provider/{}'.format(url)
+ print (url)
+ print (oidc_arn)
+ oidc_response = iam_client.get_open_id_connect_provider(OpenIDConnectProviderArn=oidc_arn)
+ except ClientError as e:
+ oidc_arn = None
+ return (oidc_arn, oidc_error)
+
@attr(resource='get session token')
@attr(method='get')
@attr(operation='check')
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
'''
+
+#######################
+# Session Policy Tests
+#######################
+
+@attr(resource='assume role with web identity')
+@attr(method='get')
+@attr(operation='check')
+@attr(assertion='checking session policy working for two different buckets')
+@attr('webidentity_test')
+@attr('session_policy')
+def test_session_policy_check_on_different_buckets():
+ check_webidentity()
+ iam_client=get_iam_client()
+ sts_client=get_sts_client()
+ default_endpoint=get_config_endpoint()
+ role_session_name=get_parameter_name()
+ thumbprint=get_thumbprint()
+ aud=get_aud()
+ token=get_token()
+ realm=get_realm_name()
+
+ url = 'http://localhost:8080/auth/realms/{}'.format(realm)
+ thumbprintlist = [thumbprint]
+ (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
+ if oidc_error is not None:
+ raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
+
+ policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
+ (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
+ eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
+
+ role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"arn:aws:s3:::test2\",\"arn:aws:s3:::test2/*\"]}}"
+
+ (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new)
+ eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+
+ session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:GetObject\",\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
+
+ resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
+ eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+
+ s3_client = boto3.client('s3',
+ aws_access_key_id = resp['Credentials']['AccessKeyId'],
+ aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
+ aws_session_token = resp['Credentials']['SessionToken'],
+ endpoint_url=default_endpoint,
+ region_name='',
+ )
+
+ bucket_name_1 = 'test1'
+ try:
+ s3bucket = s3_client.create_bucket(Bucket=bucket_name_1)
+ except ClientError as e:
+ s3bucket_error = e.response.get("Error", {}).get("Code")
+ eq(s3bucket_error, 'AccessDenied')
+
+ bucket_name_2 = 'test2'
+ try:
+ s3bucket = s3_client.create_bucket(Bucket=bucket_name_2)
+ except ClientError as e:
+ s3bucket_error = e.response.get("Error", {}).get("Code")
+ eq(s3bucket_error, 'AccessDenied')
+
+ bucket_body = 'please-write-something'
+ #body.encode(encoding='utf_8')
+ try:
+ s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
+ except ClientError as e:
+ s3_put_obj_error = e.response.get("Error", {}).get("Code")
+ eq(s3_put_obj_error,'NoSuchBucket')
+
+ oidc_remove=iam_client.delete_open_id_connect_provider(
+ OpenIDConnectProviderArn=oidc_arn
+ )
+
+
+@attr(resource='assume role with web identity')
+@attr(method='put')
+@attr(operation='check')
+@attr(assertion='checking session policy working for same bucket')
+@attr('webidentity_test')
+@attr('session_policy')
+def test_session_policy_check_on_same_bucket():
+ check_webidentity()
+ iam_client=get_iam_client()
+ sts_client=get_sts_client()
+ default_endpoint=get_config_endpoint()
+ role_session_name=get_parameter_name()
+ thumbprint=get_thumbprint()
+ aud=get_aud()
+ token=get_token()
+ realm=get_realm_name()
+
+ url = 'http://localhost:8080/auth/realms/{}'.format(realm)
+ thumbprintlist = [thumbprint]
+ (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
+ if oidc_error is not None:
+ raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
+
+ policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
+ (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
+ eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
+
+ role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}"
+
+ (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new)
+ eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+
+ s3_client_iam_creds = get_s3_client_using_iam_creds()
+
+ bucket_name_1 = 'test1'
+ s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name_1)
+ eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
+
+ session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:GetObject\",\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
+
+ resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
+ eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+
+ s3_client = boto3.client('s3',
+ aws_access_key_id = resp['Credentials']['AccessKeyId'],
+ aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
+ aws_session_token = resp['Credentials']['SessionToken'],
+ endpoint_url=default_endpoint,
+ region_name='',
+ )
+
+ bucket_body = 'this is a test file'
+ s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
+ eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200)
+
+ oidc_remove=iam_client.delete_open_id_connect_provider(
+ OpenIDConnectProviderArn=oidc_arn
+ )
+
+
+@attr(resource='assume role with web identity')
+@attr(method='get')
+@attr(operation='check')
+@attr(assertion='checking put_obj op denial')
+@attr('webidentity_test')
+@attr('session_policy')
+def test_session_policy_check_put_obj_denial():
+ check_webidentity()
+ iam_client=get_iam_client()
+ iam_access_key=get_iam_access_key()
+ iam_secret_key=get_iam_secret_key()
+ sts_client=get_sts_client()
+ default_endpoint=get_config_endpoint()
+ role_session_name=get_parameter_name()
+ thumbprint=get_thumbprint()
+ aud=get_aud()
+ token=get_token()
+ realm=get_realm_name()
+
+ url = 'http://localhost:8080/auth/realms/{}'.format(realm)
+ thumbprintlist = [thumbprint]
+ (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
+ if oidc_error is not None:
+ raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
+
+ policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
+ (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
+ eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
+
+ role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}"
+
+ (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new)
+ eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+
+ s3_client_iam_creds = get_s3_client_using_iam_creds()
+
+ bucket_name_1 = 'test1'
+ s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name_1)
+ eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
+
+ session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:GetObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
+
+ resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
+ eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+
+ s3_client = boto3.client('s3',
+ aws_access_key_id = resp['Credentials']['AccessKeyId'],
+ aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
+ aws_session_token = resp['Credentials']['SessionToken'],
+ endpoint_url=default_endpoint,
+ region_name='',
+ )
+
+ bucket_body = 'this is a test file'
+ try:
+ s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
+ except ClientError as e:
+ s3_put_obj_error = e.response.get("Error", {}).get("Code")
+ eq(s3_put_obj_error, 'AccessDenied')
+
+ oidc_remove=iam_client.delete_open_id_connect_provider(
+ OpenIDConnectProviderArn=oidc_arn
+ )
+
+
+@attr(resource='assume role with web identity')
+@attr(method='get')
+@attr(operation='check')
+@attr(assertion='checking put_obj working by swapping policies')
+@attr('webidentity_test')
+@attr('session_policy')
+def test_swapping_role_policy_and_session_policy():
+ check_webidentity()
+ iam_client=get_iam_client()
+ iam_access_key=get_iam_access_key()
+ iam_secret_key=get_iam_secret_key()
+ sts_client=get_sts_client()
+ default_endpoint=get_config_endpoint()
+ role_session_name=get_parameter_name()
+ thumbprint=get_thumbprint()
+ aud=get_aud()
+ token=get_token()
+ realm=get_realm_name()
+
+ url = 'http://localhost:8080/auth/realms/{}'.format(realm)
+ thumbprintlist = [thumbprint]
+ (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
+ if oidc_error is not None:
+ raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
+
+ policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
+ (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
+ eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
+
+ role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:GetObject\",\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
+
+ (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new)
+ eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+
+ s3_client_iam_creds = get_s3_client_using_iam_creds()
+
+ bucket_name_1 = 'test1'
+ s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name_1)
+ eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
+
+ session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}"
+
+ resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
+ eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+
+ s3_client = boto3.client('s3',
+ aws_access_key_id = resp['Credentials']['AccessKeyId'],
+ aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
+ aws_session_token = resp['Credentials']['SessionToken'],
+ endpoint_url=default_endpoint,
+ region_name='',
+ )
+ bucket_body = 'this is a test file'
+ s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
+ eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200)
+
+ oidc_remove=iam_client.delete_open_id_connect_provider(
+ OpenIDConnectProviderArn=oidc_arn
+ )
+
+@attr(resource='assume role with web identity')
+@attr(method='put')
+@attr(operation='check')
+@attr(assertion='checking put_obj working by setting different permissions to role and session policy')
+@attr('webidentity_test')
+@attr('session_policy')
+def test_session_policy_check_different_op_permissions():
+ check_webidentity()
+ iam_client=get_iam_client()
+ iam_access_key=get_iam_access_key()
+ iam_secret_key=get_iam_secret_key()
+ sts_client=get_sts_client()
+ default_endpoint=get_config_endpoint()
+ role_session_name=get_parameter_name()
+ thumbprint=get_thumbprint()
+ aud=get_aud()
+ token=get_token()
+ realm=get_realm_name()
+
+ url = 'http://localhost:8080/auth/realms/{}'.format(realm)
+ thumbprintlist = [thumbprint]
+ (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
+ if oidc_error is not None:
+ raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
+
+ policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
+ (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
+ eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
+
+ role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
+
+ (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new)
+ eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+
+ s3_client_iam_creds = get_s3_client_using_iam_creds()
+
+ bucket_name_1 = 'test1'
+ s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name_1)
+ eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
+
+ session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:GetObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
+
+ resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
+ eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+
+ s3_client = boto3.client('s3',
+ aws_access_key_id = resp['Credentials']['AccessKeyId'],
+ aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
+ aws_session_token = resp['Credentials']['SessionToken'],
+ endpoint_url=default_endpoint,
+ region_name='',
+ )
+
+ bucket_body = 'this is a test file'
+ try:
+ s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
+ except ClientError as e:
+ s3_put_obj_error = e.response.get("Error", {}).get("Code")
+ eq(s3_put_obj_error, 'AccessDenied')
+
+ oidc_remove=iam_client.delete_open_id_connect_provider(
+ OpenIDConnectProviderArn=oidc_arn
+ )
+
+
+@attr(resource='assume role with web identity')
+@attr(method='put')
+@attr(operation='check')
+@attr(assertion='checking op behaviour with deny effect')
+@attr('webidentity_test')
+@attr('session_policy')
+def test_session_policy_check_with_deny_effect():
+ check_webidentity()
+ iam_client=get_iam_client()
+ iam_access_key=get_iam_access_key()
+ iam_secret_key=get_iam_secret_key()
+ sts_client=get_sts_client()
+ default_endpoint=get_config_endpoint()
+ role_session_name=get_parameter_name()
+ thumbprint=get_thumbprint()
+ aud=get_aud()
+ token=get_token()
+ realm=get_realm_name()
+
+ url = 'http://localhost:8080/auth/realms/{}'.format(realm)
+ thumbprintlist = [thumbprint]
+ (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
+ if oidc_error is not None:
+ raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
+
+ policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
+ (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
+ eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
+
+ role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}"
+
+ (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new)
+ eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+
+ s3_client_iam_creds = get_s3_client_using_iam_creds()
+
+ bucket_name_1 = 'test1'
+ s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name_1)
+ eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
+
+ session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
+
+ resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
+ eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+
+ s3_client = boto3.client('s3',
+ aws_access_key_id = resp['Credentials']['AccessKeyId'],
+ aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
+ aws_session_token = resp['Credentials']['SessionToken'],
+ endpoint_url=default_endpoint,
+ region_name='',
+ )
+ bucket_body = 'this is a test file'
+ try:
+ s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
+ except ClientError as e:
+ s3_put_obj_error = e.response.get("Error", {}).get("Code")
+ eq(s3_put_obj_error, 'AccessDenied')
+
+ oidc_remove=iam_client.delete_open_id_connect_provider(
+ OpenIDConnectProviderArn=oidc_arn
+ )
+
+
+@attr(resource='assume role with web identity')
+@attr(method='put')
+@attr(operation='check')
+@attr(assertion='checking put_obj working with deny and allow on same op')
+@attr('webidentity_test')
+@attr('session_policy')
+def test_session_policy_check_with_deny_on_same_op():
+ check_webidentity()
+ iam_client=get_iam_client()
+ iam_access_key=get_iam_access_key()
+ iam_secret_key=get_iam_secret_key()
+ sts_client=get_sts_client()
+ default_endpoint=get_config_endpoint()
+ role_session_name=get_parameter_name()
+ thumbprint=get_thumbprint()
+ aud=get_aud()
+ token=get_token()
+ realm=get_realm_name()
+
+ url = 'http://localhost:8080/auth/realms/{}'.format(realm)
+ thumbprintlist = [thumbprint]
+ (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
+ if oidc_error is not None:
+ raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
+
+ policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
+ (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
+ eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
+
+ role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
+
+ (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new)
+ eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+
+ s3_client_iam_creds = get_s3_client_using_iam_creds()
+
+ bucket_name_1 = 'test1'
+ s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name_1)
+ eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
+
+ session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Deny\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
+
+ resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
+ eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+
+ s3_client = boto3.client('s3',
+ aws_access_key_id = resp['Credentials']['AccessKeyId'],
+ aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
+ aws_session_token = resp['Credentials']['SessionToken'],
+ endpoint_url=default_endpoint,
+ region_name='',
+ )
+
+ bucket_body = 'this is a test file'
+ try:
+ s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
+ except ClientError as e:
+ s3_put_obj_error = e.response.get("Error", {}).get("Code")
+ eq(s3_put_obj_error, 'AccessDenied')
+
+ oidc_remove=iam_client.delete_open_id_connect_provider(
+ OpenIDConnectProviderArn=oidc_arn
+ )
+
+@attr(resource='assume role with web identity')
+@attr(method='put')
+@attr(operation='check')
+@attr(assertion='checking op when bucket policy has role arn')
+@attr('webidentity_test')
+@attr('session_policy')
+def test_session_policy_bucket_policy_role_arn():
+ check_webidentity()
+ iam_client=get_iam_client()
+ sts_client=get_sts_client()
+ default_endpoint=get_config_endpoint()
+ role_session_name=get_parameter_name()
+ thumbprint=get_thumbprint()
+ aud=get_aud()
+ token=get_token()
+ realm=get_realm_name()
+
+ url = 'http://localhost:8080/auth/realms/{}'.format(realm)
+ thumbprintlist = [thumbprint]
+ (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
+ if oidc_error is not None:
+ raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
+
+ policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
+ (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
+ eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
+ role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}"
+
+ (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
+ eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+
+ s3client_iamcreds = get_s3_client_using_iam_creds()
+ bucket_name_1 = 'test1'
+ s3bucket = s3client_iamcreds.create_bucket(Bucket=bucket_name_1)
+ eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
+
+ resource1 = "arn:aws:s3:::" + bucket_name_1
+ resource2 = "arn:aws:s3:::" + bucket_name_1 + "/*"
+ rolearn = "arn:aws:iam:::role/" + general_role_name
+ bucket_policy = json.dumps(
+ {
+ "Version": "2012-10-17",
+ "Statement": [{
+ "Effect": "Allow",
+ "Principal": {"AWS": "{}".format(rolearn)},
+ "Action": ["s3:GetObject","s3:PutObject"],
+ "Resource": [
+ "{}".format(resource1),
+ "{}".format(resource2)
+ ]
+ }]
+ })
+ s3client_iamcreds.put_bucket_policy(Bucket=bucket_name_1, Policy=bucket_policy)
+ session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
+
+ resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
+ eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+
+ s3_client = boto3.client('s3',
+ aws_access_key_id = resp['Credentials']['AccessKeyId'],
+ aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
+ aws_session_token = resp['Credentials']['SessionToken'],
+ endpoint_url=default_endpoint,
+ region_name='',
+ )
+ bucket_body = 'this is a test file'
+ s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
+ eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200)
+
+ try:
+ obj = s3_client.get_object(Bucket=bucket_name_1, Key="test-1.txt")
+ except ClientError as e:
+ s3object_error = e.response.get("Error", {}).get("Code")
+ eq(s3object_error, 'AccessDenied')
+
+ oidc_remove=iam_client.delete_open_id_connect_provider(
+ OpenIDConnectProviderArn=oidc_arn
+ )
+
+@attr(resource='assume role with web identity')
+@attr(method='get')
+@attr(operation='check')
+@attr(assertion='checking op when bucket policy has session arn')
+@attr('webidentity_test')
+@attr('session_policy')
+def test_session_policy_bucket_policy_session_arn():
+ check_webidentity()
+ iam_client=get_iam_client()
+ sts_client=get_sts_client()
+ default_endpoint=get_config_endpoint()
+ role_session_name=get_parameter_name()
+ thumbprint=get_thumbprint()
+ aud=get_aud()
+ token=get_token()
+ realm=get_realm_name()
+
+ url = 'http://localhost:8080/auth/realms/{}'.format(realm)
+ thumbprintlist = [thumbprint]
+ (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
+ if oidc_error is not None:
+ raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
+
+ policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
+ (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
+ eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
+ role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}"
+
+ (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
+ eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+
+ s3client_iamcreds = get_s3_client_using_iam_creds()
+ bucket_name_1 = 'test1'
+ s3bucket = s3client_iamcreds.create_bucket(Bucket=bucket_name_1)
+ eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
+
+ resource1 = "arn:aws:s3:::" + bucket_name_1
+ resource2 = "arn:aws:s3:::" + bucket_name_1 + "/*"
+ rolesessionarn = "arn:aws:iam:::assumed-role/" + general_role_name + "/" + role_session_name
+ bucket_policy = json.dumps(
+ {
+ "Version": "2012-10-17",
+ "Statement": [{
+ "Effect": "Allow",
+ "Principal": {"AWS": "{}".format(rolesessionarn)},
+ "Action": ["s3:GetObject","s3:PutObject"],
+ "Resource": [
+ "{}".format(resource1),
+ "{}".format(resource2)
+ ]
+ }]
+ })
+ s3client_iamcreds.put_bucket_policy(Bucket=bucket_name_1, Policy=bucket_policy)
+ session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
+
+ resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
+ eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+
+ s3_client = boto3.client('s3',
+ aws_access_key_id = resp['Credentials']['AccessKeyId'],
+ aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
+ aws_session_token = resp['Credentials']['SessionToken'],
+ endpoint_url=default_endpoint,
+ region_name='',
+ )
+ bucket_body = 'this is a test file'
+ s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
+ eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200)
+
+
+ s3_get_obj = s3_client.get_object(Bucket=bucket_name_1, Key="test-1.txt")
+ eq(s3_get_obj['ResponseMetadata']['HTTPStatusCode'],200)
+
+ oidc_remove=iam_client.delete_open_id_connect_provider(
+ OpenIDConnectProviderArn=oidc_arn
+ )
+
+@attr(resource='assume role with web identity')
+@attr(method='put')
+@attr(operation='check')
+@attr(assertion='checking copy object op with role, session and bucket policy')
+@attr('webidentity_test')
+@attr('session_policy')
+def test_session_policy_copy_object():
+ check_webidentity()
+ iam_client=get_iam_client()
+ sts_client=get_sts_client()
+ default_endpoint=get_config_endpoint()
+ role_session_name=get_parameter_name()
+ thumbprint=get_thumbprint()
+ aud=get_aud()
+ token=get_token()
+ realm=get_realm_name()
+
+ url = 'http://localhost:8080/auth/realms/{}'.format(realm)
+ thumbprintlist = [thumbprint]
+ (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
+ if oidc_error is not None:
+ raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
+
+ policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
+ (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
+ eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
+ role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}"
+
+ (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
+ eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+
+ s3client_iamcreds = get_s3_client_using_iam_creds()
+ bucket_name_1 = 'test1'
+ s3bucket = s3client_iamcreds.create_bucket(Bucket=bucket_name_1)
+ eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
+
+ resource1 = "arn:aws:s3:::" + bucket_name_1
+ resource2 = "arn:aws:s3:::" + bucket_name_1 + "/*"
+ rolesessionarn = "arn:aws:iam:::assumed-role/" + general_role_name + "/" + role_session_name
+ print (rolesessionarn)
+ bucket_policy = json.dumps(
+ {
+ "Version": "2012-10-17",
+ "Statement": [{
+ "Effect": "Allow",
+ "Principal": {"AWS": "{}".format(rolesessionarn)},
+ "Action": ["s3:GetObject","s3:PutObject"],
+ "Resource": [
+ "{}".format(resource1),
+ "{}".format(resource2)
+ ]
+ }]
+ })
+ s3client_iamcreds.put_bucket_policy(Bucket=bucket_name_1, Policy=bucket_policy)
+ session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
+
+ resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
+ eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+
+ s3_client = boto3.client('s3',
+ aws_access_key_id = resp['Credentials']['AccessKeyId'],
+ aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
+ aws_session_token = resp['Credentials']['SessionToken'],
+ endpoint_url=default_endpoint,
+ region_name='',
+ )
+ bucket_body = 'this is a test file'
+ s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
+ eq(s3_put_obj['ResponseMetadata']['HTTPStatusCode'],200)
+
+ copy_source = {
+ 'Bucket': bucket_name_1,
+ 'Key': 'test-1.txt'
+ }
+
+ s3_client.copy(copy_source, bucket_name_1, "test-2.txt")
+
+ s3_get_obj = s3_client.get_object(Bucket=bucket_name_1, Key="test-2.txt")
+ eq(s3_get_obj['ResponseMetadata']['HTTPStatusCode'],200)
+
+ oidc_remove=iam_client.delete_open_id_connect_provider(
+ OpenIDConnectProviderArn=oidc_arn
+ )
+
+@attr(resource='assume role with web identity')
+@attr(method='put')
+@attr(operation='check')
+@attr(assertion='checking op is denied when no role policy')
+@attr('webidentity_test')
+@attr('session_policy')
+def test_session_policy_no_bucket_role_policy():
+ check_webidentity()
+ iam_client=get_iam_client()
+ sts_client=get_sts_client()
+ default_endpoint=get_config_endpoint()
+ role_session_name=get_parameter_name()
+ thumbprint=get_thumbprint()
+ aud=get_aud()
+ token=get_token()
+ realm=get_realm_name()
+
+ url = 'http://localhost:8080/auth/realms/{}'.format(realm)
+ thumbprintlist = [thumbprint]
+ (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
+ if oidc_error is not None:
+ raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
+
+ policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
+ (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
+ eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
+
+ s3client_iamcreds = get_s3_client_using_iam_creds()
+ bucket_name_1 = 'test1'
+ s3bucket = s3client_iamcreds.create_bucket(Bucket=bucket_name_1)
+ eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
+
+ session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\",\"s3:GetObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
+
+ resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
+ eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+
+ s3_client = boto3.client('s3',
+ aws_access_key_id = resp['Credentials']['AccessKeyId'],
+ aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
+ aws_session_token = resp['Credentials']['SessionToken'],
+ endpoint_url=default_endpoint,
+ region_name='',
+ )
+ bucket_body = 'this is a test file'
+ try:
+ s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
+ except ClientError as e:
+ s3putobj_error = e.response.get("Error", {}).get("Code")
+ eq(s3putobj_error, 'AccessDenied')
+
+ oidc_remove=iam_client.delete_open_id_connect_provider(
+ OpenIDConnectProviderArn=oidc_arn
+ )
+
+@attr(resource='assume role with web identity')
+@attr(method='put')
+@attr(operation='check')
+@attr(assertion='checking op is denied when resource policy denies')
+@attr('webidentity_test')
+@attr('session_policy')
+def test_session_policy_bucket_policy_deny():
+ check_webidentity()
+ iam_client=get_iam_client()
+ sts_client=get_sts_client()
+ default_endpoint=get_config_endpoint()
+ role_session_name=get_parameter_name()
+ thumbprint=get_thumbprint()
+ aud=get_aud()
+ token=get_token()
+ realm=get_realm_name()
+
+ url = 'http://localhost:8080/auth/realms/{}'.format(realm)
+ thumbprintlist = [thumbprint]
+ (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist)
+ if oidc_error is not None:
+ raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error))
+
+ policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
+ (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
+ eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
+ role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}"
+
+ (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
+ eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+
+ s3client_iamcreds = get_s3_client_using_iam_creds()
+ bucket_name_1 = 'test1'
+ s3bucket = s3client_iamcreds.create_bucket(Bucket=bucket_name_1)
+ eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
+
+ resource1 = "arn:aws:s3:::" + bucket_name_1
+ resource2 = "arn:aws:s3:::" + bucket_name_1 + "/*"
+ rolesessionarn = "arn:aws:iam:::assumed-role/" + general_role_name + "/" + role_session_name
+ bucket_policy = json.dumps(
+ {
+ "Version": "2012-10-17",
+ "Statement": [{
+ "Effect": "Deny",
+ "Principal": {"AWS": "{}".format(rolesessionarn)},
+ "Action": ["s3:GetObject","s3:PutObject"],
+ "Resource": [
+ "{}".format(resource1),
+ "{}".format(resource2)
+ ]
+ }]
+ })
+ s3client_iamcreds.put_bucket_policy(Bucket=bucket_name_1, Policy=bucket_policy)
+ session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
+
+ resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
+ eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+
+ s3_client = boto3.client('s3',
+ aws_access_key_id = resp['Credentials']['AccessKeyId'],
+ aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
+ aws_session_token = resp['Credentials']['SessionToken'],
+ endpoint_url=default_endpoint,
+ region_name='',
+ )
+ bucket_body = 'this is a test file'
+
+ try:
+ s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
+ except ClientError as e:
+ s3putobj_error = e.response.get("Error", {}).get("Code")
+ eq(s3putobj_error, 'AccessDenied')
+
+ oidc_remove=iam_client.delete_open_id_connect_provider(
+ OpenIDConnectProviderArn=oidc_arn
+ )