From: Pritha Srivastava Date: Wed, 20 Mar 2024 04:14:50 +0000 (+0530) Subject: rgw/d4n: squashing all commits related to caching head in the X-Git-Tag: testing/wip-rishabh-testing-20250426.123842-debug~14^2~35 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=d5bcd3a6afbb02083e39126c5052792c8dc129f9;p=ceph-ci.git rgw/d4n: squashing all commits related to caching head in the write-back workflow, modifying set_obj_attrs(), get_obj_attrs() and delete_obj_attrs based on the cached head and modifying the cleaning method to use a min-heap data structure for storing dirty objects only. 1. rgw/d4n: implementation for caching head object in write-back workflow. 2. rgw/d4n: modifications to get write back cache working after cleaning process. 3. rgw/d4n: modifications for eviction of dirty blocks. 4. rgw/d4n: modifications include adding a heap of dirty objects which has objects ordered by their creation time and the top element of which is fetched in the cleaning method, processed and deleted in a loop. 5. rgw/d4n: changing the format of cached blocks to bucket_name_version_object_name_ofs_len, to avoid checks for versioned and non-versioned objects. 6. rgw/d4n: modifications to set_obj_attrs(), modify_obj_attrs() and delete_obj_attrs() to check if the head object exists in a cache, else direct the calls to backend store. 7. rgw/d4n: handling version in case of bucket versioning being suspended while writing the object. Co-authored by: Samarah Changed dynamic_cast to static_cast for D4NFilterObject in D4NFilterWriter class constructors. Signed-off-by: Pritha Srivastava --- diff --git a/src/rgw/driver/d4n/d4n_directory.cc b/src/rgw/driver/d4n/d4n_directory.cc index 0be1d178467..96dac74890e 100644 --- a/src/rgw/driver/d4n/d4n_directory.cc +++ b/src/rgw/driver/d4n/d4n_directory.cc @@ -311,11 +311,11 @@ std::string BlockDirectory::build_index(CacheBlock* block) return block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + std::to_string(block->blockID) + "_" + std::to_string(block->size); } -int BlockDirectory::exist_key(CacheBlock* block, optional_yield y) +int BlockDirectory::exist_key(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y) { std::string key = build_index(block); response resp; - + ldpp_dout(dpp, 10) << __func__ << "(): index is: " << key << dendl; try { boost::system::error_code ec; request req; @@ -330,7 +330,7 @@ int BlockDirectory::exist_key(CacheBlock* block, optional_yield y) return std::get<0>(resp).value(); } -int BlockDirectory::set(CacheBlock* block, optional_yield y) +int BlockDirectory::set(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y) { std::string key = build_index(block); @@ -368,6 +368,12 @@ int BlockDirectory::set(CacheBlock* block, optional_yield y) redisValues.push_back("creationTime"); redisValues.push_back(block->cacheObj.creationTime); redisValues.push_back("dirty"); + if (block->cacheObj.dirty == true || block->cacheObj.dirty == 1) { + block->cacheObj.dirty = 1; + } + if (block->cacheObj.dirty == false || block->cacheObj.dirty == 0) { + block->cacheObj.dirty = 0; + } redisValues.push_back(std::to_string(block->cacheObj.dirty)); redisValues.push_back("objHosts"); @@ -402,11 +408,13 @@ int BlockDirectory::set(CacheBlock* block, optional_yield y) return 0; } -int BlockDirectory::get(CacheBlock* block, optional_yield y) +int BlockDirectory::get(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y) { std::string key = build_index(block); - if (exist_key(block, y)) { + ldpp_dout(dpp, 10) << __func__ << "(): index is: " << key << dendl; + + if (exist_key(dpp, block, y)) { std::vector fields; fields.push_back("blockID"); @@ -439,7 +447,6 @@ int BlockDirectory::get(CacheBlock* block, optional_yield y) block->version = std::get<0>(resp).value()[1]; block->size = boost::lexical_cast(std::get<0>(resp).value()[2]); block->globalWeight = boost::lexical_cast(std::get<0>(resp).value()[3]); - { std::stringstream ss(boost::lexical_cast(std::get<0>(resp).value()[4])); block->hostsList.clear(); @@ -456,7 +463,6 @@ int BlockDirectory::get(CacheBlock* block, optional_yield y) block->cacheObj.creationTime = std::get<0>(resp).value()[7]; block->cacheObj.dirty = boost::lexical_cast(std::get<0>(resp).value()[8]); block->dirty = boost::lexical_cast(std::get<0>(resp).value()[8]); - { std::stringstream ss(boost::lexical_cast(std::get<0>(resp).value()[9])); block->cacheObj.hostsList.clear(); @@ -478,13 +484,13 @@ int BlockDirectory::get(CacheBlock* block, optional_yield y) } /* Note: This method is not compatible for use on Ubuntu systems. */ -int BlockDirectory::copy(CacheBlock* block, std::string copyName, std::string copyBucketName, optional_yield y) +int BlockDirectory::copy(const DoutPrefixProvider* dpp, CacheBlock* block, std::string copyName, std::string copyBucketName, optional_yield y) { std::string key = build_index(block); auto copyBlock = CacheBlock{ .cacheObj = { .objName = copyName, .bucketName = copyBucketName }, .blockID = 0 }; std::string copyKey = build_index(©Block); - if (exist_key(block, y)) { + if (exist_key(dpp, block, y)) { try { response resp; @@ -522,11 +528,11 @@ int BlockDirectory::copy(CacheBlock* block, std::string copyName, std::string co } } -int BlockDirectory::del(CacheBlock* block, optional_yield y) +int BlockDirectory::del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y) { std::string key = build_index(block); - if (exist_key(block, y)) { + if (exist_key(dpp, block, y)) { try { boost::system::error_code ec; request req; @@ -548,11 +554,11 @@ int BlockDirectory::del(CacheBlock* block, optional_yield y) } } -int BlockDirectory::update_field(CacheBlock* block, std::string field, std::string value, optional_yield y) +int BlockDirectory::update_field(const DoutPrefixProvider* dpp, CacheBlock* block, std::string field, std::string& value, optional_yield y) { std::string key = build_index(block); - if (exist_key(block, y)) { + if (exist_key(dpp, block, y)) { try { /* Ensure field exists */ { @@ -589,7 +595,14 @@ int BlockDirectory::update_field(CacheBlock* block, std::string field, std::stri std::get<0>(resp).value() += value; value = std::get<0>(resp).value(); } - + if (field == "dirty") { + if (value == "true" || value == "1") { + value = "1"; + } + if (value == "false" || value == "0") { + value = "0"; + } + } { boost::system::error_code ec; request req; @@ -612,11 +625,11 @@ int BlockDirectory::update_field(CacheBlock* block, std::string field, std::stri } } -int BlockDirectory::remove_host(CacheBlock* block, std::string delValue, optional_yield y) +int BlockDirectory::remove_host(const DoutPrefixProvider* dpp, CacheBlock* block, std::string& delValue, optional_yield y) { std::string key = build_index(block); - if (exist_key(block, y)) { + if (exist_key(dpp, block, y)) { try { /* Ensure field exists */ { @@ -649,7 +662,7 @@ int BlockDirectory::remove_host(CacheBlock* block, std::string delValue, optiona } if (std::get<0>(resp).value().find("_") == std::string::npos) /* Last host, delete entirely */ - return del(block, y); + return del(dpp, block, y); std::string result = std::get<0>(resp).value(); auto it = result.find(delValue); diff --git a/src/rgw/driver/d4n/d4n_directory.h b/src/rgw/driver/d4n/d4n_directory.h index be7bde82691..9a7a0dfe3c6 100644 --- a/src/rgw/driver/d4n/d4n_directory.h +++ b/src/rgw/driver/d4n/d4n_directory.h @@ -18,7 +18,7 @@ struct CacheObj { std::string objName; /* S3 object name */ std::string bucketName; /* S3 bucket name */ std::string creationTime; /* Creation time of the S3 Object */ - bool dirty; + bool dirty{false}; std::vector hostsList; /* List of hostnames of object locations for multiple backends */ }; @@ -26,7 +26,7 @@ struct CacheBlock { CacheObj cacheObj; uint64_t blockID; std::string version; - bool dirty; + bool dirty{false}; uint64_t size; /* Block size in bytes */ int globalWeight = 0; /* LFUDA policy variable */ std::vector hostsList; /* List of hostnames of block locations */ @@ -67,14 +67,14 @@ class BlockDirectory: public Directory { void init(CephContext* cct) { this->cct = cct; } - int exist_key(CacheBlock* block, optional_yield y); - - int set(CacheBlock* block, optional_yield y); - int get(CacheBlock* block, optional_yield y); - int copy(CacheBlock* block, std::string copyName, std::string copyBucketName, optional_yield y); - int del(CacheBlock* block, optional_yield y); - int update_field(CacheBlock* block, std::string field, std::string value, optional_yield y); - int remove_host(CacheBlock* block, std::string value, optional_yield y); + int exist_key(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y); + + int set(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y); + int get(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y); + int copy(const DoutPrefixProvider* dpp, CacheBlock* block, std::string copyName, std::string copyBucketName, optional_yield y); + int del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y); + int update_field(const DoutPrefixProvider* dpp, CacheBlock* block, std::string field, std::string& value, optional_yield y); + int remove_host(const DoutPrefixProvider* dpp, CacheBlock* block, std::string& value, optional_yield y); private: std::shared_ptr conn; diff --git a/src/rgw/driver/d4n/d4n_policy.cc b/src/rgw/driver/d4n/d4n_policy.cc index b75e1c6a423..878245c029d 100644 --- a/src/rgw/driver/d4n/d4n_policy.cc +++ b/src/rgw/driver/d4n/d4n_policy.cc @@ -258,7 +258,7 @@ CacheBlock* LFUDAPolicy::get_victim_block(const DoutPrefixProvider* dpp, optiona victim->blockID = entries_heap.top()->offset; victim->size = entries_heap.top()->len; - if (dir->get(victim, y) < 0) { + if (dir->get(dpp, victim, y) < 0) { return nullptr; } @@ -279,7 +279,7 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional while (freeSpace < size) { // TODO: Think about parallel reads and writes; can this turn into an infinite loop? CacheBlock* victim = get_victim_block(dpp, y); - + if (victim == nullptr) { ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Could not retrieve victim block." << dendl; delete victim; @@ -293,7 +293,11 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional delete victim; return -ENOENT; } - + // check dirty flag of entry to be evicted, if the flag is dirty, all entries on the local node are dirty + if (it->second->dirty) { + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Top entry in min heap is dirty, no entry is available for eviction!" << dendl; + return -ENOENT; + } int avgWeight = weightSum / entries_map.size(); if (victim->hostsList.size() == 1 && victim->hostsList[0] == dir->cct->_conf->rgw_d4n_l1_datacache_address) { /* Last copy */ @@ -308,7 +312,8 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional } victim->globalWeight = 0; - if (int ret = dir->update_field(victim, "globalWeight", std::to_string(victim->globalWeight), y) < 0) { + auto globalWeight = std::to_string(victim->globalWeight); + if (int ret = dir->update_field(dpp, victim, "globalWeight", globalWeight, y) < 0) { delete victim; return ret; } @@ -321,12 +326,13 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional } victim->globalWeight += it->second->localWeight; - if (int ret = dir->update_field(victim, "globalWeight", std::to_string(victim->globalWeight), y) < 0) { + auto globalWeight = std::to_string(victim->globalWeight); + if (int ret = dir->update_field(dpp, victim, "globalWeight", globalWeight, y) < 0) { delete victim; return ret; } - if (int ret = dir->remove_host(victim, dir->cct->_conf->rgw_d4n_l1_datacache_address, y) < 0) { + if (int ret = dir->remove_host(dpp, victim, dir->cct->_conf->rgw_d4n_l1_datacache_address, y) < 0) { delete victim; return ret; } @@ -349,43 +355,55 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional return 0; } -void LFUDAPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, time_t creationTime, const rgw_user user, optional_yield y) +void LFUDAPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, optional_yield y) { using handle_type = boost::heap::fibonacci_heap>>::handle_type; const std::lock_guard l(lfuda_lock); int localWeight = age; auto entry = find_entry(key); - if (entry != nullptr) { - localWeight = entry->localWeight + age; + bool updateLocalWeight = true; + // check the dirty flag in the existing entry for the key and the incoming dirty flag. If the + // incoming dirty flag is false, that means update() is invoked as part of cleaning process, + // so we must not update its localWeight. + if (entry != nullptr) { + if (entry->dirty && !dirty) { + localWeight = entry->localWeight; + updateLocalWeight = false; + } else { + localWeight = entry->localWeight + age; + } } - erase(dpp, key, y); - - LFUDAEntry *e = new LFUDAEntry(key, offset, len, version, dirty, creationTime, user, localWeight); + LFUDAEntry *e = new LFUDAEntry(key, offset, len, version, dirty, localWeight); handle_type handle = entries_heap.push(e); e->set_handle(handle); entries_map.emplace(key, e); std::string oid_in_cache = key; - if (dirty == true) - oid_in_cache = "D_"+key; + if (dirty == true) { + oid_in_cache = "D_" + key; + } - if (cacheDriver->set_attr(dpp, oid_in_cache, "user.rgw.localWeight", std::to_string(localWeight), y) < 0) - ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): CacheDriver set_attr method failed." << dendl; + if (updateLocalWeight) { + if (cacheDriver->set_attr(dpp, oid_in_cache, "user.rgw.localWeight", std::to_string(localWeight), y) < 0) + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): CacheDriver set_attr method failed." << dendl; + } weightSum += ((localWeight < 0) ? 0 : localWeight); } -void LFUDAPolicy::updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, optional_yield y) +void LFUDAPolicy::updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, const std::string& bucket_name, const rgw_obj_key& obj_key, optional_yield y) { - eraseObj(dpp, key, y); - - const std::lock_guard l(lfuda_lock); - LFUDAObjEntry *e = new LFUDAObjEntry(key, version, dirty, size, creationTime, user, etag); + using handle_type = boost::heap::fibonacci_heap>>::handle_type; + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Before acquiring lock." << dendl; + const std::lock_guard l(lfuda_cleaning_lock); + LFUDAObjEntry *e = new LFUDAObjEntry{key, version, dirty, size, creationTime, user, etag, bucket_name, obj_key}; + handle_type handle = object_heap.push(e); + e->set_handle(handle); o_entries_map.emplace(key, e); + cond.notify_one(); } - bool LFUDAPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) { auto p = entries_map.find(key); @@ -403,13 +421,15 @@ bool LFUDAPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key, o bool LFUDAPolicy::eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) { - const std::lock_guard l(lfuda_lock); + const std::lock_guard l(lfuda_cleaning_lock); auto p = o_entries_map.find(key); if (p == o_entries_map.end()) { return false; } + object_heap.erase(p->second->handle); o_entries_map.erase(p); + delete p->second; return true; } @@ -417,131 +437,191 @@ bool LFUDAPolicy::eraseObj(const DoutPrefixProvider* dpp, const std::string& key void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp) { const int interval = cct->_conf->rgw_d4n_cache_cleaning_interval; - while(true){ + while(!quit) { ldpp_dout(dpp, 20) << __func__ << " : " << " Cache cleaning!" << dendl; std::string name = ""; std::string b_name = ""; std::string key = ""; uint64_t len = 0; rgw::sal::Attrs obj_attrs; - int count = 0; - - for (auto it = o_entries_map.begin(); it != o_entries_map.end(); it++){ - if ((it->second->dirty == true) && (std::difftime(time(NULL), it->second->creationTime) > interval)){ //if block is dirty and written more than interval seconds ago - name = it->first; - rgw_user c_rgw_user = it->second->user; - - size_t pos = 0; - std::string delimiter = "_"; - while ((pos = name.find(delimiter)) != std::string::npos) { - if (count == 0){ - b_name = name.substr(0, pos); - name.erase(0, pos + delimiter.length()); - } - count ++; - } - key = name; - - //writing data to the backend - //we need to create an atomic_writer - rgw_obj_key c_obj_key = rgw_obj_key(key); - std::unique_ptr c_user = driver->get_user(c_rgw_user); - - std::unique_ptr c_bucket; - rgw_bucket c_rgw_bucket = rgw_bucket(c_rgw_user.tenant, b_name, ""); - - RGWBucketInfo c_bucketinfo; - c_bucketinfo.bucket = c_rgw_bucket; - c_bucketinfo.owner = c_rgw_user; - - - int ret = driver->load_bucket(dpp, c_rgw_bucket, &c_bucket, null_yield); - if (ret < 0) { - ldpp_dout(dpp, 10) << __func__ << "(): load_bucket() returned ret=" << ret << dendl; - break; + + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "" << __LINE__ << "(): Before acquiring cleaning-lock." << dendl; + std::unique_lock l(lfuda_cleaning_lock); + LFUDAObjEntry* e; + if (object_heap.size() > 0) { + e = object_heap.top(); + } else { + cond.wait(l, [this]{ return (!object_heap.empty() || quit); }); + continue; + } + ldpp_dout(dpp, 10) <<__LINE__ << " " << __func__ << "(): e->key=" << e->key << dendl; + ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->dirty=" << e->dirty << dendl; + l.unlock(); + if (!e->key.empty() && (e->dirty == true) && (std::difftime(time(NULL), e->creationTime) > interval)) { //if block is dirty and written more than interval seconds ago + name = e->key; + rgw_user c_rgw_user = e->user; + + size_t pos = 0; + std::string delimiter = "_"; + int count = 0; + while ((pos = name.find(delimiter)) != std::string::npos) { + if (count == 0) { + b_name = name.substr(0, pos); + ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): b_name=" << b_name << dendl; + name.erase(0, pos + delimiter.length()); + ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): name=" << name << dendl; + break; } + count++; + ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): count=" << b_name << dendl; + } + key = name; + //writing data to the backend + //we need to create an atomic_writer + std::unique_ptr c_user = driver->get_user(c_rgw_user); + + std::unique_ptr c_bucket; + rgw_bucket c_rgw_bucket = rgw_bucket(c_rgw_user.tenant, e->bucket_name, ""); + + RGWBucketInfo c_bucketinfo; + c_bucketinfo.bucket = c_rgw_bucket; + c_bucketinfo.owner = c_rgw_user; + int ret = driver->load_bucket(dpp, c_rgw_bucket, &c_bucket, null_yield); + if (ret < 0) { + ldpp_dout(dpp, 10) << __func__ << "(): load_bucket() returned ret=" << ret << dendl; + break; + } + + std::unique_ptr c_obj = c_bucket->get_object(e->obj_key); - std::unique_ptr c_obj = c_bucket->get_object(c_obj_key); + ACLOwner owner{c_user->get_id(), c_user->get_display_name()}; - ACLOwner owner{c_user->get_id(), c_user->get_display_name()}; + std::unique_ptr processor = driver->get_atomic_writer(dpp, + null_yield, + c_obj.get(), + owner, + NULL, + 0, + ""); - std::unique_ptr processor = driver->get_atomic_writer(dpp, - null_yield, - c_obj.get(), - owner, - NULL, - 0, - ""); + int op_ret = processor->prepare(null_yield); + if (op_ret < 0) { + ldpp_dout(dpp, 20) << __func__ << "processor->prepare() returned ret=" << op_ret << dendl; + break; + } - int op_ret = processor->prepare(null_yield); - if (op_ret < 0) { - ldpp_dout(dpp, 20) << "processor->prepare() returned ret=" << op_ret << dendl; - break; - } + std::string prefix = b_name + "_" + e->version + "_" + c_obj->get_name(); + off_t lst = e->size; + off_t fst = 0; + off_t ofs = 0; + + rgw::sal::DataProcessor *filter = processor.get(); + std::string head_oid_in_cache = "D_" + prefix; + std::string new_head_oid_in_cache = prefix; + ldpp_dout(dpp, 10) << __func__ << "(): head_oid_in_cache=" << head_oid_in_cache << dendl; + ldpp_dout(dpp, 10) << __func__ << "(): new_head_oid_in_cache=" << new_head_oid_in_cache << dendl; + bufferlist bl; + cacheDriver->get_attrs(dpp, head_oid_in_cache, obj_attrs, null_yield); //get obj attrs from head + obj_attrs.erase("user.rgw.mtime"); + obj_attrs.erase("user.rgw.object_size"); + obj_attrs.erase("user.rgw.accounted_size"); + obj_attrs.erase("user.rgw.epoch"); + + do { + ceph::bufferlist data; + if (fst >= lst){ + break; + } + off_t cur_size = std::min(fst + cct->_conf->rgw_max_chunk_size, lst); + off_t cur_len = cur_size - fst; + std::string oid_in_cache = "D_" + prefix + "_" + std::to_string(fst) + "_" + std::to_string(cur_len); + ldpp_dout(dpp, 10) << __func__ << "(): oid_in_cache=" << oid_in_cache << dendl; + rgw::sal::Attrs attrs; + cacheDriver->get(dpp, oid_in_cache, 0, cur_len, data, attrs, null_yield); + len = data.length(); + fst += len; + + if (len == 0) { + // TODO: if len of any block is 0 for some reason, we must return from here? + break; + } - std::string prefix = b_name+"_"+key; - off_t lst = it->second->size; - off_t fst = 0; - off_t ofs = 0; + op_ret = filter->process(std::move(data), ofs); + if (op_ret < 0) { + ldpp_dout(dpp, 20) << __func__ << "processor->process() returned ret=" + << op_ret << dendl; + return; + } - - rgw::sal::DataProcessor *filter = processor.get(); - do { - ceph::bufferlist data; - if (fst >= lst){ - break; - } - off_t cur_size = std::min(fst + cct->_conf->rgw_max_chunk_size, lst); - off_t cur_len = cur_size - fst; - std::string oid_in_cache = "D_" + prefix + "_" + std::to_string(fst) + "_" + std::to_string(cur_len); - std::string new_oid_in_cache = prefix + "_" + std::to_string(fst) + "_" + std::to_string(cur_len); - cacheDriver->get(dpp, oid_in_cache, 0, cur_len, data, obj_attrs, null_yield); - len = data.length(); - fst += len; - - if (len == 0) { - break; - } - - op_ret = filter->process(std::move(data), ofs); - if (op_ret < 0) { - ldpp_dout(dpp, 20) << "processor->process() returned ret=" - << op_ret << dendl; - return; - } - - rgw::d4n::CacheBlock block; - block.cacheObj.bucketName = c_obj->get_bucket()->get_name(); - block.cacheObj.objName = c_obj->get_key().get_oid(); - block.size = len; - block.blockID = ofs; - op_ret = dir->update_field(&block, "dirty", "false", null_yield); - if (op_ret < 0) { - ldpp_dout(dpp, 20) << "updating dirty flag in Block directory failed!" << dendl; - return; - } - - cacheDriver->rename(dpp, oid_in_cache, new_oid_in_cache, null_yield); - - ofs += len; - } while (len > 0); - - op_ret = filter->process({}, ofs); - - const req_context rctx{dpp, null_yield, nullptr}; - ceph::real_time mtime = ceph::real_clock::from_time_t(it->second->creationTime); - op_ret = processor->complete(lst, it->second->etag, &mtime, ceph::real_clock::from_time_t(it->second->creationTime), obj_attrs, - std::nullopt, ceph::real_time(), nullptr, nullptr, - nullptr, nullptr, nullptr, - rctx, rgw::sal::FLAG_LOG_OP); - - //data is clean now, updating in-memory metadata - it->second->dirty = false; + ofs += len; + } while (len > 0); + + op_ret = filter->process({}, ofs); + + const req_context rctx{dpp, null_yield, nullptr}; + ceph::real_time mtime = ceph::real_clock::from_time_t(e->creationTime); + op_ret = processor->complete(lst, e->etag, &mtime, ceph::real_clock::from_time_t(e->creationTime), obj_attrs, + std::nullopt, ceph::real_time(), nullptr, nullptr, + nullptr, nullptr, nullptr, + rctx, rgw::sal::FLAG_LOG_OP); + + //invoke update() with dirty flag set to false, to update in-memory metadata for each block + // reset values + lst = e->size; + fst = 0; + do { + if (fst >= lst) { + break; + } + off_t cur_size = std::min(fst + cct->_conf->rgw_max_chunk_size, lst); + off_t cur_len = cur_size - fst; + + std::string oid_in_cache = "D_" + prefix + "_" + std::to_string(fst) + "_" + std::to_string(cur_len); + ldpp_dout(dpp, 20) << __func__ << "(): oid_in_cache =" << oid_in_cache << dendl; + std::string new_oid_in_cache = prefix + "_" + std::to_string(fst) + "_" + std::to_string(cur_len); + //Rename block to remove "D" prefix + cacheDriver->rename(dpp, oid_in_cache, new_oid_in_cache, null_yield); + //Update in-memory data structure for each block + this->update(dpp, new_oid_in_cache, 0, 0, e->version, false, y); + + rgw::d4n::CacheBlock block; + block.cacheObj.bucketName = c_obj->get_bucket()->get_name(); + block.cacheObj.objName = c_obj->get_key().get_oid(); + block.size = cur_len; + block.blockID = fst; + std::string dirty = "false"; + op_ret = dir->update_field(dpp, &block, "dirty", dirty, null_yield); + if (op_ret < 0) { + ldpp_dout(dpp, 5) << __func__ << "updating dirty flag in Block directory failed!" << dendl; + return; + } + fst += cur_len; + } while(fst < lst); + + cacheDriver->rename(dpp, head_oid_in_cache, new_head_oid_in_cache, null_yield); + //data is clean now, updating in-memory metadata for an object + e->dirty = false; + //invoke update() with dirty flag set to false, to update in-memory metadata for head + this->update(dpp, new_head_oid_in_cache, 0, 0, e->version, false, y); + + rgw::d4n::CacheBlock block; + block.cacheObj.bucketName = c_obj->get_bucket()->get_name(); + block.cacheObj.objName = c_obj->get_name(); + block.size = 0; + block.blockID = 0; + std::string dirty = "false"; + op_ret = dir->update_field(dpp, &block, "dirty", dirty, null_yield); + if (op_ret < 0) { + ldpp_dout(dpp, 20) << __func__ << "updating dirty flag in block directory for head failed!" << dendl; + return; } - } - std::this_thread::sleep_for(std::chrono::milliseconds(interval)); - } + //remove entry from map and queue, eraseObj locks correctly + eraseObj(dpp, e->key, null_yield); + } else { //end-if std::difftime(time(NULL), e->creationTime) > interval + std::this_thread::sleep_for(std::chrono::milliseconds(interval)); //TODO:: should this time be optimised? + } + } //end-while true } @@ -575,20 +655,19 @@ int LRUPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_y return 0; } -void LRUPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, time_t creationTime, const rgw_user user, optional_yield y) +void LRUPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, optional_yield y) { const std::lock_guard l(lru_lock); _erase(dpp, key, y); - Entry *e = new Entry(key, offset, len, version, dirty, creationTime, user); + Entry* e = new Entry(key, offset, len, version, dirty); entries_lru_list.push_back(*e); entries_map.emplace(key, e); } -void LRUPolicy::updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, optional_yield y) +void LRUPolicy::updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, const std::string& bucket_name, const rgw_obj_key& obj_key, optional_yield y) { - eraseObj(dpp, key, y); const std::lock_guard l(lru_lock); - ObjEntry *e = new ObjEntry(key, version, dirty, size, creationTime, user, etag); + ObjEntry* e = new ObjEntry(key, version, dirty, size, creationTime, user, etag, bucket_name, obj_key); o_entries_map.emplace(key, e); return; } @@ -600,25 +679,25 @@ bool LRUPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key, opt return _erase(dpp, key, y); } -bool LRUPolicy::_erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) +bool LRUPolicy::eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) { - auto p = entries_map.find(key); - if (p == entries_map.end()) { + const std::lock_guard l(lru_lock); + auto p = o_entries_map.find(key); + if (p == o_entries_map.end()) { return false; } - entries_map.erase(p); - entries_lru_list.erase_and_dispose(entries_lru_list.iterator_to(*(p->second)), Entry_delete_disposer()); + o_entries_map.erase(p); return true; } -bool LRUPolicy::eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) +bool LRUPolicy::_erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) { - const std::lock_guard l(lru_lock); - auto p = o_entries_map.find(key); - if (p == o_entries_map.end()) { + auto p = entries_map.find(key); + if (p == entries_map.end()) { return false; } - o_entries_map.erase(p); + entries_map.erase(p); + entries_lru_list.erase_and_dispose(entries_lru_list.iterator_to(*(p->second)), Entry_delete_disposer()); return true; } diff --git a/src/rgw/driver/d4n/d4n_policy.h b/src/rgw/driver/d4n/d4n_policy.h index 61a8743a172..6301028b1ca 100644 --- a/src/rgw/driver/d4n/d4n_policy.h +++ b/src/rgw/driver/d4n/d4n_policy.h @@ -27,21 +27,19 @@ class CachePolicy { uint64_t len; std::string version; bool dirty; - time_t creationTime; - rgw_user user; - Entry(std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, time_t creationTime, rgw_user user) : key(key), offset(offset), - len(len), version(version), dirty(dirty), creationTime(creationTime), user(user) {} + Entry(std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty) : key(key), offset(offset), + len(len), version(version), dirty(dirty) {} }; //The disposer object function struct Entry_delete_disposer { - void operator()(Entry *e) { + void operator()(Entry* e) { delete e; } }; - struct ObjEntry : public boost::intrusive::list_base_hook<> { + struct ObjEntry { std::string key; std::string version; bool dirty; @@ -49,24 +47,21 @@ class CachePolicy { time_t creationTime; rgw_user user; std::string etag; - ObjEntry(std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, rgw_user user, std::string& etag) : key(key), version(version), dirty(dirty), size(size), creationTime(creationTime), user(user), etag(etag) {} - }; - - struct ObjEntry_delete_disposer { - void operator()(ObjEntry *e) { - delete e; - } + std::string bucket_name; + rgw_obj_key obj_key; + ObjEntry() = default; + ObjEntry(std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, rgw_user user, std::string& etag, const std::string& bucket_name, const rgw_obj_key& obj_key) : key(key), version(version), dirty(dirty), size(size), creationTime(creationTime), user(user), etag(etag), bucket_name(bucket_name), obj_key(obj_key) {} }; public: CachePolicy() {} virtual ~CachePolicy() = default; - virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver *_driver) = 0; + virtual int init(CephContext* cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver* _driver) = 0; virtual int exist_key(std::string key) = 0; virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) = 0; - virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, time_t creationTime, const rgw_user user, optional_yield y) = 0; - virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, optional_yield y) = 0; + virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, optional_yield y) = 0; + virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, const std::string& bucket_name, const rgw_obj_key& obj_key, optional_yield y) = 0; virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0; virtual bool eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0; virtual void cleaning(const DoutPrefixProvider* dpp) = 0; @@ -77,29 +72,56 @@ class LFUDAPolicy : public CachePolicy { template struct EntryComparator { bool operator()(T* const e1, T* const e2) const { - return e1->localWeight > e2->localWeight; + // order the min heap using localWeight and dirty flag so that dirty blocks are at the bottom + if ((e1->dirty && e2->dirty) || (!e1->dirty && !e2->dirty)) { + return e1->localWeight > e2->localWeight; + } else if (e1->dirty && !e2->dirty){ + return true; + } else if (!e1->dirty && e2->dirty) { + return false; + } else { + return e1->localWeight > e2->localWeight; + } } }; + template + struct ObjectComparator { + bool operator()(T* const e1, T* const e2) const { + // order the min heap using creationTime + return e1->creationTime > e2->creationTime; + } + }; + struct LFUDAEntry : public Entry { int localWeight; using handle_type = boost::heap::fibonacci_heap>>::handle_type; handle_type handle; - LFUDAEntry(std::string& key, uint64_t offset, uint64_t len, std::string& version, bool dirty, time_t creationTime, rgw_user user, int localWeight) : Entry(key, offset, len, version, dirty, creationTime, user), localWeight(localWeight) {} + LFUDAEntry(std::string& key, uint64_t offset, uint64_t len, std::string& version, bool dirty, int localWeight) : Entry(key, offset, len, version, dirty), localWeight(localWeight) {} - void set_handle(handle_type handle_) { handle = handle_; } + void set_handle(handle_type handle_) { handle = handle_; } }; struct LFUDAObjEntry : public ObjEntry { - LFUDAObjEntry(std::string& key, std::string& version, bool dirty, uint64_t size, time_t creationTime, rgw_user user, std::string& etag) : ObjEntry(key, version, dirty, size, creationTime, user, etag) {} + using handle_type = boost::heap::fibonacci_heap>>::handle_type; + handle_type handle; + + LFUDAObjEntry(std::string& key, std::string& version, bool dirty, uint64_t size, time_t creationTime, rgw_user user, std::string& etag, const std::string& bucket_name, const rgw_obj_key& obj_key) : ObjEntry(key, version, dirty, size, creationTime, user, etag, bucket_name, obj_key) {} + + void set_handle(handle_type handle_) { handle = handle_; } }; using Heap = boost::heap::fibonacci_heap>>; + using Object_Heap = boost::heap::fibonacci_heap>>; Heap entries_heap; + Object_Heap object_heap; //This heap contains dirty objects ordered by their creation time, used for cleaning method std::unordered_map entries_map; - std::unordered_map o_entries_map; + std::unordered_map o_entries_map; //Contains only dirty objects, used for look-up std::mutex lfuda_lock; + std::mutex lfuda_cleaning_lock; + std::condition_variable cond; + bool quit{false}; int age = 1, weightSum = 0, postedSum = 0; optional_yield y = null_yield; @@ -107,7 +129,7 @@ class LFUDAPolicy : public CachePolicy { BlockDirectory* dir; rgw::cache::CacheDriver* cacheDriver; std::optional rthread_timer; - rgw::sal::Driver *driver; + rgw::sal::Driver* driver; std::thread tc; CephContext* cct; @@ -139,21 +161,25 @@ class LFUDAPolicy : public CachePolicy { ~LFUDAPolicy() { rthread_stop(); delete dir; + std::lock_guard l(lfuda_cleaning_lock); + quit = true; + cond.notify_all(); } virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver *_driver); virtual int exist_key(std::string key) override; virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override; - virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, time_t creationTime, const rgw_user user, optional_yield y) override; + virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, optional_yield y) override; virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override; void save_y(optional_yield y) { this->y = y; } - virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, optional_yield y) override; - virtual bool eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override; + virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, const std::string& bucket_name, const rgw_obj_key& obj_key, optional_yield y) override; + virtual bool eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y); virtual void cleaning(const DoutPrefixProvider* dpp) override; LFUDAObjEntry* find_obj_entry(const std::string& key) { auto it = o_entries_map.find(key); - if (it == o_entries_map.end()) + if (it == o_entries_map.end()) { return nullptr; + } return it->second; } }; @@ -173,11 +199,11 @@ class LRUPolicy : public CachePolicy { public: LRUPolicy(rgw::cache::CacheDriver* cacheDriver) : cacheDriver{cacheDriver} {} - virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver* _driver) { return 0; } + virtual int init(CephContext* cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver* _driver) { return 0; } virtual int exist_key(std::string key) override; virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override; - virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, time_t creationTime, const rgw_user user, optional_yield y) override; - virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, optional_yield y) override; + virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, optional_yield y) override; + virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, const std::string& bucket_name, const rgw_obj_key& obj_key, optional_yield y) override; virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override; virtual bool eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override; virtual void cleaning(const DoutPrefixProvider* dpp) override {} diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.cc b/src/rgw/driver/d4n/rgw_sal_d4n.cc index 5803ce2ca9b..55da49e81f4 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.cc +++ b/src/rgw/driver/d4n/rgw_sal_d4n.cc @@ -105,115 +105,101 @@ int D4NFilterBucket::create(const DoutPrefixProvider* dpp, int D4NFilterObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags) { - if (setattrs != NULL) { - /* Ensure setattrs and delattrs do not overlap */ - if (delattrs != NULL) { - for (const auto& attr : *delattrs) { - if (std::find(setattrs->begin(), setattrs->end(), attr) != setattrs->end()) { - delattrs->erase(std::find(delattrs->begin(), delattrs->end(), attr)); + rgw::sal::Attrs attrs; + std::string head_oid_in_cache; + if (check_head_exists_in_cache_get_oid(dpp, head_oid_in_cache, attrs, y)) { + if (setattrs != nullptr) { + /* Ensure setattrs and delattrs do not overlap */ + if (delattrs != nullptr) { + for (const auto& attr : *delattrs) { + if (std::find(setattrs->begin(), setattrs->end(), attr) != setattrs->end()) { + delattrs->erase(std::find(delattrs->begin(), delattrs->end(), attr)); + } } } - } + //if set_obj_attrs() can be called to update existing attrs, then update_attrs() need to be called + if (auto ret = driver->get_cache_driver()->set_attrs(dpp, head_oid_in_cache, *setattrs, y); ret < 0) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): CacheDriver set_attrs method failed with ret: " << ret << dendl; + return ret; + } + } //if setattrs != nullptr - if (driver->get_cache_driver()->set_attrs(dpp, this->get_key().get_oid(), *setattrs, y) < 0) - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver set_attrs method failed." << dendl; - } + if (delattrs != nullptr) { + Attrs::iterator attr; + Attrs currentattrs = this->get_attrs(); - if (delattrs != NULL) { - Attrs::iterator attr; - Attrs currentattrs = this->get_attrs(); + /* Ensure all delAttrs exist */ + for (const auto& attr : *delattrs) { + if (std::find(currentattrs.begin(), currentattrs.end(), attr) == currentattrs.end()) { + delattrs->erase(std::find(delattrs->begin(), delattrs->end(), attr)); + } + } - /* Ensure all delAttrs exist */ - for (const auto& attr : *delattrs) { - if (std::find(currentattrs.begin(), currentattrs.end(), attr) == currentattrs.end()) { - delattrs->erase(std::find(delattrs->begin(), delattrs->end(), attr)); + if (auto ret = driver->get_cache_driver()->delete_attrs(dpp, head_oid_in_cache, *delattrs, y); ret < 0) { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver delete_attrs method failed with ret: " << ret << dendl; + return ret; } + } //if delattrs != nullptr + } else { + auto ret = next->set_obj_attrs(dpp, setattrs, delattrs, y, flags); + if (ret < 0) { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): set_obj_attrs method of backend store failed with ret: " << ret << dendl; + return ret; } - - if (driver->get_cache_driver()->delete_attrs(dpp, this->get_key().get_oid(), *delattrs, y) < 0) - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver delete_attrs method failed." << dendl; } - - return next->set_obj_attrs(dpp, setattrs, delattrs, y, flags); + return 0; } bool D4NFilterObject::get_obj_attrs_from_cache(const DoutPrefixProvider* dpp, optional_yield y) { - rgw::d4n::BlockDirectory* blockDir = this->driver->get_block_dir(); - rgw::d4n::CacheObj object = rgw::d4n::CacheObj{ - .objName = this->get_name(), - .bucketName = this->get_bucket()->get_name(), - }; - - rgw::d4n::CacheBlock block = rgw::d4n::CacheBlock{ - .cacheObj = object, - .blockID = 0, - .version = version, - .size = 0 - }; - - bool found_in_cache = true; - //if the block corresponding to head object does not exist in directory, implies it is not cached - if (blockDir->exist_key(&block, y) && (blockDir->get(&block, y) == 0)) { - rgw::sal::Attrs attrs; - std::string version = block.version; - this->set_object_version(version); - //uniform name for versioned and non-versioned objects, since input for versioned objects might not contain version - std::string head_oid_in_cache = get_bucket()->get_name() + "_" + version + "_" + get_name(); - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Fetching attrs from cache." << dendl; - auto ret = this->driver->get_cache_driver()->get_attrs(dpp, head_oid_in_cache, attrs, y); + std::string head_oid_in_cache; + rgw::sal::Attrs attrs; + bool found_in_cache = check_head_exists_in_cache_get_oid(dpp, head_oid_in_cache, attrs, y); + + if (found_in_cache) { + /* Set metadata locally */ + + std::string instance; + for (auto& attr : attrs) { + if (attr.second.length() > 0) { + if (attr.first == "user.rgw.mtime") { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting mtime." << dendl; + auto mtime = ceph::real_clock::from_double(std::stod(attr.second.c_str())); + this->set_mtime(mtime); + } else if (attr.first == "user.rgw.object_size") { + auto size = std::stoull(attr.second.c_str()); + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting object_size to: " << size << dendl; + this->set_obj_size(size); + } else if (attr.first == "user.rgw.accounted_size") { + auto accounted_size = std::stoull(attr.second.c_str()); + this->set_accounted_size(accounted_size); + } else if (attr.first == "user.rgw.epoch") { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting epoch." << dendl; + auto epoch = std::stoull(attr.second.c_str()); + this->set_epoch(epoch); + } else if (attr.first == "user.rgw.version_id") { + instance = attr.second.to_str(); + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting version_id to: " << instance << dendl; + } else if (attr.first == "user.rgw.source_zone") { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting source zone id." << dendl; + auto short_zone_id = static_cast(std::stoul(attr.second.c_str())); + this->set_short_zone_id(short_zone_id); + } else { + ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): Unexpected attribute; not locally set, attr name: " << attr.first << dendl; + } + }//end-if + }//end-for + this->set_instance(instance); //set this only after setting object state else it won't take effect + attrs.erase("user.rgw.mtime"); + attrs.erase("user.rgw.object_size"); + attrs.erase("user.rgw.accounted_size"); + attrs.erase("user.rgw.epoch"); + /* Set attributes locally */ + auto ret = this->set_attrs(attrs); if (ret < 0) { - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver get_attrs method failed." << dendl; - found_in_cache = false; - } else { - /* Set metadata locally */ - RGWQuotaInfo quota_info; - - std::string instance; - for (auto& attr : attrs) { - if (attr.second.length() > 0) { - if (attr.first == "user.rgw.mtime") { - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting mtime." << dendl; - auto mtime = ceph::real_clock::from_double(std::stod(attr.second.c_str())); - this->set_mtime(mtime); - } else if (attr.first == "user.rgw.object_size") { - auto size = std::stoull(attr.second.c_str()); - this->set_obj_size(size); - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting object_size to: " << size << dendl; - } else if (attr.first == "user.rgw.accounted_size") { - auto accounted_size = std::stoull(attr.second.c_str()); - this->set_accounted_size(accounted_size); - } else if (attr.first == "user.rgw.epoch") { - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting epoch." << dendl; - auto epoch = std::stoull(attr.second.c_str()); - this->set_epoch(epoch); - } else if (attr.first == "user.rgw.version_id") { - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting version_id." << dendl; - instance = attr.second.to_str(); - } else if (attr.first == "user.rgw.source_zone") { - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): setting source zone id." << dendl; - auto zone_short_id = static_cast(std::stoul(attr.second.c_str())); - this->set_short_zone_id(zone_short_id); - } else { - ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): Unexpected attribute; not locally set, attr name: " << attr.first << dendl; - } - }//end-if - }//end-for - //this->set_obj_state(astate); - this->set_instance(instance); //set this only after setting object state else it won't take effect - attrs.erase("user.rgw.mtime"); - attrs.erase("user.rgw.object_size"); - attrs.erase("user.rgw.accounted_size"); - attrs.erase("user.rgw.epoch"); - /* Set attributes locally */ - ret = this->set_attrs(attrs); - if (ret < 0) { - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): D4NFilterObject set_attrs method failed." << dendl; - } + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): D4NFilterObject set_attrs method failed." << dendl; } - } else { - found_in_cache = false; - } + } // if found_in_cache = true return found_in_cache; } @@ -267,7 +253,35 @@ int D4NFilterObject::calculate_version(const DoutPrefixProvider* dpp, optional_y return 0; } -int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optional_yield y) +int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optional_yield y, bool dirty) +{ + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): object name: " << this->get_name() << " bucket name: " << this->get_bucket()->get_name() << dendl; + rgw::d4n::BlockDirectory* blockDir = this->driver->get_block_dir(); + rgw::d4n::CacheObj object = rgw::d4n::CacheObj{ + .objName = this->get_name(), + .bucketName = this->get_bucket()->get_name(), + .dirty = dirty, + .hostsList = { driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address }, + }; + + rgw::d4n::CacheBlock block = rgw::d4n::CacheBlock{ + .cacheObj = object, + .blockID = 0, + .version = this->get_object_version(), + .dirty = dirty, + .size = 0, + .hostsList = { driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address }, + }; + + auto ret = blockDir->set(dpp, &block, y); + if (ret < 0) { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl; + } + + return ret; +} + +bool D4NFilterObject::check_head_exists_in_cache_get_oid(const DoutPrefixProvider* dpp, std::string& head_oid_in_cache, rgw::sal::Attrs& attrs, optional_yield y) { rgw::d4n::BlockDirectory* blockDir = this->driver->get_block_dir(); rgw::d4n::CacheObj object = rgw::d4n::CacheObj{ @@ -278,16 +292,37 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optio rgw::d4n::CacheBlock block = rgw::d4n::CacheBlock{ .cacheObj = object, .blockID = 0, - .version = this->get_object_version(), .size = 0 }; - auto ret = blockDir->set(&block, y); - if (ret < 0) { - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl; + bool found_in_cache = true; + //if the block corresponding to head object does not exist in directory, implies it is not cached + if (blockDir->exist_key(dpp, &block, y) && (blockDir->get(dpp, &block, y) == 0)) { + std::string version; + if (have_instance()) { + version = get_instance(); + } else { + version = block.version; + } + this->set_object_version(version); + //uniform name for versioned and non-versioned objects, since input for versioned objects might not contain version + head_oid_in_cache = get_bucket()->get_name() + "_" + version + "_" + get_name(); + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Fetching attrs from cache for head obj id: " << head_oid_in_cache << dendl; + auto ret = this->driver->get_cache_driver()->get_attrs(dpp, head_oid_in_cache, attrs, y); + if (ret < 0) { + //check for dirty blocks also + head_oid_in_cache = "D_" + get_bucket()->get_name() + "_" + version + "_" + get_name(); + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Fetching attrs from cache for head obj id: " << head_oid_in_cache << dendl; + ret = this->driver->get_cache_driver()->get_attrs(dpp, head_oid_in_cache, attrs, y); + if (ret < 0) { + found_in_cache = false; + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver get_attrs method failed." << dendl; + } + } + } else { //if blockDir->exist_key + found_in_cache = false; } - - return ret; + return found_in_cache; } int D4NFilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, @@ -299,7 +334,10 @@ int D4NFilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* d std::string version; ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Fetching attrs from backend store." << dendl; auto ret = next->get_obj_attrs(y, dpp, target_obj); - if (ret < 0) { + if (ret < 0 || !target_obj) { + if (!target_obj) { + ret = -ENOENT; + } ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Failed to fetching attrs from backend store with ret: " << ret << dendl; return ret; } @@ -314,17 +352,26 @@ int D4NFilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* d } head_oid_in_cache = this->get_bucket()->get_name() + "_" + version + "_" + this->get_name(); - ret = this->driver->get_cache_driver()->set_attrs(dpp, head_oid_in_cache, attrs, y); + if (this->driver->get_policy_driver()->get_cache_policy()->exist_key(head_oid_in_cache) > 0) { + ret = this->driver->get_cache_driver()->set_attrs(dpp, head_oid_in_cache, attrs, y); + } else { + ret = this->driver->get_policy_driver()->get_cache_policy()->eviction(dpp, attrs.size(), y); + if (ret == 0) { + bufferlist bl; + ret = this->driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl, 0, attrs, y); + } else { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Failed to evict data with err: " << ret << dendl; + } + } if (ret == 0) { ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << this->get_object_version() << dendl; - time_t creationTime = ceph::real_clock::to_time_t(this->get_mtime()); - this->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, 0, version, false, creationTime, get(this->get_bucket()->get_owner()), y); + this->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, 0, version, false, y); ret = set_head_obj_dir_entry(dpp, y); if (ret < 0) { ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl; } } else { - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): failed to cache head object in block dir with error: " << ret << dendl; + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): failed to cache head object in cache backend with error: " << ret << dendl; } } @@ -336,30 +383,50 @@ int D4NFilterObject::modify_obj_attrs(const char* attr_name, bufferlist& attr_va { Attrs update; update[(std::string)attr_name] = attr_val; - - if (driver->get_cache_driver()->update_attrs(dpp, this->get_key().get_oid(), update, y) < 0) - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver update_attrs method failed." << dendl; - - return next->modify_obj_attrs(attr_name, attr_val, y, dpp, flags); + std::string head_oid_in_cache; + rgw::sal::Attrs attrs; + if (check_head_exists_in_cache_get_oid(dpp, head_oid_in_cache, attrs, y)) { + if (auto ret = driver->get_cache_driver()->update_attrs(dpp, head_oid_in_cache, update, y); ret < 0) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): CacheDriver update_attrs method failed with ret: " << ret << dendl; + return ret; + } + } else { + auto ret = next->modify_obj_attrs(attr_name, attr_val, y, dpp, flags); + if (ret < 0) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): modify_obj_attrs of backend store failed with ret: " << ret << dendl; + return ret; + } + } + return 0; } int D4NFilterObject::delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, optional_yield y) { buffer::list bl; + std::string head_oid_in_cache; + rgw::sal::Attrs attrs; Attrs delattr; - delattr.insert({attr_name, bl}); - Attrs currentattrs = this->get_attrs(); - rgw::sal::Attrs::iterator attr = delattr.begin(); + if (check_head_exists_in_cache_get_oid(dpp, head_oid_in_cache, attrs, y)) { + delattr.insert({attr_name, bl}); + Attrs currentattrs = this->get_attrs(); + rgw::sal::Attrs::iterator attr = delattr.begin(); - /* Ensure delAttr exists */ - if (std::find_if(currentattrs.begin(), currentattrs.end(), - [&](const auto& pair) { return pair.first == attr->first; }) != currentattrs.end()) { + /* Ensure delAttr exists */ + if (std::find_if(currentattrs.begin(), currentattrs.end(), + [&](const auto& pair) { return pair.first == attr->first; }) != currentattrs.end()) { - if (driver->get_cache_driver()->delete_attrs(dpp, this->get_key().get_oid(), delattr, y) < 0) - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver delete_attrs method failed." << dendl; - } else - return next->delete_obj_attrs(dpp, attr_name, y); + if (auto ret = driver->get_cache_driver()->delete_attrs(dpp, head_oid_in_cache, delattr, y); ret < 0) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): CacheDriver delete_attrs method failed with ret: " << ret << dendl; + return ret; + } + } + } else { + if (auto ret = next->delete_obj_attrs(dpp, attr_name, y); ret < 0) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): delete_obj_attrs method of backend store failed with ret: " << ret << dendl; + return ret; + } + } return 0; } @@ -447,8 +514,7 @@ int D4NFilterObject::D4NFilterReadOp::prepare(optional_yield y, const DoutPrefix ret = source->driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl, 0, attrs, y); if (ret == 0) { ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << this->source->get_object_version() << dendl; - time_t creationTime = ceph::real_clock::to_time_t(this->source->get_mtime()); - source->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, bl.length(), version, false, creationTime, std::get(source->get_bucket()->get_owner()), y); + source->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, bl.length(), version, false, y); ret = source->set_head_obj_dir_entry(dpp, y); if (ret < 0) { ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl; @@ -469,16 +535,13 @@ void D4NFilterObject::D4NFilterReadOp::cancel() { } int D4NFilterObject::D4NFilterReadOp::drain(const DoutPrefixProvider* dpp, optional_yield y) { - auto c = aio->wait(); - while (!c.empty()) { - int r = flush(dpp, std::move(c), y); - if (r < 0) { - cancel(); - return r; - } - c = aio->wait(); + auto c = aio->drain(); + int r = flush(dpp, std::move(c), y); + if (r < 0) { + cancel(); + return r; } - return flush(dpp, std::move(c), y); + return 0; } int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw::AioResultList&& results, optional_yield y) { @@ -514,12 +577,6 @@ int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw:: uint64_t ofs = ofs_len_pair.first; uint64_t len = ofs_len_pair.second; bool dirty = false; - /* - std::stringstream s; - utime_t ut(source->get_mtime()); - ut.gmtime(s); - */ - time_t creationTime = ceph::real_clock::to_time_t(source->get_mtime()); rgw::d4n::CacheBlock block; block.cacheObj.objName = source->get_key().get_oid(); @@ -529,15 +586,15 @@ int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw:: std::string oid_in_cache = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(len); - if (source->driver->get_block_dir()->get(&block, y) == 0){ - if (block.dirty == true){ + if (source->driver->get_block_dir()->get(dpp, &block, y) == 0){ + if (block.dirty){ dirty = true; } } ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " calling update for offset: " << offset << " adjusted offset: " << ofs << " length: " << len << " oid_in_cache: " << oid_in_cache << dendl; ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << version << " " << source->get_object_version() << dendl; - source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, len, version, dirty, creationTime, std::get(source->get_bucket()->get_owner()), y); + source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, len, version, dirty, y); blocks_info.erase(it); } else { ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << " offset not found: " << offset << dendl; @@ -557,14 +614,8 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int const uint64_t window_size = g_conf()->rgw_get_obj_window_size; std::string version = source->get_object_version(); std::string prefix; - /* After prepare() method, for versioned objects, get_oid() returns an oid with versionId added, - * even for versioned objects, where version id is not provided as input - */ - if (source->have_instance()) { - prefix = source->get_bucket()->get_name() + "_" + source->get_key().get_oid(); - } else { - prefix = source->get_bucket()->get_name() + "_" + version + "_" + source->get_key().get_oid(); - } + + prefix = source->get_bucket()->get_name() + "_" + version + "_" + source->get_name(); ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "prefix: " << prefix << dendl; ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "oid: " << source->get_key().get_oid() << " ofs: " << ofs << " end: " << end << dendl; @@ -626,10 +677,10 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int " read_ofs: " << read_ofs << " part len: " << part_len << dendl; int ret = -1; - if (source->driver->get_block_dir()->exist_key(&block, y) > 0 && (ret = source->driver->get_block_dir()->get(&block, y)) == 0) { + if (source->driver->get_block_dir()->exist_key(dpp, &block, y) > 0 && (ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0) { auto it = find(block.hostsList.begin(), block.hostsList.end(), source->driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address); if (it != block.hostsList.end()) { /* Local copy */ - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in local cache. " << oid_in_cache << dendl; + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in directory. " << oid_in_cache << dendl; std::string key = oid_in_cache; ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: block is dirty = " << block.dirty << dendl; if (block.dirty == true) { @@ -753,6 +804,29 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int return this->cb->flush_last_part(); } +int D4NFilterObject::D4NFilterReadOp::get_attr(const DoutPrefixProvider* dpp, const char* name, bufferlist& dest, optional_yield y) +{ + rgw::sal::Attrs& attrs = source->get_attrs(); + if (attrs.empty()) { + rgw_obj obj = source->get_obj(); + auto ret = source->get_obj_attrs(y, dpp, &obj); + if (ret < 0) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Error: failed to fetch attrs, ret= " << ret << dendl; + return ret; + } + //get_obj_attrs() calls set_attrs() internally, hence get_attrs() can be invoked to get the latest attrs. + attrs = source->get_attrs(); + } + auto it = attrs.find(name); + if (it == attrs.end()) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Attribute value NOT found for attr name= " << name << dendl; + return next->get_attr(dpp, name, dest, y); + } + + dest = it->second; + return 0; +} + int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::flush_last_part() { last_part = true; @@ -773,6 +847,10 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl //Accumulating data from backend store into rgw_get_obj_max_req_size sized chunks and then writing to cache if (write_to_cache) { + Attrs attrs; // empty attrs for cache sets + std::string version = source->get_object_version(); + std::string prefix = source->get_prefix(); + rgw::d4n::CacheBlock block, existing_block; rgw::d4n::BlockDirectory* blockDir = source->driver->get_block_dir(); block.hostsList.push_back(blockDir->cct->_conf->rgw_d4n_l1_datacache_address); @@ -781,15 +859,12 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl std::stringstream s; block.cacheObj.creationTime = std::to_string(ceph::real_clock::to_time_t(source->get_mtime())); block.cacheObj.dirty = false; - bool dirty = false; - time_t creationTime = ceph::real_clock::to_time_t(source->get_mtime()); + bool dirty = block.dirty = false; //Reading from the backend, data is clean + block.version = version; //populating fields needed for building directory index existing_block.cacheObj.objName = block.cacheObj.objName; existing_block.cacheObj.bucketName = block.cacheObj.bucketName; - Attrs attrs; // empty attrs for cache sets - std::string version = source->get_object_version(); - std::string prefix = source->get_prefix(); ldpp_dout(dpp, 20) << __func__ << ": version stored in update method is: " << version << dendl; @@ -798,34 +873,32 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) { block.blockID = ofs; block.size = bl.length(); - block.version = version; - block.dirty = false; //Reading from the backend, data is clean + auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y); if (ret == 0) { ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y); if (ret == 0) { std::string objEtag = ""; - filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, dirty, creationTime, std::get(source->get_bucket()->get_owner()), *y); - filter->get_policy_driver()->get_cache_policy()->updateObj(dpp, prefix, version, dirty, source->get_size(), creationTime, std::get(source->get_bucket()->get_owner()), objEtag, *y); + filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, dirty, *y); /* Store block in directory */ - if (!blockDir->exist_key(&block, *y)) { - if (blockDir->set(&block, *y) < 0) //should we revert previous steps if this step fails? + if (!blockDir->exist_key(dpp, &block, *y)) { + if (blockDir->set(dpp, &block, *y) < 0) //should we revert previous steps if this step fails? ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl; } else { existing_block.blockID = block.blockID; existing_block.size = block.size; existing_block.dirty = block.dirty; - if (blockDir->get(&existing_block, *y) < 0) { + if (blockDir->get(dpp, &existing_block, *y) < 0) { ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl; } else { if (existing_block.version != block.version) { - if (blockDir->del(&existing_block, *y) < 0) //delete existing block + if (blockDir->del(dpp, &existing_block, *y) < 0) //delete existing block ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory del method failed." << dendl; - if (blockDir->set(&block, *y) < 0) //new versioned block will have new version, hostsList etc, how about globalWeight? + if (blockDir->set(dpp, &block, *y) < 0) //new versioned block will have new version, hostsList etc, how about globalWeight? ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl; } else { - if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, *y) < 0) + if (blockDir->update_field(dpp, &block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, *y) < 0) ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed for hostsList." << dendl; } } @@ -839,34 +912,32 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl std::string oid = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len); block.blockID = ofs; block.size = bl.length(); - block.version = version; ofs += bl_len; - block.dirty = dirty; if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) { auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y); if (ret == 0) { ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y); if (ret == 0) { - filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, dirty, creationTime, std::get(source->get_bucket()->get_owner()), *y); + filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, dirty, *y); /* Store block in directory */ - if (!blockDir->exist_key(&block, *y)) { - if (blockDir->set(&block, *y) < 0) + if (!blockDir->exist_key(dpp, &block, *y)) { + if (blockDir->set(dpp, &block, *y) < 0) ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl; } else { existing_block.blockID = block.blockID; existing_block.size = block.size; existing_block.dirty = block.dirty; - if (blockDir->get(&existing_block, *y) < 0) { + if (blockDir->get(dpp, &existing_block, *y) < 0) { ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl; } if (existing_block.version != block.version) { - if (blockDir->del(&existing_block, *y) < 0) + if (blockDir->del(dpp, &existing_block, *y) < 0) ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory del method failed." << dendl; - if (blockDir->set(&block, *y) < 0) + if (blockDir->set(dpp, &block, *y) < 0) ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl; } else { - if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, *y) < 0) + if (blockDir->update_field(dpp, &block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, *y) < 0) ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed for blockHosts." << dendl; } } @@ -888,33 +959,31 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) { block.blockID = ofs; block.size = bl_rem.length(); - block.version = version; ofs += bl_rem.length(); - block.dirty = dirty; auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y); if (ret == 0) { ret = filter->get_cache_driver()->put(dpp, oid, bl_rem, bl_rem.length(), attrs, *y); if (ret == 0) { - filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), version, dirty, creationTime, std::get(source->get_bucket()->get_owner()), *y); + filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), version, dirty, *y); /* Store block in directory */ - if (!blockDir->exist_key(&block, *y)) { - if (blockDir->set(&block, *y) < 0) + if (!blockDir->exist_key(dpp, &block, *y)) { + if (blockDir->set(dpp, &block, *y) < 0) ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl; } else { existing_block.blockID = block.blockID; existing_block.size = block.size; existing_block.dirty = block.dirty; - if (blockDir->get(&existing_block, *y) < 0) { + if (blockDir->get(dpp, &existing_block, *y) < 0) { ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl; } else { if (existing_block.version != block.version) { - if (blockDir->del(&existing_block, *y) < 0) + if (blockDir->del(dpp, &existing_block, *y) < 0) ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory del method failed." << dendl; - if (blockDir->set(&block, *y) < 0) + if (blockDir->set(dpp, &block, *y) < 0) ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl; } else { - if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, *y) < 0) + if (blockDir->update_field(dpp, &block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, *y) < 0) ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl; } } @@ -945,6 +1014,17 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp, optional_yield y, uint32_t flags) { + /* + 1. Check if object exists in Object Directory + 2. get dirty flag and version to construct the oid in cache correctly + 3. loop and delete all blocks in the cache + 4. delete the head block + 5. Update the in memory data structure for all blocks, and update the block directory also + 6. Update the in memory data structure for head block, and update the block directory also + 7. If the blocks reside in other caches, send remote request for the same + 8. Need to figure out a way to get all versions to be deleted in case of versioned objects when a version is not specified. + 9. If the object is not in object directory call next->delete_obj + */ rgw::d4n::CacheObj obj = rgw::d4n::CacheObj{ // TODO: Add logic to ObjectDirectory del method to also delete all blocks belonging to that object .objName = source->get_key().get_oid(), .bucketName = source->get_bucket()->get_name() @@ -972,15 +1052,27 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp int D4NFilterWriter::prepare(optional_yield y) { startTime = time(NULL); - if (driver->get_cache_driver()->delete_data(save_dpp, obj->get_key().get_oid(), y) < 0) - ldpp_dout(save_dpp, 10) << "D4NFilterWriter::" << __func__ << "(): CacheDriver delete_data method failed." << dendl; + if (driver->get_cache_driver()->delete_data(dpp, obj->get_key().get_oid(), y) < 0) + ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): CacheDriver delete_data method failed." << dendl; d4n_writecache = g_conf()->d4n_writecache_enabled; if (!d4n_writecache){ - ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): calling next iterate" << dendl; + ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): calling next iterate" << dendl; return next->prepare(y); } - else - return 0; + if (!object->have_instance()) { + if (object->get_bucket()->versioned() && !object->get_bucket()->versioning_enabled()) { //if versioning is suspended + this->version = "null"; + } else { // this holds true for non-versioned object and for version enabled object with no versionId given as input + constexpr uint32_t OBJ_INSTANCE_LEN = 32; + char buf[OBJ_INSTANCE_LEN + 1]; + gen_rand_alphanumeric_no_underscore(dpp->get_cct(), buf, OBJ_INSTANCE_LEN); + this->version = buf; // using gen_rand_alphanumeric_no_underscore for the time being + ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): generating version: " << version << dendl; + } + } else { + ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): " << "version is: " << object->get_instance() << dendl; + } + return 0; } int D4NFilterWriter::process(bufferlist&& data, uint64_t offset) @@ -990,17 +1082,15 @@ int D4NFilterWriter::process(bufferlist&& data, uint64_t offset) off_t ofs = offset; bool dirty = true; rgw::d4n::CacheBlock block, existing_block; - auto creationTime = startTime; - - auto version = obj->get_instance(); + std::string version; std::string prefix; - if (version.empty()) { //for versioned objects, get_oid() returns an oid with versionId added - prefix =obj->get_bucket()->get_name() + "_" + obj->get_key().get_oid(); + if (object->have_instance()) { + version = obj->get_instance(); } else { - prefix = obj->get_bucket()->get_name() + "_" + version + "_" + obj->get_key().get_oid(); + version = this->version; } - + prefix = obj->get_bucket()->get_name() + "_" + version + "_" + obj->get_name(); rgw::d4n::BlockDirectory* blockDir = driver->get_block_dir(); block.hostsList.push_back(blockDir->cct->_conf->rgw_d4n_l1_datacache_address); @@ -1013,40 +1103,9 @@ int D4NFilterWriter::process(bufferlist&& data, uint64_t offset) int ret = 0; - if (!d4n_writecache){ - std::string oid = prefix + "_" + std::to_string(ofs)+ "_" + std::to_string(bl_len); - block.size = bl.length(); - block.blockID = ofs; - block.dirty = false; //writing to the backend, hence the data is clean - block.hostsList.push_back(blockDir->cct->_conf->rgw_d4n_backend_address); - - ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): calling next process" << dendl; + if (!d4n_writecache) { + ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): calling next process" << dendl; ret = next->process(std::move(data), offset); - if (ret == 0){ - if (!blockDir->exist_key(&block, y)) { - if (blockDir->set(&block, y) < 0) - ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory set method failed." << dendl; - } else { - existing_block.blockID = block.blockID; - existing_block.size = block.size; - if (blockDir->get(&existing_block, y) < 0) { - ldpp_dout(save_dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl; - } else { - if (existing_block.version != block.version) { - if (blockDir->del(&existing_block, y) < 0) //delete existing block - ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory del method failed." << dendl; - if (blockDir->set(&block, y) < 0) //new versioned block will have new version, hostsList etc, how about globalWeight? - ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory set method failed." << dendl; - } else { - if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, y) < 0) - ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory update_field method failed for hostsList." << dendl; - } - } - } - } else{ - ldpp_dout(save_dpp, 1) << "D4NFilterObject::D4NFilterWriteOp::process" << __func__ << "(): ERROR: writing data to the backend failed!" << dendl; - return ret; - } } else { std::string oid = prefix + "_" + std::to_string(ofs); std::string key = "D_" + oid + "_" + std::to_string(bl_len); @@ -1055,36 +1114,38 @@ int D4NFilterWriter::process(bufferlist&& data, uint64_t offset) block.blockID = ofs; block.dirty = true; block.hostsList.push_back(blockDir->cct->_conf->rgw_d4n_l1_datacache_address); + block.version = version; dirty = true; - ret = driver->get_policy_driver()->get_cache_policy()->eviction(save_dpp, block.size, y); - if (ret == 0) { - //Should we replace each put_async with put, to ensure data is actually written to the cache before updating the data structures and before the lock is released? + ret = driver->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, y); + if (ret == 0) { if (bl.length() > 0) { - ret = driver->get_cache_driver()->put(save_dpp, key, bl, bl.length(), obj->get_attrs(), y); + ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): key is: " << key << dendl; + ret = driver->get_cache_driver()->put(dpp, key, bl, bl.length(), obj->get_attrs(), y); if (ret == 0) { - driver->get_policy_driver()->get_cache_policy()->update(save_dpp, oid_in_cache, ofs, bl.length(), version, dirty, creationTime, std::get(obj->get_bucket()->get_owner()), y); - if (!blockDir->exist_key(&block, y)) { - if (blockDir->set(&block, y) < 0) //should we revert previous steps if this step fails? - ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory set method failed." << dendl; + ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): oid_in_cache is: " << oid_in_cache << dendl; + driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, bl.length(), version, dirty, y); + if (!blockDir->exist_key(dpp, &block, y)) { + if (blockDir->set(dpp, &block, y) < 0) //should we revert previous steps if this step fails? + ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory set method failed." << dendl; } else { existing_block.blockID = block.blockID; existing_block.size = block.size; - if (blockDir->get(&existing_block, y) < 0) { - ldpp_dout(save_dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl; + if (blockDir->get(dpp, &existing_block, y) < 0) { + ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl; } else { if (existing_block.version != block.version) { - if (blockDir->del(&existing_block, y) < 0) //delete existing block - ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory del method failed." << dendl; - if (blockDir->set(&block, y) < 0) //new versioned block will have new version, hostsList etc, how about globalWeight? - ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory set method failed." << dendl; + if (blockDir->del(dpp, &existing_block, y) < 0) //delete existing block + ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory del method failed." << dendl; + if (blockDir->set(dpp, &block, y) < 0) //new versioned block will have new version, hostsList etc, how about globalWeight? + ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory set method failed." << dendl; } else { - if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, y) < 0) - ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory update_field method failed for hostsList." << dendl; + if (blockDir->update_field(dpp, &block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, y) < 0) + ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory update_field method failed for hostsList." << dendl; } } } } else { - ldpp_dout(save_dpp, 1) << "D4NFilterObject::D4NFilterWriteOp::process" << __func__ << "(): ERROR: writting data to the cache failed!" << dendl; + ldpp_dout(dpp, 1) << "D4NFilterObject::D4NFilterWriteOp::process" << __func__ << "(): ERROR: writting data to the cache failed!" << dendl; return ret; } } @@ -1104,99 +1165,88 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag, const req_context& rctx, uint32_t flags) { - bool dirty = true; + bool dirty = false; std::vector hostsList = {}; auto creationTime = startTime; std::string objEtag = etag; - - auto version = obj->get_instance(); - - std::string prefix; - if (version.empty()) { //for versioned objects, get_oid() returns an oid with versionId added - prefix = obj->get_bucket()->get_name() + "_" + obj->get_key().get_oid(); - } else { - prefix = obj->get_bucket()->get_name() + "_" + version + "_" + obj->get_key().get_oid(); - } - - - if (d4n_writecache){ - driver->get_policy_driver()->get_cache_policy()->updateObj(save_dpp, prefix, version, dirty, accounted_size, creationTime, std::get(obj->get_bucket()->get_owner()), objEtag, y); - hostsList = { driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address }; + bool write_to_backend_store = false; + int ret; - rgw::d4n::CacheObj object = rgw::d4n::CacheObj{ - .objName = obj->get_key().get_oid(), - .bucketName = obj->get_bucket()->get_name(), - .creationTime = std::to_string(creationTime), - .dirty = dirty, - .hostsList = hostsList - }; - - if (driver->get_obj_dir()->set(&object, y) < 0) - ldpp_dout(save_dpp, 10) << "D4NFilterWriter::" << __func__ << "(): ObjectDirectory set method failed." << dendl; - return 0; - } - /* Retrieve complete set of attrs */ - int ret = next->complete(accounted_size, etag, mtime, set_mtime, attrs, cksum, - delete_at, if_match, if_nomatch, user_data, zones_trace, - canceled, rctx, flags); - obj->get_obj_attrs(rctx.y, save_dpp, NULL); - - /* Append additional metadata to attributes */ - rgw::sal::Attrs baseAttrs = obj->get_attrs(); - rgw::sal::Attrs attrs_temp = baseAttrs; - buffer::list bl; - obj->load_obj_state(save_dpp, rctx.y); - - bl.append(std::to_string(creationTime)); - baseAttrs.insert({"mtime", bl}); - bl.clear(); - - bl.append(std::to_string(obj->get_size())); - baseAttrs.insert({"object_size", bl}); - bl.clear(); - - bl.append(std::to_string(accounted_size)); - baseAttrs.insert({"accounted_size", bl}); - bl.clear(); - - bl.append(std::to_string(obj->get_epoch())); - baseAttrs.insert({"epoch", bl}); - bl.clear(); - - if (obj->have_instance()) { - bl.append(obj->get_instance()); - baseAttrs.insert({"version_id", bl}); - bl.clear(); - } else { - bl.append(""); /* Empty value */ - baseAttrs.insert({"version_id", bl}); - bl.clear(); - } + if (d4n_writecache) { + dirty = true; + std::string version; + if (object->have_instance()) { + version = obj->get_instance(); + } else { + version = this->version; //version for non-versioned objects, using gen_rand_alphanumeric_no_underscore for the time being + if (obj->get_bucket()->versioned()) { + object->set_instance(version); + } + } + std::string key = obj->get_bucket()->get_name() + "_" + version + "_" + obj->get_name(); - auto iter = attrs_temp.find(RGW_ATTR_SOURCE_ZONE); - if (iter != attrs_temp.end()) { - bl.append(std::to_string(obj->get_short_zone_id())); - baseAttrs.insert({"source_zone_short_id", bl}); - bl.clear(); - } else { - bl.append("0"); /* Initialized to zero */ - baseAttrs.insert({"source_zone_short_id", bl}); - bl.clear(); + ceph::real_time m_time; + if (mtime) { + m_time = *mtime; + } else { + m_time = real_clock::now(); + } + object->set_mtime(m_time); + object->set_accounted_size(accounted_size); + ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << " size is: " << object->get_size() << dendl; + object->set_obj_state_attrs(dpp, y, attrs); + bufferlist bl; + std::string head_oid_in_cache = "D_" + key; //same as key, as there is no len or offset attached to head oid in cache + ret = driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl, 0, attrs, y); + attrs.erase("user.rgw.mtime"); + attrs.erase("user.rgw.object_size"); + attrs.erase("user.rgw.accounted_size"); + attrs.erase("user.rgw.epoch"); + object->set_object_version(version); + if (ret == 0) { + object->set_attrs(attrs); + ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << " version stored in update method is: " << version << dendl; + driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, bl.length(), version, dirty, y); + ret = object->set_head_obj_dir_entry(dpp, y, true); + if (ret < 0) { + ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl; + return ret; + } + driver->get_policy_driver()->get_cache_policy()->updateObj(dpp, key, version, dirty, accounted_size, creationTime, std::get(obj->get_bucket()->get_owner()), objEtag, obj->get_bucket()->get_name(), obj->get_key(), y); + + //write object to directory. + hostsList = { driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address }; + rgw::d4n::CacheObj object = rgw::d4n::CacheObj{ + .objName = obj->get_name(), + .bucketName = obj->get_bucket()->get_name(), + .creationTime = std::to_string(creationTime), + .dirty = dirty, + .hostsList = hostsList + }; + ret = driver->get_obj_dir()->set(&object, y); + if (ret < 0) { + ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): ObjectDirectory set method failed with err: " << ret << dendl; + return ret; + } + } else { //if get_cache_driver()->put() + write_to_backend_store = true; + ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << " put failed for head_oid_in_cache wih error: " << ret << dendl; + ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << " calling complete of backend store: " << dendl; + } + } else { // if d4n_writecache = true + write_to_backend_store = true; } - baseAttrs.insert(attrs.begin(), attrs.end()); - - //bufferlist bl_empty; - //int putReturn = driver->get_cache_driver()-> - // put(save_dpp, obj->get_key().get_oid(), bl_empty, accounted_size, baseAttrs, y); /* Data already written during process call */ - /* - if (putReturn < 0) { - ldpp_dout(save_dpp, 20) << "D4N Filter: Cache put operation failed." << dendl; - } else { - ldpp_dout(save_dpp, 20) << "D4N Filter: Cache put operation succeeded." << dendl; + if (write_to_backend_store) { + ret = next->complete(accounted_size, etag, mtime, set_mtime, attrs, cksum, + delete_at, if_match, if_nomatch, user_data, zones_trace, + canceled, rctx, flags); + if (ret < 0) { + ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): Writing to backend store failed with err: " << ret << dendl; + } } - */ - return ret; + + return 0; } } } // namespace rgw::sal diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.h b/src/rgw/driver/d4n/rgw_sal_d4n.h index 27f4aa582fd..6788e1122d8 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.h +++ b/src/rgw/driver/d4n/rgw_sal_d4n.h @@ -105,11 +105,6 @@ class D4NFilterObject : public FilterObject { D4NFilterDriver* driver; std::string version; std::string prefix; - - bool get_obj_attrs_from_cache(const DoutPrefixProvider* dpp, optional_yield y); - void set_obj_state_attrs(const DoutPrefixProvider* dpp, optional_yield y, rgw::sal::Attrs& attrs); - int calculate_version(const DoutPrefixProvider* dpp, optional_yield y, std::string& version); - int set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optional_yield y); public: struct D4NFilterReadOp : FilterReadOp { public: @@ -152,6 +147,8 @@ class D4NFilterObject : public FilterObject { virtual int prepare(optional_yield y, const DoutPrefixProvider* dpp) override; virtual int iterate(const DoutPrefixProvider* dpp, int64_t ofs, int64_t end, RGWGetDataCB* cb, optional_yield y) override; + virtual int get_attr(const DoutPrefixProvider* dpp, const char* name, + bufferlist& dest, optional_yield y) override; private: RGWGetDataCB* client_cb; @@ -204,26 +201,33 @@ class D4NFilterObject : public FilterObject { void set_prefix(const std::string& prefix) { this->prefix = prefix; } const std::string get_prefix() { return this->prefix; } + bool get_obj_attrs_from_cache(const DoutPrefixProvider* dpp, optional_yield y); + void set_obj_state_attrs(const DoutPrefixProvider* dpp, optional_yield y, rgw::sal::Attrs& attrs); + int calculate_version(const DoutPrefixProvider* dpp, optional_yield y, std::string& version); + int set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optional_yield y, bool dirty = false); + bool check_head_exists_in_cache_get_oid(const DoutPrefixProvider* dpp, std::string& head_oid_in_cache, rgw::sal::Attrs& attrs, optional_yield y); }; class D4NFilterWriter : public FilterWriter { private: D4NFilterDriver* driver; - const DoutPrefixProvider* save_dpp; + D4NFilterObject* object; + const DoutPrefixProvider* dpp; bool atomic; optional_yield y; bool d4n_writecache; time_t startTime; + std::string version; public: D4NFilterWriter(std::unique_ptr _next, D4NFilterDriver* _driver, Object* _obj, const DoutPrefixProvider* _dpp, optional_yield _y) : FilterWriter(std::move(_next), _obj), driver(_driver), - save_dpp(_dpp), atomic(false), y(_y) {} + dpp(_dpp), atomic(false), y(_y) { object = static_cast(obj); } D4NFilterWriter(std::unique_ptr _next, D4NFilterDriver* _driver, Object* _obj, const DoutPrefixProvider* _dpp, bool _atomic, optional_yield _y) : FilterWriter(std::move(_next), _obj), driver(_driver), - save_dpp(_dpp), atomic(_atomic), y(_y) {} + dpp(_dpp), atomic(_atomic), y(_y) { object = static_cast(obj); } virtual ~D4NFilterWriter() = default; virtual int prepare(optional_yield y); @@ -239,7 +243,7 @@ class D4NFilterWriter : public FilterWriter { const req_context& rctx, uint32_t flags) override; bool is_atomic() { return atomic; }; - const DoutPrefixProvider* dpp() { return save_dpp; } + const DoutPrefixProvider* get_dpp() { return this->dpp; } }; } } // namespace rgw::sal diff --git a/src/test/rgw/test_d4n_policy.cc b/src/test/rgw/test_d4n_policy.cc index dea349af514..12bfd7913b7 100644 --- a/src/test/rgw/test_d4n_policy.cc +++ b/src/test/rgw/test_d4n_policy.cc @@ -119,8 +119,8 @@ class LFUDAPolicyFixture : public ::testing::Test { } else { if (!block->hostsList.empty()) { block->globalWeight += age; - - if (dir->update_field(block, "globalWeight", std::to_string(block->globalWeight), y) < 0) { + auto globalWeight = std::to_string(block->globalWeight); + if (dir->update_field(block, "globalWeight", globalWeight, y) < 0) { return -1; } else { return 0;