]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw/d4n: squashing all commits related to caching head in the
authorPritha Srivastava <prsrivas@redhat.com>
Wed, 20 Mar 2024 04:14:50 +0000 (09:44 +0530)
committerPritha Srivastava <prsrivas@redhat.com>
Mon, 21 Apr 2025 04:04:07 +0000 (09:34 +0530)
write-back workflow, modifying set_obj_attrs(), get_obj_attrs() and
delete_obj_attrs based on the cached head and modifying the cleaning
method to use a min-heap data structure for storing dirty objects only.

1. rgw/d4n: implementation for caching head object in write-back workflow.
2. rgw/d4n: modifications to get write back cache working after cleaning
process.
3. rgw/d4n: modifications for eviction of dirty blocks.
4. rgw/d4n: modifications include adding a heap of dirty objects
which has objects ordered by their creation time and the top
element of which is fetched in the cleaning method, processed
and deleted in a loop.
5. rgw/d4n: changing the format of cached blocks to
bucket_name_version_object_name_ofs_len, to avoid checks
for versioned and non-versioned objects.
6. rgw/d4n: modifications to set_obj_attrs(), modify_obj_attrs()
and delete_obj_attrs() to check if the head object exists in a cache,
else direct the calls to backend store.
7. rgw/d4n: handling version in case of bucket versioning being suspended
while writing the object.

Co-authored by: Samarah <samarah.uriarte@ibm.com>
Changed dynamic_cast to static_cast for D4NFilterObject
in D4NFilterWriter class constructors.

Signed-off-by: Pritha Srivastava <prsrivas@redhat.com>
src/rgw/driver/d4n/d4n_directory.cc
src/rgw/driver/d4n/d4n_directory.h
src/rgw/driver/d4n/d4n_policy.cc
src/rgw/driver/d4n/d4n_policy.h
src/rgw/driver/d4n/rgw_sal_d4n.cc
src/rgw/driver/d4n/rgw_sal_d4n.h
src/test/rgw/test_d4n_policy.cc

index 0be1d1784670ec874fde13d8a9907adb8743000b..96dac74890e8cc3780cd113365f85f9c07a7dfca 100644 (file)
@@ -311,11 +311,11 @@ std::string BlockDirectory::build_index(CacheBlock* block)
   return block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + std::to_string(block->blockID) + "_" + std::to_string(block->size);
 }
 
-int BlockDirectory::exist_key(CacheBlock* block, optional_yield y) 
+int BlockDirectory::exist_key(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y) 
 {
   std::string key = build_index(block);
   response<int> resp;
-
+  ldpp_dout(dpp, 10) << __func__ << "(): index is: " << key << dendl;
   try {
     boost::system::error_code ec;
     request req;
@@ -330,7 +330,7 @@ int BlockDirectory::exist_key(CacheBlock* block, optional_yield y)
   return std::get<0>(resp).value();
 }
 
-int BlockDirectory::set(CacheBlock* block, optional_yield y) 
+int BlockDirectory::set(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y) 
 {
   std::string key = build_index(block);
     
@@ -368,6 +368,12 @@ int BlockDirectory::set(CacheBlock* block, optional_yield y)
   redisValues.push_back("creationTime");
   redisValues.push_back(block->cacheObj.creationTime); 
   redisValues.push_back("dirty");
+  if (block->cacheObj.dirty == true || block->cacheObj.dirty == 1) {
+    block->cacheObj.dirty = 1;
+  }
+  if (block->cacheObj.dirty == false || block->cacheObj.dirty == 0) {
+    block->cacheObj.dirty = 0;
+  }
   redisValues.push_back(std::to_string(block->cacheObj.dirty));
   redisValues.push_back("objHosts");
   
@@ -402,11 +408,13 @@ int BlockDirectory::set(CacheBlock* block, optional_yield y)
   return 0;
 }
 
-int BlockDirectory::get(CacheBlock* block, optional_yield y) 
+int BlockDirectory::get(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y) 
 {
   std::string key = build_index(block);
 
-  if (exist_key(block, y)) {
+  ldpp_dout(dpp, 10) << __func__ << "(): index is: " << key << dendl;
+
+  if (exist_key(dpp, block, y)) {
     std::vector<std::string> fields;
 
     fields.push_back("blockID");
@@ -439,7 +447,6 @@ int BlockDirectory::get(CacheBlock* block, optional_yield y)
       block->version = std::get<0>(resp).value()[1];
       block->size = boost::lexical_cast<uint64_t>(std::get<0>(resp).value()[2]);
       block->globalWeight = boost::lexical_cast<int>(std::get<0>(resp).value()[3]);
-
       {
         std::stringstream ss(boost::lexical_cast<std::string>(std::get<0>(resp).value()[4]));
        block->hostsList.clear();
@@ -456,7 +463,6 @@ int BlockDirectory::get(CacheBlock* block, optional_yield y)
       block->cacheObj.creationTime = std::get<0>(resp).value()[7];
       block->cacheObj.dirty = boost::lexical_cast<bool>(std::get<0>(resp).value()[8]);
       block->dirty = boost::lexical_cast<bool>(std::get<0>(resp).value()[8]);
-
       {
         std::stringstream ss(boost::lexical_cast<std::string>(std::get<0>(resp).value()[9]));
        block->cacheObj.hostsList.clear();
@@ -478,13 +484,13 @@ int BlockDirectory::get(CacheBlock* block, optional_yield y)
 }
 
 /* Note: This method is not compatible for use on Ubuntu systems. */
-int BlockDirectory::copy(CacheBlock* block, std::string copyName, std::string copyBucketName, optional_yield y) 
+int BlockDirectory::copy(const DoutPrefixProvider* dpp, CacheBlock* block, std::string copyName, std::string copyBucketName, optional_yield y) 
 {
   std::string key = build_index(block);
   auto copyBlock = CacheBlock{ .cacheObj = { .objName = copyName, .bucketName = copyBucketName }, .blockID = 0 };
   std::string copyKey = build_index(&copyBlock);
 
-  if (exist_key(block, y)) {
+  if (exist_key(dpp, block, y)) {
     try {
       response<int> resp;
      
@@ -522,11 +528,11 @@ int BlockDirectory::copy(CacheBlock* block, std::string copyName, std::string co
   }
 }
 
-int BlockDirectory::del(CacheBlock* block, optional_yield y) 
+int BlockDirectory::del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y) 
 {
   std::string key = build_index(block);
 
-  if (exist_key(block, y)) {
+  if (exist_key(dpp, block, y)) {
     try {
       boost::system::error_code ec;
       request req;
@@ -548,11 +554,11 @@ int BlockDirectory::del(CacheBlock* block, optional_yield y)
   }
 }
 
-int BlockDirectory::update_field(CacheBlock* block, std::string field, std::string value, optional_yield y) 
+int BlockDirectory::update_field(const DoutPrefixProvider* dpp, CacheBlock* block, std::string field, std::string& value, optional_yield y) 
 {
   std::string key = build_index(block);
 
-  if (exist_key(block, y)) {
+  if (exist_key(dpp, block, y)) {
     try {
       /* Ensure field exists */
       {
@@ -589,7 +595,14 @@ int BlockDirectory::update_field(CacheBlock* block, std::string field, std::stri
        std::get<0>(resp).value() += value;
        value = std::get<0>(resp).value();
       }
-
+  if (field == "dirty") { 
+    if (value == "true" || value == "1") {
+      value = "1";
+    }
+    if (value == "false" || value == "0") {
+      value = "0";
+    }
+  }
       {
        boost::system::error_code ec;
        request req;
@@ -612,11 +625,11 @@ int BlockDirectory::update_field(CacheBlock* block, std::string field, std::stri
   }
 }
 
-int BlockDirectory::remove_host(CacheBlock* block, std::string delValue, optional_yield y) 
+int BlockDirectory::remove_host(const DoutPrefixProvider* dpp, CacheBlock* block, std::string& delValue, optional_yield y) 
 {
   std::string key = build_index(block);
 
-  if (exist_key(block, y)) {
+  if (exist_key(dpp, block, y)) {
     try {
       /* Ensure field exists */
       {
@@ -649,7 +662,7 @@ int BlockDirectory::remove_host(CacheBlock* block, std::string delValue, optiona
        }
 
        if (std::get<0>(resp).value().find("_") == std::string::npos) /* Last host, delete entirely */
-          return del(block, y);
+          return del(dpp, block, y);
 
         std::string result = std::get<0>(resp).value();
         auto it = result.find(delValue);
index be7bde82691d19f6b8fbe7014ee3b1f92176d51f..9a7a0dfe3c62fa19135de937c0099f25e2a97d35 100644 (file)
@@ -18,7 +18,7 @@ struct CacheObj {
   std::string objName; /* S3 object name */
   std::string bucketName; /* S3 bucket name */
   std::string creationTime; /* Creation time of the S3 Object */
-  bool dirty;
+  bool dirty{false};
   std::vector<std::string> hostsList; /* List of hostnames <ip:port> of object locations for multiple backends */
 };
 
@@ -26,7 +26,7 @@ struct CacheBlock {
   CacheObj cacheObj;
   uint64_t blockID;
   std::string version;
-  bool dirty;
+  bool dirty{false};
   uint64_t size; /* Block size in bytes */
   int globalWeight = 0; /* LFUDA policy variable */
   std::vector<std::string> hostsList; /* List of hostnames <ip:port> of block locations */
@@ -67,14 +67,14 @@ class BlockDirectory: public Directory {
     void init(CephContext* cct) {
       this->cct = cct;
     }
-    int exist_key(CacheBlock* block, optional_yield y);
-
-    int set(CacheBlock* block, optional_yield y);
-    int get(CacheBlock* block, optional_yield y);
-    int copy(CacheBlock* block, std::string copyName, std::string copyBucketName, optional_yield y);
-    int del(CacheBlock* block, optional_yield y);
-    int update_field(CacheBlock* block, std::string field, std::string value, optional_yield y);
-    int remove_host(CacheBlock* block, std::string value, optional_yield y);
+    int exist_key(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
+
+    int set(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
+    int get(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
+    int copy(const DoutPrefixProvider* dpp, CacheBlock* block, std::string copyName, std::string copyBucketName, optional_yield y);
+    int del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
+    int update_field(const DoutPrefixProvider* dpp, CacheBlock* block, std::string field, std::string& value, optional_yield y);
+    int remove_host(const DoutPrefixProvider* dpp, CacheBlock* block, std::string& value, optional_yield y);
 
   private:
     std::shared_ptr<connection> conn;
index b75e1c6a423dc74b26f7a63bc3afe177fb27abfb..878245c029dfdb7072e78deed9ae564067bcb5d9 100644 (file)
@@ -258,7 +258,7 @@ CacheBlock* LFUDAPolicy::get_victim_block(const DoutPrefixProvider* dpp, optiona
   victim->blockID = entries_heap.top()->offset;
   victim->size = entries_heap.top()->len;
 
-  if (dir->get(victim, y) < 0) {
+  if (dir->get(dpp, victim, y) < 0) {
     return nullptr;
   }
 
@@ -279,7 +279,7 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional
 
   while (freeSpace < size) { // TODO: Think about parallel reads and writes; can this turn into an infinite loop? 
     CacheBlock* victim = get_victim_block(dpp, y);
-
+  
     if (victim == nullptr) {
       ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Could not retrieve victim block." << dendl;
       delete victim;
@@ -293,7 +293,11 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional
       delete victim;
       return -ENOENT;
     }
-
+    // check dirty flag of entry to be evicted, if the flag is dirty, all entries on the local node are dirty
+    if (it->second->dirty) {
+      ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Top entry in min heap is dirty, no entry is available for eviction!" << dendl;
+      return -ENOENT;
+    }
     int avgWeight = weightSum / entries_map.size();
 
     if (victim->hostsList.size() == 1 && victim->hostsList[0] == dir->cct->_conf->rgw_d4n_l1_datacache_address) { /* Last copy */
@@ -308,7 +312,8 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional
         }
 
        victim->globalWeight = 0;
-       if (int ret = dir->update_field(victim, "globalWeight", std::to_string(victim->globalWeight), y) < 0) {
+  auto globalWeight = std::to_string(victim->globalWeight);
+       if (int ret = dir->update_field(dpp, victim, "globalWeight", globalWeight, y) < 0) {
          delete victim;
          return ret;
         }
@@ -321,12 +326,13 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional
     }
 
     victim->globalWeight += it->second->localWeight;
-    if (int ret = dir->update_field(victim, "globalWeight", std::to_string(victim->globalWeight), y) < 0) {
+    auto globalWeight = std::to_string(victim->globalWeight);
+    if (int ret = dir->update_field(dpp, victim, "globalWeight", globalWeight, y) < 0) {
       delete victim;
       return ret;
     }
 
-    if (int ret = dir->remove_host(victim, dir->cct->_conf->rgw_d4n_l1_datacache_address, y) < 0) {
+    if (int ret = dir->remove_host(dpp, victim, dir->cct->_conf->rgw_d4n_l1_datacache_address, y) < 0) {
       delete victim;
       return ret;
     }
@@ -349,43 +355,55 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional
   return 0;
 }
 
-void LFUDAPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, time_t creationTime, const rgw_user user, optional_yield y)
+void LFUDAPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, optional_yield y)
 {
   using handle_type = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>::handle_type;
   const std::lock_guard l(lfuda_lock);
   int localWeight = age;
   auto entry = find_entry(key);
-  if (entry != nullptr) { 
-    localWeight = entry->localWeight + age;
+  bool updateLocalWeight = true;
+  // check the dirty flag in the existing entry for the key and the incoming dirty flag. If the
+  // incoming dirty flag is false, that means update() is invoked as part of cleaning process,
+  // so we must not update its localWeight.
+  if (entry != nullptr) {
+    if (entry->dirty && !dirty) {
+      localWeight = entry->localWeight;
+      updateLocalWeight = false;
+    } else {
+      localWeight = entry->localWeight + age;
+    }
   }  
-
   erase(dpp, key, y);
-  
-  LFUDAEntry *e = new LFUDAEntry(key, offset, len, version, dirty, creationTime, user, localWeight);
+  LFUDAEntry *e = new LFUDAEntry(key, offset, len, version, dirty, localWeight);
   handle_type handle = entries_heap.push(e);
   e->set_handle(handle);
   entries_map.emplace(key, e);
 
   std::string oid_in_cache = key;
-  if (dirty == true)
-    oid_in_cache = "D_"+key;
+  if (dirty == true) {
+    oid_in_cache = "D_" + key;
+  }
 
-  if (cacheDriver->set_attr(dpp, oid_in_cache, "user.rgw.localWeight", std::to_string(localWeight), y) < 0) 
-    ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): CacheDriver set_attr method failed." << dendl;
+  if (updateLocalWeight) {
+    if (cacheDriver->set_attr(dpp, oid_in_cache, "user.rgw.localWeight", std::to_string(localWeight), y) < 0) 
+      ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): CacheDriver set_attr method failed." << dendl;
+  }
 
   weightSum += ((localWeight < 0) ? 0 : localWeight);
 }
 
-void LFUDAPolicy::updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, optional_yield y)
+void LFUDAPolicy::updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, const std::string& bucket_name, const rgw_obj_key& obj_key, optional_yield y)
 {
-  eraseObj(dpp, key, y);
-  
-  const std::lock_guard l(lfuda_lock);
-  LFUDAObjEntry *e = new LFUDAObjEntry(key, version, dirty, size, creationTime, user, etag);
+  using handle_type = boost::heap::fibonacci_heap<LFUDAObjEntry*, boost::heap::compare<ObjectComparator<LFUDAObjEntry>>>::handle_type;
+  ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Before acquiring lock." << dendl;
+  const std::lock_guard l(lfuda_cleaning_lock);
+  LFUDAObjEntry *e = new LFUDAObjEntry{key, version, dirty, size, creationTime, user, etag, bucket_name, obj_key};
+  handle_type handle = object_heap.push(e);
+  e->set_handle(handle);
   o_entries_map.emplace(key, e);
+  cond.notify_one();
 }
 
-
 bool LFUDAPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y)
 {
   auto p = entries_map.find(key);
@@ -403,13 +421,15 @@ bool LFUDAPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key, o
 
 bool LFUDAPolicy::eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y)
 {
-  const std::lock_guard l(lfuda_lock);
+  const std::lock_guard l(lfuda_cleaning_lock);
   auto p = o_entries_map.find(key);
   if (p == o_entries_map.end()) {
     return false;
   }
 
+  object_heap.erase(p->second->handle);
   o_entries_map.erase(p);
+  delete p->second;
 
   return true;
 }
@@ -417,131 +437,191 @@ bool LFUDAPolicy::eraseObj(const DoutPrefixProvider* dpp, const std::string& key
 void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
 {
   const int interval = cct->_conf->rgw_d4n_cache_cleaning_interval;
-  while(true){
+  while(!quit) {
     ldpp_dout(dpp, 20) << __func__ << " : " << " Cache cleaning!" << dendl;
     std::string name = ""; 
     std::string b_name = ""; 
     std::string key = ""; 
     uint64_t len = 0;
     rgw::sal::Attrs obj_attrs;
-    int count = 0;
-
-    for (auto it = o_entries_map.begin(); it != o_entries_map.end(); it++){
-      if ((it->second->dirty == true) && (std::difftime(time(NULL), it->second->creationTime) > interval)){ //if block is dirty and written more than interval seconds ago
-       name = it->first;
-       rgw_user c_rgw_user = it->second->user;
-
-       size_t pos = 0;
-       std::string delimiter = "_";
-       while ((pos = name.find(delimiter)) != std::string::npos) {
-         if (count == 0){
-           b_name = name.substr(0, pos);
-           name.erase(0, pos + delimiter.length());
-         }
-         count ++;
-       }
-       key = name;
-
-       //writing data to the backend
-       //we need to create an atomic_writer
-       rgw_obj_key c_obj_key = rgw_obj_key(key);               
-       std::unique_ptr<rgw::sal::User> c_user = driver->get_user(c_rgw_user);
-
-       std::unique_ptr<rgw::sal::Bucket> c_bucket;
-        rgw_bucket c_rgw_bucket = rgw_bucket(c_rgw_user.tenant, b_name, "");
-
-       RGWBucketInfo c_bucketinfo;
-       c_bucketinfo.bucket = c_rgw_bucket;
-       c_bucketinfo.owner = c_rgw_user;
-       
-       
-       int ret = driver->load_bucket(dpp, c_rgw_bucket, &c_bucket, null_yield);
-       if (ret < 0) {
-         ldpp_dout(dpp, 10) << __func__ << "(): load_bucket() returned ret=" << ret << dendl;
-         break;
+  
+    ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "" << __LINE__ << "(): Before acquiring cleaning-lock." << dendl;
+    std::unique_lock<std::mutex> l(lfuda_cleaning_lock);
+    LFUDAObjEntry* e;
+    if (object_heap.size() > 0) {
+      e = object_heap.top();
+    } else {
+      cond.wait(l, [this]{ return (!object_heap.empty() || quit); });
+      continue;
+    }
+    ldpp_dout(dpp, 10) <<__LINE__ << " " << __func__ << "(): e->key=" << e->key << dendl;
+    ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->dirty=" << e->dirty << dendl;
+    l.unlock();
+    if (!e->key.empty() && (e->dirty == true) && (std::difftime(time(NULL), e->creationTime) > interval)) { //if block is dirty and written more than interval seconds ago
+      name = e->key;
+      rgw_user c_rgw_user = e->user;
+
+      size_t pos = 0;
+      std::string delimiter = "_";
+      int count = 0;
+      while ((pos = name.find(delimiter)) != std::string::npos) {
+        if (count == 0) {
+          b_name = name.substr(0, pos);
+          ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): b_name=" << b_name << dendl;
+              name.erase(0, pos + delimiter.length());
+          ldpp_dout(dpp, 10) << __LINE__ <<  " " << __func__ << "(): name=" << name << dendl;
+          break;
         }
+        count++;
+        ldpp_dout(dpp, 10) << __LINE__ <<  " " << __func__ << "(): count=" << b_name << dendl;
+      }
+      key = name;
+      //writing data to the backend
+      //we need to create an atomic_writer
+      std::unique_ptr<rgw::sal::User> c_user = driver->get_user(c_rgw_user);
+
+      std::unique_ptr<rgw::sal::Bucket> c_bucket;
+      rgw_bucket c_rgw_bucket = rgw_bucket(c_rgw_user.tenant, e->bucket_name, "");
+
+      RGWBucketInfo c_bucketinfo;
+      c_bucketinfo.bucket = c_rgw_bucket;
+      c_bucketinfo.owner = c_rgw_user;
+      int ret = driver->load_bucket(dpp, c_rgw_bucket, &c_bucket, null_yield);
+      if (ret < 0) {
+        ldpp_dout(dpp, 10) << __func__ << "(): load_bucket() returned ret=" << ret << dendl;
+        break;
+      }
+
+      std::unique_ptr<rgw::sal::Object> c_obj = c_bucket->get_object(e->obj_key);
 
-       std::unique_ptr<rgw::sal::Object> c_obj = c_bucket->get_object(c_obj_key);
+      ACLOwner owner{c_user->get_id(), c_user->get_display_name()};
 
-  ACLOwner owner{c_user->get_id(), c_user->get_display_name()};
+      std::unique_ptr<rgw::sal::Writer> processor =  driver->get_atomic_writer(dpp,
+              null_yield,
+              c_obj.get(),
+              owner,
+              NULL,
+              0,
+              "");
 
-       std::unique_ptr<rgw::sal::Writer> processor =  driver->get_atomic_writer(dpp,
-                                 null_yield,
-                                 c_obj.get(),
-          owner,
-                                 NULL,
-                                 0,
-                                 "");
+      int op_ret = processor->prepare(null_yield);
+      if (op_ret < 0) {
+          ldpp_dout(dpp, 20) << __func__ << "processor->prepare() returned ret=" << op_ret << dendl;
+          break;
+      }
 
-       int op_ret = processor->prepare(null_yield);
-       if (op_ret < 0) {
-         ldpp_dout(dpp, 20) << "processor->prepare() returned ret=" << op_ret << dendl;
-         break;
-       }
+      std::string prefix = b_name + "_" + e->version + "_" + c_obj->get_name();
+      off_t lst = e->size;
+      off_t fst = 0;
+      off_t ofs = 0;
+
+      rgw::sal::DataProcessor *filter = processor.get();
+      std::string head_oid_in_cache = "D_" + prefix;
+      std::string new_head_oid_in_cache = prefix;
+      ldpp_dout(dpp, 10) << __func__ << "(): head_oid_in_cache=" << head_oid_in_cache << dendl;
+      ldpp_dout(dpp, 10) << __func__ << "(): new_head_oid_in_cache=" << new_head_oid_in_cache << dendl;
+      bufferlist bl;
+      cacheDriver->get_attrs(dpp, head_oid_in_cache, obj_attrs, null_yield); //get obj attrs from head
+      obj_attrs.erase("user.rgw.mtime");
+      obj_attrs.erase("user.rgw.object_size");
+      obj_attrs.erase("user.rgw.accounted_size");
+      obj_attrs.erase("user.rgw.epoch");
+
+      do {
+        ceph::bufferlist data;
+        if (fst >= lst){
+            break;
+        }
+        off_t cur_size = std::min<off_t>(fst + cct->_conf->rgw_max_chunk_size, lst);
+        off_t cur_len = cur_size - fst;
+        std::string oid_in_cache = "D_" + prefix + "_" + std::to_string(fst) + "_" + std::to_string(cur_len);
+        ldpp_dout(dpp, 10) << __func__ << "(): oid_in_cache=" << oid_in_cache << dendl;
+        rgw::sal::Attrs attrs;
+        cacheDriver->get(dpp, oid_in_cache, 0, cur_len, data, attrs, null_yield);
+        len = data.length();
+        fst += len;
+
+        if (len == 0) {
+          // TODO: if len of any block is 0 for some reason, we must return from here?
+          break;
+        }
 
-       std::string prefix = b_name+"_"+key;
-       off_t lst = it->second->size;
-       off_t fst = 0;
-       off_t ofs = 0;
+        op_ret = filter->process(std::move(data), ofs);
+        if (op_ret < 0) {
+            ldpp_dout(dpp, 20) << __func__ << "processor->process() returned ret="
+            << op_ret << dendl;
+            return;
+        }
 
-       
-       rgw::sal::DataProcessor *filter = processor.get();
-       do {
-         ceph::bufferlist data;
-         if (fst >= lst){
-           break;
-         }
-         off_t cur_size = std::min<off_t>(fst + cct->_conf->rgw_max_chunk_size, lst);
-         off_t cur_len = cur_size - fst;
-         std::string oid_in_cache = "D_" + prefix + "_" + std::to_string(fst) + "_" + std::to_string(cur_len);
-         std::string new_oid_in_cache = prefix + "_" + std::to_string(fst) + "_" + std::to_string(cur_len);
-         cacheDriver->get(dpp, oid_in_cache, 0, cur_len, data, obj_attrs, null_yield);
-         len = data.length();
-         fst += len;
-
-         if (len == 0) {
-           break;
-         }
-
-         op_ret = filter->process(std::move(data), ofs);
-         if (op_ret < 0) {
-           ldpp_dout(dpp, 20) << "processor->process() returned ret="
-               << op_ret << dendl;
-           return;
-         }
-
-         rgw::d4n::CacheBlock block;
-         block.cacheObj.bucketName = c_obj->get_bucket()->get_name();
-         block.cacheObj.objName = c_obj->get_key().get_oid();
-         block.size = len;
-         block.blockID = ofs;
-         op_ret = dir->update_field(&block, "dirty", "false", null_yield); 
-         if (op_ret < 0) {
-           ldpp_dout(dpp, 20) << "updating dirty flag in Block directory failed!" << dendl;
-           return;
-         }
-
-         cacheDriver->rename(dpp, oid_in_cache, new_oid_in_cache, null_yield);
-
-         ofs += len;
-       } while (len > 0);
-
-       op_ret = filter->process({}, ofs);
-       
-       const req_context rctx{dpp, null_yield, nullptr};
-       ceph::real_time mtime = ceph::real_clock::from_time_t(it->second->creationTime);
-        op_ret = processor->complete(lst, it->second->etag, &mtime, ceph::real_clock::from_time_t(it->second->creationTime), obj_attrs,
-                               std::nullopt, ceph::real_time(), nullptr, nullptr,
-                               nullptr, nullptr, nullptr,
-                               rctx, rgw::sal::FLAG_LOG_OP);
-
-       //data is clean now, updating in-memory metadata
-       it->second->dirty = false;
+        ofs += len;
+      } while (len > 0);
+
+      op_ret = filter->process({}, ofs);
+
+      const req_context rctx{dpp, null_yield, nullptr};
+      ceph::real_time mtime = ceph::real_clock::from_time_t(e->creationTime);
+      op_ret = processor->complete(lst, e->etag, &mtime, ceph::real_clock::from_time_t(e->creationTime), obj_attrs,
+                              std::nullopt, ceph::real_time(), nullptr, nullptr,
+                              nullptr, nullptr, nullptr,
+                              rctx, rgw::sal::FLAG_LOG_OP);
+
+      //invoke update() with dirty flag set to false, to update in-memory metadata for each block
+      // reset values
+      lst = e->size;
+      fst = 0;
+      do {
+        if (fst >= lst) {
+            break;
+        }
+        off_t cur_size = std::min<off_t>(fst + cct->_conf->rgw_max_chunk_size, lst);
+        off_t cur_len = cur_size - fst;
+
+        std::string oid_in_cache = "D_" + prefix + "_" + std::to_string(fst) + "_" + std::to_string(cur_len);
+        ldpp_dout(dpp, 20) << __func__ << "(): oid_in_cache =" << oid_in_cache << dendl;
+        std::string new_oid_in_cache = prefix + "_" + std::to_string(fst) + "_" + std::to_string(cur_len);
+        //Rename block to remove "D" prefix
+        cacheDriver->rename(dpp, oid_in_cache, new_oid_in_cache, null_yield);
+        //Update in-memory data structure for each block
+        this->update(dpp, new_oid_in_cache, 0, 0, e->version, false, y);
+
+        rgw::d4n::CacheBlock block;
+        block.cacheObj.bucketName = c_obj->get_bucket()->get_name();
+        block.cacheObj.objName = c_obj->get_key().get_oid();
+        block.size = cur_len;
+        block.blockID = fst;
+        std::string dirty = "false";
+        op_ret = dir->update_field(dpp, &block, "dirty", dirty, null_yield);
+        if (op_ret < 0) {
+            ldpp_dout(dpp, 5) << __func__ << "updating dirty flag in Block directory failed!" << dendl;
+            return;
+        }
+        fst += cur_len;
+      } while(fst < lst);
+
+      cacheDriver->rename(dpp, head_oid_in_cache, new_head_oid_in_cache, null_yield);
+      //data is clean now, updating in-memory metadata for an object
+      e->dirty = false;
+      //invoke update() with dirty flag set to false, to update in-memory metadata for head
+      this->update(dpp, new_head_oid_in_cache, 0, 0, e->version, false, y);
+
+      rgw::d4n::CacheBlock block;
+      block.cacheObj.bucketName = c_obj->get_bucket()->get_name();
+      block.cacheObj.objName = c_obj->get_name();
+      block.size = 0;
+      block.blockID = 0;
+      std::string dirty = "false";
+      op_ret = dir->update_field(dpp, &block, "dirty", dirty, null_yield);
+      if (op_ret < 0) {
+          ldpp_dout(dpp, 20) << __func__ << "updating dirty flag in block directory for head failed!" << dendl;
+          return;
       }
-    }
 
-    std::this_thread::sleep_for(std::chrono::milliseconds(interval));
-  }
+      //remove entry from map and queue, eraseObj locks correctly
+      eraseObj(dpp, e->key, null_yield);
+    } else { //end-if std::difftime(time(NULL), e->creationTime) > interval
+      std::this_thread::sleep_for(std::chrono::milliseconds(interval)); //TODO:: should this time be optimised?
+    }
+  } //end-while true
 }
 
 
@@ -575,20 +655,19 @@ int LRUPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_y
   return 0;
 }
 
-void LRUPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, time_t creationTime, const rgw_user user, optional_yield y)
+void LRUPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, optional_yield y)
 {
   const std::lock_guard l(lru_lock);
   _erase(dpp, key, y);
-  Entry *e = new Entry(key, offset, len, version, dirty, creationTime, user);
+  Entry* e = new Entry(key, offset, len, version, dirty);
   entries_lru_list.push_back(*e);
   entries_map.emplace(key, e);
 }
 
-void LRUPolicy::updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, optional_yield y)
+void LRUPolicy::updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, const std::string& bucket_name, const rgw_obj_key& obj_key, optional_yield y)
 {
-  eraseObj(dpp, key, y);
   const std::lock_guard l(lru_lock);
-  ObjEntry *e = new ObjEntry(key, version, dirty, size, creationTime, user, etag);
+  ObjEntry* e = new ObjEntry(key, version, dirty, size, creationTime, user, etag, bucket_name, obj_key);
   o_entries_map.emplace(key, e);
   return;
 }
@@ -600,25 +679,25 @@ bool LRUPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key, opt
   return _erase(dpp, key, y);
 }
 
-bool LRUPolicy::_erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y)
+bool LRUPolicy::eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y)
 {
-  auto p = entries_map.find(key);
-  if (p == entries_map.end()) {
+  const std::lock_guard l(lru_lock);
+  auto p = o_entries_map.find(key);
+  if (p == o_entries_map.end()) {
     return false;
   }
-  entries_map.erase(p);
-  entries_lru_list.erase_and_dispose(entries_lru_list.iterator_to(*(p->second)), Entry_delete_disposer());
+  o_entries_map.erase(p);
   return true;
 }
 
-bool LRUPolicy::eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y)
+bool LRUPolicy::_erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y)
 {
-  const std::lock_guard l(lru_lock);
-  auto p = o_entries_map.find(key);
-  if (p == o_entries_map.end()) {
+  auto p = entries_map.find(key);
+  if (p == entries_map.end()) {
     return false;
   }
-  o_entries_map.erase(p);
+  entries_map.erase(p);
+  entries_lru_list.erase_and_dispose(entries_lru_list.iterator_to(*(p->second)), Entry_delete_disposer());
   return true;
 }
 
index 61a8743a172ba0fbc5e03fe8dff95d4fef6e4087..6301028b1cadd2517b11de569de353d93f069b42 100644 (file)
@@ -27,21 +27,19 @@ class CachePolicy {
       uint64_t len;
       std::string version;
       bool dirty;
-      time_t creationTime;
-      rgw_user user;
-      Entry(std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, time_t creationTime, rgw_user user) : key(key), offset(offset), 
-                                                                                     len(len), version(version), dirty(dirty), creationTime(creationTime), user(user) {}
+      Entry(std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty) : key(key), offset(offset), 
+                                                                                     len(len), version(version), dirty(dirty) {}
     };
 
    
     //The disposer object function
     struct Entry_delete_disposer {
-      void operator()(Entry *e) {
+      void operator()(Entrye) {
         delete e;
       }
     };
 
-    struct ObjEntry : public boost::intrusive::list_base_hook<> {
+    struct ObjEntry {
       std::string key;
       std::string version;
       bool dirty;
@@ -49,24 +47,21 @@ class CachePolicy {
       time_t creationTime;
       rgw_user user;
       std::string etag;
-      ObjEntry(std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, rgw_user user, std::string& etag) : key(key), version(version), dirty(dirty), size(size), creationTime(creationTime), user(user), etag(etag) {}
-    };
-
-    struct ObjEntry_delete_disposer {
-      void operator()(ObjEntry *e) {
-        delete e;
-      }
+      std::string bucket_name;
+      rgw_obj_key obj_key;
+      ObjEntry() = default;
+      ObjEntry(std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, rgw_user user, std::string& etag, const std::string& bucket_name, const rgw_obj_key& obj_key) : key(key), version(version), dirty(dirty), size(size), creationTime(creationTime), user(user), etag(etag), bucket_name(bucket_name), obj_key(obj_key) {}
     };
 
   public:
     CachePolicy() {}
     virtual ~CachePolicy() = default; 
 
-    virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver *_driver) = 0; 
+    virtual int init(CephContext* cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver* _driver) = 0;
     virtual int exist_key(std::string key) = 0;
     virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) = 0;
-    virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, time_t creationTime, const rgw_user user, optional_yield y) = 0;
-    virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, optional_yield y) = 0;
+    virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, optional_yield y) = 0;
+    virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, const std::string& bucket_name, const rgw_obj_key& obj_key, optional_yield y) = 0;
     virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0;
     virtual bool eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0;
     virtual void cleaning(const DoutPrefixProvider* dpp) = 0;
@@ -77,29 +72,56 @@ class LFUDAPolicy : public CachePolicy {
     template<typename T>
     struct EntryComparator {
       bool operator()(T* const e1, T* const e2) const {
-       return e1->localWeight > e2->localWeight;
+        // order the min heap using localWeight and dirty flag so that dirty blocks are at the bottom
+        if ((e1->dirty && e2->dirty) || (!e1->dirty && !e2->dirty)) {
+               return e1->localWeight > e2->localWeight;
+        } else if (e1->dirty && !e2->dirty){
+          return true;
+        } else if (!e1->dirty && e2->dirty) {
+          return false;
+        } else {
+          return e1->localWeight > e2->localWeight;
+        }
       }
     }; 
 
+    template<typename T>
+    struct ObjectComparator {
+      bool operator()(T* const e1, T* const e2) const {
+        // order the min heap using creationTime
+        return e1->creationTime > e2->creationTime;
+      }
+    };
+
     struct LFUDAEntry : public Entry {
       int localWeight;
       using handle_type = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>::handle_type;
       handle_type handle;
 
-      LFUDAEntry(std::string& key, uint64_t offset, uint64_t len, std::string& version, bool dirty, time_t creationTime, rgw_user user, int localWeight) : Entry(key, offset, len, version, dirty, creationTime, user), localWeight(localWeight) {}
+      LFUDAEntry(std::string& key, uint64_t offset, uint64_t len, std::string& version, bool dirty, int localWeight) : Entry(key, offset, len, version, dirty), localWeight(localWeight) {}
       
-      void set_handle(handle_type handle_) { handle = handle_; } 
+      void set_handle(handle_type handle_) { handle = handle_; }
     };
 
     struct LFUDAObjEntry : public ObjEntry {
-      LFUDAObjEntry(std::string& key, std::string& version, bool dirty, uint64_t size, time_t creationTime, rgw_user user, std::string& etag) : ObjEntry(key, version, dirty, size, creationTime, user, etag) {}
+      using handle_type = boost::heap::fibonacci_heap<LFUDAObjEntry*, boost::heap::compare<ObjectComparator<LFUDAObjEntry>>>::handle_type;
+      handle_type handle;
+
+      LFUDAObjEntry(std::string& key, std::string& version, bool dirty, uint64_t size, time_t creationTime, rgw_user user, std::string& etag, const std::string& bucket_name, const rgw_obj_key& obj_key) : ObjEntry(key, version, dirty, size, creationTime, user, etag, bucket_name, obj_key) {}
+
+      void set_handle(handle_type handle_) { handle = handle_; }
     };
 
     using Heap = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>;
+    using Object_Heap = boost::heap::fibonacci_heap<LFUDAObjEntry*, boost::heap::compare<ObjectComparator<LFUDAObjEntry>>>;
     Heap entries_heap;
+    Object_Heap object_heap; //This heap contains dirty objects ordered by their creation time, used for cleaning method
     std::unordered_map<std::string, LFUDAEntry*> entries_map;
-    std::unordered_map<std::string, LFUDAObjEntry*> o_entries_map;
+    std::unordered_map<std::string, LFUDAObjEntry*> o_entries_map; //Contains only dirty objects, used for look-up
     std::mutex lfuda_lock;
+    std::mutex lfuda_cleaning_lock;
+    std::condition_variable cond;
+    bool quit{false};
 
     int age = 1, weightSum = 0, postedSum = 0;
     optional_yield y = null_yield;
@@ -107,7 +129,7 @@ class LFUDAPolicy : public CachePolicy {
     BlockDirectory* dir;
     rgw::cache::CacheDriver* cacheDriver;
     std::optional<asio::steady_timer> rthread_timer;
-    rgw::sal::Driver *driver;
+    rgw::sal::Driverdriver;
     std::thread tc;
     CephContext* cct;
 
@@ -139,21 +161,25 @@ class LFUDAPolicy : public CachePolicy {
     ~LFUDAPolicy() {
       rthread_stop();
       delete dir;
+      std::lock_guard l(lfuda_cleaning_lock);
+      quit = true;
+      cond.notify_all();
     } 
 
     virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver *_driver);
     virtual int exist_key(std::string key) override;
     virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override;
-    virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, time_t creationTime, const rgw_user user, optional_yield y) override;
+    virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, optional_yield y) override;
     virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
     void save_y(optional_yield y) { this->y = y; }
-    virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, optional_yield y) override;
-    virtual bool eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
+    virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, const std::string& bucket_name, const rgw_obj_key& obj_key, optional_yield y) override;
+    virtual bool eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y);
     virtual void cleaning(const DoutPrefixProvider* dpp) override;
     LFUDAObjEntry* find_obj_entry(const std::string& key) {
       auto it = o_entries_map.find(key);
-      if (it == o_entries_map.end())
+      if (it == o_entries_map.end()) {
         return nullptr;
+      }
       return it->second;
     }
 };
@@ -173,11 +199,11 @@ class LRUPolicy : public CachePolicy {
   public:
     LRUPolicy(rgw::cache::CacheDriver* cacheDriver) : cacheDriver{cacheDriver} {}
 
-    virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver* _driver) { return 0; } 
+    virtual int init(CephContext* cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver* _driver) { return 0; }
     virtual int exist_key(std::string key) override;
     virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override;
-    virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, time_t creationTime, const rgw_user user, optional_yield y) override;
-    virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, optional_yield y) override;
+    virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, optional_yield y) override;
+    virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, const std::string& bucket_name, const rgw_obj_key& obj_key, optional_yield y) override;
     virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
     virtual bool eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
     virtual void cleaning(const DoutPrefixProvider* dpp) override {}
index 5803ce2ca9b27c41bb8e4120d3a1b4720052e1c7..55da49e81f4da4d36c0fbcdbece60c993822176b 100644 (file)
@@ -105,115 +105,101 @@ int D4NFilterBucket::create(const DoutPrefixProvider* dpp,
 int D4NFilterObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
                             Attrs* delattrs, optional_yield y, uint32_t flags)
 {
-  if (setattrs != NULL) {
-    /* Ensure setattrs and delattrs do not overlap */
-    if (delattrs != NULL) {
-      for (const auto& attr : *delattrs) {
-        if (std::find(setattrs->begin(), setattrs->end(), attr) != setattrs->end()) {
-          delattrs->erase(std::find(delattrs->begin(), delattrs->end(), attr));
+  rgw::sal::Attrs attrs;
+  std::string head_oid_in_cache;
+  if (check_head_exists_in_cache_get_oid(dpp, head_oid_in_cache, attrs, y)) {
+    if (setattrs != nullptr) {
+      /* Ensure setattrs and delattrs do not overlap */
+      if (delattrs != nullptr) {
+        for (const auto& attr : *delattrs) {
+          if (std::find(setattrs->begin(), setattrs->end(), attr) != setattrs->end()) {
+            delattrs->erase(std::find(delattrs->begin(), delattrs->end(), attr));
+          }
         }
       }
-    }
+      //if set_obj_attrs() can be called to update existing attrs, then update_attrs() need to be called
+      if (auto ret = driver->get_cache_driver()->set_attrs(dpp, head_oid_in_cache, *setattrs, y); ret < 0) {
+        ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): CacheDriver set_attrs method failed with ret: " << ret << dendl;
+        return ret;
+      }
+    } //if setattrs != nullptr
 
-    if (driver->get_cache_driver()->set_attrs(dpp, this->get_key().get_oid(), *setattrs, y) < 0)
-      ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver set_attrs method failed." << dendl;
-  }
+    if (delattrs != nullptr) {
+      Attrs::iterator attr;
+      Attrs currentattrs = this->get_attrs();
 
-  if (delattrs != NULL) {
-    Attrs::iterator attr;
-    Attrs currentattrs = this->get_attrs();
+      /* Ensure all delAttrs exist */
+      for (const auto& attr : *delattrs) {
+        if (std::find(currentattrs.begin(), currentattrs.end(), attr) == currentattrs.end()) {
+          delattrs->erase(std::find(delattrs->begin(), delattrs->end(), attr));
+        }
+      }
 
-    /* Ensure all delAttrs exist */
-    for (const auto& attr : *delattrs) {
-      if (std::find(currentattrs.begin(), currentattrs.end(), attr) == currentattrs.end()) {
-       delattrs->erase(std::find(delattrs->begin(), delattrs->end(), attr));
+      if (auto ret = driver->get_cache_driver()->delete_attrs(dpp, head_oid_in_cache, *delattrs, y); ret < 0) {
+        ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver delete_attrs method failed with ret: " << ret << dendl;
+        return ret;
       }
+    } //if delattrs != nullptr
+  } else {
+    auto ret = next->set_obj_attrs(dpp, setattrs, delattrs, y, flags);
+    if (ret < 0) {
+      ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): set_obj_attrs method of backend store failed with ret: " << ret << dendl;
+      return ret;
     }
-
-    if (driver->get_cache_driver()->delete_attrs(dpp, this->get_key().get_oid(), *delattrs, y) < 0) 
-      ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver delete_attrs method failed." << dendl;
   }
-
-  return next->set_obj_attrs(dpp, setattrs, delattrs, y, flags);
+  return 0;
 }
 
 bool D4NFilterObject::get_obj_attrs_from_cache(const DoutPrefixProvider* dpp, optional_yield y)
 {
-  rgw::d4n::BlockDirectory* blockDir = this->driver->get_block_dir();
-  rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
-        .objName = this->get_name(),
-        .bucketName = this->get_bucket()->get_name(),
-        };
-
-  rgw::d4n::CacheBlock block = rgw::d4n::CacheBlock{
-          .cacheObj = object,
-          .blockID = 0,
-          .version = version,
-          .size = 0
-          };
-
-  bool found_in_cache = true;
-  //if the block corresponding to head object does not exist in directory, implies it is not cached
-  if (blockDir->exist_key(&block, y) && (blockDir->get(&block, y) == 0)) {
-    rgw::sal::Attrs attrs;
-    std::string version = block.version;
-    this->set_object_version(version);
-    //uniform name for versioned and non-versioned objects, since input for versioned objects might not contain version
-    std::string head_oid_in_cache = get_bucket()->get_name() + "_" + version + "_" + get_name();
-    ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Fetching attrs from cache." << dendl;
-    auto ret = this->driver->get_cache_driver()->get_attrs(dpp, head_oid_in_cache, attrs, y);
+  std::string head_oid_in_cache;
+  rgw::sal::Attrs attrs;
+  bool found_in_cache = check_head_exists_in_cache_get_oid(dpp, head_oid_in_cache, attrs, y);
+
+  if (found_in_cache) {
+    /* Set metadata locally */
+
+    std::string instance;
+    for (auto& attr : attrs) {
+      if (attr.second.length() > 0) {
+        if (attr.first == "user.rgw.mtime") {
+          ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting mtime." << dendl;
+          auto mtime = ceph::real_clock::from_double(std::stod(attr.second.c_str()));
+          this->set_mtime(mtime);
+        } else if (attr.first == "user.rgw.object_size") {
+          auto size = std::stoull(attr.second.c_str());
+          ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting object_size to: " << size << dendl;
+          this->set_obj_size(size);
+        } else if (attr.first == "user.rgw.accounted_size") {
+          auto accounted_size = std::stoull(attr.second.c_str());
+          this->set_accounted_size(accounted_size);
+        } else if (attr.first == "user.rgw.epoch") {
+          ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting epoch." << dendl;
+          auto epoch = std::stoull(attr.second.c_str());
+          this->set_epoch(epoch);
+        } else if (attr.first == "user.rgw.version_id") {
+          instance = attr.second.to_str();
+          ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting version_id to: " << instance << dendl;
+        } else if (attr.first == "user.rgw.source_zone") {
+          ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting source zone id." << dendl;
+          auto short_zone_id = static_cast<uint32_t>(std::stoul(attr.second.c_str()));
+          this->set_short_zone_id(short_zone_id);
+        } else {
+          ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): Unexpected attribute; not locally set, attr name: " << attr.first << dendl;
+        }
+      }//end-if
+    }//end-for
+    this->set_instance(instance); //set this only after setting object state else it won't take effect
+    attrs.erase("user.rgw.mtime");
+    attrs.erase("user.rgw.object_size");
+    attrs.erase("user.rgw.accounted_size");
+    attrs.erase("user.rgw.epoch");
+    /* Set attributes locally */
+    auto ret = this->set_attrs(attrs);
     if (ret < 0) {
-      ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver get_attrs method failed." << dendl;
-      found_in_cache = false;
-    } else {
-      /* Set metadata locally */
-      RGWQuotaInfo quota_info;
-
-      std::string instance;
-      for (auto& attr : attrs) {
-        if (attr.second.length() > 0) {
-          if (attr.first == "user.rgw.mtime") {
-            ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting mtime." << dendl;
-            auto mtime = ceph::real_clock::from_double(std::stod(attr.second.c_str()));
-            this->set_mtime(mtime);
-          } else if (attr.first == "user.rgw.object_size") {
-            auto size = std::stoull(attr.second.c_str());
-            this->set_obj_size(size);
-            ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting object_size to: " << size << dendl;
-          } else if (attr.first == "user.rgw.accounted_size") {
-            auto accounted_size = std::stoull(attr.second.c_str());
-            this->set_accounted_size(accounted_size);
-          } else if (attr.first == "user.rgw.epoch") {
-            ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting epoch." << dendl;
-            auto epoch = std::stoull(attr.second.c_str());
-            this->set_epoch(epoch);
-          } else if (attr.first == "user.rgw.version_id") {
-            ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting version_id." << dendl;
-            instance = attr.second.to_str();
-          } else if (attr.first == "user.rgw.source_zone") {
-            ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting source zone id." << dendl;
-            auto zone_short_id = static_cast<uint32_t>(std::stoul(attr.second.c_str()));
-            this->set_short_zone_id(zone_short_id);
-          } else {
-            ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): Unexpected attribute; not locally set, attr name: " << attr.first << dendl;
-          }
-        }//end-if
-      }//end-for
-      //this->set_obj_state(astate);
-      this->set_instance(instance); //set this only after setting object state else it won't take effect
-      attrs.erase("user.rgw.mtime");
-      attrs.erase("user.rgw.object_size");
-      attrs.erase("user.rgw.accounted_size");
-      attrs.erase("user.rgw.epoch");
-      /* Set attributes locally */
-      ret = this->set_attrs(attrs);
-      if (ret < 0) {
-        ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): D4NFilterObject set_attrs method failed." << dendl;
-      }
+      ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): D4NFilterObject set_attrs method failed." << dendl;
     }
-  } else {
-    found_in_cache = false;
-  }
+  } // if found_in_cache = true
 
   return found_in_cache;
 }
@@ -267,7 +253,35 @@ int D4NFilterObject::calculate_version(const DoutPrefixProvider* dpp, optional_y
   return 0;
 }
 
-int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optional_yield y)
+int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optional_yield y, bool dirty)
+{
+  ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): object name: " << this->get_name() << " bucket name: " << this->get_bucket()->get_name() << dendl;
+  rgw::d4n::BlockDirectory* blockDir = this->driver->get_block_dir();
+  rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
+    .objName = this->get_name(),
+    .bucketName = this->get_bucket()->get_name(),
+    .dirty = dirty,
+    .hostsList = { driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address },
+    };
+
+  rgw::d4n::CacheBlock block = rgw::d4n::CacheBlock{
+    .cacheObj = object,
+    .blockID = 0,
+    .version = this->get_object_version(),
+    .dirty = dirty,
+    .size = 0,
+    .hostsList = { driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address },
+    };
+
+  auto ret = blockDir->set(dpp, &block, y);
+  if (ret < 0) {
+    ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
+  }
+
+  return ret;
+}
+
+bool D4NFilterObject::check_head_exists_in_cache_get_oid(const DoutPrefixProvider* dpp, std::string& head_oid_in_cache, rgw::sal::Attrs& attrs, optional_yield y)
 {
   rgw::d4n::BlockDirectory* blockDir = this->driver->get_block_dir();
   rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
@@ -278,16 +292,37 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optio
   rgw::d4n::CacheBlock block = rgw::d4n::CacheBlock{
           .cacheObj = object,
           .blockID = 0,
-          .version = this->get_object_version(),
           .size = 0
           };
 
-  auto ret = blockDir->set(&block, y);
-  if (ret < 0) {
-    ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
+  bool found_in_cache = true;
+  //if the block corresponding to head object does not exist in directory, implies it is not cached
+  if (blockDir->exist_key(dpp, &block, y) && (blockDir->get(dpp, &block, y) == 0)) {
+    std::string version;
+    if (have_instance()) {
+      version = get_instance();
+    } else {
+      version = block.version;
+    }
+    this->set_object_version(version);
+    //uniform name for versioned and non-versioned objects, since input for versioned objects might not contain version
+    head_oid_in_cache = get_bucket()->get_name() + "_" + version + "_" + get_name();
+    ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Fetching attrs from cache for head obj id: " << head_oid_in_cache << dendl;
+    auto ret = this->driver->get_cache_driver()->get_attrs(dpp, head_oid_in_cache, attrs, y);
+    if (ret < 0) {
+      //check for dirty blocks also
+      head_oid_in_cache = "D_" + get_bucket()->get_name() + "_" + version + "_" + get_name();
+      ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Fetching attrs from cache for head obj id: " << head_oid_in_cache << dendl;
+      ret = this->driver->get_cache_driver()->get_attrs(dpp, head_oid_in_cache, attrs, y);
+      if (ret < 0) {
+        found_in_cache = false;
+        ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver get_attrs method failed." << dendl;
+      }
+    }
+  } else { //if blockDir->exist_key
+    found_in_cache = false;
   }
-
-  return ret;
+  return found_in_cache;
 }
 
 int D4NFilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp,
@@ -299,7 +334,10 @@ int D4NFilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* d
     std::string version;
     ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Fetching attrs from backend store." << dendl;
     auto ret = next->get_obj_attrs(y, dpp, target_obj);
-    if (ret < 0) {
+    if (ret < 0 || !target_obj) {
+      if (!target_obj) {
+        ret = -ENOENT;
+      }
       ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Failed to fetching attrs from backend store with ret: " << ret << dendl;
       return ret;
     }
@@ -314,17 +352,26 @@ int D4NFilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* d
     }
 
     head_oid_in_cache = this->get_bucket()->get_name() + "_" + version + "_" + this->get_name();
-    ret = this->driver->get_cache_driver()->set_attrs(dpp, head_oid_in_cache, attrs, y);
+    if (this->driver->get_policy_driver()->get_cache_policy()->exist_key(head_oid_in_cache) > 0) {
+      ret = this->driver->get_cache_driver()->set_attrs(dpp, head_oid_in_cache, attrs, y);
+    } else {
+      ret = this->driver->get_policy_driver()->get_cache_policy()->eviction(dpp, attrs.size(), y);
+      if (ret == 0) {
+        bufferlist bl;
+        ret = this->driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl, 0, attrs, y);
+      } else {
+        ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Failed to evict data with err: " << ret << dendl;
+      }
+    }
     if (ret == 0) {
       ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << this->get_object_version() << dendl;
-      time_t creationTime = ceph::real_clock::to_time_t(this->get_mtime());
-      this->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, 0, version, false, creationTime, get<rgw_user>(this->get_bucket()->get_owner()), y);
+      this->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, 0, version, false, y);
       ret = set_head_obj_dir_entry(dpp, y);
       if (ret < 0) {
         ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
       }
     } else {
-      ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): failed to cache head object in block dir with error: " << ret << dendl;
+      ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): failed to cache head object in cache backend with error: " << ret << dendl;
     }
   }
 
@@ -336,30 +383,50 @@ int D4NFilterObject::modify_obj_attrs(const char* attr_name, bufferlist& attr_va
 {
   Attrs update;
   update[(std::string)attr_name] = attr_val;
-
-  if (driver->get_cache_driver()->update_attrs(dpp, this->get_key().get_oid(), update, y) < 0) 
-    ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver update_attrs method failed." << dendl;
-
-  return next->modify_obj_attrs(attr_name, attr_val, y, dpp, flags);
+  std::string head_oid_in_cache;
+  rgw::sal::Attrs attrs;
+  if (check_head_exists_in_cache_get_oid(dpp, head_oid_in_cache, attrs, y)) {
+    if (auto ret = driver->get_cache_driver()->update_attrs(dpp, head_oid_in_cache, update, y); ret < 0) {
+      ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): CacheDriver update_attrs method failed with ret: " << ret << dendl;
+      return ret;
+    }
+  } else {
+    auto ret = next->modify_obj_attrs(attr_name, attr_val, y, dpp, flags);
+    if (ret < 0) {
+      ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): modify_obj_attrs of backend store failed with ret: " << ret << dendl;
+      return ret;
+    }
+  }
+  return 0;
 }
 
 int D4NFilterObject::delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name,
                                optional_yield y)
 {
   buffer::list bl;
+  std::string head_oid_in_cache;
+  rgw::sal::Attrs attrs;
   Attrs delattr;
-  delattr.insert({attr_name, bl});
-  Attrs currentattrs = this->get_attrs();
-  rgw::sal::Attrs::iterator attr = delattr.begin();
+  if (check_head_exists_in_cache_get_oid(dpp, head_oid_in_cache, attrs, y)) {
+    delattr.insert({attr_name, bl});
+    Attrs currentattrs = this->get_attrs();
+    rgw::sal::Attrs::iterator attr = delattr.begin();
 
-  /* Ensure delAttr exists */
-  if (std::find_if(currentattrs.begin(), currentattrs.end(),
-       [&](const auto& pair) { return pair.first == attr->first; }) != currentattrs.end()) {
+    /* Ensure delAttr exists */
+    if (std::find_if(currentattrs.begin(), currentattrs.end(),
+        [&](const auto& pair) { return pair.first == attr->first; }) != currentattrs.end()) {
 
-    if (driver->get_cache_driver()->delete_attrs(dpp, this->get_key().get_oid(), delattr, y) < 0) 
-      ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver delete_attrs method failed." << dendl;
-  } else 
-    return next->delete_obj_attrs(dpp, attr_name, y);  
+      if (auto ret = driver->get_cache_driver()->delete_attrs(dpp, head_oid_in_cache, delattr, y); ret < 0) {
+        ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): CacheDriver delete_attrs method failed with ret: " << ret << dendl;
+        return ret;
+      }
+    }
+  } else {
+    if (auto ret = next->delete_obj_attrs(dpp, attr_name, y); ret < 0) {
+      ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): delete_obj_attrs method of backend store failed with ret: " << ret << dendl;
+      return ret;
+    }
+  }
 
   return 0;
 }
