]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
d4n: Store `optional_yield` in client cb and update index used in
authorSamarah <samarah.uriarte@ibm.com>
Fri, 6 Oct 2023 20:20:43 +0000 (20:20 +0000)
committerPritha Srivastava <prsrivas@redhat.com>
Tue, 2 Apr 2024 15:54:51 +0000 (21:24 +0530)
D4N/cache classes

Signed-off-by: Samarah <samarah.uriarte@ibm.com>
src/rgw/driver/d4n/d4n_directory.cc
src/rgw/driver/d4n/d4n_directory.h
src/rgw/driver/d4n/d4n_policy.cc
src/rgw/driver/d4n/d4n_policy.h
src/rgw/driver/d4n/rgw_sal_d4n.cc
src/rgw/driver/d4n/rgw_sal_d4n.h
src/test/rgw/test_d4n_directory.cc
src/test/rgw/test_d4n_policy.cc

index f608d6266e0d14f3daefada494fd192d749c37af..58872e524da4c7ba7d6b61860db50ee5241366a2 100644 (file)
@@ -224,7 +224,7 @@ int ObjectDirectory::del(CacheObj* object, optional_yield y) {
 }
 
 std::string BlockDirectory::build_index(CacheBlock* block) {
-  return block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + std::to_string(block->blockID);
+  return block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + std::to_string(block->blockID) + "_" + std::to_string(block->size);
 }
 
 int BlockDirectory::exist_key(CacheBlock* block, optional_yield y) {
@@ -277,7 +277,7 @@ int BlockDirectory::set(CacheBlock* block, optional_yield y) {
   }
 
   endpoint.pop_back();
-  redisValues.push_back(endpoint); // Set in filter -Sam
+  redisValues.push_back(endpoint);
 
   redisValues.push_back("objName");
   redisValues.push_back(block->cacheObj.objName);
@@ -298,7 +298,7 @@ int BlockDirectory::set(CacheBlock* block, optional_yield y) {
   }
 
   endpoint.pop_back();
-  redisValues.push_back(endpoint); // Set in filter -Sam
+  redisValues.push_back(endpoint);
 
   try {
     boost::system::error_code ec;
index e71e2774bb88ca468681441d2fe494350227d89d..2ad56f0f61993f9b64932ee17d4415eabb85ddd7 100644 (file)
@@ -116,10 +116,11 @@ class BlockDirectory: public Directory {
     int del(CacheBlock* block, optional_yield y);
     int update_field(CacheBlock* block, std::string field, std::string value, optional_yield y);
     int remove_host(CacheBlock* block, std::string value, optional_yield y);
-    std::string build_index(CacheBlock* block);
 
   private:
     std::shared_ptr<connection> conn;
+
+    std::string build_index(CacheBlock* block);
 };
 
 } } // namespace rgw::d4n
