]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
RGW\bucket notification tests: code dedup for admin commands 57626/head
authorAli Masarwa <amasarwa@redhat.com>
Wed, 22 May 2024 10:55:00 +0000 (13:55 +0300)
committerAli Masarwa <amasarwa@redhat.com>
Tue, 4 Jun 2024 07:33:15 +0000 (10:33 +0300)
Signed-off-by: Ali Masarwa <amasarwa@redhat.com>
src/test/rgw/bucket_notification/test_bn.py

index f7e12f25cda5560aab7a4612c9aa19a7cd26ca50..61b7374b7ef0f9d0e8d6937ddbfae6a1d120f498 100644 (file)
@@ -551,6 +551,114 @@ def another_user(user=None, tenant=None, account=None):
                       calling_format='boto.s3.connection.OrdinaryCallingFormat')
     return conn, arn
 
+
+def list_topics(assert_len=None, tenant=''):
+    if tenant == '':
+        result = admin(['topic', 'list'], get_config_cluster())
+    else:
+        result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster())
+    parsed_result = json.loads(result[0])
+    if assert_len:
+        assert_equal(len(parsed_result['topics']), assert_len)
+    return parsed_result
+
+
+def get_stats_persistent_topic(topic_name, assert_entries_number=None):
+    result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
+    assert_equal(result[1], 0)
+    parsed_result = json.loads(result[0])
+    if assert_entries_number:
+        assert_equal(parsed_result['Topic Stats']['Entries'], assert_entries_number)
+    return parsed_result
+
+
+def get_topic(topic_name, tenant='', allow_failure=False):
+    if tenant == '':
+        result = admin(['topic', 'get', '--topic', topic_name], get_config_cluster())
+    else:
+        result = admin(['topic', 'get', '--topic', topic_name, '--tenant', tenant], get_config_cluster())
+    if allow_failure:
+        return result
+    assert_equal(result[1], 0)
+    parsed_result = json.loads(result[0])
+    return parsed_result
+
+
+def remove_topic(topic_name, tenant='', allow_failure=False):
+    if tenant == '':
+        result = admin(['topic', 'rm', '--topic', topic_name], get_config_cluster())
+    else:
+        result = admin(['topic', 'rm', '--topic', topic_name, '--tenant', tenant], get_config_cluster())
+    if not allow_failure:
+        assert_equal(result[1], 0)
+    return result[1]
+
+
+def list_notifications(bucket_name, assert_len=None, tenant=''):
+    if tenant == '':
+        result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster())
+    else:
+        result = admin(['notification', 'list', '--bucket', bucket_name, '--tenant', tenant], get_config_cluster())
+    parsed_result = json.loads(result[0])
+    if assert_len:
+        assert_equal(len(parsed_result['notifications']), assert_len)
+    return parsed_result
+
+
+def get_notification(bucket_name,  notification_name, tenant=''):
+    if tenant == '':
+        result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name], get_config_cluster())
+    else:
+        result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name, '--tenant', tenant], get_config_cluster())
+    assert_equal(result[1], 0)
+    parsed_result = json.loads(result[0])
+    assert_equal(parsed_result['Id'], notification_name)
+    return parsed_result
+
+
+def remove_notification(bucket_name, notification_name='', tenant='', allow_failure=False):
+    args = ['notification', 'rm', '--bucket', bucket_name]
+    if notification_name != '':
+        args.extend(['--notification-id', notification_name])
+    if tenant != '':
+        args.extend(['--tenant', tenant])
+    result = admin(args, get_config_cluster())
+    if not allow_failure:
+        assert_equal(result[1], 0)
+    return result[1]
+
+
+zonegroup_feature_notification_v2 = 'notification_v2'
+
+
+def zonegroup_modify_feature(enable, feature_name):
+    if enable:
+        command = '--enable-feature='+feature_name
+    else:
+        command = '--disable-feature='+feature_name
+    result = admin(['zonegroup', 'modify', command], get_config_cluster())
+    assert_equal(result[1], 0)
+    result = admin(['period', 'update'], get_config_cluster())
+    assert_equal(result[1], 0)
+    result = admin(['period', 'commit'], get_config_cluster())
+    assert_equal(result[1], 0)
+
+
+def connect_random_user(tenant=''):
+    access_key = str(time.time())
+    secret_key = str(time.time())
+    uid = UID_PREFIX + str(time.time())
+    if tenant == '':
+        _, result = admin(['user', 'create', '--uid', uid, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
+    else:
+        _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
+    assert_equal(result, 0)
+    conn = S3Connection(aws_access_key_id=access_key,
+                        aws_secret_access_key=secret_key,
+                        is_secure=False, port=get_config_port(), host=get_config_host(),
+                        calling_format='boto.s3.connection.OrdinaryCallingFormat')
+    return conn
+
 ##############
 # bucket notifications tests
 ##############
@@ -560,16 +668,8 @@ def another_user(user=None, tenant=None, account=None):
 def test_ps_s3_topic_on_master():
     """ test s3 topics set/get/delete on master """
     
-    access_key = str(time.time())
-    secret_key = str(time.time())
-    uid = UID_PREFIX + str(time.time())
     tenant = 'kaboom'
-    _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
-    assert_equal(result, 0)
-    conn = S3Connection(aws_access_key_id=access_key,
-                  aws_secret_access_key=secret_key,
-                      is_secure=False, port=get_config_port(), host=get_config_host(), 
-                      calling_format='boto.s3.connection.OrdinaryCallingFormat')
+    conn = connect_random_user(tenant)
     
     # make sure there are no leftover topics
     delete_all_topics(conn, tenant, get_config_cluster())
@@ -634,16 +734,8 @@ def test_ps_s3_topic_on_master():
 def test_ps_s3_topic_admin_on_master():
     """ test s3 topics set/get/delete on master """
     
-    access_key = str(time.time())
-    secret_key = str(time.time())
-    uid = UID_PREFIX + str(time.time())
     tenant = 'kaboom'
-    _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
-    assert_equal(result, 0)
-    conn = S3Connection(aws_access_key_id=access_key,
-                  aws_secret_access_key=secret_key,
-                      is_secure=False, port=get_config_port(), host=get_config_host(), 
-                      calling_format='boto.s3.connection.OrdinaryCallingFormat')
+    conn = connect_random_user(tenant)
     
     # make sure there are no leftover topics
     delete_all_topics(conn, tenant, get_config_cluster())
@@ -674,36 +766,28 @@ def test_ps_s3_topic_admin_on_master():
                  'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3')
 
     # get topic 3 via commandline
-    result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant], get_config_cluster())
-    parsed_result = json.loads(result[0])
+    parsed_result = get_topic(topic_name+'_3', tenant)
     assert_equal(parsed_result['arn'], topic_arn3)
     matches = [tenant, UID_PREFIX]
     assert_true( all([x in parsed_result['owner'] for x in matches]))
 
     # delete topic 3