@@ -447,8 +514,7 @@ int D4NFilterObject::D4NFilterReadOp::prepare(optional_yield y, const DoutPrefix
       ret = source->driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl, 0, attrs, y);
       if (ret == 0) {
         ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << this->source->get_object_version() << dendl;
-        time_t creationTime = ceph::real_clock::to_time_t(this->source->get_mtime());
-        source->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, bl.length(), version, false, creationTime, std::get<rgw_user>(source->get_bucket()->get_owner()), y);
+        source->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, bl.length(), version, false, y);
         ret = source->set_head_obj_dir_entry(dpp, y);
         if (ret < 0) {
           ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
@@ -469,16 +535,13 @@ void D4NFilterObject::D4NFilterReadOp::cancel() {
 }
 
 int D4NFilterObject::D4NFilterReadOp::drain(const DoutPrefixProvider* dpp, optional_yield y) {
-  auto c = aio->wait();
-  while (!c.empty()) {
-    int r = flush(dpp, std::move(c), y);
-    if (r < 0) {
-      cancel();
-      return r;
-    }
-    c = aio->wait();
+  auto c = aio->drain();
+  int r = flush(dpp, std::move(c), y);
+  if (r < 0) {
+    cancel();
+    return r;
   }
-  return flush(dpp, std::move(c), y);
+  return 0;
 }
 
 int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw::AioResultList&& results, optional_yield y) {
@@ -514,12 +577,6 @@ int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw::
       uint64_t ofs = ofs_len_pair.first;
       uint64_t len = ofs_len_pair.second;
       bool dirty = false;
-      /*
-      std::stringstream s;
-      utime_t ut(source->get_mtime());
-      ut.gmtime(s);
-      */
-      time_t creationTime = ceph::real_clock::to_time_t(source->get_mtime());
 
       rgw::d4n::CacheBlock block;
       block.cacheObj.objName = source->get_key().get_oid();
@@ -529,15 +586,15 @@ int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw::
 
       std::string oid_in_cache = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(len);
 
-      if (source->driver->get_block_dir()->get(&block, y) == 0){
-        if (block.dirty == true){ 
+      if (source->driver->get_block_dir()->get(dpp, &block, y) == 0){
+        if (block.dirty){
           dirty = true;
         }
       }
 
       ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " calling update for offset: " << offset << " adjusted offset: " << ofs  << " length: " << len << " oid_in_cache: " << oid_in_cache << dendl;
       ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << version << " " << source->get_object_version() << dendl;
-      source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, len, version, dirty, creationTime, std::get<rgw_user>(source->get_bucket()->get_owner()), y);
+      source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, len, version, dirty, y);
       blocks_info.erase(it);
     } else {
       ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << " offset not found: " << offset << dendl;
@@ -557,14 +614,8 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
   const uint64_t window_size = g_conf()->rgw_get_obj_window_size;
   std::string version = source->get_object_version();
   std::string prefix;
-  /* After prepare() method, for versioned objects, get_oid() returns an oid with versionId added,
-   * even for versioned objects, where version id is not provided as input
-   */
-  if (source->have_instance()) {
-    prefix = source->get_bucket()->get_name() + "_" + source->get_key().get_oid();
-  } else {
-    prefix = source->get_bucket()->get_name() + "_" + version + "_" + source->get_key().get_oid();
-  }
+
+  prefix = source->get_bucket()->get_name() + "_" + version + "_" + source->get_name();
 
   ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "prefix: " << prefix << dendl;
   ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "oid: " << source->get_key().get_oid() << " ofs: " << ofs << " end: " << end << dendl;
@@ -626,10 +677,10 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
     " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
 
     int ret = -1;
-    if (source->driver->get_block_dir()->exist_key(&block, y) > 0 && (ret = source->driver->get_block_dir()->get(&block, y)) == 0) { 
+    if (source->driver->get_block_dir()->exist_key(dpp, &block, y) > 0 && (ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0) { 
       auto it = find(block.hostsList.begin(), block.hostsList.end(), source->driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address);
       if (it != block.hostsList.end()) { /* Local copy */
-       ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in local cache. " << oid_in_cache << dendl;
+       ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in directory. " << oid_in_cache << dendl;
         std::string key = oid_in_cache;
         ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: block is dirty = " << block.dirty << dendl;
         if (block.dirty == true) { 
@@ -753,6 +804,29 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
   return this->cb->flush_last_part();
 }
 
+int D4NFilterObject::D4NFilterReadOp::get_attr(const DoutPrefixProvider* dpp, const char* name, bufferlist& dest, optional_yield y)
+{
+  rgw::sal::Attrs& attrs = source->get_attrs();
+  if (attrs.empty()) {
+    rgw_obj obj = source->get_obj();
+    auto ret = source->get_obj_attrs(y, dpp, &obj);
+    if (ret < 0) {
+      ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Error: failed to fetch attrs, ret= " << ret << dendl;
+      return ret;
+    }
+    //get_obj_attrs() calls set_attrs() internally, hence get_attrs() can be invoked to get the latest attrs.
+    attrs = source->get_attrs();
+  }
+  auto it = attrs.find(name);
+  if (it == attrs.end()) {
+    ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Attribute value NOT found for attr name= " << name << dendl;
+    return next->get_attr(dpp, name, dest, y);
+  }
+
+  dest = it->second;
+  return 0;
+}
+
 int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::flush_last_part()
 {
   last_part = true;
@@ -773,6 +847,10 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
 
   //Accumulating data from backend store into rgw_get_obj_max_req_size sized chunks and then writing to cache
   if (write_to_cache) {
+    Attrs attrs; // empty attrs for cache sets
+    std::string version = source->get_object_version();
+    std::string prefix = source->get_prefix();
+
     rgw::d4n::CacheBlock block, existing_block;
     rgw::d4n::BlockDirectory* blockDir = source->driver->get_block_dir();
     block.hostsList.push_back(blockDir->cct->_conf->rgw_d4n_l1_datacache_address); 
@@ -781,15 +859,12 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
     std::stringstream s;
     block.cacheObj.creationTime = std::to_string(ceph::real_clock::to_time_t(source->get_mtime()));
     block.cacheObj.dirty = false;
-    bool dirty = false;
-    time_t creationTime = ceph::real_clock::to_time_t(source->get_mtime());
+    bool dirty = block.dirty = false; //Reading from the backend, data is clean
+    block.version = version;
 
     //populating fields needed for building directory index
     existing_block.cacheObj.objName = block.cacheObj.objName;
     existing_block.cacheObj.bucketName = block.cacheObj.bucketName;
-    Attrs attrs; // empty attrs for cache sets
-    std::string version = source->get_object_version();
-    std::string prefix = source->get_prefix();
 
     ldpp_dout(dpp, 20) << __func__ << ": version stored in update method is: " << version << dendl;
 
@@ -798,34 +873,32 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
       if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) {
         block.blockID = ofs;
         block.size = bl.length();
-        block.version = version;
-        block.dirty = false; //Reading from the backend, data is clean
+
         auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
         if (ret == 0) {
           ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y);
           if (ret == 0) {
            std::string objEtag = "";
-           filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, dirty, creationTime,  std::get<rgw_user>(source->get_bucket()->get_owner()), *y);
-           filter->get_policy_driver()->get_cache_policy()->updateObj(dpp, prefix, version, dirty, source->get_size(), creationTime, std::get<rgw_user>(source->get_bucket()->get_owner()), objEtag, *y);
+           filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, dirty, *y);
 
            /* Store block in directory */
-            if (!blockDir->exist_key(&block, *y)) {
-              if (blockDir->set(&block, *y) < 0) //should we revert previous steps if this step fails?
+            if (!blockDir->exist_key(dpp, &block, *y)) {
+              if (blockDir->set(dpp, &block, *y) < 0) //should we revert previous steps if this step fails?
                ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
             } else {
               existing_block.blockID = block.blockID;
               existing_block.size = block.size;
              existing_block.dirty = block.dirty;
-              if (blockDir->get(&existing_block, *y) < 0) {
+              if (blockDir->get(dpp, &existing_block, *y) < 0) {
                 ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
               } else {
                 if (existing_block.version != block.version) {
-                  if (blockDir->del(&existing_block, *y) < 0) //delete existing block
+                  if (blockDir->del(dpp, &existing_block, *y) < 0) //delete existing block
                     ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory del method failed." << dendl;
-                  if (blockDir->set(&block, *y) < 0) //new versioned block will have new version, hostsList etc, how about globalWeight?
+                  if (blockDir->set(dpp, &block, *y) < 0) //new versioned block will have new version, hostsList etc, how about globalWeight?
                     ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
                 } else {
-                if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, *y) < 0)
+                if (blockDir->update_field(dpp, &block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, *y) < 0)
                   ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed for hostsList." << dendl;
                 }
               }
@@ -839,34 +912,32 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
       std::string oid = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
       block.blockID = ofs;
       block.size = bl.length();
-      block.version = version;
       ofs += bl_len;
-      block.dirty = dirty;
       if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) {
         auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
         if (ret == 0) {
           ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y);
           if (ret == 0) {
-            filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, dirty, creationTime, std::get<rgw_user>(source->get_bucket()->get_owner()), *y);
+            filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, dirty, *y);
 
             /* Store block in directory */
-            if (!blockDir->exist_key(&block, *y)) {
-              if (blockDir->set(&block, *y) < 0)
+            if (!blockDir->exist_key(dpp, &block, *y)) {
+              if (blockDir->set(dpp, &block, *y) < 0)
                ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
             } else {
               existing_block.blockID = block.blockID;
               existing_block.size = block.size;
              existing_block.dirty = block.dirty;
-              if (blockDir->get(&existing_block, *y) < 0) {
+              if (blockDir->get(dpp, &existing_block, *y) < 0) {
                 ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
               }
               if (existing_block.version != block.version) {
-                if (blockDir->del(&existing_block, *y) < 0)
+                if (blockDir->del(dpp, &existing_block, *y) < 0)
                     ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory del method failed." << dendl;
-                  if (blockDir->set(&block, *y) < 0)
+                  if (blockDir->set(dpp, &block, *y) < 0)
                     ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
               } else {
-                if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, *y) < 0)
+                if (blockDir->update_field(dpp, &block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, *y) < 0)
                   ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed for blockHosts." << dendl;
               }
             }
@@ -888,33 +959,31 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
           if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) {
           block.blockID = ofs;
           block.size = bl_rem.length();
-          block.version = version;
           ofs += bl_rem.length();
-               block.dirty = dirty;
           auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
           if (ret == 0) {
             ret = filter->get_cache_driver()->put(dpp, oid, bl_rem, bl_rem.length(), attrs, *y);
             if (ret == 0) {
-              filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), version, dirty, creationTime, std::get<rgw_user>(source->get_bucket()->get_owner()), *y);
+              filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), version, dirty, *y);
 
               /* Store block in directory */
-              if (!blockDir->exist_key(&block, *y)) {
-                if (blockDir->set(&block, *y) < 0)
+              if (!blockDir->exist_key(dpp, &block, *y)) {
+                if (blockDir->set(dpp, &block, *y) < 0)
                   ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
              } else {
                existing_block.blockID = block.blockID;
                existing_block.size = block.size;
                existing_block.dirty = block.dirty;
-               if (blockDir->get(&existing_block, *y) < 0) {
+               if (blockDir->get(dpp, &existing_block, *y) < 0) {
                  ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
                } else {
                  if (existing_block.version != block.version) {
-                   if (blockDir->del(&existing_block, *y) < 0)
+                   if (blockDir->del(dpp, &existing_block, *y) < 0)
                      ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory del method failed." << dendl;
-                   if (blockDir->set(&block, *y) < 0)
+                   if (blockDir->set(dpp, &block, *y) < 0)
                      ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
                  } else {
-                   if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, *y) < 0)
+                   if (blockDir->update_field(dpp, &block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, *y) < 0)
                      ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl;
                  }
                }
@@ -945,6 +1014,17 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
 int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp,
                                                    optional_yield y, uint32_t flags)
 {
+  /*
+  1. Check if object exists in Object Directory
+  2. get dirty flag and version to construct the oid in cache correctly 
+  3. loop and delete all blocks in the cache
+  4. delete the head block
+  5. Update the in memory data structure for all blocks, and update the block directory also
+  6. Update the in memory data structure for head block, and update the block directory also
+  7. If the blocks reside in other caches, send remote request for the same
+  8. Need to figure out a way to get all versions to be deleted in case of versioned objects when a version is not specified.
+  9. If the object is not in object directory call next->delete_obj
+  */
   rgw::d4n::CacheObj obj = rgw::d4n::CacheObj{ // TODO: Add logic to ObjectDirectory del method to also delete all blocks belonging to that object
                             .objName = source->get_key().get_oid(),
                             .bucketName = source->get_bucket()->get_name()
@@ -972,15 +1052,27 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp
 int D4NFilterWriter::prepare(optional_yield y) 
 {
   startTime = time(NULL);
-  if (driver->get_cache_driver()->delete_data(save_dpp, obj->get_key().get_oid(), y) < 0) 
-    ldpp_dout(save_dpp, 10) << "D4NFilterWriter::" << __func__ << "(): CacheDriver delete_data method failed." << dendl;
+  if (driver->get_cache_driver()->delete_data(dpp, obj->get_key().get_oid(), y) < 0) 
+    ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): CacheDriver delete_data method failed." << dendl;
   d4n_writecache = g_conf()->d4n_writecache_enabled;
   if (!d4n_writecache){
-    ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): calling next iterate" << dendl;
+    ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): calling next iterate" << dendl;
     return next->prepare(y);
   }
-  else
-    return 0;
+  if (!object->have_instance()) {
+    if (object->get_bucket()->versioned() && !object->get_bucket()->versioning_enabled()) { //if versioning is suspended
+      this->version = "null";
+    } else { // this holds true for non-versioned object and for version enabled object with no versionId given as input
+      constexpr uint32_t OBJ_INSTANCE_LEN = 32;
+      char buf[OBJ_INSTANCE_LEN + 1];
+      gen_rand_alphanumeric_no_underscore(dpp->get_cct(), buf, OBJ_INSTANCE_LEN);
+      this->version = buf; // using gen_rand_alphanumeric_no_underscore for the time being
+      ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): generating version: " << version << dendl;
+    }
+  } else {
+    ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): " << "version is: " << object->get_instance() << dendl;
+  }
+  return 0;
 }
 
 int D4NFilterWriter::process(bufferlist&& data, uint64_t offset)
@@ -990,17 +1082,15 @@ int D4NFilterWriter::process(bufferlist&& data, uint64_t offset)
     off_t ofs = offset;
     bool dirty = true;
     rgw::d4n::CacheBlock block, existing_block;
-    auto creationTime = startTime;
-
-    auto version = obj->get_instance();
 
+    std::string version;
     std::string prefix;
-    if (version.empty()) { //for versioned objects, get_oid() returns an oid with versionId added
-      prefix =obj->get_bucket()->get_name() + "_" + obj->get_key().get_oid();
+    if (object->have_instance()) {
+      version = obj->get_instance();
     } else {
-      prefix = obj->get_bucket()->get_name() + "_" + version + "_" + obj->get_key().get_oid();
+      version = this->version;
     }
-
+    prefix = obj->get_bucket()->get_name() + "_" + version + "_" + obj->get_name();
     rgw::d4n::BlockDirectory* blockDir = driver->get_block_dir();
 
     block.hostsList.push_back(blockDir->cct->_conf->rgw_d4n_l1_datacache_address); 
@@ -1013,40 +1103,9 @@ int D4NFilterWriter::process(bufferlist&& data, uint64_t offset)
 
     int ret = 0;
 
-    if (!d4n_writecache){
-      std::string oid = prefix + "_" + std::to_string(ofs)+ "_" + std::to_string(bl_len);
-      block.size = bl.length();
-      block.blockID = ofs;
-      block.dirty = false; //writing to the backend, hence the data is clean
-      block.hostsList.push_back(blockDir->cct->_conf->rgw_d4n_backend_address);
-       
-      ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): calling next process" << dendl;
+    if (!d4n_writecache) {
+      ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): calling next process" << dendl;
       ret = next->process(std::move(data), offset);
-      if (ret == 0){
-        if (!blockDir->exist_key(&block, y)) {
-          if (blockDir->set(&block, y) < 0) 
-            ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory set method failed." << dendl;
-          } else {
-            existing_block.blockID = block.blockID;
-            existing_block.size = block.size;
-            if (blockDir->get(&existing_block, y) < 0) {
-              ldpp_dout(save_dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
-            } else {
-              if (existing_block.version != block.version) {
-                if (blockDir->del(&existing_block, y) < 0) //delete existing block
-                  ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory del method failed." << dendl;
-                if (blockDir->set(&block, y) < 0) //new versioned block will have new version, hostsList etc, how about globalWeight?
-                  ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory set method failed." << dendl;
-              } else {
-            if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, y) < 0)
-              ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory update_field method failed for hostsList." << dendl;
-            }
-          }
-        }
-      } else{
-          ldpp_dout(save_dpp, 1) << "D4NFilterObject::D4NFilterWriteOp::process" << __func__ << "(): ERROR: writing data to the backend failed!" << dendl;
-         return ret;
-      }
     } else {
       std::string oid = prefix + "_" + std::to_string(ofs);
       std::string key = "D_" + oid + "_" + std::to_string(bl_len);
@@ -1055,36 +1114,38 @@ int D4NFilterWriter::process(bufferlist&& data, uint64_t offset)
       block.blockID = ofs;
       block.dirty = true;
       block.hostsList.push_back(blockDir->cct->_conf->rgw_d4n_l1_datacache_address);
+      block.version = version;
       dirty = true;
-      ret = driver->get_policy_driver()->get_cache_policy()->eviction(save_dpp, block.size, y);
-      if (ret == 0) {
-        //Should we replace each put_async with put, to ensure data is actually written to the cache before updating the data structures and before the lock is released?
+      ret = driver->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, y);
+      if (ret == 0) {     
        if (bl.length() > 0) {          
-          ret = driver->get_cache_driver()->put(save_dpp, key, bl, bl.length(), obj->get_attrs(), y);
+          ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): key is: " << key << dendl;
+          ret = driver->get_cache_driver()->put(dpp, key, bl, bl.length(), obj->get_attrs(), y);
           if (ret == 0) {
-           driver->get_policy_driver()->get_cache_policy()->update(save_dpp, oid_in_cache, ofs, bl.length(), version, dirty, creationTime,  std::get<rgw_user>(obj->get_bucket()->get_owner()), y);
-            if (!blockDir->exist_key(&block, y)) {
-              if (blockDir->set(&block, y) < 0) //should we revert previous steps if this step fails?
-               ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+            ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): oid_in_cache is: " << oid_in_cache << dendl;
+           driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, bl.length(), version, dirty, y);
+            if (!blockDir->exist_key(dpp, &block, y)) {
+              if (blockDir->set(dpp, &block, y) < 0) //should we revert previous steps if this step fails?
+               ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory set method failed." << dendl;
             } else {
               existing_block.blockID = block.blockID;
               existing_block.size = block.size;
-              if (blockDir->get(&existing_block, y) < 0) {
-                ldpp_dout(save_dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
+              if (blockDir->get(dpp, &existing_block, y) < 0) {
+                ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
               } else {
                 if (existing_block.version != block.version) {
-                  if (blockDir->del(&existing_block, y) < 0) //delete existing block
-                    ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory del method failed." << dendl;
-                  if (blockDir->set(&block, y) < 0) //new versioned block will have new version, hostsList etc, how about globalWeight?
-                    ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+                  if (blockDir->del(dpp, &existing_block, y) < 0) //delete existing block
+                    ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory del method failed." << dendl;
+                  if (blockDir->set(dpp, &block, y) < 0) //new versioned block will have new version, hostsList etc, how about globalWeight?
+                    ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory set method failed." << dendl;
                 } else {
-                  if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, y) < 0)
-                    ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory update_field method failed for hostsList." << dendl;
+                  if (blockDir->update_field(dpp, &block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, y) < 0)
+                    ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory update_field method failed for hostsList." << dendl;
                 }
               }
            }
           } else {
-            ldpp_dout(save_dpp, 1) << "D4NFilterObject::D4NFilterWriteOp::process" << __func__ << "(): ERROR: writting data to the cache failed!" << dendl;
+            ldpp_dout(dpp, 1) << "D4NFilterObject::D4NFilterWriteOp::process" << __func__ << "(): ERROR: writting data to the cache failed!" << dendl;
            return ret;
          }
        }
@@ -1104,99 +1165,88 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag,
                        const req_context& rctx,
                        uint32_t flags)
 {
-  bool dirty = true;
+  bool dirty = false;
   std::vector<std::string> hostsList = {};
   auto creationTime = startTime;
   std::string objEtag = etag;
-
-  auto version = obj->get_instance();
-
-  std::string prefix;
-  if (version.empty()) { //for versioned objects, get_oid() returns an oid with versionId added
-    prefix = obj->get_bucket()->get_name() + "_" + obj->get_key().get_oid();
-  } else {
-    prefix = obj->get_bucket()->get_name() + "_" + version + "_" + obj->get_key().get_oid();
-  }
-
-
-  if (d4n_writecache){
-    driver->get_policy_driver()->get_cache_policy()->updateObj(save_dpp, prefix, version, dirty, accounted_size, creationTime, std::get<rgw_user>(obj->get_bucket()->get_owner()), objEtag, y);
-    hostsList = { driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address };
+  bool write_to_backend_store = false;
+  int ret;
   
-    rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
-                                .objName = obj->get_key().get_oid(), 
-                                .bucketName = obj->get_bucket()->get_name(),
-                                .creationTime = std::to_string(creationTime), 
-                                .dirty = dirty,
-                                .hostsList = hostsList
-                               };
-
-    if (driver->get_obj_dir()->set(&object, y) < 0) 
-      ldpp_dout(save_dpp, 10) << "D4NFilterWriter::" << __func__ << "(): ObjectDirectory set method failed." << dendl;
-    return 0;
-  }
-  /* Retrieve complete set of attrs */
-  int ret = next->complete(accounted_size, etag, mtime, set_mtime, attrs, cksum,
-                       delete_at, if_match, if_nomatch, user_data, zones_trace,
-                       canceled, rctx, flags);
-  obj->get_obj_attrs(rctx.y, save_dpp, NULL);
-
-  /* Append additional metadata to attributes */ 
-  rgw::sal::Attrs baseAttrs = obj->get_attrs();
-  rgw::sal::Attrs attrs_temp = baseAttrs;
-  buffer::list bl;
-  obj->load_obj_state(save_dpp, rctx.y);
-
-  bl.append(std::to_string(creationTime));
-  baseAttrs.insert({"mtime", bl});
-  bl.clear();
-
-  bl.append(std::to_string(obj->get_size()));
-  baseAttrs.insert({"object_size", bl});
-  bl.clear();
-
-  bl.append(std::to_string(accounted_size));
-  baseAttrs.insert({"accounted_size", bl});
-  bl.clear();
-  bl.append(std::to_string(obj->get_epoch()));
-  baseAttrs.insert({"epoch", bl});
-  bl.clear();
-
-  if (obj->have_instance()) {
-    bl.append(obj->get_instance());
-    baseAttrs.insert({"version_id", bl});
-    bl.clear();
-  } else {
-    bl.append(""); /* Empty value */
-    baseAttrs.insert({"version_id", bl});
-    bl.clear();
-  }
+  if (d4n_writecache) {
+    dirty = true;
+    std::string version;
+    if (object->have_instance()) {
+      version = obj->get_instance();
+    } else {
+      version = this->version; //version for non-versioned objects, using gen_rand_alphanumeric_no_underscore for the time being
+      if (obj->get_bucket()->versioned()) {
+        object->set_instance(version);
+      }
+    } 
+    std::string key = obj->get_bucket()->get_name() + "_" + version + "_" + obj->get_name();
 
-  auto iter = attrs_temp.find(RGW_ATTR_SOURCE_ZONE);
-  if (iter != attrs_temp.end()) {
-    bl.append(std::to_string(obj->get_short_zone_id()));
-    baseAttrs.insert({"source_zone_short_id", bl});
-    bl.clear();
-  } else {
-    bl.append("0"); /* Initialized to zero */
-    baseAttrs.insert({"source_zone_short_id", bl});
-    bl.clear();
+    ceph::real_time m_time;
+    if (mtime) {
+      m_time = *mtime;
+    } else {
+      m_time = real_clock::now();
+    }
+    object->set_mtime(m_time);
+    object->set_accounted_size(accounted_size);
+    ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << " size is: " << object->get_size() << dendl;
+    object->set_obj_state_attrs(dpp, y, attrs);
+    bufferlist bl;
+    std::string head_oid_in_cache = "D_" + key; //same as key, as there is no len or offset attached to head oid in cache
+    ret = driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl, 0, attrs, y);
+    attrs.erase("user.rgw.mtime");
+    attrs.erase("user.rgw.object_size");
+    attrs.erase("user.rgw.accounted_size");
+    attrs.erase("user.rgw.epoch");
+    object->set_object_version(version);
+    if (ret == 0) {
+      object->set_attrs(attrs);
+      ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << " version stored in update method is: " << version << dendl;
+      driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, bl.length(), version, dirty, y);
+      ret = object->set_head_obj_dir_entry(dpp, y, true);
+      if (ret < 0) {
+        ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
+        return ret;
+      }
+      driver->get_policy_driver()->get_cache_policy()->updateObj(dpp, key, version, dirty, accounted_size, creationTime, std::get<rgw_user>(obj->get_bucket()->get_owner()), objEtag, obj->get_bucket()->get_name(), obj->get_key(), y);
+
+      //write object to directory.
+      hostsList = { driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address };
+      rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
+          .objName = obj->get_name(), 
+          .bucketName = obj->get_bucket()->get_name(),
+          .creationTime = std::to_string(creationTime), 
+          .dirty = dirty,
+          .hostsList = hostsList
+      };
+      ret = driver->get_obj_dir()->set(&object, y);
+      if (ret < 0) {
+        ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): ObjectDirectory set method failed with err: " << ret << dendl;
+        return ret;
+      }
+    } else { //if get_cache_driver()->put()
+      write_to_backend_store = true;
+      ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << " put failed for head_oid_in_cache wih error: " << ret << dendl;
+      ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << " calling complete of backend store: " << dendl;
+    }
+  } else { // if d4n_writecache = true
+    write_to_backend_store = true;
   }
 
