import logging
import json
import tempfile
+import BaseHTTPServer
+import SocketServer
+import random
+import threading
+import subprocess
+import socket
+import time
from .tests import get_realm, \
ZonegroupConns, \
zonegroup_meta_checkpoint, \
# configure logging for the tests module
log = logging.getLogger(__name__)
+skip_push_tests = True
+
####################################
# utility functions for pubsub tests
####################################
+# HTTP endpoint functions
+# multithreaded streaming server, based on: https://stackoverflow.com/questions/46210672/
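+# the idea (per the reference above): bind and listen on a single shared socket up
+# front, then run several HTTPServer instances on top of it (one per thread), so
+# events can be received concurrently without each server re-binding the port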
+
+class HTTPPostHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+ """HTTP POST hanler class storing the received events in its http server"""
+ def do_POST(self):
+ """implementation of POST handler"""
+ try:
+ content_length = int(self.headers['Content-Length'])
+ body = self.rfile.read(content_length)
+ log.info('HTTP Server (%d) received event: %s', self.server.worker_id, str(body))
+ self.server.append(json.loads(body))
+ except Exception as error:
+ log.error('HTTP Server (%d) failed to handle event: %s', self.server.worker_id, str(error))
+ self.send_response(400)
+ else:
+ self.send_response(200)
+ finally:
+ self.end_headers()
+
+
+class HTTPServerWithEvents(BaseHTTPServer.HTTPServer):
+ """HTTP server used by the handler to store events"""
+ def __init__(self, addr, handler, worker_id):
+ BaseHTTPServer.HTTPServer.__init__(self, addr, handler, False)
+ self.worker_id = worker_id
+ self.events = []
+
+ def append(self, event):
+ self.events.append(event)
+
+
+class HTTPServerThread(threading.Thread):
+ """thread for running the HTTP server. reusing the same socket for all threads"""
+ def __init__(self, i, sock, addr):
+ threading.Thread.__init__(self)
+ self.i = i
+ self.daemon = True
+ self.httpd = HTTPServerWithEvents(addr, HTTPPostHandler, i)
+ self.httpd.socket = sock
+ # neutralize bind/close on the per-thread server, so the shared socket is neither re-bound nor closed
+ self.httpd.server_bind = self.httpd.server_close = lambda self: None
+ self.start()
+
+ def run(self):
+ try:
+ log.info('HTTP Server (%d) started on: %s', self.i, self.httpd.server_address)
+ self.httpd.serve_forever()
+ log.info('HTTP Server (%d) ended', self.i)
+ except Exception as error:
+ # could happen if the server reads/writes to a closing socket during shutdown
+ log.info('HTTP Server (%d) ended unexpectedly: %s', self.i, str(error))
+
+ def close(self):
+ self.httpd.shutdown()
+
+ def get_events(self):
+ return self.httpd.events
+
+
+class StreamingHTTPServer:
+ """multi-threaded http server class also holding list of events received into the handler
+ each thread has its own server, and all servers share the same socket"""
+ def __init__(self, host, port, num_workers=20):
+ addr = (host, port)
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self.sock.bind(addr)
+ # allow a backlog of up to 20 pending connections on the listener
+ self.sock.listen(20)
+ self.workers = [HTTPServerThread(i, self.sock, addr) for i in range(num_workers)]
+
+ def verify_s3_events(self, keys, exact_match=False, deletions=False):
+ """verify stored s3 records agains a list of keys"""
+ events = []
+ for worker in self.workers:
+ events += worker.get_events()
+ verify_s3_records_by_elements(events, keys, exact_match=exact_match, deletions=deletions)
+
+ def verify_events(self, keys, exact_match=False, deletions=False):
+ """verify stored events agains a list of keys"""
+ events = []
+ for worker in self.workers:
+ events += worker.get_events()
+ verify_events_by_elements(events, keys, exact_match=exact_match, deletions=deletions)
+
+ def close(self):
+ """close all workers in the http server and wait for it to finish"""
+ # make sure that the shared socket is closed
+ # this is needed in case that one of the threads is blocked on the socket
+ self.sock.shutdown(socket.SHUT_RDWR)
+ self.sock.close()
+ # wait for server threads to finish
+ for worker in self.workers:
+ worker.close()
+ worker.join()
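+
+# typical lifecycle (a sketch; mirrors how the tests below drive the server):
+#   http_server = StreamingHTTPServer(get_ip(), port)
+#   ...trigger notifications...
+#   http_server.verify_events(keys)
+#   http_server.close()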
+
+
+# AMQP endpoint functions
+
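+# default AMQP broker port; init_rabbitmq() below starts rabbitmq-server with its defaults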
+rabbitmq_port = 5672
+
+class AMQPReceiver(object):
+ """class for receiving and storing messages on a topic from the AMQP broker"""
+ def __init__(self, exchange, topic):
+ import pika
+ hostname = get_ip()
+ retries = 20
+ connect_ok = False
+ while retries > 0:
+ try:
+ connection = pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=rabbitmq_port))
+ connect_ok = True
+ break
+ except Exception as error:
+ retries -= 1
+ print 'AMQP receiver failed to connect (try %d): %s' % (20 - retries, str(error))
+ log.info('AMQP receiver failed to connect (try %d): %s', 20 - retries, str(error))
+ time.sleep(2)
+
+ if not connect_ok:
+ raise error
+
+ self.channel = connection.channel()
+ self.channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)
+ result = self.channel.queue_declare('', exclusive=True)
+ queue_name = result.method.queue
+ self.channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=topic)
+ self.channel.basic_consume(queue=queue_name,
+ on_message_callback=self.on_message,
+ auto_ack=True)
+ self.events = []
+
+ def on_message(self, ch, method, properties, body):
+ """callback invoked when a new message arrive on the topic"""
+ log.info('AMQP received event: %s', body)
+ self.events.append(json.loads(body))
+
+ # TODO create a base class for the AMQP and HTTP cases
+ def verify_s3_events(self, keys, exact_match=False, deletions=False):
+ """verify stored s3 records agains a list of keys"""
+ verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
+ self.events = []
+
+ def verify_events(self, keys, exact_match=False, deletions=False):
+ """verify stored events agains a list of keys"""
+ verify_events_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
+ self.events = []
+
+
+def amqp_receiver_thread_runner(receiver):
+ """main thread function for the amqp receiver"""
+ try:
+ log.info('AMQP receiver started')
+ receiver.channel.start_consuming()
+ log.info('AMQP receiver ended')
+ except Exception as error:
+ log.info('AMQP receiver ended unexpectedly: %s', str(error))
+
+
+def create_amqp_receiver_thread(exchange, topic):
+ """create amqp receiver and thread"""
+ receiver = AMQPReceiver(exchange, topic)
+ task = threading.Thread(target=amqp_receiver_thread_runner, args=(receiver,))
+ task.daemon = True
+ return task, receiver
+
+
+def stop_amqp_receiver(receiver, task):
+ """stop the receiver thread and wait for it to finis"""
+ try:
+ receiver.channel.stop_consuming()
+ log.info('stopping AMQP receiver')
+ except Exception as error:
+ log.info('failed to gracefully stop AMQP receiver: %s', str(error))
+ task.join(5)
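+
+# typical lifecycle (a sketch; mirrors how the tests below drive the receiver):
+#   task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+#   task.start()
+#   ...trigger notifications...
+#   receiver.verify_events(keys)
+#   stop_amqp_receiver(receiver, task)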
def check_ps_configured():
"""check if at least one pubsub zone exist"""
key_found = False
for event in events:
if event['info']['bucket']['name'] == key.bucket.name and \
- event['info']['key']['name'] == key.name:
+ event['info']['key']['name'] == key.name:
if deletions and event['event'] == 'OBJECT_DELETE':
key_found = True
break
key_found = False
for record in records:
if record['s3']['bucket']['name'] == key.bucket.name and \
- record['s3']['object']['key'] == key.name:
+ record['s3']['object']['key'] == key.name:
if deletions and record['eventName'] == 'ObjectRemoved':
key_found = True
break
assert False, err
+def init_rabbitmq():
+ """ start a rabbitmq broker """
+ hostname = get_ip()
+ #port = str(random.randint(20000, 30000))
+ #data_dir = './' + port + '_data'
+ #log_dir = './' + port + '_log'
+ #print('')
+ #try:
+ # os.mkdir(data_dir)
+ # os.mkdir(log_dir)
+ #except:
+ # print('rabbitmq directories already exists')
+ #env = {'RABBITMQ_NODE_PORT': port,
+ # 'RABBITMQ_NODENAME': 'rabbit'+ port + '@' + hostname,
+ # 'RABBITMQ_USE_LONGNAME': 'true',
+ # 'RABBITMQ_MNESIA_BASE': data_dir,
+ # 'RABBITMQ_LOG_BASE': log_dir}
+ # TODO: support multiple brokers per host using env
+ # make sure we don't collide with the default
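+ # note: assumes rabbitmq-server is installed, in the PATH, and free to bind its default port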
+ try:
+ proc = subprocess.Popen('rabbitmq-server')
+ except Exception as error:
+ log.info('failed to execute rabbitmq-server: %s', str(error))
+ print 'failed to execute rabbitmq-server: %s' % str(error)
+ return None
+ # TODO add rabbitmq checkpoint instead of sleep
+ time.sleep(5)
+ return proc #, port, data_dir, log_dir
+
+
+def clean_rabbitmq(proc): #, data_dir, log_dir)
+ """ stop the rabbitmq broker """
+ try:
+ subprocess.call(['rabbitmqctl', 'stop'])
+ time.sleep(5)
+ proc.terminate()
+ except:
+ log.info('rabbitmq server already terminated')
+ # TODO: add directory cleanup once multiple brokers are supported
+ #try:
+ # os.rmdir(data_dir)
+ # os.rmdir(log_dir)
+ #except:
+ # log.info('rabbitmq directories already removed')
+
+
def init_env():
"""initialize the environment"""
check_ps_configured()
return zones, ps_zones
+def get_ip():
+ """ This method returns the "primary" IP on the local box (the one with a default route)
+ source: https://stackoverflow.com/a/28950776/711085
+ this is needed because on the teuthology machines: socket.getfqdn()/socket.gethostname() return 127.0.0.1 """
+ s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ try:
+ # the address does not need to be reachable; connect() on a UDP socket sends no packets
+ s.connect(('10.255.255.255', 1))
+ ip = s.getsockname()[0]
+ finally:
+ s.close()
+ return ip
+
+
TOPIC_SUFFIX = "_topic"
SUB_SUFFIX = "_sub"
NOTIFICATION_SUFFIX = "_notif"
for i in range(number_of_objects):
key = bucket.new_key(str(i))
key.set_contents_from_string('bar')
- print('Zonegroup: ' + zonegroup.name)
- print('user: ' + get_user())
- print('tenant: ' + get_tenant())
- print('Master Zone')
+ print 'Zonegroup: ' + zonegroup.name
+ print 'user: ' + get_user()
+ print 'tenant: ' + get_tenant()
+ print 'Master Zone'
print_connection_info(zones[0].conn)
- print('PubSub Zone')
+ print 'PubSub Zone'
print_connection_info(ps_zones[0].conn)
- print('Bucket: ' + bucket_name)
+ print 'Bucket: ' + bucket_name
def test_ps_s3_notification_low_level():
topic_conf_list = [{'Id': notification_name,
'TopicArn': topic_arn,
'Events': ['s3:ObjectCreated:*']
- }]
+ }]
s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
_, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
topic_conf_list = [{'Id': notification_name,
'TopicArn': topic_arn,
'Events': ['s3:ObjectCreated:*']
- }]
+ }]
s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
_, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
topic_conf_list = [{'Id': notification_name1,
'TopicArn': topic_arn,
'Events': ['s3:ObjectCreated:*']
- }]
+ }]
s3_notification_conf1 = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf1.set_config()
assert_equal(status/100, 2)
topic_conf_list = [{'Id': notification_name2,
'TopicArn': topic_arn,
'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
- }]
+ }]
s3_notification_conf2 = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf2.set_config()
assert_equal(status/100, 2)
# create topic
dest_endpoint = 'amqp://localhost:7001'
dest_args = 'amqp-exchange=amqp.direct&amqp-ack-level=none'
- topic_conf = PSTopic(ps_zones[0].conn, topic_name,
+ topic_conf = PSTopic(ps_zones[0].conn, topic_name,
endpoint=dest_endpoint,
endpoint_args=dest_args)
_, status = topic_conf.set_config()
# wait for sync
zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
log.debug("Event (OBJECT_DELETE) synced")
-
+
# get the events from the creations subscription
result, _ = sub_create_conf.get_events()
parsed_result = json.loads(result)
def test_ps_push_http():
""" test pushing to http endpoint """
- return SkipTest("PubSub push tests are only manual")
+ if skip_push_tests:
+ return SkipTest("PubSub push tests don't run in teuthology")
zones, ps_zones = init_env()
bucket_name = gen_bucket_name()
topic_name = bucket_name+TOPIC_SUFFIX
+ # create random port for the http server
+ host = get_ip()
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ http_server = StreamingHTTPServer(host, port)
+
# create topic
topic_conf = PSTopic(ps_zones[0].conn, topic_name)
_, status = topic_conf.set_config()
assert_equal(status/100, 2)
# create subscription
sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
- topic_name, endpoint='http://localhost:9001')
+ topic_name, endpoint='http://'+host+':'+str(port))
_, status = sub_conf.set_config()
assert_equal(status/100, 2)
# create objects in the bucket
key.set_contents_from_string('bar')
# wait for sync
zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ # check http server
+ keys = list(bucket.list())
+ # TODO: use exact match
+ http_server.verify_events(keys, exact_match=False)
# delete objects from the bucket
for key in bucket.list():
# wait for sync
zone_meta_checkpoint(ps_zones[0].zone)
zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ # check http server
+ # TODO: use exact match
+ http_server.verify_events(keys, deletions=True, exact_match=False)
# cleanup
sub_conf.del_config()
notification_conf.del_config()
topic_conf.del_config()
zones[0].delete_bucket(bucket_name)
+ http_server.close()
def test_ps_s3_push_http():
- """ test pushing to http endpoint n s3 record format"""
- return SkipTest("PubSub push tests are only manual")
+ """ test pushing to http endpoint s3 record format"""
+ if skip_push_tests:
+ return SkipTest("PubSub push tests don't run in teuthology")
zones, ps_zones = init_env()
bucket_name = gen_bucket_name()
topic_name = bucket_name+TOPIC_SUFFIX
+ # create random port for the http server
+ host = get_ip()
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ http_server = StreamingHTTPServer(host, port)
+
# create topic
topic_conf = PSTopic(ps_zones[0].conn, topic_name,
- endpoint='http://localhost:9001')
+ endpoint='http://'+host+':'+str(port))
result, status = topic_conf.set_config()
assert_equal(status/100, 2)
parsed_result = json.loads(result)
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name,
'TopicArn': topic_arn,
- 'Events': ['s3:ObjectCreated:*']
- }]
+ 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
+ }]
s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
_, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
key.set_contents_from_string('bar')
# wait for sync
zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
- # TODO check http server
+ # check http server
+ keys = list(bucket.list())
+ # TODO: use exact match
+ http_server.verify_s3_events(keys, exact_match=False)
# delete objects from the bucket
for key in bucket.list():
# wait for sync
zone_meta_checkpoint(ps_zones[0].zone)
zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
- # TODO check http server
+ # check http server
+ # TODO: use exact match
+ http_server.verify_s3_events(keys, deletions=True, exact_match=False)
# cleanup
s3_notification_conf.del_config()
topic_conf.del_config()
zones[0].delete_bucket(bucket_name)
+ http_server.close()
def test_ps_push_amqp():
""" test pushing to amqp endpoint """
- return SkipTest("PubSub push tests are only manual")
+ if skip_push_tests:
+ return SkipTest("PubSub push tests don't run in teuthology")
+ hostname = get_ip()
+ proc = init_rabbitmq()
+ if proc is None:
+ return SkipTest('end2end amqp tests require rabbitmq-server installed')
zones, ps_zones = init_env()
bucket_name = gen_bucket_name()
topic_name = bucket_name+TOPIC_SUFFIX
# create topic
+ exchange = 'ex1'
+ task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+ task.start()
topic_conf = PSTopic(ps_zones[0].conn, topic_name)
_, status = topic_conf.set_config()
assert_equal(status/100, 2)
assert_equal(status/100, 2)
# create subscription
sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
- topic_name, endpoint='amqp://localhost',
- endpoint_args='amqp-exchange=ex1&amqp-ack-level=none')
+ topic_name, endpoint='amqp://'+hostname,
+ endpoint_args='amqp-exchange='+exchange+'&amqp-ack-level=none')
_, status = sub_conf.set_config()
assert_equal(status/100, 2)
# create objects in the bucket
key.set_contents_from_string('bar')
# wait for sync
zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
- # TODO check amqp receiver
+ # check amqp receiver
+ keys = list(bucket.list())
+ # TODO: use exact match
+ receiver.verify_events(keys, exact_match=False)
# delete objects from the bucket
for key in bucket.list():
# wait for sync
zone_meta_checkpoint(ps_zones[0].zone)
zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
- # TODO check amqp receiver
+ # check amqp receiver
+ # TODO: use exact match
+ receiver.verify_events(keys, deletions=True, exact_match=False)
# cleanup
+ stop_amqp_receiver(receiver, task)
sub_conf.del_config()
notification_conf.del_config()
topic_conf.del_config()
zones[0].delete_bucket(bucket_name)
+ clean_rabbitmq(proc)
def test_ps_s3_push_amqp():
""" test pushing to amqp endpoint s3 record format"""
- return SkipTest("PubSub push tests are only manual")
+ if skip_push_tests:
+ return SkipTest("PubSub push tests don't run in teuthology")
+ hostname = get_ip()
+ proc = init_rabbitmq()
+ if proc is None:
+ return SkipTest('end2end amqp tests require rabbitmq-server installed')
zones, ps_zones = init_env()
bucket_name = gen_bucket_name()
topic_name = bucket_name+TOPIC_SUFFIX
# create topic
+ exchange = 'ex1'
+ task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+ task.start()
topic_conf = PSTopic(ps_zones[0].conn, topic_name,
- endpoint='amqp://localhost',
- endpoint_args='amqp-exchange=ex1&amqp-ack-level=none')
+ endpoint='amqp://' + hostname,
+ endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
result, status = topic_conf.set_config()
assert_equal(status/100, 2)
parsed_result = json.loads(result)
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name,
'TopicArn': topic_arn,
- 'Events': ['s3:ObjectCreated:*']
- }]
+ 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
+ }]
s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
_, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
key.set_contents_from_string('bar')
# wait for sync
zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
- # TODO check amqp receiver
+ # check amqp receiver
+ keys = list(bucket.list())
+ # TODO: use exact match
+ receiver.verify_s3_events(keys, exact_match=False)
# delete objects from the bucket
for key in bucket.list():
# wait for sync
zone_meta_checkpoint(ps_zones[0].zone)
zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
- # TODO check amqp receiver
+ # check amqp receiver
+ # TODO: use exact match
+ receiver.verify_s3_events(keys, deletions=True, exact_match=False)
# cleanup
+ stop_amqp_receiver(receiver, task)
s3_notification_conf.del_config()
topic_conf.del_config()
zones[0].delete_bucket(bucket_name)
+ clean_rabbitmq(proc)
def test_ps_delete_bucket():
topic_conf_list = [{'Id': notification_name,
'TopicArn': topic_arn,
'Events': ['s3:ObjectCreated:*']
- }]
+ }]
s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
-
+
# create non-s3 notification
notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
topic_name)
topic_conf_list = [{'Id': notification_name,
'TopicArn': topic_arn,
'Events': ['s3:ObjectCreated:*']
- }]
+ }]
s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
try:
s3_notification_conf.set_config()
except:
- print('missing topic is expected')
+ log.info('missing topic is expected')
else:
assert False, 'missing topic is expected'
def test_ps_s3_topic_update():
""" test updating topic associated with a notification"""
- return SkipTest("PubSub push tests are only manual")
+ if skip_push_tests:
+ return SkipTest("PubSub push tests don't run in teuthology")
+ rabbit_proc = init_rabbitmq()
+ if rabbit_proc is None:
+ return SkipTest('end2end amqp tests require rabbitmq-server installed')
zones, ps_zones = init_env()
bucket_name = gen_bucket_name()
topic_name = bucket_name+TOPIC_SUFFIX
- # create topic
- dest_endpoint1 = 'amqp://localhost'
- dest_args1 = 'amqp-exchange=ex1&amqp-ack-level=none'
- dest_endpoint2 = 'http://localhost:9001'
- topic_conf = PSTopic(ps_zones[0].conn, topic_name,
- endpoint=dest_endpoint1,
- endpoint_args=dest_args1)
+ # create amqp topic
+ hostname = get_ip()
+ exchange = 'ex1'
+ amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+ amqp_task.start()
+ topic_conf = PSTopic(ps_zones[0].conn, topic_name,
+ endpoint='amqp://' + hostname,
+ endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
result, status = topic_conf.set_config()
+ assert_equal(status/100, 2)
parsed_result = json.loads(result)
topic_arn = parsed_result['arn']
- assert_equal(status/100, 2)
# get topic
result, _ = topic_conf.get_config()
# verify topic content
parsed_result = json.loads(result)
assert_equal(parsed_result['topic']['name'], topic_name)
- assert_equal(parsed_result['topic']['dest']['push_endpoint'], dest_endpoint1)
+ assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint'])
+
+ # create http server
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ http_server = StreamingHTTPServer(hostname, port)
# create bucket on the first of the rados zones
bucket = zones[0].create_bucket(bucket_name)
topic_conf_list = [{'Id': notification_name,
'TopicArn': topic_arn,
'Events': ['s3:ObjectCreated:*']
- }]
+ }]
s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
_, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
# wait for sync
zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
- # TODO: check update to amqp
+ keys = list(bucket.list())
+ # TODO: use exact match
+ receiver.verify_s3_events(keys, exact_match=False)
- # update the same topic
- topic_conf = PSTopic(ps_zones[0].conn, topic_name,
- endpoint=dest_endpoint2)
+ # update the same topic with new endpoint
+ topic_conf = PSTopic(ps_zones[0].conn, topic_name,
+ endpoint='http://' + hostname + ':' + str(port))
_, status = topic_conf.set_config()
assert_equal(status/100, 2)
# get topic
# verify topic content
parsed_result = json.loads(result)
assert_equal(parsed_result['topic']['name'], topic_name)
- assert_equal(parsed_result['topic']['dest']['push_endpoint'], dest_endpoint2)
+ assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint'])
- # create more objects in the bucket
- number_of_objects = 10
+ # delete current objects and create new objects in the bucket
+ for key in bucket.list():
+ key.delete()
for i in range(number_of_objects):
key = bucket.new_key(str(i+100))
key.set_contents_from_string('bar')
# wait for sync
+ zone_meta_checkpoint(ps_zones[0].zone)
zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
- # TODO: check it is still updating amqp
+ keys = list(bucket.list())
+ # verify that notifications are still sent to amqp
+ # TODO: use exact match
+ receiver.verify_s3_events(keys, exact_match=False)
# update notification to update the endpoint from the topic
topic_conf_list = [{'Id': notification_name,
'TopicArn': topic_arn,
'Events': ['s3:ObjectCreated:*']
- }]
+ }]
s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
_, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
- # create even more objects in the bucket
- number_of_objects = 10
+
+ # delete current objects and create new objects in the bucket
+ for key in bucket.list():
+ key.delete()
for i in range(number_of_objects):
key = bucket.new_key(str(i+200))
key.set_contents_from_string('bar')
# wait for sync
+ zone_meta_checkpoint(ps_zones[0].zone)
zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
- # TODO: check that updates switched to http
+ keys = list(bucket.list())
+ # check that updates switched to http
+ # TODO: use exact match
+ http_server.verify_s3_events(keys, exact_match=False)
# cleanup
# delete objects from the bucket
+ stop_amqp_receiver(receiver, amqp_task)
for key in bucket.list():
key.delete()
s3_notification_conf.del_config()
topic_conf.del_config()
zones[0].delete_bucket(bucket_name)
+ http_server.close()
+ clean_rabbitmq(rabbit_proc)
def test_ps_s3_notification_update():
""" test updating the topic of a notification"""
- return SkipTest("PubSub push tests are only manual")
+ if skip_push_tests:
+ return SkipTest("PubSub push tests don't run in teuthology")
+ hostname = get_ip()
+ rabbit_proc = init_rabbitmq()
+ if rabbit_proc is None:
+ return SkipTest('end2end amqp tests require rabbitmq-server installed')
+
zones, ps_zones = init_env()
bucket_name = gen_bucket_name()
topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
+ topic_name2 = bucket_name+'http'+TOPIC_SUFFIX
- # create first topic
- dest_endpoint1 = 'amqp://localhost'
- dest_args1 = 'amqp-exchange=ex1&amqp-ack-level=none'
- topic_conf1 = PSTopic(ps_zones[0].conn, topic_name1,
- endpoint=dest_endpoint1,
- endpoint_args=dest_args1)
+ # create topics
+ # start amqp receiver in a separate thread
+ exchange = 'ex1'
+ amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1)
+ amqp_task.start()
+ # create random port for the http server
+ http_port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ http_server = StreamingHTTPServer(hostname, http_port)
+
+ topic_conf1 = PSTopic(ps_zones[0].conn, topic_name1,
+ endpoint='amqp://' + hostname,
+ endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
result, status = topic_conf1.set_config()
parsed_result = json.loads(result)
topic_arn1 = parsed_result['arn']
assert_equal(status/100, 2)
+ topic_conf2 = PSTopic(ps_zones[0].conn, topic_name2,
+ endpoint='http://'+hostname+':'+str(http_port))
+ result, status = topic_conf2.set_config()
+ parsed_result = json.loads(result)
+ topic_arn2 = parsed_result['arn']
+ assert_equal(status/100, 2)
# create bucket on the first of the rados zones
bucket = zones[0].create_bucket(bucket_name)
# wait for sync
zone_meta_checkpoint(ps_zones[0].zone)
- # create s3 notification
+ # create s3 notification with topic1
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name,
'TopicArn': topic_arn1,
'Events': ['s3:ObjectCreated:*']
- }]
+ }]
s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
_, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
key.set_contents_from_string('bar')
# wait for sync
zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
- result, _ = s3_notification_conf.get_config()
- # TODO: check updates to amqp
-
- # create another topic
- topic_name2 = bucket_name+'http'+TOPIC_SUFFIX
- dest_endpoint2 = 'http://localhost:9001'
- topic_conf2 = PSTopic(ps_zones[0].conn, topic_name2,
- endpoint=dest_endpoint2)
- result, status = topic_conf2.set_config()
- parsed_result = json.loads(result)
- topic_arn2 = parsed_result['arn']
- assert_equal(status/100, 2)
+ keys = list(bucket.list())
+ # TODO: use exact match
+ receiver.verify_s3_events(keys, exact_match=False)
- # update notification to the new topic
+ # update notification to use topic2
topic_conf_list = [{'Id': notification_name,
'TopicArn': topic_arn2,
'Events': ['s3:ObjectCreated:*']
- }]
+ }]
s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
_, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
- # create more objects in the bucket
- number_of_objects = 10
+
+ # delete current objects and create new objects in the bucket
+ for key in bucket.list():
+ key.delete()
for i in range(number_of_objects):
- key = bucket.new_key(str(i+200))
+ key = bucket.new_key(str(i+100))
key.set_contents_from_string('bar')
# wait for sync
+ zone_meta_checkpoint(ps_zones[0].zone)
zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
- # TODO: check uodate to http
- result, _ = s3_notification_conf.get_config()
+ keys = list(bucket.list())
+ # check that updates switched to http
+ # TODO: use exact match
+ http_server.verify_s3_events(keys, exact_match=False)
# cleanup
# delete objects from the bucket
+ stop_amqp_receiver(receiver, amqp_task)
for key in bucket.list():
key.delete()
s3_notification_conf.del_config()
topic_conf1.del_config()
topic_conf2.del_config()
zones[0].delete_bucket(bucket_name)
+ http_server.close()
+ clean_rabbitmq(rabbit_proc)
def test_ps_s3_multiple_topics_notification():
""" test notification creation with multiple topics"""
+ if skip_push_tests:
+ return SkipTest("PubSub push tests don't run in teuthology")
+ hostname = get_ip()
+ rabbit_proc = init_rabbitmq()
+ if rabbit_proc is None:
+ return SkipTest('end2end amqp tests require rabbitmq-server installed')
+
zones, ps_zones = init_env()
bucket_name = gen_bucket_name()
- # TODO: test via push endpoint (amqp+http)
topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
topic_name2 = bucket_name+'http'+TOPIC_SUFFIX
# create topics
- topic_conf1 = PSTopic(ps_zones[0].conn, topic_name1)
+ # start amqp receiver in a separate thread
+ exchange = 'ex1'
+ amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1)
+ amqp_task.start()
+ # create random port for the http server
+ http_port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ http_server = StreamingHTTPServer(hostname, http_port)
+
+ topic_conf1 = PSTopic(ps_zones[0].conn, topic_name1,
+ endpoint='amqp://' + hostname,
+ endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
result, status = topic_conf1.set_config()
parsed_result = json.loads(result)
topic_arn1 = parsed_result['arn']
assert_equal(status/100, 2)
- topic_conf2 = PSTopic(ps_zones[0].conn, topic_name2)
+ topic_conf2 = PSTopic(ps_zones[0].conn, topic_name2,
+ endpoint='http://'+hostname+':'+str(http_port))
result, status = topic_conf2.set_config()
parsed_result = json.loads(result)
topic_arn2 = parsed_result['arn']
notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1'
notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2'
topic_conf_list = [
- {
- 'Id': notification_name1,
- 'TopicArn': topic_arn1,
- 'Events': ['s3:ObjectCreated:*']
- },
- {
- 'Id': notification_name2,
- 'TopicArn': topic_arn2,
- 'Events': ['s3:ObjectCreated:*']
- }]
+ {
+ 'Id': notification_name1,
+ 'TopicArn': topic_arn1,
+ 'Events': ['s3:ObjectCreated:*']
+ },
+ {
+ 'Id': notification_name2,
+ 'TopicArn': topic_arn2,
+ 'Events': ['s3:ObjectCreated:*']
+ }]
s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
_, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
# get auto-generated subscriptions
sub_conf1 = PSSubscription(ps_zones[0].conn, notification_name1,
- topic_name1)
+ topic_name1)
_, status = sub_conf1.get_config()
assert_equal(status/100, 2)
sub_conf2 = PSSubscription(ps_zones[0].conn, notification_name2,
- topic_name2)
+ topic_name2)
_, status = sub_conf2.get_config()
assert_equal(status/100, 2)
-
+
# create objects in the bucket
number_of_objects = 10
for i in range(number_of_objects):
keys = list(bucket.list())
# TODO: use exact match
verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False)
-
+ receiver.verify_s3_events(keys, exact_match=False)
+
result, _ = sub_conf2.get_events()
parsed_result = json.loads(result)
for record in parsed_result['Records']:
keys = list(bucket.list())
# TODO: use exact match
verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False)
-
+ http_server.verify_s3_events(keys, exact_match=True)
+
# cleanup
+ stop_amqp_receiver(receiver, amqp_task)
s3_notification_conf.del_config()
topic_conf1.del_config()
topic_conf2.del_config()
for key in bucket.list():
key.delete()
zones[0].delete_bucket(bucket_name)
+ http_server.close()
+ clean_rabbitmq(rabbit_proc)