From 54a5adae3370f76f6eb97620c3eb3da325e01d06 Mon Sep 17 00:00:00 2001 From: Samarah Date: Tue, 5 Sep 2023 16:53:32 +0000 Subject: [PATCH] d4n/policy: this commit squashes the following changes to policy and filter driver files. d4n/policy: Update policy with correct directory usage; create LFUDAEntry struct + entries map; add `optional_yield` to `insert` method d4n/policy: Add `version` to Entry struct/policy methods and calls in the D4N filter; add error checking and fix logic in LFUDA code Signed-off-by: Samarah --- src/rgw/driver/d4n/d4n_policy.cc | 303 ++++++++++++------------------ src/rgw/driver/d4n/d4n_policy.h | 45 +++-- src/rgw/driver/d4n/rgw_sal_d4n.cc | 23 ++- src/rgw/driver/d4n/rgw_sal_d4n.h | 4 +- 4 files changed, 160 insertions(+), 215 deletions(-) diff --git a/src/rgw/driver/d4n/d4n_policy.cc b/src/rgw/driver/d4n/d4n_policy.cc index 987c7d39458..3981fb0ab41 100644 --- a/src/rgw/driver/d4n/d4n_policy.cc +++ b/src/rgw/driver/d4n/d4n_policy.cc @@ -1,5 +1,7 @@ -#include "../../../common/async/yield_context.h" #include "d4n_policy.h" + +#include +#include "../../../common/async/yield_context.h" #include "common/async/blocked_completion.h" namespace rgw { namespace d4n { @@ -75,7 +77,7 @@ int LFUDAPolicy::get_age(optional_yield y) { } if (!std::get<0>(resp).value()) { - if (!set_age(0, y)) /* Initialize age */ + if (set_age(0, y)) /* Initialize age */ return 0; else return -1; @@ -98,42 +100,6 @@ int LFUDAPolicy::get_age(optional_yield y) { } } -int LFUDAPolicy::set_global_weight(std::string key, int weight, optional_yield y) { - try { - boost::system::error_code ec; - response resp; - request req; - req.push("HSET", key, "globalWeight", std::to_string(weight)); - - redis_exec(conn, ec, req, resp, y); - - if (ec) - return {}; - - return std::get<0>(resp).value(); /* Returns number of fields set */ - } catch(std::exception &e) { - return -1; - } -} - -int LFUDAPolicy::get_global_weight(std::string key, optional_yield y) { - try { - boost::system::error_code ec; - response resp; - request req; - req.push("HGET", key, "globalWeight"); - - redis_exec(conn, ec, req, resp, y); - - if (ec) - return -1; - - return std::stoi(std::get<0>(resp).value()); - } catch(std::exception &e) { - return -1; - } -} - int LFUDAPolicy::set_min_avg_weight(size_t weight, std::string cacheLocation, optional_yield y) { try { boost::system::error_code ec; @@ -153,7 +119,7 @@ int LFUDAPolicy::set_min_avg_weight(size_t weight, std::string cacheLocation, op boost::system::error_code ec; response resp; request req; - req.push("HSET", "lfuda", "minAvgWeight:weight", cacheLocation); + req.push("HSET", "lfuda", "minAvgWeight:weight", boost::lexical_cast(weight)); redis_exec(conn, ec, req, resp, y); @@ -183,8 +149,9 @@ int LFUDAPolicy::get_min_avg_weight(optional_yield y) { } if (!std::get<0>(resp).value()) { - if (!set_min_avg_weight(INT_MAX, ""/* local cache location or keep empty? */, y)) { /* Initialize minimum average weight */ - return INT_MAX; + // Is int_max what we want here? -Sam + if (set_min_avg_weight(0, ""/* local cache location or keep empty? */, y)) { /* Initialize minimum average weight */ + return 0; } else { return -1; } @@ -208,13 +175,11 @@ int LFUDAPolicy::get_min_avg_weight(optional_yield y) { } CacheBlock LFUDAPolicy::find_victim(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) { - #if 0 - std::vector entries = cacheNode->list_entries(dpp); - std::string victimName; - int minWeight = INT_MAX; + if (entries_map.empty()) + return {}; - for (auto it = entries.begin(); it != entries.end(); ++it) { - std::string localWeightStr = cacheNode->get_attr(dpp, it->key, "localWeight", y); // should represent block -Sam + auto it = std::min_element(std::begin(entries_map), std::end(entries_map), + [](const auto& l, const auto& r) { return l.second->localWeight < r.second->localWeight; }); /* Get victim cache block */ CacheBlock victim; @@ -222,105 +187,76 @@ CacheBlock LFUDAPolicy::find_victim(const DoutPrefixProvider* dpp, rgw::cache::C victim.cacheObj.bucketName = cacheNode->get_attr(dpp, victim.cacheObj.objName, "bucket_name", y); // generalize for other cache backends -Sam victim.blockID = 0; // find way to get ID -Sam - if (ret < 0) - return {}; - } else if (std::stoi(localWeightStr) < minWeight) { - minWeight = std::stoi(localWeightStr); - victimName = it->key; - } + if (dir->get(&victim, y) < 0) { + return {}; } - /* Get victim cache block */ - CacheBlock victimBlock; - victimBlock.cacheObj.objName = victimName; - BlockDirectory blockDir(io); - blockDir.init(cct, dpp); - - int ret = blockDir.get(&victimBlock, y); + return victim; +} - if (ret < 0) - return {}; - #endif - CacheBlock victimBlock; - return victimBlock; +void LFUDAPolicy::shutdown() { + dir->shutdown(); + + // call cancel() on the connection's executor + boost::asio::dispatch(conn->get_executor(), [c = conn] { c->cancel(); }); } int LFUDAPolicy::exist_key(std::string key, optional_yield y) { - response resp; - - try { - boost::system::error_code ec; - request req; - req.push("EXISTS", key); - - redis_exec(conn, ec, req, resp, y); - - if (ec) - return false; - } catch(std::exception &e) {} + if (entries_map.count(key) != 0) { + return true; + } - return std::get<0>(resp).value(); + return false; } int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) { - std::string key = "rgw-object:" + block->cacheObj.objName + ":directory"; - std::string localWeightStr = cacheNode->get_attr(dpp, block->cacheObj.objName, "localWeight", y); // change to block name eventually -Sam - int localWeight = -1; response resp; - - if (localWeightStr.empty()) { // figure out where to set local weight -Sam - int ret = cacheNode->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(get_age(y)), y); - localWeight = get_age(y); - - if (ret < 0) - return -1; - } else { - localWeight = std::stoi(localWeightStr); - } - int age = get_age(y); - if (exist_key(key, y)) { /* Local copy */ - localWeight += age; + if (exist_key(block->cacheObj.objName, y)) { /* Local copy */ + auto it = entries_map.find(block->cacheObj.objName); // change to block name eventually -Sam + it->second->localWeight += age; + return cacheNode->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(it->second->localWeight), y); } else { uint64_t freeSpace = cacheNode->get_free_space(dpp); - while (freeSpace < block->size) /* Not enough space in local cache */ - freeSpace += eviction(dpp, cacheNode, y); - - if (exist_key(key, y)) { /* Remote copy */ - try { - boost::system::error_code ec; - request req; - req.push("HGET", key, "blockHosts"); - - redis_exec(conn, ec, req, resp, y); - - if (ec) - return -1; - } catch(std::exception &e) { - return -1; - } - } else { - return -2; + while (freeSpace < block->size) { /* Not enough space in local cache */ + if (int ret = eviction(dpp, cacheNode, y) > 0) + freeSpace += ret; + else + return -1; } - // should not hold local cache IP if in this else statement -Sam - if (std::get<0>(resp).value().length() > 0) { /* Remote copy */ - int globalWeight = get_global_weight(key, y); - globalWeight += age; - - if (set_global_weight(key, globalWeight, y)) + std::string key; // = dir->build_index(block); + int exists = dir->exist_key(key, y); + if (exists > 0) { /* Remote copy */ + if (dir->get(block, y) < 0) { return -1; - } else { /* No remote copy */ + } else { + if (!block->hostsList.empty()) { + block->globalWeight += age; + + if (dir->update_field(block, "globalWeight", std::to_string(block->globalWeight), y) < 0) { + return -1; + } else { + return 0; + } + } else { + return -1; + } + } + } else if (!exists) { /* No remote copy */ + // how to get bufferlist data? -Sam // do I need to add the block to the local cache here? -Sam - // update hosts list for block as well? check read workflow -Sam - localWeight += age; - return cacheNode->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(localWeight), y); + // update hosts list for block as well? + // insert entry here? -Sam + // localWeight += age; + //return cacheNode->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(it->second->localWeight), y); + return 0; + } else { + return -1; } - } - - return 0; + } } uint64_t LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) { @@ -328,96 +264,88 @@ uint64_t LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheD if (victim.cacheObj.objName.empty()) { ldpp_dout(dpp, 10) << "RGW D4N Policy: Could not find victim block" << dendl; - return -1; + return 0; /* Return zero for failure */ } - std::string key = "rgw-object:" + victim.cacheObj.objName + ":directory"; - int globalWeight = get_global_weight(key, y); - int localWeight = std::stoi(cacheNode->get_attr(dpp, victim.cacheObj.objName, "localWeight", y)); // change to block name eventually -Sam - int avgWeight = get_min_avg_weight(y); - response resp; - - if (exist_key(key, y)) { - try { - boost::system::error_code ec; - request req; - req.push("HGET", key, "blockHosts"); - - redis_exec(conn, ec, req, resp, y); - - if (ec) - return -1; - } catch(std::exception &e) { - return -1; - } - } else { - return -2; + auto it = entries_map.find(victim.cacheObj.objName); // change to block name eventually -Sam + if (it == entries_map.end()) { + return 0; } - if (std::get<0>(resp).value().empty()) { /* Last copy */ - if (globalWeight > 0) { - localWeight += globalWeight; - int ret = cacheNode->set_attr(dpp, victim.cacheObj.objName, "localWeight", std::to_string(localWeight), y); + int avgWeight = get_min_avg_weight(y); + if (avgWeight < 0) { + return 0; + } - if (!ret) - ret = set_global_weight(key, 0, y); - else - return -1; + if (victim.hostsList.size() == 1 && victim.hostsList[0] == "127.0.0.1:6379" /* local cache address */) { /* Last copy */ + if (victim.globalWeight) { + it->second->localWeight += victim.globalWeight; + if (cacheNode->set_attr(dpp, victim.cacheObj.objName, "localWeight", std::to_string(it->second->localWeight), y) < 0) { + return 0; + } - if (ret) - return -1; + victim.globalWeight = 0; + if (dir->update_field(&victim, "globalWeight", std::to_string(victim.globalWeight), y) < 0) { + return 0; + } } - if (avgWeight < 0) - return -1; - - if (localWeight > avgWeight) { - // push block to remote cache + if (it->second->localWeight > avgWeight) { + // TODO: push victim block to remote cache } } - globalWeight += localWeight; - - if (set_global_weight(key, globalWeight, y)) - return -2; + victim.globalWeight += it->second->localWeight; + if (dir->update_field(&victim, "globalWeight", std::to_string(victim.globalWeight), y) < 0) { // just have one update? -Sam + return 0; + } ldpp_dout(dpp, 10) << "RGW D4N Policy: Block " << victim.cacheObj.objName << " has been evicted." << dendl; - int ret = cacheNode->del(dpp, victim.cacheObj.objName, y); - if (!ret) { - //ret = set_min_avg_weight(avgWeight - (localWeight/entries_map.size()), ""/*local cache location*/, y); // Where else must this be set? -Sam + if (cacheNode->del(dpp, victim.cacheObj.objName, y) < 0) {//} && dir->remove_host(&victim, ""/* local cache address */, y) < 0) { + return 0; + } else { + uint64_t num_entries = entries_map.size(); - if (!ret) { - int age = get_age(y); - age = std::max(localWeight, age); - ret = set_age(age, y); - - if (ret) - return -1; + if (!avgWeight) { + if (set_min_avg_weight(0, ""/*local cache location*/, y) < 0) // Where else must this be set? -Sam + return 0; } else { - return -1; + if (set_min_avg_weight(avgWeight - (it->second->localWeight/num_entries), ""/*local cache location*/, y) < 0) { // Where else must this be set? -Sam + return 0; + } + int age = get_age(y); + age = std::max(it->second->localWeight, age); + if (set_age(age, y) < 0) + return 0; } - } else { - return -1; } - return victim.size; + return victim.size; // this doesn't account for the additional attributes that were removed and need to be set with the new block -Sam } void LFUDAPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) { + erase(dpp, key); + int age = get_age(y); + assert(age > -1); + + LFUDAEntry *e = new LFUDAEntry(key, offset, len, version, age); + entries_lfuda_list.push_back(*e); + entries_map.emplace(key, e); } bool LFUDAPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key) { - return false; -} + auto p = entries_map.find(key); + if (p == entries_map.end()) { + return false; + } -void LFUDAPolicy::shutdown() -{ - // call cancel() on the connection's executor - boost::asio::dispatch(conn->get_executor(), [c = conn] { c->cancel(); }); + entries_map.erase(p); + entries_lfuda_list.erase_and_dispose(entries_lfuda_list.iterator_to(*(p->second)), LFUDA_Entry_delete_disposer()); + return true; } int LRUPolicy::exist_key(std::string key, optional_yield y) @@ -452,7 +380,7 @@ void LRUPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t { erase(dpp, key); - Entry *e = new Entry(key, offset, len, ""); // update version later -Sam + Entry *e = new Entry(key, offset, len, version); entries_lru_list.push_back(*e); entries_map.emplace(key, e); } @@ -477,6 +405,7 @@ int PolicyDriver::init() { cachePolicy = new LRUPolicy(); return 0; } + return -1; } diff --git a/src/rgw/driver/d4n/d4n_policy.h b/src/rgw/driver/d4n/d4n_policy.h index 9e2faed25c8..f5ddfcc7222 100644 --- a/src/rgw/driver/d4n/d4n_policy.h +++ b/src/rgw/driver/d4n/d4n_policy.h @@ -18,8 +18,8 @@ class CachePolicy { uint64_t offset; uint64_t len; std::string version; - Entry(std::string& key, uint64_t offset, uint64_t len, std:: string version) : key(key), offset(offset), - len(len), version(version) {} + Entry(std::string& key, uint64_t offset, uint64_t len, std::string version) : key(key), offset(offset), + len(len), version(version) {} }; //The disposer object function @@ -28,10 +28,6 @@ class CachePolicy { delete e; } }; - typedef boost::intrusive::list List; - - //cpp_redis::client client; - //Address addr; public: CephContext* cct; @@ -53,26 +49,42 @@ class CachePolicy { class LFUDAPolicy : public CachePolicy { private: + struct LFUDAEntry : public Entry { + int localWeight; + LFUDAEntry(std::string& key, uint64_t offset, uint64_t len, std::string version, int localWeight) : Entry(key, offset, len, version), + localWeight(localWeight) {} + }; + + struct LFUDA_Entry_delete_disposer : public Entry_delete_disposer { + void operator()(LFUDAEntry *e) { + delete e; + } + }; + typedef boost::intrusive::list List; + + std::unordered_map entries_map; + net::io_context& io; std::shared_ptr conn; + List entries_lfuda_list; + BlockDirectory* dir; + + int set_age(int age, optional_yield y); + int get_age(optional_yield y); + int set_min_avg_weight(size_t weight, std::string cacheLocation, optional_yield y); + int get_min_avg_weight(optional_yield y); + CacheBlock find_victim(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y); public: LFUDAPolicy(net::io_context& io_context) : CachePolicy(), io(io_context) { conn = std::make_shared(boost::asio::make_strand(io_context)); + dir = new BlockDirectory{io}; } ~LFUDAPolicy() { //delete dir; shutdown(); } - int set_age(int age, optional_yield y); - int get_age(optional_yield y); - int set_global_weight(std::string key, int weight, optional_yield y); - int get_global_weight(std::string key, optional_yield y); - int set_min_avg_weight(size_t weight, std::string cacheLocation, optional_yield y); - int get_min_avg_weight(optional_yield y); - CacheBlock find_victim(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y); - virtual int init(CephContext *cct, const DoutPrefixProvider* dpp) { this->cct = cct; @@ -85,6 +97,7 @@ class LFUDAPolicy : public CachePolicy { return -EDESTADDRREQ; } + dir->init(cct, dpp); conn->async_run(cfg, {}, net::consign(net::detached, conn)); return 0; @@ -99,9 +112,11 @@ class LFUDAPolicy : public CachePolicy { class LRUPolicy : public CachePolicy { private: - List entries_lru_list; + typedef boost::intrusive::list List; + std::unordered_map entries_map; std::mutex lru_lock; + List entries_lru_list; public: LRUPolicy() = default; diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.cc b/src/rgw/driver/d4n/rgw_sal_d4n.cc index 430bf0c0c8a..6f90f34c1db 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.cc +++ b/src/rgw/driver/d4n/rgw_sal_d4n.cc @@ -51,7 +51,6 @@ D4NFilterDriver::D4NFilterDriver(Driver* _next, boost::asio::io_context& io_cont blockDir = new rgw::d4n::BlockDirectory(io_context); cacheBlock = new rgw::d4n::CacheBlock(); policyDriver = new rgw::d4n::PolicyDriver(io_context, "lfuda"); - lruPolicyDriver = new rgw::d4n::PolicyDriver(io_context, "lru"); } D4NFilterDriver::~D4NFilterDriver() @@ -61,7 +60,6 @@ D4NFilterDriver::D4NFilterDriver(Driver* _next, boost::asio::io_context& io_cont delete blockDir; delete cacheBlock; delete policyDriver; - delete lruPolicyDriver; } int D4NFilterDriver::initialize(CephContext *cct, const DoutPrefixProvider *dpp) @@ -76,9 +74,6 @@ int D4NFilterDriver::initialize(CephContext *cct, const DoutPrefixProvider *dpp) policyDriver->init(); policyDriver->get_cache_policy()->init(cct, dpp); - lruPolicyDriver->init(); - lruPolicyDriver->get_cache_policy()->init(cct, dpp); - return 0; } @@ -299,6 +294,9 @@ int D4NFilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* d } else if (it->first == "max_buckets") { user->set_max_buckets(std::stoull(it->second.c_str())); attrs.erase(it->first); + } else { + ldpp_dout(dpp, 20) << "D4N Filter: Unexpected attribute; not locally set." << dendl; + attrs.erase(it->first); } } } @@ -450,6 +448,9 @@ int D4NFilterObject::D4NFilterReadOp::prepare(optional_yield y, const DoutPrefix } else if (it->first == "max_buckets") { user->set_max_buckets(std::stoull(it->second.c_str())); attrs.erase(it->first); + } else { + ldpp_dout(dpp, 20) << "D4N Filter: Unexpected attribute; not locally set." << dendl; + attrs.erase(it->first); } } user->set_info(quota_info); @@ -803,14 +804,17 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl const std::lock_guard l(d4n_get_data_lock); rgw::d4n::CacheBlock block; rgw::d4n::BlockDirectory* blockDir = source->driver->get_block_dir(); - //block.hostsList.push_back(blockDir->get_addr().host + ":" + std::to_string(blockDir->get_addr().port)); - block.cacheObj.bucketName = source->get_bucket()->get_name(); + block.version = ""; + block.hostsList.push_back("127.0.0.1:6379" /*current cache addr*/); block.cacheObj.objName = source->get_key().get_oid(); + block.cacheObj.bucketName = source->get_bucket()->get_name(); + block.cacheObj.creationTime = 0; + block.cacheObj.dirty = false; + block.cacheObj.hostsList.push_back("127.0.0.1:6379" /*current cache addr*/); if (bl.length() > 0 && last_part) { // if bl = bl_rem has data and this is the last part, write it to cache std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len); - block.blockID = 0; // TODO: fill out block correctly - block.version = ""; + block.blockID = ofs; // TODO: fill out block correctly block.size = bl.length(); block.blockID = ofs; uint64_t freeSpace = filter->get_cache_driver()->get_free_space(dpp); @@ -836,7 +840,6 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len); ofs += bl_len; block.blockID = ofs; - block.version = ""; block.size = bl.length(); uint64_t freeSpace = filter->get_cache_driver()->get_free_space(dpp); while(freeSpace < block.size) { diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.h b/src/rgw/driver/d4n/rgw_sal_d4n.h index c30d4d59d2e..601f2f545ba 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.h +++ b/src/rgw/driver/d4n/rgw_sal_d4n.h @@ -45,7 +45,6 @@ class D4NFilterDriver : public FilterDriver { rgw::d4n::BlockDirectory* blockDir; rgw::d4n::CacheBlock* cacheBlock; rgw::d4n::PolicyDriver* policyDriver; - rgw::d4n::PolicyDriver* lruPolicyDriver; public: D4NFilterDriver(Driver* _next, boost::asio::io_context& io_context); @@ -66,8 +65,7 @@ class D4NFilterDriver : public FilterDriver { rgw::cache::CacheDriver* get_cache_driver() { return cacheDriver; } rgw::d4n::ObjectDirectory* get_obj_dir() { return objDir; } rgw::d4n::BlockDirectory* get_block_dir() { return blockDir; } - rgw::d4n::CacheBlock* get_cache_block() { return cacheBlock; } - rgw::d4n::PolicyDriver* get_policy_driver() { return lruPolicyDriver; } + rgw::d4n::PolicyDriver* get_policy_driver() { return policyDriver; } }; class D4NFilterUser : public FilterUser { -- 2.39.5