]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/d4n: changes to integrate pipelining with connection pool.
authorPritha Srivastava <prsrivas@redhat.com>
Fri, 13 Jun 2025 11:35:44 +0000 (17:05 +0530)
committerPritha Srivastava <prsrivas@redhat.com>
Tue, 26 Aug 2025 04:18:05 +0000 (09:48 +0530)
Signed-off-by: Pritha Srivastava <prsrivas@redhat.com>
src/rgw/driver/d4n/d4n_directory.cc
src/rgw/driver/d4n/d4n_directory.h
src/rgw/driver/d4n/rgw_sal_d4n.cc
src/rgw/driver/d4n/rgw_sal_d4n.h

index 169799097c005ed21ba32fc260a16ced04aa61b0..efc30293dc51ad47228b08bfd863616f28a02d4a 100644 (file)
@@ -616,13 +616,7 @@ int ObjectDirectory::zadd(const DoutPrefixProvider* dpp, CacheObj* object, doubl
       req.push("ZADD", key, "CH", std::to_string(score), member);
 
       response<std::string> resp;
-      if(!redis_pool)[[unlikely]]
-      {
-          redis_exec(conn, ec, req, resp, y);
-          ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() Using connection: " << conn.get() << dendl;
-      } 
-      else[[likely]]
-          redis_exec_cp(redis_pool, ec, req, resp, y);
+      redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
       if (ec) {
         ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -926,13 +920,7 @@ int BlockDirectory::set(const DoutPrefixProvider* dpp, CacheBlock* block, option
       request req;
       req.push_range("HSET", key, redisValues);
 
-      if(!redis_pool)[[unlikely]]
-      {
-          redis_exec(conn, ec, req, resp, y);
-          ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() Using connection: " << conn.get() << dendl;
-      } 
-      else[[likely]]
-          redis_exec_cp(redis_pool, ec, req, resp, y);
+      redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
       if (ec) {
         ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
         return -ec.value();
@@ -965,7 +953,7 @@ int BlockDirectory::set(const DoutPrefixProvider* dpp, std::vector<CacheBlock>&
   try {
     boost::system::error_code ec;
     boost::redis::generic_response resp;
-    redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
     if (ec) {
       ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
       return -ec.value();
@@ -1346,13 +1334,7 @@ int BlockDirectory::del(const DoutPrefixProvider* dpp, CacheBlock* block, option
     request req;
     req.push("DEL", key);
     response<int> resp;
-    if(!redis_pool)[[unlikely]]
-    {
-        redis_exec(conn, ec, req, resp, y);
-        ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() Using connection: " << conn.get() << dendl;
-    } 
-    else[[likely]]
-        redis_exec_cp(redis_pool, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
     if (!std::get<0>(resp).value()) {
       ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): No values deleted for key=" << key << dendl;
       return -ENOENT;
@@ -1626,7 +1608,7 @@ int Pipeline::execute(const DoutPrefixProvider* dpp, optional_yield y)
   try {
     boost::system::error_code ec;
     pipeline_mode = false;
-    redis_exec(conn, ec, req, resp, y);
+    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
     if (ec) {
       ldpp_dout(dpp, 0) << "Directory::" << __func__ << "() ERROR: " << ec.what() << dendl;
index df226f4c3fd8f2ab62246d1795b5f9494017fc6e..26c899d2275e7dadbf791004a1eb43b5c04618f9 100644 (file)
@@ -155,7 +155,7 @@ class Directory {
 
 class Pipeline {
   public:
-    Pipeline(std::shared_ptr<connection>& conn) : conn(conn) {}
+    Pipeline(std::shared_ptr<connection>& conn, std::shared_ptr<RedisPool> redis_pool) : conn(conn), redis_pool(redis_pool) {}
     void start() { pipeline_mode = true; }
     //executes all commands and sets pipeline mode to false
     int execute(const DoutPrefixProvider* dpp, optional_yield y);
@@ -164,6 +164,7 @@ class Pipeline {
 
   private:
     std::shared_ptr<connection> conn;
+    std::shared_ptr<RedisPool> redis_pool{nullptr};
     request req;
     bool pipeline_mode{false};
 };
index 61d55a4c10986440233b3cbb1ae072b97d786a35..1b118726f1e5b6be3a8ca210889f6d9505cbca28 100644 (file)
@@ -1201,7 +1201,8 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, std::
     //dirty objects
     if (dirty) {
       auto redis_conn = this->driver->get_conn();
-      rgw::d4n::Pipeline p = rgw::d4n::Pipeline(redis_conn);
+      auto redis_pool = this->driver->get_redis_pool();
+      rgw::d4n::Pipeline p = rgw::d4n::Pipeline(redis_conn, redis_pool);
       p.start();
       auto ret = blockDir->set(dpp, &block, y, &p);
       if (ret < 0) {
@@ -1249,7 +1250,8 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, std::
       if (ret == -ENOENT) {
         if (!(this->get_bucket()->versioned())) {
           auto redis_conn = this->driver->get_conn();
-          rgw::d4n::Pipeline p = rgw::d4n::Pipeline(redis_conn);
+          auto redis_pool = this->driver->get_redis_pool();
+          rgw::d4n::Pipeline p = rgw::d4n::Pipeline(redis_conn, redis_pool);
           p.start();
           //we can explore pipelining to send the two 'HSET' commands together
           ret = blockDir->set(dpp, &block, y, &p);
@@ -1282,7 +1284,8 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, std::
            and versioned and non-versioned buckets dirty objects */
         if (!(this->get_bucket()->versioned())) {
           auto redis_conn = this->driver->get_conn();
-          rgw::d4n::Pipeline p = rgw::d4n::Pipeline(redis_conn);
+          auto redis_pool = this->driver->get_redis_pool();
+          rgw::d4n::Pipeline p = rgw::d4n::Pipeline(redis_conn, redis_pool);
           p.start();
           ret = blockDir->set(dpp, &block, y, &p);
           if (ret < 0) {
index 1c823688dd8d1543cfe2085511f07940d508eede..1478aa5f40f49171f0785fca4a2d6ca15218e071 100644 (file)
@@ -90,6 +90,7 @@ class D4NFilterDriver : public FilterDriver {
     rgw::d4n::PolicyDriver* get_policy_driver() { return policyDriver.get(); }
     void save_y(optional_yield y) { this->y = y; }
     std::shared_ptr<connection> get_conn() { return conn; }
+    std::shared_ptr<rgw::d4n::RedisPool> get_redis_pool() { return redis_pool; }
     void shutdown() override;
 };