]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/d4n: implementation of caching head in read workflow.
authorPritha Srivastava <prsrivas@redhat.com>
Fri, 8 Mar 2024 08:07:34 +0000 (13:37 +0530)
committerPritha Srivastava <prsrivas@redhat.com>
Mon, 21 Apr 2025 04:04:07 +0000 (09:34 +0530)
modifications in ReadOp::prepare() method of the d4n filter driver
to cache the head object.

modification in get_obj_attrs to read from cache or backend store.

Signed-off-by: Pritha Srivastava <prsrivas@redhat.com>
src/rgw/driver/d4n/rgw_sal_d4n.cc
src/rgw/driver/d4n/rgw_sal_d4n.h

index 3903e81dfddceb16c03209a943bb94c4d4ae5b35..605b8f9b6a7a2cac5a58a92254cddc615fcd48fa 100644 (file)
@@ -137,60 +137,193 @@ int D4NFilterObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattr
   return next->set_obj_attrs(dpp, setattrs, delattrs, y, flags);
 }
 
+bool D4NFilterObject::get_obj_attrs_from_cache(const DoutPrefixProvider* dpp, optional_yield y)
+{
+  rgw::d4n::BlockDirectory* blockDir = this->driver->get_block_dir();
+  rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
+        .objName = this->get_name(),
+        .bucketName = this->get_bucket()->get_name(),
+        };
+
+  rgw::d4n::CacheBlock block = rgw::d4n::CacheBlock{
+          .cacheObj = object,
+          .blockID = 0,
+          .version = version,
+          .size = 0
+          };
+
+  bool found_in_cache = true;
+  //if the block corresponding to head object does not exist in directory, implies it is not cached
+  if (blockDir->exist_key(&block, y) && (blockDir->get(&block, y) == 0)) {
+    rgw::sal::Attrs attrs;
+    std::string version = block.version;
+    this->set_object_version(version);
+    //uniform name for versioned and non-versioned objects, since input for versioned objects might not contain version
+    std::string head_oid_in_cache = get_bucket()->get_name() + "_" + version + "_" + get_name();
+    ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Fetching attrs from cache." << dendl;
+    auto ret = this->driver->get_cache_driver()->get_attrs(dpp, head_oid_in_cache, attrs, y);
+    if (ret < 0) {
+      ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver get_attrs method failed." << dendl;
+      found_in_cache = false;
+    } else {
+      /* Set metadata locally */
+      RGWQuotaInfo quota_info;
+
+      std::string instance;
+      for (auto& attr : attrs) {
+        if (attr.second.length() > 0) {
+          if (attr.first == "user.rgw.mtime") {
+            ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting mtime." << dendl;
+            auto mtime = ceph::real_clock::from_double(std::stod(attr.second.c_str()));
+            this->set_mtime(mtime);
+          } else if (attr.first == "user.rgw.object_size") {
+            auto size = std::stoull(attr.second.c_str());
+            this->set_obj_size(size);
+            ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting object_size to: " << size << dendl;
+          } else if (attr.first == "user.rgw.accounted_size") {
+            auto accounted_size = std::stoull(attr.second.c_str());
+            this->set_accounted_size(accounted_size);
+          } else if (attr.first == "user.rgw.epoch") {
+            ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting epoch." << dendl;
+            auto epoch = std::stoull(attr.second.c_str());
+            this->set_epoch(epoch);
+          } else if (attr.first == "user.rgw.version_id") {
+            ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting version_id." << dendl;
+            instance = attr.second.to_str();
+          } else if (attr.first == "user.rgw.source_zone") {
+            ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting source zone id." << dendl;
+            auto zone_short_id = static_cast<uint32_t>(std::stoul(attr.second.c_str()));
+            this->set_short_zone_id(zone_short_id);
+          } else {
+            ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): Unexpected attribute; not locally set, attr name: " << attr.first << dendl;
+          }
+        }//end-if
+      }//end-for
+      //this->set_obj_state(astate);
+      this->set_instance(instance); //set this only after setting object state else it won't take effect
+      attrs.erase("user.rgw.mtime");
+      attrs.erase("user.rgw.object_size");
+      attrs.erase("user.rgw.accounted_size");
+      attrs.erase("user.rgw.epoch");
+      /* Set attributes locally */
+      ret = this->set_attrs(attrs);
+      if (ret < 0) {
+        ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): D4NFilterObject set_attrs method failed." << dendl;
+      }
+    }
+  } else {
+    found_in_cache = false;
+  }
+
+  return found_in_cache;
+}
+
+void D4NFilterObject::set_obj_state_attrs(const DoutPrefixProvider* dpp, optional_yield y, rgw::sal::Attrs& attrs)
+{
+  bufferlist bl_val;
+  bl_val.append(std::to_string(this->get_size()));
+  attrs["user.rgw.object_size"] = std::move(bl_val);
+
+  bl_val.append(std::to_string(this->get_epoch()));
+  attrs["user.rgw.epoch"] = std::move(bl_val);
+
+  bl_val.append(std::to_string(ceph::real_clock::to_double(this->get_mtime())));
+  attrs["user.rgw.mtime"] = std::move(bl_val);
+
+  if(this->have_instance()) {
+    bl_val.append(this->get_instance());
+    attrs["user.rgw.version_id"] = std::move(bl_val);
+  }
+
+  bl_val.append(std::to_string(this->get_short_zone_id()));
+  attrs["user.rgw.source_zone"] = std::move(bl_val);
+
+  bl_val.append(std::to_string(this->get_accounted_size()));
+  attrs["user.rgw.accounted_size"] = std::move(bl_val); // will this get updated?
+
+  return;
+}
+
+int D4NFilterObject::calculate_version(const DoutPrefixProvider* dpp, optional_yield y, std::string& version)
+{
+  //versioned objects have instance set to versionId, and get_oid() returns oid containing instance, hence using id tag as version for non versioned objects only
+  if (! this->have_instance() && version.empty()) {
+    bufferlist bl;
+     if (this->get_attr(RGW_ATTR_ID_TAG, bl)) {
+      version = bl.c_str();
+      ldpp_dout(dpp, 20) << __func__ << " id tag version is: " << version << dendl;
+    } else {
+      ldpp_dout(dpp, 20) << __func__ << " Failed to find id tag" << dendl;
+      return -ENOENT;
+    }
+  }
+  bufferlist bl;
+  if (this->have_instance()) {
+    version = this->get_instance();
+  }
+
+  this->set_object_version(version);
+
+  return 0;
+}
+
+int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optional_yield y)
+{
+  rgw::d4n::BlockDirectory* blockDir = this->driver->get_block_dir();
+  rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
+        .objName = this->get_name(),
+        .bucketName = this->get_bucket()->get_name(),
+        };
+
+  rgw::d4n::CacheBlock block = rgw::d4n::CacheBlock{
+          .cacheObj = object,
+          .blockID = 0,
+          .version = this->get_object_version(),
+          .size = 0
+          };
+
+  auto ret = blockDir->set(&block, y);
+  if (ret < 0) {
+    ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
+  }
+
+  return ret;
+}
+
 int D4NFilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp,
                                 rgw_obj* target_obj)
 {
-  rgw::sal::Attrs attrs;
-
-  if (driver->get_cache_driver()->get_attrs(dpp, this->get_key().get_oid(), attrs, y) < 0) {
-    ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver get_attrs method failed." << dendl;
-    return next->get_obj_attrs(y, dpp, target_obj);
-  } else {
-    /* Set metadata locally */
-    RGWQuotaInfo quota_info;
+  if (!get_obj_attrs_from_cache(dpp, y)) {
+    std::string head_oid_in_cache;
+    rgw::sal::Attrs attrs;
+    std::string version;
+    ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Fetching attrs from backend store." << dendl;
+    auto ret = next->get_obj_attrs(y, dpp, target_obj);
+    if (ret < 0) {
+      ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Failed to fetching attrs from backend store with ret: " << ret << dendl;
+      return ret;
+    }
+  
     this->load_obj_state(dpp, y);
+    attrs = this->get_attrs();
+    this->set_obj_state_attrs(dpp, y, attrs);
 
-    for (auto it = attrs.begin(); it != attrs.end(); ++it) {
-      if (it->second.length() > 0) {
-       if (it->first == "mtime") {
-         ceph::real_time mtime;
-         parse_time(it->second.c_str(), &mtime);
-         this->set_mtime(mtime);
-         attrs.erase(it->first);
-       } else if (it->first == "object_size") {
-         this->set_obj_size(std::stoull(it->second.c_str()));
-         attrs.erase(it->first);
-       } else if (it->first == "accounted_size") {
-         this->set_accounted_size(std::stoull(it->second.c_str()));
-         attrs.erase(it->first);
-       } else if (it->first == "epoch") {
-         this->set_epoch(std::stoull(it->second.c_str()));
-         attrs.erase(it->first);
-       } else if (it->first == "version_id") {
-         this->set_instance(it->second.c_str());
-         attrs.erase(it->first);
-       } else if (it->first == "this_zone_short_id") {
-         this->set_short_zone_id(static_cast<uint32_t>(std::stoul(it->second.c_str())));
-         attrs.erase(it->first);
-       } else if (it->first == "user_quota.max_size") {
-         quota_info.max_size = std::stoull(it->second.c_str());
-         attrs.erase(it->first);
-       } else if (it->first == "user_quota.max_objects") {
-         quota_info.max_objects = std::stoull(it->second.c_str());
-         attrs.erase(it->first);
-       } else if (it->first == "max_buckets") {
-         attrs.erase(it->first);
-       } else {
-         ldpp_dout(dpp, 20) << "D4N Filter: Unexpected attribute; not locally set." << dendl;
-         attrs.erase(it->first);
-       }
-      }
+    ret = calculate_version(dpp, y, version);
+    if (ret < 0 || version.empty()) {
+      ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): version could not be calculated." << dendl;
     }
 
-    /* Set attributes locally */
-    if (this->set_attrs(attrs) < 0) {
-      ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): D4NFilterObject set_attrs method failed." << dendl;
-      return next->get_obj_attrs(y, dpp, target_obj);
+    head_oid_in_cache = this->get_bucket()->get_name() + "_" + version + "_" + this->get_name();
+    ret = this->driver->get_cache_driver()->set_attrs(dpp, head_oid_in_cache, attrs, y);
+    if (ret == 0) {
+      ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << this->get_object_version() << dendl;
+      this->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, 0, version, y);
+      ret = set_head_obj_dir_entry(dpp, y);
+      if (ret < 0) {
+        ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
+      }
+    } else {
+      ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): failed to cache head object in block dir with error: " << ret << dendl;
     }
   }
 
