source(_source)
{
cb = std::make_unique<D4NFilterGetCB>(source->driver, source);
- }
+ }
virtual ~D4NFilterReadOp() = default;
virtual int prepare(optional_yield y, const DoutPrefixProvider* dpp) override;
config cfg;
cfg.addr.host = address.substr(0, address.find(":"));
cfg.addr.port = address.substr(address.find(":") + 1, address.length());
+ cfg.clientname = "RedisDriver";
if (!cfg.addr.host.length() || !cfg.addr.port.length()) {
ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): Endpoint was not configured correctly." << dendl;
Aio::OpFunc RedisDriver::redis_read_op(optional_yield y, std::shared_ptr<connection> conn,
off_t read_ofs, off_t read_len, const std::string& key)
{
- return [y, conn, key] (Aio* aio, AioResult& r) mutable {
+ return [y, conn, &key] (Aio* aio, AioResult& r) mutable {
using namespace boost::asio;
spawn::yield_context yield = y.get_yield_context();
async_completion<spawn::yield_context, void()> init(yield);
auto ex = get_associated_executor(init.completion_handler);
- boost::redis::request req;
+ // TODO: Make unique pointer once support is added
+ auto s = std::make_shared<RedisDriver::redis_response>();
+ auto& resp = s->resp;
+ auto& req = s->req;
req.push("HGET", key, "data");
+ conn->async_exec(req, resp, bind_executor(ex, RedisDriver::redis_aio_handler{aio, r, s}));
+ };
+}
+
+Aio::OpFunc RedisDriver::redis_write_op(optional_yield y, std::shared_ptr<connection> conn,
+ const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, const std::string& key)
+{
+ return [y, conn, &bl, &len, &attrs, &key] (Aio* aio, AioResult& r) mutable {
+ using namespace boost::asio;
+ spawn::yield_context yield = y.get_yield_context();
+ async_completion<spawn::yield_context, void()> init(yield);
+ auto ex = get_associated_executor(init.completion_handler);
+
+ auto redisAttrs = build_attrs(attrs);
+
+ if (bl.length()) {
+ redisAttrs.push_back("data");
+ redisAttrs.push_back(bl.to_str());
+ }
+
// TODO: Make unique pointer once support is added
auto s = std::make_shared<RedisDriver::redis_response>();
auto& resp = s->resp;
+ auto& req = s->req;
+ req.push_range("HMSET", key, redisAttrs);
conn->async_exec(req, resp, bind_executor(ex, RedisDriver::redis_aio_handler{aio, r, s}));
};
}
rgw::AioResultList RedisDriver::put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id) {
- // TODO: implement
- rgw::AioResultList aio_result_list;
- return aio_result_list;
+ std::string entry = partition_info.location + key;
+ rgw_raw_obj r_obj;
+ r_obj.oid = key;
+
+ return aio->get(r_obj, redis_write_op(y, conn, bl, len, attrs, entry), cost, id);
}
void RedisDriver::shutdown()
virtual int initialize(const DoutPrefixProvider* dpp) override;
virtual int put(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, optional_yield y) override;
- virtual rgw::AioResultList put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id) override;
+ virtual rgw::AioResultList put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len,
+ const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id) override;
virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs, optional_yield y) override;
virtual rgw::AioResultList get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override;
virtual int del(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
uint64_t outstanding_write_size;
struct redis_response {
+ boost::redis::request req;
boost::redis::response<std::string> resp;
};
std::shared_ptr<redis_response> s;
/* Read Callback */
- void operator()(boost::system::error_code ec, auto) const {
- r.result = -ec.value();
- r.data.append(std::get<0>(s->resp).value().c_str());
+ void operator()(auto ec, auto) const {
+ if (ec.failed()) {
+ r.result = -ec.value();
+ } else {
+ r.result = 0;
+ }
+
+ /* Only append data for GET call */
+ if (s->req.payload().find("HGET") != std::string::npos) {
+ r.data.append(std::get<0>(s->resp).value());
+ }
+
throttle->put(r);
}
};
- static Aio::OpFunc redis_read_op(optional_yield y, std::shared_ptr<connection> conn, off_t read_ofs, off_t read_len, const std::string& key);
+ Aio::OpFunc redis_read_op(optional_yield y, std::shared_ptr<connection> conn, off_t read_ofs, off_t read_len, const std::string& key);
+ Aio::OpFunc redis_write_op(optional_yield y, std::shared_ptr<connection> conn, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, const std::string& key);
};
} } // namespace rgw::cache
using boost::redis::response;
class Environment* env;
+rgw::AioResultList completed;
+uint64_t offset = 0;
+
+int flush(const DoutPrefixProvider* dpp, rgw::AioResultList&& results) {
+ int r = rgw::check_for_errors(results);
+
+ if (r < 0) {
+ return r;
+ }
+
+ auto cmp = [](const auto& lhs, const auto& rhs) { return lhs.id < rhs.id; };
+ results.sort(cmp); // merge() requires results to be sorted first
+ completed.merge(results, cmp); // merge results in sorted order
+
+ while (!completed.empty() && completed.front().id == offset) {
+ auto ret = std::move(completed.front().result);
+
+ EXPECT_EQ(0, ret);
+ completed.pop_front_and_dispose(std::default_delete<rgw::AioResultEntry>{});
+ }
+ return 0;
+}
+
+void cancel(rgw::Aio* aio) {
+ aio->drain();
+}
+
+int drain(const DoutPrefixProvider* dpp, rgw::Aio* aio) {
+ auto c = aio->wait();
+ while (!c.empty()) {
+ int r = flush(dpp, std::move(c));
+ if (r < 0) {
+ cancel(aio);
+ return r;
+ }
+ c = aio->wait();
+ }
+ return flush(dpp, std::move(c));
+}
class Environment : public ::testing::Environment {
public:
io.run();
}
+TEST_F(RedisDriverFixture, PutAsyncYield)
+{
+ spawn::spawn(io, [this] (spawn::yield_context yield) {
+ std::unique_ptr<rgw::Aio> aio = rgw::make_throttle(env->cct->_conf->rgw_get_obj_window_size, optional_yield{io, yield});
+ auto completed = cacheDriver->put_async(env->dpp, optional_yield{io, yield}, aio.get(), "testName", bl, bl.length(), attrs, 0, 0);
+ drain(env->dpp, aio.get());
+
+ cacheDriver->shutdown();
+
+ boost::system::error_code ec;
+ request req;
+ req.push("HMGET", "RedisCache/testName", "attr", "data");
+ req.push("FLUSHALL");
+ response<std::vector<std::string>, boost::redis::ignore_t> resp;
+
+ conn->async_exec(req, resp, yield[ec]);
+
+ ASSERT_EQ((bool)ec, false);
+ EXPECT_EQ(std::get<0>(resp).value()[0], "attrVal");
+ EXPECT_EQ(std::get<0>(resp).value()[1], "test data");
+ conn->cancel();
+ });
+
+ io.run();
+}
+
+TEST_F(RedisDriverFixture, GetAsyncYield)
+{
+ spawn::spawn(io, [this] (spawn::yield_context yield) {
+ ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, optional_yield{io, yield}));
+
+ std::unique_ptr<rgw::Aio> aio = rgw::make_throttle(env->cct->_conf->rgw_get_obj_window_size, optional_yield{io, yield});
+ auto completed = cacheDriver->get_async(env->dpp, optional_yield{io, yield}, aio.get(), "testName", 0, bl.length(), 0, 0);
+ drain(env->dpp, aio.get());
+
+ cacheDriver->shutdown();
+
+ boost::system::error_code ec;
+ request req;
+ req.push("HMGET", "RedisCache/testName", "attr", "data");
+ req.push("FLUSHALL");
+ response<std::vector<std::string>, boost::redis::ignore_t> resp;
+
+ conn->async_exec(req, resp, yield[ec]);
+
+ ASSERT_EQ((bool)ec, false);
+ EXPECT_EQ(std::get<0>(resp).value()[0], "attrVal");
+ EXPECT_EQ(std::get<0>(resp).value()[1], "test data");
+ conn->cancel();
+ });
+
+ io.run();
+}
+
TEST_F(RedisDriverFixture, DelYield)
{
spawn::spawn(io, [this] (spawn::yield_context yield) {