-    _, result = admin(['topic', 'rm', '--topic', topic_name+'_3', '--tenant', tenant], get_config_cluster())
-    assert_equal(result, 0)
+    remove_topic(topic_name + '_3', tenant)
 
     # try to get a deleted topic
-    _, result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant], get_config_cluster())
+    _, result = get_topic(topic_name + '_3', tenant, allow_failure=True)
     print('"topic not found" error is expected')
     assert_equal(result, 2)
 
     # get the remaining 2 topics
-    result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster())
-    parsed_result = json.loads(result[0])
-    assert_equal(len(parsed_result['topics']), 2)
+    list_topics(2, tenant)
 
     # delete topics
-    _, result = admin(['topic', 'rm', '--topic', topic_name+'_1', '--tenant', tenant], get_config_cluster())
-    assert_equal(result, 0)
-    _, result = admin(['topic', 'rm', '--topic', topic_name+'_2', '--tenant', tenant], get_config_cluster())
-    assert_equal(result, 0)
+    remove_topic(topic_name + '_1', tenant)
+    remove_topic(topic_name + '_2', tenant)
 
     # get topic list, make sure it is empty
-    result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster())
-    parsed_result = json.loads(result[0])
-    assert_equal(len(parsed_result['topics']), 0)
+    list_topics(0, tenant)
 
 
 def notification_configuration(with_cli):
@@ -744,11 +828,7 @@ def notification_configuration(with_cli):
 
     # get notification 1
     if with_cli:
-        result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name+'_1'], get_config_cluster())
-        parsed_result = json.loads(result[0])
-        assert_equal(parsed_result['Id'], notification_name+'_1')
-        assert_equal(parsed_result['TopicArn'], topic_arn)
-        assert_equal(result[1], 0)
+        get_notification(bucket_name, notification_name+'_1')
     else:
         response, status = s3_notification_conf.get_config(notification=notification_name+'_1')
         assert_equal(status/100, 2)
@@ -756,10 +836,7 @@ def notification_configuration(with_cli):
 
     # list notification
     if with_cli:
-        result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster())
-        parsed_result = json.loads(result[0])
-        assert_equal(len(parsed_result['notifications']), 3)
-        assert_equal(result[1], 0)
+        list_notifications(bucket_name, 3)
     else:
         result, status = s3_notification_conf.get_config()
         assert_equal(status, 200)
@@ -767,18 +844,14 @@ def notification_configuration(with_cli):
 
     # delete notification 2
     if with_cli:
-        _, result = admin(['notification', 'rm', '--bucket', bucket_name, '--notification-id', notification_name+'_2'], get_config_cluster())
-        assert_equal(result, 0)
+        remove_notification(bucket_name, notification_name + '_2')
     else:
         _, status = s3_notification_conf.del_config(notification=notification_name+'_2')
         assert_equal(status/100, 2)
 
     # list notification
     if with_cli:
-        result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster())
-        parsed_result = json.loads(result[0])
-        assert_equal(len(parsed_result['notifications']), 2)
-        assert_equal(result[1], 0)
+        list_notifications(bucket_name, 2)
     else:
         result, status = s3_notification_conf.get_config()
         assert_equal(status, 200)
@@ -786,17 +859,13 @@ def notification_configuration(with_cli):
 
     # delete notifications
     if with_cli:
-        _, result = admin(['notification', 'rm', '--bucket', bucket_name], get_config_cluster())
-        assert_equal(result, 0)
+        remove_notification(bucket_name)
     else:
         _, status = s3_notification_conf.del_config()
         assert_equal(status/100, 2)
 
     # list notification, make sure it is empty
-    result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster())
-    parsed_result = json.loads(result[0])
-    assert_equal(len(parsed_result['notifications']), 0)
-    assert_equal(result[1], 0)
+    list_notifications(bucket_name, 0)
 
     # cleanup
     topic_conf.del_config()
@@ -2902,10 +2971,7 @@ def test_ps_s3_persistent_topic_stats():
     assert_equal(status/100, 2)
 
     # topic stats
-    result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
-    parsed_result = json.loads(result[0])
-    assert_equal(parsed_result['Topic Stats']['Entries'], 0)
-    assert_equal(result[1], 0)
+    get_stats_persistent_topic(topic_name, 0)
 
     # create objects in the bucket (async)
     number_of_objects = 20
@@ -2922,10 +2988,7 @@ def test_ps_s3_persistent_topic_stats():
     print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
 
     # topic stats
-    result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
-    parsed_result = json.loads(result[0])
-    assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects)
-    assert_equal(result[1], 0)
+    get_stats_persistent_topic(topic_name, number_of_objects)
 
     # delete objects from the bucket
     client_threads = []
@@ -2939,10 +3002,7 @@ def test_ps_s3_persistent_topic_stats():
     print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
 
     # topic stats
-    result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
-    parsed_result = json.loads(result[0])
-    assert_equal(parsed_result['Topic Stats']['Entries'], 2*number_of_objects)
-    assert_equal(result[1], 0)
+    get_stats_persistent_topic(topic_name, 2 * number_of_objects)
 
     # start an http server in a separate thread
     http_server = HTTPServerWithEvents((host, port))
@@ -2991,18 +3051,13 @@ def ps_s3_persistent_topic_configs(persistency_time, config_dict):
     time.sleep(delay)
     http_server.close()
     # topic get
-    result = admin(['topic', 'get', '--topic', topic_name], get_config_cluster())
-    parsed_result = json.loads(result[0])
+    parsed_result = get_topic(topic_name)
     parsed_result_dest = parsed_result["dest"]
     for key, value in config_dict.items():
         assert_equal(parsed_result_dest[key], str(value))
-    assert_equal(result[1], 0)
 
     # topic stats
-    result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
-    parsed_result = json.loads(result[0])
-    assert_equal(parsed_result['Topic Stats']['Entries'], 0)
-    assert_equal(result[1], 0)
+    get_stats_persistent_topic(topic_name, 0)
 
     # create objects in the bucket (async)
     number_of_objects = 10
@@ -3019,17 +3074,11 @@ def ps_s3_persistent_topic_configs(persistency_time, config_dict):
     print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
 
     # topic stats
-    result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
-    parsed_result = json.loads(result[0])
-    assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects)
-    assert_equal(result[1], 0)
+    get_stats_persistent_topic(topic_name, number_of_objects)
 
     # wait as much as ttl and check if the persistent topics have expired
     time.sleep(persistency_time)
-    result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
-    parsed_result = json.loads(result[0])
-    assert_equal(parsed_result['Topic Stats']['Entries'], 0)
-    assert_equal(result[1], 0)
+    get_stats_persistent_topic(topic_name, 0)
 
     # delete objects from the bucket
     client_threads = []
@@ -3048,17 +3097,11 @@ def ps_s3_persistent_topic_configs(persistency_time, config_dict):
             start_time = time.time()
 
     # topic stats
-    result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
-    parsed_result = json.loads(result[0])
-    assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects)
-    assert_equal(result[1], 0)
+    get_stats_persistent_topic(topic_name, number_of_objects)
 
     # wait as much as ttl and check if the persistent topics have expired
     time.sleep(persistency_time)
-    result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
-    parsed_result = json.loads(result[0])
-    assert_equal(parsed_result['Topic Stats']['Entries'], 0)
-    assert_equal(result[1], 0)
+    get_stats_persistent_topic(topic_name, 0)
 
     # cleanup
     s3_notification_conf.del_config()
@@ -4430,12 +4473,7 @@ def test_persistent_ps_s3_reload():
     delete_all_topics(conn, '', get_config_cluster())
 
     # disable v2 notification
-    result = admin(['zonegroup', 'modify', '--disable-feature=notification_v2'], get_config_cluster())
-    assert_equal(result[1], 0)
-    result = admin(['period', 'update'], get_config_cluster())
-    assert_equal(result[1], 0)
-    result = admin(['period', 'commit'], get_config_cluster())
-    assert_equal(result[1], 0)
+    zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2)
 
     # create random port for the http server
     host = get_ip()
@@ -4470,10 +4508,7 @@ def test_persistent_ps_s3_reload():
     assert_equal(status/100, 2)
 
     # topic stats
-    result = admin(['topic', 'stats', '--topic', topic_name1], get_config_cluster())
-    parsed_result = json.loads(result[0])
-    assert_equal(parsed_result['Topic Stats']['Entries'], 0)
-    assert_equal(result[1], 0)
+    get_stats_persistent_topic(topic_name1, 0)
 
     # create objects in the bucket (async)
     number_of_objects = 10
@@ -4505,12 +4540,7 @@ def test_persistent_ps_s3_reload():
 
     # do a reload
     print('do reload')
-    result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster())
-    assert_equal(result[1], 0)
-    result = admin(['period', 'update'], get_config_cluster())
-    assert_equal(result[1], 0)
-    result = admin(['period', 'commit'], get_config_cluster())
-    assert_equal(result[1], 0)
+    zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2)
 
     wait_for_queue_to_drain(topic_name1, http_port=http_port)
     # verify events
