from .multisite import Zone, ZoneGroup, Credentials
from .conn import get_gateway_connection
-from .tools import assert_raises
class Config:
""" test configuration """
tenant_primary_conn.head_bucket(Bucket=bucket_name)
log.info("bucket exists in tenant namespace")
- e = assert_raises(ClientError, primary.s3_client.head_bucket, Bucket=bucket_name)
- assert e.response['Error']['Code'] == '404'
+ try:
+ primary.s3_client.head_bucket(Bucket=bucket_name)
+ assert False, "Expected 404 error - bucket should not exist in default namespace"
+ except ClientError as e:
+ assert e.response['Error']['Code'] == '404'
log.info("bucket does not exist in default user namespace")
finally:
cmd = ['user', 'rm', '--tenant', tenant, '--uid', uid, '--purge-data']
zonegroup_meta_checkpoint(zonegroup)
- # try to read object from second zone (should fail)
- e = assert_raises(ClientError, zc2.s3_client.get_object, Bucket=bucket_name, Key=obj)
- assert e.response['Error']['Code'] == 'NoSuchKey'
-
- set_redirect_zone(z2, z1)
+ try:
+ # try to read object from second zone (should fail)
+ try:
+ zc2.s3_client.get_object(Bucket=bucket_name, Key=obj)
+ assert False, "Expected NoSuchKey error"
+ except ClientError as e:
+ assert e.response['Error']['Code'] == 'NoSuchKey'
- # Should work now with redirect
- response = zc2.s3_client.get_object(Bucket=bucket_name, Key=obj)
- eq(data, response['Body'].read().decode('ascii'))
+ set_redirect_zone(z2, z1)
- # Test updates
- for x in ['a', 'b', 'c', 'd']:
- data = x * 512
- zc1.s3_client.put_object(Bucket=bucket_name, Key=obj, Body=data)
- response = zc2.s3_client.get_object(Bucket=bucket_name, Key=obj)
- eq(data, response['Body'].read().decode('ascii'))
+ # verify redirect is configured by checking for 301 response
+ try:
+ zc2.s3_client.get_object(Bucket=bucket_name, Key=obj)
+ assert False, "expected 301 error (boto3 doesn't follow redirects \
+ https://github.com/boto/botocore/issues/2571)"
+ except ClientError as e:
+ assert e.response['Error']['Code'] == '301', \
+ f"Expected 301 but got {e.response['Error']['Code']}"
- set_sync_from_all(z2, True)
- set_redirect_zone(z2, None)
+ finally:
+ set_sync_from_all(z2, True)
+ set_redirect_zone(z2, None)
def test_zonegroup_remove():
zonegroup = realm.master_zonegroup()
zonegroup_conns.master_zone.iam_conn.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
for zone in zonegroup_conns.zones:
- e = assert_raises(zone.iam_conn.exceptions.DeleteConflictException,
- zone.iam_conn.delete_role, RoleName=role_name)
- assert e.response['Error']['Code'] == 'DeleteConflict'
- assert e.response['Error']['Message']
+ try:
+ zone.iam_conn.delete_role(RoleName=role_name)
+ assert False, "Expected DeleteConflictException"
+ except zone.iam_conn.exceptions.DeleteConflictException as e:
+ assert e.response['Error']['Code'] == 'DeleteConflict'
+ assert e.response['Error']['Message']
zonegroup_conns.master_zone.iam_conn.detach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
for zone in zonegroup_conns.zones:
log.info(f'checking if zone: {zone.name} does not have role: {role_name}')
- assert_raises(zone.iam_conn.exceptions.NoSuchEntityException,
- zone.iam_conn.get_role, RoleName=role_name)
+ try:
+ zone.iam_conn.get_role(RoleName=role_name)
+ assert False, "Expected NoSuchEntityException"
+ except zone.iam_conn.exceptions.NoSuchEntityException:
+ pass # expected
log.info(f'success, zone: {zone.name} does not have role: {role_name}')
def test_forwarded_put_bucket_policy_error():
policy = 'Invalid policy document'
try:
for zone in zonegroup_conns.rw_zones:
- e = assert_raises(ClientError, zone.s3_client.put_bucket_policy,
- Bucket=bucket, Policy=policy)
- eq(e.response['Error']['Code'], 'InvalidArgument')
- assert e.response['Error']['Message']
+ try:
+ zone.s3_client.put_bucket_policy(Bucket=bucket, Policy=policy)
+ assert False, "Expected InvalidArgument error"
+ except ClientError as e:
+ eq(e.response['Error']['Code'], 'InvalidArgument')
+ assert e.response['Error']['Message']
finally:
zonegroup_conns.rw_zones[0].delete_bucket(bucket)
realm_meta_checkpoint(realm)
assert_equal(response['LocationConstraint'], zg.name)
else:
# other zonegroup should fail with 400
- e = assert_raises(ClientError,
- z.s3_client.create_bucket,
- Bucket=bucket_name,
- CreateBucketConfiguration={'LocationConstraint': zg.name})
- assert e.response['ResponseMetadata']['HTTPStatusCode'] == 400
+ try:
+ z.s3_client.create_bucket(
+ Bucket=bucket_name,
+ CreateBucketConfiguration={'LocationConstraint': zg.name})
+ assert False, "expected 400 error"
+ except ClientError as e:
+ assert e.response['ResponseMetadata']['HTTPStatusCode'] == 400
def run_per_zonegroup(func):
def wrapper(*args, **kwargs):
zone_data_checkpoint(dest.zone, source.zone)
# check that object does not exist in destination bucket
- e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
- assert e.response['Error']['Code'] == 'NoSuchKey'
+ try:
+ dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+ assert False, "Expected NoSuchKey error"
+ except ClientError as e:
+ assert e.response['Error']['Code'] == 'NoSuchKey'
@allow_bucket_replication
def test_bucket_replication_normal_deletemarker():
zone_data_checkpoint(dest.zone, source.zone)
# check that object does not exist in destination bucket
- e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
- assert e.response['Error']['Code'] == 'NoSuchKey'
+ try:
+ dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+ assert False, "Expected NoSuchKey error"
+ except ClientError as e:
+ assert e.response['Error']['Code'] == 'NoSuchKey'
@allow_bucket_replication
def test_bucket_replication_alt_user_forbidden():
zone_data_checkpoint(dest.zone, source.zone)
# check that object does not exist in destination bucket
- e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
- assert e.response['Error']['Code'] == 'NoSuchKey'
+ try:
+ dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+ assert False, "Expected NoSuchKey error"
+ except ClientError as e:
+ assert e.response['Error']['Code'] == 'NoSuchKey'
@allow_bucket_replication
def test_bucket_replication_alt_user():
zonegroup_meta_checkpoint(zonegroup)
# create replication configuration
- e = assert_raises(ClientError,
- source.s3_client.put_bucket_replication,
- Bucket=source_bucket.name,
- ReplicationConfiguration={
- 'Role': '',
- 'Rules': [{
- 'ID': 'rule1',
- 'Status': 'Enabled',
- 'Destination': {
- 'Bucket': f'arn:aws:s3:::{dest_bucket.name}',
- }
- }]
- })
- assert e.response['ResponseMetadata']['HTTPStatusCode'] == 400
+ try:
+ source.s3_client.put_bucket_replication(
+ Bucket=source_bucket.name,
+ ReplicationConfiguration={
+ 'Role': '',
+ 'Rules': [{
+ 'ID': 'rule1',
+ 'Status': 'Enabled',
+ 'Destination': {
+ 'Bucket': f'arn:aws:s3:::{dest_bucket.name}',
+ }
+ }]
+ })
+ assert False, "Expected 400 error"
+ except ClientError as e:
+ assert e.response['ResponseMetadata']['HTTPStatusCode'] == 400
@allow_bucket_replication
def test_bucket_replication_reject_objectlock_identical():
zonegroup_meta_checkpoint(zonegroup)
# create replication configuration
- e = assert_raises(ClientError,
- source.s3_client.put_bucket_replication,
- Bucket=source_bucket.name,
- ReplicationConfiguration={
- 'Role': '',
- 'Rules': [{
- 'ID': 'rule1',
- 'Status': 'Enabled',
- 'Destination': {
- 'Bucket': f'arn:aws:s3:::{dest_bucket_name}',
- }
- }]
- })
- assert e.response['ResponseMetadata']['HTTPStatusCode'] == 400
+ try:
+ source.s3_client.put_bucket_replication(
+ Bucket=source_bucket.name,
+ ReplicationConfiguration={
+ 'Role': '',
+ 'Rules': [{
+ 'ID': 'rule1',
+ 'Status': 'Enabled',
+ 'Destination': {
+ 'Bucket': f'arn:aws:s3:::{dest_bucket_name}',
+ }
+ }]
+ })
+ assert False, "Expected 400 error"
+ except ClientError as e:
+ assert e.response['ResponseMetadata']['HTTPStatusCode'] == 400
@allow_bucket_replication
def test_bucket_replication_non_versioned_to_versioned():
zone_data_checkpoint(dest.zone, source.zone)
# check that object not exists in destination bucket
- e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
- assert e.response['Error']['Code'] == 'NoSuchKey'
+ try:
+ dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+ assert False, "Expected NoSuchKey error"
+ except ClientError as e:
+ assert e.response['Error']['Code'] == 'NoSuchKey'
@allow_bucket_replication
def test_bucket_replication_versioned_to_non_versioned():
zone_data_checkpoint(dest.zone, source.zone)
# check that object not exists in destination bucket
- e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
- assert e.response['Error']['Code'] == 'NoSuchKey'
+ try:
+ dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+ assert False, "Expected NoSuchKey error"
+ except ClientError as e:
+ assert e.response['Error']['Code'] == 'NoSuchKey'
@allow_bucket_replication
def test_bucket_replication_lock_enabled_to_lock_disabled():
zone_data_checkpoint(dest.zone, source.zone)
# check that object does not exist in destination bucket
- e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
- assert e.response['Error']['Code'] == 'NoSuchKey'
+ try:
+ dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+ assert False, "Expected NoSuchKey error"
+ except ClientError as e:
+ assert e.response['Error']['Code'] == 'NoSuchKey'
@allow_bucket_replication
def test_bucket_replication_lock_disabled_to_lock_enabled():
zone_data_checkpoint(dest.zone, source.zone)
# check that object does not exist in destination bucket
- e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket_name, Key=objname)
- assert e.response['Error']['Code'] == 'NoSuchKey'
+ try:
+ dest.s3_client.get_object(Bucket=dest_bucket_name, Key=objname)
+ assert False, "Expected NoSuchKey error"
+ except ClientError as e:
+ assert e.response['Error']['Code'] == 'NoSuchKey'
@attr('sync_policy')
@attr('fails_with_rgw')
# delete bucket - should fail
log.debug('deleting bucket')
- e = assert_raises(ClientError, zcA.s3_client.delete_bucket, Bucket=bucketA.name)
- assert e.response['Error']['Code'] == 'BucketNotEmpty'
+ try:
+ zcA.s3_client.delete_bucket(Bucket=bucketA.name)
+ assert False, "Expected BucketNotEmpty error"
+ except ClientError as e:
+ assert e.response['Error']['Code'] == 'BucketNotEmpty'
assert check_all_buckets_exist(zcA, [bucketA.name])
assert check_all_buckets_exist(zcB, [bucketA.name])
zone_bucket_checkpoint(zoneB, zoneA, bucketA.name)
log.debug('deleting bucket')
- e = assert_raises(ClientError, zcA.s3_client.delete_bucket, Bucket=bucketA.name)
- assert e.response['Error']['Code'] == 'BucketNotEmpty'
+ try:
+ zcA.s3_client.delete_bucket(Bucket=bucketA.name)
+ assert False, "Expected BucketNotEmpty error"
+ except ClientError as e:
+ assert e.response['Error']['Code'] == 'BucketNotEmpty'
assert check_all_buckets_exist(zcA, [bucketA.name])
assert check_all_buckets_exist(zcB, [bucketA.name])
# delete bucket on zoneA. it should fail to delete because zoneC still has objnameB
log.debug('deleting bucket')
- e = assert_raises(ClientError, zcA.s3_client.delete_bucket, Bucket=bucketA.name)
- assert e.response['Error']['Code'] == 'BucketNotEmpty'
+ try:
+ zcA.s3_client.delete_bucket(Bucket=bucketA.name)
+ assert False, "Expected BucketNotEmpty error"
+ except ClientError as e:
+ assert e.response['Error']['Code'] == 'BucketNotEmpty'
assert check_all_buckets_exist(zcA, buckets)
assert check_all_buckets_exist(zcC, buckets)
# delete bucket on zoneA. it should fail to delete because zoneB still has objnameB
log.debug('deleting bucket')
- e = assert_raises(ClientError, zcA.s3_client.delete_bucket, Bucket=bucketA.name)
- assert e.response['Error']['Code'] == 'BucketNotEmpty'
+ try:
+ zcA.s3_client.delete_bucket(Bucket=bucketA.name)
+ assert False, "Expected BucketNotEmpty error"
+ except ClientError as e:
+ assert e.response['Error']['Code'] == 'BucketNotEmpty'
assert check_all_buckets_exist(zcA, buckets)
assert check_all_buckets_exist(zcB, buckets)
zone_data_checkpoint(dest.zone, source.zone)
# check that object does not exist in destination bucket
- e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
- assert e.response['Error']['Code'] == 'NoSuchKey'
+ try:
+ dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+ assert False, "Expected NoSuchKey error"
+ except ClientError as e:
+ assert e.response['Error']['Code'] == 'NoSuchKey'
@allow_bucket_replication
def test_bucket_replication_alt_user_deletemarker_forbidden():
zone_data_checkpoint(dest.zone, source.zone)
# check that object does not exist in destination bucket
- e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
- assert e.response['Error']['Code'] == 'NoSuchKey'
+ try:
+ dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+ assert False, "Expected NoSuchKey error"
+ except ClientError as e:
+ assert e.response['Error']['Code'] == 'NoSuchKey'
@allow_bucket_replication
def test_bucket_replication_alt_user_deny_tagreplication():
zone_data_checkpoint(dest.zone, source.zone)
# check that object does not exist in destination bucket
- e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
- assert e.response['Error']['Code'] == 'NoSuchKey'
+ try:
+ dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+ assert False, "Expected NoSuchKey error"
+ except ClientError as e:
+ assert e.response['Error']['Code'] == 'NoSuchKey'
# remove bucket policy so I can check for replication status
source.s3_client.delete_bucket_policy(Bucket=source_bucket.name)
zone_data_checkpoint(dest.zone, source.zone)
# check that object does not exist in destination bucket
- e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
- assert e.response['Error']['Code'] == 'NoSuchKey'
+ try:
+ dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+ assert False, "Expected NoSuchKey error"
+ except ClientError as e:
+ assert e.response['Error']['Code'] == 'NoSuchKey'
# remove bucket policy so I can check for replication status
source.s3_client.delete_bucket_policy(Bucket=source_bucket.name)
zone_data_checkpoint(dest.zone, source.zone)
# check that object does not exist in destination bucket
- e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
- assert e.response['Error']['Code'] == 'NoSuchKey'
+ try:
+ dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+ assert False, "Expected NoSuchKey error"
+ except ClientError as e:
+ assert e.response['Error']['Code'] == 'NoSuchKey'
# check the source object has replication status set to FAILED
# uncomment me in https://github.com/ceph/ceph/pull/62147
zone_data_checkpoint(dest.zone, source.zone)
# check that object does not exist in destination bucket
- e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
- assert e.response['Error']['Code'] == 'NoSuchKey'
+ try:
+ dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+ assert False, "Expected NoSuchKey error"
+ except ClientError as e:
+ assert e.response['Error']['Code'] == 'NoSuchKey'
# check the source object has replication status set to FAILED
realm_meta_checkpoint(realm)
# copy object returns 403
- e = assert_raises(ClientError, dest_zone.s3_client.copy_object,
- Bucket=dest_bucket.name,
- CopySource={'Bucket': source_bucket.name, 'Key': objname},
- Key=objname)
- assert e.response['Error']['Code'] == 'AccessDenied'
+ try:
+ dest_zone.s3_client.copy_object(
+ Bucket=dest_bucket.name,
+ CopySource={'Bucket': source_bucket.name, 'Key': objname},
+ Key=objname)
+ assert False, "Expected AccessDenied error"
+ except ClientError as e:
+ assert e.response['Error']['Code'] == 'AccessDenied'