git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/d4n: using 'id_tag' as 'version' of an object for non-versioned
objects. Using 'instance' of 'oid' as 'version' for
versioned objects.
author: Pritha Srivastava <prsrivas@redhat.com>
        Wed, 22 Nov 2023 07:50:34 +0000 (13:20 +0530)
committer: Pritha Srivastava <prsrivas@redhat.com>
        Tue, 2 Apr 2024 15:54:51 +0000 (21:24 +0530)

Signed-off-by: Pritha Srivastava <prsrivas@redhat.com>
src/rgw/driver/d4n/rgw_sal_d4n.cc
src/rgw/driver/d4n/rgw_sal_d4n.h

index 622d91dac02518a6fa76d34ff8ed1f23dad105f2..edb5efcca68b9921a61611f415807d233fd29163 100644 (file)
@@ -421,6 +421,21 @@ int D4NFilterObject::D4NFilterReadOp::prepare(optional_yield y, const DoutPrefix
   }
   }
 
+  //versioned objects have instance set to versionId, and get_oid() returns oid containing instance, hence using id tag as version for non versioned objects only
+  if (! this->source->have_instance()) {
+    RGWObjState* state = nullptr;
+    if (this->source->get_obj_state(dpp, &state, y) == 0) {
+      auto it = state->attrset.find(RGW_ATTR_ID_TAG);
+      if (it != state->attrset.end()) {
+        bufferlist bl = it->second;
+        this->source->set_object_version(bl.c_str());
+        ldpp_dout(dpp, 20) << __func__ << "id tag version is: " << this->source->get_object_version() << dendl;
+      } else {
+        ldpp_dout(dpp, 20) << __func__ << "Failed to find id tag" << dendl;
+      }
+    }
+  }
+
   return ret;
 }
 
@@ -478,12 +493,20 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
                         RGWGetDataCB* cb, optional_yield y) 
 {
   const uint64_t window_size = g_conf()->rgw_get_obj_window_size;
-  std::string oid = source->get_key().get_oid();
+  std::string version = source->get_object_version();
+  std::string prefix;
+  if (version.empty()) { //for versioned objects, get_oid() returns an oid with versionId added
+    prefix = source->get_bucket()->get_name() + "_" + source->get_key().get_oid();
+  } else {
+    prefix = source->get_bucket()->get_name() + "_" + version + "_" + source->get_key().get_oid();
+  }
 
-  ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "oid: " << oid << " ofs: " << ofs << " end: " << end << dendl;
+  ldpp_dout(dpp, 20) << "D3NFilterObject::iterate:: " << "prefix: " << prefix << dendl;
+  ldpp_dout(dpp, 20) << "D3NFilterObject::iterate:: " << "oid: " << source->get_key().get_oid() << " ofs: " << ofs << " end: " << end << dendl;
 
   this->client_cb = cb;
-  this->cb->set_client_cb(cb, dpp, &y); // what's this for? -Sam
+  this->cb->set_client_cb(cb, dpp, &y);
+  this->cb->set_prefix(prefix);
 
   /* This algorithm stores chunks for ranged requests also in the cache, which might be smaller than obj_max_req_size
      One simplification could be to overwrite the smaller chunks with a bigger chunk of obj_max_req_size, and to serve requests for smaller
@@ -525,7 +548,12 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
     }
 
     ceph::bufferlist bl;
-    std::string oid_in_cache = source->get_bucket()->get_marker() + "_" + oid + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(part_len);
+    std::string oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(part_len);
+    if (version.empty()) {
+      version = source->get_instance();
+    }
+
+    ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "version stored in update method is: " << version << dendl;
 
     ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << 
     " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
@@ -534,7 +562,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
       // Read From Cache
       auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id); 
 
-      source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, part_len, "", y);
+      source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, part_len, version, y);
 
       ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
 
@@ -546,7 +574,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
         return r;
       }
     } else {
-      oid_in_cache = source->get_bucket()->get_marker() + "_" + oid + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(obj_max_req_size);
+      oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(obj_max_req_size);
       //for ranged requests, for last part, the whole part might exist in the cache
        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << 
       " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
@@ -555,7 +583,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
         // Read From Cache
         auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);  
 
-       source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, obj_max_req_size, "", y);
+             source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, obj_max_req_size, version, y);
 
         ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
 
@@ -604,11 +632,8 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
     this->cb->bypass_cache_write();
   }
 
-  if (start_part_num == 0) {
-    this->cb->set_ofs(ofs);
-  } else {
-    this->cb->set_ofs(adjusted_start_ofs);
-    ofs = adjusted_start_ofs; // redundant? -Sam
+  if (start_part_num != 0) {
+    ofs = adjusted_start_ofs;
   }
 
   this->cb->set_ofs(ofs);
@@ -643,28 +668,58 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
   //Accumulating data from backend store into rgw_get_obj_max_req_size sized chunks and then writing to cache
   if (write_to_cache) {
     const std::lock_guard l(d4n_get_data_lock);
-    rgw::d4n::CacheBlock block;
+    rgw::d4n::CacheBlock block, existing_block;
     rgw::d4n::BlockDirectory* blockDir = source->driver->get_block_dir();
-    block.version = ""; // TODO: initialize correctly 
     block.hostsList.push_back(blockDir->cct->_conf->rgw_local_cache_address); 
     block.cacheObj.objName = source->get_key().get_oid();
     block.cacheObj.bucketName = source->get_bucket()->get_name();
     block.cacheObj.creationTime = to_iso_8601(source->get_mtime()); 
     block.cacheObj.dirty = false;
+
+    //populating fields needed for building directory index
+    existing_block.cacheObj.objName = block.cacheObj.objName;
+    existing_block.cacheObj.bucketName = block.cacheObj.bucketName;
     Attrs attrs; // empty attrs for cache sets
+    std::string version = source->get_object_version();
+    if (version.empty()) {
+      version = source->get_instance();
+    }
+    ldpp_dout(dpp, 20) << __func__ << ": version stored in update method is: " << version << dendl;
 
     if (bl.length() > 0 && last_part) { // if bl = bl_rem has data and this is the last part, write it to cache
-      std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
-      block.blockID = ofs; 
-      block.size = bl.length();
-      if (filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y) == 0) {
-        if (filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), attrs) == 0) {
-               filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), "", *y);
-
-               /* Store block in directory */
-          if (!blockDir->exist_key(&block, *y)) { // If the block exists, do we want to update anything else? -Sam
-            if (blockDir->set(&block, *y) < 0) 
-                   ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+      std::string oid = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
+      if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) { //In case of concurrent reads for the same object, the block is already cached
+        block.blockID = ofs;
+        block.size = bl.length();
+        block.version = version;
+        auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
+        if (ret == 0) {
+          //Should we replace each put_async with put, to ensure data is actually written to the cache before updating the data structures and before the lock is released?
+          ret = filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), attrs);
+          if (ret == 0) {
+            filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, *y);
+
+      /* Store block in directory */
+            if (!blockDir->exist_key(&block, *y)) {
+              if (blockDir->set(&block, *y) < 0) //should we revert previous steps if this step fails?
+          ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+            } else {
+              existing_block.blockID = block.blockID;
+              existing_block.size = block.size;
+              if (blockDir->get(&existing_block, *y) < 0) {
+                ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
+              } else {
+                if (existing_block.version != block.version) {
+                  if (blockDir->del(&existing_block, *y) < 0) //delete existing block
+                    ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory del method failed." << dendl;
+                  if (blockDir->set(&block, *y) < 0) //new versioned block will have new version, hostsList etc, how about globalWeight?
+                    ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+                } else {
+                if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0)
+                  ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed for hostsList." << dendl;
+                }
+              }
+            }
           } else {
             if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0)
                    ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl;
@@ -672,18 +727,38 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
         }
       }
     } else if (bl.length() == rgw_get_obj_max_req_size && bl_rem.length() == 0) { // if bl is the same size as rgw_get_obj_max_req_size, write it to cache
-      std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
+      std::string oid = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
       ofs += bl_len;
       block.blockID = ofs;
       block.size = bl.length();
-      if (filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y) == 0) {
-        if (filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), attrs) == 0) {
-               filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), "", *y);
-
-          /* Store block in directory */
-          if (!blockDir->exist_key(&block, *y)) {
-            if (blockDir->set(&block, *y) < 0) 
-                   ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+      block.version = version;
+      if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) { //In case of concurrent reads for the same object, the block is already cached
+        auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
+        if (ret == 0) {
+          ret = filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), attrs);
+          if (ret == 0) {
+            filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, *y);
+
+            /* Store block in directory */
+            if (!blockDir->exist_key(&block, *y)) {
+              if (blockDir->set(&block, *y) < 0)
+          ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+            } else {
+              existing_block.blockID = block.blockID;
+              existing_block.size = block.size;
+              if (blockDir->get(&existing_block, *y) < 0) {
+                ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
+              }
+              if (existing_block.version != block.version) {
+                if (blockDir->del(&existing_block, *y) < 0)
+                    ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory del method failed." << dendl;
+                  if (blockDir->set(&block, *y) < 0)
+                    ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+              } else {
+                if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0)
+                  ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed for blockHosts." << dendl;
+              }
+            }
           } else {
             if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0)
                    ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl;