index 4453e5ec48bea31a72ca1ed01243067201c36581..48ff4543fe2763878d1aabbacf19c975f99d6080 100644 (file)
@@ -6,6 +6,10 @@
 
 namespace rgw { namespace d4n {
 
+std::string build_index(std::string bucketName, std::string oid, uint64_t offset, uint64_t size) {
+  return bucketName + "_" + oid + "_" + std::to_string(offset) + "_" + std::to_string(size); 
+}
+
 // initiate a call to async_exec() on the connection's executor
 struct initiate_exec {
   std::shared_ptr<boost::redis::connection> conn;
@@ -173,7 +177,7 @@ int LFUDAPolicy::get_min_avg_weight(optional_yield y) {
   }
 }
 
-CacheBlock LFUDAPolicy::find_victim(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) {
+CacheBlock LFUDAPolicy::find_victim(const DoutPrefixProvider* dpp, optional_yield y) {
   if (entries_map.empty())
     return {};
 
@@ -187,7 +191,8 @@ CacheBlock LFUDAPolicy::find_victim(const DoutPrefixProvider* dpp, rgw::cache::C
   victim.cacheObj.bucketName = key.substr(0, key.find('_')); 
   key.erase(0, key.find('_') + 1);
   victim.cacheObj.objName = key.substr(0, key.find('_'));
-  victim.blockID = boost::lexical_cast<uint64_t>(key.substr(key.find('_') + 1, key.length()));
+  victim.blockID = it->second->offset;
+  victim.size = it->second->len;
 
   if (dir->get(&victim, y) < 0) {
     return {};
@@ -203,7 +208,7 @@ void LFUDAPolicy::shutdown() {
   boost::asio::dispatch(conn->get_executor(), [c = conn] { c->cancel(); });
 }
 
-int LFUDAPolicy::exist_key(std::string key, optional_yield y) {
+int LFUDAPolicy::exist_key(std::string key) {
   if (entries_map.count(key) != 0) {
     return true;
   }
@@ -215,8 +220,8 @@ int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw
   response<std::string> resp;
   int age = get_age(y);
 
-  if (exist_key(dir->build_index(block), y)) { /* Local copy */
-    auto it = entries_map.find(dir->build_index(block));
+  if (exist_key(build_index(block->cacheObj.bucketName, block->cacheObj.objName, block->blockID, block->size))) { /* Local copy */
+    auto it = entries_map.find(build_index(block->cacheObj.bucketName, block->cacheObj.objName, block->blockID, block->size));
     it->second->localWeight += age;
     return cacheNode->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(it->second->localWeight), y);
   } else {
@@ -261,14 +266,15 @@ int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw
 }
 
 uint64_t LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) {
-  CacheBlock victim = find_victim(dpp, cacheNode, y);
+  CacheBlock victim = find_victim(dpp, y);
 
   if (victim.cacheObj.objName.empty()) {
     ldpp_dout(dpp, 10) << "RGW D4N Policy: Could not find victim block" << dendl;
     return 0; /* Return zero for failure */
   }
 
-  auto it = entries_map.find(dir->build_index(&victim));
+  std::string key = build_index(victim.cacheObj.bucketName, victim.cacheObj.objName, victim.blockID, victim.size);
+  auto it = entries_map.find(key);
   if (it == entries_map.end()) {
     return 0;
   }
@@ -303,7 +309,7 @@ uint64_t LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheD
 
   ldpp_dout(dpp, 10) << "RGW D4N Policy: Block " << victim.cacheObj.objName << " has been evicted." << dendl;
 
-  if (cacheNode->del(dpp, victim.cacheObj.objName, y) < 0 && dir->remove_host(&victim, dir->cct->_conf->rgw_local_cache_address, y) < 0) {
+  if (cacheNode->del(dpp, key, y) < 0 && dir->remove_host(&victim, dir->cct->_conf->rgw_local_cache_address, y) < 0) {
     return 0;
   } else {
     uint64_t num_entries = entries_map.size();
@@ -349,7 +355,7 @@ bool LFUDAPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key)
   return true;
 }
 
-int LRUPolicy::exist_key(std::string key, optional_yield y)
+int LRUPolicy::exist_key(std::string key)
 {
   const std::lock_guard l(lru_lock);
   if (entries_map.count(key) != 0) {
index 713aa78c890c9df53289d8f48e4421f8eb7b1eb2..63da686d9a67f1f277ea535d7693f69fe4b44f1f 100644 (file)
@@ -34,7 +34,7 @@ class CachePolicy {
     virtual ~CachePolicy() = default; 
 
     virtual int init(CephContext *cct, const DoutPrefixProvider* dpp) { return 0; } 
-    virtual int exist_key(std::string key, optional_yield y) = 0;
+    virtual int exist_key(std::string key) = 0;
     virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) = 0;
     virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) = 0;
     virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) = 0;
@@ -68,7 +68,7 @@ class LFUDAPolicy : public CachePolicy {
     int get_age(optional_yield y);
     int set_min_avg_weight(size_t weight, std::string cacheLocation, optional_yield y);
     int get_min_avg_weight(optional_yield y);
-    CacheBlock find_victim(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y);
+    CacheBlock find_victim(const DoutPrefixProvider* dpp, optional_yield y);
 
   public:
     LFUDAPolicy(net::io_context& io_context) : CachePolicy(), io(io_context) {
@@ -98,7 +98,7 @@ class LFUDAPolicy : public CachePolicy {
 
       return 0;
     }
-    virtual int exist_key(std::string key, optional_yield y) override;
+    virtual int exist_key(std::string key) override;
     virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
     virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
     virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
@@ -117,7 +117,7 @@ class LRUPolicy : public CachePolicy {
   public:
     LRUPolicy() = default;
 
-    virtual int exist_key(std::string key, optional_yield y) override;
+    virtual int exist_key(std::string key) override;
     virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
     virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
     virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
index 059c3112619982ffedd878acfb632cc0dc4df216..f0c9650a249fa0f548b20159dd65e46c1bcacd62 100644 (file)
@@ -44,8 +44,8 @@ D4NFilterDriver::D4NFilterDriver(Driver* _next, boost::asio::io_context& io_cont
   partition_info.type = "read-cache";
   partition_info.size = g_conf()->rgw_d4n_l1_datacache_size;
 
-  cacheDriver = new rgw::cache::RedisDriver(io_context, partition_info); // change later -Sam
-  //cacheDriver = new rgw::cache::SSDDriver(partition_info);
+  //cacheDriver = new rgw::cache::RedisDriver(io_context, partition_info); // change later -Sam
+  cacheDriver = new rgw::cache::SSDDriver(partition_info);
   objDir = new rgw::d4n::ObjectDirectory(io_context);
   blockDir = new rgw::d4n::BlockDirectory(io_context);
   policyDriver = new rgw::d4n::PolicyDriver(io_context, "lfuda");
@@ -526,8 +526,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
   ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "oid: " << oid << " ofs: " << ofs << " end: " << end << dendl;
 
   this->client_cb = cb;
-  this->cb->set_client_cb(cb, dpp); // what's this for? -Sam
-  // save y here -Sam
+  this->cb->set_client_cb(cb, dpp, &y); // what's this for? -Sam
 
   /* This algorithm stores chunks for ranged requests also in the cache, which might be smaller than obj_max_req_size
      One simplification could be to overwrite the smaller chunks with a bigger chunk of obj_max_req_size, and to serve requests for smaller
@@ -580,7 +579,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
     block.cacheObj.objName = source->get_key().get_oid();
     block.cacheObj.bucketName = source->get_bucket()->get_name();
 
-    if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache, y)) { 
+    if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache)) { 
       // Read From Cache
       auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id); 
 
@@ -601,7 +600,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << 
       " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
 
-      if ((part_len != obj_max_req_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache, y)) {
+      if ((part_len != obj_max_req_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache)) {
         // Read From Cache
         auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);  
 
@@ -671,12 +670,11 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
     return r;
   }
   
-  return this->cb->flush_last_part(y);
+  return this->cb->flush_last_part();
 }
 
-int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::flush_last_part(optional_yield y)
+int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::flush_last_part()
 {
-  save_y = &y;
   last_part = true;
   return handle_data(bl_rem, 0, bl_rem.length());
 }
index ead07740a5e76c49960664648297fc383e9ec273..78428a01ad2baa1e37aaebe0ec8605beee1f9da0 100644 (file)
@@ -112,19 +112,21 @@ class D4NFilterObject : public FilterObject {
            bool last_part{false};
            std::mutex d4n_get_data_lock;
            bool write_to_cache{true};
-      const DoutPrefixProvider* dpp;
+           const DoutPrefixProvider* dpp;
+           optional_yield* y;
 
          public:
            D4NFilterGetCB(D4NFilterDriver* _filter, std::string& _oid, D4NFilterObject* _source) : filter(_filter), 
-                                                                         oid(_oid), 
-                                                                   source(_source) {}
-
-           optional_yield* save_y;
+                                                                                                   oid(_oid), source(_source) {}
 
            int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override;
-           void set_client_cb(RGWGetDataCB* client_cb, const DoutPrefixProvider* dpp) { this->client_cb = client_cb; this->dpp = dpp;}
+           void set_client_cb(RGWGetDataCB* client_cb, const DoutPrefixProvider* dpp, optional_yield* y) { 
+              this->client_cb = client_cb; 
+              this->dpp = dpp;
+              this->y = y;
+            }
            void set_ofs(uint64_t ofs) { this->ofs = ofs; }
-           int flush_last_part(optional_yield y);
+           int flush_last_part();
            void bypass_cache_write() { this->write_to_cache = false; }
        };
 
index 806417416daa71f84a256d1c36461cffc2f3d067..4ce35eaf1d2f5d1cbd176dc635b6e6b8741b11e3 100644 (file)
@@ -279,7 +279,7 @@ TEST_F(BlockDirectoryFixture, SetYield)
 
     boost::system::error_code ec;
     request req;
-    req.push_range("HMGET", "testBucket_testName_0", fields);
+    req.push_range("HMGET", "testBucket_testName_0_0", fields);
     req.push("FLUSHALL");
 
     response< std::vector<std::string>,
@@ -303,7 +303,7 @@ TEST_F(BlockDirectoryFixture, GetYield)
     {
       boost::system::error_code ec;
       request req;
-      req.push("HSET", "testBucket_testName_0", "objName", "newoid");
+      req.push("HSET", "testBucket_testName_0_0", "objName", "newoid");
       response<int> resp;
 
       conn->async_exec(req, resp, yield[ec]);
@@ -340,8 +340,8 @@ TEST_F(BlockDirectoryFixture, CopyYield)
 
     boost::system::error_code ec;
     request req;
-    req.push("EXISTS", "copyBucketName_copyTestName_0");
-    req.push_range("HMGET", "copyBucketName_copyTestName_0", fields);
+    req.push("EXISTS", "copyBucketName_copyTestName_0_0");
+    req.push_range("HMGET", "copyBucketName_copyTestName_0_0", fields);
     req.push("FLUSHALL");
 
     response<int, std::vector<std::string>, 
@@ -371,7 +371,7 @@ TEST_F(BlockDirectoryFixture, DelYield)
     {
       boost::system::error_code ec;
       request req;
-      req.push("EXISTS", "testBucket_testName_0");
+      req.push("EXISTS", "testBucket_testName_0_0");
       response<int> resp;
 
       conn->async_exec(req, resp, yield[ec]);
@@ -412,7 +412,7 @@ TEST_F(BlockDirectoryFixture, UpdateFieldYield)
 
     boost::system::error_code ec;
     request req;
-    req.push("HMGET", "testBucket_testName_0", "objName", "blockHosts");
+    req.push("HMGET", "testBucket_testName_0_0", "objName", "blockHosts");
     req.push("FLUSHALL");
     response< std::vector<std::string>, 
              boost::redis::ignore_t> resp;
@@ -439,8 +439,8 @@ TEST_F(BlockDirectoryFixture, RemoveHostYield)
     {
       boost::system::error_code ec;
       request req;
-      req.push("HEXISTS", "testBucket_testName_0", "blockHosts");
-      req.push("HGET", "testBucket_testName_0", "blockHosts");
+      req.push("HEXISTS", "testBucket_testName_0_0", "blockHosts");
+      req.push("HGET", "testBucket_testName_0_0", "blockHosts");
       response<int, std::string> resp;
 
       conn->async_exec(req, resp, yield[ec]);
index ce2fde244bf8143c4eec825e37f11073c325aa7c..1c104cb2f64910243f0f1a3e7f7fe1aa21cdc2a1 100644 (file)
@@ -104,19 +104,13 @@ class LFUDAPolicyFixture: public ::testing::Test {
 
     bufferlist bl;
     rgw::sal::Attrs attrs;
-    std::string key = "testName";
 };
 
 TEST_F(LFUDAPolicyFixture, LocalGetBlockYield)
 {
   spawn::spawn(io, [this] (spawn::yield_context yield) {
+    std::string key = block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + std::to_string(block->blockID) + "_" + std::to_string(block->size);
     ASSERT_EQ(0, cacheDriver->put(env->dpp, key, bl, bl.length(), attrs, optional_yield{io, yield}));
-
-    rgw::d4n::CacheBlock temp;
-    temp.blockID = 0;
-    temp.cacheObj.objName = "testName";
-    temp.cacheObj.bucketName = "testBucket";
-    std::string key = dir->build_index(&temp);
     policyDriver->get_cache_policy()->insert(env->dpp, key, 0, bl.length(), "", cacheDriver, optional_yield{io, yield});
 
     /* Change cache age for testing purposes */
@@ -182,9 +176,9 @@ TEST_F(LFUDAPolicyFixture, RemoteGetBlockYield)
     attrs.insert({"bucket_name", attrVal});
 
     ASSERT_EQ(0, dir->set(&victim, optional_yield{io, yield}));
-    ASSERT_EQ(0, cacheDriver->put(env->dpp, victim.cacheObj.objName, bl, bl.length(), attrs, optional_yield{io, yield}));
-    std::string key = dir->build_index(&victim);
-    policyDriver->get_cache_policy()->insert(env->dpp, key, 0, bl.length(), "", cacheDriver, optional_yield{io, yield});
+    std::string victimKey = victim.cacheObj.bucketName + "_" + victim.cacheObj.objName + "_" + std::to_string(victim.blockID) + "_" + std::to_string(victim.size);
+    ASSERT_EQ(0, cacheDriver->put(env->dpp, victimKey, bl, bl.length(), attrs, optional_yield{io, yield}));
+    policyDriver->get_cache_policy()->insert(env->dpp, victimKey, 0, bl.length(), "", cacheDriver, optional_yield{io, yield});
 
     /* Remote block */
     block->size = cacheDriver->get_free_space(env->dpp) + 1; /* To trigger eviction */
@@ -201,11 +195,12 @@ TEST_F(LFUDAPolicyFixture, RemoteGetBlockYield)
     cacheDriver->shutdown();
     policyDriver->get_cache_policy()->shutdown();
 
+    std::string key = block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + std::to_string(block->blockID) + "_" + std::to_string(block->size);
     boost::system::error_code ec;
     request req;
-    req.push("EXISTS", "RedisCache/victimName");
-    req.push("HGET", "testBucket_victimName_0", "globalWeight");
-    req.push("HGET", "testBucket_testName_0", "globalWeight");
+    req.push("EXISTS", "RedisCache/" + victimKey);
+    req.push("HGET", victimKey, "globalWeight");
+    req.push("HGET", key, "globalWeight");
     req.push("FLUSHALL");
 
     response<int, std::string, std::string,
@@ -237,13 +232,9 @@ TEST_F(LFUDAPolicyFixture, BackendGetBlockYield)
     req.push("FLUSHALL");
 
     response<boost::redis::ignore_t> resp;
-    //response< std::vector<std::string>,
-    //          boost::redis::ignore_t > resp;
 
     conn->async_exec(req, resp, yield[ec]);
 
-    //ASSERT_EQ((bool)ec, false);
-    //EXPECT_EQ(std::get<0>(resp).value(), vals);
     conn->cancel();
   });