From: Kalpesh Pandya Date: Thu, 2 Sep 2021 09:14:17 +0000 (+0530) Subject: src/test: Addition of test related to idleness check X-Git-Tag: v17.1.0~852^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=4eb0067c67e4e313a3ad80dac185d5018e4dec92;p=ceph.git src/test: Addition of test related to idleness check Signed-off-by: Kalpesh Pandya --- diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index ac72b1c2bc1..c40fcc9e383 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -1057,6 +1057,139 @@ def test_ps_s3_notification_push_amqp_on_master(): conn.delete_bucket(bucket_name) +@attr('manual_testing') +def test_ps_s3_notification_push_amqp_idleness_check(): + """ test pushing amqp s3 notification and checking for connection idleness """ + # return SkipTest("only used in manual testing") + hostname = get_ip() + conn = connection() + zonegroup = 'default' + + # create bucket + bucket_name = gen_bucket_name() + bucket = conn.create_bucket(bucket_name) + topic_name1 = bucket_name + TOPIC_SUFFIX + '_1' + + # start amqp receivers + exchange = 'ex1' + task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name1) + task1.start() + + # create two s3 topic + endpoint_address = 'amqp://' + hostname + # with acks from broker + endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker' + topic_conf1 = PSTopicS3(conn, topic_name1, zonegroup, endpoint_args=endpoint_args) + topic_arn1 = topic_conf1.set_config() + # create s3 notification + notification_name = bucket_name + NOTIFICATION_SUFFIX + topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1, + 'Events': [] + }] + + s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) + response, status = s3_notification_conf.set_config() + assert_equal(status/100, 2) + + # create objects in the bucket (async) + number_of_objects = 10 + client_threads = [] + start_time = time.time() + for i in range(number_of_objects): + key = bucket.new_key(str(i)) + content = str(os.urandom(1024*1024)) + thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) + thr.start() + client_threads.append(thr) + [thr.join() for thr in client_threads] + + time_diff = time.time() - start_time + print('average time for creation + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') + + print('wait for 5sec for the messages...') + time.sleep(5) + + # check amqp receiver + keys = list(bucket.list()) + print('total number of objects: ' + str(len(keys))) + receiver1.verify_s3_events(keys, exact_match=True) + + # delete objects from the bucket + client_threads = [] + start_time = time.time() + for key in bucket.list(): + thr = threading.Thread(target = key.delete, args=()) + thr.start() + client_threads.append(thr) + [thr.join() for thr in client_threads] + + time_diff = time.time() - start_time + print('average time for deletion + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') + + print('wait for 5sec for the messages...') + time.sleep(5) + + # check amqp receiver 1 for deletions + receiver1.verify_s3_events(keys, exact_match=True, deletions=True) + + os.system("netstat -nnp | grep 5672"); + + print('waiting for 40sec for checking idleness') + time.sleep(40) + + os.system("netstat -nnp | grep 5672"); + + # do the process of uploading an object and checking for notification again + number_of_objects = 10 + client_threads = [] + start_time = time.time() + for i in range(number_of_objects): + key = bucket.new_key(str(i)) + content = str(os.urandom(1024*1024)) + thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) + thr.start() + client_threads.append(thr) + [thr.join() for thr in client_threads] + + time_diff = time.time() - start_time + print('average time for creation + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') + + print('wait for 5sec for the messages...') + time.sleep(5) + + # check amqp receiver + keys = list(bucket.list()) + print('total number of objects: ' + str(len(keys))) + receiver1.verify_s3_events(keys, exact_match=True) + + # delete objects from the bucket + client_threads = [] + start_time = time.time() + for key in bucket.list(): + thr = threading.Thread(target = key.delete, args=()) + thr.start() + client_threads.append(thr) + [thr.join() for thr in client_threads] + + time_diff = time.time() - start_time + print('average time for deletion + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') + + print('wait for 5sec for the messages...') + time.sleep(5) + + # check amqp receiver 1 for deletions + receiver1.verify_s3_events(keys, exact_match=True, deletions=True) + + os.system("netstat -nnp | grep 5672"); + + # cleanup + stop_amqp_receiver(receiver1, task1) + s3_notification_conf.del_config() + topic_conf1.del_config() + # delete the bucket + conn.delete_bucket(bucket_name) + + @attr('kafka_test') def test_ps_s3_notification_push_kafka_on_master(): """ test pushing kafka s3 notification on master """