]> git-server-git.apps.pok.os.sepia.ceph.com Git - s3-tests.git/commitdiff
STS Tests Files and modification in __init__.py 355/head
authorroot <root@localhost.localdomain>
Thu, 30 Jul 2020 11:35:59 +0000 (17:05 +0530)
committerroot <root@localhost.localdomain>
Wed, 12 Aug 2020 08:00:22 +0000 (13:30 +0530)
Signed-off-by: Kalpesh Pandya <kapandya@redhat.com>
README.rst
s3tests.conf.SAMPLE
s3tests_boto3/functional/__init__.py
s3tests_boto3/functional/test_sts.py [new file with mode: 0644]

index e9f74753d5b4e9377264d03cec9653272852d36a..96e0bbeb5546703152156f08f681d8475a17d0c4 100644 (file)
@@ -54,3 +54,17 @@ You can run only the boto3 tests with::
 
         S3TEST_CONF=your.conf ./virtualenv/bin/nosetests -v -s -A 'not fails_on_rgw' s3tests_boto3.functional
 
+========================
+ STS compatibility tests
+========================
+
+This section contains some basic tests for the AssumeRole and GetSessionToken API's. The test file is located under ``s3tests_boto3/functional``.
+
+You can run only the sts tests with::
+
+        S3TEST_CONF=your.conf ./virtualenv/bin/nosetests s3tests_boto3.functional.test_sts
+
+You can filter tests based on the attributes. There is a attribute named ``sts_test`` to run specifically the sts tests as below::
+
+        S3TEST_CONF=your.conf ./virtualenv/bin/nosetests -v -s -A 'sts_test' s3tests_boto3.functional.test_sts
+
index 54c642fca08f5946554f1ff6f5322f18102cfb4e..39c869415d8c5e87927f821e07f25ff0a1b850e1 100644 (file)
@@ -68,3 +68,17 @@ secret_key = opqrstuvwxyzabcdefghijklmnopqrstuvwxyzab
 
 # tenant email set in vstart.sh
 email = tenanteduser@example.com
+
+[iam]
+#used for iam operations in sts-tests
+#user_id from vstart.sh
+user_id = 0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef
+
+#access_key from vstart.sh
+access_key = ABCDEFGHIJKLMNOPQRST
+
+#secret_key vstart.sh
+secret_key = abcdefghijklmnopqrstuvwxyzabcdefghijklmn
+
+#display_name from vstart.sh
+display_name = youruseridhere
index e6878054cf0778ce5bf9b42245f7b62ab055dca6..88831523e4fe50160d9449a223c75de5a301bab4 100644 (file)
@@ -158,6 +158,8 @@ def setup():
         raise RuntimeError('Your config file is missing the "s3 alt" section!')
     if not cfg.has_section("s3 tenant"):
         raise RuntimeError('Your config file is missing the "s3 tenant" section!')
+    if not cfg.has_section("iam"):
+        raise RuntimeError('Your config file is missing the "iam" section!')
 
     global prefix
 
@@ -205,6 +207,12 @@ def setup():
     config.tenant_user_id = cfg.get('s3 tenant',"user_id")
     config.tenant_email = cfg.get('s3 tenant',"email")
 
+    config.iam_access_key = cfg.get('iam',"access_key")
+    config.iam_secret_key = cfg.get('iam',"secret_key")
+    config.iam_display_name = cfg.get('iam',"display_name")
+    config.iam_user_id = cfg.get('iam',"user_id")
+    #config.iam_email = cfg.get('iam',"email")
+
     # vars from the fixtures section
     try:
         template = cfg.get('fixtures', "bucket prefix")
@@ -246,6 +254,32 @@ def get_v2_client():
                         config=Config(signature_version='s3'))
     return client
 
+def get_sts_client(client_config=None):
+    if client_config == None:
+        client_config = Config(signature_version='s3v4')
+
+    client = boto3.client(service_name='sts',
+                        aws_access_key_id=config.alt_access_key,
+                        aws_secret_access_key=config.alt_secret_key,
+                        endpoint_url=config.default_endpoint,
+                        region_name='',
+                        use_ssl=config.default_is_secure,
+                        config=client_config)
+    return client
+
+def get_iam_client(client_config=None):
+    if client_config == None:
+        client_config = Config(signature_version='s3v4')
+    
+    client = boto3.client(service_name='iam',
+                        aws_access_key_id=config.iam_access_key,
+                        aws_secret_access_key=config.iam_secret_key,
+                        endpoint_url=config.default_endpoint,
+                        region_name='',
+                        use_ssl=config.default_is_secure,
+                        config=client_config)
+    return client
+
 def get_alt_client(client_config=None):
     if client_config == None:
         client_config = Config(signature_version='s3v4')
@@ -359,6 +393,21 @@ def get_new_bucket(client=None, name=None):
     client.create_bucket(Bucket=name)
     return name
 
+def get_parameter_name():
+    parameter_name=""
+    rand = ''.join(
+        random.choice(string.ascii_lowercase + string.digits)
+        for c in range(255)
+        )
+    while rand:
+        parameter_name = '{random}'.format(random=rand)
+        if len(parameter_name) <= 10:
+            return parameter_name
+        rand = rand[:-1]
+    return parameter_name
+
+def get_sts_user_id():
+    return config.alt_user_id
 
 def get_config_is_secure():
     return config.default_is_secure
diff --git a/s3tests_boto3/functional/test_sts.py b/s3tests_boto3/functional/test_sts.py
new file mode 100644 (file)
index 0000000..0c66121
--- /dev/null
@@ -0,0 +1,233 @@
+import boto3
+import botocore.session
+from botocore.exceptions import ClientError
+from botocore.exceptions import ParamValidationError
+from nose.tools import eq_ as eq
+from nose.plugins.attrib import attr
+from nose.plugins.skip import SkipTest
+import isodate
+import email.utils
+import datetime
+import threading
+import re
+import pytz
+from collections import OrderedDict
+import requests
+import json
+import base64
+import hmac
+import hashlib
+import xml.etree.ElementTree as ET
+import time
+import operator
+import nose
+import os
+import string
+import random
+import socket
+import ssl
+import logging
+from collections import namedtuple
+
+from email.header import decode_header
+
+from . import(
+    get_iam_client,
+    get_sts_client,
+    get_client,
+    get_alt_user_id,
+    get_config_endpoint,
+    get_new_bucket_name,
+    get_parameter_name,
+    get_main_aws_access_key,
+    get_main_aws_secret_key,
+    )
+
+def create_role(iam_client,path,rolename,policy_document,description,sessionduration,permissionboundary):
+    role_err=None
+    if rolename is None:
+        rolename=get_parameter_name()
+    try:
+       role_response = iam_client.create_role(Path=path,RoleName=rolename,AssumeRolePolicyDocument=policy_document,)
+    except ClientError as e:
+       role_err = e.response['Code']
+    return (role_err,role_response,rolename)
+
+def put_role_policy(iam_client,rolename,policyname,role_policy):
+    role_err=None
+    if policyname is None:
+        policyname=get_parameter_name() 
+    try:
+        role_response = iam_client.put_role_policy(RoleName=rolename,PolicyName=policyname,PolicyDocument=role_policy)
+    except ClientError as e:
+       role_err = e.response['Code']
+    return (role_err,role_response)
+
+def put_user_policy(iam_client,username,policyname,policy_document):
+    role_err=None
+    if policyname is None:
+        policyname=get_parameter_name()
+    try:
+        role_response = iam_client.put_user_policy(UserName=username,PolicyName=policyname,PolicyDocument=policy_document)
+    except ClientError as e:
+        role_err = e.response['Code']
+    return (role_err,role_response)
+
+@attr(resource='get session token')
+@attr(method='get')
+@attr(operation='check')
+@attr(assertion='s3 ops only accessible by temporary credentials')
+@attr('sts_test')
+def test_get_session_token():
+    iam_client=get_iam_client()
+    sts_client=get_sts_client()
+    sts_user_id=get_alt_user_id()
+    default_endpoint=get_config_endpoint()
+    user_policy = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":[\"*\"],\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}},{\"Effect\":\"Allow\",\"Action\":\"sts:GetSessionToken\",\"Resource\":\"*\",\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}}]}"
+    (resp_err,resp)=put_user_policy(iam_client,sts_user_id,None,user_policy)
+    eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+    response=sts_client.get_session_token(DurationSeconds=43200)
+    eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+    s3_client=boto3.client('s3',
+                aws_access_key_id = response['Credentials']['AccessKeyId'],
+               aws_secret_access_key = response['Credentials']['SecretAccessKey'],
+                aws_session_token = response['Credentials']['SessionToken'],
+               endpoint_url=default_endpoint,
+               region_name='',
+               )
+    bucket_name = get_new_bucket_name()
+    s3bucket = s3_client.create_bucket(Bucket=bucket_name)
+    eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
+    finish=s3_client.delete_bucket(Bucket=bucket_name)
+
+@attr(resource='get session token')
+@attr(method='get')
+@attr(operation='check')
+@attr(assertion='s3 ops denied by permanent credentials')
+@attr('sts_test')
+def test_get_session_token_permanent_creds_denied():
+    s3bucket_error=None
+    iam_client=get_iam_client()
+    sts_client=get_sts_client()
+    sts_user_id=get_alt_user_id()
+    default_endpoint=get_config_endpoint()
+    s3_main_access_key=get_main_aws_access_key()
+    s3_main_secret_key=get_main_aws_secret_key()
+    user_policy = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":[\"*\"],\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}},{\"Effect\":\"Allow\",\"Action\":\"sts:GetSessionToken\",\"Resource\":\"*\",\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}}]}"
+    (resp_err,resp)=put_user_policy(iam_client,sts_user_id,None,user_policy)
+    eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+    response=sts_client.get_session_token(DurationSeconds=43200)
+    eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+    s3_client=boto3.client('s3',
+                aws_access_key_id = s3_main_access_key,
+               aws_secret_access_key = s3_main_secret_key,
+                aws_session_token = response['Credentials']['SessionToken'],
+               endpoint_url=default_endpoint,
+               region_name='',
+               )
+    bucket_name = get_new_bucket_name()
+    try:
+        s3bucket = s3_client.create_bucket(Bucket=bucket_name)
+    except ClientError as e:
+        s3bucket_error = e.response.get("Error", {}).get("Code")
+    eq(s3bucket_error,'AccessDenied')
+
+@attr(resource='assume role')
+@attr(method='get')
+@attr(operation='check')
+@attr(assertion='role policy allows all s3 ops')
+@attr('sts_test')
+def test_assume_role_allow():
+    iam_client=get_iam_client()    
+    sts_client=get_sts_client()
+    sts_user_id=get_alt_user_id()
+    default_endpoint=get_config_endpoint()
+    role_session_name=get_parameter_name()
+    policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
+    (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
+    eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
+    role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
+    (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
+    eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+    resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name)
+    eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+    s3_client = boto3.client('s3',
+               aws_access_key_id = resp['Credentials']['AccessKeyId'],
+               aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
+               aws_session_token = resp['Credentials']['SessionToken'],
+               endpoint_url=default_endpoint,
+               region_name='',
+               )
+    bucket_name = get_new_bucket_name()
+    s3bucket = s3_client.create_bucket(Bucket=bucket_name)
+    eq(s3bucket['ResponseMetadata']['HTTPStatusCode'],200)
+    bkt = s3_client.delete_bucket(Bucket=bucket_name)
+    eq(bkt['ResponseMetadata']['HTTPStatusCode'],204)
+
+@attr(resource='assume role')
+@attr(method='get')
+@attr(operation='check')
+@attr(assertion='role policy denies all s3 ops')
+@attr('sts_test')
+def test_assume_role_deny():
+    s3bucket_error=None
+    iam_client=get_iam_client()    
+    sts_client=get_sts_client()
+    sts_user_id=get_alt_user_id()
+    default_endpoint=get_config_endpoint()
+    role_session_name=get_parameter_name()
+    policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
+    (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
+    eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
+    role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
+    (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
+    eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+    resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name)
+    eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+    s3_client = boto3.client('s3',
+               aws_access_key_id = resp['Credentials']['AccessKeyId'],
+               aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
+               aws_session_token = resp['Credentials']['SessionToken'],
+               endpoint_url=default_endpoint,
+               region_name='',
+               )
+    bucket_name = get_new_bucket_name()
+    try:
+        s3bucket = s3_client.create_bucket(Bucket=bucket_name)
+    except ClientError as e:
+        s3bucket_error = e.response.get("Error", {}).get("Code")
+    eq(s3bucket_error,'AccessDenied')
+
+@attr(resource='assume role')
+@attr(method='get')
+@attr(operation='check')
+@attr(assertion='creds expire so all s3 ops fails')
+@attr('sts_test')
+def test_assume_role_creds_expiry():
+    iam_client=get_iam_client()    
+    sts_client=get_sts_client()
+    sts_user_id=get_alt_user_id()
+    default_endpoint=get_config_endpoint()
+    role_session_name=get_parameter_name()
+    policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
+    (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None)
+    eq(role_response['Role']['Arn'],'arn:aws:iam:::role/'+general_role_name+'')
+    role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}"
+    (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy)
+    eq(response['ResponseMetadata']['HTTPStatusCode'],200)
+    resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,DurationSeconds=900)
+    eq(resp['ResponseMetadata']['HTTPStatusCode'],200)
+    time.sleep(900)
+    s3_client = boto3.client('s3',
+               aws_access_key_id = resp['Credentials']['AccessKeyId'],
+               aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
+               aws_session_token = resp['Credentials']['SessionToken'],
+               endpoint_url=default_endpoint,
+               region_name='',
+               )
+    bucket_name = get_new_bucket_name()
+    try:
+        s3bucket = s3_client.create_bucket(Bucket=bucket_name)
+    except ClientError as e:
+        s3bucket_error = e.response.get("Error", {}).get("Code")
+    eq(s3bucket_error,'AccessDenied')