]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw/d4n: support for dirty objects in delete object method.
authorPritha Srivastava <prsrivas@redhat.com>
Wed, 25 Sep 2024 09:19:08 +0000 (14:49 +0530)
committerPritha Srivastava <prsrivas@redhat.com>
Mon, 21 Apr 2025 04:04:07 +0000 (09:34 +0530)
1. ordered set to maintain versions of a dirty object
2. creation of delete marker in case of a simple delete request
3. deletion of a specific version from the ordered set
4. cleaning method deletes from ordered set for dirty objects
5. use of redis atomicity constructs wherever needed

Signed-off-by: Pritha Srivastava <prsrivas@redhat.com>
src/rgw/driver/d4n/d4n_directory.cc
src/rgw/driver/d4n/d4n_directory.h
src/rgw/driver/d4n/d4n_policy.cc
src/rgw/driver/d4n/d4n_policy.h
src/rgw/driver/d4n/rgw_sal_d4n.cc
src/rgw/driver/d4n/rgw_sal_d4n.h
src/rgw/rgw_cache_driver.h
src/rgw/rgw_ssd_driver.cc
src/test/rgw/test_d4n_directory.cc

index feee1a844723db15495f32cae0417fb9716129da..e7d481a29aa944a4be63f935b679c07a4ebe8d0a 100644 (file)
@@ -332,7 +332,7 @@ int ObjectDirectory::update_field(const DoutPrefixProvider* dpp, CacheObj* objec
   }
 }
 
-int ObjectDirectory::zadd(const DoutPrefixProvider* dpp, CacheObj* object, double score, const std::string& member, optional_yield y)
+int ObjectDirectory::zadd(const DoutPrefixProvider* dpp, CacheObj* object, double score, const std::string& member, optional_yield y, bool multi)
 {
   std::string key = build_index(object);
   try {
@@ -348,9 +348,11 @@ int ObjectDirectory::zadd(const DoutPrefixProvider* dpp, CacheObj* object, doubl
       return -ec.value();
     }
 
-    if (std::get<0>(resp).value() != "1") {
-      ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl;
-      return -EINVAL;
+    if (!multi) {
+      if (std::get<0>(resp).value() != "1") {
+        ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl;
+        return -EINVAL;
+      }
     }
 
   } catch (std::exception &e) {
@@ -409,11 +411,6 @@ int ObjectDirectory::zrevrange(const DoutPrefixProvider* dpp, CacheObj* object,
       return -ec.value();
     }
 
-    if (std::get<0>(resp).value().empty()) {
-      ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Empty response" << dendl;
-      return -EINVAL;
-    }
-
     members = std::get<0>(resp).value();
 
   } catch (std::exception &e) {
@@ -424,7 +421,7 @@ int ObjectDirectory::zrevrange(const DoutPrefixProvider* dpp, CacheObj* object,
   return 0;
 }
 
-int ObjectDirectory::zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y)
+int ObjectDirectory::zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y, bool multi)
 {
   std::string key = build_index(object);
   try {
@@ -440,9 +437,42 @@ int ObjectDirectory::zrem(const DoutPrefixProvider* dpp, CacheObj* object, const
       return -ec.value();
     }
 
-    if (std::get<0>(resp).value() != "1") {
-      ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
-      return -EINVAL;
+    if (!multi) {
+      if (std::get<0>(resp).value() != "1") {
+        ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
+        return -EINVAL;
+      }
+    }
+
+  } catch (std::exception &e) {
+    ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+    return -EINVAL;
+  }
+
+  return 0;
+}
+
+int ObjectDirectory::zremrangebyscore(const DoutPrefixProvider* dpp, CacheObj* object, double min, double max, optional_yield y, bool multi)
+{
+  std::string key = build_index(object);
+  try {
+    boost::system::error_code ec;
+    request req;
+    req.push("ZREMRANGEBYSCORE", key, std::to_string(min), std::to_string(max));
+    response<std::string> resp;
+
+    redis_exec(conn, ec, req, resp, y);
+
+    if (ec) {
+      ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+      return -ec.value();
+    }
+
+    if (!multi) {
+      if (std::get<0>(resp).value() == "0") {
+        ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() No element removed!" << dendl;
+        return -EINVAL;
+      }
     }
 
   } catch (std::exception &e) {
@@ -839,7 +869,7 @@ int BlockDirectory::remove_host(const DoutPrefixProvider* dpp, CacheBlock* block
   return 0;
 }
 
-int BlockDirectory::zadd(const DoutPrefixProvider* dpp, CacheBlock* block, double score, const std::string& member, optional_yield y)
+int BlockDirectory::zadd(const DoutPrefixProvider* dpp, CacheBlock* block, double score, const std::string& member, optional_yield y, bool multi)
 {
   std::string key = build_index(block);
   try {
@@ -854,10 +884,11 @@ int BlockDirectory::zadd(const DoutPrefixProvider* dpp, CacheBlock* block, doubl
       ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
       return -ec.value();
     }
-
-    if (std::get<0>(resp).value() != "1") {
-      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl;
-      return -EINVAL;
+    if (!multi) {
+      if (std::get<0>(resp).value() != "1") {
+        ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl;
+        return -EINVAL;
+      }
     }
 
   } catch (std::exception &e) {
@@ -1076,4 +1107,32 @@ int BlockDirectory::discard(const DoutPrefixProvider* dpp, optional_yield y)
   return 0;
 }
 
+int BlockDirectory::unwatch(const DoutPrefixProvider* dpp, optional_yield y)
+{
+  try {
+    boost::system::error_code ec;
+    request req;
+    req.push("UNWATCH");
+    response<std::string> resp;
+
+    redis_exec(conn, ec, req, resp, y);
+
+    if (ec) {
+      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+      return -ec.value();
+    }
+
+    if (std::get<0>(resp).value() != "OK") {
+      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
+      return -EINVAL;
+    }
+
+  } catch (std::exception &e) {
+    ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+    return -EINVAL;
+  }
+
+  return 0;
+}
+
 } } // namespace rgw::d4n
index 1a262b38eb2d198f79aa5e5d2e0ff566f02f2820..8319ba93587a48bd64b271d26deff0b882e587c9 100644 (file)
@@ -48,10 +48,11 @@ class ObjectDirectory: public Directory {
     int copy(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& copyName, const std::string& copyBucketName, optional_yield y);
     int del(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y);
     int update_field(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& field, std::string& value, optional_yield y);
-    int zadd(const DoutPrefixProvider* dpp, CacheObj* object, double score, const std::string& member, optional_yield y);
+    int zadd(const DoutPrefixProvider* dpp, CacheObj* object, double score, const std::string& member, optional_yield y, bool multi=false);
     int zrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector<std::string>& members, optional_yield y);
     int zrevrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector<std::string>& members, optional_yield y);
-    int zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y);
+    int zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y, bool multi=false);
+    int zremrangebyscore(const DoutPrefixProvider* dpp, CacheObj* object, double min, double max, optional_yield y, bool multi=false);
     //Return value is the incremented value, else return error
     int incr(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y);
 
@@ -73,14 +74,16 @@ class BlockDirectory: public Directory {
     int del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y, bool multi=false);
     int update_field(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& field, std::string& value, optional_yield y);
     int remove_host(const DoutPrefixProvider* dpp, CacheBlock* block, std::string& value, optional_yield y);
-    int zadd(const DoutPrefixProvider* dpp, CacheBlock* block, double score, const std::string& member, optional_yield y);
+    int zadd(const DoutPrefixProvider* dpp, CacheBlock* block, double score, const std::string& member, optional_yield y, bool multi=false);
     int zrange(const DoutPrefixProvider* dpp, CacheBlock* block, int start, int stop, std::vector<std::string>& members, optional_yield y);
     int zrevrange(const DoutPrefixProvider* dpp, CacheBlock* block, int start, int stop, std::vector<std::string>& members, optional_yield y);
     int zrem(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& member, optional_yield y);
     int watch(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
+    //Move MULTI, EXEC and DISCARD to directory? As they do not operate on a key
     int exec(const DoutPrefixProvider* dpp, std::vector<std::string>& responses, optional_yield y);
     int multi(const DoutPrefixProvider* dpp, optional_yield y);
     int discard(const DoutPrefixProvider* dpp, optional_yield y);
+    int unwatch(const DoutPrefixProvider* dpp, optional_yield y);
 
   private:
     std::shared_ptr<connection> conn;
index 4fd195e398f3f1c6c6d325a7be601e660179ab25..7a39a0ff5a8bcba6e5264ffcdef6db7013711442 100644 (file)
@@ -404,12 +404,12 @@ void LFUDAPolicy::update(const DoutPrefixProvider* dpp, const std::string& key,
   weightSum += ((localWeight < 0) ? 0 : localWeight);
 }
 
-void LFUDAPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, optional_yield y)
+void LFUDAPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, optional_yield y)
 {
   using handle_type = boost::heap::fibonacci_heap<LFUDAObjEntry*, boost::heap::compare<ObjectComparator<LFUDAObjEntry>>>::handle_type;
   ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Before acquiring lock, adding entry: " << key << dendl;
   const std::lock_guard l(lfuda_cleaning_lock);
-  LFUDAObjEntry* e = new LFUDAObjEntry{key, version, dirty, size, creationTime, user, etag, bucket_name, bucket_id, obj_key};
+  LFUDAObjEntry* e = new LFUDAObjEntry{key, version, deleteMarker, size, creationTime, user, etag, bucket_name, bucket_id, obj_key};
   handle_type handle = object_heap.push(e);
   e->set_handle(handle);
   o_entries_map.emplace(key, e);
@@ -467,14 +467,14 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
       continue;
     }
     ldpp_dout(dpp, 10) <<__LINE__ << " " << __func__ << "(): e->key=" << e->key << dendl;
