From: Pritha Srivastava Date: Tue, 3 Sep 2024 04:59:20 +0000 (+0530) Subject: rgw/d4n: squashing the following commits for restoring X-Git-Tag: v20.3.0~8^2~22 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a2a101ef49eed2d8256df61351e99823be08bdf6;p=ceph.git rgw/d4n: squashing the following commits for restoring in-memory data for LFUDA policy and dirty objects from cache on disk 1. rgw/d4n: restore in memory data structure for dirty objects using the xattrs in the head block of an object in the cache. 2. rgw/d4n: replacing string cache attr names with constexpr char. 3. rgw/d4n: restore LFUDA policy data from cache on disk. 4. rgw/d4n: correcting the key used while updating the LFUDA data structure for the head object block in the read path. Signed-off-by: Pritha Srivastava --- diff --git a/src/rgw/driver/d4n/d4n_policy.cc b/src/rgw/driver/d4n/d4n_policy.cc index d5b839288412..0f5edc2bfb0c 100644 --- a/src/rgw/driver/d4n/d4n_policy.cc +++ b/src/rgw/driver/d4n/d4n_policy.cc @@ -51,6 +51,19 @@ static inline void redis_exec(std::shared_ptr conn, int LFUDAPolicy::init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver *_driver) { response resp; + static auto obj_callback = [this]( + const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size, + time_t creationTime, const rgw_user user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, + const rgw_obj_key& obj_key, optional_yield y) { + update_dirty_object(dpp, key, version, dirty, size, creationTime, user, etag, bucket_name, bucket_id, obj_key, y); + }; + + static auto block_callback = [this]( + const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val) { + update(dpp, key, offset, len, version, dirty, y, restore_val); + }; + + cacheDriver->restore_blocks_objects(dpp, obj_callback, block_callback); driver = _driver; if (dpp->get_cct()->_conf->d4n_writecache_enabled) { @@ -298,7 +311,7 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional (*it->second->handle)->localWeight = it->second->localWeight; entries_heap.decrease(it->second->handle); // larger value means node must be decreased to maintain min heap - if (int ret = cacheDriver->set_attr(dpp, key, "user.rgw.localWeight", std::to_string(it->second->localWeight), y) < 0) { + if (int ret = cacheDriver->set_attr(dpp, key, RGW_CACHE_ATTR_LOCAL_WEIGHT, std::to_string(it->second->localWeight), y) < 0) { delete victim; return ret; } @@ -345,13 +358,26 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional return 0; } -void LFUDAPolicy::update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y) +void LFUDAPolicy::update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val) { + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): updating entry: " << key << dendl; using handle_type = boost::heap::fibonacci_heap>>::handle_type; const std::lock_guard l(lfuda_lock); int localWeight = age; auto entry = find_entry(key); bool updateLocalWeight = true; + + std::string oid_in_cache = key; + if (dirty == true) { + oid_in_cache = "D_" + key; + } + + if (!restore_val.empty()) { + updateLocalWeight = false; + localWeight = std::stoull(restore_val); + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): restored localWeight is: " << localWeight << dendl; + } + // check the dirty flag in the existing entry for the key and the incoming dirty flag. If the // incoming dirty flag is false, that means update() is invoked as part of cleaning process, // so we must not update its localWeight. @@ -369,24 +395,19 @@ void LFUDAPolicy::update(const DoutPrefixProvider* dpp, const std::string& key, e->set_handle(handle); entries_map.emplace(key, e); - std::string oid_in_cache = key; - if (dirty == true) { - oid_in_cache = "D_" + key; - } - if (updateLocalWeight) { int ret = -1; - if ((ret = cacheDriver->set_attr(dpp, oid_in_cache, "user.rgw.localWeight", std::to_string(localWeight), y)) < 0) + if ((ret = cacheDriver->set_attr(dpp, oid_in_cache, RGW_CACHE_ATTR_LOCAL_WEIGHT, std::to_string(localWeight), y)) < 0) ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): CacheDriver set_attr method failed, ret=" << ret << dendl; } weightSum += ((localWeight < 0) ? 0 : localWeight); } -void LFUDAPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size, time_t creationTime, const rgw_user& user, std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, optional_yield y) +void LFUDAPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, optional_yield y) { using handle_type = boost::heap::fibonacci_heap>>::handle_type; - ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Before acquiring lock." << dendl; + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Before acquiring lock, adding entry: " << key << dendl; const std::lock_guard l(lfuda_cleaning_lock); LFUDAObjEntry* e = new LFUDAObjEntry{key, version, dirty, size, creationTime, user, etag, bucket_name, bucket_id, obj_key}; handle_type handle = object_heap.push(e); @@ -448,6 +469,10 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp) ldpp_dout(dpp, 10) <<__LINE__ << " " << __func__ << "(): e->key=" << e->key << dendl; ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->dirty=" << e->dirty << dendl; ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->version=" << e->version << dendl; + ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->bucket_name=" << e->bucket_name << dendl; + ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->bucket_id=" << e->bucket_id << dendl; + ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->user=" << e->user << dendl; + ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->obj_key=" << e->obj_key << dendl; l.unlock(); if (!e->key.empty() && (e->dirty == true) && (std::difftime(time(NULL), e->creationTime) > interval)) { //if block is dirty and written more than interval seconds ago rgw_user c_rgw_user = e->user; @@ -500,10 +525,10 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp) ldpp_dout(dpp, 10) << __func__ << "(): new_head_oid_in_cache=" << new_head_oid_in_cache << dendl; bufferlist bl; cacheDriver->get_attrs(dpp, head_oid_in_cache, obj_attrs, null_yield); //get obj attrs from head - obj_attrs.erase("user.rgw.mtime"); - obj_attrs.erase("user.rgw.object_size"); - obj_attrs.erase("user.rgw.accounted_size"); - obj_attrs.erase("user.rgw.epoch"); + obj_attrs.erase(RGW_CACHE_ATTR_MTIME); + obj_attrs.erase(RGW_CACHE_ATTR_OBJECT_SIZE); + obj_attrs.erase(RGW_CACHE_ATTR_ACCOUNTED_SIZE); + obj_attrs.erase(RGW_CACHE_ATTR_EPOCH); do { ceph::bufferlist data; @@ -655,7 +680,7 @@ int LRUPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_y return 0; } -void LRUPolicy::update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y) +void LRUPolicy::update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val) { const std::lock_guard l(lru_lock); _erase(dpp, key, y); diff --git a/src/rgw/driver/d4n/d4n_policy.h b/src/rgw/driver/d4n/d4n_policy.h index acc9a3d1eb7b..b23ee407354d 100644 --- a/src/rgw/driver/d4n/d4n_policy.h +++ b/src/rgw/driver/d4n/d4n_policy.h @@ -15,6 +15,8 @@ namespace rgw { namespace d4n { namespace asio = boost::asio; namespace sys = boost::system; +static std::string empty = std::string(); + class CachePolicy { protected: struct Entry : public boost::intrusive::list_base_hook<> { @@ -47,7 +49,7 @@ class CachePolicy { rgw_obj_key obj_key; ObjEntry() = default; ObjEntry(const std::string& key, const std::string& version, bool dirty, uint64_t size, - time_t creationTime, rgw_user user, std::string& etag, + time_t creationTime, rgw_user user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key) : key(key), version(version), dirty(dirty), size(size), creationTime(creationTime), user(user), etag(etag), bucket_name(bucket_name), bucket_id(bucket_id), obj_key(obj_key) {} @@ -62,7 +64,7 @@ class CachePolicy { virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) = 0; virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y) = 0; virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size, - time_t creationTime, const rgw_user& user, std::string& etag, const std::string& bucket_name, const std::string& bucket_id, + time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, optional_yield y) = 0; virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0; virtual bool erase_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0; @@ -110,9 +112,9 @@ class LFUDAPolicy : public CachePolicy { using handle_type = boost::heap::fibonacci_heap>>::handle_type; handle_type handle; - LFUDAObjEntry(const std::string& key, const std::string& version, bool dirty, uint64_t size, - time_t creationTime, rgw_user user, std::string& etag, - const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key) : ObjEntry(key, version, dirty, size, + LFUDAObjEntry(const std::string& key, const std::string& version, bool dirty, uint64_t size, + time_t creationTime, rgw_user user, const std::string& etag, + const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key) : ObjEntry(key, version, dirty, size, creationTime, user, etag, bucket_name, bucket_id, obj_key) {} void set_handle(handle_type handle_) { handle = handle_; } @@ -177,11 +179,11 @@ class LFUDAPolicy : public CachePolicy { virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver *_driver); virtual int exist_key(std::string key) override; virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override; - virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y) override; + virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val=empty) override; virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override; void save_y(optional_yield y) { this->y = y; } virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size, - time_t creationTime, const rgw_user& user, std::string& etag, const std::string& bucket_name, const std::string& bucket_id, + time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, optional_yield y) override; virtual bool erase_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y); virtual void cleaning(const DoutPrefixProvider* dpp) override; @@ -212,9 +214,9 @@ class LRUPolicy : public CachePolicy { virtual int init(CephContext* cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver* _driver) { return 0; } virtual int exist_key(std::string key) override; virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override; - virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y) override; + virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val=empty) override; virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size, - time_t creationTime, const rgw_user& user, std::string& etag, const std::string& bucket_name, const std::string& bucket_id, + time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, optional_yield y) override; virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override; virtual bool erase_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override; diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.cc b/src/rgw/driver/d4n/rgw_sal_d4n.cc index e0b1ac779a7e..28ea81972f12 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.cc +++ b/src/rgw/driver/d4n/rgw_sal_d4n.cc @@ -198,11 +198,11 @@ int D4NFilterObject::copy_object(const ACLOwner& owner, //ATTRSMOD_NONE - the attributes of the source object will be copied without modifications, attrs parameter is ignored if (attrs_mod == rgw::sal::ATTRSMOD_NONE) { baseAttrs = this->get_attrs(); - baseAttrs.erase("user.rgw.version_id"); //delete source version id + baseAttrs.erase(RGW_CACHE_ATTR_VERSION_ID); //delete source version id if (version_id) { bufferlist bl_val; bl_val.append(*version_id); - baseAttrs["user.rgw.version_id"] = std::move(bl_val); //populate destination version id + baseAttrs[RGW_CACHE_ATTR_VERSION_ID] = std::move(bl_val); //populate destination version id } } @@ -275,7 +275,7 @@ int D4NFilterObject::copy_object(const ACLOwner& owner, } bufferlist bl_val; bl_val.append(std::to_string(this->is_multipart())); - baseAttrs["user.rgw.multipart"] = std::move(bl_val); + baseAttrs[RGW_CACHE_ATTR_MULTIPART] = std::move(bl_val); bl_val.append(*etag); baseAttrs[RGW_ATTR_ETAG] = std::move(bl_val); baseAttrs[RGW_ATTR_ACL] = std::move(attrs[RGW_ATTR_ACL]); @@ -293,11 +293,13 @@ int D4NFilterObject::copy_object(const ACLOwner& owner, auto ret = driver->get_policy_driver()->get_cache_policy()->eviction(dpp, baseAttrs.size(), y); if (ret == 0) { ret = driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl_data, 0, baseAttrs, y); - baseAttrs.erase("user.rgw.mtime"); - baseAttrs.erase("user.rgw.object_size"); - baseAttrs.erase("user.rgw.accounted_size"); - baseAttrs.erase("user.rgw.epoch"); - baseAttrs.erase("user.rgw.multipart"); + baseAttrs.erase(RGW_CACHE_ATTR_MTIME); + baseAttrs.erase(RGW_CACHE_ATTR_OBJECT_SIZE); + baseAttrs.erase(RGW_CACHE_ATTR_ACCOUNTED_SIZE); + baseAttrs.erase(RGW_CACHE_ATTR_EPOCH); + baseAttrs.erase(RGW_CACHE_ATTR_MULTIPART); + baseAttrs.erase(RGW_CACHE_ATTR_OBJECT_NS); + baseAttrs.erase(RGW_CACHE_ATTR_BUCKET_NAME); if (ret == 0) { ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << dest_version << dendl; bufferlist bl; @@ -388,29 +390,29 @@ int D4NFilterObject::get_obj_attrs_from_cache(const DoutPrefixProvider* dpp, opt std::string instance; for (auto& attr : attrs) { if (attr.second.length() > 0) { - if (attr.first == "user.rgw.mtime") { + if (attr.first == RGW_CACHE_ATTR_MTIME) { ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting mtime." << dendl; auto mtime = ceph::real_clock::from_double(std::stod(attr.second.to_str())); this->set_mtime(mtime); - } else if (attr.first == "user.rgw.object_size") { + } else if (attr.first == RGW_CACHE_ATTR_OBJECT_SIZE) { auto size = std::stoull(attr.second.to_str()); ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting object_size to: " << size << dendl; this->set_obj_size(size); - } else if (attr.first == "user.rgw.accounted_size") { + } else if (attr.first == RGW_CACHE_ATTR_ACCOUNTED_SIZE) { auto accounted_size = std::stoull(attr.second.to_str()); this->set_accounted_size(accounted_size); - } else if (attr.first == "user.rgw.epoch") { + } else if (attr.first == RGW_CACHE_ATTR_EPOCH) { ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting epoch." << dendl; auto epoch = std::stoull(attr.second.to_str()); this->set_epoch(epoch); - } else if (attr.first == "user.rgw.version_id") { + } else if (attr.first == RGW_CACHE_ATTR_VERSION_ID) { instance = attr.second.to_str(); ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting version_id to: " << instance << dendl; - } else if (attr.first == "user.rgw.source_zone") { + } else if (attr.first == RGW_CACHE_ATTR_SOURC_ZONE) { ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting source zone id." << dendl; auto short_zone_id = static_cast(std::stoul(attr.second.to_str())); this->set_short_zone_id(short_zone_id); - } else if (attr.first == "user.rgw.multipart") { + } else if (attr.first == RGW_CACHE_ATTR_MULTIPART) { std::string multipart = attr.second.to_str(); this->multipart = (multipart == "1") ? true : false; ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): is_multipart: " << this->multipart << " multipart: " << multipart << dendl; @@ -420,11 +422,13 @@ int D4NFilterObject::get_obj_attrs_from_cache(const DoutPrefixProvider* dpp, opt }//end-if }//end-for this->set_instance(instance); //set this only after setting object state else it won't take effect - attrs.erase("user.rgw.mtime"); - attrs.erase("user.rgw.object_size"); - attrs.erase("user.rgw.accounted_size"); - attrs.erase("user.rgw.epoch"); - attrs.erase("user.rgw.multipart"); + attrs.erase(RGW_CACHE_ATTR_MTIME); + attrs.erase(RGW_CACHE_ATTR_OBJECT_SIZE); + attrs.erase(RGW_CACHE_ATTR_ACCOUNTED_SIZE); + attrs.erase(RGW_CACHE_ATTR_EPOCH); + attrs.erase(RGW_CACHE_ATTR_MULTIPART); + attrs.erase(RGW_CACHE_ATTR_OBJECT_NS); + attrs.erase(RGW_CACHE_ATTR_BUCKET_NAME); /* Set attributes locally */ auto ret = this->set_attrs(attrs); if (ret < 0) { @@ -466,24 +470,30 @@ void D4NFilterObject::set_attrs_from_obj_state(const DoutPrefixProvider* dpp, op { bufferlist bl_val; bl_val.append(std::to_string(this->get_size())); - attrs["user.rgw.object_size"] = std::move(bl_val); + attrs[RGW_CACHE_ATTR_OBJECT_SIZE] = std::move(bl_val); bl_val.append(std::to_string(this->get_epoch())); - attrs["user.rgw.epoch"] = std::move(bl_val); + attrs[RGW_CACHE_ATTR_EPOCH] = std::move(bl_val); bl_val.append(std::to_string(ceph::real_clock::to_double(this->get_mtime()))); - attrs["user.rgw.mtime"] = std::move(bl_val); + attrs[RGW_CACHE_ATTR_MTIME] = std::move(bl_val); if(this->have_instance()) { bl_val.append(this->get_instance()); - attrs["user.rgw.version_id"] = std::move(bl_val); + attrs[RGW_CACHE_ATTR_VERSION_ID] = std::move(bl_val); } bl_val.append(std::to_string(this->get_short_zone_id())); - attrs["user.rgw.source_zone"] = std::move(bl_val); + attrs[RGW_CACHE_ATTR_SOURC_ZONE] = std::move(bl_val); bl_val.append(std::to_string(this->get_accounted_size())); - attrs["user.rgw.accounted_size"] = std::move(bl_val); // will this get updated? + attrs[RGW_CACHE_ATTR_ACCOUNTED_SIZE] = std::move(bl_val); // will this get updated? + + bl_val.append(this->get_key().ns); + attrs[RGW_CACHE_ATTR_OBJECT_NS] = std::move(bl_val); + + bl_val.append(this->get_bucket()->get_name()); + attrs[RGW_CACHE_ATTR_BUCKET_NAME] = std::move(bl_val); return; } @@ -731,6 +741,12 @@ bool D4NFilterObject::check_head_exists_in_cache_get_oid(const DoutPrefixProvide found_in_cache = false; ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver get_attrs method failed." << dendl; } + std::string key = head_oid_in_cache; + if (block.cacheObj.dirty) { + // Remove dirty prefix + key = key.erase(0, 2); + } + this->driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, 0, version, block.cacheObj.dirty, y); } else if (ret == -ENOENT) { //if blockDir->get found_in_cache = false; } else { @@ -1925,8 +1941,8 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp } std::string size; - if (attrs.find("user.rgw.object_size") != attrs.end()) { - size = attrs.find("user.rgw.object_size")->second.to_str(); + if (attrs.find(RGW_CACHE_ATTR_OBJECT_SIZE) != attrs.end()) { + size = attrs.find(RGW_CACHE_ATTR_OBJECT_SIZE)->second.to_str(); } else { ldpp_dout(dpp, 0) << "Failed to retrieve size for for: " << block.cacheObj.objName << ", ret=" << ret << dendl; return -EINVAL; @@ -2017,7 +2033,6 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp int D4NFilterWriter::prepare(optional_yield y) { - startTime = time(NULL); d4n_writecache = g_conf()->d4n_writecache_enabled; if (!d4n_writecache) { @@ -2107,7 +2122,6 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag, { bool dirty = false; std::unordered_set hostsList = {}; - auto creationTime = startTime; std::string objEtag = etag; auto size = object->get_size(); std::string instance; @@ -2180,6 +2194,9 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag, ceph::real_time m_time; dirty = true; if (mtime) { + if (real_clock::is_zero(*mtime)) { + *mtime = real_clock::now(); + } m_time = *mtime; } else { m_time = real_clock::now(); @@ -2225,11 +2242,13 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag, ret = driver->get_policy_driver()->get_cache_policy()->eviction(dpp, attrs.size(), y); if (ret == 0) { ret = driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl, 0, attrs, y); - attrs.erase("user.rgw.mtime"); - attrs.erase("user.rgw.object_size"); - attrs.erase("user.rgw.accounted_size"); - attrs.erase("user.rgw.epoch"); - attrs.erase("user.rgw.multipart"); + attrs.erase(RGW_CACHE_ATTR_MTIME); + attrs.erase(RGW_CACHE_ATTR_OBJECT_SIZE); + attrs.erase(RGW_CACHE_ATTR_ACCOUNTED_SIZE); + attrs.erase(RGW_CACHE_ATTR_EPOCH); + attrs.erase(RGW_CACHE_ATTR_MULTIPART); + attrs.erase(RGW_CACHE_ATTR_OBJECT_NS); + attrs.erase(RGW_CACHE_ATTR_BUCKET_NAME); object->set_object_version(version); if (ret == 0) { ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << "(): version stored in update method is: " << version << dendl; @@ -2242,6 +2261,7 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag, if (dirty) { //using object oid here so that version is automatically picked for versioned buckets, and for non-versioned buckets the old version is replaced by the latest version std::string object_key = obj->get_bucket()->get_bucket_id() + "_" + obj->get_oid(); + auto creationTime = ceph::real_clock::to_time_t(object->get_mtime()); ldpp_dout(dpp, 16) << "D4NFilterWriter::" << __func__ << "(): object_key=" << object_key << dendl; driver->get_policy_driver()->get_cache_policy()->update_dirty_object(dpp, object_key, version, dirty, accounted_size, creationTime, std::get(obj->get_bucket()->get_owner()), objEtag, obj->get_bucket()->get_name(), obj->get_bucket()->get_bucket_id(), obj->get_key(), y); } @@ -2283,7 +2303,7 @@ int D4NFilterMultipartUpload::complete(const DoutPrefixProvider *dpp, bufferlist bl_val; bool is_multipart = true; bl_val.append(std::to_string(is_multipart)); - attrs["user.rgw.multipart"] = std::move(bl_val); + attrs[RGW_CACHE_ATTR_MULTIPART] = std::move(bl_val); std::string version; d4n_target_obj->calculate_version(dpp, y, version, attrs); diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.h b/src/rgw/driver/d4n/rgw_sal_d4n.h index 4d76df0ded4c..48e364a66acd 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.h +++ b/src/rgw/driver/d4n/rgw_sal_d4n.h @@ -284,7 +284,6 @@ class D4NFilterWriter : public FilterWriter { bool atomic; optional_yield y; bool d4n_writecache; - time_t startTime; std::string version; public: diff --git a/src/rgw/rgw_cache_driver.h b/src/rgw/rgw_cache_driver.h index 50886e51d430..8c3491666f36 100644 --- a/src/rgw/rgw_cache_driver.h +++ b/src/rgw/rgw_cache_driver.h @@ -3,13 +3,31 @@ #include "rgw_common.h" #include "rgw_aio.h" +constexpr char RGW_CACHE_ATTR_MTIME[] = "user.rgw.mtime"; +constexpr char RGW_CACHE_ATTR_EPOCH[] = "user.rgw.epoch"; +constexpr char RGW_CACHE_ATTR_OBJECT_SIZE[] = "user.rgw.object_size"; +constexpr char RGW_CACHE_ATTR_ACCOUNTED_SIZE[] = "user.rgw.accounted_size"; +constexpr char RGW_CACHE_ATTR_MULTIPART[] = "user.rgw.multipart"; +constexpr char RGW_CACHE_ATTR_OBJECT_NS[] = "user.rgw.object_ns"; +constexpr char RGW_CACHE_ATTR_BUCKET_NAME[] = "user.rgw.bucket_name"; +constexpr char RGW_CACHE_ATTR_VERSION_ID[] = "user.rgw.version_id"; +constexpr char RGW_CACHE_ATTR_SOURC_ZONE[] = "user.rgw.source_zone"; +constexpr char RGW_CACHE_ATTR_LOCAL_WEIGHT[] = "user.rgw.localWeight"; + namespace rgw { namespace cache { +typedef std::function ObjectDataCallback; + +typedef std::function BlockDataCallback; + struct Partition { - std::string name; - std::string type; - std::string location; - uint64_t size; + std::string name; + std::string type; + std::string location; + uint64_t size; }; class CacheDriver { @@ -36,6 +54,9 @@ class CacheDriver { /* Partition */ virtual Partition get_current_partition_info(const DoutPrefixProvider* dpp) = 0; virtual uint64_t get_free_space(const DoutPrefixProvider* dpp) = 0; + + /* Data Recovery from Cache */ + virtual int restore_blocks_objects(const DoutPrefixProvider* dpp, ObjectDataCallback obj_func, BlockDataCallback block_func) = 0; }; } } // namespace rgw::cache diff --git a/src/rgw/rgw_redis_driver.h b/src/rgw/rgw_redis_driver.h index 8496a4388751..e100976cace3 100644 --- a/src/rgw/rgw_redis_driver.h +++ b/src/rgw/rgw_redis_driver.h @@ -47,7 +47,8 @@ class RedisDriver : public CacheDriver { virtual int set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attr_val, optional_yield y) override; virtual int get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, std::string& attr_val, optional_yield y) override; void shutdown(); - + + virtual int restore_blocks_objects(const DoutPrefixProvider* dpp, ObjectDataCallback obj_func, BlockDataCallback block_func) override { return 0; } private: std::shared_ptr conn; Partition partition_info; diff --git a/src/rgw/rgw_ssd_driver.cc b/src/rgw/rgw_ssd_driver.cc index b68530efdc3b..83647c320295 100644 --- a/src/rgw/rgw_ssd_driver.cc +++ b/src/rgw/rgw_ssd_driver.cc @@ -13,8 +13,6 @@ namespace efs = std::filesystem; namespace rgw { namespace cache { -constexpr std::string_view ATTR_PREFIX = "user.rgw."; - int SSDDriver::initialize(const DoutPrefixProvider* dpp) { if(partition_info.location.back() != '/') { @@ -92,6 +90,161 @@ int SSDDriver::initialize(const DoutPrefixProvider* dpp) return 0; } +int SSDDriver::restore_blocks_objects(const DoutPrefixProvider* dpp, ObjectDataCallback obj_func, BlockDataCallback block_func) +{ + if (dpp->get_cct()->_conf->rgw_d4n_l1_evict_cache_on_start) { + return 0; //don't do anything as the cache directory must have been evicted during start-up + } + for (auto const& dir_entry : std::filesystem::directory_iterator{partition_info.location}) { + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): dir_entry.path: " << dir_entry.path() << dendl; + std::string file_name = dir_entry.path().filename(); + std::vector parts; + std::string part; + bool parsed = false; + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): filename: " << file_name << dendl; + try { + std::stringstream ss(file_name); + while (std::getline(ss, part, '_')) { + parts.push_back(part); + } + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): parts.size(): " << parts.size() << dendl; + //non-dirty or clean blocks - bucket_id, version, object_name in head block and offset, len in data blocks + if (parts.size() == 3 || parts.size() == 5) { + std::string key = file_name; + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): file_name: " << file_name << dendl; + + std::string version = parts[1]; + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): version: " << version << dendl; + + uint64_t offset = 0, len = 0; + if (parts.size() == 5) { + offset = std::stoull(parts[3]); + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): offset: " << offset << dendl; + + len = std::stoull(parts[4]); + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): len: " << len << dendl; + } + std::string localWeightStr; + auto ret = get_attr(dpp, file_name, RGW_CACHE_ATTR_LOCAL_WEIGHT, localWeightStr, null_yield); + if (ret < 0) { + ldpp_dout(dpp, 0) << "SSDCache: " << __func__ << "(): Failed to get attr: " << RGW_CACHE_ATTR_LOCAL_WEIGHT << dendl; + } else { + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): localWeightStr: " << localWeightStr << dendl; + } + block_func(dpp, key, offset, len, version, false, null_yield, localWeightStr); + parsed = true; + } + //dirty blocks - "D", bucket_id, version, object_name in head block and offset, len in data blocks + if ((parts.size() == 4 || parts.size() == 6) && parts[0] == "D") { + std::string prefix = "D_"; + if (file_name.starts_with(prefix)) { + std::string key = file_name.substr(prefix.length()); + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): key: " << key << dendl; + + bool dirty = true; + + std::string bucket_id = parts[1]; + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): bucket_id: " << bucket_id << dendl; + + std::string version = parts[2]; + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): version: " << version << dendl; + + std::string obj_name = parts[3]; + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): obj_name: " << obj_name << dendl; + + uint64_t len = 0, offset = 0; + std::string localWeightStr; + if (parts.size() == 4) { + rgw::sal::Attrs attrs; + get_attrs(dpp, file_name, attrs, null_yield); + std::string etag, bucket_name; + uint64_t size = 0; + time_t creationTime = time_t(nullptr); + rgw_user user; + rgw_obj_key obj_key; + if (attrs.find(RGW_ATTR_ETAG) != attrs.end()) { + etag = attrs[RGW_ATTR_ETAG].to_str(); + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): etag: " << etag << dendl; + } + if (attrs.find(RGW_CACHE_ATTR_OBJECT_SIZE) != attrs.end()) { + size = std::stoull(attrs[RGW_CACHE_ATTR_OBJECT_SIZE].to_str()); + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): size: " << size << dendl; + } + if (attrs.find(RGW_CACHE_ATTR_MTIME) != attrs.end()) { + creationTime = ceph::real_clock::to_time_t(ceph::real_clock::from_double(std::stod(attrs[RGW_CACHE_ATTR_MTIME].to_str()))); + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): creationTime: " << creationTime << dendl; + } + if (attrs.find(RGW_ATTR_ACL) != attrs.end()) { + bufferlist bl_acl = attrs[RGW_ATTR_ACL]; + RGWAccessControlPolicy policy; + auto iter = bl_acl.cbegin(); + try { + policy.decode(iter); + } catch (buffer::error& err) { + ldpp_dout(dpp, 0) << "ERROR: could not decode policy, caught buffer::error" << dendl; + continue; + } + user = std::get(policy.get_owner().id); + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): rgw_user: " << user.to_str() << dendl; + } + obj_key.name = obj_name; + if (attrs.find(RGW_CACHE_ATTR_VERSION_ID) != attrs.end()) { + std::string instance = attrs[RGW_CACHE_ATTR_VERSION_ID].to_str(); + if (instance != "null") { + obj_key.instance = instance; + } + } + if (attrs.find(RGW_CACHE_ATTR_OBJECT_NS) != attrs.end()) { + obj_key.ns = attrs[RGW_CACHE_ATTR_OBJECT_NS].to_str(); + } + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): rgw_obj_key: " << obj_key.get_oid() << dendl; + if (attrs.find(RGW_CACHE_ATTR_BUCKET_NAME) != attrs.end()) { + bucket_name = attrs[RGW_CACHE_ATTR_BUCKET_NAME].to_str(); + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): bucket_name: " << bucket_name << dendl; + } + + if (attrs.find(RGW_CACHE_ATTR_LOCAL_WEIGHT) != attrs.end()) { + localWeightStr = attrs[RGW_CACHE_ATTR_LOCAL_WEIGHT].to_str(); + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): localWeightStr: " << localWeightStr << dendl; + } + + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): calling func for: " << key << dendl; + obj_func(dpp, key, version, dirty, size, creationTime, user, etag, bucket_name, bucket_id, obj_key, null_yield); + block_func(dpp, key, offset, len, version, dirty, null_yield, localWeightStr); + parsed = true; + } //end-if part.size() == 4 + if (parts.size() == 6) { + offset = std::stoull(parts[4]); + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): offset: " << offset << dendl; + + len = std::stoull(parts[5]); + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): len: " << len << dendl; + std::string localWeightStr; + auto ret = get_attr(dpp, file_name, RGW_CACHE_ATTR_LOCAL_WEIGHT, localWeightStr, null_yield); + if (ret < 0) { + ldpp_dout(dpp, 0) << "SSDCache: " << __func__ << "(): Failed to get attr: " << RGW_CACHE_ATTR_LOCAL_WEIGHT << dendl; + } else { + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): localWeightStr: " << localWeightStr << dendl; + } + block_func(dpp, key, offset, len, version, dirty, null_yield, localWeightStr); + parsed = true; + } + } //end-if file_name.starts_with + } //end-if parts.size() == 4 || parts.size() == 6 + if (!parsed) { + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): Unable to parse file_name: " << file_name << dendl; + continue; + } + }//end-try + catch(...) { + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): Execption while parsing file_name: " << file_name << dendl; + continue; + } + } + + return 0; +} + int SSDDriver::put(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, optional_yield y) { ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): key=" << key << dendl; @@ -526,7 +679,7 @@ int SSDDriver::get_attrs(const DoutPrefixProvider* dpp, const std::string& key, keylen = strlen(keyptr) + 1; std::string attr_name(keyptr); - std::string::size_type prefixloc = attr_name.find(ATTR_PREFIX); + std::string::size_type prefixloc = attr_name.find(RGW_ATTR_PREFIX); buflen -= keylen; keyptr += keylen; if (prefixloc == std::string::npos) { diff --git a/src/rgw/rgw_ssd_driver.h b/src/rgw/rgw_ssd_driver.h index e12d1646dc60..e610e8e3872c 100644 --- a/src/rgw/rgw_ssd_driver.h +++ b/src/rgw/rgw_ssd_driver.h @@ -33,6 +33,8 @@ public: virtual uint64_t get_free_space(const DoutPrefixProvider* dpp) override { return free_space; } void set_free_space(const DoutPrefixProvider* dpp, uint64_t free_space) { this->free_space = free_space; } + virtual int restore_blocks_objects(const DoutPrefixProvider* dpp, ObjectDataCallback obj_func, BlockDataCallback block_func) override; + private: Partition partition_info; uint64_t free_space; diff --git a/src/test/rgw/test_d4n_policy.cc b/src/test/rgw/test_d4n_policy.cc index cc5ee935eafb..f50e42bc9c7a 100644 --- a/src/test/rgw/test_d4n_policy.cc +++ b/src/test/rgw/test_d4n_policy.cc @@ -179,7 +179,7 @@ TEST_F(LFUDAPolicyFixture, LocalGetBlockYield) boost::system::error_code ec; request req; - req.push("HGET", "RedisCache/testBucket_testName_0_0", "user.rgw.localWeight"); + req.push("HGET", "RedisCache/testBucket_testName_0_0", RGW_CACHE_ATTR_LOCAL_WEIGHT); req.push("FLUSHALL"); response resp;