]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/directory: support for ordered set in both ObjectDirectory
authorPritha Srivastava <prsrivas@redhat.com>
Wed, 25 Sep 2024 04:16:34 +0000 (09:46 +0530)
committerPritha Srivastava <prsrivas@redhat.com>
Mon, 21 Apr 2025 04:04:07 +0000 (09:34 +0530)
and BlockDirectory.

Adds methods to add, list a range and remove members from a
redis ordered set ordered by score passed in. Also adds unit
test cases to test the methods.

Samarah has fixed the arguments passed in to zrevrange in
ZAddZRemYield and ZAddZRevRangeYield tests.

Signed-off-by: Pritha Srivastava <prsrivas@redhat.com>
src/rgw/driver/d4n/d4n_directory.cc
src/rgw/driver/d4n/d4n_directory.h
src/rgw/driver/d4n/d4n_policy.h
src/test/rgw/test_d4n_directory.cc

index 2e2f71a55e810fe61c2d96faf260f50e2af64bf8..a833565aaa43c48497a9def2edf0de1a1fdc77a8 100644 (file)
@@ -319,6 +319,128 @@ int ObjectDirectory::update_field(const DoutPrefixProvider* dpp, CacheObj* objec
   }
 }
 
+int ObjectDirectory::zadd(const DoutPrefixProvider* dpp, CacheObj* object, double score, const std::string& member, optional_yield y)
+{
+  std::string key = build_index(object);
+  try {
+    boost::system::error_code ec;
+    request req;
+    req.push("ZADD", key, "CH", std::to_string(score), member);
+
+    response<std::string> resp;
+    redis_exec(conn, ec, req, resp, y);
+
+    if (ec) {
+      ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+      return -ec.value();
+    }
+
+    if (std::get<0>(resp).value() != "1") {
+      ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl;
+      return -EINVAL;
+    }
+
+  } catch (std::exception &e) {
+    ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+    return -EINVAL;
+  }
+
+  return 0;
+
+}
+
+int ObjectDirectory::zrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector<std::string>& members, optional_yield y)
+{
+  std::string key = build_index(object);
+  try {
+    boost::system::error_code ec;
+    request req;
+    req.push("ZRANGE", key, std::to_string(start), std::to_string(stop));
+
+    response<std::vector<std::string> > resp;
+    redis_exec(conn, ec, req, resp, y);
+
+    if (ec) {
+      ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+      return -ec.value();
+    }
+
+    if (std::get<0>(resp).value().empty()) {
+      ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Empty response" << dendl;
+      return -EINVAL;
+    }
+
+    members = std::get<0>(resp).value();
+
+  } catch (std::exception &e) {
+    ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+    return -EINVAL;
+  }
+
+  return 0;
+}
+
+int ObjectDirectory::zrevrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector<std::string>& members, optional_yield y)
+{
+  std::string key = build_index(object);
+  try {
+    boost::system::error_code ec;
+    request req;
+    req.push("ZREVRANGE", key, std::to_string(start), std::to_string(stop));
+
+    response<std::vector<std::string> > resp;
+    redis_exec(conn, ec, req, resp, y);
+
+    if (ec) {
+      ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+      return -ec.value();
+    }
+
+    if (std::get<0>(resp).value().empty()) {
+      ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Empty response" << dendl;
+      return -EINVAL;
+    }
+
+    members = std::get<0>(resp).value();
+
+  } catch (std::exception &e) {
+    ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+    return -EINVAL;
+  }
+
+  return 0;
+}
+
+int ObjectDirectory::zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y)
+{
+  std::string key = build_index(object);
+  try {
+    boost::system::error_code ec;
+    request req;
+    req.push("ZREM", key, member);
+    response<std::string> resp;
+
+    redis_exec(conn, ec, req, resp, y);
+
+    if (ec) {
+      ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+      return -ec.value();
+    }
+
+    if (std::get<0>(resp).value() != "1") {
+      ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
+      return -EINVAL;
+    }
+
+  } catch (std::exception &e) {
+    ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+    return -EINVAL;
+  }
+
+  return 0;
+}
+
+
 std::string BlockDirectory::build_index(CacheBlock* block) 
 {
   return block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + std::to_string(block->blockID) + "_" + std::to_string(block->size);
@@ -674,4 +796,125 @@ int BlockDirectory::remove_host(const DoutPrefixProvider* dpp, CacheBlock* block
   return 0;
 }
 
+int BlockDirectory::zadd(const DoutPrefixProvider* dpp, CacheBlock* block, double score, const std::string& member, optional_yield y)
+{
+  std::string key = build_index(block);
+  try {
+    boost::system::error_code ec;
+    request req;
+    req.push("ZADD", key, "CH", std::to_string(score), member);
+
+    response<std::string> resp;
+    redis_exec(conn, ec, req, resp, y);
+
+    if (ec) {
+      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+      return -ec.value();
+    }
+
+    if (std::get<0>(resp).value() != "1") {
+      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl;
+      return -EINVAL;
+    }
+
+  } catch (std::exception &e) {
+    ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+    return -EINVAL;
+  }
+
+  return 0;
+
+}
+
+int BlockDirectory::zrange(const DoutPrefixProvider* dpp, CacheBlock* block, int start, int stop, std::vector<std::string>& members, optional_yield y)
+{
+  std::string key = build_index(block);
+  try {
+    boost::system::error_code ec;
+    request req;
+    req.push("ZRANGE", key, std::to_string(start), std::to_string(stop));
+
+    response<std::vector<std::string> > resp;
+    redis_exec(conn, ec, req, resp, y);
+
+    if (ec) {
+      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+      return -ec.value();
+    }
+
+    if (std::get<0>(resp).value().empty()) {
+      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Empty response" << dendl;
+      return -EINVAL;
+    }
+
+    members = std::get<0>(resp).value();
+
+  } catch (std::exception &e) {
+    ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+    return -EINVAL;
+  }
+
+  return 0;
+}
+
+int BlockDirectory::zrevrange(const DoutPrefixProvider* dpp, CacheBlock* block, int start, int stop, std::vector<std::string>& members, optional_yield y)
+{
+  std::string key = build_index(block);
+  try {
+    boost::system::error_code ec;
+    request req;
+    req.push("ZREVRANGE", key, std::to_string(start), std::to_string(stop));
+
+    response<std::vector<std::string> > resp;
+    redis_exec(conn, ec, req, resp, y);
+
+    if (ec) {
+      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+      return -ec.value();
+    }
+
+    if (std::get<0>(resp).value().empty()) {
+      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Empty response" << dendl;
+      return -EINVAL;
+    }
+
+    members = std::get<0>(resp).value();
+
+  } catch (std::exception &e) {
+    ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+    return -EINVAL;
+  }
+
+  return 0;
+}
+
+int BlockDirectory::zrem(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& member, optional_yield y)
+{
+  std::string key = build_index(block);
+  try {
+    boost::system::error_code ec;
+    request req;
+    req.push("ZREM", key, member);
+    response<std::string> resp;
+
+    redis_exec(conn, ec, req, resp, y);
+
+    if (ec) {
+      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+      return -ec.value();
+    }
+
+    if (std::get<0>(resp).value() != "1") {
+      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
+      return -EINVAL;
+    }
+
+  } catch (std::exception &e) {
+    ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+    return -EINVAL;
+  }
+
+  return 0;
+}
+
 } } // namespace rgw::d4n
