make_iam_name,
get_iam_path_prefix,
get_new_bucket,
+ get_new_bucket_name,
get_iam_s3client,
get_alt_iam_client,
get_alt_user_id,
except:
pass
+def nuke_user_policies(client, name):
+ p = client.get_paginator('list_user_policies')
+ for response in p.paginate(UserName=name):
+ for policy in response['PolicyNames']:
+ try:
+ client.delete_user_policy(UserName=name, PolicyName=policy)
+ except:
+ pass
+
def nuke_user(client, name):
# delete access keys, user policies, etc
try:
nuke_user_keys(client, name)
except:
pass
+ try:
+ nuke_user_policies(client, name)
+ except:
+ pass
client.delete_user(UserName=name)
def nuke_users(client, **kwargs):
retry_on('AccessDenied', 10, client.list_objects, Bucket=bucket)
finally:
roots3.delete_bucket(Bucket=bucket)
+
+
+# IAM UserPolicy apis
+@pytest.mark.user_policy
+@pytest.mark.iam_account
+def test_account_user_policy(iam_root):
+ path = get_iam_path_prefix()
+ name = make_iam_name('name')
+ policy_name = 'List'
+ bucket_name = get_new_bucket_name()
+ policy1 = json.dumps({'Version': '2012-10-17', 'Statement': [
+ {'Effect': 'Deny',
+ 'Action': 's3:ListBucket',
+ 'Resource': f'arn:aws:s3:::{bucket_name}'}]})
+ policy2 = json.dumps({'Version': '2012-10-17', 'Statement': [
+ {'Effect': 'Allow',
+ 'Action': 's3:ListBucket',
+ 'Resource': f'arn:aws:s3:::{bucket_name}'}]})
+
+ # Get/Put/Delete fail on nonexistent UserName
+ with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+ iam_root.get_user_policy(UserName=name, PolicyName=policy_name)
+ with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+ iam_root.delete_user_policy(UserName=name, PolicyName=policy_name)
+ with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+ iam_root.put_user_policy(UserName=name, PolicyName=policy_name, PolicyDocument=policy1)
+
+ iam_root.create_user(UserName=name, Path=path)
+
+ # Get/Delete fail on nonexistent PolicyName
+ with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+ iam_root.get_user_policy(UserName=name, PolicyName=policy_name)
+ with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+ iam_root.delete_user_policy(UserName=name, PolicyName=policy_name)
+
+ iam_root.put_user_policy(UserName=name, PolicyName=policy_name, PolicyDocument=policy1)
+
+ response = iam_root.get_user_policy(UserName=name, PolicyName=policy_name)
+ assert policy1 == json.dumps(response['PolicyDocument'])
+ response = iam_root.list_user_policies(UserName=name)
+ assert [policy_name] == response['PolicyNames']
+
+ iam_root.put_user_policy(UserName=name, PolicyName=policy_name, PolicyDocument=policy2)
+
+ response = iam_root.get_user_policy(UserName=name, PolicyName=policy_name)
+ assert policy2 == json.dumps(response['PolicyDocument'])
+ response = iam_root.list_user_policies(UserName=name)
+ assert [policy_name] == response['PolicyNames']
+
+ iam_root.delete_user_policy(UserName=name, PolicyName=policy_name)
+
+ # Get/Delete fail after Delete
+ with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+ iam_root.get_user_policy(UserName=name, PolicyName=policy_name)
+ with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+ iam_root.delete_user_policy(UserName=name, PolicyName=policy_name)
+
+ response = iam_root.list_user_policies(UserName=name)
+ assert [] == response['PolicyNames']
+
+@pytest.mark.user_policy
+@pytest.mark.iam_account
+def test_account_user_policy_allow(iam_root):
+ path = get_iam_path_prefix()
+ name = make_iam_name('name')
+ bucket_name = get_new_bucket_name()
+ iam_root.create_user(UserName=name, Path=path)
+
+ key = iam_root.create_access_key(UserName=name)['AccessKey']
+ client = get_iam_s3client(aws_access_key_id=key['AccessKeyId'],
+ aws_secret_access_key=key['SecretAccessKey'])
+
+ # the access key may take a bit to start working. retry until it returns
+ # something other than InvalidAccessKeyId
+ e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, client.list_buckets)
+ # expect AccessDenied because no identity policy allows s3 actions
+ status, error_code = _get_status_and_error_code(e.response)
+ assert status == 403
+ assert error_code == 'AccessDenied'
+
+ # add a user policy that allows s3 actions
+ policy = json.dumps({
+ 'Version': '2012-10-17',
+ 'Statement': [{
+ 'Effect': 'Allow',
+ 'Action': 's3:*',
+ 'Resource': '*'
+ }]
+ })
+ policy_name = 'AllowStar'
+ iam_root.put_user_policy(UserName=name, PolicyName=policy_name, PolicyDocument=policy)
+
+ # the policy may take a bit to start working. retry until it returns
+ # something other than AccessDenied
+ retry_on('AccessDenied', 10, client.list_buckets)