rgw/multisite: Add testcases for sync policy feature
author    Soumya Koduri <skoduri@redhat.com>
          Wed, 18 Nov 2020 11:19:25 +0000 (16:49 +0530)
committer Adam C. Emerson <aemerson@redhat.com>
          Fri, 13 Jan 2023 13:27:47 +0000 (08:27 -0500)
Added test cases for the various use cases of the multisite sync
policy feature listed in
https://docs.ceph.com/en/latest/radosgw/multisite-sync-policy/
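
For reference, the zonegroup-level workflow these tests drive follows the CLI
documented there; a minimal sketch (the group/flow/pipe ids and zone names are
illustrative, not taken from the tests):

    radosgw-admin sync group create --group-id=group1 --status=allowed
    radosgw-admin sync group flow create --group-id=group1 --flow-id=flow-mirror \
        --flow-type=symmetrical --zones=zone1,zone2
    radosgw-admin sync group pipe create --group-id=group1 --pipe-id=pipe1 \
        --source-zones='*' --source-bucket='*' --dest-zones='*' --dest-bucket='*'
    radosgw-admin sync group modify --group-id=group1 --status=enabled
    radosgw-admin period update --commit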

Signed-off-by: Soumya Koduri <skoduri@redhat.com>
src/test/rgw/rgw_multi/tests.py

index 51004ebaa6edb120e6fbf95126db5a7f53eb205e..156fac12e7f34444e64040b5c54c8cde97f9ca81 100644
@@ -17,6 +17,7 @@ from boto.s3.website import WebsiteConfiguration
 from boto.s3.cors import CORSConfiguration
 
 from nose.tools import eq_ as eq
+from nose.tools import assert_not_equal, assert_equal
 from nose.plugins.attrib import attr
 from nose.plugins.skip import SkipTest
 
@@ -1902,3 +1903,959 @@ def test_resharded_bucket_incremental_sync_oldest_after_data_sync_init():
     # expect all generations to replicate via 'bucket incremental sync'
     zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
     zonegroup_data_checkpoint(zonegroup_conns)
+
+def sync_info(cluster, bucket = None):
+    cmd = ['sync', 'info']
+    if bucket:
+        cmd += ['--bucket', bucket]
+    (result_json, retcode) = cluster.admin(cmd)
+    if retcode != 0:
+        assert False, 'failed to get sync info'
+
+    return json.loads(result_json)
+
+def get_sync_policy(cluster, bucket = None):
+    cmd = ['sync', 'policy', 'get']
+    if bucket:
+        cmd += ['--bucket', bucket]
+    (result_json, retcode) = cluster.admin(cmd)
+    if retcode != 0:
+        assert False, 'failed to get sync policy'
+
+    return json.loads(result_json)
+
+def create_sync_policy_group(cluster, group, status = "allowed", bucket = None):
+    cmd = ['sync', 'group', 'create', '--group-id', group, '--status' , status]
+    if bucket:
+        cmd += ['--bucket', bucket]
+    (result_json, retcode) = cluster.admin(cmd)
+    if retcode != 0:
+        assert False, 'failed to create sync policy group id=%s, bucket=%s' % (group, bucket)
+    return json.loads(result_json)
+
+def set_sync_policy_group_status(cluster, group, status, bucket = None):
+    cmd = ['sync', 'group', 'modify', '--group-id', group, '--status' , status]
+    if bucket:
+        cmd += ['--bucket', bucket]
+    (result_json, retcode) = cluster.admin(cmd)
+    if retcode != 0:
+        assert False, 'failed to set sync policy group id=%s, bucket=%s' % (group, bucket)
+    return json.loads(result_json)
+
+def get_sync_policy_group(cluster, group, bucket = None):
+    cmd = ['sync', 'group', 'get', '--group-id', group]
+    if bucket:
+        cmd += ['--bucket', bucket]
+    (result_json, retcode) = cluster.admin(cmd)
+    if retcode != 0:
+        assert False, 'failed to get sync policy group id=%s, bucket=%s' % (group, bucket)
+    return json.loads(result_json)
+
+def remove_sync_policy_group(cluster, group, bucket = None):
+    cmd = ['sync', 'group', 'remove', '--group-id', group]
+    if bucket:
+        cmd += ['--bucket', bucket]
+    (result_json, retcode) = cluster.admin(cmd)
+    if retcode != 0:
+        assert False, 'failed to remove sync policy group id=%s, bucket=%s' % (group, bucket)
+    return json.loads(result_json)
+
+def create_sync_group_flow_symmetrical(cluster, group, flow_id, zones, bucket = None):
+    cmd = ['sync', 'group', 'flow', 'create', '--group-id', group, '--flow-id' , flow_id, '--flow-type', 'symmetrical', '--zones=%s' % zones]
+    if bucket:
+        cmd += ['--bucket', bucket]
+    (result_json, retcode) = cluster.admin(cmd)
+    if retcode != 0:
+        assert False, 'failed to create sync group flow symmetrical groupid=%s, flow_id=%s, zones=%s, bucket=%s' % (group, flow_id, zones, bucket)
+    return json.loads(result_json)
+
+def create_sync_group_flow_directional(cluster, group, flow_id, src_zones, dest_zones, bucket = None):
+    cmd = ['sync', 'group', 'flow', 'create', '--group-id', group, '--flow-id' , flow_id, '--flow-type', 'directional', '--source-zone=%s' % src_zones, '--dest-zone=%s' % dest_zones]
+    if bucket:
+        cmd += ['--bucket', bucket]
+    (result_json, retcode) = cluster.admin(cmd)
+    if retcode != 0:
+        assert False, 'failed to create sync group flow directional groupid=%s, flow_id=%s, src_zones=%s, dest_zones=%s, bucket=%s' % (group, flow_id, src_zones, dest_zones, bucket)
+    return json.loads(result_json)
+
+def remove_sync_group_flow_symmetrical(cluster, group, flow_id, zones = None, bucket = None):
+    cmd = ['sync', 'group', 'flow', 'remove', '--group-id', group, '--flow-id' , flow_id, '--flow-type', 'symmetrical']
+    if zones:
+        cmd += ['--zones=%s' % zones]
+    if bucket:
+        cmd += ['--bucket', bucket]
+    (result_json, retcode) = cluster.admin(cmd)
+    if retcode != 0:
+        assert False, 'failed to remove sync group flow symmetrical groupid=%s, flow_id=%s, zones=%s, bucket=%s' % (group, flow_id, zones, bucket)
+    return json.loads(result_json)
+
+def remove_sync_group_flow_directional(cluster, group, flow_id, src_zones, dest_zones, bucket = None):
+    cmd = ['sync', 'group', 'flow', 'remove', '--group-id', group, '--flow-id' , flow_id, '--flow-type', 'directional', '--source-zone=%s' % src_zones, '--dest-zone=%s' % dest_zones]
+    if bucket:
+        cmd += ['--bucket', bucket]
+    (result_json, retcode) = cluster.admin(cmd)
+    if retcode != 0:
+        assert False, 'failed to remove sync group flow directional groupid=%s, flow_id=%s, src_zones=%s, dest_zones=%s, bucket=%s' % (group, flow_id, src_zones, dest_zones, bucket)
+    return json.loads(result_json)
+
+def create_sync_group_pipe(cluster, group, pipe_id, src_zones, dest_zones, bucket = None, args = None):
+    cmd = ['sync', 'group', 'pipe', 'create', '--group-id', group, '--pipe-id' , pipe_id, '--source-zones=%s' % src_zones, '--dest-zones=%s' % dest_zones]
+    if bucket:
+        b_args = '--bucket=' + bucket
+        cmd.append(b_args)
+    if args:
+        cmd += args
+    (result_json, retcode) = cluster.admin(cmd)
+    if retcode != 0:
+        assert False, 'failed to create sync group pipe groupid=%s, pipe_id=%s, src_zones=%s, dest_zones=%s, bucket=%s' % (group, pipe_id, src_zones, dest_zones, bucket)
+    return json.loads(result_json)
+
+def remove_sync_group_pipe(cluster, group, pipe_id, bucket = None, args = None):
+    cmd = ['sync', 'group', 'pipe', 'remove', '--group-id', group, '--pipe-id' , pipe_id]
+    if bucket:
+        b_args = '--bucket=' + bucket
+        cmd.append(b_args)
+    if args:
+        cmd += args
+    (result_json, retcode) = cluster.admin(cmd)
+    if retcode != 0:
+        assert False, 'failed to remove sync group pipe groupid=%s, pipe_id=%s, bucket=%s' % (group, pipe_id, bucket)
+    return json.loads(result_json)
+
+def create_zone_bucket(zone):
+    b_name = gen_bucket_name()
+    log.info('create bucket zone=%s name=%s', zone.name, b_name)
+    bucket = zone.create_bucket(b_name)
+    return bucket
+
+def create_object(zone_conn, bucket, objname, content):
+    k = new_key(zone_conn, bucket.name, objname)
+    k.set_contents_from_string(content)
+
+def create_objects(zone_conn, bucket, obj_arr, content):
+    for objname in obj_arr:
+        create_object(zone_conn, bucket, objname, content)
+
+def check_object_exists(bucket, objname, content = None):
+    k = bucket.get_key(objname)
+    assert_not_equal(k, None)
+    if content is not None:
+        assert_equal(k.get_contents_as_string(encoding='ascii'), content)
+
+def check_objects_exist(bucket, obj_arr, content = None):
+    for objname in obj_arr:
+        check_object_exists(bucket, objname, content)
+
+def check_object_not_exists(bucket, objname):
+    k = bucket.get_key(objname)
+    assert_equal(k, None)
+
+def check_objects_not_exist(bucket, obj_arr):
+    for objname in obj_arr:
+        check_object_not_exists(bucket, objname)
+
+@attr('sync_policy')
+def test_sync_policy_config_zonegroup():
+    """
+    test_sync_policy_config_zonegroup:
+        test configuration of all sync commands
+    """
+    zonegroup = realm.master_zonegroup()
+    zonegroup_meta_checkpoint(zonegroup)
+
+    zonegroup_conns = ZonegroupConns(zonegroup)
+    z1, z2 = zonegroup.zones[0:2]
+    c1, c2 = (z1.cluster, z2.cluster)
+
+    zones = z1.name+","+z2.name
+
+    c1.admin(['sync', 'policy', 'get'])
+
+    # (a) zonegroup level
+    create_sync_policy_group(c1, "sync-group")
+    set_sync_policy_group_status(c1, "sync-group", "enabled")
+    get_sync_policy_group(c1, "sync-group")
+
+    get_sync_policy(c1)
+
+    create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
+    create_sync_group_flow_directional(c1, "sync-group", "sync-flow2", z1.name, z2.name)
+
+    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
+    get_sync_policy_group(c1, "sync-group")
+
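+    # zonegroup-level policy changes only take effect after a period update/commit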
+    zonegroup.period.update(z1, commit=True)
+
+    # (b)  bucket level
+    zc1, zc2 = zonegroup_conns.zones[0:2]
+    bucket = create_zone_bucket(zc1)
+    bucket_name = bucket.name
+
+    create_sync_policy_group(c1, "sync-bucket", "allowed", bucket_name)
+    set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucket_name)
+    get_sync_policy_group(c1, "sync-bucket", bucket_name)
+
+    get_sync_policy(c1, bucket_name)
+
+    create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow1", zones, bucket_name)
+    create_sync_group_flow_directional(c1, "sync-bucket", "sync-flow2", z1.name, z2.name, bucket_name)
+
+    create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zones, zones, bucket_name)
+    get_sync_policy_group(c1, "sync-bucket", bucket_name)
+
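+    # bucket-level policy changes do not need a period commit; a metadata
+    # checkpoint is enough for them to propagate to the other zones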
+    zonegroup_meta_checkpoint(zonegroup)
+
+    remove_sync_group_pipe(c1, "sync-bucket", "sync-pipe", bucket_name)
+    remove_sync_group_flow_directional(c1, "sync-bucket", "sync-flow2", z1.name, z2.name, bucket_name)
+    remove_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow1", zones, bucket_name)
+    remove_sync_policy_group(c1, "sync-bucket", bucket_name)
+
+    get_sync_policy(c1, bucket_name)
+
+    zonegroup_meta_checkpoint(zonegroup)
+
+    remove_sync_group_pipe(c1, "sync-group", "sync-pipe")
+    remove_sync_group_flow_directional(c1, "sync-group", "sync-flow2", z1.name, z2.name)
+    remove_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1")
+    remove_sync_policy_group(c1, "sync-group")
+
+    get_sync_policy(c1)
+
+    zonegroup.period.update(z1, commit=True)
+
+    return
+
+@attr('sync_policy')
+def test_sync_flow_symmetrical_zonegroup_all():
+    """
+    test_sync_flow_symmetrical_zonegroup_all:
+        allows sync from all the zones to all other zones (default case)
+    """
+
+    zonegroup = realm.master_zonegroup()
+    zonegroup_meta_checkpoint(zonegroup)
+
+    zonegroup_conns = ZonegroupConns(zonegroup)
+
+    (zoneA, zoneB) = zonegroup.zones[0:2]
+    (zcA, zcB) = zonegroup_conns.zones[0:2]
+
+    c1 = zoneA.cluster
+
+    c1.admin(['sync', 'policy', 'get'])
+
+    zones = zoneA.name + ',' + zoneB.name
+    create_sync_policy_group(c1, "sync-group")
+    create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
+    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
+    set_sync_policy_group_status(c1, "sync-group", "enabled")
+
+    zonegroup.period.update(zoneA, commit=True)
+    get_sync_policy(c1)
+
+    objnames = [ 'obj1', 'obj2' ]
+    content = 'asdasd'
+    buckets = []
+
+    # create bucket & object in all zones
+    bucketA = create_zone_bucket(zcA)
+    buckets.append(bucketA)
+    create_object(zcA, bucketA, objnames[0], content)
+
+    bucketB = create_zone_bucket(zcB)
+    buckets.append(bucketB)
+    create_object(zcB, bucketB, objnames[1], content)
+
+    zonegroup_meta_checkpoint(zonegroup)
+    # 'zonegroup_data_checkpoint' currently fails for the zones not
+    # allowed to sync. So as a workaround, data checkpoint is done
+    # for only the ones configured.
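+    # (zone_data_checkpoint(target, source) waits for 'target' to catch up with 'source')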
+    zone_data_checkpoint(zoneB, zoneA)
+    zone_data_checkpoint(zoneA, zoneB)
+
+    # verify that objects are synced across the zones
+    bucket = get_bucket(zcB, bucketA.name)
+    check_object_exists(bucket, objnames[0], content)
+
+    bucket = get_bucket(zcA, bucketB.name)
+    check_object_exists(bucket, objnames[1], content)
+
+    remove_sync_policy_group(c1, "sync-group")
+    return
+
+@attr('sync_policy')
+def test_sync_flow_symmetrical_zonegroup_select():
+    """
+    test_sync_flow_symmetrical_zonegroup_select:
+        allow sync between zoneA & zoneB
+        verify zoneC doesn't sync the data
+    """
+
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+
+    if len(zonegroup.zones) < 3:
+        raise SkipTest("test_sync_flow_symmetrical_zonegroup_select skipped. Requires 3 or more zones in master zonegroup.")
+
+    zonegroup_meta_checkpoint(zonegroup)
+
+    (zoneA, zoneB, zoneC) = zonegroup.zones[0:3]
+    (zcA, zcB, zcC) = zonegroup_conns.zones[0:3]
+
+    c1 = zoneA.cluster
+
+    # configure sync policy
+    zones = zoneA.name + ',' + zoneB.name
+    c1.admin(['sync', 'policy', 'get'])
+    create_sync_policy_group(c1, "sync-group")
+    create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow", zones)
+    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
+    set_sync_policy_group_status(c1, "sync-group", "enabled")
+
+    zonegroup.period.update(zoneA, commit=True)
+    get_sync_policy(c1)
+
+    buckets = []
+    content = 'asdasd'
+
+    # create bucketA & objects in zoneA
+    objnamesA = [ 'obj1', 'obj2', 'obj3' ]
+    bucketA = create_zone_bucket(zcA)
+    buckets.append(bucketA)
+    create_objects(zcA, bucketA, objnamesA, content)
+
+    # create bucketB & objects in zoneB
+    objnamesB = [ 'obj4', 'obj5', 'obj6' ]
+    bucketB = create_zone_bucket(zcB)
+    buckets.append(bucketB)
+    create_objects(zcB, bucketB, objnamesB, content)
+
+    zonegroup_meta_checkpoint(zonegroup)
+    zone_data_checkpoint(zoneB, zoneA)
+    zone_data_checkpoint(zoneA, zoneB)
+
+    # verify that objnamesA synced only to zoneB and not to zoneC
+    bucket = get_bucket(zcB, bucketA.name)
+    check_objects_exist(bucket, objnamesA, content)
+
+    bucket = get_bucket(zcC, bucketA.name)
+    check_objects_not_exist(bucket, objnamesA)
+
+    # verify that objnamesB synced only to zoneA and not to zoneC
+    bucket = get_bucket(zcA, bucketB.name)
+    check_objects_exist(bucket, objnamesB, content)
+
+    bucket = get_bucket(zcC, bucketB.name)
+    check_objects_not_exist(bucket, objnamesB)
+
+    remove_sync_policy_group(c1, "sync-group")
+    return
+
+@attr('sync_policy')
+def test_sync_flow_directional_zonegroup_select():
+    """
+    test_sync_flow_directional_zonegroup_select:
+        allow sync only from zoneA to zoneB
+
+        verify that data does not get synced to zoneC and that
+        zoneA does not sync data from zoneB either
+    """
+
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+
+    if len(zonegroup.zones) < 3:
+        raise SkipTest("test_sync_flow_directional_zonegroup_select skipped. Requires 3 or more zones in master zonegroup.")
+
+    zonegroup_meta_checkpoint(zonegroup)
+
+    (zoneA, zoneB, zoneC) = zonegroup.zones[0:3]
+    (zcA, zcB, zcC) = zonegroup_conns.zones[0:3]
+
+    c1 = zoneA.cluster
+
+    # configure sync policy
+    zones = zoneA.name + ',' + zoneB.name
+    c1.admin(['sync', 'policy', 'get'])
+    create_sync_policy_group(c1, "sync-group")
+    create_sync_group_flow_directional(c1, "sync-group", "sync-flow", zoneA.name, zoneB.name)
+    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zoneA.name, zoneB.name)
+    set_sync_policy_group_status(c1, "sync-group", "enabled")
+
+    zonegroup.period.update(zoneA, commit=True)
+    get_sync_policy(c1)
+
+    buckets = []
+    content = 'asdasd'
+
+    # create bucketA & objects in zoneA
+    objnamesA = [ 'obj1', 'obj2', 'obj3' ]
+    bucketA = create_zone_bucket(zcA)
+    buckets.append(bucketA)
+    create_objects(zcA, bucketA, objnamesA, content)
+
+    # create bucketB & objects in zoneB
+    objnamesB = [ 'obj4', 'obj5', 'obj6' ]
+    bucketB = create_zone_bucket(zcB)
+    buckets.append(bucketB)
+    create_objects(zcB, bucketB, objnamesB, content)
+
+    zonegroup_meta_checkpoint(zonegroup)
+    zone_data_checkpoint(zoneB, zoneA)
+
+    # verify that objnamesA synced only to zoneB and not to zoneC
+    bucket = get_bucket(zcB, bucketA.name)
+    check_objects_exist(bucket, objnamesA, content)
+
+    bucket = get_bucket(zcC, bucketA.name)
+    check_objects_not_exist(bucket, objnamesA)
+
+    # verify that objnamesB are not synced to either zoneA or zoneC
+    bucket = get_bucket(zcA, bucketB.name)
+    check_objects_not_exist(bucket, objnamesB)
+
+    bucket = get_bucket(zcC, bucketB.name)
+    check_objects_not_exist(bucket, objnamesB)
+
+    """
+        verify the same at the bucketA level:
+        configure another policy at the bucketA level with the source and
+        destination zones set to zoneA and zoneB respectively.
+
+        verify that zoneA bucketA syncs to zoneB bucketA but not vice versa.
+    """
+    # reconfigure zonegroup pipe & flow
+    remove_sync_group_pipe(c1, "sync-group", "sync-pipe")
+    remove_sync_group_flow_directional(c1, "sync-group", "sync-flow", zoneA.name, zoneB.name)
+    create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
+    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
+
+    # change state to allowed
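+    # ('allowed' permits sync without enabling it, so the bucket-level policy below can enable it selectively)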
+    set_sync_policy_group_status(c1, "sync-group", "allowed")
+
+    zonegroup.period.update(zoneA, commit=True)
+    get_sync_policy(c1)
+
+    # configure sync policy for only bucketA and enable it
+    create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
+    create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flowA", zones, bucketA.name)
+    args = ['--source-bucket=*', '--dest-bucket=*']
+    create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zoneA.name, zoneB.name, bucketA.name, args)
+    set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)
+
+    get_sync_policy(c1, bucketA.name)
+
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # create objects in bucketA in zoneA and zoneB
+    objnamesC = [ 'obj7', 'obj8', 'obj9' ]
+    objnamesD = [ 'obj10', 'obj11', 'obj12' ]
+    create_objects(zcA, bucketA, objnamesC, content)
+    create_objects(zcB, bucketA, objnamesD, content)
+
+    zonegroup_meta_checkpoint(zonegroup)
+    zone_data_checkpoint(zoneB, zoneA)
+
+    # verify that objnamesC are synced to bucketA in zoneB
+    bucket = get_bucket(zcB, bucketA.name)
+    check_objects_exist(bucket, objnamesC, content)
+
+    # verify that objnamesD are not synced to bucketA in zoneA
+    bucket = get_bucket(zcA, bucketA.name)
+    check_objects_not_exist(bucket, objnamesD)
+
+    remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
+    remove_sync_policy_group(c1, "sync-group")
+    return
+
+@attr('sync_policy')
+def test_sync_single_bucket():
+    """
+    test_sync_single_bucket:
+        Allow data sync for only bucketA but not for other buckets, via
+        the two methods below:
+
+        (a) zonegroup: symmetrical flow but configure pipe for only bucketA.
+        (b) bucket level: configure policy for bucketA
+    """
+
+    zonegroup = realm.master_zonegroup()
+    zonegroup_meta_checkpoint(zonegroup)
+
+    zonegroup_conns = ZonegroupConns(zonegroup)
+
+    (zoneA, zoneB) = zonegroup.zones[0:2]
+    (zcA, zcB) = zonegroup_conns.zones[0:2]
+
+    c1 = zoneA.cluster
+
+    c1.admin(['sync', 'policy', 'get'])
+
+    zones = zoneA.name + ',' + zoneB.name
+    get_sync_policy(c1)
+
+    objnames = [ 'obj1', 'obj2', 'obj3' ]
+    content = 'asdasd'
+    buckets = []
+
+    # create bucketA & bucketB in zoneA
+    bucketA = create_zone_bucket(zcA)
+    buckets.append(bucketA)
+    bucketB = create_zone_bucket(zcA)
+    buckets.append(bucketB)
+
+    zonegroup_meta_checkpoint(zonegroup)
+
+    """
+        Method (a): configure pipe for only bucketA
+    """
+    # configure sync policy & pipe for only bucketA
+    create_sync_policy_group(c1, "sync-group")
+    create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
+    args = ['--source-bucket=' +  bucketA.name, '--dest-bucket=' + bucketA.name]
+
+    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones, None, args)
+    set_sync_policy_group_status(c1, "sync-group", "enabled")
+    get_sync_policy(c1)
+    zonegroup.period.update(zoneA, commit=True)
+
+    sync_info(c1)
+
+    # create objects in bucketA & bucketB
+    create_objects(zcA, bucketA, objnames, content)
+    create_objects(zcA, bucketB, objnames, content)
+
+    zonegroup_meta_checkpoint(zonegroup)
+    zone_data_checkpoint(zoneB, zoneA)
+
+    # verify if bucketA objects are synced
+    bucket = get_bucket(zcB, bucketA.name)
+    check_objects_exist(bucket, objnames, content)
+
+    # bucketB objects should not be synced
+    bucket = get_bucket(zcB, bucketB.name)
+    check_objects_not_exist(bucket, objnames)
+
+
+    """
+        Method (b): configure policy at only the bucketA level
+    """
+    # reconfigure group pipe
+    remove_sync_group_pipe(c1, "sync-group", "sync-pipe")
+    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
+
+    # change state to allowed
+    set_sync_policy_group_status(c1, "sync-group", "allowed")
+
+    zonegroup.period.update(zoneA, commit=True)
+    get_sync_policy(c1)
+
+
+    # configure sync policy for only bucketA and enable it
+    create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
+    create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flowA", zones, bucketA.name)
+    create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zones, zones, bucketA.name)
+    set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)
+
+    get_sync_policy(c1, bucketA.name)
+
+    # create object in bucketA
+    create_object(zcA, bucketA, objnames[2], content)
+
+    # create object in bucketB too
+    create_object(zcA, bucketB, objnames[2], content)
+
+    zonegroup_meta_checkpoint(zonegroup)
+    zone_data_checkpoint(zoneB, zoneA)
+
+    # verify if bucketA objects are synced
+    bucket = get_bucket(zcB, bucketA.name)
+    check_object_exists(bucket, objnames[2], content)
+
+    # bucketB objects should not be synced
+    bucket = get_bucket(zcB, bucketB.name)
+    check_object_not_exists(bucket, objnames[2])
+
+    remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
+    remove_sync_policy_group(c1, "sync-group")
+    return
+
+@attr('sync_policy')
+def test_sync_different_buckets():
+    """
+    test_sync_different_buckets:
+        sync zoneA bucketA to zoneB bucketB via the methods below
+
+        (a) zonegroup: symmetrical flow, but configure the pipe only from zoneA bucketA to zoneB bucketB
+        (b) bucket: configure another policy at the bucketA level with the pipe set to
+        another bucket (bucketB) in the target zone.
+
+        sync zoneA bucketA from zoneB bucketB
+        (c) configure another policy at the bucketA level with the pipe set from
+        another bucket (bucketB) in the source zone.
+
+    """
+
+    zonegroup = realm.master_zonegroup()
+    zonegroup_meta_checkpoint(zonegroup)
+
+    zonegroup_conns = ZonegroupConns(zonegroup)
+
+    (zoneA, zoneB) = zonegroup.zones[0:2]
+    (zcA, zcB) = zonegroup_conns.zones[0:2]
+    zones = zoneA.name + ',' + zoneB.name
+
+    c1 = zoneA.cluster
+
+    c1.admin(['sync', 'policy', 'get'])
+
+    objnames = [ 'obj1', 'obj2' ]
+    objnamesB = [ 'obj3', 'obj4' ]
+    content = 'asdasd'
+    buckets = []
+
+    # create bucketA & bucketB in zoneA
+    bucketA = create_zone_bucket(zcA)
+    buckets.append(bucketA)
+    bucketB = create_zone_bucket(zcA)
+    buckets.append(bucketB)
+
+    zonegroup_meta_checkpoint(zonegroup)
+
+    """
+        Method (a): zonegroup level - configure a pipe from zoneA bucketA to zoneB bucketB only
+    """
+    # configure pipe from zoneA bucketA to zoneB bucketB
+    create_sync_policy_group(c1, "sync-group")
+    create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
+    args = ['--source-bucket=' +  bucketA.name, '--dest-bucket=' + bucketB.name]
+    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zoneA.name, zoneB.name, None, args)
+    set_sync_policy_group_status(c1, "sync-group", "enabled")
+    zonegroup.period.update(zoneA, commit=True)
+    get_sync_policy(c1)
+
+    # create objects in bucketA
+    create_objects(zcA, bucketA, objnames, content)
+
+    zonegroup_meta_checkpoint(zonegroup)
+    zone_data_checkpoint(zoneB, zoneA)
+
+    # verify that objects are synced to bucketB in zoneB
+    # but not to bucketA
+    bucket = get_bucket(zcB, bucketA.name)
+    check_objects_not_exist(bucket, objnames)
+
+    bucket = get_bucket(zcB, bucketB.name)
+    check_objects_exist(bucket, objnames, content)
+    """
+        Method (b): configure policy at only bucketA level with pipe
+        set to bucketB in target zone
+    """
+
+    remove_sync_group_pipe(c1, "sync-group", "sync-pipe")
+    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
+
+    # change state to allowed
+    set_sync_policy_group_status(c1, "sync-group", "allowed")
+
+    zonegroup.period.update(zoneA, commit=True)
+    get_sync_policy(c1)
+
+    # configure sync policy for only bucketA and enable it
+    create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
+    create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flowA", zones, bucketA.name)
+    args = ['--source-bucket=*', '--dest-bucket=' + bucketB.name]
+    create_sync_group_pipe(c1, "sync-bucket", "sync-pipeA", zones, zones, bucketA.name, args)
+    set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)
+
+    get_sync_policy(c1, bucketA.name)
+
+    objnamesC = [ 'obj5', 'obj6' ]
+
+    zonegroup_meta_checkpoint(zonegroup)
+    # create objects in bucketA
+    create_objects(zcA, bucketA, objnamesC, content)
+
+    zonegroup_meta_checkpoint(zonegroup)
+    zone_data_checkpoint(zoneB, zoneA)
+
+    """
+    # verify that objects are synced to bucketB in zoneB
+    # but not to bucketA
+    """
+    bucket = get_bucket(zcB, bucketA.name)
+    check_objects_not_exist(bucket, objnamesC)
+
+    bucket = get_bucket(zcB, bucketB.name)
+    check_objects_exist(bucket, objnamesC, content)
+
+    remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
+    zonegroup_meta_checkpoint(zonegroup)
+    get_sync_policy(c1, bucketA.name)
+
+    """
+        Method (c): configure policy at only the bucketA level, with the pipe
+        set from bucketB in the source zone
+        verify that zoneA bucketA syncs from zoneB bucketB but not from bucketA
+    """
+
+    # configure sync policy for only bucketA and enable it
+    create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
+    create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flowA", zones, bucketA.name)
+    args = ['--source-bucket=' +  bucketB.name, '--dest-bucket=' + '*']
+    create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zones, zones, bucketA.name, args)
+    set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)
+
+    get_sync_policy(c1, bucketA.name)
+
+    # create objects in bucketA & B in ZoneB
+    objnamesD = [ 'obj7', 'obj8' ]
+    objnamesE = [ 'obj9', 'obj10' ]
+
+    create_objects(zcB, bucketA, objnamesD, content)
+    create_objects(zcB, bucketB, objnamesE, content)
+
+    zonegroup_meta_checkpoint(zonegroup)
+    zone_data_checkpoint(zoneA, zoneB)
+    """
+    # verify that objects from only bucketB are synced to
+    # bucketA in zoneA
+    """
+    bucket = get_bucket(zcA, bucketA.name)
+    check_objects_not_exist(bucket, objnamesD)
+    check_objects_exist(bucket, objnamesE, content)
+
+    remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
+    remove_sync_policy_group(c1, "sync-group")
+    return
+
+@attr('sync_policy')
+def test_sync_multiple_buckets_to_single():
+    """
+    test_sync_multiple_buckets_to_single:
+        directional flow
+        (a) pipe: sync zoneA bucketA,bucketB to zoneB bucketB
+
+        (b) configure another policy at bucketA level with pipe configured
+        to sync from multiple buckets (bucketA & bucketB)
+
+        verify zoneA bucketA & bucketB syncs to zoneB BucketB
+    """
+
+    zonegroup = realm.master_zonegroup()
+    zonegroup_meta_checkpoint(zonegroup)
+
+    zonegroup_conns = ZonegroupConns(zonegroup)
+
+    (zoneA, zoneB) = zonegroup.zones[0:2]
+    (zcA, zcB) = zonegroup_conns.zones[0:2]
+    zones = zoneA.name + ',' + zoneB.name
+
+    c1 = zoneA.cluster
+
+    c1.admin(['sync', 'policy', 'get'])
+
+    objnamesA = [ 'obj1', 'obj2' ]
+    objnamesB = [ 'obj3', 'obj4' ]
+    content = 'asdasd'
+    buckets = []
+
+    # create bucketA & bucketB in zoneA
+    bucketA = create_zone_bucket(zcA)
+    buckets.append(bucketA)
+    bucketB = create_zone_bucket(zcA)
+    buckets.append(bucketB)
+
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # configure pipe from zoneA bucketA,bucketB to zoneB bucketB
+    create_sync_policy_group(c1, "sync-group")
+    create_sync_group_flow_directional(c1, "sync-group", "sync-flow", zoneA.name, zoneB.name)
+    source_buckets = [ bucketA.name, bucketB.name ]
+    for source_bucket in source_buckets:
+        args = ['--source-bucket=' +  source_bucket, '--dest-bucket=' + bucketB.name]
+        create_sync_group_pipe(c1, "sync-group", "sync-pipe-%s" % source_bucket, zoneA.name, zoneB.name, None, args)
+
+    set_sync_policy_group_status(c1, "sync-group", "enabled")
+    zonegroup.period.update(zoneA, commit=True)
+    get_sync_policy(c1)
+
+    # create objects in bucketA & bucketB
+    create_objects(zcA, bucketA, objnamesA, content)
+    create_objects(zcA, bucketB, objnamesB, content)
+
+    zonegroup_meta_checkpoint(zonegroup)
+    zone_data_checkpoint(zoneB, zoneA)
+
+    # verify that both zoneA bucketA & bucketB objects are synced to
+    # bucketB in zoneB but not to bucketA
+    bucket = get_bucket(zcB, bucketA.name)
+    check_objects_not_exist(bucket, objnamesA)
+    check_objects_not_exist(bucket, objnamesB)
+
+    bucket = get_bucket(zcB, bucketB.name)
+    check_objects_exist(bucket, objnamesA, content)
+    check_objects_exist(bucket, objnamesB, content)
+
+    """
+        Method (b): configure at bucket level
+    """
+    # reconfigure pipe & flow
+    for source_bucket in source_buckets:
+        remove_sync_group_pipe(c1, "sync-group", "sync-pipe-%s" % source_bucket)
+    remove_sync_group_flow_directional(c1, "sync-group", "sync-flow", zoneA.name, zoneB.name)
+    create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
+    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
+
+    # change state to allowed
+    set_sync_policy_group_status(c1, "sync-group", "allowed")
+
+    zonegroup.period.update(zoneA, commit=True)
+    get_sync_policy(c1)
+
+    objnamesC = [ 'obj5', 'obj6' ]
+    objnamesD = [ 'obj7', 'obj8' ]
+
+    # configure sync policy for only bucketA and enable it
+    create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
+    create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flowA", zones, bucketA.name)
+    source_buckets = [ bucketA.name, bucketB.name ]
+    for source_bucket in source_buckets:
+        args = ['--source-bucket=' +  source_bucket, '--dest-bucket=' + '*']
+        create_sync_group_pipe(c1, "sync-bucket", "sync-pipe-%s" % source_bucket, zoneA.name, zoneB.name, bucketA.name, args)
+
+    set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)
+
+    get_sync_policy(c1)
+
+    zonegroup_meta_checkpoint(zonegroup)
+    # create objects in bucketA
+    create_objects(zcA, bucketA, objnamesC, content)
+    create_objects(zcA, bucketB, objnamesD, content)
+
+    zonegroup_meta_checkpoint(zonegroup)
+    zone_data_checkpoint(zoneB, zoneA)
+
+    # verify that both zoneA bucketA & bucketB objects are synced to
+    # bucketA in zoneB but not to bucketB
+    bucket = get_bucket(zcB, bucketB.name)
+    check_objects_not_exist(bucket, objnamesC)
+    check_objects_not_exist(bucket, objnamesD)
+
+    bucket = get_bucket(zcB, bucketA.name)
+    check_objects_exist(bucket, objnamesC, content)
+    check_objects_exist(bucket, objnamesD, content)
+
+    remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
+    remove_sync_policy_group(c1, "sync-group")
+    return
+
+@attr('sync_policy')
+def test_sync_single_bucket_to_multiple():
+    """
+    test_sync_single_bucket_to_multiple:
+        directional flow
+        (a) pipe: sync zoneA bucketA to zoneB bucketA & bucketB
+
+        (b) configure another policy at bucketA level with pipe configured
+        to sync to multiple buckets (bucketA & bucketB)
+
+        verify zoneA bucketA syncs to zoneB bucketA & bucketB
+    """
+
+    zonegroup = realm.master_zonegroup()
+    zonegroup_meta_checkpoint(zonegroup)
+
+    zonegroup_conns = ZonegroupConns(zonegroup)
+
+    (zoneA, zoneB) = zonegroup.zones[0:2]
+    (zcA, zcB) = zonegroup_conns.zones[0:2]
+    zones = zoneA.name + ',' + zoneB.name
+
+    c1 = zoneA.cluster
+
+    c1.admin(['sync', 'policy', 'get'])
+
+    objnamesA = [ 'obj1', 'obj2' ]
+    content = 'asdasd'
+    buckets = []
+
+    # create bucketA & bucketB in zoneA
+    bucketA = create_zone_bucket(zcA)
+    buckets.append(bucketA)
+    bucketB = create_zone_bucket(zcA)
+    buckets.append(bucketB)
+
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # configure pipe from zoneA bucketA to zoneB bucketA, bucketB
+    create_sync_policy_group(c1, "sync-group")
+    create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
+
+    dest_buckets = [ bucketA.name, bucketB.name ]
+    for dest_bucket in dest_buckets:
+        args = ['--source-bucket=' +  bucketA.name, '--dest-bucket=' + dest_bucket]
+        create_sync_group_pipe(c1, "sync-group", "sync-pipe-%s" % dest_bucket, zoneA.name, zoneB.name, None, args)
+
+    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zoneA.name, zoneB.name, None, args)
+    set_sync_policy_group_status(c1, "sync-group", "enabled")
+    zonegroup.period.update(zoneA, commit=True)
+    get_sync_policy(c1)
+
+    # create objects in bucketA
+    create_objects(zcA, bucketA, objnamesA, content)
+
+    zonegroup_meta_checkpoint(zonegroup)
+    zone_data_checkpoint(zoneB, zoneA)
+
+    # verify that objects from zoneA bucketA are synced to both
+    # bucketA & bucketB in zoneB
+    bucket = get_bucket(zcB, bucketA.name)
+    check_objects_exist(bucket, objnamesA, content)
+
+    bucket = get_bucket(zcB, bucketB.name)
+    check_objects_exist(bucket, objnamesA, content)
+
+    """
+        Method (b): configure at bucket level
+    """
+    remove_sync_group_pipe(c1, "sync-group", "sync-pipe")
+    create_sync_group_pipe(c1, "sync-group", "sync-pipe", '*', '*')
+
+    # change state to allowed
+    set_sync_policy_group_status(c1, "sync-group", "allowed")
+
+    zonegroup.period.update(zoneA, commit=True)
+    get_sync_policy(c1)
+
+    objnamesB = [ 'obj3', 'obj4' ]
+
+    # configure sync policy for only bucketA and enable it
+    create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
+    create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flowA", zones, bucketA.name)
+    dest_buckets = [ bucketA.name, bucketB.name ]
+    for dest_bucket in dest_buckets:
+        args = ['--source-bucket=' + '*', '--dest-bucket=' + dest_bucket]
+        create_sync_group_pipe(c1, "sync-bucket", "sync-pipe-%s" % dest_bucket, zoneA.name, zoneB.name, bucketA.name, args)
+
+    set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)
+
+    get_sync_policy(c1)
+
+    zonegroup_meta_checkpoint(zonegroup)
+    # create objects in bucketA
+    create_objects(zcA, bucketA, objnamesB, content)
+
+    zonegroup_meta_checkpoint(zonegroup)
+    zone_data_checkpoint(zoneB, zoneA)
+
+    # verify that objects from zoneA bucketA are synced to both
+    # bucketA & bucketB in zoneB
+    bucket = get_bucket(zcB, bucketA.name)
+    check_objects_exist(bucket, objnamesB, content)
+
+    bucket = get_bucket(zcB, bucketB.name)
+    check_objects_exist(bucket, objnamesB, content)
+
+    remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
+    remove_sync_policy_group(c1, "sync-group")
+    return