    http_server.close()
-def wait_for_queue_to_drain(topic_name):
+def wait_for_queue_to_drain(topic_name, tenant=None, account=None):
    retries = 0
    entries = 1
    start_time = time.time()
    # topic stats
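+    # scope the stats query to the topic owner: --tenant for tenanted users, --account-id for accounts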
+    cmd = ['topic', 'stats', '--topic', topic_name]
+    if tenant:
+        cmd += ['--tenant', tenant]
+    if account:
+        cmd += ['--account-id', account]
    while entries > 0:
-        result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
+        result = admin(cmd, get_config_cluster())
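+        # admin() returns (stdout, returncode): check the rc, then parse stdout as JSON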
        assert_equal(result[1], 0)
        parsed_result = json.loads(result[0])
        entries = parsed_result['Topic Stats']['Entries']
    conn.delete_bucket(bucket_name)
    http_server.close()
-def persistent_notification(endpoint_type, conn):
+def persistent_notification(endpoint_type, conn, account=None):
""" test pushing persistent notification """
    zonegroup = get_config_zonegroup()
    keys = list(bucket.list())
-    wait_for_queue_to_drain(topic_name)
+    wait_for_queue_to_drain(topic_name, account=account)
    receiver.verify_s3_events(keys, exact_match=exact_match, deletions=False)
    time_diff = time.time() - start_time
    print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
-    wait_for_queue_to_drain(topic_name)
+    wait_for_queue_to_drain(topic_name, account=account)
    receiver.verify_s3_events(keys, exact_match=exact_match, deletions=True)
    user = UID_PREFIX + 'test'
    _, result = admin(['account', 'create', '--account-id', account, '--account-name', 'testacct'], get_config_cluster())
-    assert_equal(result, 0)
+    assert_true(result in [0, 17]) # 0 or EEXIST (17), in case the account is left over from a previous run
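+    # create a regular user inside the account and connect as that user;
+    # the user and the account are both removed in the finally block below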
    conn, _ = another_user(user=user, account=account)
    try:
-        persistent_notification('http', conn)
+        persistent_notification('http', conn, account)
    finally:
        admin(['user', 'rm', '--uid', user], get_config_cluster())
        admin(['account', 'rm', '--account-id', account], get_config_cluster())