+#include <algorithm>
#include <boost/asio/consign.hpp>
#include <boost/algorithm/string.hpp>
+#include <memory>
#include "common/async/blocked_completion.h"
#include "common/dout.h"
#include "d4n_directory.h"
}
}
+template <typename... Types>
+void redis_exec_cp(std::shared_ptr<rgw::d4n::RedisPool> pool,
+ boost::system::error_code& ec,
+ const boost::redis::request& req,
+ boost::redis::response<Types...>& resp,
+ optional_yield y)
+{
+//purpose: Execute a Redis command using a connection from the pool
+ std::shared_ptr<connection> conn = pool->acquire();
+ try {
+
+ if (y) {
+ auto yield = y.get_yield_context();
+ async_exec(conn, req, resp, yield[ec]);
+ } else {
+ async_exec(conn, req, resp, ceph::async::use_blocked[ec]);
+ }
+ } catch (const std::exception& e) {
+ //release the connection upon exception
+ pool->release(conn);
+ throw;
+ }
+ //release the connection back to the pool after execution
+ pool->release(conn);
+}
+
void redis_exec(std::shared_ptr<connection> conn,
boost::system::error_code& ec,
const boost::redis::request& req,
- boost::redis::generic_response& resp, optional_yield y)
+ boost::redis::generic_response& resp, optional_yield y)
{
if (y) {
auto yield = y.get_yield_context();
}
}
+void redis_exec_cp(std::shared_ptr<rgw::d4n::RedisPool> pool,
+ boost::system::error_code& ec,
+ const boost::redis::request& req,
+ boost::redis::generic_response& resp, optional_yield y)
+{
+ //purpose: Execute a Redis command using a connection from the pool
+ std::shared_ptr<connection> conn = pool->acquire();
+
+ try {
+ if (y) {
+ auto yield = y.get_yield_context();
+ async_exec(conn, req, resp, yield[ec]);
+ } else {
+ async_exec(conn, req, resp, ceph::async::use_blocked[ec]);
+ }
+ } catch (const std::exception& e) {
+ pool->release(conn);
+ throw;
+ }
+ //release the connection back to the pool after execution
+ pool->release(conn);
+}
+
int check_bool(std::string str) {
if (str == "true" || str == "1") {
return 1;
}
}
+void redis_exec_connection_pool(const DoutPrefixProvider* dpp,
+ std::shared_ptr<RedisPool> redis_pool,
+ std::shared_ptr<connection> conn,
+ boost::system::error_code& ec,
+ const boost::redis::request& req,
+ boost::redis::generic_response& resp,
+ optional_yield y)
+{
+ if(!redis_pool)[[unlikely]]
+ {
+ redis_exec(conn, ec, req, resp, y);
+ ldpp_dout(dpp, 0) << "Directory::" << __func__ << " not using connection-pool, it's using the shared connection " << dendl;
+ }
+ else[[likely]]
+ redis_exec_cp(redis_pool, ec, req, resp, y);
+}
+
+template <typename... Types>
+void redis_exec_connection_pool(const DoutPrefixProvider* dpp,
+ std::shared_ptr<RedisPool> redis_pool,
+ std::shared_ptr<connection> conn,
+ boost::system::error_code& ec,
+ const boost::redis::request& req,
+ boost::redis::response<Types...>& resp,
+ optional_yield y)
+{
+ if(!redis_pool)[[unlikely]]
+ {
+ redis_exec(conn, ec, req, resp, y);
+ ldpp_dout(dpp, 0) << "Directory::" << __func__ << " not using connection-pool, it's using the shared connection " << dendl;
+ }
+ else[[likely]]
+ redis_exec_cp(redis_pool, ec, req, resp, y);
+}
+
int BucketDirectory::zadd(const DoutPrefixProvider* dpp, const std::string& bucket_id, double score, const std::string& member, optional_yield y, bool multi)
{
try {
req.push("ZADD", bucket_id, "CH", std::to_string(0), member);
response<std::string> resp;
- redis_exec(conn, ec, req, resp, y);
+
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
req.push("ZREM", bucket_id, member);
response<std::string> resp;
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
}
response<std::vector<std::string> > resp;
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
req.push("ZSCAN", bucket_id, cursor, "MATCH", pattern, "COUNT", count);
boost::redis::generic_response resp;
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
req.push("ZRANK", bucket_id, member);
response<int> resp;
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
request req;
req.push("EXISTS", key);
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
request req;
req.push_range("HSET", key, redisValues);
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
request req;
req.push_range("HMGET", key, fields);
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
req.push("HSET", copyKey, "objName", copyName, "bucketName", copyBucketName);
req.push("EXEC");
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
request req;
req.push("DEL", key);
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (!std::get<0>(resp).value()) {
ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "(): No values deleted." << dendl;
request req;
req.push("HGET", key, field);
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
request req;
req.push("HSET", key, field, value);
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
req.push("ZADD", key, "CH", std::to_string(score), member);
response<std::string> resp;
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
req.push("ZRANGE", key, std::to_string(start), std::to_string(stop));
response<std::vector<std::string> > resp;
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
req.push("ZREVRANGE", key, start, stop);
response<std::vector<std::string> > resp;
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
req.push("ZREM", key, member);
response<std::string> resp;
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
req.push("ZREMRANGEBYSCORE", key, std::to_string(min), std::to_string(max));
response<std::string> resp;
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
req.push("INCR", key);
response<std::string> resp;
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
req.push("ZRANK", key, member);
response<std::string> resp;
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
request req;
req.push("EXISTS", key);
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
request req;
req.push_range("HSET", key, redisValues);
- redis_exec(conn, ec, req, resp, y);
- if (ec) {
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
+
+ if (ec) {
ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
return -ec.value();
}
try {
boost::system::error_code ec;
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
request req;
req.push_range("HMGET", key, fields);
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
req.push("HSET", copyKey, "objName", copyName, "bucketName", copyBucketName);
req.push("EXEC");
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
req.push("DEL", key);
if (!multi) {
response<int> resp;
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
+
if (!std::get<0>(resp).value()) {
ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): No values deleted for key=" << key << dendl;
return -ENOENT;
}
} else { //if delete is called as part of a transaction, the command will be queued, hence the response will be a string
response<std::string> resp;
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
}
if (ec) {
ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
request req;
req.push("HGET", key, field);
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
request req;
req.push("HSET", key, field, value);
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
request req;
req.push("HGET", key, "hosts");
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
request req;
req.push("HSET", key, "hosts", value);
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
req.push("ZADD", key, "CH", std::to_string(score), member);
response<std::string> resp;
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
req.push("ZRANGE", key, std::to_string(start), std::to_string(stop));
response<std::vector<std::string> > resp;
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
req.push("ZREVRANGE", key, std::to_string(start), std::to_string(stop));
response<std::vector<std::string> > resp;
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
req.push("ZREM", key, member);
response<std::string> resp;
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
req.push("WATCH", key);
response<std::string> resp;
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
req.push("EXEC");
boost::redis::generic_response resp;
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
req.push("MULTI");
response<std::string> resp;
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
req.push("DISCARD");
response<std::string> resp;
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
req.push("UNWATCH");
response<std::string> resp;
- redis_exec(conn, ec, req, resp, y);
+ redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
if (ec) {
ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;