]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw/d4n: optimizing iterate method to align
authorPritha Srivastava <prsrivas@redhat.com>
Wed, 7 May 2025 06:05:13 +0000 (11:35 +0530)
committerPritha Srivastava <prsrivas@redhat.com>
Fri, 22 Aug 2025 15:02:18 +0000 (20:32 +0530)
last block also with max_chunk_size(object size
or rgw_max_chunk_size) and to perform checks
based on object size.

Signed-off-by: Pritha Srivastava <prsrivas@redhat.com>
src/rgw/driver/d4n/rgw_sal_d4n.cc
src/rgw/driver/d4n/rgw_sal_d4n.h
src/test/rgw/test_d4n_filter.cc

index de759eb60972f9d31c053643f42c97dd6a5fadef..db419142deb2707bc470fa4034bfed87afd7b110 100644 (file)
@@ -1910,6 +1910,11 @@ int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw::
 int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int64_t ofs, int64_t end,
                         RGWGetDataCB* cb, optional_yield y) 
 {
+  //special handling in case object size is zero
+  if (source->get_size() == 0) {
+    return 0;
+  }
+
   const uint64_t window_size = g_conf()->rgw_get_obj_window_size;
   std::string version = source->get_object_version();
   std::string prefix = get_cache_block_prefix(source, version);
@@ -1926,17 +1931,24 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
      chunks using the larger chunk, but all corner cases need to be considered like the last chunk which might be smaller than max_chunk_size
      and also ranged requests where a smaller chunk is overwritten by a larger chunk size != max_chunk_size */
 
-  uint64_t max_chunk_size = g_conf()->rgw_max_chunk_size;
+  uint64_t max_chunk_size = std::min(g_conf()->rgw_max_chunk_size, source->get_size());
   uint64_t start_part_num = 0;
   uint64_t part_num = ofs/max_chunk_size; //part num of ofs wrt start of the object
   uint64_t adjusted_start_ofs = part_num*max_chunk_size; //in case of ranged request, adjust the start offset to the beginning of a chunk/ part
-  uint64_t diff_ofs = ofs - adjusted_start_ofs; //difference between actual offset and adjusted offset
+  uint64_t start_diff_ofs = ofs - adjusted_start_ofs; //difference between actual start offset and adjusted start offset
   off_t len = (end - adjusted_start_ofs) + 1;
   uint64_t num_parts = (len%max_chunk_size) == 0 ? len/max_chunk_size : (len/max_chunk_size) + 1; //calculate num parts based on adjusted offset
+  uint64_t last_part_num = end/max_chunk_size;
+  uint64_t adjusted_end_ofs = std::min(((last_part_num + 1)*max_chunk_size - 1), (source->get_size() - 1)); //align end offset to max_chunk_size boundary in case of ranged request
+  uint64_t end_diff_ofs = adjusted_end_ofs - end; //difference between actual end offset and adjusted end offset
+  uint64_t adjusted_len = (adjusted_end_ofs - adjusted_start_ofs) + 1;
   //len_to_read is the actual length read from a part/ chunk in cache, while part_len is the length of the chunk/ part in cache 
   uint64_t cost = 0, len_to_read = 0, part_len = 0;
 
-  ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << " adjusted_start_offset: " << adjusted_start_ofs << " len: " << len << dendl;
+  ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << " adjusted_start_offset: " << adjusted_start_ofs << dendl;
+  ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << " adjusted_end_ofs: " << adjusted_end_ofs << dendl;
+  ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << " adjusted_len: " << adjusted_len << dendl;
+  ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << " len: " << len << dendl;
 
   if ((params.part_num && !source->is_multipart()) || !params.part_num) {
     aio = rgw::make_throttle(window_size, y);
@@ -1952,18 +1964,18 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
     do {
       uint64_t id = adjusted_start_ofs, read_ofs = 0; //read_ofs is the actual offset to start reading from the current part/ chunk
       if (start_part_num == (num_parts - 1)) {
-        len_to_read = len;
-        part_len = len;
-        cost = len;
+        len_to_read = adjusted_len - end_diff_ofs;
+        part_len = adjusted_len;
+        cost = adjusted_len;
       } else {
         len_to_read = max_chunk_size;
         cost = max_chunk_size;
         part_len = max_chunk_size;
       }
       if (start_part_num == 0) {
-        len_to_read -= diff_ofs;
-        id += diff_ofs;
-        read_ofs = diff_ofs;
+        len_to_read -= start_diff_ofs;
+        id += start_diff_ofs;
+        read_ofs = start_diff_ofs;
       }
 
       block.blockID = adjusted_start_ofs;
@@ -1972,7 +1984,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
       ceph::bufferlist bl;
       std::string oid_in_cache = get_key_in_cache(prefix, std::to_string(adjusted_start_ofs), std::to_string(part_len));
 
-      ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << 
+      ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ <<  " " << __LINE__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << 
       " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
 
       int ret;
@@ -2031,143 +2043,6 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
           // TODO: Retrieve remotely
           // Policy decision: should we cache remote blocks locally?
         }
-      } else if (ret == -ENOENT) { // end - if ((ret = source->driver->get_block_dir()->get
-        block.blockID = adjusted_start_ofs;
-        uint64_t obj_size = source->get_size(), chunk_size = 0;
-        if (obj_size < max_chunk_size) {
-          chunk_size = obj_size;
-        } else {
-          chunk_size = max_chunk_size;
-        }
-        block.size = chunk_size;
-
-        if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0) {
-          auto it = block.cacheObj.hostsList.find(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
-
-          if (it != block.cacheObj.hostsList.end()) { /* Local copy */
-            ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in local cache." << dendl;
-
-            if (block.version == version) {
-              oid_in_cache = get_key_in_cache(prefix, std::to_string(adjusted_start_ofs), std::to_string(chunk_size));
-
-              //for range requests, for last part, the whole part might exist in the cache
-              ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache <<
-                " length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
-
-              if ((part_len != chunk_size) && source->driver->get_policy_driver()->get_cache_policy()->update_refcount_if_key_exists(dpp, oid_in_cache, rgw::d4n::RefCount::INCR, y) > 0) {
-                // Read From Cache
-                ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: oid_in_cache=" << oid_in_cache << dendl;
-
-                auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);
-                this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, chunk_size)));
-                ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
-                auto r = flush(dpp, std::move(completed), y);
-                if (r < 0) {
-                  drain(dpp, y);
-                  ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, ret=" << r << dendl;
-                  return r;
-                }
-              } else { // end - if ((part_len != chunk_size) && update_refcount_if_key_exists
-                int r = -1;
-                if ((r = source->driver->get_block_dir()->remove_host(dpp, &block, dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, y)) < 0)
-                  ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to remove incorrect host from block with oid=" << oid_in_cache << ", ret=" << r << dendl;
-
-                if ((block.cacheObj.hostsList.size() - 1) > 0 && r == 0) { /* Remote copy */
-                  ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl;
-                  // TODO: Retrieve remotely
-                  // Policy decision: should we cache remote blocks locally?
-                } else {
-                  ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
-
-                  auto r = drain(dpp, y);
-                  if (r < 0) {
-                    ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
-                    return r;
-                  }
-                  break;
-                }
-              }
-            } else if (block.cacheObj.hostsList.size()) { /* Remote copy */
-              ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl;
-              // TODO: Retrieve remotely
-              // Policy decision: should we cache remote blocks locally?
-            }
-          } else { // end - if (block.version == version)
-            // TODO: If data has already been returned for any older versioned block, then return ‘retry’ error, else
-            ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
-            auto r = drain(dpp, y);
-            if (r < 0) {
-              ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
-              return r;
-            }
-            break;
-          }
-        } else if (ret == -ENOENT) { // end - if source->driver->get_block_dir()->get(dpp, &block, y))
-          block.blockID = adjusted_start_ofs;
-          uint64_t last_part_size = source->get_size() - adjusted_start_ofs;
-          block.size = last_part_size;
-          if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0) {
-            auto it = block.cacheObj.hostsList.find(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
-            if (it != block.cacheObj.hostsList.end()) { /* Local copy */
-              ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in local cache." << dendl;
-              if (block.version == version) {
-                oid_in_cache = get_key_in_cache(prefix, std::to_string(adjusted_start_ofs), std::to_string(last_part_size));
-                ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache <<
-                  " length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
-                if ((part_len != last_part_size) && source->driver->get_policy_driver()->get_cache_policy()->update_refcount_if_key_exists(dpp, oid_in_cache, rgw::d4n::RefCount::INCR, y) > 0) {
-                  // Read From Cache
-                  ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__  << "(): " << __LINE__ << ": READ FROM CACHE: block dirty =" << block.cacheObj.dirty << dendl;
-                  ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: oid_in_cache=" << oid_in_cache << dendl;
-                  auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);
-                  this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, last_part_size)));
-                  ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
-                  auto r = flush(dpp, std::move(completed), y);
-                  if (r < 0) {
-                    drain(dpp, y);
-                    ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, ret=" << r << dendl;
-                    return r;
-                  }
-                } else { // if get_policy_driver()->get_cache_policy()->update_refcount_if_key_exists
-                  int r = -1;
-                  if ((r = source->driver->get_block_dir()->remove_host(dpp, &block, dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, y)) < 0)
-                    ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to remove incorrect host from block with oid=" << oid_in_cache << ", ret=" << r << dendl;
-                  if ((block.cacheObj.hostsList.size() - 1) > 0 && r == 0) { /* Remote copy */
-                    ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl;
-                    // TODO: Retrieve remotely
-                    // Policy decision: should we cache remote blocks locally?
-                  } else {
-                    ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
-                    auto r = drain(dpp, y);
-                    if (r < 0) {
-                      ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
-                      return r;
-                    }
-                    break;
-                  }
-                }
-              } else {// end - if (block.version == version)
-                //TODO: return retry error
-                ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
-                auto r = drain(dpp, y);
-                if (r < 0) {
-                  ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
-                  return r;
-                }
-                break;
-              }
-            } else if (block.cacheObj.hostsList.size()) {
-              //TODO: get remote copy
-            }
-          } else if (ret == -ENOENT) {
-            ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
-            auto r = drain(dpp, y);
-            if (r < 0) {
-              ldpp_dout(dpp, 10) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
-              return r;
-            }
-            break;
-          }
-        }
       } else { // else if (ret == -ENOENT)
         if (ret < 0)
           ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << block.cacheObj.objName << " blockID: " << block.blockID << " block size: " << block.size << ", ret=" << ret << dendl;
@@ -2188,7 +2063,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
       }
 
       start_part_num += 1;
-      len -= max_chunk_size;
+      adjusted_len -= max_chunk_size;
     } while (start_part_num < num_parts);
   }
   ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Fetching object from backend store" << dendl;
@@ -2198,14 +2073,16 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
     obj_attrs = source->get_attrs();
   }
 
-  this->cb->set_ofs(diff_ofs);
+  this->cb->set_start_ofs(start_diff_ofs);
+  this->cb->set_len(len);
   this->cb->set_adjusted_start_ofs(adjusted_start_ofs);
   this->cb->set_part_num(start_part_num);
+  this->cb->set_num_parts(num_parts);
   ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): adjusted_start_ofs: " << adjusted_start_ofs << " end: " << end << dendl;
-  auto r = next->iterate(dpp, adjusted_start_ofs, end, this->cb.get(), y);
+  auto r = next->iterate(dpp, adjusted_start_ofs, adjusted_end_ofs, this->cb.get(), y);
   //calculate the number of blocks read from backend store, and increment the perfcounter using that
   if(perfcounter) {
-    uint64_t len_to_read_from_store = ((end - adjusted_start_ofs) + 1);
+    uint64_t len_to_read_from_store = ((adjusted_end_ofs - adjusted_start_ofs) + 1);
     uint64_t num_blocks = (len_to_read_from_store%max_chunk_size) == 0 ? len_to_read_from_store/max_chunk_size : (len_to_read_from_store/max_chunk_size) + 1;
     perfcounter->inc(l_rgw_d4n_cache_misses, num_blocks);
   }
