]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/cache: moving LRU related data structures and interfaces to the Policy Manager.
authorPritha Srivastava <prsrivas@redhat.com>
Mon, 28 Aug 2023 11:42:18 +0000 (17:12 +0530)
committerPritha Srivastava <prsrivas@redhat.com>
Tue, 2 Apr 2024 15:54:50 +0000 (21:24 +0530)
Replacing the related calls in d4n filter driver with that of the cachepolicy in the
Policy Manager and adding cache block to the block directory
once the block is written to the cache backend and the
policy manager data structure.

Signed-off-by: Pritha Srivastava <prsrivas@redhat.com>
src/rgw/driver/d4n/d4n_directory.h
src/rgw/driver/d4n/d4n_policy.cc
src/rgw/driver/d4n/d4n_policy.h
src/rgw/driver/d4n/rgw_sal_d4n.cc
src/rgw/driver/d4n/rgw_sal_d4n.h

index 13744ae42cb9b7e18381498ab19a07dedfe6e951..06a33f9a24fdde1d74694016bb1171ad2ddac6ff 100644 (file)
@@ -24,7 +24,7 @@ struct CacheObj {
 
 struct CacheBlock {
   CacheObj cacheObj;
-  uint64_t blockId; /* RADOS object block ID */
+  uint64_t blockId; /* block ID */
   uint64_t size; /* Block size in bytes */
   int globalWeight = 0;
   std::vector<std::string> hostsList; /* List of hostnames <ip:port> of block locations */
index 1986939c6a4937368ee6859f96657f60d92ffc48..5004a096420897d0932972ae700d3447616a72a6 100644 (file)
@@ -400,12 +400,61 @@ uint64_t LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheD
   return victim.size;
 }
 
+int LRUPolicy::exist_key(std::string key)
+{
+  const std::lock_guard l(lru_lock);
+  if (entries_map.count(key) != 0) {
+      return true;
+    }
+    return false;
+}
+
+int LRUPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode)
+{
+  //Does not apply to LRU
+  return 0;
+}
+
+uint64_t LRUPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode)
+{
+  const std::lock_guard l(lru_lock);
+  auto p = entries_lru_list.front();
+  entries_map.erase(entries_map.find(p.key));
+  entries_lru_list.pop_front_and_dispose(Entry_delete_disposer());
+  cacheNode->delete_data(dpp, p.key, null_yield);
+  return cacheNode->get_free_space(dpp);
+}
+
+void LRUPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, rgw::cache::CacheDriver* cacheNode)
+{
+  erase(dpp, key);
+  const std::lock_guard l(lru_lock);
+  Entry *e = new Entry(key, offset, len);
+  entries_lru_list.push_back(*e);
+  entries_map.emplace(key, e);
+}
+
+bool LRUPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key)
+{
+  const std::lock_guard l(lru_lock);
+  auto p = entries_map.find(key);
+  if (p == entries_map.end()) {
+    return false;
+  }
+  entries_map.erase(p);
+  entries_lru_list.erase_and_dispose(entries_lru_list.iterator_to(*(p->second)), Entry_delete_disposer());
+  return true;
+}
+
 int PolicyDriver::init() {
   if (policyName == "lfuda") {
     cachePolicy = new LFUDAPolicy();
     return 0;
-  } else
-    return -1;
+  } else if (policyName == "lru") {
+    cachePolicy = new LRUPolicy();
+    return 0;
+  }
+  return -1;
 }
 
 } } // namespace rgw::d4n
index e7c0d5bb92909f16950d62160e76a53f45369e03..6a514d80808804583228723a9a8b6bc0cf40da60 100644 (file)
@@ -31,6 +31,7 @@ class CachePolicy {
     virtual Address get_addr() { return addr; }
     virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) = 0;
     virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) = 0;
+    virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, rgw::cache::CacheDriver* cacheNode) = 0;
 };
 
 class LFUDAPolicy : public CachePolicy {
@@ -52,6 +53,36 @@ class LFUDAPolicy : public CachePolicy {
     virtual int exist_key(std::string key) override { return CachePolicy::exist_key(key); }
     virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) override;
     virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) override;
