import tempfile
import random
import threading
+from concurrent.futures import ThreadPoolExecutor
import subprocess
import socket
import time
+import os
    notification_push('kafka', conn, kafka_brokers='{host}:9091'.format(host=default_kafka_server))
+@attr('manual_test')
+def test_1K_topics():
+    """ test creation of more than 1K topics """
+    conn = connection()
+    zonegroup = get_config_zonegroup()
+    base_bucket_name = gen_bucket_name()
+    host = get_ip()
+    endpoint_address = 'kafka://' + host
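+    # persistent=true queues notifications in RGW for asynchronous delivery;
+    # kafka-ack-level=broker waits for the broker to ack each message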
+    endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'
+    topic_count = 1200
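+    # a bounded worker pool keeps at most this many requests in flight against the gateway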
+    thread_pool_size = 30
+    topics = []
+    notifications = []
+    buckets = []
+    log.info(f"creating {topic_count} topics, buckets and notifications")
+    print(f"creating {topic_count} topics, buckets and notifications")
+    # create topics, buckets and notifications
+    def create_topic_bucket_notification(i):
+        bucket_name = base_bucket_name + str(i)
+        topic_name = bucket_name + TOPIC_SUFFIX
+        topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+        topic_arn = topic_conf.set_config()
+        bucket = conn.create_bucket(bucket_name)
+        notification_name = bucket_name + NOTIFICATION_SUFFIX
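+        # an empty 'Events' list subscribes the notification to all supported event types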
+        topic_conf_list = [{'Id': notification_name,
+                            'TopicArn': topic_arn,
+                            'Events': []
+                            }]
+        s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+        response, status = s3_notification_conf.set_config()
+        return topic_conf, bucket, s3_notification_conf
+
+    with ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
+        futures = [executor.submit(create_topic_bucket_notification, i) for i in range(topic_count)]
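+        # results are collected in submission order, so index i maps back to the failing topic/bucket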
+        for i, future in enumerate(futures):
+            try:
+                topic_conf, bucket, s3_notification_conf = future.result()
+                topics.append(topic_conf)
+                buckets.append(bucket)
+                notifications.append(s3_notification_conf)
+            except Exception as e:
+                log.error(f"error creating topic/bucket/notification {i}: {e}")
+
+ log.info("creating objects in buckets")
+ print("creating objects in buckets")
+ # create objects in the buckets
+ def create_object_in_bucket(bucket_idx):
+ bucket = buckets[bucket_idx]
+ content = str(os.urandom(32))
+ key = bucket.new_key(str(bucket_idx))
+ set_contents_from_string(key, content)
+
+    with ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
+        futures = [executor.submit(create_object_in_bucket, i) for i in range(len(buckets))]
+        for future in futures:
+            try:
+                future.result()
+            except Exception as e:
+                log.error(f"error creating object in bucket: {e}")
+
+ log.info("deleting objects from buckets")
+ print("deleting objects from buckets")
+ # delete objects in the buckets
+ def delete_object_in_bucket(bucket_idx):
+ bucket = buckets[bucket_idx]
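+        # new_key() only builds a local Key object; delete() sends the actual DELETE request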
+        key = bucket.new_key(str(bucket_idx))
+        key.delete()
+
+    with ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
+        futures = [executor.submit(delete_object_in_bucket, i) for i in range(len(buckets))]
+        for future in futures:
+            try:
+                future.result()
+            except Exception as e:
+                log.error(f"error deleting object in bucket: {e}")
+
+    # cleanup
+    def cleanup_notification(notification):
+        notification.del_config()
+
+    def cleanup_topic(topic):
+        topic.del_config()
+
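+    # buckets must be empty before deletion; their objects were removed in the previous phase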
+    def cleanup_bucket(i):
+        bucket_name = base_bucket_name + str(i)
+        conn.delete_bucket(bucket_name)
+
+    with ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
+        # cleanup notifications
+        notification_futures = [executor.submit(cleanup_notification, notification) for notification in notifications]
+        # cleanup topics
+        topic_futures = [executor.submit(cleanup_topic, topic) for topic in topics]
+        # cleanup buckets
+        bucket_futures = [executor.submit(cleanup_bucket, i) for i in range(topic_count)]
+        # wait for all cleanup operations to complete
+        all_futures = notification_futures + topic_futures + bucket_futures
+        for future in all_futures:
+            try:
+                future.result()
+            except Exception as e:
+                log.error(f"error during cleanup: {e}")
+
+
@attr('http_test')
def test_ps_s3_notification_multi_delete_on_master():
""" test deletion of multiple keys on master """
conn = connection()
new_num_shards = random.randint(2, 10)
default_num_shards = 11
- persistent_notification_shard_config_change('kafka', conn, new_num_shards, default_num_shards)
\ No newline at end of file
+    persistent_notification_shard_config_change('kafka', conn, new_num_shards, default_num_shards)