victim->blockID = entries_heap.top()->offset;
victim->size = entries_heap.top()->len;
- if (dir->get(victim, y) < 0) {
+ if (dir->get(dpp, victim, y) < 0) {
return nullptr;
}
while (freeSpace < size) { // TODO: Think about parallel reads and writes; can this turn into an infinite loop?
CacheBlock* victim = get_victim_block(dpp, y);
-
+
if (victim == nullptr) {
ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Could not retrieve victim block." << dendl;
delete victim;
delete victim;
return -ENOENT;
}
-
+ // check dirty flag of entry to be evicted, if the flag is dirty, all entries on the local node are dirty
+ if (it->second->dirty) {
+ ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Top entry in min heap is dirty, no entry is available for eviction!" << dendl;
+ return -ENOENT;
+ }
int avgWeight = weightSum / entries_map.size();
if (victim->hostsList.size() == 1 && victim->hostsList[0] == dir->cct->_conf->rgw_d4n_l1_datacache_address) { /* Last copy */
}
victim->globalWeight = 0;
- if (int ret = dir->update_field(victim, "globalWeight", std::to_string(victim->globalWeight), y) < 0) {
+ auto globalWeight = std::to_string(victim->globalWeight);
+ if (int ret = dir->update_field(dpp, victim, "globalWeight", globalWeight, y) < 0) {
delete victim;
return ret;
}
}
victim->globalWeight += it->second->localWeight;
- if (int ret = dir->update_field(victim, "globalWeight", std::to_string(victim->globalWeight), y) < 0) {
+ auto globalWeight = std::to_string(victim->globalWeight);
+ if (int ret = dir->update_field(dpp, victim, "globalWeight", globalWeight, y) < 0) {
delete victim;
return ret;
}
- if (int ret = dir->remove_host(victim, dir->cct->_conf->rgw_d4n_l1_datacache_address, y) < 0) {
+ if (int ret = dir->remove_host(dpp, victim, dir->cct->_conf->rgw_d4n_l1_datacache_address, y) < 0) {
delete victim;
return ret;
}
return 0;
}
-void LFUDAPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, time_t creationTime, const rgw_user user, optional_yield y)
+void LFUDAPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, optional_yield y)
{
using handle_type = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>::handle_type;
const std::lock_guard l(lfuda_lock);
int localWeight = age;
auto entry = find_entry(key);
- if (entry != nullptr) {
- localWeight = entry->localWeight + age;
+ bool updateLocalWeight = true;
+ // check the dirty flag in the existing entry for the key and the incoming dirty flag. If the
+ // incoming dirty flag is false, that means update() is invoked as part of cleaning process,
+ // so we must not update its localWeight.
+ if (entry != nullptr) {
+ if (entry->dirty && !dirty) {
+ localWeight = entry->localWeight;
+ updateLocalWeight = false;
+ } else {
+ localWeight = entry->localWeight + age;
+ }
}
-
erase(dpp, key, y);
-
- LFUDAEntry *e = new LFUDAEntry(key, offset, len, version, dirty, creationTime, user, localWeight);
+ LFUDAEntry *e = new LFUDAEntry(key, offset, len, version, dirty, localWeight);
handle_type handle = entries_heap.push(e);
e->set_handle(handle);
entries_map.emplace(key, e);
std::string oid_in_cache = key;
- if (dirty == true)
- oid_in_cache = "D_"+key;
+ if (dirty == true) {
+ oid_in_cache = "D_" + key;
+ }
- if (cacheDriver->set_attr(dpp, oid_in_cache, "user.rgw.localWeight", std::to_string(localWeight), y) < 0)
- ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): CacheDriver set_attr method failed." << dendl;
+ if (updateLocalWeight) {
+ if (cacheDriver->set_attr(dpp, oid_in_cache, "user.rgw.localWeight", std::to_string(localWeight), y) < 0)
+ ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): CacheDriver set_attr method failed." << dendl;
+ }
weightSum += ((localWeight < 0) ? 0 : localWeight);
}
-void LFUDAPolicy::updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, optional_yield y)
+void LFUDAPolicy::updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, const std::string& bucket_name, const rgw_obj_key& obj_key, optional_yield y)
{
- eraseObj(dpp, key, y);
-
- const std::lock_guard l(lfuda_lock);
- LFUDAObjEntry *e = new LFUDAObjEntry(key, version, dirty, size, creationTime, user, etag);
+ using handle_type = boost::heap::fibonacci_heap<LFUDAObjEntry*, boost::heap::compare<ObjectComparator<LFUDAObjEntry>>>::handle_type;
+ ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Before acquiring lock." << dendl;
+ const std::lock_guard l(lfuda_cleaning_lock);
+ LFUDAObjEntry *e = new LFUDAObjEntry{key, version, dirty, size, creationTime, user, etag, bucket_name, obj_key};
+ handle_type handle = object_heap.push(e);
+ e->set_handle(handle);
o_entries_map.emplace(key, e);
+ cond.notify_one();
}
-
bool LFUDAPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y)
{
auto p = entries_map.find(key);
bool LFUDAPolicy::eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y)
{
- const std::lock_guard l(lfuda_lock);
+ const std::lock_guard l(lfuda_cleaning_lock);
auto p = o_entries_map.find(key);
if (p == o_entries_map.end()) {
return false;
}
+ object_heap.erase(p->second->handle);
o_entries_map.erase(p);
+ delete p->second;
return true;
}
void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
{
const int interval = cct->_conf->rgw_d4n_cache_cleaning_interval;
- while(true){
+ while(!quit) {
ldpp_dout(dpp, 20) << __func__ << " : " << " Cache cleaning!" << dendl;
std::string name = "";
std::string b_name = "";
std::string key = "";
uint64_t len = 0;
rgw::sal::Attrs obj_attrs;
- int count = 0;
-
- for (auto it = o_entries_map.begin(); it != o_entries_map.end(); it++){
- if ((it->second->dirty == true) && (std::difftime(time(NULL), it->second->creationTime) > interval)){ //if block is dirty and written more than interval seconds ago
- name = it->first;
- rgw_user c_rgw_user = it->second->user;
-
- size_t pos = 0;
- std::string delimiter = "_";
- while ((pos = name.find(delimiter)) != std::string::npos) {
- if (count == 0){
- b_name = name.substr(0, pos);
- name.erase(0, pos + delimiter.length());
- }
- count ++;
- }
- key = name;
-
- //writing data to the backend
- //we need to create an atomic_writer
- rgw_obj_key c_obj_key = rgw_obj_key(key);
- std::unique_ptr<rgw::sal::User> c_user = driver->get_user(c_rgw_user);
-
- std::unique_ptr<rgw::sal::Bucket> c_bucket;
- rgw_bucket c_rgw_bucket = rgw_bucket(c_rgw_user.tenant, b_name, "");
-
- RGWBucketInfo c_bucketinfo;
- c_bucketinfo.bucket = c_rgw_bucket;
- c_bucketinfo.owner = c_rgw_user;
-
-
- int ret = driver->load_bucket(dpp, c_rgw_bucket, &c_bucket, null_yield);
- if (ret < 0) {
- ldpp_dout(dpp, 10) << __func__ << "(): load_bucket() returned ret=" << ret << dendl;
- break;
+
+ ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "" << __LINE__ << "(): Before acquiring cleaning-lock." << dendl;
+ std::unique_lock<std::mutex> l(lfuda_cleaning_lock);
+ LFUDAObjEntry* e;
+ if (object_heap.size() > 0) {
+ e = object_heap.top();
+ } else {
+ cond.wait(l, [this]{ return (!object_heap.empty() || quit); });
+ continue;
+ }
+ ldpp_dout(dpp, 10) <<__LINE__ << " " << __func__ << "(): e->key=" << e->key << dendl;
+ ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->dirty=" << e->dirty << dendl;
+ l.unlock();
+ if (!e->key.empty() && (e->dirty == true) && (std::difftime(time(NULL), e->creationTime) > interval)) { //if block is dirty and written more than interval seconds ago
+ name = e->key;
+ rgw_user c_rgw_user = e->user;
+
+ size_t pos = 0;
+ std::string delimiter = "_";
+ int count = 0;
+ while ((pos = name.find(delimiter)) != std::string::npos) {
+ if (count == 0) {
+ b_name = name.substr(0, pos);
+ ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): b_name=" << b_name << dendl;
+ name.erase(0, pos + delimiter.length());
+ ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): name=" << name << dendl;
+ break;
}
+ count++;
+ ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): count=" << b_name << dendl;
+ }
+ key = name;
+ //writing data to the backend
+ //we need to create an atomic_writer
+ std::unique_ptr<rgw::sal::User> c_user = driver->get_user(c_rgw_user);
+
+ std::unique_ptr<rgw::sal::Bucket> c_bucket;
+ rgw_bucket c_rgw_bucket = rgw_bucket(c_rgw_user.tenant, e->bucket_name, "");
+
+ RGWBucketInfo c_bucketinfo;
+ c_bucketinfo.bucket = c_rgw_bucket;
+ c_bucketinfo.owner = c_rgw_user;
+ int ret = driver->load_bucket(dpp, c_rgw_bucket, &c_bucket, null_yield);
+ if (ret < 0) {
+ ldpp_dout(dpp, 10) << __func__ << "(): load_bucket() returned ret=" << ret << dendl;
+ break;
+ }
+
+ std::unique_ptr<rgw::sal::Object> c_obj = c_bucket->get_object(e->obj_key);
- std::unique_ptr<rgw::sal::Object> c_obj = c_bucket->get_object(c_obj_key);
+ ACLOwner owner{c_user->get_id(), c_user->get_display_name()};
- ACLOwner owner{c_user->get_id(), c_user->get_display_name()};
+ std::unique_ptr<rgw::sal::Writer> processor = driver->get_atomic_writer(dpp,
+ null_yield,
+ c_obj.get(),
+ owner,
+ NULL,
+ 0,
+ "");
- std::unique_ptr<rgw::sal::Writer> processor = driver->get_atomic_writer(dpp,
- null_yield,
- c_obj.get(),
- owner,
- NULL,
- 0,
- "");
+ int op_ret = processor->prepare(null_yield);
+ if (op_ret < 0) {
+ ldpp_dout(dpp, 20) << __func__ << "processor->prepare() returned ret=" << op_ret << dendl;
+ break;
+ }
- int op_ret = processor->prepare(null_yield);
- if (op_ret < 0) {
- ldpp_dout(dpp, 20) << "processor->prepare() returned ret=" << op_ret << dendl;
- break;
- }
+ std::string prefix = b_name + "_" + e->version + "_" + c_obj->get_name();
+ off_t lst = e->size;
+ off_t fst = 0;
+ off_t ofs = 0;
+
+ rgw::sal::DataProcessor *filter = processor.get();
+ std::string head_oid_in_cache = "D_" + prefix;
+ std::string new_head_oid_in_cache = prefix;
+ ldpp_dout(dpp, 10) << __func__ << "(): head_oid_in_cache=" << head_oid_in_cache << dendl;
+ ldpp_dout(dpp, 10) << __func__ << "(): new_head_oid_in_cache=" << new_head_oid_in_cache << dendl;
+ bufferlist bl;
+ cacheDriver->get_attrs(dpp, head_oid_in_cache, obj_attrs, null_yield); //get obj attrs from head
+ obj_attrs.erase("user.rgw.mtime");
+ obj_attrs.erase("user.rgw.object_size");
+ obj_attrs.erase("user.rgw.accounted_size");
+ obj_attrs.erase("user.rgw.epoch");
+
+ do {
+ ceph::bufferlist data;
+ if (fst >= lst){
+ break;
+ }
+ off_t cur_size = std::min<off_t>(fst + cct->_conf->rgw_max_chunk_size, lst);
+ off_t cur_len = cur_size - fst;
+ std::string oid_in_cache = "D_" + prefix + "_" + std::to_string(fst) + "_" + std::to_string(cur_len);
+ ldpp_dout(dpp, 10) << __func__ << "(): oid_in_cache=" << oid_in_cache << dendl;
+ rgw::sal::Attrs attrs;
+ cacheDriver->get(dpp, oid_in_cache, 0, cur_len, data, attrs, null_yield);
+ len = data.length();
+ fst += len;
+
+ if (len == 0) {
+ // TODO: if len of any block is 0 for some reason, we must return from here?
+ break;
+ }
- std::string prefix = b_name+"_"+key;
- off_t lst = it->second->size;
- off_t fst = 0;
- off_t ofs = 0;
+ op_ret = filter->process(std::move(data), ofs);
+ if (op_ret < 0) {
+ ldpp_dout(dpp, 20) << __func__ << "processor->process() returned ret="
+ << op_ret << dendl;
+ return;
+ }
-
- rgw::sal::DataProcessor *filter = processor.get();
- do {
- ceph::bufferlist data;
- if (fst >= lst){
- break;
- }
- off_t cur_size = std::min<off_t>(fst + cct->_conf->rgw_max_chunk_size, lst);
- off_t cur_len = cur_size - fst;
- std::string oid_in_cache = "D_" + prefix + "_" + std::to_string(fst) + "_" + std::to_string(cur_len);
- std::string new_oid_in_cache = prefix + "_" + std::to_string(fst) + "_" + std::to_string(cur_len);
- cacheDriver->get(dpp, oid_in_cache, 0, cur_len, data, obj_attrs, null_yield);
- len = data.length();
- fst += len;
-
- if (len == 0) {
- break;
- }
-
- op_ret = filter->process(std::move(data), ofs);
- if (op_ret < 0) {
- ldpp_dout(dpp, 20) << "processor->process() returned ret="
- << op_ret << dendl;
- return;
- }
-
- rgw::d4n::CacheBlock block;
- block.cacheObj.bucketName = c_obj->get_bucket()->get_name();
- block.cacheObj.objName = c_obj->get_key().get_oid();
- block.size = len;
- block.blockID = ofs;
- op_ret = dir->update_field(&block, "dirty", "false", null_yield);
- if (op_ret < 0) {
- ldpp_dout(dpp, 20) << "updating dirty flag in Block directory failed!" << dendl;
- return;
- }
-
- cacheDriver->rename(dpp, oid_in_cache, new_oid_in_cache, null_yield);
-
- ofs += len;
- } while (len > 0);
-
- op_ret = filter->process({}, ofs);
-
- const req_context rctx{dpp, null_yield, nullptr};
- ceph::real_time mtime = ceph::real_clock::from_time_t(it->second->creationTime);
- op_ret = processor->complete(lst, it->second->etag, &mtime, ceph::real_clock::from_time_t(it->second->creationTime), obj_attrs,
- std::nullopt, ceph::real_time(), nullptr, nullptr,
- nullptr, nullptr, nullptr,
- rctx, rgw::sal::FLAG_LOG_OP);
-
- //data is clean now, updating in-memory metadata
- it->second->dirty = false;
+ ofs += len;
+ } while (len > 0);
+
+ op_ret = filter->process({}, ofs);
+
+ const req_context rctx{dpp, null_yield, nullptr};
+ ceph::real_time mtime = ceph::real_clock::from_time_t(e->creationTime);
+ op_ret = processor->complete(lst, e->etag, &mtime, ceph::real_clock::from_time_t(e->creationTime), obj_attrs,
+ std::nullopt, ceph::real_time(), nullptr, nullptr,
+ nullptr, nullptr, nullptr,
+ rctx, rgw::sal::FLAG_LOG_OP);
+
+ //invoke update() with dirty flag set to false, to update in-memory metadata for each block
+ // reset values
+ lst = e->size;
+ fst = 0;
+ do {
+ if (fst >= lst) {
+ break;
+ }
+ off_t cur_size = std::min<off_t>(fst + cct->_conf->rgw_max_chunk_size, lst);
+ off_t cur_len = cur_size - fst;
+
+ std::string oid_in_cache = "D_" + prefix + "_" + std::to_string(fst) + "_" + std::to_string(cur_len);
+ ldpp_dout(dpp, 20) << __func__ << "(): oid_in_cache =" << oid_in_cache << dendl;
+ std::string new_oid_in_cache = prefix + "_" + std::to_string(fst) + "_" + std::to_string(cur_len);
+ //Rename block to remove "D" prefix
+ cacheDriver->rename(dpp, oid_in_cache, new_oid_in_cache, null_yield);
+ //Update in-memory data structure for each block
+ this->update(dpp, new_oid_in_cache, 0, 0, e->version, false, y);
+
+ rgw::d4n::CacheBlock block;
+ block.cacheObj.bucketName = c_obj->get_bucket()->get_name();
+ block.cacheObj.objName = c_obj->get_key().get_oid();
+ block.size = cur_len;
+ block.blockID = fst;
+ std::string dirty = "false";
+ op_ret = dir->update_field(dpp, &block, "dirty", dirty, null_yield);
+ if (op_ret < 0) {
+ ldpp_dout(dpp, 5) << __func__ << "updating dirty flag in Block directory failed!" << dendl;
+ return;
+ }
+ fst += cur_len;
+ } while(fst < lst);
+
+ cacheDriver->rename(dpp, head_oid_in_cache, new_head_oid_in_cache, null_yield);
+ //data is clean now, updating in-memory metadata for an object
+ e->dirty = false;
+ //invoke update() with dirty flag set to false, to update in-memory metadata for head
+ this->update(dpp, new_head_oid_in_cache, 0, 0, e->version, false, y);
+
+ rgw::d4n::CacheBlock block;
+ block.cacheObj.bucketName = c_obj->get_bucket()->get_name();
+ block.cacheObj.objName = c_obj->get_name();
+ block.size = 0;
+ block.blockID = 0;
+ std::string dirty = "false";
+ op_ret = dir->update_field(dpp, &block, "dirty", dirty, null_yield);
+ if (op_ret < 0) {
+ ldpp_dout(dpp, 20) << __func__ << "updating dirty flag in block directory for head failed!" << dendl;
+ return;
}
- }
- std::this_thread::sleep_for(std::chrono::milliseconds(interval));
- }
+ //remove entry from map and queue, eraseObj locks correctly
+ eraseObj(dpp, e->key, null_yield);
+ } else { //end-if std::difftime(time(NULL), e->creationTime) > interval
+ std::this_thread::sleep_for(std::chrono::milliseconds(interval)); //TODO:: should this time be optimised?
+ }
+ } //end-while true
}
return 0;
}
-void LRUPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, time_t creationTime, const rgw_user user, optional_yield y)
+void LRUPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, optional_yield y)
{
const std::lock_guard l(lru_lock);
_erase(dpp, key, y);
- Entry *e = new Entry(key, offset, len, version, dirty, creationTime, user);
+ Entry* e = new Entry(key, offset, len, version, dirty);
entries_lru_list.push_back(*e);
entries_map.emplace(key, e);
}
-void LRUPolicy::updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, optional_yield y)
+void LRUPolicy::updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, const std::string& bucket_name, const rgw_obj_key& obj_key, optional_yield y)
{
- eraseObj(dpp, key, y);
const std::lock_guard l(lru_lock);
- ObjEntry *e = new ObjEntry(key, version, dirty, size, creationTime, user, etag);
+ ObjEntry* e = new ObjEntry(key, version, dirty, size, creationTime, user, etag, bucket_name, obj_key);
o_entries_map.emplace(key, e);
return;
}
return _erase(dpp, key, y);
}
-bool LRUPolicy::_erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y)
+bool LRUPolicy::eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y)
{
- auto p = entries_map.find(key);
- if (p == entries_map.end()) {
+ const std::lock_guard l(lru_lock);
+ auto p = o_entries_map.find(key);
+ if (p == o_entries_map.end()) {
return false;
}
- entries_map.erase(p);
- entries_lru_list.erase_and_dispose(entries_lru_list.iterator_to(*(p->second)), Entry_delete_disposer());
+ o_entries_map.erase(p);
return true;
}
-bool LRUPolicy::eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y)
+bool LRUPolicy::_erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y)
{
- const std::lock_guard l(lru_lock);
- auto p = o_entries_map.find(key);
- if (p == o_entries_map.end()) {
+ auto p = entries_map.find(key);
+ if (p == entries_map.end()) {
return false;
}
- o_entries_map.erase(p);
+ entries_map.erase(p);
+ entries_lru_list.erase_and_dispose(entries_lru_list.iterator_to(*(p->second)), Entry_delete_disposer());
return true;
}
uint64_t len;
std::string version;
bool dirty;
- time_t creationTime;
- rgw_user user;
- Entry(std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, time_t creationTime, rgw_user user) : key(key), offset(offset),
- len(len), version(version), dirty(dirty), creationTime(creationTime), user(user) {}
+ Entry(std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty) : key(key), offset(offset),
+ len(len), version(version), dirty(dirty) {}
};
//The disposer object function
struct Entry_delete_disposer {
- void operator()(Entry *e) {
+ void operator()(Entry* e) {
delete e;
}
};
- struct ObjEntry : public boost::intrusive::list_base_hook<> {
+ struct ObjEntry {
std::string key;
std::string version;
bool dirty;
time_t creationTime;
rgw_user user;
std::string etag;
- ObjEntry(std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, rgw_user user, std::string& etag) : key(key), version(version), dirty(dirty), size(size), creationTime(creationTime), user(user), etag(etag) {}
- };
-
- struct ObjEntry_delete_disposer {
- void operator()(ObjEntry *e) {
- delete e;
- }
+ std::string bucket_name;
+ rgw_obj_key obj_key;
+ ObjEntry() = default;
+ ObjEntry(std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, rgw_user user, std::string& etag, const std::string& bucket_name, const rgw_obj_key& obj_key) : key(key), version(version), dirty(dirty), size(size), creationTime(creationTime), user(user), etag(etag), bucket_name(bucket_name), obj_key(obj_key) {}
};
public:
CachePolicy() {}
virtual ~CachePolicy() = default;
- virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver *_driver) = 0;
+ virtual int init(CephContext* cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver* _driver) = 0;
virtual int exist_key(std::string key) = 0;
virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) = 0;
- virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, time_t creationTime, const rgw_user user, optional_yield y) = 0;
- virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, optional_yield y) = 0;
+ virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, optional_yield y) = 0;
+ virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, const std::string& bucket_name, const rgw_obj_key& obj_key, optional_yield y) = 0;
virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0;
virtual bool eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0;
virtual void cleaning(const DoutPrefixProvider* dpp) = 0;
template<typename T>
struct EntryComparator {
bool operator()(T* const e1, T* const e2) const {
- return e1->localWeight > e2->localWeight;
+ // order the min heap using localWeight and dirty flag so that dirty blocks are at the bottom
+ if ((e1->dirty && e2->dirty) || (!e1->dirty && !e2->dirty)) {
+ return e1->localWeight > e2->localWeight;
+ } else if (e1->dirty && !e2->dirty){
+ return true;
+ } else if (!e1->dirty && e2->dirty) {
+ return false;
+ } else {
+ return e1->localWeight > e2->localWeight;
+ }
}
};
+ template<typename T>
+ struct ObjectComparator {
+ bool operator()(T* const e1, T* const e2) const {
+ // order the min heap using creationTime
+ return e1->creationTime > e2->creationTime;
+ }
+ };
+
struct LFUDAEntry : public Entry {
int localWeight;
using handle_type = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>::handle_type;
handle_type handle;
- LFUDAEntry(std::string& key, uint64_t offset, uint64_t len, std::string& version, bool dirty, time_t creationTime, rgw_user user, int localWeight) : Entry(key, offset, len, version, dirty, creationTime, user), localWeight(localWeight) {}
+ LFUDAEntry(std::string& key, uint64_t offset, uint64_t len, std::string& version, bool dirty, int localWeight) : Entry(key, offset, len, version, dirty), localWeight(localWeight) {}
- void set_handle(handle_type handle_) { handle = handle_; }
+ void set_handle(handle_type handle_) { handle = handle_; }
};
struct LFUDAObjEntry : public ObjEntry {
- LFUDAObjEntry(std::string& key, std::string& version, bool dirty, uint64_t size, time_t creationTime, rgw_user user, std::string& etag) : ObjEntry(key, version, dirty, size, creationTime, user, etag) {}
+ using handle_type = boost::heap::fibonacci_heap<LFUDAObjEntry*, boost::heap::compare<ObjectComparator<LFUDAObjEntry>>>::handle_type;
+ handle_type handle;
+
+ LFUDAObjEntry(std::string& key, std::string& version, bool dirty, uint64_t size, time_t creationTime, rgw_user user, std::string& etag, const std::string& bucket_name, const rgw_obj_key& obj_key) : ObjEntry(key, version, dirty, size, creationTime, user, etag, bucket_name, obj_key) {}
+
+ void set_handle(handle_type handle_) { handle = handle_; }
};
using Heap = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>;
+ using Object_Heap = boost::heap::fibonacci_heap<LFUDAObjEntry*, boost::heap::compare<ObjectComparator<LFUDAObjEntry>>>;
Heap entries_heap;
+ Object_Heap object_heap; //This heap contains dirty objects ordered by their creation time, used for cleaning method
std::unordered_map<std::string, LFUDAEntry*> entries_map;
- std::unordered_map<std::string, LFUDAObjEntry*> o_entries_map;
+ std::unordered_map<std::string, LFUDAObjEntry*> o_entries_map; //Contains only dirty objects, used for look-up
std::mutex lfuda_lock;
+ std::mutex lfuda_cleaning_lock;
+ std::condition_variable cond;
+ bool quit{false};
int age = 1, weightSum = 0, postedSum = 0;
optional_yield y = null_yield;
BlockDirectory* dir;
rgw::cache::CacheDriver* cacheDriver;
std::optional<asio::steady_timer> rthread_timer;
- rgw::sal::Driver *driver;
+ rgw::sal::Driver* driver;
std::thread tc;
CephContext* cct;
~LFUDAPolicy() {
rthread_stop();
delete dir;
+ std::lock_guard l(lfuda_cleaning_lock);
+ quit = true;
+ cond.notify_all();
}
virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver *_driver);
virtual int exist_key(std::string key) override;
virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override;
- virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, time_t creationTime, const rgw_user user, optional_yield y) override;
+ virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, optional_yield y) override;
virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
void save_y(optional_yield y) { this->y = y; }
- virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, optional_yield y) override;
- virtual bool eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
+ virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, const std::string& bucket_name, const rgw_obj_key& obj_key, optional_yield y) override;
+ virtual bool eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y);
virtual void cleaning(const DoutPrefixProvider* dpp) override;
LFUDAObjEntry* find_obj_entry(const std::string& key) {
auto it = o_entries_map.find(key);
- if (it == o_entries_map.end())
+ if (it == o_entries_map.end()) {
return nullptr;
+ }
return it->second;
}
};
public:
LRUPolicy(rgw::cache::CacheDriver* cacheDriver) : cacheDriver{cacheDriver} {}
- virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver* _driver) { return 0; }
+ virtual int init(CephContext* cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver* _driver) { return 0; }
virtual int exist_key(std::string key) override;
virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override;
- virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, time_t creationTime, const rgw_user user, optional_yield y) override;
- virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, optional_yield y) override;
+ virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, optional_yield y) override;
+ virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, const std::string& bucket_name, const rgw_obj_key& obj_key, optional_yield y) override;
virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
virtual bool eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
virtual void cleaning(const DoutPrefixProvider* dpp) override {}
int D4NFilterObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
Attrs* delattrs, optional_yield y, uint32_t flags)
{
- if (setattrs != NULL) {
- /* Ensure setattrs and delattrs do not overlap */
- if (delattrs != NULL) {
- for (const auto& attr : *delattrs) {
- if (std::find(setattrs->begin(), setattrs->end(), attr) != setattrs->end()) {
- delattrs->erase(std::find(delattrs->begin(), delattrs->end(), attr));
+ rgw::sal::Attrs attrs;
+ std::string head_oid_in_cache;
+ if (check_head_exists_in_cache_get_oid(dpp, head_oid_in_cache, attrs, y)) {
+ if (setattrs != nullptr) {
+ /* Ensure setattrs and delattrs do not overlap */
+ if (delattrs != nullptr) {
+ for (const auto& attr : *delattrs) {
+ if (std::find(setattrs->begin(), setattrs->end(), attr) != setattrs->end()) {
+ delattrs->erase(std::find(delattrs->begin(), delattrs->end(), attr));
+ }
}
}
- }
+ //if set_obj_attrs() can be called to update existing attrs, then update_attrs() need to be called
+ if (auto ret = driver->get_cache_driver()->set_attrs(dpp, head_oid_in_cache, *setattrs, y); ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): CacheDriver set_attrs method failed with ret: " << ret << dendl;
+ return ret;
+ }
+ } //if setattrs != nullptr
- if (driver->get_cache_driver()->set_attrs(dpp, this->get_key().get_oid(), *setattrs, y) < 0)
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver set_attrs method failed." << dendl;
- }
+ if (delattrs != nullptr) {
+ Attrs::iterator attr;
+ Attrs currentattrs = this->get_attrs();
- if (delattrs != NULL) {
- Attrs::iterator attr;
- Attrs currentattrs = this->get_attrs();
+ /* Ensure all delAttrs exist */
+ for (const auto& attr : *delattrs) {
+ if (std::find(currentattrs.begin(), currentattrs.end(), attr) == currentattrs.end()) {
+ delattrs->erase(std::find(delattrs->begin(), delattrs->end(), attr));
+ }
+ }
- /* Ensure all delAttrs exist */
- for (const auto& attr : *delattrs) {
- if (std::find(currentattrs.begin(), currentattrs.end(), attr) == currentattrs.end()) {
- delattrs->erase(std::find(delattrs->begin(), delattrs->end(), attr));
+ if (auto ret = driver->get_cache_driver()->delete_attrs(dpp, head_oid_in_cache, *delattrs, y); ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver delete_attrs method failed with ret: " << ret << dendl;
+ return ret;
}
+ } //if delattrs != nullptr
+ } else {
+ auto ret = next->set_obj_attrs(dpp, setattrs, delattrs, y, flags);
+ if (ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): set_obj_attrs method of backend store failed with ret: " << ret << dendl;
+ return ret;
}
-
- if (driver->get_cache_driver()->delete_attrs(dpp, this->get_key().get_oid(), *delattrs, y) < 0)
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver delete_attrs method failed." << dendl;
}
-
- return next->set_obj_attrs(dpp, setattrs, delattrs, y, flags);
+ return 0;
}
bool D4NFilterObject::get_obj_attrs_from_cache(const DoutPrefixProvider* dpp, optional_yield y)
{
- rgw::d4n::BlockDirectory* blockDir = this->driver->get_block_dir();
- rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
- .objName = this->get_name(),
- .bucketName = this->get_bucket()->get_name(),
- };
-
- rgw::d4n::CacheBlock block = rgw::d4n::CacheBlock{
- .cacheObj = object,
- .blockID = 0,
- .version = version,
- .size = 0
- };
-
- bool found_in_cache = true;
- //if the block corresponding to head object does not exist in directory, implies it is not cached
- if (blockDir->exist_key(&block, y) && (blockDir->get(&block, y) == 0)) {
- rgw::sal::Attrs attrs;
- std::string version = block.version;
- this->set_object_version(version);
- //uniform name for versioned and non-versioned objects, since input for versioned objects might not contain version
- std::string head_oid_in_cache = get_bucket()->get_name() + "_" + version + "_" + get_name();
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Fetching attrs from cache." << dendl;
- auto ret = this->driver->get_cache_driver()->get_attrs(dpp, head_oid_in_cache, attrs, y);
+ std::string head_oid_in_cache;
+ rgw::sal::Attrs attrs;
+ bool found_in_cache = check_head_exists_in_cache_get_oid(dpp, head_oid_in_cache, attrs, y);
+
+ if (found_in_cache) {
+ /* Set metadata locally */
+
+ std::string instance;
+ for (auto& attr : attrs) {
+ if (attr.second.length() > 0) {
+ if (attr.first == "user.rgw.mtime") {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting mtime." << dendl;
+ auto mtime = ceph::real_clock::from_double(std::stod(attr.second.c_str()));
+ this->set_mtime(mtime);
+ } else if (attr.first == "user.rgw.object_size") {
+ auto size = std::stoull(attr.second.c_str());
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting object_size to: " << size << dendl;
+ this->set_obj_size(size);
+ } else if (attr.first == "user.rgw.accounted_size") {
+ auto accounted_size = std::stoull(attr.second.c_str());
+ this->set_accounted_size(accounted_size);
+ } else if (attr.first == "user.rgw.epoch") {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting epoch." << dendl;
+ auto epoch = std::stoull(attr.second.c_str());
+ this->set_epoch(epoch);
+ } else if (attr.first == "user.rgw.version_id") {
+ instance = attr.second.to_str();
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting version_id to: " << instance << dendl;
+ } else if (attr.first == "user.rgw.source_zone") {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting source zone id." << dendl;
+ auto short_zone_id = static_cast<uint32_t>(std::stoul(attr.second.c_str()));
+ this->set_short_zone_id(short_zone_id);
+ } else {
+ ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): Unexpected attribute; not locally set, attr name: " << attr.first << dendl;
+ }
+ }//end-if
+ }//end-for
+ this->set_instance(instance); //set this only after setting object state else it won't take effect
+ attrs.erase("user.rgw.mtime");
+ attrs.erase("user.rgw.object_size");
+ attrs.erase("user.rgw.accounted_size");
+ attrs.erase("user.rgw.epoch");
+ /* Set attributes locally */
+ auto ret = this->set_attrs(attrs);
if (ret < 0) {
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver get_attrs method failed." << dendl;
- found_in_cache = false;
- } else {
- /* Set metadata locally */
- RGWQuotaInfo quota_info;
-
- std::string instance;
- for (auto& attr : attrs) {
- if (attr.second.length() > 0) {
- if (attr.first == "user.rgw.mtime") {
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting mtime." << dendl;
- auto mtime = ceph::real_clock::from_double(std::stod(attr.second.c_str()));
- this->set_mtime(mtime);
- } else if (attr.first == "user.rgw.object_size") {
- auto size = std::stoull(attr.second.c_str());
- this->set_obj_size(size);
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting object_size to: " << size << dendl;
- } else if (attr.first == "user.rgw.accounted_size") {
- auto accounted_size = std::stoull(attr.second.c_str());
- this->set_accounted_size(accounted_size);
- } else if (attr.first == "user.rgw.epoch") {
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting epoch." << dendl;
- auto epoch = std::stoull(attr.second.c_str());
- this->set_epoch(epoch);
- } else if (attr.first == "user.rgw.version_id") {
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting version_id." << dendl;
- instance = attr.second.to_str();
- } else if (attr.first == "user.rgw.source_zone") {
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting source zone id." << dendl;
- auto zone_short_id = static_cast<uint32_t>(std::stoul(attr.second.c_str()));
- this->set_short_zone_id(zone_short_id);
- } else {
- ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): Unexpected attribute; not locally set, attr name: " << attr.first << dendl;
- }
- }//end-if
- }//end-for
- //this->set_obj_state(astate);
- this->set_instance(instance); //set this only after setting object state else it won't take effect
- attrs.erase("user.rgw.mtime");
- attrs.erase("user.rgw.object_size");
- attrs.erase("user.rgw.accounted_size");
- attrs.erase("user.rgw.epoch");
- /* Set attributes locally */
- ret = this->set_attrs(attrs);
- if (ret < 0) {
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): D4NFilterObject set_attrs method failed." << dendl;
- }
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): D4NFilterObject set_attrs method failed." << dendl;
}
- } else {
- found_in_cache = false;
- }
+ } // if found_in_cache = true
return found_in_cache;
}
return 0;
}
-int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optional_yield y)
+int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optional_yield y, bool dirty)
+{
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): object name: " << this->get_name() << " bucket name: " << this->get_bucket()->get_name() << dendl;
+ rgw::d4n::BlockDirectory* blockDir = this->driver->get_block_dir();
+ rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
+ .objName = this->get_name(),
+ .bucketName = this->get_bucket()->get_name(),
+ .dirty = dirty,
+ .hostsList = { driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address },
+ };
+
+ rgw::d4n::CacheBlock block = rgw::d4n::CacheBlock{
+ .cacheObj = object,
+ .blockID = 0,
+ .version = this->get_object_version(),
+ .dirty = dirty,
+ .size = 0,
+ .hostsList = { driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address },
+ };
+
+ auto ret = blockDir->set(dpp, &block, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
+ }
+
+ return ret;
+}
+
+bool D4NFilterObject::check_head_exists_in_cache_get_oid(const DoutPrefixProvider* dpp, std::string& head_oid_in_cache, rgw::sal::Attrs& attrs, optional_yield y)
{
rgw::d4n::BlockDirectory* blockDir = this->driver->get_block_dir();
rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
rgw::d4n::CacheBlock block = rgw::d4n::CacheBlock{
.cacheObj = object,
.blockID = 0,
- .version = this->get_object_version(),
.size = 0
};
- auto ret = blockDir->set(&block, y);
- if (ret < 0) {
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
+ bool found_in_cache = true;
+ //if the block corresponding to head object does not exist in directory, implies it is not cached
+ if (blockDir->exist_key(dpp, &block, y) && (blockDir->get(dpp, &block, y) == 0)) {
+ std::string version;
+ if (have_instance()) {
+ version = get_instance();
+ } else {
+ version = block.version;
+ }
+ this->set_object_version(version);
+ //uniform name for versioned and non-versioned objects, since input for versioned objects might not contain version
+ head_oid_in_cache = get_bucket()->get_name() + "_" + version + "_" + get_name();
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Fetching attrs from cache for head obj id: " << head_oid_in_cache << dendl;
+ auto ret = this->driver->get_cache_driver()->get_attrs(dpp, head_oid_in_cache, attrs, y);
+ if (ret < 0) {
+ //check for dirty blocks also
+ head_oid_in_cache = "D_" + get_bucket()->get_name() + "_" + version + "_" + get_name();
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Fetching attrs from cache for head obj id: " << head_oid_in_cache << dendl;
+ ret = this->driver->get_cache_driver()->get_attrs(dpp, head_oid_in_cache, attrs, y);
+ if (ret < 0) {
+ found_in_cache = false;
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver get_attrs method failed." << dendl;
+ }
+ }
+ } else { //if blockDir->exist_key
+ found_in_cache = false;
}
-
- return ret;
+ return found_in_cache;
}
int D4NFilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp,
std::string version;
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Fetching attrs from backend store." << dendl;
auto ret = next->get_obj_attrs(y, dpp, target_obj);
- if (ret < 0) {
+ if (ret < 0 || !target_obj) {
+ if (!target_obj) {
+ ret = -ENOENT;
+ }
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Failed to fetching attrs from backend store with ret: " << ret << dendl;
return ret;
}
}
head_oid_in_cache = this->get_bucket()->get_name() + "_" + version + "_" + this->get_name();
- ret = this->driver->get_cache_driver()->set_attrs(dpp, head_oid_in_cache, attrs, y);
+ if (this->driver->get_policy_driver()->get_cache_policy()->exist_key(head_oid_in_cache) > 0) {
+ ret = this->driver->get_cache_driver()->set_attrs(dpp, head_oid_in_cache, attrs, y);
+ } else {
+ ret = this->driver->get_policy_driver()->get_cache_policy()->eviction(dpp, attrs.size(), y);
+ if (ret == 0) {
+ bufferlist bl;
+ ret = this->driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl, 0, attrs, y);
+ } else {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Failed to evict data with err: " << ret << dendl;
+ }
+ }
if (ret == 0) {
ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << this->get_object_version() << dendl;
- time_t creationTime = ceph::real_clock::to_time_t(this->get_mtime());
- this->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, 0, version, false, creationTime, get<rgw_user>(this->get_bucket()->get_owner()), y);
+ this->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, 0, version, false, y);
ret = set_head_obj_dir_entry(dpp, y);
if (ret < 0) {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
}
} else {
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): failed to cache head object in block dir with error: " << ret << dendl;
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): failed to cache head object in cache backend with error: " << ret << dendl;
}
}
{
Attrs update;
update[(std::string)attr_name] = attr_val;
-
- if (driver->get_cache_driver()->update_attrs(dpp, this->get_key().get_oid(), update, y) < 0)
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver update_attrs method failed." << dendl;
-
- return next->modify_obj_attrs(attr_name, attr_val, y, dpp, flags);
+ std::string head_oid_in_cache;
+ rgw::sal::Attrs attrs;
+ if (check_head_exists_in_cache_get_oid(dpp, head_oid_in_cache, attrs, y)) {
+ if (auto ret = driver->get_cache_driver()->update_attrs(dpp, head_oid_in_cache, update, y); ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): CacheDriver update_attrs method failed with ret: " << ret << dendl;
+ return ret;
+ }
+ } else {
+ auto ret = next->modify_obj_attrs(attr_name, attr_val, y, dpp, flags);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): modify_obj_attrs of backend store failed with ret: " << ret << dendl;
+ return ret;
+ }
+ }
+ return 0;
}
int D4NFilterObject::delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name,
optional_yield y)
{
buffer::list bl;
+ std::string head_oid_in_cache;
+ rgw::sal::Attrs attrs;
Attrs delattr;
- delattr.insert({attr_name, bl});
- Attrs currentattrs = this->get_attrs();
- rgw::sal::Attrs::iterator attr = delattr.begin();
+ if (check_head_exists_in_cache_get_oid(dpp, head_oid_in_cache, attrs, y)) {
+ delattr.insert({attr_name, bl});
+ Attrs currentattrs = this->get_attrs();
+ rgw::sal::Attrs::iterator attr = delattr.begin();
- /* Ensure delAttr exists */
- if (std::find_if(currentattrs.begin(), currentattrs.end(),
- [&](const auto& pair) { return pair.first == attr->first; }) != currentattrs.end()) {
+ /* Ensure delAttr exists */
+ if (std::find_if(currentattrs.begin(), currentattrs.end(),
+ [&](const auto& pair) { return pair.first == attr->first; }) != currentattrs.end()) {
- if (driver->get_cache_driver()->delete_attrs(dpp, this->get_key().get_oid(), delattr, y) < 0)
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver delete_attrs method failed." << dendl;
- } else
- return next->delete_obj_attrs(dpp, attr_name, y);
+ if (auto ret = driver->get_cache_driver()->delete_attrs(dpp, head_oid_in_cache, delattr, y); ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): CacheDriver delete_attrs method failed with ret: " << ret << dendl;
+ return ret;
+ }
+ }
+ } else {
+ if (auto ret = next->delete_obj_attrs(dpp, attr_name, y); ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): delete_obj_attrs method of backend store failed with ret: " << ret << dendl;
+ return ret;
+ }
+ }
return 0;
}
ret = source->driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl, 0, attrs, y);
if (ret == 0) {
ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << this->source->get_object_version() << dendl;
- time_t creationTime = ceph::real_clock::to_time_t(this->source->get_mtime());
- source->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, bl.length(), version, false, creationTime, std::get<rgw_user>(source->get_bucket()->get_owner()), y);
+ source->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, bl.length(), version, false, y);
ret = source->set_head_obj_dir_entry(dpp, y);
if (ret < 0) {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
}
int D4NFilterObject::D4NFilterReadOp::drain(const DoutPrefixProvider* dpp, optional_yield y) {
- auto c = aio->wait();
- while (!c.empty()) {
- int r = flush(dpp, std::move(c), y);
- if (r < 0) {
- cancel();
- return r;
- }
- c = aio->wait();
+ auto c = aio->drain();
+ int r = flush(dpp, std::move(c), y);
+ if (r < 0) {
+ cancel();
+ return r;
}
- return flush(dpp, std::move(c), y);
+ return 0;
}
int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw::AioResultList&& results, optional_yield y) {
uint64_t ofs = ofs_len_pair.first;
uint64_t len = ofs_len_pair.second;
bool dirty = false;
- /*
- std::stringstream s;
- utime_t ut(source->get_mtime());
- ut.gmtime(s);
- */
- time_t creationTime = ceph::real_clock::to_time_t(source->get_mtime());
rgw::d4n::CacheBlock block;
block.cacheObj.objName = source->get_key().get_oid();
std::string oid_in_cache = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(len);
- if (source->driver->get_block_dir()->get(&block, y) == 0){
- if (block.dirty == true){
+ if (source->driver->get_block_dir()->get(dpp, &block, y) == 0){
+ if (block.dirty){
dirty = true;
}
}
ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " calling update for offset: " << offset << " adjusted offset: " << ofs << " length: " << len << " oid_in_cache: " << oid_in_cache << dendl;
ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << version << " " << source->get_object_version() << dendl;
- source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, len, version, dirty, creationTime, std::get<rgw_user>(source->get_bucket()->get_owner()), y);
+ source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, len, version, dirty, y);
blocks_info.erase(it);
} else {
ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << " offset not found: " << offset << dendl;
const uint64_t window_size = g_conf()->rgw_get_obj_window_size;
std::string version = source->get_object_version();
std::string prefix;
- /* After prepare() method, for versioned objects, get_oid() returns an oid with versionId added,
- * even for versioned objects, where version id is not provided as input
- */
- if (source->have_instance()) {
- prefix = source->get_bucket()->get_name() + "_" + source->get_key().get_oid();
- } else {
- prefix = source->get_bucket()->get_name() + "_" + version + "_" + source->get_key().get_oid();
- }
+
+ prefix = source->get_bucket()->get_name() + "_" + version + "_" + source->get_name();
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "prefix: " << prefix << dendl;
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "oid: " << source->get_key().get_oid() << " ofs: " << ofs << " end: " << end << dendl;
" read_ofs: " << read_ofs << " part len: " << part_len << dendl;
int ret = -1;
- if (source->driver->get_block_dir()->exist_key(&block, y) > 0 && (ret = source->driver->get_block_dir()->get(&block, y)) == 0) {
+ if (source->driver->get_block_dir()->exist_key(dpp, &block, y) > 0 && (ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0) {
auto it = find(block.hostsList.begin(), block.hostsList.end(), source->driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address);
if (it != block.hostsList.end()) { /* Local copy */
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in local cache. " << oid_in_cache << dendl;
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in directory. " << oid_in_cache << dendl;
std::string key = oid_in_cache;
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: block is dirty = " << block.dirty << dendl;
if (block.dirty == true) {
return this->cb->flush_last_part();
}
+int D4NFilterObject::D4NFilterReadOp::get_attr(const DoutPrefixProvider* dpp, const char* name, bufferlist& dest, optional_yield y)
+{
+ rgw::sal::Attrs& attrs = source->get_attrs();
+ if (attrs.empty()) {
+ rgw_obj obj = source->get_obj();
+ auto ret = source->get_obj_attrs(y, dpp, &obj);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Error: failed to fetch attrs, ret= " << ret << dendl;
+ return ret;
+ }
+ //get_obj_attrs() calls set_attrs() internally, hence get_attrs() can be invoked to get the latest attrs.
+ attrs = source->get_attrs();
+ }
+ auto it = attrs.find(name);
+ if (it == attrs.end()) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Attribute value NOT found for attr name= " << name << dendl;
+ return next->get_attr(dpp, name, dest, y);
+ }
+
+ dest = it->second;
+ return 0;
+}
+
int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::flush_last_part()
{
last_part = true;
//Accumulating data from backend store into rgw_get_obj_max_req_size sized chunks and then writing to cache
if (write_to_cache) {
+ Attrs attrs; // empty attrs for cache sets
+ std::string version = source->get_object_version();
+ std::string prefix = source->get_prefix();
+
rgw::d4n::CacheBlock block, existing_block;
rgw::d4n::BlockDirectory* blockDir = source->driver->get_block_dir();
block.hostsList.push_back(blockDir->cct->_conf->rgw_d4n_l1_datacache_address);
std::stringstream s;
block.cacheObj.creationTime = std::to_string(ceph::real_clock::to_time_t(source->get_mtime()));
block.cacheObj.dirty = false;
- bool dirty = false;
- time_t creationTime = ceph::real_clock::to_time_t(source->get_mtime());
+ bool dirty = block.dirty = false; //Reading from the backend, data is clean
+ block.version = version;
//populating fields needed for building directory index
existing_block.cacheObj.objName = block.cacheObj.objName;
existing_block.cacheObj.bucketName = block.cacheObj.bucketName;
- Attrs attrs; // empty attrs for cache sets
- std::string version = source->get_object_version();
- std::string prefix = source->get_prefix();
ldpp_dout(dpp, 20) << __func__ << ": version stored in update method is: " << version << dendl;
if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) {
block.blockID = ofs;
block.size = bl.length();
- block.version = version;
- block.dirty = false; //Reading from the backend, data is clean
+
auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
if (ret == 0) {
ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y);
if (ret == 0) {
std::string objEtag = "";
- filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, dirty, creationTime, std::get<rgw_user>(source->get_bucket()->get_owner()), *y);
- filter->get_policy_driver()->get_cache_policy()->updateObj(dpp, prefix, version, dirty, source->get_size(), creationTime, std::get<rgw_user>(source->get_bucket()->get_owner()), objEtag, *y);
+ filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, dirty, *y);
/* Store block in directory */
- if (!blockDir->exist_key(&block, *y)) {
- if (blockDir->set(&block, *y) < 0) //should we revert previous steps if this step fails?
+ if (!blockDir->exist_key(dpp, &block, *y)) {
+ if (blockDir->set(dpp, &block, *y) < 0) //should we revert previous steps if this step fails?
ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
} else {
existing_block.blockID = block.blockID;
existing_block.size = block.size;
existing_block.dirty = block.dirty;
- if (blockDir->get(&existing_block, *y) < 0) {
+ if (blockDir->get(dpp, &existing_block, *y) < 0) {
ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
} else {
if (existing_block.version != block.version) {
- if (blockDir->del(&existing_block, *y) < 0) //delete existing block
+ if (blockDir->del(dpp, &existing_block, *y) < 0) //delete existing block
ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory del method failed." << dendl;
- if (blockDir->set(&block, *y) < 0) //new versioned block will have new version, hostsList etc, how about globalWeight?
+ if (blockDir->set(dpp, &block, *y) < 0) //new versioned block will have new version, hostsList etc, how about globalWeight?
ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
} else {
- if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, *y) < 0)
+ if (blockDir->update_field(dpp, &block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, *y) < 0)
ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed for hostsList." << dendl;
}
}
std::string oid = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
block.blockID = ofs;
block.size = bl.length();
- block.version = version;
ofs += bl_len;
- block.dirty = dirty;
if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) {
auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
if (ret == 0) {
ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y);
if (ret == 0) {
- filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, dirty, creationTime, std::get<rgw_user>(source->get_bucket()->get_owner()), *y);
+ filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, dirty, *y);
/* Store block in directory */
- if (!blockDir->exist_key(&block, *y)) {
- if (blockDir->set(&block, *y) < 0)
+ if (!blockDir->exist_key(dpp, &block, *y)) {
+ if (blockDir->set(dpp, &block, *y) < 0)
ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
} else {
existing_block.blockID = block.blockID;
existing_block.size = block.size;
existing_block.dirty = block.dirty;
- if (blockDir->get(&existing_block, *y) < 0) {
+ if (blockDir->get(dpp, &existing_block, *y) < 0) {
ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
}
if (existing_block.version != block.version) {
- if (blockDir->del(&existing_block, *y) < 0)
+ if (blockDir->del(dpp, &existing_block, *y) < 0)
ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory del method failed." << dendl;
- if (blockDir->set(&block, *y) < 0)
+ if (blockDir->set(dpp, &block, *y) < 0)
ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
} else {
- if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, *y) < 0)
+ if (blockDir->update_field(dpp, &block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, *y) < 0)
ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed for blockHosts." << dendl;
}
}
if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) {
block.blockID = ofs;
block.size = bl_rem.length();
- block.version = version;
ofs += bl_rem.length();
- block.dirty = dirty;
auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
if (ret == 0) {
ret = filter->get_cache_driver()->put(dpp, oid, bl_rem, bl_rem.length(), attrs, *y);
if (ret == 0) {
- filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), version, dirty, creationTime, std::get<rgw_user>(source->get_bucket()->get_owner()), *y);
+ filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), version, dirty, *y);
/* Store block in directory */
- if (!blockDir->exist_key(&block, *y)) {
- if (blockDir->set(&block, *y) < 0)
+ if (!blockDir->exist_key(dpp, &block, *y)) {
+ if (blockDir->set(dpp, &block, *y) < 0)
ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
} else {
existing_block.blockID = block.blockID;
existing_block.size = block.size;
existing_block.dirty = block.dirty;
- if (blockDir->get(&existing_block, *y) < 0) {
+ if (blockDir->get(dpp, &existing_block, *y) < 0) {
ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
} else {
if (existing_block.version != block.version) {
- if (blockDir->del(&existing_block, *y) < 0)
+ if (blockDir->del(dpp, &existing_block, *y) < 0)
ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory del method failed." << dendl;
- if (blockDir->set(&block, *y) < 0)
+ if (blockDir->set(dpp, &block, *y) < 0)
ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
} else {
- if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, *y) < 0)
+ if (blockDir->update_field(dpp, &block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, *y) < 0)
ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl;
}
}
int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp,
optional_yield y, uint32_t flags)
{
+ /*
+ 1. Check if object exists in Object Directory
+ 2. get dirty flag and version to construct the oid in cache correctly
+ 3. loop and delete all blocks in the cache
+ 4. delete the head block
+ 5. Update the in memory data structure for all blocks, and update the block directory also
+ 6. Update the in memory data structure for head block, and update the block directory also
+ 7. If the blocks reside in other caches, send remote request for the same
+ 8. Need to figure out a way to get all versions to be deleted in case of versioned objects when a version is not specified.
+ 9. If the object is not in object directory call next->delete_obj
+ */
rgw::d4n::CacheObj obj = rgw::d4n::CacheObj{ // TODO: Add logic to ObjectDirectory del method to also delete all blocks belonging to that object
.objName = source->get_key().get_oid(),
.bucketName = source->get_bucket()->get_name()
int D4NFilterWriter::prepare(optional_yield y)
{
startTime = time(NULL);
- if (driver->get_cache_driver()->delete_data(save_dpp, obj->get_key().get_oid(), y) < 0)
- ldpp_dout(save_dpp, 10) << "D4NFilterWriter::" << __func__ << "(): CacheDriver delete_data method failed." << dendl;
+ if (driver->get_cache_driver()->delete_data(dpp, obj->get_key().get_oid(), y) < 0)
+ ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): CacheDriver delete_data method failed." << dendl;
d4n_writecache = g_conf()->d4n_writecache_enabled;
if (!d4n_writecache){
- ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): calling next iterate" << dendl;
+ ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): calling next iterate" << dendl;
return next->prepare(y);
}
- else
- return 0;
+ if (!object->have_instance()) {
+ if (object->get_bucket()->versioned() && !object->get_bucket()->versioning_enabled()) { //if versioning is suspended
+ this->version = "null";
+ } else { // this holds true for non-versioned object and for version enabled object with no versionId given as input
+ constexpr uint32_t OBJ_INSTANCE_LEN = 32;
+ char buf[OBJ_INSTANCE_LEN + 1];
+ gen_rand_alphanumeric_no_underscore(dpp->get_cct(), buf, OBJ_INSTANCE_LEN);
+ this->version = buf; // using gen_rand_alphanumeric_no_underscore for the time being
+ ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): generating version: " << version << dendl;
+ }
+ } else {
+ ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): " << "version is: " << object->get_instance() << dendl;
+ }
+ return 0;
}
int D4NFilterWriter::process(bufferlist&& data, uint64_t offset)
off_t ofs = offset;
bool dirty = true;
rgw::d4n::CacheBlock block, existing_block;
- auto creationTime = startTime;
-
- auto version = obj->get_instance();
+ std::string version;
std::string prefix;
- if (version.empty()) { //for versioned objects, get_oid() returns an oid with versionId added
- prefix =obj->get_bucket()->get_name() + "_" + obj->get_key().get_oid();
+ if (object->have_instance()) {
+ version = obj->get_instance();
} else {
- prefix = obj->get_bucket()->get_name() + "_" + version + "_" + obj->get_key().get_oid();
+ version = this->version;
}
-
+ prefix = obj->get_bucket()->get_name() + "_" + version + "_" + obj->get_name();
rgw::d4n::BlockDirectory* blockDir = driver->get_block_dir();
block.hostsList.push_back(blockDir->cct->_conf->rgw_d4n_l1_datacache_address);
int ret = 0;
- if (!d4n_writecache){
- std::string oid = prefix + "_" + std::to_string(ofs)+ "_" + std::to_string(bl_len);
- block.size = bl.length();
- block.blockID = ofs;
- block.dirty = false; //writing to the backend, hence the data is clean
- block.hostsList.push_back(blockDir->cct->_conf->rgw_d4n_backend_address);
-
- ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): calling next process" << dendl;
+ if (!d4n_writecache) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): calling next process" << dendl;
ret = next->process(std::move(data), offset);
- if (ret == 0){
- if (!blockDir->exist_key(&block, y)) {
- if (blockDir->set(&block, y) < 0)
- ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory set method failed." << dendl;
- } else {
- existing_block.blockID = block.blockID;
- existing_block.size = block.size;
- if (blockDir->get(&existing_block, y) < 0) {
- ldpp_dout(save_dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
- } else {
- if (existing_block.version != block.version) {
- if (blockDir->del(&existing_block, y) < 0) //delete existing block
- ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory del method failed." << dendl;
- if (blockDir->set(&block, y) < 0) //new versioned block will have new version, hostsList etc, how about globalWeight?
- ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory set method failed." << dendl;
- } else {
- if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, y) < 0)
- ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory update_field method failed for hostsList." << dendl;
- }
- }
- }
- } else{
- ldpp_dout(save_dpp, 1) << "D4NFilterObject::D4NFilterWriteOp::process" << __func__ << "(): ERROR: writing data to the backend failed!" << dendl;
- return ret;
- }
} else {
std::string oid = prefix + "_" + std::to_string(ofs);
std::string key = "D_" + oid + "_" + std::to_string(bl_len);
block.blockID = ofs;
block.dirty = true;
block.hostsList.push_back(blockDir->cct->_conf->rgw_d4n_l1_datacache_address);
+ block.version = version;
dirty = true;
- ret = driver->get_policy_driver()->get_cache_policy()->eviction(save_dpp, block.size, y);
- if (ret == 0) {
- //Should we replace each put_async with put, to ensure data is actually written to the cache before updating the data structures and before the lock is released?
+ ret = driver->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, y);
+ if (ret == 0) {
if (bl.length() > 0) {
- ret = driver->get_cache_driver()->put(save_dpp, key, bl, bl.length(), obj->get_attrs(), y);
+ ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): key is: " << key << dendl;
+ ret = driver->get_cache_driver()->put(dpp, key, bl, bl.length(), obj->get_attrs(), y);
if (ret == 0) {
- driver->get_policy_driver()->get_cache_policy()->update(save_dpp, oid_in_cache, ofs, bl.length(), version, dirty, creationTime, std::get<rgw_user>(obj->get_bucket()->get_owner()), y);
- if (!blockDir->exist_key(&block, y)) {
- if (blockDir->set(&block, y) < 0) //should we revert previous steps if this step fails?
- ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+ ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): oid_in_cache is: " << oid_in_cache << dendl;
+ driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, bl.length(), version, dirty, y);
+ if (!blockDir->exist_key(dpp, &block, y)) {
+ if (blockDir->set(dpp, &block, y) < 0) //should we revert previous steps if this step fails?
+ ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory set method failed." << dendl;
} else {
existing_block.blockID = block.blockID;
existing_block.size = block.size;
- if (blockDir->get(&existing_block, y) < 0) {
- ldpp_dout(save_dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
+ if (blockDir->get(dpp, &existing_block, y) < 0) {
+ ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
} else {
if (existing_block.version != block.version) {
- if (blockDir->del(&existing_block, y) < 0) //delete existing block
- ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory del method failed." << dendl;
- if (blockDir->set(&block, y) < 0) //new versioned block will have new version, hostsList etc, how about globalWeight?
- ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+ if (blockDir->del(dpp, &existing_block, y) < 0) //delete existing block
+ ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory del method failed." << dendl;
+ if (blockDir->set(dpp, &block, y) < 0) //new versioned block will have new version, hostsList etc, how about globalWeight?
+ ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory set method failed." << dendl;
} else {
- if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, y) < 0)
- ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory update_field method failed for hostsList." << dendl;
+ if (blockDir->update_field(dpp, &block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, y) < 0)
+ ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory update_field method failed for hostsList." << dendl;
}
}
}
} else {
- ldpp_dout(save_dpp, 1) << "D4NFilterObject::D4NFilterWriteOp::process" << __func__ << "(): ERROR: writting data to the cache failed!" << dendl;
+ ldpp_dout(dpp, 1) << "D4NFilterObject::D4NFilterWriteOp::process" << __func__ << "(): ERROR: writting data to the cache failed!" << dendl;
return ret;
}
}
const req_context& rctx,
uint32_t flags)
{
- bool dirty = true;
+ bool dirty = false;
std::vector<std::string> hostsList = {};
auto creationTime = startTime;
std::string objEtag = etag;
-
- auto version = obj->get_instance();
-
- std::string prefix;
- if (version.empty()) { //for versioned objects, get_oid() returns an oid with versionId added
- prefix = obj->get_bucket()->get_name() + "_" + obj->get_key().get_oid();
- } else {
- prefix = obj->get_bucket()->get_name() + "_" + version + "_" + obj->get_key().get_oid();
- }
-
-
- if (d4n_writecache){
- driver->get_policy_driver()->get_cache_policy()->updateObj(save_dpp, prefix, version, dirty, accounted_size, creationTime, std::get<rgw_user>(obj->get_bucket()->get_owner()), objEtag, y);
- hostsList = { driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address };
+ bool write_to_backend_store = false;
+ int ret;
- rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
- .objName = obj->get_key().get_oid(),
- .bucketName = obj->get_bucket()->get_name(),
- .creationTime = std::to_string(creationTime),
- .dirty = dirty,
- .hostsList = hostsList
- };
-
- if (driver->get_obj_dir()->set(&object, y) < 0)
- ldpp_dout(save_dpp, 10) << "D4NFilterWriter::" << __func__ << "(): ObjectDirectory set method failed." << dendl;
- return 0;
- }
- /* Retrieve complete set of attrs */
- int ret = next->complete(accounted_size, etag, mtime, set_mtime, attrs, cksum,
- delete_at, if_match, if_nomatch, user_data, zones_trace,
- canceled, rctx, flags);
- obj->get_obj_attrs(rctx.y, save_dpp, NULL);
-
- /* Append additional metadata to attributes */
- rgw::sal::Attrs baseAttrs = obj->get_attrs();
- rgw::sal::Attrs attrs_temp = baseAttrs;
- buffer::list bl;
- obj->load_obj_state(save_dpp, rctx.y);
-
- bl.append(std::to_string(creationTime));
- baseAttrs.insert({"mtime", bl});
- bl.clear();
-
- bl.append(std::to_string(obj->get_size()));
- baseAttrs.insert({"object_size", bl});
- bl.clear();
-
- bl.append(std::to_string(accounted_size));
- baseAttrs.insert({"accounted_size", bl});
- bl.clear();
-
- bl.append(std::to_string(obj->get_epoch()));
- baseAttrs.insert({"epoch", bl});
- bl.clear();
-
- if (obj->have_instance()) {
- bl.append(obj->get_instance());
- baseAttrs.insert({"version_id", bl});
- bl.clear();
- } else {
- bl.append(""); /* Empty value */
- baseAttrs.insert({"version_id", bl});
- bl.clear();
- }
+ if (d4n_writecache) {
+ dirty = true;
+ std::string version;
+ if (object->have_instance()) {
+ version = obj->get_instance();
+ } else {
+ version = this->version; //version for non-versioned objects, using gen_rand_alphanumeric_no_underscore for the time being
+ if (obj->get_bucket()->versioned()) {
+ object->set_instance(version);
+ }
+ }
+ std::string key = obj->get_bucket()->get_name() + "_" + version + "_" + obj->get_name();
- auto iter = attrs_temp.find(RGW_ATTR_SOURCE_ZONE);
- if (iter != attrs_temp.end()) {
- bl.append(std::to_string(obj->get_short_zone_id()));
- baseAttrs.insert({"source_zone_short_id", bl});
- bl.clear();
- } else {
- bl.append("0"); /* Initialized to zero */
- baseAttrs.insert({"source_zone_short_id", bl});
- bl.clear();
+ ceph::real_time m_time;
+ if (mtime) {
+ m_time = *mtime;
+ } else {
+ m_time = real_clock::now();
+ }
+ object->set_mtime(m_time);
+ object->set_accounted_size(accounted_size);
+ ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << " size is: " << object->get_size() << dendl;
+ object->set_obj_state_attrs(dpp, y, attrs);
+ bufferlist bl;
+ std::string head_oid_in_cache = "D_" + key; //same as key, as there is no len or offset attached to head oid in cache
+ ret = driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl, 0, attrs, y);
+ attrs.erase("user.rgw.mtime");
+ attrs.erase("user.rgw.object_size");
+ attrs.erase("user.rgw.accounted_size");
+ attrs.erase("user.rgw.epoch");
+ object->set_object_version(version);
+ if (ret == 0) {
+ object->set_attrs(attrs);
+ ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << " version stored in update method is: " << version << dendl;
+ driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, bl.length(), version, dirty, y);
+ ret = object->set_head_obj_dir_entry(dpp, y, true);
+ if (ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
+ return ret;
+ }
+ driver->get_policy_driver()->get_cache_policy()->updateObj(dpp, key, version, dirty, accounted_size, creationTime, std::get<rgw_user>(obj->get_bucket()->get_owner()), objEtag, obj->get_bucket()->get_name(), obj->get_key(), y);
+
+ //write object to directory.
+ hostsList = { driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address };
+ rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
+ .objName = obj->get_name(),
+ .bucketName = obj->get_bucket()->get_name(),
+ .creationTime = std::to_string(creationTime),
+ .dirty = dirty,
+ .hostsList = hostsList
+ };
+ ret = driver->get_obj_dir()->set(&object, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): ObjectDirectory set method failed with err: " << ret << dendl;
+ return ret;
+ }
+ } else { //if get_cache_driver()->put()
+ write_to_backend_store = true;
+ ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << " put failed for head_oid_in_cache wih error: " << ret << dendl;
+ ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << " calling complete of backend store: " << dendl;
+ }
+ } else { // if d4n_writecache = true
+ write_to_backend_store = true;
}
- baseAttrs.insert(attrs.begin(), attrs.end());
-
- //bufferlist bl_empty;
- //int putReturn = driver->get_cache_driver()->
- // put(save_dpp, obj->get_key().get_oid(), bl_empty, accounted_size, baseAttrs, y); /* Data already written during process call */
- /*
- if (putReturn < 0) {
- ldpp_dout(save_dpp, 20) << "D4N Filter: Cache put operation failed." << dendl;
- } else {
- ldpp_dout(save_dpp, 20) << "D4N Filter: Cache put operation succeeded." << dendl;
+ if (write_to_backend_store) {
+ ret = next->complete(accounted_size, etag, mtime, set_mtime, attrs, cksum,
+ delete_at, if_match, if_nomatch, user_data, zones_trace,
+ canceled, rctx, flags);
+ if (ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): Writing to backend store failed with err: " << ret << dendl;
+ }
}
- */
- return ret;
+
+ return 0;
}
} } // namespace rgw::sal