rgw/multisite: account for zones opted out of sync and object filters
during bucket deletion (53799/head)

author:    Shilpa Jagannath <smanjara@redhat.com>  Fri, 10 Jan 2025 07:39:47 +0000 (02:39 -0500)
committer: Shilpa Jagannath <smanjara@redhat.com>  Fri, 4 Apr 2025 17:18:41 +0000 (13:18 -0400)

Signed-off-by: Shilpa Jagannath <smanjara@redhat.com>
src/rgw/driver/rados/rgw_rados.cc
src/rgw/rgw_sync_policy.cc
src/rgw/rgw_sync_policy.h
src/test/rgw/rgw_multi/tests.py

index e85fc4a1c6889c770fff983d110f471555f230d6..ebdea6f6e4a26218cb466da040e5eca59631d5af 100644 (file)
@@ -5621,23 +5621,16 @@ int get_zone_ids(const DoutPrefixProvider *dpp,
 int list_remote_buckets(const DoutPrefixProvider *dpp,
                                               rgw::sal::RadosStore* const driver,
                        const rgw_zone_id source_zone,
+                       std::vector<rgw_zone_id> zids,
                        const rgw_bucket& bucket,
                        optional_yield y)
 {
-
-  std::vector<rgw_zone_id> zids;
-  int ret = get_zone_ids(dpp, driver, source_zone, bucket, zids, y);
-  if (ret < 0) {
-    ldpp_dout(dpp, 10) << "failed to get remote zones (r=" << ret << ")" << dendl;
-    return ret;
-  }
-
   std::vector<bucket_unordered_list_result> peer_status;
   peer_status.resize(zids.size());
 
   RGWCoroutinesManager crs(driver->ctx(), driver->getRados()->get_cr_registry());
   RGWHTTPManager http(driver->ctx(), crs.get_completion_mgr());
-  ret = http.start();
+  int ret = http.start();
   if (ret < 0) {
     ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
     return ret;
@@ -5657,6 +5650,62 @@ int list_remote_buckets(const DoutPrefixProvider *dpp,
   return 0;
 }
 
+int check_remote_bucket_empty(const DoutPrefixProvider* dpp,
+                              rgw::sal::RadosStore* const driver,
+                              const rgw_zone_id source_zone,
+                              RGWBucketInfo& bucket_info,
+                              std::map<rgw_zone_id, RGWZone> zones,
+                              const rgw_sync_policy_info* zg_sync_policy,
+                              optional_yield y)
+{
+  std::vector<rgw_zone_id> zids;
+  int ret = get_zone_ids(dpp, driver, source_zone, bucket_info.bucket, zids, y);
+  if (ret < 0) {
+    ldpp_dout(dpp, 10) << "failed to get remote zones (r=" << ret << ")" << dendl;
+    return ret;
+  }
+
+  /* the source and destination buckets can diverge when the sync policy
+     is unidirectional, when some zones have opted out of sync, or when
+     object filtering (prefix or tags) is configured. check for all such
+     cases: list the bucket in those zones and return -ENOTEMPTY if
+     objects remain */
+
+  bool is_zg_policy_directional = false;
+  if (zg_sync_policy) {
+    is_zg_policy_directional = zg_sync_policy->is_directional();
+  }
+
+  bool is_bucket_policy_directional = false;
+  bool bucket_has_filter = false;
+  auto bucket_sync_policy = bucket_info.sync_policy;
+  if (bucket_sync_policy) {
+    is_bucket_policy_directional = bucket_sync_policy->is_directional();
+    bucket_has_filter = bucket_sync_policy->has_filter();
+  }
+
+  if (is_zg_policy_directional || is_bucket_policy_directional ||
+      bucket_has_filter) {
+    ldpp_dout(dpp, 10) << "sync policy exists. listing remote zones" << dendl;
+    return list_remote_buckets(dpp, driver, source_zone, zids, bucket_info.bucket, y);
+  }
+
+  std::vector<rgw_zone_id> opt_out_zones; // zones not participating in sync
+  for (const auto& z : zones) {
+    if (std::find(zids.begin(), zids.end(), z.first) == zids.end()) {
+      if (z.first == source_zone) {
+        continue;
+      }
+      opt_out_zones.push_back(z.first);
+    }
+  }
+
+  if (!opt_out_zones.empty()) {
+    ldpp_dout(dpp, 10) << "sync policy exists. listing remote zones" << dendl;
+    return list_remote_buckets(dpp, driver, source_zone, opt_out_zones, bucket_info.bucket, y);
+  }
+  return 0;
+}
+
 /**
  * Delete a bucket.
  * bucket: the name of the bucket to delete
@@ -5678,30 +5727,19 @@ int RGWRados::delete_bucket(RGWBucketInfo& bucket_info, std::map<std::string, bu
     }
   }
 
-  // delete_bucket checks for objects in the bucket on other zones,
-  // if there is bucket sync policy configured, by doing unordered
-  // listing with max_key=1. if objects are found, don't delete the bucket.
+  // check whether replication can diverge: a directional (asymmetric) policy
+  // at the zonegroup or bucket level, object filters, or zones opted out of
+  // sync. if objects still exist in remote zones, don't delete the bucket.
   if (svc.zone->is_syncing_bucket_meta()) {
-    // check if asymmetric replication policy exists either at zonegroup or bucket level
-    auto zg_sync_policy = svc.zone->get_zonegroup().sync_policy;
-    bool is_zg_policy_directional = zg_sync_policy.is_directional();
-
-    bool is_bucket_policy_directional = false;
-    auto bucket_sync_policy = bucket_info.sync_policy;
-    if (bucket_sync_policy) {
-      is_bucket_policy_directional = bucket_sync_policy->is_directional();
-    }
-    if (is_zg_policy_directional || is_bucket_policy_directional) {
-      ldpp_dout(dpp, 10) << "sync policy exists. listing remote zones" << dendl;
-      const rgw_zone_id source_zone = svc.zone->get_zone_params().get_id();
-      r = list_remote_buckets(dpp, driver, source_zone, bucket, y);
-      if (r == -ENOTEMPTY) {
-        ldpp_dout(dpp, 0) << "ERROR: cannot delete bucket. objects exist in the bucket on another zone " << dendl;
-        return r;
-      } else if (r < 0) {
-        ldpp_dout(dpp, 10) << "failed to list remote buckets" << dendl;
-        // don't return.
-      }
+    const rgw_zone_id source_zone = svc.zone->get_zone_params().get_id();
+    int r = check_remote_bucket_empty(dpp, driver, source_zone, bucket_info,
+                                      svc.zone->get_zonegroup().zones,
+                                      &svc.zone->get_zonegroup().sync_policy, y);
+    if (r == -ENOTEMPTY) {
+      ldpp_dout(dpp, 0) << "ERROR: cannot delete bucket. objects exist in the bucket on another zone " << dendl;
+      return r;
+    } else if (r < 0) {
+      ldpp_dout(dpp, 10) << "failed to list remote buckets" << dendl;
+      // don't return.
     }
   }
 
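
For readers tracing the new control flow, here is a minimal Python sketch of the decision check_remote_bucket_empty() implements. Everything in it is an illustrative stand-in (hypothetical names, simplified data shapes), not the actual RGW types:

    # Hypothetical sketch of check_remote_bucket_empty()'s decision flow.
    # sync_zone_ids stands in for the zids from get_zone_ids() (remote peers
    # that sync this bucket); all_zone_ids for the zonegroup's full zone map.
    def remote_bucket_is_empty(source_zone, sync_zone_ids, all_zone_ids,
                               zg_directional, bucket_directional,
                               bucket_has_filter, count_objects):
        # count_objects(zone_id) stands in for the unordered listing with
        # max_keys=1 that list_remote_buckets() performs against each peer.
        if zg_directional or bucket_directional or bucket_has_filter:
            # policy-driven divergence: every syncing peer must be listed
            to_list = [z for z in sync_zone_ids if z != source_zone]
        else:
            # zones opted out of sync may still hold their own objects
            to_list = [z for z in all_zone_ids
                       if z not in sync_zone_ids and z != source_zone]
        return all(count_objects(z) == 0 for z in to_list)

In both branches the deletion proceeds only when every zone that can diverge from the source reports the bucket empty; otherwise delete_bucket() returns -ENOTEMPTY.
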
index b65752959e9c93bb456d2656e2f7999030a62c00..129d11a1f8fc6a153990348809c40c8bb56fbd8b 100644 (file)
@@ -150,6 +150,11 @@ bool rgw_sync_pipe_filter::has_tags() const
   return !tags.empty();
 }
 
+bool rgw_sync_pipe_filter::has_prefix() const
+{
+  return prefix.has_value();
+}
+
 bool rgw_sync_pipe_filter::check_tags(const std::vector<string>& _tags) const
 {
   if (tags.empty()) {
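
For context on the new has_prefix() helper: a sync pipe filter narrows which objects a pipe replicates, by key prefix and/or object tags. A rough Python sketch of the match predicate follows; the names and the any-tag semantics are an approximation for illustration, not the exact rgw_sync_pipe_filter logic:

    # Approximate sketch of a pipe-filter match; an empty tag filter matches
    # everything, mirroring the early return in check_tags() above.
    def filter_matches(obj_key, obj_tags, prefix=None, filter_tags=()):
        if prefix is not None and not obj_key.startswith(prefix):
            return False
        if not filter_tags:
            return True
        return any(t in obj_tags for t in filter_tags)

    # e.g. an object under 'logs/' carrying a matching tag passes the filter
    assert filter_matches('logs/2025-01-10', {('env', 'prod')},
                          prefix='logs/', filter_tags=[('env', 'prod')])
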
index fe720cb619583bcab75bb0015ba1e23d7ff89f93..78b4eb31ea4b41d13f9e49b70a0ba3fb97a90eb9 100644 (file)
@@ -240,6 +240,7 @@ struct rgw_sync_pipe_filter {
   bool is_subset_of(const rgw_sync_pipe_filter& f) const;
 
   bool has_tags() const;
+  bool has_prefix() const;
   bool check_tag(const std::string& s) const;
   bool check_tag(const std::string& k, const std::string& v) const;
   bool check_tags(const std::vector<std::string>& tags) const;
@@ -685,6 +686,19 @@ struct rgw_sync_policy_info {
     return false;
   }
 
+  bool has_filter() const {
+    for (auto& item : groups) {
+      auto& group = item.second;
+      for (auto& p : group.pipes) {
+        auto& filter = p.params.source.filter;
+        if (filter.has_prefix() || filter.has_tags()) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
   void get_potential_related_buckets(const rgw_bucket& bucket,
                                      std::set<rgw_bucket> *sources,
                                      std::set<rgw_bucket> *dests) const;
index b914102d137ecc8094c79ae36fbaf5f3b367edaa..17d2cc4f80e5bddc3be4da85a5de043340fe2f51 100644 (file)
@@ -2520,6 +2520,19 @@ def remove_sync_policy_group(cluster, group, bucket = None):
         assert False, 'failed to remove sync policy group id=%s, bucket=%s' % (group, bucket)
     return json.loads(result_json)
 
+def remove_sync_flow(cluster, group, flow, type, source=None, dest=None, bucket = None):
+    cmd = ['sync', 'group', 'flow', 'remove', '--group-id', group, '--flow-id', flow, '--flow-type', type]
+    if source:
+        cmd += ['--source-zone', source]
+    if dest:
+        cmd += ['--dest-zone', dest]
+    if bucket:
+        cmd += ['--bucket', bucket]
+    (result_json, retcode) = cluster.admin(cmd)
+    if retcode != 0:
+        assert False, 'failed to remove sync flow id=%s, bucket=%s' % (flow, bucket)
+    return json.loads(result_json)
+
 def create_sync_group_flow_symmetrical(cluster, group, flow_id, zones, bucket = None):
     cmd = ['sync', 'group', 'flow', 'create', '--group-id', group, '--flow-id' , flow_id, '--flow-type', 'symmetrical', '--zones=%s' % zones]
     if bucket:
@@ -2613,191 +2626,160 @@ def check_object_not_exists(bucket, objname):
 def check_objects_not_exist(bucket, obj_arr):
     for objname in obj_arr:
         check_object_not_exists(bucket, objname)
-        
-@attr('sync_policy')
-def test_bucket_delete_with_bucket_sync_policy_directional():
 
+@attr('fails_with_rgw')
+@attr('sync_policy')
+def test_sync_policy_config_zonegroup():
+    """
+    test_sync_policy_config_zonegroup:
+        test configuration of all sync commands
+    """
     zonegroup = realm.master_zonegroup()
-    zonegroup_conns = ZonegroupConns(zonegroup)
-
     zonegroup_meta_checkpoint(zonegroup)
 
-    (zoneA, zoneB) = zonegroup.zones[0:2]
-    (zcA, zcB) = zonegroup_conns.zones[0:2]
+    zonegroup_conns = ZonegroupConns(zonegroup)
+    z1, z2 = zonegroup.zones[0:2]
+    c1, c2 = (z1.cluster, z2.cluster)
 
-    c1 = zoneA.cluster
+    zones = z1.name+","+z2.name
 
-    # configure sync policy
-    zones = zoneA.name + ',' + zoneB.name
     c1.admin(['sync', 'policy', 'get'])
+
+    # (a) zonegroup level
     create_sync_policy_group(c1, "sync-group")
-    create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow", zones)
-    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zoneA.name, zoneB.name)
-    set_sync_policy_group_status(c1, "sync-group", "allowed")
+    set_sync_policy_group_status(c1, "sync-group", "enabled")
+    get_sync_policy_group(c1, "sync-group")
 
-    zonegroup.period.update(zoneA, commit=True)
     get_sync_policy(c1)
 
-    """
-        configure policy at bucketA level with src and dest
-        zones specified to zoneA and zoneB resp.
-
-        verify zoneA bucketA syncs to zoneB BucketA but not viceversa.
-    """
-
-    # configure sync policy for only bucketA and enable it
-    bucketA = create_zone_bucket(zcA)
-    buckets = []
-    buckets.append(bucketA)
-    create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
-    create_sync_group_flow_directional(c1, "sync-bucket", "sync-flow-bucket", zoneA.name, zoneB.name, bucketA.name)
-    #create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow-bucket", zones, bucketA.name)
-    create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zoneA.name, zoneB.name, bucketA.name)
-    set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)
-
-    get_sync_policy(c1, bucketA.name)
+    create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
+    create_sync_group_flow_directional(c1, "sync-group", "sync-flow2", z1.name, z2.name)
 
-    zonegroup_meta_checkpoint(zonegroup)
+    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
+    get_sync_policy_group(c1, "sync-group")
 
-    # create bucketA and objects in zoneA and zoneB
-    objnameA = 'a'
-    objnameB = 'b'
+    zonegroup.period.update(z1, commit=True)
 
-    # upload object in each zone and wait for sync.
-    k = new_key(zcA, bucketA, objnameA)
-    k.set_contents_from_string('foo')
-    k = new_key(zcB, bucketA, objnameB)
-    k.set_contents_from_string('foo')
+    # (b) bucket level
+    zc1, zc2 = zonegroup_conns.zones[0:2]
+    bucket = create_zone_bucket(zc1)
+    bucket_name = bucket.name
 
-    zonegroup_meta_checkpoint(zonegroup)
-    zone_data_checkpoint(zoneB, zoneA)
+    create_sync_policy_group(c1, "sync-bucket", "allowed", bucket_name)
+    set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucket_name)
+    get_sync_policy_group(c1, "sync-bucket", bucket_name)
 
-    # verify that objnameA is synced to bucketA in zoneB
-    bucket = get_bucket(zcB, bucketA.name)
-    check_objects_exist(bucket, objnameA)
+    get_sync_policy(c1, bucket_name)
 
-    # verify that objnameB is not synced to bucketA in zoneA
-    bucket = get_bucket(zcA, bucketA.name)
-    check_objects_not_exist(bucket, objnameB)
+    create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow1", zones, bucket_name)
+    create_sync_group_flow_directional(c1, "sync-bucket", "sync-flow2", z1.name, z2.name, bucket_name)
 
-    log.debug('deleting object on zone A')
-    k = get_key(zcA, bucket, objnameA)
-    k.delete()
+    create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zones, zones, bucket_name)
+    get_sync_policy_group(c1, "sync-bucket", bucket_name)
 
-    zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
+    zonegroup_meta_checkpoint(zonegroup)
 
-    # delete bucket on zoneA. it should fail to delete
-    log.debug('deleting bucket')
-    assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name)
+    remove_sync_group_pipe(c1, "sync-bucket", "sync-pipe", bucket_name)
+    remove_sync_group_flow_directional(c1, "sync-bucket", "sync-flow2", z1.name, z2.name, bucket_name)
+    remove_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow1", zones, bucket_name)
+    remove_sync_policy_group(c1, "sync-bucket", bucket_name)
 
-    assert check_all_buckets_exist(zcA, buckets)
-    assert check_all_buckets_exist(zcB, buckets)
+    get_sync_policy(c1, bucket_name)
 
-    log.debug('deleting object on zone B')
-    k = get_key(zcB, bucket, objnameB)
-    k.delete()
-    time.sleep(config.checkpoint_delay)
+    zonegroup_meta_checkpoint(zonegroup)
 
-    # retry deleting bucket after removing the object from zone B. should succeed
-    log.debug('retry deleting bucket')
-    zcA.delete_bucket(bucketA.name)
+    remove_sync_group_pipe(c1, "sync-group", "sync-pipe")
+    remove_sync_group_flow_directional(c1, "sync-group", "sync-flow2", z1.name, z2.name)
+    remove_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1")
+    remove_sync_policy_group(c1, "sync-group")
 
-    zonegroup_meta_checkpoint(zonegroup)
+    get_sync_policy(c1)
 
-    assert check_all_buckets_dont_exist(zcA, buckets)
-    assert check_all_buckets_dont_exist(zcB, buckets)
+    zonegroup.period.update(z1, commit=True)
 
     return
 
+@attr('fails_with_rgw')
 @attr('sync_policy')
-def test_bucket_delete_with_bucket_sync_policy_symmetric():
+def test_sync_flow_symmetrical_zonegroup_all():
+    """
+    test_sync_flow_symmetrical_zonegroup_all:
+        allows sync from all the zones to all other zones (default case)
+    """
 
     zonegroup = realm.master_zonegroup()
-    zonegroup_conns = ZonegroupConns(zonegroup)
-
     zonegroup_meta_checkpoint(zonegroup)
 
+    zonegroup_conns = ZonegroupConns(zonegroup)
+
     (zoneA, zoneB) = zonegroup.zones[0:2]
     (zcA, zcB) = zonegroup_conns.zones[0:2]
 
     c1 = zoneA.cluster
 
-    # configure sync policy
-    zones = zoneA.name + ',' + zoneB.name
     c1.admin(['sync', 'policy', 'get'])
+
+    zones = zoneA.name + ',' + zoneB.name
     create_sync_policy_group(c1, "sync-group")
-    create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow", zones)
+    create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
     create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
-    set_sync_policy_group_status(c1, "sync-group", "allowed")
+    set_sync_policy_group_status(c1, "sync-group", "enabled")
 
     zonegroup.period.update(zoneA, commit=True)
     get_sync_policy(c1)
 
-    """
-        configure symmetrical policy at bucketA level with src and dest
-        zones specified to zoneA and zoneB resp.
-    """
+    objnames = [ 'obj1', 'obj2' ]
+    content = 'asdasd'
+    buckets = []
 
-    # configure sync policy for only bucketA and enable it
+    # create bucket & object in all zones
     bucketA = create_zone_bucket(zcA)
-    buckets = []
     buckets.append(bucketA)
-    create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
-    create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow-bucket", zones, bucketA.name)
-    create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zones, zones, bucketA.name)
-    set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)
-
-    get_sync_policy(c1, bucketA.name)
-
-    zonegroup_meta_checkpoint(zonegroup)
-
-    # create bucketA and objects in zoneA and zoneB
-    objnameA = 'a'
-    objnameB = 'b'
+    create_object(zcA, bucketA, objnames[0], content)
 
-    # upload object in each zone and wait for sync.
-    k = new_key(zcA, bucketA, objnameA)
-    k.set_contents_from_string('foo')
-    k = new_key(zcB, bucketA, objnameB)
-    k.set_contents_from_string('foo')
+    bucketB = create_zone_bucket(zcB)
+    buckets.append(bucketB)
+    create_object(zcB, bucketB, objnames[1], content)
 
     zonegroup_meta_checkpoint(zonegroup)
+    # 'zonegroup_data_checkpoint' currently fails for the zones not
+    # allowed to sync. So as a workaround, data checkpoint is done
+    # for only the ones configured.
     zone_data_checkpoint(zoneB, zoneA)
 
-    log.debug('deleting object A')
-    k = get_key(zcA, bucketA, objnameA)
-    k.delete()
-
-    log.debug('deleting object B')
-    k = get_key(zcA, bucketA, objnameB)
-    k.delete()
-
-    zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
-    zone_data_checkpoint(zoneB, zoneA)
+    # verify that objects are synced across the zones
+    bucket = get_bucket(zcB, bucketA.name)
+    check_object_exists(bucket, objnames[0], content)
 
-    # delete bucket on zoneA.
-    log.debug('deleting bucket')
-    zcA.delete_bucket(bucketA.name)
-    zonegroup_meta_checkpoint(zonegroup)
+    bucket = get_bucket(zcA, bucketB.name)
+    check_object_exists(bucket, objnames[1], content)
 
-    assert check_all_buckets_dont_exist(zcA, buckets)
-    assert check_all_buckets_dont_exist(zcB, buckets)
+    remove_sync_policy_group(c1, "sync-group")
     return
 
+@attr('fails_with_rgw')
 @attr('sync_policy')
-def test_bucket_delete_with_zonegroup_sync_policy_symmetric():
+def test_sync_flow_symmetrical_zonegroup_select():
+    """
+    test_sync_flow_symmetrical_zonegroup_select:
+        allow sync between zoneA & zoneB
+        verify that zoneC doesn't sync the data
+    """
 
     zonegroup = realm.master_zonegroup()
     zonegroup_conns = ZonegroupConns(zonegroup)
 
+    if len(zonegroup.zones) < 3:
+        raise SkipTest("test_sync_flow_symmetrical_zonegroup_select skipped. Requires 3 or more zones in master zonegroup.")
+
     zonegroup_meta_checkpoint(zonegroup)
 
-    (zoneA, zoneB) = zonegroup.zones[0:2]
-    (zcA, zcB) = zonegroup_conns.zones[0:2]
+    (zoneA, zoneB, zoneC) = zonegroup.zones[0:3]
+    (zcA, zcB, zcC) = zonegroup_conns.zones[0:3]
 
     c1 = zoneA.cluster
 
-    # configure symmetric sync policy
+    # configure sync policy
     zones = zoneA.name + ',' + zoneB.name
     c1.admin(['sync', 'policy', 'get'])
     create_sync_policy_group(c1, "sync-group")
@@ -2808,57 +2790,63 @@ def test_bucket_delete_with_zonegroup_sync_policy_symmetric():
     zonegroup.period.update(zoneA, commit=True)
     get_sync_policy(c1)
 
-    # configure sync policy for only bucketA and enable it
-    bucketA = create_zone_bucket(zcA)
     buckets = []
-    buckets.append(bucketA)
-
-    time.sleep(config.checkpoint_delay)
-    zonegroup_meta_checkpoint(zonegroup)
+    content = 'asdasd'
 
-    # create bucketA and objects in zoneA and zoneB
-    objnameA = 'a'
-    objnameB = 'b'
+    # create bucketA & objects in zoneA
+    objnamesA = [ 'obj1', 'obj2', 'obj3' ]
+    bucketA = create_zone_bucket(zcA)
+    buckets.append(bucketA)
+    create_objects(zcA, bucketA, objnamesA, content)
 
-    # upload object in each zone and wait for sync.
-    k = new_key(zcA, bucketA, objnameA)
-    k.set_contents_from_string('foo')
-    k = new_key(zcB, bucketA, objnameB)
-    k.set_contents_from_string('foo')
+    # create bucketB & objects in zoneB
+    objnamesB = [ 'obj4', 'obj5', 'obj6' ]
+    bucketB = create_zone_bucket(zcB)
+    buckets.append(bucketB)
+    create_objects(zcB, bucketB, objnamesB, content)
 
     zonegroup_meta_checkpoint(zonegroup)
     zone_data_checkpoint(zoneB, zoneA)
+    zone_data_checkpoint(zoneA, zoneB)
 
-    log.debug('deleting object A')
-    k = get_key(zcA, bucketA, objnameA)
-    k.delete()
+    # verify that objnamesA synced only to zoneB, not zoneC
+    bucket = get_bucket(zcB, bucketA.name)
+    check_objects_exist(bucket, objnamesA, content)
 
-    log.debug('deleting object B')
-    k = get_key(zcA, bucketA, objnameB)
-    k.delete()
+    bucket = get_bucket(zcC, bucketA.name)
+    check_objects_not_exist(bucket, objnamesA)
 
-    zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
-    zone_data_checkpoint(zoneB, zoneA)
+    # verify that objnamesB synced only to zoneA, not zoneC
+    bucket = get_bucket(zcA, bucketB.name)
+    check_objects_exist(bucket, objnamesB, content)
 
-    # delete bucket on zoneA.
-    log.debug('deleting bucket')
-    zcA.delete_bucket(bucketA.name)
-    zonegroup_meta_checkpoint(zonegroup)
+    bucket = get_bucket(zcC, bucketB.name)
+    check_objects_not_exist(bucket, objnamesB)
 
-    assert check_all_buckets_dont_exist(zcA, buckets)
-    assert check_all_buckets_dont_exist(zcB, buckets)
+    remove_sync_policy_group(c1, "sync-group")
     return
 
+@attr('fails_with_rgw')
 @attr('sync_policy')
-def test_bucket_delete_with_zonegroup_sync_policy_directional():
+def test_sync_flow_directional_zonegroup_select():
+    """
+    test_sync_flow_directional_zonegroup_select:
+        allow sync from only zoneA to zoneB
+        
+        verify that data doesn't get synced to zoneC and
+        zoneA shouldn't sync data from zoneB either
+    """
 
     zonegroup = realm.master_zonegroup()
     zonegroup_conns = ZonegroupConns(zonegroup)
 
+    if len(zonegroup.zones) < 3:
+        raise SkipTest("test_sync_flow_symmetrical_zonegroup_select skipped. Requires 3 or more zones in master zonegroup.")
+
     zonegroup_meta_checkpoint(zonegroup)
 
-    (zoneA, zoneB) = zonegroup.zones[0:2]
-    (zcA, zcB) = zonegroup_conns.zones[0:2]
+    (zoneA, zoneB, zoneC) = zonegroup.zones[0:3]
+    (zcA, zcB, zcC) = zonegroup_conns.zones[0:3]
 
     c1 = zoneA.cluster
 
@@ -2866,149 +2854,106 @@ def test_bucket_delete_with_zonegroup_sync_policy_directional():
     zones = zoneA.name + ',' + zoneB.name
     c1.admin(['sync', 'policy', 'get'])
     create_sync_policy_group(c1, "sync-group")
-    create_sync_group_flow_directional(c1, "sync-group", "sync-flow1", zoneA.name, zoneB.name)
+    create_sync_group_flow_directional(c1, "sync-group", "sync-flow", zoneA.name, zoneB.name)
     create_sync_group_pipe(c1, "sync-group", "sync-pipe", zoneA.name, zoneB.name)
     set_sync_policy_group_status(c1, "sync-group", "enabled")
 
     zonegroup.period.update(zoneA, commit=True)
     get_sync_policy(c1)
 
-    # configure sync policy for only bucketA and enable it
-    bucketA = create_zone_bucket(zcA)
     buckets = []
-    buckets.append(bucketA)
-
-    time.sleep(config.checkpoint_delay)
-    zonegroup_meta_checkpoint(zonegroup)
+    content = 'asdasd'
 
-    # create bucketA and objects in zoneA and zoneB
-    objnameA = 'a'
-    objnameB = 'b'
+    # create bucketA & objects in zoneA
+    objnamesA = [ 'obj1', 'obj2', 'obj3' ]
+    bucketA = create_zone_bucket(zcA)
+    buckets.append(bucketA)
+    create_objects(zcA, bucketA, objnamesA, content)
 
-    # upload object in each zone and wait for sync.
-    k = new_key(zcA, bucketA, objnameA)
-    k.set_contents_from_string('foo')
-    k = new_key(zcB, bucketA, objnameB)
-    k.set_contents_from_string('foo')
+    # create bucketB & objects in zoneB
+    objnamesB = [ 'obj4', 'obj5', 'obj6' ]
+    bucketB = create_zone_bucket(zcB)
+    buckets.append(bucketB)
+    create_objects(zcB, bucketB, objnamesB, content)
 
     zonegroup_meta_checkpoint(zonegroup)
     zone_data_checkpoint(zoneB, zoneA)
 
-    # verify that objnameA is synced to bucketA in zoneB
+    # verify that objnamesA synced only to zoneB, not zoneC
     bucket = get_bucket(zcB, bucketA.name)
-    check_objects_exist(bucket, objnameA)
-
-    # verify that objnameB is not synced to bucketA in zoneA
-    bucket = get_bucket(zcA, bucketA.name)
-    check_objects_not_exist(bucket, objnameB)
-
-    log.debug('deleting object on zone A')
-    k = get_key(zcA, bucket, objnameA)
-    k.delete()
-
-    zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
-
-    # delete bucket on zoneA. it should fail to delete
-    log.debug('deleting bucket')
-    assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name)
-
-    assert check_all_buckets_exist(zcA, buckets)
-    assert check_all_buckets_exist(zcB, buckets)
-
-    # retry deleting bucket after removing the object from zone B. should succeed
-    log.debug('deleting object on zone B')
-    k = get_key(zcB, bucket, objnameB)
-    k.delete()
-    time.sleep(config.checkpoint_delay)
-
-    log.debug('retry deleting bucket')
-    zcA.delete_bucket(bucketA.name)
+    check_objects_exist(bucket, objnamesA, content)
 
-    zonegroup_meta_checkpoint(zonegroup)
+    bucket = get_bucket(zcC, bucketA.name)
+    check_objects_not_exist(bucket, objnamesA)
 
-    assert check_all_buckets_dont_exist(zcA, buckets)
-    assert check_all_buckets_dont_exist(zcB, buckets)
+    # verify that objnamesB are not synced to either zoneA or zoneC
+    bucket = get_bucket(zcA, bucketB.name)
+    check_objects_not_exist(bucket, objnamesB)
 
-    return
+    bucket = get_bucket(zcC, bucketB.name)
+    check_objects_not_exist(bucket, objnamesB)
 
-@attr('fails_with_rgw')
-@attr('sync_policy')
-def test_sync_policy_config_zonegroup():
-    """
-    test_sync_policy_config_zonegroup:
-        test configuration of all sync commands
     """
-    zonegroup = realm.master_zonegroup()
-    zonegroup_meta_checkpoint(zonegroup)
-
-    zonegroup_conns = ZonegroupConns(zonegroup)
-    z1, z2 = zonegroup.zones[0:2]
-    c1, c2 = (z1.cluster, z2.cluster)
-
-    zones = z1.name+","+z2.name
-
-    c1.admin(['sync', 'policy', 'get'])
-
-    # (a) zonegroup level
-    create_sync_policy_group(c1, "sync-group")
-    set_sync_policy_group_status(c1, "sync-group", "enabled")
-    get_sync_policy_group(c1, "sync-group")
-
-    get_sync_policy(c1)
+        verify the same at the bucketA level:
+        configure another policy at the bucketA level with source and dest
+        zones set to zoneA and zoneB respectively.

+        verify that zoneA's bucketA syncs to zoneB's bucketA but not vice versa.
+    """
+    # reconfigure zonegroup pipe & flow
+    remove_sync_group_pipe(c1, "sync-group", "sync-pipe")
+    remove_sync_group_flow_directional(c1, "sync-group", "sync-flow", zoneA.name, zoneB.name)
     create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
-    create_sync_group_flow_directional(c1, "sync-group", "sync-flow2", z1.name, z2.name)
-
     create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
-    get_sync_policy_group(c1, "sync-group")
-
-    zonegroup.period.update(z1, commit=True)
-
-    # (b)  bucket level
-    zc1, zc2 = zonegroup_conns.zones[0:2]
-    bucket = create_zone_bucket(zc1)
-    bucket_name = bucket.name
 
-    create_sync_policy_group(c1, "sync-bucket", "allowed", bucket_name)
-    set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucket_name)
-    get_sync_policy_group(c1, "sync-bucket", bucket_name)
+    # change state to allowed
+    set_sync_policy_group_status(c1, "sync-group", "allowed")
 
-    get_sync_policy(c1, bucket_name)
+    zonegroup.period.update(zoneA, commit=True)
+    get_sync_policy(c1)
 
-    create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow1", zones, bucket_name)
-    create_sync_group_flow_directional(c1, "sync-bucket", "sync-flow2", z1.name, z2.name, bucket_name)
+    # configure sync policy for only bucketA and enable it
+    create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
+    create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flowA", zones, bucketA.name)
+    args = ['--source-bucket=*', '--dest-bucket=*']
+    create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zoneA.name, zoneB.name, bucketA.name, args)
+    set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)
 
-    create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zones, zones, bucket_name)
-    get_sync_policy_group(c1, "sync-bucket", bucket_name)
+    get_sync_policy(c1, bucketA.name)
 
     zonegroup_meta_checkpoint(zonegroup)
 
-    remove_sync_group_pipe(c1, "sync-bucket", "sync-pipe", bucket_name)
-    remove_sync_group_flow_directional(c1, "sync-bucket", "sync-flow2", z1.name, z2.name, bucket_name)
-    remove_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow1", zones, bucket_name)
-    remove_sync_policy_group(c1, "sync-bucket", bucket_name)
-
-    get_sync_policy(c1, bucket_name)
+    # create objects in bucketA in zoneA and zoneB
+    objnamesC = [ 'obj7', 'obj8', 'obj9' ]
+    objnamesD = [ 'obj10', 'obj11', 'obj12' ]
+    create_objects(zcA, bucketA, objnamesC, content)
+    create_objects(zcB, bucketA, objnamesD, content)
 
     zonegroup_meta_checkpoint(zonegroup)
+    zone_data_checkpoint(zoneB, zoneA)
 
-    remove_sync_group_pipe(c1, "sync-group", "sync-pipe")
-    remove_sync_group_flow_directional(c1, "sync-group", "sync-flow2", z1.name, z2.name)
-    remove_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1")
-    remove_sync_policy_group(c1, "sync-group")
-
-    get_sync_policy(c1)
+    # verify that objnamesC are synced to bucketA in zoneB
+    bucket = get_bucket(zcB, bucketA.name)
+    check_objects_exist(bucket, objnamesC, content)
 
-    zonegroup.period.update(z1, commit=True)
+    # verify that objnamesD are not synced to bucketA in zoneA
+    bucket = get_bucket(zcA, bucketA.name)
+    check_objects_not_exist(bucket, objnamesD)
 
+    remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
+    remove_sync_policy_group(c1, "sync-group")
     return
 
 @attr('fails_with_rgw')
 @attr('sync_policy')
-def test_sync_flow_symmetrical_zonegroup_all():
+def test_sync_single_bucket():
     """
-    test_sync_flow_symmetrical_zonegroup_all:
-        allows sync from all the zones to all other zones (default case)
+    test_sync_single_bucket:
+        Allow data sync for only bucketA but not for other buckets, via
+        the two methods below:
+
+        (a) zonegroup: symmetrical flow but configure pipe for only bucketA.
+        (b) bucket level: configure policy for bucketA
     """
 
     zonegroup = realm.master_zonegroup()
@@ -3024,312 +2969,63 @@ def test_sync_flow_symmetrical_zonegroup_all():
     c1.admin(['sync', 'policy', 'get'])
 
     zones = zoneA.name + ',' + zoneB.name
-    create_sync_policy_group(c1, "sync-group")
-    create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
-    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
-    set_sync_policy_group_status(c1, "sync-group", "enabled")
-
-    zonegroup.period.update(zoneA, commit=True)
     get_sync_policy(c1)
 
-    objnames = [ 'obj1', 'obj2' ]
+    objnames = [ 'obj1', 'obj2', 'obj3' ]
     content = 'asdasd'
     buckets = []
 
-    # create bucket & object in all zones
+    # create bucketA & bucketB in zoneA
     bucketA = create_zone_bucket(zcA)
     buckets.append(bucketA)
-    create_object(zcA, bucketA, objnames[0], content)
-
-    bucketB = create_zone_bucket(zcB)
+    bucketB = create_zone_bucket(zcA)
     buckets.append(bucketB)
-    create_object(zcB, bucketB, objnames[1], content)
 
     zonegroup_meta_checkpoint(zonegroup)
-    # 'zonegroup_data_checkpoint' currently fails for the zones not
-    # allowed to sync. So as a workaround, data checkpoint is done
-    # for only the ones configured.
+
+    """
+        Method (a): configure pipe for only bucketA
+    """
+    # configure sync policy & pipe for only bucketA
+    create_sync_policy_group(c1, "sync-group")
+    create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
+    args = ['--source-bucket=' +  bucketA.name, '--dest-bucket=' + bucketA.name]
+
+    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones, None, args)
+    set_sync_policy_group_status(c1, "sync-group", "enabled")
+    get_sync_policy(c1)
+    zonegroup.period.update(zoneA, commit=True)
+    
+    sync_info(c1)
+
+    # create objects in bucketA & bucketB
+    create_objects(zcA, bucketA, objnames, content)
+    create_objects(zcA, bucketB, objnames, content)
+
+    zonegroup_meta_checkpoint(zonegroup)
     zone_data_checkpoint(zoneB, zoneA)
 
-    # verify if objects are synced accross the zone
+    # verify that bucketA objects are synced
     bucket = get_bucket(zcB, bucketA.name)
-    check_object_exists(bucket, objnames[0], content)
+    check_objects_exist(bucket, objnames, content)
 
-    bucket = get_bucket(zcA, bucketB.name)
-    check_object_exists(bucket, objnames[1], content)
+    # bucketB objects should not be synced
+    bucket = get_bucket(zcB, bucketB.name)
+    check_objects_not_exist(bucket, objnames)
 
-    remove_sync_policy_group(c1, "sync-group")
-    return
 
-@attr('fails_with_rgw')
-@attr('sync_policy')
-def test_sync_flow_symmetrical_zonegroup_select():
     """
-    test_sync_flow_symmetrical_zonegroup_select:
-        allow sync between zoneA & zoneB
-        verify zoneC doesnt sync the data
+        Method (b): configure policy at the bucketA level only
     """
+    # reconfigure group pipe
+    remove_sync_group_pipe(c1, "sync-group", "sync-pipe")
+    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
 
-    zonegroup = realm.master_zonegroup()
-    zonegroup_conns = ZonegroupConns(zonegroup)
+    # change state to allowed
+    set_sync_policy_group_status(c1, "sync-group", "allowed")
 
-    if len(zonegroup.zones) < 3:
-        raise SkipTest("test_sync_flow_symmetrical_zonegroup_select skipped. Requires 3 or more zones in master zonegroup.")
-
-    zonegroup_meta_checkpoint(zonegroup)
-
-    (zoneA, zoneB, zoneC) = zonegroup.zones[0:3]
-    (zcA, zcB, zcC) = zonegroup_conns.zones[0:3]
-
-    c1 = zoneA.cluster
-
-    # configure sync policy
-    zones = zoneA.name + ',' + zoneB.name
-    c1.admin(['sync', 'policy', 'get'])
-    create_sync_policy_group(c1, "sync-group")
-    create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow", zones)
-    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
-    set_sync_policy_group_status(c1, "sync-group", "enabled")
-
-    zonegroup.period.update(zoneA, commit=True)
-    get_sync_policy(c1)
-
-    buckets = []
-    content = 'asdasd'
-
-    # create bucketA & objects in zoneA
-    objnamesA = [ 'obj1', 'obj2', 'obj3' ]
-    bucketA = create_zone_bucket(zcA)
-    buckets.append(bucketA)
-    create_objects(zcA, bucketA, objnamesA, content)
-
-    # create bucketB & objects in zoneB
-    objnamesB = [ 'obj4', 'obj5', 'obj6' ]
-    bucketB = create_zone_bucket(zcB)
-    buckets.append(bucketB)
-    create_objects(zcB, bucketB, objnamesB, content)
-
-    zonegroup_meta_checkpoint(zonegroup)
-    zone_data_checkpoint(zoneB, zoneA)
-    zone_data_checkpoint(zoneA, zoneB)
-
-    # verify if objnamesA synced to only zoneB but not zoneC
-    bucket = get_bucket(zcB, bucketA.name)
-    check_objects_exist(bucket, objnamesA, content)
-
-    bucket = get_bucket(zcC, bucketA.name)
-    check_objects_not_exist(bucket, objnamesA)
-
-    # verify if objnamesB synced to only zoneA but not zoneC
-    bucket = get_bucket(zcA, bucketB.name)
-    check_objects_exist(bucket, objnamesB, content)
-
-    bucket = get_bucket(zcC, bucketB.name)
-    check_objects_not_exist(bucket, objnamesB)
-
-    remove_sync_policy_group(c1, "sync-group")
-    return
-
-@attr('fails_with_rgw')
-@attr('sync_policy')
-def test_sync_flow_directional_zonegroup_select():
-    """
-    test_sync_flow_directional_zonegroup_select:
-        allow sync from only zoneA to zoneB
-        
-        verify that data doesn't get synced to zoneC and
-        zoneA shouldn't sync data from zoneB either
-    """
-
-    zonegroup = realm.master_zonegroup()
-    zonegroup_conns = ZonegroupConns(zonegroup)
-
-    if len(zonegroup.zones) < 3:
-        raise SkipTest("test_sync_flow_symmetrical_zonegroup_select skipped. Requires 3 or more zones in master zonegroup.")
-
-    zonegroup_meta_checkpoint(zonegroup)
-
-    (zoneA, zoneB, zoneC) = zonegroup.zones[0:3]
-    (zcA, zcB, zcC) = zonegroup_conns.zones[0:3]
-
-    c1 = zoneA.cluster
-
-    # configure sync policy
-    zones = zoneA.name + ',' + zoneB.name
-    c1.admin(['sync', 'policy', 'get'])
-    create_sync_policy_group(c1, "sync-group")
-    create_sync_group_flow_directional(c1, "sync-group", "sync-flow", zoneA.name, zoneB.name)
-    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zoneA.name, zoneB.name)
-    set_sync_policy_group_status(c1, "sync-group", "enabled")
-
-    zonegroup.period.update(zoneA, commit=True)
-    get_sync_policy(c1)
-
-    buckets = []
-    content = 'asdasd'
-
-    # create bucketA & objects in zoneA
-    objnamesA = [ 'obj1', 'obj2', 'obj3' ]
-    bucketA = create_zone_bucket(zcA)
-    buckets.append(bucketA)
-    create_objects(zcA, bucketA, objnamesA, content)
-
-    # create bucketB & objects in zoneB
-    objnamesB = [ 'obj4', 'obj5', 'obj6' ]
-    bucketB = create_zone_bucket(zcB)
-    buckets.append(bucketB)
-    create_objects(zcB, bucketB, objnamesB, content)
-
-    zonegroup_meta_checkpoint(zonegroup)
-    zone_data_checkpoint(zoneB, zoneA)
-
-    # verify if objnamesA synced to only zoneB but not zoneC
-    bucket = get_bucket(zcB, bucketA.name)
-    check_objects_exist(bucket, objnamesA, content)
-
-    bucket = get_bucket(zcC, bucketA.name)
-    check_objects_not_exist(bucket, objnamesA)
-
-    # verify if objnamesB are not synced to either zoneA or zoneC
-    bucket = get_bucket(zcA, bucketB.name)
-    check_objects_not_exist(bucket, objnamesB)
-
-    bucket = get_bucket(zcC, bucketB.name)
-    check_objects_not_exist(bucket, objnamesB)
-
-    """
-        verify the same at bucketA level
-        configure another policy at bucketA level with src and dest
-        zones specified to zoneA and zoneB resp.
-
-        verify zoneA bucketA syncs to zoneB BucketA but not viceversa.
-    """
-    # reconfigure zonegroup pipe & flow
-    remove_sync_group_pipe(c1, "sync-group", "sync-pipe")
-    remove_sync_group_flow_directional(c1, "sync-group", "sync-flow", zoneA.name, zoneB.name)
-    create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
-    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
-
-    # change state to allowed
-    set_sync_policy_group_status(c1, "sync-group", "allowed")
-
-    zonegroup.period.update(zoneA, commit=True)
-    get_sync_policy(c1)
-
-    # configure sync policy for only bucketA and enable it
-    create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
-    create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flowA", zones, bucketA.name)
-    args = ['--source-bucket=*', '--dest-bucket=*']
-    create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zoneA.name, zoneB.name, bucketA.name, args)
-    set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)
-
-    get_sync_policy(c1, bucketA.name)
-
-    zonegroup_meta_checkpoint(zonegroup)
-
-    # create objects in bucketA in zoneA and zoneB
-    objnamesC = [ 'obj7', 'obj8', 'obj9' ]
-    objnamesD = [ 'obj10', 'obj11', 'obj12' ]
-    create_objects(zcA, bucketA, objnamesC, content)
-    create_objects(zcB, bucketA, objnamesD, content)
-
-    zonegroup_meta_checkpoint(zonegroup)
-    zone_data_checkpoint(zoneB, zoneA)
-
-    # verify that objnamesC are synced to bucketA in zoneB
-    bucket = get_bucket(zcB, bucketA.name)
-    check_objects_exist(bucket, objnamesC, content)
-
-    # verify that objnamesD are not synced to bucketA in zoneA
-    bucket = get_bucket(zcA, bucketA.name)
-    check_objects_not_exist(bucket, objnamesD)
-
-    remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
-    remove_sync_policy_group(c1, "sync-group")
-    return
-
-@attr('fails_with_rgw')
-@attr('sync_policy')
-def test_sync_single_bucket():
-    """
-    test_sync_single_bucket:
-        Allow data sync for only bucketA but not for other buckets via
-        below 2 methods
-
-        (a) zonegroup: symmetrical flow but configure pipe for only bucketA.
-        (b) bucket level: configure policy for bucketA
-    """
-
-    zonegroup = realm.master_zonegroup()
-    zonegroup_meta_checkpoint(zonegroup)
-
-    zonegroup_conns = ZonegroupConns(zonegroup)
-
-    (zoneA, zoneB) = zonegroup.zones[0:2]
-    (zcA, zcB) = zonegroup_conns.zones[0:2]
-
-    c1 = zoneA.cluster
-
-    c1.admin(['sync', 'policy', 'get'])
-
-    zones = zoneA.name + ',' + zoneB.name
-    get_sync_policy(c1)
-
-    objnames = [ 'obj1', 'obj2', 'obj3' ]
-    content = 'asdasd'
-    buckets = []
-
-    # create bucketA & bucketB in zoneA
-    bucketA = create_zone_bucket(zcA)
-    buckets.append(bucketA)
-    bucketB = create_zone_bucket(zcA)
-    buckets.append(bucketB)
-
-    zonegroup_meta_checkpoint(zonegroup)
-
-    """
-        Method (a): configure pipe for only bucketA
-    """
-    # configure sync policy & pipe for only bucketA
-    create_sync_policy_group(c1, "sync-group")
-    create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
-    args = ['--source-bucket=' +  bucketA.name, '--dest-bucket=' + bucketA.name]
-
-    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones, None, args)
-    set_sync_policy_group_status(c1, "sync-group", "enabled")
-    get_sync_policy(c1)
-    zonegroup.period.update(zoneA, commit=True)
-    
-    sync_info(c1)
-
-    # create objects in bucketA & bucketB
-    create_objects(zcA, bucketA, objnames, content)
-    create_object(zcA, bucketB, objnames, content)
-
-    zonegroup_meta_checkpoint(zonegroup)
-    zone_data_checkpoint(zoneB, zoneA)
-
-    # verify if bucketA objects are synced
-    bucket = get_bucket(zcB, bucketA.name)
-    check_objects_exist(bucket, objnames, content)
-
-    # bucketB objects should not be synced
-    bucket = get_bucket(zcB, bucketB.name)
-    check_objects_not_exist(bucket, objnames)
-
-
-    """
-        Method (b): configure policy at only bucketA level 
-    """
-    # reconfigure group pipe
-    remove_sync_group_pipe(c1, "sync-group", "sync-pipe")
-    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
-
-    # change state to allowed
-    set_sync_policy_group_status(c1, "sync-group", "allowed")
-
-    zonegroup.period.update(zoneA, commit=True)
-    get_sync_policy(c1)
+    zonegroup.period.update(zoneA, commit=True)
+    get_sync_policy(c1)
 
 
     # configure sync policy for only bucketA and enable it
@@ -4370,3 +4066,802 @@ def test_bucket_replication_alt_user():
     # check that object exists in destination bucket
     k = get_key(dest, dest_bucket, objname)
     assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
+
+@allow_bucket_replication
+def test_bucket_replication_reject_versioning_identical():
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+
+    source = zonegroup_conns.non_account_rw_zones[0]
+    dest = zonegroup_conns.non_account_rw_zones[1]
+
+    source_bucket = source.create_bucket(gen_bucket_name())
+    dest_bucket = dest.create_bucket(gen_bucket_name())
+    source.s3_client.put_bucket_versioning(
+        Bucket=source_bucket.name,
+        VersioningConfiguration={'Status': 'Enabled'}
+    )
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # create replication configuration
+    e = assert_raises(ClientError,
+                      source.s3_client.put_bucket_replication,
+                      Bucket=source_bucket.name,
+                      ReplicationConfiguration={
+                          'Role': '',
+                          'Rules': [{
+                              'ID': 'rule1',
+                              'Status': 'Enabled',
+                              'Destination': {
+                                  'Bucket': f'arn:aws:s3:::{dest_bucket.name}',
+                              }
+                          }]
+                      })
+    assert e.response['ResponseMetadata']['HTTPStatusCode'] == 400
+
+@allow_bucket_replication
+def test_bucket_replication_reject_objectlock_identical():
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+
+    source = zonegroup_conns.non_account_rw_zones[0]
+    dest = zonegroup_conns.non_account_rw_zones[1]
+
+    source_bucket = source.create_bucket(gen_bucket_name())
+    dest_bucket_name = gen_bucket_name()
+    dest.s3_client.create_bucket(Bucket=dest_bucket_name, ObjectLockEnabledForBucket=True)
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # create replication configuration
+    e = assert_raises(ClientError,
+                      source.s3_client.put_bucket_replication,
+                      Bucket=source_bucket.name,
+                      ReplicationConfiguration={
+                          'Role': '',
+                          'Rules': [{
+                              'ID': 'rule1',
+                              'Status': 'Enabled',
+                              'Destination': {
+                                  'Bucket': f'arn:aws:s3:::{dest_bucket_name}',
+                              }
+                          }]
+                      })
+    assert e.response['ResponseMetadata']['HTTPStatusCode'] == 400
+
+@allow_bucket_replication
+def test_bucket_replication_non_versioned_to_versioned():
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+
+    source = zonegroup_conns.non_account_rw_zones[0]
+    dest = zonegroup_conns.non_account_rw_zones[1]
+
+    source_bucket = source.create_bucket(gen_bucket_name())
+    dest_bucket = dest.create_bucket(gen_bucket_name())
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # create replication configuration
+    response = source.s3_client.put_bucket_replication(
+        Bucket=source_bucket.name,
+        ReplicationConfiguration={
+            'Role': '',
+            'Rules': [
+                {
+                    'ID': 'rule1',
+                    'Status': 'Enabled',
+                    'Destination': {
+                        'Bucket': f'arn:aws:s3:::{dest_bucket.name}',
+                    }
+                }
+            ]
+        }
+    )
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # enable versioning on destination bucket
+    dest.s3_client.put_bucket_versioning(
+        Bucket=dest_bucket.name,
+        VersioningConfiguration={'Status': 'Enabled'}
+    )
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # upload an object and wait for sync.
+    objname = 'dummy'
+    k = new_key(source, source_bucket, objname)
+    k.set_contents_from_string('foo')
+    zone_data_checkpoint(dest.zone, source.zone)
+
+    # check that the object does not exist in the destination bucket
+    e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
+    assert e.response['Error']['Code'] == 'NoSuchKey'
+
+@allow_bucket_replication
+def test_bucket_replication_versioned_to_non_versioned():
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+
+    source = zonegroup_conns.non_account_rw_zones[0]
+    dest = zonegroup_conns.non_account_rw_zones[1]
+
+    source_bucket = source.create_bucket(gen_bucket_name())
+    dest_bucket = dest.create_bucket(gen_bucket_name())
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # create replication configuration
+    response = source.s3_client.put_bucket_replication(
+        Bucket=source_bucket.name,
+        ReplicationConfiguration={
+            'Role': '',
+            'Rules': [
+                {
+                    'ID': 'rule1',
+                    'Status': 'Enabled',
+                    'Destination': {
+                        'Bucket': f'arn:aws:s3:::{dest_bucket.name}',
+                    }
+                }
+            ]
+        }
+    )
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # enable versioning on source bucket
+    source.s3_client.put_bucket_versioning(
+        Bucket=source_bucket.name,
+        VersioningConfiguration={'Status': 'Enabled'}
+    )
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # upload an object and wait for sync.
+    objname = 'dummy'
+    k = new_key(source, source_bucket, objname)
+    k.set_contents_from_string('foo')
+    zone_data_checkpoint(dest.zone, source.zone)
+
+    # check that the object does not exist in the destination bucket
+    e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
+    assert e.response['Error']['Code'] == 'NoSuchKey'
+
+@allow_bucket_replication
+def test_bucket_replication_lock_enabled_to_lock_disabled():
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+
+    source = zonegroup_conns.non_account_rw_zones[0]
+    dest = zonegroup_conns.non_account_rw_zones[1]
+
+    source_bucket_name = gen_bucket_name()
+    source.create_bucket(source_bucket_name)
+    # enable versioning
+    source.s3_client.put_bucket_versioning(
+        Bucket=source_bucket_name,
+        VersioningConfiguration={'Status': 'Enabled'}
+    )
+    dest_bucket = dest.create_bucket(gen_bucket_name())
+    # enable versioning
+    dest.s3_client.put_bucket_versioning(
+        Bucket=dest_bucket.name,
+        VersioningConfiguration={'Status': 'Enabled'}
+    )
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # create replication configuration
+    source.s3_client.put_bucket_replication(
+        Bucket=source_bucket_name,
+        ReplicationConfiguration={
+            'Role': '',
+            'Rules': [{
+                'ID': 'rule1',
+                'Status': 'Enabled',
+                'Destination': {
+                    'Bucket': f'arn:aws:s3:::{dest_bucket.name}',
+                }
+            }]
+        }
+    )
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # enable object lock on source bucket
+    source.s3_client.put_object_lock_configuration(
+        Bucket=source_bucket_name,
+        ObjectLockConfiguration={
+            'ObjectLockEnabled': 'Enabled',
+            'Rule': {
+                'DefaultRetention': {
+                    'Mode': 'GOVERNANCE',
+                    'Days': 1
+                }
+            }
+        }
+    )
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # upload an object and wait for sync.
+    objname = 'dummy'
+    k = new_key(source, source_bucket_name, objname)
+    k.set_contents_from_string('foo')
+    zone_data_checkpoint(dest.zone, source.zone)
+
+    # check that object does not exist in destination bucket
+    e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname)
+    assert e.response['Error']['Code'] == 'NoSuchKey'
+
+@allow_bucket_replication
+def test_bucket_replication_lock_disabled_to_lock_enabled():
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+
+    source = zonegroup_conns.non_account_rw_zones[0]
+    dest = zonegroup_conns.non_account_rw_zones[1]
+
+    source_bucket = source.create_bucket(gen_bucket_name())
+    # enable versioning
+    source.s3_client.put_bucket_versioning(
+        Bucket=source_bucket.name,
+        VersioningConfiguration={'Status': 'Enabled'}
+    )
+    dest_bucket_name = gen_bucket_name()
+    dest.create_bucket(dest_bucket_name)
+    # enable versioning
+    dest.s3_client.put_bucket_versioning(
+        Bucket=dest_bucket_name,
+        VersioningConfiguration={'Status': 'Enabled'}
+    )
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # create replication configuration
+    source.s3_client.put_bucket_replication(
+        Bucket=source_bucket.name,
+        ReplicationConfiguration={
+            'Role': '',
+            'Rules': [{
+                'ID': 'rule1',
+                'Status': 'Enabled',
+                'Destination': {
+                    'Bucket': f'arn:aws:s3:::{dest_bucket_name}',
+                }
+            }]
+        }
+    )
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # enable object lock on destination bucket
+    dest.s3_client.put_object_lock_configuration(
+        Bucket=dest_bucket_name,
+        ObjectLockConfiguration={
+            'ObjectLockEnabled': 'Enabled',
+            'Rule': {
+                'DefaultRetention': {
+                    'Mode': 'GOVERNANCE',
+                    'Days': 1
+                }
+            }
+        }
+    )
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # upload an object and wait for sync.
+    objname = 'dummy'
+    k = new_key(source, source_bucket.name, objname)
+    k.set_contents_from_string('foo')
+    zone_data_checkpoint(dest.zone, source.zone)
+
+    # check that object does not exist in destination bucket
+    e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket_name, Key=objname)
+    assert e.response['Error']['Code'] == 'NoSuchKey'
+
+@attr('sync_policy')
+@attr('fails_with_rgw')
+def test_bucket_delete_with_zonegroup_sync_policy_directional():
+
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+
+    zonegroup_meta_checkpoint(zonegroup)
+
+    (zoneA, zoneB) = zonegroup.zones[0:2]
+    (zcA, zcB) = zonegroup_conns.zones[0:2]
+
+    c1 = zoneA.cluster
+
+    # configure sync policy
+    zones = zoneA.name + ',' + zoneB.name
+    c1.admin(['sync', 'policy', 'get'])
+    create_sync_policy_group(c1, "sync-group")
+    create_sync_group_flow_directional(c1, "sync-group", "sync-flow1", zoneA.name, zoneB.name)
+    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zoneA.name, zoneB.name)
+    set_sync_policy_group_status(c1, "sync-group", "enabled")
+
+    zonegroup.period.update(zoneA, commit=True)
+    get_sync_policy(c1)
+
+    # configure sync policy for only bucketA and enable it
+    bucketA = create_zone_bucket(zcA)
+    buckets = []
+    buckets.append(bucketA)
+
+    time.sleep(config.checkpoint_delay)
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # object names to create in zoneA and zoneB
+    objnameA = 'a'
+    objnameB = 'b'
+
+    # upload object in each zone and wait for sync.
+    k = new_key(zcA, bucketA, objnameA)
+    k.set_contents_from_string('foo')
+    k = new_key(zcB, bucketA, objnameB)
+    k.set_contents_from_string('foo')
+
+    zonegroup_meta_checkpoint(zonegroup)
+    zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
+    zone_data_checkpoint(zoneB, zoneA)
+
+    # verify that objnameA is synced to bucketA in zoneB
+    bucket = get_bucket(zcB, bucketA.name)
+    check_objects_exist(bucket, objnameA)
+
+    # verify that objnameB is not synced to bucketA in zoneA
+    bucket = get_bucket(zcA, bucketA.name)
+    check_objects_not_exist(bucket, objnameB)
+
+    log.debug('deleting object on zone A')
+    k = get_key(zcA, bucket, objnameA)
+    k.delete()
+
+    zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
+
+    # delete bucket on zoneA. it should fail to delete
+    log.debug('deleting bucket')
+    assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name)
+
+    assert check_all_buckets_exist(zcA, buckets)
+    assert check_all_buckets_exist(zcB, buckets)
+
+    # retry deleting bucket after removing the object from zone B. should succeed
+    log.debug('deleting object on zone B')
+    k = get_key(zcB, bucket, objnameB)
+    k.delete()
+    time.sleep(config.checkpoint_delay)
+
+    log.debug('retry deleting bucket')
+    zcA.delete_bucket(bucketA.name)
+
+    zonegroup_meta_checkpoint(zonegroup)
+
+    assert check_all_buckets_dont_exist(zcA, buckets)
+    assert check_all_buckets_dont_exist(zcB, buckets)
+
+    remove_sync_policy_group(c1, "sync-group")
+
+    return
+
+@attr('sync_policy')
+@attr('fails_with_rgw')
+def test_bucket_delete_with_bucket_sync_policy_directional():
+
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+
+    zonegroup_meta_checkpoint(zonegroup)
+
+    (zoneA, zoneB) = zonegroup.zones[0:2]
+    (zcA, zcB) = zonegroup_conns.zones[0:2]
+
+    c1 = zoneA.cluster
+
+    # configure sync policy
+    zones = zoneA.name + ',' + zoneB.name
+    c1.admin(['sync', 'policy', 'get'])
+    create_sync_policy_group(c1, "sync-group")
+    create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow", zones)
+    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zoneA.name, zoneB.name)
+    set_sync_policy_group_status(c1, "sync-group", "allowed")
+
+    zonegroup.period.update(zoneA, commit=True)
+    get_sync_policy(c1)
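+    # the zonegroup group is left "allowed": sync is not enabled
+    # zonegroup-wide, but per-bucket policy groups may enable it.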
+
+    """
+        configure policy at bucketA level with src and dest
+        zones specified to zoneA and zoneB resp.
+
+        verify zoneA bucketA syncs to zoneB BucketA but not viceversa.
+    """
+
+    # configure sync policy for only bucketA and enable it
+    bucketA = create_zone_bucket(zcA)
+    buckets = []
+    buckets.append(bucketA)
+    create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
+    create_sync_group_flow_directional(c1, "sync-bucket", "sync-flow-bucket", zoneA.name, zoneB.name, bucketA.name)
+    create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zoneA.name, zoneB.name, bucketA.name)
+    set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)
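+    # effective policy: the zonegroup allows sync, and the bucket group
+    # enables a directional zoneA -> zoneB flow for bucketA only, so writes
+    # to bucketA in zoneB are never replicated back to zoneA.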
+
+    get_sync_policy(c1, bucketA.name)
+    time.sleep(config.checkpoint_delay)
+
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # create objects in zoneA and zoneB
+    objnameA = 'a'
+    objnameB = 'b'
+
+    # upload object in each zone and wait for sync.
+    k = new_key(zcA, bucketA, objnameA)
+    k.set_contents_from_string('foo')
+    k = new_key(zcB, bucketA, objnameB)
+    k.set_contents_from_string('foo')
+
+    zonegroup_meta_checkpoint(zonegroup)
+    zone_data_checkpoint(zoneB, zoneA)
+
+    # verify that objnameA is synced to bucketA in zoneB
+    bucket = get_bucket(zcB, bucketA.name)
+    check_objects_exist(bucket, objnameA)
+
+    # verify that objnameB is not synced to bucketA in zoneA
+    bucket = get_bucket(zcA, bucketA.name)
+    check_objects_not_exist(bucket, objnameB)
+
+    log.debug('deleting object on zone A')
+    k = get_key(zcA, bucket, objnameA)
+    k.delete()
+
+    zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
+
+    # delete bucket on zoneA. it should fail because bucketA in zoneB still holds objnameB
+    log.debug('deleting bucket')
+    assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name)
+
+    assert check_all_buckets_exist(zcA, buckets)
+    assert check_all_buckets_exist(zcB, buckets)
+
+    log.debug('deleting object on zone B')
+    k = get_key(zcB, bucket, objnameB)
+    k.delete()
+    time.sleep(config.checkpoint_delay)
+
+    # retry deleting bucket after removing the object from zone B. should succeed
+    log.debug('retry deleting bucket')
+    zcA.delete_bucket(bucketA.name)
+
+    zonegroup_meta_checkpoint(zonegroup)
+
+    assert check_all_buckets_dont_exist(zcA, buckets)
+    assert check_all_buckets_dont_exist(zcB, buckets)
+
+    remove_sync_policy_group(c1, "sync-group")
+
+    return
+
+@attr('sync_policy')
+@attr('fails_with_rgw')
+def test_bucket_delete_with_bucket_sync_policy_symmetric():
+
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+
+    zonegroup_meta_checkpoint(zonegroup)
+
+    (zoneA, zoneB) = zonegroup.zones[0:2]
+    (zcA, zcB) = zonegroup_conns.zones[0:2]
+
+    c1 = zoneA.cluster
+
+    # configure sync policy
+    zones = zoneA.name + ',' + zoneB.name
+    c1.admin(['sync', 'policy', 'get'])
+    create_sync_policy_group(c1, "sync-group")
+    create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow", zones)
+    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
+    set_sync_policy_group_status(c1, "sync-group", "allowed")
+
+    zonegroup.period.update(zoneA, commit=True)
+    get_sync_policy(c1)
+
+    """
+        configure symmetrical policy at bucketA level with src and dest
+        zones specified to zoneA and zoneB resp.
+    """
+
+    # configure sync policy for only bucketA and enable it
+    bucketA = create_zone_bucket(zcA)
+    buckets = []
+    buckets.append(bucketA)
+    create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
+    create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow-bucket", zones, bucketA.name)
+    create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zones, zones, bucketA.name)
+    set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)
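+    # with a symmetrical bucket flow both zones converge, so once every
+    # object is deleted the bucket is empty in all zones and the delete
+    # below should pass the remote-bucket check.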
+
+    get_sync_policy(c1, bucketA.name)
+
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # create objects in zoneA and zoneB
+    objnameA = 'a'
+    objnameB = 'b'
+
+    # upload object in each zone and wait for sync.
+    k = new_key(zcA, bucketA, objnameA)
+    k.set_contents_from_string('foo')
+    k = new_key(zcB, bucketA, objnameB)
+    k.set_contents_from_string('foo')
+
+    zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
+    zone_data_checkpoint(zoneB, zoneA)
+
+    log.debug('deleting object A')
+    k = get_key(zcA, bucketA, objnameA)
+    k.delete()
+
+    log.debug('deleting object B')
+    k = get_key(zcA, bucketA, objnameB)
+    k.delete()
+
+    zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
+    zone_data_checkpoint(zoneB, zoneA)
+
+    # delete bucket on zoneA. it should succeed now that both zones are empty
+    log.debug('deleting bucket')
+    zcA.delete_bucket(bucketA.name)
+    zonegroup_meta_checkpoint(zonegroup)
+
+    assert check_all_buckets_dont_exist(zcA, buckets)
+    assert check_all_buckets_dont_exist(zcB, buckets)
+
+    remove_sync_policy_group(c1, "sync-group")
+    return
+
+@attr('sync_policy')
+@attr('fails_with_rgw')
+def test_bucket_delete_with_zonegroup_sync_policy_symmetric():
+
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+
+    zonegroup_meta_checkpoint(zonegroup)
+
+    (zoneA, zoneB) = zonegroup.zones[0:2]
+    (zcA, zcB) = zonegroup_conns.zones[0:2]
+
+    c1 = zoneA.cluster
+
+    # configure symmetric sync policy
+    zones = zoneA.name + ',' + zoneB.name
+    c1.admin(['sync', 'policy', 'get'])
+    create_sync_policy_group(c1, "sync-group")
+    create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow", zones)
+    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
+    set_sync_policy_group_status(c1, "sync-group", "enabled")
+
+    zonegroup.period.update(zoneA, commit=True)
+    get_sync_policy(c1)
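+    # a fully symmetric, enabled zonegroup policy mirrors everything in both
+    # directions; this exercises the non-divergent path where bucket
+    # deletion succeeds once all objects are gone.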
+
+    # create a bucket in zoneA
+    bucketA = create_zone_bucket(zcA)
+    buckets = []
+    buckets.append(bucketA)
+
+    time.sleep(config.checkpoint_delay)
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # create objects in zoneA and zoneB
+    objnameA = 'a'
+    objnameB = 'b'
+
+    # upload object in each zone and wait for sync.
+    k = new_key(zcA, bucketA, objnameA)
+    k.set_contents_from_string('foo')
+    k = new_key(zcB, bucketA, objnameB)
+    k.set_contents_from_string('foo')
+
+    zone_data_checkpoint(zoneB, zoneA)
+    zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
+
+    log.debug('deleting object A')
+    k = get_key(zcA, bucketA, objnameA)
+    k.delete()
+
+    log.debug('deleting object B')
+    k = get_key(zcA, bucketA, objnameB)
+    k.delete()
+
+    zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
+    zone_data_checkpoint(zoneB, zoneA)
+
+    # delete bucket on zoneA. it should succeed now that both zones are empty
+    log.debug('deleting bucket')
+    zcA.delete_bucket(bucketA.name)
+    zonegroup_meta_checkpoint(zonegroup)
+
+    assert check_all_buckets_dont_exist(zcA, buckets)
+    assert check_all_buckets_dont_exist(zcB, buckets)
+
+    remove_sync_flow(c1, "sync-group", "sync-flow", "symmetrical")
+    remove_sync_policy_group(c1, "sync-group")
+    return
+
+@attr('sync_policy')
+@attr('fails_with_rgw')
+def test_delete_bucket_with_zone_opt_out():
+    """
+    test_delete_bucket_with_zone_opt_out:
+        allow sync between zoneA & zoneB
+        verify zoneC doesn't sync the data
+        test bucket deletion
+    """
+
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+
+    if len(zonegroup.zones) < 3:
+        raise SkipTest("test_delete_bucket_with_zone_opt_out skipped. Requires 3 or more zones in master zonegroup.")
+
+    zonegroup_meta_checkpoint(zonegroup)
+
+    (zoneA, zoneB, zoneC) = zonegroup.zones[0:3]
+    (zcA, zcB, zcC) = zonegroup_conns.zones[0:3]
+
+    c1 = zoneA.cluster
+
+    # configure sync policy between zoneA and zoneB only
+    zones = zoneA.name + ',' + zoneB.name
+    c1.admin(['sync', 'policy', 'get'])
+    create_sync_policy_group(c1, "sync-group")
+    create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow", zones)
+    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
+    set_sync_policy_group_status(c1, "sync-group", "enabled")
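+    # zoneC is not named in the flow or the pipe, so it is effectively
+    # opted out of sync: objects written to zoneC stay local to zoneC.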
+
+    zonegroup.period.update(zoneA, commit=True)
+    get_sync_policy(c1)
+
+    bucketA = create_zone_bucket(zcA)
+    buckets = []
+    buckets.append(bucketA)
+
+    time.sleep(config.checkpoint_delay)
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # create objects in zoneA and zoneC
+    objnameA = 'a'
+    objnameB = 'b'
+
+    # upload object in zone A and zone C
+    k = new_key(zcA, bucketA, objnameA)
+    k.set_contents_from_string('foo')
+    k = new_key(zcC, bucketA, objnameB)
+    k.set_contents_from_string('foo')
+
+    zonegroup_meta_checkpoint(zonegroup)
+    zone_data_checkpoint(zoneB, zoneA)
+
+    # verify that objnameA is synced to zoneB but not zoneC
+    bucket = get_bucket(zcB, bucketA.name)
+    check_objects_exist(bucket, objnameA)
+
+    bucket = get_bucket(zcC, bucketA.name)
+    check_objects_not_exist(bucket, objnameA)
+
+    # verify that objnameB is not synced to either zoneA or zoneB
+    bucket = get_bucket(zcA, bucketA.name)
+    check_objects_not_exist(bucket, objnameB)
+
+    bucket = get_bucket(zcB, bucketA.name)
+    check_objects_not_exist(bucket, objnameB)
+
+    log.debug('deleting object on zone A')
+    k = get_key(zcA, bucket, objnameA)
+    k.delete()
+
+    zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
+
+    # delete bucket on zoneA. it should fail to delete because zoneC still has objnameB
+    log.debug('deleting bucket')
+    assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name)
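+    # the delete fails even though the zoneA and zoneB copies are empty:
+    # the remote check also covers zones opted out of sync, and zoneC
+    # still holds objnameB.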
+
+    assert check_all_buckets_exist(zcA, buckets)
+    assert check_all_buckets_exist(zcC, buckets)
+
+    # retry deleting bucket after removing the object from zone C. should succeed
+    log.debug('deleting object on zone C')
+    k = get_key(zcC, bucket, objnameB)
+    k.delete()
+    time.sleep(config.checkpoint_delay)
+
+    log.debug('retry deleting bucket')
+    zcA.delete_bucket(bucketA.name)
+
+    zonegroup_meta_checkpoint(zonegroup)
+
+    assert check_all_buckets_dont_exist(zcA, buckets)
+    assert check_all_buckets_dont_exist(zcC, buckets)
+
+    remove_sync_policy_group(c1, "sync-group")
+
+    return
+
+@attr('sync_policy')
+@attr('fails_with_rgw')
+def test_bucket_delete_with_sync_policy_object_prefix():
+
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+
+    zonegroup_meta_checkpoint(zonegroup)
+
+    (zoneA, zoneB) = zonegroup.zones[0:2]
+    (zcA, zcB) = zonegroup_conns.zones[0:2]
+
+    c1 = zoneA.cluster
+
+    # configure sync policy
+    zones = zoneA.name + ',' + zoneB.name
+    c1.admin(['sync', 'policy', 'get'])
+    create_sync_policy_group(c1, "sync-group")
+    create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow", zones)
+    create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
+    set_sync_policy_group_status(c1, "sync-group", "allowed")
+
+    zonegroup.period.update(zoneA, commit=True)
+    get_sync_policy(c1)
+
+    """
+        configure symmetrical policy at bucketA level with src and dest
+        and apply object prefix filtering
+    """
+
+    # configure sync policy for only bucketA with an object prefix filter
+    bucketA = create_zone_bucket(zcA)
+    buckets = []
+    buckets.append(bucketA)
+    create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
+    create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow-bucket", zones, bucketA.name)
+    args = ['--prefix=test-']
+    create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zones, zones, bucketA.name, args)
+    set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)
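+    # the --prefix=test- pipe argument restricts replication to object names
+    # beginning with "test-"; anything else stays in the zone it was written to.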
+
+    get_sync_policy(c1, bucketA.name)
+
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # create objects in zoneA and zoneB
+    objnameA = 'test-a'
+    objnameB = 'b'
+
+    # upload object in each zone and wait for sync.
+    k = new_key(zcA, bucketA, objnameA)
+    k.set_contents_from_string('foo')
+    k = new_key(zcB, bucketA, objnameB)
+    k.set_contents_from_string('foo')
+
+    zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
+    zone_data_checkpoint(zoneB, zoneA)
+
+    # verify that objnameA is synced to zoneB
+    bucket = get_bucket(zcB, bucketA.name)
+    check_objects_exist(bucket, objnameA)
+
+    # verify that objnameB is not synced to zoneA
+    bucket = get_bucket(zcA, bucketA.name)
+    check_objects_not_exist(bucket, objnameB)
+
+    log.debug('deleting object A')
+    k = get_key(zcA, bucketA, objnameA)
+    k.delete()
+
+    zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
+    zone_data_checkpoint(zoneB, zoneA)
+
+    # delete bucket on zoneA. it should fail to delete because zoneB still has objnameB
+    log.debug('deleting bucket')
+    assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name)
+
+    assert check_all_buckets_exist(zcA, buckets)
+    assert check_all_buckets_exist(zcB, buckets)
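+    # unlike the earlier tests, this one stops after verifying the failed
+    # delete: objnameB never syncs due to the prefix filter, so the bucket
+    # is left in place in both zones.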
+
+    remove_sync_policy_group(c1, "sync-group")
+
+    return