From fa30165b58ca00c60522cffe221af3b37a80f2be Mon Sep 17 00:00:00 2001 From: Samarah Date: Tue, 13 Jun 2023 12:33:18 -0400 Subject: [PATCH] RGW: add entries and entry methods Signed-off-by: Samarah --- src/rgw/rgw_cache_driver.h | 1 + src/rgw/rgw_redis_driver.cc | 106 ++++++++++++++++++++++++++++-------- src/rgw/rgw_redis_driver.h | 5 ++ 3 files changed, 90 insertions(+), 22 deletions(-) diff --git a/src/rgw/rgw_cache_driver.h b/src/rgw/rgw_cache_driver.h index 0fa1882b2e0..0bae4602c53 100644 --- a/src/rgw/rgw_cache_driver.h +++ b/src/rgw/rgw_cache_driver.h @@ -15,6 +15,7 @@ struct Entry { std::string key; off_t offset; uint64_t len; + int localWeight; }; class CacheDriver { diff --git a/src/rgw/rgw_redis_driver.cc b/src/rgw/rgw_redis_driver.cc index 751eeedf6be..a5a0f4c6609 100644 --- a/src/rgw/rgw_redis_driver.cc +++ b/src/rgw/rgw_redis_driver.cc @@ -7,7 +7,7 @@ namespace rgw { namespace cache { /* Base metadata and data fields should remain consistent */ -std::vector baseFields{ +std::vector baseFields { "mtime", "object_size", "accounted_size", @@ -35,6 +35,25 @@ std::vector< std::pair > build_attrs(rgw::sal::Attrs* return values; } +int RedisDriver::insert_entry(const DoutPrefixProvider* dpp, std::string key, off_t offset, uint64_t len) { + auto ret = entries.emplace(key, Entry(key, offset, len)); + return ret.second; +} + +int RedisDriver::remove_entry(const DoutPrefixProvider* dpp, std::string key) { + return entries.erase(key); +} + +std::optional RedisDriver::get_entry(const DoutPrefixProvider* dpp, std::string key) { + auto iter = entries.find(key); + + if (iter != entries.end()) { + return iter->second; + } + + return std::nullopt; +} + int RedisDriver::initialize(CephContext* cct, const DoutPrefixProvider* dpp) { if (client.is_connected()) return 0; @@ -76,32 +95,72 @@ bool RedisDriver::key_exists(const DoutPrefixProvider* dpp, const std::string& k } std::vector RedisDriver::list_entries(const DoutPrefixProvider* dpp) { - std::vector result; + std::vector keys; + std::vector entries; if (!client.is_connected()) return {}; -// try { + try { size_t cursor = 0; const std::string pattern = "*:cache"; - /* client.scan(cursor, pattern, [](cpp_redis::reply &reply) { - dout(0) << "Sam" << dendl; - if (!reply.is_null()) { - //result = reply.as_array(); + do { + auto reply = client.scan(cursor, pattern); + client.sync_commit(std::chrono::milliseconds(1000)); + + auto arr = reply.get().as_array(); + cursor = std::stoi(arr[0].as_string()); + auto result = arr[1].as_array(); + + for (auto it = result.begin(); it != result.end(); ++it) { + int i = std::distance(result.begin(), it); + std::string entryName = result[i].as_string(); + keys.push_back(entryName.substr(11, entryName.length() - 17)); } - }); - client.sync_commit(std::chrono::milliseconds(1000)); -*/ -/* if (result.empty()) { - return {}; - } + } while (cursor != 0); } catch(std::exception &e) { return {}; } -*/ - dout(0) << "Sam: " << client.is_connected() << dendl; - return result; + + /* Construct list of entries */ + for (auto it = keys.begin(); it != keys.end(); ++it) { + Entry entry; + + if (key_exists(dpp, *it)) { + try { + std::vector fields; + std::string entryName = "rgw-object:" + *it + ":cache"; + + entry.key = *it; + fields.push_back("offset"); + fields.push_back("len"); + fields.push_back("localWeight"); + + client.hmget(entryName, fields, [&entry](cpp_redis::reply &reply) { + if (reply.is_array()) { + auto arr = reply.as_array(); + + if (!arr[0].is_null()) { + entry.offset = std::stol(arr[0].as_string().c_str()); + entry.len = std::stoi(arr[1].as_string()); + entry.localWeight = std::stoi(arr[2].as_string()); + } + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + } catch(std::exception &e) { + return {}; // return failure or skip entry? -Sam + } + } else { // if one entry isn't found, shoud entire operation return a failure? -Sam + dout(20) << "RGW Redis Cache: Entry " << *it << " was not retrievable." << dendl; + } + + entries.push_back(entry); + } + + return entries; } size_t RedisDriver::get_num_entries(const DoutPrefixProvider* dpp) { @@ -149,11 +208,14 @@ uint64_t RedisDriver::get_free_space(const DoutPrefixProvider* dpp) { std::istringstream iss(reply.as_string()); std::string line; while (std::getline(iss, line)) { - if (line.substr(0, line.find(':')) == "used_memory") { - usedMem = std::stoi(line); - } else if (line.substr(0, line.find(':')) == "maxmemory") { - maxMem = std::stoi(line); - } + size_t pos = line.find_first_of(":"); + if (pos != std::string::npos) { + if (line.substr(0, pos) == "used_memory") { + usedMem = std::stoi(line.substr(pos + 1, line.length() - pos - 2)); + } else if (line.substr(0, line.find_first_of(":")) == "maxmemory") { + maxMem = std::stoi(line.substr(pos + 1, line.length() - pos - 2)); + } + } } if (usedMem > -1 && maxMem > -1) @@ -218,7 +280,7 @@ int RedisDriver::put(const DoutPrefixProvider* dpp, const std::string& key, buff return 0; } -int RedisDriver::get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs) { +int RedisDriver::get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs) { // for whole objects? -Sam std::string result; std::string entryName = "rgw-object:" + key + ":cache"; diff --git a/src/rgw/rgw_redis_driver.h b/src/rgw/rgw_redis_driver.h index 06cddefb0c0..9f1bffe213a 100644 --- a/src/rgw/rgw_redis_driver.h +++ b/src/rgw/rgw_redis_driver.h @@ -13,6 +13,11 @@ namespace rgw { namespace cache { class RedisDriver : public CacheDriver { private: cpp_redis::client client; + std::unordered_map entries; + + int insert_entry(const DoutPrefixProvider* dpp, std::string key, off_t offset, uint64_t len); + int remove_entry(const DoutPrefixProvider* dpp, std::string key); + std::optional get_entry(const DoutPrefixProvider* dpp, std::string key); public: RedisDriver(Partition& _partition_info, std::string host, int port) : CacheDriver() {} -- 2.39.5