From 9076ccd6afc202ffd0ffe19081036f983a031651 Mon Sep 17 00:00:00 2001 From: Samarah Date: Fri, 12 Jan 2024 19:01:05 +0000 Subject: [PATCH] d4n/policy: Update local weight sum + age handling and d4n policy test Signed-off-by: Samarah --- src/common/options/rgw.yaml.in | 8 + src/rgw/driver/d4n/d4n_policy.cc | 307 ++++++++++++++---------------- src/rgw/driver/d4n/d4n_policy.h | 56 ++++-- src/rgw/driver/d4n/rgw_sal_d4n.cc | 7 +- src/rgw/driver/d4n/rgw_sal_d4n.h | 1 + src/test/rgw/test_d4n_policy.cc | 19 +- 6 files changed, 203 insertions(+), 195 deletions(-) diff --git a/src/common/options/rgw.yaml.in b/src/common/options/rgw.yaml.in index bcce8389a5341..a3c11093fba48 100644 --- a/src/common/options/rgw.yaml.in +++ b/src/common/options/rgw.yaml.in @@ -3666,6 +3666,14 @@ options: see_also: - rgw_thread_pool_size with_legacy: true +- name: rgw_lfuda_sync_frequency + type: int + level: advanced + desc: LFUDA variables' sync frequency in seconds + default: 60 + services: + - rgw + with_legacy: true - name: rgw_backend_store type: str level: advanced diff --git a/src/rgw/driver/d4n/d4n_policy.cc b/src/rgw/driver/d4n/d4n_policy.cc index 5adba67db01c7..08386cf419ee2 100644 --- a/src/rgw/driver/d4n/d4n_policy.cc +++ b/src/rgw/driver/d4n/d4n_policy.cc @@ -1,6 +1,5 @@ #include "d4n_policy.h" -#include #include "../../../common/async/yield_context.h" #include "common/async/blocked_completion.h" #include "common/dout.h" @@ -17,8 +16,8 @@ struct initiate_exec { template void operator()(Handler handler, const boost::redis::request& req, Response& resp) { - auto h = boost::asio::consign(std::move(handler), conn); - return boost::asio::dispatch(get_executor(), [c=conn, &req, &resp, h=std::move(h)] { + auto h = asio::consign(std::move(handler), conn); + return asio::dispatch(get_executor(), [c=conn, &req, &resp, h=std::move(h)] { c->async_exec(req, resp, std::move(h)); }); } @@ -29,13 +28,16 @@ auto async_exec(std::shared_ptr conn, const boost::redis::request& req, Response& resp, CompletionToken&& token) { - return boost::asio::async_initiate( initiate_exec{std::move(conn)}, token, req, resp); } -template -void redis_exec(std::shared_ptr conn, boost::system::error_code& ec, boost::redis::request& req, boost::redis::response& resp, optional_yield y) +template +void redis_exec(std::shared_ptr conn, + boost::system::error_code& ec, + const boost::redis::request& req, + boost::redis::response& resp, optional_yield y) { if (y) { auto yield = y.get_yield_context(); @@ -45,133 +47,195 @@ void redis_exec(std::shared_ptr conn, boost::system::error_code& ec, } } -int LFUDAPolicy::set_age(int age, optional_yield y) { +int LFUDAPolicy::init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context) { + dir->init(cct); + int result = 0; + response resp; + try { boost::system::error_code ec; - response resp; request req; - req.push("HSET", "lfuda", "age", std::to_string(age)); - + req.push("HEXISTS", "lfuda", "age"); + req.push("HSET", "lfuda", "minLocalWeights_sum", std::to_string(weightSum)); /* New cache node will always have the minimum average weight */ + req.push("HSET", "lfuda", "minLocalWeights_size", std::to_string(entries_map.size())); + req.push("HSET", "lfuda", "minLocalWeights_address", dir->cct->_conf->rgw_local_cache_address); + redis_exec(conn, ec, req, resp, y); if (ec) { + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: " << ec.what() << dendl; return -ec.value(); } - return std::get<0>(resp).value(); /* Returns number of fields set */ + result = std::min(std::get<1>(resp).value(), std::min(std::get<2>(resp).value(), std::get<3>(resp).value())); } catch (std::exception &e) { + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: " << e.what() << dendl; return -EINVAL; } -} - -int LFUDAPolicy::get_age(optional_yield y) { - response resp; - try { - boost::system::error_code ec; - request req; - req.push("HEXISTS", "lfuda", "age"); - - redis_exec(conn, ec, req, resp, y); + if (!std::get<0>(resp).value()) { /* Only set maximum age if it doesn't exist */ + try { + boost::system::error_code ec; + response value; + request req; + req.push("HSET", "lfuda", "age", age); + + redis_exec(conn, ec, req, value, y); + + if (ec) { + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } - if (ec) { - return -ec.value(); + result = std::min(result, std::get<0>(value).value()); + } catch (std::exception &e) { + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; } - } catch (std::exception &e) { - return -EINVAL; } - if (!std::get<0>(resp).value()) { - if (set_age(1, y)) /* Initialize age */ - return 1; - else - return -ENOENT; - } + asio::co_spawn(io_context.get_executor(), + redis_sync(dpp, y), asio::detached); + + return result; +} + +int LFUDAPolicy::age_sync(const DoutPrefixProvider* dpp, optional_yield y) { + response resp; try { boost::system::error_code ec; - response value; request req; req.push("HGET", "lfuda", "age"); - redis_exec(conn, ec, req, value, y); + redis_exec(conn, ec, req, resp, y); if (ec) { return -ec.value(); } - - return std::stoi(std::get<0>(value).value()); } catch (std::exception &e) { return -EINVAL; } -} - -int LFUDAPolicy::set_local_weight_sum(int weight, optional_yield y) { - weight = weight > 0 ? weight : 0; - try { - boost::system::error_code ec; - response resp; - request req; - req.push("HSET", dir->cct->_conf->rgw_local_cache_address, "localWeights", std::to_string(weight)); + if (age > std::stoi(std::get<0>(resp).value()) || std::get<0>(resp).value().empty()) { /* Set new maximum age */ + try { + boost::system::error_code ec; + request req; + response value; + req.push("HSET", "lfuda", "age", age); + redis_exec(conn, ec, req, resp, y); - redis_exec(conn, ec, req, resp, y); + if (ec) { + return -ec.value(); + } - if (ec) { - return -ec.value(); + return std::get<0>(value).value(); + } catch (std::exception &e) { + return -EINVAL; } - - return std::get<0>(resp).value(); /* Returns number of fields set */ - } catch (std::exception &e) { - return -EINVAL; + } else { + age = std::stoi(std::get<0>(resp).value()); + return 0; } } -int LFUDAPolicy::get_local_weight_sum(optional_yield y) { - response resp; +int LFUDAPolicy::local_weight_sync(const DoutPrefixProvider* dpp, optional_yield y) { + int result; - try { - boost::system::error_code ec; - request req; - req.push("HEXISTS", dir->cct->_conf->rgw_local_cache_address, "localWeights"); + if (fabs(weightSum - postedSum) > (postedSum * 0.1)) { + response resp; - redis_exec(conn, ec, req, resp, y); + try { + boost::system::error_code ec; + request req; + req.push("HGET", "lfuda", "minLocalWeights_sum"); + req.push("HGET", "lfuda", "minLocalWeights_size"); + + redis_exec(conn, ec, req, resp, y); - if (ec) { - return -ec.value(); + if (ec) { + return -ec.value(); + } + } catch (std::exception &e) { + return -EINVAL; } - } catch (std::exception &e) { - return -EINVAL; - } - - if (!std::get<0>(resp).value()) { - int sum = 0; - for (auto& entry : entries_map) - sum += entry.second->localWeight; - - if (int ret = set_local_weight_sum(sum, y) < 0) { /* Initialize */ - return ret; + + float minAvgWeight = std::stof(std::get<0>(resp).value()) / std::stof(std::get<1>(resp).value()); + + if ((static_cast(weightSum) / static_cast(entries_map.size())) < minAvgWeight) { /* Set new minimum weight */ + try { + boost::system::error_code ec; + request req; + response value; + req.push("HSET", "lfuda", "minLocalWeights_sum", std::to_string(weightSum)); + req.push("HSET", "lfuda", "minLocalWeights_size", std::to_string(entries_map.size())); + req.push("HSET", "lfuda", "minLocalWeights_address", dir->cct->_conf->rgw_local_cache_address); + redis_exec(conn, ec, req, resp, y); + + if (ec) { + return -ec.value(); + } + + result = std::min(std::get<0>(value).value(), std::get<1>(value).value()); + result = std::min(result, std::get<2>(value).value()); + } catch (std::exception &e) { + return -EINVAL; + } } else { - return sum; + weightSum = std::stoi(std::get<0>(resp).value()); + postedSum = std::stoi(std::get<0>(resp).value()); } } - try { + try { /* Post update for local cache */ boost::system::error_code ec; - response value; request req; - req.push("HGET", dir->cct->_conf->rgw_local_cache_address, "localWeights"); - - redis_exec(conn, ec, req, value, y); + response resp; + req.push("HSET", dpp->get_cct()->_conf->rgw_local_cache_address, "avgLocalWeight_sum", std::to_string(weightSum)); + req.push("HSET", dpp->get_cct()->_conf->rgw_local_cache_address, "avgLocalWeight_size", std::to_string(entries_map.size())); + redis_exec(conn, ec, req, resp, y); if (ec) { return -ec.value(); } - return std::stoi(std::get<0>(value).value()); + result = std::min(std::get<0>(resp).value(), std::get<1>(resp).value()); } catch (std::exception &e) { return -EINVAL; } + + return result; +} + +asio::awaitable LFUDAPolicy::redis_sync(const DoutPrefixProvider* dpp, optional_yield y) { + rthread_timer.emplace(co_await asio::this_coro::executor); + co_await asio::this_coro::throw_if_cancelled(true); + co_await asio::this_coro::reset_cancellation_state( + asio::enable_terminal_cancellation()); + + for (;;) try { + /* Update age */ + if (int ret = age_sync(dpp, y) < 0) { + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: ret=" << ret << dendl; + } + + /* Update minimum local weight sum */ + if (int ret = local_weight_sync(dpp, y) < 0) { + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: ret=" << ret << dendl; + } + + int interval = dpp->get_cct()->_conf->rgw_lfuda_sync_frequency; + rthread_timer->expires_after(std::chrono::seconds(interval)); + co_await rthread_timer->async_wait(asio::use_awaitable); + } catch (sys::system_error& e) { + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: " << e.what() << dendl; + + if (e.code() == asio::error::operation_aborted) { + break; + } else { + continue; + } + } } CacheBlock* LFUDAPolicy::get_victim_block(const DoutPrefixProvider* dpp, optional_yield y) { @@ -205,47 +269,6 @@ int LFUDAPolicy::exist_key(std::string key) { return false; } -#if 0 -int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheDriver, optional_yield y) { - response resp; - int age = get_age(y); - - if (exist_key(build_index(block->cacheObj.bucketName, block->cacheObj.objName, block->blockID, block->size))) { /* Local copy */ - auto it = entries_map.find(build_index(block->cacheObj.bucketName, block->cacheObj.objName, block->blockID, block->size)); - it->second->localWeight += age; - return cacheDriver->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(it->second->localWeight), y); - } else { - if (eviction(dpp, block->size, cacheDriver, y) < 0) - return -1; - - int exists = dir->exist_key(block, y); - if (exists > 0) { /* Remote copy */ - if (dir->get(block, y) < 0) { - return -1; - } else { - if (!block->hostsList.empty()) { - block->globalWeight += age; - - if (dir->update_field(block, "globalWeight", std::to_string(block->globalWeight), y) < 0) { - return -1; - } else { - return 0; - } - } else { - return -1; - } - } - } else if (!exists) { /* No remote copy */ - // localWeight += age; - //return cacheDriver->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(it->second->localWeight), y); - return 0; - } else { - return -1; - } - } -} -#endif - int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) { uint64_t freeSpace = cacheDriver->get_free_space(dpp); @@ -266,13 +289,7 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional return -ENOENT; } - int avgWeight = get_local_weight_sum(y); - if (avgWeight < 0) { - delete victim; - return avgWeight; - } - - avgWeight /= entries_map.size(); + int avgWeight = weightSum / entries_map.size(); if (victim->hostsList.size() == 1 && victim->hostsList[0] == dir->cct->_conf->rgw_local_cache_address) { /* Last copy */ if (victim->globalWeight) { @@ -316,14 +333,9 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Block " << key << " has been evicted." << dendl; - int weight = (avgWeight * entries_map.size()) - it->second->localWeight; - if (int ret = set_local_weight_sum(weight, y) < 0) - return ret; + weightSum = (avgWeight * entries_map.size()) - it->second->localWeight; - int age = get_age(y); age = std::max(it->second->localWeight, age); - if (int ret = set_age(age, y) < 0) - return ret; erase(dpp, key, y); freeSpace = cacheDriver->get_free_space(dpp); @@ -336,17 +348,10 @@ void LFUDAPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64 { using handle_type = boost::heap::fibonacci_heap>>::handle_type; const std::lock_guard l(lfuda_lock); - - int age = get_age(y); - if (age < 0) { - ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): LFUDAPolicy get_age method failed." << dendl; - return; - } int localWeight = age; auto entry = find_entry(key); if (entry != nullptr) { - entry->localWeight += age; - localWeight = entry->localWeight; + localWeight = entry->localWeight + age; } erase(dpp, key, y); @@ -359,15 +364,7 @@ void LFUDAPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64 if (cacheDriver->set_attr(dpp, key, "user.rgw.localWeight", std::to_string(localWeight), y) < 0) ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): CacheDriver set_attr method failed." << dendl; - int localWeights = get_local_weight_sum(y); - if (localWeights < 0) { - ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Failed to retrieve sum of local weights for the cache backend." << dendl; - return; - } - - localWeights += ((localWeight < 0) ? 0 : localWeight); - if (set_local_weight_sum(localWeights, y) < 0) - ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Failed to update sum of local weights for the cache backend." << dendl; + weightSum += ((localWeight < 0) ? 0 : localWeight); } bool LFUDAPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) @@ -377,17 +374,7 @@ bool LFUDAPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key, o return false; } - int localWeights = get_local_weight_sum(y); - if (localWeights < 0) { - ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Failed to retrieve sum of local weights for the cache backend." << dendl; - return false; - } - - localWeights -= ((p->second->localWeight < 0) ? 0 : p->second->localWeight); - if (set_local_weight_sum(localWeights, y) < 0) { - ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Failed to update sum of local weights for the cache backend." << dendl; - return false; - } + weightSum -= ((p->second->localWeight < 0) ? 0 : p->second->localWeight); entries_heap.erase(p->second->handle); entries_map.erase(p); diff --git a/src/rgw/driver/d4n/d4n_policy.h b/src/rgw/driver/d4n/d4n_policy.h index 60d55128ab0b3..3740f5f047265 100644 --- a/src/rgw/driver/d4n/d4n_policy.h +++ b/src/rgw/driver/d4n/d4n_policy.h @@ -1,6 +1,11 @@ #pragma once +#include +#include +#include #include +#include + #include "d4n_directory.h" #include "rgw_sal_d4n.h" #include "rgw_cache_driver.h" @@ -11,6 +16,9 @@ namespace rgw::sal { namespace rgw { namespace d4n { +namespace asio = boost::asio; +namespace sys = boost::system; + class CachePolicy { protected: struct Entry : public boost::intrusive::list_base_hook<> { @@ -33,7 +41,7 @@ class CachePolicy { CachePolicy() {} virtual ~CachePolicy() = default; - virtual void init(CephContext *cct) = 0; + virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context) = 0; virtual int exist_key(std::string key) = 0; virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) = 0; virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) = 0; @@ -63,45 +71,51 @@ class LFUDAPolicy : public CachePolicy { using Heap = boost::heap::fibonacci_heap>>; Heap entries_heap; std::unordered_map entries_map; - std::mutex lfuda_lock; + std::mutex lfuda_lock; + int age = 1, weightSum = 0, postedSum = 0; + optional_yield y = null_yield; std::shared_ptr conn; BlockDirectory* dir; rgw::cache::CacheDriver* cacheDriver; + std::optional rthread_timer; - int set_age(int age, optional_yield y); - int get_age(optional_yield y); - int set_local_weight_sum(int weight, optional_yield y); - int get_local_weight_sum(optional_yield y); CacheBlock* get_victim_block(const DoutPrefixProvider* dpp, optional_yield y); + int age_sync(const DoutPrefixProvider* dpp, optional_yield y); + int local_weight_sync(const DoutPrefixProvider* dpp, optional_yield y); + asio::awaitable redis_sync(const DoutPrefixProvider* dpp, optional_yield y); + void rthread_stop() { + std::lock_guard l{lfuda_lock}; + + if (rthread_timer) { + rthread_timer->cancel(); + } + } + LFUDAEntry* find_entry(std::string key) { + auto it = entries_map.find(key); + if (it == entries_map.end()) + return nullptr; + return it->second; + } public: LFUDAPolicy(std::shared_ptr& conn, rgw::cache::CacheDriver* cacheDriver) : CachePolicy(), - conn(conn), - cacheDriver(cacheDriver) + conn(conn), + cacheDriver(cacheDriver) { dir = new BlockDirectory{conn}; } ~LFUDAPolicy() { + rthread_stop(); delete dir; } - virtual void init(CephContext *cct) { - dir->init(cct); - } + virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context); virtual int exist_key(std::string key) override; - //virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) override; virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override; virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) override; virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override; - - void set_local_weight(std::string& key, int localWeight); - LFUDAEntry* find_entry(std::string key) { - auto it = entries_map.find(key); - if (it == entries_map.end()) - return nullptr; - return it->second; - } + void save_y(optional_yield y) { this->y = y; } }; class LRUPolicy : public CachePolicy { @@ -118,7 +132,7 @@ class LRUPolicy : public CachePolicy { public: LRUPolicy(rgw::cache::CacheDriver* cacheDriver) : cacheDriver{cacheDriver} {} - virtual void init(CephContext *cct) {} + virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context) { return 0; } virtual int exist_key(std::string key) override; virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override; virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) override; diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.cc b/src/rgw/driver/d4n/rgw_sal_d4n.cc index 691d589bf2ae5..39dbd5772d95e 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.cc +++ b/src/rgw/driver/d4n/rgw_sal_d4n.cc @@ -33,7 +33,8 @@ static inline Object* nextObject(Object* t) return dynamic_cast(t)->get_next(); } -D4NFilterDriver::D4NFilterDriver(Driver* _next, boost::asio::io_context& io_context) : FilterDriver(_next) +D4NFilterDriver::D4NFilterDriver(Driver* _next, boost::asio::io_context& io_context) : FilterDriver(_next), + io_context(io_context) { conn = std::make_shared(boost::asio::make_strand(io_context)); @@ -83,7 +84,7 @@ int D4NFilterDriver::initialize(CephContext *cct, const DoutPrefixProvider *dpp) cacheDriver->initialize(dpp); objDir->init(cct); blockDir->init(cct); - policyDriver->get_cache_policy()->init(cct); + policyDriver->get_cache_policy()->init(cct, dpp, io_context); return 0; } @@ -985,8 +986,6 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag, } baseAttrs.insert(attrs.begin(), attrs.end()); - - // is the accounted_size equivalent to the length? -Sam //bufferlist bl_empty; //int putReturn = driver->get_cache_driver()-> diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.h b/src/rgw/driver/d4n/rgw_sal_d4n.h index 0893371a190bc..46fcfb5c1ad1b 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.h +++ b/src/rgw/driver/d4n/rgw_sal_d4n.h @@ -47,6 +47,7 @@ class D4NFilterDriver : public FilterDriver { rgw::d4n::ObjectDirectory* objDir; rgw::d4n::BlockDirectory* blockDir; rgw::d4n::PolicyDriver* policyDriver; + boost::asio::io_context& io_context; public: D4NFilterDriver(Driver* _next, boost::asio::io_context& io_context); diff --git a/src/test/rgw/test_d4n_policy.cc b/src/test/rgw/test_d4n_policy.cc index b94fa426ef2af..78fead5732d73 100644 --- a/src/test/rgw/test_d4n_policy.cc +++ b/src/test/rgw/test_d4n_policy.cc @@ -5,6 +5,7 @@ #include "gtest/gtest.h" #include "gtest/gtest_prod.h" #include "common/ceph_argparse.h" +#include "common/async/blocked_completion.h" #include "rgw_auth_registry.h" #include "driver/d4n/d4n_policy.h" @@ -60,7 +61,7 @@ class LFUDAPolicyFixture : public ::testing::Test { .hostsList = { env->redisHost } }; - conn = std::make_shared(boost::asio::make_strand(io)); + conn = std::make_shared(net::make_strand(io)); rgw::cache::Partition partition_info{ .location = "RedisCache", .size = 1000 }; cacheDriver = new rgw::cache::RedisDriver{io, partition_info}; policyDriver = new rgw::d4n::PolicyDriver(conn, cacheDriver, "lfuda"); @@ -73,7 +74,6 @@ class LFUDAPolicyFixture : public ::testing::Test { dir->init(env->cct); cacheDriver->initialize(env->dpp); - policyDriver->get_cache_policy()->init(env->cct); bl.append("test data"); bufferlist attrVal; @@ -100,13 +100,12 @@ class LFUDAPolicyFixture : public ::testing::Test { } int lfuda(const DoutPrefixProvider* dpp, rgw::d4n::CacheBlock* block, rgw::cache::CacheDriver* cacheDriver, optional_yield y) { - int age = 5; /* Arbitrary number for testing */ + int age = 1; std::string oid = build_index(block->cacheObj.bucketName, block->cacheObj.objName, block->blockID, block->size); if (this->policyDriver->get_cache_policy()->exist_key(build_index(block->cacheObj.bucketName, block->cacheObj.objName, block->blockID, block->size))) { /* Local copy */ - auto entry = dynamic_cast(this->policyDriver->get_cache_policy())->find_entry(oid); - entry->localWeight += age; - return cacheDriver->set_attr(dpp, oid, "localWeight", std::to_string(entry->localWeight), y); + policyDriver->get_cache_policy()->update(env->dpp, oid, 0, bl.length(), "", y); + return 0; } else { if (this->policyDriver->get_cache_policy()->eviction(dpp, block->size, y) < 0) return -1; @@ -162,13 +161,13 @@ TEST_F(LFUDAPolicyFixture, LocalGetBlockYield) ASSERT_EQ(0, cacheDriver->put(env->dpp, key, bl, bl.length(), attrs, optional_yield{io, yield})); policyDriver->get_cache_policy()->update(env->dpp, key, 0, bl.length(), "", optional_yield{io, yield}); - ASSERT_GE(lfuda(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0); + ASSERT_EQ(lfuda(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0); cacheDriver->shutdown(); boost::system::error_code ec; request req; - req.push("HGET", "RedisCache/testBucket_testName_0_0", "localWeight"); + req.push("HGET", "RedisCache/testBucket_testName_0_0", "user.rgw.localWeight"); req.push("FLUSHALL"); response resp; @@ -176,7 +175,7 @@ TEST_F(LFUDAPolicyFixture, LocalGetBlockYield) conn->async_exec(req, resp, yield[ec]); ASSERT_EQ((bool)ec, false); - EXPECT_EQ(std::get<0>(resp).value(), "6"); + EXPECT_EQ(std::get<0>(resp).value(), "2"); conn->cancel(); }); @@ -243,7 +242,7 @@ TEST_F(LFUDAPolicyFixture, RemoteGetBlockYield) ASSERT_EQ((bool)ec, false); EXPECT_EQ(std::get<0>(resp).value(), 0); EXPECT_EQ(std::get<1>(resp).value(), 0); - EXPECT_EQ(std::get<2>(resp).value(), "5"); + EXPECT_EQ(std::get<2>(resp).value(), "1"); conn->cancel(); }); -- 2.39.5