]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/d4n: this commit squashes the following two commits.
authorSamarah <samarah.uriarte@ibm.com>
Thu, 19 Oct 2023 15:34:20 +0000 (15:34 +0000)
committerPritha Srivastava <prsrivas@redhat.com>
Tue, 2 Apr 2024 15:54:51 +0000 (21:24 +0530)
rgw/d4n: Remove `get_block` from policy and add logic to D4N filter
d4n/policy: Create a min heap of the local weights

Signed-off-by: Samarah <samarah.uriarte@ibm.com>
src/rgw/driver/d4n/d4n_policy.cc
src/rgw/driver/d4n/d4n_policy.h
src/rgw/driver/d4n/rgw_sal_d4n.cc
src/rgw/driver/d4n/rgw_sal_d4n.h
src/test/rgw/test_d4n_policy.cc

index 48ff4543fe2763878d1aabbacf19c975f99d6080..71ba05115b330c9b8469a880a3ae2f9a8aff933e 100644 (file)
@@ -6,10 +6,6 @@
 
 namespace rgw { namespace d4n {
 
-std::string build_index(std::string bucketName, std::string oid, uint64_t offset, uint64_t size) {
-  return bucketName + "_" + oid + "_" + std::to_string(offset) + "_" + std::to_string(size); 
-}
-
 // initiate a call to async_exec() on the connection's executor
 struct initiate_exec {
   std::shared_ptr<boost::redis::connection> conn;
@@ -178,21 +174,18 @@ int LFUDAPolicy::get_min_avg_weight(optional_yield y) {
 }
 
 CacheBlock LFUDAPolicy::find_victim(const DoutPrefixProvider* dpp, optional_yield y) {
-  if (entries_map.empty())
+  if (entries_heap.empty())
     return {};
 
-  auto it = std::min_element(std::begin(entries_map), std::end(entries_map),
-                             [](const auto& l, const auto& r) { return l.second->localWeight < r.second->localWeight; });
-
   /* Get victim cache block */
-  std::string key = it->second->key;
+  std::string key = entries_heap.top()->key;
   CacheBlock victim;
 
   victim.cacheObj.bucketName = key.substr(0, key.find('_')); 
   key.erase(0, key.find('_') + 1);
   victim.cacheObj.objName = key.substr(0, key.find('_'));
-  victim.blockID = it->second->offset;
-  victim.size = it->second->len;
+  victim.blockID = entries_heap.top()->offset;
+  victim.size = entries_heap.top()->len;
 
   if (dir->get(&victim, y) < 0) {
     return {};
@@ -216,23 +209,18 @@ int LFUDAPolicy::exist_key(std::string key) {
   return false;
 }
 
-int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) {
+#if 0
+int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheDriver, optional_yield y) {
   response<std::string> resp;
   int age = get_age(y);
 
   if (exist_key(build_index(block->cacheObj.bucketName, block->cacheObj.objName, block->blockID, block->size))) { /* Local copy */
     auto it = entries_map.find(build_index(block->cacheObj.bucketName, block->cacheObj.objName, block->blockID, block->size));
     it->second->localWeight += age;
-    return cacheNode->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(it->second->localWeight), y);
+    return cacheDriver->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(it->second->localWeight), y);
   } else {
-    uint64_t freeSpace = cacheNode->get_free_space(dpp);
-
-    while (freeSpace < block->size) { /* Not enough space in local cache */
-      if (int ret = eviction(dpp, cacheNode, y) > 0)
-        freeSpace += ret;
-      else 
-        return -1;
-    }
+    if (eviction(dpp, block->size, cacheDriver, y) < 0)
+      return -1; // what if eviction turns into infinite loop? -Sam
 
     int exists = dir->exist_key(block, y);
     if (exists > 0) { /* Remote copy */
@@ -252,107 +240,141 @@ int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw
         }
       }
     } else if (!exists) { /* No remote copy */
-      // how to get bufferlist data? -Sam
-      // do I need to add the block to the local cache here? -Sam
-      // update hosts list for block as well?
-      // insert entry here? -Sam
       // localWeight += age;
-      //return cacheNode->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(it->second->localWeight), y);
+      //return cacheDriver->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(it->second->localWeight), y);
       return 0;
     } else {
       return -1;
     }
   }
 }
+#endif
 
