]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
qa/multisite: use boto3's ClientError in place of assert_raises from tools.py.
authorShilpa Jagannath <smanjara@redhat.com>
Tue, 3 Feb 2026 05:15:46 +0000 (00:15 -0500)
committerShilpa Jagannath <smanjara@redhat.com>
Tue, 3 Feb 2026 05:19:29 +0000 (00:19 -0500)
tools.py should eventually be removed once all other dependent tests move to boto3

Signed-off-by: Shilpa Jagannath <smanjara@redhat.com>
src/test/rgw/rgw_multi/tests.py
src/test/rgw/rgw_multi/zone_rados.py

index 422bf3365a25db8433b450c97a0f94b2e2139e18..29e142ddc5e4f4f055636be48756dbecf570b8f9 100644 (file)
@@ -25,7 +25,6 @@ from nose.plugins.skip import SkipTest
 from .multisite import Zone, ZoneGroup, Credentials
 
 from .conn import get_gateway_connection
-from .tools import assert_raises
 
 class Config:
     """ test configuration """
@@ -660,8 +659,11 @@ def test_bucket_create_with_tenant():
         tenant_primary_conn.head_bucket(Bucket=bucket_name)
         log.info("bucket exists in tenant namespace")
 
-        e = assert_raises(ClientError, primary.s3_client.head_bucket, Bucket=bucket_name)
-        assert e.response['Error']['Code'] == '404'
+        try:
+            primary.s3_client.head_bucket(Bucket=bucket_name)
+            assert False, "Expected 404 error - bucket should not exist in default namespace"
+        except ClientError as e:
+            assert e.response['Error']['Code'] == '404'
         log.info("bucket does not exist in default user namespace")
     finally:
         cmd = ['user', 'rm', '--tenant', tenant, '--uid', uid, '--purge-data']
@@ -1433,25 +1435,28 @@ def test_multi_zone_redirect():
 
     zonegroup_meta_checkpoint(zonegroup)
 
-    # try to read object from second zone (should fail)
-    e = assert_raises(ClientError, zc2.s3_client.get_object, Bucket=bucket_name, Key=obj)
-    assert e.response['Error']['Code'] == 'NoSuchKey'
-
-    set_redirect_zone(z2, z1)
+    try:
+        # try to read object from second zone (should fail)
+        try:
+            zc2.s3_client.get_object(Bucket=bucket_name, Key=obj)
+            assert False, "Expected NoSuchKey error"
+        except ClientError as e:
+            assert e.response['Error']['Code'] == 'NoSuchKey'
 
-    # Should work now with redirect
-    response = zc2.s3_client.get_object(Bucket=bucket_name, Key=obj)
-    eq(data, response['Body'].read().decode('ascii'))
+        set_redirect_zone(z2, z1)
 
-    # Test updates
-    for x in ['a', 'b', 'c', 'd']:
-        data = x * 512
-        zc1.s3_client.put_object(Bucket=bucket_name, Key=obj, Body=data)
-        response = zc2.s3_client.get_object(Bucket=bucket_name, Key=obj)
-        eq(data, response['Body'].read().decode('ascii'))
+        # verify redirect is configured by checking for 301 response
+        try:
+            zc2.s3_client.get_object(Bucket=bucket_name, Key=obj)
+            assert False, "expected 301 error (boto3 doesn't follow redirects \
+                        https://github.com/boto/botocore/issues/2571)"
+        except ClientError as e:
+            assert e.response['Error']['Code'] == '301', \
+                f"Expected 301 but got {e.response['Error']['Code']}"
 
-    set_sync_from_all(z2, True)
-    set_redirect_zone(z2, None)
+    finally:
+        set_sync_from_all(z2, True)
+        set_redirect_zone(z2, None)
 
 def test_zonegroup_remove():
     zonegroup = realm.master_zonegroup()
@@ -2311,10 +2316,12 @@ def test_role_delete_sync():
     zonegroup_conns.master_zone.iam_conn.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
 
     for zone in zonegroup_conns.zones:
-        e = assert_raises(zone.iam_conn.exceptions.DeleteConflictException,
-                          zone.iam_conn.delete_role, RoleName=role_name)
-        assert e.response['Error']['Code'] == 'DeleteConflict'
-        assert e.response['Error']['Message']
+        try:
+            zone.iam_conn.delete_role(RoleName=role_name)
+            assert False, "Expected DeleteConflictException"
+        except zone.iam_conn.exceptions.DeleteConflictException as e:
+            assert e.response['Error']['Code'] == 'DeleteConflict'
+            assert e.response['Error']['Message']
 
     zonegroup_conns.master_zone.iam_conn.detach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
 
@@ -2324,8 +2331,11 @@ def test_role_delete_sync():
 
     for zone in zonegroup_conns.zones:
         log.info(f'checking if zone: {zone.name} does not have role: {role_name}')
-        assert_raises(zone.iam_conn.exceptions.NoSuchEntityException,
-                      zone.iam_conn.get_role, RoleName=role_name)
+        try:
+            zone.iam_conn.get_role(RoleName=role_name)
+            assert False, "Expected NoSuchEntityException"
+        except zone.iam_conn.exceptions.NoSuchEntityException:
+            pass  # expected
         log.info(f'success, zone: {zone.name} does not have role: {role_name}')
 
 def test_forwarded_put_bucket_policy_error():
@@ -2342,10 +2352,12 @@ def test_forwarded_put_bucket_policy_error():
     policy = 'Invalid policy document'
     try:
         for zone in zonegroup_conns.rw_zones:
-            e = assert_raises(ClientError, zone.s3_client.put_bucket_policy,
-                              Bucket=bucket, Policy=policy)
-            eq(e.response['Error']['Code'], 'InvalidArgument')
-            assert e.response['Error']['Message']
+            try:
+                zone.s3_client.put_bucket_policy(Bucket=bucket, Policy=policy)
+                assert False, "Expected InvalidArgument error"
+            except ClientError as e:
+                eq(e.response['Error']['Code'], 'InvalidArgument')
+                assert e.response['Error']['Message']
     finally:
         zonegroup_conns.rw_zones[0].delete_bucket(bucket)
         realm_meta_checkpoint(realm)
@@ -4053,11 +4065,13 @@ def test_bucket_create_location_constraint():
                 assert_equal(response['LocationConstraint'], zg.name)
             else:
                 # other zonegroup should fail with 400
