]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/d4n: add pipelining to PUT in write-cache
authorPritha Srivastava <prsrivas@redhat.com>
Tue, 22 Apr 2025 03:04:04 +0000 (08:34 +0530)
committerPritha Srivastava <prsrivas@redhat.com>
Tue, 26 Aug 2025 04:18:05 +0000 (09:48 +0530)
and read cache for data blocks directory entries

introduced a pipelined version of set command.

Signed-off-by: Pritha Srivastava <prsrivas@redhat.com>
src/rgw/driver/d4n/d4n_directory.cc
src/rgw/driver/d4n/d4n_directory.h
src/rgw/driver/d4n/d4n_policy.cc
src/rgw/driver/d4n/rgw_sal_d4n.cc
src/rgw/driver/d4n/rgw_sal_d4n.h
src/test/rgw/test_d4n_directory.cc

index e5979bba8e5c3c5eb3cb81decc9a17ac578031f8..169799097c005ed21ba32fc260a16ced04aa61b0 100644 (file)
@@ -157,29 +157,31 @@ void redis_exec_connection_pool(const DoutPrefixProvider* dpp,
        redis_exec_cp(redis_pool, ec, req, resp, y);
 }
 
-int BucketDirectory::zadd(const DoutPrefixProvider* dpp, const std::string& bucket_id, double score, const std::string& member, optional_yield y, bool multi)
+int BucketDirectory::zadd(const DoutPrefixProvider* dpp, const std::string& bucket_id, double score, const std::string& member, optional_yield y, Pipeline* pipeline)
 {
   try {
     boost::system::error_code ec;
-    request req;
-    req.push("ZADD", bucket_id, "CH", std::to_string(0), member);
+    if (pipeline && pipeline->is_pipeline()) {
+      request& req = pipeline->get_request();
+      req.push("ZADD", bucket_id, "CH", std::to_string(0), member);
+    } else {
+      request req;
+      req.push("ZADD", bucket_id, "CH", std::to_string(0), member);
 
     response<std::string> resp;
 
     redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
 
-    if (ec) {
-      ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
-      return -ec.value();
-    }
+      if (ec) {
+        ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+        return -ec.value();
+      }
 
-    if (!multi) {
       if (std::get<0>(resp).value() != "1") {
         ldpp_dout(dpp, 10) << "BucketDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl;
         return -ENOENT;
       }
     }
-
   } catch (std::exception &e) {
     ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
     return -EINVAL;
@@ -189,7 +191,7 @@ int BucketDirectory::zadd(const DoutPrefixProvider* dpp, const std::string& buck
 
 }
 
-int BucketDirectory::zrem(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& member, optional_yield y, bool multi)
+int BucketDirectory::zrem(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& member, optional_yield y)
 {
   try {
     boost::system::error_code ec;
@@ -204,11 +206,9 @@ int BucketDirectory::zrem(const DoutPrefixProvider* dpp, const std::string& buck
       return -ec.value();
     }
 
-    if (!multi) {
-      if (std::get<0>(resp).value() != "1") {
-        ldpp_dout(dpp, 10) << "BucketDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
-        return -ENOENT;
-      }
+    if (std::get<0>(resp).value() != "1") {
+      ldpp_dout(dpp, 10) << "BucketDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
+      return -ENOENT;
     }
 
   } catch (std::exception &e) {
@@ -603,29 +603,37 @@ int ObjectDirectory::update_field(const DoutPrefixProvider* dpp, CacheObj* objec
   return ret;
 }
 
-int ObjectDirectory::zadd(const DoutPrefixProvider* dpp, CacheObj* object, double score, const std::string& member, optional_yield y, bool multi)
+int ObjectDirectory::zadd(const DoutPrefixProvider* dpp, CacheObj* object, double score, const std::string& member, optional_yield y, Pipeline* pipeline)
 {
   std::string key = build_index(object);
   try {
     boost::system::error_code ec;
-    request req;
-    req.push("ZADD", key, "CH", std::to_string(score), member);
+    if (pipeline && pipeline->is_pipeline()) {
+      request& req = pipeline->get_request();
+      req.push("ZADD", key, "CH", std::to_string(score), member);
+    } else {
+      request req;
+      req.push("ZADD", key, "CH", std::to_string(score), member);
 
-    response<std::string> resp;
-    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
+      response<std::string> resp;
+      if(!redis_pool)[[unlikely]]
+      {
+          redis_exec(conn, ec, req, resp, y);
+          ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() Using connection: " << conn.get() << dendl;
+      } 
+      else[[likely]]
+          redis_exec_cp(redis_pool, ec, req, resp, y);
 
-    if (ec) {
-      ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
-      return -ec.value();
-    }
+      if (ec) {
+        ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+        return -ec.value();
+      }
 
-    if (!multi) {
       if (std::get<0>(resp).value() != "1") {
         ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl;
         return -ENOENT;
       }
     }
-
   } catch (std::exception &e) {
     ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
     return -EINVAL;
@@ -692,7 +700,7 @@ int ObjectDirectory::zrevrange(const DoutPrefixProvider* dpp, CacheObj* object,
   return 0;
 }
 
-int ObjectDirectory::zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y, bool multi)
+int ObjectDirectory::zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y)
 {
   std::string key = build_index(object);
   try {
@@ -708,11 +716,9 @@ int ObjectDirectory::zrem(const DoutPrefixProvider* dpp, CacheObj* object, const
       return -ec.value();
     }
 
-    if (!multi) {
-      if (std::get<0>(resp).value() != "1") {
-        ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
-        return -ENOENT;
-      }
+    if (std::get<0>(resp).value() != "1") {
+      ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
+      return -ENOENT;
     }
 
   } catch (std::exception &e) {
@@ -723,7 +729,7 @@ int ObjectDirectory::zrem(const DoutPrefixProvider* dpp, CacheObj* object, const
   return 0;
 }
 
-int ObjectDirectory::zremrangebyscore(const DoutPrefixProvider* dpp, CacheObj* object, double min, double max, optional_yield y, bool multi)
+int ObjectDirectory::zremrangebyscore(const DoutPrefixProvider* dpp, CacheObj* object, double min, double max, optional_yield y)
 {
   std::string key = build_index(object);
   try {
@@ -739,11 +745,9 @@ int ObjectDirectory::zremrangebyscore(const DoutPrefixProvider* dpp, CacheObj* o
       return -ec.value();
     }
 
-    if (!multi) {
-      if (std::get<0>(resp).value() == "0") {
-        ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "() No element removed!" << dendl;
-        return -ENOENT;
-      }
+    if (std::get<0>(resp).value() == "0") {
+      ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "() No element removed!" << dendl;
+      return -ENOENT;
     }
 
   } catch (std::exception &e) {
@@ -836,52 +840,46 @@ int BlockDirectory::exist_key(const DoutPrefixProvider* dpp, CacheBlock* block,
   return std::get<0>(resp).value();
 }
 
-int BlockDirectory::set(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y)
+template<SeqContainer Container>
+int BlockDirectory::set_values(const DoutPrefixProvider* dpp, CacheBlock& block, Container& redisValues, optional_yield y)
 {
-  /* For existing keys, call get method beforehand. 
-     Sets completely overwrite existing values. */
-  std::string key = build_index(block);
-  ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): index is: " << key << dendl;
-    
   std::string hosts;
-  std::list<std::string> redisValues;
-    
   /* Creating a redisValues of the entry's properties */
   redisValues.push_back("blockID");
-  redisValues.push_back(std::to_string(block->blockID));
+  redisValues.push_back(std::to_string(block.blockID));
   redisValues.push_back("version");
-  redisValues.push_back(block->version);
+  redisValues.push_back(block.version);
   redisValues.push_back("deleteMarker");
   int ret = -1;
-  if ((ret = check_bool(std::to_string(block->deleteMarker))) != -EINVAL) {
-    block->deleteMarker = (ret != 0);
+  if ((ret = check_bool(std::to_string(block.deleteMarker))) != -EINVAL) {
+    block.deleteMarker = (ret != 0);
   } else {
     ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: Invalid bool value for delete marker" << dendl;
     return -EINVAL;
   }
-  redisValues.push_back(std::to_string(block->deleteMarker));
+  redisValues.push_back(std::to_string(block.deleteMarker));
   redisValues.push_back("size");
-  redisValues.push_back(std::to_string(block->size));
+  redisValues.push_back(std::to_string(block.size));
   redisValues.push_back("globalWeight");
-  redisValues.push_back(std::to_string(block->globalWeight));
+  redisValues.push_back(std::to_string(block.globalWeight));
   redisValues.push_back("objName");
-  redisValues.push_back(block->cacheObj.objName);
+  redisValues.push_back(block.cacheObj.objName);
   redisValues.push_back("bucketName");
-  redisValues.push_back(block->cacheObj.bucketName);
+  redisValues.push_back(block.cacheObj.bucketName);
   redisValues.push_back("creationTime");
-  redisValues.push_back(block->cacheObj.creationTime); 
+  redisValues.push_back(block.cacheObj.creationTime);
   redisValues.push_back("dirty");
-  if ((ret = check_bool(std::to_string(block->cacheObj.dirty))) != -EINVAL) {
-    block->cacheObj.dirty = (ret != 0);
+  if ((ret = check_bool(std::to_string(block.cacheObj.dirty))) != -EINVAL) {
+    block.cacheObj.dirty = (ret != 0);
   } else {
     ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: Invalid bool value" << dendl;
     return -EINVAL;
   }
-  redisValues.push_back(std::to_string(block->cacheObj.dirty));
+  redisValues.push_back(std::to_string(block.cacheObj.dirty));
   redisValues.push_back("hosts");
-  
+
   hosts.clear();
-  for (auto const& host : block->cacheObj.hostsList) {
+  for (auto const& host : block.cacheObj.hostsList) {
     if (hosts.empty())
     hosts = host + "_";
     else
@@ -893,23 +891,82 @@ int BlockDirectory::set(const DoutPrefixProvider* dpp, CacheBlock* block, option
 
   redisValues.push_back(hosts);
   redisValues.push_back("etag");
-  redisValues.push_back(block->cacheObj.etag);
+  redisValues.push_back(block.cacheObj.etag);
   redisValues.push_back("objSize");
-  redisValues.push_back(std::to_string(block->cacheObj.size));
+  redisValues.push_back(std::to_string(block.cacheObj.size));
   redisValues.push_back("userId");
-  redisValues.push_back(block->cacheObj.user_id);
+  redisValues.push_back(block.cacheObj.user_id);
   redisValues.push_back("displayName");
-  redisValues.push_back(block->cacheObj.display_name);
+  redisValues.push_back(block.cacheObj.display_name);
+
+  return 0;
+}
+
+int BlockDirectory::set(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y, Pipeline* pipeline)
+{
+  /* For existing keys, call get method beforehand. 
+     Sets completely overwrite existing values. */
+  std::string key = build_index(block);
+  ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): index is: " << key << dendl;
+
+  std::vector<std::string> redisValues;
+
+  auto ret = set_values(dpp, *block, redisValues, y);
+  if (ret < 0) {
+    return ret;
+  }
 
   try {
     boost::system::error_code ec;
     response<ignore_t> resp;
-    request req;
+    if (pipeline && pipeline->is_pipeline()) {
+      request& req = pipeline->get_request();
+      req.push_range("HSET", key, redisValues);
+    } else {
+      request req;
+      req.push_range("HSET", key, redisValues);
+
+      if(!redis_pool)[[unlikely]]
+      {
+          redis_exec(conn, ec, req, resp, y);
+          ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() Using connection: " << conn.get() << dendl;
+      } 
+      else[[likely]]
+          redis_exec_cp(redis_pool, ec, req, resp, y);
+      if (ec) {
+        ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+        return -ec.value();
+      }
+    }
+  } catch (std::exception &e) {
+    ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+    return -EINVAL;
+  }
+
+  return 0;
+}
+
+int BlockDirectory::set(const DoutPrefixProvider* dpp, std::vector<CacheBlock>& blocks, optional_yield y)
+{
+  request req;
+  for (auto block : blocks) {
+    std::string key = build_index(&block);
+    ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): index is: " << key << dendl;
+
+    //std::string hosts;
+    std::list<std::string> redisValues;
+    auto ret = set_values(dpp, block, redisValues, y);
+    if (ret < 0) {
+      return ret;
+    }
     req.push_range("HSET", key, redisValues);
+  }
 
-    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
-    
-      if (ec) {
+  try {
+    boost::system::error_code ec;
+    boost::redis::generic_response resp;
+    redis_exec(conn, ec, req, resp, y);
+    if (ec) {
       ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
       return -ec.value();
     }
@@ -979,7 +1036,7 @@ template <size_t N>
 int BlockDirectory::get(const DoutPrefixProvider* dpp, std::vector<CacheBlock>& blocks, optional_yield y)
 {
   request req;
-  redis_response<100, std::optional<std::vector<std::string>>>::type resp;
+  typename redis_response<N, std::optional<std::vector<std::string>>>::type resp;
   for (auto block : blocks) {
     std::string key = build_index(&block);
     std::vector<std::string> fields;
@@ -1279,7 +1336,7 @@ int BlockDirectory::copy(const DoutPrefixProvider* dpp, CacheBlock* block, const
   }
 }
 
-int BlockDirectory::del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y, bool multi) 
+int BlockDirectory::del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y)
 {
   std::string key = build_index(block);
   ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): index is: " << key << dendl;
@@ -1288,17 +1345,17 @@ int BlockDirectory::del(const DoutPrefixProvider* dpp, CacheBlock* block, option
     boost::system::error_code ec;
     request req;
     req.push("DEL", key);
-    if (!multi) {
-      response<int> resp;
-    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
-      
-      if (!std::get<0>(resp).value()) {
-        ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): No values deleted for key=" << key << dendl;
-        return -ENOENT;
-      }
-    } else { //if delete is called as part of a transaction, the command will be queued, hence the response will be a string
-      response<std::string> resp;
-    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
+    response<int> resp;
+    if(!redis_pool)[[unlikely]]
+    {
+        redis_exec(conn, ec, req, resp, y);
+        ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() Using connection: " << conn.get() << dendl;
+    } 
+    else[[likely]]
+        redis_exec_cp(redis_pool, ec, req, resp, y);
+    if (!std::get<0>(resp).value()) {
+      ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): No values deleted for key=" << key << dendl;
+      return -ENOENT;
     }
     if (ec) {
       ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -1443,7 +1500,7 @@ int BlockDirectory::remove_host(const DoutPrefixProvider* dpp, CacheBlock* block
   return 0;
 }
 
-int BlockDirectory::zadd(const DoutPrefixProvider* dpp, CacheBlock* block, double score, const std::string& member, optional_yield y, bool multi)
+int BlockDirectory::zadd(const DoutPrefixProvider* dpp, CacheBlock* block, double score, const std::string& member, optional_yield y)
 {
   std::string key = build_index(block);
   try {
@@ -1458,11 +1515,9 @@ int BlockDirectory::zadd(const DoutPrefixProvider* dpp, CacheBlock* block, doubl
       ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
       return -ec.value();
     }
-    if (!multi) {
-      if (std::get<0>(resp).value() != "1") {
-        ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl;
-        return -EINVAL;
-      }
+    if (std::get<0>(resp).value() != "1") {
+      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl;
+      return -EINVAL;
     }
 
   } catch (std::exception &e) {
@@ -1565,147 +1620,22 @@ int BlockDirectory::zrem(const DoutPrefixProvider* dpp, CacheBlock* block, const
   return 0;
 }
 
-int BlockDirectory::watch(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y)
-{
-  std::string key = build_index(block);
-  try {
-    boost::system::error_code ec;
-    request req;
-    req.push("WATCH", key);
-    response<std::string> resp;
-
-    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
-
-    if (ec) {
-      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
-      return -ec.value();
-    }
-
-    if (std::get<0>(resp).value() != "OK") {
-      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
-      return -EINVAL;
-    }
-
-  } catch (std::exception &e) {
-    ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
-    return -EINVAL;
-  }
-
-  return 0;
-}
-
-int BlockDirectory::exec(const DoutPrefixProvider* dpp, std::vector<std::string>& responses, optional_yield y)
-{
-  try {
-    boost::system::error_code ec;
-    request req;
-    req.push("EXEC");
-    boost::redis::generic_response resp;
-
-    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
-
-    if (ec) {
-      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
-      std::cout << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << std::endl;
-      return -ec.value();
-    }
-
-    for (uint64_t i = 0; i < resp.value().size(); i++) {
-      ldpp_dout(dpp, 20) << "BlockDirectory::" << __func__ << "() MULTI: " << resp.value().front().value << dendl;
-      responses.emplace_back(resp.value().front().value);
-      boost::redis::consume_one(resp);
-    }
-
-  } catch (std::exception &e) {
-    ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
-    std::cout << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << std::endl;
-    return -EINVAL;
-  }
-
-  return 0;
-}
-
-int BlockDirectory::multi(const DoutPrefixProvider* dpp, optional_yield y)
-{
-  try {
-    boost::system::error_code ec;
-    request req;
-    req.push("MULTI");
-    response<std::string> resp;
-
-    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
-
-    if (ec) {
-      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
-      return -ec.value();
-    }
-
-    if (std::get<0>(resp).value() != "OK") {
-      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
-      return -EINVAL;
-    }
-
-  } catch (std::exception &e) {
-    ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
-    return -EINVAL;
-  }
-
-  return 0;
-}
-
-int BlockDirectory::discard(const DoutPrefixProvider* dpp, optional_yield y)
-{
-  try {
-    boost::system::error_code ec;
-    request req;
-    req.push("DISCARD");
-    response<std::string> resp;
-
-    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
-
-    if (ec) {
-      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
-      return -ec.value();
-    }
-
-    if (std::get<0>(resp).value() != "OK") {
-      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
-      return -EINVAL;
-    }
-
-  } catch (std::exception &e) {
-    ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
-    return -EINVAL;
-  }
-
-  return 0;
-}
-
-int BlockDirectory::unwatch(const DoutPrefixProvider* dpp, optional_yield y)
+int Pipeline::execute(const DoutPrefixProvider* dpp, optional_yield y)
 {
+  boost::redis::generic_response resp;
   try {
     boost::system::error_code ec;
-    request req;
-    req.push("UNWATCH");
-    response<std::string> resp;
-
-    redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
+    pipeline_mode = false;
+    redis_exec(conn, ec, req, resp, y);
 
     if (ec) {
-      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+      ldpp_dout(dpp, 0) << "Directory::" << __func__ << "() ERROR: " << ec.what() << dendl;
       return -ec.value();
     }
-
-    if (std::get<0>(resp).value() != "OK") {
-      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
-      return -EINVAL;
-    }
-
   } catch (std::exception &e) {
-    ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+    ldpp_dout(dpp, 0) << "Directory::" << __func__ << "() ERROR: " << e.what() << dendl;
     return -EINVAL;
   }
-
   return 0;
 }
 
index 7f281b338b3b2859d95d038d103df3cc79110b05..df226f4c3fd8f2ab62246d1795b5f9494017fc6e 100644 (file)
@@ -8,9 +8,15 @@
 #include <condition_variable>
 #include <deque>
 #include <memory>
+#include <concepts>
 
 namespace rgw { namespace d4n {
 
+template<typename T>
+  concept SeqContainer = requires(T& t, typename T::value_type v) {
+      t.push_back(v);
+  };
+
 using boost::redis::connection;
 class RedisPool {
 public:
@@ -147,11 +153,26 @@ class Directory {
     Directory() {}
 };
 
+class Pipeline {
+  public:
+    Pipeline(std::shared_ptr<connection>& conn) : conn(conn) {}
+    void start() { pipeline_mode = true; }
+    //executes all commands and sets pipeline mode to false
+    int execute(const DoutPrefixProvider* dpp, optional_yield y);
+    bool is_pipeline() { return pipeline_mode; }
+    request& get_request() { return req; }
+
+  private:
+    std::shared_ptr<connection> conn;
+    request req;
+    bool pipeline_mode{false};
+};
+
 class BucketDirectory: public Directory {
   public:
     BucketDirectory(std::shared_ptr<connection>& conn) : conn(conn) {}
-    int zadd(const DoutPrefixProvider* dpp, const std::string& bucket_id, double score, const std::string& member, optional_yield y, bool multi=false);
-    int zrem(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& member, optional_yield y, bool multi=false);
+    int zadd(const DoutPrefixProvider* dpp, const std::string& bucket_id, double score, const std::string& member, optional_yield y, Pipeline* pipeline=nullptr);
+    int zrem(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& member, optional_yield y);
     int zrange(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& start, const std::string& stop, uint64_t offset, uint64_t count, std::vector<std::string>& members, optional_yield y);
     int zscan(const DoutPrefixProvider* dpp, const std::string& bucket_id, uint64_t cursor, const std::string& pattern, uint64_t count, std::vector<std::string>& members, uint64_t next_cursor, optional_yield y);
     int zrank(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& member, uint64_t& rank, optional_yield y);
@@ -171,11 +192,11 @@ class ObjectDirectory: public Directory {
     int copy(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& copyName, const std::string& copyBucketName, optional_yield y);
     int del(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y);
     int update_field(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& field, std::string& value, optional_yield y);
-    int zadd(const DoutPrefixProvider* dpp, CacheObj* object, double score, const std::string& member, optional_yield y, bool multi=false);
+    int zadd(const DoutPrefixProvider* dpp, CacheObj* object, double score, const std::string& member, optional_yield y, Pipeline* pipeline=nullptr);
     int zrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector<std::string>& members, optional_yield y);
     int zrevrange(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& start, const std::string& stop, std::vector<std::string>& members, optional_yield y);
-    int zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y, bool multi=false);
-    int zremrangebyscore(const DoutPrefixProvider* dpp, CacheObj* object, double min, double max, optional_yield y, bool multi=false);
+    int zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y);
+    int zremrangebyscore(const DoutPrefixProvider* dpp, CacheObj* object, double min, double max, optional_yield y);
     int zrank(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, std::string& index, optional_yield y);
     //Return value is the incremented value, else return error
     int incr(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y);
@@ -192,7 +213,9 @@ class BlockDirectory: public Directory {
     
     int exist_key(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
 
-    int set(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
+    //Pipelined version of set
+    int set(const DoutPrefixProvider* dpp, std::vector<CacheBlock>& blocks, optional_yield y);
+    int set(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y, Pipeline* pipeline=nullptr);
     int get(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
     //Pipelined version of get using boost::redis::response for list bucket
     template <size_t N = 100>
@@ -200,23 +223,20 @@ class BlockDirectory: public Directory {
     //Pipelined version of get using boost::redis::generic_response
     int get(const DoutPrefixProvider* dpp, std::vector<CacheBlock>& blocks, optional_yield y);
     int copy(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& copyName, const std::string& copyBucketName, optional_yield y);
-    int del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y, bool multi=false);
+    int del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
     int update_field(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& field, std::string& value, optional_yield y);
     int remove_host(const DoutPrefixProvider* dpp, CacheBlock* block, std::string& value, optional_yield y);
-    int zadd(const DoutPrefixProvider* dpp, CacheBlock* block, double score, const std::string& member, optional_yield y, bool multi=false);
+    int zadd(const DoutPrefixProvider* dpp, CacheBlock* block, double score, const std::string& member, optional_yield y);
     int zrange(const DoutPrefixProvider* dpp, CacheBlock* block, int start, int stop, std::vector<std::string>& members, optional_yield y);
     int zrevrange(const DoutPrefixProvider* dpp, CacheBlock* block, int start, int stop, std::vector<std::string>& members, optional_yield y);
     int zrem(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& member, optional_yield y);
-    int watch(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
-    //Move MULTI, EXEC and DISCARD to directory? As they do not operate on a key
-    int exec(const DoutPrefixProvider* dpp, std::vector<std::string>& responses, optional_yield y);
-    int multi(const DoutPrefixProvider* dpp, optional_yield y);
-    int discard(const DoutPrefixProvider* dpp, optional_yield y);
-    int unwatch(const DoutPrefixProvider* dpp, optional_yield y);
 
   private:
     std::shared_ptr<connection> conn;
     std::string build_index(CacheBlock* block);
+
+    template<SeqContainer Container>
+    int set_values(const DoutPrefixProvider* dpp, CacheBlock& block, Container& redisValues, optional_yield y);
 };
 
 } } // namespace rgw::d4n
index 3fc73f27579a73e6db4eb4132c22ef3a47e19959..d2d9834c3f0786038b6aa35f573e3fbc4e523b36 100644 (file)
@@ -950,9 +950,8 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
                }
              }
              //delete entry from ordered set of objects, as older versions would have been written to the backend store
-             ret = bucketDir->zrem(dpp, e->bucket_id, c_obj->get_name(), y, true);
+             ret = bucketDir->zrem(dpp, e->bucket_id, c_obj->get_name(), y);
              if (ret < 0) {
-               blockDir->discard(dpp, y);
                ldpp_dout(dpp, 0) << __func__ << "(): Failed to queue zrem for object entry: " << c_obj->get_name() << ", ret=" << ret << dendl;
                continue;
              }
@@ -962,7 +961,7 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
              .objName = c_obj->get_name(),
              .bucketName = c_obj->get_bucket()->get_bucket_id(),
            };
-           ret = objDir->zremrangebyscore(dpp, &dir_obj, e->creationTime, e->creationTime, y, true);
+           ret = objDir->zremrangebyscore(dpp, &dir_obj, e->creationTime, e->creationTime, y);
            if (ret < 0) {
              ldpp_dout(dpp, 0) << __func__ << "(): Failed to remove object from ordered set with error: " << ret << dendl;
              continue;
index dafd2d9579ff446d93bec39c642e60ce641e68aa..61d55a4c10986440233b3cbb1ae072b97d786a35 100644 (file)
@@ -1200,7 +1200,10 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, std::
 
     //dirty objects
     if (dirty) {
-      auto ret = blockDir->set(dpp, &block, y);
+      auto redis_conn = this->driver->get_conn();
+      rgw::d4n::Pipeline p = rgw::d4n::Pipeline(redis_conn);
+      p.start();
+      auto ret = blockDir->set(dpp, &block, y, &p);
       if (ret < 0) {
         ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
         return ret;
@@ -1210,7 +1213,7 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, std::
          delete-obj with "null" as version-id deletes the latest version */
       if (!(this->get_bucket()->versioned())) {
         block.cacheObj.objName = "_:null_" + this->get_name();
-        ret = blockDir->set(dpp, &block, y);
+        ret = blockDir->set(dpp, &block, y, &p);
         if (ret < 0) {
           ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for null head object with ret: " << ret << dendl;
           return ret;
@@ -1227,37 +1230,41 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, std::
       auto score = ceph::real_clock::to_double(mtime);
       ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Score of object name: "<< this->get_name() << " version: " << object_version << " is: "  << score << ret << dendl;
       rgw::d4n::ObjectDirectory* objDir = this->driver->get_obj_dir();
-      ret = objDir->zadd(dpp, &object, score, object_version, y);
+      ret = objDir->zadd(dpp, &object, score, object_version, y, &p);
       if (ret < 0) {
         ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Failed to add version to ordered set with error: " << ret << dendl;
-        blockDir->discard(dpp, y);
         return ret;
       }
       //add an entry to ordered set containing objects for bucket listing, set score to 0 always to lexicographically order the objects
       rgw::d4n::BucketDirectory* bucketDir = this->driver->get_bucket_dir();
-      ret = bucketDir->zadd(dpp, this->get_bucket()->get_bucket_id(), 0, this->get_name(), y, true);
+      ret = bucketDir->zadd(dpp, this->get_bucket()->get_bucket_id(), 0, this->get_name(), y, &p);
       if (ret < 0) {
         ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Failed to add object to ordered set with error: " << ret << dendl;
         return ret;
       }
+      p.execute(dpp, y);
     } else { //for clean/non-dirty objects
       rgw::d4n::CacheBlock latest = block;
       auto ret = blockDir->get(dpp, &latest, y);
       if (ret == -ENOENT) {
         if (!(this->get_bucket()->versioned())) {
+          auto redis_conn = this->driver->get_conn();
+          rgw::d4n::Pipeline p = rgw::d4n::Pipeline(redis_conn);
+          p.start();
           //we can explore pipelining to send the two 'HSET' commands together
-          ret = blockDir->set(dpp, &block, y);
+          ret = blockDir->set(dpp, &block, y, &p);
           if (ret < 0) {
               ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
             return ret;
           }
           //bucket is non versioned, set a null instance
           block.cacheObj.objName = "_:null_" + this->get_name();
-          ret = blockDir->set(dpp, &block, y);
+          ret = blockDir->set(dpp, &block, y, &p);
           if (ret < 0) {
             ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for null head object with ret: " << ret << dendl;
             return ret;
           }
+          p.execute(dpp, y);
         }
       } else if (ret < 0) {
         ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory get method failed for head object with ret: " << ret << dendl;
@@ -1274,18 +1281,22 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, std::
         /* even if the head block is found, overwrite existing values with new version in case of non-versioned bucket, clean objects
            and versioned and non-versioned buckets dirty objects */
         if (!(this->get_bucket()->versioned())) {
-          ret = blockDir->set(dpp, &block, y);
+          auto redis_conn = this->driver->get_conn();
+          rgw::d4n::Pipeline p = rgw::d4n::Pipeline(redis_conn);
+          p.start();
+          ret = blockDir->set(dpp, &block, y, &p);
           if (ret < 0) {
             ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
             return ret;
           }
           //bucket is non versioned, set a null instance
           block.cacheObj.objName = "_:null_" + this->get_name();
-          ret = blockDir->set(dpp, &block, y);
+          ret = blockDir->set(dpp, &block, y, &p);
           if (ret < 0) {
             ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for null head object with ret: " << ret << dendl;
             return ret;
           }
+          p.execute(dpp, y);
         }//end-if !(this->get_bucket()->versioned())
       } //end-if ret = 0
     } //end-else
@@ -1339,8 +1350,9 @@ int D4NFilterObject::set_data_block_dir_entries(const DoutPrefixProvider* dpp, o
   off_t lst = this->get_size();
   ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Object size =" << lst << dendl;
   off_t fst = 0;
+  std::vector<rgw::d4n::CacheBlock> blocks;
   do {
-    rgw::d4n::CacheBlock block, existing_block;
+    rgw::d4n::CacheBlock block;
     if (fst >= lst){
       break;
     }
@@ -1348,40 +1360,29 @@ int D4NFilterObject::set_data_block_dir_entries(const DoutPrefixProvider* dpp, o
     off_t cur_len = cur_size - fst;
     block.cacheObj.bucketName = this->get_bucket()->get_bucket_id();
     block.cacheObj.objName = this->get_key().get_oid();
-    block.cacheObj.dirty = dirty;
-    block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
-    existing_block.cacheObj.objName = block.cacheObj.objName;
-    existing_block.cacheObj.bucketName = block.cacheObj.bucketName;
-
     block.size = cur_len;
     block.blockID = fst;
-    block.version = version;
-
-    /* Store block in directory */
-    existing_block.blockID = block.blockID;
-    existing_block.size = block.size;
-
-    int ret;
-    if ((ret = blockDir->get(dpp, &existing_block, y)) == 0 || ret == -ENOENT) {
-      if (ret == 0) { //new versioned block will have new version, hostsList etc, how about globalWeight?
-        block = existing_block;
-        block.version = version;
-        block.cacheObj.dirty = dirty;
-      }
 
-      block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
-
-      if ((ret = blockDir->set(dpp, &block, y)) < 0) {
-        ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory set() method failed, ret=" << ret << dendl;
-        return ret;
-      }
-    } else {
-      ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << ", ret=" << ret << dendl;
-      return ret;
-    }
     fst += cur_len;
+    blocks.emplace_back(block);
   } while(fst < lst);
 
+  auto ret = blockDir->get(dpp, blocks, y);
+  if (ret < 0) {
+    ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory pipelined get() method failed, ret=" << ret << dendl;
+    return ret;
+  }
+
+  for (auto& block : blocks) {
+    block.cacheObj.dirty = dirty;
+    block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
+    block.version = version;
+  }
+  if ((ret = blockDir->set(dpp, blocks, y)) < 0) {
+    ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory pipelined set() method failed, ret=" << ret << dendl;
+    return ret;
+  }
+
   return 0;
 }
 
@@ -2170,7 +2171,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
     std::string prefix = source->get_prefix();
     std::string dest_prefix;
 
-    rgw::d4n::CacheBlock block, existing_block, dest_block;
+    rgw::d4n::CacheBlock block, dest_block;
     rgw::d4n::BlockDirectory* blockDir = source->driver->get_block_dir();
     auto policy = filter->get_policy_driver()->get_cache_policy();
     auto cache_driver = filter->get_cache_driver();
@@ -2178,8 +2179,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
     block.cacheObj.bucketName = source->get_bucket()->get_bucket_id();
     std::stringstream s;
     block.cacheObj.creationTime = std::to_string(ceph::real_clock::to_double(source->get_mtime()));
-    bool dirty = block.cacheObj.dirty = false; //Reading from the backend, data is clean
-    block.version = version;
+    bool dirty = false; //Reading from the backend, data is clean
 
     if (source->dest_object && source->dest_bucket) {
       D4NFilterObject* d4n_dest_object = dynamic_cast<D4NFilterObject*>(source->dest_object);
@@ -2193,10 +2193,6 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
       dest_block.version = dest_version;
     }
 
-    //populating fields needed for building directory index
-    existing_block.cacheObj.objName = block.cacheObj.objName;
-    existing_block.cacheObj.bucketName = block.cacheObj.bucketName;
-
     ldpp_dout(dpp, 20) << __func__ << ": version stored in update method is: " << version << dendl;
 
     if (bl.length() > 0 && last_part) { // if bl = bl_rem has data and this is the last part, write it to cache
@@ -2211,24 +2207,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
           if (ret == 0) {
             std::string objEtag = "";
             policy->update(dpp, oid, adjusted_start_ofs, bl.length(), version, dirty, rgw::d4n::RefCount::NOOP, *y);
-
-            /* Store block in directory */
-            existing_block.blockID = block.blockID;
-            existing_block.size = block.size;
-
-            if ((ret = blockDir->get(dpp, &existing_block, *y)) == 0 || ret == -ENOENT) {
-              if (ret == 0) { //new versioned block will have new version, hostsList etc, how about globalWeight?
-                block = existing_block;
-                block.version = version;
-              }
-
-              block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
-
-              if ((ret = blockDir->set(dpp, &block, *y)) < 0)
-                ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set() method failed, ret=" << ret << dendl;
-            } else { //end -if blockDir->get
-              ldpp_dout(dpp, 20) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << ", ret=" << ret << dendl;
-            }
+            blocks.emplace_back(block);
           } else {
             ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed, ret=" << ret << dendl;
           }
@@ -2245,9 +2224,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
           ret = cache_driver->put(dpp, dest_oid, bl, bl.length(), attrs, *y);
           if (ret == 0) {
             policy->update(dpp, dest_oid, adjusted_start_ofs, bl.length(), dest_version, dirty, rgw::d4n::RefCount::NOOP, *y);
-            if (ret = blockDir->set(dpp, &dest_block, *y); ret < 0) {
-              ldpp_dout(dpp, 20) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB:: " << __func__ << " BlockDirectory set failed with ret: " << ret << dendl;
-            }
+            dest_blocks.emplace_back(dest_block);
           }
         }
       }
@@ -2261,24 +2238,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
           ret = cache_driver->put(dpp, oid, bl, bl.length(), attrs, *y);
           if (ret == 0) {
             policy->update(dpp, oid, adjusted_start_ofs, bl.length(), version, dirty, rgw::d4n::RefCount::NOOP, *y);
-
-            /* Store block in directory */
-            existing_block.blockID = block.blockID;
-            existing_block.size = block.size;
-
-            if ((ret = blockDir->get(dpp, &existing_block, *y)) == 0 || ret == -ENOENT) {
-              if (ret == 0) { //new versioned block will have new version, hostsList etc, how about globalWeight?
-                block = existing_block;
-                block.version = version;
-              }
-
-            block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
-
-            if ((ret = blockDir->set(dpp, &block, *y)) < 0)
-              ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set() method failed, ret=" << ret << dendl;
-            } else {
-              ldpp_dout(dpp, 20) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << ", ret=" << ret << dendl;
-            }
+            blocks.emplace_back(block);
           } else {
             ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed, ret=" << ret << dendl;
           }
@@ -2295,9 +2255,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
           ret = cache_driver->put(dpp, dest_oid, bl, bl.length(), attrs, *y);
           if (ret == 0) {
             policy->update(dpp, dest_oid, adjusted_start_ofs, bl.length(), dest_version, dirty, rgw::d4n::RefCount::NOOP, *y);
-            if (ret = blockDir->set(dpp, &dest_block, *y); ret < 0) {
-              ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " BlockDirectory set failed with ret: " << ret << dendl;
-            }
+            dest_blocks.emplace_back(dest_block);
           }
         }
       }
@@ -2321,24 +2279,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
             ret = cache_driver->put(dpp, oid, bl_rem, bl_rem.length(), attrs, *y);
             if (ret == 0) {
               policy->update(dpp, oid, adjusted_start_ofs, bl_rem.length(), version, dirty, rgw::d4n::RefCount::NOOP, *y);
-
-              /* Store block in directory */
-              existing_block.blockID = block.blockID;
-              existing_block.size = block.size;
-
-              if ((ret = blockDir->get(dpp, &existing_block, *y)) == 0 || ret == -ENOENT) {
-                if (ret == 0) { //new versioned block will have new version, hostsList etc, how about globalWeight?
-                  block = existing_block;
-                  block.version = version;
-                }
-
-                block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
-
-                if ((ret = blockDir->set(dpp, &block, *y)) < 0)
-                  ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set() method failed, ret=" << ret << dendl;
-              } else {
-                ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << ", ret=" << ret << dendl;
-              }
+              blocks.emplace_back(block);
             } else {
               ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed, ret=" << ret << dendl;
             }
@@ -2358,9 +2299,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
             ret = cache_driver->put(dpp, dest_oid, bl_rem, bl_rem.length(), attrs, *y);
             if (ret == 0) {
               policy->update(dpp, dest_oid, adjusted_start_ofs, bl_rem.length(), dest_version, dirty, rgw::d4n::RefCount::NOOP, *y);
-              if (ret = blockDir->set(dpp, &dest_block, *y); ret < 0) {
-                ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " BlockDirectory set failed with ret: " << ret << dendl;
-              }
+              dest_blocks.emplace_back(dest_block);
             }
           }
         }
@@ -2369,7 +2308,27 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
         bl_rem = std::move(bl);
       }//bl_rem.length()
     }
-  }
+    if (last_part) {
+      auto ret = blockDir->get(dpp, blocks, *y);
+      if (ret < 0) {
+        ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory pipelined get() method failed, ret=" << ret << dendl;
+      }
+
+      for (auto& block : blocks) {
+        block.cacheObj.dirty = false;
+        block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
+        block.version = version;
+      }
+      if ((ret = blockDir->set(dpp, blocks, *y)) < 0) {
+        ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory pipelined set() method failed, ret=" << ret << dendl;
+      }
+      if (source->dest_object && source->dest_bucket) {
+        if ((ret = blockDir->set(dpp, dest_blocks, *y)) < 0) {
+          ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory pipelined set() method for dest blocks failed, ret=" << ret << dendl;
+        }
+      }
+    }// if last_part
+  }//if write_to_cache
 
   /* Clean-up:
   1. do we need to clean up keys belonging to older versions (the last blocks), in case the size of newer version is different
@@ -2520,7 +2479,7 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp
                       return ret;
                     }
                     //delete entry from ordered set of objects
-                    ret = bucketDir->zrem(dpp, source->get_bucket()->get_bucket_id(), source->get_name(), y, true);
+                    ret = bucketDir->zrem(dpp, source->get_bucket()->get_bucket_id(), source->get_name(), y);
                     if (ret < 0) {
                       ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue zrem request in bucket directory for: " << source->get_name() << ", ret=" << ret << dendl;
                       return ret;
@@ -2535,7 +2494,7 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp
                 //delete entry from ordered set of versions
                 std::string version = source->get_instance();
                 ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): Version to be deleted is: " << version << dendl;
-                ret = objDir->zrem(dpp, &dir_obj, version, y, true);
+                ret = objDir->zrem(dpp, &dir_obj, version, y);
                 if (ret < 0) {
                   ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue zrem request in object directory for: " << source->get_name() << ", ret=" << ret << dendl;
                   return ret;
@@ -2586,16 +2545,15 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp
           .bucketName = source->get_bucket()->get_bucket_id(),
         };
         //delete entry from ordered set of object versions
-        ret = objDir->zrem(dpp, &dir_obj, "null", y, true);
+        ret = objDir->zrem(dpp, &dir_obj, "null", y);
         if (ret < 0) {
           ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue zrem request in object directory for: " << source->get_name() << ", ret=" << ret << dendl;
           return ret;
         }
         //delete entry from ordered set of objects
-        ret = bucketDir->zrem(dpp, source->get_bucket()->get_bucket_id(), source->get_name(), y, true);
+        ret = bucketDir->zrem(dpp, source->get_bucket()->get_bucket_id(), source->get_name(), y);
         if (ret < 0) {
           ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue zrem request in bucket directory for: " << source->get_name() << ", ret=" << ret << dendl;
-          blockDir->discard(dpp, y);
           return ret;
         }
       }
index 11325cb6858aca421dc8a38f509cb94b35e0b8a9..1c823688dd8d1543cfe2085511f07940d508eede 100644 (file)
@@ -89,6 +89,7 @@ class D4NFilterDriver : public FilterDriver {
     rgw::d4n::BucketDirectory* get_bucket_dir() { return bucketDir.get(); }
     rgw::d4n::PolicyDriver* get_policy_driver() { return policyDriver.get(); }
     void save_y(optional_yield y) { this->y = y; }
+    std::shared_ptr<connection> get_conn() { return conn; }
     void shutdown() override;
 };
 
@@ -161,6 +162,7 @@ class D4NFilterObject : public FilterObject {
            optional_yield* y;
       int part_num{0}, num_parts{0};
       int len_sent = 0;
+      std::vector<rgw::d4n::CacheBlock> blocks, dest_blocks;
 
          public:
            D4NFilterGetCB(D4NFilterDriver* _filter, D4NFilterObject* _source) : filter(_filter),
index 2778abe9ab11564de63f0b8b4177d131e911ee24..2f7c436628e657e553b24ace76ef99e573c57076 100644 (file)
@@ -678,97 +678,6 @@ TEST_F(BlockDirectoryFixture, IncrYield)
   }
 }
 
-TEST_F(BlockDirectoryFixture, MultiExecuteYield)
-{
-  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
-    {
-      boost::system::error_code ec;
-      {
-        request req;
-        response<std::string> resp;
-        req.push("MULTI");                       // Start transaction
-        conn->async_exec(req, resp, yield[ec]);
-        ASSERT_EQ((bool)ec, false);
-        std::cout << "MULTI value: " << std::get<0>(resp).value() << std::endl;
-      }
-      {
-        request req;
-        response<std::string> resp;
-        req.push("SET", "key1", "value1");       // Command 1
-        conn->async_exec(req, resp, yield[ec]);
-        ASSERT_EQ((bool)ec, false);
-        std::cout << "SET value: " << std::get<0>(resp).value() << std::endl;
-      }
-      {
-        request req;
-        response<std::string> resp;
-        req.push("SET", "key2", "value2");       // Command 2
-        conn->async_exec(req, resp, yield[ec]);
-        ASSERT_EQ((bool)ec, false);
-        std::cout << "SET value: " << std::get<0>(resp).value() << std::endl;
-      }
-      {
-        request req;
-        response<std::string> resp;
-        req.push("ZADD", "key4", "1", "v1");                  // Command 3
-        conn->async_exec(req, resp, yield[ec]);
-        ASSERT_EQ((bool)ec, false);
-        std::cout << "ZADD value: " << std::get<0>(resp).value() << std::endl;
-      }
-      {
-        request req;
-        /* string as response here as the command is only getting queued, not executed
-           if response type is changed to int then the operation fails */
-        response<std::string> resp;
-        req.push("DEL", "key3");                  // Command 4
-        conn->async_exec(req, resp, yield[ec]);
-        ASSERT_EQ((bool)ec, false);
-        std::cout << "DEL value: " << std::get<0>(resp).value() << std::endl;
-      }
-      {
-        request req;
-        req.push("EXEC");                        // Execute transaction
-
-        boost::redis::generic_response resp;
-        conn->async_exec(req, resp, yield[ec]);
-        ASSERT_EQ((bool)ec, false);
-        for (uint64_t i = 0; i < resp.value().size(); i++) {
-          std::cout << "EXEC: " << resp.value().front().value << std::endl;
-          boost::redis::consume_one(resp);
-        }
-      }
-    }
-    //test multi/exec using directory methods
-    {
-      ASSERT_EQ(0, dir->multi(env->dpp, optional_yield{yield}));
-      ASSERT_EQ(0, dir->set(env->dpp, block, yield));
-      block->cacheObj.objName = "testBlockNew";
-      ASSERT_EQ(0, dir->set(env->dpp, block, yield));
-      block->cacheObj.objName = "testBlockA";
-      ASSERT_EQ(0, dir->del(env->dpp, block, yield, true));
-      block->cacheObj.objName = "testBlockB";
-      ASSERT_EQ(0, dir->zadd(env->dpp, block, 100, "version1", yield, true));
-      std::vector<std::string> responses;
-      ASSERT_EQ(0, dir->exec(env->dpp, responses, optional_yield{yield}));
-      for (auto r : responses) {
-        std::cout << "EXEC: " << r << std::endl;
-      }
-    }
-    {
-      boost::system::error_code ec;
-      request req;
-      req.push("FLUSHALL");
-      response<boost::redis::ignore_t> resp;
-
-      conn->async_exec(req, resp, yield[ec]);
-    }
-
-    conn->cancel();
-  }, rethrow);
-
-  io.run();
-}
-
 TEST_F(BlockDirectoryFixture, ZScan)
 {
   boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {