Bucket=bucket_name,
CreateBucketConfiguration={'LocationConstraint': zg.name})
assert e.response['ResponseMetadata']['HTTPStatusCode'] == 400
+
def allow_bucket_replication(function):
    """Decorator: install a symmetrical zonegroup sync policy for the
    duration of the wrapped test, then tear it down again.

    Skips the test when the master zonegroup has fewer than two zones,
    since bucket replication needs a distinct source and destination.
    Cleanup runs in a ``finally`` block so a failing test still removes
    the pipe/flow/group and restores the period.
    """
    # local import: keeps this patch self-contained even if the module
    # header does not yet import functools
    from functools import wraps

    @wraps(function)  # preserve the test's own name/docstring for discovery and reporting
    def wrapper(*args, **kwargs):
        zonegroup = realm.master_zonegroup()
        if len(zonegroup.zones) < 2:
            raise SkipTest("test requires at least two zones in the master zonegroup")

        zones = ",".join([z.name for z in zonegroup.zones])
        z = zonegroup.zones[0]
        c = z.cluster

        # symmetrical flow + pipe across all zones: every zone syncs with every other
        create_sync_policy_group(c, "sync-group")
        create_sync_group_flow_symmetrical(c, "sync-group", "sync-flow", zones)
        create_sync_group_pipe(c, "sync-group", "sync-pipe", zones, zones)

        # commit the policy and wait for metadata to land everywhere
        zonegroup.period.update(z, commit=True)
        realm_meta_checkpoint(realm)

        try:
            function(*args, **kwargs)
        finally:
            # remove in reverse order of creation, then re-commit the period
            remove_sync_group_pipe(c, "sync-group", "sync-pipe")
            remove_sync_group_flow_symmetrical(c, "sync-group", "sync-flow")
            remove_sync_policy_group(c, "sync-group")

            zonegroup.period.update(z, commit=True)
            realm_meta_checkpoint(realm)

    return wrapper
+
@allow_bucket_replication
def test_bucket_replication_normal():
    """An object written to the source bucket replicates to the
    destination bucket when both belong to the same (non-account) user."""
    zonegroup = realm.master_zonegroup()
    conns = ZonegroupConns(zonegroup)

    source = conns.non_account_rw_zones[0]
    dest = conns.non_account_rw_zones[1]

    source_bucket = source.create_bucket(gen_bucket_name())
    dest_bucket = dest.create_bucket(gen_bucket_name())
    zonegroup_meta_checkpoint(zonegroup)

    # point a single enabled replication rule at the destination bucket
    replication_conf = {
        'Role': '',
        'Rules': [{
            'ID': 'rule1',
            'Status': 'Enabled',
            'Destination': {
                'Bucket': f'arn:aws:s3:::{dest_bucket.name}',
            },
        }],
    }
    resp = source.s3_client.put_bucket_replication(
        Bucket=source_bucket.name,
        ReplicationConfiguration=replication_conf)
    assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
    zonegroup_meta_checkpoint(zonegroup)

    # write an object on the source side, then wait for data sync
    objname = 'dummy'
    src_key = new_key(source, source_bucket, objname)
    src_key.set_contents_from_string('foo')
    zone_data_checkpoint(dest.zone, source.zone)

    # the object must now exist, with identical contents, in the destination bucket
    replicated = get_key(dest, dest_bucket, objname)
    assert_equal(replicated.get_contents_as_string().decode('utf-8'), 'foo')
+
@allow_bucket_replication
def test_bucket_replication_alt_user_forbidden():
    """Replication into another user's bucket is denied by default:
    without a grant on the destination, the object must not sync over."""
    zonegroup = realm.master_zonegroup()
    conns = ZonegroupConns(zonegroup)

    source = conns.non_account_rw_zones[0]
    dest = conns.non_account_alt_rw_zones[1]

    source_bucket = source.create_bucket(gen_bucket_name())
    dest_bucket = dest.create_bucket(gen_bucket_name())
    zonegroup_meta_checkpoint(zonegroup)

    # replication rule targets the alt user's bucket, but no access is granted
    replication_conf = {
        'Role': '',
        'Rules': [{
            'ID': 'rule1',
            'Status': 'Enabled',
            'Destination': {
                'Bucket': f'arn:aws:s3:::{dest_bucket.name}',
            },
        }],
    }
    resp = source.s3_client.put_bucket_replication(
        Bucket=source_bucket.name,
        ReplicationConfiguration=replication_conf)
    assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
    zonegroup_meta_checkpoint(zonegroup)

    # write an object on the source side, then wait for data sync
    objname = 'dummy'
    src_key = new_key(source, source_bucket, objname)
    src_key.set_contents_from_string('foo')
    zone_data_checkpoint(dest.zone, source.zone)

    # replication must have been refused: the destination bucket stays empty
    e = assert_raises(ClientError, dest.s3_client.get_object,
                      Bucket=dest_bucket.name, Key=objname)
    assert e.response['Error']['Code'] == 'NoSuchKey'
+
@allow_bucket_replication
def test_bucket_replication_alt_user():
    """Replication into another user's bucket succeeds once the rule
    carries an owner translation and the destination grants s3:PutObject."""
    zonegroup = realm.master_zonegroup()
    conns = ZonegroupConns(zonegroup)

    source = conns.non_account_rw_zones[0]
    dest = conns.non_account_alt_rw_zones[1]

    source_bucket = source.create_bucket(gen_bucket_name())
    dest_bucket = dest.create_bucket(gen_bucket_name())
    zonegroup_meta_checkpoint(zonegroup)

    # replication rule with ownership translated to the alt user
    replication_conf = {
        'Role': '',
        'Rules': [{
            'ID': 'rule1',
            'Status': 'Enabled',
            'Destination': {
                'Bucket': f'arn:aws:s3:::{dest_bucket.name}',
                'AccessControlTranslation': {
                    'Owner': non_account_alt_user.id,
                },
            },
        }],
    }
    resp = source.s3_client.put_bucket_replication(
        Bucket=source_bucket.name,
        ReplicationConfiguration=replication_conf)
    assert resp['ResponseMetadata']['HTTPStatusCode'] == 200

    # bucket policy on the destination lets the source user write into it
    policy = {
        'Version': '2012-10-17',
        'Statement': [{
            'Effect': 'Allow',
            'Principal': {'AWS': [f"arn:aws:iam:::user/{user.id}"]},
            'Action': 's3:PutObject',
            'Resource': f'arn:aws:s3:::{dest_bucket.name}/*',
        }],
    }
    dest.s3_client.put_bucket_policy(
        Bucket=dest_bucket.name,
        Policy=json.dumps(policy))
    zonegroup_meta_checkpoint(zonegroup)

    # write an object on the source side, then wait for data sync
    objname = 'dummy'
    src_key = new_key(source, source_bucket, objname)
    src_key.set_contents_from_string('foo')
    zone_data_checkpoint(dest.zone, source.zone)

    # the object must now exist, with identical contents, in the destination bucket
    replicated = get_key(dest, dest_bucket, objname)
    assert_equal(replicated.get_contents_as_string().decode('utf-8'), 'foo')