conn.delete_bucket(bucket_name)
http_server.close()
-@attr('http_test')
+@attr('lifecycle_test')
def test_ps_s3_lifecycle_on_master():
""" test that when object is deleted due to lifecycle policy, notification is sent on master """
hostname = get_ip()
# create s3 topic
endpoint_address = 'http://'+host+':'+str(port)
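+ # use a persistent topic: notifications are queued and delivered to the endpoint asynchronously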
- endpoint_args = 'push-endpoint='+endpoint_address
+ endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
opaque_data = 'http://1.2.3.4:8888'
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data)
topic_arn = topic_conf.set_config()
time_diff = time.time() - start_time
print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
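+ # remember the object names now; the lifecycle policy will delete the objects,
+ # and the names are matched later against the keys reported in the received events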
+ keys = list(bucket.list())
+
# create lifecycle policy
client = boto3.client('s3',
endpoint_url='http://'+conn.host+':'+str(conn.port),
# start lifecycle processing
admin(['lc', 'process'], get_config_cluster())
- print('wait for 5sec for the messages...')
- time.sleep(5)
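+ # assuming rgw_lc_debug_interval=10 (as in the abort-MPU test below), 20s covers roughly two lifecycle "days"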
+ print('wait for 20s for the lifecycle...')
+ time.sleep(20)
- # check http receiver does not have messages
- keys = list(bucket.list())
- print('total number of objects: ' + str(len(keys)))
+ no_keys = list(bucket.list())
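+ # wait_for_queue_to_drain() is assumed to poll the persistent topic until all queued notifications have been delivered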
+ wait_for_queue_to_drain(topic_name)
+ assert_equal(len(no_keys), 0)
event_keys = []
events = http_server.get_and_reset_events()
- assert_equal(number_of_objects * 2, len(events))
+ assert number_of_objects * 2 <= len(events)
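+ # a lower bound rather than an exact count: the persistent endpoint may redeliver events, so duplicates are tolerated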
for event in events:
assert_in(event['Records'][0]['eventName'],
['LifecycleExpiration:Delete',
for key in keys:
key_found = False
for event_key in event_keys:
- if event_key == key:
+ if event_key == key.name:
key_found = True
break
if not key_found:
except Exception as e:
print('Error: ' + str(e))
-@attr('http_test')
+@attr('lifecycle_test')
def test_ps_s3_lifecycle_abort_mpu_on_master():
""" test that when a multipart upload is aborted by lifecycle policy, notification is sent on master """
hostname = get_ip()
host = get_ip()
port = random.randint(10000, 20000)
# start an http server in a separate thread
- number_of_objects = 1
http_server = HTTPServerWithEvents((host, port))
# create bucket
# create s3 topic
endpoint_address = 'http://'+host+':'+str(port)
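+ # persistent topic again, so delivery of the abort-MPU notification is asynchronous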
- endpoint_args = 'push-endpoint='+endpoint_address
+ endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
opaque_data = 'http://1.2.3.4:8888'
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data)
topic_arn = topic_conf.set_config()
assert_equal(status/100, 2)
# start and abandon a multipart upload
- # create objects in the bucket
obj_prefix = 'ooo'
- start_time = time.time()
content = 'bar'
key_name = obj_prefix + str(1)
thr.start()
thr.join()
- time_diff = time.time() - start_time
- print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
-
# create lifecycle policy -- assume rgw_lc_debug_interval=10 is in effect
client = boto3.client('s3',
endpoint_url='http://'+conn.host+':'+str(conn.port),
# start lifecycle processing
admin(['lc', 'process'], get_config_cluster())
- print('wait for 20s (2 days) for the messages...')
+ print('wait for 20s for the lifecycle...')
time.sleep(20)
- # check http receiver does not have messages
- keys = list(bucket.list())
- print('total number of objects: ' + str(len(keys)))
- event_keys = []
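+ # drain the persistent queue so the abort-MPU notification has been delivered before checking events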
+ wait_for_queue_to_drain(topic_name)
events = http_server.get_and_reset_events()
for event in events:
- # I hope Boto doesn't gak on the unknown eventName
- assert_equal(event['Records'][0]['eventName'], 'ObjectLifecycle:Expiration:AbortMultipartUpload')
- event_keys.append(event['Records'][0]['s3']['object']['key'])
- for key in keys:
- key_found = False
- for event_key in event_keys:
- if event_key == key:
- key_found = True
- break
- if not key_found:
- err = 'no lifecycle event found for key: ' + str(key)
- log.error(events)
- assert False, err
+ assert_equal(event['Records'][0]['eventName'], 'ObjectLifecycle:Expiration:AbortMPU')
+ assert key_name in event['Records'][0]['s3']['object']['key']
# cleanup
- for key in keys:
- key.delete()
topic_conf.del_config()
s3_notification_conf.del_config(notification=notification_name)
# delete the bucket