-  baseAttrs.insert(attrs.begin(), attrs.end());
-  
-  //bufferlist bl_empty;
-  //int putReturn = driver->get_cache_driver()->
-  //     put(save_dpp, obj->get_key().get_oid(), bl_empty, accounted_size, baseAttrs, y); /* Data already written during process call */
-  /*
-  if (putReturn < 0) {
-    ldpp_dout(save_dpp, 20) << "D4N Filter: Cache put operation failed." << dendl;
-  } else {
-    ldpp_dout(save_dpp, 20) << "D4N Filter: Cache put operation succeeded." << dendl;
+  if (write_to_backend_store) {
+    ret = next->complete(accounted_size, etag, mtime, set_mtime, attrs, cksum,
+                            delete_at, if_match, if_nomatch, user_data, zones_trace,
+                            canceled, rctx, flags);
+    if (ret < 0) {
+      ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): Writing to backend store failed with err: " << ret << dendl;
+    }
   }
-  */
-  return ret;
+
+  return 0;
 }
 
 } } // namespace rgw::sal
index 27f4aa582fd17dcb5ae8b6388f6d72603b148c49..6788e1122d8a3645e28632c6bf9cc0536db83009 100644 (file)
@@ -105,11 +105,6 @@ class D4NFilterObject : public FilterObject {
     D4NFilterDriver* driver;
     std::string version;
     std::string prefix;
-
-    bool get_obj_attrs_from_cache(const DoutPrefixProvider* dpp, optional_yield y);
-    void set_obj_state_attrs(const DoutPrefixProvider* dpp, optional_yield y, rgw::sal::Attrs& attrs);
-    int calculate_version(const DoutPrefixProvider* dpp, optional_yield y, std::string& version);
-    int set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optional_yield y);
   public:
     struct D4NFilterReadOp : FilterReadOp {
       public:
@@ -152,6 +147,8 @@ class D4NFilterObject : public FilterObject {
        virtual int prepare(optional_yield y, const DoutPrefixProvider* dpp) override;
        virtual int iterate(const DoutPrefixProvider* dpp, int64_t ofs, int64_t end,
          RGWGetDataCB* cb, optional_yield y) override;
+  virtual int get_attr(const DoutPrefixProvider* dpp, const char* name,
+                        bufferlist& dest, optional_yield y) override;
 
       private:
        RGWGetDataCB* client_cb;
@@ -204,26 +201,33 @@ class D4NFilterObject : public FilterObject {
 
     void set_prefix(const std::string& prefix) { this->prefix = prefix; }
     const std::string get_prefix() { return this->prefix; }
+    bool get_obj_attrs_from_cache(const DoutPrefixProvider* dpp, optional_yield y);
+    void set_obj_state_attrs(const DoutPrefixProvider* dpp, optional_yield y, rgw::sal::Attrs& attrs);
+    int calculate_version(const DoutPrefixProvider* dpp, optional_yield y, std::string& version);
+    int set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optional_yield y, bool dirty = false);
+    bool check_head_exists_in_cache_get_oid(const DoutPrefixProvider* dpp, std::string& head_oid_in_cache, rgw::sal::Attrs& attrs, optional_yield y);
 };
 
 class D4NFilterWriter : public FilterWriter {
   private:
     D4NFilterDriver* driver; 
-    const DoutPrefixProvider* save_dpp;
+    D4NFilterObject* object;
+    const DoutPrefixProvider* dpp;
     bool atomic;
     optional_yield y;
     bool d4n_writecache;
     time_t startTime;
+    std::string version;
 
   public:
     D4NFilterWriter(std::unique_ptr<Writer> _next, D4NFilterDriver* _driver, Object* _obj, 
        const DoutPrefixProvider* _dpp, optional_yield _y) : FilterWriter(std::move(_next), _obj),
                                                             driver(_driver),
-                                                            save_dpp(_dpp), atomic(false), y(_y) {}
+                                                            dpp(_dpp), atomic(false), y(_y) { object = static_cast<D4NFilterObject*>(obj); }
     D4NFilterWriter(std::unique_ptr<Writer> _next, D4NFilterDriver* _driver, Object* _obj, 
        const DoutPrefixProvider* _dpp, bool _atomic, optional_yield _y) : FilterWriter(std::move(_next), _obj),
                                                                           driver(_driver),
-                                                                          save_dpp(_dpp), atomic(_atomic), y(_y) {}
+                                                                          dpp(_dpp), atomic(_atomic), y(_y) { object = static_cast<D4NFilterObject*>(obj); }
     virtual ~D4NFilterWriter() = default;
 
     virtual int prepare(optional_yield y);
@@ -239,7 +243,7 @@ class D4NFilterWriter : public FilterWriter {
                         const req_context& rctx,
                         uint32_t flags) override;
    bool is_atomic() { return atomic; };
-   const DoutPrefixProvider* dpp() { return save_dpp; } 
+   const DoutPrefixProvider* get_dpp() { return this->dpp; } 
 };
 
 } } // namespace rgw::sal
index dea349af514edcb71305e3f99e91432fbef033e6..12bfd7913b79bf3cf20f57971b3ad38cc3663295 100644 (file)
@@ -119,8 +119,8 @@ class LFUDAPolicyFixture : public ::testing::Test {
          } else {
            if (!block->hostsList.empty()) { 
              block->globalWeight += age;
-             
-             if (dir->update_field(block, "globalWeight", std::to_string(block->globalWeight), y) < 0) {
+             auto globalWeight = std::to_string(block->globalWeight);
+             if (dir->update_field(block, "globalWeight", globalWeight, y) < 0) {
                return -1;
              } else {
                return 0;