-uint64_t LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) {
-  CacheBlock victim = find_victim(dpp, y);
+int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) {
+  uint64_t freeSpace = cacheDriver->get_free_space(dpp);
 
-  if (victim.cacheObj.objName.empty()) {
-    ldpp_dout(dpp, 10) << "RGW D4N Policy: Could not find victim block" << dendl;
-    return 0; /* Return zero for failure */
-  }
+  while (freeSpace < size) {
+    CacheBlock victim = find_victim(dpp, y);
 
-  std::string key = build_index(victim.cacheObj.bucketName, victim.cacheObj.objName, victim.blockID, victim.size);
-  auto it = entries_map.find(key);
-  if (it == entries_map.end()) {
-    return 0;
-  }
+    if (victim.cacheObj.objName.empty()) {
+      ldpp_dout(dpp, 10) << "RGW D4N Policy: Could not retrieve victim block" << dendl;
+      return -1;
+    }
 
-  int avgWeight = get_min_avg_weight(y);
-  if (avgWeight < 0) {
-    return 0;
-  }
+    std::string key = victim.cacheObj.bucketName + "_" + victim.cacheObj.objName + "_" + std::to_string(victim.blockID) + "_" + std::to_string(victim.size);
+    auto it = entries_map.find(key);
+    if (it == entries_map.end()) {
+      return -1;
+    }
+
+    int avgWeight = get_min_avg_weight(y);
+    if (avgWeight < 0) {
+      return -1;
+    }
+
+    if (victim.hostsList.size() == 1 && victim.hostsList[0] == dir->cct->_conf->rgw_local_cache_address) { /* Last copy */
+      if (victim.globalWeight) {
+       it->second->localWeight += victim.globalWeight;
+
+       for (auto& entry : entries_heap) {
+         if (entry->key == key) {
+           (*(entry->handle))->localWeight = it->second->localWeight;
+           entries_heap.increase(entry->handle);
+         }
+       }
 
-  if (victim.hostsList.size() == 1 && victim.hostsList[0] == dir->cct->_conf->rgw_local_cache_address) { /* Last copy */
-    if (victim.globalWeight) {
-      it->second->localWeight += victim.globalWeight;
-      if (cacheNode->set_attr(dpp, victim.cacheObj.objName, "localWeight", std::to_string(it->second->localWeight), y) < 0) {
-       return 0;
+       if (cacheDriver->set_attr(dpp, key, "localWeight", std::to_string(it->second->localWeight), y) < 0) {
+         return -1;
+       }
+
+       victim.globalWeight = 0;
+       if (dir->update_field(&victim, "globalWeight", std::to_string(victim.globalWeight), y) < 0) {
+         return -1;
+       }
       }
 
-      victim.globalWeight = 0;
-      if (dir->update_field(&victim, "globalWeight", std::to_string(victim.globalWeight), y) < 0) {
-       return 0;
+      if (it->second->localWeight > avgWeight) {
+       // TODO: push victim block to remote cache
       }
     }
 
-    if (it->second->localWeight > avgWeight) {
-      // TODO: push victim block to remote cache
+    victim.globalWeight += it->second->localWeight;
+    if (dir->update_field(&victim, "globalWeight", std::to_string(victim.globalWeight), y) < 0) {
+      return -1;
     }
-  }
 
-  victim.globalWeight += it->second->localWeight;
-  if (dir->update_field(&victim, "globalWeight", std::to_string(victim.globalWeight), y) < 0) { // just have one update? -Sam
-    return 0;
-  }
+    if (dir->remove_host(&victim, dir->cct->_conf->rgw_local_cache_address, y) < 0) {
+      return -1;
+    } else {
+      if (cacheDriver->del(dpp, key, y) < 0) {
+        return -1;
+      } else {
+       ldpp_dout(dpp, 10) << "RGW D4N Policy: Block " << victim.cacheObj.objName << " has been evicted." << dendl;
 
-  ldpp_dout(dpp, 10) << "RGW D4N Policy: Block " << victim.cacheObj.objName << " has been evicted." << dendl;
+       uint64_t num_entries = entries_map.size();
 
-  if (cacheNode->del(dpp, key, y) < 0 && dir->remove_host(&victim, dir->cct->_conf->rgw_local_cache_address, y) < 0) {
-    return 0;
-  } else {
-    uint64_t num_entries = entries_map.size();
+       if (!avgWeight) {
+         if (set_min_avg_weight(0, dir->cct->_conf->rgw_local_cache_address, y) < 0) // Where else must this be set? -Sam 
+           return -1;
+       } else {
+         if (set_min_avg_weight(avgWeight - (it->second->localWeight/num_entries), dir->cct->_conf->rgw_local_cache_address, y) < 0) // Where else must this be set? -Sam 
+           return -1;
+       } 
 
-    if (!avgWeight) {
-      if (set_min_avg_weight(0, dir->cct->_conf->rgw_local_cache_address, y) < 0) // Where else must this be set? -Sam 
-       return 0;
-    } else {
-      if (set_min_avg_weight(avgWeight - (it->second->localWeight/num_entries), dir->cct->_conf->rgw_local_cache_address, y) < 0) { // Where else must this be set? -Sam 
-       return 0;
-    } 
-      int age = get_age(y);
-      age = std::max(it->second->localWeight, age);
-      if (set_age(age, y) < 0)
-       return 0;
+       int age = get_age(y);
+       age = std::max(it->second->localWeight, age);
+       if (set_age(age, y) < 0)
+         return -1;
+      }
     }
-  }
 
-  return victim.size; // this doesn't account for the additional attributes that were removed and need to be set with the new block -Sam
+    freeSpace = cacheDriver->get_free_space(dpp);
+  }
+  
+  return 0;
 }
 
-void LFUDAPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y)
+void LFUDAPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y)
 {
-  erase(dpp, key);
+  using handle_type = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>::handle_type;
 
-  int age = get_age(y);
-  assert(age > -1);
+  int age = get_age(y); 
+  int localWeight = age;
+  auto entry = find_entry(key);
+  if (entry != nullptr) { 
+    entry->localWeight += age;
+    localWeight = entry->localWeight;
+  }  
+
+  erase(dpp, key);
   
-  LFUDAEntry *e = new LFUDAEntry(key, offset, len, version, age);
-  entries_lfuda_list.push_back(*e);
+  LFUDAEntry *e = new LFUDAEntry(key, offset, len, version, localWeight);
+  handle_type handle = entries_heap.push(e);
+  e->set_handle(handle);
   entries_map.emplace(key, e);
+
+  if (cacheDriver->set_attr(dpp, key, "localWeight", std::to_string(localWeight), y) < 0) {
+    ldpp_dout(dpp, 10) << "LFUDAPolicy::update:: " << __func__ << "(): Cache driver set_attr method failed." << dendl;
+  }
 }
 
 bool LFUDAPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key)
 {
+  for (auto const& it : entries_heap) {
+    if (it->key == key) {
+      entries_heap.erase(it->handle);
+      break;
+    }
+  }
+
   auto p = entries_map.find(key);
   if (p == entries_map.end()) {
     return false;
   }
 
   entries_map.erase(p);
-  entries_lfuda_list.erase_and_dispose(entries_lfuda_list.iterator_to(*(p->second)), LFUDA_Entry_delete_disposer());
-  return true;
+
+  return false;
 }
 
 int LRUPolicy::exist_key(std::string key)
@@ -364,26 +386,24 @@ int LRUPolicy::exist_key(std::string key)
     return false;
 }
 
-int LRUPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y)
+int LRUPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y)
 {
-  uint64_t freeSpace = cacheNode->get_free_space(dpp);
-  while(freeSpace < block->size) {
-    freeSpace = eviction(dpp, cacheNode, y);
+  uint64_t freeSpace = cacheDriver->get_free_space(dpp);
+
+  while (freeSpace < size) {
+    const std::lock_guard l(lru_lock);
+    auto p = entries_lru_list.front();
+    entries_map.erase(entries_map.find(p.key));
+    entries_lru_list.pop_front_and_dispose(Entry_delete_disposer());
+    cacheDriver->delete_data(dpp, p.key, null_yield);
+
+    freeSpace = cacheDriver->get_free_space(dpp);
   }
-  return 0;
-}
 
-uint64_t LRUPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y)
-{
-  const std::lock_guard l(lru_lock);
-  auto p = entries_lru_list.front();
-  entries_map.erase(entries_map.find(p.key));
-  entries_lru_list.pop_front_and_dispose(Entry_delete_disposer());
-  cacheNode->delete_data(dpp, p.key, null_yield);
-  return cacheNode->get_free_space(dpp);
+  return 0;
 }
 
