]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
the current connection setup is single and shared connection, the strand on that...
authorgal salomon <gsalomon@li-8bb82fcc-354a-11b2-a85c-8eb2daf56065.ibm.com>
Wed, 11 Jun 2025 18:05:03 +0000 (21:05 +0300)
committerPritha Srivastava <prsrivas@redhat.com>
Tue, 26 Aug 2025 04:17:39 +0000 (09:47 +0530)
it should be noted that per s3-request there are several redis-operation that may run on co-routine.
the redis-connection pool implement the guarded acquire/release APIs.

adding configuration : rgw_redis_connection_pool_size.
re-factor of redis-exec* function.

shared pointer for Redis connection pool

adding branch predication optimization for redis-pool/single-shared-connection condition

adding a warning-report-method for the case there is a blocking state upon empty connection pool.

Signed-off-by: galsalomon66 <gal.salomon@gmail.com>
src/common/options/rgw.yaml.in
src/rgw/driver/d4n/d4n_directory.cc
src/rgw/driver/d4n/d4n_directory.h
src/rgw/driver/d4n/rgw_sal_d4n.cc
src/rgw/driver/d4n/rgw_sal_d4n.h

index 3ce9118f6f823ef07bfa0ceaf936423e44620f35..83be490b704910b55f975ef49c194d385493b67e 100644 (file)
@@ -4498,3 +4498,16 @@ options:
   default: false
   services:
   - rgw
+- name: rgw_redis_connection_pool_size
+  type: int
+  level: basic
+  desc: RGW connection pool size for Redis operation per D4N
+  long_desc: This option sets the size of the connection pool for Redis operations
+    in D4N. It is used to manage the number of concurrent connections to Redis.
+    A larger pool size can improve performance when multiple threads are accessing
+    Redis simultaneously, but it also increases resource usage.
+  fmt_desc: The size of the redis connection pool.
+  default: 512
+  services:
+  - rgw
+  with_legacy: true
index 5600f34812a312960c896dff76228b1f097281a2..52f01a0edcc84bd7cc6080faee71af98b9dbdcca 100644 (file)
@@ -1,5 +1,7 @@
+#include <algorithm>
 #include <boost/asio/consign.hpp>
 #include <boost/algorithm/string.hpp>
+#include <memory>
 #include "common/async/blocked_completion.h"
 #include "common/dout.h" 
 #include "d4n_directory.h"
@@ -48,10 +50,36 @@ void redis_exec(std::shared_ptr<connection> conn,
   }
 }
 
