# make sure that http handler is able to consume requests
url = 'http://{}:{}'.format(self.addr[0], self.addr[1])
response = requests.post(url, {})
- print(response)
assert response.status_code == 200
time.sleep(20)
no_keys = list(bucket.list())
- wait_for_queue_to_drain(topic_name)
+ wait_for_queue_to_drain(topic_name, http_port=port)
assert_equal(len(no_keys), 0)
event_keys = []
events = http_server.get_and_reset_events()
print('wait for 20s for the lifecycle...')
time.sleep(20)
- wait_for_queue_to_drain(topic_name)
+ wait_for_queue_to_drain(topic_name, http_port=port)
events = http_server.get_and_reset_events()
for event in events:
assert_equal(event['Records'][0]['eventName'], 'ObjectLifecycle:Expiration:AbortMPU')
http_server.close()
-def wait_for_queue_to_drain(topic_name, tenant=None, account=None):
+def check_http_server(http_port):
+ str_port = str(http_port)
+ cmd = 'netstat -tlnnp | grep python | grep '+str_port
+ proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
+ out = proc.communicate()[0]
+ assert len(out) > 0, 'http python server NOT listening on port '+str_port
+ log.info("http python server listening on port "+str_port)
+ log.info(out.decode('utf-8'))
+
+
+def wait_for_queue_to_drain(topic_name, tenant=None, account=None, http_port=None):
retries = 0
entries = 1
start_time = time.time()
if account:
cmd += ['--account-id', account]
while entries > 0:
+ if http_port:
+ check_http_server(http_port)
result = admin(cmd, get_config_cluster())
assert_equal(result[1], 0)
parsed_result = json.loads(result[0])
# start an http server in a separate thread
http_server = HTTPServerWithEvents((host, port))
- wait_for_queue_to_drain(topic_name)
+ wait_for_queue_to_drain(topic_name, http_port=port)
# cleanup
s3_notification_conf.del_config()
time_diff = time.time() - start_time
print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
- wait_for_queue_to_drain(topic_name1)
+ wait_for_queue_to_drain(topic_name1, http_port=http_port)
client_threads = []
start_time = time.time()
result = admin(['period', 'commit'], get_config_cluster())
assert_equal(result[1], 0)
- wait_for_queue_to_drain(topic_name1)
+ wait_for_queue_to_drain(topic_name1, http_port=http_port)
# verify events
keys = list(bucket.list())
http_server.verify_s3_events(keys, exact_match=False)
# start an http server in a separate thread
http_server = HTTPServerWithEvents((host, http_port))
- wait_for_queue_to_drain(topic_name)
+ wait_for_queue_to_drain(topic_name, http_port=http_port)
# verify events
keys = list(bucket.list())
# exact match is false because the notifications are persistent.