From d8209c1e60acaf8233a74452e68b2f415c7ba6e8 Mon Sep 17 00:00:00 2001 From: Samarah Date: Wed, 10 Jan 2024 18:00:23 +0000 Subject: [PATCH] d4n: Add a central Redis connection to the filter to be shared by the rest of the D4N classes and update D4N unit tests appropriately Signed-off-by: Samarah --- src/rgw/driver/d4n/d4n_directory.cc | 12 -------- src/rgw/driver/d4n/d4n_directory.h | 48 +++-------------------------- src/rgw/driver/d4n/d4n_policy.cc | 7 ----- src/rgw/driver/d4n/d4n_policy.h | 37 +++++++--------------- src/rgw/driver/d4n/rgw_sal_d4n.cc | 33 ++++++++++++++------ src/rgw/driver/d4n/rgw_sal_d4n.h | 3 ++ src/test/rgw/test_d4n_directory.cc | 31 +++++-------------- src/test/rgw/test_d4n_policy.cc | 19 ++++-------- 8 files changed, 56 insertions(+), 134 deletions(-) diff --git a/src/rgw/driver/d4n/d4n_directory.cc b/src/rgw/driver/d4n/d4n_directory.cc index 856c88be356..abffcd16d86 100644 --- a/src/rgw/driver/d4n/d4n_directory.cc +++ b/src/rgw/driver/d4n/d4n_directory.cc @@ -68,12 +68,6 @@ int ObjectDirectory::exist_key(CacheObj* object, optional_yield y) return std::get<0>(resp).value(); } -void ObjectDirectory::shutdown() -{ - // call cancel() on the connection's executor - boost::asio::dispatch(conn->get_executor(), [c = conn] { c->cancel(); }); -} - int ObjectDirectory::set(CacheObj* object, optional_yield y) { std::string key = build_index(object); @@ -332,12 +326,6 @@ int BlockDirectory::exist_key(CacheBlock* block, optional_yield y) return std::get<0>(resp).value(); } -void BlockDirectory::shutdown() -{ - // call cancel() on the connection's executor - boost::asio::dispatch(conn->get_executor(), [c = conn] { c->cancel(); }); -} - int BlockDirectory::set(CacheBlock* block, optional_yield y) { std::string key = build_index(block); diff --git a/src/rgw/driver/d4n/d4n_directory.h b/src/rgw/driver/d4n/d4n_directory.h index d60731f7ed5..cf562dd2730 100644 --- a/src/rgw/driver/d4n/d4n_directory.h +++ b/src/rgw/driver/d4n/d4n_directory.h @@ -40,32 +40,12 @@ class Directory { class ObjectDirectory: public Directory { public: - ObjectDirectory(net::io_context& io_context) { - conn = std::make_shared(boost::asio::make_strand(io_context)); - } - ~ObjectDirectory() { - shutdown(); - } + ObjectDirectory(std::shared_ptr& conn) : conn(conn) {} - int init(CephContext* cct, const DoutPrefixProvider* dpp) { + void init(CephContext* cct) { this->cct = cct; - - config cfg; - cfg.addr.host = cct->_conf->rgw_d4n_host; - cfg.addr.port = std::to_string(cct->_conf->rgw_d4n_port); - cfg.clientname = "D4N.ObjectDir"; - - if (!cfg.addr.host.length() || !cfg.addr.port.length()) { - ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "(): Endpoint was not configured correctly." << dendl; - return -EDESTADDRREQ; - } - - conn->async_run(cfg, {}, net::consign(net::detached, conn)); - - return 0; } int exist_key(CacheObj* object, optional_yield y); - void shutdown(); int set(CacheObj* object, optional_yield y); int get(CacheObj* object, optional_yield y); @@ -81,32 +61,12 @@ class ObjectDirectory: public Directory { class BlockDirectory: public Directory { public: - BlockDirectory(net::io_context& io_context) { - conn = std::make_shared(boost::asio::make_strand(io_context)); - } - ~BlockDirectory() { - shutdown(); - } + BlockDirectory(std::shared_ptr& conn) : conn(conn) {} - int init(CephContext* cct, const DoutPrefixProvider* dpp) { + void init(CephContext* cct) { this->cct = cct; - - config cfg; - cfg.addr.host = cct->_conf->rgw_d4n_host; - cfg.addr.port = std::to_string(cct->_conf->rgw_d4n_port); - cfg.clientname = "D4N.BlockDir"; - - if (!cfg.addr.host.length() || !cfg.addr.port.length()) { - ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): Endpoint was not configured correctly." << dendl; - return -EDESTADDRREQ; - } - - conn->async_run(cfg, {}, net::consign(net::detached, conn)); - - return 0; } int exist_key(CacheBlock* block, optional_yield y); - void shutdown(); int set(CacheBlock* block, optional_yield y); int get(CacheBlock* block, optional_yield y); diff --git a/src/rgw/driver/d4n/d4n_policy.cc b/src/rgw/driver/d4n/d4n_policy.cc index f53aafc445e..8de8bcb87cf 100644 --- a/src/rgw/driver/d4n/d4n_policy.cc +++ b/src/rgw/driver/d4n/d4n_policy.cc @@ -194,13 +194,6 @@ CacheBlock* LFUDAPolicy::get_victim_block(const DoutPrefixProvider* dpp, optiona return victim; } -void LFUDAPolicy::shutdown() { - dir->shutdown(); - - // call cancel() on the connection's executor - boost::asio::dispatch(conn->get_executor(), [c = conn] { c->cancel(); }); -} - int LFUDAPolicy::exist_key(std::string key) { const std::lock_guard l(lfuda_lock); if (entries_map.count(key) != 0) { diff --git a/src/rgw/driver/d4n/d4n_policy.h b/src/rgw/driver/d4n/d4n_policy.h index e7c7781efca..60d55128ab0 100644 --- a/src/rgw/driver/d4n/d4n_policy.h +++ b/src/rgw/driver/d4n/d4n_policy.h @@ -33,7 +33,7 @@ class CachePolicy { CachePolicy() {} virtual ~CachePolicy() = default; - virtual int init(CephContext *cct, const DoutPrefixProvider* dpp) { return 0; } + virtual void init(CephContext *cct) = 0; virtual int exist_key(std::string key) = 0; virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) = 0; virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) = 0; @@ -65,7 +65,6 @@ class LFUDAPolicy : public CachePolicy { std::unordered_map entries_map; std::mutex lfuda_lock; - net::io_context& io; std::shared_ptr conn; BlockDirectory* dir; rgw::cache::CacheDriver* cacheDriver; @@ -77,32 +76,18 @@ class LFUDAPolicy : public CachePolicy { CacheBlock* get_victim_block(const DoutPrefixProvider* dpp, optional_yield y); public: - LFUDAPolicy(net::io_context& io_context, rgw::cache::CacheDriver* cacheDriver) : CachePolicy(), io(io_context), cacheDriver{cacheDriver} { - conn = std::make_shared(boost::asio::make_strand(io_context)); - dir = new BlockDirectory{io}; + LFUDAPolicy(std::shared_ptr& conn, rgw::cache::CacheDriver* cacheDriver) : CachePolicy(), + conn(conn), + cacheDriver(cacheDriver) + { + dir = new BlockDirectory{conn}; } ~LFUDAPolicy() { - shutdown(); delete dir; } - virtual int init(CephContext *cct, const DoutPrefixProvider* dpp) { - std::string address = cct->_conf->rgw_local_cache_address; - - config cfg; - cfg.addr.host = address.substr(0, address.find(":")); - cfg.addr.port = address.substr(address.find(":") + 1, address.length()); - cfg.clientname = "D4N.Policy"; - - if (!cfg.addr.host.length() || !cfg.addr.port.length()) { - ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Endpoint was not configured correctly." << dendl; - return -EDESTADDRREQ; - } - - dir->init(cct, dpp); - conn->async_run(cfg, {}, net::consign(net::detached, conn)); - - return 0; + virtual void init(CephContext *cct) { + dir->init(cct); } virtual int exist_key(std::string key) override; //virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) override; @@ -117,7 +102,6 @@ class LFUDAPolicy : public CachePolicy { return nullptr; return it->second; } - void shutdown(); }; class LRUPolicy : public CachePolicy { @@ -134,6 +118,7 @@ class LRUPolicy : public CachePolicy { public: LRUPolicy(rgw::cache::CacheDriver* cacheDriver) : cacheDriver{cacheDriver} {} + virtual void init(CephContext *cct) {} virtual int exist_key(std::string key) override; virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override; virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) override; @@ -146,10 +131,10 @@ class PolicyDriver { CachePolicy* cachePolicy; public: - PolicyDriver(net::io_context& io_context, rgw::cache::CacheDriver* cacheDriver, std::string _policyName) : policyName(_policyName) + PolicyDriver(std::shared_ptr& conn, rgw::cache::CacheDriver* cacheDriver, std::string _policyName) : policyName(_policyName) { if (policyName == "lfuda") { - cachePolicy = new LFUDAPolicy(io_context, cacheDriver); + cachePolicy = new LFUDAPolicy(conn, cacheDriver); } else if (policyName == "lru") { cachePolicy = new LRUPolicy(cacheDriver); } diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.cc b/src/rgw/driver/d4n/rgw_sal_d4n.cc index 4c2508553c6..12946ab430b 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.cc +++ b/src/rgw/driver/d4n/rgw_sal_d4n.cc @@ -35,17 +35,18 @@ static inline Object* nextObject(Object* t) D4NFilterDriver::D4NFilterDriver(Driver* _next, boost::asio::io_context& io_context) : FilterDriver(_next) { + conn = std::make_shared(boost::asio::make_strand(io_context)); + rgw::cache::Partition partition_info; partition_info.location = g_conf()->rgw_d4n_l1_datacache_persistent_path; partition_info.name = "d4n"; partition_info.type = "read-cache"; partition_info.size = g_conf()->rgw_d4n_l1_datacache_size; - //cacheDriver = new rgw::cache::RedisDriver(io_context, partition_info); // change later -Sam cacheDriver = new rgw::cache::SSDDriver(partition_info); - objDir = new rgw::d4n::ObjectDirectory(io_context); - blockDir = new rgw::d4n::BlockDirectory(io_context); - policyDriver = new rgw::d4n::PolicyDriver(io_context, cacheDriver, "lfuda"); + objDir = new rgw::d4n::ObjectDirectory(conn); + blockDir = new rgw::d4n::BlockDirectory(conn); + policyDriver = new rgw::d4n::PolicyDriver(conn, cacheDriver, "lfuda"); } D4NFilterDriver::~D4NFilterDriver() @@ -58,14 +59,28 @@ D4NFilterDriver::D4NFilterDriver(Driver* _next, boost::asio::io_context& io_cont int D4NFilterDriver::initialize(CephContext *cct, const DoutPrefixProvider *dpp) { - FilterDriver::initialize(cct, dpp); + namespace net = boost::asio; + using boost::redis::config; + + std::string address = cct->_conf->rgw_local_cache_address; + config cfg; + cfg.addr.host = address.substr(0, address.find(":")); + cfg.addr.port = address.substr(address.find(":") + 1, address.length()); + cfg.clientname = "D4N.Filter"; + + if (!cfg.addr.host.length() || !cfg.addr.port.length()) { + ldpp_dout(dpp, 10) << "D4NFilterDriver::" << __func__ << "(): Endpoint was not configured correctly." << dendl; + return -EDESTADDRREQ; + } - cacheDriver->initialize(dpp); + conn->async_run(cfg, {}, net::consign(net::detached, conn)); - objDir->init(cct, dpp); - blockDir->init(cct, dpp); + FilterDriver::initialize(cct, dpp); - policyDriver->get_cache_policy()->init(cct, dpp); + cacheDriver->initialize(dpp); + objDir->init(cct); + blockDir->init(cct); + policyDriver->get_cache_policy()->init(cct); return 0; } diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.h b/src/rgw/driver/d4n/rgw_sal_d4n.h index 5f785fb65cb..e43c0105265 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.h +++ b/src/rgw/driver/d4n/rgw_sal_d4n.h @@ -38,8 +38,11 @@ namespace rgw::d4n { namespace rgw { namespace sal { +using boost::redis::connection; + class D4NFilterDriver : public FilterDriver { private: + std::shared_ptr conn; rgw::cache::CacheDriver* cacheDriver; rgw::d4n::ObjectDirectory* objDir; rgw::d4n::BlockDirectory* blockDir; diff --git a/src/test/rgw/test_d4n_directory.cc b/src/test/rgw/test_d4n_directory.cc index 7328212dcff..59aceabd2a0 100644 --- a/src/test/rgw/test_d4n_directory.cc +++ b/src/test/rgw/test_d4n_directory.cc @@ -49,7 +49,8 @@ class Environment : public ::testing::Environment { class ObjectDirectoryFixture: public ::testing::Test { protected: virtual void SetUp() { - dir = new rgw::d4n::ObjectDirectory{io}; + conn = std::make_shared(boost::asio::make_strand(io)); + dir = new rgw::d4n::ObjectDirectory{conn}; obj = new rgw::d4n::CacheObj{ .objName = "testName", .bucketName = "testBucket", @@ -58,13 +59,11 @@ class ObjectDirectoryFixture: public ::testing::Test { .hostsList = { env->redisHost } }; - conn = new connection{boost::asio::make_strand(io)}; - ASSERT_NE(obj, nullptr); ASSERT_NE(dir, nullptr); ASSERT_NE(conn, nullptr); - dir->init(env->cct, env->dpp); + dir->init(env->cct); /* Run fixture's connection */ config cfg; @@ -75,7 +74,6 @@ class ObjectDirectoryFixture: public ::testing::Test { } virtual void TearDown() { - delete conn; delete obj; delete dir; } @@ -84,7 +82,7 @@ class ObjectDirectoryFixture: public ::testing::Test { rgw::d4n::ObjectDirectory* dir; net::io_context io; - connection* conn; + std::shared_ptr conn; std::vector vals{"testName", "testBucket", "", "0", env->redisHost}; std::vector fields{"objName", "bucketName", "creationTime", "dirty", "objHosts"}; @@ -93,7 +91,8 @@ class ObjectDirectoryFixture: public ::testing::Test { class BlockDirectoryFixture: public ::testing::Test { protected: virtual void SetUp() { - dir = new rgw::d4n::BlockDirectory{io}; + conn = std::make_shared(boost::asio::make_strand(io)); + dir = new rgw::d4n::BlockDirectory{conn}; block = new rgw::d4n::CacheBlock{ .cacheObj = { .objName = "testName", @@ -108,13 +107,11 @@ class BlockDirectoryFixture: public ::testing::Test { .hostsList = { env->redisHost } }; - conn = new connection{boost::asio::make_strand(io)}; - ASSERT_NE(block, nullptr); ASSERT_NE(dir, nullptr); ASSERT_NE(conn, nullptr); - dir->init(env->cct, env->dpp); + dir->init(env->cct); /* Run fixture's connection */ config cfg; @@ -125,7 +122,6 @@ class BlockDirectoryFixture: public ::testing::Test { } virtual void TearDown() { - delete conn; delete block; delete dir; } @@ -134,7 +130,7 @@ class BlockDirectoryFixture: public ::testing::Test { rgw::d4n::BlockDirectory* dir; net::io_context io; - connection* conn; + std::shared_ptr conn; std::vector vals{"0", "", "0", "0", env->redisHost, "testName", "testBucket", "", "0", env->redisHost}; @@ -146,7 +142,6 @@ TEST_F(ObjectDirectoryFixture, SetYield) { spawn::spawn(io, [this] (spawn::yield_context yield) { ASSERT_EQ(0, dir->set(obj, optional_yield{io, yield})); - dir->shutdown(); boost::system::error_code ec; request req; @@ -185,7 +180,6 @@ TEST_F(ObjectDirectoryFixture, GetYield) ASSERT_EQ(0, dir->get(obj, optional_yield{io, yield})); EXPECT_EQ(obj->objName, "newoid"); - dir->shutdown(); { boost::system::error_code ec; @@ -207,7 +201,6 @@ TEST_F(ObjectDirectoryFixture, CopyYield) spawn::spawn(io, [this] (spawn::yield_context yield) { ASSERT_EQ(0, dir->set(obj, optional_yield{io, yield})); ASSERT_EQ(0, dir->copy(obj, "copyTestName", "copyBucketName", optional_yield{io, yield})); - dir->shutdown(); boost::system::error_code ec; request req; @@ -252,7 +245,6 @@ TEST_F(ObjectDirectoryFixture, DelYield) } ASSERT_EQ(0, dir->del(obj, optional_yield{io, yield})); - dir->shutdown(); { boost::system::error_code ec; @@ -279,7 +271,6 @@ TEST_F(ObjectDirectoryFixture, UpdateFieldYield) ASSERT_EQ(0, dir->set(obj, optional_yield{io, yield})); ASSERT_EQ(0, dir->update_field(obj, "objName", "newTestName", optional_yield{io, yield})); ASSERT_EQ(0, dir->update_field(obj, "objHosts", "127.0.0.1:5000", optional_yield{io, yield})); - dir->shutdown(); boost::system::error_code ec; request req; @@ -305,7 +296,6 @@ TEST_F(BlockDirectoryFixture, SetYield) { spawn::spawn(io, [this] (spawn::yield_context yield) { ASSERT_EQ(0, dir->set(block, optional_yield{io, yield})); - dir->shutdown(); boost::system::error_code ec; request req; @@ -344,7 +334,6 @@ TEST_F(BlockDirectoryFixture, GetYield) ASSERT_EQ(0, dir->get(block, optional_yield{io, yield})); EXPECT_EQ(block->cacheObj.objName, "newoid"); - dir->shutdown(); { boost::system::error_code ec; @@ -366,7 +355,6 @@ TEST_F(BlockDirectoryFixture, CopyYield) spawn::spawn(io, [this] (spawn::yield_context yield) { ASSERT_EQ(0, dir->set(block, optional_yield{io, yield})); ASSERT_EQ(0, dir->copy(block, "copyTestName", "copyBucketName", optional_yield{io, yield})); - dir->shutdown(); boost::system::error_code ec; request req; @@ -411,7 +399,6 @@ TEST_F(BlockDirectoryFixture, DelYield) } ASSERT_EQ(0, dir->del(block, optional_yield{io, yield})); - dir->shutdown(); { boost::system::error_code ec; @@ -438,7 +425,6 @@ TEST_F(BlockDirectoryFixture, UpdateFieldYield) ASSERT_EQ(0, dir->set(block, optional_yield{io, yield})); ASSERT_EQ(0, dir->update_field(block, "objName", "newTestName", optional_yield{io, yield})); ASSERT_EQ(0, dir->update_field(block, "blockHosts", "127.0.0.1:5000", optional_yield{io, yield})); - dir->shutdown(); boost::system::error_code ec; request req; @@ -481,7 +467,6 @@ TEST_F(BlockDirectoryFixture, RemoveHostYield) } ASSERT_EQ(0, dir->remove_host(block, "127.0.0.1:6000", optional_yield{io, yield})); - dir->shutdown(); { boost::system::error_code ec; diff --git a/src/test/rgw/test_d4n_policy.cc b/src/test/rgw/test_d4n_policy.cc index 659fcf93e06..b94fa426ef2 100644 --- a/src/test/rgw/test_d4n_policy.cc +++ b/src/test/rgw/test_d4n_policy.cc @@ -60,20 +60,20 @@ class LFUDAPolicyFixture : public ::testing::Test { .hostsList = { env->redisHost } }; + conn = std::make_shared(boost::asio::make_strand(io)); rgw::cache::Partition partition_info{ .location = "RedisCache", .size = 1000 }; cacheDriver = new rgw::cache::RedisDriver{io, partition_info}; - policyDriver = new rgw::d4n::PolicyDriver(io, cacheDriver, "lfuda"); - dir = new rgw::d4n::BlockDirectory{io}; - conn = new connection{boost::asio::make_strand(io)}; + policyDriver = new rgw::d4n::PolicyDriver(conn, cacheDriver, "lfuda"); + dir = new rgw::d4n::BlockDirectory{conn}; ASSERT_NE(dir, nullptr); ASSERT_NE(cacheDriver, nullptr); ASSERT_NE(policyDriver, nullptr); ASSERT_NE(conn, nullptr); - dir->init(env->cct, env->dpp); + dir->init(env->cct); cacheDriver->initialize(env->dpp); - policyDriver->get_cache_policy()->init(env->cct, env->dpp); + policyDriver->get_cache_policy()->init(env->cct); bl.append("test data"); bufferlist attrVal; @@ -89,7 +89,6 @@ class LFUDAPolicyFixture : public ::testing::Test { } virtual void TearDown() { - delete conn; delete block; delete dir; delete cacheDriver; @@ -150,7 +149,7 @@ class LFUDAPolicyFixture : public ::testing::Test { rgw::cache::RedisDriver* cacheDriver; net::io_context io; - connection* conn; + std::shared_ptr conn; bufferlist bl; rgw::sal::Attrs attrs; @@ -165,9 +164,7 @@ TEST_F(LFUDAPolicyFixture, LocalGetBlockYield) ASSERT_GE(lfuda(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0); - dir->shutdown(); cacheDriver->shutdown(); - dynamic_cast(policyDriver->get_cache_policy())->shutdown(); boost::system::error_code ec; request req; @@ -228,9 +225,7 @@ TEST_F(LFUDAPolicyFixture, RemoteGetBlockYield) ASSERT_GE(lfuda(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0); - dir->shutdown(); cacheDriver->shutdown(); - dynamic_cast(policyDriver->get_cache_policy())->shutdown(); std::string key = block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + std::to_string(block->blockID) + "_" + std::to_string(block->size); boost::system::error_code ec; @@ -260,9 +255,7 @@ TEST_F(LFUDAPolicyFixture, BackendGetBlockYield) spawn::spawn(io, [this] (spawn::yield_context yield) { ASSERT_GE(lfuda(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0); - dir->shutdown(); cacheDriver->shutdown(); - dynamic_cast(policyDriver->get_cache_policy())->shutdown(); boost::system::error_code ec; request req; -- 2.39.5