From: Pritha Srivastava Date: Wed, 25 Sep 2024 04:16:34 +0000 (+0530) Subject: rgw/directory: support for ordered set in both ObjectDirectory X-Git-Tag: v20.3.0~8^2~17 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=0c640739ff9d5830e450242dd79c6bd06793f4d9;p=ceph.git rgw/directory: support for ordered set in both ObjectDirectory and BlockDirectory. Adds methods to add, list a range and remove members from a redis ordered set ordered by score passed in. Also adds unit test cases to test the methods. Samarah has fixed the arguments passed in to zrevrange in ZAddZRemYield and ZAddZRevRangeYield tests. Signed-off-by: Pritha Srivastava --- diff --git a/src/rgw/driver/d4n/d4n_directory.cc b/src/rgw/driver/d4n/d4n_directory.cc index 2e2f71a55e81..a833565aaa43 100644 --- a/src/rgw/driver/d4n/d4n_directory.cc +++ b/src/rgw/driver/d4n/d4n_directory.cc @@ -319,6 +319,128 @@ int ObjectDirectory::update_field(const DoutPrefixProvider* dpp, CacheObj* objec } } +int ObjectDirectory::zadd(const DoutPrefixProvider* dpp, CacheObj* object, double score, const std::string& member, optional_yield y) +{ + std::string key = build_index(object); + try { + boost::system::error_code ec; + request req; + req.push("ZADD", key, "CH", std::to_string(score), member); + + response resp; + redis_exec(conn, ec, req, resp, y); + + if (ec) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + + if (std::get<0>(resp).value() != "1") { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl; + return -EINVAL; + } + + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + + return 0; + +} + +int ObjectDirectory::zrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector& members, optional_yield y) +{ + std::string key = build_index(object); + try { + boost::system::error_code ec; + request req; + req.push("ZRANGE", key, std::to_string(start), std::to_string(stop)); + + response > resp; + redis_exec(conn, ec, req, resp, y); + + if (ec) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + + if (std::get<0>(resp).value().empty()) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Empty response" << dendl; + return -EINVAL; + } + + members = std::get<0>(resp).value(); + + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + + return 0; +} + +int ObjectDirectory::zrevrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector& members, optional_yield y) +{ + std::string key = build_index(object); + try { + boost::system::error_code ec; + request req; + req.push("ZREVRANGE", key, std::to_string(start), std::to_string(stop)); + + response > resp; + redis_exec(conn, ec, req, resp, y); + + if (ec) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + + if (std::get<0>(resp).value().empty()) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Empty response" << dendl; + return -EINVAL; + } + + members = std::get<0>(resp).value(); + + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + + return 0; +} + +int ObjectDirectory::zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y) +{ + std::string key = build_index(object); + try { + boost::system::error_code ec; + request req; + req.push("ZREM", key, member); + response resp; + + redis_exec(conn, ec, req, resp, y); + + if (ec) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + + if (std::get<0>(resp).value() != "1") { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl; + return -EINVAL; + } + + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + + return 0; +} + + std::string BlockDirectory::build_index(CacheBlock* block) { return block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + std::to_string(block->blockID) + "_" + std::to_string(block->size); @@ -674,4 +796,125 @@ int BlockDirectory::remove_host(const DoutPrefixProvider* dpp, CacheBlock* block return 0; } +int BlockDirectory::zadd(const DoutPrefixProvider* dpp, CacheBlock* block, double score, const std::string& member, optional_yield y) +{ + std::string key = build_index(block); + try { + boost::system::error_code ec; + request req; + req.push("ZADD", key, "CH", std::to_string(score), member); + + response resp; + redis_exec(conn, ec, req, resp, y); + + if (ec) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + + if (std::get<0>(resp).value() != "1") { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl; + return -EINVAL; + } + + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + + return 0; + +} + +int BlockDirectory::zrange(const DoutPrefixProvider* dpp, CacheBlock* block, int start, int stop, std::vector& members, optional_yield y) +{ + std::string key = build_index(block); + try { + boost::system::error_code ec; + request req; + req.push("ZRANGE", key, std::to_string(start), std::to_string(stop)); + + response > resp; + redis_exec(conn, ec, req, resp, y); + + if (ec) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + + if (std::get<0>(resp).value().empty()) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Empty response" << dendl; + return -EINVAL; + } + + members = std::get<0>(resp).value(); + + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + + return 0; +} + +int BlockDirectory::zrevrange(const DoutPrefixProvider* dpp, CacheBlock* block, int start, int stop, std::vector& members, optional_yield y) +{ + std::string key = build_index(block); + try { + boost::system::error_code ec; + request req; + req.push("ZREVRANGE", key, std::to_string(start), std::to_string(stop)); + + response > resp; + redis_exec(conn, ec, req, resp, y); + + if (ec) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + + if (std::get<0>(resp).value().empty()) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Empty response" << dendl; + return -EINVAL; + } + + members = std::get<0>(resp).value(); + + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + + return 0; +} + +int BlockDirectory::zrem(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& member, optional_yield y) +{ + std::string key = build_index(block); + try { + boost::system::error_code ec; + request req; + req.push("ZREM", key, member); + response resp; + + redis_exec(conn, ec, req, resp, y); + + if (ec) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + + if (std::get<0>(resp).value() != "1") { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl; + return -EINVAL; + } + + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + + return 0; +} + } } // namespace rgw::d4n diff --git a/src/rgw/driver/d4n/d4n_directory.h b/src/rgw/driver/d4n/d4n_directory.h index af99ba557f6c..82419da530be 100644 --- a/src/rgw/driver/d4n/d4n_directory.h +++ b/src/rgw/driver/d4n/d4n_directory.h @@ -48,6 +48,10 @@ class ObjectDirectory: public Directory { int copy(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& copyName, const std::string& copyBucketName, optional_yield y); int del(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y); int update_field(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& field, std::string& value, optional_yield y); + int zadd(const DoutPrefixProvider* dpp, CacheObj* object, double score, const std::string& member, optional_yield y); + int zrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector& members, optional_yield y); + int zrevrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector& members, optional_yield y); + int zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y); private: std::shared_ptr conn; @@ -67,6 +71,10 @@ class BlockDirectory: public Directory { int del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y); int update_field(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& field, std::string& value, optional_yield y); int remove_host(const DoutPrefixProvider* dpp, CacheBlock* block, std::string& value, optional_yield y); + int zadd(const DoutPrefixProvider* dpp, CacheBlock* block, double score, const std::string& member, optional_yield y); + int zrange(const DoutPrefixProvider* dpp, CacheBlock* block, int start, int stop, std::vector& members, optional_yield y); + int zrevrange(const DoutPrefixProvider* dpp, CacheBlock* block, int start, int stop, std::vector& members, optional_yield y); + int zrem(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& member, optional_yield y); private: std::shared_ptr conn; diff --git a/src/rgw/driver/d4n/d4n_policy.h b/src/rgw/driver/d4n/d4n_policy.h index b23ee407354d..8699e39a5a01 100644 --- a/src/rgw/driver/d4n/d4n_policy.h +++ b/src/rgw/driver/d4n/d4n_policy.h @@ -62,7 +62,7 @@ class CachePolicy { virtual int init(CephContext* cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver* _driver) = 0; virtual int exist_key(std::string key) = 0; virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) = 0; - virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y) = 0; + virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val=empty) = 0; virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, optional_yield y) = 0; diff --git a/src/test/rgw/test_d4n_directory.cc b/src/test/rgw/test_d4n_directory.cc index 64230bd3df76..6fe56eeb9d8c 100644 --- a/src/test/rgw/test_d4n_directory.cc +++ b/src/test/rgw/test_d4n_directory.cc @@ -295,6 +295,100 @@ TEST_F(ObjectDirectoryFixture, UpdateFieldYield) io.run(); } +TEST_F(ObjectDirectoryFixture, ZAddYield) +{ + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + auto m_time = real_clock::now(); + auto score = ceph::real_clock::to_double(m_time); + std::string version = "v1"; + ASSERT_EQ(0, dir->zadd(env->dpp, obj, score, version, yield)); + { + boost::system::error_code ec; + request req; + req.push("FLUSHALL"); + response resp; + conn->async_exec(req, resp, yield[ec]); + ASSERT_EQ((bool)ec, false); + } + conn->cancel(); + }, rethrow); + + io.run(); +} + +TEST_F(ObjectDirectoryFixture, ZAddZRevRangeYield) +{ + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + { + auto m_time = real_clock::now(); + auto score = ceph::real_clock::to_double(m_time); + std::string version = "v2"; + ASSERT_EQ(0, dir->zadd(env->dpp, obj, score, version, yield)); + } + { + auto m_time = real_clock::now(); + auto score = ceph::real_clock::to_double(m_time); + std::string version = "v1"; + ASSERT_EQ(0, dir->zadd(env->dpp, obj, score, version, yield)); + } + { + std::vector members; + ASSERT_EQ(0, dir->zrevrange(env->dpp, obj, "0", "0", members, yield)); + ASSERT_EQ(1, members.size()); + ASSERT_EQ("v1", members[0]); + } + + { + boost::system::error_code ec; + request req; + req.push("FLUSHALL"); + response resp; + conn->async_exec(req, resp, yield[ec]); + ASSERT_EQ((bool)ec, false); + } + conn->cancel(); + }, rethrow); + + io.run(); +} + +TEST_F(ObjectDirectoryFixture, ZAddZRemYield) +{ + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + { + auto m_time = real_clock::now(); + auto score = ceph::real_clock::to_double(m_time); + std::cout << "Score for v1: " << score << std::endl; + std::string version = "v1"; + ASSERT_EQ(0, dir->zadd(env->dpp, obj, score, version, yield)); + } + { + auto m_time = real_clock::now(); + auto score = ceph::real_clock::to_double(m_time); + std::cout << "Score for v2: " << score << std::endl; + std::string version = "v2"; + ASSERT_EQ(0, dir->zadd(env->dpp, obj, score, version, yield)); + } + { + ASSERT_EQ(0, dir->zrem(env->dpp, obj, "v2", yield)); + std::vector members; + ASSERT_EQ(0, dir->zrevrange(env->dpp, obj, "0", "0", members, yield)); + ASSERT_EQ(1, members.size()); + ASSERT_EQ("v1", members[0]); + } + { + boost::system::error_code ec; + request req; + req.push("FLUSHALL"); + response resp; + conn->async_exec(req, resp, yield[ec]); + ASSERT_EQ((bool)ec, false); + } + conn->cancel(); + }, rethrow); + + io.run(); +} TEST_F(BlockDirectoryFixture, SetYield) {