]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/cache: This commit squashes the following commits for redis driver.
authorSamarah <samarah.uriarte@ibm.com>
Fri, 22 Dec 2023 11:10:13 +0000 (16:40 +0530)
committerPritha Srivastava <prsrivas@redhat.com>
Tue, 2 Apr 2024 15:54:50 +0000 (21:24 +0530)
RGW: fix key_exists method for RedisDriver and clean up rgw_sal_d4n.cc
RGW: Implement RedisDriver::get_free_space
rgw/cache: modifying namespace from rgw::cal to rgw::cache.
RGW: Update D4N files to match CacheDriver changes
RGW: Fix D4N read workflow crashes
RGW: Update RedisDriver to match new CacheDriver structure; define set_attrs method
RGW: Switch out D4N cache methods with Redis driver methods
RGW: Update Cache Driver structure
RGW: Update cache files.
RGW: create redis cache driver files

Signed-off-by: Samarah <samarah.uriarte@ibm.com>
src/rgw/CMakeLists.txt
src/rgw/rgw_redis_driver.cc [new file with mode: 0644]
src/rgw/rgw_redis_driver.h [new file with mode: 0644]

index 1bf433cb395108b3fbdd5dd73fbce412d1eea3e1..3738c80d92976d9f2849bb6ff6ee2f7c2322b761 100644 (file)
@@ -116,6 +116,7 @@ set(librgw_common_srcs
   rgw_role.cc
   rgw_sal.cc
   rgw_sal_filter.cc
+  rgw_redis_driver.cc
   rgw_string.cc
   rgw_tag.cc
   rgw_tag_s3.cc
diff --git a/src/rgw/rgw_redis_driver.cc b/src/rgw/rgw_redis_driver.cc
new file mode 100644 (file)
index 0000000..371f3e7
--- /dev/null
@@ -0,0 +1,595 @@
+#include <boost/algorithm/string.hpp>
+#include "rgw_redis_driver.h"
+
+#define dout_subsys ceph_subsys_rgw
+#define dout_context g_ceph_context
+
+namespace rgw { namespace cache {
+
+/* Base metadata and data fields should remain consistent */
+std::vector<std::string> baseFields{
+  "mtime",
+  "object_size",
+  "accounted_size",
+  "epoch",
+  "version_id",
+  "source_zone_short_id",
+  "bucket_count",
+  "bucket_size",
+  "user_quota.max_size",
+  "user_quota.max_objects",
+  "max_buckets",
+  "data"};
+
+std::vector< std::pair<std::string, std::string> > build_attrs(rgw::sal::Attrs* binary) {
+  std::vector< std::pair<std::string, std::string> > values;
+  rgw::sal::Attrs::iterator attrs;
+
+  /* Convert to vector */
+  if (binary != NULL) {
+    for (attrs = binary->begin(); attrs != binary->end(); ++attrs) {
+      values.push_back(std::make_pair(attrs->first, attrs->second.to_str()));
+    }
+  }
+
+  return values;
+}
+
+int RedisDriver::initialize(CephContext* cct, const DoutPrefixProvider* dpp) {
+  if (client.is_connected())
+    return 0;
+
+  /*
+  if (addr.host == "" || addr.port == 0) {
+    dout(10) << "RGW Redis Cache: Redis cache endpoint was not configured correctly" << dendl;
+    return EDESTADDRREQ;
+  }*/
+
+  client.connect("127.0.0.1", 6379, nullptr);
+
+  if (!client.is_connected()) 
+    return ECONNREFUSED;
+
+  return 0;
+}
+
+bool RedisDriver::key_exists(const DoutPrefixProvider* dpp, const std::string& key) {
+  int result = -1;
+  std::string entryName = "rgw-object:" + key + ":cache";
+  std::vector<std::string> keys;
+  keys.push_back(entryName);
+
+  if (!client.is_connected()) 
+    return ECONNREFUSED;
+
+  try {
+    client.exists(keys, [&result](cpp_redis::reply &reply) {
+      if (reply.is_integer()) {
+        result = reply.as_integer();
+      }
+    });
+
+    client.sync_commit(std::chrono::milliseconds(1000));
+  } catch(std::exception &e) {}
+
+  return result;
+}
+
+size_t RedisDriver::get_num_entries(const DoutPrefixProvider* dpp) {
+  int result = -1;
+
+  if (!client.is_connected()) 
+    return ECONNREFUSED;
+
+  try {
+    client.keys(":cache", [&result](cpp_redis::reply &reply) {
+      if (!reply.is_null()) {
+        result = reply.as_integer();
+      }
+    });
+
+    client.sync_commit(std::chrono::milliseconds(1000));
+
+    if (result < 0) {
+      return -1;
+    }
+  } catch(std::exception &e) {
+    return -1;
+  }
+
+  return result;
+}
+
+Partition RedisDriver::get_current_partition_info(const DoutPrefixProvider* dpp) {
+  Partition part;
+  return part; // Implement -Sam
+}
+
+uint64_t RedisDriver::get_free_space(const DoutPrefixProvider* dpp) {
+  int result = -1;
+
+  if (!client.is_connected()) 
+    return ECONNREFUSED;
+
+  try {
+    client.info([&result](cpp_redis::reply &reply) {
+      if (!reply.is_null()) {
+        int usedMem = -1;
+       int maxMem = -1;
+
+        std::istringstream iss(reply.as_string());
+       std::string line;    
+        while (std::getline(iss, line)) {
+         if (line.substr(0, line.find(':')) == "used_memory") {
+           usedMem = std::stoi(line);
+         } else if (line.substr(0, line.find(':')) == "maxmemory") {
+           maxMem = std::stoi(line);
+         } 
+        }
+
+       if (usedMem > -1 && maxMem > -1)
+         result = maxMem - usedMem;
+      }
+    });
+
+    client.sync_commit(std::chrono::milliseconds(1000));
+  } catch(std::exception &e) {
+    return -1;
+  }
+
+  return result;
+}
+
+int RedisDriver::put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) {
+  std::string entryName = "rgw-object:" + key + ":cache";
+
+  if (!client.is_connected()) 
+    return ECONNREFUSED;
+
+  /* Every set will be treated as new */
+  try {
+    /* Set data field */
+    int result; 
+
+    client.hset(entryName, "data", bl.to_str(), [&result](cpp_redis::reply &reply) {
+      if (!reply.is_null()) {
+        result = reply.as_integer();
+      }
+    });
+
+    client.sync_commit(std::chrono::milliseconds(1000));
+
+    if (result != 0) {
+      return -1;
+    }
+  } catch(std::exception &e) {
+    return -1;
+  }
+
+  try {
+    /* Set attribute fields */
+    std::string result; 
+    std::vector< std::pair<std::string, std::string> > redisAttrs = build_attrs(&attrs);
+
+    client.hmset(entryName, redisAttrs, [&result](cpp_redis::reply &reply) {
+      if (!reply.is_null()) {
+       result = reply.as_string();
+      }
+    });
+
+    client.sync_commit(std::chrono::milliseconds(1000));
+
+    if (result != "OK") {
+      return -1;
+    }
+  } catch(std::exception &e) {
+    return -1;
+  }
+
+  return 0;
+}
+
+int RedisDriver::get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs) {
+  std::string result;
+  std::string entryName = "rgw-object:" + key + ":cache";
+  
+  if (!client.is_connected()) 
+    return ECONNREFUSED;
+    
+  if (key_exists(dpp, key)) {
+    rgw::sal::Attrs::iterator it;
+    std::vector< std::pair<std::string, std::string> > redisAttrs;
+    std::vector<std::string> getFields;
+
+    /* Retrieve existing values from cache */
+    try {
+      client.hgetall(entryName, [&bl, &attrs](cpp_redis::reply &reply) {
+       if (reply.is_array()) {
+         auto arr = reply.as_array();
+    
+         if (!arr[0].is_null()) {
+           for (long unsigned int i = 0; i < arr.size() - 1; i += 2) {
+             if (arr[i].as_string() == "data")
+                bl.append(arr[i + 1].as_string());
+             else {
+               buffer::list temp;
+               temp.append(arr[i + 1].as_string());
+                attrs.insert({arr[i].as_string(), temp});
+               temp.clear();
+             }
+            }
+         }
+       }
+      });
+
+      client.sync_commit(std::chrono::milliseconds(1000));
+    } catch(std::exception &e) {
+      return -1;
+    }
+  } else {
+    dout(20) << "RGW Redis Cache: Object was not retrievable." << dendl;
+    return -2;
+  }
+
+  return 0;
+}
+
+int RedisDriver::append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data) {
+  std::string result;
+  std::string value = "";
+  std::string entryName = "rgw-object:" + key + ":cache";
+
+  if (!client.is_connected()) 
+    return ECONNREFUSED;
+
+  if (key_exists(dpp, key)) {
+    try {
+      client.hget(entryName, "data", [&value](cpp_redis::reply &reply) {
+        if (!reply.is_null()) {
+          value = reply.as_string();
+        }
+      });
+
+      client.sync_commit(std::chrono::milliseconds(1000));
+    } catch(std::exception &e) {
+      return -2;
+    }
+  }
+
+  try {
+    /* Append to existing value or set as new value */
+    std::string temp = value + bl_data.to_str();
+    std::vector< std::pair<std::string, std::string> > field;
+    field.push_back({"data", temp});
+
+    client.hmset(entryName, field, [&result](cpp_redis::reply &reply) {
+      if (!reply.is_null()) {
+        result = reply.as_string();
+      }
+    });
+
+    client.sync_commit(std::chrono::milliseconds(1000));
+
+    if (result != "OK") {
+      return -1;
+    }
+  } catch(std::exception &e) {
+    return -2;
+  }
+
+  return 0;
+}
+
+int RedisDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string& key) {
+  int result = 0;
+  std::string entryName = "rgw-object:" + key + ":cache";
+  std::vector<std::string> deleteField;
+  deleteField.push_back("data");
+
+  if (!client.is_connected()) 
+    return ECONNREFUSED;
+
+  if (key_exists(dpp, key)) {
+    try {
+    client.hdel(entryName, deleteField, [&result](cpp_redis::reply &reply) {
+      if (reply.is_integer()) {
+        result = reply.as_integer();
+      }
+    });
+
+    client.sync_commit(std::chrono::milliseconds(1000));
+    } catch(std::exception &e) {
+    return -2;
+    }
+  } else {
+    return 0; /* No delete was necessary */
+  }
+
+  return result - 1;
+}
+
+int RedisDriver::get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) {
+  int exists = -2;
+  std::string result;
+  std::string entryName = "rgw-object:" + key + ":cache";
+
+  if (!client.is_connected()) 
+    return ECONNREFUSED;
+
+  if (key_exists(dpp, key)) {
+    rgw::sal::Attrs::iterator it;
+    std::vector< std::pair<std::string, std::string> > redisAttrs;
+    std::vector<std::string> getFields;
+
+    /* Retrieve existing values from cache */
+    try {
+      client.hgetall(entryName, [&getFields](cpp_redis::reply &reply) {
+       if (reply.is_array()) {
+         auto arr = reply.as_array();
+    
+         if (!arr[0].is_null()) {
+           for (long unsigned int i = 0; i < arr.size() - 1; i += 2) {
+             getFields.push_back(arr[i].as_string());
+           }
+         }
+       }
+      });
+
+      client.sync_commit(std::chrono::milliseconds(1000));
+    } catch(std::exception &e) {
+      return -1;
+    }
+
+    /* Ensure all metadata, attributes, and data has been set */
+    for (const auto& field : baseFields) {
+      auto it = std::find_if(getFields.begin(), getFields.end(),
+       [&](const auto& comp) { return comp == field; });
+
+      if (it == getFields.end()) {
+       return -1;
+      }
+    }
+
+    getFields.erase(std::find(getFields.begin(), getFields.end(), "data")); /* Do not query for data field */
+    
+    /* Get attributes from cache */
+    try {
+      client.hmget(entryName, getFields, [&exists, &attrs, &getFields](cpp_redis::reply &reply) {
+       if (reply.is_array()) {
+         auto arr = reply.as_array();
+
+         if (!arr[0].is_null()) {
+           exists = 0;
+
+           for (long unsigned int i = 0; i < getFields.size(); ++i) {
+             std::string tmp = arr[i].as_string();
+             buffer::list bl;
+             bl.append(tmp);
+             attrs.insert({getFields[i], bl});
+           }
+         }
+       }
+      });
+
+      client.sync_commit(std::chrono::milliseconds(1000));
+    } catch(std::exception &e) {
+      exit(-1);
+    }
+
+    if (exists < 0) {
+      dout(20) << "RGW Redis Cache: Object was not retrievable." << dendl;
+      return -2;
+    }
+  }
+
+  return 0;
+}
+
+int RedisDriver::set_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) {
+  /* Creating the index based on oid */
+  std::string entryName = "rgw-object:" + key + ":cache";
+  std::string result;
+
+  if (!client.is_connected()) 
+    return ECONNREFUSED;
+
+  /* Every set will be treated as new */
+  try {
+    std::vector< std::pair<std::string, std::string> > redisAttrs = build_attrs(&attrs);
+      
+    if (redisAttrs.empty()) {
+      return -1;
+    } 
+      
+    client.hmset(entryName, redisAttrs, [&result](cpp_redis::reply &reply) {
+      if (!reply.is_null()) {
+        result = reply.as_string();
+      }
+    });
+
+    client.sync_commit(std::chrono::milliseconds(1000));
+
+    if (result != "OK") {
+      return -1;
+    }
+  } catch(std::exception &e) {
+    return -1;
+  }
+
+  return 0;
+}
+
+int RedisDriver::update_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) {
+  std::string result;
+  std::string entryName = "rgw-object:" + key + ":cache";
+
+  if (!client.is_connected()) 
+    return ECONNREFUSED;
+
+  if (key_exists(dpp, key)) {
+    try {
+      std::vector< std::pair<std::string, std::string> > redisAttrs;
+      for (const auto& it : attrs) {
+        redisAttrs.push_back({it.first, it.second.to_str()});
+      }
+
+      client.hmset(entryName, redisAttrs, [&result](cpp_redis::reply &reply) {
+        if (!reply.is_null()) {
+          result = reply.as_string();
+        }
+      });
+
+      client.sync_commit(std::chrono::milliseconds(1000));
+
+      if (result != "OK") {
+        return -1;
+      }
+    } catch(std::exception &e) {
+      return -2;
+    }
+  } else {
+    return -2;
+  }
+
+  return 0;
+}
+
+int RedisDriver::delete_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& del_attrs) {
+  int result = 0;
+  std::string entryName = "rgw-object:" + key + ":cache";
+
+  if (!client.is_connected()) 
+    return ECONNREFUSED;
+
+  if (key_exists(dpp, key)) {
+    std::vector<std::string> getFields;
+
+    /* Retrieve existing values from cache */
+    try {
+      client.hgetall(entryName, [&getFields](cpp_redis::reply &reply) {
+       if (reply.is_array()) {
+         auto arr = reply.as_array();
+    
+         if (!arr[0].is_null()) {
+           for (long unsigned int i = 0; i < arr.size() - 1; i += 2) {
+             getFields.push_back(arr[i].as_string());
+           }
+         }
+       }
+      });
+
+      client.sync_commit(std::chrono::milliseconds(1000));
+    } catch(std::exception &e) {
+      return -1;
+    }
+
+    std::vector< std::pair<std::string, std::string> > redisAttrs = build_attrs(&del_attrs);
+    std::vector<std::string> redisFields;
+
+    std::transform(begin(redisAttrs), end(redisAttrs), std::back_inserter(redisFields),
+                          [](auto const& pair) { return pair.first; });
+
+    /* Only delete attributes that have been stored */
+    for (const auto& it : redisFields) {
+      if (std::find(getFields.begin(), getFields.end(), it) == getFields.end()) {
+        redisFields.erase(std::find(redisFields.begin(), redisFields.end(), it));
+      }
+    }
+
+    try {
+      client.hdel(entryName, redisFields, [&result](cpp_redis::reply &reply) {
+        if (reply.is_integer()) {
+          result = reply.as_integer();
+        }
+      });
+
+      client.sync_commit(std::chrono::milliseconds(1000));
+
+      return result - 1;
+    } catch(std::exception &e) {
+      return -1;
+    }
+  }
+
+  dout(20) << "RGW Redis Cache: Object is not in cache." << dendl;
+  return -2;
+}
+
+std::string RedisDriver::get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name) {
+  int exists = -2;
+  std::string result;
+  std::string entryName = "rgw-object:" + key + ":cache";
+  std::string attrValue;
+
+  if (!client.is_connected()) 
+    return {};
+
+  if (key_exists(dpp, key)) {
+    std::string getValue;
+
+    /* Ensure field was set */
+    try {
+      client.hexists(entryName, attr_name, [&exists](cpp_redis::reply& reply) {
+       if (!reply.is_null()) {
+         exists = reply.as_integer();
+       }
+      });
+
+      client.sync_commit(std::chrono::milliseconds(1000));
+    } catch(std::exception &e) {
+      return {};
+    }
+    
+    if (!exists) {
+      dout(20) << "RGW Redis Cache: Attribute was not set." << dendl;
+      return {};
+    }
+
+    /* Retrieve existing value from cache */
+    try {
+      client.hget(entryName, attr_name, [&exists, &attrValue](cpp_redis::reply &reply) {
+       if (!reply.is_null()) {
+         exists = 0;
+         attrValue = reply.as_string();
+       }
+      });
+
+      client.sync_commit(std::chrono::milliseconds(1000));
+    } catch(std::exception &e) {
+      return {};
+    }
+
+    if (exists < 0) {
+      dout(20) << "RGW Redis Cache: Object was not retrievable." << dendl;
+      return {};
+    }
+  }
+
+  return attrValue;
+}
+
+int RedisDriver::set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attrVal) {
+  /* Creating the index based on key */
+  std::string entryName = "rgw-object:" + key + ":cache";
+  int result = -1;
+    
+  if (!client.is_connected()) 
+    return ECONNREFUSED;
+    
+  /* Every set will be treated as new */
+  try {
+    client.hset(entryName, attr_name, attrVal, [&result](cpp_redis::reply& reply) {
+      if (!reply.is_null()) {
+        result = reply.as_integer();
+      }
+    });
+
+    client.sync_commit(std::chrono::milliseconds(1000));
+  } catch(std::exception &e) {
+    return -1;
+  }
+
+  return result;
+}
+
+} } // namespace rgw::cal
diff --git a/src/rgw/rgw_redis_driver.h b/src/rgw/rgw_redis_driver.h
new file mode 100644 (file)
index 0000000..01c623a
--- /dev/null
@@ -0,0 +1,43 @@
+#ifndef CEPH_REDISDRIVER_H
+#define CEPH_REDISDRIVER_H
+
+#include <string>
+#include <iostream>
+#include <cpp_redis/cpp_redis>
+#include "rgw_common.h"
+#include "rgw_cache_driver.h"
+#include "driver/d4n/d4n_directory.h"
+
+namespace rgw { namespace cache {
+
+class RedisDriver : public CacheDriver {
+  private:
+    cpp_redis::client client;
+
+  public:
+    RedisDriver(Partition& _partition_info, std::string host, int port) : CacheDriver() {}
+
+    virtual int initialize(CephContext* cct, const DoutPrefixProvider* dpp) override;
+    virtual int put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) override;
+    virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs) override;
+    virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data) override;
+    virtual int delete_data(const DoutPrefixProvider* dpp, const::std::string& key) override;
+    virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) override;
+    virtual int set_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) override;
+    virtual int update_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) override;
+    virtual int delete_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& del_attrs) override;
+    virtual std::string get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name) override;
+    virtual int set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attr_val) override;
+
+    /* Entry */
+    virtual bool key_exists(const DoutPrefixProvider* dpp, const std::string& key) override;
+    virtual size_t get_num_entries(const DoutPrefixProvider* dpp) override;
+
+    /* Partition */
+    virtual Partition get_current_partition_info(const DoutPrefixProvider* dpp) override;
+    virtual uint64_t get_free_space(const DoutPrefixProvider* dpp) override;
+};
+
+} } // namespace rgw::cal
+    
+#endif