]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/d4n: squashing following commits related
authorSamarah <samarah.uriarte@ibm.com>
Tue, 15 Oct 2024 18:37:39 +0000 (18:37 +0000)
committerPritha Srivastava <prsrivas@redhat.com>
Mon, 21 Apr 2025 04:04:07 +0000 (09:34 +0530)
to lazy deletion - delete dirty objects data
blocks in cleaning method and non-dirty objects
data blocks in eviction method

1. rgw/d4n: Implement lazy deletion
2. rgw/d4n: cleaning method now supports delete markers
also (both versioned and null). The delete markers
are written correctly to the backend store.
Also modifying the description of rgw_d4n_cache_cleaning_interval
to explicitly state that the duration is in seconds.
3. rgw/d4n: do not call invalidate_dirty_object in case
of a simple delete request for dirty objects belonging
to a versioned bucket.
In this case, a delete marker needs to be created instead of
invalidating/deleting an object.
4. rgw/d4n: Update lazy deletion in policy

Co-authored-by: Pritha Srivastava <prsrivas@redhat.com>
Added code for creation of delete marker for a simple
delete request.

Signed-off-by: Samarah <samarah.uriarte@ibm.com>
Signed-off-by: Pritha Srivastava <prsrivas@redhat.com>
Commit 2c1adbbd2b363cd5e02fd7da0cb496ca6a93aa77

d4n/policy.cc: Use ceph::split

Signed-off-by: Samarah <samarah.uriarte@ibm.com>
src/common/options/rgw.yaml.in
src/rgw/driver/d4n/d4n_directory.cc
src/rgw/driver/d4n/d4n_policy.cc
src/rgw/driver/d4n/d4n_policy.h
src/rgw/driver/d4n/rgw_sal_d4n.cc
src/rgw/driver/d4n/rgw_sal_d4n.h
src/rgw/rgw_cache_driver.h
src/rgw/rgw_ssd_driver.cc

index 0d6d1894f363e06f936afed44bf49804a41508ac..9ee344c883419b1897a8af597dd6cf81a7f24d15 100644 (file)
@@ -4136,7 +4136,7 @@ options:
 - name: rgw_d4n_cache_cleaning_interval
   type: int
   level: advanced
-  desc: This is the interval for invoking write cache cleaning process
+  desc: This is the interval in seconds for invoking write cache cleaning process
   default: 1000
   services: 
   - rgw
index e7d481a29aa944a4be63f935b679c07a4ebe8d0a..04a760d0f2adf08d1d01d2b00322235bb8c2efd2 100644 (file)
@@ -351,7 +351,7 @@ int ObjectDirectory::zadd(const DoutPrefixProvider* dpp, CacheObj* object, doubl
     if (!multi) {
       if (std::get<0>(resp).value() != "1") {
         ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl;
-        return -EINVAL;
+        return -ENOENT;
       }
     }
 
@@ -382,7 +382,7 @@ int ObjectDirectory::zrange(const DoutPrefixProvider* dpp, CacheObj* object, int
 
     if (std::get<0>(resp).value().empty()) {
       ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Empty response" << dendl;
-      return -EINVAL;
+      return -ENOENT;
     }
 
     members = std::get<0>(resp).value();
@@ -440,7 +440,7 @@ int ObjectDirectory::zrem(const DoutPrefixProvider* dpp, CacheObj* object, const
     if (!multi) {
       if (std::get<0>(resp).value() != "1") {
         ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
-        return -EINVAL;
+        return -ENOENT;
       }
     }
 
@@ -471,7 +471,7 @@ int ObjectDirectory::zremrangebyscore(const DoutPrefixProvider* dpp, CacheObj* o
     if (!multi) {
       if (std::get<0>(resp).value() == "0") {
         ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() No element removed!" << dendl;
-        return -EINVAL;
+        return -ENOENT;
       }
     }
 
@@ -729,7 +729,7 @@ int BlockDirectory::del(const DoutPrefixProvider* dpp, CacheBlock* block, option
     } else { //if delete is called as part of a transaction, the command will be queued, hence the response will be a string
       response<std::string> resp;
       redis_exec(conn, ec, req, resp, y);
-      }
+    }
     if (ec) {
       ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
       std::cout << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << std::endl;
index 7a39a0ff5a8bcba6e5264ffcdef6db7013711442..a638309fb010c0de2f87b41f173b8a951a171dd1 100644 (file)
@@ -2,7 +2,7 @@
 
 #include "../../../common/async/yield_context.h"
 #include "common/async/blocked_completion.h"
-#include "common/dout.h"
+#include "common/split.h"
 #include "rgw_perf_counters.h"
 
 namespace rgw { namespace d4n {
@@ -54,8 +54,8 @@ int LFUDAPolicy::init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_
   static auto obj_callback = [this](
           const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size,
                            time_t creationTime, const rgw_user user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
-                           const rgw_obj_key& obj_key, optional_yield y) {
-    update_dirty_object(dpp, key, version, dirty, size, creationTime, user, etag, bucket_name, bucket_id, obj_key, y);
+                           const rgw_obj_key& obj_key, optional_yield y, std::string& restore_val) {
+    update_dirty_object(dpp, key, version, dirty, size, creationTime, user, etag, bucket_name, bucket_id, obj_key, y, restore_val);
   };
 
   static auto block_callback = [this](
@@ -243,6 +243,40 @@ asio::awaitable<void> LFUDAPolicy::redis_sync(const DoutPrefixProvider* dpp, opt
   }
 }
 
+/* Changes state to INVALID for dirty objects. An INVALID state indicates that a delete request has been
+ issued on an object and it must be deleted rather than written to the backend. This lazy deletion occurs
+ in the Cleaning method and prevents data races during concurrent requests. The method below returns "false"
+ if the state has not been set to INVALID, and "true" if it has. The state is not set to INVALID when
+ cleaning is in progress, a process which writes the object to the backend store. */
+bool LFUDAPolicy::invalidate_dirty_object(const DoutPrefixProvider* dpp, const std::string& key) {
+  std::unique_lock<std::mutex> l(lfuda_cleaning_lock);
+
+  if (o_entries_map.empty())
+    return false;
+
+  auto p = o_entries_map.find(key);
+  if (p == o_entries_map.end()) {
+    ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): key=" << key << " not found" << dendl;
+    return false;
+  }
+
+  if (p->second.second == State::INIT) {
+    ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Setting State::INVALID for key=" << key << dendl;
+    p->second.second = State::INVALID;
+    int ret = cacheDriver->set_attr(dpp, DIRTY_BLOCK_PREFIX + key, RGW_CACHE_ATTR_INVALID, "1", y);
+    if (ret < 0) {
+      ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): Failed to set xattr, ret=" << ret << dendl;
+      return false;
+    }
+
+    return true;
+  } else if (p->second.second == State::IN_PROGRESS) {
+    state_cond.wait(l, [this, &key]{ return (o_entries_map.find(key) == o_entries_map.end()); });
+  }
+
+  return false;
+}
+
 CacheBlock* LFUDAPolicy::get_victim_block(const DoutPrefixProvider* dpp, optional_yield y) {
   const std::lock_guard l(lfuda_lock);
   if (entries_heap.empty())
@@ -252,13 +286,20 @@ CacheBlock* LFUDAPolicy::get_victim_block(const DoutPrefixProvider* dpp, optiona
   std::string key = entries_heap.top()->key;
   CacheBlock* victim = new CacheBlock();
 
-  victim->cacheObj.bucketName = key.substr(0, key.find('_')); 
-  key.erase(0, key.find('_') + 1);
-  victim->version = key.substr(0, key.find('_'));
-  key.erase(0, key.find('_') + 1);
-  victim->cacheObj.objName = key.substr(0, key.find('_'));
-  victim->blockID = entries_heap.top()->offset;
-  victim->size = entries_heap.top()->len;
+  auto parts = split(key, "#");
+  std::vector<std::string> block_info;
+  block_info.assign(parts.begin(), parts.end());
+  
+  if (block_info.size() != 5) {
+    ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): Key of the top entry in the min heap has not been constructed correctly." << dendl;
+    return nullptr;
+  }
+
+  victim->cacheObj.bucketName = block_info[0]; 
+  victim->version = block_info[1]; 
+  victim->cacheObj.objName = block_info[2]; 
+  victim->blockID = std::stoull(block_info[3]); 
+  victim->size = std::stoull(block_info[4]); 
 
   if (blockDir->get(dpp, victim, y) < 0) {
     return nullptr;
@@ -280,6 +321,7 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional
   if (entries_heap.empty())
     return 0;
 
+  int ret = -1;
   uint64_t freeSpace = cacheDriver->get_free_space(dpp);
 
   while (freeSpace < size) { // TODO: Think about parallel reads and writes; can this turn into an infinite loop? 
@@ -288,7 +330,7 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional
     if (victim == nullptr) {
       ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): Could not retrieve victim block." << dendl;
       delete victim;
-      return 0; // not necessarily an error? -Sam
+      return -ENOSPC;
     }
 
     const std::lock_guard l(lfuda_lock);
@@ -301,7 +343,7 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional
     // check dirty flag of entry to be evicted, if the flag is dirty, all entries on the local node are dirty
     if (it->second->dirty) {
       ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): Top entry in min heap is dirty, no entry is available for eviction!" << dendl;
-      return 0;
+      return -ENOSPC;
     }
     int avgWeight = weightSum / entries_map.size();
 
@@ -311,7 +353,7 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional
         (*it->second->handle)->localWeight = it->second->localWeight;
        entries_heap.decrease(it->second->handle); // larger value means node must be decreased to maintain min heap 
 
-       if (int ret = cacheDriver->set_attr(dpp, key, RGW_CACHE_ATTR_LOCAL_WEIGHT, std::to_string(it->second->localWeight), y) < 0) { 
+       if ((ret = cacheDriver->set_attr(dpp, key, RGW_CACHE_ATTR_LOCAL_WEIGHT, std::to_string(it->second->localWeight), y)) < 0) { 
          delete victim;
          return ret;
         }
@@ -326,20 +368,21 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional
     }
 
     victim->globalWeight += it->second->localWeight;
-    if (int ret = blockDir->update_field(dpp, victim, "globalWeight", std::to_string(victim->globalWeight), y) < 0) {
+    if ((ret = blockDir->update_field(dpp, victim, "globalWeight", std::to_string(victim->globalWeight), y)) < 0) {
       delete victim;
       return ret;
     }
 
-    if (int ret = blockDir->remove_host(dpp, victim, dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, y) < 0) {
+    if ((ret = blockDir->remove_host(dpp, victim, dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, y)) < 0) {
       delete victim;
       return ret;
     }
 
     delete victim;
 
-    if (int ret = cacheDriver->del(dpp, key, y) < 0) 
+    if ((ret = cacheDriver->delete_data(dpp, key, y)) < 0) {
       return ret;
+    }
 
     ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Block " << key << " has been evicted." << dendl;
 
@@ -404,15 +447,24 @@ void LFUDAPolicy::update(const DoutPrefixProvider* dpp, const std::string& key,
   weightSum += ((localWeight < 0) ? 0 : localWeight);
 }
 
-void LFUDAPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, optional_yield y)
+void LFUDAPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, optional_yield y, std::string& restore_val)
 {
   using handle_type = boost::heap::fibonacci_heap<LFUDAObjEntry*, boost::heap::compare<ObjectComparator<LFUDAObjEntry>>>::handle_type;
+  State state{State::INIT};
   ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Before acquiring lock, adding entry: " << key << dendl;
+
+  if (!restore_val.empty() && restore_val == "1") { // No need to set the xattr because this case only occurs when the state has
+    state = State::INVALID;                         // been retrieved from the xattr itself.
+    ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): State restored to INVALID." << dendl;
+  } else {
+    state = State::INIT;
+  }
+
   const std::lock_guard l(lfuda_cleaning_lock);
   LFUDAObjEntry* e = new LFUDAObjEntry{key, version, deleteMarker, size, creationTime, user, etag, bucket_name, bucket_id, obj_key};
   handle_type handle = object_heap.push(e);
   e->set_handle(handle);
-  o_entries_map.emplace(key, e);
+  o_entries_map.emplace(key, std::make_pair(e, state));
   cond.notify_one();
 }
 
@@ -441,14 +493,45 @@ bool LFUDAPolicy::erase_dirty_object(const DoutPrefixProvider* dpp, const std::s
     return false;
   }
 
-  object_heap.erase(p->second->handle);
+  object_heap.erase(p->second.first->handle);
   o_entries_map.erase(p);
-  delete p->second;
-  p->second = nullptr;
+  delete p->second.first;
+  p->second.first = nullptr;
+  state_cond.notify_one();
 
   return true;
 }
 
+int LFUDAPolicy::delete_data_blocks(const DoutPrefixProvider* dpp, LFUDAObjEntry* e, optional_yield y) {
+  off_t lst = e->size, fst = 0, ofs = 0;
+  uint64_t len = 0;
+
+  do {
+    if (fst >= lst) {
+      break;
+    }
+    off_t cur_size = std::min<off_t>(fst + dpp->get_cct()->_conf->rgw_max_chunk_size, lst);
+    off_t cur_len = cur_size - fst;
+    std::string prefix = e->key + CACHE_DELIM + std::to_string(fst) + CACHE_DELIM + std::to_string(cur_len);
+    std::string oid_in_cache = DIRTY_BLOCK_PREFIX + prefix;
+
+    int ret = -1;
+    if ((ret = cacheDriver->delete_data(dpp, oid_in_cache, y)) == 0) { // Sam: do we want del or delete_data here?
+      if (!(ret = erase(dpp, prefix, y))) {
+       ldpp_dout(dpp, 0) << "Failed to delete policy entry for: " << oid_in_cache << ", ret=" << ret << dendl;
+        return -EINVAL;
+      }
+    } else {
+      ldpp_dout(dpp, 0) << "Failed to delete data block " << oid_in_cache << ", ret=" << ret << dendl;
+      return -EINVAL;
+    }
+
+    ofs += len;
+  } while (len > 0);
+
+  return 0;
+}
+
 void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
 {
   const int interval = dpp->get_cct()->_conf->rgw_d4n_cache_cleaning_interval;
@@ -456,6 +539,7 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
     ldpp_dout(dpp, 20) << __func__ << " : " << " Cache cleaning!" << dendl;
     uint64_t len = 0;
     rgw::sal::Attrs obj_attrs;
+    bool invalid = false;
   
     ldpp_dout(dpp, 20) << "LFUDAPolicy::" << __func__ << "" << __LINE__ << "(): Before acquiring cleaning-lock" << dendl;
     std::unique_lock<std::mutex> l(lfuda_cleaning_lock);
@@ -474,234 +558,366 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
     ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->user=" << e->user << dendl;
     ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->obj_key=" << e->obj_key << dendl;
     l.unlock();
-    if (!e->key.empty() && (std::difftime(time(NULL), e->creationTime) > interval)) { //if block is dirty and written more than interval seconds ago
-      rgw_user c_rgw_user = e->user;
-      //writing data to the backend
-      //we need to create an atomic_writer
-      std::unique_ptr<rgw::sal::User> c_user = driver->get_user(c_rgw_user);
-
-      std::unique_ptr<rgw::sal::Bucket> c_bucket;
-      rgw_bucket c_rgw_bucket = rgw_bucket(c_rgw_user.tenant, e->bucket_name, e->bucket_id);
-
-      RGWBucketInfo c_bucketinfo;
-      c_bucketinfo.bucket = c_rgw_bucket;
-      c_bucketinfo.owner = c_rgw_user;
-      int ret = driver->load_bucket(dpp, c_rgw_bucket, &c_bucket, null_yield);
-      if (ret < 0) {
-        ldpp_dout(dpp, 10) << __func__ << "(): load_bucket() returned ret=" << ret << dendl;
-        //Remove bucket should be implemented in d4n which will take care of deleting objects belonging to the bucket, and hence we should not reach here
-        erase_dirty_object(dpp, e->key, null_yield);
-        continue;
-      }
-
-      std::unique_ptr<rgw::sal::Object> c_obj = c_bucket->get_object(e->obj_key);
-      ldpp_dout(dpp, 20) << __func__ << "(): c_obj oid =" << c_obj->get_oid() << dendl;
-
-      ACLOwner owner{c_user->get_id(), c_user->get_display_name()};
-
-      std::unique_ptr<rgw::sal::Writer> processor =  driver->get_atomic_writer(dpp,
-              null_yield,
-              c_obj.get(),
-              owner,
-              NULL,
-              0,
-              "");
 
-      int op_ret = processor->prepare(null_yield);
-      if (op_ret < 0) {
-       ldpp_dout(dpp, 20) << __func__ << "processor->prepare() returned ret=" << op_ret << dendl;
-        erase_dirty_object(dpp, e->key, null_yield);
+    int diff = std::difftime(time(NULL), e->creationTime);
+    if (!e->key.empty() && (diff > interval)) { // if block is dirty and written more than interval seconds ago
+      l.lock();
+      auto p = o_entries_map.find(e->key);
+      if (p == o_entries_map.end()) {
+       l.unlock();
+       continue;
       }
-
-      std::string prefix = url_encode(e->bucket_id) + CACHE_DELIM + url_encode(e->version) + CACHE_DELIM + url_encode(c_obj->get_name());
-      off_t lst = e->size;
-      off_t fst = 0;
-      off_t ofs = 0;
-
-      rgw::sal::DataProcessor* filter = processor.get();
-      std::string head_oid_in_cache = DIRTY_BLOCK_PREFIX + prefix;
-      std::string new_head_oid_in_cache = prefix;
-      ldpp_dout(dpp, 10) << __func__ << "(): head_oid_in_cache=" << head_oid_in_cache << dendl;
-      ldpp_dout(dpp, 10) << __func__ << "(): new_head_oid_in_cache=" << new_head_oid_in_cache << dendl;
-      bufferlist bl;
-      cacheDriver->get_attrs(dpp, head_oid_in_cache, obj_attrs, null_yield); //get obj attrs from head
-      obj_attrs.erase(RGW_CACHE_ATTR_MTIME);
-      obj_attrs.erase(RGW_CACHE_ATTR_OBJECT_SIZE);
-      obj_attrs.erase(RGW_CACHE_ATTR_ACCOUNTED_SIZE);
-      obj_attrs.erase(RGW_CACHE_ATTR_EPOCH);
-
-      do {
-        ceph::bufferlist data;
-        if (fst >= lst){
-         break;
-        }
-        off_t cur_size = std::min<off_t>(fst + dpp->get_cct()->_conf->rgw_max_chunk_size, lst);
-        off_t cur_len = cur_size - fst;
-        std::string oid_in_cache = DIRTY_BLOCK_PREFIX + prefix + CACHE_DELIM + std::to_string(fst) + CACHE_DELIM + std::to_string(cur_len);
-        ldpp_dout(dpp, 10) << __func__ << "(): oid_in_cache=" << oid_in_cache << dendl;
-        rgw::sal::Attrs attrs;
-        cacheDriver->get(dpp, oid_in_cache, 0, cur_len, data, attrs, null_yield);
-        len = data.length();
-        fst += len;
-
-        if (len == 0) {
-          // TODO: if len of any block is 0 for some reason, we must return from here?
-          break;
-        }
-
-        op_ret = filter->process(std::move(data), ofs);
-        if (op_ret < 0) {
-         ldpp_dout(dpp, 20) << __func__ << "processor->process() returned ret="
-         << op_ret << dendl;
-          erase_dirty_object(dpp, e->key, null_yield);
-        }
-
-        ofs += len;
-      } while (len > 0);
-
-      op_ret = filter->process({}, ofs);
-
-      const req_context rctx{dpp, null_yield, nullptr};
-      ceph::real_time mtime = ceph::real_clock::from_time_t(e->creationTime);
-      op_ret = processor->complete(lst, e->etag, &mtime, ceph::real_clock::from_time_t(e->creationTime), obj_attrs,
-                              std::nullopt, ceph::real_time(), nullptr, nullptr,
-                              nullptr, nullptr, nullptr,
-                              rctx, rgw::sal::FLAG_LOG_OP);
-
-      if (op_ret < 0) {
-        ldpp_dout(dpp, 20) << __func__ << "processor->complete() returned ret=" << op_ret << dendl;
-        erase_dirty_object(dpp, e->key, null_yield);
-      }
-      //invoke update() with dirty flag set to false, to update in-memory metadata for each block
-      // reset values
-      lst = e->size;
-      fst = 0;
-      do {
-        if (fst >= lst) {
-         break;
-        }
-        off_t cur_size = std::min<off_t>(fst + dpp->get_cct()->_conf->rgw_max_chunk_size, lst);
-        off_t cur_len = cur_size - fst;
-
-        std::string oid_in_cache = DIRTY_BLOCK_PREFIX + prefix + CACHE_DELIM + std::to_string(fst) + CACHE_DELIM + std::to_string(cur_len);
-        ldpp_dout(dpp, 20) << __func__ << "(): oid_in_cache =" << oid_in_cache << dendl;
-        std::string new_oid_in_cache = prefix + CACHE_DELIM + std::to_string(fst) + CACHE_DELIM + std::to_string(cur_len);
-        //Rename block to remove "D" prefix
-        cacheDriver->rename(dpp, oid_in_cache, new_oid_in_cache, null_yield);
-        //Update in-memory data structure for each block
-        this->update(dpp, new_oid_in_cache, 0, 0, e->version, false, y);
-
-        rgw::d4n::CacheBlock block;
-        block.cacheObj.bucketName = c_obj->get_bucket()->get_bucket_id();
-        block.cacheObj.objName = c_obj->get_key().get_oid();
-        block.size = cur_len;
-        block.blockID = fst;
-        std::string dirty = "false";
-        op_ret = blockDir->update_field(dpp, &block, "dirty", dirty, null_yield);
-        if (op_ret < 0) {
-         ldpp_dout(dpp, 0) << __func__ << "updating dirty flag in block directory failed, ret=" << op_ret << dendl;
-        }
-        fst += cur_len;
-      } while(fst < lst);
-
-      cacheDriver->rename(dpp, head_oid_in_cache, new_head_oid_in_cache, null_yield);
-
-      //invoke update() with dirty flag set to false, to update in-memory metadata for head
-      this->update(dpp, new_head_oid_in_cache, 0, 0, e->version, false, y);
-
-      rgw::d4n::CacheBlock block;
-      block.cacheObj.bucketName = c_obj->get_bucket()->get_bucket_id();
-      block.cacheObj.objName = c_obj->get_name();
-      block.size = 0;
-      block.blockID = 0;
-      if (c_obj->have_instance()) {
-        blockDir->get(dpp, &block, null_yield);
-        if (block.version == c_obj->get_instance()) { //versioned case - update head block entry that has latest version
-          std::string dirty = "false";
-          op_ret = blockDir->update_field(dpp, &block, "dirty", dirty, null_yield);
-          if (op_ret < 0) {
-              ldpp_dout(dpp, 20) << __func__ << "updating dirty flag in block directory for head failed!" << dendl;
-          }
-        }
-      } else { //non-versioned case
-        std::string dirty = "false";
-        op_ret = blockDir->update_field(dpp, &block, "dirty", dirty, null_yield);
-        if (op_ret < 0) {
-            ldpp_dout(dpp, 20) << __func__ << "updating dirty flag in block directory for head failed!" << dendl;
-        }
-      }
-      if (c_obj->have_instance()) {
-        rgw::d4n::CacheBlock instance_block;
-        instance_block.cacheObj.bucketName = c_obj->get_bucket()->get_bucket_id();
-        instance_block.cacheObj.objName = c_obj->get_oid();
-        instance_block.size = 0;
-        instance_block.blockID = 0;
-        op_ret = blockDir->update_field(dpp, &instance_block, "dirty", "false", null_yield);
-        if (op_ret < 0) {
-            ldpp_dout(dpp, 20) << __func__ << "updating dirty flag in block directory for instance block failed!" << dendl;
-        }
+      if (p->second.second == State::INVALID) {
+       invalid = true;
       }
+      l.unlock();
+      
+      // If the state is invalid, the blocks must be deleted from the cache rather than written to the backend.
+      if (invalid) {
+       ldpp_dout(dpp, 10) << __func__ << "(): State is INVALID; deleting object." << dendl;
+       int ret = -1;
+       if ((ret = cacheDriver->delete_data(dpp, DIRTY_BLOCK_PREFIX + e->key, y)) == 0) { // Sam: do we want del or delete_data here?
+         if (!(ret = erase(dpp, e->key, y))) {
+           ldpp_dout(dpp, 0) << "Failed to delete head policy entry for: " << e->key << ", ret=" << ret << dendl; // TODO: what must occur during failure?
+         }
+       } else {
+         ldpp_dout(dpp, 0) << "Failed to delete head object for: " << e->key << ", ret=" << ret << dendl;
+       }
 
-      //the next steps remove the entry from the ordered set and if needed the latest hash entry also in case of versioned buckets
-      if (!c_obj->have_instance()) {
-        ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << std::setprecision(std::numeric_limits<double>::max_digits10) << e->creationTime << " from ordered set" << dendl;
-        rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{
-          .objName = c_obj->get_name(),
-          .bucketName = c_obj->get_bucket()->get_bucket_id(),
-        };
-        ret = objDir->zremrangebyscore(dpp, &dir_obj, e->creationTime, e->creationTime, y, true);
-        if (ret < 0) {
-          ldpp_dout(dpp, 0) << __func__ << "(): Failed to remove object from ordered set with error: " << ret << dendl;
-        }
+       if (!e->delete_marker) {
+         ret = delete_data_blocks(dpp, e, y);
+         if (ret == 0) {
+           erase_dirty_object(dpp, e->key, null_yield);
+         } else {
+           ldpp_dout(dpp, 0) << "Failed to delete blocks for: " << e->key << ", ret=" << ret << dendl;
+         }
+       }
       } else {
-        rgw::d4n::CacheBlock latest_block = block;
-        latest_block.cacheObj.objName = c_obj->get_name();
-        //add watch on latest entry, as it can be modified by a put or a del
-        ret = blockDir->watch(dpp, &latest_block, y);
-        if (ret < 0) {
-          ldpp_dout(dpp, 0) << __func__ << "(): Failed to add a watch on: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl;
-        }
-        int retry = 3;
-        while(retry) {
-          retry--;
-          //get latest entry
-          ret = blockDir->get(dpp, &latest_block, y);
-          if (ret < 0) {
-            ldpp_dout(dpp, 0) << __func__ << "(): Failed to get latest entry in block directory for: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl;
-          }
-          //start redis transaction using MULTI
-          blockDir->multi(dpp, y);
-          if (latest_block.version == e->version) {
-            //remove object entry from ordered set
-            if (c_obj->have_instance()) {
-              blockDir->del(dpp, &latest_block, y, true);
-              if (ret < 0) {
-                ldpp_dout(dpp, 0) << __func__ << "(): Failed to queue del for latest hash entry: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl;
-              }
-            }
-          }
-          ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << std::setprecision(std::numeric_limits<double>::max_digits10) << e->creationTime << " from ordered set" << dendl;
-          rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{
-            .objName = c_obj->get_name(),
-            .bucketName = c_obj->get_bucket()->get_bucket_id(),
-          };
-          ret = objDir->zremrangebyscore(dpp, &dir_obj, e->creationTime, e->creationTime, y, true);
-          if (ret < 0) {
-            ldpp_dout(dpp, 0) << __func__ << "(): Failed to remove object from ordered set with error: " << ret << dendl;
-          }
-          std::vector<std::string> responses;
-          ret = blockDir->exec(dpp, responses, y);
-          if (responses.empty()) {
-            ldpp_dout(dpp, 0) << __func__ << "(): Execute responses are empty hence continuing!" << dendl;
-            continue;
-          }
-          break;
-        }//end-while (retry)
+       l.lock();
+       p->second.second = State::IN_PROGRESS;
+       l.unlock();
+
+       rgw_user c_rgw_user = e->user; 
+       //writing data to the backend
+       //we need to create an atomic_writer
+       std::unique_ptr<rgw::sal::User> c_user = driver->get_user(c_rgw_user);
+
+       std::unique_ptr<rgw::sal::Bucket> c_bucket;
+       rgw_bucket c_rgw_bucket = rgw_bucket(c_rgw_user.tenant, e->bucket_name, e->bucket_id);
+
+       RGWBucketInfo c_bucketinfo;
+       c_bucketinfo.bucket = c_rgw_bucket;
+       c_bucketinfo.owner = c_rgw_user;
+       int ret = driver->load_bucket(dpp, c_rgw_bucket, &c_bucket, null_yield);
+       if (ret < 0) {
+         ldpp_dout(dpp, 10) << __func__ << "(): load_bucket() returned ret=" << ret << dendl;
+         //Remove bucket should be implemented in d4n which will take care of deleting objects belonging to the bucket, and hence we should not reach here
+         erase_dirty_object(dpp, e->key, null_yield);
+         continue;
+       }
+
+       std::unique_ptr<rgw::sal::Object> c_obj = c_bucket->get_object(e->obj_key);
+       bool null_instance = (c_obj->get_instance() == "null");
+       if (null_instance) {
+         //clear the instance for backend store
+         c_obj->clear_instance();
+       }
+       ldpp_dout(dpp, 20) << __func__ << "(): c_obj oid =" << c_obj->get_oid() << dendl;
+
+       ACLOwner owner{c_user->get_id(), c_user->get_display_name()};
+
+       std::string prefix = url_encode(e->bucket_id) + CACHE_DELIM + url_encode(e->version) + CACHE_DELIM + url_encode(c_obj->get_name());
+       std::string head_oid_in_cache = DIRTY_BLOCK_PREFIX + prefix;
+       std::string new_head_oid_in_cache = prefix;
+       ldpp_dout(dpp, 10) << __func__ << "(): head_oid_in_cache=" << head_oid_in_cache << dendl;
+       ldpp_dout(dpp, 10) << __func__ << "(): new_head_oid_in_cache=" << new_head_oid_in_cache << dendl;
+       int op_ret;
+       if (e->delete_marker) {
+         bool null_delete_marker = (c_obj->get_instance() == "null");
+         if (null_delete_marker) {
+           //clear the instance for backend store
+           c_obj->clear_instance();
+         }
+         std::unique_ptr<rgw::sal::Object::DeleteOp> del_op = c_obj->get_delete_op();
+         del_op->params.obj_owner = owner;
+         del_op->params.bucket_owner = c_bucket->get_owner();
+         del_op->params.versioning_status = c_bucket->get_info().versioning_status();
+         //populate marker_version_id only when delete marker is not null
+         del_op->params.marker_version_id = e->version;
+         op_ret = del_op->delete_obj(dpp, null_yield, rgw::sal::FLAG_LOG_OP);
+         if (op_ret >= 0) {
+           bool delete_marker = del_op->result.delete_marker;
+           std::string version_id = del_op->result.version_id;
+           ldpp_dout(dpp, 20) << __func__ << "delete_obj delete_marker=" << delete_marker << dendl;
+           ldpp_dout(dpp, 20) << __func__ << "delete_obj version_id=" << version_id << dendl;
+         } else {
+           ldpp_dout(dpp, 20) << __func__ << "delete_obj returned ret=" << op_ret << dendl;
+           erase_dirty_object(dpp, e->key, null_yield);
+           continue;
+         }
+         if (null_delete_marker) {
+           //restore instance for directory data processing in later steps
+           c_obj->set_instance("null");
+         }
+       } else { //end-if delete_marker
+
+         std::unique_ptr<rgw::sal::Writer> processor =  driver->get_atomic_writer(dpp,
+                 null_yield,
+                 c_obj.get(),
+                 owner,
+                 NULL,
+                 0,
+                 "");
+
+         op_ret = processor->prepare(null_yield);
+         if (op_ret < 0) {
+      ldpp_dout(dpp, 20) << __func__ << "processor->prepare() returned ret=" << op_ret << dendl;
+           erase_dirty_object(dpp, e->key, null_yield);
+           continue;
+         }
+
+         off_t lst = e->size;
+         off_t fst = 0;
+         off_t ofs = 0;
+
+         rgw::sal::DataProcessor* filter = processor.get();
+         bufferlist bl;
+         op_ret = cacheDriver->get_attrs(dpp, head_oid_in_cache, obj_attrs, null_yield); //get obj attrs from head
+         if (op_ret < 0) {
+           ldpp_dout(dpp, 20) << __func__ << "cacheDriver->get_attrs returned ret=" << op_ret << dendl;
+           erase_dirty_object(dpp, e->key, null_yield);
+           continue;
+         }
+         obj_attrs.erase(RGW_CACHE_ATTR_MTIME);
+         obj_attrs.erase(RGW_CACHE_ATTR_OBJECT_SIZE);
+         obj_attrs.erase(RGW_CACHE_ATTR_ACCOUNTED_SIZE);
+         obj_attrs.erase(RGW_CACHE_ATTR_EPOCH);
+         obj_attrs.erase(RGW_CACHE_ATTR_MULTIPART);
+         obj_attrs.erase(RGW_CACHE_ATTR_OBJECT_NS);
+         obj_attrs.erase(RGW_CACHE_ATTR_BUCKET_NAME);
+         obj_attrs.erase(RGW_CACHE_ATTR_LOCAL_WEIGHT);
+
+         do {
+           ceph::bufferlist data;
+           if (fst >= lst){
+       break;
+           }
+           off_t cur_size = std::min<off_t>(fst + dpp->get_cct()->_conf->rgw_max_chunk_size, lst);
+           off_t cur_len = cur_size - fst;
+           std::string oid_in_cache = DIRTY_BLOCK_PREFIX + prefix + CACHE_DELIM + std::to_string(fst) + CACHE_DELIM + std::to_string(cur_len);
+           ldpp_dout(dpp, 10) << __func__ << "(): oid_in_cache=" << oid_in_cache << dendl;
+           rgw::sal::Attrs attrs;
+           cacheDriver->get(dpp, oid_in_cache, 0, cur_len, data, attrs, null_yield);
+           if (op_ret < 0) {
+             ldpp_dout(dpp, 20) << __func__ << "cacheDriver->get returned ret=" << op_ret << dendl;
+             erase_dirty_object(dpp, e->key, null_yield);
+             continue;
+           }
+           len = data.length();
+           fst += len;
+
+           if (len == 0) {
+             // TODO: if len of any block is 0 for some reason, we must return from here?
+             break;
+           }
+
+           op_ret = filter->process(std::move(data), ofs);
+           if (op_ret < 0) {
+       ldpp_dout(dpp, 20) << __func__ << "processor->process() returned ret="
+       << op_ret << dendl;
+             erase_dirty_object(dpp, e->key, null_yield);
+             continue;
+           }
+
+           ofs += len;
+         } while (len > 0);
+
+         op_ret = filter->process({}, ofs);
+
+         const req_context rctx{dpp, null_yield, nullptr};
+         ceph::real_time mtime = ceph::real_clock::from_time_t(e->creationTime);
+         op_ret = processor->complete(lst, e->etag, &mtime, ceph::real_clock::from_time_t(e->creationTime), obj_attrs,
+                                 std::nullopt, ceph::real_time(), nullptr, nullptr,
+                                 nullptr, nullptr, nullptr,
+                                 rctx, rgw::sal::FLAG_LOG_OP);
+
+         if (op_ret < 0) {
+           ldpp_dout(dpp, 20) << __func__ << "processor->complete() returned ret=" << op_ret << dendl;
+           erase_dirty_object(dpp, e->key, null_yield);
+           continue;
+         }
+         //invoke update() with dirty flag set to false, to update in-memory metadata for each block
+         // reset values
+         lst = e->size;
+         fst = 0;
+         do {
+           if (fst >= lst) {
+       break;
+           }
+           off_t cur_size = std::min<off_t>(fst + dpp->get_cct()->_conf->rgw_max_chunk_size, lst);
+           off_t cur_len = cur_size - fst;
+
+           std::string oid_in_cache = DIRTY_BLOCK_PREFIX + prefix + CACHE_DELIM + std::to_string(fst) + CACHE_DELIM + std::to_string(cur_len);
+           ldpp_dout(dpp, 20) << __func__ << "(): oid_in_cache =" << oid_in_cache << dendl;
+           std::string new_oid_in_cache = prefix + CACHE_DELIM + std::to_string(fst) + CACHE_DELIM + std::to_string(cur_len);
+           //Rename block to remove "D" prefix
+           cacheDriver->rename(dpp, oid_in_cache, new_oid_in_cache, null_yield);
+           //Update in-memory data structure for each block
+           this->update(dpp, new_oid_in_cache, 0, 0, e->version, false, y);
+
+           rgw::d4n::CacheBlock block;
+           block.cacheObj.bucketName = c_obj->get_bucket()->get_bucket_id();
+           block.cacheObj.objName = c_obj->get_key().get_oid();
+           block.size = cur_len;
+           block.blockID = fst;
+      std::string dirty = "false";
+           op_ret = blockDir->update_field(dpp, &block, "dirty", dirty, null_yield);
+           if (op_ret < 0) {
+       ldpp_dout(dpp, 0) << __func__ << "updating dirty flag in block directory failed, ret=" << op_ret << dendl;
+           }
+           fst += cur_len;
+         } while(fst < lst);
+       } //end-else if delete_marker
+
+       cacheDriver->rename(dpp, head_oid_in_cache, new_head_oid_in_cache, null_yield);
+
+       //invoke update() with dirty flag set to false, to update in-memory metadata for head
+       this->update(dpp, new_head_oid_in_cache, 0, 0, e->version, false, y);
+
+       if (null_instance) {
+         //restore instance for directory data processing in later steps
+         c_obj->set_instance("null");
+       }
+       rgw::d4n::CacheBlock block;
+       block.cacheObj.bucketName = c_obj->get_bucket()->get_bucket_id();
+       block.cacheObj.objName = c_obj->get_name();
+       block.size = 0;
+       block.blockID = 0;
+       //non-versioned case
+       if (!c_obj->have_instance()) {
+         //add watch on latest entry, as it can be modified by a put or a del
+         ret = blockDir->watch(dpp, &block, y);
+         if (ret < 0) {
+           ldpp_dout(dpp, 0) << __func__ << "(): Failed to add a watch on: " << block.cacheObj.objName << ", ret=" << ret << dendl;
+         }
+         // hash entry for latest version
+         op_ret = blockDir->get(dpp, &block, y);
+         if (op_ret < 0) {
+           ldpp_dout(dpp, 0) << __func__ << "(): Failed to get latest entry in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl;
+         } else {
+           // if this entry is the latest, it could have been overwritten by a newer one
+           if (block.version == e->version) {
+             rgw::d4n::CacheBlock null_block;
+             null_block = block;
+             null_block.cacheObj.objName = "_:null_" + c_obj->get_name();
+             //hash entry for null block
+             op_ret = blockDir->get(dpp, &null_block, y);
+             if (op_ret < 0) {
+               ldpp_dout(dpp, 0) << __func__ << "(): Failed to get latest entry in block directory for: " << null_block.cacheObj.objName << ", ret=" << ret << dendl;
+             } else {
+               if (null_block.version == e->version) {
+                 block.cacheObj.dirty = false;
+                 null_block.cacheObj.dirty = false;
+                 //start redis transaction using MULTI
+                 blockDir->multi(dpp, y);
+                 auto blk_op_ret = blockDir->set(dpp, &block, y);
+                 auto null_op_ret = blockDir->set(dpp, &null_block, y);
+                 if (blk_op_ret < 0 || null_op_ret < 0) {
+                   blockDir->discard(dpp, y);
+                   ldpp_dout(dpp, 0) << __func__ << "(): Failed to Queue update dirty flag for latest entry/null entry in block directory" << dendl;
+                 } else {
+                   std::vector<std::string> responses;
+                   ret = blockDir->exec(dpp, responses, y);
+                   if (responses.empty()) {
+                     //transaction failed, which means latest hash entry has been modified by a put/del so ignore and do not update the entries
+                     ldpp_dout(dpp, 0) << __func__ << "(): Execute responses are empty which means transaction failed!" << dendl;
+                   }
+                 }
+               }
+             }
+           } //end-if (block.version == entry->version)
+         } //end - else if op_ret == 0
+         ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << std::setprecision(std::numeric_limits<double>::max_digits10) << e->creationTime << " from ordered set" << dendl;
+         rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{
+           .objName = c_obj->get_name(),
+           .bucketName = c_obj->get_bucket()->get_bucket_id(),
+         };
+         //remove the entry from the ordered set using its score, as the object is already cleaned
+         //need not be part of a transaction as it is being removed based on its score which is its creation time.
+         ret = objDir->zremrangebyscore(dpp, &dir_obj, e->creationTime, e->creationTime, y);
+         if (ret < 0) {
+           ldpp_dout(dpp, 0) << __func__ << "(): Failed to remove object from ordered set with error: " << ret << dendl;
+         }
+       }
+       if (c_obj->have_instance()) { //versioned case
+         std::string objName = c_obj->get_oid();
+         if (c_obj->get_instance() == "null") {
+           objName = "_:null_" + c_obj->get_name();
+         }
+         rgw::d4n::CacheBlock instance_block;
+         instance_block.cacheObj.bucketName = c_obj->get_bucket()->get_bucket_id();
+         instance_block.cacheObj.objName = objName;
+         instance_block.size = 0;
+         instance_block.blockID = 0;
+    std::string dirty = "false";
+         op_ret = blockDir->update_field(dpp, &instance_block, "dirty", dirty, null_yield);
+         if (op_ret < 0) {
+             ldpp_dout(dpp, 20) << __func__ << "updating dirty flag in block directory for instance block failed!" << dendl;
+         }
+         //the next steps remove the entry from the ordered set and if needed the latest hash entry also in case of versioned buckets
+         rgw::d4n::CacheBlock latest_block = block;
+         latest_block.cacheObj.objName = c_obj->get_name();
+         //add watch on latest entry, as it can be modified by a put or a del
+         ret = blockDir->watch(dpp, &latest_block, y);
+         if (ret < 0) {
+           ldpp_dout(dpp, 0) << __func__ << "(): Failed to add a watch on: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl;
+         }
+         int retry = 3;
+         while(retry) {
+           retry--;
+           //get latest entry
+           ret = blockDir->get(dpp, &latest_block, y);
+           if (ret < 0) {
+             ldpp_dout(dpp, 0) << __func__ << "(): Failed to get latest entry in block directory for: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl;
+           }
+           //start redis transaction using MULTI
+           blockDir->multi(dpp, y);
+           if (latest_block.version == e->version) {
+             //remove object entry from ordered set
+             if (c_obj->have_instance()) {
+               blockDir->del(dpp, &latest_block, y, true);
+               if (ret < 0) {
+                 blockDir->discard(dpp, y);
+                 ldpp_dout(dpp, 0) << __func__ << "(): Failed to queue del for latest hash entry: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl;
+                 continue;
+               }
+             }
+           }
+           ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << std::setprecision(std::numeric_limits<double>::max_digits10) << e->creationTime << " from ordered set" << dendl;
+           rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{
+             .objName = c_obj->get_name(),
+             .bucketName = c_obj->get_bucket()->get_bucket_id(),
+           };
+           ret = objDir->zremrangebyscore(dpp, &dir_obj, e->creationTime, e->creationTime, y, true);
+           if (ret < 0) {
+             blockDir->discard(dpp, y);
+             ldpp_dout(dpp, 0) << __func__ << "(): Failed to remove object from ordered set with error: " << ret << dendl;
+             continue;
+           }
+           std::vector<std::string> responses;
+           ret = blockDir->exec(dpp, responses, y);
+           if (responses.empty()) {
+             ldpp_dout(dpp, 0) << __func__ << "(): Execute responses are empty hence continuing!" << dendl;
+             continue;
+           }
+           break;
+         }//end-while (retry)
+       }
+       //remove entry from map and queue, erase_dirty_object locks correctly
+       erase_dirty_object(dpp, e->key, null_yield);
       }
-      //remove entry from map and queue, erase_dirty_object locks correctly
-      erase_dirty_object(dpp, e->key, null_yield);
-    } else { //end-if std::difftime(time(NULL), e->creationTime) > interval
-      std::this_thread::sleep_for(std::chrono::milliseconds(interval)); //TODO:: should this time be optimised?
+    } else if (diff < interval) { //end-if std::difftime(time(NULL), e->creationTime) > interval
+      std::this_thread::sleep_for(std::chrono::seconds(interval - diff)); //TODO:: should this time be optimised?
     }
   } //end-while true
 }
@@ -746,7 +962,7 @@ void LRUPolicy::update(const DoutPrefixProvider* dpp, const std::string& key, ui
 }
 
 void LRUPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
-const rgw_obj_key& obj_key, optional_yield y)
+const rgw_obj_key& obj_key, optional_yield y, std::string& restore_val)
 {
   const std::lock_guard l(lru_lock);
   ObjEntry* e = new ObjEntry(key, version, deleteMarker, size, creationTime, user, etag, bucket_name, bucket_id, obj_key);
index e4c826aafefc569ab9412b5648699e9e2301e9d9..6b8c60666fce33bb16bfe8b6ead5de2ecc90fda1 100644 (file)
@@ -17,6 +17,12 @@ namespace sys = boost::system;
 
 static std::string empty = std::string();
 
+enum class State { // state machine for dirty objects in the cache
+  INIT,
+  IN_PROGRESS, // object is being written to the backend
+  INVALID // object is to be deleted during cleanup 
+};
+
 class CachePolicy {
   protected:
     struct Entry : public boost::intrusive::list_base_hook<> {
@@ -65,9 +71,10 @@ class CachePolicy {
     virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val=empty) = 0;
     virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, 
                            time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
-                           const rgw_obj_key& obj_key, optional_yield y) = 0;
+                           const rgw_obj_key& obj_key, optional_yield y, std::string& restore_val=empty) = 0;
     virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0;
     virtual bool erase_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0;
+    virtual bool invalidate_dirty_object(const DoutPrefixProvider* dpp, const std::string& key) = 0;
     virtual void cleaning(const DoutPrefixProvider* dpp) = 0;
 };
 
@@ -125,10 +132,11 @@ class LFUDAPolicy : public CachePolicy {
     Heap entries_heap;
     Object_Heap object_heap; //This heap contains dirty objects ordered by their creation time, used for cleaning method
     std::unordered_map<std::string, LFUDAEntry*> entries_map;
-    std::unordered_map<std::string, LFUDAObjEntry*> o_entries_map; //Contains only dirty objects, used for look-up
+    std::unordered_map<std::string, std::pair<LFUDAObjEntry*, State> > o_entries_map; //Contains only dirty objects, used for look-up
     std::mutex lfuda_lock;
     std::mutex lfuda_cleaning_lock;
     std::condition_variable cond;
+    std::condition_variable state_cond;
     bool quit{false};
 
     int age = 1, weightSum = 0, postedSum = 0;
@@ -158,6 +166,7 @@ class LFUDAPolicy : public CachePolicy {
         return nullptr;
       return it->second;
     }
+    int delete_data_blocks(const DoutPrefixProvider* dpp, LFUDAObjEntry* e, optional_yield y);
 
   public:
     LFUDAPolicy(std::shared_ptr<connection>& conn, rgw::cache::CacheDriver* cacheDriver) : CachePolicy(), 
@@ -184,15 +193,16 @@ class LFUDAPolicy : public CachePolicy {
     void save_y(optional_yield y) { this->y = y; }
     virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, 
                            time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
-                           const rgw_obj_key& obj_key, optional_yield y) override;
-    virtual bool erase_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y);
+                           const rgw_obj_key& obj_key, optional_yield y, std::string& restore_val=empty) override;
+    virtual bool erase_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
+    virtual bool invalidate_dirty_object(const DoutPrefixProvider* dpp, const std::string& key) override;
     virtual void cleaning(const DoutPrefixProvider* dpp) override;
     LFUDAObjEntry* find_obj_entry(const std::string& key) {
       auto it = o_entries_map.find(key);
       if (it == o_entries_map.end()) {
         return nullptr;
       }
-      return it->second;
+      return it->second.first;
     }
 };
 
@@ -217,9 +227,10 @@ class LRUPolicy : public CachePolicy {
     virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val=empty) override;
     virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, 
                            time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
-                           const rgw_obj_key& obj_key, optional_yield y) override;
+                           const rgw_obj_key& obj_key, optional_yield y, std::string& restore_val=empty) override;
     virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
     virtual bool erase_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
+    virtual bool invalidate_dirty_object(const DoutPrefixProvider* dpp, const std::string& key) override { return false; }
     virtual void cleaning(const DoutPrefixProvider* dpp) override {}
 };
 
index 6626c4690e326e6dc803be2346c111e9c48ed146..bd101bd27f680b7838f2bc81ba1fb57d7ecf03f3 100644 (file)
@@ -774,6 +774,7 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, std::
       .cacheObj = version_object,
       .blockID = 0,
       .version = this->get_object_version(),
+      .deleteMarker = this->delete_marker,
       .size = 0,
     };
 
@@ -1997,22 +1998,6 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
   return 0;
 }
 
-int D4NFilterObject::D4NFilterDeleteOp::delete_from_cache_and_policy(const DoutPrefixProvider* dpp, std::string oid, 
-                                                                    std::string prefix, optional_yield y)
-{
-  int ret = -1;
-
-  if ((ret = source->driver->get_cache_driver()->delete_data(dpp, oid, y)) == 0) { // Sam: do we want del or delete_data here? 
-    if (!(ret = source->driver->get_policy_driver()->get_cache_policy()->erase(dpp, prefix, y))) {
-      ldpp_dout(dpp, 0) << "Failed to delete head policy entry for: " << source->get_key().get_oid() << ", ret=" << ret << dendl;
-    }
-  } else {
-    ldpp_dout(dpp, 0) << "Failed to delete head object for: " << source->get_key().get_oid() << ", ret=" << ret << dendl;
-  }
-
-  return ret;
-}
-
 int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp,
                                                    optional_yield y, uint32_t flags)
 {
@@ -2026,7 +2011,7 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp
   /* check_head_exists_in_cache_get_oid also returns false if the head object is in the cache, but is a delete marker.
      As a result, the below check guarantees the head object is not in the cache. */
   if (!source->check_head_exists_in_cache_get_oid(dpp, head_oid_in_cache, attrs, block, y) && !block.deleteMarker) {
-    /* for a dirty object, if the first call is a simple delete after versioning is enabled, the call will go to the backend store and create a dlete marker there
+    /* for a dirty object, if the first call is a simple delete after versioning is enabled, the call will go to the backend store and create a delete marker there
        since no object with source->get_name() will be found in the cache (and this is correct) */
     ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): head object not found; calling next->delete_obj" << dendl;
     next->params = params;
@@ -2039,11 +2024,6 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp
     auto objDir = source->driver->get_obj_dir();
     std::string policy_prefix = head_oid_in_cache;
     std::string version = source->get_object_version();
-
-    // call invalidate_object based on whether the object is dirty(objDirty)
-    //if the object is still dirty and has been marked invalid, then let objDirty be true, else set it to false
-    //remove code that deletes head/data block in this method.
-
     std::string objName = source->get_name();
     // special handling for name starting with '_'
     if (objName[0] == '_') {
@@ -2051,8 +2031,16 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp
     }
 
     if (objDirty) { // head object dirty flag represents object dirty flag
-      policy_prefix.erase(0, 2); // remove "D_" prefix from policy key since the policy keys do not hold this information
-    }    
+      //for versioned buckets, for a simple delete we need to create a delete marker (and not invalidate/delete any object)
+      if (!source->get_bucket()->versioned() || (block.cacheObj.objName != source->get_name())) {
+        policy_prefix.erase(0, 2); // remove "D_" prefix from policy key since the policy keys do not hold this information
+        ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): calling invalidate_dirty_object for: " << head_oid_in_cache << dendl;
+        if (!source->driver->get_policy_driver()->get_cache_policy()->invalidate_dirty_object(dpp, policy_prefix)) {
+          objDirty = false;
+        }
+      }
+    }
+
     // Versioned buckets - this will delete the head object indexed by version-id (even null) and latest en
     if (source->get_bucket()->versioned()) {
         /* 1. clean objects - no latest head entry as latest entry to be retrieved from backend now
@@ -2189,12 +2177,7 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp
                   //start redis transaction using MULTI
                   blockDir->multi(dpp, y);
                 }
-                if ((ret = blockDir->del(dpp, &block, y, true)) == 0) {
-                  if ((ret = delete_from_cache_and_policy(dpp, head_oid_in_cache, policy_prefix, y)) < 0) {
-                    blockDir->discard(dpp, y);
-                    return ret;
-                  }
-                } else if (ret < 0 && ret != -ENOENT) {
+                if ((ret = blockDir->del(dpp, &block, y, true)) < 0 && ret != -ENOENT) {
                   ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to delete head object in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl;
                   blockDir->discard(dpp, y);
                   return ret;
@@ -2236,14 +2219,13 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp
     /* Non-versioned buckets - we will delete the latest entry and the "null" entry
        dirty objects - delete "null" entry from ordered set also */
     if (!source->get_bucket()->versioned()) {
-      if ((ret = blockDir->del(dpp, &block, y)) == 0) {
-        if (objDirty) {
-          if ((ret = delete_from_cache_and_policy(dpp, head_oid_in_cache, policy_prefix, y)) < 0) {
-            return ret;
-          }
-        }
-      } else if (ret < 0 && ret != -ENOENT) {
-        ldpp_dout(dpp, 0) << "Failed to delete head object in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl;
+      //start redis transaction using MULTI to delete the latest entry and the "null" entry together
+      blockDir->multi(dpp, y);
+      //explore redis pipelining to send the two 'DEL' commands together in a single request
+      ret = blockDir->del(dpp, &block, y, true);
+      if (ret < 0) {
+        ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue delete head object op in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl;
+        blockDir->discard(dpp, y);
         return ret;
       }
       //if we get request for latest head entry, delete the null block and vice versa
@@ -2253,8 +2235,8 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp
         block.cacheObj.objName = source->get_name();
       }
       if ((ret = blockDir->del(dpp, &block, y)) < 0) {
-        ldpp_dout(dpp, 0) << "Failed to delete head object in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl;
-        return ret;
+        ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue delete head object in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl;
+        blockDir->discard(dpp, y);
       }
       //dirty objects - delete from ordered set
       if (objDirty) {
@@ -2275,11 +2257,6 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp
         ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to execute exec in block directory: " << "ret= " << ret << dendl;
         return ret;
       }
-      if (objDirty) {
-        if ((ret = delete_from_cache_and_policy(dpp, head_oid_in_cache, policy_prefix, y)) < 0) {
-          return ret;
-        }
-      }
     } //end-if non-versioned buckets
 
     int size;
@@ -2334,43 +2311,25 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp
             }
           }
 
-        if ((ret = blockDir->del(dpp, &block, y)) == 0) {
-          prefix = DIRTY_BLOCK_PREFIX + prefix;
-          std::string oid_in_cache = prefix + CACHE_DELIM + std::to_string(fst) + CACHE_DELIM + std::to_string(cur_len);
-
-          if (objDirty) {
-            std::string key = policy_prefix + CACHE_DELIM + std::to_string(fst) + CACHE_DELIM + std::to_string(cur_len);
-            if ((ret = delete_from_cache_and_policy(dpp, oid_in_cache, key, y)) < 0) {
-              ldpp_dout(dpp, 0) << "ERROR for block " << source->get_name() << " blockID: " << fst << " block size: " << cur_len << ", ret=" << ret << dendl;
-              return ret;
-            }
+          if ((ret = blockDir->del(dpp, &block, y)) == -ENOENT) { 
+            continue;
+          } else if (ret < 0) {
+            ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to delete directory entry for: " << source->get_name() << " blockid: " << fst << " block size: " << cur_len << ", ret=" << ret << dendl;
+            return ret;
           }
-        } else if (ret == -ENOENT) {
-          continue;
-        } else {
-          ldpp_dout(dpp, 0) << "Failed to delete directory entry for: " << source->get_name() << " blockid: " << fst << " block size: " << cur_len << ", ret=" << ret << dendl;
-          return ret;
-        }
 
         fst += cur_len;
       } while (fst < lst);
     }
 
-    std::string key = policy_prefix;
     if (!objDirty) {
       next->params = params;
       ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): object is not dirty; calling next->delete_obj" << dendl;
       ret = next->delete_obj(dpp, y, flags);
       result = next->result;
       return ret;
-    } else {
-      if (!(ret = source->driver->get_policy_driver()->get_cache_policy()->erase_dirty_object(dpp, key, y))) {
-        ldpp_dout(dpp, 0) << "Failed to delete policy object entry for: " << source->get_name() << ", ret=" << ret << dendl;
-        return -ENOENT;
-      } else {
-        return 0;
-      }
     }
+    return 0;
   }
 }
 
index 85be4675cf4f283b978c9e2dc07cfa85a93db790..4a9b7582aca3215b3144545be8ec1ce35b74e862 100644 (file)
@@ -203,7 +203,6 @@ class D4NFilterObject : public FilterObject {
       virtual ~D4NFilterDeleteOp() = default;
 
       virtual int delete_obj(const DoutPrefixProvider* dpp, optional_yield y, uint32_t flags) override;
-      int delete_from_cache_and_policy(const DoutPrefixProvider* dpp, std::string oid, std::string prefix, optional_yield y);
     };
 
     D4NFilterObject(std::unique_ptr<Object> _next, D4NFilterDriver* _driver) : FilterObject(std::move(_next)),
