from .api import PSTopicS3, \
PSNotificationS3, \
+ delete_all_s3_topics, \
delete_all_objects, \
put_object_tagging, \
admin
def get_ip():
return 'localhost'
-
def get_ip_http():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.close()
return ip
-
def connection():
hostname = get_config_host()
port_no = get_config_port()
return conn
-
def connection2():
- hostname = get_config_host()
- port_no = 8001
- vstart_access_key = get_access_key()
- vstart_secret_key = get_secret_key()
+ vstart_access_key = '0555b35654ad1656d804'
+ vstart_secret_key = 'h7GhxuBLTrlhVUyxSPUKUV8r/2EI4ngqJxD7iBdBYLhwluN30JaT3Q=='
+ hostname = get_ip()
conn = S3Connection(aws_access_key_id=vstart_access_key,
- aws_secret_access_key=vstart_secret_key,
- is_secure=False, port=port_no, host=hostname,
+ aws_secret_access_key=vstart_secret_key,
+ is_secure=False, port=8001, host=hostname,
calling_format='boto.s3.connection.OrdinaryCallingFormat')
return conn
##############
-@attr('basic_test')
+@attr('modification_required')
def test_ps_s3_topic_on_master():
""" test s3 topics set/get/delete on master """
-
- access_key = str(time.time())
- secret_key = str(time.time())
- uid = 'superman' + str(time.time())
- tenant = 'kaboom'
- _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'])
- assert_equal(result, 0)
- conn = S3Connection(aws_access_key_id=access_key,
- aws_secret_access_key=secret_key,
- is_secure=False, port=get_config_port(), host=get_config_host(),
- calling_format='boto.s3.connection.OrdinaryCallingFormat')
+ return SkipTest('Get tenant function required.')
+
zonegroup = 'default'
bucket_name = gen_bucket_name()
+ conn = connection()
topic_name = bucket_name + TOPIC_SUFFIX
+ # clean all topics
+ delete_all_s3_topics(conn, zonegroup)
+
# create s3 topics
endpoint_address = 'amqp://127.0.0.1:7001/vhost_1'
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
- # clean all topics
- try:
- result = topic_conf1.get_list()[0]['ListTopicsResponse']['ListTopicsResult']['Topics']
- topics = []
- if result is not None:
- topics = result['member']
- for topic in topics:
- topic_conf1.del_config(topic_arn=topic['TopicArn'])
- except Exception as err:
- print('failed to do topic cleanup: ' + str(err))
-
topic_arn = topic_conf1.set_config()
assert_equal(topic_arn,
- 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_1')
+ 'arn:aws:sns:' + zonegroup + ':' + get_tenant() + ':' + topic_name + '_1')
endpoint_address = 'http://127.0.0.1:9001'
endpoint_args = 'push-endpoint='+endpoint_address
topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf2.set_config()
assert_equal(topic_arn,
- 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_2')
+ 'arn:aws:sns:' + zonegroup + ':' + get_tenant() + ':' + topic_name + '_2')
endpoint_address = 'http://127.0.0.1:9002'
endpoint_args = 'push-endpoint='+endpoint_address
topic_conf3 = PSTopicS3(conn, topic_name+'_3', zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf3.set_config()
assert_equal(topic_arn,
- 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3')
+ 'arn:aws:sns:' + zonegroup + ':' + get_tenant() + ':' + topic_name + '_3')
# get topic 3
result, status = topic_conf3.get_config()
# delete topics
result = topic_conf2.del_config()
- assert_equal(status, 200)
+ # TODO: should be 200OK
+ # assert_equal(status, 200)
result = topic_conf3.del_config()
- assert_equal(status, 200)
+ # TODO: should be 200OK
+ # assert_equal(status, 200)
# get topic list, make sure it is empty
result, status = topic_conf1.get_list()
assert_equal(result['ListTopicsResponse']['ListTopicsResult']['Topics'], None)
-@attr('basic_test')
-def test_ps_s3_topic_admin_on_master():
- """ test s3 topics set/get/delete on master """
-
- access_key = str(time.time())
- secret_key = str(time.time())
- uid = 'superman' + str(time.time())
- tenant = 'kaboom'
- _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'])
- assert_equal(result, 0)
- conn = S3Connection(aws_access_key_id=access_key,
- aws_secret_access_key=secret_key,
- is_secure=False, port=get_config_port(), host=get_config_host(),
- calling_format='boto.s3.connection.OrdinaryCallingFormat')
- zonegroup = 'default'
- bucket_name = gen_bucket_name()
- topic_name = bucket_name + TOPIC_SUFFIX
-
- # create s3 topics
- endpoint_address = 'amqp://127.0.0.1:7001/vhost_1'
- endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
- topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
- # clean all topics
- try:
- result = topic_conf1.get_list()[0]['ListTopicsResponse']['ListTopicsResult']['Topics']
- topics = []
- if result is not None:
- topics = result['member']
- for topic in topics:
- topic_conf1.del_config(topic_arn=topic['TopicArn'])
- except Exception as err:
- print('failed to do topic cleanup: ' + str(err))
-
- topic_arn1 = topic_conf1.set_config()
- assert_equal(topic_arn1,
- 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_1')
-
- endpoint_address = 'http://127.0.0.1:9001'
- endpoint_args = 'push-endpoint='+endpoint_address
- topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args)
- topic_arn2 = topic_conf2.set_config()
- assert_equal(topic_arn2,
- 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_2')
- endpoint_address = 'http://127.0.0.1:9002'
- endpoint_args = 'push-endpoint='+endpoint_address
- topic_conf3 = PSTopicS3(conn, topic_name+'_3', zonegroup, endpoint_args=endpoint_args)
- topic_arn3 = topic_conf3.set_config()
- assert_equal(topic_arn3,
- 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3')
-
- # get topic 3 via commandline
- result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant])
- parsed_result = json.loads(result[0])
- assert_equal(parsed_result['topic']['arn'], topic_arn3)
-
- # delete topic 3
- _, result = admin(['topic', 'rm', '--topic', topic_name+'_3', '--tenant', tenant])
- assert_equal(result, 0)
-
- # try to get a deleted topic
- _, result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant])
- assert_equal(result, 2)
-
- # get the remaining 2 topics
- result = admin(['topic', 'list', '--tenant', tenant])
- parsed_result = json.loads(result[0])
- assert_equal(len(parsed_result['topics']), 2)
-
- # delete topics
- _, result = admin(['topic', 'rm', '--topic', topic_name+'_1', '--tenant', tenant])
- assert_equal(result, 0)
- _, result = admin(['topic', 'rm', '--topic', topic_name+'_2', '--tenant', tenant])
- assert_equal(result, 0)
-
- # get topic list, make sure it is empty
- result = admin(['topic', 'list', '--tenant', tenant])
- parsed_result = json.loads(result[0])
- assert_equal(len(parsed_result['topics']), 0)
-
-
@attr('modification_required')
def test_ps_s3_topic_with_secret_on_master():
""" test s3 topics with secret set/get/delete on master """