#include "../../../common/async/yield_context.h"
#include "common/async/blocked_completion.h"
-#include "common/dout.h"
+#include "common/split.h"
#include "rgw_perf_counters.h"
namespace rgw { namespace d4n {
static auto obj_callback = [this](
const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size,
time_t creationTime, const rgw_user user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
- const rgw_obj_key& obj_key, optional_yield y) {
- update_dirty_object(dpp, key, version, dirty, size, creationTime, user, etag, bucket_name, bucket_id, obj_key, y);
+ const rgw_obj_key& obj_key, optional_yield y, std::string& restore_val) {
+ update_dirty_object(dpp, key, version, dirty, size, creationTime, user, etag, bucket_name, bucket_id, obj_key, y, restore_val);
};
static auto block_callback = [this](
}
}
+/* Marks a dirty object as INVALID. An INVALID state indicates that a delete request has been issued on
+   the object, so the Cleaning method must delete it from the cache rather than write it to the backend.
+   This lazy deletion prevents data races during concurrent requests.
+
+   Returns "true" only when this call moved the entry from INIT to INVALID and the invalid marker xattr
+   was stored on the dirty head block. Returns "false" when:
+     - the key has no entry in o_entries_map,
+     - the xattr could not be set (the in-memory state is then left untouched, so the return value and
+       the state seen by the cleaning thread always agree),
+     - the entry is IN_PROGRESS, i.e. cleaning is writing the object to the backend store; in that case
+       the call first blocks on state_cond until the entry has been removed from o_entries_map,
+     - the entry is in any other state (e.g. already INVALID).
+
+   lfuda_cleaning_lock is held for the whole call, except while waiting on state_cond. */
+bool LFUDAPolicy::invalidate_dirty_object(const DoutPrefixProvider* dpp, const std::string& key) {
+  std::unique_lock<std::mutex> l(lfuda_cleaning_lock);
+
+  if (o_entries_map.empty())
+    return false;
+
+  auto p = o_entries_map.find(key);
+  if (p == o_entries_map.end()) {
+    ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): key=" << key << " not found" << dendl;
+    return false;
+  }
+
+  if (p->second.second == State::INIT) {
+    ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Setting State::INVALID for key=" << key << dendl;
+    // Persist the marker before touching the in-memory state: if the xattr cannot be written we report
+    // failure and the entry must still look like INIT to the cleaning thread.
+    // NOTE(review): `y` is not a parameter of this method; presumably it is a class member -- confirm.
+    int ret = cacheDriver->set_attr(dpp, DIRTY_BLOCK_PREFIX + key, RGW_CACHE_ATTR_INVALID, "1", y);
+    if (ret < 0) {
+      ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): Failed to set xattr, ret=" << ret << dendl;
+      return false;
+    }
+
+    p->second.second = State::INVALID;
+    return true;
+  }
+
+  if (p->second.second == State::IN_PROGRESS) {
+    // Cleaning owns the object right now; wait until its entry disappears from the map.
+    // `p` must not be used after this point because the entry it referred to has been erased.
+    state_cond.wait(l, [this, &key]{ return (o_entries_map.find(key) == o_entries_map.end()); });
+  }
+
+  return false;
+}
+
+
CacheBlock* LFUDAPolicy::get_victim_block(const DoutPrefixProvider* dpp, optional_yield y) {
const std::lock_guard l(lfuda_lock);
if (entries_heap.empty())
std::string key = entries_heap.top()->key;
CacheBlock* victim = new CacheBlock();
- victim->cacheObj.bucketName = key.substr(0, key.find('_'));
- key.erase(0, key.find('_') + 1);
- victim->version = key.substr(0, key.find('_'));
- key.erase(0, key.find('_') + 1);
- victim->cacheObj.objName = key.substr(0, key.find('_'));
- victim->blockID = entries_heap.top()->offset;
- victim->size = entries_heap.top()->len;
+ auto parts = split(key, "#");
+ std::vector<std::string> block_info;
+ block_info.assign(parts.begin(), parts.end());
+
+ if (block_info.size() != 5) {
+ ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): Key of the top entry in the min heap has not been constructed correctly." << dendl;
+ return nullptr;
+ }
+
+ victim->cacheObj.bucketName = block_info[0];
+ victim->version = block_info[1];
+ victim->cacheObj.objName = block_info[2];
+ victim->blockID = std::stoull(block_info[3]);
+ victim->size = std::stoull(block_info[4]);
if (blockDir->get(dpp, victim, y) < 0) {
return nullptr;
if (entries_heap.empty())
return 0;
+ int ret = -1;
uint64_t freeSpace = cacheDriver->get_free_space(dpp);
while (freeSpace < size) { // TODO: Think about parallel reads and writes; can this turn into an infinite loop?
if (victim == nullptr) {
ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): Could not retrieve victim block." << dendl;
delete victim;
- return 0; // not necessarily an error? -Sam
+ return -ENOSPC;
}
const std::lock_guard l(lfuda_lock);
// check dirty flag of entry to be evicted, if the flag is dirty, all entries on the local node are dirty
if (it->second->dirty) {
ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): Top entry in min heap is dirty, no entry is available for eviction!" << dendl;
- return 0;
+ return -ENOSPC;
}
int avgWeight = weightSum / entries_map.size();
(*it->second->handle)->localWeight = it->second->localWeight;
entries_heap.decrease(it->second->handle); // larger value means node must be decreased to maintain min heap
- if (int ret = cacheDriver->set_attr(dpp, key, RGW_CACHE_ATTR_LOCAL_WEIGHT, std::to_string(it->second->localWeight), y) < 0) {
+ if ((ret = cacheDriver->set_attr(dpp, key, RGW_CACHE_ATTR_LOCAL_WEIGHT, std::to_string(it->second->localWeight), y)) < 0) {
delete victim;
return ret;
}
}
victim->globalWeight += it->second->localWeight;
- if (int ret = blockDir->update_field(dpp, victim, "globalWeight", std::to_string(victim->globalWeight), y) < 0) {
+ if ((ret = blockDir->update_field(dpp, victim, "globalWeight", std::to_string(victim->globalWeight), y)) < 0) {
delete victim;
return ret;
}
- if (int ret = blockDir->remove_host(dpp, victim, dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, y) < 0) {
+ if ((ret = blockDir->remove_host(dpp, victim, dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, y)) < 0) {
delete victim;
return ret;
}
delete victim;
- if (int ret = cacheDriver->del(dpp, key, y) < 0)
+ if ((ret = cacheDriver->delete_data(dpp, key, y)) < 0) {
return ret;
+ }
ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Block " << key << " has been evicted." << dendl;
weightSum += ((localWeight < 0) ? 0 : localWeight);
}
-void LFUDAPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, optional_yield y)
+void LFUDAPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, optional_yield y, std::string& restore_val)
{
using handle_type = boost::heap::fibonacci_heap<LFUDAObjEntry*, boost::heap::compare<ObjectComparator<LFUDAObjEntry>>>::handle_type;
+ State state{State::INIT};
ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Before acquiring lock, adding entry: " << key << dendl;
+
+ if (!restore_val.empty() && restore_val == "1") { // No need to set the xattr because this case only occurs when the state has
+ state = State::INVALID; // been retrieved from the xattr itself.
+ ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): State restored to INVALID." << dendl;
+ } else {
+ state = State::INIT;
+ }
+
const std::lock_guard l(lfuda_cleaning_lock);
LFUDAObjEntry* e = new LFUDAObjEntry{key, version, deleteMarker, size, creationTime, user, etag, bucket_name, bucket_id, obj_key};
handle_type handle = object_heap.push(e);
e->set_handle(handle);
- o_entries_map.emplace(key, e);
+ o_entries_map.emplace(key, std::make_pair(e, state));
cond.notify_one();
}
return false;
}
- object_heap.erase(p->second->handle);
+ object_heap.erase(p->second.first->handle);
o_entries_map.erase(p);
- delete p->second;
- p->second = nullptr;
+ delete p->second.first;
+ p->second.first = nullptr;
+ state_cond.notify_one();
return true;
}
+/* Deletes every dirty data block of the object described by `e` from the cache and erases the matching
+   policy entry for each block.
+
+   The object is walked in chunks of at most rgw_max_chunk_size bytes, from offset 0 up to e->size. For
+   each chunk the cache key is DIRTY_BLOCK_PREFIX + e->key + CACHE_DELIM + <offset> + CACHE_DELIM + <length>.
+
+   Returns 0 when all blocks were removed (or e->size is not positive, in which case there is nothing to
+   do) and -EINVAL as soon as one block or its policy entry cannot be removed; blocks that were already
+   deleted before the failure are not restored. */
+int LFUDAPolicy::delete_data_blocks(const DoutPrefixProvider* dpp, LFUDAObjEntry* e, optional_yield y) {
+  const off_t lst = e->size;
+  off_t fst = 0;
+
+  while (fst < lst) {
+    const off_t cur_size = std::min<off_t>(fst + dpp->get_cct()->_conf->rgw_max_chunk_size, lst);
+    const off_t cur_len = cur_size - fst;
+    if (cur_len <= 0) {
+      // A non-positive chunk length would never advance fst; bail out instead of spinning forever.
+      ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): Invalid chunk length " << cur_len << " at offset " << fst << dendl;
+      return -EINVAL;
+    }
+
+    std::string prefix = e->key + CACHE_DELIM + std::to_string(fst) + CACHE_DELIM + std::to_string(cur_len);
+    std::string oid_in_cache = DIRTY_BLOCK_PREFIX + prefix;
+
+    int ret = cacheDriver->delete_data(dpp, oid_in_cache, y); // Sam: do we want del or delete_data here?
+    if (ret != 0) {
+      ldpp_dout(dpp, 0) << "Failed to delete data block " << oid_in_cache << ", ret=" << ret << dendl;
+      return -EINVAL;
+    }
+
+    // erase() reports success with a non-zero value.
+    ret = erase(dpp, prefix, y);
+    if (!ret) {
+      ldpp_dout(dpp, 0) << "Failed to delete policy entry for: " << oid_in_cache << ", ret=" << ret << dendl;
+      return -EINVAL;
+    }
+
+    // Move on to the next chunk of the object.
+    fst += cur_len;
+  }
+
+  return 0;
+}
+
+
void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
{
const int interval = dpp->get_cct()->_conf->rgw_d4n_cache_cleaning_interval;
ldpp_dout(dpp, 20) << __func__ << " : " << " Cache cleaning!" << dendl;
uint64_t len = 0;
rgw::sal::Attrs obj_attrs;
+ bool invalid = false;
ldpp_dout(dpp, 20) << "LFUDAPolicy::" << __func__ << "" << __LINE__ << "(): Before acquiring cleaning-lock" << dendl;
std::unique_lock<std::mutex> l(lfuda_cleaning_lock);
ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->user=" << e->user << dendl;
ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->obj_key=" << e->obj_key << dendl;
l.unlock();
- if (!e->key.empty() && (std::difftime(time(NULL), e->creationTime) > interval)) { //if block is dirty and written more than interval seconds ago
- rgw_user c_rgw_user = e->user;
- //writing data to the backend
- //we need to create an atomic_writer
- std::unique_ptr<rgw::sal::User> c_user = driver->get_user(c_rgw_user);
-
- std::unique_ptr<rgw::sal::Bucket> c_bucket;
- rgw_bucket c_rgw_bucket = rgw_bucket(c_rgw_user.tenant, e->bucket_name, e->bucket_id);
-
- RGWBucketInfo c_bucketinfo;
- c_bucketinfo.bucket = c_rgw_bucket;
- c_bucketinfo.owner = c_rgw_user;
- int ret = driver->load_bucket(dpp, c_rgw_bucket, &c_bucket, null_yield);
- if (ret < 0) {
- ldpp_dout(dpp, 10) << __func__ << "(): load_bucket() returned ret=" << ret << dendl;
- //Remove bucket should be implemented in d4n which will take care of deleting objects belonging to the bucket, and hence we should not reach here
- erase_dirty_object(dpp, e->key, null_yield);
- continue;
- }
-
- std::unique_ptr<rgw::sal::Object> c_obj = c_bucket->get_object(e->obj_key);
- ldpp_dout(dpp, 20) << __func__ << "(): c_obj oid =" << c_obj->get_oid() << dendl;
-
- ACLOwner owner{c_user->get_id(), c_user->get_display_name()};
-
- std::unique_ptr<rgw::sal::Writer> processor = driver->get_atomic_writer(dpp,
- null_yield,
- c_obj.get(),
- owner,
- NULL,
- 0,
- "");
- int op_ret = processor->prepare(null_yield);
- if (op_ret < 0) {
- ldpp_dout(dpp, 20) << __func__ << "processor->prepare() returned ret=" << op_ret << dendl;
- erase_dirty_object(dpp, e->key, null_yield);
+ int diff = std::difftime(time(NULL), e->creationTime);
+ if (!e->key.empty() && (diff > interval)) { // if block is dirty and written more than interval seconds ago
+ l.lock();
+ auto p = o_entries_map.find(e->key);
+ if (p == o_entries_map.end()) {
+ l.unlock();
+ continue;
}
-
- std::string prefix = url_encode(e->bucket_id) + CACHE_DELIM + url_encode(e->version) + CACHE_DELIM + url_encode(c_obj->get_name());
- off_t lst = e->size;
- off_t fst = 0;
- off_t ofs = 0;
-
- rgw::sal::DataProcessor* filter = processor.get();
- std::string head_oid_in_cache = DIRTY_BLOCK_PREFIX + prefix;
- std::string new_head_oid_in_cache = prefix;
- ldpp_dout(dpp, 10) << __func__ << "(): head_oid_in_cache=" << head_oid_in_cache << dendl;
- ldpp_dout(dpp, 10) << __func__ << "(): new_head_oid_in_cache=" << new_head_oid_in_cache << dendl;
- bufferlist bl;
- cacheDriver->get_attrs(dpp, head_oid_in_cache, obj_attrs, null_yield); //get obj attrs from head
- obj_attrs.erase(RGW_CACHE_ATTR_MTIME);
- obj_attrs.erase(RGW_CACHE_ATTR_OBJECT_SIZE);
- obj_attrs.erase(RGW_CACHE_ATTR_ACCOUNTED_SIZE);
- obj_attrs.erase(RGW_CACHE_ATTR_EPOCH);
-
- do {
- ceph::bufferlist data;
- if (fst >= lst){
- break;
- }
- off_t cur_size = std::min<off_t>(fst + dpp->get_cct()->_conf->rgw_max_chunk_size, lst);
- off_t cur_len = cur_size - fst;
- std::string oid_in_cache = DIRTY_BLOCK_PREFIX + prefix + CACHE_DELIM + std::to_string(fst) + CACHE_DELIM + std::to_string(cur_len);
- ldpp_dout(dpp, 10) << __func__ << "(): oid_in_cache=" << oid_in_cache << dendl;
- rgw::sal::Attrs attrs;
- cacheDriver->get(dpp, oid_in_cache, 0, cur_len, data, attrs, null_yield);
- len = data.length();
- fst += len;
-
- if (len == 0) {
- // TODO: if len of any block is 0 for some reason, we must return from here?
- break;
- }
-
- op_ret = filter->process(std::move(data), ofs);
- if (op_ret < 0) {
- ldpp_dout(dpp, 20) << __func__ << "processor->process() returned ret="
- << op_ret << dendl;
- erase_dirty_object(dpp, e->key, null_yield);
- }
-
- ofs += len;
- } while (len > 0);
-
- op_ret = filter->process({}, ofs);
-
- const req_context rctx{dpp, null_yield, nullptr};
- ceph::real_time mtime = ceph::real_clock::from_time_t(e->creationTime);
- op_ret = processor->complete(lst, e->etag, &mtime, ceph::real_clock::from_time_t(e->creationTime), obj_attrs,
- std::nullopt, ceph::real_time(), nullptr, nullptr,
- nullptr, nullptr, nullptr,
- rctx, rgw::sal::FLAG_LOG_OP);
-
- if (op_ret < 0) {
- ldpp_dout(dpp, 20) << __func__ << "processor->complete() returned ret=" << op_ret << dendl;
- erase_dirty_object(dpp, e->key, null_yield);
- }
- //invoke update() with dirty flag set to false, to update in-memory metadata for each block
- // reset values
- lst = e->size;
- fst = 0;
- do {
- if (fst >= lst) {
- break;
- }
- off_t cur_size = std::min<off_t>(fst + dpp->get_cct()->_conf->rgw_max_chunk_size, lst);
- off_t cur_len = cur_size - fst;
-
- std::string oid_in_cache = DIRTY_BLOCK_PREFIX + prefix + CACHE_DELIM + std::to_string(fst) + CACHE_DELIM + std::to_string(cur_len);
- ldpp_dout(dpp, 20) << __func__ << "(): oid_in_cache =" << oid_in_cache << dendl;
- std::string new_oid_in_cache = prefix + CACHE_DELIM + std::to_string(fst) + CACHE_DELIM + std::to_string(cur_len);
- //Rename block to remove "D" prefix
- cacheDriver->rename(dpp, oid_in_cache, new_oid_in_cache, null_yield);
- //Update in-memory data structure for each block
- this->update(dpp, new_oid_in_cache, 0, 0, e->version, false, y);
-
- rgw::d4n::CacheBlock block;
- block.cacheObj.bucketName = c_obj->get_bucket()->get_bucket_id();
- block.cacheObj.objName = c_obj->get_key().get_oid();
- block.size = cur_len;
- block.blockID = fst;
- std::string dirty = "false";
- op_ret = blockDir->update_field(dpp, &block, "dirty", dirty, null_yield);
- if (op_ret < 0) {
- ldpp_dout(dpp, 0) << __func__ << "updating dirty flag in block directory failed, ret=" << op_ret << dendl;
- }
- fst += cur_len;
- } while(fst < lst);
-
- cacheDriver->rename(dpp, head_oid_in_cache, new_head_oid_in_cache, null_yield);
-
- //invoke update() with dirty flag set to false, to update in-memory metadata for head
- this->update(dpp, new_head_oid_in_cache, 0, 0, e->version, false, y);
-
- rgw::d4n::CacheBlock block;
- block.cacheObj.bucketName = c_obj->get_bucket()->get_bucket_id();
- block.cacheObj.objName = c_obj->get_name();
- block.size = 0;
- block.blockID = 0;
- if (c_obj->have_instance()) {
- blockDir->get(dpp, &block, null_yield);
- if (block.version == c_obj->get_instance()) { //versioned case - update head block entry that has latest version
- std::string dirty = "false";
- op_ret = blockDir->update_field(dpp, &block, "dirty", dirty, null_yield);
- if (op_ret < 0) {
- ldpp_dout(dpp, 20) << __func__ << "updating dirty flag in block directory for head failed!" << dendl;
- }
- }
- } else { //non-versioned case
- std::string dirty = "false";
- op_ret = blockDir->update_field(dpp, &block, "dirty", dirty, null_yield);
- if (op_ret < 0) {
- ldpp_dout(dpp, 20) << __func__ << "updating dirty flag in block directory for head failed!" << dendl;
- }
- }
- if (c_obj->have_instance()) {
- rgw::d4n::CacheBlock instance_block;
- instance_block.cacheObj.bucketName = c_obj->get_bucket()->get_bucket_id();
- instance_block.cacheObj.objName = c_obj->get_oid();
- instance_block.size = 0;
- instance_block.blockID = 0;
- op_ret = blockDir->update_field(dpp, &instance_block, "dirty", "false", null_yield);
- if (op_ret < 0) {
- ldpp_dout(dpp, 20) << __func__ << "updating dirty flag in block directory for instance block failed!" << dendl;
- }
+ if (p->second.second == State::INVALID) {
+ invalid = true;
}
+ l.unlock();
+
+ // If the state is invalid, the blocks must be deleted from the cache rather than written to the backend.
+ if (invalid) {
+ ldpp_dout(dpp, 10) << __func__ << "(): State is INVALID; deleting object." << dendl;
+ int ret = -1;
+ if ((ret = cacheDriver->delete_data(dpp, DIRTY_BLOCK_PREFIX + e->key, y)) == 0) { // Sam: do we want del or delete_data here?
+ if (!(ret = erase(dpp, e->key, y))) {
+ ldpp_dout(dpp, 0) << "Failed to delete head policy entry for: " << e->key << ", ret=" << ret << dendl; // TODO: what must occur during failure?
+ }
+ } else {
+ ldpp_dout(dpp, 0) << "Failed to delete head object for: " << e->key << ", ret=" << ret << dendl;
+ }
- //the next steps remove the entry from the ordered set and if needed the latest hash entry also in case of versioned buckets
- if (!c_obj->have_instance()) {
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << std::setprecision(std::numeric_limits<double>::max_digits10) << e->creationTime << " from ordered set" << dendl;
- rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{
- .objName = c_obj->get_name(),
- .bucketName = c_obj->get_bucket()->get_bucket_id(),
- };
- ret = objDir->zremrangebyscore(dpp, &dir_obj, e->creationTime, e->creationTime, y, true);
- if (ret < 0) {
- ldpp_dout(dpp, 0) << __func__ << "(): Failed to remove object from ordered set with error: " << ret << dendl;
- }
+ if (!e->delete_marker) {
+ ret = delete_data_blocks(dpp, e, y);
+ if (ret == 0) {
+ erase_dirty_object(dpp, e->key, null_yield);
+ } else {
+ ldpp_dout(dpp, 0) << "Failed to delete blocks for: " << e->key << ", ret=" << ret << dendl;
+ }
+ }
} else {
- rgw::d4n::CacheBlock latest_block = block;
- latest_block.cacheObj.objName = c_obj->get_name();
- //add watch on latest entry, as it can be modified by a put or a del
- ret = blockDir->watch(dpp, &latest_block, y);
- if (ret < 0) {
- ldpp_dout(dpp, 0) << __func__ << "(): Failed to add a watch on: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl;
- }
- int retry = 3;
- while(retry) {
- retry--;
- //get latest entry
- ret = blockDir->get(dpp, &latest_block, y);
- if (ret < 0) {
- ldpp_dout(dpp, 0) << __func__ << "(): Failed to get latest entry in block directory for: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl;
- }
- //start redis transaction using MULTI
- blockDir->multi(dpp, y);
- if (latest_block.version == e->version) {
- //remove object entry from ordered set
- if (c_obj->have_instance()) {
- blockDir->del(dpp, &latest_block, y, true);
- if (ret < 0) {
- ldpp_dout(dpp, 0) << __func__ << "(): Failed to queue del for latest hash entry: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl;
- }
- }
- }
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << std::setprecision(std::numeric_limits<double>::max_digits10) << e->creationTime << " from ordered set" << dendl;
- rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{
- .objName = c_obj->get_name(),
- .bucketName = c_obj->get_bucket()->get_bucket_id(),
- };
- ret = objDir->zremrangebyscore(dpp, &dir_obj, e->creationTime, e->creationTime, y, true);
- if (ret < 0) {
- ldpp_dout(dpp, 0) << __func__ << "(): Failed to remove object from ordered set with error: " << ret << dendl;
- }
- std::vector<std::string> responses;
- ret = blockDir->exec(dpp, responses, y);
- if (responses.empty()) {
- ldpp_dout(dpp, 0) << __func__ << "(): Execute responses are empty hence continuing!" << dendl;
- continue;
- }
- break;
- }//end-while (retry)
+ l.lock();
+ p->second.second = State::IN_PROGRESS;
+ l.unlock();
+
+ rgw_user c_rgw_user = e->user;
+ //writing data to the backend
+ //we need to create an atomic_writer
+ std::unique_ptr<rgw::sal::User> c_user = driver->get_user(c_rgw_user);
+
+ std::unique_ptr<rgw::sal::Bucket> c_bucket;
+ rgw_bucket c_rgw_bucket = rgw_bucket(c_rgw_user.tenant, e->bucket_name, e->bucket_id);
+
+ RGWBucketInfo c_bucketinfo;
+ c_bucketinfo.bucket = c_rgw_bucket;
+ c_bucketinfo.owner = c_rgw_user;
+ int ret = driver->load_bucket(dpp, c_rgw_bucket, &c_bucket, null_yield);
+ if (ret < 0) {
+ ldpp_dout(dpp, 10) << __func__ << "(): load_bucket() returned ret=" << ret << dendl;
+ //Remove bucket should be implemented in d4n which will take care of deleting objects belonging to the bucket, and hence we should not reach here
+ erase_dirty_object(dpp, e->key, null_yield);
+ continue;
+ }
+
+ std::unique_ptr<rgw::sal::Object> c_obj = c_bucket->get_object(e->obj_key);
+ bool null_instance = (c_obj->get_instance() == "null");
+ if (null_instance) {
+ //clear the instance for backend store
+ c_obj->clear_instance();
+ }
+ ldpp_dout(dpp, 20) << __func__ << "(): c_obj oid =" << c_obj->get_oid() << dendl;
+
+ ACLOwner owner{c_user->get_id(), c_user->get_display_name()};
+
+ std::string prefix = url_encode(e->bucket_id) + CACHE_DELIM + url_encode(e->version) + CACHE_DELIM + url_encode(c_obj->get_name());
+ std::string head_oid_in_cache = DIRTY_BLOCK_PREFIX + prefix;
+ std::string new_head_oid_in_cache = prefix;
+ ldpp_dout(dpp, 10) << __func__ << "(): head_oid_in_cache=" << head_oid_in_cache << dendl;
+ ldpp_dout(dpp, 10) << __func__ << "(): new_head_oid_in_cache=" << new_head_oid_in_cache << dendl;
+ int op_ret;
+ if (e->delete_marker) {
+ bool null_delete_marker = (c_obj->get_instance() == "null");
+ if (null_delete_marker) {
+ //clear the instance for backend store
+ c_obj->clear_instance();
+ }
+ std::unique_ptr<rgw::sal::Object::DeleteOp> del_op = c_obj->get_delete_op();
+ del_op->params.obj_owner = owner;
+ del_op->params.bucket_owner = c_bucket->get_owner();
+ del_op->params.versioning_status = c_bucket->get_info().versioning_status();
+ //populate marker_version_id only when delete marker is not null
+ del_op->params.marker_version_id = e->version;
+ op_ret = del_op->delete_obj(dpp, null_yield, rgw::sal::FLAG_LOG_OP);
+ if (op_ret >= 0) {
+ bool delete_marker = del_op->result.delete_marker;
+ std::string version_id = del_op->result.version_id;
+ ldpp_dout(dpp, 20) << __func__ << "delete_obj delete_marker=" << delete_marker << dendl;
+ ldpp_dout(dpp, 20) << __func__ << "delete_obj version_id=" << version_id << dendl;
+ } else {
+ ldpp_dout(dpp, 20) << __func__ << "delete_obj returned ret=" << op_ret << dendl;
+ erase_dirty_object(dpp, e->key, null_yield);
+ continue;
+ }
+ if (null_delete_marker) {
+ //restore instance for directory data processing in later steps
+ c_obj->set_instance("null");
+ }
+ } else { //end-if delete_marker
+
+ std::unique_ptr<rgw::sal::Writer> processor = driver->get_atomic_writer(dpp,
+ null_yield,
+ c_obj.get(),
+ owner,
+ NULL,
+ 0,
+ "");
+
+ op_ret = processor->prepare(null_yield);
+ if (op_ret < 0) {
+ ldpp_dout(dpp, 20) << __func__ << "processor->prepare() returned ret=" << op_ret << dendl;
+ erase_dirty_object(dpp, e->key, null_yield);
+ continue;
+ }
+
+ off_t lst = e->size;
+ off_t fst = 0;
+ off_t ofs = 0;
+
+ rgw::sal::DataProcessor* filter = processor.get();
+ bufferlist bl;
+ op_ret = cacheDriver->get_attrs(dpp, head_oid_in_cache, obj_attrs, null_yield); //get obj attrs from head
+ if (op_ret < 0) {
+ ldpp_dout(dpp, 20) << __func__ << "cacheDriver->get_attrs returned ret=" << op_ret << dendl;
+ erase_dirty_object(dpp, e->key, null_yield);
+ continue;
+ }
+ obj_attrs.erase(RGW_CACHE_ATTR_MTIME);
+ obj_attrs.erase(RGW_CACHE_ATTR_OBJECT_SIZE);
+ obj_attrs.erase(RGW_CACHE_ATTR_ACCOUNTED_SIZE);
+ obj_attrs.erase(RGW_CACHE_ATTR_EPOCH);
+ obj_attrs.erase(RGW_CACHE_ATTR_MULTIPART);
+ obj_attrs.erase(RGW_CACHE_ATTR_OBJECT_NS);
+ obj_attrs.erase(RGW_CACHE_ATTR_BUCKET_NAME);
+ obj_attrs.erase(RGW_CACHE_ATTR_LOCAL_WEIGHT);
+
+ do {
+ ceph::bufferlist data;
+ if (fst >= lst){
+ break;
+ }
+ off_t cur_size = std::min<off_t>(fst + dpp->get_cct()->_conf->rgw_max_chunk_size, lst);
+ off_t cur_len = cur_size - fst;
+ std::string oid_in_cache = DIRTY_BLOCK_PREFIX + prefix + CACHE_DELIM + std::to_string(fst) + CACHE_DELIM + std::to_string(cur_len);
+ ldpp_dout(dpp, 10) << __func__ << "(): oid_in_cache=" << oid_in_cache << dendl;
+ rgw::sal::Attrs attrs;
+ cacheDriver->get(dpp, oid_in_cache, 0, cur_len, data, attrs, null_yield);
+ if (op_ret < 0) {
+ ldpp_dout(dpp, 20) << __func__ << "cacheDriver->get returned ret=" << op_ret << dendl;
+ erase_dirty_object(dpp, e->key, null_yield);
+ continue;
+ }
+ len = data.length();
+ fst += len;
+
+ if (len == 0) {
+ // TODO: if len of any block is 0 for some reason, we must return from here?
+ break;
+ }
+
+ op_ret = filter->process(std::move(data), ofs);
+ if (op_ret < 0) {
+ ldpp_dout(dpp, 20) << __func__ << "processor->process() returned ret="
+ << op_ret << dendl;
+ erase_dirty_object(dpp, e->key, null_yield);
+ continue;
+ }
+
+ ofs += len;
+ } while (len > 0);
+
+ op_ret = filter->process({}, ofs);
+
+ const req_context rctx{dpp, null_yield, nullptr};
+ ceph::real_time mtime = ceph::real_clock::from_time_t(e->creationTime);
+ op_ret = processor->complete(lst, e->etag, &mtime, ceph::real_clock::from_time_t(e->creationTime), obj_attrs,
+ std::nullopt, ceph::real_time(), nullptr, nullptr,
+ nullptr, nullptr, nullptr,
+ rctx, rgw::sal::FLAG_LOG_OP);
+
+ if (op_ret < 0) {
+ ldpp_dout(dpp, 20) << __func__ << "processor->complete() returned ret=" << op_ret << dendl;
+ erase_dirty_object(dpp, e->key, null_yield);
+ continue;
+ }
+ //invoke update() with dirty flag set to false, to update in-memory metadata for each block
+ // reset values
+ lst = e->size;
+ fst = 0;
+ do {
+ if (fst >= lst) {
+ break;
+ }
+ off_t cur_size = std::min<off_t>(fst + dpp->get_cct()->_conf->rgw_max_chunk_size, lst);
+ off_t cur_len = cur_size - fst;
+
+ std::string oid_in_cache = DIRTY_BLOCK_PREFIX + prefix + CACHE_DELIM + std::to_string(fst) + CACHE_DELIM + std::to_string(cur_len);
+ ldpp_dout(dpp, 20) << __func__ << "(): oid_in_cache =" << oid_in_cache << dendl;
+ std::string new_oid_in_cache = prefix + CACHE_DELIM + std::to_string(fst) + CACHE_DELIM + std::to_string(cur_len);
+ //Rename block to remove "D" prefix
+ cacheDriver->rename(dpp, oid_in_cache, new_oid_in_cache, null_yield);
+ //Update in-memory data structure for each block
+ this->update(dpp, new_oid_in_cache, 0, 0, e->version, false, y);
+
+ rgw::d4n::CacheBlock block;
+ block.cacheObj.bucketName = c_obj->get_bucket()->get_bucket_id();
+ block.cacheObj.objName = c_obj->get_key().get_oid();
+ block.size = cur_len;
+ block.blockID = fst;
+ std::string dirty = "false";
+ op_ret = blockDir->update_field(dpp, &block, "dirty", dirty, null_yield);
+ if (op_ret < 0) {
+ ldpp_dout(dpp, 0) << __func__ << "updating dirty flag in block directory failed, ret=" << op_ret << dendl;
+ }
+ fst += cur_len;
+ } while(fst < lst);
+ } //end-else if delete_marker
+
+ cacheDriver->rename(dpp, head_oid_in_cache, new_head_oid_in_cache, null_yield);
+
+ //invoke update() with dirty flag set to false, to update in-memory metadata for head
+ this->update(dpp, new_head_oid_in_cache, 0, 0, e->version, false, y);
+
+ if (null_instance) {
+ //restore instance for directory data processing in later steps
+ c_obj->set_instance("null");
+ }
+ rgw::d4n::CacheBlock block;
+ block.cacheObj.bucketName = c_obj->get_bucket()->get_bucket_id();
+ block.cacheObj.objName = c_obj->get_name();
+ block.size = 0;
+ block.blockID = 0;
+ //non-versioned case
+ if (!c_obj->have_instance()) {
+ //add watch on latest entry, as it can be modified by a put or a del
+ ret = blockDir->watch(dpp, &block, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << __func__ << "(): Failed to add a watch on: " << block.cacheObj.objName << ", ret=" << ret << dendl;
+ }
+ // hash entry for latest version
+ op_ret = blockDir->get(dpp, &block, y);
+ if (op_ret < 0) {
+ ldpp_dout(dpp, 0) << __func__ << "(): Failed to get latest entry in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl;
+ } else {
+ // if this entry is the latest, it could have been overwritten by a newer one
+ if (block.version == e->version) {
+ rgw::d4n::CacheBlock null_block;
+ null_block = block;
+ null_block.cacheObj.objName = "_:null_" + c_obj->get_name();
+ //hash entry for null block
+ op_ret = blockDir->get(dpp, &null_block, y);
+ if (op_ret < 0) {
+ ldpp_dout(dpp, 0) << __func__ << "(): Failed to get latest entry in block directory for: " << null_block.cacheObj.objName << ", ret=" << ret << dendl;
+ } else {
+ if (null_block.version == e->version) {
+ block.cacheObj.dirty = false;
+ null_block.cacheObj.dirty = false;
+ //start redis transaction using MULTI
+ blockDir->multi(dpp, y);
+ auto blk_op_ret = blockDir->set(dpp, &block, y);
+ auto null_op_ret = blockDir->set(dpp, &null_block, y);
+ if (blk_op_ret < 0 || null_op_ret < 0) {
+ blockDir->discard(dpp, y);
+ ldpp_dout(dpp, 0) << __func__ << "(): Failed to Queue update dirty flag for latest entry/null entry in block directory" << dendl;
+ } else {
+ std::vector<std::string> responses;
+ ret = blockDir->exec(dpp, responses, y);
+ if (responses.empty()) {
+ //transaction failed, which means latest hash entry has been modified by a put/del so ignore and do not update the entries
+ ldpp_dout(dpp, 0) << __func__ << "(): Execute responses are empty which means transaction failed!" << dendl;
+ }
+ }
+ }
+ }
+ } //end-if (block.version == entry->version)
+ } //end - else if op_ret == 0
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << std::setprecision(std::numeric_limits<double>::max_digits10) << e->creationTime << " from ordered set" << dendl;
+ rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{
+ .objName = c_obj->get_name(),
+ .bucketName = c_obj->get_bucket()->get_bucket_id(),
+ };
+ //remove the entry from the ordered set using its score, as the object is already cleaned
+ //need not be part of a transaction as it is being removed based on its score which is its creation time.
+ ret = objDir->zremrangebyscore(dpp, &dir_obj, e->creationTime, e->creationTime, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << __func__ << "(): Failed to remove object from ordered set with error: " << ret << dendl;
+ }
+ }
+ if (c_obj->have_instance()) { //versioned case
+ std::string objName = c_obj->get_oid();
+ if (c_obj->get_instance() == "null") {
+ objName = "_:null_" + c_obj->get_name();
+ }
+ rgw::d4n::CacheBlock instance_block;
+ instance_block.cacheObj.bucketName = c_obj->get_bucket()->get_bucket_id();
+ instance_block.cacheObj.objName = objName;
+ instance_block.size = 0;
+ instance_block.blockID = 0;
+ std::string dirty = "false";
+ op_ret = blockDir->update_field(dpp, &instance_block, "dirty", dirty, null_yield);
+ if (op_ret < 0) {
+ ldpp_dout(dpp, 20) << __func__ << "updating dirty flag in block directory for instance block failed!" << dendl;
+ }
+ //the next steps remove the entry from the ordered set and if needed the latest hash entry also in case of versioned buckets
+ rgw::d4n::CacheBlock latest_block = block;
+ latest_block.cacheObj.objName = c_obj->get_name();
+ //add watch on latest entry, as it can be modified by a put or a del
+ ret = blockDir->watch(dpp, &latest_block, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << __func__ << "(): Failed to add a watch on: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl;
+ }
+ int retry = 3;
+ while(retry) {
+ retry--;
+ //get latest entry
+ ret = blockDir->get(dpp, &latest_block, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << __func__ << "(): Failed to get latest entry in block directory for: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl;
+ }
+ //start redis transaction using MULTI
+ blockDir->multi(dpp, y);
+ if (latest_block.version == e->version) {
+ //remove object entry from ordered set
+ if (c_obj->have_instance()) {
+ blockDir->del(dpp, &latest_block, y, true);
+ if (ret < 0) {
+ blockDir->discard(dpp, y);
+ ldpp_dout(dpp, 0) << __func__ << "(): Failed to queue del for latest hash entry: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl;
+ continue;
+ }
+ }
+ }
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << std::setprecision(std::numeric_limits<double>::max_digits10) << e->creationTime << " from ordered set" << dendl;
+ rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{
+ .objName = c_obj->get_name(),
+ .bucketName = c_obj->get_bucket()->get_bucket_id(),
+ };
+ ret = objDir->zremrangebyscore(dpp, &dir_obj, e->creationTime, e->creationTime, y, true);
+ if (ret < 0) {
+ blockDir->discard(dpp, y);
+ ldpp_dout(dpp, 0) << __func__ << "(): Failed to remove object from ordered set with error: " << ret << dendl;
+ continue;
+ }
+ std::vector<std::string> responses;
+ ret = blockDir->exec(dpp, responses, y);
+ if (responses.empty()) {
+ ldpp_dout(dpp, 0) << __func__ << "(): Execute responses are empty hence continuing!" << dendl;
+ continue;
+ }
+ break;
+ }//end-while (retry)
+ }
+ //remove entry from map and queue, erase_dirty_object locks correctly
+ erase_dirty_object(dpp, e->key, null_yield);
}
- //remove entry from map and queue, erase_dirty_object locks correctly
- erase_dirty_object(dpp, e->key, null_yield);
- } else { //end-if std::difftime(time(NULL), e->creationTime) > interval
- std::this_thread::sleep_for(std::chrono::milliseconds(interval)); //TODO:: should this time be optimised?
+ } else if (diff < interval) { //end-if std::difftime(time(NULL), e->creationTime) > interval
+ std::this_thread::sleep_for(std::chrono::seconds(interval - diff)); //TODO:: should this time be optimised?
}
} //end-while true
}
}
void LRUPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
-const rgw_obj_key& obj_key, optional_yield y)
+const rgw_obj_key& obj_key, optional_yield y, std::string& restore_val)
{
const std::lock_guard l(lru_lock);
ObjEntry* e = new ObjEntry(key, version, deleteMarker, size, creationTime, user, etag, bucket_name, bucket_id, obj_key);