From: Pritha Srivastava Date: Wed, 7 May 2025 06:05:13 +0000 (+0530) Subject: rgw/d4n: optimizing iterate method to align X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=55faac7af5e64a945bfdc4f3af10266313a9c446;p=ceph.git rgw/d4n: optimizing iterate method to align last block also with max_chunk_size(object size or rgw_max_chunk_size) and to perform checks based on object size. Signed-off-by: Pritha Srivastava --- diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.cc b/src/rgw/driver/d4n/rgw_sal_d4n.cc index de759eb6097..db419142deb 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.cc +++ b/src/rgw/driver/d4n/rgw_sal_d4n.cc @@ -1910,6 +1910,11 @@ int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw:: int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int64_t ofs, int64_t end, RGWGetDataCB* cb, optional_yield y) { + //special handling in case object size is zero + if (source->get_size() == 0) { + return 0; + } + const uint64_t window_size = g_conf()->rgw_get_obj_window_size; std::string version = source->get_object_version(); std::string prefix = get_cache_block_prefix(source, version); @@ -1926,17 +1931,24 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int chunks using the larger chunk, but all corner cases need to be considered like the last chunk which might be smaller than max_chunk_size and also ranged requests where a smaller chunk is overwritten by a larger chunk size != max_chunk_size */ - uint64_t max_chunk_size = g_conf()->rgw_max_chunk_size; + uint64_t max_chunk_size = std::min(g_conf()->rgw_max_chunk_size, source->get_size()); uint64_t start_part_num = 0; uint64_t part_num = ofs/max_chunk_size; //part num of ofs wrt start of the object uint64_t adjusted_start_ofs = part_num*max_chunk_size; //in case of ranged request, adjust the start offset to the beginning of a chunk/ part - uint64_t diff_ofs = ofs - adjusted_start_ofs; //difference between actual offset and adjusted offset + uint64_t start_diff_ofs = ofs - adjusted_start_ofs; //difference between actual start offset and adjusted start offset off_t len = (end - adjusted_start_ofs) + 1; uint64_t num_parts = (len%max_chunk_size) == 0 ? len/max_chunk_size : (len/max_chunk_size) + 1; //calculate num parts based on adjusted offset + uint64_t last_part_num = end/max_chunk_size; + uint64_t adjusted_end_ofs = std::min(((last_part_num + 1)*max_chunk_size - 1), (source->get_size() - 1)); //align end offset to max_chunk_size boundary in case of ranged request + uint64_t end_diff_ofs = adjusted_end_ofs - end; //difference between actual end offset and adjusted end offset + uint64_t adjusted_len = (adjusted_end_ofs - adjusted_start_ofs) + 1; //len_to_read is the actual length read from a part/ chunk in cache, while part_len is the length of the chunk/ part in cache uint64_t cost = 0, len_to_read = 0, part_len = 0; - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << " adjusted_start_offset: " << adjusted_start_ofs << " len: " << len << dendl; + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << " adjusted_start_offset: " << adjusted_start_ofs << dendl; + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << " adjusted_end_ofs: " << adjusted_end_ofs << dendl; + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << " adjusted_len: " << adjusted_len << dendl; + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << " len: " << len << dendl; if ((params.part_num && !source->is_multipart()) || !params.part_num) { aio = rgw::make_throttle(window_size, y); @@ -1952,18 +1964,18 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int do { uint64_t id = adjusted_start_ofs, read_ofs = 0; //read_ofs is the actual offset to start reading from the current part/ chunk if (start_part_num == (num_parts - 1)) { - len_to_read = len; - part_len = len; - cost = len; + len_to_read = adjusted_len - end_diff_ofs; + part_len = adjusted_len; + cost = adjusted_len; } else { len_to_read = max_chunk_size; cost = max_chunk_size; part_len = max_chunk_size; } if (start_part_num == 0) { - len_to_read -= diff_ofs; - id += diff_ofs; - read_ofs = diff_ofs; + len_to_read -= start_diff_ofs; + id += start_diff_ofs; + read_ofs = start_diff_ofs; } block.blockID = adjusted_start_ofs; @@ -1972,7 +1984,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int ceph::bufferlist bl; std::string oid_in_cache = get_key_in_cache(prefix, std::to_string(adjusted_start_ofs), std::to_string(part_len)); - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << " " << __LINE__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl; int ret; @@ -2031,143 +2043,6 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int // TODO: Retrieve remotely // Policy decision: should we cache remote blocks locally? } - } else if (ret == -ENOENT) { // end - if ((ret = source->driver->get_block_dir()->get - block.blockID = adjusted_start_ofs; - uint64_t obj_size = source->get_size(), chunk_size = 0; - if (obj_size < max_chunk_size) { - chunk_size = obj_size; - } else { - chunk_size = max_chunk_size; - } - block.size = chunk_size; - - if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0) { - auto it = block.cacheObj.hostsList.find(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address); - - if (it != block.cacheObj.hostsList.end()) { /* Local copy */ - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in local cache." << dendl; - - if (block.version == version) { - oid_in_cache = get_key_in_cache(prefix, std::to_string(adjusted_start_ofs), std::to_string(chunk_size)); - - //for range requests, for last part, the whole part might exist in the cache - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << - " length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl; - - if ((part_len != chunk_size) && source->driver->get_policy_driver()->get_cache_policy()->update_refcount_if_key_exists(dpp, oid_in_cache, rgw::d4n::RefCount::INCR, y) > 0) { - // Read From Cache - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: oid_in_cache=" << oid_in_cache << dendl; - - auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id); - this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, chunk_size))); - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl; - auto r = flush(dpp, std::move(completed), y); - if (r < 0) { - drain(dpp, y); - ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, ret=" << r << dendl; - return r; - } - } else { // end - if ((part_len != chunk_size) && update_refcount_if_key_exists - int r = -1; - if ((r = source->driver->get_block_dir()->remove_host(dpp, &block, dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, y)) < 0) - ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to remove incorrect host from block with oid=" << oid_in_cache << ", ret=" << r << dendl; - - if ((block.cacheObj.hostsList.size() - 1) > 0 && r == 0) { /* Remote copy */ - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl; - // TODO: Retrieve remotely - // Policy decision: should we cache remote blocks locally? - } else { - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl; - - auto r = drain(dpp, y); - if (r < 0) { - ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl; - return r; - } - break; - } - } - } else if (block.cacheObj.hostsList.size()) { /* Remote copy */ - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl; - // TODO: Retrieve remotely - // Policy decision: should we cache remote blocks locally? - } - } else { // end - if (block.version == version) - // TODO: If data has already been returned for any older versioned block, then return ‘retry’ error, else - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl; - auto r = drain(dpp, y); - if (r < 0) { - ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl; - return r; - } - break; - } - } else if (ret == -ENOENT) { // end - if source->driver->get_block_dir()->get(dpp, &block, y)) - block.blockID = adjusted_start_ofs; - uint64_t last_part_size = source->get_size() - adjusted_start_ofs; - block.size = last_part_size; - if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0) { - auto it = block.cacheObj.hostsList.find(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address); - if (it != block.cacheObj.hostsList.end()) { /* Local copy */ - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in local cache." << dendl; - if (block.version == version) { - oid_in_cache = get_key_in_cache(prefix, std::to_string(adjusted_start_ofs), std::to_string(last_part_size)); - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << - " length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl; - if ((part_len != last_part_size) && source->driver->get_policy_driver()->get_cache_policy()->update_refcount_if_key_exists(dpp, oid_in_cache, rgw::d4n::RefCount::INCR, y) > 0) { - // Read From Cache - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: block dirty =" << block.cacheObj.dirty << dendl; - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: oid_in_cache=" << oid_in_cache << dendl; - auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id); - this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, last_part_size))); - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl; - auto r = flush(dpp, std::move(completed), y); - if (r < 0) { - drain(dpp, y); - ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, ret=" << r << dendl; - return r; - } - } else { // if get_policy_driver()->get_cache_policy()->update_refcount_if_key_exists - int r = -1; - if ((r = source->driver->get_block_dir()->remove_host(dpp, &block, dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, y)) < 0) - ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to remove incorrect host from block with oid=" << oid_in_cache << ", ret=" << r << dendl; - if ((block.cacheObj.hostsList.size() - 1) > 0 && r == 0) { /* Remote copy */ - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl; - // TODO: Retrieve remotely - // Policy decision: should we cache remote blocks locally? - } else { - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl; - auto r = drain(dpp, y); - if (r < 0) { - ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl; - return r; - } - break; - } - } - } else {// end - if (block.version == version) - //TODO: return retry error - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl; - auto r = drain(dpp, y); - if (r < 0) { - ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl; - return r; - } - break; - } - } else if (block.cacheObj.hostsList.size()) { - //TODO: get remote copy - } - } else if (ret == -ENOENT) { - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl; - auto r = drain(dpp, y); - if (r < 0) { - ldpp_dout(dpp, 10) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl; - return r; - } - break; - } - } } else { // else if (ret == -ENOENT) if (ret < 0) ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << block.cacheObj.objName << " blockID: " << block.blockID << " block size: " << block.size << ", ret=" << ret << dendl; @@ -2188,7 +2063,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int } start_part_num += 1; - len -= max_chunk_size; + adjusted_len -= max_chunk_size; } while (start_part_num < num_parts); } ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Fetching object from backend store" << dendl; @@ -2198,14 +2073,16 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int obj_attrs = source->get_attrs(); } - this->cb->set_ofs(diff_ofs); + this->cb->set_start_ofs(start_diff_ofs); + this->cb->set_len(len); this->cb->set_adjusted_start_ofs(adjusted_start_ofs); this->cb->set_part_num(start_part_num); + this->cb->set_num_parts(num_parts); ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): adjusted_start_ofs: " << adjusted_start_ofs << " end: " << end << dendl; - auto r = next->iterate(dpp, adjusted_start_ofs, end, this->cb.get(), y); + auto r = next->iterate(dpp, adjusted_start_ofs, adjusted_end_ofs, this->cb.get(), y); //calculate the number of blocks read from backend store, and increment the perfcounter using that if(perfcounter) { - uint64_t len_to_read_from_store = ((end - adjusted_start_ofs) + 1); + uint64_t len_to_read_from_store = ((adjusted_end_ofs - adjusted_start_ofs) + 1); uint64_t num_blocks = (len_to_read_from_store%max_chunk_size) == 0 ? len_to_read_from_store/max_chunk_size : (len_to_read_from_store/max_chunk_size) + 1; perfcounter->inc(l_rgw_d4n_cache_misses, num_blocks); } @@ -2251,27 +2128,37 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::flush_last_part() int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) { auto rgw_max_chunk_size = g_conf()->rgw_max_chunk_size; - ldpp_dout(dpp, 20) << __func__ << ": bl_ofs is: " << bl_ofs << " bl_len is: " << bl_len << " ofs is: " << ofs << " part_count: " << part_count << dendl; + ldpp_dout(dpp, 20) << __func__ << ": bl_ofs is: " << bl_ofs << " bl_len is: " << bl_len << " part_num: " << part_num << dendl; + ldpp_dout(dpp, 20) << __func__ << ": start_ofs is: " << start_ofs << " end_ofs is: " << end_ofs << " part_num: " << part_num << dendl; if (!last_part && bl.length() <= rgw_max_chunk_size) { if (client_cb) { int r = 0; //ranged request - if (bl_ofs != ofs && part_count == 0) { - if (ofs < bl_len) { // this can happen in case of multipart where each chunk returned is not always of size rgw_max_chunk_size - off_t bl_part_len = bl_len - ofs; + if (bl_ofs != start_ofs && part_num == 0) { + if (start_ofs < bl_len) { // this can happen in case of multipart where each chunk returned is not always of size rgw_max_chunk_size + off_t bl_part_len = bl_len - start_ofs; ldpp_dout(dpp, 20) << __func__ << ": bl_part_len is: " << bl_part_len << dendl; bufferlist bl_part; - bl.begin(ofs).copy(bl_part_len, bl_part); + bl.begin(start_ofs).copy(bl_part_len, bl_part); ldpp_dout(dpp, 20) << __func__ << ": bl_part.length() is: " << bl_part.length() << dendl; r = client_cb->handle_data(bl_part, 0, bl_part_len); - part_count += 1; + part_num += 1; + len_sent += bl_part_len; } else { - ofs = ofs - bl_len; //re-adjust the offset - ldpp_dout(dpp, 20) << __func__ << ": New value ofs is: " << ofs << dendl; + start_ofs = start_ofs - bl_len; //re-adjust the offset + ldpp_dout(dpp, 20) << __func__ << ": New value ofs is: " << start_ofs << dendl; } + } else if (part_num == (num_parts - 1) && (len_sent + bl_len) > len) { + uint64_t extra = (len_sent + bl_len) - len; + uint64_t len_to_send = bl_len - extra; + bufferlist bl_part; + bl.begin(bl_ofs).copy(len_to_send, bl_part); + ldpp_dout(dpp, 20) << __func__ << ": last part bl_part.length() is: " << bl_part.length() << dendl; + r = client_cb->handle_data(bl_part, 0, bl_part.length()); } else { r = client_cb->handle_data(bl, bl_ofs, bl_len); - part_count += 1; + part_num += 1; + len_sent += bl_len; } if (r < 0) { @@ -2290,6 +2177,8 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl rgw::d4n::CacheBlock block, existing_block, dest_block; rgw::d4n::BlockDirectory* blockDir = source->driver->get_block_dir(); + auto policy = filter->get_policy_driver()->get_cache_policy(); + auto cache_driver = filter->get_cache_driver(); block.cacheObj.objName = source->get_key().get_oid(); block.cacheObj.bucketName = source->get_bucket()->get_bucket_id(); std::stringstream s; @@ -2317,16 +2206,16 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl if (bl.length() > 0 && last_part) { // if bl = bl_rem has data and this is the last part, write it to cache std::string oid = get_key_in_cache(prefix, std::to_string(adjusted_start_ofs), std::to_string(bl_len)); - if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) { + if (!policy->exist_key(oid)) { block.blockID = adjusted_start_ofs; block.size = bl.length(); - auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y); + auto ret = policy->eviction(dpp, block.size, *y); if (ret == 0) { - ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y); + ret = cache_driver->put(dpp, oid, bl, bl.length(), attrs, *y); if (ret == 0) { std::string objEtag = ""; - filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, adjusted_start_ofs, bl.length(), version, dirty, rgw::d4n::RefCount::NOOP, *y); + policy->update(dpp, oid, adjusted_start_ofs, bl.length(), version, dirty, rgw::d4n::RefCount::NOOP, *y); /* Store block in directory */ existing_block.blockID = block.blockID; @@ -2356,11 +2245,11 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl std::string dest_oid = get_key_in_cache(dest_prefix, std::to_string(adjusted_start_ofs), std::to_string(bl_len)); dest_block.blockID = adjusted_start_ofs; dest_block.size = bl.length(); - auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, dest_block.size, *y); + auto ret = policy->eviction(dpp, dest_block.size, *y); if (ret == 0) { - ret = filter->get_cache_driver()->put(dpp, dest_oid, bl, bl.length(), attrs, *y); + ret = cache_driver->put(dpp, dest_oid, bl, bl.length(), attrs, *y); if (ret == 0) { - filter->get_policy_driver()->get_cache_policy()->update(dpp, dest_oid, adjusted_start_ofs, bl.length(), dest_version, dirty, rgw::d4n::RefCount::NOOP, *y); + policy->update(dpp, dest_oid, adjusted_start_ofs, bl.length(), dest_version, dirty, rgw::d4n::RefCount::NOOP, *y); if (ret = blockDir->set(dpp, &dest_block, *y); ret < 0) { ldpp_dout(dpp, 20) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB:: " << __func__ << " BlockDirectory set failed with ret: " << ret << dendl; } @@ -2371,12 +2260,12 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl std::string oid = get_key_in_cache(prefix, std::to_string(adjusted_start_ofs), std::to_string(bl_len)); block.blockID = adjusted_start_ofs; block.size = bl.length(); - if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) { - auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y); + if (!policy->exist_key(oid)) { + auto ret = policy->eviction(dpp, block.size, *y); if (ret == 0) { - ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y); + ret = cache_driver->put(dpp, oid, bl, bl.length(), attrs, *y); if (ret == 0) { - filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, adjusted_start_ofs, bl.length(), version, dirty, rgw::d4n::RefCount::NOOP, *y); + policy->update(dpp, oid, adjusted_start_ofs, bl.length(), version, dirty, rgw::d4n::RefCount::NOOP, *y); /* Store block in directory */ existing_block.blockID = block.blockID; @@ -2406,11 +2295,11 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl std::string dest_oid = get_key_in_cache(dest_prefix, std::to_string(adjusted_start_ofs), std::to_string(bl_len)); dest_block.blockID = adjusted_start_ofs; dest_block.size = bl.length(); - auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, dest_block.size, *y); + auto ret = policy->eviction(dpp, dest_block.size, *y); if (ret == 0) { - ret = filter->get_cache_driver()->put(dpp, dest_oid, bl, bl.length(), attrs, *y); + ret = cache_driver->put(dpp, dest_oid, bl, bl.length(), attrs, *y); if (ret == 0) { - filter->get_policy_driver()->get_cache_policy()->update(dpp, dest_oid, adjusted_start_ofs, bl.length(), dest_version, dirty, rgw::d4n::RefCount::NOOP, *y); + policy->update(dpp, dest_oid, adjusted_start_ofs, bl.length(), dest_version, dirty, rgw::d4n::RefCount::NOOP, *y); if (ret = blockDir->set(dpp, &dest_block, *y); ret < 0) { ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " BlockDirectory set failed with ret: " << ret << dendl; } @@ -2428,15 +2317,15 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl if (bl_rem.length() == rgw_max_chunk_size) { std::string oid = prefix + CACHE_DELIM + std::to_string(adjusted_start_ofs) + CACHE_DELIM + std::to_string(bl_rem.length()); - if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) { + if (!policy->exist_key(oid)) { block.blockID = adjusted_start_ofs; block.size = bl_rem.length(); - auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y); + auto ret = policy->eviction(dpp, block.size, *y); if (ret == 0) { - ret = filter->get_cache_driver()->put(dpp, oid, bl_rem, bl_rem.length(), attrs, *y); + ret = cache_driver->put(dpp, oid, bl_rem, bl_rem.length(), attrs, *y); if (ret == 0) { - filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, adjusted_start_ofs, bl_rem.length(), version, dirty, rgw::d4n::RefCount::NOOP, *y); + policy->update(dpp, oid, adjusted_start_ofs, bl_rem.length(), version, dirty, rgw::d4n::RefCount::NOOP, *y); /* Store block in directory */ existing_block.blockID = block.blockID; @@ -2469,11 +2358,11 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl std::string dest_oid = dest_prefix + CACHE_DELIM + std::to_string(adjusted_start_ofs) + CACHE_DELIM + std::to_string(bl_rem.length()); dest_block.blockID = adjusted_start_ofs; dest_block.size = bl_rem.length(); - auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, dest_block.size, *y); + auto ret = policy->eviction(dpp, dest_block.size, *y); if (ret == 0) { - ret = filter->get_cache_driver()->put(dpp, dest_oid, bl_rem, bl_rem.length(), attrs, *y); + ret = cache_driver->put(dpp, dest_oid, bl_rem, bl_rem.length(), attrs, *y); if (ret == 0) { - filter->get_policy_driver()->get_cache_policy()->update(dpp, dest_oid, adjusted_start_ofs, bl_rem.length(), dest_version, dirty, rgw::d4n::RefCount::NOOP, *y); + policy->update(dpp, dest_oid, adjusted_start_ofs, bl_rem.length(), dest_version, dirty, rgw::d4n::RefCount::NOOP, *y); if (ret = blockDir->set(dpp, &dest_block, *y); ret < 0) { ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " BlockDirectory set failed with ret: " << ret << dendl; } diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.h b/src/rgw/driver/d4n/rgw_sal_d4n.h index 65b4f31733d..8511e03554d 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.h +++ b/src/rgw/driver/d4n/rgw_sal_d4n.h @@ -147,14 +147,16 @@ class D4NFilterObject : public FilterObject { D4NFilterDriver* filter; D4NFilterObject* source; RGWGetDataCB* client_cb; - int64_t ofs = 0, len = 0; + int64_t start_ofs = 0, len = 0, end_ofs = 0; int64_t adjusted_start_ofs{0}; + int64_t adjusted_end_ofs{0}; bufferlist bl_rem; bool last_part{false}; bool write_to_cache{true}; const DoutPrefixProvider* dpp; optional_yield* y; - int part_count{0}; + int part_num{0}, num_parts{0}; + int len_sent = 0; public: D4NFilterGetCB(D4NFilterDriver* _filter, D4NFilterObject* _source) : filter(_filter), @@ -166,9 +168,11 @@ class D4NFilterObject : public FilterObject { this->dpp = dpp; this->y = y; } - void set_ofs(uint64_t ofs) { this->ofs = ofs; } + void set_start_ofs(uint64_t ofs) { this->start_ofs = ofs; } + void set_len(uint64_t len) { this->len = len; } void set_adjusted_start_ofs(uint64_t adjusted_start_ofs) { this->adjusted_start_ofs = adjusted_start_ofs; } - void set_part_num(uint64_t part_num) { this->part_count = part_num; } + void set_part_num(uint64_t part_num) { this->part_num = part_num; } + void set_num_parts(uint64_t num_parts) { this->num_parts = num_parts; } int flush_last_part(); void bypass_cache_write() { this->write_to_cache = false; } }; diff --git a/src/test/rgw/test_d4n_filter.cc b/src/test/rgw/test_d4n_filter.cc index f43a8f56b20..7c9fca4843f 100755 --- a/src/test/rgw/test_d4n_filter.cc +++ b/src/test/rgw/test_d4n_filter.cc @@ -1537,7 +1537,7 @@ TEST_F(D4NFilterFixture, GetObjectWrite) Read_CB cb(&bl); std::unique_ptr read_op(obj->get_read_op()); EXPECT_EQ(read_op->prepare(optional_yield{yield}, env->dpp), 0); - EXPECT_EQ(read_op->iterate(env->dpp, 0, ofs, &cb, optional_yield{yield}), 0); + EXPECT_EQ(read_op->iterate(env->dpp, 0, (ofs-1), &cb, optional_yield{yield}), 0); boost::system::error_code ec; request req; @@ -2216,7 +2216,7 @@ TEST_F(D4NFilterFixture, GetVersionedObjectWrite) Read_CB cb(&bl); std::unique_ptr read_op_enabled(objEnabled->get_read_op()); EXPECT_EQ(read_op_enabled->prepare(optional_yield{yield}, env->dpp), 0); - EXPECT_EQ(read_op_enabled->iterate(env->dpp, 0, ofs, &cb, optional_yield{yield}), 0); + EXPECT_EQ(read_op_enabled->iterate(env->dpp, 0, (ofs - 1), &cb, optional_yield{yield}), 0); boost::system::error_code ec; request req; @@ -2285,7 +2285,7 @@ TEST_F(D4NFilterFixture, GetVersionedObjectWrite) Read_CB cb(&bl); std::unique_ptr read_op_suspended(objSuspended->get_read_op()); EXPECT_EQ(read_op_suspended->prepare(optional_yield{yield}, env->dpp), 0); - EXPECT_EQ(read_op_suspended->iterate(env->dpp, 0, ofs, &cb, optional_yield{yield}), 0); + EXPECT_EQ(read_op_suspended->iterate(env->dpp, 0, (ofs - 1), &cb, optional_yield{yield}), 0); boost::system::error_code ec; request req; @@ -3189,7 +3189,6 @@ TEST_F(D4NFilterFixture, SimpleDeleteBeforeCleaning) std::unique_ptr read_op(objEnabled->get_read_op()); EXPECT_EQ(read_op->prepare(optional_yield{yield}, env->dpp), -2); // Simple read; should return -ENOENT - EXPECT_EQ(read_op->iterate(env->dpp, 0, ofs, nullptr, optional_yield{yield}), -2); /* TODO: std::string attr_val; @@ -3266,7 +3265,7 @@ TEST_F(D4NFilterFixture, VersionedDeleteBeforeCleaning) std::unique_ptr read_op(objEnabled->get_read_op()); EXPECT_EQ(read_op->prepare(optional_yield{yield}, env->dpp), 0); - EXPECT_EQ(read_op->iterate(env->dpp, 0, ofs, nullptr, optional_yield{yield}), 0); + EXPECT_EQ(read_op->iterate(env->dpp, 0, (ofs - 1), nullptr, optional_yield{yield}), 0); EXPECT_EQ(objEnabled->get_instance(), instances[0]); // Next latest version objEnabled->set_instance(instances[0]); @@ -3304,7 +3303,7 @@ TEST_F(D4NFilterFixture, VersionedDeleteBeforeCleaning) next->set_instance(""); std::unique_ptr read_op(next->get_read_op()); EXPECT_EQ(read_op->prepare(optional_yield{yield}, env->dpp), 0); - EXPECT_EQ(read_op->iterate(env->dpp, 0, ofs, &cb, optional_yield{yield}), 0); + EXPECT_EQ(read_op->iterate(env->dpp, 0, (ofs - 1), &cb, optional_yield{yield}), 0); EXPECT_EQ(next->get_instance(), instances[0]); // Next latest version }