#include "d4n_policy.h"
-#include <boost/lexical_cast.hpp>
#include "../../../common/async/yield_context.h"
#include "common/async/blocked_completion.h"
#include "common/dout.h"
template <typename Handler, typename Response>
void operator()(Handler handler, const boost::redis::request& req, Response& resp)
{
- auto h = boost::asio::consign(std::move(handler), conn);
- return boost::asio::dispatch(get_executor(), [c=conn, &req, &resp, h=std::move(h)] {
+ auto h = asio::consign(std::move(handler), conn);
+ return asio::dispatch(get_executor(), [c=conn, &req, &resp, h=std::move(h)] {
c->async_exec(req, resp, std::move(h));
});
}
const boost::redis::request& req,
Response& resp, CompletionToken&& token)
{
- return boost::asio::async_initiate<CompletionToken,
+ return asio::async_initiate<CompletionToken,
void(boost::system::error_code, std::size_t)>(
initiate_exec{std::move(conn)}, token, req, resp);
}
-template <typename T>
-void redis_exec(std::shared_ptr<connection> conn, boost::system::error_code& ec, boost::redis::request& req, boost::redis::response<T>& resp, optional_yield y)
+template <typename... Types>
+void redis_exec(std::shared_ptr<connection> conn,
+ boost::system::error_code& ec,
+ const boost::redis::request& req,
+ boost::redis::response<Types...>& resp, optional_yield y)
{
if (y) {
auto yield = y.get_yield_context();
}
}
-int LFUDAPolicy::set_age(int age, optional_yield y) {
+int LFUDAPolicy::init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context) {
+ dir->init(cct);
+ int result = 0;
+ response<int, int, int, int> resp;
+
try {
boost::system::error_code ec;
- response<int> resp;
request req;
- req.push("HSET", "lfuda", "age", std::to_string(age));
-
+ req.push("HEXISTS", "lfuda", "age");
+ req.push("HSET", "lfuda", "minLocalWeights_sum", std::to_string(weightSum)); /* New cache node will always have the minimum average weight */
+ req.push("HSET", "lfuda", "minLocalWeights_size", std::to_string(entries_map.size()));
+ req.push("HSET", "lfuda", "minLocalWeights_address", dir->cct->_conf->rgw_local_cache_address);
+
redis_exec(conn, ec, req, resp, y);
if (ec) {
+ ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: " << ec.what() << dendl;
return -ec.value();
}
- return std::get<0>(resp).value(); /* Returns number of fields set */
+ result = std::min(std::get<1>(resp).value(), std::min(std::get<2>(resp).value(), std::get<3>(resp).value()));
} catch (std::exception &e) {
+ ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: " << e.what() << dendl;
return -EINVAL;
}
-}
-
-int LFUDAPolicy::get_age(optional_yield y) {
- response<int> resp;
- try {
- boost::system::error_code ec;
- request req;
- req.push("HEXISTS", "lfuda", "age");
-
- redis_exec(conn, ec, req, resp, y);
+ if (!std::get<0>(resp).value()) { /* Only set maximum age if it doesn't exist */
+ try {
+ boost::system::error_code ec;
+ response<int> value;
+ request req;
+ req.push("HSET", "lfuda", "age", age);
+
+ redis_exec(conn, ec, req, value, y);
+
+ if (ec) {
+ ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
+ }
- if (ec) {
- return -ec.value();
+ result = std::min(result, std::get<0>(value).value());
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: " << e.what() << dendl;
+ return -EINVAL;
}
- } catch (std::exception &e) {
- return -EINVAL;
}
- if (!std::get<0>(resp).value()) {
- if (set_age(1, y)) /* Initialize age */
- return 1;
- else
- return -ENOENT;
- }
+ asio::co_spawn(io_context.get_executor(),
+ redis_sync(dpp, y), asio::detached);
+
+ return result;
+}
+
+int LFUDAPolicy::age_sync(const DoutPrefixProvider* dpp, optional_yield y) {
+ response<std::string> resp;
try {
boost::system::error_code ec;
- response<std::string> value;
request req;
req.push("HGET", "lfuda", "age");
- redis_exec(conn, ec, req, value, y);
+ redis_exec(conn, ec, req, resp, y);
if (ec) {
return -ec.value();
}
-
- return std::stoi(std::get<0>(value).value());
} catch (std::exception &e) {
return -EINVAL;
}
-}
-
-int LFUDAPolicy::set_local_weight_sum(int weight, optional_yield y) {
- weight = weight > 0 ? weight : 0;
- try {
- boost::system::error_code ec;
- response<int> resp;
- request req;
- req.push("HSET", dir->cct->_conf->rgw_local_cache_address, "localWeights", std::to_string(weight));
+ if (age > std::stoi(std::get<0>(resp).value()) || std::get<0>(resp).value().empty()) { /* Set new maximum age */
+ try {
+ boost::system::error_code ec;
+ request req;
+ response<int> value;
+ req.push("HSET", "lfuda", "age", age);
+ redis_exec(conn, ec, req, resp, y);
- redis_exec(conn, ec, req, resp, y);
+ if (ec) {
+ return -ec.value();
+ }
- if (ec) {
- return -ec.value();
+ return std::get<0>(value).value();
+ } catch (std::exception &e) {
+ return -EINVAL;
}
-
- return std::get<0>(resp).value(); /* Returns number of fields set */
- } catch (std::exception &e) {
- return -EINVAL;
+ } else {
+ age = std::stoi(std::get<0>(resp).value());
+ return 0;
}
}
-int LFUDAPolicy::get_local_weight_sum(optional_yield y) {
- response<int> resp;
+int LFUDAPolicy::local_weight_sync(const DoutPrefixProvider* dpp, optional_yield y) {
+ int result;
- try {
- boost::system::error_code ec;
- request req;
- req.push("HEXISTS", dir->cct->_conf->rgw_local_cache_address, "localWeights");
+ if (fabs(weightSum - postedSum) > (postedSum * 0.1)) {
+ response<std::string, std::string> resp;
- redis_exec(conn, ec, req, resp, y);
+ try {
+ boost::system::error_code ec;
+ request req;
+ req.push("HGET", "lfuda", "minLocalWeights_sum");
+ req.push("HGET", "lfuda", "minLocalWeights_size");
+
+ redis_exec(conn, ec, req, resp, y);
- if (ec) {
- return -ec.value();
+ if (ec) {
+ return -ec.value();
+ }
+ } catch (std::exception &e) {
+ return -EINVAL;
}
- } catch (std::exception &e) {
- return -EINVAL;
- }
-
- if (!std::get<0>(resp).value()) {
- int sum = 0;
- for (auto& entry : entries_map)
- sum += entry.second->localWeight;
-
- if (int ret = set_local_weight_sum(sum, y) < 0) { /* Initialize */
- return ret;
+
+ float minAvgWeight = std::stof(std::get<0>(resp).value()) / std::stof(std::get<1>(resp).value());
+
+ if ((static_cast<float>(weightSum) / static_cast<float>(entries_map.size())) < minAvgWeight) { /* Set new minimum weight */
+ try {
+ boost::system::error_code ec;
+ request req;
+ response<int, int, int> value;
+ req.push("HSET", "lfuda", "minLocalWeights_sum", std::to_string(weightSum));
+ req.push("HSET", "lfuda", "minLocalWeights_size", std::to_string(entries_map.size()));
+ req.push("HSET", "lfuda", "minLocalWeights_address", dir->cct->_conf->rgw_local_cache_address);
+ redis_exec(conn, ec, req, resp, y);
+
+ if (ec) {
+ return -ec.value();
+ }
+
+ result = std::min(std::get<0>(value).value(), std::get<1>(value).value());
+ result = std::min(result, std::get<2>(value).value());
+ } catch (std::exception &e) {
+ return -EINVAL;
+ }
} else {
- return sum;
+ weightSum = std::stoi(std::get<0>(resp).value());
+ postedSum = std::stoi(std::get<0>(resp).value());
}
}
- try {
+ try { /* Post update for local cache */
boost::system::error_code ec;
- response<std::string> value;
request req;
- req.push("HGET", dir->cct->_conf->rgw_local_cache_address, "localWeights");
-
- redis_exec(conn, ec, req, value, y);
+ response<int, int> resp;
+ req.push("HSET", dpp->get_cct()->_conf->rgw_local_cache_address, "avgLocalWeight_sum", std::to_string(weightSum));
+ req.push("HSET", dpp->get_cct()->_conf->rgw_local_cache_address, "avgLocalWeight_size", std::to_string(entries_map.size()));
+ redis_exec(conn, ec, req, resp, y);
if (ec) {
return -ec.value();
}
- return std::stoi(std::get<0>(value).value());
+ result = std::min(std::get<0>(resp).value(), std::get<1>(resp).value());
} catch (std::exception &e) {
return -EINVAL;
}
+
+ return result;
+}
+
+asio::awaitable<void> LFUDAPolicy::redis_sync(const DoutPrefixProvider* dpp, optional_yield y) {
+ rthread_timer.emplace(co_await asio::this_coro::executor);
+ co_await asio::this_coro::throw_if_cancelled(true);
+ co_await asio::this_coro::reset_cancellation_state(
+ asio::enable_terminal_cancellation());
+
+ for (;;) try {
+ /* Update age */
+ if (int ret = age_sync(dpp, y) < 0) {
+ ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: ret=" << ret << dendl;
+ }
+
+ /* Update minimum local weight sum */
+ if (int ret = local_weight_sync(dpp, y) < 0) {
+ ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: ret=" << ret << dendl;
+ }
+
+ int interval = dpp->get_cct()->_conf->rgw_lfuda_sync_frequency;
+ rthread_timer->expires_after(std::chrono::seconds(interval));
+ co_await rthread_timer->async_wait(asio::use_awaitable);
+ } catch (sys::system_error& e) {
+ ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: " << e.what() << dendl;
+
+ if (e.code() == asio::error::operation_aborted) {
+ break;
+ } else {
+ continue;
+ }
+ }
}
CacheBlock* LFUDAPolicy::get_victim_block(const DoutPrefixProvider* dpp, optional_yield y) {
return false;
}
-#if 0
-int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheDriver, optional_yield y) {
- response<std::string> resp;
- int age = get_age(y);
-
- if (exist_key(build_index(block->cacheObj.bucketName, block->cacheObj.objName, block->blockID, block->size))) { /* Local copy */
- auto it = entries_map.find(build_index(block->cacheObj.bucketName, block->cacheObj.objName, block->blockID, block->size));
- it->second->localWeight += age;
- return cacheDriver->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(it->second->localWeight), y);
- } else {
- if (eviction(dpp, block->size, cacheDriver, y) < 0)
- return -1;
-
- int exists = dir->exist_key(block, y);
- if (exists > 0) { /* Remote copy */
- if (dir->get(block, y) < 0) {
- return -1;
- } else {
- if (!block->hostsList.empty()) {
- block->globalWeight += age;
-
- if (dir->update_field(block, "globalWeight", std::to_string(block->globalWeight), y) < 0) {
- return -1;
- } else {
- return 0;
- }
- } else {
- return -1;
- }
- }
- } else if (!exists) { /* No remote copy */
- // localWeight += age;
- //return cacheDriver->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(it->second->localWeight), y);
- return 0;
- } else {
- return -1;
- }
- }
-}
-#endif
-
int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) {
uint64_t freeSpace = cacheDriver->get_free_space(dpp);
return -ENOENT;
}
- int avgWeight = get_local_weight_sum(y);
- if (avgWeight < 0) {
- delete victim;
- return avgWeight;
- }
-
- avgWeight /= entries_map.size();
+ int avgWeight = weightSum / entries_map.size();
if (victim->hostsList.size() == 1 && victim->hostsList[0] == dir->cct->_conf->rgw_local_cache_address) { /* Last copy */
if (victim->globalWeight) {
ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Block " << key << " has been evicted." << dendl;
- int weight = (avgWeight * entries_map.size()) - it->second->localWeight;
- if (int ret = set_local_weight_sum(weight, y) < 0)
- return ret;
+ weightSum = (avgWeight * entries_map.size()) - it->second->localWeight;
- int age = get_age(y);
age = std::max(it->second->localWeight, age);
- if (int ret = set_age(age, y) < 0)
- return ret;
erase(dpp, key, y);
freeSpace = cacheDriver->get_free_space(dpp);
{
using handle_type = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>::handle_type;
const std::lock_guard l(lfuda_lock);
-
- int age = get_age(y);
- if (age < 0) {
- ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): LFUDAPolicy get_age method failed." << dendl;
- return;
- }
int localWeight = age;
auto entry = find_entry(key);
if (entry != nullptr) {
- entry->localWeight += age;
- localWeight = entry->localWeight;
+ localWeight = entry->localWeight + age;
}
erase(dpp, key, y);
if (cacheDriver->set_attr(dpp, key, "user.rgw.localWeight", std::to_string(localWeight), y) < 0)
ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): CacheDriver set_attr method failed." << dendl;
- int localWeights = get_local_weight_sum(y);
- if (localWeights < 0) {
- ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Failed to retrieve sum of local weights for the cache backend." << dendl;
- return;
- }
-
- localWeights += ((localWeight < 0) ? 0 : localWeight);
- if (set_local_weight_sum(localWeights, y) < 0)
- ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Failed to update sum of local weights for the cache backend." << dendl;
+ weightSum += ((localWeight < 0) ? 0 : localWeight);
}
bool LFUDAPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y)
return false;
}
- int localWeights = get_local_weight_sum(y);
- if (localWeights < 0) {
- ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Failed to retrieve sum of local weights for the cache backend." << dendl;
- return false;
- }
-
- localWeights -= ((p->second->localWeight < 0) ? 0 : p->second->localWeight);
- if (set_local_weight_sum(localWeights, y) < 0) {
- ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Failed to update sum of local weights for the cache backend." << dendl;
- return false;
- }
+ weightSum -= ((p->second->localWeight < 0) ? 0 : p->second->localWeight);
entries_heap.erase(p->second->handle);
entries_map.erase(p);
#pragma once
+#include <boost/asio/awaitable.hpp>
+#include <boost/asio/use_awaitable.hpp>
+#include <boost/asio/co_spawn.hpp>
#include <boost/heap/fibonacci_heap.hpp>
+#include <boost/system/detail/errc.hpp>
+
#include "d4n_directory.h"
#include "rgw_sal_d4n.h"
#include "rgw_cache_driver.h"
namespace rgw { namespace d4n {
+namespace asio = boost::asio;
+namespace sys = boost::system;
+
class CachePolicy {
protected:
struct Entry : public boost::intrusive::list_base_hook<> {
CachePolicy() {}
virtual ~CachePolicy() = default;
- virtual void init(CephContext *cct) = 0;
+ virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context) = 0;
virtual int exist_key(std::string key) = 0;
virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) = 0;
virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) = 0;
using Heap = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>;
Heap entries_heap;
std::unordered_map<std::string, LFUDAEntry*> entries_map;
- std::mutex lfuda_lock;
+ std::mutex lfuda_lock;
+ int age = 1, weightSum = 0, postedSum = 0;
+ optional_yield y = null_yield;
std::shared_ptr<connection> conn;
BlockDirectory* dir;
rgw::cache::CacheDriver* cacheDriver;
+ std::optional<asio::steady_timer> rthread_timer;
- int set_age(int age, optional_yield y);
- int get_age(optional_yield y);
- int set_local_weight_sum(int weight, optional_yield y);
- int get_local_weight_sum(optional_yield y);
CacheBlock* get_victim_block(const DoutPrefixProvider* dpp, optional_yield y);
+ int age_sync(const DoutPrefixProvider* dpp, optional_yield y);
+ int local_weight_sync(const DoutPrefixProvider* dpp, optional_yield y);
+ asio::awaitable<void> redis_sync(const DoutPrefixProvider* dpp, optional_yield y);
+ void rthread_stop() {
+ std::lock_guard l{lfuda_lock};
+
+ if (rthread_timer) {
+ rthread_timer->cancel();
+ }
+ }
+ LFUDAEntry* find_entry(std::string key) {
+ auto it = entries_map.find(key);
+ if (it == entries_map.end())
+ return nullptr;
+ return it->second;
+ }
public:
LFUDAPolicy(std::shared_ptr<connection>& conn, rgw::cache::CacheDriver* cacheDriver) : CachePolicy(),
- conn(conn),
- cacheDriver(cacheDriver)
+ conn(conn),
+ cacheDriver(cacheDriver)
{
dir = new BlockDirectory{conn};
}
~LFUDAPolicy() {
+ rthread_stop();
delete dir;
}
- virtual void init(CephContext *cct) {
- dir->init(cct);
- }
+ virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context);
virtual int exist_key(std::string key) override;
- //virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override;
virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) override;
virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
-
- void set_local_weight(std::string& key, int localWeight);
- LFUDAEntry* find_entry(std::string key) {
- auto it = entries_map.find(key);
- if (it == entries_map.end())
- return nullptr;
- return it->second;
- }
+ void save_y(optional_yield y) { this->y = y; }
};
class LRUPolicy : public CachePolicy {
public:
LRUPolicy(rgw::cache::CacheDriver* cacheDriver) : cacheDriver{cacheDriver} {}
- virtual void init(CephContext *cct) {}
+ virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context) { return 0; }
virtual int exist_key(std::string key) override;
virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override;
virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) override;
#include "gtest/gtest.h"
#include "gtest/gtest_prod.h"
#include "common/ceph_argparse.h"
+#include "common/async/blocked_completion.h"
#include "rgw_auth_registry.h"
#include "driver/d4n/d4n_policy.h"
.hostsList = { env->redisHost }
};
- conn = std::make_shared<connection>(boost::asio::make_strand(io));
+ conn = std::make_shared<connection>(net::make_strand(io));
rgw::cache::Partition partition_info{ .location = "RedisCache", .size = 1000 };
cacheDriver = new rgw::cache::RedisDriver{io, partition_info};
policyDriver = new rgw::d4n::PolicyDriver(conn, cacheDriver, "lfuda");
dir->init(env->cct);
cacheDriver->initialize(env->dpp);
- policyDriver->get_cache_policy()->init(env->cct);
bl.append("test data");
bufferlist attrVal;
}
int lfuda(const DoutPrefixProvider* dpp, rgw::d4n::CacheBlock* block, rgw::cache::CacheDriver* cacheDriver, optional_yield y) {
- int age = 5; /* Arbitrary number for testing */
+ int age = 1;
std::string oid = build_index(block->cacheObj.bucketName, block->cacheObj.objName, block->blockID, block->size);
if (this->policyDriver->get_cache_policy()->exist_key(build_index(block->cacheObj.bucketName, block->cacheObj.objName, block->blockID, block->size))) { /* Local copy */
- auto entry = dynamic_cast<rgw::d4n::LFUDAPolicy*>(this->policyDriver->get_cache_policy())->find_entry(oid);
- entry->localWeight += age;
- return cacheDriver->set_attr(dpp, oid, "localWeight", std::to_string(entry->localWeight), y);
+ policyDriver->get_cache_policy()->update(env->dpp, oid, 0, bl.length(), "", y);
+ return 0;
} else {
if (this->policyDriver->get_cache_policy()->eviction(dpp, block->size, y) < 0)
return -1;
ASSERT_EQ(0, cacheDriver->put(env->dpp, key, bl, bl.length(), attrs, optional_yield{io, yield}));
policyDriver->get_cache_policy()->update(env->dpp, key, 0, bl.length(), "", optional_yield{io, yield});
- ASSERT_GE(lfuda(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0);
+ ASSERT_EQ(lfuda(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0);
cacheDriver->shutdown();
boost::system::error_code ec;
request req;
- req.push("HGET", "RedisCache/testBucket_testName_0_0", "localWeight");
+ req.push("HGET", "RedisCache/testBucket_testName_0_0", "user.rgw.localWeight");
req.push("FLUSHALL");
response<std::string, boost::redis::ignore_t> resp;
conn->async_exec(req, resp, yield[ec]);
ASSERT_EQ((bool)ec, false);
- EXPECT_EQ(std::get<0>(resp).value(), "6");
+ EXPECT_EQ(std::get<0>(resp).value(), "2");
conn->cancel();
});
ASSERT_EQ((bool)ec, false);
EXPECT_EQ(std::get<0>(resp).value(), 0);
EXPECT_EQ(std::get<1>(resp).value(), 0);
- EXPECT_EQ(std::get<2>(resp).value(), "5");
+ EXPECT_EQ(std::get<2>(resp).value(), "1");
conn->cancel();
});