From: Gabriel BenHanokh
Date: Mon, 24 Nov 2025 08:02:22 +0000 (+0000)
Subject: rgw/dedup: Prevent the dup-counter from wrapping around after it reaches 64K of identical copies
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fheads%2Fwip_dedup_high_dup_count_v1;p=ceph-ci.git

rgw/dedup: Prevent the dup-counter from wrapping around after it reaches
64K of identical copies.

Limit dedup from a single SRC object to 128 target copies to prevent the
OMAP size from growing out of control.

Because the per-entry counter is reset after the first pass and reused to
count actual dedup work, the duplicate statistics (duplicate_count,
dedup_bytes_estimate, dup_head_bytes_estimate) are now accumulated at load
time in add_entry(), while count_duplicates() only tallies singleton/unique
entries.

Resolves: rhbz#2415656
Resolves: rhbz#2416043

Signed-off-by: Gabriel BenHanokh
---
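
Reviewer note (not part of the commit): a minimal standalone sketch of the
two guards this patch adds around the per-object duplicate counter. The
simplified value_t and the try_add_dedup_target() helper below are
hypothetical; in the patch the equivalent logic is split between
dedup_table_t::add_entry()/inc_count() and the MD5 loop in rgw_dedup.cc.
MAX_COPIES_PER_OBJ (128) and the 16-bit count field match the patch.

  #include <cstdint>
  #include <iostream>
  #include <limits>

  constexpr uint16_t MAX_COPIES_PER_OBJ = 128;

  struct value_t {
    uint16_t count = 0;          // 16-bit counter, wraps at 64K if unguarded

    // Guard #1: saturate instead of wrapping around at 0xFFFF
    void inc_count() {
      if (count < std::numeric_limits<uint16_t>::max()) {
        count++;
      }
    }
  };

  // Guard #2: stop accepting dedup targets once the SRC-OBJ already carries
  // MAX_COPIES_PER_OBJ references, keeping its OMAP refcount list bounded.
  bool try_add_dedup_target(value_t &src_val) {
    // '<=' because the count includes the SRC-OBJ itself
    if (src_val.count <= MAX_COPIES_PER_OBJ) {
      src_val.inc_count();
      return true;               // accepted as a dedup target
    }
    return false;                // caller bumps skipped_too_many_copies
  }

  int main() {
    value_t v;
    v.count = 1;                 // the SRC-OBJ itself
    unsigned accepted = 0;
    for (unsigned i = 0; i < 64 * 1024 + 1; i++) {
      accepted += try_add_dedup_target(v) ? 1 : 0;
    }
    std::cout << "accepted=" << accepted << " count=" << v.count << std::endl;
  }

Compiled standalone this prints accepted=128 count=129: the refcount list
stays bounded and the counter can no longer wrap back to 1, where
is_singleton() would misclassify a heavily duplicated object.
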
diff --git a/src/rgw/driver/rados/rgw_dedup.cc b/src/rgw/driver/rados/rgw_dedup.cc
index eaf0492cb48..f841e8aad5a 100644
--- a/src/rgw/driver/rados/rgw_dedup.cc
+++ b/src/rgw/driver/rados/rgw_dedup.cc
@@ -465,7 +465,9 @@ namespace rgw::dedup {
                        << "::ETAG=" << std::hex << p_rec->s.md5_high
                        << p_rec->s.md5_low << std::dec << dendl;
-    int ret = p_table->add_entry(&key, block_id, rec_id, has_shared_manifest);
+    int ret = p_table->add_entry(&key, block_id, rec_id, has_shared_manifest,
+                                 &p_stats->small_objs_stat, &p_stats->big_objs_stat,
+                                 &p_stats->dup_head_bytes_estimate);
     if (ret == 0) {
       p_stats->loaded_objects ++;
       ldpp_dout(dpp, 20) << __func__ << "::" << p_rec->bucket_name << "/"
@@ -1061,6 +1063,24 @@ namespace rgw::dedup {
       return 0;
     }
 
+    // limit the number of ref_count entries in the SRC-OBJ to MAX_COPIES_PER_OBJ
+    // check <= because we also count the SRC-OBJ
+    if (src_val.get_count() <= MAX_COPIES_PER_OBJ) {
+      disk_block_id_t src_block_id = src_val.get_src_block_id();
+      record_id_t src_rec_id = src_val.get_src_rec_id();
+      // update the number of identical copies we got
+      ldpp_dout(dpp, 20) << __func__ << "::Obj " << p_rec->obj_name
+                         << " has " << src_val.get_count() << " copies" << dendl;
+      p_table->inc_count(&key_from_bucket_index, src_block_id, src_rec_id);
+    }
+    else {
+      // We don't want more than @MAX_COPIES_PER_OBJ copies to prevent OMAP overload
+      p_stats->skipped_too_many_copies++;
+      ldpp_dout(dpp, 10) << __func__ << "::Obj " << p_rec->obj_name
+                         << " has too many copies already" << dendl;
+      return 0;
+    }
+
     // Every object after this point was counted as a dedup potential
     // If we conclude that it can't be dedup it should be accounted for
     rgw_bucket b{p_rec->tenant_name, p_rec->bucket_name, p_rec->bucket_id};
@@ -1250,8 +1270,8 @@ namespace rgw::dedup {
       return 0;
     }
 
-    disk_block_id_t src_block_id = src_val.block_idx;
-    record_id_t src_rec_id = src_val.rec_id;
+    disk_block_id_t src_block_id = src_val.get_src_block_id();
+    record_id_t src_rec_id = src_val.get_src_rec_id();
     if (block_id == src_block_id && rec_id == src_rec_id) {
       // the table entry point to this record which means it is a dedup source so nothing to do
       p_stats->skipped_source_record++;
@@ -1794,8 +1814,7 @@ namespace rgw::dedup {
         return -ECANCELED;
       }
     }
-    p_table->count_duplicates(&p_stats->small_objs_stat, &p_stats->big_objs_stat,
-                              &p_stats->dup_head_bytes_estimate);
+    p_table->count_duplicates(&p_stats->small_objs_stat, &p_stats->big_objs_stat);
     display_table_stat_counters(dpp, p_stats);
 
     ldpp_dout(dpp, 10) << __func__ << "::MD5 Loop::" << d_ctl.dedup_type << dendl;
diff --git a/src/rgw/driver/rados/rgw_dedup_table.cc b/src/rgw/driver/rados/rgw_dedup_table.cc
index c4841303e33..4f34b27d18e 100644
--- a/src/rgw/driver/rados/rgw_dedup_table.cc
+++ b/src/rgw/driver/rados/rgw_dedup_table.cc
@@ -83,10 +83,14 @@ namespace rgw::dedup {
           redistributed_loopback++;
         }
 
+        // we no longer need the counter, reuse it to count actual dedup
+        hash_tab[idx].val.reset_count();
         redistributed_search_max = std::max(redistributed_search_max, count);
         redistributed_search_total += count;
       }
       else {
+        // we no longer need the counter, reuse it to count actual dedup
+        hash_tab[tab_idx].val.reset_count();
        redistributed_not_needed++;
      }
    }
@@ -104,11 +108,44 @@ namespace rgw::dedup {
     return idx;
   }
 
+  //---------------------------------------------------------------------------
+  static void inc_counters(const key_t *p_key,
+                           uint32_t head_object_size,
+                           dedup_stats_t *p_small_objs,
+                           dedup_stats_t *p_big_objs,
+                           uint64_t *p_duplicate_head_bytes)
+  {
+    // This is an approximation only since size is stored in 4KB resolution
+    uint64_t byte_size_approx = disk_blocks_to_byte_size(p_key->size_4k_units);
+
+    // skip small single part objects which we can't dedup
+    if (!p_key->multipart_object() && (byte_size_approx <= head_object_size)) {
+      p_small_objs->duplicate_count ++;
+      p_small_objs->dedup_bytes_estimate += byte_size_approx;
+      return;
+    }
+    else {
+      uint64_t dup_bytes_approx = calc_deduped_bytes(head_object_size,
+                                                     p_key->num_parts,
+                                                     byte_size_approx);
+      p_big_objs->duplicate_count ++;
+      p_big_objs->dedup_bytes_estimate += dup_bytes_approx;
+
+      if (!p_key->multipart_object()) {
+        // single part objects duplicate the head object when dedup is used
+        *p_duplicate_head_bytes += head_object_size;
+      }
+    }
+  }
+
   //---------------------------------------------------------------------------
   int dedup_table_t::add_entry(key_t *p_key,
                                disk_block_id_t block_id,
                                record_id_t rec_id,
-                               bool shared_manifest)
+                               bool shared_manifest,
+                               dedup_stats_t *p_small_objs,
+                               dedup_stats_t *p_big_objs,
+                               uint64_t *p_duplicate_head_bytes)
   {
     value_t new_val(block_id, rec_id, shared_manifest);
     uint32_t idx = find_entry(p_key);
@@ -128,7 +165,13 @@ namespace rgw::dedup {
     }
     else {
       ceph_assert(hash_tab[idx].key == *p_key);
-      val.count ++;
+      if (val.count <= MAX_COPIES_PER_OBJ) {
+        inc_counters(p_key, head_object_size, p_small_objs, p_big_objs,
+                     p_duplicate_head_bytes);
+      }
+      if (val.count < std::numeric_limits<uint16_t>::max()) {
+        val.count ++;
+      }
       if (!val.has_shared_manifest() && shared_manifest) {
         // replace value!
         ldpp_dout(dpp, 20) << __func__ << "::Replace with shared_manifest::["
@@ -154,8 +197,6 @@ namespace rgw::dedup {
     ceph_assert(hash_tab[idx].key == *p_key);
     value_t &val = hash_tab[idx].val;
     ceph_assert(val.is_occupied());
-    // we only update non-singletons since we purge singletons after the first pass
-    ceph_assert(val.count > 1);
 
     // need to overwrite the block_idx/rec_id from the first pass
     // unless already set with shared_manifest with the correct block-id/rec-id
@@ -189,23 +230,46 @@ namespace rgw::dedup {
     return -ENOENT;
   }
 
+  //---------------------------------------------------------------------------
+  int dedup_table_t::inc_count(const key_t *p_key,
+                               disk_block_id_t block_id,
+                               record_id_t rec_id)
+  {
+    uint32_t idx = find_entry(p_key);
+    value_t &val = hash_tab[idx].val;
+    if (val.is_occupied()) {
+      if (val.block_idx == block_id && val.rec_id == rec_id) {
+        val.inc_count();
+        return 0;
+      }
+      else {
+        ldpp_dout(dpp, 5) << __func__ << "::ERR Failed Ncopies block/rec" << dendl;
+      }
+    }
+    else {
+      ldpp_dout(dpp, 5) << __func__ << "::ERR Failed Ncopies key" << dendl;
+    }
+
+    return -ENOENT;
+  }
+
   //---------------------------------------------------------------------------
   int dedup_table_t::get_val(const key_t *p_key, struct value_t *p_val /*OUT*/)
   {
     uint32_t idx = find_entry(p_key);
     const value_t &val = hash_tab[idx].val;
-    if (!val.is_occupied()) {
+    if (val.is_occupied()) {
+      *p_val = val;
+      return 0;
+    }
+    else {
       return -ENOENT;
     }
-
-    *p_val = val;
-    return 0;
   }
 
   //---------------------------------------------------------------------------
   void dedup_table_t::count_duplicates(dedup_stats_t *p_small_objs,
-                                       dedup_stats_t *p_big_objs,
-                                       uint64_t *p_duplicate_head_bytes)
+                                       dedup_stats_t *p_big_objs)
   {
     for (uint32_t tab_idx = 0; tab_idx < entries_count; tab_idx++) {
       if (!hash_tab[tab_idx].val.is_occupied()) {
@@ -215,7 +279,6 @@ namespace rgw::dedup {
       const key_t &key = hash_tab[tab_idx].key;
       // This is an approximation only since size is stored in 4KB resolution
       uint64_t byte_size_approx = disk_blocks_to_byte_size(key.size_4k_units);
-      uint32_t duplicate_count = (hash_tab[tab_idx].val.count -1);
 
       // skip small single part objects which we can't dedup
       if (!key.multipart_object() && (byte_size_approx <= head_object_size)) {
@@ -223,29 +286,16 @@ namespace rgw::dedup {
           p_small_objs->singleton_count++;
         }
         else {
-          p_small_objs->duplicate_count += duplicate_count;
           p_small_objs->unique_count ++;
-          p_small_objs->dedup_bytes_estimate += (duplicate_count * byte_size_approx);
         }
-        continue;
-      }
-
-      if (hash_tab[tab_idx].val.is_singleton()) {
-        p_big_objs->singleton_count++;
       }
       else {
-        ceph_assert(hash_tab[tab_idx].val.count > 1);
-        uint64_t dup_bytes_approx = calc_deduped_bytes(head_object_size,
-                                                       key.num_parts,
-                                                       byte_size_approx);
-        p_big_objs->dedup_bytes_estimate += (duplicate_count * dup_bytes_approx);
-        p_big_objs->duplicate_count += duplicate_count;
-        p_big_objs->unique_count ++;
-
-        if (!key.multipart_object()) {
-          // single part objects duplicate the head object when dedup is used
-          uint64_t dup_head_bytes = duplicate_count * head_object_size;
-          *p_duplicate_head_bytes += dup_head_bytes;
+        if (hash_tab[tab_idx].val.is_singleton()) {
+          p_big_objs->singleton_count++;
+        }
+        else {
+          ceph_assert(hash_tab[tab_idx].val.count > 1);
+          p_big_objs->unique_count ++;
         }
       }
     }
diff --git a/src/rgw/driver/rados/rgw_dedup_table.h b/src/rgw/driver/rados/rgw_dedup_table.h
index 85fd3295f5e..4a46db6e5b7 100644
--- a/src/rgw/driver/rados/rgw_dedup_table.h
+++ b/src/rgw/driver/rados/rgw_dedup_table.h
@@ -66,6 +66,7 @@ namespace rgw::dedup {
   public:
     // 8 Bytes Value
     struct value_t {
+      friend class dedup_table_t;
       value_t() {
         this->block_idx = 0xFFFFFFFF;
         this->count = 0;
@@ -83,15 +84,21 @@ namespace rgw::dedup {
           flags.set_shared_manifest();
         }
       }
-
-      inline void clear_flags() { flags.clear(); }
       inline bool has_shared_manifest() const {return flags.has_shared_manifest(); }
+      inline uint16_t get_count() { return this->count; }
+      inline disk_block_id_t get_src_block_id() { return this->block_idx; }
+      inline record_id_t get_src_rec_id() { return this->rec_id; }
+
+    private:
       inline void set_shared_manifest_src() { this->flags.set_shared_manifest(); }
+      inline void inc_count() { count ++; }
+      inline void reset_count() { count = 0; }
+      inline void clear_flags() { flags.clear(); }
       inline bool is_singleton() const { return (count == 1); }
       inline bool is_occupied() const { return flags.is_occupied(); }
       inline void set_occupied() { this->flags.set_occupied(); }
       inline void clear_occupied() { this->flags.clear_occupied(); }
+
       disk_block_id_t block_idx; // 32 bits
       uint16_t count;            // 16 bits
       record_id_t rec_id;        //  8 bits
@@ -103,20 +110,27 @@ namespace rgw::dedup {
               uint32_t _head_object_size,
               uint8_t *p_slab,
               uint64_t slab_size);
-    int add_entry(key_t *p_key, disk_block_id_t block_id, record_id_t rec_id,
-                  bool shared_manifest);
+    int add_entry(key_t *p_key,
+                  disk_block_id_t block_id,
+                  record_id_t rec_id,
+                  bool shared_manifest,
+                  dedup_stats_t *p_small_objs_stat,
+                  dedup_stats_t *p_big_objs_stat,
+                  uint64_t *p_duplicate_head_bytes);
+
     void update_entry(key_t *p_key, disk_block_id_t block_id,
                       record_id_t rec_id, bool shared_manifest);
 
     int get_val(const key_t *p_key, struct value_t *p_val /*OUT*/);
+    int inc_count(const key_t *p_key, disk_block_id_t block_id, record_id_t rec_id);
+
     int set_shared_manifest_src_mode(const key_t *p_key,
                                      disk_block_id_t block_id,
                                      record_id_t rec_id);
 
     void count_duplicates(dedup_stats_t *p_small_objs_stat,
-                          dedup_stats_t *p_big_objs_stat,
-                          uint64_t *p_duplicate_head_bytes);
+                          dedup_stats_t *p_big_objs_stat);
     void remove_singletons_and_redistribute_keys();
 
   private:
diff --git a/src/rgw/driver/rados/rgw_dedup_utils.cc b/src/rgw/driver/rados/rgw_dedup_utils.cc
index 04c842a0fba..61ad6b91c51 100644
--- a/src/rgw/driver/rados/rgw_dedup_utils.cc
+++ b/src/rgw/driver/rados/rgw_dedup_utils.cc
@@ -557,6 +557,7 @@ namespace rgw::dedup {
     this->skipped_purged_small += other.skipped_purged_small;
     this->skipped_singleton += other.skipped_singleton;
     this->skipped_singleton_bytes += other.skipped_singleton_bytes;
+    this->skipped_too_many_copies += other.skipped_too_many_copies;
     this->skipped_source_record += other.skipped_source_record;
     this->duplicate_records += other.duplicate_records;
     this->size_mismatch += other.size_mismatch;
@@ -671,6 +672,9 @@ namespace rgw::dedup {
     if (this->skipped_singleton) {
       f->dump_unsigned("Skipped singleton Bytes", this->skipped_singleton_bytes);
     }
+    if (this->skipped_too_many_copies) {
+      f->dump_unsigned("Skipped Too Many Copies", this->skipped_too_many_copies);
+    }
     f->dump_unsigned("Skipped source record", this->skipped_source_record);
 
     if (this->ingress_skip_encrypted) {
@@ -755,6 +759,7 @@ namespace rgw::dedup {
     encode(m.skipped_purged_small, bl);
     encode(m.skipped_singleton, bl);
     encode(m.skipped_singleton_bytes, bl);
+    encode(m.skipped_too_many_copies, bl);
     encode(m.skipped_source_record, bl);
     encode(m.duplicate_records, bl);
     encode(m.size_mismatch, bl);
@@ -808,6 +813,7 @@ namespace rgw::dedup {
     decode(m.skipped_purged_small, bl);
     decode(m.skipped_singleton, bl);
     decode(m.skipped_singleton_bytes, bl);
+    decode(m.skipped_too_many_copies, bl);
     decode(m.skipped_source_record, bl);
     decode(m.duplicate_records, bl);
     decode(m.size_mismatch, bl);
diff --git a/src/rgw/driver/rados/rgw_dedup_utils.h b/src/rgw/driver/rados/rgw_dedup_utils.h
index ae10ffd6c69..abe62432122 100644
--- a/src/rgw/driver/rados/rgw_dedup_utils.h
+++ b/src/rgw/driver/rados/rgw_dedup_utils.h
@@ -56,6 +56,10 @@ namespace rgw::dedup {
   static_assert(MAX_MD5_SHARD < NULL_SHARD);
   static_assert(MAX_MD5_SHARD <= MD5_SHARD_HARD_LIMIT);
 
+  // Limit the number of duplicates allowed per unique object to control
+  // the number of ref_count entries in the SRC-OBJ
+  const uint8_t MAX_COPIES_PER_OBJ = 128;
+
   //---------------------------------------------------------------------------
   enum dedup_req_type_t {
     DEDUP_TYPE_NONE = 0,
@@ -249,6 +253,7 @@ namespace rgw::dedup {
     uint64_t skipped_purged_small = 0;
     uint64_t skipped_singleton = 0;
     uint64_t skipped_singleton_bytes = 0;
+    uint64_t skipped_too_many_copies = 0;
     uint64_t skipped_source_record = 0;
     uint64_t duplicate_records = 0;
     uint64_t size_mismatch = 0;
diff --git a/src/test/rgw/dedup/test_dedup.py b/src/test/rgw/dedup/test_dedup.py
index 4838f91056f..a9db908a90f 100644
--- a/src/test/rgw/dedup/test_dedup.py
+++ b/src/test/rgw/dedup/test_dedup.py
@@ -278,6 +278,7 @@ default_config = TransferConfig(multipart_threshold=MULTIPART_SIZE, multipart_ch
 ETAG_ATTR="user.rgw.etag"
 POOLNAME="default.rgw.buckets.data"
+MAX_COPIES_PER_OBJ=128
 
 #-------------------------------------------------------------------------------
 def write_file(filename, size):
     full_filename = OUT_DIR + filename
@@ -566,6 +567,7 @@ def calc_on_disk_byte_size(byte_size):
 #-------------------------------------------------------------------------------
 def calc_expected_stats(dedup_stats, obj_size, num_copies, config):
     dups_count = (num_copies - 1)
+    dups_count = min(dups_count, MAX_COPIES_PER_OBJ)
     on_disk_byte_size = calc_on_disk_byte_size(obj_size)
     log.debug("obj_size=%d, on_disk_byte_size=%d", obj_size, on_disk_byte_size)
     threshold = config.multipart_threshold
@@ -595,8 +597,8 @@ def calc_expected_stats(dedup_stats, obj_size, num_copies, config):
     else:
         dedup_stats.skip_src_record += 1
         dedup_stats.set_shared_manifest_src += 1
-    dedup_stats.set_hash += num_copies
-    dedup_stats.invalid_hash += num_copies
+    dedup_stats.set_hash += (dups_count + 1)
+    dedup_stats.invalid_hash += (dups_count + 1)
     dedup_stats.unique_obj += 1
     dedup_stats.duplicate_obj += dups_count
     dedup_stats.deduped_obj += dups_count
@@ -664,10 +666,10 @@ def upload_objects(bucket_name, files, indices, conn, config, check_obj_count=Tr
         for i in range(idx, num_copies):
             key = gen_object_name(filename, i)
             #log.debug("upload_file %s/%s with crc32", bucket_name, key)
-            conn.upload_file(OUT_DIR + filename, bucket_name, key, Config=config, ExtraArgs={'ChecksumAlgorithm': 'crc32'})
+            conn.upload_file(OUT_DIR + filename, bucket_name, key, Config=config)
 
     log.debug("==========================================")
-    log.debug("Summery:\n%d S3 objects were uploaded (%d rados objects), total size = %.2f MiB",
+    log.debug("Summary: %d S3 objects were uploaded (%d rados objects), total size = %.2f MiB",
               s3_objects_total, rados_objects_total, total_space/MB)
     log.debug("Based on calculation we should have %d rados objects", rados_objects_total)
     log.debug("Based on calculation we should have %d duplicated tail objs", duplicated_tail_objs)
@@ -720,7 +722,7 @@ def upload_objects_multi(files, conns, bucket_names, indices, config, check_obj_
log.debug("upload_objects::<%s/%s>", bucket_names[ten_id], key) log.debug("==========================================") - log.debug("Summery:%d S3 objects were uploaded (%d rados objects), total size = %.2f MiB", + log.debug("Summary:%d S3 objects were uploaded (%d rados objects), total size = %.2f MiB", s3_objects_total, rados_objects_total, total_space/MB) log.debug("Based on calculation we should have %d rados objects", rados_objects_total) log.debug("Based on calculation we should have %d duplicated tail objs", duplicated_tail_objs) @@ -801,7 +803,7 @@ def procs_upload_objects(files, conns, bucket_names, indices, config, check_obj_ proc_list[idx].join() log.debug("==========================================") - log.debug("Summery:%d S3 objects were uploaded (%d rados objects), total size = %.2f MiB", + log.debug("Summary:%d S3 objects were uploaded (%d rados objects), total size = %.2f MiB", s3_objects_total, rados_objects_total, total_space/MB) log.debug("Based on calculation we should have %d rados objects", rados_objects_total) log.debug("Based on calculation we should have %d duplicated tail objs", duplicated_tail_objs) @@ -827,42 +829,42 @@ def procs_upload_objects(files, conns, bucket_names, indices, config, check_obj_ #------------------------------------------------------------------------------- def verify_objects(bucket_name, files, conn, expected_results, config): - tempfile = OUT_DIR + "temp" + tmpfile = OUT_DIR + "temp" for f in files: filename=f[0] obj_size=f[1] num_copies=f[2] log.debug("comparing file=%s, size=%d, copies=%d", filename, obj_size, num_copies) for i in range(0, num_copies): + filecmp.clear_cache() key = gen_object_name(filename, i) - #log.debug("download_file(%s) with crc32", key) - conn.download_file(bucket_name, key, tempfile, Config=config, ExtraArgs={'ChecksumMode': 'crc32'}) - #conn.download_file(bucket_name, key, tempfile, Config=config) - result = bash(['cmp', tempfile, OUT_DIR + filename]) - assert result[1] == 0 ,"Files %s and %s differ!!" % (key, tempfile) - os.remove(tempfile) + conn.download_file(bucket_name, key, tmpfile, Config=config) + equal = filecmp.cmp(tmpfile, OUT_DIR + filename, shallow=False) + assert equal ,"Files %s and %s differ!!" % (key, tmpfile) + os.remove(tmpfile) + log.debug("verify_objects: finished reading all objects") assert expected_results == count_object_parts_in_all_buckets(True) log.debug("verify_objects::completed successfully!!") - #------------------------------------------------------------------------------- def verify_objects_multi(files, conns, bucket_names, expected_results, config): max_tenants=len(conns) - tempfile = OUT_DIR + "temp" + tmpfile = OUT_DIR + "temp" for f in files: filename=f[0] obj_size=f[1] num_copies=f[2] log.debug("comparing file=%s, size=%d, copies=%d", filename, obj_size, num_copies) for i in range(0, num_copies): + filecmp.clear_cache() key = gen_object_name(filename, i) log.debug("comparing object %s with file %s", key, filename) ten_id = i % max_tenants - conns[ten_id].download_file(bucket_names[ten_id], key, tempfile, Config=config) - result = bash(['cmp', tempfile, OUT_DIR + filename]) - assert result[1] == 0 ,"Files %s and %s differ!!" % (key, tempfile) - os.remove(tempfile) + conns[ten_id].download_file(bucket_names[ten_id], key, tmpfile, Config=config) + equal = filecmp.cmp(tmpfile, OUT_DIR + filename, shallow=False) + assert equal ,"Files %s and %s differ!!" 
+            assert equal, "Files %s and %s differ!!" % (key, tmpfile)
+            os.remove(tmpfile)
 
     assert expected_results == count_object_parts_in_all_buckets(True)
     log.debug("verify_objects::completed successfully!!")
 
@@ -870,7 +872,7 @@
 
 #-------------------------------------------------------------------------------
 def thread_verify(thread_id, num_threads, files, conn, bucket, config):
-    tempfile = OUT_DIR + "temp" + str(thread_id)
+    tmpfile = OUT_DIR + "temp" + str(thread_id)
     count = 0
     for f in files:
         filename=f[0]
@@ -883,10 +885,10 @@
             if thread_id == target_thread:
                 key = gen_object_name(filename, i)
                 log.debug("comparing object %s with file %s", key, filename)
-                conn.download_file(bucket, key, tempfile, Config=config)
-                result = bash(['cmp', tempfile, OUT_DIR + filename])
-                assert result[1] == 0 ,"Files %s and %s differ!!" % (key, tempfile)
-                os.remove(tempfile)
+                conn.download_file(bucket, key, tmpfile, Config=config)
+                equal = filecmp.cmp(tmpfile, OUT_DIR + filename, shallow=False)
+                assert equal, "Files %s and %s differ!!" % (key, tmpfile)
+                os.remove(tmpfile)
 
 
 #-------------------------------------------------------------------------------
@@ -1239,7 +1241,7 @@ def simple_dedup(conn, files, bucket_name, run_cleanup_after, config, dry_run):
     ret = upload_objects(bucket_name, files, indices, conn, config)
     expected_results = ret[0]
     dedup_stats = ret[1]
-
+    log.info("%d S3 objects were uploaded", ret[2])
     exec_dedup(dedup_stats, dry_run)
     if dry_run == False:
         log.debug("Verify all objects")
@@ -2995,3 +2997,161 @@ def test_dedup_dry_large_scale():
 def test_cleanup():
     close_all_connections()
 
+#---------------------------------------------------------------------------
+def proc_upload_identical(proc_id, num_procs, filename, conn, bucket_name, num_copies, config):
+    log.debug("Proc_ID=%d::started", proc_id)
+    for idx in range(num_copies):
+        log.debug("upload_objects::%s::idx=%d", filename, idx)
+        target_proc = (idx % num_procs)
+        if (proc_id == target_proc):
+            key = gen_object_name(filename, idx)
+            conn.upload_file(OUT_DIR+filename, bucket_name, key, Config=config)
+            #log.info("[%d]upload_objects::<%s/%s>", proc_id, bucket_name, key)
+
+#---------------------------------------------------------------------------
+def proc_parallel_upload_identical(files, conns, bucket_name, config):
+    num_procs=len(conns)
+    proc_list=list()
+    f = files[0]
+    filename=f[0]
+    num_copies=f[2]
+    for idx in range(num_procs):
+        log.debug("Create proc_id=%d", idx)
+        p=Process(target=proc_upload_identical,
+                  args=(idx, num_procs, filename, conns[idx], bucket_name, num_copies, config))
+        proc_list.append(p)
+        proc_list[idx].start()
+
+    # wait for all worker procs to join
+    for idx in range(num_procs):
+        proc_list[idx].join()
+
+#---------------------------------------------------------------------------
+def calc_identical_copies_stats(files, conns, bucket_name, config):
+    f = files[0]
+    obj_size=f[1]
+    filename=f[0]
+    copies_count=f[2]
+    dedup_stats = Dedup_Stats()
+    s3_objects_total=copies_count
+    calc_expected_stats(dedup_stats, obj_size, copies_count, config)
+    dups_count = min(copies_count, MAX_COPIES_PER_OBJ)
+    total_space = (obj_size * copies_count)
+    ret=calc_dedupable_space(obj_size, config)
+    dedupable_space=ret[0]
+    duplicated_space = (dups_count * dedupable_space)
+    rados_obj_count=calc_rados_obj_count(copies_count, obj_size, config)
+    rados_objects_total = (rados_obj_count * copies_count)
+    duplicated_tail_objs = (dups_count * (rados_obj_count-1))
+    log.info("upload_objects::%s::size=%d, copies_count=%d",
+             filename, obj_size, copies_count)
+
+    s3_object_count = count_objects_in_bucket(bucket_name, conns[0])
+    assert rados_objects_total == count_object_parts_in_all_buckets()
+    assert (s3_object_count == s3_objects_total)
+    expected_rados_obj_count_post_dedup=(rados_objects_total-duplicated_tail_objs)
+    return (expected_rados_obj_count_post_dedup, dedup_stats, s3_objects_total)
+
+#-------------------------------------------------------------------------------
+def __test_dedup_identical_copies(files, config, dry_run, verify, force_clean=False):
+    num_threads=32
+    bucket_name = "bucket1"
+    conns=get_connections(num_threads)
+    bucket_names=[bucket_name] * num_threads
+    try:
+        if dry_run:
+            conns[0].create_bucket(Bucket=bucket_name)
+            start = time.time_ns()
+            proc_parallel_upload_identical(files, conns, bucket_name, config)
+            upload_time_sec = (time.time_ns() - start) / (1000*1000*1000)
+            log.info("upload time = %d sec", upload_time_sec)
+
+        ret=calc_identical_copies_stats(files, conns, bucket_name, config)
+        expected_results = ret[0]
+        dedup_stats = ret[1]
+
+        exec_dedup(dedup_stats, dry_run)
+        if verify:
+            log.info("Verify all objects")
+            start_time = time.time_ns()
+            threads_verify_objects(files, conns, bucket_names, expected_results, config)
+            end_time = time.time_ns()
+            log.info("Verify all objects time = %d(sec)",
+                     (end_time - start_time)/1_000_000_000)
+    finally:
+        # cleanup must be executed even after a failure
+        if not dry_run or force_clean:
+            cleanup(bucket_name, conns[0])
+
+#-------------------------------------------------------------------------------
+@pytest.mark.basic_test
+def test_dedup_identical_copies():
+    num_files=1
+    copies_count=64*1024+1
+    size=64*KB
+    config=default_config
+    prepare_test()
+    files=[]
+    gen_files_fixed_copies(files, num_files, size, copies_count)
+
+    # start with a dry_run
+    dry_run=True
+    verify=False
+    log.info("test_dedup_identical_copies:dry test")
+    __test_dedup_identical_copies(files, config, dry_run, verify)
+
+    # and then perform a full dedup
+    dry_run=False
+    # no need to read-verify data since min size for single-part dedup is 4MB
+    verify=False
+    force=False
+    log.info("test_dedup_identical_copies:full test")
+    __test_dedup_identical_copies(files, config, dry_run, verify, force)
+
+#-------------------------------------------------------------------------------
+@pytest.mark.basic_test
+def test_dedup_identical_copies_multipart():
+    num_files=1
+    copies_count=64*1024+1
+    size=16*KB
+    prepare_test()
+    files=[]
+    gen_files_fixed_copies(files, num_files, size, copies_count)
+    config=TransferConfig(multipart_threshold=size, multipart_chunksize=size)
+    # start with a dry_run
+    dry_run=True
+    verify=False
+    log.info("test_dedup_identical_copies_multipart:dry test")
+    __test_dedup_identical_copies(files, config, dry_run, verify)
+
+    # and then perform a full dedup
+    dry_run=False
+    verify=False
+    force_clean=True
+    log.info("test_dedup_identical_copies_multipart:full test")
+    __test_dedup_identical_copies(files, config, dry_run, verify, force_clean)
+
+#-------------------------------------------------------------------------------
+@pytest.mark.basic_test
+def test_dedup_identical_copies_multipart_small():
+    num_files=1
+    copies_count=1024
+    size=16*KB
+    prepare_test()
+    files=[]
+    gen_files_fixed_copies(files, num_files, size, copies_count)
+    config=TransferConfig(multipart_threshold=size, multipart_chunksize=size)
+    # start with a dry_run
+    dry_run=True
+    verify=False
+    log.info("test_dedup_identical_copies_multipart_small:dry test")
+    __test_dedup_identical_copies(files, config, dry_run, verify)
+
+    # and then perform a full dedup
+    dry_run=False
+    verify=True
+    force_clean=True
+    log.info("test_dedup_identical_copies_multipart_small:full test")
+    __test_dedup_identical_copies(files, config, dry_run, verify, force_clean)
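
Appended reviewer note: the new tests pick copies_count=64*1024+1 precisely
because it crosses the old 16-bit wrap point. A standalone sketch of the
arithmetic they rely on (hypothetical, not part of the test suite):

  #include <algorithm>
  #include <cassert>
  #include <cstdint>

  int main() {
    const uint32_t copies = 64 * 1024 + 1;   // as in test_dedup_identical_copies
    const uint32_t max_copies_per_obj = 128; // MAX_COPIES_PER_OBJ

    // old behavior: a bare uint16_t counter incremented once per copy
    uint16_t wrapping = 0;
    for (uint32_t i = 0; i < copies; i++) {
      wrapping++;                            // wraps at 65536
    }
    assert(wrapping == 1);                   // 65537 % 65536 == 1

    // new expectation used by calc_expected_stats()/calc_identical_copies_stats()
    const uint32_t dups = std::min(copies - 1, max_copies_per_obj);
    assert(dups == 128);
    return 0;
  }

The wrap to exactly 1 is the nasty case: is_singleton() would treat the most
heavily duplicated object in the system as a singleton and drop it from the
table, which is what the saturating inc_count() prevents.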