From 951157437b00abac8db7f6051a862b3c9f2e15a6 Mon Sep 17 00:00:00 2001 From: Pritha Srivastava Date: Tue, 11 Jul 2023 11:31:04 +0530 Subject: [PATCH] rgw/cache: implementation of put_async method in ssd driver and redis driver and squashes the following commits. rgw/cache: implementation of async put. the call does not take into account throttling for now. rgw/cache: dummy implementation of put_async in redis driver to fix compilation error. Signed-off-by: Pritha Srivastava --- src/rgw/rgw_cache_driver.h | 1 + src/rgw/rgw_redis_driver.cc | 4 +++ src/rgw/rgw_redis_driver.h | 1 + src/rgw/rgw_ssd_driver.cc | 71 ++++++++++++++++++++++++++++++++++++- src/rgw/rgw_ssd_driver.h | 42 ++++++++++++++++------ 5 files changed, 108 insertions(+), 11 deletions(-) diff --git a/src/rgw/rgw_cache_driver.h b/src/rgw/rgw_cache_driver.h index eae032940b7..5d5199fdf51 100644 --- a/src/rgw/rgw_cache_driver.h +++ b/src/rgw/rgw_cache_driver.h @@ -36,6 +36,7 @@ class CacheDriver { virtual int put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) = 0; virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs) = 0; virtual rgw::AioResultList get_async (const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) = 0; + virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) = 0; virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data) = 0; virtual int delete_data(const DoutPrefixProvider* dpp, const::std::string& key) = 0; virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) = 0; diff --git a/src/rgw/rgw_redis_driver.cc b/src/rgw/rgw_redis_driver.cc index fd521092b2c..44ceaac9363 100644 --- a/src/rgw/rgw_redis_driver.cc +++ b/src/rgw/rgw_redis_driver.cc @@ -746,4 +746,8 @@ void RedisCacheAioRequest::cache_aio_write(const DoutPrefixProvider* dpp, option { } +int RedisDriver::put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) +{ + return 0; +} } } // namespace rgw::cache diff --git a/src/rgw/rgw_redis_driver.h b/src/rgw/rgw_redis_driver.h index b3835c9362e..832bcedf2c2 100644 --- a/src/rgw/rgw_redis_driver.h +++ b/src/rgw/rgw_redis_driver.h @@ -63,6 +63,7 @@ class RedisDriver : public CacheDriver { virtual std::unique_ptr get_cache_aio_request_ptr(const DoutPrefixProvider* dpp) override; virtual rgw::AioResultList get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override; + virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) override; struct libaio_handler { // should this be the same as SSDDriver? -Sam rgw::Aio* throttle = nullptr; diff --git a/src/rgw/rgw_ssd_driver.cc b/src/rgw/rgw_ssd_driver.cc index 5a44ef4efa2..ba43662097a 100644 --- a/src/rgw/rgw_ssd_driver.cc +++ b/src/rgw/rgw_ssd_driver.cc @@ -159,7 +159,7 @@ int SSDDriver::put(const DoutPrefixProvider* dpp, const std::string& key, buffer return -errno; } - efs::space_info space = efs::space(location); + efs::space_info space = efs::space(partition_info.location); this->free_space = space.available; if (attrs.size() > 0) { @@ -229,6 +229,38 @@ rgw::AioResultList SSDDriver::get_async(const DoutPrefixProvider* dpp, optional_ return aio->get(r_obj, rgw::Aio::cache_read_op(dpp, y, this, ofs, len, key), cost, id); } +void SSDDriver::libaio_write_completion_cb(AsyncWriteRequest* c) +{ + efs::space_info space = efs::space(partition_info.location); + this->free_space = space.available; + + insert_entry(c->dpp, c->key, 0, c->cb->aio_nbytes); +} + +int SSDDriver::put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) +{ + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): Write To Cache, oid=" << key << ", len=" << len << dendl; + struct AsyncWriteRequest* wr = new struct AsyncWriteRequest(dpp); + int r = 0; + if ((r = wr->prepare_libaio_write_op(dpp, bl, len, key, partition_info.location)) < 0) { + ldpp_dout(dpp, 0) << "ERROR: SSDCache: " << __func__ << "() prepare libaio write op r=" << r << dendl; + return r; + } + wr->cb->aio_sigevent.sigev_notify = SIGEV_THREAD; + wr->cb->aio_sigevent.sigev_notify_function = SSDDriver::AsyncWriteRequest::libaio_write_cb; + wr->cb->aio_sigevent.sigev_notify_attributes = nullptr; + wr->cb->aio_sigevent.sigev_value.sival_ptr = (void*)wr; + wr->key = key; + wr->priv_data = this; + + if ((r = ::aio_write(wr->cb)) != 0) { + ldpp_dout(dpp, 0) << "ERROR: SSDCache: " << __func__ << "() aio_write r=" << r << dendl; + delete wr; + return r; + } + return 0; +} + int SSDDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string& key) { std::string location = partition_info.location + key; @@ -244,6 +276,43 @@ int SSDDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string& ke return remove_entry(dpp, key); } +int SSDDriver::AsyncWriteRequest::prepare_libaio_write_op(const DoutPrefixProvider *dpp, bufferlist& bl, unsigned int len, std::string key, std::string cache_location) +{ + std::string location = cache_location + key; + int r = 0; + + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): Write To Cache, location=" << location << dendl; + cb = new struct aiocb; + mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; + memset(cb, 0, sizeof(struct aiocb)); + r = fd = ::open(location.c_str(), O_WRONLY | O_CREAT | O_TRUNC, mode); + if (fd < 0) { + ldpp_dout(dpp, 0) << "ERROR: AsyncWriteRequest::prepare_libaio_write_op: open file failed, errno=" << errno << ", location='" << location.c_str() << "'" << dendl; + return r; + } + if (dpp->get_cct()->_conf->rgw_d3n_l1_fadvise != POSIX_FADV_NORMAL) + posix_fadvise(fd, 0, 0, dpp->get_cct()->_conf->rgw_d3n_l1_fadvise); + cb->aio_fildes = fd; + + data = malloc(len); + if (!data) { + ldpp_dout(dpp, 0) << "ERROR: AsyncWriteRequest::prepare_libaio_write_op: memory allocation failed" << dendl; + ::close(fd); + return r; + } + cb->aio_buf = data; + memcpy((void*)data, bl.c_str(), len); + cb->aio_nbytes = len; + return r; +} + +void SSDDriver::AsyncWriteRequest::libaio_write_cb(sigval sigval) +{ + SSDDriver::AsyncWriteRequest* c = static_cast(sigval.sival_ptr); + ldpp_dout(c->dpp, 20) << "SSDCache: " << __func__ << "()" << dendl; + c->priv_data->libaio_write_completion_cb(c); +} + int SSDDriver::AsyncReadOp::init(const DoutPrefixProvider *dpp, CephContext* cct, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg) { ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): file_path=" << file_path << dendl; diff --git a/src/rgw/rgw_ssd_driver.h b/src/rgw/rgw_ssd_driver.h index 67d7ada057a..bc3efedab03 100644 --- a/src/rgw/rgw_ssd_driver.h +++ b/src/rgw/rgw_ssd_driver.h @@ -27,6 +27,7 @@ public: virtual int put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) override; virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs) override; virtual rgw::AioResultList get_async (const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override; + virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) override; virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data) = 0; virtual int delete_data(const DoutPrefixProvider* dpp, const::std::string& key) override; virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) override; @@ -63,7 +64,6 @@ public: template auto get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key, off_t read_ofs, off_t read_len, CompletionToken&& token); - protected: static std::unordered_map partitions; std::unordered_map entries; @@ -79,6 +79,7 @@ protected: std::optional get_entry(const DoutPrefixProvider* dpp, std::string key); private: + // unique_ptr with custom deleter for struct aiocb struct libaio_aiocb_deleter { void operator()(struct aiocb* c) { @@ -93,17 +94,38 @@ struct libaio_aiocb_deleter { using unique_aio_cb_ptr = std::unique_ptr; struct AsyncReadOp { - bufferlist result; - unique_aio_cb_ptr aio_cb; - using Signature = void(boost::system::error_code, bufferlist); - using Completion = ceph::async::Completion; + bufferlist result; + unique_aio_cb_ptr aio_cb; + using Signature = void(boost::system::error_code, bufferlist); + using Completion = ceph::async::Completion; - int init(const DoutPrefixProvider *dpp, CephContext* cct, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg); - static void libaio_cb_aio_dispatch(sigval sigval); + int init(const DoutPrefixProvider *dpp, CephContext* cct, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg); + static void libaio_cb_aio_dispatch(sigval sigval); - template - static auto create(const Executor1& ex1, CompletionHandler&& handler); - }; + template + static auto create(const Executor1& ex1, CompletionHandler&& handler); +}; + +struct AsyncWriteRequest { + const DoutPrefixProvider* dpp; + std::string key; + void *data; + int fd; + struct aiocb *cb; + SSDDriver *priv_data; + + AsyncWriteRequest(const DoutPrefixProvider* dpp) : dpp(dpp) {} + int prepare_libaio_write_op(const DoutPrefixProvider *dpp, bufferlist& bl, unsigned int len, std::string key, std::string cache_location); + static void libaio_write_cb(sigval sigval); + + ~AsyncWriteRequest() { + ::close(fd); + cb->aio_buf = nullptr; + delete(cb); + } +}; + +void libaio_write_completion_cb(AsyncWriteRequest* c); }; -- 2.39.5