From: Samarah Uriarte Date: Thu, 29 Jun 2023 13:51:02 +0000 (-0400) Subject: rgw/cache: This commit adds implementation of Partition in X-Git-Tag: v20.0.0~2219^2~73 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=253bdafa41250ea04fe317cf01f021f2906bfc34;p=ceph.git rgw/cache: This commit adds implementation of Partition in ssd driver and squashes the following related commits. RGW: Minor RedisDriver cleanup RGW: Add partitioning RGW: Cleanup RedisDriver Signed-off-by: Samarah Uriarte --- diff --git a/build/boost_redis b/build/boost_redis new file mode 160000 index 0000000000000..69d12421e29ae --- /dev/null +++ b/build/boost_redis @@ -0,0 +1 @@ +Subproject commit 69d12421e29ae36750148acee1039c261e763d19 diff --git a/src/rgw/driver/d4n/d4n_directory.cc b/src/rgw/driver/d4n/d4n_directory.cc index 441c4bc149282..9bd812a9245a3 100644 --- a/src/rgw/driver/d4n/d4n_directory.cc +++ b/src/rgw/driver/d4n/d4n_directory.cc @@ -24,7 +24,7 @@ int ObjectDirectory::find_client(cpp_redis::client* client) { } std::string ObjectDirectory::build_index(CacheObj* object) { - return "rgw-object:" + object->objName + ":object-directory"; + return object->bucketName + "_" + object->objName; } int ObjectDirectory::exist_key(std::string key) { @@ -204,7 +204,7 @@ int BlockDirectory::find_client(cpp_redis::client* client) { } std::string BlockDirectory::build_index(CacheBlock* block) { - return "rgw-object:" + block->cacheObj.objName + ":block-directory"; + return block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + boost::lexical_cast(block->blockId); } int BlockDirectory::exist_key(std::string key) { diff --git a/src/rgw/driver/d4n/d4n_directory.h b/src/rgw/driver/d4n/d4n_directory.h index fd2690db14137..13744ae42cb9b 100644 --- a/src/rgw/driver/d4n/d4n_directory.h +++ b/src/rgw/driver/d4n/d4n_directory.h @@ -5,6 +5,7 @@ #include #include #include +#include namespace rgw { namespace d4n { @@ -16,16 +17,17 @@ struct Address { struct CacheObj { std::string objName; /* S3 object name */ std::string bucketName; /* S3 bucket name */ - time_t creationTime; // Creation time of the S3 Object + time_t creationTime; /* Creation time of the S3 Object */ bool dirty; - std::vector hostsList; /* Currently not supported: list of hostnames of object locations for multiple backends */ + std::vector hostsList; /* List of hostnames of object locations for multiple backends */ }; struct CacheBlock { CacheObj cacheObj; + uint64_t blockId; /* RADOS object block ID */ uint64_t size; /* Block size in bytes */ int globalWeight = 0; - std::vector hostsList; /* Currently not supported: list of hostnames of block locations */ + std::vector hostsList; /* List of hostnames of block locations */ }; class Directory { @@ -34,7 +36,7 @@ class Directory { CephContext* cct; }; -class ObjectDirectory: public Directory { // where else should object directory be called? -Sam +class ObjectDirectory: public Directory { // weave into write workflow -Sam public: ObjectDirectory() {} ObjectDirectory(std::string host, int port) { diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.h b/src/rgw/driver/d4n/rgw_sal_d4n.h index 6150223d1e893..352e0b76deae0 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.h +++ b/src/rgw/driver/d4n/rgw_sal_d4n.h @@ -39,7 +39,10 @@ class D4NFilterDriver : public FilterDriver { public: D4NFilterDriver(Driver* _next) : FilterDriver(_next) { - cacheDriver = new rgw::cache::RedisDriver(); // change later -Sam + rgw::cache::Partition partition_info; + partition_info.location = "RedisCache"; // figure out how to fill rest of partition information -Sam + + cacheDriver = new rgw::cache::RedisDriver(partition_info); // change later -Sam objDir = new rgw::d4n::ObjectDirectory(); blockDir = new rgw::d4n::BlockDirectory(); cacheBlock = new rgw::d4n::CacheBlock(); diff --git a/src/rgw/rgw_redis_driver.cc b/src/rgw/rgw_redis_driver.cc index 0d5d0f23acc56..fd521092b2c36 100644 --- a/src/rgw/rgw_redis_driver.cc +++ b/src/rgw/rgw_redis_driver.cc @@ -8,20 +8,7 @@ namespace rgw { namespace cache { -/* Base metadata and data fields should remain consistent */ -std::vector baseFields { - "mtime", - "object_size", - "accounted_size", - "epoch", - "version_id", - "source_zone_short_id", - "bucket_count", - "bucket_size", - "user_quota.max_size", - "user_quota.max_objects", - "max_buckets", - "data"}; +std::unordered_map RedisDriver::partitions; std::vector< std::pair > build_attrs(rgw::sal::Attrs* binary) { @@ -44,7 +31,7 @@ int RedisDriver::find_client(const DoutPrefixProvider* dpp) return 0; if (addr.host == "" || addr.port == 0) { - dout(10) << "RGW Redis Cache: Redis cache endpoint was not configured correctly" << dendl; + ldpp_dout(dpp, 10) << "RGW Redis Cache: Redis cache endpoint was not configured correctly" << dendl; return EDESTADDRREQ; } @@ -62,22 +49,136 @@ int RedisDriver::insert_entry(const DoutPrefixProvider* dpp, std::string key, of return ret.second; } +std::optional RedisDriver::get_entry(const DoutPrefixProvider* dpp, std::string key) +{ + auto iter = entries.find(key); + + if (iter != entries.end()) { + return iter->second; + } + + return std::nullopt; +} + int RedisDriver::remove_entry(const DoutPrefixProvider* dpp, std::string key) { return entries.erase(key); } -std::optional RedisDriver::get_entry(const DoutPrefixProvider* dpp, std::string key) +int RedisDriver::add_partition_info(Partition& info) { - auto iter = entries.find(key); + std::string key = info.name + info.type; + auto ret = partitions.emplace(key, info); - if (iter != entries.end()) { - return iter->second; + return ret.second; +} + +int RedisDriver::remove_partition_info(Partition& info) +{ + std::string key = info.name + info.type; + return partitions.erase(key); +} + +bool RedisDriver::key_exists(const DoutPrefixProvider* dpp, const std::string& key) +{ + int result; + std::string entry = partition_info.location + key; + std::vector keys; + keys.push_back(entry); + + if (!client.is_connected()) + find_client(dpp); + + try { + client.exists(keys, [&result](cpp_redis::reply &reply) { + if (reply.is_integer()) { + result = reply.as_integer(); + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + } catch(std::exception &e) {} + + return result; +} + +std::vector RedisDriver::list_entries(const DoutPrefixProvider* dpp) +{ + std::vector result; + + for (auto it = entries.begin(); it != entries.end(); ++it) + result.push_back(it->second); + + return result; +} + +size_t RedisDriver::get_num_entries(const DoutPrefixProvider* dpp) +{ + return entries.size(); +} + +/* +uint64_t RedisDriver::get_free_space(const DoutPrefixProvider* dpp) +{ + int result = -1; + + if (!client.is_connected()) + find_client(dpp); + + try { + client.info([&result](cpp_redis::reply &reply) { + if (!reply.is_null()) { + int usedMem = -1; + int maxMem = -1; + + std::istringstream iss(reply.as_string()); + std::string line; + while (std::getline(iss, line)) { + size_t pos = line.find_first_of(":"); + if (pos != std::string::npos) { + if (line.substr(0, pos) == "used_memory") { + usedMem = std::stoi(line.substr(pos + 1, line.length() - pos - 2)); + } else if (line.substr(0, line.find_first_of(":")) == "maxmemory") { + maxMem = std::stoi(line.substr(pos + 1, line.length() - pos - 2)); + } + } + } + + if (usedMem > -1 && maxMem > -1) + result = maxMem - usedMem; + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + } catch(std::exception &e) { + return -1; } + return result; +} +*/ + +std::optional RedisDriver::get_partition_info(const DoutPrefixProvider* dpp, const std::string& name, const std::string& type) +{ + std::string key = name + type; + + auto iter = partitions.find(key); + if (iter != partitions.end()) + return iter->second; + return std::nullopt; } +std::vector RedisDriver::list_partitions(const DoutPrefixProvider* dpp) +{ + std::vector partitions_v; + + for (auto& it : partitions) + partitions_v.emplace_back(it.second); + + return partitions_v; +} + /* Currently an attribute but will also be part of the Entry metadata once consistency is guaranteed -Sam int RedisDriver::update_local_weight(const DoutPrefixProvider* dpp, std::string key, int localWeight) { @@ -95,55 +196,43 @@ int RedisDriver::update_local_weight(const DoutPrefixProvider* dpp, std::string int RedisDriver::initialize(CephContext* cct, const DoutPrefixProvider* dpp) { this->cct = cct; + addr.host = cct->_conf->rgw_d4n_host; // change later -Sam addr.port = cct->_conf->rgw_d4n_port; + if (partition_info.location.back() != '/') { + partition_info.location += "/"; + } + if (addr.host == "" || addr.port == 0) { - dout(10) << "RGW Redis Cache: Redis cache endpoint was not configured correctly" << dendl; + ldpp_dout(dpp, 10) << "RGW Redis Cache: Redis cache endpoint was not configured correctly" << dendl; return EDESTADDRREQ; } client.connect("127.0.0.1", 6379, nullptr); - if (!client.is_connected()) + if (!client.is_connected()) { + ldpp_dout(dpp, 10) << "RGW Redis Cache: Could not connect to redis cache endpoint." << dendl; return ECONNREFUSED; + } return 0; } int RedisDriver::put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) { - std::string entryName = "rgw-object:" + key + ":cache"; + std::string entry = partition_info.location + key; if (!client.is_connected()) - return ECONNREFUSED; - - /* Every set will be treated as new */ - try { - /* Set data field */ - int result; - - client.hset(entryName, "data", bl.to_str(), [&result](cpp_redis::reply &reply) { - if (!reply.is_null()) { - result = reply.as_integer(); - } - }); - - client.sync_commit(std::chrono::milliseconds(1000)); - - if (result <= 0) { - return -1; - } - } catch(std::exception &e) { - return -1; - } + find_client(dpp); + /* Every set will be treated as new */ // or maybe, if key exists, simply return? -Sam try { - /* Set attribute fields */ std::string result; - std::vector< std::pair > redisAttrs = build_attrs(&attrs); + auto redisAttrs = build_attrs(&attrs); + redisAttrs.push_back({"data", bl.to_str()}); - client.hmset(entryName, redisAttrs, [&result](cpp_redis::reply &reply) { + client.hmset(entry, redisAttrs, [&result](cpp_redis::reply &reply) { if (!reply.is_null()) { result = reply.as_string(); } @@ -158,37 +247,32 @@ int RedisDriver::put(const DoutPrefixProvider* dpp, const std::string& key, buff return -1; } - return 0; + return insert_entry(dpp, key, 0, len); // why is offset necessarily 0? -Sam } int RedisDriver::get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs) { - std::string result; - std::string entryName = "rgw-object:" + key + ":cache"; + std::string entry = partition_info.location + key; if (!client.is_connected()) - return ECONNREFUSED; + find_client(dpp); if (key_exists(dpp, key)) { - rgw::sal::Attrs::iterator it; - std::vector< std::pair > redisAttrs; - std::vector getFields; - /* Retrieve existing values from cache */ try { - client.hgetall(entryName, [&bl, &attrs](cpp_redis::reply &reply) { + client.hgetall(entry, [&bl, &attrs](cpp_redis::reply &reply) { if (reply.is_array()) { auto arr = reply.as_array(); if (!arr[0].is_null()) { for (long unsigned int i = 0; i < arr.size() - 1; i += 2) { - if (arr[i].as_string() == "data") + if (arr[i].as_string() == "data") { bl.append(arr[i + 1].as_string()); - else { - buffer::list temp; - temp.append(arr[i + 1].as_string()); - attrs.insert({arr[i].as_string(), temp}); - temp.clear(); + } else { + buffer::list bl_value; + bl_value.append(arr[i + 1].as_string()); + attrs.insert({arr[i].as_string(), bl_value}); + bl_value.clear(); } } } @@ -200,30 +284,24 @@ int RedisDriver::get(const DoutPrefixProvider* dpp, const std::string& key, off_ return -1; } } else { - dout(20) << "RGW Redis Cache: Object was not retrievable." << dendl; + ldpp_dout(dpp, 20) << "RGW Redis Cache: Object was not retrievable." << dendl; return -2; } return 0; } -rgw::AioResultList RedisDriver::get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) -{ - return {}; -} - int RedisDriver::append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data) { - std::string result; - std::string value = ""; - std::string entryName = "rgw-object:" + key + ":cache"; + std::string value; + std::string entry = partition_info.location + key; if (!client.is_connected()) - return ECONNREFUSED; + find_client(dpp); if (key_exists(dpp, key)) { try { - client.hget(entryName, "data", [&value](cpp_redis::reply &reply) { + client.hget(entry, "data", [&value](cpp_redis::reply &reply) { if (!reply.is_null()) { value = reply.as_string(); } @@ -231,17 +309,18 @@ int RedisDriver::append_data(const DoutPrefixProvider* dpp, const::std::string& client.sync_commit(std::chrono::milliseconds(1000)); } catch(std::exception &e) { - return -2; + return -1; } } - try { + try { // do we want key check here? -Sam /* Append to existing value or set as new value */ - std::string temp = value + bl_data.to_str(); + std::string newVal = value + bl_data.to_str(); std::vector< std::pair > field; - field.push_back({"data", temp}); + field.push_back({"data", newVal}); + std::string result; - client.hmset(entryName, field, [&result](cpp_redis::reply &reply) { + client.hmset(entry, field, [&result](cpp_redis::reply &reply) { if (!reply.is_null()) { result = reply.as_string(); } @@ -253,7 +332,7 @@ int RedisDriver::append_data(const DoutPrefixProvider* dpp, const::std::string& return -1; } } catch(std::exception &e) { - return -2; + return -1; } return 0; @@ -261,56 +340,78 @@ int RedisDriver::append_data(const DoutPrefixProvider* dpp, const::std::string& int RedisDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string& key) { - int result = 0; - std::string entryName = "rgw-object:" + key + ":cache"; - std::vector deleteField; - deleteField.push_back("data"); + std::string entry = partition_info.location + key; if (!client.is_connected()) - return ECONNREFUSED; + find_client(dpp); if (key_exists(dpp, key)) { + int exists = -2; + try { - client.hdel(entryName, deleteField, [&result](cpp_redis::reply &reply) { - if (reply.is_integer()) { - result = reply.as_integer(); - } - }); + client.hexists(entry, "data", [&exists](cpp_redis::reply &reply) { + if (!reply.is_null()) { + exists = reply.as_integer(); + } + }); - client.sync_commit(std::chrono::milliseconds(1000)); + client.sync_commit(std::chrono::milliseconds(1000)); } catch(std::exception &e) { - return -2; + return -1; + } + + if (exists) { + try { + int result; + std::vector deleteField; + deleteField.push_back("data"); + + client.hdel(entry, deleteField, [&result](cpp_redis::reply &reply) { + if (reply.is_integer()) { + result = reply.as_integer(); + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + + if (!result) { + return -1; + } else { + return remove_entry(dpp, key); + } + } catch(std::exception &e) { + return -1; + } + } else { + return 0; /* No delete was necessary */ } } else { return 0; /* No delete was necessary */ } - - return result - 1; } int RedisDriver::get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) { - std::string result; - std::string entryName = "rgw-object:" + key + ":cache"; + std::string entry = partition_info.location + key; if (!client.is_connected()) - return ECONNREFUSED; + find_client(dpp); if (key_exists(dpp, key)) { - rgw::sal::Attrs::iterator it; - std::vector< std::pair > redisAttrs; - std::vector getFields; - - /* Retrieve existing values from cache */ try { - client.hgetall(entryName, [&getFields](cpp_redis::reply &reply) { + client.hgetall(entry, [&attrs](cpp_redis::reply &reply) { if (reply.is_array()) { auto arr = reply.as_array(); if (!arr[0].is_null()) { - for (long unsigned int i = 0; i < arr.size() - 1; i += 2) { - getFields.push_back(arr[i].as_string()); - } + for (long unsigned int i = 0; i < arr.size() - 1; i += 2) { + if (arr[i].as_string() != "data") { + buffer::list bl_value; + bl_value.append(arr[i + 1].as_string()); + attrs.insert({arr[i].as_string(), bl_value}); + bl_value.clear(); + } + } } } }); @@ -319,47 +420,9 @@ int RedisDriver::get_attrs(const DoutPrefixProvider* dpp, const std::string& key } catch(std::exception &e) { return -1; } - - /* Ensure all metadata, attributes, and data has been set */ - for (const auto& field : baseFields) { - auto it = std::find_if(getFields.begin(), getFields.end(), - [&](const auto& comp) { return comp == field; }); - - if (it == getFields.end()) { - return -1; - } - } - - getFields.erase(std::find(getFields.begin(), getFields.end(), "data")); /* Do not query for data field */ - int exists = -1; - /* Get attributes from cache */ - try { - client.hmget(entryName, getFields, [&exists, &attrs, &getFields](cpp_redis::reply &reply) { - if (reply.is_array()) { - auto arr = reply.as_array(); - - if (!arr[0].is_null()) { - exists = 0; - - for (long unsigned int i = 0; i < getFields.size(); ++i) { - std::string tmp = arr[i].as_string(); - buffer::list bl; - bl.append(tmp); - attrs.insert({getFields[i], bl}); - } - } - } - }); - - client.sync_commit(std::chrono::milliseconds(1000)); - } catch(std::exception &e) { - exit(-1); - } - - if (exists < 0) { - dout(20) << "RGW Redis Cache: Object was not retrievable." << dendl; - return -2; - } + } else { + ldpp_dout(dpp, 20) << "RGW Redis Cache: Object was not retrievable." << dendl; + return -2; } return 0; @@ -367,34 +430,37 @@ int RedisDriver::get_attrs(const DoutPrefixProvider* dpp, const std::string& key int RedisDriver::set_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) { - /* Creating the index based on oid */ - std::string entryName = "rgw-object:" + key + ":cache"; - std::string result; + if (attrs.empty()) + return -1; + + std::string entry = partition_info.location + key; if (!client.is_connected()) - return ECONNREFUSED; + find_client(dpp); - /* Every set will be treated as new */ - try { - std::vector< std::pair > redisAttrs = build_attrs(&attrs); - - if (redisAttrs.empty()) { - return -1; - } - - client.hmset(entryName, redisAttrs, [&result](cpp_redis::reply &reply) { - if (!reply.is_null()) { - result = reply.as_string(); - } - }); + if (key_exists(dpp, key)) { + /* Every attr set will be treated as new */ + try { + std::string result; + auto redisAttrs = build_attrs(&attrs); + + client.hmset(entry, redisAttrs, [&result](cpp_redis::reply &reply) { + if (!reply.is_null()) { + result = reply.as_string(); + } + }); - client.sync_commit(std::chrono::milliseconds(1000)); + client.sync_commit(std::chrono::milliseconds(1000)); - if (result != "OK") { + if (result != "OK") { + return -1; + } + } catch(std::exception &e) { return -1; } - } catch(std::exception &e) { - return -1; + } else { + ldpp_dout(dpp, 20) << "RGW Redis Cache: Object was not retrievable." << dendl; + return -2; } return 0; @@ -402,20 +468,17 @@ int RedisDriver::set_attrs(const DoutPrefixProvider* dpp, const std::string& key int RedisDriver::update_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) { - std::string result; - std::string entryName = "rgw-object:" + key + ":cache"; + std::string entry = partition_info.location + key; if (!client.is_connected()) - return ECONNREFUSED; + find_client(dpp); if (key_exists(dpp, key)) { try { - std::vector< std::pair > redisAttrs; - for (const auto& it : attrs) { - redisAttrs.push_back({it.first, it.second.to_str()}); - } + std::string result; + auto redisAttrs = build_attrs(&attrs); - client.hmset(entryName, redisAttrs, [&result](cpp_redis::reply &reply) { + client.hmset(entry, redisAttrs, [&result](cpp_redis::reply &reply) { if (!reply.is_null()) { result = reply.as_string(); } @@ -427,9 +490,10 @@ int RedisDriver::update_attrs(const DoutPrefixProvider* dpp, const std::string& return -1; } } catch(std::exception &e) { - return -2; + return -1; } } else { + ldpp_dout(dpp, 20) << "RGW Redis Cache: Object was not retrievable." << dendl; return -2; } @@ -438,18 +502,16 @@ int RedisDriver::update_attrs(const DoutPrefixProvider* dpp, const std::string& int RedisDriver::delete_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& del_attrs) { - int result = 0; - std::string entryName = "rgw-object:" + key + ":cache"; + std::string entry = partition_info.location + key; if (!client.is_connected()) - return ECONNREFUSED; + find_client(dpp); if (key_exists(dpp, key)) { std::vector getFields; - /* Retrieve existing values from cache */ try { - client.hgetall(entryName, [&getFields](cpp_redis::reply &reply) { + client.hgetall(entry, [&getFields](cpp_redis::reply &reply) { if (reply.is_array()) { auto arr = reply.as_array(); @@ -466,11 +528,11 @@ int RedisDriver::delete_attrs(const DoutPrefixProvider* dpp, const std::string& return -1; } - std::vector< std::pair > redisAttrs = build_attrs(&del_attrs); + auto redisAttrs = build_attrs(&del_attrs); std::vector redisFields; std::transform(begin(redisAttrs), end(redisAttrs), std::back_inserter(redisFields), - [](auto const& pair) { return pair.first; }); + [](auto const& pair) { return pair.first; }); /* Only delete attributes that have been stored */ for (const auto& it : redisFields) { @@ -480,7 +542,9 @@ int RedisDriver::delete_attrs(const DoutPrefixProvider* dpp, const std::string& } try { - client.hdel(entryName, redisFields, [&result](cpp_redis::reply &reply) { + int result = 0; + + client.hdel(entry, redisFields, [&result](cpp_redis::reply &reply) { if (reply.is_integer()) { result = reply.as_integer(); } @@ -494,26 +558,25 @@ int RedisDriver::delete_attrs(const DoutPrefixProvider* dpp, const std::string& } } - dout(20) << "RGW Redis Cache: Object is not in cache." << dendl; + ldpp_dout(dpp, 20) << "RGW Redis Cache: Object is not in cache." << dendl; return -2; } std::string RedisDriver::get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name) { - int exists = -2; - std::string result; - std::string entryName = "rgw-object:" + key + ":cache"; + std::string entry = partition_info.location + key; std::string attrValue; if (!client.is_connected()) - return {}; + find_client(dpp); if (key_exists(dpp, key)) { + int exists = -2; std::string getValue; /* Ensure field was set */ try { - client.hexists(entryName, attr_name, [&exists](cpp_redis::reply& reply) { + client.hexists(entry, attr_name, [&exists](cpp_redis::reply& reply) { if (!reply.is_null()) { exists = reply.as_integer(); } @@ -525,15 +588,14 @@ std::string RedisDriver::get_attr(const DoutPrefixProvider* dpp, const std::stri } if (!exists) { - dout(20) << "RGW Redis Cache: Attribute was not set." << dendl; + ldpp_dout(dpp, 20) << "RGW Redis Cache: Attribute was not set." << dendl; return {}; } /* Retrieve existing value from cache */ try { - client.hget(entryName, attr_name, [&exists, &attrValue](cpp_redis::reply &reply) { + client.hget(entry, attr_name, [&exists, &attrValue](cpp_redis::reply &reply) { if (!reply.is_null()) { - exists = 0; attrValue = reply.as_string(); } }); @@ -542,11 +604,9 @@ std::string RedisDriver::get_attr(const DoutPrefixProvider* dpp, const std::stri } catch(std::exception &e) { return {}; } - - if (exists < 0) { - dout(20) << "RGW Redis Cache: Object was not retrievable." << dendl; - return {}; - } + } else { + ldpp_dout(dpp, 20) << "RGW Redis Cache: Object is not in cache." << dendl; + return {}; } return attrValue; @@ -554,27 +614,31 @@ std::string RedisDriver::get_attr(const DoutPrefixProvider* dpp, const std::stri int RedisDriver::set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attrVal) { - /* Creating the index based on key */ - std::string entryName = "rgw-object:" + key + ":cache"; - int result = -1; + std::string entry = partition_info.location + key; + int result = 0; if (!client.is_connected()) - return ECONNREFUSED; + find_client(dpp); - /* Every set will be treated as new */ - try { - client.hset(entryName, attr_name, attrVal, [&result](cpp_redis::reply& reply) { - if (!reply.is_null()) { - result = reply.as_integer(); - } - }); + if (key_exists(dpp, key)) { + /* Every attr set will be treated as new */ + try { + client.hset(entry, attr_name, attrVal, [&result](cpp_redis::reply& reply) { + if (!reply.is_null()) { + result = reply.as_integer(); + } + }); - client.sync_commit(std::chrono::milliseconds(1000)); - } catch(std::exception &e) { - return -1; + client.sync_commit(std::chrono::milliseconds(1000)); + } catch(std::exception &e) { + return -1; + } + } else { + ldpp_dout(dpp, 20) << "RGW Redis Cache: Object is not in cache." << dendl; + return -2; } - return result; + return result - 1; } std::unique_ptr RedisDriver::get_cache_aio_request_ptr(const DoutPrefixProvider* dpp) @@ -582,166 +646,90 @@ std::unique_ptr RedisDriver::get_cache_aio_request_ptr(const Do return std::make_unique(this); } -bool RedisDriver::key_exists(const DoutPrefixProvider* dpp, const std::string& key) +rgw::AioResultList RedisDriver::get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) { - int result = -1; - std::string entryName = "rgw-object:" + key + ":cache"; - std::vector keys; - keys.push_back(entryName); - - if (!client.is_connected()) - find_client(dpp); + rgw_raw_obj r_obj; + r_obj.oid = key; - try { - client.exists(keys, [&result](cpp_redis::reply &reply) { - if (reply.is_integer()) { - result = reply.as_integer(); - } - }); - - client.sync_commit(std::chrono::milliseconds(1000)); - } catch(std::exception &e) {} - - return result; + return aio->get(r_obj, rgw::Aio::cache_read_op(dpp, y, this, ofs, len, key), cost, id); } -std::vector RedisDriver::list_entries(const DoutPrefixProvider* dpp) +int RedisDriver::AsyncReadOp::init(const DoutPrefixProvider *dpp, CephContext* cct, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg) { - std::vector result; - - for (auto it = entries.begin(); it != entries.end(); ++it) { - result.push_back(it->second); + ldpp_dout(dpp, 20) << "RedisCache: " << __func__ << "(): file_path=" << file_path << dendl; + aio_cb.reset(new struct aiocb); + memset(aio_cb.get(), 0, sizeof(struct aiocb)); + aio_cb->aio_fildes = TEMP_FAILURE_RETRY(::open(file_path.c_str(), O_RDONLY|O_CLOEXEC|O_BINARY)); + + if (aio_cb->aio_fildes < 0) { + int err = errno; + ldpp_dout(dpp, 1) << "ERROR: RedisCache: " << __func__ << "(): can't open " << file_path << " : " << " error: " << err << dendl; + return -err; } - return result; -} - -size_t RedisDriver::get_num_entries(const DoutPrefixProvider* dpp) -{ - return entries.size(); -} - -Partition RedisDriver::get_current_partition_info(const DoutPrefixProvider* dpp) -{ - Partition part; - return part; // Implement -Sam -} - -uint64_t RedisDriver::get_free_space(const DoutPrefixProvider* dpp) -{ - int result = -1; - - if (!client.is_connected()) - find_client(dpp); - - try { - client.info([&result](cpp_redis::reply &reply) { - if (!reply.is_null()) { - int usedMem = -1; - int maxMem = -1; - - std::istringstream iss(reply.as_string()); - std::string line; - while (std::getline(iss, line)) { - size_t pos = line.find_first_of(":"); - if (pos != std::string::npos) { - if (line.substr(0, pos) == "used_memory") { - usedMem = std::stoi(line.substr(pos + 1, line.length() - pos - 2)); - } else if (line.substr(0, line.find_first_of(":")) == "maxmemory") { - maxMem = std::stoi(line.substr(pos + 1, line.length() - pos - 2)); - } - } - } - - if (usedMem > -1 && maxMem > -1) - result = maxMem - usedMem; - } - }); - - client.sync_commit(std::chrono::milliseconds(1000)); - } catch(std::exception &e) { - return -1; + if (cct->_conf->rgw_d3n_l1_fadvise != POSIX_FADV_NORMAL) { + posix_fadvise(aio_cb->aio_fildes, 0, 0, g_conf()->rgw_d3n_l1_fadvise); } - return result; -} - -int RedisDriver::AsyncReadOp::init(const DoutPrefixProvider *dpp, CephContext* cct, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg) -{ - ldpp_dout(dpp, 20) << "RedisCache: " << __func__ << "(): file_path=" << file_path << dendl; - aio_cb.reset(new struct aiocb); - memset(aio_cb.get(), 0, sizeof(struct aiocb)); - aio_cb->aio_fildes = TEMP_FAILURE_RETRY(::open(file_path.c_str(), O_RDONLY|O_CLOEXEC|O_BINARY)); - - if (aio_cb->aio_fildes < 0) { - int err = errno; - ldpp_dout(dpp, 1) << "ERROR: RedisCache: " << __func__ << "(): can't open " << file_path << " : " << " error: " << err << dendl; - return -err; - } - - if (cct->_conf->rgw_d3n_l1_fadvise != POSIX_FADV_NORMAL) { - posix_fadvise(aio_cb->aio_fildes, 0, 0, g_conf()->rgw_d3n_l1_fadvise); - } - - bufferptr bp(read_len); - aio_cb->aio_buf = bp.c_str(); - result.append(std::move(bp)); + bufferptr bp(read_len); + aio_cb->aio_buf = bp.c_str(); + result.append(std::move(bp)); - aio_cb->aio_nbytes = read_len; - aio_cb->aio_offset = read_ofs; - aio_cb->aio_sigevent.sigev_notify = SIGEV_THREAD; - aio_cb->aio_sigevent.sigev_notify_function = libaio_cb_aio_dispatch; - aio_cb->aio_sigevent.sigev_notify_attributes = nullptr; - aio_cb->aio_sigevent.sigev_value.sival_ptr = arg; + aio_cb->aio_nbytes = read_len; + aio_cb->aio_offset = read_ofs; + aio_cb->aio_sigevent.sigev_notify = SIGEV_THREAD; + aio_cb->aio_sigevent.sigev_notify_function = libaio_cb_aio_dispatch; + aio_cb->aio_sigevent.sigev_notify_attributes = nullptr; + aio_cb->aio_sigevent.sigev_value.sival_ptr = arg; - return 0; + return 0; } void RedisDriver::AsyncReadOp::libaio_cb_aio_dispatch(sigval sigval) { - auto p = std::unique_ptr{static_cast(sigval.sival_ptr)}; - auto op = std::move(p->user_data); - const int ret = -aio_error(op.aio_cb.get()); - boost::system::error_code ec; - if (ret < 0) { - ec.assign(-ret, boost::system::system_category()); - } + auto p = std::unique_ptr{static_cast(sigval.sival_ptr)}; + auto op = std::move(p->user_data); + const int ret = -aio_error(op.aio_cb.get()); + boost::system::error_code ec; + if (ret < 0) { + ec.assign(-ret, boost::system::system_category()); + } - ceph::async::dispatch(std::move(p), ec, std::move(op.result)); + ceph::async::dispatch(std::move(p), ec, std::move(op.result)); } template auto RedisDriver::AsyncReadOp::create(const Executor1& ex1, CompletionHandler&& handler) { - auto p = Completion::create(ex1, std::move(handler)); - return p; + auto p = Completion::create(ex1, std::move(handler)); + return p; } template auto RedisDriver::get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key, off_t read_ofs, off_t read_len, CompletionToken&& token) { - std::string location = "";//partition_info.location + key; - ldpp_dout(dpp, 20) << "RedisCache: " << __func__ << "(): location=" << location << dendl; - - using Op = AsyncReadOp; - using Signature = typename Op::Signature; - boost::asio::async_completion init(token); - auto p = Op::create(ctx.get_executor(), init.completion_handler); - auto& op = p->user_data; - - int ret = op.init(dpp, cct, location, read_ofs, read_len, p.get()); - if(0 == ret) { - ret = ::aio_read(op.aio_cb.get()); - } - // ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): ::aio_read(), ret=" << ret << dendl; - /* if(ret < 0) { - auto ec = boost::system::error_code{-ret, boost::system::system_category()}; - ceph::async::post(std::move(p), ec, bufferlist{}); - } else { - (void)p.release(); - }*/ - //return init.result.get(); + std::string location = partition_info.location + key; + ldpp_dout(dpp, 20) << "RedisCache: " << __func__ << "(): location=" << location << dendl; + + using Op = AsyncReadOp; + using Signature = typename Op::Signature; + boost::asio::async_completion init(token); + auto p = Op::create(ctx.get_executor(), init.completion_handler); + auto& op = p->user_data; + + int ret = op.init(dpp, cct, location, read_ofs, read_len, p.get()); + if (0 == ret) { + ret = ::aio_read(op.aio_cb.get()); + } +// ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): ::aio_read(), ret=" << ret << dendl; + /* if(ret < 0) { + auto ec = boost::system::error_code{-ret, boost::system::system_category()}; + ceph::async::post(std::move(p), ec, bufferlist{}); + } else { + (void)p.release(); + }*/ + //return init.result.get(); } void RedisCacheAioRequest::cache_aio_read(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, off_t ofs, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) diff --git a/src/rgw/rgw_redis_driver.h b/src/rgw/rgw_redis_driver.h index c392b5487b83f..b3835c9362ef0 100644 --- a/src/rgw/rgw_redis_driver.h +++ b/src/rgw/rgw_redis_driver.h @@ -4,11 +4,10 @@ //#include #include #include "common/async/completion.h" -#include -#include -#include #include "rgw_common.h" #include "rgw_cache_driver.h" + +#include #include "driver/d4n/d4n_directory.h" namespace rgw { namespace cache { @@ -27,12 +26,32 @@ class RedisCacheAioRequest: public CacheAioRequest { class RedisDriver : public CacheDriver { public: - RedisDriver() : CacheDriver() {} + RedisDriver(Partition& _partition_info) : partition_info(_partition_info), + free_space(_partition_info.size), + outstanding_write_size(0) + { + add_partition_info(_partition_info); + } + virtual ~RedisDriver() + { + remove_partition_info(partition_info); + } + + /* Entry */ + virtual bool key_exists(const DoutPrefixProvider* dpp, const std::string& key) override; + virtual std::vector list_entries(const DoutPrefixProvider* dpp) override; + virtual size_t get_num_entries(const DoutPrefixProvider* dpp) override; + //int update_local_weight(const DoutPrefixProvider* dpp, std::string key, int localWeight); // may need to exist for base class -Sam + + /* Partition */ + virtual Partition get_current_partition_info(const DoutPrefixProvider* dpp) override { return partition_info; } + virtual uint64_t get_free_space(const DoutPrefixProvider* dpp) override { return free_space; } // how to get this from redis server? -Sam + static std::optional get_partition_info(const DoutPrefixProvider* dpp, const std::string& name, const std::string& type); + static std::vector list_partitions(const DoutPrefixProvider* dpp); virtual int initialize(CephContext* cct, const DoutPrefixProvider* dpp) override; virtual int put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) override; virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs) override; - virtual rgw::AioResultList get_async (const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override; virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data) override; virtual int delete_data(const DoutPrefixProvider* dpp, const::std::string& key) override; virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) override; @@ -42,18 +61,9 @@ class RedisDriver : public CacheDriver { virtual std::string get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name) override; virtual int set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attr_val) override; - /* Entry */ - virtual bool key_exists(const DoutPrefixProvider* dpp, const std::string& key) override; - virtual std::vector list_entries(const DoutPrefixProvider* dpp) override; - virtual size_t get_num_entries(const DoutPrefixProvider* dpp) override; - int update_local_weight(const DoutPrefixProvider* dpp, std::string key, int localWeight); // may need to exist for base class -Sam - - /* Partition */ - virtual Partition get_current_partition_info(const DoutPrefixProvider* dpp) override; - virtual uint64_t get_free_space(const DoutPrefixProvider* dpp) override; - virtual std::unique_ptr get_cache_aio_request_ptr(const DoutPrefixProvider* dpp) override; - + virtual rgw::AioResultList get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override; + struct libaio_handler { // should this be the same as SSDDriver? -Sam rgw::Aio* throttle = nullptr; rgw::AioResult& r; @@ -69,17 +79,24 @@ class RedisDriver : public CacheDriver { auto get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key, off_t read_ofs, off_t read_len, CompletionToken&& token); - private: + protected: cpp_redis::client client; rgw::d4n::Address addr; + static std::unordered_map partitions; std::unordered_map entries; + Partition partition_info; + uint64_t free_space; + uint64_t outstanding_write_size; CephContext* cct; int find_client(const DoutPrefixProvider* dpp); int insert_entry(const DoutPrefixProvider* dpp, std::string key, off_t offset, uint64_t len); - int remove_entry(const DoutPrefixProvider* dpp, std::string key); std::optional get_entry(const DoutPrefixProvider* dpp, std::string key); + int remove_entry(const DoutPrefixProvider* dpp, std::string key); + int add_partition_info(Partition& info); + int remove_partition_info(Partition& info); + private: // unique_ptr with custom deleter for struct aiocb struct libaio_aiocb_deleter { void operator()(struct aiocb* c) {