source_bucket = source_zone.create_bucket(gen_bucket_name())
objname = 'dummy'
- k = new_key(source_zone, source_bucket.name, objname)
- k.set_contents_from_string('foo')
for zg in realm.current_period.zonegroups:
if zg.name == zonegroup.name:
dest_bucket = dest_zone.create_bucket(gen_bucket_name())
realm_meta_checkpoint(realm)
- # copy object
- dest_zone.s3_client.copy_object(
- Bucket=dest_bucket.name,
- CopySource=f'{source_bucket.name}/{objname}',
- Key=objname
- )
-
- # check that object exists in destination bucket
- k = get_key(dest_zone, dest_bucket, objname)
- assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
+ # try two object sizes: 4K to exercise the no-progress case
+ # and 8MiB to exercise the progress case
+ obj_sizes = [4096, 8 * 1024 * 1024]
+ for size in obj_sizes:
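+ # upload an object of this size to the source bucket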
+ k = new_key(source_zone, source_bucket.name, objname)
+ k.set_contents_from_string('x' * size)
+
+ # copy the object into the destination bucket via the destination zone
+ dest_zone.s3_client.copy_object(
+ Bucket=dest_bucket.name,
+ CopySource=f'{source_bucket.name}/{objname}',
+ Key=objname
+ )
+
+ # check that the object in the destination bucket has the expected contents
+ k = get_key(dest_zone, dest_bucket, objname)
+ assert_equal(k.get_contents_as_string().decode('utf-8'), 'x' * size)
@allow_bucket_replication
def test_bucket_replication_alt_user_delete_forbidden():