From 77947f9bbce13f1c57cafc951dc4d2819abebd08 Mon Sep 17 00:00:00 2001 From: Samarah Date: Thu, 15 Jan 2026 19:40:15 +0000 Subject: [PATCH] rgw/d4n: Implement base cache API ops Signed-off-by: Samarah Uriarte --- src/rgw/driver/d4n/rgw_sal_d4n.cc | 134 ++++++++++++++++++++++++++---- src/rgw/driver/d4n/rgw_sal_d4n.h | 7 ++ src/rgw/rgw_op.cc | 24 ++++++ 3 files changed, 150 insertions(+), 15 deletions(-) diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.cc b/src/rgw/driver/d4n/rgw_sal_d4n.cc index 2eaed616bfcb..59c9573715cc 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.cc +++ b/src/rgw/driver/d4n/rgw_sal_d4n.cc @@ -515,6 +515,14 @@ int D4NFilterBucket::list(const DoutPrefixProvider* dpp, ListParams& params, int } } //d4n_write_cache_enabled = true + /* Cache requests are indicated with the x-rgw-cache-request custom header during S3 ops so users can interact + * only with the cache. If the object is found in the cache, the request succeeds. If it is only in the backend, + * the request returns -ENOENT. */ + if (cache_request) { + results = std::move(cache_results); + return 0; + } + //Get objects from backend store auto ret = next->list(dpp, params, max, store_results, y); if (ret < 0) { @@ -820,6 +828,9 @@ int D4NFilterObject::load_obj_state(const DoutPrefixProvider *dpp, optional_yiel bool follow_olh) { if (load_from_store) { + if (cache_request) { + return -ENOENT; + } return next->load_obj_state(dpp, y, follow_olh); } bool has_instance = false; @@ -838,6 +849,9 @@ int D4NFilterObject::load_obj_state(const DoutPrefixProvider *dpp, optional_yiel } return 0; } + if (cache_request) { + return -ENOENT; + } return next->load_obj_state(dpp, y, follow_olh); } @@ -901,6 +915,9 @@ int D4NFilterObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattr } if (!found_in_cache) { + if (cache_request) { + return -ENOENT; + } auto ret = next->set_obj_attrs(dpp, setattrs, delattrs, y, flags); if (ret < 0) { ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): set_obj_attrs method of backend store failed with ret: " << ret << dendl; @@ -1390,6 +1407,21 @@ int D4NFilterObject::set_data_block_dir_entries(const DoutPrefixProvider* dpp, o return 0; } +int D4NFilterObject::delete_cache_entry(const DoutPrefixProvider* dpp, const std::string key, optional_yield y) { + int ret; + if ((ret = driver->get_cache_driver()->delete_data(dpp, key, y)) == 0) { // Inline cache delete + if (!(ret = driver->get_policy_driver()->get_cache_policy()->erase(dpp, key, y))) { + ldpp_dout(dpp, 10) << "Failed to delete policy entry for: " << key << ", ret=" << ret << dendl; + return ret; + } + } else { + ldpp_dout(dpp, 10) << "Failed to delete object in cache for: " << key << ", ret=" << ret << dendl; + return ret; + } + + return 0; +} + int D4NFilterObject::delete_data_block_cache_entries(const DoutPrefixProvider* dpp, optional_yield y, std::string& version, bool dirty) { //delete cache entries @@ -1405,14 +1437,8 @@ int D4NFilterObject::delete_data_block_cache_entries(const DoutPrefixProvider* d std::string key = get_key_in_cache(get_cache_block_prefix(this, version), std::to_string(fst), std::to_string(cur_len)); int ret; - if ((ret = driver->get_cache_driver()->delete_data(dpp, key, y)) == 0) { - if (!(ret = driver->get_policy_driver()->get_cache_policy()->erase(dpp, key, y))) { - ldpp_dout(dpp, 0) << "Failed to delete policy entry for: " << key << ", ret=" << ret << dendl; - return ret; - } - } else { - ldpp_dout(dpp, 0) << "Failed to delete cache entry for: " << key << ", ret=" << ret << dendl; - return ret; + if ((ret = delete_cache_entry(dpp, key, y)) < 0) { + return ret; } fst += cur_len; } while(fst < lst); @@ -1495,6 +1521,9 @@ int D4NFilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* d ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): " << " object " << this->get_name() << " does not exist." << dendl; return -ENOENT; } else if (!ret) { + if (cache_request) { + return -ENOENT; + } if(perfcounter) { perfcounter->inc(l_rgw_d4n_cache_misses); } @@ -1568,6 +1597,10 @@ int D4NFilterObject::modify_obj_attrs(const char* attr_name, bufferlist& attr_va return ret; } } else { + if (cache_request) { + return -ENOENT; + } + if (block.deleteMarker) { ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): object " << this->get_name() << " does not exist." << dendl; return -ENOENT; @@ -1618,6 +1651,9 @@ int D4NFilterObject::delete_obj_attrs(const DoutPrefixProvider* dpp, const char* } } if (!found_in_cache) { + if (cache_request) { + return -ENOENT; + } if (auto ret = next->delete_obj_attrs(dpp, attr_name, y); ret < 0) { ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): delete_obj_attrs method of backend store failed with ret: " << ret << dendl; return ret; @@ -1688,6 +1724,9 @@ int D4NFilterObject::D4NFilterReadOp::prepare(optional_yield y, const DoutPrefix ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::" << __func__ << "(): object " << source->get_name() << " does not exist." << dendl; return -ENOENT; } else if (!ret) { + if (source->is_cache_request()) { + return -ENOENT; + } if(perfcounter) { perfcounter->inc(l_rgw_d4n_cache_misses); } @@ -2062,6 +2101,11 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int adjusted_len -= max_chunk_size; } while (start_part_num < num_parts); } + + if (source->cache_request) { + return -ENOENT; + } + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Fetching object from backend store" << dendl; Attrs obj_attrs; @@ -2346,12 +2390,16 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp std::string head_oid_in_cache; rgw::d4n::CacheBlock block; int ret = -1; + bool cache_request = source->cache_request; /* check_head_exists_in_cache_get_oid also returns false if the head object is in the cache, but is a delete marker. As a result, the below check guarantees the head object is not in the cache. */ if (!source->check_head_exists_in_cache_get_oid(dpp, head_oid_in_cache, attrs, block, y) && !block.deleteMarker) { /* for a dirty object, if the first call is a simple delete after versioning is enabled, the call will go to the backend store and create a delete marker there since no object with source->get_name() will be found in the cache (and this is correct) */ + if (cache_request) { + return -ENOENT; + } ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): head object not found; calling next->delete_obj" << dendl; next->params = params; ret = next->delete_obj(dpp, y, flags); @@ -2369,7 +2417,7 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp objName = "_" + source->get_name(); } - if (objDirty) { // head object dirty flag represents object dirty flag + if (objDirty && !cache_request) { // head object dirty flag represents object dirty flag //for versioned buckets, for a simple delete we need to create a delete marker (and not invalidate/delete any object) if (!source->get_bucket()->versioned() || (block.cacheObj.objName != source->get_name())) { ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): calling invalidate_dirty_object for: " << head_oid_in_cache << dendl; @@ -2389,6 +2437,11 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp ldpp_dout(dpp, 0) << "Failed to delete head object in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl; return ret; } + if (cache_request) { + if ((ret = source->delete_cache_entry(dpp, get_cache_block_prefix(source, version), y)) < 0) { + return ret; + } + } } /* if versioning is suspended, we might have a latest head entry created from when bucket was non-versioned don't return error as that could already be deleted by set_head_obj_dir_entry */ @@ -2397,6 +2450,11 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp if ((ret = blockDir->del(dpp, &block, y)) < 0) { ldpp_dout(dpp, 0) << "Failed to delete head object in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl; } + if (cache_request) { + if ((ret = source->delete_cache_entry(dpp, head_oid_in_cache, y)) < 0) { + return ret; + } + } } } else if (objDirty) { //2. dirty objects - 1. add delete marker for simple request 2. delete version if given and correctly promote latest version if needed bool transaction_success = false; @@ -2483,6 +2541,12 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue zrem request in bucket directory for: " << source->get_name() << ", ret=" << ret << dendl; return ret; } + if (cache_request) { + std::string req_oid_in_cache = get_key_in_cache(head_oid_in_cache + "#0#0", std::to_string(0), std::to_string(0)); + if ((ret = source->delete_cache_entry(dpp, req_oid_in_cache, y)) < 0) { + return ret; + } + } } } //end-if latest_block.version == block.version //delete versioned entry (handles delete markers also) @@ -2502,6 +2566,12 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to execute exec in block directory: " << "ret= " << ret << dendl; return ret; } + if (cache_request) { + std::string req_oid_in_cache = get_key_in_cache(get_cache_block_prefix(source, version), std::to_string(0), std::to_string(0)); + if ((ret = source->delete_cache_entry(dpp, req_oid_in_cache, y)) < 0) { + return ret; + } + } result.delete_marker = block.deleteMarker; result.version_id = version; //success, hence break from loop @@ -2526,6 +2596,11 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue delete head object op in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl; return ret; } + if (cache_request) { + if ((ret = source->delete_cache_entry(dpp, head_oid_in_cache, y)) < 0) { + return ret; + } + } //if we get request for latest head entry, delete the null block and vice versa if (block.cacheObj.objName == objName) { block.cacheObj.objName = "_:null_" + source->get_name(); @@ -2617,11 +2692,20 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp return ret; } + std::string req_oid_in_cache = get_key_in_cache(get_cache_block_prefix(source, version), std::to_string(block.blockID), std::to_string(block.size)); + if (cache_request) { + if ((ret = source->delete_cache_entry(dpp, req_oid_in_cache, y)) < 0) { + return ret; + } + } fst += cur_len; } while (fst < lst); } if (!objDirty) { + if (cache_request) { + return 0; + } next->params = params; ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): object is not dirty; calling next->delete_obj" << dendl; ret = next->delete_obj(dpp, y, flags); @@ -2637,6 +2721,9 @@ int D4NFilterWriter::prepare(optional_yield y) d4n_writecache = g_conf()->d4n_writecache_enabled; if (!d4n_writecache) { + if (object->is_cache_request()) { + return -EINVAL; + } ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): calling next->prepare" << dendl; return next->prepare(y); } else { @@ -2678,7 +2765,7 @@ int D4NFilterWriter::process(bufferlist&& data, uint64_t offset) bufferlist bl = data; off_t bl_len = bl.length(); off_t ofs = offset; - bool dirty = true; + bool dirty; std::string version = object->get_object_version(); std::string prefix = get_cache_block_prefix(obj, version); @@ -2686,22 +2773,34 @@ int D4NFilterWriter::process(bufferlist&& data, uint64_t offset) int ret = 0; if (!d4n_writecache) { + if (object->is_cache_request()) { + return -EINVAL; + } ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): calling next process" << dendl; return next->process(std::move(data), offset); } else { rgw::sal::Attrs attrs; std::string oid = prefix + CACHE_DELIM + std::to_string(ofs); std::string oid_in_cache = oid + CACHE_DELIM + std::to_string(bl_len); - dirty = true; + if (!object->is_cache_request()) { + dirty = true; + } ret = driver->get_policy_driver()->get_cache_policy()->eviction(dpp, bl.length(), y); if (ret == 0) { if (bl.length() > 0) { ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): oid_in_cache is: " << oid_in_cache << dendl; ret = driver->get_cache_driver()->put(dpp, oid_in_cache, bl, bl.length(), attrs, y); if (ret == 0) { - ret = driver->get_cache_driver()->set_attr(dpp, oid_in_cache, RGW_CACHE_ATTR_DIRTY, "1", y); - if (ret == 0) { - driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, bl.length(), version, dirty, rgw::d4n::RefCount::NOOP, y); + if (!object->is_cache_request()) { + dirty = true; + } + if (!object->is_cache_request()) { + ret = driver->get_cache_driver()->set_attr(dpp, oid_in_cache, RGW_CACHE_ATTR_DIRTY, "1", y); + if (ret == 0) { + driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, bl.length(), version, dirty, rgw::d4n::RefCount::NOOP, y); + } + } else { + driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, bl.length(), version, dirty, rgw::d4n::RefCount::NOOP, y); } } else { ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): ERROR: writing data to the cache failed, ret=" << ret << dendl; @@ -2794,7 +2893,9 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag, return ret; } - dirty = true; + if (!object->is_cache_request()) { + dirty = true; + } ceph::real_time m_time; if (mtime) { if (real_clock::is_zero(*mtime)) { @@ -2811,6 +2912,9 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag, object->set_attrs(attrs); object->set_attrs_from_obj_state(dpp, y, attrs, dirty); } else { + if (object->is_cache_request()) { + return -EINVAL; + } // we need to call next->complete here so that we are able to correctly get the object state needed for caching head ret = next->complete(accounted_size, etag, mtime, set_mtime, attrs, cksum, delete_at, if_match, if_nomatch, user_data, zones_trace, diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.h b/src/rgw/driver/d4n/rgw_sal_d4n.h index baa25d2c5f13..9bf937d006cc 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.h +++ b/src/rgw/driver/d4n/rgw_sal_d4n.h @@ -112,6 +112,7 @@ class D4NFilterBucket : public FilterBucket { uint16_t flags; }; D4NFilterDriver* filter; + bool cache_request{false}; public: D4NFilterBucket(std::unique_ptr _next, D4NFilterDriver* _filter) : @@ -129,6 +130,7 @@ class D4NFilterBucket : public FilterBucket { const std::string& oid, std::optional upload_id=std::nullopt, ACLOwner owner={}, ceph::real_time mtime=real_clock::now()) override; + void set_cache_request() { cache_request = true; } }; class D4NFilterObject : public FilterObject { @@ -144,6 +146,7 @@ class D4NFilterObject : public FilterObject { bool exists_in_cache{false}; bool load_from_store{false}; bool attrs_read_from_cache{false}; + bool cache_request{false}; public: struct D4NFilterReadOp : FilterReadOp { @@ -297,6 +300,9 @@ class D4NFilterObject : public FilterObject { bool exists(void) override { if (exists_in_cache) { return true;} return next->exists(); }; bool load_obj_from_store() { return load_from_store; } void set_load_obj_from_store(bool load_from_store) { this->load_from_store = load_from_store; } + int delete_cache_entry(const DoutPrefixProvider* dpp, const std::string key, optional_yield y); + void set_cache_request() { cache_request = true; } + bool is_cache_request() { return cache_request; } }; class D4NFilterWriter : public FilterWriter { @@ -335,6 +341,7 @@ class D4NFilterWriter : public FilterWriter { uint32_t flags) override; bool is_atomic() { return atomic; }; const DoutPrefixProvider* get_dpp() { return this->dpp; } + void set_cache_request() { object->set_cache_request(); } }; class D4NFilterMultipartUpload : public FilterMultipartUpload { diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index a0a6068ae78a..b5fd4bb8ddf9 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -87,6 +87,10 @@ #include "rgw_flight_frontend.h" #endif +#ifdef WITH_RADOSGW_D4N +#include "driver/d4n/rgw_sal_d4n.h" +#endif + #ifdef WITH_LTTNG #define TRACEPOINT_DEFINE #define TRACEPOINT_PROBE_DYNAMIC_LINKAGE @@ -2495,6 +2499,11 @@ void RGWGetObj::execute(optional_yield y) if (multipart_part_num) { read_op->params.part_num = &*multipart_part_num; } +#ifdef WITH_RADOSGW_D4N + if (s->info.env->get_optional("HTTP_X_RGW_CACHE_REQUEST") && (g_conf().get_val("rgw_filter") == "d4n")) { + dynamic_cast(s->object.get())->set_cache_request(); + } +#endif op_ret = read_op->prepare(s->yield, this); version_id = s->object->get_instance(); @@ -3350,6 +3359,11 @@ void RGWListBucket::execute(optional_yield y) params.list_versions = list_versions; params.allow_unordered = allow_unordered; params.shard_id = shard_id; +#ifdef WITH_RADOSGW_D4N + if (s->info.env->get_optional("HTTP_X_RGW_CACHE_REQUEST") && (g_conf().get_val("rgw_filter") == "d4n")) { + dynamic_cast(s->bucket.get())->set_cache_request(); + } +#endif rgw::sal::Bucket::ListResults results; @@ -4566,6 +4580,11 @@ void RGWPutObj::execute(optional_yield y) s->owner, pdest_placement, olh_epoch, s->req_id); } +#ifdef WITH_RADOSGW_D4N + if (s->info.env->get_optional("HTTP_X_RGW_CACHE_REQUEST") && (g_conf().get_val("rgw_filter") == "d4n")) { + dynamic_cast(processor.get())->set_cache_request(); + } +#endif op_ret = processor->prepare(s->yield); if (op_ret < 0) { @@ -5686,6 +5705,11 @@ void RGWDeleteObj::execute(optional_yield y) del_op->params.null_verid = null_verid; del_op->params.size_match = size_match; del_op->params.if_match = if_match; +#ifdef WITH_RADOSGW_D4N + if (s->info.env->get_optional("HTTP_X_RGW_CACHE_REQUEST") && (g_conf().get_val("rgw_filter") == "d4n")) { + dynamic_cast(s->object.get())->set_cache_request(); + } +#endif op_ret = del_op->delete_obj(this, y, rgw::sal::FLAG_LOG_OP); if (op_ret >= 0) { -- 2.47.3