return next->set_obj_attrs(dpp, setattrs, delattrs, y, flags);
}
+bool D4NFilterObject::get_obj_attrs_from_cache(const DoutPrefixProvider* dpp, optional_yield y)
+{
+ rgw::d4n::BlockDirectory* blockDir = this->driver->get_block_dir();
+ rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
+ .objName = this->get_name(),
+ .bucketName = this->get_bucket()->get_name(),
+ };
+
+ rgw::d4n::CacheBlock block = rgw::d4n::CacheBlock{
+ .cacheObj = object,
+ .blockID = 0,
+ .version = version,
+ .size = 0
+ };
+
+ bool found_in_cache = true;
+ //if the block corresponding to head object does not exist in directory, implies it is not cached
+ if (blockDir->exist_key(&block, y) && (blockDir->get(&block, y) == 0)) {
+ rgw::sal::Attrs attrs;
+ std::string version = block.version;
+ this->set_object_version(version);
+ //uniform name for versioned and non-versioned objects, since input for versioned objects might not contain version
+ std::string head_oid_in_cache = get_bucket()->get_name() + "_" + version + "_" + get_name();
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Fetching attrs from cache." << dendl;
+ auto ret = this->driver->get_cache_driver()->get_attrs(dpp, head_oid_in_cache, attrs, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver get_attrs method failed." << dendl;
+ found_in_cache = false;
+ } else {
+ /* Set metadata locally */
+ RGWQuotaInfo quota_info;
+
+ std::string instance;
+ for (auto& attr : attrs) {
+ if (attr.second.length() > 0) {
+ if (attr.first == "user.rgw.mtime") {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting mtime." << dendl;
+ auto mtime = ceph::real_clock::from_double(std::stod(attr.second.c_str()));
+ this->set_mtime(mtime);
+ } else if (attr.first == "user.rgw.object_size") {
+ auto size = std::stoull(attr.second.c_str());
+ this->set_obj_size(size);
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting object_size to: " << size << dendl;
+ } else if (attr.first == "user.rgw.accounted_size") {
+ auto accounted_size = std::stoull(attr.second.c_str());
+ this->set_accounted_size(accounted_size);
+ } else if (attr.first == "user.rgw.epoch") {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting epoch." << dendl;
+ auto epoch = std::stoull(attr.second.c_str());
+ this->set_epoch(epoch);
+ } else if (attr.first == "user.rgw.version_id") {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting version_id." << dendl;
+ instance = attr.second.to_str();
+ } else if (attr.first == "user.rgw.source_zone") {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting source zone id." << dendl;
+ auto zone_short_id = static_cast<uint32_t>(std::stoul(attr.second.c_str()));
+ this->set_short_zone_id(zone_short_id);
+ } else {
+ ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): Unexpected attribute; not locally set, attr name: " << attr.first << dendl;
+ }
+ }//end-if
+ }//end-for
+ //this->set_obj_state(astate);
+ this->set_instance(instance); //set this only after setting object state else it won't take effect
+ attrs.erase("user.rgw.mtime");
+ attrs.erase("user.rgw.object_size");
+ attrs.erase("user.rgw.accounted_size");
+ attrs.erase("user.rgw.epoch");
+ /* Set attributes locally */
+ ret = this->set_attrs(attrs);
+ if (ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): D4NFilterObject set_attrs method failed." << dendl;
+ }
+ }
+ } else {
+ found_in_cache = false;
+ }
+
+ return found_in_cache;
+}
+
+void D4NFilterObject::set_obj_state_attrs(const DoutPrefixProvider* dpp, optional_yield y, rgw::sal::Attrs& attrs)
+{
+ bufferlist bl_val;
+ bl_val.append(std::to_string(this->get_size()));
+ attrs["user.rgw.object_size"] = std::move(bl_val);
+
+ bl_val.append(std::to_string(this->get_epoch()));
+ attrs["user.rgw.epoch"] = std::move(bl_val);
+
+ bl_val.append(std::to_string(ceph::real_clock::to_double(this->get_mtime())));
+ attrs["user.rgw.mtime"] = std::move(bl_val);
+
+ if(this->have_instance()) {
+ bl_val.append(this->get_instance());
+ attrs["user.rgw.version_id"] = std::move(bl_val);
+ }
+
+ bl_val.append(std::to_string(this->get_short_zone_id()));
+ attrs["user.rgw.source_zone"] = std::move(bl_val);
+
+ bl_val.append(std::to_string(this->get_accounted_size()));
+ attrs["user.rgw.accounted_size"] = std::move(bl_val); // will this get updated?
+
+ return;
+}
+
+int D4NFilterObject::calculate_version(const DoutPrefixProvider* dpp, optional_yield y, std::string& version)
+{
+ //versioned objects have instance set to versionId, and get_oid() returns oid containing instance, hence using id tag as version for non versioned objects only
+ if (! this->have_instance() && version.empty()) {
+ bufferlist bl;
+ if (this->get_attr(RGW_ATTR_ID_TAG, bl)) {
+ version = bl.c_str();
+ ldpp_dout(dpp, 20) << __func__ << " id tag version is: " << version << dendl;
+ } else {
+ ldpp_dout(dpp, 20) << __func__ << " Failed to find id tag" << dendl;
+ return -ENOENT;
+ }
+ }
+ bufferlist bl;
+ if (this->have_instance()) {
+ version = this->get_instance();
+ }
+
+ this->set_object_version(version);
+
+ return 0;
+}
+
+int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optional_yield y)
+{
+ rgw::d4n::BlockDirectory* blockDir = this->driver->get_block_dir();
+ rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
+ .objName = this->get_name(),
+ .bucketName = this->get_bucket()->get_name(),
+ };
+
+ rgw::d4n::CacheBlock block = rgw::d4n::CacheBlock{
+ .cacheObj = object,
+ .blockID = 0,
+ .version = this->get_object_version(),
+ .size = 0
+ };
+
+ auto ret = blockDir->set(&block, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
+ }
+
+ return ret;
+}
+
int D4NFilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp,
rgw_obj* target_obj)
{
- rgw::sal::Attrs attrs;
-
- if (driver->get_cache_driver()->get_attrs(dpp, this->get_key().get_oid(), attrs, y) < 0) {
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver get_attrs method failed." << dendl;
- return next->get_obj_attrs(y, dpp, target_obj);
- } else {
- /* Set metadata locally */
- RGWQuotaInfo quota_info;
+ if (!get_obj_attrs_from_cache(dpp, y)) {
+ std::string head_oid_in_cache;
+ rgw::sal::Attrs attrs;
+ std::string version;
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Fetching attrs from backend store." << dendl;
+ auto ret = next->get_obj_attrs(y, dpp, target_obj);
+ if (ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Failed to fetching attrs from backend store with ret: " << ret << dendl;
+ return ret;
+ }
+
this->load_obj_state(dpp, y);
+ attrs = this->get_attrs();
+ this->set_obj_state_attrs(dpp, y, attrs);
- for (auto it = attrs.begin(); it != attrs.end(); ++it) {
- if (it->second.length() > 0) {
- if (it->first == "mtime") {
- ceph::real_time mtime;
- parse_time(it->second.c_str(), &mtime);
- this->set_mtime(mtime);
- attrs.erase(it->first);
- } else if (it->first == "object_size") {
- this->set_obj_size(std::stoull(it->second.c_str()));
- attrs.erase(it->first);
- } else if (it->first == "accounted_size") {
- this->set_accounted_size(std::stoull(it->second.c_str()));
- attrs.erase(it->first);
- } else if (it->first == "epoch") {
- this->set_epoch(std::stoull(it->second.c_str()));
- attrs.erase(it->first);
- } else if (it->first == "version_id") {
- this->set_instance(it->second.c_str());
- attrs.erase(it->first);
- } else if (it->first == "this_zone_short_id") {
- this->set_short_zone_id(static_cast<uint32_t>(std::stoul(it->second.c_str())));
- attrs.erase(it->first);
- } else if (it->first == "user_quota.max_size") {
- quota_info.max_size = std::stoull(it->second.c_str());
- attrs.erase(it->first);
- } else if (it->first == "user_quota.max_objects") {
- quota_info.max_objects = std::stoull(it->second.c_str());
- attrs.erase(it->first);
- } else if (it->first == "max_buckets") {
- attrs.erase(it->first);
- } else {
- ldpp_dout(dpp, 20) << "D4N Filter: Unexpected attribute; not locally set." << dendl;
- attrs.erase(it->first);
- }
- }
+ ret = calculate_version(dpp, y, version);
+ if (ret < 0 || version.empty()) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): version could not be calculated." << dendl;
}
- /* Set attributes locally */
- if (this->set_attrs(attrs) < 0) {
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): D4NFilterObject set_attrs method failed." << dendl;
- return next->get_obj_attrs(y, dpp, target_obj);
+ head_oid_in_cache = this->get_bucket()->get_name() + "_" + version + "_" + this->get_name();
+ ret = this->driver->get_cache_driver()->set_attrs(dpp, head_oid_in_cache, attrs, y);
+ if (ret == 0) {
+ ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << this->get_object_version() << dendl;
+ this->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, 0, version, y);
+ ret = set_head_obj_dir_entry(dpp, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
+ }
+ } else {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): failed to cache head object in block dir with error: " << ret << dendl;
}
}
int D4NFilterObject::D4NFilterReadOp::prepare(optional_yield y, const DoutPrefixProvider* dpp)
{
- next->params.mod_ptr = params.mod_ptr;
- next->params.unmod_ptr = params.unmod_ptr;
- next->params.high_precision_time = params.high_precision_time;
- next->params.mod_zone_id = params.mod_zone_id;
- next->params.mod_pg_ver = params.mod_pg_ver;
- next->params.if_match = params.if_match;
- next->params.if_nomatch = params.if_nomatch;
- next->params.lastmod = params.lastmod;
- int ret = next->prepare(y, dpp);
-
- rgw::sal::Attrs attrs;
-
- if (source->driver->get_cache_driver()->get_attrs(dpp, source->get_key().get_oid(), attrs, y) < 0) {
- ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::" << __func__ << "(): CacheDriver get_attrs method failed." << dendl;
- } else {
- /* Set metadata locally */
- RGWQuotaInfo quota_info;
- source->load_obj_state(dpp, y);
-
- for (auto& attr : attrs) {
- if (attr.second.length() > 0) {
- if (attr.first == "mtime") {
- ceph::real_time mtime;
- parse_time(attr.second.c_str(), &mtime);
- source->set_mtime(mtime);
- } else if (attr.first == "object_size") {
- source->set_obj_size(std::stoull(attr.second.c_str()));
- attrs.erase(attr.first);
- } else if (attr.first == "accounted_size") {
- source->set_accounted_size(std::stoull(attr.second.c_str()));
- attrs.erase(attr.first);
- } else if (attr.first == "epoch") {
- source->set_epoch(std::stoull(attr.second.c_str()));
- attrs.erase(attr.first);
- } else if (attr.first == "version_id") {
- source->set_instance(attr.second.c_str());
- attrs.erase(attr.first);
- } else if (attr.first == "source_zone_short_id") {
- source->set_short_zone_id(static_cast<uint32_t>(std::stoul(attr.second.c_str())));
- attrs.erase(attr.first);
- } else if (attr.first == "user_quota.max_size") {
- quota_info.max_size = std::stoull(attr.second.c_str());
- attrs.erase(attr.first);
- } else if (attr.first == "user_quota.max_objects") {
- quota_info.max_objects = std::stoull(attr.second.c_str());
- attrs.erase(attr.first);
- } else if (attr.first == "max_buckets") {
- attrs.erase(attr.first);
- } else {
- ldpp_dout(dpp, 20) << "D4NFilterObject::D4NFilterReadOp::" << __func__ << "(): Unexpected attribute; not locally set." << dendl;
- }
+ if (!source->get_obj_attrs_from_cache(dpp, y)) {
+ std::string head_oid_in_cache;
+ rgw::sal::Attrs attrs;
+ std::string version;
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): fetching head object from backend store" << dendl;
+ next->params = params;
+ auto ret = next->prepare(y, dpp);
+ if (ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): next->prepare method failed with error: " << ret << dendl;
+ return ret;
+ }
+ if (params.part_num) {
+ params.parts_count = next->params.parts_count;
+ if (params.parts_count > 1) {
+ ldpp_dout(dpp, 20) << __func__ << "params.part_count: " << params.parts_count << dendl;
+ return 0; // d4n wont handle multipart read requests with part number for now
}
-
- /* Set attributes locally */
- if (source->set_attrs(attrs) < 0)
- ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::" << __func__ << "(): D4NFilterObject set_attrs method failed." << dendl;
- }
- }
+ }
+ this->source->load_obj_state(dpp, y);
+ attrs = source->get_attrs();
+ source->set_obj_state_attrs(dpp, y, attrs);
- //versioned objects have instance set to versionId, and get_oid() returns oid containing instance, hence using id tag as version for non versioned objects only
- if (! source->have_instance()) {
- if (source->load_obj_state(dpp, y) == 0) {
- bufferlist bl;
- if (source->get_attr(RGW_ATTR_ID_TAG, bl)) {
- source->set_object_version(bl.c_str());
- ldpp_dout(dpp, 20) << __func__ << "id tag version is: " << source->get_object_version() << dendl;
+ ret = source->calculate_version(dpp, y, version);
+ if (ret < 0 || version.empty()) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): version could not be calculated." << dendl;
+ }
+
+ bufferlist bl;
+ head_oid_in_cache = source->get_bucket()->get_name() + "_" + version + "_" + source->get_name();
+ ret = source->driver->get_policy_driver()->get_cache_policy()->eviction(dpp, attrs.size(), y);
+ if (ret == 0) {
+ ret = source->driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl, 0, attrs, y);
+ if (ret == 0) {
+ ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << this->source->get_object_version() << dendl;
+ source->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, bl.length(), version, y);
+ ret = source->set_head_obj_dir_entry(dpp, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
+ }
} else {
- ldpp_dout(dpp, 20) << __func__ << "Failed to find id tag" << dendl;
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): put for head object failed with error: " << ret << dendl;
}
+ } else {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): failed to cache head object, eviction returned error: " << ret << dendl;
}
}
-
- return ret;
+
+ return 0;
}
void D4NFilterObject::D4NFilterReadOp::cancel() {
if (it != blocks_info.end()) {
std::string version = source->get_object_version();
std::string prefix = source->get_prefix();
- if (version.empty()) {
- version = source->get_instance();
- }
std::pair<uint64_t, uint64_t> ofs_len_pair = it->second;
uint64_t ofs = ofs_len_pair.first;
uint64_t len = ofs_len_pair.second;
std::string oid_in_cache = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(len);
ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " calling update for offset: " << offset << " adjusted offset: " << ofs << " length: " << len << " oid_in_cache: " << oid_in_cache << dendl;
+ ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << version << " " << source->get_object_version() << dendl;
source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, len, version, y);
blocks_info.erase(it);
} else {
const uint64_t window_size = g_conf()->rgw_get_obj_window_size;
std::string version = source->get_object_version();
std::string prefix;
- if (version.empty()) { //for versioned objects, get_oid() returns an oid with versionId added
+ /* After prepare() method, for versioned objects, get_oid() returns an oid with versionId added,
+ * even for versioned objects, where version id is not provided as input
+ */
+ if (source->have_instance()) {
prefix = source->get_bucket()->get_name() + "_" + source->get_key().get_oid();
} else {
prefix = source->get_bucket()->get_name() + "_" + version + "_" + source->get_key().get_oid();
this->offset = ofs;
- if (version.empty()) {
- version = source->get_instance();
- }
-
do {
uint64_t id = adjusted_start_ofs, read_ofs = 0; //read_ofs is the actual offset to start reading from the current part/ chunk
if (start_part_num == (num_parts - 1)) {
ceph::bufferlist bl;
std::string oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(part_len);
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "version stored in update method is: " << version << dendl;
-
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num <<
" read_ofs: " << read_ofs << " part len: " << part_len << dendl;
Attrs attrs; // empty attrs for cache sets
std::string version = source->get_object_version();
std::string prefix = source->get_prefix();
- if (version.empty()) {
- version = source->get_instance();
- }
+
ldpp_dout(dpp, 20) << __func__ << ": version stored in update method is: " << version << dendl;
if (bl.length() > 0 && last_part) { // if bl = bl_rem has data and this is the last part, write it to cache