From: Pritha Srivastava Date: Thu, 8 Feb 2024 08:49:35 +0000 (+0530) Subject: rgw/cache: implementation of `put` using yield context X-Git-Tag: v20.0.0~2219^2~11 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=1a44398cb5f698dd29316ab88e5c0b4f31799769;p=ceph.git rgw/cache: implementation of `put` using yield context as completion token and adding throttling to `put_async` in the cache driver api. Also added a test case to the ssd driver unit test for `put_async`. Signed-off-by: Pritha Srivastava --- diff --git a/src/rgw/rgw_cache_driver.h b/src/rgw/rgw_cache_driver.h index a1995d110e660..9940d1a6d0e6a 100644 --- a/src/rgw/rgw_cache_driver.h +++ b/src/rgw/rgw_cache_driver.h @@ -22,7 +22,7 @@ class CacheDriver { virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs, optional_yield y) = 0; virtual int del(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0; virtual rgw::AioResultList get_async (const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) = 0; - virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs) = 0; + virtual rgw::AioResultList put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id) = 0; virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, const bufferlist& bl_data, optional_yield y) = 0; virtual int delete_data(const DoutPrefixProvider* dpp, const::std::string& key, optional_yield y) = 0; virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs, optional_yield y) = 0; diff --git a/src/rgw/rgw_redis_driver.cc b/src/rgw/rgw_redis_driver.cc index 554e68ad17f7f..27e8707aedd6e 100644 --- a/src/rgw/rgw_redis_driver.cc +++ b/src/rgw/rgw_redis_driver.cc @@ -578,9 +578,10 @@ rgw::AioResultList RedisDriver::get_async(const DoutPrefixProvider* dpp, optiona return aio->get(r_obj, redis_read_op(y, conn, ofs, len, entry), cost, id); } -int RedisDriver::put_async(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs) { +rgw::AioResultList RedisDriver::put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id) { // TODO: implement - return -1; + rgw::AioResultList aio_result_list; + return aio_result_list; } void RedisDriver::shutdown() diff --git a/src/rgw/rgw_redis_driver.h b/src/rgw/rgw_redis_driver.h index 3abfde71729ad..2447d846c4d2e 100644 --- a/src/rgw/rgw_redis_driver.h +++ b/src/rgw/rgw_redis_driver.h @@ -31,7 +31,7 @@ class RedisDriver : public CacheDriver { virtual int initialize(const DoutPrefixProvider* dpp) override; virtual int put(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, optional_yield y) override; - virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs) override; + virtual rgw::AioResultList put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id) override; virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs, optional_yield y) override; virtual rgw::AioResultList get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override; virtual int del(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override; diff --git a/src/rgw/rgw_ssd_driver.cc b/src/rgw/rgw_ssd_driver.cc index 6c68877ff6ab6..4e703a5d4b93c 100644 --- a/src/rgw/rgw_ssd_driver.cc +++ b/src/rgw/rgw_ssd_driver.cc @@ -1,5 +1,6 @@ #include "common/async/completion.h" #include "common/errno.h" +#include "common/async/blocked_completion.h" #include "rgw_ssd_driver.h" #if defined(__linux__) #include @@ -54,43 +55,18 @@ int SSDDriver::initialize(const DoutPrefixProvider* dpp) int SSDDriver::put(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, optional_yield y) { - bufferlist src = bl; - std::string location = partition_info.location + key; - - ldpp_dout(dpp, 20) << __func__ << "(): location=" << location << dendl; - FILE *cache_file = nullptr; - int r = 0; - size_t nbytes = 0; - - cache_file = fopen(location.c_str(), "w+"); - if (cache_file == nullptr) { - ldpp_dout(dpp, 0) << "ERROR: put::fopen file has return error, errno=" << errno << dendl; - return -errno; - } - - nbytes = fwrite(src.c_str(), 1, len, cache_file); - if (nbytes != len) { - ldpp_dout(dpp, 0) << "ERROR: put::io_write: fwrite has returned error: nbytes!=len, nbytes=" << nbytes << ", len=" << len << dendl; - return -EIO; + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): key=" << key << dendl; + boost::system::error_code ec; + if (y) { + using namespace boost::asio; + spawn::yield_context yield = y.get_yield_context(); + this->put_async(dpp, y.get_io_context(), key, bl, len, attrs, yield[ec]); + } else { + this->put_async(dpp, y.get_io_context(), key, bl, len, attrs, ceph::async::use_blocked[ec]); } - - r = fclose(cache_file); - if (r != 0) { - ldpp_dout(dpp, 0) << "ERROR: put::fclose file has return error, errno=" << errno << dendl; - return -errno; + if (ec) { + return ec.value(); } - - if (attrs.size() > 0) { - r = set_attrs(dpp, key, attrs, y); - if (r < 0) { - ldpp_dout(dpp, 0) << "ERROR: put::set_attrs: failed to set attrs, r = " << r << dendl; - return r; - } - } - - efs::space_info space = efs::space(partition_info.location); - this->free_space = space.available; - return 0; } @@ -177,6 +153,13 @@ auto SSDDriver::AsyncReadOp::create(const Executor1& ex1, CompletionHandler&& ha return p; } +template +auto SSDDriver::AsyncWriteRequest::create(const Executor1& ex1, CompletionHandler&& handler) +{ + auto p = Completion::create(ex1, std::move(handler)); + return p; +} + template auto SSDDriver::get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key, off_t read_ofs, off_t read_len, CompletionToken&& token) @@ -190,7 +173,7 @@ auto SSDDriver::get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, auto p = Op::create(ctx.get_executor(), init.completion_handler); auto& op = p->user_data; - int ret = op.init(dpp, location, read_ofs, read_len, p.get()); + int ret = op.prepare_libaio_read_op(dpp, location, read_ofs, read_len, p.get()); if(0 == ret) { ret = ::aio_read(op.aio_cb.get()); } @@ -204,6 +187,47 @@ auto SSDDriver::get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, return init.result.get(); } +template +void SSDDriver::put_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key, + const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, CompletionToken&& token) +{ + std::string location = partition_info.location + key; + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): location=" << location << dendl; + + using Op = AsyncWriteRequest; + using Signature = typename Op::Signature; + boost::asio::async_completion init(token); + auto p = Op::create(ctx.get_executor(), init.completion_handler); + auto& op = p->user_data; + + int r = 0; + bufferlist src = bl; + r = op.prepare_libaio_write_op(dpp, src, len, key, partition_info.location); + op.cb->aio_sigevent.sigev_notify = SIGEV_THREAD; + op.cb->aio_sigevent.sigev_notify_function = SSDDriver::AsyncWriteRequest::libaio_write_cb; + op.cb->aio_sigevent.sigev_notify_attributes = nullptr; + op.cb->aio_sigevent.sigev_value.sival_ptr = (void*)p.get(); + op.key = key; + op.dpp = dpp; + op.priv_data = this; + op.attrs = std::move(attrs); + + if (r >= 0) { + r = ::aio_write(op.cb.get()); + } else { + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): ::prepare_libaio_write_op(), r=" << r << dendl; + } + + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): ::aio_write(), r=" << r << dendl; + if(r < 0) { + auto ec = boost::system::error_code{-r, boost::system::system_category()}; + ceph::async::post(std::move(p), ec); + } else { + (void)p.release(); + } + init.result.get(); +} + rgw::Aio::OpFunc SSDDriver::ssd_cache_read_op(const DoutPrefixProvider *dpp, optional_yield y, rgw::cache::CacheDriver* cache_driver, off_t read_ofs, off_t read_len, const std::string& key) { return [this, dpp, y, read_ofs, read_len, key] (Aio* aio, AioResult& r) mutable { @@ -216,7 +240,23 @@ rgw::Aio::OpFunc SSDDriver::ssd_cache_read_op(const DoutPrefixProvider *dpp, opt auto ex = get_associated_executor(init.completion_handler); ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): key=" << key << dendl; - this->get_async(dpp, y.get_io_context(), key, read_ofs, read_len, bind_executor(ex, SSDDriver::libaio_handler{aio, r})); + this->get_async(dpp, y.get_io_context(), key, read_ofs, read_len, bind_executor(ex, SSDDriver::libaio_read_handler{aio, r})); + }; +} + +rgw::Aio::OpFunc SSDDriver::ssd_cache_write_op(const DoutPrefixProvider *dpp, optional_yield y, rgw::cache::CacheDriver* cache_driver, + const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, const std::string& key) { + return [this, dpp, y, bl, len, attrs, key] (Aio* aio, AioResult& r) mutable { + ceph_assert(y); + ldpp_dout(dpp, 20) << "SSDCache: cache_write_op(): Write to Cache, oid=" << r.obj.oid << dendl; + + using namespace boost::asio; + spawn::yield_context yield = y.get_yield_context(); + async_completion init(yield); + auto ex = get_associated_executor(init.completion_handler); + + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): key=" << key << dendl; + this->put_async(dpp, y.get_io_context(), key, bl, len, attrs, bind_executor(ex, SSDDriver::libaio_write_handler{aio, r})); }; } @@ -227,35 +267,11 @@ rgw::AioResultList SSDDriver::get_async(const DoutPrefixProvider* dpp, optional_ return aio->get(r_obj, ssd_cache_read_op(dpp, y, this, ofs, len, key), cost, id); } -void SSDDriver::libaio_write_completion_cb(AsyncWriteRequest* c) -{ - efs::space_info space = efs::space(partition_info.location); - this->free_space = space.available; -} - -int SSDDriver::put_async(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs) +rgw::AioResultList SSDDriver::put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id) { - ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): Write To Cache, oid=" << key << ", len=" << len << dendl; - bufferlist src = bl; - struct AsyncWriteRequest* wr = new struct AsyncWriteRequest(dpp); - int r = 0; - if ((r = wr->prepare_libaio_write_op(dpp, src, len, key, partition_info.location)) < 0) { - ldpp_dout(dpp, 0) << "ERROR: SSDCache: " << __func__ << "() prepare libaio write op r=" << r << dendl; - return r; - } - wr->cb->aio_sigevent.sigev_notify = SIGEV_THREAD; - wr->cb->aio_sigevent.sigev_notify_function = SSDDriver::AsyncWriteRequest::libaio_write_cb; - wr->cb->aio_sigevent.sigev_notify_attributes = nullptr; - wr->cb->aio_sigevent.sigev_value.sival_ptr = (void*)wr; - wr->key = key; - wr->priv_data = this; - - if ((r = ::aio_write(wr->cb)) != 0) { - ldpp_dout(dpp, 0) << "ERROR: SSDCache: " << __func__ << "() aio_write r=" << r << dendl; - delete wr; - return r; - } - return 0; + rgw_raw_obj r_obj; + r_obj.oid = key; + return aio->get(r_obj, ssd_cache_write_op(dpp, y, this, bl, len, attrs, key), cost, id); } int SSDDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string& key, optional_yield y) @@ -277,12 +293,11 @@ int SSDDriver::AsyncWriteRequest::prepare_libaio_write_op(const DoutPrefixProvid { std::string location = cache_location + key; int r = 0; - ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): Write To Cache, location=" << location << dendl; - cb = new struct aiocb; + cb.reset(new struct aiocb); + memset(cb.get(), 0, sizeof(struct aiocb)); mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; - memset(cb, 0, sizeof(struct aiocb)); - r = fd = ::open(location.c_str(), O_WRONLY | O_CREAT | O_TRUNC, mode); + r = fd = TEMP_FAILURE_RETRY(::open(location.c_str(), O_WRONLY | O_CREAT | O_TRUNC, mode)); if (fd < 0) { ldpp_dout(dpp, 0) << "ERROR: AsyncWriteRequest::prepare_libaio_write_op: open file failed, errno=" << errno << ", location='" << location.c_str() << "'" << dendl; return r; @@ -303,13 +318,35 @@ int SSDDriver::AsyncWriteRequest::prepare_libaio_write_op(const DoutPrefixProvid return r; } -void SSDDriver::AsyncWriteRequest::libaio_write_cb(sigval sigval) -{ - SSDDriver::AsyncWriteRequest* c = static_cast(sigval.sival_ptr); - c->priv_data->libaio_write_completion_cb(c); +void SSDDriver::AsyncWriteRequest::libaio_write_cb(sigval sigval) { + auto p = std::unique_ptr{static_cast(sigval.sival_ptr)}; + auto op = std::move(p->user_data); + ldpp_dout(op.dpp, 20) << "INFO: AsyncWriteRequest::libaio_write_cb: key: " << op.key << dendl; + int attr_ret = 0; + if (op.attrs.size() > 0) { + //TODO - fix yield_context + optional_yield y{null_yield}; + attr_ret = op.priv_data->set_attrs(op.dpp, op.key, op.attrs, y); + if (attr_ret < 0) { + ldpp_dout(op.dpp, 0) << "ERROR: put::set_attrs: failed to set attrs, ret = " << attr_ret << dendl; + } + } + + Partition partition_info = op.priv_data->get_current_partition_info(op.dpp); + efs::space_info space = efs::space(partition_info.location); + op.priv_data->set_free_space(op.dpp, space.available); + + const int ret = -aio_error(op.cb.get()); + boost::system::error_code ec; + if (ret < 0) { + ec.assign(-ret, boost::system::system_category()); + } else if (attr_ret < 0) { + ec.assign(-attr_ret, boost::system::system_category()); + } + ceph::async::dispatch(std::move(p), ec); } -int SSDDriver::AsyncReadOp::init(const DoutPrefixProvider *dpp, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg) +int SSDDriver::AsyncReadOp::prepare_libaio_read_op(const DoutPrefixProvider *dpp, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg) { ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): file_path=" << file_path << dendl; aio_cb.reset(new struct aiocb); diff --git a/src/rgw/rgw_ssd_driver.h b/src/rgw/rgw_ssd_driver.h index 5ab82763e63c6..de0110eb584d3 100644 --- a/src/rgw/rgw_ssd_driver.h +++ b/src/rgw/rgw_ssd_driver.h @@ -16,7 +16,7 @@ public: virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs, optional_yield y) override; virtual int del(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override { return -1; } // TODO: implement virtual rgw::AioResultList get_async (const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override; - virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs) override; + virtual rgw::AioResultList put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id) override; virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, const bufferlist& bl_data, optional_yield y) override; virtual int delete_data(const DoutPrefixProvider* dpp, const::std::string& key, optional_yield y) override; virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs, optional_yield y) override; @@ -30,13 +30,14 @@ public: /* Partition */ virtual Partition get_current_partition_info(const DoutPrefixProvider* dpp) override { return partition_info; } virtual uint64_t get_free_space(const DoutPrefixProvider* dpp) override { return free_space; } + void set_free_space(const DoutPrefixProvider* dpp, uint64_t free_space) { this->free_space = free_space; } private: Partition partition_info; uint64_t free_space; CephContext* cct; - struct libaio_handler { + struct libaio_read_handler { rgw::Aio* throttle = nullptr; rgw::AioResult& r; // read callback @@ -47,13 +48,23 @@ private: } }; + struct libaio_write_handler { + rgw::Aio* throttle = nullptr; + rgw::AioResult& r; + // write callback + void operator()(boost::system::error_code ec) const { + r.result = -ec.value(); + throttle->put(r); + } + }; + // unique_ptr with custom deleter for struct aiocb struct libaio_aiocb_deleter { void operator()(struct aiocb* c) { if(c->aio_fildes > 0) { - if( ::close(c->aio_fildes) != 0) { - } + TEMP_FAILURE_RETRY(::close(c->aio_fildes)); } + c->aio_buf = nullptr; delete c; } }; @@ -61,10 +72,17 @@ private: template auto get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key, off_t read_ofs, off_t read_len, CompletionToken&& token); - + + template + void put_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key, + const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, CompletionToken&& token); + rgw::Aio::OpFunc ssd_cache_read_op(const DoutPrefixProvider *dpp, optional_yield y, rgw::cache::CacheDriver* cache_driver, off_t read_ofs, off_t read_len, const std::string& key); + rgw::Aio::OpFunc ssd_cache_write_op(const DoutPrefixProvider *dpp, optional_yield y, rgw::cache::CacheDriver* cache_driver, + const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, const std::string& key); + using unique_aio_cb_ptr = std::unique_ptr; struct AsyncReadOp { @@ -73,8 +91,8 @@ private: using Signature = void(boost::system::error_code, bufferlist); using Completion = ceph::async::Completion; - int init(const DoutPrefixProvider *dpp, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg); - static void libaio_cb_aio_dispatch(sigval sigval); + int prepare_libaio_read_op(const DoutPrefixProvider *dpp, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg); + static void libaio_cb_aio_dispatch(sigval sigval); template static auto create(const Executor1& ex1, CompletionHandler&& handler); @@ -85,21 +103,19 @@ private: std::string key; void *data; int fd; - struct aiocb *cb; + unique_aio_cb_ptr cb; SSDDriver *priv_data; + rgw::sal::Attrs attrs; + + using Signature = void(boost::system::error_code); + using Completion = ceph::async::Completion; - AsyncWriteRequest(const DoutPrefixProvider* dpp) : dpp(dpp) {} int prepare_libaio_write_op(const DoutPrefixProvider *dpp, bufferlist& bl, unsigned int len, std::string key, std::string cache_location); static void libaio_write_cb(sigval sigval); - ~AsyncWriteRequest() { - ::close(fd); - cb->aio_buf = nullptr; - delete(cb); - } + template + static auto create(const Executor1& ex1, CompletionHandler&& handler); }; - - void libaio_write_completion_cb(AsyncWriteRequest* c); }; } } // namespace rgw::cache diff --git a/src/test/rgw/test_ssd_driver.cc b/src/test/rgw/test_ssd_driver.cc index dc5354030d505..682a12ae700e8 100644 --- a/src/test/rgw/test_ssd_driver.cc +++ b/src/test/rgw/test_ssd_driver.cc @@ -11,6 +11,46 @@ namespace net = boost::asio; +rgw::AioResultList completed; +uint64_t offset = 0; + +int flush(const DoutPrefixProvider* dpp, rgw::AioResultList&& results) { + int r = rgw::check_for_errors(results); + + if (r < 0) { + return r; + } + + auto cmp = [](const auto& lhs, const auto& rhs) { return lhs.id < rhs.id; }; + results.sort(cmp); // merge() requires results to be sorted first + completed.merge(results, cmp); // merge results in sorted order + + while (!completed.empty() && completed.front().id == offset) { + auto ret = std::move(completed.front().result); + + EXPECT_EQ(0, ret); + completed.pop_front_and_dispose(std::default_delete{}); + } + return 0; +} + +void cancel(rgw::Aio* aio) { + aio->drain(); +} + +int drain(const DoutPrefixProvider* dpp, rgw::Aio* aio) { + auto c = aio->wait(); + while (!c.empty()) { + int r = flush(dpp, std::move(c)); + if (r < 0) { + cancel(aio); + return r; + } + c = aio->wait(); + } + return flush(dpp, std::move(c)); +} + class Environment* env; class Environment : public ::testing::Environment { @@ -216,6 +256,19 @@ TEST_F(SSDDriverFixture, DeleteData) io.run(); } +TEST_F(SSDDriverFixture, PutAsync) +{ + spawn::spawn(io, [this] (spawn::yield_context yield) { + rgw::sal::Attrs attrs = {}; + const uint64_t window_size = env->cct->_conf->rgw_put_obj_min_window_size; + std::unique_ptr aio = rgw::make_throttle(window_size, optional_yield{io, yield}); + auto results = cacheDriver->put_async(env->dpp, optional_yield{io, yield}, aio.get(), "testPutAsync", bl, bl.length(), attrs, bl.length(), 0); + drain(env->dpp, aio.get()); + }); + + io.run(); +} + int main(int argc, char *argv[]) { ::testing::InitGoogleTest(&argc, argv);