]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw/d4n: this commit squashes the following commits related to
authorSamarah <samarah.uriarte@ibm.com>
Mon, 25 Sep 2023 14:49:43 +0000 (14:49 +0000)
committerPritha Srivastava <prsrivas@redhat.com>
Tue, 2 Apr 2024 15:54:51 +0000 (21:24 +0530)
clean up and removal of cpp_redis.

d4n/filter: Add `optional_yield` to CacheDriver calls in D4N Filter and
make minor updates to several filter methods
rgw/d4n: fix compilation issue.
rgw/cache: Add `del` method to CacheDriver and SSDDriver
cmake/d4n: Remove unnecessary D4N lines
rgw: Add `io_context` to D4N Filter and RedisDriver, remove `cpp_redis`
library from RedisDriver, and perform minor cleanup
d4n: Remove `cpp_redis` library from D4N directory and policy; update calls in filter; move Entry struct to base CachePolicy class
build/cpp_redis: Remove `cpp_redis` library
rgw/d4n: including <boost/asio/detached.hpp> wheerever needed.
rgw/d4n : fixes to d4n filter, policy, directory and redis driver files
for compilation errors.

Signed-off-by: Samarah <samarah.uriarte@ibm.com>
Signed-off-by: Casey Bodley <cbodley@redhat.com>
Co-authored-by: Pritha Srivastava <prsrivas@redhat.com>
15 files changed:
.gitmodules
src/CMakeLists.txt
src/cpp_redis [deleted submodule]
src/rgw/CMakeLists.txt
src/rgw/driver/d4n/d4n_directory.cc
src/rgw/driver/d4n/d4n_directory.h
src/rgw/driver/d4n/d4n_policy.cc
src/rgw/driver/d4n/d4n_policy.h
src/rgw/driver/d4n/rgw_sal_d4n.cc
src/rgw/driver/d4n/rgw_sal_d4n.h
src/rgw/rgw_cache_driver.h
src/rgw/rgw_redis_driver.cc
src/rgw/rgw_redis_driver.h
src/rgw/rgw_sal.cc
src/rgw/rgw_ssd_driver.h

index 341a8246ece9efbcd6cf49764e59a39c10e56d58..b89bc7e7b28d18e1b9de0ce29f7828f229827355 100644 (file)
@@ -59,9 +59,6 @@
 [submodule "s3select"]
        path = src/s3select
        url = https://github.com/ceph/s3select.git
-[submodule "src/cpp_redis"]
-       path = src/cpp_redis
-       url = https://github.com/ceph/cpp_redis.git
 [submodule "src/libkmip"]
        path = src/libkmip
        url = https://github.com/ceph/libkmip
index da7c5ea9019d07acc1eec47e9cd93eaf697babc2..73fa27ae34e9e7fafcb948ccf1c84e618da57916 100644 (file)
@@ -293,10 +293,6 @@ if(WITH_CEPHFS_JAVA)
   add_subdirectory(java)
 endif()
 
-if(WITH_RADOSGW_D4N)
-  add_subdirectory(cpp_redis) # remove later -Sam
-endif()
-
 if (WITH_BLKIN)
   add_subdirectory(blkin/blkin-lib)
 endif(WITH_BLKIN)
