]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
test/rgw/notifications: support running tests in multisite environment
authorYuval Lifshitz <ylifshit@redhat.com>
Thu, 21 Dec 2023 18:18:45 +0000 (18:18 +0000)
committerCasey Bodley <cbodley@redhat.com>
Wed, 10 Apr 2024 13:18:06 +0000 (09:18 -0400)
both locally and in teuthology

Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
(cherry picked from commit 7502215d70a423e642e5147a0d444f627a7979f2)

qa/tasks/notification_tests.py
src/test/rgw/bucket_notification/README.rst
src/test/rgw/bucket_notification/__init__.py
src/test/rgw/bucket_notification/api.py
src/test/rgw/bucket_notification/bntests.conf.SAMPLE
src/test/rgw/bucket_notification/test_bn.py

index f7b91e10b90371838e1bfb53c8cd74a2e1e92cb0..86e95dfb5f295bb54e69e95a3099a97dbadb4f2f 100644 (file)
@@ -298,6 +298,8 @@ def task(ctx,config):
                     {
                     'port':endpoint.port,
                     'host':endpoint.dns_name,
+                    'zonegroup':'default',
+                    'cluster':'noname'
                     },
                 's3 main':{}
             }
index 20eee8463cf1fd7b1bafedfc0e3d03a9644f7457..db34545e1d539552f985ec2262c64c43b00f5f10 100644 (file)
@@ -9,6 +9,20 @@ with the `vstart.sh` script.
 For the tests covering Kafka and RabbitMQ security, the RGW will need to accept use/password without TLS connection between the client and the RGW.
 So, the cluster will have to be started with the following ``rgw_allow_notification_secrets_in_cleartext`` parameter set to ``true``.
 
+The test suite can be run against a multisite setup, in the configuration file we will have to decide which RGW and which cluster will be used for the test.
+For example, if the ``test-rgw-multisite.sh`` script is used to setup multisite, and we want to run the test agianst the first RGW in the first cluster, 
+we would need the following configuration file::
+
+                               [DEFAULT]
+                               port = 8101
+                               host = localhost
+                               zonegroup = zg1
+                               cluster = c1
+
+                               [s3 main]
+                               access_key = 1234567890
+                               secret_key = pencil
+
 
 ===========
 Kafka Tests
index 6785fce9263447f7aa49040f461a855396a02cdf..5d8fac8e215c485516505e457384d64cc78c7fe6 100644 (file)
@@ -25,9 +25,15 @@ def setup():
     global default_port
     default_port = int(defaults.get("port"))
 
+    global default_zonegroup
+    default_zonegroup = defaults.get("zonegroup")
+
+    global default_cluster
+    default_cluster = defaults.get("cluster")
+
     global main_access_key
     main_access_key = cfg.get('s3 main',"access_key")
-    
+
     global main_secret_key
     main_secret_key = cfg.get('s3 main',"secret_key")
 
@@ -39,6 +45,14 @@ def get_config_port():
     global default_port
     return default_port
 
+def get_config_zonegroup():
+    global default_zonegroup
+    return default_zonegroup
+
+def get_config_cluster():
+    global default_cluster
+    return default_cluster
+
 def get_access_key():
     global main_access_key
     return main_access_key
index 0b53d32c7365cf2f28eab4dea16b479d13b50ccf..c23b7bb910abf53da4f9209f3e9a9e4517a87cc3 100644 (file)
@@ -236,8 +236,8 @@ def bash(cmd, **kwargs):
     s = process.communicate()[0].decode('utf-8')
     return (s, process.returncode)
 
-def admin(args, **kwargs):
+def admin(args, cluster='noname', **kwargs):
     """ radosgw-admin command """
-    cmd = [test_path + 'test-rgw-call.sh', 'call_rgw_admin', 'noname'] + args
+    cmd = [test_path + 'test-rgw-call.sh', 'call_rgw_admin', cluster] + args
     return bash(cmd, **kwargs)
 
index eb3291dafa8e4e8e660e006873c442ca5a546602..998fcd1ef842e0539002beddd166dd74c335f4e9 100644 (file)
@@ -1,6 +1,8 @@
 [DEFAULT]
 port = 8000
 host = localhost
+zonegroup = default
+cluster = noname
 
 [s3 main]
 access_key = 0555b35654ad1656d804
index 30cbfdfe7865b06140c979b574411011a3f0a871..8e9ebd126c633cc3638f953895727457b3ce36bc 100644 (file)
@@ -9,6 +9,7 @@ import time
 import os
 import io
 import string