+template <typename... Types>
+void redis_exec_cp(std::shared_ptr<rgw::d4n::RedisPool> pool,
+                boost::system::error_code& ec,
+                const boost::redis::request& req,
+                boost::redis::response<Types...>& resp,
+               optional_yield y)
+{
+//purpose: Execute a Redis command using a connection from the pool
+       std::shared_ptr<connection> conn = pool->acquire();
+       try {
+
+               if (y) {
+               auto yield = y.get_yield_context();
+               async_exec(conn, req, resp, yield[ec]);
+               } else {
+               async_exec(conn, req, resp, ceph::async::use_blocked[ec]);
+               }
+       } catch (const std::exception& e) {
+               //release the connection upon exception
+               pool->release(conn);
+               throw;
+       }
+       //release the connection back to the pool after execution
+       pool->release(conn);
+}
+
 void redis_exec(std::shared_ptr<connection> conn,
                 boost::system::error_code& ec,
                 const boost::redis::request& req,
-                boost::redis::generic_response& resp, optional_yield y)
+    boost::redis::generic_response& resp, optional_yield y)
 {
   if (y) {
     auto yield = y.get_yield_context();
@@ -61,6 +89,29 @@ void redis_exec(std::shared_ptr<connection> conn,
   }
 }
 
+void redis_exec_cp(std::shared_ptr<rgw::d4n::RedisPool> pool,
+                boost::system::error_code& ec,
+                const boost::redis::request& req,
+                boost::redis::generic_response& resp, optional_yield y)
+{
+       //purpose: Execute a Redis command using a connection from the pool
+       std::shared_ptr<connection> conn = pool->acquire();
+
+       try {
+               if (y) {
+                       auto yield = y.get_yield_context();
+                       async_exec(conn, req, resp, yield[ec]);
+               } else {
+                       async_exec(conn, req, resp, ceph::async::use_blocked[ec]);
+               }       
+       } catch (const std::exception& e) {
+                       pool->release(conn);
+                       throw;
+       }
+       //release the connection back to the pool after execution
+       pool->release(conn);
+}
+
 int check_bool(std::string str) {
   if (str == "true" || str == "1") {
     return 1;
@@ -71,6 +122,41 @@ int check_bool(std::string str) {
   }
 }
 
+void redis_exec_connection_pool(const DoutPrefixProvider* dpp,
+                               std::shared_ptr<RedisPool> redis_pool,
+                               std::shared_ptr<connection> conn,
+                               boost::system::error_code& ec,
+                               const boost::redis::request& req,
+                               boost::redis::generic_response& resp,
+                               optional_yield y)
+{
+    if(!redis_pool)[[unlikely]]
+    {
+       redis_exec(conn, ec, req, resp, y);
+       ldpp_dout(dpp, 0) << "Directory::" << __func__ << " not using connection-pool, it's using the shared connection " << dendl;
+    }
+    else[[likely]]
+       redis_exec_cp(redis_pool, ec, req, resp, y);
+}
+
+template <typename... Types>
+void redis_exec_connection_pool(const DoutPrefixProvider* dpp,
+                               std::shared_ptr<RedisPool> redis_pool,
+                               std::shared_ptr<connection> conn,
+                               boost::system::error_code& ec,
+                               const boost::redis::request& req,
+                               boost::redis::response<Types...>& resp,
+                               optional_yield y)
+{
+    if(!redis_pool)[[unlikely]]
+    {
+       redis_exec(conn, ec, req, resp, y);
+       ldpp_dout(dpp, 0) << "Directory::" << __func__ << " not using connection-pool, it's using the shared connection " << dendl;
+    }
+    else[[likely]]
+       redis_exec_cp(redis_pool, ec, req, resp, y);
+}
+
 int BucketDirectory::zadd(const DoutPrefixProvider* dpp, const std::string& bucket_id, double score, const std::string& member, optional_yield y, bool multi)
 {
   try {
@@ -79,7 +165,8 @@ int BucketDirectory::zadd(const DoutPrefixProvider* dpp, const std::string& buck
     req.push("ZADD", bucket_id, "CH", std::to_string(0), member);
 
     response<std::string> resp;
-    redis_exec(conn, ec, req, resp, y);
+
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
     if (ec) {
       ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -110,7 +197,7 @@ int BucketDirectory::zrem(const DoutPrefixProvider* dpp, const std::string& buck
     req.push("ZREM", bucket_id, member);
     response<std::string> resp;
 
-    redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
     if (ec) {
       ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -144,7 +231,7 @@ int BucketDirectory::zrange(const DoutPrefixProvider* dpp, const std::string& bu
     }
 
     response<std::vector<std::string> > resp;
-    redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
     if (ec) {
       ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -175,7 +262,7 @@ int BucketDirectory::zscan(const DoutPrefixProvider* dpp, const std::string& buc
     req.push("ZSCAN", bucket_id, cursor, "MATCH", pattern, "COUNT", count);
 
     boost::redis::generic_response resp;
-    redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
     if (ec) {
       ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -217,7 +304,7 @@ int BucketDirectory::zrank(const DoutPrefixProvider* dpp, const std::string& buc
     req.push("ZRANK", bucket_id, member);
 
     response<int> resp;
-    redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
     if (ec) {
       ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -249,7 +336,7 @@ int ObjectDirectory::exist_key(const DoutPrefixProvider* dpp, CacheObj* object,
     request req;
     req.push("EXISTS", key);
 
-    redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
     if (ec) {
       ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -316,7 +403,7 @@ int ObjectDirectory::set(const DoutPrefixProvider* dpp, CacheObj* object, option
     request req;
     req.push_range("HSET", key, redisValues);
 
-    redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
     if (ec) {
       ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -352,7 +439,7 @@ int ObjectDirectory::get(const DoutPrefixProvider* dpp, CacheObj* object, option
     request req;
     req.push_range("HMGET", key, fields);
 
-    redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
     if (ec) {
       ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -403,7 +490,7 @@ int ObjectDirectory::copy(const DoutPrefixProvider* dpp, CacheObj* object, const
     req.push("HSET", copyKey, "objName", copyName, "bucketName", copyBucketName);
     req.push("EXEC");
 
-    redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
     if (ec) {
       ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -433,7 +520,7 @@ int ObjectDirectory::del(const DoutPrefixProvider* dpp, CacheObj* object, option
     request req;
     req.push("DEL", key);
 
-    redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
     if (!std::get<0>(resp).value()) {
       ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "(): No values deleted." << dendl;
@@ -468,7 +555,7 @@ int ObjectDirectory::update_field(const DoutPrefixProvider* dpp, CacheObj* objec
        request req;
        req.push("HGET", key, field);
 
-       redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
        if (ec) {
          ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -495,7 +582,7 @@ int ObjectDirectory::update_field(const DoutPrefixProvider* dpp, CacheObj* objec
       request req;
       req.push("HSET", key, field, value);
 
-      redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
       if (ec) {
        ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -525,7 +612,7 @@ int ObjectDirectory::zadd(const DoutPrefixProvider* dpp, CacheObj* object, doubl
     req.push("ZADD", key, "CH", std::to_string(score), member);
 
     response<std::string> resp;
-    redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
     if (ec) {
       ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -557,7 +644,7 @@ int ObjectDirectory::zrange(const DoutPrefixProvider* dpp, CacheObj* object, int
     req.push("ZRANGE", key, std::to_string(start), std::to_string(stop));
 
     response<std::vector<std::string> > resp;
-    redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
     if (ec) {
       ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -588,7 +675,7 @@ int ObjectDirectory::zrevrange(const DoutPrefixProvider* dpp, CacheObj* object,
     req.push("ZREVRANGE", key, start, stop);
 
     response<std::vector<std::string> > resp;
-    redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
     if (ec) {
       ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -614,7 +701,7 @@ int ObjectDirectory::zrem(const DoutPrefixProvider* dpp, CacheObj* object, const
     req.push("ZREM", key, member);
     response<std::string> resp;
 
-    redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
     if (ec) {
       ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -645,7 +732,7 @@ int ObjectDirectory::zremrangebyscore(const DoutPrefixProvider* dpp, CacheObj* o
     req.push("ZREMRANGEBYSCORE", key, std::to_string(min), std::to_string(max));
     response<std::string> resp;
 
-    redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
     if (ec) {
       ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -678,7 +765,7 @@ int ObjectDirectory::incr(const DoutPrefixProvider* dpp, CacheObj* object, optio
     req.push("INCR", key);
     response<std::string> resp;
 
-    redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
     if (ec) {
       ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -704,7 +791,7 @@ int ObjectDirectory::zrank(const DoutPrefixProvider* dpp, CacheObj* object, cons
     req.push("ZRANK", key, member);
     response<std::string> resp;
 
-    redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
     if (ec) {
       ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -735,7 +822,7 @@ int BlockDirectory::exist_key(const DoutPrefixProvider* dpp, CacheBlock* block,
     request req;
     req.push("EXISTS", key);
 
-    redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
     if (ec) {
       ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -820,8 +907,9 @@ int BlockDirectory::set(const DoutPrefixProvider* dpp, CacheBlock* block, option
     request req;
     req.push_range("HSET", key, redisValues);
 
-    redis_exec(conn, ec, req, resp, y);
-    if (ec) {
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
+    
+      if (ec) {
       ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
       return -ec.value();
     }
@@ -919,7 +1007,7 @@ int BlockDirectory::get(const DoutPrefixProvider* dpp, std::vector<CacheBlock>&
 
   try {
     boost::system::error_code ec;
-    redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
     if (ec) {
       ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -989,7 +1077,7 @@ int BlockDirectory::get(const DoutPrefixProvider* dpp, CacheBlock* block, option
     request req;
     req.push_range("HMGET", key, fields);
 
-    redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
     if (ec) {
       ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -1044,7 +1132,7 @@ int BlockDirectory::copy(const DoutPrefixProvider* dpp, CacheBlock* block, const
     req.push("HSET", copyKey, "objName", copyName, "bucketName", copyBucketName);
     req.push("EXEC");
 
-    redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
     if (ec) {
       ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -1074,14 +1162,15 @@ int BlockDirectory::del(const DoutPrefixProvider* dpp, CacheBlock* block, option
     req.push("DEL", key);
     if (!multi) {
       response<int> resp;
-      redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
+      
       if (!std::get<0>(resp).value()) {
         ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): No values deleted for key=" << key << dendl;
         return -ENOENT;
       }
     } else { //if delete is called as part of a transaction, the command will be queued, hence the response will be a string
       response<std::string> resp;
-      redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
     }
     if (ec) {
       ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -1113,7 +1202,7 @@ int BlockDirectory::update_field(const DoutPrefixProvider* dpp, CacheBlock* bloc
        request req;
        req.push("HGET", key, field);
 
-       redis_exec(conn, ec, req, resp, y);
+       redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
        if (ec) {
          ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -1140,7 +1229,7 @@ int BlockDirectory::update_field(const DoutPrefixProvider* dpp, CacheBlock* bloc
       request req;
       req.push("HSET", key, field, value);
 
-      redis_exec(conn, ec, req, resp, y);
+       redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
       if (ec) {
        ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -1172,7 +1261,7 @@ int BlockDirectory::remove_host(const DoutPrefixProvider* dpp, CacheBlock* block
       request req;
       req.push("HGET", key, "hosts");
 
-      redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
       if (ec) {
        ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -1211,7 +1300,7 @@ int BlockDirectory::remove_host(const DoutPrefixProvider* dpp, CacheBlock* block
       request req;
       req.push("HSET", key, "hosts", value);
 
-      redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
       if (ec) {
        ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -1235,7 +1324,7 @@ int BlockDirectory::zadd(const DoutPrefixProvider* dpp, CacheBlock* block, doubl
     req.push("ZADD", key, "CH", std::to_string(score), member);
 
     response<std::string> resp;
-    redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
     if (ec) {
       ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -1266,7 +1355,7 @@ int BlockDirectory::zrange(const DoutPrefixProvider* dpp, CacheBlock* block, int
     req.push("ZRANGE", key, std::to_string(start), std::to_string(stop));
 
     response<std::vector<std::string> > resp;
-    redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
     if (ec) {
       ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -1297,7 +1386,7 @@ int BlockDirectory::zrevrange(const DoutPrefixProvider* dpp, CacheBlock* block,
     req.push("ZREVRANGE", key, std::to_string(start), std::to_string(stop));
 
     response<std::vector<std::string> > resp;
-    redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
     if (ec) {
       ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -1328,7 +1417,7 @@ int BlockDirectory::zrem(const DoutPrefixProvider* dpp, CacheBlock* block, const
     req.push("ZREM", key, member);
     response<std::string> resp;
 
-    redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
     if (ec) {
       ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -1357,7 +1446,7 @@ int BlockDirectory::watch(const DoutPrefixProvider* dpp, CacheBlock* block, opti
     req.push("WATCH", key);
     response<std::string> resp;
 
-    redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
     if (ec) {
       ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -1385,7 +1474,7 @@ int BlockDirectory::exec(const DoutPrefixProvider* dpp, std::vector<std::string>
     req.push("EXEC");
     boost::redis::generic_response resp;
 
-    redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
     if (ec) {
       ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -1416,7 +1505,7 @@ int BlockDirectory::multi(const DoutPrefixProvider* dpp, optional_yield y)
     req.push("MULTI");
     response<std::string> resp;
 
-    redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
     if (ec) {
       ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -1444,7 +1533,7 @@ int BlockDirectory::discard(const DoutPrefixProvider* dpp, optional_yield y)
     req.push("DISCARD");
     response<std::string> resp;
 
-    redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
     if (ec) {
       ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -1472,7 +1561,7 @@ int BlockDirectory::unwatch(const DoutPrefixProvider* dpp, optional_yield y)
     req.push("UNWATCH");
     response<std::string> resp;
 
-    redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
     if (ec) {
       ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
index 7c5a84d9024ef2900a480055f384e600ebeb2431..03ab2848fe063d7961683f68ba2f8b484b19d414 100644 (file)
@@ -1,12 +1,85 @@
 #pragma once
 
 #include "rgw_common.h"
+#include "rgw_asio_thread.h"
 
 #include <boost/asio/detached.hpp>
 #include <boost/redis/connection.hpp>
+#include <condition_variable>
+#include <deque>
+#include <memory>
 
 namespace rgw { namespace d4n {
 
+using boost::redis::connection;
+class RedisPool {
+public:
+    RedisPool(boost::asio::io_context* ioc, const boost::redis::config& cfg, std::size_t size)
+        :  m_ioc(ioc),m_cfg(cfg) {
+        for (std::size_t i = 0; i < size; ++i) {
+            // Each connection gets its own strand
+            auto strand = boost::asio::make_strand(*m_ioc);
+            auto conn = std::make_shared<connection>(strand);
+            m_pool.push_back(conn);
+        }
+    }
+
+    ~RedisPool() {
+      cancel_all();
+    }
+
+    std::shared_ptr<connection> acquire() {
+        std::unique_lock<std::mutex> lock(m_aquire_release_mtx);
+
+       if (!m_is_pool_connected) {
+               for(auto& it:m_pool) {
+                       auto conn = it;
+                       conn->async_run(m_cfg, {}, boost::asio::consign(boost::asio::detached, conn));
+               }
+           m_is_pool_connected = true;
+       }
+
+        if (m_pool.empty()) {
+               maybe_warn_about_blocking(nullptr);
+               //wait until m_pool is not empty
+               m_cond_var.wait(lock, [this] { return !m_pool.empty(); });
+        } 
+        auto conn = m_pool.front();
+        m_pool.pop_front();
+        return conn;
+    }
+
+    void release(std::shared_ptr<connection> conn) {
+        std::unique_lock<std::mutex> lock(m_aquire_release_mtx);
+        m_pool.push_back(conn);
+       // Notify one waiting thread that a connection is available
+       m_cond_var.notify_one();
+    }
+
+    int current_pool_size() const {
+        std::unique_lock<std::mutex> lock(m_aquire_release_mtx);
+        return m_pool.size();
+    }
+
+    void cancel_all() {
+        std::unique_lock<std::mutex> lock(m_aquire_release_mtx);
+        if(m_is_pool_connected) {
+       for(auto& conn : m_pool) {
+               conn->cancel();
+        }
+      }
+    }
+
+private:
+    boost::asio::io_context* m_ioc;
+    boost::redis::config m_cfg;
+    std::deque<std::shared_ptr<connection>> m_pool;
+    mutable std::mutex m_aquire_release_mtx;
+    std::condition_variable m_cond_var;
+    bool m_is_pool_connected{false};
+};
+
+
 namespace net = boost::asio;
 using boost::redis::config;
 using boost::redis::connection;
@@ -67,6 +140,10 @@ struct CacheBlock {
 
 class Directory {
   public:
+       std::shared_ptr<RedisPool> redis_pool{nullptr}; // Redis connection pool
+       void set_redis_pool(std::shared_ptr<RedisPool> pool) {
+       redis_pool = pool;
+    }
     Directory() {}
 };
 
index bdbff1e41851340388b641fd8a5969781cbc8d66..5c0537dbe87337e86ed78eb69d05f65e46074c1e 100644 (file)
@@ -14,6 +14,8 @@
  */
 
 #include "rgw_perf_counters.h"
+#include <boost/redis/config.hpp>
+#include <memory>
 #include "rgw_sal_d4n.h"
 
 namespace rgw { namespace sal {
@@ -82,6 +84,18 @@ int D4NFilterDriver::initialize(CephContext *cct, const DoutPrefixProvider *dpp)
   cacheDriver->initialize(dpp);
   policyDriver->get_cache_policy()->init(cct, dpp, io_context, next);
 
+  //setting the connection pool size and other parameters
+  uint64_t rgw_redis_connection_pool_size = dpp->get_cct()->_conf->rgw_redis_connection_pool_size;
+  std::shared_ptr<rgw::d4n::RedisPool>redis_pool = nullptr;
+  if(rgw_redis_connection_pool_size>0){
+      redis_pool = std::make_shared<rgw::d4n::RedisPool>(&io_context, cfg, rgw_redis_connection_pool_size);
+      ldpp_dout(dpp, 10) << "redis connection pool created with " << rgw_redis_connection_pool_size << " connections "  << dendl;
+  }
+
+  objDir->set_redis_pool(redis_pool);
+  blockDir->set_redis_pool(redis_pool);
+  bucketDir->set_redis_pool(redis_pool);
+  
   return 0;
 }
 
index b0ae4754db18908e795bcfbacf39b39ded972648..11325cb6858aca421dc8a38f509cb94b35e0b8a9 100644 (file)
@@ -62,6 +62,9 @@ class D4NFilterDriver : public FilterDriver {
     boost::asio::io_context& io_context;
     optional_yield y;
 
+    // Redis connection pool
+    std::shared_ptr<rgw::d4n::RedisPool> redis_pool;
+
   public:
     D4NFilterDriver(Driver* _next, boost::asio::io_context& io_context, bool admin);
     virtual ~D4NFilterDriver();