host = get_ip()
port = random.randint(10000, 20000)
- # start an http server in a separate thread
- http_server = HTTPServerWithEvents((host, port))
-
# create bucket
bucket_name = gen_bucket_name()
bucket = conn.create_bucket(bucket_name)
response, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
- delay = 20
- time.sleep(delay)
- http_server.close()
# topic get
parsed_result = get_topic(topic_name)
parsed_result_dest = parsed_result["dest"]
print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
# topic stats
- get_stats_persistent_topic(topic_name, number_of_objects)
+ if time_diff > persistency_time:
+ log.warning('persistency time for topic %s already passed. not possible to check object in the queue', topic_name)
+ else:
+ get_stats_persistent_topic(topic_name, number_of_objects)
+ # wait as much as ttl and check if the persistent topics have expired
+ time.sleep(persistency_time)
- # wait as much as ttl and check if the persistent topics have expired
- time.sleep(persistency_time)
get_stats_persistent_topic(topic_name, 0)
# delete objects from the bucket
thr = threading.Thread(target = key.delete, args=())
thr.start()
client_threads.append(thr)
- if count%100 == 0:
- [thr.join() for thr in client_threads]
- time_diff = time.time() - start_time
- print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
- client_threads = []
- start_time = time.time()
+ [thr.join() for thr in client_threads]
+ time_diff = time.time() - start_time
+ print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
# topic stats
- get_stats_persistent_topic(topic_name, number_of_objects)
+ if time_diff > persistency_time:
+ log.warning('persistency time for topic %s already passed. not possible to check object in the queue', topic_name)
+ else:
+ get_stats_persistent_topic(topic_name, number_of_objects)
+ # wait as much as ttl and check if the persistent topics have expired
+ time.sleep(persistency_time)
- # wait as much as ttl and check if the persistent topics have expired
- time.sleep(persistency_time)
get_stats_persistent_topic(topic_name, 0)
# cleanup
topic_conf.del_config()
# delete the bucket
conn.delete_bucket(bucket_name)
- time.sleep(delay)
def create_persistency_config_string(config_dict):
str_ret = ""
""" test persistent topic configurations with time_to_live """
config_dict = {"time_to_live": 30, "max_retries": "None", "retry_sleep_duration": "None"}
buffer = 10
- persistency_time =config_dict["time_to_live"] + buffer
+ persistency_time = config_dict["time_to_live"] + buffer
ps_s3_persistent_topic_configs(persistency_time, config_dict)