From ddb6a24dd25a6823c062284a5f6a00bcc5be16d6 Mon Sep 17 00:00:00 2001 From: Ali Masarwa Date: Sun, 19 May 2024 17:23:31 +0300 Subject: [PATCH] RGW\bucket notification tests: end-point agnostic + refactoring Signed-off-by: Ali Masarwa --- src/test/rgw/bucket_notification/test_bn.py | 701 ++++++++------------ 1 file changed, 286 insertions(+), 415 deletions(-) diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index df018f79b9950..f7e12f25cda55 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -444,6 +444,11 @@ class KafkaReceiver(object): verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes, etags=etags) self.events = [] + def get_and_reset_events(self): + tmp = self.events + self.events = [] + return tmp + def close(self, task): stop_kafka_receiver(self, task) @@ -701,25 +706,24 @@ def test_ps_s3_topic_admin_on_master(): assert_equal(len(parsed_result['topics']), 0) -@attr('basic_test') -def test_ps_s3_notification_configuration_admin_on_master(): - """ test s3 notification list/get/delete on master """ +def notification_configuration(with_cli): conn = connection() zonegroup = get_config_zonegroup() bucket_name = gen_bucket_name() + # create bucket bucket = conn.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX - + # make sure there are no leftover topics delete_all_topics(conn, '', get_config_cluster()) # create s3 topics - endpoint_address = 'amqp://127.0.0.1:7001/vhost_1' + endpoint_address = 'amqp://127.0.0.1:7001' endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none' - topic_conf = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args) + topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() assert_equal(topic_arn, - 'arn:aws:sns:' + zonegroup + '::' + topic_name + '_1') + 'arn:aws:sns:' + zonegroup + '::' + topic_name) # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name+'_1', @@ -738,31 +742,55 @@ def test_ps_s3_notification_configuration_admin_on_master(): _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) - # list notification - result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster()) - parsed_result = json.loads(result[0]) - assert_equal(len(parsed_result['notifications']), 3) - assert_equal(result[1], 0) - # get notification 1 - result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name+'_1'], get_config_cluster()) - parsed_result = json.loads(result[0]) - assert_equal(parsed_result['Id'], notification_name+'_1') - assert_equal(result[1], 0) + if with_cli: + result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name+'_1'], get_config_cluster()) + parsed_result = json.loads(result[0]) + assert_equal(parsed_result['Id'], notification_name+'_1') + assert_equal(parsed_result['TopicArn'], topic_arn) + assert_equal(result[1], 0) + else: + response, status = s3_notification_conf.get_config(notification=notification_name+'_1') + assert_equal(status/100, 2) + assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn) - # remove notification 3 - _, result = admin(['notification', 'rm', '--bucket', bucket_name, '--notification-id', notification_name+'_3'], get_config_cluster()) - assert_equal(result, 0) + # list notification + if with_cli: + result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster()) + parsed_result = json.loads(result[0]) + assert_equal(len(parsed_result['notifications']), 3) + assert_equal(result[1], 0) + else: + result, status = s3_notification_conf.get_config() + assert_equal(status, 200) + assert_equal(len(result['TopicConfigurations']), 3) + + # delete notification 2 + if with_cli: + _, result = admin(['notification', 'rm', '--bucket', bucket_name, '--notification-id', notification_name+'_2'], get_config_cluster()) + assert_equal(result, 0) + else: + _, status = s3_notification_conf.del_config(notification=notification_name+'_2') + assert_equal(status/100, 2) # list notification - result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster()) - parsed_result = json.loads(result[0]) - assert_equal(len(parsed_result['notifications']), 2) - assert_equal(result[1], 0) + if with_cli: + result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster()) + parsed_result = json.loads(result[0]) + assert_equal(len(parsed_result['notifications']), 2) + assert_equal(result[1], 0) + else: + result, status = s3_notification_conf.get_config() + assert_equal(status, 200) + assert_equal(len(result['TopicConfigurations']), 2) # delete notifications - _, result = admin(['notification', 'rm', '--bucket', bucket_name], get_config_cluster()) - assert_equal(result, 0) + if with_cli: + _, result = admin(['notification', 'rm', '--bucket', bucket_name], get_config_cluster()) + assert_equal(result, 0) + else: + _, status = s3_notification_conf.del_config() + assert_equal(status/100, 2) # list notification, make sure it is empty result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster()) @@ -770,6 +798,17 @@ def test_ps_s3_notification_configuration_admin_on_master(): assert_equal(len(parsed_result['notifications']), 0) assert_equal(result[1], 0) + # cleanup + topic_conf.del_config() + # delete the bucket + conn.delete_bucket(bucket_name) + + +@attr('basic_test') +def test_notification_configuration_admin(): + """ test notification list/set/get/delete, with admin cli """ + notification_configuration(True) + @attr('modification_required') def test_ps_s3_topic_with_secret_on_master(): @@ -823,64 +862,9 @@ def test_ps_s3_topic_with_secret_on_master(): @attr('basic_test') -def test_ps_s3_notification_on_master(): - """ test s3 notification set/get/delete on master """ - conn = connection() - zonegroup = get_config_zonegroup() - bucket_name = gen_bucket_name() - # create bucket - bucket = conn.create_bucket(bucket_name) - topic_name = bucket_name + TOPIC_SUFFIX - # create s3 topic - endpoint_address = 'amqp://127.0.0.1:7001' - endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none' - topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) - topic_arn = topic_conf.set_config() - # create s3 notification - notification_name = bucket_name + NOTIFICATION_SUFFIX - topic_conf_list = [{'Id': notification_name+'_1', - 'TopicArn': topic_arn, - 'Events': ['s3:ObjectCreated:*'] - }, - {'Id': notification_name+'_2', - 'TopicArn': topic_arn, - 'Events': ['s3:ObjectRemoved:*'] - }, - {'Id': notification_name+'_3', - 'TopicArn': topic_arn, - 'Events': [] - }] - s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) - _, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) - - # get notifications on a bucket - response, status = s3_notification_conf.get_config(notification=notification_name+'_1') - assert_equal(status/100, 2) - assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn) - - # delete specific notifications - _, status = s3_notification_conf.del_config(notification=notification_name+'_1') - assert_equal(status/100, 2) - - # get the remaining 2 notifications on a bucket - response, status = s3_notification_conf.get_config() - assert_equal(status/100, 2) - assert_equal(len(response['TopicConfigurations']), 2) - assert_equal(response['TopicConfigurations'][0]['TopicArn'], topic_arn) - assert_equal(response['TopicConfigurations'][1]['TopicArn'], topic_arn) - - # delete remaining notifications - _, status = s3_notification_conf.del_config() - assert_equal(status/100, 2) - - # make sure that the notifications are now deleted - _, status = s3_notification_conf.get_config() - - # cleanup - topic_conf.del_config() - # delete the bucket - conn.delete_bucket(bucket_name) +def test_notification_configuration(): + """ test s3 notification set/get/deleter """ + notification_configuration(False) @attr('basic_test') @@ -1214,7 +1198,7 @@ def test_ps_s3_notification_errors_on_master(): conn.delete_bucket(bucket_name) -def notification_push(endpoint_type, conn, account=None): +def notification_push(endpoint_type, conn, account=None, cloudevents=False): """ test pushinging notification """ zonegroup = get_config_zonegroup() # create bucket @@ -1222,19 +1206,19 @@ def notification_push(endpoint_type, conn, account=None): bucket = conn.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX - receiver = {} - task = None host = get_ip() - s3_notification_conf = None - topic_conf = None + task = None if endpoint_type == 'http': # create random port for the http server host = get_ip_http() port = random.randint(10000, 20000) # start an http server in a separate thread - receiver = HTTPServerWithEvents((host, port)) + receiver = HTTPServerWithEvents((host, port), cloudevents=cloudevents) endpoint_address = 'http://'+host+':'+str(port) - endpoint_args = 'push-endpoint='+endpoint_address + if cloudevents: + endpoint_args = 'push-endpoint='+endpoint_address+'&cloudevents=true' + else: + endpoint_args = 'push-endpoint='+endpoint_address topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() # create s3 notification @@ -1291,6 +1275,8 @@ def notification_push(endpoint_type, conn, account=None): # create objects in the bucket number_of_objects = 100 + if cloudevents: + number_of_objects = 10 client_threads = [] etags = [] objects_size = {} @@ -1565,87 +1551,11 @@ def test_notification_push_http(): @attr('http_test') -def test_ps_s3_notification_push_cloudevents_on_master(): - """ test pushing cloudevents notification on master """ - hostname = get_ip_http() +def test_notification_push_cloudevents(): + """ test pushing cloudevents notification """ conn = connection() - zonegroup = get_config_zonegroup() - - # create random port for the http server - host = get_ip() - port = random.randint(10000, 20000) - # start an http server in a separate thread - number_of_objects = 10 - http_server = HTTPServerWithEvents((host, port), cloudevents=True) - - # create bucket - bucket_name = gen_bucket_name() - bucket = conn.create_bucket(bucket_name) - topic_name = bucket_name + TOPIC_SUFFIX - - # create s3 topic - endpoint_address = 'http://'+host+':'+str(port) - endpoint_args = 'push-endpoint='+endpoint_address+'&cloudevents=true' - topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) - topic_arn = topic_conf.set_config() - # create s3 notification - notification_name = bucket_name + NOTIFICATION_SUFFIX - topic_conf_list = [{'Id': notification_name, - 'TopicArn': topic_arn, - 'Events': [] - }] - s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) - response, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) - - # create objects in the bucket - client_threads = [] - objects_size = {} - start_time = time.time() - for i in range(number_of_objects): - content = str(os.urandom(randint(1, 1024))) - object_size = len(content) - key = bucket.new_key(str(i)) - objects_size[key.name] = object_size - thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) - thr.start() - client_threads.append(thr) - [thr.join() for thr in client_threads] - - time_diff = time.time() - start_time - print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') - - print('wait for 5sec for the messages...') - time.sleep(5) + notification_push('http', conn, cloudevents=True) - # check http receiver - keys = list(bucket.list()) - http_server.verify_s3_events(keys, exact_match=True, deletions=False, expected_sizes=objects_size) - - # delete objects from the bucket - client_threads = [] - start_time = time.time() - for key in bucket.list(): - thr = threading.Thread(target = key.delete, args=()) - thr.start() - client_threads.append(thr) - [thr.join() for thr in client_threads] - - time_diff = time.time() - start_time - print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') - - print('wait for 5sec for the messages...') - time.sleep(5) - - # check http receiver - http_server.verify_s3_events(keys, exact_match=True, deletions=True, expected_sizes=objects_size) - - # cleanup - topic_conf.del_config() - s3_notification_conf.del_config(notification=notification_name) - # delete the bucket - conn.delete_bucket(bucket_name) - http_server.close() @attr('http_test') @@ -1717,38 +1627,51 @@ def test_ps_s3_opaque_data_on_master(): conn.delete_bucket(bucket_name) http_server.close() -@attr('lifecycle_test') -def test_ps_s3_lifecycle_on_master(): - """ test that when object is deleted due to lifecycle policy, notification is sent on master """ - hostname = get_ip() - conn = connection() - zonegroup = get_config_zonegroup() - # create random port for the http server - host = get_ip() - port = random.randint(10000, 20000) - # start an http server in a separate thread - number_of_objects = 10 - http_server = HTTPServerWithEvents((host, port)) +def lifecycle(endpoint_type, conn, number_of_objects, topic_events, create_thread, rules_creator, record_events, + expected_abortion=False): + zonegroup = get_config_zonegroup() # create bucket bucket_name = gen_bucket_name() bucket = conn.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX + host = get_ip() + task = None + port = None + if endpoint_type == 'http': + # create random port for the http server + port = random.randint(10000, 20000) + # start an http server in a separate thread + receiver = HTTPServerWithEvents((host, port)) + endpoint_address = 'http://'+host+':'+str(port) + endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true' + elif endpoint_type == 'amqp': + # start amqp receiver + exchange = 'ex1' + task, receiver = create_amqp_receiver_thread(exchange, topic_name) + task.start() + endpoint_address = 'amqp://' + host + endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange='+exchange+'&amqp-ack-level=broker&persistent=true' + elif endpoint_type == 'kafka': + # start kafka receiver + task, receiver = create_kafka_receiver_thread(topic_name) + task.start() + endpoint_address = 'kafka://' + host + endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true' + else: + return SkipTest('Unknown endpoint type: ' + endpoint_type) + # create s3 topic - endpoint_address = 'http://'+host+':'+str(port) - endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true' - opaque_data = 'http://1.2.3.4:8888' - topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data) + topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, - 'Events': ['s3:ObjectLifecycle:Expiration:*', - 's3:LifecycleExpiration:*'] - }] + 'Events': topic_events + }] s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() assert_equal(status/100, 2) @@ -1759,50 +1682,41 @@ def test_ps_s3_lifecycle_on_master(): start_time = time.time() content = 'bar' for i in range(number_of_objects): - key = bucket.new_key(obj_prefix + str(i)) - thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) + thr = create_thread(bucket, obj_prefix, i, content) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] time_diff = time.time() - start_time - print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') + print('average time for creation + '+endpoint_type+' notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') keys = list(bucket.list()) # create lifecycle policy client = boto3.client('s3', - endpoint_url='http://'+conn.host+':'+str(conn.port), - aws_access_key_id=conn.aws_access_key_id, - aws_secret_access_key=conn.aws_secret_access_key) + endpoint_url='http://'+conn.host+':'+str(conn.port), + aws_access_key_id=conn.aws_access_key_id, + aws_secret_access_key=conn.aws_secret_access_key) yesterday = datetime.date.today() - datetime.timedelta(days=1) response = client.put_bucket_lifecycle_configuration(Bucket=bucket_name, - LifecycleConfiguration={'Rules': [ - { - 'ID': 'rule1', - 'Expiration': {'Date': yesterday.isoformat()}, - 'Filter': {'Prefix': obj_prefix}, - 'Status': 'Enabled', - } - ] - } - ) + LifecycleConfiguration={'Rules': rules_creator(yesterday, obj_prefix)} + ) # start lifecycle processing admin(['lc', 'process'], get_config_cluster()) print('wait for 20s for the lifecycle...') time.sleep(20) + print('wait for sometime for the messages...') no_keys = list(bucket.list()) wait_for_queue_to_drain(topic_name, http_port=port) assert_equal(len(no_keys), 0) event_keys = [] - events = http_server.get_and_reset_events() - assert number_of_objects * 2 <= len(events) + events = receiver.get_and_reset_events() + if not expected_abortion: + assert number_of_objects * 2 <= len(events) for event in events: - assert_in(event['Records'][0]['eventName'], - ['LifecycleExpiration:Delete', - 'ObjectLifecycle:Expiration:Current']) + assert_in(event['Records'][0]['eventName'], record_events) event_keys.append(event['Records'][0]['s3']['object']['key']) for key in keys: key_found = False @@ -1818,99 +1732,80 @@ def test_ps_s3_lifecycle_on_master(): # cleanup for key in keys: key.delete() - [thr.join() for thr in client_threads] + if not expected_abortion: + [thr.join() for thr in client_threads] topic_conf.del_config() s3_notification_conf.del_config(notification=notification_name) # delete the bucket conn.delete_bucket(bucket_name) - http_server.close() + receiver.close(task) + + +def rules_creator(yesterday, obj_prefix): + return [ + { + 'ID': 'rule1', + 'Expiration': {'Date': yesterday.isoformat()}, + 'Filter': {'Prefix': obj_prefix}, + 'Status': 'Enabled', + } + ] + + +def create_thread(bucket, obj_prefix, i, content): + key = bucket.new_key(obj_prefix + str(i)) + return threading.Thread(target = set_contents_from_string, args=(key, content,)) + + +@attr('http_test') +def test_lifecycle_http(): + """ test that when object is deleted due to lifecycle policy, http endpoint """ + + conn = connection() + lifecycle('http', conn, 10, ['s3:ObjectLifecycle:Expiration:*', 's3:LifecycleExpiration:*'], create_thread, + rules_creator, ['LifecycleExpiration:Delete', 'ObjectLifecycle:Expiration:Current']) + + +@attr('kafka_test') +def test_lifecycle_kafka(): + """ test that when object is deleted due to lifecycle policy, kafka endpoint """ + + conn = connection() + lifecycle('kafka', conn, 10, ['s3:ObjectLifecycle:Expiration:*', 's3:LifecycleExpiration:*'], create_thread, + rules_creator, ['LifecycleExpiration:Delete', 'ObjectLifecycle:Expiration:Current']) + def start_and_abandon_multipart_upload(bucket, key_name, content): try: mp = bucket.initiate_multipart_upload(key_name) part_data = io.StringIO(content) mp.upload_part_from_file(part_data, 1) - # mp.complete_upload() except Exception as e: print('Error: ' + str(e)) -@attr('lifecycle_test') -def test_ps_s3_lifecycle_abort_mpu_on_master(): - """ test that when a multipart upload is aborted by lifecycle policy, notification is sent on master """ - hostname = get_ip() - conn = connection() - zonegroup = get_config_zonegroup() - - # create random port for the http server - host = get_ip() - port = random.randint(10000, 20000) - # start an http server in a separate thread - http_server = HTTPServerWithEvents((host, port)) - - # create bucket - bucket_name = gen_bucket_name() - bucket = conn.create_bucket(bucket_name) - topic_name = bucket_name + TOPIC_SUFFIX - - # create s3 topic - endpoint_address = 'http://'+host+':'+str(port) - endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true' - opaque_data = 'http://1.2.3.4:8888' - topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data) - topic_arn = topic_conf.set_config() - # create s3 notification - notification_name = bucket_name + NOTIFICATION_SUFFIX - topic_conf_list = [{'Id': notification_name, - 'TopicArn': topic_arn, - 'Events': ['s3:ObjectLifecycle:Expiration:*'] - }] - s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) - response, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) - - # start and abandon a multpart upload - obj_prefix = 'ooo' - content = 'bar' - key_name = obj_prefix + str(1) - thr = threading.Thread(target = start_and_abandon_multipart_upload, args=(bucket, key_name, content,)) - thr.start() - thr.join() +@attr('http_test') +def test_lifecycle_abort_mpu(): + """ test that when a multipart upload is aborted by lifecycle policy, http endpoint """ - # create lifecycle policy -- assume rgw_lc_debug_interval=10 is in effect - client = boto3.client('s3', - endpoint_url='http://'+conn.host+':'+str(conn.port), - aws_access_key_id=conn.aws_access_key_id, - aws_secret_access_key=conn.aws_secret_access_key) - response = client.put_bucket_lifecycle_configuration(Bucket=bucket_name, - LifecycleConfiguration={'Rules': [ - { - 'ID': 'abort1', - 'Filter': {'Prefix': obj_prefix}, - 'Status': 'Enabled', - 'AbortIncompleteMultipartUpload': {'DaysAfterInitiation': 1}, - } - ] - } - ) + def rules_creator(yesterday, obj_prefix): + return [ + { + 'ID': 'abort1', + 'Filter': {'Prefix': obj_prefix}, + 'Status': 'Enabled', + 'AbortIncompleteMultipartUpload': {'DaysAfterInitiation': 1}, + } + ] - # start lifecycle processing - admin(['lc', 'process'], get_config_cluster()) - print('wait for 20s for the lifecycle...') - time.sleep(20) + def create_thread(bucket, obj_prefix, i, content): + key_name = obj_prefix + str(i) + return threading.Thread(target = start_and_abandon_multipart_upload, args=(bucket, key_name, content,)) - wait_for_queue_to_drain(topic_name, http_port=port) - events = http_server.get_and_reset_events() - for event in events: - assert_equal(event['Records'][0]['eventName'], 'ObjectLifecycle:Expiration:AbortMPU') - assert key_name in event['Records'][0]['s3']['object']['key'] + conn = connection() + lifecycle('http', conn, 1, ['s3:ObjectLifecycle:Expiration:*'], create_thread, rules_creator, + ['ObjectLifecycle:Expiration:AbortMultipartUpload'], True) - # cleanup - topic_conf.del_config() - s3_notification_conf.del_config(notification=notification_name) - # delete the bucket - conn.delete_bucket(bucket_name) - http_server.close() def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_location=None, verify_ssl='true'): """ test object creation s3 notifications in using put/copy/post on master""" @@ -2219,35 +2114,49 @@ def test_http_post_object_upload(): conn1.delete_bucket(Bucket=bucket_name) -@attr('mpu_test') -def test_ps_s3_multipart_on_master_http(): - """ test http multipart object upload on master""" - conn = connection() - zonegroup = 'default' - - # create random port for the http server - host = get_ip() - port = random.randint(10000, 20000) - # start an http server in a separate thread - http_server = HTTPServerWithEvents((host, port)) +def multipart_endpoint_agnostic(endpoint_type, conn): + hostname = get_ip() + zonegroup = get_config_zonegroup() # create bucket bucket_name = gen_bucket_name() bucket = conn.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX + host = get_ip() + task = None + if endpoint_type == 'http': + # create random port for the http server + port = random.randint(10000, 20000) + # start an http server in a separate thread + receiver = HTTPServerWithEvents((hostname, port)) + endpoint_address = 'http://'+host+':'+str(port) + endpoint_args = 'push-endpoint='+endpoint_address + elif endpoint_type == 'amqp': + # start amqp receiver + exchange = 'ex1' + task, receiver = create_amqp_receiver_thread(exchange, topic_name) + task.start() + endpoint_address = 'amqp://' + host + endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange='+exchange+'&amqp-ack-level=broker' + elif endpoint_type == 'kafka': + # start amqp receiver + task, receiver = create_kafka_receiver_thread(topic_name) + task.start() + endpoint_address = 'kafka://' + host + endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker' + else: + return SkipTest('Unknown endpoint type: ' + endpoint_type) + # create s3 topic - endpoint_address = 'http://'+host+':'+str(port) - endpoint_args = 'push-endpoint='+endpoint_address - opaque_data = 'http://1.2.3.4:8888' - topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data) + topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX - topic_conf_list = [{'Id': notification_name, - 'TopicArn': topic_arn, + topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': [] }] + s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() assert_equal(status/100, 2) @@ -2256,7 +2165,7 @@ def test_ps_s3_multipart_on_master_http(): client_threads = [] content = str(os.urandom(20*1024*1024)) key = bucket.new_key('obj') - thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) + thr = threading.Thread(target=set_contents_from_string, args=(key, content,)) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] @@ -2266,133 +2175,75 @@ def test_ps_s3_multipart_on_master_http(): # check http receiver keys = list(bucket.list()) - print('total number of objects: ' + str(len(keys))) - events = http_server.get_and_reset_events() - for event in events: - assert_equal(event['Records'][0]['opaqueData'], opaque_data) - assert_true(event['Records'][0]['s3']['object']['eTag'] != '') + receiver.verify_s3_events(keys, exact_match=True, deletions=False) # cleanup + s3_notification_conf.del_config() + topic_conf.del_config() + # delete objects for key in keys: key.delete() - [thr.join() for thr in client_threads] - topic_conf.del_config() - s3_notification_conf.del_config(notification=notification_name) # delete the bucket conn.delete_bucket(bucket_name) - http_server.close() - + receiver.close(task) -@attr('amqp_test') -def test_ps_s3_multipart_on_master(): - """ test multipart object upload on master""" - hostname = get_ip() +@attr('http_test') +def test_multipart_http(): + """ test http multipart object upload """ conn = connection() - zonegroup = get_config_zonegroup() + multipart_endpoint_agnostic('http', conn) - # create bucket - bucket_name = gen_bucket_name() - bucket = conn.create_bucket(bucket_name) - topic_name = bucket_name + TOPIC_SUFFIX - - # start amqp receivers - exchange = 'ex1' - task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name+'_1') - task1.start() - task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name+'_2') - task2.start() - task3, receiver3 = create_amqp_receiver_thread(exchange, topic_name+'_3') - task3.start() - - # create s3 topics - endpoint_address = 'amqp://' + hostname - endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker' - topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args) - topic_arn1 = topic_conf1.set_config() - topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args) - topic_arn2 = topic_conf2.set_config() - topic_conf3 = PSTopicS3(conn, topic_name+'_3', zonegroup, endpoint_args=endpoint_args) - topic_arn3 = topic_conf3.set_config() - - # create s3 notifications - notification_name = bucket_name + NOTIFICATION_SUFFIX - topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1, - 'Events': ['s3:ObjectCreated:*'] - }, - {'Id': notification_name+'_2', 'TopicArn': topic_arn2, - 'Events': ['s3:ObjectCreated:Post'] - }, - {'Id': notification_name+'_3', 'TopicArn': topic_arn3, - 'Events': ['s3:ObjectCreated:CompleteMultipartUpload'] - }] - s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) - response, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) - # create objects in the bucket using multi-part upload - fp = tempfile.NamedTemporaryFile(mode='w+b') - object_size = 1024 - content = bytearray(os.urandom(object_size)) - fp.write(content) - fp.flush() - fp.seek(0) - uploader = bucket.initiate_multipart_upload('multipart') - uploader.upload_part_from_file(fp, 1) - uploader.complete_upload() - fp.close() - - print('wait for 5sec for the messages...') - time.sleep(5) - - # check amqp receiver - events = receiver1.get_and_reset_events() - assert_equal(len(events), 1) - - events = receiver2.get_and_reset_events() - assert_equal(len(events), 0) - - events = receiver3.get_and_reset_events() - assert_equal(len(events), 1) - assert_equal(events[0]['Records'][0]['eventName'], 'ObjectCreated:CompleteMultipartUpload') - assert_equal(events[0]['Records'][0]['s3']['configurationId'], notification_name+'_3') - assert_equal(events[0]['Records'][0]['s3']['object']['size'], object_size) - assert events[0]['Records'][0]['eventTime'] != '0.000000', 'invalid eventTime' +@attr('kafka_test') +def test_multipart_kafka(): + """ test kafka multipart object upload """ + conn = connection() + multipart_endpoint_agnostic('kafka', conn) - # cleanup - stop_amqp_receiver(receiver1, task1) - stop_amqp_receiver(receiver2, task2) - stop_amqp_receiver(receiver3, task3) - s3_notification_conf.del_config() - topic_conf1.del_config() - topic_conf2.del_config() - topic_conf3.del_config() - for key in bucket.list(): - key.delete() - # delete the bucket - conn.delete_bucket(bucket_name) @attr('amqp_test') -def test_ps_s3_metadata_filter_on_master(): - """ test s3 notification of metadata on master """ - - hostname = get_ip() +def test_multipart_ampq(): + """ test ampq multipart object upload """ conn = connection() - zonegroup = get_config_zonegroup() + multipart_endpoint_agnostic('ampq', conn) + +def metadata_filter(endpoint_type, conn): # create bucket bucket_name = gen_bucket_name() bucket = conn.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX - # start amqp receivers - exchange = 'ex1' - task, receiver = create_amqp_receiver_thread(exchange, topic_name) - task.start() + # start endpoint receiver + host = get_ip() + task = None + port = None + if endpoint_type == 'http': + # create random port for the http server + port = random.randint(10000, 20000) + # start an http server in a separate thread + receiver = HTTPServerWithEvents((host, port)) + endpoint_address = 'http://'+host+':'+str(port) + endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true' + elif endpoint_type == 'amqp': + # start amqp receiver + exchange = 'ex1' + task, receiver = create_amqp_receiver_thread(exchange, topic_name) + task.start() + endpoint_address = 'amqp://' + host + endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable&persistent=true' + elif endpoint_type == 'kafka': + # start kafka receiver + task, receiver = create_kafka_receiver_thread(topic_name) + task.start() + endpoint_address = 'kafka://' + host + endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true' + else: + return SkipTest('Unknown endpoint type: ' + endpoint_type) # create s3 topic - endpoint_address = 'amqp://' + hostname - endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable' + zonegroup = get_config_zonegroup() topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() # create s3 notification @@ -2449,8 +2300,8 @@ def test_ps_s3_metadata_filter_on_master(): fp.close() expected_keys.append(key_name) - print('wait for 5sec for the messages...') - time.sleep(5) + print('wait for the messages...') + wait_for_queue_to_drain(topic_name, http_port=port) # check amqp receiver events = receiver.get_and_reset_events() assert_equal(len(events), len(expected_keys)) @@ -2460,22 +2311,43 @@ def test_ps_s3_metadata_filter_on_master(): # delete objects for key in bucket.list(): key.delete() - print('wait for 5sec for the messages...') - time.sleep(5) - # check amqp receiver + print('wait for the messages...') + wait_for_queue_to_drain(topic_name, http_port=port) + # check endpoint receiver events = receiver.get_and_reset_events() assert_equal(len(events), len(expected_keys)) for event in events: assert(event['Records'][0]['s3']['object']['key'] in expected_keys) # cleanup - stop_amqp_receiver(receiver, task) + receiver.close(task) s3_notification_conf.del_config() topic_conf.del_config() # delete the bucket conn.delete_bucket(bucket_name) +@attr('kafka_test') +def test_metadata_filter_kafka(): + """ test notification of filtering metadata, kafka endpoint """ + conn = connection() + metadata_filter('kafka', conn) + + +@attr('http_test') +def test_metadata_filter_http(): + """ test notification of filtering metadata, http endpoint """ + conn = connection() + metadata_filter('http', conn) + + +@attr('amqp_test') +def test_metadata_filter_ampq(): + """ test notification of filtering metadata, ampq endpoint """ + conn = connection() + metadata_filter('amqp', conn) + + @attr('amqp_test') def test_ps_s3_metadata_on_master(): """ test s3 notification of metadata on master """ @@ -3725,9 +3597,8 @@ def persistent_notification(endpoint_type, conn, account=None): bucket = conn.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX - receiver = {} - task = None host = get_ip() + task = None if endpoint_type == 'http': # create random port for the http server host = get_ip_http() -- 2.39.5