From: Pritha Srivastava Date: Fri, 13 Jun 2025 11:35:44 +0000 (+0530) Subject: rgw/d4n: changes to integrate pipelining with connection pool. X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=aa3bd25a7d93d93df924d003f9b0ea23c63e87c0;p=ceph.git rgw/d4n: changes to integrate pipelining with connection pool. Signed-off-by: Pritha Srivastava --- diff --git a/src/rgw/driver/d4n/d4n_directory.cc b/src/rgw/driver/d4n/d4n_directory.cc index 169799097c005..efc30293dc51a 100644 --- a/src/rgw/driver/d4n/d4n_directory.cc +++ b/src/rgw/driver/d4n/d4n_directory.cc @@ -616,13 +616,7 @@ int ObjectDirectory::zadd(const DoutPrefixProvider* dpp, CacheObj* object, doubl req.push("ZADD", key, "CH", std::to_string(score), member); response resp; - if(!redis_pool)[[unlikely]] - { - redis_exec(conn, ec, req, resp, y); - ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() Using connection: " << conn.get() << dendl; - } - else[[likely]] - redis_exec_cp(redis_pool, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; @@ -926,13 +920,7 @@ int BlockDirectory::set(const DoutPrefixProvider* dpp, CacheBlock* block, option request req; req.push_range("HSET", key, redisValues); - if(!redis_pool)[[unlikely]] - { - redis_exec(conn, ec, req, resp, y); - ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() Using connection: " << conn.get() << dendl; - } - else[[likely]] - redis_exec_cp(redis_pool, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; return -ec.value(); @@ -965,7 +953,7 @@ int BlockDirectory::set(const DoutPrefixProvider* dpp, std::vector& try { boost::system::error_code ec; boost::redis::generic_response resp; - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; return -ec.value(); @@ -1346,13 +1334,7 @@ int BlockDirectory::del(const DoutPrefixProvider* dpp, CacheBlock* block, option request req; req.push("DEL", key); response resp; - if(!redis_pool)[[unlikely]] - { - redis_exec(conn, ec, req, resp, y); - ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() Using connection: " << conn.get() << dendl; - } - else[[likely]] - redis_exec_cp(redis_pool, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (!std::get<0>(resp).value()) { ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): No values deleted for key=" << key << dendl; return -ENOENT; @@ -1626,7 +1608,7 @@ int Pipeline::execute(const DoutPrefixProvider* dpp, optional_yield y) try { boost::system::error_code ec; pipeline_mode = false; - redis_exec(conn, ec, req, resp, y); + redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y); if (ec) { ldpp_dout(dpp, 0) << "Directory::" << __func__ << "() ERROR: " << ec.what() << dendl; diff --git a/src/rgw/driver/d4n/d4n_directory.h b/src/rgw/driver/d4n/d4n_directory.h index df226f4c3fd8f..26c899d2275e7 100644 --- a/src/rgw/driver/d4n/d4n_directory.h +++ b/src/rgw/driver/d4n/d4n_directory.h @@ -155,7 +155,7 @@ class Directory { class Pipeline { public: - Pipeline(std::shared_ptr& conn) : conn(conn) {} + Pipeline(std::shared_ptr& conn, std::shared_ptr redis_pool) : conn(conn), redis_pool(redis_pool) {} void start() { pipeline_mode = true; } //executes all commands and sets pipeline mode to false int execute(const DoutPrefixProvider* dpp, optional_yield y); @@ -164,6 +164,7 @@ class Pipeline { private: std::shared_ptr conn; + std::shared_ptr redis_pool{nullptr}; request req; bool pipeline_mode{false}; }; diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.cc b/src/rgw/driver/d4n/rgw_sal_d4n.cc index 61d55a4c10986..1b118726f1e5b 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.cc +++ b/src/rgw/driver/d4n/rgw_sal_d4n.cc @@ -1201,7 +1201,8 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, std:: //dirty objects if (dirty) { auto redis_conn = this->driver->get_conn(); - rgw::d4n::Pipeline p = rgw::d4n::Pipeline(redis_conn); + auto redis_pool = this->driver->get_redis_pool(); + rgw::d4n::Pipeline p = rgw::d4n::Pipeline(redis_conn, redis_pool); p.start(); auto ret = blockDir->set(dpp, &block, y, &p); if (ret < 0) { @@ -1249,7 +1250,8 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, std:: if (ret == -ENOENT) { if (!(this->get_bucket()->versioned())) { auto redis_conn = this->driver->get_conn(); - rgw::d4n::Pipeline p = rgw::d4n::Pipeline(redis_conn); + auto redis_pool = this->driver->get_redis_pool(); + rgw::d4n::Pipeline p = rgw::d4n::Pipeline(redis_conn, redis_pool); p.start(); //we can explore pipelining to send the two 'HSET' commands together ret = blockDir->set(dpp, &block, y, &p); @@ -1282,7 +1284,8 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, std:: and versioned and non-versioned buckets dirty objects */ if (!(this->get_bucket()->versioned())) { auto redis_conn = this->driver->get_conn(); - rgw::d4n::Pipeline p = rgw::d4n::Pipeline(redis_conn); + auto redis_pool = this->driver->get_redis_pool(); + rgw::d4n::Pipeline p = rgw::d4n::Pipeline(redis_conn, redis_pool); p.start(); ret = blockDir->set(dpp, &block, y, &p); if (ret < 0) { diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.h b/src/rgw/driver/d4n/rgw_sal_d4n.h index 1c823688dd8d1..1478aa5f40f49 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.h +++ b/src/rgw/driver/d4n/rgw_sal_d4n.h @@ -90,6 +90,7 @@ class D4NFilterDriver : public FilterDriver { rgw::d4n::PolicyDriver* get_policy_driver() { return policyDriver.get(); } void save_y(optional_yield y) { this->y = y; } std::shared_ptr get_conn() { return conn; } + std::shared_ptr get_redis_pool() { return redis_pool; } void shutdown() override; };