-    ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->dirty=" << e->dirty << dendl;
+    ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->delete_marker=" << e->delete_marker << dendl;
     ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->version=" << e->version << dendl;
     ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->bucket_name=" << e->bucket_name << dendl;
     ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->bucket_id=" << e->bucket_id << dendl;
     ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->user=" << e->user << dendl;
     ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->obj_key=" << e->obj_key << dendl;
     l.unlock();
-    if (!e->key.empty() && (e->dirty == true) && (std::difftime(time(NULL), e->creationTime) > interval)) { //if block is dirty and written more than interval seconds ago
+    if (!e->key.empty() && (std::difftime(time(NULL), e->creationTime) > interval)) { //if block is dirty and written more than interval seconds ago
       rgw_user c_rgw_user = e->user;
       //writing data to the backend
       //we need to create an atomic_writer
@@ -605,8 +605,7 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
       } while(fst < lst);
 
       cacheDriver->rename(dpp, head_oid_in_cache, new_head_oid_in_cache, null_yield);
-      //data is clean now, updating in-memory metadata for an object
-      e->dirty = false;
+
       //invoke update() with dirty flag set to false, to update in-memory metadata for head
       this->update(dpp, new_head_oid_in_cache, 0, 0, e->version, false, y);
 
@@ -642,6 +641,63 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
             ldpp_dout(dpp, 20) << __func__ << "updating dirty flag in block directory for instance block failed!" << dendl;
         }
       }
+
+      //the next steps remove the entry from the ordered set and if needed the latest hash entry also in case of versioned buckets
+      if (!c_obj->have_instance()) {
+        ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << std::setprecision(std::numeric_limits<double>::max_digits10) << e->creationTime << " from ordered set" << dendl;
+        rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{
+          .objName = c_obj->get_name(),
+          .bucketName = c_obj->get_bucket()->get_bucket_id(),
+        };
+        ret = objDir->zremrangebyscore(dpp, &dir_obj, e->creationTime, e->creationTime, y, true);
+        if (ret < 0) {
+          ldpp_dout(dpp, 0) << __func__ << "(): Failed to remove object from ordered set with error: " << ret << dendl;
+        }
+      } else {
+        rgw::d4n::CacheBlock latest_block = block;
+        latest_block.cacheObj.objName = c_obj->get_name();
+        //add watch on latest entry, as it can be modified by a put or a del
+        ret = blockDir->watch(dpp, &latest_block, y);
+        if (ret < 0) {
+          ldpp_dout(dpp, 0) << __func__ << "(): Failed to add a watch on: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl;
+        }
+        int retry = 3;
+        while(retry) {
+          retry--;
+          //get latest entry
+          ret = blockDir->get(dpp, &latest_block, y);
+          if (ret < 0) {
+            ldpp_dout(dpp, 0) << __func__ << "(): Failed to get latest entry in block directory for: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl;
+          }
+          //start redis transaction using MULTI
+          blockDir->multi(dpp, y);
+          if (latest_block.version == e->version) {
+            //remove object entry from ordered set
+            if (c_obj->have_instance()) {
+              blockDir->del(dpp, &latest_block, y, true);
+              if (ret < 0) {
+                ldpp_dout(dpp, 0) << __func__ << "(): Failed to queue del for latest hash entry: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl;
+              }
+            }
+          }
+          ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << std::setprecision(std::numeric_limits<double>::max_digits10) << e->creationTime << " from ordered set" << dendl;
+          rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{
+            .objName = c_obj->get_name(),
+            .bucketName = c_obj->get_bucket()->get_bucket_id(),
+          };
+          ret = objDir->zremrangebyscore(dpp, &dir_obj, e->creationTime, e->creationTime, y, true);
+          if (ret < 0) {
+            ldpp_dout(dpp, 0) << __func__ << "(): Failed to remove object from ordered set with error: " << ret << dendl;
+          }
+          std::vector<std::string> responses;
+          ret = blockDir->exec(dpp, responses, y);
+          if (responses.empty()) {
+            ldpp_dout(dpp, 0) << __func__ << "(): Execute responses are empty hence continuing!" << dendl;
+            continue;
+          }
+          break;
+        }//end-while (retry)
+      }
       //remove entry from map and queue, erase_dirty_object locks correctly
       erase_dirty_object(dpp, e->key, null_yield);
     } else { //end-if std::difftime(time(NULL), e->creationTime) > interval
@@ -689,11 +745,11 @@ void LRUPolicy::update(const DoutPrefixProvider* dpp, const std::string& key, ui
   entries_map.emplace(key, e);
 }
 
-void LRUPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size, time_t creationTime, const rgw_user& user, std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
+void LRUPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
 const rgw_obj_key& obj_key, optional_yield y)
 {
   const std::lock_guard l(lru_lock);
-  ObjEntry* e = new ObjEntry(key, version, dirty, size, creationTime, user, etag, bucket_name, bucket_id, obj_key);
+  ObjEntry* e = new ObjEntry(key, version, deleteMarker, size, creationTime, user, etag, bucket_name, bucket_id, obj_key);
   o_entries_map.emplace(key, e);
   return;
 }
index 8699e39a5a01591fd64c964dc2f0408319710167..e4c826aafefc569ab9412b5648699e9e2301e9d9 100644 (file)
@@ -39,7 +39,7 @@ class CachePolicy {
     struct ObjEntry {
       std::string key;
       std::string version;
-      bool dirty;
+      bool delete_marker;
       uint64_t size;
       time_t creationTime;
       rgw_user user;
@@ -48,9 +48,9 @@ class CachePolicy {
       std::string bucket_id;
       rgw_obj_key obj_key;
       ObjEntry() = default;
-      ObjEntry(const std::string& key, const std::string& version, bool dirty, uint64_t size, 
+      ObjEntry(const std::string& key, const std::string& version, bool delete_marker, uint64_t size,
                time_t creationTime, rgw_user user, const std::string& etag, 
-               const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key) : key(key), version(version), dirty(dirty), size(size), 
+               const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key) : key(key), version(version), delete_marker(delete_marker), size(size),
                                                                              creationTime(creationTime), user(user), etag(etag), 
                                                                              bucket_name(bucket_name), bucket_id(bucket_id), obj_key(obj_key) {}
     };
@@ -63,7 +63,7 @@ class CachePolicy {
     virtual int exist_key(std::string key) = 0;
     virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) = 0;
     virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val=empty) = 0;
-    virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size, 
+    virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, 
                            time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
                            const rgw_obj_key& obj_key, optional_yield y) = 0;
     virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0;
@@ -112,9 +112,9 @@ class LFUDAPolicy : public CachePolicy {
       using handle_type = boost::heap::fibonacci_heap<LFUDAObjEntry*, boost::heap::compare<ObjectComparator<LFUDAObjEntry>>>::handle_type;
       handle_type handle;
 
-      LFUDAObjEntry(const std::string& key, const std::string& version, bool dirty, uint64_t size,
-                     time_t creationTime, rgw_user user, const std::string& etag,
-                     const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key) : ObjEntry(key, version, dirty, size,
+      LFUDAObjEntry(const std::string& key, const std::string& version, bool deleteMarker, uint64_t size,
+                     time_t creationTime, rgw_user user, const std::string& etag, 
+                     const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key) : ObjEntry(key, version, deleteMarker, size,
                                                                            creationTime, user, etag, bucket_name, bucket_id, obj_key) {}
 
       void set_handle(handle_type handle_) { handle = handle_; }
@@ -182,7 +182,7 @@ class LFUDAPolicy : public CachePolicy {
     virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val=empty) override;
     virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
     void save_y(optional_yield y) { this->y = y; }
-    virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size, 
+    virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, 
                            time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
                            const rgw_obj_key& obj_key, optional_yield y) override;
     virtual bool erase_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y);
@@ -215,7 +215,7 @@ class LRUPolicy : public CachePolicy {
     virtual int exist_key(std::string key) override;
     virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override;
     virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val=empty) override;
-    virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size, 
+    virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, 
                            time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
                            const rgw_obj_key& obj_key, optional_yield y) override;
     virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
index 2cb25ec6423c25beede3e19d306cc70801f06c8f..6626c4690e326e6dc803be2346c111e9c48ed146 100644 (file)
@@ -18,6 +18,8 @@
 
 namespace rgw { namespace sal {
 
+static constexpr uint8_t OBJ_INSTANCE_LEN = 32;
+
 static inline Bucket* nextBucket(Bucket* t)
 {
   if (!t)
@@ -233,7 +235,6 @@ int D4NFilterObject::copy_object(const ACLOwner& owner,
       if (dest_object->get_bucket()->versioned() && !dest_object->get_bucket()->versioning_enabled()) { //if versioning is suspended
         dest_version = "null";
       } else {
-        constexpr uint32_t OBJ_INSTANCE_LEN = 32;
         char buf[OBJ_INSTANCE_LEN + 1];
         gen_rand_alphanumeric_no_underscore(dpp->get_cct(), buf, OBJ_INSTANCE_LEN);
         dest_version = buf; //version for non-versioned objects, using gen_rand_alphanumeric_no_underscore for the time being
@@ -305,13 +306,13 @@ int D4NFilterObject::copy_object(const ACLOwner& owner,
       bufferlist bl;
       driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, bl.length(), dest_version, dirty, y);
       d4n_dest_object->set_object_version(dest_version);
-      ret = d4n_dest_object->set_head_obj_dir_entry(dpp, y, true, dirty);
+      ret = d4n_dest_object->set_head_obj_dir_entry(dpp, nullptr, y, true, dirty);
       if (ret < 0) {
         ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
         return ret;
       }
       if (dirty) {
-        driver->get_policy_driver()->get_cache_policy()->update_dirty_object(dpp, key, dest_version, true, this->get_size(), creationTime, std::get<rgw_user>(dest_object->get_bucket()->get_owner()), *etag, dest_object->get_bucket()->get_name(), dest_object->get_bucket()->get_bucket_id(), dest_object->get_key(), y);
+        driver->get_policy_driver()->get_cache_policy()->update_dirty_object(dpp, key, dest_version, false, this->get_size(), creationTime, std::get<rgw_user>(dest_object->get_bucket()->get_owner()), *etag, dest_object->get_bucket()->get_name(), dest_object->get_bucket()->get_bucket_id(), dest_object->get_key(), y);
       }
     }
   }
@@ -519,16 +520,83 @@ int D4NFilterObject::calculate_version(const DoutPrefixProvider* dpp, optional_y
   return 0;
 }
 
-/* This method maintains adds the following entries:
+/* This method creates a delete marker for dirty objects:
+1. creates a head block entry in cache driver - so that data can be restored from this when rgw goes down
+2. calls set_head_obj_dir_entry to set block entries for a delete marker */
+int D4NFilterObject::create_delete_marker(const DoutPrefixProvider* dpp, optional_yield y)
+{
+  this->delete_marker = true;
+  if (this->get_bucket()->versioned() && !this->get_bucket()->versioning_enabled()) { //if versioning is suspended
+    this->version = "null";
+    this->set_instance("null");
+  } else {
+    char buf[OBJ_INSTANCE_LEN + 1];
+    gen_rand_alphanumeric_no_underscore(dpp->get_cct(), buf, OBJ_INSTANCE_LEN);
+    this->version = buf; // using gen_rand_alphanumeric_no_underscore for the time being
+    this->set_instance(version);
+    ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): generating delete marker: " << version << dendl;
+  }
+
+  auto m_time = real_clock::now();
+
+  this->set_mtime(m_time);
+  this->set_accounted_size(0); //setting 0 as this is a delete marker
+  this->set_obj_size(0); // setting 0 as this is a delete marker
+  ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " size is: " << this->get_size() << dendl;
+  rgw::sal::Attrs attrs;
+  this->set_attrs_from_obj_state(dpp, y, attrs);
+  bufferlist bl_val;
+  bl_val.append(std::to_string(this->delete_marker));
+  attrs[RGW_CACHE_ATTR_DELETE_MARKER] = std::move(bl_val);
+  std::string key = get_cache_block_prefix(this, this->version, false);
+  std::string oid_in_cache = DIRTY_BLOCK_PREFIX + key;
+
+  bufferlist bl;
+  ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): key is: " << key << dendl;
+  auto ret = driver->get_policy_driver()->get_cache_policy()->eviction(dpp, attrs.size(), y);
+  if (ret == 0) {
+    ret = driver->get_cache_driver()->put(dpp, oid_in_cache, bl, 0, attrs, y);
+    if (ret == 0) {
+      ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): version stored in update method is: " << version << dendl;
+      driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, bl.length(), version, true, y);
+      std::vector<std::string> exec_responses;
+      ret = this->set_head_obj_dir_entry(dpp, &exec_responses , y, true, true);
+      if (exec_responses.empty()) {
+        ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Exec respones are empty, error occured!" << dendl;
+        driver->get_policy_driver()->get_cache_policy()->erase(dpp, key, y);
+        driver->get_cache_driver()->delete_data(dpp, oid_in_cache, y);
+        return -ERR_INTERNAL_ERROR;
+      }
+      if (ret < 0) {
+        ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object, ret=" << ret << dendl;
+        return ret;
+      }
+      auto creationTime = ceph::real_clock::to_time_t(this->get_mtime());
+      ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): key=" << key << dendl;
+      std::string objEtag = "";
+      driver->get_policy_driver()->get_cache_policy()->update_dirty_object(dpp, key, version, true, this->get_accounted_size(), creationTime, std::get<rgw_user>(this->get_bucket()->get_owner()), objEtag, this->get_bucket()->get_name(), this->get_bucket()->get_bucket_id(), this->get_key(), y);
+    } else { //if get_cache_driver()->put()
+      ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): put failed for oid_in_cache, ret=" << ret << " oid_in_cache: " << oid_in_cache << dendl;
+      return ret;
+    }
+  } else {
+    ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): eviction failed for oid_in_cache, ret=" << ret << dendl;
+    return ret;
+  }
+
+  return 0;
+}
+
+/*This method maintains adds the following entries:
 1. A hash entry that maintains the latest version for dirty objects (versioned and non-versioned) and non-versioned clean objects.
 2. A "null" hash entry that maintains the same version as the latest hash entry - this is used when get/delete requests are received
- for "null" versions, when bucket is non-versioned.
-3. The "null" hash entry is overwritten when we have a "null" instance when bucket versioning is suspended
-4. A versioned hash entry for every version for a version enabled bucket - this helps in get/delete requests with version-id specified */
-int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optional_yield y, bool is_latest_version, bool dirty)
+for "null" versions, when bucket is non-versioned.
+3. The "null" hash entry is overwritten when we have a "null" instance when bucket versioning is suspended.
+4. A versioned hash entry for every version for a version enabled bucket - this helps in get/delete requests with version-id specified
+5. Redis ordered set to maintain the order of dirty objects added for a version enabled bucket. Even when the bucket is non-versioned, this set maintains a "null" entry */
+int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, std::vector<std::string>* exec_responses, optional_yield y, bool is_latest_version, bool dirty)
 {
   ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): object name: " << this->get_name() << " bucket name: " << this->get_bucket()->get_name() << dendl;
-  int ret = -1;
   rgw::d4n::CacheBlock block; 
   rgw::d4n::BlockDirectory* blockDir = this->driver->get_block_dir();
   if (is_latest_version) {
@@ -541,6 +609,7 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optio
     rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
       .objName = objName,
       .bucketName = this->get_bucket()->get_bucket_id(),
+      .creationTime = std::to_string(ceph::real_clock::to_time_t(this->get_mtime())),
       .dirty = dirty,
       .hostsList = { dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address },
       };
@@ -549,59 +618,138 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optio
     block.blockID = 0;
     block.version = this->get_object_version();
     block.size = 0;
+    block.deleteMarker = this->delete_marker;
 
-    rgw::d4n::CacheBlock latest = block;
-    ret = blockDir->get(dpp, &latest, y);
-    if (ret == -ENOENT) {
-      /* adding an entry to maintain latest version, to serve simple get requests (without any version)
-         but not for a clean object that belongs to a versioned bucket, as we will get the latest version from backend store
-         to simplify delete object (maintaining correct order of versions) */
-      if (dirty || (!dirty && !(this->get_bucket()->versioned()))) {
-        ret = blockDir->set(dpp, &block, y);
-        if (ret < 0) {
-          ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
-          return ret;
-        }
-        /* bucket is non versioned, set a null instance
-           even when the bucket is non versioned, a get with "null" version-id returns the latest version, similarly
-           delete-obj with "null" as version-id deletes the latest version */
+    /* adding an entry to maintain latest version, to serve simple get requests (without any version)
+       but not for a clean object that belongs to a versioned bucket, as we will get the latest version from backend store
+       to simplify delete object (maintaining correct order of versions) */
+
+    //dirty objects
+    if (dirty) {
+    //start redis transaction using MULTI, to ensure that both latest and null block are added at the same time.
+      blockDir->multi(dpp, y);
+      auto ret = blockDir->set(dpp, &block, y);
+      if (ret < 0) {
+        ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
+        blockDir->discard(dpp, y);
+        return ret;
+      }
+      /* bucket is non versioned, set a null instance
+         even when the bucket is non versioned, a get with "null" version-id returns the latest version, similarly
+         delete-obj with "null" as version-id deletes the latest version */
+      if (!(this->get_bucket()->versioned())) {
         block.cacheObj.objName = "_:null_" + this->get_name();
         ret = blockDir->set(dpp, &block, y);
         if (ret < 0) {
           ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for null head object with ret: " << ret << dendl;
+          blockDir->discard(dpp, y);
           return ret;
         }
       }
-    } else if (ret < 0) {
-      ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory get method failed for head object with ret: " << ret << dendl;
-    } else { //head block is found
+      std::string object_version;
+      //add an entry to ordered set for both versioned and non versioned bucket
+      if (!this->get_bucket()->versioned() || !this->get_bucket()->versioning_enabled()) {
+        object_version = "null";
+      } else {
+        object_version = this->get_object_version();
+      }
+      auto mtime = this->get_mtime();
+      auto score = ceph::real_clock::to_time_t(mtime);
+      ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Score of object name: "<< this->get_name() << " version: " << object_version << " is: "  << std::setprecision(std::numeric_limits<double>::max_digits10) << score << ret << dendl;
+      rgw::d4n::ObjectDirectory* objDir = this->driver->get_obj_dir();
+      ret = objDir->zadd(dpp, &object, score, object_version, y, true);
+      if (ret < 0) {
+        ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Failed to add object to ordered set with error: " << ret << dendl;
+        blockDir->discard(dpp, y);
+        return ret;
+      }
+      //execute redis transaction using EXEC
+      std::vector<std::string> responses;
+      ret = blockDir->exec(dpp, responses, y);
+      if (ret < 0) {
+        ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory execute method failed for latest and null head object with ret: " << ret << dendl;
+        return ret;
+      }
+      if (exec_responses) {
+        *exec_responses = responses;
+      }
+    } else { //for clean/non-dirty objects
+      rgw::d4n::CacheBlock latest = block;
+      auto ret = blockDir->get(dpp, &latest, y);
+      if (ret == -ENOENT) {
+        if (!(this->get_bucket()->versioned())) {
+          //start redis transaction using MULTI, to ensure that both latest and null block are added at the same time.
+          blockDir->multi(dpp, y);
+          //we can explore pipelining to send the two 'HSET' commands together
+          ret = blockDir->set(dpp, &block, y);
+          if (ret < 0) {
+      ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
+            blockDir->discard(dpp, y);
+            return ret;
+          }
+          //bucket is non versioned, set a null instance
+          block.cacheObj.objName = "_:null_" + this->get_name();
+          ret = blockDir->set(dpp, &block, y);
+          if (ret < 0) {
+            ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for null head object with ret: " << ret << dendl;
+            blockDir->discard(dpp, y);
+            return ret;
+          }
+          //execute redis transaction using EXEC
+          std::vector<std::string> responses;
+          ret = blockDir->exec(dpp, responses, y);
+          if (ret < 0) {
+            ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory execute method failed for latest and null head object with ret: " << ret << dendl;
+            return ret;
+          }
+          if (exec_responses) {
+            *exec_responses = responses;
+          }
+        }
+      } else if (ret < 0) {
+        ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory get method failed for head object with ret: " << ret << dendl;
+      } else { //head block is found
         /* for clean objects belonging to versioned buckets we will fetch the latest entry from backend store, hence removing latest head entry
            once a bucket transitions to a versioned state */
-        if (!dirty && this->get_bucket()->versioned()) {
+        if (this->get_bucket()->versioned()) {
           ret = blockDir->del(dpp, &block, y);
-          if (ret < 0) {
+          //Ignore a racing delete that could have deleted the latest block
+          if (ret < 0 && ret != -ENOENT) {
             ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory del method failed for head object with ret: " << ret << dendl;
           }
         }
         /* even if the head block is found, overwrite existing values with new version in case of non-versioned bucket, clean objects
            and versioned and non-versioned buckets dirty objects */
-        if (dirty || (!dirty && !(this->get_bucket()->versioned()))) {
+        if (!(this->get_bucket()->versioned())) {
+          //start redis transaction using MULTI, to ensure that both latest and null block are added at the same time.
+          blockDir->multi(dpp, y);
           ret = blockDir->set(dpp, &block, y);
           if (ret < 0) {
             ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
+            blockDir->discard(dpp, y);
             return ret;
           }
-          /* bucket is non versioned, set a null instance
-             even when the bucket is non versioned, a get with "null" version-id returns the latest version, similarly
-             delete-obj with "null" as version-id deletes the latest version */
+          //bucket is non versioned, set a null instance
           block.cacheObj.objName = "_:null_" + this->get_name();
           ret = blockDir->set(dpp, &block, y);
           if (ret < 0) {
             ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for null head object with ret: " << ret << dendl;
+            blockDir->discard(dpp, y);
             return ret;
           }
-        }//end-if dirty || (!dirty && !(this->get_bucket()->versioned()))
-    }
+          //execute redis transaction using EXEC
+          std::vector<std::string> responses;
+          ret = blockDir->exec(dpp, responses, y);
+          if (ret < 0) {
+            ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory execute method failed for latest and null head object with ret: " << ret << dendl;
+            return ret;
+          }
+          if (exec_responses) {
+            *exec_responses = responses;
+          }
+        }//end-if !(this->get_bucket()->versioned())
+      } //end-if ret = 0
+    } //end-else
   }//end-if latest-version
 
   /* An entry corresponding to each instance will be needed to locate the head block
@@ -616,6 +764,7 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optio
     rgw::d4n::CacheObj version_object = rgw::d4n::CacheObj{
     .objName = objName,
     .bucketName = this->get_bucket()->get_bucket_id(),
+    .creationTime = std::to_string(ceph::real_clock::to_time_t(this->get_mtime())),
     .dirty = dirty,
     };
 
@@ -628,14 +777,14 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optio
       .size = 0,
     };
 
-    ret = blockDir->set(dpp, &version_block, y);
+    auto ret = blockDir->set(dpp, &version_block, y);
     if (ret < 0) {
       ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for versioned head object with ret: " << ret << dendl;
       return ret;
     }
   }//end-if get_bucket_versioned()
 
-  return ret;
+  return 0;
 }
 
 int D4NFilterObject::set_data_block_dir_entries(const DoutPrefixProvider* dpp, optional_yield y, std::string& version, bool dirty)
@@ -754,9 +903,6 @@ bool D4NFilterObject::check_head_exists_in_cache_get_oid(const DoutPrefixProvide
   //if the block corresponding to head object does not exist in directory, implies it is not cached
   if ((ret = blockDir->get(dpp, &block, y)) == 0) {
     blk = block;
-    if (block.deleteMarker) {
-      return false;
-    }
 
     std::string version;
     version = block.version;
@@ -787,6 +933,9 @@ bool D4NFilterObject::check_head_exists_in_cache_get_oid(const DoutPrefixProvide
     ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): BlockDirectory get method failed, ret=" << ret << dendl;
   }
 
+  if (block.deleteMarker) {
+    found_in_cache = false;
+  }
   return found_in_cache;
 }
 
@@ -848,7 +997,7 @@ int D4NFilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* d
     if (ret == 0) {
       ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << this->get_object_version() << dendl;
       this->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, 0, version, false, y);
-      ret = set_head_obj_dir_entry(dpp, y, is_latest_version);
+      ret = set_head_obj_dir_entry(dpp, nullptr, y, is_latest_version);
       if (ret < 0) {
         ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object, ret=" << ret << dendl;
       }
@@ -1022,7 +1171,7 @@ int D4NFilterObject::D4NFilterReadOp::prepare(optional_yield y, const DoutPrefix
       if (ret == 0) {
         ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << this->source->get_object_version() << dendl;
         source->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, bl.length(), version, false, y);
-        ret = source->set_head_obj_dir_entry(dpp, y, is_latest_version);
+        ret = source->set_head_obj_dir_entry(dpp, nullptr, y, is_latest_version);
         if (ret < 0) {
           ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object, ret=" << ret << dendl;
         }
@@ -1867,19 +2016,18 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_from_cache_and_policy(const DoutP
 int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp,
                                                    optional_yield y, uint32_t flags)
 {
-  // TODO: 
-  // 1. Send delete request to cache nodes with remote copies
-  // 2. See if we can derive dirty flag from the head block 
-  // 3. Add lock so cleaning method doesn't remove "D_" prefix
+  // TODO: Send delete request to cache nodes with remote copies
 
   rgw::sal::Attrs attrs;
   std::string head_oid_in_cache;
   rgw::d4n::CacheBlock block;
   int ret = -1;
 
-  // check_head_exists_in_cache_get_oid also returns false if the head object is in the cache, but is a delete marker.
-  // As a result, the below check guarantees the head object is not in the cache.  
+  /* check_head_exists_in_cache_get_oid also returns false if the head object is in the cache, but is a delete marker.
+     As a result, the below check guarantees the head object is not in the cache. */
   if (!source->check_head_exists_in_cache_get_oid(dpp, head_oid_in_cache, attrs, block, y) && !block.deleteMarker) {
+    /* for a dirty object, if the first call is a simple delete after versioning is enabled, the call will go to the backend store and create a dlete marker there
+       since no object with source->get_name() will be found in the cache (and this is correct) */
     ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): head object not found; calling next->delete_obj" << dendl;
     next->params = params;
     ret = next->delete_obj(dpp, y, flags);
@@ -1887,18 +2035,28 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp
     return ret;
   } else {
     bool objDirty = block.cacheObj.dirty;
-    auto blockDir = source->driver->get_block_dir(); 
+    auto blockDir = source->driver->get_block_dir();
+    auto objDir = source->driver->get_obj_dir();
     std::string policy_prefix = head_oid_in_cache;
     std::string version = source->get_object_version();
 
+    // call invalidate_object based on whether the object is dirty(objDirty)
+    //if the object is still dirty and has been marked invalid, then let objDirty be true, else set it to false
+    //remove code that deletes head/data block in this method.
+
+    std::string objName = source->get_name();
+    // special handling for name starting with '_'
+    if (objName[0] == '_') {
+      objName = "_" + source->get_name();
+    }
+
     if (objDirty) { // head object dirty flag represents object dirty flag
       policy_prefix.erase(0, 2); // remove "D_" prefix from policy key since the policy keys do not hold this information
     }    
-
-    // Versioned buckets - this will delete the head object indexed by version-id (even null)
+    // Versioned buckets - this will delete the head object indexed by version-id (even null) and latest en
     if (source->get_bucket()->versioned()) {
-        //1. clean objects - no latest head entry as latest entry to be retrieved from backend now
-        // hence delete only versioned head object
+        /1. clean objects - no latest head entry as latest entry to be retrieved from backend now
+           hence delete only versioned head object */
         if (!objDirty) {
           if (source->have_instance()) {
             if ((ret = blockDir->del(dpp, &block, y)) < 0) {
@@ -1906,29 +2064,177 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp
               return ret;
             }
           }
-          // if versioning is suspended, we might have a latest head entry created from when bucket was non-versioned
-          // don't return error as that could already be deleted by set_head_obj_dir_entry
+          /* if versioning is suspended, we might have a latest head entry created from when bucket was non-versioned
+             don't return error as that could already be deleted by set_head_obj_dir_entry */
           if (!source->get_bucket()->versioning_enabled()) {
-            block.cacheObj.objName = source->get_name();
+            block.cacheObj.objName = objName;
             if ((ret = blockDir->del(dpp, &block, y)) < 0) {
               ldpp_dout(dpp, 0) << "Failed to delete head object in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl;
             }
           }
-        } else if (objDirty) { //2. dirty objects - TBD for now
-          if ((ret = blockDir->del(dpp, &block, y)) == 0) { // delete head object
-            if (objDirty) {
-        if ((ret = delete_from_cache_and_policy(dpp, head_oid_in_cache, policy_prefix, y)) < 0) {
-          return ret;
-        }
-            }
-          } else if (ret < 0 && ret != -ENOENT) {
-            ldpp_dout(dpp, 0) << "Failed to delete head object in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl;
+        } else if (objDirty) { //2. dirty objects - 1. add delete marker for simple request 2. delete version if given and correctly promote latest version if needed
+          bool transaction_success = false;
+          //add watch on latest entry, as it can be modified by a put or another del
+          rgw::d4n::CacheBlock latest_block = block;
+          latest_block.cacheObj.objName = objName;
+          ret = blockDir->watch(dpp, &latest_block, y);
+          if (ret < 0) {
+            ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to add a watch on: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl;
             return ret;
           }
-        }
+          int retry = 3;
+          while(retry) {
+            retry--;
+            //get latest entry
+            ret = blockDir->get(dpp, &latest_block, y);
+            if (ret < 0) {
+              ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to get latest entry in block directory for: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl;
+              blockDir->unwatch(dpp, y);
+              return ret;
+            }
+            //simple delete request with no version id - create a delete marker
+            if (block.cacheObj.objName == objName) {
+              /* we are checking for latest_block and not block because latest_block has the most updated value of latest hash entry
+                 if existing latest entry is already a delete marker, do not create a new one and simply return */
+              if (!latest_block.deleteMarker) {
+                ret = source->create_delete_marker(dpp, y);
+                if (ret < 0) {
+                  ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to create a delete marker for: " << block.cacheObj.objName << ", ret=" << ret << dendl;
+                  //ERR_INTERNAL_ERROR is returned when exec_responses are empty which means the watched key has been modified, hence retry
+                  if (ret == -ERR_INTERNAL_ERROR) {
+                    continue;
+                  } else {
+                    blockDir->unwatch(dpp, y);
+                    return ret;
+                  }
+                }
+                if (ret >= 0) {
+                  result.delete_marker = true;
+                  result.version_id = source->get_instance();
+                  transaction_success = true;
+                  return 0;
+                }
+              }
+              //unwatch the key (latest entry), as it is already a delete marker and we won't do anything
+              ret = blockDir->unwatch(dpp, y);
+              if (ret < 0) {
+                ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to add a watch on: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl;
+                return ret;
+              }
+              transaction_success = true;
+              return 0;
+            } else { //not a simple request, delete version requested
+              //get latest entry ret is 0
+              if (ret == 0) {
+                rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{
+                  .objName = objName,
+                  .bucketName = source->get_bucket()->get_bucket_id(),
+                };
+                bool startmulti = false;
+                //check if version to be deleted is the same as latest version
+                if (latest_block.version == block.version) {
+                  std::vector<std::string> members;
+                  //get the second latest version
+                  ret = objDir->zrevrange(dpp, &dir_obj, 0, 1, members, y);
+                  if (ret < 0) {
+                    ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to get the second latest version for: " << dir_obj.objName << ", ret=" << ret << dendl;
+                    blockDir->unwatch(dpp, y);
+                    return ret;
+                  }
+                  //if there is a second latest version
+                  if (members.size() == 2) {
+                    rgw::d4n::CacheBlock version_block = latest_block;
+                    version_block.cacheObj.objName = "_:" + members[1] + "_" + source->get_name();
+                    //add watch on the second latest versioned entry also as it might be modified by another del
+                    ret = blockDir->watch(dpp, &version_block, y);
+                    if (ret < 0) {
+                      ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to add a watch on: " << version_block.cacheObj.objName << ", ret=" << ret << dendl;
+                      blockDir->unwatch(dpp, y);
+                      return ret;
+                    }
+                    //get versioned entry
+                    ret = blockDir->get(dpp, &version_block, y);
+                    if (ret < 0) {
+                      ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to get the versioned entry for: " << version_block.cacheObj.objName << ", ret=" << ret << dendl;
+                      blockDir->unwatch(dpp, y);
+                      return 0;
+                    }
+                    //start redis transaction using MULTI
+                    blockDir->multi(dpp, y);
+                    startmulti = true;
+                    //set versioned entry as the latest entry
+                    version_block.cacheObj.objName = latest_block.cacheObj.objName;
+                    ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): INFO: promoting latest version entry to version: " << version_block.version << ", ret=" << ret << dendl;
+                    ret = blockDir->set(dpp, &version_block, y);
+                    if (ret < 0) {
+                      ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to set new latest entry for: " << version_block.cacheObj.objName << ", ret=" << ret << dendl;
+                      blockDir->discard(dpp, y);
+                      return 0;
+                    }
+                  } else { // there are no more versions left
+                    //start redis transaction using MULTI
+                    blockDir->multi(dpp, y);
+                    startmulti = true;
+                    //delete latest block entry
+                    ret = blockDir->del(dpp, &latest_block, y, true);
+                    if (ret < 0) {
+                      ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to delete latest entry in block directory, when it is the same as version requested, for: " << block.cacheObj.objName << ", ret=" << ret << dendl;
+                      blockDir->discard(dpp, y);
+                      return ret;
+                    }
+                  }
+                } //end-if latest_block.version == block.version
+                //delete versioned entry (handles delete markers also)
+                if (!startmulti) {
+                  //start redis transaction using MULTI
+                  blockDir->multi(dpp, y);
+                }
+                if ((ret = blockDir->del(dpp, &block, y, true)) == 0) {
+                  if ((ret = delete_from_cache_and_policy(dpp, head_oid_in_cache, policy_prefix, y)) < 0) {
+                    blockDir->discard(dpp, y);
+                    return ret;
+                  }
+                } else if (ret < 0 && ret != -ENOENT) {
+                  ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to delete head object in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl;
+                  blockDir->discard(dpp, y);
+                  return ret;
+                }
+                //delete entry from ordered set
+                std::string version = source->get_instance();
+                ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): Version to be deleted is: " << version << dendl;
+                ret = objDir->zrem(dpp, &dir_obj, version, y, true);
+                if (ret < 0) {
+                  ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue zrem request in object directory for: " << source->get_name() << ", ret=" << ret << dendl;
+                  blockDir->discard(dpp, y);
+                  return ret;
+                }
+                std::vector<std::string> responses;
+                ret = blockDir->exec(dpp, responses, y);
+                if (responses.empty()) {
+                  ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Execute responses are empty hence continuing!" << dendl;
+                  continue;
+                }
+                if (ret < 0) {
+                  ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to execute exec in block directory: " << "ret= " << ret << dendl;
+                  return ret;
+                }
+                result.delete_marker = block.deleteMarker;
+                result.version_id = version;
+                //success, hence break from loop
+                transaction_success = true;
+                break;
+              }
+            } //end-else (simple request)
+          } //end-while retry
+          if (!transaction_success) {
+            ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Redis transaction failed after retrying! " << dendl;
+            return -ERR_INTERNAL_ERROR;
+          }
+        } //end-if objDirty
     } //end-if versioned buckets
 
-    //Non-versioned buckets - we will delete the latest entry and the "null" entry
+    /* Non-versioned buckets - we will delete the latest entry and the "null" entry
+       dirty objects - delete "null" entry from ordered set also */
     if (!source->get_bucket()->versioned()) {
       if ((ret = blockDir->del(dpp, &block, y)) == 0) {
         if (objDirty) {
@@ -1941,7 +2247,7 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp
         return ret;
       }
       //if we get request for latest head entry, delete the null block and vice versa
-      if (block.cacheObj.objName == source->get_name()) {
+      if (block.cacheObj.objName == objName) {
         block.cacheObj.objName = "_:null_" + source->get_name();
       } else {
         block.cacheObj.objName = source->get_name();
@@ -1950,6 +2256,30 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp
         ldpp_dout(dpp, 0) << "Failed to delete head object in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl;
         return ret;
       }
+      //dirty objects - delete from ordered set
+      if (objDirty) {
+        rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{
+          .objName = source->get_name(),
+          .bucketName = source->get_bucket()->get_bucket_id(),
+        };
+        ret = objDir->zrem(dpp, &dir_obj, "null", y, true);
+        if (ret < 0) {
+          ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue zrem request in object directory for: " << source->get_name() << ", ret=" << ret << dendl;
+          blockDir->discard(dpp, y);
+          return ret;
+        }
+      }
+      std::vector<std::string> responses;
+      ret = blockDir->exec(dpp, responses, y);
+      if (ret < 0) {
+        ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to execute exec in block directory: " << "ret= " << ret << dendl;
+        return ret;
+      }
+      if (objDirty) {
+        if ((ret = delete_from_cache_and_policy(dpp, head_oid_in_cache, policy_prefix, y)) < 0) {
+          return ret;
+        }
+      }
     } //end-if non-versioned buckets
 
     int size;
@@ -1968,41 +2298,41 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp
     }
     ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): Size of object is: " << size << dendl;
 
