]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
test/rgw/notifications: fix lifecycle tests
authorYuval Lifshitz <ylifshit@ibm.com>
Wed, 24 Apr 2024 10:03:35 +0000 (10:03 +0000)
committerYuval Lifshitz <ylifshit@ibm.com>
Sun, 12 May 2024 10:39:15 +0000 (10:39 +0000)
* tests were passing only because they were not performings their asserts
* tests are now separated with their own attribute
* their topics are now marked "persistent" to workaround the issue in:
  https://tracker.ceph.com/issues/65645

Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
src/test/rgw/bucket_notification/test_bn.py

index 6363d9b84ec0074932db6f8aee3dfa11c870e885..49c7ee53989eb2dc37a2d757f0c418ac5338a725 100644 (file)
@@ -1678,7 +1678,7 @@ def test_ps_s3_opaque_data_on_master():
     conn.delete_bucket(bucket_name)
     http_server.close()
 
-@attr('http_test')
+@attr('lifecycle_test')
 def test_ps_s3_lifecycle_on_master():
     """ test that when object is deleted due to lifecycle policy, notification is sent on master """
     hostname = get_ip()
@@ -1699,7 +1699,7 @@ def test_ps_s3_lifecycle_on_master():
 
     # create s3 topic
     endpoint_address = 'http://'+host+':'+str(port)
-    endpoint_args = 'push-endpoint='+endpoint_address
+    endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
     opaque_data = 'http://1.2.3.4:8888'
     topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data)
     topic_arn = topic_conf.set_config()
@@ -1729,6 +1729,8 @@ def test_ps_s3_lifecycle_on_master():
     time_diff = time.time() - start_time
     print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
 
+    keys = list(bucket.list())
+
     # create lifecycle policy
     client = boto3.client('s3',
             endpoint_url='http://'+conn.host+':'+str(conn.port),
@@ -1749,15 +1751,15 @@ def test_ps_s3_lifecycle_on_master():
 
     # start lifecycle processing
     admin(['lc', 'process'], get_config_cluster())
-    print('wait for 5sec for the messages...')
-    time.sleep(5)
+    print('wait for 20s for the lifecycle...')
+    time.sleep(20)
 
-    # check http receiver does not have messages
-    keys = list(bucket.list())
-    print('total number of objects: ' + str(len(keys)))
+    no_keys = list(bucket.list())
+    wait_for_queue_to_drain(topic_name)
+    assert_equal(len(no_keys), 0)
     event_keys = []
     events = http_server.get_and_reset_events()
-    assert_equal(number_of_objects * 2, len(events))
+    assert number_of_objects * 2 <= len(events)
     for event in events:
         assert_in(event['Records'][0]['eventName'],
                   ['LifecycleExpiration:Delete',
@@ -1766,7 +1768,7 @@ def test_ps_s3_lifecycle_on_master():
     for key in keys:
         key_found = False
         for event_key in event_keys:
-            if event_key == key:
+            if event_key == key.name:
                 key_found = True
                 break
         if not key_found:
@@ -1793,7 +1795,7 @@ def start_and_abandon_multipart_upload(bucket, key_name, content):
     except Exception as e:
         print('Error: ' + str(e))
 
-@attr('http_test')
+@attr('lifecycle_test')
 def test_ps_s3_lifecycle_abort_mpu_on_master():
     """ test that when a multipart upload is aborted by lifecycle policy, notification is sent on master """
     hostname = get_ip()
@@ -1804,7 +1806,6 @@ def test_ps_s3_lifecycle_abort_mpu_on_master():
     host = get_ip()
     port = random.randint(10000, 20000)
     # start an http server in a separate thread
-    number_of_objects = 1
     http_server = HTTPServerWithEvents((host, port))
 
     # create bucket
@@ -1814,7 +1815,7 @@ def test_ps_s3_lifecycle_abort_mpu_on_master():
 
     # create s3 topic
     endpoint_address = 'http://'+host+':'+str(port)
-    endpoint_args = 'push-endpoint='+endpoint_address
+    endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
     opaque_data = 'http://1.2.3.4:8888'
     topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data)
     topic_arn = topic_conf.set_config()
@@ -1829,9 +1830,7 @@ def test_ps_s3_lifecycle_abort_mpu_on_master():
     assert_equal(status/100, 2)
 
     # start and abandon a multpart upload
-    # create objects in the bucket
     obj_prefix = 'ooo'
-    start_time = time.time()
     content = 'bar'
 
     key_name = obj_prefix + str(1)
@@ -1839,9 +1838,6 @@ def test_ps_s3_lifecycle_abort_mpu_on_master():
     thr.start()
     thr.join()    
 
-    time_diff = time.time() - start_time
-    print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
-    
     # create lifecycle policy -- assume rgw_lc_debug_interval=10 is in effect
     client = boto3.client('s3',
             endpoint_url='http://'+conn.host+':'+str(conn.port),
@@ -1861,32 +1857,16 @@ def test_ps_s3_lifecycle_abort_mpu_on_master():
 
     # start lifecycle processing
     admin(['lc', 'process'], get_config_cluster())
-    print('wait for 20s (2 days) for the messages...')
+    print('wait for 20s for the lifecycle...')
     time.sleep(20)
 
-    # check http receiver does not have messages
-    keys = list(bucket.list())
-    print('total number of objects: ' + str(len(keys)))
-    event_keys = []
+    wait_for_queue_to_drain(topic_name)
     events = http_server.get_and_reset_events()
     for event in events:
-        # I hope Boto doesn't gak on the unknown eventName
-        assert_equal(event['Records'][0]['eventName'], 'ObjectLifecycle:Expiration:AbortMultipartUpload')
-        event_keys.append(event['Records'][0]['s3']['object']['key'])
-    for key in keys:
-        key_found = False
-        for event_key in event_keys:
-            if event_key == key:
-                key_found = True
-                break
-        if not key_found:
-            err = 'no lifecycle event found for key: ' + str(key)
-            log.error(events)
-            assert False, err
+        assert_equal(event['Records'][0]['eventName'], 'ObjectLifecycle:Expiration:AbortMPU')
+        assert key_name in event['Records'][0]['s3']['object']['key']
 
     # cleanup
-    for key in keys:
-        key.delete()
     topic_conf.del_config()
     s3_notification_conf.del_config(notification=notification_name)
     # delete the bucket