rgw/d4n: Add directory probing to read workflow
author Samarah <samarah.uriarte@ibm.com>
Wed, 6 Mar 2024 20:58:03 +0000 (20:58 +0000)
committer Pritha Srivastava <prsrivas@redhat.com>
Mon, 21 Apr 2025 04:04:07 +0000 (09:34 +0530)
Signed-off-by: Samarah <samarah.uriarte@ibm.com>
src/rgw/driver/d4n/d4n_directory.cc
src/rgw/driver/d4n/rgw_sal_d4n.cc
src/rgw/driver/d4n/rgw_sal_d4n.h

index 1fd9437691d0347682a40f95afe0e1343dffb6f6..20f81ffd7da39ab3c98a613179f09633d710e25d 100644 (file)
@@ -444,7 +444,7 @@ int BlockDirectory::get(CacheBlock* block, optional_yield y)
         std::stringstream ss(boost::lexical_cast<std::string>(std::get<0>(resp).value()[4]));
        block->hostsList.clear();
 
-       while (!ss.eof()) {
+       while (!ss.eof()) { // Replace with boost::split? -Sam
           std::string host;
          std::getline(ss, host, '_');
          block->hostsList.push_back(host);
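
The in-code note above asks whether the std::getline loop could be replaced with boost::split. A minimal sketch of that alternative, assuming the input is the underscore-joined host string read from the directory response (the parse_hosts name and resp_field parameter are illustrative, not part of the commit):

    #include <boost/algorithm/string.hpp>
    #include <string>
    #include <vector>

    // Split "host1_host2_host3" on the '_' delimiter used by BlockDirectory.
    std::vector<std::string> parse_hosts(const std::string& resp_field) {
      std::vector<std::string> hosts;
      boost::split(hosts, resp_field, boost::is_any_of("_"));
      return hosts;
    }

    // e.g., with the response field already lexical_cast to std::string as above:
    // block->hostsList = parse_hosts(field);

The default token_compress_off mode keeps empty tokens, matching the getline loop's handling of consecutive or trailing delimiters.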
index 605b8f9b6a7a2cac5a58a92254cddc615fcd48fa..88c10b552babb70e43f8e798dbea847e3873aa72 100644 (file)
@@ -572,6 +572,10 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
 
   this->offset = ofs;
 
+  rgw::d4n::CacheBlock block;
+  block.cacheObj.objName = source->get_key().get_oid();
+  block.cacheObj.bucketName = source->get_bucket()->get_name();
+
   do {
     uint64_t id = adjusted_start_ofs, read_ofs = 0; //read_ofs is the actual offset to start reading from the current part/ chunk
     if (start_part_num == (num_parts - 1)) {
@@ -589,59 +593,91 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
       read_ofs = diff_ofs;
     }
 
+    block.blockID = adjusted_start_ofs;
+    block.size = part_len; 
+
     ceph::bufferlist bl;
     std::string oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(part_len);
 
     ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << 
     " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
 
-    if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) { 
-      // Read From Cache
-      auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id); 
+    int ret = -1;
+    if (source->driver->get_block_dir()->exist_key(&block, y) > 0 && (ret = source->driver->get_block_dir()->get(&block, y)) == 0) { 
+      auto it = find(block.hostsList.begin(), block.hostsList.end(), source->driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address);
+      if (it != block.hostsList.end()) { /* Local copy */
+       ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in local cache. " << oid_in_cache << dendl;
+
+       if (block.version == version) {
+         if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) { 
+           // Read From Cache
+           auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id); 
+
+           this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, part_len)));
+
+           ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
+           auto r = flush(dpp, std::move(completed), y);
+
+           if (r < 0) {
+             drain(dpp, y);
+             ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
+             return r;
+           }
+         // if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) 
+         } else {
+           oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(obj_max_req_size);
+           //for ranged requests, for last part, the whole part might exist in the cache
+            ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << 
+            " length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
+
+           if ((part_len != obj_max_req_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) {
+             // Read From Cache
+             auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);  
+
+             this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, obj_max_req_size)));
+
+             ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
+             auto r = flush(dpp, std::move(completed), y);
+
+             if (r < 0) {
+               drain(dpp, y);
+               ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
+               return r;
+             }
+           }
+         }
+       // if (block.version == version)
+       } else {
+         // TODO: If data has already been returned for any older versioned block, then return 'retry' error, else
+
+         ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+
+         auto r = drain(dpp, y);
+
+         if (r < 0) {
+           ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, r= " << r << dendl;
+           return r;
+         }
+       }
+      } else if (block.hostsList.size()) { /* Remote copy */
+       ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in remote cache. " << oid_in_cache << dendl;
+       // TODO: Retrieve remotely
+      }
+    // if (source->driver->get_block_dir()->exist_key(&block, y) > 0 && int ret = source->driver->get_block_dir()->get(&block, y) == 0)  
+    } else { /* Fetch from backend */
+      if (ret < 0)
+       ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << block.cacheObj.objName << " blockID: " << block.blockID << " block size: " << block.size << dendl;
 
-      this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, part_len)));
+      ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
 
-      ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
-      auto r = flush(dpp, std::move(completed), y);
+      auto r = drain(dpp, y);
 
       if (r < 0) {
-        drain(dpp, y);
-        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
-        return r;
+       ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, r= " << r << dendl;
+       return r;
       }
-    } else {
-      oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(obj_max_req_size);
-      //for ranged requests, for last part, the whole part might exist in the cache
-       ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << 
-      " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
-
-      if ((part_len != obj_max_req_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) {
-        // Read From Cache
-        auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);  
-
-        this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, obj_max_req_size)));
-
-        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
-        auto r = flush(dpp, std::move(completed), y);
 
-        if (r < 0) {
-          drain(dpp, y);
-          ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
-          return r;
-        }
-
-      } else {
-        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
-
-        auto r = drain(dpp, y);
-
-        if (r < 0) {
-          ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, r= " << r << dendl;
-          return r;
-        }
-
-        break;
-      }
+      break;
     }
 
     if (start_part_num == (num_parts - 1)) {
@@ -756,7 +792,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
               }
             }
           } else {
-                 ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed with error: " << ret << dendl;
+           ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed with error: " << ret << dendl;
           }
         }
       }
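
Taken together, the iterate() changes above replace the old "does the cache key exist?" check with a probe of the block directory first. A simplified control-flow sketch of the new read path, where dir and local_addr stand in for the block-directory handle and rgw_d4n_l1_datacache_address (names abbreviated for illustration, not a drop-in implementation):

    if (dir->exist_key(&block, y) > 0 && dir->get(&block, y) == 0) {
      auto it = std::find(block.hostsList.begin(), block.hostsList.end(), local_addr);
      if (it != block.hostsList.end()) {       /* local copy */
        if (block.version == version) {
          // current local copy: read the block from the L1 cache and flush to client
        } else {
          // stale local version: drain in-flight reads
        }
      } else if (!block.hostsList.empty()) {   /* remote copy */
        // retrieval from a remote cache is left as a TODO in this commit
      }
    } else {
      // no directory entry (or get() failed): drain and fall back to the backend
    }

Note that the directory probe is keyed by the CacheBlock populated earlier in the hunk (object name, bucket name, blockID = adjusted_start_ofs, size = part_len), so each part/chunk of the read is looked up independently.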
index 40c98d5d2f884d69b50f4387b9f5921d7e5dcd7c..7011f2a64bf123742995c9609aa50cd1907dc57c 100644 (file)
@@ -159,7 +159,7 @@ class D4NFilterObject : public FilterObject {
         std::unique_ptr<rgw::Aio> aio;
        uint64_t offset = 0; // next offset to write to client
         rgw::AioResultList completed; // completed read results, sorted by offset
-      std::unordered_map<uint64_t, std::pair<uint64_t,uint64_t>> blocks_info;
+       std::unordered_map<uint64_t, std::pair<uint64_t,uint64_t>> blocks_info;
 
        int flush(const DoutPrefixProvider* dpp, rgw::AioResultList&& results, optional_yield y);
        void cancel();