WATCH, EXEC, INCR, MULTI and DISCARD redis commands.
1. d4n/directory: support for watch, exec and incr methods in Block
and Object Directory.
2. d4n/directory: support for redis MULTI and DISCARD command,
adding a test case to exercise usage of MULTI.
Co-authored-by: Samarah <samarah.uriarte@ibm.com>
Adjusted log levels for directory methods added in this commit.
Signed-off-by: Pritha Srivastava <prsrivas@redhat.com>
}
}
+void redis_exec(std::shared_ptr<connection> conn,
+ boost::system::error_code& ec,
+ const boost::redis::request& req,
+ boost::redis::generic_response& resp, optional_yield y)
+{
+ if (y) {
+ auto yield = y.get_yield_context();
+ async_exec(std::move(conn), req, resp, yield[ec]);
+ } else {
+ async_exec(std::move(conn), req, resp, ceph::async::use_blocked[ec]);
+ }
+}
+
int check_bool(std::string str) {
if (str == "true" || str == "1") {
return 1;
return std::get<0>(resp).value();
}
-int ObjectDirectory::set(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y)
+int ObjectDirectory::set(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y)
{
/* For existing keys, call get method beforehand.
Sets completely overwrite existing values. */
return 0;
}
+int ObjectDirectory::incr(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y)
+{
+ std::string key = build_index(object);
+ key = key + "_versioned_epoch";
+ uint64_t value;
+ try {
+ boost::system::error_code ec;
+ request req;
+ req.push("INCR", key);
+ response<std::string> resp;
+
+ redis_exec(conn, ec, req, resp, y);
+
+ if (ec) {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
+ }
+
+ value = std::stoull(std::get<0>(resp).value());
+
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ return -EINVAL;
+ }
+
+ return value;
+}
std::string BlockDirectory::build_index(CacheBlock* block)
{
return std::get<0>(resp).value();
}
-int BlockDirectory::set(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y)
+int BlockDirectory::set(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y)
{
/* For existing keys, call get method beforehand.
Sets completely overwrite existing values. */
req.push_range("HSET", key, redisValues);
redis_exec(conn, ec, req, resp, y);
-
if (ec) {
ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
return -ec.value();
}
if (std::get<0>(resp).value().value().empty()) {
- ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "(): No values returned for key=" << key << dendl;
+ ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): No values returned for key=" << key << dendl;
return -ENOENT;
}
if (std::get<0>(std::get<3>(resp).value()).value().value() == 1) {
return 0;
} else {
- ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "(): No values copied." << dendl;
+ ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): No values copied." << dendl;
return -ENOENT;
}
} catch (std::exception &e) {
}
}
-int BlockDirectory::del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y)
+int BlockDirectory::del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y, bool multi)
{
std::string key = build_index(block);
ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): index is: " << key << dendl;
try {
boost::system::error_code ec;
- response<int> resp;
request req;
req.push("DEL", key);
-
- redis_exec(conn, ec, req, resp, y);
-
- if (!std::get<0>(resp).value()) {
- ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "(): No values deleted for key=" << key << dendl;
- return -ENOENT;
- }
-
+ if (!multi) {
+ response<int> resp;
+ redis_exec(conn, ec, req, resp, y);
+ if (!std::get<0>(resp).value()) {
+ ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): No values deleted for key=" << key << dendl;
+ return -ENOENT;
+ }
+ } else { //if delete is called as part of a transaction, the command will be queued, hence the response will be a string
+ response<std::string> resp;
+ redis_exec(conn, ec, req, resp, y);
+ }
if (ec) {
ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ std::cout << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << std::endl;
return -ec.value();
}
} catch (std::exception &e) {
ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ std::cout << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << std::endl;
return -EINVAL;
}
if (it != std::string::npos) {
result.erase(result.begin() + it, result.begin() + it + value.size());
} else {
- ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "(): Host was not found." << dendl;
+ ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): Host was not found." << dendl;
return -ENOENT;
}
return 0;
}
+int BlockDirectory::watch(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y)
+{
+ std::string key = build_index(block);
+ try {
+ boost::system::error_code ec;
+ request req;
+ req.push("WATCH", key);
+ response<std::string> resp;
+
+ redis_exec(conn, ec, req, resp, y);
+
+ if (ec) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
+ }
+
+ if (std::get<0>(resp).value() != "OK") {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
+ return -EINVAL;
+ }
+
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+int BlockDirectory::exec(const DoutPrefixProvider* dpp, std::vector<std::string>& responses, optional_yield y)
+{
+ try {
+ boost::system::error_code ec;
+ request req;
+ req.push("EXEC");
+ boost::redis::generic_response resp;
+
+ redis_exec(conn, ec, req, resp, y);
+
+ if (ec) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ std::cout << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << std::endl;
+ return -ec.value();
+ }
+
+ for (uint64_t i = 0; i < resp.value().size(); i++) {
+ ldpp_dout(dpp, 20) << "BlockDirectory::" << __func__ << "() MULTI: " << resp.value().front().value << dendl;
+ responses.emplace_back(resp.value().front().value);
+ boost::redis::consume_one(resp);
+ }
+
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ std::cout << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << std::endl;
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+int BlockDirectory::multi(const DoutPrefixProvider* dpp, optional_yield y)
+{
+ try {
+ boost::system::error_code ec;
+ request req;
+ req.push("MULTI");
+ response<std::string> resp;
+
+ redis_exec(conn, ec, req, resp, y);
+
+ if (ec) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
+ }
+
+ if (std::get<0>(resp).value() != "OK") {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
+ return -EINVAL;
+ }
+
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+int BlockDirectory::discard(const DoutPrefixProvider* dpp, optional_yield y)
+{
+ try {
+ boost::system::error_code ec;
+ request req;
+ req.push("DISCARD");
+ response<std::string> resp;
+
+ redis_exec(conn, ec, req, resp, y);
+
+ if (ec) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
+ }
+
+ if (std::get<0>(resp).value() != "OK") {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
+ return -EINVAL;
+ }
+
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
} } // namespace rgw::d4n
int zrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector<std::string>& members, optional_yield y);
int zrevrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector<std::string>& members, optional_yield y);
int zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y);
+ //Return value is the incremented value, else return error
+ int incr(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y);
private:
std::shared_ptr<connection> conn;
int set(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
int get(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
int copy(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& copyName, const std::string& copyBucketName, optional_yield y);
- int del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
+ int del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y, bool multi=false);
int update_field(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& field, std::string& value, optional_yield y);
int remove_host(const DoutPrefixProvider* dpp, CacheBlock* block, std::string& value, optional_yield y);
int zadd(const DoutPrefixProvider* dpp, CacheBlock* block, double score, const std::string& member, optional_yield y);
int zrange(const DoutPrefixProvider* dpp, CacheBlock* block, int start, int stop, std::vector<std::string>& members, optional_yield y);
int zrevrange(const DoutPrefixProvider* dpp, CacheBlock* block, int start, int stop, std::vector<std::string>& members, optional_yield y);
int zrem(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& member, optional_yield y);
+ int watch(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
+ int exec(const DoutPrefixProvider* dpp, std::vector<std::string>& responses, optional_yield y);
+ int multi(const DoutPrefixProvider* dpp, optional_yield y);
+ int discard(const DoutPrefixProvider* dpp, optional_yield y);
private:
std::shared_ptr<connection> conn;
-
std::string build_index(CacheBlock* block);
};
io.run();
}
+TEST_F(BlockDirectoryFixture, WatchExecuteYield)
+{
+ boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+ {
+ boost::system::error_code ec;
+ request req;
+ req.push("WATCH", "testBucket");
+ response<std::string> resp;
+
+ conn->async_exec(req, resp, yield[ec]);
+ ASSERT_EQ((bool)ec, false);
+
+ // The number of members added
+ EXPECT_EQ(std::get<0>(resp).value(), "OK");
+ }
+
+ {
+ boost::system::error_code ec;
+ request req;
+ req.push("HSET", "testBucket", "objName", "newoid");
+ response<int> resp;
+
+ conn->async_exec(req, resp, yield[ec]);
+
+ ASSERT_EQ((bool)ec, false);
+ EXPECT_EQ(std::get<0>(resp).value(), 1);
+ }
+
+ {
+ boost::system::error_code ec;
+ request req;
+ req.push("EXEC");
+ response<std::vector<std::string> > resp;
+
+ conn->async_exec(req, resp, yield[ec]);
+
+ ASSERT_EQ((bool)ec, false);
+ }
+
+ {
+ boost::system::error_code ec;
+ request req;
+ req.push("FLUSHALL");
+ response<boost::redis::ignore_t> resp;
+
+ conn->async_exec(req, resp, yield[ec]);
+ }
+
+ conn->cancel();
+ }, rethrow);
+
+ io.run();
+}
+
+TEST_F(BlockDirectoryFixture, IncrYield)
+{
+ boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+ for (int i = 0; i < 10; i++) {
+ {
+ boost::system::error_code ec;
+ request req;
+ req.push("INCR", "testObject");
+ response<std::string> resp;
+
+ conn->async_exec(req, resp, yield[ec]);
+ ASSERT_EQ((bool)ec, false);
+ std::cout << "thread id: " << std::this_thread::get_id() << std::endl;
+ std::cout << "INCR value: " << std::get<0>(resp).value() << std::endl;
+ }
+ }
+ boost::asio::post(conn->get_executor(), [c = conn] { c->cancel(); });
+ }, rethrow);
+
+ std::vector<std::thread> threads;
+
+ for (int i = 0; i < 10; ++i) {
+ threads.emplace_back([&] { io.run(); });
+ }
+ for (auto& thread : threads) {
+ thread.join();
+ }
+}
+
+TEST_F(BlockDirectoryFixture, MultiExecuteYield)
+{
+ boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+ {
+ boost::system::error_code ec;
+ {
+ request req;
+ response<std::string> resp;
+ req.push("MULTI"); // Start transaction
+ conn->async_exec(req, resp, yield[ec]);
+ ASSERT_EQ((bool)ec, false);
+ std::cout << "MULTI value: " << std::get<0>(resp).value() << std::endl;
+ }
+ {
+ request req;
+ response<std::string> resp;
+ req.push("SET", "key1", "value1"); // Command 1
+ conn->async_exec(req, resp, yield[ec]);
+ ASSERT_EQ((bool)ec, false);
+ std::cout << "SET value: " << std::get<0>(resp).value() << std::endl;
+ }
+ {
+ request req;
+ response<std::string> resp;
+ req.push("SET", "key2", "value2"); // Command 2
+ conn->async_exec(req, resp, yield[ec]);
+ ASSERT_EQ((bool)ec, false);
+ std::cout << "SET value: " << std::get<0>(resp).value() << std::endl;
+ }
+ {
+ request req;
+ //string as response here as the command is only getting queued, not executed
+ //if response type is changed to int then the operation fails
+ response<std::string> resp;
+ req.push("DEL", "key3"); // Command 3
+ conn->async_exec(req, resp, yield[ec]);
+ ASSERT_EQ((bool)ec, false);
+ std::cout << "DEL value: " << std::get<0>(resp).value() << std::endl;
+ }
+ {
+ request req;
+ req.push("EXEC"); // Execute transaction
+
+ boost::redis::generic_response resp;
+ conn->async_exec(req, resp, yield[ec]);
+ ASSERT_EQ((bool)ec, false);
+ for (uint64_t i = 0; i < resp.value().size(); i++) {
+ std::cout << "EXEC: " << resp.value().front().value << std::endl;
+ boost::redis::consume_one(resp);
+ }
+ }
+ }
+ //test multi/exec using directory methods
+ {
+ ASSERT_EQ(0, dir->multi(env->dpp, optional_yield{yield}));
+ ASSERT_EQ(0, dir->set(env->dpp, block, yield));
+ block->cacheObj.objName = "testBlockNew";
+ ASSERT_EQ(0, dir->set(env->dpp, block, yield));
+ block->cacheObj.objName = "testBlockA";
+ ASSERT_EQ(0, dir->del(env->dpp, block, yield, true));
+ std::vector<std::string> responses;
+ ASSERT_EQ(0, dir->exec(env->dpp, responses, optional_yield{yield}));
+ for (auto r : responses) {
+ std::cout << "EXEC: " << r << std::endl;
+ }
+ }
+ {
+ boost::system::error_code ec;
+ request req;
+ req.push("FLUSHALL");
+ response<boost::redis::ignore_t> resp;
+
+ conn->async_exec(req, resp, yield[ec]);
+ }
+
+ conn->cancel();
+ }, rethrow);
+
+ io.run();
+}
+
int main(int argc, char *argv[]) {
::testing::InitGoogleTest(&argc, argv);