]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/redis: Implement RedisDriver::get_async and RedisDriver::put_async
authorSamarah <samarah.uriarte@ibm.com>
Wed, 8 Nov 2023 15:16:07 +0000 (15:16 +0000)
committerPritha Srivastava <prsrivas@redhat.com>
Tue, 2 Apr 2024 15:54:52 +0000 (21:24 +0530)
Signed-off-by: Samarah <samarah.uriarte@ibm.com>
src/rgw/driver/d4n/rgw_sal_d4n.h
src/rgw/rgw_redis_driver.cc
src/rgw/rgw_redis_driver.h
src/test/rgw/test_redis_driver.cc

index 46fcfb5c1ad1bb3d69fa82391d1c543570698c01..5986cfaf20cfbd32a03d6bab06c2712e65a187d5 100644 (file)
@@ -140,7 +140,7 @@ class D4NFilterObject : public FilterObject {
                                                                                   source(_source) 
         {
           cb = std::make_unique<D4NFilterGetCB>(source->driver, source);
-             }
+       }
        virtual ~D4NFilterReadOp() = default;
 
        virtual int prepare(optional_yield y, const DoutPrefixProvider* dpp) override;
index 27e8707aedd6e38644ebabcd6a79df0adcf41998..a1aca90c1a287f3230892b0619488e7f79abbb35 100644 (file)
@@ -73,6 +73,7 @@ int RedisDriver::initialize(const DoutPrefixProvider* dpp)
   config cfg;
   cfg.addr.host = address.substr(0, address.find(":"));
   cfg.addr.port = address.substr(address.find(":") + 1, address.length());
+  cfg.clientname = "RedisDriver";
 
   if (!cfg.addr.host.length() || !cfg.addr.port.length()) {
     ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): Endpoint was not configured correctly." << dendl;
@@ -552,18 +553,43 @@ int RedisDriver::set_attr(const DoutPrefixProvider* dpp, const std::string& key,
 Aio::OpFunc RedisDriver::redis_read_op(optional_yield y, std::shared_ptr<connection> conn,
                                  off_t read_ofs, off_t read_len, const std::string& key)
 {
-  return [y, conn, key] (Aio* aio, AioResult& r) mutable {
+  return [y, conn, &key] (Aio* aio, AioResult& r) mutable {
     using namespace boost::asio;
     spawn::yield_context yield = y.get_yield_context();
     async_completion<spawn::yield_context, void()> init(yield);
     auto ex = get_associated_executor(init.completion_handler);
 
-    boost::redis::request req;
+    // TODO: Make unique pointer once support is added
+    auto s = std::make_shared<RedisDriver::redis_response>();
+    auto& resp = s->resp;
+    auto& req = s->req;
     req.push("HGET", key, "data");
 
+    conn->async_exec(req, resp, bind_executor(ex, RedisDriver::redis_aio_handler{aio, r, s}));
+  };
+}
+
+Aio::OpFunc RedisDriver::redis_write_op(optional_yield y, std::shared_ptr<connection> conn,
+                                 const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, const std::string& key)
+{
+  return [y, conn, &bl, &len, &attrs, &key] (Aio* aio, AioResult& r) mutable {
+    using namespace boost::asio;
+    spawn::yield_context yield = y.get_yield_context();
+    async_completion<spawn::yield_context, void()> init(yield);
+    auto ex = get_associated_executor(init.completion_handler);
+
+    auto redisAttrs = build_attrs(attrs);
+
+    if (bl.length()) {
+      redisAttrs.push_back("data");
+      redisAttrs.push_back(bl.to_str());
+    }
+
     // TODO: Make unique pointer once support is added
     auto s = std::make_shared<RedisDriver::redis_response>();
     auto& resp = s->resp;
+    auto& req = s->req;
+    req.push_range("HMSET", key, redisAttrs);
 
     conn->async_exec(req, resp, bind_executor(ex, RedisDriver::redis_aio_handler{aio, r, s}));
   };
@@ -579,9 +605,11 @@ rgw::AioResultList RedisDriver::get_async(const DoutPrefixProvider* dpp, optiona
 }
 
 rgw::AioResultList RedisDriver::put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id) {
-  // TODO: implement
-  rgw::AioResultList aio_result_list;
-  return aio_result_list;
+  std::string entry = partition_info.location + key;
+  rgw_raw_obj r_obj;
+  r_obj.oid = key;
+
+  return aio->get(r_obj, redis_write_op(y, conn, bl, len, attrs, entry), cost, id);
 } 
 
 void RedisDriver::shutdown()
index 2447d846c4d2e6ea336ef3886259463b909bd000..fe401c07e5a578b37664850aa1ff867db751878a 100644 (file)
@@ -31,7 +31,8 @@ class RedisDriver : public CacheDriver {
 
     virtual int initialize(const DoutPrefixProvider* dpp) override;
     virtual int put(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, optional_yield y) override;
-    virtual rgw::AioResultList put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id) override;
+    virtual rgw::AioResultList put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len, 
+                                          const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id) override;
     virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs, optional_yield y) override;
     virtual rgw::AioResultList get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override;
     virtual int del(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
@@ -52,6 +53,7 @@ class RedisDriver : public CacheDriver {
     uint64_t outstanding_write_size;
 
     struct redis_response {
+      boost::redis::request req;
       boost::redis::response<std::string> resp;
     };
 
@@ -61,14 +63,24 @@ class RedisDriver : public CacheDriver {
       std::shared_ptr<redis_response> s;
 
       /* Read Callback */
-      void operator()(boost::system::error_code ec, auto) const {
-       r.result = -ec.value();
-       r.data.append(std::get<0>(s->resp).value().c_str());
+      void operator()(auto ec, auto) const {
+        if (ec.failed()) {
+         r.result = -ec.value();
+        } else {
+         r.result = 0;
+        }
+
+        /* Only append data for GET call */
+        if (s->req.payload().find("HGET") != std::string::npos) {
+         r.data.append(std::get<0>(s->resp).value());
+        }
+
        throttle->put(r);
       }
     };
 
-    static Aio::OpFunc redis_read_op(optional_yield y, std::shared_ptr<connection> conn, off_t read_ofs, off_t read_len, const std::string& key);
+    Aio::OpFunc redis_read_op(optional_yield y, std::shared_ptr<connection> conn, off_t read_ofs, off_t read_len, const std::string& key);
+    Aio::OpFunc redis_write_op(optional_yield y, std::shared_ptr<connection> conn, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, const std::string& key);
 };
 
 } } // namespace rgw::cache
index f031a939285fbc5af2279492cd6d9c68696bd9d5..18d53ab1be44a20bdb83a374d826198e94711183 100644 (file)
@@ -17,6 +17,45 @@ using boost::redis::request;
 using boost::redis::response;
 
 class Environment* env;
+rgw::AioResultList completed;
+uint64_t offset = 0;
+
+int flush(const DoutPrefixProvider* dpp, rgw::AioResultList&& results) {
+  int r = rgw::check_for_errors(results);
+
+  if (r < 0) {
+    return r;
+  }
+
+  auto cmp = [](const auto& lhs, const auto& rhs) { return lhs.id < rhs.id; };
+  results.sort(cmp); // merge() requires results to be sorted first
+  completed.merge(results, cmp); // merge results in sorted order
+
+  while (!completed.empty() && completed.front().id == offset) {
+    auto ret = std::move(completed.front().result);
+
+    EXPECT_EQ(0, ret);
+    completed.pop_front_and_dispose(std::default_delete<rgw::AioResultEntry>{});
+  }
+  return 0;
+}
+
+void cancel(rgw::Aio* aio) {
+  aio->drain();
+}
+
+int drain(const DoutPrefixProvider* dpp, rgw::Aio* aio) {
+  auto c = aio->wait(); 
+  while (!c.empty()) {
+    int r = flush(dpp, std::move(c));
+    if (r < 0) {
+      cancel(aio);
+      return r;
+    }
+    c = aio->wait();
+  }
+  return flush(dpp, std::move(c));
+}
 
 class Environment : public ::testing::Environment {
   public:
@@ -148,6 +187,60 @@ TEST_F(RedisDriverFixture, GetYield)
   io.run();
 }
 
+TEST_F(RedisDriverFixture, PutAsyncYield)
+{
+  spawn::spawn(io, [this] (spawn::yield_context yield) {
+    std::unique_ptr<rgw::Aio> aio = rgw::make_throttle(env->cct->_conf->rgw_get_obj_window_size, optional_yield{io, yield});
+    auto completed = cacheDriver->put_async(env->dpp, optional_yield{io, yield}, aio.get(), "testName", bl, bl.length(), attrs, 0, 0);
+    drain(env->dpp, aio.get());
+
+    cacheDriver->shutdown();
+
+    boost::system::error_code ec;
+    request req;
+    req.push("HMGET", "RedisCache/testName", "attr", "data");
+    req.push("FLUSHALL");
+    response<std::vector<std::string>, boost::redis::ignore_t> resp;
+
+    conn->async_exec(req, resp, yield[ec]);
+
+    ASSERT_EQ((bool)ec, false);
+    EXPECT_EQ(std::get<0>(resp).value()[0], "attrVal");
+    EXPECT_EQ(std::get<0>(resp).value()[1], "test data");
+    conn->cancel();
+  });
+
+  io.run();
+}
+
+TEST_F(RedisDriverFixture, GetAsyncYield)
+{
+  spawn::spawn(io, [this] (spawn::yield_context yield) {
+    ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, optional_yield{io, yield}));
+
+    std::unique_ptr<rgw::Aio> aio = rgw::make_throttle(env->cct->_conf->rgw_get_obj_window_size, optional_yield{io, yield});
+    auto completed = cacheDriver->get_async(env->dpp, optional_yield{io, yield}, aio.get(), "testName", 0, bl.length(), 0, 0);
+    drain(env->dpp, aio.get());
+
+    cacheDriver->shutdown();
+
+    boost::system::error_code ec;
+    request req;
+    req.push("HMGET", "RedisCache/testName", "attr", "data");
+    req.push("FLUSHALL");
+    response<std::vector<std::string>, boost::redis::ignore_t> resp;
+
+    conn->async_exec(req, resp, yield[ec]);
+
+    ASSERT_EQ((bool)ec, false);
+    EXPECT_EQ(std::get<0>(resp).value()[0], "attrVal");
+    EXPECT_EQ(std::get<0>(resp).value()[1], "test data");
+    conn->cancel();
+  });
+
+  io.run();
+}
+
 TEST_F(RedisDriverFixture, DelYield)
 {
   spawn::spawn(io, [this] (spawn::yield_context yield) {