From 5df4a668dbe441cf6457e985a25b2bf0aacc4f3e Mon Sep 17 00:00:00 2001 From: Samarah Date: Fri, 12 Jan 2024 19:01:05 +0000 Subject: [PATCH] rgw/d4n: Perform minor cleanup; add error checking for `localWeights` and fix order of entry deletion in policy Signed-off-by: Samarah --- src/rgw/driver/d4n/d4n_directory.cc | 4 +- src/rgw/driver/d4n/d4n_policy.cc | 20 +++++++-- src/rgw/driver/d4n/rgw_sal_d4n.cc | 69 +++++++++++++++-------------- 3 files changed, 54 insertions(+), 39 deletions(-) diff --git a/src/rgw/driver/d4n/d4n_directory.cc b/src/rgw/driver/d4n/d4n_directory.cc index abffcd16d86..a3dbc0b3bf9 100644 --- a/src/rgw/driver/d4n/d4n_directory.cc +++ b/src/rgw/driver/d4n/d4n_directory.cc @@ -61,7 +61,7 @@ int ObjectDirectory::exist_key(CacheObj* object, optional_yield y) redis_exec(conn, ec, req, resp, y); - if ((bool)ec) + if (ec) return false; } catch (std::exception &e) {} @@ -319,7 +319,7 @@ int BlockDirectory::exist_key(CacheBlock* block, optional_yield y) redis_exec(conn, ec, req, resp, y); - if ((bool)ec) + if (ec) return false; } catch (std::exception &e) {} diff --git a/src/rgw/driver/d4n/d4n_policy.cc b/src/rgw/driver/d4n/d4n_policy.cc index 8de8bcb87cf..b2d4a725cac 100644 --- a/src/rgw/driver/d4n/d4n_policy.cc +++ b/src/rgw/driver/d4n/d4n_policy.cc @@ -357,7 +357,12 @@ void LFUDAPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64 if (cacheDriver->set_attr(dpp, key, "user.rgw.localWeight", std::to_string(localWeight), y) < 0) ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): CacheDriver set_attr method failed." << dendl; - auto localWeights = get_local_weight_sum(y); + int localWeights = get_local_weight_sum(y); + if (localWeights < 0) { + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Failed to retrieve sum of local weights for the cache backend." << dendl; + return; + } + localWeights += ((localWeight < 0) ? 0 : localWeight); if (set_local_weight_sum(localWeights, y) < 0) ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Failed to update sum of local weights for the cache backend." << dendl; @@ -370,13 +375,20 @@ bool LFUDAPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key, o return false; } - auto localWeights = get_local_weight_sum(y); + int localWeights = get_local_weight_sum(y); + if (localWeights < 0) { + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Failed to retrieve sum of local weights for the cache backend." << dendl; + return false; + } + localWeights -= ((p->second->localWeight < 0) ? 0 : p->second->localWeight); - if (set_local_weight_sum(localWeights, y) < 0) + if (set_local_weight_sum(localWeights, y) < 0) { ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Failed to update sum of local weights for the cache backend." << dendl; + return false; + } - entries_map.erase(p); entries_heap.erase(p->second->handle); + entries_map.erase(p); return true; } diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.cc b/src/rgw/driver/d4n/rgw_sal_d4n.cc index 12946ab430b..02123a26598 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.cc +++ b/src/rgw/driver/d4n/rgw_sal_d4n.cc @@ -49,12 +49,15 @@ D4NFilterDriver::D4NFilterDriver(Driver* _next, boost::asio::io_context& io_cont policyDriver = new rgw::d4n::PolicyDriver(conn, cacheDriver, "lfuda"); } - D4NFilterDriver::~D4NFilterDriver() - { - delete cacheDriver; - delete objDir; - delete blockDir; - delete policyDriver; +D4NFilterDriver::~D4NFilterDriver() +{ + // call cancel() on the connection's executor + boost::asio::dispatch(conn->get_executor(), [c = conn] { c->cancel(); }); + + delete cacheDriver; + delete objDir; + delete blockDir; + delete policyDriver; } int D4NFilterDriver::initialize(CephContext *cct, const DoutPrefixProvider *dpp) @@ -515,8 +518,8 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int prefix = source->get_bucket()->get_name() + "_" + version + "_" + source->get_key().get_oid(); } - ldpp_dout(dpp, 20) << "D3NFilterObject::iterate:: " << "prefix: " << prefix << dendl; - ldpp_dout(dpp, 20) << "D3NFilterObject::iterate:: " << "oid: " << source->get_key().get_oid() << " ofs: " << ofs << " end: " << end << dendl; + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "prefix: " << prefix << dendl; + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "oid: " << source->get_key().get_oid() << " ofs: " << ofs << " end: " << end << dendl; this->client_cb = cb; this->cb->set_client_cb(cb, dpp, &y); @@ -597,7 +600,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int // Read From Cache auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id); - source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, obj_max_req_size, version, y); + source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, obj_max_req_size, version, y); ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl; @@ -716,15 +719,15 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl if (ret == 0) { filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, *y); - /* Store block in directory */ + /* Store block in directory */ if (!blockDir->exist_key(&block, *y)) { if (blockDir->set(&block, *y) < 0) //should we revert previous steps if this step fails? - ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl; + ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl; } else { existing_block.blockID = block.blockID; existing_block.size = block.size; if (blockDir->get(&existing_block, *y) < 0) { - ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl; + ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl; } else { if (existing_block.version != block.version) { if (blockDir->del(&existing_block, *y) < 0) //delete existing block @@ -739,7 +742,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl } } else { if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0) - ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl; + ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl; } } } @@ -759,12 +762,12 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl /* Store block in directory */ if (!blockDir->exist_key(&block, *y)) { if (blockDir->set(&block, *y) < 0) - ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl; + ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl; } else { existing_block.blockID = block.blockID; existing_block.size = block.size; if (blockDir->get(&existing_block, *y) < 0) { - ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl; + ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl; } if (existing_block.version != block.version) { if (blockDir->del(&existing_block, *y) < 0) @@ -778,7 +781,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl } } else { if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0) - ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl; + ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl; } } } @@ -807,23 +810,23 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl if (!blockDir->exist_key(&block, *y)) { if (blockDir->set(&block, *y) < 0) ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl; - } else { - existing_block.blockID = block.blockID; - existing_block.size = block.size; - if (blockDir->get(&existing_block, *y) < 0) { - ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl; - } else { - if (existing_block.version != block.version) { - if (blockDir->del(&existing_block, *y) < 0) - ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory del method failed." << dendl; - if (blockDir->set(&block, *y) < 0) - ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl; - } else { - if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0) - ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl; - } - } - } + } else { + existing_block.blockID = block.blockID; + existing_block.size = block.size; + if (blockDir->get(&existing_block, *y) < 0) { + ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl; + } else { + if (existing_block.version != block.version) { + if (blockDir->del(&existing_block, *y) < 0) + ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory del method failed." << dendl; + if (blockDir->set(&block, *y) < 0) + ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl; + } else { + if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0) + ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl; + } + } + } } else { ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " An error occured while caching oid: " << oid << " error: " << ret << dendl; } -- 2.39.5