int LFUDAPolicy::init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver *_driver) {
response<int, int, int, int> resp;
+ static auto obj_callback = [this](
+ const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size,
+ time_t creationTime, const rgw_user user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
+ const rgw_obj_key& obj_key, optional_yield y) {
+ update_dirty_object(dpp, key, version, dirty, size, creationTime, user, etag, bucket_name, bucket_id, obj_key, y);
+ };
+
+ static auto block_callback = [this](
+ const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val) {
+ update(dpp, key, offset, len, version, dirty, y, restore_val);
+ };
+
+ cacheDriver->restore_blocks_objects(dpp, obj_callback, block_callback);
driver = _driver;
if (dpp->get_cct()->_conf->d4n_writecache_enabled) {
(*it->second->handle)->localWeight = it->second->localWeight;
entries_heap.decrease(it->second->handle); // larger value means node must be decreased to maintain min heap
- if (int ret = cacheDriver->set_attr(dpp, key, "user.rgw.localWeight", std::to_string(it->second->localWeight), y) < 0) {
+ if (int ret = cacheDriver->set_attr(dpp, key, RGW_CACHE_ATTR_LOCAL_WEIGHT, std::to_string(it->second->localWeight), y) < 0) {
delete victim;
return ret;
}
return 0;
}
-void LFUDAPolicy::update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y)
+void LFUDAPolicy::update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val)
{
+ ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): updating entry: " << key << dendl;
using handle_type = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>::handle_type;
const std::lock_guard l(lfuda_lock);
int localWeight = age;
auto entry = find_entry(key);
bool updateLocalWeight = true;
+
+ std::string oid_in_cache = key;
+ if (dirty == true) {
+ oid_in_cache = "D_" + key;
+ }
+
+ if (!restore_val.empty()) {
+ updateLocalWeight = false;
+ localWeight = std::stoull(restore_val);
+ ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): restored localWeight is: " << localWeight << dendl;
+ }
+
// check the dirty flag in the existing entry for the key and the incoming dirty flag. If the
// incoming dirty flag is false, that means update() is invoked as part of cleaning process,
// so we must not update its localWeight.
e->set_handle(handle);
entries_map.emplace(key, e);
- std::string oid_in_cache = key;
- if (dirty == true) {
- oid_in_cache = "D_" + key;
- }
-
if (updateLocalWeight) {
int ret = -1;
- if ((ret = cacheDriver->set_attr(dpp, oid_in_cache, "user.rgw.localWeight", std::to_string(localWeight), y)) < 0)
+ if ((ret = cacheDriver->set_attr(dpp, oid_in_cache, RGW_CACHE_ATTR_LOCAL_WEIGHT, std::to_string(localWeight), y)) < 0)
ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): CacheDriver set_attr method failed, ret=" << ret << dendl;
}
weightSum += ((localWeight < 0) ? 0 : localWeight);
}
-void LFUDAPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size, time_t creationTime, const rgw_user& user, std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, optional_yield y)
+void LFUDAPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, optional_yield y)
{
using handle_type = boost::heap::fibonacci_heap<LFUDAObjEntry*, boost::heap::compare<ObjectComparator<LFUDAObjEntry>>>::handle_type;
- ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Before acquiring lock." << dendl;
+ ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Before acquiring lock, adding entry: " << key << dendl;
const std::lock_guard l(lfuda_cleaning_lock);
LFUDAObjEntry* e = new LFUDAObjEntry{key, version, dirty, size, creationTime, user, etag, bucket_name, bucket_id, obj_key};
handle_type handle = object_heap.push(e);
ldpp_dout(dpp, 10) <<__LINE__ << " " << __func__ << "(): e->key=" << e->key << dendl;
ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->dirty=" << e->dirty << dendl;
ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->version=" << e->version << dendl;
+ ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->bucket_name=" << e->bucket_name << dendl;
+ ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->bucket_id=" << e->bucket_id << dendl;
+ ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->user=" << e->user << dendl;
+ ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->obj_key=" << e->obj_key << dendl;
l.unlock();
if (!e->key.empty() && (e->dirty == true) && (std::difftime(time(NULL), e->creationTime) > interval)) { //if block is dirty and written more than interval seconds ago
rgw_user c_rgw_user = e->user;
ldpp_dout(dpp, 10) << __func__ << "(): new_head_oid_in_cache=" << new_head_oid_in_cache << dendl;
bufferlist bl;
cacheDriver->get_attrs(dpp, head_oid_in_cache, obj_attrs, null_yield); //get obj attrs from head
- obj_attrs.erase("user.rgw.mtime");
- obj_attrs.erase("user.rgw.object_size");
- obj_attrs.erase("user.rgw.accounted_size");
- obj_attrs.erase("user.rgw.epoch");
+ obj_attrs.erase(RGW_CACHE_ATTR_MTIME);
+ obj_attrs.erase(RGW_CACHE_ATTR_OBJECT_SIZE);
+ obj_attrs.erase(RGW_CACHE_ATTR_ACCOUNTED_SIZE);
+ obj_attrs.erase(RGW_CACHE_ATTR_EPOCH);
do {
ceph::bufferlist data;
return 0;
}
-void LRUPolicy::update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y)
+void LRUPolicy::update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val)
{
const std::lock_guard l(lru_lock);
_erase(dpp, key, y);
namespace asio = boost::asio;
namespace sys = boost::system;
+static std::string empty = std::string();
+
class CachePolicy {
protected:
struct Entry : public boost::intrusive::list_base_hook<> {
rgw_obj_key obj_key;
ObjEntry() = default;
ObjEntry(const std::string& key, const std::string& version, bool dirty, uint64_t size,
- time_t creationTime, rgw_user user, std::string& etag,
+ time_t creationTime, rgw_user user, const std::string& etag,
const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key) : key(key), version(version), dirty(dirty), size(size),
creationTime(creationTime), user(user), etag(etag),
bucket_name(bucket_name), bucket_id(bucket_id), obj_key(obj_key) {}
virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) = 0;
virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y) = 0;
virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size,
- time_t creationTime, const rgw_user& user, std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
+ time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
const rgw_obj_key& obj_key, optional_yield y) = 0;
virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0;
virtual bool erase_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0;
using handle_type = boost::heap::fibonacci_heap<LFUDAObjEntry*, boost::heap::compare<ObjectComparator<LFUDAObjEntry>>>::handle_type;
handle_type handle;
- LFUDAObjEntry(const std::string& key, const std::string& version, bool dirty, uint64_t size,
- time_t creationTime, rgw_user user, std::string& etag,
- const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key) : ObjEntry(key, version, dirty, size,
+ LFUDAObjEntry(const std::string& key, const std::string& version, bool dirty, uint64_t size,
+ time_t creationTime, rgw_user user, const std::string& etag,
+ const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key) : ObjEntry(key, version, dirty, size,
creationTime, user, etag, bucket_name, bucket_id, obj_key) {}
void set_handle(handle_type handle_) { handle = handle_; }
virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver *_driver);
virtual int exist_key(std::string key) override;
virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override;
- virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y) override;
+ virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val=empty) override;
virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
void save_y(optional_yield y) { this->y = y; }
virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size,
- time_t creationTime, const rgw_user& user, std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
+ time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
const rgw_obj_key& obj_key, optional_yield y) override;
virtual bool erase_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y);
virtual void cleaning(const DoutPrefixProvider* dpp) override;
virtual int init(CephContext* cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver* _driver) { return 0; }
virtual int exist_key(std::string key) override;
virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override;
- virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y) override;
+ virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val=empty) override;
virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size,
- time_t creationTime, const rgw_user& user, std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
+ time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
const rgw_obj_key& obj_key, optional_yield y) override;
virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
virtual bool erase_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
//ATTRSMOD_NONE - the attributes of the source object will be copied without modifications, attrs parameter is ignored
if (attrs_mod == rgw::sal::ATTRSMOD_NONE) {
baseAttrs = this->get_attrs();
- baseAttrs.erase("user.rgw.version_id"); //delete source version id
+ baseAttrs.erase(RGW_CACHE_ATTR_VERSION_ID); //delete source version id
if (version_id) {
bufferlist bl_val;
bl_val.append(*version_id);
- baseAttrs["user.rgw.version_id"] = std::move(bl_val); //populate destination version id
+ baseAttrs[RGW_CACHE_ATTR_VERSION_ID] = std::move(bl_val); //populate destination version id
}
}
}
bufferlist bl_val;
bl_val.append(std::to_string(this->is_multipart()));
- baseAttrs["user.rgw.multipart"] = std::move(bl_val);
+ baseAttrs[RGW_CACHE_ATTR_MULTIPART] = std::move(bl_val);
bl_val.append(*etag);
baseAttrs[RGW_ATTR_ETAG] = std::move(bl_val);
baseAttrs[RGW_ATTR_ACL] = std::move(attrs[RGW_ATTR_ACL]);
auto ret = driver->get_policy_driver()->get_cache_policy()->eviction(dpp, baseAttrs.size(), y);
if (ret == 0) {
ret = driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl_data, 0, baseAttrs, y);
- baseAttrs.erase("user.rgw.mtime");
- baseAttrs.erase("user.rgw.object_size");
- baseAttrs.erase("user.rgw.accounted_size");
- baseAttrs.erase("user.rgw.epoch");
- baseAttrs.erase("user.rgw.multipart");
+ baseAttrs.erase(RGW_CACHE_ATTR_MTIME);
+ baseAttrs.erase(RGW_CACHE_ATTR_OBJECT_SIZE);
+ baseAttrs.erase(RGW_CACHE_ATTR_ACCOUNTED_SIZE);
+ baseAttrs.erase(RGW_CACHE_ATTR_EPOCH);
+ baseAttrs.erase(RGW_CACHE_ATTR_MULTIPART);
+ baseAttrs.erase(RGW_CACHE_ATTR_OBJECT_NS);
+ baseAttrs.erase(RGW_CACHE_ATTR_BUCKET_NAME);
if (ret == 0) {
ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << dest_version << dendl;
bufferlist bl;
std::string instance;
for (auto& attr : attrs) {
if (attr.second.length() > 0) {
- if (attr.first == "user.rgw.mtime") {
+ if (attr.first == RGW_CACHE_ATTR_MTIME) {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting mtime." << dendl;
auto mtime = ceph::real_clock::from_double(std::stod(attr.second.to_str()));
this->set_mtime(mtime);
- } else if (attr.first == "user.rgw.object_size") {
+ } else if (attr.first == RGW_CACHE_ATTR_OBJECT_SIZE) {
auto size = std::stoull(attr.second.to_str());
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting object_size to: " << size << dendl;
this->set_obj_size(size);
- } else if (attr.first == "user.rgw.accounted_size") {
+ } else if (attr.first == RGW_CACHE_ATTR_ACCOUNTED_SIZE) {
auto accounted_size = std::stoull(attr.second.to_str());
this->set_accounted_size(accounted_size);
- } else if (attr.first == "user.rgw.epoch") {
+ } else if (attr.first == RGW_CACHE_ATTR_EPOCH) {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting epoch." << dendl;
auto epoch = std::stoull(attr.second.to_str());
this->set_epoch(epoch);
- } else if (attr.first == "user.rgw.version_id") {
+ } else if (attr.first == RGW_CACHE_ATTR_VERSION_ID) {
instance = attr.second.to_str();
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting version_id to: " << instance << dendl;
- } else if (attr.first == "user.rgw.source_zone") {
+ } else if (attr.first == RGW_CACHE_ATTR_SOURC_ZONE) {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting source zone id." << dendl;
auto short_zone_id = static_cast<uint32_t>(std::stoul(attr.second.to_str()));
this->set_short_zone_id(short_zone_id);
- } else if (attr.first == "user.rgw.multipart") {
+ } else if (attr.first == RGW_CACHE_ATTR_MULTIPART) {
std::string multipart = attr.second.to_str();
this->multipart = (multipart == "1") ? true : false;
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): is_multipart: " << this->multipart << " multipart: " << multipart << dendl;
}//end-if
}//end-for
this->set_instance(instance); //set this only after setting object state else it won't take effect
- attrs.erase("user.rgw.mtime");
- attrs.erase("user.rgw.object_size");
- attrs.erase("user.rgw.accounted_size");
- attrs.erase("user.rgw.epoch");
- attrs.erase("user.rgw.multipart");
+ attrs.erase(RGW_CACHE_ATTR_MTIME);
+ attrs.erase(RGW_CACHE_ATTR_OBJECT_SIZE);
+ attrs.erase(RGW_CACHE_ATTR_ACCOUNTED_SIZE);
+ attrs.erase(RGW_CACHE_ATTR_EPOCH);
+ attrs.erase(RGW_CACHE_ATTR_MULTIPART);
+ attrs.erase(RGW_CACHE_ATTR_OBJECT_NS);
+ attrs.erase(RGW_CACHE_ATTR_BUCKET_NAME);
/* Set attributes locally */
auto ret = this->set_attrs(attrs);
if (ret < 0) {
{
bufferlist bl_val;
bl_val.append(std::to_string(this->get_size()));
- attrs["user.rgw.object_size"] = std::move(bl_val);
+ attrs[RGW_CACHE_ATTR_OBJECT_SIZE] = std::move(bl_val);
bl_val.append(std::to_string(this->get_epoch()));
- attrs["user.rgw.epoch"] = std::move(bl_val);
+ attrs[RGW_CACHE_ATTR_EPOCH] = std::move(bl_val);
bl_val.append(std::to_string(ceph::real_clock::to_double(this->get_mtime())));
- attrs["user.rgw.mtime"] = std::move(bl_val);
+ attrs[RGW_CACHE_ATTR_MTIME] = std::move(bl_val);
if(this->have_instance()) {
bl_val.append(this->get_instance());
- attrs["user.rgw.version_id"] = std::move(bl_val);
+ attrs[RGW_CACHE_ATTR_VERSION_ID] = std::move(bl_val);
}
bl_val.append(std::to_string(this->get_short_zone_id()));
- attrs["user.rgw.source_zone"] = std::move(bl_val);
+ attrs[RGW_CACHE_ATTR_SOURC_ZONE] = std::move(bl_val);
bl_val.append(std::to_string(this->get_accounted_size()));
- attrs["user.rgw.accounted_size"] = std::move(bl_val); // will this get updated?
+ attrs[RGW_CACHE_ATTR_ACCOUNTED_SIZE] = std::move(bl_val); // will this get updated?
+
+ bl_val.append(this->get_key().ns);
+ attrs[RGW_CACHE_ATTR_OBJECT_NS] = std::move(bl_val);
+
+ bl_val.append(this->get_bucket()->get_name());
+ attrs[RGW_CACHE_ATTR_BUCKET_NAME] = std::move(bl_val);
return;
}
found_in_cache = false;
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver get_attrs method failed." << dendl;
}
+ std::string key = head_oid_in_cache;
+ if (block.cacheObj.dirty) {
+ // Remove dirty prefix
+ key = key.erase(0, 2);
+ }
+ this->driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, 0, version, block.cacheObj.dirty, y);
} else if (ret == -ENOENT) { //if blockDir->get
found_in_cache = false;
} else {
}
std::string size;
- if (attrs.find("user.rgw.object_size") != attrs.end()) {
- size = attrs.find("user.rgw.object_size")->second.to_str();
+ if (attrs.find(RGW_CACHE_ATTR_OBJECT_SIZE) != attrs.end()) {
+ size = attrs.find(RGW_CACHE_ATTR_OBJECT_SIZE)->second.to_str();
} else {
ldpp_dout(dpp, 0) << "Failed to retrieve size for for: " << block.cacheObj.objName << ", ret=" << ret << dendl;
return -EINVAL;
int D4NFilterWriter::prepare(optional_yield y)
{
- startTime = time(NULL);
d4n_writecache = g_conf()->d4n_writecache_enabled;
if (!d4n_writecache) {
{
bool dirty = false;
std::unordered_set<std::string> hostsList = {};
- auto creationTime = startTime;
std::string objEtag = etag;
auto size = object->get_size();
std::string instance;
ceph::real_time m_time;
dirty = true;
if (mtime) {
+ if (real_clock::is_zero(*mtime)) {
+ *mtime = real_clock::now();
+ }
m_time = *mtime;
} else {
m_time = real_clock::now();
ret = driver->get_policy_driver()->get_cache_policy()->eviction(dpp, attrs.size(), y);
if (ret == 0) {
ret = driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl, 0, attrs, y);
- attrs.erase("user.rgw.mtime");
- attrs.erase("user.rgw.object_size");
- attrs.erase("user.rgw.accounted_size");
- attrs.erase("user.rgw.epoch");
- attrs.erase("user.rgw.multipart");
+ attrs.erase(RGW_CACHE_ATTR_MTIME);
+ attrs.erase(RGW_CACHE_ATTR_OBJECT_SIZE);
+ attrs.erase(RGW_CACHE_ATTR_ACCOUNTED_SIZE);
+ attrs.erase(RGW_CACHE_ATTR_EPOCH);
+ attrs.erase(RGW_CACHE_ATTR_MULTIPART);
+ attrs.erase(RGW_CACHE_ATTR_OBJECT_NS);
+ attrs.erase(RGW_CACHE_ATTR_BUCKET_NAME);
object->set_object_version(version);
if (ret == 0) {
ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << "(): version stored in update method is: " << version << dendl;
if (dirty) {
//using object oid here so that version is automatically picked for versioned buckets, and for non-versioned buckets the old version is replaced by the latest version
std::string object_key = obj->get_bucket()->get_bucket_id() + "_" + obj->get_oid();
+ auto creationTime = ceph::real_clock::to_time_t(object->get_mtime());
ldpp_dout(dpp, 16) << "D4NFilterWriter::" << __func__ << "(): object_key=" << object_key << dendl;
driver->get_policy_driver()->get_cache_policy()->update_dirty_object(dpp, object_key, version, dirty, accounted_size, creationTime, std::get<rgw_user>(obj->get_bucket()->get_owner()), objEtag, obj->get_bucket()->get_name(), obj->get_bucket()->get_bucket_id(), obj->get_key(), y);
}
bufferlist bl_val;
bool is_multipart = true;
bl_val.append(std::to_string(is_multipart));
- attrs["user.rgw.multipart"] = std::move(bl_val);
+ attrs[RGW_CACHE_ATTR_MULTIPART] = std::move(bl_val);
std::string version;
d4n_target_obj->calculate_version(dpp, y, version, attrs);
bool atomic;
optional_yield y;
bool d4n_writecache;
- time_t startTime;
std::string version;
public:
#include "rgw_common.h"
#include "rgw_aio.h"
+constexpr char RGW_CACHE_ATTR_MTIME[] = "user.rgw.mtime";
+constexpr char RGW_CACHE_ATTR_EPOCH[] = "user.rgw.epoch";
+constexpr char RGW_CACHE_ATTR_OBJECT_SIZE[] = "user.rgw.object_size";
+constexpr char RGW_CACHE_ATTR_ACCOUNTED_SIZE[] = "user.rgw.accounted_size";
+constexpr char RGW_CACHE_ATTR_MULTIPART[] = "user.rgw.multipart";
+constexpr char RGW_CACHE_ATTR_OBJECT_NS[] = "user.rgw.object_ns";
+constexpr char RGW_CACHE_ATTR_BUCKET_NAME[] = "user.rgw.bucket_name";
+constexpr char RGW_CACHE_ATTR_VERSION_ID[] = "user.rgw.version_id";
+constexpr char RGW_CACHE_ATTR_SOURC_ZONE[] = "user.rgw.source_zone";
+constexpr char RGW_CACHE_ATTR_LOCAL_WEIGHT[] = "user.rgw.localWeight";
+
namespace rgw { namespace cache {
+typedef std::function<void(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size,
+ time_t creationTime, const rgw_user user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
+ const rgw_obj_key& obj_key, optional_yield y)> ObjectDataCallback;
+
+typedef std::function<void(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version,
+ bool dirty, optional_yield y, std::string& restore_val)> BlockDataCallback;
+
struct Partition {
- std::string name;
- std::string type;
- std::string location;
- uint64_t size;
+ std::string name;
+ std::string type;
+ std::string location;
+ uint64_t size;
};
class CacheDriver {
/* Partition */
virtual Partition get_current_partition_info(const DoutPrefixProvider* dpp) = 0;
virtual uint64_t get_free_space(const DoutPrefixProvider* dpp) = 0;
+
+ /* Data Recovery from Cache */
+ virtual int restore_blocks_objects(const DoutPrefixProvider* dpp, ObjectDataCallback obj_func, BlockDataCallback block_func) = 0;
};
} } // namespace rgw::cache
virtual int set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attr_val, optional_yield y) override;
virtual int get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, std::string& attr_val, optional_yield y) override;
void shutdown();
-
+
+ virtual int restore_blocks_objects(const DoutPrefixProvider* dpp, ObjectDataCallback obj_func, BlockDataCallback block_func) override { return 0; }
private:
std::shared_ptr<connection> conn;
Partition partition_info;
namespace rgw { namespace cache {
-constexpr std::string_view ATTR_PREFIX = "user.rgw.";
-
int SSDDriver::initialize(const DoutPrefixProvider* dpp)
{
if(partition_info.location.back() != '/') {
return 0;
}
+int SSDDriver::restore_blocks_objects(const DoutPrefixProvider* dpp, ObjectDataCallback obj_func, BlockDataCallback block_func)
+{
+ if (dpp->get_cct()->_conf->rgw_d4n_l1_evict_cache_on_start) {
+ return 0; //don't do anything as the cache directory must have been evicted during start-up
+ }
+ for (auto const& dir_entry : std::filesystem::directory_iterator{partition_info.location}) {
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): dir_entry.path: " << dir_entry.path() << dendl;
+ std::string file_name = dir_entry.path().filename();
+ std::vector<std::string> parts;
+ std::string part;
+ bool parsed = false;
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): filename: " << file_name << dendl;
+ try {
+ std::stringstream ss(file_name);
+ while (std::getline(ss, part, '_')) {
+ parts.push_back(part);
+ }
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): parts.size(): " << parts.size() << dendl;
+ //non-dirty or clean blocks - bucket_id, version, object_name in head block and offset, len in data blocks
+ if (parts.size() == 3 || parts.size() == 5) {
+ std::string key = file_name;
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): file_name: " << file_name << dendl;
+
+ std::string version = parts[1];
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): version: " << version << dendl;
+
+ uint64_t offset = 0, len = 0;
+ if (parts.size() == 5) {
+ offset = std::stoull(parts[3]);
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): offset: " << offset << dendl;
+
+ len = std::stoull(parts[4]);
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): len: " << len << dendl;
+ }
+ std::string localWeightStr;
+ auto ret = get_attr(dpp, file_name, RGW_CACHE_ATTR_LOCAL_WEIGHT, localWeightStr, null_yield);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "SSDCache: " << __func__ << "(): Failed to get attr: " << RGW_CACHE_ATTR_LOCAL_WEIGHT << dendl;
+ } else {
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): localWeightStr: " << localWeightStr << dendl;
+ }
+ block_func(dpp, key, offset, len, version, false, null_yield, localWeightStr);
+ parsed = true;
+ }
+ //dirty blocks - "D", bucket_id, version, object_name in head block and offset, len in data blocks
+ if ((parts.size() == 4 || parts.size() == 6) && parts[0] == "D") {
+ std::string prefix = "D_";
+ if (file_name.starts_with(prefix)) {
+ std::string key = file_name.substr(prefix.length());
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): key: " << key << dendl;
+
+ bool dirty = true;
+
+ std::string bucket_id = parts[1];
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): bucket_id: " << bucket_id << dendl;
+
+ std::string version = parts[2];
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): version: " << version << dendl;
+
+ std::string obj_name = parts[3];
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): obj_name: " << obj_name << dendl;
+
+ uint64_t len = 0, offset = 0;
+ std::string localWeightStr;
+ if (parts.size() == 4) {
+ rgw::sal::Attrs attrs;
+ get_attrs(dpp, file_name, attrs, null_yield);
+ std::string etag, bucket_name;
+ uint64_t size = 0;
+ time_t creationTime = time_t(nullptr);
+ rgw_user user;
+ rgw_obj_key obj_key;
+ if (attrs.find(RGW_ATTR_ETAG) != attrs.end()) {
+ etag = attrs[RGW_ATTR_ETAG].to_str();
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): etag: " << etag << dendl;
+ }
+ if (attrs.find(RGW_CACHE_ATTR_OBJECT_SIZE) != attrs.end()) {
+ size = std::stoull(attrs[RGW_CACHE_ATTR_OBJECT_SIZE].to_str());
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): size: " << size << dendl;
+ }
+ if (attrs.find(RGW_CACHE_ATTR_MTIME) != attrs.end()) {
+ creationTime = ceph::real_clock::to_time_t(ceph::real_clock::from_double(std::stod(attrs[RGW_CACHE_ATTR_MTIME].to_str())));
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): creationTime: " << creationTime << dendl;
+ }
+ if (attrs.find(RGW_ATTR_ACL) != attrs.end()) {
+ bufferlist bl_acl = attrs[RGW_ATTR_ACL];
+ RGWAccessControlPolicy policy;
+ auto iter = bl_acl.cbegin();
+ try {
+ policy.decode(iter);
+ } catch (buffer::error& err) {
+ ldpp_dout(dpp, 0) << "ERROR: could not decode policy, caught buffer::error" << dendl;
+ continue;
+ }
+ user = std::get<rgw_user>(policy.get_owner().id);
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): rgw_user: " << user.to_str() << dendl;
+ }
+ obj_key.name = obj_name;
+ if (attrs.find(RGW_CACHE_ATTR_VERSION_ID) != attrs.end()) {
+ std::string instance = attrs[RGW_CACHE_ATTR_VERSION_ID].to_str();
+ if (instance != "null") {
+ obj_key.instance = instance;
+ }
+ }
+ if (attrs.find(RGW_CACHE_ATTR_OBJECT_NS) != attrs.end()) {
+ obj_key.ns = attrs[RGW_CACHE_ATTR_OBJECT_NS].to_str();
+ }
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): rgw_obj_key: " << obj_key.get_oid() << dendl;
+ if (attrs.find(RGW_CACHE_ATTR_BUCKET_NAME) != attrs.end()) {
+ bucket_name = attrs[RGW_CACHE_ATTR_BUCKET_NAME].to_str();
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): bucket_name: " << bucket_name << dendl;
+ }
+
+ if (attrs.find(RGW_CACHE_ATTR_LOCAL_WEIGHT) != attrs.end()) {
+ localWeightStr = attrs[RGW_CACHE_ATTR_LOCAL_WEIGHT].to_str();
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): localWeightStr: " << localWeightStr << dendl;
+ }
+
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): calling func for: " << key << dendl;
+ obj_func(dpp, key, version, dirty, size, creationTime, user, etag, bucket_name, bucket_id, obj_key, null_yield);
+ block_func(dpp, key, offset, len, version, dirty, null_yield, localWeightStr);
+ parsed = true;
+ } //end-if part.size() == 4
+ if (parts.size() == 6) {
+ offset = std::stoull(parts[4]);
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): offset: " << offset << dendl;
+
+ len = std::stoull(parts[5]);
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): len: " << len << dendl;
+ std::string localWeightStr;
+ auto ret = get_attr(dpp, file_name, RGW_CACHE_ATTR_LOCAL_WEIGHT, localWeightStr, null_yield);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "SSDCache: " << __func__ << "(): Failed to get attr: " << RGW_CACHE_ATTR_LOCAL_WEIGHT << dendl;
+ } else {
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): localWeightStr: " << localWeightStr << dendl;
+ }
+ block_func(dpp, key, offset, len, version, dirty, null_yield, localWeightStr);
+ parsed = true;
+ }
+ } //end-if file_name.starts_with
+ } //end-if parts.size() == 4 || parts.size() == 6
+ if (!parsed) {
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): Unable to parse file_name: " << file_name << dendl;
+ continue;
+ }
+ }//end-try
+ catch(...) {
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): Execption while parsing file_name: " << file_name << dendl;
+ continue;
+ }
+ }
+
+ return 0;
+}
+
int SSDDriver::put(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, optional_yield y)
{
ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): key=" << key << dendl;
keylen = strlen(keyptr) + 1;
std::string attr_name(keyptr);
- std::string::size_type prefixloc = attr_name.find(ATTR_PREFIX);
+ std::string::size_type prefixloc = attr_name.find(RGW_ATTR_PREFIX);
buflen -= keylen;
keyptr += keylen;
if (prefixloc == std::string::npos) {
virtual uint64_t get_free_space(const DoutPrefixProvider* dpp) override { return free_space; }
void set_free_space(const DoutPrefixProvider* dpp, uint64_t free_space) { this->free_space = free_space; }
+ virtual int restore_blocks_objects(const DoutPrefixProvider* dpp, ObjectDataCallback obj_func, BlockDataCallback block_func) override;
+
private:
Partition partition_info;
uint64_t free_space;
boost::system::error_code ec;
request req;
- req.push("HGET", "RedisCache/testBucket_testName_0_0", "user.rgw.localWeight");
+ req.push("HGET", "RedisCache/testBucket_testName_0_0", RGW_CACHE_ATTR_LOCAL_WEIGHT);
req.push("FLUSHALL");
response<std::string, boost::redis::ignore_t> resp;