From: Yuval Lifshitz
Date: Thu, 21 Dec 2023 18:18:45 +0000 (+0000)
Subject: test/rgw/notifications: support running tests in multisite environment
X-Git-Tag: v20.0.0~2440^2~16
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=7502215d70a423e642e5147a0d444f627a7979f2;p=ceph.git

test/rgw/notifications: support running tests in multisite environment

both locally and in teuthology

Signed-off-by: Yuval Lifshitz
---
diff --git a/qa/tasks/notification_tests.py b/qa/tasks/notification_tests.py
index f7b91e10b9037..86e95dfb5f295 100644
--- a/qa/tasks/notification_tests.py
+++ b/qa/tasks/notification_tests.py
@@ -298,6 +298,8 @@ def task(ctx,config):
         {
             'port':endpoint.port,
             'host':endpoint.dns_name,
+            'zonegroup':'default',
+            'cluster':'noname'
         },
         's3 main':{}
     }
diff --git a/src/test/rgw/bucket_notification/README.rst b/src/test/rgw/bucket_notification/README.rst
index 20eee8463cf1f..db34545e1d539 100644
--- a/src/test/rgw/bucket_notification/README.rst
+++ b/src/test/rgw/bucket_notification/README.rst
@@ -9,6 +9,20 @@ with the `vstart.sh` script.
 For the tests covering Kafka and RabbitMQ security, the RGW will need to accept
 use/password without TLS connection between the client and the RGW.
 So, the cluster will have to be started with the following
 ``rgw_allow_notification_secrets_in_cleartext`` parameter set to ``true``.
+The test suite can be run against a multisite setup; in the configuration file, we have to decide which RGW and which cluster will be used for the test.
+For example, if the ``test-rgw-multisite.sh`` script is used to set up multisite, and we want to run the test against the first RGW in the first cluster,
+we would need the following configuration file::
+
+  [DEFAULT]
+  port = 8101
+  host = localhost
+  zonegroup = zg1
+  cluster = c1
+
+  [s3 main]
+  access_key = 1234567890
+  secret_key = pencil
+
 ===========
 Kafka Tests
diff --git a/src/test/rgw/bucket_notification/__init__.py b/src/test/rgw/bucket_notification/__init__.py
index 6785fce926344..5d8fac8e215c4 100644
--- a/src/test/rgw/bucket_notification/__init__.py
+++ b/src/test/rgw/bucket_notification/__init__.py
@@ -25,9 +25,15 @@ def setup():
     global default_port
     default_port = int(defaults.get("port"))
 
+    global default_zonegroup
+    default_zonegroup = defaults.get("zonegroup")
+
+    global default_cluster
+    default_cluster = defaults.get("cluster")
+
     global main_access_key
     main_access_key = cfg.get('s3 main',"access_key")
-    
+
     global main_secret_key
     main_secret_key = cfg.get('s3 main',"secret_key")
@@ -39,6 +45,14 @@ def get_config_port():
     global default_port
     return default_port
 
+def get_config_zonegroup():
+    global default_zonegroup
+    return default_zonegroup
+
+def get_config_cluster():
+    global default_cluster
+    return default_cluster
+
 def get_access_key():
     global main_access_key
     return main_access_key
diff --git a/src/test/rgw/bucket_notification/api.py b/src/test/rgw/bucket_notification/api.py
index 0b53d32c7365c..c23b7bb910abf 100644
--- a/src/test/rgw/bucket_notification/api.py
+++ b/src/test/rgw/bucket_notification/api.py
@@ -236,8 +236,8 @@ def bash(cmd, **kwargs):
     s = process.communicate()[0].decode('utf-8')
     return (s, process.returncode)
 
-def admin(args, **kwargs):
+def admin(args, cluster='noname', **kwargs):
     """ radosgw-admin command """
-    cmd = [test_path + 'test-rgw-call.sh', 'call_rgw_admin', 'noname'] + args
+    cmd = [test_path + 'test-rgw-call.sh', 'call_rgw_admin', cluster] + args
     return bash(cmd, **kwargs)
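Not part of the patch, but for orientation: the ``admin()`` change above is what lets every ``radosgw-admin`` call in the suite be steered at a specific cluster, while the ``cluster='noname'`` default keeps the old single-site vstart behaviour for callers that omit the argument. A minimal sketch of the resulting call pattern (assuming it runs inside the ``bucket_notification`` package so the relative imports resolve, and that a persistent topic named ``mytopic`` exists)::

  import json

  from . import get_config_cluster   # accessor added in __init__.py above
  from .api import admin             # now takes an optional cluster argument

  # 'topic stats' is executed via test-rgw-call.sh against whichever
  # cluster bntests.conf names; with the default 'noname' this is
  # identical to the previous hard-coded behaviour.
  out, rc = admin(['topic', 'stats', '--topic', 'mytopic'], get_config_cluster())
  assert rc == 0
  print(json.loads(out)['Topic Stats']['Entries'])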
diff --git a/src/test/rgw/bucket_notification/bntests.conf.SAMPLE b/src/test/rgw/bucket_notification/bntests.conf.SAMPLE
index eb3291dafa8e4..998fcd1ef842e 100644
--- a/src/test/rgw/bucket_notification/bntests.conf.SAMPLE
+++ b/src/test/rgw/bucket_notification/bntests.conf.SAMPLE
@@ -1,6 +1,8 @@
 [DEFAULT]
 port = 8000
 host = localhost
+zonegroup = default
+cluster = noname
 
 [s3 main]
 access_key = 0555b35654ad1656d804
diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py
index 30cbfdfe7865b..8e9ebd126c633 100644
--- a/src/test/rgw/bucket_notification/test_bn.py
+++ b/src/test/rgw/bucket_notification/test_bn.py
@@ -9,6 +9,7 @@ import time
 import os
 import io
 import string
+# XXX this should be converted to use boto3
 import boto
 from botocore.exceptions import ClientError
 from http import server as http_server
@@ -17,16 +18,16 @@ import hashlib
 # XXX this should be converted to use pytest
 from nose.plugins.attrib import attr
 import boto3
+from boto.s3.connection import S3Connection
 import datetime
 from cloudevents.http import from_http
 from dateutil import parser
-# XXX this should be converted to use boto3
-from boto.s3.connection import S3Connection
-
 from . import(
     get_config_host,
     get_config_port,
+    get_config_zonegroup,
+    get_config_cluster,
     get_access_key,
     get_secret_key
     )
@@ -48,7 +49,6 @@ TOPIC_SUFFIX = "_topic"
 NOTIFICATION_SUFFIX = "_notif"
 UID_PREFIX = "superman"
 
-
 num_buckets = 0
 run_prefix=''.join(random.choice(string.ascii_lowercase) for _ in range(6))
@@ -538,9 +538,9 @@ def another_user(tenant=None):
     secret_key = str(time.time())
     uid = UID_PREFIX + str(time.time())
     if tenant:
-        _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'])
+        _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
     else:
-        _, result = admin(['user', 'create', '--uid', uid, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'])
+        _, result = admin(['user', 'create', '--uid', uid, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
     assert_equal(result, 0)
     conn = S3Connection(aws_access_key_id=access_key,
@@ -560,15 +560,15 @@ def test_ps_s3_topic_on_master():
     access_key = str(time.time())
     secret_key = str(time.time())
-    uid = 'superman' + str(time.time())
+    uid = UID_PREFIX + str(time.time())
     tenant = 'kaboom'
-    _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'])
+    _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
     assert_equal(result, 0)
     conn = S3Connection(aws_access_key_id=access_key,
                         aws_secret_access_key=secret_key,
                         is_secure=False, port=get_config_port(), host=get_config_host(),
                         calling_format='boto.s3.connection.OrdinaryCallingFormat')
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name + TOPIC_SUFFIX
@@ -641,15 +641,15 @@ def test_ps_s3_topic_admin_on_master():
     access_key = str(time.time())
     secret_key = str(time.time())
-    uid = 'superman' + str(time.time())
+    uid = UID_PREFIX + str(time.time())
     tenant = 'kaboom'
-    _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'])
+    _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
     assert_equal(result, 0)
     conn = S3Connection(aws_access_key_id=access_key,
                         aws_secret_access_key=secret_key,
                         is_secure=False, port=get_config_port(), host=get_config_host(),
                         calling_format='boto.s3.connection.OrdinaryCallingFormat')
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name + TOPIC_SUFFIX
@@ -686,34 +686,34 @@ def test_ps_s3_topic_admin_on_master():
             'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3')
 
     # get topic 3 via commandline
-    result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant])
+    result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(parsed_result['arn'], topic_arn3)
     matches = [tenant, UID_PREFIX]
     assert_true( all([x in parsed_result['user'] for x in matches]))
 
     # delete topic 3
-    _, result = admin(['topic', 'rm', '--topic', topic_name+'_3', '--tenant', tenant])
+    _, result = admin(['topic', 'rm', '--topic', topic_name+'_3', '--tenant', tenant], get_config_cluster())
     assert_equal(result, 0)
 
     # try to get a deleted topic
-    _, result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant])
+    _, result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant], get_config_cluster())
     print('"topic not found" error is expected')
     assert_equal(result, 2)
 
     # get the remaining 2 topics
-    result = admin(['topic', 'list', '--tenant', tenant])
+    result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(len(parsed_result['topics']), 2)
 
     # delete topics
-    _, result = admin(['topic', 'rm', '--topic', topic_name+'_1', '--tenant', tenant])
+    _, result = admin(['topic', 'rm', '--topic', topic_name+'_1', '--tenant', tenant], get_config_cluster())
     assert_equal(result, 0)
-    _, result = admin(['topic', 'rm', '--topic', topic_name+'_2', '--tenant', tenant])
+    _, result = admin(['topic', 'rm', '--topic', topic_name+'_2', '--tenant', tenant], get_config_cluster())
     assert_equal(result, 0)
 
     # get topic list, make sure it is empty
-    result = admin(['topic', 'list', '--tenant', tenant])
+    result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(len(parsed_result['topics']), 0)
@@ -722,7 +722,7 @@ def test_ps_s3_topic_admin_on_master():
 def test_ps_s3_notification_configuration_admin_on_master():
     """ test s3 notification list/get/delete on master """
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
     bucket_name = gen_bucket_name()
     bucket = conn.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
@@ -764,33 +764,33 @@ def test_ps_s3_notification_configuration_admin_on_master():
     assert_equal(status/100, 2)
 
     # list notification
-    result = admin(['notification', 'list', '--bucket', bucket_name])
+    result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(len(parsed_result['notifications']), 3)
     assert_equal(result[1], 0)
 
     # get notification 1
-    result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name+'_1'])
+    result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name+'_1'], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(parsed_result['Id'], notification_name+'_1')
     assert_equal(result[1], 0)
 
     # remove notification 3
-    _, result = admin(['notification', 'rm', '--bucket', bucket_name, '--notification-id', notification_name+'_3'])
+    _, result = admin(['notification', 'rm', '--bucket', bucket_name, '--notification-id', notification_name+'_3'], get_config_cluster())
     assert_equal(result, 0)
 
     # list notification
-    result = admin(['notification', 'list', '--bucket', bucket_name])
+    result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(len(parsed_result['notifications']), 2)
     assert_equal(result[1], 0)
 
     # delete notifications
-    _, result = admin(['notification', 'rm', '--bucket', bucket_name])
+    _, result = admin(['notification', 'rm', '--bucket', bucket_name], get_config_cluster())
     assert_equal(result, 0)
 
     # list notification, make sure it is empty
-    result = admin(['notification', 'list', '--bucket', bucket_name])
+    result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(len(parsed_result['notifications']), 0)
     assert_equal(result[1], 0)
@@ -805,7 +805,7 @@ def test_ps_s3_topic_with_secret_on_master():
     if conn.secure_conn is None:
         return SkipTest('secure connection is needed to test topic with secrets')
 
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name + TOPIC_SUFFIX
@@ -851,7 +851,7 @@ def test_ps_s3_topic_with_secret_on_master():
 def test_ps_s3_notification_on_master():
     """ test s3 notification set/get/delete on master """
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
     bucket_name = gen_bucket_name()
     # create bucket
     bucket = conn.create_bucket(bucket_name)
@@ -915,7 +915,7 @@ def test_ps_s3_notification_on_master_empty_config():
 
     conn = connection()
 
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -974,7 +974,7 @@ def test_ps_s3_notification_filter_on_master():
 
     conn = connection()
     ps_zone = conn
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -1146,7 +1146,7 @@ def test_ps_s3_notification_filter_on_master():
 def test_ps_s3_notification_errors_on_master():
     """ test s3 notification set/get/delete on master """
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
     bucket_name = gen_bucket_name()
     # create bucket
     bucket = conn.create_bucket(bucket_name)
@@ -1245,7 +1245,7 @@ def test_ps_s3_notification_push_amqp_on_master():
     hostname = get_ip()
 
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -1349,7 +1349,7 @@ def test_ps_s3_notification_push_amqp_idleness_check():
         return SkipTest("only used in manual testing")
     hostname = get_ip()
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -1478,7 +1478,7 @@ def test_ps_s3_notification_push_kafka_on_master():
     """ test pushing kafka s3 notification on master """
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -1578,7 +1578,7 @@ def test_ps_s3_notification_multi_delete_on_master():
     """ test deletion of multiple keys on master """
     hostname = get_ip()
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create random port for the http server
     host = get_ip()
@@ -1646,7 +1646,7 @@ def test_ps_s3_notification_push_http_on_master():
     """ test pushing http s3 notification on master """
     hostname = get_ip_http()
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create random port for the http server
     host = get_ip()
@@ -1730,7 +1730,7 @@ def test_ps_s3_notification_push_cloudevents_on_master():
     """ test pushing cloudevents notification on master """
     hostname = get_ip_http()
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create random port for the http server
     host = get_ip()
@@ -1814,7 +1814,7 @@ def test_ps_s3_opaque_data_on_master():
     """ test that opaque id set in topic, is sent in notification on master """
     hostname = get_ip()
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create random port for the http server
     host = get_ip()
@@ -1883,7 +1883,7 @@ def test_ps_s3_lifecycle_on_master():
     """ test that when object is deleted due to lifecycle policy, notification is sent on master """
     hostname = get_ip()
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create random port for the http server
     host = get_ip()
@@ -1947,7 +1947,7 @@ def test_ps_s3_lifecycle_on_master():
     )
 
     # start lifecycle processing
-    admin(['lc', 'process'])
+    admin(['lc', 'process'], get_config_cluster())
    print('wait for 5sec for the messages...')
     time.sleep(5)
@@ -1994,7 +1994,7 @@ def test_ps_s3_lifecycle_abort_mpu_on_master():
     """ test that when a multipart upload is aborted by lifecycle policy, notification is sent on master """
     hostname = get_ip()
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create random port for the http server
     host = get_ip()
@@ -2056,7 +2056,7 @@ def test_ps_s3_lifecycle_abort_mpu_on_master():
     )
 
     # start lifecycle processing
-    admin(['lc', 'process'])
+    admin(['lc', 'process'], get_config_cluster())
     print('wait for 20s (2 days) for the messages...')
     time.sleep(20)
@@ -2093,7 +2093,7 @@ def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_locatio
     """ test object creation s3 notifications in using put/copy/post on master"""
 
     if not external_endpoint_address:
-        hostname = 'localhost'
+        hostname = get_ip()
         proc = init_rabbitmq()
         if proc is None:
             return SkipTest('end2end amqp tests require rabbitmq-server installed')
@@ -2101,8 +2101,7 @@ def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_locatio
         proc = None
 
     conn = connection()
-    hostname = 'localhost'
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -2335,7 +2334,7 @@ def test_http_post_object_upload():
     import requests
 
     hostname = get_ip()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
     conn = connection()
 
     endpoint = "http://%s:%d" % (get_config_host(), get_config_port())
@@ -2467,7 +2466,7 @@ def test_ps_s3_multipart_on_master():
     hostname = get_ip()
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -2556,7 +2555,7 @@ def test_ps_s3_metadata_filter_on_master():
     hostname = get_ip()
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -2660,7 +2659,7 @@ def test_ps_s3_metadata_on_master():
     hostname = get_ip()
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -2754,7 +2753,7 @@ def test_ps_s3_tags_on_master():
     hostname = get_ip()
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -2865,7 +2864,7 @@ def test_ps_s3_versioning_on_master():
     hostname = get_ip()
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -2940,7 +2939,7 @@ def test_ps_s3_versioned_deletion_on_master():
     hostname = get_ip()
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -3039,7 +3038,7 @@ def test_ps_s3_persistent_cleanup():
     """ test reservation cleanup after gateway crash """
     return SkipTest("only used in manual testing")
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create random port for the http server
     host = get_ip()
@@ -3142,7 +3141,7 @@ def test_ps_s3_persistent_topic_stats():
     """ test persistent topic stats """
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create random port for the http server
     host = get_ip()
@@ -3176,7 +3175,7 @@ def test_ps_s3_persistent_topic_stats():
     http_server.close()
 
     # topic stats
-    result = admin(['topic', 'stats', '--topic', topic_name])
+    result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(parsed_result['Topic Stats']['Entries'], 0)
     assert_equal(result[1], 0)
@@ -3196,7 +3195,7 @@ def test_ps_s3_persistent_topic_stats():
     print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
 
     # topic stats
-    result = admin(['topic', 'stats', '--topic', topic_name])
+    result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects)
     assert_equal(result[1], 0)
@@ -3218,7 +3217,7 @@ def test_ps_s3_persistent_topic_stats():
     start_time = time.time()
 
     # topic stats
-    result = admin(['topic', 'stats', '--topic', topic_name])
+    result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(parsed_result['Topic Stats']['Entries'], 2*number_of_objects)
     assert_equal(result[1], 0)
@@ -3230,7 +3229,7 @@ def test_ps_s3_persistent_topic_stats():
     time.sleep(delay)
 
     # topic stats
-    result = admin(['topic', 'stats', '--topic', topic_name])
+    result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(parsed_result['Topic Stats']['Entries'], 0)
     assert_equal(result[1], 0)
@@ -3245,7 +3244,7 @@ def test_ps_s3_persistent_topic_stats():
 def ps_s3_persistent_topic_configs(persistency_time, config_dict):
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create random port for the http server
     host = get_ip()
@@ -3278,7 +3277,7 @@ def ps_s3_persistent_topic_configs(persistency_time, config_dict):
     time.sleep(delay)
     http_server.close()
     # topic get
-    result = admin(['topic', 'get', '--topic', topic_name])
+    result = admin(['topic', 'get', '--topic', topic_name], get_config_cluster())
     parsed_result = json.loads(result[0])
     parsed_result_dest = parsed_result["dest"]
     for key, value in config_dict.items():
@@ -3286,7 +3285,7 @@ def ps_s3_persistent_topic_configs(persistency_time, config_dict):
     assert_equal(result[1], 0)
 
     # topic stats
-    result = admin(['topic', 'stats', '--topic', topic_name])
+    result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(parsed_result['Topic Stats']['Entries'], 0)
     assert_equal(result[1], 0)
@@ -3306,14 +3305,14 @@ def ps_s3_persistent_topic_configs(persistency_time, config_dict):
     print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
 
     # topic stats
-    result = admin(['topic', 'stats', '--topic', topic_name])
+    result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects)
     assert_equal(result[1], 0)
 
     # wait as much as ttl and check if the persistent topics have expired
     time.sleep(persistency_time)
-    result = admin(['topic', 'stats', '--topic', topic_name])
+    result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(parsed_result['Topic Stats']['Entries'], 0)
     assert_equal(result[1], 0)
@@ -3335,14 +3334,14 @@ def ps_s3_persistent_topic_configs(persistency_time, config_dict):
     start_time = time.time()
 
     # topic stats
-    result = admin(['topic', 'stats', '--topic', topic_name])
+    result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects)
     assert_equal(result[1], 0)
 
     # wait as much as ttl and check if the persistent topics have expired
     time.sleep(persistency_time)
-    result = admin(['topic', 'stats', '--topic', topic_name])
+    result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
     parsed_result = json.loads(result[0])
     assert_equal(parsed_result['Topic Stats']['Entries'], 0)
     assert_equal(result[1], 0)
@@ -3385,7 +3384,7 @@ def test_ps_s3_persistent_notification_pushback():
     """ test pushing persistent notification pushback """
     return SkipTest("only used in manual testing")
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create random port for the http server
     host = get_ip()
@@ -3465,10 +3464,8 @@ def test_ps_s3_persistent_notification_pushback():
 @attr('kafka_test')
 def test_ps_s3_notification_kafka_idle_behaviour():
     """ test pushing kafka s3 notification idle behaviour check """
-    # TODO convert this test to actual running test by changing
-    # os.system call to verify the process idleness
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -3602,7 +3599,7 @@ def test_ps_s3_persistent_gateways_recovery():
         return SkipTest('This test requires two gateways.')
 
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
     # create random port for the http server
     host = get_ip()
     port = random.randint(10000, 20000)
@@ -3689,7 +3686,7 @@ def test_ps_s3_persistent_multiple_gateways():
         return SkipTest('This test requires two gateways.')
 
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
     # create random port for the http server
     host = get_ip()
     port = random.randint(10000, 20000)
@@ -3798,7 +3795,7 @@ def test_ps_s3_persistent_multiple_gateways():
 def test_ps_s3_persistent_multiple_endpoints():
     """ test pushing persistent notification when one of the endpoints has error """
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create random port for the http server
     host = get_ip()
@@ -3881,7 +3878,7 @@ def test_ps_s3_persistent_multiple_endpoints():
 def persistent_notification(endpoint_type):
     """ test pushing persistent notification """
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -4014,7 +4011,7 @@ def test_ps_s3_persistent_notification_large():
     """ test pushing persistent notification of large notifications """
 
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -4215,7 +4212,7 @@ def test_ps_s3_notification_update():
     bucket_name = gen_bucket_name()
     topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
     topic_name2 = bucket_name+'http'+TOPIC_SUFFIX
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
     # create topics
     # start amqp receiver in a separate thread
     exchange = 'ex1'
@@ -4299,7 +4296,7 @@ def test_ps_s3_multiple_topics_notification():
        return SkipTest('This test is yet to be modified.')
 
     hostname = get_ip()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
     conn = connection()
     ps_zone = None
     bucket_name = gen_bucket_name()
@@ -4401,7 +4398,7 @@ def test_ps_s3_topic_permissions():
     """ test s3 topic set/get/delete permissions """
     conn1 = connection()
     conn2 = another_user()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name + TOPIC_SUFFIX
     topic_policy = json.dumps({
@@ -4582,7 +4579,7 @@ def test_ps_s3_topic_no_permissions():
 def kafka_security(security_type, mechanism='PLAIN'):
     """ test pushing kafka s3 notification securly to master """
     conn = connection()
-    zonegroup = 'default'
+    zonegroup = get_config_zonegroup()
     # create bucket
     bucket_name = gen_bucket_name()
     bucket = conn.create_bucket(bucket_name)
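A closing note, not part of the patch: the updated ``bntests.conf.SAMPLE`` keeps ``zonegroup = default`` and ``cluster = noname``, so a plain single-site vstart run needs no configuration changes. To aim the same suite at a different gateway of a multisite setup, only the configuration file changes; for example (the port, zonegroup, and cluster names below are illustrative and must match whatever ``test-rgw-multisite.sh`` actually created for the second cluster)::

  [DEFAULT]
  port = 8201
  host = localhost
  zonegroup = zg2
  cluster = c2

  [s3 main]
  access_key = 1234567890
  secret_key = pencil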