]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/cache: This commit adds implementation of Partition in
authorSamarah Uriarte <samarah.uriarte@ibm.com>
Thu, 29 Jun 2023 13:51:02 +0000 (09:51 -0400)
committerPritha Srivastava <prsrivas@redhat.com>
Tue, 2 Apr 2024 15:54:50 +0000 (21:24 +0530)
ssd driver and squashes the following related commits.

RGW: Minor RedisDriver cleanup
RGW: Add partitioning
RGW: Cleanup RedisDriver

Signed-off-by: Samarah Uriarte <samarah.uriarte@ibm.com>
build/boost_redis [new submodule]
src/rgw/driver/d4n/d4n_directory.cc
src/rgw/driver/d4n/d4n_directory.h
src/rgw/driver/d4n/rgw_sal_d4n.h
src/rgw/rgw_redis_driver.cc
src/rgw/rgw_redis_driver.h

diff --git a/build/boost_redis b/build/boost_redis
new file mode 160000 (submodule)
index 0000000..69d1242
--- /dev/null
@@ -0,0 +1 @@
+Subproject commit 69d12421e29ae36750148acee1039c261e763d19
index 441c4bc149282cd129579a5867d31d23b328ba33..9bd812a9245a3d291de4eaba88dc3c53cfac5522 100644 (file)
@@ -24,7 +24,7 @@ int ObjectDirectory::find_client(cpp_redis::client* client) {
 }
 
 std::string ObjectDirectory::build_index(CacheObj* object) {
-  return "rgw-object:" + object->objName + ":object-directory";
+  return object->bucketName + "_" + object->objName;
 }
 
 int ObjectDirectory::exist_key(std::string key) {
@@ -204,7 +204,7 @@ int BlockDirectory::find_client(cpp_redis::client* client) {
 }
 
 std::string BlockDirectory::build_index(CacheBlock* block) {
-  return "rgw-object:" + block->cacheObj.objName + ":block-directory";
+  return block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + boost::lexical_cast<std::string>(block->blockId);
 }
 
 int BlockDirectory::exist_key(std::string key) {
index fd2690db14137e7aff6b0dbf455665155afc29f6..13744ae42cb9b7e18381498ab19a07dedfe6e951 100644 (file)
@@ -5,6 +5,7 @@
 #include <cpp_redis/cpp_redis>
 #include <string>
 #include <iostream>
+#include <boost/lexical_cast.hpp>
 
 namespace rgw { namespace d4n {
 
@@ -16,16 +17,17 @@ struct Address {
 struct CacheObj {
   std::string objName; /* S3 object name */
   std::string bucketName; /* S3 bucket name */
-  time_t creationTime; // Creation time of the S3 Object
+  time_t creationTime; /* Creation time of the S3 Object */
   bool dirty;
-  std::vector<std::string> hostsList; /* Currently not supported: list of hostnames <ip:port> of object locations for multiple backends */
+  std::vector<std::string> hostsList; /* List of hostnames <ip:port> of object locations for multiple backends */
 };
 
 struct CacheBlock {
   CacheObj cacheObj;
+  uint64_t blockId; /* RADOS object block ID */
   uint64_t size; /* Block size in bytes */
   int globalWeight = 0;
-  std::vector<std::string> hostsList; /* Currently not supported: list of hostnames <ip:port> of block locations */
+  std::vector<std::string> hostsList; /* List of hostnames <ip:port> of block locations */
 };
 
 class Directory {
@@ -34,7 +36,7 @@ class Directory {
     CephContext* cct;
 };
 
-class ObjectDirectory: public Directory { // where else should object directory be called? -Sam
+class ObjectDirectory: public Directory { // weave into write workflow -Sam
   public:
     ObjectDirectory() {}
     ObjectDirectory(std::string host, int port) {
index 6150223d1e893723a7141b0b414f63d2c1ac06ad..352e0b76deae013ae39568db825db1ad0c529528 100644 (file)
@@ -39,7 +39,10 @@ class D4NFilterDriver : public FilterDriver {
   public:
     D4NFilterDriver(Driver* _next) : FilterDriver(_next) 
     {
-      cacheDriver = new rgw::cache::RedisDriver(); // change later -Sam
+      rgw::cache::Partition partition_info;
+      partition_info.location = "RedisCache"; // figure out how to fill rest of partition information -Sam
+
+      cacheDriver = new rgw::cache::RedisDriver(partition_info); // change later -Sam
       objDir = new rgw::d4n::ObjectDirectory();
       blockDir = new rgw::d4n::BlockDirectory();
       cacheBlock = new rgw::d4n::CacheBlock();
index 0d5d0f23acc56e1ac7bae7811ccd48a2c56e8446..fd521092b2c3653e1fad30f3e59c97b42effe572 100644 (file)
@@ -8,20 +8,7 @@
 
 namespace rgw { namespace cache {
 
-/* Base metadata and data fields should remain consistent */
-std::vector<std::string> baseFields {
-  "mtime",
-  "object_size",
-  "accounted_size",
-  "epoch",
-  "version_id",
-  "source_zone_short_id",
-  "bucket_count",
-  "bucket_size",
-  "user_quota.max_size",
-  "user_quota.max_objects",
-  "max_buckets",
-  "data"};
+std::unordered_map<std::string, Partition> RedisDriver::partitions;
 
 std::vector< std::pair<std::string, std::string> > build_attrs(rgw::sal::Attrs* binary) 
 {
@@ -44,7 +31,7 @@ int RedisDriver::find_client(const DoutPrefixProvider* dpp)
     return 0;
 
   if (addr.host == "" || addr.port == 0) { 
-    dout(10) << "RGW Redis Cache: Redis cache endpoint was not configured correctly" << dendl;
+    ldpp_dout(dpp, 10) << "RGW Redis Cache: Redis cache endpoint was not configured correctly" << dendl;
     return EDESTADDRREQ;
   }
 
@@ -62,22 +49,136 @@ int RedisDriver::insert_entry(const DoutPrefixProvider* dpp, std::string key, of
   return ret.second;
 }
 
+std::optional<Entry> RedisDriver::get_entry(const DoutPrefixProvider* dpp, std::string key) 
+{
+  auto iter = entries.find(key);
+
+  if (iter != entries.end()) {
+    return iter->second;
+  }
+
+  return std::nullopt;
+}
+
 int RedisDriver::remove_entry(const DoutPrefixProvider* dpp, std::string key) 
 {
   return entries.erase(key);
 }
 
-std::optional<Entry> RedisDriver::get_entry(const DoutPrefixProvider* dpp, std::string key) 
+int RedisDriver::add_partition_info(Partition& info)
 {
-  auto iter = entries.find(key);
+  std::string key = info.name + info.type;
+  auto ret = partitions.emplace(key, info);
 
-  if (iter != entries.end()) {
-    return iter->second;
+  return ret.second;
+}
+
+int RedisDriver::remove_partition_info(Partition& info)
+{
+  std::string key = info.name + info.type;
+  return partitions.erase(key);
+}
+
+bool RedisDriver::key_exists(const DoutPrefixProvider* dpp, const std::string& key) 
+{
+  int result;
+  std::string entry = partition_info.location + key;
+  std::vector<std::string> keys;
+  keys.push_back(entry);
+
+  if (!client.is_connected()) 
+    find_client(dpp);
+
+  try {
+    client.exists(keys, [&result](cpp_redis::reply &reply) {
+      if (reply.is_integer()) {
+        result = reply.as_integer();
+      }
+    });
+
+    client.sync_commit(std::chrono::milliseconds(1000));
+  } catch(std::exception &e) {}
+
+  return result;
+}
+
+std::vector<Entry> RedisDriver::list_entries(const DoutPrefixProvider* dpp) 
+{
+  std::vector<Entry> result;
+
+  for (auto it = entries.begin(); it != entries.end(); ++it) 
+    result.push_back(it->second);
+
+  return result;
+}
+
+size_t RedisDriver::get_num_entries(const DoutPrefixProvider* dpp) 
+{
+  return entries.size();
+}
+
+/*
+uint64_t RedisDriver::get_free_space(const DoutPrefixProvider* dpp) 
+{
+  int result = -1;
+
+  if (!client.is_connected()) 
+    find_client(dpp);
+
+  try {
+    client.info([&result](cpp_redis::reply &reply) {
+      if (!reply.is_null()) {
+        int usedMem = -1;
+       int maxMem = -1;
+
+        std::istringstream iss(reply.as_string());
+       std::string line;    
+        while (std::getline(iss, line)) {
+         size_t pos = line.find_first_of(":");
+         if (pos != std::string::npos) {
+           if (line.substr(0, pos) == "used_memory") {
+             usedMem = std::stoi(line.substr(pos + 1, line.length() - pos - 2));
+           } else if (line.substr(0, line.find_first_of(":")) == "maxmemory") {
+             maxMem = std::stoi(line.substr(pos + 1, line.length() - pos - 2));
+           } 
+         }
+        }
+
+       if (usedMem > -1 && maxMem > -1)
+         result = maxMem - usedMem;
+      }
+    });
+
+    client.sync_commit(std::chrono::milliseconds(1000));
+  } catch(std::exception &e) {
+    return -1;
   }
 
+  return result;
+}
+*/
+
+std::optional<Partition> RedisDriver::get_partition_info(const DoutPrefixProvider* dpp, const std::string& name, const std::string& type)
+{
+  std::string key = name + type;
+
+  auto iter = partitions.find(key);
+  if (iter != partitions.end())
+    return iter->second;
+
   return std::nullopt;
 }
 
+std::vector<Partition> RedisDriver::list_partitions(const DoutPrefixProvider* dpp)
+{
+  std::vector<Partition> partitions_v;
+
+  for (auto& it : partitions)
+    partitions_v.emplace_back(it.second);
+
+  return partitions_v;
+}
+
 /* Currently an attribute but will also be part of the Entry metadata once consistency is guaranteed -Sam
 int RedisDriver::update_local_weight(const DoutPrefixProvider* dpp, std::string key, int localWeight) 
 {
@@ -95,55 +196,43 @@ int RedisDriver::update_local_weight(const DoutPrefixProvider* dpp, std::string
 int RedisDriver::initialize(CephContext* cct, const DoutPrefixProvider* dpp) 
 {
   this->cct = cct;
+
   addr.host = cct->_conf->rgw_d4n_host; // change later -Sam
   addr.port = cct->_conf->rgw_d4n_port;
 
+  if (partition_info.location.back() != '/') {
+    partition_info.location += "/";
+  }
+
   if (addr.host == "" || addr.port == 0) {
-    dout(10) << "RGW Redis Cache: Redis cache endpoint was not configured correctly" << dendl;
+    ldpp_dout(dpp, 10) << "RGW Redis Cache: Redis cache endpoint was not configured correctly" << dendl;
     return EDESTADDRREQ;
   }
 
   client.connect("127.0.0.1", 6379, nullptr);
 
-  if (!client.is_connected()) 
+  if (!client.is_connected()) {
+    ldpp_dout(dpp, 10) << "RGW Redis Cache: Could not connect to redis cache endpoint." << dendl;
     return ECONNREFUSED;
+  }
 
   return 0;
 }
 
 int RedisDriver::put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) 
 {
-  std::string entryName = "rgw-object:" + key + ":cache";
+  std::string entry = partition_info.location + key;
 
   if (!client.is_connected()) 
-    return ECONNREFUSED;
-
-  /* Every set will be treated as new */
-  try {
-    /* Set data field */
-    int result; 
-
-    client.hset(entryName, "data", bl.to_str(), [&result](cpp_redis::reply &reply) {
-      if (!reply.is_null()) {
-        result = reply.as_integer();
-      }
-    });
-
-    client.sync_commit(std::chrono::milliseconds(1000));
-
-    if (result <= 0) {
-      return -1;
-    }
-  } catch(std::exception &e) {
-    return -1;
-  }
+    find_client(dpp);
 
+  /* Every set will be treated as new */ // or maybe, if key exists, simply return? -Sam
   try {
-    /* Set attribute fields */
     std::string result; 
-    std::vector< std::pair<std::string, std::string> > redisAttrs = build_attrs(&attrs);
+    auto redisAttrs = build_attrs(&attrs);
+    redisAttrs.push_back({"data", bl.to_str()});
 
-    client.hmset(entryName, redisAttrs, [&result](cpp_redis::reply &reply) {
+    client.hmset(entry, redisAttrs, [&result](cpp_redis::reply &reply) {
       if (!reply.is_null()) {
        result = reply.as_string();
       }
@@ -158,37 +247,32 @@ int RedisDriver::put(const DoutPrefixProvider* dpp, const std::string& key, buff
     return -1;
   }
 
-  return 0;
+  return insert_entry(dpp, key, 0, len); // why is offset necessarily 0? -Sam
 }
 
 int RedisDriver::get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs) 
 {
-  std::string result;
-  std::string entryName = "rgw-object:" + key + ":cache";
+  std::string entry = partition_info.location + key;
   
   if (!client.is_connected()) 
-    return ECONNREFUSED;
+    find_client(dpp);
     
   if (key_exists(dpp, key)) {
-    rgw::sal::Attrs::iterator it;
-    std::vector< std::pair<std::string, std::string> > redisAttrs;
-    std::vector<std::string> getFields;
-
     /* Retrieve existing values from cache */
     try {
-      client.hgetall(entryName, [&bl, &attrs](cpp_redis::reply &reply) {
+      client.hgetall(entry, [&bl, &attrs](cpp_redis::reply &reply) {
        if (reply.is_array()) {
          auto arr = reply.as_array();
     
          if (!arr[0].is_null()) {
            for (long unsigned int i = 0; i < arr.size() - 1; i += 2) {
-             if (arr[i].as_string() == "data")
+             if (arr[i].as_string() == "data") {
                 bl.append(arr[i + 1].as_string());
-             else {
-               buffer::list temp;
-               temp.append(arr[i + 1].as_string());
-                attrs.insert({arr[i].as_string(), temp});
-               temp.clear();
+             else {
+               buffer::list bl_value;
+               bl_value.append(arr[i + 1].as_string());
+                attrs.insert({arr[i].as_string(), bl_value});
+               bl_value.clear();
              }
             }
          }
@@ -200,30 +284,24 @@ int RedisDriver::get(const DoutPrefixProvider* dpp, const std::string& key, off_
       return -1;
     }
   } else {
-    dout(20) << "RGW Redis Cache: Object was not retrievable." << dendl;
+    ldpp_dout(dpp, 20) << "RGW Redis Cache: Object was not retrievable." << dendl;
     return -2;
   }
 
   return 0;
 }
 
-rgw::AioResultList RedisDriver::get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) 
-{
-  return {};
-}
-
 int RedisDriver::append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data) 
 {
-  std::string result;
-  std::string value = "";
-  std::string entryName = "rgw-object:" + key + ":cache";
+  std::string value;
+  std::string entry = partition_info.location + key;
 
   if (!client.is_connected()) 
-    return ECONNREFUSED;
+    find_client(dpp);
 
   if (key_exists(dpp, key)) {
     try {
-      client.hget(entryName, "data", [&value](cpp_redis::reply &reply) {
+      client.hget(entry, "data", [&value](cpp_redis::reply &reply) {
         if (!reply.is_null()) {
           value = reply.as_string();
         }
@@ -231,17 +309,18 @@ int RedisDriver::append_data(const DoutPrefixProvider* dpp, const::std::string&
 
       client.sync_commit(std::chrono::milliseconds(1000));
     } catch(std::exception &e) {
-      return -2;
+      return -1;
     }
   }
 
-  try {
+  try { // do we want key check here? -Sam
     /* Append to existing value or set as new value */
-    std::string temp = value + bl_data.to_str();
+    std::string newVal = value + bl_data.to_str();
     std::vector< std::pair<std::string, std::string> > field;
-    field.push_back({"data", temp});
+    field.push_back({"data", newVal});
+    std::string result;
 
-    client.hmset(entryName, field, [&result](cpp_redis::reply &reply) {
+    client.hmset(entry, field, [&result](cpp_redis::reply &reply) {
       if (!reply.is_null()) {
         result = reply.as_string();
       }
@@ -253,7 +332,7 @@ int RedisDriver::append_data(const DoutPrefixProvider* dpp, const::std::string&
       return -1;
     }
   } catch(std::exception &e) {
-    return -2;
+    return -1;
   }
 
   return 0;
@@ -261,56 +340,78 @@ int RedisDriver::append_data(const DoutPrefixProvider* dpp, const::std::string&
 
 int RedisDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string& key) 
 {
-  int result = 0;
-  std::string entryName = "rgw-object:" + key + ":cache";
-  std::vector<std::string> deleteField;
-  deleteField.push_back("data");
+  std::string entry = partition_info.location + key;
 
   if (!client.is_connected()) 
-    return ECONNREFUSED;
+    find_client(dpp);
 
   if (key_exists(dpp, key)) {
+    int exists = -2;
+
     try {
-    client.hdel(entryName, deleteField, [&result](cpp_redis::reply &reply) {
-      if (reply.is_integer()) {
-        result = reply.as_integer();
-      }
-    });
+      client.hexists(entry, "data", [&exists](cpp_redis::reply &reply) {
+       if (!reply.is_null()) {
+         exists = reply.as_integer();
+       }
+      });
 
-    client.sync_commit(std::chrono::milliseconds(1000));
+      client.sync_commit(std::chrono::milliseconds(1000));
     } catch(std::exception &e) {
-    return -2;
+      return -1;
+    }
+
+    if (exists) {
+      try {
+       int result;
+       std::vector<std::string> deleteField;
+       deleteField.push_back("data");
+
+       client.hdel(entry, deleteField, [&result](cpp_redis::reply &reply) {
+         if (reply.is_integer()) {
+           result = reply.as_integer(); 
+         }
+       });
+
+       client.sync_commit(std::chrono::milliseconds(1000));
+
+       if (!result) {
+         return -1;
+       } else {
+         return remove_entry(dpp, key);
+       }
+      } catch(std::exception &e) {
+       return -1;
+      }
+    } else {
+      return 0; /* No delete was necessary */
     }
   } else {
     return 0; /* No delete was necessary */
   }
-
-  return result - 1;
 }
 
 int RedisDriver::get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) 
 {
-  std::string result;
-  std::string entryName = "rgw-object:" + key + ":cache";
+  std::string entry = partition_info.location + key;
 
   if (!client.is_connected()) 
-    return ECONNREFUSED;
+    find_client(dpp);
 
   if (key_exists(dpp, key)) {
-    rgw::sal::Attrs::iterator it;
-    std::vector< std::pair<std::string, std::string> > redisAttrs;
-    std::vector<std::string> getFields;
-
-    /* Retrieve existing values from cache */
     try {
-      client.hgetall(entryName, [&getFields](cpp_redis::reply &reply) {
+      client.hgetall(entry, [&attrs](cpp_redis::reply &reply) {
        if (reply.is_array()) { 
          auto arr = reply.as_array();
     
          if (!arr[0].is_null()) {
-           for (long unsigned int i = 0; i < arr.size() - 1; i += 2) {
-             getFields.push_back(arr[i].as_string());
-           }
+           for (long unsigned int i = 0; i < arr.size() - 1; i += 2) {
+             if (arr[i].as_string() != "data") {
+               buffer::list bl_value;
+               bl_value.append(arr[i + 1].as_string());
+                attrs.insert({arr[i].as_string(), bl_value});
+               bl_value.clear();
+             }
+            }
          }
        }
       });
@@ -319,47 +420,9 @@ int RedisDriver::get_attrs(const DoutPrefixProvider* dpp, const std::string& key
     } catch(std::exception &e) {
       return -1;
     }
-
-    /* Ensure all metadata, attributes, and data has been set */
-    for (const auto& field : baseFields) {
-      auto it = std::find_if(getFields.begin(), getFields.end(),
-       [&](const auto& comp) { return comp == field; });
-
-      if (it == getFields.end()) {
-       return -1;
-      }
-    }
-
-    getFields.erase(std::find(getFields.begin(), getFields.end(), "data")); /* Do not query for data field */
-    int exists = -1;
-    /* Get attributes from cache */
-    try {
-      client.hmget(entryName, getFields, [&exists, &attrs, &getFields](cpp_redis::reply &reply) {
-       if (reply.is_array()) {
-         auto arr = reply.as_array();
-
-         if (!arr[0].is_null()) {
-           exists = 0;
-
-           for (long unsigned int i = 0; i < getFields.size(); ++i) {
-             std::string tmp = arr[i].as_string();
-             buffer::list bl;
-             bl.append(tmp);
-             attrs.insert({getFields[i], bl});
-           }
-         }
-       }
-      });
-
-      client.sync_commit(std::chrono::milliseconds(1000));
-    } catch(std::exception &e) {
-      exit(-1);
-    }
-
-    if (exists < 0) {
-      dout(20) << "RGW Redis Cache: Object was not retrievable." << dendl;
-      return -2;
-    }
+  } else {
+    ldpp_dout(dpp, 20) << "RGW Redis Cache: Object was not retrievable." << dendl;
+    return -2;
   }
 
   return 0;
@@ -367,34 +430,37 @@ int RedisDriver::get_attrs(const DoutPrefixProvider* dpp, const std::string& key
 
 int RedisDriver::set_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) 
 {
-  /* Creating the index based on oid */
-  std::string entryName = "rgw-object:" + key + ":cache";
-  std::string result;
+  if (attrs.empty())
+    return -1;
+      
+  std::string entry = partition_info.location + key;
 
   if (!client.is_connected()) 
-    return ECONNREFUSED;
+    find_client(dpp);
 
-  /* Every set will be treated as new */
-  try {
-    std::vector< std::pair<std::string, std::string> > redisAttrs = build_attrs(&attrs);
-      
-    if (redisAttrs.empty()) {
-      return -1;
-    } 
-      
-    client.hmset(entryName, redisAttrs, [&result](cpp_redis::reply &reply) {
-      if (!reply.is_null()) {
-        result = reply.as_string();
-      }
-    });
+  if (key_exists(dpp, key)) {
+    /* Every attr set will be treated as new */
+    try {
+      std::string result;
+      auto redisAttrs = build_attrs(&attrs);
+       
+      client.hmset(entry, redisAttrs, [&result](cpp_redis::reply &reply) {
+       if (!reply.is_null()) {
+         result = reply.as_string();
+       }
+      });
 
-    client.sync_commit(std::chrono::milliseconds(1000));
+      client.sync_commit(std::chrono::milliseconds(1000));
 
-    if (result != "OK") {
+      if (result != "OK") {
+       return -1;
+      }
+    } catch(std::exception &e) {
       return -1;
     }
-  } catch(std::exception &e) {
-    return -1;
+  } else {
+    ldpp_dout(dpp, 20) << "RGW Redis Cache: Object was not retrievable." << dendl;
+    return -2;
   }
 
   return 0;
@@ -402,20 +468,17 @@ int RedisDriver::set_attrs(const DoutPrefixProvider* dpp, const std::string& key
 
 int RedisDriver::update_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) 
 {
-  std::string result;
-  std::string entryName = "rgw-object:" + key + ":cache";
+  std::string entry = partition_info.location + key;
 
   if (!client.is_connected()) 
-    return ECONNREFUSED;
+    find_client(dpp);
 
   if (key_exists(dpp, key)) {
     try {
-      std::vector< std::pair<std::string, std::string> > redisAttrs;
-      for (const auto& it : attrs) {
-        redisAttrs.push_back({it.first, it.second.to_str()});
-      }
+      std::string result;
+      auto redisAttrs = build_attrs(&attrs);
 
-      client.hmset(entryName, redisAttrs, [&result](cpp_redis::reply &reply) {
+      client.hmset(entry, redisAttrs, [&result](cpp_redis::reply &reply) {
         if (!reply.is_null()) {
           result = reply.as_string();
         }
@@ -427,9 +490,10 @@ int RedisDriver::update_attrs(const DoutPrefixProvider* dpp, const std::string&
         return -1;
       }
     } catch(std::exception &e) {
-      return -2;
+      return -1;
     }
   } else {
+    ldpp_dout(dpp, 20) << "RGW Redis Cache: Object was not retrievable." << dendl;
     return -2;
   }
 
@@ -438,18 +502,16 @@ int RedisDriver::update_attrs(const DoutPrefixProvider* dpp, const std::string&
 
 int RedisDriver::delete_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& del_attrs) 
 {
-  int result = 0;
-  std::string entryName = "rgw-object:" + key + ":cache";
+  std::string entry = partition_info.location + key;
 
   if (!client.is_connected()) 
-    return ECONNREFUSED;
+    find_client(dpp);
 
   if (key_exists(dpp, key)) {
     std::vector<std::string> getFields;
 
-    /* Retrieve existing values from cache */
     try {
-      client.hgetall(entryName, [&getFields](cpp_redis::reply &reply) {
+      client.hgetall(entry, [&getFields](cpp_redis::reply &reply) {
        if (reply.is_array()) {
          auto arr = reply.as_array();
     
@@ -466,11 +528,11 @@ int RedisDriver::delete_attrs(const DoutPrefixProvider* dpp, const std::string&
       return -1;
     }
 
-    std::vector< std::pair<std::string, std::string> > redisAttrs = build_attrs(&del_attrs);
+    auto redisAttrs = build_attrs(&del_attrs);
     std::vector<std::string> redisFields;
 
     std::transform(begin(redisAttrs), end(redisAttrs), std::back_inserter(redisFields),
-                          [](auto const& pair) { return pair.first; });
+      [](auto const& pair) { return pair.first; });
 
     /* Only delete attributes that have been stored */
     for (const auto& it : redisFields) {
@@ -480,7 +542,9 @@ int RedisDriver::delete_attrs(const DoutPrefixProvider* dpp, const std::string&
     }
 
     try {
-      client.hdel(entryName, redisFields, [&result](cpp_redis::reply &reply) {
+      int result = 0;
+
+      client.hdel(entry, redisFields, [&result](cpp_redis::reply &reply) {
         if (reply.is_integer()) {
           result = reply.as_integer();
         }
@@ -494,26 +558,25 @@ int RedisDriver::delete_attrs(const DoutPrefixProvider* dpp, const std::string&
     }
   }
 
-  dout(20) << "RGW Redis Cache: Object is not in cache." << dendl;
+  ldpp_dout(dpp, 20) << "RGW Redis Cache: Object is not in cache." << dendl;
   return -2;
 }
 
 std::string RedisDriver::get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name) 
 {
-  int exists = -2;
-  std::string result;
-  std::string entryName = "rgw-object:" + key + ":cache";
+  std::string entry = partition_info.location + key;
   std::string attrValue;
 
   if (!client.is_connected()) 
-    return {};
+    find_client(dpp);
 
   if (key_exists(dpp, key)) {
+    int exists = -2;
     std::string getValue;
 
     /* Ensure field was set */
     try {
-      client.hexists(entryName, attr_name, [&exists](cpp_redis::reply& reply) {
+      client.hexists(entry, attr_name, [&exists](cpp_redis::reply& reply) {
        if (!reply.is_null()) {
          exists = reply.as_integer();
        }
@@ -525,15 +588,14 @@ std::string RedisDriver::get_attr(const DoutPrefixProvider* dpp, const std::stri
     }
     
     if (!exists) {
-      dout(20) << "RGW Redis Cache: Attribute was not set." << dendl;
+      ldpp_dout(dpp, 20) << "RGW Redis Cache: Attribute was not set." << dendl;
       return {};
     }
 
     /* Retrieve existing value from cache */
     try {
-      client.hget(entryName, attr_name, [&exists, &attrValue](cpp_redis::reply &reply) {
+      client.hget(entry, attr_name, [&exists, &attrValue](cpp_redis::reply &reply) {
        if (!reply.is_null()) {
-         exists = 0;
          attrValue = reply.as_string();
        }
       });
@@ -542,11 +604,9 @@ std::string RedisDriver::get_attr(const DoutPrefixProvider* dpp, const std::stri
     } catch(std::exception &e) {
       return {};
     }
-
-    if (exists < 0) {
-      dout(20) << "RGW Redis Cache: Object was not retrievable." << dendl;
-      return {};
-    }
+  } else {
+    ldpp_dout(dpp, 20) << "RGW Redis Cache: Object is not in cache." << dendl;
+    return {};
   }
 
   return attrValue;
@@ -554,27 +614,31 @@ std::string RedisDriver::get_attr(const DoutPrefixProvider* dpp, const std::stri
 
 int RedisDriver::set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attrVal) 
 {
-  /* Creating the index based on key */
-  std::string entryName = "rgw-object:" + key + ":cache";
-  int result = -1;
+  std::string entry = partition_info.location + key;
+  int result = 0;
     
   if (!client.is_connected()) 
-    return ECONNREFUSED;
+    find_client(dpp);
     
-  /* Every set will be treated as new */
-  try {
-    client.hset(entryName, attr_name, attrVal, [&result](cpp_redis::reply& reply) {
-      if (!reply.is_null()) {
-        result = reply.as_integer();
-      }
-    });
+  if (key_exists(dpp, key)) {
+    /* Every attr set will be treated as new */
+    try {
+      client.hset(entry, attr_name, attrVal, [&result](cpp_redis::reply& reply) {
+       if (!reply.is_null()) {
+         result = reply.as_integer();
+       }
+      });
 
-    client.sync_commit(std::chrono::milliseconds(1000));
-  } catch(std::exception &e) {
-    return -1;
+      client.sync_commit(std::chrono::milliseconds(1000));
+    } catch(std::exception &e) {
+      return -1;
+    }
+  } else {
+    ldpp_dout(dpp, 20) << "RGW Redis Cache: Object is not in cache." << dendl;
+    return -2; 
   }
 
-  return result;
+  return result - 1;
 }
 
 std::unique_ptr<CacheAioRequest> RedisDriver::get_cache_aio_request_ptr(const DoutPrefixProvider* dpp) 
@@ -582,166 +646,90 @@ std::unique_ptr<CacheAioRequest> RedisDriver::get_cache_aio_request_ptr(const Do
   return std::make_unique<RedisCacheAioRequest>(this);  
 }
 
-bool RedisDriver::key_exists(const DoutPrefixProvider* dpp, const std::string& key
+rgw::AioResultList RedisDriver::get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id
 {
-  int result = -1;
-  std::string entryName = "rgw-object:" + key + ":cache";
-  std::vector<std::string> keys;
-  keys.push_back(entryName);
-
-  if (!client.is_connected()) 
-    find_client(dpp);
+  rgw_raw_obj r_obj;
+  r_obj.oid = key;
 
-  try {
-    client.exists(keys, [&result](cpp_redis::reply &reply) {
-      if (reply.is_integer()) {
-        result = reply.as_integer();
-      }
-    });
-
-    client.sync_commit(std::chrono::milliseconds(1000));
-  } catch(std::exception &e) {}
-
-  return result;
+  return aio->get(r_obj, rgw::Aio::cache_read_op(dpp, y, this, ofs, len, key), cost, id);
 }
 
-std::vector<Entry> RedisDriver::list_entries(const DoutPrefixProvider* dpp) 
+int RedisDriver::AsyncReadOp::init(const DoutPrefixProvider *dpp, CephContext* cct, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg)
 {
-  std::vector<Entry> result;
-
-  for (auto it = entries.begin(); it != entries.end(); ++it) { 
-    result.push_back(it->second);
+  ldpp_dout(dpp, 20) << "RedisCache: " << __func__ << "(): file_path=" << file_path << dendl;
+  aio_cb.reset(new struct aiocb);
+  memset(aio_cb.get(), 0, sizeof(struct aiocb));
+  aio_cb->aio_fildes = TEMP_FAILURE_RETRY(::open(file_path.c_str(), O_RDONLY|O_CLOEXEC|O_BINARY));
+
+  if (aio_cb->aio_fildes < 0) {
+      int err = errno;
+      ldpp_dout(dpp, 1) << "ERROR: RedisCache: " << __func__ << "(): can't open " << file_path << " : " << " error: " << err << dendl;
+      return -err;
   }
 
-  return result;
-}
-
-size_t RedisDriver::get_num_entries(const DoutPrefixProvider* dpp) 
-{
-  return entries.size();
-}
-
-Partition RedisDriver::get_current_partition_info(const DoutPrefixProvider* dpp) 
-{
-  Partition part;
-  return part; // Implement -Sam
-}
-
-uint64_t RedisDriver::get_free_space(const DoutPrefixProvider* dpp) 
-{
-  int result = -1;
-
-  if (!client.is_connected()) 
-    find_client(dpp);
-
-  try {
-    client.info([&result](cpp_redis::reply &reply) {
-      if (!reply.is_null()) {
-        int usedMem = -1;
-       int maxMem = -1;
-
-        std::istringstream iss(reply.as_string());
-       std::string line;    
-        while (std::getline(iss, line)) {
-         size_t pos = line.find_first_of(":");
-         if (pos != std::string::npos) {
-           if (line.substr(0, pos) == "used_memory") {
-             usedMem = std::stoi(line.substr(pos + 1, line.length() - pos - 2));
-           } else if (line.substr(0, line.find_first_of(":")) == "maxmemory") {
-             maxMem = std::stoi(line.substr(pos + 1, line.length() - pos - 2));
-           } 
-         }
-        }
-
-       if (usedMem > -1 && maxMem > -1)
-         result = maxMem - usedMem;
-      }
-    });
-
-    client.sync_commit(std::chrono::milliseconds(1000));
-  } catch(std::exception &e) {
-    return -1;
+  if (cct->_conf->rgw_d3n_l1_fadvise != POSIX_FADV_NORMAL) {
+      posix_fadvise(aio_cb->aio_fildes, 0, 0, g_conf()->rgw_d3n_l1_fadvise);
   }
 
-  return result;
-}
-
-int RedisDriver::AsyncReadOp::init(const DoutPrefixProvider *dpp, CephContext* cct, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg)
-{
-    ldpp_dout(dpp, 20) << "RedisCache: " << __func__ << "(): file_path=" << file_path << dendl;
-    aio_cb.reset(new struct aiocb);
-    memset(aio_cb.get(), 0, sizeof(struct aiocb));
-    aio_cb->aio_fildes = TEMP_FAILURE_RETRY(::open(file_path.c_str(), O_RDONLY|O_CLOEXEC|O_BINARY));
-
-    if (aio_cb->aio_fildes < 0) {
-        int err = errno;
-        ldpp_dout(dpp, 1) << "ERROR: RedisCache: " << __func__ << "(): can't open " << file_path << " : " << " error: " << err << dendl;
-        return -err;
-    }
-
-    if (cct->_conf->rgw_d3n_l1_fadvise != POSIX_FADV_NORMAL) {
-        posix_fadvise(aio_cb->aio_fildes, 0, 0, g_conf()->rgw_d3n_l1_fadvise);
-    }
-
-    bufferptr bp(read_len);
-    aio_cb->aio_buf = bp.c_str();
-    result.append(std::move(bp));
+  bufferptr bp(read_len);
+  aio_cb->aio_buf = bp.c_str();
+  result.append(std::move(bp));
 
-    aio_cb->aio_nbytes = read_len;
-    aio_cb->aio_offset = read_ofs;
-    aio_cb->aio_sigevent.sigev_notify = SIGEV_THREAD;
-    aio_cb->aio_sigevent.sigev_notify_function = libaio_cb_aio_dispatch;
-    aio_cb->aio_sigevent.sigev_notify_attributes = nullptr;
-    aio_cb->aio_sigevent.sigev_value.sival_ptr = arg;
+  aio_cb->aio_nbytes = read_len;
+  aio_cb->aio_offset = read_ofs;
+  aio_cb->aio_sigevent.sigev_notify = SIGEV_THREAD;
+  aio_cb->aio_sigevent.sigev_notify_function = libaio_cb_aio_dispatch;
+  aio_cb->aio_sigevent.sigev_notify_attributes = nullptr;
+  aio_cb->aio_sigevent.sigev_value.sival_ptr = arg;
 
-    return 0;
+  return 0;
 }
 
 void RedisDriver::AsyncReadOp::libaio_cb_aio_dispatch(sigval sigval)
 {
-    auto p = std::unique_ptr<Completion>{static_cast<Completion*>(sigval.sival_ptr)};
-    auto op = std::move(p->user_data);
-    const int ret = -aio_error(op.aio_cb.get());
-    boost::system::error_code ec;
-    if (ret < 0) {
-        ec.assign(-ret, boost::system::system_category());
-    }
+  auto p = std::unique_ptr<Completion>{static_cast<Completion*>(sigval.sival_ptr)};
+  auto op = std::move(p->user_data);
+  const int ret = -aio_error(op.aio_cb.get());
+  boost::system::error_code ec;
+  if (ret < 0) {
+      ec.assign(-ret, boost::system::system_category());
+  }
 
-    ceph::async::dispatch(std::move(p), ec, std::move(op.result));
+  ceph::async::dispatch(std::move(p), ec, std::move(op.result));
 }
 
 template <typename Executor1, typename CompletionHandler>
 auto RedisDriver::AsyncReadOp::create(const Executor1& ex1, CompletionHandler&& handler)
 {
-    auto p = Completion::create(ex1, std::move(handler));
-    return p;
+  auto p = Completion::create(ex1, std::move(handler));
+  return p;
 }
 
 template <typename ExecutionContext, typename CompletionToken>
 auto RedisDriver::get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key,
                 off_t read_ofs, off_t read_len, CompletionToken&& token)
 {
-    std::string location = "";//partition_info.location + key;
-    ldpp_dout(dpp, 20) << "RedisCache: " << __func__ << "(): location=" << location << dendl;
-
-    using Op = AsyncReadOp;
-    using Signature = typename Op::Signature;
-    boost::asio::async_completion<CompletionToken, Signature> init(token);
-    auto p = Op::create(ctx.get_executor(), init.completion_handler);
-    auto& op = p->user_data;
-
-    int ret = op.init(dpp, cct, location, read_ofs, read_len, p.get());
-    if(0 == ret) {
-        ret = ::aio_read(op.aio_cb.get());
-    }
-  //  ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): ::aio_read(), ret=" << ret << dendl;
  /* if(ret < 0) {
-        auto ec = boost::system::error_code{-ret, boost::system::system_category()};
-        ceph::async::post(std::move(p), ec, bufferlist{});
-    } else {
-        (void)p.release();
-    }*/
-    //return init.result.get();
+  std::string location = partition_info.location + key;
+  ldpp_dout(dpp, 20) << "RedisCache: " << __func__ << "(): location=" << location << dendl;
+
+  using Op = AsyncReadOp;
+  using Signature = typename Op::Signature;
+  boost::asio::async_completion<CompletionToken, Signature> init(token);
+  auto p = Op::create(ctx.get_executor(), init.completion_handler);
+  auto& op = p->user_data;
+
+  int ret = op.init(dpp, cct, location, read_ofs, read_len, p.get());
+  if (0 == ret) {
+    ret = ::aio_read(op.aio_cb.get());
+  }
+//  ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): ::aio_read(), ret=" << ret << dendl;
+ /* if(ret < 0) {
+      auto ec = boost::system::error_code{-ret, boost::system::system_category()};
+      ceph::async::post(std::move(p), ec, bufferlist{});
+  } else {
+      (void)p.release();
+  }*/
+  //return init.result.get();
 }
 
 void RedisCacheAioRequest::cache_aio_read(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, off_t ofs, uint64_t len, rgw::Aio* aio, rgw::AioResult& r)
index c392b5487b83f9b16bda296367d3d1b219e5f5d8..b3835c9362ef0705609c6f5a9b798d34fc97a4d1 100644 (file)
@@ -4,11 +4,10 @@
 //#include <aedis.hpp>
 #include <aio.h>
 #include "common/async/completion.h"
-#include <string>
-#include <iostream>
-#include <cpp_redis/cpp_redis>
 #include "rgw_common.h"
 #include "rgw_cache_driver.h"
+
+#include <cpp_redis/cpp_redis>
 #include "driver/d4n/d4n_directory.h"
 
 namespace rgw { namespace cache {
@@ -27,12 +26,32 @@ class RedisCacheAioRequest: public CacheAioRequest {
 
 class RedisDriver : public CacheDriver {
   public:
-    RedisDriver() : CacheDriver() {}
+    RedisDriver(Partition& _partition_info) : partition_info(_partition_info),
+                                              free_space(_partition_info.size), 
+                                             outstanding_write_size(0) 
+    {
+      add_partition_info(_partition_info);
+    }
+    virtual ~RedisDriver()
+    {
+      remove_partition_info(partition_info);
+    }
+
+    /* Entry */
+    virtual bool key_exists(const DoutPrefixProvider* dpp, const std::string& key) override;
+    virtual std::vector<Entry> list_entries(const DoutPrefixProvider* dpp) override;
+    virtual size_t get_num_entries(const DoutPrefixProvider* dpp) override;
+    //int update_local_weight(const DoutPrefixProvider* dpp, std::string key, int localWeight); // may need to exist for base class -Sam
+
+    /* Partition */
+    virtual Partition get_current_partition_info(const DoutPrefixProvider* dpp) override { return partition_info; }
+    virtual uint64_t get_free_space(const DoutPrefixProvider* dpp) override { return free_space; } // how to get this from redis server? -Sam
+    static std::optional<Partition> get_partition_info(const DoutPrefixProvider* dpp, const std::string& name, const std::string& type);
+    static std::vector<Partition> list_partitions(const DoutPrefixProvider* dpp);
 
     virtual int initialize(CephContext* cct, const DoutPrefixProvider* dpp) override;
     virtual int put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) override;
     virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs) override;
-    virtual rgw::AioResultList get_async (const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override;
     virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data) override;
     virtual int delete_data(const DoutPrefixProvider* dpp, const::std::string& key) override;
     virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) override;
@@ -42,18 +61,9 @@ class RedisDriver : public CacheDriver {
     virtual std::string get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name) override;
     virtual int set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attr_val) override;
 
-    /* Entry */
-    virtual bool key_exists(const DoutPrefixProvider* dpp, const std::string& key) override;
-    virtual std::vector<Entry> list_entries(const DoutPrefixProvider* dpp) override;
-    virtual size_t get_num_entries(const DoutPrefixProvider* dpp) override;
-    int update_local_weight(const DoutPrefixProvider* dpp, std::string key, int localWeight); // may need to exist for base class -Sam
-
-    /* Partition */
-    virtual Partition get_current_partition_info(const DoutPrefixProvider* dpp) override;
-    virtual uint64_t get_free_space(const DoutPrefixProvider* dpp) override;
-
     virtual std::unique_ptr<CacheAioRequest> get_cache_aio_request_ptr(const DoutPrefixProvider* dpp) override;
-  
+    virtual rgw::AioResultList get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override;
+
     struct libaio_handler { // should this be the same as SSDDriver? -Sam
       rgw::Aio* throttle = nullptr;
       rgw::AioResult& r;
@@ -69,17 +79,24 @@ class RedisDriver : public CacheDriver {
     auto get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key,
                   off_t read_ofs, off_t read_len, CompletionToken&& token);
 
-  private:
+  protected:
     cpp_redis::client client;
     rgw::d4n::Address addr;
+    static std::unordered_map<std::string, Partition> partitions;
     std::unordered_map<std::string, Entry> entries;
+    Partition partition_info;
+    uint64_t free_space;
+    uint64_t outstanding_write_size;
     CephContext* cct;
 
     int find_client(const DoutPrefixProvider* dpp);
     int insert_entry(const DoutPrefixProvider* dpp, std::string key, off_t offset, uint64_t len);
-    int remove_entry(const DoutPrefixProvider* dpp, std::string key);
     std::optional<Entry> get_entry(const DoutPrefixProvider* dpp, std::string key);
+    int remove_entry(const DoutPrefixProvider* dpp, std::string key);
+    int add_partition_info(Partition& info);
+    int remove_partition_info(Partition& info);
 
+  private:
     // unique_ptr with custom deleter for struct aiocb
     struct libaio_aiocb_deleter {
       void operator()(struct aiocb* c) {