-                e = assert_raises(ClientError,
-                                  z.s3_client.create_bucket,
-                                    Bucket=bucket_name,
-                                    CreateBucketConfiguration={'LocationConstraint': zg.name})
-                assert e.response['ResponseMetadata']['HTTPStatusCode'] == 400
+                try:
+                    z.s3_client.create_bucket(
+                        Bucket=bucket_name,
+                        CreateBucketConfiguration={'LocationConstraint': zg.name})
+                    assert False, "expected 400 error"
+                except ClientError as e:
+                    assert e.response['ResponseMetadata']['HTTPStatusCode'] == 400
 
 def test_timestamp_based_epochs():
     """
@@ -4326,8 +4340,11 @@ def test_bucket_replication_normal_delete():
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object does not exist in destination bucket
-    e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
-    assert e.response['Error']['Code'] == 'NoSuchKey'
+    try:
+        dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+        assert False, "Expected NoSuchKey error"
+    except ClientError as e:
+        assert e.response['Error']['Code'] == 'NoSuchKey'
 
 @allow_bucket_replication
 def test_bucket_replication_normal_deletemarker():
@@ -4383,8 +4400,11 @@ def test_bucket_replication_normal_deletemarker():
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object does not exist in destination bucket
-    e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
-    assert e.response['Error']['Code'] == 'NoSuchKey'
+    try:
+        dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+        assert False, "Expected NoSuchKey error"
+    except ClientError as e:
+        assert e.response['Error']['Code'] == 'NoSuchKey'
 
 @allow_bucket_replication
 def test_bucket_replication_alt_user_forbidden():
@@ -4421,8 +4441,11 @@ def test_bucket_replication_alt_user_forbidden():
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object does not exist in destination bucket
-    e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
-    assert e.response['Error']['Code'] == 'NoSuchKey'
+    try:
+        dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+        assert False, "Expected NoSuchKey error"
+    except ClientError as e:
+        assert e.response['Error']['Code'] == 'NoSuchKey'
 
 @allow_bucket_replication
 def test_bucket_replication_alt_user():
@@ -4496,20 +4519,22 @@ def test_bucket_replication_reject_versioning_identical():
     zonegroup_meta_checkpoint(zonegroup)
 
     # create replication configuration
-    e = assert_raises(ClientError,
-                      source.s3_client.put_bucket_replication,
-                      Bucket=source_bucket.name,
-                      ReplicationConfiguration={
-                          'Role': '',
-                          'Rules': [{
-                              'ID': 'rule1',
-                              'Status': 'Enabled',
-                              'Destination': {
-                                  'Bucket': f'arn:aws:s3:::{dest_bucket.name}',
-                              }
-                          }]
-                      })
-    assert e.response['ResponseMetadata']['HTTPStatusCode'] == 400
+    try:
+        source.s3_client.put_bucket_replication(
+            Bucket=source_bucket.name,
+            ReplicationConfiguration={
+                'Role': '',
+                'Rules': [{
+                    'ID': 'rule1',
+                    'Status': 'Enabled',
+                    'Destination': {
+                        'Bucket': f'arn:aws:s3:::{dest_bucket.name}',
+                    }
+                }]
+            })
+        assert False, "Expected 400 error"
+    except ClientError as e:
+        assert e.response['ResponseMetadata']['HTTPStatusCode'] == 400
 
 @allow_bucket_replication
 def test_bucket_replication_reject_objectlock_identical():
@@ -4525,20 +4550,22 @@ def test_bucket_replication_reject_objectlock_identical():
     zonegroup_meta_checkpoint(zonegroup)
 
     # create replication configuration
-    e = assert_raises(ClientError,
-                      source.s3_client.put_bucket_replication,
-                      Bucket=source_bucket.name,
-                      ReplicationConfiguration={
-                          'Role': '',
-                          'Rules': [{
-                              'ID': 'rule1',
-                              'Status': 'Enabled',
-                              'Destination': {
-                                  'Bucket': f'arn:aws:s3:::{dest_bucket_name}',
-                              }
-                          }]
-                      })
-    assert e.response['ResponseMetadata']['HTTPStatusCode'] == 400
+    try:
+        source.s3_client.put_bucket_replication(
+            Bucket=source_bucket.name,
+            ReplicationConfiguration={
+                'Role': '',
+                'Rules': [{
+                    'ID': 'rule1',
+                    'Status': 'Enabled',
+                    'Destination': {
+                        'Bucket': f'arn:aws:s3:::{dest_bucket_name}',
+                    }
+                }]
+            })
+        assert False, "Expected 400 error"
+    except ClientError as e:
+        assert e.response['ResponseMetadata']['HTTPStatusCode'] == 400
 
 @allow_bucket_replication
 def test_bucket_replication_non_versioned_to_versioned():
@@ -4584,8 +4611,11 @@ def test_bucket_replication_non_versioned_to_versioned():
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object not exists in destination bucket
-    e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
-    assert e.response['Error']['Code'] == 'NoSuchKey'
+    try:
+        dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+        assert False, "Expected NoSuchKey error"
+    except ClientError as e:
+        assert e.response['Error']['Code'] == 'NoSuchKey'
 
 @allow_bucket_replication
 def test_bucket_replication_versioned_to_non_versioned():
@@ -4631,8 +4661,11 @@ def test_bucket_replication_versioned_to_non_versioned():
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object not exists in destination bucket
-    e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
-    assert e.response['Error']['Code'] == 'NoSuchKey'
+    try:
+        dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+        assert False, "Expected NoSuchKey error"
+    except ClientError as e:
+        assert e.response['Error']['Code'] == 'NoSuchKey'
 
 @allow_bucket_replication
 def test_bucket_replication_lock_enabled_to_lock_disabled():
@@ -4694,8 +4727,11 @@ def test_bucket_replication_lock_enabled_to_lock_disabled():
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object does not exist in destination bucket
-    e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
-    assert e.response['Error']['Code'] == 'NoSuchKey'
+    try:
+        dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+        assert False, "Expected NoSuchKey error"
+    except ClientError as e:
+        assert e.response['Error']['Code'] == 'NoSuchKey'
 
 @allow_bucket_replication
 def test_bucket_replication_lock_disabled_to_lock_enabled():
@@ -4757,8 +4793,11 @@ def test_bucket_replication_lock_disabled_to_lock_enabled():
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object does not exist in destination bucket
-    e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket_name, Key=objname)
-    assert e.response['Error']['Code'] == 'NoSuchKey'
+    try:
+        dest.s3_client.get_object(Bucket=dest_bucket_name, Key=objname)
+        assert False, "Expected NoSuchKey error"
+    except ClientError as e:
+        assert e.response['Error']['Code'] == 'NoSuchKey'
 
 @attr('sync_policy')
 @attr('fails_with_rgw')
@@ -4807,8 +4846,11 @@ def test_bucket_delete_with_zonegroup_sync_policy_directional():
 
     # delete bucket - should fail
     log.debug('deleting bucket')
-    e = assert_raises(ClientError, zcA.s3_client.delete_bucket, Bucket=bucketA.name)
-    assert e.response['Error']['Code'] == 'BucketNotEmpty'
+    try:
+        zcA.s3_client.delete_bucket(Bucket=bucketA.name)
+        assert False, "Expected BucketNotEmpty error"
+    except ClientError as e:
+        assert e.response['Error']['Code'] == 'BucketNotEmpty'
 
     assert check_all_buckets_exist(zcA, [bucketA.name])
     assert check_all_buckets_exist(zcB, [bucketA.name])
@@ -4879,8 +4921,11 @@ def test_bucket_delete_with_bucket_sync_policy_directional():
     zone_bucket_checkpoint(zoneB, zoneA, bucketA.name)
 
     log.debug('deleting bucket')
-    e = assert_raises(ClientError, zcA.s3_client.delete_bucket, Bucket=bucketA.name)
-    assert e.response['Error']['Code'] == 'BucketNotEmpty'
+    try:
+        zcA.s3_client.delete_bucket(Bucket=bucketA.name)
+        assert False, "Expected BucketNotEmpty error"
+    except ClientError as e:
+        assert e.response['Error']['Code'] == 'BucketNotEmpty'
 
     assert check_all_buckets_exist(zcA, [bucketA.name])
     assert check_all_buckets_exist(zcB, [bucketA.name])
@@ -5105,8 +5150,11 @@ def test_delete_bucket_with_zone_opt_out():
 
     # delete bucket on zoneA. it should fail to delete because zoneC still has objnameB
     log.debug('deleting bucket')
-    e = assert_raises(ClientError, zcA.s3_client.delete_bucket, Bucket=bucketA.name)
-    assert e.response['Error']['Code'] == 'BucketNotEmpty'
+    try:
+        zcA.s3_client.delete_bucket(Bucket=bucketA.name)
+        assert False, "Expected BucketNotEmpty error"
+    except ClientError as e:
+        assert e.response['Error']['Code'] == 'BucketNotEmpty'
 
     assert check_all_buckets_exist(zcA, buckets)
     assert check_all_buckets_exist(zcC, buckets)
@@ -5197,8 +5245,11 @@ def test_bucket_delete_with_sync_policy_object_prefix():
 
     # delete bucket on zoneA. it should fail to delete because zoneB still has objnameB
     log.debug('deleting bucket')
-    e = assert_raises(ClientError, zcA.s3_client.delete_bucket, Bucket=bucketA.name)
-    assert e.response['Error']['Code'] == 'BucketNotEmpty'
+    try:
+        zcA.s3_client.delete_bucket(Bucket=bucketA.name)
+        assert False, "Expected BucketNotEmpty error"
+    except ClientError as e:
+        assert e.response['Error']['Code'] == 'BucketNotEmpty'
 
     assert check_all_buckets_exist(zcA, buckets)
     assert check_all_buckets_exist(zcB, buckets)
@@ -5363,8 +5414,11 @@ def test_bucket_replication_alt_user_delete():
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object does not exist in destination bucket
-    e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
-    assert e.response['Error']['Code'] == 'NoSuchKey'
+    try:
+        dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+        assert False, "Expected NoSuchKey error"
+    except ClientError as e:
+        assert e.response['Error']['Code'] == 'NoSuchKey'
 
 @allow_bucket_replication
 def test_bucket_replication_alt_user_deletemarker_forbidden():
@@ -5509,8 +5563,11 @@ def test_bucket_replication_alt_user_deletemarker():
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object does not exist in destination bucket
-    e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
-    assert e.response['Error']['Code'] == 'NoSuchKey'
+    try:
+        dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+        assert False, "Expected NoSuchKey error"
+    except ClientError as e:
+        assert e.response['Error']['Code'] == 'NoSuchKey'
 
 @allow_bucket_replication
 def test_bucket_replication_alt_user_deny_tagreplication():
@@ -5623,8 +5680,11 @@ def test_bucket_replication_source_forbidden():
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object does not exist in destination bucket
-    e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
-    assert e.response['Error']['Code'] == 'NoSuchKey'
+    try:
+        dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+        assert False, "Expected NoSuchKey error"
+    except ClientError as e:
+        assert e.response['Error']['Code'] == 'NoSuchKey'
 
     # remove bucket policy so I can check for replication status
     source.s3_client.delete_bucket_policy(Bucket=source_bucket.name)
@@ -5687,8 +5747,11 @@ def test_bucket_replication_source_forbidden_versioned():
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object does not exist in destination bucket
-    e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
-    assert e.response['Error']['Code'] == 'NoSuchKey'
+    try:
+        dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+        assert False, "Expected NoSuchKey error"
+    except ClientError as e:
+        assert e.response['Error']['Code'] == 'NoSuchKey'
 
     # remove bucket policy so I can check for replication status
     source.s3_client.delete_bucket_policy(Bucket=source_bucket.name)
@@ -5849,8 +5912,11 @@ def test_bucket_replication_source_forbidden_objretention():
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object does not exist in destination bucket
-    e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
-    assert e.response['Error']['Code'] == 'NoSuchKey'
+    try:
+        dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+        assert False, "Expected NoSuchKey error"
+    except ClientError as e:
+        assert e.response['Error']['Code'] == 'NoSuchKey'
 
     # check the source object has replication status set to FAILED
     # uncomment me in https://github.com/ceph/ceph/pull/62147
@@ -5901,8 +5967,11 @@ def test_bucket_replication_source_forbidden_legalhold():
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object does not exist in destination bucket
-    e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
-    assert e.response['Error']['Code'] == 'NoSuchKey'
+    try:
+        dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+        assert False, "Expected NoSuchKey error"
+    except ClientError as e:
+        assert e.response['Error']['Code'] == 'NoSuchKey'
 
     # check the source object has replication status set to FAILED
 
@@ -6035,11 +6104,14 @@ def test_copy_obj_perm_check_between_zonegroups(zonegroup):
         realm_meta_checkpoint(realm)
 
         # copy object returns 403
-        e = assert_raises(ClientError, dest_zone.s3_client.copy_object,
-                          Bucket=dest_bucket.name,
-                          CopySource={'Bucket': source_bucket.name, 'Key': objname},
-                          Key=objname)
-        assert e.response['Error']['Code'] == 'AccessDenied'
+        try:
+            dest_zone.s3_client.copy_object(
+                Bucket=dest_bucket.name,
+                CopySource={'Bucket': source_bucket.name, 'Key': objname},
+                Key=objname)
+            assert False, "Expected AccessDenied error"
+        except ClientError as e:
+            assert e.response['Error']['Code'] == 'AccessDenied'
 
 
 def test_object_lock_sync():
index 5343353af3f315c2b2aa9d1fef749ab8196c776c..73e04e7805e354109fdad9b9a97b5607b66fb054 100644 (file)
@@ -72,7 +72,7 @@ class RadosZone(Zone):
                 return self.s3_resource.Bucket(name)
             except ClientError as e:
                 error_code = e.response['Error']['Code']
-                if error_codein ['404', 'NoSuchBucket']:
+                if error_code in ['404', 'NoSuchBucket']:
                     return None
                 raise