From 0d8a70b6d6c1278a207a4910109cb37fa785c56f Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Thu, 19 Feb 2026 16:55:24 +0000 Subject: [PATCH] test/rgw/notifications: migrate from nose to pytest Fixes: https://tracker.ceph.com/issues/74573 Signed-off-by: Yuval Lifshitz Co-Authored-By: Claude --- qa/tasks/notification_tests.py | 8 +- src/test/rgw/bucket_notification/README.rst | 8 +- src/test/rgw/bucket_notification/__init__.py | 5 + src/test/rgw/bucket_notification/bootstrap | 12 - src/test/rgw/bucket_notification/pytest.ini | 11 + .../rgw/bucket_notification/requirements.txt | 2 +- src/test/rgw/bucket_notification/setup.py | 19 - src/test/rgw/bucket_notification/test_bn.py | 630 +++++++++--------- src/test/rgw/bucket_notification/tox.ini | 9 + 9 files changed, 343 insertions(+), 361 deletions(-) create mode 100644 src/test/rgw/bucket_notification/pytest.ini delete mode 100644 src/test/rgw/bucket_notification/setup.py create mode 100644 src/test/rgw/bucket_notification/tox.ini diff --git a/qa/tasks/notification_tests.py b/qa/tasks/notification_tests.py index cc84b1575a5..a4b0fab97ad 100644 --- a/qa/tasks/notification_tests.py +++ b/qa/tasks/notification_tests.py @@ -220,19 +220,19 @@ def run_tests(ctx, config): for client, client_config in config.items(): (remote,) = ctx.cluster.only(client).remotes.keys() - attr = ["basic_test"] + markers = ["basic_test"] if 'extra_attr' in client_config: - attr = client_config.get('extra_attr') + markers = client_config.get('extra_attr') args = [ 'BNTESTS_CONF={tdir}/ceph/src/test/rgw/bucket_notification/bn-tests.{client}.conf'.format(tdir=testdir, client=client), '{tdir}/ceph/src/test/rgw/bucket_notification/virtualenv/bin/python'.format(tdir=testdir), - '-m', 'nose', + '-m', 'pytest', '-s', '{tdir}/ceph/src/test/rgw/bucket_notification/test_bn.py'.format(tdir=testdir), '-v', - '-a', ','.join(attr), + '-m', ' or '.join(markers), ] remote.run( diff --git a/src/test/rgw/bucket_notification/README.rst b/src/test/rgw/bucket_notification/README.rst index 02d42a2abd6..db58e8cfcc7 100644 --- a/src/test/rgw/bucket_notification/README.rst +++ b/src/test/rgw/bucket_notification/README.rst @@ -57,7 +57,7 @@ and:: After running `vstart.sh`, Zookeeper, and Kafka services you're ready to run the Kafka tests:: - BNTESTS_CONF=bntests.conf python -m nose -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -a 'kafka_test' + BNTESTS_CONF=bntests.conf python -m pytest -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -m 'kafka_test' -------------------- Kafka Security Tests @@ -120,7 +120,7 @@ And restart the Kafka server. Once both Zookeeper and Kafka are up, run the foll To run the Kafka security test, you also need to provide the test with the location of the Kafka directory:: - KAFKA_DIR=/path/to/kafka BNTESTS_CONF=bntests.conf python -m nose -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -a 'kafka_security_test' + KAFKA_DIR=/path/to/kafka BNTESTS_CONF=bntests.conf python -m pytest -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -m 'kafka_security_test' ============== RabbitMQ Tests @@ -148,7 +148,7 @@ To confirm that the RabbitMQ server is running you can run the following command After running `vstart.sh` and RabbitMQ server you're ready to run the AMQP tests:: - BNTESTS_CONF=bntests.conf python -m nose -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -a 'amqp_test' + BNTESTS_CONF=bntests.conf python -m pytest -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -m 'amqp_test' After running the tests you need to stop the vstart cluster (``/path/to/ceph/src/stop.sh``) and the RabbitMQ server by running the following command:: @@ -156,7 +156,7 @@ After running the tests you need to stop the vstart cluster (``/path/to/ceph/src To run the RabbitMQ SSL security tests use the following:: - BNTESTS_CONF=bntests.conf python -m nose -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -a 'amqp_ssl_test' + BNTESTS_CONF=bntests.conf python -m pytest -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -m 'amqp_ssl_test' During these tests, the test script will restart the RabbitMQ server with the correct security configuration (``sudo`` privileges will be needed). For that reason it is not recommended to run the `amqp_ssl_test` tests, that assumes a manually configured rabbirmq server, in the same run as `amqp_test` tests, diff --git a/src/test/rgw/bucket_notification/__init__.py b/src/test/rgw/bucket_notification/__init__.py index 97d6cf3c5a2..954c6575bb6 100644 --- a/src/test/rgw/bucket_notification/__init__.py +++ b/src/test/rgw/bucket_notification/__init__.py @@ -1,5 +1,6 @@ import configparser import os +import pytest from .api import admin def setup(): @@ -72,3 +73,7 @@ def get_access_key(): def get_secret_key(): global main_secret_key return main_secret_key + +@pytest.fixture(autouse=True, scope="package") +def configfile(): + setup() diff --git a/src/test/rgw/bucket_notification/bootstrap b/src/test/rgw/bucket_notification/bootstrap index 4d4a5a74878..4d680c86d2f 100755 --- a/src/test/rgw/bucket_notification/bootstrap +++ b/src/test/rgw/bucket_notification/bootstrap @@ -29,17 +29,5 @@ python3 -m venv --system-site-packages virtualenv # avoid pip bugs ./virtualenv/bin/pip install --upgrade pip -#pip3 install --upgrade setuptools cffi # address pip issue: https://github.com/pypa/pip/issues/6264 - -# work-around change in pip 1.5 -#./virtualenv/bin/pip install six -#./virtualenv/bin/pip install -I nose -#./virtualenv/bin/pip install setuptools ./virtualenv/bin/pip install -U -r requirements.txt - -# forbid setuptools from using the network because it'll try to use -# easy_install, and we really wanted pip; next line will fail if pip -# requirements.txt does not match setup.py requirements -- sucky but -# good enough for now -./virtualenv/bin/python setup.py develop diff --git a/src/test/rgw/bucket_notification/pytest.ini b/src/test/rgw/bucket_notification/pytest.ini new file mode 100644 index 00000000000..9d80d738781 --- /dev/null +++ b/src/test/rgw/bucket_notification/pytest.ini @@ -0,0 +1,11 @@ +[pytest] +markers = + basic_test + http_test + kafka_test + amqp_test + kafka_failover + kafka_security_test + amqp_ssl_test + manual_test + not_implemented diff --git a/src/test/rgw/bucket_notification/requirements.txt b/src/test/rgw/bucket_notification/requirements.txt index d57533fbd87..14885b11f16 100644 --- a/src/test/rgw/bucket_notification/requirements.txt +++ b/src/test/rgw/bucket_notification/requirements.txt @@ -1,4 +1,4 @@ -nose-py3 >=1.0.0 +pytest boto3 >=1.0.0 configparser >=5.0.0 kafka-python >=2.0.0 diff --git a/src/test/rgw/bucket_notification/setup.py b/src/test/rgw/bucket_notification/setup.py deleted file mode 100644 index 189ab27b4f4..00000000000 --- a/src/test/rgw/bucket_notification/setup.py +++ /dev/null @@ -1,19 +0,0 @@ -#!/usr/bin/python -from setuptools import setup, find_packages - -setup( - name='bn_tests', - version='0.0.1', - packages=find_packages(), - - author='Kalpesh Pandya', - author_email='kapandya@redhat.com', - description='Bucket Notification compatibility tests', - license='MIT', - keywords='bn web testing', - - install_requires=[ - 'boto >=2.0b4', - 'boto3 >=1.0.0' - ], - ) diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index 81879ef76f0..830b6e48ffe 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -16,8 +16,7 @@ from botocore.exceptions import ClientError from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler from random import randint import hashlib -# XXX this should be converted to use pytest -from nose.plugins.attrib import attr +import pytest import boto3 import datetime from cloudevents.http import from_http @@ -25,6 +24,7 @@ from dateutil import parser import requests from . import( + configfile, get_config_host, get_config_port, get_config_zonegroup, @@ -46,8 +46,6 @@ from .api import PSTopicS3, \ S3Key, \ MultipartUpload -from nose import SkipTest -from nose.tools import assert_not_equal, assert_equal, assert_in, assert_not_in, assert_true # configure logging for the tests module class LogWrapper: @@ -124,13 +122,13 @@ def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=Fa if key_found: break for record in record_list['Records']: - assert_in('eTag', record['s3']['object']) + assert 'eTag' in record['s3']['object'] if record['s3']['bucket']['name'] == key.bucket.name and \ record['s3']['object']['key'] == key.name: # Assertion Error needs to be fixed - #assert_equal(key.etag[1:-1], record['s3']['object']['eTag']) + #assert key.etag[1:-1] == record['s3']['object']['eTag'] if etags: - assert_in(key.etag[1:-1], etags) + assert key.etag[1:-1] in etags if len(record['s3']['object']['metadata']) > 0: for meta in record['s3']['object']['metadata']: assert(meta['key'].startswith(META_PREFIX)) @@ -144,12 +142,12 @@ def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=Fa break else: for record in records['Records']: - assert_in('eTag', record['s3']['object']) + assert 'eTag' in record['s3']['object'] if record['s3']['bucket']['name'] == key.bucket.name and \ record['s3']['object']['key'] == key.name: - assert_equal(key.etag, record['s3']['object']['eTag']) + assert key.etag == record['s3']['object']['eTag'] if etags: - assert_in(key.etag[1:-1], etags) + assert key.etag[1:-1] in etags if len(record['s3']['object']['metadata']) > 0: for meta in record['s3']['object']['metadata']: assert(meta['key'].startswith(META_PREFIX)) @@ -166,7 +164,7 @@ def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=Fa err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key) assert False, err elif expected_sizes: - assert_equal(object_size, expected_sizes.get(key.name)) + assert object_size == expected_sizes.get(key.name) if not len(records) == len(keys): err = 'superfluous records are found' @@ -192,13 +190,13 @@ class HTTPPostHandler(BaseHTTPRequestHandler): if self.server.cloudevents: event = from_http(self.headers, body) record = json.loads(body)['Records'][0] - assert_equal(event['specversion'], '1.0') - assert_equal(event['id'], record['responseElements']['x-amz-request-id'] + '.' + record['responseElements']['x-amz-id-2']) - assert_equal(event['source'], 'ceph:s3.' + record['awsRegion'] + '.' + record['s3']['bucket']['name']) - assert_equal(event['type'], 'com.amazonaws.' + record['eventName']) - assert_equal(event['datacontenttype'], 'application/json') - assert_equal(event['subject'], record['s3']['object']['key']) - assert_equal(parser.parse(event['time']), parser.parse(record['eventTime'])) + assert event['specversion'] == '1.0' + assert event['id'] == record['responseElements']['x-amz-request-id'] + '.' + record['responseElements']['x-amz-id-2'] + assert event['source'] == 'ceph:s3.' + record['awsRegion'] + '.' + record['s3']['bucket']['name'] + assert event['type'] == 'com.amazonaws.' + record['eventName'] + assert event['datacontenttype'] == 'application/json' + assert event['subject'] == record['s3']['object']['key'] + assert parser.parse(event['time']) == parser.parse(record['eventTime']) log.info('HTTP Server received event: %s', str(body)) self.server.append(json.loads(body)) if self.headers.get('Expect') == '100-continue': @@ -558,8 +556,8 @@ def verify_kafka_receiver(receiver): remaining_retries -= 1 if remaining_retries == 0: raise Exception('kafka receiver on topic: %s did not receive test event in time', receiver.topic) - assert_equal(len(events), 1) - assert_in('test', events[0]) + assert len(events) == 1 + assert 'test' in events[0] log.info('Kafka receiver on topic: %s tested ok', receiver.topic) @@ -603,7 +601,7 @@ def another_user(user=None, tenant=None, account=None): arn = f'arn:aws:iam::{account}:user/Superman' _, rc = admin(cmd, get_config_cluster()) - assert_equal(rc, 0) + assert rc == 0 conn = S3Connection(aws_access_key_id=access_key, aws_secret_access_key=secret_key, @@ -616,7 +614,7 @@ def list_topics(assert_len=None, tenant=''): result = admin(['topic', 'list'], get_config_cluster()) else: result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster()) - assert_equal(result[1], 0) + assert result[1] == 0 parsed_result = json.loads(result[0]) try: actual_len = len(parsed_result['topics']) @@ -630,7 +628,7 @@ def list_topics(assert_len=None, tenant=''): def get_stats_persistent_topic(topic_name, assert_entries_number=None): result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster()) - assert_equal(result[1], 0) + assert result[1] == 0 parsed_result = json.loads(result[0]) if assert_entries_number: actual_number = parsed_result['Topic Stats']['Entries'] @@ -652,7 +650,7 @@ def get_topic(topic_name, tenant='', allow_failure=False): result = admin(['topic', 'get', '--topic', topic_name, '--tenant', tenant], get_config_cluster()) if allow_failure: return result - assert_equal(result[1], 0) + assert result[1] == 0 parsed_result = json.loads(result[0]) return parsed_result @@ -663,7 +661,7 @@ def remove_topic(topic_name, tenant='', allow_failure=False): else: result = admin(['topic', 'rm', '--topic', topic_name, '--tenant', tenant], get_config_cluster()) if not allow_failure: - assert_equal(result[1], 0) + assert result[1] == 0 return result[1] @@ -672,7 +670,7 @@ def list_notifications(bucket_name, assert_len=None, tenant=''): result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster()) else: result = admin(['notification', 'list', '--bucket', bucket_name, '--tenant', tenant], get_config_cluster()) - assert_equal(result[1], 0) + assert result[1] == 0 parsed_result = json.loads(result[0]) actual_len = len(parsed_result['notifications']) if assert_len and assert_len != actual_len: @@ -686,9 +684,9 @@ def get_notification(bucket_name, notification_name, tenant=''): result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name], get_config_cluster()) else: result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name, '--tenant', tenant], get_config_cluster()) - assert_equal(result[1], 0) + assert result[1] == 0 parsed_result = json.loads(result[0]) - assert_equal(parsed_result['Id'], notification_name) + assert parsed_result['Id'] == notification_name return parsed_result @@ -700,7 +698,7 @@ def remove_notification(bucket_name, notification_name='', tenant='', allow_fail args.extend(['--tenant', tenant]) result = admin(args, get_config_cluster()) if not allow_failure: - assert_equal(result[1], 0) + assert result[1] == 0 return result[1] @@ -713,11 +711,11 @@ def zonegroup_modify_feature(enable, feature_name): else: command = '--disable-feature='+feature_name result = admin(['zonegroup', 'modify', command], get_config_cluster()) - assert_equal(result[1], 0) + assert result[1] == 0 result = admin(['period', 'update'], get_config_cluster()) - assert_equal(result[1], 0) + assert result[1] == 0 result = admin(['period', 'commit'], get_config_cluster()) - assert_equal(result[1], 0) + assert result[1] == 0 def connect_random_user(tenant=''): @@ -728,7 +726,7 @@ def connect_random_user(tenant=''): _, rc = admin(['user', 'create', '--uid', uid, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster()) else: _, rc = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster()) - assert_equal(rc, 0) + assert rc == 0 conn = S3Connection(aws_access_key_id=access_key, aws_secret_access_key=secret_key, is_secure=False, port=get_config_port(), host=get_config_host()) @@ -739,7 +737,7 @@ def connect_random_user(tenant=''): ############## -@attr('basic_test') +@pytest.mark.basic_test def test_topic(): """ test topics set/get/delete """ @@ -759,51 +757,48 @@ def test_topic(): endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none' topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf1.set_config() - assert_equal(topic_arn, - 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_1') + assert topic_arn == 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_1' endpoint_address = 'http://127.0.0.1:9001' endpoint_args = 'push-endpoint='+endpoint_address topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf2.set_config() - assert_equal(topic_arn, - 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_2') + assert topic_arn == 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_2' endpoint_address = 'http://127.0.0.1:9002' endpoint_args = 'push-endpoint='+endpoint_address topic_conf3 = PSTopicS3(conn, topic_name+'_3', zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf3.set_config() - assert_equal(topic_arn, - 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3') + assert topic_arn == 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3' # get topic 3 result, status = topic_conf3.get_config() - assert_equal(status, 200) - assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn']) - assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress']) + assert status == 200 + assert topic_arn == result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'] + assert endpoint_address == result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'] # Note that endpoint args may be ordered differently in the result # delete topic 1 result = topic_conf1.del_config() - assert_equal(status, 200) + assert status == 200 # try to get a deleted topic _, status = topic_conf1.get_config() - assert_equal(status, 404) + assert status == 404 # get the remaining 2 topics list_topics(2, tenant) # delete topics status = topic_conf2.del_config() - assert_equal(status, 200) + assert status == 200 status = topic_conf3.del_config() - assert_equal(status, 200) + assert status == 200 # get topic list, make sure it is empty list_topics(0, tenant) -@attr('basic_test') +@pytest.mark.basic_test def test_topic_admin(): """ test topics set/get/delete """ @@ -823,46 +818,40 @@ def test_topic_admin(): endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none' topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args) topic_arn1 = topic_conf1.set_config() - assert_equal(topic_arn1, - 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_1') + assert topic_arn1 == 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_1' endpoint_address = 'http://127.0.0.1:9001' endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true' topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args) topic_arn2 = topic_conf2.set_config() - assert_equal(topic_arn2, - 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_2') + assert topic_arn2 == 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_2' endpoint_address = 'http://127.0.0.1:9002' endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true' topic_conf3 = PSTopicS3(conn, topic_name+'_3', zonegroup, endpoint_args=endpoint_args) topic_arn3 = topic_conf3.set_config() - assert_equal(topic_arn3, - 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3') + assert topic_arn3 == 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3' # get topic 3 via commandline parsed_result = get_topic(topic_name+'_3', tenant) - assert_equal(parsed_result['arn'], topic_arn3) + assert parsed_result['arn'] == topic_arn3 matches = [tenant, UID_PREFIX] - assert_true( all([x in parsed_result['owner'] for x in matches])) - assert_equal(parsed_result['dest']['persistent_queue'], - tenant + ":" + topic_name + '_3') + assert all([x in parsed_result['owner'] for x in matches]) + assert parsed_result['dest']['persistent_queue'] == tenant + ":" + topic_name + '_3' # recall CreateTopic and verify the owner and persistent_queue remain same. topic_conf3 = PSTopicS3(conn, topic_name + '_3', zonegroup, endpoint_args=endpoint_args) topic_arn3 = topic_conf3.set_config() - assert_equal(topic_arn3, - 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3') + assert topic_arn3 == 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3' # get topic 3 via commandline result = admin( ['topic', 'get', '--topic', topic_name + '_3', '--tenant', tenant], get_config_cluster()) - assert_equal(result[1], 0) + assert result[1] == 0 parsed_result = json.loads(result[0]) - assert_equal(parsed_result['arn'], topic_arn3) - assert_true(all([x in parsed_result['owner'] for x in matches])) - assert_equal(parsed_result['dest']['persistent_queue'], - tenant + ":" + topic_name + '_3') + assert parsed_result['arn'] == topic_arn3 + assert all([x in parsed_result['owner'] for x in matches]) + assert parsed_result['dest']['persistent_queue'] == tenant + ":" + topic_name + '_3' # delete topic 3 remove_topic(topic_name + '_3', tenant) @@ -870,7 +859,7 @@ def test_topic_admin(): # try to get a deleted topic _, result = get_topic(topic_name + '_3', tenant, allow_failure=True) print('"topic not found" error is expected') - assert_equal(result, 2) + assert result == 2 # get the remaining 2 topics list_topics(2, tenant) @@ -899,8 +888,7 @@ def notification_configuration(with_cli): endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none' topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() - assert_equal(topic_arn, - 'arn:aws:sns:' + zonegroup + '::' + topic_name) + assert topic_arn == 'arn:aws:sns:' + zonegroup + '::' + topic_name # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [ @@ -943,45 +931,45 @@ def notification_configuration(with_cli): }] s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 # get notification 1 if with_cli: get_notification(bucket_name, notification_name+'_1') else: response, status = s3_notification_conf.get_config(notification=notification_name+'_1') - assert_equal(status/100, 2) - assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn) + assert status/100 == 2 + assert response['NotificationConfiguration']['TopicConfiguration']['Topic'] == topic_arn # list notification if with_cli: list_notifications(bucket_name, 3) else: result, status = s3_notification_conf.get_config() - assert_equal(status, 200) - assert_equal(len(result['TopicConfigurations']), 3) + assert status == 200 + assert len(result['TopicConfigurations']) == 3 # delete notification 2 if with_cli: remove_notification(bucket_name, notification_name + '_2') else: _, status = s3_notification_conf.del_config(notification=notification_name+'_2') - assert_equal(status/100, 2) + assert status/100 == 2 # list notification if with_cli: list_notifications(bucket_name, 2) else: result, status = s3_notification_conf.get_config() - assert_equal(status, 200) - assert_equal(len(result['TopicConfigurations']), 2) + assert status == 200 + assert len(result['TopicConfigurations']) == 2 # delete notifications if with_cli: remove_notification(bucket_name) else: _, status = s3_notification_conf.del_config() - assert_equal(status/100, 2) + assert status/100 == 2 # list notification, make sure it is empty list_notifications(bucket_name, 0) @@ -992,25 +980,25 @@ def notification_configuration(with_cli): conn.delete_bucket(bucket_name) -@attr('basic_test') +@pytest.mark.basic_test def test_notification_configuration_admin(): """ test notification list/set/get/delete, with admin cli """ notification_configuration(True) -@attr('not_implemented') +@pytest.mark.not_implemented def test_topic_with_secret(): """ test topics with secret set/get/delete """ - return SkipTest('This test is yet to be implemented') + pytest.skip('This test is yet to be implemented') -@attr('basic_test') +@pytest.mark.basic_test def test_notification_configuration(): """ test notification set/get/deleter """ notification_configuration(False) -@attr('basic_test') +@pytest.mark.basic_test def test_notification_empty_config(): """ test notification set/get/delete with empty config """ hostname = get_ip() @@ -1038,26 +1026,26 @@ def test_notification_empty_config(): }] s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 # get notifications on a bucket response, status = s3_notification_conf.get_config(notification=notification_name+'_1') - assert_equal(status/100, 2) - assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn) + assert status/100 == 2 + assert response['NotificationConfiguration']['TopicConfiguration']['Topic'] == topic_arn # create notification again with empty configuration to check if it deletes or not topic_conf_list = [] s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 # make sure that the notification is now deleted response, status = s3_notification_conf.get_config() try: check = response['NotificationConfiguration'] except KeyError as e: - assert_equal(status/100, 2) + assert status/100 == 2 else: assert False @@ -1067,7 +1055,7 @@ def test_notification_empty_config(): conn.delete_bucket(bucket_name) -@attr('amqp_test') +@pytest.mark.amqp_test def test_notification_filter_amqp(): """ test notification filter """ @@ -1128,7 +1116,7 @@ def test_notification_filter_amqp(): s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) result, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 topic_conf_list = [{'Id': notification_name+'_4', 'TopicArn': topic_arn, @@ -1147,7 +1135,7 @@ def test_notification_filter_amqp(): try: s3_notification_conf4 = PSNotificationS3(conn, bucket_name, topic_conf_list) _, status = s3_notification_conf4.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 skip_notif4 = False except Exception as error: print('note: metadata filter is not supported by boto3 - skipping test') @@ -1156,14 +1144,14 @@ def test_notification_filter_amqp(): # get all notifications result, status = s3_notification_conf.get_config() - assert_equal(status/100, 2) + assert status/100 == 2 for conf in result['TopicConfigurations']: filter_name = conf['Filter']['Key']['FilterRules'][0]['Name'] assert filter_name == 'prefix' or filter_name == 'suffix' or filter_name == 'regex', filter_name if not skip_notif4: result, status = s3_notification_conf4.get_config(notification=notification_name+'_4') - assert_equal(status/100, 2) + assert status/100 == 2 filter_name = result['NotificationConfiguration']['TopicConfiguration']['Filter']['S3Metadata']['FilterRule'][0]['Name'] assert filter_name == 'x-amz-meta-foo' or filter_name == 'x-amz-meta-hello' @@ -1212,9 +1200,9 @@ def test_notification_filter_amqp(): notif_id = event['Records'][0]['s3']['configurationId'] key_name = event['Records'][0]['s3']['object']['key'] awsRegion = event['Records'][0]['awsRegion'] - assert_equal(awsRegion, zonegroup) + assert awsRegion == zonegroup bucket_arn = event['Records'][0]['s3']['bucket']['arn'] - assert_equal(bucket_arn, "arn:aws:s3:"+awsRegion+"::"+bucket_name) + assert bucket_arn == "arn:aws:s3:"+awsRegion+"::"+bucket_name if notif_id == notification_name+'_1': found_in1.append(key_name) elif notif_id == notification_name+'_2': @@ -1226,11 +1214,11 @@ def test_notification_filter_amqp(): else: assert False, 'invalid notification: ' + notif_id - assert_equal(set(found_in1), set(expected_in1)) - assert_equal(set(found_in2), set(expected_in2)) - assert_equal(set(found_in3), set(expected_in3)) + assert set(found_in1) == set(expected_in1) + assert set(found_in2) == set(expected_in2) + assert set(found_in3) == set(expected_in3) if not skip_notif4: - assert_equal(set(found_in4), set(expected_in4)) + assert set(found_in4) == set(expected_in4) # cleanup s3_notification_conf.del_config() @@ -1244,7 +1232,7 @@ def test_notification_filter_amqp(): stop_amqp_receiver(receiver, task) -@attr('basic_test') +@pytest.mark.basic_test def test_notification_errors(): """ test notification set/get/delete """ conn = connection() @@ -1331,10 +1319,10 @@ def test_notification_errors(): status = topic_conf.del_config() # deleting an unknown notification is not considered an error - assert_equal(status, 200) + assert status == 200 _, status = topic_conf.get_config() - assert_equal(status, 404) + assert status == 404 # cleanup # delete the bucket @@ -1372,7 +1360,7 @@ def notification(endpoint_type, conn, account=None, cloudevents=False, kafka_bro }] s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 elif endpoint_type == 'amqp': # start amqp receiver exchange = 'ex1' @@ -1393,7 +1381,7 @@ def notification(endpoint_type, conn, account=None, cloudevents=False, kafka_bro }] s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 elif endpoint_type == 'kafka': # start kafka receiver default_kafka_server_and_port = default_kafka_server + ':9092' @@ -1418,9 +1406,9 @@ def notification(endpoint_type, conn, account=None, cloudevents=False, kafka_bro }] s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 else: - return SkipTest('Unknown endpoint type: ' + endpoint_type) + pytest.skip('Unknown endpoint type: ' + endpoint_type) # create objects in the bucket number_of_objects = 100 @@ -1478,14 +1466,14 @@ def notification(endpoint_type, conn, account=None, cloudevents=False, kafka_bro receiver.close(task) -@attr('amqp_test') +@pytest.mark.amqp_test def test_notification_amqp(): """ test pushing amqp notification """ conn = connection() notification('amqp', conn) -@attr('manual_test') +@pytest.mark.manual_test def test_notification_amqp_idleness_check(): """ test pushing amqp notification and checking for connection idleness """ hostname = get_ip() @@ -1516,7 +1504,7 @@ def test_notification_amqp_idleness_check(): s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 # create objects in the bucket (async) number_of_objects = 10 @@ -1615,28 +1603,28 @@ def test_notification_amqp_idleness_check(): conn.delete_bucket(bucket_name) -@attr('kafka_test') +@pytest.mark.kafka_test def test_notification_kafka(): """ test pushing kafka notification """ conn = connection() notification('kafka', conn) -@attr('kafka_failover') +@pytest.mark.kafka_failover def test_notification_kafka_multiple_brokers_override(): """ test pushing kafka notification """ conn = connection() notification('kafka', conn, kafka_brokers='{host}:9091,{host}:9092'.format(host=default_kafka_server)) -@attr('kafka_failover') +@pytest.mark.kafka_failover def test_notification_kafka_multiple_brokers_append(): """ test pushing kafka notification """ conn = connection() notification('kafka', conn, kafka_brokers='{host}:9091'.format(host=default_kafka_server)) -@attr('manual_test') +@pytest.mark.manual_test def test_1K_topics(): """ test creation of moe than 1K topics """ conn = connection() @@ -1739,7 +1727,7 @@ def test_1K_topics(): log.error(f"error during cleanup: {e}") -@attr('http_test') +@pytest.mark.http_test def test_notification_multi_delete(): """ test deletion of multiple keys """ hostname = get_ip() @@ -1771,7 +1759,7 @@ def test_notification_multi_delete(): }] s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 # create objects in the bucket client_threads = [] @@ -1807,14 +1795,14 @@ def test_notification_multi_delete(): http_server.close() -@attr('http_test') +@pytest.mark.http_test def test_notification_http(): """ test pushing http notification """ conn = connection() notification('http', conn) -@attr('http_test') +@pytest.mark.http_test def test_notification_cloudevents(): """ test pushing cloudevents notification """ conn = connection() @@ -1822,7 +1810,7 @@ def test_notification_cloudevents(): -@attr('http_test') +@pytest.mark.http_test def test_opaque_data(): """ test that opaque id set in topic, is sent in notification """ hostname = get_ip() @@ -1855,7 +1843,7 @@ def test_opaque_data(): }] s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 # create objects in the bucket client_threads = [] @@ -1879,7 +1867,7 @@ def test_opaque_data(): print('total number of objects: ' + str(len(keys))) events = http_server.get_and_reset_events() for event in events: - assert_equal(event['Records'][0]['opaqueData'], opaque_data) + assert event['Records'][0]['opaqueData'] == opaque_data # cleanup for key in keys: @@ -1926,7 +1914,7 @@ def lifecycle(endpoint_type, conn, number_of_objects, topic_events, create_threa endpoint_address = 'kafka://' + host endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true' else: - return SkipTest('Unknown endpoint type: ' + endpoint_type) + pytest.skip('Unknown endpoint type: ' + endpoint_type) # create topic topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) @@ -1939,7 +1927,7 @@ def lifecycle(endpoint_type, conn, number_of_objects, topic_events, create_threa }] s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 # create objects in the bucket obj_prefix = 'ooo' @@ -1981,13 +1969,13 @@ def lifecycle(endpoint_type, conn, number_of_objects, topic_events, create_threa assert len(no_keys) == 0, "lifecycle didn't delete the objects after 500 seconds" wait_for_queue_to_drain(topic_name, http_port=port) - assert_equal(len(no_keys), 0) + assert len(no_keys) == 0 event_keys = [] events = receiver.get_and_reset_events() if not expected_abortion: assert number_of_objects * 2 <= len(events) for event in events: - assert_in(event['Records'][0]['eventName'], record_events) + assert event['Records'][0]['eventName'] in record_events event_keys.append(event['Records'][0]['s3']['object']['key']) for key in keys: key_found = False @@ -2028,7 +2016,7 @@ def create_thread(bucket, obj_prefix, i, content): return threading.Thread(target = set_contents_from_string, args=(key, content,)) -@attr('http_test') +@pytest.mark.http_test def test_lifecycle_http(): """ test that when object is deleted due to lifecycle policy, http endpoint """ @@ -2037,7 +2025,7 @@ def test_lifecycle_http(): rules_creator, ['LifecycleExpiration:Delete', 'ObjectLifecycle:Expiration:Current']) -@attr('kafka_test') +@pytest.mark.kafka_test def test_lifecycle_kafka(): """ test that when object is deleted due to lifecycle policy, kafka endpoint """ @@ -2055,7 +2043,7 @@ def start_and_abandon_multipart_upload(bucket, key_name, content): print('Error: ' + str(e)) -@attr('http_test') +@pytest.mark.http_test def test_lifecycle_abort_mpu(): """ test that when a multipart upload is aborted by lifecycle policy, http endpoint """ @@ -2085,7 +2073,7 @@ def creation_triggers(external_endpoint_address=None, ca_location=None, verify_s hostname = get_ip() proc = init_rabbitmq() if proc is None: - return SkipTest('end2end amqp tests require rabbitmq-server installed') + pytest.skip('end2end amqp tests require rabbitmq-server installed') else: proc = None @@ -2122,7 +2110,7 @@ def creation_triggers(external_endpoint_address=None, ca_location=None, verify_s s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 objects_size = {} # create objects in the bucket using PUT @@ -2168,12 +2156,12 @@ def creation_triggers(external_endpoint_address=None, ca_location=None, verify_s clean_rabbitmq(proc) -@attr('amqp_test') +@pytest.mark.amqp_test def test_creation_triggers_amqp(): creation_triggers(external_endpoint_address="amqp://localhost:5672") -@attr('amqp_ssl_test') +@pytest.mark.amqp_ssl_test def test_creation_triggers_external(): from distutils.util import strtobool @@ -2191,7 +2179,7 @@ def test_creation_triggers_external(): external_endpoint_address=os.environ['AMQP_EXTERNAL_ENDPOINT'], verify_ssl=verify_ssl) else: - return SkipTest("Set AMQP_EXTERNAL_ENDPOINT to a valid external AMQP endpoint url for this test to run") + pytest.skip("Set AMQP_EXTERNAL_ENDPOINT to a valid external AMQP endpoint url for this test to run") def generate_private_key(tempdir): @@ -2285,7 +2273,7 @@ def generate_private_key(tempdir): return CACERTFILE, CERTFILE, KEYFILE -@attr('amqp_ssl_test') +@pytest.mark.amqp_ssl_test def test_creation_triggers_ssl(): import textwrap @@ -2314,7 +2302,7 @@ def test_creation_triggers_ssl(): del os.environ['RABBITMQ_CONFIG_FILE'] -@attr('amqp_test') +@pytest.mark.amqp_test def test_post_object_upload_amqp(): """ test that uploads object using HTTP POST """ @@ -2364,18 +2352,18 @@ def test_post_object_upload_amqp(): }] s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 payload = OrderedDict([("key" , "foo.txt"),("acl" , "public-read"),\ ("Content-Type" , "text/plain"),('file', ('bar'))]) # POST upload r = requests.post(url, files=payload, verify=True) - assert_equal(r.status_code, 204) + assert r.status_code == 204 # check amqp receiver events = receiver1.get_and_reset_events() - assert_equal(len(events), 1) + assert len(events) == 1 # cleanup stop_amqp_receiver(receiver1, task1) @@ -2419,7 +2407,7 @@ def multipart_endpoint_agnostic(endpoint_type, conn): endpoint_address = 'kafka://' + host endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker' else: - return SkipTest('Unknown endpoint type: ' + endpoint_type) + pytest.skip('Unknown endpoint type: ' + endpoint_type) # create topic topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) @@ -2432,7 +2420,7 @@ def multipart_endpoint_agnostic(endpoint_type, conn): s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 # create objects in the bucket client_threads = [] @@ -2461,21 +2449,21 @@ def multipart_endpoint_agnostic(endpoint_type, conn): receiver.close(task) -@attr('http_test') +@pytest.mark.http_test def test_multipart_http(): """ test http multipart object upload """ conn = connection() multipart_endpoint_agnostic('http', conn) -@attr('kafka_test') +@pytest.mark.kafka_test def test_multipart_kafka(): """ test kafka multipart object upload """ conn = connection() multipart_endpoint_agnostic('kafka', conn) -@attr('amqp_test') +@pytest.mark.amqp_test def test_multipart_ampq(): """ test ampq multipart object upload """ conn = connection() @@ -2514,7 +2502,7 @@ def metadata_filter(endpoint_type, conn): endpoint_address = 'kafka://' + host endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true' else: - return SkipTest('Unknown endpoint type: ' + endpoint_type) + pytest.skip('Unknown endpoint type: ' + endpoint_type) # create topic zonegroup = get_config_zonegroup() @@ -2535,7 +2523,7 @@ def metadata_filter(endpoint_type, conn): s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 expected_keys = [] # create objects in the bucket @@ -2578,7 +2566,7 @@ def metadata_filter(endpoint_type, conn): wait_for_queue_to_drain(topic_name, http_port=port) # check amqp receiver events = receiver.get_and_reset_events() - assert_equal(len(events), len(expected_keys)) + assert len(events) == len(expected_keys) for event in events: assert(event['Records'][0]['s3']['object']['key'] in expected_keys) @@ -2589,7 +2577,7 @@ def metadata_filter(endpoint_type, conn): wait_for_queue_to_drain(topic_name, http_port=port) # check endpoint receiver events = receiver.get_and_reset_events() - assert_equal(len(events), len(expected_keys)) + assert len(events) == len(expected_keys) for event in events: assert(event['Records'][0]['s3']['object']['key'] in expected_keys) @@ -2601,28 +2589,28 @@ def metadata_filter(endpoint_type, conn): conn.delete_bucket(bucket_name) -@attr('kafka_test') +@pytest.mark.kafka_test def test_metadata_filter_kafka(): """ test notification of filtering metadata, kafka endpoint """ conn = connection() metadata_filter('kafka', conn) -@attr('http_test') +@pytest.mark.http_test def test_metadata_filter_http(): """ test notification of filtering metadata, http endpoint """ conn = connection() metadata_filter('http', conn) -@attr('amqp_test') +@pytest.mark.amqp_test def test_metadata_filter_ampq(): """ test notification of filtering metadata, ampq endpoint """ conn = connection() metadata_filter('amqp', conn) -@attr('amqp_test') +@pytest.mark.amqp_test def test_metadata_amqp(): """ test notification of metadata """ @@ -2656,7 +2644,7 @@ def test_metadata_amqp(): s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 # create objects in the bucket key_name = 'foo' @@ -2695,7 +2683,7 @@ def test_metadata_amqp(): events = receiver.get_and_reset_events() for event in events: value = [x['val'] for x in event['Records'][0]['s3']['object']['metadata'] if x['key'] == META_PREFIX+meta_key] - assert_equal(value[0], meta_value) + assert value[0] == meta_value # delete objects for key in bucket.list(): @@ -2706,7 +2694,7 @@ def test_metadata_amqp(): events = receiver.get_and_reset_events() for event in events: value = [x['val'] for x in event['Records'][0]['s3']['object']['metadata'] if x['key'] == META_PREFIX+meta_key] - assert_equal(value[0], meta_value) + assert value[0] == meta_value # cleanup stop_amqp_receiver(receiver, task) @@ -2716,7 +2704,7 @@ def test_metadata_amqp(): conn.delete_bucket(bucket_name) -@attr('amqp_test') +@pytest.mark.amqp_test def test_tags_amqp(): """ test notification of tags """ @@ -2752,7 +2740,7 @@ def test_tags_amqp(): s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 expected_keys = [] # create objects in the bucket with tags @@ -2797,11 +2785,11 @@ def test_tags_amqp(): key = event['Records'][0]['s3']['object']['key'] if (key == key_name1): obj_tags = sorted(event['Records'][0]['s3']['object']['tags'], key=lambda k: k['key']+k['val']) - assert_equal(obj_tags, expected_tags1) + assert obj_tags == expected_tags1 event_count += 1 assert(key in expected_keys) - assert_equal(event_count, len(expected_keys)) + assert event_count == len(expected_keys) # delete the objects for key in bucket.list(): @@ -2814,7 +2802,7 @@ def test_tags_amqp(): key = event['Records'][0]['s3']['object']['key'] if (key == key_name1): obj_tags = sorted(event['Records'][0]['s3']['object']['tags'], key=lambda k: k['key']+k['val']) - assert_equal(obj_tags, expected_tags1) + assert obj_tags == expected_tags1 event_count += 1 assert(key in expected_keys) @@ -2827,7 +2815,7 @@ def test_tags_amqp(): # delete the bucket conn.delete_bucket(bucket_name) -@attr('amqp_test') +@pytest.mark.amqp_test def test_versioning_amqp(): """ test notification of object versions """ @@ -2858,7 +2846,7 @@ def test_versioning_amqp(): }] s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 # create objects in the bucket key_name = 'foo' @@ -2889,7 +2877,7 @@ def test_versioning_amqp(): else: print('version ok: '+version+' in: '+str(versions)) - assert_equal(num_of_versions, 3) + assert num_of_versions == 3 # cleanup stop_amqp_receiver(receiver, task) @@ -2902,7 +2890,7 @@ def test_versioning_amqp(): #conn.delete_bucket(bucket_name) -@attr('amqp_test') +@pytest.mark.amqp_test def test_versioned_deletion_amqp(): """ test notification of deletion markers """ @@ -2939,7 +2927,7 @@ def test_versioned_deletion_amqp(): }] s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 # create objects in the bucket key = bucket.new_key('foo') @@ -2988,10 +2976,10 @@ def test_versioned_deletion_amqp(): # 2 key versions were deleted # notified over the same topic via 2 notifications (1,3) - assert_equal(delete_events, 2*2) + assert delete_events == 2*2 # 1 deletion marker was created # notified over the same topic over 2 notifications (1,2) - assert_equal(delete_marker_create_events, 1*2) + assert delete_marker_create_events == 1*2 # cleanup delete_marker_key.delete() @@ -3002,7 +2990,7 @@ def test_versioned_deletion_amqp(): conn.delete_bucket(bucket_name) -@attr('manual_test') +@pytest.mark.manual_test def test_persistent_cleanup(): """ test reservation cleanup after gateway crash """ conn = connection() @@ -3035,7 +3023,7 @@ def test_persistent_cleanup(): }] s3_notification_conf = PSNotificationS3(gw, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 client_threads = [] start_time = time.time() @@ -3129,7 +3117,7 @@ def wait_for_queue_to_drain(topic_name, tenant=None, account=None, http_port=Non if http_port: check_http_server(http_port) result = admin(cmd, get_config_cluster()) - assert_equal(result[1], 0) + assert result[1] == 0 parsed_result = json.loads(result[0]) entries = parsed_result['Topic Stats']['Entries'] retries += 1 @@ -3137,7 +3125,7 @@ def wait_for_queue_to_drain(topic_name, tenant=None, account=None, http_port=Non log.info('shards for %s has %d entries after %ds', topic_name, entries, time_diff) if retries > 100: log.warning('shards for %s still has %d entries after %ds', topic_name, entries, time_diff) - assert_equal(entries, 0) + assert entries == 0 time.sleep(5) time_diff = time.time() - start_time log.info('waited for %ds for shards of %s to drain', time_diff, topic_name) @@ -3178,7 +3166,7 @@ def persistent_topic_stats(conn, endpoint_type): endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \ '&retry_sleep_duration=1' else: - return SkipTest('Unknown endpoint type: ' + endpoint_type) + pytest.skip('Unknown endpoint type: ' + endpoint_type) # create topic topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) @@ -3191,7 +3179,7 @@ def persistent_topic_stats(conn, endpoint_type): s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 # topic stats get_stats_persistent_topic(topic_name, 0) @@ -3254,28 +3242,28 @@ def persistent_topic_stats(conn, endpoint_type): receiver.close(task) -@attr('http_test') +@pytest.mark.http_test def test_persistent_topic_stats_http(): """ test persistent topic stats, http endpoint """ conn = connection() persistent_topic_stats(conn, 'http') -@attr('kafka_test') +@pytest.mark.kafka_test def test_persistent_topic_stats_kafka(): """ test persistent topic stats, kafka endpoint """ conn = connection() persistent_topic_stats(conn, 'kafka') -@attr('amqp_test') +@pytest.mark.amqp_test def test_persistent_topic_stats_amqp(): """ test persistent topic stats, amqp endpoint """ conn = connection() persistent_topic_stats(conn, 'amqp') -@attr('kafka_test') +@pytest.mark.kafka_test def test_persistent_topic_dump(): """ test persistent topic dump """ conn = connection() @@ -3306,7 +3294,7 @@ def test_persistent_topic_dump(): s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 # create objects in the bucket (async) number_of_objects = 20 @@ -3324,9 +3312,9 @@ def test_persistent_topic_dump(): # topic dump result = admin(['topic', 'dump', '--topic', topic_name], get_config_cluster()) - assert_equal(result[1], 0) + assert result[1] == 0 parsed_result = json.loads(result[0]) - assert_equal(len(parsed_result), number_of_objects) + assert len(parsed_result) == number_of_objects # delete objects from the bucket client_threads = [] @@ -3341,9 +3329,9 @@ def test_persistent_topic_dump(): # topic stats result = admin(['topic', 'dump', '--topic', topic_name], get_config_cluster()) - assert_equal(result[1], 0) + assert result[1] == 0 parsed_result = json.loads(result[0]) - assert_equal(len(parsed_result), 2*number_of_objects) + assert len(parsed_result) == 2*number_of_objects # change the endpoint port endpoint_address = 'kafka://' + host @@ -3355,9 +3343,9 @@ def test_persistent_topic_dump(): wait_for_queue_to_drain(topic_name,) result = admin(['topic', 'dump', '--topic', topic_name], get_config_cluster()) - assert_equal(result[1], 0) + assert result[1] == 0 parsed_result = json.loads(result[0]) - assert_equal(len(parsed_result), 0) + assert len(parsed_result) == 0 # cleanup s3_notification_conf.del_config() @@ -3394,13 +3382,13 @@ def persistent_topic_configs(persistency_time, config_dict): s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 # topic get parsed_result = get_topic(topic_name) parsed_result_dest = parsed_result["dest"] for key, value in config_dict.items(): - assert_equal(parsed_result_dest[key], str(value)) + assert parsed_result_dest[key] == str(value) # topic stats get_stats_persistent_topic(topic_name, 0) @@ -3443,7 +3431,7 @@ def persistent_topic_configs(persistency_time, config_dict): [thr.join() for thr in client_threads] time_diff = time.time() - start_time log.info('deletion of %d objects took %f seconds', count, time_diff) - assert_equal(count, number_of_objects) + assert count == number_of_objects # topic stats if time_diff > persistency_time: @@ -3469,7 +3457,7 @@ def create_persistency_config_string(config_dict): return str_ret[:-1] -@attr('basic_test') +@pytest.mark.basic_test def test_persistent_topic_configs_ttl(): """ test persistent topic configurations with time_to_live """ config_dict = {"time_to_live": 30, "max_retries": "None", "retry_sleep_duration": "None"} @@ -3477,7 +3465,7 @@ def test_persistent_topic_configs_ttl(): persistent_topic_configs(persistency_time, config_dict) -@attr('basic_test') +@pytest.mark.basic_test def test_persistent_topic_configs_max_retries(): """ test persistent topic configurations with max_retries and retry_sleep_duration """ config_dict = {"time_to_live": "None", "max_retries": 10, "retry_sleep_duration": 1} @@ -3485,7 +3473,7 @@ def test_persistent_topic_configs_max_retries(): persistent_topic_configs(persistency_time, config_dict) -@attr('manual_test') +@pytest.mark.manual_test def test_persistent_notificationback(): """ test pushing persistent notification pushback """ conn = connection() @@ -3515,7 +3503,7 @@ def test_persistent_notificationback(): s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 # create objects in the bucket (async) for j in range(100): @@ -3566,7 +3554,7 @@ def test_persistent_notificationback(): http_server.close() -@attr('kafka_test') +@pytest.mark.kafka_test def test_notification_kafka_idle_behaviour(): """ test pushing kafka notification idle behaviour check """ conn = connection() @@ -3597,7 +3585,7 @@ def test_notification_kafka_idle_behaviour(): s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 # create objects in the bucket (async) number_of_objects = 10 @@ -3699,16 +3687,16 @@ def test_notification_kafka_idle_behaviour(): stop_kafka_receiver(receiver, task) -@attr('not_implemented') +@pytest.mark.not_implemented def test_persistent_gateways_recovery(): """ test gateway recovery of persistent notifications """ - return SkipTest('This test is yet to be implemented') + pytest.skip('This test is yet to be implemented') -@attr('not_implemented') +@pytest.mark.not_implemented def test_persistent_multiple_gateways(): """ test pushing persistent notification via two gateways """ - return SkipTest('This test is yet to be implemented') + pytest.skip('This test is yet to be implemented') def persistent_topic_multiple_endpoints(conn, endpoint_type): @@ -3748,7 +3736,7 @@ def persistent_topic_multiple_endpoints(conn, endpoint_type): endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \ '&retry_sleep_duration=1' else: - return SkipTest('Unknown endpoint type: ' + endpoint_type) + pytest.skip('Unknown endpoint type: ' + endpoint_type) # create two topics topic_conf1 = PSTopicS3(conn, topic_name_1, zonegroup, endpoint_args=endpoint_args) @@ -3766,14 +3754,14 @@ def persistent_topic_multiple_endpoints(conn, endpoint_type): }] s3_notification_conf1 = PSNotificationS3(conn, bucket_name, topic_conf_list) response, status = s3_notification_conf1.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 notification_name = bucket_name + NOTIFICATION_SUFFIX+'_2' topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn2, 'Events': [] }] s3_notification_conf2 = PSNotificationS3(conn, bucket_name, topic_conf_list) response, status = s3_notification_conf2.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 client_threads = [] start_time = time.time() @@ -3812,14 +3800,14 @@ def persistent_topic_multiple_endpoints(conn, endpoint_type): receiver.close(task) -@attr('http_test') +@pytest.mark.http_test def test_persistent_multiple_endpoints_http(): """ test pushing persistent notification when one of the endpoints has error, http endpoint """ conn = connection() persistent_topic_multiple_endpoints(conn, 'http') -@attr('kafka_test') +@pytest.mark.kafka_test def test_persistent_multiple_endpoints_kafka(): """ test pushing persistent notification when one of the endpoints has error, kafka endpoint """ conn = connection() @@ -3860,7 +3848,7 @@ def persistent_notification(endpoint_type, conn, account=None): endpoint_address = 'kafka://' + host endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'+'&persistent=true' else: - return SkipTest('Unknown endpoint type: ' + endpoint_type) + pytest.skip('Unknown endpoint type: ' + endpoint_type) # create topic @@ -3874,7 +3862,7 @@ def persistent_notification(endpoint_type, conn, account=None): s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 # create objects in the bucket (async) number_of_objects = 100 @@ -3921,13 +3909,13 @@ def persistent_notification(endpoint_type, conn, account=None): receiver.close(task) -@attr('http_test') +@pytest.mark.http_test def test_persistent_notification_http(): """ test pushing persistent notification http """ conn = connection() persistent_notification('http', conn) -@attr('http_test') +@pytest.mark.http_test def test_persistent_notification_http_account(): """ test pushing persistent notification via http for account user """ @@ -3935,7 +3923,7 @@ def test_persistent_notification_http_account(): user = UID_PREFIX + 'test' _, rc = admin(['account', 'create', '--account-id', account, '--account-name', 'testacct'], get_config_cluster()) - assert_true(rc in [0, 17]) # EEXIST okay if we rerun + assert rc in [0, 17] # EEXIST okay if we rerun conn, _ = another_user(user=user, account=account) try: @@ -3944,14 +3932,14 @@ def test_persistent_notification_http_account(): admin(['user', 'rm', '--uid', user], get_config_cluster()) admin(['account', 'rm', '--account-id', account], get_config_cluster()) -@attr('amqp_test') +@pytest.mark.amqp_test def test_persistent_notification_amqp(): """ test pushing persistent notification amqp """ conn = connection() persistent_notification('amqp', conn) -@attr('kafka_test') +@pytest.mark.kafka_test def test_persistent_notification_kafka(): """ test pushing persistent notification kafka """ conn = connection() @@ -3964,7 +3952,7 @@ def random_string(length): return ''.join(random.choice(letters) for i in range(length)) -@attr('amqp_test') +@pytest.mark.amqp_test def test_persistent_notification_large_amqp(): """ test pushing persistent notification of large notifications """ @@ -3999,7 +3987,7 @@ def test_persistent_notification_large_amqp(): s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 # create objects in the bucket (async) number_of_objects = 100 @@ -4050,29 +4038,29 @@ def test_persistent_notification_large_amqp(): stop_amqp_receiver(receiver, task) -@attr('not_implemented') +@pytest.mark.not_implemented def test_topic_update(): """ test updating topic associated with a notification""" - return SkipTest('This test is yet to be implemented') + pytest.skip('This test is yet to be implemented') -@attr('not_implemented') +@pytest.mark.not_implemented def test_notification_update(): """ test updating the topic of a notification""" - return SkipTest('This test is yet to be implemented') + pytest.skip('This test is yet to be implemented') -@attr('not_implemented') +@pytest.mark.not_implemented def test_multiple_topics_notification(): """ test notification creation with multiple topics""" - return SkipTest('This test is yet to be implemented') + pytest.skip('This test is yet to be implemented') -@attr('basic_test') +@pytest.mark.basic_test def test_list_topics_migration(): """ test list topics on migration""" if get_config_cluster() == 'noname': - return SkipTest('realm is needed for migration test') + pytest.skip('realm is needed for migration test') # Initialize connections and configurations conn1 = connection() @@ -4143,21 +4131,21 @@ def test_list_topics_migration(): try: # Verify no tenant topics res, status = topic_conf.get_list() - assert_equal(status // 100, 2) + assert status // 100 == 2 listTopicsResponse = res.get('ListTopicsResponse', {}) listTopicsResult = listTopicsResponse.get('ListTopicsResult', {}) topics = listTopicsResult.get('Topics', {}) member = topics['member'] if topics else [] - assert_equal(len(member), 6) + assert len(member) == 6 # Verify tenant topics res, status = tenant_topic_conf.get_list() - assert_equal(status // 100, 2) + assert status // 100 == 2 listTopicsResponse = res.get('ListTopicsResponse', {}) listTopicsResult = listTopicsResponse.get('ListTopicsResult', {}) topics = listTopicsResult.get('Topics', {}) member = topics['member'] if topics else [] - assert_equal(len(member), 3) + assert len(member) == 3 finally: # Cleanup created topics topic_conf.del_config(topic_arn1) @@ -4171,7 +4159,7 @@ def test_list_topics_migration(): tenant_topic_conf.del_config(tenant_topic_arn3) -@attr('basic_test') +@pytest.mark.basic_test def test_list_topics(): """ test list topics""" @@ -4211,21 +4199,21 @@ def test_list_topics(): try: # Verify no tenant topics res, status = topic_conf.get_list() - assert_equal(status // 100, 2) + assert status // 100 == 2 listTopicsResponse = res.get('ListTopicsResponse', {}) listTopicsResult = listTopicsResponse.get('ListTopicsResult', {}) topics = listTopicsResult.get('Topics', {}) member = topics['member'] if topics else [] # version 2 - assert_equal(len(member), 3) + assert len(member) == 3 # Verify topics for tenant res, status = tenant_topic_conf.get_list() - assert_equal(status // 100, 2) + assert status // 100 == 2 listTopicsResponse = res.get('ListTopicsResponse', {}) listTopicsResult = listTopicsResponse.get('ListTopicsResult', {}) topics = listTopicsResult.get('Topics', {}) member = topics['member'] if topics else [] - assert_equal(len(member), 2) + assert len(member) == 2 finally: # Cleanup created topics topic_conf.del_config(topic_arn1) @@ -4234,11 +4222,11 @@ def test_list_topics(): tenant_topic_conf.del_config(tenant_topic_arn1) tenant_topic_conf.del_config(tenant_topic_arn2) -@attr('basic_test') +@pytest.mark.basic_test def test_list_topics_v1(): """ test list topics on v1""" if get_config_cluster() == 'noname': - return SkipTest('realm is needed') + pytest.skip('realm is needed') # Initialize connections and configurations conn1 = connection() @@ -4280,21 +4268,21 @@ def test_list_topics_v1(): try: # Verify no tenant topics res, status = topic_conf.get_list() - assert_equal(status // 100, 2) + assert status // 100 == 2 listTopicsResponse = res.get('ListTopicsResponse', {}) listTopicsResult = listTopicsResponse.get('ListTopicsResult', {}) topics = listTopicsResult.get('Topics', {}) member = topics['member'] if topics else [] - assert_equal(len(member), 3) + assert len(member) == 3 # Verify tenant topics res, status = tenant_topic_conf.get_list() - assert_equal(status // 100, 2) + assert status // 100 == 2 listTopicsResponse = res.get('ListTopicsResponse', {}) listTopicsResult = listTopicsResponse.get('ListTopicsResult', {}) topics = listTopicsResult.get('Topics', {}) member = topics['member'] if topics else [] - assert_equal(len(member), 2) + assert len(member) == 2 finally: # Cleanup created topics topic_conf.del_config(topic_arn1) @@ -4338,16 +4326,16 @@ def topic_permissions(another_tenant=""): assert False, "'AuthorizationError' error is expected" except ClientError as err: if 'Error' in err.response: - assert_equal(err.response['Error']['Code'], 'AuthorizationError') + assert err.response['Error']['Code'] == 'AuthorizationError' else: - assert_equal(err.response['Code'], 'AuthorizationError') + assert err.response['Code'] == 'AuthorizationError' except Exception as err: print('unexpected error type: '+type(err).__name__) assert False, "'AuthorizationError' error is expected" # 2nd user tries to fetch the topic _, status = topic_conf2.get_config(topic_arn=topic_arn) - assert_equal(status, 403) + assert status == 403 try: # 2nd user tries to set the attribute @@ -4355,9 +4343,9 @@ def topic_permissions(another_tenant=""): assert False, "'AuthorizationError' error is expected" except ClientError as err: if 'Error' in err.response: - assert_equal(err.response['Error']['Code'], 'AuthorizationError') + assert err.response['Error']['Code'] == 'AuthorizationError' else: - assert_equal(err.response['Code'], 'AuthorizationError') + assert err.response['Code'] == 'AuthorizationError' except Exception as err: print('unexpected error type: '+type(err).__name__) assert False, "'AuthorizationError' error is expected" @@ -4374,9 +4362,9 @@ def topic_permissions(another_tenant=""): assert False, "'AccessDenied' error is expected" except ClientError as err: if 'Error' in err.response: - assert_equal(err.response['Error']['Code'], 'AccessDenied') + assert err.response['Error']['Code'] == 'AccessDenied' else: - assert_equal(err.response['Code'], 'AccessDenied') + assert err.response['Code'] == 'AccessDenied' except Exception as err: print('unexpected error type: '+type(err).__name__) assert False, "'AuthorizationError' error is expected" @@ -4387,9 +4375,9 @@ def topic_permissions(another_tenant=""): assert False, "'AuthorizationError' error is expected" except ClientError as err: if 'Error' in err.response: - assert_equal(err.response['Error']['Code'], 'AuthorizationError') + assert err.response['Error']['Code'] == 'AuthorizationError' else: - assert_equal(err.response['Code'], 'AuthorizationError') + assert err.response['Code'] == 'AuthorizationError' except Exception as err: print('unexpected error type: '+type(err).__name__) assert False, "'AuthorizationError' error is expected" @@ -4400,17 +4388,17 @@ def topic_permissions(another_tenant=""): topic_arn = topic_conf.set_config() # 2nd user try to fetch topic again _, status = topic_conf2.get_config(topic_arn=topic_arn) - assert_equal(status, 200) + assert status == 200 # 2nd user tries to set the attribute again status = topic_conf2.set_attributes(attribute_name="persistent", attribute_val="false", topic_arn=topic_arn) - assert_equal(status, 200) + assert status == 200 # 2nd user tries to publish notification again s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list) _, status = s3_notification_conf2.set_config() - assert_equal(status, 200) + assert status == 200 # 2nd user tries to delete the topic again status = topic_conf2.del_config(topic_arn=topic_arn) - assert_equal(status, 200) + assert status == 200 # cleanup s3_notification_conf2.del_config() @@ -4418,17 +4406,17 @@ def topic_permissions(another_tenant=""): conn2.delete_bucket(bucket_name) -@attr('basic_test') +@pytest.mark.basic_test def test_topic_permissions_same_tenant(): topic_permissions() -@attr('basic_test') +@pytest.mark.basic_test def test_topic_permissions_cross_tenant(): topic_permissions(another_tenant="boom") -@attr('basic_test') +@pytest.mark.basic_test def test_topic_no_permissions(): """ test topic set/get/delete permissions """ conn1 = connection() @@ -4450,15 +4438,15 @@ def test_topic_no_permissions(): assert False, "'AuthorizationError' error is expected" except ClientError as err: if 'Error' in err.response: - assert_equal(err.response['Error']['Code'], 'AuthorizationError') + assert err.response['Error']['Code'] == 'AuthorizationError' else: - assert_equal(err.response['Code'], 'AuthorizationError') + assert err.response['Code'] == 'AuthorizationError' except Exception as err: print('unexpected error type: '+type(err).__name__) # 2nd user tries to fetch the topic _, status = topic_conf2.get_config(topic_arn=topic_arn) - assert_equal(status, 403) + assert status == 403 try: # 2nd user tries to set the attribute @@ -4466,9 +4454,9 @@ def test_topic_no_permissions(): assert False, "'AuthorizationError' error is expected" except ClientError as err: if 'Error' in err.response: - assert_equal(err.response['Error']['Code'], 'AuthorizationError') + assert err.response['Error']['Code'] == 'AuthorizationError' else: - assert_equal(err.response['Code'], 'AuthorizationError') + assert err.response['Code'] == 'AuthorizationError' except Exception as err: print('unexpected error type: '+type(err).__name__) @@ -4481,7 +4469,7 @@ def test_topic_no_permissions(): }] s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list) _, status = s3_notification_conf2.set_config() - assert_equal(status, 200) + assert status == 200 try: # 2nd user tries to delete the topic @@ -4489,9 +4477,9 @@ def test_topic_no_permissions(): assert False, "'AuthorizationError' error is expected" except ClientError as err: if 'Error' in err.response: - assert_equal(err.response['Error']['Code'], 'AuthorizationError') + assert err.response['Error']['Code'] == 'AuthorizationError' else: - assert_equal(err.response['Code'], 'AuthorizationError') + assert err.response['Code'] == 'AuthorizationError' except Exception as err: print('unexpected error type: '+type(err).__name__) @@ -4594,41 +4582,41 @@ def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=F stop_kafka_receiver(receiver, task) -@attr('kafka_security_test') +@pytest.mark.kafka_security_test def test_notification_kafka_security_ssl(): kafka_security('SSL') -@attr('kafka_security_test') +@pytest.mark.kafka_security_test def test_notification_kafka_security_ssl_sasl(): kafka_security('SASL_SSL') -@attr('kafka_security_test') +@pytest.mark.kafka_security_test def test_notification_kafka_security_ssl_sasl_attrs(): kafka_security('SASL_SSL', use_topic_attrs_for_creds=True) -@attr('kafka_security_test') +@pytest.mark.kafka_security_test def test_notification_kafka_security_sasl(): kafka_security('SASL_PLAINTEXT') -@attr('kafka_security_test') +@pytest.mark.kafka_security_test def test_notification_kafka_security_ssl_sasl_scram(): kafka_security('SASL_SSL', mechanism='SCRAM-SHA-256') -@attr('kafka_security_test') +@pytest.mark.kafka_security_test def test_notification_kafka_security_sasl_scram(): kafka_security('SASL_PLAINTEXT', mechanism='SCRAM-SHA-256') -@attr('http_test') +@pytest.mark.http_test def test_persistent_reload(): """ do a realm reload while we send notifications """ if get_config_cluster() == 'noname': - return SkipTest('realm is needed for reload test') + pytest.skip('realm is needed for reload test') conn = connection() zonegroup = get_config_zonegroup() @@ -4670,7 +4658,7 @@ def test_persistent_reload(): s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 # topic stats get_stats_persistent_topic(topic_name1, 0) @@ -4749,7 +4737,7 @@ def poll_on_topic(topic_name, tenant=''): def persistent_data_path_v2_migration(conn, endpoint_type): """ test data path v2 persistent migration """ if get_config_cluster() == 'noname': - return SkipTest('realm is needed for migration test') + pytest.skip('realm is needed for migration test') zonegroup = get_config_zonegroup() # disable v2 notification @@ -4788,7 +4776,7 @@ def persistent_data_path_v2_migration(conn, endpoint_type): endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \ '&retry_sleep_duration=1' else: - return SkipTest('Unknown endpoint type: ' + endpoint_type) + pytest.skip('Unknown endpoint type: ' + endpoint_type) topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() @@ -4800,7 +4788,7 @@ def persistent_data_path_v2_migration(conn, endpoint_type): s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 # topic stats get_stats_persistent_topic(topic_name, 0) @@ -4877,25 +4865,25 @@ def persistent_data_path_v2_migration(conn, endpoint_type): receiver.close(task) -@attr('http_test') +@pytest.mark.http_test def persistent_data_path_v2_migration_http(): """ test data path v2 persistent migration, http endpoint """ conn = connection() persistent_data_path_v2_migration(conn, 'http') -@attr('kafka_test') +@pytest.mark.kafka_test def persistent_data_path_v2_migration_kafka(): """ test data path v2 persistent migration, kafka endpoint """ conn = connection() persistent_data_path_v2_migration(conn, 'kafka') -@attr('http_test') +@pytest.mark.http_test def test_data_path_v2_migration(): """ test data path v2 migration """ if get_config_cluster() == 'noname': - return SkipTest('realm is needed for migration test') + pytest.skip('realm is needed for migration test') conn = connection() zonegroup = get_config_zonegroup() @@ -4927,7 +4915,7 @@ def test_data_path_v2_migration(): s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 # create objects in the bucket (async) number_of_objects = 10 @@ -4989,11 +4977,11 @@ def test_data_path_v2_migration(): http_server.close() -@attr('basic_test') +@pytest.mark.basic_test def test_data_path_v2_large_migration(): """ test data path v2 large migration """ if get_config_cluster() == 'noname': - return SkipTest('realm is needed for migration test') + pytest.skip('realm is needed for migration test') conn = connection() connections_list = [] connections_list.append(conn) @@ -5045,7 +5033,7 @@ def test_data_path_v2_large_migration(): s3_notification_conf = PSNotificationS3(conn, bucket_name, s3_notification_list) s3_notification_conf_list.append(s3_notification_conf) response, status = s3_notification_conf.set_config() - assert_equal(status / 100, 2) + assert status / 100 == 2 # create topic to poll on polling_topics_conf = [] @@ -5082,11 +5070,11 @@ def test_data_path_v2_large_migration(): conn.delete_bucket(bucket.name) -@attr('basic_test') +@pytest.mark.basic_test def test_data_path_v2_mixed_migration(): """ test data path v2 mixed migration """ if get_config_cluster() == 'noname': - return SkipTest('realm is needed for migration test') + pytest.skip('realm is needed for migration test') conn = connection() connections_list = [] connections_list.append(conn) @@ -5139,7 +5127,7 @@ def test_data_path_v2_mixed_migration(): s3_notification_conf = PSNotificationS3(conn, bucket_name, s3_notification_list) s3_notification_conf_list.append(s3_notification_conf) response, status = s3_notification_conf.set_config() - assert_equal(status / 100, 2) + assert status / 100 == 2 # disable v2 notification zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2) @@ -5165,7 +5153,7 @@ def test_data_path_v2_mixed_migration(): s3_notification_conf = PSNotificationS3(conn, bucket_name, s3_notification_list) s3_notification_conf_list.append(s3_notification_conf) response, status = s3_notification_conf.set_config() - assert_equal(status / 100, 2) + assert status / 100 == 2 # create topic to poll on polling_topics_conf = [] @@ -5199,7 +5187,7 @@ def test_data_path_v2_mixed_migration(): conn.delete_bucket(bucket.name) -@attr('kafka_test') +@pytest.mark.kafka_test def test_notification_caching(): """ test notification caching """ conn = connection() @@ -5229,7 +5217,7 @@ def test_notification_caching(): s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() - assert_equal(status / 100, 2) + assert status / 100 == 2 # create objects in the bucket (async) number_of_objects = 10 @@ -5265,9 +5253,9 @@ def test_notification_caching(): # topic stats result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster()) - assert_equal(result[1], 0) + assert result[1] == 0 parsed_result = json.loads(result[0]) - assert_equal(parsed_result['Topic Stats']['Entries'], 2 * number_of_objects) + assert parsed_result['Topic Stats']['Entries'] == 2 * number_of_objects # remove the port and update the topic, so its pointing to correct endpoint. endpoint_address = 'kafka://' + default_kafka_server @@ -5286,7 +5274,7 @@ def test_notification_caching(): receiver.close(task) -@attr('kafka_test') +@pytest.mark.kafka_test def test_connection_caching(): """ test connection caching """ conn = connection() @@ -5324,7 +5312,7 @@ def test_connection_caching(): s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() - assert_equal(status / 100, 2) + assert status / 100 == 2 # create objects in the bucket (async) number_of_objects = 10 @@ -5360,9 +5348,9 @@ def test_connection_caching(): # topic stats result = admin(['topic', 'stats', '--topic', topic_name_1], get_config_cluster()) - assert_equal(result[1], 0) + assert result[1] == 0 parsed_result = json.loads(result[0]) - assert_equal(parsed_result['Topic Stats']['Entries'], 2 * number_of_objects) + assert parsed_result['Topic Stats']['Entries'] == 2 * number_of_objects # remove the ssl from topic1 and update the topic. endpoint_address = 'kafka://' + default_kafka_server @@ -5374,9 +5362,9 @@ def test_connection_caching(): # topic stats for 2nd topic will still reflect entries result = admin(['topic', 'stats', '--topic', topic_name_2], get_config_cluster()) - assert_equal(result[1], 0) + assert result[1] == 0 parsed_result = json.loads(result[0]) - assert_equal(parsed_result['Topic Stats']['Entries'], 2 * number_of_objects) + assert parsed_result['Topic Stats']['Entries'] == 2 * number_of_objects # cleanup s3_notification_conf.del_config() @@ -5388,7 +5376,7 @@ def test_connection_caching(): receiver_2.close(task_2) -@attr("http_test") +@pytest.mark.http_test def test_topic_migration_to_an_account(): """test the topic migration procedure described at https://docs.ceph.com/en/latest/radosgw/account/#migrate-an-existing-user-into-an-account @@ -5433,7 +5421,7 @@ def test_topic_migration_to_an_account(): user1_conn, topic_name, zonegroup, endpoint_args=endpoint_args ) topic_arn = topic_conf.set_config() - assert_equal(topic_arn, expected_topic_arn) + assert topic_arn == expected_topic_arn log.info( f"{user1_conn.access_key} created the topic {topic_arn} with args {endpoint_args}" ) @@ -5454,18 +5442,18 @@ def test_topic_migration_to_an_account(): user2_conn, user2_bucket_name, topic_conf_list ) _, status = s3_notification_conf1.set_config() - assert_equal(status / 100, 2) + assert status / 100 == 2 _, status = s3_notification_conf2.set_config() - assert_equal(status / 100, 2) + assert status / 100 == 2 # verify both buckets are subscribed to the topic rgw_topic_entry = [ t for t in list_topics() if t["name"] == topic_name ] - assert_equal(len(rgw_topic_entry), 1) + assert len(rgw_topic_entry) == 1 subscribed_buckets = rgw_topic_entry[0]["subscribed_buckets"] - assert_equal(len(subscribed_buckets), 2) - assert_in(user1_bucket_name, subscribed_buckets) - assert_in(user2_bucket_name, subscribed_buckets) + assert len(subscribed_buckets) == 2 + assert user1_bucket_name in subscribed_buckets + assert user2_bucket_name in subscribed_buckets log.info( "buckets {user1_bucket_name} and {user2_bucket_name} are subscribed to {topic_arn}" ) @@ -5507,7 +5495,7 @@ def test_topic_migration_to_an_account(): user1_conn, topic_name, zonegroup, endpoint_args=endpoint_args ) account_topic_arn = topic_conf.set_config() - assert_equal(account_topic_arn, expected_topic_arn) + assert account_topic_arn == expected_topic_arn log.info( f"{user1_conn.access_key} created the account topic {account_topic_arn} with args {endpoint_args}" ) @@ -5533,23 +5521,23 @@ def test_topic_migration_to_an_account(): user1_conn, user1_bucket_name, topic_conf_list ) _, status = s3_notification_conf1.set_config() - assert_equal(status / 100, 2) + assert status / 100 == 2 # verify subscriptions to the account topic rgw_topic_entry = [ t for t in list_topics(tenant=account_id) if t["name"] == topic_name ] - assert_equal(len(rgw_topic_entry), 1) + assert len(rgw_topic_entry) == 1 subscribed_buckets = rgw_topic_entry[0]["subscribed_buckets"] - assert_equal(len(subscribed_buckets), 1) - assert_in(user1_bucket_name, subscribed_buckets) - assert_not_in(user2_bucket_name, subscribed_buckets) + assert len(subscribed_buckets) == 1 + assert user1_bucket_name in subscribed_buckets + assert user2_bucket_name not in subscribed_buckets # verify bucket notifications while 2 test buckets are in the mixed mode notification_list = list_notifications(user1_bucket_name, assert_len=1) - assert_equal(notification_list["notifications"][0]["TopicArn"], account_topic_arn) + assert notification_list["notifications"][0]["TopicArn"] == account_topic_arn notification_list = list_notifications(user2_bucket_name, assert_len=1) - assert_equal(notification_list["notifications"][0]["TopicArn"], topic_arn) + assert notification_list["notifications"][0]["TopicArn"] == topic_arn # verify both topics are functional at the same time with no duplicate notifications user1_bucket.new_key("user1obj1").set_contents_from_string("object content") @@ -5565,12 +5553,12 @@ def test_topic_migration_to_an_account(): user2_conn, user2_bucket_name, topic_conf_list ) _, status = s3_notification_conf2.set_config() - assert_equal(status / 100, 2) + assert status / 100 == 2 # remove old topic # note that, although account topic has the same name, it has to be scoped by account/tenant id to be removed # so below command will only remove the old topic _, rc = admin(["topic", "rm", "--topic", topic_name], get_config_cluster()) - assert_equal(rc, 0) + assert rc == 0 # now verify account topic serves both buckets rgw_topic_entry = [ @@ -5578,16 +5566,16 @@ def test_topic_migration_to_an_account(): for t in list_topics(tenant=account_id) if t["name"] == topic_name ] - assert_equal(len(rgw_topic_entry), 1) + assert len(rgw_topic_entry) == 1 subscribed_buckets = rgw_topic_entry[0]["subscribed_buckets"] - assert_equal(len(subscribed_buckets), 2) - assert_in(user1_bucket_name, subscribed_buckets) - assert_in(user2_bucket_name, subscribed_buckets) + assert len(subscribed_buckets) == 2 + assert user1_bucket_name in subscribed_buckets + assert user2_bucket_name in subscribed_buckets # verify bucket notifications after 2 test buckets are updated to use the account topic notification_list = list_notifications(user1_bucket_name, assert_len=1) - assert_equal(notification_list["notifications"][0]["TopicArn"], account_topic_arn) + assert notification_list["notifications"][0]["TopicArn"] == account_topic_arn notification_list = list_notifications(user2_bucket_name, assert_len=1) - assert_equal(notification_list["notifications"][0]["TopicArn"], account_topic_arn) + assert notification_list["notifications"][0]["TopicArn"] == account_topic_arn # finally, make sure that notifications are going thru via the new account topic user1_bucket.new_key("user1obj1").set_contents_from_string("object content") @@ -5645,7 +5633,7 @@ def persistent_notification_shard_config_change(endpoint_type, conn, new_num_sha endpoint_address = 'kafka://' + host endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true' else: - return SkipTest('Unknown endpoint type: ' + endpoint_type) + pytest.skip('Unknown endpoint type: ' + endpoint_type) zonegroup = get_config_zonegroup() topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) @@ -5659,7 +5647,7 @@ def persistent_notification_shard_config_change(endpoint_type, conn, new_num_sha s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + assert status/100 == 2 ## create objects in the bucket (async) expected_keys = [] @@ -5692,7 +5680,7 @@ def create_object_and_verify_events(bucket, key_name, topic_name, receiver, expe print('wait for the messages...') wait_for_queue_to_drain(topic_name) events = receiver.get_and_reset_events() - assert_equal(len(events), len(expected_keys)) + assert len(events) == len(expected_keys) for event in events: assert(event['Records'][0]['s3']['object']['key'] in expected_keys) @@ -5704,28 +5692,28 @@ def create_object_and_verify_events(bucket, key_name, topic_name, receiver, expe wait_for_queue_to_drain(topic_name) # check endpoint receiver events = receiver.get_and_reset_events() - assert_equal(len(events), len(expected_keys)) + assert len(events) == len(expected_keys) for event in events: assert(event['Records'][0]['s3']['object']['key'] in expected_keys) -@attr('http_test') +@pytest.mark.http_test def test_backward_compatibility_persistent_sharded_topic_http(): conn = connection() persistent_notification_shard_config_change('http', conn, new_num_shards=11, old_num_shards=1) -@attr('kafka_test') +@pytest.mark.kafka_test def test_backward_compatibility_persistent_sharded_topic_kafka(): conn = connection() persistent_notification_shard_config_change('kafka', conn, new_num_shards=11, old_num_shards=1) -@attr('http_test') +@pytest.mark.http_test def test_persistent_sharded_topic_config_change_http(): conn = connection() new_num_shards = random.randint(2, 10) default_num_shards = 11 persistent_notification_shard_config_change('http', conn, new_num_shards, default_num_shards) -@attr('kafka_test') +@pytest.mark.kafka_test def test_persistent_sharded_topic_config_change_kafka(): conn = connection() new_num_shards = random.randint(2, 10) diff --git a/src/test/rgw/bucket_notification/tox.ini b/src/test/rgw/bucket_notification/tox.ini new file mode 100644 index 00000000000..954d5502d4c --- /dev/null +++ b/src/test/rgw/bucket_notification/tox.ini @@ -0,0 +1,9 @@ +[tox] +envlist = py +skipsdist = True + +[testenv] +deps = -rrequirements.txt +passenv = + BNTESTS_CONF +commands = pytest {posargs} -- 2.47.3