]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
d4n: Add a central Redis connection to the filter to be shared by the rest of the...
authorSamarah <samarah.uriarte@ibm.com>
Wed, 10 Jan 2024 18:00:23 +0000 (18:00 +0000)
committerPritha Srivastava <prsrivas@redhat.com>
Tue, 2 Apr 2024 15:54:52 +0000 (21:24 +0530)
Signed-off-by: Samarah <samarah.uriarte@ibm.com>
src/rgw/driver/d4n/d4n_directory.cc
src/rgw/driver/d4n/d4n_directory.h
src/rgw/driver/d4n/d4n_policy.cc
src/rgw/driver/d4n/d4n_policy.h
src/rgw/driver/d4n/rgw_sal_d4n.cc
src/rgw/driver/d4n/rgw_sal_d4n.h
src/test/rgw/test_d4n_directory.cc
src/test/rgw/test_d4n_policy.cc

index 856c88be3563d995bf3b905db5ca378029bbfa75..abffcd16d86ff58b61f2d27573c89b7e6757d6bd 100644 (file)
@@ -68,12 +68,6 @@ int ObjectDirectory::exist_key(CacheObj* object, optional_yield y)
   return std::get<0>(resp).value();
 }
 
-void ObjectDirectory::shutdown()
-{
-  // call cancel() on the connection's executor
-  boost::asio::dispatch(conn->get_executor(), [c = conn] { c->cancel(); });
-}
-
 int ObjectDirectory::set(CacheObj* object, optional_yield y) 
 {
   std::string key = build_index(object);
@@ -332,12 +326,6 @@ int BlockDirectory::exist_key(CacheBlock* block, optional_yield y)
   return std::get<0>(resp).value();
 }
 
