from itertools import zip_longest
from io import StringIO
-import boto
-import boto.s3.connection
-from boto.s3.website import WebsiteConfiguration
-from boto.s3.cors import CORSConfiguration
+import boto3
from botocore.exceptions import ClientError
from nose.tools import eq_ as eq
self.master_zone = None
for z in zonegroup.zones:
+ log.debug('get_conn for zone=%s', z.name)
zone_conn = z.get_conn(user.credentials)
non_account_zone_conn = z.get_conn(non_account_user.credentials)
non_account_alt_zone_conn = z.get_conn(non_account_alt_user.credentials)
def test_bucket_create():
zonegroup = realm.master_zonegroup()
zonegroup_conns = ZonegroupConns(zonegroup)
+ zonegroup_meta_checkpoint(zonegroup)
+
buckets, _ = create_bucket_per_zone(zonegroup_conns)
zonegroup_meta_checkpoint(zonegroup)
tenant = 'testx'
uid = 'test'
- tenant_secondary_conn = boto.s3.connection.S3Connection(aws_access_key_id=access_key,
- aws_secret_access_key=secret_key,
- is_secure=False,
- port=secondary.zone.gateways[0].port,
- host=secondary.zone.gateways[0].host,
- calling_format='boto.s3.connection.OrdinaryCallingFormat')
+ # create s3 clients for tenant user
+ tenant_secondary_conn = boto3.client(
+ 's3',
+ endpoint_url=f'http://{secondary.zone.gateways[0].host}:{secondary.zone.gateways[0].port}',
+ aws_access_key_id=access_key,
+ aws_secret_access_key=secret_key,
+ region_name='default'
+ )
- tenant_primary_conn = boto.s3.connection.S3Connection(aws_access_key_id=access_key,
- aws_secret_access_key=secret_key,
- is_secure=False,
- port=primary.zone.gateways[0].port,
- host=primary.zone.gateways[0].host,
- calling_format='boto.s3.connection.OrdinaryCallingFormat')
+ tenant_primary_conn = boto3.client(
+ 's3',
+ endpoint_url=f'http://{primary.zone.gateways[0].host}:{primary.zone.gateways[0].port}',
+ aws_access_key_id=access_key,
+ aws_secret_access_key=secret_key,
+ region_name='default'
+ )
cmd = ['user', 'create', '--tenant', tenant, '--uid', uid, '--access-key', access_key, '--secret-key', secret_key, '--display-name', 'tenanted-user']
primary.zone.cluster.admin(cmd)
zonegroup_meta_checkpoint(zonegroup)
try:
- bucket = tenant_secondary_conn.create_bucket('tenanted-bucket')
+ bucket_name = 'tenanted-bucket'
+ tenant_secondary_conn.create_bucket(Bucket=bucket_name)
zonegroup_meta_checkpoint(zonegroup)
- assert tenant_primary_conn.get_bucket(bucket.name)
+ tenant_primary_conn.head_bucket(Bucket=bucket_name)
log.info("bucket exists in tenant namespace")
- e = assert_raises(boto.exception.S3ResponseError, primary.get_bucket, bucket.name)
- assert e.error_code == 'NoSuchBucket'
+
+ e = assert_raises(ClientError, primary.s3_client.head_bucket, Bucket=bucket_name)
+ # head_bucket errors have no response body, so botocore may report only the HTTP status code
+ assert e.response['Error']['Code'] in ('404', 'NoSuchBucket')
log.info("bucket does not exist in default user namespace")
finally:
cmd = ['user', 'rm', '--tenant', tenant, '--uid', uid, '--purge-data']
for zone in zonegroup_conns.zones:
assert check_all_buckets_exist(zone, buckets)
- for zone, bucket_name in zone_buckets:
- zone.conn.delete_bucket(bucket_name)
+ for zone, bucket in zone_buckets:
+ zone.s3_client.delete_bucket(Bucket=bucket.name)
zonegroup_meta_checkpoint(zonegroup)
for zone in zonegroup_conns.zones:
assert check_all_buckets_dont_exist(zone, buckets)
-def get_bucket(zone, bucket_name):
- return zone.conn.get_bucket(bucket_name)
-
-def get_key(zone, bucket_name, obj_name):
- b = get_bucket(zone, bucket_name)
- return b.get_key(obj_name)
-
-def new_key(zone, bucket_name, obj_name):
- b = get_bucket(zone, bucket_name)
- return b.new_key(obj_name)
-
def check_bucket_eq(zone_conn1, zone_conn2, bucket):
if zone_conn2.zone.has_buckets():
zone_conn2.check_bucket_eq(zone_conn1, bucket.name)
zonegroup_conns = ZonegroupConns(zonegroup)
buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
- objnames = [ 'myobj', '_myobj', ':', '&', '.', '..', '...', '.o', '.o.']
-
+ objnames = ['myobj', '_myobj', ':', '&', '.', '..', '...', '.o', '.o.']
content = 'asdasd'
- # don't wait for meta sync just yet
- for zone, bucket_name in zone_bucket:
+ # upload objects
+ for zone, bucket in zone_bucket:
for objname in objnames:
- k = new_key(zone, bucket_name, objname)
- k.set_contents_from_string(content)
+ zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body=content)
zonegroup_meta_checkpoint(zonegroup)
objname = 'myobj'
content = 'asdasd'
- # don't wait for meta sync just yet
+ # upload objects
for zone, bucket in zone_bucket:
- k = new_key(zone, bucket, objname)
- k.set_contents_from_string(content)
+ zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body=content)
zonegroup_meta_checkpoint(zonegroup)
zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
check_bucket_eq(source_conn, target_conn, bucket)
- # check object removal
+ # delete objects
for source_conn, bucket in zone_bucket:
- k = get_key(source_conn, bucket, objname)
- k.delete()
+ source_conn.s3_client.delete_object(Bucket=bucket.name, Key=objname)
+
for target_conn in zonegroup_conns.zones:
if source_conn.zone == target_conn.zone:
continue
zonegroup_conns = ZonegroupConns(zonegroup)
buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
- objnames = [f'obj{i}' for i in range(1,50)]
+ objnames = [f'obj{i}' for i in range(1, 50)]
content = 'asdasd'
- # don't wait for meta sync just yet
+ # upload objects
for zone, bucket in zone_bucket:
- create_objects(zone, bucket, objnames, content)
+ for objname in objnames:
+ zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body=content)
zonegroup_meta_checkpoint(zonegroup)
zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
check_bucket_eq(source_conn, target_conn, bucket)
- # check object removal
+ # delete objects
for source_conn, bucket in zone_bucket:
- bucket.delete_keys(objnames)
+ objects = [{'Key': obj} for obj in objnames]
+ source_conn.s3_client.delete_objects(
+ Bucket=bucket.name,
+ Delete={'Objects': objects}
+ )
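+ # delete_objects accepts up to 1000 keys per request, so the 49 keys here fit in a single call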
+
for target_conn in zonegroup_conns.zones:
if source_conn.zone == target_conn.zone:
continue
zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
check_bucket_eq(source_conn, target_conn, bucket)
-def get_latest_object_version(key):
- for k in key.bucket.list_versions(key.name):
- if k.is_latest:
- return k
- return None
-
def test_versioned_object_incremental_sync():
zonegroup = realm.master_zonegroup()
zonegroup_conns = ZonegroupConns(zonegroup)
buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
# enable versioning
- for _, bucket in zone_bucket:
- bucket.configure_versioning(True)
+ for zone, bucket in zone_bucket:
+ zone.s3_client.put_bucket_versioning(
+ Bucket=bucket.name,
+ VersioningConfiguration={'Status': 'Enabled'}
+ )
zonegroup_meta_checkpoint(zonegroup)
# upload a dummy object to each bucket and wait for sync. this forces each
# bucket to finish a full sync and switch to incremental
for source_conn, bucket in zone_bucket:
- new_key(source_conn, bucket, 'dummy').set_contents_from_string('')
+ source_conn.s3_client.put_object(Bucket=bucket.name, Key='dummy', Body='')
for target_conn in zonegroup_conns.zones:
if source_conn.zone == target_conn.zone:
continue
# create and delete multiple versions of an object from each zone
for zone_conn in zonegroup_conns.rw_zones:
obj = 'obj-' + zone_conn.name
- k = new_key(zone_conn, bucket, obj)
- k.set_contents_from_string('version1')
- log.debug('version1 id=%s', k.version_id)
+ resp1 = zone_conn.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='version1')
+ version_id_1 = resp1['VersionId']
+ log.debug('version1 id=%s', version_id_1)
# don't delete version1 - this tests that the initial version
# doesn't get squashed into later versions
# create and delete the following object versions to test that
# the operations don't race with each other during sync
- k.set_contents_from_string('version2')
- log.debug('version2 id=%s', k.version_id)
- k.bucket.delete_key(obj, version_id=k.version_id)
-
- k.set_contents_from_string('version3')
- log.debug('version3 id=%s', k.version_id)
- k.bucket.delete_key(obj, version_id=k.version_id)
+ resp2 = zone_conn.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='version2')
+ version_id_2 = resp2['VersionId']
+ log.debug('version2 id=%s', version_id_2)
+ zone_conn.s3_client.delete_object(Bucket=bucket.name, Key=obj, VersionId=version_id_2)
+
+ # Create and delete version3
+ resp3 = zone_conn.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='version3')
+ version_id_3 = resp3['VersionId']
+ log.debug('version3 id=%s', version_id_3)
+ zone_conn.s3_client.delete_object(Bucket=bucket.name, Key=obj, VersionId=version_id_3)
for _, bucket in zone_bucket:
zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+ # update ACLs to test metadata-only entries
for _, bucket in zone_bucket:
- # overwrite the acls to test that metadata-only entries are applied
for zone_conn in zonegroup_conns.rw_zones:
obj = 'obj-' + zone_conn.name
- k = new_key(zone_conn, bucket.name, obj)
- v = get_latest_object_version(k)
- v.make_public()
+
+ # find the object's current version and make it public
+ response = zone_conn.s3_client.list_object_versions(
+ Bucket=bucket.name,
+ Prefix=obj
+ )
+ latest = next((v for v in response.get('Versions', [])
+ if v['Key'] == obj and v['IsLatest']), None)
+ if latest:
+ zone_conn.s3_client.put_object_acl(
+ Bucket=bucket.name,
+ Key=obj,
+ VersionId=latest['VersionId'],
+ ACL='public-read'
+ )
for _, bucket in zone_bucket:
zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
obj = 'obj'
# upload an initial object
- key1 = new_key(zone, bucket, obj)
- key1.set_contents_from_string('')
- log.debug('created initial version id=%s', key1.version_id)
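+ # note: objects written before versioning is enabled carry no VersionId; they become the 'null' version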
+ resp1 = zone.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='')
+ version_id_1 = resp1.get('VersionId', 'null')
+ log.debug('created initial version id=%s', version_id_1)
zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
# enable versioning
- bucket.configure_versioning(True)
+ zone.s3_client.put_bucket_versioning(
+ Bucket=bucket.name,
+ VersioningConfiguration={'Status': 'Enabled'}
+ )
zonegroup_meta_checkpoint(zonegroup)
# re-upload the object as a new version
- key2 = new_key(zone, bucket, obj)
- key2.set_contents_from_string('')
- log.debug('created new version id=%s', key2.version_id)
+ resp2 = zone.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='')
+ version_id_2 = resp2['VersionId']
+ log.debug('created new version id=%s', version_id_2)
zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
- bucket.delete_key(obj, version_id='null')
+ # delete null version
+ zone.s3_client.delete_object(Bucket=bucket.name, Key=obj, VersionId='null')
- bucket.delete_key(obj, version_id=key2.version_id)
+ # delete version 2
+ zone.s3_client.delete_object(Bucket=bucket.name, Key=obj, VersionId=version_id_2)
zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
# create a versioned bucket
bucket = zone.create_bucket(gen_bucket_name())
log.debug('created bucket=%s', bucket.name)
- bucket.configure_versioning(True)
+
+ zone.s3_client.put_bucket_versioning(
+ Bucket=bucket.name,
+ VersioningConfiguration={'Status': 'Enabled'}
+ )
zonegroup_meta_checkpoint(zonegroup)
- # upload a dummy object and wait for sync. this forces each zone to finish
- # a full sync and switch to incremental
- new_key(zone, bucket, 'dummy').set_contents_from_string('')
+ # upload a dummy object and wait for sync. this forces each zone to finish a full sync and switch to incremental
+ zone.s3_client.put_object(Bucket=bucket.name, Key='dummy', Body='')
zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
- # create several concurrent versions on each zone and let them race to sync
+ # create several concurrent versions on each zone and let them race to sync
obj = 'obj'
for i in range(10):
for zone_conn in zonegroup_conns.rw_zones:
- k = new_key(zone_conn, bucket, obj)
- k.set_contents_from_string('version1')
- log.debug('zone=%s version=%s', zone_conn.zone.name, k.version_id)
+ resp = zone_conn.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='version1')
+ log.debug('zone=%s version=%s', zone_conn.zone.name, resp['VersionId'])
zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
zonegroup_data_checkpoint(zonegroup_conns)
def test_version_suspended_incremental_sync():
zonegroup = realm.master_zonegroup()
zonegroup_conns = ZonegroupConns(zonegroup)
-
zone = zonegroup_conns.rw_zones[0]
# create a non-versioned bucket
zonegroup_meta_checkpoint(zonegroup)
# upload an initial object
- key1 = new_key(zone, bucket, 'obj')
- key1.set_contents_from_string('')
- log.debug('created initial version id=%s', key1.version_id)
+ resp1 = zone.s3_client.put_object(Bucket=bucket.name, Key='obj', Body='')
+ log.debug('created initial version id=%s', resp1.get('VersionId', 'null'))
zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
# enable versioning
- bucket.configure_versioning(True)
+ zone.s3_client.put_bucket_versioning(
+ Bucket=bucket.name,
+ VersioningConfiguration={'Status': 'Enabled'}
+ )
zonegroup_meta_checkpoint(zonegroup)
# re-upload the object as a new version
- key2 = new_key(zone, bucket, 'obj')
- key2.set_contents_from_string('')
- log.debug('created new version id=%s', key2.version_id)
+ resp2 = zone.s3_client.put_object(Bucket=bucket.name, Key='obj', Body='')
+ log.debug('created new version id=%s', resp2['VersionId'])
zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
# suspend versioning
- bucket.configure_versioning(False)
+ zone.s3_client.put_bucket_versioning(
+ Bucket=bucket.name,
+ VersioningConfiguration={'Status': 'Suspended'}
+ )
zonegroup_meta_checkpoint(zonegroup)
# re-upload the object as a 'null' version
- key3 = new_key(zone, bucket, 'obj')
- key3.set_contents_from_string('')
- log.debug('created null version id=%s', key3.version_id)
+ resp3 = zone.s3_client.put_object(Bucket=bucket.name, Key='obj', Body='')
+ log.debug('created null version id=%s', resp3.get('VersionId', 'null'))
zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
def test_delete_marker_full_sync():
buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
# enable versioning
- for _, bucket in zone_bucket:
- bucket.configure_versioning(True)
+ for zone, bucket in zone_bucket:
+ zone.s3_client.put_bucket_versioning(
+ Bucket=bucket.name,
+ VersioningConfiguration={'Status': 'Enabled'}
+ )
zonegroup_meta_checkpoint(zonegroup)
for zone, bucket in zone_bucket:
# upload an initial object
- key1 = new_key(zone, bucket, 'obj')
- key1.set_contents_from_string('')
+ zone.s3_client.put_object(Bucket=bucket.name, Key='obj', Body='')
- # create a delete marker
- key2 = new_key(zone, bucket, 'obj')
- key2.delete()
- key2.delete()
- key2.delete()
+ # create delete markers
+ zone.s3_client.delete_object(Bucket=bucket.name, Key='obj')
+ zone.s3_client.delete_object(Bucket=bucket.name, Key='obj')
+ zone.s3_client.delete_object(Bucket=bucket.name, Key='obj')
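+ # each delete on a versioned bucket layers another delete marker on the object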
# wait for full sync
for _, bucket in zone_bucket:
buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
# enable/suspend versioning
- for _, bucket in zone_bucket:
- bucket.configure_versioning(True)
- bucket.configure_versioning(False)
+ for zone, bucket in zone_bucket:
+ zone.s3_client.put_bucket_versioning(
+ Bucket=bucket.name,
+ VersioningConfiguration={'Status': 'Enabled'}
+ )
+ zone.s3_client.put_bucket_versioning(
+ Bucket=bucket.name,
+ VersioningConfiguration={'Status': 'Suspended'}
+ )
zonegroup_meta_checkpoint(zonegroup)
for zone, bucket in zone_bucket:
# upload an initial object
- key1 = new_key(zone, bucket, 'obj')
- key1.set_contents_from_string('')
+ zone.s3_client.put_object(Bucket=bucket.name, Key='obj', Body='')
- # create a delete marker
- key2 = new_key(zone, bucket, 'obj')
- key2.delete()
- key2.delete()
- key2.delete()
+ # create delete markers
+ zone.s3_client.delete_object(Bucket=bucket.name, Key='obj')
+ zone.s3_client.delete_object(Bucket=bucket.name, Key='obj')
+ zone.s3_client.delete_object(Bucket=bucket.name, Key='obj')
# wait for full sync
for _, bucket in zone_bucket:
# create a versioned bucket
bucket = zone.create_bucket(gen_bucket_name())
log.debug('created bucket=%s', bucket.name)
- bucket.configure_versioning(True)
+
+ zone.s3_client.put_bucket_versioning(
+ Bucket=bucket.name,
+ VersioningConfiguration={'Status': 'Enabled'}
+ )
zonegroup_meta_checkpoint(zonegroup)
obj = 'obj'
- # upload a dummy object and wait for sync. this forces each zone to finish
- # a full sync and switch to incremental
- new_key(zone, bucket, obj).set_contents_from_string('')
+ # upload a dummy object and wait for sync. this forces each zone to finish a full sync and switch to incremental
+ zone.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='')
zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
- # create several concurrent delete markers on each zone and let them race to sync
+ # create several concurrent delete markers on each zone and let them race to sync
for i in range(2):
for zone_conn in zonegroup_conns.rw_zones:
- key = new_key(zone_conn, bucket, obj)
- key.delete()
+ zone_conn.s3_client.delete_object(Bucket=bucket.name, Key=obj)
+
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+def test_suspended_delete_marker_incremental_sync():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ zone = zonegroup_conns.rw_zones[0]
+
+ # create a versioned bucket
+ bucket = zone.create_bucket(gen_bucket_name())
+ log.debug('created bucket=%s', bucket.name)
+
+ zone.s3_client.put_bucket_versioning(
+ Bucket=bucket.name,
+ VersioningConfiguration={'Status': 'Enabled'}
+ )
+ zone.s3_client.put_bucket_versioning(
+ Bucket=bucket.name,
+ VersioningConfiguration={'Status': 'Suspended'}
+ )
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ obj = 'obj'
+
+ # upload a dummy object and wait for sync
+ zone.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='')
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ # create a delete marker on source zone
+ zone.s3_client.delete_object(Bucket=bucket.name, Key=obj)
zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
def test_bucket_versioning():
buckets, zone_bucket = create_bucket_per_zone_in_realm()
- for _, bucket in zone_bucket:
- bucket.configure_versioning(True)
- res = bucket.get_versioning_status()
- key = 'Versioning'
- assert(key in res and res[key] == 'Enabled')
+ for zone, bucket in zone_bucket:
+ zone.s3_client.put_bucket_versioning(
+ Bucket=bucket.name,
+ VersioningConfiguration={'Status': 'Enabled'}
+ )
+ response = zone.s3_client.get_bucket_versioning(Bucket=bucket.name)
+ assert response.get('Status') == 'Enabled'
def test_bucket_acl():
buckets, zone_bucket = create_bucket_per_zone_in_realm()
- for _, bucket in zone_bucket:
- assert(len(bucket.get_acl().acl.grants) == 1) # single grant on owner
- bucket.set_acl('public-read')
- assert(len(bucket.get_acl().acl.grants) == 2) # new grant on AllUsers
+ for zone, bucket in zone_bucket:
+ acl = zone.s3_client.get_bucket_acl(Bucket=bucket.name)
+ assert len(acl['Grants']) == 1 # single grant on owner
+
+ zone.s3_client.put_bucket_acl(Bucket=bucket.name, ACL='public-read')
+
+ acl = zone.s3_client.get_bucket_acl(Bucket=bucket.name)
+ assert len(acl['Grants']) == 2 # new grant on AllUsers
def test_bucket_cors():
buckets, zone_bucket = create_bucket_per_zone_in_realm()
- for _, bucket in zone_bucket:
- cors_cfg = CORSConfiguration()
- cors_cfg.add_rule(['DELETE'], 'https://www.example.com', allowed_header='*', max_age_seconds=3000)
- bucket.set_cors(cors_cfg)
- assert(bucket.get_cors().to_xml() == cors_cfg.to_xml())
+ for zone, bucket in zone_bucket:
+ cors_cfg = {
+ 'CORSRules': [{
+ 'AllowedMethods': ['DELETE'],
+ 'AllowedOrigins': ['https://www.example.com'],
+ 'AllowedHeaders': ['*'],
+ 'MaxAgeSeconds': 3000
+ }]
+ }
+ zone.s3_client.put_bucket_cors(Bucket=bucket.name, CORSConfiguration=cors_cfg)
+
+ response = zone.s3_client.get_bucket_cors(Bucket=bucket.name)
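+ # note: this assumes RGW echoes the rules back exactly as configured, without adding default fields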
+ assert response['CORSRules'] == cors_cfg['CORSRules']
def test_bucket_delete_notempty():
zonegroup = realm.master_zonegroup()
buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
zonegroup_meta_checkpoint(zonegroup)
- for zone_conn, bucket_name in zone_bucket:
+ for zone_conn, bucket in zone_bucket:
# upload an object to each bucket on its own zone
- conn = zone_conn.get_connection()
- bucket = conn.get_bucket(bucket_name)
- k = bucket.new_key('foo')
- k.set_contents_from_string('bar')
+ zone_conn.s3_client.put_object(Bucket=bucket.name, Key='foo', Body='bar')
+
# attempt to delete the bucket before this object can sync
try:
- conn.delete_bucket(bucket_name)
- except boto.exception.S3ResponseError as e:
- assert(e.error_code == 'BucketNotEmpty')
+ zone_conn.s3_client.delete_bucket(Bucket=bucket.name)
+ except ClientError as e:
+ assert e.response['Error']['Code'] == 'BucketNotEmpty'
continue
- assert False # expected 409 BucketNotEmpty
+ assert False # expected BucketNotEmpty
# assert that each bucket still exists on the master
- c1 = zonegroup_conns.master_zone.conn
- for _, bucket_name in zone_bucket:
- assert c1.get_bucket(bucket_name)
+ for _, bucket in zone_bucket:
+ zonegroup_conns.master_zone.s3_client.head_bucket(Bucket=bucket.name)
def test_multi_period_incremental_sync():
zonegroup = realm.master_zonegroup()
continue
bucket_name = gen_bucket_name()
log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
- bucket = zone_conn.conn.create_bucket(bucket_name)
+ zone_conn.s3_client.create_bucket(Bucket=bucket_name)
buckets.append(bucket_name)
# wait for zone 1 to sync
set_master_zone(z1)
mdlog_periods += [realm.current_period.id]
- for zone_conn, bucket_name in zone_bucket:
+ for zone_conn, _ in zone_bucket:
if zone_conn.zone == z3:
continue
bucket_name = gen_bucket_name()
log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
- zone_conn.conn.create_bucket(bucket_name)
+ zone_conn.s3_client.create_bucket(Bucket=bucket_name)
buckets.append(bucket_name)
# restart zone 3 gateway and wait for sync
for target_conn in zonegroup_conns.zones:
if source_conn.zone == target_conn.zone:
continue
-
if target_conn.zone.has_buckets():
target_conn.check_bucket_eq(source_conn, bucket_name)
# upload an object to each zone to generate a datalog entry
for zone, bucket in zone_bucket:
- k = new_key(zone, bucket.name, 'key')
- k.set_contents_from_string('body')
+ zone.s3_client.put_object(Bucket=bucket.name, Key='key', Body='body')
# wait for metadata and data sync to catch up
zonegroup_meta_checkpoint(zonegroup)
def test_multi_zone_redirect():
zonegroup = realm.master_zonegroup()
if len(zonegroup.rw_zones) < 2:
- raise SkipTest("test_multi_period_incremental_sync skipped. Requires 3 or more zones in master zonegroup.")
+ raise SkipTest("test_multi_zone_redirect skipped. Requires 2 or more zones in master zonegroup.")
zonegroup_conns = ZonegroupConns(zonegroup)
(zc1, zc2) = zonegroup_conns.rw_zones[0:2]
-
z1, z2 = (zc1.zone, zc2.zone)
set_sync_from_all(z2, False)
- # create a bucket on the first zone
bucket_name = gen_bucket_name()
log.info('create bucket zone=%s name=%s', z1.name, bucket_name)
- bucket = zc1.conn.create_bucket(bucket_name)
+ zc1.s3_client.create_bucket(Bucket=bucket_name)
+
obj = 'testredirect'
-
- key = bucket.new_key(obj)
- data = 'A'*512
- key.set_contents_from_string(data)
+ data = 'A' * 512
+ zc1.s3_client.put_object(Bucket=bucket_name, Key=obj, Body=data)
zonegroup_meta_checkpoint(zonegroup)
# try to read object from second zone (should fail)
- bucket2 = get_bucket(zc2, bucket_name)
- assert_raises(boto.exception.S3ResponseError, bucket2.get_key, obj)
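+ # the bucket metadata syncs (via the meta checkpoint above), but object data does not while sync_from is disabled, so the read fails with NoSuchKey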
+ e = assert_raises(ClientError, zc2.s3_client.get_object, Bucket=bucket_name, Key=obj)
+ assert e.response['Error']['Code'] == 'NoSuchKey'
set_redirect_zone(z2, z1)
- key2 = bucket2.get_key(obj)
-
- eq(data, key2.get_contents_as_string(encoding='ascii'))
-
- key = bucket.new_key(obj)
+ # Should work now with redirect
+ response = zc2.s3_client.get_object(Bucket=bucket_name, Key=obj)
+ eq(data, response['Body'].read().decode('ascii'))
+ # Test updates
for x in ['a', 'b', 'c', 'd']:
- data = x*512
- key.set_contents_from_string(data)
- eq(data, key2.get_contents_as_string(encoding='ascii'))
+ data = x * 512
+ zc1.s3_client.put_object(Bucket=bucket_name, Key=obj, Body=data)
+ response = zc2.s3_client.get_object(Bucket=bucket_name, Key=obj)
+ eq(data, response['Body'].read().decode('ascii'))
- # revert config changes
set_sync_from_all(z2, True)
set_redirect_zone(z2, None)
def test_set_bucket_website():
buckets, zone_bucket = create_bucket_per_zone_in_realm()
- for _, bucket in zone_bucket:
- website_cfg = WebsiteConfiguration(suffix='index.html',error_key='error.html')
+ for zone, bucket in zone_bucket:
+ website_cfg = {
+ 'IndexDocument': {'Suffix': 'index.html'},
+ 'ErrorDocument': {'Key': 'error.html'}
+ }
try:
- bucket.set_website_configuration(website_cfg)
- except boto.exception.S3ResponseError as e:
- if e.error_code == 'MethodNotAllowed':
+ zone.s3_client.put_bucket_website(
+ Bucket=bucket.name,
+ WebsiteConfiguration=website_cfg
+ )
+ except ClientError as e:
+ if e.response['Error']['Code'] == 'MethodNotAllowed':
raise SkipTest("test_set_bucket_website skipped. Requires rgw_enable_static_website = 1.")
- assert(bucket.get_website_configuration_with_xml()[1] == website_cfg.to_xml())
+
+ response = zone.s3_client.get_bucket_website(Bucket=bucket.name)
+ assert response['IndexDocument']['Suffix'] == 'index.html'
+ assert response['ErrorDocument']['Key'] == 'error.html'
def test_set_bucket_policy():
policy = '''{
}]
}'''
buckets, zone_bucket = create_bucket_per_zone_in_realm()
- for _, bucket in zone_bucket:
- bucket.set_policy(policy)
- assert(bucket.get_policy().decode('ascii') == policy)
+ for zone, bucket in zone_bucket:
+ zone.s3_client.put_bucket_policy(Bucket=bucket.name, Policy=policy)
+ response = zone.s3_client.get_bucket_policy(Bucket=bucket.name)
+ assert response['Policy'] == policy
@attr('bucket_sync_disable')
def test_bucket_sync_disable():
for zone, bucket in zone_bucket:
for objname in objnames:
- k = new_key(zone, bucket.name, objname)
- k.set_contents_from_string(content)
+ zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body=content)
zonegroup_meta_checkpoint(zonegroup)
for zone, bucket in zone_bucket:
for objname in objnames_2:
- k = new_key(zone, bucket.name, objname)
- k.set_contents_from_string(content)
+ zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body=content)
for bucket_name in buckets:
zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
zonegroup_conns = ZonegroupConns(zonegroup)
buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
- objnames = [ 'obj1', 'obj2', 'obj3', 'obj4' ]
+ objnames = ['obj1', 'obj2', 'obj3', 'obj4']
content = 'asdasd'
for zone, bucket in zone_bucket:
for objname in objnames:
- k = new_key(zone, bucket.name, objname)
- k.set_contents_from_string(content)
+ zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body=content)
zonegroup_meta_checkpoint(zonegroup)
zonegroup_meta_checkpoint(zonegroup)
- objnames_2 = [ 'obj5', 'obj6', 'obj7', 'obj8' ]
+ objnames_2 = ['obj5', 'obj6', 'obj7', 'obj8']
for zone, bucket in zone_bucket:
for objname in objnames_2:
- k = new_key(zone, bucket.name, objname)
- k.set_contents_from_string(content)
+ zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body=content)
for bucket_name in buckets:
enable_bucket_sync(realm.meta_master_zone(), bucket_name)
zonegroup_conns = ZonegroupConns(zonegroup)
buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
- _, bucket = zone_bucket[0]
+ zone, bucket = zone_bucket[0]
# initiate a multipart upload
- upload = bucket.initiate_multipart_upload('MULTIPART')
- mp = boto.s3.multipart.MultiPartUpload(bucket)
- mp.key_name = upload.key_name
- mp.id = upload.id
- part_size = 5 * 1024 * 1024 # 5M min part size
- mp.upload_part_from_file(StringIO('a' * part_size), 1)
- mp.upload_part_from_file(StringIO('b' * part_size), 2)
- mp.upload_part_from_file(StringIO('c' * part_size), 3)
- mp.upload_part_from_file(StringIO('d' * part_size), 4)
- mp.complete_upload()
+ key_name = 'MULTIPART'
+ response = zone.s3_client.create_multipart_upload(Bucket=bucket.name, Key=key_name)
+ upload_id = response['UploadId']
+
+ part_size = 5 * 1024 * 1024 # 5M min part size
+ parts = []
+
+ for part_num, char in enumerate(['a', 'b', 'c', 'd'], start=1):
+ part_response = zone.s3_client.upload_part(
+ Bucket=bucket.name,
+ Key=key_name,
+ PartNumber=part_num,
+ UploadId=upload_id,
+ Body=char * part_size
+ )
+ parts.append({'PartNumber': part_num, 'ETag': part_response['ETag']})
+
+ zone.s3_client.complete_multipart_upload(
+ Bucket=bucket.name,
+ Key=key_name,
+ UploadId=upload_id,
+ MultipartUpload={'Parts': parts}
+ )
zonegroup_meta_checkpoint(zonegroup)
zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
zonegroup_conns = ZonegroupConns(zonegroup)
if len(zonegroup.rw_zones) < 2:
- raise SkipTest("test_zonegroup_remove skipped. Requires 2 or more zones in master zonegroup.")
+ raise SkipTest("test_encrypted_object_sync skipped. Requires 2 or more zones in master zonegroup.")
(zone1, zone2) = zonegroup_conns.rw_zones[0:2]
- # create a bucket on the first zone
bucket_name = gen_bucket_name()
log.info('create bucket zone=%s name=%s', zone1.name, bucket_name)
- bucket = zone1.conn.create_bucket(bucket_name)
+ zone1.s3_client.create_bucket(Bucket=bucket_name)
+ data = 'A' * 512
+
# upload an object with sse-c encryption
- sse_c_headers = {
- 'x-amz-server-side-encryption-customer-algorithm': 'AES256',
- 'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
- 'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw=='
- }
- key = bucket.new_key('testobj-sse-c')
- data = 'A'*512
- key.set_contents_from_string(data, headers=sse_c_headers)
+ zone1.s3_client.put_object(
+ Bucket=bucket_name,
+ Key='testobj-sse-c',
+ Body=data,
+ SSECustomerAlgorithm='AES256',
+ SSECustomerKey='pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
+ SSECustomerKeyMD5='DWygnHRtgiJ77HCm+1rvHw=='
+ )
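+ # note: the same SSE-C key and MD5 must be supplied again on every read of this object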
# upload an object with sse-kms encryption
- sse_kms_headers = {
- 'x-amz-server-side-encryption': 'aws:kms',
- # testkey-1 must be present in 'rgw crypt s3 kms encryption keys' (vstart.sh adds this)
- 'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1',
- }
- key = bucket.new_key('testobj-sse-kms')
- key.set_contents_from_string(data, headers=sse_kms_headers)
+ zone1.s3_client.put_object(
+ Bucket=bucket_name,
+ Key='testobj-sse-kms',
+ Body=data,
+ ServerSideEncryption='aws:kms',
+ SSEKMSKeyId='testkey-1'
+ )
- # wait for the bucket metadata and data to sync
zonegroup_meta_checkpoint(zonegroup)
zone_bucket_checkpoint(zone2.zone, zone1.zone, bucket_name)
# read the encrypted objects from the second zone
- bucket2 = get_bucket(zone2, bucket_name)
- key = bucket2.get_key('testobj-sse-c', headers=sse_c_headers)
- eq(data, key.get_contents_as_string(headers=sse_c_headers, encoding='ascii'))
+ response = zone2.s3_client.get_object(
+ Bucket=bucket_name,
+ Key='testobj-sse-c',
+ SSECustomerAlgorithm='AES256',
+ SSECustomerKey='pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
+ SSECustomerKeyMD5='DWygnHRtgiJ77HCm+1rvHw=='
+ )
+ eq(data, response['Body'].read().decode('ascii'))
- key = bucket2.get_key('testobj-sse-kms')
- eq(data, key.get_contents_as_string(encoding='ascii'))
+ response = zone2.s3_client.get_object(Bucket=bucket_name, Key='testobj-sse-kms')
+ eq(data, response['Body'].read().decode('ascii'))
@attr('bucket_trim')
def test_bucket_index_log_trim():
def make_test_bucket():
name = gen_bucket_name()
log.info('create bucket zone=%s name=%s', zone.name, name)
- bucket = zone.conn.create_bucket(name)
+ bucket = zone.create_bucket(name)
for objname in ('a', 'b', 'c', 'd'):
- k = new_key(zone, name, objname)
- k.set_contents_from_string('foo')
+ zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo')
zonegroup_meta_checkpoint(zonegroup)
zonegroup_bucket_checkpoint(zonegroup_conns, name)
return bucket
def make_test_bucket():
name = gen_bucket_name()
log.info('create bucket zone=%s name=%s', zone.name, name)
- bucket = zone.conn.create_bucket(name)
+ bucket = zone.create_bucket(name)
for objname in ('a', 'b', 'c', 'd'):
- k = new_key(zone, name, objname)
- k.set_contents_from_string('foo')
+ zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo')
zonegroup_meta_checkpoint(zonegroup)
zonegroup_bucket_checkpoint(zonegroup_conns, name)
return bucket
# upload more objects
for objname in ('e', 'f', 'g', 'h'):
- k = new_key(zone, test_bucket.name, objname)
- k.set_contents_from_string('foo')
+ zone.s3_client.put_object(Bucket=test_bucket.name, Key=objname, Body='foo')
+
zonegroup_bucket_checkpoint(zonegroup_conns, test_bucket.name)
# Resharding the bucket again
# upload more objects
for objname in ('i', 'j', 'k', 'l'):
- k = new_key(zone, test_bucket.name, objname)
- k.set_contents_from_string('foo')
+ zone.s3_client.put_object(Bucket=test_bucket.name, Key=objname, Body='foo')
+
zonegroup_bucket_checkpoint(zonegroup_conns, test_bucket.name)
# verify the bucket has non-empty bilog
# create a test bucket, upload some objects, and wait for sync
def make_test_bucket():
name = gen_bucket_name()
- log.info('create bucket zone=%s name=%s', primary.name, name)
- bucket = primary.conn.create_bucket(name)
+ log.info('create bucket zone=%s name=%s', primary.zone.name, name)
+ bucket = primary.create_bucket(name)
for objname in ('a', 'b', 'c', 'd'):
- k = new_key(primary, name, objname)
- k.set_contents_from_string('foo')
+ primary.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo')
zonegroup_meta_checkpoint(zonegroup)
zonegroup_bucket_checkpoint(zonegroup_conns, name)
return bucket
primary.zone.cluster.admin(cmd)
# delete bucket and test bilog autotrim
- primary.conn.delete_bucket(test_bucket.name)
+ primary.s3_client.delete_bucket(Bucket=test_bucket.name)
zonegroup_data_checkpoint(zonegroup_conns)
bilog_autotrim(primary.zone, ['--rgw-sync-log-trim-max-buckets', '50'],)
# create a test bucket, upload some objects, and wait for sync
def make_test_bucket():
name = gen_bucket_name()
- log.info('create bucket zone=%s name=%s', primary.name, name)
- bucket = primary.conn.create_bucket(name)
+ log.info('create bucket zone=%s name=%s', primary.zone.name, name)
+ bucket = primary.create_bucket(name)
for objname in ('a', 'b', 'c', 'd'):
- k = new_key(primary, name, objname)
- k.set_contents_from_string('foo')
+ primary.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo')
zonegroup_meta_checkpoint(zonegroup)
zonegroup_bucket_checkpoint(zonegroup_conns, name)
return bucket
primary.zone.cluster.admin(cmd)
# delete bucket and test bilog autotrim
- primary.conn.delete_bucket(test_bucket.name)
+ primary.s3_client.delete_bucket(Bucket=test_bucket.name)
zonegroup_data_checkpoint(zonegroup_conns)
bilog_autotrim(secondary.zone, ['--rgw-sync-log-trim-max-buckets', '50'],)
zonegroup_conns = ZonegroupConns(zonegroup)
zone = zonegroup_conns.rw_zones[0]
- # create a bucket
bucket = zone.create_bucket(gen_bucket_name())
log.debug('created bucket=%s', bucket.name)
zonegroup_meta_checkpoint(zonegroup)
# upload some objects
for objname in ('a', 'b', 'c', 'd'):
- k = new_key(zone, bucket.name, objname)
- k.set_contents_from_string('foo')
+ zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo')
zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
# reshard in each zone
# upload more objects
for objname in ('e', 'f', 'g', 'h'):
- k = new_key(zone, bucket.name, objname)
- k.set_contents_from_string('foo')
+ zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo')
zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
@attr('bucket_reshard')
try:
# upload some objects
for objname in ('a', 'b', 'c', 'd'):
- k = new_key(zone, bucket.name, objname)
- k.set_contents_from_string('foo')
+ zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo')
# reshard on first zone
zone.zone.cluster.admin(['bucket', 'reshard',
# upload more objects
for objname in ('e', 'f', 'g', 'h'):
- k = new_key(zone, bucket.name, objname)
- k.set_contents_from_string('foo')
+ zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo')
finally:
for z in zonegroup_conns.rw_zones[1:]:
z.zone.start()
def test_bucket_creation_time():
zonegroup = realm.master_zonegroup()
zonegroup_conns = ZonegroupConns(zonegroup)
-
zonegroup_meta_checkpoint(zonegroup)
- zone_buckets = [zone.get_connection().get_all_buckets() for zone in zonegroup_conns.rw_zones]
+ zone_buckets = []
+ for zone in zonegroup_conns.rw_zones:
+ response = zone.s3_client.list_buckets()
+ zone_buckets.append(response['Buckets'])
+
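+ # CreationDate is returned as a datetime, so this compares actual creation timestamps across zones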
for z1, z2 in combinations(zone_buckets, 2):
for a, b in zip(z1, z2):
- eq(a.name, b.name)
- eq(a.creation_date, b.creation_date)
+ eq(a['Name'], b['Name'])
+ eq(a['CreationDate'], b['CreationDate'])
def get_bucket_shard_objects(zone, num_shards):
"""
random.shuffle(objs)
del objs[-(len(objs)//10):]
for obj in objs:
- k = new_key(zone, bucket_name, obj)
- k.set_contents_from_string('foo')
+ zone.s3_client.put_object(Bucket=bucket_name, Key=obj, Body='foo')
def reshard_bucket(zone, bucket_name, num_shards):
"""
# Write zeroth generation
for obj in range(1, 6):
- k = new_key(primary, bucket.name, f'obj{obj * 11}')
- k.set_contents_from_string('foo')
+ primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * 11}', Body='foo')
zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
# Write several more generations
for num_shards in generations:
reshard_bucket(primary.zone, bucket.name, num_shards)
for obj in range(1, 6):
- k = new_key(primary, bucket.name, f'obj{obj * num_shards}')
- k.set_contents_from_string('foo')
+ primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * num_shards}', Body='foo')
zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
# test for object names with '%' character.
for obj in range(1, 1100):
- k = new_key(primary, bucket.name, f'obj%{obj * 11}')
- k.set_contents_from_string('foo')
+ primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj%{obj * 11}', Body='foo')
# wait for the secondary to catch up
zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
# write an object during incremental sync.
objname = 'test_incremental'
- k = new_key(primary, bucket, objname)
- k.set_contents_from_string('foo')
+ primary.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo')
zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
# the object uploaded after bucket sync init should be replicated
- check_object_exists(bucket, objname)
+ check_object_exists(secondary, bucket.name, objname)
def test_role_sync():
bucket = zone.conn.create_bucket(gen_bucket_name())
obj_name = "a"
- k = new_key(zone, bucket.name, obj_name)
- k.set_contents_from_string('foo')
+ zone.s3_client.put_object(Bucket=bucket.name, Key=obj_name, Body='foo')
zonegroup_meta_checkpoint(zonegroup)
zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
log.debug('created bucket=%s', bucket.name)
# upload a dummy object and wait for sync.
- k = new_key(primary, bucket, 'dummy')
- k.set_contents_from_string('foo')
+ objname = 'dummy'
+ primary.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo')
zonegroup_meta_checkpoint(zonegroup)
zonegroup_data_checkpoint(zonegroup_conns)
- #check object on secondary before setacl
- bucket2 = get_bucket(secondary, bucket.name)
- before_set_acl = bucket2.get_acl(k)
- assert(len(before_set_acl.acl.grants) == 1)
+ # check object on secondary before setacl
+ before_acl = secondary.s3_client.get_object_acl(Bucket=bucket.name, Key=objname)
+ assert len(before_acl['Grants']) == 1
- #set object acl on primary and wait for sync.
- bucket.set_canned_acl('public-read', key_name=k)
- log.debug('set acl=%s', bucket.name)
+ # set object acl on primary and wait for sync
+ primary.s3_client.put_object_acl(Bucket=bucket.name, Key=objname, ACL='public-read')
+ log.debug('set acl on bucket=%s', bucket.name)
zonegroup_data_checkpoint(zonegroup_conns)
zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
- #check object secondary after setacl
- bucket2 = get_bucket(secondary, bucket.name)
- after_set_acl = bucket2.get_acl(k)
- assert(len(after_set_acl.acl.grants) == 2) # read grant added on AllUsers
+ # check object on secondary after setacl
+ after_acl = secondary.s3_client.get_object_acl(Bucket=bucket.name, Key=objname)
+ assert len(after_acl['Grants']) == 2 # read grant added on AllUsers
def test_assume_role_after_sync():
zonegroup = realm.master_zonegroup()
# write some objects that don't sync yet
for obj in range(1, 6):
- k = new_key(primary, bucket.name, f'obj{obj * 11}')
- k.set_contents_from_string('foo')
+ primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * 11}', Body='foo')
cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args()
cmd += ['--source-zone', primary.name]
# Write zeroth generation
for obj in range(1, 6):
- k = new_key(primary, bucket.name, f'obj{obj * 11}')
- k.set_contents_from_string('foo')
+ primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * 11}', Body='foo')
# Write several more generations
generations = [17, 19, 23, 29, 31, 37]
for num_shards in generations:
reshard_bucket(primary.zone, bucket.name, num_shards)
for obj in range(1, 6):
- k = new_key(primary, bucket.name, f'obj{obj * num_shards}')
- k.set_contents_from_string('foo')
+ primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * num_shards}', Body='foo')
cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args()
cmd += ['--source-zone', primary.name]
# upload a dummy object and wait for sync. this forces each zone to finish
# a full sync and switch to incremental
- k = new_key(primary, bucket, 'dummy')
- k.set_contents_from_string('foo')
+ primary.s3_client.put_object(Bucket=bucket.name, Key='dummy', Body='foo')
zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
try:
# Write more objects to primary
for obj in range(1, 6):
- k = new_key(primary, bucket.name, f'obj{obj * 11}')
- k.set_contents_from_string('foo')
+ primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * 11}', Body='foo')
cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args()
cmd += ['--source-zone', primary.name]
# Write zeroth generation to primary
for obj in range(1, 6):
- k = new_key(primary, bucket.name, f'obj{obj * 11}')
- k.set_contents_from_string('foo')
+ primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * 11}', Body='foo')
# Write several more generations
generations = [17, 19, 23, 29, 31, 37]
for num_shards in generations:
reshard_bucket(primary.zone, bucket.name, num_shards)
for obj in range(1, 6):
- k = new_key(primary, bucket.name, f'obj{obj * num_shards}')
- k.set_contents_from_string('foo')
+ primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * num_shards}', Body='foo')
+
# wait for the secondary to catch up to the latest gen
zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
# write some more objects to the last gen
for obj in range(1, 6):
- k = new_key(primary, bucket.name, f'obj{obj * generations[-1]}')
- k.set_contents_from_string('foo')
+ primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * generations[-1]}', Body='foo')
cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args()
cmd += ['--source-zone', primary.name]
# Write zeroth generation to primary
for obj in range(1, 6):
- k = new_key(primary, bucket.name, f'obj{obj * 11}')
- k.set_contents_from_string('foo')
+ primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * 11}', Body='foo')
# wait for the secondary to catch up
zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
for num_shards in generations:
reshard_bucket(primary.zone, bucket.name, num_shards)
for obj in range(1, 6):
- k = new_key(primary, bucket.name, f'obj{obj * num_shards}')
- k.set_contents_from_string('foo')
+ primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * num_shards}', Body='foo')
+
cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args()
cmd += ['--source-zone', primary.name]
return bucket
def create_object(zone_conn, bucket, objname, content):
- k = new_key(zone_conn, bucket.name, objname)
- k.set_contents_from_string(content)
+ bucket_name = bucket.name if hasattr(bucket, 'name') else bucket
+ zone_conn.s3_client.put_object(Bucket=bucket_name, Key=objname, Body=content)
def create_objects(zone_conn, bucket, obj_arr, content):
+ bucket_name = bucket.name if hasattr(bucket, 'name') else bucket
for objname in obj_arr:
- create_object(zone_conn, bucket, objname, content)
+ zone_conn.s3_client.put_object(Bucket=bucket_name, Key=objname, Body=content)
-def check_object_exists(bucket, objname, content = None):
- k = bucket.get_key(objname)
- assert_not_equal(k, None)
- if (content != None):
- assert_equal(k.get_contents_as_string(encoding='ascii'), content)
+def check_object_exists(zone_or_client, bucket_name, objname, content=None):
+ # handle both zone connection and direct client request
+ client = zone_or_client.s3_client if hasattr(zone_or_client, 's3_client') else zone_or_client
-def check_objects_exist(bucket, obj_arr, content = None):
+ try:
+ response = client.get_object(Bucket=bucket_name, Key=objname)
+ if content is not None:
+ actual_content = response['Body'].read()
+ if isinstance(content, str):
+ actual_content = actual_content.decode('utf-8')
+ assert_equal(actual_content, content)
+ except ClientError as e:
+ if e.response['Error']['Code'] == 'NoSuchKey':
+ assert False, f"Object {objname} does not exist in bucket {bucket_name}"
+ raise
+
+def check_objects_exist(zone_or_client, bucket_name, obj_arr, content=None):
+ if isinstance(obj_arr, str):
+ obj_arr = [obj_arr]
for objname in obj_arr:
- check_object_exists(bucket, objname, content)
+ check_object_exists(zone_or_client, bucket_name, objname, content)
-def check_object_not_exists(bucket, objname):
- k = bucket.get_key(objname)
- assert_equal(k, None)
-
-def check_objects_not_exist(bucket, obj_arr):
+def check_object_not_exists(zone_or_client, bucket_name, objname):
+ client = zone_or_client.s3_client if hasattr(zone_or_client, 's3_client') else zone_or_client
+ try:
+ client.head_object(Bucket=bucket_name, Key=objname)
+ assert False, f"Object {objname} should not exist in bucket {bucket_name}"
+ except ClientError as e:
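+ # head_object errors have no response body, so botocore reports the bare HTTP status code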
+ assert e.response['Error']['Code'] == '404'
+
+def check_objects_not_exist(zone_or_client, bucket_name, obj_arr):
+ if isinstance(obj_arr, str):
+ obj_arr = [obj_arr]
for objname in obj_arr:
- check_object_not_exists(bucket, objname)
+ check_object_not_exists(zone_or_client, bucket_name, objname)
@attr('fails_with_rgw')
@attr('sync_policy')
zonegroup.period.update(zoneA, commit=True)
get_sync_policy(c1)
- objnames = [ 'obj1', 'obj2' ]
+ objnames = ['obj1', 'obj2']
content = 'asdasd'
- buckets = []
# create bucket & object in all zones
bucketA = create_zone_bucket(zcA)
- buckets.append(bucketA)
- create_object(zcA, bucketA, objnames[0], content)
+ zcA.s3_client.put_object(Bucket=bucketA.name, Key=objnames[0], Body=content)
bucketB = create_zone_bucket(zcB)
- buckets.append(bucketB)
- create_object(zcB, bucketB, objnames[1], content)
+ zcB.s3_client.put_object(Bucket=bucketB.name, Key=objnames[1], Body=content)
zonegroup_meta_checkpoint(zonegroup)
- # 'zonegroup_data_checkpoint' currently fails for the zones not
- # allowed to sync. So as a workaround, data checkpoint is done
- # for only the ones configured.
zone_data_checkpoint(zoneB, zoneA)
- # verify if objects are synced accross the zone
- bucket = get_bucket(zcB, bucketA.name)
- check_object_exists(bucket, objnames[0], content)
-
- bucket = get_bucket(zcA, bucketB.name)
- check_object_exists(bucket, objnames[1], content)
+ # verify objects synced
+ check_object_exists(zcB, bucketA.name, objnames[0], content)
+ check_object_exists(zcA, bucketB.name, objnames[1], content)
remove_sync_policy_group(c1, "sync-group")
return
content = 'asdasd'
# create bucketA & objects in zoneA
- objnamesA = [ 'obj1', 'obj2', 'obj3' ]
+ objnamesA = ['obj1', 'obj2', 'obj3']
bucketA = create_zone_bucket(zcA)
buckets.append(bucketA)
create_objects(zcA, bucketA, objnamesA, content)
# create bucketB & objects in zoneB
- objnamesB = [ 'obj4', 'obj5', 'obj6' ]
+ objnamesB = ['obj4', 'obj5', 'obj6']
bucketB = create_zone_bucket(zcB)
buckets.append(bucketB)
create_objects(zcB, bucketB, objnamesB, content)
zone_data_checkpoint(zoneA, zoneB)
# verify if objnamesA synced to only zoneB but not zoneC
- bucket = get_bucket(zcB, bucketA.name)
- check_objects_exist(bucket, objnamesA, content)
-
- bucket = get_bucket(zcC, bucketA.name)
- check_objects_not_exist(bucket, objnamesA)
+ check_objects_exist(zcB, bucketA.name, objnamesA, content)
+ check_objects_not_exist(zcC, bucketA.name, objnamesA)
# verify if objnamesB synced to only zoneA but not zoneC
- bucket = get_bucket(zcA, bucketB.name)
- check_objects_exist(bucket, objnamesB, content)
-
- bucket = get_bucket(zcC, bucketB.name)
- check_objects_not_exist(bucket, objnamesB)
+ check_objects_exist(zcA, bucketB.name, objnamesB, content)
+ check_objects_not_exist(zcC, bucketB.name, objnamesB)
remove_sync_policy_group(c1, "sync-group")
return
zonegroup_conns = ZonegroupConns(zonegroup)
if len(zonegroup.zones) < 3:
- raise SkipTest("test_sync_flow_symmetrical_zonegroup_select skipped. Requires 3 or more zones in master zonegroup.")
+ raise SkipTest("test_sync_flow_directional_zonegroup_select skipped. Requires 3 or more zones in master zonegroup.")
zonegroup_meta_checkpoint(zonegroup)
zone_data_checkpoint(zoneB, zoneA)
# verify if objnamesA synced to only zoneB but not zoneC
- bucket = get_bucket(zcB, bucketA.name)
- check_objects_exist(bucket, objnamesA, content)
-
- bucket = get_bucket(zcC, bucketA.name)
- check_objects_not_exist(bucket, objnamesA)
+ check_objects_exist(zcB, bucketA.name, objnamesA, content)
+ check_objects_not_exist(zcC, bucketA.name, objnamesA)
# verify if objnamesB are not synced to either zoneA or zoneC
- bucket = get_bucket(zcA, bucketB.name)
- check_objects_not_exist(bucket, objnamesB)
-
- bucket = get_bucket(zcC, bucketB.name)
- check_objects_not_exist(bucket, objnamesB)
+ check_objects_not_exist(zcA, bucketB.name, objnamesB)
+ check_objects_not_exist(zcC, bucketB.name, objnamesB)
"""
verify the same at bucketA level
zone_data_checkpoint(zoneB, zoneA)
# verify that objnamesC are synced to bucketA in zoneB
- bucket = get_bucket(zcB, bucketA.name)
- check_objects_exist(bucket, objnamesC, content)
+ check_objects_exist(zcB, bucketA.name, objnamesC, content)
# verify that objnamesD are not synced to bucketA in zoneA
- bucket = get_bucket(zcA, bucketA.name)
- check_objects_not_exist(bucket, objnamesD)
+ check_objects_not_exist(zcA, bucketA.name, objnamesD)
remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
remove_sync_policy_group(c1, "sync-group")
return
+
@attr('fails_with_rgw')
@attr('sync_policy')
def test_sync_single_bucket():
zone_data_checkpoint(zoneB, zoneA)
# verify if bucketA objects are synced
- bucket = get_bucket(zcB, bucketA.name)
- check_objects_exist(bucket, objnames, content)
+ check_objects_exist(zcB, bucketA.name, objnames, content)
# bucketB objects should not be synced
- bucket = get_bucket(zcB, bucketB.name)
- check_objects_not_exist(bucket, objnames)
+ check_objects_not_exist(zcB, bucketB.name, objnames)
"""
zone_data_checkpoint(zoneB, zoneA)
# verify if bucketA objects are synced
- bucket = get_bucket(zcB, bucketA.name)
- check_object_exists(bucket, objnames[2], content)
+ check_object_exists(zcB, bucketA.name, objnames[2], content)
# bucketB objects should not be synced
- bucket = get_bucket(zcB, bucketB.name)
- check_object_not_exists(bucket, objnames[2])
+ check_object_not_exists(zcB, bucketB.name, objnames[2])
remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
remove_sync_policy_group(c1, "sync-group")
# verify that objects are synced to bucketB in zoneB
# but not to bucketA
- bucket = get_bucket(zcB, bucketA.name)
- check_objects_not_exist(bucket, objnames)
+ check_objects_not_exist(zcB, bucketA.name, objnames)
+ check_objects_exist(zcB, bucketB.name, objnames, content)
- bucket = get_bucket(zcB, bucketB.name)
- check_objects_exist(bucket, objnames, content)
"""
Method (b): configure policy at only bucketA level with pipe
set to bucketB in target zone
# verify that objects are synced to bucketB in zoneB
# but not to bucketA
"""
- bucket = get_bucket(zcB, bucketA.name)
- check_objects_not_exist(bucket, objnamesC)
-
- bucket = get_bucket(zcB, bucketB.name)
- check_objects_exist(bucket, objnamesC, content)
+ check_objects_not_exist(zcB, bucketA.name, objnamesC)
+ check_objects_exist(zcB, bucketB.name, objnamesC, content)
remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
zonegroup_meta_checkpoint(zonegroup)
# verify that objects from only bucketB are synced to
# bucketA in zoneA
"""
- bucket = get_bucket(zcA, bucketA.name)
- check_objects_not_exist(bucket, objnamesD)
- check_objects_exist(bucket, objnamesE, content)
+ check_objects_not_exist(zcA, bucketA.name, objnamesD)
+ check_objects_exist(zcA, bucketA.name, objnamesE, content)
remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
remove_sync_policy_group(c1, "sync-group")
# verify that both zoneA bucketA & bucketB objects are synced to
# bucketB in zoneB but not to bucketA
- bucket = get_bucket(zcB, bucketA.name)
- check_objects_not_exist(bucket, objnamesA)
- check_objects_not_exist(bucket, objnamesB)
+ check_objects_not_exist(zcB, bucketA.name, objnamesA)
+ check_objects_not_exist(zcB, bucketA.name, objnamesB)
- bucket = get_bucket(zcB, bucketB.name)
- check_objects_exist(bucket, objnamesA, content)
- check_objects_exist(bucket, objnamesB, content)
+ check_objects_exist(zcB, bucketB.name, objnamesA, content)
+ check_objects_exist(zcB, bucketB.name, objnamesB, content)
"""
Method (b): configure at bucket level
# verify that both zoneA bucketA & bucketB objects are synced to
# bucketA in zoneB but not to bucketB
- bucket = get_bucket(zcB, bucketB.name)
- check_objects_not_exist(bucket, objnamesC)
- check_objects_not_exist(bucket, objnamesD)
+ check_objects_not_exist(zcB, bucketB.name, objnamesC)
+ check_objects_not_exist(zcB, bucketB.name, objnamesD)
- bucket = get_bucket(zcB, bucketA.name)
- check_objects_exist(bucket, objnamesD, content)
- check_objects_exist(bucket, objnamesD, content)
+ check_objects_exist(zcB, bucketA.name, objnamesC, content)
+ check_objects_exist(zcB, bucketA.name, objnamesD, content)
remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
remove_sync_policy_group(c1, "sync-group")
# verify that objects from zoneA bucketA are synced to both
# bucketA & bucketB in zoneB
- bucket = get_bucket(zcB, bucketA.name)
- check_objects_exist(bucket, objnamesA, content)
-
- bucket = get_bucket(zcB, bucketB.name)
- check_objects_exist(bucket, objnamesA, content)
+ check_objects_exist(zcB, bucketA.name, objnamesA, content)
+ check_objects_exist(zcB, bucketB.name, objnamesA, content)
"""
Method (b): configure at bucket level
# verify that objects from zoneA bucketA are synced to both
# bucketA & bucketB in zoneB
- bucket = get_bucket(zcB, bucketA.name)
- check_objects_exist(bucket, objnamesB, content)
-
- bucket = get_bucket(zcB, bucketB.name)
- check_objects_exist(bucket, objnamesB, content)
+ check_objects_exist(zcB, bucketA.name, objnamesB, content)
+ check_objects_exist(zcB, bucketB.name, objnamesB, content)
remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
remove_sync_policy_group(c1, "sync-group")
assert check_all_buckets_exist(zone, buckets)
- for zone, bucket_name in zone_bucket:
- zone.conn.delete_bucket(bucket_name)
+ for zone, bucket in zone_bucket:
+ zone.s3_client.delete_bucket(Bucket=bucket.name)
zonegroup_meta_checkpoint(zonegroup)
objname = 'dummy'
# upload a dummy object and wait for sync.
- k = new_key(primary, bucket, objname)
- k.set_contents_from_string('foo')
+ primary.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo')
zonegroup_meta_checkpoint(zonegroup)
zonegroup_data_checkpoint(zonegroup_conns)
zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
# copy object on primary zone
- primary.s3_client.copy_object(Bucket=bucket.name,
- CopySource=bucket.name + '/'+ objname,
- Key= objname + '-copy1')
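+ # the dict form of CopySource avoids manually building and quoting the 'bucket/key' string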
+ primary.s3_client.copy_object(
+ Bucket=bucket.name,
+ CopySource={'Bucket': bucket.name, 'Key': objname},
+ Key=objname + '-copy1'
+ )
zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
# copy object on secondary zone
- secondary.s3_client.copy_object(Bucket=bucket.name,
- Key= objname + '-copy2',
- CopySource=bucket.name + '/'+ objname)
+ secondary.s3_client.copy_object(
+ Bucket=bucket.name,
+ Key=objname + '-copy2',
+ CopySource={'Bucket': bucket.name, 'Key': objname}
+ )
zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
objname = 'dummy'
# upload a dummy object and wait for sync.
- k = new_key(primary, source_bucket, objname)
- k.set_contents_from_string('foo')
+ primary.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
zonegroup_meta_checkpoint(zonegroup)
zonegroup_bucket_checkpoint(zonegroup_conns, source_bucket.name)
zonegroup_meta_checkpoint(zonegroup)
# copy object on primary zone
- primary.s3_client.copy_object(Bucket = dest_bucket.name,
- Key = objname + '-copy',
- CopySource = source_bucket.name + '/' + objname)
+ primary.s3_client.copy_object(
+ Bucket=dest_bucket.name,
+ Key=objname + '-copy',
+ CopySource={'Bucket': source_bucket.name, 'Key': objname}
+ )
zonegroup_bucket_checkpoint(zonegroup_conns, dest_bucket.name)
# upload an object and wait for sync.
objname = 'dummy'
- k = new_key(source, source_bucket, objname)
- k.set_contents_from_string('foo')
+ source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
time.sleep(config.checkpoint_delay)
zone_data_checkpoint(dest.zone, source.zone)
# check that object exists in destination bucket
- k = get_key(dest, dest_bucket, objname)
- assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
+ response = dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+ assert_equal(response['Body'].read().decode('utf-8'), 'foo')
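response['Body'] is a botocore StreamingBody, so read() returns bytes and must be decoded before comparing. The same few lines recur in the hunks below and could be folded into a small helper; a hypothetical sketch, not part of this change:
def get_object_body(zone_conn, bucket_name, key):
    # fetch an object and return its body as text
    response = zone_conn.s3_client.get_object(Bucket=bucket_name, Key=key)
    return response['Body'].read().decode('utf-8')

# usage: assert_equal(get_object_body(dest, dest_bucket.name, objname), 'foo')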
# delete object on source
source.s3_client.delete_object(Bucket=source_bucket.name, Key=objname)
# upload an object and wait for sync.
objname = 'dummy'
- k = new_key(source, source_bucket, objname)
- k.set_contents_from_string('foo')
+ source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
zone_data_checkpoint(dest.zone, source.zone)
# check that object exists in destination bucket
- k = get_key(dest, dest_bucket, objname)
- assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
+ response = dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+ assert_equal(response['Body'].read().decode('utf-8'), 'foo')
# delete object on source
source.s3_client.delete_object(Bucket=source_bucket.name, Key=objname)
# upload an object and wait for sync.
objname = 'dummy'
- k = new_key(source, source_bucket, objname)
- k.set_contents_from_string('foo')
+ source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
zone_data_checkpoint(dest.zone, source.zone)
# check that object does not exist in destination bucket
# upload an object and wait for sync.
objname = 'dummy'
- k = new_key(source, source_bucket, objname)
- k.set_contents_from_string('foo')
+ source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
zone_data_checkpoint(dest.zone, source.zone)
# check that object not exists in destination bucket
# upload an object and wait for sync.
objname = 'dummy'
- k = new_key(source, source_bucket, objname)
- k.set_contents_from_string('foo')
+ source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
zone_data_checkpoint(dest.zone, source.zone)
# check that object not exists in destination bucket
# upload an object and wait for sync.
objname = 'dummy'
- k = new_key(source, source_bucket_name, objname)
- k.set_contents_from_string('foo')
+ source.s3_client.put_object(Bucket=source_bucket_name, Key=objname, Body='foo')
zone_data_checkpoint(dest.zone, source.zone)
# check that object does not exist in destination bucket
# upload an object and wait for sync.
objname = 'dummy'
- k = new_key(source, source_bucket.name, objname)
- k.set_contents_from_string('foo')
+ source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
zone_data_checkpoint(dest.zone, source.zone)
# check that object does not exist in destination bucket
@attr('sync_policy')
@attr('fails_with_rgw')
def test_bucket_delete_with_zonegroup_sync_policy_directional():
-
zonegroup = realm.master_zonegroup()
zonegroup_conns = ZonegroupConns(zonegroup)
-
zonegroup_meta_checkpoint(zonegroup)
(zoneA, zoneB) = zonegroup.zones[0:2]
c1 = zoneA.cluster
- # configure sync policy
zones = zoneA.name + ',' + zoneB.name
c1.admin(['sync', 'policy', 'get'])
create_sync_policy_group(c1, "sync-group")
zonegroup.period.update(zoneA, commit=True)
get_sync_policy(c1)
- # configure sync policy for only bucketA and enable it
bucketA = create_zone_bucket(zcA)
- buckets = []
- buckets.append(bucketA)
time.sleep(config.checkpoint_delay)
zonegroup_meta_checkpoint(zonegroup)
- # create bucketA and objects in zoneA and zoneB
objnameA = 'a'
objnameB = 'b'
- # upload object in each zone and wait for sync.
- k = new_key(zcA, bucketA, objnameA)
- k.set_contents_from_string('foo')
- k = new_key(zcB, bucketA, objnameB)
- k.set_contents_from_string('foo')
+ zcA.s3_client.put_object(Bucket=bucketA.name, Key=objnameA, Body='foo')
+ zcB.s3_client.put_object(Bucket=bucketA.name, Key=objnameB, Body='foo')
zonegroup_meta_checkpoint(zonegroup)
- zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
+ zone_bucket_checkpoint(zoneB, zoneA, bucketA.name)
zone_data_checkpoint(zoneB, zoneA)
- # verify that objnameA is synced to bucketA in zoneB
- bucket = get_bucket(zcB, bucketA.name)
- check_objects_exist(bucket, objnameA)
-
- # verify that objnameB is not synced to bucketA in zoneA
- bucket = get_bucket(zcA, bucketA.name)
- check_objects_not_exist(bucket, objnameB)
+ # verify objnameA synced to bucketA in zoneB, but objnameB not synced back to zoneA
+ check_object_exists(zcB, bucketA.name, objnameA, 'foo')
+ check_object_not_exists(zcA, bucketA.name, objnameB)
log.debug('deleting object on zone A')
- k = get_key(zcA, bucket, objnameA)
- k.delete()
- zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
-
+ zcA.s3_client.delete_object(Bucket=bucketA.name, Key=objnameA)
+ zone_bucket_checkpoint(zoneB, zoneA, bucketA.name)
- # delete bucket on zoneA. it should fail to delete
+ # delete bucket on zoneA; should fail with BucketNotEmpty since objnameB still exists on zoneB
log.debug('deleting bucket')
- assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name)
+ e = assert_raises(ClientError, zcA.s3_client.delete_bucket, Bucket=bucketA.name)
+ assert e.response['Error']['Code'] == 'BucketNotEmpty'
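assert_raises is called here with keyword arguments and its return value is inspected for the error code, so it is assumed to be a local test helper (rather than nose's) along these lines:
def assert_raises(excClass, callableObj, *args, **kwargs):
    # call the function, expect excClass, and hand the caught exception back to the caller
    try:
        callableObj(*args, **kwargs)
    except excClass as e:
        return e
    raise AssertionError(f'{excClass.__name__} was not raised')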
- assert check_all_buckets_exist(zcA, buckets)
- assert check_all_buckets_exist(zcB, buckets)
+ assert check_all_buckets_exist(zcA, [bucketA.name])
+ assert check_all_buckets_exist(zcB, [bucketA.name])
- # retry deleting bucket after removing the object from zone B. should succeed
+ # retry after deleting object from zone B
log.debug('deleting object on zone B')
- k = get_key(zcB, bucket, objnameB)
- k.delete()
+ zcB.s3_client.delete_object(Bucket=bucketA.name, Key=objnameB)
time.sleep(config.checkpoint_delay)
log.debug('retry deleting bucket')
- zcA.delete_bucket(bucketA.name)
+ zcA.s3_client.delete_bucket(Bucket=bucketA.name)
zonegroup_meta_checkpoint(zonegroup)
- assert check_all_buckets_dont_exist(zcA, buckets)
- assert check_all_buckets_dont_exist(zcB, buckets)
+ assert check_all_buckets_dont_exist(zcA, [bucketA.name])
+ assert check_all_buckets_dont_exist(zcB, [bucketA.name])
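check_all_buckets_exist / check_all_buckets_dont_exist are now passed bucket names rather than boto bucket objects; their boto3-based forms are assumed to look roughly like this (signatures inferred from the call sites, not shown in this change):
def check_all_buckets_exist(zone_conn, bucket_names):
    # every listed bucket must answer HEAD successfully
    for name in bucket_names:
        try:
            zone_conn.s3_client.head_bucket(Bucket=name)
        except ClientError:
            return False
    return True

def check_all_buckets_dont_exist(zone_conn, bucket_names):
    # every listed bucket must be gone, i.e. HEAD fails
    for name in bucket_names:
        try:
            zone_conn.s3_client.head_bucket(Bucket=name)
            return False
        except ClientError:
            pass
    return True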
remove_sync_policy_group(c1, "sync-group")
-
return
@attr('sync_policy')
@attr('fails_with_rgw')
def test_bucket_delete_with_bucket_sync_policy_directional():
-
zonegroup = realm.master_zonegroup()
zonegroup_conns = ZonegroupConns(zonegroup)
-
zonegroup_meta_checkpoint(zonegroup)
(zoneA, zoneB) = zonegroup.zones[0:2]
c1 = zoneA.cluster
- # configure sync policy
zones = zoneA.name + ',' + zoneB.name
c1.admin(['sync', 'policy', 'get'])
create_sync_policy_group(c1, "sync-group")
zonegroup.period.update(zoneA, commit=True)
get_sync_policy(c1)
- """
- configure policy at bucketA level with src and dest
- zones specified to zoneA and zoneB resp.
-
- verify zoneA bucketA syncs to zoneB BucketA but not viceversa.
- """
-
- # configure sync policy for only bucketA and enable it
bucketA = create_zone_bucket(zcA)
- buckets = []
- buckets.append(bucketA)
create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
create_sync_group_flow_directional(c1, "sync-bucket", "sync-flow-bucket", zoneA.name, zoneB.name, bucketA.name)
- #create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow-bucket", zones, bucketA.name)
create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zoneA.name, zoneB.name, bucketA.name)
set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)
zonegroup_meta_checkpoint(zonegroup)
- # create bucketA and objects in zoneA and zoneB
objnameA = 'a'
objnameB = 'b'
- # upload object in each zone and wait for sync.
- k = new_key(zcA, bucketA, objnameA)
- k.set_contents_from_string('foo')
- k = new_key(zcB, bucketA, objnameB)
- k.set_contents_from_string('foo')
+ zcA.s3_client.put_object(Bucket=bucketA.name, Key=objnameA, Body='foo')
+ zcB.s3_client.put_object(Bucket=bucketA.name, Key=objnameB, Body='foo')
zonegroup_meta_checkpoint(zonegroup)
zone_data_checkpoint(zoneB, zoneA)
- # verify that objnameA is synced to bucketA in zoneB
- bucket = get_bucket(zcB, bucketA.name)
- check_objects_exist(bucket, objnameA)
-
- # verify that objnameB is not synced to bucketA in zoneA
- bucket = get_bucket(zcA, bucketA.name)
- check_objects_not_exist(bucket, objnameB)
+ check_object_exists(zcB, bucketA.name, objnameA, 'foo')
+ check_object_not_exists(zcA, bucketA.name, objnameB)
log.debug('deleting object on zone A')
- k = get_key(zcA, bucket, objnameA)
- k.delete()
-
- zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
+ zcA.s3_client.delete_object(Bucket=bucketA.name, Key=objnameA)
+ zone_bucket_checkpoint(zoneB, zoneA, bucketA.name)
- # delete bucket on zoneA. it should fail to delete
log.debug('deleting bucket')
- assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name)
+ e = assert_raises(ClientError, zcA.s3_client.delete_bucket, Bucket=bucketA.name)
+ assert e.response['Error']['Code'] == 'BucketNotEmpty'
- assert check_all_buckets_exist(zcA, buckets)
- assert check_all_buckets_exist(zcB, buckets)
+ assert check_all_buckets_exist(zcA, [bucketA.name])
+ assert check_all_buckets_exist(zcB, [bucketA.name])
log.debug('deleting object on zone B')
- k = get_key(zcB, bucket, objnameB)
- k.delete()
+ zcB.s3_client.delete_object(Bucket=bucketA.name, Key=objnameB)
time.sleep(config.checkpoint_delay)
- # retry deleting bucket after removing the object from zone B. should succeed
log.debug('retry deleting bucket')
- zcA.delete_bucket(bucketA.name)
+ zcA.s3_client.delete_bucket(Bucket=bucketA.name)
zonegroup_meta_checkpoint(zonegroup)
- assert check_all_buckets_dont_exist(zcA, buckets)
- assert check_all_buckets_dont_exist(zcB, buckets)
+ assert check_all_buckets_dont_exist(zcA, [bucketA.name])
+ assert check_all_buckets_dont_exist(zcB, [bucketA.name])
remove_sync_policy_group(c1, "sync-group")
-
return
@attr('sync_policy')
objnameB = 'b'
# upload object in each zone and wait for sync.
- k = new_key(zcA, bucketA, objnameA)
- k.set_contents_from_string('foo')
- k = new_key(zcB, bucketA, objnameB)
- k.set_contents_from_string('foo')
+ zcA.s3_client.put_object(Bucket=bucketA.name, Key=objnameA, Body='foo')
+ zcB.s3_client.put_object(Bucket=bucketA.name, Key=objnameB, Body='foo')
zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
zone_data_checkpoint(zoneB, zoneA)
log.debug('deleting object A')
- k = get_key(zcA, bucketA, objnameA)
- k.delete()
+ zcA.s3_client.delete_object(Bucket=bucketA.name, Key=objnameA)
log.debug('deleting object B')
- k = get_key(zcA, bucketA, objnameB)
- k.delete()
+ zcA.s3_client.delete_object(Bucket=bucketA.name, Key=objnameB)
zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
zone_data_checkpoint(zoneB, zoneA)
# delete bucket on zoneA.
log.debug('deleting bucket')
- zcA.delete_bucket(bucketA.name)
+ zcA.s3_client.delete_bucket(Bucket=bucketA.name)
zonegroup_meta_checkpoint(zonegroup)
- assert check_all_buckets_dont_exist(zcA, buckets)
- assert check_all_buckets_dont_exist(zcB, buckets)
-
+ assert check_all_buckets_dont_exist(zcA, [bucketA.name])
+ assert check_all_buckets_dont_exist(zcB, [bucketA.name])
remove_sync_policy_group(c1, "sync-group")
return
objnameB = 'b'
# upload object in each zone and wait for sync.
- k = new_key(zcA, bucketA, objnameA)
- k.set_contents_from_string('foo')
- k = new_key(zcB, bucketA, objnameB)
- k.set_contents_from_string('foo')
+ zcA.s3_client.put_object(Bucket=bucketA.name, Key=objnameA, Body='foo')
+ zcB.s3_client.put_object(Bucket=bucketA.name, Key=objnameB, Body='foo')
zone_data_checkpoint(zoneB, zoneA)
zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
log.debug('deleting object A')
- k = get_key(zcA, bucketA, objnameA)
- k.delete()
+ zcA.s3_client.delete_object(Bucket=bucketA.name, Key=objnameA)
log.debug('deleting object B')
- k = get_key(zcA, bucketA, objnameB)
- k.delete()
+ zcA.s3_client.delete_object(Bucket=bucketA.name, Key=objnameB)
zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
zone_data_checkpoint(zoneB, zoneA)
# delete bucket on zoneA.
log.debug('deleting bucket')
- zcA.delete_bucket(bucketA.name)
+ zcA.s3_client.delete_bucket(Bucket=bucketA.name)
zonegroup_meta_checkpoint(zonegroup)
- assert check_all_buckets_dont_exist(zcA, buckets)
- assert check_all_buckets_dont_exist(zcB, buckets)
+ assert check_all_buckets_dont_exist(zcA, [bucketA.name])
+ assert check_all_buckets_dont_exist(zcB, [bucketA.name])
remove_sync_flow(c1, "sync-group", "sync-flow", "symmetrical")
remove_sync_policy_group(c1, "sync-group")
objnameB = 'b'
# upload object in zone A and zone C
- k = new_key(zcA, bucketA, objnameA)
- k.set_contents_from_string('foo')
- k = new_key(zcC, bucketA, objnameB)
- k.set_contents_from_string('foo')
+ zcA.s3_client.put_object(Bucket=bucketA.name, Key=objnameA, Body='foo')
+ zcC.s3_client.put_object(Bucket=bucketA.name, Key=objnameB, Body='foo')
zonegroup_meta_checkpoint(zonegroup)
zone_data_checkpoint(zoneB, zoneA)
# verify that objnameA is synced to zoneB but not zoneC
- bucket = get_bucket(zcB, bucketA.name)
- check_objects_exist(bucket, objnameA)
-
- bucket = get_bucket(zcC, bucketA.name)
- check_objects_not_exist(bucket, objnameA)
+ check_object_exists(zcB, bucketA.name, objnameA, 'foo')
+ check_object_not_exists(zcC, bucketA.name, objnameA)
# verify that objnameB is not synced to either zoneA or zoneB
- bucket = get_bucket(zcA, bucketA.name)
- check_objects_not_exist(bucket, objnameB)
-
- bucket = get_bucket(zcB, bucketA.name)
- check_objects_not_exist(bucket, objnameB)
+ check_object_not_exists(zcA, bucketA.name, objnameB)
+ check_object_not_exists(zcB, bucketA.name, objnameB)
log.debug('deleting object on zone A')
- k = get_key(zcA, bucket, objnameA)
- k.delete()
+ zcA.s3_client.delete_object(Bucket=bucketA.name, Key=objnameA)
zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
# delete bucket on zoneA. it should fail to delete because zoneC still has objnameB
log.debug('deleting bucket')
- assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name)
+ e = assert_raises(ClientError, zcA.s3_client.delete_bucket, Bucket=bucketA.name)
+ assert e.response['Error']['Code'] == 'BucketNotEmpty'
assert check_all_buckets_exist(zcA, buckets)
assert check_all_buckets_exist(zcC, buckets)
# retry deleting bucket after removing the object from zone C. should succeed
log.debug('deleting object on zone C')
- k = get_key(zcC, bucket, objnameB)
- k.delete()
+ zcC.s3_client.delete_object(Bucket=bucketA.name, Key=objnameB)
time.sleep(config.checkpoint_delay)
log.debug('retry deleting bucket')
- zcA.delete_bucket(bucketA.name)
+ zcA.s3_client.delete_bucket(Bucket=bucketA.name)
zonegroup_meta_checkpoint(zonegroup)
assert check_all_buckets_dont_exist(zcC, buckets)
remove_sync_policy_group(c1, "sync-group")
-
+
return
@attr('sync_policy')
objnameB = 'b'
# upload object in each zone and wait for sync.
- k = new_key(zcA, bucketA, objnameA)
- k.set_contents_from_string('foo')
- k = new_key(zcB, bucketA, objnameB)
- k.set_contents_from_string('foo')
+ zcA.s3_client.put_object(Bucket=bucketA.name, Key=objnameA, Body='foo')
+ zcB.s3_client.put_object(Bucket=bucketA.name, Key=objnameB, Body='foo')
zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
zone_data_checkpoint(zoneB, zoneA)
# verify that objnameA is synced to zoneB
- bucket = get_bucket(zcB, bucketA.name)
- check_object_exists(bucket, objnameA)
+ check_object_exists(zcB, bucketA.name, objnameA, 'foo')
# verify that objnameB is not synced to zoneA
- bucket = get_bucket(zcA, bucketA.name)
- check_object_not_exists(bucket, objnameB)
+ check_object_not_exists(zcA, bucketA.name, objnameB)
log.debug('deleting object A')
- k = get_key(zcA, bucketA, objnameA)
- k.delete()
+ zcA.s3_client.delete_object(Bucket=bucketA.name, Key=objnameA)
zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
zone_data_checkpoint(zoneB, zoneA)
# delete bucket on zoneA. it should fail to delete because zoneB still has objnameB
log.debug('deleting bucket')
- assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name)
+ e = assert_raises(ClientError, zcA.s3_client.delete_bucket, Bucket=bucketA.name)
+ assert e.response['Error']['Code'] == 'BucketNotEmpty'
assert check_all_buckets_exist(zcA, buckets)
assert check_all_buckets_exist(zcB, buckets)
dest_bucket = dest_zone.create_bucket(gen_bucket_name())
realm_meta_checkpoint(realm)
- # copy object
- dest_zone.s3_client.copy_object(
- Bucket=dest_bucket.name,
- CopySource=f'{source_bucket.name}/{objname}',
- Key=objname
- )
- # check that object exists in destination bucket
- k = get_key(dest_zone, dest_bucket, objname)
- assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
+ obj_sizes = [4096, 8 * 1024 * 1024]
+ for size in obj_sizes:
+ source_zone.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='x' * size)
+ # copy the object and verify the copy for each size
+ dest_zone.s3_client.copy_object(
+ Bucket=dest_bucket.name,
+ CopySource={'Bucket': source_bucket.name, 'Key': objname},
+ Key=objname
+ )
+
+ # verify the copied object's contents
+ response = dest_zone.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+ assert_equal(response['Body'].read().decode('utf-8'), 'x' * size)
@allow_bucket_replication
def test_bucket_replication_alt_user_delete_forbidden():
# upload an object and wait for sync.
objname = 'dummy'
- k = new_key(source, source_bucket, objname)
- k.set_contents_from_string('foo')
+ source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
zone_data_checkpoint(dest.zone, source.zone)
# check that object exists in destination bucket
- k = get_key(dest, dest_bucket, objname)
- assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
+ response = dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+ assert_equal(response['Body'].read().decode('utf-8'), 'foo')
# delete object on source
source.s3_client.delete_object(Bucket=source_bucket.name, Key=objname)
zone_data_checkpoint(dest.zone, source.zone)
# check that object does exist in destination bucket
- k = get_key(dest, dest_bucket, objname)
- assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
+ response = dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+ assert_equal(response['Body'].read().decode('utf-8'), 'foo')
@allow_bucket_replication
def test_bucket_replication_alt_user_delete():
# upload an object and wait for sync.
objname = 'dummy'
- k = new_key(source, source_bucket, objname)
- k.set_contents_from_string('foo')
+ source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
zone_data_checkpoint(dest.zone, source.zone)
# check that object exists in destination bucket
- k = get_key(dest, dest_bucket, objname)
- assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
+ response = dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+ assert_equal(response['Body'].read().decode('utf-8'), 'foo')
# delete object on source
source.s3_client.delete_object(Bucket=source_bucket.name, Key=objname)
# upload an object and wait for sync.
objname = 'dummy'
- k = new_key(source, source_bucket, objname)
- k.set_contents_from_string('foo')
+ source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
zone_data_checkpoint(dest.zone, source.zone)
# check that object exists in destination bucket
- k = get_key(dest, dest_bucket, objname)
- assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
+ response = dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+ assert_equal(response['Body'].read().decode('utf-8'), 'foo')
# delete object on source
source.s3_client.delete_object(Bucket=source_bucket.name, Key=objname)
zone_data_checkpoint(dest.zone, source.zone)
# check that object does exist in destination bucket
- k = get_key(dest, dest_bucket, objname)
- assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
+ response = dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+ assert_equal(response['Body'].read().decode('utf-8'), 'foo')
@allow_bucket_replication
def test_bucket_replication_alt_user_deletemarker():
# upload an object and wait for sync.
objname = 'dummy'
- k = new_key(source, source_bucket, objname)
- k.set_contents_from_string('foo')
+ source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
zone_data_checkpoint(dest.zone, source.zone)
# check that object exists in destination bucket
- k = get_key(dest, dest_bucket, objname)
- assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
+ response = dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+ assert_equal(response['Body'].read().decode('utf-8'), 'foo')
# delete object on source
source.s3_client.delete_object(Bucket=source_bucket.name, Key=objname)
# upload an object and wait for sync.
objname = 'dummy'
- k = new_key(source, source_bucket, objname)
- k.set_contents_from_string('foo')
+ source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
zone_data_checkpoint(dest.zone, source.zone)
# check that object does not exist in destination bucket
# upload an object and wait for sync.
objname = 'dummy'
- k = new_key(source, source_bucket, objname)
- k.set_contents_from_string('foo')
+ source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
zone_data_checkpoint(dest.zone, source.zone)
# check that object does not exist in destination bucket
# upload an object and wait for sync.
objname = 'dummy'
- k = new_key(source, source_bucket, objname)
- k.set_contents_from_string('foo')
+ source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
zone_data_checkpoint(dest.zone, source.zone)
# check that object exists in destination bucket
# upload an object and wait for sync.
objname = 'dummy'
- k = new_key(source, source_bucket, objname)
- k.set_contents_from_string('foo')
+ source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
zone_data_checkpoint(dest.zone, source.zone)
# check that object exists in destination bucket
# upload an object and wait for sync.
objname = 'dummy'
- k = new_key(source, source_bucket_name, objname)
- k.set_contents_from_string('foo')
+ source.s3_client.put_object(Bucket=source_bucket_name, Key=objname, Body='foo')
zone_data_checkpoint(dest.zone, source.zone)
# check that object does not exist in destination bucket
# upload an object and wait for sync.
objname = 'dummy'
- k = new_key(source, source_bucket_name, objname)
- k.set_contents_from_string('foo')
+ source.s3_client.put_object(Bucket=source_bucket_name, Key=objname, Body='foo')
zone_data_checkpoint(dest.zone, source.zone)
# check that object does not exist in destination bucket
source_bucket = source_zone.create_bucket(gen_bucket_name())
objname = 'dummy'
- k = new_key(source_zone, source_bucket.name, objname)
- k.set_contents_from_string('foo')
+ source_zone.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
for zg in realm.current_period.zonegroups:
if zg.name == zonegroup.name: