]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw/d4n: squashing the following commits that fix s3 test
authorPritha Srivastava <prsrivas@redhat.com>
Mon, 10 Jun 2024 07:02:42 +0000 (12:32 +0530)
committerPritha Srivastava <prsrivas@redhat.com>
Mon, 21 Apr 2025 04:04:07 +0000 (09:34 +0530)
failures.

1. rgw/d4n: fixing logical errors for ranged requests in iterate(), aligning
start chunk/block to beginning of the chunk/block boundary.
2. cache head object in write path in case write-cache
is disabled - for maintaining cache consistensy
3. delete dirty blocks of older versions of a non-versioned
bucket as dirty blocks are not evicted, and older versions won't
be written to the disk also.
4. remove dirty objects from the dirty objects data structure,
in case the bucket it belongs to has been deleted.
5. rgw/d4n: fixing compilation issue after rebasing with main.
6. Handle multipart objects correctly in copy object
7. Fix more ranged request errors.
8. handling part_num correctly for multipart and non-multipart objects
9. handling obect name starting with '_'.
10. bug in range request for sending back data to the client, after reading from
backend store.
11. rgw/d4n: code to populate RGW_ATTR_CRYPT_PARTS attr
in d4n filter.
12. rgw/d4n: fix for 'if-match' and 'if-nomatch' for
put object and copy object.
13. rgw/cache: Account for slashes in object name
14. d4n/filter: Handle request headers during read if `next->prepare` isn't called
15. d4n/filter: Fix overwriting of object size and instance
16. rgw/d4n: Supply attrs to `calculate_version`
17. rgw/d4n: Handle ACL and etag in copy_object
18. rgw/filter: Ensure cksum values are maintained
19. rgw/d4n: Set parts_count correctly and check for bufferlist length in calculate_version
20. rgw/d4n: adding a case for iterate method, where last
part to read is smaller than chunk size.

Signed-off-by: Pritha Srivastava <prsrivas@redhat.com>
Signed-off-by: Samarah <samarah.uriarte@ibm.com>
src/rgw/driver/d4n/d4n_policy.cc
src/rgw/driver/d4n/d4n_policy.h
src/rgw/driver/d4n/rgw_sal_d4n.cc
src/rgw/driver/d4n/rgw_sal_d4n.h
src/rgw/rgw_op.cc
src/rgw/rgw_sal_filter.cc
src/rgw/rgw_ssd_driver.cc

index 686f94c6aa0eb5d51a2fff95cb670a04c3345d60..81d879ce56c854f4354c86315c25958d58e4c414 100644 (file)
@@ -53,8 +53,10 @@ int LFUDAPolicy::init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_
   response<int, int, int, int> resp;
 
   driver = _driver;
-  tc = std::thread(&CachePolicy::cleaning, this, dpp);
-  tc.detach();
+  if (dpp->get_cct()->_conf->d4n_writecache_enabled) {
+    tc = std::thread(&CachePolicy::cleaning, this, dpp);
+    tc.detach();
+  }
 
   try {
     boost::system::error_code ec;
@@ -448,26 +450,10 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
     }
     ldpp_dout(dpp, 10) <<__LINE__ << " " << __func__ << "(): e->key=" << e->key << dendl;
     ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->dirty=" << e->dirty << dendl;
+    ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->version=" << e->version << dendl;
     l.unlock();
     if (!e->key.empty() && (e->dirty == true) && (std::difftime(time(NULL), e->creationTime) > interval)) { //if block is dirty and written more than interval seconds ago
-      name = e->key;
       rgw_user c_rgw_user = e->user;
-
-      size_t pos = 0;
-      std::string delimiter = "_";
-      int count = 0;
-      while ((pos = name.find(delimiter)) != std::string::npos) {
-        if (count == 0) {
-          b_name = name.substr(0, pos);
-          ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): b_name=" << b_name << dendl;
-              name.erase(0, pos + delimiter.length());
-          ldpp_dout(dpp, 10) << __LINE__ <<  " " << __func__ << "(): name=" << name << dendl;
-          break;
-        }
-        count++;
-        ldpp_dout(dpp, 10) << __LINE__ <<  " " << __func__ << "(): count=" << b_name << dendl;
-      }
-      key = name;
       //writing data to the backend
       //we need to create an atomic_writer
       std::unique_ptr<rgw::sal::User> c_user = driver->get_user(c_rgw_user);
@@ -481,7 +467,10 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
       int ret = driver->load_bucket(dpp, c_rgw_bucket, &c_bucket, null_yield);
       if (ret < 0) {
         ldpp_dout(dpp, 10) << __func__ << "(): load_bucket() returned ret=" << ret << dendl;
-        break;
+        //delete the object from the cache and update all internal metadata?
+        //TODO: clean up all dirty blocks from cache and update metadata
+        eraseObj(dpp, e->key, null_yield);
+        continue;
       }
 
       std::unique_ptr<rgw::sal::Object> c_obj = c_bucket->get_object(e->obj_key);
@@ -500,10 +489,10 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
       int op_ret = processor->prepare(null_yield);
       if (op_ret < 0) {
        ldpp_dout(dpp, 20) << __func__ << "processor->prepare() returned ret=" << op_ret << dendl;
-       break;
+        eraseObj(dpp, e->key, null_yield);
       }
 
-      std::string prefix = b_name + "_" + e->version + "_" + c_obj->get_name();
+      std::string prefix = e->bucket_name + "_" + e->version + "_" + c_obj->get_name();
       off_t lst = e->size;
       off_t fst = 0;
       off_t ofs = 0;
@@ -543,7 +532,7 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
         if (op_ret < 0) {
          ldpp_dout(dpp, 20) << __func__ << "processor->process() returned ret="
          << op_ret << dendl;
-         return;
+          eraseObj(dpp, e->key, null_yield);
         }
 
         ofs += len;
@@ -558,6 +547,10 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
                               nullptr, nullptr, nullptr,
                               rctx, rgw::sal::FLAG_LOG_OP);
 
+      if (op_ret < 0) {
+        ldpp_dout(dpp, 20) << __func__ << "processor->complete() returned ret=" << op_ret << dendl;
+        eraseObj(dpp, e->key, null_yield);
+      }
       //invoke update() with dirty flag set to false, to update in-memory metadata for each block
       // reset values
       lst = e->size;
@@ -586,7 +579,6 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
         op_ret = blockDir->update_field(dpp, &block, "dirty", dirty, null_yield);
         if (op_ret < 0) {
          ldpp_dout(dpp, 0) << __func__ << "updating dirty flag in block directory failed, ret=" << op_ret << dendl;
-         return;
         }
         fst += cur_len;
       } while(fst < lst);
@@ -609,7 +601,6 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
           op_ret = blockDir->update_field(dpp, &block, "dirty", dirty, null_yield);
           if (op_ret < 0) {
               ldpp_dout(dpp, 20) << __func__ << "updating dirty flag in block directory for head failed!" << dendl;
-              //return;
           }
         }
       } else { //non-versioned case
@@ -617,7 +608,6 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
         op_ret = blockDir->update_field(dpp, &block, "dirty", dirty, null_yield);
         if (op_ret < 0) {
             ldpp_dout(dpp, 20) << __func__ << "updating dirty flag in block directory for head failed!" << dendl;
-            //return;
         }
       }
       //In case of distributed cache, we may have to update this for every instance
@@ -630,14 +620,12 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
         op_ret = blockDir->update_field(dpp, &instance_block, "dirty", "false", null_yield);
         if (op_ret < 0) {
             ldpp_dout(dpp, 20) << __func__ << "updating dirty flag in block directory for instance block failed!" << dendl;
-            //return;
         }
       }
 
       op_ret = blockDir->update_field(dpp, &block, "dirty", "false", null_yield);
       if (op_ret < 0) {
          ldpp_dout(dpp, 0) << __func__ << "updating dirty flag in block directory for head failed, ret=" << op_ret << dendl;
-         return;
       }
 
       //remove entry from map and queue, eraseObj locks correctly
index 9ff07b05e2c1fdf9490cc170e8bf8c9e09d3bc9c..88a7578f43b675c1172d26197fafaf8e5b7d3243 100644 (file)
 #include "rgw_sal_d4n.h"
 #include "rgw_cache_driver.h"
 
-namespace rgw::sal {
-  class D4NFilterObject;
-}
-
 namespace rgw { namespace d4n {
 
 namespace asio = boost::asio;
index a09a3a8cb29c81b38eaf84f5d0ddccae4c37d284..5ad6b0c3a880e1389399f3d4f54f0ddbbbf1e535 100644 (file)
@@ -115,6 +115,17 @@ int D4NFilterBucket::create(const DoutPrefixProvider* dpp,
   return next->create(dpp, params, y);
 }
 
+std::unique_ptr<MultipartUpload> D4NFilterBucket::get_multipart_upload(
+                                 const std::string& oid,
+                                 std::optional<std::string> upload_id,
+                                 ACLOwner owner, ceph::real_time mtime)
+{
+  std::unique_ptr<MultipartUpload> nmu =
+    next->get_multipart_upload(oid, upload_id, owner, mtime);
+
+  return std::make_unique<D4NFilterMultipartUpload>(std::move(nmu), this, this->filter);
+}
+
 int D4NFilterObject::copy_object(const ACLOwner& owner,
                               const rgw_user& remote_user,
                               req_info* info,
@@ -144,57 +155,99 @@ int D4NFilterObject::copy_object(const ACLOwner& owner,
                               const DoutPrefixProvider* dpp,
                               optional_yield y)
 {
-  if (g_conf()->d4n_writecache_enabled) {
-    this->dest_object = dest_object;
-    this->dest_bucket = dest_bucket;
+  bool write_to_cache = g_conf()->d4n_writecache_enabled;
+  bool dirty{false};
+  std::unique_ptr<rgw::sal::Object::ReadOp> read_op(this->get_read_op());
+  read_op->params.mod_ptr = mod_ptr;
+  read_op->params.unmod_ptr = unmod_ptr;
+  read_op->params.high_precision_time = high_precision_time;
+  read_op->params.if_match = if_match;
+  read_op->params.if_nomatch = if_nomatch;
+  if (auto ret = read_op->prepare(y, dpp); ret < 0) {
+    ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): prepare method failed with ret: " << ret << dendl;
+    if (ret == -ERR_NOT_MODIFIED) {
+      ret = ERR_PRECONDITION_FAILED;
+    }
+    return ret;
+  }
+
+  ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): is_multipart: " << is_multipart() << dendl;
+  //for multipart objects or for read only cache, write to backend store
+  if (is_multipart() || !write_to_cache) {
+    write_to_cache = false;
+    auto ret = next->copy_object(owner, remote_user, info, source_zone,
+                           nextObject(dest_object),
+                           nextBucket(dest_bucket),
+                           nextBucket(src_bucket),
+                           dest_placement, src_mtime, mtime,
+                           mod_ptr, unmod_ptr, high_precision_time, if_match,
+                           if_nomatch, attrs_mod, copy_if_newer, attrs,
+                           category, olh_epoch, delete_at, version_id, tag,
+                           etag, progress_cb, progress_data, dpp, y);
+    if (ret < 0) {
+      ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): next->copy_object failed with ret: " << ret << dendl;
+      return ret;
+    }
+  }
+
+  this->dest_object = dest_object;
+  this->dest_bucket = dest_bucket;
+  D4NFilterObject* d4n_dest_object = dynamic_cast<D4NFilterObject*>(dest_object);
+
+  rgw::sal::Attrs baseAttrs;
+  //ATTRSMOD_NONE - the attributes of the source object will be copied without modifications, attrs parameter is ignored
+  if (attrs_mod == rgw::sal::ATTRSMOD_NONE) {
+    baseAttrs = this->get_attrs();
+    baseAttrs.erase("user.rgw.version_id"); //delete source version id
+    if (version_id) {
+      bufferlist bl_val;
+      bl_val.append(*version_id);
+      baseAttrs["user.rgw.version_id"] = std::move(bl_val); //populate destination version id
+    }
+  }
 
+  //ATTRSMOD_MERGE - any conflicting meta keys on the source object's attributes are overwritten by values contained in attrs parameter.
+  if (attrs_mod == rgw::sal::ATTRSMOD_MERGE) { /* Merge */
+    rgw::sal::Attrs::iterator iter;
+
+    for (const auto& pair : attrs) {
+      iter = baseAttrs.find(pair.first);
+
+      if (iter != baseAttrs.end()) {
+        iter->second = pair.second;
+      } else {
+        baseAttrs.insert({pair.first, pair.second});
+      }
+    }
+  } else if (attrs_mod == rgw::sal::ATTRSMOD_REPLACE) { /* Replace */
+    //ATTRSMOD_REPLACE - new object will have the attributes provided by attrs parameter, source object attributes are not copied;
+    baseAttrs.insert(attrs.begin(), attrs.end());
+  }
+
+
+  time_t creationTime = -1;
+  std::string dest_version;
+  if (write_to_cache) {
+    dirty = true;
     if (!dest_object->have_instance()) {
       if (dest_object->get_bucket()->versioned() && !dest_object->get_bucket()->versioning_enabled()) { //if versioning is suspended
-        this->dest_version = "null";
+        dest_version = "null";
       } else {
         constexpr uint32_t OBJ_INSTANCE_LEN = 32;
         char buf[OBJ_INSTANCE_LEN + 1];
         gen_rand_alphanumeric_no_underscore(dpp->get_cct(), buf, OBJ_INSTANCE_LEN);
-        this->dest_version = buf; //version for non-versioned objects, using gen_rand_alphanumeric_no_underscore for the time being
+        dest_version = buf; //version for non-versioned objects, using gen_rand_alphanumeric_no_underscore for the time being
         ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): generating version: " << version << dendl;
       }
     } else {
-      this->dest_version = dest_object->get_instance();
-    }
-
-    std::unique_ptr<rgw::sal::Object::ReadOp> read_op(this->get_read_op());
-    if (auto ret = read_op->prepare(y, dpp); ret < 0) {
-      ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): prepare method failed with ret: " << ret << dendl;
-      return ret;
+      dest_version = dest_object->get_instance();
     }
+    d4n_dest_object->set_object_version(dest_version);
     if (auto ret = read_op->iterate(dpp, 0, (this->get_size() - 1), nullptr, y); ret < 0) {
       ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): iterate method failed with ret: " << ret << dendl;
       return ret;
     }
 
-    rgw::sal::Attrs baseAttrs;
-    if (attrs_mod == rgw::sal::ATTRSMOD_NONE) {
-      baseAttrs = attrs;
-    } else {
-      baseAttrs = this->get_attrs();
-    }
-
-    if (attrs_mod == rgw::sal::ATTRSMOD_REPLACE) { /* Replace */
-      rgw::sal::Attrs::iterator iter;
-
-      for (const auto& pair : attrs) {
-        iter = baseAttrs.find(pair.first);
-      
-        if (iter != baseAttrs.end()) {
-          iter->second = pair.second;
-        } else {
-          baseAttrs.insert({pair.first, pair.second});
-        }
-      }
-    } else if (attrs_mod == rgw::sal::ATTRSMOD_MERGE) { /* Merge */
-      baseAttrs.insert(attrs.begin(), attrs.end()); 
-    }
-  
     ceph::real_time dest_mtime;
     if (mtime) {
       if (real_clock::is_zero(*mtime)) {
@@ -204,40 +257,68 @@ int D4NFilterObject::copy_object(const ACLOwner& owner,
     } else {
       dest_mtime = real_clock::now();
     }
+    creationTime = ceph::real_clock::to_time_t(dest_mtime);
     dest_object->set_mtime(dest_mtime);
     dest_object->set_obj_size(this->get_size());
-    dest_object->set_accounted_size(this->get_size());
+    dest_object->set_accounted_size(this->get_accounted_size());
     ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " size is: " << dest_object->get_size() << dendl;
-    D4NFilterObject* d4n_dest_object = dynamic_cast<D4NFilterObject*>(dest_object);
-    d4n_dest_object->set_obj_state_attrs(dpp, y, baseAttrs);
-    bufferlist bl_data;
-    std::string key = dest_bucket->get_name() + "_" + this->dest_version + "_" + dest_object->get_name();
-    std::string head_oid_in_cache = "D_" + key; //same as key, as there is no len or offset attached to head oid in cache
-    auto ret = driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl_data, 0, baseAttrs, y);
+    d4n_dest_object->set_attrs_from_obj_state(dpp, y, baseAttrs);
+  } else {
+    auto o_attrs = baseAttrs; 
+    dest_object->load_obj_state(dpp, y);
+    baseAttrs = dest_object->get_attrs();
+    d4n_dest_object->set_attrs_from_obj_state(dpp, y, baseAttrs);
+    d4n_dest_object->calculate_version(dpp, y, dest_version, o_attrs);
+    if (dest_version.empty()) {
+      ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): version could not be calculated." << dendl;
+    }
+  }
+  bufferlist bl_val;
+  bl_val.append(std::to_string(this->is_multipart()));
+  baseAttrs["user.rgw.multipart"] = std::move(bl_val);
+  bl_val.append(*etag);
+  baseAttrs[RGW_ATTR_ETAG] = std::move(bl_val);
+  baseAttrs[RGW_ATTR_ACL] = std::move(attrs[RGW_ATTR_ACL]);
+
+  bufferlist bl_data;
+  dest_version = d4n_dest_object->get_object_version();
+
+  std::string key = dest_bucket->get_name() + "_" + dest_version + "_" + dest_object->get_name();
+  std::string head_oid_in_cache;
+  if (dirty) {
+    head_oid_in_cache = "D_" + key; //same as key, as there is no len or offset attached to head oid in cache
+  } else {
+    head_oid_in_cache = key;
+  }
+  auto ret = driver->get_policy_driver()->get_cache_policy()->eviction(dpp, baseAttrs.size(), y);
+  if (ret == 0) {
+    ret = driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl_data, 0, baseAttrs, y);
     baseAttrs.erase("user.rgw.mtime");
     baseAttrs.erase("user.rgw.object_size");
     baseAttrs.erase("user.rgw.accounted_size");
     baseAttrs.erase("user.rgw.epoch");
+    baseAttrs.erase("user.rgw.multipart");
     if (ret == 0) {
-      time_t creationTime = ceph::real_clock::to_time_t(dest_mtime);
-      dest_object->set_attrs(baseAttrs);
-      ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << this->dest_version << dendl;
+      ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << dest_version << dendl;
       bufferlist bl;
-      driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, bl.length(), this->dest_version, true, y);
-      d4n_dest_object->set_object_version(this->dest_version);
-      ret = d4n_dest_object->set_head_obj_dir_entry(dpp, y, true, true);
+      driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, bl.length(), dest_version, dirty, y);
+      d4n_dest_object->set_object_version(dest_version);
+      ret = d4n_dest_object->set_head_obj_dir_entry(dpp, y, true, dirty);
       if (ret < 0) {
         ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
         return ret;
       }
-      driver->get_policy_driver()->get_cache_policy()->updateObj(dpp, key, this->dest_version, true, dest_object->get_accounted_size(), creationTime, std::get<rgw_user>(dest_object->get_bucket()->get_owner()), *etag, dest_object->get_bucket()->get_name(), dest_object->get_key(), y);
+      if (dirty) {
+        std::string object_key = dest_object->get_bucket()->get_name() + "_" + dest_object->get_oid();
+        driver->get_policy_driver()->get_cache_policy()->updateObj(dpp, object_key, dest_version, true, this->get_size(), creationTime, std::get<rgw_user>(dest_object->get_bucket()->get_owner()), *etag, dest_object->get_bucket()->get_name(), dest_object->get_key(), y);
+      }
 
       //write object to directory.
       rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
           .objName = dest_object->get_oid(),
           .bucketName = dest_object->get_bucket()->get_name(),
           .creationTime = std::to_string(creationTime),
-          .dirty = true,
+          .dirty = dirty,
           .hostsList = { dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address }
       };
       ret = driver->get_obj_dir()->set(dpp, &object, y);
@@ -246,21 +327,8 @@ int D4NFilterObject::copy_object(const ACLOwner& owner,
         return ret;
       }
     }
-  } else {
-    auto ret = next->copy_object(owner, remote_user, info, source_zone,
-                           nextObject(dest_object),
-                           nextBucket(dest_bucket),
-                           nextBucket(src_bucket),
-                           dest_placement, src_mtime, mtime,
-                           mod_ptr, unmod_ptr, high_precision_time, if_match,
-                           if_nomatch, attrs_mod, copy_if_newer, attrs,
-                           category, olh_epoch, delete_at, version_id, tag,
-                           etag, progress_cb, progress_data, dpp, y);
-    if (ret < 0) {
-      ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): next->copy_object failed with ret: " << ret << dendl;
-      return ret;
-    }
   }
+
   return 0;
 }
 
@@ -336,7 +404,7 @@ int D4NFilterObject::get_obj_attrs_from_cache(const DoutPrefixProvider* dpp, opt
       if (attr.second.length() > 0) {
         if (attr.first == "user.rgw.mtime") {
           ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting mtime." << dendl;
-          auto mtime = ceph::real_clock::from_double(std::stod(attr.second.c_str()));
+          auto mtime = ceph::real_clock::from_double(std::stod(attr.second.to_str()));
           this->set_mtime(mtime);
         } else if (attr.first == "user.rgw.object_size") {
           auto size = std::stoull(attr.second.to_str());
@@ -347,15 +415,19 @@ int D4NFilterObject::get_obj_attrs_from_cache(const DoutPrefixProvider* dpp, opt
           this->set_accounted_size(accounted_size);
         } else if (attr.first == "user.rgw.epoch") {
           ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting epoch." << dendl;
-          auto epoch = std::stoull(attr.second.c_str());
+          auto epoch = std::stoull(attr.second.to_str());
           this->set_epoch(epoch);
         } else if (attr.first == "user.rgw.version_id") {
           instance = attr.second.to_str();
           ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting version_id to: " << instance << dendl;
         } else if (attr.first == "user.rgw.source_zone") {
           ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting source zone id." << dendl;
-          auto short_zone_id = static_cast<uint32_t>(std::stoul(attr.second.c_str()));
+          auto short_zone_id = static_cast<uint32_t>(std::stoul(attr.second.to_str()));
           this->set_short_zone_id(short_zone_id);
+        } else if (attr.first == "user.rgw.multipart") {
+          std::string multipart = attr.second.to_str();
+          this->multipart = (multipart == "1") ? true : false;
+          ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): is_multipart: " << this->multipart << " multipart: " << multipart << dendl;
         } else {
           ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): Unexpected attribute; not locally set, attr name: " << attr.first << dendl;
         }
@@ -366,6 +438,7 @@ int D4NFilterObject::get_obj_attrs_from_cache(const DoutPrefixProvider* dpp, opt
     attrs.erase("user.rgw.object_size");
     attrs.erase("user.rgw.accounted_size");
     attrs.erase("user.rgw.epoch");
+    attrs.erase("user.rgw.multipart");
     /* Set attributes locally */
     auto ret = this->set_attrs(attrs);
     if (ret < 0) {
@@ -376,7 +449,34 @@ int D4NFilterObject::get_obj_attrs_from_cache(const DoutPrefixProvider* dpp, opt
   return found_in_cache;
 }
 
-void D4NFilterObject::set_obj_state_attrs(const DoutPrefixProvider* dpp, optional_yield y, rgw::sal::Attrs& attrs)
+int D4NFilterObject::set_attr_crypt_parts(const DoutPrefixProvider* dpp, optional_yield y, rgw::sal::Attrs& attrs)
+{
+  if (attrs.count(RGW_ATTR_CRYPT_MODE)) {
+    std::vector<size_t> parts_len;
+    uint64_t obj_size = this->get_size();
+    uint64_t obj_max_req_size = dpp->get_cct()->_conf->rgw_get_obj_max_req_size;
+    uint64_t num_parts = (obj_size%obj_max_req_size) == 0 ? obj_size/obj_max_req_size : (obj_size/obj_max_req_size) + 1;
+    size_t remainder_size = obj_size;
+    for (uint64_t part = 0; part < num_parts; part++) {
+      size_t part_len;
+      if (part == (num_parts - 1)) { //last part
+        part_len = remainder_size;
+      } else {
+        part_len = obj_max_req_size;
+      }
+      ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): part_num: " << part << " part_len: " << part_len << dendl;
+      parts_len.emplace_back(part_len);
+      remainder_size -= part_len;
+    }
+
+    bufferlist parts_bl;
+    ceph::encode(parts_len, parts_bl);
+    attrs[RGW_ATTR_CRYPT_PARTS] = std::move(parts_bl);
+  }
+  return 0;
+}
+
+void D4NFilterObject::set_attrs_from_obj_state(const DoutPrefixProvider* dpp, optional_yield y, rgw::sal::Attrs& attrs)
 {
   bufferlist bl_val;
   bl_val.append(std::to_string(this->get_size()));
@@ -402,20 +502,18 @@ void D4NFilterObject::set_obj_state_attrs(const DoutPrefixProvider* dpp, optiona
   return;
 }
 
-int D4NFilterObject::calculate_version(const DoutPrefixProvider* dpp, optional_yield y, std::string& version)
+int D4NFilterObject::calculate_version(const DoutPrefixProvider* dpp, optional_yield y, std::string& version, rgw::sal::Attrs& attrs)
 {
   //versioned objects have instance set to versionId, and get_oid() returns oid containing instance, hence using id tag as version for non versioned objects only
   if (! this->have_instance() && version.empty()) {
-    bufferlist bl;
-     if (this->get_attr(RGW_ATTR_ID_TAG, bl)) {
+    bufferlist bl = attrs[RGW_ATTR_ID_TAG];
+    if (bl.length()) {
       version = bl.c_str();
-      ldpp_dout(dpp, 20) << __func__ << " id tag version is: " << version << dendl;
-    } else {
-      ldpp_dout(dpp, 0) << __func__ << " Failed to find id tag" << dendl;
-      return -ENOENT;
+      if (!version.empty()) {
+       ldpp_dout(dpp, 20) << __func__ << " id tag version is: " << version << dendl;
+      }
     }
   }
-  bufferlist bl;
   if (this->have_instance()) {
     version = this->get_instance();
   }
@@ -432,8 +530,14 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optio
   int ret = -1;
   rgw::d4n::BlockDirectory* blockDir = this->driver->get_block_dir();
   if (is_latest_version) {
+    std::string objName = this->get_name();
+    // special handling for name starting with '_'
+    if (objName[0] == '_') {
+      objName = "_" + this->get_name();
+    }
+    ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): objName after special Handling: " << objName << dendl;
     rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
-      .objName = this->get_name(),
+      .objName = objName,
       .bucketName = this->get_bucket()->get_name(),
       .dirty = dirty,
       .hostsList = { dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address },
@@ -511,8 +615,98 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optio
   return ret;
 }
 
+int D4NFilterObject::set_data_block_dir_entries(const DoutPrefixProvider* dpp, optional_yield y, std::string& version, bool dirty)
+{
+  rgw::d4n::BlockDirectory* blockDir = driver->get_block_dir();
+
+  //update data block entries in directory
+  off_t lst = this->get_size();
+  ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Object size =" << lst << dendl;
+  off_t fst = 0;
+  do {
+    rgw::d4n::CacheBlock block, existing_block;
+    if (fst >= lst){
+      break;
+    }
+    off_t cur_size = std::min<off_t>(fst + dpp->get_cct()->_conf->rgw_max_chunk_size, lst);
+    off_t cur_len = cur_size - fst;
+    block.cacheObj.bucketName = this->get_bucket()->get_name();
+    block.cacheObj.objName = this->get_key().get_oid();
+    block.cacheObj.dirty = dirty;
+    block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
+    existing_block.cacheObj.objName = block.cacheObj.objName;
+    existing_block.cacheObj.bucketName = block.cacheObj.bucketName;
+
+    block.size = cur_len;
+    block.blockID = fst;
+    block.version = version;
+
+    /* Store block in directory */
+    existing_block.blockID = block.blockID;
+    existing_block.size = block.size;
+
+    int ret;
+    if ((ret = blockDir->get(dpp, &existing_block, y)) == 0 || ret == -ENOENT) {
+      if (ret == 0) { //new versioned block will have new version, hostsList etc, how about globalWeight?
+        block = existing_block;
+        block.version = version;
+      }
+
+      block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
+
+      if ((ret = blockDir->set(dpp, &block, y)) < 0) {
+        ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory set() method failed, ret=" << ret << dendl;
+        return ret;
+      }
+    } else {
+      ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << ", ret=" << ret << dendl;
+      return ret;
+    }
+    fst += cur_len;
+  } while(fst < lst);
+
+  return 0;
+}
+
+int D4NFilterObject::delete_data_block_cache_entries(const DoutPrefixProvider* dpp, optional_yield y, std::string& version, bool dirty)
+{
+  //delete cache entries
+  off_t lst = this->get_size();
+  ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Object size =" << lst << dendl;
+  off_t fst = 0;
+  do {
+    if (fst >= lst){
+      break;
+    }
+    off_t cur_size = std::min<off_t>(fst + dpp->get_cct()->_conf->rgw_max_chunk_size, lst);
+    off_t cur_len = cur_size - fst;
+
+    std::string key = get_bucket()->get_name() + "_" + version + "_" + get_name() + "_" + std::to_string(fst) + "_" + std::to_string(cur_len);
+    std::string key_in_cache;
+    if (dirty) {
+      key_in_cache = "D_" + key;
+    } else {
+      key_in_cache = key;
+    }
+    int ret;
+    if ((ret = driver->get_cache_driver()->delete_data(dpp, key_in_cache, y)) == 0) {
+           if (!(ret = driver->get_policy_driver()->get_cache_policy()->erase(dpp, key, y))) {
+             ldpp_dout(dpp, 0) << "Failed to delete policy entry for: " << key << ", ret=" << ret << dendl;
+             return ret;
+           }
+         } else {
+      ldpp_dout(dpp, 0) << "Failed to delete cache entry for: " << key_in_cache << ", ret=" << ret << dendl;
+           return ret;
+    }
+    fst += cur_len;
+  } while(fst < lst);
+
+  return 0;
+}
+
 bool D4NFilterObject::check_head_exists_in_cache_get_oid(const DoutPrefixProvider* dpp, std::string& head_oid_in_cache, rgw::sal::Attrs& attrs, rgw::d4n::CacheBlock& blk, optional_yield y)
 {
+  ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): this->get_oid(): " << this->get_oid() << dendl;
   rgw::d4n::BlockDirectory* blockDir = this->driver->get_block_dir();
   rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
         .objName = this->get_oid(), //version-enabled buckets will not have version for latest version, so this will work even when version is not provided in input
@@ -600,13 +794,13 @@ int D4NFilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* d
     }
     ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): this->obj oid is: " << this->obj.key.name << "instance is: " << this->obj.key.instance << dendl;
     attrs = this->get_attrs();
-    this->set_obj_state_attrs(dpp, y, attrs);
+    this->set_attrs_from_obj_state(dpp, y, attrs);
 
-    ret = calculate_version(dpp, y, version);
-    if (ret < 0 || version.empty()) {
+    calculate_version(dpp, y, version, attrs);
+    if (version.empty()) {
       ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): version could not be calculated." << dendl;
     }
-
+    std::string objName = this->get_name();
     head_oid_in_cache = this->get_bucket()->get_name() + "_" + version + "_" + this->get_name();
     if (this->driver->get_policy_driver()->get_cache_policy()->exist_key(head_oid_in_cache) > 0) {
       ret = this->driver->get_cache_driver()->set_attrs(dpp, head_oid_in_cache, attrs, y);
@@ -776,22 +970,18 @@ int D4NFilterObject::D4NFilterReadOp::prepare(optional_yield y, const DoutPrefix
       ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): next->prepare method failed, ret=" << ret << dendl;
       return ret;
     }
-    if (params.part_num) {
-      params.parts_count = next->params.parts_count;
-      if (params.parts_count > 1) {
-        ldpp_dout(dpp, 20) << __func__ << "params.part_count: " << params.parts_count << dendl;
-        return 0; // d4n wont handle multipart read requests with part number for now
-      }
-    }
+
+    params.parts_count = next->params.parts_count;
     this->source->load_obj_state(dpp, y);
     attrs = source->get_attrs();
-    source->set_obj_state_attrs(dpp, y, attrs);
-
-    ret = source->calculate_version(dpp, y, version);
-    if (ret < 0 || version.empty()) {
+    source->set_attrs_from_obj_state(dpp, y, attrs);
+    source->calculate_version(dpp, y, version, attrs);
+    if (version.empty()) {
       ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): version could not be calculated." << dendl;
     }
 
+    this->source->set_attr_crypt_parts(dpp, y, attrs);
+
     bufferlist bl;
     head_oid_in_cache = source->get_bucket()->get_name() + "_" + version + "_" + source->get_name();
     ret = source->driver->get_policy_driver()->get_cache_policy()->eviction(dpp, attrs.size(), y);
@@ -824,6 +1014,73 @@ int D4NFilterObject::D4NFilterReadOp::prepare(optional_yield y, const DoutPrefix
       ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): failed to cache head object during eviction, ret=" << ret << dendl;
     }
   } else {
+    /* 
+      The following if statement handles the following:
+      1. When part_num is given: if it is anything other than 1 and if source is not multipart, then return error
+      2. When part_num is 0 and source is multipart
+      In both the cases the head is fetched from the backend store.
+    */
+    if (params.part_num || (!params.part_num && source->is_multipart())) {
+      ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): source->is_multipart()= " << source->is_multipart() << dendl;
+      if (params.part_num) { 
+       ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): *(params.part_num)= " << *(params.part_num) << dendl;
+      }
+      if (!source->is_multipart()) {
+        if (params.part_num && *(params.part_num) != 1) {
+          return -ERR_INVALID_PART;
+        }
+      } else {
+        next->params = params;
+        auto ret = next->prepare(y, dpp);
+        if (ret < 0) {
+          ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): next->prepare failed, ret=" << ret << dendl;
+          return ret;
+        }
+        params.parts_count = next->params.parts_count;
+        return 0;
+      }
+    }
+    bufferlist etag_bl;
+    if (get_attr(dpp, RGW_ATTR_ETAG, etag_bl, y) < 0) {
+      return -EINVAL;
+    }
+
+    if (params.mod_ptr || params.unmod_ptr) {
+      if (params.mod_ptr && !params.if_nomatch) {
+       ldpp_dout(dpp, 10) << "If-Modified-Since: " << *params.mod_ptr << " Last-Modified: " << source->get_mtime() << dendl;
+       if (!(*params.mod_ptr < source->get_mtime())) {
+         return -ERR_NOT_MODIFIED;
+       }
+      }
+
+      if (params.unmod_ptr && !params.if_match) {
+       ldpp_dout(dpp, 10) << "If-Modified-Since: " << *params.unmod_ptr << " Last-Modified: " << source->get_mtime() << dendl;
+       if (*params.unmod_ptr < source->get_mtime()) {
+         return -ERR_PRECONDITION_FAILED;
+       }
+      }
+    }
+
+    if (params.if_match) {
+      std::string if_match_str = rgw_string_unquote(params.if_match);
+      ldpp_dout(dpp, 10) << "If-Match: " << if_match_str << " ETAG: " << etag_bl.c_str() << dendl;
+
+      if (if_match_str.compare(0, etag_bl.length(), etag_bl.c_str(), etag_bl.length()) != 0) {
+       return -ERR_PRECONDITION_FAILED;
+      }
+    }
+    if (params.if_nomatch) {
+      std::string if_nomatch_str = rgw_string_unquote(params.if_nomatch);
+      ldpp_dout(dpp, 10) << "If-No-Match: " << if_nomatch_str << " ETAG: " << etag_bl.c_str() << dendl;
+      if (if_nomatch_str.compare(0, etag_bl.length(), etag_bl.c_str(), etag_bl.length()) == 0) {
+       return -ERR_NOT_MODIFIED;
+      }
+    }
+
+    if (params.lastmod) {
+      *params.lastmod = source->get_mtime();
+    }
+
     if(perfcounter) {
       perfcounter->inc(l_rgw_d4n_cache_hits);
     }
