From 4195486c8edc68bfdf5cd4d87e04ef67f35a9364 Mon Sep 17 00:00:00 2001 From: Igor Gomon Date: Tue, 11 Mar 2025 18:41:41 +0000 Subject: [PATCH] rgw: implement timestamp-based epochs. Signed-off-by: Igor Gomon --- src/cls/rgw/cls_rgw.cc | 304 ++++++++++++++++------------- src/cls/rgw/cls_rgw_ops.h | 7 + src/cls/rgw/cls_rgw_types.cc | 3 + src/cls/rgw/cls_rgw_types.h | 15 ++ src/common/strtol.cc | 19 ++ src/common/strtol.h | 2 + src/rgw/driver/rados/rgw_rados.cc | 11 +- src/test/rgw/rgw_multi/tests.py | 175 +++++++++++++++-- src/test/rgw/rgw_multi/tests_es.py | 1 - 9 files changed, 382 insertions(+), 155 deletions(-) diff --git a/src/cls/rgw/cls_rgw.cc b/src/cls/rgw/cls_rgw.cc index 53aa89ff6362..673f218e3c49 100644 --- a/src/cls/rgw/cls_rgw.cc +++ b/src/cls/rgw/cls_rgw.cc @@ -321,19 +321,25 @@ static int get_obj_vals(cls_method_context_t hctx, */ static void decreasing_str(uint64_t num, string *str) { + // This buffer must be big enough to hold the string representation of + // the largest unsigned 64-bit integer value (+ 1 more char). char buf[32]; if (num < 0x10) { /* 16 */ - snprintf(buf, sizeof(buf), "9%02lld", 15 - (long long)num); + snprintf(buf, sizeof(buf), "9%02" PRIu64, 0xF - num); } else if (num < 0x100) { /* 256 */ - snprintf(buf, sizeof(buf), "8%03lld", 255 - (long long)num); + snprintf(buf, sizeof(buf), "8%03" PRIu64, 0xFF - num); } else if (num < 0x1000) /* 4096 */ { - snprintf(buf, sizeof(buf), "7%04lld", 4095 - (long long)num); + snprintf(buf, sizeof(buf), "7%04" PRIu64, 0xFFF - num); } else if (num < 0x10000) /* 65536 */ { - snprintf(buf, sizeof(buf), "6%05lld", 65535 - (long long)num); + snprintf(buf, sizeof(buf), "6%05" PRIu64, 0xFFFF - num); } else if (num < 0x100000000) /* 4G */ { - snprintf(buf, sizeof(buf), "5%010lld", 0xFFFFFFFF - (long long)num); + snprintf(buf, sizeof(buf), "5%010" PRIu64, 0xFFFFFFFF - num); + } else if (num < 0x10000000000) /* 1T */ { + snprintf(buf, sizeof(buf), "4%015" PRIu64, 0xFFFFFFFFFF - num); + } else if (num < 0x1000000000000) /* 281T */ { + snprintf(buf, sizeof(buf), "3%018" PRIu64, 0xFFFFFFFFFFFF - num); } else { - snprintf(buf, sizeof(buf), "4%020lld", (long long)-num); + snprintf(buf, sizeof(buf), "2%020" PRIu64, std::numeric_limits::max() - num); } *str = buf; @@ -498,11 +504,21 @@ static int decode_list_index_key(const string& index_key, cls_rgw_obj_key *key, if (val[0] == 'i') { key->instance = val.substr(1); } else if (val[0] == 'v') { + // what we are dealing here with is the string representation of the versioned epoch (as converted to by + // decreasing_str() func); the first char is always 'v' to indicate that it is the versioned epoch; the + // second char is a digit in [9-2] range that is used to separate value ranges - in order to make + // string representation sort in the opposite direction and to decrease string length - to speed up + // the lexicographical comparison; hence +2 (1 for the value indicator and one for the range prefix); string err; - const char *s = val.c_str() + 1; - *ver = strict_strtoll(s, 10, &err); - if (!err.empty()) { - CLS_LOG(0, "ERROR: %s: bad index_key (%s): could not parse val (v=%s)", __func__, escape_str(index_key).c_str(), s); + if (val.size() > 2) { + const char *s = val.c_str() + 2; + *ver = strict_strtoull(s, 10, &err); + if (!err.empty()) { + CLS_LOG(0, "ERROR: %s: bad index_key (%s): could not parse val (v=%s)", __func__, escape_str(index_key).c_str(), s); + return -EIO; + } + } else { + CLS_LOG(0, "ERROR: %s: bad index_key (%s): empty val", __func__, escape_str(index_key).c_str()); return -EIO; } } @@ -1627,19 +1643,20 @@ public: return 0; } - bool start_modify(uint64_t candidate_epoch) { - if (candidate_epoch) { - if (candidate_epoch < olh_data_entry.epoch) { - return false; /* olh cannot be modified, old epoch */ - } - olh_data_entry.epoch = candidate_epoch; - } else { - if (olh_data_entry.epoch == 0) { - olh_data_entry.epoch = 2; /* versioned epoch should start with 2, 1 is reserved to converted plain entries */ - } else { - olh_data_entry.epoch++; - } + /** + * This is called when a new instance of an object (in a versioned bucket) is added (via PUT) or an existing instance is removed. + * A part of that process is to update the OLH entry (in the bucket index) with the correct modification timestamp (epoch). + * This timestamp is then used later on to guard against OLH updates for add/remove instance ops that happened *before* + * the latest op that updated the OLH entry. + * @param candidate_epoch - this is provided (> 0) in the case when a remote epoch is coming in as the result of multisite sync; + */ + bool start_modify (uint64_t candidate_epoch) { + // only update the olh.epoch if it is newer than the current one. + if (candidate_epoch < olh_data_entry.epoch) { + return false; /* olh cannot be modified, old epoch */ } + + olh_data_entry.epoch = candidate_epoch; return true; } @@ -1889,81 +1906,95 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer const uint64_t prev_epoch = olh.get_epoch(); - if (!olh.start_modify(op.olh_epoch)) { - ret = obj.write(op.olh_epoch, false, header); - if (ret < 0) { - return ret; - } - if (removing) { - olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, op.olh_epoch); - } - return write_header_while_logrecord(hctx, header); - } - - // promote this version to current if it's a newer epoch, or if it matches the - // current epoch and sorts after the current instance - const bool promote = (olh.get_epoch() > prev_epoch) || - (olh.get_epoch() == prev_epoch && - olh.get_entry().key.instance >= op.key.instance); + // op.olh_epoch is provided (> 0) in the case when a remote epoch is coming in as the result of multisite sync; + uint64_t candidate_epoch = op.olh_epoch ? op.olh_epoch : + duration_cast(obj.mtime().time_since_epoch()).count(); + if (olh.start_modify(candidate_epoch)) { + // promote this version to current if it's a newer epoch, or if it matches the + // current epoch and sorts after the current instance + const bool promote = (olh.get_epoch() > prev_epoch) || + (olh.get_epoch() == prev_epoch && + olh.get_entry().key.instance >= op.key.instance); + const bool epoch_collision = olh.get_epoch() == prev_epoch; + + if (olh_found) { + const string &olh_tag = olh.get_tag(); + if (op.olh_tag != olh_tag) { + if (!olh.pending_removal()) { + CLS_LOG(5, "NOTICE: op.olh_tag (%s) != olh.tag (%s)", op.olh_tag.c_str(), olh_tag.c_str()); + return -ECANCELED; + } + /* if pending removal, this is a new olh instance */ + olh.set_tag(op.olh_tag); + } + if (epoch_collision) { + auto const &s_key = op.key.to_string(); + CLS_LOG(1, "NOTICE: versioned epoch collision (%lu) for object %s", prev_epoch, s_key.c_str()); + } + if (promote && olh.exists()) { + rgw_bucket_olh_entry &olh_entry = olh.get_entry(); + /* found olh, previous instance is no longer the latest, need to update */ + if (!(olh_entry.key == op.key)) { + BIVerObjEntry old_obj(hctx, olh_entry.key); - if (olh_found) { - const string& olh_tag = olh.get_tag(); - if (op.olh_tag != olh_tag) { - if (!olh.pending_removal()) { - CLS_LOG(5, "NOTICE: op.olh_tag (%s) != olh.tag (%s)", op.olh_tag.c_str(), olh_tag.c_str()); - return -ECANCELED; + ret = old_obj.demote_current(header); + if (ret < 0) { + CLS_LOG(0, "ERROR: could not demote current on previous key ret=%d", ret); + return ret; + } + } + } + olh.set_pending_removal(false); + } else { + bool instance_only = (op.key.instance.empty() && op.delete_marker); + cls_rgw_obj_key key(op.key.name); + ret = convert_plain_entry_to_versioned(hctx, key, promote, instance_only, header); + if (ret < 0) { + CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret); + return ret; } - /* if pending removal, this is a new olh instance */ olh.set_tag(op.olh_tag); + if (op.key.instance.empty()) { + obj.set_epoch(1); + } } - if (promote && olh.exists()) { - rgw_bucket_olh_entry& olh_entry = olh.get_entry(); - /* found olh, previous instance is no longer the latest, need to update */ - if (!(olh_entry.key == op.key)) { - BIVerObjEntry old_obj(hctx, olh_entry.key); - ret = old_obj.demote_current(header); - if (ret < 0) { - CLS_LOG(0, "ERROR: could not demote current on previous key ret=%d", ret); - return ret; - } - } + /* update the olh log */ + olh.update_log(CLS_RGW_OLH_OP_LINK_OLH, op.op_tag, op.key, op.delete_marker); + if (removing) { + olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false); } - olh.set_pending_removal(false); - } else { - bool instance_only = (op.key.instance.empty() && op.delete_marker); - cls_rgw_obj_key key(op.key.name); - ret = convert_plain_entry_to_versioned(hctx, key, promote, instance_only, header); + + if (promote) { + olh.update(op.key, op.delete_marker); + } + olh.set_exists(true); + + /* write the instance and list entries */ + ret = obj.write(olh.get_epoch(), promote, header); if (ret < 0) { - CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret); return ret; } - olh.set_tag(op.olh_tag); - if (op.key.instance.empty()){ - obj.set_epoch(1); - } - } - /* update the olh log */ - olh.update_log(CLS_RGW_OLH_OP_LINK_OLH, op.op_tag, op.key, op.delete_marker); - if (removing) { - olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false); + ret = olh.write(header); } + else { + ret = obj.write(candidate_epoch, false, header); + if (ret < 0) { + return ret; + } - if (promote) { - olh.update(op.key, op.delete_marker); - } - olh.set_exists(true); + // no point here in adding CLS_RGW_OLH_OP_LINK_OLH to the pending log as we know that + // the epoch is already stale compared to the current - so no point in applying it; - ret = olh.write(header); - if (ret < 0) { - CLS_LOG(0, "ERROR: failed to update olh ret=%d", ret); - return ret; + if (removing) { + olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, candidate_epoch); + ret = olh.write(header); + } } - /* write the instance and list entries */ - ret = obj.write(olh.get_epoch(), promote, header); if (ret < 0) { + CLS_LOG(0, "ERROR: failed to update olh ret=%d", ret); return ret; } @@ -1978,7 +2009,7 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer rgw_bucket_dir_entry& entry = obj.get_dir_entry(); rgw_bucket_entry_ver ver; - ver.epoch = (op.olh_epoch ? op.olh_epoch : olh.get_epoch()); + ver.epoch = candidate_epoch; string *powner = NULL; string *powner_display_name = NULL; @@ -2061,73 +2092,76 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in, obj.set_epoch(1); } - if (!olh.start_modify(op.olh_epoch)) { - ret = obj.unlink_list_entry(header); - if (ret < 0) { - return ret; - } - - if (obj.is_delete_marker()) { - return 0; - } + // op.olh_epoch is provided (> 0) in the case when a remote epoch is coming in as the result of multisite sync; + uint64_t candidate_epoch = op.olh_epoch ? op.olh_epoch : + duration_cast(real_clock::now().time_since_epoch()).count(); + if (olh.start_modify(candidate_epoch)) { + rgw_bucket_olh_entry &olh_entry = olh.get_entry(); + cls_rgw_obj_key &olh_key = olh_entry.key; + CLS_LOG(20, "%s: updating olh log: existing olh entry: %s[%s] (delete_marker=%d)", __func__, + olh_key.name.c_str(), olh_key.instance.c_str(), olh_entry.delete_marker); + + if (olh_key == dest_key) { + /* this is the current head, need to update the OLH! */ + cls_rgw_obj_key next_key; + bool found = false; + ret = obj.find_next_key(&next_key, &found); + if (ret < 0) { + CLS_LOG(0, "ERROR: obj.find_next_key() returned ret=%d", ret); + return ret; + } - olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, op.olh_epoch); - return olh.write(header); - } + if (found) { + BIVerObjEntry next(hctx, next_key); + ret = next.write(olh.get_epoch(), true, header); + if (ret < 0) { + CLS_LOG(0, "ERROR: next.write() returned ret=%d", ret); + return ret; + } - rgw_bucket_olh_entry& olh_entry = olh.get_entry(); - cls_rgw_obj_key& olh_key = olh_entry.key; - CLS_LOG(20, "%s: updating olh log: existing olh entry: %s[%s] (delete_marker=%d)", __func__, - olh_key.name.c_str(), olh_key.instance.c_str(), olh_entry.delete_marker); + CLS_LOG(20, "%s: updating olh log: link olh -> %s[%s] (is_delete=%d)", __func__, + next_key.name.c_str(), next_key.instance.c_str(), (int) next.is_delete_marker()); - if (olh_key == dest_key) { - /* this is the current head, need to update the OLH! */ - cls_rgw_obj_key next_key; - bool found = false; - ret = obj.find_next_key(&next_key, &found); - if (ret < 0) { - CLS_LOG(0, "ERROR: obj.find_next_key() returned ret=%d", ret); - return ret; + olh.update(next_key, next.is_delete_marker()); + olh.update_log(CLS_RGW_OLH_OP_LINK_OLH, op.op_tag, next_key, next.is_delete_marker()); + } else { + // next_key is empty, but we need to preserve its name in case this entry + // gets resharded, because this key is used for hash placement + next_key.name = dest_key.name; + olh.update(next_key, false); + olh.update_log(CLS_RGW_OLH_OP_UNLINK_OLH, op.op_tag, next_key, false); + olh.set_exists(false); + olh.set_pending_removal(true); + } } - if (found) { - BIVerObjEntry next(hctx, next_key); - ret = next.write(olh.get_epoch(), true, header); + if (!obj.is_delete_marker()) { + olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false); + } else { + /* this is a delete marker, it's our responsibility to remove its + * instance entry */ + ret = obj.unlink(header, op.key); if (ret < 0) { - CLS_LOG(0, "ERROR: next.write() returned ret=%d", ret); return ret; } - - CLS_LOG(20, "%s: updating olh log: link olh -> %s[%s] (is_delete=%d)", __func__, - next_key.name.c_str(), next_key.instance.c_str(), (int)next.is_delete_marker()); - - olh.update(next_key, next.is_delete_marker()); - olh.update_log(CLS_RGW_OLH_OP_LINK_OLH, op.op_tag, next_key, next.is_delete_marker()); - } else { - // next_key is empty, but we need to preserve its name in case this entry - // gets resharded, because this key is used for hash placement - next_key.name = dest_key.name; - olh.update(next_key, false); - olh.update_log(CLS_RGW_OLH_OP_UNLINK_OLH, op.op_tag, next_key, false); - olh.set_exists(false); - olh.set_pending_removal(true); } - } - if (!obj.is_delete_marker()) { - olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false); - } else { - /* this is a delete marker, it's our responsibility to remove its - * instance entry */ - ret = obj.unlink(header, op.key); + ret = obj.unlink_list_entry(header); if (ret < 0) { return ret; } } + else { + ret = obj.unlink_list_entry(header); + if (ret < 0) { + return ret; + } - ret = obj.unlink_list_entry(header); - if (ret < 0) { - return ret; + if (obj.is_delete_marker()) { + return 0; + } + + olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, candidate_epoch); } ret = olh.write(header); @@ -2144,7 +2178,7 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in, } rgw_bucket_entry_ver ver; - ver.epoch = (op.olh_epoch ? op.olh_epoch : olh.get_epoch()); + ver.epoch = candidate_epoch; real_time mtime = obj.mtime(); /* mtime has no real meaning in * instance removal context */ diff --git a/src/cls/rgw/cls_rgw_ops.h b/src/cls/rgw/cls_rgw_ops.h index 225df29fe510..9d0267acf4b0 100644 --- a/src/cls/rgw/cls_rgw_ops.h +++ b/src/cls/rgw/cls_rgw_ops.h @@ -232,9 +232,16 @@ WRITE_CLASS_ENCODER(rgw_cls_link_olh_op) struct rgw_cls_unlink_instance_op { cls_rgw_obj_key key; std::string op_tag; + // this represents a remote epoch during multisite sync uint64_t olh_epoch; bool log_op; uint16_t bilog_flags; + // cls ops include olh_tag so the OLH class code can guard sensitive updates—only proceed if op.olh_tag equals + // the OLH’s stored tag. If it doesn’t, the op fails and the caller refreshes state/retries. + // for context: in real clusters, out‑of‑order replication or topology changes can recreate/move an OLH + // (eg, resharding or certain multisite flows). The tag changes with that new OLH “generation,” so stale + // writers carrying the old tag get refused instead of overwriting the new state. A concrete example of failures + // tied to OLH attributes shows how wrong attributes/tags cause bad GET behavior, which is why the guard exists. std::string olh_tag; rgw_zone_set zones_trace; diff --git a/src/cls/rgw/cls_rgw_types.cc b/src/cls/rgw/cls_rgw_types.cc index 9fd60aaff3f3..bfcd42c7ff31 100644 --- a/src/cls/rgw/cls_rgw_types.cc +++ b/src/cls/rgw/cls_rgw_types.cc @@ -465,6 +465,9 @@ void rgw_bucket_olh_entry::dump(Formatter *f) const encode_json("key", key, f); encode_json("delete_marker", delete_marker, f); encode_json("epoch", epoch, f); + ceph::real_time tp {std::chrono::nanoseconds (epoch)}; + utime_t ut(tp); + encode_json("epoch_timestamp", ut, f); encode_json("pending_log", pending_log, f); encode_json("tag", tag, f); encode_json("exists", exists, f); diff --git a/src/cls/rgw/cls_rgw_types.h b/src/cls/rgw/cls_rgw_types.h index 1bfcbcc97b89..e41d03d4458b 100644 --- a/src/cls/rgw/cls_rgw_types.h +++ b/src/cls/rgw/cls_rgw_types.h @@ -511,14 +511,20 @@ WRITE_CLASS_ENCODER(rgw_cls_bi_entry) enum OLHLogOp { CLS_RGW_OLH_OP_UNKNOWN = 0, + // link OLH entry to a specific object version CLS_RGW_OLH_OP_LINK_OLH = 1, + // deletes OLH object from the data pool and removes OLH entry from the bucket index CLS_RGW_OLH_OP_UNLINK_OLH = 2, /* object does not exist */ + // remove a specific instance of an object, such as . CLS_RGW_OLH_OP_REMOVE_INSTANCE = 3, }; struct rgw_bucket_olh_log_entry { uint64_t epoch; OLHLogOp op; + // Once the OLH Log Entries are processed for a given epoch (by apply_olh_log()) the corresponding olh.pending.* + // xattrs are removed from the corresponding OLH object (in the data pool). The pending xattrs to be removed + // are those that match op_tag. std::string op_tag; cls_rgw_obj_key key; bool delete_marker; @@ -555,8 +561,17 @@ WRITE_CLASS_ENCODER(rgw_bucket_olh_log_entry) struct rgw_bucket_olh_entry { cls_rgw_obj_key key; bool delete_marker; + // the epoch represents the latest modification timestamp for the S3 object identified by the key; uint64_t epoch; + // epoch -> op list mapping: stores pending modifications to the S3 object identified by the key; + // this is basically a per-S3-object WAL whose main purpose is crash safety and idempotency; operations + // PUT/DELETE that modify S3 object history write to this log first; it is being + // replayed by the apply_olh_log() on the same zone; + // usually there's only 1 op per epoch key but more than 1 op would be associated with an epoch in case + // of versioned DELETE for the current instance: [remove instance, link] std::map > pending_log; + // unique tag for this entry; it remains the same until the entry is deleted (like when versioning + // is suspended) and then re-created (by re-enabling versioning); std::string tag; bool exists; bool pending_removal; diff --git a/src/common/strtol.cc b/src/common/strtol.cc index 0e197535b7ac..c198df4f383c 100644 --- a/src/common/strtol.cc +++ b/src/common/strtol.cc @@ -58,6 +58,25 @@ long long strict_strtoll(std::string_view str, int base, std::string *err) return ret; } +unsigned long long strict_strtoull(std::string_view str, int base, std::string *err) +{ + char *endptr; + errno = 0; /* To distinguish success/failure after call (see man page) */ + auto ret = strtoull(str.data(), &endptr, base); + if (endptr == str.data() || endptr != str.data() + str.size()) { + *err = (std::string{"Expected option value to be integer, got '"} + + std::string{str} + "'"); + return 0; + } + if (errno) { + *err = (std::string{"The option value '"} + std::string{str} + + "' seems to be invalid"); + return 0; + } + *err = ""; + return ret; +} + int strict_strtol(std::string_view str, int base, std::string *err) { long long ret = strict_strtoll(str, base, err); diff --git a/src/common/strtol.h b/src/common/strtol.h index 681ac1a290c8..9108f942a30c 100644 --- a/src/common/strtol.h +++ b/src/common/strtol.h @@ -73,6 +73,8 @@ bool strict_strtob(std::string_view str, std::string *err); long long strict_strtoll(std::string_view str, int base, std::string *err); +unsigned long long strict_strtoull(std::string_view str, int base, std::string *err); + int strict_strtol(std::string_view str, int base, std::string *err); double strict_strtod(std::string_view str, std::string *err); diff --git a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc index 054604002754..2f7a4ada7d7f 100644 --- a/src/rgw/driver/rados/rgw_rados.cc +++ b/src/rgw/driver/rados/rgw_rados.cc @@ -6380,8 +6380,10 @@ int RGWRados::Object::Delete::delete_obj(optional_yield y, if (add_log) { r = add_datalog_entry(dpp, store->svc.datalog_rados, target->get_bucket_info(), bs->shard_id, y); - ldpp_dout(dpp, 0) << "failed to write datalog for object: r=" << r << dendl; - return r; + if (r < 0) { + ldpp_dout(dpp, 0) << "failed to write datalog for object: r=" << r << dendl; + return r; + } } return 0; @@ -8948,6 +8950,11 @@ int RGWRados::apply_olh_log(const DoutPrefixProvider *dpp, ldpp_dout(dpp, 20) << "olh_log_entry: epoch=" << iter->first << " op=" << (int)entry.op << " key=" << entry.key.name << "[" << entry.key.instance << "] " << (entry.delete_marker ? "(delete)" : "") << dendl; + + if (link_epoch == iter->first) + ldpp_dout(dpp, 1) << "apply_olh_log epoch collision detected for " << entry.key + << "; incoming op: " << entry.op << "(" << entry.op_tag << ")" << dendl; + switch (entry.op) { case CLS_RGW_OLH_OP_REMOVE_INSTANCE: remove_instances.push_back(entry.key); diff --git a/src/test/rgw/rgw_multi/tests.py b/src/test/rgw/rgw_multi/tests.py index 1d4ad5d47894..d339a74737a2 100644 --- a/src/test/rgw/rgw_multi/tests.py +++ b/src/test/rgw/rgw_multi/tests.py @@ -6,6 +6,9 @@ import time import logging import errno import dateutil.parser +from datetime import datetime +import threading +from typing import Dict, List, Any from itertools import combinations from itertools import zip_longest @@ -110,6 +113,12 @@ def bilog_list(zone, bucket, args = None): bilog, _ = zone.cluster.admin(cmd, read_only=True) return json.loads(bilog) +def bucket_list(zone, bucket, args = None): + cmd = ['bucket', 'list', '--bucket', bucket, '--max-entries', '100000', '--uid', user.name] + (args or []) + cmd += ['--tenant', config.tenant] if config.tenant else [] + output, _ = zone.cluster.admin(cmd, read_only=True) + return json.loads(output) + def bilog_autotrim(zone, args = None): cmd = ['bilog', 'autotrim'] + (args or []) + zone.zone_args() zone.cluster.admin(cmd, debug_rgw=20) @@ -1826,7 +1835,7 @@ def test_bucket_log_trim_after_delete_bucket_primary_reshard(): # run bilog trim twice on primary zone where the bucket was resharded bilog_autotrim(primary.zone, ['--rgw-sync-log-trim-max-buckets', '50'],) - + for zonegroup in realm.current_period.zonegroups: zonegroup_conns = ZonegroupConns(zonegroup) for zone in zonegroup_conns.zones: @@ -2315,7 +2324,7 @@ def test_assume_role_after_sync(): log.info(f'checking if zone: {zone.name} has role: {role_name}') assert(zone.has_role(role_name)) log.info(f'success, zone: {zone.name} has role: {role_name}') - + for zone in zonegroup_conns.zones: if zone == zonegroup_conns.master_zone: log.info(f'creating bucket in primary zone') @@ -3971,6 +3980,153 @@ def test_bucket_create_location_constraint(): CreateBucketConfiguration={'LocationConstraint': zg.name}) assert e.response['ResponseMetadata']['HTTPStatusCode'] == 400 +def test_timestamp_based_epochs(): + """ + test_timestamp_based_epochs: + the test generates objects/instance in both zones: for each of NUM_OBJECTS NUM_VERSIONS are generated; + then it waits for the replication to finish and then lists objects/instances in both zones and checks + that the instances there are listed are in chronological order, with the expectation that without + time-based epochs the listed order of object versions won't be chronological; with the time-based epochs + the order should be strictly chronological + """ + class ObjVersion: + def __init__ (self, name: str, instance: str, mtime: datetime, ver_epoch: int): + self.name = name + self.instance = instance + self.mtime = mtime + self.ver_epoch = ver_epoch + + def __eq__ (self, other): + return (self.name == other.name and + self.instance == other.instance and + self.mtime == other.mtime and + self.ver_epoch == other.ver_epoch) + + def parse_bucket_list_output (data: Any) -> Dict[str, List[ObjVersion]]: + """ + Parses output of the 'radosgw-admin bucket-list --bucket --format json' command. + :param output: + :return: + """ + if not isinstance(data, list): + raise ValueError("Expected a list of entries in JSON input") + + results: Dict[str, List[ObjVersion]] = {} + for entry in data: + if not isinstance(entry, dict): + continue + + name = entry["name"] + instance = entry["instance"] + mtime = entry.get("meta", {}).get("mtime") + ver_epoch = entry["versioned_epoch"] + + obj_ver= ObjVersion(name, instance, mtime, ver_epoch) + if results.get(name) is None: + results[name] = [] + results[name].append(obj_ver) + + return results + + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + primary = zonegroup_conns.rw_zones[0] + + NUM_OBJECTS = 10 + NUM_VERSIONS = 100 + + source_bucket = primary.create_bucket(gen_bucket_name()) + log.info('created bucket=%s', source_bucket.name) + + def create_bucket_objects (zone): + client = zone.s3_client + log.info(f"Creating objects for {client.meta.endpoint_url} in bucket {source_bucket.name}") + for i in range(0, NUM_OBJECTS): + for vid in range(0, NUM_VERSIONS): + key=f"obj-{i}.txt" + response=client.put_object(Key=key, Body=f"This is version {vid}", Bucket=source_bucket.name) + log.info(f"Instance {key} ({response['ResponseMetadata']['HTTPHeaders']['x-amz-version-id']}) created @ {client.meta.endpoint_url}") + log.info(f"{NUM_VERSIONS} versions created for object {key} on {client.meta.endpoint_url}") + + + # list all objects/versions in the zone and check that their versions are listed in the + # chronological order - from the newest to the oldest; + def check_modification_history(zone) -> Dict[str, int]: + response = bucket_list(zone, source_bucket.name) + obj_versions = parse_bucket_list_output(response) + + # use this map to keep track of status checks for each object + obj_status = {f"obj-{oid}.txt" : -1 for oid in range(NUM_OBJECTS)} + expected_num_versions_per_obj = NUM_VERSIONS * len(zonegroup_conns.rw_zones) + for obj_name, versions in obj_versions.items(): + log.info(f"Checking object {obj_name}'s' history - there are {len(versions)} versions") + assert len(versions) == expected_num_versions_per_obj, \ + f"Number of versions ({len(versions)}) for {obj_name} does not match the expected number {expected_num_versions_per_obj}" + prev_version = versions[0] + out_of_order_versions = 0 + for idx in range(1, len(versions)): + version = versions[idx] + # prior to the timestamp-based epochs we used integer based epochs which are not based on the modification time of the + # object; so whenever there is an epoch collision we might see that an older object might appear in the bucket + # listing before the newer one - which is the problem which timestamp-based epochs solve (by increasing epoch + # resolution significantly thus making epoch collisions virtually impossible); nevertheless, if the 2 versions + # were created at the exact same time we still rely on the version id to determine which one appears first + # (more recent) in the modification history even though both have the same timestamp; + if version.ver_epoch == prev_version.ver_epoch and version.mtime > prev_version.mtime: + log.error(f"Version {obj_name}:{version.instance} is newer than {obj_name}:{prev_version.instance} but is listed later in the history") + out_of_order_versions += 1 + elif version.ver_epoch + 1 == prev_version.ver_epoch: + log.warning(f"Version {version.instance} is just 1ns apart from {prev_version.instance}") + + prev_version = version + + obj_status[obj_name] = out_of_order_versions + if out_of_order_versions==0: + log.info(f"{obj_name}: OK") + else: + log.warning(f"{obj_name}: {out_of_order_versions} versions are out of order") + + return obj_status + + def set_bucket_versioning(state: bool): + primary.s3_client.put_bucket_versioning(Bucket=source_bucket.name, VersioningConfiguration= + {'Status': 'Enabled' if state else 'Disabled'}) + + set_bucket_versioning(True) + + # wait for those changes to propagate to the secondary zone; + zonegroup_meta_checkpoint(zonegroup) + + threads = [threading.Thread(target=create_bucket_objects, args=[zone]) for zone in zonegroup_conns.rw_zones] + for t in threads: + t.start() + for t in threads: + t.join() + + # polls bucket sync status for all zones in the zonegroup until they catch up with the checkpoint + zonegroup_bucket_checkpoint(zonegroup_conns, source_bucket.name) + + # now check modification history in each zone + threads = [threading.Thread(target=check_modification_history, args=[zone.zone]) + for zone in zonegroup_conns.rw_zones] + for t in threads: + t.start() + for t in threads: + t.join() + + # check the results + for zone in zonegroup_conns.rw_zones: + log.info(f"Checking modification history for zone {zone.name}") + obj_status = check_modification_history(zone.zone) + for name, out_of_order_versions in obj_status.items(): + if out_of_order_versions == 0: + log.info(f"Object {name}: history OK") + elif out_of_order_versions == -1: + assert False, f"Object {name}: has no versions" + else: + assert False, f"Object {name}: found {out_of_order_versions} versions which are out of order" + + def run_per_zonegroup(func): def wrapper(*args, **kwargs): for zonegroup in realm.current_period.zonegroups: @@ -4712,7 +4868,6 @@ def test_bucket_delete_with_bucket_sync_policy_directional(): assert check_all_buckets_dont_exist(zcA, buckets) assert check_all_buckets_dont_exist(zcB, buckets) - remove_sync_policy_group(c1, "sync-group") return @@ -4791,7 +4946,6 @@ def test_bucket_delete_with_bucket_sync_policy_symmetric(): assert check_all_buckets_dont_exist(zcA, buckets) assert check_all_buckets_dont_exist(zcB, buckets) - remove_sync_policy_group(c1, "sync-group") return @@ -4924,7 +5078,6 @@ def test_delete_bucket_with_zone_opt_out(): bucket = get_bucket(zcC, bucketA.name) check_objects_not_exist(bucket, objnameA) - # verify that objnameB is not synced to either zoneA or zoneB bucket = get_bucket(zcA, bucketA.name) check_objects_not_exist(bucket, objnameB) @@ -4960,7 +5113,6 @@ def test_delete_bucket_with_zone_opt_out(): assert check_all_buckets_dont_exist(zcC, buckets) remove_sync_policy_group(c1, "sync-group") - return @attr('sync_policy') @@ -5019,7 +5171,6 @@ def test_bucket_delete_with_sync_policy_object_prefix(): zone_bucket_checkpoint(zoneA, zoneB, bucketA.name) zone_data_checkpoint(zoneB, zoneA) - # verify that objnameA is synced to zoneB bucket = get_bucket(zcB, bucketA.name) check_object_exists(bucket, objnameA) @@ -5654,15 +5805,12 @@ def test_bucket_replication_source_allow_either_getobjectversion_or_getobjectver def test_bucket_replication_source_forbidden_objretention(): zonegroup = realm.master_zonegroup() zonegroup_conns = ZonegroupConns(zonegroup) - source = zonegroup_conns.rw_zones[0] dest = zonegroup_conns.rw_zones[1] - source_bucket_name = gen_bucket_name() source.s3_client.create_bucket(Bucket=source_bucket_name, ObjectLockEnabledForBucket=True) dest_bucket = dest.create_bucket(gen_bucket_name()) zonegroup_meta_checkpoint(zonegroup) - # create replication configuration source.s3_client.put_bucket_replication( Bucket=source_bucket_name, @@ -5677,7 +5825,6 @@ def test_bucket_replication_source_forbidden_objretention(): }] } ) - # Deny myself from fetching the source object's retention for replication source.s3_client.put_bucket_policy( Bucket=source_bucket_name, @@ -5692,7 +5839,6 @@ def test_bucket_replication_source_forbidden_objretention(): }) ) zonegroup_meta_checkpoint(zonegroup) - # upload an object and wait for sync. objname = 'dummy' k = new_key(source, source_bucket_name, objname) @@ -5712,15 +5858,12 @@ def test_bucket_replication_source_forbidden_objretention(): def test_bucket_replication_source_forbidden_legalhold(): zonegroup = realm.master_zonegroup() zonegroup_conns = ZonegroupConns(zonegroup) - source = zonegroup_conns.rw_zones[0] dest = zonegroup_conns.rw_zones[1] - source_bucket_name = gen_bucket_name() source.s3_client.create_bucket(Bucket=source_bucket_name, ObjectLockEnabledForBucket=True) dest_bucket = dest.create_bucket(gen_bucket_name()) zonegroup_meta_checkpoint(zonegroup) - # create replication configuration source.s3_client.put_bucket_replication( Bucket=source_bucket_name, @@ -5735,7 +5878,6 @@ def test_bucket_replication_source_forbidden_legalhold(): }] } ) - # Deny myself from fetching the source object's retention for replication source.s3_client.put_bucket_policy( Bucket=source_bucket_name, @@ -5750,7 +5892,6 @@ def test_bucket_replication_source_forbidden_legalhold(): }) ) zonegroup_meta_checkpoint(zonegroup) - # upload an object and wait for sync. objname = 'dummy' k = new_key(source, source_bucket_name, objname) diff --git a/src/test/rgw/rgw_multi/tests_es.py b/src/test/rgw/rgw_multi/tests_es.py index 08c11718bd04..2ee2b9423003 100644 --- a/src/test/rgw/rgw_multi/tests_es.py +++ b/src/test/rgw/rgw_multi/tests_es.py @@ -4,7 +4,6 @@ import logging import boto import boto.s3.connection -import datetime import dateutil from itertools import zip_longest # type: ignore -- 2.47.3