From: Samarah Date: Wed, 6 Mar 2024 20:58:03 +0000 (+0000) Subject: rgw/d4n: Add directory probing to read workflow X-Git-Tag: testing/wip-rishabh-testing-20250426.123842-debug~14^2~37 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=8ac805bded7d4719354baa89dadd5b9b97df3117;p=ceph-ci.git rgw/d4n: Add directory probing to read workflow Signed-off-by: Samarah --- diff --git a/src/rgw/driver/d4n/d4n_directory.cc b/src/rgw/driver/d4n/d4n_directory.cc index 1fd9437691d..20f81ffd7da 100644 --- a/src/rgw/driver/d4n/d4n_directory.cc +++ b/src/rgw/driver/d4n/d4n_directory.cc @@ -444,7 +444,7 @@ int BlockDirectory::get(CacheBlock* block, optional_yield y) std::stringstream ss(boost::lexical_cast(std::get<0>(resp).value()[4])); block->hostsList.clear(); - while (!ss.eof()) { + while (!ss.eof()) { // Replace with boost::split? -Sam std::string host; std::getline(ss, host, '_'); block->hostsList.push_back(host); diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.cc b/src/rgw/driver/d4n/rgw_sal_d4n.cc index 605b8f9b6a7..88c10b552ba 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.cc +++ b/src/rgw/driver/d4n/rgw_sal_d4n.cc @@ -572,6 +572,10 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int this->offset = ofs; + rgw::d4n::CacheBlock block; + block.cacheObj.objName = source->get_key().get_oid(); + block.cacheObj.bucketName = source->get_bucket()->get_name(); + do { uint64_t id = adjusted_start_ofs, read_ofs = 0; //read_ofs is the actual offset to start reading from the current part/ chunk if (start_part_num == (num_parts - 1)) { @@ -589,59 +593,91 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int read_ofs = diff_ofs; } + block.blockID = adjusted_start_ofs; + block.size = part_len; + ceph::bufferlist bl; std::string oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(part_len); ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl; - if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) { - // Read From Cache - auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id); + int ret = -1; + if (source->driver->get_block_dir()->exist_key(&block, y) > 0 && (ret = source->driver->get_block_dir()->get(&block, y)) == 0) { + auto it = find(block.hostsList.begin(), block.hostsList.end(), source->driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address); + if (it != block.hostsList.end()) { /* Local copy */ + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in local cache. " << oid_in_cache << dendl; + + if (block.version == version) { + if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) { + // Read From Cache + auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id); + + this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, part_len))); + + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl; + auto r = flush(dpp, std::move(completed), y); + + if (r < 0) { + drain(dpp, y); + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl; + return r; + } + // if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) + } else { + oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(obj_max_req_size); + //for ranged requests, for last part, the whole part might exist in the cache + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << + " length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl; + + if ((part_len != obj_max_req_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) { + // Read From Cache + auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id); + + this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, obj_max_req_size))); + + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl; + auto r = flush(dpp, std::move(completed), y); + + if (r < 0) { + drain(dpp, y); + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl; + return r; + } + } + } + // if (block.version == version) + } else { + // TODO: If data has already been returned for any older versioned block, then return ‘retry’ error, else + + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl; + + auto r = drain(dpp, y); + + if (r < 0) { + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, r= " << r << dendl; + return r; + } + } + } else if (block.hostsList.size()) { /* Remote copy */ + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in remote cache. " << oid_in_cache << dendl; + // TODO: Retrieve remotely + } + // if (source->driver->get_block_dir()->exist_key(&block, y) > 0 && int ret = source->driver->get_block_dir()->get(&block, y) == 0) + } else { /* Fetch from backend */ + if (ret < 0) + ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << block.cacheObj.objName << " blockID: " << block.blockID << " block size: " << block.size << dendl; - this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, part_len))); + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl; - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl; - auto r = flush(dpp, std::move(completed), y); + auto r = drain(dpp, y); if (r < 0) { - drain(dpp, y); - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl; - return r; + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, r= " << r << dendl; + return r; } - } else { - oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(obj_max_req_size); - //for ranged requests, for last part, the whole part might exist in the cache - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << - " read_ofs: " << read_ofs << " part len: " << part_len << dendl; - - if ((part_len != obj_max_req_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) { - // Read From Cache - auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id); - - this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, obj_max_req_size))); - - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl; - auto r = flush(dpp, std::move(completed), y); - if (r < 0) { - drain(dpp, y); - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl; - return r; - } - - } else { - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl; - - auto r = drain(dpp, y); - - if (r < 0) { - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, r= " << r << dendl; - return r; - } - - break; - } + break; } if (start_part_num == (num_parts - 1)) { @@ -756,7 +792,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl } } } else { - ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed with error: " << ret << dendl; + ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed with error: " << ret << dendl; } } } diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.h b/src/rgw/driver/d4n/rgw_sal_d4n.h index 40c98d5d2f8..7011f2a64bf 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.h +++ b/src/rgw/driver/d4n/rgw_sal_d4n.h @@ -159,7 +159,7 @@ class D4NFilterObject : public FilterObject { std::unique_ptr aio; uint64_t offset = 0; // next offset to write to client rgw::AioResultList completed; // completed read results, sorted by offset - std::unordered_map> blocks_info; + std::unordered_map> blocks_info; int flush(const DoutPrefixProvider* dpp, rgw::AioResultList&& results, optional_yield y); void cancel();