]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/cache: implementation of `put` using yield context
authorPritha Srivastava <prsrivas@redhat.com>
Thu, 8 Feb 2024 08:49:35 +0000 (14:19 +0530)
committerPritha Srivastava <prsrivas@redhat.com>
Tue, 2 Apr 2024 15:54:52 +0000 (21:24 +0530)
as completion token and adding throttling to `put_async`
in the cache driver api. Also added a test case to the
ssd driver unit test for `put_async`.

Signed-off-by: Pritha Srivastava <prsrivas@redhat.com>
src/rgw/rgw_cache_driver.h
src/rgw/rgw_redis_driver.cc
src/rgw/rgw_redis_driver.h
src/rgw/rgw_ssd_driver.cc
src/rgw/rgw_ssd_driver.h
src/test/rgw/test_ssd_driver.cc

index a1995d110e660cd649a1ad8d1be886427e217773..9940d1a6d0e6a7d471250314c172ceba1d5b3c46 100644 (file)
@@ -22,7 +22,7 @@ class CacheDriver {
     virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs, optional_yield y) = 0;
     virtual int del(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0;
     virtual rgw::AioResultList get_async (const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) = 0;
-    virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs) = 0;
+    virtual rgw::AioResultList put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id) = 0;
     virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, const bufferlist& bl_data, optional_yield y) = 0;
     virtual int delete_data(const DoutPrefixProvider* dpp, const::std::string& key, optional_yield y) = 0;
     virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs, optional_yield y) = 0;
index 554e68ad17f7f226c06b2bcbf82ca1f1cf657cf1..27e8707aedd6e38644ebabcd6a79df0adcf41998 100644 (file)
@@ -578,9 +578,10 @@ rgw::AioResultList RedisDriver::get_async(const DoutPrefixProvider* dpp, optiona
   return aio->get(r_obj, redis_read_op(y, conn, ofs, len, entry), cost, id);
 }
 
-int RedisDriver::put_async(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs) {
+rgw::AioResultList RedisDriver::put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id) {
   // TODO: implement
-  return -1;
+  rgw::AioResultList aio_result_list;
+  return aio_result_list;
 } 
 
 void RedisDriver::shutdown()
index 3abfde71729ad6c24b6f33cfe1f1ba85ebda348d..2447d846c4d2e6ea336ef3886259463b909bd000 100644 (file)
@@ -31,7 +31,7 @@ class RedisDriver : public CacheDriver {
 
     virtual int initialize(const DoutPrefixProvider* dpp) override;
     virtual int put(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, optional_yield y) override;
-    virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs) override;
+    virtual rgw::AioResultList put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id) override;
     virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs, optional_yield y) override;
     virtual rgw::AioResultList get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override;
     virtual int del(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
index 6c68877ff6ab6e87f8fc3da67836492914dd39d6..4e703a5d4b93cfd2b489daa6b85b30f067022223 100644 (file)
@@ -1,5 +1,6 @@
 #include "common/async/completion.h"
 #include "common/errno.h"
+#include "common/async/blocked_completion.h"
 #include "rgw_ssd_driver.h"
 #if defined(__linux__)
 #include <features.h>
@@ -54,43 +55,18 @@ int SSDDriver::initialize(const DoutPrefixProvider* dpp)
 
 int SSDDriver::put(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, optional_yield y)
 {
-    bufferlist src = bl;
-    std::string location = partition_info.location + key;
-
-    ldpp_dout(dpp, 20) << __func__ << "(): location=" << location << dendl;
-    FILE *cache_file = nullptr;
-    int r = 0;
-    size_t nbytes = 0;
-
-    cache_file = fopen(location.c_str(), "w+");
-    if (cache_file == nullptr) {
-        ldpp_dout(dpp, 0) << "ERROR: put::fopen file has return error, errno=" << errno << dendl;
-        return -errno;
-    }
-
-    nbytes = fwrite(src.c_str(), 1, len, cache_file);
-    if (nbytes != len) {
-        ldpp_dout(dpp, 0) << "ERROR: put::io_write: fwrite has returned error: nbytes!=len, nbytes=" << nbytes << ", len=" << len << dendl;
-        return -EIO;
+    ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): key=" << key << dendl;
+    boost::system::error_code ec;
+    if (y) {
+        using namespace boost::asio;
+        spawn::yield_context yield = y.get_yield_context();
+        this->put_async(dpp, y.get_io_context(), key, bl, len, attrs, yield[ec]);
+    } else {
+        this->put_async(dpp, y.get_io_context(), key, bl, len, attrs, ceph::async::use_blocked[ec]);
     }
-
-    r = fclose(cache_file);
-    if (r != 0) {
-        ldpp_dout(dpp, 0) << "ERROR: put::fclose file has return error, errno=" << errno << dendl;
-        return -errno;
+    if (ec) {
+        return ec.value();
     }
-
-    if (attrs.size() > 0) {
-        r = set_attrs(dpp, key, attrs, y);
-        if (r < 0) {
-            ldpp_dout(dpp, 0) << "ERROR: put::set_attrs: failed to set attrs, r = " << r << dendl;
-            return r;
-        }
-    }
-
-    efs::space_info space = efs::space(partition_info.location);
-    this->free_space = space.available;
-
     return 0;
 }
 
@@ -177,6 +153,13 @@ auto SSDDriver::AsyncReadOp::create(const Executor1& ex1, CompletionHandler&& ha
     return p;
 }
 
+template <typename Executor1, typename CompletionHandler>
+auto SSDDriver::AsyncWriteRequest::create(const Executor1& ex1, CompletionHandler&& handler)
+{
+    auto p = Completion::create(ex1, std::move(handler));
+    return p;
+}
+
 template <typename ExecutionContext, typename CompletionToken>
 auto SSDDriver::get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key,
                 off_t read_ofs, off_t read_len, CompletionToken&& token)
@@ -190,7 +173,7 @@ auto SSDDriver::get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx,
     auto p = Op::create(ctx.get_executor(), init.completion_handler);
     auto& op = p->user_data;
 
-    int ret = op.init(dpp, location, read_ofs, read_len, p.get());
+    int ret = op.prepare_libaio_read_op(dpp, location, read_ofs, read_len, p.get());
     if(0 == ret) {
         ret = ::aio_read(op.aio_cb.get());
     }
@@ -204,6 +187,47 @@ auto SSDDriver::get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx,
     return init.result.get();
 }
 
+template <typename ExecutionContext, typename CompletionToken>
+void SSDDriver::put_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key,
+                const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, CompletionToken&& token)
+{
+    std::string location = partition_info.location + key;
+    ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): location=" << location << dendl;
+
+    using Op = AsyncWriteRequest;
+    using Signature = typename Op::Signature;
+    boost::asio::async_completion<CompletionToken, Signature> init(token);
+    auto p = Op::create(ctx.get_executor(), init.completion_handler);
+    auto& op = p->user_data;
+
+    int r = 0;
+    bufferlist src = bl;
+    r = op.prepare_libaio_write_op(dpp, src, len, key, partition_info.location);
+    op.cb->aio_sigevent.sigev_notify = SIGEV_THREAD;
+    op.cb->aio_sigevent.sigev_notify_function = SSDDriver::AsyncWriteRequest::libaio_write_cb;
+    op.cb->aio_sigevent.sigev_notify_attributes = nullptr;
+    op.cb->aio_sigevent.sigev_value.sival_ptr = (void*)p.get();
+    op.key = key;
+    op.dpp = dpp;
+    op.priv_data = this;
+    op.attrs = std::move(attrs);
+
+    if (r >= 0) {
+        r = ::aio_write(op.cb.get());
+    } else {
+        ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): ::prepare_libaio_write_op(), r=" << r << dendl;
+    }
+
+    ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): ::aio_write(), r=" << r << dendl;
+    if(r < 0) {
+        auto ec = boost::system::error_code{-r, boost::system::system_category()};
+        ceph::async::post(std::move(p), ec);
+    } else {
+        (void)p.release();
+    }
+    init.result.get();
+}
+
 rgw::Aio::OpFunc SSDDriver::ssd_cache_read_op(const DoutPrefixProvider *dpp, optional_yield y, rgw::cache::CacheDriver* cache_driver,
                                 off_t read_ofs, off_t read_len, const std::string& key) {
   return [this, dpp, y, read_ofs, read_len, key] (Aio* aio, AioResult& r) mutable {
@@ -216,7 +240,23 @@ rgw::Aio::OpFunc SSDDriver::ssd_cache_read_op(const DoutPrefixProvider *dpp, opt
     auto ex = get_associated_executor(init.completion_handler);
 
     ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): key=" << key << dendl;
-    this->get_async(dpp, y.get_io_context(), key, read_ofs, read_len, bind_executor(ex, SSDDriver::libaio_handler{aio, r}));
+    this->get_async(dpp, y.get_io_context(), key, read_ofs, read_len, bind_executor(ex, SSDDriver::libaio_read_handler{aio, r}));
+  };
+}
+
+rgw::Aio::OpFunc SSDDriver::ssd_cache_write_op(const DoutPrefixProvider *dpp, optional_yield y, rgw::cache::CacheDriver* cache_driver,
+                                const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, const std::string& key) {
+  return [this, dpp, y, bl, len, attrs, key] (Aio* aio, AioResult& r) mutable {
+    ceph_assert(y);
+    ldpp_dout(dpp, 20) << "SSDCache: cache_write_op(): Write to Cache, oid=" << r.obj.oid << dendl;
+
+    using namespace boost::asio;
+    spawn::yield_context yield = y.get_yield_context();
+    async_completion<spawn::yield_context, void()> init(yield);
+    auto ex = get_associated_executor(init.completion_handler);
+
+    ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): key=" << key << dendl;
+    this->put_async(dpp, y.get_io_context(), key, bl, len, attrs, bind_executor(ex, SSDDriver::libaio_write_handler{aio, r}));
   };
 }
 
@@ -227,35 +267,11 @@ rgw::AioResultList SSDDriver::get_async(const DoutPrefixProvider* dpp, optional_
     return aio->get(r_obj, ssd_cache_read_op(dpp, y, this, ofs, len, key), cost, id);
 }
 
-void SSDDriver::libaio_write_completion_cb(AsyncWriteRequest* c)
-{
-    efs::space_info space = efs::space(partition_info.location);
-    this->free_space = space.available;
-}
-
-int SSDDriver::put_async(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs)
+rgw::AioResultList SSDDriver::put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id)
 {
-    ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): Write To Cache, oid=" << key << ", len=" << len << dendl;
-    bufferlist src = bl;
-    struct AsyncWriteRequest* wr = new struct AsyncWriteRequest(dpp);
-    int r = 0;
-    if ((r = wr->prepare_libaio_write_op(dpp, src, len, key, partition_info.location)) < 0) {
-        ldpp_dout(dpp, 0) << "ERROR: SSDCache: " << __func__ << "() prepare libaio write op r=" << r << dendl;
-        return r;
-    }
-    wr->cb->aio_sigevent.sigev_notify = SIGEV_THREAD;
-    wr->cb->aio_sigevent.sigev_notify_function = SSDDriver::AsyncWriteRequest::libaio_write_cb;
-    wr->cb->aio_sigevent.sigev_notify_attributes = nullptr;
-    wr->cb->aio_sigevent.sigev_value.sival_ptr = (void*)wr;
-    wr->key = key;
-    wr->priv_data = this;
-
-    if ((r = ::aio_write(wr->cb)) != 0) {
-        ldpp_dout(dpp, 0) << "ERROR: SSDCache: " << __func__ << "() aio_write r=" << r << dendl;
-        delete wr;
-        return r;
-    }
-    return 0;
+    rgw_raw_obj r_obj;
+    r_obj.oid = key;
+    return aio->get(r_obj, ssd_cache_write_op(dpp, y, this, bl, len, attrs, key), cost, id);
 }
 
 int SSDDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string& key, optional_yield y)
