From: Shilpa Jagannath Date: Wed, 21 Jan 2026 23:54:55 +0000 (-0500) Subject: qa/multisite: boto3 in tests.py X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f232222ef2e426e9b883231cf70993365e13a3ba;p=ceph-ci.git qa/multisite: boto3 in tests.py Signed-off-by: Shilpa Jagannath (cherry picked from commit 3438add3fc9e086da6313830e697e6a987893f3c) Conflicts: src/test/rgw/rgw_multi/tests.py - Whitespace and other drift - Tentacle doesn't sync object locks - `test_suspended_delete_marker_incremental_sync` is a new test but not a new behavior. Signed-off-by: Adam C. Emerson --- diff --git a/src/test/rgw/rgw_multi/conn.py b/src/test/rgw/rgw_multi/conn.py index 34fb4ae340d..7d23b1699a0 100644 --- a/src/test/rgw/rgw_multi/conn.py +++ b/src/test/rgw/rgw_multi/conn.py @@ -1,19 +1,29 @@ - -import boto.iam.connection -import boto.sts.connection import boto3 def get_gateway_connection(gateway, credentials, region): """ connect to the given gateway """ # Always create a new connection to the gateway to ensure each set of credentials gets its own connection - conn = boto3.client('s3', + s3_client = boto3.client('s3', endpoint_url='http://' + gateway.host + ':' + str(gateway.port), aws_access_key_id=credentials.access_key, aws_secret_access_key=credentials.secret, region_name=region) - if gateway.connection is None: - gateway.connection = conn - return conn + if gateway.s3_client is None: + gateway.s3_client = s3_client + return s3_client + +def get_gateway_s3_resource(gateway, credentials, region): + """ connect to boto3 s3 resource api of the given gateway """ + print(f"Credentials: {credentials.access_key}") + s3_resource = boto3.resource('s3', + endpoint_url='http://' + gateway.host + ':' + str(gateway.port), + aws_access_key_id=credentials.access_key, + aws_secret_access_key=credentials.secret, + region_name=region, + ) + if gateway.s3_resource is None: + gateway.s3_resource = s3_resource + return s3_resource def get_gateway_secure_connection(gateway, credentials, region): """ secure connect to the given gateway """ diff --git a/src/test/rgw/rgw_multi/multisite.py b/src/test/rgw/rgw_multi/multisite.py index 842344abdd3..77704c01ee5 100644 --- a/src/test/rgw/rgw_multi/multisite.py +++ b/src/test/rgw/rgw_multi/multisite.py @@ -3,7 +3,7 @@ from io import StringIO import json -from .conn import get_gateway_connection, get_gateway_iam_connection, get_gateway_secure_connection, get_gateway_sns_client, get_gateway_sts_connection, get_gateway_temp_s3_client +from .conn import get_gateway_connection, get_gateway_s3_resource, get_gateway_iam_connection, get_gateway_secure_connection, get_gateway_sns_client, get_gateway_sts_connection, get_gateway_temp_s3_client class Cluster: """ interface to run commands against a distinct ceph cluster """ @@ -25,6 +25,8 @@ class Gateway: self.zone = zone self.connection = None self.secure_connection = None + self.s3_client = None + self.s3_resource = None self.ssl_port = ssl_port self.iam_connection = None self.sns_client = None @@ -190,7 +192,8 @@ class ZoneConn(object): if self.zone.gateways is not None: region = "" if self.zone.zonegroup is None else self.zone.zonegroup.name self.iam_conn = get_gateway_iam_connection(self.zone.gateways[0], self.credentials, region) - self.conn = get_gateway_connection(self.zone.gateways[0], self.credentials, region) + self.s3_client = get_gateway_connection(self.zone.gateways[0], self.credentials, region) + self.s3_resource = get_gateway_s3_resource(self.zone.gateways[0], self.credentials, region) self.secure_conn = get_gateway_connection(self.zone.gateways[0], self.credentials, region) self.sns_client = get_gateway_sns_client(self.zone.gateways[0], self.credentials, region) self.temp_s3_client = None diff --git a/src/test/rgw/rgw_multi/tests.py b/src/test/rgw/rgw_multi/tests.py index 45226a58a91..cd4ba406e37 100644 --- a/src/test/rgw/rgw_multi/tests.py +++ b/src/test/rgw/rgw_multi/tests.py @@ -11,10 +11,7 @@ from itertools import combinations from itertools import zip_longest from io import StringIO -import boto -import boto.s3.connection -from boto.s3.website import WebsiteConfiguration -from boto.s3.cors import CORSConfiguration +import boto3 from botocore.exceptions import ClientError from nose.tools import eq_ as eq @@ -489,6 +486,7 @@ class ZonegroupConns: self.master_zone = None for z in zonegroup.zones: + log.debug('=== Debug get_conn for zone=%s ===', z.name) zone_conn = z.get_conn(user.credentials) non_account_zone_conn = z.get_conn(non_account_user.credentials) non_account_alt_zone_conn = z.get_conn(non_account_alt_user.credentials) @@ -603,6 +601,8 @@ def create_bucket_per_zone_in_realm(): def test_bucket_create(): zonegroup = realm.master_zonegroup() zonegroup_conns = ZonegroupConns(zonegroup) + zonegroup_meta_checkpoint(zonegroup) + buckets, _ = create_bucket_per_zone(zonegroup_conns) zonegroup_meta_checkpoint(zonegroup) @@ -624,30 +624,35 @@ def test_bucket_create_with_tenant(): tenant = 'testx' uid = 'test' - tenant_secondary_conn = boto.s3.connection.S3Connection(aws_access_key_id=access_key, - aws_secret_access_key=secret_key, - is_secure=False, - port=secondary.zone.gateways[0].port, - host=secondary.zone.gateways[0].host, - calling_format='boto.s3.connection.OrdinaryCallingFormat') + # create s3 clients for tenant user + tenant_secondary_conn = boto3.client( + 's3', + endpoint_url=f'http://{secondary.zone.gateways[0].host}:{secondary.zone.gateways[0].port}', + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + region_name='default' + ) - tenant_primary_conn = boto.s3.connection.S3Connection(aws_access_key_id=access_key, - aws_secret_access_key=secret_key, - is_secure=False, - port=primary.zone.gateways[0].port, - host=primary.zone.gateways[0].host, - calling_format='boto.s3.connection.OrdinaryCallingFormat') + tenant_primary_conn = boto3.client( + 's3', + endpoint_url=f'http://{primary.zone.gateways[0].host}:{primary.zone.gateways[0].port}', + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + region_name='default' + ) cmd = ['user', 'create', '--tenant', tenant, '--uid', uid, '--access-key', access_key, '--secret-key', secret_key, '--display-name', 'tenanted-user'] primary.zone.cluster.admin(cmd) zonegroup_meta_checkpoint(zonegroup) try: - bucket = tenant_secondary_conn.create_bucket('tenanted-bucket') + bucket_name = 'tenanted-bucket' + tenant_secondary_conn.create_bucket(Bucket=bucket_name) zonegroup_meta_checkpoint(zonegroup) - assert tenant_primary_conn.get_bucket(bucket.name) + tenant_primary_conn.head_bucket(Bucket=bucket_name) log.info("bucket exists in tenant namespace") - e = assert_raises(boto.exception.S3ResponseError, primary.get_bucket, bucket.name) - assert e.error_code == 'NoSuchBucket' + + e = assert_raises(ClientError, primary.s3_client.head_bucket, Bucket=bucket_name) + assert e.response['Error']['Code'] == 'NoSuchBucket' log.info("bucket does not exist in default user namespace") finally: cmd = ['user', 'rm', '--tenant', tenant, '--uid', uid, '--purge-data'] @@ -685,25 +690,14 @@ def test_bucket_remove(): for zone in zonegroup_conns.zones: assert check_all_buckets_exist(zone, buckets) - for zone, bucket_name in zone_buckets: - zone.conn.delete_bucket(bucket_name) + for zone, bucket in zone_buckets: + zone.s3_client.delete_bucket(Bucket=bucket.name) zonegroup_meta_checkpoint(zonegroup) for zone in zonegroup_conns.zones: assert check_all_buckets_dont_exist(zone, buckets) -def get_bucket(zone, bucket_name): - return zone.conn.get_bucket(bucket_name) - -def get_key(zone, bucket_name, obj_name): - b = get_bucket(zone, bucket_name) - return b.get_key(obj_name) - -def new_key(zone, bucket_name, obj_name): - b = get_bucket(zone, bucket_name) - return b.new_key(obj_name) - def check_bucket_eq(zone_conn1, zone_conn2, bucket): if zone_conn2.zone.has_buckets(): zone_conn2.check_bucket_eq(zone_conn1, bucket.name) @@ -839,15 +833,13 @@ def test_object_sync(): zonegroup_conns = ZonegroupConns(zonegroup) buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) - objnames = [ 'myobj', '_myobj', ':', '&', '.', '..', '...', '.o', '.o.'] - + objnames = ['myobj', '_myobj', ':', '&', '.', '..', '...', '.o', '.o.'] content = 'asdasd' - # don't wait for meta sync just yet - for zone, bucket_name in zone_bucket: + # upload objects + for zone, bucket in zone_bucket: for objname in objnames: - k = new_key(zone, bucket_name, objname) - k.set_contents_from_string(content) + zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body=content) zonegroup_meta_checkpoint(zonegroup) @@ -867,10 +859,9 @@ def test_object_delete(): objname = 'myobj' content = 'asdasd' - # don't wait for meta sync just yet + # upload objects for zone, bucket in zone_bucket: - k = new_key(zone, bucket, objname) - k.set_contents_from_string(content) + zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body=content) zonegroup_meta_checkpoint(zonegroup) @@ -883,10 +874,10 @@ def test_object_delete(): zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name) check_bucket_eq(source_conn, target_conn, bucket) - # check object removal + # delete objects for source_conn, bucket in zone_bucket: - k = get_key(source_conn, bucket, objname) - k.delete() + source_conn.s3_client.delete_object(Bucket=bucket.name, Key=objname) + for target_conn in zonegroup_conns.zones: if source_conn.zone == target_conn.zone: continue @@ -899,12 +890,13 @@ def test_multi_object_delete(): zonegroup_conns = ZonegroupConns(zonegroup) buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) - objnames = [f'obj{i}' for i in range(1,50)] + objnames = [f'obj{i}' for i in range(1, 50)] content = 'asdasd' - # don't wait for meta sync just yet + # upload objects for zone, bucket in zone_bucket: - create_objects(zone, bucket, objnames, content) + for objname in objnames: + zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body=content) zonegroup_meta_checkpoint(zonegroup) @@ -917,9 +909,14 @@ def test_multi_object_delete(): zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name) check_bucket_eq(source_conn, target_conn, bucket) - # check object removal + # delete objects for source_conn, bucket in zone_bucket: - bucket.delete_keys(objnames) + objects = [{'Key': obj} for obj in objnames] + source_conn.s3_client.delete_objects( + Bucket=bucket.name, + Delete={'Objects': objects} + ) + for target_conn in zonegroup_conns.zones: if source_conn.zone == target_conn.zone: continue @@ -927,27 +924,24 @@ def test_multi_object_delete(): zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name) check_bucket_eq(source_conn, target_conn, bucket) -def get_latest_object_version(key): - for k in key.bucket.list_versions(key.name): - if k.is_latest: - return k - return None - def test_versioned_object_incremental_sync(): zonegroup = realm.master_zonegroup() zonegroup_conns = ZonegroupConns(zonegroup) buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) # enable versioning - for _, bucket in zone_bucket: - bucket.configure_versioning(True) + for zone, bucket in zone_bucket: + zone.s3_client.put_bucket_versioning( + Bucket=bucket.name, + VersioningConfiguration={'Status': 'Enabled'} + ) zonegroup_meta_checkpoint(zonegroup) # upload a dummy object to each bucket and wait for sync. this forces each # bucket to finish a full sync and switch to incremental for source_conn, bucket in zone_bucket: - new_key(source_conn, bucket, 'dummy').set_contents_from_string('') + source_conn.s3_client.put_object(Bucket=bucket.name, Key='dummy', Body='') for target_conn in zonegroup_conns.zones: if source_conn.zone == target_conn.zone: continue @@ -957,33 +951,48 @@ def test_versioned_object_incremental_sync(): # create and delete multiple versions of an object from each zone for zone_conn in zonegroup_conns.rw_zones: obj = 'obj-' + zone_conn.name - k = new_key(zone_conn, bucket, obj) - k.set_contents_from_string('version1') - log.debug('version1 id=%s', k.version_id) + resp1 = zone_conn.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='version1') + version_id_1 = resp1['VersionId'] + log.debug('version1 id=%s', version_id_1) # don't delete version1 - this tests that the initial version # doesn't get squashed into later versions # create and delete the following object versions to test that # the operations don't race with each other during sync - k.set_contents_from_string('version2') - log.debug('version2 id=%s', k.version_id) - k.bucket.delete_key(obj, version_id=k.version_id) - - k.set_contents_from_string('version3') - log.debug('version3 id=%s', k.version_id) - k.bucket.delete_key(obj, version_id=k.version_id) + resp2 = zone_conn.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='version2') + version_id_2 = resp2['VersionId'] + log.debug('version2 id=%s', version_id_2) + zone_conn.s3_client.delete_object(Bucket=bucket.name, Key=obj, VersionId=version_id_2) + + # Create and delete version3 + resp3 = zone_conn.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='version3') + version_id_3 = resp3['VersionId'] + log.debug('version3 id=%s', version_id_3) + zone_conn.s3_client.delete_object(Bucket=bucket.name, Key=obj, VersionId=version_id_3) for _, bucket in zone_bucket: zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + # update ACLs to test metadata-only entries for _, bucket in zone_bucket: - # overwrite the acls to test that metadata-only entries are applied for zone_conn in zonegroup_conns.rw_zones: obj = 'obj-' + zone_conn.name - k = new_key(zone_conn, bucket.name, obj) - v = get_latest_object_version(k) - v.make_public() + + # get latest version + response = zone_conn.s3_client.list_object_versions( + Bucket=bucket.name, + Prefix=obj, + MaxKeys=1 + ) + if response.get('Versions'): + latest = response['Versions'][0] + zone_conn.s3_client.put_object_acl( + Bucket=bucket.name, + Key=obj, + VersionId=latest['VersionId'], + ACL='public-read' + ) for _, bucket in zone_bucket: zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) @@ -1001,24 +1010,29 @@ def test_null_version_id_delete(): obj = 'obj' # upload an initial object - key1 = new_key(zone, bucket, obj) - key1.set_contents_from_string('') - log.debug('created initial version id=%s', key1.version_id) + resp1 = zone.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='') + version_id_1 = resp1.get('VersionId', 'null') + log.debug('created initial version id=%s', version_id_1) zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) # enable versioning - bucket.configure_versioning(True) + zone.s3_client.put_bucket_versioning( + Bucket=bucket.name, + VersioningConfiguration={'Status': 'Enabled'} + ) zonegroup_meta_checkpoint(zonegroup) # re-upload the object as a new version - key2 = new_key(zone, bucket, obj) - key2.set_contents_from_string('') - log.debug('created new version id=%s', key2.version_id) + resp2 = zone.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='') + version_id_2 = resp2['VersionId'] + log.debug('created new version id=%s', version_id_2) zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) - bucket.delete_key(obj, version_id='null') + # delete null version + zone.s3_client.delete_object(Bucket=bucket.name, Key=obj, VersionId='null') - bucket.delete_key(obj, version_id=key2.version_id) + # delete version 2 + zone.s3_client.delete_object(Bucket=bucket.name, Key=obj, VersionId=version_id_2) zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) @@ -1030,22 +1044,24 @@ def test_concurrent_versioned_object_incremental_sync(): # create a versioned bucket bucket = zone.create_bucket(gen_bucket_name()) log.debug('created bucket=%s', bucket.name) - bucket.configure_versioning(True) + + zone.s3_client.put_bucket_versioning( + Bucket=bucket.name, + VersioningConfiguration={'Status': 'Enabled'} + ) zonegroup_meta_checkpoint(zonegroup) - # upload a dummy object and wait for sync. this forces each zone to finish - # a full sync and switch to incremental - new_key(zone, bucket, 'dummy').set_contents_from_string('') + # upload a dummy object and wait for sync + zone.s3_client.put_object(Bucket=bucket.name, Key='dummy', Body='') zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) - # create several concurrent versions on each zone and let them race to sync + # create several concurrent versions on each zone obj = 'obj' for i in range(10): for zone_conn in zonegroup_conns.rw_zones: - k = new_key(zone_conn, bucket, obj) - k.set_contents_from_string('version1') - log.debug('zone=%s version=%s', zone_conn.zone.name, k.version_id) + resp = zone_conn.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='version1') + log.debug('zone=%s version=%s', zone_conn.zone.name, resp['VersionId']) zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) zonegroup_data_checkpoint(zonegroup_conns) @@ -1053,7 +1069,6 @@ def test_concurrent_versioned_object_incremental_sync(): def test_version_suspended_incremental_sync(): zonegroup = realm.master_zonegroup() zonegroup_conns = ZonegroupConns(zonegroup) - zone = zonegroup_conns.rw_zones[0] # create a non-versioned bucket @@ -1062,29 +1077,32 @@ def test_version_suspended_incremental_sync(): zonegroup_meta_checkpoint(zonegroup) # upload an initial object - key1 = new_key(zone, bucket, 'obj') - key1.set_contents_from_string('') - log.debug('created initial version id=%s', key1.version_id) + resp1 = zone.s3_client.put_object(Bucket=bucket.name, Key='obj', Body='') + log.debug('created initial version id=%s', resp1.get('VersionId', 'null')) zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) # enable versioning - bucket.configure_versioning(True) + zone.s3_client.put_bucket_versioning( + Bucket=bucket.name, + VersioningConfiguration={'Status': 'Enabled'} + ) zonegroup_meta_checkpoint(zonegroup) # re-upload the object as a new version - key2 = new_key(zone, bucket, 'obj') - key2.set_contents_from_string('') - log.debug('created new version id=%s', key2.version_id) + resp2 = zone.s3_client.put_object(Bucket=bucket.name, Key='obj', Body='') + log.debug('created new version id=%s', resp2['VersionId']) zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) # suspend versioning - bucket.configure_versioning(False) + zone.s3_client.put_bucket_versioning( + Bucket=bucket.name, + VersioningConfiguration={'Status': 'Suspended'} + ) zonegroup_meta_checkpoint(zonegroup) # re-upload the object as a 'null' version - key3 = new_key(zone, bucket, 'obj') - key3.set_contents_from_string('') - log.debug('created null version id=%s', key3.version_id) + resp3 = zone.s3_client.put_object(Bucket=bucket.name, Key='obj', Body='') + log.debug('created null version id=%s', resp3.get('VersionId', 'null')) zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) def test_delete_marker_full_sync(): @@ -1093,20 +1111,21 @@ def test_delete_marker_full_sync(): buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) # enable versioning - for _, bucket in zone_bucket: - bucket.configure_versioning(True) + for zone, bucket in zone_bucket: + zone.s3_client.put_bucket_versioning( + Bucket=bucket.name, + VersioningConfiguration={'Status': 'Enabled'} + ) zonegroup_meta_checkpoint(zonegroup) for zone, bucket in zone_bucket: # upload an initial object - key1 = new_key(zone, bucket, 'obj') - key1.set_contents_from_string('') + zone.s3_client.put_object(Bucket=bucket.name, Key='obj', Body='') - # create a delete marker - key2 = new_key(zone, bucket, 'obj') - key2.delete() - key2.delete() - key2.delete() + # create delete markers + zone.s3_client.delete_object(Bucket=bucket.name, Key='obj') + zone.s3_client.delete_object(Bucket=bucket.name, Key='obj') + zone.s3_client.delete_object(Bucket=bucket.name, Key='obj') # wait for full sync for _, bucket in zone_bucket: @@ -1118,21 +1137,25 @@ def test_suspended_delete_marker_full_sync(): buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) # enable/suspend versioning - for _, bucket in zone_bucket: - bucket.configure_versioning(True) - bucket.configure_versioning(False) + for zone, bucket in zone_bucket: + zone.s3_client.put_bucket_versioning( + Bucket=bucket.name, + VersioningConfiguration={'Status': 'Enabled'} + ) + zone.s3_client.put_bucket_versioning( + Bucket=bucket.name, + VersioningConfiguration={'Status': 'Suspended'} + ) zonegroup_meta_checkpoint(zonegroup) for zone, bucket in zone_bucket: # upload an initial object - key1 = new_key(zone, bucket, 'obj') - key1.set_contents_from_string('') + zone.s3_client.put_object(Bucket=bucket.name, Key='obj', Body='') - # create a delete marker - key2 = new_key(zone, bucket, 'obj') - key2.delete() - key2.delete() - key2.delete() + # create delete markers + zone.s3_client.delete_object(Bucket=bucket.name, Key='obj') + zone.s3_client.delete_object(Bucket=bucket.name, Key='obj') + zone.s3_client.delete_object(Bucket=bucket.name, Key='obj') # wait for full sync for _, bucket in zone_bucket: @@ -1146,47 +1169,94 @@ def test_concurrent_delete_markers_incremental_sync(): # create a versioned bucket bucket = zone.create_bucket(gen_bucket_name()) log.debug('created bucket=%s', bucket.name) - bucket.configure_versioning(True) + + zone.s3_client.put_bucket_versioning( + Bucket=bucket.name, + VersioningConfiguration={'Status': 'Enabled'} + ) zonegroup_meta_checkpoint(zonegroup) obj = 'obj' - # upload a dummy object and wait for sync. this forces each zone to finish - # a full sync and switch to incremental - new_key(zone, bucket, obj).set_contents_from_string('') + # upload a dummy object and wait for sync + zone.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='') zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) - # create several concurrent delete markers on each zone and let them race to sync + # create several concurrent delete markers on each zone for i in range(2): for zone_conn in zonegroup_conns.rw_zones: - key = new_key(zone_conn, bucket, obj) - key.delete() + zone_conn.s3_client.delete_object(Bucket=bucket.name, Key=obj) + + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + +def test_suspended_delete_marker_incremental_sync(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + zone = zonegroup_conns.rw_zones[0] + + # create a versioned bucket + bucket = zone.create_bucket(gen_bucket_name()) + log.debug('created bucket=%s', bucket.name) + + zone.s3_client.put_bucket_versioning( + Bucket=bucket.name, + VersioningConfiguration={'Status': 'Enabled'} + ) + zone.s3_client.put_bucket_versioning( + Bucket=bucket.name, + VersioningConfiguration={'Status': 'Suspended'} + ) + + zonegroup_meta_checkpoint(zonegroup) + + obj = 'obj' + + # upload a dummy object and wait for sync + zone.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='') + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + + # create a delete marker on source zone + zone.s3_client.delete_object(Bucket=bucket.name, Key=obj) zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) def test_bucket_versioning(): buckets, zone_bucket = create_bucket_per_zone_in_realm() - for _, bucket in zone_bucket: - bucket.configure_versioning(True) - res = bucket.get_versioning_status() - key = 'Versioning' - assert(key in res and res[key] == 'Enabled') + for zone, bucket in zone_bucket: + zone.s3_client.put_bucket_versioning( + Bucket=bucket.name, + VersioningConfiguration={'Status': 'Enabled'} + ) + response = zone.s3_client.get_bucket_versioning(Bucket=bucket.name) + assert response.get('Status') == 'Enabled' def test_bucket_acl(): buckets, zone_bucket = create_bucket_per_zone_in_realm() - for _, bucket in zone_bucket: - assert(len(bucket.get_acl().acl.grants) == 1) # single grant on owner - bucket.set_acl('public-read') - assert(len(bucket.get_acl().acl.grants) == 2) # new grant on AllUsers + for zone, bucket in zone_bucket: + acl = zone.s3_client.get_bucket_acl(Bucket=bucket.name) + assert len(acl['Grants']) == 1 # single grant on owner + + zone.s3_client.put_bucket_acl(Bucket=bucket.name, ACL='public-read') + + acl = zone.s3_client.get_bucket_acl(Bucket=bucket.name) + assert len(acl['Grants']) == 2 # new grant on AllUsers def test_bucket_cors(): buckets, zone_bucket = create_bucket_per_zone_in_realm() - for _, bucket in zone_bucket: - cors_cfg = CORSConfiguration() - cors_cfg.add_rule(['DELETE'], 'https://www.example.com', allowed_header='*', max_age_seconds=3000) - bucket.set_cors(cors_cfg) - assert(bucket.get_cors().to_xml() == cors_cfg.to_xml()) + for zone, bucket in zone_bucket: + cors_cfg = { + 'CORSRules': [{ + 'AllowedMethods': ['DELETE'], + 'AllowedOrigins': ['https://www.example.com'], + 'AllowedHeaders': ['*'], + 'MaxAgeSeconds': 3000 + }] + } + zone.s3_client.put_bucket_cors(Bucket=bucket.name, CORSConfiguration=cors_cfg) + + response = zone.s3_client.get_bucket_cors(Bucket=bucket.name) + assert response['CORSRules'] == cors_cfg['CORSRules'] def test_bucket_delete_notempty(): zonegroup = realm.master_zonegroup() @@ -1194,24 +1264,21 @@ def test_bucket_delete_notempty(): buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) zonegroup_meta_checkpoint(zonegroup) - for zone_conn, bucket_name in zone_bucket: + for zone_conn, bucket in zone_bucket: # upload an object to each bucket on its own zone - conn = zone_conn.get_connection() - bucket = conn.get_bucket(bucket_name) - k = bucket.new_key('foo') - k.set_contents_from_string('bar') + zone_conn.s3_client.put_object(Bucket=bucket.name, Key='foo', Body='bar') + # attempt to delete the bucket before this object can sync try: - conn.delete_bucket(bucket_name) - except boto.exception.S3ResponseError as e: - assert(e.error_code == 'BucketNotEmpty') + zone_conn.s3_client.delete_bucket(Bucket=bucket.name) + except ClientError as e: + assert e.response['Error']['Code'] == 'BucketNotEmpty' continue - assert False # expected 409 BucketNotEmpty + assert False # expected BucketNotEmpty # assert that each bucket still exists on the master - c1 = zonegroup_conns.master_zone.conn - for _, bucket_name in zone_bucket: - assert c1.get_bucket(bucket_name) + for _, bucket in zone_bucket: + zonegroup_conns.master_zone.s3_client.head_bucket(Bucket=bucket.name) def test_multi_period_incremental_sync(): zonegroup = realm.master_zonegroup() @@ -1242,7 +1309,7 @@ def test_multi_period_incremental_sync(): continue bucket_name = gen_bucket_name() log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name) - bucket = zone_conn.conn.create_bucket(bucket_name) + zone_conn.s3_client.create_bucket(Bucket=bucket_name) buckets.append(bucket_name) # wait for zone 1 to sync @@ -1252,12 +1319,12 @@ def test_multi_period_incremental_sync(): set_master_zone(z1) mdlog_periods += [realm.current_period.id] - for zone_conn, bucket_name in zone_bucket: + for zone_conn, _ in zone_bucket: if zone_conn.zone == z3: continue bucket_name = gen_bucket_name() log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name) - zone_conn.conn.create_bucket(bucket_name) + zone_conn.s3_client.create_bucket(Bucket=bucket_name) buckets.append(bucket_name) # restart zone 3 gateway and wait for sync @@ -1270,7 +1337,6 @@ def test_multi_period_incremental_sync(): for target_conn in zonegroup_conns.zones: if source_conn.zone == target_conn.zone: continue - if target_conn.zone.has_buckets(): target_conn.check_bucket_eq(source_conn, bucket_name) @@ -1306,8 +1372,7 @@ def test_datalog_autotrim(): # upload an object to each zone to generate a datalog entry for zone, bucket in zone_bucket: - k = new_key(zone, bucket.name, 'key') - k.set_contents_from_string('body') + zone.s3_client.put_object(Bucket=bucket.name, Key='key', Body='body') # wait for metadata and data sync to catch up zonegroup_meta_checkpoint(zonegroup) @@ -1337,45 +1402,41 @@ def test_datalog_autotrim(): def test_multi_zone_redirect(): zonegroup = realm.master_zonegroup() if len(zonegroup.rw_zones) < 2: - raise SkipTest("test_multi_period_incremental_sync skipped. Requires 3 or more zones in master zonegroup.") + raise SkipTest("test_multi_zone_redirect skipped. Requires 2 or more zones in master zonegroup.") zonegroup_conns = ZonegroupConns(zonegroup) (zc1, zc2) = zonegroup_conns.rw_zones[0:2] - z1, z2 = (zc1.zone, zc2.zone) set_sync_from_all(z2, False) - # create a bucket on the first zone bucket_name = gen_bucket_name() log.info('create bucket zone=%s name=%s', z1.name, bucket_name) - bucket = zc1.conn.create_bucket(bucket_name) + zc1.s3_client.create_bucket(Bucket=bucket_name) + obj = 'testredirect' - - key = bucket.new_key(obj) - data = 'A'*512 - key.set_contents_from_string(data) + data = 'A' * 512 + zc1.s3_client.put_object(Bucket=bucket_name, Key=obj, Body=data) zonegroup_meta_checkpoint(zonegroup) # try to read object from second zone (should fail) - bucket2 = get_bucket(zc2, bucket_name) - assert_raises(boto.exception.S3ResponseError, bucket2.get_key, obj) + e = assert_raises(ClientError, zc2.s3_client.get_object, Bucket=bucket_name, Key=obj) + assert e.response['Error']['Code'] == 'NoSuchKey' set_redirect_zone(z2, z1) - key2 = bucket2.get_key(obj) - - eq(data, key2.get_contents_as_string(encoding='ascii')) - - key = bucket.new_key(obj) + # Should work now with redirect + response = zc2.s3_client.get_object(Bucket=bucket_name, Key=obj) + eq(data, response['Body'].read().decode('ascii')) + # Test updates for x in ['a', 'b', 'c', 'd']: - data = x*512 - key.set_contents_from_string(data) - eq(data, key2.get_contents_as_string(encoding='ascii')) + data = x * 512 + zc1.s3_client.put_object(Bucket=bucket_name, Key=obj, Body=data) + response = zc2.s3_client.get_object(Bucket=bucket_name, Key=obj) + eq(data, response['Body'].read().decode('ascii')) - # revert config changes set_sync_from_all(z2, True) set_redirect_zone(z2, None) @@ -1441,14 +1502,23 @@ def test_zg_master_zone_delete(): def test_set_bucket_website(): buckets, zone_bucket = create_bucket_per_zone_in_realm() - for _, bucket in zone_bucket: - website_cfg = WebsiteConfiguration(suffix='index.html',error_key='error.html') + for zone, bucket in zone_bucket: + website_cfg = { + 'IndexDocument': {'Suffix': 'index.html'}, + 'ErrorDocument': {'Key': 'error.html'} + } try: - bucket.set_website_configuration(website_cfg) - except boto.exception.S3ResponseError as e: - if e.error_code == 'MethodNotAllowed': + zone.s3_client.put_bucket_website( + Bucket=bucket.name, + WebsiteConfiguration=website_cfg + ) + except ClientError as e: + if e.response['Error']['Code'] == 'MethodNotAllowed': raise SkipTest("test_set_bucket_website skipped. Requires rgw_enable_static_website = 1.") - assert(bucket.get_website_configuration_with_xml()[1] == website_cfg.to_xml()) + + response = zone.s3_client.get_bucket_website(Bucket=bucket.name) + assert response['IndexDocument']['Suffix'] == 'index.html' + assert response['ErrorDocument']['Key'] == 'error.html' def test_set_bucket_policy(): policy = '''{ @@ -1459,9 +1529,10 @@ def test_set_bucket_policy(): }] }''' buckets, zone_bucket = create_bucket_per_zone_in_realm() - for _, bucket in zone_bucket: - bucket.set_policy(policy) - assert(bucket.get_policy().decode('ascii') == policy) + for zone, bucket in zone_bucket: + zone.s3_client.put_bucket_policy(Bucket=bucket.name, Policy=policy) + response = zone.s3_client.get_bucket_policy(Bucket=bucket.name) + assert response['Policy'] == policy @attr('bucket_sync_disable') def test_bucket_sync_disable(): @@ -1489,8 +1560,7 @@ def test_bucket_sync_enable_right_after_disable(): for zone, bucket in zone_bucket: for objname in objnames: - k = new_key(zone, bucket.name, objname) - k.set_contents_from_string(content) + zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body=content) zonegroup_meta_checkpoint(zonegroup) @@ -1505,8 +1575,7 @@ def test_bucket_sync_enable_right_after_disable(): for zone, bucket in zone_bucket: for objname in objnames_2: - k = new_key(zone, bucket.name, objname) - k.set_contents_from_string(content) + zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body=content) for bucket_name in buckets: zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name) @@ -1519,13 +1588,12 @@ def test_bucket_sync_disable_enable(): zonegroup_conns = ZonegroupConns(zonegroup) buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) - objnames = [ 'obj1', 'obj2', 'obj3', 'obj4' ] + objnames = ['obj1', 'obj2', 'obj3', 'obj4'] content = 'asdasd' for zone, bucket in zone_bucket: for objname in objnames: - k = new_key(zone, bucket.name, objname) - k.set_contents_from_string(content) + zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body=content) zonegroup_meta_checkpoint(zonegroup) @@ -1537,12 +1605,11 @@ def test_bucket_sync_disable_enable(): zonegroup_meta_checkpoint(zonegroup) - objnames_2 = [ 'obj5', 'obj6', 'obj7', 'obj8' ] + objnames_2 = ['obj5', 'obj6', 'obj7', 'obj8'] for zone, bucket in zone_bucket: for objname in objnames_2: - k = new_key(zone, bucket.name, objname) - k.set_contents_from_string(content) + zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body=content) for bucket_name in buckets: enable_bucket_sync(realm.meta_master_zone(), bucket_name) @@ -1557,19 +1624,32 @@ def test_multipart_object_sync(): zonegroup_conns = ZonegroupConns(zonegroup) buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) - _, bucket = zone_bucket[0] + zone, bucket = zone_bucket[0] # initiate a multipart upload - upload = bucket.initiate_multipart_upload('MULTIPART') - mp = boto.s3.multipart.MultiPartUpload(bucket) - mp.key_name = upload.key_name - mp.id = upload.id - part_size = 5 * 1024 * 1024 # 5M min part size - mp.upload_part_from_file(StringIO('a' * part_size), 1) - mp.upload_part_from_file(StringIO('b' * part_size), 2) - mp.upload_part_from_file(StringIO('c' * part_size), 3) - mp.upload_part_from_file(StringIO('d' * part_size), 4) - mp.complete_upload() + key_name = 'MULTIPART' + response = zone.s3_client.create_multipart_upload(Bucket=bucket.name, Key=key_name) + upload_id = response['UploadId'] + + part_size = 5 * 1024 * 1024 # 5M min part size + parts = [] + + for part_num, char in enumerate(['a', 'b', 'c', 'd'], start=1): + part_response = zone.s3_client.upload_part( + Bucket=bucket.name, + Key=key_name, + PartNumber=part_num, + UploadId=upload_id, + Body=char * part_size + ) + parts.append({'PartNumber': part_num, 'ETag': part_response['ETag']}) + + zone.s3_client.complete_multipart_upload( + Bucket=bucket.name, + Key=key_name, + UploadId=upload_id, + MultipartUpload={'Parts': parts} + ) zonegroup_meta_checkpoint(zonegroup) zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) @@ -1579,45 +1659,50 @@ def test_encrypted_object_sync(): zonegroup_conns = ZonegroupConns(zonegroup) if len(zonegroup.rw_zones) < 2: - raise SkipTest("test_zonegroup_remove skipped. Requires 2 or more zones in master zonegroup.") + raise SkipTest("test_encrypted_object_sync skipped. Requires 2 or more zones in master zonegroup.") (zone1, zone2) = zonegroup_conns.rw_zones[0:2] - # create a bucket on the first zone bucket_name = gen_bucket_name() log.info('create bucket zone=%s name=%s', zone1.name, bucket_name) - bucket = zone1.conn.create_bucket(bucket_name) + zone1.s3_client.create_bucket(Bucket=bucket_name) + data = 'A' * 512 + # upload an object with sse-c encryption - sse_c_headers = { - 'x-amz-server-side-encryption-customer-algorithm': 'AES256', - 'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=', - 'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw==' - } - key = bucket.new_key('testobj-sse-c') - data = 'A'*512 - key.set_contents_from_string(data, headers=sse_c_headers) + zone1.s3_client.put_object( + Bucket=bucket_name, + Key='testobj-sse-c', + Body=data, + SSECustomerAlgorithm='AES256', + SSECustomerKey='pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=', + SSECustomerKeyMD5='DWygnHRtgiJ77HCm+1rvHw==' + ) # upload an object with sse-kms encryption - sse_kms_headers = { - 'x-amz-server-side-encryption': 'aws:kms', - # testkey-1 must be present in 'rgw crypt s3 kms encryption keys' (vstart.sh adds this) - 'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1', - } - key = bucket.new_key('testobj-sse-kms') - key.set_contents_from_string(data, headers=sse_kms_headers) + zone1.s3_client.put_object( + Bucket=bucket_name, + Key='testobj-sse-kms', + Body=data, + ServerSideEncryption='aws:kms', + SSEKMSKeyId='testkey-1' + ) - # wait for the bucket metadata and data to sync zonegroup_meta_checkpoint(zonegroup) zone_bucket_checkpoint(zone2.zone, zone1.zone, bucket_name) # read the encrypted objects from the second zone - bucket2 = get_bucket(zone2, bucket_name) - key = bucket2.get_key('testobj-sse-c', headers=sse_c_headers) - eq(data, key.get_contents_as_string(headers=sse_c_headers, encoding='ascii')) + response = zone2.s3_client.get_object( + Bucket=bucket_name, + Key='testobj-sse-c', + SSECustomerAlgorithm='AES256', + SSECustomerKey='pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=', + SSECustomerKeyMD5='DWygnHRtgiJ77HCm+1rvHw==' + ) + eq(data, response['Body'].read().decode('ascii')) - key = bucket2.get_key('testobj-sse-kms') - eq(data, key.get_contents_as_string(encoding='ascii')) + response = zone2.s3_client.get_object(Bucket=bucket_name, Key='testobj-sse-kms') + eq(data, response['Body'].read().decode('ascii')) @attr('bucket_trim') def test_bucket_index_log_trim(): @@ -1630,10 +1715,9 @@ def test_bucket_index_log_trim(): def make_test_bucket(): name = gen_bucket_name() log.info('create bucket zone=%s name=%s', zone.name, name) - bucket = zone.conn.create_bucket(name) + bucket = zone.create_bucket(name) for objname in ('a', 'b', 'c', 'd'): - k = new_key(zone, name, objname) - k.set_contents_from_string('foo') + zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo') zonegroup_meta_checkpoint(zonegroup) zonegroup_bucket_checkpoint(zonegroup_conns, name) return bucket @@ -1688,10 +1772,9 @@ def test_bucket_reshard_index_log_trim(): def make_test_bucket(): name = gen_bucket_name() log.info('create bucket zone=%s name=%s', zone.name, name) - bucket = zone.conn.create_bucket(name) + bucket = zone.create_bucket(name) for objname in ('a', 'b', 'c', 'd'): - k = new_key(zone, name, objname) - k.set_contents_from_string('foo') + zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo') zonegroup_meta_checkpoint(zonegroup) zonegroup_bucket_checkpoint(zonegroup_conns, name) return bucket @@ -1725,8 +1808,8 @@ def test_bucket_reshard_index_log_trim(): # upload more objects for objname in ('e', 'f', 'g', 'h'): - k = new_key(zone, test_bucket.name, objname) - k.set_contents_from_string('foo') + zone.s3_client.put_object(Bucket=test_bucket.name, Key=objname, Body='foo') + zonegroup_bucket_checkpoint(zonegroup_conns, test_bucket.name) # Resharding the bucket again @@ -1757,8 +1840,8 @@ def test_bucket_reshard_index_log_trim(): # upload more objects for objname in ('i', 'j', 'k', 'l'): - k = new_key(zone, test_bucket.name, objname) - k.set_contents_from_string('foo') + zone.s3_client.put_object(Bucket=test_bucket.name, Key=objname, Body='foo') + zonegroup_bucket_checkpoint(zonegroup_conns, test_bucket.name) # verify the bucket has non-empty bilog @@ -1776,11 +1859,10 @@ def test_bucket_log_trim_after_delete_bucket_primary_reshard(): # create a test bucket, upload some objects, and wait for sync def make_test_bucket(): name = gen_bucket_name() - log.info('create bucket zone=%s name=%s', primary.name, name) - bucket = primary.conn.create_bucket(name) + log.info('create bucket zone=%s name=%s', primary.zone.name, name) + bucket = primary.create_bucket(name) for objname in ('a', 'b', 'c', 'd'): - k = new_key(primary, name, objname) - k.set_contents_from_string('foo') + primary.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo') zonegroup_meta_checkpoint(zonegroup) zonegroup_bucket_checkpoint(zonegroup_conns, name) return bucket @@ -1801,7 +1883,7 @@ def test_bucket_log_trim_after_delete_bucket_primary_reshard(): primary.zone.cluster.admin(cmd) # delete bucket and test bilog autotrim - primary.conn.delete_bucket(test_bucket.name) + primary.s3_client.delete_bucket(Bucket=test_bucket.name) zonegroup_data_checkpoint(zonegroup_conns) bilog_autotrim(primary.zone, ['--rgw-sync-log-trim-max-buckets', '50'],) @@ -1838,11 +1920,10 @@ def test_bucket_log_trim_after_delete_bucket_secondary_reshard(): # create a test bucket, upload some objects, and wait for sync def make_test_bucket(): name = gen_bucket_name() - log.info('create bucket zone=%s name=%s', primary.name, name) - bucket = primary.conn.create_bucket(name) + log.info('create bucket zone=%s name=%s', primary.zone.name, name) + bucket = primary.create_bucket(name) for objname in ('a', 'b', 'c', 'd'): - k = new_key(primary, name, objname) - k.set_contents_from_string('foo') + primary.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo') zonegroup_meta_checkpoint(zonegroup) zonegroup_bucket_checkpoint(zonegroup_conns, name) return bucket @@ -1864,7 +1945,7 @@ def test_bucket_log_trim_after_delete_bucket_secondary_reshard(): primary.zone.cluster.admin(cmd) # delete bucket and test bilog autotrim - primary.conn.delete_bucket(test_bucket.name) + primary.s3_client.delete_bucket(Bucket=test_bucket.name) zonegroup_data_checkpoint(zonegroup_conns) bilog_autotrim(secondary.zone, ['--rgw-sync-log-trim-max-buckets', '50'],) @@ -1897,15 +1978,13 @@ def test_bucket_reshard_incremental(): zonegroup_conns = ZonegroupConns(zonegroup) zone = zonegroup_conns.rw_zones[0] - # create a bucket bucket = zone.create_bucket(gen_bucket_name()) log.debug('created bucket=%s', bucket.name) zonegroup_meta_checkpoint(zonegroup) # upload some objects for objname in ('a', 'b', 'c', 'd'): - k = new_key(zone, bucket.name, objname) - k.set_contents_from_string('foo') + zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo') zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) # reshard in each zone @@ -1917,8 +1996,7 @@ def test_bucket_reshard_incremental(): # upload more objects for objname in ('e', 'f', 'g', 'h'): - k = new_key(zone, bucket.name, objname) - k.set_contents_from_string('foo') + zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo') zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) @attr('bucket_reshard') @@ -1940,8 +2018,7 @@ def test_bucket_reshard_full(): try: # upload some objects for objname in ('a', 'b', 'c', 'd'): - k = new_key(zone, bucket.name, objname) - k.set_contents_from_string('foo') + zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo') # reshard on first zone zone.zone.cluster.admin(['bucket', 'reshard', @@ -1951,8 +2028,7 @@ def test_bucket_reshard_full(): # upload more objects for objname in ('e', 'f', 'g', 'h'): - k = new_key(zone, bucket.name, objname) - k.set_contents_from_string('foo') + zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo') finally: for z in zonegroup_conns.rw_zones[1:]: z.zone.start() @@ -1962,14 +2038,17 @@ def test_bucket_reshard_full(): def test_bucket_creation_time(): zonegroup = realm.master_zonegroup() zonegroup_conns = ZonegroupConns(zonegroup) - zonegroup_meta_checkpoint(zonegroup) - zone_buckets = [zone.get_connection().get_all_buckets() for zone in zonegroup_conns.rw_zones] + zone_buckets = [] + for zone in zonegroup_conns.rw_zones: + response = zone.s3_client.list_buckets() + zone_buckets.append(response['Buckets']) + for z1, z2 in combinations(zone_buckets, 2): for a, b in zip(z1, z2): - eq(a.name, b.name) - eq(a.creation_date, b.creation_date) + eq(a['Name'], b['Name']) + eq(a['CreationDate'], b['CreationDate']) def get_bucket_shard_objects(zone, num_shards): """ @@ -1990,8 +2069,7 @@ def write_most_shards(zone, bucket_name, num_shards): random.shuffle(objs) del objs[-(len(objs)//10):] for obj in objs: - k = new_key(zone, bucket_name, obj) - k.set_contents_from_string('foo') + zone.s3_client.put_object(Bucket=bucket_name, Key=obj, Body='foo') def reshard_bucket(zone, bucket_name, num_shards): """ @@ -2118,8 +2196,7 @@ def test_zap_init_bucket_sync_run(): # Write zeroth generation for obj in range(1, 6): - k = new_key(primary, bucket.name, f'obj{obj * 11}') - k.set_contents_from_string('foo') + primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * 11}', Body='foo') zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) # Write several more generations @@ -2127,8 +2204,7 @@ def test_zap_init_bucket_sync_run(): for num_shards in generations: reshard_bucket(primary.zone, bucket.name, num_shards) for obj in range(1, 6): - k = new_key(primary, bucket.name, f'obj{obj * num_shards}') - k.set_contents_from_string('foo') + primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * num_shards}', Body='foo') zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) @@ -2168,8 +2244,7 @@ def test_list_bucket_key_marker_encoding(): # test for object names with '%' character. for obj in range(1, 1100): - k = new_key(primary, bucket.name, f'obj%{obj * 11}') - k.set_contents_from_string('foo') + primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj%{obj * 11}', Body='foo') # wait for the secondary to catch up zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) @@ -2185,12 +2260,11 @@ def test_list_bucket_key_marker_encoding(): # write an object during incremental sync. objname = 'test_incremental' - k = new_key(primary, bucket, objname) - k.set_contents_from_string('foo') + primary.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo') zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) # the object uploaded after bucket sync init should be replicated - check_object_exists(bucket, objname) + check_object_exists(secondary, bucket.name, objname) def test_role_sync(): @@ -2270,8 +2344,7 @@ def test_replication_status(): bucket = zone.conn.create_bucket(gen_bucket_name()) obj_name = "a" - k = new_key(zone, bucket.name, obj_name) - k.set_contents_from_string('foo') + zone.s3_client.put_object(Bucket=bucket.name, Key=obj_name, Body='foo') zonegroup_meta_checkpoint(zonegroup) zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) @@ -2301,26 +2374,24 @@ def test_object_acl(): log.debug('created bucket=%s', bucket.name) # upload a dummy object and wait for sync. - k = new_key(primary, bucket, 'dummy') - k.set_contents_from_string('foo') + objname = 'dummy' + primary.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo') zonegroup_meta_checkpoint(zonegroup) zonegroup_data_checkpoint(zonegroup_conns) - #check object on secondary before setacl - bucket2 = get_bucket(secondary, bucket.name) - before_set_acl = bucket2.get_acl(k) - assert(len(before_set_acl.acl.grants) == 1) + # check object on secondary before setacl + before_acl = secondary.s3_client.get_object_acl(Bucket=bucket.name, Key=objname) + assert len(before_acl['Grants']) == 1 - #set object acl on primary and wait for sync. - bucket.set_canned_acl('public-read', key_name=k) - log.debug('set acl=%s', bucket.name) + # set object acl on secondary and wait for sync + secondary.s3_client.put_object_acl(Bucket=bucket.name, Key=objname, ACL='public-read') + log.debug('set acl on bucket=%s', bucket.name) zonegroup_data_checkpoint(zonegroup_conns) zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) - #check object secondary after setacl - bucket2 = get_bucket(secondary, bucket.name) - after_set_acl = bucket2.get_acl(k) - assert(len(after_set_acl.acl.grants) == 2) # read grant added on AllUsers + # check object on primary after setacl + after_acl = primary.s3_client.get_object_acl(Bucket=bucket.name, Key=objname) + assert len(after_acl['Grants']) == 2 # read grant added on AllUsers def test_assume_role_after_sync(): zonegroup = realm.master_zonegroup() @@ -2375,8 +2446,7 @@ def test_bucket_full_sync_after_data_sync_init(): # write some objects that don't sync yet for obj in range(1, 6): - k = new_key(primary, bucket.name, f'obj{obj * 11}') - k.set_contents_from_string('foo') + primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * 11}', Body='foo') cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args() cmd += ['--source-zone', primary.name] @@ -2408,16 +2478,14 @@ def test_resharded_bucket_full_sync_after_data_sync_init(): # Write zeroth generation for obj in range(1, 6): - k = new_key(primary, bucket.name, f'obj{obj * 11}') - k.set_contents_from_string('foo') + primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * 11}', Body='foo') # Write several more generations generations = [17, 19, 23, 29, 31, 37] for num_shards in generations: reshard_bucket(primary.zone, bucket.name, num_shards) for obj in range(1, 6): - k = new_key(primary, bucket.name, f'obj{obj * num_shards}') - k.set_contents_from_string('foo') + primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * num_shards}', Body='foo') cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args() cmd += ['--source-zone', primary.name] @@ -2444,8 +2512,7 @@ def test_bucket_incremental_sync_after_data_sync_init(): # upload a dummy object and wait for sync. this forces each zone to finish # a full sync and switch to incremental - k = new_key(primary, bucket, 'dummy') - k.set_contents_from_string('foo') + primary.s3_client.put_object(Bucket=bucket.name, Key='dummy', Body='foo') zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) try: @@ -2454,8 +2521,7 @@ def test_bucket_incremental_sync_after_data_sync_init(): # Write more objects to primary for obj in range(1, 6): - k = new_key(primary, bucket.name, f'obj{obj * 11}') - k.set_contents_from_string('foo') + primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * 11}', Body='foo') cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args() cmd += ['--source-zone', primary.name] @@ -2483,16 +2549,15 @@ def test_resharded_bucket_incremental_sync_latest_after_data_sync_init(): # Write zeroth generation to primary for obj in range(1, 6): - k = new_key(primary, bucket.name, f'obj{obj * 11}') - k.set_contents_from_string('foo') + primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * 11}', Body='foo') # Write several more generations generations = [17, 19, 23, 29, 31, 37] for num_shards in generations: reshard_bucket(primary.zone, bucket.name, num_shards) for obj in range(1, 6): - k = new_key(primary, bucket.name, f'obj{obj * num_shards}') - k.set_contents_from_string('foo') + primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * num_shards}', Body='foo') + # wait for the secondary to catch up to the latest gen zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) @@ -2503,8 +2568,7 @@ def test_resharded_bucket_incremental_sync_latest_after_data_sync_init(): # write some more objects to the last gen for obj in range(1, 6): - k = new_key(primary, bucket.name, f'obj{obj * generations[-1]}') - k.set_contents_from_string('foo') + primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * generations[-1]}', Body='foo') cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args() cmd += ['--source-zone', primary.name] @@ -2532,8 +2596,7 @@ def test_resharded_bucket_incremental_sync_oldest_after_data_sync_init(): # Write zeroth generation to primary for obj in range(1, 6): - k = new_key(primary, bucket.name, f'obj{obj * 11}') - k.set_contents_from_string('foo') + primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * 11}', Body='foo') # wait for the secondary to catch up zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) @@ -2547,8 +2610,8 @@ def test_resharded_bucket_incremental_sync_oldest_after_data_sync_init(): for num_shards in generations: reshard_bucket(primary.zone, bucket.name, num_shards) for obj in range(1, 6): - k = new_key(primary, bucket.name, f'obj{obj * num_shards}') - k.set_contents_from_string('foo') + primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * num_shards}', Body='foo') + cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args() cmd += ['--source-zone', primary.name] @@ -2699,30 +2762,49 @@ def create_zone_bucket(zone): return bucket def create_object(zone_conn, bucket, objname, content): - k = new_key(zone_conn, bucket.name, objname) - k.set_contents_from_string(content) + bucket_name = bucket.name if hasattr(bucket, 'name') else bucket + zone_conn.s3_client.put_object(Bucket=bucket_name, Key=objname, Body=content) def create_objects(zone_conn, bucket, obj_arr, content): + bucket_name = bucket.name if hasattr(bucket, 'name') else bucket for objname in obj_arr: - create_object(zone_conn, bucket, objname, content) + zone_conn.s3_client.put_object(Bucket=bucket_name, Key=objname, Body=content) -def check_object_exists(bucket, objname, content = None): - k = bucket.get_key(objname) - assert_not_equal(k, None) - if (content != None): - assert_equal(k.get_contents_as_string(encoding='ascii'), content) +def check_object_exists(zone_or_client, bucket_name, objname, content=None): + # handle both zone connection and direct client request + client = zone_or_client.s3_client if hasattr(zone_or_client, 's3_client') else zone_or_client -def check_objects_exist(bucket, obj_arr, content = None): + try: + response = client.get_object(Bucket=bucket_name, Key=objname) + if content is not None: + actual_content = response['Body'].read() + if isinstance(content, str): + actual_content = actual_content.decode('utf-8') + assert_equal(actual_content, content) + except ClientError as e: + if e.response['Error']['Code'] == 'NoSuchKey': + assert False, f"Object {objname} does not exist in bucket {bucket_name}" + raise + +def check_objects_exist(zone_or_client, bucket_name, obj_arr, content=None): + if isinstance(obj_arr, str): + obj_arr = [obj_arr] for objname in obj_arr: - check_object_exists(bucket, objname, content) + check_object_exists(zone_or_client, bucket_name, objname, content) -def check_object_not_exists(bucket, objname): - k = bucket.get_key(objname) - assert_equal(k, None) - -def check_objects_not_exist(bucket, obj_arr): +def check_object_not_exists(zone_or_client, bucket_name, objname): + client = zone_or_client.s3_client if hasattr(zone_or_client, 's3_client') else zone_or_client + try: + client.head_object(Bucket=bucket_name, Key=objname) + assert False, f"Object {objname} should not exist in bucket {bucket_name}" + except ClientError as e: + assert e.response['Error']['Code'] == '404' + +def check_objects_not_exist(zone_or_client, bucket_name, obj_arr): + if isinstance(obj_arr, str): + obj_arr = [obj_arr] for objname in obj_arr: - check_object_not_exists(bucket, objname) + check_object_not_exists(zone_or_client, bucket_name, objname) @attr('fails_with_rgw') @attr('sync_policy') @@ -2825,31 +2907,22 @@ def test_sync_flow_symmetrical_zonegroup_all(): zonegroup.period.update(zoneA, commit=True) get_sync_policy(c1) - objnames = [ 'obj1', 'obj2' ] + objnames = ['obj1', 'obj2'] content = 'asdasd' - buckets = [] # create bucket & object in all zones bucketA = create_zone_bucket(zcA) - buckets.append(bucketA) - create_object(zcA, bucketA, objnames[0], content) + zcA.s3_client.put_object(Bucket=bucketA.name, Key=objnames[0], Body=content) bucketB = create_zone_bucket(zcB) - buckets.append(bucketB) - create_object(zcB, bucketB, objnames[1], content) + zcB.s3_client.put_object(Bucket=bucketB.name, Key=objnames[1], Body=content) zonegroup_meta_checkpoint(zonegroup) - # 'zonegroup_data_checkpoint' currently fails for the zones not - # allowed to sync. So as a workaround, data checkpoint is done - # for only the ones configured. zone_data_checkpoint(zoneB, zoneA) - # verify if objects are synced accross the zone - bucket = get_bucket(zcB, bucketA.name) - check_object_exists(bucket, objnames[0], content) - - bucket = get_bucket(zcA, bucketB.name) - check_object_exists(bucket, objnames[1], content) + # verify objects synced + check_object_exists(zcB, bucketA.name, objnames[0], content) + check_object_exists(zcA, bucketB.name, objnames[1], content) remove_sync_policy_group(c1, "sync-group") return @@ -2891,13 +2964,13 @@ def test_sync_flow_symmetrical_zonegroup_select(): content = 'asdasd' # create bucketA & objects in zoneA - objnamesA = [ 'obj1', 'obj2', 'obj3' ] + objnamesA = ['obj1', 'obj2', 'obj3'] bucketA = create_zone_bucket(zcA) buckets.append(bucketA) create_objects(zcA, bucketA, objnamesA, content) # create bucketB & objects in zoneB - objnamesB = [ 'obj4', 'obj5', 'obj6' ] + objnamesB = ['obj4', 'obj5', 'obj6'] bucketB = create_zone_bucket(zcB) buckets.append(bucketB) create_objects(zcB, bucketB, objnamesB, content) @@ -2907,18 +2980,12 @@ def test_sync_flow_symmetrical_zonegroup_select(): zone_data_checkpoint(zoneA, zoneB) # verify if objnamesA synced to only zoneB but not zoneC - bucket = get_bucket(zcB, bucketA.name) - check_objects_exist(bucket, objnamesA, content) - - bucket = get_bucket(zcC, bucketA.name) - check_objects_not_exist(bucket, objnamesA) + check_objects_exist(zcB, bucketA.name, objnamesA, content) + check_objects_not_exist(zcC, bucketA.name, objnamesA) # verify if objnamesB synced to only zoneA but not zoneC - bucket = get_bucket(zcA, bucketB.name) - check_objects_exist(bucket, objnamesB, content) - - bucket = get_bucket(zcC, bucketB.name) - check_objects_not_exist(bucket, objnamesB) + check_objects_exist(zcA, bucketB.name, objnamesB, content) + check_objects_not_exist(zcC, bucketB.name, objnamesB) remove_sync_policy_group(c1, "sync-group") return @@ -2938,7 +3005,7 @@ def test_sync_flow_directional_zonegroup_select(): zonegroup_conns = ZonegroupConns(zonegroup) if len(zonegroup.zones) < 3: - raise SkipTest("test_sync_flow_symmetrical_zonegroup_select skipped. Requires 3 or more zones in master zonegroup.") + raise SkipTest("test_sync_flow_directional_zonegroup_select skipped. Requires 3 or more zones in master zonegroup.") zonegroup_meta_checkpoint(zonegroup) @@ -2977,18 +3044,12 @@ def test_sync_flow_directional_zonegroup_select(): zone_data_checkpoint(zoneB, zoneA) # verify if objnamesA synced to only zoneB but not zoneC - bucket = get_bucket(zcB, bucketA.name) - check_objects_exist(bucket, objnamesA, content) - - bucket = get_bucket(zcC, bucketA.name) - check_objects_not_exist(bucket, objnamesA) + check_objects_exist(zcB, bucketA.name, objnamesA, content) + check_objects_not_exist(zcC, bucketA.name, objnamesA) # verify if objnamesB are not synced to either zoneA or zoneC - bucket = get_bucket(zcA, bucketB.name) - check_objects_not_exist(bucket, objnamesB) - - bucket = get_bucket(zcC, bucketB.name) - check_objects_not_exist(bucket, objnamesB) + check_objects_not_exist(zcA, bucketB.name, objnamesB) + check_objects_not_exist(zcC, bucketB.name, objnamesB) """ verify the same at bucketA level @@ -3030,17 +3091,16 @@ def test_sync_flow_directional_zonegroup_select(): zone_data_checkpoint(zoneB, zoneA) # verify that objnamesC are synced to bucketA in zoneB - bucket = get_bucket(zcB, bucketA.name) - check_objects_exist(bucket, objnamesC, content) + check_objects_exist(zcB, bucketA.name, objnamesC, content) # verify that objnamesD are not synced to bucketA in zoneA - bucket = get_bucket(zcA, bucketA.name) - check_objects_not_exist(bucket, objnamesD) + check_objects_not_exist(zcA, bucketA.name, objnamesD) remove_sync_policy_group(c1, "sync-bucket", bucketA.name) remove_sync_policy_group(c1, "sync-group") return + @attr('fails_with_rgw') @attr('sync_policy') def test_sync_single_bucket(): @@ -3103,12 +3163,10 @@ def test_sync_single_bucket(): zone_data_checkpoint(zoneB, zoneA) # verify if bucketA objects are synced - bucket = get_bucket(zcB, bucketA.name) - check_objects_exist(bucket, objnames, content) + check_objects_exist(zcB, bucketA.name, objnames, content) # bucketB objects should not be synced - bucket = get_bucket(zcB, bucketB.name) - check_objects_not_exist(bucket, objnames) + check_objects_not_exist(zcB, bucketB.name, objnames) """ @@ -3143,12 +3201,10 @@ def test_sync_single_bucket(): zone_data_checkpoint(zoneB, zoneA) # verify if bucketA objects are synced - bucket = get_bucket(zcB, bucketA.name) - check_object_exists(bucket, objnames[2], content) + check_object_exists(zcB, bucketA.name, objnames[2], content) # bucketB objects should not be synced - bucket = get_bucket(zcB, bucketB.name) - check_object_not_exists(bucket, objnames[2]) + check_object_not_exists(zcB, bucketB.name, objnames[2]) remove_sync_policy_group(c1, "sync-bucket", bucketA.name) remove_sync_policy_group(c1, "sync-group") @@ -3217,11 +3273,9 @@ def test_sync_different_buckets(): # verify that objects are synced to bucketB in zoneB # but not to bucketA - bucket = get_bucket(zcB, bucketA.name) - check_objects_not_exist(bucket, objnames) + check_objects_not_exist(zcB, bucketA.name, objnames) + check_objects_exist(zcB, bucketB.name, objnames, content) - bucket = get_bucket(zcB, bucketB.name) - check_objects_exist(bucket, objnames, content) """ Method (b): configure policy at only bucketA level with pipe set to bucketB in target zone @@ -3258,11 +3312,8 @@ def test_sync_different_buckets(): # verify that objects are synced to bucketB in zoneB # but not to bucketA """ - bucket = get_bucket(zcB, bucketA.name) - check_objects_not_exist(bucket, objnamesC) - - bucket = get_bucket(zcB, bucketB.name) - check_objects_exist(bucket, objnamesC, content) + check_objects_not_exist(zcB, bucketA.name, objnamesC) + check_objects_exist(zcB, bucketB.name, objnamesC, content) remove_sync_policy_group(c1, "sync-bucket", bucketA.name) zonegroup_meta_checkpoint(zonegroup) @@ -3296,9 +3347,8 @@ def test_sync_different_buckets(): # verify that objects from only bucketB are synced to # bucketA in zoneA """ - bucket = get_bucket(zcA, bucketA.name) - check_objects_not_exist(bucket, objnamesD) - check_objects_exist(bucket, objnamesE, content) + check_objects_not_exist(zcA, bucketA.name, objnamesD) + check_objects_exist(zcA, bucketA.name, objnamesE, content) remove_sync_policy_group(c1, "sync-bucket", bucketA.name) remove_sync_policy_group(c1, "sync-group") @@ -3365,13 +3415,11 @@ def test_sync_multiple_buckets_to_single(): # verify that both zoneA bucketA & bucketB objects are synced to # bucketB in zoneB but not to bucketA - bucket = get_bucket(zcB, bucketA.name) - check_objects_not_exist(bucket, objnamesA) - check_objects_not_exist(bucket, objnamesB) + check_objects_not_exist(zcB, bucketA.name, objnamesA) + check_objects_not_exist(zcB, bucketA.name, objnamesB) - bucket = get_bucket(zcB, bucketB.name) - check_objects_exist(bucket, objnamesA, content) - check_objects_exist(bucket, objnamesB, content) + check_objects_exist(zcB, bucketB.name, objnamesA, content) + check_objects_exist(zcB, bucketB.name, objnamesB, content) """ Method (b): configure at bucket level @@ -3414,13 +3462,11 @@ def test_sync_multiple_buckets_to_single(): # verify that both zoneA bucketA & bucketB objects are synced to # bucketA in zoneB but not to bucketB - bucket = get_bucket(zcB, bucketB.name) - check_objects_not_exist(bucket, objnamesC) - check_objects_not_exist(bucket, objnamesD) + check_objects_not_exist(zcB, bucketB.name, objnamesC) + check_objects_not_exist(zcB, bucketB.name, objnamesD) - bucket = get_bucket(zcB, bucketA.name) - check_objects_exist(bucket, objnamesD, content) - check_objects_exist(bucket, objnamesD, content) + check_objects_exist(zcB, bucketA.name, objnamesC, content) + check_objects_exist(zcB, bucketA.name, objnamesD, content) remove_sync_policy_group(c1, "sync-bucket", bucketA.name) remove_sync_policy_group(c1, "sync-group") @@ -3487,11 +3533,8 @@ def test_sync_single_bucket_to_multiple(): # verify that objects from zoneA bucketA are synced to both # bucketA & bucketB in zoneB - bucket = get_bucket(zcB, bucketA.name) - check_objects_exist(bucket, objnamesA, content) - - bucket = get_bucket(zcB, bucketB.name) - check_objects_exist(bucket, objnamesA, content) + check_objects_exist(zcB, bucketA.name, objnamesA, content) + check_objects_exist(zcB, bucketB.name, objnamesA, content) """ Method (b): configure at bucket level @@ -3528,11 +3571,8 @@ def test_sync_single_bucket_to_multiple(): # verify that objects from zoneA bucketA are synced to both # bucketA & bucketB in zoneB - bucket = get_bucket(zcB, bucketA.name) - check_objects_exist(bucket, objnamesB, content) - - bucket = get_bucket(zcB, bucketB.name) - check_objects_exist(bucket, objnamesB, content) + check_objects_exist(zcB, bucketA.name, objnamesB, content) + check_objects_exist(zcB, bucketB.name, objnamesB, content) remove_sync_policy_group(c1, "sync-bucket", bucketA.name) remove_sync_policy_group(c1, "sync-group") @@ -3589,7 +3629,7 @@ def test_bucket_remove_rgw_down(): assert check_all_buckets_exist(zone, buckets) for zone, bucket_name in zone_bucket: - zone.conn.delete_bucket(bucket_name) + zone.s3_client.delete_bucket(Bucket=bucket.name) zonegroup_meta_checkpoint(zonegroup) @@ -3927,8 +3967,7 @@ def test_copy_object_same_bucket(): objname = 'dummy' # upload a dummy object and wait for sync. - k = new_key(primary, bucket, objname) - k.set_contents_from_string('foo') + primary.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo') zonegroup_meta_checkpoint(zonegroup) zonegroup_data_checkpoint(zonegroup_conns) @@ -3937,16 +3976,20 @@ def test_copy_object_same_bucket(): zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) # copy object on primary zone - primary.s3_client.copy_object(Bucket=bucket.name, - CopySource=bucket.name + '/'+ objname, - Key= objname + '-copy1') + primary.s3_client.copy_object( + Bucket=bucket.name, + CopySource={'Bucket': bucket.name, 'Key': objname}, + Key=objname + '-copy1' + ) zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) # copy object on secondary zone - secondary.s3_client.copy_object(Bucket=bucket.name, - Key= objname + '-copy2', - CopySource=bucket.name + '/'+ objname) + secondary.s3_client.copy_object( + Bucket=bucket.name, + Key=objname + '-copy2', + CopySource={'Bucket': bucket.name, 'Key': objname} + ) zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) @@ -3963,8 +4006,7 @@ def test_copy_object_different_bucket(): objname = 'dummy' # upload a dummy object and wait for sync. - k = new_key(primary, source_bucket, objname) - k.set_contents_from_string('foo') + primary.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo') zonegroup_meta_checkpoint(zonegroup) zonegroup_bucket_checkpoint(zonegroup_conns, source_bucket.name) @@ -3976,9 +4018,11 @@ def test_copy_object_different_bucket(): zonegroup_meta_checkpoint(zonegroup) # copy object on primary zone - primary.s3_client.copy_object(Bucket = dest_bucket.name, - Key = objname + '-copy', - CopySource = source_bucket.name + '/' + objname) + primary.s3_client.copy_object( + Bucket=dest_bucket.name, + Key=objname + '-copy', + CopySource={'Bucket': source_bucket.name, 'Key': objname} + ) zonegroup_bucket_checkpoint(zonegroup_conns, dest_bucket.name) @@ -4108,14 +4152,13 @@ def test_bucket_replication_normal_delete(): # upload an object and wait for sync. objname = 'dummy' - k = new_key(source, source_bucket, objname) - k.set_contents_from_string('foo') + source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo') time.sleep(config.checkpoint_delay) zone_data_checkpoint(dest.zone, source.zone) # check that object exists in destination bucket - k = get_key(dest, dest_bucket, objname) - assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo') + response = dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname) + assert_equal(response['Body'].read().decode('utf-8'), 'foo') # delete object on source source.s3_client.delete_object(Bucket=source_bucket.name, Key=objname) @@ -4167,13 +4210,12 @@ def test_bucket_replication_normal_deletemarker(): # upload an object and wait for sync. objname = 'dummy' - k = new_key(source, source_bucket, objname) - k.set_contents_from_string('foo') + source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo') zone_data_checkpoint(dest.zone, source.zone) # check that object exists in destination bucket - k = get_key(dest, dest_bucket, objname) - assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo') + response = dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname) + assert_equal(response['Body'].read().decode('utf-8'), 'foo') # delete object on source source.s3_client.delete_object(Bucket=source_bucket.name, Key=objname) @@ -4215,8 +4257,7 @@ def test_bucket_replication_alt_user_forbidden(): # upload an object and wait for sync. objname = 'dummy' - k = new_key(source, source_bucket, objname) - k.set_contents_from_string('foo') + source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo') zone_data_checkpoint(dest.zone, source.zone) # check that object does not exist in destination bucket @@ -4379,8 +4420,7 @@ def test_bucket_replication_non_versioned_to_versioned(): # upload an object and wait for sync. objname = 'dummy' - k = new_key(source, source_bucket, objname) - k.set_contents_from_string('foo') + source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo') zone_data_checkpoint(dest.zone, source.zone) # check that object not exists in destination bucket @@ -4427,8 +4467,7 @@ def test_bucket_replication_versioned_to_non_versioned(): # upload an object and wait for sync. objname = 'dummy' - k = new_key(source, source_bucket, objname) - k.set_contents_from_string('foo') + source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo') zone_data_checkpoint(dest.zone, source.zone) # check that object not exists in destination bucket @@ -4491,8 +4530,7 @@ def test_bucket_replication_lock_enabled_to_lock_disabled(): # upload an object and wait for sync. objname = 'dummy' - k = new_key(source, source_bucket_name, objname) - k.set_contents_from_string('foo') + source.s3_client.put_object(Bucket=source_bucket_name, Key=objname, Body='foo') zone_data_checkpoint(dest.zone, source.zone) # check that object does not exist in destination bucket @@ -4555,8 +4593,7 @@ def test_bucket_replication_lock_disabled_to_lock_enabled(): # upload an object and wait for sync. objname = 'dummy' - k = new_key(source, source_bucket.name, objname) - k.set_contents_from_string('foo') + source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo') zone_data_checkpoint(dest.zone, source.zone) # check that object does not exist in destination bucket @@ -4566,10 +4603,8 @@ def test_bucket_replication_lock_disabled_to_lock_enabled(): @attr('sync_policy') @attr('fails_with_rgw') def test_bucket_delete_with_zonegroup_sync_policy_directional(): - zonegroup = realm.master_zonegroup() zonegroup_conns = ZonegroupConns(zonegroup) - zonegroup_meta_checkpoint(zonegroup) (zoneA, zoneB) = zonegroup.zones[0:2] @@ -4577,7 +4612,6 @@ def test_bucket_delete_with_zonegroup_sync_policy_directional(): c1 = zoneA.cluster - # configure sync policy zones = zoneA.name + ',' + zoneB.name c1.admin(['sync', 'policy', 'get']) create_sync_policy_group(c1, "sync-group") @@ -4588,74 +4622,58 @@ def test_bucket_delete_with_zonegroup_sync_policy_directional(): zonegroup.period.update(zoneA, commit=True) get_sync_policy(c1) - # configure sync policy for only bucketA and enable it bucketA = create_zone_bucket(zcA) - buckets = [] - buckets.append(bucketA) time.sleep(config.checkpoint_delay) zonegroup_meta_checkpoint(zonegroup) - # create bucketA and objects in zoneA and zoneB objnameA = 'a' objnameB = 'b' - # upload object in each zone and wait for sync. - k = new_key(zcA, bucketA, objnameA) - k.set_contents_from_string('foo') - k = new_key(zcB, bucketA, objnameB) - k.set_contents_from_string('foo') + zcA.s3_client.put_object(Bucket=bucketA.name, Key=objnameA, Body='foo') + zcB.s3_client.put_object(Bucket=bucketA.name, Key=objnameB, Body='foo') zonegroup_meta_checkpoint(zonegroup) - zone_bucket_checkpoint(zoneA, zoneB, bucketA.name) + zone_bucket_checkpoint(zoneB, zoneA, bucketA.name) zone_data_checkpoint(zoneB, zoneA) - # verify that objnameA is synced to bucketA in zoneB - bucket = get_bucket(zcB, bucketA.name) - check_objects_exist(bucket, objnameA) - - # verify that objnameB is not synced to bucketA in zoneA - bucket = get_bucket(zcA, bucketA.name) - check_objects_not_exist(bucket, objnameB) + # verify sync + check_object_exists(zcB, bucketA.name, objnameA, 'foo') + check_object_not_exists(zcA, bucketA.name, objnameB) log.debug('deleting object on zone A') - k = get_key(zcA, bucket, objnameA) - k.delete() + zcA.s3_client.delete_object(Bucket=bucketA.name, Key=objnameA) + zone_bucket_checkpoint(zoneB, zoneA, bucketA.name) - zone_bucket_checkpoint(zoneA, zoneB, bucketA.name) - - # delete bucket on zoneA. it should fail to delete + # delete bucket - should fail log.debug('deleting bucket') - assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name) + e = assert_raises(ClientError, zcA.s3_client.delete_bucket, Bucket=bucketA.name) + assert e.response['Error']['Code'] == 'BucketNotEmpty' - assert check_all_buckets_exist(zcA, buckets) - assert check_all_buckets_exist(zcB, buckets) + assert check_all_buckets_exist(zcA, [bucketA.name]) + assert check_all_buckets_exist(zcB, [bucketA.name]) - # retry deleting bucket after removing the object from zone B. should succeed + # retry after deleting object from zone B log.debug('deleting object on zone B') - k = get_key(zcB, bucket, objnameB) - k.delete() + zcB.s3_client.delete_object(Bucket=bucketA.name, Key=objnameB) time.sleep(config.checkpoint_delay) log.debug('retry deleting bucket') - zcA.delete_bucket(bucketA.name) + zcA.s3_client.delete_bucket(Bucket=bucketA.name) zonegroup_meta_checkpoint(zonegroup) - assert check_all_buckets_dont_exist(zcA, buckets) - assert check_all_buckets_dont_exist(zcB, buckets) + assert check_all_buckets_dont_exist(zcA, [bucketA.name]) + assert check_all_buckets_dont_exist(zcB, [bucketA.name]) remove_sync_policy_group(c1, "sync-group") - return @attr('sync_policy') @attr('fails_with_rgw') def test_bucket_delete_with_bucket_sync_policy_directional(): - zonegroup = realm.master_zonegroup() zonegroup_conns = ZonegroupConns(zonegroup) - zonegroup_meta_checkpoint(zonegroup) (zoneA, zoneB) = zonegroup.zones[0:2] @@ -4663,7 +4681,6 @@ def test_bucket_delete_with_bucket_sync_policy_directional(): c1 = zoneA.cluster - # configure sync policy zones = zoneA.name + ',' + zoneB.name c1.admin(['sync', 'policy', 'get']) create_sync_policy_group(c1, "sync-group") @@ -4674,20 +4691,9 @@ def test_bucket_delete_with_bucket_sync_policy_directional(): zonegroup.period.update(zoneA, commit=True) get_sync_policy(c1) - """ - configure policy at bucketA level with src and dest - zones specified to zoneA and zoneB resp. - - verify zoneA bucketA syncs to zoneB BucketA but not viceversa. - """ - - # configure sync policy for only bucketA and enable it bucketA = create_zone_bucket(zcA) - buckets = [] - buckets.append(bucketA) create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name) create_sync_group_flow_directional(c1, "sync-bucket", "sync-flow-bucket", zoneA.name, zoneB.name, bucketA.name) - #create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow-bucket", zones, bucketA.name) create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zoneA.name, zoneB.name, bucketA.name) set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name) @@ -4696,56 +4702,42 @@ def test_bucket_delete_with_bucket_sync_policy_directional(): zonegroup_meta_checkpoint(zonegroup) - # create bucketA and objects in zoneA and zoneB objnameA = 'a' objnameB = 'b' - # upload object in each zone and wait for sync. - k = new_key(zcA, bucketA, objnameA) - k.set_contents_from_string('foo') - k = new_key(zcB, bucketA, objnameB) - k.set_contents_from_string('foo') + zcA.s3_client.put_object(Bucket=bucketA.name, Key=objnameA, Body='foo') + zcB.s3_client.put_object(Bucket=bucketA.name, Key=objnameB, Body='foo') zonegroup_meta_checkpoint(zonegroup) zone_data_checkpoint(zoneB, zoneA) - # verify that objnameA is synced to bucketA in zoneB - bucket = get_bucket(zcB, bucketA.name) - check_objects_exist(bucket, objnameA) - - # verify that objnameB is not synced to bucketA in zoneA - bucket = get_bucket(zcA, bucketA.name) - check_objects_not_exist(bucket, objnameB) + check_object_exists(zcB, bucketA.name, objnameA, 'foo') + check_object_not_exists(zcA, bucketA.name, objnameB) log.debug('deleting object on zone A') - k = get_key(zcA, bucket, objnameA) - k.delete() - - zone_bucket_checkpoint(zoneA, zoneB, bucketA.name) + zcA.s3_client.delete_object(Bucket=bucketA.name, Key=objnameA) + zone_bucket_checkpoint(zoneB, zoneA, bucketA.name) - # delete bucket on zoneA. it should fail to delete log.debug('deleting bucket') - assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name) + e = assert_raises(ClientError, zcA.s3_client.delete_bucket, Bucket=bucketA.name) + assert e.response['Error']['Code'] == 'BucketNotEmpty' - assert check_all_buckets_exist(zcA, buckets) - assert check_all_buckets_exist(zcB, buckets) + assert check_all_buckets_exist(zcA, [bucketA.name]) + assert check_all_buckets_exist(zcB, [bucketA.name]) log.debug('deleting object on zone B') - k = get_key(zcB, bucket, objnameB) - k.delete() + zcB.s3_client.delete_object(Bucket=bucketA.name, Key=objnameB) time.sleep(config.checkpoint_delay) - # retry deleting bucket after removing the object from zone B. should succeed log.debug('retry deleting bucket') - zcA.delete_bucket(bucketA.name) + zcA.s3_client.delete_bucket(Bucket=bucketA.name) zonegroup_meta_checkpoint(zonegroup) - assert check_all_buckets_dont_exist(zcA, buckets) - assert check_all_buckets_dont_exist(zcB, buckets) + assert check_all_buckets_dont_exist(zcA, [bucketA.name]) + assert check_all_buckets_dont_exist(zcB, [bucketA.name]) remove_sync_policy_group(c1, "sync-group") - return @attr('sync_policy') @@ -4796,33 +4788,28 @@ def test_bucket_delete_with_bucket_sync_policy_symmetric(): objnameB = 'b' # upload object in each zone and wait for sync. - k = new_key(zcA, bucketA, objnameA) - k.set_contents_from_string('foo') - k = new_key(zcB, bucketA, objnameB) - k.set_contents_from_string('foo') + zcA.s3_client.put_object(Bucket=bucketA.name, Key=objnameA, Body='foo') + zcB.s3_client.put_object(Bucket=bucketA.name, Key=objnameB, Body='foo') zone_bucket_checkpoint(zoneA, zoneB, bucketA.name) zone_data_checkpoint(zoneB, zoneA) log.debug('deleting object A') - k = get_key(zcA, bucketA, objnameA) - k.delete() + zcA.s3_client.delete_object(Bucket=bucketA.name, Key=objnameA) log.debug('deleting object B') - k = get_key(zcA, bucketA, objnameB) - k.delete() + zcA.s3_client.delete_object(Bucket=bucketA.name, Key=objnameB) zone_bucket_checkpoint(zoneA, zoneB, bucketA.name) zone_data_checkpoint(zoneB, zoneA) # delete bucket on zoneA. log.debug('deleting bucket') - zcA.delete_bucket(bucketA.name) + zcA.s3_client.delete_bucket(Bucket=bucketA.name) zonegroup_meta_checkpoint(zonegroup) - assert check_all_buckets_dont_exist(zcA, buckets) - assert check_all_buckets_dont_exist(zcB, buckets) - + assert check_all_buckets_dont_exist(zcA, [bucketA.name]) + assert check_all_buckets_dont_exist(zcB, [bucketA.name]) remove_sync_policy_group(c1, "sync-group") return @@ -4864,32 +4851,28 @@ def test_bucket_delete_with_zonegroup_sync_policy_symmetric(): objnameB = 'b' # upload object in each zone and wait for sync. - k = new_key(zcA, bucketA, objnameA) - k.set_contents_from_string('foo') - k = new_key(zcB, bucketA, objnameB) - k.set_contents_from_string('foo') + zcA.s3_client.put_object(Bucket=bucketA.name, Key=objnameA, Body='foo') + zcB.s3_client.put_object(Bucket=bucketA.name, Key=objnameB, Body='foo') zone_data_checkpoint(zoneB, zoneA) zone_bucket_checkpoint(zoneA, zoneB, bucketA.name) log.debug('deleting object A') - k = get_key(zcA, bucketA, objnameA) - k.delete() + zcA.s3_client.delete_object(Bucket=bucketA.name, Key=objnameA) log.debug('deleting object B') - k = get_key(zcA, bucketA, objnameB) - k.delete() + zcA.s3_client.delete_object(Bucket=bucketA.name, Key=objnameB) zone_bucket_checkpoint(zoneA, zoneB, bucketA.name) zone_data_checkpoint(zoneB, zoneA) # delete bucket on zoneA. log.debug('deleting bucket') - zcA.delete_bucket(bucketA.name) + zcA.s3_client.delete_bucket(Bucket=bucketA.name) zonegroup_meta_checkpoint(zonegroup) - assert check_all_buckets_dont_exist(zcA, buckets) - assert check_all_buckets_dont_exist(zcB, buckets) + assert check_all_buckets_dont_exist(zcA, [bucketA.name]) + assert check_all_buckets_dont_exist(zcB, [bucketA.name]) remove_sync_flow(c1, "sync-group", "sync-flow", "symmetrical") remove_sync_policy_group(c1, "sync-group") @@ -4941,49 +4924,40 @@ def test_delete_bucket_with_zone_opt_out(): objnameB = 'b' # upload object in zone A and zone C - k = new_key(zcA, bucketA, objnameA) - k.set_contents_from_string('foo') - k = new_key(zcC, bucketA, objnameB) - k.set_contents_from_string('foo') + zcA.s3_client.put_object(Bucket=bucketA.name, Key=objnameA, Body='foo') + zcC.s3_client.put_object(Bucket=bucketA.name, Key=objnameB, Body='foo') zonegroup_meta_checkpoint(zonegroup) zone_data_checkpoint(zoneB, zoneA) # verify that objnameA is synced to zoneB but not zoneC - bucket = get_bucket(zcB, bucketA.name) - check_objects_exist(bucket, objnameA) - - bucket = get_bucket(zcC, bucketA.name) - check_objects_not_exist(bucket, objnameA) + check_object_exists(zcB, bucketA.name, objnameA, 'foo') + check_object_not_exists(zcC, bucketA.name, objnameA) # verify that objnameB is not synced to either zoneA or zoneB - bucket = get_bucket(zcA, bucketA.name) - check_objects_not_exist(bucket, objnameB) - - bucket = get_bucket(zcB, bucketA.name) - check_objects_not_exist(bucket, objnameB) + check_object_not_exists(zcA, bucketA.name, objnameB) + check_object_not_exists(zcB, bucketA.name, objnameB) log.debug('deleting object on zone A') - k = get_key(zcA, bucket, objnameA) - k.delete() + zcA.s3_client.delete_object(Bucket=bucketA.name, Key=objnameA) zone_bucket_checkpoint(zoneA, zoneB, bucketA.name) # delete bucket on zoneA. it should fail to delete because zoneC still has objnameB log.debug('deleting bucket') - assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name) + e = assert_raises(ClientError, zcA.s3_client.delete_bucket, Bucket=bucketA.name) + assert e.response['Error']['Code'] == 'BucketNotEmpty' assert check_all_buckets_exist(zcA, buckets) assert check_all_buckets_exist(zcC, buckets) # retry deleting bucket after removing the object from zone C. should succeed log.debug('deleting object on zone C') - k = get_key(zcC, bucket, objnameB) - k.delete() + zcC.s3_client.delete_object(Bucket=bucketA.name, Key=objnameB) time.sleep(config.checkpoint_delay) log.debug('retry deleting bucket') - zcA.delete_bucket(bucketA.name) + zcA.s3_client.delete_bucket(Bucket=bucketA.name) zonegroup_meta_checkpoint(zonegroup) @@ -4991,7 +4965,7 @@ def test_delete_bucket_with_zone_opt_out(): assert check_all_buckets_dont_exist(zcC, buckets) remove_sync_policy_group(c1, "sync-group") - + return @attr('sync_policy') @@ -5043,32 +5017,28 @@ def test_bucket_delete_with_sync_policy_object_prefix(): objnameB = 'b' # upload object in each zone and wait for sync. - k = new_key(zcA, bucketA, objnameA) - k.set_contents_from_string('foo') - k = new_key(zcB, bucketA, objnameB) - k.set_contents_from_string('foo') + zcA.s3_client.put_object(Bucket=bucketA.name, Key=objnameA, Body='foo') + zcB.s3_client.put_object(Bucket=bucketA.name, Key=objnameB, Body='foo') zone_bucket_checkpoint(zoneA, zoneB, bucketA.name) zone_data_checkpoint(zoneB, zoneA) # verify that objnameA is synced to zoneB - bucket = get_bucket(zcB, bucketA.name) - check_object_exists(bucket, objnameA) + check_object_exists(zcB, bucketA.name, objnameA, 'foo') # verify that objnameB is not synced to zoneA - bucket = get_bucket(zcA, bucketA.name) - check_object_not_exists(bucket, objnameB) + check_object_not_exists(zcA, bucketA.name, objnameB) log.debug('deleting object A') - k = get_key(zcA, bucketA, objnameA) - k.delete() + zcA.s3_client.delete_object(Bucket=bucketA.name, Key=objnameA) zone_bucket_checkpoint(zoneA, zoneB, bucketA.name) zone_data_checkpoint(zoneB, zoneA) # delete bucket on zoneA. it should fail to delete because zoneB still has objnameB log.debug('deleting bucket') - assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name) + e = assert_raises(ClientError, zcA.s3_client.delete_bucket, Bucket=bucketA.name) + assert e.response['Error']['Code'] == 'BucketNotEmpty' assert check_all_buckets_exist(zcA, buckets) assert check_all_buckets_exist(zcB, buckets) @@ -5097,16 +5067,20 @@ def test_copy_obj_between_zonegroups(zonegroup): dest_bucket = dest_zone.create_bucket(gen_bucket_name()) realm_meta_checkpoint(realm) - # copy object - dest_zone.s3_client.copy_object( - Bucket=dest_bucket.name, - CopySource=f'{source_bucket.name}/{objname}', - Key=objname - ) + obj_sizes = [4096, 8 * 1024 * 1024] + for size in obj_sizes: + source_zone.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='x' * size) - # check that object exists in destination bucket - k = get_key(dest_zone, dest_bucket, objname) - assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo') + # copy object + dest_zone.s3_client.copy_object( + Bucket=dest_bucket.name, + CopySource=f'{source_bucket.name}/{objname}', + Key=objname + ) + + # verify + response = dest_zone.s3_client.get_object(Bucket=dest_bucket.name, Key=objname) + assert_equal(response['Body'].read().decode('utf-8'), 'x' * size) @allow_bucket_replication def test_bucket_replication_alt_user_delete_forbidden(): @@ -5155,13 +5129,12 @@ def test_bucket_replication_alt_user_delete_forbidden(): # upload an object and wait for sync. objname = 'dummy' - k = new_key(source, source_bucket, objname) - k.set_contents_from_string('foo') + source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo') zone_data_checkpoint(dest.zone, source.zone) # check that object exists in destination bucket - k = get_key(dest, dest_bucket, objname) - assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo') + response = dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname) + assert_equal(response['Body'].read().decode('utf-8'), 'foo') # delete object on source source.s3_client.delete_object(Bucket=source_bucket.name, Key=objname) @@ -5169,8 +5142,8 @@ def test_bucket_replication_alt_user_delete_forbidden(): zone_data_checkpoint(dest.zone, source.zone) # check that object does exist in destination bucket - k = get_key(dest, dest_bucket, objname) - assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo') + response = dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname) + assert_equal(response['Body'].read().decode('utf-8'), 'foo') @allow_bucket_replication def test_bucket_replication_alt_user_delete(): @@ -5219,13 +5192,12 @@ def test_bucket_replication_alt_user_delete(): # upload an object and wait for sync. objname = 'dummy' - k = new_key(source, source_bucket, objname) - k.set_contents_from_string('foo') + source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo') zone_data_checkpoint(dest.zone, source.zone) # check that object exists in destination bucket - k = get_key(dest, dest_bucket, objname) - assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo') + response = dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname) + assert_equal(response['Body'].read().decode('utf-8'), 'foo') # delete object on source source.s3_client.delete_object(Bucket=source_bucket.name, Key=objname) @@ -5293,13 +5265,12 @@ def test_bucket_replication_alt_user_deletemarker_forbidden(): # upload an object and wait for sync. objname = 'dummy' - k = new_key(source, source_bucket, objname) - k.set_contents_from_string('foo') + source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo') zone_data_checkpoint(dest.zone, source.zone) # check that object exists in destination bucket - k = get_key(dest, dest_bucket, objname) - assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo') + response = dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname) + assert_equal(response['Body'].read().decode('utf-8'), 'foo') # delete object on source source.s3_client.delete_object(Bucket=source_bucket.name, Key=objname) @@ -5307,8 +5278,8 @@ def test_bucket_replication_alt_user_deletemarker_forbidden(): zone_data_checkpoint(dest.zone, source.zone) # check that object does exist in destination bucket - k = get_key(dest, dest_bucket, objname) - assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo') + response = dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname) + assert_equal(response['Body'].read().decode('utf-8'), 'foo') @allow_bucket_replication def test_bucket_replication_alt_user_deletemarker(): @@ -5367,13 +5338,12 @@ def test_bucket_replication_alt_user_deletemarker(): # upload an object and wait for sync. objname = 'dummy' - k = new_key(source, source_bucket, objname) - k.set_contents_from_string('foo') + source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo') zone_data_checkpoint(dest.zone, source.zone) # check that object exists in destination bucket - k = get_key(dest, dest_bucket, objname) - assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo') + response = dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname) + assert_equal(response['Body'].read().decode('utf-8'), 'foo') # delete object on source source.s3_client.delete_object(Bucket=source_bucket.name, Key=objname) @@ -5491,8 +5461,7 @@ def test_bucket_replication_source_forbidden(): # upload an object and wait for sync. objname = 'dummy' - k = new_key(source, source_bucket, objname) - k.set_contents_from_string('foo') + source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo') zone_data_checkpoint(dest.zone, source.zone) # check that object does not exist in destination bucket @@ -5556,8 +5525,7 @@ def test_bucket_replication_source_forbidden_versioned(): # upload an object and wait for sync. objname = 'dummy' - k = new_key(source, source_bucket, objname) - k.set_contents_from_string('foo') + source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo') zone_data_checkpoint(dest.zone, source.zone) # check that object does not exist in destination bucket @@ -5616,8 +5584,7 @@ def test_bucket_replication_source_allow_either_getobject_or_getobjectversionfor # upload an object and wait for sync. objname = 'dummy' - k = new_key(source, source_bucket, objname) - k.set_contents_from_string('foo') + source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo') zone_data_checkpoint(dest.zone, source.zone) # check that object exists in destination bucket @@ -5673,8 +5640,7 @@ def test_bucket_replication_source_allow_either_getobjectversion_or_getobjectver # upload an object and wait for sync. objname = 'dummy' - k = new_key(source, source_bucket, objname) - k.set_contents_from_string('foo') + source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo') zone_data_checkpoint(dest.zone, source.zone) # check that object exists in destination bucket @@ -5726,8 +5692,7 @@ def test_bucket_replication_source_forbidden_objretention(): # upload an object and wait for sync. objname = 'dummy' - k = new_key(source, source_bucket_name, objname) - k.set_contents_from_string('foo') + source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo') zone_data_checkpoint(dest.zone, source.zone) # check that object does not exist in destination bucket @@ -5784,8 +5749,7 @@ def test_bucket_replication_source_forbidden_legalhold(): # upload an object and wait for sync. objname = 'dummy' - k = new_key(source, source_bucket_name, objname) - k.set_contents_from_string('foo') + source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo') zone_data_checkpoint(dest.zone, source.zone) # check that object does not exist in destination bucket @@ -5912,8 +5876,7 @@ def test_copy_obj_perm_check_between_zonegroups(zonegroup): source_bucket = source_zone.create_bucket(gen_bucket_name()) objname = 'dummy' - k = new_key(source_zone, source_bucket.name, objname) - k.set_contents_from_string('foo') + source_zone.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo') for zg in realm.current_period.zonegroups: if zg.name == zonegroup.name: diff --git a/src/test/rgw/rgw_multi/zone_rados.py b/src/test/rgw/rgw_multi/zone_rados.py index 50fa15cf1fa..c2830c96a6f 100644 --- a/src/test/rgw/rgw_multi/zone_rados.py +++ b/src/test/rgw/rgw_multi/zone_rados.py @@ -1,4 +1,5 @@ import logging +import boto3 from botocore.exceptions import ClientError from itertools import zip_longest # type: ignore @@ -67,16 +68,6 @@ class RadosZone(Zone): def __init__(self, zone, credentials): super(RadosZone.Conn, self).__init__(zone, credentials) - region = "" if zone.zonegroup is None else zone.zonegroup.name - self.s3_resource = boto3.resource( - 's3', - endpoint_url='http://' + zone.gateways[0].host + ':' + str(zone.gateways[0].port), - aws_access_key_id=credentials.access_key, - aws_secret_access_key=credentials.secret, - region_name=region, - verify=False - ) - def get_bucket(self, name): try: bucket = self.s3_resource.Bucket(name) @@ -92,7 +83,7 @@ class RadosZone(Zone): bucket = self.s3_resource.create_bucket(Bucket=name) return bucket except ClientError as e: - if e.response['Error']['Code'] == 'BucketAlreadyOwnedByYou': + if e.response['Error']['Code'] == '409': return self.s3_resource.Bucket(name) raise @@ -194,7 +185,7 @@ class RadosZone(Zone): def has_role(self, role_name): try: self.get_role(role_name) - except BotoServerError: + except ClientError: return False return True