Webidentity Test addition to test_sts.py 360/head
authorroot <root@localhost.localdomain>
Thu, 24 Sep 2020 10:04:09 +0000 (15:34 +0530)
committerroot <root@localhost.localdomain>
Fri, 30 Oct 2020 18:45:42 +0000 (00:15 +0530)
Signed-off-by: Kalpesh Pandya <kapandya@redhat.com>
Few main changes/additions:
1. Webidentity test addition to test_sts.py.
2. A function named check_webidentity() added to __init__.py in order to check for section presence.
3. Few lines shifted from setup() to get_iam_client() to make them execute only when sts-tests run.
4. Documentation update (for sts section)
5. Changes in s3tests.conf.SAMPLE regarding sts sections

README.rst
s3tests.conf.SAMPLE
s3tests_boto3/functional/__init__.py
s3tests_boto3/functional/test_sts.py

index 96e0bbeb5546703152156f08f681d8475a17d0c4..c36a8a7203afa9c2ea284cc66f19011ab565cd9e 100644 (file)
@@ -58,13 +58,16 @@ You can run only the boto3 tests with::
  STS compatibility tests
 ========================
 
-This section contains some basic tests for the AssumeRole and GetSessionToken API's. The test file is located under ``s3tests_boto3/functional``.
+This section contains some basic tests for the AssumeRole, GetSessionToken and AssumeRoleWithWebIdentity API's. The test file is located under ``s3tests_boto3/functional``.
 