+# XXX this should be converted to use boto3
 import boto
 from botocore.exceptions import ClientError
 from http import server as http_server
@@ -17,16 +18,16 @@ import hashlib
 # XXX this should be converted to use pytest
 from nose.plugins.attrib import attr
 import boto3
+from boto.s3.connection import S3Connection
 import datetime
 from cloudevents.http import from_http
 from dateutil import parser
 
-# XXX this should be converted to use boto3
-from boto.s3.connection import S3Connection
-
 from . import(
     get_config_host,
     get_config_port,
+    get_config_zonegroup,
+    get_config_cluster,
     get_access_key,
     get_secret_key
     )
@@ -48,7 +49,6 @@ TOPIC_SUFFIX = "_topic"
 NOTIFICATION_SUFFIX = "_notif"
 UID_PREFIX = "superman"
 
-
 num_buckets = 0
 run_prefix=''.join(random.choice(string.ascii_lowercase) for _ in range(6))
 
@@ -538,9 +538,9 @@ def another_user(tenant=None):
     secret_key = str(time.time())
     uid = UID_PREFIX + str(time.time())
     if tenant:
-        _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'])  
+        _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
     else:
-        _, result = admin(['user', 'create', '--uid', uid, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'])  
+        _, result = admin(['user', 'create', '--uid', uid, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
 
     assert_equal(result, 0)
     conn = S3Connection(aws_access_key_id=access_key,
@@ -560,15 +560,15 @@ def test_ps_s3_topic_on_master():
     
     access_key = str(time.time())
     secret_key = str(time.time())
-    uid = 'superman' + str(time.time())
+    uid = UID_PREFIX + str(time.time())
     tenant = 'kaboom'
-    _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'])  
+    _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
     assert_equal(result, 0)
     conn = S3Connection(aws_access_key_id=access_key,
                   aws_secret_access_key=secret_key,
                       is_secure=False, port=get_config_port(), host=get_config_host(), 
                       calling_format='boto.s3.connection.OrdinaryCallingFormat')
-    zonegroup = 'default' 
+    zonegroup = get_config_zonegroup()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name + TOPIC_SUFFIX
 
@@ -641,15 +641,15 @@ def test_ps_s3_topic_admin_on_master():
     
     access_key = str(time.time())
     secret_key = str(time.time())
-    uid = 'superman' + str(time.time())
+    uid = UID_PREFIX + str(time.time())
     tenant = 'kaboom'
-    _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'])  
+    _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
     assert_equal(result, 0)
     conn = S3Connection(aws_access_key_id=access_key,
                   aws_secret_access_key=secret_key,
                       is_secure=False, port=get_config_port(), host=get_config_host(), 
                       calling_format='boto.s3.connection.OrdinaryCallingFormat')
-    zonegroup = 'default' 
+    zonegroup = get_config_zonegroup()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name + TOPIC_SUFFIX
 
@@ -686,34 +686,34 @@ def test_ps_s3_topic_admin_on_master():
                  'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3')
 
     # get topic 3 via commandline
-    result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant])  
+    result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(parsed_result['arn'], topic_arn3)
     matches = [tenant, UID_PREFIX]
     assert_true( all([x in parsed_result['user'] for x in matches]))
 
     # delete topic 3
-    _, result = admin(['topic', 'rm', '--topic', topic_name+'_3', '--tenant', tenant])  
+    _, result = admin(['topic', 'rm', '--topic', topic_name+'_3', '--tenant', tenant], get_config_cluster())
     assert_equal(result, 0)
 
     # try to get a deleted topic
-    _, result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant])  
+    _, result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant], get_config_cluster())
     print('"topic not found" error is expected')
     assert_equal(result, 2)
 
     # get the remaining 2 topics
-    result = admin(['topic', 'list', '--tenant', tenant])  
+    result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(len(parsed_result['topics']), 2)
 
     # delete topics
-    _, result = admin(['topic', 'rm', '--topic', topic_name+'_1', '--tenant', tenant])  
+    _, result = admin(['topic', 'rm', '--topic', topic_name+'_1', '--tenant', tenant], get_config_cluster())
     assert_equal(result, 0)
-    _, result = admin(['topic', 'rm', '--topic', topic_name+'_2', '--tenant', tenant])  
+    _, result = admin(['topic', 'rm', '--topic', topic_name+'_2', '--tenant', tenant], get_config_cluster())
     assert_equal(result, 0)
 
     # get topic list, make sure it is empty
-    result = admin(['topic', 'list', '--tenant', tenant])  
+    result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(len(parsed_result['topics']), 0)
 
