Formatter::ObjectSection sub_s(*f, "object");
encode_json("key", object_key, f);
encode_json("size", object_size, f);
- encode_json("etag", object_etag, f);
+ encode_json("eTag", object_etag, f);
encode_json("versionId", object_versionId, f);
encode_json("sequencer", object_sequencer, f);
encode_json("metadata", x_meta_map, f);
import string
from http import server as http_server
from random import randint
+import hashlib
from boto.s3.connection import S3Connection
from nose import SkipTest
-from nose.tools import assert_not_equal, assert_equal
+from nose.tools import assert_not_equal, assert_equal, assert_in
import boto.s3.tagging
# configure logging for the tests module
log.error(events)
assert False, err
-def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=False, expected_sizes={}):
+def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=False, expected_sizes={}, etags=[]):
""" verify there is at least one record per element """
err = ''
for key in keys:
if key_found:
break
for record in record_list['Records']:
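+ # every record should expose the 'eTag' field, even for keys we are not matching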
+ assert_in('eTag', record['s3']['object'])
if record['s3']['bucket']['name'] == key.bucket.name and \
record['s3']['object']['key'] == key.name:
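+ # boto's Key.etag is wrapped in double quotes; strip them before comparing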
+ assert_equal(key.etag[1:-1], record['s3']['object']['eTag'])
+ if etags:
+ assert_in(key.etag[1:-1], etags)
if deletions and 'ObjectRemoved' in record['eventName']:
key_found = True
object_size = record['s3']['object']['size']
break
else:
for record in records['Records']:
+ assert_in('eTag', record['s3']['object'])
if record['s3']['bucket']['name'] == key.bucket.name and \
record['s3']['object']['key'] == key.name:
+ assert_equal(key.etag[1:-1], record['s3']['object']['eTag'])
+ if etags:
+ assert_in(key.etag[1:-1], etags)
if deletions and 'ObjectRemoved' in record['eventName']:
key_found = True
object_size = record['s3']['object']['size']
self.topic = topic
self.stop = False
- def verify_s3_events(self, keys, exact_match=False, deletions=False):
+ def verify_s3_events(self, keys, exact_match=False, deletions=False, etags=[]):
"""verify stored s3 records agains a list of keys"""
- verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
+ verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, etags=etags)
self.events = []
def kafka_receiver_thread_runner(receiver):
# create objects in the bucket (async)
number_of_objects = 10
client_threads = []
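+ # expected eTags, computed client-side while uploading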
+ etags = []
start_time = time.time()
for i in range(number_of_objects):
key = bucket.new_key(str(i))
content = str(os.urandom(1024*1024))
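+ # for non-multipart uploads, S3 computes the eTag as the md5 of the object body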
+ etag = hashlib.md5(content.encode()).hexdigest()
+ etags.append(etag)
thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
thr.start()
client_threads.append(thr)
print('wait for 5sec for the messages...')
time.sleep(5)
keys = list(bucket.list())
- receiver.verify_s3_events(keys, exact_match=True)
+ receiver.verify_s3_events(keys, exact_match=True, etags=etags)
# delete objects from the bucket
client_threads = []
print('wait for 5sec for the messages...')
time.sleep(5)
- receiver.verify_s3_events(keys, exact_match=True, deletions=True)
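+ # deletion events are expected to carry the eTag of the removed object as well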
+ receiver.verify_s3_events(keys, exact_match=True, deletions=True, etags=etags)
finally:
# cleanup