]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/d4n: squashing the following commits for restoring
authorPritha Srivastava <prsrivas@redhat.com>
Tue, 3 Sep 2024 04:59:20 +0000 (10:29 +0530)
committerPritha Srivastava <prsrivas@redhat.com>
Mon, 21 Apr 2025 04:04:07 +0000 (09:34 +0530)
in-memory data for LFUDA policy and dirty objects from
cache on disk

1. rgw/d4n: restore in memory data structure for dirty
objects using the xattrs in the head block of an object
in the cache.
2. rgw/d4n: replacing string cache attr names with constexpr char.
3. rgw/d4n: restore LFUDA policy data from cache on disk.
4. rgw/d4n: correcting the key used while updating the LFUDA data structure
for the head object block in the read path.

Signed-off-by: Pritha Srivastava <prsrivas@redhat.com>
src/rgw/driver/d4n/d4n_policy.cc
src/rgw/driver/d4n/d4n_policy.h
src/rgw/driver/d4n/rgw_sal_d4n.cc
src/rgw/driver/d4n/rgw_sal_d4n.h
src/rgw/rgw_cache_driver.h
src/rgw/rgw_redis_driver.h
src/rgw/rgw_ssd_driver.cc
src/rgw/rgw_ssd_driver.h
src/test/rgw/test_d4n_policy.cc

index d5b83928841258ff1c3ce47c309180d44d5881de..0f5edc2bfb0c95002fba7ac702ca002507492ae9 100644 (file)
@@ -51,6 +51,19 @@ static inline void redis_exec(std::shared_ptr<connection> conn,
 
 int LFUDAPolicy::init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver *_driver) {
   response<int, int, int, int> resp;
+  static auto obj_callback = [this](
+          const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size,
+                           time_t creationTime, const rgw_user user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
+                           const rgw_obj_key& obj_key, optional_yield y) {
+    update_dirty_object(dpp, key, version, dirty, size, creationTime, user, etag, bucket_name, bucket_id, obj_key, y);
+  };
+
+  static auto block_callback = [this](
+          const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val) {
+    update(dpp, key, offset, len, version, dirty, y, restore_val);
+  };
+
+  cacheDriver->restore_blocks_objects(dpp, obj_callback, block_callback);
 
   driver = _driver;
   if (dpp->get_cct()->_conf->d4n_writecache_enabled) {
@@ -298,7 +311,7 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional
         (*it->second->handle)->localWeight = it->second->localWeight;
        entries_heap.decrease(it->second->handle); // larger value means node must be decreased to maintain min heap 
 
-       if (int ret = cacheDriver->set_attr(dpp, key, "user.rgw.localWeight", std::to_string(it->second->localWeight), y) < 0) { 
+       if (int ret = cacheDriver->set_attr(dpp, key, RGW_CACHE_ATTR_LOCAL_WEIGHT, std::to_string(it->second->localWeight), y) < 0) { 
          delete victim;
          return ret;
         }
@@ -345,13 +358,26 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional
   return 0;
 }
 
-void LFUDAPolicy::update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y)
+void LFUDAPolicy::update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val)
 {
+  ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): updating entry: " << key << dendl;
   using handle_type = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>::handle_type;
   const std::lock_guard l(lfuda_lock);
   int localWeight = age;
   auto entry = find_entry(key);
   bool updateLocalWeight = true;
+
+  std::string oid_in_cache = key;
+  if (dirty == true) {
+    oid_in_cache = "D_" + key;
+  }
+
+  if (!restore_val.empty()) {
+    updateLocalWeight = false;
+    localWeight = std::stoull(restore_val);
+    ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): restored localWeight is: " << localWeight << dendl;
+  }
+
   // check the dirty flag in the existing entry for the key and the incoming dirty flag. If the
   // incoming dirty flag is false, that means update() is invoked as part of cleaning process,
   // so we must not update its localWeight.
@@ -369,24 +395,19 @@ void LFUDAPolicy::update(const DoutPrefixProvider* dpp, const std::string& key,
   e->set_handle(handle);
   entries_map.emplace(key, e);
 
-  std::string oid_in_cache = key;
-  if (dirty == true) {
-    oid_in_cache = "D_" + key;
-  }
-
   if (updateLocalWeight) {
     int ret = -1;
-    if ((ret = cacheDriver->set_attr(dpp, oid_in_cache, "user.rgw.localWeight", std::to_string(localWeight), y)) < 0) 
+    if ((ret = cacheDriver->set_attr(dpp, oid_in_cache, RGW_CACHE_ATTR_LOCAL_WEIGHT, std::to_string(localWeight), y)) < 0) 
       ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): CacheDriver set_attr method failed, ret=" << ret << dendl;
   }
 
   weightSum += ((localWeight < 0) ? 0 : localWeight);
 }
 
-void LFUDAPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size, time_t creationTime, const rgw_user& user, std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, optional_yield y)
+void LFUDAPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, optional_yield y)
 {
   using handle_type = boost::heap::fibonacci_heap<LFUDAObjEntry*, boost::heap::compare<ObjectComparator<LFUDAObjEntry>>>::handle_type;
-  ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Before acquiring lock." << dendl;
+  ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Before acquiring lock, adding entry: " << key << dendl;
   const std::lock_guard l(lfuda_cleaning_lock);
   LFUDAObjEntry* e = new LFUDAObjEntry{key, version, dirty, size, creationTime, user, etag, bucket_name, bucket_id, obj_key};
   handle_type handle = object_heap.push(e);
@@ -448,6 +469,10 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
     ldpp_dout(dpp, 10) <<__LINE__ << " " << __func__ << "(): e->key=" << e->key << dendl;
     ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->dirty=" << e->dirty << dendl;
     ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->version=" << e->version << dendl;
+    ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->bucket_name=" << e->bucket_name << dendl;
+    ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->bucket_id=" << e->bucket_id << dendl;
+    ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->user=" << e->user << dendl;
+    ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->obj_key=" << e->obj_key << dendl;
     l.unlock();
     if (!e->key.empty() && (e->dirty == true) && (std::difftime(time(NULL), e->creationTime) > interval)) { //if block is dirty and written more than interval seconds ago
       rgw_user c_rgw_user = e->user;
@@ -500,10 +525,10 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
       ldpp_dout(dpp, 10) << __func__ << "(): new_head_oid_in_cache=" << new_head_oid_in_cache << dendl;
       bufferlist bl;
       cacheDriver->get_attrs(dpp, head_oid_in_cache, obj_attrs, null_yield); //get obj attrs from head
-      obj_attrs.erase("user.rgw.mtime");
-      obj_attrs.erase("user.rgw.object_size");
-      obj_attrs.erase("user.rgw.accounted_size");
-      obj_attrs.erase("user.rgw.epoch");
+      obj_attrs.erase(RGW_CACHE_ATTR_MTIME);
+      obj_attrs.erase(RGW_CACHE_ATTR_OBJECT_SIZE);
+      obj_attrs.erase(RGW_CACHE_ATTR_ACCOUNTED_SIZE);
+      obj_attrs.erase(RGW_CACHE_ATTR_EPOCH);
 
       do {
         ceph::bufferlist data;
@@ -655,7 +680,7 @@ int LRUPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_y
   return 0;
 }
 
-void LRUPolicy::update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y)
+void LRUPolicy::update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val)
 {
   const std::lock_guard l(lru_lock);
   _erase(dpp, key, y);
index acc9a3d1eb7b1dcee1eb634e961542ebd28505e1..b23ee407354de1b04aa6bd44eecf66d46564ad02 100644 (file)
@@ -15,6 +15,8 @@ namespace rgw { namespace d4n {
 namespace asio = boost::asio;
 namespace sys = boost::system;
 
+static std::string empty = std::string();
+
 class CachePolicy {
   protected:
     struct Entry : public boost::intrusive::list_base_hook<> {
@@ -47,7 +49,7 @@ class CachePolicy {
       rgw_obj_key obj_key;
       ObjEntry() = default;
       ObjEntry(const std::string& key, const std::string& version, bool dirty, uint64_t size, 
-               time_t creationTime, rgw_user user, std::string& etag, 
+               time_t creationTime, rgw_user user, const std::string& etag, 
                const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key) : key(key), version(version), dirty(dirty), size(size), 
                                                                              creationTime(creationTime), user(user), etag(etag), 
                                                                              bucket_name(bucket_name), bucket_id(bucket_id), obj_key(obj_key) {}
@@ -62,7 +64,7 @@ class CachePolicy {
     virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) = 0;
     virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y) = 0;
     virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size, 
-                           time_t creationTime, const rgw_user& user, std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
+                           time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
                            const rgw_obj_key& obj_key, optional_yield y) = 0;
     virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0;
     virtual bool erase_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0;
@@ -110,9 +112,9 @@ class LFUDAPolicy : public CachePolicy {
       using handle_type = boost::heap::fibonacci_heap<LFUDAObjEntry*, boost::heap::compare<ObjectComparator<LFUDAObjEntry>>>::handle_type;
       handle_type handle;
 
-      LFUDAObjEntry(const std::string& key, const std::string& version, bool dirty, uint64_t size, 
-                     time_t creationTime, rgw_user user, std::string& etag, 
-                     const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key) : ObjEntry(key, version, dirty, size, 
+      LFUDAObjEntry(const std::string& key, const std::string& version, bool dirty, uint64_t size,
+                     time_t creationTime, rgw_user user, const std::string& etag,
+                     const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key) : ObjEntry(key, version, dirty, size,
                                                                            creationTime, user, etag, bucket_name, bucket_id, obj_key) {}
 
       void set_handle(handle_type handle_) { handle = handle_; }
@@ -177,11 +179,11 @@ class LFUDAPolicy : public CachePolicy {
     virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver *_driver);
     virtual int exist_key(std::string key) override;
     virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override;
-    virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y) override;
+    virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val=empty) override;
     virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
     void save_y(optional_yield y) { this->y = y; }
     virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size, 
-                           time_t creationTime, const rgw_user& user, std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
+                           time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
                            const rgw_obj_key& obj_key, optional_yield y) override;
     virtual bool erase_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y);
     virtual void cleaning(const DoutPrefixProvider* dpp) override;
@@ -212,9 +214,9 @@ class LRUPolicy : public CachePolicy {
     virtual int init(CephContext* cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver* _driver) { return 0; }
     virtual int exist_key(std::string key) override;
     virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override;
-    virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y) override;
+    virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val=empty) override;
     virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size, 
-                           time_t creationTime, const rgw_user& user, std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
+                           time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
                            const rgw_obj_key& obj_key, optional_yield y) override;
     virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
     virtual bool erase_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
