From: Samarah Date: Thu, 19 Oct 2023 15:34:20 +0000 (+0000) Subject: rgw/d4n: this commit squashes the following two commits. X-Git-Tag: testing/wip-batrick-testing-20240411.154038~45^2~44 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=ce71823379dccf95c9d22881ce464b816c1edbbc;p=ceph-ci.git rgw/d4n: this commit squashes the following two commits. rgw/d4n: Remove `get_block` from policy and add logic to D4N filter d4n/policy: Create a min heap of the local weights Signed-off-by: Samarah --- diff --git a/src/rgw/driver/d4n/d4n_policy.cc b/src/rgw/driver/d4n/d4n_policy.cc index 48ff4543fe2..71ba05115b3 100644 --- a/src/rgw/driver/d4n/d4n_policy.cc +++ b/src/rgw/driver/d4n/d4n_policy.cc @@ -6,10 +6,6 @@ namespace rgw { namespace d4n { -std::string build_index(std::string bucketName, std::string oid, uint64_t offset, uint64_t size) { - return bucketName + "_" + oid + "_" + std::to_string(offset) + "_" + std::to_string(size); -} - // initiate a call to async_exec() on the connection's executor struct initiate_exec { std::shared_ptr conn; @@ -178,21 +174,18 @@ int LFUDAPolicy::get_min_avg_weight(optional_yield y) { } CacheBlock LFUDAPolicy::find_victim(const DoutPrefixProvider* dpp, optional_yield y) { - if (entries_map.empty()) + if (entries_heap.empty()) return {}; - auto it = std::min_element(std::begin(entries_map), std::end(entries_map), - [](const auto& l, const auto& r) { return l.second->localWeight < r.second->localWeight; }); - /* Get victim cache block */ - std::string key = it->second->key; + std::string key = entries_heap.top()->key; CacheBlock victim; victim.cacheObj.bucketName = key.substr(0, key.find('_')); key.erase(0, key.find('_') + 1); victim.cacheObj.objName = key.substr(0, key.find('_')); - victim.blockID = it->second->offset; - victim.size = it->second->len; + victim.blockID = entries_heap.top()->offset; + victim.size = entries_heap.top()->len; if (dir->get(&victim, y) < 0) { return {}; @@ -216,23 +209,18 @@ int LFUDAPolicy::exist_key(std::string key) { return false; } -int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) { +#if 0 +int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheDriver, optional_yield y) { response resp; int age = get_age(y); if (exist_key(build_index(block->cacheObj.bucketName, block->cacheObj.objName, block->blockID, block->size))) { /* Local copy */ auto it = entries_map.find(build_index(block->cacheObj.bucketName, block->cacheObj.objName, block->blockID, block->size)); it->second->localWeight += age; - return cacheNode->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(it->second->localWeight), y); + return cacheDriver->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(it->second->localWeight), y); } else { - uint64_t freeSpace = cacheNode->get_free_space(dpp); - - while (freeSpace < block->size) { /* Not enough space in local cache */ - if (int ret = eviction(dpp, cacheNode, y) > 0) - freeSpace += ret; - else - return -1; - } + if (eviction(dpp, block->size, cacheDriver, y) < 0) + return -1; // what if eviction turns into infinite loop? -Sam int exists = dir->exist_key(block, y); if (exists > 0) { /* Remote copy */ @@ -252,107 +240,141 @@ int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw } } } else if (!exists) { /* No remote copy */ - // how to get bufferlist data? -Sam - // do I need to add the block to the local cache here? -Sam - // update hosts list for block as well? - // insert entry here? -Sam // localWeight += age; - //return cacheNode->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(it->second->localWeight), y); + //return cacheDriver->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(it->second->localWeight), y); return 0; } else { return -1; } } } +#endif -uint64_t LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) { - CacheBlock victim = find_victim(dpp, y); +int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) { + uint64_t freeSpace = cacheDriver->get_free_space(dpp); - if (victim.cacheObj.objName.empty()) { - ldpp_dout(dpp, 10) << "RGW D4N Policy: Could not find victim block" << dendl; - return 0; /* Return zero for failure */ - } + while (freeSpace < size) { + CacheBlock victim = find_victim(dpp, y); - std::string key = build_index(victim.cacheObj.bucketName, victim.cacheObj.objName, victim.blockID, victim.size); - auto it = entries_map.find(key); - if (it == entries_map.end()) { - return 0; - } + if (victim.cacheObj.objName.empty()) { + ldpp_dout(dpp, 10) << "RGW D4N Policy: Could not retrieve victim block" << dendl; + return -1; + } - int avgWeight = get_min_avg_weight(y); - if (avgWeight < 0) { - return 0; - } + std::string key = victim.cacheObj.bucketName + "_" + victim.cacheObj.objName + "_" + std::to_string(victim.blockID) + "_" + std::to_string(victim.size); + auto it = entries_map.find(key); + if (it == entries_map.end()) { + return -1; + } + + int avgWeight = get_min_avg_weight(y); + if (avgWeight < 0) { + return -1; + } + + if (victim.hostsList.size() == 1 && victim.hostsList[0] == dir->cct->_conf->rgw_local_cache_address) { /* Last copy */ + if (victim.globalWeight) { + it->second->localWeight += victim.globalWeight; + + for (auto& entry : entries_heap) { + if (entry->key == key) { + (*(entry->handle))->localWeight = it->second->localWeight; + entries_heap.increase(entry->handle); + } + } - if (victim.hostsList.size() == 1 && victim.hostsList[0] == dir->cct->_conf->rgw_local_cache_address) { /* Last copy */ - if (victim.globalWeight) { - it->second->localWeight += victim.globalWeight; - if (cacheNode->set_attr(dpp, victim.cacheObj.objName, "localWeight", std::to_string(it->second->localWeight), y) < 0) { - return 0; + if (cacheDriver->set_attr(dpp, key, "localWeight", std::to_string(it->second->localWeight), y) < 0) { + return -1; + } + + victim.globalWeight = 0; + if (dir->update_field(&victim, "globalWeight", std::to_string(victim.globalWeight), y) < 0) { + return -1; + } } - victim.globalWeight = 0; - if (dir->update_field(&victim, "globalWeight", std::to_string(victim.globalWeight), y) < 0) { - return 0; + if (it->second->localWeight > avgWeight) { + // TODO: push victim block to remote cache } } - if (it->second->localWeight > avgWeight) { - // TODO: push victim block to remote cache + victim.globalWeight += it->second->localWeight; + if (dir->update_field(&victim, "globalWeight", std::to_string(victim.globalWeight), y) < 0) { + return -1; } - } - victim.globalWeight += it->second->localWeight; - if (dir->update_field(&victim, "globalWeight", std::to_string(victim.globalWeight), y) < 0) { // just have one update? -Sam - return 0; - } + if (dir->remove_host(&victim, dir->cct->_conf->rgw_local_cache_address, y) < 0) { + return -1; + } else { + if (cacheDriver->del(dpp, key, y) < 0) { + return -1; + } else { + ldpp_dout(dpp, 10) << "RGW D4N Policy: Block " << victim.cacheObj.objName << " has been evicted." << dendl; - ldpp_dout(dpp, 10) << "RGW D4N Policy: Block " << victim.cacheObj.objName << " has been evicted." << dendl; + uint64_t num_entries = entries_map.size(); - if (cacheNode->del(dpp, key, y) < 0 && dir->remove_host(&victim, dir->cct->_conf->rgw_local_cache_address, y) < 0) { - return 0; - } else { - uint64_t num_entries = entries_map.size(); + if (!avgWeight) { + if (set_min_avg_weight(0, dir->cct->_conf->rgw_local_cache_address, y) < 0) // Where else must this be set? -Sam + return -1; + } else { + if (set_min_avg_weight(avgWeight - (it->second->localWeight/num_entries), dir->cct->_conf->rgw_local_cache_address, y) < 0) // Where else must this be set? -Sam + return -1; + } - if (!avgWeight) { - if (set_min_avg_weight(0, dir->cct->_conf->rgw_local_cache_address, y) < 0) // Where else must this be set? -Sam - return 0; - } else { - if (set_min_avg_weight(avgWeight - (it->second->localWeight/num_entries), dir->cct->_conf->rgw_local_cache_address, y) < 0) { // Where else must this be set? -Sam - return 0; - } - int age = get_age(y); - age = std::max(it->second->localWeight, age); - if (set_age(age, y) < 0) - return 0; + int age = get_age(y); + age = std::max(it->second->localWeight, age); + if (set_age(age, y) < 0) + return -1; + } } - } - return victim.size; // this doesn't account for the additional attributes that were removed and need to be set with the new block -Sam + freeSpace = cacheDriver->get_free_space(dpp); + } + + return 0; } -void LFUDAPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) +void LFUDAPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) { - erase(dpp, key); + using handle_type = boost::heap::fibonacci_heap>>::handle_type; - int age = get_age(y); - assert(age > -1); + int age = get_age(y); + int localWeight = age; + auto entry = find_entry(key); + if (entry != nullptr) { + entry->localWeight += age; + localWeight = entry->localWeight; + } + + erase(dpp, key); - LFUDAEntry *e = new LFUDAEntry(key, offset, len, version, age); - entries_lfuda_list.push_back(*e); + LFUDAEntry *e = new LFUDAEntry(key, offset, len, version, localWeight); + handle_type handle = entries_heap.push(e); + e->set_handle(handle); entries_map.emplace(key, e); + + if (cacheDriver->set_attr(dpp, key, "localWeight", std::to_string(localWeight), y) < 0) { + ldpp_dout(dpp, 10) << "LFUDAPolicy::update:: " << __func__ << "(): Cache driver set_attr method failed." << dendl; + } } bool LFUDAPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key) { + for (auto const& it : entries_heap) { + if (it->key == key) { + entries_heap.erase(it->handle); + break; + } + } + auto p = entries_map.find(key); if (p == entries_map.end()) { return false; } entries_map.erase(p); - entries_lfuda_list.erase_and_dispose(entries_lfuda_list.iterator_to(*(p->second)), LFUDA_Entry_delete_disposer()); - return true; + + return false; } int LRUPolicy::exist_key(std::string key) @@ -364,26 +386,24 @@ int LRUPolicy::exist_key(std::string key) return false; } -int LRUPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) +int LRUPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) { - uint64_t freeSpace = cacheNode->get_free_space(dpp); - while(freeSpace < block->size) { - freeSpace = eviction(dpp, cacheNode, y); + uint64_t freeSpace = cacheDriver->get_free_space(dpp); + + while (freeSpace < size) { + const std::lock_guard l(lru_lock); + auto p = entries_lru_list.front(); + entries_map.erase(entries_map.find(p.key)); + entries_lru_list.pop_front_and_dispose(Entry_delete_disposer()); + cacheDriver->delete_data(dpp, p.key, null_yield); + + freeSpace = cacheDriver->get_free_space(dpp); } - return 0; -} -uint64_t LRUPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) -{ - const std::lock_guard l(lru_lock); - auto p = entries_lru_list.front(); - entries_map.erase(entries_map.find(p.key)); - entries_lru_list.pop_front_and_dispose(Entry_delete_disposer()); - cacheNode->delete_data(dpp, p.key, null_yield); - return cacheNode->get_free_space(dpp); + return 0; } -void LRUPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) +void LRUPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) { erase(dpp, key); @@ -404,16 +424,4 @@ bool LRUPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key) return true; } -int PolicyDriver::init() { - if (policyName == "lfuda") { - cachePolicy = new LFUDAPolicy(io); - return 0; - } else if (policyName == "lru") { - cachePolicy = new LRUPolicy(); - return 0; - } - - return -1; -} - } } // namespace rgw::d4n diff --git a/src/rgw/driver/d4n/d4n_policy.h b/src/rgw/driver/d4n/d4n_policy.h index 63da686d9a6..1a34aad6bd7 100644 --- a/src/rgw/driver/d4n/d4n_policy.h +++ b/src/rgw/driver/d4n/d4n_policy.h @@ -1,14 +1,18 @@ #pragma once -#include -#include +#include #include "rgw_common.h" #include "d4n_directory.h" -#include "../../rgw_redis_driver.h" +#include "rgw_sal_d4n.h" +#include "rgw_cache_driver.h" #define dout_subsys ceph_subsys_rgw #define dout_context g_ceph_context +namespace rgw::sal { + class D4NFilterObject; +} + namespace rgw { namespace d4n { class CachePolicy { @@ -35,34 +39,40 @@ class CachePolicy { virtual int init(CephContext *cct, const DoutPrefixProvider* dpp) { return 0; } virtual int exist_key(std::string key) = 0; - virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) = 0; - virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) = 0; - virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) = 0; + virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) = 0; + virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) = 0; virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key) = 0; virtual void shutdown() = 0; }; class LFUDAPolicy : public CachePolicy { private: + template + struct EntryComparator { + bool operator()(T* const e1, T* const e2) const { + return e1->localWeight > e2->localWeight; + } + }; + struct LFUDAEntry : public Entry { int localWeight; - LFUDAEntry(std::string& key, uint64_t offset, uint64_t len, std::string version, int localWeight) : Entry(key, offset, len, version), - localWeight(localWeight) {} - }; + using handle_type = boost::heap::fibonacci_heap>>::handle_type; + handle_type handle; - struct LFUDA_Entry_delete_disposer : public Entry_delete_disposer { - void operator()(LFUDAEntry *e) { - delete e; - } + LFUDAEntry(std::string& key, uint64_t offset, uint64_t len, std::string& version, int localWeight) : Entry(key, offset, len, version), + localWeight(localWeight) {} + + void set_handle(handle_type handle_) { handle = handle_; } }; - typedef boost::intrusive::list List; + using Heap = boost::heap::fibonacci_heap>>; + Heap entries_heap; std::unordered_map entries_map; net::io_context& io; std::shared_ptr conn; - List entries_lfuda_list; BlockDirectory* dir; + rgw::cache::CacheDriver* cacheDriver; int set_age(int age, optional_yield y); int get_age(optional_yield y); @@ -71,7 +81,7 @@ class LFUDAPolicy : public CachePolicy { CacheBlock find_victim(const DoutPrefixProvider* dpp, optional_yield y); public: - LFUDAPolicy(net::io_context& io_context) : CachePolicy(), io(io_context) { + LFUDAPolicy(net::io_context& io_context, rgw::cache::CacheDriver* cacheDriver) : CachePolicy(), io(io_context), cacheDriver{cacheDriver} { conn = std::make_shared(boost::asio::make_strand(io_context)); dir = new BlockDirectory{io}; } @@ -99,11 +109,19 @@ class LFUDAPolicy : public CachePolicy { return 0; } virtual int exist_key(std::string key) override; - virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) override; - virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) override; - virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) override; + //virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) override; + virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override; + virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) override; virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key) override; virtual void shutdown() override; + + void set_local_weight(std::string& key, int localWeight); + LFUDAEntry* find_entry(std::string key) { + auto it = entries_map.find(key); + if (it == entries_map.end()) + return nullptr; + return it->second; + } }; class LRUPolicy : public CachePolicy { @@ -113,31 +131,36 @@ class LRUPolicy : public CachePolicy { std::unordered_map entries_map; std::mutex lru_lock; List entries_lru_list; + rgw::cache::CacheDriver* cacheDriver; public: - LRUPolicy() = default; + LRUPolicy(rgw::cache::CacheDriver* cacheDriver) : cacheDriver{cacheDriver} {} virtual int exist_key(std::string key) override; - virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) override; - virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) override; - virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) override; + virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override; + virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) override; virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key) override; virtual void shutdown() override {} }; class PolicyDriver { private: - net::io_context& io; std::string policyName; CachePolicy* cachePolicy; public: - PolicyDriver(net::io_context& io_context, std::string _policyName) : io(io_context), policyName(_policyName) {} + PolicyDriver(net::io_context& io_context, rgw::cache::CacheDriver* cacheDriver, std::string _policyName) : policyName(_policyName) + { + if (policyName == "lfuda") { + cachePolicy = new LFUDAPolicy(io_context, cacheDriver); + } else if (policyName == "lru") { + cachePolicy = new LRUPolicy(cacheDriver); + } + } ~PolicyDriver() { delete cachePolicy; } - int init(); CachePolicy* get_cache_policy() { return cachePolicy; } std::string get_policy_name() { return policyName; } }; diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.cc b/src/rgw/driver/d4n/rgw_sal_d4n.cc index f0c9650a249..8c8e7a8d92f 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.cc +++ b/src/rgw/driver/d4n/rgw_sal_d4n.cc @@ -48,7 +48,7 @@ D4NFilterDriver::D4NFilterDriver(Driver* _next, boost::asio::io_context& io_cont cacheDriver = new rgw::cache::SSDDriver(partition_info); objDir = new rgw::d4n::ObjectDirectory(io_context); blockDir = new rgw::d4n::BlockDirectory(io_context); - policyDriver = new rgw::d4n::PolicyDriver(io_context, "lfuda"); + policyDriver = new rgw::d4n::PolicyDriver(io_context, cacheDriver, "lfuda"); } D4NFilterDriver::~D4NFilterDriver() @@ -68,7 +68,6 @@ int D4NFilterDriver::initialize(CephContext *cct, const DoutPrefixProvider *dpp) objDir->init(cct, dpp); blockDir->init(cct, dpp); - policyDriver->init(); policyDriver->get_cache_policy()->init(cct, dpp); return 0; @@ -583,7 +582,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int // Read From Cache auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id); - source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, part_len, "", source->driver->get_cache_driver(), y); + source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, part_len, "", y); ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl; @@ -604,7 +603,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int // Read From Cache auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id); - source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, obj_max_req_size, "", source->driver->get_cache_driver(), y); + source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, obj_max_req_size, "", y); ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl; @@ -641,8 +640,6 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int len -= obj_max_req_size; } while (start_part_num < num_parts); - - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Fetching object from backend store" << dendl; Attrs obj_attrs; @@ -669,7 +666,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to fetch object from backend store, r= " << r << dendl; return r; } - + return this->cb->flush_last_part(); } @@ -708,24 +705,27 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len); block.blockID = ofs; // TODO: fill out block correctly block.size = bl.length(); - block.blockID = ofs; - uint64_t freeSpace = filter->get_cache_driver()->get_free_space(dpp); - while(freeSpace < block.size) { - freeSpace += filter->get_policy_driver()->get_cache_policy()->eviction(dpp, filter->get_cache_driver(), null_yield); - } - if (filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), source->get_attrs()) == 0) { - filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), "", filter->get_cache_driver(), null_yield); - /* Store block in directory */ - if (!blockDir->exist_key(&block, null_yield)) { - #if 0 - int ret = blockDir->set_value(&block); - if (ret < 0) { - ldpp_dout(dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl; - return ret; + if (filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y) == 0) { + if (filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), source->get_attrs()) == 0) { + filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), "", *y); + + /* Store block in directory */ + if (!blockDir->exist_key(&block, *y)) { + int ret = blockDir->set(&block, *y); + if (ret < 0) { + ldpp_dout(dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl; + return ret; + } else { + ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl; + } } else { - ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl; + if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0) { + ldpp_dout(dpp, 0) << "D4N Filter: Block directory update operation failed." << dendl; + return -1; + } else { + ldpp_dout(dpp, 20) << "D4N Filter: Block directory update operation succeeded." << dendl; + } } - #endif } } } else if (bl.length() == rgw_get_obj_max_req_size && bl_rem.length() == 0) { // if bl is the same size as rgw_get_obj_max_req_size, write it to cache @@ -733,23 +733,27 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl ofs += bl_len; block.blockID = ofs; block.size = bl.length(); - uint64_t freeSpace = filter->get_cache_driver()->get_free_space(dpp); - while(freeSpace < block.size) { - freeSpace += filter->get_policy_driver()->get_cache_policy()->eviction(dpp, filter->get_cache_driver(), null_yield); - } - if (filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), source->get_attrs()) == 0) { - filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), "", filter->get_cache_driver(), null_yield); - /* Store block in directory */ - if (!blockDir->exist_key(&block, null_yield)) { - #if 0 - int ret = blockDir->set_value(&block); - if (ret < 0) { - ldpp_dout(dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl; - return ret; - } else { - ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl; + if (filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y) == 0) { //only block size because attributes are stored for entire obj? -Sam + if (filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), source->get_attrs()) == 0) { + filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), "", *y); + + /* Store block in directory */ + if (!blockDir->exist_key(&block, *y)) { + int ret = blockDir->set(&block, *y); + if (ret < 0) { + ldpp_dout(dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl; + return ret; + } else { + ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl; + } + } else { + if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0) { + ldpp_dout(dpp, 0) << "D4N Filter: Block directory update operation failed." << dendl; + return -1; + } else { + ldpp_dout(dpp, 20) << "D4N Filter: Block directory update operation succeeded." << dendl; + } } - #endif } } } else { //copy data from incoming bl to bl_rem till it is rgw_get_obj_max_req_size, and then write it to cache @@ -763,25 +767,29 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl if (bl_rem.length() == rgw_get_obj_max_req_size) { std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_rem.length()); ofs += bl_rem.length(); - block.blockID = ofs; + block.blockID = ofs; // TODO: fill out block correctly block.size = bl_rem.length(); - uint64_t freeSpace = filter->get_cache_driver()->get_free_space(dpp); - while(freeSpace < block.size) { - freeSpace += filter->get_policy_driver()->get_cache_policy()->eviction(dpp, filter->get_cache_driver(), null_yield); - } - if (filter->get_cache_driver()->put_async(dpp, oid, bl_rem, bl_rem.length(), source->get_attrs()) == 0) { - filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), "", filter->get_cache_driver(), null_yield); - /* Store block in directory */ - if (!blockDir->exist_key(&block, null_yield)) { - #if 0 - int ret = blockDir->set_value(&block); - if (ret < 0) { + if (filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y) == 0) { + if (filter->get_cache_driver()->put_async(dpp, oid, bl_rem, bl_rem.length(), source->get_attrs()) == 0) { + filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), "", *y); + + /* Store block in directory */ + if (!blockDir->exist_key(&block, *y)) { + int ret = blockDir->set(&block, *y); + if (ret < 0) { ldpp_dout(dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl; return ret; - } else { - ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl; - } - #endif + } else { + ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl; + } + } else { + if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0) { + ldpp_dout(dpp, 0) << "D4N Filter: Block directory update operation failed." << dendl; + return -1; + } else { + ldpp_dout(dpp, 20) << "D4N Filter: Block directory update operation succeeded." << dendl; + } + } } } diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.h b/src/rgw/driver/d4n/rgw_sal_d4n.h index 78428a01ad2..ddf446e4861 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.h +++ b/src/rgw/driver/d4n/rgw_sal_d4n.h @@ -24,7 +24,6 @@ #include "rgw_ssd_driver.h" #include "rgw_redis_driver.h" -#include "rgw_redis_driver.h" #include "driver/d4n/d4n_directory.h" #include "driver/d4n/d4n_policy.h" @@ -36,6 +35,10 @@ #define dout_subsys ceph_subsys_rgw #define dout_context g_ceph_context +namespace rgw::d4n { + class PolicyDriver; +} + namespace rgw { namespace sal { class D4NFilterDriver : public FilterDriver { diff --git a/src/test/rgw/test_d4n_policy.cc b/src/test/rgw/test_d4n_policy.cc index 1c104cb2f64..e6c71db65e0 100644 --- a/src/test/rgw/test_d4n_policy.cc +++ b/src/test/rgw/test_d4n_policy.cc @@ -3,6 +3,7 @@ #include #include "gtest/gtest.h" +#include "gtest/gtest_prod.h" #include "common/ceph_argparse.h" #include "rgw_auth_registry.h" #include "driver/d4n/d4n_policy.h" @@ -40,7 +41,7 @@ class Environment : public ::testing::Environment { DoutPrefixProvider* dpp; }; -class LFUDAPolicyFixture: public ::testing::Test { +class LFUDAPolicyFixture : public ::testing::Test { protected: virtual void SetUp() { block = new rgw::d4n::CacheBlock{ @@ -59,7 +60,7 @@ class LFUDAPolicyFixture: public ::testing::Test { rgw::cache::Partition partition_info{ .location = "RedisCache" }; cacheDriver = new rgw::cache::RedisDriver{io, partition_info}; - policyDriver = new rgw::d4n::PolicyDriver(io, "lfuda"); + policyDriver = new rgw::d4n::PolicyDriver(io, cacheDriver, "lfuda"); dir = new rgw::d4n::BlockDirectory{io}; conn = new connection{boost::asio::make_strand(io)}; @@ -70,7 +71,6 @@ class LFUDAPolicyFixture: public ::testing::Test { dir->init(env->cct, env->dpp); cacheDriver->initialize(env->cct, env->dpp); - policyDriver->init(); policyDriver->get_cache_policy()->init(env->cct, env->dpp); bl.append("test data"); @@ -94,6 +94,55 @@ class LFUDAPolicyFixture: public ::testing::Test { delete policyDriver; } + std::string build_index(std::string bucketName, std::string oid, uint64_t offset, uint64_t size) { + return bucketName + "_" + oid + "_" + std::to_string(offset) + "_" + std::to_string(size); + } + + int lfuda(const DoutPrefixProvider* dpp, rgw::d4n::CacheBlock* block, rgw::cache::CacheDriver* cacheDriver, optional_yield y) { + int age = 5; /* Arbitrary number for testing */ + std::string oid = build_index(block->cacheObj.bucketName, block->cacheObj.objName, block->blockID, block->size); + + if (this->policyDriver->get_cache_policy()->exist_key(build_index(block->cacheObj.bucketName, block->cacheObj.objName, block->blockID, block->size))) { /* Local copy */ + auto entry = dynamic_cast(this->policyDriver->get_cache_policy())->find_entry(oid); + entry->localWeight += age; + return cacheDriver->set_attr(dpp, oid, "localWeight", std::to_string(entry->localWeight), y); + } else { + if (this->policyDriver->get_cache_policy()->eviction(dpp, block->size, y) < 0) + return -1; + + int exists = dir->exist_key(block, y); + if (exists > 0) { /* Remote copy */ + if (dir->get(block, y) < 0) { + return -1; + } else { + if (!block->hostsList.empty()) { + block->globalWeight += age; + + if (dir->update_field(block, "globalWeight", std::to_string(block->globalWeight), y) < 0) { + return -1; + } else { + return 0; + } + } else { + return -1; + } + } + } else if (!exists) { /* No remote copy */ + block->hostsList.push_back(dir->cct->_conf->rgw_local_cache_address); + block->cacheObj.hostsList.push_back(dir->cct->_conf->rgw_local_cache_address); + if (dir->set(block, y) < 0) + return -1; + + this->policyDriver->get_cache_policy()->update(dpp, oid, 0, bl.length(), "", y); + if (cacheDriver->put(dpp, oid, bl, bl.length(), attrs, y) < 0) + return -1; + return cacheDriver->set_attr(dpp, oid, "localWeight", std::to_string(age), y); + } else { + return -1; + } + } + } + rgw::d4n::CacheBlock* block; rgw::d4n::BlockDirectory* dir; rgw::d4n::PolicyDriver* policyDriver; @@ -111,22 +160,9 @@ TEST_F(LFUDAPolicyFixture, LocalGetBlockYield) spawn::spawn(io, [this] (spawn::yield_context yield) { std::string key = block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + std::to_string(block->blockID) + "_" + std::to_string(block->size); ASSERT_EQ(0, cacheDriver->put(env->dpp, key, bl, bl.length(), attrs, optional_yield{io, yield})); - policyDriver->get_cache_policy()->insert(env->dpp, key, 0, bl.length(), "", cacheDriver, optional_yield{io, yield}); - - /* Change cache age for testing purposes */ - { - boost::system::error_code ec; - request req; - req.push("HSET", "lfuda", "age", "5"); - response resp; - - conn->async_exec(req, resp, yield[ec]); - - ASSERT_EQ((bool)ec, false); - EXPECT_EQ(std::get<0>(resp).value(), 0); - } + policyDriver->get_cache_policy()->update(env->dpp, key, 0, bl.length(), "", optional_yield{io, yield}); - ASSERT_GE(policyDriver->get_cache_policy()->get_block(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0); + ASSERT_GE(lfuda(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0); dir->shutdown(); cacheDriver->shutdown(); @@ -134,7 +170,7 @@ TEST_F(LFUDAPolicyFixture, LocalGetBlockYield) boost::system::error_code ec; request req; - req.push("HGET", "RedisCache/testName", "localWeight"); + req.push("HGET", "RedisCache/testBucket_testName_0_0", "localWeight"); req.push("FLUSHALL"); response resp; @@ -178,7 +214,7 @@ TEST_F(LFUDAPolicyFixture, RemoteGetBlockYield) ASSERT_EQ(0, dir->set(&victim, optional_yield{io, yield})); std::string victimKey = victim.cacheObj.bucketName + "_" + victim.cacheObj.objName + "_" + std::to_string(victim.blockID) + "_" + std::to_string(victim.size); ASSERT_EQ(0, cacheDriver->put(env->dpp, victimKey, bl, bl.length(), attrs, optional_yield{io, yield})); - policyDriver->get_cache_policy()->insert(env->dpp, victimKey, 0, bl.length(), "", cacheDriver, optional_yield{io, yield}); + policyDriver->get_cache_policy()->update(env->dpp, victimKey, 0, bl.length(), "", optional_yield{io, yield}); /* Remote block */ block->size = cacheDriver->get_free_space(env->dpp) + 1; /* To trigger eviction */ @@ -189,7 +225,7 @@ TEST_F(LFUDAPolicyFixture, RemoteGetBlockYield) ASSERT_EQ(0, dir->set(block, optional_yield{io, yield})); - ASSERT_GE(policyDriver->get_cache_policy()->get_block(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0); + ASSERT_GE(lfuda(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0); dir->shutdown(); cacheDriver->shutdown(); @@ -199,19 +235,19 @@ TEST_F(LFUDAPolicyFixture, RemoteGetBlockYield) boost::system::error_code ec; request req; req.push("EXISTS", "RedisCache/" + victimKey); - req.push("HGET", victimKey, "globalWeight"); + req.push("EXISTS", victimKey, "globalWeight"); req.push("HGET", key, "globalWeight"); req.push("FLUSHALL"); - response resp; + response resp; conn->async_exec(req, resp, yield[ec]); ASSERT_EQ((bool)ec, false); EXPECT_EQ(std::get<0>(resp).value(), 0); - EXPECT_EQ(std::get<1>(resp).value(), "5"); - EXPECT_EQ(std::get<2>(resp).value(), "0"); + EXPECT_EQ(std::get<1>(resp).value(), 0); + EXPECT_EQ(std::get<2>(resp).value(), "5"); conn->cancel(); }); @@ -221,7 +257,7 @@ TEST_F(LFUDAPolicyFixture, RemoteGetBlockYield) TEST_F(LFUDAPolicyFixture, BackendGetBlockYield) { spawn::spawn(io, [this] (spawn::yield_context yield) { - ASSERT_GE(policyDriver->get_cache_policy()->get_block(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0); + ASSERT_GE(lfuda(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0); dir->shutdown(); cacheDriver->shutdown();