index af99ba557f6ccdffc606ea4894961ad046bfe12b..82419da530bef526fad5c000711ca79eb300fed8 100644 (file)
@@ -48,6 +48,10 @@ class ObjectDirectory: public Directory {
     int copy(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& copyName, const std::string& copyBucketName, optional_yield y);
     int del(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y);
     int update_field(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& field, std::string& value, optional_yield y);
+    int zadd(const DoutPrefixProvider* dpp, CacheObj* object, double score, const std::string& member, optional_yield y);
+    int zrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector<std::string>& members, optional_yield y);
+    int zrevrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector<std::string>& members, optional_yield y);
+    int zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y);
 
   private:
     std::shared_ptr<connection> conn;
@@ -67,6 +71,10 @@ class BlockDirectory: public Directory {
     int del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
     int update_field(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& field, std::string& value, optional_yield y);
     int remove_host(const DoutPrefixProvider* dpp, CacheBlock* block, std::string& value, optional_yield y);
+    int zadd(const DoutPrefixProvider* dpp, CacheBlock* block, double score, const std::string& member, optional_yield y);
+    int zrange(const DoutPrefixProvider* dpp, CacheBlock* block, int start, int stop, std::vector<std::string>& members, optional_yield y);
+    int zrevrange(const DoutPrefixProvider* dpp, CacheBlock* block, int start, int stop, std::vector<std::string>& members, optional_yield y);
+    int zrem(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& member, optional_yield y);
 
   private:
     std::shared_ptr<connection> conn;
index b23ee407354de1b04aa6bd44eecf66d46564ad02..8699e39a5a01591fd64c964dc2f0408319710167 100644 (file)
@@ -62,7 +62,7 @@ class CachePolicy {
     virtual int init(CephContext* cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver* _driver) = 0;
     virtual int exist_key(std::string key) = 0;
     virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) = 0;
-    virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y) = 0;
+    virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val=empty) = 0;
     virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size, 
                            time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
                            const rgw_obj_key& obj_key, optional_yield y) = 0;
