]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
RGW\bucket notification tests: end-point agnostic + refactoring 57550/head
authorAli Masarwa <amasarwa@redhat.com>
Sun, 19 May 2024 14:23:31 +0000 (17:23 +0300)
committerAli Masarwa <amasarwa@redhat.com>
Thu, 30 May 2024 14:25:56 +0000 (17:25 +0300)
Signed-off-by: Ali Masarwa <amasarwa@redhat.com>
src/test/rgw/bucket_notification/test_bn.py

index df018f79b99500576f9ae43ba1c8b1b8064dc070..f7e12f25cda5560aab7a4612c9aa19a7cd26ca50 100644 (file)
@@ -444,6 +444,11 @@ class KafkaReceiver(object):
         verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes, etags=etags)
         self.events = []
 
+    def get_and_reset_events(self):
+        tmp = self.events
+        self.events = []
+        return tmp
+
     def close(self, task):
         stop_kafka_receiver(self, task)
 
@@ -701,25 +706,24 @@ def test_ps_s3_topic_admin_on_master():
     assert_equal(len(parsed_result['topics']), 0)
 
 
-@attr('basic_test')
-def test_ps_s3_notification_configuration_admin_on_master():
-    """ test s3 notification list/get/delete on master """
+def notification_configuration(with_cli):
     conn = connection()
     zonegroup = get_config_zonegroup()
     bucket_name = gen_bucket_name()
+    # create bucket
     bucket = conn.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
-    
+
     # make sure there are no leftover topics
     delete_all_topics(conn, '', get_config_cluster())
 
     # create s3 topics
-    endpoint_address = 'amqp://127.0.0.1:7001/vhost_1'
+    endpoint_address = 'amqp://127.0.0.1:7001'
     endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
-    topic_conf = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
+    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
     assert_equal(topic_arn,
-                 'arn:aws:sns:' + zonegroup + '::' + topic_name + '_1')
+                 'arn:aws:sns:' + zonegroup + '::' + topic_name)
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
     topic_conf_list = [{'Id': notification_name+'_1',
@@ -738,31 +742,55 @@ def test_ps_s3_notification_configuration_admin_on_master():
     _, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
-    # list notification
-    result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster())
-    parsed_result = json.loads(result[0])
-    assert_equal(len(parsed_result['notifications']), 3)
-    assert_equal(result[1], 0)
-
     # get notification 1
-    result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name+'_1'], get_config_cluster())
-    parsed_result = json.loads(result[0])
-    assert_equal(parsed_result['Id'], notification_name+'_1')
-    assert_equal(result[1], 0)
+    if with_cli:
+        result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name+'_1'], get_config_cluster())
+        parsed_result = json.loads(result[0])
+        assert_equal(parsed_result['Id'], notification_name+'_1')
+        assert_equal(parsed_result['TopicArn'], topic_arn)
+        assert_equal(result[1], 0)
+    else:
+        response, status = s3_notification_conf.get_config(notification=notification_name+'_1')
+        assert_equal(status/100, 2)
+        assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
 
-    # remove notification 3
-    _, result = admin(['notification', 'rm', '--bucket', bucket_name, '--notification-id', notification_name+'_3'], get_config_cluster())
-    assert_equal(result, 0)
+    # list notification
+    if with_cli:
+        result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster())
+        parsed_result = json.loads(result[0])
+        assert_equal(len(parsed_result['notifications']), 3)
+        assert_equal(result[1], 0)
+    else:
+        result, status = s3_notification_conf.get_config()
+        assert_equal(status, 200)
+        assert_equal(len(result['TopicConfigurations']), 3)
+
+    # delete notification 2
+    if with_cli:
+        _, result = admin(['notification', 'rm', '--bucket', bucket_name, '--notification-id', notification_name+'_2'], get_config_cluster())
+        assert_equal(result, 0)
+    else:
+        _, status = s3_notification_conf.del_config(notification=notification_name+'_2')
+        assert_equal(status/100, 2)
 
     # list notification
-    result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster())
-    parsed_result = json.loads(result[0])
-    assert_equal(len(parsed_result['notifications']), 2)
-    assert_equal(result[1], 0)
+    if with_cli:
+        result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster())
+        parsed_result = json.loads(result[0])
+        assert_equal(len(parsed_result['notifications']), 2)
+        assert_equal(result[1], 0)
+    else:
+        result, status = s3_notification_conf.get_config()
+        assert_equal(status, 200)
+        assert_equal(len(result['TopicConfigurations']), 2)
 
     # delete notifications
