-#include "../../../common/async/yield_context.h"
#include "d4n_policy.h"
+
+#include <boost/lexical_cast.hpp>
+#include "../../../common/async/yield_context.h"
#include "common/async/blocked_completion.h"
namespace rgw { namespace d4n {
}
if (!std::get<0>(resp).value()) {
- if (!set_age(0, y)) /* Initialize age */
+ if (set_age(0, y)) /* Initialize age */
return 0;
else
return -1;
}
}
-int LFUDAPolicy::set_global_weight(std::string key, int weight, optional_yield y) {
- try {
- boost::system::error_code ec;
- response<int> resp;
- request req;
- req.push("HSET", key, "globalWeight", std::to_string(weight));
-
- redis_exec(conn, ec, req, resp, y);
-
- if (ec)
- return {};
-
- return std::get<0>(resp).value(); /* Returns number of fields set */
- } catch(std::exception &e) {
- return -1;
- }
-}
-
-int LFUDAPolicy::get_global_weight(std::string key, optional_yield y) {
- try {
- boost::system::error_code ec;
- response<std::string> resp;
- request req;
- req.push("HGET", key, "globalWeight");
-
- redis_exec(conn, ec, req, resp, y);
-
- if (ec)
- return -1;
-
- return std::stoi(std::get<0>(resp).value());
- } catch(std::exception &e) {
- return -1;
- }
-}
-
int LFUDAPolicy::set_min_avg_weight(size_t weight, std::string cacheLocation, optional_yield y) {
try {
boost::system::error_code ec;
boost::system::error_code ec;
response<int> resp;
request req;
- req.push("HSET", "lfuda", "minAvgWeight:weight", cacheLocation);
+ req.push("HSET", "lfuda", "minAvgWeight:weight", boost::lexical_cast<int>(weight));
redis_exec(conn, ec, req, resp, y);
}
if (!std::get<0>(resp).value()) {
- if (!set_min_avg_weight(INT_MAX, ""/* local cache location or keep empty? */, y)) { /* Initialize minimum average weight */
- return INT_MAX;
+ // Is int_max what we want here? -Sam
+ if (set_min_avg_weight(0, ""/* local cache location or keep empty? */, y)) { /* Initialize minimum average weight */
+ return 0;
} else {
return -1;
}
}
CacheBlock LFUDAPolicy::find_victim(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) {
- #if 0
- std::vector<rgw::cache::Entry> entries = cacheNode->list_entries(dpp);
- std::string victimName;
- int minWeight = INT_MAX;
+ if (entries_map.empty())
+ return {};
- for (auto it = entries.begin(); it != entries.end(); ++it) {
- std::string localWeightStr = cacheNode->get_attr(dpp, it->key, "localWeight", y); // should represent block -Sam
+ auto it = std::min_element(std::begin(entries_map), std::end(entries_map),
+ [](const auto& l, const auto& r) { return l.second->localWeight < r.second->localWeight; });
/* Get victim cache block */
CacheBlock victim;
victim.cacheObj.bucketName = cacheNode->get_attr(dpp, victim.cacheObj.objName, "bucket_name", y); // generalize for other cache backends -Sam
victim.blockID = 0; // find way to get ID -Sam
- if (ret < 0)
- return {};
- } else if (std::stoi(localWeightStr) < minWeight) {
- minWeight = std::stoi(localWeightStr);
- victimName = it->key;
- }
+ if (dir->get(&victim, y) < 0) {
+ return {};
}
- /* Get victim cache block */
- CacheBlock victimBlock;
- victimBlock.cacheObj.objName = victimName;
- BlockDirectory blockDir(io);
- blockDir.init(cct, dpp);
-
- int ret = blockDir.get(&victimBlock, y);
+ return victim;
+}
- if (ret < 0)
- return {};
- #endif
- CacheBlock victimBlock;
- return victimBlock;
+void LFUDAPolicy::shutdown() {
+ dir->shutdown();
+
+ // call cancel() on the connection's executor
+ boost::asio::dispatch(conn->get_executor(), [c = conn] { c->cancel(); });
}
int LFUDAPolicy::exist_key(std::string key, optional_yield y) {
- response<int> resp;
-
- try {
- boost::system::error_code ec;
- request req;
- req.push("EXISTS", key);
-
- redis_exec(conn, ec, req, resp, y);
-
- if (ec)
- return false;
- } catch(std::exception &e) {}
+ if (entries_map.count(key) != 0) {
+ return true;
+ }
- return std::get<0>(resp).value();
+ return false;
}
int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) {
- std::string key = "rgw-object:" + block->cacheObj.objName + ":directory";
- std::string localWeightStr = cacheNode->get_attr(dpp, block->cacheObj.objName, "localWeight", y); // change to block name eventually -Sam
- int localWeight = -1;
response<std::string> resp;
-
- if (localWeightStr.empty()) { // figure out where to set local weight -Sam
- int ret = cacheNode->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(get_age(y)), y);
- localWeight = get_age(y);
-
- if (ret < 0)
- return -1;
- } else {
- localWeight = std::stoi(localWeightStr);
- }
-
int age = get_age(y);
- if (exist_key(key, y)) { /* Local copy */
- localWeight += age;
+ if (exist_key(block->cacheObj.objName, y)) { /* Local copy */
+ auto it = entries_map.find(block->cacheObj.objName); // change to block name eventually -Sam
+ it->second->localWeight += age;
+ return cacheNode->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(it->second->localWeight), y);
} else {
uint64_t freeSpace = cacheNode->get_free_space(dpp);
- while (freeSpace < block->size) /* Not enough space in local cache */
- freeSpace += eviction(dpp, cacheNode, y);
-
- if (exist_key(key, y)) { /* Remote copy */
- try {
- boost::system::error_code ec;
- request req;
- req.push("HGET", key, "blockHosts");
-
- redis_exec(conn, ec, req, resp, y);
-
- if (ec)
- return -1;
- } catch(std::exception &e) {
- return -1;
- }
- } else {
- return -2;
+ while (freeSpace < block->size) { /* Not enough space in local cache */
+ if (int ret = eviction(dpp, cacheNode, y) > 0)
+ freeSpace += ret;
+ else
+ return -1;
}
- // should not hold local cache IP if in this else statement -Sam
- if (std::get<0>(resp).value().length() > 0) { /* Remote copy */
- int globalWeight = get_global_weight(key, y);
- globalWeight += age;
-
- if (set_global_weight(key, globalWeight, y))
+ std::string key; // = dir->build_index(block);
+ int exists = dir->exist_key(key, y);
+ if (exists > 0) { /* Remote copy */
+ if (dir->get(block, y) < 0) {
return -1;
- } else { /* No remote copy */
+ } else {
+ if (!block->hostsList.empty()) {
+ block->globalWeight += age;
+
+ if (dir->update_field(block, "globalWeight", std::to_string(block->globalWeight), y) < 0) {
+ return -1;
+ } else {
+ return 0;
+ }
+ } else {
+ return -1;
+ }
+ }
+ } else if (!exists) { /* No remote copy */
+ // how to get bufferlist data? -Sam
// do I need to add the block to the local cache here? -Sam
- // update hosts list for block as well? check read workflow -Sam
- localWeight += age;
- return cacheNode->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(localWeight), y);
+ // update hosts list for block as well?
+ // insert entry here? -Sam
+ // localWeight += age;
+ //return cacheNode->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(it->second->localWeight), y);
+ return 0;
+ } else {
+ return -1;
}
- }
-
- return 0;
+ }
}
uint64_t LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) {
if (victim.cacheObj.objName.empty()) {
ldpp_dout(dpp, 10) << "RGW D4N Policy: Could not find victim block" << dendl;
- return -1;
+ return 0; /* Return zero for failure */
}
- std::string key = "rgw-object:" + victim.cacheObj.objName + ":directory";
- int globalWeight = get_global_weight(key, y);
- int localWeight = std::stoi(cacheNode->get_attr(dpp, victim.cacheObj.objName, "localWeight", y)); // change to block name eventually -Sam
- int avgWeight = get_min_avg_weight(y);
- response<std::string> resp;
-
- if (exist_key(key, y)) {
- try {
- boost::system::error_code ec;
- request req;
- req.push("HGET", key, "blockHosts");
-
- redis_exec(conn, ec, req, resp, y);
-
- if (ec)
- return -1;
- } catch(std::exception &e) {
- return -1;
- }
- } else {
- return -2;
+ auto it = entries_map.find(victim.cacheObj.objName); // change to block name eventually -Sam
+ if (it == entries_map.end()) {
+ return 0;
}
- if (std::get<0>(resp).value().empty()) { /* Last copy */
- if (globalWeight > 0) {
- localWeight += globalWeight;
- int ret = cacheNode->set_attr(dpp, victim.cacheObj.objName, "localWeight", std::to_string(localWeight), y);
+ int avgWeight = get_min_avg_weight(y);
+ if (avgWeight < 0) {
+ return 0;
+ }
- if (!ret)
- ret = set_global_weight(key, 0, y);
- else
- return -1;
+ if (victim.hostsList.size() == 1 && victim.hostsList[0] == "127.0.0.1:6379" /* local cache address */) { /* Last copy */
+ if (victim.globalWeight) {
+ it->second->localWeight += victim.globalWeight;
+ if (cacheNode->set_attr(dpp, victim.cacheObj.objName, "localWeight", std::to_string(it->second->localWeight), y) < 0) {
+ return 0;
+ }
- if (ret)
- return -1;
+ victim.globalWeight = 0;
+ if (dir->update_field(&victim, "globalWeight", std::to_string(victim.globalWeight), y) < 0) {
+ return 0;
+ }
}
- if (avgWeight < 0)
- return -1;
-
- if (localWeight > avgWeight) {
- // push block to remote cache
+ if (it->second->localWeight > avgWeight) {
+ // TODO: push victim block to remote cache
}
}
- globalWeight += localWeight;
-
- if (set_global_weight(key, globalWeight, y))
- return -2;
+ victim.globalWeight += it->second->localWeight;
+ if (dir->update_field(&victim, "globalWeight", std::to_string(victim.globalWeight), y) < 0) { // just have one update? -Sam
+ return 0;
+ }
ldpp_dout(dpp, 10) << "RGW D4N Policy: Block " << victim.cacheObj.objName << " has been evicted." << dendl;
- int ret = cacheNode->del(dpp, victim.cacheObj.objName, y);
- if (!ret) {
- //ret = set_min_avg_weight(avgWeight - (localWeight/entries_map.size()), ""/*local cache location*/, y); // Where else must this be set? -Sam
+ if (cacheNode->del(dpp, victim.cacheObj.objName, y) < 0) {//} && dir->remove_host(&victim, ""/* local cache address */, y) < 0) {
+ return 0;
+ } else {
+ uint64_t num_entries = entries_map.size();
- if (!ret) {
- int age = get_age(y);
- age = std::max(localWeight, age);
- ret = set_age(age, y);
-
- if (ret)
- return -1;
+ if (!avgWeight) {
+ if (set_min_avg_weight(0, ""/*local cache location*/, y) < 0) // Where else must this be set? -Sam
+ return 0;
} else {
- return -1;
+ if (set_min_avg_weight(avgWeight - (it->second->localWeight/num_entries), ""/*local cache location*/, y) < 0) { // Where else must this be set? -Sam
+ return 0;
+ }
+ int age = get_age(y);
+ age = std::max(it->second->localWeight, age);
+ if (set_age(age, y) < 0)
+ return 0;
}
- } else {
- return -1;
}
- return victim.size;
+ return victim.size; // this doesn't account for the additional attributes that were removed and need to be set with the new block -Sam
}
void LFUDAPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y)
{
+ erase(dpp, key);
+ int age = get_age(y);
+ assert(age > -1);
+
+ LFUDAEntry *e = new LFUDAEntry(key, offset, len, version, age);
+ entries_lfuda_list.push_back(*e);
+ entries_map.emplace(key, e);
}
bool LFUDAPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key)
{
- return false;
-}
+ auto p = entries_map.find(key);
+ if (p == entries_map.end()) {
+ return false;
+ }
-void LFUDAPolicy::shutdown()
-{
- // call cancel() on the connection's executor
- boost::asio::dispatch(conn->get_executor(), [c = conn] { c->cancel(); });
+ entries_map.erase(p);
+ entries_lfuda_list.erase_and_dispose(entries_lfuda_list.iterator_to(*(p->second)), LFUDA_Entry_delete_disposer());
+ return true;
}
int LRUPolicy::exist_key(std::string key, optional_yield y)
{
erase(dpp, key);
- Entry *e = new Entry(key, offset, len, ""); // update version later -Sam
+ Entry *e = new Entry(key, offset, len, version);
entries_lru_list.push_back(*e);
entries_map.emplace(key, e);
}
cachePolicy = new LRUPolicy();
return 0;
}
+
return -1;
}
uint64_t offset;
uint64_t len;
std::string version;
- Entry(std::string& key, uint64_t offset, uint64_t len, std:: string version) : key(key), offset(offset),
- len(len), version(version) {}
+ Entry(std::string& key, uint64_t offset, uint64_t len, std::string version) : key(key), offset(offset),
+ len(len), version(version) {}
};
//The disposer object function
delete e;
}
};
- typedef boost::intrusive::list<Entry> List;
-
- //cpp_redis::client client;
- //Address addr;
public:
CephContext* cct;
class LFUDAPolicy : public CachePolicy {
private:
+ struct LFUDAEntry : public Entry {
+ int localWeight;
+ LFUDAEntry(std::string& key, uint64_t offset, uint64_t len, std::string version, int localWeight) : Entry(key, offset, len, version),
+ localWeight(localWeight) {}
+ };
+
+ struct LFUDA_Entry_delete_disposer : public Entry_delete_disposer {
+ void operator()(LFUDAEntry *e) {
+ delete e;
+ }
+ };
+ typedef boost::intrusive::list<LFUDAEntry> List;
+
+ std::unordered_map<std::string, LFUDAEntry*> entries_map;
+
net::io_context& io;
std::shared_ptr<connection> conn;
+ List entries_lfuda_list;
+ BlockDirectory* dir;
+
+ int set_age(int age, optional_yield y);
+ int get_age(optional_yield y);
+ int set_min_avg_weight(size_t weight, std::string cacheLocation, optional_yield y);
+ int get_min_avg_weight(optional_yield y);
+ CacheBlock find_victim(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y);
public:
LFUDAPolicy(net::io_context& io_context) : CachePolicy(), io(io_context) {
conn = std::make_shared<connection>(boost::asio::make_strand(io_context));
+ dir = new BlockDirectory{io};
}
~LFUDAPolicy() {
//delete dir;
shutdown();
}
- int set_age(int age, optional_yield y);
- int get_age(optional_yield y);
- int set_global_weight(std::string key, int weight, optional_yield y);
- int get_global_weight(std::string key, optional_yield y);
- int set_min_avg_weight(size_t weight, std::string cacheLocation, optional_yield y);
- int get_min_avg_weight(optional_yield y);
- CacheBlock find_victim(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y);
-
virtual int init(CephContext *cct, const DoutPrefixProvider* dpp) {
this->cct = cct;
return -EDESTADDRREQ;
}
+ dir->init(cct, dpp);
conn->async_run(cfg, {}, net::consign(net::detached, conn));
return 0;
class LRUPolicy : public CachePolicy {
private:
- List entries_lru_list;
+ typedef boost::intrusive::list<Entry> List;
+
std::unordered_map<std::string, Entry*> entries_map;
std::mutex lru_lock;
+ List entries_lru_list;
public:
LRUPolicy() = default;