]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
RGW: Add D4N chunking mechanism
authorSamarah Uriarte <samarah.uriarte@ibm.com>
Fri, 23 Jun 2023 18:41:43 +0000 (14:41 -0400)
committerPritha Srivastava <prsrivas@redhat.com>
Tue, 2 Apr 2024 15:54:50 +0000 (21:24 +0530)
Signed-off-by: Samarah Uriarte <samarah.uriarte@ibm.com>
src/rgw/driver/d4n/rgw_sal_d4n.cc
src/rgw/driver/d4n/rgw_sal_d4n.h
src/rgw/rgw_redis_driver.cc
src/rgw/rgw_redis_driver.h

index 594bab15bc1b9c4c82cff75ec8063bbed7e10f55..bd2b9011e86b21227cab00bb30b4dd71d3e6fab3 100644 (file)
@@ -392,10 +392,202 @@ int D4NFilterObject::D4NFilterReadOp::prepare(optional_yield y, const DoutPrefix
   return ret;
 }
 
+void D4NFilterObject::D4NFilterReadOp::cancel() {
+  aio->drain();
+}
+
+int D4NFilterObject::D4NFilterReadOp::drain(const DoutPrefixProvider* dpp) {
+  auto c = aio->wait();
+  while (!c.empty()) {
+    int r = flush(dpp, std::move(c));
+    if (r < 0) {
+      cancel();
+      return r;
+    }
+    c = aio->wait();
+  }
+  return flush(dpp, std::move(c));
+}
+
+int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw::AioResultList&& results) {
+  int r = rgw::check_for_errors(results);
+
+  if (r < 0) {
+    return r;
+  }
+
+  std::list<bufferlist> bl_list;
+
+  auto cmp = [](const auto& lhs, const auto& rhs) { return lhs.id < rhs.id; };
+  results.sort(cmp); // merge() requires results to be sorted first
+  completed.merge(results, cmp); // merge results in sorted order
+
+  ldpp_dout(dpp, 20) << "D4NFilterObject::In flush:: " << dendl;
+
+  while (!completed.empty() && completed.front().id == offset) {
+    auto bl = std::move(completed.front().data);
+
+    ldpp_dout(dpp, 20) << "D4NFilterObject::flush:: calling handle_data for offset: " << offset << " bufferlist length: " << bl.length() << dendl;
+
+    bl_list.push_back(bl);
+    offset += bl.length();
+    int r = client_cb->handle_data(bl, 0, bl.length());
+    if (r < 0) {
+      return r;
+    }
+    completed.pop_front_and_dispose(std::default_delete<rgw::AioResultEntry>{});
+  }
+
+  return 0;
+}
+
 int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int64_t ofs, int64_t end,
                         RGWGetDataCB* cb, optional_yield y) 
 {
-  /* Execute cache replacement policy */
+  const uint64_t window_size = g_conf()->rgw_get_obj_window_size;
+  std::string oid = source->get_key().get_oid();
+
+  ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "oid: " << oid << " ofs: " << ofs << " end: " << end << dendl;
+
+  this->client_cb = cb;
+  this->cb->set_client_cb(cb); // what's this for? -Sam
+
+  uint64_t obj_max_req_size = g_conf()->rgw_get_obj_max_req_size;
+  uint64_t start_part_num = 0;
+  uint64_t part_num = ofs/obj_max_req_size; //part num of ofs wrt start of the object
+  uint64_t adjusted_start_ofs = part_num*obj_max_req_size; //in case of ranged request, adjust the start offset to the beginning of a chunk/ part
+  uint64_t diff_ofs = ofs - adjusted_start_ofs; //difference between actual offset and adjusted offset
+  off_t len = (end - adjusted_start_ofs) + 1;
+  uint64_t num_parts = (len%obj_max_req_size) == 0 ? len/obj_max_req_size : (len/obj_max_req_size) + 1; //calculate num parts based on adjusted offset
+  //len_to_read is the actual length read from a part/ chunk in cache, while part_len is the length of the chunk/ part in cache 
+  uint64_t cost = 0, len_to_read = 0, part_len = 0;
+
+  if (y) {
+    aio = rgw::make_throttle(window_size, y);
+
+    ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "obj_max_req_size " << obj_max_req_size << 
+      " num_parts " << num_parts << " adjusted_start_offset: " << adjusted_start_ofs << " len: " << len << dendl;
+
+    this->offset = ofs;
+
+    do {
+      uint64_t id = adjusted_start_ofs;
+      if (start_part_num == (num_parts - 1)) {
+        len_to_read = len;
+        part_len = len;
+        cost = len;
+      } else {
+        len_to_read = obj_max_req_size;
+        cost = obj_max_req_size;
+        part_len = obj_max_req_size;
+      }
+      if (start_part_num == 0) {
+        len_to_read -= diff_ofs;
+        id += diff_ofs;
+      }
+
+      uint64_t read_ofs = diff_ofs; //read_ofs is the actual offset to start reading from the current part/ chunk
+      std::string oid_in_cache = source->get_bucket()->get_marker() + "_" + oid + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(part_len);
+      rgw_raw_obj r_obj;
+      r_obj.oid = oid_in_cache;
+      ceph::bufferlist bl;
+
+      ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << 
+       oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << 
+       " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
+
+      if (source->driver->get_cache_driver()->get(dpp, oid_in_cache, ofs, part_len, bl, source->get_attrs()) == 0) { 
+        // Read From Cache
+        auto completed = aio->get(r_obj, rgw::Aio::cache_read_op(dpp, y, source->driver->get_cache_driver(), 
+                          read_ofs, len_to_read, oid_in_cache), cost, id); 
+
+        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
+
+        auto r = flush(dpp, std::move(completed));
+
+        if (r < 0) {
+          ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
+          return r;
+        }
+      } else {
+        //for ranged requests, for last part, the whole part might exist in the cache
+        oid_in_cache = source->get_bucket()->get_marker() + "_" + oid + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(obj_max_req_size);
+        r_obj.oid = oid_in_cache;
+
+        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << 
+         oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << 
+         " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
+
+        if (source->driver->get_cache_driver()->get(dpp, oid_in_cache, ofs, obj_max_req_size, bl, source->get_attrs()) == 0) {
+          // Read From Cache
+         auto completed = aio->get(r_obj, rgw::Aio::cache_read_op(dpp, y, source->driver->get_cache_driver(), 
+                            read_ofs, len_to_read, oid_in_cache), cost, id); 
+
+          ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
+
+          auto r = flush(dpp, std::move(completed));
+
+          if (r < 0) {
+            ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
+            return r;
+          }
+        } else {
+          ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+
+          auto r = drain(dpp);
+
+          if (r < 0) {
+            ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, r= " << r << dendl;
+            return r;
+          }
+
+          break;
+        }
+      }
+
+      if (start_part_num == (num_parts - 1)) {
+        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+        return drain(dpp);
+      } else {
+        adjusted_start_ofs += obj_max_req_size;
+      }
+
+      start_part_num += 1;
+      len -= obj_max_req_size;
+    } while (start_part_num < num_parts);
+  }
+
+  ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Fetching object from backend store" << dendl;
+
+  Attrs obj_attrs;
+  if (source->has_attrs()) {
+    obj_attrs = source->get_attrs();
+  }
+
+  if (source->is_compressed() || obj_attrs.find(RGW_ATTR_CRYPT_MODE) != obj_attrs.end() || !y) {
+    ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Skipping writing to cache" << dendl;
+    this->cb->bypass_cache_write();
+  }
+
+  if (start_part_num == 0) {
+    this->cb->set_ofs(ofs);
+  } else {
+    this->cb->set_ofs(adjusted_start_ofs);
+    ofs = adjusted_start_ofs; // redundant? -Sam
+  }
+
+  this->cb->set_ofs(ofs);
+  auto r = next->iterate(dpp, ofs, end, this->cb.get(), y);
+  
+  if (r < 0) {
+    ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to fetch object from backend store, r= " << r << dendl;
+    return r;
+  }
+  
+  return this->cb->flush_last_part(dpp);
+
+  /*
+  / Execute cache replacement policy /
   int policyRet = source->driver->get_policy_driver()->cachePolicy->get_block(dpp, source->driver->get_cache_block(), 
                    source->driver->get_cache_driver());
   
@@ -410,12 +602,12 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
   uint64_t len = end - ofs + 1;
   std::string oid(source->get_name());
   
-  /* Local cache check */
+  / Local cache check /
   if (source->driver->get_cache_driver()->key_exists(dpp, oid)) { // Entire object for now -Sam
     ret = source->driver->get_cache_driver()->get(dpp, source->get_key().get_oid(), ofs, len, bl, source->get_attrs());
     cb->handle_data(bl, ofs, len);
   } else {
-    /* Block directory check */
+    / Block directory check /
     int getDirReturn = source->driver->get_block_dir()->get_value(source->driver->get_cache_block()); 
 
     if (getDirReturn >= -1) {
@@ -427,11 +619,11 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
 
       // remote cache get
 
-      /* Cache block locally */
+      / Cache block locally /
       ret = source->driver->get_cache_driver()->put(dpp, source->get_key().get_oid(), bl, len, source->get_attrs()); // May be put_async -Sam
 
       if (!ret) {
-       int updateValueReturn = source->driver->get_block_dir()->update_field(source->driver->get_cache_block(), "hostsList", ""/*local cache ip from config*/);
+       int updateValueReturn = source->driver->get_block_dir()->update_field(source->driver->get_cache_block(), "hostsList", ""/local cache ip from config/);
 
        if (updateValueReturn < 0) {
          ldpp_dout(dpp, 20) << "D4N Filter: Block directory update value operation failed." << dendl;
@@ -442,7 +634,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
        cb->handle_data(bl, ofs, len);
       }
     } else {
-      /* Write tier retrieval */
+      / Write tier retrieval /
       ldpp_dout(dpp, 20) << "D4N Filter: Block directory get operation failed." << dendl;
       getDirReturn = source->driver->get_obj_dir()->get_value(&(source->driver->get_cache_block()->cacheObj));
 
@@ -455,11 +647,11 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
        
        // retrieve from write back cache, which will be stored as a cache driver instance in the filter
 
-       /* Cache block locally */
+       / Cache block locally /
        ret = source->driver->get_cache_driver()->put(dpp, source->get_key().get_oid(), bl, len, source->get_attrs()); // May be put_async -Sam
 
        if (!ret) {
-         int updateValueReturn = source->driver->get_block_dir()->update_field(source->driver->get_cache_block(), "hostsList", ""/*local cache ip from config*/);
+         int updateValueReturn = source->driver->get_block_dir()->update_field(source->driver->get_cache_block(), "hostsList", ""/local cache ip from config/);
 
          if (updateValueReturn < 0) {
            ldpp_dout(dpp, 20) << "D4N Filter: Block directory update value operation failed." << dendl;
@@ -470,15 +662,15 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
          cb->handle_data(bl, ofs, len);
        }
       } else {
-       /* Backend store retrieval */
+       / Backend store retrieval /
        ldpp_dout(dpp, 20) << "D4N Filter: Object directory get operation failed." << dendl;
        ret = next->iterate(dpp, ofs, end, cb, y);
 
        if (!ret) {
-         /* Cache block locally */
+         / Cache block locally /
          ret = source->driver->get_cache_driver()->put(dpp, source->get_key().get_oid(), bl, len, source->get_attrs()); // May be put_async -Sam
 
-         /* Store block in directory */
+         / Store block in directory /
          rgw::d4n::BlockDirectory* tempBlockDir = source->driver->get_block_dir(); // remove later -Sam
 
          source->driver->get_cache_block()->hostsList.push_back(tempBlockDir->get_addr().host + ":" + std::to_string(tempBlockDir->get_addr().port)); // local cache address -Sam 
@@ -501,7 +693,63 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
   if (ret < 0) 
     ldpp_dout(dpp, 20) << "D4N Filter: Cache iterate operation failed." << dendl;
 
-  return next->iterate(dpp, ofs, end, cb, y); 
+  return next->iterate(dpp, ofs, end, cb, y); */
+}
+
+int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::flush_last_part(const DoutPrefixProvider* dpp)
+{
+  save_dpp = dpp;
+  last_part = true;
+  return handle_data(bl_rem, 0, bl_rem.length());
+}
+
+int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
+{
+  auto rgw_get_obj_max_req_size = g_conf()->rgw_get_obj_max_req_size;
+
+  if (!last_part && bl.length() <= rgw_get_obj_max_req_size) {
+    auto r = client_cb->handle_data(bl, bl_ofs, bl_len);
+
+    if (r < 0) {
+      return r;
+    }
+  }
+
+  //Accumulating data from backend store into rgw_get_obj_max_req_size sized chunks and then writing to cache
+  if (write_to_cache) {
+    const std::lock_guard l(d3n_get_data.d3n_lock);
+    Attrs attrs;
+
+    if (bl.length() > 0 && last_part) { // if bl = bl_rem has data and this is the last part, write it to cache
+      std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
+
+      filter->get_cache_driver()->put(save_dpp, oid, bl, bl.length(), attrs); // need attrs for just chunk? -Sam
+    } else if (bl.length() == rgw_get_obj_max_req_size && bl_rem.length() == 0) { // if bl is the same size as rgw_get_obj_max_req_size, write it to cache
+      std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
+      ofs += bl_len;
+
+      filter->get_cache_driver()->put(save_dpp, oid, bl, bl.length(), attrs); // need attrs for just chunk? -Sam
+    } else { //copy data from incoming bl to bl_rem till it is rgw_get_obj_max_req_size, and then write it to cache
+      uint64_t rem_space = rgw_get_obj_max_req_size - bl_rem.length();
+      uint64_t len_to_copy = rem_space > bl.length() ? bl.length() : rem_space;
+      bufferlist bl_copy;
+
+      bl.splice(0, len_to_copy, &bl_copy);
+      bl_rem.claim_append(bl_copy);
+
+      if (bl_rem.length() == g_conf()->rgw_get_obj_max_req_size) {
+        std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_rem.length());
+        ofs += bl_rem.length();
+
+        filter->get_cache_driver()->put(save_dpp, oid, bl_rem, bl_rem.length(), attrs); // need attrs for just chunk? -Sam
+
+        bl_rem.clear();
+        bl_rem = std::move(bl);
+      }
+    }
+  }
+
+  return 0;
 }
 
 int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp,
index 1345b8aa26541bd5abd24e9e955becdb0e75ada8..6150223d1e893723a7141b0b414f63d2c1ac06ad 100644 (file)
@@ -20,6 +20,7 @@
 #include "rgw_oidc_provider.h"
 #include "rgw_role.h"
 #include "common/dout.h" 
+#include "rgw_aio_throttle.h"
 
 #include "rgw_redis_driver.h"
 #include "driver/d4n/d4n_directory.h"
@@ -104,15 +105,55 @@ class D4NFilterObject : public FilterObject {
 
   public:
     struct D4NFilterReadOp : FilterReadOp {
-      D4NFilterObject* source;
-
-      D4NFilterReadOp(std::unique_ptr<ReadOp> _next, D4NFilterObject* _source) : FilterReadOp(std::move(_next)),
-                                                                                source(_source) {}
-      virtual ~D4NFilterReadOp() = default;
-
-      virtual int prepare(optional_yield y, const DoutPrefixProvider* dpp) override;
-      virtual int iterate(const DoutPrefixProvider* dpp, int64_t ofs, int64_t end,
-        RGWGetDataCB* cb, optional_yield y) override;
+      public:
+       class D4NFilterGetCB: public RGWGetDataCB {
+         private:
+           D4NFilterDriver* filter; // don't need -Sam ?
+           std::string oid;
+           RGWGetDataCB* client_cb;
+           uint64_t ofs = 0, len = 0;
+           bufferlist bl_rem;
+           bool last_part{false};
+           D3nGetObjData d3n_get_data; // should make d4n version? -Sam
+           bool write_to_cache{true};
+
+         public:
+           D4NFilterGetCB(D4NFilterDriver* _filter, std::string& _oid) : filter(_filter), 
+                                                                         oid(_oid) {}
+
+           const DoutPrefixProvider* save_dpp;
+
+           int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override;
+           void set_client_cb(RGWGetDataCB* client_cb) { this->client_cb = client_cb;}
+           void set_ofs(uint64_t ofs) { this->ofs = ofs; }
+           int flush_last_part(const DoutPrefixProvider* dpp);
+           void bypass_cache_write() { this->write_to_cache = false; }
+       };
+
+       D4NFilterObject* source;
+
+       D4NFilterReadOp(std::unique_ptr<ReadOp> _next, D4NFilterObject* _source) : FilterReadOp(std::move(_next)),
+                                                                                  source(_source) 
+        {
+         std::string oid = source->get_bucket()->get_marker() + "_" + source->get_key().get_oid();
+          cb = std::make_unique<D4NFilterGetCB>(source->driver, oid); 
+       }
+       virtual ~D4NFilterReadOp() = default;
+
+       virtual int prepare(optional_yield y, const DoutPrefixProvider* dpp) override;
+       virtual int iterate(const DoutPrefixProvider* dpp, int64_t ofs, int64_t end,
+         RGWGetDataCB* cb, optional_yield y) override;
+
+      private:
+       RGWGetDataCB* client_cb;
+       std::unique_ptr<D4NFilterGetCB> cb;
+        std::unique_ptr<rgw::Aio> aio;
+       uint64_t offset = 0; // next offset to write to client
+        rgw::AioResultList completed; // completed read results, sorted by offset
+
+       int flush(const DoutPrefixProvider* dpp, rgw::AioResultList&& results);
+       void cancel();
+       int drain(const DoutPrefixProvider* dpp);
     };
 
     struct D4NFilterDeleteOp : FilterDeleteOp {
@@ -191,7 +232,7 @@ class D4NFilterWriter : public FilterWriter {
                        const req_context& rctx,
                        uint32_t flags) override;
    bool is_atomic() { return atomic; };
-   const DoutPrefixProvider* dpp() { return save_dpp; }
+   const DoutPrefixProvider* dpp() { return save_dpp; } 
 };
 
 } } // namespace rgw::sal
index 72c839807751e7c116d7bab02bdcda5f030ab1eb..87c488dd4ea9438bcf8248d3af7a9047ae3969d6 100644 (file)
@@ -1,5 +1,6 @@
 #include <boost/algorithm/string.hpp>
 #include "rgw_redis_driver.h"
+//#include "rgw_ssd_driver.h"
 
 #define dout_subsys ceph_subsys_rgw
 #define dout_context g_ceph_context
@@ -21,7 +22,8 @@ std::vector<std::string> baseFields {
   "max_buckets",
   "data"};
 
-std::vector< std::pair<std::string, std::string> > build_attrs(rgw::sal::Attrs* binary) {
+std::vector< std::pair<std::string, std::string> > build_attrs(rgw::sal::Attrs* binary) 
+{
   std::vector< std::pair<std::string, std::string> > values;
   rgw::sal::Attrs::iterator attrs;
 
@@ -35,16 +37,37 @@ std::vector< std::pair<std::string, std::string> > build_attrs(rgw::sal::Attrs*
   return values;
 }
 
-int RedisDriver::insert_entry(const DoutPrefixProvider* dpp, std::string key, off_t offset, uint64_t len) {
+int RedisDriver::find_client(const DoutPrefixProvider* dpp) 
+{
+  if (client.is_connected())
+    return 0;
+
+  if (addr.host == "" || addr.port == 0) { 
+    dout(10) << "RGW Redis Cache: Redis cache endpoint was not configured correctly" << dendl;
+    return EDESTADDRREQ;
+  }
+
+  client.connect(addr.host, addr.port, nullptr);
+
+  if (!client.is_connected())
+    return ECONNREFUSED;
+
+  return 0;
+}
+
+int RedisDriver::insert_entry(const DoutPrefixProvider* dpp, std::string key, off_t offset, uint64_t len) 
+{
   auto ret = entries.emplace(key, Entry(key, offset, len));
   return ret.second;
 }
 
-int RedisDriver::remove_entry(const DoutPrefixProvider* dpp, std::string key) {
+int RedisDriver::remove_entry(const DoutPrefixProvider* dpp, std::string key) 
+{
   return entries.erase(key);
 }
 
-std::optional<Entry> RedisDriver::get_entry(const DoutPrefixProvider* dpp, std::string key) {
+std::optional<Entry> RedisDriver::get_entry(const DoutPrefixProvider* dpp, std::string key) 
+{
   auto iter = entries.find(key);
 
   if (iter != entries.end()) {
@@ -54,7 +77,23 @@ std::optional<Entry> RedisDriver::get_entry(const DoutPrefixProvider* dpp, std::
   return std::nullopt;
 }
 
-int RedisDriver::initialize(CephContext* cct, const DoutPrefixProvider* dpp) {
+/* Currently an attribute but will also be part of the Entry metadata once consistency is guaranteed -Sam
+int RedisDriver::update_local_weight(const DoutPrefixProvider* dpp, std::string key, int localWeight) 
+{
+  auto iter = entries.find(key);
+
+  if (iter != entries.end()) {
+    iter->second.localWeight = localWeight;
+    return 0;
+  }
+
+  return -1;
+}
+*/
+
+int RedisDriver::initialize(CephContext* cct, const DoutPrefixProvider* dpp) 
+{
+  this->cct = cct;
   addr.host = cct->_conf->rgw_d4n_host; // change later -Sam
   addr.port = cct->_conf->rgw_d4n_port;
 
@@ -71,166 +110,8 @@ int RedisDriver::initialize(CephContext* cct, const DoutPrefixProvider* dpp) {
   return 0;
 }
 
-bool RedisDriver::key_exists(const DoutPrefixProvider* dpp, const std::string& key) {
-  int result = -1;
-  std::string entryName = "rgw-object:" + key + ":cache";
-  std::vector<std::string> keys;
-  keys.push_back(entryName);
-
-  if (!client.is_connected()) 
-    return ECONNREFUSED;
-
-  try {
-    client.exists(keys, [&result](cpp_redis::reply &reply) {
-      if (reply.is_integer()) {
-        result = reply.as_integer();
-      }
-    });
-
-    client.sync_commit(std::chrono::milliseconds(1000));
-  } catch(std::exception &e) {}
-
-  return result;
-}
-
-std::vector<Entry> RedisDriver::list_entries(const DoutPrefixProvider* dpp) {
-  std::vector<std::string> keys;
-  std::vector<Entry> entries;
-
-  if (!client.is_connected()) 
-    return {};
-
-  try {
-    size_t cursor = 0;
-    const std::string pattern = "*:cache";
-
-    do {
-      auto reply = client.scan(cursor, pattern); 
-      client.sync_commit(std::chrono::milliseconds(1000));
-
-      auto arr = reply.get().as_array();
-      cursor = std::stoi(arr[0].as_string());
-      auto result = arr[1].as_array();
-  
-      for (auto it = result.begin(); it != result.end(); ++it) {
-        int i = std::distance(result.begin(), it);
-       std::string entryName = result[i].as_string();
-       keys.push_back(entryName.substr(11, entryName.length() - 17));
-      }
-    } while (cursor != 0);
-  } catch(std::exception &e) {
-    return {};
-  }
-
-  /* Construct list of entries */
-  for (auto it = keys.begin(); it != keys.end(); ++it) {
-    Entry entry;
-
-    if (key_exists(dpp, *it)) {
-      try {
-       std::vector<std::string> fields;
-        std::string entryName = "rgw-object:" + *it + ":cache";
-
-       entry.key = *it;
-       fields.push_back("offset");
-       fields.push_back("len");
-       fields.push_back("localWeight");
-
-       client.hmget(entryName, fields, [&entry](cpp_redis::reply &reply) {
-         if (reply.is_array()) {
-           auto arr = reply.as_array();
-      
-           if (!arr[0].is_null()) {
-             entry.offset = std::stol(arr[0].as_string().c_str());
-             entry.len = std::stoi(arr[1].as_string());
-             entry.localWeight = std::stoi(arr[2].as_string());
-           }
-         }
-       });
-
-       client.sync_commit(std::chrono::milliseconds(1000));
-      } catch(std::exception &e) {
-       return {}; // return failure or skip entry? -Sam
-      }
-    } else { // if one entry isn't found, shoud entire operation return a failure? -Sam
-      dout(20) << "RGW Redis Cache: Entry " << *it << " was not retrievable." << dendl;
-    }
-
-    entries.push_back(entry);
-  } 
-
-  return entries;
-}
-
-size_t RedisDriver::get_num_entries(const DoutPrefixProvider* dpp) {
-  int result = -1;
-
-  if (!client.is_connected()) 
-    return ECONNREFUSED;
-
-  try {
-    client.keys(":cache", [&result](cpp_redis::reply &reply) {
-      if (!reply.is_null()) {
-        result = reply.as_integer();
-      }
-    });
-
-    client.sync_commit(std::chrono::milliseconds(1000));
-
-    if (result < 0) {
-      return -1;
-    }
-  } catch(std::exception &e) {
-    return -1;
-  }
-
-  return result;
-}
-
-Partition RedisDriver::get_current_partition_info(const DoutPrefixProvider* dpp) {
-  Partition part;
-  return part; // Implement -Sam
-}
-
-uint64_t RedisDriver::get_free_space(const DoutPrefixProvider* dpp) {
-  int result = -1;
-
-  if (!client.is_connected()) 
-    return ECONNREFUSED;
-
-  try {
-    client.info([&result](cpp_redis::reply &reply) {
-      if (!reply.is_null()) {
-        int usedMem = -1;
-       int maxMem = -1;
-
-        std::istringstream iss(reply.as_string());
-       std::string line;    
-        while (std::getline(iss, line)) {
-         size_t pos = line.find_first_of(":");
-         if (pos != std::string::npos) {
-           if (line.substr(0, pos) == "used_memory") {
-             usedMem = std::stoi(line.substr(pos + 1, line.length() - pos - 2));
-           } else if (line.substr(0, line.find_first_of(":")) == "maxmemory") {
-             maxMem = std::stoi(line.substr(pos + 1, line.length() - pos - 2));
-           } 
-         }
-        }
-
-       if (usedMem > -1 && maxMem > -1)
-         result = maxMem - usedMem;
-      }
-    });
-
-    client.sync_commit(std::chrono::milliseconds(1000));
-  } catch(std::exception &e) {
-    return -1;
-  }
-
-  return result;
-}
-
-int RedisDriver::put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) {
+int RedisDriver::put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) 
+{
   std::string entryName = "rgw-object:" + key + ":cache";
 
   if (!client.is_connected()) 
@@ -249,7 +130,7 @@ int RedisDriver::put(const DoutPrefixProvider* dpp, const std::string& key, buff
 
     client.sync_commit(std::chrono::milliseconds(1000));
 
-    if (result != 0) {
+    if (result <= 0) {
       return -1;
     }
   } catch(std::exception &e) {
@@ -279,7 +160,8 @@ int RedisDriver::put(const DoutPrefixProvider* dpp, const std::string& key, buff
   return 0;
 }
 
-int RedisDriver::get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs) { // for whole objects? -Sam
+int RedisDriver::get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs) 
+{
   std::string result;
   std::string entryName = "rgw-object:" + key + ":cache";
   
@@ -324,7 +206,13 @@ int RedisDriver::get(const DoutPrefixProvider* dpp, const std::string& key, off_
   return 0;
 }
 
-int RedisDriver::append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data) {
+rgw::AioResultList RedisDriver::get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) 
+{
+  return {};
+}
+
+int RedisDriver::append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data) 
+{
   std::string result;
   std::string value = "";
   std::string entryName = "rgw-object:" + key + ":cache";
@@ -370,7 +258,8 @@ int RedisDriver::append_data(const DoutPrefixProvider* dpp, const::std::string&
   return 0;
 }
 
-int RedisDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string& key) {
+int RedisDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string& key) 
+{
   int result = 0;
   std::string entryName = "rgw-object:" + key + ":cache";
   std::vector<std::string> deleteField;
@@ -398,8 +287,8 @@ int RedisDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string&
   return result - 1;
 }
 
-int RedisDriver::get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) {
-  int exists = -2;
+int RedisDriver::get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) 
+{
   std::string result;
   std::string entryName = "rgw-object:" + key + ":cache";
 
@@ -441,7 +330,7 @@ int RedisDriver::get_attrs(const DoutPrefixProvider* dpp, const std::string& key
     }
 
     getFields.erase(std::find(getFields.begin(), getFields.end(), "data")); /* Do not query for data field */
-    
+    int exists = -1;
     /* Get attributes from cache */
     try {
       client.hmget(entryName, getFields, [&exists, &attrs, &getFields](cpp_redis::reply &reply) {
@@ -475,7 +364,8 @@ int RedisDriver::get_attrs(const DoutPrefixProvider* dpp, const std::string& key
   return 0;
 }
 
-int RedisDriver::set_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) {
+int RedisDriver::set_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) 
+{
   /* Creating the index based on oid */
   std::string entryName = "rgw-object:" + key + ":cache";
   std::string result;
@@ -509,7 +399,8 @@ int RedisDriver::set_attrs(const DoutPrefixProvider* dpp, const std::string& key
   return 0;
 }
 
-int RedisDriver::update_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) {
+int RedisDriver::update_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) 
+{
   std::string result;
   std::string entryName = "rgw-object:" + key + ":cache";
 
@@ -544,7 +435,8 @@ int RedisDriver::update_attrs(const DoutPrefixProvider* dpp, const std::string&
   return 0;
 }
 
-int RedisDriver::delete_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& del_attrs) {
+int RedisDriver::delete_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& del_attrs) 
+{
   int result = 0;
   std::string entryName = "rgw-object:" + key + ":cache";
 
@@ -605,7 +497,8 @@ int RedisDriver::delete_attrs(const DoutPrefixProvider* dpp, const std::string&
   return -2;
 }
 
-std::string RedisDriver::get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name) {
+std::string RedisDriver::get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name) 
+{
   int exists = -2;
   std::string result;
   std::string entryName = "rgw-object:" + key + ":cache";
@@ -658,7 +551,8 @@ std::string RedisDriver::get_attr(const DoutPrefixProvider* dpp, const std::stri
   return attrValue;
 }
 
-int RedisDriver::set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attrVal) {
+int RedisDriver::set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attrVal) 
+{
   /* Creating the index based on key */
   std::string entryName = "rgw-object:" + key + ":cache";
   int result = -1;
@@ -682,19 +576,185 @@ int RedisDriver::set_attr(const DoutPrefixProvider* dpp, const std::string& key,
   return result;
 }
 
-std::unique_ptr<CacheAioRequest> RedisDriver::get_cache_aio_request_ptr(const DoutPrefixProvider* dpp)
+std::unique_ptr<CacheAioRequest> RedisDriver::get_cache_aio_request_ptr(const DoutPrefixProvider* dpp) 
+{
+  return std::make_unique<RedisCacheAioRequest>(this);  
+}
+
+bool RedisDriver::key_exists(const DoutPrefixProvider* dpp, const std::string& key) 
+{
+  int result = -1;
+  std::string entryName = "rgw-object:" + key + ":cache";
+  std::vector<std::string> keys;
+  keys.push_back(entryName);
+
+  if (!client.is_connected()) 
+    find_client(dpp);
+
+  try {
+    client.exists(keys, [&result](cpp_redis::reply &reply) {
+      if (reply.is_integer()) {
+        result = reply.as_integer();
+      }
+    });
+
+    client.sync_commit(std::chrono::milliseconds(1000));
+  } catch(std::exception &e) {}
+
+  return result;
+}
+
+std::vector<Entry> RedisDriver::list_entries(const DoutPrefixProvider* dpp) 
+{
+  std::vector<Entry> result;
+
+  for (auto it = entries.begin(); it != entries.end(); ++it) { 
+    result.push_back(it->second);
+  }
+
+  return result;
+}
+
+size_t RedisDriver::get_num_entries(const DoutPrefixProvider* dpp) 
+{
+  return entries.size();
+}
+
+Partition RedisDriver::get_current_partition_info(const DoutPrefixProvider* dpp) 
+{
+  Partition part;
+  return part; // Implement -Sam
+}
+
+uint64_t RedisDriver::get_free_space(const DoutPrefixProvider* dpp) 
+{
+  int result = -1;
+
+  if (!client.is_connected()) 
+    find_client(dpp);
+
+  try {
+    client.info([&result](cpp_redis::reply &reply) {
+      if (!reply.is_null()) {
+        int usedMem = -1;
+       int maxMem = -1;
+
+        std::istringstream iss(reply.as_string());
+       std::string line;    
+        while (std::getline(iss, line)) {
+         size_t pos = line.find_first_of(":");
+         if (pos != std::string::npos) {
+           if (line.substr(0, pos) == "used_memory") {
+             usedMem = std::stoi(line.substr(pos + 1, line.length() - pos - 2));
+           } else if (line.substr(0, line.find_first_of(":")) == "maxmemory") {
+             maxMem = std::stoi(line.substr(pos + 1, line.length() - pos - 2));
+           } 
+         }
+        }
+
+       if (usedMem > -1 && maxMem > -1)
+         result = maxMem - usedMem;
+      }
+    });
+
+    client.sync_commit(std::chrono::milliseconds(1000));
+  } catch(std::exception &e) {
+    return -1;
+  }
+
+  return result;
+}
+
+int RedisDriver::AsyncReadOp::init(const DoutPrefixProvider *dpp, CephContext* cct, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg)
 {
-    return std::make_unique<RedisCacheAioRequest>(this);
+    ldpp_dout(dpp, 20) << "RedisCache: " << __func__ << "(): file_path=" << file_path << dendl;
+    aio_cb.reset(new struct aiocb);
+    memset(aio_cb.get(), 0, sizeof(struct aiocb));
+    aio_cb->aio_fildes = TEMP_FAILURE_RETRY(::open(file_path.c_str(), O_RDONLY|O_CLOEXEC|O_BINARY));
+
+    if (aio_cb->aio_fildes < 0) {
+        int err = errno;
+        ldpp_dout(dpp, 1) << "ERROR: RedisCache: " << __func__ << "(): can't open " << file_path << " : " << " error: " << err << dendl;
+        return -err;
+    }
+
+    if (cct->_conf->rgw_d3n_l1_fadvise != POSIX_FADV_NORMAL) {
+        posix_fadvise(aio_cb->aio_fildes, 0, 0, g_conf()->rgw_d3n_l1_fadvise);
+    }
+
+    bufferptr bp(read_len);
+    aio_cb->aio_buf = bp.c_str();
+    result.append(std::move(bp));
+
+    aio_cb->aio_nbytes = read_len;
+    aio_cb->aio_offset = read_ofs;
+    aio_cb->aio_sigevent.sigev_notify = SIGEV_THREAD;
+    aio_cb->aio_sigevent.sigev_notify_function = libaio_cb_aio_dispatch;
+    aio_cb->aio_sigevent.sigev_notify_attributes = nullptr;
+    aio_cb->aio_sigevent.sigev_value.sival_ptr = arg;
+
+    return 0;
 }
 
-rgw::AioResultList RedisDriver::get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id)
+void RedisDriver::AsyncReadOp::libaio_cb_aio_dispatch(sigval sigval)
 {
-    rgw_raw_obj r_obj;
-    r_obj.oid = key;
-    return aio->get(r_obj, rgw::Aio::cache_read_op(dpp, y, this, ofs, len, key), cost, id);
+    auto p = std::unique_ptr<Completion>{static_cast<Completion*>(sigval.sival_ptr)};
+    auto op = std::move(p->user_data);
+    const int ret = -aio_error(op.aio_cb.get());
+    boost::system::error_code ec;
+    if (ret < 0) {
+        ec.assign(-ret, boost::system::system_category());
+    }
+
+    ceph::async::dispatch(std::move(p), ec, std::move(op.result));
+}
+
+template <typename Executor1, typename CompletionHandler>
+auto RedisDriver::AsyncReadOp::create(const Executor1& ex1, CompletionHandler&& handler)
+{
+    auto p = Completion::create(ex1, std::move(handler));
+    return p;
 }
 
-void RedisCacheAioRequest::cache_aio_read(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, off_t ofs, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) {}
-void RedisCacheAioRequest::cache_aio_write(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, bufferlist& bl, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) {}
+template <typename ExecutionContext, typename CompletionToken>
+auto RedisDriver::get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key,
+                off_t read_ofs, off_t read_len, CompletionToken&& token)
+{
+    std::string location = "";//partition_info.location + key;
+    ldpp_dout(dpp, 20) << "RedisCache: " << __func__ << "(): location=" << location << dendl;
+
+    using Op = AsyncReadOp;
+    using Signature = typename Op::Signature;
+    boost::asio::async_completion<CompletionToken, Signature> init(token);
+    auto p = Op::create(ctx.get_executor(), init.completion_handler);
+    auto& op = p->user_data;
+
+    int ret = op.init(dpp, cct, location, read_ofs, read_len, p.get());
+    if(0 == ret) {
+        ret = ::aio_read(op.aio_cb.get());
+    }
+  //  ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): ::aio_read(), ret=" << ret << dendl;
+   /* if(ret < 0) {
+        auto ec = boost::system::error_code{-ret, boost::system::system_category()};
+        ceph::async::post(std::move(p), ec, bufferlist{});
+    } else {
+        (void)p.release();
+    }*/
+    //return init.result.get();
+}
+
+void RedisCacheAioRequest::cache_aio_read(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, off_t ofs, uint64_t len, rgw::Aio* aio, rgw::AioResult& r)
+{
+  using namespace boost::asio;
+  async_completion<yield_context, void()> init(y.get_yield_context());
+  auto ex = get_associated_executor(init.completion_handler);
+
+  ldpp_dout(dpp, 20) << "RedisCache: " << __func__ << "(): key=" << key << dendl;
+  cache_driver->get_async(dpp, y.get_io_context(), key, ofs, len, bind_executor(ex, RedisDriver::libaio_handler{aio, r}));
+}
+
+void RedisCacheAioRequest::cache_aio_write(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, bufferlist& bl, uint64_t len, rgw::Aio* aio, rgw::AioResult& r)
+{
+}
 
-} } // namespace rgw::cal
+} } // namespace rgw::cache
index af7c34a299ddc2ebc844b987934013111d484c11..1f8ea953b94b572e408fce1215a117c5e87e2b2d 100644 (file)
@@ -1,6 +1,8 @@
 #ifndef CEPH_REDISDRIVER_H
 #define CEPH_REDISDRIVER_H
 
+#include <aio.h>
+#include "common/async/completion.h"
 #include <string>
 #include <iostream>
 #include <cpp_redis/cpp_redis>
@@ -13,26 +15,16 @@ namespace rgw { namespace cache {
 class RedisDriver;
 
 class RedisCacheAioRequest: public CacheAioRequest {
-public:
-  RedisCacheAioRequest(RedisDriver* cache_driver) : cache_driver(cache_driver) {}
-  virtual ~RedisCacheAioRequest() = default;
-  virtual void cache_aio_read(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, off_t ofs, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) override;
-  virtual void cache_aio_write(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, bufferlist& bl, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) override;
-private:
-  RedisDriver* cache_driver;
+  public:
+    RedisCacheAioRequest(RedisDriver* cache_driver) : cache_driver(cache_driver) {}
+    virtual ~RedisCacheAioRequest() = default;
+    virtual void cache_aio_read(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, off_t ofs, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) override;
+    virtual void cache_aio_write(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, bufferlist& bl, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) override;
+  private:
+    RedisDriver* cache_driver;
 };
 
 class RedisDriver : public CacheDriver {
-  private:
-    cpp_redis::client client;
-    rgw::d4n::Address addr;
-    std::unordered_map<std::string, Entry> entries;
-
-    int find_client(const DoutPrefixProvider* dpp);
-    int insert_entry(const DoutPrefixProvider* dpp, std::string key, off_t offset, uint64_t len);
-    int remove_entry(const DoutPrefixProvider* dpp, std::string key);
-    std::optional<Entry> get_entry(const DoutPrefixProvider* dpp, std::string key);
-
   public:
     RedisDriver() : CacheDriver() {}
 
@@ -60,6 +52,58 @@ class RedisDriver : public CacheDriver {
     virtual uint64_t get_free_space(const DoutPrefixProvider* dpp) override;
 
     virtual std::unique_ptr<CacheAioRequest> get_cache_aio_request_ptr(const DoutPrefixProvider* dpp) override;
+  
+    struct libaio_handler { // should this be the same as SSDDriver? -Sam
+      rgw::Aio* throttle = nullptr;
+      rgw::AioResult& r;
+
+      // read callback
+      void operator()(boost::system::error_code ec, bufferlist bl) const {
+       r.result = -ec.value();
+       r.data = std::move(bl);
+       throttle->put(r);
+      }
+    };
+    template <typename ExecutionContext, typename CompletionToken>
+    auto get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key,
+                  off_t read_ofs, off_t read_len, CompletionToken&& token);
+
+  private:
+    cpp_redis::client client;
+    rgw::d4n::Address addr;
+    std::unordered_map<std::string, Entry> entries;
+    CephContext* cct;
+
+    int find_client(const DoutPrefixProvider* dpp);
+    int insert_entry(const DoutPrefixProvider* dpp, std::string key, off_t offset, uint64_t len);
+    int remove_entry(const DoutPrefixProvider* dpp, std::string key);
+    std::optional<Entry> get_entry(const DoutPrefixProvider* dpp, std::string key);
+
+    // unique_ptr with custom deleter for struct aiocb
+    struct libaio_aiocb_deleter {
+      void operator()(struct aiocb* c) {
+        if(c->aio_fildes > 0) {
+          if( ::close(c->aio_fildes) != 0) {
+          }
+        }
+        delete c;
+      }
+    };
+
+    using unique_aio_cb_ptr = std::unique_ptr<struct aiocb, libaio_aiocb_deleter>;
+
+    struct AsyncReadOp {
+      bufferlist result;
+      unique_aio_cb_ptr aio_cb;
+      using Signature = void(boost::system::error_code, bufferlist);
+      using Completion = ceph::async::Completion<Signature, AsyncReadOp>;
+
+      int init(const DoutPrefixProvider *dpp, CephContext* cct, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg);
+      static void libaio_cb_aio_dispatch(sigval sigval);
+
+      template <typename Executor1, typename CompletionHandler>
+      static auto create(const Executor1& ex1, CompletionHandler&& handler);
+    };
 };
 
 } } // namespace rgw::cal