@@ -900,6 +1157,8 @@ int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw::
       ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << version << " " << source->get_object_version() << dendl;
       source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, len, version, dirty, y);
       if (source->dest_object && source->dest_bucket) {
+        D4NFilterObject* d4n_dest_object = dynamic_cast<D4NFilterObject*>(source->dest_object);
+        std::string dest_version = d4n_dest_object->get_object_version();
         rgw::d4n::CacheBlock dest_block;
         dest_block.cacheObj.objName = source->dest_object->get_oid();
         dest_block.cacheObj.bucketName = source->dest_bucket->get_name();
@@ -907,18 +1166,18 @@ int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw::
         dest_block.blockID = ofs;
         dest_block.size = len;
         dest_block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
-        dest_block.version = source->dest_version;
+        dest_block.version = dest_version;
         dest_block.cacheObj.dirty = true;
-        std::string key = source->dest_bucket->get_name() + "_" + source->dest_version + "_" + source->dest_object->get_name() +
+        std::string key = source->dest_bucket->get_name() + "_" + dest_version + "_" + source->dest_object->get_name() +
                                         "_" + std::to_string(ofs) + "_" + std::to_string(len);
         std::string dest_oid_in_cache = "D_" + key;
         auto ret = source->driver->get_policy_driver()->get_cache_policy()->eviction(dpp, dest_block.size, y);
         if (ret == 0) {
           rgw::sal::Attrs attrs;
-          ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " destination object version in update method is: " << source->dest_version << dendl;
+          ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " destination object version in update method is: " << dest_version << dendl;
           ret = source->driver->get_cache_driver()->put(dpp, dest_oid_in_cache, bl, bl.length(), attrs, y);
           if (ret == 0) {
-            source->driver->get_policy_driver()->get_cache_policy()->update(dpp, key, ofs, bl.length(), source->dest_version, true, y);
+            source->driver->get_policy_driver()->get_cache_policy()->update(dpp, key, ofs, bl.length(), dest_version, true, y);
           }
           if (ret = source->driver->get_block_dir()->set(dpp, &dest_block, y); ret < 0){
             ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " BlockDirectory set failed with ret: " << ret << dendl;
@@ -974,238 +1233,301 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
   //len_to_read is the actual length read from a part/ chunk in cache, while part_len is the length of the chunk/ part in cache 
   uint64_t cost = 0, len_to_read = 0, part_len = 0;
 
-  aio = rgw::make_throttle(window_size, y);
-
-  ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "obj_max_req_size " << obj_max_req_size << 
-  " num_parts " << num_parts << " adjusted_start_offset: " << adjusted_start_ofs << " len: " << len << dendl;
-
-  this->offset = ofs;
-
-  rgw::d4n::CacheBlock block;
-  block.cacheObj.objName = source->get_key().get_oid();
-  block.cacheObj.bucketName = source->get_bucket()->get_name();
+  ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << " adjusted_start_offset: " << adjusted_start_ofs << " len: " << len << dendl;
 
-  do {
-    uint64_t id = adjusted_start_ofs, read_ofs = 0; //read_ofs is the actual offset to start reading from the current part/ chunk
-    if (start_part_num == (num_parts - 1)) {
-      len_to_read = len;
-      part_len = len;
-      cost = len;
-    } else {
-      len_to_read = obj_max_req_size;
-      cost = obj_max_req_size;
-      part_len = obj_max_req_size;
-    }
-    if (start_part_num == 0) {
-      len_to_read -= diff_ofs;
-      id += diff_ofs;
-      read_ofs = diff_ofs;
-    }
+  if ((params.part_num && !source->is_multipart()) || !params.part_num) {
+    aio = rgw::make_throttle(window_size, y);
 
-    block.blockID = adjusted_start_ofs;
-    block.size = part_len; 
+    ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "obj_max_req_size " << obj_max_req_size << " num_parts " << num_parts << dendl;
 
-    ceph::bufferlist bl;
-    std::string oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(part_len);
+    this->offset = ofs;
 
-    ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << 
-    " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
+    rgw::d4n::CacheBlock block;
+    block.cacheObj.objName = source->get_key().get_oid();
+    block.cacheObj.bucketName = source->get_bucket()->get_name();
 
-    int ret;
-    if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0) { 
-      auto it = block.cacheObj.hostsList.find(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
+    do {
+      uint64_t id = adjusted_start_ofs, read_ofs = 0; //read_ofs is the actual offset to start reading from the current part/ chunk
+      if (start_part_num == (num_parts - 1)) {
+        len_to_read = len;
+        part_len = len;
+        cost = len;
+      } else {
+        len_to_read = obj_max_req_size;
+        cost = obj_max_req_size;
+        part_len = obj_max_req_size;
+      }
+      if (start_part_num == 0) {
+        len_to_read -= diff_ofs;
+        id += diff_ofs;
+        read_ofs = diff_ofs;
+      }
 
-      if (it != block.cacheObj.hostsList.end()) { /* Local copy */
-       ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in directory. " << oid_in_cache << dendl;
-        std::string key = oid_in_cache;
-        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: block is dirty = " << block.cacheObj.dirty << dendl;
+      block.blockID = adjusted_start_ofs;
+      block.size = part_len;
 
-        if (block.cacheObj.dirty == true) { 
-          key = "D_" + oid_in_cache; // we keep track of dirty data in the cache for the metadata failure case
-          ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: key=" << key << " data is Dirty." << dendl;
-        }
+      ceph::bufferlist bl;
+      std::string oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(part_len);
 
-        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__  << "(): " << __LINE__ << ": READ FROM CACHE: block dirty =" << block.cacheObj.dirty << dendl;
-        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: key=" << key << dendl;
+      ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << 
+      " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
 
-       if (block.version == version) {
-         if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) { 
-           // Read From Cache
-           auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), key, read_ofs, len_to_read, cost, id); 
+      int ret;
+      if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0) {
+        auto it = block.cacheObj.hostsList.find(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
 
-           this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, part_len)));
+        if (it != block.cacheObj.hostsList.end()) { /* Local copy */
+    ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in directory. " << oid_in_cache << dendl;
+          std::string key = oid_in_cache;
+          ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: block is dirty = " << block.cacheObj.dirty << dendl;
 
-           ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
-           auto r = flush(dpp, std::move(completed), y);
+          if (block.cacheObj.dirty == true) {
+            key = "D_" + oid_in_cache; // we keep track of dirty data in the cache for the metadata failure case
+            ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: key=" << key << " data is Dirty." << dendl;
+          }
 
-           if (r < 0) {
-             drain(dpp, y);
-             ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, ret=" << r << dendl;
-             return r;
-           }
-         // if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) 
-         } else {
-           int r = -1;
-           if ((r = source->driver->get_block_dir()->remove_host(dpp, &block, dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, y)) < 0)
-             ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to remove incorrect host from block with oid=" << oid_in_cache <<", ret=" << r << dendl;
+          ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__  << "(): " << __LINE__ << ": READ FROM CACHE: block dirty =" << block.cacheObj.dirty << dendl;
+          ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: key=" << key << dendl;
 
-           if ((block.cacheObj.hostsList.size() - 1) > 0 && r == 0) { /* Remote copy */
-             ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl;
-             // TODO: Retrieve remotely
-             // Policy decision: should we cache remote blocks locally?
-           } else {
-             ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+    if (block.version == version) {
+      if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) {
+        // Read From Cache
+        auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), key, read_ofs, len_to_read, cost, id);
 
-             auto r = drain(dpp, y);
+        this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, part_len)));
 
-             if (r < 0) {
-               ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
-               return r;
-             }
+        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
+        auto r = flush(dpp, std::move(completed), y);
 
-             break;
-            }
-         }
-       // if (block.version == version)
-       } else {
-         // TODO: If data has already been returned for any older versioned block, then return â€˜retry’ error, else
+        if (r < 0) {
+          drain(dpp, y);
+          ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, ret=" << r << dendl;
+          return r;
+        }
+      // if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) 
+      } else {
+        int r = -1;
+        if ((r = source->driver->get_block_dir()->remove_host(dpp, &block, dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, y)) < 0)
+          ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to remove incorrect host from block with oid=" << oid_in_cache <<", ret=" << r << dendl;
+
+        if ((block.cacheObj.hostsList.size() - 1) > 0 && r == 0) { /* Remote copy */
+          ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl;
+          // TODO: Retrieve remotely
+          // Policy decision: should we cache remote blocks locally?
+        } else {
+          ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
 
-         ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+          auto r = drain(dpp, y);
 
-         auto r = drain(dpp, y);
+          if (r < 0) {
+      ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
+      return r;
+          }
 
-         if (r < 0) {
-           ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
-           return r;
-         }
-       }
-      // if (it != block.cacheObj.hostsList.end())
-      } else if (block.cacheObj.hostsList.size()) { /* Remote copy */
-       ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in remote cache. " << oid_in_cache << dendl;
-       // TODO: Retrieve remotely
-       // Policy decision: should we cache remote blocks locally?
+          break;
+              }
       }
-    // if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0)  
-    } else if (ret == -ENOENT) { 
-      block.blockID = adjusted_start_ofs;
-      block.size = obj_max_req_size;
-
-      if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0) { 
-       auto it = block.cacheObj.hostsList.find(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
-
-       if (it != block.cacheObj.hostsList.end()) { /* Local copy */
-         ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in local cache." << dendl;
+    // if (block.version == version)
+    } else {
+      // TODO: If data has already been returned for any older versioned block, then return â€˜retry’ error, else
 
-         if (block.version == version) {
-           oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(obj_max_req_size);
-           std::string key = oid_in_cache;
+      ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
 
-           //for ranged requests, for last part, the whole part might exist in the cache
-           ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << 
-             " length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
+      auto r = drain(dpp, y);
 
-           if ((part_len != obj_max_req_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) {
-             // Read From Cache
-             if (block.cacheObj.dirty == true){ 
-               key = "D_" + oid_in_cache; //we keep track of dirty data in the cache for the metadata failure case
-             }
+      if (r < 0) {
+        ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
+        return r;
+      }
+      break;
+    }
+        // if (it != block.cacheObj.hostsList.end())
+        } else if (block.cacheObj.hostsList.size()) { /* Remote copy */
+    ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in remote cache. " << oid_in_cache << dendl;
+    // TODO: Retrieve remotely
+    // Policy decision: should we cache remote blocks locally?
+        }
+      // if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0) 
+      } else if (ret == -ENOENT) {
+        block.blockID = adjusted_start_ofs;
+        uint64_t obj_size = source->get_size(), chunk_size = 0;
+        if (obj_size < obj_max_req_size) {
+          chunk_size = obj_size;
+        } else {
+          chunk_size = obj_max_req_size;
+        }
+        block.size = chunk_size;
 
-             ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__  << "(): " << __LINE__ << ": READ FROM CACHE: block dirty =" << block.cacheObj.dirty << dendl;
-             ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: key=" << key << dendl;
+        if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0) {
+    auto it = block.cacheObj.hostsList.find(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
 
-             auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), key, read_ofs, len_to_read, cost, id);  
+    if (it != block.cacheObj.hostsList.end()) { /* Local copy */
+      ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in local cache." << dendl;
 
-             this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, obj_max_req_size)));
+      if (block.version == version) {
+        oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(chunk_size);
+        std::string key = oid_in_cache;
 
-             ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
-             auto r = flush(dpp, std::move(completed), y);
+        //for range requests, for last part, the whole part might exist in the cache
+        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache <<
+          " length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
 
-             if (r < 0) {
-               drain(dpp, y);
-               ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, ret=" << r << dendl;
-               return r;
-             }
-           // if ((part_len != obj_max_req_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0)
-           } else {
-             int r = -1;
-             if ((r = source->driver->get_block_dir()->remove_host(dpp, &block, dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, y)) < 0)
-               ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to remove incorrect host from block with oid=" << oid_in_cache << ", ret=" << r << dendl;
+        if ((part_len != chunk_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) {
+          // Read From Cache
+          if (block.cacheObj.dirty == true){
+      key = "D_" + oid_in_cache; //we keep track of dirty data in the cache for the metadata failure case
+          }
 
-             if ((block.cacheObj.hostsList.size() - 1) > 0 && r == 0) { /* Remote copy */
-               ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl;
-               // TODO: Retrieve remotely
-               // Policy decision: should we cache remote blocks locally?
-             } else {
-               ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+          ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__  << "(): " << __LINE__ << ": READ FROM CACHE: block dirty =" << block.cacheObj.dirty << dendl;
+          ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: key=" << key << dendl;
 
-               auto r = drain(dpp, y);
+          auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), key, read_ofs, len_to_read, cost, id);
 
-               if (r < 0) {
-                 ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
-                 return r;
-               }
+          this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, chunk_size)));
 