@@ -699,31 +774,59 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
       bl_rem.claim_append(bl_copy);
 
       if (bl_rem.length() == rgw_get_obj_max_req_size) {
-        std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_rem.length());
-        ofs += bl_rem.length();
-        block.blockID = ofs;
-        block.size = bl_rem.length();
-        if (filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y) == 0) {
-          if (filter->get_cache_driver()->put_async(dpp, oid, bl_rem, bl_rem.length(), attrs) == 0) {
-                 filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), "", *y);
-
-                 /* Store block in directory */
-            if (!blockDir->exist_key(&block, *y)) {
-              if (blockDir->set(&block, *y) < 0) 
-                ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+        std::string oid = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_rem.length());
+          if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) { //In case of concurrent reads for the same object, the block is already cached
+          ofs += bl_rem.length();
+          block.blockID = ofs;
+          block.size = bl_rem.length();
+          block.version = version;
+          auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
+          if (ret == 0) {
+            ret = filter->get_cache_driver()->put_async(dpp, oid, bl_rem, bl_rem.length(), attrs);
+            if (ret == 0) {
+              filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), version, *y);
+
+              /* Store block in directory */
+              if (!blockDir->exist_key(&block, *y)) {
+                if (blockDir->set(&block, *y) < 0)
+                  ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+                } else {
+                  existing_block.blockID = block.blockID;
+                  existing_block.size = block.size;
+                  if (blockDir->get(&existing_block, *y) < 0) {
+                    ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
+                  } else {
+                    if (existing_block.version != block.version) {
+                      if (blockDir->del(&existing_block, *y) < 0)
+                        ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory del method failed." << dendl;
+                      if (blockDir->set(&block, *y) < 0)
+                        ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+                    } else {
+                    if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0)
+                      ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl;
+                    }
+                  }
+                }
             } else {
-              if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0)
-                ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl;
-            } 
+              ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " An error occured while caching oid: " << oid << " error: " << ret << dendl;
+            }
+          } else {
+            ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " An error occured during eviction: " << " error: " << ret << dendl;
           }
         }
 
         bl_rem.clear();
         bl_rem = std::move(bl);
-      }
+      }//bl_rem.length()
     }
   }
 
+  /* Clean-up:
+  1. do we need to clean up older versions of the cache backend, when we update version in block directory?
+  2. do we need to clean up keys belonging to older versions (the last blocks), in case the size of newer version is different
+  3. do we need to revert the cache ops, in case the directory ops fail
+  */
+
   return 0;
 }
 
index fc7f844ee7c779a807581b67fbd0b9e201296656..5f785fb65cb91a781e7d75bb693ac3cf9d30dc50 100644 (file)
@@ -97,6 +97,7 @@ class D4NFilterBucket : public FilterBucket {
 class D4NFilterObject : public FilterObject {
   private:
     D4NFilterDriver* driver;
+    std::string version;
 
   public:
     struct D4NFilterReadOp : FilterReadOp {
@@ -104,7 +105,7 @@ class D4NFilterObject : public FilterObject {
        class D4NFilterGetCB: public RGWGetDataCB {
          private:
            D4NFilterDriver* filter;
-           std::string oid;
+           std::string prefix;
            D4NFilterObject* source;
            RGWGetDataCB* client_cb;
            uint64_t ofs = 0, len = 0;
@@ -116,8 +117,8 @@ class D4NFilterObject : public FilterObject {
            optional_yield* y;
 
          public:
-           D4NFilterGetCB(D4NFilterDriver* _filter, std::string& _oid, D4NFilterObject* _source) : filter(_filter), 
-                                                                                                   oid(_oid), source(_source) {}
+           D4NFilterGetCB(D4NFilterDriver* _filter, D4NFilterObject* _source) : filter(_filter),
+                                                                                                       source(_source) {}
 
            int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override;
            void set_client_cb(RGWGetDataCB* client_cb, const DoutPrefixProvider* dpp, optional_yield* y) { 
@@ -126,6 +127,7 @@ class D4NFilterObject : public FilterObject {
               this->y = y;
             }
            void set_ofs(uint64_t ofs) { this->ofs = ofs; }
+      void set_prefix(const std::string& prefix) { this->prefix = prefix; }
            int flush_last_part();
            void bypass_cache_write() { this->write_to_cache = false; }
        };
@@ -135,9 +137,8 @@ class D4NFilterObject : public FilterObject {
        D4NFilterReadOp(std::unique_ptr<ReadOp> _next, D4NFilterObject* _source) : FilterReadOp(std::move(_next)),
                                                                                   source(_source) 
         {
-         std::string oid = source->get_bucket()->get_marker() + "_" + source->get_key().get_oid();
-          cb = std::make_unique<D4NFilterGetCB>(source->driver, oid, source); 
-       }
+          cb = std::make_unique<D4NFilterGetCB>(source->driver, source);
+             }
        virtual ~D4NFilterReadOp() = default;
 
        virtual int prepare(optional_yield y, const DoutPrefixProvider* dpp) override;
@@ -202,6 +203,9 @@ class D4NFilterObject : public FilterObject {
 
     virtual std::unique_ptr<ReadOp> get_read_op() override;
     virtual std::unique_ptr<DeleteOp> get_delete_op() override;
+
+    void set_object_version(const std::string& version) { this->version = version; }
+    const std::string get_object_version() { return this->version; }
 };
 
 class D4NFilterWriter : public FilterWriter {