-void LRUPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y)
+void LRUPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y)
 {
   erase(dpp, key);
 
@@ -404,16 +424,4 @@ bool LRUPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key)
   return true;
 }
 
-int PolicyDriver::init() {
-  if (policyName == "lfuda") {
-    cachePolicy = new LFUDAPolicy(io);
-    return 0;
-  } else if (policyName == "lru") {
-    cachePolicy = new LRUPolicy();
-    return 0;
-  }
-
-  return -1;
-}
-
 } } // namespace rgw::d4n
index 63da686d9a67f1f277ea535d7693f69fe4b44f1f..1a34aad6bd713ef6ce574c8e3391dc7989fd9db9 100644 (file)
@@ -1,14 +1,18 @@
 #pragma once
 
-#include <string>
-#include <iostream>
+#include <boost/heap/fibonacci_heap.hpp>
 #include "rgw_common.h"
 #include "d4n_directory.h"
-#include "../../rgw_redis_driver.h"
+#include "rgw_sal_d4n.h"
+#include "rgw_cache_driver.h"
 
 #define dout_subsys ceph_subsys_rgw
 #define dout_context g_ceph_context
 
+namespace rgw::sal {
+  class D4NFilterObject;
+}
+
 namespace rgw { namespace d4n {
 
 class CachePolicy {
@@ -35,34 +39,40 @@ class CachePolicy {
 
     virtual int init(CephContext *cct, const DoutPrefixProvider* dpp) { return 0; } 
     virtual int exist_key(std::string key) = 0;
-    virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) = 0;
-    virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) = 0;
-    virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) = 0;
+    virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) = 0;
+    virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) = 0;
     virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key) = 0;
     virtual void shutdown() = 0;
 };
 
 class LFUDAPolicy : public CachePolicy {
   private:
+    template<typename T>
+    struct EntryComparator {
+      bool operator()(T* const e1, T* const e2) const {
+       return e1->localWeight > e2->localWeight;
+      }
+    }; 
+
     struct LFUDAEntry : public Entry {
       int localWeight;
-      LFUDAEntry(std::string& key, uint64_t offset, uint64_t len, std::string version, int localWeight) : Entry(key, offset, len, version), 
-                                                                                                         localWeight(localWeight) {}
-    };
+      using handle_type = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>::handle_type;
+      handle_type handle;
 
-    struct LFUDA_Entry_delete_disposer : public Entry_delete_disposer {
-      void operator()(LFUDAEntry *e) {
-        delete e;
-      }
+      LFUDAEntry(std::string& key, uint64_t offset, uint64_t len, std::string& version, int localWeight) : Entry(key, offset, len, version),
+                                                                                                           localWeight(localWeight) {}
+      
+      void set_handle(handle_type handle_) { handle = handle_; } 
     };
-    typedef boost::intrusive::list<LFUDAEntry> List;
 
+    using Heap = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>;
+    Heap entries_heap;
     std::unordered_map<std::string, LFUDAEntry*> entries_map;
 
     net::io_context& io;
     std::shared_ptr<connection> conn;
-    List entries_lfuda_list;
     BlockDirectory* dir;
+    rgw::cache::CacheDriver* cacheDriver;
 
     int set_age(int age, optional_yield y);
     int get_age(optional_yield y);
@@ -71,7 +81,7 @@ class LFUDAPolicy : public CachePolicy {
     CacheBlock find_victim(const DoutPrefixProvider* dpp, optional_yield y);
 
   public:
-    LFUDAPolicy(net::io_context& io_context) : CachePolicy(), io(io_context) {
+    LFUDAPolicy(net::io_context& io_context, rgw::cache::CacheDriver* cacheDriver) : CachePolicy(), io(io_context), cacheDriver{cacheDriver} {
       conn = std::make_shared<connection>(boost::asio::make_strand(io_context));
       dir = new BlockDirectory{io};
     }
@@ -99,11 +109,19 @@ class LFUDAPolicy : public CachePolicy {
       return 0;
     }
     virtual int exist_key(std::string key) override;
-    virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
-    virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
-    virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
+    //virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
+    virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override;
+    virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) override;
     virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key) override;
     virtual void shutdown() override;
+
+    void set_local_weight(std::string& key, int localWeight);
+    LFUDAEntry* find_entry(std::string key) { 
+      auto it = entries_map.find(key); 
+      if (it == entries_map.end())
+        return nullptr;
+      return it->second;
+    }
 };
 
 class LRUPolicy : public CachePolicy {
@@ -113,31 +131,36 @@ class LRUPolicy : public CachePolicy {
     std::unordered_map<std::string, Entry*> entries_map;
     std::mutex lru_lock;
     List entries_lru_list;
+    rgw::cache::CacheDriver* cacheDriver;
 
   public:
-    LRUPolicy() = default;
+    LRUPolicy(rgw::cache::CacheDriver* cacheDriver) : cacheDriver{cacheDriver} {}
 
     virtual int exist_key(std::string key) override;
-    virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
-    virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
-    virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
+    virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override;
+    virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) override;
     virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key) override;
     virtual void shutdown() override {}
 };
 
 class PolicyDriver {
   private:
-    net::io_context& io;
     std::string policyName;
     CachePolicy* cachePolicy;
 
   public:
-    PolicyDriver(net::io_context& io_context, std::string _policyName) : io(io_context), policyName(_policyName) {}
+    PolicyDriver(net::io_context& io_context, rgw::cache::CacheDriver* cacheDriver, std::string _policyName) : policyName(_policyName) 
+    {
+      if (policyName == "lfuda") {
+       cachePolicy = new LFUDAPolicy(io_context, cacheDriver);
+      } else if (policyName == "lru") {
+       cachePolicy = new LRUPolicy(cacheDriver);
+      }
+    }
     ~PolicyDriver() {
       delete cachePolicy;
     }
 
-    int init();
     CachePolicy* get_cache_policy() { return cachePolicy; }
     std::string get_policy_name() { return policyName; }
 };
index f0c9650a249fa0f548b20159dd65e46c1bcacd62..8c8e7a8d92f1e8d6cb9ef0b03649ecf15cec89bc 100644 (file)
@@ -48,7 +48,7 @@ D4NFilterDriver::D4NFilterDriver(Driver* _next, boost::asio::io_context& io_cont
   cacheDriver = new rgw::cache::SSDDriver(partition_info);
   objDir = new rgw::d4n::ObjectDirectory(io_context);
   blockDir = new rgw::d4n::BlockDirectory(io_context);
-  policyDriver = new rgw::d4n::PolicyDriver(io_context, "lfuda");
+  policyDriver = new rgw::d4n::PolicyDriver(io_context, cacheDriver, "lfuda");
 }
 
  D4NFilterDriver::~D4NFilterDriver()
@@ -68,7 +68,6 @@ int D4NFilterDriver::initialize(CephContext *cct, const DoutPrefixProvider *dpp)
   objDir->init(cct, dpp);
   blockDir->init(cct, dpp);
 
-  policyDriver->init(); 
   policyDriver->get_cache_policy()->init(cct, dpp);
 
   return 0;
@@ -583,7 +582,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
       // Read From Cache
       auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id); 
 
-      source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, part_len, "", source->driver->get_cache_driver(), y);
+      source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, part_len, "", y);
 
       ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
 
@@ -604,7 +603,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
         // Read From Cache
         auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);  
 
-        source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, obj_max_req_size, "", source->driver->get_cache_driver(), y);
+       source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, obj_max_req_size, "", y);
 
         ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
 
@@ -641,8 +640,6 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
     len -= obj_max_req_size;
   } while (start_part_num < num_parts);
 
-
-
   ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Fetching object from backend store" << dendl;
 
   Attrs obj_attrs;
@@ -669,7 +666,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
     ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to fetch object from backend store, r= " << r << dendl;
     return r;
   }
-  
+
   return this->cb->flush_last_part();
 }
 
@@ -708,24 +705,27 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
       std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
       block.blockID = ofs; // TODO: fill out block correctly
       block.size = bl.length();
-      block.blockID = ofs;
-      uint64_t freeSpace = filter->get_cache_driver()->get_free_space(dpp);
-      while(freeSpace < block.size) {
-        freeSpace += filter->get_policy_driver()->get_cache_policy()->eviction(dpp, filter->get_cache_driver(), null_yield);
-      }
-      if (filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), source->get_attrs()) == 0) {
-        filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), "", filter->get_cache_driver(), null_yield);
-        /* Store block in directory */
-        if (!blockDir->exist_key(&block, null_yield)) {
-          #if 0
-          int ret = blockDir->set_value(&block);
-          if (ret < 0) {
-            ldpp_dout(dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl;
-            return ret;
+      if (filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y) == 0) {
+        if (filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), source->get_attrs()) == 0) {
+          filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), "", *y);
+
+          /* Store block in directory */
+          if (!blockDir->exist_key(&block, *y)) {
+            int ret = blockDir->set(&block, *y);
+            if (ret < 0) {
+              ldpp_dout(dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl;
+              return ret;
+            } else {
+              ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl;
+            }
           } else {
-            ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl;
+            if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0) {
+              ldpp_dout(dpp, 0) << "D4N Filter: Block directory update operation failed." << dendl;
+              return -1; 
+            } else {
+              ldpp_dout(dpp, 20) << "D4N Filter: Block directory update operation succeeded." << dendl;
+            }
           }
