]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/d4n: squashing following commits that add support for
authorPritha Srivastava <prsrivas@redhat.com>
Thu, 26 Sep 2024 06:06:47 +0000 (11:36 +0530)
committerPritha Srivastava <prsrivas@redhat.com>
Mon, 21 Apr 2025 04:04:07 +0000 (09:34 +0530)
WATCH, EXEC, INCR, MULTI and DISCARD redis commands.

1. d4n/directory: support for watch, exec and incr methods in Block
and Object Directory.
2. d4n/directory: support for redis MULTI and DISCARD command,
adding a test case to exercise usage of MULTI.

Co-authored-by: Samarah <samarah.uriarte@ibm.com>
Adjusted log levels for directory methods added in this commit.

Signed-off-by: Pritha Srivastava <prsrivas@redhat.com>
src/rgw/driver/d4n/d4n_directory.cc
src/rgw/driver/d4n/d4n_directory.h
src/test/rgw/test_d4n_directory.cc

index a833565aaa43c48497a9def2edf0de1a1fdc77a8..feee1a844723db15495f32cae0417fb9716129da 100644 (file)
@@ -48,6 +48,19 @@ void redis_exec(std::shared_ptr<connection> conn,
   }
 }
 
+void redis_exec(std::shared_ptr<connection> conn,
+                boost::system::error_code& ec,
+                const boost::redis::request& req,
+                boost::redis::generic_response& resp, optional_yield y)
+{
+  if (y) {
+    auto yield = y.get_yield_context();
+    async_exec(std::move(conn), req, resp, yield[ec]);
+  } else {
+    async_exec(std::move(conn), req, resp, ceph::async::use_blocked[ec]);
+  }
+}
+
 int check_bool(std::string str) {
   if (str == "true" || str == "1") {
     return 1;
@@ -87,7 +100,7 @@ int ObjectDirectory::exist_key(const DoutPrefixProvider* dpp, CacheObj* object,
   return std::get<0>(resp).value();
 }
 
-int ObjectDirectory::set(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y) 
+int ObjectDirectory::set(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y)
 {
   /* For existing keys, call get method beforehand. 
      Sets completely overwrite existing values. */
@@ -440,6 +453,33 @@ int ObjectDirectory::zrem(const DoutPrefixProvider* dpp, CacheObj* object, const
   return 0;
 }
 
+int ObjectDirectory::incr(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y)
+{
+  std::string key = build_index(object);
+  key = key + "_versioned_epoch";
+  uint64_t value;
+  try {
+    boost::system::error_code ec;
+    request req;
+    req.push("INCR", key);
+    response<std::string> resp;
+
+    redis_exec(conn, ec, req, resp, y);
+
+    if (ec) {
+      ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+      return -ec.value();
+    }
+
+    value = std::stoull(std::get<0>(resp).value());
+
+  } catch (std::exception &e) {
+    ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+    return -EINVAL;
+  }
+
+  return value;
+}
 
 std::string BlockDirectory::build_index(CacheBlock* block) 
 {
@@ -470,7 +510,7 @@ int BlockDirectory::exist_key(const DoutPrefixProvider* dpp, CacheBlock* block,
   return std::get<0>(resp).value();
 }
 
-int BlockDirectory::set(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y) 
+int BlockDirectory::set(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y)
 {
   /* For existing keys, call get method beforehand. 
      Sets completely overwrite existing values. */
@@ -534,7 +574,6 @@ int BlockDirectory::set(const DoutPrefixProvider* dpp, CacheBlock* block, option
     req.push_range("HSET", key, redisValues);
 
     redis_exec(conn, ec, req, resp, y);
-
     if (ec) {
       ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
       return -ec.value();
@@ -579,7 +618,7 @@ int BlockDirectory::get(const DoutPrefixProvider* dpp, CacheBlock* block, option
     }
 
     if (std::get<0>(resp).value().value().empty()) {
-      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "(): No values returned for key=" << key << dendl;
+      ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): No values returned for key=" << key << dendl;
       return -ENOENT;
     } 
 
@@ -632,7 +671,7 @@ int BlockDirectory::copy(const DoutPrefixProvider* dpp, CacheBlock* block, const
     if (std::get<0>(std::get<3>(resp).value()).value().value() == 1) {
       return 0;
     } else {
-      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "(): No values copied." << dendl;
+      ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): No values copied." << dendl;
       return -ENOENT;
     }
   } catch (std::exception &e) {
@@ -641,30 +680,34 @@ int BlockDirectory::copy(const DoutPrefixProvider* dpp, CacheBlock* block, const
   }
 }
 
-int BlockDirectory::del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y) 
+int BlockDirectory::del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y, bool multi
 {
   std::string key = build_index(block);
   ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): index is: " << key << dendl;
 
   try {
     boost::system::error_code ec;
-    response<int> resp;
     request req;
     req.push("DEL", key);
-
-    redis_exec(conn, ec, req, resp, y);
-
-    if (!std::get<0>(resp).value()) {
-      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "(): No values deleted for key=" << key << dendl;
-      return -ENOENT;
-    }
-
+    if (!multi) {
+      response<int> resp;
+      redis_exec(conn, ec, req, resp, y);
+      if (!std::get<0>(resp).value()) {
+        ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): No values deleted for key=" << key << dendl;
+        return -ENOENT;
+      }
+    } else { //if delete is called as part of a transaction, the command will be queued, hence the response will be a string
+      response<std::string> resp;
+      redis_exec(conn, ec, req, resp, y);
+      }
     if (ec) {
       ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+      std::cout << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << std::endl;
       return -ec.value();
     }
   } catch (std::exception &e) {
     ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+    std::cout << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << std::endl;
     return -EINVAL;
   }
 
@@ -759,7 +802,7 @@ int BlockDirectory::remove_host(const DoutPrefixProvider* dpp, CacheBlock* block
       if (it != std::string::npos) { 
        result.erase(result.begin() + it, result.begin() + it + value.size());
       } else {
-       ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "(): Host was not found." << dendl;
+       ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): Host was not found." << dendl;
        return -ENOENT;
       }
 
@@ -917,4 +960,120 @@ int BlockDirectory::zrem(const DoutPrefixProvider* dpp, CacheBlock* block, const
   return 0;
 }
 
+int BlockDirectory::watch(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y)
+{
+  std::string key = build_index(block);
+  try {
+    boost::system::error_code ec;
+    request req;
+    req.push("WATCH", key);
+    response<std::string> resp;
+
+    redis_exec(conn, ec, req, resp, y);
+
+    if (ec) {
+      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+      return -ec.value();
+    }
+
+    if (std::get<0>(resp).value() != "OK") {
+      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
+      return -EINVAL;
+    }
+
+  } catch (std::exception &e) {
+    ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+    return -EINVAL;
+  }
+
+  return 0;
+}
+
+int BlockDirectory::exec(const DoutPrefixProvider* dpp, std::vector<std::string>& responses, optional_yield y)
+{
+  try {
+    boost::system::error_code ec;
+    request req;
+    req.push("EXEC");
+    boost::redis::generic_response resp;
+
+    redis_exec(conn, ec, req, resp, y);
+
+    if (ec) {
+      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+      std::cout << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << std::endl;
+      return -ec.value();
+    }
+
+    for (uint64_t i = 0; i < resp.value().size(); i++) {
+      ldpp_dout(dpp, 20) << "BlockDirectory::" << __func__ << "() MULTI: " << resp.value().front().value << dendl;
+      responses.emplace_back(resp.value().front().value);
+      boost::redis::consume_one(resp);
+    }
+
+  } catch (std::exception &e) {
+    ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+    std::cout << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << std::endl;
+    return -EINVAL;
+  }
+
+  return 0;
+}
+
+int BlockDirectory::multi(const DoutPrefixProvider* dpp, optional_yield y)
+{
+  try {
+    boost::system::error_code ec;
+    request req;
+    req.push("MULTI");
+    response<std::string> resp;
+
+    redis_exec(conn, ec, req, resp, y);
+
+    if (ec) {
+      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+      return -ec.value();
+    }
+
+    if (std::get<0>(resp).value() != "OK") {
+      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
+      return -EINVAL;
+    }
+
+  } catch (std::exception &e) {
+    ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+    return -EINVAL;
+  }
+
+  return 0;
+}
+
+int BlockDirectory::discard(const DoutPrefixProvider* dpp, optional_yield y)
+{
+  try {
+    boost::system::error_code ec;
+    request req;
+    req.push("DISCARD");
+    response<std::string> resp;
+
+    redis_exec(conn, ec, req, resp, y);
+
+    if (ec) {
+      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+      return -ec.value();
+    }
+
+    if (std::get<0>(resp).value() != "OK") {
+      ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
+      return -EINVAL;
+    }
+
+  } catch (std::exception &e) {
+    ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+    return -EINVAL;
+  }
+
+  return 0;
+}
+
 } } // namespace rgw::d4n
index 82419da530bef526fad5c000711ca79eb300fed8..1a262b38eb2d198f79aa5e5d2e0ff566f02f2820 100644 (file)
@@ -52,6 +52,8 @@ class ObjectDirectory: public Directory {
     int zrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector<std::string>& members, optional_yield y);
     int zrevrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector<std::string>& members, optional_yield y);
     int zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y);
+    //Return value is the incremented value, else return error
+    int incr(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y);
 
   private:
     std::shared_ptr<connection> conn;
@@ -68,17 +70,20 @@ class BlockDirectory: public Directory {
     int set(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
     int get(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
     int copy(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& copyName, const std::string& copyBucketName, optional_yield y);
-    int del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
+    int del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y, bool multi=false);
     int update_field(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& field, std::string& value, optional_yield y);
     int remove_host(const DoutPrefixProvider* dpp, CacheBlock* block, std::string& value, optional_yield y);
     int zadd(const DoutPrefixProvider* dpp, CacheBlock* block, double score, const std::string& member, optional_yield y);
     int zrange(const DoutPrefixProvider* dpp, CacheBlock* block, int start, int stop, std::vector<std::string>& members, optional_yield y);
     int zrevrange(const DoutPrefixProvider* dpp, CacheBlock* block, int start, int stop, std::vector<std::string>& members, optional_yield y);
     int zrem(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& member, optional_yield y);
+    int watch(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
+    int exec(const DoutPrefixProvider* dpp, std::vector<std::string>& responses, optional_yield y);
+    int multi(const DoutPrefixProvider* dpp, optional_yield y);
+    int discard(const DoutPrefixProvider* dpp, optional_yield y);
 
   private:
     std::shared_ptr<connection> conn;
-
     std::string build_index(CacheBlock* block);
 };
 
index 6fe56eeb9d8cef2564f23fe2259dd8df4864fee8..310071db5bf0beb7cdf1bb9a6fdb9a614ff606b2 100644 (file)
@@ -595,6 +595,170 @@ TEST_F(BlockDirectoryFixture, RemoveHostYield)
   io.run();
 }
 
+TEST_F(BlockDirectoryFixture, WatchExecuteYield)
+{
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+  {
+    boost::system::error_code ec;
+    request req;
+    req.push("WATCH", "testBucket");
+    response<std::string> resp;
+
+    conn->async_exec(req, resp, yield[ec]);
+    ASSERT_EQ((bool)ec, false);
+
+    // The number of members added
+    EXPECT_EQ(std::get<0>(resp).value(), "OK");
+  }
+
+  {
+      boost::system::error_code ec;
+      request req;
+      req.push("HSET", "testBucket", "objName", "newoid");
+      response<int> resp;
+
+      conn->async_exec(req, resp, yield[ec]);
+
+      ASSERT_EQ((bool)ec, false);
+      EXPECT_EQ(std::get<0>(resp).value(), 1);
+  }
+
+  {
+      boost::system::error_code ec;
+      request req;
+      req.push("EXEC");
+      response<std::vector<std::string> > resp;
+
+      conn->async_exec(req, resp, yield[ec]);
+
+      ASSERT_EQ((bool)ec, false);
+  }
+
+  {
+      boost::system::error_code ec;
+      request req;
+      req.push("FLUSHALL");
+      response<boost::redis::ignore_t> resp;
+
+      conn->async_exec(req, resp, yield[ec]);
+  }
+
+  conn->cancel();
+  }, rethrow);
+
+  io.run();
+}
+
+TEST_F(BlockDirectoryFixture, IncrYield)
+{
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+    for (int i = 0; i < 10; i++) {
+      {
+        boost::system::error_code ec;
+        request req;
+        req.push("INCR", "testObject");
+        response<std::string> resp;
+
+        conn->async_exec(req, resp, yield[ec]);
+        ASSERT_EQ((bool)ec, false);
+        std::cout << "thread id: " << std::this_thread::get_id() << std::endl;
+        std::cout << "INCR value: " << std::get<0>(resp).value() << std::endl;
+      }
+    }
+    boost::asio::post(conn->get_executor(), [c = conn] { c->cancel(); });
+  }, rethrow);
+
+  std::vector<std::thread> threads;
+
+  for (int i = 0; i < 10; ++i) {
+    threads.emplace_back([&] { io.run(); });
+  }
+  for (auto& thread : threads) {
+    thread.join();
+  }
+}
+
+TEST_F(BlockDirectoryFixture, MultiExecuteYield)
+{
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+    {
+      boost::system::error_code ec;
+      {
+        request req;
+        response<std::string> resp;
+        req.push("MULTI");                       // Start transaction
+        conn->async_exec(req, resp, yield[ec]);
+        ASSERT_EQ((bool)ec, false);
+        std::cout << "MULTI value: " << std::get<0>(resp).value() << std::endl;
+      }
+      {
+        request req;
+        response<std::string> resp;
+        req.push("SET", "key1", "value1");       // Command 1
+        conn->async_exec(req, resp, yield[ec]);
+        ASSERT_EQ((bool)ec, false);
+        std::cout << "SET value: " << std::get<0>(resp).value() << std::endl;
+      }
+      {
+        request req;
+        response<std::string> resp;
+        req.push("SET", "key2", "value2");       // Command 2
+        conn->async_exec(req, resp, yield[ec]);
+        ASSERT_EQ((bool)ec, false);
+        std::cout << "SET value: " << std::get<0>(resp).value() << std::endl;
+      }
+      {
+        request req;
+        //string as response here as the command is only getting queued, not executed
+        //if response type is changed to int then the operation fails
+        response<std::string> resp;
+        req.push("DEL", "key3");                  // Command 3
+        conn->async_exec(req, resp, yield[ec]);
+        ASSERT_EQ((bool)ec, false);
+        std::cout << "DEL value: " << std::get<0>(resp).value() << std::endl;
+      }
+      {
+        request req;
+        req.push("EXEC");                        // Execute transaction
+
+        boost::redis::generic_response resp;
+        conn->async_exec(req, resp, yield[ec]);
+        ASSERT_EQ((bool)ec, false);
+        for (uint64_t i = 0; i < resp.value().size(); i++) {
+          std::cout << "EXEC: " << resp.value().front().value << std::endl;
+          boost::redis::consume_one(resp);
+        }
+      }
+    }
+    //test multi/exec using directory methods
+    {
+      ASSERT_EQ(0, dir->multi(env->dpp, optional_yield{yield}));
+      ASSERT_EQ(0, dir->set(env->dpp, block, yield));
+      block->cacheObj.objName = "testBlockNew";
+      ASSERT_EQ(0, dir->set(env->dpp, block, yield));
+      block->cacheObj.objName = "testBlockA";
+      ASSERT_EQ(0, dir->del(env->dpp, block, yield, true));
+      std::vector<std::string> responses;
+      ASSERT_EQ(0, dir->exec(env->dpp, responses, optional_yield{yield}));
+      for (auto r : responses) {
+        std::cout << "EXEC: " << r << std::endl;
+      }
+    }
+    {
+      boost::system::error_code ec;
+      request req;
+      req.push("FLUSHALL");
+      response<boost::redis::ignore_t> resp;
+
+      conn->async_exec(req, resp, yield[ec]);
+    }
+
+    conn->cancel();
+  }, rethrow);
+
+  io.run();
+}
+
 int main(int argc, char *argv[]) {
   ::testing::InitGoogleTest(&argc, argv);