git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/notifications: test refactoring
author     Yuval Lifshitz <ylifshit@ibm.com>
           Thu, 3 Jul 2025 16:57:39 +0000 (16:57 +0000)
committer  Yuval Lifshitz <ylifshit@ibm.com>
           Tue, 5 Aug 2025 16:26:38 +0000 (16:26 +0000)
* kafka: pass full broker list to consumer in tests
* kafka: use ip instead of localhost
* kafka: make sure topic exists before consumer start (see the sketch after this list)
* kafka: fix zookeeper and broker conf in tests
* kafka: verify receiver in the test
* kafka: tests were not running (Fixes: https://tracker.ceph.com/issues/72240)
* kafka: failover tests were failing (Fixes: https://tracker.ceph.com/issues/71585)
* simplify basic tests run command
* v2 migration tests were not running
* fix failing migration tests
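
For context, the pattern these Kafka changes rely on (create the topic via the admin
client before any consumer starts, and hand the consumer the full broker list by IP
rather than localhost) looks roughly like the following minimal sketch using
kafka-python. The broker addresses and topic name are illustrative placeholders, not
values taken from this change:

    import json
    from kafka import KafkaConsumer
    from kafka.admin import KafkaAdminClient, NewTopic
    from kafka.errors import TopicAlreadyExistsError

    brokers = '10.0.0.1:9091,10.0.0.1:9092'   # full broker list, addressed by IP
    topic_name = 'quickstart-events'

    # make sure the topic exists before the consumer subscribes to it
    admin = KafkaAdminClient(bootstrap_servers=brokers, request_timeout_ms=16000)
    try:
        admin.create_topics([NewTopic(name=topic_name, num_partitions=1, replication_factor=1)])
    except TopicAlreadyExistsError:
        pass  # an earlier run already created it

    # the consumer gets the same broker list, so it can keep working if one broker fails
    consumer = KafkaConsumer(topic_name,
                             bootstrap_servers=brokers,
                             auto_offset_reset='earliest',
                             consumer_timeout_ms=5000)
    for msg in consumer:
        print(json.loads(msg.value))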

Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
15 files changed:
qa/suites/rgw/notifications/tasks/basic/+ [new file with mode: 0644]
qa/suites/rgw/notifications/tasks/basic/.qa [new symlink]
qa/suites/rgw/notifications/tasks/basic/0-install.yaml [new file with mode: 0644]
qa/suites/rgw/notifications/tasks/basic/supported-distros [new symlink]
qa/suites/rgw/notifications/tasks/basic/test_basic.yaml [new file with mode: 0644]
qa/suites/rgw/notifications/tasks/kafka/test_kafka.yaml
qa/suites/rgw/notifications/tasks/others/+ [deleted file]
qa/suites/rgw/notifications/tasks/others/.qa [deleted symlink]
qa/suites/rgw/notifications/tasks/others/0-install.yaml [deleted file]
qa/suites/rgw/notifications/tasks/others/supported-distros [deleted symlink]
qa/suites/rgw/notifications/tasks/others/test_others.yaml [deleted file]
qa/tasks/kafka_failover.py
qa/tasks/notification_tests.py
src/test/rgw/bucket_notification/api.py
src/test/rgw/bucket_notification/test_bn.py

diff --git a/qa/suites/rgw/notifications/tasks/basic/+ b/qa/suites/rgw/notifications/tasks/basic/+
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/qa/suites/rgw/notifications/tasks/basic/.qa b/qa/suites/rgw/notifications/tasks/basic/.qa
new file mode 120000 (symlink)
index 0000000..a602a03
--- /dev/null
@@ -0,0 +1 @@
+../.qa/
\ No newline at end of file
diff --git a/qa/suites/rgw/notifications/tasks/basic/0-install.yaml b/qa/suites/rgw/notifications/tasks/basic/0-install.yaml
new file mode 100644 (file)
index 0000000..4d5d9b8
--- /dev/null
@@ -0,0 +1,14 @@
+tasks:
+- install:
+- ceph:
+- openssl_keys:
+- rgw:
+    client.0:
+    client.1:
+
+overrides:
+  ceph:
+    conf:
+      global:
+        osd_min_pg_log_entries: 10
+        osd_max_pg_log_entries: 10
diff --git a/qa/suites/rgw/notifications/tasks/basic/supported-distros b/qa/suites/rgw/notifications/tasks/basic/supported-distros
new file mode 120000 (symlink)
index 0000000..46280a4
--- /dev/null
@@ -0,0 +1 @@
+../../.qa/distros/supported-random-distro$/
\ No newline at end of file
diff --git a/qa/suites/rgw/notifications/tasks/basic/test_basic.yaml b/qa/suites/rgw/notifications/tasks/basic/test_basic.yaml
new file mode 100644 (file)
index 0000000..793f6f4
--- /dev/null
@@ -0,0 +1,4 @@
+tasks:
+- notification-tests:
+    client.0:
+      rgw_server: client.0
index 303f98d540ea4a5ac9b44791f23561d823f9008b..4407cd3eaccc3858de213c749cd3b7579c644935 100644 (file)
@@ -4,5 +4,5 @@ tasks:
       kafka_version: 3.8.1
 - notification-tests:
     client.0:
-      extra_attr: ["kafka_test", "data_path_v2_kafka_test"]
+      extra_attr: ["kafka_test"]
       rgw_server: client.0
diff --git a/qa/suites/rgw/notifications/tasks/others/+ b/qa/suites/rgw/notifications/tasks/others/+
deleted file mode 100644 (file)
index e69de29..0000000
diff --git a/qa/suites/rgw/notifications/tasks/others/.qa b/qa/suites/rgw/notifications/tasks/others/.qa
deleted file mode 120000 (symlink)
index a602a03..0000000
+++ /dev/null
@@ -1 +0,0 @@
-../.qa/
\ No newline at end of file
diff --git a/qa/suites/rgw/notifications/tasks/others/0-install.yaml b/qa/suites/rgw/notifications/tasks/others/0-install.yaml
deleted file mode 100644 (file)
index 4d5d9b8..0000000
+++ /dev/null
@@ -1,14 +0,0 @@
-tasks:
-- install:
-- ceph:
-- openssl_keys:
-- rgw:
-    client.0:
-    client.1:
-
-overrides:
-  ceph:
-    conf:
-      global:
-        osd_min_pg_log_entries: 10
-        osd_max_pg_log_entries: 10
diff --git a/qa/suites/rgw/notifications/tasks/others/supported-distros b/qa/suites/rgw/notifications/tasks/others/supported-distros
deleted file mode 120000 (symlink)
index 46280a4..0000000
+++ /dev/null
@@ -1 +0,0 @@
-../../.qa/distros/supported-random-distro$/
\ No newline at end of file
diff --git a/qa/suites/rgw/notifications/tasks/others/test_others.yaml b/qa/suites/rgw/notifications/tasks/others/test_others.yaml
deleted file mode 100644 (file)
index 793f6f4..0000000
+++ /dev/null
@@ -1,4 +0,0 @@
-tasks:
-- notification-tests:
-    client.0:
-      rgw_server: client.0
index 3ca60ab84fcfe7113a8b9e6686da7f2d11378d2c..d1cbd72d3568e90630c75c44e6f426b30006088f 100644 (file)
@@ -25,6 +25,71 @@ def get_kafka_dir(ctx, config):
     current_version = kafka_prefix + kafka_version
     return '{tdir}/{ver}'.format(tdir=teuthology.get_testdir(ctx),ver=current_version)
 
+def zookeeper_conf(ctx, client, _id, kafka_dir):
+    conf = """
+    # zookeeper{_id}.properties
+    dataDir={tdir}/data/zookeeper{_id}
+    clientPort=218{_id}
+    maxClientCnxns=0
+    admin.enableServer=false
+    tickTime=2000
+    initLimit=10
+    syncLimit=5
+    server.1=localhost:2888:3888
+    server.2=localhost:2889:3889
+    """.format(tdir=kafka_dir, _id=_id)
+    file_name = 'zookeeper{_id}.properties'.format(_id=_id)
+    log.info("zookeeper conf file: %s", file_name)
+    log.info(conf)
+    return ctx.cluster.only(client).run(
+            args=[
+                'cd', kafka_dir, run.Raw('&&'),
+                'mkdir', '-p', 'config', run.Raw('&&'),
+                'mkdir', '-p', 'data/zookeeper{_id}'.format(_id=_id), run.Raw('&&'),
+                'echo', conf, run.Raw('>'), 'config/{file_name}'.format(file_name=file_name), run.Raw('&&'),
+                'echo', str(_id), run.Raw('>'), 'data/zookeeper{_id}/myid'.format(_id=_id)
+                ],
+            )
+
+
+def broker_conf(ctx, client, _id, kafka_dir):
+    (remote,) = ctx.cluster.only(client).remotes.keys()
+    conf = """
+    # kafka{_id}.properties
+    broker.id={_id}
+    listeners=PLAINTEXT://0.0.0.0:909{_id}
+    advertised.listeners=PLAINTEXT://{ip}:909{_id}
+    log.dirs={tdir}/data/kafka-logs-{_id}
+    num.network.threads=3
+    num.io.threads=8
+    socket.send.buffer.bytes=102400
+    socket.receive.buffer.bytes=102400
+    socket.request.max.bytes=369295617
+    num.partitions=1
+    num.recovery.threads.per.data.dir=1
+    offsets.topic.replication.factor=2
+    transaction.state.log.replication.factor=2
+    transaction.state.log.min.isr=2
+    log.retention.hours=168
+    log.segment.bytes=1073741824
+    log.retention.check.interval.ms=300000
+    zookeeper.connect=localhost:2181,localhost:2182
+    zookeeper.connection.timeout.ms=18000
+    group.initial.rebalance.delay.ms=0
+    metadata.max.age.ms=3000
+    """.format(tdir=kafka_dir, _id=_id, ip=remote.ip_address)
+    file_name = 'kafka{_id}.properties'.format(_id=_id)
+    log.info("kafka conf file: %s", file_name)
+    log.info(conf)
+    return ctx.cluster.only(client).run(
+            args=[
+                'cd', kafka_dir, run.Raw('&&'),
+                'mkdir', '-p', 'config', run.Raw('&&'),
+                'mkdir', '-p', 'data', run.Raw('&&'),
+                'echo', conf, run.Raw('>'), 'config/{file_name}'.format(file_name=file_name)
+                ],
+            )
+
 
 @contextlib.contextmanager
 def install_kafka(ctx, config):
@@ -59,45 +124,21 @@ def install_kafka(ctx, config):
         )
 
         kafka_dir = get_kafka_dir(ctx, config)
-        # create config for second broker
-        second_broker_config_name = "server2.properties"
-        second_broker_data = "{tdir}/data/broker02".format(tdir=kafka_dir)
-        second_broker_data_logs_escaped = "{}/logs".format(second_broker_data).replace("/", "\/")
-
-        ctx.cluster.only(client).run(
-            args=['cd', '{tdir}'.format(tdir=kafka_dir), run.Raw('&&'), 
-             'cp', '{tdir}/config/server.properties'.format(tdir=kafka_dir), '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'), 
-             'mkdir', '-p', '{tdir}/data'.format(tdir=kafka_dir)
-            ],
-        )
-
-        # edit config
-        ctx.cluster.only(client).run(
-            args=['sed', '-i', 's/broker.id=0/broker.id=1/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
-                  'sed', '-i', 's/#listeners=PLAINTEXT:\/\/:9092/listeners=PLAINTEXT:\/\/localhost:19092/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
-                  'sed', '-i', 's/#advertised.listeners=PLAINTEXT:\/\/your.host.name:9092/advertised.listeners=PLAINTEXT:\/\/localhost:19092/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
-                  'sed', '-i', 's/log.dirs=\/tmp\/kafka-logs/log.dirs={}/g'.format(second_broker_data_logs_escaped), '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
-                  'cat', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name)
-            ]
-        )
+        # create config for 2 zookeepers
+        zookeeper_conf(ctx, client, 1, kafka_dir)
+        zookeeper_conf(ctx, client, 2, kafka_dir)
+        # create config for 2 brokers
+        broker_conf(ctx, client, 1, kafka_dir)
+        broker_conf(ctx, client, 2, kafka_dir)
 
     try:
         yield
     finally:
         log.info('Removing packaged dependencies of Kafka...')
-        test_dir=get_kafka_dir(ctx, config)
-        current_version = get_kafka_version(config)
+        kafka_dir=get_kafka_dir(ctx, config)
         for (client,_) in config.items():
             ctx.cluster.only(client).run(
-                args=['rm', '-rf', '{tdir}/logs'.format(tdir=test_dir)],
-            )
-
-            ctx.cluster.only(client).run(
-                args=['rm', '-rf', test_dir],
-            )
-
-            ctx.cluster.only(client).run(
-                args=['rm', '-rf', '{tdir}/{doc}'.format(tdir=teuthology.get_testdir(ctx),doc=kafka_file)],
+                args=['rm', '-rf', '{tdir}'.format(tdir=kafka_dir)],
             )
 
 
@@ -114,32 +155,48 @@ def run_kafka(ctx,config):
         (remote,) = ctx.cluster.only(client).remotes.keys()
         kafka_dir = get_kafka_dir(ctx, config)
 
-        second_broker_data = "{tdir}/data/broker02".format(tdir=kafka_dir)
-        second_broker_java_log_dir = "{}/java_logs".format(second_broker_data)
-
         ctx.cluster.only(client).run(
             args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
-             './zookeeper-server-start.sh',
-             '{tir}/config/zookeeper.properties'.format(tir=kafka_dir),
-             run.Raw('&'), 'exit'
+             './zookeeper-server-start.sh', '-daemon',
+             '{tdir}/config/zookeeper1.properties'.format(tdir=kafka_dir)
+            ],
+        )
+        ctx.cluster.only(client).run(
+            args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
+             './zookeeper-server-start.sh', '-daemon',
+             '{tdir}/config/zookeeper2.properties'.format(tdir=kafka_dir)
             ],
         )
+        # wait for zookeepers to start
+        time.sleep(5)
+        for zk_id in [1, 2]:
+            ctx.cluster.only(client).run(
+                args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
+                 './zookeeper-shell.sh', 'localhost:218{_id}'.format(_id=zk_id), 'ls', '/'],
+            )
+            zk_started = False
+            while not zk_started:
+                result = ctx.cluster.only(client).run(
+                        args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
+                        './zookeeper-shell.sh', 'localhost:218{_id}'.format(_id=zk_id), 'ls', '/'],
+                        )
+                log.info("Checking if Zookeeper %d is started. Result: %s", zk_id, str(result))
+                zk_started = True
 
         ctx.cluster.only(client).run(
             args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
-             './kafka-server-start.sh',
-             '{tir}/config/server.properties'.format(tir=get_kafka_dir(ctx, config)),
-             run.Raw('&'), 'exit'
+             './kafka-server-start.sh', '-daemon',
+             '{tdir}/config/kafka1.properties'.format(tdir=get_kafka_dir(ctx, config))
             ],
         )
-        
         ctx.cluster.only(client).run(
             args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
-             run.Raw('LOG_DIR={second_broker_java_log_dir}'.format(second_broker_java_log_dir=second_broker_java_log_dir)), 
-             './kafka-server-start.sh', '{tdir}/config/server2.properties'.format(tdir=kafka_dir),
-             run.Raw('&'), 'exit'
+             './kafka-server-start.sh', '-daemon',
+             '{tdir}/config/kafka2.properties'.format(tdir=get_kafka_dir(ctx, config))
             ],
         )
+        # wait for kafka to start
+        time.sleep(5)
 
     try:
         yield
@@ -151,27 +208,41 @@ def run_kafka(ctx,config):
 
             ctx.cluster.only(client).run(
                 args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
-                 './kafka-server-stop.sh',  
-                 '{tir}/config/kafka.properties'.format(tir=get_kafka_dir(ctx, config)),
+                 './kafka-server-stop.sh',
+                 '{tdir}/config/kafka1.properties'.format(tdir=get_kafka_dir(ctx, config)),
                 ],
             )
 
+            ctx.cluster.only(client).run(
+                args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+                 './kafka-server-stop.sh',
+                 '{tdir}/config/kafka2.properties'.format(tdir=get_kafka_dir(ctx, config)),
+                ],
+            )
+
+            # wait for kafka to stop
             time.sleep(5)
 
             ctx.cluster.only(client).run(
-                args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), 
+                args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+                 './zookeeper-server-stop.sh',
+                 '{tir}/config/zookeeper1.properties'.format(tir=get_kafka_dir(ctx, config)),
+                ],
+            )
+            ctx.cluster.only(client).run(
+                args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
                  './zookeeper-server-stop.sh',
-                 '{tir}/config/zookeeper.properties'.format(tir=get_kafka_dir(ctx, config)),
+                 '{tir}/config/zookeeper2.properties'.format(tir=get_kafka_dir(ctx, config)),
                 ],
             )
 
+            # wait for zookeeper to stop
             time.sleep(5)
-
             ctx.cluster.only(client).run(args=['killall', '-9', 'java'])
 
 
 @contextlib.contextmanager
-def run_admin_cmds(ctx,config):
+def run_admin_cmds(ctx, config):
     """
     Running Kafka Admin commands in order to check the working of producer and consumer and creation of topic.
     """
@@ -182,9 +253,9 @@ def run_admin_cmds(ctx,config):
 
         ctx.cluster.only(client).run(
             args=[
-                'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), 
+                'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
                 './kafka-topics.sh', '--create', '--topic', 'quickstart-events',
-                '--bootstrap-server', 'localhost:9092'
+                '--bootstrap-server', 'localhost:9091,localhost:9092',
             ],
         )
 
@@ -193,7 +264,7 @@ def run_admin_cmds(ctx,config):
                 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
                 'echo', "First", run.Raw('|'),
                 './kafka-console-producer.sh', '--topic', 'quickstart-events',
-                '--bootstrap-server', 'localhost:9092'
+                '--bootstrap-server', 'localhost:9091,localhost:9092',
             ],
         )
 
@@ -202,8 +273,7 @@ def run_admin_cmds(ctx,config):
                 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
                 './kafka-console-consumer.sh', '--topic', 'quickstart-events',
                 '--from-beginning',
-                '--bootstrap-server', 'localhost:9092',
-                run.Raw('&'), 'exit'
+                '--bootstrap-server', 'localhost:9091,localhost:9092', '--max-messages', '1',
             ],
         )
 
index f1eae3c89c4e2fdf20540d82278992fadb0f8f8d..cc84b1575a5f1eb1b5ca372a3c79e9b5c1d21da2 100644 (file)
@@ -220,7 +220,7 @@ def run_tests(ctx, config):
     for client, client_config in config.items():
         (remote,) = ctx.cluster.only(client).remotes.keys()
 
-        attr = ["!kafka_test", "!data_path_v2_kafka_test", "!kafka_failover", "!amqp_test", "!amqp_ssl_test", "!kafka_security_test", "!modification_required", "!manual_test", "!http_test"]
+        attr = ["basic_test"]
 
         if 'extra_attr' in client_config:
             attr = client_config.get('extra_attr')
@@ -291,6 +291,7 @@ def task(ctx,config):
         endpoint = ctx.rgw.role_endpoints.get(client)
         assert endpoint, 'bntests: no rgw endpoint for {}'.format(client)
 
+        cluster_name, _, _ = teuthology.split_role(client)
         bntests_conf[client] = ConfigObj(
             indent_type='',
             infile={
@@ -299,7 +300,7 @@ def task(ctx,config):
                     'port':endpoint.port,
                     'host':endpoint.dns_name,
                     'zonegroup':ctx.rgw.zonegroup,
-                    'cluster':'noname',
+                    'cluster':cluster_name,
                     'version':'v2'
                     },
                 's3 main':{}
index 63b1ec699779e593555c37f4d19981f0aecfa914..1c2c4c54b0422fc31082f26fa4e559ffcb9ba3fc 100644 (file)
@@ -247,13 +247,19 @@ def delete_all_topics(conn, tenant, cluster):
     if tenant == '':
         topics_result = admin(['topic', 'list'], cluster)
         topics_json = json.loads(topics_result[0])
-        for topic in topics_json:
-            rm_result = admin(['topic', 'rm', '--topic', topic['name']], cluster)
-            print(rm_result)
+        try:
+            for topic in topics_json['topics']:
+                admin(['topic', 'rm', '--topic', topic['name']], cluster)
+        except TypeError:
+            for topic in topics_json:
+                admin(['topic', 'rm', '--topic', topic['name']], cluster)
     else:
         topics_result = admin(['topic', 'list', '--tenant', tenant], cluster)
         topics_json = json.loads(topics_result[0])
-        for topic in topics_json:
-            rm_result = admin(['topic', 'rm', '--tenant', tenant, '--topic', topic['name']], cluster)
-            print(rm_result)
+        try:
+            for topic in topics_json['topics']:
+                admin(['topic', 'rm', '--tenant', tenant, '--topic', topic['name']], cluster)
+        except TypeError:
+            for topic in topics_json:
+                admin(['topic', 'rm', '--tenant', tenant, '--topic', topic['name']], cluster)
 
index 229dc316481dfbc09fcc1deb6f82b281ef3aa18b..e27517ebd2f2cc7430be871af211f6930e548747 100644 (file)
@@ -82,6 +82,18 @@ UID_PREFIX = "superman"
 num_buckets = 0
 run_prefix=''.join(random.choice(string.ascii_lowercase) for _ in range(6))
 
+
+def get_ip():
+    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    try:
+        # address should not be reachable
+        s.connect(('10.255.255.255', 1))
+        ip = s.getsockname()[0]
+    finally:
+        s.close()
+    return ip
+
+
 def gen_bucket_name():
     global num_buckets
 
@@ -406,45 +418,76 @@ META_PREFIX = 'x-amz-meta-'
 
 # Kafka endpoint functions
 
-kafka_server = 'localhost'
+default_kafka_server = get_ip()
 
 class KafkaReceiver(object):
     """class for receiving and storing messages on a topic from the kafka broker"""
-    def __init__(self, topic, security_type, kafka_server='localhost'):
+    def __init__(self, topic_name, security_type, kafka_server):
         from kafka import KafkaConsumer
-        remaining_retries = 10
+        from kafka.admin import KafkaAdminClient, NewTopic
+        from kafka.errors import TopicAlreadyExistsError
+        self.status = 'init'
         port = 9092
         if security_type != 'PLAINTEXT':
             security_type = 'SSL'
             port = 9093
 
         if kafka_server is None:
-            endpoint = "localhost" + ":" + str(port)
-        elif ":" not in kafka_server:
+            endpoint = default_kafka_server + ":" + str(port)
+        elif ":" not in kafka_server and len(kafka_server.split(",")) == 1:
             endpoint = kafka_server + ":" + str(port)
         else:
             endpoint = kafka_server
 
+        remaining_retries = 10
         while remaining_retries > 0:
             try:
-                self.consumer = KafkaConsumer(topic,
+                admin_client = KafkaAdminClient(
+                        bootstrap_servers=endpoint,
+                        request_timeout_ms=16000)
+                topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=1)
+                admin_client.create_topics([topic])
+                log.info('Kafka admin created topic: %s on broker/s: %s', topic_name, endpoint)
+                break
+            except Exception as error:
+                if type(error) == TopicAlreadyExistsError:
+                    log.info('Kafka admin topic %s already exists on broker/s: %s', topic_name, endpoint)
+                    break
+                remaining_retries -= 1
+                log.warning('Kafka admin failed to create topic: %s on broker/s: %s. remaining retries: %d. error: %s',
+                            topic_name, endpoint, remaining_retries, str(error))
+                time.sleep(1)
+
+        if remaining_retries == 0:
+            raise Exception('Kafka admin failed to create topic: %s. no retries left' % topic_name)
+
+        remaining_retries = 10
+        while remaining_retries > 0:
+            try:
+                self.consumer = KafkaConsumer(topic_name,
                         bootstrap_servers=endpoint,
                         security_protocol=security_type,
-                        consumer_timeout_ms=16000,
+                        metadata_max_age_ms=5000,
+                        consumer_timeout_ms=5000,
                         auto_offset_reset='earliest')
-                print('Kafka consumer created on topic: '+topic)
+                log.info('Kafka consumer connected to broker/s: %s for topic: %s', endpoint, topic_name)
+                # This forces the consumer to fetch metadata immediately
+                partitions = self.consumer.partitions_for_topic(topic_name)
+                log.info('Kafka consumer partitions for topic: %s are: %s', topic_name, str(partitions))
+                self.consumer.poll(timeout_ms=1000, max_records=1)
                 break
             except Exception as error:
                 remaining_retries -= 1
-                print('failed to connect to kafka (remaining retries '
-                    + str(remaining_retries) + '): ' + str(error))
+                log.warning('Kafka consumer failed to connect to broker/s: %s for topic: %s. remaining retries: %d. error: %s',
+                            endpoint, topic_name, remaining_retries, str(error))
                 time.sleep(1)
 
         if remaining_retries == 0:
-            raise Exception('failed to connect to kafka - no retries left')
+            raise Exception('Kafka consumer failed to connect to kafka for topic: %s. no retries left' % topic_name)
 
+        self.status = 'connected'
         self.events = []
-        self.topic = topic
+        self.topic = topic_name
         self.stop = False
 
     def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}, etags=[]):
@@ -463,17 +506,16 @@ class KafkaReceiver(object):
 def kafka_receiver_thread_runner(receiver):
     """main thread function for the kafka receiver"""
     try:
-        log.info('Kafka receiver started')
-        print('Kafka receiver started')
+        log.info('Kafka receiver for topic: %s started', receiver.topic)
+        receiver.status = 'running'
         while not receiver.stop:
             for msg in receiver.consumer:
                 receiver.events.append(json.loads(msg.value))
             time.sleep(0.1)
-        log.info('Kafka receiver ended')
-        print('Kafka receiver ended')
+        log.info('Kafka receiver for topic: %s ended', receiver.topic)
     except Exception as error:
-        log.info('Kafka receiver ended unexpectedly: %s', str(error))
-        print('Kafka receiver ended unexpectedly: ' + str(error))
+        log.info('Kafka receiver for topic: %s ended unexpectedly. error: %s', receiver.topic, str(error))
+    receiver.status = 'ended'
 
 
 def create_kafka_receiver_thread(topic, security_type='PLAINTEXT', kafka_brokers=None):
@@ -486,23 +528,34 @@ def create_kafka_receiver_thread(topic, security_type='PLAINTEXT', kafka_brokers
 def stop_kafka_receiver(receiver, task):
     """stop the receiver thread and wait for it to finish"""
     receiver.stop = True
-    task.join(1)
+    task.join(5)
     try:
         receiver.consumer.unsubscribe()
         receiver.consumer.close()
+        log.info('Kafka receiver on topic: %s gracefully stopped', receiver.topic)
     except Exception as error:
-        log.info('failed to gracefully stop Kafka receiver: %s', str(error))
-
-
-def get_ip():
-    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-    try:
-        # address should not be reachable
-        s.connect(('10.255.255.255', 1))
-        ip = s.getsockname()[0]
-    finally:
-        s.close()
-    return ip
+        log.info('Kafka receiver on topic: %s failed to gracefully stop. error: %s', receiver.topic, str(error))
+
+def verify_kafka_receiver(receiver):
+    """test the kafka receiver"""
+    from kafka import KafkaProducer
+    producer = KafkaProducer(bootstrap_servers=receiver.consumer.config['bootstrap_servers'],
+                             security_protocol=receiver.consumer.config['security_protocol'])
+    producer.send(receiver.topic, value=json.dumps({'test': 'message'}).encode('utf-8'))
+    producer.flush()
+    events = []
+    remaining_retries = 10
+    while len(events) == 0:
+        log.info('Kafka receiver (in "%s" state) waiting for test event (at: %s). remaining retries: %d',
+                 receiver.status, datetime.datetime.now(), remaining_retries)
+        time.sleep(1)
+        events = receiver.get_and_reset_events()
+        remaining_retries -= 1
+        if remaining_retries == 0:
+            raise Exception('kafka receiver on topic: %s did not receive test event in time' % receiver.topic)
+    assert_equal(len(events), 1)
+    assert_in('test', events[0])
+    log.info('Kafka receiver on topic: %s tested ok', receiver.topic)
 
 
 def connection(no_retries=False):
@@ -546,8 +599,8 @@ def another_user(user=None, tenant=None, account=None):
         cmd += ['--account-id', account, '--account-root']
         arn = f'arn:aws:iam::{account}:user/Superman'
 
-    _, result = admin(cmd, get_config_cluster())
-    assert_equal(result, 0)
+    _, rc = admin(cmd, get_config_cluster())
+    assert_equal(rc, 0)
 
     conn = S3Connection(aws_access_key_id=access_key,
                   aws_secret_access_key=secret_key,
@@ -561,9 +614,15 @@ def list_topics(assert_len=None, tenant=''):
         result = admin(['topic', 'list'], get_config_cluster())
     else:
         result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster())
+    assert_equal(result[1], 0)
     parsed_result = json.loads(result[0])
-    if assert_len:
-        assert_equal(len(parsed_result), assert_len)
+    try:
+        actual_len = len(parsed_result['topics'])
+    except TypeError:
+        actual_len = len(parsed_result)
+    if assert_len and assert_len != actual_len:
+        log.error(parsed_result)
+        assert False, 'expected %d topics, got %d' % (assert_len, actual_len)
     return parsed_result
 
 
@@ -580,7 +639,7 @@ def get_stats_persistent_topic(topic_name, assert_entries_number=None):
             log.warning('Topic dump:')
             for entry in parsed_result:
                 log.warning(entry)
-            assert_equal(actual_number, assert_entries_number)
+            assert False, 'expected %d entries, got %d' % (assert_entries_number, actual_number)
     return parsed_result
 
 
@@ -611,13 +670,16 @@ def list_notifications(bucket_name, assert_len=None, tenant=''):
         result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster())
     else:
         result = admin(['notification', 'list', '--bucket', bucket_name, '--tenant', tenant], get_config_cluster())
+    assert_equal(result[1], 0)
     parsed_result = json.loads(result[0])
-    if assert_len:
-        assert_equal(len(parsed_result['notifications']), assert_len)
+    actual_len = len(parsed_result['notifications'])
+    if assert_len and assert_len != actual_len:
+        log.error(parsed_result)
+        assert False, 'expected %d notifications, got %d' % (assert_len, actual_len)
     return parsed_result
 
 
-def get_notification(bucket_name,  notification_name, tenant=''):
+def get_notification(bucket_name, notification_name, tenant=''):
     if tenant == '':
         result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name], get_config_cluster())
     else:
@@ -661,10 +723,10 @@ def connect_random_user(tenant=''):
     secret_key = str(time.time())
     uid = UID_PREFIX + str(time.time())
     if tenant == '':
-        _, result = admin(['user', 'create', '--uid', uid, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
+        _, rc = admin(['user', 'create', '--uid', uid, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
     else:
-        _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
-    assert_equal(result, 0)
+        _, rc = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
+    assert_equal(rc, 0)
     conn = S3Connection(aws_access_key_id=access_key,
                         aws_secret_access_key=secret_key,
                         is_secure=False, port=get_config_port(), host=get_config_host(),
@@ -684,6 +746,7 @@ def test_ps_s3_topic_on_master():
     conn = connect_random_user(tenant)
 
     # make sure there are no leftover topics
+    delete_all_topics(conn, '', get_config_cluster())
     delete_all_topics(conn, tenant, get_config_cluster())
 
     zonegroup = get_config_zonegroup()
@@ -747,6 +810,7 @@ def test_ps_s3_topic_admin_on_master():
     conn = connect_random_user(tenant)
 
     # make sure there are no leftover topics
+    delete_all_topics(conn, '', get_config_cluster())
     delete_all_topics(conn, tenant, get_config_cluster())
 
     zonegroup = get_config_zonegroup()
@@ -792,6 +856,7 @@ def test_ps_s3_topic_admin_on_master():
     result = admin(
       ['topic', 'get', '--topic', topic_name + '_3', '--tenant', tenant],
       get_config_cluster())
+    assert_equal(result[1], 0)
     parsed_result = json.loads(result[0])
     assert_equal(parsed_result['arn'], topic_arn3)
     assert_true(all([x in parsed_result['owner'] for x in matches]))
@@ -1374,10 +1439,14 @@ def notification_push(endpoint_type, conn, account=None, cloudevents=False, kafk
         response, status = s3_notification_conf.set_config()
         assert_equal(status/100, 2)
     elif endpoint_type == 'kafka':
-        # start amqp receiver
+        # start kafka receiver
+        default_kafka_server_and_port = default_kafka_server + ':9092'
+        if kafka_brokers is not None:
+            kafka_brokers = kafka_brokers + ',' + default_kafka_server_and_port
         task, receiver = create_kafka_receiver_thread(topic_name, kafka_brokers=kafka_brokers)
         task.start()
-        endpoint_address = 'kafka://' + kafka_server
+        verify_kafka_receiver(receiver)
+        endpoint_address = 'kafka://' + default_kafka_server_and_port
         # without acks from broker
         endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
         if kafka_brokers is not None:
@@ -1603,14 +1672,14 @@ def test_notification_push_kafka():
 def test_notification_push_kafka_multiple_brokers_override():
     """ test pushing kafka s3 notification on master """
     conn = connection()
-    notification_push('kafka', conn, kafka_brokers='localhost:9092,localhost:19092')
+    notification_push('kafka', conn, kafka_brokers='{host}:9091,{host}:9092'.format(host=default_kafka_server))
 
 
 @attr('kafka_failover')
 def test_notification_push_kafka_multiple_brokers_append():
     """ test pushing kafka s3 notification on master """
     conn = connection()
-    notification_push('kafka', conn, kafka_brokers='localhost:19092')
+    notification_push('kafka', conn, kafka_brokers='{host}:9091'.format(host=default_kafka_server))
 
 
 @attr('http_test')
@@ -1796,6 +1865,7 @@ def lifecycle(endpoint_type, conn, number_of_objects, topic_events, create_threa
         # start kafka receiver
         task, receiver = create_kafka_receiver_thread(topic_name)
         task.start()
+        verify_kafka_receiver(receiver)
         endpoint_address = 'kafka://' + host
         endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'
     else:
@@ -2286,6 +2356,7 @@ def multipart_endpoint_agnostic(endpoint_type, conn):
         # start amqp receiver
         task, receiver = create_kafka_receiver_thread(topic_name)
         task.start()
+        verify_kafka_receiver(receiver)
         endpoint_address = 'kafka://' + host
         endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
     else:
@@ -2380,6 +2451,7 @@ def metadata_filter(endpoint_type, conn):
         # start kafka receiver
         task, receiver = create_kafka_receiver_thread(topic_name)
         task.start()
+        verify_kafka_receiver(receiver)
         endpoint_address = 'kafka://' + host
         endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'
     else:
@@ -3044,6 +3116,7 @@ def persistent_topic_stats(conn, endpoint_type):
         # start kafka receiver
         task, receiver = create_kafka_receiver_thread(topic_name)
         task.start()
+        verify_kafka_receiver(receiver)
         endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
                         '&retry_sleep_duration=1'
     else:
@@ -3159,7 +3232,7 @@ def test_persistent_topic_dump():
     host = get_ip()
     task, receiver = create_kafka_receiver_thread(topic_name)
     task.start()
-
+    verify_kafka_receiver(receiver)
 
     # create s3 topic
     endpoint_address = 'kafka://WrongHost' # wrong host
@@ -3211,7 +3284,6 @@ def test_persistent_topic_dump():
     # topic stats
     result = admin(['topic', 'dump', '--topic', topic_name], get_config_cluster())
     assert_equal(result[1], 0)
-    print(result[0])
     parsed_result = json.loads(result[0])
     assert_equal(len(parsed_result), 2*number_of_objects)
 
@@ -3452,9 +3524,10 @@ def test_ps_s3_notification_kafka_idle_behaviour():
 
     task, receiver = create_kafka_receiver_thread(topic_name+'_1')
     task.start()
+    verify_kafka_receiver(receiver)
 
     # create s3 topic
-    endpoint_address = 'kafka://' + kafka_server
+    endpoint_address = 'kafka://' + default_kafka_server
     # with acks from broker
     endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
     topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
@@ -3799,6 +3872,7 @@ def persistent_topic_multiple_endpoints(conn, endpoint_type):
         # start kafka receiver
         task, receiver = create_kafka_receiver_thread(topic_name_1)
         task.start()
+        verify_kafka_receiver(receiver)
         endpoint_address = 'kafka://' + host
         endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
                         '&retry_sleep_duration=1'
@@ -3911,6 +3985,7 @@ def persistent_notification(endpoint_type, conn, account=None):
         # start kafka receiver
         task, receiver = create_kafka_receiver_thread(topic_name)
         task.start()
+        verify_kafka_receiver(receiver)
         endpoint_address = 'kafka://' + host
         endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'+'&persistent=true'
     else:
@@ -3988,8 +4063,8 @@ def test_ps_s3_persistent_notification_http_account():
     account = 'RGW77777777777777777'
     user = UID_PREFIX + 'test'
 
-    _, result = admin(['account', 'create', '--account-id', account, '--account-name', 'testacct'], get_config_cluster())
-    assert_true(result in [0, 17]) # EEXIST okay if we rerun
+    _, rc = admin(['account', 'create', '--account-id', account, '--account-name', 'testacct'], get_config_cluster())
+    assert_true(rc in [0, 17]) # EEXIST okay if we rerun
 
     conn, _ = another_user(user=user, account=account)
     try:
@@ -4405,7 +4480,7 @@ def test_ps_s3_multiple_topics_notification():
     http_server.close()
 
 
-@attr('data_path_v2_test')
+@attr('basic_test')
 def test_ps_s3_list_topics_migration():
     """ test list topics on migration"""
     if get_config_cluster() == 'noname':
@@ -4571,7 +4646,7 @@ def test_ps_s3_list_topics():
         tenant_topic_conf.del_config(tenant_topic_arn1)
         tenant_topic_conf.del_config(tenant_topic_arn2)
 
-@attr('data_path_v2_test')
+@attr('basic_test')
 def test_ps_s3_list_topics_v1():
     """ test list topics on v1"""
     if get_config_cluster() == 'noname':
@@ -4851,13 +4926,13 @@ def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=F
     # create s3 topic
     if security_type == 'SASL_SSL':
         if not use_topic_attrs_for_creds:
-            endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9094'
+            endpoint_address = 'kafka://alice:alice-secret@' + default_kafka_server + ':9094'
         else:
-            endpoint_address = 'kafka://' + kafka_server + ':9094'
+            endpoint_address = 'kafka://' + default_kafka_server + ':9094'
     elif security_type == 'SSL':
-        endpoint_address = 'kafka://' + kafka_server + ':9093'
+        endpoint_address = 'kafka://' + default_kafka_server + ':9093'
     elif security_type == 'SASL_PLAINTEXT':
-        endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9095'
+        endpoint_address = 'kafka://alice:alice-secret@' + default_kafka_server + ':9095'
     else:
         assert False, 'unknown security method '+security_type
 
@@ -4877,6 +4952,7 @@ def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=F
     # create consumer on the topic
     task, receiver = create_kafka_receiver_thread(topic_name)
     task.start()
+    verify_kafka_receiver(receiver)
 
     topic_arn = topic_conf.set_config()
     # create s3 notification
@@ -5064,6 +5140,24 @@ def test_persistent_ps_s3_reload():
     http_server.close()
 
 
+def poll_on_topic(topic_name, tenant=''):
+    remaining_retries = 10
+    start_time = time.time()
+    while True:
+        result = remove_topic(topic_name, tenant, allow_failure=True)
+        time_diff = time.time() - start_time
+        if result == 0:
+            log.info('migration took %d seconds', time_diff)
+            return
+        elif result == 154:
+            if remaining_retries == 0:
+                assert False, 'migration did not end after %d seconds' % time_diff
+            remaining_retries -= 1
+            log.info('migration in progress. remaining retries: %d', remaining_retries)
+            time.sleep(2)
+        else:
+            assert False, 'unexpected error (%d) trying to remove topic when waiting for migration to end' % result
+
 def persistent_data_path_v2_migration(conn, endpoint_type):
     """ test data path v2 persistent migration """
     if get_config_cluster() == 'noname':
@@ -5101,6 +5195,7 @@ def persistent_data_path_v2_migration(conn, endpoint_type):
         # start kafka receiver
         task, receiver = create_kafka_receiver_thread(topic_name)
         task.start()
+        verify_kafka_receiver(receiver)
         endpoint_address = 'kafka://' + host
         endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
                         '&retry_sleep_duration=1'
@@ -5149,10 +5244,7 @@ def persistent_data_path_v2_migration(conn, endpoint_type):
         zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2)
 
         # poll on topic_1
-        result = 1
-        while result != 0:
-            time.sleep(1)
-            result = remove_topic(topic_name_1, allow_failure=True)
+        poll_on_topic(topic_name_1)
 
         # topic stats
         get_stats_persistent_topic(topic_name, number_of_objects)
@@ -5197,21 +5289,21 @@ def persistent_data_path_v2_migration(conn, endpoint_type):
         receiver.close(task)
 
 
-@attr('data_path_v2_test')
+@attr('http_test')
 def persistent_data_path_v2_migration_http():
     """ test data path v2 persistent migration, http endpoint """
     conn = connection()
     persistent_data_path_v2_migration(conn, 'http')
 
 
-@attr('data_path_v2_kafka_test')
+@attr('kafka_test')
 def persistent_data_path_v2_migration_kafka():
     """ test data path v2 persistent migration, kafka endpoint """
     conn = connection()
     persistent_data_path_v2_migration(conn, 'kafka')
 
 
-@attr('data_path_v2_test')
+@attr('http_test')
 def test_ps_s3_data_path_v2_migration():
     """ test data path v2 migration """
     if get_config_cluster() == 'noname':
@@ -5263,60 +5355,53 @@ def test_ps_s3_data_path_v2_migration():
     time_diff = time.time() - start_time
     print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
 
-    try:
-        # verify events
-        keys = list(bucket.list())
-        http_server.verify_s3_events(keys, exact_match=True)
+    # verify events
+    keys = list(bucket.list())
+    http_server.verify_s3_events(keys, exact_match=True)
 
-        # create topic to poll on
-        topic_name_1 = topic_name + '_1'
-        topic_conf_1 = PSTopicS3(conn, topic_name_1, zonegroup, endpoint_args=endpoint_args)
+    # create topic to poll on
+    topic_name_1 = topic_name + '_1'
+    topic_conf_1 = PSTopicS3(conn, topic_name_1, zonegroup, endpoint_args=endpoint_args)
 
-        # enable v2 notification
-        zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2)
+    # enable v2 notification
+    zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2)
 
-        # poll on topic_1
-        result = 1
-        while result != 0:
-            time.sleep(1)
-            result = remove_topic(topic_name_1, allow_failure=True)
+    # poll on topic_1
+    poll_on_topic(topic_name_1)
 
-        # create more objects in the bucket (async)
-        client_threads = []
-        start_time = time.time()
-        for i in range(number_of_objects):
-            key = bucket.new_key('key-'+str(i))
-            content = str(os.urandom(1024*1024))
-            thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
-            thr.start()
-            client_threads.append(thr)
-        [thr.join() for thr in client_threads]
-        time_diff = time.time() - start_time
-        print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+    # create more objects in the bucket (async)
+    client_threads = []
+    start_time = time.time()
+    for i in range(number_of_objects):
+        key = bucket.new_key('key-'+str(i))
+        content = str(os.urandom(1024*1024))
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+    time_diff = time.time() - start_time
+    print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
 
-        # verify events
-        keys = list(bucket.list())
-        http_server.verify_s3_events(keys, exact_match=True)
+    # verify events
+    keys = list(bucket.list())
+    http_server.verify_s3_events(keys, exact_match=True)
 
-    except Exception as e:
-        assert False, str(e)
-    finally:
-        # cleanup
-        s3_notification_conf.del_config()
-        topic_conf.del_config()
-        # delete objects from the bucket
-        client_threads = []
-        for key in bucket.list():
-            thr = threading.Thread(target = key.delete, args=())
-            thr.start()
-            client_threads.append(thr)
-        [thr.join() for thr in client_threads]
-        # delete the bucket
-        conn.delete_bucket(bucket_name)
-        http_server.close()
+    # cleanup
+    s3_notification_conf.del_config()
+    topic_conf.del_config()
+    # delete objects from the bucket
+    client_threads = []
+    for key in bucket.list():
+        thr = threading.Thread(target = key.delete, args=())
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+    # delete the bucket
+    conn.delete_bucket(bucket_name)
+    http_server.close()
 
 
-@attr('data_path_v2_test')
+@attr('basic_test')
 def test_ps_s3_data_path_v2_large_migration():
     """ test data path v2 large migration """
     if get_config_cluster() == 'noname':
@@ -5384,17 +5469,12 @@ def test_ps_s3_data_path_v2_large_migration():
     # enable v2 notification
     zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2)
 
-    # poll on topic_1
-    for tenant, topic_conf in zip(tenants_list, polling_topics_conf):
-        while True:
-            result = remove_topic(topic_conf.topic_name, tenant, allow_failure=True)
-
-            if result != 0:
-                print('migration in process... error: '+str(result))
-            else:
-                break
+    for tenant in tenants_list:
+        list_topics(1, tenant)
 
-            time.sleep(1)
+    # poll on topic
+    for tenant, topic_conf in zip(tenants_list, polling_topics_conf):
+        poll_on_topic(topic_conf.topic_name, tenant)
 
     # check if we migrated all the topics
     for tenant in tenants_list:
@@ -5402,7 +5482,7 @@ def test_ps_s3_data_path_v2_large_migration():
 
     # check if we migrated all the notifications
     for tenant, bucket in zip(tenants_list, buckets_list):
-        list_notifications(bucket.name, num_of_s3_notifications)
+        list_notifications(bucket.name, num_of_s3_notifications, tenant)
 
     # cleanup
     for s3_notification_conf in s3_notification_conf_list:
@@ -5414,7 +5494,7 @@ def test_ps_s3_data_path_v2_large_migration():
         conn.delete_bucket(bucket.name)
 
 
-@attr('data_path_v2_test')
+@attr('basic_test')
 def test_ps_s3_data_path_v2_mixed_migration():
     """ test data path v2 mixed migration """
     if get_config_cluster() == 'noname':
@@ -5509,17 +5589,9 @@ def test_ps_s3_data_path_v2_mixed_migration():
     # enable v2 notification
     zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2)
 
-    # poll on topic_1
+    # poll on topic
     for tenant, topic_conf in zip(tenants_list, polling_topics_conf):
-        while True:
-            result = remove_topic(topic_conf.topic_name, tenant, allow_failure=True)
-
-            if result != 0:
-                print(result)
-            else:
-                break
-
-            time.sleep(1)
+        poll_on_topic(topic_conf.topic_name, tenant)
 
     # check if we migrated all the topics
     for tenant in tenants_list:
@@ -5527,7 +5599,7 @@ def test_ps_s3_data_path_v2_mixed_migration():
 
     # check if we migrated all the notifications
     for tenant, bucket in zip(tenants_list, buckets_list):
-        list_notifications(bucket.name, 2)
+        list_notifications(bucket.name, 2, tenant)
 
     # cleanup
     for s3_notification_conf in s3_notification_conf_list:
@@ -5551,8 +5623,9 @@ def test_notification_caching():
     # start kafka receiver
     task, receiver = create_kafka_receiver_thread(topic_name)
     task.start()
-    incorrect_port = 8080
-    endpoint_address = 'kafka://' + kafka_server + ':' + str(incorrect_port)
+    verify_kafka_receiver(receiver)
+    incorrect_port = 9999
+    endpoint_address = 'kafka://' + default_kafka_server + ':' + str(incorrect_port)
     endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker' + '&persistent=true'
 
     # create s3 topic
@@ -5609,7 +5682,7 @@ def test_notification_caching():
     assert_equal(parsed_result['Topic Stats']['Entries'], 2 * number_of_objects)
 
     # remove the port and update the topic, so its pointing to correct endpoint.
-    endpoint_address = 'kafka://' + kafka_server
+    endpoint_address = 'kafka://' + default_kafka_server
     # update s3 topic
     topic_conf.set_attributes(attribute_name="push-endpoint",
                               attribute_val=endpoint_address)
@@ -5638,9 +5711,11 @@ def test_connection_caching():
     # start kafka receiver
     task_1, receiver_1 = create_kafka_receiver_thread(topic_name_1)
     task_1.start()
+    verify_kafka_receiver(receiver_1)
     task_2, receiver_2 = create_kafka_receiver_thread(topic_name_2)
     task_2.start()
-    endpoint_address = 'kafka://' + kafka_server
+    verify_kafka_receiver(receiver_2)
+    endpoint_address = 'kafka://' + default_kafka_server
     endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker&use-ssl=true' + '&persistent=true'
 
     # initially create both s3 topics with `use-ssl=true`
@@ -5702,7 +5777,7 @@ def test_connection_caching():
     assert_equal(parsed_result['Topic Stats']['Entries'], 2 * number_of_objects)
 
     # remove the ssl from topic1 and update the topic.
-    endpoint_address = 'kafka://' + kafka_server
+    endpoint_address = 'kafka://' + default_kafka_server
     topic_conf_1.set_attributes(attribute_name="use-ssl",
                                 attribute_val="false")
     keys = list(bucket.list())