@@ -4546,12 +4576,7 @@ def test_persistent_ps_s3_data_path_v2_migration():
     http_port = random.randint(10000, 20000)
 
     # disable v2 notification
-    result = admin(['zonegroup', 'modify', '--disable-feature=notification_v2'], get_config_cluster())
-    assert_equal(result[1], 0)
-    result = admin(['period', 'update'], get_config_cluster())
-    assert_equal(result[1], 0)
-    result = admin(['period', 'commit'], get_config_cluster())
-    assert_equal(result[1], 0)
+    zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2)
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -4575,10 +4600,7 @@ def test_persistent_ps_s3_data_path_v2_migration():
     assert_equal(status/100, 2)
 
     # topic stats
-    result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
-    parsed_result = json.loads(result[0])
-    assert_equal(parsed_result['Topic Stats']['Entries'], 0)
-    assert_equal(result[1], 0)
+    get_stats_persistent_topic(topic_name, 0)
 
     # create objects in the bucket (async)
     number_of_objects = 10
@@ -4597,34 +4619,23 @@ def test_persistent_ps_s3_data_path_v2_migration():
     http_server = None
     try:
         # topic stats
-        result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
-        parsed_result = json.loads(result[0])
-        assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects)
-        assert_equal(result[1], 0)
+        get_stats_persistent_topic(topic_name, number_of_objects)
 
         # create topic to poll on
         topic_name_1 = topic_name + '_1'
         topic_conf_1 = PSTopicS3(conn, topic_name_1, zonegroup, endpoint_args=endpoint_args)
 
         # enable v2 notification
-        result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster())
-        assert_equal(result[1], 0)
-        result = admin(['period', 'update'], get_config_cluster())
-        assert_equal(result[1], 0)
-        result = admin(['period', 'commit'], get_config_cluster())
-        assert_equal(result[1], 0)
+        zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2)
 
         # poll on topic_1
         result = 1
         while result != 0:
             time.sleep(1)
-            result = admin(['topic', 'rm', '--topic', topic_name_1], get_config_cluster())[1]
+            result = remove_topic(topic_name_1, allow_failure=True)
 
         # topic stats
-        result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
-        parsed_result = json.loads(result[0])
-        assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects)
-        assert_equal(result[1], 0)
+        get_stats_persistent_topic(topic_name, number_of_objects)
 
         # create more objects in the bucket (async)
         client_threads = []
@@ -4640,10 +4651,7 @@ def test_persistent_ps_s3_data_path_v2_migration():
         print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
 
         # topic stats
-        result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
-        parsed_result = json.loads(result[0])
-        assert_equal(parsed_result['Topic Stats']['Entries'], 2*number_of_objects)
-        assert_equal(result[1], 0)
+        get_stats_persistent_topic(topic_name, 2 * number_of_objects)
 
         # start an http server in a separate thread
         http_server = HTTPServerWithEvents((host, http_port))
@@ -4689,12 +4697,7 @@ def test_ps_s3_data_path_v2_migration():
     http_server = HTTPServerWithEvents((host, http_port))
 
     # disable v2 notification
-    result = admin(['zonegroup', 'modify', '--disable-feature=notification_v2'], get_config_cluster())
-    assert_equal(result[1], 0)
-    result = admin(['period', 'update'], get_config_cluster())
-    assert_equal(result[1], 0)
-    result = admin(['period', 'commit'], get_config_cluster())
-    assert_equal(result[1], 0)
+    zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2)
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -4740,20 +4743,13 @@ def test_ps_s3_data_path_v2_migration():
         topic_conf_1 = PSTopicS3(conn, topic_name_1, zonegroup, endpoint_args=endpoint_args)
 
         # enable v2 notification
