From: Shilpa Jagannath Date: Tue, 3 Feb 2026 05:15:46 +0000 (-0500) Subject: qa/multisite: use boto3's ClientError in place of assert_raises from tools.py. X-Git-Tag: testing/wip-vshankar-testing-20260212.053105~5^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=3d08847abc44e2c09e1ca7fa8a2fbadb32332303;p=ceph-ci.git qa/multisite: use boto3's ClientError in place of assert_raises from tools.py. tools.py should eventually be removed once all other dependent tests move to boto3 Signed-off-by: Shilpa Jagannath --- diff --git a/src/test/rgw/rgw_multi/tests.py b/src/test/rgw/rgw_multi/tests.py index 422bf3365a2..29e142ddc5e 100644 --- a/src/test/rgw/rgw_multi/tests.py +++ b/src/test/rgw/rgw_multi/tests.py @@ -25,7 +25,6 @@ from nose.plugins.skip import SkipTest from .multisite import Zone, ZoneGroup, Credentials from .conn import get_gateway_connection -from .tools import assert_raises class Config: """ test configuration """ @@ -660,8 +659,11 @@ def test_bucket_create_with_tenant(): tenant_primary_conn.head_bucket(Bucket=bucket_name) log.info("bucket exists in tenant namespace") - e = assert_raises(ClientError, primary.s3_client.head_bucket, Bucket=bucket_name) - assert e.response['Error']['Code'] == '404' + try: + primary.s3_client.head_bucket(Bucket=bucket_name) + assert False, "Expected 404 error - bucket should not exist in default namespace" + except ClientError as e: + assert e.response['Error']['Code'] == '404' log.info("bucket does not exist in default user namespace") finally: cmd = ['user', 'rm', '--tenant', tenant, '--uid', uid, '--purge-data'] @@ -1433,25 +1435,28 @@ def test_multi_zone_redirect(): zonegroup_meta_checkpoint(zonegroup) - # try to read object from second zone (should fail) - e = assert_raises(ClientError, zc2.s3_client.get_object, Bucket=bucket_name, Key=obj) - assert e.response['Error']['Code'] == 'NoSuchKey' - - set_redirect_zone(z2, z1) + try: + # try to read object from second zone (should fail) + try: + zc2.s3_client.get_object(Bucket=bucket_name, Key=obj) + assert False, "Expected NoSuchKey error" + except ClientError as e: + assert e.response['Error']['Code'] == 'NoSuchKey' - # Should work now with redirect - response = zc2.s3_client.get_object(Bucket=bucket_name, Key=obj) - eq(data, response['Body'].read().decode('ascii')) + set_redirect_zone(z2, z1) - # Test updates - for x in ['a', 'b', 'c', 'd']: - data = x * 512 - zc1.s3_client.put_object(Bucket=bucket_name, Key=obj, Body=data) - response = zc2.s3_client.get_object(Bucket=bucket_name, Key=obj) - eq(data, response['Body'].read().decode('ascii')) + # verify redirect is configured by checking for 301 response + try: + zc2.s3_client.get_object(Bucket=bucket_name, Key=obj) + assert False, "expected 301 error (boto3 doesn't follow redirects \ + https://github.com/boto/botocore/issues/2571)" + except ClientError as e: + assert e.response['Error']['Code'] == '301', \ + f"Expected 301 but got {e.response['Error']['Code']}" - set_sync_from_all(z2, True) - set_redirect_zone(z2, None) + finally: + set_sync_from_all(z2, True) + set_redirect_zone(z2, None) def test_zonegroup_remove(): zonegroup = realm.master_zonegroup() @@ -2311,10 +2316,12 @@ def test_role_delete_sync(): zonegroup_conns.master_zone.iam_conn.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn) for zone in zonegroup_conns.zones: - e = assert_raises(zone.iam_conn.exceptions.DeleteConflictException, - zone.iam_conn.delete_role, RoleName=role_name) - assert e.response['Error']['Code'] == 'DeleteConflict' - assert e.response['Error']['Message'] + try: + zone.iam_conn.delete_role(RoleName=role_name) + assert False, "Expected DeleteConflictException" + except zone.iam_conn.exceptions.DeleteConflictException as e: + assert e.response['Error']['Code'] == 'DeleteConflict' + assert e.response['Error']['Message'] zonegroup_conns.master_zone.iam_conn.detach_role_policy(RoleName=role_name, PolicyArn=policy_arn) @@ -2324,8 +2331,11 @@ def test_role_delete_sync(): for zone in zonegroup_conns.zones: log.info(f'checking if zone: {zone.name} does not have role: {role_name}') - assert_raises(zone.iam_conn.exceptions.NoSuchEntityException, - zone.iam_conn.get_role, RoleName=role_name) + try: + zone.iam_conn.get_role(RoleName=role_name) + assert False, "Expected NoSuchEntityException" + except zone.iam_conn.exceptions.NoSuchEntityException: + pass # expected log.info(f'success, zone: {zone.name} does not have role: {role_name}') def test_forwarded_put_bucket_policy_error(): @@ -2342,10 +2352,12 @@ def test_forwarded_put_bucket_policy_error(): policy = 'Invalid policy document' try: for zone in zonegroup_conns.rw_zones: - e = assert_raises(ClientError, zone.s3_client.put_bucket_policy, - Bucket=bucket, Policy=policy) - eq(e.response['Error']['Code'], 'InvalidArgument') - assert e.response['Error']['Message'] + try: + zone.s3_client.put_bucket_policy(Bucket=bucket, Policy=policy) + assert False, "Expected InvalidArgument error" + except ClientError as e: + eq(e.response['Error']['Code'], 'InvalidArgument') + assert e.response['Error']['Message'] finally: zonegroup_conns.rw_zones[0].delete_bucket(bucket) realm_meta_checkpoint(realm) @@ -4053,11 +4065,13 @@ def test_bucket_create_location_constraint(): assert_equal(response['LocationConstraint'], zg.name) else: # other zonegroup should fail with 400 - e = assert_raises(ClientError, - z.s3_client.create_bucket, - Bucket=bucket_name, - CreateBucketConfiguration={'LocationConstraint': zg.name}) - assert e.response['ResponseMetadata']['HTTPStatusCode'] == 400 + try: + z.s3_client.create_bucket( + Bucket=bucket_name, + CreateBucketConfiguration={'LocationConstraint': zg.name}) + assert False, "expected 400 error" + except ClientError as e: + assert e.response['ResponseMetadata']['HTTPStatusCode'] == 400 def test_timestamp_based_epochs(): """ @@ -4326,8 +4340,11 @@ def test_bucket_replication_normal_delete(): zone_data_checkpoint(dest.zone, source.zone) # check that object does not exist in destination bucket - e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname) - assert e.response['Error']['Code'] == 'NoSuchKey' + try: + dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname) + assert False, "Expected NoSuchKey error" + except ClientError as e: + assert e.response['Error']['Code'] == 'NoSuchKey' @allow_bucket_replication def test_bucket_replication_normal_deletemarker(): @@ -4383,8 +4400,11 @@ def test_bucket_replication_normal_deletemarker(): zone_data_checkpoint(dest.zone, source.zone) # check that object does not exist in destination bucket - e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname) - assert e.response['Error']['Code'] == 'NoSuchKey' + try: + dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname) + assert False, "Expected NoSuchKey error" + except ClientError as e: + assert e.response['Error']['Code'] == 'NoSuchKey' @allow_bucket_replication def test_bucket_replication_alt_user_forbidden(): @@ -4421,8 +4441,11 @@ def test_bucket_replication_alt_user_forbidden(): zone_data_checkpoint(dest.zone, source.zone) # check that object does not exist in destination bucket - e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname) - assert e.response['Error']['Code'] == 'NoSuchKey' + try: + dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname) + assert False, "Expected NoSuchKey error" + except ClientError as e: + assert e.response['Error']['Code'] == 'NoSuchKey' @allow_bucket_replication def test_bucket_replication_alt_user(): @@ -4496,20 +4519,22 @@ def test_bucket_replication_reject_versioning_identical(): zonegroup_meta_checkpoint(zonegroup) # create replication configuration - e = assert_raises(ClientError, - source.s3_client.put_bucket_replication, - Bucket=source_bucket.name, - ReplicationConfiguration={ - 'Role': '', - 'Rules': [{ - 'ID': 'rule1', - 'Status': 'Enabled', - 'Destination': { - 'Bucket': f'arn:aws:s3:::{dest_bucket.name}', - } - }] - }) - assert e.response['ResponseMetadata']['HTTPStatusCode'] == 400 + try: + source.s3_client.put_bucket_replication( + Bucket=source_bucket.name, + ReplicationConfiguration={ + 'Role': '', + 'Rules': [{ + 'ID': 'rule1', + 'Status': 'Enabled', + 'Destination': { + 'Bucket': f'arn:aws:s3:::{dest_bucket.name}', + } + }] + }) + assert False, "Expected 400 error" + except ClientError as e: + assert e.response['ResponseMetadata']['HTTPStatusCode'] == 400 @allow_bucket_replication def test_bucket_replication_reject_objectlock_identical(): @@ -4525,20 +4550,22 @@ def test_bucket_replication_reject_objectlock_identical(): zonegroup_meta_checkpoint(zonegroup) # create replication configuration - e = assert_raises(ClientError, - source.s3_client.put_bucket_replication, - Bucket=source_bucket.name, - ReplicationConfiguration={ - 'Role': '', - 'Rules': [{ - 'ID': 'rule1', - 'Status': 'Enabled', - 'Destination': { - 'Bucket': f'arn:aws:s3:::{dest_bucket_name}', - } - }] - }) - assert e.response['ResponseMetadata']['HTTPStatusCode'] == 400 + try: + source.s3_client.put_bucket_replication( + Bucket=source_bucket.name, + ReplicationConfiguration={ + 'Role': '', + 'Rules': [{ + 'ID': 'rule1', + 'Status': 'Enabled', + 'Destination': { + 'Bucket': f'arn:aws:s3:::{dest_bucket_name}', + } + }] + }) + assert False, "Expected 400 error" + except ClientError as e: + assert e.response['ResponseMetadata']['HTTPStatusCode'] == 400 @allow_bucket_replication def test_bucket_replication_non_versioned_to_versioned(): @@ -4584,8 +4611,11 @@ def test_bucket_replication_non_versioned_to_versioned(): zone_data_checkpoint(dest.zone, source.zone) # check that object not exists in destination bucket - e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname) - assert e.response['Error']['Code'] == 'NoSuchKey' + try: + dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname) + assert False, "Expected NoSuchKey error" + except ClientError as e: + assert e.response['Error']['Code'] == 'NoSuchKey' @allow_bucket_replication def test_bucket_replication_versioned_to_non_versioned(): @@ -4631,8 +4661,11 @@ def test_bucket_replication_versioned_to_non_versioned(): zone_data_checkpoint(dest.zone, source.zone) # check that object not exists in destination bucket - e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname) - assert e.response['Error']['Code'] == 'NoSuchKey' + try: + dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname) + assert False, "Expected NoSuchKey error" + except ClientError as e: + assert e.response['Error']['Code'] == 'NoSuchKey' @allow_bucket_replication def test_bucket_replication_lock_enabled_to_lock_disabled(): @@ -4694,8 +4727,11 @@ def test_bucket_replication_lock_enabled_to_lock_disabled(): zone_data_checkpoint(dest.zone, source.zone) # check that object does not exist in destination bucket - e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname) - assert e.response['Error']['Code'] == 'NoSuchKey' + try: + dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname) + assert False, "Expected NoSuchKey error" + except ClientError as e: + assert e.response['Error']['Code'] == 'NoSuchKey' @allow_bucket_replication def test_bucket_replication_lock_disabled_to_lock_enabled(): @@ -4757,8 +4793,11 @@ def test_bucket_replication_lock_disabled_to_lock_enabled(): zone_data_checkpoint(dest.zone, source.zone) # check that object does not exist in destination bucket - e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket_name, Key=objname) - assert e.response['Error']['Code'] == 'NoSuchKey' + try: + dest.s3_client.get_object(Bucket=dest_bucket_name, Key=objname) + assert False, "Expected NoSuchKey error" + except ClientError as e: + assert e.response['Error']['Code'] == 'NoSuchKey' @attr('sync_policy') @attr('fails_with_rgw') @@ -4807,8 +4846,11 @@ def test_bucket_delete_with_zonegroup_sync_policy_directional(): # delete bucket - should fail log.debug('deleting bucket') - e = assert_raises(ClientError, zcA.s3_client.delete_bucket, Bucket=bucketA.name) - assert e.response['Error']['Code'] == 'BucketNotEmpty' + try: + zcA.s3_client.delete_bucket(Bucket=bucketA.name) + assert False, "Expected BucketNotEmpty error" + except ClientError as e: + assert e.response['Error']['Code'] == 'BucketNotEmpty' assert check_all_buckets_exist(zcA, [bucketA.name]) assert check_all_buckets_exist(zcB, [bucketA.name]) @@ -4879,8 +4921,11 @@ def test_bucket_delete_with_bucket_sync_policy_directional(): zone_bucket_checkpoint(zoneB, zoneA, bucketA.name) log.debug('deleting bucket') - e = assert_raises(ClientError, zcA.s3_client.delete_bucket, Bucket=bucketA.name) - assert e.response['Error']['Code'] == 'BucketNotEmpty' + try: + zcA.s3_client.delete_bucket(Bucket=bucketA.name) + assert False, "Expected BucketNotEmpty error" + except ClientError as e: + assert e.response['Error']['Code'] == 'BucketNotEmpty' assert check_all_buckets_exist(zcA, [bucketA.name]) assert check_all_buckets_exist(zcB, [bucketA.name]) @@ -5105,8 +5150,11 @@ def test_delete_bucket_with_zone_opt_out(): # delete bucket on zoneA. it should fail to delete because zoneC still has objnameB log.debug('deleting bucket') - e = assert_raises(ClientError, zcA.s3_client.delete_bucket, Bucket=bucketA.name) - assert e.response['Error']['Code'] == 'BucketNotEmpty' + try: + zcA.s3_client.delete_bucket(Bucket=bucketA.name) + assert False, "Expected BucketNotEmpty error" + except ClientError as e: + assert e.response['Error']['Code'] == 'BucketNotEmpty' assert check_all_buckets_exist(zcA, buckets) assert check_all_buckets_exist(zcC, buckets) @@ -5197,8 +5245,11 @@ def test_bucket_delete_with_sync_policy_object_prefix(): # delete bucket on zoneA. it should fail to delete because zoneB still has objnameB log.debug('deleting bucket') - e = assert_raises(ClientError, zcA.s3_client.delete_bucket, Bucket=bucketA.name) - assert e.response['Error']['Code'] == 'BucketNotEmpty' + try: + zcA.s3_client.delete_bucket(Bucket=bucketA.name) + assert False, "Expected BucketNotEmpty error" + except ClientError as e: + assert e.response['Error']['Code'] == 'BucketNotEmpty' assert check_all_buckets_exist(zcA, buckets) assert check_all_buckets_exist(zcB, buckets) @@ -5363,8 +5414,11 @@ def test_bucket_replication_alt_user_delete(): zone_data_checkpoint(dest.zone, source.zone) # check that object does not exist in destination bucket - e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname) - assert e.response['Error']['Code'] == 'NoSuchKey' + try: + dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname) + assert False, "Expected NoSuchKey error" + except ClientError as e: + assert e.response['Error']['Code'] == 'NoSuchKey' @allow_bucket_replication def test_bucket_replication_alt_user_deletemarker_forbidden(): @@ -5509,8 +5563,11 @@ def test_bucket_replication_alt_user_deletemarker(): zone_data_checkpoint(dest.zone, source.zone) # check that object does not exist in destination bucket - e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname) - assert e.response['Error']['Code'] == 'NoSuchKey' + try: + dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname) + assert False, "Expected NoSuchKey error" + except ClientError as e: + assert e.response['Error']['Code'] == 'NoSuchKey' @allow_bucket_replication def test_bucket_replication_alt_user_deny_tagreplication(): @@ -5623,8 +5680,11 @@ def test_bucket_replication_source_forbidden(): zone_data_checkpoint(dest.zone, source.zone) # check that object does not exist in destination bucket - e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname) - assert e.response['Error']['Code'] == 'NoSuchKey' + try: + dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname) + assert False, "Expected NoSuchKey error" + except ClientError as e: + assert e.response['Error']['Code'] == 'NoSuchKey' # remove bucket policy so I can check for replication status source.s3_client.delete_bucket_policy(Bucket=source_bucket.name) @@ -5687,8 +5747,11 @@ def test_bucket_replication_source_forbidden_versioned(): zone_data_checkpoint(dest.zone, source.zone) # check that object does not exist in destination bucket - e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname) - assert e.response['Error']['Code'] == 'NoSuchKey' + try: + dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname) + assert False, "Expected NoSuchKey error" + except ClientError as e: + assert e.response['Error']['Code'] == 'NoSuchKey' # remove bucket policy so I can check for replication status source.s3_client.delete_bucket_policy(Bucket=source_bucket.name) @@ -5849,8 +5912,11 @@ def test_bucket_replication_source_forbidden_objretention(): zone_data_checkpoint(dest.zone, source.zone) # check that object does not exist in destination bucket - e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname) - assert e.response['Error']['Code'] == 'NoSuchKey' + try: + dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname) + assert False, "Expected NoSuchKey error" + except ClientError as e: + assert e.response['Error']['Code'] == 'NoSuchKey' # check the source object has replication status set to FAILED # uncomment me in https://github.com/ceph/ceph/pull/62147 @@ -5901,8 +5967,11 @@ def test_bucket_replication_source_forbidden_legalhold(): zone_data_checkpoint(dest.zone, source.zone) # check that object does not exist in destination bucket - e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname) - assert e.response['Error']['Code'] == 'NoSuchKey' + try: + dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname) + assert False, "Expected NoSuchKey error" + except ClientError as e: + assert e.response['Error']['Code'] == 'NoSuchKey' # check the source object has replication status set to FAILED @@ -6035,11 +6104,14 @@ def test_copy_obj_perm_check_between_zonegroups(zonegroup): realm_meta_checkpoint(realm) # copy object returns 403 - e = assert_raises(ClientError, dest_zone.s3_client.copy_object, - Bucket=dest_bucket.name, - CopySource={'Bucket': source_bucket.name, 'Key': objname}, - Key=objname) - assert e.response['Error']['Code'] == 'AccessDenied' + try: + dest_zone.s3_client.copy_object( + Bucket=dest_bucket.name, + CopySource={'Bucket': source_bucket.name, 'Key': objname}, + Key=objname) + assert False, "Expected AccessDenied error" + except ClientError as e: + assert e.response['Error']['Code'] == 'AccessDenied' def test_object_lock_sync(): diff --git a/src/test/rgw/rgw_multi/zone_rados.py b/src/test/rgw/rgw_multi/zone_rados.py index 5343353af3f..73e04e7805e 100644 --- a/src/test/rgw/rgw_multi/zone_rados.py +++ b/src/test/rgw/rgw_multi/zone_rados.py @@ -72,7 +72,7 @@ class RadosZone(Zone): return self.s3_resource.Bucket(name) except ClientError as e: error_code = e.response['Error']['Code'] - if error_codein ['404', 'NoSuchBucket']: + if error_code in ['404', 'NoSuchBucket']: return None raise