}
}
+int ObjectDirectory::zadd(const DoutPrefixProvider* dpp, CacheObj* object, double score, const std::string& member, optional_yield y)
+{
+ std::string key = build_index(object);
+ try {
+ boost::system::error_code ec;
+ request req;
+ req.push("ZADD", key, "CH", std::to_string(score), member);
+
+ response<std::string> resp;
+ redis_exec(conn, ec, req, resp, y);
+
+ if (ec) {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
+ }
+
+ if (std::get<0>(resp).value() != "1") {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl;
+ return -EINVAL;
+ }
+
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ return -EINVAL;
+ }
+
+ return 0;
+
+}
+
+int ObjectDirectory::zrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector<std::string>& members, optional_yield y)
+{
+ std::string key = build_index(object);
+ try {
+ boost::system::error_code ec;
+ request req;
+ req.push("ZRANGE", key, std::to_string(start), std::to_string(stop));
+
+ response<std::vector<std::string> > resp;
+ redis_exec(conn, ec, req, resp, y);
+
+ if (ec) {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
+ }
+
+ if (std::get<0>(resp).value().empty()) {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Empty response" << dendl;
+ return -EINVAL;
+ }
+
+ members = std::get<0>(resp).value();
+
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+int ObjectDirectory::zrevrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector<std::string>& members, optional_yield y)
+{
+ std::string key = build_index(object);
+ try {
+ boost::system::error_code ec;
+ request req;
+ req.push("ZREVRANGE", key, std::to_string(start), std::to_string(stop));
+
+ response<std::vector<std::string> > resp;
+ redis_exec(conn, ec, req, resp, y);
+
+ if (ec) {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
+ }
+
+ if (std::get<0>(resp).value().empty()) {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Empty response" << dendl;
+ return -EINVAL;
+ }
+
+ members = std::get<0>(resp).value();
+
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+int ObjectDirectory::zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y)
+{
+ std::string key = build_index(object);
+ try {
+ boost::system::error_code ec;
+ request req;
+ req.push("ZREM", key, member);
+ response<std::string> resp;
+
+ redis_exec(conn, ec, req, resp, y);
+
+ if (ec) {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
+ }
+
+ if (std::get<0>(resp).value() != "1") {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
+ return -EINVAL;
+ }
+
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+
std::string BlockDirectory::build_index(CacheBlock* block)
{
return block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + std::to_string(block->blockID) + "_" + std::to_string(block->size);
return 0;
}
+int BlockDirectory::zadd(const DoutPrefixProvider* dpp, CacheBlock* block, double score, const std::string& member, optional_yield y)
+{
+ std::string key = build_index(block);
+ try {
+ boost::system::error_code ec;
+ request req;
+ req.push("ZADD", key, "CH", std::to_string(score), member);
+
+ response<std::string> resp;
+ redis_exec(conn, ec, req, resp, y);
+
+ if (ec) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
+ }
+
+ if (std::get<0>(resp).value() != "1") {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl;
+ return -EINVAL;
+ }
+
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ return -EINVAL;
+ }
+
+ return 0;
+
+}
+
+int BlockDirectory::zrange(const DoutPrefixProvider* dpp, CacheBlock* block, int start, int stop, std::vector<std::string>& members, optional_yield y)
+{
+ std::string key = build_index(block);
+ try {
+ boost::system::error_code ec;
+ request req;
+ req.push("ZRANGE", key, std::to_string(start), std::to_string(stop));
+
+ response<std::vector<std::string> > resp;
+ redis_exec(conn, ec, req, resp, y);
+
+ if (ec) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
+ }
+
+ if (std::get<0>(resp).value().empty()) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Empty response" << dendl;
+ return -EINVAL;
+ }
+
+ members = std::get<0>(resp).value();
+
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+int BlockDirectory::zrevrange(const DoutPrefixProvider* dpp, CacheBlock* block, int start, int stop, std::vector<std::string>& members, optional_yield y)
+{
+ std::string key = build_index(block);
+ try {
+ boost::system::error_code ec;
+ request req;
+ req.push("ZREVRANGE", key, std::to_string(start), std::to_string(stop));
+
+ response<std::vector<std::string> > resp;
+ redis_exec(conn, ec, req, resp, y);
+
+ if (ec) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
+ }
+
+ if (std::get<0>(resp).value().empty()) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Empty response" << dendl;
+ return -EINVAL;
+ }
+
+ members = std::get<0>(resp).value();
+
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+int BlockDirectory::zrem(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& member, optional_yield y)
+{
+ std::string key = build_index(block);
+ try {
+ boost::system::error_code ec;
+ request req;
+ req.push("ZREM", key, member);
+ response<std::string> resp;
+
+ redis_exec(conn, ec, req, resp, y);
+
+ if (ec) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
+ }
+
+ if (std::get<0>(resp).value() != "1") {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
+ return -EINVAL;
+ }
+
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
} } // namespace rgw::d4n
int copy(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& copyName, const std::string& copyBucketName, optional_yield y);
int del(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y);
int update_field(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& field, std::string& value, optional_yield y);
+ int zadd(const DoutPrefixProvider* dpp, CacheObj* object, double score, const std::string& member, optional_yield y);
+ int zrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector<std::string>& members, optional_yield y);
+ int zrevrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector<std::string>& members, optional_yield y);
+ int zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y);
private:
std::shared_ptr<connection> conn;
int del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
int update_field(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& field, std::string& value, optional_yield y);
int remove_host(const DoutPrefixProvider* dpp, CacheBlock* block, std::string& value, optional_yield y);
+ int zadd(const DoutPrefixProvider* dpp, CacheBlock* block, double score, const std::string& member, optional_yield y);
+ int zrange(const DoutPrefixProvider* dpp, CacheBlock* block, int start, int stop, std::vector<std::string>& members, optional_yield y);
+ int zrevrange(const DoutPrefixProvider* dpp, CacheBlock* block, int start, int stop, std::vector<std::string>& members, optional_yield y);
+ int zrem(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& member, optional_yield y);
private:
std::shared_ptr<connection> conn;
io.run();
}
+TEST_F(ObjectDirectoryFixture, ZAddYield)
+{
+ boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+ auto m_time = real_clock::now();
+ auto score = ceph::real_clock::to_double(m_time);
+ std::string version = "v1";
+ ASSERT_EQ(0, dir->zadd(env->dpp, obj, score, version, yield));
+ {
+ boost::system::error_code ec;
+ request req;
+ req.push("FLUSHALL");
+ response<boost::redis::ignore_t> resp;
+ conn->async_exec(req, resp, yield[ec]);
+ ASSERT_EQ((bool)ec, false);
+ }
+ conn->cancel();
+ }, rethrow);
+
+ io.run();
+}
+
+TEST_F(ObjectDirectoryFixture, ZAddZRevRangeYield)
+{
+ boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+ {
+ auto m_time = real_clock::now();
+ auto score = ceph::real_clock::to_double(m_time);
+ std::string version = "v2";
+ ASSERT_EQ(0, dir->zadd(env->dpp, obj, score, version, yield));
+ }
+ {
+ auto m_time = real_clock::now();
+ auto score = ceph::real_clock::to_double(m_time);
+ std::string version = "v1";
+ ASSERT_EQ(0, dir->zadd(env->dpp, obj, score, version, yield));
+ }
+ {
+ std::vector<std::string> members;
+ ASSERT_EQ(0, dir->zrevrange(env->dpp, obj, "0", "0", members, yield));
+ ASSERT_EQ(1, members.size());
+ ASSERT_EQ("v1", members[0]);
+ }
+
+ {
+ boost::system::error_code ec;
+ request req;
+ req.push("FLUSHALL");
+ response<boost::redis::ignore_t> resp;
+ conn->async_exec(req, resp, yield[ec]);
+ ASSERT_EQ((bool)ec, false);
+ }
+ conn->cancel();
+ }, rethrow);
+
+ io.run();
+}
+
+TEST_F(ObjectDirectoryFixture, ZAddZRemYield)
+{
+ boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+ {
+ auto m_time = real_clock::now();
+ auto score = ceph::real_clock::to_double(m_time);
+ std::cout << "Score for v1: " << score << std::endl;
+ std::string version = "v1";
+ ASSERT_EQ(0, dir->zadd(env->dpp, obj, score, version, yield));
+ }
+ {
+ auto m_time = real_clock::now();
+ auto score = ceph::real_clock::to_double(m_time);
+ std::cout << "Score for v2: " << score << std::endl;
+ std::string version = "v2";
+ ASSERT_EQ(0, dir->zadd(env->dpp, obj, score, version, yield));
+ }
+ {
+ ASSERT_EQ(0, dir->zrem(env->dpp, obj, "v2", yield));
+ std::vector<std::string> members;
+ ASSERT_EQ(0, dir->zrevrange(env->dpp, obj, "0", "0", members, yield));
+ ASSERT_EQ(1, members.size());
+ ASSERT_EQ("v1", members[0]);
+ }
+ {
+ boost::system::error_code ec;
+ request req;
+ req.push("FLUSHALL");
+ response<boost::redis::ignore_t> resp;
+ conn->async_exec(req, resp, yield[ec]);
+ ASSERT_EQ((bool)ec, false);
+ }
+ conn->cancel();
+ }, rethrow);
+
+ io.run();
+}
TEST_F(BlockDirectoryFixture, SetYield)
{