def check_objects_not_exist(bucket, obj_arr):
for objname in obj_arr:
check_object_not_exists(bucket, objname)
+
+@attr('sync_policy')
+def test_bucket_delete_with_sync_policy():
+
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ (zoneA, zoneB) = zonegroup.zones[0:2]
+ (zcA, zcB) = zonegroup_conns.zones[0:2]
+
+ c1 = zoneA.cluster
+
+ # configure sync policy
+ zones = zoneA.name + ',' + zoneB.name
+ c1.admin(['sync', 'policy', 'get'])
+ create_sync_policy_group(c1, "sync-group")
+ create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow", zones)
+ create_sync_group_pipe(c1, "sync-group", "sync-pipe", zoneA.name, zoneB.name)
+ set_sync_policy_group_status(c1, "sync-group", "allowed")
+
+ zonegroup.period.update(zoneA, commit=True)
+ get_sync_policy(c1)
+
+ """
+ configure policy at bucketA level with src and dest
+ zones specified to zoneA and zoneB resp.
+
+ verify zoneA bucketA syncs to zoneB BucketA but not viceversa.
+ """
+
+ # configure sync policy for only bucketA and enable it
+ bucketA = create_zone_bucket(zcA)
+ buckets = []
+ buckets.append(bucketA)
+ create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
+ create_sync_group_flow_directional(c1, "sync-bucket", "sync-flow-bucket", zoneA.name, zoneB.name, bucketA.name)
+ create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zoneA.name, zoneB.name, bucketA.name)
+ set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)
+
+ get_sync_policy(c1, bucketA.name)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # create bucketA and objects in zoneA and zoneB
+ objnameA = 'a'
+ objnameB = 'b'
+
+ # upload object in each zone and wait for sync.
+ k = new_key(zcA, bucketA, objnameA)
+ k.set_contents_from_string('foo')
+ k = new_key(zcB, bucketA, objnameB)
+ k.set_contents_from_string('foo')
+
+ zonegroup_meta_checkpoint(zonegroup)
+ zone_data_checkpoint(zoneB, zoneA)
+
+ # verify that objnameA is synced to bucketA in zoneB
+ bucket = get_bucket(zcB, bucketA.name)
+ check_objects_exist(bucket, objnameA)
+
+ # verify that objnameB is not synced to bucketA in zoneA
+ bucket = get_bucket(zcA, bucketA.name)
+ check_objects_not_exist(bucket, objnameB)
+
+ log.debug('deleting object')
+ k = get_key(zcA, bucket, objnameA)
+ k.delete()
+
+ zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
+
+ # delete bucket on zoneA. it should fail to delete
+ log.debug('deleting bucket')
+ assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name)
+
+ assert check_all_buckets_exist(zcA, buckets)
+ assert check_all_buckets_exist(zcB, buckets)
+ return
+
@attr('fails_with_rgw')
@attr('sync_policy')