this->offset = ofs;
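+ // Describe the block being requested so it can be looked up in the D4N block directory.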
+ rgw::d4n::CacheBlock block;
+ block.cacheObj.objName = source->get_key().get_oid();
+ block.cacheObj.bucketName = source->get_bucket()->get_name();
+
do {
uint64_t id = adjusted_start_ofs, read_ofs = 0; //read_ofs is the actual offset to start reading from the current part/ chunk
if (start_part_num == (num_parts - 1)) {
read_ofs = diff_ofs;
}
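+ // Key this block by its starting offset and length within the object.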
+ block.blockID = adjusted_start_ofs;
+ block.size = part_len;
+
ceph::bufferlist bl;
std::string oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(part_len);
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num <<
" read_ofs: " << read_ofs << " part len: " << part_len << dendl;
- if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) {
- // Read From Cache
- auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);
+ int ret = -1;
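+ // Consult the block directory; a successful get() populates the block's version and the list of hosts caching it.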
+ if (source->driver->get_block_dir()->exist_key(&block, y) > 0 && (ret = source->driver->get_block_dir()->get(&block, y)) == 0) {
+ auto it = std::find(block.hostsList.begin(), block.hostsList.end(), source->driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address);
+ if (it != block.hostsList.end()) { /* Local copy */
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in local cache. " << oid_in_cache << dendl;
+
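+ // Serve from the local cache only if the cached block's version matches the version being read.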
+ if (block.version == version) {
+ if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) {
+ // Read From Cache
+ auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);
+
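+ // Remember this block's offset and length, keyed by read id, for use when the read completes.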
+ this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, part_len)));
+
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
+ auto r = flush(dpp, std::move(completed), y);
+
+ if (r < 0) {
+ drain(dpp, y);
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
+ return r;
+ }
+ // if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0)
+ } else {
+ // For a ranged request, the last part may have been cached as a full chunk,
+ // so retry the lookup using the full chunk size.
+ oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(obj_max_req_size);
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache <<
+ " length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
+
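+ // Retry only when this part's length differs from the full chunk size.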
+ if ((part_len != obj_max_req_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) {
+ // Read From Cache
+ auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);
+
+ this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, obj_max_req_size)));
+
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
+ auto r = flush(dpp, std::move(completed), y);
+
+ if (r < 0) {
+ drain(dpp, y);
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
+ return r;
+ }
+ }
+ }
+ // if (block.version == version)
+ } else {
+ // TODO: If data has already been returned for any older versioned block, return a 'retry' error; else drain the reads issued so far (current behavior).
+
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+
+ auto r = drain(dpp, y);
+
+ if (r < 0) {
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, r= " << r << dendl;
+ return r;
+ }
+ }
+ } else if (!block.hostsList.empty()) { /* Remote copy */
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in remote cache. " << oid_in_cache << dendl;
+ // TODO: Retrieve remotely
+ }
+ // if (source->driver->get_block_dir()->exist_key(&block, y) > 0 && (ret = source->driver->get_block_dir()->get(&block, y)) == 0)
+ } else { /* Fetch from backend */
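+ // No usable directory entry: drain any in-flight cache reads, then break out to read from the backend.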
+ if (ret < 0)
+ ldpp_dout(dpp, 10) << "D4NFilterObject::iterate:: " << __func__ << "(): Failed to fetch existing block for: " << block.cacheObj.objName << " blockID: " << block.blockID << " block size: " << block.size << dendl;
- this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, part_len)));
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
- auto r = flush(dpp, std::move(completed), y);
+ auto r = drain(dpp, y);
if (r < 0) {
- drain(dpp, y);
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
- return r;
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, r= " << r << dendl;
+ return r;
}
- } else {
- oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(obj_max_req_size);
- //for ranged requests, for last part, the whole part might exist in the cache
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num <<
- " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
-
- if ((part_len != obj_max_req_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) {
- // Read From Cache
- auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);
-
- this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, obj_max_req_size)));
-
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
- auto r = flush(dpp, std::move(completed), y);
- if (r < 0) {
- drain(dpp, y);
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
- return r;
- }
-
- } else {
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
-
- auto r = drain(dpp, y);
-
- if (r < 0) {
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, r= " << r << dendl;
- return r;
- }
-
- break;
- }
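+ // Leave the cache-read loop and fall back to the backend for the remaining data.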
+ break;
}
if (start_part_num == (num_parts - 1)) {
}
}
} else {
- ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed with error: " << ret << dendl;
+ ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed with error: " << ret << dendl;
}
}
}