]> git-server-git.apps.pok.os.sepia.ceph.com Git - s3-tests.git/commitdiff
rgw/sts: GetCallerIndentity test 661/head
authorRaja Sharma <raja@ibm.com>
Thu, 26 Jun 2025 05:58:52 +0000 (11:28 +0530)
committerRaja Sharma <raja@ibm.com>
Mon, 14 Jul 2025 04:13:36 +0000 (09:43 +0530)
Signed-off-by: Raja Sharma <raja@ibm.com>
s3tests_boto3/functional/test_sts.py

index b13f56dd184147edbca93629d1891bec5fbdfcf2..06de13e32a6e44250253ea1bc08ccf6b525777be 100644 (file)
@@ -31,8 +31,11 @@ from email.header import decode_header
 from . import(
     configfile,
     setup_teardown,
+    get_iam_root_client,
     get_iam_client,
     get_sts_client,
+    get_iam_path_prefix,
+    make_iam_name,
     get_client,
     get_alt_user_id,
     get_config_endpoint,
@@ -40,6 +43,7 @@ from . import(
     get_parameter_name,
     get_main_aws_access_key,
     get_main_aws_secret_key,
+    get_iam_root_user_id,
     get_thumbprint,
     get_aud,
     get_token,
@@ -2069,3 +2073,94 @@ def test_assume_role_with_web_identity_role_resource_tag():
     oidc_remove=iam_client.delete_open_id_connect_provider(
     OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
     )
+
+def retry_on(code, tries, func, *args, **kwargs):
+  for i in range(tries):
+    try:
+      return func(*args, **kwargs)
+    except ClientError as e:
+      err = e.response['Error']['Code']
+      if i + 1 < tries and err in code:
+        print(f'Got {err}, retrying in {i}s..')
+        time.sleep(i)
+        continue
+      raise
+
+@pytest.mark.test_of_sts
+@pytest.mark.fails_on_dbstore
+def test_get_caller_identity_root():
+  root_user_id = get_iam_root_user_id()
+  user_arn = f'arn:aws:iam::{root_user_id}:root'
+  sts = get_iam_root_client(service_name='sts')
+  response = sts.get_caller_identity()
+  assert response['UserId'] == root_user_id
+  assert response['Account'] == root_user_id
+  assert response['Arn'] == user_arn
+
+@pytest.mark.test_of_sts
+@pytest.mark.fails_on_dbstore
+def test_get_caller_identity_non_root_user():
+  iam_root = get_iam_root_client()
+  path = get_iam_path_prefix()
+  user_name = make_iam_name('MyUser')
+  user = iam_root.create_user(UserName=user_name, Path=path)['User']
+  user_arn = user['Arn']
+  account_id = user_arn.removeprefix('arn:aws:iam::').removesuffix(f':user{path}{user_name}')
+  user_id = user['UserId']
+
+  key = iam_root.create_access_key(UserName=user_name)['AccessKey']
+  sts = get_sts_client(aws_access_key_id=key['AccessKeyId'],
+                        aws_secret_access_key=key['SecretAccessKey'])
+
+  user_identity = sts.get_caller_identity()
+  iam_root.delete_access_key(UserName=user_name, AccessKeyId=key['AccessKeyId'])
+  iam_root.delete_user(UserName=user_name)
+  
+  assert user_identity['UserId'] == user_id
+  assert user_identity['Account'] == account_id
+  assert user_identity['Arn'] == user_arn
+
+@pytest.mark.test_of_sts
+@pytest.mark.fails_on_dbstore
+def test_get_caller_identity_after_assume_role():
+  iam_root = get_iam_root_client()
+  path = get_iam_path_prefix()
+  user_name = make_iam_name('MyUser')
+  role_name = make_iam_name('MyRole')
+  session_name = 'MySession'
+
+  user = iam_root.create_user(UserName=user_name, Path=path)['User']
+  user_arn = user['Arn']
+  account_id = user_arn.removeprefix('arn:aws:iam::').removesuffix(f':user{path}{user_name}')
+  key = iam_root.create_access_key(UserName=user_name)['AccessKey']
+  sts = get_sts_client(aws_access_key_id=key['AccessKeyId'],
+                        aws_secret_access_key=key['SecretAccessKey'])
+  trust_policy = json.dumps({
+      'Version': '2012-10-17',
+      'Statement': [{
+          'Effect': 'Allow',
+          'Action': 'sts:AssumeRole',
+          'Principal': {'AWS': user_arn}
+          }]
+      })
+
+  role = retry_on('MalformedPolicyDocument', 10, iam_root.create_role,
+                  RoleName=role_name, Path=path, AssumeRolePolicyDocument=trust_policy)['Role']
+  role_arn = role['Arn']
+
+  assume_role = retry_on(('InvalidClientTokenId', 'AccessDenied'), 10, sts.assume_role,
+                      RoleArn=role_arn, RoleSessionName=session_name)
+  creds = assume_role['Credentials']
+
+  assumed_sts = get_sts_client(aws_access_key_id = creds['AccessKeyId'],
+                        aws_secret_access_key = creds['SecretAccessKey'],
+                        aws_session_token = creds['SessionToken'])
+  response = assumed_sts.get_caller_identity()
+  iam_root.delete_role(RoleName=role_name)
+  iam_root.delete_access_key(UserName=user_name, AccessKeyId=key['AccessKeyId'])
+  iam_root.delete_user(UserName=user_name)
+
+  assert response['UserId'] == f"{role['RoleId']}:{session_name}"
+  assert response['Account'] == account_id
+  assert response['Arn'] == assume_role['AssumedRoleUser']['Arn']
+