]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/d4n: completing the read flow for a local read cache,
authorPritha Srivastava <prsrivas@redhat.com>
Tue, 11 Jul 2023 15:32:20 +0000 (21:02 +0530)
committerPritha Srivastava <prsrivas@redhat.com>
Tue, 2 Apr 2024 15:54:50 +0000 (21:24 +0530)
with ssd cache driver as the backend.

Signed-off-by: Pritha Srivastava <prsrivas@redhat.com>
src/rgw/driver/d4n/rgw_sal_d4n.cc
src/rgw/driver/d4n/rgw_sal_d4n.h
src/rgw/rgw_ssd_driver.cc
src/rgw/rgw_ssd_driver.h

index 7676bf93ac667716ccd7db17ad50c75e576fe51d..4a2e0034e62cfc212c286546e647039f7998b7d7 100644 (file)
@@ -13,6 +13,8 @@
  *
  */
 
+#include "rgw_redis_driver.h"
+#include "rgw_ssd_driver.h"
 #include "rgw_sal_d4n.h"
 
 #define dout_subsys ceph_subsys_rgw
@@ -36,6 +38,30 @@ static inline Object* nextObject(Object* t)
   return dynamic_cast<FilterObject*>(t)->get_next();
 }
 
+D4NFilterDriver::D4NFilterDriver(Driver* _next) : FilterDriver(_next) 
+{
+  rgw::cache::Partition partition_info;
+  partition_info.location = g_conf()->rgw_d3n_l1_datacache_persistent_path;
+  partition_info.name = "d4n";
+  partition_info.type = "read-cache";
+  partition_info.size = g_conf()->rgw_d3n_l1_datacache_size;
+
+  cacheDriver = new rgw::cache::SSDDriver(partition_info);
+  objDir = new rgw::d4n::ObjectDirectory();
+  blockDir = new rgw::d4n::BlockDirectory();
+  cacheBlock = new rgw::d4n::CacheBlock();
+  policyDriver = new rgw::d4n::PolicyDriver("lfuda");
+}
+
+ D4NFilterDriver::~D4NFilterDriver()
+ {
+    delete cacheDriver;
+    delete objDir; 
+    delete blockDir; 
+    delete cacheBlock;
+    delete policyDriver;
+}
+
 int D4NFilterDriver::initialize(CephContext *cct, const DoutPrefixProvider *dpp)
 {
   FilterDriver::initialize(cct, dpp);
@@ -438,6 +464,7 @@ int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw::
     completed.pop_front_and_dispose(std::default_delete<rgw::AioResultEntry>{});
   }
 
+  ldpp_dout(dpp, 20) << "D4NFilterObject::returning from flush:: " << dendl;
   return 0;
 }
 
@@ -462,45 +489,59 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
   //len_to_read is the actual length read from a part/ chunk in cache, while part_len is the length of the chunk/ part in cache 
   uint64_t cost = 0, len_to_read = 0, part_len = 0;
 
-  if (y) {
-    aio = rgw::make_throttle(window_size, y);
+  aio = rgw::make_throttle(window_size, y);
 
-    ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "obj_max_req_size " << obj_max_req_size << 
-      " num_parts " << num_parts << " adjusted_start_offset: " << adjusted_start_ofs << " len: " << len << dendl;
+  ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "obj_max_req_size " << obj_max_req_size << 
+  " num_parts " << num_parts << " adjusted_start_offset: " << adjusted_start_ofs << " len: " << len << dendl;
 
-    this->offset = ofs;
+  this->offset = ofs;
 
-    do {
-      uint64_t id = adjusted_start_ofs;
-      if (start_part_num == (num_parts - 1)) {
-        len_to_read = len;
-        part_len = len;
-        cost = len;
-      } else {
-        len_to_read = obj_max_req_size;
-        cost = obj_max_req_size;
-        part_len = obj_max_req_size;
-      }
-      if (start_part_num == 0) {
-        len_to_read -= diff_ofs;
-        id += diff_ofs;
-      }
+  do {
+    uint64_t id = adjusted_start_ofs;
+    if (start_part_num == (num_parts - 1)) {
+      len_to_read = len;
+      part_len = len;
+      cost = len;
+    } else {
+      len_to_read = obj_max_req_size;
+      cost = obj_max_req_size;
+      part_len = obj_max_req_size;
+    }
+    if (start_part_num == 0) {
+      len_to_read -= diff_ofs;
+      id += diff_ofs;
+    }
+
+    uint64_t read_ofs = diff_ofs; //read_ofs is the actual offset to start reading from the current part/ chunk
+    ceph::bufferlist bl;
+    std::string oid_in_cache = source->get_bucket()->get_marker() + "_" + oid + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(part_len);
+
+    ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << 
+    " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
 
-      uint64_t read_ofs = diff_ofs; //read_ofs is the actual offset to start reading from the current part/ chunk
-      ceph::bufferlist bl;
-      rgw_raw_obj r_obj;
-      r_obj.oid = oid;
+    if (source->driver->get_cache_driver()->key_exists(dpp, oid_in_cache)) { 
+      // Read From Cache
+      auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id); 
 
-      ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << 
-       oid << " length to read is: " << len_to_read << " part num: " << start_part_num << 
-       " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
+      ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
+
+      auto r = flush(dpp, std::move(completed));
+
+      if (r < 0) {
+        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
+        return r;
+      }
+    } else {
+      oid_in_cache = source->get_bucket()->get_marker() + "_" + oid + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(obj_max_req_size);
+      //for ranged requests, for last part, the whole part might exist in the cache
+       ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << 
+      " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
 
-      if (source->driver->get_cache_driver()->get(dpp, oid, ofs, part_len, bl, source->get_attrs()) == 0) { 
+      if ((part_len != obj_max_req_size) && source->driver->get_cache_driver()->key_exists(dpp, oid_in_cache)) {
         // Read From Cache
-        auto completed = aio->get(r_obj, rgw::Aio::cache_read_op(dpp, y, source->driver->get_cache_driver(), 
-                          read_ofs, len_to_read, oid), cost, id); 
+        auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);  
 
-        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid << dendl;
+        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
 
         auto r = flush(dpp, std::move(completed));
 
@@ -508,50 +549,33 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
           ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
           return r;
         }
-      } else {
-        //for ranged requests, for last part, the whole part might exist in the cache
-        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << 
-         oid << " length to read is: " << len_to_read << " part num: " << start_part_num << 
-         " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
-
-        if (source->driver->get_cache_driver()->get(dpp, oid, ofs, obj_max_req_size, bl, source->get_attrs()) == 0) {
-          // Read From Cache
-         auto completed = aio->get(r_obj, rgw::Aio::cache_read_op(dpp, y, source->driver->get_cache_driver(), 
-                            read_ofs, len_to_read, oid), cost, id); 
 
-          ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid << dendl;
+      } else {
+        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
 
-          auto r = flush(dpp, std::move(completed));
+        auto r = drain(dpp);
 
-          if (r < 0) {
-            ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
-            return r;
-          }
-        } else {
-          ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid << dendl;
+        if (r < 0) {
+          ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, r= " << r << dendl;
+          return r;
+        }
 
-          auto r = drain(dpp);
+        break;
+      }
+    }
 
-          if (r < 0) {
-            ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, r= " << r << dendl;
-            return r;
-          }
+    if (start_part_num == (num_parts - 1)) {
+      ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+      return drain(dpp);
+    } else {
+      adjusted_start_ofs += obj_max_req_size;
+    }
 
-          break;
-        }
-      }
+    start_part_num += 1;
+    len -= obj_max_req_size;
+  } while (start_part_num < num_parts);
 
-      if (start_part_num == (num_parts - 1)) {
-        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid << dendl;
-        return drain(dpp);
-      } else {
-        adjusted_start_ofs += obj_max_req_size;
-      }
 
-      start_part_num += 1;
-      len -= obj_max_req_size;
-    } while (start_part_num < num_parts);
-  }
 
   ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Fetching object from backend store" << dendl;
 