index e0b1ac779a7eba1d231309dd33f98cd2d68130a6..28ea81972f12b8e1569ec5732c9213bac508e78a 100644 (file)
@@ -198,11 +198,11 @@ int D4NFilterObject::copy_object(const ACLOwner& owner,
   //ATTRSMOD_NONE - the attributes of the source object will be copied without modifications, attrs parameter is ignored
   if (attrs_mod == rgw::sal::ATTRSMOD_NONE) {
     baseAttrs = this->get_attrs();
-    baseAttrs.erase("user.rgw.version_id"); //delete source version id
+    baseAttrs.erase(RGW_CACHE_ATTR_VERSION_ID); //delete source version id
     if (version_id) {
       bufferlist bl_val;
       bl_val.append(*version_id);
-      baseAttrs["user.rgw.version_id"] = std::move(bl_val); //populate destination version id
+      baseAttrs[RGW_CACHE_ATTR_VERSION_ID] = std::move(bl_val); //populate destination version id
     }
   }
 
@@ -275,7 +275,7 @@ int D4NFilterObject::copy_object(const ACLOwner& owner,
   }
   bufferlist bl_val;
   bl_val.append(std::to_string(this->is_multipart()));
-  baseAttrs["user.rgw.multipart"] = std::move(bl_val);
+  baseAttrs[RGW_CACHE_ATTR_MULTIPART] = std::move(bl_val);
   bl_val.append(*etag);
   baseAttrs[RGW_ATTR_ETAG] = std::move(bl_val);
   baseAttrs[RGW_ATTR_ACL] = std::move(attrs[RGW_ATTR_ACL]);
@@ -293,11 +293,13 @@ int D4NFilterObject::copy_object(const ACLOwner& owner,
   auto ret = driver->get_policy_driver()->get_cache_policy()->eviction(dpp, baseAttrs.size(), y);
   if (ret == 0) {
     ret = driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl_data, 0, baseAttrs, y);
-    baseAttrs.erase("user.rgw.mtime");
-    baseAttrs.erase("user.rgw.object_size");
-    baseAttrs.erase("user.rgw.accounted_size");
-    baseAttrs.erase("user.rgw.epoch");
-    baseAttrs.erase("user.rgw.multipart");
+    baseAttrs.erase(RGW_CACHE_ATTR_MTIME);
+    baseAttrs.erase(RGW_CACHE_ATTR_OBJECT_SIZE);
+    baseAttrs.erase(RGW_CACHE_ATTR_ACCOUNTED_SIZE);
+    baseAttrs.erase(RGW_CACHE_ATTR_EPOCH);
+    baseAttrs.erase(RGW_CACHE_ATTR_MULTIPART);
+    baseAttrs.erase(RGW_CACHE_ATTR_OBJECT_NS);
+    baseAttrs.erase(RGW_CACHE_ATTR_BUCKET_NAME);
     if (ret == 0) {
       ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << dest_version << dendl;
       bufferlist bl;
@@ -388,29 +390,29 @@ int D4NFilterObject::get_obj_attrs_from_cache(const DoutPrefixProvider* dpp, opt
     std::string instance;
     for (auto& attr : attrs) {
       if (attr.second.length() > 0) {
-        if (attr.first == "user.rgw.mtime") {
+        if (attr.first == RGW_CACHE_ATTR_MTIME) {
           ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting mtime." << dendl;
           auto mtime = ceph::real_clock::from_double(std::stod(attr.second.to_str()));
           this->set_mtime(mtime);
-        } else if (attr.first == "user.rgw.object_size") {
+        } else if (attr.first == RGW_CACHE_ATTR_OBJECT_SIZE) {
           auto size = std::stoull(attr.second.to_str());
           ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting object_size to: " << size << dendl;
           this->set_obj_size(size);
-        } else if (attr.first == "user.rgw.accounted_size") {
+        } else if (attr.first == RGW_CACHE_ATTR_ACCOUNTED_SIZE) {
           auto accounted_size = std::stoull(attr.second.to_str());
           this->set_accounted_size(accounted_size);
-        } else if (attr.first == "user.rgw.epoch") {
+        } else if (attr.first == RGW_CACHE_ATTR_EPOCH) {
           ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting epoch." << dendl;
           auto epoch = std::stoull(attr.second.to_str());
           this->set_epoch(epoch);
-        } else if (attr.first == "user.rgw.version_id") {
+        } else if (attr.first == RGW_CACHE_ATTR_VERSION_ID) {
           instance = attr.second.to_str();
           ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting version_id to: " << instance << dendl;
-        } else if (attr.first == "user.rgw.source_zone") {
+        } else if (attr.first == RGW_CACHE_ATTR_SOURC_ZONE) {
           ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting source zone id." << dendl;
           auto short_zone_id = static_cast<uint32_t>(std::stoul(attr.second.to_str()));
           this->set_short_zone_id(short_zone_id);
-        } else if (attr.first == "user.rgw.multipart") {
+        } else if (attr.first == RGW_CACHE_ATTR_MULTIPART) {
           std::string multipart = attr.second.to_str();
           this->multipart = (multipart == "1") ? true : false;
           ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): is_multipart: " << this->multipart << " multipart: " << multipart << dendl;
@@ -420,11 +422,13 @@ int D4NFilterObject::get_obj_attrs_from_cache(const DoutPrefixProvider* dpp, opt
       }//end-if
     }//end-for
     this->set_instance(instance); //set this only after setting object state else it won't take effect
-    attrs.erase("user.rgw.mtime");
-    attrs.erase("user.rgw.object_size");
-    attrs.erase("user.rgw.accounted_size");
-    attrs.erase("user.rgw.epoch");
-    attrs.erase("user.rgw.multipart");
+    attrs.erase(RGW_CACHE_ATTR_MTIME);
+    attrs.erase(RGW_CACHE_ATTR_OBJECT_SIZE);
+    attrs.erase(RGW_CACHE_ATTR_ACCOUNTED_SIZE);
+    attrs.erase(RGW_CACHE_ATTR_EPOCH);
+    attrs.erase(RGW_CACHE_ATTR_MULTIPART);
+    attrs.erase(RGW_CACHE_ATTR_OBJECT_NS);
+    attrs.erase(RGW_CACHE_ATTR_BUCKET_NAME);
     /* Set attributes locally */
     auto ret = this->set_attrs(attrs);
     if (ret < 0) {
@@ -466,24 +470,30 @@ void D4NFilterObject::set_attrs_from_obj_state(const DoutPrefixProvider* dpp, op
 {
   bufferlist bl_val;
   bl_val.append(std::to_string(this->get_size()));
-  attrs["user.rgw.object_size"] = std::move(bl_val);
+  attrs[RGW_CACHE_ATTR_OBJECT_SIZE] = std::move(bl_val);
 
   bl_val.append(std::to_string(this->get_epoch()));
-  attrs["user.rgw.epoch"] = std::move(bl_val);
+  attrs[RGW_CACHE_ATTR_EPOCH] = std::move(bl_val);
 
   bl_val.append(std::to_string(ceph::real_clock::to_double(this->get_mtime())));
-  attrs["user.rgw.mtime"] = std::move(bl_val);
+  attrs[RGW_CACHE_ATTR_MTIME] = std::move(bl_val);
 
   if(this->have_instance()) {
     bl_val.append(this->get_instance());
-    attrs["user.rgw.version_id"] = std::move(bl_val);
+    attrs[RGW_CACHE_ATTR_VERSION_ID] = std::move(bl_val);
   }
 
   bl_val.append(std::to_string(this->get_short_zone_id()));
-  attrs["user.rgw.source_zone"] = std::move(bl_val);
+  attrs[RGW_CACHE_ATTR_SOURC_ZONE] = std::move(bl_val);
 
   bl_val.append(std::to_string(this->get_accounted_size()));
-  attrs["user.rgw.accounted_size"] = std::move(bl_val); // will this get updated?
+  attrs[RGW_CACHE_ATTR_ACCOUNTED_SIZE] = std::move(bl_val); // will this get updated?
+
+  bl_val.append(this->get_key().ns);
+  attrs[RGW_CACHE_ATTR_OBJECT_NS] = std::move(bl_val);
+
+  bl_val.append(this->get_bucket()->get_name());
+  attrs[RGW_CACHE_ATTR_BUCKET_NAME] = std::move(bl_val);
 
   return;
 }
@@ -731,6 +741,12 @@ bool D4NFilterObject::check_head_exists_in_cache_get_oid(const DoutPrefixProvide
       found_in_cache = false;
       ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver get_attrs method failed." << dendl;
     }
+    std::string key = head_oid_in_cache;
+    if (block.cacheObj.dirty) {
+      // Remove dirty prefix
+      key = key.erase(0, 2);
+    }
+    this->driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, 0, version, block.cacheObj.dirty, y);
   } else if (ret == -ENOENT) { //if blockDir->get
     found_in_cache = false;
   } else {
@@ -1925,8 +1941,8 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp
       }
       
       std::string size;
-      if (attrs.find("user.rgw.object_size") != attrs.end()) {
-       size = attrs.find("user.rgw.object_size")->second.to_str();
+      if (attrs.find(RGW_CACHE_ATTR_OBJECT_SIZE) != attrs.end()) {
+       size = attrs.find(RGW_CACHE_ATTR_OBJECT_SIZE)->second.to_str();
       } else {
        ldpp_dout(dpp, 0) << "Failed to retrieve size for for: " << block.cacheObj.objName << ", ret=" << ret << dendl;
         return -EINVAL;
@@ -2017,7 +2033,6 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp
 
 int D4NFilterWriter::prepare(optional_yield y) 
 {
-  startTime = time(NULL);
   d4n_writecache = g_conf()->d4n_writecache_enabled;
 
   if (!d4n_writecache) {
@@ -2107,7 +2122,6 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag,
 {
   bool dirty = false;
   std::unordered_set<std::string> hostsList = {};
-  auto creationTime = startTime;
   std::string objEtag = etag;
   auto size = object->get_size();
   std::string instance;
@@ -2180,6 +2194,9 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag,
     ceph::real_time m_time;
     dirty = true;
     if (mtime) {
+      if (real_clock::is_zero(*mtime)) {
+        *mtime = real_clock::now();
+      }
       m_time = *mtime;
     } else {
       m_time = real_clock::now();
@@ -2225,11 +2242,13 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag,
   ret = driver->get_policy_driver()->get_cache_policy()->eviction(dpp, attrs.size(), y);
   if (ret == 0) {
     ret = driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl, 0, attrs, y);
-    attrs.erase("user.rgw.mtime");
-    attrs.erase("user.rgw.object_size");
-    attrs.erase("user.rgw.accounted_size");
-    attrs.erase("user.rgw.epoch");
-    attrs.erase("user.rgw.multipart");
+    attrs.erase(RGW_CACHE_ATTR_MTIME);
+    attrs.erase(RGW_CACHE_ATTR_OBJECT_SIZE);
+    attrs.erase(RGW_CACHE_ATTR_ACCOUNTED_SIZE);
+    attrs.erase(RGW_CACHE_ATTR_EPOCH);
+    attrs.erase(RGW_CACHE_ATTR_MULTIPART);
+    attrs.erase(RGW_CACHE_ATTR_OBJECT_NS);
+    attrs.erase(RGW_CACHE_ATTR_BUCKET_NAME);
     object->set_object_version(version);
     if (ret == 0) {
       ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << "(): version stored in update method is: " << version << dendl;
@@ -2242,6 +2261,7 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag,
       if (dirty) {
         //using object oid here so that version is automatically picked for versioned buckets, and for non-versioned buckets the old version is replaced by the latest version
         std::string object_key = obj->get_bucket()->get_bucket_id() + "_" + obj->get_oid();
+        auto creationTime = ceph::real_clock::to_time_t(object->get_mtime());
         ldpp_dout(dpp, 16) << "D4NFilterWriter::" << __func__ << "(): object_key=" << object_key << dendl;
         driver->get_policy_driver()->get_cache_policy()->update_dirty_object(dpp, object_key, version, dirty, accounted_size, creationTime, std::get<rgw_user>(obj->get_bucket()->get_owner()), objEtag, obj->get_bucket()->get_name(), obj->get_bucket()->get_bucket_id(), obj->get_key(), y);
       }
@@ -2283,7 +2303,7 @@ int D4NFilterMultipartUpload::complete(const DoutPrefixProvider *dpp,
   bufferlist bl_val;
   bool is_multipart = true;
   bl_val.append(std::to_string(is_multipart));
-  attrs["user.rgw.multipart"] = std::move(bl_val);
+  attrs[RGW_CACHE_ATTR_MULTIPART] = std::move(bl_val);
 
   std::string version;
   d4n_target_obj->calculate_version(dpp, y, version, attrs);
index 4d76df0ded4cfe1dcba8dd5064e77fe60f9cbe86..48e364a66acde124bd84d3c7ba68c83568bf1da7 100644 (file)
@@ -284,7 +284,6 @@ class D4NFilterWriter : public FilterWriter {
     bool atomic;
     optional_yield y;
     bool d4n_writecache;
-    time_t startTime;
     std::string version;
 
   public:
index 50886e51d430ac52f7c7b3f10534fb5add4e23f5..8c3491666f3691037aa182db4122542655d26a0f 100644 (file)
@@ -3,13 +3,31 @@
 #include "rgw_common.h"
 #include "rgw_aio.h"
 
+constexpr char RGW_CACHE_ATTR_MTIME[] = "user.rgw.mtime";
+constexpr char RGW_CACHE_ATTR_EPOCH[] = "user.rgw.epoch";
+constexpr char RGW_CACHE_ATTR_OBJECT_SIZE[] = "user.rgw.object_size";
+constexpr char RGW_CACHE_ATTR_ACCOUNTED_SIZE[] = "user.rgw.accounted_size";
+constexpr char RGW_CACHE_ATTR_MULTIPART[] = "user.rgw.multipart";
+constexpr char RGW_CACHE_ATTR_OBJECT_NS[] = "user.rgw.object_ns";
+constexpr char RGW_CACHE_ATTR_BUCKET_NAME[] = "user.rgw.bucket_name";
+constexpr char RGW_CACHE_ATTR_VERSION_ID[] = "user.rgw.version_id";
+constexpr char RGW_CACHE_ATTR_SOURC_ZONE[] = "user.rgw.source_zone";
+constexpr char RGW_CACHE_ATTR_LOCAL_WEIGHT[] = "user.rgw.localWeight";
+
 namespace rgw { namespace cache {
 
+typedef std::function<void(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size,
+                           time_t creationTime, const rgw_user user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
+                           const rgw_obj_key& obj_key, optional_yield y)> ObjectDataCallback;
+
+typedef std::function<void(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version,
+        bool dirty, optional_yield y, std::string& restore_val)> BlockDataCallback;
+
 struct Partition {
-    std::string name;
-    std::string type;
-    std::string location;
-    uint64_t size;
+  std::string name;
+  std::string type;
+  std::string location;
+  uint64_t size;
 };
 
 class CacheDriver {
@@ -36,6 +54,9 @@ class CacheDriver {
     /* Partition */
     virtual Partition get_current_partition_info(const DoutPrefixProvider* dpp) = 0;
     virtual uint64_t get_free_space(const DoutPrefixProvider* dpp) = 0;
+
+    /* Data Recovery from Cache */
+    virtual int restore_blocks_objects(const DoutPrefixProvider* dpp, ObjectDataCallback obj_func, BlockDataCallback block_func) = 0;
 };
 
 } } // namespace rgw::cache
index 8496a438875151147d64827dff089438f94d6409..e100976cace31556018686da58193b88fc015171 100644 (file)
@@ -47,7 +47,8 @@ class RedisDriver : public CacheDriver {
     virtual int set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attr_val, optional_yield y) override;
     virtual int get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, std::string& attr_val, optional_yield y) override;
     void shutdown();
-     
+
+    virtual int restore_blocks_objects(const DoutPrefixProvider* dpp, ObjectDataCallback obj_func, BlockDataCallback block_func) override { return 0; }
   private:
     std::shared_ptr<connection> conn;
     Partition partition_info;
index b68530efdc3b8151f4817a2884f92136071037dc..83647c3202954f06d7273662ce27665bc821cc4c 100644 (file)
@@ -13,8 +13,6 @@ namespace efs = std::filesystem;
 
 namespace rgw { namespace cache {
 
-constexpr std::string_view ATTR_PREFIX = "user.rgw.";
-
 int SSDDriver::initialize(const DoutPrefixProvider* dpp)
 {
     if(partition_info.location.back() != '/') {
@@ -92,6 +90,161 @@ int SSDDriver::initialize(const DoutPrefixProvider* dpp)
     return 0;
 }
 
+int SSDDriver::restore_blocks_objects(const DoutPrefixProvider* dpp, ObjectDataCallback obj_func, BlockDataCallback block_func)
+{
+    if (dpp->get_cct()->_conf->rgw_d4n_l1_evict_cache_on_start) {
+        return 0; //don't do anything as the cache directory must have been evicted during start-up
+    }
+    for (auto const& dir_entry : std::filesystem::directory_iterator{partition_info.location}) {
+        ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): dir_entry.path: " << dir_entry.path() << dendl;
+        std::string file_name = dir_entry.path().filename();
+        std::vector<std::string> parts;
+        std::string part;
+        bool parsed = false;
+        ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): filename: " << file_name << dendl;
+        try {
+            std::stringstream ss(file_name);
+            while (std::getline(ss, part, '_')) {
+                parts.push_back(part);
+            }
+            ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): parts.size(): " << parts.size() << dendl;
+            //non-dirty or clean blocks - bucket_id, version, object_name in head block and offset, len in data blocks
+            if (parts.size() == 3 || parts.size() == 5) {
+                std::string key = file_name;
+                ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): file_name: " << file_name << dendl;
+
+                std::string version = parts[1];
+                ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): version: " << version << dendl;
+
+                uint64_t offset = 0, len = 0;
+                if (parts.size() == 5) {
+                    offset = std::stoull(parts[3]);
+                    ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): offset: " << offset << dendl;
+
+                    len = std::stoull(parts[4]);
+                    ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): len: " << len << dendl;
+                }
+                std::string localWeightStr;
+                auto ret = get_attr(dpp, file_name, RGW_CACHE_ATTR_LOCAL_WEIGHT, localWeightStr, null_yield);
+                if (ret < 0) {
+                    ldpp_dout(dpp, 0) << "SSDCache: " << __func__ << "(): Failed to get attr: " << RGW_CACHE_ATTR_LOCAL_WEIGHT << dendl;
+                } else {
+                    ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): localWeightStr: " << localWeightStr << dendl;
+                }
+                block_func(dpp, key, offset, len, version, false, null_yield, localWeightStr);
+                parsed = true;
+            }
+            //dirty blocks - "D", bucket_id, version, object_name in head block and offset, len in data blocks
+            if ((parts.size() == 4 || parts.size() == 6) && parts[0] == "D") {
+                std::string prefix = "D_";
+                if (file_name.starts_with(prefix)) {
+                    std::string key = file_name.substr(prefix.length());
+                    ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): key: " << key << dendl;
+
+                    bool dirty = true;
+
+                    std::string bucket_id = parts[1];
+                    ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): bucket_id: " << bucket_id << dendl;
+
+                    std::string version = parts[2];
+                    ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): version: " << version << dendl;
+
+                    std::string obj_name = parts[3];
+                    ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): obj_name: " << obj_name << dendl;
+
+                    uint64_t len = 0, offset = 0;
+                    std::string localWeightStr;
+                    if (parts.size() == 4) {
+                        rgw::sal::Attrs attrs;
+                        get_attrs(dpp, file_name, attrs, null_yield);
+                        std::string etag, bucket_name;
+                        uint64_t size = 0;
+                        time_t creationTime = time_t(nullptr);
+                        rgw_user user;
+                        rgw_obj_key obj_key;
+                        if (attrs.find(RGW_ATTR_ETAG) != attrs.end()) {
+                            etag = attrs[RGW_ATTR_ETAG].to_str();
+                            ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): etag: " << etag << dendl;
+                        }
+                        if (attrs.find(RGW_CACHE_ATTR_OBJECT_SIZE) != attrs.end()) {
+                            size = std::stoull(attrs[RGW_CACHE_ATTR_OBJECT_SIZE].to_str());
+                            ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): size: " << size << dendl;
+                        }
+                        if (attrs.find(RGW_CACHE_ATTR_MTIME) != attrs.end()) {
+                            creationTime = ceph::real_clock::to_time_t(ceph::real_clock::from_double(std::stod(attrs[RGW_CACHE_ATTR_MTIME].to_str())));
+                            ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): creationTime: " << creationTime << dendl;
+                        }
+                        if (attrs.find(RGW_ATTR_ACL) != attrs.end()) {
+                            bufferlist bl_acl = attrs[RGW_ATTR_ACL];
+                            RGWAccessControlPolicy policy;
+                            auto iter = bl_acl.cbegin();
+                            try {
+                                policy.decode(iter);
+                            } catch (buffer::error& err) {
+                                ldpp_dout(dpp, 0) << "ERROR: could not decode policy, caught buffer::error" << dendl;
+                                continue;
+                            }
+                            user = std::get<rgw_user>(policy.get_owner().id);
+                            ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): rgw_user: " << user.to_str() << dendl;
+                        }
+                        obj_key.name = obj_name;
+                        if (attrs.find(RGW_CACHE_ATTR_VERSION_ID) != attrs.end()) {
+                            std::string instance = attrs[RGW_CACHE_ATTR_VERSION_ID].to_str();
+                            if (instance != "null") {
+                                obj_key.instance = instance;
+                            }
+                        }
+                        if (attrs.find(RGW_CACHE_ATTR_OBJECT_NS) != attrs.end()) {
+                            obj_key.ns = attrs[RGW_CACHE_ATTR_OBJECT_NS].to_str();
+                        }
+                        ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): rgw_obj_key: " << obj_key.get_oid() << dendl;
+                        if (attrs.find(RGW_CACHE_ATTR_BUCKET_NAME) != attrs.end()) {
+                            bucket_name = attrs[RGW_CACHE_ATTR_BUCKET_NAME].to_str();
+                            ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): bucket_name: " << bucket_name << dendl;
+                        }
+
+                        if (attrs.find(RGW_CACHE_ATTR_LOCAL_WEIGHT) != attrs.end()) {
+                            localWeightStr = attrs[RGW_CACHE_ATTR_LOCAL_WEIGHT].to_str();
+                            ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): localWeightStr: " << localWeightStr << dendl;
+                        }
+
+                        ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): calling func for: " << key << dendl;
+                        obj_func(dpp, key, version, dirty, size, creationTime, user, etag, bucket_name, bucket_id, obj_key, null_yield);
+                        block_func(dpp, key, offset, len, version, dirty, null_yield, localWeightStr);
+                        parsed = true;
+                    } //end-if part.size() == 4
+                    if (parts.size() == 6) {
+                        offset = std::stoull(parts[4]);
+                        ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): offset: " << offset << dendl;
+
+                        len = std::stoull(parts[5]);
+                        ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): len: " << len << dendl;
+                        std::string localWeightStr;
+                        auto ret = get_attr(dpp, file_name, RGW_CACHE_ATTR_LOCAL_WEIGHT, localWeightStr, null_yield);
+                        if (ret < 0) {
+                            ldpp_dout(dpp, 0) << "SSDCache: " << __func__ << "(): Failed to get attr: " << RGW_CACHE_ATTR_LOCAL_WEIGHT << dendl;
+                        } else {
+                            ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): localWeightStr: " << localWeightStr << dendl;
+                        }
+                        block_func(dpp, key, offset, len, version, dirty, null_yield, localWeightStr);
+                        parsed = true;
+                    }
+                } //end-if file_name.starts_with
+            } //end-if parts.size() == 4 || parts.size() == 6
+            if (!parsed) {
+                ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): Unable to parse file_name: " << file_name << dendl;
+                continue;
+            }
+        }//end-try
+        catch(...) {
+            ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): Execption while parsing file_name: " << file_name << dendl;
+            continue;
+        }
+    }
+
+    return 0;
+}
+
 int SSDDriver::put(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, optional_yield y)
 {
     ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): key=" << key << dendl;
@@ -526,7 +679,7 @@ int SSDDriver::get_attrs(const DoutPrefixProvider* dpp, const std::string& key,
 
         keylen = strlen(keyptr) + 1;
         std::string attr_name(keyptr);
-        std::string::size_type prefixloc = attr_name.find(ATTR_PREFIX);
+        std::string::size_type prefixloc = attr_name.find(RGW_ATTR_PREFIX);
         buflen -= keylen;
         keyptr += keylen;
         if (prefixloc == std::string::npos) {
index e12d1646dc60c731923a0779371d26539ae1a1a7..e610e8e3872c2a7956d4229e1040e59dcd501d44 100644 (file)
@@ -33,6 +33,8 @@ public:
   virtual uint64_t get_free_space(const DoutPrefixProvider* dpp) override { return free_space; }
   void set_free_space(const DoutPrefixProvider* dpp, uint64_t free_space) { this->free_space = free_space; }
 
+  virtual int restore_blocks_objects(const DoutPrefixProvider* dpp, ObjectDataCallback obj_func, BlockDataCallback block_func) override;
+
 private:
   Partition partition_info;
   uint64_t free_space;
index cc5ee935eafbf243e927966f7bf4e0ce2948c6d5..f50e42bc9c7a03d3550d2f5fe06889318718b0fd 100644 (file)
@@ -179,7 +179,7 @@ TEST_F(LFUDAPolicyFixture, LocalGetBlockYield)
 
     boost::system::error_code ec;
     request req;
-    req.push("HGET", "RedisCache/testBucket_testName_0_0", "user.rgw.localWeight");
+    req.push("HGET", "RedisCache/testBucket_testName_0_0", RGW_CACHE_ATTR_LOCAL_WEIGHT);
     req.push("FLUSHALL");
 
     response<std::string, boost::redis::ignore_t> resp;