From: Ali Masarwa Date: Wed, 22 May 2024 10:55:00 +0000 (+0300) Subject: RGW\bucket notification tests: code dedup for admin commands X-Git-Tag: v20.0.0~1823^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fpull%2F57626%2Fhead;p=ceph.git RGW\bucket notification tests: code dedup for admin commands Signed-off-by: Ali Masarwa --- diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index f7e12f25cda5..61b7374b7ef0 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -551,6 +551,114 @@ def another_user(user=None, tenant=None, account=None): calling_format='boto.s3.connection.OrdinaryCallingFormat') return conn, arn + +def list_topics(assert_len=None, tenant=''): + if tenant == '': + result = admin(['topic', 'list'], get_config_cluster()) + else: + result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster()) + parsed_result = json.loads(result[0]) + if assert_len: + assert_equal(len(parsed_result['topics']), assert_len) + return parsed_result + + +def get_stats_persistent_topic(topic_name, assert_entries_number=None): + result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster()) + assert_equal(result[1], 0) + parsed_result = json.loads(result[0]) + if assert_entries_number: + assert_equal(parsed_result['Topic Stats']['Entries'], assert_entries_number) + return parsed_result + + +def get_topic(topic_name, tenant='', allow_failure=False): + if tenant == '': + result = admin(['topic', 'get', '--topic', topic_name], get_config_cluster()) + else: + result = admin(['topic', 'get', '--topic', topic_name, '--tenant', tenant], get_config_cluster()) + if allow_failure: + return result + assert_equal(result[1], 0) + parsed_result = json.loads(result[0]) + return parsed_result + + +def remove_topic(topic_name, tenant='', allow_failure=False): + if tenant == '': + result = admin(['topic', 'rm', '--topic', topic_name], get_config_cluster()) + else: + result = admin(['topic', 'rm', '--topic', topic_name, '--tenant', tenant], get_config_cluster()) + if not allow_failure: + assert_equal(result[1], 0) + return result[1] + + +def list_notifications(bucket_name, assert_len=None, tenant=''): + if tenant == '': + result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster()) + else: + result = admin(['notification', 'list', '--bucket', bucket_name, '--tenant', tenant], get_config_cluster()) + parsed_result = json.loads(result[0]) + if assert_len: + assert_equal(len(parsed_result['notifications']), assert_len) + return parsed_result + + +def get_notification(bucket_name, notification_name, tenant=''): + if tenant == '': + result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name], get_config_cluster()) + else: + result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name, '--tenant', tenant], get_config_cluster()) + assert_equal(result[1], 0) + parsed_result = json.loads(result[0]) + assert_equal(parsed_result['Id'], notification_name) + return parsed_result + + +def remove_notification(bucket_name, notification_name='', tenant='', allow_failure=False): + args = ['notification', 'rm', '--bucket', bucket_name] + if notification_name != '': + args.extend(['--notification-id', notification_name]) + if tenant != '': + args.extend(['--tenant', tenant]) + result = admin(args, get_config_cluster()) + if not allow_failure: + assert_equal(result[1], 0) + return result[1] + + +zonegroup_feature_notification_v2 = 'notification_v2' + + +def zonegroup_modify_feature(enable, feature_name): + if enable: + command = '--enable-feature='+feature_name + else: + command = '--disable-feature='+feature_name + result = admin(['zonegroup', 'modify', command], get_config_cluster()) + assert_equal(result[1], 0) + result = admin(['period', 'update'], get_config_cluster()) + assert_equal(result[1], 0) + result = admin(['period', 'commit'], get_config_cluster()) + assert_equal(result[1], 0) + + +def connect_random_user(tenant=''): + access_key = str(time.time()) + secret_key = str(time.time()) + uid = UID_PREFIX + str(time.time()) + if tenant == '': + _, result = admin(['user', 'create', '--uid', uid, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster()) + else: + _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster()) + assert_equal(result, 0) + conn = S3Connection(aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + is_secure=False, port=get_config_port(), host=get_config_host(), + calling_format='boto.s3.connection.OrdinaryCallingFormat') + return conn + ############## # bucket notifications tests ############## @@ -560,16 +668,8 @@ def another_user(user=None, tenant=None, account=None): def test_ps_s3_topic_on_master(): """ test s3 topics set/get/delete on master """ - access_key = str(time.time()) - secret_key = str(time.time()) - uid = UID_PREFIX + str(time.time()) tenant = 'kaboom' - _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster()) - assert_equal(result, 0) - conn = S3Connection(aws_access_key_id=access_key, - aws_secret_access_key=secret_key, - is_secure=False, port=get_config_port(), host=get_config_host(), - calling_format='boto.s3.connection.OrdinaryCallingFormat') + conn = connect_random_user(tenant) # make sure there are no leftover topics delete_all_topics(conn, tenant, get_config_cluster()) @@ -634,16 +734,8 @@ def test_ps_s3_topic_on_master(): def test_ps_s3_topic_admin_on_master(): """ test s3 topics set/get/delete on master """ - access_key = str(time.time()) - secret_key = str(time.time()) - uid = UID_PREFIX + str(time.time()) tenant = 'kaboom' - _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster()) - assert_equal(result, 0) - conn = S3Connection(aws_access_key_id=access_key, - aws_secret_access_key=secret_key, - is_secure=False, port=get_config_port(), host=get_config_host(), - calling_format='boto.s3.connection.OrdinaryCallingFormat') + conn = connect_random_user(tenant) # make sure there are no leftover topics delete_all_topics(conn, tenant, get_config_cluster()) @@ -674,36 +766,28 @@ def test_ps_s3_topic_admin_on_master(): 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3') # get topic 3 via commandline - result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant], get_config_cluster()) - parsed_result = json.loads(result[0]) + parsed_result = get_topic(topic_name+'_3', tenant) assert_equal(parsed_result['arn'], topic_arn3) matches = [tenant, UID_PREFIX] assert_true( all([x in parsed_result['owner'] for x in matches])) # delete topic 3 - _, result = admin(['topic', 'rm', '--topic', topic_name+'_3', '--tenant', tenant], get_config_cluster()) - assert_equal(result, 0) + remove_topic(topic_name + '_3', tenant) # try to get a deleted topic - _, result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant], get_config_cluster()) + _, result = get_topic(topic_name + '_3', tenant, allow_failure=True) print('"topic not found" error is expected') assert_equal(result, 2) # get the remaining 2 topics - result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster()) - parsed_result = json.loads(result[0]) - assert_equal(len(parsed_result['topics']), 2) + list_topics(2, tenant) # delete topics - _, result = admin(['topic', 'rm', '--topic', topic_name+'_1', '--tenant', tenant], get_config_cluster()) - assert_equal(result, 0) - _, result = admin(['topic', 'rm', '--topic', topic_name+'_2', '--tenant', tenant], get_config_cluster()) - assert_equal(result, 0) + remove_topic(topic_name + '_1', tenant) + remove_topic(topic_name + '_2', tenant) # get topic list, make sure it is empty - result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster()) - parsed_result = json.loads(result[0]) - assert_equal(len(parsed_result['topics']), 0) + list_topics(0, tenant) def notification_configuration(with_cli): @@ -744,11 +828,7 @@ def notification_configuration(with_cli): # get notification 1 if with_cli: - result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name+'_1'], get_config_cluster()) - parsed_result = json.loads(result[0]) - assert_equal(parsed_result['Id'], notification_name+'_1') - assert_equal(parsed_result['TopicArn'], topic_arn) - assert_equal(result[1], 0) + get_notification(bucket_name, notification_name+'_1') else: response, status = s3_notification_conf.get_config(notification=notification_name+'_1') assert_equal(status/100, 2) @@ -756,10 +836,7 @@ def notification_configuration(with_cli): # list notification if with_cli: - result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster()) - parsed_result = json.loads(result[0]) - assert_equal(len(parsed_result['notifications']), 3) - assert_equal(result[1], 0) + list_notifications(bucket_name, 3) else: result, status = s3_notification_conf.get_config() assert_equal(status, 200) @@ -767,18 +844,14 @@ def notification_configuration(with_cli): # delete notification 2 if with_cli: - _, result = admin(['notification', 'rm', '--bucket', bucket_name, '--notification-id', notification_name+'_2'], get_config_cluster()) - assert_equal(result, 0) + remove_notification(bucket_name, notification_name + '_2') else: _, status = s3_notification_conf.del_config(notification=notification_name+'_2') assert_equal(status/100, 2) # list notification if with_cli: - result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster()) - parsed_result = json.loads(result[0]) - assert_equal(len(parsed_result['notifications']), 2) - assert_equal(result[1], 0) + list_notifications(bucket_name, 2) else: result, status = s3_notification_conf.get_config() assert_equal(status, 200) @@ -786,17 +859,13 @@ def notification_configuration(with_cli): # delete notifications if with_cli: - _, result = admin(['notification', 'rm', '--bucket', bucket_name], get_config_cluster()) - assert_equal(result, 0) + remove_notification(bucket_name) else: _, status = s3_notification_conf.del_config() assert_equal(status/100, 2) # list notification, make sure it is empty - result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster()) - parsed_result = json.loads(result[0]) - assert_equal(len(parsed_result['notifications']), 0) - assert_equal(result[1], 0) + list_notifications(bucket_name, 0) # cleanup topic_conf.del_config() @@ -2902,10 +2971,7 @@ def test_ps_s3_persistent_topic_stats(): assert_equal(status/100, 2) # topic stats - result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster()) - parsed_result = json.loads(result[0]) - assert_equal(parsed_result['Topic Stats']['Entries'], 0) - assert_equal(result[1], 0) + get_stats_persistent_topic(topic_name, 0) # create objects in the bucket (async) number_of_objects = 20 @@ -2922,10 +2988,7 @@ def test_ps_s3_persistent_topic_stats(): print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') # topic stats - result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster()) - parsed_result = json.loads(result[0]) - assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects) - assert_equal(result[1], 0) + get_stats_persistent_topic(topic_name, number_of_objects) # delete objects from the bucket client_threads = [] @@ -2939,10 +3002,7 @@ def test_ps_s3_persistent_topic_stats(): print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') # topic stats - result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster()) - parsed_result = json.loads(result[0]) - assert_equal(parsed_result['Topic Stats']['Entries'], 2*number_of_objects) - assert_equal(result[1], 0) + get_stats_persistent_topic(topic_name, 2 * number_of_objects) # start an http server in a separate thread http_server = HTTPServerWithEvents((host, port)) @@ -2991,18 +3051,13 @@ def ps_s3_persistent_topic_configs(persistency_time, config_dict): time.sleep(delay) http_server.close() # topic get - result = admin(['topic', 'get', '--topic', topic_name], get_config_cluster()) - parsed_result = json.loads(result[0]) + parsed_result = get_topic(topic_name) parsed_result_dest = parsed_result["dest"] for key, value in config_dict.items(): assert_equal(parsed_result_dest[key], str(value)) - assert_equal(result[1], 0) # topic stats - result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster()) - parsed_result = json.loads(result[0]) - assert_equal(parsed_result['Topic Stats']['Entries'], 0) - assert_equal(result[1], 0) + get_stats_persistent_topic(topic_name, 0) # create objects in the bucket (async) number_of_objects = 10 @@ -3019,17 +3074,11 @@ def ps_s3_persistent_topic_configs(persistency_time, config_dict): print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') # topic stats - result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster()) - parsed_result = json.loads(result[0]) - assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects) - assert_equal(result[1], 0) + get_stats_persistent_topic(topic_name, number_of_objects) # wait as much as ttl and check if the persistent topics have expired time.sleep(persistency_time) - result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster()) - parsed_result = json.loads(result[0]) - assert_equal(parsed_result['Topic Stats']['Entries'], 0) - assert_equal(result[1], 0) + get_stats_persistent_topic(topic_name, 0) # delete objects from the bucket client_threads = [] @@ -3048,17 +3097,11 @@ def ps_s3_persistent_topic_configs(persistency_time, config_dict): start_time = time.time() # topic stats - result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster()) - parsed_result = json.loads(result[0]) - assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects) - assert_equal(result[1], 0) + get_stats_persistent_topic(topic_name, number_of_objects) # wait as much as ttl and check if the persistent topics have expired time.sleep(persistency_time) - result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster()) - parsed_result = json.loads(result[0]) - assert_equal(parsed_result['Topic Stats']['Entries'], 0) - assert_equal(result[1], 0) + get_stats_persistent_topic(topic_name, 0) # cleanup s3_notification_conf.del_config() @@ -4430,12 +4473,7 @@ def test_persistent_ps_s3_reload(): delete_all_topics(conn, '', get_config_cluster()) # disable v2 notification - result = admin(['zonegroup', 'modify', '--disable-feature=notification_v2'], get_config_cluster()) - assert_equal(result[1], 0) - result = admin(['period', 'update'], get_config_cluster()) - assert_equal(result[1], 0) - result = admin(['period', 'commit'], get_config_cluster()) - assert_equal(result[1], 0) + zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2) # create random port for the http server host = get_ip() @@ -4470,10 +4508,7 @@ def test_persistent_ps_s3_reload(): assert_equal(status/100, 2) # topic stats - result = admin(['topic', 'stats', '--topic', topic_name1], get_config_cluster()) - parsed_result = json.loads(result[0]) - assert_equal(parsed_result['Topic Stats']['Entries'], 0) - assert_equal(result[1], 0) + get_stats_persistent_topic(topic_name1, 0) # create objects in the bucket (async) number_of_objects = 10 @@ -4505,12 +4540,7 @@ def test_persistent_ps_s3_reload(): # do a reload print('do reload') - result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster()) - assert_equal(result[1], 0) - result = admin(['period', 'update'], get_config_cluster()) - assert_equal(result[1], 0) - result = admin(['period', 'commit'], get_config_cluster()) - assert_equal(result[1], 0) + zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2) wait_for_queue_to_drain(topic_name1, http_port=http_port) # verify events @@ -4546,12 +4576,7 @@ def test_persistent_ps_s3_data_path_v2_migration(): http_port = random.randint(10000, 20000) # disable v2 notification - result = admin(['zonegroup', 'modify', '--disable-feature=notification_v2'], get_config_cluster()) - assert_equal(result[1], 0) - result = admin(['period', 'update'], get_config_cluster()) - assert_equal(result[1], 0) - result = admin(['period', 'commit'], get_config_cluster()) - assert_equal(result[1], 0) + zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2) # create bucket bucket_name = gen_bucket_name() @@ -4575,10 +4600,7 @@ def test_persistent_ps_s3_data_path_v2_migration(): assert_equal(status/100, 2) # topic stats - result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster()) - parsed_result = json.loads(result[0]) - assert_equal(parsed_result['Topic Stats']['Entries'], 0) - assert_equal(result[1], 0) + get_stats_persistent_topic(topic_name, 0) # create objects in the bucket (async) number_of_objects = 10 @@ -4597,34 +4619,23 @@ def test_persistent_ps_s3_data_path_v2_migration(): http_server = None try: # topic stats - result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster()) - parsed_result = json.loads(result[0]) - assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects) - assert_equal(result[1], 0) + get_stats_persistent_topic(topic_name, number_of_objects) # create topic to poll on topic_name_1 = topic_name + '_1' topic_conf_1 = PSTopicS3(conn, topic_name_1, zonegroup, endpoint_args=endpoint_args) # enable v2 notification - result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster()) - assert_equal(result[1], 0) - result = admin(['period', 'update'], get_config_cluster()) - assert_equal(result[1], 0) - result = admin(['period', 'commit'], get_config_cluster()) - assert_equal(result[1], 0) + zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2) # poll on topic_1 result = 1 while result != 0: time.sleep(1) - result = admin(['topic', 'rm', '--topic', topic_name_1], get_config_cluster())[1] + result = remove_topic(topic_name_1, allow_failure=True) # topic stats - result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster()) - parsed_result = json.loads(result[0]) - assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects) - assert_equal(result[1], 0) + get_stats_persistent_topic(topic_name, number_of_objects) # create more objects in the bucket (async) client_threads = [] @@ -4640,10 +4651,7 @@ def test_persistent_ps_s3_data_path_v2_migration(): print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') # topic stats - result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster()) - parsed_result = json.loads(result[0]) - assert_equal(parsed_result['Topic Stats']['Entries'], 2*number_of_objects) - assert_equal(result[1], 0) + get_stats_persistent_topic(topic_name, 2 * number_of_objects) # start an http server in a separate thread http_server = HTTPServerWithEvents((host, http_port)) @@ -4689,12 +4697,7 @@ def test_ps_s3_data_path_v2_migration(): http_server = HTTPServerWithEvents((host, http_port)) # disable v2 notification - result = admin(['zonegroup', 'modify', '--disable-feature=notification_v2'], get_config_cluster()) - assert_equal(result[1], 0) - result = admin(['period', 'update'], get_config_cluster()) - assert_equal(result[1], 0) - result = admin(['period', 'commit'], get_config_cluster()) - assert_equal(result[1], 0) + zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2) # create bucket bucket_name = gen_bucket_name() @@ -4740,20 +4743,13 @@ def test_ps_s3_data_path_v2_migration(): topic_conf_1 = PSTopicS3(conn, topic_name_1, zonegroup, endpoint_args=endpoint_args) # enable v2 notification - result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster()) - assert_equal(result[1], 0) - result = admin(['period', 'update'], get_config_cluster()) - assert_equal(result[1], 0) - result = admin(['period', 'commit'], get_config_cluster()) - assert_equal(result[1], 0) - + zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2) # poll on topic_1 result = 1 while result != 0: time.sleep(1) - result = admin(['topic', 'rm', '--topic', topic_name_1], get_config_cluster())[1] - + result = remove_topic(topic_name_1, allow_failure=True) # create more objects in the bucket (async) client_threads = [] @@ -4804,29 +4800,15 @@ def test_ps_s3_data_path_v2_large_migration(): # make sure there are no leftover topics delete_all_topics(conn, '', get_config_cluster()) for i in ['1', '2']: - access_key = str(time.time()) - secret_key = str(time.time()) - uid = UID_PREFIX + str(time.time()) tenant_id = 'kaboom_' + i - _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant_id, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], - get_config_cluster()) - assert_equal(result, 0) tenants_list.append(tenant_id) - conn = S3Connection(aws_access_key_id=access_key, - aws_secret_access_key=secret_key, - is_secure=False, port=get_config_port(), host=get_config_host(), - calling_format='boto.s3.connection.OrdinaryCallingFormat') + conn = connect_random_user(tenant_id) connections_list.append(conn) # make sure there are no leftover topics delete_all_topics(conn, tenant_id, get_config_cluster()) # disable v2 notification - result = admin(['zonegroup', 'modify', '--disable-feature=notification_v2'], get_config_cluster()) - assert_equal(result[1], 0) - result = admin(['period', 'update'], get_config_cluster()) - assert_equal(result[1], 0) - result = admin(['period', 'commit'], get_config_cluster()) - assert_equal(result[1], 0) + zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2) # create random port for the http server host = get_ip() @@ -4870,23 +4852,15 @@ def test_ps_s3_data_path_v2_large_migration(): polling_topics_conf.append(topic_conf) # enable v2 notification - result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster()) - assert_equal(result[1], 0) - result = admin(['period', 'update'], get_config_cluster()) - assert_equal(result[1], 0) - result = admin(['period', 'commit'], get_config_cluster()) - assert_equal(result[1], 0) + zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2) # poll on topic_1 for tenant, topic_conf in zip(tenants_list, polling_topics_conf): while True: - if tenant == '': - result = admin(['topic', 'rm', '--topic', topic_conf.topic_name], get_config_cluster()) - else: - result = admin(['topic', 'rm', '--topic', topic_conf.topic_name, '--tenant', tenant], get_config_cluster()) + result = remove_topic(topic_conf.topic_name, tenant, allow_failure=True) - if result[1] != 0: - print('migration in process... error: '+str(result[1])) + if result != 0: + print('migration in process... error: '+str(result)) else: break @@ -4894,21 +4868,11 @@ def test_ps_s3_data_path_v2_large_migration(): # check if we migrated all the topics for tenant in tenants_list: - if tenant == '': - topics_result = admin(['topic', 'list'], get_config_cluster()) - else: - topics_result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster()) - topics_json = json.loads(topics_result[0]) - assert_equal(len(topics_json['topics']), 1) + list_topics(1, tenant) # check if we migrated all the notifications for tenant, bucket in zip(tenants_list, buckets_list): - if tenant == '': - result = admin(['notification', 'list', '--bucket', bucket.name], get_config_cluster()) - else: - result = admin(['notification', 'list', '--bucket', bucket.name, '--tenant', tenant], get_config_cluster()) - parsed_result = json.loads(result[0]) - assert_equal(len(parsed_result['notifications']), num_of_s3_notifications) + list_notifications(bucket.name, num_of_s3_notifications) # cleanup for s3_notification_conf in s3_notification_conf_list: @@ -4935,26 +4899,12 @@ def test_ps_s3_data_path_v2_mixed_migration(): delete_all_topics(conn, '', get_config_cluster()) # make sure that we start at v2 - result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster()) - assert_equal(result[1], 0) - result = admin(['period', 'update'], get_config_cluster()) - assert_equal(result[1], 0) - result = admin(['period', 'commit'], get_config_cluster()) - assert_equal(result[1], 0) + zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2) for i in ['1', '2']: - access_key = str(time.time()) - secret_key = str(time.time()) - uid = UID_PREFIX + str(time.time()) tenant_id = 'kaboom_' + i - _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant_id, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], - get_config_cluster()) - assert_equal(result, 0) tenants_list.append(tenant_id) - conn = S3Connection(aws_access_key_id=access_key, - aws_secret_access_key=secret_key, - is_secure=False, port=get_config_port(), host=get_config_host(), - calling_format='boto.s3.connection.OrdinaryCallingFormat') + conn = connect_random_user(tenant_id) connections_list.append(conn) # make sure there are no leftover topics delete_all_topics(conn, tenant_id, get_config_cluster()) @@ -4994,12 +4944,7 @@ def test_ps_s3_data_path_v2_mixed_migration(): assert_equal(status / 100, 2) # disable v2 notification - result = admin(['zonegroup', 'modify', '--disable-feature=notification_v2'], get_config_cluster()) - assert_equal(result[1], 0) - result = admin(['period', 'update'], get_config_cluster()) - assert_equal(result[1], 0) - result = admin(['period', 'commit'], get_config_cluster()) - assert_equal(result[1], 0) + zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2) # create s3 topic created_version = '_created_v1' @@ -5032,22 +4977,14 @@ def test_ps_s3_data_path_v2_mixed_migration(): polling_topics_conf.append(topic_conf) # enable v2 notification - result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster()) - assert_equal(result[1], 0) - result = admin(['period', 'update'], get_config_cluster()) - assert_equal(result[1], 0) - result = admin(['period', 'commit'], get_config_cluster()) - assert_equal(result[1], 0) + zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2) # poll on topic_1 for tenant, topic_conf in zip(tenants_list, polling_topics_conf): while True: - if tenant == '': - result = admin(['topic', 'rm', '--topic', topic_conf.topic_name], get_config_cluster()) - else: - result = admin(['topic', 'rm', '--topic', topic_conf.topic_name, '--tenant', tenant], get_config_cluster()) + result = remove_topic(topic_conf.topic_name, tenant, allow_failure=True) - if result[1] != 0: + if result != 0: print(result) else: break @@ -5056,21 +4993,11 @@ def test_ps_s3_data_path_v2_mixed_migration(): # check if we migrated all the topics for tenant in tenants_list: - if tenant == '': - topics_result = admin(['topic', 'list'], get_config_cluster()) - else: - topics_result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster()) - topics_json = json.loads(topics_result[0]) - assert_equal(len(topics_json['topics']), 2) + list_topics(2, tenant) # check if we migrated all the notifications for tenant, bucket in zip(tenants_list, buckets_list): - if tenant == '': - notifications_result = admin(['notification', 'list', '--bucket', bucket.name], get_config_cluster()) - else: - notifications_result = admin(['notification', 'list', '--bucket', bucket.name, '--tenant', tenant], get_config_cluster()) - notifications_json = json.loads(notifications_result[0]) - assert_equal(len(notifications_json['notifications']), 2) + list_notifications(bucket.name, 2) # cleanup for s3_notification_conf in s3_notification_conf_list: