*/
static void decreasing_str(uint64_t num, string *str)
{
+ // This buffer must be big enough to hold the decimal representation of the
+ // largest unsigned 64-bit value (20 digits), plus the range-prefix character
+ // and the terminating NUL.
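+ // e.g. decreasing_str(5) yields "910" while decreasing_str(6) yields "909",
+ // so larger numbers map to lexicographically smaller strings.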
char buf[32];
if (num < 0x10) { /* 16 */
- snprintf(buf, sizeof(buf), "9%02lld", 15 - (long long)num);
+ snprintf(buf, sizeof(buf), "9%02" PRIu64, 0xF - num);
} else if (num < 0x100) { /* 256 */
- snprintf(buf, sizeof(buf), "8%03lld", 255 - (long long)num);
+ snprintf(buf, sizeof(buf), "8%03" PRIu64, 0xFF - num);
} else if (num < 0x1000) /* 4096 */ {
- snprintf(buf, sizeof(buf), "7%04lld", 4095 - (long long)num);
+ snprintf(buf, sizeof(buf), "7%04" PRIu64, 0xFFF - num);
} else if (num < 0x10000) /* 65536 */ {
- snprintf(buf, sizeof(buf), "6%05lld", 65535 - (long long)num);
+ snprintf(buf, sizeof(buf), "6%05" PRIu64, 0xFFFF - num);
} else if (num < 0x100000000) /* 4G */ {
- snprintf(buf, sizeof(buf), "5%010lld", 0xFFFFFFFF - (long long)num);
+ snprintf(buf, sizeof(buf), "5%010" PRIu64, 0xFFFFFFFF - num);
+ } else if (num < 0x10000000000) /* 1T */ {
+ snprintf(buf, sizeof(buf), "4%015" PRIu64, 0xFFFFFFFFFF - num);
+ } else if (num < 0x1000000000000) /* 281T */ {
+ snprintf(buf, sizeof(buf), "3%018" PRIu64, 0xFFFFFFFFFFFF - num);
} else {
- snprintf(buf, sizeof(buf), "4%020lld", (long long)-num);
+ snprintf(buf, sizeof(buf), "2%020" PRIu64, std::numeric_limits<uint64_t>::max() - num);
}
*str = buf;
if (val[0] == 'i') {
key->instance = val.substr(1);
} else if (val[0] == 'v') {
+ // 'val' here is the string form of the versioned epoch, as produced by
+ // decreasing_str(): the first char is always 'v' (the versioned-epoch
+ // indicator), and the second char is a range-prefix digit in the [9-2] range
+ // that separates value ranges, making the strings sort in the opposite
+ // (decreasing) direction while keeping them short, which speeds up the
+ // lexicographical comparison; hence the +2 below (one char for the indicator
+ // and one for the range prefix).
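+ // e.g. for an encoded epoch string "8235" the stored val is "v8235", and
+ // s below ends up pointing at "235".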
string err;
- const char *s = val.c_str() + 1;
- *ver = strict_strtoll(s, 10, &err);
- if (!err.empty()) {
- CLS_LOG(0, "ERROR: %s: bad index_key (%s): could not parse val (v=%s)", __func__, escape_str(index_key).c_str(), s);
+ if (val.size() > 2) {
+ const char *s = val.c_str() + 2;
+ *ver = strict_strtoull(s, 10, &err);
+ if (!err.empty()) {
+ CLS_LOG(0, "ERROR: %s: bad index_key (%s): could not parse val (v=%s)", __func__, escape_str(index_key).c_str(), s);
+ return -EIO;
+ }
+ } else {
+ CLS_LOG(0, "ERROR: %s: bad index_key (%s): val too short", __func__, escape_str(index_key).c_str());
return -EIO;
}
}
return 0;
}
- bool start_modify(uint64_t candidate_epoch) {
- if (candidate_epoch) {
- if (candidate_epoch < olh_data_entry.epoch) {
- return false; /* olh cannot be modified, old epoch */
- }
- olh_data_entry.epoch = candidate_epoch;
- } else {
- if (olh_data_entry.epoch == 0) {
- olh_data_entry.epoch = 2; /* versioned epoch should start with 2, 1 is reserved to converted plain entries */
- } else {
- olh_data_entry.epoch++;
- }
+ /**
+ * Called when a new instance of an object (in a versioned bucket) is added (via PUT) or an existing instance is removed.
+ * Part of that process is updating the OLH entry (in the bucket index) with the correct modification timestamp (epoch).
+ * That timestamp is then used to guard against OLH updates from add/remove instance ops that happened *before*
+ * the latest op that updated the OLH entry.
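+ * E.g. a replayed op carrying an epoch older than the current OLH epoch is rejected here,
+ * so it cannot clobber the result of a newer op.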
+ * @param candidate_epoch - either a remote epoch provided (> 0) as the result of multisite sync,
+ * or a locally generated nanosecond timestamp;
+ */
+ bool start_modify(uint64_t candidate_epoch) {
+ // only update olh.epoch if the candidate is not older than the current one.
+ if (candidate_epoch < olh_data_entry.epoch) {
+ return false; /* olh cannot be modified, old epoch */
}
+
+ olh_data_entry.epoch = candidate_epoch;
return true;
}
const uint64_t prev_epoch = olh.get_epoch();
- if (!olh.start_modify(op.olh_epoch)) {
- ret = obj.write(op.olh_epoch, false, header);
- if (ret < 0) {
- return ret;
- }
- if (removing) {
- olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, op.olh_epoch);
- }
- return write_header_while_logrecord(hctx, header);
- }
-
- // promote this version to current if it's a newer epoch, or if it matches the
- // current epoch and sorts after the current instance
- const bool promote = (olh.get_epoch() > prev_epoch) ||
- (olh.get_epoch() == prev_epoch &&
- olh.get_entry().key.instance >= op.key.instance);
+ // op.olh_epoch is only provided (> 0) when a remote epoch comes in as the result of multisite sync;
+ uint64_t candidate_epoch = op.olh_epoch ? op.olh_epoch :
+ duration_cast<std::chrono::nanoseconds>(obj.mtime().time_since_epoch()).count();
+ if (olh.start_modify(candidate_epoch)) {
+ // promote this version to current if it's a newer epoch, or if it matches the
+ // current epoch and sorts after the current instance
+ const bool promote = (olh.get_epoch() > prev_epoch) ||
+ (olh.get_epoch() == prev_epoch &&
+ olh.get_entry().key.instance >= op.key.instance);
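+ // with nanosecond-resolution timestamps an epoch collision is practically
+ // impossible, but we still detect it so it can be logged below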
+ const bool epoch_collision = olh.get_epoch() == prev_epoch;
+
+ if (olh_found) {
+ const string &olh_tag = olh.get_tag();
+ if (op.olh_tag != olh_tag) {
+ if (!olh.pending_removal()) {
+ CLS_LOG(5, "NOTICE: op.olh_tag (%s) != olh.tag (%s)", op.olh_tag.c_str(), olh_tag.c_str());
+ return -ECANCELED;
+ }
+ /* if pending removal, this is a new olh instance */
+ olh.set_tag(op.olh_tag);
+ }
+ if (epoch_collision) {
+ auto const &s_key = op.key.to_string();
+ CLS_LOG(1, "NOTICE: versioned epoch collision (%" PRIu64 ") for object %s", prev_epoch, s_key.c_str());
+ }
+ if (promote && olh.exists()) {
+ rgw_bucket_olh_entry &olh_entry = olh.get_entry();
+ /* found olh, previous instance is no longer the latest, need to update */
+ if (!(olh_entry.key == op.key)) {
+ BIVerObjEntry old_obj(hctx, olh_entry.key);
- if (olh_found) {
- const string& olh_tag = olh.get_tag();
- if (op.olh_tag != olh_tag) {
- if (!olh.pending_removal()) {
- CLS_LOG(5, "NOTICE: op.olh_tag (%s) != olh.tag (%s)", op.olh_tag.c_str(), olh_tag.c_str());
- return -ECANCELED;
+ ret = old_obj.demote_current(header);
+ if (ret < 0) {
+ CLS_LOG(0, "ERROR: could not demote current on previous key ret=%d", ret);
+ return ret;
+ }
+ }
+ }
+ olh.set_pending_removal(false);
+ } else {
+ bool instance_only = (op.key.instance.empty() && op.delete_marker);
+ cls_rgw_obj_key key(op.key.name);
+ ret = convert_plain_entry_to_versioned(hctx, key, promote, instance_only, header);
+ if (ret < 0) {
+ CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret);
+ return ret;
}
- /* if pending removal, this is a new olh instance */
olh.set_tag(op.olh_tag);
+ if (op.key.instance.empty()) {
+ obj.set_epoch(1);
+ }
}
- if (promote && olh.exists()) {
- rgw_bucket_olh_entry& olh_entry = olh.get_entry();
- /* found olh, previous instance is no longer the latest, need to update */
- if (!(olh_entry.key == op.key)) {
- BIVerObjEntry old_obj(hctx, olh_entry.key);
- ret = old_obj.demote_current(header);
- if (ret < 0) {
- CLS_LOG(0, "ERROR: could not demote current on previous key ret=%d", ret);
- return ret;
- }
- }
+ /* update the olh log */
+ olh.update_log(CLS_RGW_OLH_OP_LINK_OLH, op.op_tag, op.key, op.delete_marker);
+ if (removing) {
+ olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false);
}
- olh.set_pending_removal(false);
- } else {
- bool instance_only = (op.key.instance.empty() && op.delete_marker);
- cls_rgw_obj_key key(op.key.name);
- ret = convert_plain_entry_to_versioned(hctx, key, promote, instance_only, header);
+
+ if (promote) {
+ olh.update(op.key, op.delete_marker);
+ }
+ olh.set_exists(true);
+
+ /* write the instance and list entries */
+ ret = obj.write(olh.get_epoch(), promote, header);
if (ret < 0) {
- CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret);
return ret;
}
- olh.set_tag(op.olh_tag);
- if (op.key.instance.empty()){
- obj.set_epoch(1);
- }
- }
- /* update the olh log */
- olh.update_log(CLS_RGW_OLH_OP_LINK_OLH, op.op_tag, op.key, op.delete_marker);
- if (removing) {
- olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false);
+ ret = olh.write(header);
}
+ else {
+ ret = obj.write(candidate_epoch, false, header);
+ if (ret < 0) {
+ return ret;
+ }
- if (promote) {
- olh.update(op.key, op.delete_marker);
- }
- olh.set_exists(true);
+ // no point in adding CLS_RGW_OLH_OP_LINK_OLH to the pending log here: we already know
+ // the epoch is stale compared to the current one, so it would never be applied;
- ret = olh.write(header);
- if (ret < 0) {
- CLS_LOG(0, "ERROR: failed to update olh ret=%d", ret);
- return ret;
+ if (removing) {
+ olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, candidate_epoch);
+ ret = olh.write(header);
+ }
}
- /* write the instance and list entries */
- ret = obj.write(olh.get_epoch(), promote, header);
if (ret < 0) {
+ CLS_LOG(0, "ERROR: failed to update olh ret=%d", ret);
return ret;
}
rgw_bucket_dir_entry& entry = obj.get_dir_entry();
rgw_bucket_entry_ver ver;
- ver.epoch = (op.olh_epoch ? op.olh_epoch : olh.get_epoch());
+ ver.epoch = candidate_epoch;
string *powner = NULL;
string *powner_display_name = NULL;
obj.set_epoch(1);
}
- if (!olh.start_modify(op.olh_epoch)) {
- ret = obj.unlink_list_entry(header);
- if (ret < 0) {
- return ret;
- }
-
- if (obj.is_delete_marker()) {
- return 0;
- }
+ // op.olh_epoch is only provided (> 0) when a remote epoch comes in as the result of multisite sync;
+ uint64_t candidate_epoch = op.olh_epoch ? op.olh_epoch :
+ duration_cast<std::chrono::nanoseconds>(real_clock::now().time_since_epoch()).count();
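+ // (unlike the link op, there is no object mtime to fall back on here, so use the current time)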
+ if (olh.start_modify(candidate_epoch)) {
+ rgw_bucket_olh_entry &olh_entry = olh.get_entry();
+ cls_rgw_obj_key &olh_key = olh_entry.key;
+ CLS_LOG(20, "%s: updating olh log: existing olh entry: %s[%s] (delete_marker=%d)", __func__,
+ olh_key.name.c_str(), olh_key.instance.c_str(), olh_entry.delete_marker);
+
+ if (olh_key == dest_key) {
+ /* this is the current head, need to update the OLH! */
+ cls_rgw_obj_key next_key;
+ bool found = false;
+ ret = obj.find_next_key(&next_key, &found);
+ if (ret < 0) {
+ CLS_LOG(0, "ERROR: obj.find_next_key() returned ret=%d", ret);
+ return ret;
+ }
- olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, op.olh_epoch);
- return olh.write(header);
- }
+ if (found) {
+ BIVerObjEntry next(hctx, next_key);
+ ret = next.write(olh.get_epoch(), true, header);
+ if (ret < 0) {
+ CLS_LOG(0, "ERROR: next.write() returned ret=%d", ret);
+ return ret;
+ }
- rgw_bucket_olh_entry& olh_entry = olh.get_entry();
- cls_rgw_obj_key& olh_key = olh_entry.key;
- CLS_LOG(20, "%s: updating olh log: existing olh entry: %s[%s] (delete_marker=%d)", __func__,
- olh_key.name.c_str(), olh_key.instance.c_str(), olh_entry.delete_marker);
+ CLS_LOG(20, "%s: updating olh log: link olh -> %s[%s] (is_delete=%d)", __func__,
+ next_key.name.c_str(), next_key.instance.c_str(), (int) next.is_delete_marker());
- if (olh_key == dest_key) {
- /* this is the current head, need to update the OLH! */
- cls_rgw_obj_key next_key;
- bool found = false;
- ret = obj.find_next_key(&next_key, &found);
- if (ret < 0) {
- CLS_LOG(0, "ERROR: obj.find_next_key() returned ret=%d", ret);
- return ret;
+ olh.update(next_key, next.is_delete_marker());
+ olh.update_log(CLS_RGW_OLH_OP_LINK_OLH, op.op_tag, next_key, next.is_delete_marker());
+ } else {
+ // next_key is empty, but we need to preserve its name in case this entry
+ // gets resharded, because this key is used for hash placement
+ next_key.name = dest_key.name;
+ olh.update(next_key, false);
+ olh.update_log(CLS_RGW_OLH_OP_UNLINK_OLH, op.op_tag, next_key, false);
+ olh.set_exists(false);
+ olh.set_pending_removal(true);
+ }
}
- if (found) {
- BIVerObjEntry next(hctx, next_key);
- ret = next.write(olh.get_epoch(), true, header);
+ if (!obj.is_delete_marker()) {
+ olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false);
+ } else {
+ /* this is a delete marker, it's our responsibility to remove its
+ * instance entry */
+ ret = obj.unlink(header, op.key);
if (ret < 0) {
- CLS_LOG(0, "ERROR: next.write() returned ret=%d", ret);
return ret;
}
-
- CLS_LOG(20, "%s: updating olh log: link olh -> %s[%s] (is_delete=%d)", __func__,
- next_key.name.c_str(), next_key.instance.c_str(), (int)next.is_delete_marker());
-
- olh.update(next_key, next.is_delete_marker());
- olh.update_log(CLS_RGW_OLH_OP_LINK_OLH, op.op_tag, next_key, next.is_delete_marker());
- } else {
- // next_key is empty, but we need to preserve its name in case this entry
- // gets resharded, because this key is used for hash placement
- next_key.name = dest_key.name;
- olh.update(next_key, false);
- olh.update_log(CLS_RGW_OLH_OP_UNLINK_OLH, op.op_tag, next_key, false);
- olh.set_exists(false);
- olh.set_pending_removal(true);
}
- }
- if (!obj.is_delete_marker()) {
- olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false);
- } else {
- /* this is a delete marker, it's our responsibility to remove its
- * instance entry */
- ret = obj.unlink(header, op.key);
+ ret = obj.unlink_list_entry(header);
if (ret < 0) {
return ret;
}
}
+ else {
+ ret = obj.unlink_list_entry(header);
+ if (ret < 0) {
+ return ret;
+ }
- ret = obj.unlink_list_entry(header);
- if (ret < 0) {
- return ret;
+ if (obj.is_delete_marker()) {
+ return 0;
+ }
+
+ olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, candidate_epoch);
}
ret = olh.write(header);
}
rgw_bucket_entry_ver ver;
- ver.epoch = (op.olh_epoch ? op.olh_epoch : olh.get_epoch());
+ ver.epoch = candidate_epoch;
real_time mtime = obj.mtime(); /* mtime has no real meaning in
* instance removal context */
import logging
import errno
import dateutil.parser
+from datetime import datetime
+import threading
+from typing import Dict, List, Any
from itertools import combinations
from itertools import zip_longest
bilog, _ = zone.cluster.admin(cmd, read_only=True)
return json.loads(bilog)
+def bucket_list(zone, bucket, args = None):
+ cmd = ['bucket', 'list', '--bucket', bucket, '--max-entries', '100000', '--uid', user.name] + (args or [])
+ cmd += ['--tenant', config.tenant] if config.tenant else []
+ output, _ = zone.cluster.admin(cmd, read_only=True)
+ return json.loads(output)
+
def bilog_autotrim(zone, args = None):
cmd = ['bilog', 'autotrim'] + (args or []) + zone.zone_args()
zone.cluster.admin(cmd, debug_rgw=20)
# run bilog trim twice on primary zone where the bucket was resharded
bilog_autotrim(primary.zone, ['--rgw-sync-log-trim-max-buckets', '50'],)
-
+
for zonegroup in realm.current_period.zonegroups:
zonegroup_conns = ZonegroupConns(zonegroup)
for zone in zonegroup_conns.zones:
log.info(f'checking if zone: {zone.name} has role: {role_name}')
assert(zone.has_role(role_name))
log.info(f'success, zone: {zone.name} has role: {role_name}')
-
+
for zone in zonegroup_conns.zones:
if zone == zonegroup_conns.master_zone:
log.info(f'creating bucket in primary zone')
CreateBucketConfiguration={'LocationConstraint': zg.name})
assert e.response['ResponseMetadata']['HTTPStatusCode'] == 400
+def test_timestamp_based_epochs():
+ """
+ test_timestamp_based_epochs:
+ the test generates objects/instance in both zones: for each of NUM_OBJECTS NUM_VERSIONS are generated;
+ then it waits for the replication to finish and then lists objects/instances in both zones and checks
+ that the instances there are listed are in chronological order, with the expectation that without
+ time-based epochs the listed order of object versions won't be chronological; with the time-based epochs
+ the order should be strictly chronological
+ """
+ class ObjVersion:
+ def __init__(self, name: str, instance: str, mtime: datetime, ver_epoch: int):
+ self.name = name
+ self.instance = instance
+ self.mtime = mtime
+ self.ver_epoch = ver_epoch
+
+ def __eq__(self, other):
+ return (self.name == other.name and
+ self.instance == other.instance and
+ self.mtime == other.mtime and
+ self.ver_epoch == other.ver_epoch)
+
+ def parse_bucket_list_output(data: Any) -> Dict[str, List[ObjVersion]]:
+ """
+ Parses the output of the 'radosgw-admin bucket list --bucket <name> --format json' command.
+ :param data: the parsed JSON output (a list of bucket index entries)
+ :return: a mapping from object name to its versions, in listing order
+ """
+ if not isinstance(data, list):
+ raise ValueError("Expected a list of entries in JSON input")
+
+ results: Dict[str, List[ObjVersion]] = {}
+ for entry in data:
+ if not isinstance(entry, dict):
+ continue
+
+ name = entry["name"]
+ instance = entry["instance"]
+ # parse mtime into a datetime so comparisons match the ObjVersion type hint
+ mtime_str = entry.get("meta", {}).get("mtime")
+ mtime = dateutil.parser.parse(mtime_str) if mtime_str else None
+ ver_epoch = entry["versioned_epoch"]
+
+ obj_ver = ObjVersion(name, instance, mtime, ver_epoch)
+ results.setdefault(name, []).append(obj_ver)
+
+ return results
+
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ primary = zonegroup_conns.rw_zones[0]
+
+ NUM_OBJECTS = 10
+ NUM_VERSIONS = 100
+
+ source_bucket = primary.create_bucket(gen_bucket_name())
+ log.info('created bucket=%s', source_bucket.name)
+
+ def create_bucket_objects(zone):
+ client = zone.s3_client
+ log.info(f"Creating objects for {client.meta.endpoint_url} in bucket {source_bucket.name}")
+ for i in range(0, NUM_OBJECTS):
+ for vid in range(0, NUM_VERSIONS):
+ key = f"obj-{i}.txt"
+ response = client.put_object(Key=key, Body=f"This is version {vid}", Bucket=source_bucket.name)
+ log.info(f"Instance {key} ({response['ResponseMetadata']['HTTPHeaders']['x-amz-version-id']}) created @ {client.meta.endpoint_url}")
+ log.info(f"{NUM_VERSIONS} versions created for object {key} on {client.meta.endpoint_url}")
+
+
+ # list all objects/versions in the zone and check that their versions are listed in the
+ # chronological order - from the newest to the oldest;
+ def check_modification_history(zone) -> Dict[str, int]:
+ response = bucket_list(zone, source_bucket.name)
+ obj_versions = parse_bucket_list_output(response)
+
+ # use this map to keep track of status checks for each object
+ obj_status = {f"obj-{oid}.txt": -1 for oid in range(NUM_OBJECTS)}
+ expected_num_versions_per_obj = NUM_VERSIONS * len(zonegroup_conns.rw_zones)
+ for obj_name, versions in obj_versions.items():
+ log.info(f"Checking object {obj_name}'s history - there are {len(versions)} versions")
+ assert len(versions) == expected_num_versions_per_obj, \
+ f"Number of versions ({len(versions)}) for {obj_name} does not match the expected number {expected_num_versions_per_obj}"
+ prev_version = versions[0]
+ out_of_order_versions = 0
+ for idx in range(1, len(versions)):
+ version = versions[idx]
+ # prior to timestamp-based epochs we used integer-based epochs, which are not derived
+ # from the object's modification time; on an epoch collision, an older version could
+ # therefore appear in the bucket listing before a newer one - the problem that
+ # timestamp-based epochs solve (by increasing the epoch resolution significantly,
+ # making collisions virtually impossible); nevertheless, if two versions were created
+ # at the exact same time we still rely on the version id to determine which one
+ # appears first (as more recent) in the modification history;
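+ # e.g. with integer epochs, concurrent PUTs in two zones could both be assigned
+ # epoch 5; the listing would then fall back on instance-id order, which may place
+ # the older version ahead of the newer one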
+ if version.ver_epoch == prev_version.ver_epoch and version.mtime > prev_version.mtime:
+ log.error(f"Version {obj_name}:{version.instance} is newer than {obj_name}:{prev_version.instance} but is listed later in the history")
+ out_of_order_versions += 1
+ elif version.ver_epoch + 1 == prev_version.ver_epoch:
+ log.warning(f"Version {version.instance} is just 1ns apart from {prev_version.instance}")
+
+ prev_version = version
+
+ obj_status[obj_name] = out_of_order_versions
+ if out_of_order_versions == 0:
+ log.info(f"{obj_name}: OK")
+ else:
+ log.warning(f"{obj_name}: {out_of_order_versions} versions are out of order")
+
+ return obj_status
+
+ def set_bucket_versioning(state: bool):
+ # note: the S3 API only accepts 'Enabled' or 'Suspended' for the versioning status
+ primary.s3_client.put_bucket_versioning(
+ Bucket=source_bucket.name,
+ VersioningConfiguration={'Status': 'Enabled' if state else 'Suspended'})
+
+ set_bucket_versioning(True)
+
+ # wait for those changes to propagate to the secondary zone;
+ zonegroup_meta_checkpoint(zonegroup)
+
+ threads = [threading.Thread(target=create_bucket_objects, args=[zone]) for zone in zonegroup_conns.rw_zones]
+ for t in threads:
+ t.start()
+ for t in threads:
+ t.join()
+
+ # polls bucket sync status for all zones in the zonegroup until they catch up with the checkpoint
+ zonegroup_bucket_checkpoint(zonegroup_conns, source_bucket.name)
+
+ # now check the modification history in each zone and assert the results; this is done
+ # serially so that assertion failures actually fail the test (assertions raised inside
+ # worker threads would be swallowed)
+ for zone in zonegroup_conns.rw_zones:
+ log.info(f"Checking modification history for zone {zone.name}")
+ obj_status = check_modification_history(zone.zone)
+ for name, out_of_order_versions in obj_status.items():
+ if out_of_order_versions == 0:
+ log.info(f"Object {name}: history OK")
+ elif out_of_order_versions == -1:
+ assert False, f"Object {name}: has no versions"
+ else:
+ assert False, f"Object {name}: found {out_of_order_versions} versions which are out of order"
+
+
def run_per_zonegroup(func):
def wrapper(*args, **kwargs):
for zonegroup in realm.current_period.zonegroups:
assert check_all_buckets_dont_exist(zcA, buckets)
assert check_all_buckets_dont_exist(zcB, buckets)
-
remove_sync_policy_group(c1, "sync-group")
return
assert check_all_buckets_dont_exist(zcA, buckets)
assert check_all_buckets_dont_exist(zcB, buckets)
-
remove_sync_policy_group(c1, "sync-group")
return
bucket = get_bucket(zcC, bucketA.name)
check_objects_not_exist(bucket, objnameA)
-
# verify that objnameB is not synced to either zoneA or zoneB
bucket = get_bucket(zcA, bucketA.name)
check_objects_not_exist(bucket, objnameB)
assert check_all_buckets_dont_exist(zcC, buckets)
remove_sync_policy_group(c1, "sync-group")
-
return
@attr('sync_policy')
zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
zone_data_checkpoint(zoneB, zoneA)
-
# verify that objnameA is synced to zoneB
bucket = get_bucket(zcB, bucketA.name)
check_object_exists(bucket, objnameA)
def test_bucket_replication_source_forbidden_objretention():
zonegroup = realm.master_zonegroup()
zonegroup_conns = ZonegroupConns(zonegroup)
-
source = zonegroup_conns.rw_zones[0]
dest = zonegroup_conns.rw_zones[1]
-
source_bucket_name = gen_bucket_name()
source.s3_client.create_bucket(Bucket=source_bucket_name, ObjectLockEnabledForBucket=True)
dest_bucket = dest.create_bucket(gen_bucket_name())
zonegroup_meta_checkpoint(zonegroup)
-
# create replication configuration
source.s3_client.put_bucket_replication(
Bucket=source_bucket_name,
}]
}
)
-
# Deny myself from fetching the source object's retention for replication
source.s3_client.put_bucket_policy(
Bucket=source_bucket_name,
})
)
zonegroup_meta_checkpoint(zonegroup)
-
# upload an object and wait for sync.
objname = 'dummy'
k = new_key(source, source_bucket_name, objname)
def test_bucket_replication_source_forbidden_legalhold():
zonegroup = realm.master_zonegroup()
zonegroup_conns = ZonegroupConns(zonegroup)
-
source = zonegroup_conns.rw_zones[0]
dest = zonegroup_conns.rw_zones[1]
-
source_bucket_name = gen_bucket_name()
source.s3_client.create_bucket(Bucket=source_bucket_name, ObjectLockEnabledForBucket=True)
dest_bucket = dest.create_bucket(gen_bucket_name())
zonegroup_meta_checkpoint(zonegroup)
-
# create replication configuration
source.s3_client.put_bucket_replication(
Bucket=source_bucket_name,
}]
}
)
-
# Deny myself from fetching the source object's retention for replication
source.s3_client.put_bucket_policy(
Bucket=source_bucket_name,
})
)
zonegroup_meta_checkpoint(zonegroup)
-
# upload an object and wait for sync.
objname = 'dummy'
k = new_key(source, source_bucket_name, objname)