-You can run only the sts tests with::
+You can run only the sts tests (all the three API's) with::
 
         S3TEST_CONF=your.conf ./virtualenv/bin/nosetests s3tests_boto3.functional.test_sts
 
-You can filter tests based on the attributes. There is a attribute named ``sts_test`` to run specifically the sts tests as below::
+You can filter tests based on the attributes. There is a attribute named ``test_of_sts`` to run AssumeRole and GetSessionToken tests and ``webidentity_test`` to run the AssumeRoleWithWebIdentity tests. If you want to execute only ``test_of_sts`` tests you can apply that filter as below::
 
-        S3TEST_CONF=your.conf ./virtualenv/bin/nosetests -v -s -A 'sts_test' s3tests_boto3.functional.test_sts
+        S3TEST_CONF=your.conf ./virtualenv/bin/nosetests -v -s -A 'test_of_sts' s3tests_boto3.functional.test_sts
 
+For running ``webidentity_test`` you'll need have Keycloak running.
+
+In order to run any STS test you'll need to add "iam" section to the config file. For further reference on how your config file should look check ``s3tests.conf.SAMPLE``.
index 39c869415d8c5e87927f821e07f25ff0a1b850e1..828ea902217a6286b5ec2ff1c783ecda05fe5b78 100644 (file)
@@ -69,6 +69,7 @@ secret_key = opqrstuvwxyzabcdefghijklmnopqrstuvwxyzab
 # tenant email set in vstart.sh
 email = tenanteduser@example.com
 
+#following section needs to be added for all sts-tests
 [iam]
 #used for iam operations in sts-tests
 #user_id from vstart.sh
@@ -82,3 +83,15 @@ secret_key = abcdefghijklmnopqrstuvwxyzabcdefghijklmn
 
 #display_name from vstart.sh
 display_name = youruseridhere
+
+#following section needs to be added when you want to run Assume Role With Webidentity test
+[webidentity]
+#used for assume role with web identity test in sts-tests
+#all parameters will be obtained from ceph/qa/tasks/keycloak.py
+token=<access_token>
+
+aud=<obtained after introspecting token>
+
+thumbprint=<obtained from x509 certificate>
+
+KC_REALM=<name of the realm>
index 88831523e4fe50160d9449a223c75de5a301bab4..92bc3d511c3ab1cb42529cf880d5007788a0929c 100644 (file)
@@ -158,8 +158,6 @@ def setup():
         raise RuntimeError('Your config file is missing the "s3 alt" section!')
     if not cfg.has_section("s3 tenant"):
         raise RuntimeError('Your config file is missing the "s3 tenant" section!')
-    if not cfg.has_section("iam"):
-        raise RuntimeError('Your config file is missing the "iam" section!')
 
     global prefix
 
@@ -207,12 +205,6 @@ def setup():
     config.tenant_user_id = cfg.get('s3 tenant',"user_id")
     config.tenant_email = cfg.get('s3 tenant',"email")
 
-    config.iam_access_key = cfg.get('iam',"access_key")
-    config.iam_secret_key = cfg.get('iam',"secret_key")
-    config.iam_display_name = cfg.get('iam',"display_name")
-    config.iam_user_id = cfg.get('iam',"user_id")
-    #config.iam_email = cfg.get('iam',"email")
-
     # vars from the fixtures section
     try:
         template = cfg.get('fixtures', "bucket prefix")
@@ -233,6 +225,24 @@ def teardown():
     nuke_prefixed_buckets(prefix=prefix, client=alt_client)
     nuke_prefixed_buckets(prefix=prefix, client=tenant_client)
 
+def check_webidentity():
+    cfg = configparser.RawConfigParser()
+    try:
+        path = os.environ['S3TEST_CONF']
+    except KeyError:
+        raise RuntimeError(
+            'To run tests, point environment '
+            + 'variable S3TEST_CONF to a config file.',
+            )
+    cfg.read(path)
+    if not cfg.has_section("webidentity"):
+        raise RuntimeError('Your config file is missing the "webidentity" section!')
+
+    config.webidentity_thumbprint = cfg.get('webidentity', "thumbprint")
+    config.webidentity_aud = cfg.get('webidentity', "aud")
+    config.webidentity_token = cfg.get('webidentity', "token")
+    config.webidentity_realm = cfg.get('webidentity', "KC_REALM")
+
 def get_client(client_config=None):
     if client_config == None:
         client_config = Config(signature_version='s3v4')
@@ -268,6 +278,24 @@ def get_sts_client(client_config=None):
     return client
 
 def get_iam_client(client_config=None):
+    cfg = configparser.RawConfigParser()
+    try:
+        path = os.environ['S3TEST_CONF']
+    except KeyError:
+        raise RuntimeError(
+            'To run tests, point environment '
+            + 'variable S3TEST_CONF to a config file.',
+            )
+    cfg.read(path)
+    if not cfg.has_section("iam"):
+        raise RuntimeError('Your config file is missing the "iam" section!')
+
+    config.iam_access_key = cfg.get('iam',"access_key")
+    config.iam_secret_key = cfg.get('iam',"secret_key")
+    config.iam_display_name = cfg.get('iam',"display_name")
+    config.iam_user_id = cfg.get('iam',"user_id")
+    config.iam_email = cfg.get('iam',"email")    
+
     if client_config == None:
         client_config = Config(signature_version='s3v4')
     
@@ -474,3 +502,15 @@ def get_tenant_user_id():
 
 def get_tenant_email():
     return config.tenant_email
+
+def get_thumbprint():
+    return config.webidentity_thumbprint
+
+def get_aud():
+    return config.webidentity_aud
+
+def get_token():
+    return config.webidentity_token
+
+def get_realm_name():
+    return config.webidentity_realm
index 4799f144e833f7ef74b174dd977989a36f36c0d3..fb4ab11cea1f2df276c4d6fd660056e068eff33a 100644 (file)
@@ -41,8 +41,15 @@ from . import(
     get_parameter_name,
     get_main_aws_access_key,
     get_main_aws_secret_key,
+    get_thumbprint,
+    get_aud,
+    get_token,
+    get_realm_name,
+    check_webidentity
     )
 
+log = logging.getLogger(__name__)
+
 def create_role(iam_client,path,rolename,policy_document,description,sessionduration,permissionboundary):
     role_err=None
     if rolename is None:
@@ -77,17 +84,20 @@ def put_user_policy(iam_client,username,policyname,policy_document):
 @attr(method='get')
 @attr(operation='check')
 @attr(assertion='s3 ops only accessible by temporary credentials')
-@attr('sts_test')
+@attr('test_of_sts')
 def test_get_session_token():
     iam_client=get_iam_client()
     sts_client=get_sts_client()
     sts_user_id=get_alt_user_id()
     default_endpoint=get_config_endpoint()
+    
     user_policy = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":[\"*\"],\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}},{\"Effect\":\"Allow\",\"Action\":\"sts:GetSessionToken\",\"Resource\":\"*\",\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}}]}"
     (resp_err,resp)=put_user_policy(iam_client,sts_user_id,None,user_policy)
     eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+    
     response=sts_client.get_session_token()
     eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+    
     s3_client=boto3.client('s3',
                 aws_access_key_id = response['Credentials']['AccessKeyId'],
                aws_secret_access_key = response['Credentials']['SecretAccessKey'],
@@ -104,7 +114,7 @@ def test_get_session_token():
 @attr(method='get')
 @attr(operation='check')
 @attr(assertion='s3 ops denied by permanent credentials')
-@attr('sts_test')
+@attr('test_of_sts')
 def test_get_session_token_permanent_creds_denied():
     s3bucket_error=None
     iam_client=get_iam_client()
@@ -113,11 +123,14 @@ def test_get_session_token_permanent_creds_denied():
     default_endpoint=get_config_endpoint()
     s3_main_access_key=get_main_aws_access_key()
     s3_main_secret_key=get_main_aws_secret_key()
+    
     user_policy = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":[\"*\"],\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}},{\"Effect\":\"Allow\",\"Action\":\"sts:GetSessionToken\",\"Resource\":\"*\",\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}}]}"
     (resp_err,resp)=put_user_policy(iam_client,sts_user_id,None,user_policy)
     eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+    
     response=sts_client.get_session_token()
     eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+    
     s3_client=boto3.client('s3',
                 aws_access_key_id = s3_main_access_key,
                aws_secret_access_key = s3_main_secret_key,
@@ -136,21 +149,25 @@ def test_get_session_token_permanent_creds_denied():
 @attr(method='get')
 @attr(operation='check')
 @attr(assertion='role policy allows all s3 ops')
-@attr('sts_test')
+@attr('test_of_sts')
 def test_assume_role_allow():
     iam_client=get_iam_client()    
     sts_client=get_sts_client()
     sts_user_id=get_alt_user_id()
     default_endpoint=get_config_endpoint()
     role_session_name=get_parameter_name()
+    
     policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
     (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
     eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
+    
     role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
     (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
     eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+    
     resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name)
     eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+    
     s3_client = boto3.client('s3',
                aws_access_key_id = resp['Credentials']['AccessKeyId'],
                aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
@@ -168,7 +185,7 @@ def test_assume_role_allow():
 @attr(method='get')
 @attr(operation='check')
 @attr(assertion='role policy denies all s3 ops')
-@attr('sts_test')
+@attr('test_of_sts')
 def test_assume_role_deny():
     s3bucket_error=None
     iam_client=get_iam_client()    
@@ -176,14 +193,18 @@ def test_assume_role_deny():
     sts_user_id=get_alt_user_id()
     default_endpoint=get_config_endpoint()
     role_session_name=get_parameter_name()
+    
     policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
     (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
     eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
+    
     role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
     (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
     eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+    
     resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name)
     eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+    
     s3_client = boto3.client('s3',
                aws_access_key_id = resp['Credentials']['AccessKeyId'],
                aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
@@ -202,22 +223,26 @@ def test_assume_role_deny():
 @attr(method='get')
 @attr(operation='check')
 @attr(assertion='creds expire so all s3 ops fails')
-@attr('sts_test')
+@attr('test_of_sts')
 def test_assume_role_creds_expiry():
     iam_client=get_iam_client()    
     sts_client=get_sts_client()
     sts_user_id=get_alt_user_id()
     default_endpoint=get_config_endpoint()
     role_session_name=get_parameter_name()
+    
     policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
     (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
     eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
+    
     role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
     (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
     eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+    
     resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,DurationSeconds=900)
     eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
     time.sleep(900)
+    
     s3_client = boto3.client('s3',
                aws_access_key_id = resp['Credentials']['AccessKeyId'],
                aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
@@ -231,3 +256,101 @@ def test_assume_role_creds_expiry():
     except ClientError as e:
         s3bucket_error = e.response.get("Error", {}).get("Code")
     eq(s3bucket_error,'AccessDenied')
+
+@attr(resource='assume role with web identity')
+@attr(method='get')
+@attr(operation='check')
+@attr(assertion='assuming role through web token')
+@attr('webidentity_test')
+def test_assume_role_with_web_identity():
+    check_webidentity()
+    iam_client=get_iam_client()    
+    sts_client=get_sts_client()
+    default_endpoint=get_config_endpoint()
+    role_session_name=get_parameter_name()
+    thumbprint=get_thumbprint()
+    aud=get_aud()
+    token=get_token()
+    realm=get_realm_name()
+    
+    oidc_response = iam_client.create_open_id_connect_provider(
+    Url='http://localhost:8080/auth/realms/{}'.format(realm),
+    ThumbprintList=[
+        thumbprint,
+    ],
+    )
+    
+    policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
+    (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
+    eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
+    
+    role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
+    (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
+    eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+    
+    resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token)
+    eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+    
+    s3_client = boto3.client('s3',
+               aws_access_key_id = resp['Credentials']['AccessKeyId'],
+               aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
+               aws_session_token = resp['Credentials']['SessionToken'],
+               endpoint_url=default_endpoint,
+               region_name='',
+               )
+    bucket_name = get_new_bucket_name()
+    s3bucket = s3_client.create_bucket(Bucket=bucket_name)
+    eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
+    bkt = s3_client.delete_bucket(Bucket=bucket_name)
+    eq(bkt['ResponseMetadata']['HTTPStatusCode'],204)
+    
+    oidc_remove=iam_client.delete_open_id_connect_provider(
+    OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
+    )
+
+'''
+@attr(resource='assume role with web identity')
+@attr(method='get')
+@attr(operation='check')
+@attr(assertion='assume_role_with_web_token creds expire')
+@attr('webidentity_test')
+def test_assume_role_with_web_identity_invalid_webtoken():
+    resp_error=None
+    iam_client=get_iam_client()
+    sts_client=get_sts_client()
+    default_endpoint=get_config_endpoint()
+    role_session_name=get_parameter_name()
+    thumbprint=get_thumbprint()
+    aud=get_aud()
+    token=get_token()
+    realm=get_realm_name()
+
+    oidc_response = iam_client.create_open_id_connect_provider(
+    Url='http://localhost:8080/auth/realms/{}'.format(realm),
+    ThumbprintList=[
+        thumbprint,
+    ],
+    )
+
+    policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
+    (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
+    eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
+
+    role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
+    (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
+    eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+
+    resp=""
+    try:
+        resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken='abcdef')
+    except InvalidIdentityTokenException as e:
+        log.debug('{}'.format(resp))
+        log.debug('{}'.format(e.response.get("Error", {}).get("Code")))
+        log.debug('{}'.format(e))
+        resp_error = e.response.get("Error", {}).get("Code")
+    eq(resp_error,'AccessDenied')
+
+    oidc_remove=iam_client.delete_open_id_connect_provider(
+    OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
+    )
+'''