]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
test/rgw/notify: cover case with motre than 1K topics
authorYuval Lifshitz <ylifshit@ibm.com>
Mon, 17 Nov 2025 10:04:17 +0000 (10:04 +0000)
committerYuval Lifshitz <ylifshit@ibm.com>
Mon, 17 Nov 2025 16:20:31 +0000 (16:20 +0000)
Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
src/test/rgw/bucket_notification/test_bn.py

index cd2b2857ff0d6afc2648393ee515c4275709acfb..a051d18826072440d95bd7cae92b5f5f7070fb28 100644 (file)
@@ -3,6 +3,7 @@ import json
 import tempfile
 import random
 import threading
+from concurrent.futures import ThreadPoolExecutor
 import subprocess
 import socket
 import time
@@ -1684,6 +1685,109 @@ def test_notification_push_kafka_multiple_brokers_append():
     notification_push('kafka', conn, kafka_brokers='{host}:9091'.format(host=default_kafka_server))
 
 
+@attr('manual_test')
+def test_1K_topics():
+    """ test creation of moe than 1K topics """
+    conn = connection()
+    zonegroup = get_config_zonegroup()
+    base_bucket_name = gen_bucket_name()
+    host = get_ip()
+    endpoint_address = 'kafka://' + host
+    endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'
+    topic_count = 1200
+    thread_pool_size = 30
+    topics = []
+    notifications = []
+    buckets = []
+    log.info(f"creating {topic_count} topics, buckets and notifications")
+    print(f"creating {topic_count} topics, buckets and notifications")
+    # create topics buckets and notifications
+    def create_topic_bucket_notification(i):
+        bucket_name = base_bucket_name + str(i)
+        topic_name = bucket_name + TOPIC_SUFFIX
+        topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+        topic_arn = topic_conf.set_config()
+        bucket = conn.create_bucket(bucket_name)
+        notification_name = bucket_name + NOTIFICATION_SUFFIX
+        topic_conf_list = [{'Id': notification_name,
+                            'TopicArn': topic_arn,
+                            'Events': []
+                            }]
+        s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+        response, status = s3_notification_conf.set_config()
+        return topic_conf, bucket, s3_notification_conf
+
+    with ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
+        futures = [executor.submit(create_topic_bucket_notification, i) for i in range(topic_count)]
+        for i, future in enumerate(futures):
+            try:
+                topic_conf, bucket, s3_notification_conf = future.result()
+                topics.append(topic_conf)
+                buckets.append(bucket)
+                notifications.append(s3_notification_conf)
+            except Exception as e:
+                log.error(f"error creating topic/bucket/notification {i}: {e}")
+
+    log.info("creating objects in buckets")
+    print("creating objects in buckets")
+    # create objects in the buckets
+    def create_object_in_bucket(bucket_idx):
+        bucket = buckets[bucket_idx]
+        content = str(os.urandom(32))
+        key = bucket.new_key(str(bucket_idx))
+        set_contents_from_string(key, content)
+
+    with ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
+        futures = [executor.submit(create_object_in_bucket, i) for i in range(len(buckets))]
+        for future in futures:
+            try:
+                future.result()
+            except Exception as e:
+                log.error(f"error creating object in bucket: {e}")
+
+    log.info("deleting objects from buckets")
+    print("deleting objects from buckets")
+    # delete objects in the buckets
+    def delete_object_in_bucket(bucket_idx):
+        bucket = buckets[bucket_idx]
+        key = bucket.new_key(str(bucket_idx))
+        key.delete()
+
+    with ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
+        futures = [executor.submit(delete_object_in_bucket, i) for i in range(len(buckets))]
+        for future in futures:
+            try:
+                future.result()
+            except Exception as e:
+                log.error(f"error deleting object in bucket: {e}")
+
+    # cleanup
+    def cleanup_notification(notification):
+        notification.del_config()
+
+    def cleanup_topic(topic):
+        topic.del_config()
+
+    def cleanup_bucket(i):
+        bucket_name = base_bucket_name + str(i)
+        conn.delete_bucket(bucket_name)
+
+    with ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
+        # cleanup notifications
+        notification_futures = [executor.submit(cleanup_notification, notification) for notification in notifications]
+        # cleanup topics
+        topic_futures = [executor.submit(cleanup_topic, topic) for topic in topics]
+        # cleanup buckets
+        bucket_futures = [executor.submit(cleanup_bucket, i) for i in range(topic_count)]
+        # wait for all cleanup operations to complete
+        all_futures = notification_futures + topic_futures + bucket_futures
+        for future in all_futures:
+            try:
+                future.result()
+            except Exception as e:
+                log.error(f"error during cleanup: {e}")
+
+
 @attr('http_test')
 def test_ps_s3_notification_multi_delete_on_master():
     """ test deletion of multiple keys on master """
@@ -6134,4 +6238,4 @@ def test_persistent_sharded_topic_config_change_kafka():
     conn = connection()
     new_num_shards = random.randint(2, 10)
     default_num_shards = 11
-    persistent_notification_shard_config_change('kafka', conn, new_num_shards, default_num_shards)
\ No newline at end of file
+    persistent_notification_shard_config_change('kafka', conn, new_num_shards, default_num_shards)