@@ -722,7 +722,7 @@ def test_ps_s3_topic_admin_on_master():
 def test_ps_s3_notification_configuration_admin_on_master():
     """ test s3 notification list/get/delete on master """
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
     bucket_name = gen_bucket_name()
     bucket = conn.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
@@ -764,33 +764,33 @@ def test_ps_s3_notification_configuration_admin_on_master():
     assert_equal(status/100, 2)
 
     # list notification
-    result = admin(['notification', 'list', '--bucket', bucket_name])
+    result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(len(parsed_result['notifications']), 3)
     assert_equal(result[1], 0)
 
     # get notification 1
-    result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name+'_1'])
+    result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name+'_1'], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(parsed_result['Id'], notification_name+'_1')
     assert_equal(result[1], 0)
 
     # remove notification 3
-    _, result = admin(['notification', 'rm', '--bucket', bucket_name, '--notification-id', notification_name+'_3'])
+    _, result = admin(['notification', 'rm', '--bucket', bucket_name, '--notification-id', notification_name+'_3'], get_config_cluster())
     assert_equal(result, 0)
 
     # list notification
-    result = admin(['notification', 'list', '--bucket', bucket_name])
+    result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(len(parsed_result['notifications']), 2)
     assert_equal(result[1], 0)
 
     # delete notifications
-    _, result = admin(['notification', 'rm', '--bucket', bucket_name])
+    _, result = admin(['notification', 'rm', '--bucket', bucket_name], get_config_cluster())
     assert_equal(result, 0)
 
     # list notification, make sure it is empty
-    result = admin(['notification', 'list', '--bucket', bucket_name])
+    result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(len(parsed_result['notifications']), 0)
     assert_equal(result[1], 0)
@@ -805,7 +805,7 @@ def test_ps_s3_topic_with_secret_on_master():
     if conn.secure_conn is None:
         return SkipTest('secure connection is needed to test topic with secrets')
 
-    zonegroup = 'default' 
+    zonegroup = get_config_zonegroup()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name + TOPIC_SUFFIX
 
@@ -851,7 +851,7 @@ def test_ps_s3_topic_with_secret_on_master():
 def test_ps_s3_notification_on_master():
     """ test s3 notification set/get/delete on master """
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
     bucket_name = gen_bucket_name()
     # create bucket
     bucket = conn.create_bucket(bucket_name)
@@ -915,7 +915,7 @@ def test_ps_s3_notification_on_master_empty_config():
 
     conn = connection()
 
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -974,7 +974,7 @@ def test_ps_s3_notification_filter_on_master():
     conn = connection()
     ps_zone = conn
 
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -1146,7 +1146,7 @@ def test_ps_s3_notification_filter_on_master():
 def test_ps_s3_notification_errors_on_master():
     """ test s3 notification set/get/delete on master """
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
     bucket_name = gen_bucket_name()
     # create bucket
     bucket = conn.create_bucket(bucket_name)
@@ -1245,7 +1245,7 @@ def test_ps_s3_notification_push_amqp_on_master():
 
     hostname = get_ip()
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -1349,7 +1349,7 @@ def test_ps_s3_notification_push_amqp_idleness_check():
     return SkipTest("only used in manual testing")
     hostname = get_ip()
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -1478,7 +1478,7 @@ def test_ps_s3_notification_push_amqp_idleness_check():
 def test_ps_s3_notification_push_kafka_on_master():
     """ test pushing kafka s3 notification on master """
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -1578,7 +1578,7 @@ def test_ps_s3_notification_multi_delete_on_master():
     """ test deletion of multiple keys on master """
     hostname = get_ip()
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create random port for the http server
     host = get_ip()
@@ -1646,7 +1646,7 @@ def test_ps_s3_notification_push_http_on_master():
     """ test pushing http s3 notification on master """
     hostname = get_ip_http()
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create random port for the http server
     host = get_ip()
@@ -1730,7 +1730,7 @@ def test_ps_s3_notification_push_cloudevents_on_master():
     """ test pushing cloudevents notification on master """
     hostname = get_ip_http()
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create random port for the http server
     host = get_ip()
@@ -1814,7 +1814,7 @@ def test_ps_s3_opaque_data_on_master():
     """ test that opaque id set in topic, is sent in notification on master """
     hostname = get_ip()
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create random port for the http server
     host = get_ip()
@@ -1883,7 +1883,7 @@ def test_ps_s3_lifecycle_on_master():
     """ test that when object is deleted due to lifecycle policy, notification is sent on master """
     hostname = get_ip()
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create random port for the http server
     host = get_ip()