-void BlockDirectory::shutdown()
-{
-  // call cancel() on the connection's executor
-  boost::asio::dispatch(conn->get_executor(), [c = conn] { c->cancel(); });
-}
-
 int BlockDirectory::set(CacheBlock* block, optional_yield y) 
 {
   std::string key = build_index(block);
index d60731f7ed52e193ed4f38aa62a1654dc607baf7..cf562dd27301a45095243cf43bdd4ea67f3e25ef 100644 (file)
@@ -40,32 +40,12 @@ class Directory {
 
 class ObjectDirectory: public Directory {
   public:
-    ObjectDirectory(net::io_context& io_context) {
-      conn = std::make_shared<connection>(boost::asio::make_strand(io_context));
-    }
-    ~ObjectDirectory() {
-      shutdown();
-    }
+    ObjectDirectory(std::shared_ptr<connection>& conn) : conn(conn) {}
 
-    int init(CephContext* cct, const DoutPrefixProvider* dpp) {
+    void init(CephContext* cct) {
       this->cct = cct;
-
-      config cfg;
-      cfg.addr.host = cct->_conf->rgw_d4n_host;
-      cfg.addr.port = std::to_string(cct->_conf->rgw_d4n_port);
-      cfg.clientname = "D4N.ObjectDir";
-
-      if (!cfg.addr.host.length() || !cfg.addr.port.length()) {
-       ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "(): Endpoint was not configured correctly." << dendl;
-       return -EDESTADDRREQ;
-      }
-      
-      conn->async_run(cfg, {}, net::consign(net::detached, conn)); 
-
-      return 0;
     }
     int exist_key(CacheObj* object, optional_yield y);
-    void shutdown();
 
     int set(CacheObj* object, optional_yield y);
     int get(CacheObj* object, optional_yield y);
@@ -81,32 +61,12 @@ class ObjectDirectory: public Directory {
 
 class BlockDirectory: public Directory {
   public:
-    BlockDirectory(net::io_context& io_context) {
-      conn = std::make_shared<connection>(boost::asio::make_strand(io_context));
-    }
-    ~BlockDirectory() {
-      shutdown();
-    }
+    BlockDirectory(std::shared_ptr<connection>& conn) : conn(conn) {}
     
-    int init(CephContext* cct, const DoutPrefixProvider* dpp) {
+    void init(CephContext* cct) {
       this->cct = cct;
-
-      config cfg;
-      cfg.addr.host = cct->_conf->rgw_d4n_host;
-      cfg.addr.port = std::to_string(cct->_conf->rgw_d4n_port);
-      cfg.clientname = "D4N.BlockDir";
-
-      if (!cfg.addr.host.length() || !cfg.addr.port.length()) {
-       ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): Endpoint was not configured correctly." << dendl;
-       return -EDESTADDRREQ;
-      }
-
-      conn->async_run(cfg, {}, net::consign(net::detached, conn)); 
-
-      return 0;
     }
     int exist_key(CacheBlock* block, optional_yield y);
-    void shutdown();
 
     int set(CacheBlock* block, optional_yield y);
     int get(CacheBlock* block, optional_yield y);
index f53aafc445ebda3766058b60a6e067a88087ad5f..8de8bcb87cf2b9ead145de4c2f089145bc4c991f 100644 (file)
@@ -194,13 +194,6 @@ CacheBlock* LFUDAPolicy::get_victim_block(const DoutPrefixProvider* dpp, optiona
   return victim;
 }
 
-void LFUDAPolicy::shutdown() {
-  dir->shutdown();
-  
-  // call cancel() on the connection's executor
-  boost::asio::dispatch(conn->get_executor(), [c = conn] { c->cancel(); });
-}
-
 int LFUDAPolicy::exist_key(std::string key) {
   const std::lock_guard l(lfuda_lock);
   if (entries_map.count(key) != 0) {
index e7c7781efcadd3ac55ec1a422b1c8b1ed4ff3de3..60d55128ab0b377e7c722163d03a17401269c765 100644 (file)
@@ -33,7 +33,7 @@ class CachePolicy {
     CachePolicy() {}
     virtual ~CachePolicy() = default; 
 
-    virtual int init(CephContext *cct, const DoutPrefixProvider* dpp) { return 0; } 
+    virtual void init(CephContext *cct) = 0; 
     virtual int exist_key(std::string key) = 0;
     virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) = 0;
     virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) = 0;
@@ -65,7 +65,6 @@ class LFUDAPolicy : public CachePolicy {
     std::unordered_map<std::string, LFUDAEntry*> entries_map;
     std::mutex lfuda_lock;
 
-    net::io_context& io;
     std::shared_ptr<connection> conn;
     BlockDirectory* dir;
     rgw::cache::CacheDriver* cacheDriver;
@@ -77,32 +76,18 @@ class LFUDAPolicy : public CachePolicy {
     CacheBlock* get_victim_block(const DoutPrefixProvider* dpp, optional_yield y);
 
   public:
-    LFUDAPolicy(net::io_context& io_context, rgw::cache::CacheDriver* cacheDriver) : CachePolicy(), io(io_context), cacheDriver{cacheDriver} {
-      conn = std::make_shared<connection>(boost::asio::make_strand(io_context));
-      dir = new BlockDirectory{io};
+    LFUDAPolicy(std::shared_ptr<connection>& conn, rgw::cache::CacheDriver* cacheDriver) : CachePolicy(), 
+                                                                         conn(conn), 
+                                                                         cacheDriver(cacheDriver)
+    {
+      dir = new BlockDirectory{conn};
     }
     ~LFUDAPolicy() {
-      shutdown();
       delete dir;
     } 
 
-    virtual int init(CephContext *cct, const DoutPrefixProvider* dpp) {
-      std::string address = cct->_conf->rgw_local_cache_address;
-
-      config cfg;
-      cfg.addr.host = address.substr(0, address.find(":"));
-      cfg.addr.port = address.substr(address.find(":") + 1, address.length());
-      cfg.clientname = "D4N.Policy";
-
-      if (!cfg.addr.host.length() || !cfg.addr.port.length()) {
-       ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Endpoint was not configured correctly." << dendl;
-       return -EDESTADDRREQ;
-      }
-
-      dir->init(cct, dpp);
-      conn->async_run(cfg, {}, net::consign(net::detached, conn));
-
-      return 0;
+    virtual void init(CephContext *cct) {
+      dir->init(cct);
     }
     virtual int exist_key(std::string key) override;
     //virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
@@ -117,7 +102,6 @@ class LFUDAPolicy : public CachePolicy {
         return nullptr;
       return it->second;
     }
-    void shutdown();
 };
 
 class LRUPolicy : public CachePolicy {
@@ -134,6 +118,7 @@ class LRUPolicy : public CachePolicy {
   public:
     LRUPolicy(rgw::cache::CacheDriver* cacheDriver) : cacheDriver{cacheDriver} {}
 
+    virtual void init(CephContext *cct) {} 
     virtual int exist_key(std::string key) override;
     virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override;
     virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) override;
@@ -146,10 +131,10 @@ class PolicyDriver {
     CachePolicy* cachePolicy;
 
   public:
-    PolicyDriver(net::io_context& io_context, rgw::cache::CacheDriver* cacheDriver, std::string _policyName) : policyName(_policyName) 
+    PolicyDriver(std::shared_ptr<connection>& conn, rgw::cache::CacheDriver* cacheDriver, std::string _policyName) : policyName(_policyName) 
     {
       if (policyName == "lfuda") {
-       cachePolicy = new LFUDAPolicy(io_context, cacheDriver);
+       cachePolicy = new LFUDAPolicy(conn, cacheDriver);
       } else if (policyName == "lru") {
        cachePolicy = new LRUPolicy(cacheDriver);
       }
index 4c2508553c6dfe2b30e4d3f2ca22e8f708094d3b..12946ab430b55c2fcac6a4de9ee25a972c56f9da 100644 (file)
@@ -35,17 +35,18 @@ static inline Object* nextObject(Object* t)
 
 D4NFilterDriver::D4NFilterDriver(Driver* _next, boost::asio::io_context& io_context) : FilterDriver(_next) 
 {
+  conn = std::make_shared<connection>(boost::asio::make_strand(io_context));
+
   rgw::cache::Partition partition_info;
   partition_info.location = g_conf()->rgw_d4n_l1_datacache_persistent_path;
   partition_info.name = "d4n";
   partition_info.type = "read-cache";
   partition_info.size = g_conf()->rgw_d4n_l1_datacache_size;
 
-  //cacheDriver = new rgw::cache::RedisDriver(io_context, partition_info); // change later -Sam
   cacheDriver = new rgw::cache::SSDDriver(partition_info);
-  objDir = new rgw::d4n::ObjectDirectory(io_context);
-  blockDir = new rgw::d4n::BlockDirectory(io_context);
-  policyDriver = new rgw::d4n::PolicyDriver(io_context, cacheDriver, "lfuda");
+  objDir = new rgw::d4n::ObjectDirectory(conn);
+  blockDir = new rgw::d4n::BlockDirectory(conn);
+  policyDriver = new rgw::d4n::PolicyDriver(conn, cacheDriver, "lfuda");
 }
 
  D4NFilterDriver::~D4NFilterDriver()
@@ -58,14 +59,28 @@ D4NFilterDriver::D4NFilterDriver(Driver* _next, boost::asio::io_context& io_cont
 
 int D4NFilterDriver::initialize(CephContext *cct, const DoutPrefixProvider *dpp)
 {
-  FilterDriver::initialize(cct, dpp);
+  namespace net = boost::asio;
+  using boost::redis::config;
+
+  std::string address = cct->_conf->rgw_local_cache_address;
+  config cfg;
+  cfg.addr.host = address.substr(0, address.find(":"));
+  cfg.addr.port = address.substr(address.find(":") + 1, address.length());
+  cfg.clientname = "D4N.Filter";
+
+  if (!cfg.addr.host.length() || !cfg.addr.port.length()) {
+    ldpp_dout(dpp, 10) << "D4NFilterDriver::" << __func__ << "(): Endpoint was not configured correctly." << dendl;
+    return -EDESTADDRREQ;
+  }
 
-  cacheDriver->initialize(dpp);
+  conn->async_run(cfg, {}, net::consign(net::detached, conn));
 
-  objDir->init(cct, dpp);
-  blockDir->init(cct, dpp);
+  FilterDriver::initialize(cct, dpp);
 
-  policyDriver->get_cache_policy()->init(cct, dpp);
+  cacheDriver->initialize(dpp);
+  objDir->init(cct);
+  blockDir->init(cct);
+  policyDriver->get_cache_policy()->init(cct);
 
   return 0;
 }
index 5f785fb65cb91a781e7d75bb693ac3cf9d30dc50..e43c0105265acec513aa571fa14021cce1d711c0 100644 (file)
@@ -38,8 +38,11 @@ namespace rgw::d4n {
 
 namespace rgw { namespace sal {
 
+using boost::redis::connection;
+
 class D4NFilterDriver : public FilterDriver {
   private:
+    std::shared_ptr<connection> conn;
     rgw::cache::CacheDriver* cacheDriver;
     rgw::d4n::ObjectDirectory* objDir;
     rgw::d4n::BlockDirectory* blockDir;
index 7328212dcffa64d34b9dab34f5e2e728b6320d57..59aceabd2a0d78ec6b8fc97a6c23407621eca00f 100644 (file)
@@ -49,7 +49,8 @@ class Environment : public ::testing::Environment {
 class ObjectDirectoryFixture: public ::testing::Test {
   protected:
     virtual void SetUp() {
-      dir = new rgw::d4n::ObjectDirectory{io};
+      conn = std::make_shared<connection>(boost::asio::make_strand(io));
+      dir = new rgw::d4n::ObjectDirectory{conn};
       obj = new rgw::d4n::CacheObj{
        .objName = "testName",
        .bucketName = "testBucket",
@@ -58,13 +59,11 @@ class ObjectDirectoryFixture: public ::testing::Test {
        .hostsList = { env->redisHost }
       };
 
-      conn = new connection{boost::asio::make_strand(io)};
-
       ASSERT_NE(obj, nullptr);
       ASSERT_NE(dir, nullptr);
       ASSERT_NE(conn, nullptr);
 
-      dir->init(env->cct, env->dpp);
+      dir->init(env->cct);
 
       /* Run fixture's connection */
       config cfg;
@@ -75,7 +74,6 @@ class ObjectDirectoryFixture: public ::testing::Test {
     } 
 
     virtual void TearDown() {
-      delete conn;
       delete obj;
       delete dir;
     }
@@ -84,7 +82,7 @@ class ObjectDirectoryFixture: public ::testing::Test {
     rgw::d4n::ObjectDirectory* dir;
 
     net::io_context io;
-    connection* conn;
+    std::shared_ptr<connection> conn;
 
     std::vector<std::string> vals{"testName", "testBucket", "", "0", env->redisHost};
     std::vector<std::string> fields{"objName", "bucketName", "creationTime", "dirty", "objHosts"};
@@ -93,7 +91,8 @@ class ObjectDirectoryFixture: public ::testing::Test {
 class BlockDirectoryFixture: public ::testing::Test {
   protected:
     virtual void SetUp() {
-      dir = new rgw::d4n::BlockDirectory{io};
+      conn = std::make_shared<connection>(boost::asio::make_strand(io));
+      dir = new rgw::d4n::BlockDirectory{conn};
       block = new rgw::d4n::CacheBlock{
         .cacheObj = {
          .objName = "testName",
@@ -108,13 +107,11 @@ class BlockDirectoryFixture: public ::testing::Test {
        .hostsList = { env->redisHost }
       };
 
-      conn = new connection{boost::asio::make_strand(io)};
-
       ASSERT_NE(block, nullptr);
       ASSERT_NE(dir, nullptr);
       ASSERT_NE(conn, nullptr);
 
-      dir->init(env->cct, env->dpp);
+      dir->init(env->cct);
 
       /* Run fixture's connection */
       config cfg;
@@ -125,7 +122,6 @@ class BlockDirectoryFixture: public ::testing::Test {
     } 
 
     virtual void TearDown() {
-      delete conn;
       delete block;
       delete dir;
     }
@@ -134,7 +130,7 @@ class BlockDirectoryFixture: public ::testing::Test {
     rgw::d4n::BlockDirectory* dir;
 
     net::io_context io;
-    connection* conn;
+    std::shared_ptr<connection> conn;
 
     std::vector<std::string> vals{"0", "", "0", "0", env->redisHost, 
                                    "testName", "testBucket", "", "0", env->redisHost};
@@ -146,7 +142,6 @@ TEST_F(ObjectDirectoryFixture, SetYield)
 {
   spawn::spawn(io, [this] (spawn::yield_context yield) {
     ASSERT_EQ(0, dir->set(obj, optional_yield{io, yield}));
-    dir->shutdown();
 
     boost::system::error_code ec;
     request req;
@@ -185,7 +180,6 @@ TEST_F(ObjectDirectoryFixture, GetYield)
 
     ASSERT_EQ(0, dir->get(obj, optional_yield{io, yield}));
     EXPECT_EQ(obj->objName, "newoid");
-    dir->shutdown();
 
     {
       boost::system::error_code ec;
@@ -207,7 +201,6 @@ TEST_F(ObjectDirectoryFixture, CopyYield)
   spawn::spawn(io, [this] (spawn::yield_context yield) {
     ASSERT_EQ(0, dir->set(obj, optional_yield{io, yield}));
     ASSERT_EQ(0, dir->copy(obj, "copyTestName", "copyBucketName", optional_yield{io, yield}));
-    dir->shutdown();
 
     boost::system::error_code ec;
     request req;
@@ -252,7 +245,6 @@ TEST_F(ObjectDirectoryFixture, DelYield)
     }
 
     ASSERT_EQ(0, dir->del(obj, optional_yield{io, yield}));
-    dir->shutdown();
 
     {
       boost::system::error_code ec;
@@ -279,7 +271,6 @@ TEST_F(ObjectDirectoryFixture, UpdateFieldYield)
     ASSERT_EQ(0, dir->set(obj, optional_yield{io, yield}));
     ASSERT_EQ(0, dir->update_field(obj, "objName", "newTestName", optional_yield{io, yield}));
     ASSERT_EQ(0, dir->update_field(obj, "objHosts", "127.0.0.1:5000", optional_yield{io, yield}));
-    dir->shutdown();
 
     boost::system::error_code ec;
     request req;
@@ -305,7 +296,6 @@ TEST_F(BlockDirectoryFixture, SetYield)
 {
   spawn::spawn(io, [this] (spawn::yield_context yield) {
     ASSERT_EQ(0, dir->set(block, optional_yield{io, yield}));
-    dir->shutdown();
 
     boost::system::error_code ec;
     request req;
@@ -344,7 +334,6 @@ TEST_F(BlockDirectoryFixture, GetYield)
 
     ASSERT_EQ(0, dir->get(block, optional_yield{io, yield}));
     EXPECT_EQ(block->cacheObj.objName, "newoid");
-    dir->shutdown();
 
     {
       boost::system::error_code ec;
@@ -366,7 +355,6 @@ TEST_F(BlockDirectoryFixture, CopyYield)
   spawn::spawn(io, [this] (spawn::yield_context yield) {
     ASSERT_EQ(0, dir->set(block, optional_yield{io, yield}));
     ASSERT_EQ(0, dir->copy(block, "copyTestName", "copyBucketName", optional_yield{io, yield}));
-    dir->shutdown();
 
     boost::system::error_code ec;
     request req;
@@ -411,7 +399,6 @@ TEST_F(BlockDirectoryFixture, DelYield)
     }
 
     ASSERT_EQ(0, dir->del(block, optional_yield{io, yield}));
-    dir->shutdown();
 
     {
       boost::system::error_code ec;
@@ -438,7 +425,6 @@ TEST_F(BlockDirectoryFixture, UpdateFieldYield)
     ASSERT_EQ(0, dir->set(block, optional_yield{io, yield}));
     ASSERT_EQ(0, dir->update_field(block, "objName", "newTestName", optional_yield{io, yield}));
     ASSERT_EQ(0, dir->update_field(block, "blockHosts", "127.0.0.1:5000", optional_yield{io, yield}));
-    dir->shutdown();
 
     boost::system::error_code ec;
     request req;
@@ -481,7 +467,6 @@ TEST_F(BlockDirectoryFixture, RemoveHostYield)
     }
 
     ASSERT_EQ(0, dir->remove_host(block, "127.0.0.1:6000", optional_yield{io, yield}));
-    dir->shutdown();
 
     {
       boost::system::error_code ec;
index 659fcf93e06a05a688ddf16f8865562cebbb9511..b94fa426ef2afb949d66363aaf3ac50432995c94 100644 (file)
@@ -60,20 +60,20 @@ class LFUDAPolicyFixture : public ::testing::Test {
        .hostsList = { env->redisHost }
       };
 
+      conn = std::make_shared<connection>(boost::asio::make_strand(io));
       rgw::cache::Partition partition_info{ .location = "RedisCache", .size = 1000 };
       cacheDriver = new rgw::cache::RedisDriver{io, partition_info};
-      policyDriver = new rgw::d4n::PolicyDriver(io, cacheDriver, "lfuda");
-      dir = new rgw::d4n::BlockDirectory{io};
-      conn = new connection{boost::asio::make_strand(io)};
+      policyDriver = new rgw::d4n::PolicyDriver(conn, cacheDriver, "lfuda");
+      dir = new rgw::d4n::BlockDirectory{conn};
 
       ASSERT_NE(dir, nullptr);
       ASSERT_NE(cacheDriver, nullptr);
       ASSERT_NE(policyDriver, nullptr);
       ASSERT_NE(conn, nullptr);
 
-      dir->init(env->cct, env->dpp);
+      dir->init(env->cct);
       cacheDriver->initialize(env->dpp);
-      policyDriver->get_cache_policy()->init(env->cct, env->dpp);
+      policyDriver->get_cache_policy()->init(env->cct);
 
       bl.append("test data");
       bufferlist attrVal;
@@ -89,7 +89,6 @@ class LFUDAPolicyFixture : public ::testing::Test {
     } 
 
     virtual void TearDown() {
-      delete conn;
       delete block;
       delete dir;
       delete cacheDriver;
@@ -150,7 +149,7 @@ class LFUDAPolicyFixture : public ::testing::Test {
     rgw::cache::RedisDriver* cacheDriver;
 
     net::io_context io;
-    connection* conn;
+    std::shared_ptr<connection> conn;
 
     bufferlist bl;
     rgw::sal::Attrs attrs;
@@ -165,9 +164,7 @@ TEST_F(LFUDAPolicyFixture, LocalGetBlockYield)
 
     ASSERT_GE(lfuda(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0);
 
-    dir->shutdown();
     cacheDriver->shutdown();
-    dynamic_cast<rgw::d4n::LFUDAPolicy*>(policyDriver->get_cache_policy())->shutdown(); 
 
     boost::system::error_code ec;
     request req;
@@ -228,9 +225,7 @@ TEST_F(LFUDAPolicyFixture, RemoteGetBlockYield)
 
     ASSERT_GE(lfuda(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0);
 
-    dir->shutdown();
     cacheDriver->shutdown();
-    dynamic_cast<rgw::d4n::LFUDAPolicy*>(policyDriver->get_cache_policy())->shutdown(); 
 
     std::string key = block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + std::to_string(block->blockID) + "_" + std::to_string(block->size);
     boost::system::error_code ec;
@@ -260,9 +255,7 @@ TEST_F(LFUDAPolicyFixture, BackendGetBlockYield)
   spawn::spawn(io, [this] (spawn::yield_context yield) {
     ASSERT_GE(lfuda(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0);
 
-    dir->shutdown();
     cacheDriver->shutdown();
-    dynamic_cast<rgw::d4n::LFUDAPolicy*>(policyDriver->get_cache_policy())->shutdown(); 
 
     boost::system::error_code ec;
     request req;