@@ -716,11 +740,12 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
     const std::lock_guard l(d3n_get_data.d3n_lock);
 
     if (bl.length() > 0 && last_part) { // if bl = bl_rem has data and this is the last part, write it to cache
-      filter->get_cache_driver()->put(save_dpp, this->oid, bl, bl.length(), source->get_attrs()); 
+      std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
+      filter->get_cache_driver()->put_async(save_dpp, oid, bl, bl.length(), source->get_attrs());
     } else if (bl.length() == rgw_get_obj_max_req_size && bl_rem.length() == 0) { // if bl is the same size as rgw_get_obj_max_req_size, write it to cache
+      std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
       ofs += bl_len;
-
-      filter->get_cache_driver()->put(save_dpp, this->oid, bl, bl.length(), source->get_attrs());
+      filter->get_cache_driver()->put_async(save_dpp, oid, bl, bl.length(), source->get_attrs());
     } else { //copy data from incoming bl to bl_rem till it is rgw_get_obj_max_req_size, and then write it to cache
       uint64_t rem_space = rgw_get_obj_max_req_size - bl_rem.length();
       uint64_t len_to_copy = rem_space > bl.length() ? bl.length() : rem_space;
@@ -730,9 +755,10 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
       bl_rem.claim_append(bl_copy);
 
       if (bl_rem.length() == g_conf()->rgw_get_obj_max_req_size) {
+        std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_rem.length());
         ofs += bl_rem.length();
 
-        filter->get_cache_driver()->put(save_dpp, this->oid, bl_rem, bl_rem.length(), source->get_attrs());
+        filter->get_cache_driver()->put_async(save_dpp, oid, bl_rem, bl_rem.length(), source->get_attrs());
 
         bl_rem.clear();
         bl_rem = std::move(bl);
index 1a66d1a025dd32f4a3acb49f6f60b0036d7eacaf..d51d5ed0e9c8a7025310f3afc69caa0f1249dfe1 100644 (file)
@@ -22,7 +22,6 @@
 #include "common/dout.h" 
 #include "rgw_aio_throttle.h"
 
-#include "rgw_redis_driver.h"
 #include "driver/d4n/d4n_directory.h"
 #include "driver/d4n/d4n_policy.h"
 
@@ -37,24 +36,9 @@ class D4NFilterDriver : public FilterDriver {
     rgw::d4n::PolicyDriver* policyDriver;
 
   public:
-    D4NFilterDriver(Driver* _next) : FilterDriver(_next) 
-    {
-      rgw::cache::Partition partition_info;
-      partition_info.location = "RedisCache"; // figure out how to fill rest of partition information -Sam
-
-      cacheDriver = new rgw::cache::RedisDriver(partition_info); // change later -Sam
-      objDir = new rgw::d4n::ObjectDirectory();
-      blockDir = new rgw::d4n::BlockDirectory();
-      cacheBlock = new rgw::d4n::CacheBlock();
-      policyDriver = new rgw::d4n::PolicyDriver("lfuda");
-    }
-    virtual ~D4NFilterDriver() {
-      delete cacheDriver;
-      delete objDir; 
-      delete blockDir; 
-      delete cacheBlock;
-      delete policyDriver;
-    }
+    D4NFilterDriver(Driver* _next);
+
+    virtual ~D4NFilterDriver();
 
     virtual int initialize(CephContext *cct, const DoutPrefixProvider *dpp) override;
     virtual std::unique_ptr<User> get_user(const rgw_user& u) override;
index 8aa7466e328790bfad33f09bf1e6d3a6e6bc93c6..5cd2ccf0d95420d66b3f72fa2f117333ef65c95f 100644 (file)
@@ -276,6 +276,15 @@ int SSDDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string& ke
     return remove_entry(dpp, key);
 }
 
+int SSDDriver::append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data)
+{
+    std::string location = partition_info.location + key;
+
+    //TODO - Implement append_data
+
+    return 0;
+}
+
 int SSDDriver::AsyncWriteRequest::prepare_libaio_write_op(const DoutPrefixProvider *dpp, bufferlist& bl, unsigned int len, std::string key, std::string cache_location)
 {
     std::string location = cache_location + key;
@@ -525,4 +534,9 @@ void SSDCacheAioRequest::cache_aio_read(const DoutPrefixProvider* dpp, optional_
     cache_driver->get_async(dpp, y.get_io_context(), key, ofs, len, bind_executor(ex, SSDDriver::libaio_handler{aio, r}));
 }
 
+void SSDCacheAioRequest::cache_aio_write(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, bufferlist& bl, uint64_t len, rgw::Aio* aio, rgw::AioResult& r)
+{
+    //TODO - implement cache_aio_write
+}
+
 } } // namespace rgw::cache
index bc3efedab038db53c1d614b3475b8368388797a7..81a60442869cef71af553202d38cc10c77a7e3b1 100644 (file)
@@ -28,7 +28,7 @@ public:
   virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs) override;
   virtual rgw::AioResultList get_async (const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override;
   virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) override;
-  virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data) = 0;
+  virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data);
   virtual int delete_data(const DoutPrefixProvider* dpp, const::std::string& key) override;
   virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) override;
   virtual int set_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) override;
@@ -65,7 +65,7 @@ public:
   auto get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key,
                   off_t read_ofs, off_t read_len, CompletionToken&& token);
 protected:
-  static std::unordered_map<std::string, Partition> partitions;
+  inline static std::unordered_map<std::string, Partition> partitions;
   std::unordered_map<std::string, Entry> entries;
   Partition partition_info;
   uint64_t free_space;