}
}
+ // versioned objects have instance set to the versionId, and get_oid() returns an oid containing the instance; hence the id tag is used as the version for non-versioned objects only
+ if (! this->source->have_instance()) {
+ RGWObjState* state = nullptr;
+ if (this->source->get_obj_state(dpp, &state, y) == 0) {
+ auto it = state->attrset.find(RGW_ATTR_ID_TAG);
+ if (it != state->attrset.end()) {
+ bufferlist bl = it->second;
+ this->source->set_object_version(bl.to_str()); // to_str() copies the exact attr length; the xattr is not guaranteed to be NUL-terminated
+ ldpp_dout(dpp, 20) << __func__ << "(): id tag version is: " << this->source->get_object_version() << dendl;
+ } else {
+ ldpp_dout(dpp, 20) << __func__ << "(): Failed to find id tag" << dendl;
+ }
+ }
+ }
+
return ret;
}
RGWGetDataCB* cb, optional_yield y)
{
const uint64_t window_size = g_conf()->rgw_get_obj_window_size;
- std::string oid = source->get_key().get_oid();
+ std::string version = source->get_object_version();
+ std::string prefix;
+ if (version.empty()) { // versioned object: get_oid() already returns an oid with the versionId embedded
+ prefix = source->get_bucket()->get_name() + "_" + source->get_key().get_oid();
+ } else {
+ prefix = source->get_bucket()->get_name() + "_" + version + "_" + source->get_key().get_oid();
+ }
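+ // resulting cache-key prefix: "<bucket>_<oid>" for versioned objects (the oid embeds the versionId), or "<bucket>_<version>_<oid>" otherwise; block keys append "_<ofs>_<len>"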
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "oid: " << oid << " ofs: " << ofs << " end: " << end << dendl;
+ ldpp_dout(dpp, 20) << "D3NFilterObject::iterate:: " << "prefix: " << prefix << dendl;
+ ldpp_dout(dpp, 20) << "D3NFilterObject::iterate:: " << "oid: " << source->get_key().get_oid() << " ofs: " << ofs << " end: " << end << dendl;
this->client_cb = cb;
- this->cb->set_client_cb(cb, dpp, &y); // what's this for? -Sam
+ this->cb->set_client_cb(cb, dpp, &y);
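+ // pass the key prefix to the callback, which uses it to build the per-block cache keys as chunks are written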
+ this->cb->set_prefix(prefix);
/* This algorithm stores chunks for ranged requests also in the cache, which might be smaller than obj_max_req_size
One simplification could be to overwrite the smaller chunks with a bigger chunk of obj_max_req_size, and to serve requests for smaller
}
ceph::bufferlist bl;
- std::string oid_in_cache = source->get_bucket()->get_marker() + "_" + oid + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(part_len);
+ std::string oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(part_len);
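+ // for versioned objects the id-tag version is unset; fall back to the instance (versionId)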
+ if (version.empty()) {
+ version = source->get_instance();
+ }
+
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "version stored in update method is: " << version << dendl;
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num <<
" read_ofs: " << read_ofs << " part len: " << part_len << dendl;
// Read From Cache
auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);
- source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, part_len, "", y);
+ source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, part_len, version, y);
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
return r;
}
} else {
- oid_in_cache = source->get_bucket()->get_marker() + "_" + oid + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(obj_max_req_size);
+ oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(obj_max_req_size);
//for ranged requests, for last part, the whole part might exist in the cache
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num <<
" read_ofs: " << read_ofs << " part len: " << part_len << dendl;
// Read From Cache
auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);
- source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, obj_max_req_size, "", y);
+ source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, obj_max_req_size, version, y);
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
this->cb->bypass_cache_write();
}
- if (start_part_num == 0) {
- this->cb->set_ofs(ofs);
- } else {
- this->cb->set_ofs(adjusted_start_ofs);
- ofs = adjusted_start_ofs; // redundant? -Sam
+ if (start_part_num != 0) {
+ ofs = adjusted_start_ofs;
}
this->cb->set_ofs(ofs);
//Accumulating data from backend store into rgw_get_obj_max_req_size sized chunks and then writing to cache
if (write_to_cache) {
const std::lock_guard l(d4n_get_data_lock);
- rgw::d4n::CacheBlock block;
+ rgw::d4n::CacheBlock block, existing_block;
rgw::d4n::BlockDirectory* blockDir = source->driver->get_block_dir();
- block.version = ""; // TODO: initialize correctly
block.hostsList.push_back(blockDir->cct->_conf->rgw_local_cache_address);
block.cacheObj.objName = source->get_key().get_oid();
block.cacheObj.bucketName = source->get_bucket()->get_name();
block.cacheObj.creationTime = to_iso_8601(source->get_mtime());
block.cacheObj.dirty = false;
+
+ // populate the fields needed to build the directory index
+ existing_block.cacheObj.objName = block.cacheObj.objName;
+ existing_block.cacheObj.bucketName = block.cacheObj.bucketName;
Attrs attrs; // empty attrs for cache sets
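+ // prefer the id-tag version (set for non-versioned objects); fall back to the instance (versionId) for versioned ones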
+ std::string version = source->get_object_version();
+ if (version.empty()) {
+ version = source->get_instance();
+ }
+ ldpp_dout(dpp, 20) << __func__ << "(): version to be passed to the update method is: " << version << dendl;
if (bl.length() > 0 && last_part) { // if bl = bl_rem has data and this is the last part, write it to cache
- std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
- block.blockID = ofs;
- block.size = bl.length();
- if (filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y) == 0) {
- if (filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), attrs) == 0) {
- filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), "", *y);
-
- /* Store block in directory */
- if (!blockDir->exist_key(&block, *y)) { // If the block exists, do we want to update anything else? -Sam
- if (blockDir->set(&block, *y) < 0)
- ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+ std::string oid = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
+ if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) { // in case of concurrent reads of the same object, the block may already be cached
+ block.blockID = ofs;
+ block.size = bl.length();
+ block.version = version;
+ auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
+ if (ret == 0) {
+ //Should we replace each put_async with put, to ensure data is actually written to the cache before updating the data structures and before the lock is released?
+ ret = filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), attrs);
+ if (ret == 0) {
+ filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, *y);
+
+ /* Store block in directory */
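+ // if an entry already exists: replace it when the version differs, otherwise just add this cache host to its hostsList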
+ if (!blockDir->exist_key(&block, *y)) {
+ if (blockDir->set(&block, *y) < 0) //should we revert previous steps if this step fails?
+ ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+ } else {
+ existing_block.blockID = block.blockID;
+ existing_block.size = block.size;
+ if (blockDir->get(&existing_block, *y) < 0) {
+ ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
+ } else {
+ if (existing_block.version != block.version) {
+ if (blockDir->del(&existing_block, *y) < 0) //delete existing block
+ ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory del method failed." << dendl;
+ if (blockDir->set(&block, *y) < 0) // the new block carries a new version, hostsList, etc.; should globalWeight be carried over?
+ ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+ } else {
+ if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0)
+ ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed for blockHosts." << dendl;
+ }
+ }
+ }
} else {
if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0)
ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl;
}
}
} else if (bl.length() == rgw_get_obj_max_req_size && bl_rem.length() == 0) { // if bl is the same size as rgw_get_obj_max_req_size, write it to cache
- std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
+ std::string oid = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
ofs += bl_len;
block.blockID = ofs;
block.size = bl.length();
- if (filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y) == 0) {
- if (filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), attrs) == 0) {
- filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), "", *y);
-
- /* Store block in directory */
- if (!blockDir->exist_key(&block, *y)) {
- if (blockDir->set(&block, *y) < 0)
- ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+ block.version = version;
+ if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) { // in case of concurrent reads of the same object, the block may already be cached
+ auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
+ if (ret == 0) {
+ ret = filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), attrs);
+ if (ret == 0) {
+ filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, *y);
+
+ /* Store block in directory */
+ if (!blockDir->exist_key(&block, *y)) {
+ if (blockDir->set(&block, *y) < 0)
+ ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+ } else {
+ existing_block.blockID = block.blockID;
+ existing_block.size = block.size;
+ if (blockDir->get(&existing_block, *y) < 0) {
+ ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
+ } else if (existing_block.version != block.version) { // only reconcile versions when the existing entry was fetched successfully
+ if (blockDir->del(&existing_block, *y) < 0)
+ ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory del method failed." << dendl;
+ if (blockDir->set(&block, *y) < 0)
+ ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+ } else {
+ if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0)
+ ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed for blockHosts." << dendl;
+ }
+ }
} else {
if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0)
ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl;
bl_rem.claim_append(bl_copy);
if (bl_rem.length() == rgw_get_obj_max_req_size) {
- std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_rem.length());
- ofs += bl_rem.length();
- block.blockID = ofs;
- block.size = bl_rem.length();
- if (filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y) == 0) {
- if (filter->get_cache_driver()->put_async(dpp, oid, bl_rem, bl_rem.length(), attrs) == 0) {
- filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), "", *y);
-
- /* Store block in directory */
- if (!blockDir->exist_key(&block, *y)) {
- if (blockDir->set(&block, *y) < 0)
- ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+ std::string oid = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_rem.length());
+ if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) { // in case of concurrent reads of the same object, the block may already be cached
+ ofs += bl_rem.length();
+ block.blockID = ofs;
+ block.size = bl_rem.length();
+ block.version = version;
+ auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
+ if (ret == 0) {
+ ret = filter->get_cache_driver()->put_async(dpp, oid, bl_rem, bl_rem.length(), attrs);
+ if (ret == 0) {
+ filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), version, *y);
+
+ /* Store block in directory */
+ if (!blockDir->exist_key(&block, *y)) {
+ if (blockDir->set(&block, *y) < 0)
+ ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+ } else {
+ existing_block.blockID = block.blockID;
+ existing_block.size = block.size;
+ if (blockDir->get(&existing_block, *y) < 0) {
+ ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
+ } else {
+ if (existing_block.version != block.version) {
+ if (blockDir->del(&existing_block, *y) < 0)
+ ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory del method failed." << dendl;
+ if (blockDir->set(&block, *y) < 0)
+ ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+ } else {
+ if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0)
+ ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl;
+ }
+ }
+ }
} else {
- if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0)
- ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl;
- }
+ ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " An error occured while caching oid: " << oid << " error: " << ret << dendl;
+ }
+ } else {
+ ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " An error occured during eviction: " << " error: " << ret << dendl;
}
}
bl_rem.clear();
bl_rem = std::move(bl);
- }
+ } // if (bl_rem.length() == rgw_get_obj_max_req_size)
}
}
+ /* Clean-up questions:
+ 1. Do we need to clean up older versions in the cache backend when we update the version in the block directory?
+ 2. Do we need to clean up keys belonging to older versions (the last blocks) in case the newer version has a different size?
+ 3. Do we need to revert the cache ops in case the directory ops fail?
+ */
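+ // A possible shape for (3), reverting the cache write when the directory set fails; a sketch only,
+ // and both the delete_data() and erase() calls are assumed APIs here, not confirmed ones:
+ //
+ //   if (blockDir->set(&block, *y) < 0) {
+ //     filter->get_cache_driver()->delete_data(dpp, oid, *y);                // undo the cached block (assumed API)
+ //     filter->get_policy_driver()->get_cache_policy()->erase(dpp, oid, *y); // drop the policy entry (assumed API)
+ //   }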
+
return 0;
}