-    // delete data blocks, when,
-    // 1. object is clean, bucket is versioned and there is an instance in the request
-    // 2. object is clean, bucket is non-versioned
-    // 3. object is dirty - TBD
+    /* delete data blocks directory entries, when,
+       1. object is clean, bucket is versioned and there is an instance in the request
+       2. object is clean, bucket is non-versioned
+       3. object is dirty - delete blocks in cache also except for delete markers */
     if ((!objDirty && source->get_bucket()->versioned() && source->have_instance()) ||
         (!objDirty && !source->get_bucket()->versioned()) ||
-        objDirty) {
-      off_t lst = size;
-      off_t fst = 0;
+        (objDirty && !block.deleteMarker)) {
+        off_t lst = size;
+        off_t fst = 0;
 
-      do { // loop through the data blocks
-        std::string prefix = get_cache_block_prefix(source, version, false);
-        if (fst >= lst) {
-          break;
-        }
-        //data blocks have cacheObj.objName set to oid always
-        block.cacheObj.objName = source->get_oid();
-        off_t cur_size = std::min<off_t>(fst + dpp->get_cct()->_conf->rgw_max_chunk_size, lst);
-        off_t cur_len = cur_size - fst;
-        block.blockID = static_cast<uint64_t>(fst);
-        block.size = static_cast<uint64_t>(cur_len);
-
-        if ((ret = blockDir->get(dpp, &block, y)) < 0) {
-          if (ret == -ENOENT) {
-            ldpp_dout(dpp, 0) << "Directory entry for: " << source->get_name() << " blockid: " << fst << " block size: " << cur_len << " does not exist; continuing" << dendl;
-            fst += cur_len;
-            if (fst >= lst) {
-              break;
+        do { // loop through the data blocks
+          std::string prefix = get_cache_block_prefix(source, version, false);
+          if (fst >= lst) {
+            break;
+          }
+          //data blocks have cacheObj.objName set to oid always
+          block.cacheObj.objName = source->get_oid();
+          off_t cur_size = std::min<off_t>(fst + dpp->get_cct()->_conf->rgw_max_chunk_size, lst);
+          off_t cur_len = cur_size - fst;
+          block.blockID = static_cast<uint64_t>(fst);
+          block.size = static_cast<uint64_t>(cur_len);
+
+          if ((ret = blockDir->get(dpp, &block, y)) < 0) {
+            if (ret == -ENOENT) {
+              ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Directory entry for: " << source->get_oid() << " blockid: " << fst << " block size: " << cur_len << " does not exist; continuing" << dendl;
+              fst += cur_len;
+              if (fst >= lst) {
+                break;
+              }
+              continue;
+            } else {
+              ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Failed to retrieve directory entry for: " << source->get_oid() << " blockid: " << fst << " block size: " << cur_len << ", ret=" << ret << dendl;
+              return ret;
             }
-            continue;
-          } else {
-            ldpp_dout(dpp, 10) << "Failed to retrieve directory entry for: " << source->get_name() << " blockid: " << fst << " block size: " << cur_len << ", ret=" << ret << dendl;
-            return ret;
           }
-        }
 
         if ((ret = blockDir->del(dpp, &block, y)) == 0) {
           prefix = DIRTY_BLOCK_PREFIX + prefix;
@@ -2052,14 +2382,19 @@ int D4NFilterWriter::prepare(optional_yield y)
     ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): calling next->prepare" << dendl;
     return next->prepare(y);
   } else {
-    //for non-versioned buckets, we need to delete the older dirty blocks of the object from the cache as dirty blocks do not get evicted
+    //for non-versioned buckets or version suspended buckets, we need to delete the older dirty blocks of the object from the cache as dirty blocks do not get evicted
     //alternatively, we could add logic to delete this lazily
-    if (!object->get_bucket()->versioned()) {
+    if (!object->get_bucket()->versioned() || (object->get_bucket()->versioned() && !object->get_bucket()->versioning_enabled())) {
       std::unique_ptr<rgw::sal::Object::DeleteOp> del_op = object->get_delete_op();
+      if (object->get_bucket()->versioned() && !object->get_bucket()->versioning_enabled()) {
+        del_op->params.null_verid = true;
+        object->set_instance("null");
+      }
       auto ret = del_op->delete_obj(dpp, y, rgw::sal::FLAG_LOG_OP);
       if (ret < 0) {
         ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): delete_obj failed, ret=" << ret << dendl;
       }
+      object->clear_instance();
     }
   }
 
@@ -2068,7 +2403,6 @@ int D4NFilterWriter::prepare(optional_yield y)
     if (object->get_bucket()->versioned() && !object->get_bucket()->versioning_enabled()) { //if versioning is suspended
       object->set_instance("null");
     }
-    constexpr uint32_t OBJ_INSTANCE_LEN = 32;
     char buf[OBJ_INSTANCE_LEN + 1];
     gen_rand_alphanumeric_no_underscore(dpp->get_cct(), buf, OBJ_INSTANCE_LEN);
     version = buf; // using gen_rand_alphanumeric_no_underscore for the time being
@@ -2205,7 +2539,6 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag,
 
     dirty = true;
     ceph::real_time m_time;
-    dirty = true;
     if (mtime) {
       if (real_clock::is_zero(*mtime)) {
         *mtime = real_clock::now();
@@ -2266,15 +2599,16 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag,
     if (ret == 0) {
       ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << "(): version stored in update method is: " << version << dendl;
       driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, bl.length(), version, dirty, y);
-      ret = object->set_head_obj_dir_entry(dpp, y, true, dirty);
+      ret = object->set_head_obj_dir_entry(dpp, nullptr, y, true, dirty);
       if (ret < 0) {
         ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory set method failed for head object, ret=" << ret << dendl;
         return ret;
       }
       if (dirty) {
         auto creationTime = ceph::real_clock::to_time_t(object->get_mtime());
-        ldpp_dout(dpp, 16) << "D4NFilterWriter::" << __func__ << "(): key=" << key << dendl;
-        driver->get_policy_driver()->get_cache_policy()->update_dirty_object(dpp, key, version, dirty, accounted_size, creationTime, std::get<rgw_user>(obj->get_bucket()->get_owner()), objEtag, obj->get_bucket()->get_name(), obj->get_bucket()->get_bucket_id(), obj->get_key(), y);
+        ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << "(): key=" << key << dendl;
+        ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << "(): obj->get_key()=" << obj->get_key() << dendl;
+        driver->get_policy_driver()->get_cache_policy()->update_dirty_object(dpp, key, version, false, accounted_size, creationTime, std::get<rgw_user>(obj->get_bucket()->get_owner()), objEtag, obj->get_bucket()->get_name(), obj->get_bucket()->get_bucket_id(), obj->get_key(), y);
       }
     } else { //if get_cache_driver()->put()
       ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): put failed for head_oid_in_cache, ret=" << ret << dendl;
@@ -2331,7 +2665,7 @@ int D4NFilterMultipartUpload::complete(const DoutPrefixProvider *dpp,
     if (ret == 0) {
       ldpp_dout(dpp, 20) << "D4NFilterMultipartUpload::" << __func__ << " version stored in update method is: " << d4n_target_obj->get_object_version() << dendl;
       driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, bl.length(), version, false, y);
-      ret = d4n_target_obj->set_head_obj_dir_entry(dpp, y, true);
+      ret = d4n_target_obj->set_head_obj_dir_entry(dpp, nullptr, y, true);
       if (ret < 0) {
         ldpp_dout(dpp, 0) << "D4NFilterMultipartUpload::" << __func__ << "(): BlockDirectory set method failed for head object, ret=" << ret << dendl;
       }
index a135aef3151b2bcc78b1a27e2981f1c938656554..85be4675cf4f283b978c9e2dc07cfa85a93db790 100644 (file)
@@ -131,6 +131,7 @@ class D4NFilterObject : public FilterObject {
     rgw::sal::Object* dest_object{nullptr}; //for copy-object
     rgw::sal::Bucket* dest_bucket{nullptr}; //for copy-object
     bool multipart{false};
+    bool delete_marker{false};
 
   public:
     struct D4NFilterReadOp : FilterReadOp {
@@ -265,7 +266,7 @@ class D4NFilterObject : public FilterObject {
     int get_obj_attrs_from_cache(const DoutPrefixProvider* dpp, optional_yield y);
     void set_attrs_from_obj_state(const DoutPrefixProvider* dpp, optional_yield y, rgw::sal::Attrs& attrs);
     int calculate_version(const DoutPrefixProvider* dpp, optional_yield y, std::string& version, rgw::sal::Attrs& attrs);
-    int set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optional_yield y, bool is_latest_version = true, bool dirty = false);
+    int set_head_obj_dir_entry(const DoutPrefixProvider* dpp, std::vector<std::string>* exec_responses, optional_yield y, bool is_latest_version = true, bool dirty = false);
     int set_data_block_dir_entries(const DoutPrefixProvider* dpp, optional_yield y, std::string& version, bool dirty = false);
     int delete_data_block_cache_entries(const DoutPrefixProvider* dpp, optional_yield y, std::string& version, bool dirty = false);
     bool check_head_exists_in_cache_get_oid(const DoutPrefixProvider* dpp, std::string& head_oid_in_cache, rgw::sal::Attrs& attrs, rgw::d4n::CacheBlock& blk, optional_yield y);
@@ -273,6 +274,8 @@ class D4NFilterObject : public FilterObject {
     rgw::sal::Object* get_destination_object(const DoutPrefixProvider* dpp) { return dest_object; }
     bool is_multipart() { return multipart; }
     int set_attr_crypt_parts(const DoutPrefixProvider* dpp, optional_yield y, rgw::sal::Attrs& attrs);
+    int create_delete_marker(const DoutPrefixProvider* dpp, optional_yield y);
+    bool is_delete_marker() { return delete_marker; }
 };
 
 class D4NFilterWriter : public FilterWriter {
index c3055be71a86a02e162897996fd97b8ab48e491a..6d416bc20d9bb5256d9c9035b0e765357826a81e 100644 (file)
@@ -13,6 +13,7 @@ constexpr char RGW_CACHE_ATTR_BUCKET_NAME[] = "user.rgw.bucket_name";
 constexpr char RGW_CACHE_ATTR_VERSION_ID[] = "user.rgw.version_id";
 constexpr char RGW_CACHE_ATTR_SOURC_ZONE[] = "user.rgw.source_zone";
 constexpr char RGW_CACHE_ATTR_LOCAL_WEIGHT[] = "user.rgw.localWeight";
+constexpr char RGW_CACHE_ATTR_DELETE_MARKER[] = "user.rgw.deleteMarker";
 
 constexpr char DIRTY_BLOCK_PREFIX[] = "D#";
 constexpr char CACHE_DELIM = '#';
index 01b913307eb4b25aae9cdc5b18c23bc64523b158..095925091dd9747148a6a09c528a3cd3df71fc26 100644 (file)
@@ -287,6 +287,7 @@ int SSDDriver::restore_blocks_objects(const DoutPrefixProvider* dpp, ObjectDataC
                                             time_t creationTime = time_t(nullptr);
                                             rgw_user user;
                                             rgw_obj_key obj_key;
+                                            bool deleteMarker = false;
                                             if (attrs.find(RGW_ATTR_ETAG) != attrs.end()) {
                                                 etag = attrs[RGW_ATTR_ETAG].to_str();
                                                 ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): etag: " << etag << dendl;
@@ -333,8 +334,14 @@ int SSDDriver::restore_blocks_objects(const DoutPrefixProvider* dpp, ObjectDataC
                                                 ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): localWeightStr: " << localWeightStr << dendl;
                                             }
 
+                                            if (attrs.find(RGW_CACHE_ATTR_DELETE_MARKER) != attrs.end()) {
+                                                std::string deleteMarkerStr = attrs[RGW_CACHE_ATTR_LOCAL_WEIGHT].to_str();
+                                                deleteMarker = (deleteMarkerStr == "1") ? true : false;
+                                                ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): deleteMarker: " << deleteMarker << dendl;
+                                            }
+
                                             ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): calling func for: " << key << dendl;
-                                            obj_func(dpp, key, version, dirty, size, creationTime, user, etag, bucket_name, bucket_id, obj_key, null_yield);
+                                            obj_func(dpp, key, version, deleteMarker, size, creationTime, user, etag, bucket_name, bucket_id, obj_key, null_yield);
                                             block_func(dpp, key, offset, len, version, dirty, null_yield, localWeightStr);
                                             parsed = true;
                                         } //end-if part.size() == 2
index 310071db5bf0beb7cdf1bb9a6fdb9a614ff606b2..c75c0f0610b94386f4c02d1227caf7342a7b57d6 100644 (file)
@@ -707,12 +707,20 @@ TEST_F(BlockDirectoryFixture, MultiExecuteYield)
         ASSERT_EQ((bool)ec, false);
         std::cout << "SET value: " << std::get<0>(resp).value() << std::endl;
       }
+      {
+        request req;
+        response<std::string> resp;
+        req.push("ZADD", "key4", "1", "v1");                  // Command 3
+        conn->async_exec(req, resp, yield[ec]);
+        ASSERT_EQ((bool)ec, false);
+        std::cout << "ZADD value: " << std::get<0>(resp).value() << std::endl;
+      }
       {
         request req;
         //string as response here as the command is only getting queued, not executed
         //if response type is changed to int then the operation fails
         response<std::string> resp;
-        req.push("DEL", "key3");                  // Command 3
+        req.push("DEL", "key3");                  // Command 4
         conn->async_exec(req, resp, yield[ec]);
         ASSERT_EQ((bool)ec, false);
         std::cout << "DEL value: " << std::get<0>(resp).value() << std::endl;
@@ -738,6 +746,8 @@ TEST_F(BlockDirectoryFixture, MultiExecuteYield)
       ASSERT_EQ(0, dir->set(env->dpp, block, yield));
       block->cacheObj.objName = "testBlockA";
       ASSERT_EQ(0, dir->del(env->dpp, block, yield, true));
+      block->cacheObj.objName = "testBlockB";
+      ASSERT_EQ(0, dir->zadd(env->dpp, block, 100, "version1", yield, true));
       std::vector<std::string> responses;
       ASSERT_EQ(0, dir->exec(env->dpp, responses, optional_yield{yield}));
       for (auto r : responses) {