assert False, 'failed to remove sync policy group id=%s, bucket=%s' % (group, bucket)
return json.loads(result_json)
+def remove_sync_flow(cluster, group, flow, type, source=None, dest=None, bucket = None):
+ cmd = ['sync', 'group', 'flow', 'remove', '--group-id', group, '--flow-id', flow, '--flow-type', type]
+ if source:
+ cmd += ['--source-zone', source]
+ if dest:
+ cmd += ['--dest-zone', dest]
+ if bucket:
+ cmd += ['--bucket', bucket]
+ (result_json, retcode) = cluster.admin(cmd)
+ if retcode != 0:
+ assert False, 'failed to remove sync flow id=%s, bucket=%s' % (flow, bucket)
+ return json.loads(result_json)
+
def create_sync_group_flow_symmetrical(cluster, group, flow_id, zones, bucket = None):
cmd = ['sync', 'group', 'flow', 'create', '--group-id', group, '--flow-id' , flow_id, '--flow-type', 'symmetrical', '--zones=%s' % zones]
if bucket:
def check_objects_not_exist(bucket, obj_arr):
for objname in obj_arr:
check_object_not_exists(bucket, objname)
-
-@attr('sync_policy')
-def test_bucket_delete_with_bucket_sync_policy_directional():
+@attr('fails_with_rgw')
+@attr('sync_policy')
+def test_sync_policy_config_zonegroup():
+ """
+ test_sync_policy_config_zonegroup:
+ test configuration of all sync commands
+ """
zonegroup = realm.master_zonegroup()
- zonegroup_conns = ZonegroupConns(zonegroup)
-
zonegroup_meta_checkpoint(zonegroup)
- (zoneA, zoneB) = zonegroup.zones[0:2]
- (zcA, zcB) = zonegroup_conns.zones[0:2]
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ z1, z2 = zonegroup.zones[0:2]
+ c1, c2 = (z1.cluster, z2.cluster)
- c1 = zoneA.cluster
+ zones = z1.name+","+z2.name
- # configure sync policy
- zones = zoneA.name + ',' + zoneB.name
c1.admin(['sync', 'policy', 'get'])
+
+ # (a) zonegroup level
create_sync_policy_group(c1, "sync-group")
- create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow", zones)
- create_sync_group_pipe(c1, "sync-group", "sync-pipe", zoneA.name, zoneB.name)
- set_sync_policy_group_status(c1, "sync-group", "allowed")
+ set_sync_policy_group_status(c1, "sync-group", "enabled")
+ get_sync_policy_group(c1, "sync-group")
- zonegroup.period.update(zoneA, commit=True)
get_sync_policy(c1)
- """
- configure policy at bucketA level with src and dest
- zones specified to zoneA and zoneB resp.
-
- verify zoneA bucketA syncs to zoneB BucketA but not viceversa.
- """
-
- # configure sync policy for only bucketA and enable it
- bucketA = create_zone_bucket(zcA)
- buckets = []
- buckets.append(bucketA)
- create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
- create_sync_group_flow_directional(c1, "sync-bucket", "sync-flow-bucket", zoneA.name, zoneB.name, bucketA.name)
- #create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow-bucket", zones, bucketA.name)
- create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zoneA.name, zoneB.name, bucketA.name)
- set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)
-
- get_sync_policy(c1, bucketA.name)
+ create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
+ create_sync_group_flow_directional(c1, "sync-group", "sync-flow2", z1.name, z2.name)
- zonegroup_meta_checkpoint(zonegroup)
+ create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
+ get_sync_policy_group(c1, "sync-group")
- # create bucketA and objects in zoneA and zoneB
- objnameA = 'a'
- objnameB = 'b'
+ zonegroup.period.update(z1, commit=True)
- # upload object in each zone and wait for sync.
- k = new_key(zcA, bucketA, objnameA)
- k.set_contents_from_string('foo')
- k = new_key(zcB, bucketA, objnameB)
- k.set_contents_from_string('foo')
+ # (b) bucket level
+ zc1, zc2 = zonegroup_conns.zones[0:2]
+ bucket = create_zone_bucket(zc1)
+ bucket_name = bucket.name
- zonegroup_meta_checkpoint(zonegroup)
- zone_data_checkpoint(zoneB, zoneA)
+ create_sync_policy_group(c1, "sync-bucket", "allowed", bucket_name)
+ set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucket_name)
+ get_sync_policy_group(c1, "sync-bucket", bucket_name)
- # verify that objnameA is synced to bucketA in zoneB
- bucket = get_bucket(zcB, bucketA.name)
- check_objects_exist(bucket, objnameA)
+ get_sync_policy(c1, bucket_name)
- # verify that objnameB is not synced to bucketA in zoneA
- bucket = get_bucket(zcA, bucketA.name)
- check_objects_not_exist(bucket, objnameB)
+ create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow1", zones, bucket_name)
+ create_sync_group_flow_directional(c1, "sync-bucket", "sync-flow2", z1.name, z2.name, bucket_name)
- log.debug('deleting object on zone A')
- k = get_key(zcA, bucket, objnameA)
- k.delete()
+ create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zones, zones, bucket_name)
+ get_sync_policy_group(c1, "sync-bucket", bucket_name)
- zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
+ zonegroup_meta_checkpoint(zonegroup)
- # delete bucket on zoneA. it should fail to delete
- log.debug('deleting bucket')
- assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name)
+ remove_sync_group_pipe(c1, "sync-bucket", "sync-pipe", bucket_name)
+ remove_sync_group_flow_directional(c1, "sync-bucket", "sync-flow2", z1.name, z2.name, bucket_name)
+ remove_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow1", zones, bucket_name)
+ remove_sync_policy_group(c1, "sync-bucket", bucket_name)
- assert check_all_buckets_exist(zcA, buckets)
- assert check_all_buckets_exist(zcB, buckets)
+ get_sync_policy(c1, bucket_name)
- log.debug('deleting object on zone B')
- k = get_key(zcB, bucket, objnameB)
- k.delete()
- time.sleep(config.checkpoint_delay)
+ zonegroup_meta_checkpoint(zonegroup)
- # retry deleting bucket after removing the object from zone B. should succeed
- log.debug('retry deleting bucket')
- zcA.delete_bucket(bucketA.name)
+ remove_sync_group_pipe(c1, "sync-group", "sync-pipe")
+ remove_sync_group_flow_directional(c1, "sync-group", "sync-flow2", z1.name, z2.name)
+ remove_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1")
+ remove_sync_policy_group(c1, "sync-group")
- zonegroup_meta_checkpoint(zonegroup)
+ get_sync_policy(c1)
- assert check_all_buckets_dont_exist(zcA, buckets)
- assert check_all_buckets_dont_exist(zcB, buckets)
+ zonegroup.period.update(z1, commit=True)
return
+@attr('fails_with_rgw')
@attr('sync_policy')
-def test_bucket_delete_with_bucket_sync_policy_symmetric():
+def test_sync_flow_symmetrical_zonegroup_all():
+ """
+ test_sync_flow_symmetrical_zonegroup_all:
+ allows sync from all the zones to all other zones (default case)
+ """
zonegroup = realm.master_zonegroup()
- zonegroup_conns = ZonegroupConns(zonegroup)
-
zonegroup_meta_checkpoint(zonegroup)
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
(zoneA, zoneB) = zonegroup.zones[0:2]
(zcA, zcB) = zonegroup_conns.zones[0:2]
c1 = zoneA.cluster
- # configure sync policy
- zones = zoneA.name + ',' + zoneB.name
c1.admin(['sync', 'policy', 'get'])
+
+ zones = zoneA.name + ',' + zoneB.name
create_sync_policy_group(c1, "sync-group")
- create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow", zones)
+ create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
- set_sync_policy_group_status(c1, "sync-group", "allowed")
+ set_sync_policy_group_status(c1, "sync-group", "enabled")
zonegroup.period.update(zoneA, commit=True)
get_sync_policy(c1)
- """
- configure symmetrical policy at bucketA level with src and dest
- zones specified to zoneA and zoneB resp.
- """
+ objnames = [ 'obj1', 'obj2' ]
+ content = 'asdasd'
+ buckets = []
- # configure sync policy for only bucketA and enable it
+ # create bucket & object in all zones
bucketA = create_zone_bucket(zcA)
- buckets = []
buckets.append(bucketA)
- create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
- create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow-bucket", zones, bucketA.name)
- create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zones, zones, bucketA.name)
- set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)
-
- get_sync_policy(c1, bucketA.name)
-
- zonegroup_meta_checkpoint(zonegroup)
-
- # create bucketA and objects in zoneA and zoneB
- objnameA = 'a'
- objnameB = 'b'
+ create_object(zcA, bucketA, objnames[0], content)
- # upload object in each zone and wait for sync.
- k = new_key(zcA, bucketA, objnameA)
- k.set_contents_from_string('foo')
- k = new_key(zcB, bucketA, objnameB)
- k.set_contents_from_string('foo')
+ bucketB = create_zone_bucket(zcB)
+ buckets.append(bucketB)
+ create_object(zcB, bucketB, objnames[1], content)
zonegroup_meta_checkpoint(zonegroup)
+ # 'zonegroup_data_checkpoint' currently fails for the zones not
+ # allowed to sync. So as a workaround, data checkpoint is done
+ # for only the ones configured.
zone_data_checkpoint(zoneB, zoneA)
- log.debug('deleting object A')
- k = get_key(zcA, bucketA, objnameA)
- k.delete()
-
- log.debug('deleting object B')
- k = get_key(zcA, bucketA, objnameB)
- k.delete()
-
- zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
- zone_data_checkpoint(zoneB, zoneA)
+    # verify if objects are synced across the zone
+ bucket = get_bucket(zcB, bucketA.name)
+ check_object_exists(bucket, objnames[0], content)
- # delete bucket on zoneA.
- log.debug('deleting bucket')
- zcA.delete_bucket(bucketA.name)
- zonegroup_meta_checkpoint(zonegroup)
+ bucket = get_bucket(zcA, bucketB.name)
+ check_object_exists(bucket, objnames[1], content)
- assert check_all_buckets_dont_exist(zcA, buckets)
- assert check_all_buckets_dont_exist(zcB, buckets)
+ remove_sync_policy_group(c1, "sync-group")
return
+@attr('fails_with_rgw')
@attr('sync_policy')
-def test_bucket_delete_with_zonegroup_sync_policy_symmetric():
+def test_sync_flow_symmetrical_zonegroup_select():
+ """
+ test_sync_flow_symmetrical_zonegroup_select:
+ allow sync between zoneA & zoneB
+    verify zoneC doesn't sync the data
+ """
zonegroup = realm.master_zonegroup()
zonegroup_conns = ZonegroupConns(zonegroup)
+ if len(zonegroup.zones) < 3:
+ raise SkipTest("test_sync_flow_symmetrical_zonegroup_select skipped. Requires 3 or more zones in master zonegroup.")
+
zonegroup_meta_checkpoint(zonegroup)
- (zoneA, zoneB) = zonegroup.zones[0:2]
- (zcA, zcB) = zonegroup_conns.zones[0:2]
+ (zoneA, zoneB, zoneC) = zonegroup.zones[0:3]
+ (zcA, zcB, zcC) = zonegroup_conns.zones[0:3]
c1 = zoneA.cluster
- # configure symmetric sync policy
+ # configure sync policy
zones = zoneA.name + ',' + zoneB.name
c1.admin(['sync', 'policy', 'get'])
create_sync_policy_group(c1, "sync-group")
zonegroup.period.update(zoneA, commit=True)
get_sync_policy(c1)
- # configure sync policy for only bucketA and enable it
- bucketA = create_zone_bucket(zcA)
buckets = []
- buckets.append(bucketA)
-
- time.sleep(config.checkpoint_delay)
- zonegroup_meta_checkpoint(zonegroup)
+ content = 'asdasd'
- # create bucketA and objects in zoneA and zoneB
- objnameA = 'a'
- objnameB = 'b'
+ # create bucketA & objects in zoneA
+ objnamesA = [ 'obj1', 'obj2', 'obj3' ]
+ bucketA = create_zone_bucket(zcA)
+ buckets.append(bucketA)
+ create_objects(zcA, bucketA, objnamesA, content)
- # upload object in each zone and wait for sync.
- k = new_key(zcA, bucketA, objnameA)
- k.set_contents_from_string('foo')
- k = new_key(zcB, bucketA, objnameB)
- k.set_contents_from_string('foo')
+ # create bucketB & objects in zoneB
+ objnamesB = [ 'obj4', 'obj5', 'obj6' ]
+ bucketB = create_zone_bucket(zcB)
+ buckets.append(bucketB)
+ create_objects(zcB, bucketB, objnamesB, content)
zonegroup_meta_checkpoint(zonegroup)
zone_data_checkpoint(zoneB, zoneA)
+ zone_data_checkpoint(zoneA, zoneB)
- log.debug('deleting object A')
- k = get_key(zcA, bucketA, objnameA)
- k.delete()
+ # verify if objnamesA synced to only zoneB but not zoneC
+ bucket = get_bucket(zcB, bucketA.name)
+ check_objects_exist(bucket, objnamesA, content)
- log.debug('deleting object B')
- k = get_key(zcA, bucketA, objnameB)
- k.delete()
+ bucket = get_bucket(zcC, bucketA.name)
+ check_objects_not_exist(bucket, objnamesA)
- zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
- zone_data_checkpoint(zoneB, zoneA)
+ # verify if objnamesB synced to only zoneA but not zoneC
+ bucket = get_bucket(zcA, bucketB.name)
+ check_objects_exist(bucket, objnamesB, content)
- # delete bucket on zoneA.
- log.debug('deleting bucket')
- zcA.delete_bucket(bucketA.name)
- zonegroup_meta_checkpoint(zonegroup)
+ bucket = get_bucket(zcC, bucketB.name)
+ check_objects_not_exist(bucket, objnamesB)
- assert check_all_buckets_dont_exist(zcA, buckets)
- assert check_all_buckets_dont_exist(zcB, buckets)
+ remove_sync_policy_group(c1, "sync-group")
return
+@attr('fails_with_rgw')
@attr('sync_policy')
-def test_bucket_delete_with_zonegroup_sync_policy_directional():
+def test_sync_flow_directional_zonegroup_select():
+ """
+ test_sync_flow_directional_zonegroup_select:
+ allow sync from only zoneA to zoneB
+
+ verify that data doesn't get synced to zoneC and
+ zoneA shouldn't sync data from zoneB either
+ """
zonegroup = realm.master_zonegroup()
zonegroup_conns = ZonegroupConns(zonegroup)
+ if len(zonegroup.zones) < 3:
+        raise SkipTest("test_sync_flow_directional_zonegroup_select skipped. Requires 3 or more zones in master zonegroup.")
+
zonegroup_meta_checkpoint(zonegroup)
- (zoneA, zoneB) = zonegroup.zones[0:2]
- (zcA, zcB) = zonegroup_conns.zones[0:2]
+ (zoneA, zoneB, zoneC) = zonegroup.zones[0:3]
+ (zcA, zcB, zcC) = zonegroup_conns.zones[0:3]
c1 = zoneA.cluster
zones = zoneA.name + ',' + zoneB.name
c1.admin(['sync', 'policy', 'get'])
create_sync_policy_group(c1, "sync-group")
- create_sync_group_flow_directional(c1, "sync-group", "sync-flow1", zoneA.name, zoneB.name)
+ create_sync_group_flow_directional(c1, "sync-group", "sync-flow", zoneA.name, zoneB.name)
create_sync_group_pipe(c1, "sync-group", "sync-pipe", zoneA.name, zoneB.name)
set_sync_policy_group_status(c1, "sync-group", "enabled")
zonegroup.period.update(zoneA, commit=True)
get_sync_policy(c1)
- # configure sync policy for only bucketA and enable it
- bucketA = create_zone_bucket(zcA)
buckets = []
- buckets.append(bucketA)
-
- time.sleep(config.checkpoint_delay)
- zonegroup_meta_checkpoint(zonegroup)
+ content = 'asdasd'
- # create bucketA and objects in zoneA and zoneB
- objnameA = 'a'
- objnameB = 'b'
+ # create bucketA & objects in zoneA
+ objnamesA = [ 'obj1', 'obj2', 'obj3' ]
+ bucketA = create_zone_bucket(zcA)
+ buckets.append(bucketA)
+ create_objects(zcA, bucketA, objnamesA, content)
- # upload object in each zone and wait for sync.
- k = new_key(zcA, bucketA, objnameA)
- k.set_contents_from_string('foo')
- k = new_key(zcB, bucketA, objnameB)
- k.set_contents_from_string('foo')
+ # create bucketB & objects in zoneB
+ objnamesB = [ 'obj4', 'obj5', 'obj6' ]
+ bucketB = create_zone_bucket(zcB)
+ buckets.append(bucketB)
+ create_objects(zcB, bucketB, objnamesB, content)
zonegroup_meta_checkpoint(zonegroup)
zone_data_checkpoint(zoneB, zoneA)
- # verify that objnameA is synced to bucketA in zoneB
+ # verify if objnamesA synced to only zoneB but not zoneC
bucket = get_bucket(zcB, bucketA.name)
- check_objects_exist(bucket, objnameA)
-
- # verify that objnameB is not synced to bucketA in zoneA
- bucket = get_bucket(zcA, bucketA.name)
- check_objects_not_exist(bucket, objnameB)
-
- log.debug('deleting object on zone A')
- k = get_key(zcA, bucket, objnameA)
- k.delete()
-
- zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
-
- # delete bucket on zoneA. it should fail to delete
- log.debug('deleting bucket')
- assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name)
-
- assert check_all_buckets_exist(zcA, buckets)
- assert check_all_buckets_exist(zcB, buckets)
-
- # retry deleting bucket after removing the object from zone B. should succeed
- log.debug('deleting object on zone B')
- k = get_key(zcB, bucket, objnameB)
- k.delete()
- time.sleep(config.checkpoint_delay)
-
- log.debug('retry deleting bucket')
- zcA.delete_bucket(bucketA.name)
+ check_objects_exist(bucket, objnamesA, content)
- zonegroup_meta_checkpoint(zonegroup)
+ bucket = get_bucket(zcC, bucketA.name)
+ check_objects_not_exist(bucket, objnamesA)
- assert check_all_buckets_dont_exist(zcA, buckets)
- assert check_all_buckets_dont_exist(zcB, buckets)
+ # verify if objnamesB are not synced to either zoneA or zoneC
+ bucket = get_bucket(zcA, bucketB.name)
+ check_objects_not_exist(bucket, objnamesB)
- return
+ bucket = get_bucket(zcC, bucketB.name)
+ check_objects_not_exist(bucket, objnamesB)
-@attr('fails_with_rgw')
-@attr('sync_policy')
-def test_sync_policy_config_zonegroup():
- """
- test_sync_policy_config_zonegroup:
- test configuration of all sync commands
"""
- zonegroup = realm.master_zonegroup()
- zonegroup_meta_checkpoint(zonegroup)
-
- zonegroup_conns = ZonegroupConns(zonegroup)
- z1, z2 = zonegroup.zones[0:2]
- c1, c2 = (z1.cluster, z2.cluster)
-
- zones = z1.name+","+z2.name
-
- c1.admin(['sync', 'policy', 'get'])
-
- # (a) zonegroup level
- create_sync_policy_group(c1, "sync-group")
- set_sync_policy_group_status(c1, "sync-group", "enabled")
- get_sync_policy_group(c1, "sync-group")
-
- get_sync_policy(c1)
+ verify the same at bucketA level
+ configure another policy at bucketA level with src and dest
+ zones specified to zoneA and zoneB resp.
+ verify zoneA bucketA syncs to zoneB BucketA but not viceversa.
+ """
+ # reconfigure zonegroup pipe & flow
+ remove_sync_group_pipe(c1, "sync-group", "sync-pipe")
+ remove_sync_group_flow_directional(c1, "sync-group", "sync-flow", zoneA.name, zoneB.name)
create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
- create_sync_group_flow_directional(c1, "sync-group", "sync-flow2", z1.name, z2.name)
-
create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
- get_sync_policy_group(c1, "sync-group")
-
- zonegroup.period.update(z1, commit=True)
-
- # (b) bucket level
- zc1, zc2 = zonegroup_conns.zones[0:2]
- bucket = create_zone_bucket(zc1)
- bucket_name = bucket.name
- create_sync_policy_group(c1, "sync-bucket", "allowed", bucket_name)
- set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucket_name)
- get_sync_policy_group(c1, "sync-bucket", bucket_name)
+ # change state to allowed
+ set_sync_policy_group_status(c1, "sync-group", "allowed")
- get_sync_policy(c1, bucket_name)
+ zonegroup.period.update(zoneA, commit=True)
+ get_sync_policy(c1)
- create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow1", zones, bucket_name)
- create_sync_group_flow_directional(c1, "sync-bucket", "sync-flow2", z1.name, z2.name, bucket_name)
+ # configure sync policy for only bucketA and enable it
+ create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
+ create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flowA", zones, bucketA.name)
+ args = ['--source-bucket=*', '--dest-bucket=*']
+ create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zoneA.name, zoneB.name, bucketA.name, args)
+ set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)
- create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zones, zones, bucket_name)
- get_sync_policy_group(c1, "sync-bucket", bucket_name)
+ get_sync_policy(c1, bucketA.name)
zonegroup_meta_checkpoint(zonegroup)
- remove_sync_group_pipe(c1, "sync-bucket", "sync-pipe", bucket_name)
- remove_sync_group_flow_directional(c1, "sync-bucket", "sync-flow2", z1.name, z2.name, bucket_name)
- remove_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow1", zones, bucket_name)
- remove_sync_policy_group(c1, "sync-bucket", bucket_name)
-
- get_sync_policy(c1, bucket_name)
+ # create objects in bucketA in zoneA and zoneB
+ objnamesC = [ 'obj7', 'obj8', 'obj9' ]
+ objnamesD = [ 'obj10', 'obj11', 'obj12' ]
+ create_objects(zcA, bucketA, objnamesC, content)
+ create_objects(zcB, bucketA, objnamesD, content)
zonegroup_meta_checkpoint(zonegroup)
+ zone_data_checkpoint(zoneB, zoneA)
- remove_sync_group_pipe(c1, "sync-group", "sync-pipe")
- remove_sync_group_flow_directional(c1, "sync-group", "sync-flow2", z1.name, z2.name)
- remove_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1")
- remove_sync_policy_group(c1, "sync-group")
-
- get_sync_policy(c1)
+ # verify that objnamesC are synced to bucketA in zoneB
+ bucket = get_bucket(zcB, bucketA.name)
+ check_objects_exist(bucket, objnamesC, content)
- zonegroup.period.update(z1, commit=True)
+ # verify that objnamesD are not synced to bucketA in zoneA
+ bucket = get_bucket(zcA, bucketA.name)
+ check_objects_not_exist(bucket, objnamesD)
+ remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
+ remove_sync_policy_group(c1, "sync-group")
return
@attr('fails_with_rgw')
@attr('sync_policy')
-def test_sync_flow_symmetrical_zonegroup_all():
+def test_sync_single_bucket():
"""
- test_sync_flow_symmetrical_zonegroup_all:
- allows sync from all the zones to all other zones (default case)
+ test_sync_single_bucket:
+ Allow data sync for only bucketA but not for other buckets via
+ below 2 methods
+
+ (a) zonegroup: symmetrical flow but configure pipe for only bucketA.
+ (b) bucket level: configure policy for bucketA
"""
zonegroup = realm.master_zonegroup()
c1.admin(['sync', 'policy', 'get'])
zones = zoneA.name + ',' + zoneB.name
- create_sync_policy_group(c1, "sync-group")
- create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
- create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
- set_sync_policy_group_status(c1, "sync-group", "enabled")
-
- zonegroup.period.update(zoneA, commit=True)
get_sync_policy(c1)
- objnames = [ 'obj1', 'obj2' ]
+ objnames = [ 'obj1', 'obj2', 'obj3' ]
content = 'asdasd'
buckets = []
- # create bucket & object in all zones
+ # create bucketA & bucketB in zoneA
bucketA = create_zone_bucket(zcA)
buckets.append(bucketA)
- create_object(zcA, bucketA, objnames[0], content)
-
- bucketB = create_zone_bucket(zcB)
+ bucketB = create_zone_bucket(zcA)
buckets.append(bucketB)
- create_object(zcB, bucketB, objnames[1], content)
zonegroup_meta_checkpoint(zonegroup)
- # 'zonegroup_data_checkpoint' currently fails for the zones not
- # allowed to sync. So as a workaround, data checkpoint is done
- # for only the ones configured.
+
+ """
+ Method (a): configure pipe for only bucketA
+ """
+ # configure sync policy & pipe for only bucketA
+ create_sync_policy_group(c1, "sync-group")
+ create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
+ args = ['--source-bucket=' + bucketA.name, '--dest-bucket=' + bucketA.name]
+
+ create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones, None, args)
+ set_sync_policy_group_status(c1, "sync-group", "enabled")
+ get_sync_policy(c1)
+ zonegroup.period.update(zoneA, commit=True)
+
+ sync_info(c1)
+
+ # create objects in bucketA & bucketB
+ create_objects(zcA, bucketA, objnames, content)
+    create_objects(zcA, bucketB, objnames, content)
+
+ zonegroup_meta_checkpoint(zonegroup)
zone_data_checkpoint(zoneB, zoneA)
- # verify if objects are synced accross the zone
+ # verify if bucketA objects are synced
bucket = get_bucket(zcB, bucketA.name)
- check_object_exists(bucket, objnames[0], content)
+ check_objects_exist(bucket, objnames, content)
- bucket = get_bucket(zcA, bucketB.name)
- check_object_exists(bucket, objnames[1], content)
+ # bucketB objects should not be synced
+ bucket = get_bucket(zcB, bucketB.name)
+ check_objects_not_exist(bucket, objnames)
- remove_sync_policy_group(c1, "sync-group")
- return
-@attr('fails_with_rgw')
-@attr('sync_policy')
-def test_sync_flow_symmetrical_zonegroup_select():
"""
- test_sync_flow_symmetrical_zonegroup_select:
- allow sync between zoneA & zoneB
- verify zoneC doesnt sync the data
+ Method (b): configure policy at only bucketA level
"""
+ # reconfigure group pipe
+ remove_sync_group_pipe(c1, "sync-group", "sync-pipe")
+ create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
- zonegroup = realm.master_zonegroup()
- zonegroup_conns = ZonegroupConns(zonegroup)
+ # change state to allowed
+ set_sync_policy_group_status(c1, "sync-group", "allowed")
- if len(zonegroup.zones) < 3:
- raise SkipTest("test_sync_flow_symmetrical_zonegroup_select skipped. Requires 3 or more zones in master zonegroup.")
-
- zonegroup_meta_checkpoint(zonegroup)
-
- (zoneA, zoneB, zoneC) = zonegroup.zones[0:3]
- (zcA, zcB, zcC) = zonegroup_conns.zones[0:3]
-
- c1 = zoneA.cluster
-
- # configure sync policy
- zones = zoneA.name + ',' + zoneB.name
- c1.admin(['sync', 'policy', 'get'])
- create_sync_policy_group(c1, "sync-group")
- create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow", zones)
- create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
- set_sync_policy_group_status(c1, "sync-group", "enabled")
-
- zonegroup.period.update(zoneA, commit=True)
- get_sync_policy(c1)
-
- buckets = []
- content = 'asdasd'
-
- # create bucketA & objects in zoneA
- objnamesA = [ 'obj1', 'obj2', 'obj3' ]
- bucketA = create_zone_bucket(zcA)
- buckets.append(bucketA)
- create_objects(zcA, bucketA, objnamesA, content)
-
- # create bucketB & objects in zoneB
- objnamesB = [ 'obj4', 'obj5', 'obj6' ]
- bucketB = create_zone_bucket(zcB)
- buckets.append(bucketB)
- create_objects(zcB, bucketB, objnamesB, content)
-
- zonegroup_meta_checkpoint(zonegroup)
- zone_data_checkpoint(zoneB, zoneA)
- zone_data_checkpoint(zoneA, zoneB)
-
- # verify if objnamesA synced to only zoneB but not zoneC
- bucket = get_bucket(zcB, bucketA.name)
- check_objects_exist(bucket, objnamesA, content)
-
- bucket = get_bucket(zcC, bucketA.name)
- check_objects_not_exist(bucket, objnamesA)
-
- # verify if objnamesB synced to only zoneA but not zoneC
- bucket = get_bucket(zcA, bucketB.name)
- check_objects_exist(bucket, objnamesB, content)
-
- bucket = get_bucket(zcC, bucketB.name)
- check_objects_not_exist(bucket, objnamesB)
-
- remove_sync_policy_group(c1, "sync-group")
- return
-
-@attr('fails_with_rgw')
-@attr('sync_policy')
-def test_sync_flow_directional_zonegroup_select():
- """
- test_sync_flow_directional_zonegroup_select:
- allow sync from only zoneA to zoneB
-
- verify that data doesn't get synced to zoneC and
- zoneA shouldn't sync data from zoneB either
- """
-
- zonegroup = realm.master_zonegroup()
- zonegroup_conns = ZonegroupConns(zonegroup)
-
- if len(zonegroup.zones) < 3:
- raise SkipTest("test_sync_flow_symmetrical_zonegroup_select skipped. Requires 3 or more zones in master zonegroup.")
-
- zonegroup_meta_checkpoint(zonegroup)
-
- (zoneA, zoneB, zoneC) = zonegroup.zones[0:3]
- (zcA, zcB, zcC) = zonegroup_conns.zones[0:3]
-
- c1 = zoneA.cluster
-
- # configure sync policy
- zones = zoneA.name + ',' + zoneB.name
- c1.admin(['sync', 'policy', 'get'])
- create_sync_policy_group(c1, "sync-group")
- create_sync_group_flow_directional(c1, "sync-group", "sync-flow", zoneA.name, zoneB.name)
- create_sync_group_pipe(c1, "sync-group", "sync-pipe", zoneA.name, zoneB.name)
- set_sync_policy_group_status(c1, "sync-group", "enabled")
-
- zonegroup.period.update(zoneA, commit=True)
- get_sync_policy(c1)
-
- buckets = []
- content = 'asdasd'
-
- # create bucketA & objects in zoneA
- objnamesA = [ 'obj1', 'obj2', 'obj3' ]
- bucketA = create_zone_bucket(zcA)
- buckets.append(bucketA)
- create_objects(zcA, bucketA, objnamesA, content)
-
- # create bucketB & objects in zoneB
- objnamesB = [ 'obj4', 'obj5', 'obj6' ]
- bucketB = create_zone_bucket(zcB)
- buckets.append(bucketB)
- create_objects(zcB, bucketB, objnamesB, content)
-
- zonegroup_meta_checkpoint(zonegroup)
- zone_data_checkpoint(zoneB, zoneA)
-
- # verify if objnamesA synced to only zoneB but not zoneC
- bucket = get_bucket(zcB, bucketA.name)
- check_objects_exist(bucket, objnamesA, content)
-
- bucket = get_bucket(zcC, bucketA.name)
- check_objects_not_exist(bucket, objnamesA)
-
- # verify if objnamesB are not synced to either zoneA or zoneC
- bucket = get_bucket(zcA, bucketB.name)
- check_objects_not_exist(bucket, objnamesB)
-
- bucket = get_bucket(zcC, bucketB.name)
- check_objects_not_exist(bucket, objnamesB)
-
- """
- verify the same at bucketA level
- configure another policy at bucketA level with src and dest
- zones specified to zoneA and zoneB resp.
-
- verify zoneA bucketA syncs to zoneB BucketA but not viceversa.
- """
- # reconfigure zonegroup pipe & flow
- remove_sync_group_pipe(c1, "sync-group", "sync-pipe")
- remove_sync_group_flow_directional(c1, "sync-group", "sync-flow", zoneA.name, zoneB.name)
- create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
- create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
-
- # change state to allowed
- set_sync_policy_group_status(c1, "sync-group", "allowed")
-
- zonegroup.period.update(zoneA, commit=True)
- get_sync_policy(c1)
-
- # configure sync policy for only bucketA and enable it
- create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
- create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flowA", zones, bucketA.name)
- args = ['--source-bucket=*', '--dest-bucket=*']
- create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zoneA.name, zoneB.name, bucketA.name, args)
- set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)
-
- get_sync_policy(c1, bucketA.name)
-
- zonegroup_meta_checkpoint(zonegroup)
-
- # create objects in bucketA in zoneA and zoneB
- objnamesC = [ 'obj7', 'obj8', 'obj9' ]
- objnamesD = [ 'obj10', 'obj11', 'obj12' ]
- create_objects(zcA, bucketA, objnamesC, content)
- create_objects(zcB, bucketA, objnamesD, content)
-
- zonegroup_meta_checkpoint(zonegroup)
- zone_data_checkpoint(zoneB, zoneA)
-
- # verify that objnamesC are synced to bucketA in zoneB
- bucket = get_bucket(zcB, bucketA.name)
- check_objects_exist(bucket, objnamesC, content)
-
- # verify that objnamesD are not synced to bucketA in zoneA
- bucket = get_bucket(zcA, bucketA.name)
- check_objects_not_exist(bucket, objnamesD)
-
- remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
- remove_sync_policy_group(c1, "sync-group")
- return
-
-@attr('fails_with_rgw')
-@attr('sync_policy')
-def test_sync_single_bucket():
- """
- test_sync_single_bucket:
- Allow data sync for only bucketA but not for other buckets via
- below 2 methods
-
- (a) zonegroup: symmetrical flow but configure pipe for only bucketA.
- (b) bucket level: configure policy for bucketA
- """
-
- zonegroup = realm.master_zonegroup()
- zonegroup_meta_checkpoint(zonegroup)
-
- zonegroup_conns = ZonegroupConns(zonegroup)
-
- (zoneA, zoneB) = zonegroup.zones[0:2]
- (zcA, zcB) = zonegroup_conns.zones[0:2]
-
- c1 = zoneA.cluster
-
- c1.admin(['sync', 'policy', 'get'])
-
- zones = zoneA.name + ',' + zoneB.name
- get_sync_policy(c1)
-
- objnames = [ 'obj1', 'obj2', 'obj3' ]
- content = 'asdasd'
- buckets = []
-
- # create bucketA & bucketB in zoneA
- bucketA = create_zone_bucket(zcA)
- buckets.append(bucketA)
- bucketB = create_zone_bucket(zcA)
- buckets.append(bucketB)
-
- zonegroup_meta_checkpoint(zonegroup)
-
- """
- Method (a): configure pipe for only bucketA
- """
- # configure sync policy & pipe for only bucketA
- create_sync_policy_group(c1, "sync-group")
- create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
- args = ['--source-bucket=' + bucketA.name, '--dest-bucket=' + bucketA.name]
-
- create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones, None, args)
- set_sync_policy_group_status(c1, "sync-group", "enabled")
- get_sync_policy(c1)
- zonegroup.period.update(zoneA, commit=True)
-
- sync_info(c1)
-
- # create objects in bucketA & bucketB
- create_objects(zcA, bucketA, objnames, content)
- create_object(zcA, bucketB, objnames, content)
-
- zonegroup_meta_checkpoint(zonegroup)
- zone_data_checkpoint(zoneB, zoneA)
-
- # verify if bucketA objects are synced
- bucket = get_bucket(zcB, bucketA.name)
- check_objects_exist(bucket, objnames, content)
-
- # bucketB objects should not be synced
- bucket = get_bucket(zcB, bucketB.name)
- check_objects_not_exist(bucket, objnames)
-
-
- """
- Method (b): configure policy at only bucketA level
- """
- # reconfigure group pipe
- remove_sync_group_pipe(c1, "sync-group", "sync-pipe")
- create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
-
- # change state to allowed
- set_sync_policy_group_status(c1, "sync-group", "allowed")
-
- zonegroup.period.update(zoneA, commit=True)
- get_sync_policy(c1)
+ zonegroup.period.update(zoneA, commit=True)
+ get_sync_policy(c1)
# configure sync policy for only bucketA and enable it
# check that object exists in destination bucket
k = get_key(dest, dest_bucket, objname)
assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
+
+@allow_bucket_replication
+def test_bucket_replication_reject_versioning_identical():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ source = zonegroup_conns.non_account_rw_zones[0]
+ dest = zonegroup_conns.non_account_rw_zones[1]
+
+ source_bucket = source.create_bucket(gen_bucket_name())
+ dest_bucket = dest.create_bucket(gen_bucket_name())
+ source.s3_client.put_bucket_versioning(
+ Bucket=source_bucket.name,
+ VersioningConfiguration={'Status': 'Enabled'}
+ )
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # create replication configuration
+ e = assert_raises(ClientError,
+ source.s3_client.put_bucket_replication,
+ Bucket=source_bucket.name,
+ ReplicationConfiguration={
+ 'Role': '',
+ 'Rules': [{
+ 'ID': 'rule1',
+ 'Status': 'Enabled',
+ 'Destination': {
+ 'Bucket': f'arn:aws:s3:::{dest_bucket.name}',
+ }
+ }]
+ })
+ assert e.response['ResponseMetadata']['HTTPStatusCode'] == 400
+
+@allow_bucket_replication
+def test_bucket_replicaion_reject_objectlock_identical():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ source = zonegroup_conns.non_account_rw_zones[0]
+ dest = zonegroup_conns.non_account_rw_zones[1]
+
+ source_bucket = source.create_bucket(gen_bucket_name())
+ dest_bucket_name = gen_bucket_name()
+ dest.s3_client.create_bucket(Bucket=dest_bucket_name, ObjectLockEnabledForBucket=True)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # create replication configuration
+ e = assert_raises(ClientError,
+ source.s3_client.put_bucket_replication,
+ Bucket=source_bucket.name,
+ ReplicationConfiguration={
+ 'Role': '',
+ 'Rules': [{
+ 'ID': 'rule1',
+ 'Status': 'Enabled',
+ 'Destination': {
+ 'Bucket': f'arn:aws:s3:::{dest_bucket_name}',
+ }
+ }]
+ })
+ assert e.response['ResponseMetadata']['HTTPStatusCode'] == 400
+
+@allow_bucket_replication
+def test_bucket_replication_non_versioned_to_versioned():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ source = zonegroup_conns.non_account_rw_zones[0]
+ dest = zonegroup_conns.non_account_rw_zones[1]
+
+ source_bucket = source.create_bucket(gen_bucket_name())
+ dest_bucket = dest.create_bucket(gen_bucket_name())
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # create replication configuration
+ response = source.s3_client.put_bucket_replication(
+ Bucket=source_bucket.name,
+ ReplicationConfiguration={
+ 'Role': '',
+ 'Rules': [
+ {
+ 'ID': 'rule1',
+ 'Status': 'Enabled',
+ 'Destination': {
+ 'Bucket': f'arn:aws:s3:::{dest_bucket.name}',
+ }
+ }
+ ]
+ }
+ )
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # enable versioning on destination bucket
+ dest.s3_client.put_bucket_versioning(
+ Bucket=dest_bucket.name,
+ VersioningConfiguration={'Status': 'Enabled'}
+ )
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # upload an object and wait for sync.
+ objname = 'dummy'
+ k = new_key(source, source_bucket, objname)
+ k.set_contents_from_string('foo')
+ zone_data_checkpoint(dest.zone, source.zone)
+
+    # check that the object does not exist in the destination bucket
+ e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
+ assert e.response['Error']['Code'] == 'NoSuchKey'
+
+@allow_bucket_replication
+def test_bucket_replication_versioned_to_non_versioned():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ source = zonegroup_conns.non_account_rw_zones[0]
+ dest = zonegroup_conns.non_account_rw_zones[1]
+
+ source_bucket = source.create_bucket(gen_bucket_name())
+ dest_bucket = dest.create_bucket(gen_bucket_name())
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # create replication configuration
+ response = source.s3_client.put_bucket_replication(
+ Bucket=source_bucket.name,
+ ReplicationConfiguration={
+ 'Role': '',
+ 'Rules': [
+ {
+ 'ID': 'rule1',
+ 'Status': 'Enabled',
+ 'Destination': {
+ 'Bucket': f'arn:aws:s3:::{dest_bucket.name}',
+ }
+ }
+ ]
+ }
+ )
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # enable versioning on source bucket
+ source.s3_client.put_bucket_versioning(
+ Bucket=source_bucket.name,
+ VersioningConfiguration={'Status': 'Enabled'}
+ )
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # upload an object and wait for sync.
+ objname = 'dummy'
+ k = new_key(source, source_bucket, objname)
+ k.set_contents_from_string('foo')
+ zone_data_checkpoint(dest.zone, source.zone)
+
+    # check that the object does not exist in the destination bucket
+ e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
+ assert e.response['Error']['Code'] == 'NoSuchKey'
+
+@allow_bucket_replication
+def test_bucket_replication_lock_enabled_to_lock_disabled():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ source = zonegroup_conns.non_account_rw_zones[0]
+ dest = zonegroup_conns.non_account_rw_zones[1]
+
+ source_bucket_name = gen_bucket_name()
+ source.create_bucket(source_bucket_name)
+    # enable versioning on the source bucket
+ source.s3_client.put_bucket_versioning(
+ Bucket=source_bucket_name,
+ VersioningConfiguration={'Status': 'Enabled'}
+ )
+ dest_bucket = dest.create_bucket(gen_bucket_name())
+    # enable versioning on the destination bucket
+ dest.s3_client.put_bucket_versioning(
+ Bucket=dest_bucket.name,
+ VersioningConfiguration={'Status': 'Enabled'}
+ )
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # create replication configuration
+ source.s3_client.put_bucket_replication(
+ Bucket=source_bucket_name,
+ ReplicationConfiguration={
+ 'Role': '',
+ 'Rules': [{
+ 'ID': 'rule1',
+ 'Status': 'Enabled',
+ 'Destination': {
+ 'Bucket': f'arn:aws:s3:::{dest_bucket.name}',
+ }
+ }]
+ }
+ )
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # enable object lock on source bucket
+ source.s3_client.put_object_lock_configuration(
+ Bucket=source_bucket_name,
+ ObjectLockConfiguration={
+ 'ObjectLockEnabled': 'Enabled',
+ 'Rule': {
+ 'DefaultRetention': {
+ 'Mode': 'GOVERNANCE',
+ 'Days': 1
+ }
+ }
+ }
+ )
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # upload an object and wait for sync.
+ objname = 'dummy'
+ k = new_key(source, source_bucket_name, objname)
+ k.set_contents_from_string('foo')
+ zone_data_checkpoint(dest.zone, source.zone)
+
+ # check that object does not exist in destination bucket
+ e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
+ assert e.response['Error']['Code'] == 'NoSuchKey'
+
+@allow_bucket_replication
+def test_bucket_replication_lock_disabled_to_lock_enabled():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ source = zonegroup_conns.non_account_rw_zones[0]
+ dest = zonegroup_conns.non_account_rw_zones[1]
+
+ source_bucket = source.create_bucket(gen_bucket_name())
+    # enable versioning on the source bucket
+ source.s3_client.put_bucket_versioning(
+ Bucket=source_bucket.name,
+ VersioningConfiguration={'Status': 'Enabled'}
+ )
+ dest_bucket_name = gen_bucket_name()
+ dest.create_bucket(dest_bucket_name)
+    # enable versioning on the destination bucket
+ dest.s3_client.put_bucket_versioning(
+ Bucket=dest_bucket_name,
+ VersioningConfiguration={'Status': 'Enabled'}
+ )
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # create replication configuration
+ source.s3_client.put_bucket_replication(
+ Bucket=source_bucket.name,
+ ReplicationConfiguration={
+ 'Role': '',
+ 'Rules': [{
+ 'ID': 'rule1',
+ 'Status': 'Enabled',
+ 'Destination': {
+ 'Bucket': f'arn:aws:s3:::{dest_bucket_name}',
+ }
+ }]
+ }
+ )
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # enable object lock on destination bucket
+ dest.s3_client.put_object_lock_configuration(
+ Bucket=dest_bucket_name,
+ ObjectLockConfiguration={
+ 'ObjectLockEnabled': 'Enabled',
+ 'Rule': {
+ 'DefaultRetention': {
+ 'Mode': 'GOVERNANCE',
+ 'Days': 1
+ }
+ }
+ }
+ )
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # upload an object and wait for sync.
+ objname = 'dummy'
+ k = new_key(source, source_bucket.name, objname)
+ k.set_contents_from_string('foo')
+ zone_data_checkpoint(dest.zone, source.zone)
+
+ # check that object does not exist in destination bucket
+ e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket_name, Key=objname)
+ assert e.response['Error']['Code'] == 'NoSuchKey'
+
+@attr('sync_policy')
+@attr('fails_with_rgw')
+def test_bucket_delete_with_zonegroup_sync_policy_directional():
+
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ (zoneA, zoneB) = zonegroup.zones[0:2]
+ (zcA, zcB) = zonegroup_conns.zones[0:2]
+
+ c1 = zoneA.cluster
+
+ # configure sync policy
+ zones = zoneA.name + ',' + zoneB.name
+ c1.admin(['sync', 'policy', 'get'])
+ create_sync_policy_group(c1, "sync-group")
+ create_sync_group_flow_directional(c1, "sync-group", "sync-flow1", zoneA.name, zoneB.name)
+ create_sync_group_pipe(c1, "sync-group", "sync-pipe", zoneA.name, zoneB.name)
+ set_sync_policy_group_status(c1, "sync-group", "enabled")
+
+ zonegroup.period.update(zoneA, commit=True)
+ get_sync_policy(c1)
+
+ # configure sync policy for only bucketA and enable it
+ bucketA = create_zone_bucket(zcA)
+ buckets = []
+ buckets.append(bucketA)
+
+ time.sleep(config.checkpoint_delay)
+ zonegroup_meta_checkpoint(zonegroup)
+
+    # object names to upload from zoneA and zoneB (bucketA was created above)
+ objnameA = 'a'
+ objnameB = 'b'
+
+ # upload object in each zone and wait for sync.
+ k = new_key(zcA, bucketA, objnameA)
+ k.set_contents_from_string('foo')
+ k = new_key(zcB, bucketA, objnameB)
+ k.set_contents_from_string('foo')
+
+ zonegroup_meta_checkpoint(zonegroup)
+ zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
+ zone_data_checkpoint(zoneB, zoneA)
+
+ # verify that objnameA is synced to bucketA in zoneB
+ bucket = get_bucket(zcB, bucketA.name)
+ check_objects_exist(bucket, objnameA)
+
+ # verify that objnameB is not synced to bucketA in zoneA
+ bucket = get_bucket(zcA, bucketA.name)
+ check_objects_not_exist(bucket, objnameB)
+
+ log.debug('deleting object on zone A')
+ k = get_key(zcA, bucket, objnameA)
+ k.delete()
+
+ zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
+
+ # delete bucket on zoneA. it should fail to delete
+ log.debug('deleting bucket')
+ assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name)
+
+ assert check_all_buckets_exist(zcA, buckets)
+ assert check_all_buckets_exist(zcB, buckets)
+
+ # retry deleting bucket after removing the object from zone B. should succeed
+ log.debug('deleting object on zone B')
+ k = get_key(zcB, bucket, objnameB)
+ k.delete()
+ time.sleep(config.checkpoint_delay)
+
+ log.debug('retry deleting bucket')
+ zcA.delete_bucket(bucketA.name)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ assert check_all_buckets_dont_exist(zcA, buckets)
+ assert check_all_buckets_dont_exist(zcB, buckets)
+
+ remove_sync_policy_group(c1, "sync-group")
+
+ return
+
+@attr('sync_policy')
+@attr('fails_with_rgw')
+def test_bucket_delete_with_bucket_sync_policy_directional():
+
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ (zoneA, zoneB) = zonegroup.zones[0:2]
+ (zcA, zcB) = zonegroup_conns.zones[0:2]
+
+ c1 = zoneA.cluster
+
+ # configure sync policy
+ zones = zoneA.name + ',' + zoneB.name
+ c1.admin(['sync', 'policy', 'get'])
+ create_sync_policy_group(c1, "sync-group")
+ create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow", zones)
+ create_sync_group_pipe(c1, "sync-group", "sync-pipe", zoneA.name, zoneB.name)
+ set_sync_policy_group_status(c1, "sync-group", "allowed")
+
+ zonegroup.period.update(zoneA, commit=True)
+ get_sync_policy(c1)
+
+ """
+ configure policy at bucketA level with src and dest
+ zones specified to zoneA and zoneB resp.
+
+    verify zoneA bucketA syncs to zoneB bucketA but not vice versa.
+ """
+
+ # configure sync policy for only bucketA and enable it
+ bucketA = create_zone_bucket(zcA)
+ buckets = []
+ buckets.append(bucketA)
+ create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
+ create_sync_group_flow_directional(c1, "sync-bucket", "sync-flow-bucket", zoneA.name, zoneB.name, bucketA.name)
+ #create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow-bucket", zones, bucketA.name)
+ create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zoneA.name, zoneB.name, bucketA.name)
+ set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)
+
+ get_sync_policy(c1, bucketA.name)
+ time.sleep(config.checkpoint_delay)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+    # object names to upload from zoneA and zoneB (bucketA was created above)
+ objnameA = 'a'
+ objnameB = 'b'
+
+ # upload object in each zone and wait for sync.
+ k = new_key(zcA, bucketA, objnameA)
+ k.set_contents_from_string('foo')
+ k = new_key(zcB, bucketA, objnameB)
+ k.set_contents_from_string('foo')
+
+ zonegroup_meta_checkpoint(zonegroup)
+ zone_data_checkpoint(zoneB, zoneA)
+
+ # verify that objnameA is synced to bucketA in zoneB
+ bucket = get_bucket(zcB, bucketA.name)
+ check_objects_exist(bucket, objnameA)
+
+ # verify that objnameB is not synced to bucketA in zoneA
+ bucket = get_bucket(zcA, bucketA.name)
+ check_objects_not_exist(bucket, objnameB)
+
+ log.debug('deleting object on zone A')
+ k = get_key(zcA, bucket, objnameA)
+ k.delete()
+
+ zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
+
+ # delete bucket on zoneA. it should fail to delete
+ log.debug('deleting bucket')
+ assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name)
+
+ assert check_all_buckets_exist(zcA, buckets)
+ assert check_all_buckets_exist(zcB, buckets)
+
+ log.debug('deleting object on zone B')
+ k = get_key(zcB, bucket, objnameB)
+ k.delete()
+ time.sleep(config.checkpoint_delay)
+
+ # retry deleting bucket after removing the object from zone B. should succeed
+ log.debug('retry deleting bucket')
+ zcA.delete_bucket(bucketA.name)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ assert check_all_buckets_dont_exist(zcA, buckets)
+ assert check_all_buckets_dont_exist(zcB, buckets)
+
+ remove_sync_policy_group(c1, "sync-group")
+
+ return
+
+@attr('sync_policy')
+@attr('fails_with_rgw')
+def test_bucket_delete_with_bucket_sync_policy_symmetric():
+
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ (zoneA, zoneB) = zonegroup.zones[0:2]
+ (zcA, zcB) = zonegroup_conns.zones[0:2]
+
+ c1 = zoneA.cluster
+
+ # configure sync policy
+ zones = zoneA.name + ',' + zoneB.name
+ c1.admin(['sync', 'policy', 'get'])
+ create_sync_policy_group(c1, "sync-group")
+ create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow", zones)
+ create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
+ set_sync_policy_group_status(c1, "sync-group", "allowed")
+
+ zonegroup.period.update(zoneA, commit=True)
+ get_sync_policy(c1)
+
+ """
+ configure symmetrical policy at bucketA level with src and dest
+ zones specified to zoneA and zoneB resp.
+ """
+
+ # configure sync policy for only bucketA and enable it
+ bucketA = create_zone_bucket(zcA)
+ buckets = []
+ buckets.append(bucketA)
+ create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
+ create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow-bucket", zones, bucketA.name)
+ create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zones, zones, bucketA.name)
+ set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)
+
+ get_sync_policy(c1, bucketA.name)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+    # object names to upload from zoneA and zoneB (bucketA was created above)
+ objnameA = 'a'
+ objnameB = 'b'
+
+ # upload object in each zone and wait for sync.
+ k = new_key(zcA, bucketA, objnameA)
+ k.set_contents_from_string('foo')
+ k = new_key(zcB, bucketA, objnameB)
+ k.set_contents_from_string('foo')
+
+ zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
+ zone_data_checkpoint(zoneB, zoneA)
+
+ log.debug('deleting object A')
+ k = get_key(zcA, bucketA, objnameA)
+ k.delete()
+
+ log.debug('deleting object B')
+ k = get_key(zcA, bucketA, objnameB)
+ k.delete()
+
+ zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
+ zone_data_checkpoint(zoneB, zoneA)
+
+ # delete bucket on zoneA.
+ log.debug('deleting bucket')
+ zcA.delete_bucket(bucketA.name)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ assert check_all_buckets_dont_exist(zcA, buckets)
+ assert check_all_buckets_dont_exist(zcB, buckets)
+
+ remove_sync_policy_group(c1, "sync-group")
+ return
+
+@attr('sync_policy')
+@attr('fails_with_rgw')
+def test_bucket_delete_with_zonegroup_sync_policy_symmetric():
+
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ (zoneA, zoneB) = zonegroup.zones[0:2]
+ (zcA, zcB) = zonegroup_conns.zones[0:2]
+
+ c1 = zoneA.cluster
+
+ # configure symmetric sync policy
+ zones = zoneA.name + ',' + zoneB.name
+ c1.admin(['sync', 'policy', 'get'])
+ create_sync_policy_group(c1, "sync-group")
+ create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow", zones)
+ create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
+ set_sync_policy_group_status(c1, "sync-group", "enabled")
+
+ zonegroup.period.update(zoneA, commit=True)
+ get_sync_policy(c1)
+
+ # configure sync policy for only bucketA and enable it
+ bucketA = create_zone_bucket(zcA)
+ buckets = []
+ buckets.append(bucketA)
+
+ time.sleep(config.checkpoint_delay)
+ zonegroup_meta_checkpoint(zonegroup)
+
+    # object names to upload from zoneA and zoneB (bucketA was created above)
+ objnameA = 'a'
+ objnameB = 'b'
+
+ # upload object in each zone and wait for sync.
+ k = new_key(zcA, bucketA, objnameA)
+ k.set_contents_from_string('foo')
+ k = new_key(zcB, bucketA, objnameB)
+ k.set_contents_from_string('foo')
+
+ zone_data_checkpoint(zoneB, zoneA)
+ zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
+
+ log.debug('deleting object A')
+ k = get_key(zcA, bucketA, objnameA)
+ k.delete()
+
+ log.debug('deleting object B')
+ k = get_key(zcA, bucketA, objnameB)
+ k.delete()
+
+ zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
+ zone_data_checkpoint(zoneB, zoneA)
+
+ # delete bucket on zoneA.
+ log.debug('deleting bucket')
+ zcA.delete_bucket(bucketA.name)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ assert check_all_buckets_dont_exist(zcA, buckets)
+ assert check_all_buckets_dont_exist(zcB, buckets)
+
+ remove_sync_flow(c1, "sync-group", "sync-flow", "symmetrical")
+ remove_sync_policy_group(c1, "sync-group")
+ return
+
+@attr('sync_policy')
+@attr('fails_with_rgw')
+def test_delete_bucket_with_zone_opt_out():
+ """
+ test_delete_bucket_with_zone_opt_out:
+ allow sync between zoneA & zoneB
+    verify zoneC doesn't sync the data
+ test delete bucket
+ """
+
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ if len(zonegroup.zones) < 3:
+ raise SkipTest("test_sync_flow_symmetrical_zonegroup_select skipped. Requires 3 or more zones in master zonegroup.")
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ (zoneA, zoneB, zoneC) = zonegroup.zones[0:3]
+ (zcA, zcB, zcC) = zonegroup_conns.zones[0:3]
+
+ c1 = zoneA.cluster
+
+ # configure sync policy
+ zones = zoneA.name + ',' + zoneB.name
+ c1.admin(['sync', 'policy', 'get'])
+ create_sync_policy_group(c1, "sync-group")
+ create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow", zones)
+ create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
+ set_sync_policy_group_status(c1, "sync-group", "enabled")
+
+ zonegroup.period.update(zoneA, commit=True)
+ get_sync_policy(c1)
+
+ bucketA = create_zone_bucket(zcA)
+ buckets = []
+ buckets.append(bucketA)
+
+ time.sleep(config.checkpoint_delay)
+ zonegroup_meta_checkpoint(zonegroup)
+
+    # object names to upload from zoneA and zoneC (bucketA was created above)
+ objnameA = 'a'
+ objnameB = 'b'
+
+ # upload object in zone A and zone C
+ k = new_key(zcA, bucketA, objnameA)
+ k.set_contents_from_string('foo')
+ k = new_key(zcC, bucketA, objnameB)
+ k.set_contents_from_string('foo')
+
+ zonegroup_meta_checkpoint(zonegroup)
+ zone_data_checkpoint(zoneB, zoneA)
+
+ # verify that objnameA is synced to zoneB but not zoneC
+ bucket = get_bucket(zcB, bucketA.name)
+ check_objects_exist(bucket, objnameA)
+
+ bucket = get_bucket(zcC, bucketA.name)
+ check_objects_not_exist(bucket, objnameA)
+
+ # verify that objnameB is not synced to either zoneA or zoneB
+ bucket = get_bucket(zcA, bucketA.name)
+ check_objects_not_exist(bucket, objnameB)
+
+ bucket = get_bucket(zcB, bucketA.name)
+ check_objects_not_exist(bucket, objnameB)
+
+ log.debug('deleting object on zone A')
+ k = get_key(zcA, bucket, objnameA)
+ k.delete()
+
+ zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
+
+ # delete bucket on zoneA. it should fail to delete because zoneC still has objnameB
+ log.debug('deleting bucket')
+ assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name)
+
+ assert check_all_buckets_exist(zcA, buckets)
+ assert check_all_buckets_exist(zcC, buckets)
+
+ # retry deleting bucket after removing the object from zone C. should succeed
+ log.debug('deleting object on zone C')
+ k = get_key(zcC, bucket, objnameB)
+ k.delete()
+ time.sleep(config.checkpoint_delay)
+
+ log.debug('retry deleting bucket')
+ zcA.delete_bucket(bucketA.name)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ assert check_all_buckets_dont_exist(zcA, buckets)
+ assert check_all_buckets_dont_exist(zcC, buckets)
+
+ remove_sync_policy_group(c1, "sync-group")
+
+ return
+
+@attr('sync_policy')
+@attr('fails_with_rgw')
+def test_bucket_delete_with_sync_policy_object_prefix():
+
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ (zoneA, zoneB) = zonegroup.zones[0:2]
+ (zcA, zcB) = zonegroup_conns.zones[0:2]
+
+ c1 = zoneA.cluster
+
+ # configure sync policy
+ zones = zoneA.name + ',' + zoneB.name
+ c1.admin(['sync', 'policy', 'get'])
+ create_sync_policy_group(c1, "sync-group")
+ create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow", zones)
+ create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
+ set_sync_policy_group_status(c1, "sync-group", "allowed")
+
+ zonegroup.period.update(zoneA, commit=True)
+ get_sync_policy(c1)
+
+ """
+    configure symmetrical policy at bucketA level with src and dest zones,
+ and apply object prefix filtering
+ """
+
+ # configure sync policy for only bucketA with an object prefix filter
+ bucketA = create_zone_bucket(zcA)
+ buckets = []
+ buckets.append(bucketA)
+ create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
+ create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow-bucket", zones, bucketA.name)
+ args = ['--prefix=test-']
+ create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zones, zones, bucketA.name, args)
+ set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)
+
+ get_sync_policy(c1, bucketA.name)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # create bucketA and objects in zoneA and zoneB
+ objnameA = 'test-a'
+ objnameB = 'b'
+
+ # upload object in each zone and wait for sync.
+ k = new_key(zcA, bucketA, objnameA)
+ k.set_contents_from_string('foo')
+ k = new_key(zcB, bucketA, objnameB)
+ k.set_contents_from_string('foo')
+
+ zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
+ zone_data_checkpoint(zoneB, zoneA)
+
+ # verify that objnameA is synced to zoneB
+ bucket = get_bucket(zcB, bucketA.name)
+ check_object_exists(bucket, objnameA)
+
+ # verify that objnameB is not synced to zoneA
+ bucket = get_bucket(zcA, bucketA.name)
+ check_object_not_exists(bucket, objnameB)
+
+ log.debug('deleting object A')
+ k = get_key(zcA, bucketA, objnameA)
+ k.delete()
+
+ zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
+ zone_data_checkpoint(zoneB, zoneA)
+
+ # delete bucket on zoneA. it should fail to delete because zoneB still has objnameB
+ log.debug('deleting bucket')
+ assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name)
+
+ assert check_all_buckets_exist(zcA, buckets)
+ assert check_all_buckets_exist(zcB, buckets)
+
+ remove_sync_policy_group(c1, "sync-group")
+
+ return