From: Pritha Srivastava Date: Fri, 8 Mar 2024 08:07:34 +0000 (+0530) Subject: rgw/d4n: implementation of caching head in read workflow. X-Git-Tag: v20.3.0~8^2~38 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=1febf88bb6d2343ed71730aaa2b9e182a2671105;p=ceph.git rgw/d4n: implementation of caching head in read workflow. modifications in ReadOp::prepare() method of the d4n filter driver to cache the head object. modification in get_obj_attrs to read from cache or backend store. Signed-off-by: Pritha Srivastava --- diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.cc b/src/rgw/driver/d4n/rgw_sal_d4n.cc index 3903e81dfddc..605b8f9b6a7a 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.cc +++ b/src/rgw/driver/d4n/rgw_sal_d4n.cc @@ -137,60 +137,193 @@ int D4NFilterObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattr return next->set_obj_attrs(dpp, setattrs, delattrs, y, flags); } +bool D4NFilterObject::get_obj_attrs_from_cache(const DoutPrefixProvider* dpp, optional_yield y) +{ + rgw::d4n::BlockDirectory* blockDir = this->driver->get_block_dir(); + rgw::d4n::CacheObj object = rgw::d4n::CacheObj{ + .objName = this->get_name(), + .bucketName = this->get_bucket()->get_name(), + }; + + rgw::d4n::CacheBlock block = rgw::d4n::CacheBlock{ + .cacheObj = object, + .blockID = 0, + .version = version, + .size = 0 + }; + + bool found_in_cache = true; + //if the block corresponding to head object does not exist in directory, implies it is not cached + if (blockDir->exist_key(&block, y) && (blockDir->get(&block, y) == 0)) { + rgw::sal::Attrs attrs; + std::string version = block.version; + this->set_object_version(version); + //uniform name for versioned and non-versioned objects, since input for versioned objects might not contain version + std::string head_oid_in_cache = get_bucket()->get_name() + "_" + version + "_" + get_name(); + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Fetching attrs from cache." << dendl; + auto ret = this->driver->get_cache_driver()->get_attrs(dpp, head_oid_in_cache, attrs, y); + if (ret < 0) { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver get_attrs method failed." << dendl; + found_in_cache = false; + } else { + /* Set metadata locally */ + RGWQuotaInfo quota_info; + + std::string instance; + for (auto& attr : attrs) { + if (attr.second.length() > 0) { + if (attr.first == "user.rgw.mtime") { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting mtime." << dendl; + auto mtime = ceph::real_clock::from_double(std::stod(attr.second.c_str())); + this->set_mtime(mtime); + } else if (attr.first == "user.rgw.object_size") { + auto size = std::stoull(attr.second.c_str()); + this->set_obj_size(size); + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting object_size to: " << size << dendl; + } else if (attr.first == "user.rgw.accounted_size") { + auto accounted_size = std::stoull(attr.second.c_str()); + this->set_accounted_size(accounted_size); + } else if (attr.first == "user.rgw.epoch") { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting epoch." << dendl; + auto epoch = std::stoull(attr.second.c_str()); + this->set_epoch(epoch); + } else if (attr.first == "user.rgw.version_id") { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting version_id." << dendl; + instance = attr.second.to_str(); + } else if (attr.first == "user.rgw.source_zone") { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting source zone id." << dendl; + auto zone_short_id = static_cast(std::stoul(attr.second.c_str())); + this->set_short_zone_id(zone_short_id); + } else { + ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): Unexpected attribute; not locally set, attr name: " << attr.first << dendl; + } + }//end-if + }//end-for + //this->set_obj_state(astate); + this->set_instance(instance); //set this only after setting object state else it won't take effect + attrs.erase("user.rgw.mtime"); + attrs.erase("user.rgw.object_size"); + attrs.erase("user.rgw.accounted_size"); + attrs.erase("user.rgw.epoch"); + /* Set attributes locally */ + ret = this->set_attrs(attrs); + if (ret < 0) { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): D4NFilterObject set_attrs method failed." << dendl; + } + } + } else { + found_in_cache = false; + } + + return found_in_cache; +} + +void D4NFilterObject::set_obj_state_attrs(const DoutPrefixProvider* dpp, optional_yield y, rgw::sal::Attrs& attrs) +{ + bufferlist bl_val; + bl_val.append(std::to_string(this->get_size())); + attrs["user.rgw.object_size"] = std::move(bl_val); + + bl_val.append(std::to_string(this->get_epoch())); + attrs["user.rgw.epoch"] = std::move(bl_val); + + bl_val.append(std::to_string(ceph::real_clock::to_double(this->get_mtime()))); + attrs["user.rgw.mtime"] = std::move(bl_val); + + if(this->have_instance()) { + bl_val.append(this->get_instance()); + attrs["user.rgw.version_id"] = std::move(bl_val); + } + + bl_val.append(std::to_string(this->get_short_zone_id())); + attrs["user.rgw.source_zone"] = std::move(bl_val); + + bl_val.append(std::to_string(this->get_accounted_size())); + attrs["user.rgw.accounted_size"] = std::move(bl_val); // will this get updated? + + return; +} + +int D4NFilterObject::calculate_version(const DoutPrefixProvider* dpp, optional_yield y, std::string& version) +{ + //versioned objects have instance set to versionId, and get_oid() returns oid containing instance, hence using id tag as version for non versioned objects only + if (! this->have_instance() && version.empty()) { + bufferlist bl; + if (this->get_attr(RGW_ATTR_ID_TAG, bl)) { + version = bl.c_str(); + ldpp_dout(dpp, 20) << __func__ << " id tag version is: " << version << dendl; + } else { + ldpp_dout(dpp, 20) << __func__ << " Failed to find id tag" << dendl; + return -ENOENT; + } + } + bufferlist bl; + if (this->have_instance()) { + version = this->get_instance(); + } + + this->set_object_version(version); + + return 0; +} + +int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optional_yield y) +{ + rgw::d4n::BlockDirectory* blockDir = this->driver->get_block_dir(); + rgw::d4n::CacheObj object = rgw::d4n::CacheObj{ + .objName = this->get_name(), + .bucketName = this->get_bucket()->get_name(), + }; + + rgw::d4n::CacheBlock block = rgw::d4n::CacheBlock{ + .cacheObj = object, + .blockID = 0, + .version = this->get_object_version(), + .size = 0 + }; + + auto ret = blockDir->set(&block, y); + if (ret < 0) { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl; + } + + return ret; +} + int D4NFilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj) { - rgw::sal::Attrs attrs; - - if (driver->get_cache_driver()->get_attrs(dpp, this->get_key().get_oid(), attrs, y) < 0) { - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver get_attrs method failed." << dendl; - return next->get_obj_attrs(y, dpp, target_obj); - } else { - /* Set metadata locally */ - RGWQuotaInfo quota_info; + if (!get_obj_attrs_from_cache(dpp, y)) { + std::string head_oid_in_cache; + rgw::sal::Attrs attrs; + std::string version; + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Fetching attrs from backend store." << dendl; + auto ret = next->get_obj_attrs(y, dpp, target_obj); + if (ret < 0) { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Failed to fetching attrs from backend store with ret: " << ret << dendl; + return ret; + } + this->load_obj_state(dpp, y); + attrs = this->get_attrs(); + this->set_obj_state_attrs(dpp, y, attrs); - for (auto it = attrs.begin(); it != attrs.end(); ++it) { - if (it->second.length() > 0) { - if (it->first == "mtime") { - ceph::real_time mtime; - parse_time(it->second.c_str(), &mtime); - this->set_mtime(mtime); - attrs.erase(it->first); - } else if (it->first == "object_size") { - this->set_obj_size(std::stoull(it->second.c_str())); - attrs.erase(it->first); - } else if (it->first == "accounted_size") { - this->set_accounted_size(std::stoull(it->second.c_str())); - attrs.erase(it->first); - } else if (it->first == "epoch") { - this->set_epoch(std::stoull(it->second.c_str())); - attrs.erase(it->first); - } else if (it->first == "version_id") { - this->set_instance(it->second.c_str()); - attrs.erase(it->first); - } else if (it->first == "this_zone_short_id") { - this->set_short_zone_id(static_cast(std::stoul(it->second.c_str()))); - attrs.erase(it->first); - } else if (it->first == "user_quota.max_size") { - quota_info.max_size = std::stoull(it->second.c_str()); - attrs.erase(it->first); - } else if (it->first == "user_quota.max_objects") { - quota_info.max_objects = std::stoull(it->second.c_str()); - attrs.erase(it->first); - } else if (it->first == "max_buckets") { - attrs.erase(it->first); - } else { - ldpp_dout(dpp, 20) << "D4N Filter: Unexpected attribute; not locally set." << dendl; - attrs.erase(it->first); - } - } + ret = calculate_version(dpp, y, version); + if (ret < 0 || version.empty()) { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): version could not be calculated." << dendl; } - /* Set attributes locally */ - if (this->set_attrs(attrs) < 0) { - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): D4NFilterObject set_attrs method failed." << dendl; - return next->get_obj_attrs(y, dpp, target_obj); + head_oid_in_cache = this->get_bucket()->get_name() + "_" + version + "_" + this->get_name(); + ret = this->driver->get_cache_driver()->set_attrs(dpp, head_oid_in_cache, attrs, y); + if (ret == 0) { + ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << this->get_object_version() << dendl; + this->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, 0, version, y); + ret = set_head_obj_dir_entry(dpp, y); + if (ret < 0) { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl; + } + } else { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): failed to cache head object in block dir with error: " << ret << dendl; } } @@ -279,79 +412,54 @@ std::unique_ptr D4NFilterObject::get_delete_op() int D4NFilterObject::D4NFilterReadOp::prepare(optional_yield y, const DoutPrefixProvider* dpp) { - next->params.mod_ptr = params.mod_ptr; - next->params.unmod_ptr = params.unmod_ptr; - next->params.high_precision_time = params.high_precision_time; - next->params.mod_zone_id = params.mod_zone_id; - next->params.mod_pg_ver = params.mod_pg_ver; - next->params.if_match = params.if_match; - next->params.if_nomatch = params.if_nomatch; - next->params.lastmod = params.lastmod; - int ret = next->prepare(y, dpp); - - rgw::sal::Attrs attrs; - - if (source->driver->get_cache_driver()->get_attrs(dpp, source->get_key().get_oid(), attrs, y) < 0) { - ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::" << __func__ << "(): CacheDriver get_attrs method failed." << dendl; - } else { - /* Set metadata locally */ - RGWQuotaInfo quota_info; - source->load_obj_state(dpp, y); - - for (auto& attr : attrs) { - if (attr.second.length() > 0) { - if (attr.first == "mtime") { - ceph::real_time mtime; - parse_time(attr.second.c_str(), &mtime); - source->set_mtime(mtime); - } else if (attr.first == "object_size") { - source->set_obj_size(std::stoull(attr.second.c_str())); - attrs.erase(attr.first); - } else if (attr.first == "accounted_size") { - source->set_accounted_size(std::stoull(attr.second.c_str())); - attrs.erase(attr.first); - } else if (attr.first == "epoch") { - source->set_epoch(std::stoull(attr.second.c_str())); - attrs.erase(attr.first); - } else if (attr.first == "version_id") { - source->set_instance(attr.second.c_str()); - attrs.erase(attr.first); - } else if (attr.first == "source_zone_short_id") { - source->set_short_zone_id(static_cast(std::stoul(attr.second.c_str()))); - attrs.erase(attr.first); - } else if (attr.first == "user_quota.max_size") { - quota_info.max_size = std::stoull(attr.second.c_str()); - attrs.erase(attr.first); - } else if (attr.first == "user_quota.max_objects") { - quota_info.max_objects = std::stoull(attr.second.c_str()); - attrs.erase(attr.first); - } else if (attr.first == "max_buckets") { - attrs.erase(attr.first); - } else { - ldpp_dout(dpp, 20) << "D4NFilterObject::D4NFilterReadOp::" << __func__ << "(): Unexpected attribute; not locally set." << dendl; - } + if (!source->get_obj_attrs_from_cache(dpp, y)) { + std::string head_oid_in_cache; + rgw::sal::Attrs attrs; + std::string version; + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): fetching head object from backend store" << dendl; + next->params = params; + auto ret = next->prepare(y, dpp); + if (ret < 0) { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): next->prepare method failed with error: " << ret << dendl; + return ret; + } + if (params.part_num) { + params.parts_count = next->params.parts_count; + if (params.parts_count > 1) { + ldpp_dout(dpp, 20) << __func__ << "params.part_count: " << params.parts_count << dendl; + return 0; // d4n wont handle multipart read requests with part number for now } - - /* Set attributes locally */ - if (source->set_attrs(attrs) < 0) - ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::" << __func__ << "(): D4NFilterObject set_attrs method failed." << dendl; - } - } + } + this->source->load_obj_state(dpp, y); + attrs = source->get_attrs(); + source->set_obj_state_attrs(dpp, y, attrs); - //versioned objects have instance set to versionId, and get_oid() returns oid containing instance, hence using id tag as version for non versioned objects only - if (! source->have_instance()) { - if (source->load_obj_state(dpp, y) == 0) { - bufferlist bl; - if (source->get_attr(RGW_ATTR_ID_TAG, bl)) { - source->set_object_version(bl.c_str()); - ldpp_dout(dpp, 20) << __func__ << "id tag version is: " << source->get_object_version() << dendl; + ret = source->calculate_version(dpp, y, version); + if (ret < 0 || version.empty()) { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): version could not be calculated." << dendl; + } + + bufferlist bl; + head_oid_in_cache = source->get_bucket()->get_name() + "_" + version + "_" + source->get_name(); + ret = source->driver->get_policy_driver()->get_cache_policy()->eviction(dpp, attrs.size(), y); + if (ret == 0) { + ret = source->driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl, 0, attrs, y); + if (ret == 0) { + ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << this->source->get_object_version() << dendl; + source->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, bl.length(), version, y); + ret = source->set_head_obj_dir_entry(dpp, y); + if (ret < 0) { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl; + } } else { - ldpp_dout(dpp, 20) << __func__ << "Failed to find id tag" << dendl; + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): put for head object failed with error: " << ret << dendl; } + } else { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): failed to cache head object, eviction returned error: " << ret << dendl; } } - - return ret; + + return 0; } void D4NFilterObject::D4NFilterReadOp::cancel() { @@ -400,14 +508,12 @@ int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw:: if (it != blocks_info.end()) { std::string version = source->get_object_version(); std::string prefix = source->get_prefix(); - if (version.empty()) { - version = source->get_instance(); - } std::pair ofs_len_pair = it->second; uint64_t ofs = ofs_len_pair.first; uint64_t len = ofs_len_pair.second; std::string oid_in_cache = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(len); ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " calling update for offset: " << offset << " adjusted offset: " << ofs << " length: " << len << " oid_in_cache: " << oid_in_cache << dendl; + ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << version << " " << source->get_object_version() << dendl; source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, len, version, y); blocks_info.erase(it); } else { @@ -428,7 +534,10 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int const uint64_t window_size = g_conf()->rgw_get_obj_window_size; std::string version = source->get_object_version(); std::string prefix; - if (version.empty()) { //for versioned objects, get_oid() returns an oid with versionId added + /* After prepare() method, for versioned objects, get_oid() returns an oid with versionId added, + * even for versioned objects, where version id is not provided as input + */ + if (source->have_instance()) { prefix = source->get_bucket()->get_name() + "_" + source->get_key().get_oid(); } else { prefix = source->get_bucket()->get_name() + "_" + version + "_" + source->get_key().get_oid(); @@ -463,10 +572,6 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int this->offset = ofs; - if (version.empty()) { - version = source->get_instance(); - } - do { uint64_t id = adjusted_start_ofs, read_ofs = 0; //read_ofs is the actual offset to start reading from the current part/ chunk if (start_part_num == (num_parts - 1)) { @@ -487,8 +592,6 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int ceph::bufferlist bl; std::string oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(part_len); - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "version stored in update method is: " << version << dendl; - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl; @@ -616,9 +719,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl Attrs attrs; // empty attrs for cache sets std::string version = source->get_object_version(); std::string prefix = source->get_prefix(); - if (version.empty()) { - version = source->get_instance(); - } + ldpp_dout(dpp, 20) << __func__ << ": version stored in update method is: " << version << dendl; if (bl.length() > 0 && last_part) { // if bl = bl_rem has data and this is the last part, write it to cache diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.h b/src/rgw/driver/d4n/rgw_sal_d4n.h index 508bc3a60435..40c98d5d2f88 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.h +++ b/src/rgw/driver/d4n/rgw_sal_d4n.h @@ -106,6 +106,10 @@ class D4NFilterObject : public FilterObject { std::string version; std::string prefix; + bool get_obj_attrs_from_cache(const DoutPrefixProvider* dpp, optional_yield y); + void set_obj_state_attrs(const DoutPrefixProvider* dpp, optional_yield y, rgw::sal::Attrs& attrs); + int calculate_version(const DoutPrefixProvider* dpp, optional_yield y, std::string& version); + int set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optional_yield y); public: struct D4NFilterReadOp : FilterReadOp { public: