From 85c04af9de120ad14a6717af5419607047c1a681 Mon Sep 17 00:00:00 2001 From: Pritha Srivastava Date: Tue, 22 Apr 2025 08:34:04 +0530 Subject: [PATCH] rgw/d4n: add pipelining to PUT in write-cache and read cache for data blocks directory entries introduced a pipelined version of set command. Signed-off-by: Pritha Srivastava --- src/rgw/driver/d4n/d4n_directory.cc | 368 +++++++++++----------------- src/rgw/driver/d4n/d4n_directory.h | 48 ++-- src/rgw/driver/d4n/d4n_policy.cc | 5 +- src/rgw/driver/d4n/rgw_sal_d4n.cc | 186 ++++++-------- src/rgw/driver/d4n/rgw_sal_d4n.h | 2 + src/test/rgw/test_d4n_directory.cc | 91 ------- 6 files changed, 259 insertions(+), 441 deletions(-) diff --git a/src/rgw/driver/d4n/d4n_directory.cc b/src/rgw/driver/d4n/d4n_directory.cc index e5979bba8e5c3..169799097c005 100644 --- a/src/rgw/driver/d4n/d4n_directory.cc +++ b/src/rgw/driver/d4n/d4n_directory.cc @@ -157,29 +157,31 @@ void redis_exec_connection_pool(const DoutPrefixProvider* dpp, redis_exec_cp(redis_pool, ec, req, resp, y); } -int BucketDirectory::zadd(const DoutPrefixProvider* dpp, const std::string& bucket_id, double score, const std::string& member, optional_yield y, bool multi) +int BucketDirectory::zadd(const DoutPrefixProvider* dpp, const std::string& bucket_id, double score, const std::string& member, optional_yield y, Pipeline* pipeline) { try { boost::system::error_code ec; - request req; - req.push("ZADD", bucket_id, "CH", std::to_string(0), member); + if (pipeline && pipeline->is_pipeline()) { + request& req = pipeline->get_request(); + req.push("ZADD", bucket_id, "CH", std::to_string(0), member); + } else { + request req; + req.push("ZADD", bucket_id, "CH", std::to_string(0), member); response resp; redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); - if (ec) { - ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; - return -ec.value(); - } + if (ec) { + ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } - if (!multi) { if (std::get<0>(resp).value() != "1") { ldpp_dout(dpp, 10) << "BucketDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl; return -ENOENT; } } - } catch (std::exception &e) { ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; return -EINVAL; @@ -189,7 +191,7 @@ int BucketDirectory::zadd(const DoutPrefixProvider* dpp, const std::string& buck } -int BucketDirectory::zrem(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& member, optional_yield y, bool multi) +int BucketDirectory::zrem(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& member, optional_yield y) { try { boost::system::error_code ec; @@ -204,11 +206,9 @@ int BucketDirectory::zrem(const DoutPrefixProvider* dpp, const std::string& buck return -ec.value(); } - if (!multi) { - if (std::get<0>(resp).value() != "1") { - ldpp_dout(dpp, 10) << "BucketDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl; - return -ENOENT; - } + if (std::get<0>(resp).value() != "1") { + ldpp_dout(dpp, 10) << "BucketDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl; + return -ENOENT; } } catch (std::exception &e) { @@ -603,29 +603,37 @@ int ObjectDirectory::update_field(const DoutPrefixProvider* dpp, CacheObj* objec return ret; } -int ObjectDirectory::zadd(const DoutPrefixProvider* dpp, CacheObj* object, double score, const std::string& member, optional_yield y, bool multi) +int ObjectDirectory::zadd(const DoutPrefixProvider* dpp, CacheObj* object, double score, const std::string& member, optional_yield y, Pipeline* pipeline) { std::string key = build_index(object); try { boost::system::error_code ec; - request req; - req.push("ZADD", key, "CH", std::to_string(score), member); + if (pipeline && pipeline->is_pipeline()) { + request& req = pipeline->get_request(); + req.push("ZADD", key, "CH", std::to_string(score), member); + } else { + request req; + req.push("ZADD", key, "CH", std::to_string(score), member); - response resp; - redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); + response resp; + if(!redis_pool)[[unlikely]] + { + redis_exec(conn, ec, req, resp, y); + ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() Using connection: " << conn.get() << dendl; + } + else[[likely]] + redis_exec_cp(redis_pool, ec, req, resp, y); - if (ec) { - ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; - return -ec.value(); - } + if (ec) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } - if (!multi) { if (std::get<0>(resp).value() != "1") { ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl; return -ENOENT; } } - } catch (std::exception &e) { ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; return -EINVAL; @@ -692,7 +700,7 @@ int ObjectDirectory::zrevrange(const DoutPrefixProvider* dpp, CacheObj* object, return 0; } -int ObjectDirectory::zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y, bool multi) +int ObjectDirectory::zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y) { std::string key = build_index(object); try { @@ -708,11 +716,9 @@ int ObjectDirectory::zrem(const DoutPrefixProvider* dpp, CacheObj* object, const return -ec.value(); } - if (!multi) { - if (std::get<0>(resp).value() != "1") { - ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl; - return -ENOENT; - } + if (std::get<0>(resp).value() != "1") { + ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl; + return -ENOENT; } } catch (std::exception &e) { @@ -723,7 +729,7 @@ int ObjectDirectory::zrem(const DoutPrefixProvider* dpp, CacheObj* object, const return 0; } -int ObjectDirectory::zremrangebyscore(const DoutPrefixProvider* dpp, CacheObj* object, double min, double max, optional_yield y, bool multi) +int ObjectDirectory::zremrangebyscore(const DoutPrefixProvider* dpp, CacheObj* object, double min, double max, optional_yield y) { std::string key = build_index(object); try { @@ -739,11 +745,9 @@ int ObjectDirectory::zremrangebyscore(const DoutPrefixProvider* dpp, CacheObj* o return -ec.value(); } - if (!multi) { - if (std::get<0>(resp).value() == "0") { - ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "() No element removed!" << dendl; - return -ENOENT; - } + if (std::get<0>(resp).value() == "0") { + ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "() No element removed!" << dendl; + return -ENOENT; } } catch (std::exception &e) { @@ -836,52 +840,46 @@ int BlockDirectory::exist_key(const DoutPrefixProvider* dpp, CacheBlock* block, return std::get<0>(resp).value(); } -int BlockDirectory::set(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y) +template +int BlockDirectory::set_values(const DoutPrefixProvider* dpp, CacheBlock& block, Container& redisValues, optional_yield y) { - /* For existing keys, call get method beforehand. - Sets completely overwrite existing values. */ - std::string key = build_index(block); - ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): index is: " << key << dendl; - std::string hosts; - std::list redisValues; - /* Creating a redisValues of the entry's properties */ redisValues.push_back("blockID"); - redisValues.push_back(std::to_string(block->blockID)); + redisValues.push_back(std::to_string(block.blockID)); redisValues.push_back("version"); - redisValues.push_back(block->version); + redisValues.push_back(block.version); redisValues.push_back("deleteMarker"); int ret = -1; - if ((ret = check_bool(std::to_string(block->deleteMarker))) != -EINVAL) { - block->deleteMarker = (ret != 0); + if ((ret = check_bool(std::to_string(block.deleteMarker))) != -EINVAL) { + block.deleteMarker = (ret != 0); } else { ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: Invalid bool value for delete marker" << dendl; return -EINVAL; } - redisValues.push_back(std::to_string(block->deleteMarker)); + redisValues.push_back(std::to_string(block.deleteMarker)); redisValues.push_back("size"); - redisValues.push_back(std::to_string(block->size)); + redisValues.push_back(std::to_string(block.size)); redisValues.push_back("globalWeight"); - redisValues.push_back(std::to_string(block->globalWeight)); + redisValues.push_back(std::to_string(block.globalWeight)); redisValues.push_back("objName"); - redisValues.push_back(block->cacheObj.objName); + redisValues.push_back(block.cacheObj.objName); redisValues.push_back("bucketName"); - redisValues.push_back(block->cacheObj.bucketName); + redisValues.push_back(block.cacheObj.bucketName); redisValues.push_back("creationTime"); - redisValues.push_back(block->cacheObj.creationTime); + redisValues.push_back(block.cacheObj.creationTime); redisValues.push_back("dirty"); - if ((ret = check_bool(std::to_string(block->cacheObj.dirty))) != -EINVAL) { - block->cacheObj.dirty = (ret != 0); + if ((ret = check_bool(std::to_string(block.cacheObj.dirty))) != -EINVAL) { + block.cacheObj.dirty = (ret != 0); } else { ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: Invalid bool value" << dendl; return -EINVAL; } - redisValues.push_back(std::to_string(block->cacheObj.dirty)); + redisValues.push_back(std::to_string(block.cacheObj.dirty)); redisValues.push_back("hosts"); - + hosts.clear(); - for (auto const& host : block->cacheObj.hostsList) { + for (auto const& host : block.cacheObj.hostsList) { if (hosts.empty()) hosts = host + "_"; else @@ -893,23 +891,82 @@ int BlockDirectory::set(const DoutPrefixProvider* dpp, CacheBlock* block, option redisValues.push_back(hosts); redisValues.push_back("etag"); - redisValues.push_back(block->cacheObj.etag); + redisValues.push_back(block.cacheObj.etag); redisValues.push_back("objSize"); - redisValues.push_back(std::to_string(block->cacheObj.size)); + redisValues.push_back(std::to_string(block.cacheObj.size)); redisValues.push_back("userId"); - redisValues.push_back(block->cacheObj.user_id); + redisValues.push_back(block.cacheObj.user_id); redisValues.push_back("displayName"); - redisValues.push_back(block->cacheObj.display_name); + redisValues.push_back(block.cacheObj.display_name); + + return 0; +} + +int BlockDirectory::set(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y, Pipeline* pipeline) +{ + /* For existing keys, call get method beforehand. + Sets completely overwrite existing values. */ + std::string key = build_index(block); + ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): index is: " << key << dendl; + + std::vector redisValues; + + auto ret = set_values(dpp, *block, redisValues, y); + if (ret < 0) { + return ret; + } try { boost::system::error_code ec; response resp; - request req; + if (pipeline && pipeline->is_pipeline()) { + request& req = pipeline->get_request(); + req.push_range("HSET", key, redisValues); + } else { + request req; + req.push_range("HSET", key, redisValues); + + if(!redis_pool)[[unlikely]] + { + redis_exec(conn, ec, req, resp, y); + ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() Using connection: " << conn.get() << dendl; + } + else[[likely]] + redis_exec_cp(redis_pool, ec, req, resp, y); + if (ec) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + } + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + + return 0; +} + +int BlockDirectory::set(const DoutPrefixProvider* dpp, std::vector& blocks, optional_yield y) +{ + request req; + for (auto block : blocks) { + std::string key = build_index(&block); + ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): index is: " << key << dendl; + + //std::string hosts; + std::list redisValues; + auto ret = set_values(dpp, block, redisValues, y); + if (ret < 0) { + return ret; + } req.push_range("HSET", key, redisValues); + } - redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); - - if (ec) { + try { + boost::system::error_code ec; + boost::redis::generic_response resp; + redis_exec(conn, ec, req, resp, y); + if (ec) { ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; return -ec.value(); } @@ -979,7 +1036,7 @@ template int BlockDirectory::get(const DoutPrefixProvider* dpp, std::vector& blocks, optional_yield y) { request req; - redis_response<100, std::optional>>::type resp; + typename redis_response>>::type resp; for (auto block : blocks) { std::string key = build_index(&block); std::vector fields; @@ -1279,7 +1336,7 @@ int BlockDirectory::copy(const DoutPrefixProvider* dpp, CacheBlock* block, const } } -int BlockDirectory::del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y, bool multi) +int BlockDirectory::del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y) { std::string key = build_index(block); ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): index is: " << key << dendl; @@ -1288,17 +1345,17 @@ int BlockDirectory::del(const DoutPrefixProvider* dpp, CacheBlock* block, option boost::system::error_code ec; request req; req.push("DEL", key); - if (!multi) { - response resp; - redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); - - if (!std::get<0>(resp).value()) { - ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): No values deleted for key=" << key << dendl; - return -ENOENT; - } - } else { //if delete is called as part of a transaction, the command will be queued, hence the response will be a string - response resp; - redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); + response resp; + if(!redis_pool)[[unlikely]] + { + redis_exec(conn, ec, req, resp, y); + ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() Using connection: " << conn.get() << dendl; + } + else[[likely]] + redis_exec_cp(redis_pool, ec, req, resp, y); + if (!std::get<0>(resp).value()) { + ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): No values deleted for key=" << key << dendl; + return -ENOENT; } if (ec) { ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -1443,7 +1500,7 @@ int BlockDirectory::remove_host(const DoutPrefixProvider* dpp, CacheBlock* block return 0; } -int BlockDirectory::zadd(const DoutPrefixProvider* dpp, CacheBlock* block, double score, const std::string& member, optional_yield y, bool multi) +int BlockDirectory::zadd(const DoutPrefixProvider* dpp, CacheBlock* block, double score, const std::string& member, optional_yield y) { std::string key = build_index(block); try { @@ -1458,11 +1515,9 @@ int BlockDirectory::zadd(const DoutPrefixProvider* dpp, CacheBlock* block, doubl ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; return -ec.value(); } - if (!multi) { - if (std::get<0>(resp).value() != "1") { - ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl; - return -EINVAL; - } + if (std::get<0>(resp).value() != "1") { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl; + return -EINVAL; } } catch (std::exception &e) { @@ -1565,147 +1620,22 @@ int BlockDirectory::zrem(const DoutPrefixProvider* dpp, CacheBlock* block, const return 0; } -int BlockDirectory::watch(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y) -{ - std::string key = build_index(block); - try { - boost::system::error_code ec; - request req; - req.push("WATCH", key); - response resp; - - redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); - - if (ec) { - ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; - return -ec.value(); - } - - if (std::get<0>(resp).value() != "OK") { - ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl; - return -EINVAL; - } - - } catch (std::exception &e) { - ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; - return -EINVAL; - } - - return 0; -} - -int BlockDirectory::exec(const DoutPrefixProvider* dpp, std::vector& responses, optional_yield y) -{ - try { - boost::system::error_code ec; - request req; - req.push("EXEC"); - boost::redis::generic_response resp; - - redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); - - if (ec) { - ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; - std::cout << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << std::endl; - return -ec.value(); - } - - for (uint64_t i = 0; i < resp.value().size(); i++) { - ldpp_dout(dpp, 20) << "BlockDirectory::" << __func__ << "() MULTI: " << resp.value().front().value << dendl; - responses.emplace_back(resp.value().front().value); - boost::redis::consume_one(resp); - } - - } catch (std::exception &e) { - ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; - std::cout << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << std::endl; - return -EINVAL; - } - - return 0; -} - -int BlockDirectory::multi(const DoutPrefixProvider* dpp, optional_yield y) -{ - try { - boost::system::error_code ec; - request req; - req.push("MULTI"); - response resp; - - redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); - - if (ec) { - ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; - return -ec.value(); - } - - if (std::get<0>(resp).value() != "OK") { - ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl; - return -EINVAL; - } - - } catch (std::exception &e) { - ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; - return -EINVAL; - } - - return 0; -} - -int BlockDirectory::discard(const DoutPrefixProvider* dpp, optional_yield y) -{ - try { - boost::system::error_code ec; - request req; - req.push("DISCARD"); - response resp; - - redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); - - if (ec) { - ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; - return -ec.value(); - } - - if (std::get<0>(resp).value() != "OK") { - ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl; - return -EINVAL; - } - - } catch (std::exception &e) { - ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; - return -EINVAL; - } - - return 0; -} - -int BlockDirectory::unwatch(const DoutPrefixProvider* dpp, optional_yield y) +int Pipeline::execute(const DoutPrefixProvider* dpp, optional_yield y) { + boost::redis::generic_response resp; try { boost::system::error_code ec; - request req; - req.push("UNWATCH"); - response resp; - - redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); + pipeline_mode = false; + redis_exec(conn, ec, req, resp, y); if (ec) { - ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + ldpp_dout(dpp, 0) << "Directory::" << __func__ << "() ERROR: " << ec.what() << dendl; return -ec.value(); } - - if (std::get<0>(resp).value() != "OK") { - ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl; - return -EINVAL; - } - } catch (std::exception &e) { - ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + ldpp_dout(dpp, 0) << "Directory::" << __func__ << "() ERROR: " << e.what() << dendl; return -EINVAL; } - return 0; } diff --git a/src/rgw/driver/d4n/d4n_directory.h b/src/rgw/driver/d4n/d4n_directory.h index 7f281b338b3b2..df226f4c3fd8f 100644 --- a/src/rgw/driver/d4n/d4n_directory.h +++ b/src/rgw/driver/d4n/d4n_directory.h @@ -8,9 +8,15 @@ #include #include #include +#include namespace rgw { namespace d4n { +template + concept SeqContainer = requires(T& t, typename T::value_type v) { + t.push_back(v); + }; + using boost::redis::connection; class RedisPool { public: @@ -147,11 +153,26 @@ class Directory { Directory() {} }; +class Pipeline { + public: + Pipeline(std::shared_ptr& conn) : conn(conn) {} + void start() { pipeline_mode = true; } + //executes all commands and sets pipeline mode to false + int execute(const DoutPrefixProvider* dpp, optional_yield y); + bool is_pipeline() { return pipeline_mode; } + request& get_request() { return req; } + + private: + std::shared_ptr conn; + request req; + bool pipeline_mode{false}; +}; + class BucketDirectory: public Directory { public: BucketDirectory(std::shared_ptr& conn) : conn(conn) {} - int zadd(const DoutPrefixProvider* dpp, const std::string& bucket_id, double score, const std::string& member, optional_yield y, bool multi=false); - int zrem(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& member, optional_yield y, bool multi=false); + int zadd(const DoutPrefixProvider* dpp, const std::string& bucket_id, double score, const std::string& member, optional_yield y, Pipeline* pipeline=nullptr); + int zrem(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& member, optional_yield y); int zrange(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& start, const std::string& stop, uint64_t offset, uint64_t count, std::vector& members, optional_yield y); int zscan(const DoutPrefixProvider* dpp, const std::string& bucket_id, uint64_t cursor, const std::string& pattern, uint64_t count, std::vector& members, uint64_t next_cursor, optional_yield y); int zrank(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& member, uint64_t& rank, optional_yield y); @@ -171,11 +192,11 @@ class ObjectDirectory: public Directory { int copy(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& copyName, const std::string& copyBucketName, optional_yield y); int del(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y); int update_field(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& field, std::string& value, optional_yield y); - int zadd(const DoutPrefixProvider* dpp, CacheObj* object, double score, const std::string& member, optional_yield y, bool multi=false); + int zadd(const DoutPrefixProvider* dpp, CacheObj* object, double score, const std::string& member, optional_yield y, Pipeline* pipeline=nullptr); int zrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector& members, optional_yield y); int zrevrange(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& start, const std::string& stop, std::vector& members, optional_yield y); - int zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y, bool multi=false); - int zremrangebyscore(const DoutPrefixProvider* dpp, CacheObj* object, double min, double max, optional_yield y, bool multi=false); + int zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y); + int zremrangebyscore(const DoutPrefixProvider* dpp, CacheObj* object, double min, double max, optional_yield y); int zrank(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, std::string& index, optional_yield y); //Return value is the incremented value, else return error int incr(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y); @@ -192,7 +213,9 @@ class BlockDirectory: public Directory { int exist_key(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y); - int set(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y); + //Pipelined version of set + int set(const DoutPrefixProvider* dpp, std::vector& blocks, optional_yield y); + int set(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y, Pipeline* pipeline=nullptr); int get(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y); //Pipelined version of get using boost::redis::response for list bucket template @@ -200,23 +223,20 @@ class BlockDirectory: public Directory { //Pipelined version of get using boost::redis::generic_response int get(const DoutPrefixProvider* dpp, std::vector& blocks, optional_yield y); int copy(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& copyName, const std::string& copyBucketName, optional_yield y); - int del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y, bool multi=false); + int del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y); int update_field(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& field, std::string& value, optional_yield y); int remove_host(const DoutPrefixProvider* dpp, CacheBlock* block, std::string& value, optional_yield y); - int zadd(const DoutPrefixProvider* dpp, CacheBlock* block, double score, const std::string& member, optional_yield y, bool multi=false); + int zadd(const DoutPrefixProvider* dpp, CacheBlock* block, double score, const std::string& member, optional_yield y); int zrange(const DoutPrefixProvider* dpp, CacheBlock* block, int start, int stop, std::vector& members, optional_yield y); int zrevrange(const DoutPrefixProvider* dpp, CacheBlock* block, int start, int stop, std::vector& members, optional_yield y); int zrem(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& member, optional_yield y); - int watch(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y); - //Move MULTI, EXEC and DISCARD to directory? As they do not operate on a key - int exec(const DoutPrefixProvider* dpp, std::vector& responses, optional_yield y); - int multi(const DoutPrefixProvider* dpp, optional_yield y); - int discard(const DoutPrefixProvider* dpp, optional_yield y); - int unwatch(const DoutPrefixProvider* dpp, optional_yield y); private: std::shared_ptr conn; std::string build_index(CacheBlock* block); + + template + int set_values(const DoutPrefixProvider* dpp, CacheBlock& block, Container& redisValues, optional_yield y); }; } } // namespace rgw::d4n diff --git a/src/rgw/driver/d4n/d4n_policy.cc b/src/rgw/driver/d4n/d4n_policy.cc index 3fc73f27579a7..d2d9834c3f078 100644 --- a/src/rgw/driver/d4n/d4n_policy.cc +++ b/src/rgw/driver/d4n/d4n_policy.cc @@ -950,9 +950,8 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp) } } //delete entry from ordered set of objects, as older versions would have been written to the backend store - ret = bucketDir->zrem(dpp, e->bucket_id, c_obj->get_name(), y, true); + ret = bucketDir->zrem(dpp, e->bucket_id, c_obj->get_name(), y); if (ret < 0) { - blockDir->discard(dpp, y); ldpp_dout(dpp, 0) << __func__ << "(): Failed to queue zrem for object entry: " << c_obj->get_name() << ", ret=" << ret << dendl; continue; } @@ -962,7 +961,7 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp) .objName = c_obj->get_name(), .bucketName = c_obj->get_bucket()->get_bucket_id(), }; - ret = objDir->zremrangebyscore(dpp, &dir_obj, e->creationTime, e->creationTime, y, true); + ret = objDir->zremrangebyscore(dpp, &dir_obj, e->creationTime, e->creationTime, y); if (ret < 0) { ldpp_dout(dpp, 0) << __func__ << "(): Failed to remove object from ordered set with error: " << ret << dendl; continue; diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.cc b/src/rgw/driver/d4n/rgw_sal_d4n.cc index dafd2d9579ff4..61d55a4c10986 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.cc +++ b/src/rgw/driver/d4n/rgw_sal_d4n.cc @@ -1200,7 +1200,10 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, std:: //dirty objects if (dirty) { - auto ret = blockDir->set(dpp, &block, y); + auto redis_conn = this->driver->get_conn(); + rgw::d4n::Pipeline p = rgw::d4n::Pipeline(redis_conn); + p.start(); + auto ret = blockDir->set(dpp, &block, y, &p); if (ret < 0) { ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl; return ret; @@ -1210,7 +1213,7 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, std:: delete-obj with "null" as version-id deletes the latest version */ if (!(this->get_bucket()->versioned())) { block.cacheObj.objName = "_:null_" + this->get_name(); - ret = blockDir->set(dpp, &block, y); + ret = blockDir->set(dpp, &block, y, &p); if (ret < 0) { ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for null head object with ret: " << ret << dendl; return ret; @@ -1227,37 +1230,41 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, std:: auto score = ceph::real_clock::to_double(mtime); ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Score of object name: "<< this->get_name() << " version: " << object_version << " is: " << score << ret << dendl; rgw::d4n::ObjectDirectory* objDir = this->driver->get_obj_dir(); - ret = objDir->zadd(dpp, &object, score, object_version, y); + ret = objDir->zadd(dpp, &object, score, object_version, y, &p); if (ret < 0) { ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Failed to add version to ordered set with error: " << ret << dendl; - blockDir->discard(dpp, y); return ret; } //add an entry to ordered set containing objects for bucket listing, set score to 0 always to lexicographically order the objects rgw::d4n::BucketDirectory* bucketDir = this->driver->get_bucket_dir(); - ret = bucketDir->zadd(dpp, this->get_bucket()->get_bucket_id(), 0, this->get_name(), y, true); + ret = bucketDir->zadd(dpp, this->get_bucket()->get_bucket_id(), 0, this->get_name(), y, &p); if (ret < 0) { ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Failed to add object to ordered set with error: " << ret << dendl; return ret; } + p.execute(dpp, y); } else { //for clean/non-dirty objects rgw::d4n::CacheBlock latest = block; auto ret = blockDir->get(dpp, &latest, y); if (ret == -ENOENT) { if (!(this->get_bucket()->versioned())) { + auto redis_conn = this->driver->get_conn(); + rgw::d4n::Pipeline p = rgw::d4n::Pipeline(redis_conn); + p.start(); //we can explore pipelining to send the two 'HSET' commands together - ret = blockDir->set(dpp, &block, y); + ret = blockDir->set(dpp, &block, y, &p); if (ret < 0) { ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl; return ret; } //bucket is non versioned, set a null instance block.cacheObj.objName = "_:null_" + this->get_name(); - ret = blockDir->set(dpp, &block, y); + ret = blockDir->set(dpp, &block, y, &p); if (ret < 0) { ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for null head object with ret: " << ret << dendl; return ret; } + p.execute(dpp, y); } } else if (ret < 0) { ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory get method failed for head object with ret: " << ret << dendl; @@ -1274,18 +1281,22 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, std:: /* even if the head block is found, overwrite existing values with new version in case of non-versioned bucket, clean objects and versioned and non-versioned buckets dirty objects */ if (!(this->get_bucket()->versioned())) { - ret = blockDir->set(dpp, &block, y); + auto redis_conn = this->driver->get_conn(); + rgw::d4n::Pipeline p = rgw::d4n::Pipeline(redis_conn); + p.start(); + ret = blockDir->set(dpp, &block, y, &p); if (ret < 0) { ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl; return ret; } //bucket is non versioned, set a null instance block.cacheObj.objName = "_:null_" + this->get_name(); - ret = blockDir->set(dpp, &block, y); + ret = blockDir->set(dpp, &block, y, &p); if (ret < 0) { ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for null head object with ret: " << ret << dendl; return ret; } + p.execute(dpp, y); }//end-if !(this->get_bucket()->versioned()) } //end-if ret = 0 } //end-else @@ -1339,8 +1350,9 @@ int D4NFilterObject::set_data_block_dir_entries(const DoutPrefixProvider* dpp, o off_t lst = this->get_size(); ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Object size =" << lst << dendl; off_t fst = 0; + std::vector blocks; do { - rgw::d4n::CacheBlock block, existing_block; + rgw::d4n::CacheBlock block; if (fst >= lst){ break; } @@ -1348,40 +1360,29 @@ int D4NFilterObject::set_data_block_dir_entries(const DoutPrefixProvider* dpp, o off_t cur_len = cur_size - fst; block.cacheObj.bucketName = this->get_bucket()->get_bucket_id(); block.cacheObj.objName = this->get_key().get_oid(); - block.cacheObj.dirty = dirty; - block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address); - existing_block.cacheObj.objName = block.cacheObj.objName; - existing_block.cacheObj.bucketName = block.cacheObj.bucketName; - block.size = cur_len; block.blockID = fst; - block.version = version; - - /* Store block in directory */ - existing_block.blockID = block.blockID; - existing_block.size = block.size; - - int ret; - if ((ret = blockDir->get(dpp, &existing_block, y)) == 0 || ret == -ENOENT) { - if (ret == 0) { //new versioned block will have new version, hostsList etc, how about globalWeight? - block = existing_block; - block.version = version; - block.cacheObj.dirty = dirty; - } - block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address); - - if ((ret = blockDir->set(dpp, &block, y)) < 0) { - ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory set() method failed, ret=" << ret << dendl; - return ret; - } - } else { - ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << ", ret=" << ret << dendl; - return ret; - } fst += cur_len; + blocks.emplace_back(block); } while(fst < lst); + auto ret = blockDir->get(dpp, blocks, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory pipelined get() method failed, ret=" << ret << dendl; + return ret; + } + + for (auto& block : blocks) { + block.cacheObj.dirty = dirty; + block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address); + block.version = version; + } + if ((ret = blockDir->set(dpp, blocks, y)) < 0) { + ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory pipelined set() method failed, ret=" << ret << dendl; + return ret; + } + return 0; } @@ -2170,7 +2171,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl std::string prefix = source->get_prefix(); std::string dest_prefix; - rgw::d4n::CacheBlock block, existing_block, dest_block; + rgw::d4n::CacheBlock block, dest_block; rgw::d4n::BlockDirectory* blockDir = source->driver->get_block_dir(); auto policy = filter->get_policy_driver()->get_cache_policy(); auto cache_driver = filter->get_cache_driver(); @@ -2178,8 +2179,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl block.cacheObj.bucketName = source->get_bucket()->get_bucket_id(); std::stringstream s; block.cacheObj.creationTime = std::to_string(ceph::real_clock::to_double(source->get_mtime())); - bool dirty = block.cacheObj.dirty = false; //Reading from the backend, data is clean - block.version = version; + bool dirty = false; //Reading from the backend, data is clean if (source->dest_object && source->dest_bucket) { D4NFilterObject* d4n_dest_object = dynamic_cast(source->dest_object); @@ -2193,10 +2193,6 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl dest_block.version = dest_version; } - //populating fields needed for building directory index - existing_block.cacheObj.objName = block.cacheObj.objName; - existing_block.cacheObj.bucketName = block.cacheObj.bucketName; - ldpp_dout(dpp, 20) << __func__ << ": version stored in update method is: " << version << dendl; if (bl.length() > 0 && last_part) { // if bl = bl_rem has data and this is the last part, write it to cache @@ -2211,24 +2207,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl if (ret == 0) { std::string objEtag = ""; policy->update(dpp, oid, adjusted_start_ofs, bl.length(), version, dirty, rgw::d4n::RefCount::NOOP, *y); - - /* Store block in directory */ - existing_block.blockID = block.blockID; - existing_block.size = block.size; - - if ((ret = blockDir->get(dpp, &existing_block, *y)) == 0 || ret == -ENOENT) { - if (ret == 0) { //new versioned block will have new version, hostsList etc, how about globalWeight? - block = existing_block; - block.version = version; - } - - block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address); - - if ((ret = blockDir->set(dpp, &block, *y)) < 0) - ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set() method failed, ret=" << ret << dendl; - } else { //end -if blockDir->get - ldpp_dout(dpp, 20) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << ", ret=" << ret << dendl; - } + blocks.emplace_back(block); } else { ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed, ret=" << ret << dendl; } @@ -2245,9 +2224,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl ret = cache_driver->put(dpp, dest_oid, bl, bl.length(), attrs, *y); if (ret == 0) { policy->update(dpp, dest_oid, adjusted_start_ofs, bl.length(), dest_version, dirty, rgw::d4n::RefCount::NOOP, *y); - if (ret = blockDir->set(dpp, &dest_block, *y); ret < 0) { - ldpp_dout(dpp, 20) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB:: " << __func__ << " BlockDirectory set failed with ret: " << ret << dendl; - } + dest_blocks.emplace_back(dest_block); } } } @@ -2261,24 +2238,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl ret = cache_driver->put(dpp, oid, bl, bl.length(), attrs, *y); if (ret == 0) { policy->update(dpp, oid, adjusted_start_ofs, bl.length(), version, dirty, rgw::d4n::RefCount::NOOP, *y); - - /* Store block in directory */ - existing_block.blockID = block.blockID; - existing_block.size = block.size; - - if ((ret = blockDir->get(dpp, &existing_block, *y)) == 0 || ret == -ENOENT) { - if (ret == 0) { //new versioned block will have new version, hostsList etc, how about globalWeight? - block = existing_block; - block.version = version; - } - - block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address); - - if ((ret = blockDir->set(dpp, &block, *y)) < 0) - ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set() method failed, ret=" << ret << dendl; - } else { - ldpp_dout(dpp, 20) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << ", ret=" << ret << dendl; - } + blocks.emplace_back(block); } else { ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed, ret=" << ret << dendl; } @@ -2295,9 +2255,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl ret = cache_driver->put(dpp, dest_oid, bl, bl.length(), attrs, *y); if (ret == 0) { policy->update(dpp, dest_oid, adjusted_start_ofs, bl.length(), dest_version, dirty, rgw::d4n::RefCount::NOOP, *y); - if (ret = blockDir->set(dpp, &dest_block, *y); ret < 0) { - ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " BlockDirectory set failed with ret: " << ret << dendl; - } + dest_blocks.emplace_back(dest_block); } } } @@ -2321,24 +2279,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl ret = cache_driver->put(dpp, oid, bl_rem, bl_rem.length(), attrs, *y); if (ret == 0) { policy->update(dpp, oid, adjusted_start_ofs, bl_rem.length(), version, dirty, rgw::d4n::RefCount::NOOP, *y); - - /* Store block in directory */ - existing_block.blockID = block.blockID; - existing_block.size = block.size; - - if ((ret = blockDir->get(dpp, &existing_block, *y)) == 0 || ret == -ENOENT) { - if (ret == 0) { //new versioned block will have new version, hostsList etc, how about globalWeight? - block = existing_block; - block.version = version; - } - - block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address); - - if ((ret = blockDir->set(dpp, &block, *y)) < 0) - ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set() method failed, ret=" << ret << dendl; - } else { - ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << ", ret=" << ret << dendl; - } + blocks.emplace_back(block); } else { ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed, ret=" << ret << dendl; } @@ -2358,9 +2299,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl ret = cache_driver->put(dpp, dest_oid, bl_rem, bl_rem.length(), attrs, *y); if (ret == 0) { policy->update(dpp, dest_oid, adjusted_start_ofs, bl_rem.length(), dest_version, dirty, rgw::d4n::RefCount::NOOP, *y); - if (ret = blockDir->set(dpp, &dest_block, *y); ret < 0) { - ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " BlockDirectory set failed with ret: " << ret << dendl; - } + dest_blocks.emplace_back(dest_block); } } } @@ -2369,7 +2308,27 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl bl_rem = std::move(bl); }//bl_rem.length() } - } + if (last_part) { + auto ret = blockDir->get(dpp, blocks, *y); + if (ret < 0) { + ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory pipelined get() method failed, ret=" << ret << dendl; + } + + for (auto& block : blocks) { + block.cacheObj.dirty = false; + block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address); + block.version = version; + } + if ((ret = blockDir->set(dpp, blocks, *y)) < 0) { + ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory pipelined set() method failed, ret=" << ret << dendl; + } + if (source->dest_object && source->dest_bucket) { + if ((ret = blockDir->set(dpp, dest_blocks, *y)) < 0) { + ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory pipelined set() method for dest blocks failed, ret=" << ret << dendl; + } + } + }// if last_part + }//if write_to_cache /* Clean-up: 1. do we need to clean up keys belonging to older versions (the last blocks), in case the size of newer version is different @@ -2520,7 +2479,7 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp return ret; } //delete entry from ordered set of objects - ret = bucketDir->zrem(dpp, source->get_bucket()->get_bucket_id(), source->get_name(), y, true); + ret = bucketDir->zrem(dpp, source->get_bucket()->get_bucket_id(), source->get_name(), y); if (ret < 0) { ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue zrem request in bucket directory for: " << source->get_name() << ", ret=" << ret << dendl; return ret; @@ -2535,7 +2494,7 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp //delete entry from ordered set of versions std::string version = source->get_instance(); ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): Version to be deleted is: " << version << dendl; - ret = objDir->zrem(dpp, &dir_obj, version, y, true); + ret = objDir->zrem(dpp, &dir_obj, version, y); if (ret < 0) { ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue zrem request in object directory for: " << source->get_name() << ", ret=" << ret << dendl; return ret; @@ -2586,16 +2545,15 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp .bucketName = source->get_bucket()->get_bucket_id(), }; //delete entry from ordered set of object versions - ret = objDir->zrem(dpp, &dir_obj, "null", y, true); + ret = objDir->zrem(dpp, &dir_obj, "null", y); if (ret < 0) { ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue zrem request in object directory for: " << source->get_name() << ", ret=" << ret << dendl; return ret; } //delete entry from ordered set of objects - ret = bucketDir->zrem(dpp, source->get_bucket()->get_bucket_id(), source->get_name(), y, true); + ret = bucketDir->zrem(dpp, source->get_bucket()->get_bucket_id(), source->get_name(), y); if (ret < 0) { ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue zrem request in bucket directory for: " << source->get_name() << ", ret=" << ret << dendl; - blockDir->discard(dpp, y); return ret; } } diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.h b/src/rgw/driver/d4n/rgw_sal_d4n.h index 11325cb6858ac..1c823688dd8d1 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.h +++ b/src/rgw/driver/d4n/rgw_sal_d4n.h @@ -89,6 +89,7 @@ class D4NFilterDriver : public FilterDriver { rgw::d4n::BucketDirectory* get_bucket_dir() { return bucketDir.get(); } rgw::d4n::PolicyDriver* get_policy_driver() { return policyDriver.get(); } void save_y(optional_yield y) { this->y = y; } + std::shared_ptr get_conn() { return conn; } void shutdown() override; }; @@ -161,6 +162,7 @@ class D4NFilterObject : public FilterObject { optional_yield* y; int part_num{0}, num_parts{0}; int len_sent = 0; + std::vector blocks, dest_blocks; public: D4NFilterGetCB(D4NFilterDriver* _filter, D4NFilterObject* _source) : filter(_filter), diff --git a/src/test/rgw/test_d4n_directory.cc b/src/test/rgw/test_d4n_directory.cc index 2778abe9ab115..2f7c436628e65 100644 --- a/src/test/rgw/test_d4n_directory.cc +++ b/src/test/rgw/test_d4n_directory.cc @@ -678,97 +678,6 @@ TEST_F(BlockDirectoryFixture, IncrYield) } } -TEST_F(BlockDirectoryFixture, MultiExecuteYield) -{ - boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { - { - boost::system::error_code ec; - { - request req; - response resp; - req.push("MULTI"); // Start transaction - conn->async_exec(req, resp, yield[ec]); - ASSERT_EQ((bool)ec, false); - std::cout << "MULTI value: " << std::get<0>(resp).value() << std::endl; - } - { - request req; - response resp; - req.push("SET", "key1", "value1"); // Command 1 - conn->async_exec(req, resp, yield[ec]); - ASSERT_EQ((bool)ec, false); - std::cout << "SET value: " << std::get<0>(resp).value() << std::endl; - } - { - request req; - response resp; - req.push("SET", "key2", "value2"); // Command 2 - conn->async_exec(req, resp, yield[ec]); - ASSERT_EQ((bool)ec, false); - std::cout << "SET value: " << std::get<0>(resp).value() << std::endl; - } - { - request req; - response resp; - req.push("ZADD", "key4", "1", "v1"); // Command 3 - conn->async_exec(req, resp, yield[ec]); - ASSERT_EQ((bool)ec, false); - std::cout << "ZADD value: " << std::get<0>(resp).value() << std::endl; - } - { - request req; - /* string as response here as the command is only getting queued, not executed - if response type is changed to int then the operation fails */ - response resp; - req.push("DEL", "key3"); // Command 4 - conn->async_exec(req, resp, yield[ec]); - ASSERT_EQ((bool)ec, false); - std::cout << "DEL value: " << std::get<0>(resp).value() << std::endl; - } - { - request req; - req.push("EXEC"); // Execute transaction - - boost::redis::generic_response resp; - conn->async_exec(req, resp, yield[ec]); - ASSERT_EQ((bool)ec, false); - for (uint64_t i = 0; i < resp.value().size(); i++) { - std::cout << "EXEC: " << resp.value().front().value << std::endl; - boost::redis::consume_one(resp); - } - } - } - //test multi/exec using directory methods - { - ASSERT_EQ(0, dir->multi(env->dpp, optional_yield{yield})); - ASSERT_EQ(0, dir->set(env->dpp, block, yield)); - block->cacheObj.objName = "testBlockNew"; - ASSERT_EQ(0, dir->set(env->dpp, block, yield)); - block->cacheObj.objName = "testBlockA"; - ASSERT_EQ(0, dir->del(env->dpp, block, yield, true)); - block->cacheObj.objName = "testBlockB"; - ASSERT_EQ(0, dir->zadd(env->dpp, block, 100, "version1", yield, true)); - std::vector responses; - ASSERT_EQ(0, dir->exec(env->dpp, responses, optional_yield{yield})); - for (auto r : responses) { - std::cout << "EXEC: " << r << std::endl; - } - } - { - boost::system::error_code ec; - request req; - req.push("FLUSHALL"); - response resp; - - conn->async_exec(req, resp, yield[ec]); - } - - conn->cancel(); - }, rethrow); - - io.run(); -} - TEST_F(BlockDirectoryFixture, ZScan) { boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { -- 2.39.5