From abfd6f72db3b6e1a797a660a4a601a33605e5f25 Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Tue, 25 Jun 2019 07:44:40 +0300 Subject: [PATCH] rgw/pubsub: fix amqp topic bug. add disabled end2end push tests Signed-off-by: Yuval Lifshitz --- src/rgw/rgw_pubsub.cc | 1 + src/rgw/rgw_pubsub.h | 9 +- src/rgw/rgw_sync_module_pubsub.cc | 7 +- src/rgw/rgw_sync_module_pubsub_rest.cc | 2 + src/test/rgw/rgw_multi/tests_ps.py | 587 ++++++++++++++++++++----- 5 files changed, 494 insertions(+), 112 deletions(-) diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index 5e149a4ca64..a2f16354637 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -149,6 +149,7 @@ void rgw_pubsub_sub_dest::dump(Formatter *f) const encode_json("oid_prefix", oid_prefix, f); encode_json("push_endpoint", push_endpoint, f); encode_json("push_endpoint_args", push_endpoint_args, f); + encode_json("arn_topic", arn_topic, f); } void rgw_pubsub_sub_config::dump(Formatter *f) const diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h index 14eb51ee2a3..72773b404e7 100644 --- a/src/rgw/rgw_pubsub.h +++ b/src/rgw/rgw_pubsub.h @@ -228,24 +228,29 @@ struct rgw_pubsub_sub_dest { std::string oid_prefix; std::string push_endpoint; std::string push_endpoint_args; + std::string arn_topic; void encode(bufferlist& bl) const { - ENCODE_START(2, 1, bl); + ENCODE_START(3, 1, bl); encode(bucket_name, bl); encode(oid_prefix, bl); encode(push_endpoint, bl); encode(push_endpoint_args, bl); + encode(arn_topic, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { - DECODE_START(2, bl); + DECODE_START(3, bl); decode(bucket_name, bl); decode(oid_prefix, bl); decode(push_endpoint, bl); if (struct_v >= 2) { decode(push_endpoint_args, bl); } + if (struct_v >= 3) { + decode(arn_topic, bl); + } DECODE_FINISH(bl); } diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index 720cce838fa..5c4a1788272 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -81,6 +81,7 @@ struct PSSubConfig { std::string data_bucket_name; std::string data_oid_prefix; std::string s3_id; + std::string arn_topic; RGWPubSubEndpoint::Ptr push_endpoint; void from_user_conf(CephContext *cct, const rgw_pubsub_sub_config& uc) { @@ -90,10 +91,11 @@ struct PSSubConfig { data_bucket_name = uc.dest.bucket_name; data_oid_prefix = uc.dest.oid_prefix; s3_id = uc.s3_id; + arn_topic = uc.dest.arn_topic; if (!push_endpoint_name.empty()) { push_endpoint_args = uc.dest.push_endpoint_args; try { - push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, topic, string_to_args(push_endpoint_args), cct); + push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, arn_topic, string_to_args(push_endpoint_args), cct); ldout(cct, 20) << "push endpoint created: " << push_endpoint->to_str() << dendl; } catch (const RGWPubSubEndpoint::configuration_error& e) { ldout(cct, 1) << "ERROR: failed to create push endpoint: " @@ -122,10 +124,11 @@ struct PSSubConfig { data_bucket_name = config["data_bucket"](default_bucket_name.c_str()); data_oid_prefix = config["data_oid_prefix"](default_oid_prefix.c_str()); s3_id = config["s3_id"]; + arn_topic = config["arn_topic"]; if (!push_endpoint_name.empty()) { push_endpoint_args = config["push_endpoint_args"]; try { - push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, topic, string_to_args(push_endpoint_args), cct); + push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, arn_topic, string_to_args(push_endpoint_args), cct); ldout(cct, 20) << "push endpoint created: " << push_endpoint->to_str() << dendl; } catch (const RGWPubSubEndpoint::configuration_error& e) { ldout(cct, 1) << "ERROR: failed to create push endpoint: " diff --git a/src/rgw/rgw_sync_module_pubsub_rest.cc b/src/rgw/rgw_sync_module_pubsub_rest.cc index 447171d4aeb..d9c5c34cd19 100644 --- a/src/rgw/rgw_sync_module_pubsub_rest.cc +++ b/src/rgw/rgw_sync_module_pubsub_rest.cc @@ -67,6 +67,7 @@ public: // bucket to store events/records will be set only when subscription is created dest.bucket_name = ""; dest.oid_prefix = ""; + dest.arn_topic = topic_name; // the topic ARN will be sent in the reply const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns, store->svc.zone->get_zonegroup().get_name(), @@ -354,6 +355,7 @@ public: dest.bucket_name = string(conf["data_bucket_prefix"]) + s->owner.get_id().to_str() + "-" + topic_name; dest.oid_prefix = string(conf["data_oid_prefix"]) + sub_name + "/"; dest.push_endpoint_args = s->info.args.get_str(); + dest.arn_topic = topic_name; return 0; } diff --git a/src/test/rgw/rgw_multi/tests_ps.py b/src/test/rgw/rgw_multi/tests_ps.py index 3deca9c078c..e9a923b845a 100644 --- a/src/test/rgw/rgw_multi/tests_ps.py +++ b/src/test/rgw/rgw_multi/tests_ps.py @@ -1,6 +1,13 @@ import logging import json import tempfile +import BaseHTTPServer +import SocketServer +import random +import threading +import subprocess +import socket +import time from .tests import get_realm, \ ZonegroupConns, \ zonegroup_meta_checkpoint, \ @@ -20,10 +27,188 @@ from nose.tools import assert_not_equal, assert_equal # configure logging for the tests module log = logging.getLogger(__name__) +skip_push_tests = True + #################################### # utility functions for pubsub tests #################################### +# HTTP endpoint functions +# multithreaded streaming server, based on: https://stackoverflow.com/questions/46210672/ + +class HTTPPostHandler(BaseHTTPServer.BaseHTTPRequestHandler): + """HTTP POST hanler class storing the received events in its http server""" + def do_POST(self): + """implementation of POST handler""" + try: + content_length = int(self.headers['Content-Length']) + body = self.rfile.read(content_length) + log.info('HTTP Server (%d) received event: %s', self.server.worker_id, str(body)) + self.server.append(json.loads(body)) + except: + log.error('HTTP Server received empty event') + self.send_response(400) + else: + self.send_response(100) + finally: + self.end_headers() + + +class HTTPServerWithEvents(BaseHTTPServer.HTTPServer): + """HTTP server used by the handler to store events""" + def __init__(self, addr, handler, worker_id): + BaseHTTPServer.HTTPServer.__init__(self, addr, handler, False) + self.worker_id = worker_id + self.events = [] + + def append(self, event): + self.events.append(event) + + +class HTTPServerThread(threading.Thread): + """thread for running the HTTP server. reusing the same socket for all threads""" + def __init__(self, i, sock, addr): + threading.Thread.__init__(self) + self.i = i + self.daemon = True + self.httpd = HTTPServerWithEvents(addr, HTTPPostHandler, i) + self.httpd.socket = sock + # prevent the HTTP server from re-binding every handler + self.httpd.server_bind = self.server_close = lambda self: None + self.start() + + def run(self): + try: + log.info('HTTP Server (%d) started on: %s', self.i, self.httpd.server_address) + self.httpd.serve_forever() + log.info('HTTP Server (%d) ended', self.i) + except Exception as error: + # could happen if the server r/w to a closing socket during shutdown + log.info('HTTP Server (%d) ended unexpectedly: %s', self.i, str(error)) + + def close(self): + self.httpd.shutdown() + + def get_events(self): + return self.httpd.events + + +class StreamingHTTPServer: + """multi-threaded http server class also holding list of events received into the handler + each thread has its own server, and all servers share the same socket""" + def __init__(self, host, port, num_workers=20): + addr = (host, port) + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.sock.bind(addr) + # maximum of 10 connection backlog on the listener + self.sock.listen(20) + self.workers = [HTTPServerThread(i, self.sock, addr) for i in range(num_workers)] + + def verify_s3_events(self, keys, exact_match=False, deletions=False): + """verify stored s3 records agains a list of keys""" + events = [] + for worker in self.workers: + events += worker.get_events() + verify_s3_records_by_elements(events, keys, exact_match=exact_match, deletions=deletions) + + def verify_events(self, keys, exact_match=False, deletions=False): + """verify stored events agains a list of keys""" + events = [] + for worker in self.workers: + events += worker.get_events() + verify_events_by_elements(events, keys, exact_match=exact_match, deletions=deletions) + + def close(self): + """close all workers in the http server and wait for it to finish""" + # make sure that the shared socket is closed + # this is needed in case that one of the threads is blocked on the socket + self.sock.shutdown(socket.SHUT_RDWR) + self.sock.close() + # wait for server threads to finish + for worker in self.workers: + worker.close() + worker.join() + + +# AMQP endpoint functions + +rabbitmq_port = 5672 + +class AMQPReceiver(object): + """class for receiving and storing messages on a topic from the AMQP broker""" + def __init__(self, exchange, topic): + import pika + hostname = get_ip() + retries = 20 + connect_ok = False + while retries > 0: + try: + connection = pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=rabbitmq_port)) + connect_ok = True + break + except Exception as error: + retries -= 1 + print 'AMQP receiver failed to connect (try %d): %s' % (10 - retries, str(error)) + log.info('AMQP receiver failed to connect (try %d): %s', 10 - retries, str(error)) + time.sleep(2) + + if connect_ok == False: + raise error + + self.channel = connection.channel() + self.channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True) + result = self.channel.queue_declare('', exclusive=True) + queue_name = result.method.queue + self.channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=topic) + self.channel.basic_consume(queue=queue_name, + on_message_callback=self.on_message, + auto_ack=True) + self.events = [] + + def on_message(self, ch, method, properties, body): + """callback invoked when a new message arrive on the topic""" + log.info('AMQP received event: %s', body) + self.events.append(json.loads(body)) + + # TODO create a base class for the AMQP and HTTP cases + def verify_s3_events(self, keys, exact_match=False, deletions=False): + """verify stored s3 records agains a list of keys""" + verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions) + self.events = [] + + def verify_events(self, keys, exact_match=False, deletions=False): + """verify stored events agains a list of keys""" + verify_events_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions) + self.events = [] + + +def amqp_receiver_thread_runner(receiver): + """main thread function for the amqp receiver""" + try: + log.info('AMQP receiver started') + receiver.channel.start_consuming() + log.info('AMQP receiver ended') + except Exception as error: + log.info('AMQP receiver ended unexpectedly: %s', str(error)) + + +def create_amqp_receiver_thread(exchange, topic): + """create amqp receiver and thread""" + receiver = AMQPReceiver(exchange, topic) + task = threading.Thread(target=amqp_receiver_thread_runner, args=(receiver,)) + task.daemon = True + return task, receiver + + +def stop_amqp_receiver(receiver, task): + """stop the receiver thread and wait for it to finis""" + try: + receiver.channel.stop_consuming() + log.info('stopping AMQP receiver') + except Exception as error: + log.info('failed to gracefuly stop AMQP receiver: %s', str(error)) + task.join(5) def check_ps_configured(): """check if at least one pubsub zone exist""" @@ -49,7 +234,7 @@ def verify_events_by_elements(events, keys, exact_match=False, deletions=False): key_found = False for event in events: if event['info']['bucket']['name'] == key.bucket.name and \ - event['info']['key']['name'] == key.name: + event['info']['key']['name'] == key.name: if deletions and event['event'] == 'OBJECT_DELETE': key_found = True break @@ -76,7 +261,7 @@ def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=Fa key_found = False for record in records: if record['s3']['bucket']['name'] == key.bucket.name and \ - record['s3']['object']['key'] == key.name: + record['s3']['object']['key'] == key.name: if deletions and record['eventName'] == 'ObjectRemoved': key_found = True break @@ -98,6 +283,52 @@ def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=Fa assert False, err +def init_rabbitmq(): + """ start a rabbitmq broker """ + hostname = get_ip() + #port = str(random.randint(20000, 30000)) + #data_dir = './' + port + '_data' + #log_dir = './' + port + '_log' + #print('') + #try: + # os.mkdir(data_dir) + # os.mkdir(log_dir) + #except: + # print('rabbitmq directories already exists') + #env = {'RABBITMQ_NODE_PORT': port, + # 'RABBITMQ_NODENAME': 'rabbit'+ port + '@' + hostname, + # 'RABBITMQ_USE_LONGNAME': 'true', + # 'RABBITMQ_MNESIA_BASE': data_dir, + # 'RABBITMQ_LOG_BASE': log_dir} + # TODO: support multiple brokers per host using env + # make sure we don't collide with the default + try: + proc = subprocess.Popen('rabbitmq-server') + except Exception as error: + log.info('failed to execute rabbitmq-server: %s', str(error)) + print 'failed to execute rabbitmq-server: %s' % str(error) + return None + # TODO add rabbitmq checkpoint instead of sleep + time.sleep(5) + return proc #, port, data_dir, log_dir + + +def clean_rabbitmq(proc): #, data_dir, log_dir) + """ stop the rabbitmq broker """ + try: + subprocess.call(['rabbitmqctl', 'stop']) + time.sleep(5) + proc.terminate() + except: + log.info('rabbitmq server already terminated') + # TODO: add directory cleanup once multiple brokers are supported + #try: + # os.rmdir(data_dir) + # os.rmdir(log_dir) + #except: + # log.info('rabbitmq directories already removed') + + def init_env(): """initialize the environment""" check_ps_configured() @@ -122,6 +353,20 @@ def init_env(): return zones, ps_zones +def get_ip(): + """ This method returns the "primary" IP on the local box (the one with a default route) + source: https://stackoverflow.com/a/28950776/711085 + this is needed because on the teuthology machines: socket.getfqdn()/socket.gethostname() return 127.0.0.1 """ + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + # address should not be reachable + s.connect(('10.255.255.255', 1)) + ip = s.getsockname()[0] + finally: + s.close() + return ip + + TOPIC_SUFFIX = "_topic" SUB_SUFFIX = "_sub" NOTIFICATION_SUFFIX = "_notif" @@ -144,14 +389,14 @@ def test_ps_info(): for i in range(number_of_objects): key = bucket.new_key(str(i)) key.set_contents_from_string('bar') - print('Zonegroup: ' + zonegroup.name) - print('user: ' + get_user()) - print('tenant: ' + get_tenant()) - print('Master Zone') + print 'Zonegroup: ' + zonegroup.name + print 'user: ' + get_user() + print 'tenant: ' + get_tenant() + print 'Master Zone' print_connection_info(zones[0].conn) - print('PubSub Zone') + print 'PubSub Zone' print_connection_info(ps_zones[0].conn) - print('Bucket: ' + bucket_name) + print 'Bucket: ' + bucket_name def test_ps_s3_notification_low_level(): @@ -175,7 +420,7 @@ def test_ps_s3_notification_low_level(): topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:*'] - }] + }] s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) @@ -247,7 +492,7 @@ def test_ps_s3_notification_records(): topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:*'] - }] + }] s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) @@ -304,7 +549,7 @@ def test_ps_s3_notification(): topic_conf_list = [{'Id': notification_name1, 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:*'] - }] + }] s3_notification_conf1 = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) response, status = s3_notification_conf1.set_config() assert_equal(status/100, 2) @@ -313,7 +558,7 @@ def test_ps_s3_notification(): topic_conf_list = [{'Id': notification_name2, 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'] - }] + }] s3_notification_conf2 = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) response, status = s3_notification_conf2.set_config() assert_equal(status/100, 2) @@ -387,7 +632,7 @@ def test_ps_topic_with_endpoint(): # create topic dest_endpoint = 'amqp://localhost:7001' dest_args = 'amqp-exchange=amqp.direct&amqp-ack-level=none' - topic_conf = PSTopic(ps_zones[0].conn, topic_name, + topic_conf = PSTopic(ps_zones[0].conn, topic_name, endpoint=dest_endpoint, endpoint_args=dest_args) _, status = topic_conf.set_config() @@ -638,7 +883,7 @@ def test_ps_event_type_subscription(): # wait for sync zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) log.debug("Event (OBJECT_DELETE) synced") - + # get the events from the creations subscription result, _ = sub_create_conf.get_events() parsed_result = json.loads(result) @@ -935,11 +1180,18 @@ def test_ps_versioned_deletion(): def test_ps_push_http(): """ test pushing to http endpoint """ - return SkipTest("PubSub push tests are only manual") + if skip_push_tests: + return SkipTest("PubSub push tests don't run in teuthology") zones, ps_zones = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX + # create random port for the http server + host = get_ip() + port = random.randint(10000, 20000) + # start an http server in a separate thread + http_server = StreamingHTTPServer(host, port) + # create topic topic_conf = PSTopic(ps_zones[0].conn, topic_name) _, status = topic_conf.set_config() @@ -955,7 +1207,7 @@ def test_ps_push_http(): assert_equal(status/100, 2) # create subscription sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX, - topic_name, endpoint='http://localhost:9001') + topic_name, endpoint='http://'+host+':'+str(port)) _, status = sub_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket @@ -965,6 +1217,10 @@ def test_ps_push_http(): key.set_contents_from_string('bar') # wait for sync zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) + # check http server + keys = list(bucket.list()) + # TODO: use exact match + http_server.verify_events(keys, exact_match=False) # delete objects from the bucket for key in bucket.list(): @@ -972,24 +1228,35 @@ def test_ps_push_http(): # wait for sync zone_meta_checkpoint(ps_zones[0].zone) zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) + # check http server + # TODO: use exact match + http_server.verify_events(keys, deletions=True, exact_match=False) # cleanup sub_conf.del_config() notification_conf.del_config() topic_conf.del_config() zones[0].delete_bucket(bucket_name) + http_server.close() def test_ps_s3_push_http(): - """ test pushing to http endpoint n s3 record format""" - return SkipTest("PubSub push tests are only manual") + """ test pushing to http endpoint s3 record format""" + if skip_push_tests: + return SkipTest("PubSub push tests don't run in teuthology") zones, ps_zones = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX + # create random port for the http server + host = get_ip() + port = random.randint(10000, 20000) + # start an http server in a separate thread + http_server = StreamingHTTPServer(host, port) + # create topic topic_conf = PSTopic(ps_zones[0].conn, topic_name, - endpoint='http://localhost:9001') + endpoint='http://'+host+':'+str(port)) result, status = topic_conf.set_config() assert_equal(status/100, 2) parsed_result = json.loads(result) @@ -1002,8 +1269,8 @@ def test_ps_s3_push_http(): notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, - 'Events': ['s3:ObjectCreated:*'] - }] + 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'] + }] s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) @@ -1014,7 +1281,10 @@ def test_ps_s3_push_http(): key.set_contents_from_string('bar') # wait for sync zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) - # TODO check http server + # check http server + keys = list(bucket.list()) + # TODO: use exact match + http_server.verify_s3_events(keys, exact_match=False) # delete objects from the bucket for key in bucket.list(): @@ -1022,22 +1292,33 @@ def test_ps_s3_push_http(): # wait for sync zone_meta_checkpoint(ps_zones[0].zone) zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) - # TODO check http server + # check http server + # TODO: use exact match + http_server.verify_s3_events(keys, deletions=True, exact_match=False) # cleanup s3_notification_conf.del_config() topic_conf.del_config() zones[0].delete_bucket(bucket_name) + http_server.close() def test_ps_push_amqp(): """ test pushing to amqp endpoint """ - return SkipTest("PubSub push tests are only manual") + if skip_push_tests: + return SkipTest("PubSub push tests don't run in teuthology") + hostname = get_ip() + proc = init_rabbitmq() + if proc is None: + return SkipTest('end2end amqp tests require rabbitmq-server installed') zones, ps_zones = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX # create topic + exchange = 'ex1' + task, receiver = create_amqp_receiver_thread(exchange, topic_name) + task.start() topic_conf = PSTopic(ps_zones[0].conn, topic_name) _, status = topic_conf.set_config() assert_equal(status/100, 2) @@ -1052,8 +1333,8 @@ def test_ps_push_amqp(): assert_equal(status/100, 2) # create subscription sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX, - topic_name, endpoint='amqp://localhost', - endpoint_args='amqp-exchange=ex1&amqp-ack-level=none') + topic_name, endpoint='amqp://'+hostname, + endpoint_args='amqp-exchange='+exchange+'&amqp-ack-level=none') _, status = sub_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket @@ -1063,7 +1344,10 @@ def test_ps_push_amqp(): key.set_contents_from_string('bar') # wait for sync zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) - # TODO check amqp receiver + # check amqp receiver + keys = list(bucket.list()) + # TODO: use exact match + receiver.verify_events(keys, exact_match=False) # delete objects from the bucket for key in bucket.list(): @@ -1071,26 +1355,38 @@ def test_ps_push_amqp(): # wait for sync zone_meta_checkpoint(ps_zones[0].zone) zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) - # TODO check amqp receiver + # check amqp receiver + # TODO: use exact match + receiver.verify_events(keys, deletions=True, exact_match=False) # cleanup + stop_amqp_receiver(receiver, task) sub_conf.del_config() notification_conf.del_config() topic_conf.del_config() zones[0].delete_bucket(bucket_name) + clean_rabbitmq(proc) def test_ps_s3_push_amqp(): """ test pushing to amqp endpoint s3 record format""" - return SkipTest("PubSub push tests are only manual") + if skip_push_tests: + return SkipTest("PubSub push tests don't run in teuthology") + hostname = get_ip() + proc = init_rabbitmq() + if proc is None: + return SkipTest('end2end amqp tests require rabbitmq-server installed') zones, ps_zones = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX # create topic + exchange = 'ex1' + task, receiver = create_amqp_receiver_thread(exchange, topic_name) + task.start() topic_conf = PSTopic(ps_zones[0].conn, topic_name, - endpoint='amqp://localhost', - endpoint_args='amqp-exchange=ex1&amqp-ack-level=none') + endpoint='amqp://' + hostname, + endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none') result, status = topic_conf.set_config() assert_equal(status/100, 2) parsed_result = json.loads(result) @@ -1103,8 +1399,8 @@ def test_ps_s3_push_amqp(): notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, - 'Events': ['s3:ObjectCreated:*'] - }] + 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'] + }] s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) @@ -1115,7 +1411,10 @@ def test_ps_s3_push_amqp(): key.set_contents_from_string('bar') # wait for sync zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) - # TODO check amqp receiver + # check amqp receiver + keys = list(bucket.list()) + # TODO: use exact match + receiver.verify_s3_events(keys, exact_match=False) # delete objects from the bucket for key in bucket.list(): @@ -1123,12 +1422,16 @@ def test_ps_s3_push_amqp(): # wait for sync zone_meta_checkpoint(ps_zones[0].zone) zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) - # TODO check amqp receiver + # check amqp receiver + # TODO: use exact match + receiver.verify_s3_events(keys, deletions=True, exact_match=False) # cleanup + stop_amqp_receiver(receiver, task) s3_notification_conf.del_config() topic_conf.del_config() zones[0].delete_bucket(bucket_name) + clean_rabbitmq(proc) def test_ps_delete_bucket(): @@ -1152,11 +1455,11 @@ def test_ps_delete_bucket(): topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:*'] - }] + }] s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() assert_equal(status/100, 2) - + # create non-s3 notification notification_conf = PSNotification(ps_zones[0].conn, bucket_name, topic_name) @@ -1216,12 +1519,12 @@ def test_ps_missing_topic(): topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:*'] - }] + }] s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) try: s3_notification_conf.set_config() except: - print('missing topic is expected') + log.info('missing topic is expected') else: assert 'missing topic is expected' @@ -1231,28 +1534,38 @@ def test_ps_missing_topic(): def test_ps_s3_topic_update(): """ test updating topic associated with a notification""" - return SkipTest("PubSub push tests are only manual") + if skip_push_tests: + return SkipTest("PubSub push tests don't run in teuthology") + rabbit_proc = init_rabbitmq() + if rabbit_proc is None: + return SkipTest('end2end amqp tests require rabbitmq-server installed') zones, ps_zones = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX - # create topic - dest_endpoint1 = 'amqp://localhost' - dest_args1 = 'amqp-exchange=ex1&amqp-ack-level=none' - dest_endpoint2 = 'http://localhost:9001' - topic_conf = PSTopic(ps_zones[0].conn, topic_name, - endpoint=dest_endpoint1, - endpoint_args=dest_args1) + # create amqp topic + hostname = get_ip() + exchange = 'ex1' + amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name) + amqp_task.start() + topic_conf = PSTopic(ps_zones[0].conn, topic_name, + endpoint='amqp://' + hostname, + endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none') result, status = topic_conf.set_config() + assert_equal(status/100, 2) parsed_result = json.loads(result) topic_arn = parsed_result['arn'] - assert_equal(status/100, 2) # get topic result, _ = topic_conf.get_config() # verify topic content parsed_result = json.loads(result) assert_equal(parsed_result['topic']['name'], topic_name) - assert_equal(parsed_result['topic']['dest']['push_endpoint'], dest_endpoint1) + assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint']) + + # create http server + port = random.randint(10000, 20000) + # start an http server in a separate thread + http_server = StreamingHTTPServer(hostname, port) # create bucket on the first of the rados zones bucket = zones[0].create_bucket(bucket_name) @@ -1263,7 +1576,7 @@ def test_ps_s3_topic_update(): topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:*'] - }] + }] s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) @@ -1275,11 +1588,13 @@ def test_ps_s3_topic_update(): # wait for sync zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) - # TODO: check update to amqp + keys = list(bucket.list()) + # TODO: use exact match + receiver.verify_s3_events(keys, exact_match=False) - # update the same topic - topic_conf = PSTopic(ps_zones[0].conn, topic_name, - endpoint=dest_endpoint2) + # update the same topic with new endpoint + topic_conf = PSTopic(ps_zones[0].conn, topic_name, + endpoint='http://'+ hostname + ':' + str(port)) _, status = topic_conf.set_config() assert_equal(status/100, 2) # get topic @@ -1287,73 +1602,107 @@ def test_ps_s3_topic_update(): # verify topic content parsed_result = json.loads(result) assert_equal(parsed_result['topic']['name'], topic_name) - assert_equal(parsed_result['topic']['dest']['push_endpoint'], dest_endpoint2) + assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint']) - # create more objects in the bucket - number_of_objects = 10 + # delete current objects and create new objects in the bucket + for key in bucket.list(): + key.delete() for i in range(number_of_objects): key = bucket.new_key(str(i+100)) key.set_contents_from_string('bar') # wait for sync + zone_meta_checkpoint(ps_zones[0].zone) zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) - # TODO: check it is still updating amqp + keys = list(bucket.list()) + # verify that notifications are still sent to amqp + # TODO: use exact match + receiver.verify_s3_events(keys, exact_match=False) # update notification to update the endpoint from the topic topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:*'] - }] + }] s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) - # create even more objects in the bucket - number_of_objects = 10 + + # delete current objects and create new objects in the bucket + for key in bucket.list(): + key.delete() for i in range(number_of_objects): key = bucket.new_key(str(i+200)) key.set_contents_from_string('bar') # wait for sync + zone_meta_checkpoint(ps_zones[0].zone) zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) - # TODO: check that updates switched to http + keys = list(bucket.list()) + # check that updates switched to http + # TODO: use exact match + http_server.verify_s3_events(keys, exact_match=False) # cleanup # delete objects from the bucket + stop_amqp_receiver(receiver, amqp_task) for key in bucket.list(): key.delete() s3_notification_conf.del_config() topic_conf.del_config() zones[0].delete_bucket(bucket_name) + http_server.close() + clean_rabbitmq(rabbit_proc) def test_ps_s3_notification_update(): """ test updating the topic of a notification""" - return SkipTest("PubSub push tests are only manual") + if skip_push_tests: + return SkipTest("PubSub push tests don't run in teuthology") + hostname = get_ip() + rabbit_proc = init_rabbitmq() + if rabbit_proc is None: + return SkipTest('end2end amqp tests require rabbitmq-server installed') + zones, ps_zones = init_env() bucket_name = gen_bucket_name() topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX + topic_name2 = bucket_name+'http'+TOPIC_SUFFIX - # create first topic - dest_endpoint1 = 'amqp://localhost' - dest_args1 = 'amqp-exchange=ex1&amqp-ack-level=none' - topic_conf1 = PSTopic(ps_zones[0].conn, topic_name1, - endpoint=dest_endpoint1, - endpoint_args=dest_args1) + # create topics + # start amqp receiver in a separate thread + exchange = 'ex1' + amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1) + amqp_task.start() + # create random port for the http server + http_port = random.randint(10000, 20000) + # start an http server in a separate thread + http_server = StreamingHTTPServer(hostname, http_port) + + topic_conf1 = PSTopic(ps_zones[0].conn, topic_name1, + endpoint='amqp://' + hostname, + endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none') result, status = topic_conf1.set_config() parsed_result = json.loads(result) topic_arn1 = parsed_result['arn'] assert_equal(status/100, 2) + topic_conf2 = PSTopic(ps_zones[0].conn, topic_name2, + endpoint='http://'+hostname+':'+str(http_port)) + result, status = topic_conf2.set_config() + parsed_result = json.loads(result) + topic_arn2 = parsed_result['arn'] + assert_equal(status/100, 2) # create bucket on the first of the rados zones bucket = zones[0].create_bucket(bucket_name) # wait for sync zone_meta_checkpoint(ps_zones[0].zone) - # create s3 notification + # create s3 notification with topic1 notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn1, 'Events': ['s3:ObjectCreated:*'] - }] + }] s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) @@ -1364,64 +1713,81 @@ def test_ps_s3_notification_update(): key.set_contents_from_string('bar') # wait for sync zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) - result, _ = s3_notification_conf.get_config() - # TODO: check updates to amqp - - # create another topic - topic_name2 = bucket_name+'http'+TOPIC_SUFFIX - dest_endpoint2 = 'http://localhost:9001' - topic_conf2 = PSTopic(ps_zones[0].conn, topic_name2, - endpoint=dest_endpoint2) - result, status = topic_conf2.set_config() - parsed_result = json.loads(result) - topic_arn2 = parsed_result['arn'] - assert_equal(status/100, 2) + keys = list(bucket.list()) + # TODO: use exact match + receiver.verify_s3_events(keys, exact_match=False); - # update notification to the new topic + # update notification to use topic2 topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn2, 'Events': ['s3:ObjectCreated:*'] - }] + }] s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) - # create more objects in the bucket - number_of_objects = 10 + + # delete current objects and create new objects in the bucket + for key in bucket.list(): + key.delete() for i in range(number_of_objects): - key = bucket.new_key(str(i+200)) + key = bucket.new_key(str(i+100)) key.set_contents_from_string('bar') # wait for sync + zone_meta_checkpoint(ps_zones[0].zone) zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) - # TODO: check uodate to http - result, _ = s3_notification_conf.get_config() + keys = list(bucket.list()) + # check that updates switched to http + # TODO: use exact match + http_server.verify_s3_events(keys, exact_match=False) # cleanup # delete objects from the bucket + stop_amqp_receiver(receiver, amqp_task) for key in bucket.list(): key.delete() s3_notification_conf.del_config() topic_conf1.del_config() topic_conf2.del_config() zones[0].delete_bucket(bucket_name) + http_server.close() + clean_rabbitmq(rabbit_proc) def test_ps_s3_multiple_topics_notification(): """ test notification creation with multiple topics""" + if skip_push_tests: + return SkipTest("PubSub push tests don't run in teuthology") + hostname = get_ip() + rabbit_proc = init_rabbitmq() + if rabbit_proc is None: + return SkipTest('end2end amqp tests require rabbitmq-server installed') + zones, ps_zones = init_env() bucket_name = gen_bucket_name() - # TODO: test via push endpoint (amqp+http) topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX topic_name2 = bucket_name+'http'+TOPIC_SUFFIX # create topics - topic_conf1 = PSTopic(ps_zones[0].conn, topic_name1) + # start amqp receiver in a separate thread + exchange = 'ex1' + amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1) + amqp_task.start() + # create random port for the http server + http_port = random.randint(10000, 20000) + # start an http server in a separate thread + http_server = StreamingHTTPServer(hostname, http_port) + + topic_conf1 = PSTopic(ps_zones[0].conn, topic_name1, + endpoint='amqp://' + hostname, + endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none') result, status = topic_conf1.set_config() parsed_result = json.loads(result) topic_arn1 = parsed_result['arn'] assert_equal(status/100, 2) - topic_conf2 = PSTopic(ps_zones[0].conn, topic_name2) + topic_conf2 = PSTopic(ps_zones[0].conn, topic_name2, + endpoint='http://'+hostname+':'+str(http_port)) result, status = topic_conf2.set_config() parsed_result = json.loads(result) topic_arn2 = parsed_result['arn'] @@ -1435,16 +1801,16 @@ def test_ps_s3_multiple_topics_notification(): notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1' notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2' topic_conf_list = [ - { - 'Id': notification_name1, - 'TopicArn': topic_arn1, - 'Events': ['s3:ObjectCreated:*'] - }, - { - 'Id': notification_name2, - 'TopicArn': topic_arn2, - 'Events': ['s3:ObjectCreated:*'] - }] + { + 'Id': notification_name1, + 'TopicArn': topic_arn1, + 'Events': ['s3:ObjectCreated:*'] + }, + { + 'Id': notification_name2, + 'TopicArn': topic_arn2, + 'Events': ['s3:ObjectCreated:*'] + }] s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) @@ -1455,14 +1821,14 @@ def test_ps_s3_multiple_topics_notification(): # get auto-generated subscriptions sub_conf1 = PSSubscription(ps_zones[0].conn, notification_name1, - topic_name1) + topic_name1) _, status = sub_conf1.get_config() assert_equal(status/100, 2) sub_conf2 = PSSubscription(ps_zones[0].conn, notification_name2, - topic_name2) + topic_name2) _, status = sub_conf2.get_config() assert_equal(status/100, 2) - + # create objects in the bucket number_of_objects = 10 for i in range(number_of_objects): @@ -1479,7 +1845,8 @@ def test_ps_s3_multiple_topics_notification(): keys = list(bucket.list()) # TODO: use exact match verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False) - + receiver.verify_s3_events(keys, exact_match=False) + result, _ = sub_conf2.get_events() parsed_result = json.loads(result) for record in parsed_result['Records']: @@ -1487,8 +1854,10 @@ def test_ps_s3_multiple_topics_notification(): keys = list(bucket.list()) # TODO: use exact match verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False) - + http_server.verify_s3_events(keys, exact_match=True) + # cleanup + stop_amqp_receiver(receiver, amqp_task) s3_notification_conf.del_config() topic_conf1.del_config() topic_conf2.del_config() @@ -1496,3 +1865,5 @@ def test_ps_s3_multiple_topics_notification(): for key in bucket.list(): key.delete() zones[0].delete_bucket(bucket_name) + http_server.close() + clean_rabbitmq(rabbit_proc) -- 2.39.5