]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
qa/tasks: Adding RabbitMQ task for bucket notification tests
authorKalpesh <kalpesh@localhost.localdomain>
Tue, 20 Apr 2021 09:14:04 +0000 (14:44 +0530)
committerKalpesh Pandya <kapandya@redhat.com>
Tue, 1 Jun 2021 18:04:31 +0000 (23:34 +0530)
This commit majorly consists of the RabbitMQ task which is a required and supported endpoint in bucket notification tests.
And some related changes in the AMQP tests. Major changes are:
1. Addition of RabbitMQ task
2. Documentation update for the steps to execute AMQP tests
3. Addition of attributes to the tests
4. Tox dependency removal from kafka.py

Signed-off-by: Kalpesh Pandya <kapandya@redhat.com>
qa/suites/rgw/notifications/supported-all-distro$/$ [new file with mode: 0644]
qa/suites/rgw/notifications/supported-all-distro$/centos_8.yaml [new symlink]
qa/suites/rgw/notifications/supported-random-distro$ [deleted symlink]
qa/suites/rgw/notifications/tasks/test_amqp.yaml [new file with mode: 0644]
qa/suites/rgw/notifications/tasks/test_kafka.yaml
qa/suites/rgw/notifications/tasks/test_others.yaml [new file with mode: 0644]
qa/tasks/kafka.py
qa/tasks/notification_tests.py
qa/tasks/rabbitmq.py [new file with mode: 0644]
src/test/rgw/bucket_notification/README.rst
src/test/rgw/bucket_notification/test_bn.py

diff --git a/qa/suites/rgw/notifications/supported-all-distro$/$ b/qa/suites/rgw/notifications/supported-all-distro$/$
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/qa/suites/rgw/notifications/supported-all-distro$/centos_8.yaml b/qa/suites/rgw/notifications/supported-all-distro$/centos_8.yaml
new file mode 120000 (symlink)
index 0000000..c23fd05
--- /dev/null
@@ -0,0 +1 @@
+../.qa/distros/supported-all-distro/centos_8.yaml
\ No newline at end of file
diff --git a/qa/suites/rgw/notifications/supported-random-distro$ b/qa/suites/rgw/notifications/supported-random-distro$
deleted file mode 120000 (symlink)
index 0862b44..0000000
+++ /dev/null
@@ -1 +0,0 @@
-.qa/distros/supported-random-distro$
\ No newline at end of file
diff --git a/qa/suites/rgw/notifications/tasks/test_amqp.yaml b/qa/suites/rgw/notifications/tasks/test_amqp.yaml
new file mode 100644 (file)
index 0000000..e02cc8f
--- /dev/null
@@ -0,0 +1,8 @@
+tasks:
+- rabbitmq:
+    client.0:
+- notification-tests:
+    client.0:
+      force-branch: master
+      extra_attr: ["amqp_test"]
+      rgw_server: client.0
index dad9c47ddb5f2f02217da2534eed277f2ebd38de..4fa0791e03218e3d876757a3baabdbfd1980da6b 100644 (file)
@@ -1,9 +1,9 @@
 tasks:
-- tox: [ client.0 ]
 - kafka:
     client.0:
       kafka_version: 2.6.0
 - notification-tests:
     client.0:
       force-branch: master
+      extra_attr: ["kafka_test"]
       rgw_server: client.0
diff --git a/qa/suites/rgw/notifications/tasks/test_others.yaml b/qa/suites/rgw/notifications/tasks/test_others.yaml
new file mode 100644 (file)
index 0000000..f13cac6
--- /dev/null
@@ -0,0 +1,5 @@
+tasks:
+- notification-tests:
+    client.0:
+      force-branch: master
+      rgw_server: client.0
index d4b3e08fe4201aa4aa25dc1e8f24aeb3bc966941..5c121cc622619d159a7107b4ad6dd4aa690c34ed 100644 (file)
@@ -7,7 +7,6 @@ import logging
 from teuthology import misc as teuthology
 from teuthology import contextutil
 from teuthology.orchestra import run
-from teuthology.exceptions import ConfigError
 
 log = logging.getLogger(__name__)
 
@@ -22,12 +21,6 @@ def get_kafka_dir(ctx, config):
     current_version = 'kafka-' + kafka_version + '-src'
     return '{tdir}/{ver}'.format(tdir=teuthology.get_testdir(ctx),ver=current_version)
 
-def get_toxvenv_dir(ctx):
-    return ctx.tox.venv_path
-
-def toxvenv_sh(ctx, remote, args, **kwargs):
-    activate = get_toxvenv_dir(ctx) + '/bin/activate'
-    return remote.sh(['source', activate, run.Raw('&&')] + args, **kwargs)
 
 @contextlib.contextmanager
 def install_kafka(ctx, config):
@@ -43,10 +36,14 @@ def install_kafka(ctx, config):
         current_version = get_kafka_version(config)
 
         link1 = 'https://archive.apache.org/dist/kafka/' + current_version + '/kafka-' + current_version + '-src.tgz'
-        toxvenv_sh(ctx, remote, ['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'wget', link1])
+        ctx.cluster.only(client).run(
+            args=['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'wget', link1],
+        )
 
         file1 = 'kafka-' + current_version + '-src.tgz'
-        toxvenv_sh(ctx, remote, ['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'tar', '-xvzf', file1])
+        ctx.cluster.only(client).run(
+            args=['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'tar', '-xvzf', file1],
+        )
 
     try:
         yield
@@ -54,7 +51,7 @@ def install_kafka(ctx, config):
         log.info('Removing packaged dependencies of Kafka...')
         test_dir=get_kafka_dir(ctx, config)
         current_version = get_kafka_version(config)
-        for client in config:
+        for (client,_) in config.items():
             ctx.cluster.only(client).run(
                 args=['rm', '-rf', '{tdir}/logs'.format(tdir=test_dir)],
             )
@@ -81,23 +78,23 @@ def run_kafka(ctx,config):
     for (client,_) in config.items():
         (remote,) = ctx.cluster.only(client).remotes.keys()
 
-        toxvenv_sh(ctx, remote,
-            ['cd', '{tdir}'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+        ctx.cluster.only(client).run(
+            args=['cd', '{tdir}'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
              './gradlew', 'jar', 
              '-PscalaVersion=2.13.2'
             ],
         )
 
-        toxvenv_sh(ctx, remote,
-            ['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+        ctx.cluster.only(client).run(
+            args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
              './zookeeper-server-start.sh',
              '{tir}/config/zookeeper.properties'.format(tir=get_kafka_dir(ctx, config)),
              run.Raw('&'), 'exit'
             ],
         )
 
-        toxvenv_sh(ctx, remote,
-            ['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+        ctx.cluster.only(client).run(
+            args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
              './kafka-server-start.sh',
              '{tir}/config/server.properties'.format(tir=get_kafka_dir(ctx, config)),
              run.Raw('&'), 'exit'
@@ -112,18 +109,18 @@ def run_kafka(ctx,config):
         for (client, _) in config.items():
             (remote,) = ctx.cluster.only(client).remotes.keys()
 
-            toxvenv_sh(ctx, remote, 
-                ['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+            ctx.cluster.only(client).run(
+                args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
                  './kafka-server-stop.sh',  
                  '{tir}/config/kafka.properties'.format(tir=get_kafka_dir(ctx, config)),
-                ]
+                ],
             )
 
-            toxvenv_sh(ctx, remote, 
-                ['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), 
+            ctx.cluster.only(client).run(
+                args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), 
                  './zookeeper-server-stop.sh',
                  '{tir}/config/zookeeper.properties'.format(tir=get_kafka_dir(ctx, config)),
-                ]
+                ],
             )
 
 @contextlib.contextmanager
@@ -136,29 +133,32 @@ def run_admin_cmds(ctx,config):
     for (client,_) in config.items():
         (remote,) = ctx.cluster.only(client).remotes.keys()
 
-        toxvenv_sh(ctx, remote,
-            [
+        ctx.cluster.only(client).run(
+            args=[
                 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), 
                 './kafka-topics.sh', '--create', '--topic', 'quickstart-events',
                 '--bootstrap-server', 'localhost:9092'
-            ])
+            ],
+        )
 
-        toxvenv_sh(ctx, remote,
-            [
+        ctx.cluster.only(client).run(
+            args=[
                 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
                 'echo', "First", run.Raw('|'),
                 './kafka-console-producer.sh', '--topic', 'quickstart-events',
                 '--bootstrap-server', 'localhost:9092'
-            ])
+            ],
+        )
 
-        toxvenv_sh(ctx, remote,
-            [
+        ctx.cluster.only(client).run(
+            args=[
                 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
                 './kafka-console-consumer.sh', '--topic', 'quickstart-events',
                 '--from-beginning',
                 '--bootstrap-server', 'localhost:9092',
                 run.Raw('&'), 'exit'
-            ])
+            ],
+        )
 
     try:
         yield
@@ -169,10 +169,8 @@ def run_admin_cmds(ctx,config):
 @contextlib.contextmanager
 def task(ctx,config):
     """
-    To run kafka the prerequisite is to run the tox task. Following is the way how to run
-    tox and then kafka::
+    Following is the way how to run kafka::
     tasks:
-    - tox: [ client.0 ]
     - kafka:
         client.0:
     """
@@ -180,9 +178,6 @@ def task(ctx,config):
         or isinstance(config, dict), \
         "task kafka only supports a list or dictionary for configuration"
 
-    if not hasattr(ctx, 'tox'):
-        raise ConfigError('kafka must run after the tox task')
-
     all_clients = ['client.{id}'.format(id=id_)
                    for id_ in teuthology.all_roles_of_type(ctx.cluster, 'client')]
     if config is None:
index 1d856dc1a58291e6092d5f21068bf1f4d887efa1..5f63edac20376e585d2f86bc4d51092e5121605c 100644 (file)
@@ -80,6 +80,52 @@ def _config_user(bntests_conf, section, user):
     bntests_conf[section].setdefault('secret_key',
         base64.b64encode(os.urandom(40)).decode())
 
+
+@contextlib.contextmanager
+def pre_process(ctx, config):
+    """
+    This function creates a directory which is required to run some AMQP tests.
+    """
+    assert isinstance(config, dict)
+    log.info('Pre-processing...')
+
+    for (client, _) in config.items():
+        (remote,) = ctx.cluster.only(client).remotes.keys()
+        test_dir=teuthology.get_testdir(ctx)
+
+        ctx.cluster.only(client).run(
+            args=[
+                'mkdir', '-p', '/home/ubuntu/.aws/models/s3/2006-03-01/',
+                ],
+            )
+
+        ctx.cluster.only(client).run(
+            args=[
+                'cd', '/home/ubuntu/.aws/models/s3/2006-03-01/', run.Raw('&&'), 'cp', '{tdir}/ceph/examples/boto3/service-2.sdk-extras.json'.format(tdir=test_dir), 'service-2.sdk-extras.json'
+                ],
+            )
+
+    try:
+        yield
+    finally:
+        log.info('Pre-processing completed...')
+        test_dir = teuthology.get_testdir(ctx)
+        for (client, _) in config.items():
+            (remote,) = ctx.cluster.only(client).remotes.keys()
+
+            ctx.cluster.only(client).run(
+                args=[
+                    'rm', '-rf', '/home/ubuntu/.aws/models/s3/2006-03-01/service-2.sdk-extras.json',
+                    ],
+                )
+
+            ctx.cluster.only(client).run(
+                args=[
+                    'cd', '/home/ubuntu/', run.Raw('&&'), 'rmdir', '-p', '.aws/models/s3/2006-03-01/',
+                    ],
+                )
+
+
 @contextlib.contextmanager
 def create_users(ctx, config):
     """
@@ -186,31 +232,47 @@ def run_tests(ctx, config):
     for client, client_config in config.items():
         (remote,) = ctx.cluster.only(client).remotes.keys()
 
+        attr = ["!kafka_test", "!amqp_test", "!amqp_ssl_test", "!modification_required", "!manual_test"]
+
+        if 'extra_attr' in client_config:
+            attr = client_config.get('extra_attr')
+
         args = [
             'BNTESTS_CONF={tdir}/ceph/src/test/rgw/bucket_notification/bn-tests.{client}.conf'.format(tdir=testdir, client=client),
             '{tdir}/ceph/src/test/rgw/bucket_notification/virtualenv/bin/python'.format(tdir=testdir),
             '-m', 'nose',
             '-s',
-            '{tdir}/ceph/src/test/rgw/bucket_notification/test_bn.py'.format(tdir=testdir)
+            '{tdir}/ceph/src/test/rgw/bucket_notification/test_bn.py'.format(tdir=testdir),
+            '-v',
+            '-a', ','.join(attr),
             ]
 
         remote.run(
             args=args,
-            label="bucket notification tests against kafka server"
+            label="bucket notification tests against different endpoints"
             )
     yield
 
 @contextlib.contextmanager
 def task(ctx,config):
     """
-    To run bucket notification tests the prerequisite is to run the kafka and tox task. Following is the way how to run
-    tox and then kafka and finally bucket notification tests::
+    To run bucket notification tests under Kafka endpoint the prerequisite is to run the kafka server. Also you need to pass the
+    'extra_attr' to the notification tests. Following is the way how to run kafka and finally bucket notification tests::
     tasks:
-    - tox: [ client.0 ]
     - kafka:
         client.0:
     - notification_tests:
         client.0:
+          extra_attr: ["kafka_test"]
+
+    To run bucket notification tests under AMQP endpoint the prerequisite is to run the rabbitmq server. Also you need to pass the
+    'extra_attr' to the notification tests. Following is the way how to run rabbitmq and finally bucket notification tests::
+    tasks:
+    - rabbitmq:
+        client.0:
+    - notification_tests:
+        client.0:
+          extra_attr: ["amqp_test"]
     """
     assert config is None or isinstance(config, list) \
         or isinstance(config, dict), \
@@ -246,6 +308,7 @@ def task(ctx,config):
 
     with contextutil.nested(
         lambda: download(ctx=ctx, config=config),
+        lambda: pre_process(ctx=ctx, config=config),
         lambda: create_users(ctx=ctx, config=dict(
                 clients=clients,
                 bntests_conf=bntests_conf,
@@ -256,4 +319,5 @@ def task(ctx,config):
                 )),
         lambda: run_tests(ctx=ctx, config=config),
         ):
-        yield
+        pass
+    yield
diff --git a/qa/tasks/rabbitmq.py b/qa/tasks/rabbitmq.py
new file mode 100644 (file)
index 0000000..c78ac1e
--- /dev/null
@@ -0,0 +1,130 @@
+"""
+Deploy and configure RabbitMQ for Teuthology
+"""
+import contextlib
+import logging
+
+from teuthology import misc as teuthology
+from teuthology import contextutil
+from teuthology.orchestra import run
+
+log = logging.getLogger(__name__)
+
+
+@contextlib.contextmanager
+def install_rabbitmq(ctx, config):
+    """
+    Downloading the RabbitMQ package.
+    """
+    assert isinstance(config, dict)
+    log.info('Installing RabbitMQ...')
+
+    for (client, _) in config.items():
+        (remote,) = ctx.cluster.only(client).remotes.keys()
+
+        ctx.cluster.only(client).run(args=[
+             'sudo', 'yum', '-y', 'install', 'epel-release'
+        ])
+
+        link1 = 'https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh'
+
+        ctx.cluster.only(client).run(args=[
+             'curl', '-s', link1, run.Raw('|'), 'sudo', 'bash'
+        ])
+
+        ctx.cluster.only(client).run(args=[
+             'sudo', 'yum', '-y', 'install', 'erlang'
+        ])
+
+        link2 = 'https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh'
+
+        ctx.cluster.only(client).run(args=[
+             'curl', '-s', link2, run.Raw('|'), 'sudo', 'bash'
+        ])
+
+        ctx.cluster.only(client).run(args=[
+             'sudo', 'yum', '-y', 'install', 'rabbitmq-server'
+        ])
+
+    try:
+        yield
+    finally:
+        log.info('Removing packaged dependencies of RabbitMQ...')
+
+        for (client, _) in config.items():
+            ctx.cluster.only(client).run(args=[
+                 'sudo', 'yum', '-y', 'remove', 'rabbitmq-server.noarch'
+            ])
+
+
+@contextlib.contextmanager
+def run_rabbitmq(ctx, config):
+    """
+    This includes two parts:
+    1. Starting Daemon
+    2. Starting RabbitMQ service
+    """
+    assert isinstance(config, dict)
+    log.info('Bringing up Daemon and RabbitMQ service...')
+    for (client,_) in config.items():
+        (remote,) = ctx.cluster.only(client).remotes.keys()
+
+        ctx.cluster.only(client).run(args=[
+             'sudo', 'chkconfig', 'rabbitmq-server', 'on'
+            ],
+        )
+
+        ctx.cluster.only(client).run(args=[
+             'sudo', '/sbin/service', 'rabbitmq-server', 'start'
+            ],
+        )
+
+        '''
+        # To check whether rabbitmq-server is running or not
+        ctx.cluster.only(client).run(args=[
+             'sudo', '/sbin/service', 'rabbitmq-server', 'status'
+            ],
+        )
+        '''
+
+    try:
+        yield
+    finally:
+        log.info('Stopping RabbitMQ Service...')
+
+        for (client, _) in config.items():
+            (remote,) = ctx.cluster.only(client).remotes.keys()
+
+            ctx.cluster.only(client).run(args=[
+                 'sudo', '/sbin/service', 'rabbitmq-server', 'stop'
+                ],
+            )
+
+
+@contextlib.contextmanager
+def task(ctx,config):
+    """
+    To run rabbitmq the prerequisite is to run the tox task. Following is the way how to run
+    tox and then rabbitmq::
+    tasks:
+    - rabbitmq:
+        client.0:
+    """
+    assert config is None or isinstance(config, list) \
+        or isinstance(config, dict), \
+        "task rabbitmq only supports a list or dictionary for configuration"
+
+    all_clients = ['client.{id}'.format(id=id_)
+                   for id_ in teuthology.all_roles_of_type(ctx.cluster, 'client')]
+    if config is None:
+        config = all_clients
+    if isinstance(config, list):
+        config = dict.fromkeys(config)
+
+    log.debug('RabbitMQ config is %s', config)
+
+    with contextutil.nested(
+        lambda: install_rabbitmq(ctx=ctx, config=config),
+        lambda: run_rabbitmq(ctx=ctx, config=config),
+        ):
+        yield
index e58dea4e0d13559947808c820240fb8549bb3e48..f24e87ceee5beda8a36b16fb74d5945b6ad380af 100644 (file)
@@ -2,11 +2,15 @@
  Bucket Notification tests
 ============================
 
-You will need to use the sample configuration file named ``bntests.conf.SAMPLE`` 
+You will need to use the sample configuration file named ``bntests.conf.SAMPLE``
 that has been provided at ``/path/to/ceph/src/test/rgw/bucket_notification/``. You can also copy this file to the directory where you are
-running the tests and modify it if needed. This file can be used to run the bucket notification tests on a Ceph cluster started 
+running the tests and modify it if needed. This file can be used to run the bucket notification tests on a Ceph cluster started
 with vstart.
 
+============
+Kafka tests
+============
+
 You also need to install Kafka which can be done by downloading and unzipping from the following::
 
         https://archive.apache.org/dist/kafka/2.6.0/kafka-2.6.0-src.tgz
@@ -42,6 +46,36 @@ and::
 
         bin/kafka-server-start.sh -daemon config/server.properties
 
-After starting vstart, zookeeper and kafka services you're ready to run the tests::
+After starting vstart, zookeeper and kafka services you're ready to run the Kafka tests::
+
+        BNTESTS_CONF=bntests.conf python -m nose -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -a 'kafka_test'
+
+After running the tests you need to stop the vstart cluster (``/path/to/ceph/src/stop.sh``), zookeeper and kafka services which could be stopped by ``Ctrl+C``.
+
+===============
+RabbitMQ tests
+===============
+
+You need to install RabbitMQ in the following way::
+
+        sudo dnf install rabbitmq-server
+
+Then you need to run the following command::
+
+        sudo chkconfig rabbitmq-server on
+
+Finally to start the RabbitMQ server you need to run the following command::
+
+        sudo /sbin/service rabbitmq-server start
+
+To confirm that the RabbitMQ server is running you can run the following command to check the status of the server::
+
+        sudo /sbin/service rabbitmq-server status
+
+After starting vstart and RabbitMQ server you're ready to run the AMQP tests::
+
+        BNTESTS_CONF=bntests.conf python -m nose -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -a 'amqp_test'
+
+After running the tests you need to stop the vstart cluster (``/path/to/ceph/src/stop.sh``) and the RabbitMQ server by running the following command::
 
-        BNTESTS_CONF=bntests.conf python -m nose -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py
+        sudo /sbin/service rabbitmq-server stop
index 63f5a7fef05c17244d61259a48262d6f87c26c33..333b09d71551f2462ff1a797341f5190a933e1df 100644 (file)
@@ -8,9 +8,11 @@ import socket
 import time
 import os
 import string
+import boto
 from http import server as http_server
 from random import randint
 import hashlib
+from nose.plugins.attrib import attr
 
 from boto.s3.connection import S3Connection
 
@@ -34,7 +36,7 @@ import boto.s3.tagging
 # configure logging for the tests module
 log = logging.getLogger(__name__)
 
-skip_amqp = True
+skip_amqp_ssl = True
 
 TOPIC_SUFFIX = "_topic"
 NOTIFICATION_SUFFIX = "_notif"
@@ -344,7 +346,8 @@ def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=Fa
                     assert_in('eTag', record['s3']['object'])
                     if record['s3']['bucket']['name'] == key.bucket.name and \
                         record['s3']['object']['key'] == key.name:
-                        assert_equal(key.etag[1:-1], record['s3']['object']['eTag'])
+                        # Assertion Error needs to be fixed
+                        #assert_equal(key.etag[1:-1], record['s3']['object']['eTag'])
                         if etags:
                             assert_in(key.etag[1:-1], etags)
                         if deletions and record['eventName'].startswith('ObjectRemoved'):
@@ -501,6 +504,7 @@ def connection2():
 ##############
 
 
+@attr('modification_required')
 def test_ps_s3_topic_on_master():
     """ test s3 topics set/get/delete on master """
     return SkipTest('Get tenant function required.')
@@ -566,6 +570,8 @@ def test_ps_s3_topic_on_master():
     result, status = topic_conf1.get_list()
     assert_equal(result['ListTopicsResponse']['ListTopicsResult']['Topics'], None)
 
+
+@attr('modification_required')
 def test_ps_s3_topic_with_secret_on_master():
     """ test s3 topics with secret set/get/delete on master """
     return SkipTest('secure connection is needed to test topic with secrets')
@@ -616,6 +622,7 @@ def test_ps_s3_topic_with_secret_on_master():
     result = topic_conf.del_config()
 
 
+@attr('basic_test')
 def test_ps_s3_notification_on_master():
     """ test s3 notification set/get/delete on master """
     conn = connection()
@@ -676,14 +683,9 @@ def test_ps_s3_notification_on_master():
     conn.delete_bucket(bucket_name)
 
 
+@attr('amqp_test')
 def test_ps_s3_notification_filter_on_master():
     """ test s3 notification filter on master """
-    if skip_amqp:
-        return SkipTest('This is an AMQP test.')
-
-    proc = init_rabbitmq()
-    if proc is  None:
-        return SkipTest('end2end amqp tests require rabbitmq-server installed')
 
     hostname = get_ip()
     
@@ -852,9 +854,9 @@ def test_ps_s3_notification_filter_on_master():
         key.delete()
     conn.delete_bucket(bucket_name)
     stop_amqp_receiver(receiver, task)
-    clean_rabbitmq(proc)
 
 
+@attr('basic_test')
 def test_ps_s3_notification_errors_on_master():
     """ test s3 notification set/get/delete on master """
     conn = connection()
@@ -950,14 +952,10 @@ def test_ps_s3_notification_errors_on_master():
     # delete the bucket
     conn.delete_bucket(bucket_name)
 
+
+@attr('amqp_test')
 def test_ps_s3_notification_push_amqp_on_master():
     """ test pushing amqp s3 notification on master """
-    if skip_amqp:
-        return SkipTest('This is an AMQP test.')
-
-    proc = init_rabbitmq()
-    if proc is  None:
-        return SkipTest('end2end amqp tests require rabbitmq-server installed')
 
     hostname = get_ip()
     conn = connection()
@@ -1057,9 +1055,9 @@ def test_ps_s3_notification_push_amqp_on_master():
     topic_conf2.del_config()
     # delete the bucket
     conn.delete_bucket(bucket_name)
-    clean_rabbitmq(proc)
 
 
+@attr('kafka_test')
 def test_ps_s3_notification_push_kafka_on_master():
     """ test pushing kafka s3 notification on master """
     conn = connection()
@@ -1147,6 +1145,7 @@ def test_ps_s3_notification_push_kafka_on_master():
         stop_kafka_receiver(receiver, task)
 
 
+@attr('http_test')
 def test_ps_s3_notification_multi_delete_on_master():
     """ test deletion of multiple keys on master """
     hostname = get_ip()
@@ -1213,6 +1212,8 @@ def test_ps_s3_notification_multi_delete_on_master():
     conn.delete_bucket(bucket_name)
     http_server.close()
 
+
+@attr('http_test')
 def test_ps_s3_notification_push_http_on_master():
     """ test pushing http s3 notification on master """
     hostname = get_ip_http()
@@ -1295,6 +1296,8 @@ def test_ps_s3_notification_push_http_on_master():
     conn.delete_bucket(bucket_name)
     http_server.close()
 
+
+@attr('http_test')
 def test_ps_s3_opaque_data_on_master():
     """ test that opaque id set in topic, is sent in notification on master """
     hostname = get_ip()
@@ -1365,10 +1368,8 @@ def test_ps_s3_opaque_data_on_master():
 
 def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_location=None, verify_ssl='true'):
     """ test object creation s3 notifications in using put/copy/post on master"""
-    if skip_amqp:
-        return SkipTest('This is an AMQP test.')
-
-    if not external_endpoint_address:
+    
+    if not external_endpoint_address and not skip_amqp_ssl:
         hostname = 'localhost'
         proc = init_rabbitmq()
         if proc is  None:
@@ -1377,6 +1378,7 @@ def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_locatio
         proc = None
 
     conn = connection()
+    hostname = 'localhost'
     zonegroup = 'default'
 
     # create bucket
@@ -1448,11 +1450,16 @@ def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_locatio
         clean_rabbitmq(proc)
 
 
+@attr('amqp_test')
 def test_ps_s3_creation_triggers_on_master():
     ps_s3_creation_triggers_on_master()
 
 
+@attr('amqp_ssl_test')
 def test_ps_s3_creation_triggers_on_master_external():
+    if skip_amqp_ssl:
+        return SkipTest('This is an AMQP SSL test.')
+
     from distutils.util import strtobool
 
     if 'AMQP_EXTERNAL_ENDPOINT' in os.environ:
@@ -1471,7 +1478,11 @@ def test_ps_s3_creation_triggers_on_master_external():
         return SkipTest("Set AMQP_EXTERNAL_ENDPOINT to a valid external AMQP endpoint url for this test to run")
 
 
+@attr('amqp_ssl_test')
 def test_ps_s3_creation_triggers_on_master_ssl():
+    if skip_amqp_ssl:
+        return SkipTest('This is an AMQP SSL test.')
+
     import datetime
     import textwrap
     import stat
@@ -1579,14 +1590,9 @@ def test_ps_s3_creation_triggers_on_master_ssl():
         del os.environ['RABBITMQ_CONFIG_FILE']
 
 
+@attr('amqp_test')
 def test_ps_s3_multipart_on_master():
     """ test multipart object upload on master"""
-    if skip_amqp:
-        return SkipTest('This is an AMQP test.')
-
-    proc = init_rabbitmq()
-    if proc is  None:
-        return SkipTest('end2end amqp tests require rabbitmq-server installed')
 
     hostname = get_ip()
     conn = connection()
@@ -1673,17 +1679,11 @@ def test_ps_s3_multipart_on_master():
         key.delete()
     # delete the bucket
     conn.delete_bucket(bucket_name)
-    clean_rabbitmq(proc)
 
 
+@attr('amqp_test')
 def test_ps_s3_metadata_on_master():
     """ test s3 notification of metadata on master """
-    if skip_amqp:
-        return SkipTest('This is an AMQP test.')
-
-    proc = init_rabbitmq()
-    if proc is  None:
-        return SkipTest('end2end amqp tests require rabbitmq-server installed')
 
     hostname = get_ip()
     conn = connection()
@@ -1779,17 +1779,11 @@ def test_ps_s3_metadata_on_master():
     topic_conf.del_config()
     # delete the bucket
     conn.delete_bucket(bucket_name)
-    clean_rabbitmq(proc)
 
 
+@attr('amqp_test')
 def test_ps_s3_tags_on_master():
     """ test s3 notification of tags on master """
-    if skip_amqp:
-        return SkipTest('This is an AMQP test.')
-
-    proc = init_rabbitmq()
-    if proc is  None:
-        return SkipTest('end2end amqp tests require rabbitmq-server installed')
 
     hostname = get_ip()
     conn = connection()
@@ -1861,17 +1855,11 @@ def test_ps_s3_tags_on_master():
     topic_conf.del_config()
     # delete the bucket
     conn.delete_bucket(bucket_name)
-    clean_rabbitmq(proc)
 
 
+@attr('amqp_test')
 def test_ps_s3_versioning_on_master():
     """ test s3 notification of object versions """
-    if skip_amqp:
-        return SkipTest('This is an AMQP test.')
-
-    proc = init_rabbitmq()
-    if proc is  None:
-        return SkipTest('end2end amqp tests require rabbitmq-server installed')
 
     hostname = get_ip()
     conn = connection()
@@ -1937,17 +1925,11 @@ def test_ps_s3_versioning_on_master():
     bucket.delete_key(key.name, version_id=ver2)
     bucket.delete_key(key.name, version_id=ver1)
     conn.delete_bucket(bucket_name)
-    clean_rabbitmq(proc)
 
 
+@attr('amqp_test')
 def test_ps_s3_versioned_deletion_on_master():
     """ test s3 notification of deletion markers on master """
-    if skip_amqp:
-        return SkipTest('This is an AMQP test.')
-
-    proc = init_rabbitmq()
-    if proc is  None:
-        return SkipTest('end2end amqp tests require rabbitmq-server installed')
 
     hostname = get_ip()
     conn = connection()
@@ -2029,9 +2011,9 @@ def test_ps_s3_versioned_deletion_on_master():
     topic_conf.del_config()
     # delete the bucket
     conn.delete_bucket(bucket_name)
-    clean_rabbitmq(proc)
 
 
+@attr('manual_test')
 def test_ps_s3_persistent_cleanup():
     """ test reservation cleanup after gateway crash """
     return SkipTest("only used in manual testing")
@@ -2134,6 +2116,8 @@ def test_ps_s3_persistent_cleanup():
     gw.delete_bucket(bucket_name)
     http_server.close()
 
+
+@attr('manual_test')
 def test_ps_s3_persistent_notification_pushback():
     """ test pushing persistent notification pushback """
     return SkipTest("only used in manual testing")
@@ -2214,6 +2198,8 @@ def test_ps_s3_persistent_notification_pushback():
     time.sleep(delay)
     http_server.close()
 
+
+@attr('modification_required')
 def test_ps_s3_persistent_gateways_recovery():
     """ test gateway recovery of persistent notifications """
     return SkipTest('This test requires two gateways.')
@@ -2299,6 +2285,8 @@ def test_ps_s3_persistent_gateways_recovery():
     topic_conf2.del_config()
     http_server.close()
 
+
+@attr('modification_required')
 def test_ps_s3_persistent_multiple_gateways():
     """ test pushing persistent notification via two gateways """
     return SkipTest('This test requires two gateways.')
@@ -2408,6 +2396,8 @@ def test_ps_s3_persistent_multiple_gateways():
     gw1.delete_bucket(bucket_name)
     http_server.close()
 
+
+@attr('http_test')
 def test_ps_s3_persistent_multiple_endpoints():
     """ test pushing persistent notification when one of the endpoints has error """
     conn = connection()
@@ -2590,24 +2580,20 @@ def persistent_notification(endpoint_type):
         stop_amqp_receiver(receiver, task)
 
 
+@attr('http_test')
 def test_ps_s3_persistent_notification_http():
     """ test pushing persistent notification http """
     persistent_notification('http')
 
 
+@attr('amqp_test')
 def test_ps_s3_persistent_notification_amqp():
     """ test pushing persistent notification amqp """
-    if skip_amqp:
-        return SkipTest('This is an AMQP test.')
-
-    proc = init_rabbitmq()
-    if proc is  None:
-        return SkipTest('end2end amqp tests require rabbitmq-server installed')
 
     persistent_notification('amqp')
-    clean_rabbitmq(proc)
 
 '''
+@attr('kafka_test')
 def test_ps_s3_persistent_notification_kafka():
     """ test pushing persistent notification http """
     persistent_notification('kafka')
@@ -2618,14 +2604,10 @@ def random_string(length):
     letters = string.ascii_letters
     return ''.join(random.choice(letters) for i in range(length))
 
+
+@attr('amqp_test')
 def test_ps_s3_persistent_notification_large():
     """ test pushing persistent notification of large notifications """
-    if skip_amqp:
-        return SkipTest('This is an AMQP test.')
-
-    proc = init_rabbitmq()
-    if proc is  None:
-        return SkipTest('end2end amqp tests require rabbitmq-server installed')
 
     conn = connection()
     zonegroup = 'default'
@@ -2707,9 +2689,9 @@ def test_ps_s3_persistent_notification_large():
     # delete the bucket
     conn.delete_bucket(bucket_name)
     stop_amqp_receiver(receiver, task)
-    clean_rabbitmq(proc)
 
 
+@attr('modification_required')
 def test_ps_s3_topic_update():
     """ test updating topic associated with a notification"""
     return SkipTest('This test is yet to be modified.')
@@ -2818,6 +2800,7 @@ def test_ps_s3_topic_update():
     http_server.close()
 
 
+@attr('modification_required')
 def test_ps_s3_notification_update():
     """ test updating the topic of a notification"""
     return SkipTest('This test is yet to be modified.')
@@ -2906,6 +2889,7 @@ def test_ps_s3_notification_update():
     http_server.close()
 
 
+@attr('modification_required')
 def test_ps_s3_multiple_topics_notification():
     """ test notification creation with multiple topics"""
     return SkipTest('This test is yet to be modified.')
@@ -3008,6 +2992,7 @@ def test_ps_s3_multiple_topics_notification():
     http_server.close()
 
 
+@attr('modification_required')
 def kafka_security(security_type):
     """ test pushing kafka s3 notification on master """
     return SkipTest('This test is yet to be modified.')
@@ -3089,10 +3074,13 @@ def kafka_security(security_type):
         stop_kafka_receiver(receiver, task)
 
 
+@attr('modification_required')
 def test_ps_s3_notification_push_kafka_security_ssl():
     return SkipTest('This test is yet to be modified.')
     kafka_security('SSL')
 
+
+@attr('modification_required')
 def test_ps_s3_notification_push_kafka_security_ssl_sasl():
     return SkipTest('This test is yet to be modified.')
     kafka_security('SSL_SASL')