From: Yuval Lifshitz Date: Tue, 17 Feb 2026 14:30:43 +0000 (+0000) Subject: test/rgw/notifications: cleanup of tests X-Git-Tag: testing/wip-vshankar-testing-20260226.041846~16^2~5 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8f9706576320d7f9eb352c44b8d9923a8a414405;p=ceph-ci.git test/rgw/notifications: cleanup of tests * remove dead code * remove unnecessary text from test names used when we supported "pubsub" (pull mode) Signed-off-by: Yuval Lifshitz --- diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index a051d188260..2c82a3d5f9c 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -255,7 +255,7 @@ class HTTPServerWithEvents(ThreadingHTTPServer): self.lock.release() def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}, etags=[]): - """verify stored s3 records agains a list of keys""" + """verify stored records agains a list of keys""" self.acquire_lock() log.info('verify_s3_events: http server has %d events', len(self.events)) try: @@ -350,7 +350,7 @@ class AMQPReceiver(object): # TODO create a base class for the AMQP and HTTP cases def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}, etags=[]): - """verify stored s3 records agains a list of keys""" + """verify stored records agains a list of keys""" verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes, etags=etags) self.events = [] @@ -494,7 +494,7 @@ class KafkaReceiver(object): self.stop = False def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}, etags=[]): - """verify stored s3 records agains a list of keys""" + """verify stored records agains a list of keys""" verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes, etags=etags) self.events = [] @@ -742,8 +742,8 @@ def connect_random_user(tenant=''): @attr('basic_test') -def test_ps_s3_topic_on_master(): - """ test s3 topics set/get/delete on master """ +def test_topic(): + """ test topics set/get/delete """ tenant = 'kaboom' conn = connect_random_user(tenant) @@ -756,7 +756,7 @@ def test_ps_s3_topic_on_master(): bucket_name = gen_bucket_name() topic_name = bucket_name + TOPIC_SUFFIX - # create s3 topics + # create topics endpoint_address = 'amqp://127.0.0.1:7001/vhost_1' endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none' topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args) @@ -806,8 +806,8 @@ def test_ps_s3_topic_on_master(): @attr('basic_test') -def test_ps_s3_topic_admin_on_master(): - """ test s3 topics set/get/delete on master """ +def test_topic_admin(): + """ test topics set/get/delete """ tenant = 'kaboom' conn = connect_random_user(tenant) @@ -820,7 +820,7 @@ def test_ps_s3_topic_admin_on_master(): bucket_name = gen_bucket_name() topic_name = bucket_name + TOPIC_SUFFIX - # create s3 topics + # create topics endpoint_address = 'amqp://127.0.0.1:7001/vhost_1' endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none' topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args) @@ -896,14 +896,14 @@ def notification_configuration(with_cli): # make sure there are no leftover topics delete_all_topics(conn, '', get_config_cluster()) - # create s3 topics + # create topics endpoint_address = 'amqp://127.0.0.1:7001' endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none' topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() assert_equal(topic_arn, 'arn:aws:sns:' + zonegroup + '::' + topic_name) - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [ { @@ -1000,66 +1000,21 @@ def test_notification_configuration_admin(): notification_configuration(True) -@attr('modification_required') -def test_ps_s3_topic_with_secret_on_master(): - """ test s3 topics with secret set/get/delete on master """ - return SkipTest('secure connection is needed to test topic with secrets') - - conn = connection1() - if conn.secure_conn is None: - return SkipTest('secure connection is needed to test topic with secrets') - - zonegroup = get_config_zonegroup() - bucket_name = gen_bucket_name() - topic_name = bucket_name + TOPIC_SUFFIX - - # clean all topics - delete_all_s3_topics(conn, zonegroup) - - # create s3 topics - endpoint_address = 'amqp://user:password@127.0.0.1:7001' - endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none' - bad_topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) - try: - result = bad_topic_conf.set_config() - except Exception as err: - print('Error is expected: ' + str(err)) - else: - assert False, 'user password configuration set allowed only over HTTPS' - topic_conf = PSTopicS3(conn.secure_conn, topic_name, zonegroup, endpoint_args=endpoint_args) - topic_arn = topic_conf.set_config() - - assert_equal(topic_arn, - 'arn:aws:sns:' + zonegroup + ':' + get_tenant() + ':' + topic_name) - - _, status = bad_topic_conf.get_config() - assert_equal(status/100, 4) - - # get topic - result, status = topic_conf.get_config() - assert_equal(status, 200) - assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn']) - assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress']) - - _, status = bad_topic_conf.get_config() - assert_equal(status/100, 4) - - _, status = topic_conf.get_list() - assert_equal(status/100, 2) - - # delete topics - result = topic_conf.del_config() +@attr('not_implemented') +def test_topic_with_secret(): + """ test topics with secret set/get/delete """ + return SkipTest('This test is yet to be implemented') @attr('basic_test') def test_notification_configuration(): - """ test s3 notification set/get/deleter """ + """ test notification set/get/deleter """ notification_configuration(False) @attr('basic_test') -def test_ps_s3_notification_on_master_empty_config(): - """ test s3 notification set/get/delete on master with empty config """ +def test_notification_empty_config(): + """ test notification set/get/delete with empty config """ hostname = get_ip() conn = connection() @@ -1071,13 +1026,13 @@ def test_ps_s3_notification_on_master_empty_config(): bucket = conn.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX - # create s3 topic + # create topic endpoint_address = 'amqp://127.0.0.1:7001' endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none' topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn, @@ -1092,7 +1047,7 @@ def test_ps_s3_notification_on_master_empty_config(): assert_equal(status/100, 2) assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn) - # create s3 notification again with empty configuration to check if it deletes or not + # create notification again with empty configuration to check if it deletes or not topic_conf_list = [] s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) @@ -1115,8 +1070,8 @@ def test_ps_s3_notification_on_master_empty_config(): @attr('amqp_test') -def test_ps_s3_notification_filter_on_master(): - """ test s3 notification filter on master """ +def test_notification_filter_amqp(): + """ test notification filter """ hostname = get_ip() @@ -1135,14 +1090,14 @@ def test_ps_s3_notification_filter_on_master(): task, receiver = create_amqp_receiver_thread(exchange, topic_name) task.start() - # create s3 topic + # create topic endpoint_address = 'amqp://' + hostname endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker' topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn, @@ -1292,21 +1247,21 @@ def test_ps_s3_notification_filter_on_master(): @attr('basic_test') -def test_ps_s3_notification_errors_on_master(): - """ test s3 notification set/get/delete on master """ +def test_notification_errors(): + """ test notification set/get/delete """ conn = connection() zonegroup = get_config_zonegroup() bucket_name = gen_bucket_name() # create bucket bucket = conn.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX - # create s3 topic + # create topic endpoint_address = 'amqp://127.0.0.1:7001' endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none' topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() - # create s3 notification with invalid event name + # create notification with invalid event name notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, @@ -1320,7 +1275,7 @@ def test_ps_s3_notification_errors_on_master(): else: assert False, 'invalid event name is expected to fail' - # create s3 notification with missing name + # create notification with missing name topic_conf_list = [{'Id': '', 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:Put'] @@ -1333,7 +1288,7 @@ def test_ps_s3_notification_errors_on_master(): else: assert False, 'missing notification name is expected to fail' - # create s3 notification with invalid topic ARN + # create notification with invalid topic ARN invalid_topic_arn = 'kaboom' topic_conf_list = [{'Id': notification_name, 'TopicArn': invalid_topic_arn, @@ -1347,7 +1302,7 @@ def test_ps_s3_notification_errors_on_master(): else: assert False, 'invalid ARN is expected to fail' - # create s3 notification with unknown topic ARN + # create notification with unknown topic ARN invalid_topic_arn = 'arn:aws:sns:a::kaboom' topic_conf_list = [{'Id': notification_name, 'TopicArn': invalid_topic_arn , @@ -1361,7 +1316,7 @@ def test_ps_s3_notification_errors_on_master(): else: assert False, 'unknown topic is expected to fail' - # create s3 notification with wrong bucket + # create notification with wrong bucket topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:Put'] @@ -1388,7 +1343,7 @@ def test_ps_s3_notification_errors_on_master(): conn.delete_bucket(bucket_name) -def notification_push(endpoint_type, conn, account=None, cloudevents=False, kafka_brokers=None): +def notification(endpoint_type, conn, account=None, cloudevents=False, kafka_brokers=None): """ test pushinging notification """ zonegroup = get_config_zonegroup() # create bucket @@ -1411,7 +1366,7 @@ def notification_push(endpoint_type, conn, account=None, cloudevents=False, kafk endpoint_args = 'push-endpoint='+endpoint_address topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, @@ -1429,10 +1384,10 @@ def notification_push(endpoint_type, conn, account=None, cloudevents=False, kafk # with acks from broker exchange = 'ex1' endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker' - # create two s3 topic + # create two topic topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, @@ -1454,10 +1409,10 @@ def notification_push(endpoint_type, conn, account=None, cloudevents=False, kafk endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker' if kafka_brokers is not None: endpoint_args += '&kafka-brokers=' + kafka_brokers - # create s3 topic + # create topic topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, @@ -1526,17 +1481,15 @@ def notification_push(endpoint_type, conn, account=None, cloudevents=False, kafk @attr('amqp_test') -def test_notification_push_amqp(): +def test_notification_amqp(): """ test pushing amqp notification """ - return SkipTest("Running into an issue with amqp when we make exact_match=true") conn = connection() - notification_push('amqp', conn) + notification('amqp', conn) @attr('manual_test') -def test_ps_s3_notification_push_amqp_idleness_check(): - """ test pushing amqp s3 notification and checking for connection idleness """ - return SkipTest("only used in manual testing") +def test_notification_amqp_idleness_check(): + """ test pushing amqp notification and checking for connection idleness """ hostname = get_ip() conn = connection() zonegroup = get_config_zonegroup() @@ -1551,13 +1504,13 @@ def test_ps_s3_notification_push_amqp_idleness_check(): task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name1) task1.start() - # create two s3 topic + # create two topic endpoint_address = 'amqp://' + hostname # with acks from broker endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker' topic_conf1 = PSTopicS3(conn, topic_name1, zonegroup, endpoint_args=endpoint_args) topic_arn1 = topic_conf1.set_config() - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1, 'Events': [] @@ -1665,24 +1618,24 @@ def test_ps_s3_notification_push_amqp_idleness_check(): @attr('kafka_test') -def test_notification_push_kafka(): - """ test pushing kafka s3 notification on master """ +def test_notification_kafka(): + """ test pushing kafka notification """ conn = connection() - notification_push('kafka', conn) + notification('kafka', conn) @attr('kafka_failover') -def test_notification_push_kafka_multiple_brokers_override(): - """ test pushing kafka s3 notification on master """ +def test_notification_kafka_multiple_brokers_override(): + """ test pushing kafka notification """ conn = connection() - notification_push('kafka', conn, kafka_brokers='{host}:9091,{host}:9092'.format(host=default_kafka_server)) + notification('kafka', conn, kafka_brokers='{host}:9091,{host}:9092'.format(host=default_kafka_server)) @attr('kafka_failover') -def test_notification_push_kafka_multiple_brokers_append(): - """ test pushing kafka s3 notification on master """ +def test_notification_kafka_multiple_brokers_append(): + """ test pushing kafka notification """ conn = connection() - notification_push('kafka', conn, kafka_brokers='{host}:9091'.format(host=default_kafka_server)) + notification('kafka', conn, kafka_brokers='{host}:9091'.format(host=default_kafka_server)) @attr('manual_test') @@ -1789,8 +1742,8 @@ def test_1K_topics(): @attr('http_test') -def test_ps_s3_notification_multi_delete_on_master(): - """ test deletion of multiple keys on master """ +def test_notification_multi_delete(): + """ test deletion of multiple keys """ hostname = get_ip() conn = connection() zonegroup = get_config_zonegroup() @@ -1807,12 +1760,12 @@ def test_ps_s3_notification_multi_delete_on_master(): bucket = conn.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX - # create s3 topic + # create topic endpoint_address = 'http://'+host+':'+str(port) endpoint_args = 'push-endpoint='+endpoint_address topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, @@ -1857,23 +1810,23 @@ def test_ps_s3_notification_multi_delete_on_master(): @attr('http_test') -def test_notification_push_http(): - """ test pushing http s3 notification """ +def test_notification_http(): + """ test pushing http notification """ conn = connection() - notification_push('http', conn) + notification('http', conn) @attr('http_test') -def test_notification_push_cloudevents(): +def test_notification_cloudevents(): """ test pushing cloudevents notification """ conn = connection() - notification_push('http', conn, cloudevents=True) + notification('http', conn, cloudevents=True) @attr('http_test') -def test_ps_s3_opaque_data_on_master(): - """ test that opaque id set in topic, is sent in notification on master """ +def test_opaque_data(): + """ test that opaque id set in topic, is sent in notification """ hostname = get_ip() conn = connection() zonegroup = get_config_zonegroup() @@ -1890,13 +1843,13 @@ def test_ps_s3_opaque_data_on_master(): bucket = conn.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX - # create s3 topic + # create topic endpoint_address = 'http://'+host+':'+str(port) endpoint_args = 'push-endpoint='+endpoint_address opaque_data = 'http://1.2.3.4:8888' topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data) topic_arn = topic_conf.set_config() - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, @@ -1977,10 +1930,10 @@ def lifecycle(endpoint_type, conn, number_of_objects, topic_events, create_threa else: return SkipTest('Unknown endpoint type: ' + endpoint_type) - # create s3 topic + # create topic topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, @@ -2126,8 +2079,8 @@ def test_lifecycle_abort_mpu(): ['ObjectLifecycle:Expiration:AbortMultipartUpload'], True) -def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_location=None, verify_ssl='true'): - """ test object creation s3 notifications in using put/copy/post on master""" +def creation_triggers(external_endpoint_address=None, ca_location=None, verify_ssl='true'): + """ test object creation notifications in using put/copy/post""" if not external_endpoint_address: hostname = get_ip() @@ -2150,7 +2103,7 @@ def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_locatio task, receiver = create_amqp_receiver_thread(exchange, topic_name, external_endpoint_address, ca_location) task.start() - # create s3 topic + # create topic if external_endpoint_address: endpoint_address = external_endpoint_address elif ca_location: @@ -2162,7 +2115,7 @@ def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_locatio endpoint_args += '&ca-location={}'.format(ca_location) topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:Put', 's3:ObjectCreated:Copy', 's3:ObjectCreated:CompleteMultipartUpload'] @@ -2217,12 +2170,12 @@ def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_locatio @attr('amqp_test') -def test_ps_s3_creation_triggers_on_master(): - ps_s3_creation_triggers_on_master(external_endpoint_address="amqp://localhost:5672") +def test_creation_triggers_amqp(): + creation_triggers(external_endpoint_address="amqp://localhost:5672") @attr('amqp_ssl_test') -def test_ps_s3_creation_triggers_on_master_external(): +def test_creation_triggers_external(): from distutils.util import strtobool @@ -2235,7 +2188,7 @@ def test_ps_s3_creation_triggers_on_master_external(): except Exception as e: verify_ssl = 'true' - ps_s3_creation_triggers_on_master( + creation_triggers( external_endpoint_address=os.environ['AMQP_EXTERNAL_ENDPOINT'], verify_ssl=verify_ssl) else: @@ -2334,7 +2287,7 @@ def generate_private_key(tempdir): @attr('amqp_ssl_test') -def test_ps_s3_creation_triggers_on_master_ssl(): +def test_creation_triggers_ssl(): import textwrap from tempfile import TemporaryDirectory @@ -2357,13 +2310,13 @@ def test_ps_s3_creation_triggers_on_master_ssl(): ''')) os.environ['RABBITMQ_CONFIG_FILE'] = os.path.splitext(RABBITMQ_CONF_FILE)[0] - ps_s3_creation_triggers_on_master(ca_location=CACERTFILE) + creation_triggers(ca_location=CACERTFILE) del os.environ['RABBITMQ_CONFIG_FILE'] @attr('amqp_test') -def test_http_post_object_upload(): +def test_post_object_upload_amqp(): """ test that uploads object using HTTP POST """ import boto3 @@ -2398,13 +2351,13 @@ def test_http_post_object_upload(): task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name+'_1') task1.start() - # create s3 topics + # create topics endpoint_address = 'amqp://' + hostname endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker' topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args) topic_arn1 = topic_conf1.set_config() - # create s3 notifications + # create notifications notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1, 'Events': ['s3:ObjectCreated:Post'] @@ -2468,10 +2421,10 @@ def multipart_endpoint_agnostic(endpoint_type, conn): else: return SkipTest('Unknown endpoint type: ' + endpoint_type) - # create s3 topic + # create topic topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': [] @@ -2563,11 +2516,11 @@ def metadata_filter(endpoint_type, conn): else: return SkipTest('Unknown endpoint type: ' + endpoint_type) - # create s3 topic + # create topic zonegroup = get_config_zonegroup() topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX meta_key = 'meta1' meta_value = 'This is my metadata value' @@ -2670,8 +2623,8 @@ def test_metadata_filter_ampq(): @attr('amqp_test') -def test_ps_s3_metadata_on_master(): - """ test s3 notification of metadata on master """ +def test_metadata_amqp(): + """ test notification of metadata """ hostname = get_ip() conn = connection() @@ -2687,12 +2640,12 @@ def test_ps_s3_metadata_on_master(): task, receiver = create_amqp_receiver_thread(exchange, topic_name) task.start() - # create s3 topic + # create topic endpoint_address = 'amqp://' + hostname endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable' topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX meta_key = 'meta1' meta_value = 'This is my metadata value' @@ -2764,8 +2717,8 @@ def test_ps_s3_metadata_on_master(): @attr('amqp_test') -def test_ps_s3_tags_on_master(): - """ test s3 notification of tags on master """ +def test_tags_amqp(): + """ test notification of tags """ hostname = get_ip() conn = connection() @@ -2781,12 +2734,12 @@ def test_ps_s3_tags_on_master(): task, receiver = create_amqp_receiver_thread(exchange, topic_name) task.start() - # create s3 topic + # create topic endpoint_address = 'amqp://' + hostname endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable' topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'], @@ -2875,8 +2828,8 @@ def test_ps_s3_tags_on_master(): conn.delete_bucket(bucket_name) @attr('amqp_test') -def test_ps_s3_versioning_on_master(): - """ test s3 notification of object versions """ +def test_versioning_amqp(): + """ test notification of object versions """ hostname = get_ip() conn = connection() @@ -2893,7 +2846,7 @@ def test_ps_s3_versioning_on_master(): task, receiver = create_amqp_receiver_thread(exchange, topic_name) task.start() - # create s3 topic + # create topic endpoint_address = 'amqp://' + hostname endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker' topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) @@ -2950,8 +2903,8 @@ def test_ps_s3_versioning_on_master(): @attr('amqp_test') -def test_ps_s3_versioned_deletion_on_master(): - """ test s3 notification of deletion markers on master """ +def test_versioned_deletion_amqp(): + """ test notification of deletion markers """ hostname = get_ip() conn = connection() @@ -2968,12 +2921,12 @@ def test_ps_s3_versioned_deletion_on_master(): task, receiver = create_amqp_receiver_thread(exchange, topic_name) task.start() - # create s3 topic + # create topic endpoint_address = 'amqp://' + hostname endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker' topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn, 'Events': ['s3:ObjectRemoved:*'] @@ -3050,9 +3003,8 @@ def test_ps_s3_versioned_deletion_on_master(): @attr('manual_test') -def test_ps_s3_persistent_cleanup(): +def test_persistent_cleanup(): """ test reservation cleanup after gateway crash """ - return SkipTest("only used in manual testing") conn = connection() zonegroup = get_config_zonegroup() @@ -3070,13 +3022,13 @@ def test_ps_s3_persistent_cleanup(): bucket = gw.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX - # create s3 topic + # create topic endpoint_address = 'http://'+host+':'+str(port) endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true' topic_conf = PSTopicS3(gw, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:Put'] @@ -3228,10 +3180,10 @@ def persistent_topic_stats(conn, endpoint_type): else: return SkipTest('Unknown endpoint type: ' + endpoint_type) - # create s3 topic + # create topic topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': [] @@ -3340,13 +3292,13 @@ def test_persistent_topic_dump(): task.start() verify_kafka_receiver(receiver) - # create s3 topic + # create topic endpoint_address = 'kafka://WrongHost' # wrong port endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \ '&retry_sleep_duration=1' topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': [] @@ -3415,7 +3367,7 @@ def test_persistent_topic_dump(): receiver.close(task) -def ps_s3_persistent_topic_configs(persistency_time, config_dict): +def persistent_topic_configs(persistency_time, config_dict): # create connection with no retries conn = connection(no_retries=True) zonegroup = get_config_zonegroup() @@ -3429,12 +3381,12 @@ def ps_s3_persistent_topic_configs(persistency_time, config_dict): bucket = conn.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX - # create s3 topic + # create topic endpoint_address = 'http://'+host+':'+str(port) endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true&'+create_persistency_config_string(config_dict) topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': [] @@ -3518,25 +3470,24 @@ def create_persistency_config_string(config_dict): return str_ret[:-1] @attr('basic_test') -def test_ps_s3_persistent_topic_configs_ttl(): +def test_persistent_topic_configs_ttl(): """ test persistent topic configurations with time_to_live """ config_dict = {"time_to_live": 30, "max_retries": "None", "retry_sleep_duration": "None"} persistency_time = config_dict["time_to_live"] - ps_s3_persistent_topic_configs(persistency_time, config_dict) + persistent_topic_configs(persistency_time, config_dict) @attr('basic_test') -def test_ps_s3_persistent_topic_configs_max_retries(): +def test_persistent_topic_configs_max_retries(): """ test persistent topic configurations with max_retries and retry_sleep_duration """ config_dict = {"time_to_live": "None", "max_retries": 10, "retry_sleep_duration": 1} persistency_time = config_dict["max_retries"]*config_dict["retry_sleep_duration"] - ps_s3_persistent_topic_configs(persistency_time, config_dict) + persistent_topic_configs(persistency_time, config_dict) @attr('manual_test') -def test_ps_s3_persistent_notification_pushback(): +def test_persistent_notificationback(): """ test pushing persistent notification pushback """ - return SkipTest("only used in manual testing") conn = connection() zonegroup = get_config_zonegroup() @@ -3551,12 +3502,12 @@ def test_ps_s3_persistent_notification_pushback(): bucket = conn.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX - # create s3 topic + # create topic endpoint_address = 'http://'+host+':'+str(port) endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true' topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': [] @@ -3616,8 +3567,8 @@ def test_ps_s3_persistent_notification_pushback(): @attr('kafka_test') -def test_ps_s3_notification_kafka_idle_behaviour(): - """ test pushing kafka s3 notification idle behaviour check """ +def test_notification_kafka_idle_behaviour(): + """ test pushing kafka notification idle behaviour check """ conn = connection() zonegroup = get_config_zonegroup() @@ -3632,13 +3583,13 @@ def test_ps_s3_notification_kafka_idle_behaviour(): task.start() verify_kafka_receiver(receiver) - # create s3 topic + # create topic endpoint_address = 'kafka://' + default_kafka_server # with acks from broker endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker' topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args) topic_arn1 = topic_conf1.set_config() - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn1, 'Events': [] @@ -3748,202 +3699,16 @@ def test_ps_s3_notification_kafka_idle_behaviour(): stop_kafka_receiver(receiver, task) -@attr('modification_required') -def test_ps_s3_persistent_gateways_recovery(): +@attr('not_implemented') +def test_persistent_gateways_recovery(): """ test gateway recovery of persistent notifications """ - return SkipTest('This test requires two gateways.') - - conn = connection() - zonegroup = get_config_zonegroup() - # create random port for the http server - host = get_ip() - port = random.randint(10000, 20000) - # start an http server in a separate thread - number_of_objects = 10 - http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects) - gw1 = conn - gw2 = connection2() - # create bucket - bucket_name = gen_bucket_name() - bucket = gw1.create_bucket(bucket_name) - topic_name = bucket_name + TOPIC_SUFFIX - # create two s3 topics - endpoint_address = 'http://'+host+':'+str(port) - endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true' - topic_conf1 = PSTopicS3(gw1, topic_name+'_1', zonegroup, endpoint_args=endpoint_args+'&OpaqueData=fromgw1') - topic_arn1 = topic_conf1.set_config() - topic_conf2 = PSTopicS3(gw2, topic_name+'_2', zonegroup, endpoint_args=endpoint_args+'&OpaqueData=fromgw2') - topic_arn2 = topic_conf2.set_config() - # create two s3 notifications - notification_name = bucket_name + NOTIFICATION_SUFFIX+'_1' - topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn1, - 'Events': ['s3:ObjectCreated:Put'] - }] - s3_notification_conf1 = PSNotificationS3(gw1, bucket_name, topic_conf_list) - response, status = s3_notification_conf1.set_config() - assert_equal(status/100, 2) - notification_name = bucket_name + NOTIFICATION_SUFFIX+'_2' - topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn2, - 'Events': ['s3:ObjectRemoved:Delete'] - }] - s3_notification_conf2 = PSNotificationS3(gw2, bucket_name, topic_conf_list) - response, status = s3_notification_conf2.set_config() - assert_equal(status/100, 2) - # stop gateway 2 - print('stopping gateway2...') - client_threads = [] - start_time = time.time() - for i in range(number_of_objects): - key = bucket.new_key(str(i)) - content = str(os.urandom(1024*1024)) - thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) - thr.start() - client_threads.append(thr) - [thr.join() for thr in client_threads] - keys = list(bucket.list()) - # delete objects from the bucket - client_threads = [] - start_time = time.time() - for key in bucket.list(): - thr = threading.Thread(target = key.delete, args=()) - thr.start() - client_threads.append(thr) - [thr.join() for thr in client_threads] - print('wait for 60 sec for before restarting the gateway') - time.sleep(60) - # check http receiver - events = http_server.get_and_reset_events() - for key in keys: - creations = 0 - deletions = 0 - for event in events: - if event['Records'][0]['eventName'] == 'ObjectCreated:Put' and \ - key.name == event['Records'][0]['s3']['object']['key']: - creations += 1 - elif event['Records'][0]['eventName'] == 'ObjectRemoved:Delete' and \ - key.name == event['Records'][0]['s3']['object']['key']: - deletions += 1 - assert_equal(creations, 1) - assert_equal(deletions, 1) - # cleanup - s3_notification_conf1.del_config() - topic_conf1.del_config() - gw1.delete_bucket(bucket_name) - time.sleep(10) - s3_notification_conf2.del_config() - topic_conf2.del_config() - http_server.close() + return SkipTest('This test is yet to be implemented') -@attr('modification_required') -def test_ps_s3_persistent_multiple_gateways(): +@attr('not_implemented') +def test_persistent_multiple_gateways(): """ test pushing persistent notification via two gateways """ - return SkipTest('This test requires two gateways.') - - conn = connection() - zonegroup = get_config_zonegroup() - # create random port for the http server - host = get_ip() - port = random.randint(10000, 20000) - # start an http server in a separate thread - number_of_objects = 10 - http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects) - gw1 = conn - gw2 = connection2() - # create bucket - bucket_name = gen_bucket_name() - bucket1 = gw1.create_bucket(bucket_name) - bucket2 = gw2.get_bucket(bucket_name) - topic_name = bucket_name + TOPIC_SUFFIX - # create two s3 topics - endpoint_address = 'http://'+host+':'+str(port) - endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true' - topic1_opaque = 'fromgw1' - topic_conf1 = PSTopicS3(gw1, topic_name+'_1', zonegroup, endpoint_args=endpoint_args+'&OpaqueData='+topic1_opaque) - topic_arn1 = topic_conf1.set_config() - topic2_opaque = 'fromgw2' - topic_conf2 = PSTopicS3(gw2, topic_name+'_2', zonegroup, endpoint_args=endpoint_args+'&OpaqueData='+topic2_opaque) - topic_arn2 = topic_conf2.set_config() - # create two s3 notifications - notification_name = bucket_name + NOTIFICATION_SUFFIX+'_1' - topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn1, - 'Events': [] - }] - s3_notification_conf1 = PSNotificationS3(gw1, bucket_name, topic_conf_list) - response, status = s3_notification_conf1.set_config() - assert_equal(status/100, 2) - notification_name = bucket_name + NOTIFICATION_SUFFIX+'_2' - topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn2, - 'Events': [] - }] - s3_notification_conf2 = PSNotificationS3(gw2, bucket_name, topic_conf_list) - response, status = s3_notification_conf2.set_config() - assert_equal(status/100, 2) - client_threads = [] - start_time = time.time() - for i in range(number_of_objects): - key = bucket1.new_key('gw1_'+str(i)) - content = str(os.urandom(1024*1024)) - thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) - thr.start() - client_threads.append(thr) - key = bucket2.new_key('gw2_'+str(i)) - content = str(os.urandom(1024*1024)) - thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) - thr.start() - client_threads.append(thr) - [thr.join() for thr in client_threads] - keys = list(bucket1.list()) - delay = 30 - print('wait for '+str(delay)+'sec for the messages...') - time.sleep(delay) - events = http_server.get_and_reset_events() - for key in keys: - topic1_count = 0 - topic2_count = 0 - for event in events: - if event['Records'][0]['eventName'] == 'ObjectCreated:Put' and \ - key.name == event['Records'][0]['s3']['object']['key'] and \ - topic1_opaque == event['Records'][0]['opaqueData']: - topic1_count += 1 - elif event['Records'][0]['eventName'] == 'ObjectCreated:Put' and \ - key.name == event['Records'][0]['s3']['object']['key'] and \ - topic2_opaque == event['Records'][0]['opaqueData']: - topic2_count += 1 - assert_equal(topic1_count, 1) - assert_equal(topic2_count, 1) - # delete objects from the bucket - client_threads = [] - start_time = time.time() - for key in bucket1.list(): - thr = threading.Thread(target = key.delete, args=()) - thr.start() - client_threads.append(thr) - [thr.join() for thr in client_threads] - print('wait for '+str(delay)+'sec for the messages...') - time.sleep(delay) - events = http_server.get_and_reset_events() - for key in keys: - topic1_count = 0 - topic2_count = 0 - for event in events: - if event['Records'][0]['eventName'] == 'ObjectRemoved:Delete' and \ - key.name == event['Records'][0]['s3']['object']['key'] and \ - topic1_opaque == event['Records'][0]['opaqueData']: - topic1_count += 1 - elif event['Records'][0]['eventName'] == 'ObjectRemoved:Delete' and \ - key.name == event['Records'][0]['s3']['object']['key'] and \ - topic2_opaque == event['Records'][0]['opaqueData']: - topic2_count += 1 - assert_equal(topic1_count, 1) - assert_equal(topic2_count, 1) - # cleanup - s3_notification_conf1.del_config() - topic_conf1.del_config() - s3_notification_conf2.del_config() - topic_conf2.del_config() - gw1.delete_bucket(bucket_name) - http_server.close() + return SkipTest('This test is yet to be implemented') def persistent_topic_multiple_endpoints(conn, endpoint_type): @@ -3985,7 +3750,7 @@ def persistent_topic_multiple_endpoints(conn, endpoint_type): else: return SkipTest('Unknown endpoint type: ' + endpoint_type) - # create two s3 topics + # create two topics topic_conf1 = PSTopicS3(conn, topic_name_1, zonegroup, endpoint_args=endpoint_args) topic_arn1 = topic_conf1.set_config() endpoint_address = 'http://kaboom:9999' @@ -3994,7 +3759,7 @@ def persistent_topic_multiple_endpoints(conn, endpoint_type): topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args) topic_arn2 = topic_conf2.set_config() - # create two s3 notifications + # create two notifications notification_name = bucket_name + NOTIFICATION_SUFFIX+'_1' topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn1, 'Events': [] @@ -4098,10 +3863,10 @@ def persistent_notification(endpoint_type, conn, account=None): return SkipTest('Unknown endpoint type: ' + endpoint_type) - # create s3 topic + # create topic topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': [] @@ -4157,13 +3922,13 @@ def persistent_notification(endpoint_type, conn, account=None): @attr('http_test') -def test_ps_s3_persistent_notification_http(): +def test_persistent_notification_http(): """ test pushing persistent notification http """ conn = connection() persistent_notification('http', conn) @attr('http_test') -def test_ps_s3_persistent_notification_http_account(): +def test_persistent_notification_http_account(): """ test pushing persistent notification via http for account user """ account = 'RGW77777777777777777' @@ -4180,14 +3945,14 @@ def test_ps_s3_persistent_notification_http_account(): admin(['account', 'rm', '--account-id', account], get_config_cluster()) @attr('amqp_test') -def test_ps_s3_persistent_notification_amqp(): +def test_persistent_notification_amqp(): """ test pushing persistent notification amqp """ conn = connection() persistent_notification('amqp', conn) @attr('kafka_test') -def test_ps_s3_persistent_notification_kafka(): +def test_persistent_notification_kafka(): """ test pushing persistent notification kafka """ conn = connection() persistent_notification('kafka', conn) @@ -4200,7 +3965,7 @@ def random_string(length): @attr('amqp_test') -def test_ps_s3_persistent_notification_large(): +def test_persistent_notification_large_amqp(): """ test pushing persistent notification of large notifications """ conn = connection() @@ -4223,10 +3988,10 @@ def test_ps_s3_persistent_notification_large(): # amqp broker guarantee ordering exact_match = True - # create s3 topic + # create topic topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': [] @@ -4285,309 +4050,26 @@ def test_ps_s3_persistent_notification_large(): stop_amqp_receiver(receiver, task) -@attr('modification_required') -def test_ps_s3_topic_update(): +@attr('not_implemented') +def test_topic_update(): """ test updating topic associated with a notification""" - return SkipTest('This test is yet to be modified.') - - conn = connection() - ps_zone = None - bucket_name = gen_bucket_name() - topic_name = bucket_name+TOPIC_SUFFIX - # create amqp topic - hostname = get_ip() - exchange = 'ex1' - amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name) - amqp_task.start() - #topic_conf = PSTopic(ps_zone.conn, topic_name,endpoint='amqp://' + hostname,endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none') - topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none') - - topic_arn = topic_conf.set_config() - #result, status = topic_conf.set_config() - #assert_equal(status/100, 2) - parsed_result = json.loads(result) - topic_arn = parsed_result['arn'] - # get topic - result, _ = topic_conf.get_config() - # verify topic content - parsed_result = json.loads(result) - assert_equal(parsed_result['topic']['name'], topic_name) - assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint']) - # create http server - port = random.randint(10000, 20000) - # start an http server in a separate thread - http_server = StreamingHTTPServer(hostname, port) - # create bucket on the first of the rados zones - bucket = conn.create_bucket(bucket_name) - # create s3 notification - notification_name = bucket_name + NOTIFICATION_SUFFIX - topic_conf_list = [{'Id': notification_name, - 'TopicArn': topic_arn, - 'Events': ['s3:ObjectCreated:*'] - }] - s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) - _, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) - # create objects in the bucket - number_of_objects = 10 - for i in range(number_of_objects): - key = bucket.new_key(str(i)) - key.set_contents_from_string('bar') - # wait for sync - #zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) - keys = list(bucket.list()) - # TODO: use exact match - receiver.verify_s3_events(keys, exact_match=True) - # update the same topic with new endpoint - #topic_conf = PSTopic(ps_zone.conn, topic_name,endpoint='http://'+ hostname + ':' + str(port)) - topic_conf = PSTopicS3(conn, topic_name, endpoint_args='http://'+ hostname + ':' + str(port)) - _, status = topic_conf.set_config() - assert_equal(status/100, 2) - # get topic - result, _ = topic_conf.get_config() - # verify topic content - parsed_result = json.loads(result) - assert_equal(parsed_result['topic']['name'], topic_name) - assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint']) - # delete current objects and create new objects in the bucket - for key in bucket.list(): - key.delete() - for i in range(number_of_objects): - key = bucket.new_key(str(i+100)) - key.set_contents_from_string('bar') - # wait for sync - #zone_meta_checkpoint(ps_zone.zone) - #zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) - keys = list(bucket.list()) - # verify that notifications are still sent to amqp - # TODO: use exact match - receiver.verify_s3_events(keys, exact_match=False) - # update notification to update the endpoint from the topic - topic_conf_list = [{'Id': notification_name, - 'TopicArn': topic_arn, - 'Events': ['s3:ObjectCreated:*'] - }] - s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) - _, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) - # delete current objects and create new objects in the bucket - for key in bucket.list(): - key.delete() - for i in range(number_of_objects): - key = bucket.new_key(str(i+200)) - key.set_contents_from_string('bar') - # wait for sync - #zone_meta_checkpoint(ps_zone.zone) - #zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) - keys = list(bucket.list()) - # check that updates switched to http - # TODO: use exact match - http_server.verify_s3_events(keys, exact_match=False) - # cleanup - # delete objects from the bucket - stop_amqp_receiver(receiver, amqp_task) - for key in bucket.list(): - key.delete() - s3_notification_conf.del_config() - topic_conf.del_config() - conn.delete_bucket(bucket_name) - http_server.close() + return SkipTest('This test is yet to be implemented') -@attr('modification_required') -def test_ps_s3_notification_update(): +@attr('not_implemented') +def test_notification_update(): """ test updating the topic of a notification""" - return SkipTest('This test is yet to be modified.') - - hostname = get_ip() - conn = connection() - ps_zone = None - bucket_name = gen_bucket_name() - topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX - topic_name2 = bucket_name+'http'+TOPIC_SUFFIX - zonegroup = get_config_zonegroup() - # create topics - # start amqp receiver in a separate thread - exchange = 'ex1' - amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1) - amqp_task.start() - # create random port for the http server - http_port = random.randint(10000, 20000) - # start an http server in a separate thread - http_server = StreamingHTTPServer(hostname, http_port) - #topic_conf1 = PSTopic(ps_zone.conn, topic_name1,endpoint='amqp://' + hostname,endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none') - topic_conf1 = PSTopicS3(conn, topic_name1, zonegroup, endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none') - result, status = topic_conf1.set_config() - parsed_result = json.loads(result) - topic_arn1 = parsed_result['arn'] - assert_equal(status/100, 2) - #topic_conf2 = PSTopic(ps_zone.conn, topic_name2,endpoint='http://'+hostname+':'+str(http_port)) - topic_conf2 = PSTopicS3(conn, topic_name2, endpoint_args='http://'+hostname+':'+str(http_port)) - result, status = topic_conf2.set_config() - parsed_result = json.loads(result) - topic_arn2 = parsed_result['arn'] - assert_equal(status/100, 2) - # create bucket on the first of the rados zones - bucket = conn.create_bucket(bucket_name) - # wait for sync - #zone_meta_checkpoint(ps_zone.zone) - # create s3 notification with topic1 - notification_name = bucket_name + NOTIFICATION_SUFFIX - topic_conf_list = [{'Id': notification_name, - 'TopicArn': topic_arn1, - 'Events': ['s3:ObjectCreated:*'] - }] - s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) - _, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) - # create objects in the bucket - number_of_objects = 10 - for i in range(number_of_objects): - key = bucket.new_key(str(i)) - key.set_contents_from_string('bar') - # wait for sync - #zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) - keys = list(bucket.list()) - # TODO: use exact match - receiver.verify_s3_events(keys, exact_match=False); - # update notification to use topic2 - topic_conf_list = [{'Id': notification_name, - 'TopicArn': topic_arn2, - 'Events': ['s3:ObjectCreated:*'] - }] - s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) - _, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) - # delete current objects and create new objects in the bucket - for key in bucket.list(): - key.delete() - for i in range(number_of_objects): - key = bucket.new_key(str(i+100)) - key.set_contents_from_string('bar') - # wait for sync - #zone_meta_checkpoint(ps_zone.zone) - #zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) - keys = list(bucket.list()) - # check that updates switched to http - # TODO: use exact match - http_server.verify_s3_events(keys, exact_match=False) - # cleanup - # delete objects from the bucket - stop_amqp_receiver(receiver, amqp_task) - for key in bucket.list(): - key.delete() - s3_notification_conf.del_config() - topic_conf1.del_config() - topic_conf2.del_config() - conn.delete_bucket(bucket_name) - http_server.close() + return SkipTest('This test is yet to be implemented') -@attr('modification_required') -def test_ps_s3_multiple_topics_notification(): +@attr('not_implemented') +def test_multiple_topics_notification(): """ test notification creation with multiple topics""" - return SkipTest('This test is yet to be modified.') - - hostname = get_ip() - zonegroup = get_config_zonegroup() - conn = connection() - ps_zone = None - bucket_name = gen_bucket_name() - topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX - topic_name2 = bucket_name+'http'+TOPIC_SUFFIX - # create topics - # start amqp receiver in a separate thread - exchange = 'ex1' - amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1) - amqp_task.start() - # create random port for the http server - http_port = random.randint(10000, 20000) - # start an http server in a separate thread - http_server = StreamingHTTPServer(hostname, http_port) - #topic_conf1 = PSTopic(ps_zone.conn, topic_name1,endpoint='amqp://' + hostname,endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none') - topic_conf1 = PSTopicS3(conn, topic_name1, zonegroup, endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none') - result, status = topic_conf1.set_config() - parsed_result = json.loads(result) - topic_arn1 = parsed_result['arn'] - assert_equal(status/100, 2) - #topic_conf2 = PSTopic(ps_zone.conn, topic_name2,endpoint='http://'+hostname+':'+str(http_port)) - topic_conf2 = PSTopicS3(conn, topic_name2, zonegroup, endpoint_args='http://'+hostname+':'+str(http_port)) - result, status = topic_conf2.set_config() - parsed_result = json.loads(result) - topic_arn2 = parsed_result['arn'] - assert_equal(status/100, 2) - # create bucket on the first of the rados zones - bucket = conn.create_bucket(bucket_name) - # wait for sync - #zone_meta_checkpoint(ps_zone.zone) - # create s3 notification - notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1' - notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2' - topic_conf_list = [ - { - 'Id': notification_name1, - 'TopicArn': topic_arn1, - 'Events': ['s3:ObjectCreated:*'] - }, - { - 'Id': notification_name2, - 'TopicArn': topic_arn2, - 'Events': ['s3:ObjectCreated:*'] - }] - s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) - _, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) - result, _ = s3_notification_conf.get_config() - assert_equal(len(result['TopicConfigurations']), 2) - assert_equal(result['TopicConfigurations'][0]['Id'], notification_name1) - assert_equal(result['TopicConfigurations'][1]['Id'], notification_name2) - # get auto-generated subscriptions - sub_conf1 = PSSubscription(ps_zone.conn, notification_name1, - topic_name1) - _, status = sub_conf1.get_config() - assert_equal(status/100, 2) - sub_conf2 = PSSubscription(ps_zone.conn, notification_name2, - topic_name2) - _, status = sub_conf2.get_config() - assert_equal(status/100, 2) - # create objects in the bucket - number_of_objects = 10 - for i in range(number_of_objects): - key = bucket.new_key(str(i)) - key.set_contents_from_string('bar') - # wait for sync - #zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) - # get the events from both of the subscription - result, _ = sub_conf1.get_events() - records = json.loads(result) - for record in records['Records']: - log.debug(record) - keys = list(bucket.list()) - # TODO: use exact match - verify_s3_records_by_elements(records, keys, exact_match=False) - receiver.verify_s3_events(keys, exact_match=False) - result, _ = sub_conf2.get_events() - parsed_result = json.loads(result) - for record in parsed_result['Records']: - log.debug(record) - keys = list(bucket.list()) - # TODO: use exact match - verify_s3_records_by_elements(records, keys, exact_match=False) - http_server.verify_s3_events(keys, exact_match=False) - # cleanup - stop_amqp_receiver(receiver, amqp_task) - s3_notification_conf.del_config() - topic_conf1.del_config() - topic_conf2.del_config() - # delete objects from the bucket - for key in bucket.list(): - key.delete() - conn.delete_bucket(bucket_name) - http_server.close() + return SkipTest('This test is yet to be implemented') @attr('basic_test') -def test_ps_s3_list_topics_migration(): +def test_list_topics_migration(): """ test list topics on migration""" if get_config_cluster() == 'noname': return SkipTest('realm is needed for migration test') @@ -4632,7 +4114,7 @@ def test_ps_s3_list_topics_migration(): delete_all_topics(conn1, '', conf_cluster) delete_all_topics(conn2, tenant, conf_cluster) - # Create s3 - v1 topics + # Create - v1 topics topic_conf = PSTopicS3(conn1, topic_versions['topic3_v1'], zonegroup, endpoint_args=endpoint_args) topic_arn3 = topic_conf.set_config() topic_conf = PSTopicS3(conn1, topic_versions['topic4_v1'], zonegroup, endpoint_args=endpoint_args) @@ -4649,7 +4131,7 @@ def test_ps_s3_list_topics_migration(): # Start v2 notification zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2) - # Create s3 - v2 topics + # Create - v2 topics topic_conf = PSTopicS3(conn1, topic_versions['topic1_v2'], zonegroup, endpoint_args=endpoint_args) topic_arn1 = topic_conf.set_config() topic_conf = PSTopicS3(conn1, topic_versions['topic2_v2'], zonegroup, endpoint_args=endpoint_args) @@ -4690,7 +4172,7 @@ def test_ps_s3_list_topics_migration(): @attr('basic_test') -def test_ps_s3_list_topics(): +def test_list_topics(): """ test list topics""" # Initialize connections, topic names and configurations @@ -4713,7 +4195,7 @@ def test_ps_s3_list_topics(): delete_all_topics(conn1, '', get_config_cluster()) delete_all_topics(conn2, tenant, get_config_cluster()) - # Create s3 - v2 topics + # Create - v2 topics topic_conf = PSTopicS3(conn1, topic_name1, zonegroup, endpoint_args=endpoint_args) topic_arn1 = topic_conf.set_config() topic_conf = PSTopicS3(conn1, topic_name2, zonegroup, endpoint_args=endpoint_args) @@ -4753,7 +4235,7 @@ def test_ps_s3_list_topics(): tenant_topic_conf.del_config(tenant_topic_arn2) @attr('basic_test') -def test_ps_s3_list_topics_v1(): +def test_list_topics_v1(): """ test list topics on v1""" if get_config_cluster() == 'noname': return SkipTest('realm is needed') @@ -4782,7 +4264,7 @@ def test_ps_s3_list_topics_v1(): # Make sure that we disable v2 zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2) - # Create s3 - v1 topics + # Create - v1 topics topic_conf = PSTopicS3(conn1, topic_name1, zonegroup, endpoint_args=endpoint_args) topic_arn1 = topic_conf.set_config() topic_conf = PSTopicS3(conn1, topic_name2, zonegroup, endpoint_args=endpoint_args) @@ -4822,8 +4304,8 @@ def test_ps_s3_list_topics_v1(): tenant_topic_conf.del_config(tenant_topic_arn2) -def ps_s3_topic_permissions(another_tenant=""): - """ test s3 topic set/get/delete permissions """ +def topic_permissions(another_tenant=""): + """ test topic set/get/delete permissions """ conn1 = connection() conn2, arn2 = another_user(tenant=another_tenant) zonegroup = get_config_zonegroup() @@ -4841,7 +4323,7 @@ def ps_s3_topic_permissions(another_tenant=""): } ] }) - # create s3 topic with DENY policy + # create topic with DENY policy endpoint_address = 'amqp://127.0.0.1:7001' endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none' topic_conf = PSTopicS3(conn1, topic_name, zonegroup, endpoint_args=endpoint_args, policy_text=topic_policy) @@ -4937,25 +4419,25 @@ def ps_s3_topic_permissions(another_tenant=""): @attr('basic_test') -def test_ps_s3_topic_permissions_same_tenant(): - ps_s3_topic_permissions() +def test_topic_permissions_same_tenant(): + topic_permissions() @attr('basic_test') -def test_ps_s3_topic_permissions_cross_tenant(): - ps_s3_topic_permissions(another_tenant="boom") +def test_topic_permissions_cross_tenant(): + topic_permissions(another_tenant="boom") @attr('basic_test') -def test_ps_s3_topic_no_permissions(): - """ test s3 topic set/get/delete permissions """ +def test_topic_no_permissions(): + """ test topic set/get/delete permissions """ conn1 = connection() conn2, _ = another_user() zonegroup = 'default' bucket_name = gen_bucket_name() topic_name = bucket_name + TOPIC_SUFFIX - # create s3 topic without policy + # create topic without policy endpoint_address = 'amqp://127.0.0.1:7001' endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none' topic_conf = PSTopicS3(conn1, topic_name, zonegroup, endpoint_args=endpoint_args) @@ -5021,7 +4503,7 @@ def test_ps_s3_topic_no_permissions(): def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=False): - """ test pushing kafka s3 notification securly to master """ + """ test pushing kafka notification securly to master """ conn = connection() zonegroup = get_config_zonegroup() # create bucket @@ -5029,7 +4511,7 @@ def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=F bucket = conn.create_bucket(bucket_name) # name is constant for manual testing topic_name = bucket_name+'_topic' - # create s3 topic + # create topic if security_type == 'SASL_SSL': if not use_topic_attrs_for_creds: endpoint_address = 'kafka://alice:alice-secret@' + default_kafka_server + ':9094' @@ -5061,7 +4543,7 @@ def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=F verify_kafka_receiver(receiver) topic_arn = topic_conf.set_config() - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': [] @@ -5113,37 +4595,37 @@ def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=F @attr('kafka_security_test') -def test_ps_s3_notification_push_kafka_security_ssl(): +def test_notification_kafka_security_ssl(): kafka_security('SSL') @attr('kafka_security_test') -def test_ps_s3_notification_push_kafka_security_ssl_sasl(): +def test_notification_kafka_security_ssl_sasl(): kafka_security('SASL_SSL') @attr('kafka_security_test') -def test_ps_s3_notification_push_kafka_security_ssl_sasl_attrs(): +def test_notification_kafka_security_ssl_sasl_attrs(): kafka_security('SASL_SSL', use_topic_attrs_for_creds=True) @attr('kafka_security_test') -def test_ps_s3_notification_push_kafka_security_sasl(): +def test_notification_kafka_security_sasl(): kafka_security('SASL_PLAINTEXT') @attr('kafka_security_test') -def test_ps_s3_notification_push_kafka_security_ssl_sasl_scram(): +def test_notification_kafka_security_ssl_sasl_scram(): kafka_security('SASL_SSL', mechanism='SCRAM-SHA-256') @attr('kafka_security_test') -def test_ps_s3_notification_push_kafka_security_sasl_scram(): +def test_notification_kafka_security_sasl_scram(): kafka_security('SASL_PLAINTEXT', mechanism='SCRAM-SHA-256') @attr('http_test') -def test_persistent_ps_s3_reload(): +def test_persistent_reload(): """ do a realm reload while we send notifications """ if get_config_cluster() == 'noname': return SkipTest('realm is needed for reload test') @@ -5169,7 +4651,7 @@ def test_persistent_ps_s3_reload(): bucket = conn.create_bucket(bucket_name) topic_name1 = bucket_name + TOPIC_SUFFIX + '_1' - # create s3 topics + # create topics endpoint_address = 'http://'+host+':'+str(http_port) endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \ '&retry_sleep_duration=1' @@ -5180,7 +4662,7 @@ def test_persistent_ps_s3_reload(): topic_conf2 = PSTopicS3(conn, topic_name2, zonegroup, endpoint_args=endpoint_args) topic_arn2 = topic_conf2.set_config() - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn1, 'Events': [] @@ -5310,7 +4792,7 @@ def persistent_data_path_v2_migration(conn, endpoint_type): topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': [] @@ -5410,7 +4892,7 @@ def persistent_data_path_v2_migration_kafka(): @attr('http_test') -def test_ps_s3_data_path_v2_migration(): +def test_data_path_v2_migration(): """ test data path v2 migration """ if get_config_cluster() == 'noname': return SkipTest('realm is needed for migration test') @@ -5432,12 +4914,12 @@ def test_ps_s3_data_path_v2_migration(): bucket = conn.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX - # create s3 topic + # create topic endpoint_address = 'http://'+host+':'+str(http_port) endpoint_args = 'push-endpoint='+endpoint_address topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': [] @@ -5508,7 +4990,7 @@ def test_ps_s3_data_path_v2_migration(): @attr('basic_test') -def test_ps_s3_data_path_v2_large_migration(): +def test_data_path_v2_large_migration(): """ test data path v2 large migration """ if get_config_cluster() == 'noname': return SkipTest('realm is needed for migration test') @@ -5535,7 +5017,7 @@ def test_ps_s3_data_path_v2_large_migration(): host = get_ip() http_port = random.randint(10000, 20000) - # create s3 topic + # create topic buckets_list = [] topics_conf_list = [] s3_notification_conf_list = [] @@ -5546,13 +5028,13 @@ def test_ps_s3_data_path_v2_large_migration(): bucket = conn.create_bucket(bucket_name) buckets_list.append(bucket) topic_name = bucket_name + TOPIC_SUFFIX - # create s3 topic + # create topic endpoint_address = 'http://' + host + ':' + str(http_port) endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true' topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topics_conf_list.append(topic_conf) topic_arn = topic_conf.set_config() - # create s3 110 notifications + # create 110 notifications s3_notification_list = [] for i in range(num_of_s3_notifications): notification_name = bucket_name + NOTIFICATION_SUFFIX + '_' + str(i + 1) @@ -5601,7 +5083,7 @@ def test_ps_s3_data_path_v2_large_migration(): @attr('basic_test') -def test_ps_s3_data_path_v2_mixed_migration(): +def test_data_path_v2_mixed_migration(): """ test data path v2 mixed migration """ if get_config_cluster() == 'noname': return SkipTest('realm is needed for migration test') @@ -5629,7 +5111,7 @@ def test_ps_s3_data_path_v2_mixed_migration(): host = get_ip() http_port = random.randint(10000, 20000) - # create s3 topic + # create topic buckets_list = [] topics_conf_list = [] s3_notification_conf_list = [] @@ -5641,14 +5123,14 @@ def test_ps_s3_data_path_v2_mixed_migration(): bucket = conn.create_bucket(bucket_name) buckets_list.append(bucket) topic_name = bucket_name + TOPIC_SUFFIX + created_version - # create s3 topic + # create topic endpoint_address = 'http://' + host + ':' + str(http_port) endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true' topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topics_conf_list.append(topic_conf) topic_arn = topic_conf.set_config() topic_arn_list.append(topic_arn) - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX + created_version s3_notification_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': [] @@ -5662,19 +5144,19 @@ def test_ps_s3_data_path_v2_mixed_migration(): # disable v2 notification zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2) - # create s3 topic + # create topic created_version = '_created_v1' for conn, bucket in zip(connections_list, buckets_list): # create bucket bucket_name = bucket.name topic_name = bucket_name + TOPIC_SUFFIX + created_version - # create s3 topic + # create topic endpoint_address = 'http://' + host + ':' + str(http_port) endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true' topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topics_conf_list.append(topic_conf) topic_arn = topic_conf.set_config() - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX + created_version s3_notification_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': [] @@ -5734,12 +5216,12 @@ def test_notification_caching(): endpoint_address = 'kafka://' + default_kafka_server + ':' + str(incorrect_port) endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker' + '&persistent=true' - # create s3 topic + # create topic zonegroup = get_config_zonegroup() topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': [] @@ -5789,7 +5271,7 @@ def test_notification_caching(): # remove the port and update the topic, so its pointing to correct endpoint. endpoint_address = 'kafka://' + default_kafka_server - # update s3 topic + # update topic topic_conf.set_attributes(attribute_name="push-endpoint", attribute_val=endpoint_address) keys = list(bucket.list()) @@ -5824,7 +5306,7 @@ def test_connection_caching(): endpoint_address = 'kafka://' + default_kafka_server endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker&use-ssl=true' + '&persistent=true' - # initially create both s3 topics with `use-ssl=true` + # initially create both topics with `use-ssl=true` zonegroup = get_config_zonegroup() topic_conf_1 = PSTopicS3(conn, topic_name_1, zonegroup, endpoint_args=endpoint_args) @@ -5832,7 +5314,7 @@ def test_connection_caching(): topic_conf_2 = PSTopicS3(conn, topic_name_2, zonegroup, endpoint_args=endpoint_args) topic_arn_2 = topic_conf_2.set_config() - # create s3 notification + # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn_1, 'Events': [] @@ -5935,7 +5417,7 @@ def test_topic_migration_to_an_account(): f"one bucket per user {user1_conn.access_key}: {user1_bucket_name} and {user2_conn.access_key}: {user2_bucket_name}" ) - # create an S3 topic owned by the first user + # create an topic owned by the first user topic_name = user1_bucket_name + TOPIC_SUFFIX zonegroup = get_config_zonegroup() endpoint_address = "http://" + host + ":" + str(port) @@ -6159,7 +5641,7 @@ def persistent_notification_shard_config_change(endpoint_type, conn, new_num_sha topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() - # create s3 notification + # create notification notif_1 = bucket_name + '_notif_1' topic_conf_list = [{'Id': notif_1, 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']