@@ -2251,27 +2128,37 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::flush_last_part()
 int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
 {
   auto rgw_max_chunk_size = g_conf()->rgw_max_chunk_size;
-  ldpp_dout(dpp, 20) << __func__ << ": bl_ofs is: " << bl_ofs << " bl_len is: " << bl_len << " ofs is: " << ofs << " part_count: " << part_count << dendl;
+  ldpp_dout(dpp, 20) << __func__ << ": bl_ofs is: " << bl_ofs << " bl_len is: " << bl_len << " part_num: " << part_num << dendl;
+  ldpp_dout(dpp, 20) << __func__ << ": start_ofs is: " << start_ofs << " end_ofs is: " << end_ofs << " part_num: " << part_num << dendl;
   if (!last_part && bl.length() <= rgw_max_chunk_size) {
     if (client_cb) {
       int r = 0;
       //ranged request
-      if (bl_ofs != ofs && part_count == 0) {
-        if (ofs < bl_len) { // this can happen in case of multipart where each chunk returned is not always of size rgw_max_chunk_size
-          off_t bl_part_len = bl_len - ofs;
+      if (bl_ofs != start_ofs && part_num == 0) {
+        if (start_ofs < bl_len) { // this can happen in case of multipart where each chunk returned is not always of size rgw_max_chunk_size
+          off_t bl_part_len = bl_len - start_ofs;
           ldpp_dout(dpp, 20) << __func__ << ": bl_part_len is: " << bl_part_len << dendl;
           bufferlist bl_part;
-          bl.begin(ofs).copy(bl_part_len, bl_part);
+          bl.begin(start_ofs).copy(bl_part_len, bl_part);
           ldpp_dout(dpp, 20) << __func__ << ": bl_part.length() is: " << bl_part.length() << dendl;
           r = client_cb->handle_data(bl_part, 0, bl_part_len);
-          part_count += 1;
+          part_num += 1;
+          len_sent += bl_part_len;
         } else {
-          ofs = ofs - bl_len; //re-adjust the offset
-          ldpp_dout(dpp, 20) << __func__ << ": New value ofs is: " << ofs << dendl;
+          start_ofs = start_ofs - bl_len; //re-adjust the offset
+          ldpp_dout(dpp, 20) << __func__ << ": New value ofs is: " << start_ofs << dendl;
         }
+      } else if (part_num == (num_parts - 1) && (len_sent + bl_len) > len) {
+        uint64_t extra = (len_sent + bl_len) - len;
+        uint64_t len_to_send = bl_len - extra;
+        bufferlist bl_part;
+        bl.begin(bl_ofs).copy(len_to_send, bl_part);
+        ldpp_dout(dpp, 20) << __func__ << ": last part bl_part.length() is: " << bl_part.length() << dendl;
+        r = client_cb->handle_data(bl_part, 0, bl_part.length());
       } else {
         r = client_cb->handle_data(bl, bl_ofs, bl_len);
-        part_count += 1;
+        part_num += 1;
+        len_sent += bl_len;
       }
 
       if (r < 0) {
@@ -2290,6 +2177,8 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
 
     rgw::d4n::CacheBlock block, existing_block, dest_block;
     rgw::d4n::BlockDirectory* blockDir = source->driver->get_block_dir();
+    auto policy = filter->get_policy_driver()->get_cache_policy();
+    auto cache_driver = filter->get_cache_driver();
     block.cacheObj.objName = source->get_key().get_oid();
     block.cacheObj.bucketName = source->get_bucket()->get_bucket_id();
     std::stringstream s;
@@ -2317,16 +2206,16 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
 
     if (bl.length() > 0 && last_part) { // if bl = bl_rem has data and this is the last part, write it to cache
       std::string oid = get_key_in_cache(prefix, std::to_string(adjusted_start_ofs), std::to_string(bl_len));
-      if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) {
+      if (!policy->exist_key(oid)) {
         block.blockID = adjusted_start_ofs;
         block.size = bl.length();
 
-        auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
+        auto ret = policy->eviction(dpp, block.size, *y);
         if (ret == 0) {
-          ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y);
+          ret = cache_driver->put(dpp, oid, bl, bl.length(), attrs, *y);
           if (ret == 0) {
             std::string objEtag = "";
-            filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, adjusted_start_ofs, bl.length(), version, dirty, rgw::d4n::RefCount::NOOP, *y);
+            policy->update(dpp, oid, adjusted_start_ofs, bl.length(), version, dirty, rgw::d4n::RefCount::NOOP, *y);
 
             /* Store block in directory */
             existing_block.blockID = block.blockID;
@@ -2356,11 +2245,11 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
         std::string dest_oid = get_key_in_cache(dest_prefix, std::to_string(adjusted_start_ofs), std::to_string(bl_len));
         dest_block.blockID = adjusted_start_ofs;
         dest_block.size = bl.length();
-        auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, dest_block.size, *y);
+        auto ret = policy->eviction(dpp, dest_block.size, *y);
         if (ret == 0) {
-          ret = filter->get_cache_driver()->put(dpp, dest_oid, bl, bl.length(), attrs, *y);
+          ret = cache_driver->put(dpp, dest_oid, bl, bl.length(), attrs, *y);
           if (ret == 0) {
-            filter->get_policy_driver()->get_cache_policy()->update(dpp, dest_oid, adjusted_start_ofs, bl.length(), dest_version, dirty, rgw::d4n::RefCount::NOOP, *y);
+            policy->update(dpp, dest_oid, adjusted_start_ofs, bl.length(), dest_version, dirty, rgw::d4n::RefCount::NOOP, *y);
             if (ret = blockDir->set(dpp, &dest_block, *y); ret < 0) {
               ldpp_dout(dpp, 20) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB:: " << __func__ << " BlockDirectory set failed with ret: " << ret << dendl;
             }
@@ -2371,12 +2260,12 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
       std::string oid = get_key_in_cache(prefix, std::to_string(adjusted_start_ofs), std::to_string(bl_len));
       block.blockID = adjusted_start_ofs;
       block.size = bl.length();
-      if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) {
-        auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
+      if (!policy->exist_key(oid)) {
+        auto ret = policy->eviction(dpp, block.size, *y);
         if (ret == 0) {
-          ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y);
+          ret = cache_driver->put(dpp, oid, bl, bl.length(), attrs, *y);
           if (ret == 0) {
-            filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, adjusted_start_ofs, bl.length(), version, dirty, rgw::d4n::RefCount::NOOP, *y);
+            policy->update(dpp, oid, adjusted_start_ofs, bl.length(), version, dirty, rgw::d4n::RefCount::NOOP, *y);
 
             /* Store block in directory */
             existing_block.blockID = block.blockID;
@@ -2406,11 +2295,11 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
         std::string dest_oid = get_key_in_cache(dest_prefix, std::to_string(adjusted_start_ofs), std::to_string(bl_len));
         dest_block.blockID = adjusted_start_ofs;
         dest_block.size = bl.length();
-        auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, dest_block.size, *y);
+        auto ret = policy->eviction(dpp, dest_block.size, *y);
         if (ret == 0) {
-          ret = filter->get_cache_driver()->put(dpp, dest_oid, bl, bl.length(), attrs, *y);
+          ret = cache_driver->put(dpp, dest_oid, bl, bl.length(), attrs, *y);
           if (ret == 0) {
-            filter->get_policy_driver()->get_cache_policy()->update(dpp, dest_oid, adjusted_start_ofs, bl.length(), dest_version, dirty, rgw::d4n::RefCount::NOOP, *y);
+            policy->update(dpp, dest_oid, adjusted_start_ofs, bl.length(), dest_version, dirty, rgw::d4n::RefCount::NOOP, *y);
             if (ret = blockDir->set(dpp, &dest_block, *y); ret < 0) {
               ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " BlockDirectory set failed with ret: " << ret << dendl;
             }
@@ -2428,15 +2317,15 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
 
       if (bl_rem.length() == rgw_max_chunk_size) {
         std::string oid = prefix + CACHE_DELIM + std::to_string(adjusted_start_ofs) + CACHE_DELIM + std::to_string(bl_rem.length());
-          if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) {
+          if (!policy->exist_key(oid)) {
           block.blockID = adjusted_start_ofs;
           block.size = bl_rem.length();
           
-          auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
+          auto ret = policy->eviction(dpp, block.size, *y);
           if (ret == 0) {
-            ret = filter->get_cache_driver()->put(dpp, oid, bl_rem, bl_rem.length(), attrs, *y);
+            ret = cache_driver->put(dpp, oid, bl_rem, bl_rem.length(), attrs, *y);
             if (ret == 0) {
-              filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, adjusted_start_ofs, bl_rem.length(), version, dirty, rgw::d4n::RefCount::NOOP, *y);
+              policy->update(dpp, oid, adjusted_start_ofs, bl_rem.length(), version, dirty, rgw::d4n::RefCount::NOOP, *y);
 
               /* Store block in directory */
               existing_block.blockID = block.blockID;
@@ -2469,11 +2358,11 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
           std::string dest_oid = dest_prefix + CACHE_DELIM + std::to_string(adjusted_start_ofs) + CACHE_DELIM + std::to_string(bl_rem.length());
           dest_block.blockID = adjusted_start_ofs;
           dest_block.size = bl_rem.length();
-          auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, dest_block.size, *y);
+          auto ret = policy->eviction(dpp, dest_block.size, *y);
           if (ret == 0) {
-            ret = filter->get_cache_driver()->put(dpp, dest_oid, bl_rem, bl_rem.length(), attrs, *y);
+            ret = cache_driver->put(dpp, dest_oid, bl_rem, bl_rem.length(), attrs, *y);
             if (ret == 0) {
-              filter->get_policy_driver()->get_cache_policy()->update(dpp, dest_oid, adjusted_start_ofs, bl_rem.length(), dest_version, dirty, rgw::d4n::RefCount::NOOP, *y);
+              policy->update(dpp, dest_oid, adjusted_start_ofs, bl_rem.length(), dest_version, dirty, rgw::d4n::RefCount::NOOP, *y);
               if (ret = blockDir->set(dpp, &dest_block, *y); ret < 0) {
                 ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " BlockDirectory set failed with ret: " << ret << dendl;
               }
index 65b4f31733d2d673a58d19fec498f28dac7ba30b..8511e03554de3d522f0cdfac5bc7a7483c70a1ea 100644 (file)
@@ -147,14 +147,16 @@ class D4NFilterObject : public FilterObject {
            D4NFilterDriver* filter;
            D4NFilterObject* source;
            RGWGetDataCB* client_cb;
-           int64_t ofs = 0, len = 0;
+           int64_t start_ofs = 0, len = 0, end_ofs = 0;
       int64_t adjusted_start_ofs{0};
+      int64_t adjusted_end_ofs{0};
            bufferlist bl_rem;
            bool last_part{false};
            bool write_to_cache{true};
            const DoutPrefixProvider* dpp;
            optional_yield* y;
-      int part_count{0};
+      int part_num{0}, num_parts{0};
+      int len_sent = 0;
 
          public:
            D4NFilterGetCB(D4NFilterDriver* _filter, D4NFilterObject* _source) : filter(_filter),
@@ -166,9 +168,11 @@ class D4NFilterObject : public FilterObject {
               this->dpp = dpp;
               this->y = y;
             }
-           void set_ofs(uint64_t ofs) { this->ofs = ofs; }
+           void set_start_ofs(uint64_t ofs) { this->start_ofs = ofs; }
+      void set_len(uint64_t len) { this->len = len; }
       void set_adjusted_start_ofs(uint64_t adjusted_start_ofs) { this->adjusted_start_ofs = adjusted_start_ofs; }
-      void set_part_num(uint64_t part_num) { this->part_count = part_num; }
+      void set_part_num(uint64_t part_num) { this->part_num = part_num; }
+      void set_num_parts(uint64_t num_parts) { this->num_parts = num_parts; }
            int flush_last_part();
            void bypass_cache_write() { this->write_to_cache = false; }
        };
index f43a8f56b202102aa4113814228355ed81bb6fb0..7c9fca4843f693407c8642be1579375ac858fc4c 100755 (executable)
@@ -1537,7 +1537,7 @@ TEST_F(D4NFilterFixture, GetObjectWrite)
     Read_CB cb(&bl);
     std::unique_ptr<rgw::sal::Object::ReadOp> read_op(obj->get_read_op());
     EXPECT_EQ(read_op->prepare(optional_yield{yield}, env->dpp), 0);
-    EXPECT_EQ(read_op->iterate(env->dpp, 0, ofs, &cb, optional_yield{yield}), 0);
+    EXPECT_EQ(read_op->iterate(env->dpp, 0, (ofs-1), &cb, optional_yield{yield}), 0);
     
     boost::system::error_code ec;
     request req;
@@ -2216,7 +2216,7 @@ TEST_F(D4NFilterFixture, GetVersionedObjectWrite)
       Read_CB cb(&bl);
       std::unique_ptr<rgw::sal::Object::ReadOp> read_op_enabled(objEnabled->get_read_op());
       EXPECT_EQ(read_op_enabled->prepare(optional_yield{yield}, env->dpp), 0);
-      EXPECT_EQ(read_op_enabled->iterate(env->dpp, 0, ofs, &cb, optional_yield{yield}), 0);
+      EXPECT_EQ(read_op_enabled->iterate(env->dpp, 0, (ofs - 1), &cb, optional_yield{yield}), 0);
 
       boost::system::error_code ec;
       request req;
@@ -2285,7 +2285,7 @@ TEST_F(D4NFilterFixture, GetVersionedObjectWrite)
       Read_CB cb(&bl);
       std::unique_ptr<rgw::sal::Object::ReadOp> read_op_suspended(objSuspended->get_read_op());
       EXPECT_EQ(read_op_suspended->prepare(optional_yield{yield}, env->dpp), 0);
-      EXPECT_EQ(read_op_suspended->iterate(env->dpp, 0, ofs, &cb, optional_yield{yield}), 0);
+      EXPECT_EQ(read_op_suspended->iterate(env->dpp, 0, (ofs - 1), &cb, optional_yield{yield}), 0);
 
       boost::system::error_code ec;
       request req;
@@ -3189,7 +3189,6 @@ TEST_F(D4NFilterFixture, SimpleDeleteBeforeCleaning)
 
     std::unique_ptr<rgw::sal::Object::ReadOp> read_op(objEnabled->get_read_op());
     EXPECT_EQ(read_op->prepare(optional_yield{yield}, env->dpp), -2); // Simple read; should return -ENOENT
-    EXPECT_EQ(read_op->iterate(env->dpp, 0, ofs, nullptr, optional_yield{yield}), -2);
 
     /* TODO: 
     std::string attr_val;
@@ -3266,7 +3265,7 @@ TEST_F(D4NFilterFixture, VersionedDeleteBeforeCleaning)
 
     std::unique_ptr<rgw::sal::Object::ReadOp> read_op(objEnabled->get_read_op());
     EXPECT_EQ(read_op->prepare(optional_yield{yield}, env->dpp), 0);
-    EXPECT_EQ(read_op->iterate(env->dpp, 0, ofs, nullptr, optional_yield{yield}), 0);
+    EXPECT_EQ(read_op->iterate(env->dpp, 0, (ofs - 1), nullptr, optional_yield{yield}), 0);
     EXPECT_EQ(objEnabled->get_instance(), instances[0]); // Next latest version
 
     objEnabled->set_instance(instances[0]);
@@ -3304,7 +3303,7 @@ TEST_F(D4NFilterFixture, VersionedDeleteBeforeCleaning)
       next->set_instance("");
       std::unique_ptr<rgw::sal::Object::ReadOp> read_op(next->get_read_op());
       EXPECT_EQ(read_op->prepare(optional_yield{yield}, env->dpp), 0);
-      EXPECT_EQ(read_op->iterate(env->dpp, 0, ofs, &cb, optional_yield{yield}), 0);
+      EXPECT_EQ(read_op->iterate(env->dpp, 0, (ofs - 1), &cb, optional_yield{yield}), 0);
       EXPECT_EQ(next->get_instance(), instances[0]); // Next latest version
     }