]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw/d4n: use boost::generic::response for bucket list operation.
authorPritha Srivastava <prsrivas@redhat.com>
Tue, 29 Apr 2025 11:12:18 +0000 (16:42 +0530)
committerPritha Srivastava <prsrivas@redhat.com>
Tue, 26 Aug 2025 04:18:05 +0000 (09:48 +0530)
1. test case to explore pipelining using boost::redis::generic_response

2. introduced a get method based on redis::generic_response
and used the same for Bucket::list.

Signed-off-by: Pritha Srivastava <prsrivas@redhat.com>
src/rgw/driver/d4n/d4n_directory.cc
src/rgw/driver/d4n/d4n_directory.h
src/rgw/driver/d4n/rgw_sal_d4n.cc
src/test/rgw/test_d4n_directory.cc

index 52f01a0edcc84bd7cc6080faee71af98b9dbdcca..e5979bba8e5c3c5eb3cb81decc9a17ac578031f8 100644 (file)
@@ -972,6 +972,10 @@ void parse_response(T t, std::vector<std::vector<std::string>>& responses)
     });
 }
 
+//explicit instantiation for 100 elements
+template int BlockDirectory::get<100>(const DoutPrefixProvider* dpp, std::vector<CacheBlock>& blocks, optional_yield y);
+
+template <size_t N>
 int BlockDirectory::get(const DoutPrefixProvider* dpp, std::vector<CacheBlock>& blocks, optional_yield y)
 {
   request req;
@@ -1111,6 +1115,130 @@ int BlockDirectory::get(const DoutPrefixProvider* dpp, CacheBlock* block, option
   return 0;
 }
 
+int BlockDirectory::get(const DoutPrefixProvider* dpp, std::vector<CacheBlock>& blocks, optional_yield y)
+{
+  boost::redis::generic_response resp;
+  request req;
+  for (auto block : blocks) {
+    std::string key = build_index(&block);
+    std::vector<std::string> fields;
+    ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): index is: " << key << dendl;
+
+    fields.push_back("blockID");
+    fields.push_back("version");
+    fields.push_back("deleteMarker");
+    fields.push_back("size");
+    fields.push_back("globalWeight");
+
+    fields.push_back("objName");
+    fields.push_back("bucketName");
+    fields.push_back("creationTime");
+    fields.push_back("dirty");
+    fields.push_back("hosts");
+    fields.push_back("etag");
+    fields.push_back("objSize");
+    fields.push_back("userId");
+    fields.push_back("displayName");
+
+    try {
+      req.push("HGETALL", key);
+    } catch (std::exception &e) {
+      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+      return -EINVAL;
+    }
+  } //end - for
+
+  try {
+    boost::system::error_code ec;
+    redis_exec(conn, ec, req, resp, y);
+
+    if (ec) {
+      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+      return -ec.value();
+    }
+  } catch (std::exception &e) {
+    ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+    return -EINVAL;
+  }
+  //i is used to index blocks
+  //j is used to keep a track of number of elements for aggregate type map or array
+  auto i = 0, j = 0;
+  bool field_key=true, field_val=false;
+  std::string key, fieldkey, fieldval, prev_val;
+  int num_elements = 0;
+  for (auto& element : resp.value()) {
+    ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): i is: " << i << dendl;
+    CacheBlock* block = &blocks[i];
+    std::string key = build_index(block);
+    ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): index is: " << key << dendl;
+    if (element.data_type == boost::redis::resp3::type::array || element.data_type == boost::redis::resp3::type::map) {
+      num_elements = element.aggregate_size;
+      if (num_elements == 0) {
+        i++;
+        j = 0;
+      }
+      ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "() num_elements: " << num_elements << dendl;
+      continue;
+    } else {
+      if (j < num_elements) {
+        if (field_key && !field_val) {
+          if (element.value == "blockID" || element.value == "version" || element.value == "deleteMarker" ||
+              element.value == "size" || element.value == "globalWeight" || element.value == "objName" ||
+              element.value == "bucketName" || element.value == "creationTime" || element.value == "dirty" ||
+              element.value == "hosts" || element.value == "etag" || element.value == "objSize" ||
+              element.value == "userId" || element.value == "displayName") {
+            prev_val = element.value;
+            ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "() field key: " << prev_val << dendl;
+            field_key = false;
+            field_val = true;
+          }
+          continue;
+        } else {
+          ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "() field val: " << element.value << dendl;
+          if (prev_val == "blockID") {
+            block->blockID = std::stoull(element.value);
+          } else if (prev_val == "version") {
+            block->version = element.value;
+          } else if (prev_val == "deleteMarker") {
+            block->deleteMarker = (std::stoi(element.value) != 0);
+          } else if (prev_val == "size") {
+            block->size = std::stoull(element.value);
+          } else if (prev_val == "globalWeight") {
+            block->globalWeight = std::stoull(element.value);
+          } else if (prev_val == "objName") {
+            block->cacheObj.objName = element.value;
+          } else if (prev_val == "bucketName") {
+            block->cacheObj.bucketName = element.value;
+          } else if (prev_val == "creationTime") {
+            block->cacheObj.creationTime = element.value;
+          } else if (prev_val == "dirty") {
+            block->cacheObj.dirty = (std::stoi(element.value) != 0);
+          } else if (prev_val == "hosts") {
+            boost::split(block->cacheObj.hostsList, element.value, boost::is_any_of("_"));
+          } else if (prev_val == "etag") {
+            block->cacheObj.etag = element.value;
+          } else if (prev_val == "objSize") {
+            block->cacheObj.size = std::stoull(element.value);
+          } else if (prev_val == "userId") {
+            block->cacheObj.user_id = element.value;
+          } else if (prev_val == "displayName") {
+            block->cacheObj.display_name = element.value;
+          }
+          j++;
+          field_key= true;
+          field_val = false;
+          prev_val.clear();
+        }
+      }
+      if (j == num_elements) {
+        i++;
+        j = 0;
+      }
+    }
+  }
+  return 0;
+}
+
 /* Note: This method is not compatible for use on Ubuntu systems. */
 int BlockDirectory::copy(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& copyName, const std::string& copyBucketName, optional_yield y)
 {
index 03ab2848fe063d7961683f68ba2f8b484b19d414..7f281b338b3b2859d95d038d103df3cc79110b05 100644 (file)
@@ -194,7 +194,10 @@ class BlockDirectory: public Directory {
 
     int set(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
     int get(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
-    //Pipelined version of get for list bucket
+    //Pipelined version of get using boost::redis::response for list bucket
+    template <size_t N = 100>
+    int get(const DoutPrefixProvider* dpp, std::vector<CacheBlock>& blocks, optional_yield y);
+    //Pipelined version of get using boost::redis::generic_response
     int get(const DoutPrefixProvider* dpp, std::vector<CacheBlock>& blocks, optional_yield y);
     int copy(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& copyName, const std::string& copyBucketName, optional_yield y);
     int del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y, bool multi=false);
index 5c0537dbe87337e86ed78eb69d05f65e46074c1e..dafd2d9579ff446d93bec39c642e60ce641e68aa 100644 (file)
@@ -451,13 +451,13 @@ int D4NFilterBucket::list(const DoutPrefixProvider* dpp, ListParams& params, int
     } //end - else
 
     rgw::d4n::BlockDirectory* blockDir = this->filter->get_block_dir();
-    auto remainder_size = entries.size();
+    int remainder_size = entries.size();
     size_t j = 0, start_j = 0;
     while (remainder_size > 0) {
-      std::vector<rgw::d4n::CacheBlock> blocks(100);
+      auto batch_size = std::min(max, remainder_size);
+      std::vector<rgw::d4n::CacheBlock> blocks(batch_size);
       start_j = j;
-      size_t batch_size = std::min(static_cast<size_t>(100), remainder_size);
-      for (size_t i = 0; i < batch_size; i++) {
+      for (auto i = 0; i < batch_size; i++) {
         ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " objects[j]: " << entries[j].key.name << dendl;
         ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " remainder_size: " << remainder_size << dendl;
         if (entries[j].key.instance == "null") {
@@ -507,10 +507,10 @@ int D4NFilterBucket::list(const DoutPrefixProvider* dpp, ListParams& params, int
         start_j++;
       }
 
-      if (remainder_size <= 100) {
+      if (remainder_size <= max) {
         remainder_size = 0;
       } else {
-        remainder_size = remainder_size - 100;
+        remainder_size = remainder_size - max;
       }
     }
   } //d4n_write_cache_enabled = true
index 0c13e1db07bdf44cfc7656a325890ea9c68c77a3..2778abe9ab11564de63f0b8b4177d131e911ee24 100644 (file)
@@ -882,6 +882,14 @@ void foo(T t, std::vector<std::vector<std::string>>& responses)
     });
 }
 
+template <> struct fmt::formatter<boost::redis::resp3::type> : fmt::ostream_formatter {};
+template <> struct fmt::formatter<boost::redis::resp3::node> : fmt::formatter<std::string> {
+    template <typename FormatContext> auto format(boost::redis::resp3::node const& node, FormatContext& ctx) const {
+        return format_to(ctx.out(), "({}@{}, {}, {})", node.data_type, node.depth, node.value,
+                         node.aggregate_size);
+    }
+};
+
 TEST_F(BlockDirectoryFixture, Pipeline)
 {
   boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
@@ -901,6 +909,7 @@ TEST_F(BlockDirectoryFixture, Pipeline)
       ASSERT_EQ((bool)ec, false);
     }
     {
+      //using boost::redis::response
       std::vector<std::string> fields;
       fields.push_back("name");
       request req;
@@ -920,6 +929,61 @@ TEST_F(BlockDirectoryFixture, Pipeline)
         }
       }
     }
+    {
+      //using boost::redis::generic_response
+      std::vector<std::string> fields;
+      fields.push_back("name");
+      request req;
+      req.push("HGETALL", "testkey1");
+      req.push("HGETALL", "testkey2");
+
+      ASSERT_EQ(req.get_commands(), 2);
+      boost::redis::generic_response resp;
+      conn->async_exec(req, resp, yield[ec]);
+      ASSERT_EQ((bool)ec, false);
+      //debug only
+      fmt::print("generic: {}\n", resp.value());
+
+      //1st node gives data type and number of elements of that type
+      //if data type is aggrgate, like array, map, then next n elements will be values of the aggregate type
+      std::unordered_map<std::string, std::unordered_map<std::string,std::string> > key_val_map;
+      auto i = 0, j = 0;
+      std::string key, fieldkey, fieldval;
+      int num_elements = 0;
+      for (auto& element : resp.value()) {
+        if (element.data_type == boost::redis::resp3::type::array || element.data_type == boost::redis::resp3::type::map) {
+          num_elements = element.aggregate_size;
+          if (j == 0) {
+            key = "testkey1";
+            j++;
+          } else {
+            key = "testkey2";
+          }
+          continue;
+        } else {
+          if (i < num_elements) {
+            fieldkey = element.value;
+            i++;
+          } else {
+            fieldval = element.value;
+            key_val_map.emplace(key, std::unordered_map<std::string,std::string>{{fieldkey,fieldval}});
+            key.clear();
+            fieldkey.clear();
+            fieldval.clear();
+            i = 0;
+          }
+        }
+      }
+      std::cout << "HGETALL response size is: " << key_val_map.size() << std::endl;
+      for (auto& it : key_val_map) {
+        std::cout << "key: " << it.first << std::endl;
+        std::unordered_map<std::string,std::string> field_key_val_map = it.second;
+        for (auto& inner_it : field_key_val_map) {
+          std::cout << "fieldkey: " << inner_it.first << std::endl;
+          std::cout << "fieldval: " << inner_it.second << std::endl;
+        }
+      }
+    }
     {
       boost::system::error_code ec;
       request req;