const rgw::sal::Bucket *p_bucket,
const parsed_etag_t *p_parsed_etag,
const std::string &obj_name,
+ const std::string &instance,
uint64_t obj_size,
const std::string &storage_class)
{
- disk_record_t rec(p_bucket, obj_name, p_parsed_etag, obj_size, storage_class);
+ disk_record_t rec(p_bucket, obj_name, p_parsed_etag, instance, obj_size, storage_class);
// First pass using only ETAG and size taken from bucket-index
rec.s.flags.set_fastlane();
//---------------------------------------------------------------------------
static int get_ioctx(const DoutPrefixProvider* const dpp,
rgw::sal::Driver* driver,
- RGWRados* rados,
+ rgw::sal::RadosStore* store,
const disk_record_t *p_rec,
librados::IoCtx *p_ioctx,
- std::string *oid)
+ std::string *p_oid)
{
unique_ptr<rgw::sal::Bucket> bucket;
{
}
}
- build_oid(p_rec->bucket_id, p_rec->obj_name, oid);
- //ldpp_dout(dpp, 0) << __func__ << "::OID=" << oid << " || bucket_id=" << bucket_id << dendl;
- rgw_pool data_pool;
- rgw_obj obj{bucket->get_key(), *oid};
- if (!rados->get_obj_data_pool(bucket->get_placement_rule(), obj, &data_pool)) {
- ldpp_dout(dpp, 1) << __func__ << "::failed to get data pool for bucket "
- << bucket->get_name() << dendl;
- return -EIO;
- }
- int ret = rgw_init_ioctx(dpp, rados->get_rados_handle(), data_pool, *p_ioctx);
- if (ret < 0) {
- ldpp_dout(dpp, 1) << __func__ << "::ERR: failed to get ioctx from data pool:"
- << data_pool.to_str() << dendl;
- return -EIO;
- }
-
- return 0;
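+  // compute the head-object OID (and locator) for this name+instance,
+  // then open an ioctx on the pool holding the head object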
+ string dummy_locator;
+ const rgw_obj_index_key key(p_rec->obj_name, p_rec->instance);
+ rgw_obj obj(bucket->get_key(), key);
+ get_obj_bucket_and_oid_loc(obj, *p_oid, dummy_locator);
+ RGWBucketInfo& bucket_info = bucket->get_info();
+ return store->get_obj_head_ioctx(dpp, bucket_info, obj, p_ioctx);
}
//---------------------------------------------------------------------------
std::string src_oid, tgt_oid;
librados::IoCtx src_ioctx, tgt_ioctx;
- int ret1 = get_ioctx(dpp, driver, rados, p_src_rec, &src_ioctx, &src_oid);
- int ret2 = get_ioctx(dpp, driver, rados, p_tgt_rec, &tgt_ioctx, &tgt_oid);
+ int ret1 = get_ioctx(dpp, driver, store, p_src_rec, &src_ioctx, &src_oid);
+ int ret2 = get_ioctx(dpp, driver, store, p_tgt_rec, &tgt_ioctx, &tgt_oid);
if (unlikely(ret1 != 0 || ret2 != 0)) {
ldpp_dout(dpp, 1) << __func__ << "::ERR: failed get_ioctx()" << dendl;
return (ret1 ? ret1 : ret2);
<< p_rec->obj_name << ")" << dendl;
return 0;
}
-
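+  // versioned objects: select the exact instance before reading the attributes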
+ p_obj->set_instance(p_rec->instance);
d_ctl.metadata_access_throttle.acquire();
ret = p_obj->get_obj_attrs(null_yield, dpp);
if (unlikely(ret < 0)) {
//---------------------------------------------------------------------------
static int write_blake3_object_attribute(const DoutPrefixProvider* const dpp,
rgw::sal::Driver* driver,
- RGWRados* rados,
+ rgw::sal::RadosStore *store,
const disk_record_t *p_rec)
{
bufferlist etag_bl;
std::string oid;
librados::IoCtx ioctx;
- int ret = get_ioctx(dpp, driver, rados, p_rec, &ioctx, &oid);
+ int ret = get_ioctx(dpp, driver, store, p_rec, &ioctx, &oid);
if (unlikely(ret != 0)) {
ldpp_dout(dpp, 5) << __func__ << "::ERR: failed get_ioctx()" << dendl;
return ret;
<< "/" << src_rec.obj_name << dendl;
// verify that SRC and TGT records don't refer to the same physical object
// This could happen in theory if we read the same objects twice
- if (src_rec.obj_name == p_tgt_rec->obj_name && src_rec.bucket_name == p_tgt_rec->bucket_name) {
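+  // with versioning, bucket+name alone is not unique, so compare ref-tags instead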
+ if (src_rec.ref_tag == p_tgt_rec->ref_tag) {
p_stats->duplicate_records++;
- ldpp_dout(dpp, 10) << __func__ << "::WARN: Duplicate records for object="
- << src_rec.obj_name << dendl;
+ ldpp_dout(dpp, 10) << __func__ << "::WARN::REF_TAG::Duplicate records for "
+ << src_rec.obj_name << "::" << src_rec.ref_tag << "::"
+ << p_tgt_rec->obj_name << dendl;
return 0;
}
ldpp_dout(dpp, 10) << __func__ << "::HASH mismatch" << dendl;
// TBD: set hash attributes on head objects to save calc next time
if (src_rec.s.flags.hash_calculated()) {
- write_blake3_object_attribute(dpp, driver, rados, &src_rec);
+ write_blake3_object_attribute(dpp, driver, store, &src_rec);
p_stats->set_hash_attrs++;
}
if (p_tgt_rec->s.flags.hash_calculated()) {
- write_blake3_object_attribute(dpp, driver, rados, p_tgt_rec);
+ write_blake3_object_attribute(dpp, driver, store, p_tgt_rec);
p_stats->set_hash_attrs++;
}
return 0;
}
return add_disk_rec_from_bucket_idx(disk_arr, p_bucket, &parsed_etag,
- entry.key.name, entry.meta.size,
- storage_class);
+ entry.key.name, entry.key.instance,
+ entry.meta.size, storage_class);
}
//---------------------------------------------------------------------------
obj_count += result.dir.m.size();
for (auto& entry : result.dir.m) {
const rgw_bucket_dir_entry& dirent = entry.second;
+ // make sure to advance marker in all cases!
+ marker = dirent.key;
+ ldpp_dout(dpp, 20) << __func__ << "::dirent = " << bucket->get_name() << "/"
+ << marker.name << "::instance=" << marker.instance << dendl;
if (unlikely((!dirent.exists && !dirent.is_delete_marker()) || !dirent.pending_map.empty())) {
// TBD: should we bailout ???
- ldpp_dout(dpp, 1) << __func__ << "::ERR: calling check_disk_state bucket="
- << bucket->get_name() << " entry=" << dirent.key << dendl;
- // make sure we're advancing marker
- marker = dirent.key;
+ ldpp_dout(dpp, 1) << __func__ << "::ERR: bad dirent::" << bucket->get_name()
+ << "/" << marker.name << "::instance=" << marker.instance << dendl;
+ continue;
+ }
+ else if (unlikely(dirent.is_delete_marker())) {
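+      // delete-markers carry no data, so there is nothing to dedup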
+ ldpp_dout(dpp, 20) << __func__ << "::skip delete_marker::" << bucket->get_name()
+ << "/" << marker.name << "::instance=" << marker.instance << dendl;
continue;
}
- marker = dirent.key;
ret = ingress_bucket_idx_single_object(disk_arr, bucket, dirent, p_worker_stats);
}
// TBD: advance marker only once here!
const rgw::sal::Bucket *p_bucket,
const parsed_etag_t *p_parsed_etag,
const std::string &obj_name,
+ const std::string &instance,
uint64_t obj_size,
const std::string &storage_class);
//---------------------------------------------------------------------------
void set_shard(uint16_t shard) {
- int n = snprintf(this->buff + this->prefix_len, BUFF_SIZE, "%03x", shard);
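+    // give snprintf only the space left after the prefix so the shard suffix cannot overrun buff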
+ int n = snprintf(this->buff + this->prefix_len, BUFF_SIZE - this->prefix_len, "%03x", shard);
this->total_len = this->prefix_len + n;
}
inline const char* get_buff() { return this->buff; }
inline unsigned get_buff_size() { return this->total_len; }
private:
- static const unsigned BUFF_SIZE = 15;
+ static const unsigned BUFF_SIZE = 16;
unsigned total_len = 0;
unsigned prefix_len = 0;
char buff[BUFF_SIZE];
disk_record_t::disk_record_t(const rgw::sal::Bucket *p_bucket,
const std::string &obj_name,
const parsed_etag_t *p_parsed_etag,
+ const std::string &instance,
uint64_t obj_size,
const std::string &storage_class)
{
this->s.md5_high = p_parsed_etag->md5_high;
this->s.md5_low = p_parsed_etag->md5_low;
this->s.obj_bytes_size = obj_size;
- this->s.object_version = 0;
this->bucket_id = p_bucket->get_bucket_id();
this->s.bucket_id_len = this->bucket_id.length();
this->tenant_name = p_bucket->get_tenant();
this->s.tenant_name_len = this->tenant_name.length();
+ this->instance = instance;
+ this->s.instance_len = instance.length();
this->stor_class = storage_class;
this->s.stor_class_len = storage_class.length();
this->s.md5_high = CEPHTOH_64(p_rec->s.md5_high);
this->s.md5_low = CEPHTOH_64(p_rec->s.md5_low);
this->s.obj_bytes_size = CEPHTOH_64(p_rec->s.obj_bytes_size);
- this->s.object_version = CEPHTOH_64(p_rec->s.object_version);
this->s.bucket_id_len = CEPHTOH_16(p_rec->s.bucket_id_len);
this->s.tenant_name_len = CEPHTOH_16(p_rec->s.tenant_name_len);
+ this->s.instance_len = CEPHTOH_16(p_rec->s.instance_len);
this->s.stor_class_len = CEPHTOH_16(p_rec->s.stor_class_len);
this->s.ref_tag_len = CEPHTOH_16(p_rec->s.ref_tag_len);
this->s.manifest_len = CEPHTOH_16(p_rec->s.manifest_len);
this->tenant_name = std::string(p, this->s.tenant_name_len);
p += p_rec->s.tenant_name_len;
+ this->instance = std::string(p, this->s.instance_len);
+ p += p_rec->s.instance_len;
+
this->stor_class = std::string(p, this->s.stor_class_len);
p += p_rec->s.stor_class_len;
p_rec->s.md5_high = HTOCEPH_64(this->s.md5_high);
p_rec->s.md5_low = HTOCEPH_64(this->s.md5_low);
p_rec->s.obj_bytes_size = HTOCEPH_64(this->s.obj_bytes_size);
- p_rec->s.object_version = HTOCEPH_64(this->s.object_version);
p_rec->s.bucket_id_len = HTOCEPH_16(this->bucket_id.length());
p_rec->s.tenant_name_len = HTOCEPH_16(this->tenant_name.length());
+ p_rec->s.instance_len = HTOCEPH_16(this->instance.length());
p_rec->s.stor_class_len = HTOCEPH_16(this->stor_class.length());
p_rec->s.ref_tag_len = HTOCEPH_16(this->ref_tag.length());
p_rec->s.manifest_len = HTOCEPH_16(this->manifest_bl.length());
std::memcpy(p, this->tenant_name.data(), len);
p += len;
+ len = this->instance.length();
+ std::memcpy(p, this->instance.data(), len);
+ p += len;
+
len = this->stor_class.length();
std::memcpy(p, this->stor_class.data(), len);
p += len;
this->bucket_name.length() +
this->bucket_id.length() +
this->tenant_name.length() +
+ this->instance.length() +
this->stor_class.length() +
this->ref_tag.length() +
this->manifest_bl.length());
stream << rec.bucket_name << "::" << rec.s.bucket_name_len << "\n";
stream << rec.bucket_id << "::" << rec.s.bucket_id_len << "\n";
stream << rec.tenant_name << "::" << rec.s.tenant_name_len << "\n";
+ stream << rec.instance << "::" << rec.s.instance_len << "\n";
stream << rec.stor_class << "::" << rec.s.stor_class_len << "\n";
stream << rec.ref_tag << "::" << rec.s.ref_tag_len << "\n";
stream << "num_parts = " << rec.s.num_parts << "\n";
disk_record_t(const rgw::sal::Bucket *p_bucket,
const std::string &obj_name,
const parsed_etag_t *p_parsed_etag,
+ const std::string &instance,
uint64_t obj_size,
const std::string &storage_class);
disk_record_t() {}
uint64_t md5_high; // High Bytes of the Object Data MD5
uint64_t md5_low; // Low Bytes of the Object Data MD5
uint64_t obj_bytes_size;
- uint64_t object_version;
uint16_t bucket_id_len;
uint16_t tenant_name_len;
+ uint16_t instance_len;
uint16_t stor_class_len;
uint16_t ref_tag_len;
std::string bucket_id;
std::string tenant_name;
std::string ref_tag;
+ std::string instance;
std::string stor_class;
bufferlist manifest_bl;
};
size_t len,
const DoutPrefixProvider* dpp);
- //---------------------------------------------------------------------------
- static inline void build_oid(const std::string &bucket_id,
- const std::string &obj_name,
- std::string *oid)
- {
- *oid = bucket_id + "_" + obj_name;
- }
-
//---------------------------------------------------------------------------
static inline uint64_t calc_deduped_bytes(uint64_t head_obj_size,
uint16_t num_parts,
return full_dedup_state_disabled
+#==============================================================================
+# RGW Versioning Tests:
+#==============================================================================
+#-------------------------------------------------------------------------------
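+# remove every object version and delete-marker from the bucket, page by page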
+def delete_all_versions(conn, bucket_name, dry_run=False):
+ log.info("delete_all_versions")
+ p_conf = {
+ 'PageSize': 1000 # Request 1000 items per page
+ # MaxItems is omitted to allow unlimited total items
+ }
+ paginator = conn.get_paginator('list_object_versions')
+ to_delete = []
+
+ for page in paginator.paginate(Bucket=bucket_name, PaginationConfig=p_conf):
+ # Collect versions
+ for v in page.get('Versions', []):
+ to_delete.append({'Key': v['Key'], 'VersionId': v['VersionId']})
+
+ # Collect delete markers
+ for dm in page.get('DeleteMarkers', []):
+ to_delete.append({'Key': dm['Key'], 'VersionId': dm['VersionId']})
+
+        # Delete this page's versions in one batch, skipping empty pages
+        # (delete_objects rejects an empty 'Objects' list)
+        if not to_delete:
+            continue
+        if dry_run:
+            log.info("DRY RUN would delete %d objects", len(to_delete))
+        else:
+            conn.delete_objects(Bucket=bucket_name, Delete={'Objects': to_delete})
+            to_delete.clear()
+
+
+#-------------------------------------------------------------------------------
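+# return the number of object versions in the bucket;
+# delete-markers are listed (and logged when verbose) but not counted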
+def list_all_versions(conn, bucket_name, verbose=False):
+ p_conf = {
+ 'PageSize': 1000 # Request 1000 items per page
+ # MaxItems is omitted to allow unlimited total items
+ }
+ paginator = conn.get_paginator("list_object_versions")
+ total_s3_versioned_objs=0
+ for page in paginator.paginate(Bucket=bucket_name, PaginationConfig=p_conf):
+ # normal object versions
+ for v in page.get("Versions", []):
+ total_s3_versioned_objs += 1
+ key = v["Key"]
+ vid = v["VersionId"]
+ size = v.get("Size", 0)
+ is_latest = v.get("IsLatest", False)
+ #etag = v.get("ETag")
+ if verbose:
+ log.info("%s::ver=%s, size=%d, IsLatest=%d",
+ key, vid, size, is_latest)
+
+ # delete markers (no Size)
+ for dm in page.get("DeleteMarkers", []):
+ key = dm["Key"]
+ vid = dm["VersionId"]
+ is_latest = dm.get("IsLatest", False)
+ if verbose:
+ log.info("DeleteMarker::%s::ver=%s, IsLatest=%d",
+ key, vid, is_latest)
+
+ return total_s3_versioned_objs
+
+#-------------------------------------------------------------------------------
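+# create @count files with random sizes in the range [@min_size, @max_size) and
+# record each one as (filename, size, num_copies=1)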
+def gen_files_in_range_single_copy(files, count, min_size, max_size):
+ assert(min_size <= max_size)
+ assert(min_size > 0)
+
+ idx=0
+ size_range = max_size - min_size
+ size=0
+ for i in range(0, count):
+ size = min_size + random.randint(0, size_range-1)
+ idx += 1
+ filename = "OBJ_" + str(idx)
+ files.append((filename, size, 1))
+ write_file(filename, size)
+
+ assert len(files) == count
+
+#-------------------------------------------------------------------------------
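+# on the first pass upload every file under its own name; on later passes upload
+# each file's data under a random existing key to create additional versions,
+# logging (filename, size, key, version_id) for every upload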
+def simple_upload(bucket_name, files, conn, config, op_log, first_time):
+ for f in files:
+ filename=f[0]
+ size=f[1]
+ if first_time:
+ key = filename
+ else:
+ idx=random.randint(0, len(files)-1)
+ key=files[idx][0]
+
+ log.debug("upload_file %s -> %s/%s (%d)", filename, bucket_name, key, size)
+ conn.upload_file(OUT_DIR + filename, bucket_name, key, Config=config)
+ resp = conn.head_object(Bucket=bucket_name, Key=key)
+ version_id = resp.get("VersionId")
+ op_log.append((filename, size, key, version_id))
+
+#-------------------------------------------------------------------------------
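+# predict the rados-object count before and after dedup:
+# a name uploaded c times with n rados-objects per copy contributes c*n + 1
+# objects before dedup (the +1 is the extra per-name object of a versioned
+# bucket) and (c-1)*(n-1) fewer objects once the duplicated tails are removed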
+def ver_calc_rados_obj_count(config, files, op_log):
+ size_dict = {}
+ num_copies_dict = {}
+ unique_s3_objs = set()
+
+ for f in files:
+ filename=f[0]
+ size=f[1]
+ size_dict[filename] = size
+ num_copies_dict[filename] = 0
+
+ for o in op_log:
+ filename=o[0]
+ key=o[2]
+ num_copies_dict[filename] += 1
+ unique_s3_objs.add(key)
+
+ rados_obj_total = 0
+ duplicated_tail_objs = 0
+ for key, value in size_dict.items():
+ size = value
+ num_copies = num_copies_dict[key]
+ assert num_copies > 0
+ rados_obj_count = calc_rados_obj_count(num_copies, size, config)
+ rados_obj_total += (rados_obj_count * num_copies)
+ duplicated_tail_objs += ((num_copies-1) * (rados_obj_count-1))
+
+    # a versioned bucket holds one extra rados-obj per unique S3 object name
+ unique_s3_objs_count = len(unique_s3_objs)
+ rados_obj_total += unique_s3_objs_count
+ rados_obj_count_post_dedup=(rados_obj_total-duplicated_tail_objs)
+ log.debug("calc::rados_obj_total=%d, rados_obj_count_post_dedup=%d",
+ rados_obj_total, rados_obj_count_post_dedup)
+ return(rados_obj_total, rados_obj_count_post_dedup, unique_s3_objs_count)
+
+#-------------------------------------------------------------------------------
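+# download every logged version, compare it against its source file, then
+# delete that version; garbage collection is forced before re-reading a file
+# with tail objects to help detect bad deletions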
+def verify_objects_with_version(bucket_name, op_log, conn, config):
+ tempfile = OUT_DIR + "temp"
+ pend_delete_set = set()
+ for o in op_log:
+ filename=o[0]
+ size=o[1]
+ key=o[2]
+ version_id=o[3]
+ log.debug("verify: %s/%s:: ver=%s", bucket_name, filename, version_id)
+
+        # run garbage collection on tail objects before re-reading the same filename;
+        # this helps detect bad deletions
+ if filename in pend_delete_set:
+ result = admin(['gc', 'process', '--include-all'])
+ assert result[1] == 0
+
+        # only objects larger than RADOS_OBJ_SIZE have tail-objects
+ if size > RADOS_OBJ_SIZE:
+ pend_delete_set.add(filename)
+
+ conn.download_file(Bucket=bucket_name, Key=key, Filename=tempfile,
+ Config=config, ExtraArgs={'VersionId': version_id})
+ result = bash(['cmp', tempfile, OUT_DIR + filename])
+        assert result[1] == 0, "Files %s and %s differ!!" % (key, tempfile)
+ os.remove(tempfile)
+ conn.delete_object(Bucket=bucket_name, Key=key, VersionId=version_id)
+
+
+#-------------------------------------------------------------------------------
+# generate @num_files objects with @ver_count versions each, sized between @min_size and @max_size
+# verify that we got the correct number of rados-objects
+# then dedup and verify that the duplicate tail objects have been removed
+# read-verify *all* objects in all versions, deleting one version after another
+# while making sure the remaining versions are still good
+# finally make sure no rados-object is left behind after the last version is removed
+@pytest.mark.basic_test
+def test_dedup_with_versions():
+
+ if full_dedup_is_disabled():
+ return
+
+ prepare_test()
+ bucket_name = "bucket1"
+ files=[]
+ op_log=[]
+ num_files=43
+ min_size=1*KB
+ max_size=MULTIPART_SIZE*2
+ success=False
+ try:
+ conn=get_single_connection()
+ conn.create_bucket(Bucket=bucket_name)
+ gen_files_in_range_single_copy(files, num_files, min_size, max_size)
+ # enable versioning
+ conn.put_bucket_versioning(Bucket=bucket_name,
+ VersioningConfiguration={"Status": "Enabled"})
+ ver_count=7
+ first_time=True
+ for i in range(0, ver_count):
+ simple_upload(bucket_name, files, conn, default_config, op_log, first_time)
+ first_time=False
+
+ ret=ver_calc_rados_obj_count(default_config, files, op_log)
+ rados_objects_total=ret[0]
+ rados_objects_post_dedup=ret[1]
+ unique_s3_objs_count=ret[2]
+ assert unique_s3_objs_count == num_files
+ log.info("rados_objects_total=%d, rados_objects_post_dedup=%d",
+ rados_objects_total, rados_objects_post_dedup)
+ log.info("unique_s3_objs_count=%d, total_s3_versioned_objs=%d",
+ unique_s3_objs_count, len(op_log))
+ total_s3_versioned_objs=list_all_versions(conn, bucket_name)
+ assert total_s3_versioned_objs == (num_files * ver_count)
+ assert total_s3_versioned_objs == len(op_log)
+ assert rados_objects_total == count_object_parts_in_all_buckets()
+ exec_dedup_internal(Dedup_Stats(), dry_run=False, max_dedup_time=500)
+ assert rados_objects_post_dedup == count_object_parts_in_all_buckets()
+ verify_objects_with_version(bucket_name, op_log, conn, default_config)
+ success=True
+ finally:
+ # cleanup must be executed even after a failure
+ if success == False:
+ delete_all_versions(conn, bucket_name, dry_run=False)
+
+        # otherwise, the objects were already removed by verify_objects_with_version()
+ cleanup(bucket_name, conn)
+
+#==============================================================================
+# ETag Corruption Tests:
+#==============================================================================
CORRUPTIONS = ("no corruption", "change_etag", "illegal_hex_value",
"change_num_parts", "illegal_separator",
"illegal_dec_val_num_parts", "illegal_num_parts_overflow")