From: Samarah Date: Tue, 31 Oct 2023 14:21:06 +0000 (+0000) Subject: rgw/d4n: this commit squashes the following commits related to LFUDA Policy. X-Git-Tag: v20.0.0~2219^2~42 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=93cbd6605932377241624082a9fe32d5adbf694e;p=ceph.git rgw/d4n: this commit squashes the following commits related to LFUDA Policy. rgw/d4n: Make minor updates, improve `LFUDAPolicy::erase` method, and fix `localWeight` policy sets rgw/d4n: Update `LFUDA::find_victim` method, add locks in policy code, and remove setting of attrs in filter's `put_async` calls d4n/policy: Update logic for minimum average weight calculation d4n/policy: Remove iteration in `eviction` Signed-off-by: Samarah --- diff --git a/ceph.spec.in b/ceph.spec.in index ed62be27f050..6e5942280bb4 100644 --- a/ceph.spec.in +++ b/ceph.spec.in @@ -2228,12 +2228,6 @@ fi %dir %{_localstatedir}/lib/ceph/radosgw %{_unitdir}/ceph-radosgw@.service %{_unitdir}/ceph-radosgw.target -%exclude %{_includedir}/cpp_redis -%exclude %{_includedir}/tacopie -%exclude /usr/lib/libcpp_redis.a -%exclude /usr/lib/libtacopie.a -%exclude /usr/lib/pkgconfig/cpp_redis.pc -%exclude /usr/lib/pkgconfig/tacopie.pc %post radosgw %if 0%{?suse_version} diff --git a/src/rgw/driver/d4n/d4n_directory.cc b/src/rgw/driver/d4n/d4n_directory.cc index 58872e524da4..a518417e19b2 100644 --- a/src/rgw/driver/d4n/d4n_directory.cc +++ b/src/rgw/driver/d4n/d4n_directory.cc @@ -65,7 +65,7 @@ int ObjectDirectory::exist_key(CacheObj* object, optional_yield y) { return std::get<0>(resp).value(); } -void ObjectDirectory::shutdown() // generalize -Sam +void ObjectDirectory::shutdown() { // call cancel() on the connection's executor boost::asio::dispatch(conn->get_executor(), [c = conn] { c->cancel(); }); diff --git a/src/rgw/driver/d4n/d4n_directory.h b/src/rgw/driver/d4n/d4n_directory.h index 2ad56f0f6199..5e6c620730fd 100644 --- a/src/rgw/driver/d4n/d4n_directory.h +++ b/src/rgw/driver/d4n/d4n_directory.h @@ -7,7 +7,6 @@ #include #define dout_subsys ceph_subsys_rgw -#define dout_context g_ceph_context namespace rgw { namespace d4n { @@ -59,7 +58,7 @@ class ObjectDirectory: public Directory { // weave into write workflow -Sam cfg.clientname = "D4N.ObjectDir"; if (!cfg.addr.host.length() || !cfg.addr.port.length()) { - ldpp_dout(dpp, 10) << "D4N Directory " << __func__ << ": Object directory endpoint was not configured correctly" << dendl; + ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "(): Endpoint was not configured correctly." << dendl; return -EDESTADDRREQ; } @@ -99,7 +98,7 @@ class BlockDirectory: public Directory { cfg.clientname = "D4N.BlockDir"; if (!cfg.addr.host.length() || !cfg.addr.port.length()) { - ldpp_dout(dpp, 10) << "D4N Directory " << __func__ << ": Block directory endpoint was not configured correctly" << dendl; + ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): Endpoint was not configured correctly." << dendl; return -EDESTADDRREQ; } diff --git a/src/rgw/driver/d4n/d4n_policy.cc b/src/rgw/driver/d4n/d4n_policy.cc index 71ba05115b33..5f2903876e3d 100644 --- a/src/rgw/driver/d4n/d4n_policy.cc +++ b/src/rgw/driver/d4n/d4n_policy.cc @@ -100,31 +100,17 @@ int LFUDAPolicy::get_age(optional_yield y) { } } -int LFUDAPolicy::set_min_avg_weight(size_t weight, std::string cacheLocation, optional_yield y) { +int LFUDAPolicy::set_local_weight_sum(size_t weight, optional_yield y) { try { boost::system::error_code ec; response resp; request req; - req.push("HSET", "lfuda", "minAvgWeight:cache", cacheLocation); + req.push("HSET", dir->cct->_conf->rgw_local_cache_address, "localWeights", boost::lexical_cast(weight)); redis_exec(conn, ec, req, resp, y); if (ec) - return {}; - } catch(std::exception &e) { - return -1; - } - - try { - boost::system::error_code ec; - response resp; - request req; - req.push("HSET", "lfuda", "minAvgWeight:weight", boost::lexical_cast(weight)); - - redis_exec(conn, ec, req, resp, y); - - if (ec) - return {}; + return -1; return std::get<0>(resp).value(); /* Returns number of fields set */ } catch(std::exception &e) { @@ -132,13 +118,13 @@ int LFUDAPolicy::set_min_avg_weight(size_t weight, std::string cacheLocation, op } } -int LFUDAPolicy::get_min_avg_weight(optional_yield y) { +int LFUDAPolicy::get_local_weight_sum(optional_yield y) { response resp; try { boost::system::error_code ec; request req; - req.push("HEXISTS", "lfuda", "minAvgWeight:cache"); + req.push("HEXISTS", dir->cct->_conf->rgw_local_cache_address, "localWeights"); redis_exec(conn, ec, req, resp, y); @@ -149,10 +135,14 @@ int LFUDAPolicy::get_min_avg_weight(optional_yield y) { } if (!std::get<0>(resp).value()) { - if (set_min_avg_weight(0, dir->cct->_conf->rgw_local_cache_address, y)) { /* Initialize minimum average weight */ - return 0; - } else { + int sum = 0; + for (auto& entry : entries_map) + sum += entry.second->localWeight; + + if (set_local_weight_sum(sum, y) < 0) { /* Initialize */ return -1; + } else { + return sum; } } @@ -160,7 +150,7 @@ int LFUDAPolicy::get_min_avg_weight(optional_yield y) { boost::system::error_code ec; response value; request req; - req.push("HGET", "lfuda", "minAvgWeight:weight"); + req.push("HGET", dir->cct->_conf->rgw_local_cache_address, "localWeights"); redis_exec(conn, ec, req, value, y); @@ -173,22 +163,23 @@ int LFUDAPolicy::get_min_avg_weight(optional_yield y) { } } -CacheBlock LFUDAPolicy::find_victim(const DoutPrefixProvider* dpp, optional_yield y) { +CacheBlock* LFUDAPolicy::find_victim(const DoutPrefixProvider* dpp, optional_yield y) { + const std::lock_guard l(lfuda_lock); if (entries_heap.empty()) - return {}; + return nullptr; /* Get victim cache block */ std::string key = entries_heap.top()->key; - CacheBlock victim; + CacheBlock* victim = new CacheBlock(); - victim.cacheObj.bucketName = key.substr(0, key.find('_')); + victim->cacheObj.bucketName = key.substr(0, key.find('_')); key.erase(0, key.find('_') + 1); - victim.cacheObj.objName = key.substr(0, key.find('_')); - victim.blockID = entries_heap.top()->offset; - victim.size = entries_heap.top()->len; + victim->cacheObj.objName = key.substr(0, key.find('_')); + victim->blockID = entries_heap.top()->offset; + victim->size = entries_heap.top()->len; - if (dir->get(&victim, y) < 0) { - return {}; + if (dir->get(victim, y) < 0) { + return nullptr; } return victim; @@ -220,7 +211,7 @@ int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw return cacheDriver->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(it->second->localWeight), y); } else { if (eviction(dpp, block->size, cacheDriver, y) < 0) - return -1; // what if eviction turns into infinite loop? -Sam + return -1; int exists = dir->exist_key(block, y); if (exists > 0) { /* Remote copy */ @@ -253,81 +244,79 @@ int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) { uint64_t freeSpace = cacheDriver->get_free_space(dpp); - while (freeSpace < size) { - CacheBlock victim = find_victim(dpp, y); + while (freeSpace < size) { // TODO: Think about parallel reads and writes; can this turn into an infinite loop? + CacheBlock* victim = find_victim(dpp, y); - if (victim.cacheObj.objName.empty()) { - ldpp_dout(dpp, 10) << "RGW D4N Policy: Could not retrieve victim block" << dendl; + if (victim == nullptr) { + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Could not retrieve victim block." << dendl; + delete victim; return -1; } - std::string key = victim.cacheObj.bucketName + "_" + victim.cacheObj.objName + "_" + std::to_string(victim.blockID) + "_" + std::to_string(victim.size); + std::string key = victim->cacheObj.bucketName + "_" + victim->cacheObj.objName + "_" + std::to_string(victim->blockID) + "_" + std::to_string(victim->size); auto it = entries_map.find(key); if (it == entries_map.end()) { + delete victim; return -1; } - int avgWeight = get_min_avg_weight(y); + int avgWeight = get_local_weight_sum(y) / entries_map.size(); if (avgWeight < 0) { + delete victim; return -1; } - if (victim.hostsList.size() == 1 && victim.hostsList[0] == dir->cct->_conf->rgw_local_cache_address) { /* Last copy */ - if (victim.globalWeight) { - it->second->localWeight += victim.globalWeight; + if (victim->hostsList.size() == 1 && victim->hostsList[0] == dir->cct->_conf->rgw_local_cache_address) { /* Last copy */ + if (victim->globalWeight) { + const std::lock_guard l(lfuda_lock); + it->second->localWeight += victim->globalWeight; + (*it->second->handle)->localWeight = it->second->localWeight; + entries_heap.increase(it->second->handle); - for (auto& entry : entries_heap) { - if (entry->key == key) { - (*(entry->handle))->localWeight = it->second->localWeight; - entries_heap.increase(entry->handle); - } - } - - if (cacheDriver->set_attr(dpp, key, "localWeight", std::to_string(it->second->localWeight), y) < 0) { + if (cacheDriver->set_attr(dpp, key, "user.rgw.localWeight", std::to_string(it->second->localWeight), y) < 0) return -1; - } - victim.globalWeight = 0; - if (dir->update_field(&victim, "globalWeight", std::to_string(victim.globalWeight), y) < 0) { + victim->globalWeight = 0; + if (dir->update_field(victim, "globalWeight", std::to_string(victim->globalWeight), y) < 0) { + delete victim; return -1; - } + } } if (it->second->localWeight > avgWeight) { // TODO: push victim block to remote cache + // add remote cache host to host list } } - victim.globalWeight += it->second->localWeight; - if (dir->update_field(&victim, "globalWeight", std::to_string(victim.globalWeight), y) < 0) { + victim->globalWeight += it->second->localWeight; + if (dir->update_field(victim, "globalWeight", std::to_string(victim->globalWeight), y) < 0) { + delete victim; return -1; } - if (dir->remove_host(&victim, dir->cct->_conf->rgw_local_cache_address, y) < 0) { + if (dir->remove_host(victim, dir->cct->_conf->rgw_local_cache_address, y) < 0) { + delete victim; return -1; - } else { - if (cacheDriver->del(dpp, key, y) < 0) { - return -1; - } else { - ldpp_dout(dpp, 10) << "RGW D4N Policy: Block " << victim.cacheObj.objName << " has been evicted." << dendl; + } - uint64_t num_entries = entries_map.size(); + delete victim; - if (!avgWeight) { - if (set_min_avg_weight(0, dir->cct->_conf->rgw_local_cache_address, y) < 0) // Where else must this be set? -Sam - return -1; - } else { - if (set_min_avg_weight(avgWeight - (it->second->localWeight/num_entries), dir->cct->_conf->rgw_local_cache_address, y) < 0) // Where else must this be set? -Sam - return -1; - } + if (cacheDriver->del(dpp, key, y) < 0) + return -1; - int age = get_age(y); - age = std::max(it->second->localWeight, age); - if (set_age(age, y) < 0) - return -1; - } - } + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Block " << key << " has been evicted." << dendl; + + int weight = (avgWeight * entries_map.size()) - it->second->localWeight; + if (set_local_weight_sum((weight > 0) ? weight : 0, y) < 0) + return -1; + + int age = get_age(y); + age = std::max(it->second->localWeight, age); + if (set_age(age, y) < 0) + return -1; + erase(dpp, key, y); freeSpace = cacheDriver->get_free_space(dpp); } @@ -346,35 +335,40 @@ void LFUDAPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64 localWeight = entry->localWeight; } - erase(dpp, key); + erase(dpp, key, y); + const std::lock_guard l(lfuda_lock); LFUDAEntry *e = new LFUDAEntry(key, offset, len, version, localWeight); handle_type handle = entries_heap.push(e); e->set_handle(handle); entries_map.emplace(key, e); - if (cacheDriver->set_attr(dpp, key, "localWeight", std::to_string(localWeight), y) < 0) { - ldpp_dout(dpp, 10) << "LFUDAPolicy::update:: " << __func__ << "(): Cache driver set_attr method failed." << dendl; - } + if (cacheDriver->set_attr(dpp, key, "user.rgw.localWeight", std::to_string(localWeight), y) < 0) + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): CacheDriver set_attr method failed." << dendl; + + auto localWeights = get_local_weight_sum(y); + localWeights += localWeight; + if (set_local_weight_sum(localWeights, y) < 0) + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Failed to update sum of local weights for the cache backend." << dendl; } -bool LFUDAPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key) +bool LFUDAPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) { - for (auto const& it : entries_heap) { - if (it->key == key) { - entries_heap.erase(it->handle); - break; - } - } - + const std::lock_guard l(lfuda_lock); auto p = entries_map.find(key); if (p == entries_map.end()) { return false; } + auto localWeights = get_local_weight_sum(y); + localWeights -= p->second->localWeight; + if (set_local_weight_sum(localWeights, y) < 0) + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Failed to update sum of local weights for the cache backend." << dendl; + entries_map.erase(p); + entries_heap.erase(p->second->handle); - return false; + return true; } int LRUPolicy::exist_key(std::string key) @@ -405,14 +399,14 @@ int LRUPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_y void LRUPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) { - erase(dpp, key); + erase(dpp, key, y); Entry *e = new Entry(key, offset, len, version); entries_lru_list.push_back(*e); entries_map.emplace(key, e); } -bool LRUPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key) +bool LRUPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) { const std::lock_guard l(lru_lock); auto p = entries_map.find(key); diff --git a/src/rgw/driver/d4n/d4n_policy.h b/src/rgw/driver/d4n/d4n_policy.h index 1a34aad6bd71..cb82eae0d94f 100644 --- a/src/rgw/driver/d4n/d4n_policy.h +++ b/src/rgw/driver/d4n/d4n_policy.h @@ -1,13 +1,11 @@ #pragma once #include -#include "rgw_common.h" #include "d4n_directory.h" #include "rgw_sal_d4n.h" #include "rgw_cache_driver.h" #define dout_subsys ceph_subsys_rgw -#define dout_context g_ceph_context namespace rgw::sal { class D4NFilterObject; @@ -41,7 +39,7 @@ class CachePolicy { virtual int exist_key(std::string key) = 0; virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) = 0; virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) = 0; - virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key) = 0; + virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0; virtual void shutdown() = 0; }; @@ -68,6 +66,7 @@ class LFUDAPolicy : public CachePolicy { using Heap = boost::heap::fibonacci_heap>>; Heap entries_heap; std::unordered_map entries_map; + std::mutex lfuda_lock; net::io_context& io; std::shared_ptr conn; @@ -76,9 +75,9 @@ class LFUDAPolicy : public CachePolicy { int set_age(int age, optional_yield y); int get_age(optional_yield y); - int set_min_avg_weight(size_t weight, std::string cacheLocation, optional_yield y); - int get_min_avg_weight(optional_yield y); - CacheBlock find_victim(const DoutPrefixProvider* dpp, optional_yield y); + int set_local_weight_sum(size_t weight, optional_yield y); + int get_local_weight_sum(optional_yield y); + CacheBlock* find_victim(const DoutPrefixProvider* dpp, optional_yield y); public: LFUDAPolicy(net::io_context& io_context, rgw::cache::CacheDriver* cacheDriver) : CachePolicy(), io(io_context), cacheDriver{cacheDriver} { @@ -99,7 +98,7 @@ class LFUDAPolicy : public CachePolicy { cfg.clientname = "D4N.Policy"; if (!cfg.addr.host.length() || !cfg.addr.port.length()) { - ldpp_dout(dpp, 10) << "RGW Redis Cache: Redis cache endpoint was not configured correctly" << dendl; + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Endpoint was not configured correctly." << dendl; return -EDESTADDRREQ; } @@ -112,7 +111,7 @@ class LFUDAPolicy : public CachePolicy { //virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) override; virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override; virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) override; - virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key) override; + virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override; virtual void shutdown() override; void set_local_weight(std::string& key, int localWeight); @@ -139,7 +138,7 @@ class LRUPolicy : public CachePolicy { virtual int exist_key(std::string key) override; virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override; virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) override; - virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key) override; + virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override; virtual void shutdown() override {} }; diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.cc b/src/rgw/driver/d4n/rgw_sal_d4n.cc index d1ed8991c40f..b1d8e33187aa 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.cc +++ b/src/rgw/driver/d4n/rgw_sal_d4n.cc @@ -15,9 +15,6 @@ #include "rgw_sal_d4n.h" -#define dout_subsys ceph_subsys_rgw -#define dout_context g_ceph_context - namespace rgw { namespace sal { static inline Bucket* nextBucket(Bucket* t) @@ -126,13 +123,9 @@ int D4NFilterObject::copy_object(User* user, .objName = this->get_key().get_oid(), .bucketName = src_bucket->get_name() }; - int copy_valueReturn = driver->get_obj_dir()->copy(&obj, dest_object->get_name(), dest_bucket->get_name(), y); - if (copy_valueReturn < 0) { - ldpp_dout(dpp, 20) << "D4N Filter: Block directory copy operation failed." << dendl; - } else { - ldpp_dout(dpp, 20) << "D4N Filter: Block directory copy operation succeeded." << dendl; - } + if (driver->get_obj_dir()->copy(&obj, dest_object->get_name(), dest_bucket->get_name(), y) < 0) + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory copy method failed." << dendl; /* Append additional metadata to attributes */ rgw::sal::Attrs baseAttrs = this->get_attrs(); @@ -209,13 +202,8 @@ int D4NFilterObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattr } } - int update_attrsReturn = driver->get_cache_driver()->set_attrs(dpp, this->get_key().get_oid(), *setattrs, y); - - if (update_attrsReturn < 0) { - ldpp_dout(dpp, 20) << "D4N Filter: Cache set object attributes operation failed." << dendl; - } else { - ldpp_dout(dpp, 20) << "D4N Filter: Cache set object attributes operation succeeded." << dendl; - } + if (driver->get_cache_driver()->set_attrs(dpp, this->get_key().get_oid(), *setattrs, y) < 0) + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver set_attrs method failed." << dendl; } if (delattrs != NULL) { @@ -229,13 +217,8 @@ int D4NFilterObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattr } } - int del_attrsReturn = driver->get_cache_driver()->delete_attrs(dpp, this->get_key().get_oid(), *delattrs, y); - - if (del_attrsReturn < 0) { - ldpp_dout(dpp, 20) << "D4N Filter: Cache delete object attributes operation failed." << dendl; - } else { - ldpp_dout(dpp, 20) << "D4N Filter: Cache delete object attributes operation succeeded." << dendl; - } + if (driver->get_cache_driver()->delete_attrs(dpp, this->get_key().get_oid(), *delattrs, y) < 0) + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver delete_attrs method failed." << dendl; } return next->set_obj_attrs(dpp, setattrs, delattrs, y); @@ -245,11 +228,9 @@ int D4NFilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* d rgw_obj* target_obj) { rgw::sal::Attrs attrs; - int get_attrsReturn = driver->get_cache_driver()->get_attrs(dpp, this->get_key().get_oid(), attrs, y); - - if (get_attrsReturn < 0) { - ldpp_dout(dpp, 20) << "D4N Filter: Cache get object attributes operation failed." << dendl; + if (driver->get_cache_driver()->get_attrs(dpp, this->get_key().get_oid(), attrs, y) < 0) { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver get_attrs method failed." << dendl; return next->get_obj_attrs(y, dpp, target_obj); } else { /* Set metadata locally */ @@ -298,18 +279,13 @@ int D4NFilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* d this->set_obj_state(*astate); /* Set attributes locally */ - int set_attrsReturn = this->set_attrs(attrs); - - if (set_attrsReturn < 0) { - ldpp_dout(dpp, 20) << "D4N Filter: Cache get object attributes operation failed." << dendl; - + if (this->set_attrs(attrs) < 0) { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): D4NFilterObject set_attrs method failed." << dendl; return next->get_obj_attrs(y, dpp, target_obj); - } else { - ldpp_dout(dpp, 20) << "D4N Filter: Cache get object attributes operation succeeded." << dendl; - - return 0; } } + + return 0; } int D4NFilterObject::modify_obj_attrs(const char* attr_name, bufferlist& attr_val, @@ -317,13 +293,9 @@ int D4NFilterObject::modify_obj_attrs(const char* attr_name, bufferlist& attr_va { Attrs update; update[(std::string)attr_name] = attr_val; - int update_attrsReturn = driver->get_cache_driver()->update_attrs(dpp, this->get_key().get_oid(), update, y); - if (update_attrsReturn < 0) { - ldpp_dout(dpp, 20) << "D4N Filter: Cache modify object attribute operation failed." << dendl; - } else { - ldpp_dout(dpp, 20) << "D4N Filter: Cache modify object attribute operation succeeded." << dendl; - } + if (driver->get_cache_driver()->update_attrs(dpp, this->get_key().get_oid(), update, y) < 0) + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver update_attrs method failed." << dendl; return next->modify_obj_attrs(attr_name, attr_val, y, dpp); } @@ -339,14 +311,10 @@ int D4NFilterObject::delete_obj_attrs(const DoutPrefixProvider* dpp, const char* /* Ensure delAttr exists */ if (std::find_if(currentattrs.begin(), currentattrs.end(), - [&](const auto& pair) { return pair.first == attr->first; }) != currentattrs.end()) { - int delAttrReturn = driver->get_cache_driver()->delete_attrs(dpp, this->get_key().get_oid(), delattr, y); + [&](const auto& pair) { return pair.first == attr->first; }) != currentattrs.end()) { - if (delAttrReturn < 0) { - ldpp_dout(dpp, 20) << "D4N Filter: Cache delete object attribute operation failed." << dendl; - } else { - ldpp_dout(dpp, 20) << "D4N Filter: Cache delete object attribute operation succeeded." << dendl; - } + if (driver->get_cache_driver()->delete_attrs(dpp, this->get_key().get_oid(), delattr, y) < 0) + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver delete_attrs method failed." << dendl; } else return next->delete_obj_attrs(dpp, attr_name, y); @@ -389,10 +357,6 @@ std::unique_ptr D4NFilterObject::get_delete_op() int D4NFilterObject::D4NFilterReadOp::prepare(optional_yield y, const DoutPrefixProvider* dpp) { - rgw::sal::Attrs attrs; - int getObjReturn = source->driver->get_cache_driver()->get_attrs(dpp, source->get_key().get_oid(), - attrs, y); - next->params.mod_ptr = params.mod_ptr; next->params.unmod_ptr = params.unmod_ptr; next->params.high_precision_time = params.high_precision_time; @@ -402,9 +366,11 @@ int D4NFilterObject::D4NFilterReadOp::prepare(optional_yield y, const DoutPrefix next->params.if_nomatch = params.if_nomatch; next->params.lastmod = params.lastmod; int ret = next->prepare(y, dpp); - - if (getObjReturn < 0) { - ldpp_dout(dpp, 20) << "D4N Filter: Cache get object operation failed." << dendl; + + rgw::sal::Attrs attrs; + + if (source->driver->get_cache_driver()->get_attrs(dpp, source->get_key().get_oid(), attrs, y) < 0) { + ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::" << __func__ << "(): CacheDriver get_attrs method failed." << dendl; } else { /* Set metadata locally */ RGWObjState* astate; @@ -412,51 +378,46 @@ int D4NFilterObject::D4NFilterReadOp::prepare(optional_yield y, const DoutPrefix std::unique_ptr user = source->driver->get_user(source->get_bucket()->get_owner()); source->get_obj_state(dpp, &astate, y); - for (auto it = attrs.begin(); it != attrs.end(); ++it) { - if (it->second.length() > 0) { // or return? -Sam - if (it->first == "mtime") { - parse_time(it->second.c_str(), &astate->mtime); - attrs.erase(it->first); - } else if (it->first == "object_size") { - source->set_obj_size(std::stoull(it->second.c_str())); - attrs.erase(it->first); - } else if (it->first == "accounted_size") { - astate->accounted_size = std::stoull(it->second.c_str()); - attrs.erase(it->first); - } else if (it->first == "epoch") { - astate->epoch = std::stoull(it->second.c_str()); - attrs.erase(it->first); - } else if (it->first == "version_id") { - source->set_instance(it->second.c_str()); - attrs.erase(it->first); - } else if (it->first == "source_zone_short_id") { - astate->zone_short_id = static_cast(std::stoul(it->second.c_str())); - attrs.erase(it->first); - } else if (it->first == "user_quota.max_size") { - quota_info.max_size = std::stoull(it->second.c_str()); - attrs.erase(it->first); - } else if (it->first == "user_quota.max_objects") { - quota_info.max_objects = std::stoull(it->second.c_str()); - attrs.erase(it->first); - } else if (it->first == "max_buckets") { - user->set_max_buckets(std::stoull(it->second.c_str())); - attrs.erase(it->first); - } else { - ldpp_dout(dpp, 20) << "D4N Filter: Unexpected attribute; not locally set." << dendl; - attrs.erase(it->first); + for (auto& attr : attrs) { + if (attr.second.length() > 0) { + if (attr.first == "mtime") { + parse_time(attr.second.c_str(), &astate->mtime); + attrs.erase(attr.first); + } else if (attr.first == "object_size") { + source->set_obj_size(std::stoull(attr.second.c_str())); + attrs.erase(attr.first); + } else if (attr.first == "accounted_size") { + astate->accounted_size = std::stoull(attr.second.c_str()); + attrs.erase(attr.first); + } else if (attr.first == "epoch") { + astate->epoch = std::stoull(attr.second.c_str()); + attrs.erase(attr.first); + } else if (attr.first == "version_id") { + source->set_instance(attr.second.c_str()); + attrs.erase(attr.first); + } else if (attr.first == "source_zone_short_id") { + astate->zone_short_id = static_cast(std::stoul(attr.second.c_str())); + attrs.erase(attr.first); + } else if (attr.first == "user_quota.max_size") { + quota_info.max_size = std::stoull(attr.second.c_str()); + attrs.erase(attr.first); + } else if (attr.first == "user_quota.max_objects") { + quota_info.max_objects = std::stoull(attr.second.c_str()); + attrs.erase(attr.first); + } else if (attr.first == "max_buckets") { + user->set_max_buckets(std::stoull(attr.second.c_str())); + attrs.erase(attr.first); + } else { + ldpp_dout(dpp, 20) << "D4NFilterObject::D4NFilterReadOp::" << __func__ << "(): Unexpected attribute; not locally set." << dendl; + attrs.erase(attr.first); + } } - } user->set_info(quota_info); source->set_obj_state(*astate); /* Set attributes locally */ - int set_attrsReturn = source->set_attrs(attrs); - - if (set_attrsReturn < 0) { - ldpp_dout(dpp, 20) << "D4N Filter: Cache set object operation failed." << dendl; - } else { - ldpp_dout(dpp, 20) << "D4N Filter: Cache set object operation succeeded." << dendl; - } + if (source->set_attrs(attrs) < 0) + ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::" << __func__ << "(): D4NFilterObject set_attrs method failed." << dendl; } } @@ -569,12 +530,6 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl; - /* Build base block for inserting in LFUDA */ - rgw::d4n::CacheBlock block; - block.blockID = ofs; - block.cacheObj.objName = source->get_key().get_oid(); - block.cacheObj.bucketName = source->get_bucket()->get_name(); - if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache)) { // Read From Cache auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id); @@ -697,31 +652,23 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl block.cacheObj.creationTime = 0; block.cacheObj.dirty = false;// update hostsList since may overwrite existing hosts -Sam block.cacheObj.hostsList.push_back(blockDir->cct->_conf->rgw_local_cache_address); // Is the entire object getting stored in the local cache as well or only blocks? -Sam + Attrs attrs; // empty attrs for block sets if (bl.length() > 0 && last_part) { // if bl = bl_rem has data and this is the last part, write it to cache std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len); block.blockID = ofs; // TODO: fill out block correctly block.size = bl.length(); if (filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y) == 0) { - if (filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), source->get_attrs()) == 0) { - filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), "", *y); + if (filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), attrs) == 0) { + filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), "", *y); - /* Store block in directory */ - if (!blockDir->exist_key(&block, *y)) { - int ret = blockDir->set(&block, *y); - if (ret < 0) { - ldpp_dout(dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl; - return ret; - } else { - ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl; - } + /* Store block in directory */ + if (!blockDir->exist_key(&block, *y)) { // If the block exists, do we want to update anything else? -Sam + if (blockDir->set(&block, *y) < 0) + ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl; } else { - if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0) { - ldpp_dout(dpp, 0) << "D4N Filter: Block directory update operation failed." << dendl; - return -1; - } else { - ldpp_dout(dpp, 20) << "D4N Filter: Block directory update operation succeeded." << dendl; - } + if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0) + ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl; } } } @@ -730,26 +677,17 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl ofs += bl_len; block.blockID = ofs; block.size = bl.length(); - if (filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y) == 0) { //only block size because attributes are stored for entire obj? -Sam - if (filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), source->get_attrs()) == 0) { + if (filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y) == 0) { + if (filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), attrs) == 0) { filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), "", *y); /* Store block in directory */ if (!blockDir->exist_key(&block, *y)) { - int ret = blockDir->set(&block, *y); - if (ret < 0) { - ldpp_dout(dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl; - return ret; - } else { - ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl; - } - } else { - if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0) { - ldpp_dout(dpp, 0) << "D4N Filter: Block directory update operation failed." << dendl; - return -1; - } else { - ldpp_dout(dpp, 20) << "D4N Filter: Block directory update operation succeeded." << dendl; - } + if (blockDir->set(&block, *y) < 0) + ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl; + } else { + if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0) + ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl; } } } @@ -767,26 +705,17 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl block.blockID = ofs; // TODO: fill out block correctly block.size = bl_rem.length(); if (filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y) == 0) { - if (filter->get_cache_driver()->put_async(dpp, oid, bl_rem, bl_rem.length(), source->get_attrs()) == 0) { + if (filter->get_cache_driver()->put_async(dpp, oid, bl_rem, bl_rem.length(), attrs) == 0) { filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), "", *y); - /* Store block in directory */ + /* Store block in directory */ if (!blockDir->exist_key(&block, *y)) { - int ret = blockDir->set(&block, *y); - if (ret < 0) { - ldpp_dout(dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl; - return ret; - } else { - ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl; - } - } else { - if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0) { - ldpp_dout(dpp, 0) << "D4N Filter: Block directory update operation failed." << dendl; - return -1; - } else { - ldpp_dout(dpp, 20) << "D4N Filter: Block directory update operation succeeded." << dendl; - } - } + if (blockDir->set(&block, *y) < 0) + ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl; + } else { + if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0) + ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl; + } } } @@ -809,13 +738,8 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp }, .blockID = 0 // TODO: get correct blockID }; - int delDirReturn = source->driver->get_block_dir()->del(&block, y); - - if (delDirReturn < 0) { - ldpp_dout(dpp, 20) << "D4N Filter: Block directory delete operation failed." << dendl; - } else { - ldpp_dout(dpp, 20) << "D4N Filter: Block directory delete operation succeeded." << dendl; - } + if (source->driver->get_block_dir()->del(&block, y) < 0) + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory del method failed." << dendl; Attrs::iterator attrs; Attrs currentattrs = source->get_attrs(); @@ -826,26 +750,16 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp currentFields.push_back(attrs->first); } - int delObjReturn = source->driver->get_cache_driver()->del(dpp, source->get_key().get_oid(), y); - - if (delObjReturn < 0) { - ldpp_dout(dpp, 20) << "D4N Filter: Cache delete object operation failed." << dendl; - } else { - ldpp_dout(dpp, 20) << "D4N Filter: Cache delete operation succeeded." << dendl; - } + if (source->driver->get_cache_driver()->del(dpp, source->get_key().get_oid(), y) < 0) + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver del method failed." << dendl; return next->delete_obj(dpp, y, flags); } int D4NFilterWriter::prepare(optional_yield y) { - int del_dataReturn = driver->get_cache_driver()->delete_data(save_dpp, obj->get_key().get_oid(), y); - - if (del_dataReturn < 0) { - ldpp_dout(save_dpp, 20) << "D4N Filter: Cache delete data operation failed." << dendl; - } else { - ldpp_dout(save_dpp, 20) << "D4N Filter: Cache delete data operation succeeded." << dendl; - } + if (driver->get_cache_driver()->delete_data(save_dpp, obj->get_key().get_oid(), y) < 0) + ldpp_dout(save_dpp, 10) << "D4NFilterWriter::" << __func__ << "(): CacheDriver delete_data method failed." << dendl; return next->prepare(y); } @@ -883,19 +797,14 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag, .dirty = false, .hostsList = { driver->get_block_dir()->cct->_conf->rgw_local_cache_address } }, - .blockID = 0, // TODO: get correct version/blockID + .blockID = 0, // TODO: get correct blockID .version = "", .size = accounted_size, .hostsList = { driver->get_block_dir()->cct->_conf->rgw_local_cache_address } }; - int setDirReturn = driver->get_block_dir()->set(&block, y); - - if (setDirReturn < 0) { - ldpp_dout(save_dpp, 20) << "D4N Filter: Block directory set operation failed." << dendl; - } else { - ldpp_dout(save_dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl; - } + if (driver->get_block_dir()->set(&block, y) < 0) + ldpp_dout(save_dpp, 10) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory set method failed." << dendl; /* Retrieve complete set of attrs */ int ret = next->complete(accounted_size, etag, mtime, set_mtime, attrs, diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.h b/src/rgw/driver/d4n/rgw_sal_d4n.h index ddf446e48618..a7d7d1c6f846 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.h +++ b/src/rgw/driver/d4n/rgw_sal_d4n.h @@ -32,9 +32,6 @@ #include #include -#define dout_subsys ceph_subsys_rgw -#define dout_context g_ceph_context - namespace rgw::d4n { class PolicyDriver; } diff --git a/src/rgw/rgw_redis_driver.cc b/src/rgw/rgw_redis_driver.cc index 90c52b1846bb..f981e74502d5 100644 --- a/src/rgw/rgw_redis_driver.cc +++ b/src/rgw/rgw_redis_driver.cc @@ -109,7 +109,7 @@ int RedisDriver::initialize(CephContext* cct, const DoutPrefixProvider* dpp) cfg.addr.port = address.substr(address.find(":") + 1, address.length()); if (!cfg.addr.host.length() || !cfg.addr.port.length()) { - ldpp_dout(dpp, 10) << "RGW Redis Cache: Redis cache endpoint was not configured correctly" << dendl; + ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): Endpoint was not configured correctly." << dendl; return -EDESTADDRREQ; } @@ -238,9 +238,28 @@ int RedisDriver::del(const DoutPrefixProvider* dpp, const std::string& key, opti int RedisDriver::append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data, optional_yield y) { + response exists; std::string value; std::string entry = partition_info.location + key; + try { + boost::system::error_code ec; + request req; + req.push("HEXISTS", entry, "data"); + + redis_exec(conn, ec, req, exists, y); + + if (ec) + return -1; + } catch(std::exception &e) { + return -1; + } + + if (!std::get<0>(exists).value()) { + ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): Data field was not found." << dendl; + return -1; + } + try { boost::system::error_code ec; response resp; @@ -257,7 +276,7 @@ int RedisDriver::append_data(const DoutPrefixProvider* dpp, const::std::string& return -1; } - try { // do we want key check here? -Sam + try { /* Append to existing value or set as new value */ boost::system::error_code ec; response resp; @@ -462,7 +481,7 @@ std::string RedisDriver::get_attr(const DoutPrefixProvider* dpp, const std::stri } if (!std::get<0>(resp).value()) { - ldpp_dout(dpp, 20) << "RGW Redis Cache: Attribute was not set." << dendl; + ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): Attribute was not found." << dendl; return {}; } diff --git a/src/rgw/rgw_redis_driver.h b/src/rgw/rgw_redis_driver.h index 8624fdbc5f9b..87815fb98dcd 100644 --- a/src/rgw/rgw_redis_driver.h +++ b/src/rgw/rgw_redis_driver.h @@ -34,7 +34,7 @@ class RedisDriver : public CacheDriver { /* Partition */ virtual Partition get_current_partition_info(const DoutPrefixProvider* dpp) override { return partition_info; } - virtual uint64_t get_free_space(const DoutPrefixProvider* dpp) override { return free_space; } // how to get this from redis server? -Sam + virtual uint64_t get_free_space(const DoutPrefixProvider* dpp) override { return free_space; } static std::optional get_partition_info(const DoutPrefixProvider* dpp, const std::string& name, const std::string& type); static std::vector list_partitions(const DoutPrefixProvider* dpp); diff --git a/src/test/rgw/test_d4n_policy.cc b/src/test/rgw/test_d4n_policy.cc index e6c71db65e06..2d299ebf2d1a 100644 --- a/src/test/rgw/test_d4n_policy.cc +++ b/src/test/rgw/test_d4n_policy.cc @@ -58,7 +58,7 @@ class LFUDAPolicyFixture : public ::testing::Test { .hostsList = { env->redisHost } }; - rgw::cache::Partition partition_info{ .location = "RedisCache" }; + rgw::cache::Partition partition_info{ .location = "RedisCache", .size = 1000 }; cacheDriver = new rgw::cache::RedisDriver{io, partition_info}; policyDriver = new rgw::d4n::PolicyDriver(io, cacheDriver, "lfuda"); dir = new rgw::d4n::BlockDirectory{io}; @@ -129,7 +129,6 @@ class LFUDAPolicyFixture : public ::testing::Test { } } else if (!exists) { /* No remote copy */ block->hostsList.push_back(dir->cct->_conf->rgw_local_cache_address); - block->cacheObj.hostsList.push_back(dir->cct->_conf->rgw_local_cache_address); if (dir->set(block, y) < 0) return -1;