index 6d416bc20d9bb5256d9c9035b0e765357826a81e..dd600f7a7ae3081c62ba952433404984ebff3540 100644 (file)
@@ -14,15 +14,16 @@ constexpr char RGW_CACHE_ATTR_VERSION_ID[] = "user.rgw.version_id";
 constexpr char RGW_CACHE_ATTR_SOURC_ZONE[] = "user.rgw.source_zone";
 constexpr char RGW_CACHE_ATTR_LOCAL_WEIGHT[] = "user.rgw.localWeight";
 constexpr char RGW_CACHE_ATTR_DELETE_MARKER[] = "user.rgw.deleteMarker";
+constexpr char RGW_CACHE_ATTR_INVALID[] = "user.rgw.invalid";
 
 constexpr char DIRTY_BLOCK_PREFIX[] = "D#";
 constexpr char CACHE_DELIM = '#';
 
 namespace rgw { namespace cache {
 
-typedef std::function<void(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size,
+typedef std::function<void(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, 
                            time_t creationTime, const rgw_user user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
-                           const rgw_obj_key& obj_key, optional_yield y)> ObjectDataCallback;
+                           const rgw_obj_key& obj_key, optional_yield y, std::string& restore_val)> ObjectDataCallback;
 
 typedef std::function<void(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version,
         bool dirty, optional_yield y, std::string& restore_val)> BlockDataCallback;
index 095925091dd9747148a6a09c528a3cd3df71fc26..d4f6438d4f41fde1a35292024a86df9648548ab9 100644 (file)
@@ -279,6 +279,7 @@ int SSDDriver::restore_blocks_objects(const DoutPrefixProvider* dpp, ObjectDataC
                     
                                         uint64_t len = 0, offset = 0;
                                         std::string localWeightStr;
+                                       std::string invalidStr;
                                         if (parts.size() == 2) {
                                             rgw::sal::Attrs attrs;
                                             get_attrs(dpp, file_entry.path(), attrs, null_yield);
@@ -340,8 +341,13 @@ int SSDDriver::restore_blocks_objects(const DoutPrefixProvider* dpp, ObjectDataC
                                                 ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): deleteMarker: " << deleteMarker << dendl;
                                             }
 
+                                            if (attrs.find(RGW_CACHE_ATTR_INVALID) != attrs.end()) {
+                                                invalidStr = attrs[RGW_CACHE_ATTR_INVALID].to_str();
+                                                ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): invalidStr: " << invalidStr << dendl;
+                                            }
+
                                             ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): calling func for: " << key << dendl;
-                                            obj_func(dpp, key, version, deleteMarker, size, creationTime, user, etag, bucket_name, bucket_id, obj_key, null_yield);
+                                            obj_func(dpp, key, version, deleteMarker, size, creationTime, user, etag, bucket_name, bucket_id, obj_key, null_yield, invalidStr);
                                             block_func(dpp, key, offset, len, version, dirty, null_yield, localWeightStr);
                                             parsed = true;
                                         } //end-if part.size() == 2