redis_exec_cp(redis_pool, ec, req, resp, y);
}
-int BucketDirectory::zadd(const DoutPrefixProvider* dpp, const std::string& bucket_id, double score, const std::string& member, optional_yield y, bool multi)
+int BucketDirectory::zadd(const DoutPrefixProvider* dpp, const std::string& bucket_id, double score, const std::string& member, optional_yield y, Pipeline* pipeline)
{
try {
boost::system::error_code ec;
- request req;
- req.push("ZADD", bucket_id, "CH", std::to_string(0), member);
+ if (pipeline && pipeline->is_pipeline()) {
+ request& req = pipeline->get_request();
+ req.push("ZADD", bucket_id, "CH", std::to_string(0), member);
+ } else {
+ request req;
+ req.push("ZADD", bucket_id, "CH", std::to_string(0), member);
response<std::string> resp;
redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
- if (ec) {
- ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
- return -ec.value();
- }
+ if (ec) {
+ ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
+ }
- if (!multi) {
if (std::get<0>(resp).value() != "1") {
ldpp_dout(dpp, 10) << "BucketDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl;
return -ENOENT;
}
}
-
} catch (std::exception &e) {
ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
return -EINVAL;
}
-int BucketDirectory::zrem(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& member, optional_yield y, bool multi)
+int BucketDirectory::zrem(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& member, optional_yield y)
{
try {
boost::system::error_code ec;
return -ec.value();
}
- if (!multi) {
- if (std::get<0>(resp).value() != "1") {
- ldpp_dout(dpp, 10) << "BucketDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
- return -ENOENT;
- }
+ if (std::get<0>(resp).value() != "1") {
+ ldpp_dout(dpp, 10) << "BucketDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
+ return -ENOENT;
}
} catch (std::exception &e) {
return ret;
}
-int ObjectDirectory::zadd(const DoutPrefixProvider* dpp, CacheObj* object, double score, const std::string& member, optional_yield y, bool multi)
+int ObjectDirectory::zadd(const DoutPrefixProvider* dpp, CacheObj* object, double score, const std::string& member, optional_yield y, Pipeline* pipeline)
{
std::string key = build_index(object);
try {
boost::system::error_code ec;
- request req;
- req.push("ZADD", key, "CH", std::to_string(score), member);
+ if (pipeline && pipeline->is_pipeline()) {
+ request& req = pipeline->get_request();
+ req.push("ZADD", key, "CH", std::to_string(score), member);
+ } else {
+ request req;
+ req.push("ZADD", key, "CH", std::to_string(score), member);
- response<std::string> resp;
- redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
+ response<std::string> resp;
+ if(!redis_pool)[[unlikely]]
+ {
+ redis_exec(conn, ec, req, resp, y);
+ ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() Using connection: " << conn.get() << dendl;
+ }
+ else[[likely]]
+ redis_exec_cp(redis_pool, ec, req, resp, y);
- if (ec) {
- ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
- return -ec.value();
- }
+ if (ec) {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
+ }
- if (!multi) {
if (std::get<0>(resp).value() != "1") {
ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl;
return -ENOENT;
}
}
-
} catch (std::exception &e) {
ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
return -EINVAL;
return 0;
}
-int ObjectDirectory::zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y, bool multi)
+int ObjectDirectory::zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y)
{
std::string key = build_index(object);
try {
return -ec.value();
}
- if (!multi) {
- if (std::get<0>(resp).value() != "1") {
- ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
- return -ENOENT;
- }
+ if (std::get<0>(resp).value() != "1") {
+ ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
+ return -ENOENT;
}
} catch (std::exception &e) {
return 0;
}
-int ObjectDirectory::zremrangebyscore(const DoutPrefixProvider* dpp, CacheObj* object, double min, double max, optional_yield y, bool multi)
+int ObjectDirectory::zremrangebyscore(const DoutPrefixProvider* dpp, CacheObj* object, double min, double max, optional_yield y)
{
std::string key = build_index(object);
try {
return -ec.value();
}
- if (!multi) {
- if (std::get<0>(resp).value() == "0") {
- ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "() No element removed!" << dendl;
- return -ENOENT;
- }
+ if (std::get<0>(resp).value() == "0") {
+ ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "() No element removed!" << dendl;
+ return -ENOENT;
}
} catch (std::exception &e) {
return std::get<0>(resp).value();
}
-int BlockDirectory::set(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y)
+template<SeqContainer Container>
+int BlockDirectory::set_values(const DoutPrefixProvider* dpp, CacheBlock& block, Container& redisValues, optional_yield y)
{
- /* For existing keys, call get method beforehand.
- Sets completely overwrite existing values. */
- std::string key = build_index(block);
- ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): index is: " << key << dendl;
-
std::string hosts;
- std::list<std::string> redisValues;
-
/* Creating a redisValues of the entry's properties */
redisValues.push_back("blockID");
- redisValues.push_back(std::to_string(block->blockID));
+ redisValues.push_back(std::to_string(block.blockID));
redisValues.push_back("version");
- redisValues.push_back(block->version);
+ redisValues.push_back(block.version);
redisValues.push_back("deleteMarker");
int ret = -1;
- if ((ret = check_bool(std::to_string(block->deleteMarker))) != -EINVAL) {
- block->deleteMarker = (ret != 0);
+ if ((ret = check_bool(std::to_string(block.deleteMarker))) != -EINVAL) {
+ block.deleteMarker = (ret != 0);
} else {
ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: Invalid bool value for delete marker" << dendl;
return -EINVAL;
}
- redisValues.push_back(std::to_string(block->deleteMarker));
+ redisValues.push_back(std::to_string(block.deleteMarker));
redisValues.push_back("size");
- redisValues.push_back(std::to_string(block->size));
+ redisValues.push_back(std::to_string(block.size));
redisValues.push_back("globalWeight");
- redisValues.push_back(std::to_string(block->globalWeight));
+ redisValues.push_back(std::to_string(block.globalWeight));
redisValues.push_back("objName");
- redisValues.push_back(block->cacheObj.objName);
+ redisValues.push_back(block.cacheObj.objName);
redisValues.push_back("bucketName");
- redisValues.push_back(block->cacheObj.bucketName);
+ redisValues.push_back(block.cacheObj.bucketName);
redisValues.push_back("creationTime");
- redisValues.push_back(block->cacheObj.creationTime);
+ redisValues.push_back(block.cacheObj.creationTime);
redisValues.push_back("dirty");
- if ((ret = check_bool(std::to_string(block->cacheObj.dirty))) != -EINVAL) {
- block->cacheObj.dirty = (ret != 0);
+ if ((ret = check_bool(std::to_string(block.cacheObj.dirty))) != -EINVAL) {
+ block.cacheObj.dirty = (ret != 0);
} else {
ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: Invalid bool value" << dendl;
return -EINVAL;
}
- redisValues.push_back(std::to_string(block->cacheObj.dirty));
+ redisValues.push_back(std::to_string(block.cacheObj.dirty));
redisValues.push_back("hosts");
-
+
hosts.clear();
- for (auto const& host : block->cacheObj.hostsList) {
+ for (auto const& host : block.cacheObj.hostsList) {
if (hosts.empty())
hosts = host + "_";
else
redisValues.push_back(hosts);
redisValues.push_back("etag");
- redisValues.push_back(block->cacheObj.etag);
+ redisValues.push_back(block.cacheObj.etag);
redisValues.push_back("objSize");
- redisValues.push_back(std::to_string(block->cacheObj.size));
+ redisValues.push_back(std::to_string(block.cacheObj.size));
redisValues.push_back("userId");
- redisValues.push_back(block->cacheObj.user_id);
+ redisValues.push_back(block.cacheObj.user_id);
redisValues.push_back("displayName");
- redisValues.push_back(block->cacheObj.display_name);
+ redisValues.push_back(block.cacheObj.display_name);
+
+ return 0;
+}
+
+int BlockDirectory::set(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y, Pipeline* pipeline)
+{
+ /* For existing keys, call get method beforehand.
+ Sets completely overwrite existing values. */
+ std::string key = build_index(block);
+ ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): index is: " << key << dendl;
+
+ std::vector<std::string> redisValues;
+
+ auto ret = set_values(dpp, *block, redisValues, y);
+ if (ret < 0) {
+ return ret;
+ }
try {
boost::system::error_code ec;
response<ignore_t> resp;
- request req;
+ if (pipeline && pipeline->is_pipeline()) {
+ request& req = pipeline->get_request();
+ req.push_range("HSET", key, redisValues);
+ } else {
+ request req;
+ req.push_range("HSET", key, redisValues);
+
+ if(!redis_pool)[[unlikely]]
+ {
+ redis_exec(conn, ec, req, resp, y);
+ ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() Using connection: " << conn.get() << dendl;
+ }
+ else[[likely]]
+ redis_exec_cp(redis_pool, ec, req, resp, y);
+ if (ec) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
+ }
+ }
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+int BlockDirectory::set(const DoutPrefixProvider* dpp, std::vector<CacheBlock>& blocks, optional_yield y)
+{
+ request req;
+ for (auto block : blocks) {
+ std::string key = build_index(&block);
+ ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): index is: " << key << dendl;
+
+ //std::string hosts;
+ std::list<std::string> redisValues;
+ auto ret = set_values(dpp, block, redisValues, y);
+ if (ret < 0) {
+ return ret;
+ }
req.push_range("HSET", key, redisValues);
+ }
- redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
-
- if (ec) {
+ try {
+ boost::system::error_code ec;
+ boost::redis::generic_response resp;
+ redis_exec(conn, ec, req, resp, y);
+ if (ec) {
ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
return -ec.value();
}
int BlockDirectory::get(const DoutPrefixProvider* dpp, std::vector<CacheBlock>& blocks, optional_yield y)
{
request req;
- redis_response<100, std::optional<std::vector<std::string>>>::type resp;
+ typename redis_response<N, std::optional<std::vector<std::string>>>::type resp;
for (auto block : blocks) {
std::string key = build_index(&block);
std::vector<std::string> fields;
}
}
-int BlockDirectory::del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y, bool multi)
+int BlockDirectory::del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y)
{
std::string key = build_index(block);
ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): index is: " << key << dendl;
boost::system::error_code ec;
request req;
req.push("DEL", key);
- if (!multi) {
- response<int> resp;
- redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
-
- if (!std::get<0>(resp).value()) {
- ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): No values deleted for key=" << key << dendl;
- return -ENOENT;
- }
- } else { //if delete is called as part of a transaction, the command will be queued, hence the response will be a string
- response<std::string> resp;
- redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
+ response<int> resp;
+ if(!redis_pool)[[unlikely]]
+ {
+ redis_exec(conn, ec, req, resp, y);
+ ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() Using connection: " << conn.get() << dendl;
+ }
+ else[[likely]]
+ redis_exec_cp(redis_pool, ec, req, resp, y);
+ if (!std::get<0>(resp).value()) {
+ ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): No values deleted for key=" << key << dendl;
+ return -ENOENT;
}
if (ec) {
ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
return 0;
}
-int BlockDirectory::zadd(const DoutPrefixProvider* dpp, CacheBlock* block, double score, const std::string& member, optional_yield y, bool multi)
+int BlockDirectory::zadd(const DoutPrefixProvider* dpp, CacheBlock* block, double score, const std::string& member, optional_yield y)
{
std::string key = build_index(block);
try {
ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
return -ec.value();
}
- if (!multi) {
- if (std::get<0>(resp).value() != "1") {
- ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl;
- return -EINVAL;
- }
+ if (std::get<0>(resp).value() != "1") {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl;
+ return -EINVAL;
}
} catch (std::exception &e) {
return 0;
}
-int BlockDirectory::watch(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y)
-{
- std::string key = build_index(block);
- try {
- boost::system::error_code ec;
- request req;
- req.push("WATCH", key);
- response<std::string> resp;
-
- redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
-
- if (ec) {
- ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
- return -ec.value();
- }
-
- if (std::get<0>(resp).value() != "OK") {
- ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
- return -EINVAL;
- }
-
- } catch (std::exception &e) {
- ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
- return -EINVAL;
- }
-
- return 0;
-}
-
-int BlockDirectory::exec(const DoutPrefixProvider* dpp, std::vector<std::string>& responses, optional_yield y)
-{
- try {
- boost::system::error_code ec;
- request req;
- req.push("EXEC");
- boost::redis::generic_response resp;
-
- redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
-
- if (ec) {
- ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
- std::cout << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << std::endl;
- return -ec.value();
- }
-
- for (uint64_t i = 0; i < resp.value().size(); i++) {
- ldpp_dout(dpp, 20) << "BlockDirectory::" << __func__ << "() MULTI: " << resp.value().front().value << dendl;
- responses.emplace_back(resp.value().front().value);
- boost::redis::consume_one(resp);
- }
-
- } catch (std::exception &e) {
- ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
- std::cout << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << std::endl;
- return -EINVAL;
- }
-
- return 0;
-}
-
-int BlockDirectory::multi(const DoutPrefixProvider* dpp, optional_yield y)
-{
- try {
- boost::system::error_code ec;
- request req;
- req.push("MULTI");
- response<std::string> resp;
-
- redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
-
- if (ec) {
- ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
- return -ec.value();
- }
-
- if (std::get<0>(resp).value() != "OK") {
- ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
- return -EINVAL;
- }
-
- } catch (std::exception &e) {
- ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
- return -EINVAL;
- }
-
- return 0;
-}
-
-int BlockDirectory::discard(const DoutPrefixProvider* dpp, optional_yield y)
-{
- try {
- boost::system::error_code ec;
- request req;
- req.push("DISCARD");
- response<std::string> resp;
-
- redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
-
- if (ec) {
- ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
- return -ec.value();
- }
-
- if (std::get<0>(resp).value() != "OK") {
- ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
- return -EINVAL;
- }
-
- } catch (std::exception &e) {
- ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
- return -EINVAL;
- }
-
- return 0;
-}
-
-int BlockDirectory::unwatch(const DoutPrefixProvider* dpp, optional_yield y)
+int Pipeline::execute(const DoutPrefixProvider* dpp, optional_yield y)
{
+ boost::redis::generic_response resp;
try {
boost::system::error_code ec;
- request req;
- req.push("UNWATCH");
- response<std::string> resp;
-
- redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
+ pipeline_mode = false;
+ redis_exec(conn, ec, req, resp, y);
if (ec) {
- ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ ldpp_dout(dpp, 0) << "Directory::" << __func__ << "() ERROR: " << ec.what() << dendl;
return -ec.value();
}
-
- if (std::get<0>(resp).value() != "OK") {
- ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
- return -EINVAL;
- }
-
} catch (std::exception &e) {
- ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ ldpp_dout(dpp, 0) << "Directory::" << __func__ << "() ERROR: " << e.what() << dendl;
return -EINVAL;
}
-
return 0;
}
#include <condition_variable>
#include <deque>
#include <memory>
+#include <concepts>
namespace rgw { namespace d4n {
+template<typename T>
+ concept SeqContainer = requires(T& t, typename T::value_type v) {
+ t.push_back(v);
+ };
+
using boost::redis::connection;
class RedisPool {
public:
Directory() {}
};
+class Pipeline {
+ public:
+ Pipeline(std::shared_ptr<connection>& conn) : conn(conn) {}
+ void start() { pipeline_mode = true; }
+ //executes all commands and sets pipeline mode to false
+ int execute(const DoutPrefixProvider* dpp, optional_yield y);
+ bool is_pipeline() { return pipeline_mode; }
+ request& get_request() { return req; }
+
+ private:
+ std::shared_ptr<connection> conn;
+ request req;
+ bool pipeline_mode{false};
+};
+
class BucketDirectory: public Directory {
public:
BucketDirectory(std::shared_ptr<connection>& conn) : conn(conn) {}
- int zadd(const DoutPrefixProvider* dpp, const std::string& bucket_id, double score, const std::string& member, optional_yield y, bool multi=false);
- int zrem(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& member, optional_yield y, bool multi=false);
+ int zadd(const DoutPrefixProvider* dpp, const std::string& bucket_id, double score, const std::string& member, optional_yield y, Pipeline* pipeline=nullptr);
+ int zrem(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& member, optional_yield y);
int zrange(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& start, const std::string& stop, uint64_t offset, uint64_t count, std::vector<std::string>& members, optional_yield y);
int zscan(const DoutPrefixProvider* dpp, const std::string& bucket_id, uint64_t cursor, const std::string& pattern, uint64_t count, std::vector<std::string>& members, uint64_t next_cursor, optional_yield y);
int zrank(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& member, uint64_t& rank, optional_yield y);
int copy(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& copyName, const std::string& copyBucketName, optional_yield y);
int del(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y);
int update_field(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& field, std::string& value, optional_yield y);
- int zadd(const DoutPrefixProvider* dpp, CacheObj* object, double score, const std::string& member, optional_yield y, bool multi=false);
+ int zadd(const DoutPrefixProvider* dpp, CacheObj* object, double score, const std::string& member, optional_yield y, Pipeline* pipeline=nullptr);
int zrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector<std::string>& members, optional_yield y);
int zrevrange(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& start, const std::string& stop, std::vector<std::string>& members, optional_yield y);
- int zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y, bool multi=false);
- int zremrangebyscore(const DoutPrefixProvider* dpp, CacheObj* object, double min, double max, optional_yield y, bool multi=false);
+ int zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y);
+ int zremrangebyscore(const DoutPrefixProvider* dpp, CacheObj* object, double min, double max, optional_yield y);
int zrank(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, std::string& index, optional_yield y);
//Return value is the incremented value, else return error
int incr(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y);
int exist_key(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
- int set(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
+ //Pipelined version of set
+ int set(const DoutPrefixProvider* dpp, std::vector<CacheBlock>& blocks, optional_yield y);
+ int set(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y, Pipeline* pipeline=nullptr);
int get(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
//Pipelined version of get using boost::redis::response for list bucket
template <size_t N = 100>
//Pipelined version of get using boost::redis::generic_response
int get(const DoutPrefixProvider* dpp, std::vector<CacheBlock>& blocks, optional_yield y);
int copy(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& copyName, const std::string& copyBucketName, optional_yield y);
- int del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y, bool multi=false);
+ int del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
int update_field(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& field, std::string& value, optional_yield y);
int remove_host(const DoutPrefixProvider* dpp, CacheBlock* block, std::string& value, optional_yield y);
- int zadd(const DoutPrefixProvider* dpp, CacheBlock* block, double score, const std::string& member, optional_yield y, bool multi=false);
+ int zadd(const DoutPrefixProvider* dpp, CacheBlock* block, double score, const std::string& member, optional_yield y);
int zrange(const DoutPrefixProvider* dpp, CacheBlock* block, int start, int stop, std::vector<std::string>& members, optional_yield y);
int zrevrange(const DoutPrefixProvider* dpp, CacheBlock* block, int start, int stop, std::vector<std::string>& members, optional_yield y);
int zrem(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& member, optional_yield y);
- int watch(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
- //Move MULTI, EXEC and DISCARD to directory? As they do not operate on a key
- int exec(const DoutPrefixProvider* dpp, std::vector<std::string>& responses, optional_yield y);
- int multi(const DoutPrefixProvider* dpp, optional_yield y);
- int discard(const DoutPrefixProvider* dpp, optional_yield y);
- int unwatch(const DoutPrefixProvider* dpp, optional_yield y);
private:
std::shared_ptr<connection> conn;
std::string build_index(CacheBlock* block);
+
+ template<SeqContainer Container>
+ int set_values(const DoutPrefixProvider* dpp, CacheBlock& block, Container& redisValues, optional_yield y);
};
} } // namespace rgw::d4n
}
}
//delete entry from ordered set of objects, as older versions would have been written to the backend store
- ret = bucketDir->zrem(dpp, e->bucket_id, c_obj->get_name(), y, true);
+ ret = bucketDir->zrem(dpp, e->bucket_id, c_obj->get_name(), y);
if (ret < 0) {
- blockDir->discard(dpp, y);
ldpp_dout(dpp, 0) << __func__ << "(): Failed to queue zrem for object entry: " << c_obj->get_name() << ", ret=" << ret << dendl;
continue;
}
.objName = c_obj->get_name(),
.bucketName = c_obj->get_bucket()->get_bucket_id(),
};
- ret = objDir->zremrangebyscore(dpp, &dir_obj, e->creationTime, e->creationTime, y, true);
+ ret = objDir->zremrangebyscore(dpp, &dir_obj, e->creationTime, e->creationTime, y);
if (ret < 0) {
ldpp_dout(dpp, 0) << __func__ << "(): Failed to remove object from ordered set with error: " << ret << dendl;
continue;
//dirty objects
if (dirty) {
- auto ret = blockDir->set(dpp, &block, y);
+ auto redis_conn = this->driver->get_conn();
+ rgw::d4n::Pipeline p = rgw::d4n::Pipeline(redis_conn);
+ p.start();
+ auto ret = blockDir->set(dpp, &block, y, &p);
if (ret < 0) {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
return ret;
delete-obj with "null" as version-id deletes the latest version */
if (!(this->get_bucket()->versioned())) {
block.cacheObj.objName = "_:null_" + this->get_name();
- ret = blockDir->set(dpp, &block, y);
+ ret = blockDir->set(dpp, &block, y, &p);
if (ret < 0) {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for null head object with ret: " << ret << dendl;
return ret;
auto score = ceph::real_clock::to_double(mtime);
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Score of object name: "<< this->get_name() << " version: " << object_version << " is: " << score << ret << dendl;
rgw::d4n::ObjectDirectory* objDir = this->driver->get_obj_dir();
- ret = objDir->zadd(dpp, &object, score, object_version, y);
+ ret = objDir->zadd(dpp, &object, score, object_version, y, &p);
if (ret < 0) {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Failed to add version to ordered set with error: " << ret << dendl;
- blockDir->discard(dpp, y);
return ret;
}
//add an entry to ordered set containing objects for bucket listing, set score to 0 always to lexicographically order the objects
rgw::d4n::BucketDirectory* bucketDir = this->driver->get_bucket_dir();
- ret = bucketDir->zadd(dpp, this->get_bucket()->get_bucket_id(), 0, this->get_name(), y, true);
+ ret = bucketDir->zadd(dpp, this->get_bucket()->get_bucket_id(), 0, this->get_name(), y, &p);
if (ret < 0) {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Failed to add object to ordered set with error: " << ret << dendl;
return ret;
}
+ p.execute(dpp, y);
} else { //for clean/non-dirty objects
rgw::d4n::CacheBlock latest = block;
auto ret = blockDir->get(dpp, &latest, y);
if (ret == -ENOENT) {
if (!(this->get_bucket()->versioned())) {
+ auto redis_conn = this->driver->get_conn();
+ rgw::d4n::Pipeline p = rgw::d4n::Pipeline(redis_conn);
+ p.start();
//we can explore pipelining to send the two 'HSET' commands together
- ret = blockDir->set(dpp, &block, y);
+ ret = blockDir->set(dpp, &block, y, &p);
if (ret < 0) {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
return ret;
}
//bucket is non versioned, set a null instance
block.cacheObj.objName = "_:null_" + this->get_name();
- ret = blockDir->set(dpp, &block, y);
+ ret = blockDir->set(dpp, &block, y, &p);
if (ret < 0) {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for null head object with ret: " << ret << dendl;
return ret;
}
+ p.execute(dpp, y);
}
} else if (ret < 0) {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory get method failed for head object with ret: " << ret << dendl;
/* even if the head block is found, overwrite existing values with new version in case of non-versioned bucket, clean objects
and versioned and non-versioned buckets dirty objects */
if (!(this->get_bucket()->versioned())) {
- ret = blockDir->set(dpp, &block, y);
+ auto redis_conn = this->driver->get_conn();
+ rgw::d4n::Pipeline p = rgw::d4n::Pipeline(redis_conn);
+ p.start();
+ ret = blockDir->set(dpp, &block, y, &p);
if (ret < 0) {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
return ret;
}
//bucket is non versioned, set a null instance
block.cacheObj.objName = "_:null_" + this->get_name();
- ret = blockDir->set(dpp, &block, y);
+ ret = blockDir->set(dpp, &block, y, &p);
if (ret < 0) {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for null head object with ret: " << ret << dendl;
return ret;
}
+ p.execute(dpp, y);
}//end-if !(this->get_bucket()->versioned())
} //end-if ret = 0
} //end-else
off_t lst = this->get_size();
ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Object size =" << lst << dendl;
off_t fst = 0;
+ std::vector<rgw::d4n::CacheBlock> blocks;
do {
- rgw::d4n::CacheBlock block, existing_block;
+ rgw::d4n::CacheBlock block;
if (fst >= lst){
break;
}
off_t cur_len = cur_size - fst;
block.cacheObj.bucketName = this->get_bucket()->get_bucket_id();
block.cacheObj.objName = this->get_key().get_oid();
- block.cacheObj.dirty = dirty;
- block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
- existing_block.cacheObj.objName = block.cacheObj.objName;
- existing_block.cacheObj.bucketName = block.cacheObj.bucketName;
-
block.size = cur_len;
block.blockID = fst;
- block.version = version;
-
- /* Store block in directory */
- existing_block.blockID = block.blockID;
- existing_block.size = block.size;
-
- int ret;
- if ((ret = blockDir->get(dpp, &existing_block, y)) == 0 || ret == -ENOENT) {
- if (ret == 0) { //new versioned block will have new version, hostsList etc, how about globalWeight?
- block = existing_block;
- block.version = version;
- block.cacheObj.dirty = dirty;
- }
- block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
-
- if ((ret = blockDir->set(dpp, &block, y)) < 0) {
- ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory set() method failed, ret=" << ret << dendl;
- return ret;
- }
- } else {
- ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << ", ret=" << ret << dendl;
- return ret;
- }
fst += cur_len;
+ blocks.emplace_back(block);
} while(fst < lst);
+ auto ret = blockDir->get(dpp, blocks, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory pipelined get() method failed, ret=" << ret << dendl;
+ return ret;
+ }
+
+ for (auto& block : blocks) {
+ block.cacheObj.dirty = dirty;
+ block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
+ block.version = version;
+ }
+ if ((ret = blockDir->set(dpp, blocks, y)) < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory pipelined set() method failed, ret=" << ret << dendl;
+ return ret;
+ }
+
return 0;
}
std::string prefix = source->get_prefix();
std::string dest_prefix;
- rgw::d4n::CacheBlock block, existing_block, dest_block;
+ rgw::d4n::CacheBlock block, dest_block;
rgw::d4n::BlockDirectory* blockDir = source->driver->get_block_dir();
auto policy = filter->get_policy_driver()->get_cache_policy();
auto cache_driver = filter->get_cache_driver();
block.cacheObj.bucketName = source->get_bucket()->get_bucket_id();
std::stringstream s;
block.cacheObj.creationTime = std::to_string(ceph::real_clock::to_double(source->get_mtime()));
- bool dirty = block.cacheObj.dirty = false; //Reading from the backend, data is clean
- block.version = version;
+ bool dirty = false; //Reading from the backend, data is clean
if (source->dest_object && source->dest_bucket) {
D4NFilterObject* d4n_dest_object = dynamic_cast<D4NFilterObject*>(source->dest_object);
dest_block.version = dest_version;
}
- //populating fields needed for building directory index
- existing_block.cacheObj.objName = block.cacheObj.objName;
- existing_block.cacheObj.bucketName = block.cacheObj.bucketName;
-
ldpp_dout(dpp, 20) << __func__ << ": version stored in update method is: " << version << dendl;
if (bl.length() > 0 && last_part) { // if bl = bl_rem has data and this is the last part, write it to cache
if (ret == 0) {
std::string objEtag = "";
policy->update(dpp, oid, adjusted_start_ofs, bl.length(), version, dirty, rgw::d4n::RefCount::NOOP, *y);
-
- /* Store block in directory */
- existing_block.blockID = block.blockID;
- existing_block.size = block.size;
-
- if ((ret = blockDir->get(dpp, &existing_block, *y)) == 0 || ret == -ENOENT) {
- if (ret == 0) { //new versioned block will have new version, hostsList etc, how about globalWeight?
- block = existing_block;
- block.version = version;
- }
-
- block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
-
- if ((ret = blockDir->set(dpp, &block, *y)) < 0)
- ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set() method failed, ret=" << ret << dendl;
- } else { //end -if blockDir->get
- ldpp_dout(dpp, 20) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << ", ret=" << ret << dendl;
- }
+ blocks.emplace_back(block);
} else {
ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed, ret=" << ret << dendl;
}
ret = cache_driver->put(dpp, dest_oid, bl, bl.length(), attrs, *y);
if (ret == 0) {
policy->update(dpp, dest_oid, adjusted_start_ofs, bl.length(), dest_version, dirty, rgw::d4n::RefCount::NOOP, *y);
- if (ret = blockDir->set(dpp, &dest_block, *y); ret < 0) {
- ldpp_dout(dpp, 20) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB:: " << __func__ << " BlockDirectory set failed with ret: " << ret << dendl;
- }
+ dest_blocks.emplace_back(dest_block);
}
}
}
ret = cache_driver->put(dpp, oid, bl, bl.length(), attrs, *y);
if (ret == 0) {
policy->update(dpp, oid, adjusted_start_ofs, bl.length(), version, dirty, rgw::d4n::RefCount::NOOP, *y);
-
- /* Store block in directory */
- existing_block.blockID = block.blockID;
- existing_block.size = block.size;
-
- if ((ret = blockDir->get(dpp, &existing_block, *y)) == 0 || ret == -ENOENT) {
- if (ret == 0) { //new versioned block will have new version, hostsList etc, how about globalWeight?
- block = existing_block;
- block.version = version;
- }
-
- block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
-
- if ((ret = blockDir->set(dpp, &block, *y)) < 0)
- ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set() method failed, ret=" << ret << dendl;
- } else {
- ldpp_dout(dpp, 20) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << ", ret=" << ret << dendl;
- }
+ blocks.emplace_back(block);
} else {
ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed, ret=" << ret << dendl;
}
ret = cache_driver->put(dpp, dest_oid, bl, bl.length(), attrs, *y);
if (ret == 0) {
policy->update(dpp, dest_oid, adjusted_start_ofs, bl.length(), dest_version, dirty, rgw::d4n::RefCount::NOOP, *y);
- if (ret = blockDir->set(dpp, &dest_block, *y); ret < 0) {
- ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " BlockDirectory set failed with ret: " << ret << dendl;
- }
+ dest_blocks.emplace_back(dest_block);
}
}
}
ret = cache_driver->put(dpp, oid, bl_rem, bl_rem.length(), attrs, *y);
if (ret == 0) {
policy->update(dpp, oid, adjusted_start_ofs, bl_rem.length(), version, dirty, rgw::d4n::RefCount::NOOP, *y);
-
- /* Store block in directory */
- existing_block.blockID = block.blockID;
- existing_block.size = block.size;
-
- if ((ret = blockDir->get(dpp, &existing_block, *y)) == 0 || ret == -ENOENT) {
- if (ret == 0) { //new versioned block will have new version, hostsList etc, how about globalWeight?
- block = existing_block;
- block.version = version;
- }
-
- block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
-
- if ((ret = blockDir->set(dpp, &block, *y)) < 0)
- ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set() method failed, ret=" << ret << dendl;
- } else {
- ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << ", ret=" << ret << dendl;
- }
+ blocks.emplace_back(block);
} else {
ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed, ret=" << ret << dendl;
}
ret = cache_driver->put(dpp, dest_oid, bl_rem, bl_rem.length(), attrs, *y);
if (ret == 0) {
policy->update(dpp, dest_oid, adjusted_start_ofs, bl_rem.length(), dest_version, dirty, rgw::d4n::RefCount::NOOP, *y);
- if (ret = blockDir->set(dpp, &dest_block, *y); ret < 0) {
- ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " BlockDirectory set failed with ret: " << ret << dendl;
- }
+ dest_blocks.emplace_back(dest_block);
}
}
}
bl_rem = std::move(bl);
}//bl_rem.length()
}
- }
+ if (last_part) {
+ auto ret = blockDir->get(dpp, blocks, *y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory pipelined get() method failed, ret=" << ret << dendl;
+ }
+
+ for (auto& block : blocks) {
+ block.cacheObj.dirty = false;
+ block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
+ block.version = version;
+ }
+ if ((ret = blockDir->set(dpp, blocks, *y)) < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory pipelined set() method failed, ret=" << ret << dendl;
+ }
+ if (source->dest_object && source->dest_bucket) {
+ if ((ret = blockDir->set(dpp, dest_blocks, *y)) < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory pipelined set() method for dest blocks failed, ret=" << ret << dendl;
+ }
+ }
+ }// if last_part
+ }//if write_to_cache
/* Clean-up:
1. do we need to clean up keys belonging to older versions (the last blocks), in case the size of newer version is different
return ret;
}
//delete entry from ordered set of objects
- ret = bucketDir->zrem(dpp, source->get_bucket()->get_bucket_id(), source->get_name(), y, true);
+ ret = bucketDir->zrem(dpp, source->get_bucket()->get_bucket_id(), source->get_name(), y);
if (ret < 0) {
ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue zrem request in bucket directory for: " << source->get_name() << ", ret=" << ret << dendl;
return ret;
//delete entry from ordered set of versions
std::string version = source->get_instance();
ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): Version to be deleted is: " << version << dendl;
- ret = objDir->zrem(dpp, &dir_obj, version, y, true);
+ ret = objDir->zrem(dpp, &dir_obj, version, y);
if (ret < 0) {
ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue zrem request in object directory for: " << source->get_name() << ", ret=" << ret << dendl;
return ret;
.bucketName = source->get_bucket()->get_bucket_id(),
};
//delete entry from ordered set of object versions
- ret = objDir->zrem(dpp, &dir_obj, "null", y, true);
+ ret = objDir->zrem(dpp, &dir_obj, "null", y);
if (ret < 0) {
ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue zrem request in object directory for: " << source->get_name() << ", ret=" << ret << dendl;
return ret;
}
//delete entry from ordered set of objects
- ret = bucketDir->zrem(dpp, source->get_bucket()->get_bucket_id(), source->get_name(), y, true);
+ ret = bucketDir->zrem(dpp, source->get_bucket()->get_bucket_id(), source->get_name(), y);
if (ret < 0) {
ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue zrem request in bucket directory for: " << source->get_name() << ", ret=" << ret << dendl;
- blockDir->discard(dpp, y);
return ret;
}
}
rgw::d4n::BucketDirectory* get_bucket_dir() { return bucketDir.get(); }
rgw::d4n::PolicyDriver* get_policy_driver() { return policyDriver.get(); }
void save_y(optional_yield y) { this->y = y; }
+ std::shared_ptr<connection> get_conn() { return conn; }
void shutdown() override;
};
optional_yield* y;
int part_num{0}, num_parts{0};
int len_sent = 0;
+ std::vector<rgw::d4n::CacheBlock> blocks, dest_blocks;
public:
D4NFilterGetCB(D4NFilterDriver* _filter, D4NFilterObject* _source) : filter(_filter),
}
}
-TEST_F(BlockDirectoryFixture, MultiExecuteYield)
-{
- boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
- {
- boost::system::error_code ec;
- {
- request req;
- response<std::string> resp;
- req.push("MULTI"); // Start transaction
- conn->async_exec(req, resp, yield[ec]);
- ASSERT_EQ((bool)ec, false);
- std::cout << "MULTI value: " << std::get<0>(resp).value() << std::endl;
- }
- {
- request req;
- response<std::string> resp;
- req.push("SET", "key1", "value1"); // Command 1
- conn->async_exec(req, resp, yield[ec]);
- ASSERT_EQ((bool)ec, false);
- std::cout << "SET value: " << std::get<0>(resp).value() << std::endl;
- }
- {
- request req;
- response<std::string> resp;
- req.push("SET", "key2", "value2"); // Command 2
- conn->async_exec(req, resp, yield[ec]);
- ASSERT_EQ((bool)ec, false);
- std::cout << "SET value: " << std::get<0>(resp).value() << std::endl;
- }
- {
- request req;
- response<std::string> resp;
- req.push("ZADD", "key4", "1", "v1"); // Command 3
- conn->async_exec(req, resp, yield[ec]);
- ASSERT_EQ((bool)ec, false);
- std::cout << "ZADD value: " << std::get<0>(resp).value() << std::endl;
- }
- {
- request req;
- /* string as response here as the command is only getting queued, not executed
- if response type is changed to int then the operation fails */
- response<std::string> resp;
- req.push("DEL", "key3"); // Command 4
- conn->async_exec(req, resp, yield[ec]);
- ASSERT_EQ((bool)ec, false);
- std::cout << "DEL value: " << std::get<0>(resp).value() << std::endl;
- }
- {
- request req;
- req.push("EXEC"); // Execute transaction
-
- boost::redis::generic_response resp;
- conn->async_exec(req, resp, yield[ec]);
- ASSERT_EQ((bool)ec, false);
- for (uint64_t i = 0; i < resp.value().size(); i++) {
- std::cout << "EXEC: " << resp.value().front().value << std::endl;
- boost::redis::consume_one(resp);
- }
- }
- }
- //test multi/exec using directory methods
- {
- ASSERT_EQ(0, dir->multi(env->dpp, optional_yield{yield}));
- ASSERT_EQ(0, dir->set(env->dpp, block, yield));
- block->cacheObj.objName = "testBlockNew";
- ASSERT_EQ(0, dir->set(env->dpp, block, yield));
- block->cacheObj.objName = "testBlockA";
- ASSERT_EQ(0, dir->del(env->dpp, block, yield, true));
- block->cacheObj.objName = "testBlockB";
- ASSERT_EQ(0, dir->zadd(env->dpp, block, 100, "version1", yield, true));
- std::vector<std::string> responses;
- ASSERT_EQ(0, dir->exec(env->dpp, responses, optional_yield{yield}));
- for (auto r : responses) {
- std::cout << "EXEC: " << r << std::endl;
- }
- }
- {
- boost::system::error_code ec;
- request req;
- req.push("FLUSHALL");
- response<boost::redis::ignore_t> resp;
-
- conn->async_exec(req, resp, yield[ec]);
- }
-
- conn->cancel();
- }, rethrow);
-
- io.run();
-}
-
TEST_F(BlockDirectoryFixture, ZScan)
{
boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {