From 0c640739ff9d5830e450242dd79c6bd06793f4d9 Mon Sep 17 00:00:00 2001 From: Pritha Srivastava Date: Wed, 25 Sep 2024 09:46:34 +0530 Subject: [PATCH] rgw/directory: support for ordered set in both ObjectDirectory and BlockDirectory. Adds methods to add, list a range and remove members from a redis ordered set ordered by score passed in. Also adds unit test cases to test the methods. Samarah has fixed the arguments passed in to zrevrange in ZAddZRemYield and ZAddZRevRangeYield tests. Signed-off-by: Pritha Srivastava --- src/rgw/driver/d4n/d4n_directory.cc | 243 ++++++++++++++++++++++++++++ src/rgw/driver/d4n/d4n_directory.h | 8 + src/rgw/driver/d4n/d4n_policy.h | 2 +- src/test/rgw/test_d4n_directory.cc | 94 +++++++++++ 4 files changed, 346 insertions(+), 1 deletion(-) diff --git a/src/rgw/driver/d4n/d4n_directory.cc b/src/rgw/driver/d4n/d4n_directory.cc index 2e2f71a55e8..a833565aaa4 100644 --- a/src/rgw/driver/d4n/d4n_directory.cc +++ b/src/rgw/driver/d4n/d4n_directory.cc @@ -319,6 +319,128 @@ int ObjectDirectory::update_field(const DoutPrefixProvider* dpp, CacheObj* objec } } +int ObjectDirectory::zadd(const DoutPrefixProvider* dpp, CacheObj* object, double score, const std::string& member, optional_yield y) +{ + std::string key = build_index(object); + try { + boost::system::error_code ec; + request req; + req.push("ZADD", key, "CH", std::to_string(score), member); + + response resp; + redis_exec(conn, ec, req, resp, y); + + if (ec) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + + if (std::get<0>(resp).value() != "1") { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl; + return -EINVAL; + } + + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + + return 0; + +} + +int ObjectDirectory::zrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector& members, optional_yield y) +{ + std::string key = build_index(object); + try { + boost::system::error_code ec; + request req; + req.push("ZRANGE", key, std::to_string(start), std::to_string(stop)); + + response > resp; + redis_exec(conn, ec, req, resp, y); + + if (ec) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + + if (std::get<0>(resp).value().empty()) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Empty response" << dendl; + return -EINVAL; + } + + members = std::get<0>(resp).value(); + + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + + return 0; +} + +int ObjectDirectory::zrevrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector& members, optional_yield y) +{ + std::string key = build_index(object); + try { + boost::system::error_code ec; + request req; + req.push("ZREVRANGE", key, std::to_string(start), std::to_string(stop)); + + response > resp; + redis_exec(conn, ec, req, resp, y); + + if (ec) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + + if (std::get<0>(resp).value().empty()) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Empty response" << dendl; + return -EINVAL; + } + + members = std::get<0>(resp).value(); + + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + + return 0; +} + +int ObjectDirectory::zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y) +{ + std::string key = build_index(object); + try { + boost::system::error_code ec; + request req; + req.push("ZREM", key, member); + response resp; + + redis_exec(conn, ec, req, resp, y); + + if (ec) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + + if (std::get<0>(resp).value() != "1") { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl; + return -EINVAL; + } + + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + + return 0; +} + + std::string BlockDirectory::build_index(CacheBlock* block) { return block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + std::to_string(block->blockID) + "_" + std::to_string(block->size); @@ -674,4 +796,125 @@ int BlockDirectory::remove_host(const DoutPrefixProvider* dpp, CacheBlock* block return 0; } +int BlockDirectory::zadd(const DoutPrefixProvider* dpp, CacheBlock* block, double score, const std::string& member, optional_yield y) +{ + std::string key = build_index(block); + try { + boost::system::error_code ec; + request req; + req.push("ZADD", key, "CH", std::to_string(score), member); + + response resp; + redis_exec(conn, ec, req, resp, y); + + if (ec) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + + if (std::get<0>(resp).value() != "1") { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl; + return -EINVAL; + } + + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + + return 0; + +} + +int BlockDirectory::zrange(const DoutPrefixProvider* dpp, CacheBlock* block, int start, int stop, std::vector& members, optional_yield y) +{ + std::string key = build_index(block); + try { + boost::system::error_code ec; + request req; + req.push("ZRANGE", key, std::to_string(start), std::to_string(stop)); + + response > resp; + redis_exec(conn, ec, req, resp, y); + + if (ec) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + + if (std::get<0>(resp).value().empty()) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Empty response" << dendl; + return -EINVAL; + } + + members = std::get<0>(resp).value(); + + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + + return 0; +} + +int BlockDirectory::zrevrange(const DoutPrefixProvider* dpp, CacheBlock* block, int start, int stop, std::vector& members, optional_yield y) +{ + std::string key = build_index(block); + try { + boost::system::error_code ec; + request req; + req.push("ZREVRANGE", key, std::to_string(start), std::to_string(stop)); + + response > resp; + redis_exec(conn, ec, req, resp, y); + + if (ec) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + + if (std::get<0>(resp).value().empty()) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Empty response" << dendl; + return -EINVAL; + } + + members = std::get<0>(resp).value(); + + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + + return 0; +} + +int BlockDirectory::zrem(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& member, optional_yield y) +{ + std::string key = build_index(block); + try { + boost::system::error_code ec; + request req; + req.push("ZREM", key, member); + response resp; + + redis_exec(conn, ec, req, resp, y); + + if (ec) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + + if (std::get<0>(resp).value() != "1") { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl; + return -EINVAL; + } + + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + + return 0; +} + } } // namespace rgw::d4n diff --git a/src/rgw/driver/d4n/d4n_directory.h b/src/rgw/driver/d4n/d4n_directory.h index af99ba557f6..82419da530b 100644 --- a/src/rgw/driver/d4n/d4n_directory.h +++ b/src/rgw/driver/d4n/d4n_directory.h @@ -48,6 +48,10 @@ class ObjectDirectory: public Directory { int copy(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& copyName, const std::string& copyBucketName, optional_yield y); int del(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y); int update_field(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& field, std::string& value, optional_yield y); + int zadd(const DoutPrefixProvider* dpp, CacheObj* object, double score, const std::string& member, optional_yield y); + int zrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector& members, optional_yield y); + int zrevrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector& members, optional_yield y); + int zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y); private: std::shared_ptr conn; @@ -67,6 +71,10 @@ class BlockDirectory: public Directory { int del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y); int update_field(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& field, std::string& value, optional_yield y); int remove_host(const DoutPrefixProvider* dpp, CacheBlock* block, std::string& value, optional_yield y); + int zadd(const DoutPrefixProvider* dpp, CacheBlock* block, double score, const std::string& member, optional_yield y); + int zrange(const DoutPrefixProvider* dpp, CacheBlock* block, int start, int stop, std::vector& members, optional_yield y); + int zrevrange(const DoutPrefixProvider* dpp, CacheBlock* block, int start, int stop, std::vector& members, optional_yield y); + int zrem(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& member, optional_yield y); private: std::shared_ptr conn; diff --git a/src/rgw/driver/d4n/d4n_policy.h b/src/rgw/driver/d4n/d4n_policy.h index b23ee407354..8699e39a5a0 100644 --- a/src/rgw/driver/d4n/d4n_policy.h +++ b/src/rgw/driver/d4n/d4n_policy.h @@ -62,7 +62,7 @@ class CachePolicy { virtual int init(CephContext* cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver* _driver) = 0; virtual int exist_key(std::string key) = 0; virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) = 0; - virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y) = 0; + virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val=empty) = 0; virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, optional_yield y) = 0; diff --git a/src/test/rgw/test_d4n_directory.cc b/src/test/rgw/test_d4n_directory.cc index 64230bd3df7..6fe56eeb9d8 100644 --- a/src/test/rgw/test_d4n_directory.cc +++ b/src/test/rgw/test_d4n_directory.cc @@ -295,6 +295,100 @@ TEST_F(ObjectDirectoryFixture, UpdateFieldYield) io.run(); } +TEST_F(ObjectDirectoryFixture, ZAddYield) +{ + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + auto m_time = real_clock::now(); + auto score = ceph::real_clock::to_double(m_time); + std::string version = "v1"; + ASSERT_EQ(0, dir->zadd(env->dpp, obj, score, version, yield)); + { + boost::system::error_code ec; + request req; + req.push("FLUSHALL"); + response resp; + conn->async_exec(req, resp, yield[ec]); + ASSERT_EQ((bool)ec, false); + } + conn->cancel(); + }, rethrow); + + io.run(); +} + +TEST_F(ObjectDirectoryFixture, ZAddZRevRangeYield) +{ + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + { + auto m_time = real_clock::now(); + auto score = ceph::real_clock::to_double(m_time); + std::string version = "v2"; + ASSERT_EQ(0, dir->zadd(env->dpp, obj, score, version, yield)); + } + { + auto m_time = real_clock::now(); + auto score = ceph::real_clock::to_double(m_time); + std::string version = "v1"; + ASSERT_EQ(0, dir->zadd(env->dpp, obj, score, version, yield)); + } + { + std::vector members; + ASSERT_EQ(0, dir->zrevrange(env->dpp, obj, "0", "0", members, yield)); + ASSERT_EQ(1, members.size()); + ASSERT_EQ("v1", members[0]); + } + + { + boost::system::error_code ec; + request req; + req.push("FLUSHALL"); + response resp; + conn->async_exec(req, resp, yield[ec]); + ASSERT_EQ((bool)ec, false); + } + conn->cancel(); + }, rethrow); + + io.run(); +} + +TEST_F(ObjectDirectoryFixture, ZAddZRemYield) +{ + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + { + auto m_time = real_clock::now(); + auto score = ceph::real_clock::to_double(m_time); + std::cout << "Score for v1: " << score << std::endl; + std::string version = "v1"; + ASSERT_EQ(0, dir->zadd(env->dpp, obj, score, version, yield)); + } + { + auto m_time = real_clock::now(); + auto score = ceph::real_clock::to_double(m_time); + std::cout << "Score for v2: " << score << std::endl; + std::string version = "v2"; + ASSERT_EQ(0, dir->zadd(env->dpp, obj, score, version, yield)); + } + { + ASSERT_EQ(0, dir->zrem(env->dpp, obj, "v2", yield)); + std::vector members; + ASSERT_EQ(0, dir->zrevrange(env->dpp, obj, "0", "0", members, yield)); + ASSERT_EQ(1, members.size()); + ASSERT_EQ("v1", members[0]); + } + { + boost::system::error_code ec; + request req; + req.push("FLUSHALL"); + response resp; + conn->async_exec(req, resp, yield[ec]); + ASSERT_EQ((bool)ec, false); + } + conn->cancel(); + }, rethrow); + + io.run(); +} TEST_F(BlockDirectoryFixture, SetYield) { -- 2.39.5