RGW|BN: change endpoint for failing http tests 58408/head
author Ali Masarwa <amasarwa@redhat.com>
Sun, 30 Jun 2024 13:26:22 +0000 (16:26 +0300)
committer Ali Masarwa <amasarwa@redhat.com>
Thu, 4 Jul 2024 07:06:26 +0000 (10:06 +0300)
Signed-off-by: Ali Masarwa <amasarwa@redhat.com>
qa/suites/rgw/notifications/tasks/kafka/test_kafka.yaml
qa/tasks/notification_tests.py
src/test/rgw/bucket_notification/test_bn.py

index ae647df386532a0d405f403644d35095d0724f19..462570e77275542d68a1d50952a5967fa59eaec2 100644 (file)
@@ -4,5 +4,5 @@ tasks:
       kafka_version: 2.6.0
 - notification-tests:
     client.0:
-      extra_attr: ["kafka_test"]
+      extra_attr: ["kafka_test", "data_path_v2_kafka_test"]
       rgw_server: client.0
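
For context: extra_attr replaces the default exclusion list built in qa/tasks/notification_tests.py (next hunk), and the attribute names themselves are nose attrib-plugin tags. A minimal sketch of those semantics, with an illustrative test name that is not from this commit:

    from nose.plugins.attrib import attr

    @attr('data_path_v2_kafka_test')
    def test_example():  # illustrative name only
        pass

    # selected by:  nosetests -a data_path_v2_kafka_test
    # excluded by:  nosetests -a '!data_path_v2_kafka_test'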
index 58dbaaa14b344fdea333bf0563a451ee33fb04ce..b4697a6f797f07f040bdd90b72c137ecd2647502 100644 (file)
@@ -220,7 +220,7 @@ def run_tests(ctx, config):
     for client, client_config in config.items():
         (remote,) = ctx.cluster.only(client).remotes.keys()
 
-        attr = ["!kafka_test", "!amqp_test", "!amqp_ssl_test", "!kafka_security_test", "!modification_required", "!manual_test", "!http_test"]
+        attr = ["!kafka_test", "!data_path_v2_kafka_test", "!amqp_test", "!amqp_ssl_test", "!kafka_security_test", "!modification_required", "!manual_test", "!http_test"]
 
         if 'extra_attr' in client_config:
             attr = client_config.get('extra_attr')
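
The default attr list excludes every specially tagged test unless a suite overrides it with extra_attr, as the kafka suite above now does. A hedged sketch of how such a list is typically handed to nosetests (the real command construction lives elsewhere in notification_tests.py; the exact invocation here is an assumption):

    import subprocess

    def run_filtered(attrs, tests='bucket_notification/test_bn.py'):
        # nose's attrib plugin ANDs a comma-joined '-a' list, so
        # '!kafka_test,!http_test' runs only tests carrying neither tag
        cmd = ['nosetests', '-v', '-a', ','.join(attrs), tests]
        return subprocess.run(cmd, check=False)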
index e63bec1d5fab61b0ad927fb31f1ad2ac23c6c2ef..54a2a0e98eec2d519f4f76e77263e1911ca8e2e3 100644 (file)
@@ -2957,25 +2957,48 @@ def wait_for_queue_to_drain(topic_name, tenant=None, account=None, http_port=Non
     log.info('waited for %ds for queue %s to drain', time_diff, topic_name)
 
 
-@attr('basic_test')
-def test_ps_s3_persistent_topic_stats():
-    """ test persistent topic stats """
-    conn = connection()
+def persistent_topic_stats(conn, endpoint_type):
     zonegroup = get_config_zonegroup()
 
-    # create random port for the http server
-    host = get_ip()
-    port = random.randint(10000, 20000)
-
     # create bucket
     bucket_name = gen_bucket_name()
     bucket = conn.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
 
+    host = get_ip()
+    task = None
+    port = None
+    if endpoint_type == 'http':
+        # create random port for the http server
+        port = random.randint(10000, 20000)
+        # start an http server in a separate thread
+        receiver = HTTPServerWithEvents((host, port))
+        endpoint_address = 'http://'+host+':'+str(port)
+        endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \
+                        '&retry_sleep_duration=1'
+    elif endpoint_type == 'amqp':
+        # start amqp receiver
+        exchange = 'ex1'
+        task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+        task.start()
+        endpoint_address = 'amqp://' + host
+        endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange='+exchange+'&amqp-ack-level=broker&persistent=true'+ \
+                        '&retry_sleep_duration=1'
+    elif endpoint_type == 'kafka':
+        # start kafka receiver
+        task, receiver = create_kafka_receiver_thread(topic_name)
+        task.start()
+        endpoint_address = 'kafka://' + host
+        endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
+                        '&retry_sleep_duration=1'
+    else:
+        return SkipTest('Unknown endpoint type: ' + endpoint_type)
+
     # create s3 topic
-    endpoint_address = 'http://'+host+':'+str(port)
-    endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \
-            '&retry_sleep_duration=1'
+    endpoint_address = 'kafka://' + host + ':1234' # wrong port
+    endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
+                    '&retry_sleep_duration=1'
     topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
     # create s3 notification
@@ -3022,8 +3045,12 @@ def test_ps_s3_persistent_topic_stats():
     # topic stats
     get_stats_persistent_topic(topic_name, 2 * number_of_objects)
 
-    # start an http server in a separate thread
-    http_server = HTTPServerWithEvents((host, port))
+    # change the endpoint to the correct port
+    endpoint_address = 'kafka://' + host
+    endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
+                    '&retry_sleep_duration=1'
+    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+    topic_arn = topic_conf.set_config()
 
     wait_for_queue_to_drain(topic_name, http_port=port)
 
@@ -3032,27 +3059,44 @@ def test_ps_s3_persistent_topic_stats():
     topic_conf.del_config()
     # delete the bucket
     conn.delete_bucket(bucket_name)
-    http_server.close()
+    receiver.close(task)
 
-@attr('basic_test')
+
+@attr('http_test')
+def test_persistent_topic_stats_http():
+    """ test persistent topic stats, http endpoint """
+    conn = connection()
+    persistent_topic_stats(conn, 'http')
+
+
+@attr('kafka_test')
+def test_persistent_topic_stats_kafka():
+    """ test persistent topic stats, kafka endpoint """
+    conn = connection()
+    persistent_topic_stats(conn, 'kafka')
+
+
+@attr('kafka_test')
 def test_persistent_topic_dump():
     """ test persistent topic dump """
     conn = connection()
     zonegroup = get_config_zonegroup()
 
-    # create random port for the http server
-    host = get_ip()
-    port = random.randint(10000, 20000)
-
     # create bucket
     bucket_name = gen_bucket_name()
     bucket = conn.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
 
+    # start kafka receiver
+    host = get_ip()
+    task, receiver = create_kafka_receiver_thread(topic_name)
+    task.start()
+
     # create s3 topic
-    endpoint_address = 'http://'+host+':'+str(port)
-    endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \
-            '&retry_sleep_duration=1'
+    endpoint_address = 'kafka://WrongHost' # wrong host
+    endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
+                    '&retry_sleep_duration=1'
     topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
     # create s3 notification
@@ -3103,10 +3147,14 @@ def test_persistent_topic_dump():
     parsed_result = json.loads(result[0])
     assert_equal(len(parsed_result), 2*number_of_objects)
 
-    # start an http server in a separate thread
-    http_server = HTTPServerWithEvents((host, port))
+    # change the endpoint to the correct host
+    endpoint_address = 'kafka://' + host
+    endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
+                    '&retry_sleep_duration=1'
+    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+    topic_arn = topic_conf.set_config()
 
-    wait_for_queue_to_drain(topic_name, http_port=port)
+    wait_for_queue_to_drain(topic_name)
 
     result = admin(['topic', 'dump', '--topic', topic_name], get_config_cluster())
     assert_equal(result[1], 0)
@@ -3118,7 +3166,7 @@ def test_persistent_topic_dump():
     topic_conf.del_config()
     # delete the bucket
     conn.delete_bucket(bucket_name)
-    http_server.close()
+    receiver.close(task)
 
 
 def ps_s3_persistent_topic_configs(persistency_time, config_dict):
@@ -3653,33 +3701,50 @@ def test_ps_s3_persistent_multiple_gateways():
     http_server.close()
 
 
-@attr('http_test')
-def test_ps_s3_persistent_multiple_endpoints():
-    """ test pushing persistent notification when one of the endpoints has error """
-    conn = connection()
+def persistent_topic_multiple_endpoints(conn, endpoint_type):
     zonegroup = get_config_zonegroup()
 
-    # create random port for the http server
-    host = get_ip()
-    port = random.randint(10000, 20000)
-    # start an http server in a separate thread
-    number_of_objects = 10
-    http_server = HTTPServerWithEvents((host, port))
-
     # create bucket
     bucket_name = gen_bucket_name()
     bucket = conn.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
+    topic_name_1 = topic_name+'_1'
+
+    host = get_ip()
+    task = None
+    port = None
+    if endpoint_type == 'http':
+        # create random port for the http server
+        port = random.randint(10000, 20000)
+        # start an http server in a separate thread
+        receiver = HTTPServerWithEvents((host, port))
+        endpoint_address = 'http://'+host+':'+str(port)
+        endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \
+                        '&retry_sleep_duration=1'
+    elif endpoint_type == 'amqp':
+        # start amqp receiver
+        exchange = 'ex1'
+        task, receiver = create_amqp_receiver_thread(exchange, topic_name_1)
+        task.start()
+        endpoint_address = 'amqp://' + host
+        endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange='+exchange+'&amqp-ack-level=broker&persistent=true'+ \
+                        '&retry_sleep_duration=1'
+    elif endpoint_type == 'kafka':
+        # start kafka receiver
+        task, receiver = create_kafka_receiver_thread(topic_name_1)
+        task.start()
+        endpoint_address = 'kafka://' + host
+        endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
+                        '&retry_sleep_duration=1'
+    else:
+        return SkipTest('Unknown endpoint type: ' + endpoint_type)
 
     # create two s3 topics
-    endpoint_address = 'http://'+host+':'+str(port)
-    endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \
-            '&retry_sleep_duration=1'
-    topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
+    topic_conf1 = PSTopicS3(conn, topic_name_1, zonegroup, endpoint_args=endpoint_args)
     topic_arn1 = topic_conf1.set_config()
     endpoint_address = 'http://kaboom:9999'
     endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \
-            '&retry_sleep_duration=1'
+                    '&retry_sleep_duration=1'
     topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args)
     topic_arn2 = topic_conf2.set_config()
 
@@ -3701,6 +3766,7 @@ def test_ps_s3_persistent_multiple_endpoints():
 
     client_threads = []
     start_time = time.time()
+    number_of_objects = 10
     for i in range(number_of_objects):
         key = bucket.new_key(str(i))
         content = str(os.urandom(1024*1024))
@@ -3711,9 +3777,8 @@ def test_ps_s3_persistent_multiple_endpoints():
 
     keys = list(bucket.list())
 
-    wait_for_queue_to_drain(topic_name+'_1')
-
-    http_server.verify_s3_events(keys, exact_match=True, deletions=False)
+    wait_for_queue_to_drain(topic_name_1, http_port=port)
+    receiver.verify_s3_events(keys, exact_match=True, deletions=False)
 
     # delete objects from the bucket
     client_threads = []
@@ -3724,9 +3789,8 @@ def test_ps_s3_persistent_multiple_endpoints():
         client_threads.append(thr)
     [thr.join() for thr in client_threads]
 
-    wait_for_queue_to_drain(topic_name+'_1')
-
-    http_server.verify_s3_events(keys, exact_match=True, deletions=True)
+    wait_for_queue_to_drain(topic_name_1, http_port=port)
+    receiver.verify_s3_events(keys, exact_match=True, deletions=True)
 
     # cleanup
     s3_notification_conf1.del_config()
@@ -3734,7 +3798,22 @@ def test_ps_s3_persistent_multiple_endpoints():
     s3_notification_conf2.del_config()
     topic_conf2.del_config()
     conn.delete_bucket(bucket_name)
-    http_server.close()
+    receiver.close(task)
+
+
+@attr('http_test')
+def test_persistent_multiple_endpoints_http():
+    """ test pushing persistent notification when one of the endpoints has error, http endpoint """
+    conn = connection()
+    persistent_topic_multiple_endpoints(conn, 'http')
+
+
+@attr('kafka_test')
+def test_persistent_multiple_endpoints_kafka():
+    """ test pushing persistent notification when one of the endpoints has error, kafka endpoint """
+    conn = connection()
+    persistent_topic_multiple_endpoints(conn, 'kafka')
+
 
 def persistent_notification(endpoint_type, conn, account=None):
     """ test pushing persistent notification """
@@ -4668,18 +4747,12 @@ def test_persistent_ps_s3_reload():
     http_server.close()
 
 
-@attr('data_path_v2_test')
-def test_persistent_ps_s3_data_path_v2_migration():
+def persistent_data_path_v2_migration(conn, endpoint_type):
     """ test data path v2 persistent migration """
     if get_config_cluster() == 'noname':
         return SkipTest('realm is needed for migration test')
-    conn = connection()
     zonegroup = get_config_zonegroup()
 
-    # create random port for the http server
-    host = get_ip()
-    http_port = random.randint(10000, 20000)
-
     # disable v2 notification
     zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2)
 
@@ -4688,10 +4761,35 @@ def test_persistent_ps_s3_data_path_v2_migration():
     bucket = conn.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
 
-    # create s3 topic
-    endpoint_address = 'http://'+host+':'+str(http_port)
-    endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \
-            '&retry_sleep_duration=1'
+    host = get_ip()
+    task = None
+    port = None
+    if endpoint_type == 'http':
+        # create random port for the http server
+        port = random.randint(10000, 20000)
+        # start an http server in a separate thread
+        receiver = HTTPServerWithEvents((host, port))
+        endpoint_address = 'http://'+host+':'+str(port)
+        endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \
+                        '&retry_sleep_duration=1'
+    elif endpoint_type == 'amqp':
+        # start amqp receiver
+        exchange = 'ex1'
+        task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+        task.start()
+        endpoint_address = 'amqp://' + host
+        endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange='+exchange+'&amqp-ack-level=broker&persistent=true'+ \
+                        '&retry_sleep_duration=1'
+    elif endpoint_type == 'kafka':
+        # start kafka receiver
+        task, receiver = create_kafka_receiver_thread(topic_name)
+        task.start()
+        endpoint_address = 'kafka://' + host
+        endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
+                        '&retry_sleep_duration=1'
+    else:
+        return SkipTest('Unknown endpoint type: ' + endpoint_type)
+
     topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
     # create s3 notification
@@ -4758,14 +4856,11 @@ def test_persistent_ps_s3_data_path_v2_migration():
         # topic stats
         get_stats_persistent_topic(topic_name, 2 * number_of_objects)
 
-        # start an http server in a separate thread
-        http_server = HTTPServerWithEvents((host, http_port))
-
-        wait_for_queue_to_drain(topic_name, http_port=http_port)
+        wait_for_queue_to_drain(topic_name, http_port=port)
         # verify events
         keys = list(bucket.list())
         # exact match is false because the notifications are persistent.
-        http_server.verify_s3_events(keys, exact_match=False)
+        receiver.verify_s3_events(keys, exact_match=False)
 
     except Exception as e:
         assert False, str(e)
@@ -4782,8 +4877,21 @@ def test_persistent_ps_s3_data_path_v2_migration():
         [thr.join() for thr in client_threads]
         # delete the bucket
         conn.delete_bucket(bucket_name)
-        if http_server:
-            http_server.close()
+        receiver.close(task)
+
+
+@attr('data_path_v2_test')
+def test_persistent_data_path_v2_migration_http():
+    """ test data path v2 persistent migration, http endpoint """
+    conn = connection()
+    persistent_data_path_v2_migration(conn, 'http')
+
+
+@attr('data_path_v2_kafka_test')
+def test_persistent_data_path_v2_migration_kafka():
+    """ test data path v2 persistent migration, kafka endpoint """
+    conn = connection()
+    persistent_data_path_v2_migration(conn, 'kafka')
 
 
 @attr('data_path_v2_test')
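
Taken together, the recurring pattern this commit applies throughout test_bn.py: each formerly http-only persistent test becomes an endpoint_type-parametrized helper plus thin @attr-tagged wrappers, and every receiver is torn down through the shared receiver.close(task) call. A minimal sketch of that shape (helper and test names are illustrative; connection() is the module's existing helper, as seen in the diff):

    from nose.plugins.attrib import attr

    def _persistent_case(conn, endpoint_type):
        # start the matching receiver (http/amqp/kafka), build endpoint_args
        # for the topic, run the shared assertions, then receiver.close(task)
        ...

    @attr('http_test')
    def test_persistent_case_http():
        _persistent_case(connection(), 'http')

    @attr('kafka_test')
    def test_persistent_case_kafka():
        _persistent_case(connection(), 'kafka')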