return next->create(dpp, params, y);
}
+int D4NFilterObject::copy_object(const ACLOwner& owner,
+ const rgw_user& remote_user,
+ req_info* info,
+ const rgw_zone_id& source_zone,
+ rgw::sal::Object* dest_object,
+ rgw::sal::Bucket* dest_bucket,
+ rgw::sal::Bucket* src_bucket,
+ const rgw_placement_rule& dest_placement,
+ ceph::real_time* src_mtime,
+ ceph::real_time* mtime,
+ const ceph::real_time* mod_ptr,
+ const ceph::real_time* unmod_ptr,
+ bool high_precision_time,
+ const char* if_match,
+ const char* if_nomatch,
+ AttrsMod attrs_mod,
+ bool copy_if_newer,
+ Attrs& attrs,
+ RGWObjCategory category,
+ uint64_t olh_epoch,
+ boost::optional<ceph::real_time> delete_at,
+ std::string* version_id,
+ std::string* tag,
+ std::string* etag,
+ void (*progress_cb)(off_t, void *),
+ void* progress_data,
+ const DoutPrefixProvider* dpp,
+ optional_yield y)
+{
+ if (g_conf()->d4n_writecache_enabled) {
+ this->dest_object = dest_object;
+ this->dest_bucket = dest_bucket;
+
+ if (!dest_object->have_instance()) {
+ if (dest_object->get_bucket()->versioned() && !dest_object->get_bucket()->versioning_enabled()) { //if versioning is suspended
+ this->dest_version = "null";
+ } else {
+ constexpr uint32_t OBJ_INSTANCE_LEN = 32;
+ char buf[OBJ_INSTANCE_LEN + 1];
+ gen_rand_alphanumeric_no_underscore(dpp->get_cct(), buf, OBJ_INSTANCE_LEN);
+ this->dest_version = buf; //version for non-versioned objects, using gen_rand_alphanumeric_no_underscore for the time being
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): generating version: " << version << dendl;
+ }
+ } else {
+ this->dest_version = dest_object->get_instance();
+ }
+
+ std::unique_ptr<rgw::sal::Object::ReadOp> read_op(this->get_read_op());
+ if (auto ret = read_op->prepare(y, dpp); ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): prepare method failed with ret: " << ret << dendl;
+ return ret;
+ }
+ if (auto ret = read_op->iterate(dpp, 0, (this->get_size() - 1), nullptr, y); ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): iterate method failed with ret: " << ret << dendl;
+ return ret;
+ }
+
+ rgw::sal::Attrs baseAttrs;
+ if (attrs_mod == rgw::sal::ATTRSMOD_NONE) {
+ baseAttrs = attrs;
+ } else {
+ baseAttrs = this->get_attrs();
+ }
+
+ if (attrs_mod == rgw::sal::ATTRSMOD_REPLACE) { /* Replace */
+ rgw::sal::Attrs::iterator iter;
+
+ for (const auto& pair : attrs) {
+ iter = baseAttrs.find(pair.first);
+
+ if (iter != baseAttrs.end()) {
+ iter->second = pair.second;
+ } else {
+ baseAttrs.insert({pair.first, pair.second});
+ }
+ }
+ } else if (attrs_mod == rgw::sal::ATTRSMOD_MERGE) { /* Merge */
+ baseAttrs.insert(attrs.begin(), attrs.end());
+ }
+
+ ceph::real_time dest_mtime;
+ if (mtime) {
+ if (real_clock::is_zero(*mtime)) {
+ *mtime = real_clock::now();
+ }
+ dest_mtime = *mtime;
+ } else {
+ dest_mtime = real_clock::now();
+ }
+ dest_object->set_mtime(dest_mtime);
+ dest_object->set_obj_size(this->get_size());
+ dest_object->set_accounted_size(this->get_size());
+ ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " size is: " << dest_object->get_size() << dendl;
+ D4NFilterObject* d4n_dest_object = dynamic_cast<D4NFilterObject*>(dest_object);
+ d4n_dest_object->set_obj_state_attrs(dpp, y, baseAttrs);
+ bufferlist bl_data;
+ std::string key = dest_bucket->get_name() + "_" + this->dest_version + "_" + dest_object->get_name();
+ std::string head_oid_in_cache = "D_" + key; //same as key, as there is no len or offset attached to head oid in cache
+ auto ret = driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl_data, 0, baseAttrs, y);
+ baseAttrs.erase("user.rgw.mtime");
+ baseAttrs.erase("user.rgw.object_size");
+ baseAttrs.erase("user.rgw.accounted_size");
+ baseAttrs.erase("user.rgw.epoch");
+ if (ret == 0) {
+ time_t creationTime = ceph::real_clock::to_time_t(dest_mtime);
+ dest_object->set_attrs(baseAttrs);
+ ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << this->dest_version << dendl;
+ bufferlist bl;
+ driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, bl.length(), this->dest_version, true, y);
+ d4n_dest_object->set_object_version(this->dest_version);
+ ret = d4n_dest_object->set_head_obj_dir_entry(dpp, y, true, true);
+ if (ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
+ return ret;
+ }
+ driver->get_policy_driver()->get_cache_policy()->updateObj(dpp, key, this->dest_version, true, dest_object->get_accounted_size(), creationTime, std::get<rgw_user>(dest_object->get_bucket()->get_owner()), *etag, dest_object->get_bucket()->get_name(), dest_object->get_key(), y);
+
+ //write object to directory.
+ rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
+ .objName = dest_object->get_oid(),
+ .bucketName = dest_object->get_bucket()->get_name(),
+ .creationTime = std::to_string(creationTime),
+ .dirty = true,
+ .hostsList = { dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address }
+ };
+ ret = driver->get_obj_dir()->set(dpp, &object, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): ObjectDirectory set method failed with err: " << ret << dendl;
+ return ret;
+ }
+ }
+ } else {
+ auto ret = next->copy_object(owner, remote_user, info, source_zone,
+ nextObject(dest_object),
+ nextBucket(dest_bucket),
+ nextBucket(src_bucket),
+ dest_placement, src_mtime, mtime,
+ mod_ptr, unmod_ptr, high_precision_time, if_match,
+ if_nomatch, attrs_mod, copy_if_newer, attrs,
+ category, olh_epoch, delete_at, version_id, tag,
+ etag, progress_cb, progress_data, dpp, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): next->copy_object failed with ret: " << ret << dendl;
+ return ret;
+ }
+ }
+ return 0;
+}
+
int D4NFilterObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
Attrs* delattrs, optional_yield y, uint32_t flags)
{
ldpp_dout(dpp, 20) << "D4NFilterObject::flush:: calling handle_data for offset: " << offset << " bufferlist length: " << bl.length() << dendl;
bl_list.push_back(bl);
- int r = client_cb->handle_data(bl, 0, bl.length());
- if (r < 0) {
- return r;
+ if (client_cb) {
+ int r = client_cb->handle_data(bl, 0, bl.length());
+ if (r < 0) {
+ return r;
+ }
}
auto it = blocks_info.find(offset);
if (it != blocks_info.end()) {
ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " calling update for offset: " << offset << " adjusted offset: " << ofs << " length: " << len << " oid_in_cache: " << oid_in_cache << dendl;
ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << version << " " << source->get_object_version() << dendl;
source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, len, version, dirty, y);
+ if (source->dest_object && source->dest_bucket) {
+ rgw::d4n::CacheBlock dest_block;
+ dest_block.cacheObj.objName = source->dest_object->get_oid();
+ dest_block.cacheObj.bucketName = source->dest_bucket->get_name();
+ dest_block.cacheObj.dirty = true; //writing to cache
+ dest_block.blockID = ofs;
+ dest_block.size = len;
+ dest_block.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
+ dest_block.version = source->dest_version;
+ dest_block.dirty = true;
+ std::string key = source->dest_bucket->get_name() + "_" + source->dest_version + "_" + source->dest_object->get_name() +
+ "_" + std::to_string(ofs) + "_" + std::to_string(len);
+ std::string dest_oid_in_cache = "D_" + key;
+ auto ret = source->driver->get_policy_driver()->get_cache_policy()->eviction(dpp, dest_block.size, y);
+ if (ret == 0) {
+ rgw::sal::Attrs attrs;
+ ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " destination object version in update method is: " << source->dest_version << dendl;
+ ret = source->driver->get_cache_driver()->put(dpp, dest_oid_in_cache, bl, bl.length(), attrs, y);
+ if (ret == 0) {
+ source->driver->get_policy_driver()->get_cache_policy()->update(dpp, key, ofs, bl.length(), source->dest_version, true, y);
+ }
+ if (ret = source->driver->get_block_dir()->set(dpp, &dest_block, y); ret < 0){
+ ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " BlockDirectory set failed with ret: " << ret << dendl;
+ }
+ } else {
+ ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " eviction returned ret: " << ret << dendl;
+ }
+ }
blocks_info.erase(it);
} else {
ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << " offset not found: " << offset << dendl;
auto rgw_get_obj_max_req_size = g_conf()->rgw_get_obj_max_req_size;
if (!last_part && bl.length() <= rgw_get_obj_max_req_size) {
- auto r = client_cb->handle_data(bl, bl_ofs, bl_len);
+ if (client_cb) {
+ auto r = client_cb->handle_data(bl, bl_ofs, bl_len);
- if (r < 0) {
- return r;
+ if (r < 0) {
+ return r;
+ }
}
}
Attrs attrs; // empty attrs for cache sets
std::string version = source->get_object_version();
std::string prefix = source->get_prefix();
+ std::string dest_prefix;
- rgw::d4n::CacheBlock block, existing_block;
+ rgw::d4n::CacheBlock block, existing_block, dest_block;
rgw::d4n::BlockDirectory* blockDir = source->driver->get_block_dir();
block.cacheObj.objName = source->get_key().get_oid();
block.cacheObj.bucketName = source->get_bucket()->get_name();
bool dirty = block.dirty = false; //Reading from the backend, data is clean
block.version = version;
+ if (source->dest_object && source->dest_bucket) {
+ dest_prefix = source->dest_bucket->get_name() + "_" + source->dest_version + "_" + source->dest_object->get_name();
+ dest_block.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
+ dest_block.cacheObj.objName = source->dest_object->get_key().get_oid();
+ dest_block.cacheObj.bucketName = source->dest_object->get_bucket()->get_name();
+ //dest_block.cacheObj.creationTime = std::to_string(ceph::real_clock::to_time_t(source->get_mtime()));
+ dest_block.cacheObj.dirty = dest_block.dirty = false;
+ dest_block.version = source->dest_version;
+ }
+
//populating fields needed for building directory index
existing_block.cacheObj.objName = block.cacheObj.objName;
existing_block.cacheObj.bucketName = block.cacheObj.bucketName;
}
}
}
+ if (source->dest_object && source->dest_bucket) {
+ std::string dest_oid = dest_prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
+ dest_block.blockID = ofs;
+ dest_block.size = bl.length();
+ auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, dest_block.size, *y);
+ if (ret == 0) {
+ ret = filter->get_cache_driver()->put(dpp, dest_oid, bl, bl.length(), attrs, *y);
+ if (ret == 0) {
+ filter->get_policy_driver()->get_cache_policy()->update(dpp, dest_oid, ofs, bl.length(), source->dest_version, dirty, *y);
+ if (ret = blockDir->set(dpp, &dest_block, *y); ret < 0) {
+ ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " BlockDirectory set failed with ret: " << ret << dendl;
+ }
+ }
+ }
+ }
} else if (bl.length() == rgw_get_obj_max_req_size && bl_rem.length() == 0) { // if bl is the same size as rgw_get_obj_max_req_size, write it to cache
std::string oid = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
block.blockID = ofs;
block.size = bl.length();
- ofs += bl_len;
if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) {
auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
if (ret == 0) {
}
}
}
+ if (source->dest_object && source->dest_bucket) {
+ std::string dest_oid = dest_prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
+ dest_block.blockID = ofs;
+ dest_block.size = bl.length();
+ auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, dest_block.size, *y);
+ if (ret == 0) {
+ ret = filter->get_cache_driver()->put(dpp, dest_oid, bl, bl.length(), attrs, *y);
+ if (ret == 0) {
+ filter->get_policy_driver()->get_cache_policy()->update(dpp, dest_oid, ofs, bl.length(), source->dest_version, dirty, *y);
+ if (ret = blockDir->set(dpp, &dest_block, *y); ret < 0) {
+ ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " BlockDirectory set failed with ret: " << ret << dendl;
+ }
+ }
+ }
+ }
+ ofs += bl_len;
} else { //copy data from incoming bl to bl_rem till it is rgw_get_obj_max_req_size, and then write it to cache
uint64_t rem_space = rgw_get_obj_max_req_size - bl_rem.length();
uint64_t len_to_copy = rem_space > bl.length() ? bl.length() : rem_space;
if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) {
block.blockID = ofs;
block.size = bl_rem.length();
- ofs += bl_rem.length();
+
auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
if (ret == 0) {
ret = filter->get_cache_driver()->put(dpp, oid, bl_rem, bl_rem.length(), attrs, *y);
}
}
+ if (source->dest_object && source->dest_bucket) {
+ std::string dest_oid = dest_prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_rem.length());
+ dest_block.blockID = ofs;
+ dest_block.size = bl_rem.length();
+ auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, dest_block.size, *y);
+ if (ret == 0) {
+ ret = filter->get_cache_driver()->put(dpp, dest_oid, bl_rem, bl_rem.length(), attrs, *y);
+ if (ret == 0) {
+ filter->get_policy_driver()->get_cache_policy()->update(dpp, dest_oid, ofs, bl_rem.length(), source->dest_version, dirty, *y);
+ if (ret = blockDir->set(dpp, &dest_block, *y); ret < 0) {
+ ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " BlockDirectory set failed with ret: " << ret << dendl;
+ }
+ }
+ }
+ }
+ ofs += bl_rem.length();
bl_rem.clear();
bl_rem = std::move(bl);
}//bl_rem.length()
}
/* Clean-up:
- 1. do we need to clean up older versions of the cache backend, when we update version in block directory?
- 2. do we need to clean up keys belonging to older versions (the last blocks), in case the size of newer version is different
- 3. do we need to revert the cache ops, in case the directory ops fail
+ 1. do we need to clean up keys belonging to older versions (the last blocks), in case the size of newer version is different
+ 2. do we need to revert the cache ops, in case the directory ops fail
*/
return 0;