From: Gabriel BenHanokh
Date: Thu, 13 Nov 2025 10:02:01 +0000 (+0000)
Subject: rgw/dedup: Add support for RGW versions.
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=00b36052d0fd982e83c0cd229eab0cc3c3692530;p=ceph.git

rgw/dedup: Add support for RGW versions.

Dedup code will scan all instances of versioned objects and include them
in the dedup process.

Signed-off-by: Gabriel BenHanokh
---
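Context for the get_ioctx() rewrite below, as a hedged sketch: with bucket versioning enabled, every version of an S3 object is a distinct instance with its own head object, so any RADOS oid derived from the object name alone collides across versions. The snippet uses simplified types and an invented ":" separator — the real encoding comes from get_obj_bucket_and_oid_loc(), not from this code.

// Illustrative only, not RGW code: a name-only oid scheme (like the removed
// build_oid() helper) maps every version to the same oid; deriving the oid
// from the full (name, instance) index key gives each version its own.
#include <iostream>
#include <string>

static std::string name_only_oid(const std::string& bucket_id,
                                 const std::string& obj_name) {
  return bucket_id + "_" + obj_name;  // same oid for every version -> collisions
}

static std::string instance_aware_oid(const std::string& bucket_id,
                                      const std::string& obj_name,
                                      const std::string& instance) {
  std::string oid = bucket_id + "_" + obj_name;
  return instance.empty() ? oid : oid + ":" + instance;  // one oid per version
}

int main() {
  std::cout << name_only_oid("9213fc", "photo.jpg") << "\n";
  std::cout << instance_aware_oid("9213fc", "photo.jpg", "3scJRn0") << "\n";
  std::cout << instance_aware_oid("9213fc", "photo.jpg", "qPxvL12") << "\n";
}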
diff --git a/src/rgw/driver/rados/rgw_dedup.cc b/src/rgw/driver/rados/rgw_dedup.cc
index 4b2c1de66e6..d6e0804bf34 100644
--- a/src/rgw/driver/rados/rgw_dedup.cc
+++ b/src/rgw/driver/rados/rgw_dedup.cc
@@ -418,10 +418,11 @@ namespace rgw::dedup {
                                          const rgw::sal::Bucket *p_bucket,
                                          const parsed_etag_t *p_parsed_etag,
                                          const std::string &obj_name,
+                                         const std::string &instance,
                                          uint64_t obj_size,
                                          const std::string &storage_class)
   {
-    disk_record_t rec(p_bucket, obj_name, p_parsed_etag, obj_size, storage_class);
+    disk_record_t rec(p_bucket, obj_name, p_parsed_etag, instance, obj_size, storage_class);

     // First pass using only ETAG and size taken from bucket-index
     rec.s.flags.set_fastlane();
@@ -682,10 +683,10 @@ namespace rgw::dedup {
   //---------------------------------------------------------------------------
   static int get_ioctx(const DoutPrefixProvider* const dpp,
                        rgw::sal::Driver* driver,
-                       RGWRados* rados,
+                       rgw::sal::RadosStore* store,
                        const disk_record_t *p_rec,
                        librados::IoCtx *p_ioctx,
-                       std::string *oid)
+                       std::string *p_oid)
   {
     unique_ptr<rgw::sal::Bucket> bucket;
     {
@@ -698,23 +699,12 @@ namespace rgw::dedup {
       }
     }

-    build_oid(p_rec->bucket_id, p_rec->obj_name, oid);
-    //ldpp_dout(dpp, 0) << __func__ << "::OID=" << oid << " || bucket_id=" << bucket_id << dendl;
-    rgw_pool data_pool;
-    rgw_obj obj{bucket->get_key(), *oid};
-    if (!rados->get_obj_data_pool(bucket->get_placement_rule(), obj, &data_pool)) {
-      ldpp_dout(dpp, 1) << __func__ << "::failed to get data pool for bucket "
-                        << bucket->get_name() << dendl;
-      return -EIO;
-    }
-    int ret = rgw_init_ioctx(dpp, rados->get_rados_handle(), data_pool, *p_ioctx);
-    if (ret < 0) {
-      ldpp_dout(dpp, 1) << __func__ << "::ERR: failed to get ioctx from data pool:"
-                        << data_pool.to_str() << dendl;
-      return -EIO;
-    }
-
-    return 0;
+    string dummy_locator;
+    const rgw_obj_index_key key(p_rec->obj_name, p_rec->instance);
+    rgw_obj obj(bucket->get_key(), key);
+    get_obj_bucket_and_oid_loc(obj, *p_oid, dummy_locator);
+    RGWBucketInfo& bucket_info = bucket->get_info();
+    return store->get_obj_head_ioctx(dpp, bucket_info, obj, p_ioctx);
   }

   //---------------------------------------------------------------------------
@@ -786,8 +776,8 @@ namespace rgw::dedup {
     std::string src_oid, tgt_oid;
     librados::IoCtx src_ioctx, tgt_ioctx;

-    int ret1 = get_ioctx(dpp, driver, rados, p_src_rec, &src_ioctx, &src_oid);
-    int ret2 = get_ioctx(dpp, driver, rados, p_tgt_rec, &tgt_ioctx, &tgt_oid);
+    int ret1 = get_ioctx(dpp, driver, store, p_src_rec, &src_ioctx, &src_oid);
+    int ret2 = get_ioctx(dpp, driver, store, p_tgt_rec, &tgt_ioctx, &tgt_oid);
     if (unlikely(ret1 != 0 || ret2 != 0)) {
       ldpp_dout(dpp, 1) << __func__ << "::ERR: failed get_ioctx()" << dendl;
       return (ret1 ? ret1 : ret2);
@@ -1092,7 +1082,7 @@ namespace rgw::dedup {
                         << p_rec->obj_name << ")" << dendl;
       return 0;
     }
-
+    p_obj->set_instance(p_rec->instance);
     d_ctl.metadata_access_throttle.acquire();
     ret = p_obj->get_obj_attrs(null_yield, dpp);
     if (unlikely(ret < 0)) {
@@ -1190,7 +1180,7 @@ namespace rgw::dedup {
   //---------------------------------------------------------------------------
   static int write_blake3_object_attribute(const DoutPrefixProvider* const dpp,
                                            rgw::sal::Driver* driver,
-                                           RGWRados* rados,
+                                           rgw::sal::RadosStore *store,
                                            const disk_record_t *p_rec)
   {
     bufferlist etag_bl;
@@ -1203,7 +1193,7 @@ namespace rgw::dedup {
     std::string oid;
     librados::IoCtx ioctx;
-    int ret = get_ioctx(dpp, driver, rados, p_rec, &ioctx, &oid);
+    int ret = get_ioctx(dpp, driver, store, p_rec, &ioctx, &oid);
     if (unlikely(ret != 0)) {
       ldpp_dout(dpp, 5) << __func__ << "::ERR: failed get_ioctx()" << dendl;
       return ret;
@@ -1298,10 +1288,11 @@ namespace rgw::dedup {
                       << "/" << src_rec.obj_name << dendl;
     // verify that SRC and TGT records don't refer to the same physical object
     // This could happen in theory if we read the same objects twice
-    if (src_rec.obj_name == p_tgt_rec->obj_name && src_rec.bucket_name == p_tgt_rec->bucket_name) {
+    if (src_rec.ref_tag == p_tgt_rec->ref_tag) {
       p_stats->duplicate_records++;
-      ldpp_dout(dpp, 10) << __func__ << "::WARN: Duplicate records for object="
-                         << src_rec.obj_name << dendl;
+      ldpp_dout(dpp, 10) << __func__ << "::WARN::REF_TAG::Duplicate records for "
+                         << src_rec.obj_name << "::" << src_rec.ref_tag << "::"
+                         << p_tgt_rec->obj_name << dendl;
       return 0;
     }

@@ -1320,11 +1311,11 @@ namespace rgw::dedup {
       ldpp_dout(dpp, 10) << __func__ << "::HASH mismatch" << dendl;
       // TBD: set hash attributes on head objects to save calc next time
       if (src_rec.s.flags.hash_calculated()) {
-        write_blake3_object_attribute(dpp, driver, rados, &src_rec);
+        write_blake3_object_attribute(dpp, driver, store, &src_rec);
         p_stats->set_hash_attrs++;
       }
       if (p_tgt_rec->s.flags.hash_calculated()) {
-        write_blake3_object_attribute(dpp, driver, rados, p_tgt_rec);
+        write_blake3_object_attribute(dpp, driver, store, p_tgt_rec);
         p_stats->set_hash_attrs++;
       }
       return 0;
@@ -1582,8 +1573,8 @@ namespace rgw::dedup {
     }

     return add_disk_rec_from_bucket_idx(disk_arr, p_bucket, &parsed_etag,
-                                        entry.key.name, entry.meta.size,
-                                        storage_class);
+                                        entry.key.name, entry.key.instance,
+                                        entry.meta.size, storage_class);
   }

   //---------------------------------------------------------------------------
@@ -1682,15 +1673,21 @@ namespace rgw::dedup {
       obj_count += result.dir.m.size();
       for (auto& entry : result.dir.m) {
         const rgw_bucket_dir_entry& dirent = entry.second;
+        // make sure to advance marker in all cases!
+        marker = dirent.key;
+        ldpp_dout(dpp, 20) << __func__ << "::dirent = " << bucket->get_name() << "/"
+                           << marker.name << "::instance=" << marker.instance << dendl;
         if (unlikely((!dirent.exists && !dirent.is_delete_marker()) ||
                      !dirent.pending_map.empty())) {
           // TBD: should we bailout ???
-          ldpp_dout(dpp, 1) << __func__ << "::ERR: calling check_disk_state bucket="
-                            << bucket->get_name() << " entry=" << dirent.key << dendl;
-          // make sure we're advancing marker
-          marker = dirent.key;
+          ldpp_dout(dpp, 1) << __func__ << "::ERR: bad dirent::" << bucket->get_name()
+                            << "/" << marker.name << "::instance=" << marker.instance << dendl;
+          continue;
+        }
+        else if (unlikely(dirent.is_delete_marker())) {
+          ldpp_dout(dpp, 20) << __func__ << "::skip delete_marker::" << bucket->get_name()
+                             << "/" << marker.name << "::instance=" << marker.instance << dendl;
           continue;
         }
-        marker = dirent.key;
         ret = ingress_bucket_idx_single_object(disk_arr, bucket, dirent, p_worker_stats);
       }
       // TBD: advance marker only once here!
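A note on the listing-loop change above: the resume marker now advances for every directory entry before any skip, and delete markers are filtered out explicitly since they carry no data to dedup. A minimal skeleton of that invariant, with placeholder types rather than RGW's:

#include <string>
#include <vector>

struct entry_t { std::string key; bool exists; bool delete_marker; };

// Advance the resume marker unconditionally, then decide whether to skip.
void scan_page(const std::vector<entry_t>& page, std::string& marker) {
  for (const entry_t& e : page) {
    marker = e.key;                               // progress is guaranteed...
    if (!e.exists && !e.delete_marker) continue;  // ...even for bad entries
    if (e.delete_marker) continue;                // nothing to dedup here
    // process(e);  // stands in for ingress_bucket_idx_single_object()
  }
}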
diff --git a/src/rgw/driver/rados/rgw_dedup.h b/src/rgw/driver/rados/rgw_dedup.h
index 44f2b91e485..b1df56249e8 100644
--- a/src/rgw/driver/rados/rgw_dedup.h
+++ b/src/rgw/driver/rados/rgw_dedup.h
@@ -161,6 +161,7 @@ namespace rgw::dedup {
                                       const rgw::sal::Bucket *p_bucket,
                                       const parsed_etag_t *p_parsed_etag,
                                       const std::string &obj_name,
+                                      const std::string &instance,
                                       uint64_t obj_size,
                                       const std::string &storage_class);

diff --git a/src/rgw/driver/rados/rgw_dedup_cluster.h b/src/rgw/driver/rados/rgw_dedup_cluster.h
index 52ddba86db3..b897de2548d 100644
--- a/src/rgw/driver/rados/rgw_dedup_cluster.h
+++ b/src/rgw/driver/rados/rgw_dedup_cluster.h
@@ -43,7 +43,7 @@ namespace rgw::dedup {

     //---------------------------------------------------------------------------
     void set_shard(uint16_t shard) {
-      int n = snprintf(this->buff + this->prefix_len, BUFF_SIZE, "%03x", shard);
+      int n = snprintf(this->buff + this->prefix_len, BUFF_SIZE - this->prefix_len, "%03x", shard);
       this->total_len = this->prefix_len + n;
     }

@@ -55,7 +55,7 @@ namespace rgw::dedup {
     inline const char* get_buff() { return this->buff; }
     inline unsigned get_buff_size() { return this->total_len; }
   private:
-    static const unsigned BUFF_SIZE = 15;
+    static const unsigned BUFF_SIZE = 16;
     unsigned total_len = 0;
     unsigned prefix_len = 0;
     char buff[BUFF_SIZE];
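The set_shard() fix above addresses two related problems: the size argument passed to snprintf() was the whole buffer rather than the space remaining after the prefix, and BUFF_SIZE itself was one byte short once the three hex digits and the NUL terminator are counted. A standalone illustration — the prefix string and its 12-byte length are assumptions for the example, not values from the dedup code:

#include <cstdio>

int main() {
  const unsigned BUFF_SIZE = 16;   // 15 left no room for "%03x" plus the NUL
  char buff[BUFF_SIZE];
  // hypothetical 12-character prefix: 12 + 3 hex digits + 1 NUL == 16
  unsigned prefix_len = std::snprintf(buff, BUFF_SIZE, "DEDUP.SHARD.");
  // Passing BUFF_SIZE as the bound here would allow snprintf to write up to
  // buff + prefix_len + BUFF_SIZE, past the end of the array; bound it by
  // the space that is actually left:
  int n = std::snprintf(buff + prefix_len, BUFF_SIZE - prefix_len, "%03x", 0x2a);
  std::printf("%s (total_len=%u)\n", buff, prefix_len + n);  // DEDUP.SHARD.02a
}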
diff --git a/src/rgw/driver/rados/rgw_dedup_store.cc b/src/rgw/driver/rados/rgw_dedup_store.cc
index 765b4f2b248..d2b62651c6c 100644
--- a/src/rgw/driver/rados/rgw_dedup_store.cc
+++ b/src/rgw/driver/rados/rgw_dedup_store.cc
@@ -36,6 +36,7 @@ namespace rgw::dedup {
   disk_record_t::disk_record_t(const rgw::sal::Bucket *p_bucket,
                                const std::string &obj_name,
                                const parsed_etag_t *p_parsed_etag,
+                               const std::string &instance,
                                uint64_t obj_size,
                                const std::string &storage_class)
   {
@@ -50,12 +51,13 @@ namespace rgw::dedup {
     this->s.md5_high = p_parsed_etag->md5_high;
     this->s.md5_low = p_parsed_etag->md5_low;
     this->s.obj_bytes_size = obj_size;
-    this->s.object_version = 0;

     this->bucket_id = p_bucket->get_bucket_id();
     this->s.bucket_id_len = this->bucket_id.length();
     this->tenant_name = p_bucket->get_tenant();
     this->s.tenant_name_len = this->tenant_name.length();
+    this->instance = instance;
+    this->s.instance_len = instance.length();
     this->stor_class = storage_class;
     this->s.stor_class_len = storage_class.length();

@@ -86,10 +88,10 @@ namespace rgw::dedup {
     this->s.md5_high = CEPHTOH_64(p_rec->s.md5_high);
     this->s.md5_low = CEPHTOH_64(p_rec->s.md5_low);
     this->s.obj_bytes_size = CEPHTOH_64(p_rec->s.obj_bytes_size);
-    this->s.object_version = CEPHTOH_64(p_rec->s.object_version);

     this->s.bucket_id_len = CEPHTOH_16(p_rec->s.bucket_id_len);
     this->s.tenant_name_len = CEPHTOH_16(p_rec->s.tenant_name_len);
+    this->s.instance_len = CEPHTOH_16(p_rec->s.instance_len);
     this->s.stor_class_len = CEPHTOH_16(p_rec->s.stor_class_len);
     this->s.ref_tag_len = CEPHTOH_16(p_rec->s.ref_tag_len);
     this->s.manifest_len = CEPHTOH_16(p_rec->s.manifest_len);
@@ -107,6 +109,9 @@ namespace rgw::dedup {
     this->tenant_name = std::string(p, this->s.tenant_name_len);
     p += p_rec->s.tenant_name_len;

+    this->instance = std::string(p, this->s.instance_len);
+    p += p_rec->s.instance_len;
+
     this->stor_class = std::string(p, this->s.stor_class_len);
     p += p_rec->s.stor_class_len;

@@ -144,10 +149,10 @@ namespace rgw::dedup {
     p_rec->s.md5_high = HTOCEPH_64(this->s.md5_high);
     p_rec->s.md5_low = HTOCEPH_64(this->s.md5_low);
     p_rec->s.obj_bytes_size = HTOCEPH_64(this->s.obj_bytes_size);
-    p_rec->s.object_version = HTOCEPH_64(this->s.object_version);

     p_rec->s.bucket_id_len = HTOCEPH_16(this->bucket_id.length());
     p_rec->s.tenant_name_len = HTOCEPH_16(this->tenant_name.length());
+    p_rec->s.instance_len = HTOCEPH_16(this->instance.length());
     p_rec->s.stor_class_len = HTOCEPH_16(this->stor_class.length());
     p_rec->s.ref_tag_len = HTOCEPH_16(this->ref_tag.length());
     p_rec->s.manifest_len = HTOCEPH_16(this->manifest_bl.length());
@@ -168,6 +173,10 @@ namespace rgw::dedup {
     std::memcpy(p, this->tenant_name.data(), len);
     p += len;

+    len = this->instance.length();
+    std::memcpy(p, this->instance.data(), len);
+    p += len;
+
     len = this->stor_class.length();
     std::memcpy(p, this->stor_class.data(), len);
     p += len;
@@ -205,6 +214,7 @@ namespace rgw::dedup {
                  this->bucket_name.length() +
                  this->bucket_id.length() +
                  this->tenant_name.length() +
+                 this->instance.length() +
                  this->stor_class.length() +
                  this->ref_tag.length() +
                  this->manifest_bl.length());
@@ -252,6 +262,7 @@ namespace rgw::dedup {
     stream << rec.bucket_name << "::" << rec.s.bucket_name_len << "\n";
     stream << rec.bucket_id << "::" << rec.s.bucket_id_len << "\n";
     stream << rec.tenant_name << "::" << rec.s.tenant_name_len << "\n";
+    stream << rec.instance << "::" << rec.s.instance_len << "\n";
     stream << rec.stor_class << "::" << rec.s.stor_class_len << "\n";
     stream << rec.ref_tag << "::" << rec.s.ref_tag_len << "\n";
     stream << "num_parts = " << rec.s.num_parts << "\n";
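The disk_record_t changes above follow the record's existing layout discipline: a fixed-size header holds per-field lengths, and the variable-length strings are appended in one agreed order on encode and consumed in the same order on decode. The new instance field slots in between tenant_name and stor_class on both paths, and the unused object_version header field is dropped. A reduced sketch of that pattern, with helper names of my own rather than the dedup code's:

#include <cstring>
#include <string>

// Append one variable-length field; its length already lives in the header.
static char* put_str(char* p, const std::string& s) {
  std::memcpy(p, s.data(), s.length());
  return p + s.length();
}

// Encode-side ordering -- the decode side must read in exactly this order.
char* encode_tail(char* p,
                  const std::string& bucket_name, const std::string& bucket_id,
                  const std::string& tenant_name, const std::string& instance,
                  const std::string& stor_class, const std::string& ref_tag) {
  p = put_str(p, bucket_name);
  p = put_str(p, bucket_id);
  p = put_str(p, tenant_name);
  p = put_str(p, instance);   // new field, same slot on encode and decode
  p = put_str(p, stor_class);
  p = put_str(p, ref_tag);
  return p;
}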
diff --git a/src/rgw/driver/rados/rgw_dedup_store.h b/src/rgw/driver/rados/rgw_dedup_store.h
index d183968409c..7bca5d4e70e 100644
--- a/src/rgw/driver/rados/rgw_dedup_store.h
+++ b/src/rgw/driver/rados/rgw_dedup_store.h
@@ -138,6 +138,7 @@ namespace rgw::dedup {
     disk_record_t(const rgw::sal::Bucket *p_bucket,
                   const std::string &obj_name,
                   const parsed_etag_t *p_parsed_etag,
+                  const std::string &instance,
                   uint64_t obj_size,
                   const std::string &storage_class);
     disk_record_t() {}
@@ -161,10 +162,10 @@ namespace rgw::dedup {
       uint64_t md5_high;        // High Bytes of the Object Data MD5
       uint64_t md5_low;         // Low Bytes of the Object Data MD5
       uint64_t obj_bytes_size;
-      uint64_t object_version;

       uint16_t bucket_id_len;
       uint16_t tenant_name_len;
+      uint16_t instance_len;
       uint16_t stor_class_len;
       uint16_t ref_tag_len;
@@ -180,6 +181,7 @@ namespace rgw::dedup {
     std::string bucket_id;
     std::string tenant_name;
     std::string ref_tag;
+    std::string instance;
     std::string stor_class;
     bufferlist manifest_bl;
   };
diff --git a/src/rgw/driver/rados/rgw_dedup_utils.h b/src/rgw/driver/rados/rgw_dedup_utils.h
index e606f81dd70..ae10ffd6c69 100644
--- a/src/rgw/driver/rados/rgw_dedup_utils.h
+++ b/src/rgw/driver/rados/rgw_dedup_utils.h
@@ -356,14 +356,6 @@ namespace rgw::dedup {
                              size_t len,
                              const DoutPrefixProvider* dpp);

-  //---------------------------------------------------------------------------
-  static inline void build_oid(const std::string &bucket_id,
-                               const std::string &obj_name,
-                               std::string *oid)
-  {
-    *oid = bucket_id + "_" + obj_name;
-  }
-
   //---------------------------------------------------------------------------
   static inline uint64_t calc_deduped_bytes(uint64_t head_obj_size,
                                             uint16_t num_parts,
diff --git a/src/test/rgw/dedup/test_dedup.py b/src/test/rgw/dedup/test_dedup.py
index f1dc15e73dc..eb766a7507e 100644
--- a/src/test/rgw/dedup/test_dedup.py
+++ b/src/test/rgw/dedup/test_dedup.py
@@ -1350,6 +1350,230 @@ def full_dedup_is_disabled():

     return full_dedup_state_disabled

+#==============================================================================
+# RGW Versioning Tests:
+#==============================================================================
+#-------------------------------------------------------------------------------
+def delete_all_versions(conn, bucket_name, dry_run=False):
+    log.info("delete_all_versions")
+    p_conf = {
+        'PageSize': 1000 # Request 1000 items per page
+        # MaxItems is omitted to allow unlimited total items
+    }
+    paginator = conn.get_paginator('list_object_versions')
+    to_delete = []
+
+    for page in paginator.paginate(Bucket=bucket_name, PaginationConfig=p_conf):
+        # Collect versions
+        for v in page.get('Versions', []):
+            to_delete.append({'Key': v['Key'], 'VersionId': v['VersionId']})
+
+        # Collect delete markers
+        for dm in page.get('DeleteMarkers', []):
+            to_delete.append({'Key': dm['Key'], 'VersionId': dm['VersionId']})
+
+        # Delete in chunks
+        if dry_run:
+            log.info("DRY RUN would delete %d objects", len(to_delete))
+        else:
+            conn.delete_objects(Bucket=bucket_name, Delete={'Objects': to_delete})
+            to_delete.clear()
+
+
+#-------------------------------------------------------------------------------
+def list_all_versions(conn, bucket_name, verbose=False):
+    p_conf = {
+        'PageSize': 1000 # Request 1000 items per page
+        # MaxItems is omitted to allow unlimited total items
+    }
+    paginator = conn.get_paginator("list_object_versions")
+    total_s3_versioned_objs=0
+    for page in paginator.paginate(Bucket=bucket_name, PaginationConfig=p_conf):
+        # normal object versions
+        for v in page.get("Versions", []):
+            total_s3_versioned_objs += 1
+            key = v["Key"]
+            vid = v["VersionId"]
+            size = v.get("Size", 0)
+            is_latest = v.get("IsLatest", False)
+            #etag = v.get("ETag")
+            if verbose:
+                log.info("%s::ver=%s, size=%d, IsLatest=%d",
+                         key, vid, size, is_latest)
+
+        # delete markers (no Size)
+        for dm in page.get("DeleteMarkers", []):
+            key = dm["Key"]
+            vid = dm["VersionId"]
+            is_latest = dm.get("IsLatest", False)
+            if verbose:
+                log.info("DeleteMarker::%s::ver=%s, IsLatest=%d",
+                         key, vid, is_latest)
+
+    return total_s3_versioned_objs

+#-------------------------------------------------------------------------------
+def gen_files_in_range_single_copy(files, count, min_size, max_size):
+    assert(min_size <= max_size)
+    assert(min_size > 0)
+
+    idx=0
+    size_range = max_size - min_size
+    size=0
+    for i in range(0, count):
+        size = min_size + random.randint(0, size_range-1)
+        idx += 1
+        filename = "OBJ_" + str(idx)
+        files.append((filename, size, 1))
+        write_file(filename, size)
+
+    assert len(files) == count
+
+#-------------------------------------------------------------------------------
+def simple_upload(bucket_name, files, conn, config, op_log, first_time):
+    for f in files:
+        filename=f[0]
+        size=f[1]
+        if first_time:
+            key = filename
+        else:
+            idx=random.randint(0, len(files)-1)
+            key=files[idx][0]
+
+        log.debug("upload_file %s -> %s/%s (%d)", filename,
+                  bucket_name, key, size)
+        conn.upload_file(OUT_DIR + filename, bucket_name, key, Config=config)
+        resp = conn.head_object(Bucket=bucket_name, Key=key)
+        version_id = resp.get("VersionId")
+        op_log.append((filename, size, key, version_id))
+
+#-------------------------------------------------------------------------------
+def ver_calc_rados_obj_count(config, files, op_log):
+    size_dict = {}
+    num_copies_dict = {}
+    unique_s3_objs = set()
+
+    for f in files:
+        filename=f[0]
+        size=f[1]
+        size_dict[filename] = size
+        num_copies_dict[filename] = 0
+
+    for o in op_log:
+        filename=o[0]
+        key=o[2]
+        num_copies_dict[filename] += 1
+        unique_s3_objs.add(key)
+
+    rados_obj_total = 0
+    duplicated_tail_objs = 0
+    for key, value in size_dict.items():
+        size = value
+        num_copies = num_copies_dict[key]
+        assert num_copies > 0
+        rados_obj_count = calc_rados_obj_count(num_copies, size, config)
+        rados_obj_total += (rados_obj_count * num_copies)
+        duplicated_tail_objs += ((num_copies-1) * (rados_obj_count-1))
+
+    # versioned buckets hold an extra rados-obj per versioned S3-Obj
+    unique_s3_objs_count = len(unique_s3_objs)
+    rados_obj_total += unique_s3_objs_count
+    rados_obj_count_post_dedup=(rados_obj_total-duplicated_tail_objs)
+    log.debug("calc::rados_obj_total=%d, rados_obj_count_post_dedup=%d",
+              rados_obj_total, rados_obj_count_post_dedup)
+    return(rados_obj_total, rados_obj_count_post_dedup, unique_s3_objs_count)
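A worked instance of the accounting in ver_calc_rados_obj_count() above, with made-up numbers — the real per-copy rados-object count comes from calc_rados_obj_count() and depends on object size and transfer config. Assume one S3 key uploaded 7 times to a versioned bucket, where each copy maps to 10 rados objects (1 head + 9 tail):

#include <cassert>

int main() {
  const int rados_obj_count = 10;  // assumed: 1 head + 9 tail objects per copy
  const int num_copies      = 7;   // versions of the same key, identical data

  int total = rados_obj_count * num_copies + 1;  // +1 extra per versioned S3 key
  int dup_tails = (num_copies - 1) * (rados_obj_count - 1);
  assert(total == 71 && dup_tails == 54);
  // Post-dedup: 7 heads + 9 shared tails + 1 extra object = 17.
  assert(total - dup_tails == 17);
}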
+
+#-------------------------------------------------------------------------------
+def verify_objects_with_version(bucket_name, op_log, conn, config):
+    tempfile = OUT_DIR + "temp"
+    pend_delete_set = set()
+    for o in op_log:
+        filename=o[0]
+        size=o[1]
+        key=o[2]
+        version_id=o[3]
+        log.debug("verify: %s/%s:: ver=%s", bucket_name, filename, version_id)
+
+        # call garbage collect for tail objects before reading the same filename
+        # this will help detect bad deletions
+        if filename in pend_delete_set:
+            result = admin(['gc', 'process', '--include-all'])
+            assert result[1] == 0
+
+        # only objects larger than RADOS_OBJ_SIZE got tail-objects
+        if size > RADOS_OBJ_SIZE:
+            pend_delete_set.add(filename)
+
+        conn.download_file(Bucket=bucket_name, Key=key, Filename=tempfile,
+                           Config=config, ExtraArgs={'VersionId': version_id})
+        result = bash(['cmp', tempfile, OUT_DIR + filename])
+        assert result[1] == 0, "Files %s and %s differ!!" % (key, tempfile)
+        os.remove(tempfile)
+        conn.delete_object(Bucket=bucket_name, Key=key, VersionId=version_id)
+
+
+#-------------------------------------------------------------------------------
+# generate @num_files objects with @ver_count versions each
+# verify that we got the correct number of rados-objects
+# then dedup and verify that duplicate tail objects have been removed
+# read-verify *all* objects in all versions, deleting one version after another
+# while making sure the remaining versions are still good
+# finally make sure no rados-object was left behind after the last ver was removed
+@pytest.mark.basic_test
+def test_dedup_with_versions():
+    #return
+
+    if full_dedup_is_disabled():
+        return
+
+    prepare_test()
+    bucket_name = "bucket1"
+    files=[]
+    op_log=[]
+    num_files=43
+    min_size=1*KB
+    max_size=MULTIPART_SIZE*2
+    success=False
+    try:
+        conn=get_single_connection()
+        conn.create_bucket(Bucket=bucket_name)
+        gen_files_in_range_single_copy(files, num_files, min_size, max_size)
+        # enable versioning
+        conn.put_bucket_versioning(Bucket=bucket_name,
+                                   VersioningConfiguration={"Status": "Enabled"})
+        ver_count=7
+        first_time=True
+        for i in range(0, ver_count):
+            simple_upload(bucket_name, files, conn, default_config, op_log, first_time)
+            first_time=False
+
+        ret=ver_calc_rados_obj_count(default_config, files, op_log)
+        rados_objects_total=ret[0]
+        rados_objects_post_dedup=ret[1]
+        unique_s3_objs_count=ret[2]
+        assert unique_s3_objs_count == num_files
+        log.info("rados_objects_total=%d, rados_objects_post_dedup=%d",
+                 rados_objects_total, rados_objects_post_dedup)
+        log.info("unique_s3_objs_count=%d, total_s3_versioned_objs=%d",
+                 unique_s3_objs_count, len(op_log))
+        total_s3_versioned_objs=list_all_versions(conn, bucket_name)
+        assert total_s3_versioned_objs == (num_files * ver_count)
+        assert total_s3_versioned_objs == len(op_log)
+        assert rados_objects_total == count_object_parts_in_all_buckets()
+        exec_dedup_internal(Dedup_Stats(), dry_run=False, max_dedup_time=500)
+        assert rados_objects_post_dedup == count_object_parts_in_all_buckets()
+        verify_objects_with_version(bucket_name, op_log, conn, default_config)
+        success=True
+    finally:
+        # cleanup must be executed even after a failure
+        if success == False:
+            delete_all_versions(conn, bucket_name, dry_run=False)
+
+        # otherwise, objects were removed by verify_objects_with_version()
+        cleanup(bucket_name, conn)
+
+#==============================================================================
+# ETag Corruption Tests:
+#==============================================================================
 CORRUPTIONS = ("no corruption", "change_etag", "illegal_hex_value",
                "change_num_parts", "illegal_separator",
                "illegal_dec_val_num_parts", "illegal_num_parts_overflow")