From: Samarah Date: Wed, 8 Nov 2023 15:16:07 +0000 (+0000) Subject: rgw/redis: Implement RedisDriver::get_async and RedisDriver::put_async X-Git-Tag: v20.0.0~2219^2~4 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=1e0e7138f0adaf1aa9be228e52de2b2511487958;p=ceph.git rgw/redis: Implement RedisDriver::get_async and RedisDriver::put_async Signed-off-by: Samarah --- diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.h b/src/rgw/driver/d4n/rgw_sal_d4n.h index 46fcfb5c1ad1b..5986cfaf20cfb 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.h +++ b/src/rgw/driver/d4n/rgw_sal_d4n.h @@ -140,7 +140,7 @@ class D4NFilterObject : public FilterObject { source(_source) { cb = std::make_unique(source->driver, source); - } + } virtual ~D4NFilterReadOp() = default; virtual int prepare(optional_yield y, const DoutPrefixProvider* dpp) override; diff --git a/src/rgw/rgw_redis_driver.cc b/src/rgw/rgw_redis_driver.cc index 27e8707aedd6e..a1aca90c1a287 100644 --- a/src/rgw/rgw_redis_driver.cc +++ b/src/rgw/rgw_redis_driver.cc @@ -73,6 +73,7 @@ int RedisDriver::initialize(const DoutPrefixProvider* dpp) config cfg; cfg.addr.host = address.substr(0, address.find(":")); cfg.addr.port = address.substr(address.find(":") + 1, address.length()); + cfg.clientname = "RedisDriver"; if (!cfg.addr.host.length() || !cfg.addr.port.length()) { ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): Endpoint was not configured correctly." << dendl; @@ -552,18 +553,43 @@ int RedisDriver::set_attr(const DoutPrefixProvider* dpp, const std::string& key, Aio::OpFunc RedisDriver::redis_read_op(optional_yield y, std::shared_ptr conn, off_t read_ofs, off_t read_len, const std::string& key) { - return [y, conn, key] (Aio* aio, AioResult& r) mutable { + return [y, conn, &key] (Aio* aio, AioResult& r) mutable { using namespace boost::asio; spawn::yield_context yield = y.get_yield_context(); async_completion init(yield); auto ex = get_associated_executor(init.completion_handler); - boost::redis::request req; + // TODO: Make unique pointer once support is added + auto s = std::make_shared(); + auto& resp = s->resp; + auto& req = s->req; req.push("HGET", key, "data"); + conn->async_exec(req, resp, bind_executor(ex, RedisDriver::redis_aio_handler{aio, r, s})); + }; +} + +Aio::OpFunc RedisDriver::redis_write_op(optional_yield y, std::shared_ptr conn, + const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, const std::string& key) +{ + return [y, conn, &bl, &len, &attrs, &key] (Aio* aio, AioResult& r) mutable { + using namespace boost::asio; + spawn::yield_context yield = y.get_yield_context(); + async_completion init(yield); + auto ex = get_associated_executor(init.completion_handler); + + auto redisAttrs = build_attrs(attrs); + + if (bl.length()) { + redisAttrs.push_back("data"); + redisAttrs.push_back(bl.to_str()); + } + // TODO: Make unique pointer once support is added auto s = std::make_shared(); auto& resp = s->resp; + auto& req = s->req; + req.push_range("HMSET", key, redisAttrs); conn->async_exec(req, resp, bind_executor(ex, RedisDriver::redis_aio_handler{aio, r, s})); }; @@ -579,9 +605,11 @@ rgw::AioResultList RedisDriver::get_async(const DoutPrefixProvider* dpp, optiona } rgw::AioResultList RedisDriver::put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id) { - // TODO: implement - rgw::AioResultList aio_result_list; - return aio_result_list; + std::string entry = partition_info.location + key; + rgw_raw_obj r_obj; + r_obj.oid = key; + + return aio->get(r_obj, redis_write_op(y, conn, bl, len, attrs, entry), cost, id); } void RedisDriver::shutdown() diff --git a/src/rgw/rgw_redis_driver.h b/src/rgw/rgw_redis_driver.h index 2447d846c4d2e..fe401c07e5a57 100644 --- a/src/rgw/rgw_redis_driver.h +++ b/src/rgw/rgw_redis_driver.h @@ -31,7 +31,8 @@ class RedisDriver : public CacheDriver { virtual int initialize(const DoutPrefixProvider* dpp) override; virtual int put(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, optional_yield y) override; - virtual rgw::AioResultList put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id) override; + virtual rgw::AioResultList put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len, + const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id) override; virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs, optional_yield y) override; virtual rgw::AioResultList get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override; virtual int del(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override; @@ -52,6 +53,7 @@ class RedisDriver : public CacheDriver { uint64_t outstanding_write_size; struct redis_response { + boost::redis::request req; boost::redis::response resp; }; @@ -61,14 +63,24 @@ class RedisDriver : public CacheDriver { std::shared_ptr s; /* Read Callback */ - void operator()(boost::system::error_code ec, auto) const { - r.result = -ec.value(); - r.data.append(std::get<0>(s->resp).value().c_str()); + void operator()(auto ec, auto) const { + if (ec.failed()) { + r.result = -ec.value(); + } else { + r.result = 0; + } + + /* Only append data for GET call */ + if (s->req.payload().find("HGET") != std::string::npos) { + r.data.append(std::get<0>(s->resp).value()); + } + throttle->put(r); } }; - static Aio::OpFunc redis_read_op(optional_yield y, std::shared_ptr conn, off_t read_ofs, off_t read_len, const std::string& key); + Aio::OpFunc redis_read_op(optional_yield y, std::shared_ptr conn, off_t read_ofs, off_t read_len, const std::string& key); + Aio::OpFunc redis_write_op(optional_yield y, std::shared_ptr conn, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, const std::string& key); }; } } // namespace rgw::cache diff --git a/src/test/rgw/test_redis_driver.cc b/src/test/rgw/test_redis_driver.cc index f031a939285fb..18d53ab1be44a 100644 --- a/src/test/rgw/test_redis_driver.cc +++ b/src/test/rgw/test_redis_driver.cc @@ -17,6 +17,45 @@ using boost::redis::request; using boost::redis::response; class Environment* env; +rgw::AioResultList completed; +uint64_t offset = 0; + +int flush(const DoutPrefixProvider* dpp, rgw::AioResultList&& results) { + int r = rgw::check_for_errors(results); + + if (r < 0) { + return r; + } + + auto cmp = [](const auto& lhs, const auto& rhs) { return lhs.id < rhs.id; }; + results.sort(cmp); // merge() requires results to be sorted first + completed.merge(results, cmp); // merge results in sorted order + + while (!completed.empty() && completed.front().id == offset) { + auto ret = std::move(completed.front().result); + + EXPECT_EQ(0, ret); + completed.pop_front_and_dispose(std::default_delete{}); + } + return 0; +} + +void cancel(rgw::Aio* aio) { + aio->drain(); +} + +int drain(const DoutPrefixProvider* dpp, rgw::Aio* aio) { + auto c = aio->wait(); + while (!c.empty()) { + int r = flush(dpp, std::move(c)); + if (r < 0) { + cancel(aio); + return r; + } + c = aio->wait(); + } + return flush(dpp, std::move(c)); +} class Environment : public ::testing::Environment { public: @@ -148,6 +187,60 @@ TEST_F(RedisDriverFixture, GetYield) io.run(); } +TEST_F(RedisDriverFixture, PutAsyncYield) +{ + spawn::spawn(io, [this] (spawn::yield_context yield) { + std::unique_ptr aio = rgw::make_throttle(env->cct->_conf->rgw_get_obj_window_size, optional_yield{io, yield}); + auto completed = cacheDriver->put_async(env->dpp, optional_yield{io, yield}, aio.get(), "testName", bl, bl.length(), attrs, 0, 0); + drain(env->dpp, aio.get()); + + cacheDriver->shutdown(); + + boost::system::error_code ec; + request req; + req.push("HMGET", "RedisCache/testName", "attr", "data"); + req.push("FLUSHALL"); + response, boost::redis::ignore_t> resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ(std::get<0>(resp).value()[0], "attrVal"); + EXPECT_EQ(std::get<0>(resp).value()[1], "test data"); + conn->cancel(); + }); + + io.run(); +} + +TEST_F(RedisDriverFixture, GetAsyncYield) +{ + spawn::spawn(io, [this] (spawn::yield_context yield) { + ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, optional_yield{io, yield})); + + std::unique_ptr aio = rgw::make_throttle(env->cct->_conf->rgw_get_obj_window_size, optional_yield{io, yield}); + auto completed = cacheDriver->get_async(env->dpp, optional_yield{io, yield}, aio.get(), "testName", 0, bl.length(), 0, 0); + drain(env->dpp, aio.get()); + + cacheDriver->shutdown(); + + boost::system::error_code ec; + request req; + req.push("HMGET", "RedisCache/testName", "attr", "data"); + req.push("FLUSHALL"); + response, boost::redis::ignore_t> resp; + + conn->async_exec(req, resp, yield[ec]); + + ASSERT_EQ((bool)ec, false); + EXPECT_EQ(std::get<0>(resp).value()[0], "attrVal"); + EXPECT_EQ(std::get<0>(resp).value()[1], "test data"); + conn->cancel(); + }); + + io.run(); +} + TEST_F(RedisDriverFixture, DelYield) { spawn::spawn(io, [this] (spawn::yield_context yield) {