From f3d78c59883e07a315ca4b3ec0e1c81109e3b05a Mon Sep 17 00:00:00 2001 From: Pritha Srivastava Date: Tue, 29 Apr 2025 16:42:18 +0530 Subject: [PATCH] rgw/d4n: use boost::generic::response for bucket list operation. 1. test case to explore pipelining using boost::redis::generic_response 2. introduced a get method based on redis::generic_response and used the same for Bucket::list. Signed-off-by: Pritha Srivastava --- src/rgw/driver/d4n/d4n_directory.cc | 128 ++++++++++++++++++++++++++++ src/rgw/driver/d4n/d4n_directory.h | 5 +- src/rgw/driver/d4n/rgw_sal_d4n.cc | 12 +-- src/test/rgw/test_d4n_directory.cc | 64 ++++++++++++++ 4 files changed, 202 insertions(+), 7 deletions(-) diff --git a/src/rgw/driver/d4n/d4n_directory.cc b/src/rgw/driver/d4n/d4n_directory.cc index 52f01a0edcc84..e5979bba8e5c3 100644 --- a/src/rgw/driver/d4n/d4n_directory.cc +++ b/src/rgw/driver/d4n/d4n_directory.cc @@ -972,6 +972,10 @@ void parse_response(T t, std::vector>& responses) }); } +//explicit instantiation for 100 elements +template int BlockDirectory::get<100>(const DoutPrefixProvider* dpp, std::vector& blocks, optional_yield y); + +template int BlockDirectory::get(const DoutPrefixProvider* dpp, std::vector& blocks, optional_yield y) { request req; @@ -1111,6 +1115,130 @@ int BlockDirectory::get(const DoutPrefixProvider* dpp, CacheBlock* block, option return 0; } +int BlockDirectory::get(const DoutPrefixProvider* dpp, std::vector& blocks, optional_yield y) +{ + boost::redis::generic_response resp; + request req; + for (auto block : blocks) { + std::string key = build_index(&block); + std::vector fields; + ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): index is: " << key << dendl; + + fields.push_back("blockID"); + fields.push_back("version"); + fields.push_back("deleteMarker"); + fields.push_back("size"); + fields.push_back("globalWeight"); + + fields.push_back("objName"); + fields.push_back("bucketName"); + fields.push_back("creationTime"); + fields.push_back("dirty"); + fields.push_back("hosts"); + fields.push_back("etag"); + fields.push_back("objSize"); + fields.push_back("userId"); + fields.push_back("displayName"); + + try { + req.push("HGETALL", key); + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + } //end - for + + try { + boost::system::error_code ec; + redis_exec(conn, ec, req, resp, y); + + if (ec) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + //i is used to index blocks + //j is used to keep a track of number of elements for aggregate type map or array + auto i = 0, j = 0; + bool field_key=true, field_val=false; + std::string key, fieldkey, fieldval, prev_val; + int num_elements = 0; + for (auto& element : resp.value()) { + ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): i is: " << i << dendl; + CacheBlock* block = &blocks[i]; + std::string key = build_index(block); + ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): index is: " << key << dendl; + if (element.data_type == boost::redis::resp3::type::array || element.data_type == boost::redis::resp3::type::map) { + num_elements = element.aggregate_size; + if (num_elements == 0) { + i++; + j = 0; + } + ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "() num_elements: " << num_elements << dendl; + continue; + } else { + if (j < num_elements) { + if (field_key && !field_val) { + if (element.value == "blockID" || element.value == "version" || element.value == "deleteMarker" || + element.value == "size" || element.value == "globalWeight" || element.value == "objName" || + element.value == "bucketName" || element.value == "creationTime" || element.value == "dirty" || + element.value == "hosts" || element.value == "etag" || element.value == "objSize" || + element.value == "userId" || element.value == "displayName") { + prev_val = element.value; + ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "() field key: " << prev_val << dendl; + field_key = false; + field_val = true; + } + continue; + } else { + ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "() field val: " << element.value << dendl; + if (prev_val == "blockID") { + block->blockID = std::stoull(element.value); + } else if (prev_val == "version") { + block->version = element.value; + } else if (prev_val == "deleteMarker") { + block->deleteMarker = (std::stoi(element.value) != 0); + } else if (prev_val == "size") { + block->size = std::stoull(element.value); + } else if (prev_val == "globalWeight") { + block->globalWeight = std::stoull(element.value); + } else if (prev_val == "objName") { + block->cacheObj.objName = element.value; + } else if (prev_val == "bucketName") { + block->cacheObj.bucketName = element.value; + } else if (prev_val == "creationTime") { + block->cacheObj.creationTime = element.value; + } else if (prev_val == "dirty") { + block->cacheObj.dirty = (std::stoi(element.value) != 0); + } else if (prev_val == "hosts") { + boost::split(block->cacheObj.hostsList, element.value, boost::is_any_of("_")); + } else if (prev_val == "etag") { + block->cacheObj.etag = element.value; + } else if (prev_val == "objSize") { + block->cacheObj.size = std::stoull(element.value); + } else if (prev_val == "userId") { + block->cacheObj.user_id = element.value; + } else if (prev_val == "displayName") { + block->cacheObj.display_name = element.value; + } + j++; + field_key= true; + field_val = false; + prev_val.clear(); + } + } + if (j == num_elements) { + i++; + j = 0; + } + } + } + return 0; +} + /* Note: This method is not compatible for use on Ubuntu systems. */ int BlockDirectory::copy(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& copyName, const std::string& copyBucketName, optional_yield y) { diff --git a/src/rgw/driver/d4n/d4n_directory.h b/src/rgw/driver/d4n/d4n_directory.h index 03ab2848fe063..7f281b338b3b2 100644 --- a/src/rgw/driver/d4n/d4n_directory.h +++ b/src/rgw/driver/d4n/d4n_directory.h @@ -194,7 +194,10 @@ class BlockDirectory: public Directory { int set(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y); int get(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y); - //Pipelined version of get for list bucket + //Pipelined version of get using boost::redis::response for list bucket + template + int get(const DoutPrefixProvider* dpp, std::vector& blocks, optional_yield y); + //Pipelined version of get using boost::redis::generic_response int get(const DoutPrefixProvider* dpp, std::vector& blocks, optional_yield y); int copy(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& copyName, const std::string& copyBucketName, optional_yield y); int del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y, bool multi=false); diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.cc b/src/rgw/driver/d4n/rgw_sal_d4n.cc index 5c0537dbe8733..dafd2d9579ff4 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.cc +++ b/src/rgw/driver/d4n/rgw_sal_d4n.cc @@ -451,13 +451,13 @@ int D4NFilterBucket::list(const DoutPrefixProvider* dpp, ListParams& params, int } //end - else rgw::d4n::BlockDirectory* blockDir = this->filter->get_block_dir(); - auto remainder_size = entries.size(); + int remainder_size = entries.size(); size_t j = 0, start_j = 0; while (remainder_size > 0) { - std::vector blocks(100); + auto batch_size = std::min(max, remainder_size); + std::vector blocks(batch_size); start_j = j; - size_t batch_size = std::min(static_cast(100), remainder_size); - for (size_t i = 0; i < batch_size; i++) { + for (auto i = 0; i < batch_size; i++) { ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " objects[j]: " << entries[j].key.name << dendl; ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " remainder_size: " << remainder_size << dendl; if (entries[j].key.instance == "null") { @@ -507,10 +507,10 @@ int D4NFilterBucket::list(const DoutPrefixProvider* dpp, ListParams& params, int start_j++; } - if (remainder_size <= 100) { + if (remainder_size <= max) { remainder_size = 0; } else { - remainder_size = remainder_size - 100; + remainder_size = remainder_size - max; } } } //d4n_write_cache_enabled = true diff --git a/src/test/rgw/test_d4n_directory.cc b/src/test/rgw/test_d4n_directory.cc index 0c13e1db07bdf..2778abe9ab115 100644 --- a/src/test/rgw/test_d4n_directory.cc +++ b/src/test/rgw/test_d4n_directory.cc @@ -882,6 +882,14 @@ void foo(T t, std::vector>& responses) }); } +template <> struct fmt::formatter : fmt::ostream_formatter {}; +template <> struct fmt::formatter : fmt::formatter { + template auto format(boost::redis::resp3::node const& node, FormatContext& ctx) const { + return format_to(ctx.out(), "({}@{}, {}, {})", node.data_type, node.depth, node.value, + node.aggregate_size); + } +}; + TEST_F(BlockDirectoryFixture, Pipeline) { boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { @@ -901,6 +909,7 @@ TEST_F(BlockDirectoryFixture, Pipeline) ASSERT_EQ((bool)ec, false); } { + //using boost::redis::response std::vector fields; fields.push_back("name"); request req; @@ -920,6 +929,61 @@ TEST_F(BlockDirectoryFixture, Pipeline) } } } + { + //using boost::redis::generic_response + std::vector fields; + fields.push_back("name"); + request req; + req.push("HGETALL", "testkey1"); + req.push("HGETALL", "testkey2"); + + ASSERT_EQ(req.get_commands(), 2); + boost::redis::generic_response resp; + conn->async_exec(req, resp, yield[ec]); + ASSERT_EQ((bool)ec, false); + //debug only + fmt::print("generic: {}\n", resp.value()); + + //1st node gives data type and number of elements of that type + //if data type is aggrgate, like array, map, then next n elements will be values of the aggregate type + std::unordered_map > key_val_map; + auto i = 0, j = 0; + std::string key, fieldkey, fieldval; + int num_elements = 0; + for (auto& element : resp.value()) { + if (element.data_type == boost::redis::resp3::type::array || element.data_type == boost::redis::resp3::type::map) { + num_elements = element.aggregate_size; + if (j == 0) { + key = "testkey1"; + j++; + } else { + key = "testkey2"; + } + continue; + } else { + if (i < num_elements) { + fieldkey = element.value; + i++; + } else { + fieldval = element.value; + key_val_map.emplace(key, std::unordered_map{{fieldkey,fieldval}}); + key.clear(); + fieldkey.clear(); + fieldval.clear(); + i = 0; + } + } + } + std::cout << "HGETALL response size is: " << key_val_map.size() << std::endl; + for (auto& it : key_val_map) { + std::cout << "key: " << it.first << std::endl; + std::unordered_map field_key_val_map = it.second; + for (auto& inner_it : field_key_val_map) { + std::cout << "fieldkey: " << inner_it.first << std::endl; + std::cout << "fieldval: " << inner_it.second << std::endl; + } + } + } { boost::system::error_code ec; request req; -- 2.39.5