-               break;
-             }
-           }
-         // if (it != block.cacheObj.hostsList.end())
-         } else if (block.cacheObj.hostsList.size()) { /* Remote copy */
-           ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl;
-           // TODO: Retrieve remotely
-           // Policy decision: should we cache remote blocks locally?
-         }
-       // if (block.version == version)
-       } else {
-         // TODO: If data has already been returned for any older versioned block, then return â€˜retry’ error, else
+          ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
+          auto r = flush(dpp, std::move(completed), y);
 
-         ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+          if (r < 0) {
+      drain(dpp, y);
+      ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, ret=" << r << dendl;
+      return r;
+          }
+        // if ((part_len != chunk_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0)
+        } else {
+          int r = -1;
+          if ((r = source->driver->get_block_dir()->remove_host(dpp, &block, dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, y)) < 0)
+      ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to remove incorrect host from block with oid=" << oid_in_cache << ", ret=" << r << dendl;
+
+          if ((block.cacheObj.hostsList.size() - 1) > 0 && r == 0) { /* Remote copy */
+      ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl;
+      // TODO: Retrieve remotely
+      // Policy decision: should we cache remote blocks locally?
+          } else {
+      ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
 
-         auto r = drain(dpp, y);
+      auto r = drain(dpp, y);
 
-         if (r < 0) {
-           ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
-           return r;
-         }
-       }
-      // if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0)  
-      } else if (ret == -ENOENT) { /* Fetch from backend */
-       ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
-
-       auto r = drain(dpp, y);
-       if (r < 0) {
-         ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
-         return r;
-       }
+      if (r < 0) {
+        ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
+        return r;
+      }
 
-       break;
+      break;
+          }
+        }
+      // if (it != block.cacheObj.hostsList.end())
+      } else if (block.cacheObj.hostsList.size()) { /* Remote copy */
+        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl;
+        // TODO: Retrieve remotely
+        // Policy decision: should we cache remote blocks locally?
       }
-    // else if (ret == -ENOENT) 
-    } else { /* Fetch from backend */
-      if (ret < 0)
-       ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << block.cacheObj.objName << " blockID: " << block.blockID << " block size: " << block.size << ", ret=" << ret << dendl;
+    // if (block.version == version)
+    } else {
+      // TODO: If data has already been returned for any older versioned block, then return â€˜retry’ error, else
 
       ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
 
       auto r = drain(dpp, y);
+
       if (r < 0) {
-       ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
-       return r;
+        ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
+        return r;
       }
-
       break;
     }
+        } else if (ret == -ENOENT) { // if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0)
+          block.blockID = adjusted_start_ofs;
+          uint64_t last_part_size = source->get_size() - adjusted_start_ofs;
+          block.size = last_part_size;
+          if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0) {
+            auto it = block.cacheObj.hostsList.find(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
+            if (it != block.cacheObj.hostsList.end()) { /* Local copy */
+              ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in local cache." << dendl;
+              if (block.version == version) {
+                oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(last_part_size);
+                ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache <<
+                  " length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
+                if ((part_len != last_part_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) {
+                  // Read From Cache
+                  ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__  << "(): " << __LINE__ << ": READ FROM CACHE: block dirty =" << block.cacheObj.dirty << dendl;
+                  ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: oid_in_cache=" << oid_in_cache << dendl;
+                  auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);
+                  this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, last_part_size)));
+                  ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
+                  auto r = flush(dpp, std::move(completed), y);
+                  if (r < 0) {
+                    drain(dpp, y);
+                    ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, ret=" << r << dendl;
+                    return r;
+                  }
+                } else { // if get_policy_driver()->get_cache_policy()->update_refcount_if_key_exists
+                  int r = -1;
+                  if ((r = source->driver->get_block_dir()->remove_host(dpp, &block, dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, y)) < 0)
+                    ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to remove incorrect host from block with oid=" << oid_in_cache << ", ret=" << r << dendl;
+                  if ((block.cacheObj.hostsList.size() - 1) > 0 && r == 0) { /* Remote copy */
+                    ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl;
+                    // TODO: Retrieve remotely
+                    // Policy decision: should we cache remote blocks locally?
+                  } else {
+                    ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+                    auto r = drain(dpp, y);
+                    if (r < 0) {
+                      ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
+                      return r;
+                    }
+                    break;
+                  }
+                }
+              } else {// if (block.version == version)
+                //TODO: return retry error
+                ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+                auto r = drain(dpp, y);
+                if (r < 0) {
+                  ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
+                  return r;
+                }
+                break;
+              }
+            } else if (block.cacheObj.hostsList.size()) {
+              //TODO: get remote copy
+            }
+          } else if (ret == -ENOENT) {
+            ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+            auto r = drain(dpp, y);
+            if (r < 0) {
+              ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
+              return r;
+            }
+            break;
+          }
+        }
+      } else { // else if (ret == -ENOENT)
+        if (ret < 0)
+    ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << block.cacheObj.objName << " blockID: " << block.blockID << " block size: " << block.size << ", ret=" << ret << dendl;
 
-    if (start_part_num == (num_parts - 1)) {
-      ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
-      return drain(dpp, y);
-    } else {
-      adjusted_start_ofs += obj_max_req_size;
-    }
+        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
 
-    start_part_num += 1;
-    len -= obj_max_req_size;
-  } while (start_part_num < num_parts);
+        auto r = drain(dpp, y);
+        if (r < 0) {
+    ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
+    return r;
+        }
+
+        break;
+      }
+
+      if (start_part_num == (num_parts - 1)) {
+        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+        return drain(dpp, y);
+      } else {
+        adjusted_start_ofs += obj_max_req_size;
+      }
 
+      start_part_num += 1;
+      len -= obj_max_req_size;
+    } while (start_part_num < num_parts);
+  }
   ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Fetching object from backend store" << dendl;
 
   Attrs obj_attrs;
@@ -1213,21 +1535,14 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
     obj_attrs = source->get_attrs();
   }
 
-  if (source->is_compressed() || obj_attrs.find(RGW_ATTR_CRYPT_MODE) != obj_attrs.end() || !y) {
-    ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Skipping writing to cache" << dendl;
-    this->cb->bypass_cache_write();
-  }
-
-  if (start_part_num != 0) {
-    ofs = adjusted_start_ofs;
-  }
-
-  this->cb->set_ofs(ofs);
-  auto r = next->iterate(dpp, ofs, end, this->cb.get(), y);
-
+  this->cb->set_ofs(diff_ofs);
+  this->cb->set_adjusted_start_ofs(adjusted_start_ofs);
+  this->cb->set_part_num(start_part_num);
+  ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): adjusted_start_ofs: " << adjusted_start_ofs << " end: " << end << dendl;
+  auto r = next->iterate(dpp, adjusted_start_ofs, end, this->cb.get(), y);
   //calculate the number of blocks read from backend store, and increment the perfcounter using that
   if(perfcounter) {
-    uint64_t len_to_read_from_store = ((end - ofs) + 1);
+    uint64_t len_to_read_from_store = ((end - adjusted_start_ofs) + 1);
     uint64_t num_blocks = (len_to_read_from_store%obj_max_req_size) == 0 ? len_to_read_from_store/obj_max_req_size : (len_to_read_from_store/obj_max_req_size) + 1;
     perfcounter->inc(l_rgw_d4n_cache_misses, num_blocks);
   }
@@ -1236,7 +1551,8 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
     ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to fetch object from backend store, ret=" << r << dendl;
     return r;
   }
-
+  /* Copy params out of next */
+  params = next->params;
   return this->cb->flush_last_part();
 }
 
@@ -1272,12 +1588,31 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::flush_last_part()
 int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
 {
   auto rgw_get_obj_max_req_size = g_conf()->rgw_get_obj_max_req_size;
-
+  ldpp_dout(dpp, 20) << __func__ << ": bl_ofs is: " << bl_ofs << " bl_len is: " << bl_len << " ofs is: " << ofs << " part_count: " << part_count << dendl;
   if (!last_part && bl.length() <= rgw_get_obj_max_req_size) {
     if (client_cb) {
-      auto r = client_cb->handle_data(bl, bl_ofs, bl_len);
+      int r = 0;
+      //ranged request
+      if (bl_ofs != ofs && part_count == 0) {
+        if (ofs < bl_len) { // this can happen in case of multipart where each chunk returned is not always of size rgw_get_obj_max_req_size
+          off_t bl_part_len = bl_len - ofs;
+          ldpp_dout(dpp, 20) << __func__ << ": bl_part_len is: " << bl_part_len << dendl;
+          bufferlist bl_part;
+          bl.begin(ofs).copy(bl_part_len, bl_part);
+          ldpp_dout(dpp, 20) << __func__ << ": bl_part.length() is: " << bl_part.length() << dendl;
+          r = client_cb->handle_data(bl_part, 0, bl_part_len);
+          part_count += 1;
+        } else {
+          ofs = ofs - bl_len; //re-adjust the offset
+          ldpp_dout(dpp, 20) << __func__ << ": New value ofs is: " << ofs << dendl;
+        }
+      } else {
+        r = client_cb->handle_data(bl, bl_ofs, bl_len);
+        part_count += 1;
+      }
 
       if (r < 0) {
+        ldpp_dout(dpp, 20) << __func__ << ": error returned is: " << r << dendl;
         return r;
       }
     }
@@ -1300,13 +1635,15 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
     block.version = version;
 
     if (source->dest_object && source->dest_bucket) {
-      dest_prefix = source->dest_bucket->get_name() + "_" + source->dest_version + "_" + source->dest_object->get_name();
+      D4NFilterObject* d4n_dest_object = dynamic_cast<D4NFilterObject*>(source->dest_object);
+      std::string dest_version = d4n_dest_object->get_object_version();
+      dest_prefix = source->dest_bucket->get_name() + "_" + dest_version + "_" + source->dest_object->get_name();
       dest_block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
       dest_block.cacheObj.objName = source->dest_object->get_key().get_oid();
       dest_block.cacheObj.bucketName = source->dest_object->get_bucket()->get_name();
       //dest_block.cacheObj.creationTime = std::to_string(ceph::real_clock::to_time_t(source->get_mtime()));
       dest_block.cacheObj.dirty = false;
-      dest_block.version = source->dest_version;
+      dest_block.version = dest_version;
     }
 
     //populating fields needed for building directory index
@@ -1316,9 +1653,9 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
     ldpp_dout(dpp, 20) << __func__ << ": version stored in update method is: " << version << dendl;
 
     if (bl.length() > 0 && last_part) { // if bl = bl_rem has data and this is the last part, write it to cache
-      std::string oid = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
+      std::string oid = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(bl_len);
       if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) {
-        block.blockID = ofs;
+        block.blockID = adjusted_start_ofs;
         block.size = bl.length();
 
         auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
@@ -1326,7 +1663,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
           ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y);
           if (ret == 0) {
            std::string objEtag = "";
-           filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, dirty, *y);
+           filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, adjusted_start_ofs, bl.length(), version, dirty, *y);
 
            /* Store block in directory */
            existing_block.blockID = block.blockID;
@@ -1363,14 +1700,16 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
         }
       }
       if (source->dest_object && source->dest_bucket) {
-        std::string dest_oid = dest_prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
-        dest_block.blockID = ofs;
+        D4NFilterObject* d4n_dest_object = dynamic_cast<D4NFilterObject*>(source->dest_object);
+        std::string dest_version = d4n_dest_object->get_object_version();
+        std::string dest_oid = dest_prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(bl_len);
+        dest_block.blockID = adjusted_start_ofs;
         dest_block.size = bl.length();
         auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, dest_block.size, *y);
         if (ret == 0) {
           ret = filter->get_cache_driver()->put(dpp, dest_oid, bl, bl.length(), attrs, *y);
           if (ret == 0) {
-            filter->get_policy_driver()->get_cache_policy()->update(dpp, dest_oid, ofs, bl.length(), source->dest_version, dirty, *y);
+            filter->get_policy_driver()->get_cache_policy()->update(dpp, dest_oid, adjusted_start_ofs, bl.length(), dest_version, dirty, *y);
             if (ret = blockDir->set(dpp, &dest_block, *y); ret < 0) {
               ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " BlockDirectory set failed with ret: " << ret << dendl;
             }
@@ -1378,15 +1717,15 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
         }
       }
     } else if (bl.length() == rgw_get_obj_max_req_size && bl_rem.length() == 0) { // if bl is the same size as rgw_get_obj_max_req_size, write it to cache
-      std::string oid = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
-      block.blockID = ofs;
+      std::string oid = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(bl_len);
+      block.blockID = adjusted_start_ofs;
       block.size = bl.length();
       if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) {
         auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
         if (ret == 0) {
           ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y);
           if (ret == 0) {
-            filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, dirty, *y);
+            filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, adjusted_start_ofs, bl.length(), version, dirty, *y);
 
             /* Store block in directory */
            existing_block.blockID = block.blockID;
@@ -1411,21 +1750,23 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
         }
       }
       if (source->dest_object && source->dest_bucket) {
-        std::string dest_oid = dest_prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
-        dest_block.blockID = ofs;
+        D4NFilterObject* d4n_dest_object = dynamic_cast<D4NFilterObject*>(source->dest_object);
+        std::string dest_version = d4n_dest_object->get_object_version();
+        std::string dest_oid = dest_prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(bl_len);
+        dest_block.blockID = adjusted_start_ofs;
         dest_block.size = bl.length();
         auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, dest_block.size, *y);
         if (ret == 0) {
           ret = filter->get_cache_driver()->put(dpp, dest_oid, bl, bl.length(), attrs, *y);
           if (ret == 0) {
-            filter->get_policy_driver()->get_cache_policy()->update(dpp, dest_oid, ofs, bl.length(), source->dest_version, dirty, *y);
+            filter->get_policy_driver()->get_cache_policy()->update(dpp, dest_oid, adjusted_start_ofs, bl.length(), dest_version, dirty, *y);
             if (ret = blockDir->set(dpp, &dest_block, *y); ret < 0) {
               ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " BlockDirectory set failed with ret: " << ret << dendl;
             }
           }
         }
       }
-      ofs += bl_len;
+      adjusted_start_ofs += bl_len;
     } else { //copy data from incoming bl to bl_rem till it is rgw_get_obj_max_req_size, and then write it to cache
       uint64_t rem_space = rgw_get_obj_max_req_size - bl_rem.length();
       uint64_t len_to_copy = rem_space > bl.length() ? bl.length() : rem_space;
@@ -1435,16 +1776,16 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
       bl_rem.claim_append(bl_copy);
 
       if (bl_rem.length() == rgw_get_obj_max_req_size) {
-        std::string oid = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_rem.length());
+        std::string oid = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(bl_rem.length());
           if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) {
-          block.blockID = ofs;
+          block.blockID = adjusted_start_ofs;
           block.size = bl_rem.length();
           
           auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
           if (ret == 0) {
             ret = filter->get_cache_driver()->put(dpp, oid, bl_rem, bl_rem.length(), attrs, *y);
             if (ret == 0) {
-              filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), version, dirty, *y);
+              filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, adjusted_start_ofs, bl_rem.length(), version, dirty, *y);
 
               /* Store block in directory */
              existing_block.blockID = block.blockID;
@@ -1472,21 +1813,23 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
         }
 
         if (source->dest_object && source->dest_bucket) {
-          std::string dest_oid = dest_prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_rem.length());
-          dest_block.blockID = ofs;
+          D4NFilterObject* d4n_dest_object = dynamic_cast<D4NFilterObject*>(source->dest_object);
+          std::string dest_version = d4n_dest_object->get_object_version();
+          std::string dest_oid = dest_prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(bl_rem.length());
+          dest_block.blockID = adjusted_start_ofs;
           dest_block.size = bl_rem.length();
           auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, dest_block.size, *y);
           if (ret == 0) {
             ret = filter->get_cache_driver()->put(dpp, dest_oid, bl_rem, bl_rem.length(), attrs, *y);
             if (ret == 0) {
-              filter->get_policy_driver()->get_cache_policy()->update(dpp, dest_oid, ofs, bl_rem.length(), source->dest_version, dirty, *y);
+              filter->get_policy_driver()->get_cache_policy()->update(dpp, dest_oid, adjusted_start_ofs, bl_rem.length(), dest_version, dirty, *y);
               if (ret = blockDir->set(dpp, &dest_block, *y); ret < 0) {
                 ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " BlockDirectory set failed with ret: " << ret << dendl;
               }
             }
           }
         }
-        ofs += bl_rem.length();
+        adjusted_start_ofs += bl_rem.length();
         bl_rem.clear();
         bl_rem = std::move(bl);
       }//bl_rem.length()
@@ -1522,7 +1865,7 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp
     std::string version, policy_prefix;
 
     if (!source->get_bucket()->versioned()) {
-      version = source->get_object_version(); 
+      version = source->get_object_version();
     } else if (source->get_bucket()->versioned() && !source->have_instance()) {
       rgw::d4n::CacheBlock deleteBlock;
       block.prevVersion = std::pair<std::string, bool>(block.version, block.deleteMarker);
@@ -1675,7 +2018,8 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp
       if (!objDirty) { // object written to backend  
        return next->delete_obj(dpp, y, flags);
       } else {
-       if (!(ret = source->driver->get_policy_driver()->get_cache_policy()->eraseObj(dpp, policy_prefix, y))) {
+        std::string object_key = source->get_bucket()->get_name() + "_" + source->get_oid();
+       if (!(ret = source->driver->get_policy_driver()->get_cache_policy()->eraseObj(dpp, object_key, y))) {
          ldpp_dout(dpp, 0) << "Failed to delete policy object entry for: " << source->get_name() << ", ret=" << ret << dendl;
          return -ENOENT;
         } else {
@@ -1714,31 +2058,41 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp
 int D4NFilterWriter::prepare(optional_yield y) 
 {
   startTime = time(NULL);
-
-  int ret;
-  if ((ret = driver->get_cache_driver()->delete_data(dpp, obj->get_key().get_oid(), y)) < 0) 
-    ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): CacheDriver delete_data() method failed, ret=" << ret << dendl;
-
   d4n_writecache = g_conf()->d4n_writecache_enabled;
+
   if (!d4n_writecache) {
-    ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): calling next process" << dendl;
+    ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): calling next->prepare" << dendl;
     return next->prepare(y);
+  } else {
+    //for non-versioned buckets, we need to delete the older dirty blocks of the object from the cache as dirty blocks do not get evicted
+    //alternatively, we could add logic to delete this lazily
+    if (!object->get_bucket()->versioned()) {
+      std::unique_ptr<rgw::sal::Object::DeleteOp> del_op = object->get_delete_op();
+      auto ret = del_op->delete_obj(dpp, y, rgw::sal::FLAG_LOG_OP);
+      if (ret < 0) {
+        ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): delete_obj failed, ret=" << ret << dendl;
+      }
+    }
   }
 
+  std::string version;
   if (!object->have_instance()) {
     if (object->get_bucket()->versioned() && !object->get_bucket()->versioning_enabled()) { //if versioning is suspended
-      this->version = "null";
-      object->set_instance(this->version);
+      version = "null";
+      object->set_instance(version);
     } else {
       constexpr uint32_t OBJ_INSTANCE_LEN = 32;
       char buf[OBJ_INSTANCE_LEN + 1];
       gen_rand_alphanumeric_no_underscore(dpp->get_cct(), buf, OBJ_INSTANCE_LEN);
-      this->version = buf; // using gen_rand_alphanumeric_no_underscore for the time being
+      version = buf; // using gen_rand_alphanumeric_no_underscore for the time being
       ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << "(): generating version: " << version << dendl;
     }
   } else {
     ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << "(): version is: " << object->get_instance() << dendl;
+    version = object->get_instance();
   }
+  object->set_object_version(version);
+  this->version = version;
 
   return 0;
 }
@@ -1749,66 +2103,30 @@ int D4NFilterWriter::process(bufferlist&& data, uint64_t offset)
     off_t bl_len = bl.length();
     off_t ofs = offset;
     bool dirty = true;
-    rgw::d4n::CacheBlock block, existing_block;
 
-    std::string version;
+    std::string version = object->get_object_version();
     std::string prefix;
-    if (object->have_instance()) {
-      version = obj->get_instance();
-    } else {
-      version = this->version;
-    }
-    prefix = obj->get_bucket()->get_name() + "_" + version + "_" + obj->get_name();
-    rgw::d4n::BlockDirectory* blockDir = driver->get_block_dir();
-    block.cacheObj.bucketName = obj->get_bucket()->get_name();
-    block.cacheObj.objName = obj->get_key().get_oid();
-    block.cacheObj.dirty = dirty;
-    block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
-    existing_block.cacheObj.objName = block.cacheObj.objName;
-    existing_block.cacheObj.bucketName = block.cacheObj.bucketName;
 
+    prefix = obj->get_bucket()->get_name() + "_" + version + "_" + obj->get_name();
     int ret = 0;
 
     if (!d4n_writecache) {
       ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): calling next process" << dendl;
       return next->process(std::move(data), offset);
     } else {
+      rgw::sal::Attrs attrs;
       std::string oid = prefix + "_" + std::to_string(ofs);
       std::string key = "D_" + oid + "_" + std::to_string(bl_len);
       std::string oid_in_cache = oid + "_" + std::to_string(bl_len);
-      block.size = bl.length();
-      block.blockID = ofs;
-      block.cacheObj.dirty = true;
-      block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
-      block.version = version;
       dirty = true;
-      ret = driver->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, y);
+      ret = driver->get_policy_driver()->get_cache_policy()->eviction(dpp, bl.length(), y);
       if (ret == 0) {     
        if (bl.length() > 0) {          
           ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): key is: " << key << dendl;
-          ret = driver->get_cache_driver()->put(dpp, key, bl, bl.length(), obj->get_attrs(), y);
+          ret = driver->get_cache_driver()->put(dpp, key, bl, bl.length(), attrs, y);
           if (ret == 0) {
             ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): oid_in_cache is: " << oid_in_cache << dendl;
            driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, bl.length(), version, dirty, y);
-
-           /* Store block in directory */
-           existing_block.blockID = block.blockID;
-           existing_block.size = block.size;
-
-           if ((ret = blockDir->get(dpp, &existing_block, y)) == 0 || ret == -ENOENT) {
-             if (ret == 0) { //new versioned block will have new version, hostsList etc, how about globalWeight?
-               block = existing_block;
-                block.version = version;
-              }
-
-             block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address); 
-
-             if ((ret = blockDir->set(dpp, &block, y)) < 0)
-               ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory set() method failed, ret=" << ret << dendl;
-           } else {
-             ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << ", ret=" << ret << dendl;
-           }
           } else {
             ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): ERROR: writting data to the cache failed, ret=" << ret << dendl;
            return ret;
@@ -1834,20 +2152,76 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag,
   std::unordered_set<std::string> hostsList = {};
   auto creationTime = startTime;
   std::string objEtag = etag;
-  bool write_to_backend_store = false;
+  auto size = object->get_size();
+  std::string instance;
+  if (object->have_instance()) {
+    instance = object->get_instance();
+  }
   int ret;
   
+  /* for cache coherence, we are going to cache the head even in case when read-only cache is enabled, just that
+     the head will not be marked dirty and the entire object will written to backend store also. In case write-back
+     cache is enabled, the head will be cached as dirty. */
   if (d4n_writecache) {
-    dirty = true;
-    std::string version;
-    if (object->have_instance()) {
-      version = obj->get_instance();
-    } else {
-      version = this->version;
-    } 
-    std::string key = obj->get_bucket()->get_name() + "_" + version + "_" + obj->get_name();
+    auto ret = object->get_obj_attrs(y, dpp);
+    if (if_match) {
+      if (strcmp(if_match, "*") == 0) {
+        if (ret == -ENOENT) {
+          object->delete_data_block_cache_entries(dpp, y, this->version, true);
+          return -ERR_PRECONDITION_FAILED;
+        }
+      } else {
+        rgw::sal::Attrs attrs = object->get_attrs();
+        bufferlist bl;
+        auto iter = attrs.find(RGW_ATTR_ETAG);
+        if (iter == attrs.end()) {
+          object->delete_data_block_cache_entries(dpp, y, this->version, true);
+          return -ERR_PRECONDITION_FAILED;
+        } else {
+          bl = iter->second;
+        }
+        if (strncmp(if_match, bl.c_str(), bl.length()) != 0) {
+          object->delete_data_block_cache_entries(dpp, y, this->version, true);
+          return -ERR_PRECONDITION_FAILED;
+        }
+      }
+    }
+    if (if_nomatch) {
+      if (strcmp(if_nomatch, "*") == 0) {
+        if (ret != -ENOENT) {
+          object->delete_data_block_cache_entries(dpp, y, this->version, true);
+          return -ERR_PRECONDITION_FAILED;
+        }
+      } else {
+        rgw::sal::Attrs attrs = object->get_attrs();
+        bufferlist bl;
+        auto iter = attrs.find(RGW_ATTR_ETAG);
+        if (iter == attrs.end()) {
+          object->delete_data_block_cache_entries(dpp, y, this->version, true);
+          return -ERR_PRECONDITION_FAILED;
+        } else {
+          bl = iter->second;
+        }
+        if (strncmp(if_nomatch, bl.c_str(), bl.length()) == 0) {
+          object->delete_data_block_cache_entries(dpp, y, this->version, true);
+          return -ERR_PRECONDITION_FAILED;
+        }
+      }
+    }
+    //get_obj_attrs will override object version and size with older values, hence setting it here again
+    object->set_object_version(this->version);
+    object->set_instance(instance);
+    object->set_obj_size(size);
 
+    //update data block entries in directory
+    ret = object->set_data_block_dir_entries(dpp, y, this->version, true);
+    if (ret < 0) {
+      return ret;
+    }
+
+    dirty = true;
     ceph::real_time m_time;
+    dirty = true;
     if (mtime) {
       m_time = *mtime;
     } else {
@@ -1856,25 +2230,64 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag,
     object->set_mtime(m_time);
     object->set_accounted_size(accounted_size);
     ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << " size is: " << object->get_size() << dendl;
-    object->set_obj_state_attrs(dpp, y, attrs);
-    bufferlist bl;
-    std::string head_oid_in_cache = "D_" + key; //same as key, as there is no len or offset attached to head oid in cache
+    object->set_attr_crypt_parts(dpp, y, attrs);
+    object->set_attrs(attrs);
+    object->set_attrs_from_obj_state(dpp, y, attrs);
+  } else {
+    // we need to call next->complete here so that we are able to correctly get the object state needed for caching head
+    ret = next->complete(accounted_size, etag, mtime, set_mtime, attrs, cksum,
+                            delete_at, if_match, if_nomatch, user_data, zones_trace,
+                            canceled, rctx, flags);
+    if (ret < 0) {
+      ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): writing to backend store failed, ret=" << ret << dendl;
+      return ret;
+    }
+    object->load_obj_state(dpp, y);
+    attrs = object->get_attrs();
+    object->set_attrs_from_obj_state(dpp, y, attrs);
+
+    std::string version;
+    object->calculate_version(dpp, y, version, attrs);
+    if (version.empty()) {
+      ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): version could not be calculated." << dendl;
+    }
+  }
+
+  std::string version = object->get_object_version();
+  std::string key = obj->get_bucket()->get_name() + "_" + version + "_" + obj->get_name();
+
+  bufferlist bl;
+  std::string head_oid_in_cache;
+  //same as key, as there is no len or offset attached to head oid in cache
+  if (dirty) {
+    head_oid_in_cache = "D_" + key;
+  } else {
+    head_oid_in_cache = key;
+  }
+  ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << "(): head_oid_in_cache is: " << head_oid_in_cache << dendl;
+  ret = driver->get_policy_driver()->get_cache_policy()->eviction(dpp, attrs.size(), y);
+  if (ret == 0) {
     ret = driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl, 0, attrs, y);
     attrs.erase("user.rgw.mtime");
     attrs.erase("user.rgw.object_size");
     attrs.erase("user.rgw.accounted_size");
     attrs.erase("user.rgw.epoch");
+    attrs.erase("user.rgw.multipart");
     object->set_object_version(version);
     if (ret == 0) {
-      object->set_attrs(attrs);
       ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << "(): version stored in update method is: " << version << dendl;
       driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, bl.length(), version, dirty, y);
-      ret = object->set_head_obj_dir_entry(dpp, y, true, true);
+      ret = object->set_head_obj_dir_entry(dpp, y, true, dirty);
       if (ret < 0) {
         ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory set method failed for head object, ret=" << ret << dendl;
         return ret;
       }
-      driver->get_policy_driver()->get_cache_policy()->updateObj(dpp, key, version, dirty, accounted_size, creationTime, std::get<rgw_user>(obj->get_bucket()->get_owner()), objEtag, obj->get_bucket()->get_name(), obj->get_key(), y);
+      if (dirty) {
+        //using object oid here so that version is automatically picked for versioned buckets, and for non-versioned buckets the old version is replaced by the latest version
+        std::string object_key = obj->get_bucket()->get_name() + "_" + obj->get_oid();
+        ldpp_dout(dpp, 16) << "D4NFilterWriter::" << __func__ << "(): object_key=" << object_key << dendl;
+        driver->get_policy_driver()->get_cache_policy()->updateObj(dpp, object_key, version, dirty, accounted_size, creationTime, std::get<rgw_user>(obj->get_bucket()->get_owner()), objEtag, obj->get_bucket()->get_name(), obj->get_key(), y);
+      }
 
       //write object to directory.
       hostsList = { dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address };
@@ -1891,21 +2304,86 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag,
         return ret;
       }
     } else { //if get_cache_driver()->put()
-      write_to_backend_store = true;
       ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): put failed for head_oid_in_cache, ret=" << ret << dendl;
-      ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): calling complete of backend store: " << dendl;
+      return ret;
     }
-  } else { // if d4n_writecache = true
-    write_to_backend_store = true;
+  } else {
+    ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): eviction failed for head_oid_in_cache, ret=" << ret << dendl;
+    return ret;
   }
+  return 0;
+}
 
-  if (write_to_backend_store) {
-    ret = next->complete(accounted_size, etag, mtime, set_mtime, attrs, cksum,
-                            delete_at, if_match, if_nomatch, user_data, zones_trace,
-                            canceled, rctx, flags);
-    if (ret < 0) {
-      ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): writing to backend store failed, ret=" << ret << dendl;
+int D4NFilterMultipartUpload::complete(const DoutPrefixProvider *dpp,
+                                   optional_yield y, CephContext* cct,
+                                   std::map<int, std::string>& part_etags,
+                                   std::list<rgw_obj_index_key>& remove_objs,
+                                   uint64_t& accounted_size, bool& compressed,
+                                   RGWCompressionInfo& cs_info, off_t& ofs,
+                                   std::string& tag, ACLOwner& owner,
+                                   uint64_t olh_epoch,
+                                   rgw::sal::Object* target_obj,
+            prefix_map_t& processed_prefixes)
+{
+  //call next->complete to complete writing the object to the backend store
+  auto ret = next->complete(dpp, y, cct, part_etags, remove_objs, accounted_size,
+                       compressed, cs_info, ofs, tag, owner, olh_epoch,
+                       nextObject(target_obj), processed_prefixes);
+  if (ret < 0) {
+    return ret;
+  }
+
+  //Cache only the head object for multipart objects
+  D4NFilterObject* d4n_target_obj = dynamic_cast<D4NFilterObject*>(target_obj);
+  std::string head_oid_in_cache;
+  rgw::sal::Attrs attrs;
+  d4n_target_obj->load_obj_state(dpp, y);
+  attrs = d4n_target_obj->get_attrs();
+  d4n_target_obj->set_attrs_from_obj_state(dpp, y, attrs);
+  bufferlist bl_val;
+  bool is_multipart = true;
+  bl_val.append(std::to_string(is_multipart));
+  attrs["user.rgw.multipart"] = std::move(bl_val);
+
+  std::string version;
+  d4n_target_obj->calculate_version(dpp, y, version, attrs);
+  if (version.empty()) {
+    ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): version could not be calculated." << dendl;
+  }
+
+  bufferlist bl;
+  head_oid_in_cache = d4n_target_obj->get_bucket()->get_name() + "_" + version + "_" + d4n_target_obj->get_name();
+  // we are evicting data if needed, since the head object will be a part of read cache, as the whole multipart object is written to the backend store
+  ret = driver->get_policy_driver()->get_cache_policy()->eviction(dpp, attrs.size(), y);
+  if (ret == 0) {
+    ret = driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl, 0, attrs, y);
+    if (ret == 0) {
+      ldpp_dout(dpp, 20) << "D4NFilterMultipartUpload::" << __func__ << " version stored in update method is: " << d4n_target_obj->get_object_version() << dendl;
+      driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, bl.length(), version, false, y);
+      ret = d4n_target_obj->set_head_obj_dir_entry(dpp, y, true);
+      if (ret < 0) {
+        ldpp_dout(dpp, 0) << "D4NFilterMultipartUpload::" << __func__ << "(): BlockDirectory set method failed for head object, ret=" << ret << dendl;
+      }
+      //write object to directory.
+      rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
+        .objName = d4n_target_obj->get_oid(),
+        .bucketName = d4n_target_obj->get_bucket()->get_name(),
+        .creationTime = std::to_string(ceph::real_clock::to_double(target_obj->get_mtime())),
+        .dirty = false,
+        .hostsList = { dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address }
+      };
+      ret = driver->get_obj_dir()->set(dpp, &object, y);
+      if (ret < 0) {
+        ldpp_dout(dpp, 10) << "D4NFilterMultipartUpload::" << __func__ << "(): ObjectDirectory set method failed with err: " << ret << dendl;
+        return ret;
+      }
+    } else {
+      ldpp_dout(dpp, 0) << "D4NFilterMultipartUpload::" << __func__ << "(): put for head object failed, ret=" << ret << dendl;
+      return ret;
     }
+  } else {
+    ldpp_dout(dpp, 0) << "D4NFilterMultipartUpload::" << __func__ << "(): failed to cache head object during eviction, ret=" << ret << dendl;
+    return ret;
   }
 
   return 0;