-        result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster())
-        assert_equal(result[1], 0)
-        result = admin(['period', 'update'], get_config_cluster())
-        assert_equal(result[1], 0)
-        result = admin(['period', 'commit'], get_config_cluster())
-        assert_equal(result[1], 0)
-
+        zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2)
 
         # poll on topic_1
         result = 1
         while result != 0:
             time.sleep(1)
-            result = admin(['topic', 'rm', '--topic', topic_name_1], get_config_cluster())[1]
-
+            result = remove_topic(topic_name_1, allow_failure=True)
 
         # create more objects in the bucket (async)
         client_threads = []
@@ -4804,29 +4800,15 @@ def test_ps_s3_data_path_v2_large_migration():
     # make sure there are no leftover topics
     delete_all_topics(conn, '', get_config_cluster())
     for i in ['1', '2']:
-        access_key = str(time.time())
-        secret_key = str(time.time())
-        uid = UID_PREFIX + str(time.time())
         tenant_id = 'kaboom_' + i
-        _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant_id, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], 
-                          get_config_cluster())
-        assert_equal(result, 0)
         tenants_list.append(tenant_id)
-        conn = S3Connection(aws_access_key_id=access_key,
-                            aws_secret_access_key=secret_key,
-                            is_secure=False, port=get_config_port(), host=get_config_host(),
-                            calling_format='boto.s3.connection.OrdinaryCallingFormat')
+        conn = connect_random_user(tenant_id)
         connections_list.append(conn)
         # make sure there are no leftover topics
         delete_all_topics(conn, tenant_id, get_config_cluster())
 
     # disable v2 notification
-    result = admin(['zonegroup', 'modify', '--disable-feature=notification_v2'], get_config_cluster())
-    assert_equal(result[1], 0)
-    result = admin(['period', 'update'], get_config_cluster())
-    assert_equal(result[1], 0)
-    result = admin(['period', 'commit'], get_config_cluster())
-    assert_equal(result[1], 0)
+    zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2)
 
     # create random port for the http server
     host = get_ip()
@@ -4870,23 +4852,15 @@ def test_ps_s3_data_path_v2_large_migration():
         polling_topics_conf.append(topic_conf)
 
     # enable v2 notification
-    result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster())
-    assert_equal(result[1], 0)
-    result = admin(['period', 'update'], get_config_cluster())
-    assert_equal(result[1], 0)
-    result = admin(['period', 'commit'], get_config_cluster())
-    assert_equal(result[1], 0)
+    zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2)
 
     # poll on topic_1
     for tenant, topic_conf in zip(tenants_list, polling_topics_conf):
         while True:
-            if tenant == '':
-                result = admin(['topic', 'rm', '--topic', topic_conf.topic_name], get_config_cluster())
-            else:
-                result = admin(['topic', 'rm', '--topic', topic_conf.topic_name, '--tenant', tenant], get_config_cluster())
+            result = remove_topic(topic_conf.topic_name, tenant, allow_failure=True)
 
-            if result[1] != 0:
-                print('migration in process... error: '+str(result[1]))
+            if result != 0:
+                print('migration in process... error: '+str(result))
             else:
                 break
 
@@ -4894,21 +4868,11 @@ def test_ps_s3_data_path_v2_large_migration():
 
     # check if we migrated all the topics
     for tenant in tenants_list:
-        if tenant == '':
-            topics_result = admin(['topic', 'list'], get_config_cluster())
-        else:
-            topics_result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster())
-        topics_json = json.loads(topics_result[0])
-        assert_equal(len(topics_json['topics']), 1)
+        list_topics(1, tenant)
 
     # check if we migrated all the notifications
     for tenant, bucket in zip(tenants_list, buckets_list):
-        if tenant == '':
-            result = admin(['notification', 'list', '--bucket', bucket.name], get_config_cluster())
-        else:
-            result = admin(['notification', 'list', '--bucket', bucket.name, '--tenant', tenant], get_config_cluster())
-        parsed_result = json.loads(result[0])
-        assert_equal(len(parsed_result['notifications']), num_of_s3_notifications)
+        list_notifications(bucket.name, num_of_s3_notifications)
 
     # cleanup
     for s3_notification_conf in s3_notification_conf_list:
