From 1ac08d30d1913fd99722689fa3cc1a0a5b73bd13 Mon Sep 17 00:00:00 2001 From: gal salomon Date: Wed, 11 Jun 2025 21:05:03 +0300 Subject: [PATCH] the current connection setup is single and shared connection, the strand on that single connection may cause a serialization. it should be noted that per s3-request there are several redis-operation that may run on co-routine. the redis-connection pool implement the guarded acquire/release APIs. adding configuration : rgw_redis_connection_pool_size. re-factor of redis-exec* function. shared pointer for Redis connection pool adding branch predication optimization for redis-pool/single-shared-connection condition adding a warning-report-method for the case there is a blocking state upon empty connection pool. Signed-off-by: galsalomon66 --- src/common/options/rgw.yaml.in | 13 +++ src/rgw/driver/d4n/d4n_directory.cc | 171 +++++++++++++++++++++------- src/rgw/driver/d4n/d4n_directory.h | 77 +++++++++++++ src/rgw/driver/d4n/rgw_sal_d4n.cc | 14 +++ src/rgw/driver/d4n/rgw_sal_d4n.h | 3 + 5 files changed, 237 insertions(+), 41 deletions(-) diff --git a/src/common/options/rgw.yaml.in b/src/common/options/rgw.yaml.in index 3ce9118f6f8..83be490b704 100644 --- a/src/common/options/rgw.yaml.in +++ b/src/common/options/rgw.yaml.in @@ -4498,3 +4498,16 @@ options: default: false services: - rgw +- name: rgw_redis_connection_pool_size + type: int + level: basic + desc: RGW connection pool size for Redis operation per D4N + long_desc: This option sets the size of the connection pool for Redis operations + in D4N. It is used to manage the number of concurrent connections to Redis. + A larger pool size can improve performance when multiple threads are accessing + Redis simultaneously, but it also increases resource usage. + fmt_desc: The size of the redis connection pool. + default: 512 + services: + - rgw + with_legacy: true diff --git a/src/rgw/driver/d4n/d4n_directory.cc b/src/rgw/driver/d4n/d4n_directory.cc index 5600f34812a..52f01a0edcc 100644 --- a/src/rgw/driver/d4n/d4n_directory.cc +++ b/src/rgw/driver/d4n/d4n_directory.cc @@ -1,5 +1,7 @@ +#include #include #include +#include #include "common/async/blocked_completion.h" #include "common/dout.h" #include "d4n_directory.h" @@ -48,10 +50,36 @@ void redis_exec(std::shared_ptr conn, } } +template +void redis_exec_cp(std::shared_ptr pool, + boost::system::error_code& ec, + const boost::redis::request& req, + boost::redis::response& resp, + optional_yield y) +{ +//purpose: Execute a Redis command using a connection from the pool + std::shared_ptr conn = pool->acquire(); + try { + + if (y) { + auto yield = y.get_yield_context(); + async_exec(conn, req, resp, yield[ec]); + } else { + async_exec(conn, req, resp, ceph::async::use_blocked[ec]); + } + } catch (const std::exception& e) { + //release the connection upon exception + pool->release(conn); + throw; + } + //release the connection back to the pool after execution + pool->release(conn); +} + void redis_exec(std::shared_ptr conn, boost::system::error_code& ec, const boost::redis::request& req, - boost::redis::generic_response& resp, optional_yield y) + boost::redis::generic_response& resp, optional_yield y) { if (y) { auto yield = y.get_yield_context(); @@ -61,6 +89,29 @@ void redis_exec(std::shared_ptr conn, } } +void redis_exec_cp(std::shared_ptr pool, + boost::system::error_code& ec, + const boost::redis::request& req, + boost::redis::generic_response& resp, optional_yield y) +{ + //purpose: Execute a Redis command using a connection from the pool + std::shared_ptr conn = pool->acquire(); + + try { + if (y) { + auto yield = y.get_yield_context(); + async_exec(conn, req, resp, yield[ec]); + } else { + async_exec(conn, req, resp, ceph::async::use_blocked[ec]); + } + } catch (const std::exception& e) { + pool->release(conn); + throw; + } + //release the connection back to the pool after execution + pool->release(conn); +} + int check_bool(std::string str) { if (str == "true" || str == "1") { return 1; @@ -71,6 +122,41 @@ int check_bool(std::string str) { } } +void redis_exec_connection_pool(const DoutPrefixProvider* dpp, + std::shared_ptr redis_pool, + std::shared_ptr conn, + boost::system::error_code& ec, + const boost::redis::request& req, + boost::redis::generic_response& resp, + optional_yield y) +{ + if(!redis_pool)[[unlikely]] + { + redis_exec(conn, ec, req, resp, y); + ldpp_dout(dpp, 0) << "Directory::" << __func__ << " not using connection-pool, it's using the shared connection " << dendl; + } + else[[likely]] + redis_exec_cp(redis_pool, ec, req, resp, y); +} + +template +void redis_exec_connection_pool(const DoutPrefixProvider* dpp, + std::shared_ptr redis_pool, + std::shared_ptr conn, + boost::system::error_code& ec, + const boost::redis::request& req, + boost::redis::response& resp, + optional_yield y) +{ + if(!redis_pool)[[unlikely]] + { + redis_exec(conn, ec, req, resp, y); + ldpp_dout(dpp, 0) << "Directory::" << __func__ << " not using connection-pool, it's using the shared connection " << dendl; + } + else[[likely]] + redis_exec_cp(redis_pool, ec, req, resp, y); +} + int BucketDirectory::zadd(const DoutPrefixProvider* dpp, const std::string& bucket_id, double score, const std::string& member, optional_yield y, bool multi) { try { @@ -79,7 +165,8 @@ int BucketDirectory::zadd(const DoutPrefixProvider* dpp, const std::string& buck req.push("ZADD", bucket_id, "CH", std::to_string(0), member); response resp; - redis_exec(conn, ec, req, resp, y); + + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -110,7 +197,7 @@ int BucketDirectory::zrem(const DoutPrefixProvider* dpp, const std::string& buck req.push("ZREM", bucket_id, member); response resp; - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -144,7 +231,7 @@ int BucketDirectory::zrange(const DoutPrefixProvider* dpp, const std::string& bu } response > resp; - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -175,7 +262,7 @@ int BucketDirectory::zscan(const DoutPrefixProvider* dpp, const std::string& buc req.push("ZSCAN", bucket_id, cursor, "MATCH", pattern, "COUNT", count); boost::redis::generic_response resp; - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -217,7 +304,7 @@ int BucketDirectory::zrank(const DoutPrefixProvider* dpp, const std::string& buc req.push("ZRANK", bucket_id, member); response resp; - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -249,7 +336,7 @@ int ObjectDirectory::exist_key(const DoutPrefixProvider* dpp, CacheObj* object, request req; req.push("EXISTS", key); - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -316,7 +403,7 @@ int ObjectDirectory::set(const DoutPrefixProvider* dpp, CacheObj* object, option request req; req.push_range("HSET", key, redisValues); - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -352,7 +439,7 @@ int ObjectDirectory::get(const DoutPrefixProvider* dpp, CacheObj* object, option request req; req.push_range("HMGET", key, fields); - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -403,7 +490,7 @@ int ObjectDirectory::copy(const DoutPrefixProvider* dpp, CacheObj* object, const req.push("HSET", copyKey, "objName", copyName, "bucketName", copyBucketName); req.push("EXEC"); - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -433,7 +520,7 @@ int ObjectDirectory::del(const DoutPrefixProvider* dpp, CacheObj* object, option request req; req.push("DEL", key); - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (!std::get<0>(resp).value()) { ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "(): No values deleted." << dendl; @@ -468,7 +555,7 @@ int ObjectDirectory::update_field(const DoutPrefixProvider* dpp, CacheObj* objec request req; req.push("HGET", key, field); - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -495,7 +582,7 @@ int ObjectDirectory::update_field(const DoutPrefixProvider* dpp, CacheObj* objec request req; req.push("HSET", key, field, value); - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -525,7 +612,7 @@ int ObjectDirectory::zadd(const DoutPrefixProvider* dpp, CacheObj* object, doubl req.push("ZADD", key, "CH", std::to_string(score), member); response resp; - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -557,7 +644,7 @@ int ObjectDirectory::zrange(const DoutPrefixProvider* dpp, CacheObj* object, int req.push("ZRANGE", key, std::to_string(start), std::to_string(stop)); response > resp; - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -588,7 +675,7 @@ int ObjectDirectory::zrevrange(const DoutPrefixProvider* dpp, CacheObj* object, req.push("ZREVRANGE", key, start, stop); response > resp; - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -614,7 +701,7 @@ int ObjectDirectory::zrem(const DoutPrefixProvider* dpp, CacheObj* object, const req.push("ZREM", key, member); response resp; - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -645,7 +732,7 @@ int ObjectDirectory::zremrangebyscore(const DoutPrefixProvider* dpp, CacheObj* o req.push("ZREMRANGEBYSCORE", key, std::to_string(min), std::to_string(max)); response resp; - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -678,7 +765,7 @@ int ObjectDirectory::incr(const DoutPrefixProvider* dpp, CacheObj* object, optio req.push("INCR", key); response resp; - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -704,7 +791,7 @@ int ObjectDirectory::zrank(const DoutPrefixProvider* dpp, CacheObj* object, cons req.push("ZRANK", key, member); response resp; - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -735,7 +822,7 @@ int BlockDirectory::exist_key(const DoutPrefixProvider* dpp, CacheBlock* block, request req; req.push("EXISTS", key); - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -820,8 +907,9 @@ int BlockDirectory::set(const DoutPrefixProvider* dpp, CacheBlock* block, option request req; req.push_range("HSET", key, redisValues); - redis_exec(conn, ec, req, resp, y); - if (ec) { + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); + + if (ec) { ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; return -ec.value(); } @@ -919,7 +1007,7 @@ int BlockDirectory::get(const DoutPrefixProvider* dpp, std::vector& try { boost::system::error_code ec; - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -989,7 +1077,7 @@ int BlockDirectory::get(const DoutPrefixProvider* dpp, CacheBlock* block, option request req; req.push_range("HMGET", key, fields); - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -1044,7 +1132,7 @@ int BlockDirectory::copy(const DoutPrefixProvider* dpp, CacheBlock* block, const req.push("HSET", copyKey, "objName", copyName, "bucketName", copyBucketName); req.push("EXEC"); - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -1074,14 +1162,15 @@ int BlockDirectory::del(const DoutPrefixProvider* dpp, CacheBlock* block, option req.push("DEL", key); if (!multi) { response resp; - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); + if (!std::get<0>(resp).value()) { ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): No values deleted for key=" << key << dendl; return -ENOENT; } } else { //if delete is called as part of a transaction, the command will be queued, hence the response will be a string response resp; - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); } if (ec) { ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -1113,7 +1202,7 @@ int BlockDirectory::update_field(const DoutPrefixProvider* dpp, CacheBlock* bloc request req; req.push("HGET", key, field); - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -1140,7 +1229,7 @@ int BlockDirectory::update_field(const DoutPrefixProvider* dpp, CacheBlock* bloc request req; req.push("HSET", key, field, value); - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -1172,7 +1261,7 @@ int BlockDirectory::remove_host(const DoutPrefixProvider* dpp, CacheBlock* block request req; req.push("HGET", key, "hosts"); - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -1211,7 +1300,7 @@ int BlockDirectory::remove_host(const DoutPrefixProvider* dpp, CacheBlock* block request req; req.push("HSET", key, "hosts", value); - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -1235,7 +1324,7 @@ int BlockDirectory::zadd(const DoutPrefixProvider* dpp, CacheBlock* block, doubl req.push("ZADD", key, "CH", std::to_string(score), member); response resp; - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -1266,7 +1355,7 @@ int BlockDirectory::zrange(const DoutPrefixProvider* dpp, CacheBlock* block, int req.push("ZRANGE", key, std::to_string(start), std::to_string(stop)); response > resp; - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -1297,7 +1386,7 @@ int BlockDirectory::zrevrange(const DoutPrefixProvider* dpp, CacheBlock* block, req.push("ZREVRANGE", key, std::to_string(start), std::to_string(stop)); response > resp; - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -1328,7 +1417,7 @@ int BlockDirectory::zrem(const DoutPrefixProvider* dpp, CacheBlock* block, const req.push("ZREM", key, member); response resp; - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -1357,7 +1446,7 @@ int BlockDirectory::watch(const DoutPrefixProvider* dpp, CacheBlock* block, opti req.push("WATCH", key); response resp; - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -1385,7 +1474,7 @@ int BlockDirectory::exec(const DoutPrefixProvider* dpp, std::vector req.push("EXEC"); boost::redis::generic_response resp; - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -1416,7 +1505,7 @@ int BlockDirectory::multi(const DoutPrefixProvider* dpp, optional_yield y) req.push("MULTI"); response resp; - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -1444,7 +1533,7 @@ int BlockDirectory::discard(const DoutPrefixProvider* dpp, optional_yield y) req.push("DISCARD"); response resp; - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -1472,7 +1561,7 @@ int BlockDirectory::unwatch(const DoutPrefixProvider* dpp, optional_yield y) req.push("UNWATCH"); response resp; - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; diff --git a/src/rgw/driver/d4n/d4n_directory.h b/src/rgw/driver/d4n/d4n_directory.h index 7c5a84d9024..03ab2848fe0 100644 --- a/src/rgw/driver/d4n/d4n_directory.h +++ b/src/rgw/driver/d4n/d4n_directory.h @@ -1,12 +1,85 @@ #pragma once #include "rgw_common.h" +#include "rgw_asio_thread.h" #include #include +#include +#include +#include namespace rgw { namespace d4n { +using boost::redis::connection; +class RedisPool { +public: + RedisPool(boost::asio::io_context* ioc, const boost::redis::config& cfg, std::size_t size) + : m_ioc(ioc),m_cfg(cfg) { + for (std::size_t i = 0; i < size; ++i) { + // Each connection gets its own strand + auto strand = boost::asio::make_strand(*m_ioc); + auto conn = std::make_shared(strand); + m_pool.push_back(conn); + } + } + + ~RedisPool() { + cancel_all(); + } + + std::shared_ptr acquire() { + std::unique_lock lock(m_aquire_release_mtx); + + if (!m_is_pool_connected) { + for(auto& it:m_pool) { + auto conn = it; + conn->async_run(m_cfg, {}, boost::asio::consign(boost::asio::detached, conn)); + } + m_is_pool_connected = true; + } + + if (m_pool.empty()) { + maybe_warn_about_blocking(nullptr); + //wait until m_pool is not empty + m_cond_var.wait(lock, [this] { return !m_pool.empty(); }); + } + auto conn = m_pool.front(); + m_pool.pop_front(); + return conn; + } + + void release(std::shared_ptr conn) { + std::unique_lock lock(m_aquire_release_mtx); + m_pool.push_back(conn); + // Notify one waiting thread that a connection is available + m_cond_var.notify_one(); + } + + int current_pool_size() const { + std::unique_lock lock(m_aquire_release_mtx); + return m_pool.size(); + } + + void cancel_all() { + std::unique_lock lock(m_aquire_release_mtx); + if(m_is_pool_connected) { + for(auto& conn : m_pool) { + conn->cancel(); + } + } + } + +private: + boost::asio::io_context* m_ioc; + boost::redis::config m_cfg; + std::deque> m_pool; + mutable std::mutex m_aquire_release_mtx; + std::condition_variable m_cond_var; + bool m_is_pool_connected{false}; +}; + + namespace net = boost::asio; using boost::redis::config; using boost::redis::connection; @@ -67,6 +140,10 @@ struct CacheBlock { class Directory { public: + std::shared_ptr redis_pool{nullptr}; // Redis connection pool + void set_redis_pool(std::shared_ptr pool) { + redis_pool = pool; + } Directory() {} }; diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.cc b/src/rgw/driver/d4n/rgw_sal_d4n.cc index bdbff1e4185..5c0537dbe87 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.cc +++ b/src/rgw/driver/d4n/rgw_sal_d4n.cc @@ -14,6 +14,8 @@ */ #include "rgw_perf_counters.h" +#include +#include #include "rgw_sal_d4n.h" namespace rgw { namespace sal { @@ -82,6 +84,18 @@ int D4NFilterDriver::initialize(CephContext *cct, const DoutPrefixProvider *dpp) cacheDriver->initialize(dpp); policyDriver->get_cache_policy()->init(cct, dpp, io_context, next); + //setting the connection pool size and other parameters + uint64_t rgw_redis_connection_pool_size = dpp->get_cct()->_conf->rgw_redis_connection_pool_size; + std::shared_ptrredis_pool = nullptr; + if(rgw_redis_connection_pool_size>0){ + redis_pool = std::make_shared(&io_context, cfg, rgw_redis_connection_pool_size); + ldpp_dout(dpp, 10) << "redis connection pool created with " << rgw_redis_connection_pool_size << " connections " << dendl; + } + + objDir->set_redis_pool(redis_pool); + blockDir->set_redis_pool(redis_pool); + bucketDir->set_redis_pool(redis_pool); + return 0; } diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.h b/src/rgw/driver/d4n/rgw_sal_d4n.h index b0ae4754db1..11325cb6858 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.h +++ b/src/rgw/driver/d4n/rgw_sal_d4n.h @@ -62,6 +62,9 @@ class D4NFilterDriver : public FilterDriver { boost::asio::io_context& io_context; optional_yield y; + // Redis connection pool + std::shared_ptr redis_pool; + public: D4NFilterDriver(Driver* _next, boost::asio::io_context& io_context, bool admin); virtual ~D4NFilterDriver(); -- 2.39.5