index 64230bd3df76fe8b8bbaef529fc6e03b160537cb..6fe56eeb9d8cef2564f23fe2259dd8df4864fee8 100644 (file)
@@ -295,6 +295,100 @@ TEST_F(ObjectDirectoryFixture, UpdateFieldYield)
   io.run();
 }
 
+TEST_F(ObjectDirectoryFixture, ZAddYield)
+{
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+    auto m_time = real_clock::now();
+    auto score = ceph::real_clock::to_double(m_time);
+    std::string version = "v1";
+    ASSERT_EQ(0, dir->zadd(env->dpp, obj, score, version, yield));
+    {
+      boost::system::error_code ec;
+      request req;
+      req.push("FLUSHALL");
+      response<boost::redis::ignore_t> resp;
+      conn->async_exec(req, resp, yield[ec]);
+      ASSERT_EQ((bool)ec, false);
+    }
+    conn->cancel();
+  }, rethrow);
+
+  io.run();
+}
+
+TEST_F(ObjectDirectoryFixture, ZAddZRevRangeYield)
+{
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+    {
+      auto m_time = real_clock::now();
+      auto score = ceph::real_clock::to_double(m_time);
+      std::string version = "v2";
+      ASSERT_EQ(0, dir->zadd(env->dpp, obj, score, version, yield));
+    }
+    {
+      auto m_time = real_clock::now();
+      auto score = ceph::real_clock::to_double(m_time);
+      std::string version = "v1";
+      ASSERT_EQ(0, dir->zadd(env->dpp, obj, score, version, yield));
+    }
+    {
+      std::vector<std::string> members;
+      ASSERT_EQ(0, dir->zrevrange(env->dpp, obj, "0", "0", members, yield));
+      ASSERT_EQ(1, members.size());
+      ASSERT_EQ("v1", members[0]);
+    }
+
+    {
+      boost::system::error_code ec;
+      request req;
+      req.push("FLUSHALL");
+      response<boost::redis::ignore_t> resp;
+      conn->async_exec(req, resp, yield[ec]);
+      ASSERT_EQ((bool)ec, false);
+    }
+    conn->cancel();
+  }, rethrow);
+
+  io.run();
+}
+
+TEST_F(ObjectDirectoryFixture, ZAddZRemYield)
+{
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+    {
+      auto m_time = real_clock::now();
+      auto score = ceph::real_clock::to_double(m_time);
+      std::cout << "Score for v1: " << score << std::endl;
+      std::string version = "v1";
+      ASSERT_EQ(0, dir->zadd(env->dpp, obj, score, version, yield));
+    }
+    {
+      auto m_time = real_clock::now();
+      auto score = ceph::real_clock::to_double(m_time);
+      std::cout << "Score for v2: " << score << std::endl;
+      std::string version = "v2";
+      ASSERT_EQ(0, dir->zadd(env->dpp, obj, score, version, yield));
+    }
+    {
+      ASSERT_EQ(0, dir->zrem(env->dpp, obj, "v2", yield));
+      std::vector<std::string> members;
+      ASSERT_EQ(0, dir->zrevrange(env->dpp, obj, "0", "0", members, yield));
+      ASSERT_EQ(1, members.size());
+      ASSERT_EQ("v1", members[0]);
+    }
+    {
+      boost::system::error_code ec;
+      request req;
+      req.push("FLUSHALL");
+      response<boost::redis::ignore_t> resp;
+      conn->async_exec(req, resp, yield[ec]);
+      ASSERT_EQ((bool)ec, false);
+    }
+    conn->cancel();
+  }, rethrow);
+
+  io.run();
+}
 
 TEST_F(BlockDirectoryFixture, SetYield)
 {