return next->create(dpp, params, y);
}
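+//wrap the next layer's MultipartUpload so D4N can track multipart state (see the user.rgw.multipart attr below)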
+std::unique_ptr<MultipartUpload> D4NFilterBucket::get_multipart_upload(
+ const std::string& oid,
+ std::optional<std::string> upload_id,
+ ACLOwner owner, ceph::real_time mtime)
+{
+ std::unique_ptr<MultipartUpload> nmu =
+ next->get_multipart_upload(oid, upload_id, owner, mtime);
+
+ return std::make_unique<D4NFilterMultipartUpload>(std::move(nmu), this, this->filter);
+}
+
int D4NFilterObject::copy_object(const ACLOwner& owner,
const rgw_user& remote_user,
req_info* info,
const DoutPrefixProvider* dpp,
optional_yield y)
{
- if (g_conf()->d4n_writecache_enabled) {
- this->dest_object = dest_object;
- this->dest_bucket = dest_bucket;
+ bool write_to_cache = g_conf()->d4n_writecache_enabled;
+ bool dirty{false};
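+ //evaluate the copy preconditions (if-match/if-nomatch, mod/unmod times) against the source before any data is copied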
+ std::unique_ptr<rgw::sal::Object::ReadOp> read_op(this->get_read_op());
+ read_op->params.mod_ptr = mod_ptr;
+ read_op->params.unmod_ptr = unmod_ptr;
+ read_op->params.high_precision_time = high_precision_time;
+ read_op->params.if_match = if_match;
+ read_op->params.if_nomatch = if_nomatch;
+ if (auto ret = read_op->prepare(y, dpp); ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): prepare method failed with ret: " << ret << dendl;
+ if (ret == -ERR_NOT_MODIFIED) {
+ ret = -ERR_PRECONDITION_FAILED; //CopyObject preconditions surface as 412; keep the error negative like the other return paths
+ }
+ return ret;
+ }
+
+ ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): is_multipart: " << is_multipart() << dendl;
+ //for multipart objects, or when the write cache is disabled (read-only cache), write through to the backend store
+ if (is_multipart() || !write_to_cache) {
+ write_to_cache = false;
+ auto ret = next->copy_object(owner, remote_user, info, source_zone,
+ nextObject(dest_object),
+ nextBucket(dest_bucket),
+ nextBucket(src_bucket),
+ dest_placement, src_mtime, mtime,
+ mod_ptr, unmod_ptr, high_precision_time, if_match,
+ if_nomatch, attrs_mod, copy_if_newer, attrs,
+ category, olh_epoch, delete_at, version_id, tag,
+ etag, progress_cb, progress_data, dpp, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): next->copy_object failed with ret: " << ret << dendl;
+ return ret;
+ }
+ }
+
+ this->dest_object = dest_object;
+ this->dest_bucket = dest_bucket;
+ D4NFilterObject* d4n_dest_object = dynamic_cast<D4NFilterObject*>(dest_object);
+
+ rgw::sal::Attrs baseAttrs;
+ //ATTRSMOD_NONE - the attributes of the source object will be copied without modifications, attrs parameter is ignored
+ if (attrs_mod == rgw::sal::ATTRSMOD_NONE) {
+ baseAttrs = this->get_attrs();
+ baseAttrs.erase("user.rgw.version_id"); //delete source version id
+ if (version_id) {
+ bufferlist bl_val;
+ bl_val.append(*version_id);
+ baseAttrs["user.rgw.version_id"] = std::move(bl_val); //populate destination version id
+ }
+ }
+ //ATTRSMOD_MERGE - any conflicting meta keys on the source object's attributes are overwritten by values contained in attrs parameter.
+ if (attrs_mod == rgw::sal::ATTRSMOD_MERGE) { /* Merge */
+ baseAttrs = this->get_attrs(); //start from the source attributes; without this, MERGE would behave exactly like REPLACE
+ rgw::sal::Attrs::iterator iter;
+
+ for (const auto& pair : attrs) {
+ iter = baseAttrs.find(pair.first);
+
+ if (iter != baseAttrs.end()) {
+ iter->second = pair.second;
+ } else {
+ baseAttrs.insert({pair.first, pair.second});
+ }
+ }
+ } else if (attrs_mod == rgw::sal::ATTRSMOD_REPLACE) { /* Replace */
+ //ATTRSMOD_REPLACE - new object will have the attributes provided by attrs parameter, source object attributes are not copied;
+ baseAttrs.insert(attrs.begin(), attrs.end());
+ }
+
+ time_t creationTime = -1;
+ std::string dest_version;
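+ //when writing into the cache, settle on the destination version up front so cache keys and directory entries agree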
+ if (write_to_cache) {
+ dirty = true;
if (!dest_object->have_instance()) {
if (dest_object->get_bucket()->versioned() && !dest_object->get_bucket()->versioning_enabled()) { //if versioning is suspended
- this->dest_version = "null";
+ dest_version = "null";
} else {
constexpr uint32_t OBJ_INSTANCE_LEN = 32;
char buf[OBJ_INSTANCE_LEN + 1];
gen_rand_alphanumeric_no_underscore(dpp->get_cct(), buf, OBJ_INSTANCE_LEN);
- this->dest_version = buf; //version for non-versioned objects, using gen_rand_alphanumeric_no_underscore for the time being
+ dest_version = buf; //version for non-versioned objects, using gen_rand_alphanumeric_no_underscore for the time being
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): generating version: " << version << dendl;
}
} else {
- this->dest_version = dest_object->get_instance();
- }
-
- std::unique_ptr<rgw::sal::Object::ReadOp> read_op(this->get_read_op());
- if (auto ret = read_op->prepare(y, dpp); ret < 0) {
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): prepare method failed with ret: " << ret << dendl;
- return ret;
+ dest_version = dest_object->get_instance();
}
+ d4n_dest_object->set_object_version(dest_version);
if (auto ret = read_op->iterate(dpp, 0, (this->get_size() - 1), nullptr, y); ret < 0) {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): iterate method failed with ret: " << ret << dendl;
return ret;
}
- rgw::sal::Attrs baseAttrs;
- if (attrs_mod == rgw::sal::ATTRSMOD_NONE) {
- baseAttrs = attrs;
- } else {
- baseAttrs = this->get_attrs();
- }
-
- if (attrs_mod == rgw::sal::ATTRSMOD_REPLACE) { /* Replace */
- rgw::sal::Attrs::iterator iter;
-
- for (const auto& pair : attrs) {
- iter = baseAttrs.find(pair.first);
-
- if (iter != baseAttrs.end()) {
- iter->second = pair.second;
- } else {
- baseAttrs.insert({pair.first, pair.second});
- }
- }
- } else if (attrs_mod == rgw::sal::ATTRSMOD_MERGE) { /* Merge */
- baseAttrs.insert(attrs.begin(), attrs.end());
- }
-
ceph::real_time dest_mtime;
if (mtime) {
if (real_clock::is_zero(*mtime)) {
} else {
dest_mtime = real_clock::now();
}
+ creationTime = ceph::real_clock::to_time_t(dest_mtime);
dest_object->set_mtime(dest_mtime);
dest_object->set_obj_size(this->get_size());
- dest_object->set_accounted_size(this->get_size());
+ dest_object->set_accounted_size(this->get_accounted_size());
ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " size is: " << dest_object->get_size() << dendl;
- D4NFilterObject* d4n_dest_object = dynamic_cast<D4NFilterObject*>(dest_object);
- d4n_dest_object->set_obj_state_attrs(dpp, y, baseAttrs);
- bufferlist bl_data;
- std::string key = dest_bucket->get_name() + "_" + this->dest_version + "_" + dest_object->get_name();
- std::string head_oid_in_cache = "D_" + key; //same as key, as there is no len or offset attached to head oid in cache
- auto ret = driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl_data, 0, baseAttrs, y);
+ d4n_dest_object->set_attrs_from_obj_state(dpp, y, baseAttrs);
+ } else {
+ auto o_attrs = baseAttrs;
+ if (auto ret = dest_object->load_obj_state(dpp, y); ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): load_obj_state failed with ret: " << ret << dendl;
+ return ret;
+ }
+ baseAttrs = dest_object->get_attrs();
+ d4n_dest_object->set_attrs_from_obj_state(dpp, y, baseAttrs);
+ d4n_dest_object->calculate_version(dpp, y, dest_version, o_attrs);
+ if (dest_version.empty()) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): version could not be calculated." << dendl;
+ }
+ }
+ bufferlist bl_val;
+ bl_val.append(std::to_string(this->is_multipart()));
+ baseAttrs["user.rgw.multipart"] = std::move(bl_val);
+ bl_val.clear(); //bl_val was moved from; rebuild it for the etag rather than appending to a moved-from buffer
+ bl_val.append(*etag);
+ baseAttrs[RGW_ATTR_ETAG] = std::move(bl_val);
+ baseAttrs[RGW_ATTR_ACL] = std::move(attrs[RGW_ATTR_ACL]);
+
+ bufferlist bl_data;
+ dest_version = d4n_dest_object->get_object_version();
+
+ std::string key = dest_bucket->get_name() + "_" + dest_version + "_" + dest_object->get_name();
+ std::string head_oid_in_cache;
+ if (dirty) {
+ head_oid_in_cache = "D_" + key; //same as key, as there is no len or offset attached to head oid in cache
+ } else {
+ head_oid_in_cache = key;
+ }
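+ //make room in the cache (possibly evicting entries) before writing the head object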
+ auto ret = driver->get_policy_driver()->get_cache_policy()->eviction(dpp, baseAttrs.size(), y);
+ if (ret == 0) {
+ ret = driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl_data, 0, baseAttrs, y);
baseAttrs.erase("user.rgw.mtime");
baseAttrs.erase("user.rgw.object_size");
baseAttrs.erase("user.rgw.accounted_size");
baseAttrs.erase("user.rgw.epoch");
+ baseAttrs.erase("user.rgw.multipart");
if (ret == 0) {
- time_t creationTime = ceph::real_clock::to_time_t(dest_mtime);
- dest_object->set_attrs(baseAttrs);
- ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << this->dest_version << dendl;
+ ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << dest_version << dendl;
bufferlist bl;
- driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, bl.length(), this->dest_version, true, y);
- d4n_dest_object->set_object_version(this->dest_version);
- ret = d4n_dest_object->set_head_obj_dir_entry(dpp, y, true, true);
+ driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, bl.length(), dest_version, dirty, y);
+ d4n_dest_object->set_object_version(dest_version);
+ ret = d4n_dest_object->set_head_obj_dir_entry(dpp, y, true, dirty);
if (ret < 0) {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
return ret;
}
- driver->get_policy_driver()->get_cache_policy()->updateObj(dpp, key, this->dest_version, true, dest_object->get_accounted_size(), creationTime, std::get<rgw_user>(dest_object->get_bucket()->get_owner()), *etag, dest_object->get_bucket()->get_name(), dest_object->get_key(), y);
+ if (dirty) {
+ std::string object_key = dest_object->get_bucket()->get_name() + "_" + dest_object->get_oid();
+ driver->get_policy_driver()->get_cache_policy()->updateObj(dpp, object_key, dest_version, true, this->get_size(), creationTime, std::get<rgw_user>(dest_object->get_bucket()->get_owner()), *etag, dest_object->get_bucket()->get_name(), dest_object->get_key(), y);
+ }
//write object to directory.
rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
.objName = dest_object->get_oid(),
.bucketName = dest_object->get_bucket()->get_name(),
.creationTime = std::to_string(creationTime),
- .dirty = true,
+ .dirty = dirty,
.hostsList = { dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address }
};
ret = driver->get_obj_dir()->set(dpp, &object, y);
return ret;
}
}
- } else {
- auto ret = next->copy_object(owner, remote_user, info, source_zone,
- nextObject(dest_object),
- nextBucket(dest_bucket),
- nextBucket(src_bucket),
- dest_placement, src_mtime, mtime,
- mod_ptr, unmod_ptr, high_precision_time, if_match,
- if_nomatch, attrs_mod, copy_if_newer, attrs,
- category, olh_epoch, delete_at, version_id, tag,
- etag, progress_cb, progress_data, dpp, y);
- if (ret < 0) {
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): next->copy_object failed with ret: " << ret << dendl;
- return ret;
- }
}
+
return 0;
}
if (attr.second.length() > 0) {
if (attr.first == "user.rgw.mtime") {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting mtime." << dendl;
- auto mtime = ceph::real_clock::from_double(std::stod(attr.second.c_str()));
+ auto mtime = ceph::real_clock::from_double(std::stod(attr.second.to_str()));
this->set_mtime(mtime);
} else if (attr.first == "user.rgw.object_size") {
auto size = std::stoull(attr.second.to_str());
this->set_accounted_size(accounted_size);
} else if (attr.first == "user.rgw.epoch") {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting epoch." << dendl;
- auto epoch = std::stoull(attr.second.c_str());
+ auto epoch = std::stoull(attr.second.to_str());
this->set_epoch(epoch);
} else if (attr.first == "user.rgw.version_id") {
instance = attr.second.to_str();
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting version_id to: " << instance << dendl;
} else if (attr.first == "user.rgw.source_zone") {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting source zone id." << dendl;
- auto short_zone_id = static_cast<uint32_t>(std::stoul(attr.second.c_str()));
+ auto short_zone_id = static_cast<uint32_t>(std::stoul(attr.second.to_str()));
this->set_short_zone_id(short_zone_id);
+ } else if (attr.first == "user.rgw.multipart") {
+ std::string multipart = attr.second.to_str();
+ this->multipart = (multipart == "1") ? true : false;
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): is_multipart: " << this->multipart << " multipart: " << multipart << dendl;
} else {
ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): Unexpected attribute; not locally set, attr name: " << attr.first << dendl;
}
attrs.erase("user.rgw.object_size");
attrs.erase("user.rgw.accounted_size");
attrs.erase("user.rgw.epoch");
+ attrs.erase("user.rgw.multipart");
/* Set attributes locally */
auto ret = this->set_attrs(attrs);
if (ret < 0) {
return found_in_cache;
}
-void D4NFilterObject::set_obj_state_attrs(const DoutPrefixProvider* dpp, optional_yield y, rgw::sal::Attrs& attrs)
+int D4NFilterObject::set_attr_crypt_parts(const DoutPrefixProvider* dpp, optional_yield y, rgw::sal::Attrs& attrs)
+{
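+ //for SSE objects, record the per-chunk lengths in RGW_ATTR_CRYPT_PARTS so decryption can find chunk boundaries when reads are served from the cache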
+ if (attrs.count(RGW_ATTR_CRYPT_MODE)) {
+ std::vector<size_t> parts_len;
+ uint64_t obj_size = this->get_size();
+ uint64_t obj_max_req_size = dpp->get_cct()->_conf->rgw_get_obj_max_req_size;
+ uint64_t num_parts = (obj_size%obj_max_req_size) == 0 ? obj_size/obj_max_req_size : (obj_size/obj_max_req_size) + 1;
+ size_t remainder_size = obj_size;
+ for (uint64_t part = 0; part < num_parts; part++) {
+ size_t part_len;
+ if (part == (num_parts - 1)) { //last part
+ part_len = remainder_size;
+ } else {
+ part_len = obj_max_req_size;
+ }
+ ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): part_num: " << part << " part_len: " << part_len << dendl;
+ parts_len.emplace_back(part_len);
+ remainder_size -= part_len;
+ }
+
+ bufferlist parts_bl;
+ ceph::encode(parts_len, parts_bl);
+ attrs[RGW_ATTR_CRYPT_PARTS] = std::move(parts_bl);
+ }
+ return 0;
+}
+
+void D4NFilterObject::set_attrs_from_obj_state(const DoutPrefixProvider* dpp, optional_yield y, rgw::sal::Attrs& attrs)
{
bufferlist bl_val;
bl_val.append(std::to_string(this->get_size()));
return;
}
-int D4NFilterObject::calculate_version(const DoutPrefixProvider* dpp, optional_yield y, std::string& version)
+int D4NFilterObject::calculate_version(const DoutPrefixProvider* dpp, optional_yield y, std::string& version, rgw::sal::Attrs& attrs)
{
//versioned objects have instance set to versionId, and get_oid() returns oid containing instance, hence using id tag as version for non versioned objects only
if (! this->have_instance() && version.empty()) {
- bufferlist bl;
- if (this->get_attr(RGW_ATTR_ID_TAG, bl)) {
+ bufferlist bl = attrs[RGW_ATTR_ID_TAG];
+ if (bl.length()) {
- version = bl.c_str();
+ version = bl.to_str(); //to_str() copies the exact length; bufferlist::c_str() is not guaranteed to be NUL-terminated
- ldpp_dout(dpp, 20) << __func__ << " id tag version is: " << version << dendl;
- } else {
- ldpp_dout(dpp, 0) << __func__ << " Failed to find id tag" << dendl;
- return -ENOENT;
+ if (!version.empty()) {
+ ldpp_dout(dpp, 20) << __func__ << " id tag version is: " << version << dendl;
+ }
}
}
- bufferlist bl;
if (this->have_instance()) {
version = this->get_instance();
}
int ret = -1;
rgw::d4n::BlockDirectory* blockDir = this->driver->get_block_dir();
if (is_latest_version) {
+ std::string objName = this->get_name();
+ // a name beginning with '_' is escaped with an extra '_' to match RGW's oid encoding
+ if (!objName.empty() && objName[0] == '_') {
+ objName = "_" + this->get_name();
+ }
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): objName after special handling: " << objName << dendl;
rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
- .objName = this->get_name(),
+ .objName = objName,
.bucketName = this->get_bucket()->get_name(),
.dirty = dirty,
.hostsList = { dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address },
return ret;
}
+int D4NFilterObject::set_data_block_dir_entries(const DoutPrefixProvider* dpp, optional_yield y, std::string& version, bool dirty)
+{
+ rgw::d4n::BlockDirectory* blockDir = driver->get_block_dir();
+
+ //update data block entries in directory
+ off_t lst = this->get_size();
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Object size =" << lst << dendl;
+ off_t fst = 0;
+ while (fst < lst) {
+ rgw::d4n::CacheBlock block, existing_block;
+ off_t cur_size = std::min<off_t>(fst + dpp->get_cct()->_conf->rgw_max_chunk_size, lst);
+ off_t cur_len = cur_size - fst;
+ block.cacheObj.bucketName = this->get_bucket()->get_name();
+ block.cacheObj.objName = this->get_key().get_oid();
+ block.cacheObj.dirty = dirty;
+ block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
+ existing_block.cacheObj.objName = block.cacheObj.objName;
+ existing_block.cacheObj.bucketName = block.cacheObj.bucketName;
+
+ block.size = cur_len;
+ block.blockID = fst;
+ block.version = version;
+
+ /* Store block in directory */
+ existing_block.blockID = block.blockID;
+ existing_block.size = block.size;
+
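+ //look up any existing directory entry first so a version bump updates it in place instead of clobbering it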
+ int ret;
+ if ((ret = blockDir->get(dpp, &existing_block, y)) == 0 || ret == -ENOENT) {
+ if (ret == 0) { //carry the existing entry over (hostsList etc.) and stamp the new version; TODO: decide how globalWeight should be handled
+ block = existing_block;
+ block.version = version;
+ }
+
+ block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
+
+ if ((ret = blockDir->set(dpp, &block, y)) < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory set() method failed, ret=" << ret << dendl;
+ return ret;
+ }
+ } else {
+ ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << ", ret=" << ret << dendl;
+ return ret;
+ }
+ fst += cur_len;
+ }
+
+ return 0;
+}
+
+int D4NFilterObject::delete_data_block_cache_entries(const DoutPrefixProvider* dpp, optional_yield y, std::string& version, bool dirty)
+{
+ //delete cache entries
+ off_t lst = this->get_size();
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Object size =" << lst << dendl;
+ off_t fst = 0;
+ while (fst < lst) {
+ off_t cur_size = std::min<off_t>(fst + dpp->get_cct()->_conf->rgw_max_chunk_size, lst);
+ off_t cur_len = cur_size - fst;
+
+ std::string key = get_bucket()->get_name() + "_" + version + "_" + get_name() + "_" + std::to_string(fst) + "_" + std::to_string(cur_len);
+ std::string key_in_cache;
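+ //dirty (not yet flushed to the backend) blocks live in the cache under a "D_" prefix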
+ if (dirty) {
+ key_in_cache = "D_" + key;
+ } else {
+ key_in_cache = key;
+ }
+ int ret;
+ if ((ret = driver->get_cache_driver()->delete_data(dpp, key_in_cache, y)) == 0) {
+ if (!(ret = driver->get_policy_driver()->get_cache_policy()->erase(dpp, key, y))) {
+ ldpp_dout(dpp, 0) << "Failed to delete policy entry for: " << key << dendl;
+ return -EINVAL; //erase() reports failure as false; return an explicit error instead of propagating 0
+ }
+ } else {
+ ldpp_dout(dpp, 0) << "Failed to delete cache entry for: " << key_in_cache << ", ret=" << ret << dendl;
+ return ret;
+ }
+ fst += cur_len;
+ }
+
+ return 0;
+}
+
bool D4NFilterObject::check_head_exists_in_cache_get_oid(const DoutPrefixProvider* dpp, std::string& head_oid_in_cache, rgw::sal::Attrs& attrs, rgw::d4n::CacheBlock& blk, optional_yield y)
{
+ ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): this->get_oid(): " << this->get_oid() << dendl;
rgw::d4n::BlockDirectory* blockDir = this->driver->get_block_dir();
rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
.objName = this->get_oid(), //version-enabled buckets will not have version for latest version, so this will work even when version is not provided in input
}
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): this->obj oid is: " << this->obj.key.name << "instance is: " << this->obj.key.instance << dendl;
attrs = this->get_attrs();
- this->set_obj_state_attrs(dpp, y, attrs);
+ this->set_attrs_from_obj_state(dpp, y, attrs);
- ret = calculate_version(dpp, y, version);
- if (ret < 0 || version.empty()) {
+ calculate_version(dpp, y, version, attrs);
+ if (version.empty()) {
ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): version could not be calculated." << dendl;
}
-
+ std::string objName = this->get_name();
head_oid_in_cache = this->get_bucket()->get_name() + "_" + version + "_" + this->get_name();
if (this->driver->get_policy_driver()->get_cache_policy()->exist_key(head_oid_in_cache) > 0) {
ret = this->driver->get_cache_driver()->set_attrs(dpp, head_oid_in_cache, attrs, y);
ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): next->prepare method failed, ret=" << ret << dendl;
return ret;
}
- if (params.part_num) {
- params.parts_count = next->params.parts_count;
- if (params.parts_count > 1) {
- ldpp_dout(dpp, 20) << __func__ << "params.part_count: " << params.parts_count << dendl;
- return 0; // d4n wont handle multipart read requests with part number for now
- }
- }
+
+ params.parts_count = next->params.parts_count;
this->source->load_obj_state(dpp, y);
attrs = source->get_attrs();
- source->set_obj_state_attrs(dpp, y, attrs);
-
- ret = source->calculate_version(dpp, y, version);
- if (ret < 0 || version.empty()) {
+ source->set_attrs_from_obj_state(dpp, y, attrs);
+ source->calculate_version(dpp, y, version, attrs);
+ if (version.empty()) {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): version could not be calculated." << dendl;
}
+ this->source->set_attr_crypt_parts(dpp, y, attrs);
+
bufferlist bl;
head_oid_in_cache = source->get_bucket()->get_name() + "_" + version + "_" + source->get_name();
ret = source->driver->get_policy_driver()->get_cache_policy()->eviction(dpp, attrs.size(), y);
ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): failed to cache head object during eviction, ret=" << ret << dendl;
}
} else {
+ /*
+ Handle part-number and multipart reads:
+ 1. part_num is given and the source is not multipart: only part 1 is valid; any other
+ part number is an error, and a valid part 1 falls through to the cache read below.
+ 2. the source is multipart (with or without part_num): prepare() is forwarded to the backend store.
+ */
+ if (params.part_num || source->is_multipart()) { //equivalent to part_num || (!part_num && is_multipart)
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): source->is_multipart()= " << source->is_multipart() << dendl;
+ if (params.part_num) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): *(params.part_num)= " << *(params.part_num) << dendl;
+ }
+ if (!source->is_multipart()) {
+ if (params.part_num && *(params.part_num) != 1) {
+ return -ERR_INVALID_PART;
+ }
+ } else {
+ next->params = params;
+ auto ret = next->prepare(y, dpp);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): next->prepare failed, ret=" << ret << dendl;
+ return ret;
+ }
+ params.parts_count = next->params.parts_count;
+ return 0;
+ }
+ }
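+ //cache hit: evaluate the conditional GET headers (If-(None-)Match, If-(Un)Modified-Since) against the cached head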
+ bufferlist etag_bl;
+ if (get_attr(dpp, RGW_ATTR_ETAG, etag_bl, y) < 0) {
+ return -EINVAL;
+ }
+
+ if (params.mod_ptr || params.unmod_ptr) {
+ if (params.mod_ptr && !params.if_nomatch) {
+ ldpp_dout(dpp, 10) << "If-Modified-Since: " << *params.mod_ptr << " Last-Modified: " << source->get_mtime() << dendl;
+ if (!(*params.mod_ptr < source->get_mtime())) {
+ return -ERR_NOT_MODIFIED;
+ }
+ }
+
+ if (params.unmod_ptr && !params.if_match) {
+ ldpp_dout(dpp, 10) << "If-Modified-Since: " << *params.unmod_ptr << " Last-Modified: " << source->get_mtime() << dendl;
+ if (*params.unmod_ptr < source->get_mtime()) {
+ return -ERR_PRECONDITION_FAILED;
+ }
+ }
+ }
+
+ if (params.if_match) {
+ std::string if_match_str = rgw_string_unquote(params.if_match);
+ ldpp_dout(dpp, 10) << "If-Match: " << if_match_str << " ETAG: " << etag_bl.c_str() << dendl;
+
+ if (if_match_str.compare(0, etag_bl.length(), etag_bl.c_str(), etag_bl.length()) != 0) {
+ return -ERR_PRECONDITION_FAILED;
+ }
+ }
+ if (params.if_nomatch) {
+ std::string if_nomatch_str = rgw_string_unquote(params.if_nomatch);
+ ldpp_dout(dpp, 10) << "If-No-Match: " << if_nomatch_str << " ETAG: " << etag_bl.c_str() << dendl;
+ if (if_nomatch_str.compare(0, etag_bl.length(), etag_bl.c_str(), etag_bl.length()) == 0) {
+ return -ERR_NOT_MODIFIED;
+ }
+ }
+
+ if (params.lastmod) {
+ *params.lastmod = source->get_mtime();
+ }
+
if(perfcounter) {
perfcounter->inc(l_rgw_d4n_cache_hits);
}
ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << version << " " << source->get_object_version() << dendl;
source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, len, version, dirty, y);
if (source->dest_object && source->dest_bucket) {
+ D4NFilterObject* d4n_dest_object = dynamic_cast<D4NFilterObject*>(source->dest_object);
+ std::string dest_version = d4n_dest_object->get_object_version();
rgw::d4n::CacheBlock dest_block;
dest_block.cacheObj.objName = source->dest_object->get_oid();
dest_block.cacheObj.bucketName = source->dest_bucket->get_name();
dest_block.blockID = ofs;
dest_block.size = len;
dest_block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
- dest_block.version = source->dest_version;
+ dest_block.version = dest_version;
dest_block.cacheObj.dirty = true;
- std::string key = source->dest_bucket->get_name() + "_" + source->dest_version + "_" + source->dest_object->get_name() +
+ std::string key = source->dest_bucket->get_name() + "_" + dest_version + "_" + source->dest_object->get_name() +
"_" + std::to_string(ofs) + "_" + std::to_string(len);
std::string dest_oid_in_cache = "D_" + key;
auto ret = source->driver->get_policy_driver()->get_cache_policy()->eviction(dpp, dest_block.size, y);
if (ret == 0) {
rgw::sal::Attrs attrs;
- ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " destination object version in update method is: " << source->dest_version << dendl;
+ ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " destination object version in update method is: " << dest_version << dendl;
ret = source->driver->get_cache_driver()->put(dpp, dest_oid_in_cache, bl, bl.length(), attrs, y);
if (ret == 0) {
- source->driver->get_policy_driver()->get_cache_policy()->update(dpp, key, ofs, bl.length(), source->dest_version, true, y);
+ source->driver->get_policy_driver()->get_cache_policy()->update(dpp, key, ofs, bl.length(), dest_version, true, y);
}
if (ret = source->driver->get_block_dir()->set(dpp, &dest_block, y); ret < 0){
ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " BlockDirectory set failed with ret: " << ret << dendl;
//len_to_read is the actual length read from a part/ chunk in cache, while part_len is the length of the chunk/ part in cache
uint64_t cost = 0, len_to_read = 0, part_len = 0;
- aio = rgw::make_throttle(window_size, y);
-
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "obj_max_req_size " << obj_max_req_size <<
- " num_parts " << num_parts << " adjusted_start_offset: " << adjusted_start_ofs << " len: " << len << dendl;
-
- this->offset = ofs;
-
- rgw::d4n::CacheBlock block;
- block.cacheObj.objName = source->get_key().get_oid();
- block.cacheObj.bucketName = source->get_bucket()->get_name();
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << " adjusted_start_offset: " << adjusted_start_ofs << " len: " << len << dendl;
- do {
- uint64_t id = adjusted_start_ofs, read_ofs = 0; //read_ofs is the actual offset to start reading from the current part/ chunk
- if (start_part_num == (num_parts - 1)) {
- len_to_read = len;
- part_len = len;
- cost = len;
- } else {
- len_to_read = obj_max_req_size;
- cost = obj_max_req_size;
- part_len = obj_max_req_size;
- }
- if (start_part_num == 0) {
- len_to_read -= diff_ofs;
- id += diff_ofs;
- read_ofs = diff_ofs;
- }
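+ //multipart reads with a part number were already satisfied via the backend in prepare(); everything else is served block by block, cache first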
+ if (!(params.part_num && source->is_multipart())) {
+ aio = rgw::make_throttle(window_size, y);
- block.blockID = adjusted_start_ofs;
- block.size = part_len;
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "obj_max_req_size " << obj_max_req_size << " num_parts " << num_parts << dendl;
- ceph::bufferlist bl;
- std::string oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(part_len);
+ this->offset = ofs;
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num <<
- " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
+ rgw::d4n::CacheBlock block;
+ block.cacheObj.objName = source->get_key().get_oid();
+ block.cacheObj.bucketName = source->get_bucket()->get_name();
- int ret;
- if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0) {
- auto it = block.cacheObj.hostsList.find(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
+ do {
+ uint64_t id = adjusted_start_ofs, read_ofs = 0; //read_ofs is the actual offset to start reading from the current part/ chunk
+ if (start_part_num == (num_parts - 1)) {
+ len_to_read = len;
+ part_len = len;
+ cost = len;
+ } else {
+ len_to_read = obj_max_req_size;
+ cost = obj_max_req_size;
+ part_len = obj_max_req_size;
+ }
+ if (start_part_num == 0) {
+ len_to_read -= diff_ofs;
+ id += diff_ofs;
+ read_ofs = diff_ofs;
+ }
- if (it != block.cacheObj.hostsList.end()) { /* Local copy */
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in directory. " << oid_in_cache << dendl;
- std::string key = oid_in_cache;
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: block is dirty = " << block.cacheObj.dirty << dendl;
+ block.blockID = adjusted_start_ofs;
+ block.size = part_len;
- if (block.cacheObj.dirty == true) {
- key = "D_" + oid_in_cache; // we keep track of dirty data in the cache for the metadata failure case
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: key=" << key << " data is Dirty." << dendl;
- }
+ ceph::bufferlist bl;
+ std::string oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(part_len);
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: block dirty =" << block.cacheObj.dirty << dendl;
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: key=" << key << dendl;
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num <<
+ " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
- if (block.version == version) {
- if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) {
- // Read From Cache
- auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), key, read_ofs, len_to_read, cost, id);
+ int ret;
+ if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0) {
+ auto it = block.cacheObj.hostsList.find(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
- this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, part_len)));
+ if (it != block.cacheObj.hostsList.end()) { /* Local copy */
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in directory. " << oid_in_cache << dendl;
+ std::string key = oid_in_cache;
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: block is dirty = " << block.cacheObj.dirty << dendl;
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
- auto r = flush(dpp, std::move(completed), y);
+ if (block.cacheObj.dirty == true) {
+ key = "D_" + oid_in_cache; // we keep track of dirty data in the cache for the metadata failure case
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: key=" << key << " data is Dirty." << dendl;
+ }
- if (r < 0) {
- drain(dpp, y);
- ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, ret=" << r << dendl;
- return r;
- }
- // if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0)
- } else {
- int r = -1;
- if ((r = source->driver->get_block_dir()->remove_host(dpp, &block, dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, y)) < 0)
- ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to remove incorrect host from block with oid=" << oid_in_cache <<", ret=" << r << dendl;
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: block dirty =" << block.cacheObj.dirty << dendl;
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: key=" << key << dendl;
- if ((block.cacheObj.hostsList.size() - 1) > 0 && r == 0) { /* Remote copy */
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl;
- // TODO: Retrieve remotely
- // Policy decision: should we cache remote blocks locally?
- } else {
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+ if (block.version == version) {
+ if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) {
+ // Read From Cache
+ auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), key, read_ofs, len_to_read, cost, id);
- auto r = drain(dpp, y);
+ this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, part_len)));
- if (r < 0) {
- ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
- return r;
- }
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
+ auto r = flush(dpp, std::move(completed), y);
- break;
- }
- }
- // if (block.version == version)
- } else {
- // TODO: If data has already been returned for any older versioned block, then return ‘retry’ error, else
+ if (r < 0) {
+ drain(dpp, y);
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, ret=" << r << dendl;
+ return r;
+ }
+ // if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0)
+ } else {
+ int r = -1;
+ if ((r = source->driver->get_block_dir()->remove_host(dpp, &block, dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, y)) < 0)
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to remove incorrect host from block with oid=" << oid_in_cache <<", ret=" << r << dendl;
+
+ if ((block.cacheObj.hostsList.size() - 1) > 0 && r == 0) { /* Remote copy */
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl;
+ // TODO: Retrieve remotely
+ // Policy decision: should we cache remote blocks locally?
+ } else {
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+ auto r = drain(dpp, y);
- auto r = drain(dpp, y);
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
+ return r;
+ }
- if (r < 0) {
- ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
- return r;
- }
- }
- // if (it != block.cacheObj.hostsList.end())
- } else if (block.cacheObj.hostsList.size()) { /* Remote copy */
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in remote cache. " << oid_in_cache << dendl;
- // TODO: Retrieve remotely
- // Policy decision: should we cache remote blocks locally?
+ break;
+ }
}
- // if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0)
- } else if (ret == -ENOENT) {
- block.blockID = adjusted_start_ofs;
- block.size = obj_max_req_size;
-
- if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0) {
- auto it = block.cacheObj.hostsList.find(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
-
- if (it != block.cacheObj.hostsList.end()) { /* Local copy */
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in local cache." << dendl;
+ // if (block.version == version)
+ } else {
+ // TODO: If data has already been returned for any older versioned block, then return ‘retry’ error, else
- if (block.version == version) {
- oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(obj_max_req_size);
- std::string key = oid_in_cache;
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
- //for ranged requests, for last part, the whole part might exist in the cache
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache <<
- " length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
+ auto r = drain(dpp, y);
- if ((part_len != obj_max_req_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) {
- // Read From Cache
- if (block.cacheObj.dirty == true){
- key = "D_" + oid_in_cache; //we keep track of dirty data in the cache for the metadata failure case
- }
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
+ return r;
+ }
+ break;
+ }
+ // if (it != block.cacheObj.hostsList.end())
+ } else if (block.cacheObj.hostsList.size()) { /* Remote copy */
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in remote cache. " << oid_in_cache << dendl;
+ // TODO: Retrieve remotely
+ // Policy decision: should we cache remote blocks locally?
+ }
+ // if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0)
+ } else if (ret == -ENOENT) {
+ block.blockID = adjusted_start_ofs;
+ //the whole object may be smaller than one full chunk
+ uint64_t chunk_size = std::min<uint64_t>(source->get_size(), obj_max_req_size);
+ block.size = chunk_size;
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: block dirty =" << block.cacheObj.dirty << dendl;
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: key=" << key << dendl;
+ if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0) {
+ auto it = block.cacheObj.hostsList.find(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
- auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), key, read_ofs, len_to_read, cost, id);
+ if (it != block.cacheObj.hostsList.end()) { /* Local copy */
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in local cache." << dendl;
- this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, obj_max_req_size)));
+ if (block.version == version) {
+ oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(chunk_size);
+ std::string key = oid_in_cache;
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
- auto r = flush(dpp, std::move(completed), y);
+ //for range requests, the whole last part might already exist in the cache
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache <<
+ " length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
- if (r < 0) {
- drain(dpp, y);
- ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, ret=" << r << dendl;
- return r;
- }
- // if ((part_len != obj_max_req_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0)
- } else {
- int r = -1;
- if ((r = source->driver->get_block_dir()->remove_host(dpp, &block, dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, y)) < 0)
- ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to remove incorrect host from block with oid=" << oid_in_cache << ", ret=" << r << dendl;
+ if ((part_len != chunk_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) {
+ // Read From Cache
+ if (block.cacheObj.dirty == true){
+ key = "D_" + oid_in_cache; //we keep track of dirty data in the cache for the metadata failure case
+ }
- if ((block.cacheObj.hostsList.size() - 1) > 0 && r == 0) { /* Remote copy */
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl;
- // TODO: Retrieve remotely
- // Policy decision: should we cache remote blocks locally?
- } else {
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: block dirty =" << block.cacheObj.dirty << dendl;
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: key=" << key << dendl;
- auto r = drain(dpp, y);
+ auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), key, read_ofs, len_to_read, cost, id);
- if (r < 0) {
- ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
- return r;
- }
+ this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, chunk_size)));
- break;
- }
- }
- // if (it != block.cacheObj.hostsList.end())
- } else if (block.cacheObj.hostsList.size()) { /* Remote copy */
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl;
- // TODO: Retrieve remotely
- // Policy decision: should we cache remote blocks locally?
- }
- // if (block.version == version)
- } else {
- // TODO: If data has already been returned for any older versioned block, then return ‘retry’ error, else
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
+ auto r = flush(dpp, std::move(completed), y);
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+ if (r < 0) {
+ drain(dpp, y);
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, ret=" << r << dendl;
+ return r;
+ }
+ // if ((part_len != chunk_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0)
+ } else {
+ int r = -1;
+ if ((r = source->driver->get_block_dir()->remove_host(dpp, &block, dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, y)) < 0)
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to remove incorrect host from block with oid=" << oid_in_cache << ", ret=" << r << dendl;
+
+ if ((block.cacheObj.hostsList.size() - 1) > 0 && r == 0) { /* Remote copy */
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl;
+ // TODO: Retrieve remotely
+ // Policy decision: should we cache remote blocks locally?
+ } else {
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
- auto r = drain(dpp, y);
+ auto r = drain(dpp, y);
- if (r < 0) {
- ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
- return r;
- }
- }
- // if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0)
- } else if (ret == -ENOENT) { /* Fetch from backend */
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
-
- auto r = drain(dpp, y);
- if (r < 0) {
- ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
- return r;
- }
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
+ return r;
+ }
- break;
+ break;
+ }
+ }
+ // if (it != block.cacheObj.hostsList.end())
+ } else if (block.cacheObj.hostsList.size()) { /* Remote copy */
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl;
+ // TODO: Retrieve remotely
+ // Policy decision: should we cache remote blocks locally?
}
- // else if (ret == -ENOENT)
- } else { /* Fetch from backend */
- if (ret < 0)
- ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << block.cacheObj.objName << " blockID: " << block.blockID << " block size: " << block.size << ", ret=" << ret << dendl;
+ // if (block.version == version)
+ } else {
+ // TODO: If data has already been returned for any older versioned block, then return ‘retry’ error, else
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
auto r = drain(dpp, y);
+
if (r < 0) {
- ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
- return r;
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
+ return r;
}
-
break;
}
+ } else if (ret == -ENOENT) { // if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0)
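+ //the directory may index the object's tail as one short block; retry the lookup with the remaining length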
+ block.blockID = adjusted_start_ofs;
+ uint64_t last_part_size = source->get_size() - adjusted_start_ofs;
+ block.size = last_part_size;
+ if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0) {
+ auto it = block.cacheObj.hostsList.find(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
+ if (it != block.cacheObj.hostsList.end()) { /* Local copy */
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in local cache." << dendl;
+ if (block.version == version) {
+ oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(last_part_size);
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache <<
+ " length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
+ if ((part_len != last_part_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) {
+ // Read From Cache
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: block dirty =" << block.cacheObj.dirty << dendl;
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: oid_in_cache=" << oid_in_cache << dendl;
+ auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);
+ this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, last_part_size)));
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
+ auto r = flush(dpp, std::move(completed), y);
+ if (r < 0) {
+ drain(dpp, y);
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, ret=" << r << dendl;
+ return r;
+ }
+ } else { // part_len == last_part_size, or the block is not tracked by the cache policy
+ int r = -1;
+ if ((r = source->driver->get_block_dir()->remove_host(dpp, &block, dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, y)) < 0)
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to remove incorrect host from block with oid=" << oid_in_cache << ", ret=" << r << dendl;
+ if ((block.cacheObj.hostsList.size() - 1) > 0 && r == 0) { /* Remote copy */
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl;
+ // TODO: Retrieve remotely
+ // Policy decision: should we cache remote blocks locally?
+ } else {
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+ auto r = drain(dpp, y);
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
+ return r;
+ }
+ break;
+ }
+ }
+ } else { // if (block.version == version)
+ //TODO: return retry error
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+ auto r = drain(dpp, y);
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
+ return r;
+ }
+ break;
+ }
+ } else if (block.cacheObj.hostsList.size()) {
+ //TODO: get remote copy
+ }
+ } else if (ret == -ENOENT) {
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+ auto r = drain(dpp, y);
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
+ return r;
+ }
+ break;
+ }
+ }
+ } else { // else if (ret == -ENOENT)
+ if (ret < 0)
+ ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << block.cacheObj.objName << " blockID: " << block.blockID << " block size: " << block.size << ", ret=" << ret << dendl;
- if (start_part_num == (num_parts - 1)) {
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
- return drain(dpp, y);
- } else {
- adjusted_start_ofs += obj_max_req_size;
- }
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
- start_part_num += 1;
- len -= obj_max_req_size;
- } while (start_part_num < num_parts);
+ auto r = drain(dpp, y);
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
+ return r;
+ }
+
+ break;
+ }
+
+ if (start_part_num == (num_parts - 1)) {
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+ return drain(dpp, y);
+ } else {
+ adjusted_start_ofs += obj_max_req_size;
+ }
+ start_part_num += 1;
+ len -= obj_max_req_size;
+ } while (start_part_num < num_parts);
+ }
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Fetching object from backend store" << dendl;
Attrs obj_attrs;
obj_attrs = source->get_attrs();
}
- if (source->is_compressed() || obj_attrs.find(RGW_ATTR_CRYPT_MODE) != obj_attrs.end() || !y) {
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Skipping writing to cache" << dendl;
- this->cb->bypass_cache_write();
- }
-
- if (start_part_num != 0) {
- ofs = adjusted_start_ofs;
- }
-
- this->cb->set_ofs(ofs);
- auto r = next->iterate(dpp, ofs, end, this->cb.get(), y);
-
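+ //hand the callback the intra-block offset, the aligned start, and the starting part number so it can trim the first chunk returned by the backend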
+ this->cb->set_ofs(diff_ofs);
+ this->cb->set_adjusted_start_ofs(adjusted_start_ofs);
+ this->cb->set_part_num(start_part_num);
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): adjusted_start_ofs: " << adjusted_start_ofs << " end: " << end << dendl;
+ auto r = next->iterate(dpp, adjusted_start_ofs, end, this->cb.get(), y);
//calculate the number of blocks read from backend store, and increment the perfcounter using that
if(perfcounter) {
- uint64_t len_to_read_from_store = ((end - ofs) + 1);
+ uint64_t len_to_read_from_store = ((end - adjusted_start_ofs) + 1);
uint64_t num_blocks = (len_to_read_from_store%obj_max_req_size) == 0 ? len_to_read_from_store/obj_max_req_size : (len_to_read_from_store/obj_max_req_size) + 1;
perfcounter->inc(l_rgw_d4n_cache_misses, num_blocks);
}
ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to fetch object from backend store, ret=" << r << dendl;
return r;
}
-
+ /* Copy params out of next */
+ params = next->params;
return this->cb->flush_last_part();
}
int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
{
auto rgw_get_obj_max_req_size = g_conf()->rgw_get_obj_max_req_size;
-
+ ldpp_dout(dpp, 20) << __func__ << ": bl_ofs is: " << bl_ofs << " bl_len is: " << bl_len << " ofs is: " << ofs << " part_count: " << part_count << dendl;
if (!last_part && bl.length() <= rgw_get_obj_max_req_size) {
if (client_cb) {
- auto r = client_cb->handle_data(bl, bl_ofs, bl_len);
+ int r = 0;
+ //ranged request: the backend replays whole aligned chunks, so skip or trim leading bytes until the requested offset is reached
+ if (bl_ofs != ofs && part_count == 0) {
+ if (ofs < bl_len) { // this can happen in case of multipart where each chunk returned is not always of size rgw_get_obj_max_req_size
+ off_t bl_part_len = bl_len - ofs;
+ ldpp_dout(dpp, 20) << __func__ << ": bl_part_len is: " << bl_part_len << dendl;
+ bufferlist bl_part;
+ bl.begin(ofs).copy(bl_part_len, bl_part);
+ ldpp_dout(dpp, 20) << __func__ << ": bl_part.length() is: " << bl_part.length() << dendl;
+ r = client_cb->handle_data(bl_part, 0, bl_part_len);
+ part_count += 1;
+ } else {
+ ofs = ofs - bl_len; //re-adjust the offset
+ ldpp_dout(dpp, 20) << __func__ << ": New value ofs is: " << ofs << dendl;
+ }
+ } else {
+ r = client_cb->handle_data(bl, bl_ofs, bl_len);
+ part_count += 1;
+ }
if (r < 0) {
+ ldpp_dout(dpp, 20) << __func__ << ": error returned is: " << r << dendl;
return r;
}
}
block.version = version;
if (source->dest_object && source->dest_bucket) {
- dest_prefix = source->dest_bucket->get_name() + "_" + source->dest_version + "_" + source->dest_object->get_name();
+ D4NFilterObject* d4n_dest_object = dynamic_cast<D4NFilterObject*>(source->dest_object);
+ std::string dest_version = d4n_dest_object->get_object_version();
+ dest_prefix = source->dest_bucket->get_name() + "_" + dest_version + "_" + source->dest_object->get_name();
dest_block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
dest_block.cacheObj.objName = source->dest_object->get_key().get_oid();
dest_block.cacheObj.bucketName = source->dest_object->get_bucket()->get_name();
//dest_block.cacheObj.creationTime = std::to_string(ceph::real_clock::to_time_t(source->get_mtime()));
dest_block.cacheObj.dirty = false;
- dest_block.version = source->dest_version;
+ dest_block.version = dest_version;
}
//populating fields needed for building directory index
ldpp_dout(dpp, 20) << __func__ << ": version stored in update method is: " << version << dendl;
if (bl.length() > 0 && last_part) { // if bl = bl_rem has data and this is the last part, write it to cache
- std::string oid = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
+ std::string oid = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(bl_len);
if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) {
- block.blockID = ofs;
+ block.blockID = adjusted_start_ofs;
block.size = bl.length();
auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y);
if (ret == 0) {
std::string objEtag = "";
- filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, dirty, *y);
+ filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, adjusted_start_ofs, bl.length(), version, dirty, *y);
/* Store block in directory */
existing_block.blockID = block.blockID;
}
}
if (source->dest_object && source->dest_bucket) {
- std::string dest_oid = dest_prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
- dest_block.blockID = ofs;
+ D4NFilterObject* d4n_dest_object = dynamic_cast<D4NFilterObject*>(source->dest_object);
+ std::string dest_version = d4n_dest_object->get_object_version();
+ std::string dest_oid = dest_prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(bl_len);
+ dest_block.blockID = adjusted_start_ofs;
dest_block.size = bl.length();
auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, dest_block.size, *y);
if (ret == 0) {
ret = filter->get_cache_driver()->put(dpp, dest_oid, bl, bl.length(), attrs, *y);
if (ret == 0) {
- filter->get_policy_driver()->get_cache_policy()->update(dpp, dest_oid, ofs, bl.length(), source->dest_version, dirty, *y);
+ filter->get_policy_driver()->get_cache_policy()->update(dpp, dest_oid, adjusted_start_ofs, bl.length(), dest_version, dirty, *y);
if (ret = blockDir->set(dpp, &dest_block, *y); ret < 0) {
ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " BlockDirectory set failed with ret: " << ret << dendl;
}
}
}
} else if (bl.length() == rgw_get_obj_max_req_size && bl_rem.length() == 0) { // if bl is the same size as rgw_get_obj_max_req_size, write it to cache
- std::string oid = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
- block.blockID = ofs;
+ std::string oid = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(bl_len);
+ block.blockID = adjusted_start_ofs;
block.size = bl.length();
if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) {
auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
if (ret == 0) {
ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y);
if (ret == 0) {
- filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, dirty, *y);
+ filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, adjusted_start_ofs, bl.length(), version, dirty, *y);
/* Store block in directory */
existing_block.blockID = block.blockID;
}
}
if (source->dest_object && source->dest_bucket) {
- std::string dest_oid = dest_prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
- dest_block.blockID = ofs;
+ D4NFilterObject* d4n_dest_object = dynamic_cast<D4NFilterObject*>(source->dest_object);
+ std::string dest_version = d4n_dest_object->get_object_version();
+ std::string dest_oid = dest_prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(bl_len);
+ dest_block.blockID = adjusted_start_ofs;
dest_block.size = bl.length();
auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, dest_block.size, *y);
if (ret == 0) {
ret = filter->get_cache_driver()->put(dpp, dest_oid, bl, bl.length(), attrs, *y);
if (ret == 0) {
- filter->get_policy_driver()->get_cache_policy()->update(dpp, dest_oid, ofs, bl.length(), source->dest_version, dirty, *y);
+ filter->get_policy_driver()->get_cache_policy()->update(dpp, dest_oid, adjusted_start_ofs, bl.length(), dest_version, dirty, *y);
if (ret = blockDir->set(dpp, &dest_block, *y); ret < 0) {
ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " BlockDirectory set failed with ret: " << ret << dendl;
}
}
}
}
- ofs += bl_len;
+ adjusted_start_ofs += bl_len;
} else { //copy data from incoming bl to bl_rem till it is rgw_get_obj_max_req_size, and then write it to cache
uint64_t rem_space = rgw_get_obj_max_req_size - bl_rem.length();
uint64_t len_to_copy = rem_space > bl.length() ? bl.length() : rem_space;
bl_rem.claim_append(bl_copy);
if (bl_rem.length() == rgw_get_obj_max_req_size) {
- std::string oid = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_rem.length());
+ std::string oid = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(bl_rem.length());
if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) {
- block.blockID = ofs;
+ block.blockID = adjusted_start_ofs;
block.size = bl_rem.length();
auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
if (ret == 0) {
ret = filter->get_cache_driver()->put(dpp, oid, bl_rem, bl_rem.length(), attrs, *y);
if (ret == 0) {
- filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), version, dirty, *y);
+ filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, adjusted_start_ofs, bl_rem.length(), version, dirty, *y);
/* Store block in directory */
existing_block.blockID = block.blockID;
}
if (source->dest_object && source->dest_bucket) {
- std::string dest_oid = dest_prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_rem.length());
- dest_block.blockID = ofs;
+ D4NFilterObject* d4n_dest_object = dynamic_cast<D4NFilterObject*>(source->dest_object);
+ std::string dest_version = d4n_dest_object->get_object_version();
+ std::string dest_oid = dest_prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(bl_rem.length());
+ dest_block.blockID = adjusted_start_ofs;
dest_block.size = bl_rem.length();
auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, dest_block.size, *y);
if (ret == 0) {
ret = filter->get_cache_driver()->put(dpp, dest_oid, bl_rem, bl_rem.length(), attrs, *y);
if (ret == 0) {
- filter->get_policy_driver()->get_cache_policy()->update(dpp, dest_oid, ofs, bl_rem.length(), source->dest_version, dirty, *y);
+ filter->get_policy_driver()->get_cache_policy()->update(dpp, dest_oid, adjusted_start_ofs, bl_rem.length(), dest_version, dirty, *y);
if (ret = blockDir->set(dpp, &dest_block, *y); ret < 0) {
ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " BlockDirectory set failed with ret: " << ret << dendl;
}
}
}
}
- ofs += bl_rem.length();
+ adjusted_start_ofs += bl_rem.length();
bl_rem.clear();
bl_rem = std::move(bl);
}//bl_rem.length()
std::string version, policy_prefix;
if (!source->get_bucket()->versioned()) {
- version = source->get_object_version();
+ version = source->get_object_version();
} else if (source->get_bucket()->versioned() && !source->have_instance()) {
rgw::d4n::CacheBlock deleteBlock;
block.prevVersion = std::pair<std::string, bool>(block.version, block.deleteMarker);
if (!objDirty) { // object written to backend
return next->delete_obj(dpp, y, flags);
} else {
- if (!(ret = source->driver->get_policy_driver()->get_cache_policy()->eraseObj(dpp, policy_prefix, y))) {
+ std::string object_key = source->get_bucket()->get_name() + "_" + source->get_oid();
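+ //this key matches the one updateObj() uses on the write path, i.e. "<bucket>_<oid>"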
+ if (!(ret = source->driver->get_policy_driver()->get_cache_policy()->eraseObj(dpp, object_key, y))) {
ldpp_dout(dpp, 0) << "Failed to delete policy object entry for: " << source->get_name() << ", ret=" << ret << dendl;
return -ENOENT;
} else {
int D4NFilterWriter::prepare(optional_yield y)
{
startTime = time(NULL);
-
- int ret;
- if ((ret = driver->get_cache_driver()->delete_data(dpp, obj->get_key().get_oid(), y)) < 0)
- ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): CacheDriver delete_data() method failed, ret=" << ret << dendl;
-
d4n_writecache = g_conf()->d4n_writecache_enabled;
+
if (!d4n_writecache) {
- ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): calling next process" << dendl;
+ ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): calling next->prepare" << dendl;
return next->prepare(y);
+ } else {
+ //for non-versioned buckets we must delete the object's older dirty blocks from the cache ourselves, since dirty blocks are never evicted
+ //alternatively, we could add logic to delete them lazily
+ if (!object->get_bucket()->versioned()) {
+ std::unique_ptr<rgw::sal::Object::DeleteOp> del_op = object->get_delete_op();
+ auto ret = del_op->delete_obj(dpp, y, rgw::sal::FLAG_LOG_OP);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): delete_obj failed, ret=" << ret << dendl;
+ }
+ }
}
+ std::string version;
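+ //derive the version: "null" when versioning is suspended, a freshly generated instance id when none exists, otherwise the object's existing instance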
if (!object->have_instance()) {
if (object->get_bucket()->versioned() && !object->get_bucket()->versioning_enabled()) { //if versioning is suspended
- this->version = "null";
- object->set_instance(this->version);
+ version = "null";
+ object->set_instance(version);
} else {
constexpr uint32_t OBJ_INSTANCE_LEN = 32;
char buf[OBJ_INSTANCE_LEN + 1];
gen_rand_alphanumeric_no_underscore(dpp->get_cct(), buf, OBJ_INSTANCE_LEN);
- this->version = buf; // using gen_rand_alphanumeric_no_underscore for the time being
+ version = buf; // using gen_rand_alphanumeric_no_underscore for the time being
ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << "(): generating version: " << version << dendl;
}
} else {
ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << "(): version is: " << object->get_instance() << dendl;
+ version = object->get_instance();
}
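+ //the version is stashed on both the writer and the object so that process() and complete() key the cached blocks consistently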
+ object->set_object_version(version);
+ this->version = version;
return 0;
}
off_t bl_len = bl.length();
off_t ofs = offset;
bool dirty = true;
- rgw::d4n::CacheBlock block, existing_block;
- std::string version;
+ std::string version = object->get_object_version();
std::string prefix;
- if (object->have_instance()) {
- version = obj->get_instance();
- } else {
- version = this->version;
- }
- prefix = obj->get_bucket()->get_name() + "_" + version + "_" + obj->get_name();
- rgw::d4n::BlockDirectory* blockDir = driver->get_block_dir();
-
- block.cacheObj.bucketName = obj->get_bucket()->get_name();
- block.cacheObj.objName = obj->get_key().get_oid();
- block.cacheObj.dirty = dirty;
- block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
- existing_block.cacheObj.objName = block.cacheObj.objName;
- existing_block.cacheObj.bucketName = block.cacheObj.bucketName;
+ prefix = obj->get_bucket()->get_name() + "_" + version + "_" + obj->get_name();
int ret = 0;
if (!d4n_writecache) {
ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): calling next process" << dendl;
return next->process(std::move(data), offset);
} else {
+ rgw::sal::Attrs attrs;
std::string oid = prefix + "_" + std::to_string(ofs);
std::string key = "D_" + oid + "_" + std::to_string(bl_len);
std::string oid_in_cache = oid + "_" + std::to_string(bl_len);
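+ //cached data blocks are keyed as "<bucket>_<version>_<name>_<offset>_<len>"; the "D_" prefix marks the cached copy as dirty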
- block.size = bl.length();
- block.blockID = ofs;
- block.cacheObj.dirty = true;
- block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
- block.version = version;
dirty = true;
- ret = driver->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, y);
+ ret = driver->get_policy_driver()->get_cache_policy()->eviction(dpp, bl.length(), y);
if (ret == 0) {
if (bl.length() > 0) {
ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): key is: " << key << dendl;
- ret = driver->get_cache_driver()->put(dpp, key, bl, bl.length(), obj->get_attrs(), y);
+ ret = driver->get_cache_driver()->put(dpp, key, bl, bl.length(), attrs, y);
if (ret == 0) {
ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): oid_in_cache is: " << oid_in_cache << dendl;
driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, bl.length(), version, dirty, y);
-
- /* Store block in directory */
- existing_block.blockID = block.blockID;
- existing_block.size = block.size;
-
- if ((ret = blockDir->get(dpp, &existing_block, y)) == 0 || ret == -ENOENT) {
- if (ret == 0) { //new versioned block will have new version, hostsList etc, how about globalWeight?
- block = existing_block;
- block.version = version;
- }
-
- block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
-
- if ((ret = blockDir->set(dpp, &block, y)) < 0)
- ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory set() method failed, ret=" << ret << dendl;
- } else {
- ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << ", ret=" << ret << dendl;
- }
} else {
ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): ERROR: writting data to the cache failed, ret=" << ret << dendl;
return ret;
std::unordered_set<std::string> hostsList = {};
auto creationTime = startTime;
std::string objEtag = etag;
- bool write_to_backend_store = false;
+ auto size = object->get_size();
+ std::string instance;
+ if (object->have_instance()) {
+ instance = object->get_instance();
+ }
int ret;
+ /* for cache coherence, we cache the head object even when only the read cache is
+ enabled; in that case the head is not marked dirty and the entire object is also
+ written to the backend store. When the write-back cache is enabled, the head is
+ cached as dirty. */
if (d4n_writecache) {
- dirty = true;
- std::string version;
- if (object->have_instance()) {
- version = obj->get_instance();
- } else {
- version = this->version;
- }
- std::string key = obj->get_bucket()->get_name() + "_" + version + "_" + obj->get_name();
+ auto ret = object->get_obj_attrs(y, dpp);
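+ /* evaluate If-Match/If-None-Match against the head attrs just fetched; on a
+ precondition failure, the data blocks that process() already wrote to the
+ cache are rolled back before failing the request */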
+ if (if_match) {
+ if (strcmp(if_match, "*") == 0) {
+ if (ret == -ENOENT) {
+ object->delete_data_block_cache_entries(dpp, y, this->version, true);
+ return -ERR_PRECONDITION_FAILED;
+ }
+ } else {
+ rgw::sal::Attrs attrs = object->get_attrs();
+ bufferlist bl;
+ auto iter = attrs.find(RGW_ATTR_ETAG);
+ if (iter == attrs.end()) {
+ object->delete_data_block_cache_entries(dpp, y, this->version, true);
+ return -ERR_PRECONDITION_FAILED;
+ } else {
+ bl = iter->second;
+ }
+ if (strncmp(if_match, bl.c_str(), bl.length()) != 0) {
+ object->delete_data_block_cache_entries(dpp, y, this->version, true);
+ return -ERR_PRECONDITION_FAILED;
+ }
+ }
+ }
+ if (if_nomatch) {
+ if (strcmp(if_nomatch, "*") == 0) {
+ if (ret != -ENOENT) {
+ object->delete_data_block_cache_entries(dpp, y, this->version, true);
+ return -ERR_PRECONDITION_FAILED;
+ }
+ } else {
+ rgw::sal::Attrs attrs = object->get_attrs();
+ bufferlist bl;
+ auto iter = attrs.find(RGW_ATTR_ETAG);
+ if (iter == attrs.end()) {
+ object->delete_data_block_cache_entries(dpp, y, this->version, true);
+ return -ERR_PRECONDITION_FAILED;
+ } else {
+ bl = iter->second;
+ }
+ if (strncmp(if_nomatch, bl.c_str(), bl.length()) == 0) {
+ object->delete_data_block_cache_entries(dpp, y, this->version, true);
+ return -ERR_PRECONDITION_FAILED;
+ }
+ }
+ }
+ //get_obj_attrs overwrites the object version and size with their older values, hence we restore them here
+ object->set_object_version(this->version);
+ object->set_instance(instance);
+ object->set_obj_size(size);
+ //update data block entries in directory
+ ret = object->set_data_block_dir_entries(dpp, y, this->version, true);
+ if (ret < 0) {
+ return ret;
+ }
+
+ dirty = true;
ceph::real_time m_time;
if (mtime) {
m_time = *mtime;
} else {
object->set_mtime(m_time);
object->set_accounted_size(accounted_size);
ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << " size is: " << object->get_size() << dendl;
- object->set_obj_state_attrs(dpp, y, attrs);
- bufferlist bl;
- std::string head_oid_in_cache = "D_" + key; //same as key, as there is no len or offset attached to head oid in cache
+ object->set_attr_crypt_parts(dpp, y, attrs);
+ object->set_attrs(attrs);
+ object->set_attrs_from_obj_state(dpp, y, attrs);
+ } else {
+ //call next->complete here so that we can correctly get the object state needed for caching the head
+ ret = next->complete(accounted_size, etag, mtime, set_mtime, attrs, cksum,
+ delete_at, if_match, if_nomatch, user_data, zones_trace,
+ canceled, rctx, flags);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): writing to backend store failed, ret=" << ret << dendl;
+ return ret;
+ }
+ object->load_obj_state(dpp, y);
+ attrs = object->get_attrs();
+ object->set_attrs_from_obj_state(dpp, y, attrs);
+
+ std::string version;
+ object->calculate_version(dpp, y, version, attrs);
+ if (version.empty()) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): version could not be calculated." << dendl;
+ }
+ }
+
+ std::string version = object->get_object_version();
+ std::string key = obj->get_bucket()->get_name() + "_" + version + "_" + obj->get_name();
+
+ bufferlist bl;
+ std::string head_oid_in_cache;
+ //the head oid is the same as key, since no offset or length is appended; dirty heads carry the "D_" prefix
+ if (dirty) {
+ head_oid_in_cache = "D_" + key;
+ } else {
+ head_oid_in_cache = key;
+ }
+ ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << "(): head_oid_in_cache is: " << head_oid_in_cache << dendl;
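+ //the head is cached as a zero-length entry whose metadata lives in its xattrs; note that attrs.size() is the number of xattrs, not a byte count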
+ ret = driver->get_policy_driver()->get_cache_policy()->eviction(dpp, attrs.size(), y);
+ if (ret == 0) {
ret = driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl, 0, attrs, y);
attrs.erase("user.rgw.mtime");
attrs.erase("user.rgw.object_size");
attrs.erase("user.rgw.accounted_size");
attrs.erase("user.rgw.epoch");
+ attrs.erase("user.rgw.multipart");
object->set_object_version(version);
if (ret == 0) {
- object->set_attrs(attrs);
ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << "(): version stored in update method is: " << version << dendl;
driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, bl.length(), version, dirty, y);
- ret = object->set_head_obj_dir_entry(dpp, y, true, true);
+ ret = object->set_head_obj_dir_entry(dpp, y, true, dirty);
if (ret < 0) {
ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory set method failed for head object, ret=" << ret << dendl;
return ret;
}
- driver->get_policy_driver()->get_cache_policy()->updateObj(dpp, key, version, dirty, accounted_size, creationTime, std::get<rgw_user>(obj->get_bucket()->get_owner()), objEtag, obj->get_bucket()->get_name(), obj->get_key(), y);
+ if (dirty) {
+ //using the object oid so that the correct version is picked automatically for versioned buckets, while for non-versioned buckets the old version is replaced by the latest one
+ std::string object_key = obj->get_bucket()->get_name() + "_" + obj->get_oid();
+ ldpp_dout(dpp, 16) << "D4NFilterWriter::" << __func__ << "(): object_key=" << object_key << dendl;
+ driver->get_policy_driver()->get_cache_policy()->updateObj(dpp, object_key, version, dirty, accounted_size, creationTime, std::get<rgw_user>(obj->get_bucket()->get_owner()), objEtag, obj->get_bucket()->get_name(), obj->get_key(), y);
+ }
//write object to directory.
hostsList = { dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address };
return ret;
}
} else { //if get_cache_driver()->put()
- write_to_backend_store = true;
ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): put failed for head_oid_in_cache, ret=" << ret << dendl;
- ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): calling complete of backend store: " << dendl;
+ return ret;
}
- } else { // if d4n_writecache = true
- write_to_backend_store = true;
+ } else {
+ ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): eviction failed for head_oid_in_cache, ret=" << ret << dendl;
+ return ret;
}
+ return 0;
+}
- if (write_to_backend_store) {
- ret = next->complete(accounted_size, etag, mtime, set_mtime, attrs, cksum,
- delete_at, if_match, if_nomatch, user_data, zones_trace,
- canceled, rctx, flags);
- if (ret < 0) {
- ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): writing to backend store failed, ret=" << ret << dendl;
+int D4NFilterMultipartUpload::complete(const DoutPrefixProvider *dpp,
+ optional_yield y, CephContext* cct,
+ std::map<int, std::string>& part_etags,
+ std::list<rgw_obj_index_key>& remove_objs,
+ uint64_t& accounted_size, bool& compressed,
+ RGWCompressionInfo& cs_info, off_t& ofs,
+ std::string& tag, ACLOwner& owner,
+ uint64_t olh_epoch,
+ rgw::sal::Object* target_obj,
+ prefix_map_t& processed_prefixes)
+{
+ //call next->complete to complete writing the object to the backend store
+ auto ret = next->complete(dpp, y, cct, part_etags, remove_objs, accounted_size,
+ compressed, cs_info, ofs, tag, owner, olh_epoch,
+ nextObject(target_obj), processed_prefixes);
+ if (ret < 0) {
+ return ret;
+ }
+
+ //Cache only the head object for multipart objects
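+ //target_obj is expected to be the D4N wrapper here; otherwise the dynamic_cast below yields nullptr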
+ D4NFilterObject* d4n_target_obj = dynamic_cast<D4NFilterObject*>(target_obj);
+ std::string head_oid_in_cache;
+ rgw::sal::Attrs attrs;
+ d4n_target_obj->load_obj_state(dpp, y);
+ attrs = d4n_target_obj->get_attrs();
+ d4n_target_obj->set_attrs_from_obj_state(dpp, y, attrs);
+ bufferlist bl_val;
+ bool is_multipart = true;
+ bl_val.append(std::to_string(is_multipart));
+ attrs["user.rgw.multipart"] = std::move(bl_val);
+
+ std::string version;
+ d4n_target_obj->calculate_version(dpp, y, version, attrs);
+ if (version.empty()) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): version could not be calculated." << dendl;
+ }
+
+ bufferlist bl;
+ head_oid_in_cache = d4n_target_obj->get_bucket()->get_name() + "_" + version + "_" + d4n_target_obj->get_name();
+ // evict space if needed: the head object joins the read cache, since the whole multipart object has been written to the backend store
+ ret = driver->get_policy_driver()->get_cache_policy()->eviction(dpp, attrs.size(), y);
+ if (ret == 0) {
+ ret = driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl, 0, attrs, y);
+ if (ret == 0) {
+ ldpp_dout(dpp, 20) << "D4NFilterMultipartUpload::" << __func__ << "(): version stored in update method is: " << version << dendl;
+ driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, bl.length(), version, false, y);
+ ret = d4n_target_obj->set_head_obj_dir_entry(dpp, y, true);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterMultipartUpload::" << __func__ << "(): BlockDirectory set method failed for head object, ret=" << ret << dendl;
+ }
+ //write object to directory.
+ rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
+ .objName = d4n_target_obj->get_oid(),
+ .bucketName = d4n_target_obj->get_bucket()->get_name(),
+ .creationTime = std::to_string(ceph::real_clock::to_double(target_obj->get_mtime())),
+ .dirty = false,
+ .hostsList = { dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address }
+ };
+ ret = driver->get_obj_dir()->set(dpp, &object, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterMultipartUpload::" << __func__ << "(): ObjectDirectory set method failed with err: " << ret << dendl;
+ return ret;
+ }
+ } else {
+ ldpp_dout(dpp, 0) << "D4NFilterMultipartUpload::" << __func__ << "(): put for head object failed, ret=" << ret << dendl;
+ return ret;
}
+ } else {
+ ldpp_dout(dpp, 0) << "D4NFilterMultipartUpload::" << __func__ << "(): eviction failed for head object, ret=" << ret << dendl;
+ return ret;
}
return 0;