git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
src/test: Addition of test related to idleness check 43025/head
author Kalpesh Pandya <kapandya@redhat.com>
Thu, 2 Sep 2021 09:14:17 +0000 (14:44 +0530)
committer Kalpesh Pandya <kapandya@redhat.com>
Thu, 2 Sep 2021 11:03:06 +0000 (16:33 +0530)
Signed-off-by: Kalpesh Pandya <kapandya@redhat.com>
src/test/rgw/bucket_notification/test_bn.py

index ac72b1c2bc16b370cc4b45af7bdd992759b8edab..c40fcc9e383ad12053780c9a47768994c1e62221 100644 (file)
@@ -1057,6 +1057,139 @@ def test_ps_s3_notification_push_amqp_on_master():
     conn.delete_bucket(bucket_name)
 
 
+@attr('manual_testing')
+def test_ps_s3_notification_push_amqp_idleness_check():
+    """ test pushing amqp s3 notification and checking for connection idleness """
+    # return SkipTest("only used in manual testing")
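+    # this test is tagged 'manual_testing' because the netstat output below needs
+    # manual inspection; uncommenting the SkipTest return above bypasses the test body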
+    hostname = get_ip()
+    conn = connection()
+    zonegroup = 'default'
+
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = conn.create_bucket(bucket_name)
+    topic_name1 = bucket_name + TOPIC_SUFFIX + '_1'
+
+    # start amqp receivers
+    exchange = 'ex1'
+    task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name1)
+    task1.start()
+
+    # create s3 topic
+    endpoint_address = 'amqp://' + hostname
+    # with acks from broker
+    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
+    topic_conf1 = PSTopicS3(conn, topic_name1, zonegroup, endpoint_args=endpoint_args)
+    topic_arn1 = topic_conf1.set_config()
+    # create s3 notification
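+    # an empty 'Events' list subscribes the notification to all supported event
+    # types; both object creation and deletion events are verified below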
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
+                         'Events': []
+                       }]
+
+    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # create objects in the bucket (async)
+    number_of_objects = 10
+    client_threads = []
+    start_time = time.time()
+    for i in range(number_of_objects):
+        key = bucket.new_key(str(i))
+        content = str(os.urandom(1024*1024))
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+
+    time_diff = time.time() - start_time
+    print('average time for creation + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+
+    # check amqp receiver
+    keys = list(bucket.list())
+    print('total number of objects: ' + str(len(keys)))
+    receiver1.verify_s3_events(keys, exact_match=True)
+
+    # delete objects from the bucket
+    client_threads = []
+    start_time = time.time()
+    for key in bucket.list():
+        thr = threading.Thread(target = key.delete, args=())
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+
+    time_diff = time.time() - start_time
+    print('average time for deletion + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+
+    # check amqp receiver 1 for deletions
+    receiver1.verify_s3_events(keys, exact_match=True, deletions=True)
+
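+    # snapshot the TCP connections to the AMQP broker (5672 is the default AMQP
+    # port) before letting the connection sit idle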
+    os.system("netstat -nnp | grep 5672")
+
+    print('waiting 40sec to check connection idleness')
+    time.sleep(40)
+
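+    # snapshot the connections again after the idle period; compare with the
+    # output above to see whether the broker connection survived being idle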
+    os.system("netstat -nnp | grep 5672")
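+
+    # optional: if the psutil package happens to be available, the same check can
+    # be done programmatically instead of reading the netstat output by hand, by
+    # counting established TCP connections to the AMQP port
+    try:
+        import psutil
+        amqp_conns = [c for c in psutil.net_connections(kind='tcp')
+                      if c.raddr and c.raddr.port == 5672 and c.status == psutil.CONN_ESTABLISHED]
+        print('established connections to the AMQP broker after idle period: ' + str(len(amqp_conns)))
+    except ImportError:
+        print('psutil not installed - skipping programmatic connection count')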
+
+    # upload objects again and verify that notifications are still delivered after the idle period
+    number_of_objects = 10
+    client_threads = []
+    start_time = time.time()
+    for i in range(number_of_objects):
+        key = bucket.new_key(str(i))
+        content = str(os.urandom(1024*1024))
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+
+    time_diff = time.time() - start_time
+    print('average time for creation + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+
+    # check amqp receiver
+    keys = list(bucket.list())
+    print('total number of objects: ' + str(len(keys)))
+    receiver1.verify_s3_events(keys, exact_match=True)
+
+    # delete objects from the bucket
+    client_threads = []
+    start_time = time.time()
+    for key in bucket.list():
+        thr = threading.Thread(target = key.delete, args=())
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+
+    time_diff = time.time() - start_time
+    print('average time for deletion + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+
+    # check amqp receiver 1 for deletions
+    receiver1.verify_s3_events(keys, exact_match=True, deletions=True)
+
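+    # one last connection snapshot after the second round of uploads and deletes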
+    os.system("netstat -nnp | grep 5672")
+
+    # cleanup
+    stop_amqp_receiver(receiver1, task1)
+    s3_notification_conf.del_config()
+    topic_conf1.del_config()
+    # delete the bucket
+    conn.delete_bucket(bucket_name)
+
+
 @attr('kafka_test')
 def test_ps_s3_notification_push_kafka_on_master():
     """ test pushing kafka s3 notification on master """