import os
import io
import string
import json
import random
import time
+# XXX this should be converted to use boto3
import boto
from botocore.exceptions import ClientError
from http import server as http_server
# XXX this should be converted to use pytest
from nose.plugins.attrib import attr
from nose.tools import assert_equal, assert_true
from nose import SkipTest
import boto3
+from boto.s3.connection import S3Connection
import datetime
from cloudevents.http import from_http
from dateutil import parser
-# XXX this should be converted to use boto3
-from boto.s3.connection import S3Connection
-
from . import (
get_config_host,
get_config_port,
+ get_config_zonegroup,
+ get_config_cluster,
get_access_key,
get_secret_key
)
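# NOTE: helpers used below (connection(), admin(), gen_bucket_name(), get_ip(),
# TOPIC_SUFFIX, etc.) are assumed to be defined elsewhere in this module; admin()
# is expected to run radosgw-admin against the given cluster and to return a
# (stdout, returncode) tuple.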
NOTIFICATION_SUFFIX = "_notif"
UID_PREFIX = "superman"
-
num_buckets = 0
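# time-based access/secret keys and uid keep repeated runs from colliding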
run_prefix = ''.join(random.choice(string.ascii_lowercase) for _ in range(6))
secret_key = str(time.time())
uid = UID_PREFIX + str(time.time())
if tenant:
- _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'])
+ _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
else:
- _, result = admin(['user', 'create', '--uid', uid, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'])
+ _, result = admin(['user', 'create', '--uid', uid, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
assert_equal(result, 0)
conn = S3Connection(aws_access_key_id=access_key,
access_key = str(time.time())
secret_key = str(time.time())
- uid = 'superman' + str(time.time())
+ uid = UID_PREFIX + str(time.time())
tenant = 'kaboom'
- _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'])
+ _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
assert_equal(result, 0)
conn = S3Connection(aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
is_secure=False, port=get_config_port(), host=get_config_host(),
calling_format='boto.s3.connection.OrdinaryCallingFormat')
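# OrdinaryCallingFormat uses path-style addressing, so requests target the
# configured host/port directly instead of a bucket-name subdomain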
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
bucket_name = gen_bucket_name()
topic_name = bucket_name + TOPIC_SUFFIX
access_key = str(time.time())
secret_key = str(time.time())
- uid = 'superman' + str(time.time())
+ uid = UID_PREFIX + str(time.time())
tenant = 'kaboom'
- _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'])
+ _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
assert_equal(result, 0)
conn = S3Connection(aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
is_secure=False, port=get_config_port(), host=get_config_host(),
calling_format='boto.s3.connection.OrdinaryCallingFormat')
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
bucket_name = gen_bucket_name()
topic_name = bucket_name + TOPIC_SUFFIX
'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3')
# get topic 3 via commandline
- result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant])
+ result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant], get_config_cluster())
parsed_result = json.loads(result[0])
assert_equal(parsed_result['arn'], topic_arn3)
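# topic ARNs follow the SNS format: arn:aws:sns:<zonegroup>:<tenant>:<topic>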
matches = [tenant, UID_PREFIX]
assert_true(all(x in parsed_result['user'] for x in matches))
# delete topic 3
- _, result = admin(['topic', 'rm', '--topic', topic_name+'_3', '--tenant', tenant])
+ _, result = admin(['topic', 'rm', '--topic', topic_name+'_3', '--tenant', tenant], get_config_cluster())
assert_equal(result, 0)
# try to get a deleted topic
- _, result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant])
+ _, result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant], get_config_cluster())
print('"topic not found" error is expected')
assert_equal(result, 2)
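# returncode 2 is presumably ENOENT, matching the expected "topic not found" error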
# get the remaining 2 topics
- result = admin(['topic', 'list', '--tenant', tenant])
+ result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster())
parsed_result = json.loads(result[0])
assert_equal(len(parsed_result['topics']), 2)
# delete topics
- _, result = admin(['topic', 'rm', '--topic', topic_name+'_1', '--tenant', tenant])
+ _, result = admin(['topic', 'rm', '--topic', topic_name+'_1', '--tenant', tenant], get_config_cluster())
assert_equal(result, 0)
- _, result = admin(['topic', 'rm', '--topic', topic_name+'_2', '--tenant', tenant])
+ _, result = admin(['topic', 'rm', '--topic', topic_name+'_2', '--tenant', tenant], get_config_cluster())
assert_equal(result, 0)
# get topic list, make sure it is empty
- result = admin(['topic', 'list', '--tenant', tenant])
+ result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster())
parsed_result = json.loads(result[0])
assert_equal(len(parsed_result['topics']), 0)
def test_ps_s3_notification_configuration_admin_on_master():
""" test s3 notification list/get/delete on master """
conn = connection()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
bucket_name = gen_bucket_name()
bucket = conn.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
assert_equal(status/100, 2)
# list notification
- result = admin(['notification', 'list', '--bucket', bucket_name])
+ result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster())
parsed_result = json.loads(result[0])
assert_equal(len(parsed_result['notifications']), 3)
assert_equal(result[1], 0)
# get notification 1
- result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name+'_1'])
+ result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name+'_1'], get_config_cluster())
parsed_result = json.loads(result[0])
assert_equal(parsed_result['Id'], notification_name+'_1')
assert_equal(result[1], 0)
# remove notification 3
- _, result = admin(['notification', 'rm', '--bucket', bucket_name, '--notification-id', notification_name+'_3'])
+ _, result = admin(['notification', 'rm', '--bucket', bucket_name, '--notification-id', notification_name+'_3'], get_config_cluster())
assert_equal(result, 0)
# list notification
- result = admin(['notification', 'list', '--bucket', bucket_name])
+ result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster())
parsed_result = json.loads(result[0])
assert_equal(len(parsed_result['notifications']), 2)
assert_equal(result[1], 0)
# delete notifications
- _, result = admin(['notification', 'rm', '--bucket', bucket_name])
+ _, result = admin(['notification', 'rm', '--bucket', bucket_name], get_config_cluster())
assert_equal(result, 0)
# list notification, make sure it is empty
- result = admin(['notification', 'list', '--bucket', bucket_name])
+ result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster())
parsed_result = json.loads(result[0])
assert_equal(len(parsed_result['notifications']), 0)
assert_equal(result[1], 0)
if conn.secure_conn is None:
return SkipTest('secure connection is needed to test topic with secrets')
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
bucket_name = gen_bucket_name()
topic_name = bucket_name + TOPIC_SUFFIX
def test_ps_s3_notification_on_master():
""" test s3 notification set/get/delete on master """
conn = connection()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
bucket_name = gen_bucket_name()
# create bucket
bucket = conn.create_bucket(bucket_name)
conn = connection()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
conn = connection()
ps_zone = conn
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
def test_ps_s3_notification_errors_on_master():
""" test s3 notification set/get/delete on master """
conn = connection()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
bucket_name = gen_bucket_name()
# create bucket
bucket = conn.create_bucket(bucket_name)
hostname = get_ip()
conn = connection()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
return SkipTest("only used in manual testing")
hostname = get_ip()
conn = connection()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
def test_ps_s3_notification_push_kafka_on_master():
""" test pushing kafka s3 notification on master """
conn = connection()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
""" test deletion of multiple keys on master """
hostname = get_ip()
conn = connection()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
# create random port for the http server
host = get_ip()
""" test pushing http s3 notification on master """
hostname = get_ip_http()
conn = connection()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
# create random port for the http server
host = get_ip()
""" test pushing cloudevents notification on master """
hostname = get_ip_http()
conn = connection()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
# create random port for the http server
host = get_ip()
""" test that opaque id set in topic, is sent in notification on master """
hostname = get_ip()
conn = connection()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
# create random port for the http server
host = get_ip()
""" test that when object is deleted due to lifecycle policy, notification is sent on master """
hostname = get_ip()
conn = connection()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
# create random port for the http server
host = get_ip()
)
# start lifecycle processing
- admin(['lc', 'process'])
+ admin(['lc', 'process'], get_config_cluster())
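# 'lc process' forces an immediate lifecycle run instead of waiting for the scheduled window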
print('wait for 5sec for the messages...')
time.sleep(5)
""" test that when a multipart upload is aborted by lifecycle policy, notification is sent on master """
hostname = get_ip()
conn = connection()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
# create random port for the http server
host = get_ip()
)
# start lifecycle processing
- admin(['lc', 'process'])
+ admin(['lc', 'process'], get_config_cluster())
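# lifecycle days are presumably scaled down to seconds here (e.g. via
# rgw_lc_debug_interval), so 20s stands in for the configured 2 days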
print('wait for 20s (2 days) for the messages...')
time.sleep(20)
""" test object creation s3 notifications in using put/copy/post on master"""
if not external_endpoint_address:
- hostname = 'localhost'
+ hostname = get_ip()
proc = init_rabbitmq()
if proc is None:
return SkipTest('end2end amqp tests require rabbitmq-server installed')
proc = None
conn = connection()
- hostname = 'localhost'
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
import requests
hostname = get_ip()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
conn = connection()
endpoint = "http://%s:%d" % (get_config_host(), get_config_port())
hostname = get_ip()
conn = connection()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
hostname = get_ip()
conn = connection()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
hostname = get_ip()
conn = connection()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
hostname = get_ip()
conn = connection()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
hostname = get_ip()
conn = connection()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
hostname = get_ip()
conn = connection()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
""" test reservation cleanup after gateway crash """
return SkipTest("only used in manual testing")
conn = connection()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
# create random port for the http server
host = get_ip()
def test_ps_s3_persistent_topic_stats():
""" test persistent topic stats """
conn = connection()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
# create random port for the http server
host = get_ip()
http_server.close()
# topic stats
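# 'Entries' counts notifications still queued for delivery on the persistent topic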
- result = admin(['topic', 'stats', '--topic', topic_name])
+ result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
parsed_result = json.loads(result[0])
assert_equal(parsed_result['Topic Stats']['Entries'], 0)
assert_equal(result[1], 0)
print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
# topic stats
- result = admin(['topic', 'stats', '--topic', topic_name])
+ result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
parsed_result = json.loads(result[0])
assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects)
assert_equal(result[1], 0)
start_time = time.time()
# topic stats
- result = admin(['topic', 'stats', '--topic', topic_name])
+ result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
parsed_result = json.loads(result[0])
assert_equal(parsed_result['Topic Stats']['Entries'], 2*number_of_objects)
assert_equal(result[1], 0)
time.sleep(delay)
# topic stats
- result = admin(['topic', 'stats', '--topic', topic_name])
+ result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
parsed_result = json.loads(result[0])
assert_equal(parsed_result['Topic Stats']['Entries'], 0)
assert_equal(result[1], 0)
def ps_s3_persistent_topic_configs(persistency_time, config_dict):
conn = connection()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
# create random port for the http server
host = get_ip()
time.sleep(delay)
http_server.close()
# topic get
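# 'dest' holds the topic's endpoint/delivery configuration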
- result = admin(['topic', 'get', '--topic', topic_name])
+ result = admin(['topic', 'get', '--topic', topic_name], get_config_cluster())
parsed_result = json.loads(result[0])
parsed_result_dest = parsed_result["dest"]
for key, value in config_dict.items():
assert_equal(result[1], 0)
# topic stats
- result = admin(['topic', 'stats', '--topic', topic_name])
+ result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
parsed_result = json.loads(result[0])
assert_equal(parsed_result['Topic Stats']['Entries'], 0)
assert_equal(result[1], 0)
print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
# topic stats
- result = admin(['topic', 'stats', '--topic', topic_name])
+ result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
parsed_result = json.loads(result[0])
assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects)
assert_equal(result[1], 0)
# wait as much as ttl and check if the persistent topics have expired
time.sleep(persistency_time)
- result = admin(['topic', 'stats', '--topic', topic_name])
+ result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
parsed_result = json.loads(result[0])
assert_equal(parsed_result['Topic Stats']['Entries'], 0)
assert_equal(result[1], 0)
start_time = time.time()
# topic stats
- result = admin(['topic', 'stats', '--topic', topic_name])
+ result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
parsed_result = json.loads(result[0])
assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects)
assert_equal(result[1], 0)
# wait as much as ttl and check if the persistent topics have expired
time.sleep(persistency_time)
- result = admin(['topic', 'stats', '--topic', topic_name])
+ result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
parsed_result = json.loads(result[0])
assert_equal(parsed_result['Topic Stats']['Entries'], 0)
assert_equal(result[1], 0)
""" test pushing persistent notification pushback """
return SkipTest("only used in manual testing")
conn = connection()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
# create random port for the http server
host = get_ip()
@attr('kafka_test')
def test_ps_s3_notification_kafka_idle_behaviour():
""" test pushing kafka s3 notification idle behaviour check """
- # TODO convert this test to actual running test by changing
- # os.system call to verify the process idleness
conn = connection()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
return SkipTest('This test requires two gateways.')
conn = connection()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
# create random port for the http server
host = get_ip()
port = random.randint(10000, 20000)
return SkipTest('This test requires two gateways.')
conn = connection()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
# create random port for the http server
host = get_ip()
port = random.randint(10000, 20000)
def test_ps_s3_persistent_multiple_endpoints():
""" test pushing persistent notification when one of the endpoints has error """
conn = connection()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
# create random port for the http server
host = get_ip()
def persistent_notification(endpoint_type):
""" test pushing persistent notification """
conn = connection()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
""" test pushing persistent notification of large notifications """
conn = connection()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
bucket_name = gen_bucket_name()
topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
topic_name2 = bucket_name+'http'+TOPIC_SUFFIX
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
# create topics
# start amqp receiver in a separate thread
exchange = 'ex1'
return SkipTest('This test is yet to be modified.')
hostname = get_ip()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
conn = connection()
ps_zone = None
bucket_name = gen_bucket_name()
""" test s3 topic set/get/delete permissions """
conn1 = connection()
conn2 = another_user()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
bucket_name = gen_bucket_name()
topic_name = bucket_name + TOPIC_SUFFIX
topic_policy = json.dumps({
def kafka_security(security_type, mechanism='PLAIN'):
""" test pushing kafka s3 notification securly to master """
conn = connection()
- zonegroup = 'default'
+ zonegroup = get_config_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
bucket = conn.create_bucket(bucket_name)