res = dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
assert_equal(res['Body'].read().decode('utf-8'), 'foo')
assert 'TagCount' not in res
+
+@allow_bucket_replication
+def test_bucket_replication_source_forbidden():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ source = zonegroup_conns.rw_zones[0]
+ dest = zonegroup_conns.rw_zones[1]
+
+ source_bucket = source.create_bucket(gen_bucket_name())
+ dest_bucket = dest.create_bucket(gen_bucket_name())
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # create replication configuration
+ source.s3_client.put_bucket_replication(
+ Bucket=source_bucket.name,
+ ReplicationConfiguration={
+ 'Role': '',
+ 'Rules': [{
+ 'ID': 'rule1',
+ 'Status': 'Enabled',
+ 'Destination': {
+ 'Bucket': f'arn:aws:s3:::{dest_bucket.name}',
+ }
+ }]
+ }
+ )
+
+ # Deny myself from fetching the source object for replication
+ source.s3_client.put_bucket_policy(
+ Bucket=source_bucket.name,
+ Policy=json.dumps({
+ 'Version': '2012-10-17',
+ 'Statement': [{
+ 'Effect': 'Deny',
+ 'Principal': {'AWS': [f"arn:aws:iam:::user/{non_account_user.id}"]},
+ 'Action': ['s3:GetObjectVersionForReplication', 's3:GetObject'],
+ 'Resource': f'arn:aws:s3:::{source_bucket.name}/*',
+ }]
+ })
+ )
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # upload an object and wait for sync.
+ objname = 'dummy'
+ k = new_key(source, source_bucket, objname)
+ k.set_contents_from_string('foo')
+ zone_data_checkpoint(dest.zone, source.zone)
+
+ # check that object does not exist in destination bucket
+ e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
+ assert e.response['Error']['Code'] == 'NoSuchKey'
+
+ # remove bucket policy so I can check for replication status
+ source.s3_client.delete_bucket_policy(Bucket=source_bucket.name)
+
+ # check the source object has replication status set to FAILED
+ # uncomment me in https://github.com/ceph/ceph/pull/62147
+ # res = source.s3_client.head_object(Bucket=source_bucket.name, Key=objname)
+ # assert_equal(res['ReplicationStatus'], 'FAILED')
+
+@allow_bucket_replication
+def test_bucket_replication_source_forbidden_versioned():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ source = zonegroup_conns.rw_zones[0]
+ dest = zonegroup_conns.rw_zones[1]
+
+ source_bucket = source.create_bucket(gen_bucket_name())
+ # enable versioning
+ source.s3_client.put_bucket_versioning(
+ Bucket=source_bucket.name,
+ VersioningConfiguration={'Status': 'Enabled'}
+ )
+ dest_bucket = dest.create_bucket(gen_bucket_name())
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # create replication configuration
+ source.s3_client.put_bucket_replication(
+ Bucket=source_bucket.name,
+ ReplicationConfiguration={
+ 'Role': '',
+ 'Rules': [{
+ 'ID': 'rule1',
+ 'Status': 'Enabled',
+ 'Destination': {
+ 'Bucket': f'arn:aws:s3:::{dest_bucket.name}',
+ }
+ }]
+ }
+ )
+
+ # Deny myself from fetching the source object for replication
+ source.s3_client.put_bucket_policy(
+ Bucket=source_bucket.name,
+ Policy=json.dumps({
+ 'Version': '2012-10-17',
+ 'Statement': [{
+ 'Effect': 'Deny',
+ 'Principal': {'AWS': [f"arn:aws:iam:::user/{non_account_user.id}"]},
+ 'Action': ['s3:GetObjectVersionForReplication', 's3:GetObjectVersion'],
+ 'Resource': f'arn:aws:s3:::{source_bucket.name}/*',
+ }]
+ })
+ )
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # upload an object and wait for sync.
+ objname = 'dummy'
+ k = new_key(source, source_bucket, objname)
+ k.set_contents_from_string('foo')
+ zone_data_checkpoint(dest.zone, source.zone)
+
+ # check that object does not exist in destination bucket
+ e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
+ assert e.response['Error']['Code'] == 'NoSuchKey'
+
+ # remove bucket policy so I can check for replication status
+ source.s3_client.delete_bucket_policy(Bucket=source_bucket.name)
+
+ # check the source object has replication status set to FAILED
+ # uncomment me in https://github.com/ceph/ceph/pull/62147
+ # res = source.s3_client.head_object(Bucket=source_bucket.name, Key=objname)
+ # assert_equal(res['ReplicationStatus'], 'FAILED')
+
+@allow_bucket_replication
+def test_bucket_replication_source_allow_either_getobject_or_getobjectversionforreplication():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ source = zonegroup_conns.rw_zones[0]
+ dest = zonegroup_conns.rw_zones[1]
+
+ source_bucket = source.create_bucket(gen_bucket_name())
+ dest_bucket = dest.create_bucket(gen_bucket_name())
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # create replication configuration
+ source.s3_client.put_bucket_replication(
+ Bucket=source_bucket.name,
+ ReplicationConfiguration={
+ 'Role': '',
+ 'Rules': [{
+ 'ID': 'rule1',
+ 'Status': 'Enabled',
+ 'Destination': {
+ 'Bucket': f'arn:aws:s3:::{dest_bucket.name}',
+ }
+ }]
+ }
+ )
+
+ # Deny myself from fetching the source object for replication
+ source.s3_client.put_bucket_policy(
+ Bucket=source_bucket.name,
+ Policy=json.dumps({
+ 'Version': '2012-10-17',
+ 'Statement': [{
+ 'Effect': 'Deny',
+ 'Principal': {'AWS': [f"arn:aws:iam:::user/{non_account_user.id}"]},
+ 'Action': 's3:GetObject',
+ 'Resource': f'arn:aws:s3:::{source_bucket.name}/*',
+ }]
+ })
+ )
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # upload an object and wait for sync.
+ objname = 'dummy'
+ k = new_key(source, source_bucket, objname)
+ k.set_contents_from_string('foo')
+ zone_data_checkpoint(dest.zone, source.zone)
+
+ # check that object exists in destination bucket
+ res = dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+ assert_equal(res['Body'].read().decode('utf-8'), 'foo')
+
+@allow_bucket_replication
+def test_bucket_replication_source_allow_either_getobjectversion_or_getobjectversionforreplication():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ source = zonegroup_conns.rw_zones[0]
+ dest = zonegroup_conns.rw_zones[1]
+
+ source_bucket = source.create_bucket(gen_bucket_name())
+ # enable versioning
+ source.s3_client.put_bucket_versioning(
+ Bucket=source_bucket.name,
+ VersioningConfiguration={'Status': 'Enabled'}
+ )
+ dest_bucket = dest.create_bucket(gen_bucket_name())
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # create replication configuration
+ source.s3_client.put_bucket_replication(
+ Bucket=source_bucket.name,
+ ReplicationConfiguration={
+ 'Role': '',
+ 'Rules': [{
+ 'ID': 'rule1',
+ 'Status': 'Enabled',
+ 'Destination': {
+ 'Bucket': f'arn:aws:s3:::{dest_bucket.name}',
+ }
+ }]
+ }
+ )
+
+ # Deny myself from fetching the source object for replication
+ source.s3_client.put_bucket_policy(
+ Bucket=source_bucket.name,
+ Policy=json.dumps({
+ 'Version': '2012-10-17',
+ 'Statement': [{
+ 'Effect': 'Deny',
+ 'Principal': {'AWS': [f"arn:aws:iam:::user/{non_account_user.id}"]},
+ 'Action': 's3:GetObjectVersionForReplication',
+ 'Resource': f'arn:aws:s3:::{source_bucket.name}/*',
+ }]
+ })
+ )
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # upload an object and wait for sync.
+ objname = 'dummy'
+ k = new_key(source, source_bucket, objname)
+ k.set_contents_from_string('foo')
+ zone_data_checkpoint(dest.zone, source.zone)
+
+ # check that object exists in destination bucket
+ res = dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+ assert_equal(res['Body'].read().decode('utf-8'), 'foo')
+
+@allow_bucket_replication
+def test_bucket_replication_source_forbidden_objretention():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ source = zonegroup_conns.rw_zones[0]
+ dest = zonegroup_conns.rw_zones[1]
+
+ source_bucket_name = gen_bucket_name()
+ source.s3_client.create_bucket(Bucket=source_bucket_name, ObjectLockEnabledForBucket=True)
+ dest_bucket = dest.create_bucket(gen_bucket_name())
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # create replication configuration
+ source.s3_client.put_bucket_replication(
+ Bucket=source_bucket_name,
+ ReplicationConfiguration={
+ 'Role': '',
+ 'Rules': [{
+ 'ID': 'rule1',
+ 'Status': 'Enabled',
+ 'Destination': {
+ 'Bucket': f'arn:aws:s3:::{dest_bucket.name}',
+ }
+ }]
+ }
+ )
+
+ # Deny myself from fetching the source object's retention for replication
+ source.s3_client.put_bucket_policy(
+ Bucket=source_bucket_name,
+ Policy=json.dumps({
+ 'Version': '2012-10-17',
+ 'Statement': [{
+ 'Effect': 'Deny',
+ 'Principal': {'AWS': [f"arn:aws:iam:::user/{non_account_user.id}"]},
+ 'Action': 's3:GetObjectRetention',
+ 'Resource': f'arn:aws:s3:::{source_bucket_name}/*',
+ }]
+ })
+ )
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # upload an object and wait for sync.
+ objname = 'dummy'
+ k = new_key(source, source_bucket_name, objname)
+ k.set_contents_from_string('foo')
+ zone_data_checkpoint(dest.zone, source.zone)
+
+ # check that object does not exist in destination bucket
+ e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
+ assert e.response['Error']['Code'] == 'NoSuchKey'
+
+ # check the source object has replication status set to FAILED
+ # uncomment me in https://github.com/ceph/ceph/pull/62147
+ # res = source.s3_client.head_object(Bucket=source_bucket_name, Key=objname)
+ # assert_equal(res['ReplicationStatus'], 'FAILED')
+
+@allow_bucket_replication
+def test_bucket_replication_source_forbidden_legalhold():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ source = zonegroup_conns.rw_zones[0]
+ dest = zonegroup_conns.rw_zones[1]
+
+ source_bucket_name = gen_bucket_name()
+ source.s3_client.create_bucket(Bucket=source_bucket_name, ObjectLockEnabledForBucket=True)
+ dest_bucket = dest.create_bucket(gen_bucket_name())
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # create replication configuration
+ source.s3_client.put_bucket_replication(
+ Bucket=source_bucket_name,
+ ReplicationConfiguration={
+ 'Role': '',
+ 'Rules': [{
+ 'ID': 'rule1',
+ 'Status': 'Enabled',
+ 'Destination': {
+ 'Bucket': f'arn:aws:s3:::{dest_bucket.name}',
+ }
+ }]
+ }
+ )
+
+ # Deny myself from fetching the source object's retention for replication
+ source.s3_client.put_bucket_policy(
+ Bucket=source_bucket_name,
+ Policy=json.dumps({
+ 'Version': '2012-10-17',
+ 'Statement': [{
+ 'Effect': 'Deny',
+ 'Principal': {'AWS': [f"arn:aws:iam:::user/{non_account_user.id}"]},
+ 'Action': 's3:GetObjectLegalHold',
+ 'Resource': f'arn:aws:s3:::{source_bucket_name}/*',
+ }]
+ })
+ )
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # upload an object and wait for sync.
+ objname = 'dummy'
+ k = new_key(source, source_bucket_name, objname)
+ k.set_contents_from_string('foo')
+ zone_data_checkpoint(dest.zone, source.zone)
+
+ # check that object does not exist in destination bucket
+ e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
+ assert e.response['Error']['Code'] == 'NoSuchKey'
+
+ # check the source object has replication status set to FAILED
+ # uncomment me in https://github.com/ceph/ceph/pull/62147
+ # res = source.s3_client.head_object(Bucket=source_bucket_name, Key=objname)
+ # assert_equal(res['ReplicationStatus'], 'FAILED')