namespace rgw { namespace d4n {
-std::string build_index(std::string bucketName, std::string oid, uint64_t offset, uint64_t size) {
- return bucketName + "_" + oid + "_" + std::to_string(offset) + "_" + std::to_string(size);
-}
-
// initiate a call to async_exec() on the connection's executor
struct initiate_exec {
std::shared_ptr<boost::redis::connection> conn;
}
CacheBlock LFUDAPolicy::find_victim(const DoutPrefixProvider* dpp, optional_yield y) {
- if (entries_map.empty())
+ if (entries_heap.empty())
return {};
- auto it = std::min_element(std::begin(entries_map), std::end(entries_map),
- [](const auto& l, const auto& r) { return l.second->localWeight < r.second->localWeight; });
-
/* Get victim cache block */
- std::string key = it->second->key;
+ std::string key = entries_heap.top()->key;
CacheBlock victim;
victim.cacheObj.bucketName = key.substr(0, key.find('_'));
key.erase(0, key.find('_') + 1);
victim.cacheObj.objName = key.substr(0, key.find('_'));
- victim.blockID = it->second->offset;
- victim.size = it->second->len;
+ victim.blockID = entries_heap.top()->offset;
+ victim.size = entries_heap.top()->len;
if (dir->get(&victim, y) < 0) {
return {};
return false;
}
-int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) {
+#if 0
+int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheDriver, optional_yield y) {
response<std::string> resp;
int age = get_age(y);
if (exist_key(build_index(block->cacheObj.bucketName, block->cacheObj.objName, block->blockID, block->size))) { /* Local copy */
auto it = entries_map.find(build_index(block->cacheObj.bucketName, block->cacheObj.objName, block->blockID, block->size));
it->second->localWeight += age;
- return cacheNode->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(it->second->localWeight), y);
+ return cacheDriver->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(it->second->localWeight), y);
} else {
- uint64_t freeSpace = cacheNode->get_free_space(dpp);
-
- while (freeSpace < block->size) { /* Not enough space in local cache */
- if (int ret = eviction(dpp, cacheNode, y) > 0)
- freeSpace += ret;
- else
- return -1;
- }
+ if (eviction(dpp, block->size, cacheDriver, y) < 0)
+ return -1; // what if eviction turns into infinite loop? -Sam
int exists = dir->exist_key(block, y);
if (exists > 0) { /* Remote copy */
}
}
} else if (!exists) { /* No remote copy */
- // how to get bufferlist data? -Sam
- // do I need to add the block to the local cache here? -Sam
- // update hosts list for block as well?
- // insert entry here? -Sam
// localWeight += age;
- //return cacheNode->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(it->second->localWeight), y);
+ //return cacheDriver->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(it->second->localWeight), y);
return 0;
} else {
return -1;
}
}
}
+#endif
-uint64_t LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) {
- CacheBlock victim = find_victim(dpp, y);
+int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) {
+ uint64_t freeSpace = cacheDriver->get_free_space(dpp);
- if (victim.cacheObj.objName.empty()) {
- ldpp_dout(dpp, 10) << "RGW D4N Policy: Could not find victim block" << dendl;
- return 0; /* Return zero for failure */
- }
+ while (freeSpace < size) {
+ CacheBlock victim = find_victim(dpp, y);
- std::string key = build_index(victim.cacheObj.bucketName, victim.cacheObj.objName, victim.blockID, victim.size);
- auto it = entries_map.find(key);
- if (it == entries_map.end()) {
- return 0;
- }
+ if (victim.cacheObj.objName.empty()) {
+ ldpp_dout(dpp, 10) << "RGW D4N Policy: Could not retrieve victim block" << dendl;
+ return -1;
+ }
- int avgWeight = get_min_avg_weight(y);
- if (avgWeight < 0) {
- return 0;
- }
+ std::string key = victim.cacheObj.bucketName + "_" + victim.cacheObj.objName + "_" + std::to_string(victim.blockID) + "_" + std::to_string(victim.size);
+ auto it = entries_map.find(key);
+ if (it == entries_map.end()) {
+ return -1;
+ }
+
+ int avgWeight = get_min_avg_weight(y);
+ if (avgWeight < 0) {
+ return -1;
+ }
+
+ if (victim.hostsList.size() == 1 && victim.hostsList[0] == dir->cct->_conf->rgw_local_cache_address) { /* Last copy */
+ if (victim.globalWeight) {
+ it->second->localWeight += victim.globalWeight;
+
+ for (auto& entry : entries_heap) {
+ if (entry->key == key) {
+ (*(entry->handle))->localWeight = it->second->localWeight;
+ entries_heap.increase(entry->handle);
+ }
+ }
- if (victim.hostsList.size() == 1 && victim.hostsList[0] == dir->cct->_conf->rgw_local_cache_address) { /* Last copy */
- if (victim.globalWeight) {
- it->second->localWeight += victim.globalWeight;
- if (cacheNode->set_attr(dpp, victim.cacheObj.objName, "localWeight", std::to_string(it->second->localWeight), y) < 0) {
- return 0;
+ if (cacheDriver->set_attr(dpp, key, "localWeight", std::to_string(it->second->localWeight), y) < 0) {
+ return -1;
+ }
+
+ victim.globalWeight = 0;
+ if (dir->update_field(&victim, "globalWeight", std::to_string(victim.globalWeight), y) < 0) {
+ return -1;
+ }
}
- victim.globalWeight = 0;
- if (dir->update_field(&victim, "globalWeight", std::to_string(victim.globalWeight), y) < 0) {
- return 0;
+ if (it->second->localWeight > avgWeight) {
+ // TODO: push victim block to remote cache
}
}
- if (it->second->localWeight > avgWeight) {
- // TODO: push victim block to remote cache
+ victim.globalWeight += it->second->localWeight;
+ if (dir->update_field(&victim, "globalWeight", std::to_string(victim.globalWeight), y) < 0) {
+ return -1;
}
- }
- victim.globalWeight += it->second->localWeight;
- if (dir->update_field(&victim, "globalWeight", std::to_string(victim.globalWeight), y) < 0) { // just have one update? -Sam
- return 0;
- }
+ if (dir->remove_host(&victim, dir->cct->_conf->rgw_local_cache_address, y) < 0) {
+ return -1;
+ } else {
+ if (cacheDriver->del(dpp, key, y) < 0) {
+ return -1;
+ } else {
+ ldpp_dout(dpp, 10) << "RGW D4N Policy: Block " << victim.cacheObj.objName << " has been evicted." << dendl;
- ldpp_dout(dpp, 10) << "RGW D4N Policy: Block " << victim.cacheObj.objName << " has been evicted." << dendl;
+ uint64_t num_entries = entries_map.size();
- if (cacheNode->del(dpp, key, y) < 0 && dir->remove_host(&victim, dir->cct->_conf->rgw_local_cache_address, y) < 0) {
- return 0;
- } else {
- uint64_t num_entries = entries_map.size();
+ if (!avgWeight) {
+ if (set_min_avg_weight(0, dir->cct->_conf->rgw_local_cache_address, y) < 0) // Where else must this be set? -Sam
+ return -1;
+ } else {
+ if (set_min_avg_weight(avgWeight - (it->second->localWeight/num_entries), dir->cct->_conf->rgw_local_cache_address, y) < 0) // Where else must this be set? -Sam
+ return -1;
+ }
- if (!avgWeight) {
- if (set_min_avg_weight(0, dir->cct->_conf->rgw_local_cache_address, y) < 0) // Where else must this be set? -Sam
- return 0;
- } else {
- if (set_min_avg_weight(avgWeight - (it->second->localWeight/num_entries), dir->cct->_conf->rgw_local_cache_address, y) < 0) { // Where else must this be set? -Sam
- return 0;
- }
- int age = get_age(y);
- age = std::max(it->second->localWeight, age);
- if (set_age(age, y) < 0)
- return 0;
+ int age = get_age(y);
+ age = std::max(it->second->localWeight, age);
+ if (set_age(age, y) < 0)
+ return -1;
+ }
}
- }
- return victim.size; // this doesn't account for the additional attributes that were removed and need to be set with the new block -Sam
+ freeSpace = cacheDriver->get_free_space(dpp);
+ }
+
+ return 0;
}
-void LFUDAPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y)
+void LFUDAPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y)
{
- erase(dpp, key);
+ using handle_type = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>::handle_type;
- int age = get_age(y);
- assert(age > -1);
+ int age = get_age(y);
+ int localWeight = age;
+ auto entry = find_entry(key);
+ if (entry != nullptr) {
+ entry->localWeight += age;
+ localWeight = entry->localWeight;
+ }
+
+ erase(dpp, key);
- LFUDAEntry *e = new LFUDAEntry(key, offset, len, version, age);
- entries_lfuda_list.push_back(*e);
+ LFUDAEntry *e = new LFUDAEntry(key, offset, len, version, localWeight);
+ handle_type handle = entries_heap.push(e);
+ e->set_handle(handle);
entries_map.emplace(key, e);
+
+ if (cacheDriver->set_attr(dpp, key, "localWeight", std::to_string(localWeight), y) < 0) {
+ ldpp_dout(dpp, 10) << "LFUDAPolicy::update:: " << __func__ << "(): Cache driver set_attr method failed." << dendl;
+ }
}
bool LFUDAPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key)
{
+ for (auto const& it : entries_heap) {
+ if (it->key == key) {
+ entries_heap.erase(it->handle);
+ break;
+ }
+ }
+
auto p = entries_map.find(key);
if (p == entries_map.end()) {
return false;
}
entries_map.erase(p);
- entries_lfuda_list.erase_and_dispose(entries_lfuda_list.iterator_to(*(p->second)), LFUDA_Entry_delete_disposer());
- return true;
+
+ return false;
}
int LRUPolicy::exist_key(std::string key)
return false;
}
-int LRUPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y)
+int LRUPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y)
{
- uint64_t freeSpace = cacheNode->get_free_space(dpp);
- while(freeSpace < block->size) {
- freeSpace = eviction(dpp, cacheNode, y);
+ uint64_t freeSpace = cacheDriver->get_free_space(dpp);
+
+ while (freeSpace < size) {
+ const std::lock_guard l(lru_lock);
+ auto p = entries_lru_list.front();
+ entries_map.erase(entries_map.find(p.key));
+ entries_lru_list.pop_front_and_dispose(Entry_delete_disposer());
+ cacheDriver->delete_data(dpp, p.key, null_yield);
+
+ freeSpace = cacheDriver->get_free_space(dpp);
}
- return 0;
-}
-uint64_t LRUPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y)
-{
- const std::lock_guard l(lru_lock);
- auto p = entries_lru_list.front();
- entries_map.erase(entries_map.find(p.key));
- entries_lru_list.pop_front_and_dispose(Entry_delete_disposer());
- cacheNode->delete_data(dpp, p.key, null_yield);
- return cacheNode->get_free_space(dpp);
+ return 0;
}
-void LRUPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y)
+void LRUPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y)
{
erase(dpp, key);
return true;
}
-int PolicyDriver::init() {
- if (policyName == "lfuda") {
- cachePolicy = new LFUDAPolicy(io);
- return 0;
- } else if (policyName == "lru") {
- cachePolicy = new LRUPolicy();
- return 0;
- }
-
- return -1;
-}
-
} } // namespace rgw::d4n
#pragma once
-#include <string>
-#include <iostream>
+#include <boost/heap/fibonacci_heap.hpp>
#include "rgw_common.h"
#include "d4n_directory.h"
-#include "../../rgw_redis_driver.h"
+#include "rgw_sal_d4n.h"
+#include "rgw_cache_driver.h"
#define dout_subsys ceph_subsys_rgw
#define dout_context g_ceph_context
+namespace rgw::sal {
+ class D4NFilterObject;
+}
+
namespace rgw { namespace d4n {
class CachePolicy {
virtual int init(CephContext *cct, const DoutPrefixProvider* dpp) { return 0; }
virtual int exist_key(std::string key) = 0;
- virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) = 0;
- virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) = 0;
- virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) = 0;
+ virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) = 0;
+ virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) = 0;
virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key) = 0;
virtual void shutdown() = 0;
};
class LFUDAPolicy : public CachePolicy {
private:
+ template<typename T>
+ struct EntryComparator {
+ bool operator()(T* const e1, T* const e2) const {
+ return e1->localWeight > e2->localWeight;
+ }
+ };
+
struct LFUDAEntry : public Entry {
int localWeight;
- LFUDAEntry(std::string& key, uint64_t offset, uint64_t len, std::string version, int localWeight) : Entry(key, offset, len, version),
- localWeight(localWeight) {}
- };
+ using handle_type = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>::handle_type;
+ handle_type handle;
- struct LFUDA_Entry_delete_disposer : public Entry_delete_disposer {
- void operator()(LFUDAEntry *e) {
- delete e;
- }
+ LFUDAEntry(std::string& key, uint64_t offset, uint64_t len, std::string& version, int localWeight) : Entry(key, offset, len, version),
+ localWeight(localWeight) {}
+
+ void set_handle(handle_type handle_) { handle = handle_; }
};
- typedef boost::intrusive::list<LFUDAEntry> List;
+ using Heap = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>;
+ Heap entries_heap;
std::unordered_map<std::string, LFUDAEntry*> entries_map;
net::io_context& io;
std::shared_ptr<connection> conn;
- List entries_lfuda_list;
BlockDirectory* dir;
+ rgw::cache::CacheDriver* cacheDriver;
int set_age(int age, optional_yield y);
int get_age(optional_yield y);
CacheBlock find_victim(const DoutPrefixProvider* dpp, optional_yield y);
public:
- LFUDAPolicy(net::io_context& io_context) : CachePolicy(), io(io_context) {
+ LFUDAPolicy(net::io_context& io_context, rgw::cache::CacheDriver* cacheDriver) : CachePolicy(), io(io_context), cacheDriver{cacheDriver} {
conn = std::make_shared<connection>(boost::asio::make_strand(io_context));
dir = new BlockDirectory{io};
}
return 0;
}
virtual int exist_key(std::string key) override;
- virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
- virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
- virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
+ //virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
+ virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override;
+ virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) override;
virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key) override;
virtual void shutdown() override;
+
+ void set_local_weight(std::string& key, int localWeight);
+ LFUDAEntry* find_entry(std::string key) {
+ auto it = entries_map.find(key);
+ if (it == entries_map.end())
+ return nullptr;
+ return it->second;
+ }
};
class LRUPolicy : public CachePolicy {
std::unordered_map<std::string, Entry*> entries_map;
std::mutex lru_lock;
List entries_lru_list;
+ rgw::cache::CacheDriver* cacheDriver;
public:
- LRUPolicy() = default;
+ LRUPolicy(rgw::cache::CacheDriver* cacheDriver) : cacheDriver{cacheDriver} {}
virtual int exist_key(std::string key) override;
- virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
- virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
- virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
+ virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override;
+ virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) override;
virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key) override;
virtual void shutdown() override {}
};
class PolicyDriver {
private:
- net::io_context& io;
std::string policyName;
CachePolicy* cachePolicy;
public:
- PolicyDriver(net::io_context& io_context, std::string _policyName) : io(io_context), policyName(_policyName) {}
+ PolicyDriver(net::io_context& io_context, rgw::cache::CacheDriver* cacheDriver, std::string _policyName) : policyName(_policyName)
+ {
+ if (policyName == "lfuda") {
+ cachePolicy = new LFUDAPolicy(io_context, cacheDriver);
+ } else if (policyName == "lru") {
+ cachePolicy = new LRUPolicy(cacheDriver);
+ }
+ }
~PolicyDriver() {
delete cachePolicy;
}
- int init();
CachePolicy* get_cache_policy() { return cachePolicy; }
std::string get_policy_name() { return policyName; }
};
cacheDriver = new rgw::cache::SSDDriver(partition_info);
objDir = new rgw::d4n::ObjectDirectory(io_context);
blockDir = new rgw::d4n::BlockDirectory(io_context);
- policyDriver = new rgw::d4n::PolicyDriver(io_context, "lfuda");
+ policyDriver = new rgw::d4n::PolicyDriver(io_context, cacheDriver, "lfuda");
}
D4NFilterDriver::~D4NFilterDriver()
objDir->init(cct, dpp);
blockDir->init(cct, dpp);
- policyDriver->init();
policyDriver->get_cache_policy()->init(cct, dpp);
return 0;
// Read From Cache
auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);
- source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, part_len, "", source->driver->get_cache_driver(), y);
+ source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, part_len, "", y);
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
// Read From Cache
auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);
- source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, obj_max_req_size, "", source->driver->get_cache_driver(), y);
+ source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, obj_max_req_size, "", y);
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
len -= obj_max_req_size;
} while (start_part_num < num_parts);
-
-
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Fetching object from backend store" << dendl;
Attrs obj_attrs;
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to fetch object from backend store, r= " << r << dendl;
return r;
}
-
+
return this->cb->flush_last_part();
}
std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
block.blockID = ofs; // TODO: fill out block correctly
block.size = bl.length();
- block.blockID = ofs;
- uint64_t freeSpace = filter->get_cache_driver()->get_free_space(dpp);
- while(freeSpace < block.size) {
- freeSpace += filter->get_policy_driver()->get_cache_policy()->eviction(dpp, filter->get_cache_driver(), null_yield);
- }
- if (filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), source->get_attrs()) == 0) {
- filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), "", filter->get_cache_driver(), null_yield);
- /* Store block in directory */
- if (!blockDir->exist_key(&block, null_yield)) {
- #if 0
- int ret = blockDir->set_value(&block);
- if (ret < 0) {
- ldpp_dout(dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl;
- return ret;
+ if (filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y) == 0) {
+ if (filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), source->get_attrs()) == 0) {
+ filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), "", *y);
+
+ /* Store block in directory */
+ if (!blockDir->exist_key(&block, *y)) {
+ int ret = blockDir->set(&block, *y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl;
+ return ret;
+ } else {
+ ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl;
+ }
} else {
- ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl;
+ if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0) {
+ ldpp_dout(dpp, 0) << "D4N Filter: Block directory update operation failed." << dendl;
+ return -1;
+ } else {
+ ldpp_dout(dpp, 20) << "D4N Filter: Block directory update operation succeeded." << dendl;
+ }
}
- #endif
}
}
} else if (bl.length() == rgw_get_obj_max_req_size && bl_rem.length() == 0) { // if bl is the same size as rgw_get_obj_max_req_size, write it to cache
ofs += bl_len;
block.blockID = ofs;
block.size = bl.length();
- uint64_t freeSpace = filter->get_cache_driver()->get_free_space(dpp);
- while(freeSpace < block.size) {
- freeSpace += filter->get_policy_driver()->get_cache_policy()->eviction(dpp, filter->get_cache_driver(), null_yield);
- }
- if (filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), source->get_attrs()) == 0) {
- filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), "", filter->get_cache_driver(), null_yield);
- /* Store block in directory */
- if (!blockDir->exist_key(&block, null_yield)) {
- #if 0
- int ret = blockDir->set_value(&block);
- if (ret < 0) {
- ldpp_dout(dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl;
- return ret;
- } else {
- ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl;
+ if (filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y) == 0) { //only block size because attributes are stored for entire obj? -Sam
+ if (filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), source->get_attrs()) == 0) {
+ filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), "", *y);
+
+ /* Store block in directory */
+ if (!blockDir->exist_key(&block, *y)) {
+ int ret = blockDir->set(&block, *y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl;
+ return ret;
+ } else {
+ ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl;
+ }
+ } else {
+ if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0) {
+ ldpp_dout(dpp, 0) << "D4N Filter: Block directory update operation failed." << dendl;
+ return -1;
+ } else {
+ ldpp_dout(dpp, 20) << "D4N Filter: Block directory update operation succeeded." << dendl;
+ }
}
- #endif
}
}
} else { //copy data from incoming bl to bl_rem till it is rgw_get_obj_max_req_size, and then write it to cache
if (bl_rem.length() == rgw_get_obj_max_req_size) {
std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_rem.length());
ofs += bl_rem.length();
- block.blockID = ofs;
+ block.blockID = ofs; // TODO: fill out block correctly
block.size = bl_rem.length();
- uint64_t freeSpace = filter->get_cache_driver()->get_free_space(dpp);
- while(freeSpace < block.size) {
- freeSpace += filter->get_policy_driver()->get_cache_policy()->eviction(dpp, filter->get_cache_driver(), null_yield);
- }
- if (filter->get_cache_driver()->put_async(dpp, oid, bl_rem, bl_rem.length(), source->get_attrs()) == 0) {
- filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), "", filter->get_cache_driver(), null_yield);
- /* Store block in directory */
- if (!blockDir->exist_key(&block, null_yield)) {
- #if 0
- int ret = blockDir->set_value(&block);
- if (ret < 0) {
+ if (filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y) == 0) {
+ if (filter->get_cache_driver()->put_async(dpp, oid, bl_rem, bl_rem.length(), source->get_attrs()) == 0) {
+ filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), "", *y);
+
+ /* Store block in directory */
+ if (!blockDir->exist_key(&block, *y)) {
+ int ret = blockDir->set(&block, *y);
+ if (ret < 0) {
ldpp_dout(dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl;
return ret;
- } else {
- ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl;
- }
- #endif
+ } else {
+ ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl;
+ }
+ } else {
+ if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0) {
+ ldpp_dout(dpp, 0) << "D4N Filter: Block directory update operation failed." << dendl;
+ return -1;
+ } else {
+ ldpp_dout(dpp, 20) << "D4N Filter: Block directory update operation succeeded." << dendl;
+ }
+ }
}
}
#include "rgw_ssd_driver.h"
#include "rgw_redis_driver.h"
-#include "rgw_redis_driver.h"
#include "driver/d4n/d4n_directory.h"
#include "driver/d4n/d4n_policy.h"
#define dout_subsys ceph_subsys_rgw
#define dout_context g_ceph_context
+namespace rgw::d4n {
+ class PolicyDriver;
+}
+
namespace rgw { namespace sal {
class D4NFilterDriver : public FilterDriver {
#include <boost/redis/connection.hpp>
#include "gtest/gtest.h"
+#include "gtest/gtest_prod.h"
#include "common/ceph_argparse.h"
#include "rgw_auth_registry.h"
#include "driver/d4n/d4n_policy.h"
DoutPrefixProvider* dpp;
};
-class LFUDAPolicyFixture: public ::testing::Test {
+class LFUDAPolicyFixture : public ::testing::Test {
protected:
virtual void SetUp() {
block = new rgw::d4n::CacheBlock{
rgw::cache::Partition partition_info{ .location = "RedisCache" };
cacheDriver = new rgw::cache::RedisDriver{io, partition_info};
- policyDriver = new rgw::d4n::PolicyDriver(io, "lfuda");
+ policyDriver = new rgw::d4n::PolicyDriver(io, cacheDriver, "lfuda");
dir = new rgw::d4n::BlockDirectory{io};
conn = new connection{boost::asio::make_strand(io)};
dir->init(env->cct, env->dpp);
cacheDriver->initialize(env->cct, env->dpp);
- policyDriver->init();
policyDriver->get_cache_policy()->init(env->cct, env->dpp);
bl.append("test data");
delete policyDriver;
}
+ std::string build_index(std::string bucketName, std::string oid, uint64_t offset, uint64_t size) {
+ return bucketName + "_" + oid + "_" + std::to_string(offset) + "_" + std::to_string(size);
+ }
+
+ int lfuda(const DoutPrefixProvider* dpp, rgw::d4n::CacheBlock* block, rgw::cache::CacheDriver* cacheDriver, optional_yield y) {
+ int age = 5; /* Arbitrary number for testing */
+ std::string oid = build_index(block->cacheObj.bucketName, block->cacheObj.objName, block->blockID, block->size);
+
+ if (this->policyDriver->get_cache_policy()->exist_key(build_index(block->cacheObj.bucketName, block->cacheObj.objName, block->blockID, block->size))) { /* Local copy */
+ auto entry = dynamic_cast<rgw::d4n::LFUDAPolicy*>(this->policyDriver->get_cache_policy())->find_entry(oid);
+ entry->localWeight += age;
+ return cacheDriver->set_attr(dpp, oid, "localWeight", std::to_string(entry->localWeight), y);
+ } else {
+ if (this->policyDriver->get_cache_policy()->eviction(dpp, block->size, y) < 0)
+ return -1;
+
+ int exists = dir->exist_key(block, y);
+ if (exists > 0) { /* Remote copy */
+ if (dir->get(block, y) < 0) {
+ return -1;
+ } else {
+ if (!block->hostsList.empty()) {
+ block->globalWeight += age;
+
+ if (dir->update_field(block, "globalWeight", std::to_string(block->globalWeight), y) < 0) {
+ return -1;
+ } else {
+ return 0;
+ }
+ } else {
+ return -1;
+ }
+ }
+ } else if (!exists) { /* No remote copy */
+ block->hostsList.push_back(dir->cct->_conf->rgw_local_cache_address);
+ block->cacheObj.hostsList.push_back(dir->cct->_conf->rgw_local_cache_address);
+ if (dir->set(block, y) < 0)
+ return -1;
+
+ this->policyDriver->get_cache_policy()->update(dpp, oid, 0, bl.length(), "", y);
+ if (cacheDriver->put(dpp, oid, bl, bl.length(), attrs, y) < 0)
+ return -1;
+ return cacheDriver->set_attr(dpp, oid, "localWeight", std::to_string(age), y);
+ } else {
+ return -1;
+ }
+ }
+ }
+
rgw::d4n::CacheBlock* block;
rgw::d4n::BlockDirectory* dir;
rgw::d4n::PolicyDriver* policyDriver;
spawn::spawn(io, [this] (spawn::yield_context yield) {
std::string key = block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + std::to_string(block->blockID) + "_" + std::to_string(block->size);
ASSERT_EQ(0, cacheDriver->put(env->dpp, key, bl, bl.length(), attrs, optional_yield{io, yield}));
- policyDriver->get_cache_policy()->insert(env->dpp, key, 0, bl.length(), "", cacheDriver, optional_yield{io, yield});
-
- /* Change cache age for testing purposes */
- {
- boost::system::error_code ec;
- request req;
- req.push("HSET", "lfuda", "age", "5");
- response<int> resp;
-
- conn->async_exec(req, resp, yield[ec]);
-
- ASSERT_EQ((bool)ec, false);
- EXPECT_EQ(std::get<0>(resp).value(), 0);
- }
+ policyDriver->get_cache_policy()->update(env->dpp, key, 0, bl.length(), "", optional_yield{io, yield});
- ASSERT_GE(policyDriver->get_cache_policy()->get_block(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0);
+ ASSERT_GE(lfuda(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0);
dir->shutdown();
cacheDriver->shutdown();
boost::system::error_code ec;
request req;
- req.push("HGET", "RedisCache/testName", "localWeight");
+ req.push("HGET", "RedisCache/testBucket_testName_0_0", "localWeight");
req.push("FLUSHALL");
response<std::string, boost::redis::ignore_t> resp;
ASSERT_EQ(0, dir->set(&victim, optional_yield{io, yield}));
std::string victimKey = victim.cacheObj.bucketName + "_" + victim.cacheObj.objName + "_" + std::to_string(victim.blockID) + "_" + std::to_string(victim.size);
ASSERT_EQ(0, cacheDriver->put(env->dpp, victimKey, bl, bl.length(), attrs, optional_yield{io, yield}));
- policyDriver->get_cache_policy()->insert(env->dpp, victimKey, 0, bl.length(), "", cacheDriver, optional_yield{io, yield});
+ policyDriver->get_cache_policy()->update(env->dpp, victimKey, 0, bl.length(), "", optional_yield{io, yield});
/* Remote block */
block->size = cacheDriver->get_free_space(env->dpp) + 1; /* To trigger eviction */
ASSERT_EQ(0, dir->set(block, optional_yield{io, yield}));
- ASSERT_GE(policyDriver->get_cache_policy()->get_block(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0);
+ ASSERT_GE(lfuda(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0);
dir->shutdown();
cacheDriver->shutdown();
boost::system::error_code ec;
request req;
req.push("EXISTS", "RedisCache/" + victimKey);
- req.push("HGET", victimKey, "globalWeight");
+ req.push("EXISTS", victimKey, "globalWeight");
req.push("HGET", key, "globalWeight");
req.push("FLUSHALL");
- response<int, std::string, std::string,
- std::string, boost::redis::ignore_t> resp;
+ response<int, int, std::string, std::string,
+ boost::redis::ignore_t> resp;
conn->async_exec(req, resp, yield[ec]);
ASSERT_EQ((bool)ec, false);
EXPECT_EQ(std::get<0>(resp).value(), 0);
- EXPECT_EQ(std::get<1>(resp).value(), "5");
- EXPECT_EQ(std::get<2>(resp).value(), "0");
+ EXPECT_EQ(std::get<1>(resp).value(), 0);
+ EXPECT_EQ(std::get<2>(resp).value(), "5");
conn->cancel();
});
TEST_F(LFUDAPolicyFixture, BackendGetBlockYield)
{
spawn::spawn(io, [this] (spawn::yield_context yield) {
- ASSERT_GE(policyDriver->get_cache_policy()->get_block(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0);
+ ASSERT_GE(lfuda(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0);
dir->shutdown();
cacheDriver->shutdown();