]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
d4n/policy: Update local weight sum + age handling and d4n policy
authorSamarah <samarah.uriarte@ibm.com>
Fri, 12 Jan 2024 19:01:05 +0000 (19:01 +0000)
committerPritha Srivastava <prsrivas@redhat.com>
Tue, 2 Apr 2024 15:54:52 +0000 (21:24 +0530)
test

Signed-off-by: Samarah <samarah.uriarte@ibm.com>
src/common/options/rgw.yaml.in
src/rgw/driver/d4n/d4n_policy.cc
src/rgw/driver/d4n/d4n_policy.h
src/rgw/driver/d4n/rgw_sal_d4n.cc
src/rgw/driver/d4n/rgw_sal_d4n.h
src/test/rgw/test_d4n_policy.cc

index bcce8389a534174e2db309977ee2e883f9eaf57e..a3c11093fba484fef7847d8fd02b235d85891bd4 100644 (file)
@@ -3666,6 +3666,14 @@ options:
   see_also:
   - rgw_thread_pool_size
   with_legacy: true
+- name: rgw_lfuda_sync_frequency
+  type: int
+  level: advanced
+  desc: LFUDA variables' sync frequency in seconds 
+  default: 60
+  services:
+  - rgw
+  with_legacy: true
 - name: rgw_backend_store
   type: str
   level: advanced
index 5adba67db01c7f6595519b2269c6f522b100a11b..08386cf419ee2bd363fc658b2290243695ee56bb 100644 (file)
@@ -1,6 +1,5 @@
 #include "d4n_policy.h"
 
-#include <boost/lexical_cast.hpp>
 #include "../../../common/async/yield_context.h"
 #include "common/async/blocked_completion.h"
 #include "common/dout.h" 
@@ -17,8 +16,8 @@ struct initiate_exec {
   template <typename Handler, typename Response>
   void operator()(Handler handler, const boost::redis::request& req, Response& resp)
   {
-    auto h = boost::asio::consign(std::move(handler), conn);
-    return boost::asio::dispatch(get_executor(), [c=conn, &req, &resp, h=std::move(h)] {
+    auto h = asio::consign(std::move(handler), conn);
+    return asio::dispatch(get_executor(), [c=conn, &req, &resp, h=std::move(h)] {
       c->async_exec(req, resp, std::move(h));
     });
   }
@@ -29,13 +28,16 @@ auto async_exec(std::shared_ptr<connection> conn,
                 const boost::redis::request& req,
                 Response& resp, CompletionToken&& token)
 {
-  return boost::asio::async_initiate<CompletionToken,
+  return asio::async_initiate<CompletionToken,
          void(boost::system::error_code, std::size_t)>(
       initiate_exec{std::move(conn)}, token, req, resp);
 }
 
-template <typename T>
-void redis_exec(std::shared_ptr<connection> conn, boost::system::error_code& ec, boost::redis::request& req, boost::redis::response<T>& resp, optional_yield y)
+template <typename... Types>
+void redis_exec(std::shared_ptr<connection> conn,
+                boost::system::error_code& ec,
+                const boost::redis::request& req,
+                boost::redis::response<Types...>& resp, optional_yield y)
 {
   if (y) {
     auto yield = y.get_yield_context();
@@ -45,133 +47,195 @@ void redis_exec(std::shared_ptr<connection> conn, boost::system::error_code& ec,
   }
 }
 
-int LFUDAPolicy::set_age(int age, optional_yield y) {
+int LFUDAPolicy::init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context) {
+  dir->init(cct);
+  int result = 0;
+  response<int, int, int, int> resp;
+
   try {
     boost::system::error_code ec;
-    response<int> resp;
     request req;
-    req.push("HSET", "lfuda", "age", std::to_string(age));
-
+    req.push("HEXISTS", "lfuda", "age"); 
+    req.push("HSET", "lfuda", "minLocalWeights_sum", std::to_string(weightSum)); /* New cache node will always have the minimum average weight */
+    req.push("HSET", "lfuda", "minLocalWeights_size", std::to_string(entries_map.size()));
+    req.push("HSET", "lfuda", "minLocalWeights_address", dir->cct->_conf->rgw_local_cache_address);
+  
     redis_exec(conn, ec, req, resp, y);
 
     if (ec) {
+      ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: " << ec.what() << dendl;
       return -ec.value();
     }
 
-    return std::get<0>(resp).value(); /* Returns number of fields set */
+    result = std::min(std::get<1>(resp).value(), std::min(std::get<2>(resp).value(), std::get<3>(resp).value()));
   } catch (std::exception &e) {
+    ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: " << e.what() << dendl;
     return -EINVAL;
   }
-}
-
-int LFUDAPolicy::get_age(optional_yield y) {
-  response<int> resp;
 
-  try {
-    boost::system::error_code ec;
-    request req;
-    req.push("HEXISTS", "lfuda", "age");
-
-    redis_exec(conn, ec, req, resp, y);
+  if (!std::get<0>(resp).value()) { /* Only set maximum age if it doesn't exist */
+    try {
+      boost::system::error_code ec;
+      response<int> value;
+      request req;
+      req.push("HSET", "lfuda", "age", age);
+    
+      redis_exec(conn, ec, req, value, y);
+
+      if (ec) {
+       ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: " << ec.what() << dendl;
+       return -ec.value();
+      }
 
-    if (ec) {
-      return -ec.value();
+      result = std::min(result, std::get<0>(value).value());
+    } catch (std::exception &e) {
+      ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: " << e.what() << dendl;
+      return -EINVAL;
     }
-  } catch (std::exception &e) {
-    return -EINVAL;
   }
 
-  if (!std::get<0>(resp).value()) {
-    if (set_age(1, y)) /* Initialize age */
-      return 1;
-    else
-      return -ENOENT;
-  }
+  asio::co_spawn(io_context.get_executor(),
+                  redis_sync(dpp, y), asio::detached);
+
+  return result;
+}
+
+int LFUDAPolicy::age_sync(const DoutPrefixProvider* dpp, optional_yield y) {
+  response<std::string> resp;
 
   try { 
     boost::system::error_code ec;
-    response<std::string> value;
     request req;
     req.push("HGET", "lfuda", "age");
       
-    redis_exec(conn, ec, req, value, y);
+    redis_exec(conn, ec, req, resp, y);
 
     if (ec) {
       return -ec.value();
     }
-
-    return std::stoi(std::get<0>(value).value());
   } catch (std::exception &e) {
     return -EINVAL;
   }
-}
-
-int LFUDAPolicy::set_local_weight_sum(int weight, optional_yield y) {
-  weight = weight > 0 ? weight : 0;
 
-  try {
-    boost::system::error_code ec;
-    response<int> resp;
-    request req;
-    req.push("HSET", dir->cct->_conf->rgw_local_cache_address, "localWeights", std::to_string(weight));
+  if (age > std::stoi(std::get<0>(resp).value()) || std::get<0>(resp).value().empty()) { /* Set new maximum age */
+    try { 
+      boost::system::error_code ec;
+      request req;
+      response<int> value;
+      req.push("HSET", "lfuda", "age", age);
+      redis_exec(conn, ec, req, resp, y);
 
-    redis_exec(conn, ec, req, resp, y);
+      if (ec) {
+       return -ec.value();
+      }
 
-    if (ec) {
-      return -ec.value();
+      return std::get<0>(value).value();
+    } catch (std::exception &e) {
+      return -EINVAL;
     }
-
-    return std::get<0>(resp).value(); /* Returns number of fields set */
-  } catch (std::exception &e) {
-    return -EINVAL;
+  } else {
+    age = std::stoi(std::get<0>(resp).value());
+    return 0;
   }
 }
 
-int LFUDAPolicy::get_local_weight_sum(optional_yield y) {
-  response<int> resp;
+int LFUDAPolicy::local_weight_sync(const DoutPrefixProvider* dpp, optional_yield y) {
+  int result; 
 
-  try {
-    boost::system::error_code ec;
-    request req;
-    req.push("HEXISTS", dir->cct->_conf->rgw_local_cache_address, "localWeights");
+  if (fabs(weightSum - postedSum) > (postedSum * 0.1)) {
+    response<std::string, std::string> resp;
 
-    redis_exec(conn, ec, req, resp, y);
+    try { 
+      boost::system::error_code ec;
+      request req;
+      req.push("HGET", "lfuda", "minLocalWeights_sum");
+      req.push("HGET", "lfuda", "minLocalWeights_size");
+       
+      redis_exec(conn, ec, req, resp, y);
 
-    if (ec) {
-      return -ec.value();
+      if (ec) {
+       return -ec.value();
+      }
+    } catch (std::exception &e) {
+      return -EINVAL;
     }
-  } catch (std::exception &e) {
-    return -EINVAL;
-  }
-
-  if (!std::get<0>(resp).value()) {
-    int sum = 0;
-    for (auto& entry : entries_map)
-      sum += entry.second->localWeight; 
-    if (int ret = set_local_weight_sum(sum, y) < 0) { /* Initialize */ 
-      return ret;
+  
+    float minAvgWeight = std::stof(std::get<0>(resp).value()) / std::stof(std::get<1>(resp).value());
+
+    if ((static_cast<float>(weightSum) / static_cast<float>(entries_map.size())) < minAvgWeight) { /* Set new minimum weight */
+      try { 
+       boost::system::error_code ec;
+       request req;
+       response<int, int, int> value;
+       req.push("HSET", "lfuda", "minLocalWeights_sum", std::to_string(weightSum));
+       req.push("HSET", "lfuda", "minLocalWeights_size", std::to_string(entries_map.size()));
+       req.push("HSET", "lfuda", "minLocalWeights_address", dir->cct->_conf->rgw_local_cache_address);
+       redis_exec(conn, ec, req, resp, y);
+
+       if (ec) {
+         return -ec.value();
+       }
+
+       result = std::min(std::get<0>(value).value(), std::get<1>(value).value());
+       result = std::min(result, std::get<2>(value).value());
+      } catch (std::exception &e) {
+       return -EINVAL;
+      }
     } else {
-      return sum;
+      weightSum = std::stoi(std::get<0>(resp).value());
+      postedSum = std::stoi(std::get<0>(resp).value());
     }
   }
 
-  try { 
+  try { /* Post update for local cache */
     boost::system::error_code ec;
-    response<std::string> value;
     request req;
-    req.push("HGET", dir->cct->_conf->rgw_local_cache_address, "localWeights");
-      
-    redis_exec(conn, ec, req, value, y);
+    response<int, int> resp;
+    req.push("HSET", dpp->get_cct()->_conf->rgw_local_cache_address, "avgLocalWeight_sum", std::to_string(weightSum));
+    req.push("HSET", dpp->get_cct()->_conf->rgw_local_cache_address, "avgLocalWeight_size", std::to_string(entries_map.size()));
+    redis_exec(conn, ec, req, resp, y);
 
     if (ec) {
       return -ec.value();
     }
 
-    return std::stoi(std::get<0>(value).value());
+    result = std::min(std::get<0>(resp).value(), std::get<1>(resp).value());
   } catch (std::exception &e) {
     return -EINVAL;
   }
+  
+  return result;
+}
+
+asio::awaitable<void> LFUDAPolicy::redis_sync(const DoutPrefixProvider* dpp, optional_yield y) {
+  rthread_timer.emplace(co_await asio::this_coro::executor);
+  co_await asio::this_coro::throw_if_cancelled(true);
+  co_await asio::this_coro::reset_cancellation_state(
+    asio::enable_terminal_cancellation());
+
+  for (;;) try {
+    /* Update age */
+    if (int ret = age_sync(dpp, y) < 0) {
+      ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: ret=" << ret << dendl;
+    }
+    
+    /* Update minimum local weight sum */
+    if (int ret = local_weight_sync(dpp, y) < 0) {
+      ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: ret=" << ret << dendl;
+    }
+
+    int interval = dpp->get_cct()->_conf->rgw_lfuda_sync_frequency;
+    rthread_timer->expires_after(std::chrono::seconds(interval));
+    co_await rthread_timer->async_wait(asio::use_awaitable);
+  } catch (sys::system_error& e) {
+    ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: " << e.what() << dendl;
+
+    if (e.code() == asio::error::operation_aborted) {
+      break;
+    } else {
+      continue;
+    }
+  }
 }
 
 CacheBlock* LFUDAPolicy::get_victim_block(const DoutPrefixProvider* dpp, optional_yield y) {
@@ -205,47 +269,6 @@ int LFUDAPolicy::exist_key(std::string key) {
   return false;
 }
 
-#if 0
-int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheDriver, optional_yield y) {
-  response<std::string> resp;
-  int age = get_age(y);
-
-  if (exist_key(build_index(block->cacheObj.bucketName, block->cacheObj.objName, block->blockID, block->size))) { /* Local copy */
-    auto it = entries_map.find(build_index(block->cacheObj.bucketName, block->cacheObj.objName, block->blockID, block->size));
-    it->second->localWeight += age;
-    return cacheDriver->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(it->second->localWeight), y);
-  } else {
-    if (eviction(dpp, block->size, cacheDriver, y) < 0)
-      return -1; 
-
-    int exists = dir->exist_key(block, y);
-    if (exists > 0) { /* Remote copy */
-      if (dir->get(block, y) < 0) {
-       return -1;
-      } else {
-       if (!block->hostsList.empty()) { 
-         block->globalWeight += age;
-         
-         if (dir->update_field(block, "globalWeight", std::to_string(block->globalWeight), y) < 0) {
-           return -1;
-         } else {
-           return 0;
-         }
-       } else {
-          return -1;
-        }
-      }
-    } else if (!exists) { /* No remote copy */
-      // localWeight += age;
-      //return cacheDriver->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(it->second->localWeight), y);
-      return 0;
-    } else {
-      return -1;
-    }
-  }
-}
-#endif
-
 int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) {
   uint64_t freeSpace = cacheDriver->get_free_space(dpp);
 
@@ -266,13 +289,7 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional
       return -ENOENT;
     }
 
-    int avgWeight = get_local_weight_sum(y);
-    if (avgWeight < 0) {
-      delete victim;
-      return avgWeight;
-    }
-
-    avgWeight /= entries_map.size();
+    int avgWeight = weightSum / entries_map.size();
 
     if (victim->hostsList.size() == 1 && victim->hostsList[0] == dir->cct->_conf->rgw_local_cache_address) { /* Last copy */
       if (victim->globalWeight) {
@@ -316,14 +333,9 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional
 
     ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Block " << key << " has been evicted." << dendl;
 
-    int weight = (avgWeight * entries_map.size()) - it->second->localWeight;
-    if (int ret = set_local_weight_sum(weight, y) < 0)
-      return ret;
+    weightSum = (avgWeight * entries_map.size()) - it->second->localWeight;
 
-    int age = get_age(y);
     age = std::max(it->second->localWeight, age);
-    if (int ret = set_age(age, y) < 0)
-      return ret;
 
     erase(dpp, key, y);
     freeSpace = cacheDriver->get_free_space(dpp);
@@ -336,17 +348,10 @@ void LFUDAPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64
 {
   using handle_type = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>::handle_type;
   const std::lock_guard l(lfuda_lock);
-
-  int age = get_age(y); 
-  if (age < 0) {
-    ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): LFUDAPolicy get_age method failed." << dendl;
-    return;
-  }
   int localWeight = age;
   auto entry = find_entry(key);
   if (entry != nullptr) { 
-    entry->localWeight += age;
-    localWeight = entry->localWeight;
+    localWeight = entry->localWeight + age;
   }  
 
   erase(dpp, key, y);
@@ -359,15 +364,7 @@ void LFUDAPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64
   if (cacheDriver->set_attr(dpp, key, "user.rgw.localWeight", std::to_string(localWeight), y) < 0) 
     ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): CacheDriver set_attr method failed." << dendl;
 
-  int localWeights = get_local_weight_sum(y);
-  if (localWeights < 0) {
-    ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Failed to retrieve sum of local weights for the cache backend." << dendl;
-    return;
-  }
-
-  localWeights += ((localWeight < 0) ? 0 : localWeight);
-  if (set_local_weight_sum(localWeights, y) < 0)
-    ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Failed to update sum of local weights for the cache backend." << dendl;
+  weightSum += ((localWeight < 0) ? 0 : localWeight);
 }
 
 bool LFUDAPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y)
@@ -377,17 +374,7 @@ bool LFUDAPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key, o
     return false;
   }
 
-  int localWeights = get_local_weight_sum(y);
-  if (localWeights < 0) {
-    ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Failed to retrieve sum of local weights for the cache backend." << dendl;
-    return false;
-  }
-
-  localWeights -= ((p->second->localWeight < 0) ? 0 : p->second->localWeight);
-  if (set_local_weight_sum(localWeights, y) < 0) {
-    ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Failed to update sum of local weights for the cache backend." << dendl;
-    return false;
-  }
+  weightSum -= ((p->second->localWeight < 0) ? 0 : p->second->localWeight);
 
   entries_heap.erase(p->second->handle);
   entries_map.erase(p);
index 60d55128ab0b377e7c722163d03a17401269c765..3740f5f047265ca7680a1ca8d4dd024519768f7e 100644 (file)
@@ -1,6 +1,11 @@
 #pragma once
 
+#include <boost/asio/awaitable.hpp>
+#include <boost/asio/use_awaitable.hpp>
+#include <boost/asio/co_spawn.hpp>
 #include <boost/heap/fibonacci_heap.hpp>
+#include <boost/system/detail/errc.hpp>
+
 #include "d4n_directory.h"
 #include "rgw_sal_d4n.h"
 #include "rgw_cache_driver.h"
@@ -11,6 +16,9 @@ namespace rgw::sal {
 
 namespace rgw { namespace d4n {
 
+namespace asio = boost::asio;
+namespace sys = boost::system;
+
 class CachePolicy {
   protected:
     struct Entry : public boost::intrusive::list_base_hook<> {
@@ -33,7 +41,7 @@ class CachePolicy {
     CachePolicy() {}
     virtual ~CachePolicy() = default; 
 
-    virtual void init(CephContext *cct) = 0; 
+    virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context) = 0; 
     virtual int exist_key(std::string key) = 0;
     virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) = 0;
     virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) = 0;
@@ -63,45 +71,51 @@ class LFUDAPolicy : public CachePolicy {
     using Heap = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>;
     Heap entries_heap;
     std::unordered_map<std::string, LFUDAEntry*> entries_map;
-    std::mutex lfuda_lock;
+    std::mutex lfuda_lock; 
 
+    int age = 1, weightSum = 0, postedSum = 0;
+    optional_yield y = null_yield;
     std::shared_ptr<connection> conn;
     BlockDirectory* dir;
     rgw::cache::CacheDriver* cacheDriver;
+    std::optional<asio::steady_timer> rthread_timer;
 
-    int set_age(int age, optional_yield y);
-    int get_age(optional_yield y);
-    int set_local_weight_sum(int weight, optional_yield y);
-    int get_local_weight_sum(optional_yield y);
     CacheBlock* get_victim_block(const DoutPrefixProvider* dpp, optional_yield y);
+    int age_sync(const DoutPrefixProvider* dpp, optional_yield y); 
+    int local_weight_sync(const DoutPrefixProvider* dpp, optional_yield y); 
+    asio::awaitable<void> redis_sync(const DoutPrefixProvider* dpp, optional_yield y);
+    void rthread_stop() {
+      std::lock_guard l{lfuda_lock};
+
+      if (rthread_timer) {
+       rthread_timer->cancel();
+      }
+    }
+    LFUDAEntry* find_entry(std::string key) { 
+      auto it = entries_map.find(key); 
+      if (it == entries_map.end())
+        return nullptr;
+      return it->second;
+    }
 
   public:
     LFUDAPolicy(std::shared_ptr<connection>& conn, rgw::cache::CacheDriver* cacheDriver) : CachePolicy(), 
-                                                                         conn(conn), 
-                                                                         cacheDriver(cacheDriver)
+                                                                                          conn(conn), 
+                                                                                          cacheDriver(cacheDriver)
     {
       dir = new BlockDirectory{conn};
     }
     ~LFUDAPolicy() {
+      rthread_stop();
       delete dir;
     } 
 
-    virtual void init(CephContext *cct) {
-      dir->init(cct);
-    }
+    virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context);
     virtual int exist_key(std::string key) override;
-    //virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
     virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override;
     virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) override;
     virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
-
-    void set_local_weight(std::string& key, int localWeight);
-    LFUDAEntry* find_entry(std::string key) { 
-      auto it = entries_map.find(key); 
-      if (it == entries_map.end())
-        return nullptr;
-      return it->second;
-    }
+    void save_y(optional_yield y) { this->y = y; }
 };
 
 class LRUPolicy : public CachePolicy {
@@ -118,7 +132,7 @@ class LRUPolicy : public CachePolicy {
   public:
     LRUPolicy(rgw::cache::CacheDriver* cacheDriver) : cacheDriver{cacheDriver} {}
 
-    virtual void init(CephContext *cct) {
+    virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context) { return 0; 
     virtual int exist_key(std::string key) override;
     virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override;
     virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) override;
index 691d589bf2ae5d2b763fbbb16df856c82d935a44..39dbd5772d95ea0e1a4a53574355be0ea7fa07d0 100644 (file)
@@ -33,7 +33,8 @@ static inline Object* nextObject(Object* t)
   return dynamic_cast<FilterObject*>(t)->get_next();
 }
 
-D4NFilterDriver::D4NFilterDriver(Driver* _next, boost::asio::io_context& io_context) : FilterDriver(_next) 
+D4NFilterDriver::D4NFilterDriver(Driver* _next, boost::asio::io_context& io_context) : FilterDriver(_next),
+                                                                                       io_context(io_context) 
 {
   conn = std::make_shared<connection>(boost::asio::make_strand(io_context));
 
@@ -83,7 +84,7 @@ int D4NFilterDriver::initialize(CephContext *cct, const DoutPrefixProvider *dpp)
   cacheDriver->initialize(dpp);
   objDir->init(cct);
   blockDir->init(cct);
-  policyDriver->get_cache_policy()->init(cct);
+  policyDriver->get_cache_policy()->init(cct, dpp, io_context);
 
   return 0;
 }
@@ -985,8 +986,6 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag,
   }
 
   baseAttrs.insert(attrs.begin(), attrs.end());
-
-  // is the accounted_size equivalent to the length? -Sam
   
   //bufferlist bl_empty;
   //int putReturn = driver->get_cache_driver()->
index 0893371a190bcd414716bf53c9fb13de35bf8687..46fcfb5c1ad1bb3d69fa82391d1c543570698c01 100644 (file)
@@ -47,6 +47,7 @@ class D4NFilterDriver : public FilterDriver {
     rgw::d4n::ObjectDirectory* objDir;
     rgw::d4n::BlockDirectory* blockDir;
     rgw::d4n::PolicyDriver* policyDriver;
+    boost::asio::io_context& io_context;
 
   public:
     D4NFilterDriver(Driver* _next, boost::asio::io_context& io_context);
index b94fa426ef2afb949d66363aaf3ac50432995c94..78fead5732d7361342bc1a318639f2bef1437eb4 100644 (file)
@@ -5,6 +5,7 @@
 #include "gtest/gtest.h"
 #include "gtest/gtest_prod.h"
 #include "common/ceph_argparse.h"
+#include "common/async/blocked_completion.h"
 #include "rgw_auth_registry.h"
 #include "driver/d4n/d4n_policy.h"
 
@@ -60,7 +61,7 @@ class LFUDAPolicyFixture : public ::testing::Test {
        .hostsList = { env->redisHost }
       };
 
-      conn = std::make_shared<connection>(boost::asio::make_strand(io));
+      conn = std::make_shared<connection>(net::make_strand(io));
       rgw::cache::Partition partition_info{ .location = "RedisCache", .size = 1000 };
       cacheDriver = new rgw::cache::RedisDriver{io, partition_info};
       policyDriver = new rgw::d4n::PolicyDriver(conn, cacheDriver, "lfuda");
@@ -73,7 +74,6 @@ class LFUDAPolicyFixture : public ::testing::Test {
 
       dir->init(env->cct);
       cacheDriver->initialize(env->dpp);
-      policyDriver->get_cache_policy()->init(env->cct);
 
       bl.append("test data");
       bufferlist attrVal;
@@ -100,13 +100,12 @@ class LFUDAPolicyFixture : public ::testing::Test {
     }
 
     int lfuda(const DoutPrefixProvider* dpp, rgw::d4n::CacheBlock* block, rgw::cache::CacheDriver* cacheDriver, optional_yield y) {
-      int age = 5; /* Arbitrary number for testing */ 
+      int age = 1;  
       std::string oid = build_index(block->cacheObj.bucketName, block->cacheObj.objName, block->blockID, block->size);
 
       if (this->policyDriver->get_cache_policy()->exist_key(build_index(block->cacheObj.bucketName, block->cacheObj.objName, block->blockID, block->size))) { /* Local copy */
-       auto entry = dynamic_cast<rgw::d4n::LFUDAPolicy*>(this->policyDriver->get_cache_policy())->find_entry(oid);
-       entry->localWeight += age;
-       return cacheDriver->set_attr(dpp, oid, "localWeight", std::to_string(entry->localWeight), y);
+       policyDriver->get_cache_policy()->update(env->dpp, oid, 0, bl.length(), "", y);
+        return 0;
       } else {
        if (this->policyDriver->get_cache_policy()->eviction(dpp, block->size, y) < 0)
          return -1;
@@ -162,13 +161,13 @@ TEST_F(LFUDAPolicyFixture, LocalGetBlockYield)
     ASSERT_EQ(0, cacheDriver->put(env->dpp, key, bl, bl.length(), attrs, optional_yield{io, yield}));
     policyDriver->get_cache_policy()->update(env->dpp, key, 0, bl.length(), "", optional_yield{io, yield});
 
-    ASSERT_GE(lfuda(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0);
+    ASSERT_EQ(lfuda(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0);
 
     cacheDriver->shutdown();
 
     boost::system::error_code ec;
     request req;
-    req.push("HGET", "RedisCache/testBucket_testName_0_0", "localWeight");
+    req.push("HGET", "RedisCache/testBucket_testName_0_0", "user.rgw.localWeight");
     req.push("FLUSHALL");
 
     response<std::string, boost::redis::ignore_t> resp;
@@ -176,7 +175,7 @@ TEST_F(LFUDAPolicyFixture, LocalGetBlockYield)
     conn->async_exec(req, resp, yield[ec]);
 
     ASSERT_EQ((bool)ec, false);
-    EXPECT_EQ(std::get<0>(resp).value(), "6");
+    EXPECT_EQ(std::get<0>(resp).value(), "2");
     conn->cancel();
   });
 
@@ -243,7 +242,7 @@ TEST_F(LFUDAPolicyFixture, RemoteGetBlockYield)
     ASSERT_EQ((bool)ec, false);
     EXPECT_EQ(std::get<0>(resp).value(), 0);
     EXPECT_EQ(std::get<1>(resp).value(), 0);
-    EXPECT_EQ(std::get<2>(resp).value(), "5");
+    EXPECT_EQ(std::get<2>(resp).value(), "1");
     conn->cancel();
   });