index 2978d78c115cd141af02d8bfa2326a5e2c3a1a74..9ae6f33b33bc1378dd9295ca3b6ae1e7452b71bf 100644 (file)
@@ -59,7 +59,6 @@ class D4NFilterDriver : public FilterDriver {
     virtual std::unique_ptr<Bucket> get_bucket(const RGWBucketInfo& i) override;
     int load_bucket(const DoutPrefixProvider* dpp, const rgw_bucket& b,
                   std::unique_ptr<Bucket>* bucket, optional_yield y) override;
-
     virtual std::unique_ptr<Writer> get_atomic_writer(const DoutPrefixProvider *dpp,
                                  optional_yield y,
                                  rgw::sal::Object* obj,
@@ -101,6 +100,10 @@ class D4NFilterBucket : public FilterBucket {
     virtual int create(const DoutPrefixProvider* dpp,
                        const CreateParams& params,
                        optional_yield y) override;
+    virtual std::unique_ptr<MultipartUpload> get_multipart_upload(
+                               const std::string& oid,
+                               std::optional<std::string> upload_id=std::nullopt,
+                               ACLOwner owner={}, ceph::real_time mtime=real_clock::now()) override;
 };
 
 class D4NFilterObject : public FilterObject {
@@ -111,7 +114,7 @@ class D4NFilterObject : public FilterObject {
     rgw_obj obj;
     rgw::sal::Object* dest_object{nullptr}; //for copy-object
     rgw::sal::Bucket* dest_bucket{nullptr}; //for copy-object
-    std::string dest_version;
+    bool multipart{false};
 
   public:
     struct D4NFilterReadOp : FilterReadOp {
@@ -121,12 +124,14 @@ class D4NFilterObject : public FilterObject {
            D4NFilterDriver* filter;
            D4NFilterObject* source;
            RGWGetDataCB* client_cb;
-           uint64_t ofs = 0, len = 0;
+           int64_t ofs = 0, len = 0;
+      int64_t adjusted_start_ofs{0};
            bufferlist bl_rem;
            bool last_part{false};
            bool write_to_cache{true};
            const DoutPrefixProvider* dpp;
            optional_yield* y;
+      int part_count{0};
 
          public:
            D4NFilterGetCB(D4NFilterDriver* _filter, D4NFilterObject* _source) : filter(_filter),
@@ -139,6 +144,8 @@ class D4NFilterObject : public FilterObject {
               this->y = y;
             }
            void set_ofs(uint64_t ofs) { this->ofs = ofs; }
+      void set_adjusted_start_ofs(uint64_t adjusted_start_ofs) { this->adjusted_start_ofs = adjusted_start_ofs; }
+      void set_part_num(uint64_t part_num) { this->part_count = part_num; }
            int flush_last_part();
            void bypass_cache_write() { this->write_to_cache = false; }
        };
@@ -232,6 +239,7 @@ class D4NFilterObject : public FilterObject {
 
     virtual std::unique_ptr<ReadOp> get_read_op() override;
     virtual std::unique_ptr<DeleteOp> get_delete_op() override;
+    //virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **pstate, optional_yield y, bool follow_olh = true) override;
 
     void set_object_version(const std::string& version) { this->version = version; }
     const std::string get_object_version() { return this->version; }
@@ -239,12 +247,16 @@ class D4NFilterObject : public FilterObject {
     void set_prefix(const std::string& prefix) { this->prefix = prefix; }
     const std::string get_prefix() { return this->prefix; }
     int get_obj_attrs_from_cache(const DoutPrefixProvider* dpp, optional_yield y);
-    void set_obj_state_attrs(const DoutPrefixProvider* dpp, optional_yield y, rgw::sal::Attrs& attrs);
-    int calculate_version(const DoutPrefixProvider* dpp, optional_yield y, std::string& version);
+    void set_attrs_from_obj_state(const DoutPrefixProvider* dpp, optional_yield y, rgw::sal::Attrs& attrs);
+    int calculate_version(const DoutPrefixProvider* dpp, optional_yield y, std::string& version, rgw::sal::Attrs& attrs);
     int set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optional_yield y, bool is_latest_version = true, bool dirty = false);
+    int set_data_block_dir_entries(const DoutPrefixProvider* dpp, optional_yield y, std::string& version, bool dirty = false);
+    int delete_data_block_cache_entries(const DoutPrefixProvider* dpp, optional_yield y, std::string& version, bool dirty = false);
     bool check_head_exists_in_cache_get_oid(const DoutPrefixProvider* dpp, std::string& head_oid_in_cache, rgw::sal::Attrs& attrs, rgw::d4n::CacheBlock& blk, optional_yield y);
     rgw::sal::Bucket* get_destination_bucket(const DoutPrefixProvider* dpp) { return dest_bucket;}
     rgw::sal::Object* get_destination_object(const DoutPrefixProvider* dpp) { return dest_object; }
+    bool is_multipart() { return multipart; }
+    int set_attr_crypt_parts(const DoutPrefixProvider* dpp, optional_yield y, rgw::sal::Attrs& attrs);
 };
 
 class D4NFilterWriter : public FilterWriter {
@@ -285,4 +297,25 @@ class D4NFilterWriter : public FilterWriter {
    const DoutPrefixProvider* get_dpp() { return this->dpp; } 
 };
 
+class D4NFilterMultipartUpload : public FilterMultipartUpload {
+private:
+  D4NFilterDriver* driver;
+public:
+  D4NFilterMultipartUpload(std::unique_ptr<MultipartUpload> _next, Bucket* _b, D4NFilterDriver* driver) :
+    FilterMultipartUpload(std::move(_next), _b),
+    driver(driver) {}
+  virtual ~D4NFilterMultipartUpload() = default;
+
+  virtual int complete(const DoutPrefixProvider *dpp,
+                                   optional_yield y, CephContext* cct,
+                                   std::map<int, std::string>& part_etags,
+                                   std::list<rgw_obj_index_key>& remove_objs,
+                                   uint64_t& accounted_size, bool& compressed,
+                                   RGWCompressionInfo& cs_info, off_t& ofs,
+                                   std::string& tag, ACLOwner& owner,
+                                   uint64_t olh_epoch,
+                                   rgw::sal::Object* target_obj,
+            prefix_map_t& processed_prefixes) override;
+};
+
 } } // namespace rgw::sal
index 288a09ae16087f8e0d05b72944aef8d1280c9c3b..2ae557bd0a40d89a96d2d0c24d38b2f5c195b36a 100644 (file)
@@ -4923,6 +4923,7 @@ void RGWPostObj::execute(optional_yield y)
 
     s->obj_size = ofs;
     s->object->set_obj_size(ofs);
+    obj->set_obj_size(ofs);
 
 
     op_ret = s->bucket->check_quota(this, quota, s->obj_size, y);
index e7e23df02161c3289649b50f3d155216e05be2f0..43fdb006c96731f4482e54779ccb4a87fe08388f 100644 (file)
@@ -1225,7 +1225,12 @@ int FilterObject::FilterReadOp::prepare(optional_yield y, const DoutPrefixProvid
 {
   /* Copy params into next */
   next->params = params;
-  return next->prepare(y, dpp);
+  int ret = next->prepare(y, dpp);
+  if (ret < 0)
+    return ret;
+
+  params.parts_count = next->params.parts_count;
+  return 0;
 }
 
 int FilterObject::FilterReadOp::read(int64_t ofs, int64_t end, bufferlist& bl,
@@ -1279,6 +1284,9 @@ int FilterMultipartUpload::init(const DoutPrefixProvider *dpp, optional_yield y,
                                ACLOwner& owner, rgw_placement_rule& dest_placement,
                                rgw::sal::Attrs& attrs)
 {
+  next->obj_legal_hold = obj_legal_hold;
+  next->obj_retention = obj_retention;
+  next->cksum_type = cksum_type;
   return next->init(dpp, y, owner, dest_placement, attrs);
 }
 
@@ -1337,7 +1345,15 @@ int FilterMultipartUpload::get_info(const DoutPrefixProvider *dpp,
                                    optional_yield y, rgw_placement_rule** rule,
                                    rgw::sal::Attrs* attrs)
 {
-  return next->get_info(dpp, y, rule, attrs);
+  auto ret = next->get_info(dpp, y, rule, attrs);
+  if (ret < 0) {
+    return ret;
+  }
+
+  this->obj_legal_hold = next->obj_legal_hold;
+  this->obj_retention = next->obj_retention;
+  this->cksum_type = next->cksum_type;
+  return 0;
 }
 
 std::unique_ptr<Writer> FilterMultipartUpload::get_writer(
index f46b9aac3cfcc98d96c7c2fd64a63f1d8a8e6cd6..b68530efdc3b8151f4817a2884f92136071037dc 100644 (file)
@@ -114,7 +114,7 @@ int SSDDriver::put(const DoutPrefixProvider* dpp, const std::string& key, const
 int SSDDriver::get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs, optional_yield y)
 {
     char buffer[len];
-    std::string location = partition_info.location + key;
+    std::string location = partition_info.location + "/" + url_encode(key, true);
 
     ldpp_dout(dpp, 20) << __func__ << "(): location=" << location << dendl;
     FILE *cache_file = nullptr;
@@ -156,7 +156,7 @@ int SSDDriver::get(const DoutPrefixProvider* dpp, const std::string& key, off_t
 int SSDDriver::append_data(const DoutPrefixProvider* dpp, const::std::string& key, const bufferlist& bl_data, optional_yield y)
 {
     bufferlist src = bl_data;
-    std::string location = partition_info.location + key;
+    std::string location = partition_info.location + "/" + url_encode(key, true);
 
     ldpp_dout(dpp, 20) << __func__ << "(): location=" << location << dendl;
     FILE *cache_file = nullptr;
@@ -214,7 +214,7 @@ auto SSDDriver::get_async(const DoutPrefixProvider *dpp, const Executor& ex, con
     auto p = Op::create(ex, handler);
     auto& op = p->user_data;
 
-    std::string location = partition_info.location + key;
+    std::string location = partition_info.location + "/" + url_encode(key, true);
     ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): location=" << location << dendl;
 
     int ret = op.prepare_libaio_read_op(dpp, location, read_ofs, read_len, p.get());
@@ -244,7 +244,7 @@ void SSDDriver::put_async(const DoutPrefixProvider *dpp, const Executor& ex, con
     auto p = Op::create(ex, handler);
     auto& op = p->user_data;
 
-    std::string location = partition_info.location + key;
+    std::string location = partition_info.location + "/" + url_encode(key, true);
     ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): location=" << location << dendl;
 
     int r = 0;
@@ -323,7 +323,7 @@ rgw::AioResultList SSDDriver::put_async(const DoutPrefixProvider* dpp, optional_
 
 int SSDDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string& key, optional_yield y)
 {
-    std::string location = partition_info.location + key;
+    std::string location = partition_info.location + "/" + url_encode(key, true);
 
     if (!efs::remove(location)) {
         ldpp_dout(dpp, 0) << "ERROR: delete_data::remove has failed to remove the file: " << location << dendl;
@@ -337,12 +337,12 @@ int SSDDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string& ke
 }
 
 int SSDDriver::rename(const DoutPrefixProvider* dpp, const::std::string& oldKey, const::std::string& newKey, optional_yield y)
-{
+{ 
     std::string location = partition_info.location;
 
-    int ret = std::rename((location+oldKey).c_str(), (location+newKey).c_str());
+    int ret = std::rename((location + "/" + url_encode(oldKey, true)).c_str(), (location + "/" + url_encode(newKey, true)).c_str());
     if (ret < 0) {
-        ldpp_dout(dpp, 0) << "SSDDriver: ERROR: failed to rename the file: " << location+oldKey << dendl;
+        ldpp_dout(dpp, 0) << "SSDDriver: ERROR: failed to rename the file: " << location + "/" + url_encode(oldKey, true) << dendl;
         return ret;
     }
 
@@ -352,7 +352,7 @@ int SSDDriver::rename(const DoutPrefixProvider* dpp, const::std::string& oldKey,
 
 int SSDDriver::AsyncWriteRequest::prepare_libaio_write_op(const DoutPrefixProvider *dpp, bufferlist& bl, unsigned int len, std::string key, std::string cache_location)
 {
-    std::string location = cache_location + key;
+    std::string location = cache_location + "/" + url_encode(key, true);
     int r = 0;
     ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): Write To Cache, location=" << location << dendl;
     cb.reset(new struct aiocb);
@@ -407,8 +407,8 @@ void SSDDriver::AsyncWriteRequest::libaio_write_cb(sigval sigval) {
     efs::space_info space = efs::space(partition_info.location);
     op.priv_data->set_free_space(op.dpp, space.available);
 
-    std::string new_path = partition_info.location + op.key;
-    std::string old_path = partition_info.location + op.temp_key;
+    std::string new_path = partition_info.location + "/" + url_encode(op.key, true);
+    std::string old_path = partition_info.location + "/" + url_encode(op.temp_key, true);
 
     ldpp_dout(op.dpp, 20) << "INFO: AsyncWriteRequest::libaio_write_yield_cb: temp_key: " << op.temp_key << dendl;
 
@@ -465,7 +465,7 @@ void SSDDriver::AsyncReadOp::libaio_cb_aio_dispatch(sigval sigval)
 
 int SSDDriver::update_attrs(const DoutPrefixProvider* dpp, const std::string& key, const rgw::sal::Attrs& attrs, optional_yield y)
 {
-    std::string location = partition_info.location + key;
+    std::string location = partition_info.location + "/" + url_encode(key, true);
     ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): location=" << location << dendl;
 
     for (auto& it : attrs) {
@@ -490,7 +490,7 @@ int SSDDriver::update_attrs(const DoutPrefixProvider* dpp, const std::string& ke
 
 int SSDDriver::delete_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& del_attrs, optional_yield y)
 {
-    std::string location = partition_info.location + key;
+    std::string location = partition_info.location + "/" + url_encode(key, true);
     ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): location=" << location << dendl;
 
     for (auto& it : del_attrs) {
@@ -509,7 +509,7 @@ int SSDDriver::delete_attrs(const DoutPrefixProvider* dpp, const std::string& ke
 
 int SSDDriver::get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs, optional_yield y)
 {
-    std::string location = partition_info.location + key;
+    std::string location = partition_info.location + "/" + url_encode(key, true);
     ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): location=" << location << dendl;
 
     char namebuf[64 * 1024];
@@ -543,7 +543,7 @@ int SSDDriver::get_attrs(const DoutPrefixProvider* dpp, const std::string& key,
 
 int SSDDriver::set_attrs(const DoutPrefixProvider* dpp, const std::string& key, const rgw::sal::Attrs& attrs, optional_yield y)
 {
-    std::string location = partition_info.location + key;
+    std::string location = partition_info.location + "/" + url_encode(key, true);
     ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): location=" << location << dendl;
 
     for (auto& [attr_name, attr_val_bl] : attrs) {
@@ -565,7 +565,7 @@ int SSDDriver::set_attrs(const DoutPrefixProvider* dpp, const std::string& key,
 
 int SSDDriver::get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, std::string& attr_val, optional_yield y)
 {
-    std::string location = partition_info.location + key;
+    std::string location = partition_info.location + "/" + url_encode(key, true);
     ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): location=" << location << dendl;
 
     ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): get_attr: key: " << attr_name << dendl;
@@ -596,7 +596,7 @@ int SSDDriver::get_attr(const DoutPrefixProvider* dpp, const std::string& key, c
 
 int SSDDriver::set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attr_val, optional_yield y)
 {
-    std::string location = partition_info.location + key;
+    std::string location = partition_info.location + "/" + url_encode(key, true);
     ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): location=" << location << dendl;
 
     ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): set_attr: key: " << attr_name << " val: " << attr_val << dendl;
@@ -615,7 +615,7 @@ int SSDDriver::set_attr(const DoutPrefixProvider* dpp, const std::string& key, c
 
 int SSDDriver::delete_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name)
 {
-    std::string location = partition_info.location + key;
+    std::string location = partition_info.location + "/" + url_encode(key, true);
     ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): location=" << location << dendl;
 
     auto ret = removexattr(location.c_str(), attr_name.c_str());