self.lock.release()
def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}, etags=[]):
- """verify stored s3 records agains a list of keys"""
+ """verify stored records agains a list of keys"""
self.acquire_lock()
log.info('verify_s3_events: http server has %d events', len(self.events))
try:
# TODO create a base class for the AMQP and HTTP cases
def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}, etags=[]):
- """verify stored s3 records agains a list of keys"""
+ """verify stored records agains a list of keys"""
verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes, etags=etags)
self.events = []
self.stop = False
def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}, etags=[]):
- """verify stored s3 records agains a list of keys"""
+ """verify stored records agains a list of keys"""
verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes, etags=etags)
self.events = []
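The TODO in the hunk above suggests a shared base class for these duplicated verify_s3_events bodies. A minimal sketch of what that refactor could look like, assuming each receiver stores incoming events in self.events (the class name and keyword-default handling here are illustrative, not part of this patch):

class EventReceiverBase:
    """Hypothetical shared base for the AMQP and HTTP receiver cases."""

    def __init__(self):
        self.events = []

    def verify_s3_events(self, keys, exact_match=False, deletions=False,
                         expected_sizes=None, etags=None):
        """verify stored records against a list of keys"""
        # None defaults avoid the shared mutable {}/[] defaults in the current copies
        verify_s3_records_by_elements(self.events, keys,
                                      exact_match=exact_match,
                                      deletions=deletions,
                                      expected_sizes=expected_sizes or {},
                                      etags=etags or [])
        self.events = []

Each receiver class would then inherit this instead of carrying its own copy.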
@attr('basic_test')
-def test_ps_s3_topic_on_master():
- """ test s3 topics set/get/delete on master """
+def test_topic():
+ """ test topics set/get/delete """
tenant = 'kaboom'
conn = connect_random_user(tenant)
bucket_name = gen_bucket_name()
topic_name = bucket_name + TOPIC_SUFFIX
- # create s3 topics
+ # create topics
endpoint_address = 'amqp://127.0.0.1:7001/vhost_1'
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
@attr('basic_test')
-def test_ps_s3_topic_admin_on_master():
- """ test s3 topics set/get/delete on master """
+def test_topic_admin():
+ """ test topics set/get/delete """
tenant = 'kaboom'
conn = connect_random_user(tenant)
bucket_name = gen_bucket_name()
topic_name = bucket_name + TOPIC_SUFFIX
- # create s3 topics
+ # create topics
endpoint_address = 'amqp://127.0.0.1:7001/vhost_1'
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
# make sure there are no leftover topics
delete_all_topics(conn, '', get_config_cluster())
- # create s3 topics
+ # create topics
endpoint_address = 'amqp://127.0.0.1:7001'
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
assert_equal(topic_arn,
'arn:aws:sns:' + zonegroup + '::' + topic_name)
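For reference, topic ARNs here follow 'arn:aws:sns:&lt;zonegroup&gt;:&lt;tenant&gt;:&lt;topic&gt;'; the '::' in the assertion above is the empty tenant field for a non-tenanted user (the removed secret-topic test below asserted the tenanted form with get_tenant()).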
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [
{
notification_configuration(True)
-@attr('modification_required')
-def test_ps_s3_topic_with_secret_on_master():
- """ test s3 topics with secret set/get/delete on master """
- return SkipTest('secure connection is needed to test topic with secrets')
-
- conn = connection1()
- if conn.secure_conn is None:
- return SkipTest('secure connection is needed to test topic with secrets')
-
- zonegroup = get_config_zonegroup()
- bucket_name = gen_bucket_name()
- topic_name = bucket_name + TOPIC_SUFFIX
-
- # clean all topics
- delete_all_s3_topics(conn, zonegroup)
-
- # create s3 topics
- endpoint_address = 'amqp://user:password@127.0.0.1:7001'
- endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
- bad_topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
- try:
- result = bad_topic_conf.set_config()
- except Exception as err:
- print('Error is expected: ' + str(err))
- else:
- assert False, 'user password configuration set allowed only over HTTPS'
- topic_conf = PSTopicS3(conn.secure_conn, topic_name, zonegroup, endpoint_args=endpoint_args)
- topic_arn = topic_conf.set_config()
-
- assert_equal(topic_arn,
- 'arn:aws:sns:' + zonegroup + ':' + get_tenant() + ':' + topic_name)
-
- _, status = bad_topic_conf.get_config()
- assert_equal(status/100, 4)
-
- # get topic
- result, status = topic_conf.get_config()
- assert_equal(status, 200)
- assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
- assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])
-
- _, status = bad_topic_conf.get_config()
- assert_equal(status/100, 4)
-
- _, status = topic_conf.get_list()
- assert_equal(status/100, 2)
-
- # delete topics
- result = topic_conf.del_config()
+@attr('not_implemented')
+def test_topic_with_secret():
+ """ test topics with secret set/get/delete """
+ return SkipTest('This test is yet to be implemented')
@attr('basic_test')
def test_notification_configuration():
- """ test s3 notification set/get/deleter """
+ """ test notification set/get/deleter """
notification_configuration(False)
@attr('basic_test')
-def test_ps_s3_notification_on_master_empty_config():
- """ test s3 notification set/get/delete on master with empty config """
+def test_notification_empty_config():
+ """ test notification set/get/delete with empty config """
hostname = get_ip()
conn = connection()
bucket = conn.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
- # create s3 topic
+ # create topic
endpoint_address = 'amqp://127.0.0.1:7001'
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name+'_1',
'TopicArn': topic_arn,
assert_equal(status/100, 2)
assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
- # create s3 notification again with empty configuration to check if it deletes or not
+ # create notification again with empty configuration to check whether it deletes the existing one
topic_conf_list = []
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
@attr('amqp_test')
-def test_ps_s3_notification_filter_on_master():
- """ test s3 notification filter on master """
+def test_notification_filter_amqp():
+ """ test notification filter """
hostname = get_ip()
task, receiver = create_amqp_receiver_thread(exchange, topic_name)
task.start()
- # create s3 topic
+ # create topic
endpoint_address = 'amqp://' + hostname
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name+'_1',
'TopicArn': topic_arn,
@attr('basic_test')
-def test_ps_s3_notification_errors_on_master():
- """ test s3 notification set/get/delete on master """
+def test_notification_errors():
+ """ test notification set/get/delete """
conn = connection()
zonegroup = get_config_zonegroup()
bucket_name = gen_bucket_name()
# create bucket
bucket = conn.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
- # create s3 topic
+ # create topic
endpoint_address = 'amqp://127.0.0.1:7001'
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
- # create s3 notification with invalid event name
+ # create notification with invalid event name
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name,
'TopicArn': topic_arn,
else:
assert False, 'invalid event name is expected to fail'
- # create s3 notification with missing name
+ # create notification with missing name
topic_conf_list = [{'Id': '',
'TopicArn': topic_arn,
'Events': ['s3:ObjectCreated:Put']
else:
assert False, 'missing notification name is expected to fail'
- # create s3 notification with invalid topic ARN
+ # create notification with invalid topic ARN
invalid_topic_arn = 'kaboom'
topic_conf_list = [{'Id': notification_name,
'TopicArn': invalid_topic_arn,
else:
assert False, 'invalid ARN is expected to fail'
- # create s3 notification with unknown topic ARN
+ # create notification with unknown topic ARN
invalid_topic_arn = 'arn:aws:sns:a::kaboom'
topic_conf_list = [{'Id': notification_name,
'TopicArn': invalid_topic_arn ,
else:
assert False, 'unknown topic is expected to fail'
- # create s3 notification with wrong bucket
+ # create notification with wrong bucket
topic_conf_list = [{'Id': notification_name,
'TopicArn': topic_arn,
'Events': ['s3:ObjectCreated:Put']
conn.delete_bucket(bucket_name)
-def notification_push(endpoint_type, conn, account=None, cloudevents=False, kafka_brokers=None):
+def notification(endpoint_type, conn, account=None, cloudevents=False, kafka_brokers=None):
""" test pushinging notification """
zonegroup = get_config_zonegroup()
# create bucket
endpoint_args = 'push-endpoint='+endpoint_address
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name,
'TopicArn': topic_arn,
# with acks from broker
exchange = 'ex1'
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
- # create two s3 topic
+ # create two topics
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name,
'TopicArn': topic_arn,
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
if kafka_brokers is not None:
endpoint_args += '&kafka-brokers=' + kafka_brokers
- # create s3 topic
+ # create topic
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name,
'TopicArn': topic_arn,
@attr('amqp_test')
-def test_notification_push_amqp():
+def test_notification_amqp():
""" test pushing amqp notification """
- return SkipTest("Running into an issue with amqp when we make exact_match=true")
conn = connection()
- notification_push('amqp', conn)
+ notification('amqp', conn)
@attr('manual_test')
-def test_ps_s3_notification_push_amqp_idleness_check():
- """ test pushing amqp s3 notification and checking for connection idleness """
- return SkipTest("only used in manual testing")
+def test_notification_amqp_idleness_check():
+ """ test pushing amqp notification and checking for connection idleness """
hostname = get_ip()
conn = connection()
zonegroup = get_config_zonegroup()
task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name1)
task1.start()
- # create two s3 topic
+ # create two topics
endpoint_address = 'amqp://' + hostname
# with acks from broker
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
topic_conf1 = PSTopicS3(conn, topic_name1, zonegroup, endpoint_args=endpoint_args)
topic_arn1 = topic_conf1.set_config()
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
'Events': []
@attr('kafka_test')
-def test_notification_push_kafka():
- """ test pushing kafka s3 notification on master """
+def test_notification_kafka():
+ """ test pushing kafka notification """
conn = connection()
- notification_push('kafka', conn)
+ notification('kafka', conn)
@attr('kafka_failover')
-def test_notification_push_kafka_multiple_brokers_override():
- """ test pushing kafka s3 notification on master """
+def test_notification_kafka_multiple_brokers_override():
+ """ test pushing kafka notification """
conn = connection()
- notification_push('kafka', conn, kafka_brokers='{host}:9091,{host}:9092'.format(host=default_kafka_server))
+ notification('kafka', conn, kafka_brokers='{host}:9091,{host}:9092'.format(host=default_kafka_server))
@attr('kafka_failover')
-def test_notification_push_kafka_multiple_brokers_append():
- """ test pushing kafka s3 notification on master """
+def test_notification_kafka_multiple_brokers_append():
+ """ test pushing kafka notification """
conn = connection()
- notification_push('kafka', conn, kafka_brokers='{host}:9091'.format(host=default_kafka_server))
+ notification('kafka', conn, kafka_brokers='{host}:9091'.format(host=default_kafka_server))
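Both failover variants exercise the same notification('kafka', ...) path; the extra broker list is forwarded verbatim as the kafka-brokers topic attribute (see the '&kafka-brokers=' handling in notification()). Reading the test names, the 'override' case supplies a full two-broker list in place of the endpoint's default, while the 'append' case supplies a single additional broker next to it.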
@attr('manual_test')
@attr('http_test')
-def test_ps_s3_notification_multi_delete_on_master():
- """ test deletion of multiple keys on master """
+def test_notification_multi_delete():
+ """ test deletion of multiple keys """
hostname = get_ip()
conn = connection()
zonegroup = get_config_zonegroup()
bucket = conn.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
- # create s3 topic
+ # create topic
endpoint_address = 'http://'+host+':'+str(port)
endpoint_args = 'push-endpoint='+endpoint_address
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name,
'TopicArn': topic_arn,
@attr('http_test')
-def test_notification_push_http():
- """ test pushing http s3 notification """
+def test_notification_http():
+ """ test pushing http notification """
conn = connection()
- notification_push('http', conn)
+ notification('http', conn)
@attr('http_test')
-def test_notification_push_cloudevents():
+def test_notification_cloudevents():
""" test pushing cloudevents notification """
conn = connection()
- notification_push('http', conn, cloudevents=True)
+ notification('http', conn, cloudevents=True)
@attr('http_test')
-def test_ps_s3_opaque_data_on_master():
- """ test that opaque id set in topic, is sent in notification on master """
+def test_opaque_data():
+ """ test that opaque id set in topic, is sent in notification """
hostname = get_ip()
conn = connection()
zonegroup = get_config_zonegroup()
bucket = conn.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
- # create s3 topic
+ # create topic
endpoint_address = 'http://'+host+':'+str(port)
endpoint_args = 'push-endpoint='+endpoint_address
opaque_data = 'http://1.2.3.4:8888'
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data)
topic_arn = topic_conf.set_config()
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name,
'TopicArn': topic_arn,
else:
return SkipTest('Unknown endpoint type: ' + endpoint_type)
- # create s3 topic
+ # create topic
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name,
'TopicArn': topic_arn,
['ObjectLifecycle:Expiration:AbortMultipartUpload'], True)
-def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_location=None, verify_ssl='true'):
- """ test object creation s3 notifications in using put/copy/post on master"""
+def creation_triggers(external_endpoint_address=None, ca_location=None, verify_ssl='true'):
+ """ test object creation notifications in using put/copy/post"""
if not external_endpoint_address:
hostname = get_ip()
task, receiver = create_amqp_receiver_thread(exchange, topic_name, external_endpoint_address, ca_location)
task.start()
- # create s3 topic
+ # create topic
if external_endpoint_address:
endpoint_address = external_endpoint_address
elif ca_location:
endpoint_args += '&ca-location={}'.format(ca_location)
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
'Events': ['s3:ObjectCreated:Put', 's3:ObjectCreated:Copy', 's3:ObjectCreated:CompleteMultipartUpload']
@attr('amqp_test')
-def test_ps_s3_creation_triggers_on_master():
- ps_s3_creation_triggers_on_master(external_endpoint_address="amqp://localhost:5672")
+def test_creation_triggers_amqp():
+ creation_triggers(external_endpoint_address="amqp://localhost:5672")
@attr('amqp_ssl_test')
-def test_ps_s3_creation_triggers_on_master_external():
+def test_creation_triggers_external():
from distutils.util import strtobool
except Exception as e:
verify_ssl = 'true'
- ps_s3_creation_triggers_on_master(
+ creation_triggers(
external_endpoint_address=os.environ['AMQP_EXTERNAL_ENDPOINT'],
verify_ssl=verify_ssl)
else:
@attr('amqp_ssl_test')
-def test_ps_s3_creation_triggers_on_master_ssl():
+def test_creation_triggers_ssl():
import textwrap
from tempfile import TemporaryDirectory
'''))
os.environ['RABBITMQ_CONFIG_FILE'] = os.path.splitext(RABBITMQ_CONF_FILE)[0]
- ps_s3_creation_triggers_on_master(ca_location=CACERTFILE)
+ creation_triggers(ca_location=CACERTFILE)
del os.environ['RABBITMQ_CONFIG_FILE']
@attr('amqp_test')
-def test_http_post_object_upload():
+def test_post_object_upload_amqp():
""" test that uploads object using HTTP POST """
import boto3
task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name+'_1')
task1.start()
- # create s3 topics
+ # create topics
endpoint_address = 'amqp://' + hostname
endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker'
topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
topic_arn1 = topic_conf1.set_config()
- # create s3 notifications
+ # create notifications
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
'Events': ['s3:ObjectCreated:Post']
else:
return SkipTest('Unknown endpoint type: ' + endpoint_type)
- # create s3 topic
+ # create topic
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
'Events': []
else:
return SkipTest('Unknown endpoint type: ' + endpoint_type)
- # create s3 topic
+ # create topic
zonegroup = get_config_zonegroup()
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
meta_key = 'meta1'
meta_value = 'This is my metadata value'
@attr('amqp_test')
-def test_ps_s3_metadata_on_master():
- """ test s3 notification of metadata on master """
+def test_metadata_amqp():
+ """ test notification of metadata """
hostname = get_ip()
conn = connection()
task, receiver = create_amqp_receiver_thread(exchange, topic_name)
task.start()
- # create s3 topic
+ # create topic
endpoint_address = 'amqp://' + hostname
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable'
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
meta_key = 'meta1'
meta_value = 'This is my metadata value'
@attr('amqp_test')
-def test_ps_s3_tags_on_master():
- """ test s3 notification of tags on master """
+def test_tags_amqp():
+ """ test notification of tags """
hostname = get_ip()
conn = connection()
task, receiver = create_amqp_receiver_thread(exchange, topic_name)
task.start()
- # create s3 topic
+ # create topic
endpoint_address = 'amqp://' + hostname
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable'
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
conn.delete_bucket(bucket_name)
@attr('amqp_test')
-def test_ps_s3_versioning_on_master():
- """ test s3 notification of object versions """
+def test_versioning_amqp():
+ """ test notification of object versions """
hostname = get_ip()
conn = connection()
task, receiver = create_amqp_receiver_thread(exchange, topic_name)
task.start()
- # create s3 topic
+ # create topic
endpoint_address = 'amqp://' + hostname
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
@attr('amqp_test')
-def test_ps_s3_versioned_deletion_on_master():
- """ test s3 notification of deletion markers on master """
+def test_versioned_deletion_amqp():
+ """ test notification of deletion markers """
hostname = get_ip()
conn = connection()
task, receiver = create_amqp_receiver_thread(exchange, topic_name)
task.start()
- # create s3 topic
+ # create topic
endpoint_address = 'amqp://' + hostname
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn,
'Events': ['s3:ObjectRemoved:*']
@attr('manual_test')
-def test_ps_s3_persistent_cleanup():
+def test_persistent_cleanup():
""" test reservation cleanup after gateway crash """
- return SkipTest("only used in manual testing")
conn = connection()
zonegroup = get_config_zonegroup()
bucket = gw.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
- # create s3 topic
+ # create topic
endpoint_address = 'http://'+host+':'+str(port)
endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
topic_conf = PSTopicS3(gw, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
'Events': ['s3:ObjectCreated:Put']
else:
return SkipTest('Unknown endpoint type: ' + endpoint_type)
- # create s3 topic
+ # create topic
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
'Events': []
task.start()
verify_kafka_receiver(receiver)
- # create s3 topic
+ # create topic
endpoint_address = 'kafka://WrongHost' # wrong host
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
'&retry_sleep_duration=1'
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
'Events': []
receiver.close(task)
-def ps_s3_persistent_topic_configs(persistency_time, config_dict):
+def persistent_topic_configs(persistency_time, config_dict):
# create connection with no retries
conn = connection(no_retries=True)
zonegroup = get_config_zonegroup()
bucket = conn.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
- # create s3 topic
+ # create topic
endpoint_address = 'http://'+host+':'+str(port)
endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true&'+create_persistency_config_string(config_dict)
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
'Events': []
return str_ret[:-1]
@attr('basic_test')
-def test_ps_s3_persistent_topic_configs_ttl():
+def test_persistent_topic_configs_ttl():
""" test persistent topic configurations with time_to_live """
config_dict = {"time_to_live": 30, "max_retries": "None", "retry_sleep_duration": "None"}
persistency_time = config_dict["time_to_live"]
- ps_s3_persistent_topic_configs(persistency_time, config_dict)
+ persistent_topic_configs(persistency_time, config_dict)
@attr('basic_test')
-def test_ps_s3_persistent_topic_configs_max_retries():
+def test_persistent_topic_configs_max_retries():
""" test persistent topic configurations with max_retries and retry_sleep_duration """
config_dict = {"time_to_live": "None", "max_retries": 10, "retry_sleep_duration": 1}
persistency_time = config_dict["max_retries"]*config_dict["retry_sleep_duration"]
- ps_s3_persistent_topic_configs(persistency_time, config_dict)
+ persistent_topic_configs(persistency_time, config_dict)
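The persistency window passed to persistent_topic_configs is derived from the topic attributes themselves: 30 seconds from time_to_live in the first variant, and max_retries * retry_sleep_duration = 10 * 1 = 10 seconds in the second, with the unused attributes passed as the literal string "None".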
@attr('manual_test')
-def test_ps_s3_persistent_notification_pushback():
+def test_persistent_notification_pushback():
""" test persistent notification pushback """
- return SkipTest("only used in manual testing")
conn = connection()
zonegroup = get_config_zonegroup()
bucket = conn.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
- # create s3 topic
+ # create topic
endpoint_address = 'http://'+host+':'+str(port)
endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
'Events': []
@attr('kafka_test')
-def test_ps_s3_notification_kafka_idle_behaviour():
- """ test pushing kafka s3 notification idle behaviour check """
+def test_notification_kafka_idle_behaviour():
+ """ test pushing kafka notification idle behaviour check """
conn = connection()
zonegroup = get_config_zonegroup()
task.start()
verify_kafka_receiver(receiver)
- # create s3 topic
+ # create topic
endpoint_address = 'kafka://' + default_kafka_server
# with acks from broker
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
topic_arn1 = topic_conf1.set_config()
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn1,
'Events': []
stop_kafka_receiver(receiver, task)
-@attr('modification_required')
-def test_ps_s3_persistent_gateways_recovery():
+@attr('not_implemented')
+def test_persistent_gateways_recovery():
""" test gateway recovery of persistent notifications """
- return SkipTest('This test requires two gateways.')
-
- conn = connection()
- zonegroup = get_config_zonegroup()
- # create random port for the http server
- host = get_ip()
- port = random.randint(10000, 20000)
- # start an http server in a separate thread
- number_of_objects = 10
- http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
- gw1 = conn
- gw2 = connection2()
- # create bucket
- bucket_name = gen_bucket_name()
- bucket = gw1.create_bucket(bucket_name)
- topic_name = bucket_name + TOPIC_SUFFIX
- # create two s3 topics
- endpoint_address = 'http://'+host+':'+str(port)
- endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
- topic_conf1 = PSTopicS3(gw1, topic_name+'_1', zonegroup, endpoint_args=endpoint_args+'&OpaqueData=fromgw1')
- topic_arn1 = topic_conf1.set_config()
- topic_conf2 = PSTopicS3(gw2, topic_name+'_2', zonegroup, endpoint_args=endpoint_args+'&OpaqueData=fromgw2')
- topic_arn2 = topic_conf2.set_config()
- # create two s3 notifications
- notification_name = bucket_name + NOTIFICATION_SUFFIX+'_1'
- topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn1,
- 'Events': ['s3:ObjectCreated:Put']
- }]
- s3_notification_conf1 = PSNotificationS3(gw1, bucket_name, topic_conf_list)
- response, status = s3_notification_conf1.set_config()
- assert_equal(status/100, 2)
- notification_name = bucket_name + NOTIFICATION_SUFFIX+'_2'
- topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn2,
- 'Events': ['s3:ObjectRemoved:Delete']
- }]
- s3_notification_conf2 = PSNotificationS3(gw2, bucket_name, topic_conf_list)
- response, status = s3_notification_conf2.set_config()
- assert_equal(status/100, 2)
- # stop gateway 2
- print('stopping gateway2...')
- client_threads = []
- start_time = time.time()
- for i in range(number_of_objects):
- key = bucket.new_key(str(i))
- content = str(os.urandom(1024*1024))
- thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
- thr.start()
- client_threads.append(thr)
- [thr.join() for thr in client_threads]
- keys = list(bucket.list())
- # delete objects from the bucket
- client_threads = []
- start_time = time.time()
- for key in bucket.list():
- thr = threading.Thread(target = key.delete, args=())
- thr.start()
- client_threads.append(thr)
- [thr.join() for thr in client_threads]
- print('wait for 60 sec for before restarting the gateway')
- time.sleep(60)
- # check http receiver
- events = http_server.get_and_reset_events()
- for key in keys:
- creations = 0
- deletions = 0
- for event in events:
- if event['Records'][0]['eventName'] == 'ObjectCreated:Put' and \
- key.name == event['Records'][0]['s3']['object']['key']:
- creations += 1
- elif event['Records'][0]['eventName'] == 'ObjectRemoved:Delete' and \
- key.name == event['Records'][0]['s3']['object']['key']:
- deletions += 1
- assert_equal(creations, 1)
- assert_equal(deletions, 1)
- # cleanup
- s3_notification_conf1.del_config()
- topic_conf1.del_config()
- gw1.delete_bucket(bucket_name)
- time.sleep(10)
- s3_notification_conf2.del_config()
- topic_conf2.del_config()
- http_server.close()
+ return SkipTest('This test is yet to be implemented')
-@attr('modification_required')
-def test_ps_s3_persistent_multiple_gateways():
+@attr('not_implemented')
+def test_persistent_multiple_gateways():
""" test pushing persistent notification via two gateways """
- return SkipTest('This test requires two gateways.')
-
- conn = connection()
- zonegroup = get_config_zonegroup()
- # create random port for the http server
- host = get_ip()
- port = random.randint(10000, 20000)
- # start an http server in a separate thread
- number_of_objects = 10
- http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
- gw1 = conn
- gw2 = connection2()
- # create bucket
- bucket_name = gen_bucket_name()
- bucket1 = gw1.create_bucket(bucket_name)
- bucket2 = gw2.get_bucket(bucket_name)
- topic_name = bucket_name + TOPIC_SUFFIX
- # create two s3 topics
- endpoint_address = 'http://'+host+':'+str(port)
- endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
- topic1_opaque = 'fromgw1'
- topic_conf1 = PSTopicS3(gw1, topic_name+'_1', zonegroup, endpoint_args=endpoint_args+'&OpaqueData='+topic1_opaque)
- topic_arn1 = topic_conf1.set_config()
- topic2_opaque = 'fromgw2'
- topic_conf2 = PSTopicS3(gw2, topic_name+'_2', zonegroup, endpoint_args=endpoint_args+'&OpaqueData='+topic2_opaque)
- topic_arn2 = topic_conf2.set_config()
- # create two s3 notifications
- notification_name = bucket_name + NOTIFICATION_SUFFIX+'_1'
- topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn1,
- 'Events': []
- }]
- s3_notification_conf1 = PSNotificationS3(gw1, bucket_name, topic_conf_list)
- response, status = s3_notification_conf1.set_config()
- assert_equal(status/100, 2)
- notification_name = bucket_name + NOTIFICATION_SUFFIX+'_2'
- topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn2,
- 'Events': []
- }]
- s3_notification_conf2 = PSNotificationS3(gw2, bucket_name, topic_conf_list)
- response, status = s3_notification_conf2.set_config()
- assert_equal(status/100, 2)
- client_threads = []
- start_time = time.time()
- for i in range(number_of_objects):
- key = bucket1.new_key('gw1_'+str(i))
- content = str(os.urandom(1024*1024))
- thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
- thr.start()
- client_threads.append(thr)
- key = bucket2.new_key('gw2_'+str(i))
- content = str(os.urandom(1024*1024))
- thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
- thr.start()
- client_threads.append(thr)
- [thr.join() for thr in client_threads]
- keys = list(bucket1.list())
- delay = 30
- print('wait for '+str(delay)+'sec for the messages...')
- time.sleep(delay)
- events = http_server.get_and_reset_events()
- for key in keys:
- topic1_count = 0
- topic2_count = 0
- for event in events:
- if event['Records'][0]['eventName'] == 'ObjectCreated:Put' and \
- key.name == event['Records'][0]['s3']['object']['key'] and \
- topic1_opaque == event['Records'][0]['opaqueData']:
- topic1_count += 1
- elif event['Records'][0]['eventName'] == 'ObjectCreated:Put' and \
- key.name == event['Records'][0]['s3']['object']['key'] and \
- topic2_opaque == event['Records'][0]['opaqueData']:
- topic2_count += 1
- assert_equal(topic1_count, 1)
- assert_equal(topic2_count, 1)
- # delete objects from the bucket
- client_threads = []
- start_time = time.time()
- for key in bucket1.list():
- thr = threading.Thread(target = key.delete, args=())
- thr.start()
- client_threads.append(thr)
- [thr.join() for thr in client_threads]
- print('wait for '+str(delay)+'sec for the messages...')
- time.sleep(delay)
- events = http_server.get_and_reset_events()
- for key in keys:
- topic1_count = 0
- topic2_count = 0
- for event in events:
- if event['Records'][0]['eventName'] == 'ObjectRemoved:Delete' and \
- key.name == event['Records'][0]['s3']['object']['key'] and \
- topic1_opaque == event['Records'][0]['opaqueData']:
- topic1_count += 1
- elif event['Records'][0]['eventName'] == 'ObjectRemoved:Delete' and \
- key.name == event['Records'][0]['s3']['object']['key'] and \
- topic2_opaque == event['Records'][0]['opaqueData']:
- topic2_count += 1
- assert_equal(topic1_count, 1)
- assert_equal(topic2_count, 1)
- # cleanup
- s3_notification_conf1.del_config()
- topic_conf1.del_config()
- s3_notification_conf2.del_config()
- topic_conf2.del_config()
- gw1.delete_bucket(bucket_name)
- http_server.close()
+ return SkipTest('This test is yet to be implemented')
def persistent_topic_multiple_endpoints(conn, endpoint_type):
else:
return SkipTest('Unknown endpoint type: ' + endpoint_type)
- # create two s3 topics
+ # create two topics
topic_conf1 = PSTopicS3(conn, topic_name_1, zonegroup, endpoint_args=endpoint_args)
topic_arn1 = topic_conf1.set_config()
endpoint_address = 'http://kaboom:9999'
topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args)
topic_arn2 = topic_conf2.set_config()
- # create two s3 notifications
+ # create two notifications
notification_name = bucket_name + NOTIFICATION_SUFFIX+'_1'
topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn1,
'Events': []
return SkipTest('Unknown endpoint type: ' + endpoint_type)
- # create s3 topic
+ # create topic
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
'Events': []
@attr('http_test')
-def test_ps_s3_persistent_notification_http():
+def test_persistent_notification_http():
""" test pushing persistent notification http """
conn = connection()
persistent_notification('http', conn)
@attr('http_test')
-def test_ps_s3_persistent_notification_http_account():
+def test_persistent_notification_http_account():
""" test pushing persistent notification via http for account user """
account = 'RGW77777777777777777'
admin(['account', 'rm', '--account-id', account], get_config_cluster())
@attr('amqp_test')
-def test_ps_s3_persistent_notification_amqp():
+def test_persistent_notification_amqp():
""" test pushing persistent notification amqp """
conn = connection()
persistent_notification('amqp', conn)
@attr('kafka_test')
-def test_ps_s3_persistent_notification_kafka():
+def test_persistent_notification_kafka():
""" test pushing persistent notification kafka """
conn = connection()
persistent_notification('kafka', conn)
@attr('amqp_test')
-def test_ps_s3_persistent_notification_large():
+def test_persistent_notification_large_amqp():
""" test pushing persistent notification of large notifications """
conn = connection()
# amqp broker guarantee ordering
exact_match = True
- # create s3 topic
+ # create topic
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
'Events': []
stop_amqp_receiver(receiver, task)
-@attr('modification_required')
-def test_ps_s3_topic_update():
+@attr('not_implemented')
+def test_topic_update():
""" test updating topic associated with a notification"""
- return SkipTest('This test is yet to be modified.')
-
- conn = connection()
- ps_zone = None
- bucket_name = gen_bucket_name()
- topic_name = bucket_name+TOPIC_SUFFIX
- # create amqp topic
- hostname = get_ip()
- exchange = 'ex1'
- amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name)
- amqp_task.start()
- #topic_conf = PSTopic(ps_zone.conn, topic_name,endpoint='amqp://' + hostname,endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
- topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
-
- topic_arn = topic_conf.set_config()
- #result, status = topic_conf.set_config()
- #assert_equal(status/100, 2)
- parsed_result = json.loads(result)
- topic_arn = parsed_result['arn']
- # get topic
- result, _ = topic_conf.get_config()
- # verify topic content
- parsed_result = json.loads(result)
- assert_equal(parsed_result['topic']['name'], topic_name)
- assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint'])
- # create http server
- port = random.randint(10000, 20000)
- # start an http server in a separate thread
- http_server = StreamingHTTPServer(hostname, port)
- # create bucket on the first of the rados zones
- bucket = conn.create_bucket(bucket_name)
- # create s3 notification
- notification_name = bucket_name + NOTIFICATION_SUFFIX
- topic_conf_list = [{'Id': notification_name,
- 'TopicArn': topic_arn,
- 'Events': ['s3:ObjectCreated:*']
- }]
- s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
- _, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
- # create objects in the bucket
- number_of_objects = 10
- for i in range(number_of_objects):
- key = bucket.new_key(str(i))
- key.set_contents_from_string('bar')
- # wait for sync
- #zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
- keys = list(bucket.list())
- # TODO: use exact match
- receiver.verify_s3_events(keys, exact_match=True)
- # update the same topic with new endpoint
- #topic_conf = PSTopic(ps_zone.conn, topic_name,endpoint='http://'+ hostname + ':' + str(port))
- topic_conf = PSTopicS3(conn, topic_name, endpoint_args='http://'+ hostname + ':' + str(port))
- _, status = topic_conf.set_config()
- assert_equal(status/100, 2)
- # get topic
- result, _ = topic_conf.get_config()
- # verify topic content
- parsed_result = json.loads(result)
- assert_equal(parsed_result['topic']['name'], topic_name)
- assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint'])
- # delete current objects and create new objects in the bucket
- for key in bucket.list():
- key.delete()
- for i in range(number_of_objects):
- key = bucket.new_key(str(i+100))
- key.set_contents_from_string('bar')
- # wait for sync
- #zone_meta_checkpoint(ps_zone.zone)
- #zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
- keys = list(bucket.list())
- # verify that notifications are still sent to amqp
- # TODO: use exact match
- receiver.verify_s3_events(keys, exact_match=False)
- # update notification to update the endpoint from the topic
- topic_conf_list = [{'Id': notification_name,
- 'TopicArn': topic_arn,
- 'Events': ['s3:ObjectCreated:*']
- }]
- s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
- _, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
- # delete current objects and create new objects in the bucket
- for key in bucket.list():
- key.delete()
- for i in range(number_of_objects):
- key = bucket.new_key(str(i+200))
- key.set_contents_from_string('bar')
- # wait for sync
- #zone_meta_checkpoint(ps_zone.zone)
- #zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
- keys = list(bucket.list())
- # check that updates switched to http
- # TODO: use exact match
- http_server.verify_s3_events(keys, exact_match=False)
- # cleanup
- # delete objects from the bucket
- stop_amqp_receiver(receiver, amqp_task)
- for key in bucket.list():
- key.delete()
- s3_notification_conf.del_config()
- topic_conf.del_config()
- conn.delete_bucket(bucket_name)
- http_server.close()
+ return SkipTest('This test is yet to be implemented')
-@attr('modification_required')
-def test_ps_s3_notification_update():
+@attr('not_implemented')
+def test_notification_update():
""" test updating the topic of a notification"""
- return SkipTest('This test is yet to be modified.')
-
- hostname = get_ip()
- conn = connection()
- ps_zone = None
- bucket_name = gen_bucket_name()
- topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
- topic_name2 = bucket_name+'http'+TOPIC_SUFFIX
- zonegroup = get_config_zonegroup()
- # create topics
- # start amqp receiver in a separate thread
- exchange = 'ex1'
- amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1)
- amqp_task.start()
- # create random port for the http server
- http_port = random.randint(10000, 20000)
- # start an http server in a separate thread
- http_server = StreamingHTTPServer(hostname, http_port)
- #topic_conf1 = PSTopic(ps_zone.conn, topic_name1,endpoint='amqp://' + hostname,endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
- topic_conf1 = PSTopicS3(conn, topic_name1, zonegroup, endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
- result, status = topic_conf1.set_config()
- parsed_result = json.loads(result)
- topic_arn1 = parsed_result['arn']
- assert_equal(status/100, 2)
- #topic_conf2 = PSTopic(ps_zone.conn, topic_name2,endpoint='http://'+hostname+':'+str(http_port))
- topic_conf2 = PSTopicS3(conn, topic_name2, endpoint_args='http://'+hostname+':'+str(http_port))
- result, status = topic_conf2.set_config()
- parsed_result = json.loads(result)
- topic_arn2 = parsed_result['arn']
- assert_equal(status/100, 2)
- # create bucket on the first of the rados zones
- bucket = conn.create_bucket(bucket_name)
- # wait for sync
- #zone_meta_checkpoint(ps_zone.zone)
- # create s3 notification with topic1
- notification_name = bucket_name + NOTIFICATION_SUFFIX
- topic_conf_list = [{'Id': notification_name,
- 'TopicArn': topic_arn1,
- 'Events': ['s3:ObjectCreated:*']
- }]
- s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
- _, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
- # create objects in the bucket
- number_of_objects = 10
- for i in range(number_of_objects):
- key = bucket.new_key(str(i))
- key.set_contents_from_string('bar')
- # wait for sync
- #zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
- keys = list(bucket.list())
- # TODO: use exact match
- receiver.verify_s3_events(keys, exact_match=False);
- # update notification to use topic2
- topic_conf_list = [{'Id': notification_name,
- 'TopicArn': topic_arn2,
- 'Events': ['s3:ObjectCreated:*']
- }]
- s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
- _, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
- # delete current objects and create new objects in the bucket
- for key in bucket.list():
- key.delete()
- for i in range(number_of_objects):
- key = bucket.new_key(str(i+100))
- key.set_contents_from_string('bar')
- # wait for sync
- #zone_meta_checkpoint(ps_zone.zone)
- #zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
- keys = list(bucket.list())
- # check that updates switched to http
- # TODO: use exact match
- http_server.verify_s3_events(keys, exact_match=False)
- # cleanup
- # delete objects from the bucket
- stop_amqp_receiver(receiver, amqp_task)
- for key in bucket.list():
- key.delete()
- s3_notification_conf.del_config()
- topic_conf1.del_config()
- topic_conf2.del_config()
- conn.delete_bucket(bucket_name)
- http_server.close()
+ return SkipTest('This test is yet to be implemented')
-@attr('modification_required')
-def test_ps_s3_multiple_topics_notification():
+@attr('not_implemented')
+def test_multiple_topics_notification():
""" test notification creation with multiple topics"""
- return SkipTest('This test is yet to be modified.')
-
- hostname = get_ip()
- zonegroup = get_config_zonegroup()
- conn = connection()
- ps_zone = None
- bucket_name = gen_bucket_name()
- topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
- topic_name2 = bucket_name+'http'+TOPIC_SUFFIX
- # create topics
- # start amqp receiver in a separate thread
- exchange = 'ex1'
- amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1)
- amqp_task.start()
- # create random port for the http server
- http_port = random.randint(10000, 20000)
- # start an http server in a separate thread
- http_server = StreamingHTTPServer(hostname, http_port)
- #topic_conf1 = PSTopic(ps_zone.conn, topic_name1,endpoint='amqp://' + hostname,endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
- topic_conf1 = PSTopicS3(conn, topic_name1, zonegroup, endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
- result, status = topic_conf1.set_config()
- parsed_result = json.loads(result)
- topic_arn1 = parsed_result['arn']
- assert_equal(status/100, 2)
- #topic_conf2 = PSTopic(ps_zone.conn, topic_name2,endpoint='http://'+hostname+':'+str(http_port))
- topic_conf2 = PSTopicS3(conn, topic_name2, zonegroup, endpoint_args='http://'+hostname+':'+str(http_port))
- result, status = topic_conf2.set_config()
- parsed_result = json.loads(result)
- topic_arn2 = parsed_result['arn']
- assert_equal(status/100, 2)
- # create bucket on the first of the rados zones
- bucket = conn.create_bucket(bucket_name)
- # wait for sync
- #zone_meta_checkpoint(ps_zone.zone)
- # create s3 notification
- notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1'
- notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2'
- topic_conf_list = [
- {
- 'Id': notification_name1,
- 'TopicArn': topic_arn1,
- 'Events': ['s3:ObjectCreated:*']
- },
- {
- 'Id': notification_name2,
- 'TopicArn': topic_arn2,
- 'Events': ['s3:ObjectCreated:*']
- }]
- s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
- _, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
- result, _ = s3_notification_conf.get_config()
- assert_equal(len(result['TopicConfigurations']), 2)
- assert_equal(result['TopicConfigurations'][0]['Id'], notification_name1)
- assert_equal(result['TopicConfigurations'][1]['Id'], notification_name2)
- # get auto-generated subscriptions
- sub_conf1 = PSSubscription(ps_zone.conn, notification_name1,
- topic_name1)
- _, status = sub_conf1.get_config()
- assert_equal(status/100, 2)
- sub_conf2 = PSSubscription(ps_zone.conn, notification_name2,
- topic_name2)
- _, status = sub_conf2.get_config()
- assert_equal(status/100, 2)
- # create objects in the bucket
- number_of_objects = 10
- for i in range(number_of_objects):
- key = bucket.new_key(str(i))
- key.set_contents_from_string('bar')
- # wait for sync
- #zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
- # get the events from both of the subscription
- result, _ = sub_conf1.get_events()
- records = json.loads(result)
- for record in records['Records']:
- log.debug(record)
- keys = list(bucket.list())
- # TODO: use exact match
- verify_s3_records_by_elements(records, keys, exact_match=False)
- receiver.verify_s3_events(keys, exact_match=False)
- result, _ = sub_conf2.get_events()
- parsed_result = json.loads(result)
- for record in parsed_result['Records']:
- log.debug(record)
- keys = list(bucket.list())
- # TODO: use exact match
- verify_s3_records_by_elements(records, keys, exact_match=False)
- http_server.verify_s3_events(keys, exact_match=False)
- # cleanup
- stop_amqp_receiver(receiver, amqp_task)
- s3_notification_conf.del_config()
- topic_conf1.del_config()
- topic_conf2.del_config()
- # delete objects from the bucket
- for key in bucket.list():
- key.delete()
- conn.delete_bucket(bucket_name)
- http_server.close()
+ return SkipTest('This test is yet to be implemented')
@attr('basic_test')
-def test_ps_s3_list_topics_migration():
+def test_list_topics_migration():
""" test list topics on migration"""
if get_config_cluster() == 'noname':
return SkipTest('realm is needed for migration test')
delete_all_topics(conn1, '', conf_cluster)
delete_all_topics(conn2, tenant, conf_cluster)
- # Create s3 - v1 topics
+ # Create v1 topics
topic_conf = PSTopicS3(conn1, topic_versions['topic3_v1'], zonegroup, endpoint_args=endpoint_args)
topic_arn3 = topic_conf.set_config()
topic_conf = PSTopicS3(conn1, topic_versions['topic4_v1'], zonegroup, endpoint_args=endpoint_args)
# Start v2 notification
zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2)
- # Create s3 - v2 topics
+ # Create v2 topics
topic_conf = PSTopicS3(conn1, topic_versions['topic1_v2'], zonegroup, endpoint_args=endpoint_args)
topic_arn1 = topic_conf.set_config()
topic_conf = PSTopicS3(conn1, topic_versions['topic2_v2'], zonegroup, endpoint_args=endpoint_args)
@attr('basic_test')
-def test_ps_s3_list_topics():
+def test_list_topics():
""" test list topics"""
# Initialize connections, topic names and configurations
delete_all_topics(conn1, '', get_config_cluster())
delete_all_topics(conn2, tenant, get_config_cluster())
- # Create s3 - v2 topics
+ # Create v2 topics
topic_conf = PSTopicS3(conn1, topic_name1, zonegroup, endpoint_args=endpoint_args)
topic_arn1 = topic_conf.set_config()
topic_conf = PSTopicS3(conn1, topic_name2, zonegroup, endpoint_args=endpoint_args)
tenant_topic_conf.del_config(tenant_topic_arn2)
@attr('basic_test')
-def test_ps_s3_list_topics_v1():
+def test_list_topics_v1():
""" test list topics on v1"""
if get_config_cluster() == 'noname':
return SkipTest('realm is needed')
# Make sure that we disable v2
zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2)
- # Create s3 - v1 topics
+ # Create v1 topics
topic_conf = PSTopicS3(conn1, topic_name1, zonegroup, endpoint_args=endpoint_args)
topic_arn1 = topic_conf.set_config()
topic_conf = PSTopicS3(conn1, topic_name2, zonegroup, endpoint_args=endpoint_args)
tenant_topic_conf.del_config(tenant_topic_arn2)
-def ps_s3_topic_permissions(another_tenant=""):
- """ test s3 topic set/get/delete permissions """
+def topic_permissions(another_tenant=""):
+ """ test topic set/get/delete permissions """
conn1 = connection()
conn2, arn2 = another_user(tenant=another_tenant)
zonegroup = get_config_zonegroup()
}
]
})
- # create s3 topic with DENY policy
+ # create topic with DENY policy
endpoint_address = 'amqp://127.0.0.1:7001'
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
topic_conf = PSTopicS3(conn1, topic_name, zonegroup, endpoint_args=endpoint_args, policy_text=topic_policy)
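Only the tail of the policy document is visible in the hunk above; for orientation, here is a sketch of the kind of DENY statement that policy_text could carry. Every field below is an assumption, since the exact statement is not part of this diff:

# hypothetical DENY policy; the real statement is built earlier in topic_permissions()
topic_policy = json.dumps({
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Deny",
        "Principal": {"AWS": [arn2]},  # the second user created above
        "Action": ["sns:Publish", "sns:GetTopicAttributes", "sns:DeleteTopic"],
        "Resource": "arn:aws:sns:" + zonegroup + "::" + topic_name
    }]
})

The intent, per the test names, is that operations attempted by conn2 against the topic are rejected while conn1, the owner, keeps access.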
@attr('basic_test')
-def test_ps_s3_topic_permissions_same_tenant():
- ps_s3_topic_permissions()
+def test_topic_permissions_same_tenant():
+ topic_permissions()
@attr('basic_test')
-def test_ps_s3_topic_permissions_cross_tenant():
- ps_s3_topic_permissions(another_tenant="boom")
+def test_topic_permissions_cross_tenant():
+ topic_permissions(another_tenant="boom")
@attr('basic_test')
-def test_ps_s3_topic_no_permissions():
- """ test s3 topic set/get/delete permissions """
+def test_topic_no_permissions():
+ """ test topic set/get/delete permissions """
conn1 = connection()
conn2, _ = another_user()
zonegroup = 'default'
bucket_name = gen_bucket_name()
topic_name = bucket_name + TOPIC_SUFFIX
- # create s3 topic without policy
+ # create topic without policy
endpoint_address = 'amqp://127.0.0.1:7001'
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
topic_conf = PSTopicS3(conn1, topic_name, zonegroup, endpoint_args=endpoint_args)
def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=False):
- """ test pushing kafka s3 notification securly to master """
+ """ test pushing kafka notification securly to master """
conn = connection()
zonegroup = get_config_zonegroup()
# create bucket
bucket = conn.create_bucket(bucket_name)
# name is constant for manual testing
topic_name = bucket_name+'_topic'
- # create s3 topic
+ # create topic
if security_type == 'SASL_SSL':
if not use_topic_attrs_for_creds:
endpoint_address = 'kafka://alice:alice-secret@' + default_kafka_server + ':9094'
verify_kafka_receiver(receiver)
topic_arn = topic_conf.set_config()
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
'Events': []
@attr('kafka_security_test')
-def test_ps_s3_notification_push_kafka_security_ssl():
+def test_notification_kafka_security_ssl():
kafka_security('SSL')
@attr('kafka_security_test')
-def test_ps_s3_notification_push_kafka_security_ssl_sasl():
+def test_notification_kafka_security_ssl_sasl():
kafka_security('SASL_SSL')
@attr('kafka_security_test')
-def test_ps_s3_notification_push_kafka_security_ssl_sasl_attrs():
+def test_notification_kafka_security_ssl_sasl_attrs():
kafka_security('SASL_SSL', use_topic_attrs_for_creds=True)
@attr('kafka_security_test')
-def test_ps_s3_notification_push_kafka_security_sasl():
+def test_notification_kafka_security_sasl():
kafka_security('SASL_PLAINTEXT')
@attr('kafka_security_test')
-def test_ps_s3_notification_push_kafka_security_ssl_sasl_scram():
+def test_notification_kafka_security_ssl_sasl_scram():
kafka_security('SASL_SSL', mechanism='SCRAM-SHA-256')
@attr('kafka_security_test')
-def test_ps_s3_notification_push_kafka_security_sasl_scram():
+def test_notification_kafka_security_sasl_scram():
kafka_security('SASL_PLAINTEXT', mechanism='SCRAM-SHA-256')
@attr('http_test')
-def test_persistent_ps_s3_reload():
+def test_persistent_reload():
""" do a realm reload while we send notifications """
if get_config_cluster() == 'noname':
return SkipTest('realm is needed for reload test')
bucket = conn.create_bucket(bucket_name)
topic_name1 = bucket_name + TOPIC_SUFFIX + '_1'
- # create s3 topics
+ # create topics
endpoint_address = 'http://'+host+':'+str(http_port)
endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \
'&retry_sleep_duration=1'
topic_conf2 = PSTopicS3(conn, topic_name2, zonegroup, endpoint_args=endpoint_args)
topic_arn2 = topic_conf2.set_config()
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn1,
'Events': []
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
'Events': []
@attr('http_test')
-def test_ps_s3_data_path_v2_migration():
+def test_data_path_v2_migration():
""" test data path v2 migration """
if get_config_cluster() == 'noname':
return SkipTest('realm is needed for migration test')
bucket = conn.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
- # create s3 topic
+ # create topic
endpoint_address = 'http://'+host+':'+str(http_port)
endpoint_args = 'push-endpoint='+endpoint_address
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
'Events': []
@attr('basic_test')
-def test_ps_s3_data_path_v2_large_migration():
+def test_data_path_v2_large_migration():
""" test data path v2 large migration """
if get_config_cluster() == 'noname':
return SkipTest('realm is needed for migration test')
host = get_ip()
http_port = random.randint(10000, 20000)
- # create s3 topic
+ # create topic
buckets_list = []
topics_conf_list = []
s3_notification_conf_list = []
bucket = conn.create_bucket(bucket_name)
buckets_list.append(bucket)
topic_name = bucket_name + TOPIC_SUFFIX
- # create s3 topic
+ # create topic
endpoint_address = 'http://' + host + ':' + str(http_port)
endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true'
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topics_conf_list.append(topic_conf)
topic_arn = topic_conf.set_config()
- # create s3 110 notifications
+ # create 110 notifications
s3_notification_list = []
for i in range(num_of_s3_notifications):
notification_name = bucket_name + NOTIFICATION_SUFFIX + '_' + str(i + 1)
@attr('basic_test')
-def test_ps_s3_data_path_v2_mixed_migration():
+def test_data_path_v2_mixed_migration():
""" test data path v2 mixed migration """
if get_config_cluster() == 'noname':
return SkipTest('realm is needed for migration test')
host = get_ip()
http_port = random.randint(10000, 20000)
- # create s3 topic
+ # create topic
buckets_list = []
topics_conf_list = []
s3_notification_conf_list = []
bucket = conn.create_bucket(bucket_name)
buckets_list.append(bucket)
topic_name = bucket_name + TOPIC_SUFFIX + created_version
- # create s3 topic
+ # create topic
endpoint_address = 'http://' + host + ':' + str(http_port)
endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true'
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topics_conf_list.append(topic_conf)
topic_arn = topic_conf.set_config()
topic_arn_list.append(topic_arn)
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX + created_version
s3_notification_list = [{'Id': notification_name, 'TopicArn': topic_arn,
'Events': []
# disable v2 notification
zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2)
- # create s3 topic
+ # create topic
created_version = '_created_v1'
for conn, bucket in zip(connections_list, buckets_list):
# create bucket
bucket_name = bucket.name
topic_name = bucket_name + TOPIC_SUFFIX + created_version
- # create s3 topic
+ # create topic
endpoint_address = 'http://' + host + ':' + str(http_port)
endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true'
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topics_conf_list.append(topic_conf)
topic_arn = topic_conf.set_config()
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX + created_version
s3_notification_list = [{'Id': notification_name, 'TopicArn': topic_arn,
'Events': []
endpoint_address = 'kafka://' + default_kafka_server + ':' + str(incorrect_port)
endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker' + '&persistent=true'
- # create s3 topic
+ # create topic
zonegroup = get_config_zonegroup()
topic_conf = PSTopicS3(conn, topic_name, zonegroup,
endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
'Events': []
# remove the port and update the topic, so it points to the correct endpoint.
endpoint_address = 'kafka://' + default_kafka_server
- # update s3 topic
+ # update topic
topic_conf.set_attributes(attribute_name="push-endpoint",
attribute_val=endpoint_address)
keys = list(bucket.list())
endpoint_address = 'kafka://' + default_kafka_server
endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker&use-ssl=true' + '&persistent=true'
- # initially create both s3 topics with `use-ssl=true`
+ # initially create both topics with `use-ssl=true`
zonegroup = get_config_zonegroup()
topic_conf_1 = PSTopicS3(conn, topic_name_1, zonegroup,
endpoint_args=endpoint_args)
topic_conf_2 = PSTopicS3(conn, topic_name_2, zonegroup,
endpoint_args=endpoint_args)
topic_arn_2 = topic_conf_2.set_config()
- # create s3 notification
+ # create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn_1,
'Events': []
f"one bucket per user {user1_conn.access_key}: {user1_bucket_name} and {user2_conn.access_key}: {user2_bucket_name}"
)
- # create an S3 topic owned by the first user
+ # create a topic owned by the first user
topic_name = user1_bucket_name + TOPIC_SUFFIX
zonegroup = get_config_zonegroup()
endpoint_address = "http://" + host + ":" + str(port)
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
- # create s3 notification
+ # create notification
notif_1 = bucket_name + '_notif_1'
topic_conf_list = [{'Id': notif_1, 'TopicArn': topic_arn,
'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']