redistributed_loopback++;
}
+ // we no longer need the counter, reuse it to count actual dedup
+ hash_tab[idx].val.reset_count();
redistributed_search_max = std::max(redistributed_search_max, count);
redistributed_search_total += count;
}
else {
+ // we no longer need the counter, reuse it to count actual dedup
+ hash_tab[tab_idx].val.reset_count();
redistributed_not_needed++;
}
}
return idx;
}
+ //---------------------------------------------------------------------------
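+ // update the approximate dedup stats for one more duplicate of the object described by @p_key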
+ static void inc_counters(const key_t *p_key,
+ uint32_t head_object_size,
+ dedup_stats_t *p_small_objs,
+ dedup_stats_t *p_big_objs,
+ uint64_t *p_duplicate_head_bytes)
+ {
+ // This is an approximation only since size is stored in 4KB resolution
+ uint64_t byte_size_approx = disk_blocks_to_byte_size(p_key->size_4k_units);
+
+ // small single-part objects can't be deduped; record them under the small-objects stats and return
+ if (!p_key->multipart_object() && (byte_size_approx <= head_object_size)) {
+ p_small_objs->duplicate_count ++;
+ p_small_objs->dedup_bytes_estimate += byte_size_approx;
+ return;
+ }
+ else {
+ uint64_t dup_bytes_approx = calc_deduped_bytes(head_object_size,
+ p_key->num_parts,
+ byte_size_approx);
+ p_big_objs->duplicate_count ++;
+ p_big_objs->dedup_bytes_estimate += dup_bytes_approx;
+
+ if (!p_key->multipart_object()) {
+ // single part objects duplicate the head object when dedup is used
+ *p_duplicate_head_bytes += head_object_size;
+ }
+ }
+ }
+
//---------------------------------------------------------------------------
int dedup_table_t::add_entry(key_t *p_key,
disk_block_id_t block_id,
record_id_t rec_id,
- bool shared_manifest)
+ bool shared_manifest,
+ dedup_stats_t *p_small_objs,
+ dedup_stats_t *p_big_objs,
+ uint64_t *p_duplicate_head_bytes)
{
value_t new_val(block_id, rec_id, shared_manifest);
uint32_t idx = find_entry(p_key);
}
else {
ceph_assert(hash_tab[idx].key == *p_key);
- val.count ++;
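+ // update the dedup estimates only for the first MAX_COPIES_PER_OBJ copies of this object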
+ if (val.count <= MAX_COPIES_PER_OBJ) {
+ inc_counters(p_key, head_object_size, p_small_objs, p_big_objs,
+ p_duplicate_head_bytes);
+ }
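+ // never increment past the 16-bit limit so the counter can't wrap around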
+ if (val.count < std::numeric_limits<std::uint16_t>::max()) {
+ val.count ++;
+ }
if (!val.has_shared_manifest() && shared_manifest) {
// replace value!
ldpp_dout(dpp, 20) << __func__ << "::Replace with shared_manifest::["
ceph_assert(hash_tab[idx].key == *p_key);
value_t &val = hash_tab[idx].val;
ceph_assert(val.is_occupied());
- // we only update non-singletons since we purge singletons after the first pass
- ceph_assert(val.count > 1);
// need to overwrite the block_idx/rec_id from the first pass
// unless already set with shared_manifest with the correct block-id/rec-id
return -ENOENT;
}
+ //---------------------------------------------------------------------------
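+ // bump the copy-count of an existing entry, but only if it still refers to the given block/record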
+ int dedup_table_t::inc_count(const key_t *p_key,
+ disk_block_id_t block_id,
+ record_id_t rec_id)
+ {
+ uint32_t idx = find_entry(p_key);
+ value_t &val = hash_tab[idx].val;
+ if (val.is_occupied()) {
+ if (val.block_idx == block_id && val.rec_id == rec_id) {
+ val.inc_count();
+ return 0;
+ }
+ else {
+ ldpp_dout(dpp, 5) << __func__ << "::ERR Failed Ncopies block/rec" << dendl;
+ }
+ }
+ else {
+ ldpp_dout(dpp, 5) << __func__ << "::ERR Failed Ncopies key" << dendl;
+ }
+
+ return -ENOENT;
+ }
+
//---------------------------------------------------------------------------
int dedup_table_t::get_val(const key_t *p_key, struct value_t *p_val /*OUT*/)
{
uint32_t idx = find_entry(p_key);
const value_t &val = hash_tab[idx].val;
- if (!val.is_occupied()) {
+ if (val.is_occupied()) {
+ *p_val = val;
+ return 0;
+ }
+ else {
return -ENOENT;
}
-
- *p_val = val;
- return 0;
}
//---------------------------------------------------------------------------
void dedup_table_t::count_duplicates(dedup_stats_t *p_small_objs,
- dedup_stats_t *p_big_objs,
- uint64_t *p_duplicate_head_bytes)
+ dedup_stats_t *p_big_objs)
{
for (uint32_t tab_idx = 0; tab_idx < entries_count; tab_idx++) {
if (!hash_tab[tab_idx].val.is_occupied()) {
const key_t &key = hash_tab[tab_idx].key;
// This is an approximation only since size is stored in 4KB resolution
uint64_t byte_size_approx = disk_blocks_to_byte_size(key.size_4k_units);
- uint32_t duplicate_count = (hash_tab[tab_idx].val.count -1);
// skip small single part objects which we can't dedup
if (!key.multipart_object() && (byte_size_approx <= head_object_size)) {
p_small_objs->singleton_count++;
}
else {
- p_small_objs->duplicate_count += duplicate_count;
p_small_objs->unique_count ++;
- p_small_objs->dedup_bytes_estimate += (duplicate_count * byte_size_approx);
}
- continue;
- }
-
- if (hash_tab[tab_idx].val.is_singleton()) {
- p_big_objs->singleton_count++;
}
else {
- ceph_assert(hash_tab[tab_idx].val.count > 1);
- uint64_t dup_bytes_approx = calc_deduped_bytes(head_object_size,
- key.num_parts,
- byte_size_approx);
- p_big_objs->dedup_bytes_estimate += (duplicate_count * dup_bytes_approx);
- p_big_objs->duplicate_count += duplicate_count;
- p_big_objs->unique_count ++;
-
- if (!key.multipart_object()) {
- // single part objects duplicate the head object when dedup is used
- uint64_t dup_head_bytes = duplicate_count * head_object_size;
- *p_duplicate_head_bytes += dup_head_bytes;
+ if (hash_tab[tab_idx].val.is_singleton()) {
+ p_big_objs->singleton_count++;
+ }
+ else {
+ ceph_assert(hash_tab[tab_idx].val.count > 1);
+ p_big_objs->unique_count ++;
}
}
}
unique_obj : int = 0
dedup_bytes_estimate : int = 0
duplicate_obj : int = 0
- dup_head_size_estimate : int = 0
- dup_head_size : int = 0
deduped_obj_bytes : int = 0
non_default_storage_class_objs_bytes : int = 0
potential_singleton_obj : int = 0
ETAG_ATTR="user.rgw.etag"
POOLNAME="default.rgw.buckets.data"
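+# dedup stats account for at most this many copies per object (expected to match the RGW-side MAX_COPIES_PER_OBJ)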
+MAX_COPIES_PER_OBJ=128
#-------------------------------------------------------------------------------
def write_file(filename, size):
full_filename = OUT_DIR + filename
#-------------------------------------------------------------------------------
def calc_dedupable_space(obj_size, config):
- dup_head_size=0
threshold = config.multipart_threshold
# Objects larger than MULTIPART_SIZE are uploaded as multi-part
# multi-part objects get a zero-size head object
dedupable_space = obj_size
elif obj_size > RADOS_OBJ_SIZE:
dedupable_space = obj_size - RADOS_OBJ_SIZE
- dup_head_size = RADOS_OBJ_SIZE
else:
dedupable_space = 0
log.debug("obj_size=%.2f MiB, dedupable_space=%.2f MiB",
float(obj_size)/MB, float(dedupable_space)/MB)
- return (dedupable_space, dup_head_size)
+ return dedupable_space
BLOCK_SIZE=4096
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
def calc_expected_stats(dedup_stats, obj_size, num_copies, config):
dups_count = (num_copies - 1)
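+ # dedup accounting is capped at MAX_COPIES_PER_OBJ copies per object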
+ dups_count = min(dups_count, MAX_COPIES_PER_OBJ)
on_disk_byte_size = calc_on_disk_byte_size(obj_size)
log.debug("obj_size=%d, on_disk_byte_size=%d", obj_size, on_disk_byte_size)
threshold = config.multipart_threshold
else:
dedup_stats.skip_src_record += 1
dedup_stats.set_shared_manifest_src += 1
- dedup_stats.set_hash += num_copies
- dedup_stats.invalid_hash += num_copies
+ dedup_stats.set_hash += (dups_count + 1)
+ dedup_stats.invalid_hash += (dups_count + 1)
dedup_stats.unique_obj += 1
dedup_stats.duplicate_obj += dups_count
dedup_stats.deduped_obj += dups_count
- ret=calc_dedupable_space(on_disk_byte_size, config)
- deduped_obj_bytes=ret[0]
- dup_head_size=ret[1]
+ deduped_obj_bytes=calc_dedupable_space(on_disk_byte_size, config)
dedup_stats.deduped_obj_bytes += (deduped_obj_bytes * dups_count)
- dedup_stats.dup_head_size += (dup_head_size * dups_count)
- dedup_stats.dup_head_size_estimate += (dup_head_size * dups_count)
deduped_block_bytes=((deduped_obj_bytes+BLOCK_SIZE-1)//BLOCK_SIZE)*BLOCK_SIZE
dedup_stats.dedup_bytes_estimate += (deduped_block_bytes * dups_count)
assert(obj_size)
calc_expected_stats(dedup_stats, obj_size, num_copies, config)
total_space += (obj_size * num_copies)
- ret=calc_dedupable_space(obj_size, config)
- dedupable_space=ret[0]
- dup_head_size=ret[1]
+ dedupable_space=calc_dedupable_space(obj_size, config)
duplicated_space += ((num_copies-1) * dedupable_space)
rados_obj_count=calc_rados_obj_count(num_copies, obj_size, config)
rados_objects_total += (rados_obj_count * num_copies)
for i in range(idx, num_copies):
key = gen_object_name(filename, i)
#log.debug("upload_file %s/%s with crc32", bucket_name, key)
- conn.upload_file(OUT_DIR + filename, bucket_name, key, Config=config, ExtraArgs={'ChecksumAlgorithm': 'crc32'})
+ conn.upload_file(OUT_DIR + filename, bucket_name, key, Config=config)
log.debug("==========================================")
- log.debug("Summery:\n%d S3 objects were uploaded (%d rados objects), total size = %.2f MiB",
+ log.debug("Summary: %d S3 objects were uploaded (%d rados objects), total size = %.2f MiB",
s3_objects_total, rados_objects_total, total_space/MB)
log.debug("Based on calculation we should have %d rados objects", rados_objects_total)
log.debug("Based on calculation we should have %d duplicated tail objs", duplicated_tail_objs)
assert(obj_size)
calc_expected_stats(dedup_stats, obj_size, num_copies, config)
total_space += (obj_size * num_copies)
- ret=calc_dedupable_space(obj_size, config)
- dedupable_space=ret[0]
- dup_head_size=ret[1]
+ dedupable_space=calc_dedupable_space(obj_size, config)
duplicated_space += ((num_copies-1) * dedupable_space)
rados_obj_count=calc_rados_obj_count(num_copies, obj_size, config)
rados_objects_total += (rados_obj_count * num_copies)
log.debug("upload_objects::<%s/%s>", bucket_names[ten_id], key)
log.debug("==========================================")
- log.debug("Summery:%d S3 objects were uploaded (%d rados objects), total size = %.2f MiB",
+ log.debug("Summary:%d S3 objects were uploaded (%d rados objects), total size = %.2f MiB",
s3_objects_total, rados_objects_total, total_space/MB)
log.debug("Based on calculation we should have %d rados objects", rados_objects_total)
log.debug("Based on calculation we should have %d duplicated tail objs", duplicated_tail_objs)
assert(obj_size)
calc_expected_stats(dedup_stats, obj_size, num_copies, config)
total_space += (obj_size * num_copies)
- ret=calc_dedupable_space(obj_size, config)
- dedupable_space=ret[0]
- dup_head_size=ret[1]
+ dedupable_space=calc_dedupable_space(obj_size, config)
duplicated_space += ((num_copies-1) * dedupable_space)
rados_obj_count=calc_rados_obj_count(num_copies, obj_size, config)
rados_objects_total += (rados_obj_count * num_copies)
proc_list[idx].join()
log.debug("==========================================")
- log.debug("Summery:%d S3 objects were uploaded (%d rados objects), total size = %.2f MiB",
+ log.debug("Summary:%d S3 objects were uploaded (%d rados objects), total size = %.2f MiB",
s3_objects_total, rados_objects_total, total_space/MB)
log.debug("Based on calculation we should have %d rados objects", rados_objects_total)
log.debug("Based on calculation we should have %d duplicated tail objs", duplicated_tail_objs)
#-------------------------------------------------------------------------------
def verify_objects(bucket_name, files, conn, expected_results, config):
- tempfile = OUT_DIR + "temp"
+ tmpfile = OUT_DIR + "temp"
for f in files:
filename=f[0]
obj_size=f[1]
num_copies=f[2]
log.debug("comparing file=%s, size=%d, copies=%d", filename, obj_size, num_copies)
for i in range(0, num_copies):
+ filecmp.clear_cache()
key = gen_object_name(filename, i)
- #log.debug("download_file(%s) with crc32", key)
- conn.download_file(bucket_name, key, tempfile, Config=config, ExtraArgs={'ChecksumMode': 'crc32'})
- #conn.download_file(bucket_name, key, tempfile, Config=config)
- result = bash(['cmp', tempfile, OUT_DIR + filename])
- assert result[1] == 0 ,"Files %s and %s differ!!" % (key, tempfile)
- os.remove(tempfile)
+ conn.download_file(bucket_name, key, tmpfile, Config=config)
+ equal = filecmp.cmp(tmpfile, OUT_DIR + filename, shallow=False)
+ assert equal, "Files %s and %s differ!!" % (key, tmpfile)
+ os.remove(tmpfile)
+ log.debug("verify_objects: finished reading all objects")
assert expected_results == count_object_parts_in_all_buckets(True)
log.debug("verify_objects::completed successfully!!")
-
#-------------------------------------------------------------------------------
def verify_objects_multi(files, conns, bucket_names, expected_results, config):
max_tenants=len(conns)
- tempfile = OUT_DIR + "temp"
+ tmpfile = OUT_DIR + "temp"
for f in files:
filename=f[0]
obj_size=f[1]
num_copies=f[2]
log.debug("comparing file=%s, size=%d, copies=%d", filename, obj_size, num_copies)
for i in range(0, num_copies):
+ filecmp.clear_cache()
key = gen_object_name(filename, i)
log.debug("comparing object %s with file %s", key, filename)
ten_id = i % max_tenants
- conns[ten_id].download_file(bucket_names[ten_id], key, tempfile, Config=config)
- result = bash(['cmp', tempfile, OUT_DIR + filename])
- assert result[1] == 0 ,"Files %s and %s differ!!" % (key, tempfile)
- os.remove(tempfile)
+ conns[ten_id].download_file(bucket_names[ten_id], key, tmpfile, Config=config)
+ equal = filecmp.cmp(tmpfile, OUT_DIR + filename, shallow=False)
+ assert equal, "Files %s and %s differ!!" % (key, tmpfile)
+ os.remove(tmpfile)
assert expected_results == count_object_parts_in_all_buckets(True)
log.debug("verify_objects::completed successfully!!")
#-------------------------------------------------------------------------------
def thread_verify(thread_id, num_threads, files, conn, bucket, config):
- tempfile = OUT_DIR + "temp" + str(thread_id)
+ tmpfile = OUT_DIR + "temp" + str(thread_id)
count = 0
for f in files:
filename=f[0]
if thread_id == target_thread:
key = gen_object_name(filename, i)
log.debug("comparing object %s with file %s", key, filename)
- conn.download_file(bucket, key, tempfile, Config=config)
- result = bash(['cmp', tempfile, OUT_DIR + filename])
- assert result[1] == 0 ,"Files %s and %s differ!!" % (key, tempfile)
- os.remove(tempfile)
+ conn.download_file(bucket, key, tmpfile, Config=config)
+ equal = filecmp.cmp(tmpfile, OUT_DIR + filename, shallow=False)
+ assert equal, "Files %s and %s differ!!" % (key, tmpfile)
+ os.remove(tmpfile)
#-------------------------------------------------------------------------------
dedup_stats.total_processed_objects = 0
dedup_stats.set_shared_manifest_src = 0
dedup_stats.deduped_obj = 0
- dedup_stats.dup_head_size = 0
dedup_stats.deduped_obj_bytes = 0
dedup_stats.skip_shared_manifest = 0
dedup_stats.skip_src_record = 0
dedup_stats.dedup_bytes_estimate = main['Dedup Bytes Estimate']
potential = md5_stats['Potential Dedup']
- dedup_stats.dup_head_size_estimate = potential['Duplicated Head Bytes Estimate']
- dedup_stats.dup_head_size = potential['Duplicated Head Bytes']
dedup_stats.potential_singleton_obj = potential['Singleton Obj (64KB-4MB)']
dedup_stats.potential_unique_obj = potential['Unique Obj (64KB-4MB)']
dedup_stats.potential_duplicate_obj = potential['Duplicate Obj (64KB-4MB)']
ret = upload_objects(bucket_name, files, indices, conn, config)
expected_results = ret[0]
dedup_stats = ret[1]
-
+ log.info("%d S3 objects were uploaded", ret[2])
exec_dedup(dedup_stats, dry_run)
if dry_run == False:
log.debug("Verify all objects")
conn.delete_object(Bucket=bucket_name, Key=key, VersionId=version_id)
+#-------------------------------------------------------------------------------
+def print_bucket_versioning(conn, bucket_name):
+ resp = conn.get_bucket_versioning(Bucket=bucket_name)
+ status = resp.get('Status')
+ mfadelete = resp.get('MFADelete')
+
+ if status is None:
+ log.info("%s: versioning not configured", bucket_name)
+ else:
+ log.info("%s: Status=%s, MFADelete=%s", bucket_name, status, mfadelete)
+
#-------------------------------------------------------------------------------
# generate @num_files objects with @ver_count versions each of @obj_size
# verify that we got the correct number of rados-objects
return
prepare_test()
- bucket_name = "bucket1"
+ bucket_name = "bucketwithversions"
files=[]
op_log=[]
num_files=43
finally:
# cleanup must be executed even after a failure
if success == False:
+ # otherwise, the objects were already removed by verify_objects_with_version()
delete_all_versions(conn, bucket_name, dry_run=False)
- # otherwise, objects been removed by verify_objects_with_version()
+ conn.put_bucket_versioning(Bucket=bucket_name,
+ VersioningConfiguration={"Status": "Suspended"})
+ print_bucket_versioning(conn, bucket_name)
cleanup(bucket_name, conn)
#==============================================================================
s3_objects_total = ret[2]
dedup_stats2 = dedup_stats
- dedup_stats2.dup_head_size = 0
dedup_stats2.skip_shared_manifest=dedup_stats.deduped_obj
dedup_stats2.skip_src_record=dedup_stats.set_shared_manifest_src
dedup_stats2.set_shared_manifest_src=0
s3_objects_total = ret[2]
dedup_stats2 = dedup_stats
- dedup_stats2.dup_head_size = 0
dedup_stats2.skip_shared_manifest=dedup_stats.deduped_obj
dedup_stats2.skip_src_record=dedup_stats.set_shared_manifest_src
dedup_stats2.set_shared_manifest_src=0
stats_combined=ret[1]
stats_combined.skip_shared_manifest = stats_base.deduped_obj
stats_combined.skip_src_record -= stats_base.skip_src_record
- stats_combined.dup_head_size -= stats_base.dup_head_size
stats_combined.skip_src_record += stats_base.set_shared_manifest_src
stats_combined.set_shared_manifest_src -= stats_base.set_shared_manifest_src
expected_results = ret[0]
stats_combined = ret[1]
stats_combined.skip_shared_manifest = stats_base.deduped_obj
- stats_combined.dup_head_size -= stats_base.dup_head_size
stats_combined.skip_src_record -= stats_base.skip_src_record
stats_combined.skip_src_record += stats_base.set_shared_manifest_src
expected_results = ret[0]
stats_combined = ret[1]
stats_combined.skip_shared_manifest = stats_base.deduped_obj
- stats_combined.dup_head_size -= stats_base.dup_head_size
stats_combined.skip_src_record -= stats_base.skip_src_record
stats_combined.skip_src_record += stats_base.set_shared_manifest_src
stats_combined = ret[1]
stats_combined.skip_shared_manifest = stats_base.deduped_obj
stats_combined.skip_src_record -= stats_base.skip_src_record
- stats_combined.dup_head_size -= stats_base.dup_head_size
stats_combined.skip_src_record += stats_base.set_shared_manifest_src
stats_combined.set_shared_manifest_src -= stats_base.set_shared_manifest_src
# run dedup again
dedup_stats.set_shared_manifest_src=0
dedup_stats.deduped_obj=0
- dedup_stats.dup_head_size=0
dedup_stats.deduped_obj_bytes=0
dedup_stats.skip_src_record=src_record
dedup_stats.skip_shared_manifest=shared_manifest
# run dedup again
dedup_stats.set_shared_manifest_src=0
dedup_stats.deduped_obj=0
- dedup_stats.dup_head_size=0
dedup_stats.deduped_obj_bytes=0
dedup_stats.skip_src_record=src_record
dedup_stats.skip_shared_manifest=shared_manifest
stats_combined.skip_shared_manifest = stats_base.deduped_obj
stats_combined.skip_src_record = src_record
stats_combined.set_shared_manifest_src -= stats_base.set_shared_manifest_src
- stats_combined.dup_head_size -= stats_base.dup_head_size
stats_combined.deduped_obj -= stats_base.deduped_obj
stats_combined.deduped_obj_bytes -= stats_base.deduped_obj_bytes
ret=simple_dedup_with_tenants(files, conns, bucket_names, config)
stats_base=ret[1]
- for idx in range(0, 9):
+ for idx in range(0, 7):
ret = inc_step_with_tenants(stats_base, files, conns, bucket_names, config)
files=ret[0]
stats_last=ret[1]
stats_base.set_shared_manifest_src += stats_last.set_shared_manifest_src
- stats_base.dup_head_size += stats_last.dup_head_size
stats_base.deduped_obj += stats_last.deduped_obj
stats_base.deduped_obj_bytes += stats_last.deduped_obj_bytes
stats_base.set_hash += stats_last.set_hash
num_files=8
min_size=MULTIPART_SIZE
- # create files in range [MULTIPART_SIZE, 4*MULTIPART_SIZE] aligned on RADOS_OBJ_SIZE
- # create files in range [MULTIPART_SIZE, 1GB] aligned on RADOS_OBJ_SIZE
- gen_files_in_range(files, num_files, min_size, 1024*MB)
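+ # create files in range [MULTIPART_SIZE, 128MB] aligned on RADOS_OBJ_SIZE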
+ #gen_files_in_range(files, num_files, min_size, 1024*MB)
+ gen_files_in_range(files, num_files, min_size, 128*MB)
# add files in range [MULTIPART_SIZE, 4*MULTIPART_SIZE] aligned on MULTIPART_SIZE
gen_files_in_range(files, num_files, min_size, min_size*8, MULTIPART_SIZE)
def test_cleanup():
close_all_connections()
+#---------------------------------------------------------------------------
+def proc_upload_identical(proc_id, num_procs, filename, conn, bucket_name, num_copies, config):
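+ # each worker uploads only the copy indices assigned to it (idx % num_procs == proc_id)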
+ log.debug("Proc_ID=%d/%d::num_copies=%d", proc_id, num_procs, num_copies)
+ for idx in range(num_copies):
+ log.debug("upload_objects::%s::idx=%d", filename, idx);
+ target_proc = (idx % num_procs)
+ if (proc_id == target_proc):
+ key = gen_object_name(filename, idx)
+ conn.upload_file(OUT_DIR+filename, bucket_name, key, Config=config)
+ #log.info("[%d]upload_objects::<%s/%s>", proc_id, bucket_name, key)
+
+#---------------------------------------------------------------------------
+def proc_parallel_upload_identical(files, conns, bucket_name, config):
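+ # spawn one uploader process per connection; all processes upload copies of the same single file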
+ num_procs=len(conns)
+ proc_list=list()
+ f = files[0]
+ filename=f[0]
+ num_copies=f[2]
+ for idx in range(num_procs):
+ log.debug("Create proc_id=%d", idx)
+ p=Process(target=proc_upload_identical,
+ args=(idx, num_procs, filename, conns[idx], bucket_name, num_copies, config))
+ proc_list.append(p)
+ proc_list[idx].start()
+
+ # wait for all worker procs to join
+ for idx in range(num_procs):
+ proc_list[idx].join()
+
+#---------------------------------------------------------------------------
+def calc_identical_copies_stats(files, conns, bucket_name, config):
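+ # expected dedup stats for identical copies of a single object (accounting is capped at MAX_COPIES_PER_OBJ copies)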
+ f = files[0]
+ obj_size=f[1]
+ filename=f[0]
+ copies_count=f[2]
+ dedup_stats = Dedup_Stats()
+ s3_objects_total=copies_count
+ calc_expected_stats(dedup_stats, obj_size, copies_count, config)
+ dups_count = min(copies_count, MAX_COPIES_PER_OBJ)
+ total_space = (obj_size * copies_count)
+ dedupable_space=calc_dedupable_space(obj_size, config)
+ duplicated_space = (dups_count * dedupable_space)
+ rados_obj_count=calc_rados_obj_count(copies_count, obj_size, config)
+ rados_objects_total = (rados_obj_count * copies_count)
+ duplicated_tail_objs = (dups_count * (rados_obj_count-1))
+ log.info("upload_objects::%s::size=%d, copies_count=%d",
+ filename, obj_size, copies_count);
+
+ s3_object_count = count_objects_in_bucket(bucket_name, conns[0])
+ assert rados_objects_total == count_object_parts_in_all_buckets()
+ assert (s3_object_count == s3_objects_total)
+ expected_rados_obj_count_post_dedup=(rados_objects_total-duplicated_tail_objs)
+ return (expected_rados_obj_count_post_dedup, dedup_stats, s3_objects_total)
+
+#-------------------------------------------------------------------------------
+def __test_dedup_identical_copies(files, config, dry_run, verify, force_clean=False):
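+ # upload identical copies in parallel, run dedup (dry-run or full) and optionally read-verify the data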
+ num_threads=32
+ bucket_name = "bucket1"
+ conns=get_connections(num_threads)
+ bucket_names=[bucket_name] * num_threads
+ try:
+ if dry_run:
+ conns[0].create_bucket(Bucket=bucket_name)
+ start = time.time_ns()
+ proc_parallel_upload_identical(files, conns, bucket_name, config)
+ upload_time_sec = (time.time_ns() - start) / (1000*1000*1000)
+ log.info("upload time = %d sec", upload_time_sec)
+
+ ret=calc_identical_copies_stats(files, conns, bucket_name, config)
+ expected_results = ret[0]
+ dedup_stats = ret[1]
+
+ exec_dedup(dedup_stats, dry_run)
+ if verify:
+ log.info("Verify all objects")
+ start_time = time.time_ns()
+ threads_verify_objects(files, conns, bucket_names, expected_results, config)
+ end_time = time.time_ns()
+ log.info("Verify all objects time = %d(sec)",
+ (end_time - start_time)/1_000_000_000)
+ finally:
+ # cleanup must be executed even after a failure
+ if not dry_run or force_clean:
+ log.info("cleanup bucket")
+ cleanup(bucket_name, conns[0])
+
+#-------------------------------------------------------------------------------
+@pytest.mark.basic_test
+def test_dedup_identical_copies_1():
+ num_files=1
+ copies_count=64*1024+1
+ size=64*KB
+ config=default_config
+ prepare_test()
+ files=[]
+ gen_files_fixed_copies(files, num_files, size, copies_count)
+
+ # start with a dry_run
+ dry_run=True
+ verify=False
+ log.info("test_dedup_identical_copies:dry test")
+ __test_dedup_identical_copies(files, config, dry_run, verify)
+
+ # and then perform a full dedup
+ dry_run=False
+ # no need to read-verify data since min size for single-part dedup is 4MB
+ verify=False
+ force=False
+ log.info("test_dedup_identical_copies:full test")
+ __test_dedup_identical_copies(files, config, dry_run, verify, force)
+
+#-------------------------------------------------------------------------------
+@pytest.mark.basic_test
+def test_dedup_identical_copies_multipart():
+ num_files=1
+ copies_count=64*1024+1
+ size=16*KB
+ prepare_test()
+ files=[]
+ gen_files_fixed_copies(files, num_files, size, copies_count)
+ config=TransferConfig(multipart_threshold=size, multipart_chunksize=size)
+ # start with a dry_run
+ dry_run=True
+ verify=False
+ log.info("test_dedup_identical_copies_multipart:dry test")
+ __test_dedup_identical_copies(files, config, dry_run, verify)
+
+ # and then perform a full dedup
+ dry_run=False
+ verify=False
+ force_clean=True
+ log.info("test_dedup_identical_copies_multipart:full test")
+ __test_dedup_identical_copies(files, config, dry_run, verify, force_clean)
+
+#-------------------------------------------------------------------------------
+@pytest.mark.basic_test
+def test_dedup_identical_copies_multipart_small():
+ num_files=1
+ copies_count=1024
+ size=16*KB
+ prepare_test()
+ files=[]
+ gen_files_fixed_copies(files, num_files, size, copies_count)
+ config=TransferConfig(multipart_threshold=size, multipart_chunksize=size)
+ # start with a dry_run
+ dry_run=True
+ verify=False
+ log.info("test_dedup_identical_copies_multipart:dry test")
+ __test_dedup_identical_copies(files, config, dry_run, verify)
+
+ # and then perform a full dedup
+ dry_run=False
+ verify=True
+ force_clean=True
+ log.info("test_dedup_identical_copies_multipart:full test")
+ __test_dedup_identical_copies(files, config, dry_run, verify, force_clean)
+