]> git-server-git.apps.pok.os.sepia.ceph.com Git - s3-tests.git/commitdiff
iam: move iam_root, iam_alt_root fixtures to iam.py
authorCasey Bodley <cbodley@redhat.com>
Tue, 12 Mar 2024 19:47:51 +0000 (15:47 -0400)
committerCasey Bodley <cbodley@redhat.com>
Tue, 12 Mar 2024 19:47:51 +0000 (15:47 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
s3tests_boto3/functional/iam.py [new file with mode: 0644]
s3tests_boto3/functional/test_iam.py

diff --git a/s3tests_boto3/functional/iam.py b/s3tests_boto3/functional/iam.py
new file mode 100644 (file)
index 0000000..a070e5d
--- /dev/null
@@ -0,0 +1,199 @@
+from botocore.exceptions import ClientError
+import pytest
+
+from . import (
+    configfile,
+    get_iam_root_client,
+    get_iam_root_user_id,
+    get_iam_root_email,
+    get_iam_alt_root_client,
+    get_iam_alt_root_user_id,
+    get_iam_alt_root_email,
+    get_iam_path_prefix,
+)
+
+def nuke_user_keys(client, name):
+    p = client.get_paginator('list_access_keys')
+    for response in p.paginate(UserName=name):
+        for key in response['AccessKeyMetadata']:
+            try:
+                client.delete_access_key(UserName=name, AccessKeyId=key['AccessKeyId'])
+            except:
+                pass
+
+def nuke_user_policies(client, name):
+    p = client.get_paginator('list_user_policies')
+    for response in p.paginate(UserName=name):
+        for policy in response['PolicyNames']:
+            try:
+                client.delete_user_policy(UserName=name, PolicyName=policy)
+            except:
+                pass
+
+def nuke_attached_user_policies(client, name):
+    p = client.get_paginator('list_attached_user_policies')
+    for response in p.paginate(UserName=name):
+        for policy in response['AttachedPolicies']:
+            try:
+                client.detach_user_policy(UserName=name, PolicyArn=policy['PolicyArn'])
+            except:
+                pass
+
+def nuke_user(client, name):
+    # delete access keys, user policies, etc
+    try:
+        nuke_user_keys(client, name)
+    except:
+        pass
+    try:
+        nuke_user_policies(client, name)
+    except:
+        pass
+    try:
+        nuke_attached_user_policies(client, name)
+    except:
+        pass
+    client.delete_user(UserName=name)
+
+def nuke_users(client, **kwargs):
+    p = client.get_paginator('list_users')
+    for response in p.paginate(**kwargs):
+        for user in response['Users']:
+            try:
+                nuke_user(client, user['UserName'])
+            except:
+                pass
+
+def nuke_group_policies(client, name):
+    p = client.get_paginator('list_group_policies')
+    for response in p.paginate(GroupName=name):
+        for policy in response['PolicyNames']:
+            try:
+                client.delete_group_policy(GroupName=name, PolicyName=policy)
+            except:
+                pass
+
+def nuke_attached_group_policies(client, name):
+    p = client.get_paginator('list_attached_group_policies')
+    for response in p.paginate(GroupName=name):
+        for policy in response['AttachedPolicies']:
+            try:
+                client.detach_group_policy(GroupName=name, PolicyArn=policy['PolicyArn'])
+            except:
+                pass
+
+def nuke_group_users(client, name):
+    p = client.get_paginator('get_group')
+    for response in p.paginate(GroupName=name):
+        for user in response['Users']:
+            try:
+                client.remove_user_from_group(GroupName=name, UserName=user['UserName'])
+            except:
+                pass
+
+def nuke_group(client, name):
+    # delete group policies and remove all users
+    try:
+        nuke_group_policies(client, name)
+    except:
+        pass
+    try:
+        nuke_attached_group_policies(client, name)
+    except:
+        pass
+    try:
+        nuke_group_users(client, name)
+    except:
+        pass
+    client.delete_group(GroupName=name)
+
+def nuke_groups(client, **kwargs):
+    p = client.get_paginator('list_groups')
+    for response in p.paginate(**kwargs):
+        for user in response['Groups']:
+            try:
+                nuke_group(client, user['GroupName'])
+            except:
+                pass
+
+def nuke_role_policies(client, name):
+    p = client.get_paginator('list_role_policies')
+    for response in p.paginate(RoleName=name):
+        for policy in response['PolicyNames']:
+            try:
+                client.delete_role_policy(RoleName=name, PolicyName=policy)
+            except:
+                pass
+
+def nuke_attached_role_policies(client, name):
+    p = client.get_paginator('list_attached_role_policies')
+    for response in p.paginate(RoleName=name):
+        for policy in response['AttachedPolicies']:
+            try:
+                client.detach_role_policy(RoleName=name, PolicyArn=policy['PolicyArn'])
+            except:
+                pass
+
+def nuke_role(client, name):
+    # delete role policies, etc
+    try:
+        nuke_role_policies(client, name)
+    except:
+        pass
+    try:
+        nuke_attached_role_policies(client, name)
+    except:
+        pass
+    client.delete_role(RoleName=name)
+
+def nuke_roles(client, **kwargs):
+    p = client.get_paginator('list_roles')
+    for response in p.paginate(**kwargs):
+        for role in response['Roles']:
+            try:
+                nuke_role(client, role['RoleName'])
+            except:
+                pass
+
+def nuke_oidc_providers(client, prefix):
+    result = client.list_open_id_connect_providers()
+    for provider in result['OpenIDConnectProviderList']:
+        arn = provider['Arn']
+        if f':oidc-provider{prefix}' in arn:
+            try:
+                client.delete_open_id_connect_provider(OpenIDConnectProviderArn=arn)
+            except:
+                pass
+
+
+# fixture for iam account root user
+@pytest.fixture
+def iam_root(configfile):
+    client = get_iam_root_client()
+    try:
+        arn = client.get_user()['User']['Arn']
+        if not arn.endswith(':root'):
+            pytest.skip('[iam root] user does not have :root arn')
+    except ClientError as e:
+        pytest.skip('[iam root] user does not belong to an account')
+
+    yield client
+    nuke_users(client, PathPrefix=get_iam_path_prefix())
+    nuke_groups(client, PathPrefix=get_iam_path_prefix())
+    nuke_roles(client, PathPrefix=get_iam_path_prefix())
+    nuke_oidc_providers(client, get_iam_path_prefix())
+
+# fixture for iam alt account root user
+@pytest.fixture
+def iam_alt_root(configfile):
+    client = get_iam_alt_root_client()
+    try:
+        arn = client.get_user()['User']['Arn']
+        if not arn.endswith(':root'):
+            pytest.skip('[iam alt root] user does not have :root arn')
+    except ClientError as e:
+        pytest.skip('[iam alt root] user does not belong to an account')
+
+    yield client
+    nuke_users(client, PathPrefix=get_iam_path_prefix())
+    nuke_roles(client, PathPrefix=get_iam_path_prefix())
index 101cfb8fdd4d638d824227923d01feaad73f9f67..fb288cea2496a4e90deafc7d4a3ab66b15f8eb1a 100644 (file)
@@ -13,8 +13,6 @@ from . import (
     get_alt_client,
     get_iam_client,
     get_iam_root_client,
-    get_iam_root_user_id,
-    get_iam_root_email,
     get_iam_alt_root_client,
     get_iam_alt_root_user_id,
     get_iam_alt_root_email,
@@ -28,6 +26,7 @@ from . import (
     get_sts_client,
 )
 from .utils import _get_status, _get_status_and_error_code
+from .iam import iam_root, iam_alt_root
 
 
 @pytest.mark.user_policy
@@ -876,178 +875,6 @@ def test_verify_allow_iam_actions():
     assert response['ResponseMetadata']['HTTPStatusCode'] == 200
 
 
-def nuke_user_keys(client, name):
-    p = client.get_paginator('list_access_keys')
-    for response in p.paginate(UserName=name):
-        for key in response['AccessKeyMetadata']:
-            try:
-                client.delete_access_key(UserName=name, AccessKeyId=key['AccessKeyId'])
-            except:
-                pass
-
-def nuke_user_policies(client, name):
-    p = client.get_paginator('list_user_policies')
-    for response in p.paginate(UserName=name):
-        for policy in response['PolicyNames']:
-            try:
-                client.delete_user_policy(UserName=name, PolicyName=policy)
-            except:
-                pass
-
-def nuke_attached_user_policies(client, name):
-    p = client.get_paginator('list_attached_user_policies')
-    for response in p.paginate(UserName=name):
-        for policy in response['AttachedPolicies']:
-            try:
-                client.detach_user_policy(UserName=name, PolicyArn=policy['PolicyArn'])
-            except:
-                pass
-
-def nuke_user(client, name):
-    # delete access keys, user policies, etc
-    try:
-        nuke_user_keys(client, name)
-    except:
-        pass
-    try:
-        nuke_user_policies(client, name)
-    except:
-        pass
-    try:
-        nuke_attached_user_policies(client, name)
-    except:
-        pass
-    client.delete_user(UserName=name)
-
-def nuke_users(client, **kwargs):
-    p = client.get_paginator('list_users')
-    for response in p.paginate(**kwargs):
-        for user in response['Users']:
-            try:
-                nuke_user(client, user['UserName'])
-            except:
-                pass
-
-def nuke_group_policies(client, name):
-    p = client.get_paginator('list_group_policies')
-    for response in p.paginate(GroupName=name):
-        for policy in response['PolicyNames']:
-            try:
-                client.delete_group_policy(GroupName=name, PolicyName=policy)
-            except:
-                pass
-
-def nuke_attached_group_policies(client, name):
-    p = client.get_paginator('list_attached_group_policies')
-    for response in p.paginate(GroupName=name):
-        for policy in response['AttachedPolicies']:
-            try:
-                client.detach_group_policy(GroupName=name, PolicyArn=policy['PolicyArn'])
-            except:
-                pass
-
-def nuke_group_users(client, name):
-    p = client.get_paginator('get_group')
-    for response in p.paginate(GroupName=name):
-        for user in response['Users']:
-            try:
-                client.remove_user_from_group(GroupName=name, UserName=user['UserName'])
-            except:
-                pass
-
-def nuke_group(client, name):
-    # delete group policies and remove all users
-    try:
-        nuke_group_policies(client, name)
-    except:
-        pass
-    try:
-        nuke_attached_group_policies(client, name)
-    except:
-        pass
-    try:
-        nuke_group_users(client, name)
-    except:
-        pass
-    client.delete_group(GroupName=name)
-
-def nuke_groups(client, **kwargs):
-    p = client.get_paginator('list_groups')
-    for response in p.paginate(**kwargs):
-        for user in response['Groups']:
-            try:
-                nuke_group(client, user['GroupName'])
-            except:
-                pass
-
-def nuke_role_policies(client, name):
-    p = client.get_paginator('list_role_policies')
-    for response in p.paginate(RoleName=name):
-        for policy in response['PolicyNames']:
-            try:
-                client.delete_role_policy(RoleName=name, PolicyName=policy)
-            except:
-                pass
-
-def nuke_attached_role_policies(client, name):
-    p = client.get_paginator('list_attached_role_policies')
-    for response in p.paginate(RoleName=name):
-        for policy in response['AttachedPolicies']:
-            try:
-                client.detach_role_policy(RoleName=name, PolicyArn=policy['PolicyArn'])
-            except:
-                pass
-
-def nuke_role(client, name):
-    # delete role policies, etc
-    try:
-        nuke_role_policies(client, name)
-    except:
-        pass
-    try:
-        nuke_attached_role_policies(client, name)
-    except:
-        pass
-    client.delete_role(RoleName=name)
-
-def nuke_roles(client, **kwargs):
-    p = client.get_paginator('list_roles')
-    for response in p.paginate(**kwargs):
-        for role in response['Roles']:
-            try:
-                nuke_role(client, role['RoleName'])
-            except:
-                pass
-
-def nuke_oidc_providers(client, prefix):
-    result = client.list_open_id_connect_providers()
-    for provider in result['OpenIDConnectProviderList']:
-        arn = provider['Arn']
-        if f':oidc-provider{prefix}' in arn:
-            try:
-                client.delete_open_id_connect_provider(OpenIDConnectProviderArn=arn)
-            except:
-                pass
-
-
-# fixture for iam account root user
-@pytest.fixture
-def iam_root(configfile):
-    client = get_iam_root_client()
-    try:
-        arn = client.get_user()['User']['Arn']
-        if not arn.endswith(':root'):
-            pytest.skip('[iam root] user does not have :root arn')
-    except ClientError as e:
-        pytest.skip('[iam root] user does not belong to an account')
-
-    yield client
-    nuke_users(client, PathPrefix=get_iam_path_prefix())
-    nuke_groups(client, PathPrefix=get_iam_path_prefix())
-    nuke_roles(client, PathPrefix=get_iam_path_prefix())
-    nuke_oidc_providers(client, get_iam_path_prefix())
-
-
 # IAM User apis
 @pytest.mark.iam_account
 @pytest.mark.iam_user
@@ -2618,22 +2445,6 @@ def test_account_oidc_provider(iam_root):
         iam_root.delete_open_id_connect_provider(OpenIDConnectProviderArn=arn)
 
 
-# fixture for iam alt account root user
-@pytest.fixture
-def iam_alt_root(configfile):
-    client = get_iam_alt_root_client()
-    try:
-        arn = client.get_user()['User']['Arn']
-        if not arn.endswith(':root'):
-            pytest.skip('[iam alt root] user does not have :root arn')
-    except ClientError as e:
-        pytest.skip('[iam alt root] user does not belong to an account')
-
-    yield client
-    nuke_users(client, PathPrefix=get_iam_path_prefix())
-    nuke_roles(client, PathPrefix=get_iam_path_prefix())
-
-
 # test cross-account access, adding user policy before the bucket policy
 def _test_cross_account_user_bucket_policy(roots3, alt_root, alt_name, alt_arn):
     # add a user policy that allows s3 actions