From aae7642e69bb5f9ce66f130dc48869363da9a632 Mon Sep 17 00:00:00 2001 From: Samarah Date: Tue, 15 Oct 2024 18:37:39 +0000 Subject: [PATCH] rgw/d4n: squashing following commits related to lazy deletion - delete dirty objects data blocks in cleaning method and non-dirty objects data blocks in eviction method 1. rgw/d4n: Implement lazy deletion 2. rgw/d4n: cleaning method now supports delete markers also (both versioned and null). The delete markers are written correctly to the backend store. Also modifying the description of rgw_d4n_cache_cleaning_interval to explicitly state that the duration is in seconds. 3. rgw/d4n: do not call invalidate_dirty_object in case of a simple delete request for dirty objects belonging to a versioned bucket. In this case, a delete marker needs to be created instead of invalidating/deleting an object. 4. rgw/d4n: Update lazy deletion in policy Co-authored-by: Pritha Srivastava Added code for creation of delete marker for a simple delete request. Signed-off-by: Samarah Signed-off-by: Pritha Srivastava Commit 2c1adbbd2b363cd5e02fd7da0cb496ca6a93aa77 d4n/policy.cc: Use ceph::split Signed-off-by: Samarah --- src/common/options/rgw.yaml.in | 2 +- src/rgw/driver/d4n/d4n_directory.cc | 10 +- src/rgw/driver/d4n/d4n_policy.cc | 704 ++++++++++++++++++---------- src/rgw/driver/d4n/d4n_policy.h | 23 +- src/rgw/driver/d4n/rgw_sal_d4n.cc | 97 ++-- src/rgw/driver/d4n/rgw_sal_d4n.h | 1 - src/rgw/rgw_cache_driver.h | 5 +- src/rgw/rgw_ssd_driver.cc | 8 +- 8 files changed, 521 insertions(+), 329 deletions(-) diff --git a/src/common/options/rgw.yaml.in b/src/common/options/rgw.yaml.in index 0d6d1894f36..9ee344c8834 100644 --- a/src/common/options/rgw.yaml.in +++ b/src/common/options/rgw.yaml.in @@ -4136,7 +4136,7 @@ options: - name: rgw_d4n_cache_cleaning_interval type: int level: advanced - desc: This is the interval for invoking write cache cleaning process + desc: This is the interval in seconds for invoking write cache cleaning process default: 1000 services: - rgw diff --git a/src/rgw/driver/d4n/d4n_directory.cc b/src/rgw/driver/d4n/d4n_directory.cc index e7d481a29aa..04a760d0f2a 100644 --- a/src/rgw/driver/d4n/d4n_directory.cc +++ b/src/rgw/driver/d4n/d4n_directory.cc @@ -351,7 +351,7 @@ int ObjectDirectory::zadd(const DoutPrefixProvider* dpp, CacheObj* object, doubl if (!multi) { if (std::get<0>(resp).value() != "1") { ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl; - return -EINVAL; + return -ENOENT; } } @@ -382,7 +382,7 @@ int ObjectDirectory::zrange(const DoutPrefixProvider* dpp, CacheObj* object, int if (std::get<0>(resp).value().empty()) { ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Empty response" << dendl; - return -EINVAL; + return -ENOENT; } members = std::get<0>(resp).value(); @@ -440,7 +440,7 @@ int ObjectDirectory::zrem(const DoutPrefixProvider* dpp, CacheObj* object, const if (!multi) { if (std::get<0>(resp).value() != "1") { ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl; - return -EINVAL; + return -ENOENT; } } @@ -471,7 +471,7 @@ int ObjectDirectory::zremrangebyscore(const DoutPrefixProvider* dpp, CacheObj* o if (!multi) { if (std::get<0>(resp).value() == "0") { ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() No element removed!" << dendl; - return -EINVAL; + return -ENOENT; } } @@ -729,7 +729,7 @@ int BlockDirectory::del(const DoutPrefixProvider* dpp, CacheBlock* block, option } else { //if delete is called as part of a transaction, the command will be queued, hence the response will be a string response resp; redis_exec(conn, ec, req, resp, y); - } + } if (ec) { ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; std::cout << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << std::endl; diff --git a/src/rgw/driver/d4n/d4n_policy.cc b/src/rgw/driver/d4n/d4n_policy.cc index 7a39a0ff5a8..a638309fb01 100644 --- a/src/rgw/driver/d4n/d4n_policy.cc +++ b/src/rgw/driver/d4n/d4n_policy.cc @@ -2,7 +2,7 @@ #include "../../../common/async/yield_context.h" #include "common/async/blocked_completion.h" -#include "common/dout.h" +#include "common/split.h" #include "rgw_perf_counters.h" namespace rgw { namespace d4n { @@ -54,8 +54,8 @@ int LFUDAPolicy::init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_ static auto obj_callback = [this]( const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, - const rgw_obj_key& obj_key, optional_yield y) { - update_dirty_object(dpp, key, version, dirty, size, creationTime, user, etag, bucket_name, bucket_id, obj_key, y); + const rgw_obj_key& obj_key, optional_yield y, std::string& restore_val) { + update_dirty_object(dpp, key, version, dirty, size, creationTime, user, etag, bucket_name, bucket_id, obj_key, y, restore_val); }; static auto block_callback = [this]( @@ -243,6 +243,40 @@ asio::awaitable LFUDAPolicy::redis_sync(const DoutPrefixProvider* dpp, opt } } +/* Changes state to INVALID for dirty objects. An INVALID state indicates that a delete request has been + issued on an object and it must be deleted rather than written to the backend. This lazy deletion occurs + in the Cleaning method and prevents data races during concurrent requests. The method below returns "false" + if the state has not been set to INVALID, and "true" if it has. The state is not set to INVALID when + cleaning is in progress, a process which writes the object to the backend store. */ +bool LFUDAPolicy::invalidate_dirty_object(const DoutPrefixProvider* dpp, const std::string& key) { + std::unique_lock l(lfuda_cleaning_lock); + + if (o_entries_map.empty()) + return false; + + auto p = o_entries_map.find(key); + if (p == o_entries_map.end()) { + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): key=" << key << " not found" << dendl; + return false; + } + + if (p->second.second == State::INIT) { + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Setting State::INVALID for key=" << key << dendl; + p->second.second = State::INVALID; + int ret = cacheDriver->set_attr(dpp, DIRTY_BLOCK_PREFIX + key, RGW_CACHE_ATTR_INVALID, "1", y); + if (ret < 0) { + ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): Failed to set xattr, ret=" << ret << dendl; + return false; + } + + return true; + } else if (p->second.second == State::IN_PROGRESS) { + state_cond.wait(l, [this, &key]{ return (o_entries_map.find(key) == o_entries_map.end()); }); + } + + return false; +} + CacheBlock* LFUDAPolicy::get_victim_block(const DoutPrefixProvider* dpp, optional_yield y) { const std::lock_guard l(lfuda_lock); if (entries_heap.empty()) @@ -252,13 +286,20 @@ CacheBlock* LFUDAPolicy::get_victim_block(const DoutPrefixProvider* dpp, optiona std::string key = entries_heap.top()->key; CacheBlock* victim = new CacheBlock(); - victim->cacheObj.bucketName = key.substr(0, key.find('_')); - key.erase(0, key.find('_') + 1); - victim->version = key.substr(0, key.find('_')); - key.erase(0, key.find('_') + 1); - victim->cacheObj.objName = key.substr(0, key.find('_')); - victim->blockID = entries_heap.top()->offset; - victim->size = entries_heap.top()->len; + auto parts = split(key, "#"); + std::vector block_info; + block_info.assign(parts.begin(), parts.end()); + + if (block_info.size() != 5) { + ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): Key of the top entry in the min heap has not been constructed correctly." << dendl; + return nullptr; + } + + victim->cacheObj.bucketName = block_info[0]; + victim->version = block_info[1]; + victim->cacheObj.objName = block_info[2]; + victim->blockID = std::stoull(block_info[3]); + victim->size = std::stoull(block_info[4]); if (blockDir->get(dpp, victim, y) < 0) { return nullptr; @@ -280,6 +321,7 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional if (entries_heap.empty()) return 0; + int ret = -1; uint64_t freeSpace = cacheDriver->get_free_space(dpp); while (freeSpace < size) { // TODO: Think about parallel reads and writes; can this turn into an infinite loop? @@ -288,7 +330,7 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional if (victim == nullptr) { ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): Could not retrieve victim block." << dendl; delete victim; - return 0; // not necessarily an error? -Sam + return -ENOSPC; } const std::lock_guard l(lfuda_lock); @@ -301,7 +343,7 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional // check dirty flag of entry to be evicted, if the flag is dirty, all entries on the local node are dirty if (it->second->dirty) { ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): Top entry in min heap is dirty, no entry is available for eviction!" << dendl; - return 0; + return -ENOSPC; } int avgWeight = weightSum / entries_map.size(); @@ -311,7 +353,7 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional (*it->second->handle)->localWeight = it->second->localWeight; entries_heap.decrease(it->second->handle); // larger value means node must be decreased to maintain min heap - if (int ret = cacheDriver->set_attr(dpp, key, RGW_CACHE_ATTR_LOCAL_WEIGHT, std::to_string(it->second->localWeight), y) < 0) { + if ((ret = cacheDriver->set_attr(dpp, key, RGW_CACHE_ATTR_LOCAL_WEIGHT, std::to_string(it->second->localWeight), y)) < 0) { delete victim; return ret; } @@ -326,20 +368,21 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional } victim->globalWeight += it->second->localWeight; - if (int ret = blockDir->update_field(dpp, victim, "globalWeight", std::to_string(victim->globalWeight), y) < 0) { + if ((ret = blockDir->update_field(dpp, victim, "globalWeight", std::to_string(victim->globalWeight), y)) < 0) { delete victim; return ret; } - if (int ret = blockDir->remove_host(dpp, victim, dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, y) < 0) { + if ((ret = blockDir->remove_host(dpp, victim, dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, y)) < 0) { delete victim; return ret; } delete victim; - if (int ret = cacheDriver->del(dpp, key, y) < 0) + if ((ret = cacheDriver->delete_data(dpp, key, y)) < 0) { return ret; + } ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Block " << key << " has been evicted." << dendl; @@ -404,15 +447,24 @@ void LFUDAPolicy::update(const DoutPrefixProvider* dpp, const std::string& key, weightSum += ((localWeight < 0) ? 0 : localWeight); } -void LFUDAPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, optional_yield y) +void LFUDAPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, optional_yield y, std::string& restore_val) { using handle_type = boost::heap::fibonacci_heap>>::handle_type; + State state{State::INIT}; ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Before acquiring lock, adding entry: " << key << dendl; + + if (!restore_val.empty() && restore_val == "1") { // No need to set the xattr because this case only occurs when the state has + state = State::INVALID; // been retrieved from the xattr itself. + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): State restored to INVALID." << dendl; + } else { + state = State::INIT; + } + const std::lock_guard l(lfuda_cleaning_lock); LFUDAObjEntry* e = new LFUDAObjEntry{key, version, deleteMarker, size, creationTime, user, etag, bucket_name, bucket_id, obj_key}; handle_type handle = object_heap.push(e); e->set_handle(handle); - o_entries_map.emplace(key, e); + o_entries_map.emplace(key, std::make_pair(e, state)); cond.notify_one(); } @@ -441,14 +493,45 @@ bool LFUDAPolicy::erase_dirty_object(const DoutPrefixProvider* dpp, const std::s return false; } - object_heap.erase(p->second->handle); + object_heap.erase(p->second.first->handle); o_entries_map.erase(p); - delete p->second; - p->second = nullptr; + delete p->second.first; + p->second.first = nullptr; + state_cond.notify_one(); return true; } +int LFUDAPolicy::delete_data_blocks(const DoutPrefixProvider* dpp, LFUDAObjEntry* e, optional_yield y) { + off_t lst = e->size, fst = 0, ofs = 0; + uint64_t len = 0; + + do { + if (fst >= lst) { + break; + } + off_t cur_size = std::min(fst + dpp->get_cct()->_conf->rgw_max_chunk_size, lst); + off_t cur_len = cur_size - fst; + std::string prefix = e->key + CACHE_DELIM + std::to_string(fst) + CACHE_DELIM + std::to_string(cur_len); + std::string oid_in_cache = DIRTY_BLOCK_PREFIX + prefix; + + int ret = -1; + if ((ret = cacheDriver->delete_data(dpp, oid_in_cache, y)) == 0) { // Sam: do we want del or delete_data here? + if (!(ret = erase(dpp, prefix, y))) { + ldpp_dout(dpp, 0) << "Failed to delete policy entry for: " << oid_in_cache << ", ret=" << ret << dendl; + return -EINVAL; + } + } else { + ldpp_dout(dpp, 0) << "Failed to delete data block " << oid_in_cache << ", ret=" << ret << dendl; + return -EINVAL; + } + + ofs += len; + } while (len > 0); + + return 0; +} + void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp) { const int interval = dpp->get_cct()->_conf->rgw_d4n_cache_cleaning_interval; @@ -456,6 +539,7 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp) ldpp_dout(dpp, 20) << __func__ << " : " << " Cache cleaning!" << dendl; uint64_t len = 0; rgw::sal::Attrs obj_attrs; + bool invalid = false; ldpp_dout(dpp, 20) << "LFUDAPolicy::" << __func__ << "" << __LINE__ << "(): Before acquiring cleaning-lock" << dendl; std::unique_lock l(lfuda_cleaning_lock); @@ -474,234 +558,366 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp) ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->user=" << e->user << dendl; ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->obj_key=" << e->obj_key << dendl; l.unlock(); - if (!e->key.empty() && (std::difftime(time(NULL), e->creationTime) > interval)) { //if block is dirty and written more than interval seconds ago - rgw_user c_rgw_user = e->user; - //writing data to the backend - //we need to create an atomic_writer - std::unique_ptr c_user = driver->get_user(c_rgw_user); - - std::unique_ptr c_bucket; - rgw_bucket c_rgw_bucket = rgw_bucket(c_rgw_user.tenant, e->bucket_name, e->bucket_id); - - RGWBucketInfo c_bucketinfo; - c_bucketinfo.bucket = c_rgw_bucket; - c_bucketinfo.owner = c_rgw_user; - int ret = driver->load_bucket(dpp, c_rgw_bucket, &c_bucket, null_yield); - if (ret < 0) { - ldpp_dout(dpp, 10) << __func__ << "(): load_bucket() returned ret=" << ret << dendl; - //Remove bucket should be implemented in d4n which will take care of deleting objects belonging to the bucket, and hence we should not reach here - erase_dirty_object(dpp, e->key, null_yield); - continue; - } - - std::unique_ptr c_obj = c_bucket->get_object(e->obj_key); - ldpp_dout(dpp, 20) << __func__ << "(): c_obj oid =" << c_obj->get_oid() << dendl; - - ACLOwner owner{c_user->get_id(), c_user->get_display_name()}; - - std::unique_ptr processor = driver->get_atomic_writer(dpp, - null_yield, - c_obj.get(), - owner, - NULL, - 0, - ""); - int op_ret = processor->prepare(null_yield); - if (op_ret < 0) { - ldpp_dout(dpp, 20) << __func__ << "processor->prepare() returned ret=" << op_ret << dendl; - erase_dirty_object(dpp, e->key, null_yield); + int diff = std::difftime(time(NULL), e->creationTime); + if (!e->key.empty() && (diff > interval)) { // if block is dirty and written more than interval seconds ago + l.lock(); + auto p = o_entries_map.find(e->key); + if (p == o_entries_map.end()) { + l.unlock(); + continue; } - - std::string prefix = url_encode(e->bucket_id) + CACHE_DELIM + url_encode(e->version) + CACHE_DELIM + url_encode(c_obj->get_name()); - off_t lst = e->size; - off_t fst = 0; - off_t ofs = 0; - - rgw::sal::DataProcessor* filter = processor.get(); - std::string head_oid_in_cache = DIRTY_BLOCK_PREFIX + prefix; - std::string new_head_oid_in_cache = prefix; - ldpp_dout(dpp, 10) << __func__ << "(): head_oid_in_cache=" << head_oid_in_cache << dendl; - ldpp_dout(dpp, 10) << __func__ << "(): new_head_oid_in_cache=" << new_head_oid_in_cache << dendl; - bufferlist bl; - cacheDriver->get_attrs(dpp, head_oid_in_cache, obj_attrs, null_yield); //get obj attrs from head - obj_attrs.erase(RGW_CACHE_ATTR_MTIME); - obj_attrs.erase(RGW_CACHE_ATTR_OBJECT_SIZE); - obj_attrs.erase(RGW_CACHE_ATTR_ACCOUNTED_SIZE); - obj_attrs.erase(RGW_CACHE_ATTR_EPOCH); - - do { - ceph::bufferlist data; - if (fst >= lst){ - break; - } - off_t cur_size = std::min(fst + dpp->get_cct()->_conf->rgw_max_chunk_size, lst); - off_t cur_len = cur_size - fst; - std::string oid_in_cache = DIRTY_BLOCK_PREFIX + prefix + CACHE_DELIM + std::to_string(fst) + CACHE_DELIM + std::to_string(cur_len); - ldpp_dout(dpp, 10) << __func__ << "(): oid_in_cache=" << oid_in_cache << dendl; - rgw::sal::Attrs attrs; - cacheDriver->get(dpp, oid_in_cache, 0, cur_len, data, attrs, null_yield); - len = data.length(); - fst += len; - - if (len == 0) { - // TODO: if len of any block is 0 for some reason, we must return from here? - break; - } - - op_ret = filter->process(std::move(data), ofs); - if (op_ret < 0) { - ldpp_dout(dpp, 20) << __func__ << "processor->process() returned ret=" - << op_ret << dendl; - erase_dirty_object(dpp, e->key, null_yield); - } - - ofs += len; - } while (len > 0); - - op_ret = filter->process({}, ofs); - - const req_context rctx{dpp, null_yield, nullptr}; - ceph::real_time mtime = ceph::real_clock::from_time_t(e->creationTime); - op_ret = processor->complete(lst, e->etag, &mtime, ceph::real_clock::from_time_t(e->creationTime), obj_attrs, - std::nullopt, ceph::real_time(), nullptr, nullptr, - nullptr, nullptr, nullptr, - rctx, rgw::sal::FLAG_LOG_OP); - - if (op_ret < 0) { - ldpp_dout(dpp, 20) << __func__ << "processor->complete() returned ret=" << op_ret << dendl; - erase_dirty_object(dpp, e->key, null_yield); - } - //invoke update() with dirty flag set to false, to update in-memory metadata for each block - // reset values - lst = e->size; - fst = 0; - do { - if (fst >= lst) { - break; - } - off_t cur_size = std::min(fst + dpp->get_cct()->_conf->rgw_max_chunk_size, lst); - off_t cur_len = cur_size - fst; - - std::string oid_in_cache = DIRTY_BLOCK_PREFIX + prefix + CACHE_DELIM + std::to_string(fst) + CACHE_DELIM + std::to_string(cur_len); - ldpp_dout(dpp, 20) << __func__ << "(): oid_in_cache =" << oid_in_cache << dendl; - std::string new_oid_in_cache = prefix + CACHE_DELIM + std::to_string(fst) + CACHE_DELIM + std::to_string(cur_len); - //Rename block to remove "D" prefix - cacheDriver->rename(dpp, oid_in_cache, new_oid_in_cache, null_yield); - //Update in-memory data structure for each block - this->update(dpp, new_oid_in_cache, 0, 0, e->version, false, y); - - rgw::d4n::CacheBlock block; - block.cacheObj.bucketName = c_obj->get_bucket()->get_bucket_id(); - block.cacheObj.objName = c_obj->get_key().get_oid(); - block.size = cur_len; - block.blockID = fst; - std::string dirty = "false"; - op_ret = blockDir->update_field(dpp, &block, "dirty", dirty, null_yield); - if (op_ret < 0) { - ldpp_dout(dpp, 0) << __func__ << "updating dirty flag in block directory failed, ret=" << op_ret << dendl; - } - fst += cur_len; - } while(fst < lst); - - cacheDriver->rename(dpp, head_oid_in_cache, new_head_oid_in_cache, null_yield); - - //invoke update() with dirty flag set to false, to update in-memory metadata for head - this->update(dpp, new_head_oid_in_cache, 0, 0, e->version, false, y); - - rgw::d4n::CacheBlock block; - block.cacheObj.bucketName = c_obj->get_bucket()->get_bucket_id(); - block.cacheObj.objName = c_obj->get_name(); - block.size = 0; - block.blockID = 0; - if (c_obj->have_instance()) { - blockDir->get(dpp, &block, null_yield); - if (block.version == c_obj->get_instance()) { //versioned case - update head block entry that has latest version - std::string dirty = "false"; - op_ret = blockDir->update_field(dpp, &block, "dirty", dirty, null_yield); - if (op_ret < 0) { - ldpp_dout(dpp, 20) << __func__ << "updating dirty flag in block directory for head failed!" << dendl; - } - } - } else { //non-versioned case - std::string dirty = "false"; - op_ret = blockDir->update_field(dpp, &block, "dirty", dirty, null_yield); - if (op_ret < 0) { - ldpp_dout(dpp, 20) << __func__ << "updating dirty flag in block directory for head failed!" << dendl; - } - } - if (c_obj->have_instance()) { - rgw::d4n::CacheBlock instance_block; - instance_block.cacheObj.bucketName = c_obj->get_bucket()->get_bucket_id(); - instance_block.cacheObj.objName = c_obj->get_oid(); - instance_block.size = 0; - instance_block.blockID = 0; - op_ret = blockDir->update_field(dpp, &instance_block, "dirty", "false", null_yield); - if (op_ret < 0) { - ldpp_dout(dpp, 20) << __func__ << "updating dirty flag in block directory for instance block failed!" << dendl; - } + if (p->second.second == State::INVALID) { + invalid = true; } + l.unlock(); + + // If the state is invalid, the blocks must be deleted from the cache rather than written to the backend. + if (invalid) { + ldpp_dout(dpp, 10) << __func__ << "(): State is INVALID; deleting object." << dendl; + int ret = -1; + if ((ret = cacheDriver->delete_data(dpp, DIRTY_BLOCK_PREFIX + e->key, y)) == 0) { // Sam: do we want del or delete_data here? + if (!(ret = erase(dpp, e->key, y))) { + ldpp_dout(dpp, 0) << "Failed to delete head policy entry for: " << e->key << ", ret=" << ret << dendl; // TODO: what must occur during failure? + } + } else { + ldpp_dout(dpp, 0) << "Failed to delete head object for: " << e->key << ", ret=" << ret << dendl; + } - //the next steps remove the entry from the ordered set and if needed the latest hash entry also in case of versioned buckets - if (!c_obj->have_instance()) { - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << std::setprecision(std::numeric_limits::max_digits10) << e->creationTime << " from ordered set" << dendl; - rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{ - .objName = c_obj->get_name(), - .bucketName = c_obj->get_bucket()->get_bucket_id(), - }; - ret = objDir->zremrangebyscore(dpp, &dir_obj, e->creationTime, e->creationTime, y, true); - if (ret < 0) { - ldpp_dout(dpp, 0) << __func__ << "(): Failed to remove object from ordered set with error: " << ret << dendl; - } + if (!e->delete_marker) { + ret = delete_data_blocks(dpp, e, y); + if (ret == 0) { + erase_dirty_object(dpp, e->key, null_yield); + } else { + ldpp_dout(dpp, 0) << "Failed to delete blocks for: " << e->key << ", ret=" << ret << dendl; + } + } } else { - rgw::d4n::CacheBlock latest_block = block; - latest_block.cacheObj.objName = c_obj->get_name(); - //add watch on latest entry, as it can be modified by a put or a del - ret = blockDir->watch(dpp, &latest_block, y); - if (ret < 0) { - ldpp_dout(dpp, 0) << __func__ << "(): Failed to add a watch on: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl; - } - int retry = 3; - while(retry) { - retry--; - //get latest entry - ret = blockDir->get(dpp, &latest_block, y); - if (ret < 0) { - ldpp_dout(dpp, 0) << __func__ << "(): Failed to get latest entry in block directory for: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl; - } - //start redis transaction using MULTI - blockDir->multi(dpp, y); - if (latest_block.version == e->version) { - //remove object entry from ordered set - if (c_obj->have_instance()) { - blockDir->del(dpp, &latest_block, y, true); - if (ret < 0) { - ldpp_dout(dpp, 0) << __func__ << "(): Failed to queue del for latest hash entry: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl; - } - } - } - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << std::setprecision(std::numeric_limits::max_digits10) << e->creationTime << " from ordered set" << dendl; - rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{ - .objName = c_obj->get_name(), - .bucketName = c_obj->get_bucket()->get_bucket_id(), - }; - ret = objDir->zremrangebyscore(dpp, &dir_obj, e->creationTime, e->creationTime, y, true); - if (ret < 0) { - ldpp_dout(dpp, 0) << __func__ << "(): Failed to remove object from ordered set with error: " << ret << dendl; - } - std::vector responses; - ret = blockDir->exec(dpp, responses, y); - if (responses.empty()) { - ldpp_dout(dpp, 0) << __func__ << "(): Execute responses are empty hence continuing!" << dendl; - continue; - } - break; - }//end-while (retry) + l.lock(); + p->second.second = State::IN_PROGRESS; + l.unlock(); + + rgw_user c_rgw_user = e->user; + //writing data to the backend + //we need to create an atomic_writer + std::unique_ptr c_user = driver->get_user(c_rgw_user); + + std::unique_ptr c_bucket; + rgw_bucket c_rgw_bucket = rgw_bucket(c_rgw_user.tenant, e->bucket_name, e->bucket_id); + + RGWBucketInfo c_bucketinfo; + c_bucketinfo.bucket = c_rgw_bucket; + c_bucketinfo.owner = c_rgw_user; + int ret = driver->load_bucket(dpp, c_rgw_bucket, &c_bucket, null_yield); + if (ret < 0) { + ldpp_dout(dpp, 10) << __func__ << "(): load_bucket() returned ret=" << ret << dendl; + //Remove bucket should be implemented in d4n which will take care of deleting objects belonging to the bucket, and hence we should not reach here + erase_dirty_object(dpp, e->key, null_yield); + continue; + } + + std::unique_ptr c_obj = c_bucket->get_object(e->obj_key); + bool null_instance = (c_obj->get_instance() == "null"); + if (null_instance) { + //clear the instance for backend store + c_obj->clear_instance(); + } + ldpp_dout(dpp, 20) << __func__ << "(): c_obj oid =" << c_obj->get_oid() << dendl; + + ACLOwner owner{c_user->get_id(), c_user->get_display_name()}; + + std::string prefix = url_encode(e->bucket_id) + CACHE_DELIM + url_encode(e->version) + CACHE_DELIM + url_encode(c_obj->get_name()); + std::string head_oid_in_cache = DIRTY_BLOCK_PREFIX + prefix; + std::string new_head_oid_in_cache = prefix; + ldpp_dout(dpp, 10) << __func__ << "(): head_oid_in_cache=" << head_oid_in_cache << dendl; + ldpp_dout(dpp, 10) << __func__ << "(): new_head_oid_in_cache=" << new_head_oid_in_cache << dendl; + int op_ret; + if (e->delete_marker) { + bool null_delete_marker = (c_obj->get_instance() == "null"); + if (null_delete_marker) { + //clear the instance for backend store + c_obj->clear_instance(); + } + std::unique_ptr del_op = c_obj->get_delete_op(); + del_op->params.obj_owner = owner; + del_op->params.bucket_owner = c_bucket->get_owner(); + del_op->params.versioning_status = c_bucket->get_info().versioning_status(); + //populate marker_version_id only when delete marker is not null + del_op->params.marker_version_id = e->version; + op_ret = del_op->delete_obj(dpp, null_yield, rgw::sal::FLAG_LOG_OP); + if (op_ret >= 0) { + bool delete_marker = del_op->result.delete_marker; + std::string version_id = del_op->result.version_id; + ldpp_dout(dpp, 20) << __func__ << "delete_obj delete_marker=" << delete_marker << dendl; + ldpp_dout(dpp, 20) << __func__ << "delete_obj version_id=" << version_id << dendl; + } else { + ldpp_dout(dpp, 20) << __func__ << "delete_obj returned ret=" << op_ret << dendl; + erase_dirty_object(dpp, e->key, null_yield); + continue; + } + if (null_delete_marker) { + //restore instance for directory data processing in later steps + c_obj->set_instance("null"); + } + } else { //end-if delete_marker + + std::unique_ptr processor = driver->get_atomic_writer(dpp, + null_yield, + c_obj.get(), + owner, + NULL, + 0, + ""); + + op_ret = processor->prepare(null_yield); + if (op_ret < 0) { + ldpp_dout(dpp, 20) << __func__ << "processor->prepare() returned ret=" << op_ret << dendl; + erase_dirty_object(dpp, e->key, null_yield); + continue; + } + + off_t lst = e->size; + off_t fst = 0; + off_t ofs = 0; + + rgw::sal::DataProcessor* filter = processor.get(); + bufferlist bl; + op_ret = cacheDriver->get_attrs(dpp, head_oid_in_cache, obj_attrs, null_yield); //get obj attrs from head + if (op_ret < 0) { + ldpp_dout(dpp, 20) << __func__ << "cacheDriver->get_attrs returned ret=" << op_ret << dendl; + erase_dirty_object(dpp, e->key, null_yield); + continue; + } + obj_attrs.erase(RGW_CACHE_ATTR_MTIME); + obj_attrs.erase(RGW_CACHE_ATTR_OBJECT_SIZE); + obj_attrs.erase(RGW_CACHE_ATTR_ACCOUNTED_SIZE); + obj_attrs.erase(RGW_CACHE_ATTR_EPOCH); + obj_attrs.erase(RGW_CACHE_ATTR_MULTIPART); + obj_attrs.erase(RGW_CACHE_ATTR_OBJECT_NS); + obj_attrs.erase(RGW_CACHE_ATTR_BUCKET_NAME); + obj_attrs.erase(RGW_CACHE_ATTR_LOCAL_WEIGHT); + + do { + ceph::bufferlist data; + if (fst >= lst){ + break; + } + off_t cur_size = std::min(fst + dpp->get_cct()->_conf->rgw_max_chunk_size, lst); + off_t cur_len = cur_size - fst; + std::string oid_in_cache = DIRTY_BLOCK_PREFIX + prefix + CACHE_DELIM + std::to_string(fst) + CACHE_DELIM + std::to_string(cur_len); + ldpp_dout(dpp, 10) << __func__ << "(): oid_in_cache=" << oid_in_cache << dendl; + rgw::sal::Attrs attrs; + cacheDriver->get(dpp, oid_in_cache, 0, cur_len, data, attrs, null_yield); + if (op_ret < 0) { + ldpp_dout(dpp, 20) << __func__ << "cacheDriver->get returned ret=" << op_ret << dendl; + erase_dirty_object(dpp, e->key, null_yield); + continue; + } + len = data.length(); + fst += len; + + if (len == 0) { + // TODO: if len of any block is 0 for some reason, we must return from here? + break; + } + + op_ret = filter->process(std::move(data), ofs); + if (op_ret < 0) { + ldpp_dout(dpp, 20) << __func__ << "processor->process() returned ret=" + << op_ret << dendl; + erase_dirty_object(dpp, e->key, null_yield); + continue; + } + + ofs += len; + } while (len > 0); + + op_ret = filter->process({}, ofs); + + const req_context rctx{dpp, null_yield, nullptr}; + ceph::real_time mtime = ceph::real_clock::from_time_t(e->creationTime); + op_ret = processor->complete(lst, e->etag, &mtime, ceph::real_clock::from_time_t(e->creationTime), obj_attrs, + std::nullopt, ceph::real_time(), nullptr, nullptr, + nullptr, nullptr, nullptr, + rctx, rgw::sal::FLAG_LOG_OP); + + if (op_ret < 0) { + ldpp_dout(dpp, 20) << __func__ << "processor->complete() returned ret=" << op_ret << dendl; + erase_dirty_object(dpp, e->key, null_yield); + continue; + } + //invoke update() with dirty flag set to false, to update in-memory metadata for each block + // reset values + lst = e->size; + fst = 0; + do { + if (fst >= lst) { + break; + } + off_t cur_size = std::min(fst + dpp->get_cct()->_conf->rgw_max_chunk_size, lst); + off_t cur_len = cur_size - fst; + + std::string oid_in_cache = DIRTY_BLOCK_PREFIX + prefix + CACHE_DELIM + std::to_string(fst) + CACHE_DELIM + std::to_string(cur_len); + ldpp_dout(dpp, 20) << __func__ << "(): oid_in_cache =" << oid_in_cache << dendl; + std::string new_oid_in_cache = prefix + CACHE_DELIM + std::to_string(fst) + CACHE_DELIM + std::to_string(cur_len); + //Rename block to remove "D" prefix + cacheDriver->rename(dpp, oid_in_cache, new_oid_in_cache, null_yield); + //Update in-memory data structure for each block + this->update(dpp, new_oid_in_cache, 0, 0, e->version, false, y); + + rgw::d4n::CacheBlock block; + block.cacheObj.bucketName = c_obj->get_bucket()->get_bucket_id(); + block.cacheObj.objName = c_obj->get_key().get_oid(); + block.size = cur_len; + block.blockID = fst; + std::string dirty = "false"; + op_ret = blockDir->update_field(dpp, &block, "dirty", dirty, null_yield); + if (op_ret < 0) { + ldpp_dout(dpp, 0) << __func__ << "updating dirty flag in block directory failed, ret=" << op_ret << dendl; + } + fst += cur_len; + } while(fst < lst); + } //end-else if delete_marker + + cacheDriver->rename(dpp, head_oid_in_cache, new_head_oid_in_cache, null_yield); + + //invoke update() with dirty flag set to false, to update in-memory metadata for head + this->update(dpp, new_head_oid_in_cache, 0, 0, e->version, false, y); + + if (null_instance) { + //restore instance for directory data processing in later steps + c_obj->set_instance("null"); + } + rgw::d4n::CacheBlock block; + block.cacheObj.bucketName = c_obj->get_bucket()->get_bucket_id(); + block.cacheObj.objName = c_obj->get_name(); + block.size = 0; + block.blockID = 0; + //non-versioned case + if (!c_obj->have_instance()) { + //add watch on latest entry, as it can be modified by a put or a del + ret = blockDir->watch(dpp, &block, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << __func__ << "(): Failed to add a watch on: " << block.cacheObj.objName << ", ret=" << ret << dendl; + } + // hash entry for latest version + op_ret = blockDir->get(dpp, &block, y); + if (op_ret < 0) { + ldpp_dout(dpp, 0) << __func__ << "(): Failed to get latest entry in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl; + } else { + // if this entry is the latest, it could have been overwritten by a newer one + if (block.version == e->version) { + rgw::d4n::CacheBlock null_block; + null_block = block; + null_block.cacheObj.objName = "_:null_" + c_obj->get_name(); + //hash entry for null block + op_ret = blockDir->get(dpp, &null_block, y); + if (op_ret < 0) { + ldpp_dout(dpp, 0) << __func__ << "(): Failed to get latest entry in block directory for: " << null_block.cacheObj.objName << ", ret=" << ret << dendl; + } else { + if (null_block.version == e->version) { + block.cacheObj.dirty = false; + null_block.cacheObj.dirty = false; + //start redis transaction using MULTI + blockDir->multi(dpp, y); + auto blk_op_ret = blockDir->set(dpp, &block, y); + auto null_op_ret = blockDir->set(dpp, &null_block, y); + if (blk_op_ret < 0 || null_op_ret < 0) { + blockDir->discard(dpp, y); + ldpp_dout(dpp, 0) << __func__ << "(): Failed to Queue update dirty flag for latest entry/null entry in block directory" << dendl; + } else { + std::vector responses; + ret = blockDir->exec(dpp, responses, y); + if (responses.empty()) { + //transaction failed, which means latest hash entry has been modified by a put/del so ignore and do not update the entries + ldpp_dout(dpp, 0) << __func__ << "(): Execute responses are empty which means transaction failed!" << dendl; + } + } + } + } + } //end-if (block.version == entry->version) + } //end - else if op_ret == 0 + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << std::setprecision(std::numeric_limits::max_digits10) << e->creationTime << " from ordered set" << dendl; + rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{ + .objName = c_obj->get_name(), + .bucketName = c_obj->get_bucket()->get_bucket_id(), + }; + //remove the entry from the ordered set using its score, as the object is already cleaned + //need not be part of a transaction as it is being removed based on its score which is its creation time. + ret = objDir->zremrangebyscore(dpp, &dir_obj, e->creationTime, e->creationTime, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << __func__ << "(): Failed to remove object from ordered set with error: " << ret << dendl; + } + } + if (c_obj->have_instance()) { //versioned case + std::string objName = c_obj->get_oid(); + if (c_obj->get_instance() == "null") { + objName = "_:null_" + c_obj->get_name(); + } + rgw::d4n::CacheBlock instance_block; + instance_block.cacheObj.bucketName = c_obj->get_bucket()->get_bucket_id(); + instance_block.cacheObj.objName = objName; + instance_block.size = 0; + instance_block.blockID = 0; + std::string dirty = "false"; + op_ret = blockDir->update_field(dpp, &instance_block, "dirty", dirty, null_yield); + if (op_ret < 0) { + ldpp_dout(dpp, 20) << __func__ << "updating dirty flag in block directory for instance block failed!" << dendl; + } + //the next steps remove the entry from the ordered set and if needed the latest hash entry also in case of versioned buckets + rgw::d4n::CacheBlock latest_block = block; + latest_block.cacheObj.objName = c_obj->get_name(); + //add watch on latest entry, as it can be modified by a put or a del + ret = blockDir->watch(dpp, &latest_block, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << __func__ << "(): Failed to add a watch on: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl; + } + int retry = 3; + while(retry) { + retry--; + //get latest entry + ret = blockDir->get(dpp, &latest_block, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << __func__ << "(): Failed to get latest entry in block directory for: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl; + } + //start redis transaction using MULTI + blockDir->multi(dpp, y); + if (latest_block.version == e->version) { + //remove object entry from ordered set + if (c_obj->have_instance()) { + blockDir->del(dpp, &latest_block, y, true); + if (ret < 0) { + blockDir->discard(dpp, y); + ldpp_dout(dpp, 0) << __func__ << "(): Failed to queue del for latest hash entry: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl; + continue; + } + } + } + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << std::setprecision(std::numeric_limits::max_digits10) << e->creationTime << " from ordered set" << dendl; + rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{ + .objName = c_obj->get_name(), + .bucketName = c_obj->get_bucket()->get_bucket_id(), + }; + ret = objDir->zremrangebyscore(dpp, &dir_obj, e->creationTime, e->creationTime, y, true); + if (ret < 0) { + blockDir->discard(dpp, y); + ldpp_dout(dpp, 0) << __func__ << "(): Failed to remove object from ordered set with error: " << ret << dendl; + continue; + } + std::vector responses; + ret = blockDir->exec(dpp, responses, y); + if (responses.empty()) { + ldpp_dout(dpp, 0) << __func__ << "(): Execute responses are empty hence continuing!" << dendl; + continue; + } + break; + }//end-while (retry) + } + //remove entry from map and queue, erase_dirty_object locks correctly + erase_dirty_object(dpp, e->key, null_yield); } - //remove entry from map and queue, erase_dirty_object locks correctly - erase_dirty_object(dpp, e->key, null_yield); - } else { //end-if std::difftime(time(NULL), e->creationTime) > interval - std::this_thread::sleep_for(std::chrono::milliseconds(interval)); //TODO:: should this time be optimised? + } else if (diff < interval) { //end-if std::difftime(time(NULL), e->creationTime) > interval + std::this_thread::sleep_for(std::chrono::seconds(interval - diff)); //TODO:: should this time be optimised? } } //end-while true } @@ -746,7 +962,7 @@ void LRUPolicy::update(const DoutPrefixProvider* dpp, const std::string& key, ui } void LRUPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, -const rgw_obj_key& obj_key, optional_yield y) +const rgw_obj_key& obj_key, optional_yield y, std::string& restore_val) { const std::lock_guard l(lru_lock); ObjEntry* e = new ObjEntry(key, version, deleteMarker, size, creationTime, user, etag, bucket_name, bucket_id, obj_key); diff --git a/src/rgw/driver/d4n/d4n_policy.h b/src/rgw/driver/d4n/d4n_policy.h index e4c826aafef..6b8c60666fc 100644 --- a/src/rgw/driver/d4n/d4n_policy.h +++ b/src/rgw/driver/d4n/d4n_policy.h @@ -17,6 +17,12 @@ namespace sys = boost::system; static std::string empty = std::string(); +enum class State { // state machine for dirty objects in the cache + INIT, + IN_PROGRESS, // object is being written to the backend + INVALID // object is to be deleted during cleanup +}; + class CachePolicy { protected: struct Entry : public boost::intrusive::list_base_hook<> { @@ -65,9 +71,10 @@ class CachePolicy { virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val=empty) = 0; virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, - const rgw_obj_key& obj_key, optional_yield y) = 0; + const rgw_obj_key& obj_key, optional_yield y, std::string& restore_val=empty) = 0; virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0; virtual bool erase_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0; + virtual bool invalidate_dirty_object(const DoutPrefixProvider* dpp, const std::string& key) = 0; virtual void cleaning(const DoutPrefixProvider* dpp) = 0; }; @@ -125,10 +132,11 @@ class LFUDAPolicy : public CachePolicy { Heap entries_heap; Object_Heap object_heap; //This heap contains dirty objects ordered by their creation time, used for cleaning method std::unordered_map entries_map; - std::unordered_map o_entries_map; //Contains only dirty objects, used for look-up + std::unordered_map > o_entries_map; //Contains only dirty objects, used for look-up std::mutex lfuda_lock; std::mutex lfuda_cleaning_lock; std::condition_variable cond; + std::condition_variable state_cond; bool quit{false}; int age = 1, weightSum = 0, postedSum = 0; @@ -158,6 +166,7 @@ class LFUDAPolicy : public CachePolicy { return nullptr; return it->second; } + int delete_data_blocks(const DoutPrefixProvider* dpp, LFUDAObjEntry* e, optional_yield y); public: LFUDAPolicy(std::shared_ptr& conn, rgw::cache::CacheDriver* cacheDriver) : CachePolicy(), @@ -184,15 +193,16 @@ class LFUDAPolicy : public CachePolicy { void save_y(optional_yield y) { this->y = y; } virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, - const rgw_obj_key& obj_key, optional_yield y) override; - virtual bool erase_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y); + const rgw_obj_key& obj_key, optional_yield y, std::string& restore_val=empty) override; + virtual bool erase_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override; + virtual bool invalidate_dirty_object(const DoutPrefixProvider* dpp, const std::string& key) override; virtual void cleaning(const DoutPrefixProvider* dpp) override; LFUDAObjEntry* find_obj_entry(const std::string& key) { auto it = o_entries_map.find(key); if (it == o_entries_map.end()) { return nullptr; } - return it->second; + return it->second.first; } }; @@ -217,9 +227,10 @@ class LRUPolicy : public CachePolicy { virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val=empty) override; virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, - const rgw_obj_key& obj_key, optional_yield y) override; + const rgw_obj_key& obj_key, optional_yield y, std::string& restore_val=empty) override; virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override; virtual bool erase_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override; + virtual bool invalidate_dirty_object(const DoutPrefixProvider* dpp, const std::string& key) override { return false; } virtual void cleaning(const DoutPrefixProvider* dpp) override {} }; diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.cc b/src/rgw/driver/d4n/rgw_sal_d4n.cc index 6626c4690e3..bd101bd27f6 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.cc +++ b/src/rgw/driver/d4n/rgw_sal_d4n.cc @@ -774,6 +774,7 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, std:: .cacheObj = version_object, .blockID = 0, .version = this->get_object_version(), + .deleteMarker = this->delete_marker, .size = 0, }; @@ -1997,22 +1998,6 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl return 0; } -int D4NFilterObject::D4NFilterDeleteOp::delete_from_cache_and_policy(const DoutPrefixProvider* dpp, std::string oid, - std::string prefix, optional_yield y) -{ - int ret = -1; - - if ((ret = source->driver->get_cache_driver()->delete_data(dpp, oid, y)) == 0) { // Sam: do we want del or delete_data here? - if (!(ret = source->driver->get_policy_driver()->get_cache_policy()->erase(dpp, prefix, y))) { - ldpp_dout(dpp, 0) << "Failed to delete head policy entry for: " << source->get_key().get_oid() << ", ret=" << ret << dendl; - } - } else { - ldpp_dout(dpp, 0) << "Failed to delete head object for: " << source->get_key().get_oid() << ", ret=" << ret << dendl; - } - - return ret; -} - int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp, optional_yield y, uint32_t flags) { @@ -2026,7 +2011,7 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp /* check_head_exists_in_cache_get_oid also returns false if the head object is in the cache, but is a delete marker. As a result, the below check guarantees the head object is not in the cache. */ if (!source->check_head_exists_in_cache_get_oid(dpp, head_oid_in_cache, attrs, block, y) && !block.deleteMarker) { - /* for a dirty object, if the first call is a simple delete after versioning is enabled, the call will go to the backend store and create a dlete marker there + /* for a dirty object, if the first call is a simple delete after versioning is enabled, the call will go to the backend store and create a delete marker there since no object with source->get_name() will be found in the cache (and this is correct) */ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): head object not found; calling next->delete_obj" << dendl; next->params = params; @@ -2039,11 +2024,6 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp auto objDir = source->driver->get_obj_dir(); std::string policy_prefix = head_oid_in_cache; std::string version = source->get_object_version(); - - // call invalidate_object based on whether the object is dirty(objDirty) - //if the object is still dirty and has been marked invalid, then let objDirty be true, else set it to false - //remove code that deletes head/data block in this method. - std::string objName = source->get_name(); // special handling for name starting with '_' if (objName[0] == '_') { @@ -2051,8 +2031,16 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp } if (objDirty) { // head object dirty flag represents object dirty flag - policy_prefix.erase(0, 2); // remove "D_" prefix from policy key since the policy keys do not hold this information - } + //for versioned buckets, for a simple delete we need to create a delete marker (and not invalidate/delete any object) + if (!source->get_bucket()->versioned() || (block.cacheObj.objName != source->get_name())) { + policy_prefix.erase(0, 2); // remove "D_" prefix from policy key since the policy keys do not hold this information + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): calling invalidate_dirty_object for: " << head_oid_in_cache << dendl; + if (!source->driver->get_policy_driver()->get_cache_policy()->invalidate_dirty_object(dpp, policy_prefix)) { + objDirty = false; + } + } + } + // Versioned buckets - this will delete the head object indexed by version-id (even null) and latest en if (source->get_bucket()->versioned()) { /* 1. clean objects - no latest head entry as latest entry to be retrieved from backend now @@ -2189,12 +2177,7 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp //start redis transaction using MULTI blockDir->multi(dpp, y); } - if ((ret = blockDir->del(dpp, &block, y, true)) == 0) { - if ((ret = delete_from_cache_and_policy(dpp, head_oid_in_cache, policy_prefix, y)) < 0) { - blockDir->discard(dpp, y); - return ret; - } - } else if (ret < 0 && ret != -ENOENT) { + if ((ret = blockDir->del(dpp, &block, y, true)) < 0 && ret != -ENOENT) { ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to delete head object in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl; blockDir->discard(dpp, y); return ret; @@ -2236,14 +2219,13 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp /* Non-versioned buckets - we will delete the latest entry and the "null" entry dirty objects - delete "null" entry from ordered set also */ if (!source->get_bucket()->versioned()) { - if ((ret = blockDir->del(dpp, &block, y)) == 0) { - if (objDirty) { - if ((ret = delete_from_cache_and_policy(dpp, head_oid_in_cache, policy_prefix, y)) < 0) { - return ret; - } - } - } else if (ret < 0 && ret != -ENOENT) { - ldpp_dout(dpp, 0) << "Failed to delete head object in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl; + //start redis transaction using MULTI to delete the latest entry and the "null" entry together + blockDir->multi(dpp, y); + //explore redis pipelining to send the two 'DEL' commands together in a single request + ret = blockDir->del(dpp, &block, y, true); + if (ret < 0) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue delete head object op in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl; + blockDir->discard(dpp, y); return ret; } //if we get request for latest head entry, delete the null block and vice versa @@ -2253,8 +2235,8 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp block.cacheObj.objName = source->get_name(); } if ((ret = blockDir->del(dpp, &block, y)) < 0) { - ldpp_dout(dpp, 0) << "Failed to delete head object in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl; - return ret; + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue delete head object in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl; + blockDir->discard(dpp, y); } //dirty objects - delete from ordered set if (objDirty) { @@ -2275,11 +2257,6 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to execute exec in block directory: " << "ret= " << ret << dendl; return ret; } - if (objDirty) { - if ((ret = delete_from_cache_and_policy(dpp, head_oid_in_cache, policy_prefix, y)) < 0) { - return ret; - } - } } //end-if non-versioned buckets int size; @@ -2334,43 +2311,25 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp } } - if ((ret = blockDir->del(dpp, &block, y)) == 0) { - prefix = DIRTY_BLOCK_PREFIX + prefix; - std::string oid_in_cache = prefix + CACHE_DELIM + std::to_string(fst) + CACHE_DELIM + std::to_string(cur_len); - - if (objDirty) { - std::string key = policy_prefix + CACHE_DELIM + std::to_string(fst) + CACHE_DELIM + std::to_string(cur_len); - if ((ret = delete_from_cache_and_policy(dpp, oid_in_cache, key, y)) < 0) { - ldpp_dout(dpp, 0) << "ERROR for block " << source->get_name() << " blockID: " << fst << " block size: " << cur_len << ", ret=" << ret << dendl; - return ret; - } + if ((ret = blockDir->del(dpp, &block, y)) == -ENOENT) { + continue; + } else if (ret < 0) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to delete directory entry for: " << source->get_name() << " blockid: " << fst << " block size: " << cur_len << ", ret=" << ret << dendl; + return ret; } - } else if (ret == -ENOENT) { - continue; - } else { - ldpp_dout(dpp, 0) << "Failed to delete directory entry for: " << source->get_name() << " blockid: " << fst << " block size: " << cur_len << ", ret=" << ret << dendl; - return ret; - } fst += cur_len; } while (fst < lst); } - std::string key = policy_prefix; if (!objDirty) { next->params = params; ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): object is not dirty; calling next->delete_obj" << dendl; ret = next->delete_obj(dpp, y, flags); result = next->result; return ret; - } else { - if (!(ret = source->driver->get_policy_driver()->get_cache_policy()->erase_dirty_object(dpp, key, y))) { - ldpp_dout(dpp, 0) << "Failed to delete policy object entry for: " << source->get_name() << ", ret=" << ret << dendl; - return -ENOENT; - } else { - return 0; - } } + return 0; } } diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.h b/src/rgw/driver/d4n/rgw_sal_d4n.h index 85be4675cf4..4a9b7582aca 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.h +++ b/src/rgw/driver/d4n/rgw_sal_d4n.h @@ -203,7 +203,6 @@ class D4NFilterObject : public FilterObject { virtual ~D4NFilterDeleteOp() = default; virtual int delete_obj(const DoutPrefixProvider* dpp, optional_yield y, uint32_t flags) override; - int delete_from_cache_and_policy(const DoutPrefixProvider* dpp, std::string oid, std::string prefix, optional_yield y); }; D4NFilterObject(std::unique_ptr _next, D4NFilterDriver* _driver) : FilterObject(std::move(_next)), diff --git a/src/rgw/rgw_cache_driver.h b/src/rgw/rgw_cache_driver.h index 6d416bc20d9..dd600f7a7ae 100644 --- a/src/rgw/rgw_cache_driver.h +++ b/src/rgw/rgw_cache_driver.h @@ -14,15 +14,16 @@ constexpr char RGW_CACHE_ATTR_VERSION_ID[] = "user.rgw.version_id"; constexpr char RGW_CACHE_ATTR_SOURC_ZONE[] = "user.rgw.source_zone"; constexpr char RGW_CACHE_ATTR_LOCAL_WEIGHT[] = "user.rgw.localWeight"; constexpr char RGW_CACHE_ATTR_DELETE_MARKER[] = "user.rgw.deleteMarker"; +constexpr char RGW_CACHE_ATTR_INVALID[] = "user.rgw.invalid"; constexpr char DIRTY_BLOCK_PREFIX[] = "D#"; constexpr char CACHE_DELIM = '#'; namespace rgw { namespace cache { -typedef std::function ObjectDataCallback; + const rgw_obj_key& obj_key, optional_yield y, std::string& restore_val)> ObjectDataCallback; typedef std::function BlockDataCallback; diff --git a/src/rgw/rgw_ssd_driver.cc b/src/rgw/rgw_ssd_driver.cc index 095925091dd..d4f6438d4f4 100644 --- a/src/rgw/rgw_ssd_driver.cc +++ b/src/rgw/rgw_ssd_driver.cc @@ -279,6 +279,7 @@ int SSDDriver::restore_blocks_objects(const DoutPrefixProvider* dpp, ObjectDataC uint64_t len = 0, offset = 0; std::string localWeightStr; + std::string invalidStr; if (parts.size() == 2) { rgw::sal::Attrs attrs; get_attrs(dpp, file_entry.path(), attrs, null_yield); @@ -340,8 +341,13 @@ int SSDDriver::restore_blocks_objects(const DoutPrefixProvider* dpp, ObjectDataC ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): deleteMarker: " << deleteMarker << dendl; } + if (attrs.find(RGW_CACHE_ATTR_INVALID) != attrs.end()) { + invalidStr = attrs[RGW_CACHE_ATTR_INVALID].to_str(); + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): invalidStr: " << invalidStr << dendl; + } + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): calling func for: " << key << dendl; - obj_func(dpp, key, version, deleteMarker, size, creationTime, user, etag, bucket_name, bucket_id, obj_key, null_yield); + obj_func(dpp, key, version, deleteMarker, size, creationTime, user, etag, bucket_name, bucket_id, obj_key, null_yield, invalidStr); block_func(dpp, key, offset, len, version, dirty, null_yield, localWeightStr); parsed = true; } //end-if part.size() == 2 -- 2.39.5