From: Samarah Date: Fri, 22 Dec 2023 11:10:13 +0000 (+0530) Subject: rgw/cache: This commit squashes the following commits for redis driver. X-Git-Tag: v20.0.0~2219^2~85 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e0639b137b5709b52b3b9a00c59c3693d4d913d1;p=ceph.git rgw/cache: This commit squashes the following commits for redis driver. RGW: fix key_exists method for RedisDriver and clean up rgw_sal_d4n.cc RGW: Implement RedisDriver::get_free_space rgw/cache: modifying namespace from rgw::cal to rgw::cache. RGW: Update D4N files to match CacheDriver changes RGW: Fix D4N read workflow crashes RGW: Update RedisDriver to match new CacheDriver structure; define set_attrs method RGW: Switch out D4N cache methods with Redis driver methods RGW: Update Cache Driver structure RGW: Update cache files. RGW: create redis cache driver files Signed-off-by: Samarah --- diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 1bf433cb3951..3738c80d9297 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -116,6 +116,7 @@ set(librgw_common_srcs rgw_role.cc rgw_sal.cc rgw_sal_filter.cc + rgw_redis_driver.cc rgw_string.cc rgw_tag.cc rgw_tag_s3.cc diff --git a/src/rgw/rgw_redis_driver.cc b/src/rgw/rgw_redis_driver.cc new file mode 100644 index 000000000000..371f3e72196a --- /dev/null +++ b/src/rgw/rgw_redis_driver.cc @@ -0,0 +1,595 @@ +#include +#include "rgw_redis_driver.h" + +#define dout_subsys ceph_subsys_rgw +#define dout_context g_ceph_context + +namespace rgw { namespace cache { + +/* Base metadata and data fields should remain consistent */ +std::vector baseFields{ + "mtime", + "object_size", + "accounted_size", + "epoch", + "version_id", + "source_zone_short_id", + "bucket_count", + "bucket_size", + "user_quota.max_size", + "user_quota.max_objects", + "max_buckets", + "data"}; + +std::vector< std::pair > build_attrs(rgw::sal::Attrs* binary) { + std::vector< std::pair > values; + rgw::sal::Attrs::iterator attrs; + + /* Convert to vector */ + if (binary != NULL) { + for (attrs = binary->begin(); attrs != binary->end(); ++attrs) { + values.push_back(std::make_pair(attrs->first, attrs->second.to_str())); + } + } + + return values; +} + +int RedisDriver::initialize(CephContext* cct, const DoutPrefixProvider* dpp) { + if (client.is_connected()) + return 0; + + /* + if (addr.host == "" || addr.port == 0) { + dout(10) << "RGW Redis Cache: Redis cache endpoint was not configured correctly" << dendl; + return EDESTADDRREQ; + }*/ + + client.connect("127.0.0.1", 6379, nullptr); + + if (!client.is_connected()) + return ECONNREFUSED; + + return 0; +} + +bool RedisDriver::key_exists(const DoutPrefixProvider* dpp, const std::string& key) { + int result = -1; + std::string entryName = "rgw-object:" + key + ":cache"; + std::vector keys; + keys.push_back(entryName); + + if (!client.is_connected()) + return ECONNREFUSED; + + try { + client.exists(keys, [&result](cpp_redis::reply &reply) { + if (reply.is_integer()) { + result = reply.as_integer(); + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + } catch(std::exception &e) {} + + return result; +} + +size_t RedisDriver::get_num_entries(const DoutPrefixProvider* dpp) { + int result = -1; + + if (!client.is_connected()) + return ECONNREFUSED; + + try { + client.keys(":cache", [&result](cpp_redis::reply &reply) { + if (!reply.is_null()) { + result = reply.as_integer(); + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + + if (result < 0) { + return -1; + } + } catch(std::exception &e) { + return -1; + } + + return result; +} + +Partition RedisDriver::get_current_partition_info(const DoutPrefixProvider* dpp) { + Partition part; + return part; // Implement -Sam +} + +uint64_t RedisDriver::get_free_space(const DoutPrefixProvider* dpp) { + int result = -1; + + if (!client.is_connected()) + return ECONNREFUSED; + + try { + client.info([&result](cpp_redis::reply &reply) { + if (!reply.is_null()) { + int usedMem = -1; + int maxMem = -1; + + std::istringstream iss(reply.as_string()); + std::string line; + while (std::getline(iss, line)) { + if (line.substr(0, line.find(':')) == "used_memory") { + usedMem = std::stoi(line); + } else if (line.substr(0, line.find(':')) == "maxmemory") { + maxMem = std::stoi(line); + } + } + + if (usedMem > -1 && maxMem > -1) + result = maxMem - usedMem; + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + } catch(std::exception &e) { + return -1; + } + + return result; +} + +int RedisDriver::put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) { + std::string entryName = "rgw-object:" + key + ":cache"; + + if (!client.is_connected()) + return ECONNREFUSED; + + /* Every set will be treated as new */ + try { + /* Set data field */ + int result; + + client.hset(entryName, "data", bl.to_str(), [&result](cpp_redis::reply &reply) { + if (!reply.is_null()) { + result = reply.as_integer(); + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + + if (result != 0) { + return -1; + } + } catch(std::exception &e) { + return -1; + } + + try { + /* Set attribute fields */ + std::string result; + std::vector< std::pair > redisAttrs = build_attrs(&attrs); + + client.hmset(entryName, redisAttrs, [&result](cpp_redis::reply &reply) { + if (!reply.is_null()) { + result = reply.as_string(); + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + + if (result != "OK") { + return -1; + } + } catch(std::exception &e) { + return -1; + } + + return 0; +} + +int RedisDriver::get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs) { + std::string result; + std::string entryName = "rgw-object:" + key + ":cache"; + + if (!client.is_connected()) + return ECONNREFUSED; + + if (key_exists(dpp, key)) { + rgw::sal::Attrs::iterator it; + std::vector< std::pair > redisAttrs; + std::vector getFields; + + /* Retrieve existing values from cache */ + try { + client.hgetall(entryName, [&bl, &attrs](cpp_redis::reply &reply) { + if (reply.is_array()) { + auto arr = reply.as_array(); + + if (!arr[0].is_null()) { + for (long unsigned int i = 0; i < arr.size() - 1; i += 2) { + if (arr[i].as_string() == "data") + bl.append(arr[i + 1].as_string()); + else { + buffer::list temp; + temp.append(arr[i + 1].as_string()); + attrs.insert({arr[i].as_string(), temp}); + temp.clear(); + } + } + } + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + } catch(std::exception &e) { + return -1; + } + } else { + dout(20) << "RGW Redis Cache: Object was not retrievable." << dendl; + return -2; + } + + return 0; +} + +int RedisDriver::append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data) { + std::string result; + std::string value = ""; + std::string entryName = "rgw-object:" + key + ":cache"; + + if (!client.is_connected()) + return ECONNREFUSED; + + if (key_exists(dpp, key)) { + try { + client.hget(entryName, "data", [&value](cpp_redis::reply &reply) { + if (!reply.is_null()) { + value = reply.as_string(); + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + } catch(std::exception &e) { + return -2; + } + } + + try { + /* Append to existing value or set as new value */ + std::string temp = value + bl_data.to_str(); + std::vector< std::pair > field; + field.push_back({"data", temp}); + + client.hmset(entryName, field, [&result](cpp_redis::reply &reply) { + if (!reply.is_null()) { + result = reply.as_string(); + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + + if (result != "OK") { + return -1; + } + } catch(std::exception &e) { + return -2; + } + + return 0; +} + +int RedisDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string& key) { + int result = 0; + std::string entryName = "rgw-object:" + key + ":cache"; + std::vector deleteField; + deleteField.push_back("data"); + + if (!client.is_connected()) + return ECONNREFUSED; + + if (key_exists(dpp, key)) { + try { + client.hdel(entryName, deleteField, [&result](cpp_redis::reply &reply) { + if (reply.is_integer()) { + result = reply.as_integer(); + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + } catch(std::exception &e) { + return -2; + } + } else { + return 0; /* No delete was necessary */ + } + + return result - 1; +} + +int RedisDriver::get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) { + int exists = -2; + std::string result; + std::string entryName = "rgw-object:" + key + ":cache"; + + if (!client.is_connected()) + return ECONNREFUSED; + + if (key_exists(dpp, key)) { + rgw::sal::Attrs::iterator it; + std::vector< std::pair > redisAttrs; + std::vector getFields; + + /* Retrieve existing values from cache */ + try { + client.hgetall(entryName, [&getFields](cpp_redis::reply &reply) { + if (reply.is_array()) { + auto arr = reply.as_array(); + + if (!arr[0].is_null()) { + for (long unsigned int i = 0; i < arr.size() - 1; i += 2) { + getFields.push_back(arr[i].as_string()); + } + } + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + } catch(std::exception &e) { + return -1; + } + + /* Ensure all metadata, attributes, and data has been set */ + for (const auto& field : baseFields) { + auto it = std::find_if(getFields.begin(), getFields.end(), + [&](const auto& comp) { return comp == field; }); + + if (it == getFields.end()) { + return -1; + } + } + + getFields.erase(std::find(getFields.begin(), getFields.end(), "data")); /* Do not query for data field */ + + /* Get attributes from cache */ + try { + client.hmget(entryName, getFields, [&exists, &attrs, &getFields](cpp_redis::reply &reply) { + if (reply.is_array()) { + auto arr = reply.as_array(); + + if (!arr[0].is_null()) { + exists = 0; + + for (long unsigned int i = 0; i < getFields.size(); ++i) { + std::string tmp = arr[i].as_string(); + buffer::list bl; + bl.append(tmp); + attrs.insert({getFields[i], bl}); + } + } + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + } catch(std::exception &e) { + exit(-1); + } + + if (exists < 0) { + dout(20) << "RGW Redis Cache: Object was not retrievable." << dendl; + return -2; + } + } + + return 0; +} + +int RedisDriver::set_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) { + /* Creating the index based on oid */ + std::string entryName = "rgw-object:" + key + ":cache"; + std::string result; + + if (!client.is_connected()) + return ECONNREFUSED; + + /* Every set will be treated as new */ + try { + std::vector< std::pair > redisAttrs = build_attrs(&attrs); + + if (redisAttrs.empty()) { + return -1; + } + + client.hmset(entryName, redisAttrs, [&result](cpp_redis::reply &reply) { + if (!reply.is_null()) { + result = reply.as_string(); + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + + if (result != "OK") { + return -1; + } + } catch(std::exception &e) { + return -1; + } + + return 0; +} + +int RedisDriver::update_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) { + std::string result; + std::string entryName = "rgw-object:" + key + ":cache"; + + if (!client.is_connected()) + return ECONNREFUSED; + + if (key_exists(dpp, key)) { + try { + std::vector< std::pair > redisAttrs; + for (const auto& it : attrs) { + redisAttrs.push_back({it.first, it.second.to_str()}); + } + + client.hmset(entryName, redisAttrs, [&result](cpp_redis::reply &reply) { + if (!reply.is_null()) { + result = reply.as_string(); + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + + if (result != "OK") { + return -1; + } + } catch(std::exception &e) { + return -2; + } + } else { + return -2; + } + + return 0; +} + +int RedisDriver::delete_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& del_attrs) { + int result = 0; + std::string entryName = "rgw-object:" + key + ":cache"; + + if (!client.is_connected()) + return ECONNREFUSED; + + if (key_exists(dpp, key)) { + std::vector getFields; + + /* Retrieve existing values from cache */ + try { + client.hgetall(entryName, [&getFields](cpp_redis::reply &reply) { + if (reply.is_array()) { + auto arr = reply.as_array(); + + if (!arr[0].is_null()) { + for (long unsigned int i = 0; i < arr.size() - 1; i += 2) { + getFields.push_back(arr[i].as_string()); + } + } + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + } catch(std::exception &e) { + return -1; + } + + std::vector< std::pair > redisAttrs = build_attrs(&del_attrs); + std::vector redisFields; + + std::transform(begin(redisAttrs), end(redisAttrs), std::back_inserter(redisFields), + [](auto const& pair) { return pair.first; }); + + /* Only delete attributes that have been stored */ + for (const auto& it : redisFields) { + if (std::find(getFields.begin(), getFields.end(), it) == getFields.end()) { + redisFields.erase(std::find(redisFields.begin(), redisFields.end(), it)); + } + } + + try { + client.hdel(entryName, redisFields, [&result](cpp_redis::reply &reply) { + if (reply.is_integer()) { + result = reply.as_integer(); + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + + return result - 1; + } catch(std::exception &e) { + return -1; + } + } + + dout(20) << "RGW Redis Cache: Object is not in cache." << dendl; + return -2; +} + +std::string RedisDriver::get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name) { + int exists = -2; + std::string result; + std::string entryName = "rgw-object:" + key + ":cache"; + std::string attrValue; + + if (!client.is_connected()) + return {}; + + if (key_exists(dpp, key)) { + std::string getValue; + + /* Ensure field was set */ + try { + client.hexists(entryName, attr_name, [&exists](cpp_redis::reply& reply) { + if (!reply.is_null()) { + exists = reply.as_integer(); + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + } catch(std::exception &e) { + return {}; + } + + if (!exists) { + dout(20) << "RGW Redis Cache: Attribute was not set." << dendl; + return {}; + } + + /* Retrieve existing value from cache */ + try { + client.hget(entryName, attr_name, [&exists, &attrValue](cpp_redis::reply &reply) { + if (!reply.is_null()) { + exists = 0; + attrValue = reply.as_string(); + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + } catch(std::exception &e) { + return {}; + } + + if (exists < 0) { + dout(20) << "RGW Redis Cache: Object was not retrievable." << dendl; + return {}; + } + } + + return attrValue; +} + +int RedisDriver::set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attrVal) { + /* Creating the index based on key */ + std::string entryName = "rgw-object:" + key + ":cache"; + int result = -1; + + if (!client.is_connected()) + return ECONNREFUSED; + + /* Every set will be treated as new */ + try { + client.hset(entryName, attr_name, attrVal, [&result](cpp_redis::reply& reply) { + if (!reply.is_null()) { + result = reply.as_integer(); + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + } catch(std::exception &e) { + return -1; + } + + return result; +} + +} } // namespace rgw::cal diff --git a/src/rgw/rgw_redis_driver.h b/src/rgw/rgw_redis_driver.h new file mode 100644 index 000000000000..01c623acb680 --- /dev/null +++ b/src/rgw/rgw_redis_driver.h @@ -0,0 +1,43 @@ +#ifndef CEPH_REDISDRIVER_H +#define CEPH_REDISDRIVER_H + +#include +#include +#include +#include "rgw_common.h" +#include "rgw_cache_driver.h" +#include "driver/d4n/d4n_directory.h" + +namespace rgw { namespace cache { + +class RedisDriver : public CacheDriver { + private: + cpp_redis::client client; + + public: + RedisDriver(Partition& _partition_info, std::string host, int port) : CacheDriver() {} + + virtual int initialize(CephContext* cct, const DoutPrefixProvider* dpp) override; + virtual int put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) override; + virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs) override; + virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data) override; + virtual int delete_data(const DoutPrefixProvider* dpp, const::std::string& key) override; + virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) override; + virtual int set_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) override; + virtual int update_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) override; + virtual int delete_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& del_attrs) override; + virtual std::string get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name) override; + virtual int set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attr_val) override; + + /* Entry */ + virtual bool key_exists(const DoutPrefixProvider* dpp, const std::string& key) override; + virtual size_t get_num_entries(const DoutPrefixProvider* dpp) override; + + /* Partition */ + virtual Partition get_current_partition_info(const DoutPrefixProvider* dpp) override; + virtual uint64_t get_free_space(const DoutPrefixProvider* dpp) override; +}; + +} } // namespace rgw::cal + +#endif