calling_format='boto.s3.connection.OrdinaryCallingFormat')
return conn, arn
+
+def list_topics(assert_len=None, tenant=''):
+ if tenant == '':
+ result = admin(['topic', 'list'], get_config_cluster())
+ else:
+ result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster())
+ parsed_result = json.loads(result[0])
+ if assert_len:
+ assert_equal(len(parsed_result['topics']), assert_len)
+ return parsed_result
+
+
+def get_stats_persistent_topic(topic_name, assert_entries_number=None):
+ result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
+ assert_equal(result[1], 0)
+ parsed_result = json.loads(result[0])
+ if assert_entries_number:
+ assert_equal(parsed_result['Topic Stats']['Entries'], assert_entries_number)
+ return parsed_result
+
+
+def get_topic(topic_name, tenant='', allow_failure=False):
+ if tenant == '':
+ result = admin(['topic', 'get', '--topic', topic_name], get_config_cluster())
+ else:
+ result = admin(['topic', 'get', '--topic', topic_name, '--tenant', tenant], get_config_cluster())
+ if allow_failure:
+ return result
+ assert_equal(result[1], 0)
+ parsed_result = json.loads(result[0])
+ return parsed_result
+
+
+def remove_topic(topic_name, tenant='', allow_failure=False):
+ if tenant == '':
+ result = admin(['topic', 'rm', '--topic', topic_name], get_config_cluster())
+ else:
+ result = admin(['topic', 'rm', '--topic', topic_name, '--tenant', tenant], get_config_cluster())
+ if not allow_failure:
+ assert_equal(result[1], 0)
+ return result[1]
+
+
+def list_notifications(bucket_name, assert_len=None, tenant=''):
+ if tenant == '':
+ result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster())
+ else:
+ result = admin(['notification', 'list', '--bucket', bucket_name, '--tenant', tenant], get_config_cluster())
+ parsed_result = json.loads(result[0])
+ if assert_len:
+ assert_equal(len(parsed_result['notifications']), assert_len)
+ return parsed_result
+
+
+def get_notification(bucket_name, notification_name, tenant=''):
+ if tenant == '':
+ result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name], get_config_cluster())
+ else:
+ result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name, '--tenant', tenant], get_config_cluster())
+ assert_equal(result[1], 0)
+ parsed_result = json.loads(result[0])
+ assert_equal(parsed_result['Id'], notification_name)
+ return parsed_result
+
+
+def remove_notification(bucket_name, notification_name='', tenant='', allow_failure=False):
+ args = ['notification', 'rm', '--bucket', bucket_name]
+ if notification_name != '':
+ args.extend(['--notification-id', notification_name])
+ if tenant != '':
+ args.extend(['--tenant', tenant])
+ result = admin(args, get_config_cluster())
+ if not allow_failure:
+ assert_equal(result[1], 0)
+ return result[1]
+
+
+zonegroup_feature_notification_v2 = 'notification_v2'
+
+
+def zonegroup_modify_feature(enable, feature_name):
+ if enable:
+ command = '--enable-feature='+feature_name
+ else:
+ command = '--disable-feature='+feature_name
+ result = admin(['zonegroup', 'modify', command], get_config_cluster())
+ assert_equal(result[1], 0)
+ result = admin(['period', 'update'], get_config_cluster())
+ assert_equal(result[1], 0)
+ result = admin(['period', 'commit'], get_config_cluster())
+ assert_equal(result[1], 0)
+
+
+def connect_random_user(tenant=''):
+ access_key = str(time.time())
+ secret_key = str(time.time())
+ uid = UID_PREFIX + str(time.time())
+ if tenant == '':
+ _, result = admin(['user', 'create', '--uid', uid, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
+ else:
+ _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
+ assert_equal(result, 0)
+ conn = S3Connection(aws_access_key_id=access_key,
+ aws_secret_access_key=secret_key,
+ is_secure=False, port=get_config_port(), host=get_config_host(),
+ calling_format='boto.s3.connection.OrdinaryCallingFormat')
+ return conn
+
##############
# bucket notifications tests
##############
def test_ps_s3_topic_on_master():
""" test s3 topics set/get/delete on master """
- access_key = str(time.time())
- secret_key = str(time.time())
- uid = UID_PREFIX + str(time.time())
tenant = 'kaboom'
- _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
- assert_equal(result, 0)
- conn = S3Connection(aws_access_key_id=access_key,
- aws_secret_access_key=secret_key,
- is_secure=False, port=get_config_port(), host=get_config_host(),
- calling_format='boto.s3.connection.OrdinaryCallingFormat')
+ conn = connect_random_user(tenant)
# make sure there are no leftover topics
delete_all_topics(conn, tenant, get_config_cluster())
def test_ps_s3_topic_admin_on_master():
""" test s3 topics set/get/delete on master """
- access_key = str(time.time())
- secret_key = str(time.time())
- uid = UID_PREFIX + str(time.time())
tenant = 'kaboom'
- _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
- assert_equal(result, 0)
- conn = S3Connection(aws_access_key_id=access_key,
- aws_secret_access_key=secret_key,
- is_secure=False, port=get_config_port(), host=get_config_host(),
- calling_format='boto.s3.connection.OrdinaryCallingFormat')
+ conn = connect_random_user(tenant)
# make sure there are no leftover topics
delete_all_topics(conn, tenant, get_config_cluster())
'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3')
# get topic 3 via commandline
- result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant], get_config_cluster())
- parsed_result = json.loads(result[0])
+ parsed_result = get_topic(topic_name+'_3', tenant)
assert_equal(parsed_result['arn'], topic_arn3)
matches = [tenant, UID_PREFIX]
assert_true( all([x in parsed_result['owner'] for x in matches]))
# delete topic 3
- _, result = admin(['topic', 'rm', '--topic', topic_name+'_3', '--tenant', tenant], get_config_cluster())
- assert_equal(result, 0)
+ remove_topic(topic_name + '_3', tenant)
# try to get a deleted topic
- _, result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant], get_config_cluster())
+ _, result = get_topic(topic_name + '_3', tenant, allow_failure=True)
print('"topic not found" error is expected')
assert_equal(result, 2)
# get the remaining 2 topics
- result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster())
- parsed_result = json.loads(result[0])
- assert_equal(len(parsed_result['topics']), 2)
+ list_topics(2, tenant)
# delete topics
- _, result = admin(['topic', 'rm', '--topic', topic_name+'_1', '--tenant', tenant], get_config_cluster())
- assert_equal(result, 0)
- _, result = admin(['topic', 'rm', '--topic', topic_name+'_2', '--tenant', tenant], get_config_cluster())
- assert_equal(result, 0)
+ remove_topic(topic_name + '_1', tenant)
+ remove_topic(topic_name + '_2', tenant)
# get topic list, make sure it is empty
- result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster())
- parsed_result = json.loads(result[0])
- assert_equal(len(parsed_result['topics']), 0)
+ list_topics(0, tenant)
def notification_configuration(with_cli):
# get notification 1
if with_cli:
- result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name+'_1'], get_config_cluster())
- parsed_result = json.loads(result[0])
- assert_equal(parsed_result['Id'], notification_name+'_1')
- assert_equal(parsed_result['TopicArn'], topic_arn)
- assert_equal(result[1], 0)
+ get_notification(bucket_name, notification_name+'_1')
else:
response, status = s3_notification_conf.get_config(notification=notification_name+'_1')
assert_equal(status/100, 2)
# list notification
if with_cli:
- result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster())
- parsed_result = json.loads(result[0])
- assert_equal(len(parsed_result['notifications']), 3)
- assert_equal(result[1], 0)
+ list_notifications(bucket_name, 3)
else:
result, status = s3_notification_conf.get_config()
assert_equal(status, 200)
# delete notification 2
if with_cli:
- _, result = admin(['notification', 'rm', '--bucket', bucket_name, '--notification-id', notification_name+'_2'], get_config_cluster())
- assert_equal(result, 0)
+ remove_notification(bucket_name, notification_name + '_2')
else:
_, status = s3_notification_conf.del_config(notification=notification_name+'_2')
assert_equal(status/100, 2)
# list notification
if with_cli:
- result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster())
- parsed_result = json.loads(result[0])
- assert_equal(len(parsed_result['notifications']), 2)
- assert_equal(result[1], 0)
+ list_notifications(bucket_name, 2)
else:
result, status = s3_notification_conf.get_config()
assert_equal(status, 200)
# delete notifications
if with_cli:
- _, result = admin(['notification', 'rm', '--bucket', bucket_name], get_config_cluster())
- assert_equal(result, 0)
+ remove_notification(bucket_name)
else:
_, status = s3_notification_conf.del_config()
assert_equal(status/100, 2)
# list notification, make sure it is empty
- result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster())
- parsed_result = json.loads(result[0])
- assert_equal(len(parsed_result['notifications']), 0)
- assert_equal(result[1], 0)
+ list_notifications(bucket_name, 0)
# cleanup
topic_conf.del_config()
assert_equal(status/100, 2)
# topic stats
- result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
- parsed_result = json.loads(result[0])
- assert_equal(parsed_result['Topic Stats']['Entries'], 0)
- assert_equal(result[1], 0)
+ get_stats_persistent_topic(topic_name, 0)
# create objects in the bucket (async)
number_of_objects = 20
print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
# topic stats
- result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
- parsed_result = json.loads(result[0])
- assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects)
- assert_equal(result[1], 0)
+ get_stats_persistent_topic(topic_name, number_of_objects)
# delete objects from the bucket
client_threads = []
print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
# topic stats
- result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
- parsed_result = json.loads(result[0])
- assert_equal(parsed_result['Topic Stats']['Entries'], 2*number_of_objects)
- assert_equal(result[1], 0)
+ get_stats_persistent_topic(topic_name, 2 * number_of_objects)
# start an http server in a separate thread
http_server = HTTPServerWithEvents((host, port))
time.sleep(delay)
http_server.close()
# topic get
- result = admin(['topic', 'get', '--topic', topic_name], get_config_cluster())
- parsed_result = json.loads(result[0])
+ parsed_result = get_topic(topic_name)
parsed_result_dest = parsed_result["dest"]
for key, value in config_dict.items():
assert_equal(parsed_result_dest[key], str(value))
- assert_equal(result[1], 0)
# topic stats
- result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
- parsed_result = json.loads(result[0])
- assert_equal(parsed_result['Topic Stats']['Entries'], 0)
- assert_equal(result[1], 0)
+ get_stats_persistent_topic(topic_name, 0)
# create objects in the bucket (async)
number_of_objects = 10
print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
# topic stats
- result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
- parsed_result = json.loads(result[0])
- assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects)
- assert_equal(result[1], 0)
+ get_stats_persistent_topic(topic_name, number_of_objects)
# wait as much as ttl and check if the persistent topics have expired
time.sleep(persistency_time)
- result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
- parsed_result = json.loads(result[0])
- assert_equal(parsed_result['Topic Stats']['Entries'], 0)
- assert_equal(result[1], 0)
+ get_stats_persistent_topic(topic_name, 0)
# delete objects from the bucket
client_threads = []
start_time = time.time()
# topic stats
- result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
- parsed_result = json.loads(result[0])
- assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects)
- assert_equal(result[1], 0)
+ get_stats_persistent_topic(topic_name, number_of_objects)
# wait as much as ttl and check if the persistent topics have expired
time.sleep(persistency_time)
- result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
- parsed_result = json.loads(result[0])
- assert_equal(parsed_result['Topic Stats']['Entries'], 0)
- assert_equal(result[1], 0)
+ get_stats_persistent_topic(topic_name, 0)
# cleanup
s3_notification_conf.del_config()
delete_all_topics(conn, '', get_config_cluster())
# disable v2 notification
- result = admin(['zonegroup', 'modify', '--disable-feature=notification_v2'], get_config_cluster())
- assert_equal(result[1], 0)
- result = admin(['period', 'update'], get_config_cluster())
- assert_equal(result[1], 0)
- result = admin(['period', 'commit'], get_config_cluster())
- assert_equal(result[1], 0)
+ zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2)
# create random port for the http server
host = get_ip()
assert_equal(status/100, 2)
# topic stats
- result = admin(['topic', 'stats', '--topic', topic_name1], get_config_cluster())
- parsed_result = json.loads(result[0])
- assert_equal(parsed_result['Topic Stats']['Entries'], 0)
- assert_equal(result[1], 0)
+ get_stats_persistent_topic(topic_name1, 0)
# create objects in the bucket (async)
number_of_objects = 10
# do a reload
print('do reload')
- result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster())
- assert_equal(result[1], 0)
- result = admin(['period', 'update'], get_config_cluster())
- assert_equal(result[1], 0)
- result = admin(['period', 'commit'], get_config_cluster())
- assert_equal(result[1], 0)
+ zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2)
wait_for_queue_to_drain(topic_name1, http_port=http_port)
# verify events
http_port = random.randint(10000, 20000)
# disable v2 notification
- result = admin(['zonegroup', 'modify', '--disable-feature=notification_v2'], get_config_cluster())
- assert_equal(result[1], 0)
- result = admin(['period', 'update'], get_config_cluster())
- assert_equal(result[1], 0)
- result = admin(['period', 'commit'], get_config_cluster())
- assert_equal(result[1], 0)
+ zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2)
# create bucket
bucket_name = gen_bucket_name()
assert_equal(status/100, 2)
# topic stats
- result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
- parsed_result = json.loads(result[0])
- assert_equal(parsed_result['Topic Stats']['Entries'], 0)
- assert_equal(result[1], 0)
+ get_stats_persistent_topic(topic_name, 0)
# create objects in the bucket (async)
number_of_objects = 10
http_server = None
try:
# topic stats
- result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
- parsed_result = json.loads(result[0])
- assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects)
- assert_equal(result[1], 0)
+ get_stats_persistent_topic(topic_name, number_of_objects)
# create topic to poll on
topic_name_1 = topic_name + '_1'
topic_conf_1 = PSTopicS3(conn, topic_name_1, zonegroup, endpoint_args=endpoint_args)
# enable v2 notification
- result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster())
- assert_equal(result[1], 0)
- result = admin(['period', 'update'], get_config_cluster())
- assert_equal(result[1], 0)
- result = admin(['period', 'commit'], get_config_cluster())
- assert_equal(result[1], 0)
+ zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2)
# poll on topic_1
result = 1
while result != 0:
time.sleep(1)
- result = admin(['topic', 'rm', '--topic', topic_name_1], get_config_cluster())[1]
+ result = remove_topic(topic_name_1, allow_failure=True)
# topic stats
- result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
- parsed_result = json.loads(result[0])
- assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects)
- assert_equal(result[1], 0)
+ get_stats_persistent_topic(topic_name, number_of_objects)
# create more objects in the bucket (async)
client_threads = []
print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
# topic stats
- result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
- parsed_result = json.loads(result[0])
- assert_equal(parsed_result['Topic Stats']['Entries'], 2*number_of_objects)
- assert_equal(result[1], 0)
+ get_stats_persistent_topic(topic_name, 2 * number_of_objects)
# start an http server in a separate thread
http_server = HTTPServerWithEvents((host, http_port))
http_server = HTTPServerWithEvents((host, http_port))
# disable v2 notification
- result = admin(['zonegroup', 'modify', '--disable-feature=notification_v2'], get_config_cluster())
- assert_equal(result[1], 0)
- result = admin(['period', 'update'], get_config_cluster())
- assert_equal(result[1], 0)
- result = admin(['period', 'commit'], get_config_cluster())
- assert_equal(result[1], 0)
+ zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2)
# create bucket
bucket_name = gen_bucket_name()
topic_conf_1 = PSTopicS3(conn, topic_name_1, zonegroup, endpoint_args=endpoint_args)
# enable v2 notification
- result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster())
- assert_equal(result[1], 0)
- result = admin(['period', 'update'], get_config_cluster())
- assert_equal(result[1], 0)
- result = admin(['period', 'commit'], get_config_cluster())
- assert_equal(result[1], 0)
-
+ zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2)
# poll on topic_1
result = 1
while result != 0:
time.sleep(1)
- result = admin(['topic', 'rm', '--topic', topic_name_1], get_config_cluster())[1]
-
+ result = remove_topic(topic_name_1, allow_failure=True)
# create more objects in the bucket (async)
client_threads = []
# make sure there are no leftover topics
delete_all_topics(conn, '', get_config_cluster())
for i in ['1', '2']:
- access_key = str(time.time())
- secret_key = str(time.time())
- uid = UID_PREFIX + str(time.time())
tenant_id = 'kaboom_' + i
- _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant_id, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'],
- get_config_cluster())
- assert_equal(result, 0)
tenants_list.append(tenant_id)
- conn = S3Connection(aws_access_key_id=access_key,
- aws_secret_access_key=secret_key,
- is_secure=False, port=get_config_port(), host=get_config_host(),
- calling_format='boto.s3.connection.OrdinaryCallingFormat')
+ conn = connect_random_user(tenant_id)
connections_list.append(conn)
# make sure there are no leftover topics
delete_all_topics(conn, tenant_id, get_config_cluster())
# disable v2 notification
- result = admin(['zonegroup', 'modify', '--disable-feature=notification_v2'], get_config_cluster())
- assert_equal(result[1], 0)
- result = admin(['period', 'update'], get_config_cluster())
- assert_equal(result[1], 0)
- result = admin(['period', 'commit'], get_config_cluster())
- assert_equal(result[1], 0)
+ zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2)
# create random port for the http server
host = get_ip()
polling_topics_conf.append(topic_conf)
# enable v2 notification
- result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster())
- assert_equal(result[1], 0)
- result = admin(['period', 'update'], get_config_cluster())
- assert_equal(result[1], 0)
- result = admin(['period', 'commit'], get_config_cluster())
- assert_equal(result[1], 0)
+ zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2)
# poll on topic_1
for tenant, topic_conf in zip(tenants_list, polling_topics_conf):
while True:
- if tenant == '':
- result = admin(['topic', 'rm', '--topic', topic_conf.topic_name], get_config_cluster())
- else:
- result = admin(['topic', 'rm', '--topic', topic_conf.topic_name, '--tenant', tenant], get_config_cluster())
+ result = remove_topic(topic_conf.topic_name, tenant, allow_failure=True)
- if result[1] != 0:
- print('migration in process... error: '+str(result[1]))
+ if result != 0:
+ print('migration in process... error: '+str(result))
else:
break
# check if we migrated all the topics
for tenant in tenants_list:
- if tenant == '':
- topics_result = admin(['topic', 'list'], get_config_cluster())
- else:
- topics_result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster())
- topics_json = json.loads(topics_result[0])
- assert_equal(len(topics_json['topics']), 1)
+ list_topics(1, tenant)
# check if we migrated all the notifications
for tenant, bucket in zip(tenants_list, buckets_list):
- if tenant == '':
- result = admin(['notification', 'list', '--bucket', bucket.name], get_config_cluster())
- else:
- result = admin(['notification', 'list', '--bucket', bucket.name, '--tenant', tenant], get_config_cluster())
- parsed_result = json.loads(result[0])
- assert_equal(len(parsed_result['notifications']), num_of_s3_notifications)
+ list_notifications(bucket.name, num_of_s3_notifications)
# cleanup
for s3_notification_conf in s3_notification_conf_list:
delete_all_topics(conn, '', get_config_cluster())
# make sure that we start at v2
- result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster())
- assert_equal(result[1], 0)
- result = admin(['period', 'update'], get_config_cluster())
- assert_equal(result[1], 0)
- result = admin(['period', 'commit'], get_config_cluster())
- assert_equal(result[1], 0)
+ zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2)
for i in ['1', '2']:
- access_key = str(time.time())
- secret_key = str(time.time())
- uid = UID_PREFIX + str(time.time())
tenant_id = 'kaboom_' + i
- _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant_id, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'],
- get_config_cluster())
- assert_equal(result, 0)
tenants_list.append(tenant_id)
- conn = S3Connection(aws_access_key_id=access_key,
- aws_secret_access_key=secret_key,
- is_secure=False, port=get_config_port(), host=get_config_host(),
- calling_format='boto.s3.connection.OrdinaryCallingFormat')
+ conn = connect_random_user(tenant_id)
connections_list.append(conn)
# make sure there are no leftover topics
delete_all_topics(conn, tenant_id, get_config_cluster())
assert_equal(status / 100, 2)
# disable v2 notification
- result = admin(['zonegroup', 'modify', '--disable-feature=notification_v2'], get_config_cluster())
- assert_equal(result[1], 0)
- result = admin(['period', 'update'], get_config_cluster())
- assert_equal(result[1], 0)
- result = admin(['period', 'commit'], get_config_cluster())
- assert_equal(result[1], 0)
+ zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2)
# create s3 topic
created_version = '_created_v1'
polling_topics_conf.append(topic_conf)
# enable v2 notification
- result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster())
- assert_equal(result[1], 0)
- result = admin(['period', 'update'], get_config_cluster())
- assert_equal(result[1], 0)
- result = admin(['period', 'commit'], get_config_cluster())
- assert_equal(result[1], 0)
+ zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2)
# poll on topic_1
for tenant, topic_conf in zip(tenants_list, polling_topics_conf):
while True:
- if tenant == '':
- result = admin(['topic', 'rm', '--topic', topic_conf.topic_name], get_config_cluster())
- else:
- result = admin(['topic', 'rm', '--topic', topic_conf.topic_name, '--tenant', tenant], get_config_cluster())
+ result = remove_topic(topic_conf.topic_name, tenant, allow_failure=True)
- if result[1] != 0:
+ if result != 0:
print(result)
else:
break
# check if we migrated all the topics
for tenant in tenants_list:
- if tenant == '':
- topics_result = admin(['topic', 'list'], get_config_cluster())
- else:
- topics_result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster())
- topics_json = json.loads(topics_result[0])
- assert_equal(len(topics_json['topics']), 2)
+ list_topics(2, tenant)
# check if we migrated all the notifications
for tenant, bucket in zip(tenants_list, buckets_list):
- if tenant == '':
- notifications_result = admin(['notification', 'list', '--bucket', bucket.name], get_config_cluster())
- else:
- notifications_result = admin(['notification', 'list', '--bucket', bucket.name, '--tenant', tenant], get_config_cluster())
- notifications_json = json.loads(notifications_result[0])
- assert_equal(len(notifications_json['notifications']), 2)
+ list_notifications(bucket.name, 2)
# cleanup
for s3_notification_conf in s3_notification_conf_list: