http_server.close()
+def wait_for_queue_to_drain(topic_name):
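+    """Poll 'topic stats' until the persistent queue of the given topic is empty.
+    Polls every 5 seconds and fails the test if entries remain after 30 retries."""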
+    retries = 0
+    entries = 1
+    start_time = time.time()
+    # topic stats
+    while entries > 0:
+        result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
+        assert_equal(result[1], 0)
+        parsed_result = json.loads(result[0])
+        entries = parsed_result['Topic Stats']['Entries']
+        retries += 1
+        if retries > 30:
+            time_diff = time.time() - start_time
+            log.warning('queue %s still has %d entries after %ds', topic_name, entries, time_diff)
+            assert_equal(entries, 0)
+        time.sleep(5)
+    time_diff = time.time() - start_time
+    log.info('waited for %ds for queue %s to drain', time_diff, topic_name)
+
+
@attr('basic_test')
def test_ps_s3_persistent_topic_stats():
""" test persistent topic stats """
# start an http server in a separate thread
http_server = HTTPServerWithEvents((host, port))
-    print('wait for '+str(delay*2)+'sec for the messages...')
-    time.sleep(delay*2)
-
-    # topic stats
-    result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
-    parsed_result = json.loads(result[0])
-    assert_equal(parsed_result['Topic Stats']['Entries'], 0)
-    assert_equal(result[1], 0)
+    wait_for_queue_to_drain(topic_name)
# cleanup
s3_notification_conf.del_config()
keys = list(bucket.list())
-    delay = 30
-    print('wait for '+str(delay)+'sec for the messages...')
-    time.sleep(delay)
+    wait_for_queue_to_drain(topic_name+'_1')
http_server.verify_s3_events(keys, exact_match=False, deletions=False)
client_threads.append(thr)
[thr.join() for thr in client_threads]
-    print('wait for '+str(delay)+'sec for the messages...')
-    time.sleep(delay)
+    wait_for_queue_to_drain(topic_name+'_1')
http_server.verify_s3_events(keys, exact_match=False, deletions=True)
keys = list(bucket.list())
-    delay = 40
-    print('wait for '+str(delay)+'sec for the messages...')
-    time.sleep(delay)
+    wait_for_queue_to_drain(topic_name)
receiver.verify_s3_events(keys, exact_match=exact_match, deletions=False)
time_diff = time.time() - start_time
print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
-    print('wait for '+str(delay)+'sec for the messages...')
-    time.sleep(delay)
+    wait_for_queue_to_drain(topic_name)
receiver.verify_s3_events(keys, exact_match=exact_match, deletions=True)
# start an http server in a separate thread
http_server = HTTPServerWithEvents((host, http_port))
-    delay = 30
-    print('wait for '+str(delay)+'sec for the messages...')
-    time.sleep(delay)
-
-    # topic stats
-    result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
-    parsed_result = json.loads(result[0])
-    assert_equal(parsed_result['Topic Stats']['Entries'], 0)
-    assert_equal(result[1], 0)
+    wait_for_queue_to_drain(topic_name)
# verify events
keys = list(bucket.list())
http_server.verify_s3_events(keys, exact_match=False)