using storage_class_idx_t = uint8_t;
//---------------------------------------------------------------------------
- [[maybe_unused]] static int print_manifest(const DoutPrefixProvider *dpp,
+ [[maybe_unused]] static int print_manifest(CephContext *cct,
+ const DoutPrefixProvider *dpp,
RGWRados *rados,
const RGWObjManifest &manifest)
{
+ bool debug = cct->_conf->subsys.should_gather<ceph_subsys_rgw_dedup, 20>();
+ if (!debug) {
+ return 0;
+ }
+
unsigned idx = 0;
for (auto p = manifest.obj_begin(dpp); p != manifest.obj_end(dpp); ++p, ++idx) {
rgw_raw_obj raw_obj = p.get_location().get_raw_obj(rados);
{
d_head_object_size = cct->_conf->rgw_max_chunk_size;
d_min_obj_size_for_dedup = cct->_conf->rgw_dedup_min_obj_size_for_dedup;
- d_max_obj_size_for_split = cct->_conf->rgw_dedup_max_obj_size_for_split;
+ // limit split head to objects without tail
+ d_max_obj_size_for_split = d_head_object_size;
ldpp_dout(dpp, 10) << "Config Vals::d_head_object_size=" << d_head_object_size
<< "::d_min_obj_size_for_dedup=" << d_min_obj_size_for_dedup
<< "::d_max_obj_size_for_split=" << d_max_obj_size_for_split
const std::string &obj_name,
const std::string &instance,
const rgw_bucket &rb,
- librados::IoCtx *p_ioctx,
- std::string *p_oid)
+ librados::IoCtx *p_ioctx /*OUT*/,
+ std::string *p_oid /*OUT*/)
{
unique_ptr<rgw::sal::Bucket> bucket;
{
rgw::sal::Driver* driver,
rgw::sal::RadosStore* store,
const disk_record_t *p_rec,
- librados::IoCtx *p_ioctx,
- std::string *p_oid)
+ librados::IoCtx *p_ioctx /*OUT*/,
+ std::string *p_oid /*OUT*/)
{
rgw_bucket b{p_rec->tenant_name, p_rec->bucket_name, p_rec->bucket_id};
return get_ioctx_internal(dpp, driver, store, p_rec->obj_name, p_rec->instance,
//---------------------------------------------------------------------------
inline bool Background::should_split_head(uint64_t head_size, uint64_t obj_size)
{
- // max_obj_size_for_split of zero means don't split!
- return (head_size > 0 &&
- d_max_obj_size_for_split &&
- obj_size <= d_max_obj_size_for_split);
+ // Don't split RGW objects with existing tail-objects
+ // (head_size == obj_size means the entire payload fits in the head object,
+ //  i.e. there is no tail; head_size > 0 rejects empty/headless objects)
+ return (head_size > 0 && head_size == obj_size);
}
//---------------------------------------------------------------------------
const string &ref_tag = p_tgt_rec->ref_tag;
ldpp_dout(dpp, 20) << __func__ << "::ref_tag=" << ref_tag << dendl;
+ // src_manifest was updated in split-head case to include the new_tail
ret = inc_ref_count_by_manifest(ref_tag, src_oid, src_manifest);
if (unlikely(ret != 0)) {
if (p_src_rec->s.flags.is_split_head()) {
bufferlist new_manifest_bl;
adjust_target_manifest(src_manifest, tgt_manifest, new_manifest_bl);
tgt_op.setxattr(RGW_ATTR_MANIFEST, new_manifest_bl);
- //tgt_op.setxattr(RGW_ATTR_MANIFEST, p_src_rec->manifest_bl);
if (p_tgt_rec->s.flags.hash_calculated()) {
tgt_op.setxattr(RGW_ATTR_BLAKE3, tgt_hash_bl);
ldpp_dout(dpp, 20) << __func__ <<"::Set TGT Strong Hash in CLS"<< dendl;
ondisk_byte_size);
p_stats->shared_manifest_dedup_bytes += dedupable_objects_bytes;
ldpp_dout(dpp, 20) << __func__ << "::(1)skipped shared_manifest, SRC::block_id="
- << src_val.block_idx << "::rec_id=" << (int)src_val.rec_id << dendl;
+ << src_val.get_src_block_id()
+ << "::rec_id=" << (int)src_val.get_src_rec_id() << dendl;
return 0;
}
librados::ObjectWriteOperation op;
etag_to_bufferlist(p_rec->s.md5_high, p_rec->s.md5_low, p_rec->s.num_parts,
&etag_bl);
- init_cmp_pairs(dpp, p_rec, etag_bl, hash_bl /*OUT PARAM*/, &op);
+ init_cmp_pairs(dpp, p_rec, etag_bl, hash_bl /*OUT*/, &op);
op.setxattr(RGW_ATTR_BLAKE3, hash_bl);
std::string oid;
//---------------------------------------------------------------------------
static int read_hash_and_manifest(const DoutPrefixProvider *const dpp,
rgw::sal::Driver *driver,
- RGWRados *rados,
+ rgw::sal::RadosStore *store,
disk_record_t *p_rec)
{
librados::IoCtx ioctx;
std::string oid;
- int ret = get_ioctx(dpp, driver, rados, p_rec, &ioctx, &oid);
+ int ret = get_ioctx(dpp, driver, store, p_rec, &ioctx, &oid);
if (unlikely(ret != 0)) {
ldpp_dout(dpp, 5) << __func__ << "::ERR: failed get_ioctx()" << dendl;
return ret;
}
//---------------------------------------------------------------------------
- static void set_explicit_manifest(RGWObjManifest *p_manifest,
- std::map<uint64_t, RGWObjManifestPart> &objs_map)
+ // Build an explicit single-part manifest mapping the full object payload to
+ // @tail_name (loc_ofs=0, size == obj_size) and install it on @p_manifest.
+ // Precondition (asserted below): the object is head-only, so the whole
+ // payload can be described by one tail part.
+ // NOTE(review): @dpp is currently unused in the body — presumably kept for
+ // logging parity with sibling helpers; confirm before removing.
+ static void build_and_set_explicit_manifest(const DoutPrefixProvider *dpp,
+ const rgw_bucket *p_bucket,
+ const std::string &tail_name,
+ RGWObjManifest *p_manifest)
{
uint64_t obj_size = p_manifest->get_obj_size();
+ ceph_assert(obj_size == p_manifest->get_head_size());
+
+ // the tail key reuses the head's instance/ns so both resolve to the same
+ // bucket namespace
+ const rgw_obj &head_obj = p_manifest->get_obj();
+ const rgw_obj_key &head_key = head_obj.key;
+ rgw_obj_key tail_key(tail_name, head_key.instance, head_key.ns);
+ rgw_obj tail_obj(*p_bucket, tail_key);
+
+ RGWObjManifestPart tail_part;
+ tail_part.loc = tail_obj;
+ tail_part.loc_ofs = 0;
+ tail_part.size = obj_size;
+
+ std::map<uint64_t, RGWObjManifestPart> objs_map;
+ objs_map[0] = tail_part;
+
+ // demote the head: after the split all data lives in the explicit tail part
p_manifest->set_head_size(0);
p_manifest->set_max_head_size(0);
p_manifest->set_prefix("");
p_manifest->set_explicit(obj_size, objs_map);
}
- //---------------------------------------------------------------------------
- // This code is based on RGWObjManifest::convert_to_explicit()
- static void build_explicit_objs_map(const DoutPrefixProvider *dpp,
- RGWRados *rados,
- const RGWObjManifest &manifest,
- const rgw_bucket *p_bucket,
- std::map<uint64_t, RGWObjManifestPart> *p_objs_map,
- const std::string &tail_name,
- md5_stats_t *p_stats)
- {
- bool manifest_raw_obj_logged = false;
- unsigned idx = 0;
- auto p = manifest.obj_begin(dpp);
- while (p != manifest.obj_end(dpp)) {
- const uint64_t offset = p.get_stripe_ofs();
- const rgw_obj_select& os = p.get_location();
- ldpp_dout(dpp, 20) << __func__ << "::[" << idx <<"]OBJ: "
- << os.get_raw_obj(rados).oid << "::ofs=" << p.get_ofs()
- << "::strp_offset=" << offset << dendl;
-
- RGWObjManifestPart& part = (*p_objs_map)[offset];
- part.loc_ofs = 0;
-
- if (offset == 0) {
- ldpp_dout(dpp, 20) << __func__ << "::[" << idx <<"] HEAD OBJ: "
- << os.get_raw_obj(rados).oid << dendl;
- const rgw_obj &head_obj = manifest.get_obj();
- const rgw_obj_key &head_key = head_obj.key;
- // TBD: Can we have different instance/ns values for head/tail ??
- // Should we take the instance/ns from the head or tail?
- // Maybe should refuse objects with different instance/ns on head/tail ?
- rgw_obj_key tail_key(tail_name, head_key.instance, head_key.ns);
- rgw_obj tail_obj(*p_bucket, tail_key);
- part.loc = tail_obj;
- }
- else {
- // RGWObjManifest::convert_to_explicit() is assuming raw_obj, but looking
- // at the RGWObjManifest::obj_iterator code it is clear the obj is not raw.
- // If it happens to be raw we still handle it correctly (and inc stat-count)
- std::optional<rgw_obj> obj_opt = os.get_head_obj();
- if (obj_opt.has_value()) {
- part.loc = obj_opt.value();
- }
- else {
- // report raw object in manifest only once
- if (!manifest_raw_obj_logged) {
- manifest_raw_obj_logged = true;
- ldpp_dout(dpp, 10) << __func__ << "::WARN: obj is_raw" << dendl;
- p_stats->manifest_raw_obj++;
- }
- const rgw_raw_obj& raw = os.get_raw_obj(rados);
- RGWSI_Tier_RADOS::raw_obj_to_obj(*p_bucket, raw, &part.loc);
- }
- }
-
- ++p;
- uint64_t next_offset = p.get_stripe_ofs();
- part.size = next_offset - offset;
- idx++;
- } // while (p != manifest.obj_end())
- }
-
//---------------------------------------------------------------------------
int Background::split_head_object(disk_record_t *p_src_rec, // IN-OUT PARAM
RGWObjManifest &src_manifest, // IN/OUT PARAM
bufferlist bl;
std::string head_oid;
librados::IoCtx ioctx;
- int ret = get_ioctx(dpp, driver, rados, p_src_rec, &ioctx, &head_oid);
+ int ret = get_ioctx(dpp, driver, store, p_src_rec, &ioctx, &head_oid);
if (unlikely(ret != 0)) {
ldpp_dout(dpp, 1) << __func__ << "::ERR: failed get_ioctx()" << dendl;
return ret;
}
}
- bool exclusive = true; // block overwrite
std::string tail_name = generate_split_head_tail_name(src_manifest);
const rgw_bucket_placement &tail_placement = src_manifest.get_tail_placement();
// Tail placement_rule was fixed before being committed to the SLAB; if it looks bad -> abort
return ret;
}
+ bool exclusive = true; // block overwrite
ret = tail_ioctx.create(tail_oid, exclusive);
if (ret == 0) {
ldpp_dout(dpp, 20) << __func__ << "::successfully created: " << tail_oid << dendl;
ldpp_dout(dpp, 1) << __func__ << "::ERROR: failed to write " << tail_oid
<< " with: " << cpp_strerror(-ret) << dendl;
// don't leave orphan object behind
- tail_ioctx.remove(tail_oid);
+ int ret_rmv = tail_ioctx.remove(tail_oid);
+ if (ret_rmv != 0) {
+ ldpp_dout(dpp, 1) << __func__ << "::ERROR: failed to remove " << tail_oid
+ << " with: " << cpp_strerror(-ret_rmv) << dendl;
+ }
return ret;
}
else {
<< ret << dendl;
}
- std::map<uint64_t, RGWObjManifestPart> objs_map;
- build_explicit_objs_map(dpp, rados, src_manifest, p_bucket, &objs_map,
- tail_name, p_stats);
- set_explicit_manifest(&src_manifest, objs_map);
+ build_and_set_explicit_manifest(dpp, p_bucket, tail_name, &src_manifest);
bufferlist manifest_bl;
encode(src_manifest, manifest_bl);
// read the manifest and strong hash from the head-object attributes
ldpp_dout(dpp, 20) << __func__ << "::Fetch SRC strong hash from head-object::"
<< p_src_rec->obj_name << dendl;
- if (unlikely(read_hash_and_manifest(dpp, driver, rados, p_src_rec) != 0)) {
+ if (unlikely(read_hash_and_manifest(dpp, driver, store, p_src_rec) != 0)) {
return false;
}
try {
}
//---------------------------------------------------------------------------
- static bool parse_manifests(const DoutPrefixProvider *dpp,
- const disk_record_t *p_src_rec,
- const disk_record_t *p_tgt_rec,
- RGWObjManifest *p_src_manifest,
- RGWObjManifest *p_tgt_manifest)
+ static int parse_manifests(const DoutPrefixProvider *dpp,
+ const disk_record_t *p_src_rec,
+ const disk_record_t *p_tgt_rec,
+ RGWObjManifest *p_src_manifest,
+ RGWObjManifest *p_tgt_manifest)
{
bool valid_src_manifest = false;
try {
const RGWObjManifest &tgt_manifest,
md5_stats_t *p_stats)
{
+ // The only case leading to this scenario is server-side-copy
+
+ // server-side-copy can only share tail-objects
+ // since no tail-objects exists -> no sharing could be possible
+ if (!tgt_manifest.has_tail()) {
+ return false;
+ }
+
+ // tail objects with a non-explicit manifest are simply the prefix plus a running count
+ if (!tgt_manifest.has_explicit_objs() && !src_manifest.has_explicit_objs()) {
+ return (tgt_manifest.get_prefix() == src_manifest.get_prefix());
+ }
+
// Build a vector with all tail-objects on the SRC and then iterate over
// the TGT tail-objects looking for a single tail-object in both manifests.
- // If found -> abort the dedup
- // The only case leading to this scenario is server-side-copy
- // It is probably enough to scan the first few tail-objects, but better safe...
+ // It is enough to scan the first few tail-objects
+
+ constexpr unsigned MAX_OBJ_TO_COMPARE = 4;
std::string src_oid = build_oid(p_src_rec->bucket_id, p_src_rec->obj_name);
std::string tgt_oid = build_oid(p_tgt_rec->bucket_id, p_tgt_rec->obj_name);
std::vector<std::string> vec;
else {
ldpp_dout(dpp, 20) << __func__ << "::[" << idx <<"] Skip HEAD OBJ: "
<< raw_obj.oid << dendl;
- continue;
+ }
+ if (idx >= MAX_OBJ_TO_COMPARE) {
+ break;
}
}
+
idx = 0;
for (auto p = tgt_manifest.obj_begin(dpp); p != tgt_manifest.obj_end(dpp); ++p, ++idx) {
rgw_raw_obj raw_obj = p.get_location().get_raw_obj(rados);
else {
ldpp_dout(dpp, 20) << __func__ << "::[" << idx <<"] Skip HEAD OBJ: "
<< raw_obj.oid << dendl;
- continue;
+ }
+ if (idx >= MAX_OBJ_TO_COMPARE) {
+ break;
}
}
return 0;
}
+ RGWObjManifest src_manifest, tgt_manifest;
ret = parse_manifests(dpp, p_src_rec, p_tgt_rec, &src_manifest, &tgt_manifest);
if (unlikely(ret != 0)) {
return 0;
MB=(1024*KB)
POTENTIAL_OBJ_SIZE=(64*KB)
DEDUP_MIN_OBJ_SIZE=(64*KB)
-SPLIT_HEAD_SIZE=(16*MB)
+SPLIT_HEAD_SIZE=(4*MB)
RADOS_OBJ_SIZE=(4*MB)
# The default multipart threshold size for S3cmd is 15 MB.
MULTIPART_SIZE=(15*MB)
listing=conn.list_objects_v2(**list_args)
if 'Contents' not in listing or len(listing['Contents'])== 0:
- return 0
+ return obj_count
obj_count += len(listing['Contents'])
return obj_count
-#-------------------------------------------------------------------------------
-def copy_obj(base_bucket_name, base_key, bucket_name, key):
- s3_prefix="s3://"
- src = s3_prefix + base_bucket_name + "/" + base_key
- dest = s3_prefix + bucket_name + "/" + key
- result = bash(['s3cmd', 'cp', src, dest])
- assert result[1] == 0
-
#-------------------------------------------------------------------------------
def count_object_parts_in_all_buckets(verbose=False, expected_size=0):
result = rados(['lspools'])
if (rados_count > 1000):
### we can only do about 10 stat call per-second!!
### TBD: add obj_size to ls output to allow more efficient size check
- log.info(">>> rados obj_count(%d) is too high -> skip stat check\n",
+ log.info(">>> rados obj_count(%d) is too high -> skip stat check",
len(names))
expected_size = 0
for error in response['Errors']:
log.error("delete_objects::ERROR::Key=%s, Code=%s, Message=%s",
error['Key'], error['Code'], error['Message'])
-
+ assert(0)
else:
log.debug("All objects deleted successfully.")
listing=conn.list_objects_v2(**list_args)
if 'Contents' not in listing or len(listing['Contents'])== 0:
log.debug("Bucket '%s' is empty, skipping...", bucket_name)
- return
+ break
objects=[]
for obj in listing['Contents']:
log.debug("Based on calculation we should have %d duplicated tail objs", duplicated_tail_objs)
log.debug("Based on calculation we should have %.2f MiB total in pool", total_space/MB)
log.debug("Based on calculation we should have %.2f MiB duplicated space in pool", duplicated_space/MB)
- log.info("split_head_objs=%d, rados_objects_total=%d, duplicated_tail_objs=%d",
+ log.debug("split_head_objs=%d, rados_objects_total=%d, duplicated_tail_objs=%d",
split_head_objs, rados_objects_total, duplicated_tail_objs)
expected_rados_obj_count_post_dedup=(split_head_objs+rados_objects_total-duplicated_tail_objs)
log.debug("Post dedup expcted rados obj count = %d", expected_rados_obj_count_post_dedup)
log.debug("upload_objects::<%s/%s>", bucket_names[ten_id], key)
log.debug("==========================================")
- log.debug("Summery:%d S3 objects were uploaded (%d rados objects), total size = %.2f MiB",
+ log.debug("Summary:%d S3 objects were uploaded (%d rados objects), total size = %.2f MiB",
s3_objects_total, rados_objects_total, total_space/MB)
log.debug("Based on calculation we should have %d rados objects", rados_objects_total)
log.debug("Based on calculation we should have %d duplicated tail objs", duplicated_tail_objs)
proc_list[idx].join()
log.debug("==========================================")
- log.debug("Summery:%d S3 objects were uploaded (%d rados objects), total size = %.2f MiB",
+ log.debug("Summary:%d S3 objects were uploaded (%d rados objects), total size = %.2f MiB",
s3_objects_total, rados_objects_total, total_space/MB)
log.debug("Based on calculation we should have %d rados objects", rados_objects_total)
log.debug("Based on calculation we should have %d duplicated tail objs", duplicated_tail_objs)
key=obj['Key']
log.debug("check_if_any_obj_exists: key=%s", key)
if obj['Key'] in delete_set:
- log.info("key <%s> was found in bucket", key)
+ log.warning("Deleted key <%s> was found in bucket", key)
+ return True
if 'NextContinuationToken' in listing:
continuation_token = listing['NextContinuationToken']
else:
break
+ return False
#-------------------------------------------------------------------------------
def delete_objects_multi(conns, bucket_names, ten_id, object_keys):
verify=True
if verify:
log.debug("delete_dup_objects: verify delete_list_total")
- check_if_any_obj_exists(bucket_name, delete_list_total, conn)
+ assert(check_if_any_obj_exists(bucket_name, delete_list_total, conn)==False)
# must call garbage collection for predictable count
result = admin(['gc', 'process', '--include-all'])
for i in range(1, num_copies):
filecmp.clear_cache()
key = gen_object_name(filename, i)
- log.debug("comparing object %s with file %s", key, filename)
ten_id = i % max_tenants
+ log.debug("comparing object %s/%s with file %s", bucket_names[ten_id], key, filename)
conns[ten_id].download_file(bucket_names[ten_id], key, tmpfile,
Config=config)
equal = filecmp.cmp(tmpfile, OUT_DIR + filename, shallow=False)
for f in files:
filename=f[0]
key = gen_object_name(filename, i)
- log.debug("comparing object %s with file %s", key, filename)
ten_id = i % max_tenants
+ log.debug("comparing object %s/%s with file %s", bucket_names[ten_id], key, filename)
conns[ten_id].download_file(bucket_names[ten_id], key, tmpfile,
Config=config)
equal = filecmp.cmp(tmpfile, OUT_DIR + filename, shallow=False)
#-------------------------------------------------------------------------------
-def read_dedup_ratio(json):
+def read_dedup_ratio(jstats, field):
dedup_ratio=Dedup_Ratio()
+ json=jstats[field]
dedup_ratio.s3_bytes_before=json['s3_bytes_before']
dedup_ratio.s3_bytes_after=json['s3_bytes_after']
dedup_ratio.ratio=json['dedup_ratio']
- log.debug("Completed! ::ratio=%f", dedup_ratio.ratio)
+ log.debug("%s::before=%d, after=%d, ratio=%f", field,
+ dedup_ratio.s3_bytes_before, dedup_ratio.s3_bytes_after,
+ dedup_ratio.ratio)
return dedup_ratio
#-------------------------------------------------------------------------------
dedup_work_was_completed=jstats['completed']
if dedup_work_was_completed:
- dedup_ratio_estimate=read_dedup_ratio(jstats['dedup_ratio_estimate'])
- dedup_ratio_actual=read_dedup_ratio(jstats['dedup_ratio_actual'])
+ dedup_ratio_estimate=read_dedup_ratio(jstats, 'dedup_ratio_estimate')
+ dedup_ratio_actual=read_dedup_ratio(jstats, 'dedup_ratio_actual')
else:
log.debug("Uncompleted!")
if dry_run == False:
log.debug("Verify all objects")
verify_objects(bucket_name, files, conn, expected_results, config, run_cleanup_after)
+
+ return ret
finally:
if run_cleanup_after:
# cleanup must be executed even after a failure
cleanup(bucket_name, conn)
- return ret
-
-
#-------------------------------------------------------------------------------
def simple_dedup_with_tenants(files, conns, bucket_names, config, dry_run=False):
indices=[0] * len(files)
rados_obj_total = 0
duplicated_tail_objs = 0
+ split_head_objs = 0
for key, value in size_dict.items():
size = value
num_copies = num_copies_dict[key]
assert num_copies > 0
rados_obj_count = calc_rados_obj_count(num_copies, size, config)
rados_obj_total += (rados_obj_count * num_copies)
+ split_head_objs += calc_split_objs_count(size, num_copies, config)
duplicated_tail_objs += ((num_copies-1) * (rados_obj_count-1))
# versioned buckets hold an extra rados-obj per versioned S3-Obj
unique_s3_objs_count = len(unique_s3_objs)
rados_obj_total += unique_s3_objs_count
- rados_obj_count_post_dedup=(rados_obj_total-duplicated_tail_objs)
+ rados_obj_count_post_dedup=(split_head_objs+rados_obj_total-duplicated_tail_objs)
log.debug("calc::rados_obj_total=%d, rados_obj_count_post_dedup=%d",
rados_obj_total, rados_obj_count_post_dedup)
return(rados_obj_total, rados_obj_count_post_dedup, unique_s3_objs_count)
# finally make sure no rados-object was left behind after the last ver was removed
@pytest.mark.basic_test
def test_dedup_with_versions():
- #return
-
if full_dedup_is_disabled():
return
# enable versioning
conn.put_bucket_versioning(Bucket=bucket_name,
VersioningConfiguration={"Status": "Enabled"})
+ print_bucket_versioning(conn, bucket_name)
ver_count=7
first_time=True
for i in range(0, ver_count):
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
def test_dedup_etag_corruption():
- #return
-
if full_dedup_is_disabled():
return
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
def test_md5_collisions():
- #return
-
if full_dedup_is_disabled():
return
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
def test_dedup_split_head_with_tenants():
- #return
-
if full_dedup_is_disabled():
return
#-------------------------------------------------------------------------------
def loop_dedup_split_head():
prepare_test()
- #bucket_name = gen_bucket_name()
- bucket_name = "bucket1"
+ bucket_name = "splitheadbucket"
config=default_config
- max_copies_count=4
files=[]
+ max_copies_count=4
num_files=11 # [16KB-32MB]
base_size = 16*KB
log.debug("generate files: base size=%d KiB, max_size=%d KiB",
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
-def test_dedup_split_head():
- #return
-
+def test_dedup_split_head_simple():
if full_dedup_is_disabled():
return
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
def test_dedup_copy():
- #return
dedup_copy_internal(False)
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
def test_dedup_copy_multi_buckets():
- #return
dedup_copy_internal(True)
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
-def test_dedup_small():
- #return
+def test_copy_after_dedup():
+ if full_dedup_is_disabled():
+ return
+
+ prepare_test()
+ log.debug("test_copy_after_dedup: connect to AWS ...")
+ max_copies_count=3
+ num_files=8
+ files=[]
+ min_size=8*MB
+
+ # create files in range [8MB, 32MB] aligned on RADOS_OBJ_SIZE
+ gen_files_in_range(files, num_files, min_size, min_size*4)
+
+ # add file with excatly MULTIPART_SIZE
+ write_random(files, MULTIPART_SIZE, 2, 2)
+ bucket_cp= gen_bucket_name()
+ bucket_names=[]
+ try:
+ conn = get_single_connection()
+ conn.create_bucket(Bucket=bucket_cp)
+ bucket_names=create_buckets(conn, max_copies_count)
+ conns=[conn] * max_copies_count
+ dry_run=False
+ ret = simple_dedup_with_tenants(files, conns, bucket_names, default_config,
+ dry_run)
+ expected_results = ret[0]
+ dedup_stats = ret[1]
+ cp_head_count=0
+ for f in files:
+ filename=f[0]
+ obj_size=f[1]
+ num_copies=f[2]
+ for i in range(0, num_copies):
+ key = gen_object_name(filename, i)
+ key_cp = key + "_cp"
+ bucket_name = bucket_names[i]
+ base_obj = {'Bucket': bucket_name, 'Key': key}
+ log.debug("copy_object({%s, %s} -> %s/%s", bucket_name, key, bucket_cp, key_cp);
+ conn.copy_object(CopySource=base_obj, Bucket=bucket_cp, Key=key_cp)
+ cp_head_count += 1
+
+ assert (expected_results + cp_head_count) == count_object_parts_in_all_buckets(False, 0)
+ finally:
+ # cleanup must be executed even after a failure
+ delete_bucket_with_all_objects(bucket_cp, conn)
+ cleanup_all_buckets(bucket_names, conns)
+
+#-------------------------------------------------------------------------------
+@pytest.mark.basic_test
+def test_dedup_small():
if full_dedup_is_disabled():
return
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
def test_dedup_small_with_tenants():
- #return
-
if full_dedup_is_disabled():
return
# should be made to the system
@pytest.mark.basic_test
def test_dedup_inc_0_with_tenants():
- #return
-
if full_dedup_is_disabled():
return
# should be made to the system
@pytest.mark.basic_test
def test_dedup_inc_0():
- #return
-
if full_dedup_is_disabled():
return
# 3) Run another dedup
@pytest.mark.basic_test
def test_dedup_inc_1_with_tenants():
- #return
-
if full_dedup_is_disabled():
return
# 3) Run another dedup
@pytest.mark.basic_test
def test_dedup_inc_1():
- #return
-
if full_dedup_is_disabled():
return
# 4) Run another dedup
@pytest.mark.basic_test
def test_dedup_inc_2_with_tenants():
- #return
-
if full_dedup_is_disabled():
return
# 4) Run another dedup
@pytest.mark.basic_test
def test_dedup_inc_2():
- #return
-
if full_dedup_is_disabled():
return
# 3) Run another dedup
@pytest.mark.basic_test
def test_dedup_inc_with_remove_multi_tenants():
- #return
if full_dedup_is_disabled():
return
# 3) Run another dedup
@pytest.mark.basic_test
def test_dedup_inc_with_remove():
- #return
if full_dedup_is_disabled():
return
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
def test_dedup_multipart_with_tenants():
- #return
-
if full_dedup_is_disabled():
return
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
def test_dedup_multipart():
- #return
-
if full_dedup_is_disabled():
return
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
def test_dedup_basic_with_tenants():
- #return
-
if full_dedup_is_disabled():
return
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
def test_dedup_basic():
- #return
-
if full_dedup_is_disabled():
return
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
def test_dedup_small_multipart_with_tenants():
- #return
-
if full_dedup_is_disabled():
return
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
def test_dedup_small_multipart():
- #return
-
if full_dedup_is_disabled():
return
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
def test_dedup_large_scale_with_tenants():
- #return
-
if full_dedup_is_disabled():
return
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
def test_dedup_large_scale():
- #return
-
if full_dedup_is_disabled():
return
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
def test_empty_bucket():
- #return
-
if full_dedup_is_disabled():
return
@pytest.mark.basic_test
#@pytest.mark.inc_test
def test_dedup_inc_loop_with_tenants():
- #return
-
if full_dedup_is_disabled():
return
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
def test_dedup_dry_small_with_tenants():
- #return
-
log.debug("test_dedup_dry_small_with_tenants: connect to AWS ...")
prepare_test()
max_copies_count=3
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
def test_dedup_dry_multipart():
- #return
-
prepare_test()
bucket_name = gen_bucket_name()
log.debug("test_dedup_dry_multipart: connect to AWS ...")
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
def test_dedup_dry_basic():
- #return
-
prepare_test()
bucket_name = gen_bucket_name()
log.debug("test_dedup_dry_basic: connect to AWS ...")
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
def test_dedup_dry_small_multipart():
- #return
-
prepare_test()
log.debug("test_dedup_dry_small_multipart: connect to AWS ...")
config2 = TransferConfig(multipart_threshold=4*KB, multipart_chunksize=1*MB)
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
def test_dedup_dry_small():
- #return
-
bucket_name = gen_bucket_name()
log.debug("test_dedup_dry_small: connect to AWS ...")
conn=get_single_connection()
# 6) verify that dedup ratio is reported correctly
@pytest.mark.basic_test
def test_dedup_dry_small_large_mix():
- #return
-
dry_run=True
log.debug("test_dedup_dry_small_large_mix: connect to AWS ...")
prepare_test()
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
def test_dedup_dry_basic_with_tenants():
- #return
-
prepare_test()
max_copies_count=3
num_files=23
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
def test_dedup_dry_multipart_with_tenants():
- #return
-
prepare_test()
log.debug("test_dedup_dry_multipart_with_tenants: connect to AWS ...")
max_copies_count=3
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
def test_dedup_dry_small_multipart_with_tenants():
- #return
-
prepare_test()
max_copies_count=4
num_files=10
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
def test_dedup_dry_large_scale_with_tenants():
- #return
-
prepare_test()
max_copies_count=3
num_threads=64
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
def test_dedup_dry_large_scale():
- #return
-
prepare_test()
bucket_name = gen_bucket_name()
max_copies_count=2
#-------------------------------------------------------------------------------
def __test_dedup_identical_copies(files, config, dry_run, verify, force_clean=False):
+ finished=False
num_threads=32
bucket_name = "bucket1"
conns=get_connections(num_threads)
end_time = time.time_ns()
log.info("Verify all objects time = %d(sec)",
(end_time - start_time)/1_000_000_000)
+ finished=True
finally:
# cleanup must be executed even after a failure
- if not dry_run or force_clean:
+ if not dry_run or force_clean or not finished:
log.info("cleanup bucket")
cleanup(bucket_name, conns[0])
@pytest.mark.basic_test
def test_dedup_identical_copies_1():
num_files=1
- copies_count=64*1024+1
+ copies_count=1024
size=64*KB
config=default_config
prepare_test()
log.info("test_dedup_identical_copies:full test")
__test_dedup_identical_copies(files, config, dry_run, verify, force)
-#-------------------------------------------------------------------------------
-@pytest.mark.basic_test
-def test_dedup_identical_copies_multipart():
- num_files=1
- copies_count=64*1024+1
- size=16*KB
- prepare_test()
- files=[]
- gen_files_fixed_copies(files, num_files, size, copies_count)
- config=TransferConfig(multipart_threshold=size, multipart_chunksize=size)
- # start with a dry_run
- dry_run=True
- verify=False
- log.info("test_dedup_identical_copies_multipart:dry test")
- __test_dedup_identical_copies(files, config, dry_run, verify)
-
- # and then perform a full dedup
- dry_run=False
- verify=False
- force_clean=True
- log.info("test_dedup_identical_copies_multipart:full test")
- __test_dedup_identical_copies(files, config, dry_run, verify, force_clean)
-
#-------------------------------------------------------------------------------
@pytest.mark.basic_test
def test_dedup_identical_copies_multipart_small():
files=[]
gen_files_fixed_copies(files, num_files, size, copies_count)
config=TransferConfig(multipart_threshold=size, multipart_chunksize=size)
+
# start with a dry_run
dry_run=True
verify=False
log.info("test_dedup_identical_copies_multipart:full test")
__test_dedup_identical_copies(files, config, dry_run, verify, force_clean)
+
+#-------------------------------------------------------------------------------
+@pytest.mark.basic_test
+def test_copy_single_obj():
+    # NOTE(review): the unconditional 'return' below disables this test — the
+    # body is dead code. It targets hard-coded buckets ("bucket2"/"bucket3"),
+    # so presumably it is a manual debug helper left in deliberately; confirm
+    # intent before merging.
+    return
+    conn=get_single_connection()
+    base_obj = {'Bucket': "bucket2", 'Key': "rados2"}
+    conn.copy_object(CopySource=base_obj, Bucket="bucket3", Key="rados3")
+