-          #endif
         }
       }
     } else if (bl.length() == rgw_get_obj_max_req_size && bl_rem.length() == 0) { // if bl is the same size as rgw_get_obj_max_req_size, write it to cache
@@ -733,23 +733,27 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
       ofs += bl_len;
       block.blockID = ofs;
       block.size = bl.length();
-      uint64_t freeSpace = filter->get_cache_driver()->get_free_space(dpp);
-      while(freeSpace < block.size) {
-        freeSpace += filter->get_policy_driver()->get_cache_policy()->eviction(dpp, filter->get_cache_driver(), null_yield);
-      }
-      if (filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), source->get_attrs()) == 0) {
-        filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), "", filter->get_cache_driver(), null_yield);
-        /* Store block in directory */
-        if (!blockDir->exist_key(&block, null_yield)) {
-          #if 0
-          int ret = blockDir->set_value(&block);
-          if (ret < 0) {
-            ldpp_dout(dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl;
-            return ret;
-          } else {
-            ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl;
+      if (filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y) == 0) { //only block size because attributes are stored for entire obj? -Sam
+        if (filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), source->get_attrs()) == 0) {
+               filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), "", *y);
+
+          /* Store block in directory */
+          if (!blockDir->exist_key(&block, *y)) {
+            int ret = blockDir->set(&block, *y); 
+            if (ret < 0) {
+              ldpp_dout(dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl;
+              return ret;
+            } else {
+              ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl;
+            }
+               } else {
+            if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0) {
+              ldpp_dout(dpp, 0) << "D4N Filter: Block directory update operation failed." << dendl;
+              return -1; 
+            } else {
+              ldpp_dout(dpp, 20) << "D4N Filter: Block directory update operation succeeded." << dendl;
+            }
           }
-          #endif
         }
       }
     } else { //copy data from incoming bl to bl_rem till it is rgw_get_obj_max_req_size, and then write it to cache
@@ -763,25 +767,29 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
       if (bl_rem.length() == rgw_get_obj_max_req_size) {
         std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_rem.length());
         ofs += bl_rem.length();
-        block.blockID = ofs;
+        block.blockID = ofs; // TODO: fill out block correctly
         block.size = bl_rem.length();
-        uint64_t freeSpace = filter->get_cache_driver()->get_free_space(dpp);
-        while(freeSpace < block.size) {
-          freeSpace += filter->get_policy_driver()->get_cache_policy()->eviction(dpp, filter->get_cache_driver(), null_yield);
-        }
-        if (filter->get_cache_driver()->put_async(dpp, oid, bl_rem, bl_rem.length(), source->get_attrs()) == 0) {
-          filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), "", filter->get_cache_driver(), null_yield);
-          /* Store block in directory */
-          if (!blockDir->exist_key(&block, null_yield)) {
-            #if 0
-            int ret = blockDir->set_value(&block);
-            if (ret < 0) {
+        if (filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y) == 0) {
+          if (filter->get_cache_driver()->put_async(dpp, oid, bl_rem, bl_rem.length(), source->get_attrs()) == 0) {
+                 filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), "", *y);
+
+            /* Store block in directory */
+            if (!blockDir->exist_key(&block, *y)) {
+                 int ret = blockDir->set(&block, *y);
+                 if (ret < 0) {
               ldpp_dout(dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl;
               return ret;
-            } else {
-              ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl;
-            }
-            #endif
+              } else {
+                           ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl;
+                   }
+                 } else {
+                   if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0) {
+                ldpp_dout(dpp, 0) << "D4N Filter: Block directory update operation failed." << dendl;
+                return -1; 
+              } else {
+                           ldpp_dout(dpp, 20) << "D4N Filter: Block directory update operation succeeded." << dendl;
+                   }
+                 } 
           }
         }
 
index 78428a01ad2baa1e37aaebe0ec8605beee1f9da0..ddf446e48618507fe5a3b0cf718ccee667a79e36 100644 (file)
@@ -24,7 +24,6 @@
 #include "rgw_ssd_driver.h"
 #include "rgw_redis_driver.h"
 
-#include "rgw_redis_driver.h"
 #include "driver/d4n/d4n_directory.h"
 #include "driver/d4n/d4n_policy.h"
 
 #define dout_subsys ceph_subsys_rgw
 #define dout_context g_ceph_context
 
+namespace rgw::d4n {
+  class PolicyDriver;
+}
+
 namespace rgw { namespace sal {
 
 class D4NFilterDriver : public FilterDriver {
index 1c104cb2f64910243f0f1a3e7f7fe1aa21cdc2a1..e6c71db65e06c1b1447210748177b6f52e235cc9 100644 (file)
@@ -3,6 +3,7 @@
 #include <boost/redis/connection.hpp>
 
 #include "gtest/gtest.h"
+#include "gtest/gtest_prod.h"
 #include "common/ceph_argparse.h"
 #include "rgw_auth_registry.h"
 #include "driver/d4n/d4n_policy.h"
@@ -40,7 +41,7 @@ class Environment : public ::testing::Environment {
     DoutPrefixProvider* dpp;
 };
 
-class LFUDAPolicyFixture: public ::testing::Test {
+class LFUDAPolicyFixture : public ::testing::Test {
   protected:
     virtual void SetUp() {
       block = new rgw::d4n::CacheBlock{
@@ -59,7 +60,7 @@ class LFUDAPolicyFixture: public ::testing::Test {
 
       rgw::cache::Partition partition_info{ .location = "RedisCache" };
       cacheDriver = new rgw::cache::RedisDriver{io, partition_info};
-      policyDriver = new rgw::d4n::PolicyDriver(io, "lfuda");
+      policyDriver = new rgw::d4n::PolicyDriver(io, cacheDriver, "lfuda");
       dir = new rgw::d4n::BlockDirectory{io};
       conn = new connection{boost::asio::make_strand(io)};
 
@@ -70,7 +71,6 @@ class LFUDAPolicyFixture: public ::testing::Test {
 
       dir->init(env->cct, env->dpp);
       cacheDriver->initialize(env->cct, env->dpp);
-      policyDriver->init();
       policyDriver->get_cache_policy()->init(env->cct, env->dpp);
 
       bl.append("test data");
@@ -94,6 +94,55 @@ class LFUDAPolicyFixture: public ::testing::Test {
       delete policyDriver;
     }
 
+    std::string build_index(std::string bucketName, std::string oid, uint64_t offset, uint64_t size) {
+      return bucketName + "_" + oid + "_" + std::to_string(offset) + "_" + std::to_string(size);
+    }
+
+    int lfuda(const DoutPrefixProvider* dpp, rgw::d4n::CacheBlock* block, rgw::cache::CacheDriver* cacheDriver, optional_yield y) {
+      int age = 5; /* Arbitrary number for testing */ 
+      std::string oid = build_index(block->cacheObj.bucketName, block->cacheObj.objName, block->blockID, block->size);
+
+      if (this->policyDriver->get_cache_policy()->exist_key(build_index(block->cacheObj.bucketName, block->cacheObj.objName, block->blockID, block->size))) { /* Local copy */
+       auto entry = dynamic_cast<rgw::d4n::LFUDAPolicy*>(this->policyDriver->get_cache_policy())->find_entry(oid);
+       entry->localWeight += age;
+       return cacheDriver->set_attr(dpp, oid, "localWeight", std::to_string(entry->localWeight), y);
+      } else {
+       if (this->policyDriver->get_cache_policy()->eviction(dpp, block->size, y) < 0)
+         return -1;
+
+       int exists = dir->exist_key(block, y);
+       if (exists > 0) { /* Remote copy */
+         if (dir->get(block, y) < 0) {
+           return -1;
+         } else {
+           if (!block->hostsList.empty()) { 
+             block->globalWeight += age;
+             
+             if (dir->update_field(block, "globalWeight", std::to_string(block->globalWeight), y) < 0) {
+               return -1;
+             } else {
+               return 0;
+             }
+           } else {
+             return -1;
+           }
+         }
+       } else if (!exists) { /* No remote copy */
+         block->hostsList.push_back(dir->cct->_conf->rgw_local_cache_address);
+         block->cacheObj.hostsList.push_back(dir->cct->_conf->rgw_local_cache_address);
+         if (dir->set(block, y) < 0)
+           return -1;
+
+         this->policyDriver->get_cache_policy()->update(dpp, oid, 0, bl.length(), "", y);
+         if (cacheDriver->put(dpp, oid, bl, bl.length(), attrs, y) < 0)
+            return -1;
+         return cacheDriver->set_attr(dpp, oid, "localWeight", std::to_string(age), y);
+       } else {
+         return -1;
+       }
+      }
+    }
+
     rgw::d4n::CacheBlock* block;
     rgw::d4n::BlockDirectory* dir;
     rgw::d4n::PolicyDriver* policyDriver;
@@ -111,22 +160,9 @@ TEST_F(LFUDAPolicyFixture, LocalGetBlockYield)
   spawn::spawn(io, [this] (spawn::yield_context yield) {
     std::string key = block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + std::to_string(block->blockID) + "_" + std::to_string(block->size);
     ASSERT_EQ(0, cacheDriver->put(env->dpp, key, bl, bl.length(), attrs, optional_yield{io, yield}));
-    policyDriver->get_cache_policy()->insert(env->dpp, key, 0, bl.length(), "", cacheDriver, optional_yield{io, yield});
-
-    /* Change cache age for testing purposes */
-    { 
-      boost::system::error_code ec;
-      request req;
-      req.push("HSET", "lfuda", "age", "5");
-      response<int> resp;
-
-      conn->async_exec(req, resp, yield[ec]);
-
-      ASSERT_EQ((bool)ec, false);
-      EXPECT_EQ(std::get<0>(resp).value(), 0);
-    }
+    policyDriver->get_cache_policy()->update(env->dpp, key, 0, bl.length(), "", optional_yield{io, yield});
 
-    ASSERT_GE(policyDriver->get_cache_policy()->get_block(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0);
+    ASSERT_GE(lfuda(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0);
 
     dir->shutdown();
     cacheDriver->shutdown();
@@ -134,7 +170,7 @@ TEST_F(LFUDAPolicyFixture, LocalGetBlockYield)
 
     boost::system::error_code ec;
     request req;
-    req.push("HGET", "RedisCache/testName", "localWeight");
+    req.push("HGET", "RedisCache/testBucket_testName_0_0", "localWeight");
     req.push("FLUSHALL");
 
     response<std::string, boost::redis::ignore_t> resp;
@@ -178,7 +214,7 @@ TEST_F(LFUDAPolicyFixture, RemoteGetBlockYield)
     ASSERT_EQ(0, dir->set(&victim, optional_yield{io, yield}));
     std::string victimKey = victim.cacheObj.bucketName + "_" + victim.cacheObj.objName + "_" + std::to_string(victim.blockID) + "_" + std::to_string(victim.size);
     ASSERT_EQ(0, cacheDriver->put(env->dpp, victimKey, bl, bl.length(), attrs, optional_yield{io, yield}));
-    policyDriver->get_cache_policy()->insert(env->dpp, victimKey, 0, bl.length(), "", cacheDriver, optional_yield{io, yield});
+    policyDriver->get_cache_policy()->update(env->dpp, victimKey, 0, bl.length(), "", optional_yield{io, yield});
 
     /* Remote block */
     block->size = cacheDriver->get_free_space(env->dpp) + 1; /* To trigger eviction */
@@ -189,7 +225,7 @@ TEST_F(LFUDAPolicyFixture, RemoteGetBlockYield)
 
     ASSERT_EQ(0, dir->set(block, optional_yield{io, yield}));
 
-    ASSERT_GE(policyDriver->get_cache_policy()->get_block(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0);
+    ASSERT_GE(lfuda(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0);
 
     dir->shutdown();
     cacheDriver->shutdown();
@@ -199,19 +235,19 @@ TEST_F(LFUDAPolicyFixture, RemoteGetBlockYield)
     boost::system::error_code ec;
     request req;
     req.push("EXISTS", "RedisCache/" + victimKey);
-    req.push("HGET", victimKey, "globalWeight");
+    req.push("EXISTS", victimKey, "globalWeight");
     req.push("HGET", key, "globalWeight");
     req.push("FLUSHALL");
 
-    response<int, std::string, std::string,
-             std::string, boost::redis::ignore_t> resp;
+    response<int, int, std::string, std::string, 
+             boost::redis::ignore_t> resp;
 
     conn->async_exec(req, resp, yield[ec]);
 
     ASSERT_EQ((bool)ec, false);
     EXPECT_EQ(std::get<0>(resp).value(), 0);
-    EXPECT_EQ(std::get<1>(resp).value(), "5");
-    EXPECT_EQ(std::get<2>(resp).value(), "0");
+    EXPECT_EQ(std::get<1>(resp).value(), 0);
+    EXPECT_EQ(std::get<2>(resp).value(), "5");
     conn->cancel();
   });
 
@@ -221,7 +257,7 @@ TEST_F(LFUDAPolicyFixture, RemoteGetBlockYield)
 TEST_F(LFUDAPolicyFixture, BackendGetBlockYield)
 {
   spawn::spawn(io, [this] (spawn::yield_context yield) {
-    ASSERT_GE(policyDriver->get_cache_policy()->get_block(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0);
+    ASSERT_GE(lfuda(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0);
 
     dir->shutdown();
     cacheDriver->shutdown();