@@ -1947,7 +1947,7 @@ def test_ps_s3_lifecycle_on_master():
     )
 
     # start lifecycle processing
-    admin(['lc', 'process'])
+    admin(['lc', 'process'], get_config_cluster())
     print('wait for 5sec for the messages...')
     time.sleep(5)
 
@@ -1994,7 +1994,7 @@ def test_ps_s3_lifecycle_abort_mpu_on_master():
     """ test that when a multipart upload is aborted by lifecycle policy, notification is sent on master """
     hostname = get_ip()
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create random port for the http server
     host = get_ip()
@@ -2056,7 +2056,7 @@ def test_ps_s3_lifecycle_abort_mpu_on_master():
     )
 
     # start lifecycle processing
-    admin(['lc', 'process'])
+    admin(['lc', 'process'], get_config_cluster())
     print('wait for 20s (2 days) for the messages...')
     time.sleep(20)
 
@@ -2093,7 +2093,7 @@ def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_locatio
     """ test object creation s3 notifications in using put/copy/post on master"""
     
     if not external_endpoint_address:
-        hostname = 'localhost'
+        hostname = get_ip()
         proc = init_rabbitmq()
         if proc is  None:
             return SkipTest('end2end amqp tests require rabbitmq-server installed')
@@ -2101,8 +2101,7 @@ def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_locatio
         proc = None
 
     conn = connection()
-    hostname = 'localhost'
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -2335,7 +2334,7 @@ def test_http_post_object_upload():
     import requests
 
     hostname = get_ip()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
     conn = connection()
 
     endpoint = "http://%s:%d" % (get_config_host(), get_config_port())
@@ -2467,7 +2466,7 @@ def test_ps_s3_multipart_on_master():
 
     hostname = get_ip()
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -2556,7 +2555,7 @@ def test_ps_s3_metadata_filter_on_master():
 
     hostname = get_ip()
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -2660,7 +2659,7 @@ def test_ps_s3_metadata_on_master():
 
     hostname = get_ip()
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -2754,7 +2753,7 @@ def test_ps_s3_tags_on_master():
 
     hostname = get_ip()
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -2865,7 +2864,7 @@ def test_ps_s3_versioning_on_master():
 
     hostname = get_ip()
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -2940,7 +2939,7 @@ def test_ps_s3_versioned_deletion_on_master():
 
     hostname = get_ip()
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -3039,7 +3038,7 @@ def test_ps_s3_persistent_cleanup():
     """ test reservation cleanup after gateway crash """
     return SkipTest("only used in manual testing")
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create random port for the http server
     host = get_ip()
@@ -3142,7 +3141,7 @@ def test_ps_s3_persistent_cleanup():
 def test_ps_s3_persistent_topic_stats():
     """ test persistent topic stats """
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create random port for the http server
     host = get_ip()
@@ -3176,7 +3175,7 @@ def test_ps_s3_persistent_topic_stats():
     http_server.close()
 
     # topic stats
-    result = admin(['topic', 'stats', '--topic', topic_name])
+    result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(parsed_result['Topic Stats']['Entries'], 0)
     assert_equal(result[1], 0)
@@ -3196,7 +3195,7 @@ def test_ps_s3_persistent_topic_stats():
     print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
 
     # topic stats
-    result = admin(['topic', 'stats', '--topic', topic_name])
+    result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects)
     assert_equal(result[1], 0)
@@ -3218,7 +3217,7 @@ def test_ps_s3_persistent_topic_stats():
             start_time = time.time()
 
     # topic stats
-    result = admin(['topic', 'stats', '--topic', topic_name])
+    result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(parsed_result['Topic Stats']['Entries'], 2*number_of_objects)
     assert_equal(result[1], 0)
@@ -3230,7 +3229,7 @@ def test_ps_s3_persistent_topic_stats():
     time.sleep(delay)
 
     # topic stats
-    result = admin(['topic', 'stats', '--topic', topic_name])
+    result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(parsed_result['Topic Stats']['Entries'], 0)
     assert_equal(result[1], 0)
@@ -3245,7 +3244,7 @@ def test_ps_s3_persistent_topic_stats():
 
 def ps_s3_persistent_topic_configs(persistency_time, config_dict):
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create random port for the http server
     host = get_ip()
@@ -3278,7 +3277,7 @@ def ps_s3_persistent_topic_configs(persistency_time, config_dict):
     time.sleep(delay)
     http_server.close()
     # topic get
-    result = admin(['topic', 'get', '--topic', topic_name])
+    result = admin(['topic', 'get', '--topic', topic_name], get_config_cluster())
     parsed_result = json.loads(result[0])
     parsed_result_dest = parsed_result["dest"]
     for key, value in config_dict.items():
@@ -3286,7 +3285,7 @@ def ps_s3_persistent_topic_configs(persistency_time, config_dict):
     assert_equal(result[1], 0)
 
     # topic stats
-    result = admin(['topic', 'stats', '--topic', topic_name])
+    result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(parsed_result['Topic Stats']['Entries'], 0)
     assert_equal(result[1], 0)
@@ -3306,14 +3305,14 @@ def ps_s3_persistent_topic_configs(persistency_time, config_dict):
     print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
 
     # topic stats
-    result = admin(['topic', 'stats', '--topic', topic_name])
+    result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects)
     assert_equal(result[1], 0)
 
     # wait as much as ttl and check if the persistent topics have expired
     time.sleep(persistency_time)
-    result = admin(['topic', 'stats', '--topic', topic_name])
+    result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(parsed_result['Topic Stats']['Entries'], 0)
     assert_equal(result[1], 0)
@@ -3335,14 +3334,14 @@ def ps_s3_persistent_topic_configs(persistency_time, config_dict):
             start_time = time.time()
 
     # topic stats
-    result = admin(['topic', 'stats', '--topic', topic_name])
+    result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects)
     assert_equal(result[1], 0)
 
     # wait as much as ttl and check if the persistent topics have expired
     time.sleep(persistency_time)
-    result = admin(['topic', 'stats', '--topic', topic_name])
+    result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(parsed_result['Topic Stats']['Entries'], 0)
     assert_equal(result[1], 0)
@@ -3385,7 +3384,7 @@ def test_ps_s3_persistent_notification_pushback():
     """ test pushing persistent notification pushback """
     return SkipTest("only used in manual testing")
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create random port for the http server
     host = get_ip()
@@ -3465,10 +3464,8 @@ def test_ps_s3_persistent_notification_pushback():
 @attr('kafka_test')
 def test_ps_s3_notification_kafka_idle_behaviour():
     """ test pushing kafka s3 notification idle behaviour check """
-    # TODO convert this test to actual running test by changing
-    # os.system call to verify the process idleness
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -3602,7 +3599,7 @@ def test_ps_s3_persistent_gateways_recovery():
     return SkipTest('This test requires two gateways.')
 
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
     # create random port for the http server
     host = get_ip()
     port = random.randint(10000, 20000)
@@ -3689,7 +3686,7 @@ def test_ps_s3_persistent_multiple_gateways():
     return SkipTest('This test requires two gateways.')
 
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
     # create random port for the http server
     host = get_ip()
     port = random.randint(10000, 20000)
@@ -3798,7 +3795,7 @@ def test_ps_s3_persistent_multiple_gateways():
 def test_ps_s3_persistent_multiple_endpoints():
     """ test pushing persistent notification when one of the endpoints has error """
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create random port for the http server
     host = get_ip()
@@ -3881,7 +3878,7 @@ def test_ps_s3_persistent_multiple_endpoints():
 def persistent_notification(endpoint_type):
     """ test pushing persistent notification """
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -4014,7 +4011,7 @@ def test_ps_s3_persistent_notification_large():
     """ test pushing persistent notification of large notifications """
 
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -4215,7 +4212,7 @@ def test_ps_s3_notification_update():
     bucket_name = gen_bucket_name()
     topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
     topic_name2 = bucket_name+'http'+TOPIC_SUFFIX
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
     # create topics
     # start amqp receiver in a separate thread
     exchange = 'ex1'
@@ -4299,7 +4296,7 @@ def test_ps_s3_multiple_topics_notification():
     return SkipTest('This test is yet to be modified.')
 
     hostname = get_ip()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
     conn = connection()
     ps_zone = None
     bucket_name = gen_bucket_name()
@@ -4401,7 +4398,7 @@ def test_ps_s3_topic_permissions():
     """ test s3 topic set/get/delete permissions """
     conn1 = connection()
     conn2 = another_user()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name + TOPIC_SUFFIX
     topic_policy = json.dumps({
@@ -4582,7 +4579,7 @@ def test_ps_s3_topic_no_permissions():
 def kafka_security(security_type, mechanism='PLAIN'):
     """ test pushing kafka s3 notification securly to master """
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
     # create bucket
     bucket_name = gen_bucket_name()
     bucket = conn.create_bucket(bucket_name)