]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
test/rgw/noitifications: fix test names 61103/head
authorYuval Lifshitz <ylifshit@ibm.com>
Mon, 16 Dec 2024 17:16:36 +0000 (17:16 +0000)
committerYuval Lifshitz <ylifshit@ibm.com>
Mon, 30 Dec 2024 12:55:00 +0000 (12:55 +0000)
for persistent topic stats tests

Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
src/test/rgw/bucket_notification/test_bn.py

index 83d66b77b4c63f0b5beecc15b74caf98615ae074..665fbca74949b34e9250594bce2e6de819bb9483 100644 (file)
@@ -3005,7 +3005,6 @@ def wait_for_queue_to_drain(topic_name, tenant=None, account=None, http_port=Non
     log.info('waited for %ds for queue %s to drain', time_diff, topic_name)
 
 
-@attr('kafka_test')
 def persistent_topic_stats(conn, endpoint_type):
     zonegroup = get_config_zonegroup()
 
@@ -3017,12 +3016,13 @@ def persistent_topic_stats(conn, endpoint_type):
     host = get_ip()
     task = None
     port = None
+    wrong_port = 1234
+    endpoint_address = endpoint_type+'://'+host+':'+str(wrong_port)
     if endpoint_type == 'http':
         # create random port for the http server
         port = random.randint(10000, 20000)
         # start an http server in a separate thread
         receiver = HTTPServerWithEvents((host, port))
-        endpoint_address = 'http://'+host+':'+str(port)
         endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \
                         '&retry_sleep_duration=1'
     elif endpoint_type == 'amqp':
@@ -3030,23 +3030,18 @@ def persistent_topic_stats(conn, endpoint_type):
         exchange = 'ex1'
         task, receiver = create_amqp_receiver_thread(exchange, topic_name)
         task.start()
-        endpoint_address = 'amqp://' + host
         endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange='+exchange+'&amqp-ack-level=broker&persistent=true'+ \
                         '&retry_sleep_duration=1'
     elif endpoint_type == 'kafka':
         # start kafka receiver
         task, receiver = create_kafka_receiver_thread(topic_name)
         task.start()
-        endpoint_address = 'kafka://' + host
         endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
                         '&retry_sleep_duration=1'
     else:
         return SkipTest('Unknown endpoint type: ' + endpoint_type)
 
     # create s3 topic
-    endpoint_address = 'kafka://' + host + ':1234' # wrong port
-    endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
-                    '&retry_sleep_duration=1'
     topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
     # create s3 notification
@@ -3094,9 +3089,19 @@ def persistent_topic_stats(conn, endpoint_type):
     get_stats_persistent_topic(topic_name, 2 * number_of_objects)
 
     # change the endpoint port
-    endpoint_address = 'kafka://' + host
-    endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
-                    '&retry_sleep_duration=1'
+    if endpoint_type == 'http':
+        endpoint_address = endpoint_type+'://'+host+':'+str(port)
+        endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \
+                        '&retry_sleep_duration=1'
+    elif endpoint_type == 'amqp':
+        endpoint_address = endpoint_type+'://'+host
+        endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange='+exchange+'&amqp-ack-level=broker&persistent=true'+ \
+                        '&retry_sleep_duration=1'
+    elif endpoint_type == 'kafka':
+        endpoint_address = endpoint_type+'://'+host
+        endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
+                        '&retry_sleep_duration=1'
+    
     topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
 
@@ -3111,19 +3116,26 @@ def persistent_topic_stats(conn, endpoint_type):
 
 
 @attr('http_test')
-def persistent_topic_stats_http():
+def test_persistent_topic_stats_http():
     """ test persistent topic stats, http endpoint """
     conn = connection()
     persistent_topic_stats(conn, 'http')
 
 
 @attr('kafka_test')
-def persistent_topic_stats_kafka():
+def test_persistent_topic_stats_kafka():
     """ test persistent topic stats, kafka endpoint """
     conn = connection()
     persistent_topic_stats(conn, 'kafka')
 
 
+@attr('amqp_test')
+def test_persistent_topic_stats_amqp():
+    """ test persistent topic stats, amqp endpoint """
+    conn = connection()
+    persistent_topic_stats(conn, 'amqp')
+
+
 @attr('kafka_test')
 def test_persistent_topic_dump():
     """ test persistent topic dump """