zonegroup = realm.master_zonegroup()
if len(zonegroup.zones) < 2:
raise SkipTest("More than one zone needed in any one or multiple zone(s).")
-
+
zones = ",".join([z.name for z in zonegroup.zones])
z = zonegroup.zones[0]
c = z.cluster
def test_bucket_replication_normal():
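+ # baseline: an object (and its tags) written to the source bucket should replicate to the destination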
zonegroup = realm.master_zonegroup()
zonegroup_conns = ZonegroupConns(zonegroup)
-
+
+ source = zonegroup_conns.non_account_rw_zones[0]
+ dest = zonegroup_conns.non_account_rw_zones[1]
+
+ source_bucket = source.create_bucket(gen_bucket_name())
+ dest_bucket = dest.create_bucket(gen_bucket_name())
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # create replication configuration
+ response = source.s3_client.put_bucket_replication(
+ Bucket=source_bucket.name,
+ ReplicationConfiguration={
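+ # RGW accepts an empty 'Role' here; unlike AWS S3, multisite replication does not appear to require an IAM role ARN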
+ 'Role': '',
+ 'Rules': [{
+ 'ID': 'rule1',
+ 'Status': 'Enabled',
+ 'Destination': {
+ 'Bucket': f'arn:aws:s3:::{dest_bucket.name}',
+ }
+ }]
+ }
+ )
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
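+ # wait for the bucket replication configuration (metadata) to sync to the other zones before uploading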
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # upload an object and wait for sync.
+ objname = 'dummy'
+ source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo', Tagging='key1=value1')
+ zone_data_checkpoint(dest.zone, source.zone)
+
+ # check that object exists in destination bucket
+ res = dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
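+ # tags set at upload time should replicate along with the object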
+ assert_equal(res['TagCount'], 1)
+ assert_equal(res['Body'].read().decode('utf-8'), 'foo')
+
+@allow_bucket_replication
+def test_bucket_replication_normal_delete():
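+ # on unversioned buckets, a replicated delete should remove the object from the destination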
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ source = zonegroup_conns.non_account_rw_zones[0]
+ dest = zonegroup_conns.non_account_rw_zones[1]
+
+ source_bucket = source.create_bucket(gen_bucket_name())
+ dest_bucket = dest.create_bucket(gen_bucket_name())
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # create replication configuration
+ response = source.s3_client.put_bucket_replication(
+ Bucket=source_bucket.name,
+ ReplicationConfiguration={
+ 'Role': '',
+ 'Rules': [{
+ 'ID': 'rule1',
+ 'Status': 'Enabled',
+ 'Destination': {
+ 'Bucket': f'arn:aws:s3:::{dest_bucket.name}',
+ }
+ }]
+ }
+ )
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # upload an object and wait for sync.
+ objname = 'dummy'
+ k = new_key(source, source_bucket, objname)
+ k.set_contents_from_string('foo')
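+ # short delay before the checkpoint, presumably to let the data log record the change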
+ time.sleep(config.checkpoint_delay)
+ zone_data_checkpoint(dest.zone, source.zone)
+
+ # check that object exists in destination bucket
+ k = get_key(dest, dest_bucket, objname)
+ assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
+
+ # delete object on source
+ source.s3_client.delete_object(Bucket=source_bucket.name, Key=objname)
+ time.sleep(config.checkpoint_delay)
+ zone_data_checkpoint(dest.zone, source.zone)
+
+ # check that object does not exist in destination bucket
+ e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
+ assert e.response['Error']['Code'] == 'NoSuchKey'
+
+@allow_bucket_replication
+def test_bucket_replication_normal_deletemarker():
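+ # on versioned buckets, a delete without a version id creates a delete marker;
+ # the marker should replicate so the key reads as deleted on the destination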
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
source = zonegroup_conns.non_account_rw_zones[0]
dest = zonegroup_conns.non_account_rw_zones[1]
source_bucket = source.create_bucket(gen_bucket_name())
+ # enable versioning
+ source.s3_client.put_bucket_versioning(
+ Bucket=source_bucket.name,
+ VersioningConfiguration={'Status': 'Enabled'}
+ )
dest_bucket = dest.create_bucket(gen_bucket_name())
+ # enable versioning
+ dest.s3_client.put_bucket_versioning(
+ Bucket=dest_bucket.name,
+ VersioningConfiguration={'Status': 'Enabled'}
+ )
zonegroup_meta_checkpoint(zonegroup)
# create replication configuration
k = get_key(dest, dest_bucket, objname)
assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
+ # delete object on source
+ source.s3_client.delete_object(Bucket=source_bucket.name, Key=objname)
+ time.sleep(config.checkpoint_delay)
+ zone_data_checkpoint(dest.zone, source.zone)
+
+ # check that object does not exist in destination bucket
+ e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
+ assert e.response['Error']['Code'] == 'NoSuchKey'
+
@allow_bucket_replication
def test_bucket_replication_alt_user_forbidden():
zonegroup = realm.master_zonegroup()
'Version': '2012-10-17',
'Statement': [{
'Effect': 'Allow',
- 'Principal': {'AWS': [f"arn:aws:iam:::user/{user.id}"]},
- 'Action': 's3:PutObject',
+ 'Principal': {'AWS': [f"arn:aws:iam:::user/{non_account_user.id}"]},
+ 'Action': 's3:ReplicateObject',
'Resource': f'arn:aws:s3:::{dest_bucket.name}/*',
}]
})
# upload an object and wait for sync.
objname = 'dummy'
- k = new_key(source, source_bucket, objname)
- k.set_contents_from_string('foo')
+ source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo', Tagging='key1=value1')
zone_data_checkpoint(dest.zone, source.zone)
# check that object exists in destination bucket
- k = get_key(dest, dest_bucket, objname)
- assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
+ res = dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+ assert_equal(res['TagCount'], 1)
+ assert_equal(res['Body'].read().decode('utf-8'), 'foo')
@allow_bucket_replication
def test_bucket_replication_reject_versioning_identical():
# check that object exists in destination bucket
k = get_key(dest_zone, dest_bucket, objname)
assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
+
+@allow_bucket_replication
+def test_bucket_replication_alt_user_delete_forbidden():
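+ # the destination bucket policy below grants only s3:ReplicateObject, so the source's delete must not replicate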
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ source = zonegroup_conns.non_account_rw_zones[0]
+ dest = zonegroup_conns.non_account_alt_rw_zones[1]
+
+ source_bucket = source.create_bucket(gen_bucket_name())
+ dest_bucket = dest.create_bucket(gen_bucket_name())
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # create replication configuration
+ response = source.s3_client.put_bucket_replication(
+ Bucket=source_bucket.name,
+ ReplicationConfiguration={
+ 'Role': '',
+ 'Rules': [{
+ 'ID': 'rule1',
+ 'Status': 'Enabled',
+ 'Destination': {
+ 'Bucket': f'arn:aws:s3:::{dest_bucket.name}',
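+ # make the alt user the owner of replicated objects in the destination bucket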
+ 'AccessControlTranslation': {
+ 'Owner': non_account_alt_user.id,
+ },
+ }
+ }]
+ }
+ )
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+ # allow the source user to replicate objects into the alt user's bucket (no s3:ReplicateDelete)
+ dest.s3_client.put_bucket_policy(
+ Bucket=dest_bucket.name,
+ Policy=json.dumps({
+ 'Version': '2012-10-17',
+ 'Statement': [{
+ 'Effect': 'Allow',
+ 'Principal': {'AWS': [f"arn:aws:iam:::user/{non_account_user.id}"]},
+ 'Action': 's3:ReplicateObject',
+ 'Resource': f'arn:aws:s3:::{dest_bucket.name}/*',
+ }]
+ })
+ )
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # upload an object and wait for sync.
+ objname = 'dummy'
+ k = new_key(source, source_bucket, objname)
+ k.set_contents_from_string('foo')
+ zone_data_checkpoint(dest.zone, source.zone)
+
+ # check that object exists in destination bucket
+ k = get_key(dest, dest_bucket, objname)
+ assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
+
+ # delete object on source
+ source.s3_client.delete_object(Bucket=source_bucket.name, Key=objname)
+ time.sleep(config.checkpoint_delay)
+ zone_data_checkpoint(dest.zone, source.zone)
+
+ # check that the object still exists in the destination bucket (delete was not replicated)
+ k = get_key(dest, dest_bucket, objname)
+ assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
+
+@allow_bucket_replication
+def test_bucket_replication_alt_user_delete():
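+ # with s3:ReplicateDelete also granted below, the source's delete should replicate to the alt user's bucket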
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ source = zonegroup_conns.non_account_rw_zones[0]
+ dest = zonegroup_conns.non_account_alt_rw_zones[1]
+
+ source_bucket = source.create_bucket(gen_bucket_name())
+ dest_bucket = dest.create_bucket(gen_bucket_name())
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # create replication configuration
+ response = source.s3_client.put_bucket_replication(
+ Bucket=source_bucket.name,
+ ReplicationConfiguration={
+ 'Role': '',
+ 'Rules': [{
+ 'ID': 'rule1',
+ 'Status': 'Enabled',
+ 'Destination': {
+ 'Bucket': f'arn:aws:s3:::{dest_bucket.name}',
+ 'AccessControlTranslation': {
+ 'Owner': non_account_alt_user.id,
+ },
+ }
+ }]
+ }
+ )
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+ # allow the source user to replicate objects and deletes into the alt user's bucket
+ dest.s3_client.put_bucket_policy(
+ Bucket=dest_bucket.name,
+ Policy=json.dumps({
+ 'Version': '2012-10-17',
+ 'Statement': [{
+ 'Effect': 'Allow',
+ 'Principal': {'AWS': [f"arn:aws:iam:::user/{non_account_user.id}"]},
+ 'Action': ['s3:ReplicateObject', 's3:ReplicateDelete'],
+ 'Resource': f'arn:aws:s3:::{dest_bucket.name}/*',
+ }]
+ })
+ )
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # upload an object and wait for sync.
+ objname = 'dummy'
+ k = new_key(source, source_bucket, objname)
+ k.set_contents_from_string('foo')
+ zone_data_checkpoint(dest.zone, source.zone)
+
+ # check that object exists in destination bucket
+ k = get_key(dest, dest_bucket, objname)
+ assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
+
+ # delete object on source
+ source.s3_client.delete_object(Bucket=source_bucket.name, Key=objname)
+ time.sleep(config.checkpoint_delay)
+ zone_data_checkpoint(dest.zone, source.zone)
+
+ # check that object does not exist in destination bucket
+ e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
+ assert e.response['Error']['Code'] == 'NoSuchKey'
+
+@allow_bucket_replication
+def test_bucket_replication_alt_user_deletemarker_forbidden():
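+ # versioned buckets: without s3:ReplicateDelete, the source's delete marker must not replicate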
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ source = zonegroup_conns.non_account_rw_zones[0]
+ dest = zonegroup_conns.non_account_alt_rw_zones[1]
+
+ source_bucket = source.create_bucket(gen_bucket_name())
+ # enable versioning
+ source.s3_client.put_bucket_versioning(
+ Bucket=source_bucket.name,
+ VersioningConfiguration={'Status': 'Enabled'}
+ )
+ dest_bucket = dest.create_bucket(gen_bucket_name())
+ # enable versioning
+ dest.s3_client.put_bucket_versioning(
+ Bucket=dest_bucket.name,
+ VersioningConfiguration={'Status': 'Enabled'}
+ )
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # create replication configuration
+ response = source.s3_client.put_bucket_replication(
+ Bucket=source_bucket.name,
+ ReplicationConfiguration={
+ 'Role': '',
+ 'Rules': [{
+ 'ID': 'rule1',
+ 'Status': 'Enabled',
+ 'Destination': {
+ 'Bucket': f'arn:aws:s3:::{dest_bucket.name}',
+ 'AccessControlTranslation': {
+ 'Owner': non_account_alt_user.id,
+ },
+ }
+ }]
+ }
+ )
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+ # allow the source user to replicate objects into the alt user's bucket (no s3:ReplicateDelete)
+ dest.s3_client.put_bucket_policy(
+ Bucket=dest_bucket.name,
+ Policy=json.dumps({
+ 'Version': '2012-10-17',
+ 'Statement': [{
+ 'Effect': 'Allow',
+ 'Principal': {'AWS': [f"arn:aws:iam:::user/{non_account_user.id}"]},
+ 'Action': 's3:ReplicateObject',
+ 'Resource': f'arn:aws:s3:::{dest_bucket.name}/*',
+ }]
+ })
+ )
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # upload an object and wait for sync.
+ objname = 'dummy'
+ k = new_key(source, source_bucket, objname)
+ k.set_contents_from_string('foo')
+ zone_data_checkpoint(dest.zone, source.zone)
+
+ # check that object exists in destination bucket
+ k = get_key(dest, dest_bucket, objname)
+ assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
+
+ # delete object on source
+ source.s3_client.delete_object(Bucket=source_bucket.name, Key=objname)
+ time.sleep(config.checkpoint_delay)
+ zone_data_checkpoint(dest.zone, source.zone)
+
+ # check that the object still exists in the destination bucket (delete marker was not replicated)
+ k = get_key(dest, dest_bucket, objname)
+ assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
+
+@allow_bucket_replication
+def test_bucket_replication_alt_user_deletemarker():
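+ # versioned buckets: with s3:ReplicateDelete granted, the source's delete marker should replicate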
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ source = zonegroup_conns.non_account_rw_zones[0]
+ dest = zonegroup_conns.non_account_alt_rw_zones[1]
+
+ source_bucket = source.create_bucket(gen_bucket_name())
+ # enable versioning
+ source.s3_client.put_bucket_versioning(
+ Bucket=source_bucket.name,
+ VersioningConfiguration={'Status': 'Enabled'}
+ )
+ dest_bucket = dest.create_bucket(gen_bucket_name())
+ # enable versioning
+ dest.s3_client.put_bucket_versioning(
+ Bucket=dest_bucket.name,
+ VersioningConfiguration={'Status': 'Enabled'}
+ )
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # create replication configuration
+ response = source.s3_client.put_bucket_replication(
+ Bucket=source_bucket.name,
+ ReplicationConfiguration={
+ 'Role': '',
+ 'Rules': [{
+ 'ID': 'rule1',
+ 'Status': 'Enabled',
+ 'Destination': {
+ 'Bucket': f'arn:aws:s3:::{dest_bucket.name}',
+ 'AccessControlTranslation': {
+ 'Owner': non_account_alt_user.id,
+ },
+ }
+ }]
+ }
+ )
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+ # allow the source user to replicate objects and deletes into the alt user's bucket
+ dest.s3_client.put_bucket_policy(
+ Bucket=dest_bucket.name,
+ Policy=json.dumps({
+ 'Version': '2012-10-17',
+ 'Statement': [{
+ 'Effect': 'Allow',
+ 'Principal': {'AWS': [f"arn:aws:iam:::user/{non_account_user.id}"]},
+ 'Action': ['s3:ReplicateObject', 's3:ReplicateDelete'],
+ 'Resource': f'arn:aws:s3:::{dest_bucket.name}/*',
+ }]
+ })
+ )
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # upload an object and wait for sync.
+ objname = 'dummy'
+ k = new_key(source, source_bucket, objname)
+ k.set_contents_from_string('foo')
+ zone_data_checkpoint(dest.zone, source.zone)
+
+ # check that object exists in destination bucket
+ k = get_key(dest, dest_bucket, objname)
+ assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
+
+ # delete object on source
+ source.s3_client.delete_object(Bucket=source_bucket.name, Key=objname)
+ time.sleep(config.checkpoint_delay)
+ zone_data_checkpoint(dest.zone, source.zone)
+
+ # check that object does not exist in destination bucket
+ e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
+ assert e.response['Error']['Code'] == 'NoSuchKey'
+
+@allow_bucket_replication
+def test_bucket_replication_alt_user_deny_tagreplication():
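+ # an explicit Deny on s3:ReplicateTags should still sync the object data but strip its tags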
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ source = zonegroup_conns.non_account_rw_zones[0]
+ dest = zonegroup_conns.non_account_alt_rw_zones[1]
+
+ source_bucket = source.create_bucket(gen_bucket_name())
+ dest_bucket = dest.create_bucket(gen_bucket_name())
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # create replication configuration
+ response = source.s3_client.put_bucket_replication(
+ Bucket=source_bucket.name,
+ ReplicationConfiguration={
+ 'Role': '',
+ 'Rules': [{
+ 'ID': 'rule1',
+ 'Status': 'Enabled',
+ 'Destination': {
+ 'Bucket': f'arn:aws:s3:::{dest_bucket.name}',
+ 'AccessControlTranslation': {
+ 'Owner': non_account_alt_user.id,
+ },
+ }
+ }]
+ }
+ )
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+ # allow the source user to replicate objects into the alt user's bucket, but deny tag replication
+ dest.s3_client.put_bucket_policy(
+ Bucket=dest_bucket.name,
+ Policy=json.dumps({
+ 'Version': '2012-10-17',
+ 'Statement': [
+ {
+ 'Effect': 'Allow',
+ 'Principal': {'AWS': [f"arn:aws:iam:::user/{non_account_user.id}"]},
+ 'Action': 's3:ReplicateObject',
+ 'Resource': f'arn:aws:s3:::{dest_bucket.name}/*',
+ },
+ {
+ 'Effect': 'Deny',
+ 'Principal': {'AWS': [f"arn:aws:iam:::user/{non_account_user.id}"]},
+ 'Action': 's3:ReplicateTags',
+ 'Resource': f'arn:aws:s3:::{dest_bucket.name}/*',
+ }
+ ]
+ })
+ )
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # upload an object and wait for sync.
+ objname = 'dummy'
+ source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo', Tagging='key1=value1')
+ zone_data_checkpoint(dest.zone, source.zone)
+
+ # check that object exists in destination bucket without tags
+ res = dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+ assert_equal(res['Body'].read().decode('utf-8'), 'foo')
+ assert 'TagCount' not in res