From 92ebe54b853cec6146aae58fd822cdc98d5850f8 Mon Sep 17 00:00:00 2001 From: Pritha Srivastava Date: Mon, 1 Sep 2025 14:26:17 +0530 Subject: [PATCH] rgw/d4n: adding a thread to asynchronously update localweight to the cache backend. Removing the code to update the localweight from GET and PUT requests. Signed-off-by: Pritha Srivastava --- src/common/options/rgw.yaml.in | 10 ++ src/rgw/driver/d4n/d4n_policy.cc | 191 ++++++++++++++++++++----------- src/rgw/driver/d4n/d4n_policy.h | 15 ++- 3 files changed, 148 insertions(+), 68 deletions(-) diff --git a/src/common/options/rgw.yaml.in b/src/common/options/rgw.yaml.in index 41c9a611270..dfbca544af2 100644 --- a/src/common/options/rgw.yaml.in +++ b/src/common/options/rgw.yaml.in @@ -4264,6 +4264,16 @@ options: flags: - startup with_legacy: true +- name: rgw_d4n_localweight_processing_interval + type: int + level: advanced + desc: This is the interval in seconds for invoking local weight writer thread + default: 3600 + services: + - rgw + flags: + - startup + with_legacy: true - name: rgw_topic_persistency_time_to_live type: uint level: advanced diff --git a/src/rgw/driver/d4n/d4n_policy.cc b/src/rgw/driver/d4n/d4n_policy.cc index f04c348398f..79341595134 100644 --- a/src/rgw/driver/d4n/d4n_policy.cc +++ b/src/rgw/driver/d4n/d4n_policy.cc @@ -71,6 +71,9 @@ int LFUDAPolicy::init(CephContext* cct, const DoutPrefixProvider* dpp, asio::io_ tc = std::thread(&CachePolicy::cleaning, this, dpp); } + lwthread = std::thread(&LFUDAPolicy::localweight_writer, this, dpp); + lw_quit = false; + try { boost::system::error_code ec; response< @@ -432,63 +435,69 @@ void LFUDAPolicy::update(const DoutPrefixProvider* dpp, const std::string& key, { ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): updating entry: " << key << dendl; using handle_type = boost::heap::fibonacci_heap>>::handle_type; - const std::lock_guard l(lfuda_lock); - int localWeight = age; - auto entry = find_entry(key); - bool updateLocalWeight = true; - uint64_t refcount = 0; - - if (!restore_val.empty()) { - updateLocalWeight = false; - localWeight = std::stoull(restore_val); - ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): restored localWeight is: " << localWeight << dendl; - } + bool updateLocalWeight = true, should_notify = false; + { + const std::lock_guard l(lfuda_lock); + int localWeight = age; + auto entry = find_entry(key); + uint64_t refcount = 0; + if (!restore_val.empty()) { + updateLocalWeight = false; + localWeight = std::stoull(restore_val); + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): restored localWeight is: " << localWeight << dendl; + } - /* check the dirty flag in the existing entry for the key and the incoming dirty flag. If the - incoming dirty flag is false, that means update() is invoked as part of cleaning process, - so we must not update its localWeight. */ - if (entry) { - refcount = entry->refcount; - if (entry->dirty && dirty.has_value()) { - bool is_dirty = dirty.value(); - if (!is_dirty) { - localWeight = entry->localWeight; - updateLocalWeight = false; + /* check the dirty flag in the existing entry for the key and the incoming dirty flag. If the + incoming dirty flag is false, that means update() is invoked as part of cleaning process, + so we must not update its localWeight. */ + if (entry) { + refcount = entry->refcount; + if (entry->dirty && dirty.has_value()) { + bool is_dirty = dirty.value(); + if (!is_dirty) { + localWeight = entry->localWeight; + updateLocalWeight = false; + } + } + if (updateLocalWeight) { + localWeight = entry->localWeight + age; + } + if (op == RefCount::INCR) { + refcount += 1; + } + if (op == RefCount::DECR) { + if (refcount > 0) { + refcount -= 1; + } } } - if (updateLocalWeight) { - localWeight = entry->localWeight + age; - } - if (op == RefCount::INCR) { - refcount += 1; + //pick the existing value of dirty, if no value has been passed in + bool is_dirty = false; + if (dirty.has_value()) { + is_dirty = dirty.value(); + } else if (entry) { + is_dirty = entry->dirty; } - if (op == RefCount::DECR) { - if (refcount > 0) { - refcount -= 1; + _erase(dpp, key, y); + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): updated refcount is: " << refcount << dendl; + LFUDAEntry* e = new LFUDAEntry(key, offset, len, version, is_dirty, refcount, localWeight); + handle_type handle = entries_heap.push(e); + e->set_handle(handle); + entries_map.emplace(key, e); + + if (updateLocalWeight) { + updated_blocks.push_back(std::make_pair(key, localWeight)); + if (updated_blocks.size() >= LOCALWEIGHT_BATCH_SIZE) { + should_notify = true; } } - } - //pick the existing value of dirty, if no value has been passed in - bool is_dirty = false; - if (dirty.has_value()) { - is_dirty = dirty.value(); - } else if (entry) { - is_dirty = entry->dirty; - } - _erase(dpp, key, y); - ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): updated refcount is: " << refcount << dendl; - LFUDAEntry* e = new LFUDAEntry(key, offset, len, version, is_dirty, refcount, localWeight); - handle_type handle = entries_heap.push(e); - e->set_handle(handle); - entries_map.emplace(key, e); - if (updateLocalWeight) { - int ret = -1; - if ((ret = cacheDriver->set_attr(dpp, key, RGW_CACHE_ATTR_LOCAL_WEIGHT, std::to_string(localWeight), y)) < 0) - ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): CacheDriver set_attr method failed, ret=" << ret << dendl; + weightSum += ((localWeight < 0) ? 0 : localWeight); + } //lock will be released here + if (should_notify) { + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): notify_one: "<< dendl; + lw_cond.notify_one(); } - - weightSum += ((localWeight < 0) ? 0 : localWeight); } void LFUDAPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, double creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, uint8_t op, optional_yield y, std::string& restore_val) @@ -504,11 +513,13 @@ void LFUDAPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std:: state = State::INIT; } - const std::lock_guard l(lfuda_cleaning_lock); - LFUDAObjEntry* e = new LFUDAObjEntry{key, version, deleteMarker, size, creationTime, user, etag, bucket_name, bucket_id, obj_key}; - handle_type handle = object_heap.push(e); - e->set_handle(handle); - o_entries_map.emplace(key, std::make_pair(e, state)); + { + const std::lock_guard l(lfuda_cleaning_lock); + LFUDAObjEntry* e = new LFUDAObjEntry{key, version, deleteMarker, size, creationTime, user, etag, bucket_name, bucket_id, obj_key}; + handle_type handle = object_heap.push(e); + e->set_handle(handle); + o_entries_map.emplace(key, std::make_pair(e, state)); + } cond.notify_one(); } @@ -537,18 +548,19 @@ bool LFUDAPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key, o bool LFUDAPolicy::erase_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) { - const std::lock_guard l(lfuda_cleaning_lock); - auto p = o_entries_map.find(key); - if (p == o_entries_map.end()) { - return false; - } + { + const std::lock_guard l(lfuda_cleaning_lock); + auto p = o_entries_map.find(key); + if (p == o_entries_map.end()) { + return false; + } - object_heap.erase(p->second.first->handle); - delete p->second.first; - p->second.first = nullptr; - o_entries_map.erase(p); + object_heap.erase(p->second.first->handle); + delete p->second.first; + p->second.first = nullptr; + o_entries_map.erase(p); + } state_cond.notify_one(); - return true; } @@ -902,7 +914,7 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp) } } //end-if (block.version == entry->version) } //end - else if op_ret == 0 - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << std::setprecision(std::numeric_limits::max_digits10) << e->creationTime << " from ordered set" << dendl; + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << std::setprecision(std::numeric_limits::max_digits10) << e->creationTime << " from ordered set" << dendl; rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{ .objName = c_obj->get_name(), .bucketName = c_obj->get_bucket()->get_bucket_id(), @@ -956,7 +968,7 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp) continue; } } - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << std::setprecision(std::numeric_limits::max_digits10) << e->creationTime << " from ordered set" << dendl; + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << std::setprecision(std::numeric_limits::max_digits10) << e->creationTime << " from ordered set" << dendl; rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{ .objName = c_obj->get_name(), .bucketName = c_obj->get_bucket()->get_bucket_id(), @@ -982,6 +994,53 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp) } //end-while true } +void LFUDAPolicy::localweight_writer(const DoutPrefixProvider* dpp) +{ + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Starting thread " << dendl; + auto TIMEOUT_DURATION = std::chrono::seconds(dpp->get_cct()->_conf->rgw_d4n_localweight_processing_interval); + while (!lw_quit.load()) { + std::vector> temp; + bool woke_up = false; + //sleep for some duration or till size crosses 10K before processing + { + std::unique_lock wait_lock(lfuda_lock); + woke_up = lw_cond.wait_for(wait_lock, TIMEOUT_DURATION, [this] { + return updated_blocks.size() >= LOCALWEIGHT_BATCH_SIZE || lw_quit.load(); + }); + if (lw_quit.load()) { + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Quit signal received, exiting" << dendl; + break; + } + if (!updated_blocks.empty()) { + updated_blocks.swap(temp); + if (woke_up) { + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Woke up due to size threshold, processing " << temp.size() << " items" << dendl; + } else { + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Woke up due to timeout, processing " << temp.size() << " items" << dendl; + } + } + } //lock released here + if (!temp.empty()) { + ldpp_dout(dpp, 5) << "LFUDAPolicy::" << __func__ << "(): Processing batch of " << temp.size() << " items" << dendl; + for (auto& it : temp) { + if (lw_quit.load()) { + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Quit signal received, exiting" << lw_quit << dendl; + break; + } + auto& key = it.first; + auto localWeight = it.second; + ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): CacheDriver set_attr method called for key: " << key << dendl; + int ret = cacheDriver->set_attr(dpp, key, RGW_CACHE_ATTR_LOCAL_WEIGHT, std::to_string(localWeight), y); + if (ret < 0) { + ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): CacheDriver set_attr method failed, ret=" << ret << dendl; + } + } //end-for + ldpp_dout(dpp, 5) << "LFUDAPolicy::" << __func__ << "(): Finished processing batch" << dendl; + } //end-if + }//end-while + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Thread exiting" << dendl; +} + int LRUPolicy::exist_key(const std::string& key) { const std::lock_guard l(lru_lock); diff --git a/src/rgw/driver/d4n/d4n_policy.h b/src/rgw/driver/d4n/d4n_policy.h index c51dd8e0d3f..51f3e93405d 100644 --- a/src/rgw/driver/d4n/d4n_policy.h +++ b/src/rgw/driver/d4n/d4n_policy.h @@ -38,6 +38,7 @@ class CachePolicy { std::string version; bool dirty; uint64_t refcount{0}; + Entry() = default; Entry(const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, uint64_t refcount) : key(key), offset(offset), len(len), version(version), dirty(dirty), refcount(refcount) {} }; @@ -120,7 +121,7 @@ class LFUDAPolicy : public CachePolicy { int localWeight; using handle_type = boost::heap::fibonacci_heap>>::handle_type; handle_type handle; - + LFUDAEntry() = default; LFUDAEntry(const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, uint64_t refcount, int localWeight) : Entry(key, offset, len, version, dirty, refcount), localWeight(localWeight) {} @@ -149,7 +150,9 @@ class LFUDAPolicy : public CachePolicy { std::mutex lfuda_cleaning_lock; std::condition_variable cond; std::condition_variable state_cond; + std::condition_variable lw_cond; inline static std::atomic quit{false}; + inline static std::atomic lw_quit{false}; int age = 1, weightSum = 0, postedSum = 0; optional_yield y = null_yield; @@ -161,6 +164,10 @@ class LFUDAPolicy : public CachePolicy { std::optional rthread_timer; rgw::sal::Driver* driver; std::thread tc; + std::thread lwthread; + //data structure for accumulating updated blocks + std::vector> updated_blocks; + static constexpr size_t LOCALWEIGHT_BATCH_SIZE = 10000; CacheBlock* get_victim_block(const DoutPrefixProvider* dpp, optional_yield y); int age_sync(const DoutPrefixProvider* dpp, optional_yield y); @@ -197,15 +204,18 @@ class LFUDAPolicy : public CachePolicy { delete blockDir; delete objDir; quit = true; + lw_quit = true; cond.notify_all(); + lw_cond.notify_all(); if (tc.joinable()) { tc.join(); } + if (lwthread.joinable()) { lwthread.join(); } for (auto& it : entries_map) { delete it.second; } for (auto& it : o_entries_map) { delete it.second.first; } - } + } virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver *_driver); virtual int exist_key(const std::string& key) override; @@ -228,6 +238,7 @@ class LFUDAPolicy : public CachePolicy { return it->second.first; } void save_y(optional_yield y) { this->y = y; } + void localweight_writer(const DoutPrefixProvider* dpp); }; class LRUPolicy : public CachePolicy { -- 2.39.5