raise RuntimeError('Your config file is missing the "s3 alt" section!')
if not cfg.has_section("s3 tenant"):
raise RuntimeError('Your config file is missing the "s3 tenant" section!')
- if not cfg.has_section("iam"):
- raise RuntimeError('Your config file is missing the "iam" section!')
global prefix
config.tenant_user_id = cfg.get('s3 tenant',"user_id")
config.tenant_email = cfg.get('s3 tenant',"email")
- config.iam_access_key = cfg.get('iam',"access_key")
- config.iam_secret_key = cfg.get('iam',"secret_key")
- config.iam_display_name = cfg.get('iam',"display_name")
- config.iam_user_id = cfg.get('iam',"user_id")
- #config.iam_email = cfg.get('iam',"email")
-
# vars from the fixtures section
try:
template = cfg.get('fixtures', "bucket prefix")
nuke_prefixed_buckets(prefix=prefix, client=alt_client)
nuke_prefixed_buckets(prefix=prefix, client=tenant_client)
+def check_webidentity():
+ cfg = configparser.RawConfigParser()
+ try:
+ path = os.environ['S3TEST_CONF']
+ except KeyError:
+ raise RuntimeError(
+ 'To run tests, point environment '
+ + 'variable S3TEST_CONF to a config file.',
+ )
+ cfg.read(path)
+ if not cfg.has_section("webidentity"):
+ raise RuntimeError('Your config file is missing the "webidentity" section!')
+
+ config.webidentity_thumbprint = cfg.get('webidentity', "thumbprint")
+ config.webidentity_aud = cfg.get('webidentity', "aud")
+ config.webidentity_token = cfg.get('webidentity', "token")
+ config.webidentity_realm = cfg.get('webidentity', "KC_REALM")
+
def get_client(client_config=None):
if client_config == None:
client_config = Config(signature_version='s3v4')
return client
def get_iam_client(client_config=None):
+ cfg = configparser.RawConfigParser()
+ try:
+ path = os.environ['S3TEST_CONF']
+ except KeyError:
+ raise RuntimeError(
+ 'To run tests, point environment '
+ + 'variable S3TEST_CONF to a config file.',
+ )
+ cfg.read(path)
+ if not cfg.has_section("iam"):
+ raise RuntimeError('Your config file is missing the "iam" section!')
+
+ config.iam_access_key = cfg.get('iam',"access_key")
+ config.iam_secret_key = cfg.get('iam',"secret_key")
+ config.iam_display_name = cfg.get('iam',"display_name")
+ config.iam_user_id = cfg.get('iam',"user_id")
+ config.iam_email = cfg.get('iam',"email")
+
if client_config == None:
client_config = Config(signature_version='s3v4')
def get_tenant_email():
return config.tenant_email
+
+def get_thumbprint():
+ return config.webidentity_thumbprint
+
+def get_aud():
+ return config.webidentity_aud
+
+def get_token():
+ return config.webidentity_token
+
+def get_realm_name():
+ return config.webidentity_realm
get_parameter_name,
get_main_aws_access_key,
get_main_aws_secret_key,
+ get_thumbprint,
+ get_aud,
+ get_token,
+ get_realm_name,
+ check_webidentity
)
+log = logging.getLogger(__name__)
+
def create_role(iam_client,path,rolename,policy_document,description,sessionduration,permissionboundary):
role_err=None
if rolename is None:
@attr(method='get')
@attr(operation='check')
@attr(assertion='s3 ops only accessible by temporary credentials')
-@attr('sts_test')
+@attr('test_of_sts')
def test_get_session_token():
iam_client=get_iam_client()
sts_client=get_sts_client()
sts_user_id=get_alt_user_id()
default_endpoint=get_config_endpoint()
+
user_policy = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":[\"*\"],\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}},{\"Effect\":\"Allow\",\"Action\":\"sts:GetSessionToken\",\"Resource\":\"*\",\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}}]}"
(resp_err,resp)=put_user_policy(iam_client,sts_user_id,None,user_policy)
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+
response=sts_client.get_session_token()
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+
s3_client=boto3.client('s3',
aws_access_key_id = response['Credentials']['AccessKeyId'],
aws_secret_access_key = response['Credentials']['SecretAccessKey'],
@attr(method='get')
@attr(operation='check')
@attr(assertion='s3 ops denied by permanent credentials')
-@attr('sts_test')
+@attr('test_of_sts')
def test_get_session_token_permanent_creds_denied():
s3bucket_error=None
iam_client=get_iam_client()
default_endpoint=get_config_endpoint()
s3_main_access_key=get_main_aws_access_key()
s3_main_secret_key=get_main_aws_secret_key()
+
user_policy = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":[\"*\"],\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}},{\"Effect\":\"Allow\",\"Action\":\"sts:GetSessionToken\",\"Resource\":\"*\",\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}}]}"
(resp_err,resp)=put_user_policy(iam_client,sts_user_id,None,user_policy)
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+
response=sts_client.get_session_token()
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+
s3_client=boto3.client('s3',
aws_access_key_id = s3_main_access_key,
aws_secret_access_key = s3_main_secret_key,
@attr(method='get')
@attr(operation='check')
@attr(assertion='role policy allows all s3 ops')
-@attr('sts_test')
+@attr('test_of_sts')
def test_assume_role_allow():
iam_client=get_iam_client()
sts_client=get_sts_client()
sts_user_id=get_alt_user_id()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
+
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
+
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+
resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name)
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
@attr(method='get')
@attr(operation='check')
@attr(assertion='role policy denies all s3 ops')
-@attr('sts_test')
+@attr('test_of_sts')
def test_assume_role_deny():
s3bucket_error=None
iam_client=get_iam_client()
sts_user_id=get_alt_user_id()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
+
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
+
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+
resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name)
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
@attr(method='get')
@attr(operation='check')
@attr(assertion='creds expire so all s3 ops fails')
-@attr('sts_test')
+@attr('test_of_sts')
def test_assume_role_creds_expiry():
iam_client=get_iam_client()
sts_client=get_sts_client()
sts_user_id=get_alt_user_id()
default_endpoint=get_config_endpoint()
role_session_name=get_parameter_name()
+
policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
(role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
+
role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
(role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+
resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,DurationSeconds=900)
eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
time.sleep(900)
+
s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
except ClientError as e:
s3bucket_error = e.response.get("Error", {}).get("Code")
eq(s3bucket_error,'AccessDenied')
+
+@attr(resource='assume role with web identity')
+@attr(method='get')
+@attr(operation='check')
+@attr(assertion='assuming role through web token')
+@attr('webidentity_test')
+def test_assume_role_with_web_identity():
+ check_webidentity()
+ iam_client=get_iam_client()
+ sts_client=get_sts_client()
+ default_endpoint=get_config_endpoint()
+ role_session_name=get_parameter_name()
+ thumbprint=get_thumbprint()
+ aud=get_aud()
+ token=get_token()
+ realm=get_realm_name()
+
+ oidc_response = iam_client.create_open_id_connect_provider(
+ Url='http://localhost:8080/auth/realms/{}'.format(realm),
+ ThumbprintList=[
+ thumbprint,
+ ],
+ )
+
+ policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
+ (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
+ eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
+
+ role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
+ (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
+ eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+
+ resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token)
+ eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+
+ s3_client = boto3.client('s3',
+ aws_access_key_id = resp['Credentials']['AccessKeyId'],
+ aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
+ aws_session_token = resp['Credentials']['SessionToken'],
+ endpoint_url=default_endpoint,
+ region_name='',
+ )
+ bucket_name = get_new_bucket_name()
+ s3bucket = s3_client.create_bucket(Bucket=bucket_name)
+ eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
+ bkt = s3_client.delete_bucket(Bucket=bucket_name)
+ eq(bkt['ResponseMetadata']['HTTPStatusCode'],204)
+
+ oidc_remove=iam_client.delete_open_id_connect_provider(
+ OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
+ )
+
+'''
+@attr(resource='assume role with web identity')
+@attr(method='get')
+@attr(operation='check')
+@attr(assertion='assume_role_with_web_token creds expire')
+@attr('webidentity_test')
+def test_assume_role_with_web_identity_invalid_webtoken():
+ resp_error=None
+ iam_client=get_iam_client()
+ sts_client=get_sts_client()
+ default_endpoint=get_config_endpoint()
+ role_session_name=get_parameter_name()
+ thumbprint=get_thumbprint()
+ aud=get_aud()
+ token=get_token()
+ realm=get_realm_name()
+
+ oidc_response = iam_client.create_open_id_connect_provider(
+ Url='http://localhost:8080/auth/realms/{}'.format(realm),
+ ThumbprintList=[
+ thumbprint,
+ ],
+ )
+
+ policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}"
+ (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
+ eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
+
+ role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
+ (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
+ eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+
+ resp=""
+ try:
+ resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken='abcdef')
+ except InvalidIdentityTokenException as e:
+ log.debug('{}'.format(resp))
+ log.debug('{}'.format(e.response.get("Error", {}).get("Code")))
+ log.debug('{}'.format(e))
+ resp_error = e.response.get("Error", {}).get("Code")
+ eq(resp_error,'AccessDenied')
+
+ oidc_remove=iam_client.delete_open_id_connect_provider(
+ OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
+ )
+'''