@@ -4935,26 +4899,12 @@ def test_ps_s3_data_path_v2_mixed_migration():
     delete_all_topics(conn, '', get_config_cluster())
     
     # make sure that we start at v2
-    result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster())
-    assert_equal(result[1], 0)
-    result = admin(['period', 'update'], get_config_cluster())
-    assert_equal(result[1], 0)
-    result = admin(['period', 'commit'], get_config_cluster())
-    assert_equal(result[1], 0)
+    zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2)
 
     for i in ['1', '2']:
-        access_key = str(time.time())
-        secret_key = str(time.time())
-        uid = UID_PREFIX + str(time.time())
         tenant_id = 'kaboom_' + i
-        _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant_id, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], 
-                          get_config_cluster())
-        assert_equal(result, 0)
         tenants_list.append(tenant_id)
-        conn = S3Connection(aws_access_key_id=access_key,
-                            aws_secret_access_key=secret_key,
-                            is_secure=False, port=get_config_port(), host=get_config_host(),
-                            calling_format='boto.s3.connection.OrdinaryCallingFormat')
+        conn = connect_random_user(tenant_id)
         connections_list.append(conn)
         # make sure there are no leftover topics
         delete_all_topics(conn, tenant_id, get_config_cluster())
@@ -4994,12 +4944,7 @@ def test_ps_s3_data_path_v2_mixed_migration():
         assert_equal(status / 100, 2)
 
     # disable v2 notification
-    result = admin(['zonegroup', 'modify', '--disable-feature=notification_v2'], get_config_cluster())
-    assert_equal(result[1], 0)
-    result = admin(['period', 'update'], get_config_cluster())
-    assert_equal(result[1], 0)
-    result = admin(['period', 'commit'], get_config_cluster())
-    assert_equal(result[1], 0)
+    zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2)
 
     # create s3 topic
     created_version = '_created_v1'
@@ -5032,22 +4977,14 @@ def test_ps_s3_data_path_v2_mixed_migration():
         polling_topics_conf.append(topic_conf)
 
     # enable v2 notification
-    result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster())
-    assert_equal(result[1], 0)
-    result = admin(['period', 'update'], get_config_cluster())
-    assert_equal(result[1], 0)
-    result = admin(['period', 'commit'], get_config_cluster())
-    assert_equal(result[1], 0)
+    zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2)
 
     # poll on topic_1
     for tenant, topic_conf in zip(tenants_list, polling_topics_conf):
         while True:
-            if tenant == '':
-                result = admin(['topic', 'rm', '--topic', topic_conf.topic_name], get_config_cluster())
-            else:
-                result = admin(['topic', 'rm', '--topic', topic_conf.topic_name, '--tenant', tenant], get_config_cluster())
+            result = remove_topic(topic_conf.topic_name, tenant, allow_failure=True)
 
-            if result[1] != 0:
+            if result != 0:
                 print(result)
             else:
                 break
@@ -5056,21 +4993,11 @@ def test_ps_s3_data_path_v2_mixed_migration():
 
     # check if we migrated all the topics
     for tenant in tenants_list:
-        if tenant == '':
-            topics_result = admin(['topic', 'list'], get_config_cluster())
-        else:
-            topics_result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster())
-        topics_json = json.loads(topics_result[0])
-        assert_equal(len(topics_json['topics']), 2)
+        list_topics(2, tenant)
 
     # check if we migrated all the notifications
     for tenant, bucket in zip(tenants_list, buckets_list):
-        if tenant == '':
-            notifications_result = admin(['notification', 'list', '--bucket', bucket.name], get_config_cluster())
-        else:
-            notifications_result = admin(['notification', 'list', '--bucket', bucket.name, '--tenant', tenant], get_config_cluster())
-        notifications_json = json.loads(notifications_result[0])
-        assert_equal(len(notifications_json['notifications']), 2)
+        list_notifications(bucket.name, 2)
 
     # cleanup
     for s3_notification_conf in s3_notification_conf_list: