]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
test/rgw/notifications: migrate from nose to pytest
authorYuval Lifshitz <ylifshit@ibm.com>
Thu, 19 Feb 2026 16:55:24 +0000 (16:55 +0000)
committerYuval Lifshitz <ylifshit@ibm.com>
Mon, 23 Feb 2026 17:48:53 +0000 (17:48 +0000)
Fixes: https://tracker.ceph.com/issues/74573
Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
Co-Authored-By: Claude <noreply@anthropic.com>
qa/tasks/notification_tests.py
src/test/rgw/bucket_notification/README.rst
src/test/rgw/bucket_notification/__init__.py
src/test/rgw/bucket_notification/bootstrap
src/test/rgw/bucket_notification/pytest.ini [new file with mode: 0644]
src/test/rgw/bucket_notification/requirements.txt
src/test/rgw/bucket_notification/setup.py [deleted file]
src/test/rgw/bucket_notification/test_bn.py
src/test/rgw/bucket_notification/tox.ini [new file with mode: 0644]

index cc84b1575a5f1eb1b5ca372a3c79e9b5c1d21da2..a4b0fab97ad5a3386796d7c34fc403c5ef8282fd 100644 (file)
@@ -220,19 +220,19 @@ def run_tests(ctx, config):
     for client, client_config in config.items():
         (remote,) = ctx.cluster.only(client).remotes.keys()
 
-        attr = ["basic_test"]
+        markers = ["basic_test"]
 
         if 'extra_attr' in client_config:
-            attr = client_config.get('extra_attr')
+            markers = client_config.get('extra_attr')
 
         args = [
             'BNTESTS_CONF={tdir}/ceph/src/test/rgw/bucket_notification/bn-tests.{client}.conf'.format(tdir=testdir, client=client),
             '{tdir}/ceph/src/test/rgw/bucket_notification/virtualenv/bin/python'.format(tdir=testdir),
-            '-m', 'nose',
+            '-m', 'pytest',
             '-s',
             '{tdir}/ceph/src/test/rgw/bucket_notification/test_bn.py'.format(tdir=testdir),
             '-v',
-            '-a', ','.join(attr),
+            '-m', ' or '.join(markers),
             ]
 
         remote.run(
index 02d42a2abd6fa361d396d7582ddb25d3492641e1..db58e8cfcc7b1ff8819729257cbaf4bc4dcbdf28 100644 (file)
@@ -57,7 +57,7 @@ and::
 
 After running `vstart.sh`, Zookeeper, and Kafka services you're ready to run the Kafka tests::
 
-        BNTESTS_CONF=bntests.conf python -m nose -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -a 'kafka_test'
+        BNTESTS_CONF=bntests.conf python -m pytest -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -m 'kafka_test'
 
 --------------------
 Kafka Security Tests
@@ -120,7 +120,7 @@ And restart the Kafka server. Once both Zookeeper and Kafka are up, run the foll
 
 To run the Kafka security test, you also need to provide the test with the location of the Kafka directory::
 
-        KAFKA_DIR=/path/to/kafka BNTESTS_CONF=bntests.conf python -m nose -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -a 'kafka_security_test'
+        KAFKA_DIR=/path/to/kafka BNTESTS_CONF=bntests.conf python -m pytest -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -m 'kafka_security_test'
 
 ==============
 RabbitMQ Tests
@@ -148,7 +148,7 @@ To confirm that the RabbitMQ server is running you can run the following command
 
 After running `vstart.sh` and RabbitMQ server you're ready to run the AMQP tests::
 
-        BNTESTS_CONF=bntests.conf python -m nose -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -a 'amqp_test'
+        BNTESTS_CONF=bntests.conf python -m pytest -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -m 'amqp_test'
 
 After running the tests you need to stop the vstart cluster (``/path/to/ceph/src/stop.sh``) and the RabbitMQ server by running the following command::
 
@@ -156,7 +156,7 @@ After running the tests you need to stop the vstart cluster (``/path/to/ceph/src
 
 To run the RabbitMQ SSL security tests use the following::
 
-        BNTESTS_CONF=bntests.conf python -m nose -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -a 'amqp_ssl_test'
+        BNTESTS_CONF=bntests.conf python -m pytest -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -m 'amqp_ssl_test'
 
 During these tests, the test script will restart the RabbitMQ server with the correct security configuration (``sudo`` privileges will be needed).
 For that reason it is not recommended to run the `amqp_ssl_test` tests, that assumes a manually configured rabbirmq server, in the same run as `amqp_test` tests, 
index 97d6cf3c5a280b1c289df4f154c7815655cd6750..954c6575bb6c3d7ea0535ec41654e542ede92035 100644 (file)
@@ -1,5 +1,6 @@
 import configparser
 import os
+import pytest
 from .api import admin
 
 def setup():
@@ -72,3 +73,7 @@ def get_access_key():
 def get_secret_key():
     global main_secret_key
     return main_secret_key
+
+@pytest.fixture(autouse=True, scope="package")
+def configfile():
+    setup()
index 4d4a5a7487827177a2432a68d25042f9481c26e3..4d680c86d2f4fb36ed90bb1e46a8b8d0e7769816 100755 (executable)
@@ -29,17 +29,5 @@ python3 -m venv --system-site-packages virtualenv
 
 # avoid pip bugs
 ./virtualenv/bin/pip install --upgrade pip
-#pip3 install --upgrade setuptools cffi  # address pip issue: https://github.com/pypa/pip/issues/6264
-
-# work-around change in pip 1.5
-#./virtualenv/bin/pip install six
-#./virtualenv/bin/pip install -I nose
-#./virtualenv/bin/pip install setuptools
 
 ./virtualenv/bin/pip install -U -r requirements.txt
-
-# forbid setuptools from using the network because it'll try to use
-# easy_install, and we really wanted pip; next line will fail if pip
-# requirements.txt does not match setup.py requirements -- sucky but
-# good enough for now
-./virtualenv/bin/python setup.py develop
diff --git a/src/test/rgw/bucket_notification/pytest.ini b/src/test/rgw/bucket_notification/pytest.ini
new file mode 100644 (file)
index 0000000..9d80d73
--- /dev/null
@@ -0,0 +1,11 @@
+[pytest]
+markers =
+  basic_test
+  http_test
+  kafka_test
+  amqp_test
+  kafka_failover
+  kafka_security_test
+  amqp_ssl_test
+  manual_test
+  not_implemented
index d57533fbd87286c522b0e1a8a72f02932c5d017f..14885b11f16e8e3bba21535399b35647b9792520 100644 (file)
@@ -1,4 +1,4 @@
-nose-py3 >=1.0.0
+pytest
 boto3 >=1.0.0
 configparser >=5.0.0
 kafka-python >=2.0.0
diff --git a/src/test/rgw/bucket_notification/setup.py b/src/test/rgw/bucket_notification/setup.py
deleted file mode 100644 (file)
index 189ab27..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-#!/usr/bin/python
-from setuptools import setup, find_packages
-
-setup(
-    name='bn_tests',
-    version='0.0.1',
-    packages=find_packages(),
-
-    author='Kalpesh Pandya',
-    author_email='kapandya@redhat.com',
-    description='Bucket Notification compatibility tests',
-    license='MIT',
-    keywords='bn web testing',
-
-    install_requires=[
-        'boto >=2.0b4',
-        'boto3 >=1.0.0'
-        ],
-    )
index 81879ef76f02b69ad8906efc0404db3d587ea92b..830b6e48ffec73950b7f185099dce5266ef795b7 100644 (file)
@@ -16,8 +16,7 @@ from botocore.exceptions import ClientError
 from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
 from random import randint
 import hashlib
-# XXX this should be converted to use pytest
-from nose.plugins.attrib import attr
+import pytest
 import boto3
 import datetime
 from cloudevents.http import from_http
@@ -25,6 +24,7 @@ from dateutil import parser
 import requests
 
 from . import(
+    configfile,
     get_config_host,
     get_config_port,
     get_config_zonegroup,
@@ -46,8 +46,6 @@ from .api import PSTopicS3, \
     S3Key, \
     MultipartUpload
 
-from nose import SkipTest
-from nose.tools import assert_not_equal, assert_equal, assert_in, assert_not_in, assert_true
 
 # configure logging for the tests module
 class LogWrapper:
@@ -124,13 +122,13 @@ def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=Fa
                 if key_found:
                     break
                 for record in record_list['Records']:
-                    assert_in('eTag', record['s3']['object'])
+                    assert 'eTag' in record['s3']['object']
                     if record['s3']['bucket']['name'] == key.bucket.name and \
                         record['s3']['object']['key'] == key.name:
                         # Assertion Error needs to be fixed
-                        #assert_equal(key.etag[1:-1], record['s3']['object']['eTag'])
+                        #assert key.etag[1:-1] == record['s3']['object']['eTag']
                         if etags:
-                            assert_in(key.etag[1:-1], etags)
+                            assert key.etag[1:-1] in etags
                         if len(record['s3']['object']['metadata']) > 0:
                             for meta in record['s3']['object']['metadata']:
                                 assert(meta['key'].startswith(META_PREFIX))
@@ -144,12 +142,12 @@ def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=Fa
                             break
         else:
             for record in records['Records']:
-                assert_in('eTag', record['s3']['object'])
+                assert 'eTag' in record['s3']['object']
                 if record['s3']['bucket']['name'] == key.bucket.name and \
                     record['s3']['object']['key'] == key.name:
-                    assert_equal(key.etag, record['s3']['object']['eTag'])
+                    assert key.etag == record['s3']['object']['eTag']
                     if etags:
-                        assert_in(key.etag[1:-1], etags)
+                        assert key.etag[1:-1] in etags
                     if len(record['s3']['object']['metadata']) > 0:
                         for meta in record['s3']['object']['metadata']:
                             assert(meta['key'].startswith(META_PREFIX))
@@ -166,7 +164,7 @@ def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=Fa
             err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
             assert False, err
         elif expected_sizes:
-            assert_equal(object_size, expected_sizes.get(key.name))
+            assert object_size == expected_sizes.get(key.name)
 
     if not len(records) == len(keys):
         err = 'superfluous records are found'
@@ -192,13 +190,13 @@ class HTTPPostHandler(BaseHTTPRequestHandler):
         if self.server.cloudevents:
             event = from_http(self.headers, body)
             record = json.loads(body)['Records'][0]
-            assert_equal(event['specversion'], '1.0')
-            assert_equal(event['id'], record['responseElements']['x-amz-request-id'] + '.' + record['responseElements']['x-amz-id-2'])
-            assert_equal(event['source'], 'ceph:s3.' + record['awsRegion'] + '.' + record['s3']['bucket']['name'])
-            assert_equal(event['type'], 'com.amazonaws.' + record['eventName'])
-            assert_equal(event['datacontenttype'], 'application/json')
-            assert_equal(event['subject'], record['s3']['object']['key'])
-            assert_equal(parser.parse(event['time']), parser.parse(record['eventTime']))
+            assert event['specversion'] == '1.0'
+            assert event['id'] == record['responseElements']['x-amz-request-id'] + '.' + record['responseElements']['x-amz-id-2']
+            assert event['source'] == 'ceph:s3.' + record['awsRegion'] + '.' + record['s3']['bucket']['name']
+            assert event['type'] == 'com.amazonaws.' + record['eventName']
+            assert event['datacontenttype'] == 'application/json'
+            assert event['subject'] == record['s3']['object']['key']
+            assert parser.parse(event['time']) == parser.parse(record['eventTime'])
         log.info('HTTP Server received event: %s', str(body))
         self.server.append(json.loads(body))
         if self.headers.get('Expect') == '100-continue':
@@ -558,8 +556,8 @@ def verify_kafka_receiver(receiver):
         remaining_retries -= 1
         if remaining_retries == 0:
             raise Exception('kafka receiver on topic: %s did not receive test event in time', receiver.topic)
-    assert_equal(len(events), 1)
-    assert_in('test', events[0])
+    assert len(events) == 1
+    assert 'test' in events[0]
     log.info('Kafka receiver on topic: %s tested ok', receiver.topic)
 
 
@@ -603,7 +601,7 @@ def another_user(user=None, tenant=None, account=None):
         arn = f'arn:aws:iam::{account}:user/Superman'
 
     _, rc = admin(cmd, get_config_cluster())
-    assert_equal(rc, 0)
+    assert rc == 0
 
     conn = S3Connection(aws_access_key_id=access_key,
                         aws_secret_access_key=secret_key,
@@ -616,7 +614,7 @@ def list_topics(assert_len=None, tenant=''):
         result = admin(['topic', 'list'], get_config_cluster())
     else:
         result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster())
-    assert_equal(result[1], 0)
+    assert result[1] == 0
     parsed_result = json.loads(result[0])
     try:
         actual_len = len(parsed_result['topics'])
@@ -630,7 +628,7 @@ def list_topics(assert_len=None, tenant=''):
 
 def get_stats_persistent_topic(topic_name, assert_entries_number=None):
     result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
-    assert_equal(result[1], 0)
+    assert result[1] == 0
     parsed_result = json.loads(result[0])
     if assert_entries_number:
         actual_number = parsed_result['Topic Stats']['Entries']
@@ -652,7 +650,7 @@ def get_topic(topic_name, tenant='', allow_failure=False):
         result = admin(['topic', 'get', '--topic', topic_name, '--tenant', tenant], get_config_cluster())
     if allow_failure:
         return result
-    assert_equal(result[1], 0)
+    assert result[1] == 0
     parsed_result = json.loads(result[0])
     return parsed_result
 
@@ -663,7 +661,7 @@ def remove_topic(topic_name, tenant='', allow_failure=False):
     else:
         result = admin(['topic', 'rm', '--topic', topic_name, '--tenant', tenant], get_config_cluster())
     if not allow_failure:
-        assert_equal(result[1], 0)
+        assert result[1] == 0
     return result[1]
 
 
@@ -672,7 +670,7 @@ def list_notifications(bucket_name, assert_len=None, tenant=''):
         result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster())
     else:
         result = admin(['notification', 'list', '--bucket', bucket_name, '--tenant', tenant], get_config_cluster())
-    assert_equal(result[1], 0)
+    assert result[1] == 0
     parsed_result = json.loads(result[0])
     actual_len = len(parsed_result['notifications'])
     if assert_len and assert_len != actual_len:
@@ -686,9 +684,9 @@ def get_notification(bucket_name, notification_name, tenant=''):
         result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name], get_config_cluster())
     else:
         result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name, '--tenant', tenant], get_config_cluster())
-    assert_equal(result[1], 0)
+    assert result[1] == 0
     parsed_result = json.loads(result[0])
-    assert_equal(parsed_result['Id'], notification_name)
+    assert parsed_result['Id'] == notification_name
     return parsed_result
 
 
@@ -700,7 +698,7 @@ def remove_notification(bucket_name, notification_name='', tenant='', allow_fail
         args.extend(['--tenant', tenant])
     result = admin(args, get_config_cluster())
     if not allow_failure:
-        assert_equal(result[1], 0)
+        assert result[1] == 0
     return result[1]
 
 
@@ -713,11 +711,11 @@ def zonegroup_modify_feature(enable, feature_name):
     else:
         command = '--disable-feature='+feature_name
     result = admin(['zonegroup', 'modify', command], get_config_cluster())
-    assert_equal(result[1], 0)
+    assert result[1] == 0
     result = admin(['period', 'update'], get_config_cluster())
-    assert_equal(result[1], 0)
+    assert result[1] == 0
     result = admin(['period', 'commit'], get_config_cluster())
-    assert_equal(result[1], 0)
+    assert result[1] == 0
 
 
 def connect_random_user(tenant=''):
@@ -728,7 +726,7 @@ def connect_random_user(tenant=''):
         _, rc = admin(['user', 'create', '--uid', uid, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
     else:
         _, rc = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
-    assert_equal(rc, 0)
+    assert rc == 0
     conn = S3Connection(aws_access_key_id=access_key,
                         aws_secret_access_key=secret_key,
                         is_secure=False, port=get_config_port(), host=get_config_host())
@@ -739,7 +737,7 @@ def connect_random_user(tenant=''):
 ##############
 
 
-@attr('basic_test')
+@pytest.mark.basic_test
 def test_topic():
     """ test topics set/get/delete """
 
@@ -759,51 +757,48 @@ def test_topic():
     endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
     topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
     topic_arn = topic_conf1.set_config()
-    assert_equal(topic_arn,
-                 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_1')
+    assert topic_arn == 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_1'
 
     endpoint_address = 'http://127.0.0.1:9001'
     endpoint_args = 'push-endpoint='+endpoint_address
     topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args)
     topic_arn = topic_conf2.set_config()
-    assert_equal(topic_arn,
-                 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_2')
+    assert topic_arn == 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_2'
     endpoint_address = 'http://127.0.0.1:9002'
     endpoint_args = 'push-endpoint='+endpoint_address
     topic_conf3 = PSTopicS3(conn, topic_name+'_3', zonegroup, endpoint_args=endpoint_args)
     topic_arn = topic_conf3.set_config()
-    assert_equal(topic_arn,
-                 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3')
+    assert topic_arn == 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3'
 
     # get topic 3
     result, status = topic_conf3.get_config()
-    assert_equal(status, 200)
-    assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
-    assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])
+    assert status == 200
+    assert topic_arn == result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn']
+    assert endpoint_address == result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress']
 
     # Note that endpoint args may be ordered differently in the result
     # delete topic 1
     result = topic_conf1.del_config()
-    assert_equal(status, 200)
+    assert status == 200
 
     # try to get a deleted topic
     _, status = topic_conf1.get_config()
-    assert_equal(status, 404)
+    assert status == 404
 
     # get the remaining 2 topics
     list_topics(2, tenant)
 
     # delete topics
     status = topic_conf2.del_config()
-    assert_equal(status, 200)
+    assert status == 200
     status = topic_conf3.del_config()
-    assert_equal(status, 200)
+    assert status == 200
 
     # get topic list, make sure it is empty
     list_topics(0, tenant)
 
 
-@attr('basic_test')
+@pytest.mark.basic_test
 def test_topic_admin():
     """ test topics set/get/delete """
 
@@ -823,46 +818,40 @@ def test_topic_admin():
     endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
     topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
     topic_arn1 = topic_conf1.set_config()
-    assert_equal(topic_arn1,
-                 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_1')
+    assert topic_arn1 == 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_1'
 
     endpoint_address = 'http://127.0.0.1:9001'
     endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
     topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args)
     topic_arn2 = topic_conf2.set_config()
-    assert_equal(topic_arn2,
-                 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_2')
+    assert topic_arn2 == 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_2'
     endpoint_address = 'http://127.0.0.1:9002'
     endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true'
     topic_conf3 = PSTopicS3(conn, topic_name+'_3', zonegroup, endpoint_args=endpoint_args)
     topic_arn3 = topic_conf3.set_config()
-    assert_equal(topic_arn3,
-                 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3')
+    assert topic_arn3 == 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3'
 
     # get topic 3 via commandline
     parsed_result = get_topic(topic_name+'_3', tenant)
-    assert_equal(parsed_result['arn'], topic_arn3)
+    assert parsed_result['arn'] == topic_arn3
     matches = [tenant, UID_PREFIX]
-    assert_true( all([x in parsed_result['owner'] for x in matches]))
-    assert_equal(parsed_result['dest']['persistent_queue'],
-                 tenant + ":" + topic_name + '_3')
+    assert all([x in parsed_result['owner'] for x in matches])
+    assert parsed_result['dest']['persistent_queue'] == tenant + ":" + topic_name + '_3'
 
     # recall CreateTopic and verify the owner and persistent_queue remain same.
     topic_conf3 = PSTopicS3(conn, topic_name + '_3', zonegroup,
                             endpoint_args=endpoint_args)
     topic_arn3 = topic_conf3.set_config()
-    assert_equal(topic_arn3,
-                 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3')
+    assert topic_arn3 == 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3'
     # get topic 3 via commandline
     result = admin(
       ['topic', 'get', '--topic', topic_name + '_3', '--tenant', tenant],
       get_config_cluster())
-    assert_equal(result[1], 0)
+    assert result[1] == 0
     parsed_result = json.loads(result[0])
-    assert_equal(parsed_result['arn'], topic_arn3)
-    assert_true(all([x in parsed_result['owner'] for x in matches]))
-    assert_equal(parsed_result['dest']['persistent_queue'],
-                 tenant + ":" + topic_name + '_3')
+    assert parsed_result['arn'] == topic_arn3
+    assert all([x in parsed_result['owner'] for x in matches])
+    assert parsed_result['dest']['persistent_queue'] == tenant + ":" + topic_name + '_3'
 
     # delete topic 3
     remove_topic(topic_name + '_3', tenant)
@@ -870,7 +859,7 @@ def test_topic_admin():
     # try to get a deleted topic
     _, result = get_topic(topic_name + '_3', tenant, allow_failure=True)
     print('"topic not found" error is expected')
-    assert_equal(result, 2)
+    assert result == 2
 
     # get the remaining 2 topics
     list_topics(2, tenant)
@@ -899,8 +888,7 @@ def notification_configuration(with_cli):
     endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
     topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
-    assert_equal(topic_arn,
-                 'arn:aws:sns:' + zonegroup + '::' + topic_name)
+    assert topic_arn == 'arn:aws:sns:' + zonegroup + '::' + topic_name
     # create notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
     topic_conf_list = [
@@ -943,45 +931,45 @@ def notification_configuration(with_cli):
                     }]
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
     _, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
+    assert status/100 == 2
 
     # get notification 1
     if with_cli:
         get_notification(bucket_name, notification_name+'_1')
     else:
         response, status = s3_notification_conf.get_config(notification=notification_name+'_1')
-        assert_equal(status/100, 2)
-        assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
+        assert status/100 == 2
+        assert response['NotificationConfiguration']['TopicConfiguration']['Topic'] == topic_arn
 
     # list notification
     if with_cli:
         list_notifications(bucket_name, 3)
     else:
         result, status = s3_notification_conf.get_config()
-        assert_equal(status, 200)
-        assert_equal(len(result['TopicConfigurations']), 3)
+        assert status == 200
+        assert len(result['TopicConfigurations']) == 3
 
     # delete notification 2
     if with_cli:
         remove_notification(bucket_name, notification_name + '_2')
     else:
         _, status = s3_notification_conf.del_config(notification=notification_name+'_2')
-        assert_equal(status/100, 2)
+        assert status/100 == 2
 
     # list notification
     if with_cli:
         list_notifications(bucket_name, 2)
     else:
         result, status = s3_notification_conf.get_config()
-        assert_equal(status, 200)
-        assert_equal(len(result['TopicConfigurations']), 2)
+        assert status == 200
+        assert len(result['TopicConfigurations']) == 2
 
     # delete notifications
     if with_cli:
         remove_notification(bucket_name)
     else:
         _, status = s3_notification_conf.del_config()
-        assert_equal(status/100, 2)
+        assert status/100 == 2
 
     # list notification, make sure it is empty
     list_notifications(bucket_name, 0)
@@ -992,25 +980,25 @@ def notification_configuration(with_cli):
     conn.delete_bucket(bucket_name)
 
 
-@attr('basic_test')
+@pytest.mark.basic_test
 def test_notification_configuration_admin():
     """ test notification list/set/get/delete, with admin cli """
     notification_configuration(True)
 
 
-@attr('not_implemented')
+@pytest.mark.not_implemented
 def test_topic_with_secret():
     """ test topics with secret set/get/delete """
-    return SkipTest('This test is yet to be implemented')
+    pytest.skip('This test is yet to be implemented')
 
 
-@attr('basic_test')
+@pytest.mark.basic_test
 def test_notification_configuration():
     """ test notification set/get/deleter """
     notification_configuration(False)
 
 
-@attr('basic_test')
+@pytest.mark.basic_test
 def test_notification_empty_config():
     """ test notification set/get/delete with empty config """
     hostname = get_ip()
@@ -1038,26 +1026,26 @@ def test_notification_empty_config():
                        }]
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
     _, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
+    assert status/100 == 2
 
     # get notifications on a bucket
     response, status = s3_notification_conf.get_config(notification=notification_name+'_1')
-    assert_equal(status/100, 2)
-    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
+    assert status/100 == 2
+    assert response['NotificationConfiguration']['TopicConfiguration']['Topic'] == topic_arn
 
     # create notification again with empty configuration to check if it deletes or not
     topic_conf_list = []
 
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
     _, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
+    assert status/100 == 2
 
     # make sure that the notification is now deleted
     response, status = s3_notification_conf.get_config()
     try:
         check = response['NotificationConfiguration']
     except KeyError as e:
-        assert_equal(status/100, 2)
+        assert status/100 == 2
     else:
         assert False
 
@@ -1067,7 +1055,7 @@ def test_notification_empty_config():
     conn.delete_bucket(bucket_name)
 
 
-@attr('amqp_test')
+@pytest.mark.amqp_test
 def test_notification_filter_amqp():
     """ test notification filter """
 
@@ -1128,7 +1116,7 @@ def test_notification_filter_amqp():
 
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
     result, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
+    assert status/100 == 2
 
     topic_conf_list = [{'Id': notification_name+'_4',
                         'TopicArn': topic_arn,
@@ -1147,7 +1135,7 @@ def test_notification_filter_amqp():
     try:
         s3_notification_conf4 = PSNotificationS3(conn, bucket_name, topic_conf_list)
         _, status = s3_notification_conf4.set_config()
-        assert_equal(status/100, 2)
+        assert status/100 == 2
         skip_notif4 = False
     except Exception as error:
         print('note: metadata filter is not supported by boto3 - skipping test')
@@ -1156,14 +1144,14 @@ def test_notification_filter_amqp():
 
     # get all notifications
     result, status = s3_notification_conf.get_config()
-    assert_equal(status/100, 2)
+    assert status/100 == 2
     for conf in result['TopicConfigurations']:
         filter_name = conf['Filter']['Key']['FilterRules'][0]['Name']
         assert filter_name == 'prefix' or filter_name == 'suffix' or filter_name == 'regex', filter_name
 
     if not skip_notif4:
         result, status = s3_notification_conf4.get_config(notification=notification_name+'_4')
-        assert_equal(status/100, 2)
+        assert status/100 == 2
         filter_name = result['NotificationConfiguration']['TopicConfiguration']['Filter']['S3Metadata']['FilterRule'][0]['Name']
         assert filter_name == 'x-amz-meta-foo' or filter_name == 'x-amz-meta-hello'
 
@@ -1212,9 +1200,9 @@ def test_notification_filter_amqp():
         notif_id = event['Records'][0]['s3']['configurationId']
         key_name = event['Records'][0]['s3']['object']['key']
         awsRegion = event['Records'][0]['awsRegion']
-        assert_equal(awsRegion, zonegroup)
+        assert awsRegion == zonegroup
         bucket_arn = event['Records'][0]['s3']['bucket']['arn']
-        assert_equal(bucket_arn, "arn:aws:s3:"+awsRegion+"::"+bucket_name)
+        assert bucket_arn == "arn:aws:s3:"+awsRegion+"::"+bucket_name
         if notif_id == notification_name+'_1':
             found_in1.append(key_name)
         elif notif_id == notification_name+'_2':
@@ -1226,11 +1214,11 @@ def test_notification_filter_amqp():
         else:
             assert False, 'invalid notification: ' + notif_id
 
-    assert_equal(set(found_in1), set(expected_in1))
-    assert_equal(set(found_in2), set(expected_in2))
-    assert_equal(set(found_in3), set(expected_in3))
+    assert set(found_in1) == set(expected_in1)
+    assert set(found_in2) == set(expected_in2)
+    assert set(found_in3) == set(expected_in3)
     if not skip_notif4:
-        assert_equal(set(found_in4), set(expected_in4))
+        assert set(found_in4) == set(expected_in4)
 
     # cleanup
     s3_notification_conf.del_config()
@@ -1244,7 +1232,7 @@ def test_notification_filter_amqp():
     stop_amqp_receiver(receiver, task)
 
 
-@attr('basic_test')
+@pytest.mark.basic_test
 def test_notification_errors():
     """ test notification set/get/delete """
     conn = connection()
@@ -1331,10 +1319,10 @@ def test_notification_errors():
 
     status = topic_conf.del_config()
     # deleting an unknown notification is not considered an error
-    assert_equal(status, 200)
+    assert status == 200
 
     _, status = topic_conf.get_config()
-    assert_equal(status, 404)
+    assert status == 404
 
     # cleanup
     # delete the bucket
@@ -1372,7 +1360,7 @@ def notification(endpoint_type, conn, account=None, cloudevents=False, kafka_bro
                             }]
         s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
         response, status = s3_notification_conf.set_config()
-        assert_equal(status/100, 2)
+        assert status/100 == 2
     elif endpoint_type == 'amqp':
         # start amqp receiver
         exchange = 'ex1'
@@ -1393,7 +1381,7 @@ def notification(endpoint_type, conn, account=None, cloudevents=False, kafka_bro
                             }]
         s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
         response, status = s3_notification_conf.set_config()
-        assert_equal(status/100, 2)
+        assert status/100 == 2
     elif endpoint_type == 'kafka':
         # start kafka receiver
         default_kafka_server_and_port = default_kafka_server + ':9092'
@@ -1418,9 +1406,9 @@ def notification(endpoint_type, conn, account=None, cloudevents=False, kafka_bro
                             }]
         s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
         response, status = s3_notification_conf.set_config()
-        assert_equal(status/100, 2)
+        assert status/100 == 2
     else:
-        return SkipTest('Unknown endpoint type: ' + endpoint_type)
+        pytest.skip('Unknown endpoint type: ' + endpoint_type)
 
     # create objects in the bucket
     number_of_objects = 100
@@ -1478,14 +1466,14 @@ def notification(endpoint_type, conn, account=None, cloudevents=False, kafka_bro
     receiver.close(task)
 
 
-@attr('amqp_test')
+@pytest.mark.amqp_test
 def test_notification_amqp():
     """ test pushing amqp notification """
     conn = connection()
     notification('amqp', conn)
 
 
-@attr('manual_test')
+@pytest.mark.manual_test
 def test_notification_amqp_idleness_check():
     """ test pushing amqp notification and checking for connection idleness """
     hostname = get_ip()
@@ -1516,7 +1504,7 @@ def test_notification_amqp_idleness_check():
 
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
+    assert status/100 == 2
 
     # create objects in the bucket (async)
     number_of_objects = 10
@@ -1615,28 +1603,28 @@ def test_notification_amqp_idleness_check():
     conn.delete_bucket(bucket_name)
 
 
-@attr('kafka_test')
+@pytest.mark.kafka_test
 def test_notification_kafka():
     """ test pushing kafka notification """
     conn = connection()
     notification('kafka', conn)
 
 
-@attr('kafka_failover')
+@pytest.mark.kafka_failover
 def test_notification_kafka_multiple_brokers_override():
     """ test pushing kafka notification """
     conn = connection()
     notification('kafka', conn, kafka_brokers='{host}:9091,{host}:9092'.format(host=default_kafka_server))
 
 
-@attr('kafka_failover')
+@pytest.mark.kafka_failover
 def test_notification_kafka_multiple_brokers_append():
     """ test pushing kafka notification """
     conn = connection()
     notification('kafka', conn, kafka_brokers='{host}:9091'.format(host=default_kafka_server))
 
 
-@attr('manual_test')
+@pytest.mark.manual_test
 def test_1K_topics():
     """ test creation of moe than 1K topics """
     conn = connection()
@@ -1739,7 +1727,7 @@ def test_1K_topics():
                 log.error(f"error during cleanup: {e}")
 
 
-@attr('http_test')
+@pytest.mark.http_test
 def test_notification_multi_delete():
     """ test deletion of multiple keys """
     hostname = get_ip()
@@ -1771,7 +1759,7 @@ def test_notification_multi_delete():
                        }]
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
+    assert status/100 == 2
 
     # create objects in the bucket
     client_threads = []
@@ -1807,14 +1795,14 @@ def test_notification_multi_delete():
     http_server.close()
 
 
-@attr('http_test')
+@pytest.mark.http_test
 def test_notification_http():
     """ test pushing http notification """
     conn = connection()
     notification('http', conn)
 
 
-@attr('http_test')
+@pytest.mark.http_test
 def test_notification_cloudevents():
     """ test pushing cloudevents notification """
     conn = connection()
@@ -1822,7 +1810,7 @@ def test_notification_cloudevents():
 
 
 
-@attr('http_test')
+@pytest.mark.http_test
 def test_opaque_data():
     """ test that opaque id set in topic, is sent in notification """
     hostname = get_ip()
@@ -1855,7 +1843,7 @@ def test_opaque_data():
                        }]
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
+    assert status/100 == 2
 
     # create objects in the bucket
     client_threads = []
@@ -1879,7 +1867,7 @@ def test_opaque_data():
     print('total number of objects: ' + str(len(keys)))
     events = http_server.get_and_reset_events()
     for event in events:
-        assert_equal(event['Records'][0]['opaqueData'], opaque_data)
+        assert event['Records'][0]['opaqueData'] == opaque_data
 
     # cleanup
     for key in keys:
@@ -1926,7 +1914,7 @@ def lifecycle(endpoint_type, conn, number_of_objects, topic_events, create_threa
         endpoint_address = 'kafka://' + host
         endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'
     else:
-        return SkipTest('Unknown endpoint type: ' + endpoint_type)
+        pytest.skip('Unknown endpoint type: ' + endpoint_type)
 
     # create topic
     topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
@@ -1939,7 +1927,7 @@ def lifecycle(endpoint_type, conn, number_of_objects, topic_events, create_threa
                         }]
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
+    assert status/100 == 2
 
     # create objects in the bucket
     obj_prefix = 'ooo'
@@ -1981,13 +1969,13 @@ def lifecycle(endpoint_type, conn, number_of_objects, topic_events, create_threa
 
     assert len(no_keys) == 0, "lifecycle didn't delete the objects after 500 seconds"
     wait_for_queue_to_drain(topic_name, http_port=port)
-    assert_equal(len(no_keys), 0)
+    assert len(no_keys) == 0
     event_keys = []
     events = receiver.get_and_reset_events()
     if not expected_abortion:
         assert number_of_objects * 2 <= len(events)
     for event in events:
-        assert_in(event['Records'][0]['eventName'], record_events)
+        assert event['Records'][0]['eventName'] in record_events
         event_keys.append(event['Records'][0]['s3']['object']['key'])
     for key in keys:
         key_found = False
@@ -2028,7 +2016,7 @@ def create_thread(bucket, obj_prefix, i, content):
     return threading.Thread(target = set_contents_from_string, args=(key, content,))
 
 
-@attr('http_test')
+@pytest.mark.http_test
 def test_lifecycle_http():
     """ test that when object is deleted due to lifecycle policy, http endpoint """
 
@@ -2037,7 +2025,7 @@ def test_lifecycle_http():
               rules_creator, ['LifecycleExpiration:Delete', 'ObjectLifecycle:Expiration:Current'])
 
 
-@attr('kafka_test')
+@pytest.mark.kafka_test
 def test_lifecycle_kafka():
     """ test that when object is deleted due to lifecycle policy, kafka endpoint """
 
@@ -2055,7 +2043,7 @@ def start_and_abandon_multipart_upload(bucket, key_name, content):
         print('Error: ' + str(e))
 
 
-@attr('http_test')
+@pytest.mark.http_test
 def test_lifecycle_abort_mpu():
     """ test that when a multipart upload is aborted by lifecycle policy, http endpoint """
 
@@ -2085,7 +2073,7 @@ def creation_triggers(external_endpoint_address=None, ca_location=None, verify_s
         hostname = get_ip()
         proc = init_rabbitmq()
         if proc is  None:
-            return SkipTest('end2end amqp tests require rabbitmq-server installed')
+            pytest.skip('end2end amqp tests require rabbitmq-server installed')
     else:
         proc = None
 
@@ -2122,7 +2110,7 @@ def creation_triggers(external_endpoint_address=None, ca_location=None, verify_s
 
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
+    assert status/100 == 2
 
     objects_size = {}
     # create objects in the bucket using PUT
@@ -2168,12 +2156,12 @@ def creation_triggers(external_endpoint_address=None, ca_location=None, verify_s
         clean_rabbitmq(proc)
 
 
-@attr('amqp_test')
+@pytest.mark.amqp_test
 def test_creation_triggers_amqp():
     creation_triggers(external_endpoint_address="amqp://localhost:5672")
 
 
-@attr('amqp_ssl_test')
+@pytest.mark.amqp_ssl_test
 def test_creation_triggers_external():
 
     from distutils.util import strtobool
@@ -2191,7 +2179,7 @@ def test_creation_triggers_external():
             external_endpoint_address=os.environ['AMQP_EXTERNAL_ENDPOINT'],
             verify_ssl=verify_ssl)
     else:
-        return SkipTest("Set AMQP_EXTERNAL_ENDPOINT to a valid external AMQP endpoint url for this test to run")
+        pytest.skip("Set AMQP_EXTERNAL_ENDPOINT to a valid external AMQP endpoint url for this test to run")
 
 
 def generate_private_key(tempdir):
@@ -2285,7 +2273,7 @@ def generate_private_key(tempdir):
     return CACERTFILE, CERTFILE, KEYFILE
 
 
-@attr('amqp_ssl_test')
+@pytest.mark.amqp_ssl_test
 def test_creation_triggers_ssl():
 
     import textwrap
@@ -2314,7 +2302,7 @@ def test_creation_triggers_ssl():
         del os.environ['RABBITMQ_CONFIG_FILE']
 
 
-@attr('amqp_test')
+@pytest.mark.amqp_test
 def test_post_object_upload_amqp():
     """ test that uploads object using HTTP POST """
 
@@ -2364,18 +2352,18 @@ def test_post_object_upload_amqp():
                        }]
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
+    assert status/100 == 2
 
     payload = OrderedDict([("key" , "foo.txt"),("acl" , "public-read"),\
     ("Content-Type" , "text/plain"),('file', ('bar'))])
 
     # POST upload
     r = requests.post(url, files=payload, verify=True)
-    assert_equal(r.status_code, 204)
+    assert r.status_code == 204
 
     # check amqp receiver
     events = receiver1.get_and_reset_events()
-    assert_equal(len(events), 1)
+    assert len(events) == 1
 
     # cleanup
     stop_amqp_receiver(receiver1, task1)
@@ -2419,7 +2407,7 @@ def multipart_endpoint_agnostic(endpoint_type, conn):
         endpoint_address = 'kafka://' + host
         endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
     else:
-        return SkipTest('Unknown endpoint type: ' + endpoint_type)
+        pytest.skip('Unknown endpoint type: ' + endpoint_type)
 
     # create topic
     topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
@@ -2432,7 +2420,7 @@ def multipart_endpoint_agnostic(endpoint_type, conn):
 
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
+    assert status/100 == 2
 
     # create objects in the bucket
     client_threads = []
@@ -2461,21 +2449,21 @@ def multipart_endpoint_agnostic(endpoint_type, conn):
     receiver.close(task)
 
 
-@attr('http_test')
+@pytest.mark.http_test
 def test_multipart_http():
     """ test http multipart object upload """
     conn = connection()
     multipart_endpoint_agnostic('http', conn)
 
 
-@attr('kafka_test')
+@pytest.mark.kafka_test
 def test_multipart_kafka():
     """ test kafka multipart object upload """
     conn = connection()
     multipart_endpoint_agnostic('kafka', conn)
 
 
-@attr('amqp_test')
+@pytest.mark.amqp_test
 def test_multipart_ampq():
     """ test ampq multipart object upload """
     conn = connection()
@@ -2514,7 +2502,7 @@ def metadata_filter(endpoint_type, conn):
         endpoint_address = 'kafka://' + host
         endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'
     else:
-        return SkipTest('Unknown endpoint type: ' + endpoint_type)
+        pytest.skip('Unknown endpoint type: ' + endpoint_type)
 
     # create topic
     zonegroup = get_config_zonegroup()
@@ -2535,7 +2523,7 @@ def metadata_filter(endpoint_type, conn):
 
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
     _, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
+    assert status/100 == 2
 
     expected_keys = []
     # create objects in the bucket
@@ -2578,7 +2566,7 @@ def metadata_filter(endpoint_type, conn):
     wait_for_queue_to_drain(topic_name, http_port=port)
     # check amqp receiver
     events = receiver.get_and_reset_events()
-    assert_equal(len(events), len(expected_keys))
+    assert len(events) == len(expected_keys)
     for event in events:
         assert(event['Records'][0]['s3']['object']['key'] in expected_keys)
 
@@ -2589,7 +2577,7 @@ def metadata_filter(endpoint_type, conn):
     wait_for_queue_to_drain(topic_name, http_port=port)
     # check endpoint receiver
     events = receiver.get_and_reset_events()
-    assert_equal(len(events), len(expected_keys))
+    assert len(events) == len(expected_keys)
     for event in events:
         assert(event['Records'][0]['s3']['object']['key'] in expected_keys)
 
@@ -2601,28 +2589,28 @@ def metadata_filter(endpoint_type, conn):
     conn.delete_bucket(bucket_name)
 
 
-@attr('kafka_test')
+@pytest.mark.kafka_test
 def test_metadata_filter_kafka():
     """ test notification of filtering metadata, kafka endpoint """
     conn = connection()
     metadata_filter('kafka', conn)
 
 
-@attr('http_test')
+@pytest.mark.http_test
 def test_metadata_filter_http():
     """ test notification of filtering metadata, http endpoint """
     conn = connection()
     metadata_filter('http', conn)
 
 
-@attr('amqp_test')
+@pytest.mark.amqp_test
 def test_metadata_filter_ampq():
     """ test notification of filtering metadata, ampq endpoint """
     conn = connection()
     metadata_filter('amqp', conn)
 
 
-@attr('amqp_test')
+@pytest.mark.amqp_test
 def test_metadata_amqp():
     """ test notification of metadata """
 
@@ -2656,7 +2644,7 @@ def test_metadata_amqp():
 
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
     _, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
+    assert status/100 == 2
 
     # create objects in the bucket
     key_name = 'foo'
@@ -2695,7 +2683,7 @@ def test_metadata_amqp():
     events = receiver.get_and_reset_events()
     for event in events:
         value = [x['val'] for x in event['Records'][0]['s3']['object']['metadata'] if x['key'] == META_PREFIX+meta_key]
-        assert_equal(value[0], meta_value)
+        assert value[0] == meta_value
 
     # delete objects
     for key in bucket.list():
@@ -2706,7 +2694,7 @@ def test_metadata_amqp():
     events = receiver.get_and_reset_events()
     for event in events:
         value = [x['val'] for x in event['Records'][0]['s3']['object']['metadata'] if x['key'] == META_PREFIX+meta_key]
-        assert_equal(value[0], meta_value)
+        assert value[0] == meta_value
 
     # cleanup
     stop_amqp_receiver(receiver, task)
@@ -2716,7 +2704,7 @@ def test_metadata_amqp():
     conn.delete_bucket(bucket_name)
 
 
-@attr('amqp_test')
+@pytest.mark.amqp_test
 def test_tags_amqp():
     """ test notification of tags """
 
@@ -2752,7 +2740,7 @@ def test_tags_amqp():
 
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
+    assert status/100 == 2
 
     expected_keys = []
     # create objects in the bucket with tags
@@ -2797,11 +2785,11 @@ def test_tags_amqp():
         key = event['Records'][0]['s3']['object']['key']
         if (key == key_name1):
             obj_tags =  sorted(event['Records'][0]['s3']['object']['tags'], key=lambda k: k['key']+k['val'])
-            assert_equal(obj_tags, expected_tags1)
+            assert obj_tags == expected_tags1
         event_count += 1
         assert(key in expected_keys)
 
-    assert_equal(event_count, len(expected_keys))
+    assert event_count == len(expected_keys)
 
     # delete the objects
     for key in bucket.list():
@@ -2814,7 +2802,7 @@ def test_tags_amqp():
         key = event['Records'][0]['s3']['object']['key']
         if (key == key_name1):
             obj_tags =  sorted(event['Records'][0]['s3']['object']['tags'], key=lambda k: k['key']+k['val'])
-            assert_equal(obj_tags, expected_tags1)
+            assert obj_tags == expected_tags1
         event_count += 1
         assert(key in expected_keys)
 
@@ -2827,7 +2815,7 @@ def test_tags_amqp():
     # delete the bucket
     conn.delete_bucket(bucket_name)
 
-@attr('amqp_test')
+@pytest.mark.amqp_test
 def test_versioning_amqp():
     """ test notification of object versions """
 
@@ -2858,7 +2846,7 @@ def test_versioning_amqp():
                        }]
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
     _, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
+    assert status/100 == 2
 
     # create objects in the bucket
     key_name = 'foo'
@@ -2889,7 +2877,7 @@ def test_versioning_amqp():
             else:
                 print('version ok: '+version+' in: '+str(versions))
 
-    assert_equal(num_of_versions, 3)
+    assert num_of_versions == 3
 
     # cleanup
     stop_amqp_receiver(receiver, task)
@@ -2902,7 +2890,7 @@ def test_versioning_amqp():
     #conn.delete_bucket(bucket_name)
 
 
-@attr('amqp_test')
+@pytest.mark.amqp_test
 def test_versioned_deletion_amqp():
     """ test notification of deletion markers """
 
@@ -2939,7 +2927,7 @@ def test_versioned_deletion_amqp():
                        }]
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
+    assert status/100 == 2
 
     # create objects in the bucket
     key = bucket.new_key('foo')
@@ -2988,10 +2976,10 @@ def test_versioned_deletion_amqp():
 
     # 2 key versions were deleted
     # notified over the same topic via 2 notifications (1,3)
-    assert_equal(delete_events, 2*2)
+    assert delete_events == 2*2
     # 1 deletion marker was created
     # notified over the same topic over 2 notifications (1,2)
-    assert_equal(delete_marker_create_events, 1*2)
+    assert delete_marker_create_events == 1*2
 
     # cleanup
     delete_marker_key.delete()
@@ -3002,7 +2990,7 @@ def test_versioned_deletion_amqp():
     conn.delete_bucket(bucket_name)
 
 
-@attr('manual_test')
+@pytest.mark.manual_test
 def test_persistent_cleanup():
     """ test reservation cleanup after gateway crash """
     conn = connection()
@@ -3035,7 +3023,7 @@ def test_persistent_cleanup():
         }]
     s3_notification_conf = PSNotificationS3(gw, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
+    assert status/100 == 2
 
     client_threads = []
     start_time = time.time()
@@ -3129,7 +3117,7 @@ def wait_for_queue_to_drain(topic_name, tenant=None, account=None, http_port=Non
         if http_port:
             check_http_server(http_port)
         result = admin(cmd, get_config_cluster())
-        assert_equal(result[1], 0)
+        assert result[1] == 0
         parsed_result = json.loads(result[0])
         entries = parsed_result['Topic Stats']['Entries']
         retries += 1
@@ -3137,7 +3125,7 @@ def wait_for_queue_to_drain(topic_name, tenant=None, account=None, http_port=Non
         log.info('shards for %s has %d entries after %ds', topic_name, entries, time_diff)
         if retries > 100:
             log.warning('shards for %s still has %d entries after %ds', topic_name, entries, time_diff)
-            assert_equal(entries, 0)
+            assert entries == 0
         time.sleep(5)
     time_diff = time.time() - start_time
     log.info('waited for %ds for shards of %s to drain', time_diff, topic_name)
@@ -3178,7 +3166,7 @@ def persistent_topic_stats(conn, endpoint_type):
         endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
                         '&retry_sleep_duration=1'
     else:
-        return SkipTest('Unknown endpoint type: ' + endpoint_type)
+        pytest.skip('Unknown endpoint type: ' + endpoint_type)
 
     # create topic
     topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
@@ -3191,7 +3179,7 @@ def persistent_topic_stats(conn, endpoint_type):
 
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
+    assert status/100 == 2
 
     # topic stats
     get_stats_persistent_topic(topic_name, 0)
@@ -3254,28 +3242,28 @@ def persistent_topic_stats(conn, endpoint_type):
     receiver.close(task)
 
 
-@attr('http_test')
+@pytest.mark.http_test
 def test_persistent_topic_stats_http():
     """ test persistent topic stats, http endpoint """
     conn = connection()
     persistent_topic_stats(conn, 'http')
 
 
-@attr('kafka_test')
+@pytest.mark.kafka_test
 def test_persistent_topic_stats_kafka():
     """ test persistent topic stats, kafka endpoint """
     conn = connection()
     persistent_topic_stats(conn, 'kafka')
 
 
-@attr('amqp_test')
+@pytest.mark.amqp_test
 def test_persistent_topic_stats_amqp():
     """ test persistent topic stats, amqp endpoint """
     conn = connection()
     persistent_topic_stats(conn, 'amqp')
 
 
-@attr('kafka_test')
+@pytest.mark.kafka_test
 def test_persistent_topic_dump():
     """ test persistent topic dump """
     conn = connection()
@@ -3306,7 +3294,7 @@ def test_persistent_topic_dump():
 
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
+    assert status/100 == 2
 
     # create objects in the bucket (async)
     number_of_objects = 20
@@ -3324,9 +3312,9 @@ def test_persistent_topic_dump():
 
     # topic dump
     result = admin(['topic', 'dump', '--topic', topic_name], get_config_cluster())
-    assert_equal(result[1], 0)
+    assert result[1] == 0
     parsed_result = json.loads(result[0])
-    assert_equal(len(parsed_result), number_of_objects)
+    assert len(parsed_result) == number_of_objects
 
     # delete objects from the bucket
     client_threads = []
@@ -3341,9 +3329,9 @@ def test_persistent_topic_dump():
 
     # topic stats
     result = admin(['topic', 'dump', '--topic', topic_name], get_config_cluster())
-    assert_equal(result[1], 0)
+    assert result[1] == 0
     parsed_result = json.loads(result[0])
-    assert_equal(len(parsed_result), 2*number_of_objects)
+    assert len(parsed_result) == 2*number_of_objects
 
     # change the endpoint port
     endpoint_address = 'kafka://' + host
@@ -3355,9 +3343,9 @@ def test_persistent_topic_dump():
     wait_for_queue_to_drain(topic_name,)
 
     result = admin(['topic', 'dump', '--topic', topic_name], get_config_cluster())
-    assert_equal(result[1], 0)
+    assert result[1] == 0
     parsed_result = json.loads(result[0])
-    assert_equal(len(parsed_result), 0)
+    assert len(parsed_result) == 0
 
     # cleanup
     s3_notification_conf.del_config()
@@ -3394,13 +3382,13 @@ def persistent_topic_configs(persistency_time, config_dict):
 
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
+    assert status/100 == 2
 
     # topic get
     parsed_result = get_topic(topic_name)
     parsed_result_dest = parsed_result["dest"]
     for key, value in config_dict.items():
-        assert_equal(parsed_result_dest[key], str(value))
+        assert parsed_result_dest[key] == str(value)
 
     # topic stats
     get_stats_persistent_topic(topic_name, 0)
@@ -3443,7 +3431,7 @@ def persistent_topic_configs(persistency_time, config_dict):
     [thr.join() for thr in client_threads]
     time_diff = time.time() - start_time
     log.info('deletion of %d objects took %f seconds', count, time_diff)
-    assert_equal(count, number_of_objects)
+    assert count == number_of_objects
 
     # topic stats
     if time_diff > persistency_time:
@@ -3469,7 +3457,7 @@ def create_persistency_config_string(config_dict):
 
     return str_ret[:-1]
 
-@attr('basic_test')
+@pytest.mark.basic_test
 def test_persistent_topic_configs_ttl():
     """ test persistent topic configurations with time_to_live """
     config_dict = {"time_to_live": 30, "max_retries": "None", "retry_sleep_duration": "None"}
@@ -3477,7 +3465,7 @@ def test_persistent_topic_configs_ttl():
 
     persistent_topic_configs(persistency_time, config_dict)
 
-@attr('basic_test')
+@pytest.mark.basic_test
 def test_persistent_topic_configs_max_retries():
     """ test persistent topic configurations with max_retries and retry_sleep_duration """
     config_dict = {"time_to_live": "None", "max_retries": 10, "retry_sleep_duration": 1}
@@ -3485,7 +3473,7 @@ def test_persistent_topic_configs_max_retries():
 
     persistent_topic_configs(persistency_time, config_dict)
 
-@attr('manual_test')
+@pytest.mark.manual_test
 def test_persistent_notificationback():
     """ test pushing persistent notification pushback """
     conn = connection()
@@ -3515,7 +3503,7 @@ def test_persistent_notificationback():
 
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
+    assert status/100 == 2
 
     # create objects in the bucket (async)
     for j in range(100):
@@ -3566,7 +3554,7 @@ def test_persistent_notificationback():
     http_server.close()
 
 
-@attr('kafka_test')
+@pytest.mark.kafka_test
 def test_notification_kafka_idle_behaviour():
     """ test pushing kafka notification idle behaviour check """
     conn = connection()
@@ -3597,7 +3585,7 @@ def test_notification_kafka_idle_behaviour():
 
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
+    assert status/100 == 2
 
     # create objects in the bucket (async)
     number_of_objects = 10
@@ -3699,16 +3687,16 @@ def test_notification_kafka_idle_behaviour():
     stop_kafka_receiver(receiver, task)
 
 
-@attr('not_implemented')
+@pytest.mark.not_implemented
 def test_persistent_gateways_recovery():
     """ test gateway recovery of persistent notifications """
-    return SkipTest('This test is yet to be implemented')
+    pytest.skip('This test is yet to be implemented')
 
 
-@attr('not_implemented')
+@pytest.mark.not_implemented
 def test_persistent_multiple_gateways():
     """ test pushing persistent notification via two gateways """
-    return SkipTest('This test is yet to be implemented')
+    pytest.skip('This test is yet to be implemented')
 
 
 def persistent_topic_multiple_endpoints(conn, endpoint_type):
@@ -3748,7 +3736,7 @@ def persistent_topic_multiple_endpoints(conn, endpoint_type):
         endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
                         '&retry_sleep_duration=1'
     else:
-        return SkipTest('Unknown endpoint type: ' + endpoint_type)
+        pytest.skip('Unknown endpoint type: ' + endpoint_type)
 
     # create two topics
     topic_conf1 = PSTopicS3(conn, topic_name_1, zonegroup, endpoint_args=endpoint_args)
@@ -3766,14 +3754,14 @@ def persistent_topic_multiple_endpoints(conn, endpoint_type):
                        }]
     s3_notification_conf1 = PSNotificationS3(conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf1.set_config()
-    assert_equal(status/100, 2)
+    assert status/100 == 2
     notification_name = bucket_name + NOTIFICATION_SUFFIX+'_2'
     topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn2,
                          'Events': []
                        }]
     s3_notification_conf2 = PSNotificationS3(conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf2.set_config()
-    assert_equal(status/100, 2)
+    assert status/100 == 2
 
     client_threads = []
     start_time = time.time()
@@ -3812,14 +3800,14 @@ def persistent_topic_multiple_endpoints(conn, endpoint_type):
     receiver.close(task)
 
 
-@attr('http_test')
+@pytest.mark.http_test
 def test_persistent_multiple_endpoints_http():
     """ test pushing persistent notification when one of the endpoints has error, http endpoint """
     conn = connection()
     persistent_topic_multiple_endpoints(conn, 'http')
 
 
-@attr('kafka_test')
+@pytest.mark.kafka_test
 def test_persistent_multiple_endpoints_kafka():
     """ test pushing persistent notification when one of the endpoints has error, kafka endpoint """
     conn = connection()
@@ -3860,7 +3848,7 @@ def persistent_notification(endpoint_type, conn, account=None):
         endpoint_address = 'kafka://' + host
         endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'+'&persistent=true'
     else:
-        return SkipTest('Unknown endpoint type: ' + endpoint_type)
+        pytest.skip('Unknown endpoint type: ' + endpoint_type)
 
 
     # create topic
@@ -3874,7 +3862,7 @@ def persistent_notification(endpoint_type, conn, account=None):
 
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
+    assert status/100 == 2
 
     # create objects in the bucket (async)
     number_of_objects = 100
@@ -3921,13 +3909,13 @@ def persistent_notification(endpoint_type, conn, account=None):
     receiver.close(task)
 
 
-@attr('http_test')
+@pytest.mark.http_test
 def test_persistent_notification_http():
     """ test pushing persistent notification http """
     conn = connection()
     persistent_notification('http', conn)
 
-@attr('http_test')
+@pytest.mark.http_test
 def test_persistent_notification_http_account():
     """ test pushing persistent notification via http for account user """
 
@@ -3935,7 +3923,7 @@ def test_persistent_notification_http_account():
     user = UID_PREFIX + 'test'
 
     _, rc = admin(['account', 'create', '--account-id', account, '--account-name', 'testacct'], get_config_cluster())
-    assert_true(rc in [0, 17]) # EEXIST okay if we rerun
+    assert rc in [0, 17] # EEXIST okay if we rerun
 
     conn, _ = another_user(user=user, account=account)
     try:
@@ -3944,14 +3932,14 @@ def test_persistent_notification_http_account():
         admin(['user', 'rm', '--uid', user], get_config_cluster())
         admin(['account', 'rm', '--account-id', account], get_config_cluster())
 
-@attr('amqp_test')
+@pytest.mark.amqp_test
 def test_persistent_notification_amqp():
     """ test pushing persistent notification amqp """
     conn = connection()
     persistent_notification('amqp', conn)
 
 
-@attr('kafka_test')
+@pytest.mark.kafka_test
 def test_persistent_notification_kafka():
     """ test pushing persistent notification kafka """
     conn = connection()
@@ -3964,7 +3952,7 @@ def random_string(length):
     return ''.join(random.choice(letters) for i in range(length))
 
 
-@attr('amqp_test')
+@pytest.mark.amqp_test
 def test_persistent_notification_large_amqp():
     """ test pushing persistent notification of large notifications """
 
@@ -3999,7 +3987,7 @@ def test_persistent_notification_large_amqp():
 
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
+    assert status/100 == 2
 
     # create objects in the bucket (async)
     number_of_objects = 100
@@ -4050,29 +4038,29 @@ def test_persistent_notification_large_amqp():
     stop_amqp_receiver(receiver, task)
 
 
-@attr('not_implemented')
+@pytest.mark.not_implemented
 def test_topic_update():
     """ test updating topic associated with a notification"""
-    return SkipTest('This test is yet to be implemented')
+    pytest.skip('This test is yet to be implemented')
 
 
-@attr('not_implemented')
+@pytest.mark.not_implemented
 def test_notification_update():
     """ test updating the topic of a notification"""
-    return SkipTest('This test is yet to be implemented')
+    pytest.skip('This test is yet to be implemented')
 
 
-@attr('not_implemented')
+@pytest.mark.not_implemented
 def test_multiple_topics_notification():
     """ test notification creation with multiple topics"""
-    return SkipTest('This test is yet to be implemented')
+    pytest.skip('This test is yet to be implemented')
 
 
-@attr('basic_test')
+@pytest.mark.basic_test
 def test_list_topics_migration():
     """ test list topics on migration"""
     if get_config_cluster() == 'noname':
-        return SkipTest('realm is needed for migration test')
+        pytest.skip('realm is needed for migration test')
     
     # Initialize connections and configurations
     conn1 = connection()
@@ -4143,21 +4131,21 @@ def test_list_topics_migration():
     try:
         # Verify no tenant topics
         res, status = topic_conf.get_list()
-        assert_equal(status // 100, 2)
+        assert status // 100 == 2
         listTopicsResponse = res.get('ListTopicsResponse', {})
         listTopicsResult = listTopicsResponse.get('ListTopicsResult', {})
         topics = listTopicsResult.get('Topics', {})
         member = topics['member'] if topics else []
-        assert_equal(len(member), 6)
+        assert len(member) == 6
         
         # Verify tenant topics
         res, status = tenant_topic_conf.get_list()
-        assert_equal(status // 100, 2)
+        assert status // 100 == 2
         listTopicsResponse = res.get('ListTopicsResponse', {})
         listTopicsResult = listTopicsResponse.get('ListTopicsResult', {})
         topics = listTopicsResult.get('Topics', {})
         member = topics['member'] if topics else []
-        assert_equal(len(member), 3)
+        assert len(member) == 3
     finally:
         # Cleanup created topics
         topic_conf.del_config(topic_arn1)
@@ -4171,7 +4159,7 @@ def test_list_topics_migration():
         tenant_topic_conf.del_config(tenant_topic_arn3)
 
 
-@attr('basic_test')
+@pytest.mark.basic_test
 def test_list_topics():
     """ test list topics"""
     
@@ -4211,21 +4199,21 @@ def test_list_topics():
     try:
         # Verify no tenant topics
         res, status = topic_conf.get_list()
-        assert_equal(status // 100, 2)
+        assert status // 100 == 2
         listTopicsResponse = res.get('ListTopicsResponse', {})
         listTopicsResult = listTopicsResponse.get('ListTopicsResult', {})
         topics = listTopicsResult.get('Topics', {})
         member = topics['member'] if topics else [] # version 2
-        assert_equal(len(member), 3)
+        assert len(member) == 3
                
         # Verify topics for tenant
         res, status = tenant_topic_conf.get_list()
-        assert_equal(status // 100, 2)
+        assert status // 100 == 2
         listTopicsResponse = res.get('ListTopicsResponse', {})
         listTopicsResult = listTopicsResponse.get('ListTopicsResult', {})
         topics = listTopicsResult.get('Topics', {})
         member = topics['member'] if topics else []
-        assert_equal(len(member), 2)
+        assert len(member) == 2
     finally:
         # Cleanup created topics
         topic_conf.del_config(topic_arn1)
@@ -4234,11 +4222,11 @@ def test_list_topics():
         tenant_topic_conf.del_config(tenant_topic_arn1)
         tenant_topic_conf.del_config(tenant_topic_arn2)
 
-@attr('basic_test')
+@pytest.mark.basic_test
 def test_list_topics_v1():
     """ test list topics on v1"""
     if get_config_cluster() == 'noname':
-        return SkipTest('realm is needed')
+        pytest.skip('realm is needed')
     
     # Initialize connections and configurations
     conn1 = connection()
@@ -4280,21 +4268,21 @@ def test_list_topics_v1():
     try:
         # Verify no tenant topics
         res, status = topic_conf.get_list()
-        assert_equal(status // 100, 2)
+        assert status // 100 == 2
         listTopicsResponse = res.get('ListTopicsResponse', {})
         listTopicsResult = listTopicsResponse.get('ListTopicsResult', {})
         topics = listTopicsResult.get('Topics', {})
         member = topics['member'] if topics else []
-        assert_equal(len(member), 3)
+        assert len(member) == 3
         
         # Verify tenant topics
         res, status = tenant_topic_conf.get_list()
-        assert_equal(status // 100, 2)
+        assert status // 100 == 2
         listTopicsResponse = res.get('ListTopicsResponse', {})
         listTopicsResult = listTopicsResponse.get('ListTopicsResult', {})
         topics = listTopicsResult.get('Topics', {})
         member = topics['member'] if topics else []
-        assert_equal(len(member), 2)
+        assert len(member) == 2
     finally:
         # Cleanup created topics
         topic_conf.del_config(topic_arn1)
@@ -4338,16 +4326,16 @@ def topic_permissions(another_tenant=""):
             assert False, "'AuthorizationError' error is expected"
         except ClientError as err:
             if 'Error' in err.response:
-                assert_equal(err.response['Error']['Code'], 'AuthorizationError')
+                assert err.response['Error']['Code'] == 'AuthorizationError'
             else:
-                assert_equal(err.response['Code'], 'AuthorizationError')
+                assert err.response['Code'] == 'AuthorizationError'
         except Exception as err:
             print('unexpected error type: '+type(err).__name__)
             assert False, "'AuthorizationError' error is expected"
 
     # 2nd user tries to fetch the topic
     _, status = topic_conf2.get_config(topic_arn=topic_arn)
-    assert_equal(status, 403)
+    assert status == 403
 
     try:
         # 2nd user tries to set the attribute
@@ -4355,9 +4343,9 @@ def topic_permissions(another_tenant=""):
         assert False, "'AuthorizationError' error is expected"
     except ClientError as err:
         if 'Error' in err.response:
-            assert_equal(err.response['Error']['Code'], 'AuthorizationError')
+            assert err.response['Error']['Code'] == 'AuthorizationError'
         else:
-            assert_equal(err.response['Code'], 'AuthorizationError')
+            assert err.response['Code'] == 'AuthorizationError'
     except Exception as err:
         print('unexpected error type: '+type(err).__name__)
         assert False, "'AuthorizationError' error is expected"
@@ -4374,9 +4362,9 @@ def topic_permissions(another_tenant=""):
         assert False, "'AccessDenied' error is expected"
     except ClientError as err:
         if 'Error' in err.response:
-            assert_equal(err.response['Error']['Code'], 'AccessDenied')
+            assert err.response['Error']['Code'] == 'AccessDenied'
         else:
-            assert_equal(err.response['Code'], 'AccessDenied')
+            assert err.response['Code'] == 'AccessDenied'
     except Exception as err:
         print('unexpected error type: '+type(err).__name__)
         assert False, "'AuthorizationError' error is expected"
@@ -4387,9 +4375,9 @@ def topic_permissions(another_tenant=""):
         assert False, "'AuthorizationError' error is expected"
     except ClientError as err:
         if 'Error' in err.response:
-            assert_equal(err.response['Error']['Code'], 'AuthorizationError')
+            assert err.response['Error']['Code'] == 'AuthorizationError'
         else:
-            assert_equal(err.response['Code'], 'AuthorizationError')
+            assert err.response['Code'] == 'AuthorizationError'
     except Exception as err:
         print('unexpected error type: '+type(err).__name__)
         assert False, "'AuthorizationError' error is expected"
@@ -4400,17 +4388,17 @@ def topic_permissions(another_tenant=""):
     topic_arn = topic_conf.set_config()
     # 2nd user try to fetch topic again
     _, status = topic_conf2.get_config(topic_arn=topic_arn)
-    assert_equal(status, 200)
+    assert status == 200
     # 2nd user tries to set the attribute again
     status = topic_conf2.set_attributes(attribute_name="persistent", attribute_val="false", topic_arn=topic_arn)
-    assert_equal(status, 200)
+    assert status == 200
     # 2nd user tries to publish notification again
     s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list)
     _, status = s3_notification_conf2.set_config()
-    assert_equal(status, 200)
+    assert status == 200
     # 2nd user tries to delete the topic again
     status = topic_conf2.del_config(topic_arn=topic_arn)
-    assert_equal(status, 200)
+    assert status == 200
 
     # cleanup
     s3_notification_conf2.del_config()
@@ -4418,17 +4406,17 @@ def topic_permissions(another_tenant=""):
     conn2.delete_bucket(bucket_name)
 
 
-@attr('basic_test')
+@pytest.mark.basic_test
 def test_topic_permissions_same_tenant():
     topic_permissions()
 
 
-@attr('basic_test')
+@pytest.mark.basic_test
 def test_topic_permissions_cross_tenant():
     topic_permissions(another_tenant="boom")
 
 
-@attr('basic_test')
+@pytest.mark.basic_test
 def test_topic_no_permissions():
     """ test topic set/get/delete permissions """
     conn1 = connection()
@@ -4450,15 +4438,15 @@ def test_topic_no_permissions():
         assert False, "'AuthorizationError' error is expected"
     except ClientError as err:
         if 'Error' in err.response:
-            assert_equal(err.response['Error']['Code'], 'AuthorizationError')
+            assert err.response['Error']['Code'] == 'AuthorizationError'
         else:
-            assert_equal(err.response['Code'], 'AuthorizationError')
+            assert err.response['Code'] == 'AuthorizationError'
     except Exception as err:
         print('unexpected error type: '+type(err).__name__)
 
     # 2nd user tries to fetch the topic
     _, status = topic_conf2.get_config(topic_arn=topic_arn)
-    assert_equal(status, 403)
+    assert status == 403
 
     try:
         # 2nd user tries to set the attribute
@@ -4466,9 +4454,9 @@ def test_topic_no_permissions():
         assert False, "'AuthorizationError' error is expected"
     except ClientError as err:
         if 'Error' in err.response:
-            assert_equal(err.response['Error']['Code'], 'AuthorizationError')
+            assert err.response['Error']['Code'] == 'AuthorizationError'
         else:
-            assert_equal(err.response['Code'], 'AuthorizationError')
+            assert err.response['Code'] == 'AuthorizationError'
     except Exception as err:
         print('unexpected error type: '+type(err).__name__)
 
@@ -4481,7 +4469,7 @@ def test_topic_no_permissions():
                        }]
     s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list)
     _, status = s3_notification_conf2.set_config()
-    assert_equal(status, 200)
+    assert status == 200
 
     try:
         # 2nd user tries to delete the topic
@@ -4489,9 +4477,9 @@ def test_topic_no_permissions():
         assert False, "'AuthorizationError' error is expected"
     except ClientError as err:
         if 'Error' in err.response:
-            assert_equal(err.response['Error']['Code'], 'AuthorizationError')
+            assert err.response['Error']['Code'] == 'AuthorizationError'
         else:
-            assert_equal(err.response['Code'], 'AuthorizationError')
+            assert err.response['Code'] == 'AuthorizationError'
     except Exception as err:
         print('unexpected error type: '+type(err).__name__)
 
@@ -4594,41 +4582,41 @@ def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=F
         stop_kafka_receiver(receiver, task)
 
 
-@attr('kafka_security_test')
+@pytest.mark.kafka_security_test
 def test_notification_kafka_security_ssl():
     kafka_security('SSL')
 
 
-@attr('kafka_security_test')
+@pytest.mark.kafka_security_test
 def test_notification_kafka_security_ssl_sasl():
     kafka_security('SASL_SSL')
 
 
-@attr('kafka_security_test')
+@pytest.mark.kafka_security_test
 def test_notification_kafka_security_ssl_sasl_attrs():
     kafka_security('SASL_SSL', use_topic_attrs_for_creds=True)
 
 
-@attr('kafka_security_test')
+@pytest.mark.kafka_security_test
 def test_notification_kafka_security_sasl():
     kafka_security('SASL_PLAINTEXT')
 
 
-@attr('kafka_security_test')
+@pytest.mark.kafka_security_test
 def test_notification_kafka_security_ssl_sasl_scram():
     kafka_security('SASL_SSL', mechanism='SCRAM-SHA-256')
 
 
-@attr('kafka_security_test')
+@pytest.mark.kafka_security_test
 def test_notification_kafka_security_sasl_scram():
     kafka_security('SASL_PLAINTEXT', mechanism='SCRAM-SHA-256')
 
 
-@attr('http_test')
+@pytest.mark.http_test
 def test_persistent_reload():
     """ do a realm reload while we send notifications """
     if get_config_cluster() == 'noname':
-        return SkipTest('realm is needed for reload test')
+        pytest.skip('realm is needed for reload test')
 
     conn = connection()
     zonegroup = get_config_zonegroup()
@@ -4670,7 +4658,7 @@ def test_persistent_reload():
 
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
+    assert status/100 == 2
 
     # topic stats
     get_stats_persistent_topic(topic_name1, 0)
@@ -4749,7 +4737,7 @@ def poll_on_topic(topic_name, tenant=''):
 def persistent_data_path_v2_migration(conn, endpoint_type):
     """ test data path v2 persistent migration """
     if get_config_cluster() == 'noname':
-        return SkipTest('realm is needed for migration test')
+        pytest.skip('realm is needed for migration test')
     zonegroup = get_config_zonegroup()
 
     # disable v2 notification
@@ -4788,7 +4776,7 @@ def persistent_data_path_v2_migration(conn, endpoint_type):
         endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
                         '&retry_sleep_duration=1'
     else:
-        return SkipTest('Unknown endpoint type: ' + endpoint_type)
+        pytest.skip('Unknown endpoint type: ' + endpoint_type)
 
     topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
@@ -4800,7 +4788,7 @@ def persistent_data_path_v2_migration(conn, endpoint_type):
 
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
+    assert status/100 == 2
 
     # topic stats
     get_stats_persistent_topic(topic_name, 0)
@@ -4877,25 +4865,25 @@ def persistent_data_path_v2_migration(conn, endpoint_type):
         receiver.close(task)
 
 
-@attr('http_test')
+@pytest.mark.http_test
 def persistent_data_path_v2_migration_http():
     """ test data path v2 persistent migration, http endpoint """
     conn = connection()
     persistent_data_path_v2_migration(conn, 'http')
 
 
-@attr('kafka_test')
+@pytest.mark.kafka_test
 def persistent_data_path_v2_migration_kafka():
     """ test data path v2 persistent migration, kafka endpoint """
     conn = connection()
     persistent_data_path_v2_migration(conn, 'kafka')
 
 
-@attr('http_test')
+@pytest.mark.http_test
 def test_data_path_v2_migration():
     """ test data path v2 migration """
     if get_config_cluster() == 'noname':
-        return SkipTest('realm is needed for migration test')
+        pytest.skip('realm is needed for migration test')
     conn = connection()
     zonegroup = get_config_zonegroup()
 
@@ -4927,7 +4915,7 @@ def test_data_path_v2_migration():
 
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
+    assert status/100 == 2
 
     # create objects in the bucket (async)
     number_of_objects = 10
@@ -4989,11 +4977,11 @@ def test_data_path_v2_migration():
     http_server.close()
 
 
-@attr('basic_test')
+@pytest.mark.basic_test
 def test_data_path_v2_large_migration():
     """ test data path v2 large migration """
     if get_config_cluster() == 'noname':
-        return SkipTest('realm is needed for migration test')
+        pytest.skip('realm is needed for migration test')
     conn = connection()
     connections_list = []
     connections_list.append(conn)
@@ -5045,7 +5033,7 @@ def test_data_path_v2_large_migration():
         s3_notification_conf = PSNotificationS3(conn, bucket_name, s3_notification_list)
         s3_notification_conf_list.append(s3_notification_conf)
         response, status = s3_notification_conf.set_config()
-        assert_equal(status / 100, 2)
+        assert status / 100 == 2
 
     # create topic to poll on
     polling_topics_conf = []
@@ -5082,11 +5070,11 @@ def test_data_path_v2_large_migration():
         conn.delete_bucket(bucket.name)
 
 
-@attr('basic_test')
+@pytest.mark.basic_test
 def test_data_path_v2_mixed_migration():
     """ test data path v2 mixed migration """
     if get_config_cluster() == 'noname':
-        return SkipTest('realm is needed for migration test')
+        pytest.skip('realm is needed for migration test')
     conn = connection()
     connections_list = []
     connections_list.append(conn)
@@ -5139,7 +5127,7 @@ def test_data_path_v2_mixed_migration():
         s3_notification_conf = PSNotificationS3(conn, bucket_name, s3_notification_list)
         s3_notification_conf_list.append(s3_notification_conf)
         response, status = s3_notification_conf.set_config()
-        assert_equal(status / 100, 2)
+        assert status / 100 == 2
 
     # disable v2 notification
     zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2)
@@ -5165,7 +5153,7 @@ def test_data_path_v2_mixed_migration():
         s3_notification_conf = PSNotificationS3(conn, bucket_name, s3_notification_list)
         s3_notification_conf_list.append(s3_notification_conf)
         response, status = s3_notification_conf.set_config()
-        assert_equal(status / 100, 2)
+        assert status / 100 == 2
 
     # create topic to poll on
     polling_topics_conf = []
@@ -5199,7 +5187,7 @@ def test_data_path_v2_mixed_migration():
         conn.delete_bucket(bucket.name)
 
 
-@attr('kafka_test')
+@pytest.mark.kafka_test
 def test_notification_caching():
     """ test notification caching """
     conn = connection()
@@ -5229,7 +5217,7 @@ def test_notification_caching():
 
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
-    assert_equal(status / 100, 2)
+    assert status / 100 == 2
 
     # create objects in the bucket (async)
     number_of_objects = 10
@@ -5265,9 +5253,9 @@ def test_notification_caching():
     # topic stats
     result = admin(['topic', 'stats', '--topic', topic_name],
                    get_config_cluster())
-    assert_equal(result[1], 0)
+    assert result[1] == 0
     parsed_result = json.loads(result[0])
-    assert_equal(parsed_result['Topic Stats']['Entries'], 2 * number_of_objects)
+    assert parsed_result['Topic Stats']['Entries'] == 2 * number_of_objects
 
     # remove the port and update the topic, so its pointing to correct endpoint.
     endpoint_address = 'kafka://' + default_kafka_server
@@ -5286,7 +5274,7 @@ def test_notification_caching():
     receiver.close(task)
 
 
-@attr('kafka_test')
+@pytest.mark.kafka_test
 def test_connection_caching():
     """ test connection caching """
     conn = connection()
@@ -5324,7 +5312,7 @@ def test_connection_caching():
 
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
-    assert_equal(status / 100, 2)
+    assert status / 100 == 2
 
     # create objects in the bucket (async)
     number_of_objects = 10
@@ -5360,9 +5348,9 @@ def test_connection_caching():
     # topic stats
     result = admin(['topic', 'stats', '--topic', topic_name_1],
                    get_config_cluster())
-    assert_equal(result[1], 0)
+    assert result[1] == 0
     parsed_result = json.loads(result[0])
-    assert_equal(parsed_result['Topic Stats']['Entries'], 2 * number_of_objects)
+    assert parsed_result['Topic Stats']['Entries'] == 2 * number_of_objects
 
     # remove the ssl from topic1 and update the topic.
     endpoint_address = 'kafka://' + default_kafka_server
@@ -5374,9 +5362,9 @@ def test_connection_caching():
     # topic stats for 2nd topic will still reflect entries
     result = admin(['topic', 'stats', '--topic', topic_name_2],
                    get_config_cluster())
-    assert_equal(result[1], 0)
+    assert result[1] == 0
     parsed_result = json.loads(result[0])
-    assert_equal(parsed_result['Topic Stats']['Entries'], 2 * number_of_objects)
+    assert parsed_result['Topic Stats']['Entries'] == 2 * number_of_objects
 
     # cleanup
     s3_notification_conf.del_config()
@@ -5388,7 +5376,7 @@ def test_connection_caching():
     receiver_2.close(task_2)
 
 
-@attr("http_test")
+@pytest.mark.http_test
 def test_topic_migration_to_an_account():
     """test the topic migration procedure described at
     https://docs.ceph.com/en/latest/radosgw/account/#migrate-an-existing-user-into-an-account
@@ -5433,7 +5421,7 @@ def test_topic_migration_to_an_account():
             user1_conn, topic_name, zonegroup, endpoint_args=endpoint_args
         )
         topic_arn = topic_conf.set_config()
-        assert_equal(topic_arn, expected_topic_arn)
+        assert topic_arn == expected_topic_arn
         log.info(
             f"{user1_conn.access_key} created the topic {topic_arn} with args {endpoint_args}"
         )
@@ -5454,18 +5442,18 @@ def test_topic_migration_to_an_account():
             user2_conn, user2_bucket_name, topic_conf_list
         )
         _, status = s3_notification_conf1.set_config()
-        assert_equal(status / 100, 2)
+        assert status / 100 == 2
         _, status = s3_notification_conf2.set_config()
-        assert_equal(status / 100, 2)
+        assert status / 100 == 2
         # verify both buckets are subscribed to the topic
         rgw_topic_entry = [
             t for t in list_topics() if t["name"] == topic_name
         ]
-        assert_equal(len(rgw_topic_entry), 1)
+        assert len(rgw_topic_entry) == 1
         subscribed_buckets = rgw_topic_entry[0]["subscribed_buckets"]
-        assert_equal(len(subscribed_buckets), 2)
-        assert_in(user1_bucket_name, subscribed_buckets)
-        assert_in(user2_bucket_name, subscribed_buckets)
+        assert len(subscribed_buckets) == 2
+        assert user1_bucket_name in subscribed_buckets
+        assert user2_bucket_name in subscribed_buckets
         log.info(
             "buckets {user1_bucket_name} and {user2_bucket_name} are subscribed to {topic_arn}"
         )
@@ -5507,7 +5495,7 @@ def test_topic_migration_to_an_account():
             user1_conn, topic_name, zonegroup, endpoint_args=endpoint_args
         )
         account_topic_arn = topic_conf.set_config()
-        assert_equal(account_topic_arn, expected_topic_arn)
+        assert account_topic_arn == expected_topic_arn
         log.info(
             f"{user1_conn.access_key} created the account topic {account_topic_arn} with args {endpoint_args}"
         )
@@ -5533,23 +5521,23 @@ def test_topic_migration_to_an_account():
             user1_conn, user1_bucket_name, topic_conf_list
         )
         _, status = s3_notification_conf1.set_config()
-        assert_equal(status / 100, 2)
+        assert status / 100 == 2
         # verify subscriptions to the account topic
         rgw_topic_entry = [
             t
             for t in list_topics(tenant=account_id)
             if t["name"] == topic_name
         ]
-        assert_equal(len(rgw_topic_entry), 1)
+        assert len(rgw_topic_entry) == 1
         subscribed_buckets = rgw_topic_entry[0]["subscribed_buckets"]
-        assert_equal(len(subscribed_buckets), 1)
-        assert_in(user1_bucket_name, subscribed_buckets)
-        assert_not_in(user2_bucket_name, subscribed_buckets)
+        assert len(subscribed_buckets) == 1
+        assert user1_bucket_name in subscribed_buckets
+        assert user2_bucket_name not in subscribed_buckets
         # verify bucket notifications while 2 test buckets are in the mixed mode
         notification_list = list_notifications(user1_bucket_name, assert_len=1)
-        assert_equal(notification_list["notifications"][0]["TopicArn"], account_topic_arn)
+        assert notification_list["notifications"][0]["TopicArn"] == account_topic_arn
         notification_list = list_notifications(user2_bucket_name, assert_len=1)
-        assert_equal(notification_list["notifications"][0]["TopicArn"], topic_arn)
+        assert notification_list["notifications"][0]["TopicArn"] == topic_arn
 
         # verify both topics are functional at the same time with no duplicate notifications
         user1_bucket.new_key("user1obj1").set_contents_from_string("object content")
@@ -5565,12 +5553,12 @@ def test_topic_migration_to_an_account():
             user2_conn, user2_bucket_name, topic_conf_list
         )
         _, status = s3_notification_conf2.set_config()
-        assert_equal(status / 100, 2)
+        assert status / 100 == 2
         # remove old topic
         # note that, although account topic has the same name, it has to be scoped by account/tenant id to be removed
         # so below command will only remove the old topic
         _, rc = admin(["topic", "rm", "--topic", topic_name], get_config_cluster())
-        assert_equal(rc, 0)
+        assert rc == 0
 
         # now verify account topic serves both buckets
         rgw_topic_entry = [
@@ -5578,16 +5566,16 @@ def test_topic_migration_to_an_account():
             for t in list_topics(tenant=account_id)
             if t["name"] == topic_name
         ]
-        assert_equal(len(rgw_topic_entry), 1)
+        assert len(rgw_topic_entry) == 1
         subscribed_buckets = rgw_topic_entry[0]["subscribed_buckets"]
-        assert_equal(len(subscribed_buckets), 2)
-        assert_in(user1_bucket_name, subscribed_buckets)
-        assert_in(user2_bucket_name, subscribed_buckets)
+        assert len(subscribed_buckets) == 2
+        assert user1_bucket_name in subscribed_buckets
+        assert user2_bucket_name in subscribed_buckets
         # verify bucket notifications after 2 test buckets are updated to use the account topic
         notification_list = list_notifications(user1_bucket_name, assert_len=1)
-        assert_equal(notification_list["notifications"][0]["TopicArn"], account_topic_arn)
+        assert notification_list["notifications"][0]["TopicArn"] == account_topic_arn
         notification_list = list_notifications(user2_bucket_name, assert_len=1)
-        assert_equal(notification_list["notifications"][0]["TopicArn"], account_topic_arn)
+        assert notification_list["notifications"][0]["TopicArn"] == account_topic_arn
 
         # finally, make sure that notifications are going thru via the new account topic
         user1_bucket.new_key("user1obj1").set_contents_from_string("object content")
@@ -5645,7 +5633,7 @@ def persistent_notification_shard_config_change(endpoint_type, conn, new_num_sha
         endpoint_address = 'kafka://' + host
         endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'
     else:
-        return SkipTest('Unknown endpoint type: ' + endpoint_type)
+        pytest.skip('Unknown endpoint type: ' + endpoint_type)
 
     zonegroup = get_config_zonegroup()
     topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
@@ -5659,7 +5647,7 @@ def persistent_notification_shard_config_change(endpoint_type, conn, new_num_sha
 
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
     _, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
+    assert status/100 == 2
 
     ## create objects in the bucket (async)
     expected_keys = []
@@ -5692,7 +5680,7 @@ def create_object_and_verify_events(bucket, key_name, topic_name, receiver, expe
     print('wait for the messages...')
     wait_for_queue_to_drain(topic_name)
     events = receiver.get_and_reset_events()
-    assert_equal(len(events), len(expected_keys))
+    assert len(events) == len(expected_keys)
     for event in events:
         assert(event['Records'][0]['s3']['object']['key'] in expected_keys)
 
@@ -5704,28 +5692,28 @@ def create_object_and_verify_events(bucket, key_name, topic_name, receiver, expe
         wait_for_queue_to_drain(topic_name)
         # check endpoint receiver
         events = receiver.get_and_reset_events()
-        assert_equal(len(events), len(expected_keys))
+        assert len(events) == len(expected_keys)
         for event in events:
             assert(event['Records'][0]['s3']['object']['key'] in expected_keys)
 
-@attr('http_test')
+@pytest.mark.http_test
 def test_backward_compatibility_persistent_sharded_topic_http(): 
     conn = connection()
     persistent_notification_shard_config_change('http', conn, new_num_shards=11, old_num_shards=1)
 
-@attr('kafka_test')
+@pytest.mark.kafka_test
 def test_backward_compatibility_persistent_sharded_topic_kafka(): 
     conn = connection()
     persistent_notification_shard_config_change('kafka', conn, new_num_shards=11, old_num_shards=1)
 
-@attr('http_test')
+@pytest.mark.http_test
 def test_persistent_sharded_topic_config_change_http():
     conn = connection()
     new_num_shards = random.randint(2, 10)
     default_num_shards = 11
     persistent_notification_shard_config_change('http', conn, new_num_shards, default_num_shards)
 
-@attr('kafka_test')
+@pytest.mark.kafka_test
 def test_persistent_sharded_topic_config_change_kafka():
     conn = connection()
     new_num_shards = random.randint(2, 10)
diff --git a/src/test/rgw/bucket_notification/tox.ini b/src/test/rgw/bucket_notification/tox.ini
new file mode 100644 (file)
index 0000000..954d550
--- /dev/null
@@ -0,0 +1,9 @@
+[tox]
+envlist = py
+skipsdist = True
+
+[testenv]
+deps = -rrequirements.txt
+passenv =
+  BNTESTS_CONF
+commands = pytest {posargs}