From 8fbef97c026cc6e2ea1c423d3ace1249c53296f5 Mon Sep 17 00:00:00 2001 From: Shilpa Jagannath Date: Fri, 10 Jan 2025 02:39:47 -0500 Subject: [PATCH] rgw/multisite: account for zones opted out of sync and object filters during bucket deletion Signed-off-by: Shilpa Jagannath --- src/rgw/driver/rados/rgw_rados.cc | 102 +- src/rgw/rgw_sync_policy.cc | 5 + src/rgw/rgw_sync_policy.h | 14 + src/test/rgw/rgw_multi/tests.py | 1573 +++++++++++++++++++---------- 4 files changed, 1123 insertions(+), 571 deletions(-) diff --git a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc index e85fc4a1c68..ebdea6f6e4a 100644 --- a/src/rgw/driver/rados/rgw_rados.cc +++ b/src/rgw/driver/rados/rgw_rados.cc @@ -5621,23 +5621,16 @@ int get_zone_ids(const DoutPrefixProvider *dpp, int list_remote_buckets(const DoutPrefixProvider *dpp, rgw::sal::RadosStore* const driver, const rgw_zone_id source_zone, + std::vector<rgw_zone_id> zids, const rgw_bucket& bucket, optional_yield y) { - - std::vector<rgw_zone_id> zids; - int ret = get_zone_ids(dpp, driver, source_zone, bucket, zids, y); - if (ret < 0) { - ldpp_dout(dpp, 10) << "failed to get remote zones (r=" << ret << ")" << dendl; - return ret; - } - std::vector peer_status; peer_status.resize(zids.size()); RGWCoroutinesManager crs(driver->ctx(), driver->getRados()->get_cr_registry()); RGWHTTPManager http(driver->ctx(), crs.get_completion_mgr()); - ret = http.start(); + int ret = http.start(); if (ret < 0) { ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl; return ret; @@ -5657,6 +5650,62 @@ int list_remote_buckets(const DoutPrefixProvider *dpp, return 0; } +int check_remote_bucket_empty(const DoutPrefixProvider* dpp, + rgw::sal::RadosStore* const driver, + const rgw_zone_id source_zone, + RGWBucketInfo& bucket_info, + std::map<rgw_zone_id, RGWZone> zones, + const rgw_sync_policy_info* zg_sync_policy, + optional_yield y) +{ + std::vector<rgw_zone_id> zids; + int ret = get_zone_ids(dpp, driver, source_zone, bucket_info.bucket, zids, y); + if (ret < 0) { + ldpp_dout(dpp, 10) << "failed to get remote zones (r=" << ret << ")" << dendl; + return ret; + } + + /* when the sync policy is configured to be unidirectional, when some zones + have opted out of sync, or when objects are filtered by prefix or tags, + the source and destination buckets can diverge. check for all such + cases, list the bucket in those zones and return -ENOTEMPTY */ + + bool is_zg_policy_directional = false; + if (zg_sync_policy) { + is_zg_policy_directional = zg_sync_policy->is_directional(); + } + + bool is_bucket_policy_directional = false; + bool bucket_has_filter = false; + auto bucket_sync_policy = bucket_info.sync_policy; + if (bucket_sync_policy) { + is_bucket_policy_directional = bucket_sync_policy->is_directional(); + bucket_has_filter = bucket_sync_policy->has_filter(); + } + + if (is_zg_policy_directional || is_bucket_policy_directional || + bucket_has_filter) { + ldpp_dout(dpp, 10) << "sync policy exists. listing remote zones" << dendl; + return list_remote_buckets(dpp, driver, source_zone, zids, bucket_info.bucket, y); + } + + std::vector<rgw_zone_id> opt_out_zones; // zones not participating in sync + for (const auto& z : zones) { + if (std::find(zids.begin(), zids.end(), z.first) == zids.end()) { + if (z.first == source_zone) { + continue; + } + opt_out_zones.push_back(z.first); + } + } + + if (!opt_out_zones.empty()) { + ldpp_dout(dpp, 10) << "some zones are opted out of sync. listing remote zones" << dendl; + return list_remote_buckets(dpp, driver, source_zone, opt_out_zones, bucket_info.bucket, y); + } + return 0; +} + /** * Delete a bucket.
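* In multisite deployments the delete is now preceded by check_remote_bucket_empty(), * which returns -ENOTEMPTY if a peer zone that can legitimately diverge from the * source (directional sync policy, filtered pipes, or zones opted out of sync) * still holds objects in the bucket.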
* bucket: the name of the bucket to delete @@ -5678,30 +5727,19 @@ int RGWRados::delete_bucket(RGWBucketInfo& bucket_info, std::map if (svc.zone->is_syncing_bucket_meta()) { - // check if asymmetric replication policy exists either at zonegroup or bucket level - auto zg_sync_policy = svc.zone->get_zonegroup().sync_policy; - bool is_zg_policy_directional = zg_sync_policy.is_directional(); - - bool is_bucket_policy_directional = false; - auto bucket_sync_policy = bucket_info.sync_policy; - if (bucket_sync_policy) { - is_bucket_policy_directional = bucket_sync_policy->is_directional(); - } - if (is_zg_policy_directional || is_bucket_policy_directional) { - ldpp_dout(dpp, 10) << "sync policy exists. listing remote zones" << dendl; - const rgw_zone_id source_zone = svc.zone->get_zone_params().get_id(); - r = list_remote_buckets(dpp, driver, source_zone, bucket, y); - if (r == -ENOTEMPTY) { - ldpp_dout(dpp, 0) << "ERROR: cannot delete bucket. objects exist in the bucket on another zone " << dendl; - return r; - } else if (r < 0) { - ldpp_dout(dpp, 10) << "failed to list remote buckets" << dendl; - // don't return. - } + const rgw_zone_id source_zone = svc.zone->get_zone_params().get_id(); + int r = check_remote_bucket_empty(dpp, driver, source_zone, bucket_info, + svc.zone->get_zonegroup().zones, + &svc.zone->get_zonegroup().sync_policy, y); + if (r == -ENOTEMPTY) { + ldpp_dout(dpp, 0) << "ERROR: cannot delete bucket. objects exist in the bucket on another zone " << dendl; + return r; + } else if (r < 0) { + ldpp_dout(dpp, 10) << "failed to list remote buckets" << dendl; + // don't return. + } } } diff --git a/src/rgw/rgw_sync_policy.cc b/src/rgw/rgw_sync_policy.cc index b65752959e9..129d11a1f8f 100644 --- a/src/rgw/rgw_sync_policy.cc +++ b/src/rgw/rgw_sync_policy.cc @@ -150,6 +150,11 @@ bool rgw_sync_pipe_filter::has_tags() const return !tags.empty(); } +bool rgw_sync_pipe_filter::has_prefix() const +{ + return prefix.has_value(); +} + bool rgw_sync_pipe_filter::check_tags(const std::vector<std::string>& _tags) const { if (tags.empty()) { diff --git a/src/rgw/rgw_sync_policy.h b/src/rgw/rgw_sync_policy.h index fe720cb6195..78b4eb31ea4 100644 --- a/src/rgw/rgw_sync_policy.h +++ b/src/rgw/rgw_sync_policy.h @@ -240,6 +240,7 @@ struct rgw_sync_pipe_filter { bool is_subset_of(const rgw_sync_pipe_filter& f) const; bool has_tags() const; + bool has_prefix() const; bool check_tag(const std::string& s) const; bool check_tag(const std::string& k, const std::string& v) const; bool check_tags(const std::vector<std::string>& tags) const; @@ -685,6 +686,19 @@ struct rgw_sync_policy_info { return false; } + bool has_filter() const { + for (auto& item : groups) { + auto& group = item.second; + for (auto& p : group.pipes) { + auto& filter = p.params.source.filter; + if (filter.has_prefix() || filter.has_tags()) { + return true; + } + } + } + return false; + } + void get_potential_related_buckets(const rgw_bucket& bucket, std::set<rgw_bucket> *sources, std::set<rgw_bucket> *dests) const; diff --git a/src/test/rgw/rgw_multi/tests.py b/src/test/rgw/rgw_multi/tests.py index b914102d137..17d2cc4f80e 100644 --- a/src/test/rgw/rgw_multi/tests.py +++ b/src/test/rgw/rgw_multi/tests.py @@ -2520,6 +2520,19 @@ def remove_sync_policy_group(cluster, group, bucket = None): assert False, 'failed to remove sync policy group id=%s, bucket=%s' % (group, bucket) return json.loads(result_json) +def remove_sync_flow(cluster, group, flow, flow_type, source=None, dest=None, bucket = None): + cmd = ['sync', 'group', 'flow', 'remove', '--group-id', group, '--flow-id', flow, '--flow-type', flow_type] + if source: +
cmd += ['--source-zone', source] + if dest: + cmd += ['--dest-zone', dest] + if bucket: + cmd += ['--bucket', bucket] + (result_json, retcode) = cluster.admin(cmd) + if retcode != 0: + assert False, 'failed to remove sync flow id=%s, bucket=%s' % (flow, bucket) + return json.loads(result_json) + def create_sync_group_flow_symmetrical(cluster, group, flow_id, zones, bucket = None): cmd = ['sync', 'group', 'flow', 'create', '--group-id', group, '--flow-id' , flow_id, '--flow-type', 'symmetrical', '--zones=%s' % zones] if bucket: @@ -2613,191 +2626,160 @@ def check_object_not_exists(bucket, objname): def check_objects_not_exist(bucket, obj_arr): for objname in obj_arr: check_object_not_exists(bucket, objname) - -@attr('sync_policy') -def test_bucket_delete_with_bucket_sync_policy_directional(): +@attr('fails_with_rgw') +@attr('sync_policy') +def test_sync_policy_config_zonegroup(): + """ + test_sync_policy_config_zonegroup: + test configuration of all sync commands + """ zonegroup = realm.master_zonegroup() - zonegroup_conns = ZonegroupConns(zonegroup) - zonegroup_meta_checkpoint(zonegroup) - (zoneA, zoneB) = zonegroup.zones[0:2] - (zcA, zcB) = zonegroup_conns.zones[0:2] + zonegroup_conns = ZonegroupConns(zonegroup) + z1, z2 = zonegroup.zones[0:2] + c1, c2 = (z1.cluster, z2.cluster) - c1 = zoneA.cluster + zones = z1.name+","+z2.name - # configure sync policy - zones = zoneA.name + ',' + zoneB.name c1.admin(['sync', 'policy', 'get']) + + # (a) zonegroup level create_sync_policy_group(c1, "sync-group") - create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow", zones) - create_sync_group_pipe(c1, "sync-group", "sync-pipe", zoneA.name, zoneB.name) - set_sync_policy_group_status(c1, "sync-group", "allowed") + set_sync_policy_group_status(c1, "sync-group", "enabled") + get_sync_policy_group(c1, "sync-group") - zonegroup.period.update(zoneA, commit=True) get_sync_policy(c1) - """ - configure policy at bucketA level with src and dest - zones specified to zoneA and zoneB resp. - - verify zoneA bucketA syncs to zoneB BucketA but not viceversa. - """ - - # configure sync policy for only bucketA and enable it - bucketA = create_zone_bucket(zcA) - buckets = [] - buckets.append(bucketA) - create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name) - create_sync_group_flow_directional(c1, "sync-bucket", "sync-flow-bucket", zoneA.name, zoneB.name, bucketA.name) - #create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow-bucket", zones, bucketA.name) - create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zoneA.name, zoneB.name, bucketA.name) - set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name) - - get_sync_policy(c1, bucketA.name) + create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones) + create_sync_group_flow_directional(c1, "sync-group", "sync-flow2", z1.name, z2.name) - zonegroup_meta_checkpoint(zonegroup) + create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones) + get_sync_policy_group(c1, "sync-group") - # create bucketA and objects in zoneA and zoneB - objnameA = 'a' - objnameB = 'b' + zonegroup.period.update(z1, commit=True) - # upload object in each zone and wait for sync. 
- k = new_key(zcA, bucketA, objnameA) - k.set_contents_from_string('foo') - k = new_key(zcB, bucketA, objnameB) - k.set_contents_from_string('foo') + # (b) bucket level + zc1, zc2 = zonegroup_conns.zones[0:2] + bucket = create_zone_bucket(zc1) + bucket_name = bucket.name - zonegroup_meta_checkpoint(zonegroup) - zone_data_checkpoint(zoneB, zoneA) + create_sync_policy_group(c1, "sync-bucket", "allowed", bucket_name) + set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucket_name) + get_sync_policy_group(c1, "sync-bucket", bucket_name) - # verify that objnameA is synced to bucketA in zoneB - bucket = get_bucket(zcB, bucketA.name) - check_objects_exist(bucket, objnameA) + get_sync_policy(c1, bucket_name) - # verify that objnameB is not synced to bucketA in zoneA - bucket = get_bucket(zcA, bucketA.name) - check_objects_not_exist(bucket, objnameB) + create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow1", zones, bucket_name) + create_sync_group_flow_directional(c1, "sync-bucket", "sync-flow2", z1.name, z2.name, bucket_name) - log.debug('deleting object on zone A') - k = get_key(zcA, bucket, objnameA) - k.delete() + create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zones, zones, bucket_name) + get_sync_policy_group(c1, "sync-bucket", bucket_name) - zone_bucket_checkpoint(zoneA, zoneB, bucketA.name) + zonegroup_meta_checkpoint(zonegroup) - # delete bucket on zoneA. it should fail to delete - log.debug('deleting bucket') - assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name) + remove_sync_group_pipe(c1, "sync-bucket", "sync-pipe", bucket_name) + remove_sync_group_flow_directional(c1, "sync-bucket", "sync-flow2", z1.name, z2.name, bucket_name) + remove_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow1", zones, bucket_name) + remove_sync_policy_group(c1, "sync-bucket", bucket_name) - assert check_all_buckets_exist(zcA, buckets) - assert check_all_buckets_exist(zcB, buckets) + get_sync_policy(c1, bucket_name) - log.debug('deleting object on zone B') - k = get_key(zcB, bucket, objnameB) - k.delete() - time.sleep(config.checkpoint_delay) + zonegroup_meta_checkpoint(zonegroup) - # retry deleting bucket after removing the object from zone B. 
should succeed - log.debug('retry deleting bucket') - zcA.delete_bucket(bucketA.name) + remove_sync_group_pipe(c1, "sync-group", "sync-pipe") + remove_sync_group_flow_directional(c1, "sync-group", "sync-flow2", z1.name, z2.name) + remove_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1") + remove_sync_policy_group(c1, "sync-group") - zonegroup_meta_checkpoint(zonegroup) + get_sync_policy(c1) - assert check_all_buckets_dont_exist(zcA, buckets) - assert check_all_buckets_dont_exist(zcB, buckets) + zonegroup.period.update(z1, commit=True) return +@attr('fails_with_rgw') @attr('sync_policy') -def test_bucket_delete_with_bucket_sync_policy_symmetric(): +def test_sync_flow_symmetrical_zonegroup_all(): + """ + test_sync_flow_symmetrical_zonegroup_all: + allows sync from all the zones to all other zones (default case) + """ zonegroup = realm.master_zonegroup() - zonegroup_conns = ZonegroupConns(zonegroup) - zonegroup_meta_checkpoint(zonegroup) + zonegroup_conns = ZonegroupConns(zonegroup) + (zoneA, zoneB) = zonegroup.zones[0:2] (zcA, zcB) = zonegroup_conns.zones[0:2] c1 = zoneA.cluster - # configure sync policy - zones = zoneA.name + ',' + zoneB.name c1.admin(['sync', 'policy', 'get']) + + zones = zoneA.name + ',' + zoneB.name create_sync_policy_group(c1, "sync-group") - create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow", zones) + create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones) create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones) - set_sync_policy_group_status(c1, "sync-group", "allowed") + set_sync_policy_group_status(c1, "sync-group", "enabled") zonegroup.period.update(zoneA, commit=True) get_sync_policy(c1) - """ - configure symmetrical policy at bucketA level with src and dest - zones specified to zoneA and zoneB resp. - """ + objnames = [ 'obj1', 'obj2' ] + content = 'asdasd' + buckets = [] - # configure sync policy for only bucketA and enable it + # create bucket & object in all zones bucketA = create_zone_bucket(zcA) - buckets = [] buckets.append(bucketA) - create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name) - create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow-bucket", zones, bucketA.name) - create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zones, zones, bucketA.name) - set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name) - - get_sync_policy(c1, bucketA.name) - - zonegroup_meta_checkpoint(zonegroup) - - # create bucketA and objects in zoneA and zoneB - objnameA = 'a' - objnameB = 'b' + create_object(zcA, bucketA, objnames[0], content) - # upload object in each zone and wait for sync. - k = new_key(zcA, bucketA, objnameA) - k.set_contents_from_string('foo') - k = new_key(zcB, bucketA, objnameB) - k.set_contents_from_string('foo') + bucketB = create_zone_bucket(zcB) + buckets.append(bucketB) + create_object(zcB, bucketB, objnames[1], content) zonegroup_meta_checkpoint(zonegroup) + # 'zonegroup_data_checkpoint' currently fails for the zones not + # allowed to sync. So as a workaround, data checkpoint is done + # for only the ones configured. 
zone_data_checkpoint(zoneB, zoneA) - log.debug('deleting object A') - k = get_key(zcA, bucketA, objnameA) - k.delete() - - log.debug('deleting object B') - k = get_key(zcA, bucketA, objnameB) - k.delete() - - zone_bucket_checkpoint(zoneA, zoneB, bucketA.name) - zone_data_checkpoint(zoneB, zoneA) + # verify if objects are synced across the zones - # delete bucket on zoneA. - log.debug('deleting bucket') - zcA.delete_bucket(bucketA.name) - zonegroup_meta_checkpoint(zonegroup) + bucket = get_bucket(zcB, bucketA.name) + check_object_exists(bucket, objnames[0], content) - assert check_all_buckets_dont_exist(zcA, buckets) - assert check_all_buckets_dont_exist(zcB, buckets) + bucket = get_bucket(zcA, bucketB.name) + check_object_exists(bucket, objnames[1], content) + remove_sync_policy_group(c1, "sync-group") return +@attr('fails_with_rgw') @attr('sync_policy') -def test_bucket_delete_with_zonegroup_sync_policy_symmetric(): +def test_sync_flow_symmetrical_zonegroup_select(): + """ + test_sync_flow_symmetrical_zonegroup_select: + allow sync between zoneA & zoneB + verify zoneC doesn't sync the data + """ zonegroup = realm.master_zonegroup() zonegroup_conns = ZonegroupConns(zonegroup) + if len(zonegroup.zones) < 3: + raise SkipTest("test_sync_flow_symmetrical_zonegroup_select skipped. Requires 3 or more zones in master zonegroup.") + zonegroup_meta_checkpoint(zonegroup) - (zoneA, zoneB) = zonegroup.zones[0:2] - (zcA, zcB) = zonegroup_conns.zones[0:2] + (zoneA, zoneB, zoneC) = zonegroup.zones[0:3] + (zcA, zcB, zcC) = zonegroup_conns.zones[0:3] c1 = zoneA.cluster - # configure symmetric sync policy + # configure sync policy zones = zoneA.name + ',' + zoneB.name c1.admin(['sync', 'policy', 'get']) create_sync_policy_group(c1, "sync-group") @@ -2808,57 +2790,63 @@ def test_bucket_delete_with_zonegroup_sync_policy_symmetric(): zonegroup.period.update(zoneA, commit=True) get_sync_policy(c1) - # configure sync policy for only bucketA and enable it - bucketA = create_zone_bucket(zcA) buckets = [] - buckets.append(bucketA) - - time.sleep(config.checkpoint_delay) - zonegroup_meta_checkpoint(zonegroup) + content = 'asdasd' - # create bucketA and objects in zoneA and zoneB - objnameA = 'a' - objnameB = 'b' + # create bucketA & objects in zoneA + objnamesA = [ 'obj1', 'obj2', 'obj3' ] + bucketA = create_zone_bucket(zcA) + buckets.append(bucketA) + create_objects(zcA, bucketA, objnamesA, content) - # upload object in each zone and wait for sync.
- k = new_key(zcA, bucketA, objnameA) - k.set_contents_from_string('foo') - k = new_key(zcB, bucketA, objnameB) - k.set_contents_from_string('foo') + # create bucketB & objects in zoneB + objnamesB = [ 'obj4', 'obj5', 'obj6' ] + bucketB = create_zone_bucket(zcB) + buckets.append(bucketB) + create_objects(zcB, bucketB, objnamesB, content) zonegroup_meta_checkpoint(zonegroup) zone_data_checkpoint(zoneB, zoneA) + zone_data_checkpoint(zoneA, zoneB) - log.debug('deleting object A') - k = get_key(zcA, bucketA, objnameA) - k.delete() + # verify if objnamesA synced to only zoneB but not zoneC + bucket = get_bucket(zcB, bucketA.name) + check_objects_exist(bucket, objnamesA, content) - log.debug('deleting object B') - k = get_key(zcA, bucketA, objnameB) - k.delete() + bucket = get_bucket(zcC, bucketA.name) + check_objects_not_exist(bucket, objnamesA) - zone_bucket_checkpoint(zoneA, zoneB, bucketA.name) - zone_data_checkpoint(zoneB, zoneA) + # verify if objnamesB synced to only zoneA but not zoneC + bucket = get_bucket(zcA, bucketB.name) + check_objects_exist(bucket, objnamesB, content) - # delete bucket on zoneA. - log.debug('deleting bucket') - zcA.delete_bucket(bucketA.name) - zonegroup_meta_checkpoint(zonegroup) + bucket = get_bucket(zcC, bucketB.name) + check_objects_not_exist(bucket, objnamesB) - assert check_all_buckets_dont_exist(zcA, buckets) - assert check_all_buckets_dont_exist(zcB, buckets) + remove_sync_policy_group(c1, "sync-group") return +@attr('fails_with_rgw') @attr('sync_policy') -def test_bucket_delete_with_zonegroup_sync_policy_directional(): +def test_sync_flow_directional_zonegroup_select(): + """ + test_sync_flow_directional_zonegroup_select: + allow sync from only zoneA to zoneB + + verify that data doesn't get synced to zoneC and + zoneA shouldn't sync data from zoneB either + """ zonegroup = realm.master_zonegroup() zonegroup_conns = ZonegroupConns(zonegroup) + if len(zonegroup.zones) < 3: + raise SkipTest("test_sync_flow_directional_zonegroup_select skipped. Requires 3 or more zones in master zonegroup.") + zonegroup_meta_checkpoint(zonegroup) - (zoneA, zoneB) = zonegroup.zones[0:2] - (zcA, zcB) = zonegroup_conns.zones[0:2] + (zoneA, zoneB, zoneC) = zonegroup.zones[0:3] + (zcA, zcB, zcC) = zonegroup_conns.zones[0:3] c1 = zoneA.cluster @@ -2866,149 +2854,106 @@ # configure sync policy zones = zoneA.name + ',' + zoneB.name c1.admin(['sync', 'policy', 'get']) create_sync_policy_group(c1, "sync-group") - create_sync_group_flow_directional(c1, "sync-group", "sync-flow1", zoneA.name, zoneB.name) + create_sync_group_flow_directional(c1, "sync-group", "sync-flow", zoneA.name, zoneB.name) create_sync_group_pipe(c1, "sync-group", "sync-pipe", zoneA.name, zoneB.name) set_sync_policy_group_status(c1, "sync-group", "enabled") zonegroup.period.update(zoneA, commit=True) get_sync_policy(c1) - # configure sync policy for only bucketA and enable it - bucketA = create_zone_bucket(zcA) buckets = [] - buckets.append(bucketA) - - time.sleep(config.checkpoint_delay) - zonegroup_meta_checkpoint(zonegroup) + content = 'asdasd' - # create bucketA and objects in zoneA and zoneB - objnameA = 'a' - objnameB = 'b' + # create bucketA & objects in zoneA + objnamesA = [ 'obj1', 'obj2', 'obj3' ] + bucketA = create_zone_bucket(zcA) + buckets.append(bucketA) + create_objects(zcA, bucketA, objnamesA, content) - # upload object in each zone and wait for sync.
- k = new_key(zcA, bucketA, objnameA) - k.set_contents_from_string('foo') - k = new_key(zcB, bucketA, objnameB) - k.set_contents_from_string('foo') + # create bucketB & objects in zoneB + objnamesB = [ 'obj4', 'obj5', 'obj6' ] + bucketB = create_zone_bucket(zcB) + buckets.append(bucketB) + create_objects(zcB, bucketB, objnamesB, content) zonegroup_meta_checkpoint(zonegroup) zone_data_checkpoint(zoneB, zoneA) - # verify that objnameA is synced to bucketA in zoneB + # verify if objnamesA synced to only zoneB but not zoneC bucket = get_bucket(zcB, bucketA.name) - check_objects_exist(bucket, objnameA) - - # verify that objnameB is not synced to bucketA in zoneA - bucket = get_bucket(zcA, bucketA.name) - check_objects_not_exist(bucket, objnameB) - - log.debug('deleting object on zone A') - k = get_key(zcA, bucket, objnameA) - k.delete() - - zone_bucket_checkpoint(zoneA, zoneB, bucketA.name) - - # delete bucket on zoneA. it should fail to delete - log.debug('deleting bucket') - assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name) - - assert check_all_buckets_exist(zcA, buckets) - assert check_all_buckets_exist(zcB, buckets) - - # retry deleting bucket after removing the object from zone B. should succeed - log.debug('deleting object on zone B') - k = get_key(zcB, bucket, objnameB) - k.delete() - time.sleep(config.checkpoint_delay) - - log.debug('retry deleting bucket') - zcA.delete_bucket(bucketA.name) + check_objects_exist(bucket, objnamesA, content) - zonegroup_meta_checkpoint(zonegroup) + bucket = get_bucket(zcC, bucketA.name) + check_objects_not_exist(bucket, objnamesA) - assert check_all_buckets_dont_exist(zcA, buckets) - assert check_all_buckets_dont_exist(zcB, buckets) + # verify if objnamesB are not synced to either zoneA or zoneC + bucket = get_bucket(zcA, bucketB.name) + check_objects_not_exist(bucket, objnamesB) - return + bucket = get_bucket(zcC, bucketB.name) + check_objects_not_exist(bucket, objnamesB) -@attr('fails_with_rgw') -@attr('sync_policy') -def test_sync_policy_config_zonegroup(): - """ - test_sync_policy_config_zonegroup: - test configuration of all sync commands """ - zonegroup = realm.master_zonegroup() - zonegroup_meta_checkpoint(zonegroup) - - zonegroup_conns = ZonegroupConns(zonegroup) - z1, z2 = zonegroup.zones[0:2] - c1, c2 = (z1.cluster, z2.cluster) - - zones = z1.name+","+z2.name - - c1.admin(['sync', 'policy', 'get']) - - # (a) zonegroup level - create_sync_policy_group(c1, "sync-group") - set_sync_policy_group_status(c1, "sync-group", "enabled") - get_sync_policy_group(c1, "sync-group") - - get_sync_policy(c1) + verify the same at bucketA level + configure another policy at bucketA level with src and dest + zones specified to zoneA and zoneB resp. + verify zoneA bucketA syncs to zoneB bucketA but not vice versa.
+ """ + # reconfigure zonegroup pipe & flow + remove_sync_group_pipe(c1, "sync-group", "sync-pipe") + remove_sync_group_flow_directional(c1, "sync-group", "sync-flow", zoneA.name, zoneB.name) create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones) - create_sync_group_flow_directional(c1, "sync-group", "sync-flow2", z1.name, z2.name) - create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones) - get_sync_policy_group(c1, "sync-group") - - zonegroup.period.update(z1, commit=True) - - # (b) bucket level - zc1, zc2 = zonegroup_conns.zones[0:2] - bucket = create_zone_bucket(zc1) - bucket_name = bucket.name - create_sync_policy_group(c1, "sync-bucket", "allowed", bucket_name) - set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucket_name) - get_sync_policy_group(c1, "sync-bucket", bucket_name) + # change state to allowed + set_sync_policy_group_status(c1, "sync-group", "allowed") - get_sync_policy(c1, bucket_name) + zonegroup.period.update(zoneA, commit=True) + get_sync_policy(c1) - create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow1", zones, bucket_name) - create_sync_group_flow_directional(c1, "sync-bucket", "sync-flow2", z1.name, z2.name, bucket_name) + # configure sync policy for only bucketA and enable it + create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name) + create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flowA", zones, bucketA.name) + args = ['--source-bucket=*', '--dest-bucket=*'] + create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zoneA.name, zoneB.name, bucketA.name, args) + set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name) - create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zones, zones, bucket_name) - get_sync_policy_group(c1, "sync-bucket", bucket_name) + get_sync_policy(c1, bucketA.name) zonegroup_meta_checkpoint(zonegroup) - remove_sync_group_pipe(c1, "sync-bucket", "sync-pipe", bucket_name) - remove_sync_group_flow_directional(c1, "sync-bucket", "sync-flow2", z1.name, z2.name, bucket_name) - remove_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow1", zones, bucket_name) - remove_sync_policy_group(c1, "sync-bucket", bucket_name) - - get_sync_policy(c1, bucket_name) + # create objects in bucketA in zoneA and zoneB + objnamesC = [ 'obj7', 'obj8', 'obj9' ] + objnamesD = [ 'obj10', 'obj11', 'obj12' ] + create_objects(zcA, bucketA, objnamesC, content) + create_objects(zcB, bucketA, objnamesD, content) zonegroup_meta_checkpoint(zonegroup) + zone_data_checkpoint(zoneB, zoneA) - remove_sync_group_pipe(c1, "sync-group", "sync-pipe") - remove_sync_group_flow_directional(c1, "sync-group", "sync-flow2", z1.name, z2.name) - remove_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1") - remove_sync_policy_group(c1, "sync-group") - - get_sync_policy(c1) + # verify that objnamesC are synced to bucketA in zoneB + bucket = get_bucket(zcB, bucketA.name) + check_objects_exist(bucket, objnamesC, content) - zonegroup.period.update(z1, commit=True) + # verify that objnamesD are not synced to bucketA in zoneA + bucket = get_bucket(zcA, bucketA.name) + check_objects_not_exist(bucket, objnamesD) + remove_sync_policy_group(c1, "sync-bucket", bucketA.name) + remove_sync_policy_group(c1, "sync-group") return @attr('fails_with_rgw') @attr('sync_policy') -def test_sync_flow_symmetrical_zonegroup_all(): +def test_sync_single_bucket(): """ - test_sync_flow_symmetrical_zonegroup_all: - allows sync from all the zones to all other zones (default case) + test_sync_single_bucket: + Allow data 
sync for only bucketA but not for other buckets via + below 2 methods + + (a) zonegroup: symmetrical flow but configure pipe for only bucketA. + (b) bucket level: configure policy for bucketA """ zonegroup = realm.master_zonegroup() @@ -3024,312 +2969,63 @@ def test_sync_flow_symmetrical_zonegroup_all(): c1.admin(['sync', 'policy', 'get']) zones = zoneA.name + ',' + zoneB.name - create_sync_policy_group(c1, "sync-group") - create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones) - create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones) - set_sync_policy_group_status(c1, "sync-group", "enabled") - - zonegroup.period.update(zoneA, commit=True) get_sync_policy(c1) - objnames = [ 'obj1', 'obj2' ] + objnames = [ 'obj1', 'obj2', 'obj3' ] content = 'asdasd' buckets = [] - # create bucket & object in all zones + # create bucketA & bucketB in zoneA bucketA = create_zone_bucket(zcA) buckets.append(bucketA) - create_object(zcA, bucketA, objnames[0], content) - - bucketB = create_zone_bucket(zcB) + bucketB = create_zone_bucket(zcA) buckets.append(bucketB) - create_object(zcB, bucketB, objnames[1], content) zonegroup_meta_checkpoint(zonegroup) - # 'zonegroup_data_checkpoint' currently fails for the zones not - # allowed to sync. So as a workaround, data checkpoint is done - # for only the ones configured. + + """ + Method (a): configure pipe for only bucketA + """ + # configure sync policy & pipe for only bucketA + create_sync_policy_group(c1, "sync-group") + create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones) + args = ['--source-bucket=' + bucketA.name, '--dest-bucket=' + bucketA.name] + + create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones, None, args) + set_sync_policy_group_status(c1, "sync-group", "enabled") + get_sync_policy(c1) + zonegroup.period.update(zoneA, commit=True) + + sync_info(c1) + + # create objects in bucketA & bucketB + create_objects(zcA, bucketA, objnames, content) + create_object(zcA, bucketB, objnames, content) + + zonegroup_meta_checkpoint(zonegroup) zone_data_checkpoint(zoneB, zoneA) - # verify if objects are synced accross the zone + # verify if bucketA objects are synced bucket = get_bucket(zcB, bucketA.name) - check_object_exists(bucket, objnames[0], content) + check_objects_exist(bucket, objnames, content) - bucket = get_bucket(zcA, bucketB.name) - check_object_exists(bucket, objnames[1], content) + # bucketB objects should not be synced + bucket = get_bucket(zcB, bucketB.name) + check_objects_not_exist(bucket, objnames) - remove_sync_policy_group(c1, "sync-group") - return -@attr('fails_with_rgw') -@attr('sync_policy') -def test_sync_flow_symmetrical_zonegroup_select(): """ - test_sync_flow_symmetrical_zonegroup_select: - allow sync between zoneA & zoneB - verify zoneC doesnt sync the data + Method (b): configure policy at only bucketA level """ + # reconfigure group pipe + remove_sync_group_pipe(c1, "sync-group", "sync-pipe") + create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones) - zonegroup = realm.master_zonegroup() - zonegroup_conns = ZonegroupConns(zonegroup) + # change state to allowed + set_sync_policy_group_status(c1, "sync-group", "allowed") - if len(zonegroup.zones) < 3: - raise SkipTest("test_sync_flow_symmetrical_zonegroup_select skipped. 
Requires 3 or more zones in master zonegroup.") - - zonegroup_meta_checkpoint(zonegroup) - - (zoneA, zoneB, zoneC) = zonegroup.zones[0:3] - (zcA, zcB, zcC) = zonegroup_conns.zones[0:3] - - c1 = zoneA.cluster - - # configure sync policy - zones = zoneA.name + ',' + zoneB.name - c1.admin(['sync', 'policy', 'get']) - create_sync_policy_group(c1, "sync-group") - create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow", zones) - create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones) - set_sync_policy_group_status(c1, "sync-group", "enabled") - - zonegroup.period.update(zoneA, commit=True) - get_sync_policy(c1) - - buckets = [] - content = 'asdasd' - - # create bucketA & objects in zoneA - objnamesA = [ 'obj1', 'obj2', 'obj3' ] - bucketA = create_zone_bucket(zcA) - buckets.append(bucketA) - create_objects(zcA, bucketA, objnamesA, content) - - # create bucketB & objects in zoneB - objnamesB = [ 'obj4', 'obj5', 'obj6' ] - bucketB = create_zone_bucket(zcB) - buckets.append(bucketB) - create_objects(zcB, bucketB, objnamesB, content) - - zonegroup_meta_checkpoint(zonegroup) - zone_data_checkpoint(zoneB, zoneA) - zone_data_checkpoint(zoneA, zoneB) - - # verify if objnamesA synced to only zoneB but not zoneC - bucket = get_bucket(zcB, bucketA.name) - check_objects_exist(bucket, objnamesA, content) - - bucket = get_bucket(zcC, bucketA.name) - check_objects_not_exist(bucket, objnamesA) - - # verify if objnamesB synced to only zoneA but not zoneC - bucket = get_bucket(zcA, bucketB.name) - check_objects_exist(bucket, objnamesB, content) - - bucket = get_bucket(zcC, bucketB.name) - check_objects_not_exist(bucket, objnamesB) - - remove_sync_policy_group(c1, "sync-group") - return - -@attr('fails_with_rgw') -@attr('sync_policy') -def test_sync_flow_directional_zonegroup_select(): - """ - test_sync_flow_directional_zonegroup_select: - allow sync from only zoneA to zoneB - - verify that data doesn't get synced to zoneC and - zoneA shouldn't sync data from zoneB either - """ - - zonegroup = realm.master_zonegroup() - zonegroup_conns = ZonegroupConns(zonegroup) - - if len(zonegroup.zones) < 3: - raise SkipTest("test_sync_flow_symmetrical_zonegroup_select skipped. 
Requires 3 or more zones in master zonegroup.") - - zonegroup_meta_checkpoint(zonegroup) - - (zoneA, zoneB, zoneC) = zonegroup.zones[0:3] - (zcA, zcB, zcC) = zonegroup_conns.zones[0:3] - - c1 = zoneA.cluster - - # configure sync policy - zones = zoneA.name + ',' + zoneB.name - c1.admin(['sync', 'policy', 'get']) - create_sync_policy_group(c1, "sync-group") - create_sync_group_flow_directional(c1, "sync-group", "sync-flow", zoneA.name, zoneB.name) - create_sync_group_pipe(c1, "sync-group", "sync-pipe", zoneA.name, zoneB.name) - set_sync_policy_group_status(c1, "sync-group", "enabled") - - zonegroup.period.update(zoneA, commit=True) - get_sync_policy(c1) - - buckets = [] - content = 'asdasd' - - # create bucketA & objects in zoneA - objnamesA = [ 'obj1', 'obj2', 'obj3' ] - bucketA = create_zone_bucket(zcA) - buckets.append(bucketA) - create_objects(zcA, bucketA, objnamesA, content) - - # create bucketB & objects in zoneB - objnamesB = [ 'obj4', 'obj5', 'obj6' ] - bucketB = create_zone_bucket(zcB) - buckets.append(bucketB) - create_objects(zcB, bucketB, objnamesB, content) - - zonegroup_meta_checkpoint(zonegroup) - zone_data_checkpoint(zoneB, zoneA) - - # verify if objnamesA synced to only zoneB but not zoneC - bucket = get_bucket(zcB, bucketA.name) - check_objects_exist(bucket, objnamesA, content) - - bucket = get_bucket(zcC, bucketA.name) - check_objects_not_exist(bucket, objnamesA) - - # verify if objnamesB are not synced to either zoneA or zoneC - bucket = get_bucket(zcA, bucketB.name) - check_objects_not_exist(bucket, objnamesB) - - bucket = get_bucket(zcC, bucketB.name) - check_objects_not_exist(bucket, objnamesB) - - """ - verify the same at bucketA level - configure another policy at bucketA level with src and dest - zones specified to zoneA and zoneB resp. - - verify zoneA bucketA syncs to zoneB BucketA but not viceversa. 
- """ - # reconfigure zonegroup pipe & flow - remove_sync_group_pipe(c1, "sync-group", "sync-pipe") - remove_sync_group_flow_directional(c1, "sync-group", "sync-flow", zoneA.name, zoneB.name) - create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones) - create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones) - - # change state to allowed - set_sync_policy_group_status(c1, "sync-group", "allowed") - - zonegroup.period.update(zoneA, commit=True) - get_sync_policy(c1) - - # configure sync policy for only bucketA and enable it - create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name) - create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flowA", zones, bucketA.name) - args = ['--source-bucket=*', '--dest-bucket=*'] - create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zoneA.name, zoneB.name, bucketA.name, args) - set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name) - - get_sync_policy(c1, bucketA.name) - - zonegroup_meta_checkpoint(zonegroup) - - # create objects in bucketA in zoneA and zoneB - objnamesC = [ 'obj7', 'obj8', 'obj9' ] - objnamesD = [ 'obj10', 'obj11', 'obj12' ] - create_objects(zcA, bucketA, objnamesC, content) - create_objects(zcB, bucketA, objnamesD, content) - - zonegroup_meta_checkpoint(zonegroup) - zone_data_checkpoint(zoneB, zoneA) - - # verify that objnamesC are synced to bucketA in zoneB - bucket = get_bucket(zcB, bucketA.name) - check_objects_exist(bucket, objnamesC, content) - - # verify that objnamesD are not synced to bucketA in zoneA - bucket = get_bucket(zcA, bucketA.name) - check_objects_not_exist(bucket, objnamesD) - - remove_sync_policy_group(c1, "sync-bucket", bucketA.name) - remove_sync_policy_group(c1, "sync-group") - return - -@attr('fails_with_rgw') -@attr('sync_policy') -def test_sync_single_bucket(): - """ - test_sync_single_bucket: - Allow data sync for only bucketA but not for other buckets via - below 2 methods - - (a) zonegroup: symmetrical flow but configure pipe for only bucketA. 
- (b) bucket level: configure policy for bucketA - """ - - zonegroup = realm.master_zonegroup() - zonegroup_meta_checkpoint(zonegroup) - - zonegroup_conns = ZonegroupConns(zonegroup) - - (zoneA, zoneB) = zonegroup.zones[0:2] - (zcA, zcB) = zonegroup_conns.zones[0:2] - - c1 = zoneA.cluster - - c1.admin(['sync', 'policy', 'get']) - - zones = zoneA.name + ',' + zoneB.name - get_sync_policy(c1) - - objnames = [ 'obj1', 'obj2', 'obj3' ] - content = 'asdasd' - buckets = [] - - # create bucketA & bucketB in zoneA - bucketA = create_zone_bucket(zcA) - buckets.append(bucketA) - bucketB = create_zone_bucket(zcA) - buckets.append(bucketB) - - zonegroup_meta_checkpoint(zonegroup) - - """ - Method (a): configure pipe for only bucketA - """ - # configure sync policy & pipe for only bucketA - create_sync_policy_group(c1, "sync-group") - create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones) - args = ['--source-bucket=' + bucketA.name, '--dest-bucket=' + bucketA.name] - - create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones, None, args) - set_sync_policy_group_status(c1, "sync-group", "enabled") - get_sync_policy(c1) - zonegroup.period.update(zoneA, commit=True) - - sync_info(c1) - - # create objects in bucketA & bucketB - create_objects(zcA, bucketA, objnames, content) - create_object(zcA, bucketB, objnames, content) - - zonegroup_meta_checkpoint(zonegroup) - zone_data_checkpoint(zoneB, zoneA) - - # verify if bucketA objects are synced - bucket = get_bucket(zcB, bucketA.name) - check_objects_exist(bucket, objnames, content) - - # bucketB objects should not be synced - bucket = get_bucket(zcB, bucketB.name) - check_objects_not_exist(bucket, objnames) - - """ - Method (b): configure policy at only bucketA level - """ - # reconfigure group pipe - remove_sync_group_pipe(c1, "sync-group", "sync-pipe") - create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones) - - # change state to allowed - set_sync_policy_group_status(c1, "sync-group", "allowed") - - zonegroup.period.update(zoneA, commit=True) - get_sync_policy(c1) + zonegroup.period.update(zoneA, commit=True) + get_sync_policy(c1) # configure sync policy for only bucketA and enable it @@ -4370,3 +4066,802 @@ def test_bucket_replication_alt_user(): # check that object exists in destination bucket k = get_key(dest, dest_bucket, objname) assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo') + +@allow_bucket_replication +def test_bucket_replication_reject_versioning_identical(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + + source = zonegroup_conns.non_account_rw_zones[0] + dest = zonegroup_conns.non_account_rw_zones[1] + + source_bucket = source.create_bucket(gen_bucket_name()) + dest_bucket = dest.create_bucket(gen_bucket_name()) + source.s3_client.put_bucket_versioning( + Bucket=source_bucket.name, + VersioningConfiguration={'Status': 'Enabled'} + ) + zonegroup_meta_checkpoint(zonegroup) + + # create replication configuration + e = assert_raises(ClientError, + source.s3_client.put_bucket_replication, + Bucket=source_bucket.name, + ReplicationConfiguration={ + 'Role': '', + 'Rules': [{ + 'ID': 'rule1', + 'Status': 'Enabled', + 'Destination': { + 'Bucket': f'arn:aws:s3:::{dest_bucket.name}', + } + }] + }) + assert e.response['ResponseMetadata']['HTTPStatusCode'] == 400 + +@allow_bucket_replication +def test_bucket_replication_reject_objectlock_identical(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + + source =
zonegroup_conns.non_account_rw_zones[0] + dest = zonegroup_conns.non_account_rw_zones[1] + + source_bucket = source.create_bucket(gen_bucket_name()) + dest_bucket_name = gen_bucket_name() + dest.s3_client.create_bucket(Bucket=dest_bucket_name, ObjectLockEnabledForBucket=True) + zonegroup_meta_checkpoint(zonegroup) + + # create replication configuration + e = assert_raises(ClientError, + source.s3_client.put_bucket_replication, + Bucket=source_bucket.name, + ReplicationConfiguration={ + 'Role': '', + 'Rules': [{ + 'ID': 'rule1', + 'Status': 'Enabled', + 'Destination': { + 'Bucket': f'arn:aws:s3:::{dest_bucket_name}', + } + }] + }) + assert e.response['ResponseMetadata']['HTTPStatusCode'] == 400 + +@allow_bucket_replication +def test_bucket_replication_non_versioned_to_versioned(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + + source = zonegroup_conns.non_account_rw_zones[0] + dest = zonegroup_conns.non_account_rw_zones[1] + + source_bucket = source.create_bucket(gen_bucket_name()) + dest_bucket = dest.create_bucket(gen_bucket_name()) + zonegroup_meta_checkpoint(zonegroup) + + # create replication configuration + response = source.s3_client.put_bucket_replication( + Bucket=source_bucket.name, + ReplicationConfiguration={ + 'Role': '', + 'Rules': [ + { + 'ID': 'rule1', + 'Status': 'Enabled', + 'Destination': { + 'Bucket': f'arn:aws:s3:::{dest_bucket.name}', + } + } + ] + } + ) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + zonegroup_meta_checkpoint(zonegroup) + + # enable versioning on destination bucket + dest.s3_client.put_bucket_versioning( + Bucket=dest_bucket.name, + VersioningConfiguration={'Status': 'Enabled'} + ) + zonegroup_meta_checkpoint(zonegroup) + + # upload an object and wait for sync. + objname = 'dummy' + k = new_key(source, source_bucket, objname) + k.set_contents_from_string('foo') + zone_data_checkpoint(dest.zone, source.zone) + + # check that object not exists in destination bucket + e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname) + assert e.response['Error']['Code'] == 'NoSuchKey' + +@allow_bucket_replication +def test_bucket_replication_versioned_to_non_versioned(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + + source = zonegroup_conns.non_account_rw_zones[0] + dest = zonegroup_conns.non_account_rw_zones[1] + + source_bucket = source.create_bucket(gen_bucket_name()) + dest_bucket = dest.create_bucket(gen_bucket_name()) + zonegroup_meta_checkpoint(zonegroup) + + # create replication configuration + response = source.s3_client.put_bucket_replication( + Bucket=source_bucket.name, + ReplicationConfiguration={ + 'Role': '', + 'Rules': [ + { + 'ID': 'rule1', + 'Status': 'Enabled', + 'Destination': { + 'Bucket': f'arn:aws:s3:::{dest_bucket.name}', + } + } + ] + } + ) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + zonegroup_meta_checkpoint(zonegroup) + + # enable versioning on source bucket + source.s3_client.put_bucket_versioning( + Bucket=source_bucket.name, + VersioningConfiguration={'Status': 'Enabled'} + ) + zonegroup_meta_checkpoint(zonegroup) + + # upload an object and wait for sync. 
+ objname = 'dummy' + k = new_key(source, source_bucket, objname) + k.set_contents_from_string('foo') + zone_data_checkpoint(dest.zone, source.zone) + + # check that object not exists in destination bucket + e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname) + assert e.response['Error']['Code'] == 'NoSuchKey' + +@allow_bucket_replication +def test_bucket_replication_lock_enabled_to_lock_disabled(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + + source = zonegroup_conns.non_account_rw_zones[0] + dest = zonegroup_conns.non_account_rw_zones[1] + + source_bucket_name = gen_bucket_name() + source.create_bucket(source_bucket_name) + # enabled versioning + source.s3_client.put_bucket_versioning( + Bucket=source_bucket_name, + VersioningConfiguration={'Status': 'Enabled'} + ) + dest_bucket = dest.create_bucket(gen_bucket_name()) + # enabled versioning + dest.s3_client.put_bucket_versioning( + Bucket=dest_bucket.name, + VersioningConfiguration={'Status': 'Enabled'} + ) + zonegroup_meta_checkpoint(zonegroup) + + # create replication configuration + source.s3_client.put_bucket_replication( + Bucket=source_bucket_name, + ReplicationConfiguration={ + 'Role': '', + 'Rules': [{ + 'ID': 'rule1', + 'Status': 'Enabled', + 'Destination': { + 'Bucket': f'arn:aws:s3:::{dest_bucket.name}', + } + }] + } + ) + zonegroup_meta_checkpoint(zonegroup) + + # enable object lock on source bucket + source.s3_client.put_object_lock_configuration( + Bucket=source_bucket_name, + ObjectLockConfiguration={ + 'ObjectLockEnabled': 'Enabled', + 'Rule': { + 'DefaultRetention': { + 'Mode': 'GOVERNANCE', + 'Days': 1 + } + } + } + ) + zonegroup_meta_checkpoint(zonegroup) + + # upload an object and wait for sync. 
+ objname = 'dummy' + k = new_key(source, source_bucket_name, objname) + k.set_contents_from_string('foo') + zone_data_checkpoint(dest.zone, source.zone) + + # check that object does not exist in destination bucket + e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket.name, Key=objname) + assert e.response['Error']['Code'] == 'NoSuchKey' + +@allow_bucket_replication +def test_bucket_replication_lock_disabled_to_lock_enabled(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + + source = zonegroup_conns.non_account_rw_zones[0] + dest = zonegroup_conns.non_account_rw_zones[1] + + source_bucket = source.create_bucket(gen_bucket_name()) + # enabled versioning + source.s3_client.put_bucket_versioning( + Bucket=source_bucket.name, + VersioningConfiguration={'Status': 'Enabled'} + ) + dest_bucket_name = gen_bucket_name() + dest.create_bucket(dest_bucket_name) + # enabled versioning + dest.s3_client.put_bucket_versioning( + Bucket=dest_bucket_name, + VersioningConfiguration={'Status': 'Enabled'} + ) + zonegroup_meta_checkpoint(zonegroup) + + # create replication configuration + source.s3_client.put_bucket_replication( + Bucket=source_bucket.name, + ReplicationConfiguration={ + 'Role': '', + 'Rules': [{ + 'ID': 'rule1', + 'Status': 'Enabled', + 'Destination': { + 'Bucket': f'arn:aws:s3:::{dest_bucket_name}', + } + }] + } + ) + zonegroup_meta_checkpoint(zonegroup) + + # enable object lock on destination bucket + dest.s3_client.put_object_lock_configuration( + Bucket=dest_bucket_name, + ObjectLockConfiguration={ + 'ObjectLockEnabled': 'Enabled', + 'Rule': { + 'DefaultRetention': { + 'Mode': 'GOVERNANCE', + 'Days': 1 + } + } + } + ) + zonegroup_meta_checkpoint(zonegroup) + + # upload an object and wait for sync. + objname = 'dummy' + k = new_key(source, source_bucket.name, objname) + k.set_contents_from_string('foo') + zone_data_checkpoint(dest.zone, source.zone) + + # check that object does not exist in destination bucket + e = assert_raises(ClientError, dest.s3_client.get_object, Bucket=dest_bucket_name, Key=objname) + assert e.response['Error']['Code'] == 'NoSuchKey' + +@attr('sync_policy') +@attr('fails_with_rgw') +def test_bucket_delete_with_zonegroup_sync_policy_directional(): + + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + + zonegroup_meta_checkpoint(zonegroup) + + (zoneA, zoneB) = zonegroup.zones[0:2] + (zcA, zcB) = zonegroup_conns.zones[0:2] + + c1 = zoneA.cluster + + # configure sync policy + zones = zoneA.name + ',' + zoneB.name + c1.admin(['sync', 'policy', 'get']) + create_sync_policy_group(c1, "sync-group") + create_sync_group_flow_directional(c1, "sync-group", "sync-flow1", zoneA.name, zoneB.name) + create_sync_group_pipe(c1, "sync-group", "sync-pipe", zoneA.name, zoneB.name) + set_sync_policy_group_status(c1, "sync-group", "enabled") + + zonegroup.period.update(zoneA, commit=True) + get_sync_policy(c1) + + # configure sync policy for only bucketA and enable it + bucketA = create_zone_bucket(zcA) + buckets = [] + buckets.append(bucketA) + + time.sleep(config.checkpoint_delay) + zonegroup_meta_checkpoint(zonegroup) + + # create bucketA and objects in zoneA and zoneB + objnameA = 'a' + objnameB = 'b' + + # upload object in each zone and wait for sync. 
+ k = new_key(zcA, bucketA, objnameA) + k.set_contents_from_string('foo') + k = new_key(zcB, bucketA, objnameB) + k.set_contents_from_string('foo') + + zonegroup_meta_checkpoint(zonegroup) + zone_bucket_checkpoint(zoneA, zoneB, bucketA.name) + zone_data_checkpoint(zoneB, zoneA) + + # verify that objnameA is synced to bucketA in zoneB + bucket = get_bucket(zcB, bucketA.name) + check_objects_exist(bucket, objnameA) + + # verify that objnameB is not synced to bucketA in zoneA + bucket = get_bucket(zcA, bucketA.name) + check_objects_not_exist(bucket, objnameB) + + log.debug('deleting object on zone A') + k = get_key(zcA, bucket, objnameA) + k.delete() + + zone_bucket_checkpoint(zoneA, zoneB, bucketA.name) + + # delete bucket on zoneA. it should fail to delete + log.debug('deleting bucket') + assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name) + + assert check_all_buckets_exist(zcA, buckets) + assert check_all_buckets_exist(zcB, buckets) + + # retry deleting bucket after removing the object from zone B. should succeed + log.debug('deleting object on zone B') + k = get_key(zcB, bucket, objnameB) + k.delete() + time.sleep(config.checkpoint_delay) + + log.debug('retry deleting bucket') + zcA.delete_bucket(bucketA.name) + + zonegroup_meta_checkpoint(zonegroup) + + assert check_all_buckets_dont_exist(zcA, buckets) + assert check_all_buckets_dont_exist(zcB, buckets) + + remove_sync_policy_group(c1, "sync-group") + + return + +@attr('sync_policy') +@attr('fails_with_rgw') +def test_bucket_delete_with_bucket_sync_policy_directional(): + + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + + zonegroup_meta_checkpoint(zonegroup) + + (zoneA, zoneB) = zonegroup.zones[0:2] + (zcA, zcB) = zonegroup_conns.zones[0:2] + + c1 = zoneA.cluster + + # configure sync policy + zones = zoneA.name + ',' + zoneB.name + c1.admin(['sync', 'policy', 'get']) + create_sync_policy_group(c1, "sync-group") + create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow", zones) + create_sync_group_pipe(c1, "sync-group", "sync-pipe", zoneA.name, zoneB.name) + set_sync_policy_group_status(c1, "sync-group", "allowed") + + zonegroup.period.update(zoneA, commit=True) + get_sync_policy(c1) + + """ + configure policy at bucketA level with src and dest + zones specified to zoneA and zoneB resp. + + verify zoneA bucketA syncs to zoneB BucketA but not viceversa. + """ + + # configure sync policy for only bucketA and enable it + bucketA = create_zone_bucket(zcA) + buckets = [] + buckets.append(bucketA) + create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name) + create_sync_group_flow_directional(c1, "sync-bucket", "sync-flow-bucket", zoneA.name, zoneB.name, bucketA.name) + #create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow-bucket", zones, bucketA.name) + create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zoneA.name, zoneB.name, bucketA.name) + set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name) + + get_sync_policy(c1, bucketA.name) + time.sleep(config.checkpoint_delay) + + zonegroup_meta_checkpoint(zonegroup) + + # create bucketA and objects in zoneA and zoneB + objnameA = 'a' + objnameB = 'b' + + # upload object in each zone and wait for sync. 
+ k = new_key(zcA, bucketA, objnameA) + k.set_contents_from_string('foo') + k = new_key(zcB, bucketA, objnameB) + k.set_contents_from_string('foo') + + zonegroup_meta_checkpoint(zonegroup) + zone_data_checkpoint(zoneB, zoneA) + + # verify that objnameA is synced to bucketA in zoneB + bucket = get_bucket(zcB, bucketA.name) + check_objects_exist(bucket, objnameA) + + # verify that objnameB is not synced to bucketA in zoneA + bucket = get_bucket(zcA, bucketA.name) + check_objects_not_exist(bucket, objnameB) + + log.debug('deleting object on zone A') + k = get_key(zcA, bucket, objnameA) + k.delete() + + zone_bucket_checkpoint(zoneA, zoneB, bucketA.name) + + # delete bucket on zoneA. it should fail to delete + log.debug('deleting bucket') + assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name) + + assert check_all_buckets_exist(zcA, buckets) + assert check_all_buckets_exist(zcB, buckets) + + log.debug('deleting object on zone B') + k = get_key(zcB, bucket, objnameB) + k.delete() + time.sleep(config.checkpoint_delay) + + # retry deleting bucket after removing the object from zone B. should succeed + log.debug('retry deleting bucket') + zcA.delete_bucket(bucketA.name) + + zonegroup_meta_checkpoint(zonegroup) + + assert check_all_buckets_dont_exist(zcA, buckets) + assert check_all_buckets_dont_exist(zcB, buckets) + + remove_sync_policy_group(c1, "sync-group") + + return + +@attr('sync_policy') +@attr('fails_with_rgw') +def test_bucket_delete_with_bucket_sync_policy_symmetric(): + + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + + zonegroup_meta_checkpoint(zonegroup) + + (zoneA, zoneB) = zonegroup.zones[0:2] + (zcA, zcB) = zonegroup_conns.zones[0:2] + + c1 = zoneA.cluster + + # configure sync policy + zones = zoneA.name + ',' + zoneB.name + c1.admin(['sync', 'policy', 'get']) + create_sync_policy_group(c1, "sync-group") + create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow", zones) + create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones) + set_sync_policy_group_status(c1, "sync-group", "allowed") + + zonegroup.period.update(zoneA, commit=True) + get_sync_policy(c1) + + """ + configure symmetrical policy at bucketA level with src and dest + zones specified to zoneA and zoneB resp. + """ + + # configure sync policy for only bucketA and enable it + bucketA = create_zone_bucket(zcA) + buckets = [] + buckets.append(bucketA) + create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name) + create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow-bucket", zones, bucketA.name) + create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zones, zones, bucketA.name) + set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name) + + get_sync_policy(c1, bucketA.name) + + zonegroup_meta_checkpoint(zonegroup) + + # create bucketA and objects in zoneA and zoneB + objnameA = 'a' + objnameB = 'b' + + # upload object in each zone and wait for sync. + k = new_key(zcA, bucketA, objnameA) + k.set_contents_from_string('foo') + k = new_key(zcB, bucketA, objnameB) + k.set_contents_from_string('foo') + + zone_bucket_checkpoint(zoneA, zoneB, bucketA.name) + zone_data_checkpoint(zoneB, zoneA) + + log.debug('deleting object A') + k = get_key(zcA, bucketA, objnameA) + k.delete() + + log.debug('deleting object B') + k = get_key(zcA, bucketA, objnameB) + k.delete() + + zone_bucket_checkpoint(zoneA, zoneB, bucketA.name) + zone_data_checkpoint(zoneB, zoneA) + + # delete bucket on zoneA. 
+ log.debug('deleting bucket') + zcA.delete_bucket(bucketA.name) + zonegroup_meta_checkpoint(zonegroup) + + assert check_all_buckets_dont_exist(zcA, buckets) + assert check_all_buckets_dont_exist(zcB, buckets) + + remove_sync_policy_group(c1, "sync-group") + return + +@attr('sync_policy') +@attr('fails_with_rgw') +def test_bucket_delete_with_zonegroup_sync_policy_symmetric(): + + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + + zonegroup_meta_checkpoint(zonegroup) + + (zoneA, zoneB) = zonegroup.zones[0:2] + (zcA, zcB) = zonegroup_conns.zones[0:2] + + c1 = zoneA.cluster + + # configure symmetric sync policy + zones = zoneA.name + ',' + zoneB.name + c1.admin(['sync', 'policy', 'get']) + create_sync_policy_group(c1, "sync-group") + create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow", zones) + create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones) + set_sync_policy_group_status(c1, "sync-group", "enabled") + + zonegroup.period.update(zoneA, commit=True) + get_sync_policy(c1) + + # configure sync policy for only bucketA and enable it + bucketA = create_zone_bucket(zcA) + buckets = [] + buckets.append(bucketA) + + time.sleep(config.checkpoint_delay) + zonegroup_meta_checkpoint(zonegroup) + + # create bucketA and objects in zoneA and zoneB + objnameA = 'a' + objnameB = 'b' + + # upload object in each zone and wait for sync. + k = new_key(zcA, bucketA, objnameA) + k.set_contents_from_string('foo') + k = new_key(zcB, bucketA, objnameB) + k.set_contents_from_string('foo') + + zone_data_checkpoint(zoneB, zoneA) + zone_bucket_checkpoint(zoneA, zoneB, bucketA.name) + + log.debug('deleting object A') + k = get_key(zcA, bucketA, objnameA) + k.delete() + + log.debug('deleting object B') + k = get_key(zcA, bucketA, objnameB) + k.delete() + + zone_bucket_checkpoint(zoneA, zoneB, bucketA.name) + zone_data_checkpoint(zoneB, zoneA) + + # delete bucket on zoneA. + log.debug('deleting bucket') + zcA.delete_bucket(bucketA.name) + zonegroup_meta_checkpoint(zonegroup) + + assert check_all_buckets_dont_exist(zcA, buckets) + assert check_all_buckets_dont_exist(zcB, buckets) + + remove_sync_flow(c1, "sync-group", "sync-flow", "symmetrical") + remove_sync_policy_group(c1, "sync-group") + return + +@attr('sync_policy') +@attr('fails_with_rgw') +def test_delete_bucket_with_zone_opt_out(): + """ + test_delete_bucket_with_zone_opt_out: + allow sync between zoneA & zoneB + verify zoneC doesn't sync the data + test delete bucket + """ + + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + + if len(zonegroup.zones) < 3: + raise SkipTest("test_delete_bucket_with_zone_opt_out skipped.
Requires 3 or more zones in master zonegroup.") + + zonegroup_meta_checkpoint(zonegroup) + + (zoneA, zoneB, zoneC) = zonegroup.zones[0:3] + (zcA, zcB, zcC) = zonegroup_conns.zones[0:3] + + c1 = zoneA.cluster + + # configure sync policy + zones = zoneA.name + ',' + zoneB.name + c1.admin(['sync', 'policy', 'get']) + create_sync_policy_group(c1, "sync-group") + create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow", zones) + create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones) + set_sync_policy_group_status(c1, "sync-group", "enabled") + + zonegroup.period.update(zoneA, commit=True) + get_sync_policy(c1) + + bucketA = create_zone_bucket(zcA) + buckets = [] + buckets.append(bucketA) + + time.sleep(config.checkpoint_delay) + zonegroup_meta_checkpoint(zonegroup) + + # create bucketA and objects in zoneA and zoneB + objnameA = 'a' + objnameB = 'b' + + # upload object in zone A and zone C + k = new_key(zcA, bucketA, objnameA) + k.set_contents_from_string('foo') + k = new_key(zcC, bucketA, objnameB) + k.set_contents_from_string('foo') + + zonegroup_meta_checkpoint(zonegroup) + zone_data_checkpoint(zoneB, zoneA) + + # verify that objnameA is synced to zoneB but not zoneC + bucket = get_bucket(zcB, bucketA.name) + check_objects_exist(bucket, objnameA) + + bucket = get_bucket(zcC, bucketA.name) + check_objects_not_exist(bucket, objnameA) + + # verify that objnameB is not synced to either zoneA or zoneB + bucket = get_bucket(zcA, bucketA.name) + check_objects_not_exist(bucket, objnameB) + + bucket = get_bucket(zcB, bucketA.name) + check_objects_not_exist(bucket, objnameB) + + log.debug('deleting object on zone A') + k = get_key(zcA, bucket, objnameA) + k.delete() + + zone_bucket_checkpoint(zoneA, zoneB, bucketA.name) + + # delete bucket on zoneA. it should fail to delete because zoneC still has objnameB + log.debug('deleting bucket') + assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name) + + assert check_all_buckets_exist(zcA, buckets) + assert check_all_buckets_exist(zcC, buckets) + + # retry deleting bucket after removing the object from zone C. 
should succeed + log.debug('deleting object on zone C') + k = get_key(zcC, bucket, objnameB) + k.delete() + time.sleep(config.checkpoint_delay) + + log.debug('retry deleting bucket') + zcA.delete_bucket(bucketA.name) + + zonegroup_meta_checkpoint(zonegroup) + + assert check_all_buckets_dont_exist(zcA, buckets) + assert check_all_buckets_dont_exist(zcC, buckets) + + remove_sync_policy_group(c1, "sync-group") + + return + +@attr('sync_policy') +@attr('fails_with_rgw') +def test_bucket_delete_with_sync_policy_object_prefix(): + + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + + zonegroup_meta_checkpoint(zonegroup) + + (zoneA, zoneB) = zonegroup.zones[0:2] + (zcA, zcB) = zonegroup_conns.zones[0:2] + + c1 = zoneA.cluster + + # configure sync policy + zones = zoneA.name + ',' + zoneB.name + c1.admin(['sync', 'policy', 'get']) + create_sync_policy_group(c1, "sync-group") + create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow", zones) + create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones) + set_sync_policy_group_status(c1, "sync-group", "allowed") + + zonegroup.period.update(zoneA, commit=True) + get_sync_policy(c1) + + """ + configure symmetrical policy at bucketA level with src and dest + and apply object prefix filtering + """ + + # configure sync policy for only bucketA with an object prefix filter + bucketA = create_zone_bucket(zcA) + buckets = [] + buckets.append(bucketA) + create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name) + create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow-bucket", zones, bucketA.name) + args = ['--prefix=test-'] + create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zones, zones, bucketA.name, args) + set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name) + + get_sync_policy(c1, bucketA.name) + + zonegroup_meta_checkpoint(zonegroup) + + # create bucketA and objects in zoneA and zoneB + objnameA = 'test-a' + objnameB = 'b' + + # upload object in each zone and wait for sync. + k = new_key(zcA, bucketA, objnameA) + k.set_contents_from_string('foo') + k = new_key(zcB, bucketA, objnameB) + k.set_contents_from_string('foo') + + zone_bucket_checkpoint(zoneA, zoneB, bucketA.name) + zone_data_checkpoint(zoneB, zoneA) + + # verify that objnameA is synced to zoneB + bucket = get_bucket(zcB, bucketA.name) + check_object_exists(bucket, objnameA) + + # verify that objnameB is not synced to zoneA + bucket = get_bucket(zcA, bucketA.name) + check_object_not_exists(bucket, objnameB) + + log.debug('deleting object A') + k = get_key(zcA, bucketA, objnameA) + k.delete() + + zone_bucket_checkpoint(zoneA, zoneB, bucketA.name) + zone_data_checkpoint(zoneB, zoneA) + + # delete bucket on zoneA. it should fail to delete because zoneB still has objnameB + log.debug('deleting bucket') + assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name) + + assert check_all_buckets_exist(zcA, buckets) + assert check_all_buckets_exist(zcB, buckets) + + remove_sync_policy_group(c1, "sync-group") + + return -- 2.39.5
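For quick reference, a condensed sketch of the failure mode the rgw_rados.cc change enforces, written only with helpers that tests.py already defines; the two-zone layout and the names zonegroup, zoneA/zoneB, zcA/zcB, and c1 are assumptions mirroring test_bucket_delete_with_zonegroup_sync_policy_directional above, not new API:

# Directional policy: objects flow zoneA -> zoneB only, so zoneB's copy of a
# bucket can hold objects that will never sync back to zoneA.
create_sync_policy_group(c1, "sync-group")
create_sync_group_flow_directional(c1, "sync-group", "sync-flow1", zoneA.name, zoneB.name)
create_sync_group_pipe(c1, "sync-group", "sync-pipe", zoneA.name, zoneB.name)
set_sync_policy_group_status(c1, "sync-group", "enabled")
zonegroup.period.update(zoneA, commit=True)

bucketA = create_zone_bucket(zcA)
k = new_key(zcB, bucketA, 'b')
k.set_contents_from_string('foo')  # lands only in zoneB

# RGWRados::delete_bucket() now calls check_remote_bucket_empty(), which lists
# the bucket on the divergent peer zone and returns -ENOTEMPTY, so the S3
# delete is rejected until the remote object is removed.
assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name)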