-    _, result = admin(['notification', 'rm', '--bucket', bucket_name], get_config_cluster())
-    assert_equal(result, 0)
+    if with_cli:
+        _, result = admin(['notification', 'rm', '--bucket', bucket_name], get_config_cluster())
+        assert_equal(result, 0)
+    else:
+        _, status = s3_notification_conf.del_config()
+        assert_equal(status/100, 2)
 
     # list notification, make sure it is empty
     result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster())
@@ -770,6 +798,17 @@ def test_ps_s3_notification_configuration_admin_on_master():
     assert_equal(len(parsed_result['notifications']), 0)
     assert_equal(result[1], 0)
 
+    # cleanup
+    topic_conf.del_config()
+    # delete the bucket
+    conn.delete_bucket(bucket_name)
+
+
+@attr('basic_test')
+def test_notification_configuration_admin():
+    """ test notification list/set/get/delete, with admin cli """
+    notification_configuration(True)
+
 
 @attr('modification_required')
 def test_ps_s3_topic_with_secret_on_master():
@@ -823,64 +862,9 @@ def test_ps_s3_topic_with_secret_on_master():
 
 
 @attr('basic_test')
-def test_ps_s3_notification_on_master():
-    """ test s3 notification set/get/delete on master """
-    conn = connection()
-    zonegroup = get_config_zonegroup()
-    bucket_name = gen_bucket_name()
-    # create bucket
-    bucket = conn.create_bucket(bucket_name)
-    topic_name = bucket_name + TOPIC_SUFFIX
-    # create s3 topic
-    endpoint_address = 'amqp://127.0.0.1:7001'
-    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
-    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
-    topic_arn = topic_conf.set_config()
-    # create s3 notification
-    notification_name = bucket_name + NOTIFICATION_SUFFIX
-    topic_conf_list = [{'Id': notification_name+'_1',
-                        'TopicArn': topic_arn,
-                        'Events': ['s3:ObjectCreated:*']
-                       },
-                       {'Id': notification_name+'_2',
-                        'TopicArn': topic_arn,
-                        'Events': ['s3:ObjectRemoved:*']
-                       },
-                       {'Id': notification_name+'_3',
-                        'TopicArn': topic_arn,
-                        'Events': []
-                       }]
-    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
-    _, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
-
-    # get notifications on a bucket
-    response, status = s3_notification_conf.get_config(notification=notification_name+'_1')
-    assert_equal(status/100, 2)
-    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
-
-    # delete specific notifications
-    _, status = s3_notification_conf.del_config(notification=notification_name+'_1')
-    assert_equal(status/100, 2)
-
-    # get the remaining 2 notifications on a bucket
-    response, status = s3_notification_conf.get_config()
-    assert_equal(status/100, 2)
-    assert_equal(len(response['TopicConfigurations']), 2)
-    assert_equal(response['TopicConfigurations'][0]['TopicArn'], topic_arn)
-    assert_equal(response['TopicConfigurations'][1]['TopicArn'], topic_arn)
-
-    # delete remaining notifications
-    _, status = s3_notification_conf.del_config()
-    assert_equal(status/100, 2)
-
-    # make sure that the notifications are now deleted
-    _, status = s3_notification_conf.get_config()
-
-    # cleanup
-    topic_conf.del_config()
-    # delete the bucket
-    conn.delete_bucket(bucket_name)
+def test_notification_configuration():
+    """ test s3 notification set/get/deleter """
+    notification_configuration(False)
 
 
 @attr('basic_test')
@@ -1214,7 +1198,7 @@ def test_ps_s3_notification_errors_on_master():
     conn.delete_bucket(bucket_name)
 
 
-def notification_push(endpoint_type, conn, account=None):
+def notification_push(endpoint_type, conn, account=None, cloudevents=False):
     """ test pushinging notification """
     zonegroup = get_config_zonegroup()
     # create bucket
@@ -1222,19 +1206,19 @@ def notification_push(endpoint_type, conn, account=None):
     bucket = conn.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
 
-    receiver = {}
-    task = None
     host = get_ip()
-    s3_notification_conf = None
-    topic_conf = None
+    task = None
     if endpoint_type == 'http':
         # create random port for the http server
         host = get_ip_http()
         port = random.randint(10000, 20000)
         # start an http server in a separate thread
-        receiver = HTTPServerWithEvents((host, port))
+        receiver = HTTPServerWithEvents((host, port), cloudevents=cloudevents)
         endpoint_address = 'http://'+host+':'+str(port)
-        endpoint_args = 'push-endpoint='+endpoint_address
+        if cloudevents:
+            endpoint_args = 'push-endpoint='+endpoint_address+'&cloudevents=true'
+        else:
+            endpoint_args = 'push-endpoint='+endpoint_address
         topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
         topic_arn = topic_conf.set_config()
         # create s3 notification
@@ -1291,6 +1275,8 @@ def notification_push(endpoint_type, conn, account=None):
 
     # create objects in the bucket
     number_of_objects = 100
+    if cloudevents:
+        number_of_objects = 10
     client_threads = []
     etags = []
     objects_size = {}
@@ -1565,87 +1551,11 @@ def test_notification_push_http():
 
 
 @attr('http_test')
-def test_ps_s3_notification_push_cloudevents_on_master():
-    """ test pushing cloudevents notification on master """
-    hostname = get_ip_http()
+def test_notification_push_cloudevents():
+    """ test pushing cloudevents notification """
     conn = connection()
-    zonegroup = get_config_zonegroup()
-
-    # create random port for the http server
-    host = get_ip()
-    port = random.randint(10000, 20000)
-    # start an http server in a separate thread
-    number_of_objects = 10
-    http_server = HTTPServerWithEvents((host, port), cloudevents=True)
-
-    # create bucket
-    bucket_name = gen_bucket_name()
-    bucket = conn.create_bucket(bucket_name)
-    topic_name = bucket_name + TOPIC_SUFFIX
-
-    # create s3 topic
-    endpoint_address = 'http://'+host+':'+str(port)
-    endpoint_args = 'push-endpoint='+endpoint_address+'&cloudevents=true'
-    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
-    topic_arn = topic_conf.set_config()
-    # create s3 notification
-    notification_name = bucket_name + NOTIFICATION_SUFFIX
-    topic_conf_list = [{'Id': notification_name,
-                        'TopicArn': topic_arn,
-                        'Events': []
-                       }]
-    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
-    response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
-
-    # create objects in the bucket
-    client_threads = []
-    objects_size = {}
-    start_time = time.time()
-    for i in range(number_of_objects):
-        content = str(os.urandom(randint(1, 1024)))
-        object_size = len(content)
-        key = bucket.new_key(str(i))
-        objects_size[key.name] = object_size
-        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
-        thr.start()
-        client_threads.append(thr)
-    [thr.join() for thr in client_threads]
-
-    time_diff = time.time() - start_time
-    print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
-
-    print('wait for 5sec for the messages...')
-    time.sleep(5)
+    notification_push('http', conn, cloudevents=True)
 
-    # check http receiver
-    keys = list(bucket.list())
-    http_server.verify_s3_events(keys, exact_match=True, deletions=False, expected_sizes=objects_size)
-
-    # delete objects from the bucket
-    client_threads = []
-    start_time = time.time()
-    for key in bucket.list():
-        thr = threading.Thread(target = key.delete, args=())
-        thr.start()
-        client_threads.append(thr)
-    [thr.join() for thr in client_threads]
-
-    time_diff = time.time() - start_time
-    print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
-
-    print('wait for 5sec for the messages...')
-    time.sleep(5)
-
-    # check http receiver
-    http_server.verify_s3_events(keys, exact_match=True, deletions=True, expected_sizes=objects_size)
-
-    # cleanup
-    topic_conf.del_config()
-    s3_notification_conf.del_config(notification=notification_name)
-    # delete the bucket
-    conn.delete_bucket(bucket_name)
-    http_server.close()
 
 
 @attr('http_test')
@@ -1717,38 +1627,51 @@ def test_ps_s3_opaque_data_on_master():
     conn.delete_bucket(bucket_name)
     http_server.close()
 
-@attr('lifecycle_test')
-def test_ps_s3_lifecycle_on_master():
-    """ test that when object is deleted due to lifecycle policy, notification is sent on master """
-    hostname = get_ip()
-    conn = connection()
-    zonegroup = get_config_zonegroup()
 
-    # create random port for the http server
-    host = get_ip()
-    port = random.randint(10000, 20000)
-    # start an http server in a separate thread
-    number_of_objects = 10
-    http_server = HTTPServerWithEvents((host, port))
+def lifecycle(endpoint_type, conn, number_of_objects, topic_events, create_thread, rules_creator, record_events,
+              expected_abortion=False):
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
     bucket = conn.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
 
+    host = get_ip()
+    task = None
+    port = None
+    if endpoint_type == 'http':
+        # create random port for the http server
+        port = random.randint(10000, 20000)
+        # start an http server in a separate thread
+        receiver = HTTPServerWithEvents((host, port))
+        endpoint_address = 'http://'+host+':'+str(port)
+        endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
+    elif endpoint_type == 'amqp':
+        # start amqp receiver
+        exchange = 'ex1'
+        task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+        task.start()
+        endpoint_address = 'amqp://' + host
+        endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange='+exchange+'&amqp-ack-level=broker&persistent=true'
+    elif endpoint_type == 'kafka':
+        # start kafka receiver
+        task, receiver = create_kafka_receiver_thread(topic_name)
+        task.start()
+        endpoint_address = 'kafka://' + host
+        endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'
+    else:
+        return SkipTest('Unknown endpoint type: ' + endpoint_type)
+
     # create s3 topic
-    endpoint_address = 'http://'+host+':'+str(port)
-    endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
-    opaque_data = 'http://1.2.3.4:8888'
-    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data)
+    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
     topic_conf_list = [{'Id': notification_name,
                         'TopicArn': topic_arn,
-                        'Events': ['s3:ObjectLifecycle:Expiration:*',
-                                   's3:LifecycleExpiration:*']
-                       }]
+                        'Events': topic_events
+                        }]
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
@@ -1759,50 +1682,41 @@ def test_ps_s3_lifecycle_on_master():
     start_time = time.time()
     content = 'bar'
     for i in range(number_of_objects):
-        key = bucket.new_key(obj_prefix + str(i))
-        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr = create_thread(bucket, obj_prefix, i, content)
         thr.start()
         client_threads.append(thr)
     [thr.join() for thr in client_threads]
 
     time_diff = time.time() - start_time
-    print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+    print('average time for creation + '+endpoint_type+' notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
 
     keys = list(bucket.list())
 
     # create lifecycle policy
     client = boto3.client('s3',
-            endpoint_url='http://'+conn.host+':'+str(conn.port),
-            aws_access_key_id=conn.aws_access_key_id,
-            aws_secret_access_key=conn.aws_secret_access_key)
+                          endpoint_url='http://'+conn.host+':'+str(conn.port),
+                          aws_access_key_id=conn.aws_access_key_id,
+                          aws_secret_access_key=conn.aws_secret_access_key)
     yesterday = datetime.date.today() - datetime.timedelta(days=1)
     response = client.put_bucket_lifecycle_configuration(Bucket=bucket_name,
-            LifecycleConfiguration={'Rules': [
-                {
-                    'ID': 'rule1',
-                    'Expiration': {'Date': yesterday.isoformat()},
-                    'Filter': {'Prefix': obj_prefix},
-                    'Status': 'Enabled',
-                }
-            ]
-        }
-    )
+                                                         LifecycleConfiguration={'Rules': rules_creator(yesterday, obj_prefix)}
+                                                         )
 
     # start lifecycle processing
     admin(['lc', 'process'], get_config_cluster())
     print('wait for 20s for the lifecycle...')
     time.sleep(20)
+    print('wait for sometime for the messages...')
 
     no_keys = list(bucket.list())
     wait_for_queue_to_drain(topic_name, http_port=port)
     assert_equal(len(no_keys), 0)
     event_keys = []
-    events = http_server.get_and_reset_events()
-    assert number_of_objects * 2 <= len(events)
+    events = receiver.get_and_reset_events()
+    if not expected_abortion:
+        assert number_of_objects * 2 <= len(events)
     for event in events:
-        assert_in(event['Records'][0]['eventName'],
-                  ['LifecycleExpiration:Delete',
-                   'ObjectLifecycle:Expiration:Current'])
+        assert_in(event['Records'][0]['eventName'], record_events)
         event_keys.append(event['Records'][0]['s3']['object']['key'])
     for key in keys:
         key_found = False
@@ -1818,99 +1732,80 @@ def test_ps_s3_lifecycle_on_master():
     # cleanup
     for key in keys:
         key.delete()
-    [thr.join() for thr in client_threads]
+    if not expected_abortion:
+        [thr.join() for thr in client_threads]
     topic_conf.del_config()
     s3_notification_conf.del_config(notification=notification_name)
     # delete the bucket
     conn.delete_bucket(bucket_name)
-    http_server.close()
+    receiver.close(task)
+
+
+def rules_creator(yesterday, obj_prefix):
+    return [
+        {
+            'ID': 'rule1',
+            'Expiration': {'Date': yesterday.isoformat()},
+            'Filter': {'Prefix': obj_prefix},
+            'Status': 'Enabled',
+        }
+    ]
+
+
+def create_thread(bucket, obj_prefix, i, content):
+    key = bucket.new_key(obj_prefix + str(i))
+    return threading.Thread(target = set_contents_from_string, args=(key, content,))
+
+
+@attr('http_test')
+def test_lifecycle_http():
+    """ test that when object is deleted due to lifecycle policy, http endpoint """
+
+    conn = connection()
+    lifecycle('http', conn, 10, ['s3:ObjectLifecycle:Expiration:*', 's3:LifecycleExpiration:*'], create_thread,
+              rules_creator, ['LifecycleExpiration:Delete', 'ObjectLifecycle:Expiration:Current'])
+
+
+@attr('kafka_test')
+def test_lifecycle_kafka():
+    """ test that when object is deleted due to lifecycle policy, kafka endpoint """
+
+    conn = connection()
+    lifecycle('kafka', conn, 10, ['s3:ObjectLifecycle:Expiration:*', 's3:LifecycleExpiration:*'], create_thread,
+              rules_creator, ['LifecycleExpiration:Delete', 'ObjectLifecycle:Expiration:Current'])
+
 
 def start_and_abandon_multipart_upload(bucket, key_name, content):
     try:
         mp = bucket.initiate_multipart_upload(key_name)
         part_data = io.StringIO(content)
         mp.upload_part_from_file(part_data, 1)
-        # mp.complete_upload()
     except Exception as e:
         print('Error: ' + str(e))
 
-@attr('lifecycle_test')
-def test_ps_s3_lifecycle_abort_mpu_on_master():
-    """ test that when a multipart upload is aborted by lifecycle policy, notification is sent on master """
-    hostname = get_ip()
-    conn = connection()
-    zonegroup = get_config_zonegroup()
-
-    # create random port for the http server
-    host = get_ip()
-    port = random.randint(10000, 20000)
-    # start an http server in a separate thread
-    http_server = HTTPServerWithEvents((host, port))
-
-    # create bucket
-    bucket_name = gen_bucket_name()
-    bucket = conn.create_bucket(bucket_name)
-    topic_name = bucket_name + TOPIC_SUFFIX
-
-    # create s3 topic
-    endpoint_address = 'http://'+host+':'+str(port)
-    endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
-    opaque_data = 'http://1.2.3.4:8888'
-    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data)
-    topic_arn = topic_conf.set_config()
-    # create s3 notification
-    notification_name = bucket_name + NOTIFICATION_SUFFIX
-    topic_conf_list = [{'Id': notification_name,
-                        'TopicArn': topic_arn,
-                        'Events': ['s3:ObjectLifecycle:Expiration:*']
-                       }]
-    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
-    response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
-
-    # start and abandon a multpart upload
-    obj_prefix = 'ooo'
-    content = 'bar'
 
-    key_name = obj_prefix + str(1)
-    thr = threading.Thread(target = start_and_abandon_multipart_upload, args=(bucket, key_name, content,))
-    thr.start()
-    thr.join()    
+@attr('http_test')
+def test_lifecycle_abort_mpu():
+    """ test that when a multipart upload is aborted by lifecycle policy, http endpoint """
 
-    # create lifecycle policy -- assume rgw_lc_debug_interval=10 is in effect
-    client = boto3.client('s3',
-            endpoint_url='http://'+conn.host+':'+str(conn.port),
-            aws_access_key_id=conn.aws_access_key_id,
-            aws_secret_access_key=conn.aws_secret_access_key)
-    response = client.put_bucket_lifecycle_configuration(Bucket=bucket_name, 
-            LifecycleConfiguration={'Rules': [
-                {
-                    'ID': 'abort1',
-                    'Filter': {'Prefix': obj_prefix},
-                    'Status': 'Enabled',
-                    'AbortIncompleteMultipartUpload': {'DaysAfterInitiation': 1},
-                }
-            ]
-        }
-    )
+    def rules_creator(yesterday, obj_prefix):
+        return [
+            {
+                'ID': 'abort1',
+                'Filter': {'Prefix': obj_prefix},
+                'Status': 'Enabled',
+                'AbortIncompleteMultipartUpload': {'DaysAfterInitiation': 1},
+            }
+        ]
 
-    # start lifecycle processing
-    admin(['lc', 'process'], get_config_cluster())
-    print('wait for 20s for the lifecycle...')
-    time.sleep(20)
+    def create_thread(bucket, obj_prefix, i, content):
+        key_name = obj_prefix + str(i)
+        return threading.Thread(target = start_and_abandon_multipart_upload, args=(bucket, key_name, content,))
 
-    wait_for_queue_to_drain(topic_name, http_port=port)
-    events = http_server.get_and_reset_events()
-    for event in events:
-        assert_equal(event['Records'][0]['eventName'], 'ObjectLifecycle:Expiration:AbortMPU')
-        assert key_name in event['Records'][0]['s3']['object']['key']
+    conn = connection()
+    lifecycle('http', conn, 1, ['s3:ObjectLifecycle:Expiration:*'], create_thread, rules_creator,
+              ['ObjectLifecycle:Expiration:AbortMultipartUpload'], True)
 
-    # cleanup
-    topic_conf.del_config()
-    s3_notification_conf.del_config(notification=notification_name)
-    # delete the bucket
-    conn.delete_bucket(bucket_name)
-    http_server.close()
 
 def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_location=None, verify_ssl='true'):
     """ test object creation s3 notifications in using put/copy/post on master"""
@@ -2219,35 +2114,49 @@ def test_http_post_object_upload():
     conn1.delete_bucket(Bucket=bucket_name)
 
 
-@attr('mpu_test')
-def test_ps_s3_multipart_on_master_http():
-    """ test http multipart object upload on master"""
-    conn = connection()
-    zonegroup = 'default'
-
-    # create random port for the http server
-    host = get_ip()
-    port = random.randint(10000, 20000)
-    # start an http server in a separate thread
-    http_server = HTTPServerWithEvents((host, port))
+def multipart_endpoint_agnostic(endpoint_type, conn):
+    hostname = get_ip()
+    zonegroup = get_config_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
     bucket = conn.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
 
+    host = get_ip()
+    task = None
+    if endpoint_type == 'http':
+        # create random port for the http server
+        port = random.randint(10000, 20000)
+        # start an http server in a separate thread
+        receiver = HTTPServerWithEvents((hostname, port))
+        endpoint_address = 'http://'+host+':'+str(port)
+        endpoint_args = 'push-endpoint='+endpoint_address
+    elif endpoint_type == 'amqp':
+        # start amqp receiver
+        exchange = 'ex1'
+        task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+        task.start()
+        endpoint_address = 'amqp://' + host
+        endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange='+exchange+'&amqp-ack-level=broker'
+    elif endpoint_type == 'kafka':
+        # start amqp receiver
+        task, receiver = create_kafka_receiver_thread(topic_name)
+        task.start()
+        endpoint_address = 'kafka://' + host
+        endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
+    else:
+        return SkipTest('Unknown endpoint type: ' + endpoint_type)
+
     # create s3 topic
-    endpoint_address = 'http://'+host+':'+str(port)
-    endpoint_args = 'push-endpoint='+endpoint_address
-    opaque_data = 'http://1.2.3.4:8888'
-    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data)
+    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
-    topic_conf_list = [{'Id': notification_name,
-                        'TopicArn': topic_arn,
+    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
                         'Events': []
                         }]
+
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
@@ -2256,7 +2165,7 @@ def test_ps_s3_multipart_on_master_http():
     client_threads = []
     content = str(os.urandom(20*1024*1024))
     key = bucket.new_key('obj')
-    thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+    thr = threading.Thread(target=set_contents_from_string, args=(key, content,))
     thr.start()
     client_threads.append(thr)
     [thr.join() for thr in client_threads]
@@ -2266,133 +2175,75 @@ def test_ps_s3_multipart_on_master_http():
 
     # check http receiver
     keys = list(bucket.list())
-    print('total number of objects: ' + str(len(keys)))
-    events = http_server.get_and_reset_events()
-    for event in events:
-        assert_equal(event['Records'][0]['opaqueData'], opaque_data)
-        assert_true(event['Records'][0]['s3']['object']['eTag'] != '')
+    receiver.verify_s3_events(keys, exact_match=True, deletions=False)
 
     # cleanup
+    s3_notification_conf.del_config()
+    topic_conf.del_config()
+    # delete objects
     for key in keys:
         key.delete()
-    [thr.join() for thr in client_threads]
-    topic_conf.del_config()
-    s3_notification_conf.del_config(notification=notification_name)
     # delete the bucket
     conn.delete_bucket(bucket_name)
-    http_server.close()
-
+    receiver.close(task)
 
-@attr('amqp_test')
-def test_ps_s3_multipart_on_master():
-    """ test multipart object upload on master"""
 
-    hostname = get_ip()
+@attr('http_test')
+def test_multipart_http():
+    """ test http multipart object upload """
     conn = connection()
-    zonegroup = get_config_zonegroup()
+    multipart_endpoint_agnostic('http', conn)
 
-    # create bucket
-    bucket_name = gen_bucket_name()
-    bucket = conn.create_bucket(bucket_name)
-    topic_name = bucket_name + TOPIC_SUFFIX
-
-    # start amqp receivers
-    exchange = 'ex1'
-    task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name+'_1')
-    task1.start()
-    task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name+'_2')
-    task2.start()
-    task3, receiver3 = create_amqp_receiver_thread(exchange, topic_name+'_3')
-    task3.start()
-
-    # create s3 topics
-    endpoint_address = 'amqp://' + hostname
-    endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker'
-    topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
-    topic_arn1 = topic_conf1.set_config()
-    topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args)
-    topic_arn2 = topic_conf2.set_config()
-    topic_conf3 = PSTopicS3(conn, topic_name+'_3', zonegroup, endpoint_args=endpoint_args)
-    topic_arn3 = topic_conf3.set_config()
-
-    # create s3 notifications
-    notification_name = bucket_name + NOTIFICATION_SUFFIX
-    topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
-                        'Events': ['s3:ObjectCreated:*']
-                       },
-                       {'Id': notification_name+'_2', 'TopicArn': topic_arn2,
-                        'Events': ['s3:ObjectCreated:Post']
-                       },
-                       {'Id': notification_name+'_3', 'TopicArn': topic_arn3,
-                        'Events': ['s3:ObjectCreated:CompleteMultipartUpload']
-                       }]
-    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
-    response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
 
-    # create objects in the bucket using multi-part upload
-    fp = tempfile.NamedTemporaryFile(mode='w+b')
-    object_size = 1024
-    content = bytearray(os.urandom(object_size))
-    fp.write(content)
-    fp.flush()
-    fp.seek(0)
-    uploader = bucket.initiate_multipart_upload('multipart')
-    uploader.upload_part_from_file(fp, 1)
-    uploader.complete_upload()
-    fp.close()
-
-    print('wait for 5sec for the messages...')
-    time.sleep(5)
-
-    # check amqp receiver
-    events = receiver1.get_and_reset_events()
-    assert_equal(len(events), 1)
-
-    events = receiver2.get_and_reset_events()
-    assert_equal(len(events), 0)
-
-    events = receiver3.get_and_reset_events()
-    assert_equal(len(events), 1)
-    assert_equal(events[0]['Records'][0]['eventName'], 'ObjectCreated:CompleteMultipartUpload')
-    assert_equal(events[0]['Records'][0]['s3']['configurationId'], notification_name+'_3')
-    assert_equal(events[0]['Records'][0]['s3']['object']['size'], object_size)
-    assert events[0]['Records'][0]['eventTime'] != '0.000000', 'invalid eventTime'
+@attr('kafka_test')
+def test_multipart_kafka():
+    """ test kafka multipart object upload """
+    conn = connection()
+    multipart_endpoint_agnostic('kafka', conn)
 
-    # cleanup
-    stop_amqp_receiver(receiver1, task1)
-    stop_amqp_receiver(receiver2, task2)
-    stop_amqp_receiver(receiver3, task3)
-    s3_notification_conf.del_config()
-    topic_conf1.del_config()
-    topic_conf2.del_config()
-    topic_conf3.del_config()
-    for key in bucket.list():
-        key.delete()
-    # delete the bucket
-    conn.delete_bucket(bucket_name)
 
 @attr('amqp_test')
-def test_ps_s3_metadata_filter_on_master():
-    """ test s3 notification of metadata on master """
-
-    hostname = get_ip()
+def test_multipart_ampq():
+    """ test ampq multipart object upload """
     conn = connection()
-    zonegroup = get_config_zonegroup()
+    multipart_endpoint_agnostic('ampq', conn)
+
 
+def metadata_filter(endpoint_type, conn):
     # create bucket
     bucket_name = gen_bucket_name()
     bucket = conn.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX 
 
-    # start amqp receivers
-    exchange = 'ex1'
-    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
-    task.start()
+    # start endpoint receiver
+    host = get_ip()
+    task = None
+    port = None
+    if endpoint_type == 'http':
+        # create random port for the http server
+        port = random.randint(10000, 20000)
+        # start an http server in a separate thread
+        receiver = HTTPServerWithEvents((host, port))
+        endpoint_address = 'http://'+host+':'+str(port)
+        endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
+    elif endpoint_type == 'amqp':
+        # start amqp receiver
+        exchange = 'ex1'
+        task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+        task.start()
+        endpoint_address = 'amqp://' + host
+        endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable&persistent=true'
+    elif endpoint_type == 'kafka':
+        # start kafka receiver
+        task, receiver = create_kafka_receiver_thread(topic_name)
+        task.start()
+        endpoint_address = 'kafka://' + host
+        endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'
+    else:
+        return SkipTest('Unknown endpoint type: ' + endpoint_type)
 
     # create s3 topic
-    endpoint_address = 'amqp://' + hostname
-    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable'
+    zonegroup = get_config_zonegroup()
     topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
     # create s3 notification
@@ -2449,8 +2300,8 @@ def test_ps_s3_metadata_filter_on_master():
     fp.close()
     expected_keys.append(key_name)
 
-    print('wait for 5sec for the messages...')
-    time.sleep(5)
+    print('wait for the messages...')
+    wait_for_queue_to_drain(topic_name, http_port=port)
     # check amqp receiver
     events = receiver.get_and_reset_events()
     assert_equal(len(events), len(expected_keys))
@@ -2460,22 +2311,43 @@ def test_ps_s3_metadata_filter_on_master():
     # delete objects
     for key in bucket.list():
         key.delete()
-    print('wait for 5sec for the messages...')
-    time.sleep(5)
-    # check amqp receiver
+    print('wait for the messages...')
+    wait_for_queue_to_drain(topic_name, http_port=port)
+    # check endpoint receiver
     events = receiver.get_and_reset_events()
     assert_equal(len(events), len(expected_keys))
     for event in events:
         assert(event['Records'][0]['s3']['object']['key'] in expected_keys)
 
     # cleanup
-    stop_amqp_receiver(receiver, task)
+    receiver.close(task)
     s3_notification_conf.del_config()
     topic_conf.del_config()
     # delete the bucket
     conn.delete_bucket(bucket_name)
 
 
+@attr('kafka_test')
+def test_metadata_filter_kafka():
+    """ test notification of filtering metadata, kafka endpoint """
+    conn = connection()
+    metadata_filter('kafka', conn)
+
+
+@attr('http_test')
+def test_metadata_filter_http():
+    """ test notification of filtering metadata, http endpoint """
+    conn = connection()
+    metadata_filter('http', conn)
+
+
+@attr('amqp_test')
+def test_metadata_filter_ampq():
+    """ test notification of filtering metadata, ampq endpoint """
+    conn = connection()
+    metadata_filter('amqp', conn)
+
+
 @attr('amqp_test')
 def test_ps_s3_metadata_on_master():
     """ test s3 notification of metadata on master """
@@ -3725,9 +3597,8 @@ def persistent_notification(endpoint_type, conn, account=None):
     bucket = conn.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
 
-    receiver = {}
-    task = None
     host = get_ip()
+    task = None
     if endpoint_type == 'http':
         # create random port for the http server
         host = get_ip_http()