@@ -277,12 +293,11 @@ int SSDDriver::AsyncWriteRequest::prepare_libaio_write_op(const DoutPrefixProvid
 {
     std::string location = cache_location + key;
     int r = 0;
-
     ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): Write To Cache, location=" << location << dendl;
-    cb = new struct aiocb;
+    cb.reset(new struct aiocb);
+    memset(cb.get(), 0, sizeof(struct aiocb));
     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
-    memset(cb, 0, sizeof(struct aiocb));
-    r = fd = ::open(location.c_str(), O_WRONLY | O_CREAT | O_TRUNC, mode);
+    r = fd = TEMP_FAILURE_RETRY(::open(location.c_str(), O_WRONLY | O_CREAT | O_TRUNC, mode));
     if (fd < 0) {
         ldpp_dout(dpp, 0) << "ERROR: AsyncWriteRequest::prepare_libaio_write_op: open file failed, errno=" << errno << ", location='" << location.c_str() << "'" << dendl;
         return r;
@@ -303,13 +318,35 @@ int SSDDriver::AsyncWriteRequest::prepare_libaio_write_op(const DoutPrefixProvid
     return r;
 }
 
-void SSDDriver::AsyncWriteRequest::libaio_write_cb(sigval sigval)
-{
-  SSDDriver::AsyncWriteRequest* c = static_cast<SSDDriver::AsyncWriteRequest*>(sigval.sival_ptr);
-  c->priv_data->libaio_write_completion_cb(c);
+void SSDDriver::AsyncWriteRequest::libaio_write_cb(sigval sigval) {
+    auto p = std::unique_ptr<Completion>{static_cast<Completion*>(sigval.sival_ptr)};
+    auto op = std::move(p->user_data);
+    ldpp_dout(op.dpp, 20) << "INFO: AsyncWriteRequest::libaio_write_cb: key: " << op.key << dendl;
+    int attr_ret = 0;
+    if (op.attrs.size() > 0) {
+        //TODO - fix yield_context
+        optional_yield y{null_yield};
+        attr_ret = op.priv_data->set_attrs(op.dpp, op.key, op.attrs, y);
+        if (attr_ret < 0) {
+            ldpp_dout(op.dpp, 0) << "ERROR: put::set_attrs: failed to set attrs, ret = " << attr_ret << dendl;
+        }
+    }
+
+    Partition partition_info = op.priv_data->get_current_partition_info(op.dpp);
+    efs::space_info space = efs::space(partition_info.location);
+    op.priv_data->set_free_space(op.dpp, space.available);
+
+    const int ret = -aio_error(op.cb.get());
+    boost::system::error_code ec;
+    if (ret < 0) {
+        ec.assign(-ret, boost::system::system_category());
+    } else if (attr_ret < 0) {
+        ec.assign(-attr_ret, boost::system::system_category());
+    }
+    ceph::async::dispatch(std::move(p), ec);
 }
 
-int SSDDriver::AsyncReadOp::init(const DoutPrefixProvider *dpp, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg)
+int SSDDriver::AsyncReadOp::prepare_libaio_read_op(const DoutPrefixProvider *dpp, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg)
 {
     ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): file_path=" << file_path << dendl;
     aio_cb.reset(new struct aiocb);
index 5ab82763e63c661b847ffdc924bb5b73dda9b581..de0110eb584d33cde9a2c156cf5ad36e77d08d47 100644 (file)
@@ -16,7 +16,7 @@ public:
   virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs, optional_yield y) override;
   virtual int del(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override { return -1; } // TODO: implement
   virtual rgw::AioResultList get_async (const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override;
-  virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs) override;
+  virtual rgw::AioResultList put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id) override;
   virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, const bufferlist& bl_data, optional_yield y) override;
   virtual int delete_data(const DoutPrefixProvider* dpp, const::std::string& key, optional_yield y) override;
   virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs, optional_yield y) override;
@@ -30,13 +30,14 @@ public:
   /* Partition */
   virtual Partition get_current_partition_info(const DoutPrefixProvider* dpp) override { return partition_info; }
   virtual uint64_t get_free_space(const DoutPrefixProvider* dpp) override { return free_space; }
+  void set_free_space(const DoutPrefixProvider* dpp, uint64_t free_space) { this->free_space = free_space; }
 
 private:
   Partition partition_info;
   uint64_t free_space;
   CephContext* cct;
 
-  struct libaio_handler {
+  struct libaio_read_handler {
     rgw::Aio* throttle = nullptr;
     rgw::AioResult& r;
     // read callback
@@ -47,13 +48,23 @@ private:
     }
   };
 
+  struct libaio_write_handler {
+    rgw::Aio* throttle = nullptr;
+    rgw::AioResult& r;
+    // write callback
+    void operator()(boost::system::error_code ec) const {
+      r.result = -ec.value();
+      throttle->put(r);
+    }
+  };
+
   // unique_ptr with custom deleter for struct aiocb
   struct libaio_aiocb_deleter {
     void operator()(struct aiocb* c) {
       if(c->aio_fildes > 0) {
-             if( ::close(c->aio_fildes) != 0) {
-             }
+             TEMP_FAILURE_RETRY(::close(c->aio_fildes));
       }
+      c->aio_buf = nullptr;
       delete c;
     }
   };
@@ -61,10 +72,17 @@ private:
   template <typename ExecutionContext, typename CompletionToken>
     auto get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key,
                    off_t read_ofs, off_t read_len, CompletionToken&& token);
-
+  
+  template <typename ExecutionContext, typename CompletionToken>
+  void put_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key,
+                  const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, CompletionToken&& token);
+  
   rgw::Aio::OpFunc ssd_cache_read_op(const DoutPrefixProvider *dpp, optional_yield y, rgw::cache::CacheDriver* cache_driver,
                                  off_t read_ofs, off_t read_len, const std::string& key);
 
