rgw/pubsub: fix amqp topic bug. add disabled end2end push tests 28910/head
author    Yuval Lifshitz <yuvalif@yahoo.com>
          Tue, 25 Jun 2019 04:44:40 +0000 (07:44 +0300)
committer Yuval Lifshitz <yuvalif@yahoo.com>
          Sun, 7 Jul 2019 10:48:40 +0000 (13:48 +0300)
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/rgw/rgw_sync_module_pubsub.cc
src/rgw/rgw_sync_module_pubsub_rest.cc
src/test/rgw/rgw_multi/tests_ps.py

index 5e149a4ca64d018fffd675e515c87bf248f04604..a2f163546377a54f42325d753ae3676ac3fed27c 100644 (file)
@@ -149,6 +149,7 @@ void rgw_pubsub_sub_dest::dump(Formatter *f) const
   encode_json("oid_prefix", oid_prefix, f);
   encode_json("push_endpoint", push_endpoint, f);
   encode_json("push_endpoint_args", push_endpoint_args, f);
+  encode_json("arn_topic", arn_topic, f);
 }
 
 void rgw_pubsub_sub_config::dump(Formatter *f) const
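
With the added line, the destination dumped for a topic or subscription now includes the topic name; roughly (field values illustrative):

    {
      "bucket_name": "...",
      "oid_prefix": "...",
      "push_endpoint": "amqp://myhost",
      "push_endpoint_args": "amqp-exchange=ex1&amqp-ack-level=none",
      "arn_topic": "mytopic"
    }
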
index 14eb51ee2a3e68e29f5e7f1ca362ec5dd1b4a4c4..72773b404e7b96c60cfa3a4c8a2d709c20d4b1f1 100644 (file)
@@ -228,24 +228,29 @@ struct rgw_pubsub_sub_dest {
   std::string oid_prefix;
   std::string push_endpoint;
   std::string push_endpoint_args;
+  std::string arn_topic;
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(2, 1, bl);
+    ENCODE_START(3, 1, bl);
     encode(bucket_name, bl);
     encode(oid_prefix, bl);
     encode(push_endpoint, bl);
     encode(push_endpoint_args, bl);
+    encode(arn_topic, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(2, bl);
+    DECODE_START(3, bl);
     decode(bucket_name, bl);
     decode(oid_prefix, bl);
     decode(push_endpoint, bl);
     if (struct_v >= 2) {
         decode(push_endpoint_args, bl);
     }
+    if (struct_v >= 3) {
+        decode(arn_topic, bl);
+    }
     DECODE_FINISH(bl);
   }
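
The version bump (2 → 3) keeps previously-stored destinations decodable: the struct version is read off the wire first, and fields introduced later are read only when the encoder actually wrote them. A minimal Python sketch of the same guard pattern, assuming a simplified hypothetical layout (1-byte version, 4-byte little-endian length-prefixed strings) rather than Ceph's real encoding:

    import struct

    def read_str(buf, off):
        # length-prefixed string: u32 length followed by the bytes
        (n,) = struct.unpack_from('<I', buf, off)
        off += 4
        return buf[off:off+n], off + n

    def decode_dest(buf):
        (version,) = struct.unpack_from('<B', buf, 0)
        off = 1
        dest = {}
        dest['bucket_name'], off = read_str(buf, off)
        dest['oid_prefix'], off = read_str(buf, off)
        dest['push_endpoint'], off = read_str(buf, off)
        if version >= 2:
            dest['push_endpoint_args'], off = read_str(buf, off)
        if version >= 3:
            dest['arn_topic'], off = read_str(buf, off)
        return dest
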
 
index 720cce838fa51a107a5760feecf89b7ffb563064..5c4a1788272ee674104d189cdb9b954ab863d654 100644 (file)
@@ -81,6 +81,7 @@ struct PSSubConfig {
   std::string data_bucket_name;
   std::string data_oid_prefix;
   std::string s3_id;
+  std::string arn_topic;
   RGWPubSubEndpoint::Ptr push_endpoint;
 
   void from_user_conf(CephContext *cct, const rgw_pubsub_sub_config& uc) {
@@ -90,10 +91,11 @@ struct PSSubConfig {
     data_bucket_name = uc.dest.bucket_name;
     data_oid_prefix = uc.dest.oid_prefix;
     s3_id = uc.s3_id;
+    arn_topic = uc.dest.arn_topic;
     if (!push_endpoint_name.empty()) {
       push_endpoint_args = uc.dest.push_endpoint_args;
       try {
-        push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, topic, string_to_args(push_endpoint_args), cct);
+        push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, arn_topic, string_to_args(push_endpoint_args), cct);
         ldout(cct, 20) << "push endpoint created: " << push_endpoint->to_str() << dendl;
       } catch (const RGWPubSubEndpoint::configuration_error& e) {
           ldout(cct, 1) << "ERROR: failed to create push endpoint: " 
@@ -122,10 +124,11 @@ struct PSSubConfig {
     data_bucket_name = config["data_bucket"](default_bucket_name.c_str());
     data_oid_prefix = config["data_oid_prefix"](default_oid_prefix.c_str());
     s3_id = config["s3_id"];
+    arn_topic = config["arn_topic"];
     if (!push_endpoint_name.empty()) {
       push_endpoint_args = config["push_endpoint_args"];
       try {
-        push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, topic, string_to_args(push_endpoint_args), cct);
+        push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, arn_topic, string_to_args(push_endpoint_args), cct);
         ldout(cct, 20) << "push endpoint created: " << push_endpoint->to_str() << dendl;
       } catch (const RGWPubSubEndpoint::configuration_error& e) {
         ldout(cct, 1) << "ERROR: failed to create push endpoint: " 
index 447171d4aeb3140d1d7222ac08462dc93213bdcc..d9c5c34cd19058bfcb157a593800e60fcb46fa75 100644 (file)
@@ -67,6 +67,7 @@ public:
     // bucket to store events/records will be set only when subscription is created
     dest.bucket_name = "";
     dest.oid_prefix = "";
+    dest.arn_topic = topic_name;
     // the topic ARN will be sent in the reply
     const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns, 
         store->svc.zone->get_zonegroup().get_name(),
@@ -354,6 +355,7 @@ public:
     dest.bucket_name = string(conf["data_bucket_prefix"]) + s->owner.get_id().to_str() + "-" + topic_name;
     dest.oid_prefix = string(conf["data_oid_prefix"]) + sub_name + "/";
     dest.push_endpoint_args = s->info.args.get_str();
+    dest.arn_topic = topic_name;
 
     return 0;
   }
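
Note the distinction: dest.arn_topic stores the bare topic name that the push endpoint publishes under, while the reply carries the full SNS-style ARN built above. A hypothetical helper showing the shape of that ARN (zonegroup/tenant values illustrative):

    # arn:<partition>:<service>:<region>:<account>:<resource>
    def topic_arn(zonegroup, tenant, topic_name):
        return 'arn:aws:sns:%s:%s:%s' % (zonegroup, tenant, topic_name)

    # topic_arn('default', '', 'mytopic') == 'arn:aws:sns:default::mytopic'
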
index 3deca9c078c77e1a302462533a99b8efb1a40e11..e9a923b845ad36c93a46747feac44b70b2736fde 100644 (file)
@@ -1,6 +1,13 @@
 import logging
 import json
 import tempfile
+import BaseHTTPServer
+import SocketServer
+import random
+import threading
+import subprocess
+import socket
+import time
 from .tests import get_realm, \
     ZonegroupConns, \
     zonegroup_meta_checkpoint, \
@@ -20,10 +27,188 @@ from nose.tools import assert_not_equal, assert_equal
 # configure logging for the tests module
 log = logging.getLogger(__name__)
 
+skip_push_tests = True
+
 ####################################
 # utility functions for pubsub tests
 ####################################
 
+# HTTP endpoint functions
+# multithreaded streaming server, based on: https://stackoverflow.com/questions/46210672/
+
+class HTTPPostHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+    """HTTP POST hanler class storing the received events in its http server"""
+    def do_POST(self):
+        """implementation of POST handler"""
+        try:
+            content_length = int(self.headers['Content-Length'])
+            body = self.rfile.read(content_length)
+            log.info('HTTP Server (%d) received event: %s', self.server.worker_id, str(body))
+            self.server.append(json.loads(body))
+        except:
+            log.error('HTTP Server received empty or malformed event')
+            self.send_response(400)
+        else:
+            # ack with 200 OK (100 is not a valid final status)
+            self.send_response(200)
+        finally:
+            self.end_headers()
+
+
+class HTTPServerWithEvents(BaseHTTPServer.HTTPServer):
+    """HTTP server used by the handler to store events"""
+    def __init__(self, addr, handler, worker_id):
+        BaseHTTPServer.HTTPServer.__init__(self, addr, handler, False)
+        self.worker_id = worker_id
+        self.events = []
+
+    def append(self, event):
+        self.events.append(event)
+
+
+class HTTPServerThread(threading.Thread):
+    """thread for running the HTTP server. reusing the same socket for all threads"""
+    def __init__(self, i, sock, addr):
+        threading.Thread.__init__(self)
+        self.i = i
+        self.daemon = True
+        self.httpd = HTTPServerWithEvents(addr, HTTPPostHandler, i)
+        self.httpd.socket = sock
+        # the shared socket is already bound by StreamingHTTPServer, so make
+        # bind/close no-ops on the per-thread server to prevent re-binding
+        self.httpd.server_bind = self.httpd.server_close = lambda self: None
+        self.start()
+
+    def run(self):
+        try:
+            log.info('HTTP Server (%d) started on: %s', self.i, self.httpd.server_address)
+            self.httpd.serve_forever()
+            log.info('HTTP Server (%d) ended', self.i)
+        except Exception as error:
+            # could happen if the server r/w to a closing socket during shutdown
+            log.info('HTTP Server (%d) ended unexpectedly: %s', self.i, str(error))
+
+    def close(self):
+        self.httpd.shutdown()
+
+    def get_events(self):
+        return self.httpd.events
+
+
+class StreamingHTTPServer:
+    """multi-threaded http server class also holding list of events received into the handler
+    each thread has its own server, and all servers share the same socket"""
+    def __init__(self, host, port, num_workers=20):
+        addr = (host, port)
+        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        self.sock.bind(addr)
+        # allow a backlog of up to 20 pending connections on the listener
+        self.sock.listen(20)
+        self.workers = [HTTPServerThread(i, self.sock, addr) for i in range(num_workers)]
+
+    def verify_s3_events(self, keys, exact_match=False, deletions=False):
+        """verify stored s3 records agains a list of keys"""
+        events = []
+        for worker in self.workers:
+            events += worker.get_events()
+        verify_s3_records_by_elements(events, keys, exact_match=exact_match, deletions=deletions)
+
+    def verify_events(self, keys, exact_match=False, deletions=False):
+        """verify stored events agains a list of keys"""
+        events = []
+        for worker in self.workers:
+            events += worker.get_events()
+        verify_events_by_elements(events, keys, exact_match=exact_match, deletions=deletions)
+
+    def close(self):
+        """close all workers in the http server and wait for it to finish"""
+        # make sure that the shared socket is closed
+        # this is needed in case that one of the threads is blocked on the socket
+        self.sock.shutdown(socket.SHUT_RDWR)
+        self.sock.close()
+        # wait for server threads to finish
+        for worker in self.workers:
+            worker.close()
+            worker.join()
+
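
For orientation, this is how the push tests below drive the server (host/port values illustrative):

    http_server = StreamingHTTPServer('localhost', 8080, num_workers=4)
    # point a subscription's push endpoint at http://localhost:8080,
    # create objects in the bucket and wait for sync, then:
    http_server.verify_events(keys, exact_match=False)  # keys = list(bucket.list())
    http_server.close()
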
+
+# AMQP endpoint functions
+
+rabbitmq_port = 5672
+
+class AMQPReceiver(object):
+    """class for receiving and storing messages on a topic from the AMQP broker"""
+    def __init__(self, exchange, topic):
+        import pika
+        hostname = get_ip()
+        retries = 20
+        connect_ok = False
+        while retries > 0:
+            try:
+                connection = pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=rabbitmq_port))
+                connect_ok = True
+                break
+            except Exception as error:
+                retries -= 1
+                print 'AMQP receiver failed to connect (try %d): %s' % (20 - retries, str(error))
+                log.info('AMQP receiver failed to connect (try %d): %s', 20 - retries, str(error))
+                time.sleep(2)
+
+        if not connect_ok:
+            raise error
+
+        self.channel = connection.channel()
+        self.channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)
+        result = self.channel.queue_declare('', exclusive=True)
+        queue_name = result.method.queue
+        self.channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=topic)
+        self.channel.basic_consume(queue=queue_name,
+                                   on_message_callback=self.on_message,
+                                   auto_ack=True)
+        self.events = []
+
+    def on_message(self, ch, method, properties, body):
+        """callback invoked when a new message arrive on the topic"""
+        log.info('AMQP received event: %s', body)
+        self.events.append(json.loads(body))
+
+    # TODO create a base class for the AMQP and HTTP cases
+    def verify_s3_events(self, keys, exact_match=False, deletions=False):
+        """verify stored s3 records agains a list of keys"""
+        verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
+        self.events = []
+
+    def verify_events(self, keys, exact_match=False, deletions=False):
+        """verify stored events agains a list of keys"""
+        verify_events_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
+        self.events = []
+
+
+def amqp_receiver_thread_runner(receiver):
+    """main thread function for the amqp receiver"""
+    try:
+        log.info('AMQP receiver started')
+        receiver.channel.start_consuming()
+        log.info('AMQP receiver ended')
+    except Exception as error:
+        log.info('AMQP receiver ended unexpectedly: %s', str(error))
+
+
+def create_amqp_receiver_thread(exchange, topic):
+    """create amqp receiver and thread"""
+    receiver = AMQPReceiver(exchange, topic)
+    task = threading.Thread(target=amqp_receiver_thread_runner, args=(receiver,))
+    task.daemon = True
+    return task, receiver
+
+
+def stop_amqp_receiver(receiver, task):
+    """stop the receiver thread and wait for it to finis"""
+    try:
+        receiver.channel.stop_consuming()
+        log.info('stopping AMQP receiver')
+    except Exception as error:
+        log.info('failed to gracefully stop AMQP receiver: %s', str(error))
+    task.join(5)
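
The AMQP push tests below use these helpers in this order:

    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()
    # create/delete objects in the bucket and wait for sync, then:
    receiver.verify_events(keys, exact_match=False)
    stop_amqp_receiver(receiver, task)
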
 
 def check_ps_configured():
     """check if at least one pubsub zone exist"""
@@ -49,7 +234,7 @@ def verify_events_by_elements(events, keys, exact_match=False, deletions=False):
         key_found = False
         for event in events:
             if event['info']['bucket']['name'] == key.bucket.name and \
-               event['info']['key']['name'] == key.name:
+                event['info']['key']['name'] == key.name:
                 if deletions and event['event'] == 'OBJECT_DELETE':
                     key_found = True
                     break
@@ -76,7 +261,7 @@ def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=Fa
         key_found = False
         for record in records:
             if record['s3']['bucket']['name'] == key.bucket.name and \
-               record['s3']['object']['key'] == key.name:
+                record['s3']['object']['key'] == key.name:
                 if deletions and record['eventName'] == 'ObjectRemoved':
                     key_found = True
                     break
@@ -98,6 +283,52 @@ def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=Fa
             assert False, err
 
 
+def init_rabbitmq():
+    """ start a rabbitmq broker """
+    hostname = get_ip()
+    #port = str(random.randint(20000, 30000))
+    #data_dir = './' + port + '_data'
+    #log_dir = './' + port + '_log'
+    #print('')
+    #try:
+    #    os.mkdir(data_dir)
+    #    os.mkdir(log_dir)
+    #except:
+    #    print('rabbitmq directories already exists')
+    #env = {'RABBITMQ_NODE_PORT': port,
+    #       'RABBITMQ_NODENAME': 'rabbit'+ port + '@' + hostname,
+    #       'RABBITMQ_USE_LONGNAME': 'true',
+    #       'RABBITMQ_MNESIA_BASE': data_dir,
+    #       'RABBITMQ_LOG_BASE': log_dir}
+    # TODO: support multiple brokers per host using env
+    # make sure we don't collide with the default
+    try:
+        proc = subprocess.Popen('rabbitmq-server')
+    except Exception as error:
+        log.info('failed to execute rabbitmq-server: %s', str(error))
+        print 'failed to execute rabbitmq-server: %s' % str(error)
+        return None
+    # TODO add rabbitmq checkpoint instead of sleep
+    time.sleep(5)
+    return proc #, port, data_dir, log_dir
+
+
+def clean_rabbitmq(proc): #, data_dir, log_dir)
+    """ stop the rabbitmq broker """
+    try:
+        subprocess.call(['rabbitmqctl', 'stop'])
+        time.sleep(5)
+        proc.terminate()
+    except:
+        log.info('rabbitmq server already terminated')
+    # TODO: add directory cleanup once multiple brokers are supported
+    #try:
+    #    os.rmdir(data_dir)
+    #    os.rmdir(log_dir)
+    #except:
+    #    log.info('rabbitmq directories already removed')
+
+
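
The broker helpers bracket each AMQP test, mirroring the tests below:

    proc = init_rabbitmq()
    if proc is None:
        return SkipTest('end2end amqp tests require rabbitmq-server installed')
    # ... run the test against the broker ...
    clean_rabbitmq(proc)
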
 def init_env():
     """initialize the environment"""
     check_ps_configured()
@@ -122,6 +353,20 @@ def init_env():
     return zones, ps_zones
 
 
+def get_ip():
+    """ This method returns the "primary" IP on the local box (the one with a default route)
+    source: https://stackoverflow.com/a/28950776/711085
+    this is needed because on the teuthology machines: socket.getfqdn()/socket.gethostname() return 127.0.0.1 """
+    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    try:
+        # the address does not need to be reachable; connect() on a UDP socket
+        # sends nothing and merely selects the outgoing interface
+        s.connect(('10.255.255.255', 1))
+        ip = s.getsockname()[0]
+    finally:
+        s.close()
+    return ip
+
+
 TOPIC_SUFFIX = "_topic"
 SUB_SUFFIX = "_sub"
 NOTIFICATION_SUFFIX = "_notif"
@@ -144,14 +389,14 @@ def test_ps_info():
     for i in range(number_of_objects):
         key = bucket.new_key(str(i))
         key.set_contents_from_string('bar')
-    print('Zonegroup: ' + zonegroup.name)
-    print('user: ' + get_user())
-    print('tenant: ' + get_tenant())
-    print('Master Zone')
+    print 'Zonegroup: ' + zonegroup.name
+    print 'user: ' + get_user()
+    print 'tenant: ' + get_tenant()
+    print 'Master Zone'
     print_connection_info(zones[0].conn)
-    print('PubSub Zone')
+    print 'PubSub Zone'
     print_connection_info(ps_zones[0].conn)
-    print('Bucket: ' + bucket_name)
+    print 'Bucket: ' + bucket_name
 
 
 def test_ps_s3_notification_low_level():
@@ -175,7 +420,7 @@ def test_ps_s3_notification_low_level():
     topic_conf_list = [{'Id': notification_name,
                         'TopicArn': topic_arn,
                         'Events': ['s3:ObjectCreated:*']
-                        }]
+                       }]
     s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
     _, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
@@ -247,7 +492,7 @@ def test_ps_s3_notification_records():
     topic_conf_list = [{'Id': notification_name,
                         'TopicArn': topic_arn,
                         'Events': ['s3:ObjectCreated:*']
-                        }]
+                       }]
     s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
     _, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
@@ -304,7 +549,7 @@ def test_ps_s3_notification():
     topic_conf_list = [{'Id': notification_name1,
                         'TopicArn': topic_arn,
                         'Events': ['s3:ObjectCreated:*']
-                        }]
+                       }]
     s3_notification_conf1 = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf1.set_config()
     assert_equal(status/100, 2)
@@ -313,7 +558,7 @@ def test_ps_s3_notification():
     topic_conf_list = [{'Id': notification_name2,
                         'TopicArn': topic_arn,
                         'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
-                        }]
+                       }]
     s3_notification_conf2 = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf2.set_config()
     assert_equal(status/100, 2)
@@ -387,7 +632,7 @@ def test_ps_topic_with_endpoint():
     # create topic
     dest_endpoint = 'amqp://localhost:7001'
     dest_args = 'amqp-exchange=amqp.direct&amqp-ack-level=none'
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name, 
+    topic_conf = PSTopic(ps_zones[0].conn, topic_name,
                          endpoint=dest_endpoint,
                          endpoint_args=dest_args)
     _, status = topic_conf.set_config()
@@ -638,7 +883,7 @@ def test_ps_event_type_subscription():
     # wait for sync
     zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
     log.debug("Event (OBJECT_DELETE) synced")
-    
+
     # get the events from the creations subscription
     result, _ = sub_create_conf.get_events()
     parsed_result = json.loads(result)
@@ -935,11 +1180,18 @@ def test_ps_versioned_deletion():
 
 def test_ps_push_http():
     """ test pushing to http endpoint """
-    return SkipTest("PubSub push tests are only manual")
+    if skip_push_tests:
+        return SkipTest("PubSub push tests don't run in teuthology")
     zones, ps_zones = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
 
+    # create random port for the http server
+    host = get_ip()
+    port = random.randint(10000, 20000)
+    # start an http server in a separate thread
+    http_server = StreamingHTTPServer(host, port)
+
     # create topic
     topic_conf = PSTopic(ps_zones[0].conn, topic_name)
     _, status = topic_conf.set_config()
@@ -955,7 +1207,7 @@ def test_ps_push_http():
     assert_equal(status/100, 2)
     # create subscription
     sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
-                              topic_name, endpoint='http://localhost:9001')
+                              topic_name, endpoint='http://'+host+':'+str(port))
     _, status = sub_conf.set_config()
     assert_equal(status/100, 2)
     # create objects in the bucket
@@ -965,6 +1217,10 @@ def test_ps_push_http():
         key.set_contents_from_string('bar')
     # wait for sync
     zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    # check http server
+    keys = list(bucket.list())
+    # TODO: use exact match
+    http_server.verify_events(keys, exact_match=False)
 
     # delete objects from the bucket
     for key in bucket.list():
@@ -972,24 +1228,35 @@ def test_ps_push_http():
     # wait for sync
     zone_meta_checkpoint(ps_zones[0].zone)
     zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    # check http server
+    # TODO: use exact match
+    http_server.verify_events(keys, deletions=True, exact_match=False)
 
     # cleanup
     sub_conf.del_config()
     notification_conf.del_config()
     topic_conf.del_config()
     zones[0].delete_bucket(bucket_name)
+    http_server.close()
 
 
 def test_ps_s3_push_http():
-    """ test pushing to http endpoint n s3 record format"""
-    return SkipTest("PubSub push tests are only manual")
+    """ test pushing to http endpoint s3 record format"""
+    if skip_push_tests:
+        return SkipTest("PubSub push tests don't run in teuthology")
     zones, ps_zones = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
 
+    # create random port for the http server
+    host = get_ip()
+    port = random.randint(10000, 20000)
+    # start an http server in a separate thread
+    http_server = StreamingHTTPServer(host, port)
+
     # create topic
     topic_conf = PSTopic(ps_zones[0].conn, topic_name,
-                         endpoint='http://localhost:9001')
+                         endpoint='http://'+host+':'+str(port))
     result, status = topic_conf.set_config()
     assert_equal(status/100, 2)
     parsed_result = json.loads(result)
@@ -1002,8 +1269,8 @@ def test_ps_s3_push_http():
     notification_name = bucket_name + NOTIFICATION_SUFFIX
     topic_conf_list = [{'Id': notification_name,
                         'TopicArn': topic_arn,
-                        'Events': ['s3:ObjectCreated:*']
-                        }]
+                        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
+                       }]
     s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
     _, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
@@ -1014,7 +1281,10 @@ def test_ps_s3_push_http():
         key.set_contents_from_string('bar')
     # wait for sync
     zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
-    # TODO check http server
+    # check http server
+    keys = list(bucket.list())
+    # TODO: use exact match
+    http_server.verify_s3_events(keys, exact_match=False)
 
     # delete objects from the bucket
     for key in bucket.list():
@@ -1022,22 +1292,33 @@ def test_ps_s3_push_http():
     # wait for sync
     zone_meta_checkpoint(ps_zones[0].zone)
     zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
-    # TODO check http server
+    # check http server
+    # TODO: use exact match
+    http_server.verify_s3_events(keys, deletions=True, exact_match=False)
 
     # cleanup
     s3_notification_conf.del_config()
     topic_conf.del_config()
     zones[0].delete_bucket(bucket_name)
+    http_server.close()
 
 
 def test_ps_push_amqp():
     """ test pushing to amqp endpoint """
-    return SkipTest("PubSub push tests are only manual")
+    if skip_push_tests:
+        return SkipTest("PubSub push tests don't run in teuthology")
+    hostname = get_ip()
+    proc = init_rabbitmq()
+    if proc is None:
+        return SkipTest('end2end amqp tests require rabbitmq-server installed')
     zones, ps_zones = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
 
     # create topic
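+    # start amqp receiver in a separate thread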
+    exchange = 'ex1'
+    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+    task.start()
     topic_conf = PSTopic(ps_zones[0].conn, topic_name)
     _, status = topic_conf.set_config()
     assert_equal(status/100, 2)
@@ -1052,8 +1333,8 @@ def test_ps_push_amqp():
     assert_equal(status/100, 2)
     # create subscription
     sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
-                              topic_name, endpoint='amqp://localhost',
-                              endpoint_args='amqp-exchange=ex1&amqp-ack-level=none')
+                              topic_name, endpoint='amqp://'+hostname,
+                              endpoint_args='amqp-exchange='+exchange+'&amqp-ack-level=none')
     _, status = sub_conf.set_config()
     assert_equal(status/100, 2)
     # create objects in the bucket
@@ -1063,7 +1344,10 @@ def test_ps_push_amqp():
         key.set_contents_from_string('bar')
     # wait for sync
     zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
-    # TODO check amqp receiver
+    # check amqp receiver
+    keys = list(bucket.list())
+    # TODO: use exact match
+    receiver.verify_events(keys, exact_match=False)
 
     # delete objects from the bucket
     for key in bucket.list():
@@ -1071,26 +1355,38 @@ def test_ps_push_amqp():
     # wait for sync
     zone_meta_checkpoint(ps_zones[0].zone)
     zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
-    # TODO check amqp receiver
+    # check amqp receiver
+    # TODO: use exact match
+    receiver.verify_events(keys, deletions=True, exact_match=False)
 
     # cleanup
+    stop_amqp_receiver(receiver, task)
     sub_conf.del_config()
     notification_conf.del_config()
     topic_conf.del_config()
     zones[0].delete_bucket(bucket_name)
+    clean_rabbitmq(proc)
 
 
 def test_ps_s3_push_amqp():
     """ test pushing to amqp endpoint s3 record format"""
-    return SkipTest("PubSub push tests are only manual")
+    if skip_push_tests:
+        return SkipTest("PubSub push tests don't run in teuthology")
+    hostname = get_ip()
+    proc = init_rabbitmq()
+    if proc is None:
+        return SkipTest('end2end amqp tests require rabbitmq-server installed')
     zones, ps_zones = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
 
     # create topic
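+    # start amqp receiver in a separate thread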
+    exchange = 'ex1'
+    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+    task.start()
     topic_conf = PSTopic(ps_zones[0].conn, topic_name,
-                         endpoint='amqp://localhost',
-                         endpoint_args='amqp-exchange=ex1&amqp-ack-level=none')
+                         endpoint='amqp://' + hostname,
+                         endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
     result, status = topic_conf.set_config()
     assert_equal(status/100, 2)
     parsed_result = json.loads(result)
@@ -1103,8 +1399,8 @@ def test_ps_s3_push_amqp():
     notification_name = bucket_name + NOTIFICATION_SUFFIX
     topic_conf_list = [{'Id': notification_name,
                         'TopicArn': topic_arn,
-                        'Events': ['s3:ObjectCreated:*']
-                        }]
+                        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
+                       }]
     s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
     _, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
@@ -1115,7 +1411,10 @@ def test_ps_s3_push_amqp():
         key.set_contents_from_string('bar')
     # wait for sync
     zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
-    # TODO check amqp receiver
+    # check amqp receiver
+    keys = list(bucket.list())
+    # TODO: use exact match
+    receiver.verify_s3_events(keys, exact_match=False)
 
     # delete objects from the bucket
     for key in bucket.list():
@@ -1123,12 +1422,16 @@ def test_ps_s3_push_amqp():
     # wait for sync
     zone_meta_checkpoint(ps_zones[0].zone)
     zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
-    # TODO check amqp receiver
+    # check amqp receiver
+    # TODO: use exact match
+    receiver.verify_s3_events(keys, deletions=True, exact_match=False)
 
     # cleanup
+    stop_amqp_receiver(receiver, task)
     s3_notification_conf.del_config()
     topic_conf.del_config()
     zones[0].delete_bucket(bucket_name)
+    clean_rabbitmq(proc)
 
 
 def test_ps_delete_bucket():
@@ -1152,11 +1455,11 @@ def test_ps_delete_bucket():
     topic_conf_list = [{'Id': notification_name,
                         'TopicArn': topic_arn,
                         'Events': ['s3:ObjectCreated:*']
-                        }]
+                       }]
     s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
-   
+
     # create non-s3 notification
     notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                        topic_name)
@@ -1216,12 +1519,12 @@ def test_ps_missing_topic():
     topic_conf_list = [{'Id': notification_name,
                         'TopicArn': topic_arn,
                         'Events': ['s3:ObjectCreated:*']
-                        }]
+                       }]
     s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
     try:
         s3_notification_conf.set_config()
     except:
-        print('missing topic is expected')
+        log.info('missing topic is expected')
     else:
         assert 'missing topic is expected'
 
@@ -1231,28 +1534,38 @@ def test_ps_missing_topic():
 
 def test_ps_s3_topic_update():
     """ test updating topic associated with a notification"""
-    return SkipTest("PubSub push tests are only manual")
+    if skip_push_tests:
+        return SkipTest("PubSub push tests don't run in teuthology")
+    rabbit_proc = init_rabbitmq()
+    if rabbit_proc is None:
+        return SkipTest('end2end amqp tests require rabbitmq-server installed')
     zones, ps_zones = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
 
-    # create topic
-    dest_endpoint1 = 'amqp://localhost'
-    dest_args1 = 'amqp-exchange=ex1&amqp-ack-level=none'
-    dest_endpoint2 = 'http://localhost:9001'
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name, 
-                         endpoint=dest_endpoint1,
-                         endpoint_args=dest_args1)
+    # create amqp topic
+    hostname = get_ip()
+    exchange = 'ex1'
+    amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+    amqp_task.start()
+    topic_conf = PSTopic(ps_zones[0].conn, topic_name,
+                          endpoint='amqp://' + hostname,
+                          endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
     result, status = topic_conf.set_config()
+    assert_equal(status/100, 2)
     parsed_result = json.loads(result)
     topic_arn = parsed_result['arn']
-    assert_equal(status/100, 2)
     # get topic
     result, _ = topic_conf.get_config()
     # verify topic content
     parsed_result = json.loads(result)
     assert_equal(parsed_result['topic']['name'], topic_name)
-    assert_equal(parsed_result['topic']['dest']['push_endpoint'], dest_endpoint1)
+    assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint'])
+
+    # create http server
+    port = random.randint(10000, 20000)
+    # start an http server in a separate thread
+    http_server = StreamingHTTPServer(hostname, port)
 
     # create bucket on the first of the rados zones
     bucket = zones[0].create_bucket(bucket_name)
@@ -1263,7 +1576,7 @@ def test_ps_s3_topic_update():
     topic_conf_list = [{'Id': notification_name,
                         'TopicArn': topic_arn,
                         'Events': ['s3:ObjectCreated:*']
-                        }]
+                       }]
     s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
     _, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
@@ -1275,11 +1588,13 @@ def test_ps_s3_topic_update():
     # wait for sync
     zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
 
-    # TODO: check update to amqp
+    keys = list(bucket.list())
+    # TODO: use exact match
+    receiver.verify_s3_events(keys, exact_match=False)
 
-    # update the same topic
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name, 
-                         endpoint=dest_endpoint2)
+    # update the same topic with new endpoint
+    topic_conf = PSTopic(ps_zones[0].conn, topic_name,
+                         endpoint='http://'+ hostname + ':' + str(port))
     _, status = topic_conf.set_config()
     assert_equal(status/100, 2)
     # get topic
@@ -1287,73 +1602,107 @@ def test_ps_s3_topic_update():
     # verify topic content
     parsed_result = json.loads(result)
     assert_equal(parsed_result['topic']['name'], topic_name)
-    assert_equal(parsed_result['topic']['dest']['push_endpoint'], dest_endpoint2)
+    assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint'])
 
-    # create more objects in the bucket
-    number_of_objects = 10
+    # delete current objects and create new objects in the bucket
+    for key in bucket.list():
+        key.delete()
     for i in range(number_of_objects):
         key = bucket.new_key(str(i+100))
         key.set_contents_from_string('bar')
     # wait for sync
+    zone_meta_checkpoint(ps_zones[0].zone)
     zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
 
-    # TODO: check it is still updating amqp
+    keys = list(bucket.list())
+    # verify that notifications are still sent to amqp
+    # TODO: use exact match
+    receiver.verify_s3_events(keys, exact_match=False)
 
     # update notification to update the endpoint from the topic
     topic_conf_list = [{'Id': notification_name,
                         'TopicArn': topic_arn,
                         'Events': ['s3:ObjectCreated:*']
-                        }]
+                       }]
     s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
     _, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
-    # create even more objects in the bucket
-    number_of_objects = 10
+
+    # delete current objects and create new objects in the bucket
+    for key in bucket.list():
+        key.delete()
     for i in range(number_of_objects):
         key = bucket.new_key(str(i+200))
         key.set_contents_from_string('bar')
     # wait for sync
+    zone_meta_checkpoint(ps_zones[0].zone)
     zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
 
-    # TODO: check that updates switched to http
+    keys = list(bucket.list())
+    # check that updates switched to http
+    # TODO: use exact match
+    http_server.verify_s3_events(keys, exact_match=False)
 
     # cleanup
     # delete objects from the bucket
+    stop_amqp_receiver(receiver, amqp_task)
     for key in bucket.list():
         key.delete()
     s3_notification_conf.del_config()
     topic_conf.del_config()
     zones[0].delete_bucket(bucket_name)
+    http_server.close()
+    clean_rabbitmq(rabbit_proc)
 
 
 def test_ps_s3_notification_update():
     """ test updating the topic of a notification"""
-    return SkipTest("PubSub push tests are only manual")
+    if skip_push_tests:
+        return SkipTest("PubSub push tests don't run in teuthology")
+    hostname = get_ip()
+    rabbit_proc = init_rabbitmq()
+    if rabbit_proc is None:
+        return SkipTest('end2end amqp tests require rabbitmq-server installed')
+
     zones, ps_zones = init_env()
     bucket_name = gen_bucket_name()
     topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
+    topic_name2 = bucket_name+'http'+TOPIC_SUFFIX
 
-    # create first topic
-    dest_endpoint1 = 'amqp://localhost'
-    dest_args1 = 'amqp-exchange=ex1&amqp-ack-level=none'
-    topic_conf1 = PSTopic(ps_zones[0].conn, topic_name1, 
-                         endpoint=dest_endpoint1,
-                         endpoint_args=dest_args1)
+    # create topics
+    # start amqp receiver in a separate thread
+    exchange = 'ex1'
+    amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1)
+    amqp_task.start()
+    # create random port for the http server
+    http_port = random.randint(10000, 20000)
+    # start an http server in a separate thread
+    http_server = StreamingHTTPServer(hostname, http_port)
+
+    topic_conf1 = PSTopic(ps_zones[0].conn, topic_name1,
+                          endpoint='amqp://' + hostname,
+                          endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
     result, status = topic_conf1.set_config()
     parsed_result = json.loads(result)
     topic_arn1 = parsed_result['arn']
     assert_equal(status/100, 2)
+    topic_conf2 = PSTopic(ps_zones[0].conn, topic_name2,
+                          endpoint='http://'+hostname+':'+str(http_port))
+    result, status = topic_conf2.set_config()
+    parsed_result = json.loads(result)
+    topic_arn2 = parsed_result['arn']
+    assert_equal(status/100, 2)
 
     # create bucket on the first of the rados zones
     bucket = zones[0].create_bucket(bucket_name)
     # wait for sync
     zone_meta_checkpoint(ps_zones[0].zone)
-    # create s3 notification
+    # create s3 notification with topic1
     notification_name = bucket_name + NOTIFICATION_SUFFIX
     topic_conf_list = [{'Id': notification_name,
                         'TopicArn': topic_arn1,
                         'Events': ['s3:ObjectCreated:*']
-                        }]
+                       }]
     s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
     _, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
@@ -1364,64 +1713,81 @@ def test_ps_s3_notification_update():
         key.set_contents_from_string('bar')
     # wait for sync
     zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
-    result, _ = s3_notification_conf.get_config()
 
-    # TODO: check updates to amqp
-
-    # create another topic
-    topic_name2 = bucket_name+'http'+TOPIC_SUFFIX
-    dest_endpoint2 = 'http://localhost:9001'
-    topic_conf2 = PSTopic(ps_zones[0].conn, topic_name2, 
-                         endpoint=dest_endpoint2)
-    result, status = topic_conf2.set_config()
-    parsed_result = json.loads(result)
-    topic_arn2 = parsed_result['arn']
-    assert_equal(status/100, 2)
+    keys = list(bucket.list())
+    # TODO: use exact match
+    receiver.verify_s3_events(keys, exact_match=False)
 
-    # update notification to the new topic
+    # update notification to use topic2
     topic_conf_list = [{'Id': notification_name,
                         'TopicArn': topic_arn2,
                         'Events': ['s3:ObjectCreated:*']
-                        }]
+                       }]
     s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
     _, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
-    # create more objects in the bucket
-    number_of_objects = 10
+
+    # delete current objects and create new objects in the bucket
+    for key in bucket.list():
+        key.delete()
     for i in range(number_of_objects):
-        key = bucket.new_key(str(i+200))
+        key = bucket.new_key(str(i+100))
         key.set_contents_from_string('bar')
     # wait for sync
+    zone_meta_checkpoint(ps_zones[0].zone)
     zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
 
-    # TODO: check uodate to http
-    result, _ = s3_notification_conf.get_config()
+    keys = list(bucket.list())
+    # check that updates switched to http
+    # TODO: use exact match
+    http_server.verify_s3_events(keys, exact_match=False)
 
     # cleanup
     # delete objects from the bucket
+    stop_amqp_receiver(receiver, amqp_task)
     for key in bucket.list():
         key.delete()
     s3_notification_conf.del_config()
     topic_conf1.del_config()
     topic_conf2.del_config()
     zones[0].delete_bucket(bucket_name)
+    http_server.close()
+    clean_rabbitmq(rabbit_proc)
 
 
 def test_ps_s3_multiple_topics_notification():
     """ test notification creation with multiple topics"""
+    if skip_push_tests:
+        return SkipTest("PubSub push tests don't run in teuthology")
+    hostname = get_ip()
+    rabbit_proc = init_rabbitmq()
+    if rabbit_proc is  None:
+        return SkipTest('end2end amqp tests require rabbitmq-server installed')
+
     zones, ps_zones = init_env()
     bucket_name = gen_bucket_name()
-    # TODO: test via push endpoint (amqp+http)
     topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
     topic_name2 = bucket_name+'http'+TOPIC_SUFFIX
 
     # create topics
-    topic_conf1 = PSTopic(ps_zones[0].conn, topic_name1)
+    # start amqp receiver in a separate thread
+    exchange = 'ex1'
+    amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1)
+    amqp_task.start()
+    # create random port for the http server
+    http_port = random.randint(10000, 20000)
+    # start an http server in a separate thread
+    http_server = StreamingHTTPServer(hostname, http_port)
+
+    topic_conf1 = PSTopic(ps_zones[0].conn, topic_name1,
+                          endpoint='amqp://' + hostname,
+                          endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
     result, status = topic_conf1.set_config()
     parsed_result = json.loads(result)
     topic_arn1 = parsed_result['arn']
     assert_equal(status/100, 2)
-    topic_conf2 = PSTopic(ps_zones[0].conn, topic_name2)
+    topic_conf2 = PSTopic(ps_zones[0].conn, topic_name2,
+                          endpoint='http://'+hostname+':'+str(http_port))
     result, status = topic_conf2.set_config()
     parsed_result = json.loads(result)
     topic_arn2 = parsed_result['arn']
@@ -1435,16 +1801,16 @@ def test_ps_s3_multiple_topics_notification():
     notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1'
     notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2'
     topic_conf_list = [
-            {
-                'Id': notification_name1,
-                'TopicArn': topic_arn1,
-                'Events': ['s3:ObjectCreated:*']
-            },
-            {
-                'Id': notification_name2,
-                'TopicArn': topic_arn2,
-                'Events': ['s3:ObjectCreated:*']
-            }]
+        {
+            'Id': notification_name1,
+            'TopicArn': topic_arn1,
+            'Events': ['s3:ObjectCreated:*']
+        },
+        {
+            'Id': notification_name2,
+            'TopicArn': topic_arn2,
+            'Events': ['s3:ObjectCreated:*']
+        }]
     s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
     _, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
@@ -1455,14 +1821,14 @@ def test_ps_s3_multiple_topics_notification():
 
     # get auto-generated subscriptions
     sub_conf1 = PSSubscription(ps_zones[0].conn, notification_name1,
-                              topic_name1)
+                               topic_name1)
     _, status = sub_conf1.get_config()
     assert_equal(status/100, 2)
     sub_conf2 = PSSubscription(ps_zones[0].conn, notification_name2,
-                              topic_name2)
+                               topic_name2)
     _, status = sub_conf2.get_config()
     assert_equal(status/100, 2)
-    
+
     # create objects in the bucket
     number_of_objects = 10
     for i in range(number_of_objects):
@@ -1479,7 +1845,8 @@ def test_ps_s3_multiple_topics_notification():
     keys = list(bucket.list())
     # TODO: use exact match
     verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False)
-    
+    receiver.verify_s3_events(keys, exact_match=False)
+
     result, _ = sub_conf2.get_events()
     parsed_result = json.loads(result)
     for record in parsed_result['Records']:
@@ -1487,8 +1854,10 @@ def test_ps_s3_multiple_topics_notification():
     keys = list(bucket.list())
     # TODO: use exact match
     verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False)
-    
+    http_server.verify_s3_events(keys, exact_match=True)
+
     # cleanup
+    stop_amqp_receiver(receiver, amqp_task)
     s3_notification_conf.del_config()
     topic_conf1.del_config()
     topic_conf2.del_config()
@@ -1496,3 +1865,5 @@ def test_ps_s3_multiple_topics_notification():
     for key in bucket.list():
         key.delete()
     zones[0].delete_bucket(bucket_name)
+    http_server.close()
+    clean_rabbitmq(rabbit_proc)