# Imports for the bucket-notification test module.
# NOTE(review): this header previously carried unresolved diff markers from
# the nose -> pytest migration; it is resolved here to the post-migration
# state (nose imports dropped, pytest added) and the broken
# `from . import(` parenthesized import is properly terminated.

# standard library
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
from random import randint
import hashlib
import datetime

# third-party
import pytest
import boto3
import requests
from cloudevents.http import from_http

# local package helpers shared by the notification tests
from . import (
    configfile,
    get_config_host,
    get_config_port,
    get_config_zonegroup,
    S3Key,
    MultipartUpload,
)
# configure logging for the tests module
class LogWrapper:
if key_found:
break
for record in record_list['Records']:
- assert_in('eTag', record['s3']['object'])
+ assert 'eTag' in record['s3']['object']
if record['s3']['bucket']['name'] == key.bucket.name and \
record['s3']['object']['key'] == key.name:
# Assertion Error needs to be fixed
- #assert_equal(key.etag[1:-1], record['s3']['object']['eTag'])
+ #assert key.etag[1:-1] == record['s3']['object']['eTag']
if etags:
- assert_in(key.etag[1:-1], etags)
+ assert key.etag[1:-1] in etags
if len(record['s3']['object']['metadata']) > 0:
for meta in record['s3']['object']['metadata']:
assert(meta['key'].startswith(META_PREFIX))
break
else:
for record in records['Records']:
- assert_in('eTag', record['s3']['object'])
+ assert 'eTag' in record['s3']['object']
if record['s3']['bucket']['name'] == key.bucket.name and \
record['s3']['object']['key'] == key.name:
- assert_equal(key.etag, record['s3']['object']['eTag'])
+ assert key.etag == record['s3']['object']['eTag']
if etags:
- assert_in(key.etag[1:-1], etags)
+ assert key.etag[1:-1] in etags
if len(record['s3']['object']['metadata']) > 0:
for meta in record['s3']['object']['metadata']:
assert(meta['key'].startswith(META_PREFIX))
err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
assert False, err
elif expected_sizes:
- assert_equal(object_size, expected_sizes.get(key.name))
+ assert object_size == expected_sizes.get(key.name)
if not len(records) == len(keys):
err = 'superfluous records are found'
if self.server.cloudevents:
event = from_http(self.headers, body)
record = json.loads(body)['Records'][0]
- assert_equal(event['specversion'], '1.0')
- assert_equal(event['id'], record['responseElements']['x-amz-request-id'] + '.' + record['responseElements']['x-amz-id-2'])
- assert_equal(event['source'], 'ceph:s3.' + record['awsRegion'] + '.' + record['s3']['bucket']['name'])
- assert_equal(event['type'], 'com.amazonaws.' + record['eventName'])
- assert_equal(event['datacontenttype'], 'application/json')
- assert_equal(event['subject'], record['s3']['object']['key'])
- assert_equal(parser.parse(event['time']), parser.parse(record['eventTime']))
+ assert event['specversion'] == '1.0'
+ assert event['id'] == record['responseElements']['x-amz-request-id'] + '.' + record['responseElements']['x-amz-id-2']
+ assert event['source'] == 'ceph:s3.' + record['awsRegion'] + '.' + record['s3']['bucket']['name']
+ assert event['type'] == 'com.amazonaws.' + record['eventName']
+ assert event['datacontenttype'] == 'application/json'
+ assert event['subject'] == record['s3']['object']['key']
+ assert parser.parse(event['time']) == parser.parse(record['eventTime'])
log.info('HTTP Server received event: %s', str(body))
self.server.append(json.loads(body))
if self.headers.get('Expect') == '100-continue':
remaining_retries -= 1
if remaining_retries == 0:
raise Exception('kafka receiver on topic: %s did not receive test event in time', receiver.topic)
- assert_equal(len(events), 1)
- assert_in('test', events[0])
+ assert len(events) == 1
+ assert 'test' in events[0]
log.info('Kafka receiver on topic: %s tested ok', receiver.topic)
arn = f'arn:aws:iam::{account}:user/Superman'
_, rc = admin(cmd, get_config_cluster())
- assert_equal(rc, 0)
+ assert rc == 0
conn = S3Connection(aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
result = admin(['topic', 'list'], get_config_cluster())
else:
result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster())
- assert_equal(result[1], 0)
+ assert result[1] == 0
parsed_result = json.loads(result[0])
try:
actual_len = len(parsed_result['topics'])
def get_stats_persistent_topic(topic_name, assert_entries_number=None):
result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
- assert_equal(result[1], 0)
+ assert result[1] == 0
parsed_result = json.loads(result[0])
if assert_entries_number:
actual_number = parsed_result['Topic Stats']['Entries']
result = admin(['topic', 'get', '--topic', topic_name, '--tenant', tenant], get_config_cluster())
if allow_failure:
return result
- assert_equal(result[1], 0)
+ assert result[1] == 0
parsed_result = json.loads(result[0])
return parsed_result
else:
result = admin(['topic', 'rm', '--topic', topic_name, '--tenant', tenant], get_config_cluster())
if not allow_failure:
- assert_equal(result[1], 0)
+ assert result[1] == 0
return result[1]
result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster())
else:
result = admin(['notification', 'list', '--bucket', bucket_name, '--tenant', tenant], get_config_cluster())
- assert_equal(result[1], 0)
+ assert result[1] == 0
parsed_result = json.loads(result[0])
actual_len = len(parsed_result['notifications'])
if assert_len and assert_len != actual_len:
result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name], get_config_cluster())
else:
result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name, '--tenant', tenant], get_config_cluster())
- assert_equal(result[1], 0)
+ assert result[1] == 0
parsed_result = json.loads(result[0])
- assert_equal(parsed_result['Id'], notification_name)
+ assert parsed_result['Id'] == notification_name
return parsed_result
args.extend(['--tenant', tenant])
result = admin(args, get_config_cluster())
if not allow_failure:
- assert_equal(result[1], 0)
+ assert result[1] == 0
return result[1]
else:
command = '--disable-feature='+feature_name
result = admin(['zonegroup', 'modify', command], get_config_cluster())
- assert_equal(result[1], 0)
+ assert result[1] == 0
result = admin(['period', 'update'], get_config_cluster())
- assert_equal(result[1], 0)
+ assert result[1] == 0
result = admin(['period', 'commit'], get_config_cluster())
- assert_equal(result[1], 0)
+ assert result[1] == 0
def connect_random_user(tenant=''):
_, rc = admin(['user', 'create', '--uid', uid, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
else:
_, rc = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
- assert_equal(rc, 0)
+ assert rc == 0
conn = S3Connection(aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
is_secure=False, port=get_config_port(), host=get_config_host())
##############
-@attr('basic_test')
+@pytest.mark.basic_test
def test_topic():
""" test topics set/get/delete """
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf1.set_config()
- assert_equal(topic_arn,
- 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_1')
+ assert topic_arn == 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_1'
endpoint_address = 'http://127.0.0.1:9001'
endpoint_args = 'push-endpoint='+endpoint_address
topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf2.set_config()
- assert_equal(topic_arn,
- 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_2')
+ assert topic_arn == 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_2'
endpoint_address = 'http://127.0.0.1:9002'
endpoint_args = 'push-endpoint='+endpoint_address
topic_conf3 = PSTopicS3(conn, topic_name+'_3', zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf3.set_config()
- assert_equal(topic_arn,
- 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3')
+ assert topic_arn == 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3'
# get topic 3
result, status = topic_conf3.get_config()
- assert_equal(status, 200)
- assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
- assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])
+ assert status == 200
+ assert topic_arn == result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn']
+ assert endpoint_address == result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress']
# Note that endpoint args may be ordered differently in the result
# delete topic 1
result = topic_conf1.del_config()
- assert_equal(status, 200)
+ assert status == 200
# try to get a deleted topic
_, status = topic_conf1.get_config()
- assert_equal(status, 404)
+ assert status == 404
# get the remaining 2 topics
list_topics(2, tenant)
# delete topics
status = topic_conf2.del_config()
- assert_equal(status, 200)
+ assert status == 200
status = topic_conf3.del_config()
- assert_equal(status, 200)
+ assert status == 200
# get topic list, make sure it is empty
list_topics(0, tenant)
-@attr('basic_test')
+@pytest.mark.basic_test
def test_topic_admin():
""" test topics set/get/delete """
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
topic_arn1 = topic_conf1.set_config()
- assert_equal(topic_arn1,
- 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_1')
+ assert topic_arn1 == 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_1'
endpoint_address = 'http://127.0.0.1:9001'
endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args)
topic_arn2 = topic_conf2.set_config()
- assert_equal(topic_arn2,
- 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_2')
+ assert topic_arn2 == 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_2'
endpoint_address = 'http://127.0.0.1:9002'
endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true'
topic_conf3 = PSTopicS3(conn, topic_name+'_3', zonegroup, endpoint_args=endpoint_args)
topic_arn3 = topic_conf3.set_config()
- assert_equal(topic_arn3,
- 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3')
+ assert topic_arn3 == 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3'
# get topic 3 via commandline
parsed_result = get_topic(topic_name+'_3', tenant)
- assert_equal(parsed_result['arn'], topic_arn3)
+ assert parsed_result['arn'] == topic_arn3
matches = [tenant, UID_PREFIX]
- assert_true( all([x in parsed_result['owner'] for x in matches]))
- assert_equal(parsed_result['dest']['persistent_queue'],
- tenant + ":" + topic_name + '_3')
+ assert all([x in parsed_result['owner'] for x in matches])
+ assert parsed_result['dest']['persistent_queue'] == tenant + ":" + topic_name + '_3'
# recall CreateTopic and verify the owner and persistent_queue remain same.
topic_conf3 = PSTopicS3(conn, topic_name + '_3', zonegroup,
endpoint_args=endpoint_args)
topic_arn3 = topic_conf3.set_config()
- assert_equal(topic_arn3,
- 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3')
+ assert topic_arn3 == 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3'
# get topic 3 via commandline
result = admin(
['topic', 'get', '--topic', topic_name + '_3', '--tenant', tenant],
get_config_cluster())
- assert_equal(result[1], 0)
+ assert result[1] == 0
parsed_result = json.loads(result[0])
- assert_equal(parsed_result['arn'], topic_arn3)
- assert_true(all([x in parsed_result['owner'] for x in matches]))
- assert_equal(parsed_result['dest']['persistent_queue'],
- tenant + ":" + topic_name + '_3')
+ assert parsed_result['arn'] == topic_arn3
+ assert all([x in parsed_result['owner'] for x in matches])
+ assert parsed_result['dest']['persistent_queue'] == tenant + ":" + topic_name + '_3'
# delete topic 3
remove_topic(topic_name + '_3', tenant)
# try to get a deleted topic
_, result = get_topic(topic_name + '_3', tenant, allow_failure=True)
print('"topic not found" error is expected')
- assert_equal(result, 2)
+ assert result == 2
# get the remaining 2 topics
list_topics(2, tenant)
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
- assert_equal(topic_arn,
- 'arn:aws:sns:' + zonegroup + '::' + topic_name)
+ assert topic_arn == 'arn:aws:sns:' + zonegroup + '::' + topic_name
# create notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [
}]
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
_, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
# get notification 1
if with_cli:
get_notification(bucket_name, notification_name+'_1')
else:
response, status = s3_notification_conf.get_config(notification=notification_name+'_1')
- assert_equal(status/100, 2)
- assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
+ assert status/100 == 2
+ assert response['NotificationConfiguration']['TopicConfiguration']['Topic'] == topic_arn
# list notification
if with_cli:
list_notifications(bucket_name, 3)
else:
result, status = s3_notification_conf.get_config()
- assert_equal(status, 200)
- assert_equal(len(result['TopicConfigurations']), 3)
+ assert status == 200
+ assert len(result['TopicConfigurations']) == 3
# delete notification 2
if with_cli:
remove_notification(bucket_name, notification_name + '_2')
else:
_, status = s3_notification_conf.del_config(notification=notification_name+'_2')
- assert_equal(status/100, 2)
+ assert status/100 == 2
# list notification
if with_cli:
list_notifications(bucket_name, 2)
else:
result, status = s3_notification_conf.get_config()
- assert_equal(status, 200)
- assert_equal(len(result['TopicConfigurations']), 2)
+ assert status == 200
+ assert len(result['TopicConfigurations']) == 2
# delete notifications
if with_cli:
remove_notification(bucket_name)
else:
_, status = s3_notification_conf.del_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
# list notification, make sure it is empty
list_notifications(bucket_name, 0)
conn.delete_bucket(bucket_name)
@pytest.mark.basic_test
def test_notification_configuration_admin():
    """ test notification list/set/get/delete, with admin cli """
    # run the shared notification_configuration() flow with the
    # radosgw-admin CLI verification path enabled (with_cli=True)
    notification_configuration(True)
@pytest.mark.not_implemented
def test_topic_with_secret():
    """ test topics with secret set/get/delete """
    # placeholder: pytest.skip() raises Skipped, so the test is correctly
    # reported as skipped (the old `return SkipTest(...)` merely returned
    # an exception object and counted as a pass)
    pytest.skip('This test is yet to be implemented')
@pytest.mark.basic_test
def test_notification_configuration():
    """ test notification set/get/delete """
    # same flow as the admin variant, but verified via the S3 REST API
    # only (with_cli=False)
    notification_configuration(False)
-@attr('basic_test')
+@pytest.mark.basic_test
def test_notification_empty_config():
""" test notification set/get/delete with empty config """
hostname = get_ip()
}]
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
_, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
# get notifications on a bucket
response, status = s3_notification_conf.get_config(notification=notification_name+'_1')
- assert_equal(status/100, 2)
- assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
+ assert status/100 == 2
+ assert response['NotificationConfiguration']['TopicConfiguration']['Topic'] == topic_arn
# create notification again with empty configuration to check if it deletes or not
topic_conf_list = []
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
_, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
# make sure that the notification is now deleted
response, status = s3_notification_conf.get_config()
try:
check = response['NotificationConfiguration']
except KeyError as e:
- assert_equal(status/100, 2)
+ assert status/100 == 2
else:
assert False
conn.delete_bucket(bucket_name)
-@attr('amqp_test')
+@pytest.mark.amqp_test
def test_notification_filter_amqp():
""" test notification filter """
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
result, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
topic_conf_list = [{'Id': notification_name+'_4',
'TopicArn': topic_arn,
try:
s3_notification_conf4 = PSNotificationS3(conn, bucket_name, topic_conf_list)
_, status = s3_notification_conf4.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
skip_notif4 = False
except Exception as error:
print('note: metadata filter is not supported by boto3 - skipping test')
# get all notifications
result, status = s3_notification_conf.get_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
for conf in result['TopicConfigurations']:
filter_name = conf['Filter']['Key']['FilterRules'][0]['Name']
assert filter_name == 'prefix' or filter_name == 'suffix' or filter_name == 'regex', filter_name
if not skip_notif4:
result, status = s3_notification_conf4.get_config(notification=notification_name+'_4')
- assert_equal(status/100, 2)
+ assert status/100 == 2
filter_name = result['NotificationConfiguration']['TopicConfiguration']['Filter']['S3Metadata']['FilterRule'][0]['Name']
assert filter_name == 'x-amz-meta-foo' or filter_name == 'x-amz-meta-hello'
notif_id = event['Records'][0]['s3']['configurationId']
key_name = event['Records'][0]['s3']['object']['key']
awsRegion = event['Records'][0]['awsRegion']
- assert_equal(awsRegion, zonegroup)
+ assert awsRegion == zonegroup
bucket_arn = event['Records'][0]['s3']['bucket']['arn']
- assert_equal(bucket_arn, "arn:aws:s3:"+awsRegion+"::"+bucket_name)
+ assert bucket_arn == "arn:aws:s3:"+awsRegion+"::"+bucket_name
if notif_id == notification_name+'_1':
found_in1.append(key_name)
elif notif_id == notification_name+'_2':
else:
assert False, 'invalid notification: ' + notif_id
- assert_equal(set(found_in1), set(expected_in1))
- assert_equal(set(found_in2), set(expected_in2))
- assert_equal(set(found_in3), set(expected_in3))
+ assert set(found_in1) == set(expected_in1)
+ assert set(found_in2) == set(expected_in2)
+ assert set(found_in3) == set(expected_in3)
if not skip_notif4:
- assert_equal(set(found_in4), set(expected_in4))
+ assert set(found_in4) == set(expected_in4)
# cleanup
s3_notification_conf.del_config()
stop_amqp_receiver(receiver, task)
-@attr('basic_test')
+@pytest.mark.basic_test
def test_notification_errors():
""" test notification set/get/delete """
conn = connection()
status = topic_conf.del_config()
# deleting an unknown notification is not considered an error
- assert_equal(status, 200)
+ assert status == 200
_, status = topic_conf.get_config()
- assert_equal(status, 404)
+ assert status == 404
# cleanup
# delete the bucket
}]
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
elif endpoint_type == 'amqp':
# start amqp receiver
exchange = 'ex1'
}]
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
elif endpoint_type == 'kafka':
# start kafka receiver
default_kafka_server_and_port = default_kafka_server + ':9092'
}]
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
else:
- return SkipTest('Unknown endpoint type: ' + endpoint_type)
+ pytest.skip('Unknown endpoint type: ' + endpoint_type)
# create objects in the bucket
number_of_objects = 100
receiver.close(task)
@pytest.mark.amqp_test
def test_notification_amqp():
    """ test pushing amqp notification """
    # delegate to the endpoint-agnostic notification() flow with an
    # AMQP push endpoint
    conn = connection()
    notification('amqp', conn)
-@attr('manual_test')
+@pytest.mark.manual_test
def test_notification_amqp_idleness_check():
""" test pushing amqp notification and checking for connection idleness """
hostname = get_ip()
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
# create objects in the bucket (async)
number_of_objects = 10
conn.delete_bucket(bucket_name)
@pytest.mark.kafka_test
def test_notification_kafka():
    """ test pushing kafka notification """
    # delegate to the endpoint-agnostic notification() flow with a
    # Kafka push endpoint
    conn = connection()
    notification('kafka', conn)
@pytest.mark.kafka_failover
def test_notification_kafka_multiple_brokers_override():
    """ test pushing kafka notification, overriding the broker list with two brokers """
    conn = connection()
    # supply both brokers explicitly so delivery can fail over between them
    notification('kafka', conn, kafka_brokers='{host}:9091,{host}:9092'.format(host=default_kafka_server))
@pytest.mark.kafka_failover
def test_notification_kafka_multiple_brokers_append():
    """ test pushing kafka notification with an extra broker appended to the endpoint """
    conn = connection()
    # only the additional broker is passed; the endpoint's default broker
    # is still used by notification()
    notification('kafka', conn, kafka_brokers='{host}:9091'.format(host=default_kafka_server))
-@attr('manual_test')
+@pytest.mark.manual_test
def test_1K_topics():
""" test creation of moe than 1K topics """
conn = connection()
log.error(f"error during cleanup: {e}")
-@attr('http_test')
+@pytest.mark.http_test
def test_notification_multi_delete():
""" test deletion of multiple keys """
hostname = get_ip()
}]
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
# create objects in the bucket
client_threads = []
http_server.close()
@pytest.mark.http_test
def test_notification_http():
    """ test pushing http notification """
    # delegate to the endpoint-agnostic notification() flow with an
    # HTTP push endpoint
    conn = connection()
    notification('http', conn)