+  rgw::Aio::OpFunc ssd_cache_write_op(const DoutPrefixProvider *dpp, optional_yield y, rgw::cache::CacheDriver* cache_driver,
+                                const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, const std::string& key);
+
   using unique_aio_cb_ptr = std::unique_ptr<struct aiocb, libaio_aiocb_deleter>;
 
   struct AsyncReadOp {
@@ -73,8 +91,8 @@ private:
     using Signature = void(boost::system::error_code, bufferlist);
     using Completion = ceph::async::Completion<Signature, AsyncReadOp>;
 
-  int init(const DoutPrefixProvider *dpp, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg);
-  static void libaio_cb_aio_dispatch(sigval sigval);
+    int prepare_libaio_read_op(const DoutPrefixProvider *dpp, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg);
+    static void libaio_cb_aio_dispatch(sigval sigval);
 
     template <typename Executor1, typename CompletionHandler>
     static auto create(const Executor1& ex1, CompletionHandler&& handler);
@@ -85,21 +103,19 @@ private:
          std::string key;
          void *data;
          int fd;
-         struct aiocb *cb;
+         unique_aio_cb_ptr cb;
     SSDDriver *priv_data;
+    rgw::sal::Attrs attrs;
+
+    using Signature = void(boost::system::error_code);
+    using Completion = ceph::async::Completion<Signature, AsyncWriteRequest>;
 
-         AsyncWriteRequest(const DoutPrefixProvider* dpp) : dpp(dpp) {}
          int prepare_libaio_write_op(const DoutPrefixProvider *dpp, bufferlist& bl, unsigned int len, std::string key, std::string cache_location);
     static void libaio_write_cb(sigval sigval);
 
-    ~AsyncWriteRequest() {
-      ::close(fd);
-                 cb->aio_buf = nullptr;
-                 delete(cb);
-    }
+    template <typename Executor1, typename CompletionHandler>
+    static auto create(const Executor1& ex1, CompletionHandler&& handler);
   };
-
-  void libaio_write_completion_cb(AsyncWriteRequest* c);
 };
 
 } } // namespace rgw::cache
index dc5354030d505a24a1ae81c0cbe90c43e3af72e7..682a12ae700e80e9e177a5df95370c3716a2ae1b 100644 (file)
 
 namespace net = boost::asio;
 
+rgw::AioResultList completed;
+uint64_t offset = 0;
+
+int flush(const DoutPrefixProvider* dpp, rgw::AioResultList&& results) {
+  int r = rgw::check_for_errors(results);
+
+  if (r < 0) {
+    return r;
+  }
+
+  auto cmp = [](const auto& lhs, const auto& rhs) { return lhs.id < rhs.id; };
+  results.sort(cmp); // merge() requires results to be sorted first
+  completed.merge(results, cmp); // merge results in sorted order
+
+  while (!completed.empty() && completed.front().id == offset) {
+    auto ret = std::move(completed.front().result);
+
+    EXPECT_EQ(0, ret);
+    completed.pop_front_and_dispose(std::default_delete<rgw::AioResultEntry>{});
+  }
+  return 0;
+}
+
+void cancel(rgw::Aio* aio) {
+  aio->drain();
+}
+
+int drain(const DoutPrefixProvider* dpp, rgw::Aio* aio) {
+  auto c = aio->wait();
+  while (!c.empty()) {
+    int r = flush(dpp, std::move(c));
+    if (r < 0) {
+      cancel(aio);
+      return r;
+    }
+    c = aio->wait();
+  }
+  return flush(dpp, std::move(c));
+}
+
 class Environment* env;
 
 class Environment : public ::testing::Environment {
@@ -216,6 +256,19 @@ TEST_F(SSDDriverFixture, DeleteData)
     io.run();
 }
 
+TEST_F(SSDDriverFixture, PutAsync)
+{
+    spawn::spawn(io, [this] (spawn::yield_context yield) {
+        rgw::sal::Attrs attrs = {};
+        const uint64_t window_size = env->cct->_conf->rgw_put_obj_min_window_size;
+        std::unique_ptr<rgw::Aio> aio = rgw::make_throttle(window_size, optional_yield{io, yield});
+        auto results = cacheDriver->put_async(env->dpp, optional_yield{io, yield}, aio.get(), "testPutAsync", bl, bl.length(), attrs, bl.length(), 0);
+        drain(env->dpp, aio.get());
+    });
+
+    io.run();
+}
+
 int main(int argc, char *argv[]) {
   ::testing::InitGoogleTest(&argc, argv);