return
body = self.rfile.read(content_length)
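# in CloudEvents mode, parse the CloudEvents envelope and cross-check its attributes against the embedded S3 record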
if self.server.cloudevents:
- event = from_http(self.headers, body)
+ event = from_http(self.headers, body)
record = json.loads(body)['Records'][0]
assert_equal(event['specversion'], '1.0')
assert_equal(event['id'], record['responseElements']['x-amz-request-id'] + '.' + record['responseElements']['x-amz-id-2'])
assert_equal(event['source'], 'ceph:s3.' + record['awsRegion'] + '.' + record['s3']['bucket']['name'])
assert_equal(event['type'], 'com.amazonaws.' + record['eventName'])
- assert_equal(event['datacontenttype'], 'application/json')
+ assert_equal(event['datacontenttype'], 'application/json')
assert_equal(event['subject'], record['s3']['object']['key'])
assert_equal(parser.parse(event['time']), parser.parse(record['eventTime']))
log.info('HTTP Server received event: %s', str(body))
self.acquire_lock()
self.events.append(event)
self.lock.release()
-
+
def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}, etags=[]):
"""verify stored s3 records agains a list of keys"""
self.acquire_lock()
port = 9093
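# retry creating the consumer until the kafka broker is reachable or the retries run out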
while remaining_retries > 0:
try:
- self.consumer = KafkaConsumer(topic,
- bootstrap_servers = kafka_server+':'+str(port),
+ self.consumer = KafkaConsumer(topic,
+ bootstrap_servers = kafka_server+':'+str(port),
security_protocol=security_type,
consumer_timeout_ms=16000,
auto_offset_reset='earliest')
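# plain HTTP boto connection using the vstart cluster credentials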
conn = S3Connection(aws_access_key_id=vstart_access_key,
aws_secret_access_key=vstart_secret_key,
- is_secure=False, port=port_no, host=hostname,
+ is_secure=False, port=port_no, host=hostname,
calling_format='boto.s3.connection.OrdinaryCallingFormat')
return conn
conn = S3Connection(aws_access_key_id=vstart_access_key,
aws_secret_access_key=vstart_secret_key,
- is_secure=False, port=port_no, host=hostname,
+ is_secure=False, port=port_no, host=hostname,
calling_format='boto.s3.connection.OrdinaryCallingFormat')
return conn
conn = S3Connection(aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
- is_secure=False, port=get_config_port(), host=get_config_host(),
+ is_secure=False, port=get_config_port(), host=get_config_host(),
calling_format='boto.s3.connection.OrdinaryCallingFormat')
return conn, arn
@attr('basic_test')
def test_ps_s3_topic_on_master():
""" test s3 topics set/get/delete on master """
-
+
tenant = 'kaboom'
conn = connect_random_user(tenant)
-
+
# make sure there are no leftover topics
delete_all_topics(conn, tenant, get_config_cluster())
-
+
zonegroup = get_config_zonegroup()
bucket_name = gen_bucket_name()
topic_name = bucket_name + TOPIC_SUFFIX
@attr('basic_test')
def test_ps_s3_topic_admin_on_master():
""" test s3 topics set/get/delete on master """
-
+
tenant = 'kaboom'
conn = connect_random_user(tenant)
-
+
# make sure there are no leftover topics
delete_all_topics(conn, tenant, get_config_cluster())
-
+
zonegroup = get_config_zonegroup()
bucket_name = gen_bucket_name()
topic_name = bucket_name + TOPIC_SUFFIX
""" test s3 notification filter on master """
hostname = get_ip()
-
+
conn = connection()
ps_zone = conn
# create s3 topic
endpoint_address = 'amqp://' + hostname
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
-
+
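# with amqp-ack-level=broker, a notification is considered delivered only after the amqp broker acks it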
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_location=None, verify_ssl='true'):
""" test object creation s3 notifications in using put/copy/post on master"""
-
+
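# no external endpoint given - spin up a local rabbitmq broker to receive the notifications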
if not external_endpoint_address:
hostname = get_ip()
proc = init_rabbitmq()
# create bucket
bucket_name = gen_bucket_name()
bucket = conn.create_bucket(bucket_name)
- topic_name = bucket_name + TOPIC_SUFFIX
+ topic_name = bucket_name + TOPIC_SUFFIX
# start endpoint receiver
host = get_ip()
key_name = 'copy_of_foo'
bucket.copy_key(key_name, bucket.name, key.name)
expected_keys.append(key_name)
-
+
# create another object in the bucket using COPY
# but override the metadata value
key_name = 'another_copy_of_foo'
# create objects in the bucket using COPY
key_name = 'copy_of_foo'
bucket.copy_key(key_name, bucket.name, key.name)
-
+
# create objects in the bucket using multi-part upload
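# each part must be at least 5MB, except for the last one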
fp = tempfile.NamedTemporaryFile(mode='w+b')
chunk_size = 1024*1024*5 # 5MB
if version not in versions:
print('version mismatch: '+version+' not in: '+str(versions))
# TODO: copy_key() does not return the version of the copied object
- #assert False
+ #assert False
else:
print('version ok: '+version+' in: '+str(versions))
size = event['s3']['object']['size']
if version not in versions:
print('version mismatch: '+version+' not in: '+str(versions))
- assert False
+ assert False
else:
print('version ok: '+version+' in: '+str(versions))
if event['eventName'] == 'ObjectRemoved:Delete':
# name is constant for manual testing
topic_name = bucket_name+'_topic'
# create consumer on the topic
-
+
task, receiver = create_kafka_receiver_thread(topic_name+'_1')
task.start()
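# the receiver thread consumes events from the topic in the background while the test runs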
amqp_task.start()
#topic_conf = PSTopic(ps_zone.conn, topic_name,endpoint='amqp://' + hostname,endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
-
+
topic_arn = topic_conf.set_config()
#result, status = topic_conf.set_config()
#assert_equal(status/100, 2)
keys = list(bucket.list())
# TODO: use exact match
verify_s3_records_by_elements(records, keys, exact_match=False)
- receiver.verify_s3_events(keys, exact_match=False)
+ receiver.verify_s3_events(keys, exact_match=False)
result, _ = sub_conf2.get_events()
parsed_result = json.loads(result)
for record in parsed_result['Records']:
zonegroup = 'default'
bucket_name = gen_bucket_name()
topic_name = bucket_name + TOPIC_SUFFIX
-
+
# create s3 topic without policy
endpoint_address = 'amqp://127.0.0.1:7001'
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list)
_, status = s3_notification_conf2.set_config()
assert_equal(status, 200)
-
+
try:
# 2nd user tries to delete the topic
status = topic_conf2.del_config(topic_arn=topic_arn)
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&ca-location='+KAFKA_DIR+'/y-ca.crt'
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
-
+
# create consumer on the topic
task, receiver = create_kafka_receiver_thread(topic_name)
task.start()
-
+
topic_arn = topic_conf.set_config()
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
tenants_list.append('')
# make sure there are no leftover topics
delete_all_topics(conn, '', get_config_cluster())
-
+
# make sure that we start at v2
zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2)