return std::get<0>(resp).value();
}
-void ObjectDirectory::shutdown()
-{
- // call cancel() on the connection's executor
- boost::asio::dispatch(conn->get_executor(), [c = conn] { c->cancel(); });
-}
-
int ObjectDirectory::set(CacheObj* object, optional_yield y)
{
std::string key = build_index(object);
return std::get<0>(resp).value();
}
-void BlockDirectory::shutdown()
-{
- // call cancel() on the connection's executor
- boost::asio::dispatch(conn->get_executor(), [c = conn] { c->cancel(); });
-}
-
int BlockDirectory::set(CacheBlock* block, optional_yield y)
{
std::string key = build_index(block);
class ObjectDirectory: public Directory {
public:
- ObjectDirectory(net::io_context& io_context) {
- conn = std::make_shared<connection>(boost::asio::make_strand(io_context));
- }
- ~ObjectDirectory() {
- shutdown();
- }
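+ // the connection is created and run by the owning D4NFilterDriver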
+ ObjectDirectory(std::shared_ptr<connection>& conn) : conn(conn) {}
- int init(CephContext* cct, const DoutPrefixProvider* dpp) {
+ void init(CephContext* cct) {
this->cct = cct;
-
- config cfg;
- cfg.addr.host = cct->_conf->rgw_d4n_host;
- cfg.addr.port = std::to_string(cct->_conf->rgw_d4n_port);
- cfg.clientname = "D4N.ObjectDir";
-
- if (!cfg.addr.host.length() || !cfg.addr.port.length()) {
- ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "(): Endpoint was not configured correctly." << dendl;
- return -EDESTADDRREQ;
- }
-
- conn->async_run(cfg, {}, net::consign(net::detached, conn));
-
- return 0;
}
int exist_key(CacheObj* object, optional_yield y);
- void shutdown();
int set(CacheObj* object, optional_yield y);
int get(CacheObj* object, optional_yield y);
class BlockDirectory: public Directory {
public:
- BlockDirectory(net::io_context& io_context) {
- conn = std::make_shared<connection>(boost::asio::make_strand(io_context));
- }
- ~BlockDirectory() {
- shutdown();
- }
+ BlockDirectory(std::shared_ptr<connection>& conn) : conn(conn) {}
- int init(CephContext* cct, const DoutPrefixProvider* dpp) {
+ void init(CephContext* cct) {
this->cct = cct;
-
- config cfg;
- cfg.addr.host = cct->_conf->rgw_d4n_host;
- cfg.addr.port = std::to_string(cct->_conf->rgw_d4n_port);
- cfg.clientname = "D4N.BlockDir";
-
- if (!cfg.addr.host.length() || !cfg.addr.port.length()) {
- ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): Endpoint was not configured correctly." << dendl;
- return -EDESTADDRREQ;
- }
-
- conn->async_run(cfg, {}, net::consign(net::detached, conn));
-
- return 0;
}
int exist_key(CacheBlock* block, optional_yield y);
- void shutdown();
int set(CacheBlock* block, optional_yield y);
int get(CacheBlock* block, optional_yield y);
return victim;
}
-void LFUDAPolicy::shutdown() {
- dir->shutdown();
-
- // call cancel() on the connection's executor
- boost::asio::dispatch(conn->get_executor(), [c = conn] { c->cancel(); });
-}
-
int LFUDAPolicy::exist_key(std::string key) {
const std::lock_guard l(lfuda_lock);
if (entries_map.count(key) != 0) {
CachePolicy() {}
virtual ~CachePolicy() = default;
- virtual int init(CephContext *cct, const DoutPrefixProvider* dpp) { return 0; }
+ virtual void init(CephContext *cct) = 0;
virtual int exist_key(std::string key) = 0;
virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) = 0;
virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) = 0;
std::unordered_map<std::string, LFUDAEntry*> entries_map;
std::mutex lfuda_lock;
- net::io_context& io;
std::shared_ptr<connection> conn;
BlockDirectory* dir;
rgw::cache::CacheDriver* cacheDriver;
CacheBlock* get_victim_block(const DoutPrefixProvider* dpp, optional_yield y);
public:
- LFUDAPolicy(net::io_context& io_context, rgw::cache::CacheDriver* cacheDriver) : CachePolicy(), io(io_context), cacheDriver{cacheDriver} {
- conn = std::make_shared<connection>(boost::asio::make_strand(io_context));
- dir = new BlockDirectory{io};
+ LFUDAPolicy(std::shared_ptr<connection>& conn, rgw::cache::CacheDriver* cacheDriver) : CachePolicy(),
+ conn(conn),
+ cacheDriver(cacheDriver)
+ {
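+ // the policy's BlockDirectory reuses the same shared connection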
+ dir = new BlockDirectory{conn};
}
~LFUDAPolicy() {
- shutdown();
delete dir;
}
- virtual int init(CephContext *cct, const DoutPrefixProvider* dpp) {
- std::string address = cct->_conf->rgw_local_cache_address;
-
- config cfg;
- cfg.addr.host = address.substr(0, address.find(":"));
- cfg.addr.port = address.substr(address.find(":") + 1, address.length());
- cfg.clientname = "D4N.Policy";
-
- if (!cfg.addr.host.length() || !cfg.addr.port.length()) {
- ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Endpoint was not configured correctly." << dendl;
- return -EDESTADDRREQ;
- }
-
- dir->init(cct, dpp);
- conn->async_run(cfg, {}, net::consign(net::detached, conn));
-
- return 0;
+ virtual void init(CephContext *cct) override {
+ dir->init(cct);
}
virtual int exist_key(std::string key) override;
//virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
return nullptr;
return it->second;
}
- void shutdown();
};
class LRUPolicy : public CachePolicy {
public:
LRUPolicy(rgw::cache::CacheDriver* cacheDriver) : cacheDriver{cacheDriver} {}
+ virtual void init(CephContext *cct) override {}
virtual int exist_key(std::string key) override;
virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override;
virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) override;
CachePolicy* cachePolicy;
public:
- PolicyDriver(net::io_context& io_context, rgw::cache::CacheDriver* cacheDriver, std::string _policyName) : policyName(_policyName)
+ PolicyDriver(std::shared_ptr<connection>& conn, rgw::cache::CacheDriver* cacheDriver, std::string _policyName) : policyName(_policyName)
{
if (policyName == "lfuda") {
- cachePolicy = new LFUDAPolicy(io_context, cacheDriver);
+ cachePolicy = new LFUDAPolicy(conn, cacheDriver);
} else if (policyName == "lru") {
cachePolicy = new LRUPolicy(cacheDriver);
}
D4NFilterDriver::D4NFilterDriver(Driver* _next, boost::asio::io_context& io_context) : FilterDriver(_next)
{
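+ // one Redis connection, shared by the directories and the cache policy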
+ conn = std::make_shared<connection>(boost::asio::make_strand(io_context));
+
rgw::cache::Partition partition_info;
partition_info.location = g_conf()->rgw_d4n_l1_datacache_persistent_path;
partition_info.name = "d4n";
partition_info.type = "read-cache";
partition_info.size = g_conf()->rgw_d4n_l1_datacache_size;
- //cacheDriver = new rgw::cache::RedisDriver(io_context, partition_info); // change later -Sam
cacheDriver = new rgw::cache::SSDDriver(partition_info);
- objDir = new rgw::d4n::ObjectDirectory(io_context);
- blockDir = new rgw::d4n::BlockDirectory(io_context);
- policyDriver = new rgw::d4n::PolicyDriver(io_context, cacheDriver, "lfuda");
+ objDir = new rgw::d4n::ObjectDirectory(conn);
+ blockDir = new rgw::d4n::BlockDirectory(conn);
+ policyDriver = new rgw::d4n::PolicyDriver(conn, cacheDriver, "lfuda");
}
D4NFilterDriver::~D4NFilterDriver()
int D4NFilterDriver::initialize(CephContext *cct, const DoutPrefixProvider *dpp)
{
- FilterDriver::initialize(cct, dpp);
+ namespace net = boost::asio;
+ using boost::redis::config;
+
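+ // rgw_local_cache_address is expected to be in "host:port" form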
+ std::string address = cct->_conf->rgw_local_cache_address;
+ config cfg;
+ cfg.addr.host = address.substr(0, address.find(":"));
+ cfg.addr.port = address.substr(address.find(":") + 1);
+ cfg.clientname = "D4N.Filter";
+
+ if (!cfg.addr.host.length() || !cfg.addr.port.length()) {
+ ldpp_dout(dpp, 10) << "D4NFilterDriver::" << __func__ << "(): Endpoint was not configured correctly." << dendl;
+ return -EDESTADDRREQ;
+ }
- cacheDriver->initialize(dpp);
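+ // start the connection; consign() keeps the shared_ptr alive until async_run completes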
+ conn->async_run(cfg, {}, net::consign(net::detached, conn));
- objDir->init(cct, dpp);
- blockDir->init(cct, dpp);
+ FilterDriver::initialize(cct, dpp);
- policyDriver->get_cache_policy()->init(cct, dpp);
+ cacheDriver->initialize(dpp);
+ objDir->init(cct);
+ blockDir->init(cct);
+ policyDriver->get_cache_policy()->init(cct);
return 0;
}
namespace rgw { namespace sal {
+using boost::redis::connection;
+
class D4NFilterDriver : public FilterDriver {
private:
+ std::shared_ptr<connection> conn;
rgw::cache::CacheDriver* cacheDriver;
rgw::d4n::ObjectDirectory* objDir;
rgw::d4n::BlockDirectory* blockDir;
class ObjectDirectoryFixture: public ::testing::Test {
protected:
virtual void SetUp() {
- dir = new rgw::d4n::ObjectDirectory{io};
+ conn = std::make_shared<connection>(boost::asio::make_strand(io));
+ dir = new rgw::d4n::ObjectDirectory{conn};
obj = new rgw::d4n::CacheObj{
.objName = "testName",
.bucketName = "testBucket",
.hostsList = { env->redisHost }
};
- conn = new connection{boost::asio::make_strand(io)};
-
ASSERT_NE(obj, nullptr);
ASSERT_NE(dir, nullptr);
ASSERT_NE(conn, nullptr);
- dir->init(env->cct, env->dpp);
+ dir->init(env->cct);
/* Run fixture's connection */
config cfg;
}
virtual void TearDown() {
- delete conn;
delete obj;
delete dir;
}
rgw::d4n::ObjectDirectory* dir;
net::io_context io;
- connection* conn;
+ std::shared_ptr<connection> conn;
std::vector<std::string> vals{"testName", "testBucket", "", "0", env->redisHost};
std::vector<std::string> fields{"objName", "bucketName", "creationTime", "dirty", "objHosts"};
class BlockDirectoryFixture: public ::testing::Test {
protected:
virtual void SetUp() {
- dir = new rgw::d4n::BlockDirectory{io};
+ conn = std::make_shared<connection>(boost::asio::make_strand(io));
+ dir = new rgw::d4n::BlockDirectory{conn};
block = new rgw::d4n::CacheBlock{
.cacheObj = {
.objName = "testName",
.hostsList = { env->redisHost }
};
- conn = new connection{boost::asio::make_strand(io)};
-
ASSERT_NE(block, nullptr);
ASSERT_NE(dir, nullptr);
ASSERT_NE(conn, nullptr);
- dir->init(env->cct, env->dpp);
+ dir->init(env->cct);
/* Run fixture's connection */
config cfg;
}
virtual void TearDown() {
- delete conn;
delete block;
delete dir;
}
rgw::d4n::BlockDirectory* dir;
net::io_context io;
- connection* conn;
+ std::shared_ptr<connection> conn;
std::vector<std::string> vals{"0", "", "0", "0", env->redisHost,
"testName", "testBucket", "", "0", env->redisHost};
{
spawn::spawn(io, [this] (spawn::yield_context yield) {
ASSERT_EQ(0, dir->set(obj, optional_yield{io, yield}));
- dir->shutdown();
boost::system::error_code ec;
request req;
ASSERT_EQ(0, dir->get(obj, optional_yield{io, yield}));
EXPECT_EQ(obj->objName, "newoid");
- dir->shutdown();
{
boost::system::error_code ec;
spawn::spawn(io, [this] (spawn::yield_context yield) {
ASSERT_EQ(0, dir->set(obj, optional_yield{io, yield}));
ASSERT_EQ(0, dir->copy(obj, "copyTestName", "copyBucketName", optional_yield{io, yield}));
- dir->shutdown();
boost::system::error_code ec;
request req;
}
ASSERT_EQ(0, dir->del(obj, optional_yield{io, yield}));
- dir->shutdown();
{
boost::system::error_code ec;
ASSERT_EQ(0, dir->set(obj, optional_yield{io, yield}));
ASSERT_EQ(0, dir->update_field(obj, "objName", "newTestName", optional_yield{io, yield}));
ASSERT_EQ(0, dir->update_field(obj, "objHosts", "127.0.0.1:5000", optional_yield{io, yield}));
- dir->shutdown();
boost::system::error_code ec;
request req;
{
spawn::spawn(io, [this] (spawn::yield_context yield) {
ASSERT_EQ(0, dir->set(block, optional_yield{io, yield}));
- dir->shutdown();
boost::system::error_code ec;
request req;
ASSERT_EQ(0, dir->get(block, optional_yield{io, yield}));
EXPECT_EQ(block->cacheObj.objName, "newoid");
- dir->shutdown();
{
boost::system::error_code ec;
spawn::spawn(io, [this] (spawn::yield_context yield) {
ASSERT_EQ(0, dir->set(block, optional_yield{io, yield}));
ASSERT_EQ(0, dir->copy(block, "copyTestName", "copyBucketName", optional_yield{io, yield}));
- dir->shutdown();
boost::system::error_code ec;
request req;
}
ASSERT_EQ(0, dir->del(block, optional_yield{io, yield}));
- dir->shutdown();
{
boost::system::error_code ec;
ASSERT_EQ(0, dir->set(block, optional_yield{io, yield}));
ASSERT_EQ(0, dir->update_field(block, "objName", "newTestName", optional_yield{io, yield}));
ASSERT_EQ(0, dir->update_field(block, "blockHosts", "127.0.0.1:5000", optional_yield{io, yield}));
- dir->shutdown();
boost::system::error_code ec;
request req;
}
ASSERT_EQ(0, dir->remove_host(block, "127.0.0.1:6000", optional_yield{io, yield}));
- dir->shutdown();
{
boost::system::error_code ec;
.hostsList = { env->redisHost }
};
+ conn = std::make_shared<connection>(boost::asio::make_strand(io));
rgw::cache::Partition partition_info{ .location = "RedisCache", .size = 1000 };
cacheDriver = new rgw::cache::RedisDriver{io, partition_info};
- policyDriver = new rgw::d4n::PolicyDriver(io, cacheDriver, "lfuda");
- dir = new rgw::d4n::BlockDirectory{io};
- conn = new connection{boost::asio::make_strand(io)};
+ policyDriver = new rgw::d4n::PolicyDriver(conn, cacheDriver, "lfuda");
+ dir = new rgw::d4n::BlockDirectory{conn};
ASSERT_NE(dir, nullptr);
ASSERT_NE(cacheDriver, nullptr);
ASSERT_NE(policyDriver, nullptr);
ASSERT_NE(conn, nullptr);
- dir->init(env->cct, env->dpp);
+ dir->init(env->cct);
cacheDriver->initialize(env->dpp);
- policyDriver->get_cache_policy()->init(env->cct, env->dpp);
+ policyDriver->get_cache_policy()->init(env->cct);
bl.append("test data");
bufferlist attrVal;
}
virtual void TearDown() {
- delete conn;
delete block;
delete dir;
delete cacheDriver;
rgw::cache::RedisDriver* cacheDriver;
net::io_context io;
- connection* conn;
+ std::shared_ptr<connection> conn;
bufferlist bl;
rgw::sal::Attrs attrs;
ASSERT_GE(lfuda(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0);
- dir->shutdown();
cacheDriver->shutdown();
- dynamic_cast<rgw::d4n::LFUDAPolicy*>(policyDriver->get_cache_policy())->shutdown();
boost::system::error_code ec;
request req;
ASSERT_GE(lfuda(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0);
- dir->shutdown();
cacheDriver->shutdown();
- dynamic_cast<rgw::d4n::LFUDAPolicy*>(policyDriver->get_cache_policy())->shutdown();
std::string key = block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + std::to_string(block->blockID) + "_" + std::to_string(block->size);
boost::system::error_code ec;
spawn::spawn(io, [this] (spawn::yield_context yield) {
ASSERT_GE(lfuda(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0);
- dir->shutdown();
cacheDriver->shutdown();
- dynamic_cast<rgw::d4n::LFUDAPolicy*>(policyDriver->get_cache_policy())->shutdown();
boost::system::error_code ec;
request req;