self.topic = topic
self.stop = False
- def verify_s3_events(self, keys, exact_match=False, deletions=False, etags=[]):
+ def verify_s3_events(self, keys, exact_match=False, deletions=False, etags=[], expected_sizes={}):
"""verify stored s3 records agains a list of keys"""
- verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, etags=etags)
+ verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, etags=etags, expected_sizes=expected_sizes)
self.events = []
+ def close(self, task):
+ stop_kafka_receiver(self, task)
+
+
def kafka_receiver_thread_runner(receiver):
"""main thread function for the kafka receiver"""
try:
# start amqp receiver
task, receiver = create_kafka_receiver_thread(topic_name, kafka_brokers=kafka_brokers)
task.start()
- endpoint_address = 'kafka://' + kafka_server
+ endpoint_address = 'kafka://' + kafka_server + ':9092'
# without acks from broker
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
if kafka_brokers is not None: