verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes, etags=etags)
self.events = []
+ def get_and_reset_events(self):
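+ """ return the accumulated events and reset the internal list """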
+ tmp = self.events
+ self.events = []
+ return tmp
+
def close(self, task):
stop_kafka_receiver(self, task)
assert_equal(len(parsed_result['topics']), 0)
-@attr('basic_test')
-def test_ps_s3_notification_configuration_admin_on_master():
- """ test s3 notification list/get/delete on master """
+def notification_configuration(with_cli):
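+ """ test s3 notification list/get/delete, via the admin CLI when with_cli is set, otherwise via the S3 API """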
conn = connection()
zonegroup = get_config_zonegroup()
bucket_name = gen_bucket_name()
+ # create bucket
bucket = conn.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
-
+
# make sure there are no leftover topics
delete_all_topics(conn, '', get_config_cluster())
# create s3 topics
- endpoint_address = 'amqp://127.0.0.1:7001/vhost_1'
+ endpoint_address = 'amqp://127.0.0.1:7001'
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
- topic_conf = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
assert_equal(topic_arn,
- 'arn:aws:sns:' + zonegroup + '::' + topic_name + '_1')
+ 'arn:aws:sns:' + zonegroup + '::' + topic_name)
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name+'_1',
_, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
- # list notification
- result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster())
- parsed_result = json.loads(result[0])
- assert_equal(len(parsed_result['notifications']), 3)
- assert_equal(result[1], 0)
-
# get notification 1
- result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name+'_1'], get_config_cluster())
- parsed_result = json.loads(result[0])
- assert_equal(parsed_result['Id'], notification_name+'_1')
- assert_equal(result[1], 0)
+ if with_cli:
+ result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name+'_1'], get_config_cluster())
+ parsed_result = json.loads(result[0])
+ assert_equal(parsed_result['Id'], notification_name+'_1')
+ assert_equal(parsed_result['TopicArn'], topic_arn)
+ assert_equal(result[1], 0)
+ else:
+ response, status = s3_notification_conf.get_config(notification=notification_name+'_1')
+ assert_equal(status/100, 2)
+ assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
- # remove notification 3
- _, result = admin(['notification', 'rm', '--bucket', bucket_name, '--notification-id', notification_name+'_3'], get_config_cluster())
- assert_equal(result, 0)
+ # list notification
+ if with_cli:
+ result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster())
+ parsed_result = json.loads(result[0])
+ assert_equal(len(parsed_result['notifications']), 3)
+ assert_equal(result[1], 0)
+ else:
+ result, status = s3_notification_conf.get_config()
+ assert_equal(status, 200)
+ assert_equal(len(result['TopicConfigurations']), 3)
+
+ # delete notification 2
+ if with_cli:
+ _, result = admin(['notification', 'rm', '--bucket', bucket_name, '--notification-id', notification_name+'_2'], get_config_cluster())
+ assert_equal(result, 0)
+ else:
+ _, status = s3_notification_conf.del_config(notification=notification_name+'_2')
+ assert_equal(status/100, 2)
# list notification
- result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster())
- parsed_result = json.loads(result[0])
- assert_equal(len(parsed_result['notifications']), 2)
- assert_equal(result[1], 0)
+ if with_cli:
+ result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster())
+ parsed_result = json.loads(result[0])
+ assert_equal(len(parsed_result['notifications']), 2)
+ assert_equal(result[1], 0)
+ else:
+ result, status = s3_notification_conf.get_config()
+ assert_equal(status, 200)
+ assert_equal(len(result['TopicConfigurations']), 2)
# delete notifications
- _, result = admin(['notification', 'rm', '--bucket', bucket_name], get_config_cluster())
- assert_equal(result, 0)
+ if with_cli:
+ _, result = admin(['notification', 'rm', '--bucket', bucket_name], get_config_cluster())
+ assert_equal(result, 0)
+ else:
+ _, status = s3_notification_conf.del_config()
+ assert_equal(status/100, 2)
# list notification, make sure it is empty
result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster())
+ parsed_result = json.loads(result[0])
assert_equal(len(parsed_result['notifications']), 0)
assert_equal(result[1], 0)
+ # cleanup
+ topic_conf.del_config()
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+
+
+@attr('basic_test')
+def test_notification_configuration_admin():
+ """ test notification list/set/get/delete, with admin cli """
+ notification_configuration(True)
+
@attr('modification_required')
def test_ps_s3_topic_with_secret_on_master():
@attr('basic_test')
-def test_ps_s3_notification_on_master():
- """ test s3 notification set/get/delete on master """
- conn = connection()
- zonegroup = get_config_zonegroup()
- bucket_name = gen_bucket_name()
- # create bucket
- bucket = conn.create_bucket(bucket_name)
- topic_name = bucket_name + TOPIC_SUFFIX
- # create s3 topic
- endpoint_address = 'amqp://127.0.0.1:7001'
- endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
- topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
- topic_arn = topic_conf.set_config()
- # create s3 notification
- notification_name = bucket_name + NOTIFICATION_SUFFIX
- topic_conf_list = [{'Id': notification_name+'_1',
- 'TopicArn': topic_arn,
- 'Events': ['s3:ObjectCreated:*']
- },
- {'Id': notification_name+'_2',
- 'TopicArn': topic_arn,
- 'Events': ['s3:ObjectRemoved:*']
- },
- {'Id': notification_name+'_3',
- 'TopicArn': topic_arn,
- 'Events': []
- }]
- s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
- _, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
-
- # get notifications on a bucket
- response, status = s3_notification_conf.get_config(notification=notification_name+'_1')
- assert_equal(status/100, 2)
- assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
-
- # delete specific notifications
- _, status = s3_notification_conf.del_config(notification=notification_name+'_1')
- assert_equal(status/100, 2)
-
- # get the remaining 2 notifications on a bucket
- response, status = s3_notification_conf.get_config()
- assert_equal(status/100, 2)
- assert_equal(len(response['TopicConfigurations']), 2)
- assert_equal(response['TopicConfigurations'][0]['TopicArn'], topic_arn)
- assert_equal(response['TopicConfigurations'][1]['TopicArn'], topic_arn)
-
- # delete remaining notifications
- _, status = s3_notification_conf.del_config()
- assert_equal(status/100, 2)
-
- # make sure that the notifications are now deleted
- _, status = s3_notification_conf.get_config()
-
- # cleanup
- topic_conf.del_config()
- # delete the bucket
- conn.delete_bucket(bucket_name)
+def test_notification_configuration():
+ """ test s3 notification set/get/deleter """
+ notification_configuration(False)
@attr('basic_test')
conn.delete_bucket(bucket_name)
-def notification_push(endpoint_type, conn, account=None):
+def notification_push(endpoint_type, conn, account=None, cloudevents=False):
""" test pushinging notification """
zonegroup = get_config_zonegroup()
# create bucket
bucket = conn.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
- receiver = {}
- task = None
host = get_ip()
- s3_notification_conf = None
- topic_conf = None
+ task = None
if endpoint_type == 'http':
# create random port for the http server
host = get_ip_http()
port = random.randint(10000, 20000)
# start an http server in a separate thread
- receiver = HTTPServerWithEvents((host, port))
+ receiver = HTTPServerWithEvents((host, port), cloudevents=cloudevents)
endpoint_address = 'http://'+host+':'+str(port)
- endpoint_args = 'push-endpoint='+endpoint_address
+ if cloudevents:
+ endpoint_args = 'push-endpoint='+endpoint_address+'&cloudevents=true'
+ else:
+ endpoint_args = 'push-endpoint='+endpoint_address
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
# create s3 notification
# create objects in the bucket
number_of_objects = 100
+ if cloudevents:
+ number_of_objects = 10
client_threads = []
etags = []
objects_size = {}
@attr('http_test')
-def test_ps_s3_notification_push_cloudevents_on_master():
- """ test pushing cloudevents notification on master """
- hostname = get_ip_http()
+def test_notification_push_cloudevents():
+ """ test pushing cloudevents notification """
conn = connection()
- zonegroup = get_config_zonegroup()
-
- # create random port for the http server
- host = get_ip()
- port = random.randint(10000, 20000)
- # start an http server in a separate thread
- number_of_objects = 10
- http_server = HTTPServerWithEvents((host, port), cloudevents=True)
-
- # create bucket
- bucket_name = gen_bucket_name()
- bucket = conn.create_bucket(bucket_name)
- topic_name = bucket_name + TOPIC_SUFFIX
-
- # create s3 topic
- endpoint_address = 'http://'+host+':'+str(port)
- endpoint_args = 'push-endpoint='+endpoint_address+'&cloudevents=true'
- topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
- topic_arn = topic_conf.set_config()
- # create s3 notification
- notification_name = bucket_name + NOTIFICATION_SUFFIX
- topic_conf_list = [{'Id': notification_name,
- 'TopicArn': topic_arn,
- 'Events': []
- }]
- s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
- response, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
-
- # create objects in the bucket
- client_threads = []
- objects_size = {}
- start_time = time.time()
- for i in range(number_of_objects):
- content = str(os.urandom(randint(1, 1024)))
- object_size = len(content)
- key = bucket.new_key(str(i))
- objects_size[key.name] = object_size
- thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
- thr.start()
- client_threads.append(thr)
- [thr.join() for thr in client_threads]
-
- time_diff = time.time() - start_time
- print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
-
- print('wait for 5sec for the messages...')
- time.sleep(5)
+ notification_push('http', conn, cloudevents=True)
- # check http receiver
- keys = list(bucket.list())
- http_server.verify_s3_events(keys, exact_match=True, deletions=False, expected_sizes=objects_size)
-
- # delete objects from the bucket
- client_threads = []
- start_time = time.time()
- for key in bucket.list():
- thr = threading.Thread(target = key.delete, args=())
- thr.start()
- client_threads.append(thr)
- [thr.join() for thr in client_threads]
-
- time_diff = time.time() - start_time
- print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
-
- print('wait for 5sec for the messages...')
- time.sleep(5)
-
- # check http receiver
- http_server.verify_s3_events(keys, exact_match=True, deletions=True, expected_sizes=objects_size)
-
- # cleanup
- topic_conf.del_config()
- s3_notification_conf.del_config(notification=notification_name)
- # delete the bucket
- conn.delete_bucket(bucket_name)
- http_server.close()
@attr('http_test')
conn.delete_bucket(bucket_name)
http_server.close()
-@attr('lifecycle_test')
-def test_ps_s3_lifecycle_on_master():
- """ test that when object is deleted due to lifecycle policy, notification is sent on master """
- hostname = get_ip()
- conn = connection()
- zonegroup = get_config_zonegroup()
- # create random port for the http server
- host = get_ip()
- port = random.randint(10000, 20000)
- # start an http server in a separate thread
- number_of_objects = 10
- http_server = HTTPServerWithEvents((host, port))
+def lifecycle(endpoint_type, conn, number_of_objects, topic_events, create_thread, rules_creator, record_events,
+ expected_abortion=False):
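+ """ generic lifecycle notification test over the given endpoint type (http/amqp/kafka):
+ create_thread(bucket, obj_prefix, i, content) creates one worker thread per object,
+ rules_creator(yesterday, obj_prefix) builds the lifecycle rules, and every received
+ record's eventName must appear in record_events; expected_abortion relaxes the
+ event-count assertion and the final thread join """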
+ zonegroup = get_config_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
bucket = conn.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
+ host = get_ip()
+ task = None
+ port = None
+ if endpoint_type == 'http':
+ # create random port for the http server
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ receiver = HTTPServerWithEvents((host, port))
+ endpoint_address = 'http://'+host+':'+str(port)
+ endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
+ elif endpoint_type == 'amqp':
+ # start amqp receiver
+ exchange = 'ex1'
+ task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+ task.start()
+ endpoint_address = 'amqp://' + host
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange='+exchange+'&amqp-ack-level=broker&persistent=true'
+ elif endpoint_type == 'kafka':
+ # start kafka receiver
+ task, receiver = create_kafka_receiver_thread(topic_name)
+ task.start()
+ endpoint_address = 'kafka://' + host
+ endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'
+ else:
+ raise SkipTest('Unknown endpoint type: ' + endpoint_type)
+
# create s3 topic
- endpoint_address = 'http://'+host+':'+str(port)
- endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
- opaque_data = 'http://1.2.3.4:8888'
- topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data)
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name,
'TopicArn': topic_arn,
- 'Events': ['s3:ObjectLifecycle:Expiration:*',
- 's3:LifecycleExpiration:*']
- }]
+ 'Events': topic_events
+ }]
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
start_time = time.time()
content = 'bar'
for i in range(number_of_objects):
- key = bucket.new_key(obj_prefix + str(i))
- thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr = create_thread(bucket, obj_prefix, i, content)
thr.start()
client_threads.append(thr)
[thr.join() for thr in client_threads]
time_diff = time.time() - start_time
- print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+ print('average time for creation + '+endpoint_type+' notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
keys = list(bucket.list())
# create lifecycle policy
client = boto3.client('s3',
- endpoint_url='http://'+conn.host+':'+str(conn.port),
- aws_access_key_id=conn.aws_access_key_id,
- aws_secret_access_key=conn.aws_secret_access_key)
+ endpoint_url='http://'+conn.host+':'+str(conn.port),
+ aws_access_key_id=conn.aws_access_key_id,
+ aws_secret_access_key=conn.aws_secret_access_key)
yesterday = datetime.date.today() - datetime.timedelta(days=1)
response = client.put_bucket_lifecycle_configuration(Bucket=bucket_name,
- LifecycleConfiguration={'Rules': [
- {
- 'ID': 'rule1',
- 'Expiration': {'Date': yesterday.isoformat()},
- 'Filter': {'Prefix': obj_prefix},
- 'Status': 'Enabled',
- }
- ]
- }
- )
+ LifecycleConfiguration={'Rules': rules_creator(yesterday, obj_prefix)}
+ )
# start lifecycle processing
admin(['lc', 'process'], get_config_cluster())
print('wait for 20s for the lifecycle...')
time.sleep(20)
+ print('wait for some time for the messages...')
no_keys = list(bucket.list())
wait_for_queue_to_drain(topic_name, http_port=port)
assert_equal(len(no_keys), 0)
event_keys = []
- events = http_server.get_and_reset_events()
- assert number_of_objects * 2 <= len(events)
+ events = receiver.get_and_reset_events()
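+ # unless aborted, each expiration matches both subscribed event types, so expect at least two records per object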
+ if not expected_abortion:
+ assert number_of_objects * 2 <= len(events)
for event in events:
- assert_in(event['Records'][0]['eventName'],
- ['LifecycleExpiration:Delete',
- 'ObjectLifecycle:Expiration:Current'])
+ assert_in(event['Records'][0]['eventName'], record_events)
event_keys.append(event['Records'][0]['s3']['object']['key'])
for key in keys:
key_found = False
# cleanup
for key in keys:
key.delete()
- [thr.join() for thr in client_threads]
+ if not expected_abortion:
+ [thr.join() for thr in client_threads]
topic_conf.del_config()
s3_notification_conf.del_config(notification=notification_name)
# delete the bucket
conn.delete_bucket(bucket_name)
- http_server.close()
+ receiver.close(task)
+
+
+def rules_creator(yesterday, obj_prefix):
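+ """ default lifecycle rules: expire all objects under the given prefix, dated yesterday so they expire immediately """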
+ return [
+ {
+ 'ID': 'rule1',
+ 'Expiration': {'Date': yesterday.isoformat()},
+ 'Filter': {'Prefix': obj_prefix},
+ 'Status': 'Enabled',
+ }
+ ]
+
+
+def create_thread(bucket, obj_prefix, i, content):
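+ """ default worker: a thread writing the object obj_prefix + str(i) with the given content """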
+ key = bucket.new_key(obj_prefix + str(i))
+ return threading.Thread(target = set_contents_from_string, args=(key, content,))
+
+
+@attr('http_test')
+def test_lifecycle_http():
+ """ test that when object is deleted due to lifecycle policy, http endpoint """
+
+ conn = connection()
+ lifecycle('http', conn, 10, ['s3:ObjectLifecycle:Expiration:*', 's3:LifecycleExpiration:*'], create_thread,
+ rules_creator, ['LifecycleExpiration:Delete', 'ObjectLifecycle:Expiration:Current'])
+
+
+@attr('kafka_test')
+def test_lifecycle_kafka():
+ """ test that when object is deleted due to lifecycle policy, kafka endpoint """
+
+ conn = connection()
+ lifecycle('kafka', conn, 10, ['s3:ObjectLifecycle:Expiration:*', 's3:LifecycleExpiration:*'], create_thread,
+ rules_creator, ['LifecycleExpiration:Delete', 'ObjectLifecycle:Expiration:Current'])
+
def start_and_abandon_multipart_upload(bucket, key_name, content):
try:
mp = bucket.initiate_multipart_upload(key_name)
part_data = io.StringIO(content)
mp.upload_part_from_file(part_data, 1)
- # mp.complete_upload()
except Exception as e:
print('Error: ' + str(e))
-@attr('lifecycle_test')
-def test_ps_s3_lifecycle_abort_mpu_on_master():
- """ test that when a multipart upload is aborted by lifecycle policy, notification is sent on master """
- hostname = get_ip()
- conn = connection()
- zonegroup = get_config_zonegroup()
-
- # create random port for the http server
- host = get_ip()
- port = random.randint(10000, 20000)
- # start an http server in a separate thread
- http_server = HTTPServerWithEvents((host, port))
-
- # create bucket
- bucket_name = gen_bucket_name()
- bucket = conn.create_bucket(bucket_name)
- topic_name = bucket_name + TOPIC_SUFFIX
-
- # create s3 topic
- endpoint_address = 'http://'+host+':'+str(port)
- endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
- opaque_data = 'http://1.2.3.4:8888'
- topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data)
- topic_arn = topic_conf.set_config()
- # create s3 notification
- notification_name = bucket_name + NOTIFICATION_SUFFIX
- topic_conf_list = [{'Id': notification_name,
- 'TopicArn': topic_arn,
- 'Events': ['s3:ObjectLifecycle:Expiration:*']
- }]
- s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
- response, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
-
- # start and abandon a multpart upload
- obj_prefix = 'ooo'
- content = 'bar'
- key_name = obj_prefix + str(1)
- thr = threading.Thread(target = start_and_abandon_multipart_upload, args=(bucket, key_name, content,))
- thr.start()
- thr.join()
+@attr('http_test')
+def test_lifecycle_abort_mpu():
+ """ test that when a multipart upload is aborted by lifecycle policy, http endpoint """
- # create lifecycle policy -- assume rgw_lc_debug_interval=10 is in effect
- client = boto3.client('s3',
- endpoint_url='http://'+conn.host+':'+str(conn.port),
- aws_access_key_id=conn.aws_access_key_id,
- aws_secret_access_key=conn.aws_secret_access_key)
- response = client.put_bucket_lifecycle_configuration(Bucket=bucket_name,
- LifecycleConfiguration={'Rules': [
- {
- 'ID': 'abort1',
- 'Filter': {'Prefix': obj_prefix},
- 'Status': 'Enabled',
- 'AbortIncompleteMultipartUpload': {'DaysAfterInitiation': 1},
- }
- ]
- }
- )
+ def rules_creator(yesterday, obj_prefix):
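+ # override: rule that aborts incomplete multipart uploads instead of expiring objects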
+ return [
+ {
+ 'ID': 'abort1',
+ 'Filter': {'Prefix': obj_prefix},
+ 'Status': 'Enabled',
+ 'AbortIncompleteMultipartUpload': {'DaysAfterInitiation': 1},
+ }
+ ]
- # start lifecycle processing
- admin(['lc', 'process'], get_config_cluster())
- print('wait for 20s for the lifecycle...')
- time.sleep(20)
+ def create_thread(bucket, obj_prefix, i, content):
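+ # override: worker that starts a multipart upload and abandons it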
+ key_name = obj_prefix + str(i)
+ return threading.Thread(target = start_and_abandon_multipart_upload, args=(bucket, key_name, content,))
- wait_for_queue_to_drain(topic_name, http_port=port)
- events = http_server.get_and_reset_events()
- for event in events:
- assert_equal(event['Records'][0]['eventName'], 'ObjectLifecycle:Expiration:AbortMPU')
- assert key_name in event['Records'][0]['s3']['object']['key']
+ conn = connection()
+ lifecycle('http', conn, 1, ['s3:ObjectLifecycle:Expiration:*'], create_thread, rules_creator,
+ ['ObjectLifecycle:Expiration:AbortMultipartUpload'], expected_abortion=True)
- # cleanup
- topic_conf.del_config()
- s3_notification_conf.del_config(notification=notification_name)
- # delete the bucket
- conn.delete_bucket(bucket_name)
- http_server.close()
def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_location=None, verify_ssl='true'):
""" test object creation s3 notifications in using put/copy/post on master"""
conn1.delete_bucket(Bucket=bucket_name)
-@attr('mpu_test')
-def test_ps_s3_multipart_on_master_http():
- """ test http multipart object upload on master"""
- conn = connection()
- zonegroup = 'default'
-
- # create random port for the http server
- host = get_ip()
- port = random.randint(10000, 20000)
- # start an http server in a separate thread
- http_server = HTTPServerWithEvents((host, port))
+def multipart_endpoint_agnostic(endpoint_type, conn):
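+ """ test multipart object upload notifications against the given endpoint type (http/amqp/kafka) """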
+ zonegroup = get_config_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
bucket = conn.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
+ host = get_ip()
+ task = None
+ if endpoint_type == 'http':
+ # create random port for the http server
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ receiver = HTTPServerWithEvents((host, port))
+ endpoint_address = 'http://'+host+':'+str(port)
+ endpoint_args = 'push-endpoint='+endpoint_address
+ elif endpoint_type == 'amqp':
+ # start amqp receiver
+ exchange = 'ex1'
+ task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+ task.start()
+ endpoint_address = 'amqp://' + host
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange='+exchange+'&amqp-ack-level=broker'
+ elif endpoint_type == 'kafka':
+ # start kafka receiver
+ task, receiver = create_kafka_receiver_thread(topic_name)
+ task.start()
+ endpoint_address = 'kafka://' + host
+ endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
+ else:
+ raise SkipTest('Unknown endpoint type: ' + endpoint_type)
+
# create s3 topic
- endpoint_address = 'http://'+host+':'+str(port)
- endpoint_args = 'push-endpoint='+endpoint_address
- opaque_data = 'http://1.2.3.4:8888'
- topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data)
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
- topic_conf_list = [{'Id': notification_name,
- 'TopicArn': topic_arn,
+ topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
'Events': []
}]
+
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
client_threads = []
content = str(os.urandom(20*1024*1024))
key = bucket.new_key('obj')
- thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr = threading.Thread(target=set_contents_from_string, args=(key, content,))
thr.start()
client_threads.append(thr)
[thr.join() for thr in client_threads]
- # check http receiver
+ # check endpoint receiver
keys = list(bucket.list())
- print('total number of objects: ' + str(len(keys)))
- events = http_server.get_and_reset_events()
- for event in events:
- assert_equal(event['Records'][0]['opaqueData'], opaque_data)
- assert_true(event['Records'][0]['s3']['object']['eTag'] != '')
+ receiver.verify_s3_events(keys, exact_match=True, deletions=False)
# cleanup
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ # delete objects
for key in keys:
key.delete()
- [thr.join() for thr in client_threads]
- topic_conf.del_config()
- s3_notification_conf.del_config(notification=notification_name)
# delete the bucket
conn.delete_bucket(bucket_name)
- http_server.close()
-
+ receiver.close(task)
-@attr('amqp_test')
-def test_ps_s3_multipart_on_master():
- """ test multipart object upload on master"""
- hostname = get_ip()
+@attr('http_test')
+def test_multipart_http():
+ """ test http multipart object upload """
conn = connection()
- zonegroup = get_config_zonegroup()
+ multipart_endpoint_agnostic('http', conn)
- # create bucket
- bucket_name = gen_bucket_name()
- bucket = conn.create_bucket(bucket_name)
- topic_name = bucket_name + TOPIC_SUFFIX
-
- # start amqp receivers
- exchange = 'ex1'
- task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name+'_1')
- task1.start()
- task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name+'_2')
- task2.start()
- task3, receiver3 = create_amqp_receiver_thread(exchange, topic_name+'_3')
- task3.start()
-
- # create s3 topics
- endpoint_address = 'amqp://' + hostname
- endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker'
- topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
- topic_arn1 = topic_conf1.set_config()
- topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args)
- topic_arn2 = topic_conf2.set_config()
- topic_conf3 = PSTopicS3(conn, topic_name+'_3', zonegroup, endpoint_args=endpoint_args)
- topic_arn3 = topic_conf3.set_config()
-
- # create s3 notifications
- notification_name = bucket_name + NOTIFICATION_SUFFIX
- topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
- 'Events': ['s3:ObjectCreated:*']
- },
- {'Id': notification_name+'_2', 'TopicArn': topic_arn2,
- 'Events': ['s3:ObjectCreated:Post']
- },
- {'Id': notification_name+'_3', 'TopicArn': topic_arn3,
- 'Events': ['s3:ObjectCreated:CompleteMultipartUpload']
- }]
- s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
- response, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
- # create objects in the bucket using multi-part upload
- fp = tempfile.NamedTemporaryFile(mode='w+b')
- object_size = 1024
- content = bytearray(os.urandom(object_size))
- fp.write(content)
- fp.flush()
- fp.seek(0)
- uploader = bucket.initiate_multipart_upload('multipart')
- uploader.upload_part_from_file(fp, 1)
- uploader.complete_upload()
- fp.close()
-
- print('wait for 5sec for the messages...')
- time.sleep(5)
-
- # check amqp receiver
- events = receiver1.get_and_reset_events()
- assert_equal(len(events), 1)
-
- events = receiver2.get_and_reset_events()
- assert_equal(len(events), 0)
-
- events = receiver3.get_and_reset_events()
- assert_equal(len(events), 1)
- assert_equal(events[0]['Records'][0]['eventName'], 'ObjectCreated:CompleteMultipartUpload')
- assert_equal(events[0]['Records'][0]['s3']['configurationId'], notification_name+'_3')
- assert_equal(events[0]['Records'][0]['s3']['object']['size'], object_size)
- assert events[0]['Records'][0]['eventTime'] != '0.000000', 'invalid eventTime'
+@attr('kafka_test')
+def test_multipart_kafka():
+ """ test kafka multipart object upload """
+ conn = connection()
+ multipart_endpoint_agnostic('kafka', conn)
- # cleanup
- stop_amqp_receiver(receiver1, task1)
- stop_amqp_receiver(receiver2, task2)
- stop_amqp_receiver(receiver3, task3)
- s3_notification_conf.del_config()
- topic_conf1.del_config()
- topic_conf2.del_config()
- topic_conf3.del_config()
- for key in bucket.list():
- key.delete()
- # delete the bucket
- conn.delete_bucket(bucket_name)
@attr('amqp_test')
-def test_ps_s3_metadata_filter_on_master():
- """ test s3 notification of metadata on master """
-
- hostname = get_ip()
+def test_multipart_amqp():
+ """ test amqp multipart object upload """
conn = connection()
- zonegroup = get_config_zonegroup()
+ multipart_endpoint_agnostic('amqp', conn)
+
+def metadata_filter(endpoint_type, conn):
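+ """ test notification filtering by object metadata, against the given endpoint type (http/amqp/kafka) """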
# create bucket
bucket_name = gen_bucket_name()
bucket = conn.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
- # start amqp receivers
- exchange = 'ex1'
- task, receiver = create_amqp_receiver_thread(exchange, topic_name)
- task.start()
+ # start endpoint receiver
+ host = get_ip()
+ task = None
+ port = None
+ if endpoint_type == 'http':
+ # create random port for the http server
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ receiver = HTTPServerWithEvents((host, port))
+ endpoint_address = 'http://'+host+':'+str(port)
+ endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
+ elif endpoint_type == 'amqp':
+ # start amqp receiver
+ exchange = 'ex1'
+ task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+ task.start()
+ endpoint_address = 'amqp://' + host
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable&persistent=true'
+ elif endpoint_type == 'kafka':
+ # start kafka receiver
+ task, receiver = create_kafka_receiver_thread(topic_name)
+ task.start()
+ endpoint_address = 'kafka://' + host
+ endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'
+ else:
+ raise SkipTest('Unknown endpoint type: ' + endpoint_type)
# create s3 topic
- endpoint_address = 'amqp://' + hostname
- endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable'
+ zonegroup = get_config_zonegroup()
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
# create s3 notification
fp.close()
expected_keys.append(key_name)
- print('wait for 5sec for the messages...')
- time.sleep(5)
+ print('wait for the messages...')
+ wait_for_queue_to_drain(topic_name, http_port=port)
- # check amqp receiver
+ # check endpoint receiver
events = receiver.get_and_reset_events()
assert_equal(len(events), len(expected_keys))
# delete objects
for key in bucket.list():
key.delete()
- print('wait for 5sec for the messages...')
- time.sleep(5)
- # check amqp receiver
+ print('wait for the messages...')
+ wait_for_queue_to_drain(topic_name, http_port=port)
+ # check endpoint receiver
events = receiver.get_and_reset_events()
assert_equal(len(events), len(expected_keys))
for event in events:
assert(event['Records'][0]['s3']['object']['key'] in expected_keys)
# cleanup
- stop_amqp_receiver(receiver, task)
+ receiver.close(task)
s3_notification_conf.del_config()
topic_conf.del_config()
# delete the bucket
conn.delete_bucket(bucket_name)
+@attr('kafka_test')
+def test_metadata_filter_kafka():
+ """ test notification of filtering metadata, kafka endpoint """
+ conn = connection()
+ metadata_filter('kafka', conn)
+
+
+@attr('http_test')
+def test_metadata_filter_http():
+ """ test notification of filtering metadata, http endpoint """
+ conn = connection()
+ metadata_filter('http', conn)
+
+
+@attr('amqp_test')
+def test_metadata_filter_amqp():
+ """ test notification with metadata filtering, amqp endpoint """
+ conn = connection()
+ metadata_filter('amqp', conn)
+
+
@attr('amqp_test')
def test_ps_s3_metadata_on_master():
""" test s3 notification of metadata on master """
bucket = conn.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
- receiver = {}
- task = None
host = get_ip()
+ task = None
if endpoint_type == 'http':
# create random port for the http server
host = get_ip_http()