-@attr('http_test')
+@pytest.mark.http_test
def test_notification_cloudevents():
""" test pushing cloudevents notification """
conn = connection()
-@attr('http_test')
+@pytest.mark.http_test
def test_opaque_data():
""" test that opaque id set in topic, is sent in notification """
hostname = get_ip()
}]
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
# create objects in the bucket
client_threads = []
print('total number of objects: ' + str(len(keys)))
events = http_server.get_and_reset_events()
for event in events:
- assert_equal(event['Records'][0]['opaqueData'], opaque_data)
+ assert event['Records'][0]['opaqueData'] == opaque_data
# cleanup
for key in keys:
endpoint_address = 'kafka://' + host
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'
else:
- return SkipTest('Unknown endpoint type: ' + endpoint_type)
+ pytest.skip('Unknown endpoint type: ' + endpoint_type)
# create topic
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
}]
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
# create objects in the bucket
obj_prefix = 'ooo'
assert len(no_keys) == 0, "lifecycle didn't delete the objects after 500 seconds"
wait_for_queue_to_drain(topic_name, http_port=port)
- assert_equal(len(no_keys), 0)
+ assert len(no_keys) == 0
event_keys = []
events = receiver.get_and_reset_events()
if not expected_abortion:
assert number_of_objects * 2 <= len(events)
for event in events:
- assert_in(event['Records'][0]['eventName'], record_events)
+ assert event['Records'][0]['eventName'] in record_events
event_keys.append(event['Records'][0]['s3']['object']['key'])
for key in keys:
key_found = False
return threading.Thread(target = set_contents_from_string, args=(key, content,))
-@attr('http_test')
+@pytest.mark.http_test
def test_lifecycle_http():
""" test that when object is deleted due to lifecycle policy, http endpoint """
rules_creator, ['LifecycleExpiration:Delete', 'ObjectLifecycle:Expiration:Current'])
-@attr('kafka_test')
+@pytest.mark.kafka_test
def test_lifecycle_kafka():
""" test that when object is deleted due to lifecycle policy, kafka endpoint """
print('Error: ' + str(e))
-@attr('http_test')
+@pytest.mark.http_test
def test_lifecycle_abort_mpu():
""" test that when a multipart upload is aborted by lifecycle policy, http endpoint """
hostname = get_ip()
proc = init_rabbitmq()
if proc is None:
- return SkipTest('end2end amqp tests require rabbitmq-server installed')
+ pytest.skip('end2end amqp tests require rabbitmq-server installed')
else:
proc = None
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
objects_size = {}
# create objects in the bucket using PUT
clean_rabbitmq(proc)
@pytest.mark.amqp_test
def test_creation_triggers_amqp():
    """ test that object creation triggers notifications, amqp endpoint """
    # run the shared creation_triggers() flow against a local AMQP broker
    creation_triggers(external_endpoint_address="amqp://localhost:5672")
-@attr('amqp_ssl_test')
+@pytest.mark.amqp_ssl_test
def test_creation_triggers_external():
from distutils.util import strtobool
external_endpoint_address=os.environ['AMQP_EXTERNAL_ENDPOINT'],
verify_ssl=verify_ssl)
else:
- return SkipTest("Set AMQP_EXTERNAL_ENDPOINT to a valid external AMQP endpoint url for this test to run")
+ pytest.skip("Set AMQP_EXTERNAL_ENDPOINT to a valid external AMQP endpoint url for this test to run")
def generate_private_key(tempdir):
return CACERTFILE, CERTFILE, KEYFILE
-@attr('amqp_ssl_test')
+@pytest.mark.amqp_ssl_test
def test_creation_triggers_ssl():
import textwrap
del os.environ['RABBITMQ_CONFIG_FILE']
-@attr('amqp_test')
+@pytest.mark.amqp_test
def test_post_object_upload_amqp():
""" test that uploads object using HTTP POST """
}]
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
payload = OrderedDict([("key" , "foo.txt"),("acl" , "public-read"),\
("Content-Type" , "text/plain"),('file', ('bar'))])
# POST upload
r = requests.post(url, files=payload, verify=True)
- assert_equal(r.status_code, 204)
+ assert r.status_code == 204
# check amqp receiver
events = receiver1.get_and_reset_events()
- assert_equal(len(events), 1)
+ assert len(events) == 1
# cleanup
stop_amqp_receiver(receiver1, task1)
endpoint_address = 'kafka://' + host
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
else:
- return SkipTest('Unknown endpoint type: ' + endpoint_type)
+ pytest.skip('Unknown endpoint type: ' + endpoint_type)
# create topic
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
# create objects in the bucket
client_threads = []
receiver.close(task)
@pytest.mark.http_test
def test_multipart_http():
    """ test http multipart object upload """
    # delegate to the endpoint-agnostic multipart upload flow
    conn = connection()
    multipart_endpoint_agnostic('http', conn)
@pytest.mark.kafka_test
def test_multipart_kafka():
    """ test kafka multipart object upload """
    # delegate to the endpoint-agnostic multipart upload flow
    conn = connection()
    multipart_endpoint_agnostic('kafka', conn)
-@attr('amqp_test')
+@pytest.mark.amqp_test
def test_multipart_ampq():
""" test ampq multipart object upload """
conn = connection()
endpoint_address = 'kafka://' + host
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'
else:
- return SkipTest('Unknown endpoint type: ' + endpoint_type)
+ pytest.skip('Unknown endpoint type: ' + endpoint_type)
# create topic
zonegroup = get_config_zonegroup()
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
_, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
expected_keys = []
# create objects in the bucket
wait_for_queue_to_drain(topic_name, http_port=port)
# check amqp receiver
events = receiver.get_and_reset_events()
- assert_equal(len(events), len(expected_keys))
+ assert len(events) == len(expected_keys)
for event in events:
assert(event['Records'][0]['s3']['object']['key'] in expected_keys)
wait_for_queue_to_drain(topic_name, http_port=port)
# check endpoint receiver
events = receiver.get_and_reset_events()
- assert_equal(len(events), len(expected_keys))
+ assert len(events) == len(expected_keys)
for event in events:
assert(event['Records'][0]['s3']['object']['key'] in expected_keys)
conn.delete_bucket(bucket_name)
@pytest.mark.kafka_test
def test_metadata_filter_kafka():
    """ test notification of filtering metadata, kafka endpoint """
    # delegate to the shared metadata_filter() flow
    conn = connection()
    metadata_filter('kafka', conn)
@pytest.mark.http_test
def test_metadata_filter_http():
    """ test notification of filtering metadata, http endpoint """
    # delegate to the shared metadata_filter() flow
    conn = connection()
    metadata_filter('http', conn)
@pytest.mark.amqp_test
def test_metadata_filter_ampq():
    """ test notification of filtering metadata, amqp endpoint """
    # delegate to the shared metadata_filter() flow
    # NOTE: the function name spells "ampq" (historic typo); kept as-is
    # so external test selection by name keeps working
    conn = connection()
    metadata_filter('amqp', conn)
-@attr('amqp_test')
+@pytest.mark.amqp_test
def test_metadata_amqp():
""" test notification of metadata """
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
_, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
# create objects in the bucket
key_name = 'foo'
events = receiver.get_and_reset_events()
for event in events:
value = [x['val'] for x in event['Records'][0]['s3']['object']['metadata'] if x['key'] == META_PREFIX+meta_key]
- assert_equal(value[0], meta_value)
+ assert value[0] == meta_value
# delete objects
for key in bucket.list():
events = receiver.get_and_reset_events()
for event in events:
value = [x['val'] for x in event['Records'][0]['s3']['object']['metadata'] if x['key'] == META_PREFIX+meta_key]
- assert_equal(value[0], meta_value)
+ assert value[0] == meta_value
# cleanup
stop_amqp_receiver(receiver, task)
conn.delete_bucket(bucket_name)
-@attr('amqp_test')
+@pytest.mark.amqp_test
def test_tags_amqp():
""" test notification of tags """
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
expected_keys = []
# create objects in the bucket with tags
key = event['Records'][0]['s3']['object']['key']
if (key == key_name1):
obj_tags = sorted(event['Records'][0]['s3']['object']['tags'], key=lambda k: k['key']+k['val'])
- assert_equal(obj_tags, expected_tags1)
+ assert obj_tags == expected_tags1
event_count += 1
assert(key in expected_keys)
- assert_equal(event_count, len(expected_keys))
+ assert event_count == len(expected_keys)
# delete the objects
for key in bucket.list():
key = event['Records'][0]['s3']['object']['key']
if (key == key_name1):
obj_tags = sorted(event['Records'][0]['s3']['object']['tags'], key=lambda k: k['key']+k['val'])
- assert_equal(obj_tags, expected_tags1)
+ assert obj_tags == expected_tags1
event_count += 1
assert(key in expected_keys)
# delete the bucket
conn.delete_bucket(bucket_name)
-@attr('amqp_test')
+@pytest.mark.amqp_test
def test_versioning_amqp():
""" test notification of object versions """
}]
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
_, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
# create objects in the bucket
key_name = 'foo'
else:
print('version ok: '+version+' in: '+str(versions))
- assert_equal(num_of_versions, 3)
+ assert num_of_versions == 3
# cleanup
stop_amqp_receiver(receiver, task)
#conn.delete_bucket(bucket_name)
-@attr('amqp_test')
+@pytest.mark.amqp_test
def test_versioned_deletion_amqp():
""" test notification of deletion markers """
}]
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
# create objects in the bucket
key = bucket.new_key('foo')
# 2 key versions were deleted
# notified over the same topic via 2 notifications (1,3)
- assert_equal(delete_events, 2*2)
+ assert delete_events == 2*2
# 1 deletion marker was created
# notified over the same topic over 2 notifications (1,2)
- assert_equal(delete_marker_create_events, 1*2)
+ assert delete_marker_create_events == 1*2
# cleanup
delete_marker_key.delete()
conn.delete_bucket(bucket_name)
-@attr('manual_test')
+@pytest.mark.manual_test
def test_persistent_cleanup():
""" test reservation cleanup after gateway crash """
conn = connection()
}]
s3_notification_conf = PSNotificationS3(gw, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
client_threads = []
start_time = time.time()
if http_port:
check_http_server(http_port)
result = admin(cmd, get_config_cluster())
- assert_equal(result[1], 0)
+ assert result[1] == 0
parsed_result = json.loads(result[0])
entries = parsed_result['Topic Stats']['Entries']
retries += 1
log.info('shards for %s has %d entries after %ds', topic_name, entries, time_diff)
if retries > 100:
log.warning('shards for %s still has %d entries after %ds', topic_name, entries, time_diff)
- assert_equal(entries, 0)
+ assert entries == 0
time.sleep(5)
time_diff = time.time() - start_time
log.info('waited for %ds for shards of %s to drain', time_diff, topic_name)
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
'&retry_sleep_duration=1'
else:
- return SkipTest('Unknown endpoint type: ' + endpoint_type)
+ pytest.skip('Unknown endpoint type: ' + endpoint_type)
# create topic
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
# topic stats
get_stats_persistent_topic(topic_name, 0)
receiver.close(task)
-@attr('http_test')
+@pytest.mark.http_test
def test_persistent_topic_stats_http():
""" test persistent topic stats, http endpoint """
conn = connection()
persistent_topic_stats(conn, 'http')
-@attr('kafka_test')
+@pytest.mark.kafka_test
def test_persistent_topic_stats_kafka():
""" test persistent topic stats, kafka endpoint """
conn = connection()
persistent_topic_stats(conn, 'kafka')
-@attr('amqp_test')
+@pytest.mark.amqp_test
def test_persistent_topic_stats_amqp():
""" test persistent topic stats, amqp endpoint """
conn = connection()
persistent_topic_stats(conn, 'amqp')
-@attr('kafka_test')
+@pytest.mark.kafka_test
def test_persistent_topic_dump():
""" test persistent topic dump """
conn = connection()
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
# create objects in the bucket (async)
number_of_objects = 20
# topic dump
result = admin(['topic', 'dump', '--topic', topic_name], get_config_cluster())
- assert_equal(result[1], 0)
+ assert result[1] == 0
parsed_result = json.loads(result[0])
- assert_equal(len(parsed_result), number_of_objects)
+ assert len(parsed_result) == number_of_objects
# delete objects from the bucket
client_threads = []
# topic stats
result = admin(['topic', 'dump', '--topic', topic_name], get_config_cluster())
- assert_equal(result[1], 0)
+ assert result[1] == 0
parsed_result = json.loads(result[0])
- assert_equal(len(parsed_result), 2*number_of_objects)
+ assert len(parsed_result) == 2*number_of_objects
# change the endpoint port
endpoint_address = 'kafka://' + host
wait_for_queue_to_drain(topic_name,)
result = admin(['topic', 'dump', '--topic', topic_name], get_config_cluster())
- assert_equal(result[1], 0)
+ assert result[1] == 0
parsed_result = json.loads(result[0])
- assert_equal(len(parsed_result), 0)
+ assert len(parsed_result) == 0
# cleanup
s3_notification_conf.del_config()
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
# topic get
parsed_result = get_topic(topic_name)
parsed_result_dest = parsed_result["dest"]
for key, value in config_dict.items():
- assert_equal(parsed_result_dest[key], str(value))
+ assert parsed_result_dest[key] == str(value)
# topic stats
get_stats_persistent_topic(topic_name, 0)
[thr.join() for thr in client_threads]
time_diff = time.time() - start_time
log.info('deletion of %d objects took %f seconds', count, time_diff)
- assert_equal(count, number_of_objects)
+ assert count == number_of_objects
# topic stats
if time_diff > persistency_time:
return str_ret[:-1]
-@attr('basic_test')
+@pytest.mark.basic_test
def test_persistent_topic_configs_ttl():
""" test persistent topic configurations with time_to_live """
config_dict = {"time_to_live": 30, "max_retries": "None", "retry_sleep_duration": "None"}
persistent_topic_configs(persistency_time, config_dict)
-@attr('basic_test')
+@pytest.mark.basic_test
def test_persistent_topic_configs_max_retries():
""" test persistent topic configurations with max_retries and retry_sleep_duration """
config_dict = {"time_to_live": "None", "max_retries": 10, "retry_sleep_duration": 1}
persistent_topic_configs(persistency_time, config_dict)
-@attr('manual_test')
+@pytest.mark.manual_test
def test_persistent_notificationback():
""" test pushing persistent notification pushback """
conn = connection()
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
# create objects in the bucket (async)
for j in range(100):
http_server.close()
-@attr('kafka_test')
+@pytest.mark.kafka_test
def test_notification_kafka_idle_behaviour():
""" test pushing kafka notification idle behaviour check """
conn = connection()
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
# create objects in the bucket (async)
number_of_objects = 10
stop_kafka_receiver(receiver, task)
-@attr('not_implemented')
+@pytest.mark.not_implemented
def test_persistent_gateways_recovery():
""" test gateway recovery of persistent notifications """
- return SkipTest('This test is yet to be implemented')
+ pytest.skip('This test is yet to be implemented')
-@attr('not_implemented')
+@pytest.mark.not_implemented
def test_persistent_multiple_gateways():
""" test pushing persistent notification via two gateways """
- return SkipTest('This test is yet to be implemented')
+ pytest.skip('This test is yet to be implemented')
def persistent_topic_multiple_endpoints(conn, endpoint_type):
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
'&retry_sleep_duration=1'
else:
- return SkipTest('Unknown endpoint type: ' + endpoint_type)
+ pytest.skip('Unknown endpoint type: ' + endpoint_type)
# create two topics
topic_conf1 = PSTopicS3(conn, topic_name_1, zonegroup, endpoint_args=endpoint_args)
}]
s3_notification_conf1 = PSNotificationS3(conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf1.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
notification_name = bucket_name + NOTIFICATION_SUFFIX+'_2'
topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn2,
'Events': []
}]
s3_notification_conf2 = PSNotificationS3(conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf2.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
client_threads = []
start_time = time.time()
receiver.close(task)
-@attr('http_test')
+@pytest.mark.http_test
def test_persistent_multiple_endpoints_http():
""" test pushing persistent notification when one of the endpoints has error, http endpoint """
conn = connection()
persistent_topic_multiple_endpoints(conn, 'http')
-@attr('kafka_test')
+@pytest.mark.kafka_test
def test_persistent_multiple_endpoints_kafka():
""" test pushing persistent notification when one of the endpoints has error, kafka endpoint """
conn = connection()
endpoint_address = 'kafka://' + host
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'+'&persistent=true'
else:
- return SkipTest('Unknown endpoint type: ' + endpoint_type)
+ pytest.skip('Unknown endpoint type: ' + endpoint_type)
# create topic
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
# create objects in the bucket (async)
number_of_objects = 100
receiver.close(task)
-@attr('http_test')
+@pytest.mark.http_test
def test_persistent_notification_http():
""" test pushing persistent notification http """
conn = connection()
persistent_notification('http', conn)
-@attr('http_test')
+@pytest.mark.http_test
def test_persistent_notification_http_account():
""" test pushing persistent notification via http for account user """
user = UID_PREFIX + 'test'
_, rc = admin(['account', 'create', '--account-id', account, '--account-name', 'testacct'], get_config_cluster())
- assert_true(rc in [0, 17]) # EEXIST okay if we rerun
+ assert rc in [0, 17] # EEXIST okay if we rerun
conn, _ = another_user(user=user, account=account)
try:
admin(['user', 'rm', '--uid', user], get_config_cluster())
admin(['account', 'rm', '--account-id', account], get_config_cluster())
-@attr('amqp_test')
+@pytest.mark.amqp_test
def test_persistent_notification_amqp():
""" test pushing persistent notification amqp """
conn = connection()
persistent_notification('amqp', conn)
-@attr('kafka_test')
+@pytest.mark.kafka_test
def test_persistent_notification_kafka():
""" test pushing persistent notification kafka """
conn = connection()
return ''.join(random.choice(letters) for i in range(length))
-@attr('amqp_test')
+@pytest.mark.amqp_test
def test_persistent_notification_large_amqp():
""" test pushing persistent notification of large notifications """
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
# create objects in the bucket (async)
number_of_objects = 100
stop_amqp_receiver(receiver, task)
-@attr('not_implemented')
+@pytest.mark.not_implemented
def test_topic_update():
""" test updating topic associated with a notification"""
- return SkipTest('This test is yet to be implemented')
+ pytest.skip('This test is yet to be implemented')
-@attr('not_implemented')
+@pytest.mark.not_implemented
def test_notification_update():
""" test updating the topic of a notification"""
- return SkipTest('This test is yet to be implemented')
+ pytest.skip('This test is yet to be implemented')
-@attr('not_implemented')
+@pytest.mark.not_implemented
def test_multiple_topics_notification():
""" test notification creation with multiple topics"""
- return SkipTest('This test is yet to be implemented')
+ pytest.skip('This test is yet to be implemented')
-@attr('basic_test')
+@pytest.mark.basic_test
def test_list_topics_migration():
""" test list topics on migration"""
if get_config_cluster() == 'noname':
- return SkipTest('realm is needed for migration test')
+ pytest.skip('realm is needed for migration test')
# Initialize connections and configurations
conn1 = connection()
try:
# Verify no tenant topics
res, status = topic_conf.get_list()
- assert_equal(status // 100, 2)
+ assert status // 100 == 2
listTopicsResponse = res.get('ListTopicsResponse', {})
listTopicsResult = listTopicsResponse.get('ListTopicsResult', {})
topics = listTopicsResult.get('Topics', {})
member = topics['member'] if topics else []
- assert_equal(len(member), 6)
+ assert len(member) == 6
# Verify tenant topics
res, status = tenant_topic_conf.get_list()
- assert_equal(status // 100, 2)
+ assert status // 100 == 2
listTopicsResponse = res.get('ListTopicsResponse', {})
listTopicsResult = listTopicsResponse.get('ListTopicsResult', {})
topics = listTopicsResult.get('Topics', {})
member = topics['member'] if topics else []
- assert_equal(len(member), 3)
+ assert len(member) == 3
finally:
# Cleanup created topics
topic_conf.del_config(topic_arn1)
tenant_topic_conf.del_config(tenant_topic_arn3)
-@attr('basic_test')
+@pytest.mark.basic_test
def test_list_topics():
""" test list topics"""
try:
# Verify no tenant topics
res, status = topic_conf.get_list()
- assert_equal(status // 100, 2)
+ assert status // 100 == 2
listTopicsResponse = res.get('ListTopicsResponse', {})
listTopicsResult = listTopicsResponse.get('ListTopicsResult', {})
topics = listTopicsResult.get('Topics', {})
member = topics['member'] if topics else [] # version 2
- assert_equal(len(member), 3)
+ assert len(member) == 3
# Verify topics for tenant
res, status = tenant_topic_conf.get_list()
- assert_equal(status // 100, 2)
+ assert status // 100 == 2
listTopicsResponse = res.get('ListTopicsResponse', {})
listTopicsResult = listTopicsResponse.get('ListTopicsResult', {})
topics = listTopicsResult.get('Topics', {})
member = topics['member'] if topics else []
- assert_equal(len(member), 2)
+ assert len(member) == 2
finally:
# Cleanup created topics
topic_conf.del_config(topic_arn1)
tenant_topic_conf.del_config(tenant_topic_arn1)
tenant_topic_conf.del_config(tenant_topic_arn2)
-@attr('basic_test')
+@pytest.mark.basic_test
def test_list_topics_v1():
""" test list topics on v1"""
if get_config_cluster() == 'noname':
- return SkipTest('realm is needed')
+ pytest.skip('realm is needed')
# Initialize connections and configurations
conn1 = connection()
try:
# Verify no tenant topics
res, status = topic_conf.get_list()
- assert_equal(status // 100, 2)
+ assert status // 100 == 2
listTopicsResponse = res.get('ListTopicsResponse', {})
listTopicsResult = listTopicsResponse.get('ListTopicsResult', {})
topics = listTopicsResult.get('Topics', {})
member = topics['member'] if topics else []
- assert_equal(len(member), 3)
+ assert len(member) == 3
# Verify tenant topics
res, status = tenant_topic_conf.get_list()
- assert_equal(status // 100, 2)
+ assert status // 100 == 2
listTopicsResponse = res.get('ListTopicsResponse', {})
listTopicsResult = listTopicsResponse.get('ListTopicsResult', {})
topics = listTopicsResult.get('Topics', {})
member = topics['member'] if topics else []
- assert_equal(len(member), 2)
+ assert len(member) == 2
finally:
# Cleanup created topics
topic_conf.del_config(topic_arn1)
assert False, "'AuthorizationError' error is expected"
except ClientError as err:
if 'Error' in err.response:
- assert_equal(err.response['Error']['Code'], 'AuthorizationError')
+ assert err.response['Error']['Code'] == 'AuthorizationError'
else:
- assert_equal(err.response['Code'], 'AuthorizationError')
+ assert err.response['Code'] == 'AuthorizationError'
except Exception as err:
print('unexpected error type: '+type(err).__name__)
assert False, "'AuthorizationError' error is expected"
# 2nd user tries to fetch the topic
_, status = topic_conf2.get_config(topic_arn=topic_arn)
- assert_equal(status, 403)
+ assert status == 403
try:
# 2nd user tries to set the attribute
assert False, "'AuthorizationError' error is expected"
except ClientError as err:
if 'Error' in err.response:
- assert_equal(err.response['Error']['Code'], 'AuthorizationError')
+ assert err.response['Error']['Code'] == 'AuthorizationError'
else:
- assert_equal(err.response['Code'], 'AuthorizationError')
+ assert err.response['Code'] == 'AuthorizationError'
except Exception as err:
print('unexpected error type: '+type(err).__name__)
assert False, "'AuthorizationError' error is expected"
assert False, "'AccessDenied' error is expected"
except ClientError as err:
if 'Error' in err.response:
- assert_equal(err.response['Error']['Code'], 'AccessDenied')
+ assert err.response['Error']['Code'] == 'AccessDenied'
else:
- assert_equal(err.response['Code'], 'AccessDenied')
+ assert err.response['Code'] == 'AccessDenied'
except Exception as err:
print('unexpected error type: '+type(err).__name__)
assert False, "'AuthorizationError' error is expected"
assert False, "'AuthorizationError' error is expected"
except ClientError as err:
if 'Error' in err.response:
- assert_equal(err.response['Error']['Code'], 'AuthorizationError')
+ assert err.response['Error']['Code'] == 'AuthorizationError'
else:
- assert_equal(err.response['Code'], 'AuthorizationError')
+ assert err.response['Code'] == 'AuthorizationError'
except Exception as err:
print('unexpected error type: '+type(err).__name__)
assert False, "'AuthorizationError' error is expected"
topic_arn = topic_conf.set_config()
# 2nd user try to fetch topic again
_, status = topic_conf2.get_config(topic_arn=topic_arn)
- assert_equal(status, 200)
+ assert status == 200
# 2nd user tries to set the attribute again
status = topic_conf2.set_attributes(attribute_name="persistent", attribute_val="false", topic_arn=topic_arn)
- assert_equal(status, 200)
+ assert status == 200
# 2nd user tries to publish notification again
s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list)
_, status = s3_notification_conf2.set_config()
- assert_equal(status, 200)
+ assert status == 200
# 2nd user tries to delete the topic again
status = topic_conf2.del_config(topic_arn=topic_arn)
- assert_equal(status, 200)
+ assert status == 200
# cleanup
s3_notification_conf2.del_config()
conn2.delete_bucket(bucket_name)
-@attr('basic_test')
+@pytest.mark.basic_test
def test_topic_permissions_same_tenant():
topic_permissions()
-@attr('basic_test')
+@pytest.mark.basic_test
def test_topic_permissions_cross_tenant():
topic_permissions(another_tenant="boom")
-@attr('basic_test')
+@pytest.mark.basic_test
def test_topic_no_permissions():
""" test topic set/get/delete permissions """
conn1 = connection()
assert False, "'AuthorizationError' error is expected"
except ClientError as err:
if 'Error' in err.response:
- assert_equal(err.response['Error']['Code'], 'AuthorizationError')
+ assert err.response['Error']['Code'] == 'AuthorizationError'
else:
- assert_equal(err.response['Code'], 'AuthorizationError')
+ assert err.response['Code'] == 'AuthorizationError'
except Exception as err:
print('unexpected error type: '+type(err).__name__)
# 2nd user tries to fetch the topic
_, status = topic_conf2.get_config(topic_arn=topic_arn)
- assert_equal(status, 403)
+ assert status == 403
try:
# 2nd user tries to set the attribute
assert False, "'AuthorizationError' error is expected"
except ClientError as err:
if 'Error' in err.response:
- assert_equal(err.response['Error']['Code'], 'AuthorizationError')
+ assert err.response['Error']['Code'] == 'AuthorizationError'
else:
- assert_equal(err.response['Code'], 'AuthorizationError')
+ assert err.response['Code'] == 'AuthorizationError'
except Exception as err:
print('unexpected error type: '+type(err).__name__)
}]
s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list)
_, status = s3_notification_conf2.set_config()
- assert_equal(status, 200)
+ assert status == 200
try:
# 2nd user tries to delete the topic
assert False, "'AuthorizationError' error is expected"
except ClientError as err:
if 'Error' in err.response:
- assert_equal(err.response['Error']['Code'], 'AuthorizationError')
+ assert err.response['Error']['Code'] == 'AuthorizationError'
else:
- assert_equal(err.response['Code'], 'AuthorizationError')
+ assert err.response['Code'] == 'AuthorizationError'
except Exception as err:
print('unexpected error type: '+type(err).__name__)
stop_kafka_receiver(receiver, task)
-@attr('kafka_security_test')
+@pytest.mark.kafka_security_test
def test_notification_kafka_security_ssl():
kafka_security('SSL')
-@attr('kafka_security_test')
+@pytest.mark.kafka_security_test
def test_notification_kafka_security_ssl_sasl():
kafka_security('SASL_SSL')
-@attr('kafka_security_test')
+@pytest.mark.kafka_security_test
def test_notification_kafka_security_ssl_sasl_attrs():
kafka_security('SASL_SSL', use_topic_attrs_for_creds=True)
-@attr('kafka_security_test')
+@pytest.mark.kafka_security_test
def test_notification_kafka_security_sasl():
kafka_security('SASL_PLAINTEXT')
-@attr('kafka_security_test')
+@pytest.mark.kafka_security_test
def test_notification_kafka_security_ssl_sasl_scram():
kafka_security('SASL_SSL', mechanism='SCRAM-SHA-256')
-@attr('kafka_security_test')
+@pytest.mark.kafka_security_test
def test_notification_kafka_security_sasl_scram():
kafka_security('SASL_PLAINTEXT', mechanism='SCRAM-SHA-256')
-@attr('http_test')
+@pytest.mark.http_test
def test_persistent_reload():
""" do a realm reload while we send notifications """
if get_config_cluster() == 'noname':
- return SkipTest('realm is needed for reload test')
+ pytest.skip('realm is needed for reload test')
conn = connection()
zonegroup = get_config_zonegroup()
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
# topic stats
get_stats_persistent_topic(topic_name1, 0)
def persistent_data_path_v2_migration(conn, endpoint_type):
""" test data path v2 persistent migration """
if get_config_cluster() == 'noname':
- return SkipTest('realm is needed for migration test')
+ pytest.skip('realm is needed for migration test')
zonegroup = get_config_zonegroup()
# disable v2 notification
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
'&retry_sleep_duration=1'
else:
- return SkipTest('Unknown endpoint type: ' + endpoint_type)
+ pytest.skip('Unknown endpoint type: ' + endpoint_type)
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
# topic stats
get_stats_persistent_topic(topic_name, 0)
receiver.close(task)
-@attr('http_test')
+@pytest.mark.http_test
def persistent_data_path_v2_migration_http():
""" test data path v2 persistent migration, http endpoint """
conn = connection()
persistent_data_path_v2_migration(conn, 'http')
-@attr('kafka_test')
+@pytest.mark.kafka_test
def persistent_data_path_v2_migration_kafka():
""" test data path v2 persistent migration, kafka endpoint """
conn = connection()
persistent_data_path_v2_migration(conn, 'kafka')
-@attr('http_test')
+@pytest.mark.http_test
def test_data_path_v2_migration():
""" test data path v2 migration """
if get_config_cluster() == 'noname':
- return SkipTest('realm is needed for migration test')
+ pytest.skip('realm is needed for migration test')
conn = connection()
zonegroup = get_config_zonegroup()
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
# create objects in the bucket (async)
number_of_objects = 10
http_server.close()
-@attr('basic_test')
+@pytest.mark.basic_test
def test_data_path_v2_large_migration():
""" test data path v2 large migration """
if get_config_cluster() == 'noname':
- return SkipTest('realm is needed for migration test')
+ pytest.skip('realm is needed for migration test')
conn = connection()
connections_list = []
connections_list.append(conn)
s3_notification_conf = PSNotificationS3(conn, bucket_name, s3_notification_list)
s3_notification_conf_list.append(s3_notification_conf)
response, status = s3_notification_conf.set_config()
- assert_equal(status / 100, 2)
+ assert status / 100 == 2
# create topic to poll on
polling_topics_conf = []
conn.delete_bucket(bucket.name)
-@attr('basic_test')
+@pytest.mark.basic_test
def test_data_path_v2_mixed_migration():
""" test data path v2 mixed migration """
if get_config_cluster() == 'noname':
- return SkipTest('realm is needed for migration test')
+ pytest.skip('realm is needed for migration test')
conn = connection()
connections_list = []
connections_list.append(conn)
s3_notification_conf = PSNotificationS3(conn, bucket_name, s3_notification_list)
s3_notification_conf_list.append(s3_notification_conf)
response, status = s3_notification_conf.set_config()
- assert_equal(status / 100, 2)
+ assert status / 100 == 2
# disable v2 notification
zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2)
s3_notification_conf = PSNotificationS3(conn, bucket_name, s3_notification_list)
s3_notification_conf_list.append(s3_notification_conf)
response, status = s3_notification_conf.set_config()
- assert_equal(status / 100, 2)
+ assert status / 100 == 2
# create topic to poll on
polling_topics_conf = []
conn.delete_bucket(bucket.name)
-@attr('kafka_test')
+@pytest.mark.kafka_test
def test_notification_caching():
""" test notification caching """
conn = connection()
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
- assert_equal(status / 100, 2)
+ assert status / 100 == 2
# create objects in the bucket (async)
number_of_objects = 10
# topic stats
result = admin(['topic', 'stats', '--topic', topic_name],
get_config_cluster())
- assert_equal(result[1], 0)
+ assert result[1] == 0
parsed_result = json.loads(result[0])
- assert_equal(parsed_result['Topic Stats']['Entries'], 2 * number_of_objects)
+ assert parsed_result['Topic Stats']['Entries'] == 2 * number_of_objects
# remove the port and update the topic, so its pointing to correct endpoint.
endpoint_address = 'kafka://' + default_kafka_server
receiver.close(task)
-@attr('kafka_test')
+@pytest.mark.kafka_test
def test_connection_caching():
""" test connection caching """
conn = connection()
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
- assert_equal(status / 100, 2)
+ assert status / 100 == 2
# create objects in the bucket (async)
number_of_objects = 10
# topic stats
result = admin(['topic', 'stats', '--topic', topic_name_1],
get_config_cluster())
- assert_equal(result[1], 0)
+ assert result[1] == 0
parsed_result = json.loads(result[0])
- assert_equal(parsed_result['Topic Stats']['Entries'], 2 * number_of_objects)
+ assert parsed_result['Topic Stats']['Entries'] == 2 * number_of_objects
# remove the ssl from topic1 and update the topic.
endpoint_address = 'kafka://' + default_kafka_server
# topic stats for 2nd topic will still reflect entries
result = admin(['topic', 'stats', '--topic', topic_name_2],
get_config_cluster())
- assert_equal(result[1], 0)
+ assert result[1] == 0
parsed_result = json.loads(result[0])
- assert_equal(parsed_result['Topic Stats']['Entries'], 2 * number_of_objects)
+ assert parsed_result['Topic Stats']['Entries'] == 2 * number_of_objects
# cleanup
s3_notification_conf.del_config()
receiver_2.close(task_2)
-@attr("http_test")
+@pytest.mark.http_test
def test_topic_migration_to_an_account():
"""test the topic migration procedure described at
https://docs.ceph.com/en/latest/radosgw/account/#migrate-an-existing-user-into-an-account
user1_conn, topic_name, zonegroup, endpoint_args=endpoint_args
)
topic_arn = topic_conf.set_config()
- assert_equal(topic_arn, expected_topic_arn)
+ assert topic_arn == expected_topic_arn
log.info(
f"{user1_conn.access_key} created the topic {topic_arn} with args {endpoint_args}"
)
user2_conn, user2_bucket_name, topic_conf_list
)
_, status = s3_notification_conf1.set_config()
- assert_equal(status / 100, 2)
+ assert status / 100 == 2
_, status = s3_notification_conf2.set_config()
- assert_equal(status / 100, 2)
+ assert status / 100 == 2
# verify both buckets are subscribed to the topic
rgw_topic_entry = [
t for t in list_topics() if t["name"] == topic_name
]
- assert_equal(len(rgw_topic_entry), 1)
+ assert len(rgw_topic_entry) == 1
subscribed_buckets = rgw_topic_entry[0]["subscribed_buckets"]
- assert_equal(len(subscribed_buckets), 2)
- assert_in(user1_bucket_name, subscribed_buckets)
- assert_in(user2_bucket_name, subscribed_buckets)
+ assert len(subscribed_buckets) == 2
+ assert user1_bucket_name in subscribed_buckets
+ assert user2_bucket_name in subscribed_buckets
log.info(
"buckets {user1_bucket_name} and {user2_bucket_name} are subscribed to {topic_arn}"
)
user1_conn, topic_name, zonegroup, endpoint_args=endpoint_args
)
account_topic_arn = topic_conf.set_config()
- assert_equal(account_topic_arn, expected_topic_arn)
+ assert account_topic_arn == expected_topic_arn
log.info(
f"{user1_conn.access_key} created the account topic {account_topic_arn} with args {endpoint_args}"
)
user1_conn, user1_bucket_name, topic_conf_list
)
_, status = s3_notification_conf1.set_config()
- assert_equal(status / 100, 2)
+ assert status / 100 == 2
# verify subscriptions to the account topic
rgw_topic_entry = [
t
for t in list_topics(tenant=account_id)
if t["name"] == topic_name
]
- assert_equal(len(rgw_topic_entry), 1)
+ assert len(rgw_topic_entry) == 1
subscribed_buckets = rgw_topic_entry[0]["subscribed_buckets"]
- assert_equal(len(subscribed_buckets), 1)
- assert_in(user1_bucket_name, subscribed_buckets)
- assert_not_in(user2_bucket_name, subscribed_buckets)
+ assert len(subscribed_buckets) == 1
+ assert user1_bucket_name in subscribed_buckets
+ assert user2_bucket_name not in subscribed_buckets
# verify bucket notifications while 2 test buckets are in the mixed mode
notification_list = list_notifications(user1_bucket_name, assert_len=1)
- assert_equal(notification_list["notifications"][0]["TopicArn"], account_topic_arn)
+ assert notification_list["notifications"][0]["TopicArn"] == account_topic_arn
notification_list = list_notifications(user2_bucket_name, assert_len=1)
- assert_equal(notification_list["notifications"][0]["TopicArn"], topic_arn)
+ assert notification_list["notifications"][0]["TopicArn"] == topic_arn
# verify both topics are functional at the same time with no duplicate notifications
user1_bucket.new_key("user1obj1").set_contents_from_string("object content")
user2_conn, user2_bucket_name, topic_conf_list
)
_, status = s3_notification_conf2.set_config()
- assert_equal(status / 100, 2)
+ assert status / 100 == 2
# remove old topic
# note that, although account topic has the same name, it has to be scoped by account/tenant id to be removed
# so below command will only remove the old topic
_, rc = admin(["topic", "rm", "--topic", topic_name], get_config_cluster())
- assert_equal(rc, 0)
+ assert rc == 0
# now verify account topic serves both buckets
rgw_topic_entry = [
for t in list_topics(tenant=account_id)
if t["name"] == topic_name
]
- assert_equal(len(rgw_topic_entry), 1)
+ assert len(rgw_topic_entry) == 1
subscribed_buckets = rgw_topic_entry[0]["subscribed_buckets"]
- assert_equal(len(subscribed_buckets), 2)
- assert_in(user1_bucket_name, subscribed_buckets)
- assert_in(user2_bucket_name, subscribed_buckets)
+ assert len(subscribed_buckets) == 2
+ assert user1_bucket_name in subscribed_buckets
+ assert user2_bucket_name in subscribed_buckets
# verify bucket notifications after 2 test buckets are updated to use the account topic
notification_list = list_notifications(user1_bucket_name, assert_len=1)
- assert_equal(notification_list["notifications"][0]["TopicArn"], account_topic_arn)
+ assert notification_list["notifications"][0]["TopicArn"] == account_topic_arn
notification_list = list_notifications(user2_bucket_name, assert_len=1)
- assert_equal(notification_list["notifications"][0]["TopicArn"], account_topic_arn)
+ assert notification_list["notifications"][0]["TopicArn"] == account_topic_arn
# finally, make sure that notifications are going thru via the new account topic
user1_bucket.new_key("user1obj1").set_contents_from_string("object content")
endpoint_address = 'kafka://' + host
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'
else:
- return SkipTest('Unknown endpoint type: ' + endpoint_type)
+ pytest.skip('Unknown endpoint type: ' + endpoint_type)
zonegroup = get_config_zonegroup()
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
_, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ assert status/100 == 2
## create objects in the bucket (async)
expected_keys = []
print('wait for the messages...')
wait_for_queue_to_drain(topic_name)
events = receiver.get_and_reset_events()
- assert_equal(len(events), len(expected_keys))
+ assert len(events) == len(expected_keys)
for event in events:
assert(event['Records'][0]['s3']['object']['key'] in expected_keys)
wait_for_queue_to_drain(topic_name)
# check endpoint receiver
events = receiver.get_and_reset_events()
- assert_equal(len(events), len(expected_keys))
+ assert len(events) == len(expected_keys)
for event in events:
assert(event['Records'][0]['s3']['object']['key'] in expected_keys)
-@attr('http_test')
+@pytest.mark.http_test
def test_backward_compatibility_persistent_sharded_topic_http():
conn = connection()
persistent_notification_shard_config_change('http', conn, new_num_shards=11, old_num_shards=1)
-@attr('kafka_test')
+@pytest.mark.kafka_test
def test_backward_compatibility_persistent_sharded_topic_kafka():
conn = connection()
persistent_notification_shard_config_change('kafka', conn, new_num_shards=11, old_num_shards=1)
-@attr('http_test')
+@pytest.mark.http_test
def test_persistent_sharded_topic_config_change_http():
conn = connection()
new_num_shards = random.randint(2, 10)
default_num_shards = 11
persistent_notification_shard_config_change('http', conn, new_num_shards, default_num_shards)
-@attr('kafka_test')
+@pytest.mark.kafka_test
def test_persistent_sharded_topic_config_change_kafka():
conn = connection()
new_num_shards = random.randint(2, 10)