# XXX this should be converted to use boto3
import boto
from botocore.exceptions import ClientError
-from http import server as http_server
+from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
from random import randint
import hashlib
# XXX this should be converted to use pytest
import datetime
from cloudevents.http import from_http
from dateutil import parser
+import requests
from . import(
get_config_host,
print('Error: ' + str(e))
-class HTTPPostHandler(http_server.BaseHTTPRequestHandler):
+def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=False, expected_sizes={}, etags=[]):
+ """ verify there is at least one record per element """
+ err = ''
+ for key in keys:
+ key_found = False
+ object_size = 0
+ if type(records) is list:
+ for record_list in records:
+ if key_found:
+ break
+ for record in record_list['Records']:
+ assert_in('eTag', record['s3']['object'])
+ if record['s3']['bucket']['name'] == key.bucket.name and \
+ record['s3']['object']['key'] == key.name:
+ # Assertion Error needs to be fixed
+ #assert_equal(key.etag[1:-1], record['s3']['object']['eTag'])
+ if etags:
+ assert_in(key.etag[1:-1], etags)
+ if len(record['s3']['object']['metadata']) > 0:
+ for meta in record['s3']['object']['metadata']:
+ assert(meta['key'].startswith(META_PREFIX))
+ if deletions and record['eventName'].startswith('ObjectRemoved'):
+ key_found = True
+ object_size = record['s3']['object']['size']
+ break
+ elif not deletions and record['eventName'].startswith('ObjectCreated'):
+ key_found = True
+ object_size = record['s3']['object']['size']
+ break
+ else:
+ for record in records['Records']:
+ assert_in('eTag', record['s3']['object'])
+ if record['s3']['bucket']['name'] == key.bucket.name and \
+ record['s3']['object']['key'] == key.name:
+ assert_equal(key.etag, record['s3']['object']['eTag'])
+ if etags:
+ assert_in(key.etag[1:-1], etags)
+ if len(record['s3']['object']['metadata']) > 0:
+ for meta in record['s3']['object']['metadata']:
+ assert(meta['key'].startswith(META_PREFIX))
+ if deletions and record['eventName'].startswith('ObjectRemoved'):
+ key_found = True
+ object_size = record['s3']['object']['size']
+ break
+ elif not deletions and record['eventName'].startswith('ObjectCreated'):
+ key_found = True
+ object_size = record['s3']['object']['size']
+ break
+
+ if not key_found:
+ err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
+ assert False, err
+ elif expected_sizes:
+ assert_equal(object_size, expected_sizes.get(key.name))
+
+ if not len(records) == len(keys):
+ err = 'superfluous records are found'
+ log.warning(err)
+ if exact_match:
+ for record_list in records:
+ for record in record_list['Records']:
+ log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key']))
+ assert False, err
+
+
+class HTTPPostHandler(BaseHTTPRequestHandler):
"""HTTP POST hanler class storing the received events in its http server"""
def do_POST(self):
"""implementation of POST handler"""
assert_equal(event['datacontenttype'], 'application/json')
assert_equal(event['subject'], record['s3']['object']['key'])
assert_equal(parser.parse(event['time']), parser.parse(record['eventTime']))
- log.info('HTTP Server (%d) received event: %s', self.server.worker_id, str(body))
+ log.info('HTTP Server received event: %s', str(body))
self.server.append(json.loads(body))
if self.headers.get('Expect') == '100-continue':
self.send_response(100)
self.end_headers()
-class HTTPServerWithEvents(http_server.HTTPServer):
- """HTTP server used by the handler to store events"""
- def __init__(self, addr, handler, worker_id, delay=0, cloudevents=False):
- http_server.HTTPServer.__init__(self, addr, handler, False)
- self.worker_id = worker_id
+class HTTPServerWithEvents(ThreadingHTTPServer):
+ """multithreaded HTTP server used by the handler to store events"""
+ def __init__(self, addr, delay=0, cloudevents=False):
self.events = []
self.delay = delay
self.cloudevents = cloudevents
+ self.addr = addr
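+        # ThreadingHTTPServer handles every request in its own thread, so this
+        # lock serializes access to the shared self.events list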
+ self.lock = threading.Lock()
+ ThreadingHTTPServer.__init__(self, addr, HTTPPostHandler)
+ log.info('http server created on %s', str(self.addr))
+ self.proc = threading.Thread(target=self.run)
+ self.proc.start()
+ retries = 0
+        while not self.proc.is_alive() and retries < 5:
+ retries += 1
+ time.sleep(5)
+ log.warning('http server on %s did not start yet', str(self.addr))
+ if not self.proc.is_alive():
+ log.error('http server on %s failed to start. closing...', str(self.addr))
+ self.close()
+            assert False, 'http server failed to start'
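+
+    # Minimal usage sketch (mirrors the tests below; 'keys' stands for the boto
+    # bucket keys a test created, not a member of this class):
+    #   http_server = HTTPServerWithEvents((host, port))  # starts serving on construction
+    #   ... upload/delete objects to trigger notifications ...
+    #   http_server.verify_s3_events(keys, exact_match=True)
+    #   http_server.close()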
- def append(self, event):
- self.events.append(event)
-
-class HTTPServerThread(threading.Thread):
- """thread for running the HTTP server. reusing the same socket for all threads"""
- def __init__(self, i, sock, addr, delay=0, cloudevents=False):
- threading.Thread.__init__(self)
- self.i = i
- self.daemon = True
- self.httpd = HTTPServerWithEvents(addr, HTTPPostHandler, i, delay, cloudevents)
- self.httpd.socket = sock
- # prevent the HTTP server from re-binding every handler
- self.httpd.server_bind = self.server_close = lambda self: None
- self.start()
def run(self):
- try:
- log.info('HTTP Server (%d) started on: %s', self.i, self.httpd.server_address)
- self.httpd.serve_forever()
- log.info('HTTP Server (%d) ended', self.i)
- except Exception as error:
- # could happen if the server r/w to a closing socket during shutdown
- log.info('HTTP Server (%d) ended unexpectedly: %s', self.i, str(error))
-
- def close(self):
- self.httpd.shutdown()
+ log.info('http server started on %s', str(self.addr))
+ self.serve_forever()
+ self.server_close()
+ log.info('http server ended on %s', str(self.addr))
- def get_events(self):
- return self.httpd.events
-
- def reset_events(self):
- self.httpd.events = []
-
-class StreamingHTTPServer:
- """multi-threaded http server class also holding list of events received into the handler
- each thread has its own server, and all servers share the same socket"""
- def __init__(self, host, port, num_workers=100, delay=0, cloudevents=False):
- addr = (host, port)
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self.sock.bind(addr)
- self.sock.listen(num_workers)
- self.workers = [HTTPServerThread(i, self.sock, addr, delay, cloudevents) for i in range(num_workers)]
+ def acquire_lock(self):
+        if not self.lock.acquire(timeout=5):
+ self.close()
+ raise AssertionError('failed to acquire lock in HTTPServerWithEvents')
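+
+    # callers of acquire_lock() must release self.lock themselves once they are
+    # done with self.events (append(), verify_s3_events() and get_and_reset_events() do so)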
+ def append(self, event):
+ self.acquire_lock()
+ self.events.append(event)
+ self.lock.release()
+
def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}):
"""verify stored s3 records agains a list of keys"""
- events = []
- for worker in self.workers:
- events += worker.get_events()
- worker.reset_events()
- verify_s3_records_by_elements(events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes)
-
- def verify_events(self, keys, exact_match=False, deletions=False):
- """verify stored events agains a list of keys"""
- events = []
- for worker in self.workers:
- events += worker.get_events()
- worker.reset_events()
- verify_events_by_elements(events, keys, exact_match=exact_match, deletions=deletions)
+ self.acquire_lock()
+ log.info('verify_s3_events: http server has %d events', len(self.events))
+ try:
+ verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes)
+ except AssertionError as err:
+ self.close()
+ raise err
+        finally:
+            # clear the stored events before releasing the lock so that events
+            # appended concurrently by handler threads are not silently dropped
+            self.events = []
+            self.lock.release()
def get_and_reset_events(self):
- events = []
- for worker in self.workers:
- events += worker.get_events()
- worker.reset_events()
+ self.acquire_lock()
+ log.info('get_and_reset_events: http server has %d events', len(self.events))
+ events = self.events
+ self.events = []
+ self.lock.release()
return events
def close(self):
- """close all workers in the http server and wait for it to finish"""
- # make sure that the shared socket is closed
- # this is needed in case that one of the threads is blocked on the socket
- self.sock.shutdown(socket.SHUT_RDWR)
- self.sock.close()
- # wait for server threads to finish
- for worker in self.workers:
- worker.close()
- worker.join()
+ log.info('http server on %s starting shutdown', str(self.addr))
+ t = threading.Thread(target=self.shutdown)
+ t.start()
+ t.join(5)
+ retries = 0
+ while self.proc.is_alive() and retries < 5:
+ retries += 1
+            self.proc.join(5)
+ log.warning('http server on %s still alive', str(self.addr))
+ if self.proc.is_alive():
+ log.error('http server on %s failed to shutdown', str(self.addr))
+ self.server_close()
+ else:
+ log.info('http server on %s shutdown ended', str(self.addr))
# AMQP endpoint functions
verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes)
self.events = []
- def verify_events(self, keys, exact_match=False, deletions=False):
- """verify stored events agains a list of keys"""
- verify_events_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
- self.events = []
-
def get_and_reset_events(self):
tmp = self.events
self.events = []
log.info('rabbitmq server already terminated')
-def verify_events_by_elements(events, keys, exact_match=False, deletions=False):
- """ verify there is at least one event per element """
- err = ''
- for key in keys:
- key_found = False
- if type(events) is list:
- for event_list in events:
- if key_found:
- break
- for event in event_list['events']:
- if event['info']['bucket']['name'] == key.bucket.name and \
- event['info']['key']['name'] == key.name:
- if deletions and event['event'] == 'OBJECT_DELETE':
- key_found = True
- break
- elif not deletions and event['event'] == 'OBJECT_CREATE':
- key_found = True
- break
- else:
- for event in events['events']:
- if event['info']['bucket']['name'] == key.bucket.name and \
- event['info']['key']['name'] == key.name:
- if deletions and event['event'] == 'OBJECT_DELETE':
- key_found = True
- break
- elif not deletions and event['event'] == 'OBJECT_CREATE':
- key_found = True
- break
-
- if not key_found:
- err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
- log.error(events)
- assert False, err
-
- if not len(events) == len(keys):
- err = 'superfluous events are found'
- log.debug(err)
- if exact_match:
- log.error(events)
- assert False, err
-
META_PREFIX = 'x-amz-meta-'
-def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=False, expected_sizes={}, etags=[]):
- """ verify there is at least one record per element """
- err = ''
- for key in keys:
- key_found = False
- object_size = 0
- if type(records) is list:
- for record_list in records:
- if key_found:
- break
- for record in record_list['Records']:
- assert_in('eTag', record['s3']['object'])
- if record['s3']['bucket']['name'] == key.bucket.name and \
- record['s3']['object']['key'] == key.name:
- # Assertion Error needs to be fixed
- #assert_equal(key.etag[1:-1], record['s3']['object']['eTag'])
- if etags:
- assert_in(key.etag[1:-1], etags)
- if len(record['s3']['object']['metadata']) > 0:
- for meta in record['s3']['object']['metadata']:
- assert(meta['key'].startswith(META_PREFIX))
- if deletions and record['eventName'].startswith('ObjectRemoved'):
- key_found = True
- object_size = record['s3']['object']['size']
- break
- elif not deletions and record['eventName'].startswith('ObjectCreated'):
- key_found = True
- object_size = record['s3']['object']['size']
- break
- else:
- for record in records['Records']:
- assert_in('eTag', record['s3']['object'])
- if record['s3']['bucket']['name'] == key.bucket.name and \
- record['s3']['object']['key'] == key.name:
- assert_equal(key.etag, record['s3']['object']['eTag'])
- if etags:
- assert_in(key.etag[1:-1], etags)
- if len(record['s3']['object']['metadata']) > 0:
- for meta in record['s3']['object']['metadata']:
- assert(meta['key'].startswith(META_PREFIX))
- if deletions and record['eventName'].startswith('ObjectRemoved'):
- key_found = True
- object_size = record['s3']['object']['size']
- break
- elif not deletions and record['eventName'].startswith('ObjectCreated'):
- key_found = True
- object_size = record['s3']['object']['size']
- break
-
- if not key_found:
- err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
- assert False, err
- elif expected_sizes:
- assert_equal(object_size, expected_sizes.get(key.name))
-
- if not len(records) == len(keys):
- err = 'superfluous records are found'
- log.warning(err)
- if exact_match:
- for record_list in records:
- for record in record_list['Records']:
- log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key']))
- assert False, err
-
-
# Kafka endpoint functions
kafka_server = 'localhost'
port = random.randint(10000, 20000)
# start an http server in a separate thread
number_of_objects = 10
- http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
+ http_server = HTTPServerWithEvents((host, port))
# create bucket
bucket_name = gen_bucket_name()
port = random.randint(10000, 20000)
# start an http server in a separate thread
number_of_objects = 10
- http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
+ http_server = HTTPServerWithEvents((host, port))
# create bucket
bucket_name = gen_bucket_name()
port = random.randint(10000, 20000)
# start an http server in a separate thread
number_of_objects = 10
- http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects, cloudevents=True)
+ http_server = HTTPServerWithEvents((host, port), cloudevents=True)
# create bucket
bucket_name = gen_bucket_name()
port = random.randint(10000, 20000)
# start an http server in a separate thread
number_of_objects = 10
- http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
+ http_server = HTTPServerWithEvents((host, port))
# create bucket
bucket_name = gen_bucket_name()
port = random.randint(10000, 20000)
# start an http server in a separate thread
number_of_objects = 10
- http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
+ http_server = HTTPServerWithEvents((host, port))
# create bucket
bucket_name = gen_bucket_name()
port = random.randint(10000, 20000)
# start an http server in a separate thread
number_of_objects = 1
- http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
+ http_server = HTTPServerWithEvents((host, port))
# create bucket
bucket_name = gen_bucket_name()
host = get_ip()
port = random.randint(10000, 20000)
# start an http server in a separate thread
- http_server = StreamingHTTPServer(host, port, num_workers=10)
+ http_server = HTTPServerWithEvents((host, port))
# create bucket
bucket_name = gen_bucket_name()
port = random.randint(10000, 20000)
# start an http server in a separate thread
number_of_objects = 200
- http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
+ http_server = HTTPServerWithEvents((host, port))
gw = conn
# create random port for the http server
host = get_ip()
- http_port = random.randint(10000, 20000)
+ port = random.randint(10000, 20000)
# start an http server in a separate thread
- http_server = StreamingHTTPServer(host, http_port, num_workers=10)
+ http_server = HTTPServerWithEvents((host, port))
# create bucket
bucket_name = gen_bucket_name()
topic_name = bucket_name + TOPIC_SUFFIX
# create s3 topic
- endpoint_address = 'http://'+host+':'+str(http_port)
+ endpoint_address = 'http://'+host+':'+str(port)
endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
assert_equal(result[1], 0)
# start an http server in a separate thread
- http_server = StreamingHTTPServer(host, http_port, num_workers=10)
+ http_server = HTTPServerWithEvents((host, port))
print('wait for '+str(delay*2)+'sec for the messages...')
time.sleep(delay*2)
# create random port for the http server
host = get_ip()
- http_port = random.randint(10000, 20000)
+ port = random.randint(10000, 20000)
# start an http server in a separate thread
- http_server = StreamingHTTPServer(host, http_port, num_workers=10)
+ http_server = HTTPServerWithEvents((host, port))
# create bucket
bucket_name = gen_bucket_name()
topic_name = bucket_name + TOPIC_SUFFIX
# create s3 topic
- endpoint_address = 'http://'+host+':'+str(http_port)
+ endpoint_address = 'http://'+host+':'+str(port)
endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true&'+create_persistency_config_string(config_dict)
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
host = get_ip()
port = random.randint(10000, 20000)
# start an http server in a separate thread
- http_server = StreamingHTTPServer(host, port, num_workers=10, delay=0.5)
+ http_server = HTTPServerWithEvents((host, port), delay=0.5)
# create bucket
bucket_name = gen_bucket_name()
port = random.randint(10000, 20000)
# start an http server in a separate thread
number_of_objects = 10
- http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
+ http_server = HTTPServerWithEvents((host, port))
# create bucket
bucket_name = gen_bucket_name()
host = get_ip_http()
port = random.randint(10000, 20000)
# start an http server in a separate thread
- receiver = StreamingHTTPServer(host, port, num_workers=10)
+ receiver = HTTPServerWithEvents((host, port))
endpoint_address = 'http://'+host+':'+str(port)
endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
# the http server does not guarantee order, so duplicates are expected
assert_equal(result[1], 0)
# start an http server in a separate thread
- http_server = StreamingHTTPServer(host, http_port, num_workers=number_of_objects)
+ http_server = HTTPServerWithEvents((host, http_port))
delay = 30
print('wait for '+str(delay)+'sec for the messages...')
http_port = random.randint(10000, 20000)
# start an http server in a separate thread
- number_of_objects = 10
- http_server = StreamingHTTPServer(host, http_port, num_workers=number_of_objects)
+ http_server = HTTPServerWithEvents((host, http_port))
# disable v2 notification
result = admin(['zonegroup', 'modify', '--disable-feature=notification_v2'], get_config_cluster())
assert_equal(status/100, 2)
# create objects in the bucket (async)
+ number_of_objects = 10
client_threads = []
start_time = time.time()
for i in range(number_of_objects):