]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/d4n: implementation of copyObject.
authorPritha Srivastava <prsrivas@redhat.com>
Thu, 9 May 2024 07:19:40 +0000 (12:49 +0530)
committerPritha Srivastava <prsrivas@redhat.com>
Mon, 21 Apr 2025 04:04:07 +0000 (09:34 +0530)
Signed-off-by: Pritha Srivastava <prsrivas@redhat.com>
src/rgw/driver/d4n/d4n_policy.cc
src/rgw/driver/d4n/rgw_sal_d4n.cc
src/rgw/driver/d4n/rgw_sal_d4n.h

index 1354c3418e900a11928aa39590c32eb542c47b6a..cf8095b207abc2639529cbeae95b2acbbc5edda6 100644 (file)
@@ -424,7 +424,7 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
     uint64_t len = 0;
     rgw::sal::Attrs obj_attrs;
   
-    ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "" << __LINE__ << "(): Before acquiring cleaning-lock." << dendl;
+    ldpp_dout(dpp, 20) << "LFUDAPolicy::" << __func__ << "" << __LINE__ << "(): Before acquiring cleaning-lock" << dendl;
     std::unique_lock<std::mutex> l(lfuda_cleaning_lock);
     LFUDAObjEntry* e;
     if (object_heap.size() > 0) {
index ae30e0891cc0ed47b28db845554f2fbbca26f182..df8b3f5513a729bf59ff637fa11f0c746cec9a88 100644 (file)
@@ -100,6 +100,155 @@ int D4NFilterBucket::create(const DoutPrefixProvider* dpp,
   return next->create(dpp, params, y);
 }
 
+int D4NFilterObject::copy_object(const ACLOwner& owner,
+                              const rgw_user& remote_user,
+                              req_info* info,
+                              const rgw_zone_id& source_zone,
+                              rgw::sal::Object* dest_object,
+                              rgw::sal::Bucket* dest_bucket,
+                              rgw::sal::Bucket* src_bucket,
+                              const rgw_placement_rule& dest_placement,
+                              ceph::real_time* src_mtime,
+                              ceph::real_time* mtime,
+                              const ceph::real_time* mod_ptr,
+                              const ceph::real_time* unmod_ptr,
+                              bool high_precision_time,
+                              const char* if_match,
+                              const char* if_nomatch,
+                              AttrsMod attrs_mod,
+                              bool copy_if_newer,
+                              Attrs& attrs,
+                              RGWObjCategory category,
+                              uint64_t olh_epoch,
+                              boost::optional<ceph::real_time> delete_at,
+                              std::string* version_id,
+                              std::string* tag,
+                              std::string* etag,
+                              void (*progress_cb)(off_t, void *),
+                              void* progress_data,
+                              const DoutPrefixProvider* dpp,
+                              optional_yield y)
+{
+  if (g_conf()->d4n_writecache_enabled) {
+    this->dest_object = dest_object;
+    this->dest_bucket = dest_bucket;
+
+    if (!dest_object->have_instance()) {
+      if (dest_object->get_bucket()->versioned() && !dest_object->get_bucket()->versioning_enabled()) { //if versioning is suspended
+        this->dest_version = "null";
+      } else {
+        constexpr uint32_t OBJ_INSTANCE_LEN = 32;
+        char buf[OBJ_INSTANCE_LEN + 1];
+        gen_rand_alphanumeric_no_underscore(dpp->get_cct(), buf, OBJ_INSTANCE_LEN);
+        this->dest_version = buf; //version for non-versioned objects, using gen_rand_alphanumeric_no_underscore for the time being
+        ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): generating version: " << version << dendl;
+      }
+    } else {
+      this->dest_version = dest_object->get_instance();
+    }
+
+    std::unique_ptr<rgw::sal::Object::ReadOp> read_op(this->get_read_op());
+    if (auto ret = read_op->prepare(y, dpp); ret < 0) {
+      ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): prepare method failed with ret: " << ret << dendl;
+      return ret;
+    }
+    if (auto ret = read_op->iterate(dpp, 0, (this->get_size() - 1), nullptr, y); ret < 0) {
+      ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): iterate method failed with ret: " << ret << dendl;
+      return ret;
+    }
+
+    rgw::sal::Attrs baseAttrs;
+    if (attrs_mod == rgw::sal::ATTRSMOD_NONE) {
+      baseAttrs = attrs;
+    } else {
+      baseAttrs = this->get_attrs();
+    }
+
+    if (attrs_mod == rgw::sal::ATTRSMOD_REPLACE) { /* Replace */
+      rgw::sal::Attrs::iterator iter;
+
+      for (const auto& pair : attrs) {
+        iter = baseAttrs.find(pair.first);
+      
+        if (iter != baseAttrs.end()) {
+          iter->second = pair.second;
+        } else {
+          baseAttrs.insert({pair.first, pair.second});
+        }
+      }
+    } else if (attrs_mod == rgw::sal::ATTRSMOD_MERGE) { /* Merge */
+      baseAttrs.insert(attrs.begin(), attrs.end()); 
+    }
+  
+    ceph::real_time dest_mtime;
+    if (mtime) {
+      if (real_clock::is_zero(*mtime)) {
+        *mtime = real_clock::now();
+      }
+      dest_mtime = *mtime;
+    } else {
+      dest_mtime = real_clock::now();
+    }
+    dest_object->set_mtime(dest_mtime);
+    dest_object->set_obj_size(this->get_size());
+    dest_object->set_accounted_size(this->get_size());
+    ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " size is: " << dest_object->get_size() << dendl;
+    D4NFilterObject* d4n_dest_object = dynamic_cast<D4NFilterObject*>(dest_object);
+    d4n_dest_object->set_obj_state_attrs(dpp, y, baseAttrs);
+    bufferlist bl_data;
+    std::string key = dest_bucket->get_name() + "_" + this->dest_version + "_" + dest_object->get_name();
+    std::string head_oid_in_cache = "D_" + key; //same as key, as there is no len or offset attached to head oid in cache
+    auto ret = driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl_data, 0, baseAttrs, y);
+    baseAttrs.erase("user.rgw.mtime");
+    baseAttrs.erase("user.rgw.object_size");
+    baseAttrs.erase("user.rgw.accounted_size");
+    baseAttrs.erase("user.rgw.epoch");
+    if (ret == 0) {
+      time_t creationTime = ceph::real_clock::to_time_t(dest_mtime);
+      dest_object->set_attrs(baseAttrs);
+      ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << this->dest_version << dendl;
+      bufferlist bl;
+      driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, bl.length(), this->dest_version, true, y);
+      d4n_dest_object->set_object_version(this->dest_version);
+      ret = d4n_dest_object->set_head_obj_dir_entry(dpp, y, true, true);
+      if (ret < 0) {
+        ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
+        return ret;
+      }
+      driver->get_policy_driver()->get_cache_policy()->updateObj(dpp, key, this->dest_version, true, dest_object->get_accounted_size(), creationTime, std::get<rgw_user>(dest_object->get_bucket()->get_owner()), *etag, dest_object->get_bucket()->get_name(), dest_object->get_key(), y);
+
+      //write object to directory.
+      rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
+          .objName = dest_object->get_oid(),
+          .bucketName = dest_object->get_bucket()->get_name(),
+          .creationTime = std::to_string(creationTime),
+          .dirty = true,
+          .hostsList = { dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address }
+      };
+      ret = driver->get_obj_dir()->set(dpp, &object, y);
+      if (ret < 0) {
+        ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): ObjectDirectory set method failed with err: " << ret << dendl;
+        return ret;
+      }
+    }
+  } else {
+    auto ret = next->copy_object(owner, remote_user, info, source_zone,
+                           nextObject(dest_object),
+                           nextBucket(dest_bucket),
+                           nextBucket(src_bucket),
+                           dest_placement, src_mtime, mtime,
+                           mod_ptr, unmod_ptr, high_precision_time, if_match,
+                           if_nomatch, attrs_mod, copy_if_newer, attrs,
+                           category, olh_epoch, delete_at, version_id, tag,
+                           etag, progress_cb, progress_data, dpp, y);
+    if (ret < 0) {
+      ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): next->copy_object failed with ret: " << ret << dendl;
+      return ret;
+    }
+  }
+  return 0;
+}
+
 int D4NFilterObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
                             Attrs* delattrs, optional_yield y, uint32_t flags)
 {
@@ -624,9 +773,11 @@ int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw::
     ldpp_dout(dpp, 20) << "D4NFilterObject::flush:: calling handle_data for offset: " << offset << " bufferlist length: " << bl.length() << dendl;
 
     bl_list.push_back(bl);
-    int r = client_cb->handle_data(bl, 0, bl.length());
-    if (r < 0) {
-      return r;
+    if (client_cb) {
+      int r = client_cb->handle_data(bl, 0, bl.length());
+      if (r < 0) {
+        return r;
+      }
     }
     auto it = blocks_info.find(offset);
     if (it != blocks_info.end()) {
@@ -654,6 +805,34 @@ int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw::
       ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " calling update for offset: " << offset << " adjusted offset: " << ofs  << " length: " << len << " oid_in_cache: " << oid_in_cache << dendl;
       ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << version << " " << source->get_object_version() << dendl;
       source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, len, version, dirty, y);
+      if (source->dest_object && source->dest_bucket) {
+        rgw::d4n::CacheBlock dest_block;
+        dest_block.cacheObj.objName = source->dest_object->get_oid();
+        dest_block.cacheObj.bucketName = source->dest_bucket->get_name();
+        dest_block.cacheObj.dirty = true; //writing to cache
+        dest_block.blockID = ofs;
+        dest_block.size = len;
+        dest_block.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
+        dest_block.version = source->dest_version;
+        dest_block.dirty = true;
+        std::string key = source->dest_bucket->get_name() + "_" + source->dest_version + "_" + source->dest_object->get_name() +
+                                        "_" + std::to_string(ofs) + "_" + std::to_string(len);
+        std::string dest_oid_in_cache = "D_" + key;
+        auto ret = source->driver->get_policy_driver()->get_cache_policy()->eviction(dpp, dest_block.size, y);
+        if (ret == 0) {
+          rgw::sal::Attrs attrs;
+          ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " destination object version in update method is: " << source->dest_version << dendl;
+          ret = source->driver->get_cache_driver()->put(dpp, dest_oid_in_cache, bl, bl.length(), attrs, y);
+          if (ret == 0) {
+            source->driver->get_policy_driver()->get_cache_policy()->update(dpp, key, ofs, bl.length(), source->dest_version, true, y);
+          }
+          if (ret = source->driver->get_block_dir()->set(dpp, &dest_block, y); ret < 0){
+            ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " BlockDirectory set failed with ret: " << ret << dendl;
+          }
+        } else {
+          ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " eviction returned ret: " << ret << dendl;
+        }
+      }
       blocks_info.erase(it);
     } else {
       ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << " offset not found: " << offset << dendl;
@@ -991,10 +1170,12 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
   auto rgw_get_obj_max_req_size = g_conf()->rgw_get_obj_max_req_size;
 
   if (!last_part && bl.length() <= rgw_get_obj_max_req_size) {
-    auto r = client_cb->handle_data(bl, bl_ofs, bl_len);
+    if (client_cb) {
+      auto r = client_cb->handle_data(bl, bl_ofs, bl_len);
 
-    if (r < 0) {
-      return r;
+      if (r < 0) {
+        return r;
+      }
     }
   }
 
@@ -1003,8 +1184,9 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
     Attrs attrs; // empty attrs for cache sets
     std::string version = source->get_object_version();
     std::string prefix = source->get_prefix();
+    std::string dest_prefix;
 
-    rgw::d4n::CacheBlock block, existing_block;
+    rgw::d4n::CacheBlock block, existing_block, dest_block;
     rgw::d4n::BlockDirectory* blockDir = source->driver->get_block_dir();
     block.cacheObj.objName = source->get_key().get_oid();
     block.cacheObj.bucketName = source->get_bucket()->get_name();
@@ -1014,6 +1196,16 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
     bool dirty = block.dirty = false; //Reading from the backend, data is clean
     block.version = version;
 
+    if (source->dest_object && source->dest_bucket) {
+      dest_prefix = source->dest_bucket->get_name() + "_" + source->dest_version + "_" + source->dest_object->get_name();
+      dest_block.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
+      dest_block.cacheObj.objName = source->dest_object->get_key().get_oid();
+      dest_block.cacheObj.bucketName = source->dest_object->get_bucket()->get_name();
+      //dest_block.cacheObj.creationTime = std::to_string(ceph::real_clock::to_time_t(source->get_mtime()));
+      dest_block.cacheObj.dirty = dest_block.dirty = false;
+      dest_block.version = source->dest_version;
+    }
+
     //populating fields needed for building directory index
     existing_block.cacheObj.objName = block.cacheObj.objName;
     existing_block.cacheObj.bucketName = block.cacheObj.bucketName;
@@ -1067,11 +1259,25 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
           }
         }
       }
+      if (source->dest_object && source->dest_bucket) {
+        std::string dest_oid = dest_prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
+        dest_block.blockID = ofs;
+        dest_block.size = bl.length();
+        auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, dest_block.size, *y);
+        if (ret == 0) {
+          ret = filter->get_cache_driver()->put(dpp, dest_oid, bl, bl.length(), attrs, *y);
+          if (ret == 0) {
+            filter->get_policy_driver()->get_cache_policy()->update(dpp, dest_oid, ofs, bl.length(), source->dest_version, dirty, *y);
+            if (ret = blockDir->set(dpp, &dest_block, *y); ret < 0) {
+              ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " BlockDirectory set failed with ret: " << ret << dendl;
+            }
+          }
+        }
+      }
     } else if (bl.length() == rgw_get_obj_max_req_size && bl_rem.length() == 0) { // if bl is the same size as rgw_get_obj_max_req_size, write it to cache
       std::string oid = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
       block.blockID = ofs;
       block.size = bl.length();
-      ofs += bl_len;
       if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) {
         auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
         if (ret == 0) {
@@ -1101,6 +1307,22 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
           }
         }
       }
+      if (source->dest_object && source->dest_bucket) {
+        std::string dest_oid = dest_prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
+        dest_block.blockID = ofs;
+        dest_block.size = bl.length();
+        auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, dest_block.size, *y);
+        if (ret == 0) {
+          ret = filter->get_cache_driver()->put(dpp, dest_oid, bl, bl.length(), attrs, *y);
+          if (ret == 0) {
+            filter->get_policy_driver()->get_cache_policy()->update(dpp, dest_oid, ofs, bl.length(), source->dest_version, dirty, *y);
+            if (ret = blockDir->set(dpp, &dest_block, *y); ret < 0) {
+              ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " BlockDirectory set failed with ret: " << ret << dendl;
+            }
+          }
+        }
+      }
+      ofs += bl_len;
     } else { //copy data from incoming bl to bl_rem till it is rgw_get_obj_max_req_size, and then write it to cache
       uint64_t rem_space = rgw_get_obj_max_req_size - bl_rem.length();
       uint64_t len_to_copy = rem_space > bl.length() ? bl.length() : rem_space;
@@ -1114,7 +1336,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
           if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) {
           block.blockID = ofs;
           block.size = bl_rem.length();
-          ofs += bl_rem.length();
+          
           auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
           if (ret == 0) {
             ret = filter->get_cache_driver()->put(dpp, oid, bl_rem, bl_rem.length(), attrs, *y);
@@ -1146,6 +1368,22 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
           }
         }
 
+        if (source->dest_object && source->dest_bucket) {
+          std::string dest_oid = dest_prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_rem.length());
+          dest_block.blockID = ofs;
+          dest_block.size = bl_rem.length();
+          auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, dest_block.size, *y);
+          if (ret == 0) {
+            ret = filter->get_cache_driver()->put(dpp, dest_oid, bl_rem, bl_rem.length(), attrs, *y);
+            if (ret == 0) {
+              filter->get_policy_driver()->get_cache_policy()->update(dpp, dest_oid, ofs, bl_rem.length(), source->dest_version, dirty, *y);
+              if (ret = blockDir->set(dpp, &dest_block, *y); ret < 0) {
+                ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " BlockDirectory set failed with ret: " << ret << dendl;
+              }
+            }
+          }
+        }
+        ofs += bl_rem.length();
         bl_rem.clear();
         bl_rem = std::move(bl);
       }//bl_rem.length()
@@ -1153,9 +1391,8 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
   }
 
   /* Clean-up:
-  1. do we need to clean up older versions of the cache backend, when we update version in block directory?
-  2. do we need to clean up keys belonging to older versions (the last blocks), in case the size of newer version is different
-  3. do we need to revert the cache ops, in case the directory ops fail
+  1. do we need to clean up keys belonging to older versions (the last blocks), in case the size of newer version is different
+  2. do we need to revert the cache ops, in case the directory ops fail
   */
 
   return 0;
