From: Ali Masarwa Date: Sun, 30 Jun 2024 13:26:22 +0000 (+0300) Subject: RGW|BN : change endpoint for http failing tests X-Git-Tag: v20.0.0~1594^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=5d6db362d348e5d465780da02f4d719129ed5e91;p=ceph.git RGW|BN : change endpoint for http failing tests Signed-off-by: Ali Masarwa --- diff --git a/qa/suites/rgw/notifications/tasks/kafka/test_kafka.yaml b/qa/suites/rgw/notifications/tasks/kafka/test_kafka.yaml index ae647df38653..462570e77275 100644 --- a/qa/suites/rgw/notifications/tasks/kafka/test_kafka.yaml +++ b/qa/suites/rgw/notifications/tasks/kafka/test_kafka.yaml @@ -4,5 +4,5 @@ tasks: kafka_version: 2.6.0 - notification-tests: client.0: - extra_attr: ["kafka_test"] + extra_attr: ["kafka_test", "data_path_v2_kafka_test"] rgw_server: client.0 diff --git a/qa/tasks/notification_tests.py b/qa/tasks/notification_tests.py index 58dbaaa14b34..b4697a6f797f 100644 --- a/qa/tasks/notification_tests.py +++ b/qa/tasks/notification_tests.py @@ -220,7 +220,7 @@ def run_tests(ctx, config): for client, client_config in config.items(): (remote,) = ctx.cluster.only(client).remotes.keys() - attr = ["!kafka_test", "!amqp_test", "!amqp_ssl_test", "!kafka_security_test", "!modification_required", "!manual_test", "!http_test"] + attr = ["!kafka_test", "!data_path_v2_kafka_test", "!amqp_test", "!amqp_ssl_test", "!kafka_security_test", "!modification_required", "!manual_test", "!http_test"] if 'extra_attr' in client_config: attr = client_config.get('extra_attr') diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index e63bec1d5fab..54a2a0e98eec 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -2957,25 +2957,48 @@ def wait_for_queue_to_drain(topic_name, tenant=None, account=None, http_port=Non log.info('waited for %ds for queue %s to drain', time_diff, topic_name) -@attr('basic_test') -def test_ps_s3_persistent_topic_stats(): - """ test persistent topic stats """ - conn = connection() +@attr('kafka_test') +def persistent_topic_stats(conn, endpoint_type): zonegroup = get_config_zonegroup() - # create random port for the http server - host = get_ip() - port = random.randint(10000, 20000) - # create bucket bucket_name = gen_bucket_name() bucket = conn.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX + host = get_ip() + task = None + port = None + if endpoint_type == 'http': + # create random port for the http server + port = random.randint(10000, 20000) + # start an http server in a separate thread + receiver = HTTPServerWithEvents((host, port)) + endpoint_address = 'http://'+host+':'+str(port) + endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \ + '&retry_sleep_duration=1' + elif endpoint_type == 'amqp': + # start amqp receiver + exchange = 'ex1' + task, receiver = create_amqp_receiver_thread(exchange, topic_name) + task.start() + endpoint_address = 'amqp://' + host + endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange='+exchange+'&amqp-ack-level=broker&persistent=true'+ \ + '&retry_sleep_duration=1' + elif endpoint_type == 'kafka': + # start kafka receiver + task, receiver = create_kafka_receiver_thread(topic_name) + task.start() + endpoint_address = 'kafka://' + host + endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \ + '&retry_sleep_duration=1' + else: + return SkipTest('Unknown endpoint type: ' + endpoint_type) + # create s3 topic - endpoint_address = 'http://'+host+':'+str(port) - endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \ - '&retry_sleep_duration=1' + endpoint_address = 'kafka://' + host + ':1234' # wrong port + endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \ + '&retry_sleep_duration=1' topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() # create s3 notification @@ -3022,8 +3045,12 @@ def test_ps_s3_persistent_topic_stats(): # topic stats get_stats_persistent_topic(topic_name, 2 * number_of_objects) - # start an http server in a separate thread - http_server = HTTPServerWithEvents((host, port)) + # change the endpoint port + endpoint_address = 'kafka://' + host + endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \ + '&retry_sleep_duration=1' + topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) + topic_arn = topic_conf.set_config() wait_for_queue_to_drain(topic_name, http_port=port) @@ -3032,27 +3059,44 @@ def test_ps_s3_persistent_topic_stats(): topic_conf.del_config() # delete the bucket conn.delete_bucket(bucket_name) - http_server.close() + receiver.close(task) -@attr('basic_test') + +@attr('http_test') +def persistent_topic_stats_http(): + """ test persistent topic stats, http endpoint """ + conn = connection() + persistent_topic_stats(conn, 'http') + + +@attr('kafka_test') +def persistent_topic_stats_kafka(): + """ test persistent topic stats, kafka endpoint """ + conn = connection() + persistent_topic_stats(conn, 'kafka') + + +@attr('kafka_test') def test_persistent_topic_dump(): """ test persistent topic dump """ conn = connection() zonegroup = get_config_zonegroup() - # create random port for the http server - host = get_ip() - port = random.randint(10000, 20000) - # create bucket bucket_name = gen_bucket_name() bucket = conn.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX + # start kafka receiver + host = get_ip() + task, receiver = create_kafka_receiver_thread(topic_name) + task.start() + + # create s3 topic - endpoint_address = 'http://'+host+':'+str(port) - endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \ - '&retry_sleep_duration=1' + endpoint_address = 'kafka://WrongHost' # wrong port + endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \ + '&retry_sleep_duration=1' topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() # create s3 notification @@ -3103,10 +3147,14 @@ def test_persistent_topic_dump(): parsed_result = json.loads(result[0]) assert_equal(len(parsed_result), 2*number_of_objects) - # start an http server in a separate thread - http_server = HTTPServerWithEvents((host, port)) + # change the endpoint port + endpoint_address = 'kafka://' + host + endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \ + '&retry_sleep_duration=1' + topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) + topic_arn = topic_conf.set_config() - wait_for_queue_to_drain(topic_name, http_port=port) + wait_for_queue_to_drain(topic_name,) result = admin(['topic', 'dump', '--topic', topic_name], get_config_cluster()) assert_equal(result[1], 0) @@ -3118,7 +3166,7 @@ def test_persistent_topic_dump(): topic_conf.del_config() # delete the bucket conn.delete_bucket(bucket_name) - http_server.close() + receiver.close(task) def ps_s3_persistent_topic_configs(persistency_time, config_dict): @@ -3653,33 +3701,50 @@ def test_ps_s3_persistent_multiple_gateways(): http_server.close() -@attr('http_test') -def test_ps_s3_persistent_multiple_endpoints(): - """ test pushing persistent notification when one of the endpoints has error """ - conn = connection() +def persistent_topic_multiple_endpoints(conn, endpoint_type): zonegroup = get_config_zonegroup() - # create random port for the http server - host = get_ip() - port = random.randint(10000, 20000) - # start an http server in a separate thread - number_of_objects = 10 - http_server = HTTPServerWithEvents((host, port)) - # create bucket bucket_name = gen_bucket_name() bucket = conn.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX + topic_name_1 = topic_name+'_1' + + host = get_ip() + task = None + port = None + if endpoint_type == 'http': + # create random port for the http server + port = random.randint(10000, 20000) + # start an http server in a separate thread + receiver = HTTPServerWithEvents((host, port)) + endpoint_address = 'http://'+host+':'+str(port) + endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \ + '&retry_sleep_duration=1' + elif endpoint_type == 'amqp': + # start amqp receiver + exchange = 'ex1' + task, receiver = create_amqp_receiver_thread(exchange, topic_name_1) + task.start() + endpoint_address = 'amqp://' + host + endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange='+exchange+'&amqp-ack-level=broker&persistent=true'+ \ + '&retry_sleep_duration=1' + elif endpoint_type == 'kafka': + # start kafka receiver + task, receiver = create_kafka_receiver_thread(topic_name_1) + task.start() + endpoint_address = 'kafka://' + host + endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \ + '&retry_sleep_duration=1' + else: + return SkipTest('Unknown endpoint type: ' + endpoint_type) # create two s3 topics - endpoint_address = 'http://'+host+':'+str(port) - endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \ - '&retry_sleep_duration=1' - topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args) + topic_conf1 = PSTopicS3(conn, topic_name_1, zonegroup, endpoint_args=endpoint_args) topic_arn1 = topic_conf1.set_config() endpoint_address = 'http://kaboom:9999' endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \ - '&retry_sleep_duration=1' + '&retry_sleep_duration=1' topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args) topic_arn2 = topic_conf2.set_config() @@ -3701,6 +3766,7 @@ def test_ps_s3_persistent_multiple_endpoints(): client_threads = [] start_time = time.time() + number_of_objects = 10 for i in range(number_of_objects): key = bucket.new_key(str(i)) content = str(os.urandom(1024*1024)) @@ -3711,9 +3777,8 @@ def test_ps_s3_persistent_multiple_endpoints(): keys = list(bucket.list()) - wait_for_queue_to_drain(topic_name+'_1') - - http_server.verify_s3_events(keys, exact_match=True, deletions=False) + wait_for_queue_to_drain(topic_name_1, http_port=port) + receiver.verify_s3_events(keys, exact_match=True, deletions=False) # delete objects from the bucket client_threads = [] @@ -3724,9 +3789,8 @@ def test_ps_s3_persistent_multiple_endpoints(): client_threads.append(thr) [thr.join() for thr in client_threads] - wait_for_queue_to_drain(topic_name+'_1') - - http_server.verify_s3_events(keys, exact_match=True, deletions=True) + wait_for_queue_to_drain(topic_name_1, http_port=port) + receiver.verify_s3_events(keys, exact_match=True, deletions=True) # cleanup s3_notification_conf1.del_config() @@ -3734,7 +3798,22 @@ def test_ps_s3_persistent_multiple_endpoints(): s3_notification_conf2.del_config() topic_conf2.del_config() conn.delete_bucket(bucket_name) - http_server.close() + receiver.close(task) + + +@attr('http_test') +def test_persistent_multiple_endpoints_http(): + """ test pushing persistent notification when one of the endpoints has error, http endpoint """ + conn = connection() + persistent_topic_multiple_endpoints(conn, 'http') + + +@attr('kafka_test') +def test_persistent_multiple_endpoints_kafka(): + """ test pushing persistent notification when one of the endpoints has error, kafka endpoint """ + conn = connection() + persistent_topic_multiple_endpoints(conn, 'kafka') + def persistent_notification(endpoint_type, conn, account=None): """ test pushing persistent notification """ @@ -4668,18 +4747,12 @@ def test_persistent_ps_s3_reload(): http_server.close() -@attr('data_path_v2_test') -def test_persistent_ps_s3_data_path_v2_migration(): +def persistent_data_path_v2_migration(conn, endpoint_type): """ test data path v2 persistent migration """ if get_config_cluster() == 'noname': return SkipTest('realm is needed for migration test') - conn = connection() zonegroup = get_config_zonegroup() - # create random port for the http server - host = get_ip() - http_port = random.randint(10000, 20000) - # disable v2 notification zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2) @@ -4688,10 +4761,35 @@ def test_persistent_ps_s3_data_path_v2_migration(): bucket = conn.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX - # create s3 topic - endpoint_address = 'http://'+host+':'+str(http_port) - endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \ - '&retry_sleep_duration=1' + host = get_ip() + task = None + port = None + if endpoint_type == 'http': + # create random port for the http server + port = random.randint(10000, 20000) + # start an http server in a separate thread + receiver = HTTPServerWithEvents((host, port)) + endpoint_address = 'http://'+host+':'+str(port) + endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \ + '&retry_sleep_duration=1' + elif endpoint_type == 'amqp': + # start amqp receiver + exchange = 'ex1' + task, receiver = create_amqp_receiver_thread(exchange, topic_name) + task.start() + endpoint_address = 'amqp://' + host + endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange='+exchange+'&amqp-ack-level=broker&persistent=true'+ \ + '&retry_sleep_duration=1' + elif endpoint_type == 'kafka': + # start kafka receiver + task, receiver = create_kafka_receiver_thread(topic_name) + task.start() + endpoint_address = 'kafka://' + host + endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \ + '&retry_sleep_duration=1' + else: + return SkipTest('Unknown endpoint type: ' + endpoint_type) + topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() # create s3 notification @@ -4758,14 +4856,11 @@ def test_persistent_ps_s3_data_path_v2_migration(): # topic stats get_stats_persistent_topic(topic_name, 2 * number_of_objects) - # start an http server in a separate thread - http_server = HTTPServerWithEvents((host, http_port)) - - wait_for_queue_to_drain(topic_name, http_port=http_port) + wait_for_queue_to_drain(topic_name) # verify events keys = list(bucket.list()) # exact match is false because the notifications are persistent. - http_server.verify_s3_events(keys, exact_match=False) + receiver.verify_s3_events(keys, exact_match=False) except Exception as e: assert False, str(e) @@ -4782,8 +4877,21 @@ def test_persistent_ps_s3_data_path_v2_migration(): [thr.join() for thr in client_threads] # delete the bucket conn.delete_bucket(bucket_name) - if http_server: - http_server.close() + receiver.close(task) + + +@attr('data_path_v2_test') +def persistent_data_path_v2_migration_http(): + """ test data path v2 persistent migration, http endpoint """ + conn = connection() + persistent_data_path_v2_migration(conn, 'http') + + +@attr('data_path_v2_kafka_test') +def persistent_data_path_v2_migration_kafka(): + """ test data path v2 persistent migration, kafka endpoint """ + conn = connection() + persistent_data_path_v2_migration(conn, 'kafka') @attr('data_path_v2_test')