From: Samarah Date: Mon, 25 Sep 2023 14:49:43 +0000 (+0000) Subject: rgw/d4n: this commit squashes the following commits related to X-Git-Tag: testing/wip-batrick-testing-20240411.154038~45^2~54 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=c6ddf8697269973bf78f0b1cf09d94bafe76db67;p=ceph-ci.git rgw/d4n: this commit squashes the following commits related to clean up and removal of cpp_redis. d4n/filter: Add `optional_yield` to CacheDriver calls in D4N Filter and make minor updates to several filter methods rgw/d4n: fix compilation issue. rgw/cache: Add `del` method to CacheDriver and SSDDriver cmake/d4n: Remove unnecessary D4N lines rgw: Add `io_context` to D4N Filter and RedisDriver, remove `cpp_redis` library from RedisDriver, and perform minor cleanup d4n: Remove `cpp_redis` library from D4N directory and policy; update calls in filter; move Entry struct to base CachePolicy class build/cpp_redis: Remove `cpp_redis` library rgw/d4n: including wheerever needed. rgw/d4n : fixes to d4n filter, policy, directory and redis driver files for compilation errors. Signed-off-by: Samarah Signed-off-by: Casey Bodley Co-authored-by: Pritha Srivastava --- diff --git a/.gitmodules b/.gitmodules index 341a8246ece..b89bc7e7b28 100644 --- a/.gitmodules +++ b/.gitmodules @@ -59,9 +59,6 @@ [submodule "s3select"] path = src/s3select url = https://github.com/ceph/s3select.git -[submodule "src/cpp_redis"] - path = src/cpp_redis - url = https://github.com/ceph/cpp_redis.git [submodule "src/libkmip"] path = src/libkmip url = https://github.com/ceph/libkmip diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index da7c5ea9019..73fa27ae34e 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -293,10 +293,6 @@ if(WITH_CEPHFS_JAVA) add_subdirectory(java) endif() -if(WITH_RADOSGW_D4N) - add_subdirectory(cpp_redis) # remove later -Sam -endif() - if (WITH_BLKIN) add_subdirectory(blkin/blkin-lib) endif(WITH_BLKIN) diff --git a/src/cpp_redis b/src/cpp_redis deleted file mode 160000 index 72d992fff2a..00000000000 --- a/src/cpp_redis +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 72d992fff2a95edb37430a75909a844637549331 diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 8ad74d1a1ca..afd114f191d 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -202,7 +202,6 @@ set(librgw_common_srcs driver/rados/topic.cc) list(APPEND librgw_common_srcs - driver/d4n/d4n_directory.cc driver/immutable_config/store.cc driver/json_config/store.cc driver/rados/config/impl.cc @@ -229,10 +228,6 @@ endif() if(WITH_RADOSGW_DAOS) list(APPEND librgw_common_srcs driver/motr/rgw_sal_daos.cc) endif() -if(WITH_RADOSGW_D4N) - list(APPEND librgw_common_srcs driver/d4n/d4n_directory.cc) - list(APPEND librgw_common_srcs driver/d4n/rgw_sal_d4n.cc) -endif() if(WITH_RADOSGW_POSIX) #add_subdirectory(driver/posix) find_package(LMDB REQUIRED) @@ -297,11 +292,6 @@ target_include_directories(rgw_common PUBLIC "${LUA_INCLUDE_DIR}") if(WITH_RADOSGW_D4N) - add_dependencies(rgw_common cpp_redis) # remove later -Sam - target_link_libraries(rgw_common PRIVATE cpp_redis) - target_include_directories(rgw_common SYSTEM PUBLIC "${CMAKE_SOURCE_DIR}/src/cpp_redis/includes") - target_include_directories(rgw_common SYSTEM PUBLIC "${CMAKE_SOURCE_DIR}/src/cpp_redis/tacopie/includes") - target_include_directories(rgw_common SYSTEM PUBLIC "${CMAKE_SOURCE_DIR}/src/boost_redis/include") endif() diff --git a/src/rgw/driver/d4n/d4n_directory.cc b/src/rgw/driver/d4n/d4n_directory.cc index 9bd812a9245..edeb69ff3f7 100644 --- a/src/rgw/driver/d4n/d4n_directory.cc +++ b/src/rgw/driver/d4n/d4n_directory.cc @@ -1,89 +1,110 @@ +#include +#include "common/async/blocked_completion.h" #include "d4n_directory.h" -#include - -#define dout_subsys ceph_subsys_rgw -#define dout_context g_ceph_context namespace rgw { namespace d4n { -int ObjectDirectory::find_client(cpp_redis::client* client) { - if (client->is_connected()) - return 0; - - if (addr.host == "" || addr.port == 0) { - dout(10) << "RGW D4N Directory: D4N directory endpoint was not configured correctly" << dendl; - return EDESTADDRREQ; - } +// initiate a call to async_exec() on the connection's executor +struct initiate_exec { + std::shared_ptr conn; + boost::redis::request req; - client->connect(addr.host, addr.port, nullptr); + using executor_type = boost::redis::connection::executor_type; + executor_type get_executor() const noexcept { return conn->get_executor(); } - if (!client->is_connected()) - return ECONNREFUSED; + template + void operator()(Handler handler, Response& resp) + { + conn->async_exec(req, resp, boost::asio::consign(std::move(handler), conn)); + } +}; + +template +auto async_exec(std::shared_ptr conn, + const boost::redis::request& req, + Response& resp, CompletionToken&& token) +{ + return boost::asio::async_initiate( + initiate_exec{std::move(conn), req}, token, resp); +} - return 0; +template +void redis_exec(std::shared_ptr conn, + boost::system::error_code& ec, + const boost::redis::request& req, + boost::redis::response& resp, optional_yield y) +{ + if (y) { + auto yield = y.get_yield_context(); + async_exec(std::move(conn), req, resp, yield[ec]); + } else { + async_exec(std::move(conn), req, resp, ceph::async::use_blocked[ec]); + } } std::string ObjectDirectory::build_index(CacheObj* object) { return object->bucketName + "_" + object->objName; } -int ObjectDirectory::exist_key(std::string key) { +int ObjectDirectory::exist_key(std::string key, optional_yield y) { int result = 0; std::vector keys; keys.push_back(key); - +#if 0 if (!client.is_connected()) { return result; } - +#endif + response resp; try { - client.exists(keys, [&result](cpp_redis::reply &reply) { - if (reply.is_integer()) { - result = reply.as_integer(); - } - }); - - client.sync_commit(std::chrono::milliseconds(1000)); + boost::system::error_code ec; + request req; + req.push("EXISTS", key); + + redis_exec(conn, ec, req, resp, y); + + if ((bool)ec) + return false; } catch(std::exception &e) {} - return result; + return std::get<0>(resp).value(); } -int ObjectDirectory::set_value(CacheObj* object) { - /* Creating the index based on objName */ - std::string result; - std::string key = build_index(object); - if (!client.is_connected()) { - find_client(&client); - } +void ObjectDirectory::shutdown() // generalize -Sam +{ + // call cancel() on the connection's executor + boost::asio::dispatch(conn->get_executor(), [c = conn] { c->cancel(); }); +} - /* Every set will be new */ - if (addr.host == "" || addr.port == 0) { - dout(10) << "RGW D4N Directory: Directory endpoint not configured correctly" << dendl; - return -2; - } +int ObjectDirectory::set(CacheObj* object, optional_yield y) { + std::string key = build_index(object); - std::string endpoint = addr.host + ":" + std::to_string(addr.port); - std::vector< std::pair > list; + /* Every set will be treated as new */ // or maybe, if key exists, simply return? -Sam + std::string endpoint = cct->_conf->rgw_d4n_host + ":" + std::to_string(cct->_conf->rgw_d4n_port); + std::list redisValues; - /* Creating a list of the entry's properties */ - list.push_back(make_pair("key", key)); - list.push_back(make_pair("objName", object->objName)); - list.push_back(make_pair("bucketName", object->bucketName)); - list.push_back(make_pair("creationTime", std::to_string(object->creationTime))); - list.push_back(make_pair("dirty", std::to_string(object->dirty))); - list.push_back(make_pair("hosts", endpoint)); + /* Creating a redisValues of the entry's properties */ + redisValues.push_back("objName"); + redisValues.push_back(object->objName); + redisValues.push_back("bucketName"); + redisValues.push_back(object->bucketName); + redisValues.push_back("creationTime"); + redisValues.push_back(std::to_string(object->creationTime)); + redisValues.push_back("dirty"); + redisValues.push_back(std::to_string(object->dirty)); + redisValues.push_back("objHosts"); + redisValues.push_back(endpoint); // Set in filter -Sam try { - client.hmset(key, list, [&result](cpp_redis::reply &reply) { - if (!reply.is_null()) { - result = reply.as_string(); - } - }); + boost::system::error_code ec; + request req; + req.push_range("HMSET", key, redisValues); + response resp; - client.sync_commit(std::chrono::milliseconds(1000)); + redis_exec(conn, ec, req, resp, y); - if (result != "OK") { + if (std::get<0>(resp).value() != "OK" || (bool)ec) { return -1; } } catch(std::exception &e) { @@ -93,15 +114,14 @@ int ObjectDirectory::set_value(CacheObj* object) { return 0; } -int ObjectDirectory::get_value(CacheObj* object) { - int keyExist = -2; +int ObjectDirectory::get(CacheObj* object, optional_yield y) { std::string key = build_index(object); - +#if 0 if (!client.is_connected()) { find_client(&client); } - - if (exist_key(key)) { +#endif + if (exist_key(key, y)) { std::string key; std::string objName; std::string bucketName; @@ -110,74 +130,88 @@ int ObjectDirectory::get_value(CacheObj* object) { std::string hosts; std::vector fields; - fields.push_back("key"); fields.push_back("objName"); fields.push_back("bucketName"); fields.push_back("creationTime"); fields.push_back("dirty"); - fields.push_back("hosts"); + fields.push_back("objHosts"); try { - client.hmget(key, fields, [&key, &objName, &bucketName, &creationTime, &dirty, &hosts, &keyExist](cpp_redis::reply &reply) { - if (reply.is_array()) { - auto arr = reply.as_array(); - - if (!arr[0].is_null()) { - keyExist = 0; - key = arr[0].as_string(); - objName = arr[1].as_string(); - bucketName = arr[2].as_string(); - creationTime = arr[3].as_string(); - dirty = arr[4].as_string(); - hosts = arr[5].as_string(); - } - } - }); + boost::system::error_code ec; + request req; + req.push_range("HMGET", key, fields); + response< std::vector > resp; - client.sync_commit(std::chrono::milliseconds(1000)); + redis_exec(conn, ec, req, resp, y); - if (keyExist < 0) { - return keyExist; + if (!std::get<0>(resp).value().size() || (bool)ec) { + return -1; } - /* Currently, there can only be one host */ - object->objName = objName; - object->bucketName = bucketName; + object->objName = std::get<0>(resp).value()[0]; + object->bucketName = std::get<0>(resp).value()[1]; + object->creationTime = boost::lexical_cast(std::get<0>(resp).value()[2]); + object->dirty = boost::lexical_cast(std::get<0>(resp).value()[3]); - struct std::tm tm; - std::istringstream(creationTime) >> std::get_time(&tm, "%T"); - strptime(creationTime.c_str(), "%T", &tm); // Need to check formatting -Sam - object->creationTime = mktime(&tm); + { + std::stringstream ss(boost::lexical_cast(std::get<0>(resp).value()[4])); - std::istringstream(dirty) >> object->dirty; + while (!ss.eof()) { + std::string host; + std::getline(ss, host, '_'); + object->hostsList.push_back(host); + } + } } catch(std::exception &e) { - keyExist = -1; + return -1; } + } else { + return -2; } - return keyExist; + return 0; } -int ObjectDirectory::del_value(CacheObj* object) { - int result = 0; - std::vector keys; +int ObjectDirectory::copy(CacheObj* object, std::string copyName, std::string copyBucketName, optional_yield y) { std::string key = build_index(object); + std::vector keys; keys.push_back(key); - + std::string copyKey; +#if 0 if (!client.is_connected()) { find_client(&client); } - - if (exist_key(key)) { +#endif + if (exist_key(key, y)) { try { - client.del(keys, [&result](cpp_redis::reply &reply) { - if (reply.is_integer()) { - result = reply.as_integer(); + response resp; + + { + boost::system::error_code ec; + request req; + req.push("COPY", key, copyKey); + + redis_exec(conn, ec, req, resp, y); + + if ((bool)ec) { + return -1; + } + } + + { + boost::system::error_code ec; + request req; + req.push("HMSET", copyKey, "objName", copyName, "bucketName", copyBucketName); + response res; + + redis_exec(conn, ec, req, res, y); + + if (std::get<0>(res).value() != "OK" || (bool)ec) { + return -1; } - }); - - client.sync_commit(std::chrono::milliseconds(1000)); - return result - 1; + } + + return std::get<0>(resp).value() - 1; } catch(std::exception &e) { return -1; } @@ -186,84 +220,96 @@ int ObjectDirectory::del_value(CacheObj* object) { } } -int BlockDirectory::find_client(cpp_redis::client* client) { - if (client->is_connected()) - return 0; +int ObjectDirectory::del(CacheObj* object, optional_yield y) { + std::string key = build_index(object); - if (addr.host == "" || addr.port == 0) { - dout(10) << "RGW D4N Directory: D4N directory endpoint was not configured correctly" << dendl; - return EDESTADDRREQ; - } + if (exist_key(key, y)) { + try { + boost::system::error_code ec; + request req; + req.push("DEL", key); + response resp; - client->connect(addr.host, addr.port, nullptr); + redis_exec(conn, ec, req, resp, y); - if (!client->is_connected()) - return ECONNREFUSED; + if ((bool)ec) + return -1; - return 0; + return std::get<0>(resp).value() - 1; + } catch(std::exception &e) { + return -1; + } + } else { + return 0; /* No delete was necessary */ + } } std::string BlockDirectory::build_index(CacheBlock* block) { - return block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + boost::lexical_cast(block->blockId); + return block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + std::to_string(block->blockID); } -int BlockDirectory::exist_key(std::string key) { - int result = 0; - std::vector keys; - keys.push_back(key); - - if (!client.is_connected()) { - return result; - } +int BlockDirectory::exist_key(std::string key, optional_yield y) { + response resp; try { - client.exists(keys, [&result](cpp_redis::reply &reply) { - if (reply.is_integer()) { - result = reply.as_integer(); - } - }); - - client.sync_commit(std::chrono::milliseconds(1000)); + boost::system::error_code ec; + request req; + req.push("EXISTS", key); + + redis_exec(conn, ec, req, resp, y); + + if ((bool)ec) + return false; } catch(std::exception &e) {} - return result; + return std::get<0>(resp).value(); } -int BlockDirectory::set_value(CacheBlock* block) { - /* Creating the index based on objName */ - std::string result; - std::string key = build_index(block); - if (!client.is_connected()) { - find_client(&client); - } +void BlockDirectory::shutdown() +{ + // call cancel() on the connection's executor + boost::asio::dispatch(conn->get_executor(), [c = conn] { c->cancel(); }); +} - /* Every set will be new */ - if (addr.host == "" || addr.port == 0) { - dout(10) << "RGW D4N Directory: Directory endpoint not configured correctly" << dendl; - return -2; - } +int BlockDirectory::set(CacheBlock* block, optional_yield y) { + std::string key = build_index(block); - std::string endpoint = addr.host + ":" + std::to_string(addr.port); - std::vector< std::pair > list; + /* Every set will be treated as new */ // or maybe, if key exists, simply return? -Sam + std::string endpoint = cct->_conf->rgw_d4n_host + ":" + std::to_string(cct->_conf->rgw_d4n_port); + std::list redisValues; - /* Creating a list of the entry's properties */ - list.push_back(make_pair("key", key)); - list.push_back(make_pair("size", std::to_string(block->size))); - list.push_back(make_pair("globalWeight", std::to_string(block->globalWeight))); - list.push_back(make_pair("bucketName", block->cacheObj.bucketName)); - list.push_back(make_pair("objName", block->cacheObj.objName)); - list.push_back(make_pair("hosts", endpoint)); + /* Creating a redisValues of the entry's properties */ + redisValues.push_back("blockID"); + redisValues.push_back(std::to_string(block->blockID)); + redisValues.push_back("version"); + redisValues.push_back(block->version); + redisValues.push_back("size"); + redisValues.push_back(std::to_string(block->size)); + redisValues.push_back("globalWeight"); + redisValues.push_back(std::to_string(block->globalWeight)); + redisValues.push_back("blockHosts"); + redisValues.push_back(endpoint); // Set in filter -Sam + + redisValues.push_back("objName"); + redisValues.push_back(block->cacheObj.objName); + redisValues.push_back("bucketName"); + redisValues.push_back(block->cacheObj.bucketName); + redisValues.push_back("creationTime"); + redisValues.push_back(std::to_string(block->cacheObj.creationTime)); + redisValues.push_back("dirty"); + redisValues.push_back(std::to_string(block->cacheObj.dirty)); + redisValues.push_back("objHosts"); + redisValues.push_back(endpoint); // Set in filter -Sam try { - client.hmset(key, list, [&result](cpp_redis::reply &reply) { - if (!reply.is_null()) { - result = reply.as_string(); - } - }); + boost::system::error_code ec; + request req; + req.push_range("HMSET", key, redisValues); + response resp; - client.sync_commit(std::chrono::milliseconds(1000)); + redis_exec(conn, ec, req, resp, y); - if (result != "OK") { + if (std::get<0>(resp).value() != "OK" || (bool)ec) { return -1; } } catch(std::exception &e) { @@ -273,81 +319,112 @@ int BlockDirectory::set_value(CacheBlock* block) { return 0; } -int BlockDirectory::get_value(CacheBlock* block) { - int keyExist = -2; +int BlockDirectory::get(CacheBlock* block, optional_yield y) { std::string key = build_index(block); - if (!client.is_connected()) { - find_client(&client); - } - - if (exist_key(key)) { - std::string hosts; - std::string size; - std::string bucketName; - std::string objName; + if (exist_key(key, y)) { std::vector fields; - fields.push_back("key"); - fields.push_back("hosts"); + fields.push_back("blockID"); + fields.push_back("version"); fields.push_back("size"); - fields.push_back("bucketName"); + fields.push_back("globalWeight"); + fields.push_back("blockHosts"); + fields.push_back("objName"); + fields.push_back("bucketName"); + fields.push_back("creationTime"); + fields.push_back("dirty"); + fields.push_back("objHosts"); try { - client.hmget(key, fields, [&key, &hosts, &size, &bucketName, &objName, &keyExist](cpp_redis::reply &reply) { - if (reply.is_array()) { - auto arr = reply.as_array(); - - if (!arr[0].is_null()) { - keyExist = 0; - key = arr[0].as_string(); - hosts = arr[1].as_string(); - size = arr[2].as_string(); - bucketName = arr[3].as_string(); - objName = arr[4].as_string(); - } - } - }); + boost::system::error_code ec; + request req; + req.push_range("HMGET", key, fields); + response< std::vector > resp; - client.sync_commit(std::chrono::milliseconds(1000)); + redis_exec(conn, ec, req, resp, y); - if (keyExist < 0 ) { - return keyExist; + if (!std::get<0>(resp).value().size() || (bool)ec) { + return -1; } - /* Currently, there can only be one host */ // update -Sam - block->size = std::stoi(size); - block->cacheObj.bucketName = bucketName; - block->cacheObj.objName = objName; + block->blockID = boost::lexical_cast(std::get<0>(resp).value()[0]); + block->version = std::get<0>(resp).value()[1]; + block->size = boost::lexical_cast(std::get<0>(resp).value()[2]); + block->globalWeight = boost::lexical_cast(std::get<0>(resp).value()[3]); + + { + std::stringstream ss(boost::lexical_cast(std::get<0>(resp).value()[4])); + block->hostsList.clear(); + + while (!ss.eof()) { + std::string host; + std::getline(ss, host, '_'); + block->hostsList.push_back(host); + } + } + + block->cacheObj.objName = std::get<0>(resp).value()[5]; + block->cacheObj.bucketName = std::get<0>(resp).value()[6]; + block->cacheObj.creationTime = boost::lexical_cast(std::get<0>(resp).value()[7]); + block->cacheObj.dirty = boost::lexical_cast(std::get<0>(resp).value()[8]); + + { + std::stringstream ss(boost::lexical_cast(std::get<0>(resp).value()[9])); + block->cacheObj.hostsList.clear(); + + while (!ss.eof()) { + std::string host; + std::getline(ss, host, '_'); + block->cacheObj.hostsList.push_back(host); + } + } } catch(std::exception &e) { - keyExist = -1; + return -1; } + } else { + return -2; } - return keyExist; + return 0; } -int BlockDirectory::del_value(CacheBlock* block) { - int result = 0; - std::vector keys; +int BlockDirectory::copy(CacheBlock* block, std::string copyName, std::string copyBucketName, optional_yield y) { std::string key = build_index(block); - keys.push_back(key); - - if (!client.is_connected()) { - find_client(&client); - } - - if (exist_key(key)) { + auto copyBlock = CacheBlock{ .cacheObj = { .objName = copyName, .bucketName = copyBucketName }, .blockID = 0 }; + std::string copyKey = build_index(©Block); + + if (exist_key(key, y)) { try { - client.del(keys, [&result](cpp_redis::reply &reply) { - if (reply.is_integer()) { - result = reply.as_integer(); + response resp; + + { + boost::system::error_code ec; + request req; + req.push("COPY", key, copyKey); + + redis_exec(conn, ec, req, resp, y); + + if ((bool)ec) { + return -1; + } + } + + { + boost::system::error_code ec; + request req; + req.push("HMSET", copyKey, "objName", copyName, "bucketName", copyBucketName); + response res; + + redis_exec(conn, ec, req, res, y); + + if (std::get<0>(res).value() != "OK" || (bool)ec) { + return -1; } - }); - - client.sync_commit(std::chrono::milliseconds(1000)); - return result - 1; + } + + return std::get<0>(resp).value() - 1; } catch(std::exception &e) { return -1; } @@ -356,66 +433,85 @@ int BlockDirectory::del_value(CacheBlock* block) { } } -int BlockDirectory::update_field(CacheBlock* block, std::string field, std::string value) { // represent in cache block too -Sam - std::string result; +int BlockDirectory::del(CacheBlock* block, optional_yield y) { std::string key = build_index(block); - if (!client.is_connected()) { - find_client(&client); - } - - if (exist_key(key)) { - if (field == "hostsList") { - /* Append rather than overwrite */ - std::string hosts; - - try { - client.hget(key, "hostsList", [&hosts](cpp_redis::reply& reply) { - if (!reply.is_null()) { - hosts = reply.as_string(); - } - }); - - client.sync_commit(std::chrono::milliseconds(1000)); - } catch(std::exception &e) { + if (exist_key(key, y)) { + try { + boost::system::error_code ec; + request req; + req.push("DEL", key); + response resp; + + redis_exec(conn, ec, req, resp, y); + + if ((bool)ec) return -1; - } - - value += "_"; - value += hosts; - } - /* Update cache block */ // Remove ones that aren't used -Sam - if (field == "size") - block->size = std::stoi(value); - else if (field == "bucketName") - block->cacheObj.bucketName = value; - else if (field == "objName") - block->cacheObj.objName = value; - else if (field == "hostsList") - block->hostsList.push_back(value); + return std::get<0>(resp).value() - 1; + } catch(std::exception &e) { + return -1; + } + } else { + return 0; /* No delete was necessary */ + } +} - std::vector< std::pair > list; - list.push_back(std::make_pair(field, value)); +int BlockDirectory::update_field(CacheBlock* block, std::string field, std::string value, optional_yield y) { + std::string key = build_index(block); + if (exist_key(key, y)) { try { - client.hmset(key, list, [&result](cpp_redis::reply &reply) { - if (!reply.is_null()) { - result = reply.as_string(); - } - }); + /* Ensure field exists */ + { + boost::system::error_code ec; + request req; + req.push("HEXISTS", key, field); + response resp; - client.sync_commit(std::chrono::milliseconds(1000)); + redis_exec(conn, ec, req, resp, y); - if (result != "OK") { - return -1; + if (!std::get<0>(resp).value() || (bool)ec) + return -1; + } + + if (field == "blockHosts" || field == "objHosts") { + /* Append rather than overwrite */ + boost::system::error_code ec; + request req; + req.push("HGET", key, field); + response resp; + + redis_exec(conn, ec, req, resp, y); + + if (!std::get<0>(resp).value().size() || (bool)ec) + return -1; + + std::get<0>(resp).value() += "_"; + std::get<0>(resp).value() += value; + value = std::get<0>(resp).value(); + } + + { + boost::system::error_code ec; + request req; + req.push_range("HSET", key, std::map{{field, value}}); + response resp; + + redis_exec(conn, ec, req, resp, y); + + if ((bool)ec) { + return -1; + } + + return std::get<0>(resp).value(); /* Zero fields added since it is an update of an existing field */ } } catch(std::exception &e) { return -1; } + } else { + return -2; } - - return 0; } } } // namespace rgw::d4n diff --git a/src/rgw/driver/d4n/d4n_directory.h b/src/rgw/driver/d4n/d4n_directory.h index b4b90f5e571..fc10754b063 100644 --- a/src/rgw/driver/d4n/d4n_directory.h +++ b/src/rgw/driver/d4n/d4n_directory.h @@ -1,17 +1,21 @@ #pragma once #include "rgw_common.h" -#include -#include -#include + #include +#include +#include + +#define dout_subsys ceph_subsys_rgw +#define dout_context g_ceph_context namespace rgw { namespace d4n { -struct Address { - std::string host; - int port; -}; +namespace net = boost::asio; +using boost::redis::config; +using boost::redis::connection; +using boost::redis::request; +using boost::redis::response; struct CacheObj { std::string objName; /* S3 object name */ @@ -23,75 +27,97 @@ struct CacheObj { struct CacheBlock { CacheObj cacheObj; - uint64_t blockId; /* block ID */ + uint64_t blockID; + std::string version; uint64_t size; /* Block size in bytes */ - int globalWeight = 0; + int globalWeight = 0; /* LFUDA policy variable */ std::vector hostsList; /* List of hostnames of block locations */ }; class Directory { public: - Directory() {} CephContext* cct; + + Directory() {} }; class ObjectDirectory: public Directory { // weave into write workflow -Sam public: - ObjectDirectory() {} - ObjectDirectory(std::string host, int port) { - addr.host = host; - addr.port = port; + ObjectDirectory(net::io_context& io_context) { + conn = std::make_shared(boost::asio::make_strand(io_context)); } - - void init(CephContext* _cct) { - cct = _cct; - addr.host = cct->_conf->rgw_d4n_host; - addr.port = cct->_conf->rgw_d4n_port; + ~ObjectDirectory() { + shutdown(); } - int find_client(cpp_redis::client* client); - int exist_key(std::string key); - Address get_addr() { return addr; } + int init(CephContext* cct, const DoutPrefixProvider* dpp) { + this->cct = cct; + + config cfg; + cfg.addr.host = cct->_conf->rgw_d4n_host; // same or different address from block directory? -Sam + cfg.addr.port = std::to_string(cct->_conf->rgw_d4n_port); - int set_value(CacheObj* object); - int get_value(CacheObj* object); - int copy_value(CacheObj* object, CacheObj* copyObject); - int del_value(CacheObj* object); + if (!cfg.addr.host.length() || !cfg.addr.port.length()) { + ldpp_dout(dpp, 10) << "D4N Directory " << __func__ << ": Object directory endpoint was not configured correctly" << dendl; + return -EDESTADDRREQ; + } + + conn->async_run(cfg, {}, net::consign(net::detached, conn)); + + return 0; + } + int exist_key(std::string key, optional_yield y); + void shutdown(); + + int set(CacheObj* object, optional_yield y); + int get(CacheObj* object, optional_yield y); + int copy(CacheObj* object, std::string copyName, std::string copyBucketName, optional_yield y); + int del(CacheObj* object, optional_yield y); private: - cpp_redis::client client; - Address addr; + std::shared_ptr conn; + std::string build_index(CacheObj* object); }; class BlockDirectory: public Directory { public: - BlockDirectory() {} - BlockDirectory(std::string host, int port) { - addr.host = host; - addr.port = port; + BlockDirectory(net::io_context& io_context) { + conn = std::make_shared(boost::asio::make_strand(io_context)); + } + ~BlockDirectory() { + shutdown(); } - void init(CephContext* _cct) { - cct = _cct; - addr.host = cct->_conf->rgw_d4n_host; - addr.port = cct->_conf->rgw_d4n_port; + int init(CephContext* cct, const DoutPrefixProvider* dpp) { + this->cct = cct; + + config cfg; + cfg.addr.host = cct->_conf->rgw_d4n_host; + cfg.addr.port = std::to_string(cct->_conf->rgw_d4n_port); + + if (!cfg.addr.host.length() || !cfg.addr.port.length()) { // add logs to other methods -Sam + ldpp_dout(dpp, 10) << "D4N Directory " << __func__ << ": Block directory endpoint was not configured correctly" << dendl; + return -EDESTADDRREQ; + } + + conn->async_run(cfg, {}, net::consign(net::detached, conn)); + + return 0; } - int find_client(cpp_redis::client* client); - int exist_key(std::string key); - Address get_addr() { return addr; } + int exist_key(std::string key, optional_yield y); + void shutdown(); - int set_value(CacheBlock* block); - int get_value(CacheBlock* block); - int copy_value(CacheBlock* block, CacheBlock* copyBlock); - int del_value(CacheBlock* block); - - int update_field(CacheBlock* block, std::string field, std::string value); + int set(CacheBlock* block, optional_yield y); + int get(CacheBlock* block, optional_yield y); + int copy(CacheBlock* block, std::string copyName, std::string copyBucketName, optional_yield y); + int del(CacheBlock* block, optional_yield y); + int update_field(CacheBlock* block, std::string field, std::string value, optional_yield y); private: - cpp_redis::client client; - Address addr; + std::shared_ptr conn; + std::string build_index(CacheBlock* block); }; diff --git a/src/rgw/driver/d4n/d4n_policy.cc b/src/rgw/driver/d4n/d4n_policy.cc index 5004a096420..987c7d39458 100644 --- a/src/rgw/driver/d4n/d4n_policy.cc +++ b/src/rgw/driver/d4n/d4n_policy.cc @@ -1,232 +1,226 @@ #include "../../../common/async/yield_context.h" #include "d4n_policy.h" - -#define dout_subsys ceph_subsys_rgw -#define dout_context g_ceph_context +#include "common/async/blocked_completion.h" namespace rgw { namespace d4n { -int CachePolicy::find_client(const DoutPrefixProvider* dpp, cpp_redis::client* client) { - if (client->is_connected()) - return 0; - - if (get_addr().host == "" || get_addr().port == 0) { - ldpp_dout(dpp, 10) << "RGW D4N Policy: D4N policy endpoint was not configured correctly" << dendl; - return EDESTADDRREQ; - } +// initiate a call to async_exec() on the connection's executor +struct initiate_exec { + std::shared_ptr conn; + boost::redis::request req; - client->connect(get_addr().host, get_addr().port, nullptr); + using executor_type = boost::redis::connection::executor_type; + executor_type get_executor() const noexcept { return conn->get_executor(); } - if (!client->is_connected()) - return ECONNREFUSED; + template + void operator()(Handler handler, Response& resp) + { + conn->async_exec(req, resp, boost::asio::consign(std::move(handler), conn)); + } +}; - return 0; +template +auto async_exec(std::shared_ptr conn, + const boost::redis::request& req, + Response& resp, CompletionToken&& token) +{ + return boost::asio::async_initiate( + initiate_exec{std::move(conn), req}, token, resp); } -int CachePolicy::exist_key(std::string key) { - int result = -1; - std::vector keys; - keys.push_back(key); - - if (!client.is_connected()) { - return result; +template +void redis_exec(std::shared_ptr conn, boost::system::error_code& ec, boost::redis::request& req, boost::redis::response& resp, optional_yield y) +{ + if (y) { + auto yield = y.get_yield_context(); + async_exec(std::move(conn), req, resp, yield[ec]); + } else { + async_exec(std::move(conn), req, resp, ceph::async::use_blocked[ec]); } +} +int LFUDAPolicy::set_age(int age, optional_yield y) { try { - client.exists(keys, [&result](cpp_redis::reply &reply) { - if (reply.is_integer()) { - result = reply.as_integer(); - } - }); - - client.sync_commit(std::chrono::milliseconds(1000)); - } catch(std::exception &e) {} + boost::system::error_code ec; + response resp; + request req; + req.push("HSET", "lfuda", "age", std::to_string(age)); - return result; -} + redis_exec(conn, ec, req, resp, y); -int LFUDAPolicy::set_age(int age) { - int result = 0; + if (ec) + return {}; - try { - client.hset("lfuda", "age", std::to_string(age), [&result](cpp_redis::reply& reply) { - if (!reply.is_null()) { - result = reply.as_integer(); - } - }); - - client.sync_commit(); + return std::get<0>(resp).value(); /* Returns number of fields set */ } catch(std::exception &e) { return -1; } - - return result - 1; } -int LFUDAPolicy::get_age() { - int ret = 0; - int age = -1; +int LFUDAPolicy::get_age(optional_yield y) { + response resp; try { - client.hexists("lfuda", "age", [&ret](cpp_redis::reply& reply) { - if (!reply.is_null()) { - ret = reply.as_integer(); - } - }); + boost::system::error_code ec; + request req; + req.push("HEXISTS", "lfuda", "age"); - client.sync_commit(std::chrono::milliseconds(1000)); + redis_exec(conn, ec, req, resp, y); + + if (ec) + return -1; } catch(std::exception &e) { return -1; } - if (!ret) { - ret = set_age(0); /* Initialize age */ - - if (!ret) { - return 0; /* Success */ - } else { + if (!std::get<0>(resp).value()) { + if (!set_age(0, y)) /* Initialize age */ + return 0; + else return -1; - }; } - try { - client.hget("lfuda", "age", [&age](cpp_redis::reply& reply) { - if (!reply.is_null()) { - age = std::stoi(reply.as_string()); - } - }); + try { + boost::system::error_code ec; + response value; + request req; + req.push("HGET", "lfuda", "age"); + + redis_exec(conn, ec, req, value, y); + + if (ec) + return -1; - client.sync_commit(std::chrono::milliseconds(1000)); + return std::stoi(std::get<0>(value).value()); } catch(std::exception &e) { return -1; } - - return age; } -int LFUDAPolicy::set_global_weight(std::string key, int weight) { - int result = 0; - +int LFUDAPolicy::set_global_weight(std::string key, int weight, optional_yield y) { try { - client.hset(key, "globalWeight", std::to_string(weight), [&result](cpp_redis::reply& reply) { - if (!reply.is_null()) { - result = reply.as_integer(); - } - }); + boost::system::error_code ec; + response resp; + request req; + req.push("HSET", key, "globalWeight", std::to_string(weight)); + + redis_exec(conn, ec, req, resp, y); + + if (ec) + return {}; - client.sync_commit(); + return std::get<0>(resp).value(); /* Returns number of fields set */ } catch(std::exception &e) { return -1; } - - return result - 1; } -int LFUDAPolicy::get_global_weight(std::string key) { - int weight = -1; +int LFUDAPolicy::get_global_weight(std::string key, optional_yield y) { + try { + boost::system::error_code ec; + response resp; + request req; + req.push("HGET", key, "globalWeight"); + + redis_exec(conn, ec, req, resp, y); - try { - client.hget(key, "globalWeight", [&weight](cpp_redis::reply& reply) { - if (!reply.is_null()) { - weight = reply.as_integer(); - } - }); + if (ec) + return -1; - client.sync_commit(std::chrono::milliseconds(1000)); + return std::stoi(std::get<0>(resp).value()); } catch(std::exception &e) { return -1; } - - return weight; } -int LFUDAPolicy::set_min_avg_weight(int weight, std::string cacheLocation) { - int result = 0; - +int LFUDAPolicy::set_min_avg_weight(size_t weight, std::string cacheLocation, optional_yield y) { try { - client.hset("lfuda", "minAvgWeight:cache", cacheLocation, [&result](cpp_redis::reply& reply) { - if (!reply.is_null()) { - result = reply.as_integer(); - } - }); + boost::system::error_code ec; + response resp; + request req; + req.push("HSET", "lfuda", "minAvgWeight:cache", cacheLocation); - client.sync_commit(); + redis_exec(conn, ec, req, resp, y); + + if (ec) + return {}; } catch(std::exception &e) { return -1; } + + try { + boost::system::error_code ec; + response resp; + request req; + req.push("HSET", "lfuda", "minAvgWeight:weight", cacheLocation); - if (result == 1) { - result = 0; - try { - client.hset("lfuda", "minAvgWeight:weight", std::to_string(weight), [&result](cpp_redis::reply& reply) { - if (!reply.is_null()) { - result = reply.as_integer(); - } - }); + redis_exec(conn, ec, req, resp, y); - client.sync_commit(); - } catch(std::exception &e) { - return -1; - } - } + if (ec) + return {}; - return result - 1; + return std::get<0>(resp).value(); /* Returns number of fields set */ + } catch(std::exception &e) { + return -1; + } } -int LFUDAPolicy::get_min_avg_weight() { - int ret = 0; - int weight = -1; +int LFUDAPolicy::get_min_avg_weight(optional_yield y) { + response resp; try { - client.hexists("lfuda", "minAvgWeight:cache", [&ret](cpp_redis::reply& reply) { - if (!reply.is_null()) { - ret = reply.as_integer(); - } - }); + boost::system::error_code ec; + request req; + req.push("HEXISTS", "lfuda", "minAvgWeight:cache"); - client.sync_commit(std::chrono::milliseconds(1000)); + redis_exec(conn, ec, req, resp, y); + + if (ec) + return -1; } catch(std::exception &e) { return -1; } - if (!ret) { - ret = set_min_avg_weight(INT_MAX, ""/* local cache location or keep empty? */); /* Initialize minimum average weight */ - - if (!ret) { - return INT_MAX; /* Success */ + if (!std::get<0>(resp).value()) { + if (!set_min_avg_weight(INT_MAX, ""/* local cache location or keep empty? */, y)) { /* Initialize minimum average weight */ + return INT_MAX; } else { return -1; - }; + } } - try { - client.hget("lfuda", "minAvgWeight:weight", [&weight](cpp_redis::reply& reply) { - if (!reply.is_null()) { - weight = std::stoi(reply.as_string()); - } - }); + try { + boost::system::error_code ec; + response value; + request req; + req.push("HGET", "lfuda", "minAvgWeight:weight"); + + redis_exec(conn, ec, req, value, y); + + if (ec) + return -1; - client.sync_commit(std::chrono::milliseconds(1000)); + return std::stoi(std::get<0>(value).value()); } catch(std::exception &e) { return -1; } - - return weight; } -CacheBlock LFUDAPolicy::find_victim(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) { +CacheBlock LFUDAPolicy::find_victim(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) { #if 0 std::vector entries = cacheNode->list_entries(dpp); std::string victimName; int minWeight = INT_MAX; for (auto it = entries.begin(); it != entries.end(); ++it) { - optional_yield y = null_yield; std::string localWeightStr = cacheNode->get_attr(dpp, it->key, "localWeight", y); // should represent block -Sam - if (!std::stoi(localWeightStr)) { // maybe do this in some sort of initialization procedure instead of here? -Sam - /* Local weight hasn't been set */ - int ret = cacheNode->set_attr(dpp, it->key, "localWeight", std::to_string(get_age())); + /* Get victim cache block */ + CacheBlock victim; + victim.cacheObj.objName = it->second->key; + victim.cacheObj.bucketName = cacheNode->get_attr(dpp, victim.cacheObj.objName, "bucket_name", y); // generalize for other cache backends -Sam + victim.blockID = 0; // find way to get ID -Sam if (ret < 0) return {}; @@ -239,10 +233,10 @@ CacheBlock LFUDAPolicy::find_victim(const DoutPrefixProvider* dpp, rgw::cache::C /* Get victim cache block */ CacheBlock victimBlock; victimBlock.cacheObj.objName = victimName; - BlockDirectory blockDir; - blockDir.init(cct); + BlockDirectory blockDir(io); + blockDir.init(cct, dpp); - int ret = blockDir.get_value(&victimBlock); + int ret = blockDir.get(&victimBlock, y); if (ret < 0) return {}; @@ -251,19 +245,32 @@ CacheBlock LFUDAPolicy::find_victim(const DoutPrefixProvider* dpp, rgw::cache::C return victimBlock; } -int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) { +int LFUDAPolicy::exist_key(std::string key, optional_yield y) { + response resp; + + try { + boost::system::error_code ec; + request req; + req.push("EXISTS", key); + + redis_exec(conn, ec, req, resp, y); + + if (ec) + return false; + } catch(std::exception &e) {} + + return std::get<0>(resp).value(); +} + +int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) { std::string key = "rgw-object:" + block->cacheObj.objName + ":directory"; - optional_yield y = null_yield; std::string localWeightStr = cacheNode->get_attr(dpp, block->cacheObj.objName, "localWeight", y); // change to block name eventually -Sam int localWeight = -1; - - if (!client.is_connected()) - find_client(dpp, &client); + response resp; if (localWeightStr.empty()) { // figure out where to set local weight -Sam - optional_yield y = null_yield; - int ret = cacheNode->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(get_age()), y); - localWeight = get_age(); + int ret = cacheNode->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(get_age(y)), y); + localWeight = get_age(y); if (ret < 0) return -1; @@ -271,27 +278,26 @@ int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw localWeight = std::stoi(localWeightStr); } - int age = get_age(); + int age = get_age(y); - bool key_exists = true; //cacheNode->key_exists(dpp, block->cacheObj.objName) //TODO- correct this - if (key_exists) { /* Local copy */ + if (exist_key(key, y)) { /* Local copy */ localWeight += age; } else { - std::string hosts; uint64_t freeSpace = cacheNode->get_free_space(dpp); while (freeSpace < block->size) /* Not enough space in local cache */ - freeSpace += eviction(dpp, cacheNode); + freeSpace += eviction(dpp, cacheNode, y); - if (exist_key(key)) { + if (exist_key(key, y)) { /* Remote copy */ try { - client.hget(key, "hostsList", [&hosts](cpp_redis::reply& reply) { - if (!reply.is_null()) { - hosts = reply.as_string(); - } - }); - - client.sync_commit(std::chrono::milliseconds(1000)); + boost::system::error_code ec; + request req; + req.push("HGET", key, "blockHosts"); + + redis_exec(conn, ec, req, resp, y); + + if (ec) + return -1; } catch(std::exception &e) { return -1; } @@ -300,11 +306,11 @@ int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw } // should not hold local cache IP if in this else statement -Sam - if (hosts.length() > 0) { /* Remote copy */ - int globalWeight = get_global_weight(key); + if (std::get<0>(resp).value().length() > 0) { /* Remote copy */ + int globalWeight = get_global_weight(key, y); globalWeight += age; - if (set_global_weight(key, globalWeight)) + if (set_global_weight(key, globalWeight, y)) return -1; } else { /* No remote copy */ // do I need to add the block to the local cache here? -Sam @@ -317,8 +323,8 @@ int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw return 0; } -uint64_t LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) { - CacheBlock victim = find_victim(dpp, cacheNode); +uint64_t LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) { + CacheBlock victim = find_victim(dpp, cacheNode, y); if (victim.cacheObj.objName.empty()) { ldpp_dout(dpp, 10) << "RGW D4N Policy: Could not find victim block" << dendl; @@ -326,21 +332,21 @@ uint64_t LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheD } std::string key = "rgw-object:" + victim.cacheObj.objName + ":directory"; - std::string hosts; - int globalWeight = get_global_weight(key); - optional_yield y = null_yield; + int globalWeight = get_global_weight(key, y); int localWeight = std::stoi(cacheNode->get_attr(dpp, victim.cacheObj.objName, "localWeight", y)); // change to block name eventually -Sam - int avgWeight = get_min_avg_weight(); + int avgWeight = get_min_avg_weight(y); + response resp; - if (exist_key(key)) { + if (exist_key(key, y)) { try { - client.hget(key, "hostsList", [&hosts](cpp_redis::reply& reply) { - if (!reply.is_null()) { - hosts = reply.as_string(); - } - }); + boost::system::error_code ec; + request req; + req.push("HGET", key, "blockHosts"); + + redis_exec(conn, ec, req, resp, y); - client.sync_commit(std::chrono::milliseconds(1000)); + if (ec) + return -1; } catch(std::exception &e) { return -1; } @@ -348,14 +354,13 @@ uint64_t LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheD return -2; } - if (hosts.empty()) { /* Last copy */ + if (std::get<0>(resp).value().empty()) { /* Last copy */ if (globalWeight > 0) { localWeight += globalWeight; - optional_yield y = null_yield; int ret = cacheNode->set_attr(dpp, victim.cacheObj.objName, "localWeight", std::to_string(localWeight), y); if (!ret) - ret = set_global_weight(key, 0); + ret = set_global_weight(key, 0, y); else return -1; @@ -373,20 +378,19 @@ uint64_t LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheD globalWeight += localWeight; - if (set_global_weight(key, globalWeight)) + if (set_global_weight(key, globalWeight, y)) return -2; ldpp_dout(dpp, 10) << "RGW D4N Policy: Block " << victim.cacheObj.objName << " has been evicted." << dendl; - int ret = cacheNode->delete_data(dpp, victim.cacheObj.objName, y); + int ret = cacheNode->del(dpp, victim.cacheObj.objName, y); if (!ret) { - uint64_t num_entries = 100; //cacheNode->get_num_entries(dpp) TODO - correct this - ret = set_min_avg_weight(avgWeight - (localWeight/num_entries), ""/*local cache location*/); // Where else must this be set? -Sam + //ret = set_min_avg_weight(avgWeight - (localWeight/entries_map.size()), ""/*local cache location*/, y); // Where else must this be set? -Sam if (!ret) { - int age = get_age(); + int age = get_age(y); age = std::max(localWeight, age); - ret = set_age(age); + ret = set_age(age, y); if (ret) return -1; @@ -400,7 +404,23 @@ uint64_t LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheD return victim.size; } -int LRUPolicy::exist_key(std::string key) +void LFUDAPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) +{ + +} + +bool LFUDAPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key) +{ + return false; +} + +void LFUDAPolicy::shutdown() +{ + // call cancel() on the connection's executor + boost::asio::dispatch(conn->get_executor(), [c = conn] { c->cancel(); }); +} + +int LRUPolicy::exist_key(std::string key, optional_yield y) { const std::lock_guard l(lru_lock); if (entries_map.count(key) != 0) { @@ -409,13 +429,16 @@ int LRUPolicy::exist_key(std::string key) return false; } -int LRUPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) +int LRUPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) { - //Does not apply to LRU + uint64_t freeSpace = cacheNode->get_free_space(dpp); + while(freeSpace < block->size) { + freeSpace = eviction(dpp, cacheNode, y); + } return 0; } -uint64_t LRUPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) +uint64_t LRUPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) { const std::lock_guard l(lru_lock); auto p = entries_lru_list.front(); @@ -425,11 +448,11 @@ uint64_t LRUPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDri return cacheNode->get_free_space(dpp); } -void LRUPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, rgw::cache::CacheDriver* cacheNode) +void LRUPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) { erase(dpp, key); - const std::lock_guard l(lru_lock); - Entry *e = new Entry(key, offset, len); + + Entry *e = new Entry(key, offset, len, ""); // update version later -Sam entries_lru_list.push_back(*e); entries_map.emplace(key, e); } @@ -448,7 +471,7 @@ bool LRUPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key) int PolicyDriver::init() { if (policyName == "lfuda") { - cachePolicy = new LFUDAPolicy(); + cachePolicy = new LFUDAPolicy(io); return 0; } else if (policyName == "lru") { cachePolicy = new LRUPolicy(); diff --git a/src/rgw/driver/d4n/d4n_policy.h b/src/rgw/driver/d4n/d4n_policy.h index 9b694510717..9e2faed25c8 100644 --- a/src/rgw/driver/d4n/d4n_policy.h +++ b/src/rgw/driver/d4n/d4n_policy.h @@ -2,101 +2,132 @@ #include #include -#include #include "rgw_common.h" #include "d4n_directory.h" #include "../../rgw_redis_driver.h" +#define dout_subsys ceph_subsys_rgw +#define dout_context g_ceph_context + namespace rgw { namespace d4n { class CachePolicy { - private: - cpp_redis::client client; - Address addr; + protected: + struct Entry : public boost::intrusive::list_base_hook<> { + std::string key; + uint64_t offset; + uint64_t len; + std::string version; + Entry(std::string& key, uint64_t offset, uint64_t len, std:: string version) : key(key), offset(offset), + len(len), version(version) {} + }; + + //The disposer object function + struct Entry_delete_disposer { + void operator()(Entry *e) { + delete e; + } + }; + typedef boost::intrusive::list List; + + //cpp_redis::client client; + //Address addr; public: CephContext* cct; - CachePolicy() : addr() {} - virtual ~CachePolicy() = default; + CachePolicy() {} + virtual ~CachePolicy() = default; - virtual void init(CephContext *_cct) { - cct = _cct; - addr.host = cct->_conf->rgw_d4n_host; - addr.port = cct->_conf->rgw_d4n_port; + virtual int init(CephContext *cct, const DoutPrefixProvider* dpp) { + this->cct = cct; + return 0; } - virtual int find_client(const DoutPrefixProvider* dpp, cpp_redis::client* client) = 0; - virtual int exist_key(std::string key) = 0; - virtual Address get_addr() { return addr; } - virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) = 0; - virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) = 0; - virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, rgw::cache::CacheDriver* cacheNode) = 0; + virtual int exist_key(std::string key, optional_yield y) = 0; + virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) = 0; + virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) = 0; + virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) = 0; + virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key) = 0; + virtual void shutdown() = 0; }; class LFUDAPolicy : public CachePolicy { private: - cpp_redis::client client; + net::io_context& io; + std::shared_ptr conn; public: - LFUDAPolicy() : CachePolicy() {} - - int set_age(int age); - int get_age(); - int set_global_weight(std::string key, int weight); - int get_global_weight(std::string key); - int set_min_avg_weight(int weight, std::string cacheLocation); - int get_min_avg_weight(); - CacheBlock find_victim(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode); - - virtual int find_client(const DoutPrefixProvider* dpp, cpp_redis::client* client) override { return CachePolicy::find_client(dpp, client); } - virtual int exist_key(std::string key) override { return CachePolicy::exist_key(key); } - virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) override; - virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) override; - virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, rgw::cache::CacheDriver* cacheNode) override {} + LFUDAPolicy(net::io_context& io_context) : CachePolicy(), io(io_context) { + conn = std::make_shared(boost::asio::make_strand(io_context)); + } + ~LFUDAPolicy() { + //delete dir; + shutdown(); + } + + int set_age(int age, optional_yield y); + int get_age(optional_yield y); + int set_global_weight(std::string key, int weight, optional_yield y); + int get_global_weight(std::string key, optional_yield y); + int set_min_avg_weight(size_t weight, std::string cacheLocation, optional_yield y); + int get_min_avg_weight(optional_yield y); + CacheBlock find_victim(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y); + + virtual int init(CephContext *cct, const DoutPrefixProvider* dpp) { + this->cct = cct; + + config cfg; + cfg.addr.host = cct->_conf->rgw_d4n_host; // TODO: Replace with cache address + cfg.addr.port = std::to_string(cct->_conf->rgw_d4n_port); + + if (!cfg.addr.host.length() || !cfg.addr.port.length()) { + ldpp_dout(dpp, 10) << "RGW Redis Cache: Redis cache endpoint was not configured correctly" << dendl; + return -EDESTADDRREQ; + } + + conn->async_run(cfg, {}, net::consign(net::detached, conn)); + + return 0; + } + virtual int exist_key(std::string key, optional_yield y) override; + virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) override; + virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) override; + virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) override; + virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key) override; + virtual void shutdown() override; }; class LRUPolicy : public CachePolicy { - public: - struct Entry : public boost::intrusive::list_base_hook<> { - std::string key; - uint64_t offset; - uint64_t len; - Entry(std::string& key, uint64_t offset, uint64_t len) : key(key), offset(offset), len(len) {} - }; - LRUPolicy() = default; private: - std::mutex lru_lock; - //The disposer object function - struct Entry_delete_disposer { - void operator()(Entry *e) { - delete e; - } - }; - typedef boost::intrusive::list List; List entries_lru_list; std::unordered_map entries_map; + std::mutex lru_lock; + public: - virtual int find_client(const DoutPrefixProvider* dpp, cpp_redis::client* client) override { return 0; }; - virtual int exist_key(std::string key) override; - virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) override; - virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) override; - virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, rgw::cache::CacheDriver* cacheNode) override; - bool erase(const DoutPrefixProvider* dpp, const std::string& key); + LRUPolicy() = default; + + virtual int exist_key(std::string key, optional_yield y) override; + virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) override; + virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) override; + virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) override; + virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key) override; + virtual void shutdown() override {} }; class PolicyDriver { private: + net::io_context& io; std::string policyName; - - public: CachePolicy* cachePolicy; - PolicyDriver(std::string _policyName) : policyName(_policyName) {} + public: + PolicyDriver(net::io_context& io_context, std::string _policyName) : io(io_context), policyName(_policyName) {} ~PolicyDriver() { delete cachePolicy; } int init(); + CachePolicy* get_cache_policy() { return cachePolicy; } }; } } // namespace rgw::d4n diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.cc b/src/rgw/driver/d4n/rgw_sal_d4n.cc index 44da4831106..754ba164390 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.cc +++ b/src/rgw/driver/d4n/rgw_sal_d4n.cc @@ -36,20 +36,22 @@ static inline Object* nextObject(Object* t) return dynamic_cast(t)->get_next(); } -D4NFilterDriver::D4NFilterDriver(Driver* _next) : FilterDriver(_next) +D4NFilterDriver::D4NFilterDriver(Driver* _next, boost::asio::io_context& io_context) : FilterDriver(_next) { + rgw::cache::Partition partition_info; partition_info.location = g_conf()->rgw_d4n_l1_datacache_persistent_path; partition_info.name = "d4n"; partition_info.type = "read-cache"; partition_info.size = g_conf()->rgw_d4n_l1_datacache_size; + //cacheDriver = new rgw::cache::RedisDriver(io_context, partition_info); // change later -Sam cacheDriver = new rgw::cache::SSDDriver(partition_info); - objDir = new rgw::d4n::ObjectDirectory(); - blockDir = new rgw::d4n::BlockDirectory(); + objDir = new rgw::d4n::ObjectDirectory(io_context); + blockDir = new rgw::d4n::BlockDirectory(io_context); cacheBlock = new rgw::d4n::CacheBlock(); - policyDriver = new rgw::d4n::PolicyDriver("lfuda"); - lruPolicyDriver = new rgw::d4n::PolicyDriver("lru"); + policyDriver = new rgw::d4n::PolicyDriver(io_context, "lfuda"); + lruPolicyDriver = new rgw::d4n::PolicyDriver(io_context, "lru"); } D4NFilterDriver::~D4NFilterDriver() @@ -68,14 +70,14 @@ int D4NFilterDriver::initialize(CephContext *cct, const DoutPrefixProvider *dpp) cacheDriver->initialize(cct, dpp); - objDir->init(cct); - blockDir->init(cct); + objDir->init(cct, dpp); + blockDir->init(cct, dpp); - policyDriver->init(); - policyDriver->cachePolicy->init(cct); + policyDriver->init(); + policyDriver->get_cache_policy()->init(cct, dpp); lruPolicyDriver->init(); - lruPolicyDriver->cachePolicy->init(cct); + lruPolicyDriver->get_cache_policy()->init(cct, dpp); return 0; } @@ -129,16 +131,14 @@ int D4NFilterObject::copy_object(User* user, const DoutPrefixProvider* dpp, optional_yield y) { - /* Build cache block copy */ - rgw::d4n::CacheBlock* copyCacheBlock = new rgw::d4n::CacheBlock(); // How will this copy work in lfuda? -Sam - - copyCacheBlock->hostsList.push_back(driver->get_cache_block()->hostsList[0]); - copyCacheBlock->size = driver->get_cache_block()->size; - copyCacheBlock->size = driver->get_cache_block()->globalWeight; // Do we want to reset the global weight? -Sam - copyCacheBlock->cacheObj.bucketName = dest_bucket->get_name(); - copyCacheBlock->cacheObj.objName = dest_object->get_key().get_oid(); - - int copy_valueReturn = driver->get_block_dir()->set_value(copyCacheBlock); + rgw::d4n::CacheBlock block = rgw::d4n::CacheBlock{ + .cacheObj = { + .objName = this->get_key().get_oid(), + .bucketName = src_bucket->get_name() + }, + .blockID = 0, // TODO: get correct blockID + }; + int copy_valueReturn = driver->get_block_dir()->copy(&block, dest_object->get_name(), dest_bucket->get_name(), y); if (copy_valueReturn < 0) { ldpp_dout(dpp, 20) << "D4N Filter: Block directory copy operation failed." << dendl; @@ -146,8 +146,6 @@ int D4NFilterObject::copy_object(User* user, ldpp_dout(dpp, 20) << "D4N Filter: Block directory copy operation succeeded." << dendl; } - delete copyCacheBlock; - /* Append additional metadata to attributes */ rgw::sal::Attrs baseAttrs = this->get_attrs(); buffer::list bl; @@ -266,6 +264,49 @@ int D4NFilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* d return next->get_obj_attrs(y, dpp, target_obj); } else { + /* Set metadata locally */ + RGWQuotaInfo quota_info; + RGWObjState* astate; + std::unique_ptr user = this->driver->get_user(this->get_bucket()->get_owner()); + this->get_obj_state(dpp, &astate, y); + + for (auto it = attrs.begin(); it != attrs.end(); ++it) { + if (it->second.length() > 0) { // or return? -Sam + if (it->first == "mtime") { + parse_time(it->second.c_str(), &astate->mtime); + attrs.erase(it->first); + } else if (it->first == "object_size") { + this->set_obj_size(std::stoull(it->second.c_str())); + attrs.erase(it->first); + } else if (it->first == "accounted_size") { + astate->accounted_size = std::stoull(it->second.c_str()); + attrs.erase(it->first); + } else if (it->first == "epoch") { + astate->epoch = std::stoull(it->second.c_str()); + attrs.erase(it->first); + } else if (it->first == "version_id") { + this->set_instance(it->second.c_str()); + attrs.erase(it->first); + } else if (it->first == "this_zone_short_id") { + astate->zone_short_id = static_cast(std::stoul(it->second.c_str())); + attrs.erase(it->first); + } else if (it->first == "user_quota.max_size") { + quota_info.max_size = std::stoull(it->second.c_str()); + attrs.erase(it->first); + } else if (it->first == "user_quota.max_objects") { + quota_info.max_objects = std::stoull(it->second.c_str()); + attrs.erase(it->first); + } else if (it->first == "max_buckets") { + user->set_max_buckets(std::stoull(it->second.c_str())); + attrs.erase(it->first); + } + } + } + + user->set_info(quota_info); + this->set_obj_state(*astate); + + /* Set attributes locally */ int set_attrsReturn = this->set_attrs(attrs); if (set_attrsReturn < 0) { @@ -340,7 +381,7 @@ std::unique_ptr D4NFilterDriver::get_atomic_writer(const DoutPrefixProvi owner, ptail_placement_rule, olh_epoch, unique_tag); - return std::make_unique(std::move(writer), this, obj, dpp, true); + return std::make_unique(std::move(writer), this, obj, dpp, true, y); } std::unique_ptr D4NFilterObject::get_read_op() @@ -488,6 +529,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int this->client_cb = cb; this->cb->set_client_cb(cb, dpp); // what's this for? -Sam + // save y here -Sam /* This algorithm stores chunks for ranged requests also in the cache, which might be smaller than obj_max_req_size One simplification could be to overwrite the smaller chunks with a bigger chunk of obj_max_req_size, and to serve requests for smaller @@ -534,11 +576,11 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl; - if (source->driver->get_policy_driver()->cachePolicy->exist_key(oid_in_cache)) { + if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache, y)) { // Read From Cache auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id); - source->driver->get_policy_driver()->cachePolicy->update(dpp, oid_in_cache, adjusted_start_ofs, part_len, source->driver->get_cache_driver()); + source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, part_len, "", source->driver->get_cache_driver(), y); ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl; @@ -555,11 +597,11 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl; - if ((part_len != obj_max_req_size) && source->driver->get_policy_driver()->cachePolicy->exist_key(oid_in_cache)) { + if ((part_len != obj_max_req_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache, y)) { // Read From Cache auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id); - source->driver->get_policy_driver()->cachePolicy->update(dpp, oid_in_cache, adjusted_start_ofs, obj_max_req_size, source->driver->get_cache_driver()); + source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, obj_max_req_size, "", source->driver->get_cache_driver(), y); ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl; @@ -625,12 +667,12 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int return r; } - return this->cb->flush_last_part(); + return this->cb->flush_last_part(y); /* / Execute cache replacement policy / - int policyRet = source->driver->get_policy_driver()->cachePolicy->get_block(dpp, source->driver->get_cache_block(), - source->driver->get_cache_driver()); + int policyRet = source->driver->get_policy_driver()->get_cache_policy()->get_block(dpp, source->driver->get_cache_block(), + source->driver->get_cache_driver(), y); if (policyRet < 0) { ldpp_dout(dpp, 20) << "D4N Filter: Cache replacement operation failed." << dendl; @@ -649,7 +691,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int cb->handle_data(bl, ofs, len); } else { / Block directory check / - int getDirReturn = source->driver->get_block_dir()->get_value(source->driver->get_cache_block()); + int getDirReturn = source->driver->get_block_dir()->get(source->driver->get_cache_block()); if (getDirReturn >= -1) { if (getDirReturn == -1) { @@ -677,7 +719,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int } else { / Write tier retrieval / ldpp_dout(dpp, 20) << "D4N Filter: Block directory get operation failed." << dendl; - getDirReturn = source->driver->get_obj_dir()->get_value(&(source->driver->get_cache_block()->cacheObj)); + getDirReturn = source->driver->get_obj_dir()->get(&(source->driver->get_cache_block()->cacheObj)); if (getDirReturn >= -1) { if (getDirReturn == -1) { @@ -719,7 +761,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int source->driver->get_cache_block()->cacheObj.bucketName = source->get_bucket()->get_name(); source->driver->get_cache_block()->cacheObj.objName = source->get_key().get_oid(); - int setDirReturn = tempBlockDir->set_value(source->driver->get_cache_block()); + int setDirReturn = tempBlockDir->set(source->driver->get_cache_block()); if (setDirReturn < 0) { ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation failed." << dendl; @@ -737,8 +779,9 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int return next->iterate(dpp, ofs, end, cb, y); */ } -int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::flush_last_part() +int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::flush_last_part(optional_yield y) { + save_y = &y; last_part = true; return handle_data(bl_rem, 0, bl_rem.length()); } @@ -760,22 +803,25 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl const std::lock_guard l(d4n_get_data_lock); rgw::d4n::CacheBlock block; rgw::d4n::BlockDirectory* blockDir = source->driver->get_block_dir(); - block.hostsList.push_back(blockDir->get_addr().host + ":" + std::to_string(blockDir->get_addr().port)); + //block.hostsList.push_back(blockDir->get_addr().host + ":" + std::to_string(blockDir->get_addr().port)); block.cacheObj.bucketName = source->get_bucket()->get_name(); block.cacheObj.objName = source->get_key().get_oid(); if (bl.length() > 0 && last_part) { // if bl = bl_rem has data and this is the last part, write it to cache std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len); + block.blockID = 0; // TODO: fill out block correctly + block.version = ""; block.size = bl.length(); - block.blockId = ofs; + block.blockID = ofs; uint64_t freeSpace = filter->get_cache_driver()->get_free_space(dpp); while(freeSpace < block.size) { - freeSpace += filter->get_policy_driver()->cachePolicy->eviction(dpp, filter->get_cache_driver()); + freeSpace += filter->get_policy_driver()->get_cache_policy()->eviction(dpp, filter->get_cache_driver(), null_yield); } if (filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), source->get_attrs()) == 0) { - filter->get_policy_driver()->cachePolicy->update(dpp, oid, ofs, bl.length(), filter->get_cache_driver()); + filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), "", filter->get_cache_driver(), null_yield); /* Store block in directory */ - if (!blockDir->exist_key(oid)) { + if (!blockDir->exist_key(oid, null_yield)) { + #if 0 int ret = blockDir->set_value(&block); if (ret < 0) { ldpp_dout(dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl; @@ -783,21 +829,24 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl } else { ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl; } + #endif } } } else if (bl.length() == rgw_get_obj_max_req_size && bl_rem.length() == 0) { // if bl is the same size as rgw_get_obj_max_req_size, write it to cache std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len); ofs += bl_len; - block.blockId = ofs; + block.blockID = ofs; + block.version = ""; block.size = bl.length(); uint64_t freeSpace = filter->get_cache_driver()->get_free_space(dpp); while(freeSpace < block.size) { - freeSpace += filter->get_policy_driver()->cachePolicy->eviction(dpp, filter->get_cache_driver()); + freeSpace += filter->get_policy_driver()->get_cache_policy()->eviction(dpp, filter->get_cache_driver(), null_yield); } if (filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), source->get_attrs()) == 0) { - filter->get_policy_driver()->cachePolicy->update(dpp, oid, ofs, bl.length(), filter->get_cache_driver()); + filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), "", filter->get_cache_driver(), null_yield); /* Store block in directory */ - if (!blockDir->exist_key(oid)) { + if (!blockDir->exist_key(oid, null_yield)) { + #if 0 int ret = blockDir->set_value(&block); if (ret < 0) { ldpp_dout(dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl; @@ -805,6 +854,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl } else { ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl; } + #endif } } } else { //copy data from incoming bl to bl_rem till it is rgw_get_obj_max_req_size, and then write it to cache @@ -818,16 +868,17 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl if (bl_rem.length() == rgw_get_obj_max_req_size) { std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_rem.length()); ofs += bl_rem.length(); - block.blockId = ofs; + block.blockID = ofs; block.size = bl_rem.length(); uint64_t freeSpace = filter->get_cache_driver()->get_free_space(dpp); while(freeSpace < block.size) { - freeSpace += filter->get_policy_driver()->cachePolicy->eviction(dpp, filter->get_cache_driver()); + freeSpace += filter->get_policy_driver()->get_cache_policy()->eviction(dpp, filter->get_cache_driver(), null_yield); } if (filter->get_cache_driver()->put_async(dpp, oid, bl_rem, bl_rem.length(), source->get_attrs()) == 0) { - filter->get_policy_driver()->cachePolicy->update(dpp, oid, ofs, bl_rem.length(), filter->get_cache_driver()); + filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), "", filter->get_cache_driver(), null_yield); /* Store block in directory */ - if (!blockDir->exist_key(oid)) { + if (!blockDir->exist_key(oid, null_yield)) { + #if 0 int ret = blockDir->set_value(&block); if (ret < 0) { ldpp_dout(dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl; @@ -835,6 +886,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl } else { ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl; } + #endif } } @@ -850,7 +902,14 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp, optional_yield y, uint32_t flags) { - int delDirReturn = source->driver->get_block_dir()->del_value(source->driver->get_cache_block()); + rgw::d4n::CacheBlock block = rgw::d4n::CacheBlock{ + .cacheObj = { + .objName = source->get_key().get_oid(), + .bucketName = source->get_bucket()->get_name() + }, + .blockID = 0 // TODO: get correct blockID + }; + int delDirReturn = source->driver->get_block_dir()->del(&block, y); if (delDirReturn < 0) { ldpp_dout(dpp, 20) << "D4N Filter: Block directory delete operation failed." << dendl; @@ -867,13 +926,11 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp currentFields.push_back(attrs->first); } - int delObjReturn = source->driver->get_cache_driver()->delete_data(dpp, source->get_key().get_oid(), y); + int delObjReturn = source->driver->get_cache_driver()->del(dpp, source->get_key().get_oid(), y); if (delObjReturn < 0) { ldpp_dout(dpp, 20) << "D4N Filter: Cache delete object operation failed." << dendl; } else { - Attrs delattrs = source->get_attrs(); - delObjReturn = source->driver->get_cache_driver()->delete_attrs(dpp, source->get_key().get_oid(), delattrs, y); ldpp_dout(dpp, 20) << "D4N Filter: Cache delete operation succeeded." << dendl; } @@ -896,14 +953,15 @@ int D4NFilterWriter::prepare(optional_yield y) int D4NFilterWriter::process(bufferlist&& data, uint64_t offset) { /* - int append_dataReturn = driver->get_cache_driver()->append_data(save_dpp, obj->get_key().get_oid(), data); + int append_dataReturn = driver->get_cache_driver()->append_data(save_dpp, obj->get_key().get_oid(), + data, y); if (append_dataReturn < 0) { ldpp_dout(save_dpp, 20) << "D4N Filter: Cache append data operation failed." << dendl; } else { ldpp_dout(save_dpp, 20) << "D4N Filter: Cache append data operation succeeded." << dendl; - } -*/ + }*/ + return next->process(std::move(data), offset); } @@ -917,14 +975,22 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag, const req_context& rctx, uint32_t flags) { - rgw::d4n::BlockDirectory* tempBlockDir = driver->get_block_dir(); - - driver->get_cache_block()->hostsList.push_back(tempBlockDir->get_addr().host + ":" + std::to_string(tempBlockDir->get_addr().port)); - driver->get_cache_block()->size = accounted_size; - driver->get_cache_block()->cacheObj.bucketName = obj->get_bucket()->get_name(); - driver->get_cache_block()->cacheObj.objName = obj->get_key().get_oid(); - - int setDirReturn = tempBlockDir->set_value(driver->get_cache_block()); + rgw::d4n::CacheBlock block = rgw::d4n::CacheBlock{ + .cacheObj = { + .objName = obj->get_key().get_oid(), + .bucketName = obj->get_bucket()->get_name(), + .creationTime = 0, // TODO: get correct value + .dirty = false, + .hostsList = { "127.0.0.1:6379" /*current cache addr*/ } // TODO: fix + }, + .blockID = 0, // TODO: get correct version/blockID + .version = "", + .size = accounted_size, + .hostsList = { driver->get_block_dir()->cct->_conf->rgw_d4n_host + ":" + + std::to_string(driver->get_block_dir()->cct->_conf->rgw_d4n_port) } + }; + + int setDirReturn = driver->get_block_dir()->set(&block, y); if (setDirReturn < 0) { ldpp_dout(save_dpp, 20) << "D4N Filter: Block directory set operation failed." << dendl; @@ -984,12 +1050,16 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag, baseAttrs.insert(attrs.begin(), attrs.end()); -/* int set_attrsReturn = driver->get_cache_driver()->set_attrs(save_dpp, obj->get_key().get_oid(), baseAttrs); - - if (set_attrsReturn < 0) { - ldpp_dout(save_dpp, 20) << "D4N Filter: Cache set attributes operation failed." << dendl; + // is the accounted_size equivalent to the length? -Sam + + //bufferlist bl_empty; + //int putReturn = driver->get_cache_driver()-> + // put(save_dpp, obj->get_key().get_oid(), bl_empty, accounted_size, baseAttrs, y); /* Data already written during process call */ + /* + if (putReturn < 0) { + ldpp_dout(save_dpp, 20) << "D4N Filter: Cache put operation failed." << dendl; } else { - ldpp_dout(save_dpp, 20) << "D4N Filter: Cache set attributes operation succeeded." << dendl; + ldpp_dout(save_dpp, 20) << "D4N Filter: Cache put operation succeeded." << dendl; } */ return ret; @@ -999,9 +1069,9 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag, extern "C" { -rgw::sal::Driver* newD4NFilter(rgw::sal::Driver* next) +rgw::sal::Driver* newD4NFilter(rgw::sal::Driver* next, void* io_context) { - rgw::sal::D4NFilterDriver* driver = new rgw::sal::D4NFilterDriver(next); + rgw::sal::D4NFilterDriver* driver = new rgw::sal::D4NFilterDriver(next, *static_cast(io_context)); return driver; } diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.h b/src/rgw/driver/d4n/rgw_sal_d4n.h index bda8aa9f199..0e0bb6e4d94 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.h +++ b/src/rgw/driver/d4n/rgw_sal_d4n.h @@ -28,6 +28,12 @@ #include "driver/d4n/d4n_policy.h" #include +#include +#include +#include + +#define dout_subsys ceph_subsys_rgw +#define dout_context g_ceph_context namespace rgw { namespace sal { @@ -41,8 +47,7 @@ class D4NFilterDriver : public FilterDriver { rgw::d4n::PolicyDriver* lruPolicyDriver; public: - D4NFilterDriver(Driver* _next); - + D4NFilterDriver(Driver* _next, boost::asio::io_context& io_context); virtual ~D4NFilterDriver(); virtual int initialize(CephContext *cct, const DoutPrefixProvider *dpp) override; @@ -116,10 +121,12 @@ class D4NFilterObject : public FilterObject { oid(_oid), source(_source) {} + optional_yield* save_y; + int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override; void set_client_cb(RGWGetDataCB* client_cb, const DoutPrefixProvider* dpp) { this->client_cb = client_cb; this->dpp = dpp;} void set_ofs(uint64_t ofs) { this->ofs = ofs; } - int flush_last_part(); + int flush_last_part(optional_yield y); void bypass_cache_write() { this->write_to_cache = false; } }; @@ -201,16 +208,17 @@ class D4NFilterWriter : public FilterWriter { D4NFilterDriver* driver; const DoutPrefixProvider* save_dpp; bool atomic; + optional_yield y; public: D4NFilterWriter(std::unique_ptr _next, D4NFilterDriver* _driver, Object* _obj, - const DoutPrefixProvider* _dpp) : FilterWriter(std::move(_next), _obj), - driver(_driver), - save_dpp(_dpp), atomic(false) {} + const DoutPrefixProvider* _dpp, optional_yield _y) : FilterWriter(std::move(_next), _obj), + driver(_driver), + save_dpp(_dpp), atomic(false), y(_y) {} D4NFilterWriter(std::unique_ptr _next, D4NFilterDriver* _driver, Object* _obj, - const DoutPrefixProvider* _dpp, bool _atomic) : FilterWriter(std::move(_next), _obj), - driver(_driver), - save_dpp(_dpp), atomic(_atomic) {} + const DoutPrefixProvider* _dpp, bool _atomic, optional_yield _y) : FilterWriter(std::move(_next), _obj), + driver(_driver), + save_dpp(_dpp), atomic(_atomic), y(_y) {} virtual ~D4NFilterWriter() = default; virtual int prepare(optional_yield y); diff --git a/src/rgw/rgw_cache_driver.h b/src/rgw/rgw_cache_driver.h index 44403d248dd..732e2cd5ce0 100644 --- a/src/rgw/rgw_cache_driver.h +++ b/src/rgw/rgw_cache_driver.h @@ -20,6 +20,7 @@ class CacheDriver { virtual int initialize(CephContext* cct, const DoutPrefixProvider* dpp) = 0; virtual int put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs, optional_yield y) = 0; virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs, optional_yield y) = 0; + virtual int del(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0; virtual rgw::AioResultList get_async (const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) = 0; virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) = 0; virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data, optional_yield y) = 0; diff --git a/src/rgw/rgw_redis_driver.cc b/src/rgw/rgw_redis_driver.cc index e9c1006b606..b1d0db6fb3b 100644 --- a/src/rgw/rgw_redis_driver.cc +++ b/src/rgw/rgw_redis_driver.cc @@ -1,46 +1,64 @@ #include -#include "rgw_redis_driver.h" -//#include "rgw_ssd_driver.h" // fix -Sam -//#include +#include +#include -#define dout_subsys ceph_subsys_rgw -#define dout_context g_ceph_context +#include "rgw_redis_driver.h" +#include "common/async/blocked_completion.h" namespace rgw { namespace cache { std::unordered_map RedisDriver::partitions; -std::vector< std::pair > build_attrs(rgw::sal::Attrs* binary) +std::list build_attrs(rgw::sal::Attrs* binary) { - std::vector< std::pair > values; + std::list values; rgw::sal::Attrs::iterator attrs; /* Convert to vector */ if (binary != NULL) { for (attrs = binary->begin(); attrs != binary->end(); ++attrs) { - values.push_back(std::make_pair(attrs->first, attrs->second.to_str())); + values.push_back(attrs->first); + values.push_back(attrs->second.to_str()); } } return values; } -int RedisDriver::find_client(const DoutPrefixProvider* dpp) +// initiate a call to async_exec() on the connection's executor +struct initiate_exec { + std::shared_ptr conn; + boost::redis::request req; + + using executor_type = boost::redis::connection::executor_type; + executor_type get_executor() const noexcept { return conn->get_executor(); } + + template + void operator()(Handler handler, Response& resp) + { + conn->async_exec(req, resp, boost::asio::consign(std::move(handler), conn)); + } +}; + +template +auto async_exec(std::shared_ptr conn, + const boost::redis::request& req, + Response& resp, CompletionToken&& token) { - if (client.is_connected()) - return 0; + return boost::asio::async_initiate( + initiate_exec{std::move(conn), req}, token, resp); +} - if (addr.host == "" || addr.port == 0) { - ldpp_dout(dpp, 10) << "RGW Redis Cache: Redis cache endpoint was not configured correctly" << dendl; - return EDESTADDRREQ; +template +void redis_exec(std::shared_ptr conn, boost::system::error_code& ec, boost::redis::request& req, boost::redis::response& resp, optional_yield y) +{ + if (y) { + auto yield = y.get_yield_context(); + async_exec(std::move(conn), req, resp, yield[ec]); + } else { + async_exec(std::move(conn), req, resp, ceph::async::use_blocked[ec]); } - - client.connect(addr.host, addr.port, nullptr); - - if (!client.is_connected()) - return ECONNREFUSED; - - return 0; } int RedisDriver::add_partition_info(Partition& info) @@ -137,24 +155,20 @@ int RedisDriver::initialize(CephContext* cct, const DoutPrefixProvider* dpp) { this->cct = cct; - addr.host = cct->_conf->rgw_d4n_host; // change later -Sam - addr.port = cct->_conf->rgw_d4n_port; - if (partition_info.location.back() != '/') { partition_info.location += "/"; } - if (addr.host == "" || addr.port == 0) { + config cfg; + cfg.addr.host = cct->_conf->rgw_d4n_host; // TODO: Replace with cache address + cfg.addr.port = std::to_string(cct->_conf->rgw_d4n_port); + + if (!cfg.addr.host.length() || !cfg.addr.port.length()) { ldpp_dout(dpp, 10) << "RGW Redis Cache: Redis cache endpoint was not configured correctly" << dendl; return -EDESTADDRREQ; } - client.connect("127.0.0.1", 6379, nullptr); - - if (!client.is_connected()) { - ldpp_dout(dpp, 10) << "RGW Redis Cache: Could not connect to redis cache endpoint." << dendl; - return ECONNREFUSED; - } + conn->async_run(cfg, {}, net::consign(net::detached, conn)); return 0; } @@ -163,24 +177,23 @@ int RedisDriver::put(const DoutPrefixProvider* dpp, const std::string& key, buff { std::string entry = partition_info.location + key; - if (!client.is_connected()) - find_client(dpp); - /* Every set will be treated as new */ // or maybe, if key exists, simply return? -Sam try { - std::string result; + boost::system::error_code ec; + response resp; auto redisAttrs = build_attrs(&attrs); - redisAttrs.push_back({"data", bl.to_str()}); - client.hmset(entry, redisAttrs, [&result](cpp_redis::reply &reply) { - if (!reply.is_null()) { - result = reply.as_string(); - } - }); + if (bl.length()) { + redisAttrs.push_back("data"); + redisAttrs.push_back(bl.to_str()); + } - client.sync_commit(std::chrono::milliseconds(1000)); + request req; + req.push_range("HMSET", entry, redisAttrs); + + redis_exec(conn, ec, req, resp, y); - if (result != "OK") { + if (std::get<0>(resp).value() != "OK" || ec) { return -1; } } catch(std::exception &e) { @@ -194,75 +207,89 @@ int RedisDriver::get(const DoutPrefixProvider* dpp, const std::string& key, off_ { std::string entry = partition_info.location + key; - if (!client.is_connected()) - find_client(dpp); - - /* Retrieve existing values from cache */ + /* Retrieve existing values from cache */ try { - client.hgetall(entry, [&bl, &attrs](cpp_redis::reply &reply) { - if (reply.is_array()) { - auto arr = reply.as_array(); - - if (!arr[0].is_null()) { - for (long unsigned int i = 0; i < arr.size() - 1; i += 2) { - if (arr[i].as_string() == "data") { - bl.append(arr[i + 1].as_string()); - } else { - buffer::list bl_value; - bl_value.append(arr[i + 1].as_string()); - attrs.insert({arr[i].as_string(), bl_value}); - bl_value.clear(); - } - } - } - } - }); + boost::system::error_code ec; + response< std::map > resp; + request req; + req.push("HGETALL", entry); - client.sync_commit(std::chrono::milliseconds(1000)); - } catch(std::exception &e) { + redis_exec(conn, ec, req, resp, y); + + if (ec) return -1; - } + for (auto const& it : std::get<0>(resp).value()) { + if (it.first == "data") { + bl.append(it.second); + } else { + buffer::list bl_value; + bl_value.append(it.second); + attrs.insert({it.first, bl_value}); + bl_value.clear(); + } + } + } catch(std::exception &e) { + return -1; + } return 0; } +int RedisDriver::del(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) +{ + std::string entry = partition_info.location + key; + + try { + boost::system::error_code ec; + response resp; + request req; + req.push("DEL", entry); + + redis_exec(conn, ec, req, resp, y); + + if (ec) + return -1; + + return std::get<0>(resp).value() - 1; + } catch(std::exception &e) { + return -1; + } +} + int RedisDriver::append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data, optional_yield y) { std::string value; std::string entry = partition_info.location + key; - if (!client.is_connected()) - find_client(dpp); - try { - client.hget(entry, "data", [&value](cpp_redis::reply &reply) { - if (!reply.is_null()) { - value = reply.as_string(); - } - }); + boost::system::error_code ec; + response resp; + request req; + req.push("HGET", entry, "data"); - client.sync_commit(std::chrono::milliseconds(1000)); + redis_exec(conn, ec, req, resp, y); + + if (ec) + return -1; + + value = std::get<0>(resp).value(); } catch(std::exception &e) { return -1; } try { // do we want key check here? -Sam /* Append to existing value or set as new value */ + boost::system::error_code ec; + response resp; std::string newVal = value + bl_data.to_str(); - std::vector< std::pair > field; - field.push_back({"data", newVal}); - std::string result; - client.hmset(entry, field, [&result](cpp_redis::reply &reply) { - if (!reply.is_null()) { - result = reply.as_string(); - } - }); + request req; + req.push("HMSET", entry, "data", newVal); - client.sync_commit(std::chrono::milliseconds(1000)); + redis_exec(conn, ec, req, resp, y); - if (result != "OK") { + if (std::get<0>(resp).value() != "OK" || ec) { return -1; } } catch(std::exception &e) { @@ -275,47 +302,38 @@ int RedisDriver::append_data(const DoutPrefixProvider* dpp, const::std::string& int RedisDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string& key, optional_yield y) { std::string entry = partition_info.location + key; - - if (!client.is_connected()) - find_client(dpp); - - int exists = -2; + response value; + response resp; try { - client.hexists(entry, "data", [&exists](cpp_redis::reply &reply) { - if (!reply.is_null()) { - exists = reply.as_integer(); - } - }); + boost::system::error_code ec; + request req; + req.push("HEXISTS", entry, "data"); - client.sync_commit(std::chrono::milliseconds(1000)); - } catch(std::exception &e) { - return -1; - } + redis_exec(conn, ec, req, resp, y); - if (exists) { - try { - int result; - std::vector deleteField; - deleteField.push_back("data"); + if (ec) + return -1; + } catch(std::exception &e) { + return -1; + } - client.hdel(entry, deleteField, [&result](cpp_redis::reply &reply) { - if (reply.is_integer()) { - result = reply.as_integer(); - } - }); + if (std::get<0>(resp).value()) { + try { + boost::system::error_code ec; + request req; + req.push("HDEL", entry, "data"); - client.sync_commit(std::chrono::milliseconds(1000)); + redis_exec(conn, ec, req, value, y); - if (!result) { - return -1; - } - } catch(std::exception &e) { + if (!std::get<0>(value).value() || ec) { return -1; } - } else { - return 0; /* No delete was necessary */ + } catch(std::exception &e) { + return -1; } + } + return 0; } @@ -323,31 +341,28 @@ int RedisDriver::get_attrs(const DoutPrefixProvider* dpp, const std::string& key { std::string entry = partition_info.location + key; - if (!client.is_connected()) - find_client(dpp); - try { - client.hgetall(entry, [&attrs](cpp_redis::reply &reply) { - if (reply.is_array()) { - auto arr = reply.as_array(); - - if (!arr[0].is_null()) { - for (long unsigned int i = 0; i < arr.size() - 1; i += 2) { - if (arr[i].as_string() != "data") { - buffer::list bl_value; - bl_value.append(arr[i + 1].as_string()); - attrs.insert({arr[i].as_string(), bl_value}); - bl_value.clear(); - } - } - } - } - }); + boost::system::error_code ec; + response< std::map > resp; + request req; + req.push("HGETALL", entry); - client.sync_commit(std::chrono::milliseconds(1000)); - } catch(std::exception &e) { + redis_exec(conn, ec, req, resp, y); + + if (ec) return -1; + + for (auto const& it : std::get<0>(resp).value()) { + if (it.first != "data") { + buffer::list bl_value; + bl_value.append(it.second); + attrs.insert({it.first, bl_value}); + bl_value.clear(); + } } + } catch(std::exception &e) { + return -1; + } return 0; } @@ -359,24 +374,20 @@ int RedisDriver::set_attrs(const DoutPrefixProvider* dpp, const std::string& key std::string entry = partition_info.location + key; - if (!client.is_connected()) - find_client(dpp); - /* Every attr set will be treated as new */ try { + boost::system::error_code ec; + response resp; std::string result; - auto redisAttrs = build_attrs(&attrs); + std::list redisAttrs = build_attrs(&attrs); - client.hmset(entry, redisAttrs, [&result](cpp_redis::reply &reply) { - if (!reply.is_null()) { - result = reply.as_string(); - } - }); + request req; + req.push_range("HMSET", entry, redisAttrs); - client.sync_commit(std::chrono::milliseconds(1000)); + redis_exec(conn, ec, req, resp, y); - if (result != "OK") { -return -1; + if (std::get<0>(resp).value() != "OK" || ec) { + return -1; } } catch(std::exception &e) { return -1; @@ -389,22 +400,17 @@ int RedisDriver::update_attrs(const DoutPrefixProvider* dpp, const std::string& { std::string entry = partition_info.location + key; - if (!client.is_connected()) - find_client(dpp); - try { - std::string result; + boost::system::error_code ec; + response resp; auto redisAttrs = build_attrs(&attrs); - client.hmset(entry, redisAttrs, [&result](cpp_redis::reply &reply) { - if (!reply.is_null()) { - result = reply.as_string(); - } - }); + request req; + req.push_range("HMSET", entry, redisAttrs); - client.sync_commit(std::chrono::milliseconds(1000)); + redis_exec(conn, ec, req, resp, y); - if (result != "OK") { + if (std::get<0>(resp).value() != "OK" || ec) { return -1; } } catch(std::exception &e) { @@ -418,236 +424,126 @@ int RedisDriver::delete_attrs(const DoutPrefixProvider* dpp, const std::string& { std::string entry = partition_info.location + key; - if (!client.is_connected()) - find_client(dpp); - - std::vector getFields; - try { - client.hgetall(entry, [&getFields](cpp_redis::reply &reply) { -if (reply.is_array()) { - auto arr = reply.as_array(); - - if (!arr[0].is_null()) { - for (long unsigned int i = 0; i < arr.size() - 1; i += 2) { - getFields.push_back(arr[i].as_string()); - } - } -} - }); - - client.sync_commit(std::chrono::milliseconds(1000)); - } catch(std::exception &e) { - return -1; - } + boost::system::error_code ec; + response resp; + auto redisAttrs = build_attrs(&del_attrs); - auto redisAttrs = build_attrs(&del_attrs); - std::vector redisFields; - - std::transform(begin(redisAttrs), end(redisAttrs), std::back_inserter(redisFields), - [](auto const& pair) { return pair.first; }); - - /* Only delete attributes that have been stored */ - for (const auto& it : redisFields) { - if (std::find(getFields.begin(), getFields.end(), it) == getFields.end()) { - redisFields.erase(std::find(redisFields.begin(), redisFields.end(), it)); - } - } - - try { - int result = 0; + request req; + req.push_range("HDEL", entry, redisAttrs); - client.hdel(entry, redisFields, [&result](cpp_redis::reply &reply) { - if (reply.is_integer()) { - result = reply.as_integer(); - } - }); + redis_exec(conn, ec, req, resp, y); - client.sync_commit(std::chrono::milliseconds(1000)); + if (ec) + return -1; - return result - 1; + return std::get<0>(resp).value(); } catch(std::exception &e) { return -1; } - - ldpp_dout(dpp, 20) << "RGW Redis Cache: Object is not in cache." << dendl; - return -2; } std::string RedisDriver::get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, optional_yield y) { std::string entry = partition_info.location + key; - std::string attrValue; - - if (!client.is_connected()) - find_client(dpp); - - int exists = -2; - std::string getValue; + response value; + response resp; /* Ensure field was set */ try { - client.hexists(entry, attr_name, [&exists](cpp_redis::reply& reply) { - if (!reply.is_null()) { - exists = reply.as_integer(); - } - }); + boost::system::error_code ec; + request req; + req.push("HEXISTS", entry, attr_name); - client.sync_commit(std::chrono::milliseconds(1000)); + redis_exec(conn, ec, req, resp, y); + + if (ec) + return {}; } catch(std::exception &e) { return {}; } - if (!exists) { + if (!std::get<0>(resp).value()) { ldpp_dout(dpp, 20) << "RGW Redis Cache: Attribute was not set." << dendl; return {}; } /* Retrieve existing value from cache */ try { - client.hget(entry, attr_name, [&exists, &attrValue](cpp_redis::reply &reply) { - if (!reply.is_null()) { - attrValue = reply.as_string(); - } - }); + boost::system::error_code ec; + request req; + req.push("HGET", entry, attr_name); - client.sync_commit(std::chrono::milliseconds(1000)); + redis_exec(conn, ec, req, value, y); + + if (ec) + return {}; } catch(std::exception &e) { return {}; } - return attrValue; + return std::get<0>(value).value(); } -int RedisDriver::set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attrVal, optional_yield y) +int RedisDriver::set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attr_val, optional_yield y) { std::string entry = partition_info.location + key; - int result = 0; - - if (!client.is_connected()) - find_client(dpp); + response resp; /* Every attr set will be treated as new */ try { - client.hset(entry, attr_name, attrVal, [&result](cpp_redis::reply& reply) { - if (!reply.is_null()) { - result = reply.as_integer(); - } - }); + boost::system::error_code ec; + request req; + req.push("HSET", entry, attr_name, attr_val); - client.sync_commit(std::chrono::milliseconds(1000)); + redis_exec(conn, ec, req, resp, y); + + if (ec) + return {}; } catch(std::exception &e) { return -1; } - return result - 1; -} -#if 0 -std::unique_ptr RedisDriver::get_cache_aio_request_ptr(const DoutPrefixProvider* dpp) -{ - return std::make_unique(this); -} -#endif -rgw::AioResultList RedisDriver::get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) -{ - rgw_raw_obj r_obj; - r_obj.oid = key; - - return aio->get(r_obj, rgw::Aio::cache_read_op(dpp, y, this, ofs, len, key), cost, id); + return std::get<0>(resp).value(); } -int RedisDriver::AsyncReadOp::init(const DoutPrefixProvider *dpp, CephContext* cct, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg) +static Aio::OpFunc redis_read_op(optional_yield y, std::shared_ptr conn, + off_t read_ofs, off_t read_len, const std::string& key) { - ldpp_dout(dpp, 20) << "RedisCache: " << __func__ << "(): file_path=" << file_path << dendl; - aio_cb.reset(new struct aiocb); - memset(aio_cb.get(), 0, sizeof(struct aiocb)); - aio_cb->aio_fildes = TEMP_FAILURE_RETRY(::open(file_path.c_str(), O_RDONLY|O_CLOEXEC|O_BINARY)); - - if (aio_cb->aio_fildes < 0) { - int err = errno; - ldpp_dout(dpp, 1) << "ERROR: RedisCache: " << __func__ << "(): can't open " << file_path << " : " << " error: " << err << dendl; - return -err; - } - - if (cct->_conf->rgw_d3n_l1_fadvise != POSIX_FADV_NORMAL) { - posix_fadvise(aio_cb->aio_fildes, 0, 0, g_conf()->rgw_d3n_l1_fadvise); - } + return [y, conn, key] (Aio* aio, AioResult& r) mutable { + using namespace boost::asio; + spawn::yield_context yield = y.get_yield_context(); + async_completion init(yield); + auto ex = get_associated_executor(init.completion_handler); - bufferptr bp(read_len); - aio_cb->aio_buf = bp.c_str(); - result.append(std::move(bp)); + boost::redis::request req; + req.push("HGET", key, "data"); - aio_cb->aio_nbytes = read_len; - aio_cb->aio_offset = read_ofs; - aio_cb->aio_sigevent.sigev_notify = SIGEV_THREAD; - aio_cb->aio_sigevent.sigev_notify_function = libaio_cb_aio_dispatch; - aio_cb->aio_sigevent.sigev_notify_attributes = nullptr; - aio_cb->aio_sigevent.sigev_value.sival_ptr = arg; + // TODO: Make unique pointer once support is added + auto s = std::make_shared(); + auto& resp = s->resp; - return 0; + conn->async_exec(req, resp, bind_executor(ex, RedisDriver::redis_aio_handler{aio, r, s})); + }; } -void RedisDriver::AsyncReadOp::libaio_cb_aio_dispatch(sigval sigval) +rgw::AioResultList RedisDriver::get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) { - auto p = std::unique_ptr{static_cast(sigval.sival_ptr)}; - auto op = std::move(p->user_data); - const int ret = -aio_error(op.aio_cb.get()); - boost::system::error_code ec; - if (ret < 0) { - ec.assign(-ret, boost::system::system_category()); - } + std::string entry = partition_info.location + key; + rgw_raw_obj r_obj; + r_obj.oid = key; - ceph::async::dispatch(std::move(p), ec, std::move(op.result)); + return aio->get(r_obj, redis_read_op(y, conn, ofs, len, entry), cost, id); } -template -auto RedisDriver::AsyncReadOp::create(const Executor1& ex1, CompletionHandler&& handler) -{ - auto p = Completion::create(ex1, std::move(handler)); - return p; -} - -template -auto RedisDriver::get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key, - off_t read_ofs, off_t read_len, CompletionToken&& token) -{ - std::string location = partition_info.location + key; - ldpp_dout(dpp, 20) << "RedisCache: " << __func__ << "(): location=" << location << dendl; - - using Op = AsyncReadOp; - using Signature = typename Op::Signature; - boost::asio::async_completion init(token); - auto p = Op::create(ctx.get_executor(), init.completion_handler); - auto& op = p->user_data; - - int ret = op.init(dpp, cct, location, read_ofs, read_len, p.get()); - if (0 == ret) { - ret = ::aio_read(op.aio_cb.get()); - } -// ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): ::aio_read(), ret=" << ret << dendl; - /* if(ret < 0) { - auto ec = boost::system::error_code{-ret, boost::system::system_category()}; - ceph::async::post(std::move(p), ec, bufferlist{}); - } else { - (void)p.release(); - }*/ - //return init.result.get(); -} #if 0 -void RedisCacheAioRequest::cache_aio_read(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, off_t ofs, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) -{ - using namespace boost::asio; - async_completion init(y.get_yield_context()); - auto ex = get_associated_executor(init.completion_handler); - - ldpp_dout(dpp, 20) << "RedisCache: " << __func__ << "(): key=" << key << dendl; - cache_driver->get_async(dpp, y.get_io_context(), key, ofs, len, bind_executor(ex, RedisDriver::libaio_handler{aio, r})); -} - -void RedisCacheAioRequest::cache_aio_write(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, bufferlist& bl, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) +int RedisDriver::AsyncReadOp::init(const DoutPrefixProvider *dpp, CephContext* cct, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg) { + // call cancel() on the connection's executor + boost::asio::dispatch(conn->get_executor(), [c = conn] { c->cancel(); }); } #endif + int RedisDriver::put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) { return 0; diff --git a/src/rgw/rgw_redis_driver.h b/src/rgw/rgw_redis_driver.h index bbe430d31ec..9b818b42c39 100644 --- a/src/rgw/rgw_redis_driver.h +++ b/src/rgw/rgw_redis_driver.h @@ -1,22 +1,30 @@ #pragma once -//#include #include +#include + #include "common/async/completion.h" #include "rgw_common.h" #include "rgw_cache_driver.h" -#include -#include "driver/d4n/d4n_directory.h" +#define dout_subsys ceph_subsys_rgw +#define dout_context g_ceph_context namespace rgw { namespace cache { +namespace net = boost::asio; +using boost::redis::config; +using boost::redis::connection; +using boost::redis::request; +using boost::redis::response; + class RedisDriver : public CacheDriver { public: - RedisDriver(Partition& _partition_info) : partition_info(_partition_info), - free_space(_partition_info.size), - outstanding_write_size(0) + RedisDriver(net::io_context& io_context, Partition& _partition_info) : partition_info(_partition_info), + free_space(_partition_info.size), + outstanding_write_size(0) { + conn = std::make_shared(boost::asio::make_strand(io_context)); add_partition_info(_partition_info); } virtual ~RedisDriver() @@ -35,72 +43,47 @@ class RedisDriver : public CacheDriver { virtual int initialize(CephContext* cct, const DoutPrefixProvider* dpp) override; virtual int put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs, optional_yield y) override; virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs, optional_yield y) override; + virtual rgw::AioResultList get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override; + virtual int del(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override; virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data, optional_yield y) override; virtual int delete_data(const DoutPrefixProvider* dpp, const::std::string& key, optional_yield y) override; - virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs, optional_yield y) override; virtual int set_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs, optional_yield y) override; + virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs, optional_yield y) override; virtual int update_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs, optional_yield y) override; virtual int delete_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& del_attrs, optional_yield y) override; - virtual std::string get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, optional_yield y) override; virtual int set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attr_val, optional_yield y) override; + virtual std::string get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, optional_yield y) override; - virtual rgw::AioResultList get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override; virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) override; + struct redis_response { + boost::redis::response resp; + }; + void shutdown(); - struct libaio_handler { // should this be the same as SSDDriver? -Sam + struct redis_aio_handler { rgw::Aio* throttle = nullptr; rgw::AioResult& r; + std::shared_ptr s; - // read callback - void operator()(boost::system::error_code ec, bufferlist bl) const { + /* Read Callback */ + void operator()(boost::system::error_code ec, long unsigned int size) const { r.result = -ec.value(); - r.data = std::move(bl); + r.data.append(std::get<0>(s->resp).value().c_str()); throttle->put(r); } }; - template - auto get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key, - off_t read_ofs, off_t read_len, CompletionToken&& token); protected: - cpp_redis::client client; - rgw::d4n::Address addr; + std::shared_ptr conn; + static std::unordered_map partitions; Partition partition_info; uint64_t free_space; uint64_t outstanding_write_size; CephContext* cct; - int find_client(const DoutPrefixProvider* dpp); int add_partition_info(Partition& info); int remove_partition_info(Partition& info); - - private: - // unique_ptr with custom deleter for struct aiocb - struct libaio_aiocb_deleter { - void operator()(struct aiocb* c) { - if(c->aio_fildes > 0) { - if( ::close(c->aio_fildes) != 0) { - } - } - delete c; - } - }; - - using unique_aio_cb_ptr = std::unique_ptr; - - struct AsyncReadOp { - bufferlist result; - unique_aio_cb_ptr aio_cb; - using Signature = void(boost::system::error_code, bufferlist); - using Completion = ceph::async::Completion; - - int init(const DoutPrefixProvider *dpp, CephContext* cct, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg); - static void libaio_cb_aio_dispatch(sigval sigval); - - template - static auto create(const Executor1& ex1, CompletionHandler&& handler); - }; }; } } // namespace rgw::cache diff --git a/src/rgw/rgw_sal.cc b/src/rgw/rgw_sal.cc index 0f6e1745d71..574e467faac 100644 --- a/src/rgw/rgw_sal.cc +++ b/src/rgw/rgw_sal.cc @@ -63,7 +63,7 @@ extern rgw::sal::Driver* newPOSIXDriver(rgw::sal::Driver* next); #endif extern rgw::sal::Driver* newBaseFilter(rgw::sal::Driver* next); #ifdef WITH_RADOSGW_D4N -extern rgw::sal::Driver* newD4NFilter(rgw::sal::Driver* next); +extern rgw::sal::Driver* newD4NFilter(rgw::sal::Driver* next, boost::asio::io_context* io_context); #endif } @@ -239,7 +239,7 @@ rgw::sal::Driver* DriverManager::init_storage_provider(const DoutPrefixProvider* #ifdef WITH_RADOSGW_D4N else if (cfg.filter_name.compare("d4n") == 0) { rgw::sal::Driver* next = driver; - driver = newD4NFilter(next); + driver = newD4NFilter(next, &io_context); if (driver->initialize(cct, dpp) < 0) { delete driver; diff --git a/src/rgw/rgw_ssd_driver.h b/src/rgw/rgw_ssd_driver.h index 9472eec3ecc..962b5834e14 100644 --- a/src/rgw/rgw_ssd_driver.h +++ b/src/rgw/rgw_ssd_driver.h @@ -14,6 +14,7 @@ public: virtual int initialize(CephContext* cct, const DoutPrefixProvider* dpp) override; virtual int put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs, optional_yield y) override; virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs, optional_yield y) override; + virtual int del(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override { return -1; } // TODO: implement virtual rgw::AioResultList get_async (const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override; virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) override; virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data, optional_yield y) override;