From 3b959f364fed8350fd98bf889e54aa03f6f03c1d Mon Sep 17 00:00:00 2001
From: Yuval Lifshitz
Date: Mon, 17 Nov 2025 10:04:17 +0000
Subject: [PATCH] test/rgw/notify: cover case with more than 1K topics

Signed-off-by: Yuval Lifshitz
---
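Notes: every phase of the new test (create, write, delete, cleanup) fans out
over a ThreadPoolExecutor and then collects each future's result, so a single
failure is logged rather than aborting the whole run. A minimal sketch of that
submit/collect pattern, standard library only (do_work() is a hypothetical
stand-in for the per-index operation):

    from concurrent.futures import ThreadPoolExecutor

    def do_work(i):
        # hypothetical per-index operation; raises on failure
        return i * i

    with ThreadPoolExecutor(max_workers=30) as executor:
        futures = [executor.submit(do_work, i) for i in range(1200)]
        for i, future in enumerate(futures):
            try:
                result = future.result()  # re-raises any worker exception
            except Exception as e:
                print(f"item {i} failed: {e}")

Leaving the "with" block calls executor.shutdown(wait=True), so each phase
finishes before the next one starts.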
 src/test/rgw/bucket_notification/test_bn.py | 106 +++++++++++++++++++-
 1 file changed, 105 insertions(+), 1 deletion(-)

diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py
index cd2b2857ff0d6..a051d18826072 100644
--- a/src/test/rgw/bucket_notification/test_bn.py
+++ b/src/test/rgw/bucket_notification/test_bn.py
@@ -3,6 +3,7 @@ import json
 import tempfile
 import random
 import threading
+from concurrent.futures import ThreadPoolExecutor
 import subprocess
 import socket
 import time
@@ -1684,6 +1685,109 @@ def test_notification_push_kafka_multiple_brokers_append():
     notification_push('kafka', conn, kafka_brokers='{host}:9091'.format(host=default_kafka_server))
 
 
+@attr('manual_test')
+def test_1K_topics():
+    """ test creation of more than 1K topics """
+    conn = connection()
+    zonegroup = get_config_zonegroup()
+    base_bucket_name = gen_bucket_name()
+    host = get_ip()
+    endpoint_address = 'kafka://' + host
+    endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'
+    topic_count = 1200
+    thread_pool_size = 30
+    topics = []
+    notifications = []
+    buckets = []
+    log.info(f"creating {topic_count} topics, buckets and notifications")
+    print(f"creating {topic_count} topics, buckets and notifications")
+    # create topics, buckets and notifications
+    def create_topic_bucket_notification(i):
+        bucket_name = base_bucket_name + str(i)
+        topic_name = bucket_name + TOPIC_SUFFIX
+        topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+        topic_arn = topic_conf.set_config()
+        bucket = conn.create_bucket(bucket_name)
+        notification_name = bucket_name + NOTIFICATION_SUFFIX
+        topic_conf_list = [{'Id': notification_name,
+                            'TopicArn': topic_arn,
+                            'Events': []
+                            }]
+        s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+        response, status = s3_notification_conf.set_config()
+        return topic_conf, bucket, s3_notification_conf
+
+    with ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
+        futures = [executor.submit(create_topic_bucket_notification, i) for i in range(topic_count)]
+        for i, future in enumerate(futures):
+            try:
+                topic_conf, bucket, s3_notification_conf = future.result()
+                topics.append(topic_conf)
+                buckets.append(bucket)
+                notifications.append(s3_notification_conf)
+            except Exception as e:
+                log.error(f"error creating topic/bucket/notification {i}: {e}")
+
+    log.info("creating objects in buckets")
+    print("creating objects in buckets")
+    # create objects in the buckets
+    def create_object_in_bucket(bucket_idx):
+        bucket = buckets[bucket_idx]
+        content = str(os.urandom(32))
+        key = bucket.new_key(str(bucket_idx))
+        set_contents_from_string(key, content)
+
+    with ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
+        futures = [executor.submit(create_object_in_bucket, i) for i in range(len(buckets))]
+        for future in futures:
+            try:
+                future.result()
+            except Exception as e:
+                log.error(f"error creating object in bucket: {e}")
+
+    log.info("deleting objects from buckets")
+    print("deleting objects from buckets")
+    # delete objects in the buckets
+    def delete_object_in_bucket(bucket_idx):
+        bucket = buckets[bucket_idx]
+        key = bucket.new_key(str(bucket_idx))
+        key.delete()
+
+    with ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
+        futures = [executor.submit(delete_object_in_bucket, i) for i in range(len(buckets))]
+        for future in futures:
+            try:
+                future.result()
+            except Exception as e:
+                log.error(f"error deleting object in bucket: {e}")
+
+    # cleanup
+    def cleanup_notification(notification):
+        notification.del_config()
+
+    def cleanup_topic(topic):
+        topic.del_config()
+
+    def cleanup_bucket(i):
+        bucket_name = base_bucket_name + str(i)
+        conn.delete_bucket(bucket_name)
+
+    with ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
+        # cleanup notifications
+        notification_futures = [executor.submit(cleanup_notification, notification) for notification in notifications]
+        # cleanup topics
+        topic_futures = [executor.submit(cleanup_topic, topic) for topic in topics]
+        # cleanup buckets
+        bucket_futures = [executor.submit(cleanup_bucket, i) for i in range(topic_count)]
+        # wait for all cleanup operations to complete
+        all_futures = notification_futures + topic_futures + bucket_futures
+        for future in all_futures:
+            try:
+                future.result()
+            except Exception as e:
+                log.error(f"error during cleanup: {e}")
+
+
 @attr('http_test')
 def test_ps_s3_notification_multi_delete_on_master():
     """ test deletion of multiple keys on master """
@@ -6134,4 +6238,4 @@ def test_persistent_sharded_topic_config_change_kafka():
     conn = connection()
     new_num_shards = random.randint(2, 10)
     default_num_shards = 11
-    persistent_notification_shard_config_change('kafka', conn, new_num_shards, default_num_shards)
\ No newline at end of file
+    persistent_notification_shard_config_change('kafka', conn, new_num_shards, default_num_shards)
-- 
2.39.5