@@ -279,79 +412,54 @@ std::unique_ptr<Object::DeleteOp> D4NFilterObject::get_delete_op()
 
 int D4NFilterObject::D4NFilterReadOp::prepare(optional_yield y, const DoutPrefixProvider* dpp)
 {
-  next->params.mod_ptr = params.mod_ptr;
-  next->params.unmod_ptr = params.unmod_ptr;
-  next->params.high_precision_time = params.high_precision_time;
-  next->params.mod_zone_id = params.mod_zone_id;
-  next->params.mod_pg_ver = params.mod_pg_ver;
-  next->params.if_match = params.if_match;
-  next->params.if_nomatch = params.if_nomatch;
-  next->params.lastmod = params.lastmod;
-  int ret = next->prepare(y, dpp);
-
-  rgw::sal::Attrs attrs;
-
-  if (source->driver->get_cache_driver()->get_attrs(dpp, source->get_key().get_oid(), attrs, y) < 0) {
-    ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::" << __func__ << "(): CacheDriver get_attrs method failed." << dendl;
-  } else {
-    /* Set metadata locally */
-    RGWQuotaInfo quota_info;
-    source->load_obj_state(dpp, y);
-
-    for (auto& attr : attrs) {
-      if (attr.second.length() > 0) {
-       if (attr.first == "mtime") {
-         ceph::real_time mtime;
-         parse_time(attr.second.c_str(), &mtime);
-         source->set_mtime(mtime);
-       } else if (attr.first == "object_size") {
-         source->set_obj_size(std::stoull(attr.second.c_str()));
-         attrs.erase(attr.first);
-       } else if (attr.first == "accounted_size") {
-         source->set_accounted_size(std::stoull(attr.second.c_str()));
-         attrs.erase(attr.first);
-       } else if (attr.first == "epoch") {
-         source->set_epoch(std::stoull(attr.second.c_str()));
-         attrs.erase(attr.first);
-       } else if (attr.first == "version_id") {
-         source->set_instance(attr.second.c_str());
-         attrs.erase(attr.first);
-       } else if (attr.first == "source_zone_short_id") {
-         source->set_short_zone_id(static_cast<uint32_t>(std::stoul(attr.second.c_str())));
-         attrs.erase(attr.first);
-       } else if (attr.first == "user_quota.max_size") {
-         quota_info.max_size = std::stoull(attr.second.c_str());
-         attrs.erase(attr.first);
-       } else if (attr.first == "user_quota.max_objects") {
-         quota_info.max_objects = std::stoull(attr.second.c_str());
-         attrs.erase(attr.first);
-       } else if (attr.first == "max_buckets") {
-         attrs.erase(attr.first);
-       } else {
-         ldpp_dout(dpp, 20) << "D4NFilterObject::D4NFilterReadOp::" << __func__ << "(): Unexpected attribute; not locally set." << dendl;
-       }
+  if (!source->get_obj_attrs_from_cache(dpp, y)) {
+    std::string head_oid_in_cache;
+    rgw::sal::Attrs attrs;
+    std::string version;
+    ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): fetching head object from backend store" << dendl;
+    next->params = params;
+    auto ret = next->prepare(y, dpp);
+    if (ret < 0) {
+      ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): next->prepare method failed with error: " << ret << dendl;
+      return ret;
+    }
+    if (params.part_num) {
+      params.parts_count = next->params.parts_count;
+      if (params.parts_count > 1) {
+        ldpp_dout(dpp, 20) << __func__ << "params.part_count: " << params.parts_count << dendl;
+        return 0; // d4n wont handle multipart read requests with part number for now
       }
-   
-    /* Set attributes locally */
-    if (source->set_attrs(attrs) < 0)
-      ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::" << __func__ << "(): D4NFilterObject set_attrs method failed." << dendl;
-  }
-  }
+    }
+    this->source->load_obj_state(dpp, y);
+    attrs = source->get_attrs();
+    source->set_obj_state_attrs(dpp, y, attrs);
 
-  //versioned objects have instance set to versionId, and get_oid() returns oid containing instance, hence using id tag as version for non versioned objects only
-  if (! source->have_instance()) {
-    if (source->load_obj_state(dpp, y) == 0) {
-      bufferlist bl;
-      if (source->get_attr(RGW_ATTR_ID_TAG, bl)) {
-        source->set_object_version(bl.c_str());
-        ldpp_dout(dpp, 20) << __func__ << "id tag version is: " << source->get_object_version() << dendl;
+    ret = source->calculate_version(dpp, y, version);
+    if (ret < 0 || version.empty()) {
+      ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): version could not be calculated." << dendl;
+    }
+
+    bufferlist bl;
+    head_oid_in_cache = source->get_bucket()->get_name() + "_" + version + "_" + source->get_name();
+    ret = source->driver->get_policy_driver()->get_cache_policy()->eviction(dpp, attrs.size(), y);
+    if (ret == 0) {
+      ret = source->driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl, 0, attrs, y);
+      if (ret == 0) {
+        ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << this->source->get_object_version() << dendl;
+        source->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, bl.length(), version, y);
+        ret = source->set_head_obj_dir_entry(dpp, y);
+        if (ret < 0) {
+          ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
+        }
       } else {
-        ldpp_dout(dpp, 20) << __func__ << "Failed to find id tag" << dendl;
+        ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): put for head object failed with error: " << ret << dendl;
       }
+    } else {
+      ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): failed to cache head object, eviction returned error: " << ret << dendl;
     }
   }
-
-  return ret;
+  
+  return 0;
 }
 
 void D4NFilterObject::D4NFilterReadOp::cancel() {
@@ -400,14 +508,12 @@ int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw::
     if (it != blocks_info.end()) {
       std::string version = source->get_object_version();
       std::string prefix = source->get_prefix();
-      if (version.empty()) {
-        version = source->get_instance();
-      }
       std::pair<uint64_t, uint64_t> ofs_len_pair = it->second;
       uint64_t ofs = ofs_len_pair.first;
       uint64_t len = ofs_len_pair.second;
       std::string oid_in_cache = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(len);
       ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " calling update for offset: " << offset << " adjusted offset: " << ofs  << " length: " << len << " oid_in_cache: " << oid_in_cache << dendl;
+      ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << version << " " << source->get_object_version() << dendl;
       source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, len, version, y);
       blocks_info.erase(it);
     } else {
@@ -428,7 +534,10 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
   const uint64_t window_size = g_conf()->rgw_get_obj_window_size;
   std::string version = source->get_object_version();
   std::string prefix;
-  if (version.empty()) { //for versioned objects, get_oid() returns an oid with versionId added
+  /* After prepare() method, for versioned objects, get_oid() returns an oid with versionId added,
+   * even for versioned objects, where version id is not provided as input
+   */
+  if (source->have_instance()) {
     prefix = source->get_bucket()->get_name() + "_" + source->get_key().get_oid();
   } else {
     prefix = source->get_bucket()->get_name() + "_" + version + "_" + source->get_key().get_oid();
@@ -463,10 +572,6 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
 
   this->offset = ofs;
 
-  if (version.empty()) {
-    version = source->get_instance();
-  }
-
   do {
     uint64_t id = adjusted_start_ofs, read_ofs = 0; //read_ofs is the actual offset to start reading from the current part/ chunk
     if (start_part_num == (num_parts - 1)) {
@@ -487,8 +592,6 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
     ceph::bufferlist bl;
     std::string oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(part_len);
 
-    ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "version stored in update method is: " << version << dendl;
-
     ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << 
     " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
 
@@ -616,9 +719,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
     Attrs attrs; // empty attrs for cache sets
     std::string version = source->get_object_version();
     std::string prefix = source->get_prefix();
-    if (version.empty()) {
-      version = source->get_instance();
-    }
+
     ldpp_dout(dpp, 20) << __func__ << ": version stored in update method is: " << version << dendl;
 
     if (bl.length() > 0 && last_part) { // if bl = bl_rem has data and this is the last part, write it to cache
index 508bc3a604356e77a62fe41cc09bfc77e6245b78..40c98d5d2f884d69b50f4387b9f5921d7e5dcd7c 100644 (file)
@@ -106,6 +106,10 @@ class D4NFilterObject : public FilterObject {
     std::string version;
     std::string prefix;
 
+    bool get_obj_attrs_from_cache(const DoutPrefixProvider* dpp, optional_yield y);
+    void set_obj_state_attrs(const DoutPrefixProvider* dpp, optional_yield y, rgw::sal::Attrs& attrs);
+    int calculate_version(const DoutPrefixProvider* dpp, optional_yield y, std::string& version);
+    int set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optional_yield y);
   public:
     struct D4NFilterReadOp : FilterReadOp {
       public: