]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/d4n: adding a thread to asynchronously update
authorPritha Srivastava <prsrivas@redhat.com>
Mon, 1 Sep 2025 08:56:17 +0000 (14:26 +0530)
committerPritha Srivastava <prsrivas@redhat.com>
Tue, 16 Jun 2026 08:01:44 +0000 (13:31 +0530)
localweight to the cache backend. Removing the code
to update the localweight from GET and PUT requests.

Signed-off-by: Pritha Srivastava <prsrivas@redhat.com>
src/common/options/rgw.yaml.in
src/rgw/driver/d4n/d4n_policy.cc
src/rgw/driver/d4n/d4n_policy.h

index d9111ffeb071f6c61f1d89b367308c9502cce165..06defc466e1f9380d604f26394f91fb156ef4627 100644 (file)
@@ -4468,6 +4468,16 @@ options:
   flags:
   - startup
   with_legacy: true
+- name: rgw_d4n_localweight_processing_interval
+  type: int
+  level: advanced
+  desc: This is the interval in seconds for invoking local weight writer thread
+  default: 3600
+  services:
+  - rgw
+  flags:
+  - startup
+  with_legacy: true
 - name: rgw_topic_persistency_time_to_live
   type: uint
   level: advanced
index 78259af039dbebb3ab273ce252673ac70b874dac..45122f5436ce914957ad64966d7190dc6749f09e 100644 (file)
@@ -71,6 +71,9 @@ int LFUDAPolicy::init(CephContext* cct, const DoutPrefixProvider* dpp, asio::io_
     tc = std::thread(&CachePolicy::cleaning, this, dpp);
   }
 
+  lwthread = std::thread(&LFUDAPolicy::localweight_writer, this, dpp);
+  lw_quit = false;
+
   try {
     boost::system::error_code ec;
     response<
@@ -432,74 +435,80 @@ void LFUDAPolicy::update(const DoutPrefixProvider* dpp, const std::string& key,
 {
   ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): updating entry: " << key << dendl;
   using handle_type = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>::handle_type;
-  const std::lock_guard l(lfuda_lock);
-  int localWeight = age;
-  auto entry = find_entry(key);
-  bool updateLocalWeight = true;
-  uint64_t refcount = 0;
-
-  if (!restore_val.empty()) {
-    updateLocalWeight = false;
-    localWeight = std::stoull(restore_val);
-    ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): restored localWeight is: " << localWeight << dendl;
-  }
+  bool updateLocalWeight = true, should_notify = false;
+  {
+    const std::lock_guard l(lfuda_lock);
+    int localWeight = age;
+    auto entry = find_entry(key);
+    uint64_t refcount = 0;
+    if (!restore_val.empty()) {
+      updateLocalWeight = false;
+      localWeight = std::stoull(restore_val);
+      ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): restored localWeight is: " << localWeight << dendl;
+    }
 
-  /* check the dirty flag in the existing entry for the key and the incoming dirty flag. If the
-     incoming dirty flag is false, that means update() is invoked as part of cleaning process,
-     so we must not update its localWeight. */
-  if (entry) {
-    refcount = entry->refcount;
-    if (entry->dirty && dirty.has_value()) {
-      bool is_dirty = dirty.value();
-      if (!is_dirty) {
-        localWeight = entry->localWeight;
-        updateLocalWeight = false;
+    /* check the dirty flag in the existing entry for the key and the incoming dirty flag. If the
+      incoming dirty flag is false, that means update() is invoked as part of cleaning process,
+      so we must not update its localWeight. */
+    if (entry) {
+      refcount = entry->refcount;
+      if (entry->dirty && dirty.has_value()) {
+        bool is_dirty = dirty.value();
+        if (!is_dirty) {
+          localWeight = entry->localWeight;
+          updateLocalWeight = false;
+        }
+      }
+      if (updateLocalWeight) {
+        localWeight = entry->localWeight + age;
+      }
+      if (op == RefCount::INCR) {
+        refcount += 1;
+      }
+      if (op == RefCount::DECR) {
+        if (refcount > 0) {
+          refcount -= 1;
+        }
       }
     }
-    if (updateLocalWeight) {
-      localWeight = entry->localWeight + age;
+    //pick the existing value of dirty, if no value has been passed in
+    bool is_dirty = false;
+    if (dirty.has_value()) {
+      is_dirty = dirty.value();
+    } else if (entry) {
+      is_dirty = entry->dirty;
     }
-    if (op == RefCount::INCR) {
-      refcount += 1;
+    ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): updated refcount is: " << refcount << dendl;
+
+    if (entry) {
+      entry->key = key;
+      entry->offset = offset;
+      entry->len = len;
+      entry->version = version;
+      entry->dirty = is_dirty;
+      entry->refcount = refcount;
+      entry->localWeight = localWeight;
+      entries_heap.update(entry->handle, entry);
+    } else {
+      LFUDAEntry* e = new LFUDAEntry(key, offset, len, version, is_dirty, refcount, localWeight);
+      handle_type handle = entries_heap.push(e);
+      e->set_handle(handle);
+      entries_map.emplace(key, e);
     }
-    if (op == RefCount::DECR) {
-      if (refcount > 0) {
-        refcount -= 1;
+
+    if (updateLocalWeight) {
+      updated_blocks.emplace(key, localWeight);
+      if (updated_blocks.size() >= LOCALWEIGHT_BATCH_SIZE) {
+        should_notify = true;
       }
     }
-  }
-  //pick the existing value of dirty, if no value has been passed in
-  bool is_dirty = false;
-  if (dirty.has_value()) {
-    is_dirty = dirty.value();
-  } else if (entry) {
-    is_dirty = entry->dirty;
-  }
-  ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): updated refcount is: " << refcount << dendl;
-
-  if (entry) {
-    entry->key = key;
-    entry->offset = offset;
-    entry->len = len;
-    entry->version = version;
-    entry->dirty = is_dirty;
-    entry->refcount = refcount;
-    entry->localWeight = localWeight;
-    entries_heap.update(entry->handle, entry);
-  } else {
-    LFUDAEntry* e = new LFUDAEntry(key, offset, len, version, is_dirty, refcount, localWeight);
-    handle_type handle = entries_heap.push(e);
-    e->set_handle(handle);
-    entries_map.emplace(key, e);
-  }
 
-  if (updateLocalWeight) {
-    int ret = -1;
-    if ((ret = cacheDriver->set_attr(dpp, key, RGW_CACHE_ATTR_LOCAL_WEIGHT, std::to_string(localWeight), y)) < 0) 
-      ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): CacheDriver set_attr method failed, ret=" << ret << dendl;
+    weightSum += ((localWeight < 0) ? 0 : localWeight);
+  } //lock will be released here
+  if (should_notify) {
+    ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): notify_one: "<< dendl;
+    lw_cond.notify_one();
   }
-
-  weightSum += ((localWeight < 0) ? 0 : localWeight);
 }
 
 void LFUDAPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, double creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, uint8_t op, optional_yield y, std::string& restore_val)
@@ -515,11 +524,13 @@ void LFUDAPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::
     state = State::INIT;
   }
 
-  const std::lock_guard l(lfuda_cleaning_lock);
-  LFUDAObjEntry* e = new LFUDAObjEntry{key, version, deleteMarker, size, creationTime, user, etag, bucket_name, bucket_id, obj_key};
-  handle_type handle = object_heap.push(e);
-  e->set_handle(handle);
-  o_entries_map.emplace(key, std::make_pair(e, state));
+  {
+    const std::lock_guard l(lfuda_cleaning_lock);
+    LFUDAObjEntry* e = new LFUDAObjEntry{key, version, deleteMarker, size, creationTime, user, etag, bucket_name, bucket_id, obj_key};
+    handle_type handle = object_heap.push(e);
+    e->set_handle(handle);
+    o_entries_map.emplace(key, std::make_pair(e, state));
+  }
   cond.notify_one();
 }
 
@@ -548,18 +559,19 @@ bool LFUDAPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key, o
 
 bool LFUDAPolicy::erase_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y)
 {
-  const std::lock_guard l(lfuda_cleaning_lock);
-  auto p = o_entries_map.find(key);
-  if (p == o_entries_map.end()) {
-    return false;
-  }
+  {
+    const std::lock_guard l(lfuda_cleaning_lock);
+    auto p = o_entries_map.find(key);
+    if (p == o_entries_map.end()) {
+      return false;
+    }
 
-  object_heap.erase(p->second.first->handle);
-  delete p->second.first;
-  p->second.first = nullptr;
-  o_entries_map.erase(p);
+    object_heap.erase(p->second.first->handle);
+    delete p->second.first;
+    p->second.first = nullptr;
+    o_entries_map.erase(p);
+  }
   state_cond.notify_one();
-
   return true;
 }
 
@@ -908,7 +920,7 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
              }
            } //end-if (block.version == entry->version)
          } //end - else if op_ret == 0
-         ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << std::setprecision(std::numeric_limits<double>::max_digits10) << e->creationTime << " from ordered set" << dendl;
+         ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << std::setprecision(std::numeric_limits<double>::max_digits10) << e->creationTime << " from ordered set" << dendl;
          rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{
            .objName = c_obj->get_name(),
            .bucketName = c_obj->get_bucket()->get_bucket_id(),
@@ -962,7 +974,7 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
                continue;
              }
            }
-           ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << std::setprecision(std::numeric_limits<double>::max_digits10) << e->creationTime << " from ordered set" << dendl;
+           ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << std::setprecision(std::numeric_limits<double>::max_digits10) << e->creationTime << " from ordered set" << dendl;
            rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{
              .objName = c_obj->get_name(),
              .bucketName = c_obj->get_bucket()->get_bucket_id(),
@@ -988,6 +1000,53 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
   } //end-while true
 }
 
+void LFUDAPolicy::localweight_writer(const DoutPrefixProvider* dpp)
+{
+  ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Starting thread " << dendl;
+  auto TIMEOUT_DURATION = std::chrono::seconds(dpp->get_cct()->_conf->rgw_d4n_localweight_processing_interval);
+  while (!lw_quit.load()) {
+    std::unordered_map<std::string, uint64_t> temp;
+    bool woke_up = false;
+    //sleep for some duration or till size crosses 10K before processing
+    {
+      std::unique_lock<std::mutex> wait_lock(lfuda_lock);
+      woke_up = lw_cond.wait_for(wait_lock, TIMEOUT_DURATION, [this] {
+                return updated_blocks.size() >= LOCALWEIGHT_BATCH_SIZE || lw_quit.load();
+      });
+      if (lw_quit.load()) {
+          ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Quit signal received, exiting" << dendl;
+          break;
+      }
+      if (!updated_blocks.empty()) {
+          updated_blocks.swap(temp);
+          if (woke_up) {
+              ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Woke up due to size threshold, processing " << temp.size() << " items" << dendl;
+          } else {
+              ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Woke up due to timeout, processing " << temp.size() << " items" << dendl;
+          }
+      }
+    } //lock released here
+    if (!temp.empty()) {
+      ldpp_dout(dpp, 5) << "LFUDAPolicy::" << __func__ << "(): Processing batch of " << temp.size() << " items" << dendl;
+      for (auto& it : temp) {
+        if (lw_quit.load()) {
+          ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Quit signal received, exiting" << lw_quit << dendl;
+          break;
+        }
+        auto& key = it.first;
+        auto localWeight = it.second;
+        ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): CacheDriver set_attr method called for key: " << key << dendl;
+        int ret = cacheDriver->set_attr(dpp, key, RGW_CACHE_ATTR_LOCAL_WEIGHT, std::to_string(localWeight), y);
+        if (ret < 0) {
+          ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): CacheDriver set_attr method failed, ret=" << ret << dendl;
+        }
+      } //end-for
+      ldpp_dout(dpp, 5) << "LFUDAPolicy::" << __func__ << "(): Finished processing batch" << dendl;
+    } //end-if
+  }//end-while
+  ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Thread exiting" << dendl;
+}
+
 int LRUPolicy::exist_key(const std::string& key)
 {
   const std::lock_guard l(lru_lock);
index c51dd8e0d3f258c0a876614627da8faf27afcf6d..c3ca641b3cb5a877b0e8a9d462b8d1f521f8fec9 100644 (file)
@@ -38,6 +38,7 @@ class CachePolicy {
       std::string version;
       bool dirty;
       uint64_t refcount{0};
+      Entry() = default;
       Entry(const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, uint64_t refcount) : key(key), offset(offset), 
                                                                                                len(len), version(version), dirty(dirty), refcount(refcount) {}
       };
@@ -120,7 +121,7 @@ class LFUDAPolicy : public CachePolicy {
       int localWeight;
       using handle_type = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>::handle_type;
       handle_type handle;
-
+      LFUDAEntry() = default;
       LFUDAEntry(const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, uint64_t refcount, int localWeight) : Entry(key, offset, len, version, dirty, refcount), 
                                                                                                                       localWeight(localWeight) {}
       
@@ -149,7 +150,9 @@ class LFUDAPolicy : public CachePolicy {
     std::mutex lfuda_cleaning_lock;
     std::condition_variable cond;
     std::condition_variable state_cond;
+    std::condition_variable lw_cond;
     inline static std::atomic<bool> quit{false};
+    inline static std::atomic<bool> lw_quit{false};
 
     int age = 1, weightSum = 0, postedSum = 0;
     optional_yield y = null_yield;
@@ -161,6 +164,10 @@ class LFUDAPolicy : public CachePolicy {
     std::optional<asio::steady_timer> rthread_timer;
     rgw::sal::Driver* driver;
     std::thread tc;
+    std::thread lwthread;
+    //data structure for accumulating updated blocks
+    std::unordered_map<std::string, uint64_t> updated_blocks;
+    static constexpr size_t LOCALWEIGHT_BATCH_SIZE = 10000;
 
     CacheBlock* get_victim_block(const DoutPrefixProvider* dpp, optional_yield y);
     int age_sync(const DoutPrefixProvider* dpp, optional_yield y); 
@@ -197,15 +204,18 @@ class LFUDAPolicy : public CachePolicy {
       delete blockDir;
       delete objDir;
       quit = true;
+      lw_quit = true;
       cond.notify_all();
+      lw_cond.notify_all();
       if (tc.joinable()) { tc.join(); }
+      if (lwthread.joinable()) { lwthread.join(); }
       for (auto& it : entries_map) {
         delete it.second;
       }
       for (auto& it : o_entries_map) {
         delete it.second.first;
       }
-    } 
+    }
 
     virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver *_driver);
     virtual int exist_key(const std::string& key) override;
@@ -228,6 +238,7 @@ class LFUDAPolicy : public CachePolicy {
       return it->second.first;
     }
     void save_y(optional_yield y) { this->y = y; }
+    void localweight_writer(const DoutPrefixProvider* dpp);
 };
 
 class LRUPolicy : public CachePolicy {