index 3f1e3e9f0e390b8fd02b539d15505cb010291139..a38012fb81373e58f3638e3a7a3e09dac5bbed30 100644 (file)
@@ -106,6 +106,9 @@ class D4NFilterObject : public FilterObject {
     std::string version;
     std::string prefix;
     rgw_obj obj;
+    rgw::sal::Object* dest_object{nullptr}; //for copy-object
+    rgw::sal::Bucket* dest_bucket{nullptr}; //for copy-object
+    std::string dest_version;
   public:
     struct D4NFilterReadOp : FilterReadOp {
       public:
@@ -182,6 +185,35 @@ class D4NFilterObject : public FilterObject {
                                                                    driver(_driver) {}
     virtual ~D4NFilterObject() = default;
 
+    virtual int copy_object(const ACLOwner& owner,
+                              const rgw_user& remote_user,
+                              req_info* info,
+                              const rgw_zone_id& source_zone,
+                              rgw::sal::Object* dest_object,
+                              rgw::sal::Bucket* dest_bucket,
+                              rgw::sal::Bucket* src_bucket,
+                              const rgw_placement_rule& dest_placement,
+                              ceph::real_time* src_mtime,
+                              ceph::real_time* mtime,
+                              const ceph::real_time* mod_ptr,
+                              const ceph::real_time* unmod_ptr,
+                              bool high_precision_time,
+                              const char* if_match,
+                              const char* if_nomatch,
+                              AttrsMod attrs_mod,
+                              bool copy_if_newer,
+                              Attrs& attrs,
+                              RGWObjCategory category,
+                              uint64_t olh_epoch,
+                              boost::optional<ceph::real_time> delete_at,
+                              std::string* version_id,
+                              std::string* tag,
+                              std::string* etag,
+                              void (*progress_cb)(off_t, void *),
+                              void* progress_data,
+                              const DoutPrefixProvider* dpp,
+                              optional_yield y) override;
+
     virtual const std::string &get_name() const override { return next->get_name(); }
     virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
                             Attrs* delattrs, optional_yield y, uint32_t flags) override;
@@ -207,6 +239,8 @@ class D4NFilterObject : public FilterObject {
     int calculate_version(const DoutPrefixProvider* dpp, optional_yield y, std::string& version);
     int set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optional_yield y, bool is_latest_version = true, bool dirty = false);
     bool check_head_exists_in_cache_get_oid(const DoutPrefixProvider* dpp, std::string& head_oid_in_cache, rgw::sal::Attrs& attrs, optional_yield y);
+    rgw::sal::Bucket* get_destination_bucket(const DoutPrefixProvider* dpp) { return dest_bucket;}
+    rgw::sal::Object* get_destination_object(const DoutPrefixProvider* dpp) { return dest_object; }
 };
 
 class D4NFilterWriter : public FilterWriter {