diff --git a/src/cpp_redis b/src/cpp_redis
deleted file mode 160000 (submodule)
index 72d992f..0000000
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit 72d992fff2a95edb37430a75909a844637549331
index 8ad74d1a1cabb8d4393abdb5df6ad77eb3d54c52..afd114f191d1d3f10f0dd215698bb7c46cb630e7 100644 (file)
@@ -202,7 +202,6 @@ set(librgw_common_srcs
   driver/rados/topic.cc)
 
 list(APPEND librgw_common_srcs
-  driver/d4n/d4n_directory.cc
   driver/immutable_config/store.cc
   driver/json_config/store.cc
   driver/rados/config/impl.cc
@@ -229,10 +228,6 @@ endif()
 if(WITH_RADOSGW_DAOS)
   list(APPEND librgw_common_srcs driver/motr/rgw_sal_daos.cc)
 endif()
-if(WITH_RADOSGW_D4N)
-  list(APPEND librgw_common_srcs driver/d4n/d4n_directory.cc)
-  list(APPEND librgw_common_srcs driver/d4n/rgw_sal_d4n.cc)
-endif()
 if(WITH_RADOSGW_POSIX)
   #add_subdirectory(driver/posix)
   find_package(LMDB REQUIRED)
@@ -297,11 +292,6 @@ target_include_directories(rgw_common
   PUBLIC "${LUA_INCLUDE_DIR}")
 
 if(WITH_RADOSGW_D4N)
-  add_dependencies(rgw_common cpp_redis) # remove later -Sam
-  target_link_libraries(rgw_common PRIVATE cpp_redis)
-  target_include_directories(rgw_common SYSTEM PUBLIC "${CMAKE_SOURCE_DIR}/src/cpp_redis/includes")
-  target_include_directories(rgw_common SYSTEM PUBLIC "${CMAKE_SOURCE_DIR}/src/cpp_redis/tacopie/includes")
-
   target_include_directories(rgw_common SYSTEM PUBLIC "${CMAKE_SOURCE_DIR}/src/boost_redis/include")
 endif()
 
index 9bd812a9245a3d291de4eaba88dc3c53cfac5522..edeb69ff3f7e60d98f028f52ffe904aac644113b 100644 (file)
+#include <boost/asio/consign.hpp>
+#include "common/async/blocked_completion.h"
 #include "d4n_directory.h"
-#include <time.h>
-
-#define dout_subsys ceph_subsys_rgw
-#define dout_context g_ceph_context
 
 namespace rgw { namespace d4n {
 
-int ObjectDirectory::find_client(cpp_redis::client* client) {
-  if (client->is_connected())
-    return 0;
-
-   if (addr.host == "" || addr.port == 0) {
-    dout(10) << "RGW D4N Directory: D4N directory endpoint was not configured correctly" << dendl;
-    return EDESTADDRREQ;
-  }
+// initiate a call to async_exec() on the connection's executor
+struct initiate_exec {
+  std::shared_ptr<boost::redis::connection> conn;
+  boost::redis::request req;
 
-  client->connect(addr.host, addr.port, nullptr);
+  using executor_type = boost::redis::connection::executor_type;
+  executor_type get_executor() const noexcept { return conn->get_executor(); }
 
-  if (!client->is_connected())
-    return ECONNREFUSED;
+  template <typename Handler, typename Response>
+  void operator()(Handler handler, Response& resp)
+  {
+    conn->async_exec(req, resp, boost::asio::consign(std::move(handler), conn));
+  }
+};
+
+template <typename Response, typename CompletionToken>
+auto async_exec(std::shared_ptr<connection> conn,
+                const boost::redis::request& req,
+                Response& resp, CompletionToken&& token)
+{
+  return boost::asio::async_initiate<CompletionToken,
+         void(boost::system::error_code, std::size_t)>(
+      initiate_exec{std::move(conn), req}, token, resp);
+}
 
-  return 0;
+template <typename T>
+void redis_exec(std::shared_ptr<connection> conn,
+                boost::system::error_code& ec,
+                const boost::redis::request& req,
+                boost::redis::response<T>& resp, optional_yield y)
+{
+  if (y) {
+    auto yield = y.get_yield_context();
+    async_exec(std::move(conn), req, resp, yield[ec]);
+  } else {
+    async_exec(std::move(conn), req, resp, ceph::async::use_blocked[ec]);
+  }
 }
 
 std::string ObjectDirectory::build_index(CacheObj* object) {
   return object->bucketName + "_" + object->objName;
 }
 
-int ObjectDirectory::exist_key(std::string key) {
+int ObjectDirectory::exist_key(std::string key, optional_yield y) {
   int result = 0;
   std::vector<std::string> keys;
   keys.push_back(key);
-  
+#if 0
   if (!client.is_connected()) {
     return result;
   }
-
+#endif
+  response<int> resp;
   try {
-    client.exists(keys, [&result](cpp_redis::reply &reply) {
-      if (reply.is_integer()) {
-        result = reply.as_integer();
-      }
-    });
-    
-    client.sync_commit(std::chrono::milliseconds(1000));
+    boost::system::error_code ec;
+    request req;
+    req.push("EXISTS", key);
+
+    redis_exec(conn, ec, req, resp, y);
+
+    if ((bool)ec)
+      return false;
   } catch(std::exception &e) {}
 
-  return result;
+  return std::get<0>(resp).value();
 }
 
-int ObjectDirectory::set_value(CacheObj* object) {
-  /* Creating the index based on objName */
-  std::string result;
-  std::string key = build_index(object);
-  if (!client.is_connected()) { 
-    find_client(&client);
-  }
+void ObjectDirectory::shutdown() // generalize -Sam
+{
+  // call cancel() on the connection's executor
+  boost::asio::dispatch(conn->get_executor(), [c = conn] { c->cancel(); });
+}
 
-  /* Every set will be new */
-  if (addr.host == "" || addr.port == 0) {
-    dout(10) << "RGW D4N Directory: Directory endpoint not configured correctly" << dendl;
-    return -2;
-  }
+int ObjectDirectory::set(CacheObj* object, optional_yield y) {
+  std::string key = build_index(object);
     
-  std::string endpoint = addr.host + ":" + std::to_string(addr.port);
-  std::vector< std::pair<std::string, std::string> > list;
+  /* Every set will be treated as new */ // or maybe, if key exists, simply return? -Sam
+  std::string endpoint = cct->_conf->rgw_d4n_host + ":" + std::to_string(cct->_conf->rgw_d4n_port);
+  std::list<std::string> redisValues;
     
-  /* Creating a list of the entry's properties */
-  list.push_back(make_pair("key", key));
-  list.push_back(make_pair("objName", object->objName));
-  list.push_back(make_pair("bucketName", object->bucketName));
-  list.push_back(make_pair("creationTime", std::to_string(object->creationTime)));
-  list.push_back(make_pair("dirty", std::to_string(object->dirty)));
-  list.push_back(make_pair("hosts", endpoint)); 
+  /* Creating a redisValues of the entry's properties */
+  redisValues.push_back("objName");
+  redisValues.push_back(object->objName);
+  redisValues.push_back("bucketName");
+  redisValues.push_back(object->bucketName);
+  redisValues.push_back("creationTime");
+  redisValues.push_back(std::to_string(object->creationTime)); 
+  redisValues.push_back("dirty");
+  redisValues.push_back(std::to_string(object->dirty));
+  redisValues.push_back("objHosts");
+  redisValues.push_back(endpoint); // Set in filter -Sam
 
   try {
-    client.hmset(key, list, [&result](cpp_redis::reply &reply) {
-      if (!reply.is_null()) {
-        result = reply.as_string();
-      }
-    });
+    boost::system::error_code ec;
+    request req;
+    req.push_range("HMSET", key, redisValues);
+    response<std::string> resp;
 
-    client.sync_commit(std::chrono::milliseconds(1000));
+    redis_exec(conn, ec, req, resp, y);
 
-    if (result != "OK") {
+    if (std::get<0>(resp).value() != "OK" || (bool)ec) {
       return -1;
     }
   } catch(std::exception &e) {
@@ -93,15 +114,14 @@ int ObjectDirectory::set_value(CacheObj* object) {
   return 0;
 }
 
-int ObjectDirectory::get_value(CacheObj* object) {
-  int keyExist = -2;
+int ObjectDirectory::get(CacheObj* object, optional_yield y) {
   std::string key = build_index(object);
-
+#if 0
   if (!client.is_connected()) {
     find_client(&client);
   }
-
-  if (exist_key(key)) {
+#endif
+  if (exist_key(key, y)) {
     std::string key;
     std::string objName;
     std::string bucketName;
@@ -110,74 +130,88 @@ int ObjectDirectory::get_value(CacheObj* object) {
     std::string hosts;
     std::vector<std::string> fields;
 
-    fields.push_back("key");
     fields.push_back("objName");
     fields.push_back("bucketName");
     fields.push_back("creationTime");
     fields.push_back("dirty");
-    fields.push_back("hosts");
+    fields.push_back("objHosts");
 
     try {
-      client.hmget(key, fields, [&key, &objName, &bucketName, &creationTime, &dirty, &hosts, &keyExist](cpp_redis::reply &reply) {
-        if (reply.is_array()) {
-         auto arr = reply.as_array();
-
-         if (!arr[0].is_null()) {
-           keyExist = 0;
-           key = arr[0].as_string();
-           objName = arr[1].as_string();
-           bucketName = arr[2].as_string();
-           creationTime = arr[3].as_string();
-           dirty = arr[4].as_string();
-           hosts = arr[5].as_string();
-         }
-       }
-      });
+      boost::system::error_code ec;
+      request req;
+      req.push_range("HMGET", key, fields);
+      response< std::vector<std::string> > resp;
 
-      client.sync_commit(std::chrono::milliseconds(1000));
+      redis_exec(conn, ec, req, resp, y);
 
-      if (keyExist < 0) {
-        return keyExist;
+      if (!std::get<0>(resp).value().size() || (bool)ec) {
+       return -1;
       }
 
-      /* Currently, there can only be one host */
-      object->objName = objName;
-      object->bucketName = bucketName;
+      object->objName = std::get<0>(resp).value()[0];
+      object->bucketName = std::get<0>(resp).value()[1];
+      object->creationTime = boost::lexical_cast<time_t>(std::get<0>(resp).value()[2]);
+      object->dirty = boost::lexical_cast<bool>(std::get<0>(resp).value()[3]);
 
-      struct std::tm tm;
-      std::istringstream(creationTime) >> std::get_time(&tm, "%T");
-      strptime(creationTime.c_str(), "%T", &tm); // Need to check formatting -Sam
-      object->creationTime = mktime(&tm);
+      {
+        std::stringstream ss(boost::lexical_cast<std::string>(std::get<0>(resp).value()[4]));
 
-      std::istringstream(dirty) >> object->dirty;
+       while (!ss.eof()) {
+          std::string host;
+         std::getline(ss, host, '_');
+         object->hostsList.push_back(host);
+       }
+      }
     } catch(std::exception &e) {
-      keyExist = -1;
+      return -1;
     }
+  } else {
+    return -2;
   }
 
-  return keyExist;
+  return 0;
 }
 
-int ObjectDirectory::del_value(CacheObj* object) {
-  int result = 0;
-  std::vector<std::string> keys;
+int ObjectDirectory::copy(CacheObj* object, std::string copyName, std::string copyBucketName, optional_yield y) {
   std::string key = build_index(object);
+  std::vector<std::string> keys;
   keys.push_back(key);
-  
+  std::string copyKey;
+#if 0 
   if (!client.is_connected()) {
     find_client(&client);
   }
-  
-  if (exist_key(key)) {
+#endif
+  if (exist_key(key, y)) {
     try {
-      client.del(keys, [&result](cpp_redis::reply &reply) {
-        if (reply.is_integer()) {
-          result = reply.as_integer();
+      response<int> resp;
+     
+      {
+       boost::system::error_code ec;
+       request req;
+       req.push("COPY", key, copyKey);
+
+       redis_exec(conn, ec, req, resp, y);
+
+       if ((bool)ec) {
+         return -1;
+       }
+      }
+
+      {
+       boost::system::error_code ec;
+       request req;
+       req.push("HMSET", copyKey, "objName", copyName, "bucketName", copyBucketName);
+       response<std::string> res;
+
+       redis_exec(conn, ec, req, res, y);
+
+        if (std::get<0>(res).value() != "OK" || (bool)ec) {
+          return -1;
         }
-      });
-       
-      client.sync_commit(std::chrono::milliseconds(1000));     
-      return result - 1;
+      }
+
+      return std::get<0>(resp).value() - 1; 
     } catch(std::exception &e) {
       return -1;
     }
@@ -186,84 +220,96 @@ int ObjectDirectory::del_value(CacheObj* object) {
   }
 }
 
-int BlockDirectory::find_client(cpp_redis::client* client) {
-  if (client->is_connected())
-    return 0;
+int ObjectDirectory::del(CacheObj* object, optional_yield y) {
+  std::string key = build_index(object);
 
-   if (addr.host == "" || addr.port == 0) {
-    dout(10) << "RGW D4N Directory: D4N directory endpoint was not configured correctly" << dendl;
-    return EDESTADDRREQ;
-  }
+  if (exist_key(key, y)) {
+    try {
+      boost::system::error_code ec;
+      request req;
+      req.push("DEL", key);
+      response<int> resp;
 
-  client->connect(addr.host, addr.port, nullptr);
+      redis_exec(conn, ec, req, resp, y);
 
-  if (!client->is_connected())
-    return ECONNREFUSED;
+      if ((bool)ec)
+        return -1;
 
-  return 0;
+      return std::get<0>(resp).value() - 1; 
+    } catch(std::exception &e) {
+      return -1;
+    }
+  } else {
+    return 0; /* No delete was necessary */
+  }
 }
 
 std::string BlockDirectory::build_index(CacheBlock* block) {
-  return block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + boost::lexical_cast<std::string>(block->blockId);
+  return block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + std::to_string(block->blockID);
 }
 
-int BlockDirectory::exist_key(std::string key) {
-  int result = 0;
-  std::vector<std::string> keys;
-  keys.push_back(key);
-  
-  if (!client.is_connected()) {
-    return result;
-  }
+int BlockDirectory::exist_key(std::string key, optional_yield y) {
+  response<int> resp;
 
   try {
-    client.exists(keys, [&result](cpp_redis::reply &reply) {
-      if (reply.is_integer()) {
-        result = reply.as_integer();
-      }
-    });
-    
-    client.sync_commit(std::chrono::milliseconds(1000));
+    boost::system::error_code ec;
+    request req;
+    req.push("EXISTS", key);
+
+    redis_exec(conn, ec, req, resp, y);
+
+    if ((bool)ec)
+      return false;
   } catch(std::exception &e) {}
 
-  return result;
+  return std::get<0>(resp).value();
 }
 
-int BlockDirectory::set_value(CacheBlock* block) {
-  /* Creating the index based on objName */
-  std::string result;
-  std::string key = build_index(block);
-  if (!client.is_connected()) { 
-    find_client(&client);
-  }
+void BlockDirectory::shutdown()
+{
+  // call cancel() on the connection's executor
+  boost::asio::dispatch(conn->get_executor(), [c = conn] { c->cancel(); });
+}
 
-  /* Every set will be new */
-  if (addr.host == "" || addr.port == 0) {
-    dout(10) << "RGW D4N Directory: Directory endpoint not configured correctly" << dendl;
-    return -2;
-  }
+int BlockDirectory::set(CacheBlock* block, optional_yield y) {
+  std::string key = build_index(block);
     
-  std::string endpoint = addr.host + ":" + std::to_string(addr.port);
-  std::vector< std::pair<std::string, std::string> > list;
+  /* Every set will be treated as new */ // or maybe, if key exists, simply return? -Sam
+  std::string endpoint = cct->_conf->rgw_d4n_host + ":" + std::to_string(cct->_conf->rgw_d4n_port);
+  std::list<std::string> redisValues;
     
-  /* Creating a list of the entry's properties */
-  list.push_back(make_pair("key", key));
-  list.push_back(make_pair("size", std::to_string(block->size)));
-  list.push_back(make_pair("globalWeight", std::to_string(block->globalWeight)));
-  list.push_back(make_pair("bucketName", block->cacheObj.bucketName));
-  list.push_back(make_pair("objName", block->cacheObj.objName));
-  list.push_back(make_pair("hosts", endpoint)); 
+  /* Creating a redisValues of the entry's properties */
+  redisValues.push_back("blockID");
+  redisValues.push_back(std::to_string(block->blockID));
+  redisValues.push_back("version");
+  redisValues.push_back(block->version);
+  redisValues.push_back("size");
+  redisValues.push_back(std::to_string(block->size));
+  redisValues.push_back("globalWeight");
+  redisValues.push_back(std::to_string(block->globalWeight));
+  redisValues.push_back("blockHosts");
+  redisValues.push_back(endpoint); // Set in filter -Sam
+
+  redisValues.push_back("objName");
+  redisValues.push_back(block->cacheObj.objName);
+  redisValues.push_back("bucketName");
+  redisValues.push_back(block->cacheObj.bucketName);
+  redisValues.push_back("creationTime");
+  redisValues.push_back(std::to_string(block->cacheObj.creationTime)); 
+  redisValues.push_back("dirty");
+  redisValues.push_back(std::to_string(block->cacheObj.dirty));
+  redisValues.push_back("objHosts");
+  redisValues.push_back(endpoint); // Set in filter -Sam
 
   try {
-    client.hmset(key, list, [&result](cpp_redis::reply &reply) {
-      if (!reply.is_null()) {
-        result = reply.as_string();
-      }
-    });
+    boost::system::error_code ec;
+    request req;
+    req.push_range("HMSET", key, redisValues);
+    response<std::string> resp;
 
-    client.sync_commit(std::chrono::milliseconds(1000));
+    redis_exec(conn, ec, req, resp, y);
 
-    if (result != "OK") {
+    if (std::get<0>(resp).value() != "OK" || (bool)ec) {
       return -1;
     }
   } catch(std::exception &e) {
@@ -273,81 +319,112 @@ int BlockDirectory::set_value(CacheBlock* block) {
   return 0;
 }
 
-int BlockDirectory::get_value(CacheBlock* block) {
-  int keyExist = -2;
+int BlockDirectory::get(CacheBlock* block, optional_yield y) {
   std::string key = build_index(block);
 
-  if (!client.is_connected()) {
-    find_client(&client);
-  }
-
-  if (exist_key(key)) {
-    std::string hosts;
-    std::string size;
-    std::string bucketName;
-    std::string objName;
+  if (exist_key(key, y)) {
     std::vector<std::string> fields;
 
-    fields.push_back("key");
-    fields.push_back("hosts");
+    fields.push_back("blockID");
+    fields.push_back("version");
     fields.push_back("size");
-    fields.push_back("bucketName");
+    fields.push_back("globalWeight");
+    fields.push_back("blockHosts");
+
     fields.push_back("objName");
+    fields.push_back("bucketName");
+    fields.push_back("creationTime");
+    fields.push_back("dirty");
+    fields.push_back("objHosts");
 
     try {
-      client.hmget(key, fields, [&key, &hosts, &size, &bucketName, &objName, &keyExist](cpp_redis::reply &reply) {
-        if (reply.is_array()) {
-         auto arr = reply.as_array();
-
-         if (!arr[0].is_null()) {
-           keyExist = 0;
-           key = arr[0].as_string();
-           hosts = arr[1].as_string();
-           size = arr[2].as_string();
-           bucketName = arr[3].as_string();
-           objName = arr[4].as_string();
-         }
-       }
-      });
+      boost::system::error_code ec;
+      request req;
+      req.push_range("HMGET", key, fields);
+      response< std::vector<std::string> > resp;
 
-      client.sync_commit(std::chrono::milliseconds(1000));
+      redis_exec(conn, ec, req, resp, y);
 
-      if (keyExist < 0 ) {
-        return keyExist;
+      if (!std::get<0>(resp).value().size() || (bool)ec) {
+       return -1;
       }
 
-      /* Currently, there can only be one host */ // update -Sam
-      block->size = std::stoi(size);
-      block->cacheObj.bucketName = bucketName;
-      block->cacheObj.objName = objName;
+      block->blockID = boost::lexical_cast<uint64_t>(std::get<0>(resp).value()[0]);
+      block->version = std::get<0>(resp).value()[1];
+      block->size = boost::lexical_cast<uint64_t>(std::get<0>(resp).value()[2]);
+      block->globalWeight = boost::lexical_cast<int>(std::get<0>(resp).value()[3]);
+
+      {
+        std::stringstream ss(boost::lexical_cast<std::string>(std::get<0>(resp).value()[4]));
+       block->hostsList.clear();
+
+       while (!ss.eof()) {
+          std::string host;
+         std::getline(ss, host, '_');
+         block->hostsList.push_back(host);
+       }
+      }
+
+      block->cacheObj.objName = std::get<0>(resp).value()[5];
+      block->cacheObj.bucketName = std::get<0>(resp).value()[6];
+      block->cacheObj.creationTime = boost::lexical_cast<time_t>(std::get<0>(resp).value()[7]);
+      block->cacheObj.dirty = boost::lexical_cast<bool>(std::get<0>(resp).value()[8]);
+
+      {
+        std::stringstream ss(boost::lexical_cast<std::string>(std::get<0>(resp).value()[9]));
+       block->cacheObj.hostsList.clear();
+
+       while (!ss.eof()) {
+          std::string host;
+         std::getline(ss, host, '_');
+         block->cacheObj.hostsList.push_back(host);
+       }
+      }
     } catch(std::exception &e) {
-      keyExist = -1;
+      return -1;
     }
+  } else {
+    return -2;
   }
 
-  return keyExist;
+  return 0;
 }
 
-int BlockDirectory::del_value(CacheBlock* block) {
-  int result = 0;
-  std::vector<std::string> keys;
+int BlockDirectory::copy(CacheBlock* block, std::string copyName, std::string copyBucketName, optional_yield y) {
   std::string key = build_index(block);
-  keys.push_back(key);
-  
-  if (!client.is_connected()) {
-    find_client(&client);
-  }
-  
-  if (exist_key(key)) {
+  auto copyBlock = CacheBlock{ .cacheObj = { .objName = copyName, .bucketName = copyBucketName }, .blockID = 0 };
+  std::string copyKey = build_index(&copyBlock);
+
+  if (exist_key(key, y)) {
     try {
-      client.del(keys, [&result](cpp_redis::reply &reply) {
-        if (reply.is_integer()) {
-          result = reply.as_integer();
+      response<int> resp;
+     
+      {
+       boost::system::error_code ec;
+       request req;
+       req.push("COPY", key, copyKey);
+
+       redis_exec(conn, ec, req, resp, y);
+
+       if ((bool)ec) {
+         return -1;
+       }
+      }
+
+      {
+       boost::system::error_code ec;
+       request req;
+       req.push("HMSET", copyKey, "objName", copyName, "bucketName", copyBucketName);
+       response<std::string> res;
+
+       redis_exec(conn, ec, req, res, y);
+
+        if (std::get<0>(res).value() != "OK" || (bool)ec) {
+          return -1;
         }
-      });
-       
-      client.sync_commit(std::chrono::milliseconds(1000));     
-      return result - 1;
+      }
+
+      return std::get<0>(resp).value() - 1; 
     } catch(std::exception &e) {
       return -1;
     }
@@ -356,66 +433,85 @@ int BlockDirectory::del_value(CacheBlock* block) {
   }
 }
 
-int BlockDirectory::update_field(CacheBlock* block, std::string field, std::string value) { // represent in cache block too -Sam
-  std::string result;
+int BlockDirectory::del(CacheBlock* block, optional_yield y) {
   std::string key = build_index(block);
 
-  if (!client.is_connected()) {
-    find_client(&client);
-  }
-  
-  if (exist_key(key)) {
-    if (field == "hostsList") {
-      /* Append rather than overwrite */
-      std::string hosts;
-
-      try {
-        client.hget(key, "hostsList", [&hosts](cpp_redis::reply& reply) {
-          if (!reply.is_null()) {
-            hosts = reply.as_string();
-          }
-        });
-
-        client.sync_commit(std::chrono::milliseconds(1000));
-      } catch(std::exception &e) {
+  if (exist_key(key, y)) {
+    try {
+      boost::system::error_code ec;
+      request req;
+      req.push("DEL", key);
+      response<int> resp;
+
+      redis_exec(conn, ec, req, resp, y);
+
+      if ((bool)ec)
         return -1;
-      }
-      
-      value += "_";
-      value += hosts;
-    }
 
-    /* Update cache block */ // Remove ones that aren't used -Sam
-    if (field == "size")
-      block->size = std::stoi(value);
-    else if (field == "bucketName")
-      block->cacheObj.bucketName = value;
-    else if (field == "objName")
-      block->cacheObj.objName = value;
-    else if (field == "hostsList")
-      block->hostsList.push_back(value);
+      return std::get<0>(resp).value() - 1; 
+    } catch(std::exception &e) {
+      return -1;
+    }
+  } else {
+    return 0; /* No delete was necessary */
+  }
+}
 
-    std::vector< std::pair<std::string, std::string> > list;
-    list.push_back(std::make_pair(field, value));
+int BlockDirectory::update_field(CacheBlock* block, std::string field, std::string value, optional_yield y) {
+  std::string key = build_index(block);
 
+  if (exist_key(key, y)) {
     try {
-      client.hmset(key, list, [&result](cpp_redis::reply &reply) {
-       if (!reply.is_null()) {
-         result = reply.as_string();
-       }
-      });
+      /* Ensure field exists */
+      {
+       boost::system::error_code ec;
+       request req;
+       req.push("HEXISTS", key, field);
+       response<int> resp;
 
-      client.sync_commit(std::chrono::milliseconds(1000));
+       redis_exec(conn, ec, req, resp, y);
 
-      if (result != "OK") {
-       return -1;
+       if (!std::get<0>(resp).value() || (bool)ec)
+         return -1;
+      }
+
+      if (field == "blockHosts" || field == "objHosts") {
+       /* Append rather than overwrite */
+       boost::system::error_code ec;
+       request req;
+       req.push("HGET", key, field);
+       response<std::string> resp;
+
+       redis_exec(conn, ec, req, resp, y);
+
+       if (!std::get<0>(resp).value().size() || (bool)ec)
+         return -1;
+
+       std::get<0>(resp).value() += "_";
+       std::get<0>(resp).value() += value;
+       value = std::get<0>(resp).value();
+      }
+
+      {
+       boost::system::error_code ec;
+       request req;
+       req.push_range("HSET", key, std::map<std::string, std::string>{{field, value}});
+       response<int> resp;
+
+       redis_exec(conn, ec, req, resp, y);
+
+       if ((bool)ec) {
+         return -1;
+       }
+
+       return std::get<0>(resp).value(); /* Zero fields added since it is an update of an existing field */ 
       }
     } catch(std::exception &e) {
       return -1;
     }
+  } else {
+    return -2;
   }
-
-  return 0;
 }
 
 } } // namespace rgw::d4n
index b4b90f5e571de6f27f76f8a6e46fc710c2706ac5..fc10754b06374ef29023032fb7b76cad9063dc5f 100644 (file)
@@ -1,17 +1,21 @@
 #pragma once
 
 #include "rgw_common.h"
-#include <cpp_redis/cpp_redis>
-#include <string>
-#include <iostream>
+
 #include <boost/lexical_cast.hpp>
+#include <boost/asio/detached.hpp>
+#include <boost/redis/connection.hpp>
+
+#define dout_subsys ceph_subsys_rgw
+#define dout_context g_ceph_context
 
 namespace rgw { namespace d4n {
 
-struct Address {
-  std::string host;
-  int port;
-};
+namespace net = boost::asio;
+using boost::redis::config;
+using boost::redis::connection;
+using boost::redis::request;
+using boost::redis::response;
 
 struct CacheObj {
   std::string objName; /* S3 object name */
@@ -23,75 +27,97 @@ struct CacheObj {
 
 struct CacheBlock {
   CacheObj cacheObj;
-  uint64_t blockId; /* block ID */
+  uint64_t blockID;
+  std::string version;
   uint64_t size; /* Block size in bytes */
-  int globalWeight = 0;
+  int globalWeight = 0; /* LFUDA policy variable */
   std::vector<std::string> hostsList; /* List of hostnames <ip:port> of block locations */
 };
 
 class Directory {
   public:
-    Directory() {}
     CephContext* cct;
+
+    Directory() {}
 };
 
 class ObjectDirectory: public Directory { // weave into write workflow -Sam
   public:
-    ObjectDirectory() {}
-    ObjectDirectory(std::string host, int port) {
-      addr.host = host;
-      addr.port = port;
+    ObjectDirectory(net::io_context& io_context) {
+      conn = std::make_shared<connection>(boost::asio::make_strand(io_context));
     }
-
-    void init(CephContext* _cct) {
-      cct = _cct;
-      addr.host = cct->_conf->rgw_d4n_host;
-      addr.port = cct->_conf->rgw_d4n_port;
+    ~ObjectDirectory() {
+      shutdown();
     }
 
-    int find_client(cpp_redis::client* client);
-    int exist_key(std::string key);
-    Address get_addr() { return addr; }
+    int init(CephContext* cct, const DoutPrefixProvider* dpp) {
+      this->cct = cct;
+
+      config cfg;
+      cfg.addr.host = cct->_conf->rgw_d4n_host; // same or different address from block directory? -Sam
+      cfg.addr.port = std::to_string(cct->_conf->rgw_d4n_port);
 
-    int set_value(CacheObj* object);
-    int get_value(CacheObj* object);
-    int copy_value(CacheObj* object, CacheObj* copyObject);
-    int del_value(CacheObj* object);
+      if (!cfg.addr.host.length() || !cfg.addr.port.length()) {
+       ldpp_dout(dpp, 10) << "D4N Directory " << __func__ << ": Object directory endpoint was not configured correctly" << dendl;
+       return -EDESTADDRREQ;
+      }
+      
+      conn->async_run(cfg, {}, net::consign(net::detached, conn)); 
+
+      return 0;
+    }
+    int exist_key(std::string key, optional_yield y);
+    void shutdown();
+
+    int set(CacheObj* object, optional_yield y);
+    int get(CacheObj* object, optional_yield y);
+    int copy(CacheObj* object, std::string copyName, std::string copyBucketName, optional_yield y);
+    int del(CacheObj* object, optional_yield y);
 
   private:
-    cpp_redis::client client;
-    Address addr;
+    std::shared_ptr<connection> conn;
+
     std::string build_index(CacheObj* object);
 };
 
 class BlockDirectory: public Directory {
   public:
-    BlockDirectory() {}
-    BlockDirectory(std::string host, int port) {
-      addr.host = host;
-      addr.port = port;
+    BlockDirectory(net::io_context& io_context) {
+      conn = std::make_shared<connection>(boost::asio::make_strand(io_context));
+    }
+    ~BlockDirectory() {
+      shutdown();
     }
     
-    void init(CephContext* _cct) {
-      cct = _cct;
-      addr.host = cct->_conf->rgw_d4n_host;
-      addr.port = cct->_conf->rgw_d4n_port;
+    int init(CephContext* cct, const DoutPrefixProvider* dpp) {
+      this->cct = cct;
+
+      config cfg;
+      cfg.addr.host = cct->_conf->rgw_d4n_host;
+      cfg.addr.port = std::to_string(cct->_conf->rgw_d4n_port);
+
+      if (!cfg.addr.host.length() || !cfg.addr.port.length()) { // add logs to other methods -Sam
+       ldpp_dout(dpp, 10) << "D4N Directory " << __func__ << ": Block directory endpoint was not configured correctly" << dendl;
+       return -EDESTADDRREQ;
+      }
+
+      conn->async_run(cfg, {}, net::consign(net::detached, conn)); 
+
+      return 0;
     }
        
-    int find_client(cpp_redis::client* client);
-    int exist_key(std::string key);
-    Address get_addr() { return addr; }
+    int exist_key(std::string key, optional_yield y);
+    void shutdown();
 
-    int set_value(CacheBlock* block);
-    int get_value(CacheBlock* block);
-    int copy_value(CacheBlock* block, CacheBlock* copyBlock);
-    int del_value(CacheBlock* block);
-
-    int update_field(CacheBlock* block, std::string field, std::string value);
+    int set(CacheBlock* block, optional_yield y);
+    int get(CacheBlock* block, optional_yield y);
+    int copy(CacheBlock* block, std::string copyName, std::string copyBucketName, optional_yield y);
+    int del(CacheBlock* block, optional_yield y);
+    int update_field(CacheBlock* block, std::string field, std::string value, optional_yield y);
 
   private:
-    cpp_redis::client client;
-    Address addr;
+    std::shared_ptr<connection> conn;
+
     std::string build_index(CacheBlock* block);
 };
 
index 5004a096420897d0932972ae700d3447616a72a6..987c7d394585e409092811653d81e42074121590 100644 (file)
 #include "../../../common/async/yield_context.h"
 #include "d4n_policy.h"
-
-#define dout_subsys ceph_subsys_rgw
-#define dout_context g_ceph_context
+#include "common/async/blocked_completion.h"
 
 namespace rgw { namespace d4n {
 
-int CachePolicy::find_client(const DoutPrefixProvider* dpp, cpp_redis::client* client) {
-  if (client->is_connected())
-    return 0;
-
-  if (get_addr().host == "" || get_addr().port == 0) {
-    ldpp_dout(dpp, 10) << "RGW D4N Policy: D4N policy endpoint was not configured correctly" << dendl;
-    return EDESTADDRREQ;
-  }
+// initiate a call to async_exec() on the connection's executor
+struct initiate_exec {
+  std::shared_ptr<boost::redis::connection> conn;
+  boost::redis::request req;
 
-  client->connect(get_addr().host, get_addr().port, nullptr);
+  using executor_type = boost::redis::connection::executor_type;
+  executor_type get_executor() const noexcept { return conn->get_executor(); }
 
-  if (!client->is_connected())
-    return ECONNREFUSED;
+  template <typename Handler, typename Response>
+  void operator()(Handler handler, Response& resp)
+  {
+    conn->async_exec(req, resp, boost::asio::consign(std::move(handler), conn));
+  }
+};
 
-  return 0;
+template <typename Response, typename CompletionToken>
+auto async_exec(std::shared_ptr<connection> conn,
+                const boost::redis::request& req,
+                Response& resp, CompletionToken&& token)
+{
+  return boost::asio::async_initiate<CompletionToken,
+         void(boost::system::error_code, std::size_t)>(
+      initiate_exec{std::move(conn), req}, token, resp);
 }
 
-int CachePolicy::exist_key(std::string key) {
-  int result = -1;
-  std::vector<std::string> keys;
-  keys.push_back(key);
-
-  if (!client.is_connected()) {
-    return result;
+template <typename T>
+void redis_exec(std::shared_ptr<connection> conn, boost::system::error_code& ec, boost::redis::request& req, boost::redis::response<T>& resp, optional_yield y)
+{
+  if (y) {
+    auto yield = y.get_yield_context();
+    async_exec(std::move(conn), req, resp, yield[ec]);
+  } else {
+    async_exec(std::move(conn), req, resp, ceph::async::use_blocked[ec]);
   }
+}
 
+int LFUDAPolicy::set_age(int age, optional_yield y) {
   try {
-    client.exists(keys, [&result](cpp_redis::reply &reply) {
-      if (reply.is_integer()) {
-        result = reply.as_integer();
-      }
-    });
-
-    client.sync_commit(std::chrono::milliseconds(1000));
-  } catch(std::exception &e) {}
+    boost::system::error_code ec;
+    response<int> resp;
+    request req;
+    req.push("HSET", "lfuda", "age", std::to_string(age));
 
-  return result;
-}
+    redis_exec(conn, ec, req, resp, y);
 
-int LFUDAPolicy::set_age(int age) {
-  int result = 0;
+    if (ec)
+      return {};
 
-  try {
-    client.hset("lfuda", "age", std::to_string(age), [&result](cpp_redis::reply& reply) {
-      if (!reply.is_null()) {
-       result = reply.as_integer(); 
-      }
-    }); 
-
-    client.sync_commit();
+    return std::get<0>(resp).value(); /* Returns number of fields set */
   } catch(std::exception &e) {
     return -1;
   }
-
-  return result - 1;
 }
 
-int LFUDAPolicy::get_age() {
-  int ret = 0;
-  int age = -1;
+int LFUDAPolicy::get_age(optional_yield y) {
+  response<int> resp;
 
   try {
-    client.hexists("lfuda", "age", [&ret](cpp_redis::reply& reply) {
-      if (!reply.is_null()) {
-        ret = reply.as_integer();
-      }
-    });
+    boost::system::error_code ec;
+    request req;
+    req.push("HEXISTS", "lfuda", "age");
 
-    client.sync_commit(std::chrono::milliseconds(1000));
+    redis_exec(conn, ec, req, resp, y);
+
+    if (ec)
+      return -1;
   } catch(std::exception &e) {
     return -1;
   }
 
-  if (!ret) {
-    ret = set_age(0); /* Initialize age */
-
-    if (!ret) {
-      return 0; /* Success */
-    } else {
+  if (!std::get<0>(resp).value()) {
+    if (!set_age(0, y)) /* Initialize age */
+      return 0;
+    else
       return -1;
-    };
   }
 
-  try {
-    client.hget("lfuda", "age", [&age](cpp_redis::reply& reply) {
-      if (!reply.is_null()) {
-        age = std::stoi(reply.as_string());
-      }
-    });
+  try { 
+    boost::system::error_code ec;
+    response<std::string> value;
+    request req;
+    req.push("HGET", "lfuda", "age");
+      
+    redis_exec(conn, ec, req, value, y);
+
+    if (ec)
+      return -1;
 
-    client.sync_commit(std::chrono::milliseconds(1000));
+    return std::stoi(std::get<0>(value).value());
   } catch(std::exception &e) {
     return -1;
   }
-
-  return age;
 }
 
-int LFUDAPolicy::set_global_weight(std::string key, int weight) {
-  int result = 0;
-
+int LFUDAPolicy::set_global_weight(std::string key, int weight, optional_yield y) {
   try {
-    client.hset(key, "globalWeight", std::to_string(weight), [&result](cpp_redis::reply& reply) {
-      if (!reply.is_null()) {
-       result = reply.as_integer();
-      }
-    }); 
+    boost::system::error_code ec;
+    response<int> resp;
+    request req;
+    req.push("HSET", key, "globalWeight", std::to_string(weight));
+
+    redis_exec(conn, ec, req, resp, y);
+
+    if (ec)
+      return {};
 
-    client.sync_commit();
+    return std::get<0>(resp).value(); /* Returns number of fields set */
   } catch(std::exception &e) {
     return -1;
   }
-
-  return result - 1;
 }
 
-int LFUDAPolicy::get_global_weight(std::string key) {
-  int weight = -1;
+int LFUDAPolicy::get_global_weight(std::string key, optional_yield y) {
+  try { 
+    boost::system::error_code ec;
+    response<std::string> resp;
+    request req;
+    req.push("HGET", key, "globalWeight");
+      
+    redis_exec(conn, ec, req, resp, y);
 
-  try {
-    client.hget(key, "globalWeight", [&weight](cpp_redis::reply& reply) {
-      if (!reply.is_null()) {
-       weight = reply.as_integer();
-      }
-    });
+    if (ec)
+      return -1;
 
-    client.sync_commit(std::chrono::milliseconds(1000));
+    return std::stoi(std::get<0>(resp).value());
   } catch(std::exception &e) {
     return -1;
   }
-
-  return weight;
 }
 
-int LFUDAPolicy::set_min_avg_weight(int weight, std::string cacheLocation) {
-  int result = 0;
-
+int LFUDAPolicy::set_min_avg_weight(size_t weight, std::string cacheLocation, optional_yield y) {
   try {
-    client.hset("lfuda", "minAvgWeight:cache", cacheLocation, [&result](cpp_redis::reply& reply) {
-      if (!reply.is_null()) {
-       result = reply.as_integer();
-      }
-    }); 
+    boost::system::error_code ec;
+    response<int> resp;
+    request req;
+    req.push("HSET", "lfuda", "minAvgWeight:cache", cacheLocation);
 
-    client.sync_commit();
+    redis_exec(conn, ec, req, resp, y);
+
+    if (ec)
+      return {};
   } catch(std::exception &e) {
     return -1;
   }
+  
+  try {
+    boost::system::error_code ec;
+    response<int> resp;
+    request req;
+    req.push("HSET", "lfuda", "minAvgWeight:weight", cacheLocation);
 
-  if (result == 1) {
-    result = 0;
-    try {
-      client.hset("lfuda", "minAvgWeight:weight", std::to_string(weight), [&result](cpp_redis::reply& reply) {
-       if (!reply.is_null()) {
-         result = reply.as_integer();
-       }
-      }); 
+    redis_exec(conn, ec, req, resp, y);
 
-      client.sync_commit();
-    } catch(std::exception &e) {
-      return -1;
-    }
-  }
+    if (ec)
+      return {};
 
-  return result - 1;
+    return std::get<0>(resp).value(); /* Returns number of fields set */
+  } catch(std::exception &e) {
+    return -1;
+  }
 }
 
-int LFUDAPolicy::get_min_avg_weight() {
-  int ret = 0;
-  int weight = -1;
+int LFUDAPolicy::get_min_avg_weight(optional_yield y) {
+  response<int> resp;
 
   try {
-    client.hexists("lfuda", "minAvgWeight:cache", [&ret](cpp_redis::reply& reply) {
-      if (!reply.is_null()) {
-        ret = reply.as_integer();
-      }
-    });
+    boost::system::error_code ec;
+    request req;
+    req.push("HEXISTS", "lfuda", "minAvgWeight:cache");
 
-    client.sync_commit(std::chrono::milliseconds(1000));
+    redis_exec(conn, ec, req, resp, y);
+
+    if (ec)
+      return -1;
   } catch(std::exception &e) {
     return -1;
   }
 
-  if (!ret) {
-    ret = set_min_avg_weight(INT_MAX, ""/* local cache location or keep empty? */); /* Initialize minimum average weight */
-
-    if (!ret) {
-      return INT_MAX; /* Success */
+  if (!std::get<0>(resp).value()) {
+    if (!set_min_avg_weight(INT_MAX, ""/* local cache location or keep empty? */, y)) { /* Initialize minimum average weight */
+      return INT_MAX;
     } else {
       return -1;
-    };
+    }
   }
 
-  try {
-    client.hget("lfuda", "minAvgWeight:weight", [&weight](cpp_redis::reply& reply) {
-      if (!reply.is_null()) {
-        weight = std::stoi(reply.as_string());
-      }
-    });
+  try { 
+    boost::system::error_code ec;
+    response<std::string> value;
+    request req;
+    req.push("HGET", "lfuda", "minAvgWeight:weight");
+      
+    redis_exec(conn, ec, req, value, y);
+
+    if (ec)
+      return -1;
 
-    client.sync_commit(std::chrono::milliseconds(1000));
+    return std::stoi(std::get<0>(value).value());
   } catch(std::exception &e) {
     return -1;
   }
-
-  return weight;
 }
 
-CacheBlock LFUDAPolicy::find_victim(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) {
+CacheBlock LFUDAPolicy::find_victim(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) {
   #if 0
   std::vector<rgw::cache::Entry> entries = cacheNode->list_entries(dpp);
   std::string victimName;
   int minWeight = INT_MAX;
 
   for (auto it = entries.begin(); it != entries.end(); ++it) {
-    optional_yield y = null_yield;
     std::string localWeightStr = cacheNode->get_attr(dpp, it->key, "localWeight", y); // should represent block -Sam
 
-    if (!std::stoi(localWeightStr)) { // maybe do this in some sort of initialization procedure instead of here? -Sam
-      /* Local weight hasn't been set */
-      int ret = cacheNode->set_attr(dpp, it->key, "localWeight", std::to_string(get_age())); 
+  /* Get victim cache block */
+  CacheBlock victim;
+  victim.cacheObj.objName = it->second->key;
+  victim.cacheObj.bucketName = cacheNode->get_attr(dpp, victim.cacheObj.objName, "bucket_name", y); // generalize for other cache backends -Sam
+  victim.blockID = 0; // find way to get ID -Sam 
 
       if (ret < 0)
        return {};
@@ -239,10 +233,10 @@ CacheBlock LFUDAPolicy::find_victim(const DoutPrefixProvider* dpp, rgw::cache::C
   /* Get victim cache block */
   CacheBlock victimBlock;
   victimBlock.cacheObj.objName = victimName;
-  BlockDirectory blockDir;
-  blockDir.init(cct);
+  BlockDirectory blockDir(io);
+  blockDir.init(cct, dpp);
 
-  int ret = blockDir.get_value(&victimBlock);
+  int ret = blockDir.get(&victimBlock, y);
 
   if (ret < 0)
     return {};
@@ -251,19 +245,32 @@ CacheBlock LFUDAPolicy::find_victim(const DoutPrefixProvider* dpp, rgw::cache::C
   return victimBlock;
 }
 
-int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) {
+int LFUDAPolicy::exist_key(std::string key, optional_yield y) {
+  response<int> resp;
+
+  try { 
+    boost::system::error_code ec;
+    request req;
+    req.push("EXISTS", key);
+  
+    redis_exec(conn, ec, req, resp, y);
+    
+    if (ec)
+      return false;
+  } catch(std::exception &e) {}
+
+  return std::get<0>(resp).value();
+}
+
+int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) {
   std::string key = "rgw-object:" + block->cacheObj.objName + ":directory";
-  optional_yield y = null_yield;
   std::string localWeightStr = cacheNode->get_attr(dpp, block->cacheObj.objName, "localWeight", y); // change to block name eventually -Sam
   int localWeight = -1;
-
-  if (!client.is_connected())
-    find_client(dpp, &client);
+  response<std::string> resp;
 
   if (localWeightStr.empty()) { // figure out where to set local weight -Sam
-    optional_yield y = null_yield;
-    int ret = cacheNode->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(get_age()), y); 
-    localWeight = get_age();
+    int ret = cacheNode->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(get_age(y)), y); 
+    localWeight = get_age(y);
 
     if (ret < 0)
       return -1;
@@ -271,27 +278,26 @@ int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw
     localWeight = std::stoi(localWeightStr);
   }
 
-  int age = get_age();
+  int age = get_age(y);
 
-  bool key_exists = true; //cacheNode->key_exists(dpp, block->cacheObj.objName) //TODO- correct this
-  if (key_exists) { /* Local copy */ 
+  if (exist_key(key, y)) { /* Local copy */ 
     localWeight += age;
   } else {
-    std::string hosts;
     uint64_t freeSpace = cacheNode->get_free_space(dpp);
 
     while (freeSpace < block->size) /* Not enough space in local cache */
-      freeSpace += eviction(dpp, cacheNode);
+      freeSpace += eviction(dpp, cacheNode, y);
 
-    if (exist_key(key)) {
+    if (exist_key(key, y)) { /* Remote copy */
       try {
-       client.hget(key, "hostsList", [&hosts](cpp_redis::reply& reply) {
-         if (!reply.is_null()) {
-           hosts = reply.as_string();
-         }
-       });
-
-       client.sync_commit(std::chrono::milliseconds(1000));
+       boost::system::error_code ec;
+       request req;
+       req.push("HGET", key, "blockHosts");
+         
+       redis_exec(conn, ec, req, resp, y);
+
+       if (ec)
+         return -1;
       } catch(std::exception &e) {
        return -1;
       }
@@ -300,11 +306,11 @@ int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw
     }
 
     // should not hold local cache IP if in this else statement -Sam
-    if (hosts.length() > 0) { /* Remote copy */
-      int globalWeight = get_global_weight(key);
+    if (std::get<0>(resp).value().length() > 0) { /* Remote copy */
+      int globalWeight = get_global_weight(key, y);
       globalWeight += age;
       
-      if (set_global_weight(key, globalWeight))
+      if (set_global_weight(key, globalWeight, y))
        return -1;
     } else { /* No remote copy */
       // do I need to add the block to the local cache here? -Sam
@@ -317,8 +323,8 @@ int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw
   return 0;
 }
 
-uint64_t LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) {
-  CacheBlock victim = find_victim(dpp, cacheNode);
+uint64_t LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) {
+  CacheBlock victim = find_victim(dpp, cacheNode, y);
 
   if (victim.cacheObj.objName.empty()) {
     ldpp_dout(dpp, 10) << "RGW D4N Policy: Could not find victim block" << dendl;
@@ -326,21 +332,21 @@ uint64_t LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheD
   }
 
   std::string key = "rgw-object:" + victim.cacheObj.objName + ":directory";
-  std::string hosts;
-  int globalWeight = get_global_weight(key);
-  optional_yield y = null_yield;
+  int globalWeight = get_global_weight(key, y);
   int localWeight = std::stoi(cacheNode->get_attr(dpp, victim.cacheObj.objName, "localWeight", y)); // change to block name eventually -Sam
-  int avgWeight = get_min_avg_weight();
+  int avgWeight = get_min_avg_weight(y);
+  response<std::string> resp;
 
-  if (exist_key(key)) {
+  if (exist_key(key, y)) {
     try {
-      client.hget(key, "hostsList", [&hosts](cpp_redis::reply& reply) {
-       if (!reply.is_null()) {
-         hosts = reply.as_string();
-       }
-      });
+      boost::system::error_code ec;
+      request req;
+      req.push("HGET", key, "blockHosts");
+       
+      redis_exec(conn, ec, req, resp, y);
 
-      client.sync_commit(std::chrono::milliseconds(1000));
+      if (ec)
+       return -1;
     } catch(std::exception &e) {
       return -1;
     }
@@ -348,14 +354,13 @@ uint64_t LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheD
     return -2; 
   }
 
-  if (hosts.empty()) { /* Last copy */
+  if (std::get<0>(resp).value().empty()) { /* Last copy */
     if (globalWeight > 0) {
       localWeight += globalWeight;
-      optional_yield y = null_yield;
       int ret = cacheNode->set_attr(dpp, victim.cacheObj.objName, "localWeight", std::to_string(localWeight), y);
 
       if (!ret)
-        ret = set_global_weight(key, 0);
+        ret = set_global_weight(key, 0, y);
       else
         return -1;
 
@@ -373,20 +378,19 @@ uint64_t LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheD
 
   globalWeight += localWeight;
 
-  if (set_global_weight(key, globalWeight))
+  if (set_global_weight(key, globalWeight, y))
     return -2;
 
   ldpp_dout(dpp, 10) << "RGW D4N Policy: Block " << victim.cacheObj.objName << " has been evicted." << dendl;
-  int ret = cacheNode->delete_data(dpp, victim.cacheObj.objName, y);
+  int ret = cacheNode->del(dpp, victim.cacheObj.objName, y);
 
   if (!ret) {
-    uint64_t num_entries = 100; //cacheNode->get_num_entries(dpp) TODO - correct this
-    ret = set_min_avg_weight(avgWeight - (localWeight/num_entries), ""/*local cache location*/); // Where else must this be set? -Sam
+    //ret = set_min_avg_weight(avgWeight - (localWeight/entries_map.size()), ""/*local cache location*/, y); // Where else must this be set? -Sam
 
     if (!ret) {
-      int age = get_age();
+      int age = get_age(y);
       age = std::max(localWeight, age);
-      ret = set_age(age);
+      ret = set_age(age, y);
       
       if (ret)
        return -1;
@@ -400,7 +404,23 @@ uint64_t LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheD
   return victim.size;
 }
 
-int LRUPolicy::exist_key(std::string key)
+void LFUDAPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y)
+{
+
+}
+
+bool LFUDAPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key)
+{
+  return false;
+}
+
+void LFUDAPolicy::shutdown()
+{
+  // call cancel() on the connection's executor
+  boost::asio::dispatch(conn->get_executor(), [c = conn] { c->cancel(); });
+}
+
+int LRUPolicy::exist_key(std::string key, optional_yield y)
 {
   const std::lock_guard l(lru_lock);
   if (entries_map.count(key) != 0) {
@@ -409,13 +429,16 @@ int LRUPolicy::exist_key(std::string key)
     return false;
 }
 
-int LRUPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode)
+int LRUPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y)
 {
-  //Does not apply to LRU
+  uint64_t freeSpace = cacheNode->get_free_space(dpp);
+  while(freeSpace < block->size) {
+    freeSpace = eviction(dpp, cacheNode, y);
+  }
   return 0;
 }
 
-uint64_t LRUPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode)
+uint64_t LRUPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y)
 {
   const std::lock_guard l(lru_lock);
   auto p = entries_lru_list.front();
@@ -425,11 +448,11 @@ uint64_t LRUPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDri
   return cacheNode->get_free_space(dpp);
 }
 
-void LRUPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, rgw::cache::CacheDriver* cacheNode)
+void LRUPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y)
 {
   erase(dpp, key);
-  const std::lock_guard l(lru_lock);
-  Entry *e = new Entry(key, offset, len);
+
+  Entry *e = new Entry(key, offset, len, ""); // update version later -Sam
   entries_lru_list.push_back(*e);
   entries_map.emplace(key, e);
 }
@@ -448,7 +471,7 @@ bool LRUPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key)
 
 int PolicyDriver::init() {
   if (policyName == "lfuda") {
-    cachePolicy = new LFUDAPolicy();
+    cachePolicy = new LFUDAPolicy(io);
     return 0;
   } else if (policyName == "lru") {
     cachePolicy = new LRUPolicy();
index 9b694510717a5865e0e7062dbc31a2f24d28cf01..9e2faed25c845faffe10a236575664ce81a365ca 100644 (file)
 
 #include <string>
 #include <iostream>
-#include <cpp_redis/cpp_redis>
 #include "rgw_common.h"
 #include "d4n_directory.h"
 #include "../../rgw_redis_driver.h"
 
+#define dout_subsys ceph_subsys_rgw
+#define dout_context g_ceph_context
+
 namespace rgw { namespace d4n {
 
 class CachePolicy {
-  private:
-    cpp_redis::client client;
-    Address addr;
+  protected:
+    struct Entry : public boost::intrusive::list_base_hook<> {
+      std::string key;
+      uint64_t offset;
+      uint64_t len;
+      std::string version;
+      Entry(std::string& key, uint64_t offset, uint64_t len, std:: string version) : key(key), offset(offset), 
+                                                                                       len(len), version(version) {}
+    };
+    
+    //The disposer object function
+    struct Entry_delete_disposer {
+      void operator()(Entry *e) {
+        delete e;
+      }
+    };
+    typedef boost::intrusive::list<Entry> List;
+
+    //cpp_redis::client client;
+    //Address addr;
 
   public:
     CephContext* cct;
 
-    CachePolicy() : addr() {}
-    virtual ~CachePolicy() = default;
+    CachePolicy() {}
+    virtual ~CachePolicy() = default; 
 
-    virtual void init(CephContext *_cct) {
-      cct = _cct;
-      addr.host = cct->_conf->rgw_d4n_host;
-      addr.port = cct->_conf->rgw_d4n_port;
+    virtual int init(CephContext *cct, const DoutPrefixProvider* dpp) {
+      this->cct = cct;
+      return 0;
     }
-    virtual int find_client(const DoutPrefixProvider* dpp, cpp_redis::client* client) = 0;
-    virtual int exist_key(std::string key) = 0;
-    virtual Address get_addr() { return addr; }
-    virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) = 0;
-    virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) = 0;
-    virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, rgw::cache::CacheDriver* cacheNode) = 0;
+    virtual int exist_key(std::string key, optional_yield y) = 0;
+    virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) = 0;
+    virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) = 0;
+    virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) = 0;
+    virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key) = 0;
+    virtual void shutdown() = 0;
 };
 
 class LFUDAPolicy : public CachePolicy {
   private:
-    cpp_redis::client client;
+    net::io_context& io;
+    std::shared_ptr<connection> conn;
 
   public:
-    LFUDAPolicy() : CachePolicy() {}
-
-    int set_age(int age);
-    int get_age();
-    int set_global_weight(std::string key, int weight);
-    int get_global_weight(std::string key);
-    int set_min_avg_weight(int weight, std::string cacheLocation);
-    int get_min_avg_weight();
-    CacheBlock find_victim(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode);
-
-    virtual int find_client(const DoutPrefixProvider* dpp, cpp_redis::client* client) override { return CachePolicy::find_client(dpp, client); }
-    virtual int exist_key(std::string key) override { return CachePolicy::exist_key(key); }
-    virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) override;
-    virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) override;
-    virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, rgw::cache::CacheDriver* cacheNode) override {}
+    LFUDAPolicy(net::io_context& io_context) : CachePolicy(), io(io_context) {
+      conn = std::make_shared<connection>(boost::asio::make_strand(io_context));
+    }
+    ~LFUDAPolicy() {
+      //delete dir;
+      shutdown();
+    } 
+
+    int set_age(int age, optional_yield y);
+    int get_age(optional_yield y);
+    int set_global_weight(std::string key, int weight, optional_yield y);
+    int get_global_weight(std::string key, optional_yield y);
+    int set_min_avg_weight(size_t weight, std::string cacheLocation, optional_yield y);
+    int get_min_avg_weight(optional_yield y);
+    CacheBlock find_victim(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y);
+
+    virtual int init(CephContext *cct, const DoutPrefixProvider* dpp) {
+      this->cct = cct;
+
+      config cfg;
+      cfg.addr.host = cct->_conf->rgw_d4n_host; // TODO: Replace with cache address
+      cfg.addr.port = std::to_string(cct->_conf->rgw_d4n_port);
+
+      if (!cfg.addr.host.length() || !cfg.addr.port.length()) {
+       ldpp_dout(dpp, 10) << "RGW Redis Cache: Redis cache endpoint was not configured correctly" << dendl;
+       return -EDESTADDRREQ;
+      }
+
+      conn->async_run(cfg, {}, net::consign(net::detached, conn));
+
+      return 0;
+    }
+    virtual int exist_key(std::string key, optional_yield y) override;
+    virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
+    virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
+    virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
+    virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key) override;
+    virtual void shutdown() override;
 };
 
 class LRUPolicy : public CachePolicy {
-  public:
-  struct Entry : public boost::intrusive::list_base_hook<> {
-      std::string key;
-      uint64_t offset;
-      uint64_t len;
-      Entry(std::string& key, uint64_t offset, uint64_t len) : key(key), offset(offset), len(len) {}
-  };
-  LRUPolicy() = default;
   private:
-    std::mutex lru_lock;
-    //The disposer object function
-    struct Entry_delete_disposer {
-      void operator()(Entry *e) {
-        delete e;
-      }
-    };
-    typedef boost::intrusive::list<Entry> List;
     List entries_lru_list;
     std::unordered_map<std::string, Entry*> entries_map;
+    std::mutex lru_lock;
+
   public:
-    virtual int find_client(const DoutPrefixProvider* dpp, cpp_redis::client* client) override { return 0; };
-    virtual int exist_key(std::string key) override;
-    virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) override;
-    virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) override;
-    virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, rgw::cache::CacheDriver* cacheNode) override;
-    bool erase(const DoutPrefixProvider* dpp, const std::string& key);
+    LRUPolicy() = default;
+
+    virtual int exist_key(std::string key, optional_yield y) override;
+    virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
+    virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
+    virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
+    virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key) override;
+    virtual void shutdown() override {}
 };
 
 class PolicyDriver {
   private:
+    net::io_context& io;
     std::string policyName;
-
-  public:
     CachePolicy* cachePolicy;
 
-    PolicyDriver(std::string _policyName) : policyName(_policyName) {}
+  public:
+    PolicyDriver(net::io_context& io_context, std::string _policyName) : io(io_context), policyName(_policyName) {}
     ~PolicyDriver() {
       delete cachePolicy;
     }
 
     int init();
+    CachePolicy* get_cache_policy() { return cachePolicy; }
 };
 
 } } // namespace rgw::d4n
index 44da48311061a4c82c2671e2741a0eeee6a79e3d..754ba16439004b887ce6ee5c64cd4c55c0287344 100644 (file)
@@ -36,20 +36,22 @@ static inline Object* nextObject(Object* t)
   return dynamic_cast<FilterObject*>(t)->get_next();
 }
 
-D4NFilterDriver::D4NFilterDriver(Driver* _next) : FilterDriver(_next) 
+D4NFilterDriver::D4NFilterDriver(Driver* _next, boost::asio::io_context& io_context) : FilterDriver(_next) 
 {
+
   rgw::cache::Partition partition_info;
   partition_info.location = g_conf()->rgw_d4n_l1_datacache_persistent_path;
   partition_info.name = "d4n";
   partition_info.type = "read-cache";
   partition_info.size = g_conf()->rgw_d4n_l1_datacache_size;
 
+  //cacheDriver = new rgw::cache::RedisDriver(io_context, partition_info); // change later -Sam
   cacheDriver = new rgw::cache::SSDDriver(partition_info);
-  objDir = new rgw::d4n::ObjectDirectory();
-  blockDir = new rgw::d4n::BlockDirectory();
+  objDir = new rgw::d4n::ObjectDirectory(io_context);
+  blockDir = new rgw::d4n::BlockDirectory(io_context);
   cacheBlock = new rgw::d4n::CacheBlock();
-  policyDriver = new rgw::d4n::PolicyDriver("lfuda");
-  lruPolicyDriver = new rgw::d4n::PolicyDriver("lru");
+  policyDriver = new rgw::d4n::PolicyDriver(io_context, "lfuda");
+  lruPolicyDriver = new rgw::d4n::PolicyDriver(io_context, "lru");
 }
 
  D4NFilterDriver::~D4NFilterDriver()
@@ -68,14 +70,14 @@ int D4NFilterDriver::initialize(CephContext *cct, const DoutPrefixProvider *dpp)
 
   cacheDriver->initialize(cct, dpp);
 
-  objDir->init(cct);
-  blockDir->init(cct);
+  objDir->init(cct, dpp);
+  blockDir->init(cct, dpp);
 
-  policyDriver->init();
-  policyDriver->cachePolicy->init(cct);
+  policyDriver->init(); 
+  policyDriver->get_cache_policy()->init(cct, dpp);
 
   lruPolicyDriver->init();
-  lruPolicyDriver->cachePolicy->init(cct);
+  lruPolicyDriver->get_cache_policy()->init(cct, dpp);
   
   return 0;
 }
@@ -129,16 +131,14 @@ int D4NFilterObject::copy_object(User* user,
                               const DoutPrefixProvider* dpp,
                               optional_yield y)
 {
-  /* Build cache block copy */
-  rgw::d4n::CacheBlock* copyCacheBlock = new rgw::d4n::CacheBlock(); // How will this copy work in lfuda? -Sam
-
-  copyCacheBlock->hostsList.push_back(driver->get_cache_block()->hostsList[0]); 
-  copyCacheBlock->size = driver->get_cache_block()->size;
-  copyCacheBlock->size = driver->get_cache_block()->globalWeight; // Do we want to reset the global weight? -Sam
-  copyCacheBlock->cacheObj.bucketName = dest_bucket->get_name();
-  copyCacheBlock->cacheObj.objName = dest_object->get_key().get_oid();
-  
-  int copy_valueReturn = driver->get_block_dir()->set_value(copyCacheBlock);
+  rgw::d4n::CacheBlock block = rgw::d4n::CacheBlock{
+                                 .cacheObj = {
+                                   .objName = this->get_key().get_oid(), 
+                                   .bucketName = src_bucket->get_name()
+                                 },
+                                 .blockID = 0, // TODO: get correct blockID
+                               };
+  int copy_valueReturn = driver->get_block_dir()->copy(&block, dest_object->get_name(), dest_bucket->get_name(), y);
 
   if (copy_valueReturn < 0) {
     ldpp_dout(dpp, 20) << "D4N Filter: Block directory copy operation failed." << dendl;
@@ -146,8 +146,6 @@ int D4NFilterObject::copy_object(User* user,
     ldpp_dout(dpp, 20) << "D4N Filter: Block directory copy operation succeeded." << dendl;
   }
 
-  delete copyCacheBlock;
-
   /* Append additional metadata to attributes */
   rgw::sal::Attrs baseAttrs = this->get_attrs();
   buffer::list bl;
@@ -266,6 +264,49 @@ int D4NFilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* d
 
     return next->get_obj_attrs(y, dpp, target_obj);
   } else {
+    /* Set metadata locally */
+    RGWQuotaInfo quota_info;
+    RGWObjState* astate;
+    std::unique_ptr<rgw::sal::User> user = this->driver->get_user(this->get_bucket()->get_owner());
+    this->get_obj_state(dpp, &astate, y);
+
+    for (auto it = attrs.begin(); it != attrs.end(); ++it) {
+      if (it->second.length() > 0) { // or return? -Sam
+       if (it->first == "mtime") {
+         parse_time(it->second.c_str(), &astate->mtime);
+         attrs.erase(it->first);
+       } else if (it->first == "object_size") {
+         this->set_obj_size(std::stoull(it->second.c_str()));
+         attrs.erase(it->first);
+       } else if (it->first == "accounted_size") {
+         astate->accounted_size = std::stoull(it->second.c_str());
+         attrs.erase(it->first);
+       } else if (it->first == "epoch") {
+         astate->epoch = std::stoull(it->second.c_str());
+         attrs.erase(it->first);
+       } else if (it->first == "version_id") {
+         this->set_instance(it->second.c_str());
+         attrs.erase(it->first);
+       } else if (it->first == "this_zone_short_id") {
+         astate->zone_short_id = static_cast<uint32_t>(std::stoul(it->second.c_str()));
+         attrs.erase(it->first);
+       } else if (it->first == "user_quota.max_size") {
+         quota_info.max_size = std::stoull(it->second.c_str());
+         attrs.erase(it->first);
+       } else if (it->first == "user_quota.max_objects") {
+         quota_info.max_objects = std::stoull(it->second.c_str());
+         attrs.erase(it->first);
+       } else if (it->first == "max_buckets") {
+         user->set_max_buckets(std::stoull(it->second.c_str()));
+         attrs.erase(it->first);
+       }
+      }
+    }
+
+    user->set_info(quota_info);
+    this->set_obj_state(*astate);
+   
+    /* Set attributes locally */
     int set_attrsReturn = this->set_attrs(attrs);
     
     if (set_attrsReturn < 0) {
@@ -340,7 +381,7 @@ std::unique_ptr<Writer> D4NFilterDriver::get_atomic_writer(const DoutPrefixProvi
                                                           owner, ptail_placement_rule,
                                                           olh_epoch, unique_tag);
 
-  return std::make_unique<D4NFilterWriter>(std::move(writer), this, obj, dpp, true);
+  return std::make_unique<D4NFilterWriter>(std::move(writer), this, obj, dpp, true, y);
 }
 
 std::unique_ptr<Object::ReadOp> D4NFilterObject::get_read_op()
@@ -488,6 +529,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
 
   this->client_cb = cb;
   this->cb->set_client_cb(cb, dpp); // what's this for? -Sam
+  // save y here -Sam
 
   /* This algorithm stores chunks for ranged requests also in the cache, which might be smaller than obj_max_req_size
      One simplification could be to overwrite the smaller chunks with a bigger chunk of obj_max_req_size, and to serve requests for smaller
@@ -534,11 +576,11 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
     ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << 
     " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
 
-    if (source->driver->get_policy_driver()->cachePolicy->exist_key(oid_in_cache)) { 
+    if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache, y)) { 
       // Read From Cache
       auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id); 
 
-      source->driver->get_policy_driver()->cachePolicy->update(dpp, oid_in_cache, adjusted_start_ofs, part_len, source->driver->get_cache_driver());
+      source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, part_len, "", source->driver->get_cache_driver(), y);
 
       ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
 
@@ -555,11 +597,11 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << 
       " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
 
-      if ((part_len != obj_max_req_size) && source->driver->get_policy_driver()->cachePolicy->exist_key(oid_in_cache)) {
+      if ((part_len != obj_max_req_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache, y)) {
         // Read From Cache
         auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);  
 
-        source->driver->get_policy_driver()->cachePolicy->update(dpp, oid_in_cache, adjusted_start_ofs, obj_max_req_size, source->driver->get_cache_driver());
+        source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, obj_max_req_size, "", source->driver->get_cache_driver(), y);
 
         ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
 
@@ -625,12 +667,12 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
     return r;
   }
   
-  return this->cb->flush_last_part();
+  return this->cb->flush_last_part(y);
 
   /*
   / Execute cache replacement policy /
-  int policyRet = source->driver->get_policy_driver()->cachePolicy->get_block(dpp, source->driver->get_cache_block(), 
-                   source->driver->get_cache_driver());
+  int policyRet = source->driver->get_policy_driver()->get_cache_policy()->get_block(dpp, source->driver->get_cache_block(), 
+                   source->driver->get_cache_driver(), y);
   
   if (policyRet < 0) {
     ldpp_dout(dpp, 20) << "D4N Filter: Cache replacement operation failed." << dendl;
@@ -649,7 +691,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
     cb->handle_data(bl, ofs, len);
   } else {
     / Block directory check /
-    int getDirReturn = source->driver->get_block_dir()->get_value(source->driver->get_cache_block()); 
+    int getDirReturn = source->driver->get_block_dir()->get(source->driver->get_cache_block()); 
 
     if (getDirReturn >= -1) {
       if (getDirReturn == -1) {
@@ -677,7 +719,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
     } else {
       / Write tier retrieval /
       ldpp_dout(dpp, 20) << "D4N Filter: Block directory get operation failed." << dendl;
-      getDirReturn = source->driver->get_obj_dir()->get_value(&(source->driver->get_cache_block()->cacheObj));
+      getDirReturn = source->driver->get_obj_dir()->get(&(source->driver->get_cache_block()->cacheObj));
 
       if (getDirReturn >= -1) {
        if (getDirReturn == -1) {
@@ -719,7 +761,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
          source->driver->get_cache_block()->cacheObj.bucketName = source->get_bucket()->get_name();
          source->driver->get_cache_block()->cacheObj.objName = source->get_key().get_oid();
 
-         int setDirReturn = tempBlockDir->set_value(source->driver->get_cache_block());
+         int setDirReturn = tempBlockDir->set(source->driver->get_cache_block());
 
          if (setDirReturn < 0) {
            ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation failed." << dendl;
@@ -737,8 +779,9 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
   return next->iterate(dpp, ofs, end, cb, y); */
 }
 
-int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::flush_last_part()
+int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::flush_last_part(optional_yield y)
 {
+  save_y = &y;
   last_part = true;
   return handle_data(bl_rem, 0, bl_rem.length());
 }
@@ -760,22 +803,25 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
     const std::lock_guard l(d4n_get_data_lock);
     rgw::d4n::CacheBlock block;
     rgw::d4n::BlockDirectory* blockDir = source->driver->get_block_dir();
-    block.hostsList.push_back(blockDir->get_addr().host + ":" + std::to_string(blockDir->get_addr().port));
+    //block.hostsList.push_back(blockDir->get_addr().host + ":" + std::to_string(blockDir->get_addr().port));
     block.cacheObj.bucketName = source->get_bucket()->get_name();
     block.cacheObj.objName = source->get_key().get_oid();
 
     if (bl.length() > 0 && last_part) { // if bl = bl_rem has data and this is the last part, write it to cache
       std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
+      block.blockID = 0; // TODO: fill out block correctly
+      block.version = "";
       block.size = bl.length();
-      block.blockId = ofs;
+      block.blockID = ofs;
       uint64_t freeSpace = filter->get_cache_driver()->get_free_space(dpp);
       while(freeSpace < block.size) {
-        freeSpace += filter->get_policy_driver()->cachePolicy->eviction(dpp, filter->get_cache_driver());
+        freeSpace += filter->get_policy_driver()->get_cache_policy()->eviction(dpp, filter->get_cache_driver(), null_yield);
       }
       if (filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), source->get_attrs()) == 0) {
-        filter->get_policy_driver()->cachePolicy->update(dpp, oid, ofs, bl.length(), filter->get_cache_driver());
+        filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), "", filter->get_cache_driver(), null_yield);
         /* Store block in directory */
-        if (!blockDir->exist_key(oid)) {
+        if (!blockDir->exist_key(oid, null_yield)) {
+          #if 0
           int ret = blockDir->set_value(&block);
           if (ret < 0) {
             ldpp_dout(dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl;
@@ -783,21 +829,24 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
           } else {
             ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl;
           }
+          #endif
         }
       }
     } else if (bl.length() == rgw_get_obj_max_req_size && bl_rem.length() == 0) { // if bl is the same size as rgw_get_obj_max_req_size, write it to cache
       std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
       ofs += bl_len;
-      block.blockId = ofs;
+      block.blockID = ofs;
+      block.version = "";
       block.size = bl.length();
       uint64_t freeSpace = filter->get_cache_driver()->get_free_space(dpp);
       while(freeSpace < block.size) {
-        freeSpace += filter->get_policy_driver()->cachePolicy->eviction(dpp, filter->get_cache_driver());
+        freeSpace += filter->get_policy_driver()->get_cache_policy()->eviction(dpp, filter->get_cache_driver(), null_yield);
       }
       if (filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), source->get_attrs()) == 0) {
-        filter->get_policy_driver()->cachePolicy->update(dpp, oid, ofs, bl.length(), filter->get_cache_driver());
+        filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), "", filter->get_cache_driver(), null_yield);
         /* Store block in directory */
-        if (!blockDir->exist_key(oid)) {
+        if (!blockDir->exist_key(oid, null_yield)) {
+          #if 0
           int ret = blockDir->set_value(&block);
           if (ret < 0) {
             ldpp_dout(dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl;
@@ -805,6 +854,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
           } else {
             ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl;
           }
+          #endif
         }
       }
     } else { //copy data from incoming bl to bl_rem till it is rgw_get_obj_max_req_size, and then write it to cache
@@ -818,16 +868,17 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
       if (bl_rem.length() == rgw_get_obj_max_req_size) {
         std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_rem.length());
         ofs += bl_rem.length();
-        block.blockId = ofs;
+        block.blockID = ofs;
         block.size = bl_rem.length();
         uint64_t freeSpace = filter->get_cache_driver()->get_free_space(dpp);
         while(freeSpace < block.size) {
-          freeSpace += filter->get_policy_driver()->cachePolicy->eviction(dpp, filter->get_cache_driver());
+          freeSpace += filter->get_policy_driver()->get_cache_policy()->eviction(dpp, filter->get_cache_driver(), null_yield);
         }
         if (filter->get_cache_driver()->put_async(dpp, oid, bl_rem, bl_rem.length(), source->get_attrs()) == 0) {
-          filter->get_policy_driver()->cachePolicy->update(dpp, oid, ofs, bl_rem.length(), filter->get_cache_driver());
+          filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), "", filter->get_cache_driver(), null_yield);
           /* Store block in directory */
-          if (!blockDir->exist_key(oid)) {
+          if (!blockDir->exist_key(oid, null_yield)) {
+            #if 0
             int ret = blockDir->set_value(&block);
             if (ret < 0) {
               ldpp_dout(dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl;
@@ -835,6 +886,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
             } else {
               ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl;
             }
+            #endif
           }
         }
 
@@ -850,7 +902,14 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
 int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp,
                                                    optional_yield y, uint32_t flags)
 {
-  int delDirReturn = source->driver->get_block_dir()->del_value(source->driver->get_cache_block());
+  rgw::d4n::CacheBlock block = rgw::d4n::CacheBlock{
+                                 .cacheObj = {
+                                   .objName = source->get_key().get_oid(),
+                                   .bucketName = source->get_bucket()->get_name()
+                                 },
+                                 .blockID = 0 // TODO: get correct blockID
+                               };
+  int delDirReturn = source->driver->get_block_dir()->del(&block, y);
 
   if (delDirReturn < 0) {
     ldpp_dout(dpp, 20) << "D4N Filter: Block directory delete operation failed." << dendl;
@@ -867,13 +926,11 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp
     currentFields.push_back(attrs->first);
   }
 
-  int delObjReturn = source->driver->get_cache_driver()->delete_data(dpp, source->get_key().get_oid(), y);
+  int delObjReturn = source->driver->get_cache_driver()->del(dpp, source->get_key().get_oid(), y);
 
   if (delObjReturn < 0) {
     ldpp_dout(dpp, 20) << "D4N Filter: Cache delete object operation failed." << dendl;
   } else {
-    Attrs delattrs = source->get_attrs();
-    delObjReturn = source->driver->get_cache_driver()->delete_attrs(dpp, source->get_key().get_oid(), delattrs, y);
     ldpp_dout(dpp, 20) << "D4N Filter: Cache delete operation succeeded." << dendl;
   }
 
@@ -896,14 +953,15 @@ int D4NFilterWriter::prepare(optional_yield y)
 int D4NFilterWriter::process(bufferlist&& data, uint64_t offset)
 {
   /*
-  int append_dataReturn = driver->get_cache_driver()->append_data(save_dpp, obj->get_key().get_oid(), data);
+  int append_dataReturn = driver->get_cache_driver()->append_data(save_dpp, obj->get_key().get_oid(), 
+                                                                   data, y);
 
   if (append_dataReturn < 0) {
     ldpp_dout(save_dpp, 20) << "D4N Filter: Cache append data operation failed." << dendl;
   } else {
     ldpp_dout(save_dpp, 20) << "D4N Filter: Cache append data operation succeeded." << dendl;
-  }
-*/
+  }*/
+
   return next->process(std::move(data), offset);
 }
 
@@ -917,14 +975,22 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag,
                        const req_context& rctx,
                        uint32_t flags)
 {
-  rgw::d4n::BlockDirectory* tempBlockDir = driver->get_block_dir();
-
-  driver->get_cache_block()->hostsList.push_back(tempBlockDir->get_addr().host + ":" + std::to_string(tempBlockDir->get_addr().port)); 
-  driver->get_cache_block()->size = accounted_size;
-  driver->get_cache_block()->cacheObj.bucketName = obj->get_bucket()->get_name();
-  driver->get_cache_block()->cacheObj.objName = obj->get_key().get_oid();
-
-  int setDirReturn = tempBlockDir->set_value(driver->get_cache_block());
+  rgw::d4n::CacheBlock block = rgw::d4n::CacheBlock{
+                                 .cacheObj = {
+                                   .objName = obj->get_key().get_oid(), 
+                                   .bucketName = obj->get_bucket()->get_name(),
+                                   .creationTime = 0, // TODO: get correct value
+                                   .dirty = false,
+                                  .hostsList = { "127.0.0.1:6379" /*current cache addr*/ } // TODO: fix
+                                 },
+                                 .blockID = 0, // TODO: get correct version/blockID
+                                 .version = "", 
+                                 .size = accounted_size,
+                                 .hostsList = { driver->get_block_dir()->cct->_conf->rgw_d4n_host + ":" + 
+                                                std::to_string(driver->get_block_dir()->cct->_conf->rgw_d4n_port) }
+                               };
+
+  int setDirReturn = driver->get_block_dir()->set(&block, y);
 
   if (setDirReturn < 0) {
     ldpp_dout(save_dpp, 20) << "D4N Filter: Block directory set operation failed." << dendl;
@@ -984,12 +1050,16 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag,
 
   baseAttrs.insert(attrs.begin(), attrs.end());
 
-/*  int set_attrsReturn = driver->get_cache_driver()->set_attrs(save_dpp, obj->get_key().get_oid(), baseAttrs);
-
-  if (set_attrsReturn < 0) {
-    ldpp_dout(save_dpp, 20) << "D4N Filter: Cache set attributes operation failed." << dendl;
+  // is the accounted_size equivalent to the length? -Sam
+  
+  //bufferlist bl_empty;
+  //int putReturn = driver->get_cache_driver()->
+  //     put(save_dpp, obj->get_key().get_oid(), bl_empty, accounted_size, baseAttrs, y); /* Data already written during process call */
+  /*
+  if (putReturn < 0) {
+    ldpp_dout(save_dpp, 20) << "D4N Filter: Cache put operation failed." << dendl;
   } else {
-    ldpp_dout(save_dpp, 20) << "D4N Filter: Cache set attributes operation succeeded." << dendl;
+    ldpp_dout(save_dpp, 20) << "D4N Filter: Cache put operation succeeded." << dendl;
   }
   */
   return ret;
@@ -999,9 +1069,9 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag,
 
 extern "C" {
 
-rgw::sal::Driver* newD4NFilter(rgw::sal::Driver* next)
+rgw::sal::Driver* newD4NFilter(rgw::sal::Driver* next, void* io_context)
 {
-  rgw::sal::D4NFilterDriver* driver = new rgw::sal::D4NFilterDriver(next);
+  rgw::sal::D4NFilterDriver* driver = new rgw::sal::D4NFilterDriver(next, *static_cast<boost::asio::io_context*>(io_context));
 
   return driver;
 }
index bda8aa9f19929428f2c9613ba713108efaa72181..0e0bb6e4d942246a0a2cbea9eda962c8fc335a1e 100644 (file)
 #include "driver/d4n/d4n_policy.h"
 
 #include <boost/intrusive/list.hpp>
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/detached.hpp>
+#include <boost/redis/connection.hpp>
+
+#define dout_subsys ceph_subsys_rgw
+#define dout_context g_ceph_context
 
 namespace rgw { namespace sal {
 
@@ -41,8 +47,7 @@ class D4NFilterDriver : public FilterDriver {
     rgw::d4n::PolicyDriver* lruPolicyDriver;
 
   public:
-    D4NFilterDriver(Driver* _next);
-
+    D4NFilterDriver(Driver* _next, boost::asio::io_context& io_context);
     virtual ~D4NFilterDriver();
 
     virtual int initialize(CephContext *cct, const DoutPrefixProvider *dpp) override;
@@ -116,10 +121,12 @@ class D4NFilterObject : public FilterObject {
                                                                          oid(_oid), 
                                                                    source(_source) {}
 
+           optional_yield* save_y;
+
            int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override;
            void set_client_cb(RGWGetDataCB* client_cb, const DoutPrefixProvider* dpp) { this->client_cb = client_cb; this->dpp = dpp;}
            void set_ofs(uint64_t ofs) { this->ofs = ofs; }
-           int flush_last_part();
+           int flush_last_part(optional_yield y);
            void bypass_cache_write() { this->write_to_cache = false; }
        };
 
@@ -201,16 +208,17 @@ class D4NFilterWriter : public FilterWriter {
     D4NFilterDriver* driver; 
     const DoutPrefixProvider* save_dpp;
     bool atomic;
+    optional_yield y;
 
   public:
     D4NFilterWriter(std::unique_ptr<Writer> _next, D4NFilterDriver* _driver, Object* _obj, 
-       const DoutPrefixProvider* _dpp) : FilterWriter(std::move(_next), _obj),
-                                         driver(_driver),
-                                         save_dpp(_dpp), atomic(false) {}
+       const DoutPrefixProvider* _dpp, optional_yield _y) : FilterWriter(std::move(_next), _obj),
+                                                            driver(_driver),
+                                                            save_dpp(_dpp), atomic(false), y(_y) {}
     D4NFilterWriter(std::unique_ptr<Writer> _next, D4NFilterDriver* _driver, Object* _obj, 
-       const DoutPrefixProvider* _dpp, bool _atomic) : FilterWriter(std::move(_next), _obj),
-                                                       driver(_driver),
-                                                       save_dpp(_dpp), atomic(_atomic) {}
+       const DoutPrefixProvider* _dpp, bool _atomic, optional_yield _y) : FilterWriter(std::move(_next), _obj),
+                                                                          driver(_driver),
+                                                                          save_dpp(_dpp), atomic(_atomic), y(_y) {}
     virtual ~D4NFilterWriter() = default;
 
     virtual int prepare(optional_yield y);
index 44403d248ddc9af6288363a602c1cb041dd0c4b9..732e2cd5ce0178b89acbecbb0226a69f2929a1b9 100644 (file)
@@ -20,6 +20,7 @@ class CacheDriver {
     virtual int initialize(CephContext* cct, const DoutPrefixProvider* dpp) = 0;
     virtual int put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs, optional_yield y) = 0;
     virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs, optional_yield y) = 0;
+    virtual int del(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0;
     virtual rgw::AioResultList get_async (const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) = 0;
     virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) = 0;
     virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data, optional_yield y) = 0;
index e9c1006b606bc4ba8af18914fe5146688c0b0ec5..b1d0db6fb3b4c8d2fb6d7599bb993fbd9b778aaf 100644 (file)
@@ -1,46 +1,64 @@
 #include <boost/algorithm/string.hpp>
-#include "rgw_redis_driver.h"
-//#include "rgw_ssd_driver.h" // fix -Sam
-//#include <aedis/src.hpp>
+#include <boost/redis/src.hpp>
+#include <boost/asio/detached.hpp>
 
-#define dout_subsys ceph_subsys_rgw
-#define dout_context g_ceph_context
+#include "rgw_redis_driver.h"
+#include "common/async/blocked_completion.h"
 
 namespace rgw { namespace cache {
 
 std::unordered_map<std::string, Partition> RedisDriver::partitions;
 
-std::vector< std::pair<std::string, std::string> > build_attrs(rgw::sal::Attrs* binary) 
+std::list<std::string> build_attrs(rgw::sal::Attrs* binary) 
 {
-  std::vector< std::pair<std::string, std::string> > values;
+  std::list<std::string> values;
   rgw::sal::Attrs::iterator attrs;
 
   /* Convert to vector */
   if (binary != NULL) {
     for (attrs = binary->begin(); attrs != binary->end(); ++attrs) {
-      values.push_back(std::make_pair(attrs->first, attrs->second.to_str()));
+      values.push_back(attrs->first);
+      values.push_back(attrs->second.to_str());
     }
   }
 
   return values;
 }
 
-int RedisDriver::find_client(const DoutPrefixProvider* dpp) 
+// initiate a call to async_exec() on the connection's executor
+struct initiate_exec {
+  std::shared_ptr<boost::redis::connection> conn;
+  boost::redis::request req;
+
+  using executor_type = boost::redis::connection::executor_type;
+  executor_type get_executor() const noexcept { return conn->get_executor(); }
+  
+  template <typename Handler, typename Response>
+  void operator()(Handler handler, Response& resp)
+  {
+    conn->async_exec(req, resp, boost::asio::consign(std::move(handler), conn));
+  } 
+};
+
+template <typename Response, typename CompletionToken>
+auto async_exec(std::shared_ptr<connection> conn,
+                const boost::redis::request& req,
+                Response& resp, CompletionToken&& token)
 {
-  if (client.is_connected())
-    return 0;
+  return boost::asio::async_initiate<CompletionToken,
+         void(boost::system::error_code, std::size_t)>(
+      initiate_exec{std::move(conn), req}, token, resp);
+}
 
-  if (addr.host == "" || addr.port == 0) { 
-    ldpp_dout(dpp, 10) << "RGW Redis Cache: Redis cache endpoint was not configured correctly" << dendl;
-    return EDESTADDRREQ;
+template <typename T>
+void redis_exec(std::shared_ptr<connection> conn, boost::system::error_code& ec, boost::redis::request& req, boost::redis::response<T>& resp, optional_yield y)
+{
+  if (y) {
+    auto yield = y.get_yield_context();
+    async_exec(std::move(conn), req, resp, yield[ec]);
+  } else {
+    async_exec(std::move(conn), req, resp, ceph::async::use_blocked[ec]);
   }
-
-  client.connect(addr.host, addr.port, nullptr);
-
-  if (!client.is_connected())
-    return ECONNREFUSED;
-
-  return 0;
 }
 
 int RedisDriver::add_partition_info(Partition& info)
@@ -137,24 +155,20 @@ int RedisDriver::initialize(CephContext* cct, const DoutPrefixProvider* dpp)
 {
   this->cct = cct;
 
-  addr.host = cct->_conf->rgw_d4n_host; // change later -Sam
-  addr.port = cct->_conf->rgw_d4n_port;
-
   if (partition_info.location.back() != '/') {
     partition_info.location += "/";
   }
 
-  if (addr.host == "" || addr.port == 0) {
+  config cfg;
+  cfg.addr.host = cct->_conf->rgw_d4n_host; // TODO: Replace with cache address
+  cfg.addr.port = std::to_string(cct->_conf->rgw_d4n_port);
+
+  if (!cfg.addr.host.length() || !cfg.addr.port.length()) {
     ldpp_dout(dpp, 10) << "RGW Redis Cache: Redis cache endpoint was not configured correctly" << dendl;
     return -EDESTADDRREQ;
   }
 
-  client.connect("127.0.0.1", 6379, nullptr);
-
-  if (!client.is_connected()) {
-    ldpp_dout(dpp, 10) << "RGW Redis Cache: Could not connect to redis cache endpoint." << dendl;
-    return ECONNREFUSED;
-  }
+  conn->async_run(cfg, {}, net::consign(net::detached, conn));
 
   return 0;
 }
@@ -163,24 +177,23 @@ int RedisDriver::put(const DoutPrefixProvider* dpp, const std::string& key, buff
 {
   std::string entry = partition_info.location + key;
 
-  if (!client.is_connected()) 
-    find_client(dpp);
-
   /* Every set will be treated as new */ // or maybe, if key exists, simply return? -Sam
   try {
-    std::string result; 
+    boost::system::error_code ec;
+    response<std::string> resp;
     auto redisAttrs = build_attrs(&attrs);
-    redisAttrs.push_back({"data", bl.to_str()});
 
-    client.hmset(entry, redisAttrs, [&result](cpp_redis::reply &reply) {
-      if (!reply.is_null()) {
-       result = reply.as_string();
-      }
-    });
+    if (bl.length()) {
+      redisAttrs.push_back("data");
+      redisAttrs.push_back(bl.to_str());
+    }
 
-    client.sync_commit(std::chrono::milliseconds(1000));
+    request req;
+    req.push_range("HMSET", entry, redisAttrs);
+
+    redis_exec(conn, ec, req, resp, y);
 
-    if (result != "OK") {
+    if (std::get<0>(resp).value() != "OK" || ec) {
       return -1;
     }
   } catch(std::exception &e) {
@@ -194,75 +207,89 @@ int RedisDriver::get(const DoutPrefixProvider* dpp, const std::string& key, off_
 {
   std::string entry = partition_info.location + key;
   
-  if (!client.is_connected()) 
-    find_client(dpp);
-    
-    /* Retrieve existing values from cache */
+  /* Retrieve existing values from cache */
   try {
-      client.hgetall(entry, [&bl, &attrs](cpp_redis::reply &reply) {
-       if (reply.is_array()) {
-         auto arr = reply.as_array();
-    
-         if (!arr[0].is_null()) {
-           for (long unsigned int i = 0; i < arr.size() - 1; i += 2) {
-             if (arr[i].as_string() == "data") {
-                bl.append(arr[i + 1].as_string());
-             } else {
-               buffer::list bl_value;
-               bl_value.append(arr[i + 1].as_string());
-                attrs.insert({arr[i].as_string(), bl_value});
-               bl_value.clear();
-             }
-            }
-         }
-       }
-      });
+    boost::system::error_code ec;
+    response< std::map<std::string, std::string> > resp;
+    request req;
+    req.push("HGETALL", entry);
 
-      client.sync_commit(std::chrono::milliseconds(1000));
-    } catch(std::exception &e) {
+    redis_exec(conn, ec, req, resp, y);
+
+    if (ec)
       return -1;
-    }
 
+    for (auto const& it : std::get<0>(resp).value()) {
+      if (it.first == "data") {
+       bl.append(it.second);
+      } else {
+       buffer::list bl_value;
+       bl_value.append(it.second);
+       attrs.insert({it.first, bl_value});
+       bl_value.clear();
+      }
+    }
+  } catch(std::exception &e) {
+    return -1;
+  }
 
   return 0;
 }
 
+int RedisDriver::del(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y)
+{
+  std::string entry = partition_info.location + key;
+
+  try {
+    boost::system::error_code ec;
+    response<int> resp;
+    request req;
+    req.push("DEL", entry);
+
+    redis_exec(conn, ec, req, resp, y);
+
+    if (ec)
+      return -1;
+
+    return std::get<0>(resp).value() - 1; 
+  } catch(std::exception &e) {
+    return -1;
+  }
+}
+
 int RedisDriver::append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data, optional_yield y) 
 {
   std::string value;
   std::string entry = partition_info.location + key;
 
-  if (!client.is_connected()) 
-    find_client(dpp);
-
   try {
-    client.hget(entry, "data", [&value](cpp_redis::reply &reply) {
-      if (!reply.is_null()) {
-        value = reply.as_string();
-      }
-    });
+    boost::system::error_code ec;
+    response<std::string> resp;
+    request req;
+    req.push("HGET", entry, "data");
 
-    client.sync_commit(std::chrono::milliseconds(1000));
+    redis_exec(conn, ec, req, resp, y);
+
+    if (ec)
+      return -1;
+
+    value = std::get<0>(resp).value();
   } catch(std::exception &e) {
     return -1;
   }
 
   try { // do we want key check here? -Sam
     /* Append to existing value or set as new value */
+    boost::system::error_code ec;
+    response<std::string> resp;
     std::string newVal = value + bl_data.to_str();
-    std::vector< std::pair<std::string, std::string> > field;
-    field.push_back({"data", newVal});
-    std::string result;
 
-    client.hmset(entry, field, [&result](cpp_redis::reply &reply) {
-      if (!reply.is_null()) {
-        result = reply.as_string();
-      }
-    });
+    request req;
+    req.push("HMSET", entry, "data", newVal);
 
-    client.sync_commit(std::chrono::milliseconds(1000));
+    redis_exec(conn, ec, req, resp, y);
 
-    if (result != "OK") {
+    if (std::get<0>(resp).value() != "OK" || ec) {
       return -1;
     }
   } catch(std::exception &e) {
@@ -275,47 +302,38 @@ int RedisDriver::append_data(const DoutPrefixProvider* dpp, const::std::string&
 int RedisDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string& key, optional_yield y) 
 {
   std::string entry = partition_info.location + key;
-
-  if (!client.is_connected()) 
-    find_client(dpp);
-
-  int exists = -2;
+  response<int> value;
+  response<int> resp;
 
   try {
-      client.hexists(entry, "data", [&exists](cpp_redis::reply &reply) {
-       if (!reply.is_null()) {
-         exists = reply.as_integer();
-       }
-      });
+    boost::system::error_code ec;
+    request req;
+    req.push("HEXISTS", entry, "data");
 
-      client.sync_commit(std::chrono::milliseconds(1000));
-    } catch(std::exception &e) {
-      return -1;
-    }
+    redis_exec(conn, ec, req, resp, y);
 
-    if (exists) {
-      try {
-       int result;
-       std::vector<std::string> deleteField;
-       deleteField.push_back("data");
+    if (ec)
+      return -1;
+  } catch(std::exception &e) {
+    return -1;
+  }
 
-       client.hdel(entry, deleteField, [&result](cpp_redis::reply &reply) {
-         if (reply.is_integer()) {
-           result = reply.as_integer(); 
-         }
-       });
+  if (std::get<0>(resp).value()) {
+    try {
+      boost::system::error_code ec;
+      request req;
+      req.push("HDEL", entry, "data");
 
-       client.sync_commit(std::chrono::milliseconds(1000));
+      redis_exec(conn, ec, req, value, y);
 
-       if (!result) {
-         return -1;
-       }
-      } catch(std::exception &e) {
+      if (!std::get<0>(value).value() || ec) {
        return -1;
       }
-    } else {
-      return 0; /* No delete was necessary */
+    } catch(std::exception &e) {
+      return -1;
     }
+  }
+
   return 0;
 }
 
@@ -323,31 +341,28 @@ int RedisDriver::get_attrs(const DoutPrefixProvider* dpp, const std::string& key
 {
   std::string entry = partition_info.location + key;
 
-  if (!client.is_connected()) 
-    find_client(dpp);
-
   try {
-      client.hgetall(entry, [&attrs](cpp_redis::reply &reply) {
-       if (reply.is_array()) { 
-         auto arr = reply.as_array();
-    
-         if (!arr[0].is_null()) {
-           for (long unsigned int i = 0; i < arr.size() - 1; i += 2) {
-             if (arr[i].as_string() != "data") {
-               buffer::list bl_value;
-               bl_value.append(arr[i + 1].as_string());
-                attrs.insert({arr[i].as_string(), bl_value});
-               bl_value.clear();
-             }
-            }
-         }
-       }
-      });
+    boost::system::error_code ec;
+    response< std::map<std::string, std::string> > resp;
+    request req;
+    req.push("HGETALL", entry);
 
-      client.sync_commit(std::chrono::milliseconds(1000));
-    } catch(std::exception &e) {
+    redis_exec(conn, ec, req, resp, y);
+
+    if (ec)
       return -1;
+
+    for (auto const& it : std::get<0>(resp).value()) {
+      if (it.first != "data") {
+       buffer::list bl_value;
+       bl_value.append(it.second);
+       attrs.insert({it.first, bl_value});
+       bl_value.clear();
+      }
     }
+  } catch(std::exception &e) {
+    return -1;
+  }
 
   return 0;
 }
@@ -359,24 +374,20 @@ int RedisDriver::set_attrs(const DoutPrefixProvider* dpp, const std::string& key
       
   std::string entry = partition_info.location + key;
 
-  if (!client.is_connected()) 
-    find_client(dpp);
-
   /* Every attr set will be treated as new */
   try {
+    boost::system::error_code ec;
+    response<std::string> resp;
     std::string result;
-    auto redisAttrs = build_attrs(&attrs);
+    std::list<std::string> redisAttrs = build_attrs(&attrs);
 
-    client.hmset(entry, redisAttrs, [&result](cpp_redis::reply &reply) {
-      if (!reply.is_null()) {
-        result = reply.as_string();
-      }
-    });
+    request req;
+    req.push_range("HMSET", entry, redisAttrs);
 
-    client.sync_commit(std::chrono::milliseconds(1000));
+    redis_exec(conn, ec, req, resp, y);
 
-    if (result != "OK") {
-return -1;
+    if (std::get<0>(resp).value() != "OK" || ec) {
+      return -1;
     }
   } catch(std::exception &e) {
     return -1;
@@ -389,22 +400,17 @@ int RedisDriver::update_attrs(const DoutPrefixProvider* dpp, const std::string&
 {
   std::string entry = partition_info.location + key;
 
-  if (!client.is_connected()) 
-    find_client(dpp);
-
   try {
-    std::string result;
+    boost::system::error_code ec;
+    response<std::string> resp;
     auto redisAttrs = build_attrs(&attrs);
 
-    client.hmset(entry, redisAttrs, [&result](cpp_redis::reply &reply) {
-      if (!reply.is_null()) {
-        result = reply.as_string();
-      }
-    });
+    request req;
+    req.push_range("HMSET", entry, redisAttrs);
 
-    client.sync_commit(std::chrono::milliseconds(1000));
+    redis_exec(conn, ec, req, resp, y);
 
-    if (result != "OK") {
+    if (std::get<0>(resp).value() != "OK" || ec) {
       return -1;
     }
   } catch(std::exception &e) {
@@ -418,236 +424,126 @@ int RedisDriver::delete_attrs(const DoutPrefixProvider* dpp, const std::string&
 {
   std::string entry = partition_info.location + key;
 
-  if (!client.is_connected()) 
-    find_client(dpp);
-
-  std::vector<std::string> getFields;
-
   try {
-    client.hgetall(entry, [&getFields](cpp_redis::reply &reply) {
-if (reply.is_array()) {
-  auto arr = reply.as_array();
-  
-  if (!arr[0].is_null()) {
-    for (long unsigned int i = 0; i < arr.size() - 1; i += 2) {
-      getFields.push_back(arr[i].as_string());
-    }
-  }
-}
-    });
-
-    client.sync_commit(std::chrono::milliseconds(1000));
-  } catch(std::exception &e) {
-    return -1;
-  }
+    boost::system::error_code ec;
+    response<int> resp;
+    auto redisAttrs = build_attrs(&del_attrs);
 
-  auto redisAttrs = build_attrs(&del_attrs);
-  std::vector<std::string> redisFields;
-
-  std::transform(begin(redisAttrs), end(redisAttrs), std::back_inserter(redisFields),
-    [](auto const& pair) { return pair.first; });
-
-  /* Only delete attributes that have been stored */
-  for (const auto& it : redisFields) {
-    if (std::find(getFields.begin(), getFields.end(), it) == getFields.end()) {
-      redisFields.erase(std::find(redisFields.begin(), redisFields.end(), it));
-    }
-  }
-
-  try {
-    int result = 0;
+    request req;
+    req.push_range("HDEL", entry, redisAttrs);
 
-    client.hdel(entry, redisFields, [&result](cpp_redis::reply &reply) {
-      if (reply.is_integer()) {
-        result = reply.as_integer();
-      }
-    });
+    redis_exec(conn, ec, req, resp, y);
 
-    client.sync_commit(std::chrono::milliseconds(1000));
+    if (ec)
+      return -1;
 
-    return result - 1;
+    return std::get<0>(resp).value(); 
   } catch(std::exception &e) {
     return -1;
   }
-
-  ldpp_dout(dpp, 20) << "RGW Redis Cache: Object is not in cache." << dendl;
-  return -2;
 }
 
 std::string RedisDriver::get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, optional_yield y) 
 {
   std::string entry = partition_info.location + key;
-  std::string attrValue;
-
-  if (!client.is_connected()) 
-    find_client(dpp);
-
-  int exists = -2;
-  std::string getValue;
+  response<std::string> value;
+  response<int> resp;
 
   /* Ensure field was set */
   try {
-    client.hexists(entry, attr_name, [&exists](cpp_redis::reply& reply) {
-      if (!reply.is_null()) {
-        exists = reply.as_integer();
-      }
-    });
+    boost::system::error_code ec;
+    request req;
+    req.push("HEXISTS", entry, attr_name);
 
-    client.sync_commit(std::chrono::milliseconds(1000));
+    redis_exec(conn, ec, req, resp, y);
+
+    if (ec)
+      return {};
   } catch(std::exception &e) {
     return {};
   }
   
-  if (!exists) {
+  if (!std::get<0>(resp).value()) {
     ldpp_dout(dpp, 20) << "RGW Redis Cache: Attribute was not set." << dendl;
     return {};
   }
 
   /* Retrieve existing value from cache */
   try {
-    client.hget(entry, attr_name, [&exists, &attrValue](cpp_redis::reply &reply) {
-      if (!reply.is_null()) {
-        attrValue = reply.as_string();
-      }
-    });
+    boost::system::error_code ec;
+    request req;
+    req.push("HGET", entry, attr_name);
 
-    client.sync_commit(std::chrono::milliseconds(1000));
+    redis_exec(conn, ec, req, value, y);
+
+    if (ec)
+      return {};
   } catch(std::exception &e) {
     return {};
   }
 
-  return attrValue;
+  return std::get<0>(value).value();
 }
 
-int RedisDriver::set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attrVal, optional_yield y) 
+int RedisDriver::set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attr_val, optional_yield y) 
 {
   std::string entry = partition_info.location + key;
-  int result = 0;
-    
-  if (!client.is_connected()) 
-    find_client(dpp);
+  response<int> resp;
     
   /* Every attr set will be treated as new */
   try {
-    client.hset(entry, attr_name, attrVal, [&result](cpp_redis::reply& reply) {
-    if (!reply.is_null()) {
-        result = reply.as_integer();
-      }
-    });
+    boost::system::error_code ec;
+    request req;
+    req.push("HSET", entry, attr_name, attr_val);
 
-    client.sync_commit(std::chrono::milliseconds(1000));
+    redis_exec(conn, ec, req, resp, y);
+
+    if (ec)
+      return {};
   } catch(std::exception &e) {
     return -1;
   }
 
-  return result - 1;
-}
-#if 0
-std::unique_ptr<CacheAioRequest> RedisDriver::get_cache_aio_request_ptr(const DoutPrefixProvider* dpp) 
-{
-  return std::make_unique<RedisCacheAioRequest>(this);  
-}
-#endif
-rgw::AioResultList RedisDriver::get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) 
-{
-  rgw_raw_obj r_obj;
-  r_obj.oid = key;
-
-  return aio->get(r_obj, rgw::Aio::cache_read_op(dpp, y, this, ofs, len, key), cost, id);
+  return std::get<0>(resp).value();
 }
 
-int RedisDriver::AsyncReadOp::init(const DoutPrefixProvider *dpp, CephContext* cct, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg)
+static Aio::OpFunc redis_read_op(optional_yield y, std::shared_ptr<connection> conn,
+                                 off_t read_ofs, off_t read_len, const std::string& key)
 {
-  ldpp_dout(dpp, 20) << "RedisCache: " << __func__ << "(): file_path=" << file_path << dendl;
-  aio_cb.reset(new struct aiocb);
-  memset(aio_cb.get(), 0, sizeof(struct aiocb));
-  aio_cb->aio_fildes = TEMP_FAILURE_RETRY(::open(file_path.c_str(), O_RDONLY|O_CLOEXEC|O_BINARY));
-
-  if (aio_cb->aio_fildes < 0) {
-      int err = errno;
-      ldpp_dout(dpp, 1) << "ERROR: RedisCache: " << __func__ << "(): can't open " << file_path << " : " << " error: " << err << dendl;
-      return -err;
-  }
-
-  if (cct->_conf->rgw_d3n_l1_fadvise != POSIX_FADV_NORMAL) {
-      posix_fadvise(aio_cb->aio_fildes, 0, 0, g_conf()->rgw_d3n_l1_fadvise);
-  }
+  return [y, conn, key] (Aio* aio, AioResult& r) mutable {
+    using namespace boost::asio;
+    spawn::yield_context yield = y.get_yield_context();
+    async_completion<spawn::yield_context, void()> init(yield);
+    auto ex = get_associated_executor(init.completion_handler);
 
-  bufferptr bp(read_len);
-  aio_cb->aio_buf = bp.c_str();
-  result.append(std::move(bp));
+    boost::redis::request req;
+    req.push("HGET", key, "data");
 
-  aio_cb->aio_nbytes = read_len;
-  aio_cb->aio_offset = read_ofs;
-  aio_cb->aio_sigevent.sigev_notify = SIGEV_THREAD;
-  aio_cb->aio_sigevent.sigev_notify_function = libaio_cb_aio_dispatch;
-  aio_cb->aio_sigevent.sigev_notify_attributes = nullptr;
-  aio_cb->aio_sigevent.sigev_value.sival_ptr = arg;
+    // TODO: Make unique pointer once support is added
+    auto s = std::make_shared<RedisDriver::redis_response>();
+    auto& resp = s->resp;
 
-  return 0;
+    conn->async_exec(req, resp, bind_executor(ex, RedisDriver::redis_aio_handler{aio, r, s}));
+  };
 }
 
-void RedisDriver::AsyncReadOp::libaio_cb_aio_dispatch(sigval sigval)
+rgw::AioResultList RedisDriver::get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) 
 {
-  auto p = std::unique_ptr<Completion>{static_cast<Completion*>(sigval.sival_ptr)};
-  auto op = std::move(p->user_data);
-  const int ret = -aio_error(op.aio_cb.get());
-  boost::system::error_code ec;
-  if (ret < 0) {
-      ec.assign(-ret, boost::system::system_category());
-  }
+  std::string entry = partition_info.location + key;
+  rgw_raw_obj r_obj;
+  r_obj.oid = key;
 
-  ceph::async::dispatch(std::move(p), ec, std::move(op.result));
+  return aio->get(r_obj, redis_read_op(y, conn, ofs, len, entry), cost, id);
 }
 
-template <typename Executor1, typename CompletionHandler>
-auto RedisDriver::AsyncReadOp::create(const Executor1& ex1, CompletionHandler&& handler)
-{
-  auto p = Completion::create(ex1, std::move(handler));
-  return p;
-}
-
-template <typename ExecutionContext, typename CompletionToken>
-auto RedisDriver::get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key,
-                off_t read_ofs, off_t read_len, CompletionToken&& token)
-{
-  std::string location = partition_info.location + key;
-  ldpp_dout(dpp, 20) << "RedisCache: " << __func__ << "(): location=" << location << dendl;
-
-  using Op = AsyncReadOp;
-  using Signature = typename Op::Signature;
-  boost::asio::async_completion<CompletionToken, Signature> init(token);
-  auto p = Op::create(ctx.get_executor(), init.completion_handler);
-  auto& op = p->user_data;
-
-  int ret = op.init(dpp, cct, location, read_ofs, read_len, p.get());
-  if (0 == ret) {
-    ret = ::aio_read(op.aio_cb.get());
-  }
-//  ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): ::aio_read(), ret=" << ret << dendl;
- /* if(ret < 0) {
-      auto ec = boost::system::error_code{-ret, boost::system::system_category()};
-      ceph::async::post(std::move(p), ec, bufferlist{});
-  } else {
-      (void)p.release();
-  }*/
-  //return init.result.get();
-}
 #if 0
-void RedisCacheAioRequest::cache_aio_read(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, off_t ofs, uint64_t len, rgw::Aio* aio, rgw::AioResult& r)
-{
-  using namespace boost::asio;
-  async_completion<yield_context, void()> init(y.get_yield_context());
-  auto ex = get_associated_executor(init.completion_handler);
-
-  ldpp_dout(dpp, 20) << "RedisCache: " << __func__ << "(): key=" << key << dendl;
-  cache_driver->get_async(dpp, y.get_io_context(), key, ofs, len, bind_executor(ex, RedisDriver::libaio_handler{aio, r}));
-}
-
-void RedisCacheAioRequest::cache_aio_write(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, bufferlist& bl, uint64_t len, rgw::Aio* aio, rgw::AioResult& r)
+int RedisDriver::AsyncReadOp::init(const DoutPrefixProvider *dpp, CephContext* cct, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg)
 {
+  // call cancel() on the connection's executor
+  boost::asio::dispatch(conn->get_executor(), [c = conn] { c->cancel(); });
 }
 #endif
+
 int RedisDriver::put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs)
 {
   return 0;
index bbe430d31ecbdc9f148cd4a285548804ececf66f..9b818b42c390ac1c24d4fb91c22dc7ef4922c2df 100644 (file)
@@ -1,22 +1,30 @@
 #pragma once
 
-//#include <aedis.hpp>
 #include <aio.h>
+#include <boost/redis/connection.hpp>
+
 #include "common/async/completion.h"
 #include "rgw_common.h"
 #include "rgw_cache_driver.h"
 
-#include <cpp_redis/cpp_redis>
-#include "driver/d4n/d4n_directory.h"
+#define dout_subsys ceph_subsys_rgw
+#define dout_context g_ceph_context
 
 namespace rgw { namespace cache {
 
+namespace net = boost::asio;
+using boost::redis::config;
+using boost::redis::connection;
+using boost::redis::request;
+using boost::redis::response;
+
 class RedisDriver : public CacheDriver {
   public:
-    RedisDriver(Partition& _partition_info) : partition_info(_partition_info),
-                                              free_space(_partition_info.size), 
-                                             outstanding_write_size(0) 
+    RedisDriver(net::io_context& io_context, Partition& _partition_info) : partition_info(_partition_info),
+                                                                          free_space(_partition_info.size), 
+                                                                          outstanding_write_size(0)
     {
+      conn = std::make_shared<connection>(boost::asio::make_strand(io_context));
       add_partition_info(_partition_info);
     }
     virtual ~RedisDriver()
@@ -35,72 +43,47 @@ class RedisDriver : public CacheDriver {
     virtual int initialize(CephContext* cct, const DoutPrefixProvider* dpp) override;
     virtual int put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs, optional_yield y) override;
     virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs, optional_yield y) override;
+    virtual rgw::AioResultList get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override;
+    virtual int del(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
     virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data, optional_yield y) override;
     virtual int delete_data(const DoutPrefixProvider* dpp, const::std::string& key, optional_yield y) override;
-    virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs, optional_yield y) override;
     virtual int set_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs, optional_yield y) override;
+    virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs, optional_yield y) override;
     virtual int update_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs, optional_yield y) override;
     virtual int delete_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& del_attrs, optional_yield y) override;
-    virtual std::string get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, optional_yield y) override;
     virtual int set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attr_val, optional_yield y) override;
+    virtual std::string get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, optional_yield y) override;
 
-    virtual rgw::AioResultList get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override;
     virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) override;
+    struct redis_response {
+      boost::redis::response<std::string> resp;
+    };
+    void shutdown();
 
-    struct libaio_handler { // should this be the same as SSDDriver? -Sam
+    struct redis_aio_handler { 
       rgw::Aio* throttle = nullptr;
       rgw::AioResult& r;
+      std::shared_ptr<redis_response> s;
 
-      // read callback
-      void operator()(boost::system::error_code ec, bufferlist bl) const {
+      /* Read Callback */
+      void operator()(boost::system::error_code ec, long unsigned int size) const {
        r.result = -ec.value();
-       r.data = std::move(bl);
+       r.data.append(std::get<0>(s->resp).value().c_str());
        throttle->put(r);
       }
     };
-    template <typename ExecutionContext, typename CompletionToken>
-    auto get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key,
-                  off_t read_ofs, off_t read_len, CompletionToken&& token);
 
   protected:
-    cpp_redis::client client;
-    rgw::d4n::Address addr;
+    std::shared_ptr<connection> conn;
+
     static std::unordered_map<std::string, Partition> partitions;
     Partition partition_info;
     uint64_t free_space;
     uint64_t outstanding_write_size;
     CephContext* cct;
 
-    int find_client(const DoutPrefixProvider* dpp);
     int add_partition_info(Partition& info);
     int remove_partition_info(Partition& info);
-
-  private:
-    // unique_ptr with custom deleter for struct aiocb
-    struct libaio_aiocb_deleter {
-      void operator()(struct aiocb* c) {
-        if(c->aio_fildes > 0) {
-          if( ::close(c->aio_fildes) != 0) {
-          }
-        }
-        delete c;
-      }
-    };
-
-    using unique_aio_cb_ptr = std::unique_ptr<struct aiocb, libaio_aiocb_deleter>;
-
-    struct AsyncReadOp {
-      bufferlist result;
-      unique_aio_cb_ptr aio_cb;
-      using Signature = void(boost::system::error_code, bufferlist);
-      using Completion = ceph::async::Completion<Signature, AsyncReadOp>;
-
-      int init(const DoutPrefixProvider *dpp, CephContext* cct, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg);
-      static void libaio_cb_aio_dispatch(sigval sigval);
-
-      template <typename Executor1, typename CompletionHandler>
-      static auto create(const Executor1& ex1, CompletionHandler&& handler);
-    };
 };
 
 } } // namespace rgw::cache
index 0f6e1745d713ca4ba6148315cac7cf2727ec2dac..574e467faac7e1f2e18f0b30d1502f1c7048a63f 100644 (file)
@@ -63,7 +63,7 @@ extern rgw::sal::Driver* newPOSIXDriver(rgw::sal::Driver* next);
 #endif
 extern rgw::sal::Driver* newBaseFilter(rgw::sal::Driver* next);
 #ifdef WITH_RADOSGW_D4N
-extern rgw::sal::Driver* newD4NFilter(rgw::sal::Driver* next);
+extern rgw::sal::Driver* newD4NFilter(rgw::sal::Driver* next, boost::asio::io_context* io_context);
 #endif
 }
 
@@ -239,7 +239,7 @@ rgw::sal::Driver* DriverManager::init_storage_provider(const DoutPrefixProvider*
 #ifdef WITH_RADOSGW_D4N 
   else if (cfg.filter_name.compare("d4n") == 0) {
     rgw::sal::Driver* next = driver;
-    driver = newD4NFilter(next);
+    driver = newD4NFilter(next, &io_context);
 
     if (driver->initialize(cct, dpp) < 0) {
       delete driver;
index 9472eec3ecc40edc3b36715b414d756e7b9322e4..962b5834e146d242c55b85049a7c55187ecc878a 100644 (file)
@@ -14,6 +14,7 @@ public:
   virtual int initialize(CephContext* cct, const DoutPrefixProvider* dpp) override;
   virtual int put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs, optional_yield y) override;
   virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs, optional_yield y) override;
+  virtual int del(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override { return -1; } // TODO: implement
   virtual rgw::AioResultList get_async (const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override;
   virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) override;
   virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data, optional_yield y) override;