+    virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, rgw::cache::CacheDriver* cacheNode) override {}
+};
+
+class LRUPolicy : public CachePolicy {
+  public:
+  struct Entry : public boost::intrusive::list_base_hook<> {
+      std::string key;
+      uint64_t offset;
+      uint64_t len;
+      Entry(std::string& key, uint64_t offset, uint64_t len) : key(key), offset(offset), len(len) {}
+  };
+  LRUPolicy() = default;
+  private:
+    std::mutex lru_lock;
+    //The disposer object function
+    struct Entry_delete_disposer {
+      void operator()(Entry *e) {
+        delete e;
+      }
+    };
+    typedef boost::intrusive::list<Entry> List;
+    List entries_lru_list;
+    std::unordered_map<std::string, Entry*> entries_map;
+  public:
+    virtual int find_client(const DoutPrefixProvider* dpp, cpp_redis::client* client) override { return 0; };
+    virtual int exist_key(std::string key) override;
+    virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) override;
+    virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) override;
+    virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, rgw::cache::CacheDriver* cacheNode) override;
+    bool erase(const DoutPrefixProvider* dpp, const std::string& key);
 };
 
 class PolicyDriver {
index 960e61d1688d8d5ab9c2b0fe52b0aef1754e885b..a9404826d69aab7728a8adb38596d2103a96eeb3 100644 (file)
@@ -51,8 +51,7 @@ D4NFilterDriver::D4NFilterDriver(Driver* _next) : FilterDriver(_next)
   blockDir = new rgw::d4n::BlockDirectory();
   cacheBlock = new rgw::d4n::CacheBlock();
   policyDriver = new rgw::d4n::PolicyDriver("lfuda");
-  cache = rgw::sal::LRUCache(cacheDriver);
-
+  lruPolicyDriver = new rgw::d4n::PolicyDriver("lru");
 }
 
  D4NFilterDriver::~D4NFilterDriver()
@@ -62,6 +61,7 @@ D4NFilterDriver::D4NFilterDriver(Driver* _next) : FilterDriver(_next)
     delete blockDir; 
     delete cacheBlock;
     delete policyDriver;
+    delete lruPolicyDriver;
 }
 
 int D4NFilterDriver::initialize(CephContext *cct, const DoutPrefixProvider *dpp)
@@ -75,6 +75,9 @@ int D4NFilterDriver::initialize(CephContext *cct, const DoutPrefixProvider *dpp)
 
   policyDriver->init();
   policyDriver->cachePolicy->init(cct);
+
+  lruPolicyDriver->init();
+  lruPolicyDriver->cachePolicy->init(cct);
   
   return 0;
 }
@@ -410,9 +413,9 @@ int D4NFilterObject::D4NFilterReadOp::prepare(optional_yield y, const DoutPrefix
     int set_attrsReturn = source->set_attrs(attrs);
 
     if (set_attrsReturn < 0) {
-      ldpp_dout(dpp, 20) << "D4N Filter: Cache get object operation failed." << dendl;
+      ldpp_dout(dpp, 20) << "D4N Filter: Cache set object operation failed." << dendl;
     } else {
-      ldpp_dout(dpp, 20) << "D4N Filter: Cache get object operation succeeded." << dendl;
+      ldpp_dout(dpp, 20) << "D4N Filter: Cache set object operation succeeded." << dendl;
     }   
   }
   }
@@ -481,6 +484,11 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
   this->client_cb = cb;
   this->cb->set_client_cb(cb); // what's this for? -Sam
 
+  /* This algorithm stores chunks for ranged requests also in the cache, which might be smaller than obj_max_req_size
+     One simplification could be to overwrite the smaller chunks with a bigger chunk of obj_max_req_size, and to serve requests for smaller
+     chunks using the larger chunk, but all corner cases need to be considered like the last chunk which might be smaller than obj_max_req_size
+     and also ranged requests where a smaller chunk is overwritten by a larger chunk size != obj_max_req_size */
+
   uint64_t obj_max_req_size = g_conf()->rgw_get_obj_max_req_size;
   uint64_t start_part_num = 0;
   uint64_t part_num = ofs/obj_max_req_size; //part num of ofs wrt start of the object
@@ -499,7 +507,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
   this->offset = ofs;
 
   do {
-    uint64_t id = adjusted_start_ofs;
+    uint64_t id = adjusted_start_ofs, read_ofs = 0; //read_ofs is the actual offset to start reading from the current part/ chunk
     if (start_part_num == (num_parts - 1)) {
       len_to_read = len;
       part_len = len;
@@ -512,24 +520,27 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
     if (start_part_num == 0) {
       len_to_read -= diff_ofs;
       id += diff_ofs;
+      read_ofs = diff_ofs;
     }
 
-    uint64_t read_ofs = diff_ofs; //read_ofs is the actual offset to start reading from the current part/ chunk
     ceph::bufferlist bl;
     std::string oid_in_cache = source->get_bucket()->get_marker() + "_" + oid + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(part_len);
 
     ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << 
     " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
 
-    if (source->driver->get_lru_cache().key_exists(dpp, oid_in_cache)) { 
+    if (source->driver->get_policy_driver()->cachePolicy->exist_key(oid_in_cache)) { 
       // Read From Cache
       auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id); 
 
+      source->driver->get_policy_driver()->cachePolicy->update(dpp, oid_in_cache, adjusted_start_ofs, part_len, source->driver->get_cache_driver());
+
       ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
 
       auto r = flush(dpp, std::move(completed));
 
       if (r < 0) {
+        drain(dpp);
         ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
         return r;
       }
@@ -539,15 +550,18 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << 
       " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
 
-      if ((part_len != obj_max_req_size) && source->driver->get_lru_cache().key_exists(dpp, oid_in_cache)) {
+      if ((part_len != obj_max_req_size) && source->driver->get_policy_driver()->cachePolicy->exist_key(oid_in_cache)) {
         // Read From Cache
         auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);  
 
+        source->driver->get_policy_driver()->cachePolicy->update(dpp, oid_in_cache, adjusted_start_ofs, obj_max_req_size, source->driver->get_cache_driver());
+
         ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
 
         auto r = flush(dpp, std::move(completed));
 
         if (r < 0) {
+          drain(dpp);
           ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
           return r;
         }
@@ -737,24 +751,60 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
     }
   }
 
-  int ret = 0;
   //Accumulating data from backend store into rgw_get_obj_max_req_size sized chunks and then writing to cache
   if (write_to_cache) {
-    const std::lock_guard l(d3n_get_data.d3n_lock);
+    const std::lock_guard l(d4n_get_data_lock);
+    rgw::d4n::CacheBlock block;
+    rgw::d4n::BlockDirectory* blockDir = source->driver->get_block_dir();
+    block.hostsList.push_back(blockDir->get_addr().host + ":" + std::to_string(blockDir->get_addr().port));
+    block.cacheObj.bucketName = source->get_bucket()->get_name();
+    block.cacheObj.objName = source->get_key().get_oid();
 
     if (bl.length() > 0 && last_part) { // if bl = bl_rem has data and this is the last part, write it to cache
       std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
-      ret = filter->get_cache_driver()->put_async(save_dpp, oid, bl, bl.length(), source->get_attrs());
-      if (ret == 0) {
-        filter->get_lru_cache().insert(save_dpp, LRUCache::Entry{oid, ofs, bl.length()}); //TODO - move to flush, where operation actually completes
+      block.size = bl.length();
+      block.blockId = ofs;
+      uint64_t freeSpace = filter->get_cache_driver()->get_free_space(save_dpp);
+      while(freeSpace < block.size) {
+        freeSpace += filter->get_policy_driver()->cachePolicy->eviction(save_dpp, filter->get_cache_driver());
       }
+      if (filter->get_cache_driver()->put_async(save_dpp, oid, bl, bl.length(), source->get_attrs()) == 0) {
+        filter->get_policy_driver()->cachePolicy->update(save_dpp, oid, ofs, bl.length(), filter->get_cache_driver());
+        /* Store block in directory */
+        if (!blockDir->exist_key(oid)) {
+          int ret = blockDir->set_value(&block);
+          if (ret < 0) {
+            ldpp_dout(save_dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl;
+            return ret;
+          } else {
+            ldpp_dout(save_dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl;
+          }
+        }
+      }
+      filter->get_policy_driver()->cachePolicy->update(save_dpp, oid, ofs, bl.length(), filter->get_cache_driver());
     } else if (bl.length() == rgw_get_obj_max_req_size && bl_rem.length() == 0) { // if bl is the same size as rgw_get_obj_max_req_size, write it to cache
       std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
       ofs += bl_len;
-      ret = filter->get_cache_driver()->put_async(save_dpp, oid, bl, bl.length(), source->get_attrs());
-      if (ret == 0) {
-        filter->get_lru_cache().insert(save_dpp, LRUCache::Entry{oid, ofs, bl.length()});
+      block.blockId = ofs;
+      block.size = bl.length();
+      uint64_t freeSpace = filter->get_cache_driver()->get_free_space(save_dpp);
+      while(freeSpace < block.size) {
+        freeSpace += filter->get_policy_driver()->cachePolicy->eviction(save_dpp, filter->get_cache_driver());
+      }
+      if (filter->get_cache_driver()->put_async(save_dpp, oid, bl, bl.length(), source->get_attrs()) == 0) {
+        filter->get_policy_driver()->cachePolicy->update(save_dpp, oid, ofs, bl.length(), filter->get_cache_driver());
+        /* Store block in directory */
+        if (!blockDir->exist_key(oid)) {
+          int ret = blockDir->set_value(&block);
+          if (ret < 0) {
+            ldpp_dout(save_dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl;
+            return ret;
+          } else {
+            ldpp_dout(save_dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl;
+          }
+        }
       }
+      filter->get_policy_driver()->cachePolicy->update(save_dpp, oid, ofs, bl.length(), filter->get_cache_driver());
     } else { //copy data from incoming bl to bl_rem till it is rgw_get_obj_max_req_size, and then write it to cache
       uint64_t rem_space = rgw_get_obj_max_req_size - bl_rem.length();
       uint64_t len_to_copy = rem_space > bl.length() ? bl.length() : rem_space;
@@ -763,13 +813,27 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
       bl.splice(0, len_to_copy, &bl_copy);
       bl_rem.claim_append(bl_copy);
 
-      if (bl_rem.length() == g_conf()->rgw_get_obj_max_req_size) {
+      if (bl_rem.length() == rgw_get_obj_max_req_size) {
         std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_rem.length());
         ofs += bl_rem.length();
-
-        ret = filter->get_cache_driver()->put_async(save_dpp, oid, bl_rem, bl_rem.length(), source->get_attrs());
-        if (ret == 0) {
-          filter->get_lru_cache().insert(save_dpp, LRUCache::Entry{oid, ofs, bl_rem.length()});
+        block.blockId = ofs;
+        block.size = bl_rem.length();
+        uint64_t freeSpace = filter->get_cache_driver()->get_free_space(save_dpp);
+        while(freeSpace < block.size) {
+          freeSpace += filter->get_policy_driver()->cachePolicy->eviction(save_dpp, filter->get_cache_driver());
+        }
+        if (filter->get_cache_driver()->put_async(save_dpp, oid, bl_rem, bl_rem.length(), source->get_attrs()) == 0) {
+          filter->get_policy_driver()->cachePolicy->update(save_dpp, oid, ofs, bl_rem.length(), filter->get_cache_driver());
+          /* Store block in directory */
+          if (!blockDir->exist_key(oid)) {
+            int ret = blockDir->set_value(&block);
+            if (ret < 0) {
+              ldpp_dout(save_dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl;
+              return ret;
+            } else {
+              ldpp_dout(save_dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl;
+            }
+          }
         }
 
         bl_rem.clear();
index c65e3f385c49554573dfd722479e44b21a7ab35f..2114ce3e578beab49cf5a04bc1b965b7b3726afb 100644 (file)
 #include <boost/intrusive/list.hpp>
 
 namespace rgw { namespace sal {
-// Temporarily added to d4n filter driver - need to figure out the best way to incorporate this as part of Policy Manager
-class LRUCache {
-  public:
-    struct Entry : public boost::intrusive::list_base_hook<> {
-      std::string key;
-      uint64_t offset;
-      uint64_t len;
-      Entry(std::string& key, uint64_t offset, uint64_t len) : key(key), offset(offset), len(len) {}
-    };
-  private:
-    //The disposer object function
-    struct Entry_delete_disposer {
-      void operator()(Entry *e) {
-        delete e;
-      }
-    };
-    typedef boost::intrusive::list<Entry> List;
-    List entries_lru_list;
-    std::unordered_map<std::string, Entry*> entries_map;
-    rgw::cache::CacheDriver* cacheDriver;
-  
-    void evict() {
-      auto p = entries_lru_list.front();
-      entries_map.erase(entries_map.find(p.key));
-      entries_lru_list.pop_front_and_dispose(Entry_delete_disposer());
-    }
-  
-  public:
-    LRUCache() = default;
-    LRUCache(rgw::cache::CacheDriver* cacheDriver) : cacheDriver(cacheDriver) {} //in case we want to access cache backend apis from here
-
-    void insert(const DoutPrefixProvider* dpp, const Entry& entry) {
-      erase(dpp, entry);
-      //TODO - Get free space using cache api and if there isn't enough space then evict
-      Entry *e = new Entry(entry);
-      entries_lru_list.push_back(*e);
-      entries_map.emplace(entry.key, e);
-    }
-
-  bool erase(const DoutPrefixProvider* dpp, const Entry& entry) {
-    auto p = entries_map.find(entry.key);
-    if (p == entries_map.end()) {
-      return false;
-    }
-    entries_map.erase(p);
-    entries_lru_list.erase_and_dispose(entries_lru_list.iterator_to(*(p->second)), Entry_delete_disposer());
-    return true;
-  }
-
-  bool key_exists(const DoutPrefixProvider* dpp, const std::string& key) {
-    if (entries_map.count(key) != 0) {
-      return true;
-    }
-    return false;
-  }
-};
 
 class D4NFilterDriver : public FilterDriver {
   private:
@@ -92,7 +36,7 @@ class D4NFilterDriver : public FilterDriver {
     rgw::d4n::BlockDirectory* blockDir;
     rgw::d4n::CacheBlock* cacheBlock;
     rgw::d4n::PolicyDriver* policyDriver;
-    rgw::sal::LRUCache cache;
+    rgw::d4n::PolicyDriver* lruPolicyDriver;
 
   public:
     D4NFilterDriver(Driver* _next);
@@ -115,8 +59,7 @@ class D4NFilterDriver : public FilterDriver {
     rgw::d4n::ObjectDirectory* get_obj_dir() { return objDir; }
     rgw::d4n::BlockDirectory* get_block_dir() { return blockDir; }
     rgw::d4n::CacheBlock* get_cache_block() { return cacheBlock; }
-    rgw::d4n::PolicyDriver* get_policy_driver() { return policyDriver; }
-    rgw::sal::LRUCache& get_lru_cache() { return cache; }
+    rgw::d4n::PolicyDriver* get_policy_driver() { return lruPolicyDriver; }
 };
 
 class D4NFilterUser : public FilterUser {
@@ -155,14 +98,14 @@ class D4NFilterObject : public FilterObject {
       public:
        class D4NFilterGetCB: public RGWGetDataCB {
          private:
-           D4NFilterDriver* filter; // don't need -Sam ?
+           D4NFilterDriver* filter;
            std::string oid;
            D4NFilterObject* source;
            RGWGetDataCB* client_cb;
            uint64_t ofs = 0, len = 0;
            bufferlist bl_rem;
            bool last_part{false};
-           D3nGetObjData d3n_get_data; // should make d4n version? -Sam
+           std::mutex d4n_get_data_lock;
            bool write_to_cache{true};
 
          public: