Clean up and remove cpp_redis.
d4n/filter: Add `optional_yield` to CacheDriver calls in D4N Filter and
make minor updates to several filter methods
rgw/d4n: Fix compilation issue.
rgw/cache: Add `del` method to CacheDriver and SSDDriver
cmake/d4n: Remove unnecessary D4N lines
rgw: Add `io_context` to D4N Filter and RedisDriver, remove `cpp_redis`
library from RedisDriver, and perform minor cleanup
d4n: Remove `cpp_redis` library from D4N directory and policy; update calls in filter; move Entry struct to base CachePolicy class
build/cpp_redis: Remove `cpp_redis` library
rgw/d4n: Include <boost/asio/detached.hpp> wherever needed.
rgw/d4n: Fix compilation errors in the d4n filter, policy, directory, and
redis driver files.
Signed-off-by: Samarah <samarah.uriarte@ibm.com>
Signed-off-by: Casey Bodley <cbodley@redhat.com>
Co-authored-by: Pritha Srivastava <prsrivas@redhat.com>
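
The pattern used throughout the series: cpp_redis callbacks pumped by `sync_commit()` give way to `boost::redis` request/response pairs driven through a small `redis_exec()` helper, with `optional_yield` selecting between suspending a coroutine and blocking the calling thread. A minimal caller-side sketch (assumes the `redis_exec()` helper added below is in scope and `conn` was already started with `async_run()`; the field name is a placeholder):

```cpp
std::string get_field(std::shared_ptr<boost::redis::connection> conn,
                      const std::string& key, optional_yield y)
{
  boost::redis::request req;
  req.push("HGET", key, "someField");

  boost::redis::response<std::string> resp;
  boost::system::error_code ec;
  redis_exec(conn, ec, req, resp, y); // suspends if y holds a yield context, blocks otherwise

  return ec ? std::string{} : std::get<0>(resp).value();
}
```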
[submodule "s3select"]
path = src/s3select
url = https://github.com/ceph/s3select.git
-[submodule "src/cpp_redis"]
- path = src/cpp_redis
- url = https://github.com/ceph/cpp_redis.git
[submodule "src/libkmip"]
path = src/libkmip
url = https://github.com/ceph/libkmip
add_subdirectory(java)
endif()
-if(WITH_RADOSGW_D4N)
- add_subdirectory(cpp_redis) # remove later -Sam
-endif()
-
if (WITH_BLKIN)
add_subdirectory(blkin/blkin-lib)
endif(WITH_BLKIN)
+++ /dev/null
-Subproject commit 72d992fff2a95edb37430a75909a844637549331
driver/rados/topic.cc)
list(APPEND librgw_common_srcs
- driver/d4n/d4n_directory.cc
driver/immutable_config/store.cc
driver/json_config/store.cc
driver/rados/config/impl.cc
if(WITH_RADOSGW_DAOS)
list(APPEND librgw_common_srcs driver/motr/rgw_sal_daos.cc)
endif()
-if(WITH_RADOSGW_D4N)
- list(APPEND librgw_common_srcs driver/d4n/d4n_directory.cc)
- list(APPEND librgw_common_srcs driver/d4n/rgw_sal_d4n.cc)
-endif()
if(WITH_RADOSGW_POSIX)
#add_subdirectory(driver/posix)
find_package(LMDB REQUIRED)
PUBLIC "${LUA_INCLUDE_DIR}")
if(WITH_RADOSGW_D4N)
- add_dependencies(rgw_common cpp_redis) # remove later -Sam
- target_link_libraries(rgw_common PRIVATE cpp_redis)
- target_include_directories(rgw_common SYSTEM PUBLIC "${CMAKE_SOURCE_DIR}/src/cpp_redis/includes")
- target_include_directories(rgw_common SYSTEM PUBLIC "${CMAKE_SOURCE_DIR}/src/cpp_redis/tacopie/includes")
-
target_include_directories(rgw_common SYSTEM PUBLIC "${CMAKE_SOURCE_DIR}/src/boost_redis/include")
endif()
+#include <boost/asio/consign.hpp>
+#include "common/async/blocked_completion.h"
#include "d4n_directory.h"
-#include <time.h>
-
-#define dout_subsys ceph_subsys_rgw
-#define dout_context g_ceph_context
namespace rgw { namespace d4n {
-int ObjectDirectory::find_client(cpp_redis::client* client) {
- if (client->is_connected())
- return 0;
-
- if (addr.host == "" || addr.port == 0) {
- dout(10) << "RGW D4N Directory: D4N directory endpoint was not configured correctly" << dendl;
- return EDESTADDRREQ;
- }
+// initiate a call to async_exec() on the connection's executor
+struct initiate_exec {
+ std::shared_ptr<boost::redis::connection> conn;
+ boost::redis::request req;
- client->connect(addr.host, addr.port, nullptr);
+ using executor_type = boost::redis::connection::executor_type;
+ executor_type get_executor() const noexcept { return conn->get_executor(); }
- if (!client->is_connected())
- return ECONNREFUSED;
+ template <typename Handler, typename Response>
+ void operator()(Handler handler, Response& resp)
+ {
+ conn->async_exec(req, resp, boost::asio::consign(std::move(handler), conn));
+ }
+};
+
+template <typename Response, typename CompletionToken>
+auto async_exec(std::shared_ptr<connection> conn,
+ const boost::redis::request& req,
+ Response& resp, CompletionToken&& token)
+{
+ return boost::asio::async_initiate<CompletionToken,
+ void(boost::system::error_code, std::size_t)>(
+ initiate_exec{std::move(conn), req}, token, resp);
+}
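
`consign()` here is a lifetime guard: it attaches a copy of the connection's `shared_ptr` to the completion handler so the connection cannot be destroyed while `async_exec()` is in flight. Conceptually it behaves like the capture below, except that `consign` also preserves the handler's associated executor and allocator (sketch only):

```cpp
// Roughly what boost::asio::consign(std::move(handler), conn) produces:
auto guarded = [h = std::move(handler), conn]
    (boost::system::error_code ec, std::size_t bytes) mutable {
  std::move(h)(ec, bytes); // `conn` is released only after the handler runs
};
```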
- return 0;
+template <typename T>
+void redis_exec(std::shared_ptr<connection> conn,
+ boost::system::error_code& ec,
+ const boost::redis::request& req,
+ boost::redis::response<T>& resp, optional_yield y)
+{
+ if (y) {
+ auto yield = y.get_yield_context();
+ async_exec(std::move(conn), req, resp, yield[ec]);
+ } else {
+ async_exec(std::move(conn), req, resp, ceph::async::use_blocked[ec]);
+ }
}
std::string ObjectDirectory::build_index(CacheObj* object) {
return object->bucketName + "_" + object->objName;
}
-int ObjectDirectory::exist_key(std::string key) {
+int ObjectDirectory::exist_key(std::string key, optional_yield y) {
int result = 0;
std::vector<std::string> keys;
keys.push_back(key);
-
+#if 0
if (!client.is_connected()) {
return result;
}
-
+#endif
+ response<int> resp;
try {
- client.exists(keys, [&result](cpp_redis::reply &reply) {
- if (reply.is_integer()) {
- result = reply.as_integer();
- }
- });
-
- client.sync_commit(std::chrono::milliseconds(1000));
+ boost::system::error_code ec;
+ request req;
+ req.push("EXISTS", key);
+
+ redis_exec(conn, ec, req, resp, y);
+
+ if ((bool)ec)
+ return false;
} catch(std::exception &e) {}
- return result;
+ return std::get<0>(resp).value();
}
-int ObjectDirectory::set_value(CacheObj* object) {
- /* Creating the index based on objName */
- std::string result;
- std::string key = build_index(object);
- if (!client.is_connected()) {
- find_client(&client);
- }
+void ObjectDirectory::shutdown() // generalize -Sam
+{
+ // call cancel() on the connection's executor
+ boost::asio::dispatch(conn->get_executor(), [c = conn] { c->cancel(); });
+}
- /* Every set will be new */
- if (addr.host == "" || addr.port == 0) {
- dout(10) << "RGW D4N Directory: Directory endpoint not configured correctly" << dendl;
- return -2;
- }
+int ObjectDirectory::set(CacheObj* object, optional_yield y) {
+ std::string key = build_index(object);
- std::string endpoint = addr.host + ":" + std::to_string(addr.port);
- std::vector< std::pair<std::string, std::string> > list;
+ /* Every set will be treated as new */ // or maybe, if key exists, simply return? -Sam
+ std::string endpoint = cct->_conf->rgw_d4n_host + ":" + std::to_string(cct->_conf->rgw_d4n_port);
+ std::list<std::string> redisValues;
- /* Creating a list of the entry's properties */
- list.push_back(make_pair("key", key));
- list.push_back(make_pair("objName", object->objName));
- list.push_back(make_pair("bucketName", object->bucketName));
- list.push_back(make_pair("creationTime", std::to_string(object->creationTime)));
- list.push_back(make_pair("dirty", std::to_string(object->dirty)));
- list.push_back(make_pair("hosts", endpoint));
+ /* Creating a redisValues of the entry's properties */
+ redisValues.push_back("objName");
+ redisValues.push_back(object->objName);
+ redisValues.push_back("bucketName");
+ redisValues.push_back(object->bucketName);
+ redisValues.push_back("creationTime");
+ redisValues.push_back(std::to_string(object->creationTime));
+ redisValues.push_back("dirty");
+ redisValues.push_back(std::to_string(object->dirty));
+ redisValues.push_back("objHosts");
+ redisValues.push_back(endpoint); // Set in filter -Sam
try {
- client.hmset(key, list, [&result](cpp_redis::reply &reply) {
- if (!reply.is_null()) {
- result = reply.as_string();
- }
- });
+ boost::system::error_code ec;
+ request req;
+ req.push_range("HMSET", key, redisValues);
+ response<std::string> resp;
- client.sync_commit(std::chrono::milliseconds(1000));
+ redis_exec(conn, ec, req, resp, y);
- if (result != "OK") {
+ if (std::get<0>(resp).value() != "OK" || (bool)ec) {
return -1;
}
} catch(std::exception &e) {
return 0;
}
-int ObjectDirectory::get_value(CacheObj* object) {
- int keyExist = -2;
+int ObjectDirectory::get(CacheObj* object, optional_yield y) {
std::string key = build_index(object);
-
+#if 0
if (!client.is_connected()) {
find_client(&client);
}
-
- if (exist_key(key)) {
+#endif
+ if (exist_key(key, y)) {
std::string key;
std::string objName;
std::string bucketName;
std::string hosts;
std::vector<std::string> fields;
- fields.push_back("key");
fields.push_back("objName");
fields.push_back("bucketName");
fields.push_back("creationTime");
fields.push_back("dirty");
- fields.push_back("hosts");
+ fields.push_back("objHosts");
try {
- client.hmget(key, fields, [&key, &objName, &bucketName, &creationTime, &dirty, &hosts, &keyExist](cpp_redis::reply &reply) {
- if (reply.is_array()) {
- auto arr = reply.as_array();
-
- if (!arr[0].is_null()) {
- keyExist = 0;
- key = arr[0].as_string();
- objName = arr[1].as_string();
- bucketName = arr[2].as_string();
- creationTime = arr[3].as_string();
- dirty = arr[4].as_string();
- hosts = arr[5].as_string();
- }
- }
- });
+ boost::system::error_code ec;
+ request req;
+ req.push_range("HMGET", key, fields);
+ response< std::vector<std::string> > resp;
- client.sync_commit(std::chrono::milliseconds(1000));
+ redis_exec(conn, ec, req, resp, y);
- if (keyExist < 0) {
- return keyExist;
+ if (!std::get<0>(resp).value().size() || (bool)ec) {
+ return -1;
}
- /* Currently, there can only be one host */
- object->objName = objName;
- object->bucketName = bucketName;
+ object->objName = std::get<0>(resp).value()[0];
+ object->bucketName = std::get<0>(resp).value()[1];
+ object->creationTime = boost::lexical_cast<time_t>(std::get<0>(resp).value()[2]);
+ object->dirty = boost::lexical_cast<bool>(std::get<0>(resp).value()[3]);
- struct std::tm tm;
- std::istringstream(creationTime) >> std::get_time(&tm, "%T");
- strptime(creationTime.c_str(), "%T", &tm); // Need to check formatting -Sam
- object->creationTime = mktime(&tm);
+ {
+ std::stringstream ss(boost::lexical_cast<std::string>(std::get<0>(resp).value()[4]));
- std::istringstream(dirty) >> object->dirty;
+ while (!ss.eof()) {
+ std::string host;
+ std::getline(ss, host, '_');
+ object->hostsList.push_back(host);
+ }
+ }
} catch(std::exception &e) {
- keyExist = -1;
+ return -1;
}
+ } else {
+ return -2;
}
- return keyExist;
+ return 0;
}
-int ObjectDirectory::del_value(CacheObj* object) {
- int result = 0;
- std::vector<std::string> keys;
+int ObjectDirectory::copy(CacheObj* object, std::string copyName, std::string copyBucketName, optional_yield y) {
std::string key = build_index(object);
+  auto copyObj = CacheObj{ .objName = copyName, .bucketName = copyBucketName };
+  std::string copyKey = build_index(&copyObj);
+#if 0
if (!client.is_connected()) {
find_client(&client);
}
-
- if (exist_key(key)) {
+#endif
+ if (exist_key(key, y)) {
try {
- client.del(keys, [&result](cpp_redis::reply &reply) {
- if (reply.is_integer()) {
- result = reply.as_integer();
+ response<int> resp;
+
+ {
+ boost::system::error_code ec;
+ request req;
+ req.push("COPY", key, copyKey);
+
+ redis_exec(conn, ec, req, resp, y);
+
+ if ((bool)ec) {
+ return -1;
+ }
+ }
+
+ {
+ boost::system::error_code ec;
+ request req;
+ req.push("HMSET", copyKey, "objName", copyName, "bucketName", copyBucketName);
+ response<std::string> res;
+
+ redis_exec(conn, ec, req, res, y);
+
+ if (std::get<0>(res).value() != "OK" || (bool)ec) {
+ return -1;
}
- });
-
- client.sync_commit(std::chrono::milliseconds(1000));
- return result - 1;
+ }
+
+ return std::get<0>(resp).value() - 1;
} catch(std::exception &e) {
return -1;
}
+  } else {
+    return -2; /* Key does not exist */
+  }
}
-int BlockDirectory::find_client(cpp_redis::client* client) {
- if (client->is_connected())
- return 0;
+int ObjectDirectory::del(CacheObj* object, optional_yield y) {
+ std::string key = build_index(object);
- if (addr.host == "" || addr.port == 0) {
- dout(10) << "RGW D4N Directory: D4N directory endpoint was not configured correctly" << dendl;
- return EDESTADDRREQ;
- }
+ if (exist_key(key, y)) {
+ try {
+ boost::system::error_code ec;
+ request req;
+ req.push("DEL", key);
+ response<int> resp;
- client->connect(addr.host, addr.port, nullptr);
+ redis_exec(conn, ec, req, resp, y);
- if (!client->is_connected())
- return ECONNREFUSED;
+ if ((bool)ec)
+ return -1;
- return 0;
+ return std::get<0>(resp).value() - 1;
+ } catch(std::exception &e) {
+ return -1;
+ }
+ } else {
+ return 0; /* No delete was necessary */
+ }
}
std::string BlockDirectory::build_index(CacheBlock* block) {
- return block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + boost::lexical_cast<std::string>(block->blockId);
+ return block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + std::to_string(block->blockID);
}
-int BlockDirectory::exist_key(std::string key) {
- int result = 0;
- std::vector<std::string> keys;
- keys.push_back(key);
-
- if (!client.is_connected()) {
- return result;
- }
+int BlockDirectory::exist_key(std::string key, optional_yield y) {
+ response<int> resp;
try {
- client.exists(keys, [&result](cpp_redis::reply &reply) {
- if (reply.is_integer()) {
- result = reply.as_integer();
- }
- });
-
- client.sync_commit(std::chrono::milliseconds(1000));
+ boost::system::error_code ec;
+ request req;
+ req.push("EXISTS", key);
+
+ redis_exec(conn, ec, req, resp, y);
+
+ if ((bool)ec)
+ return false;
} catch(std::exception &e) {}
- return result;
+ return std::get<0>(resp).value();
}
-int BlockDirectory::set_value(CacheBlock* block) {
- /* Creating the index based on objName */
- std::string result;
- std::string key = build_index(block);
- if (!client.is_connected()) {
- find_client(&client);
- }
+void BlockDirectory::shutdown()
+{
+ // call cancel() on the connection's executor
+ boost::asio::dispatch(conn->get_executor(), [c = conn] { c->cancel(); });
+}
- /* Every set will be new */
- if (addr.host == "" || addr.port == 0) {
- dout(10) << "RGW D4N Directory: Directory endpoint not configured correctly" << dendl;
- return -2;
- }
+int BlockDirectory::set(CacheBlock* block, optional_yield y) {
+ std::string key = build_index(block);
- std::string endpoint = addr.host + ":" + std::to_string(addr.port);
- std::vector< std::pair<std::string, std::string> > list;
+ /* Every set will be treated as new */ // or maybe, if key exists, simply return? -Sam
+ std::string endpoint = cct->_conf->rgw_d4n_host + ":" + std::to_string(cct->_conf->rgw_d4n_port);
+ std::list<std::string> redisValues;
- /* Creating a list of the entry's properties */
- list.push_back(make_pair("key", key));
- list.push_back(make_pair("size", std::to_string(block->size)));
- list.push_back(make_pair("globalWeight", std::to_string(block->globalWeight)));
- list.push_back(make_pair("bucketName", block->cacheObj.bucketName));
- list.push_back(make_pair("objName", block->cacheObj.objName));
- list.push_back(make_pair("hosts", endpoint));
+ /* Creating a redisValues of the entry's properties */
+ redisValues.push_back("blockID");
+ redisValues.push_back(std::to_string(block->blockID));
+ redisValues.push_back("version");
+ redisValues.push_back(block->version);
+ redisValues.push_back("size");
+ redisValues.push_back(std::to_string(block->size));
+ redisValues.push_back("globalWeight");
+ redisValues.push_back(std::to_string(block->globalWeight));
+ redisValues.push_back("blockHosts");
+ redisValues.push_back(endpoint); // Set in filter -Sam
+
+ redisValues.push_back("objName");
+ redisValues.push_back(block->cacheObj.objName);
+ redisValues.push_back("bucketName");
+ redisValues.push_back(block->cacheObj.bucketName);
+ redisValues.push_back("creationTime");
+ redisValues.push_back(std::to_string(block->cacheObj.creationTime));
+ redisValues.push_back("dirty");
+ redisValues.push_back(std::to_string(block->cacheObj.dirty));
+ redisValues.push_back("objHosts");
+ redisValues.push_back(endpoint); // Set in filter -Sam
try {
- client.hmset(key, list, [&result](cpp_redis::reply &reply) {
- if (!reply.is_null()) {
- result = reply.as_string();
- }
- });
+ boost::system::error_code ec;
+ request req;
+ req.push_range("HMSET", key, redisValues);
+ response<std::string> resp;
- client.sync_commit(std::chrono::milliseconds(1000));
+ redis_exec(conn, ec, req, resp, y);
- if (result != "OK") {
+ if (std::get<0>(resp).value() != "OK" || (bool)ec) {
return -1;
}
} catch(std::exception &e) {
return 0;
}
-int BlockDirectory::get_value(CacheBlock* block) {
- int keyExist = -2;
+int BlockDirectory::get(CacheBlock* block, optional_yield y) {
std::string key = build_index(block);
- if (!client.is_connected()) {
- find_client(&client);
- }
-
- if (exist_key(key)) {
- std::string hosts;
- std::string size;
- std::string bucketName;
- std::string objName;
+ if (exist_key(key, y)) {
std::vector<std::string> fields;
- fields.push_back("key");
- fields.push_back("hosts");
+ fields.push_back("blockID");
+ fields.push_back("version");
fields.push_back("size");
- fields.push_back("bucketName");
+ fields.push_back("globalWeight");
+ fields.push_back("blockHosts");
+
fields.push_back("objName");
+ fields.push_back("bucketName");
+ fields.push_back("creationTime");
+ fields.push_back("dirty");
+ fields.push_back("objHosts");
try {
- client.hmget(key, fields, [&key, &hosts, &size, &bucketName, &objName, &keyExist](cpp_redis::reply &reply) {
- if (reply.is_array()) {
- auto arr = reply.as_array();
-
- if (!arr[0].is_null()) {
- keyExist = 0;
- key = arr[0].as_string();
- hosts = arr[1].as_string();
- size = arr[2].as_string();
- bucketName = arr[3].as_string();
- objName = arr[4].as_string();
- }
- }
- });
+ boost::system::error_code ec;
+ request req;
+ req.push_range("HMGET", key, fields);
+ response< std::vector<std::string> > resp;
- client.sync_commit(std::chrono::milliseconds(1000));
+ redis_exec(conn, ec, req, resp, y);
- if (keyExist < 0 ) {
- return keyExist;
+ if (!std::get<0>(resp).value().size() || (bool)ec) {
+ return -1;
}
- /* Currently, there can only be one host */ // update -Sam
- block->size = std::stoi(size);
- block->cacheObj.bucketName = bucketName;
- block->cacheObj.objName = objName;
+ block->blockID = boost::lexical_cast<uint64_t>(std::get<0>(resp).value()[0]);
+ block->version = std::get<0>(resp).value()[1];
+ block->size = boost::lexical_cast<uint64_t>(std::get<0>(resp).value()[2]);
+ block->globalWeight = boost::lexical_cast<int>(std::get<0>(resp).value()[3]);
+
+ {
+ std::stringstream ss(boost::lexical_cast<std::string>(std::get<0>(resp).value()[4]));
+ block->hostsList.clear();
+
+ while (!ss.eof()) {
+ std::string host;
+ std::getline(ss, host, '_');
+ block->hostsList.push_back(host);
+ }
+ }
+
+ block->cacheObj.objName = std::get<0>(resp).value()[5];
+ block->cacheObj.bucketName = std::get<0>(resp).value()[6];
+ block->cacheObj.creationTime = boost::lexical_cast<time_t>(std::get<0>(resp).value()[7]);
+ block->cacheObj.dirty = boost::lexical_cast<bool>(std::get<0>(resp).value()[8]);
+
+ {
+ std::stringstream ss(boost::lexical_cast<std::string>(std::get<0>(resp).value()[9]));
+ block->cacheObj.hostsList.clear();
+
+ while (!ss.eof()) {
+ std::string host;
+ std::getline(ss, host, '_');
+ block->cacheObj.hostsList.push_back(host);
+ }
+ }
} catch(std::exception &e) {
- keyExist = -1;
+ return -1;
}
+ } else {
+ return -2;
}
- return keyExist;
+ return 0;
}
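
Note the encoding convention: `blockHosts`/`objHosts` are stored in Redis as a single `_`-delimited string, which `update_field()` appends to and `get()` splits apart. A standalone sketch of the decode step (equivalent to the getline loops above):

```cpp
#include <sstream>
#include <string>
#include <vector>

// "127.0.0.1:6379_10.0.0.2:6379" -> {"127.0.0.1:6379", "10.0.0.2:6379"}
std::vector<std::string> split_hosts(const std::string& encoded)
{
  std::vector<std::string> hosts;
  std::stringstream ss(encoded);
  std::string host;
  while (std::getline(ss, host, '_'))
    hosts.push_back(host);
  return hosts;
}
```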
-int BlockDirectory::del_value(CacheBlock* block) {
- int result = 0;
- std::vector<std::string> keys;
+int BlockDirectory::copy(CacheBlock* block, std::string copyName, std::string copyBucketName, optional_yield y) {
std::string key = build_index(block);
- keys.push_back(key);
-
- if (!client.is_connected()) {
- find_client(&client);
- }
-
- if (exist_key(key)) {
+ auto copyBlock = CacheBlock{ .cacheObj = { .objName = copyName, .bucketName = copyBucketName }, .blockID = 0 };
+  std::string copyKey = build_index(&copyBlock);
+
+ if (exist_key(key, y)) {
try {
- client.del(keys, [&result](cpp_redis::reply &reply) {
- if (reply.is_integer()) {
- result = reply.as_integer();
+ response<int> resp;
+
+ {
+ boost::system::error_code ec;
+ request req;
+ req.push("COPY", key, copyKey);
+
+ redis_exec(conn, ec, req, resp, y);
+
+ if ((bool)ec) {
+ return -1;
+ }
+ }
+
+ {
+ boost::system::error_code ec;
+ request req;
+ req.push("HMSET", copyKey, "objName", copyName, "bucketName", copyBucketName);
+ response<std::string> res;
+
+ redis_exec(conn, ec, req, res, y);
+
+ if (std::get<0>(res).value() != "OK" || (bool)ec) {
+ return -1;
}
- });
-
- client.sync_commit(std::chrono::milliseconds(1000));
- return result - 1;
+ }
+
+ return std::get<0>(resp).value() - 1;
} catch(std::exception &e) {
return -1;
}
+  } else {
+    return -2; /* Key does not exist */
+  }
}
-int BlockDirectory::update_field(CacheBlock* block, std::string field, std::string value) { // represent in cache block too -Sam
- std::string result;
+int BlockDirectory::del(CacheBlock* block, optional_yield y) {
std::string key = build_index(block);
- if (!client.is_connected()) {
- find_client(&client);
- }
-
- if (exist_key(key)) {
- if (field == "hostsList") {
- /* Append rather than overwrite */
- std::string hosts;
-
- try {
- client.hget(key, "hostsList", [&hosts](cpp_redis::reply& reply) {
- if (!reply.is_null()) {
- hosts = reply.as_string();
- }
- });
-
- client.sync_commit(std::chrono::milliseconds(1000));
- } catch(std::exception &e) {
+ if (exist_key(key, y)) {
+ try {
+ boost::system::error_code ec;
+ request req;
+ req.push("DEL", key);
+ response<int> resp;
+
+ redis_exec(conn, ec, req, resp, y);
+
+ if ((bool)ec)
return -1;
- }
-
- value += "_";
- value += hosts;
- }
- /* Update cache block */ // Remove ones that aren't used -Sam
- if (field == "size")
- block->size = std::stoi(value);
- else if (field == "bucketName")
- block->cacheObj.bucketName = value;
- else if (field == "objName")
- block->cacheObj.objName = value;
- else if (field == "hostsList")
- block->hostsList.push_back(value);
+ return std::get<0>(resp).value() - 1;
+ } catch(std::exception &e) {
+ return -1;
+ }
+ } else {
+ return 0; /* No delete was necessary */
+ }
+}
- std::vector< std::pair<std::string, std::string> > list;
- list.push_back(std::make_pair(field, value));
+int BlockDirectory::update_field(CacheBlock* block, std::string field, std::string value, optional_yield y) {
+ std::string key = build_index(block);
+ if (exist_key(key, y)) {
try {
- client.hmset(key, list, [&result](cpp_redis::reply &reply) {
- if (!reply.is_null()) {
- result = reply.as_string();
- }
- });
+ /* Ensure field exists */
+ {
+ boost::system::error_code ec;
+ request req;
+ req.push("HEXISTS", key, field);
+ response<int> resp;
- client.sync_commit(std::chrono::milliseconds(1000));
+ redis_exec(conn, ec, req, resp, y);
- if (result != "OK") {
- return -1;
+ if (!std::get<0>(resp).value() || (bool)ec)
+ return -1;
+ }
+
+ if (field == "blockHosts" || field == "objHosts") {
+ /* Append rather than overwrite */
+ boost::system::error_code ec;
+ request req;
+ req.push("HGET", key, field);
+ response<std::string> resp;
+
+ redis_exec(conn, ec, req, resp, y);
+
+ if (!std::get<0>(resp).value().size() || (bool)ec)
+ return -1;
+
+ std::get<0>(resp).value() += "_";
+ std::get<0>(resp).value() += value;
+ value = std::get<0>(resp).value();
+ }
+
+ {
+ boost::system::error_code ec;
+ request req;
+ req.push_range("HSET", key, std::map<std::string, std::string>{{field, value}});
+ response<int> resp;
+
+ redis_exec(conn, ec, req, resp, y);
+
+ if ((bool)ec) {
+ return -1;
+ }
+
+ return std::get<0>(resp).value(); /* Zero fields added since it is an update of an existing field */
}
} catch(std::exception &e) {
return -1;
}
+ } else {
+ return -2;
}
-
- return 0;
}
} } // namespace rgw::d4n
#pragma once
#include "rgw_common.h"
-#include <cpp_redis/cpp_redis>
-#include <string>
-#include <iostream>
+
#include <boost/lexical_cast.hpp>
+#include <boost/asio/detached.hpp>
+#include <boost/redis/connection.hpp>
+
+#define dout_subsys ceph_subsys_rgw
+#define dout_context g_ceph_context
namespace rgw { namespace d4n {
-struct Address {
- std::string host;
- int port;
-};
+namespace net = boost::asio;
+using boost::redis::config;
+using boost::redis::connection;
+using boost::redis::request;
+using boost::redis::response;
struct CacheObj {
std::string objName; /* S3 object name */
struct CacheBlock {
CacheObj cacheObj;
- uint64_t blockId; /* block ID */
+ uint64_t blockID;
+ std::string version;
uint64_t size; /* Block size in bytes */
- int globalWeight = 0;
+ int globalWeight = 0; /* LFUDA policy variable */
std::vector<std::string> hostsList; /* List of hostnames <ip:port> of block locations */
};
class Directory {
public:
- Directory() {}
CephContext* cct;
+
+ Directory() {}
};
class ObjectDirectory: public Directory { // weave into write workflow -Sam
public:
- ObjectDirectory() {}
- ObjectDirectory(std::string host, int port) {
- addr.host = host;
- addr.port = port;
+ ObjectDirectory(net::io_context& io_context) {
+ conn = std::make_shared<connection>(boost::asio::make_strand(io_context));
}
-
- void init(CephContext* _cct) {
- cct = _cct;
- addr.host = cct->_conf->rgw_d4n_host;
- addr.port = cct->_conf->rgw_d4n_port;
+ ~ObjectDirectory() {
+ shutdown();
}
- int find_client(cpp_redis::client* client);
- int exist_key(std::string key);
- Address get_addr() { return addr; }
+ int init(CephContext* cct, const DoutPrefixProvider* dpp) {
+ this->cct = cct;
+
+ config cfg;
+ cfg.addr.host = cct->_conf->rgw_d4n_host; // same or different address from block directory? -Sam
+ cfg.addr.port = std::to_string(cct->_conf->rgw_d4n_port);
- int set_value(CacheObj* object);
- int get_value(CacheObj* object);
- int copy_value(CacheObj* object, CacheObj* copyObject);
- int del_value(CacheObj* object);
+ if (!cfg.addr.host.length() || !cfg.addr.port.length()) {
+ ldpp_dout(dpp, 10) << "D4N Directory " << __func__ << ": Object directory endpoint was not configured correctly" << dendl;
+ return -EDESTADDRREQ;
+ }
+
+ conn->async_run(cfg, {}, net::consign(net::detached, conn));
+
+ return 0;
+ }
+ int exist_key(std::string key, optional_yield y);
+ void shutdown();
+
+ int set(CacheObj* object, optional_yield y);
+ int get(CacheObj* object, optional_yield y);
+ int copy(CacheObj* object, std::string copyName, std::string copyBucketName, optional_yield y);
+ int del(CacheObj* object, optional_yield y);
private:
- cpp_redis::client client;
- Address addr;
+ std::shared_ptr<connection> conn;
+
std::string build_index(CacheObj* object);
};
class BlockDirectory: public Directory {
public:
- BlockDirectory() {}
- BlockDirectory(std::string host, int port) {
- addr.host = host;
- addr.port = port;
+ BlockDirectory(net::io_context& io_context) {
+ conn = std::make_shared<connection>(boost::asio::make_strand(io_context));
+ }
+ ~BlockDirectory() {
+ shutdown();
}
- void init(CephContext* _cct) {
- cct = _cct;
- addr.host = cct->_conf->rgw_d4n_host;
- addr.port = cct->_conf->rgw_d4n_port;
+ int init(CephContext* cct, const DoutPrefixProvider* dpp) {
+ this->cct = cct;
+
+ config cfg;
+ cfg.addr.host = cct->_conf->rgw_d4n_host;
+ cfg.addr.port = std::to_string(cct->_conf->rgw_d4n_port);
+
+ if (!cfg.addr.host.length() || !cfg.addr.port.length()) { // add logs to other methods -Sam
+ ldpp_dout(dpp, 10) << "D4N Directory " << __func__ << ": Block directory endpoint was not configured correctly" << dendl;
+ return -EDESTADDRREQ;
+ }
+
+ conn->async_run(cfg, {}, net::consign(net::detached, conn));
+
+ return 0;
}
- int find_client(cpp_redis::client* client);
- int exist_key(std::string key);
- Address get_addr() { return addr; }
+ int exist_key(std::string key, optional_yield y);
+ void shutdown();
- int set_value(CacheBlock* block);
- int get_value(CacheBlock* block);
- int copy_value(CacheBlock* block, CacheBlock* copyBlock);
- int del_value(CacheBlock* block);
-
- int update_field(CacheBlock* block, std::string field, std::string value);
+ int set(CacheBlock* block, optional_yield y);
+ int get(CacheBlock* block, optional_yield y);
+ int copy(CacheBlock* block, std::string copyName, std::string copyBucketName, optional_yield y);
+ int del(CacheBlock* block, optional_yield y);
+ int update_field(CacheBlock* block, std::string field, std::string value, optional_yield y);
private:
- cpp_redis::client client;
- Address addr;
+ std::shared_ptr<connection> conn;
+
std::string build_index(CacheBlock* block);
};
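
Because the directories only post work onto the `io_context` they are handed, the caller must run it for `init()`'s `async_run()` and the blocking `use_blocked` path to make progress. A hedged usage sketch (`cct` and `dpp` are assumed to be in scope, as elsewhere in RGW):

```cpp
#include <thread>
#include <boost/asio/io_context.hpp>
#include <boost/asio/executor_work_guard.hpp>

boost::asio::io_context io;
auto work = boost::asio::make_work_guard(io); // keep run() from returning early
std::thread runner([&io] { io.run(); });

{
  rgw::d4n::BlockDirectory dir(io);
  if (dir.init(cct, dpp) == 0) {
    rgw::d4n::CacheBlock block{ .cacheObj = { .objName = "obj", .bucketName = "bkt" }, .blockID = 0 };
    dir.set(&block, null_yield); // blocks on use_blocked while `runner` drives the I/O
  }
} // ~BlockDirectory() dispatches cancel() on the connection's strand

work.reset();
runner.join();
```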
#include "../../../common/async/yield_context.h"
#include "d4n_policy.h"
-
-#define dout_subsys ceph_subsys_rgw
-#define dout_context g_ceph_context
+#include "common/async/blocked_completion.h"
namespace rgw { namespace d4n {
-int CachePolicy::find_client(const DoutPrefixProvider* dpp, cpp_redis::client* client) {
- if (client->is_connected())
- return 0;
-
- if (get_addr().host == "" || get_addr().port == 0) {
- ldpp_dout(dpp, 10) << "RGW D4N Policy: D4N policy endpoint was not configured correctly" << dendl;
- return EDESTADDRREQ;
- }
+// initiate a call to async_exec() on the connection's executor
+struct initiate_exec {
+ std::shared_ptr<boost::redis::connection> conn;
+ boost::redis::request req;
- client->connect(get_addr().host, get_addr().port, nullptr);
+ using executor_type = boost::redis::connection::executor_type;
+ executor_type get_executor() const noexcept { return conn->get_executor(); }
- if (!client->is_connected())
- return ECONNREFUSED;
+ template <typename Handler, typename Response>
+ void operator()(Handler handler, Response& resp)
+ {
+ conn->async_exec(req, resp, boost::asio::consign(std::move(handler), conn));
+ }
+};
- return 0;
+template <typename Response, typename CompletionToken>
+auto async_exec(std::shared_ptr<connection> conn,
+ const boost::redis::request& req,
+ Response& resp, CompletionToken&& token)
+{
+ return boost::asio::async_initiate<CompletionToken,
+ void(boost::system::error_code, std::size_t)>(
+ initiate_exec{std::move(conn), req}, token, resp);
}
-int CachePolicy::exist_key(std::string key) {
- int result = -1;
- std::vector<std::string> keys;
- keys.push_back(key);
-
- if (!client.is_connected()) {
- return result;
+template <typename T>
+void redis_exec(std::shared_ptr<connection> conn, boost::system::error_code& ec, boost::redis::request& req, boost::redis::response<T>& resp, optional_yield y)
+{
+ if (y) {
+ auto yield = y.get_yield_context();
+ async_exec(std::move(conn), req, resp, yield[ec]);
+ } else {
+ async_exec(std::move(conn), req, resp, ceph::async::use_blocked[ec]);
}
+}
+int LFUDAPolicy::set_age(int age, optional_yield y) {
try {
- client.exists(keys, [&result](cpp_redis::reply &reply) {
- if (reply.is_integer()) {
- result = reply.as_integer();
- }
- });
-
- client.sync_commit(std::chrono::milliseconds(1000));
- } catch(std::exception &e) {}
+ boost::system::error_code ec;
+ response<int> resp;
+ request req;
+ req.push("HSET", "lfuda", "age", std::to_string(age));
- return result;
-}
+ redis_exec(conn, ec, req, resp, y);
-int LFUDAPolicy::set_age(int age) {
- int result = 0;
+    if (ec)
+      return -1;
- try {
- client.hset("lfuda", "age", std::to_string(age), [&result](cpp_redis::reply& reply) {
- if (!reply.is_null()) {
- result = reply.as_integer();
- }
- });
-
- client.sync_commit();
+    return 0; /* Success, whether the age field was created or updated */
} catch(std::exception &e) {
return -1;
}
-
- return result - 1;
}
-int LFUDAPolicy::get_age() {
- int ret = 0;
- int age = -1;
+int LFUDAPolicy::get_age(optional_yield y) {
+ response<int> resp;
try {
- client.hexists("lfuda", "age", [&ret](cpp_redis::reply& reply) {
- if (!reply.is_null()) {
- ret = reply.as_integer();
- }
- });
+ boost::system::error_code ec;
+ request req;
+ req.push("HEXISTS", "lfuda", "age");
- client.sync_commit(std::chrono::milliseconds(1000));
+ redis_exec(conn, ec, req, resp, y);
+
+ if (ec)
+ return -1;
} catch(std::exception &e) {
return -1;
}
- if (!ret) {
- ret = set_age(0); /* Initialize age */
-
- if (!ret) {
- return 0; /* Success */
- } else {
+ if (!std::get<0>(resp).value()) {
+ if (!set_age(0, y)) /* Initialize age */
+ return 0;
+ else
return -1;
- };
}
- try {
- client.hget("lfuda", "age", [&age](cpp_redis::reply& reply) {
- if (!reply.is_null()) {
- age = std::stoi(reply.as_string());
- }
- });
+ try {
+ boost::system::error_code ec;
+ response<std::string> value;
+ request req;
+ req.push("HGET", "lfuda", "age");
+
+ redis_exec(conn, ec, req, value, y);
+
+ if (ec)
+ return -1;
- client.sync_commit(std::chrono::milliseconds(1000));
+ return std::stoi(std::get<0>(value).value());
} catch(std::exception &e) {
return -1;
}
-
- return age;
}
-int LFUDAPolicy::set_global_weight(std::string key, int weight) {
- int result = 0;
-
+int LFUDAPolicy::set_global_weight(std::string key, int weight, optional_yield y) {
try {
- client.hset(key, "globalWeight", std::to_string(weight), [&result](cpp_redis::reply& reply) {
- if (!reply.is_null()) {
- result = reply.as_integer();
- }
- });
+ boost::system::error_code ec;
+ response<int> resp;
+ request req;
+ req.push("HSET", key, "globalWeight", std::to_string(weight));
+
+ redis_exec(conn, ec, req, resp, y);
+
+    if (ec)
+      return -1;
- client.sync_commit();
+    return 0; /* Success */
} catch(std::exception &e) {
return -1;
}
-
- return result - 1;
}
-int LFUDAPolicy::get_global_weight(std::string key) {
- int weight = -1;
+int LFUDAPolicy::get_global_weight(std::string key, optional_yield y) {
+ try {
+ boost::system::error_code ec;
+ response<std::string> resp;
+ request req;
+ req.push("HGET", key, "globalWeight");
+
+ redis_exec(conn, ec, req, resp, y);
- try {
- client.hget(key, "globalWeight", [&weight](cpp_redis::reply& reply) {
- if (!reply.is_null()) {
- weight = reply.as_integer();
- }
- });
+ if (ec)
+ return -1;
- client.sync_commit(std::chrono::milliseconds(1000));
+ return std::stoi(std::get<0>(resp).value());
} catch(std::exception &e) {
return -1;
}
-
- return weight;
}
-int LFUDAPolicy::set_min_avg_weight(int weight, std::string cacheLocation) {
- int result = 0;
-
+int LFUDAPolicy::set_min_avg_weight(size_t weight, std::string cacheLocation, optional_yield y) {
try {
- client.hset("lfuda", "minAvgWeight:cache", cacheLocation, [&result](cpp_redis::reply& reply) {
- if (!reply.is_null()) {
- result = reply.as_integer();
- }
- });
+ boost::system::error_code ec;
+ response<int> resp;
+ request req;
+ req.push("HSET", "lfuda", "minAvgWeight:cache", cacheLocation);
- client.sync_commit();
+ redis_exec(conn, ec, req, resp, y);
+
+    if (ec)
+      return -1;
} catch(std::exception &e) {
return -1;
}
+
+ try {
+ boost::system::error_code ec;
+ response<int> resp;
+ request req;
+ req.push("HSET", "lfuda", "minAvgWeight:weight", cacheLocation);
- if (result == 1) {
- result = 0;
- try {
- client.hset("lfuda", "minAvgWeight:weight", std::to_string(weight), [&result](cpp_redis::reply& reply) {
- if (!reply.is_null()) {
- result = reply.as_integer();
- }
- });
+ redis_exec(conn, ec, req, resp, y);
- client.sync_commit();
- } catch(std::exception &e) {
- return -1;
- }
- }
+    if (ec)
+      return -1;
-  return result - 1;
+    return 0; /* Success */
+ } catch(std::exception &e) {
+ return -1;
+ }
}
-int LFUDAPolicy::get_min_avg_weight() {
- int ret = 0;
- int weight = -1;
+int LFUDAPolicy::get_min_avg_weight(optional_yield y) {
+ response<int> resp;
try {
- client.hexists("lfuda", "minAvgWeight:cache", [&ret](cpp_redis::reply& reply) {
- if (!reply.is_null()) {
- ret = reply.as_integer();
- }
- });
+ boost::system::error_code ec;
+ request req;
+ req.push("HEXISTS", "lfuda", "minAvgWeight:cache");
- client.sync_commit(std::chrono::milliseconds(1000));
+ redis_exec(conn, ec, req, resp, y);
+
+ if (ec)
+ return -1;
} catch(std::exception &e) {
return -1;
}
- if (!ret) {
- ret = set_min_avg_weight(INT_MAX, ""/* local cache location or keep empty? */); /* Initialize minimum average weight */
-
- if (!ret) {
- return INT_MAX; /* Success */
+ if (!std::get<0>(resp).value()) {
+ if (!set_min_avg_weight(INT_MAX, ""/* local cache location or keep empty? */, y)) { /* Initialize minimum average weight */
+ return INT_MAX;
} else {
return -1;
- };
+ }
}
- try {
- client.hget("lfuda", "minAvgWeight:weight", [&weight](cpp_redis::reply& reply) {
- if (!reply.is_null()) {
- weight = std::stoi(reply.as_string());
- }
- });
+ try {
+ boost::system::error_code ec;
+ response<std::string> value;
+ request req;
+ req.push("HGET", "lfuda", "minAvgWeight:weight");
+
+ redis_exec(conn, ec, req, value, y);
+
+ if (ec)
+ return -1;
- client.sync_commit(std::chrono::milliseconds(1000));
+ return std::stoi(std::get<0>(value).value());
} catch(std::exception &e) {
return -1;
}
-
- return weight;
}
-CacheBlock LFUDAPolicy::find_victim(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) {
+CacheBlock LFUDAPolicy::find_victim(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) {
#if 0
std::vector<rgw::cache::Entry> entries = cacheNode->list_entries(dpp);
std::string victimName;
int minWeight = INT_MAX;
for (auto it = entries.begin(); it != entries.end(); ++it) {
- optional_yield y = null_yield;
std::string localWeightStr = cacheNode->get_attr(dpp, it->key, "localWeight", y); // should represent block -Sam
- if (!std::stoi(localWeightStr)) { // maybe do this in some sort of initialization procedure instead of here? -Sam
- /* Local weight hasn't been set */
- int ret = cacheNode->set_attr(dpp, it->key, "localWeight", std::to_string(get_age()));
+ /* Get victim cache block */
+ CacheBlock victim;
+ victim.cacheObj.objName = it->second->key;
+ victim.cacheObj.bucketName = cacheNode->get_attr(dpp, victim.cacheObj.objName, "bucket_name", y); // generalize for other cache backends -Sam
+ victim.blockID = 0; // find way to get ID -Sam
if (ret < 0)
return {};
/* Get victim cache block */
CacheBlock victimBlock;
victimBlock.cacheObj.objName = victimName;
- BlockDirectory blockDir;
- blockDir.init(cct);
+ BlockDirectory blockDir(io);
+ blockDir.init(cct, dpp);
- int ret = blockDir.get_value(&victimBlock);
+ int ret = blockDir.get(&victimBlock, y);
if (ret < 0)
return {};
return victimBlock;
}
-int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) {
+int LFUDAPolicy::exist_key(std::string key, optional_yield y) {
+ response<int> resp;
+
+ try {
+ boost::system::error_code ec;
+ request req;
+ req.push("EXISTS", key);
+
+ redis_exec(conn, ec, req, resp, y);
+
+ if (ec)
+ return false;
+ } catch(std::exception &e) {}
+
+ return std::get<0>(resp).value();
+}
+
+int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) {
std::string key = "rgw-object:" + block->cacheObj.objName + ":directory";
- optional_yield y = null_yield;
std::string localWeightStr = cacheNode->get_attr(dpp, block->cacheObj.objName, "localWeight", y); // change to block name eventually -Sam
int localWeight = -1;
-
- if (!client.is_connected())
- find_client(dpp, &client);
+ response<std::string> resp;
if (localWeightStr.empty()) { // figure out where to set local weight -Sam
- optional_yield y = null_yield;
- int ret = cacheNode->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(get_age()), y);
- localWeight = get_age();
+ int ret = cacheNode->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(get_age(y)), y);
+ localWeight = get_age(y);
if (ret < 0)
return -1;
localWeight = std::stoi(localWeightStr);
}
- int age = get_age();
+ int age = get_age(y);
- bool key_exists = true; //cacheNode->key_exists(dpp, block->cacheObj.objName) //TODO- correct this
- if (key_exists) { /* Local copy */
+ if (exist_key(key, y)) { /* Local copy */
localWeight += age;
} else {
- std::string hosts;
uint64_t freeSpace = cacheNode->get_free_space(dpp);
while (freeSpace < block->size) /* Not enough space in local cache */
- freeSpace += eviction(dpp, cacheNode);
+ freeSpace += eviction(dpp, cacheNode, y);
- if (exist_key(key)) {
+ if (exist_key(key, y)) { /* Remote copy */
try {
- client.hget(key, "hostsList", [&hosts](cpp_redis::reply& reply) {
- if (!reply.is_null()) {
- hosts = reply.as_string();
- }
- });
-
- client.sync_commit(std::chrono::milliseconds(1000));
+ boost::system::error_code ec;
+ request req;
+ req.push("HGET", key, "blockHosts");
+
+ redis_exec(conn, ec, req, resp, y);
+
+ if (ec)
+ return -1;
} catch(std::exception &e) {
return -1;
}
}
// should not hold local cache IP if in this else statement -Sam
- if (hosts.length() > 0) { /* Remote copy */
- int globalWeight = get_global_weight(key);
+ if (std::get<0>(resp).value().length() > 0) { /* Remote copy */
+ int globalWeight = get_global_weight(key, y);
globalWeight += age;
- if (set_global_weight(key, globalWeight))
+ if (set_global_weight(key, globalWeight, y))
return -1;
} else { /* No remote copy */
// do I need to add the block to the local cache here? -Sam
return 0;
}
-uint64_t LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) {
- CacheBlock victim = find_victim(dpp, cacheNode);
+uint64_t LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) {
+ CacheBlock victim = find_victim(dpp, cacheNode, y);
if (victim.cacheObj.objName.empty()) {
ldpp_dout(dpp, 10) << "RGW D4N Policy: Could not find victim block" << dendl;
}
std::string key = "rgw-object:" + victim.cacheObj.objName + ":directory";
- std::string hosts;
- int globalWeight = get_global_weight(key);
- optional_yield y = null_yield;
+ int globalWeight = get_global_weight(key, y);
int localWeight = std::stoi(cacheNode->get_attr(dpp, victim.cacheObj.objName, "localWeight", y)); // change to block name eventually -Sam
- int avgWeight = get_min_avg_weight();
+ int avgWeight = get_min_avg_weight(y);
+ response<std::string> resp;
- if (exist_key(key)) {
+ if (exist_key(key, y)) {
try {
- client.hget(key, "hostsList", [&hosts](cpp_redis::reply& reply) {
- if (!reply.is_null()) {
- hosts = reply.as_string();
- }
- });
+ boost::system::error_code ec;
+ request req;
+ req.push("HGET", key, "blockHosts");
+
+ redis_exec(conn, ec, req, resp, y);
- client.sync_commit(std::chrono::milliseconds(1000));
+ if (ec)
+ return -1;
} catch(std::exception &e) {
return -1;
}
return -2;
}
- if (hosts.empty()) { /* Last copy */
+ if (std::get<0>(resp).value().empty()) { /* Last copy */
if (globalWeight > 0) {
localWeight += globalWeight;
- optional_yield y = null_yield;
int ret = cacheNode->set_attr(dpp, victim.cacheObj.objName, "localWeight", std::to_string(localWeight), y);
if (!ret)
- ret = set_global_weight(key, 0);
+ ret = set_global_weight(key, 0, y);
else
return -1;
globalWeight += localWeight;
- if (set_global_weight(key, globalWeight))
+ if (set_global_weight(key, globalWeight, y))
return -2;
ldpp_dout(dpp, 10) << "RGW D4N Policy: Block " << victim.cacheObj.objName << " has been evicted." << dendl;
- int ret = cacheNode->delete_data(dpp, victim.cacheObj.objName, y);
+ int ret = cacheNode->del(dpp, victim.cacheObj.objName, y);
if (!ret) {
- uint64_t num_entries = 100; //cacheNode->get_num_entries(dpp) TODO - correct this
- ret = set_min_avg_weight(avgWeight - (localWeight/num_entries), ""/*local cache location*/); // Where else must this be set? -Sam
+ //ret = set_min_avg_weight(avgWeight - (localWeight/entries_map.size()), ""/*local cache location*/, y); // Where else must this be set? -Sam
if (!ret) {
- int age = get_age();
+ int age = get_age(y);
age = std::max(localWeight, age);
- ret = set_age(age);
+ ret = set_age(age, y);
if (ret)
return -1;
return victim.size;
}
-int LRUPolicy::exist_key(std::string key)
+void LFUDAPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y)
+{
+
+}
+
+bool LFUDAPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key)
+{
+ return false;
+}
+
+void LFUDAPolicy::shutdown()
+{
+ // call cancel() on the connection's executor
+ boost::asio::dispatch(conn->get_executor(), [c = conn] { c->cancel(); });
+}
+
+int LRUPolicy::exist_key(std::string key, optional_yield y)
{
const std::lock_guard l(lru_lock);
if (entries_map.count(key) != 0) {
return false;
}
-int LRUPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode)
+int LRUPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y)
{
- //Does not apply to LRU
+ uint64_t freeSpace = cacheNode->get_free_space(dpp);
+ while(freeSpace < block->size) {
+ freeSpace = eviction(dpp, cacheNode, y);
+ }
return 0;
}
-uint64_t LRUPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode)
+uint64_t LRUPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y)
{
const std::lock_guard l(lru_lock);
auto p = entries_lru_list.front();
return cacheNode->get_free_space(dpp);
}
-void LRUPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, rgw::cache::CacheDriver* cacheNode)
+void LRUPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y)
{
erase(dpp, key);
- const std::lock_guard l(lru_lock);
- Entry *e = new Entry(key, offset, len);
+
+ Entry *e = new Entry(key, offset, len, ""); // update version later -Sam
entries_lru_list.push_back(*e);
entries_map.emplace(key, e);
}
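
Since `entries_lru_list` is a `boost::intrusive::list`, the list never owns its nodes; removal has to go through a disposer to reclaim the `new`'d `Entry`. A sketch of the eviction side under that assumption (using the `Entry_delete_disposer` from the base class):

```cpp
// Pop the least-recently-used entry and free it via the disposer.
void evict_one(List& lru, std::unordered_map<std::string, Entry*>& map)
{
  if (lru.empty())
    return;
  Entry& e = lru.front();
  map.erase(e.key);
  lru.pop_front_and_dispose(Entry_delete_disposer{}); // deletes the Entry
}
```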
int PolicyDriver::init() {
if (policyName == "lfuda") {
- cachePolicy = new LFUDAPolicy();
+ cachePolicy = new LFUDAPolicy(io);
return 0;
} else if (policyName == "lru") {
cachePolicy = new LRUPolicy();
#include <string>
#include <iostream>
-#include <cpp_redis/cpp_redis>
#include "rgw_common.h"
#include "d4n_directory.h"
#include "../../rgw_redis_driver.h"
+#define dout_subsys ceph_subsys_rgw
+#define dout_context g_ceph_context
+
namespace rgw { namespace d4n {
class CachePolicy {
- private:
- cpp_redis::client client;
- Address addr;
+ protected:
+ struct Entry : public boost::intrusive::list_base_hook<> {
+ std::string key;
+ uint64_t offset;
+ uint64_t len;
+ std::string version;
+    Entry(std::string& key, uint64_t offset, uint64_t len, std::string version) : key(key), offset(offset),
+ len(len), version(version) {}
+ };
+
+ //The disposer object function
+ struct Entry_delete_disposer {
+ void operator()(Entry *e) {
+ delete e;
+ }
+ };
+ typedef boost::intrusive::list<Entry> List;
+
+ //cpp_redis::client client;
+ //Address addr;
public:
CephContext* cct;
- CachePolicy() : addr() {}
- virtual ~CachePolicy() = default;
+ CachePolicy() {}
+ virtual ~CachePolicy() = default;
- virtual void init(CephContext *_cct) {
- cct = _cct;
- addr.host = cct->_conf->rgw_d4n_host;
- addr.port = cct->_conf->rgw_d4n_port;
+ virtual int init(CephContext *cct, const DoutPrefixProvider* dpp) {
+ this->cct = cct;
+ return 0;
}
- virtual int find_client(const DoutPrefixProvider* dpp, cpp_redis::client* client) = 0;
- virtual int exist_key(std::string key) = 0;
- virtual Address get_addr() { return addr; }
- virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) = 0;
- virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) = 0;
- virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, rgw::cache::CacheDriver* cacheNode) = 0;
+ virtual int exist_key(std::string key, optional_yield y) = 0;
+ virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) = 0;
+ virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) = 0;
+ virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) = 0;
+ virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key) = 0;
+ virtual void shutdown() = 0;
};
class LFUDAPolicy : public CachePolicy {
private:
- cpp_redis::client client;
+ net::io_context& io;
+ std::shared_ptr<connection> conn;
public:
- LFUDAPolicy() : CachePolicy() {}
-
- int set_age(int age);
- int get_age();
- int set_global_weight(std::string key, int weight);
- int get_global_weight(std::string key);
- int set_min_avg_weight(int weight, std::string cacheLocation);
- int get_min_avg_weight();
- CacheBlock find_victim(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode);
-
- virtual int find_client(const DoutPrefixProvider* dpp, cpp_redis::client* client) override { return CachePolicy::find_client(dpp, client); }
- virtual int exist_key(std::string key) override { return CachePolicy::exist_key(key); }
- virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) override;
- virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) override;
- virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, rgw::cache::CacheDriver* cacheNode) override {}
+ LFUDAPolicy(net::io_context& io_context) : CachePolicy(), io(io_context) {
+ conn = std::make_shared<connection>(boost::asio::make_strand(io_context));
+ }
+ ~LFUDAPolicy() {
+ //delete dir;
+ shutdown();
+ }
+
+ int set_age(int age, optional_yield y);
+ int get_age(optional_yield y);
+ int set_global_weight(std::string key, int weight, optional_yield y);
+ int get_global_weight(std::string key, optional_yield y);
+ int set_min_avg_weight(size_t weight, std::string cacheLocation, optional_yield y);
+ int get_min_avg_weight(optional_yield y);
+ CacheBlock find_victim(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y);
+
+ virtual int init(CephContext *cct, const DoutPrefixProvider* dpp) {
+ this->cct = cct;
+
+ config cfg;
+ cfg.addr.host = cct->_conf->rgw_d4n_host; // TODO: Replace with cache address
+ cfg.addr.port = std::to_string(cct->_conf->rgw_d4n_port);
+
+ if (!cfg.addr.host.length() || !cfg.addr.port.length()) {
+ ldpp_dout(dpp, 10) << "RGW Redis Cache: Redis cache endpoint was not configured correctly" << dendl;
+ return -EDESTADDRREQ;
+ }
+
+ conn->async_run(cfg, {}, net::consign(net::detached, conn));
+
+ return 0;
+ }
+ virtual int exist_key(std::string key, optional_yield y) override;
+ virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
+ virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
+ virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
+ virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key) override;
+ virtual void shutdown() override;
};
class LRUPolicy : public CachePolicy {
- public:
- struct Entry : public boost::intrusive::list_base_hook<> {
- std::string key;
- uint64_t offset;
- uint64_t len;
- Entry(std::string& key, uint64_t offset, uint64_t len) : key(key), offset(offset), len(len) {}
- };
- LRUPolicy() = default;
private:
- std::mutex lru_lock;
- //The disposer object function
- struct Entry_delete_disposer {
- void operator()(Entry *e) {
- delete e;
- }
- };
- typedef boost::intrusive::list<Entry> List;
List entries_lru_list;
std::unordered_map<std::string, Entry*> entries_map;
+ std::mutex lru_lock;
+
public:
- virtual int find_client(const DoutPrefixProvider* dpp, cpp_redis::client* client) override { return 0; };
- virtual int exist_key(std::string key) override;
- virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) override;
- virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) override;
- virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, rgw::cache::CacheDriver* cacheNode) override;
- bool erase(const DoutPrefixProvider* dpp, const std::string& key);
+ LRUPolicy() = default;
+
+ virtual int exist_key(std::string key, optional_yield y) override;
+ virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
+ virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
+ virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
+ virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key) override;
+ virtual void shutdown() override {}
};
class PolicyDriver {
private:
+ net::io_context& io;
std::string policyName;
-
- public:
CachePolicy* cachePolicy;
- PolicyDriver(std::string _policyName) : policyName(_policyName) {}
+ public:
+ PolicyDriver(net::io_context& io_context, std::string _policyName) : io(io_context), policyName(_policyName) {}
~PolicyDriver() {
delete cachePolicy;
}
int init();
+ CachePolicy* get_cache_policy() { return cachePolicy; }
};
} } // namespace rgw::d4n
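
Putting the pieces together, the intended call pattern from the filter (mirroring what `D4NFilterDriver` does below; `io`, `cct`, `dpp`, `key`, and `y` are assumed to be in scope):

```cpp
rgw::d4n::PolicyDriver policy(io, "lfuda"); // or "lru"
if (policy.init() == 0 &&
    policy.get_cache_policy()->init(cct, dpp) == 0) {
  // Per-request calls now thread optional_yield through to Redis:
  if (policy.get_cache_policy()->exist_key(key, y)) {
    /* block is already tracked by the policy */
  }
}
```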
return dynamic_cast<FilterObject*>(t)->get_next();
}
-D4NFilterDriver::D4NFilterDriver(Driver* _next) : FilterDriver(_next)
+D4NFilterDriver::D4NFilterDriver(Driver* _next, boost::asio::io_context& io_context) : FilterDriver(_next)
{
rgw::cache::Partition partition_info;
partition_info.location = g_conf()->rgw_d4n_l1_datacache_persistent_path;
partition_info.name = "d4n";
partition_info.type = "read-cache";
partition_info.size = g_conf()->rgw_d4n_l1_datacache_size;
+ //cacheDriver = new rgw::cache::RedisDriver(io_context, partition_info); // change later -Sam
cacheDriver = new rgw::cache::SSDDriver(partition_info);
- objDir = new rgw::d4n::ObjectDirectory();
- blockDir = new rgw::d4n::BlockDirectory();
+ objDir = new rgw::d4n::ObjectDirectory(io_context);
+ blockDir = new rgw::d4n::BlockDirectory(io_context);
cacheBlock = new rgw::d4n::CacheBlock();
- policyDriver = new rgw::d4n::PolicyDriver("lfuda");
- lruPolicyDriver = new rgw::d4n::PolicyDriver("lru");
+ policyDriver = new rgw::d4n::PolicyDriver(io_context, "lfuda");
+ lruPolicyDriver = new rgw::d4n::PolicyDriver(io_context, "lru");
}
D4NFilterDriver::~D4NFilterDriver()
cacheDriver->initialize(cct, dpp);
- objDir->init(cct);
- blockDir->init(cct);
+ objDir->init(cct, dpp);
+ blockDir->init(cct, dpp);
- policyDriver->init();
- policyDriver->cachePolicy->init(cct);
+ policyDriver->init();
+ policyDriver->get_cache_policy()->init(cct, dpp);
lruPolicyDriver->init();
- lruPolicyDriver->cachePolicy->init(cct);
+ lruPolicyDriver->get_cache_policy()->init(cct, dpp);
return 0;
}
const DoutPrefixProvider* dpp,
optional_yield y)
{
- /* Build cache block copy */
- rgw::d4n::CacheBlock* copyCacheBlock = new rgw::d4n::CacheBlock(); // How will this copy work in lfuda? -Sam
-
- copyCacheBlock->hostsList.push_back(driver->get_cache_block()->hostsList[0]);
- copyCacheBlock->size = driver->get_cache_block()->size;
- copyCacheBlock->size = driver->get_cache_block()->globalWeight; // Do we want to reset the global weight? -Sam
- copyCacheBlock->cacheObj.bucketName = dest_bucket->get_name();
- copyCacheBlock->cacheObj.objName = dest_object->get_key().get_oid();
-
- int copy_valueReturn = driver->get_block_dir()->set_value(copyCacheBlock);
+ rgw::d4n::CacheBlock block = rgw::d4n::CacheBlock{
+ .cacheObj = {
+ .objName = this->get_key().get_oid(),
+ .bucketName = src_bucket->get_name()
+ },
+ .blockID = 0, // TODO: get correct blockID
+ };
+ int copy_valueReturn = driver->get_block_dir()->copy(&block, dest_object->get_name(), dest_bucket->get_name(), y);
if (copy_valueReturn < 0) {
ldpp_dout(dpp, 20) << "D4N Filter: Block directory copy operation failed." << dendl;
ldpp_dout(dpp, 20) << "D4N Filter: Block directory copy operation succeeded." << dendl;
}
- delete copyCacheBlock;
-
/* Append additional metadata to attributes */
rgw::sal::Attrs baseAttrs = this->get_attrs();
buffer::list bl;
return next->get_obj_attrs(y, dpp, target_obj);
} else {
+ /* Set metadata locally */
+ RGWQuotaInfo quota_info;
+ RGWObjState* astate;
+ std::unique_ptr<rgw::sal::User> user = this->driver->get_user(this->get_bucket()->get_owner());
+ this->get_obj_state(dpp, &astate, y);
+
+    for (auto it = attrs.begin(); it != attrs.end(); ) {
+      bool erased = true;
+      if (it->second.length() > 0) { // or return? -Sam
+        if (it->first == "mtime") {
+          parse_time(it->second.c_str(), &astate->mtime);
+        } else if (it->first == "object_size") {
+          this->set_obj_size(std::stoull(it->second.c_str()));
+        } else if (it->first == "accounted_size") {
+          astate->accounted_size = std::stoull(it->second.c_str());
+        } else if (it->first == "epoch") {
+          astate->epoch = std::stoull(it->second.c_str());
+        } else if (it->first == "version_id") {
+          this->set_instance(it->second.c_str());
+        } else if (it->first == "this_zone_short_id") {
+          astate->zone_short_id = static_cast<uint32_t>(std::stoul(it->second.c_str()));
+        } else if (it->first == "user_quota.max_size") {
+          quota_info.max_size = std::stoull(it->second.c_str());
+        } else if (it->first == "user_quota.max_objects") {
+          quota_info.max_objects = std::stoull(it->second.c_str());
+        } else if (it->first == "max_buckets") {
+          user->set_max_buckets(std::stoull(it->second.c_str()));
+        } else {
+          erased = false;
+        }
+      } else {
+        erased = false;
+      }
+
+      /* Erase through the iterator; erasing by key would invalidate `it` mid-loop */
+      if (erased)
+        it = attrs.erase(it);
+      else
+        ++it;
+    }
+
+ user->set_info(quota_info);
+ this->set_obj_state(*astate);
+
+ /* Set attributes locally */
int set_attrsReturn = this->set_attrs(attrs);
if (set_attrsReturn < 0) {
owner, ptail_placement_rule,
olh_epoch, unique_tag);
- return std::make_unique<D4NFilterWriter>(std::move(writer), this, obj, dpp, true);
+ return std::make_unique<D4NFilterWriter>(std::move(writer), this, obj, dpp, true, y);
}
std::unique_ptr<Object::ReadOp> D4NFilterObject::get_read_op()
this->client_cb = cb;
this->cb->set_client_cb(cb, dpp); // what's this for? -Sam
+ // save y here -Sam
/* This algorithm stores chunks for ranged requests also in the cache, which might be smaller than obj_max_req_size
One simplification could be to overwrite the smaller chunks with a bigger chunk of obj_max_req_size, and to serve requests for smaller
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num <<
" read_ofs: " << read_ofs << " part len: " << part_len << dendl;
- if (source->driver->get_policy_driver()->cachePolicy->exist_key(oid_in_cache)) {
+ if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache, y)) {
// Read From Cache
auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);
- source->driver->get_policy_driver()->cachePolicy->update(dpp, oid_in_cache, adjusted_start_ofs, part_len, source->driver->get_cache_driver());
+ source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, part_len, "", source->driver->get_cache_driver(), y);
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num <<
" read_ofs: " << read_ofs << " part len: " << part_len << dendl;
- if ((part_len != obj_max_req_size) && source->driver->get_policy_driver()->cachePolicy->exist_key(oid_in_cache)) {
+ if ((part_len != obj_max_req_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache, y)) {
// Read From Cache
auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);
- source->driver->get_policy_driver()->cachePolicy->update(dpp, oid_in_cache, adjusted_start_ofs, obj_max_req_size, source->driver->get_cache_driver());
+ source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, obj_max_req_size, "", source->driver->get_cache_driver(), y);
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
return r;
}
- return this->cb->flush_last_part();
+ return this->cb->flush_last_part(y);
/*
/ Execute cache replacement policy /
- int policyRet = source->driver->get_policy_driver()->cachePolicy->get_block(dpp, source->driver->get_cache_block(),
- source->driver->get_cache_driver());
+ int policyRet = source->driver->get_policy_driver()->get_cache_policy()->get_block(dpp, source->driver->get_cache_block(),
+ source->driver->get_cache_driver(), y);
if (policyRet < 0) {
ldpp_dout(dpp, 20) << "D4N Filter: Cache replacement operation failed." << dendl;
cb->handle_data(bl, ofs, len);
} else {
/ Block directory check /
- int getDirReturn = source->driver->get_block_dir()->get_value(source->driver->get_cache_block());
+ int getDirReturn = source->driver->get_block_dir()->get(source->driver->get_cache_block());
if (getDirReturn >= -1) {
if (getDirReturn == -1) {
} else {
/ Write tier retrieval /
ldpp_dout(dpp, 20) << "D4N Filter: Block directory get operation failed." << dendl;
- getDirReturn = source->driver->get_obj_dir()->get_value(&(source->driver->get_cache_block()->cacheObj));
+ getDirReturn = source->driver->get_obj_dir()->get(&(source->driver->get_cache_block()->cacheObj));
if (getDirReturn >= -1) {
if (getDirReturn == -1) {
source->driver->get_cache_block()->cacheObj.bucketName = source->get_bucket()->get_name();
source->driver->get_cache_block()->cacheObj.objName = source->get_key().get_oid();
- int setDirReturn = tempBlockDir->set_value(source->driver->get_cache_block());
+ int setDirReturn = tempBlockDir->set(source->driver->get_cache_block());
if (setDirReturn < 0) {
ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation failed." << dendl;
return next->iterate(dpp, ofs, end, cb, y); */
}
-int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::flush_last_part()
+int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::flush_last_part(optional_yield y)
{
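+  /* Hold on to the caller's yield context so handle_data() can use it while
+     flushing the remaining buffered data */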
+ save_y = &y;
last_part = true;
return handle_data(bl_rem, 0, bl_rem.length());
}
const std::lock_guard l(d4n_get_data_lock);
rgw::d4n::CacheBlock block;
rgw::d4n::BlockDirectory* blockDir = source->driver->get_block_dir();
- block.hostsList.push_back(blockDir->get_addr().host + ":" + std::to_string(blockDir->get_addr().port));
+ //block.hostsList.push_back(blockDir->get_addr().host + ":" + std::to_string(blockDir->get_addr().port));
block.cacheObj.bucketName = source->get_bucket()->get_name();
block.cacheObj.objName = source->get_key().get_oid();
if (bl.length() > 0 && last_part) { // if bl = bl_rem has data and this is the last part, write it to cache
std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
+      block.version = ""; // TODO: fill out the block version correctly
       block.size = bl.length();
-      block.blockId = ofs;
+      block.blockID = ofs;
uint64_t freeSpace = filter->get_cache_driver()->get_free_space(dpp);
while(freeSpace < block.size) {
- freeSpace += filter->get_policy_driver()->cachePolicy->eviction(dpp, filter->get_cache_driver());
+ freeSpace += filter->get_policy_driver()->get_cache_policy()->eviction(dpp, filter->get_cache_driver(), null_yield);
}
if (filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), source->get_attrs()) == 0) {
- filter->get_policy_driver()->cachePolicy->update(dpp, oid, ofs, bl.length(), filter->get_cache_driver());
+ filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), "", filter->get_cache_driver(), null_yield);
/* Store block in directory */
- if (!blockDir->exist_key(oid)) {
+ if (!blockDir->exist_key(oid, null_yield)) {
+ #if 0
int ret = blockDir->set_value(&block);
if (ret < 0) {
ldpp_dout(dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl;
} else {
ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl;
}
+ #endif
}
}
} else if (bl.length() == rgw_get_obj_max_req_size && bl_rem.length() == 0) { // if bl is the same size as rgw_get_obj_max_req_size, write it to cache
std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
ofs += bl_len;
- block.blockId = ofs;
+ block.blockID = ofs;
+ block.version = "";
block.size = bl.length();
uint64_t freeSpace = filter->get_cache_driver()->get_free_space(dpp);
while(freeSpace < block.size) {
- freeSpace += filter->get_policy_driver()->cachePolicy->eviction(dpp, filter->get_cache_driver());
+ freeSpace += filter->get_policy_driver()->get_cache_policy()->eviction(dpp, filter->get_cache_driver(), null_yield);
}
if (filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), source->get_attrs()) == 0) {
- filter->get_policy_driver()->cachePolicy->update(dpp, oid, ofs, bl.length(), filter->get_cache_driver());
+ filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), "", filter->get_cache_driver(), null_yield);
/* Store block in directory */
- if (!blockDir->exist_key(oid)) {
+ if (!blockDir->exist_key(oid, null_yield)) {
+ #if 0
int ret = blockDir->set_value(&block);
if (ret < 0) {
ldpp_dout(dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl;
} else {
ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl;
}
+ #endif
}
}
} else { //copy data from incoming bl to bl_rem till it is rgw_get_obj_max_req_size, and then write it to cache
if (bl_rem.length() == rgw_get_obj_max_req_size) {
std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_rem.length());
ofs += bl_rem.length();
- block.blockId = ofs;
+ block.blockID = ofs;
block.size = bl_rem.length();
uint64_t freeSpace = filter->get_cache_driver()->get_free_space(dpp);
while(freeSpace < block.size) {
- freeSpace += filter->get_policy_driver()->cachePolicy->eviction(dpp, filter->get_cache_driver());
+ freeSpace += filter->get_policy_driver()->get_cache_policy()->eviction(dpp, filter->get_cache_driver(), null_yield);
}
if (filter->get_cache_driver()->put_async(dpp, oid, bl_rem, bl_rem.length(), source->get_attrs()) == 0) {
- filter->get_policy_driver()->cachePolicy->update(dpp, oid, ofs, bl_rem.length(), filter->get_cache_driver());
+ filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), "", filter->get_cache_driver(), null_yield);
/* Store block in directory */
- if (!blockDir->exist_key(oid)) {
+ if (!blockDir->exist_key(oid, null_yield)) {
+ #if 0
int ret = blockDir->set_value(&block);
if (ret < 0) {
ldpp_dout(dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl;
} else {
ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl;
}
+ #endif
}
}
int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp,
optional_yield y, uint32_t flags)
{
- int delDirReturn = source->driver->get_block_dir()->del_value(source->driver->get_cache_block());
+ rgw::d4n::CacheBlock block = rgw::d4n::CacheBlock{
+ .cacheObj = {
+ .objName = source->get_key().get_oid(),
+ .bucketName = source->get_bucket()->get_name()
+ },
+ .blockID = 0 // TODO: get correct blockID
+ };
+ int delDirReturn = source->driver->get_block_dir()->del(&block, y);
if (delDirReturn < 0) {
ldpp_dout(dpp, 20) << "D4N Filter: Block directory delete operation failed." << dendl;
currentFields.push_back(attrs->first);
}
- int delObjReturn = source->driver->get_cache_driver()->delete_data(dpp, source->get_key().get_oid(), y);
+ int delObjReturn = source->driver->get_cache_driver()->del(dpp, source->get_key().get_oid(), y);
if (delObjReturn < 0) {
ldpp_dout(dpp, 20) << "D4N Filter: Cache delete object operation failed." << dendl;
} else {
- Attrs delattrs = source->get_attrs();
- delObjReturn = source->driver->get_cache_driver()->delete_attrs(dpp, source->get_key().get_oid(), delattrs, y);
ldpp_dout(dpp, 20) << "D4N Filter: Cache delete operation succeeded." << dendl;
}
int D4NFilterWriter::process(bufferlist&& data, uint64_t offset)
{
/*
- int append_dataReturn = driver->get_cache_driver()->append_data(save_dpp, obj->get_key().get_oid(), data);
+ int append_dataReturn = driver->get_cache_driver()->append_data(save_dpp, obj->get_key().get_oid(),
+ data, y);
if (append_dataReturn < 0) {
ldpp_dout(save_dpp, 20) << "D4N Filter: Cache append data operation failed." << dendl;
} else {
ldpp_dout(save_dpp, 20) << "D4N Filter: Cache append data operation succeeded." << dendl;
- }
-*/
+ }*/
+
return next->process(std::move(data), offset);
}
const req_context& rctx,
uint32_t flags)
{
- rgw::d4n::BlockDirectory* tempBlockDir = driver->get_block_dir();
-
- driver->get_cache_block()->hostsList.push_back(tempBlockDir->get_addr().host + ":" + std::to_string(tempBlockDir->get_addr().port));
- driver->get_cache_block()->size = accounted_size;
- driver->get_cache_block()->cacheObj.bucketName = obj->get_bucket()->get_name();
- driver->get_cache_block()->cacheObj.objName = obj->get_key().get_oid();
-
- int setDirReturn = tempBlockDir->set_value(driver->get_cache_block());
+ rgw::d4n::CacheBlock block = rgw::d4n::CacheBlock{
+ .cacheObj = {
+ .objName = obj->get_key().get_oid(),
+ .bucketName = obj->get_bucket()->get_name(),
+ .creationTime = 0, // TODO: get correct value
+ .dirty = false,
+ .hostsList = { "127.0.0.1:6379" /*current cache addr*/ } // TODO: fix
+ },
+ .blockID = 0, // TODO: get correct version/blockID
+ .version = "",
+ .size = accounted_size,
+ .hostsList = { driver->get_block_dir()->cct->_conf->rgw_d4n_host + ":" +
+ std::to_string(driver->get_block_dir()->cct->_conf->rgw_d4n_port) }
+ };
+
+ int setDirReturn = driver->get_block_dir()->set(&block, y);
if (setDirReturn < 0) {
ldpp_dout(save_dpp, 20) << "D4N Filter: Block directory set operation failed." << dendl;
baseAttrs.insert(attrs.begin(), attrs.end());
-/* int set_attrsReturn = driver->get_cache_driver()->set_attrs(save_dpp, obj->get_key().get_oid(), baseAttrs);
-
- if (set_attrsReturn < 0) {
- ldpp_dout(save_dpp, 20) << "D4N Filter: Cache set attributes operation failed." << dendl;
+ // is the accounted_size equivalent to the length? -Sam
+
+ //bufferlist bl_empty;
+ //int putReturn = driver->get_cache_driver()->
+ // put(save_dpp, obj->get_key().get_oid(), bl_empty, accounted_size, baseAttrs, y); /* Data already written during process call */
+ /*
+ if (putReturn < 0) {
+ ldpp_dout(save_dpp, 20) << "D4N Filter: Cache put operation failed." << dendl;
} else {
- ldpp_dout(save_dpp, 20) << "D4N Filter: Cache set attributes operation succeeded." << dendl;
+ ldpp_dout(save_dpp, 20) << "D4N Filter: Cache put operation succeeded." << dendl;
}
*/
return ret;
extern "C" {
-rgw::sal::Driver* newD4NFilter(rgw::sal::Driver* next)
+rgw::sal::Driver* newD4NFilter(rgw::sal::Driver* next, void* io_context)
{
- rgw::sal::D4NFilterDriver* driver = new rgw::sal::D4NFilterDriver(next);
+ rgw::sal::D4NFilterDriver* driver = new rgw::sal::D4NFilterDriver(next, *static_cast<boost::asio::io_context*>(io_context));
return driver;
}
#include "driver/d4n/d4n_policy.h"
#include <boost/intrusive/list.hpp>
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/detached.hpp>
+#include <boost/redis/connection.hpp>
+
+#define dout_subsys ceph_subsys_rgw
+#define dout_context g_ceph_context
namespace rgw { namespace sal {
rgw::d4n::PolicyDriver* lruPolicyDriver;
public:
- D4NFilterDriver(Driver* _next);
-
+ D4NFilterDriver(Driver* _next, boost::asio::io_context& io_context);
virtual ~D4NFilterDriver();
virtual int initialize(CephContext *cct, const DoutPrefixProvider *dpp) override;
oid(_oid),
source(_source) {}
+ optional_yield* save_y;
+
int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override;
void set_client_cb(RGWGetDataCB* client_cb, const DoutPrefixProvider* dpp) { this->client_cb = client_cb; this->dpp = dpp;}
void set_ofs(uint64_t ofs) { this->ofs = ofs; }
- int flush_last_part();
+ int flush_last_part(optional_yield y);
void bypass_cache_write() { this->write_to_cache = false; }
};
D4NFilterDriver* driver;
const DoutPrefixProvider* save_dpp;
bool atomic;
+ optional_yield y;
public:
D4NFilterWriter(std::unique_ptr<Writer> _next, D4NFilterDriver* _driver, Object* _obj,
- const DoutPrefixProvider* _dpp) : FilterWriter(std::move(_next), _obj),
- driver(_driver),
- save_dpp(_dpp), atomic(false) {}
+ const DoutPrefixProvider* _dpp, optional_yield _y) : FilterWriter(std::move(_next), _obj),
+ driver(_driver),
+ save_dpp(_dpp), atomic(false), y(_y) {}
D4NFilterWriter(std::unique_ptr<Writer> _next, D4NFilterDriver* _driver, Object* _obj,
- const DoutPrefixProvider* _dpp, bool _atomic) : FilterWriter(std::move(_next), _obj),
- driver(_driver),
- save_dpp(_dpp), atomic(_atomic) {}
+ const DoutPrefixProvider* _dpp, bool _atomic, optional_yield _y) : FilterWriter(std::move(_next), _obj),
+ driver(_driver),
+ save_dpp(_dpp), atomic(_atomic), y(_y) {}
virtual ~D4NFilterWriter() = default;
virtual int prepare(optional_yield y);
virtual int initialize(CephContext* cct, const DoutPrefixProvider* dpp) = 0;
virtual int put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs, optional_yield y) = 0;
virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs, optional_yield y) = 0;
+ virtual int del(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0;
virtual rgw::AioResultList get_async (const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) = 0;
virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) = 0;
virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data, optional_yield y) = 0;
#include <boost/algorithm/string.hpp>
-#include "rgw_redis_driver.h"
-//#include "rgw_ssd_driver.h" // fix -Sam
-//#include <aedis/src.hpp>
+#include <boost/redis/src.hpp>
+#include <boost/asio/detached.hpp>
+#include <boost/asio/consign.hpp>
-#define dout_subsys ceph_subsys_rgw
-#define dout_context g_ceph_context
+#include "rgw_redis_driver.h"
+#include "common/async/blocked_completion.h"
namespace rgw { namespace cache {
std::unordered_map<std::string, Partition> RedisDriver::partitions;
-std::vector< std::pair<std::string, std::string> > build_attrs(rgw::sal::Attrs* binary)
+std::list<std::string> build_attrs(rgw::sal::Attrs* binary)
{
- std::vector< std::pair<std::string, std::string> > values;
+ std::list<std::string> values;
rgw::sal::Attrs::iterator attrs;
-  /* Convert to vector */
+  /* Flatten attrs into a key,value,... list for Redis range commands */
if (binary != NULL) {
for (attrs = binary->begin(); attrs != binary->end(); ++attrs) {
- values.push_back(std::make_pair(attrs->first, attrs->second.to_str()));
+ values.push_back(attrs->first);
+ values.push_back(attrs->second.to_str());
}
}
return values;
}
-int RedisDriver::find_client(const DoutPrefixProvider* dpp)
+// initiate a call to async_exec() on the connection's executor
+struct initiate_exec {
+ std::shared_ptr<boost::redis::connection> conn;
+ boost::redis::request req;
+
+ using executor_type = boost::redis::connection::executor_type;
+ executor_type get_executor() const noexcept { return conn->get_executor(); }
+
+ template <typename Handler, typename Response>
+ void operator()(Handler handler, Response& resp)
+ {
+ conn->async_exec(req, resp, boost::asio::consign(std::move(handler), conn));
+ }
+};
+
+template <typename Response, typename CompletionToken>
+auto async_exec(std::shared_ptr<connection> conn,
+ const boost::redis::request& req,
+ Response& resp, CompletionToken&& token)
{
- if (client.is_connected())
- return 0;
+ return boost::asio::async_initiate<CompletionToken,
+ void(boost::system::error_code, std::size_t)>(
+ initiate_exec{std::move(conn), req}, token, resp);
+}
- if (addr.host == "" || addr.port == 0) {
- ldpp_dout(dpp, 10) << "RGW Redis Cache: Redis cache endpoint was not configured correctly" << dendl;
- return EDESTADDRREQ;
+template <typename T>
+void redis_exec(std::shared_ptr<connection> conn, boost::system::error_code& ec, boost::redis::request& req, boost::redis::response<T>& resp, optional_yield y)
+{
+ if (y) {
+ auto yield = y.get_yield_context();
+ async_exec(std::move(conn), req, resp, yield[ec]);
+ } else {
+ async_exec(std::move(conn), req, resp, ceph::async::use_blocked[ec]);
}
-
- client.connect(addr.host, addr.port, nullptr);
-
- if (!client.is_connected())
- return ECONNREFUSED;
-
- return 0;
}
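+
+// Usage sketch (illustrative only, not part of this change): a synchronous
+// PING round-trip through redis_exec(). With null_yield the call completes
+// via ceph::async::use_blocked; with a real optional_yield it suspends the
+// calling coroutine instead. Assumes `conn` is already running.
+//
+//   boost::system::error_code ec;
+//   boost::redis::request req;
+//   boost::redis::response<std::string> resp;
+//   req.push("PING");
+//   redis_exec(conn, ec, req, resp, null_yield);
+//   if (!ec)
+//     assert(std::get<0>(resp).value() == "PONG");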
int RedisDriver::add_partition_info(Partition& info)
{
this->cct = cct;
- addr.host = cct->_conf->rgw_d4n_host; // change later -Sam
- addr.port = cct->_conf->rgw_d4n_port;
-
if (partition_info.location.back() != '/') {
partition_info.location += "/";
}
- if (addr.host == "" || addr.port == 0) {
+ config cfg;
+ cfg.addr.host = cct->_conf->rgw_d4n_host; // TODO: Replace with cache address
+ cfg.addr.port = std::to_string(cct->_conf->rgw_d4n_port);
+
+  if (cfg.addr.host.empty() || cfg.addr.port == "0") { /* std::to_string() never yields an empty string */
ldpp_dout(dpp, 10) << "RGW Redis Cache: Redis cache endpoint was not configured correctly" << dendl;
return -EDESTADDRREQ;
}
- client.connect("127.0.0.1", 6379, nullptr);
-
- if (!client.is_connected()) {
- ldpp_dout(dpp, 10) << "RGW Redis Cache: Could not connect to redis cache endpoint." << dendl;
- return ECONNREFUSED;
- }
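+  /* Start the connection's run loop detached; consign() keeps the shared
+     connection alive for as long as async_run is outstanding */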
+ conn->async_run(cfg, {}, net::consign(net::detached, conn));
return 0;
}
{
std::string entry = partition_info.location + key;
- if (!client.is_connected())
- find_client(dpp);
-
/* Every set will be treated as new */ // or maybe, if key exists, simply return? -Sam
try {
- std::string result;
+ boost::system::error_code ec;
+ response<std::string> resp;
auto redisAttrs = build_attrs(&attrs);
- redisAttrs.push_back({"data", bl.to_str()});
- client.hmset(entry, redisAttrs, [&result](cpp_redis::reply &reply) {
- if (!reply.is_null()) {
- result = reply.as_string();
- }
- });
+ if (bl.length()) {
+ redisAttrs.push_back("data");
+ redisAttrs.push_back(bl.to_str());
+ }
- client.sync_commit(std::chrono::milliseconds(1000));
+ request req;
+ req.push_range("HMSET", entry, redisAttrs);
+
+ redis_exec(conn, ec, req, resp, y);
- if (result != "OK") {
+ if (std::get<0>(resp).value() != "OK" || ec) {
return -1;
}
} catch(std::exception &e) {
{
std::string entry = partition_info.location + key;
- if (!client.is_connected())
- find_client(dpp);
-
- /* Retrieve existing values from cache */
+ /* Retrieve existing values from cache */
try {
- client.hgetall(entry, [&bl, &attrs](cpp_redis::reply &reply) {
- if (reply.is_array()) {
- auto arr = reply.as_array();
-
- if (!arr[0].is_null()) {
- for (long unsigned int i = 0; i < arr.size() - 1; i += 2) {
- if (arr[i].as_string() == "data") {
- bl.append(arr[i + 1].as_string());
- } else {
- buffer::list bl_value;
- bl_value.append(arr[i + 1].as_string());
- attrs.insert({arr[i].as_string(), bl_value});
- bl_value.clear();
- }
- }
- }
- }
- });
+ boost::system::error_code ec;
+ response< std::map<std::string, std::string> > resp;
+ request req;
+ req.push("HGETALL", entry);
- client.sync_commit(std::chrono::milliseconds(1000));
- } catch(std::exception &e) {
+ redis_exec(conn, ec, req, resp, y);
+
+ if (ec)
return -1;
- }
+ for (auto const& it : std::get<0>(resp).value()) {
+ if (it.first == "data") {
+ bl.append(it.second);
+ } else {
+ buffer::list bl_value;
+ bl_value.append(it.second);
+ attrs.insert({it.first, bl_value});
+ bl_value.clear();
+ }
+ }
+ } catch(std::exception &e) {
+ return -1;
+ }
return 0;
}
+int RedisDriver::del(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y)
+{
+ std::string entry = partition_info.location + key;
+
+ try {
+ boost::system::error_code ec;
+ response<int> resp;
+ request req;
+ req.push("DEL", entry);
+
+ redis_exec(conn, ec, req, resp, y);
+
+ if (ec)
+ return -1;
+
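+    /* Redis DEL returns the number of keys removed, so a hit (1) maps to 0
+       and a miss (0) maps to -1 */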
+ return std::get<0>(resp).value() - 1;
+ } catch(std::exception &e) {
+ return -1;
+ }
+}
+
int RedisDriver::append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data, optional_yield y)
{
std::string value;
std::string entry = partition_info.location + key;
- if (!client.is_connected())
- find_client(dpp);
-
try {
- client.hget(entry, "data", [&value](cpp_redis::reply &reply) {
- if (!reply.is_null()) {
- value = reply.as_string();
- }
- });
+ boost::system::error_code ec;
+ response<std::string> resp;
+ request req;
+ req.push("HGET", entry, "data");
- client.sync_commit(std::chrono::milliseconds(1000));
+ redis_exec(conn, ec, req, resp, y);
+
+ if (ec)
+ return -1;
+
+ value = std::get<0>(resp).value();
} catch(std::exception &e) {
return -1;
}
try { // do we want key check here? -Sam
/* Append to existing value or set as new value */
+ boost::system::error_code ec;
+ response<std::string> resp;
std::string newVal = value + bl_data.to_str();
- std::vector< std::pair<std::string, std::string> > field;
- field.push_back({"data", newVal});
- std::string result;
- client.hmset(entry, field, [&result](cpp_redis::reply &reply) {
- if (!reply.is_null()) {
- result = reply.as_string();
- }
- });
+ request req;
+ req.push("HMSET", entry, "data", newVal);
- client.sync_commit(std::chrono::milliseconds(1000));
+ redis_exec(conn, ec, req, resp, y);
- if (result != "OK") {
+ if (std::get<0>(resp).value() != "OK" || ec) {
return -1;
}
} catch(std::exception &e) {
int RedisDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string& key, optional_yield y)
{
std::string entry = partition_info.location + key;
-
- if (!client.is_connected())
- find_client(dpp);
-
- int exists = -2;
+ response<int> value;
+ response<int> resp;
try {
- client.hexists(entry, "data", [&exists](cpp_redis::reply &reply) {
- if (!reply.is_null()) {
- exists = reply.as_integer();
- }
- });
+ boost::system::error_code ec;
+ request req;
+ req.push("HEXISTS", entry, "data");
- client.sync_commit(std::chrono::milliseconds(1000));
- } catch(std::exception &e) {
- return -1;
- }
+ redis_exec(conn, ec, req, resp, y);
- if (exists) {
- try {
- int result;
- std::vector<std::string> deleteField;
- deleteField.push_back("data");
+ if (ec)
+ return -1;
+ } catch(std::exception &e) {
+ return -1;
+ }
- client.hdel(entry, deleteField, [&result](cpp_redis::reply &reply) {
- if (reply.is_integer()) {
- result = reply.as_integer();
- }
- });
+ if (std::get<0>(resp).value()) {
+ try {
+ boost::system::error_code ec;
+ request req;
+ req.push("HDEL", entry, "data");
- client.sync_commit(std::chrono::milliseconds(1000));
+ redis_exec(conn, ec, req, value, y);
- if (!result) {
- return -1;
- }
- } catch(std::exception &e) {
+      if (ec || !std::get<0>(value).value()) {
return -1;
}
- } else {
- return 0; /* No delete was necessary */
+ } catch(std::exception &e) {
+ return -1;
}
+ }
+
return 0;
}
{
std::string entry = partition_info.location + key;
- if (!client.is_connected())
- find_client(dpp);
-
try {
- client.hgetall(entry, [&attrs](cpp_redis::reply &reply) {
- if (reply.is_array()) {
- auto arr = reply.as_array();
-
- if (!arr[0].is_null()) {
- for (long unsigned int i = 0; i < arr.size() - 1; i += 2) {
- if (arr[i].as_string() != "data") {
- buffer::list bl_value;
- bl_value.append(arr[i + 1].as_string());
- attrs.insert({arr[i].as_string(), bl_value});
- bl_value.clear();
- }
- }
- }
- }
- });
+ boost::system::error_code ec;
+ response< std::map<std::string, std::string> > resp;
+ request req;
+ req.push("HGETALL", entry);
- client.sync_commit(std::chrono::milliseconds(1000));
- } catch(std::exception &e) {
+ redis_exec(conn, ec, req, resp, y);
+
+ if (ec)
return -1;
+
+ for (auto const& it : std::get<0>(resp).value()) {
+ if (it.first != "data") {
+ buffer::list bl_value;
+ bl_value.append(it.second);
+ attrs.insert({it.first, bl_value});
+ bl_value.clear();
+ }
}
+ } catch(std::exception &e) {
+ return -1;
+ }
return 0;
}
std::string entry = partition_info.location + key;
- if (!client.is_connected())
- find_client(dpp);
-
/* Every attr set will be treated as new */
try {
+ boost::system::error_code ec;
+ response<std::string> resp;
-    std::string result;
- auto redisAttrs = build_attrs(&attrs);
+ std::list<std::string> redisAttrs = build_attrs(&attrs);
- client.hmset(entry, redisAttrs, [&result](cpp_redis::reply &reply) {
- if (!reply.is_null()) {
- result = reply.as_string();
- }
- });
+ request req;
+ req.push_range("HMSET", entry, redisAttrs);
- client.sync_commit(std::chrono::milliseconds(1000));
+ redis_exec(conn, ec, req, resp, y);
- if (result != "OK") {
-return -1;
+  if (ec || std::get<0>(resp).value() != "OK") {
+ return -1;
}
} catch(std::exception &e) {
return -1;
{
std::string entry = partition_info.location + key;
- if (!client.is_connected())
- find_client(dpp);
-
try {
- std::string result;
+ boost::system::error_code ec;
+ response<std::string> resp;
auto redisAttrs = build_attrs(&attrs);
- client.hmset(entry, redisAttrs, [&result](cpp_redis::reply &reply) {
- if (!reply.is_null()) {
- result = reply.as_string();
- }
- });
+ request req;
+ req.push_range("HMSET", entry, redisAttrs);
- client.sync_commit(std::chrono::milliseconds(1000));
+ redis_exec(conn, ec, req, resp, y);
- if (result != "OK") {
+  if (ec || std::get<0>(resp).value() != "OK") {
return -1;
}
} catch(std::exception &e) {
{
std::string entry = partition_info.location + key;
- if (!client.is_connected())
- find_client(dpp);
-
- std::vector<std::string> getFields;
-
try {
- client.hgetall(entry, [&getFields](cpp_redis::reply &reply) {
-if (reply.is_array()) {
- auto arr = reply.as_array();
-
- if (!arr[0].is_null()) {
- for (long unsigned int i = 0; i < arr.size() - 1; i += 2) {
- getFields.push_back(arr[i].as_string());
- }
- }
-}
- });
-
- client.sync_commit(std::chrono::milliseconds(1000));
- } catch(std::exception &e) {
- return -1;
- }
+ boost::system::error_code ec;
+ response<int> resp;
+    std::list<std::string> fields;
+
+    /* HDEL expects field names only, so collect just the attribute keys
+       rather than the flattened key,value list from build_attrs() */
+    for (const auto& attr : del_attrs)
+      fields.push_back(attr.first);
- auto redisAttrs = build_attrs(&del_attrs);
- std::vector<std::string> redisFields;
-
- std::transform(begin(redisAttrs), end(redisAttrs), std::back_inserter(redisFields),
- [](auto const& pair) { return pair.first; });
-
- /* Only delete attributes that have been stored */
- for (const auto& it : redisFields) {
- if (std::find(getFields.begin(), getFields.end(), it) == getFields.end()) {
- redisFields.erase(std::find(redisFields.begin(), redisFields.end(), it));
- }
- }
-
- try {
- int result = 0;
+ request req;
+ req.push_range("HDEL", entry, redisAttrs);
- client.hdel(entry, redisFields, [&result](cpp_redis::reply &reply) {
- if (reply.is_integer()) {
- result = reply.as_integer();
- }
- });
+ redis_exec(conn, ec, req, resp, y);
- client.sync_commit(std::chrono::milliseconds(1000));
+ if (ec)
+ return -1;
- return result - 1;
+ return std::get<0>(resp).value();
} catch(std::exception &e) {
return -1;
}
-
- ldpp_dout(dpp, 20) << "RGW Redis Cache: Object is not in cache." << dendl;
- return -2;
}
std::string RedisDriver::get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, optional_yield y)
{
std::string entry = partition_info.location + key;
- std::string attrValue;
-
- if (!client.is_connected())
- find_client(dpp);
-
- int exists = -2;
- std::string getValue;
+ response<std::string> value;
+ response<int> resp;
/* Ensure field was set */
try {
- client.hexists(entry, attr_name, [&exists](cpp_redis::reply& reply) {
- if (!reply.is_null()) {
- exists = reply.as_integer();
- }
- });
+ boost::system::error_code ec;
+ request req;
+ req.push("HEXISTS", entry, attr_name);
- client.sync_commit(std::chrono::milliseconds(1000));
+ redis_exec(conn, ec, req, resp, y);
+
+ if (ec)
+ return {};
} catch(std::exception &e) {
return {};
}
- if (!exists) {
+ if (!std::get<0>(resp).value()) {
ldpp_dout(dpp, 20) << "RGW Redis Cache: Attribute was not set." << dendl;
return {};
}
/* Retrieve existing value from cache */
try {
- client.hget(entry, attr_name, [&exists, &attrValue](cpp_redis::reply &reply) {
- if (!reply.is_null()) {
- attrValue = reply.as_string();
- }
- });
+ boost::system::error_code ec;
+ request req;
+ req.push("HGET", entry, attr_name);
- client.sync_commit(std::chrono::milliseconds(1000));
+ redis_exec(conn, ec, req, value, y);
+
+ if (ec)
+ return {};
} catch(std::exception &e) {
return {};
}
- return attrValue;
+ return std::get<0>(value).value();
}
-int RedisDriver::set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attrVal, optional_yield y)
+int RedisDriver::set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attr_val, optional_yield y)
{
std::string entry = partition_info.location + key;
- int result = 0;
-
- if (!client.is_connected())
- find_client(dpp);
+ response<int> resp;
/* Every attr set will be treated as new */
try {
- client.hset(entry, attr_name, attrVal, [&result](cpp_redis::reply& reply) {
- if (!reply.is_null()) {
- result = reply.as_integer();
- }
- });
+ boost::system::error_code ec;
+ request req;
+ req.push("HSET", entry, attr_name, attr_val);
- client.sync_commit(std::chrono::milliseconds(1000));
+ redis_exec(conn, ec, req, resp, y);
+
+ if (ec)
+      return -1;
} catch(std::exception &e) {
return -1;
}
- return result - 1;
-}
-#if 0
-std::unique_ptr<CacheAioRequest> RedisDriver::get_cache_aio_request_ptr(const DoutPrefixProvider* dpp)
-{
- return std::make_unique<RedisCacheAioRequest>(this);
-}
-#endif
-rgw::AioResultList RedisDriver::get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id)
-{
- rgw_raw_obj r_obj;
- r_obj.oid = key;
-
- return aio->get(r_obj, rgw::Aio::cache_read_op(dpp, y, this, ofs, len, key), cost, id);
+ return std::get<0>(resp).value();
}
-int RedisDriver::AsyncReadOp::init(const DoutPrefixProvider *dpp, CephContext* cct, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg)
+static Aio::OpFunc redis_read_op(optional_yield y, std::shared_ptr<connection> conn,
+ off_t read_ofs, off_t read_len, const std::string& key)
{
- ldpp_dout(dpp, 20) << "RedisCache: " << __func__ << "(): file_path=" << file_path << dendl;
- aio_cb.reset(new struct aiocb);
- memset(aio_cb.get(), 0, sizeof(struct aiocb));
- aio_cb->aio_fildes = TEMP_FAILURE_RETRY(::open(file_path.c_str(), O_RDONLY|O_CLOEXEC|O_BINARY));
-
- if (aio_cb->aio_fildes < 0) {
- int err = errno;
- ldpp_dout(dpp, 1) << "ERROR: RedisCache: " << __func__ << "(): can't open " << file_path << " : " << " error: " << err << dendl;
- return -err;
- }
-
- if (cct->_conf->rgw_d3n_l1_fadvise != POSIX_FADV_NORMAL) {
- posix_fadvise(aio_cb->aio_fildes, 0, 0, g_conf()->rgw_d3n_l1_fadvise);
- }
+ return [y, conn, key] (Aio* aio, AioResult& r) mutable {
+ using namespace boost::asio;
+ spawn::yield_context yield = y.get_yield_context();
+ async_completion<spawn::yield_context, void()> init(yield);
+ auto ex = get_associated_executor(init.completion_handler);
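+    /* bind the redis completion to the executor associated with this yield
+       context so the AioResult is delivered back on the caller's context */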
- bufferptr bp(read_len);
- aio_cb->aio_buf = bp.c_str();
- result.append(std::move(bp));
+ boost::redis::request req;
+ req.push("HGET", key, "data");
- aio_cb->aio_nbytes = read_len;
- aio_cb->aio_offset = read_ofs;
- aio_cb->aio_sigevent.sigev_notify = SIGEV_THREAD;
- aio_cb->aio_sigevent.sigev_notify_function = libaio_cb_aio_dispatch;
- aio_cb->aio_sigevent.sigev_notify_attributes = nullptr;
- aio_cb->aio_sigevent.sigev_value.sival_ptr = arg;
+ // TODO: Make unique pointer once support is added
+ auto s = std::make_shared<RedisDriver::redis_response>();
+ auto& resp = s->resp;
- return 0;
+ conn->async_exec(req, resp, bind_executor(ex, RedisDriver::redis_aio_handler{aio, r, s}));
+ };
}
-void RedisDriver::AsyncReadOp::libaio_cb_aio_dispatch(sigval sigval)
+rgw::AioResultList RedisDriver::get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id)
{
- auto p = std::unique_ptr<Completion>{static_cast<Completion*>(sigval.sival_ptr)};
- auto op = std::move(p->user_data);
- const int ret = -aio_error(op.aio_cb.get());
- boost::system::error_code ec;
- if (ret < 0) {
- ec.assign(-ret, boost::system::system_category());
- }
+ std::string entry = partition_info.location + key;
+ rgw_raw_obj r_obj;
+ r_obj.oid = key;
- ceph::async::dispatch(std::move(p), ec, std::move(op.result));
+ return aio->get(r_obj, redis_read_op(y, conn, ofs, len, entry), cost, id);
}
-template <typename Executor1, typename CompletionHandler>
-auto RedisDriver::AsyncReadOp::create(const Executor1& ex1, CompletionHandler&& handler)
-{
- auto p = Completion::create(ex1, std::move(handler));
- return p;
-}
-
-template <typename ExecutionContext, typename CompletionToken>
-auto RedisDriver::get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key,
- off_t read_ofs, off_t read_len, CompletionToken&& token)
-{
- std::string location = partition_info.location + key;
- ldpp_dout(dpp, 20) << "RedisCache: " << __func__ << "(): location=" << location << dendl;
-
- using Op = AsyncReadOp;
- using Signature = typename Op::Signature;
- boost::asio::async_completion<CompletionToken, Signature> init(token);
- auto p = Op::create(ctx.get_executor(), init.completion_handler);
- auto& op = p->user_data;
-
- int ret = op.init(dpp, cct, location, read_ofs, read_len, p.get());
- if (0 == ret) {
- ret = ::aio_read(op.aio_cb.get());
- }
-// ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): ::aio_read(), ret=" << ret << dendl;
- /* if(ret < 0) {
- auto ec = boost::system::error_code{-ret, boost::system::system_category()};
- ceph::async::post(std::move(p), ec, bufferlist{});
- } else {
- (void)p.release();
- }*/
- //return init.result.get();
-}
-#if 0
-void RedisCacheAioRequest::cache_aio_read(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, off_t ofs, uint64_t len, rgw::Aio* aio, rgw::AioResult& r)
-{
- using namespace boost::asio;
- async_completion<yield_context, void()> init(y.get_yield_context());
- auto ex = get_associated_executor(init.completion_handler);
-
- ldpp_dout(dpp, 20) << "RedisCache: " << __func__ << "(): key=" << key << dendl;
- cache_driver->get_async(dpp, y.get_io_context(), key, ofs, len, bind_executor(ex, RedisDriver::libaio_handler{aio, r}));
-}
-
-void RedisCacheAioRequest::cache_aio_write(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, bufferlist& bl, uint64_t len, rgw::Aio* aio, rgw::AioResult& r)
+void RedisDriver::shutdown()
{
+ // call cancel() on the connection's executor
+ boost::asio::dispatch(conn->get_executor(), [c = conn] { c->cancel(); });
}
-#endif
+
int RedisDriver::put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs)
{
return 0;
#pragma once
-//#include <aedis.hpp>
#include <aio.h>
+#include <boost/redis/connection.hpp>
+
#include "common/async/completion.h"
#include "rgw_common.h"
#include "rgw_cache_driver.h"
-#include <cpp_redis/cpp_redis>
-#include "driver/d4n/d4n_directory.h"
+#define dout_subsys ceph_subsys_rgw
+#define dout_context g_ceph_context
namespace rgw { namespace cache {
+namespace net = boost::asio;
+using boost::redis::config;
+using boost::redis::connection;
+using boost::redis::request;
+using boost::redis::response;
+
class RedisDriver : public CacheDriver {
public:
- RedisDriver(Partition& _partition_info) : partition_info(_partition_info),
- free_space(_partition_info.size),
- outstanding_write_size(0)
+ RedisDriver(net::io_context& io_context, Partition& _partition_info) : partition_info(_partition_info),
+ free_space(_partition_info.size),
+ outstanding_write_size(0)
{
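+      /* All connection I/O is serialized through a strand on the caller's
+         io_context */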
+ conn = std::make_shared<connection>(boost::asio::make_strand(io_context));
add_partition_info(_partition_info);
}
virtual ~RedisDriver()
virtual int initialize(CephContext* cct, const DoutPrefixProvider* dpp) override;
virtual int put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs, optional_yield y) override;
virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs, optional_yield y) override;
+ virtual rgw::AioResultList get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override;
+ virtual int del(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data, optional_yield y) override;
virtual int delete_data(const DoutPrefixProvider* dpp, const::std::string& key, optional_yield y) override;
- virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs, optional_yield y) override;
virtual int set_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs, optional_yield y) override;
+ virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs, optional_yield y) override;
virtual int update_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs, optional_yield y) override;
virtual int delete_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& del_attrs, optional_yield y) override;
- virtual std::string get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, optional_yield y) override;
virtual int set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attr_val, optional_yield y) override;
+ virtual std::string get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, optional_yield y) override;
- virtual rgw::AioResultList get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override;
virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) override;
+ struct redis_response {
+ boost::redis::response<std::string> resp;
+ };
+ void shutdown();
- struct libaio_handler { // should this be the same as SSDDriver? -Sam
+ struct redis_aio_handler {
rgw::Aio* throttle = nullptr;
rgw::AioResult& r;
+ std::shared_ptr<redis_response> s;
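+    /* The shared_ptr keeps the response storage alive until this completion
+       handler has consumed it */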
- // read callback
- void operator()(boost::system::error_code ec, bufferlist bl) const {
+ /* Read Callback */
+ void operator()(boost::system::error_code ec, long unsigned int size) const {
r.result = -ec.value();
- r.data = std::move(bl);
+      r.data.append(std::get<0>(s->resp).value()); // append as std::string; c_str() would truncate at embedded NULs
throttle->put(r);
}
};
- template <typename ExecutionContext, typename CompletionToken>
- auto get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key,
- off_t read_ofs, off_t read_len, CompletionToken&& token);
protected:
- cpp_redis::client client;
- rgw::d4n::Address addr;
+ std::shared_ptr<connection> conn;
+
static std::unordered_map<std::string, Partition> partitions;
Partition partition_info;
uint64_t free_space;
uint64_t outstanding_write_size;
CephContext* cct;
- int find_client(const DoutPrefixProvider* dpp);
int add_partition_info(Partition& info);
int remove_partition_info(Partition& info);
-
- private:
- // unique_ptr with custom deleter for struct aiocb
- struct libaio_aiocb_deleter {
- void operator()(struct aiocb* c) {
- if(c->aio_fildes > 0) {
- if( ::close(c->aio_fildes) != 0) {
- }
- }
- delete c;
- }
- };
-
- using unique_aio_cb_ptr = std::unique_ptr<struct aiocb, libaio_aiocb_deleter>;
-
- struct AsyncReadOp {
- bufferlist result;
- unique_aio_cb_ptr aio_cb;
- using Signature = void(boost::system::error_code, bufferlist);
- using Completion = ceph::async::Completion<Signature, AsyncReadOp>;
-
- int init(const DoutPrefixProvider *dpp, CephContext* cct, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg);
- static void libaio_cb_aio_dispatch(sigval sigval);
-
- template <typename Executor1, typename CompletionHandler>
- static auto create(const Executor1& ex1, CompletionHandler&& handler);
- };
};
} } // namespace rgw::cache
#endif
extern rgw::sal::Driver* newBaseFilter(rgw::sal::Driver* next);
#ifdef WITH_RADOSGW_D4N
-extern rgw::sal::Driver* newD4NFilter(rgw::sal::Driver* next);
+extern rgw::sal::Driver* newD4NFilter(rgw::sal::Driver* next, boost::asio::io_context* io_context);
#endif
}
#ifdef WITH_RADOSGW_D4N
else if (cfg.filter_name.compare("d4n") == 0) {
rgw::sal::Driver* next = driver;
- driver = newD4NFilter(next);
+ driver = newD4NFilter(next, &io_context);
if (driver->initialize(cct, dpp) < 0) {
delete driver;
virtual int initialize(CephContext* cct, const DoutPrefixProvider* dpp) override;
virtual int put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs, optional_yield y) override;
virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs, optional_yield y) override;
+ virtual int del(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override { return -1; } // TODO: implement
virtual rgw::AioResultList get_async (const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override;
virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) override;
virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data, optional_yield y) override;