From: Pritha Srivastava Date: Mon, 5 Jun 2023 08:04:32 +0000 (+0530) Subject: rgw/cache: this commit squashes the following commits for ssd backed cache driver. X-Git-Tag: v20.0.0~2219^2~82 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=d5ebfc06e84f9ea8fb838f7067d789ece5cd3ad7;p=ceph.git rgw/cache: this commit squashes the following commits for ssd backed cache driver. initial implementation of ssd backed cache driver. This contains implementations for methods related to Entry and Partition. adding methods for put, get and delete data. They are all synchronous. added CacheAioRequest to handle async read and write requests and added a corresponding read op in aio.h and aio.cc. added get_async method implementation and initial implementation to handle async reads using asio. changing the signature of list_entries to match that of the base class and making it public. adding implementation for asynchronous get. modifying namespace from rgw::cal to rgw::cache. Modifying the SSDDriver constructor for Partition related modifications. 
Signed-off-by: Pritha Srivastava --- diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 3738c80d9297..de907dd150f7 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -117,6 +117,7 @@ set(librgw_common_srcs rgw_sal.cc rgw_sal_filter.cc rgw_redis_driver.cc + rgw_ssd_driver.cc rgw_string.cc rgw_tag.cc rgw_tag_s3.cc diff --git a/src/rgw/rgw_aio.cc b/src/rgw/rgw_aio.cc index 293dea13217c..da71ea5071ad 100644 --- a/src/rgw/rgw_aio.cc +++ b/src/rgw/rgw_aio.cc @@ -19,6 +19,7 @@ #include "rgw_aio.h" #include "rgw_d3n_cacherequest.h" +#include "rgw_cache_driver.h" namespace rgw { @@ -145,4 +146,14 @@ Aio::OpFunc Aio::d3n_cache_op(const DoutPrefixProvider *dpp, optional_yield y, return d3n_cache_aio_abstract(dpp, y, read_ofs, read_len, cache_location); } +Aio::OpFunc Aio::cache_read_op(const DoutPrefixProvider *dpp, optional_yield y, rgw::cache::CacheDriver* cache_driver, + off_t read_ofs, off_t read_len, const std::string& key) { + return [dpp, y, cache_driver, read_ofs, read_len, key] (Aio* aio, AioResult& r) mutable { + ceph_assert(y); + auto c = cache_driver->get_cache_aio_request_ptr(dpp); + ldpp_dout(dpp, 20) << "Cache: cache_read_op(): Read From Cache, oid=" << r.obj.oid << dendl; + c->cache_aio_read(dpp, y, key, read_ofs, read_len, aio, r); + }; +} + } // namespace rgw diff --git a/src/rgw/rgw_aio.h b/src/rgw/rgw_aio.h index 9be144f607f2..035758e42da7 100644 --- a/src/rgw/rgw_aio.h +++ b/src/rgw/rgw_aio.h @@ -29,6 +29,10 @@ struct D3nGetObjData; +namespace rgw::cache { + class CacheDriver; +} + namespace rgw { struct AioResult { @@ -99,6 +103,9 @@ class Aio { optional_yield y, jspan_context *trace_ctx = nullptr); static OpFunc d3n_cache_op(const DoutPrefixProvider *dpp, optional_yield y, off_t read_ofs, off_t read_len, std::string& location); + + static OpFunc cache_read_op(const DoutPrefixProvider *dpp, optional_yield y, rgw::cache::CacheDriver* cache_driver, + off_t read_ofs, off_t read_len, const std::string& key); }; } // namespace 
rgw diff --git a/src/rgw/rgw_cache_driver.h b/src/rgw/rgw_cache_driver.h index 0bae4602c530..eae032940b73 100644 --- a/src/rgw/rgw_cache_driver.h +++ b/src/rgw/rgw_cache_driver.h @@ -1,6 +1,7 @@ #pragma once #include "rgw_common.h" +#include "rgw_aio.h" namespace rgw { namespace cache { @@ -18,6 +19,14 @@ struct Entry { int localWeight; }; +class CacheAioRequest { + public: + CacheAioRequest() {} + virtual ~CacheAioRequest() = default; + virtual void cache_aio_read(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, off_t ofs, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) = 0; + virtual void cache_aio_write(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, bufferlist& bl, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) = 0; +}; + class CacheDriver { public: CacheDriver() {} @@ -26,6 +35,7 @@ class CacheDriver { virtual int initialize(CephContext* cct, const DoutPrefixProvider* dpp) = 0; virtual int put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) = 0; virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs) = 0; + virtual rgw::AioResultList get_async (const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) = 0; virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data) = 0; virtual int delete_data(const DoutPrefixProvider* dpp, const::std::string& key) = 0; virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) = 0; @@ -35,6 +45,8 @@ class CacheDriver { virtual std::string get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name) = 0; virtual int set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attr_val) = 0; 
+ virtual std::unique_ptr get_cache_aio_request_ptr(const DoutPrefixProvider* dpp) = 0; + /* Entry */ virtual bool key_exists(const DoutPrefixProvider* dpp, const std::string& key) = 0; virtual std::vector list_entries(const DoutPrefixProvider* dpp) = 0; diff --git a/src/rgw/rgw_ssd_driver.cc b/src/rgw/rgw_ssd_driver.cc new file mode 100644 index 000000000000..7ad7ee25e26b --- /dev/null +++ b/src/rgw/rgw_ssd_driver.cc @@ -0,0 +1,312 @@ +#include "common/async/completion.h" +#include "rgw_ssd_driver.h" +#if defined(__linux__) +#include +#endif + +#if __has_include() +#include +namespace efs = std::filesystem; +#else +#include +namespace efs = std::experimental::filesystem; +#endif + +namespace rgw { namespace cache { + +std::optional SSDDriver::get_partition_info(const DoutPrefixProvider* dpp, const std::string& name, const std::string& type) +{ + std::string key = name + type; + auto iter = partitions.find(key); + if (iter != partitions.end()) { + return iter->second; + } + + return std::nullopt; +} + +std::vector SSDDriver::list_partitions(const DoutPrefixProvider* dpp) +{ + std::vector partitions_v; + for (auto& it : SSDDriver::partitions) { + partitions_v.emplace_back(it.second); + } + return partitions_v; +} + +int SSDDriver::add_partition_info(Partition& info) +{ + std::string key = info.name + info.type; + auto ret = partitions.emplace(key, info); + return ret.second; +} + +int SSDDriver::remove_partition_info(Partition& info) +{ + std::string key = info.name + info.type; + return partitions.erase(key); +} + +int SSDDriver::insert_entry(const DoutPrefixProvider* dpp, std::string key, off_t offset, uint64_t len) +{ + auto ret = entries.emplace(key, Entry(key, offset, len)); + return ret.second; +} + +int SSDDriver::remove_entry(const DoutPrefixProvider* dpp, std::string key) +{ + return entries.erase(key); +} + +std::optional SSDDriver::get_entry(const DoutPrefixProvider* dpp, std::string key) +{ + auto iter = entries.find(key); + if (iter != entries.end()) { 
+ return iter->second; + } + + return std::nullopt; +} + +std::vector SSDDriver::list_entries(const DoutPrefixProvider* dpp) +{ + std::vector entries_v; + for (auto& it : entries) { + entries_v.emplace_back(it.second); + } + return entries_v; +} + +SSDDriver::SSDDriver(Partition& partition_info) : partition_info(partition_info), + free_space(partition_info.size), outstanding_write_size(0) +{ + add_partition_info(partition_info); +} + +SSDDriver::~SSDDriver() +{ + remove_partition_info(partition_info); +} + +int SSDDriver::initialize(CephContext* cct, const DoutPrefixProvider* dpp) +{ + this->cct = cct; + + if(partition_info.location.back() != '/') { + partition_info.location += "/"; + } + try { + if (efs::exists(partition_info.location)) { + if (cct->_conf->rgw_d3n_l1_evict_cache_on_start) { + ldpp_dout(dpp, 5) << "initialize: evicting the persistent storage directory on start" << dendl; + for (auto& p : efs::directory_iterator(partition_info.location)) { + efs::remove_all(p.path()); + } + } + } else { + ldpp_dout(dpp, 5) << "initialize:: creating the persistent storage directory on start" << dendl; + efs::create_directories(partition_info.location); + } + } catch (const efs::filesystem_error& e) { + ldpp_dout(dpp, 0) << "initialize::: ERROR initializing the cache storage directory '" << partition_info.location << + "' : " << e.what() << dendl; + //return -EINVAL; Should return error from here? 
+ } + + #if defined(HAVE_LIBAIO) && defined(__GLIBC__) + // libaio setup + struct aioinit ainit{0}; + ainit.aio_threads = cct->_conf.get_val("rgw_d3n_libaio_aio_threads"); + ainit.aio_num = cct->_conf.get_val("rgw_d3n_libaio_aio_num"); + ainit.aio_idle_time = 120; + aio_init(&ainit); + #endif + + return 0; +} + +int SSDDriver::put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) +{ + if (key_exists(dpp, key)) { + return 0; + } + + std::string location = partition_info.location + key; + + ldpp_dout(dpp, 20) << __func__ << "(): location=" << location << dendl; + FILE *cache_file = nullptr; + int r = 0; + size_t nbytes = 0; + + cache_file = fopen(location.c_str(), "w+"); + if (cache_file == nullptr) { + ldpp_dout(dpp, 0) << "ERROR: put::fopen file has return error, errno=" << errno << dendl; + return -errno; + } + + nbytes = fwrite(bl.c_str(), 1, len, cache_file); + if (nbytes != len) { + ldpp_dout(dpp, 0) << "ERROR: put::io_write: fwrite has returned error: nbytes!=len, nbytes=" << nbytes << ", len=" << len << dendl; + return -EIO; + } + + r = fclose(cache_file); + if (r != 0) { + ldpp_dout(dpp, 0) << "ERROR: put::fclose file has return error, errno=" << errno << dendl; + return -errno; + } + + efs::space_info space = efs::space(location); + this->free_space = space.available; + + return insert_entry(dpp, key, 0, len); +} + +int SSDDriver::get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs) +{ + if (!key_exists(dpp, key)) { + return -ENOENT; + } + + char buffer[len]; + std::string location = partition_info.location + key; + + ldpp_dout(dpp, 20) << __func__ << "(): location=" << location << dendl; + FILE *cache_file = nullptr; + int r = 0; + size_t nbytes = 0; + + cache_file = fopen(location.c_str(), "r+"); + if (cache_file == nullptr) { + ldpp_dout(dpp, 0) << "ERROR: put::fopen file has return error, errno=" << errno << dendl; + 
return -errno; + } + + nbytes = fread(buffer, sizeof(buffer), 1 , cache_file); + if (nbytes != len) { + ldpp_dout(dpp, 0) << "ERROR: put::io_read: fread has returned error: nbytes!=len, nbytes=" << nbytes << ", len=" << len << dendl; + return -EIO; + } + + r = fclose(cache_file); + if (r != 0) { + ldpp_dout(dpp, 0) << "ERROR: put::fclose file has return error, errno=" << errno << dendl; + return -errno; + } + + ceph::encode(buffer, bl); + + return 0; +} + +std::unique_ptr SSDDriver::get_cache_aio_request_ptr(const DoutPrefixProvider* dpp) +{ + return std::make_unique(this); +} + +rgw::AioResultList SSDDriver::get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) +{ + rgw_raw_obj r_obj; + r_obj.oid = key; + return aio->get(r_obj, rgw::Aio::cache_read_op(dpp, y, this, ofs, len, key), cost, id); +} + +int SSDDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string& key) +{ + std::string location = partition_info.location + key; + + if (!efs::remove(location)) { + ldpp_dout(dpp, 0) << "ERROR: delete_data::remove has failed to remove the file: " << location << dendl; + return -EIO; + } + + return remove_entry(dpp, key); +} + +int SSDDriver::AsyncReadOp::init(const DoutPrefixProvider *dpp, CephContext* cct, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg) +{ + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): file_path=" << file_path << dendl; + aio_cb.reset(new struct aiocb); + memset(aio_cb.get(), 0, sizeof(struct aiocb)); + aio_cb->aio_fildes = TEMP_FAILURE_RETRY(::open(file_path.c_str(), O_RDONLY|O_CLOEXEC|O_BINARY)); + if(aio_cb->aio_fildes < 0) { + int err = errno; + ldpp_dout(dpp, 1) << "ERROR: SSDCache: " << __func__ << "(): can't open " << file_path << " : " << " error: " << err << dendl; + return -err; + } + if (cct->_conf->rgw_d3n_l1_fadvise != POSIX_FADV_NORMAL) { + posix_fadvise(aio_cb->aio_fildes, 0, 0, 
g_conf()->rgw_d3n_l1_fadvise); + } + + bufferptr bp(read_len); + aio_cb->aio_buf = bp.c_str(); + result.append(std::move(bp)); + + aio_cb->aio_nbytes = read_len; + aio_cb->aio_offset = read_ofs; + aio_cb->aio_sigevent.sigev_notify = SIGEV_THREAD; + aio_cb->aio_sigevent.sigev_notify_function = libaio_cb_aio_dispatch; + aio_cb->aio_sigevent.sigev_notify_attributes = nullptr; + aio_cb->aio_sigevent.sigev_value.sival_ptr = arg; + + return 0; +} + +void SSDDriver::AsyncReadOp::libaio_cb_aio_dispatch(sigval sigval) +{ + auto p = std::unique_ptr{static_cast(sigval.sival_ptr)}; + auto op = std::move(p->user_data); + const int ret = -aio_error(op.aio_cb.get()); + boost::system::error_code ec; + if (ret < 0) { + ec.assign(-ret, boost::system::system_category()); + } + + ceph::async::dispatch(std::move(p), ec, std::move(op.result)); +} + +template +auto SSDDriver::AsyncReadOp::create(const Executor1& ex1, CompletionHandler&& handler) +{ + auto p = Completion::create(ex1, std::move(handler)); + return p; +} + +template +auto SSDDriver::get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key, + off_t read_ofs, off_t read_len, CompletionToken&& token) +{ + std::string location = partition_info.location + key; + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): location=" << location << dendl; + + using Op = AsyncReadOp; + using Signature = typename Op::Signature; + boost::asio::async_completion init(token); + auto p = Op::create(ctx.get_executor(), init.completion_handler); + auto& op = p->user_data; + + int ret = op.init(dpp, cct, location, read_ofs, read_len, p.get()); + if(0 == ret) { + ret = ::aio_read(op.aio_cb.get()); + } + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): ::aio_read(), ret=" << ret << dendl; + if(ret < 0) { + auto ec = boost::system::error_code{-ret, boost::system::system_category()}; + ceph::async::post(std::move(p), ec, bufferlist{}); + } else { + (void)p.release(); + } + return init.result.get(); +} + +void 
SSDCacheAioRequest::cache_aio_read(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, off_t ofs, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) +{ + using namespace boost::asio; + async_completion init(y.get_yield_context()); + auto ex = get_associated_executor(init.completion_handler); + + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): key=" << key << dendl; + cache_driver->get_async(dpp, y.get_io_context(), key, ofs, len, bind_executor(ex, SSDDriver::libaio_handler{aio, r})); +} + +} } // namespace rgw::cache diff --git a/src/rgw/rgw_ssd_driver.h b/src/rgw/rgw_ssd_driver.h new file mode 100644 index 000000000000..c783397d5a55 --- /dev/null +++ b/src/rgw/rgw_ssd_driver.h @@ -0,0 +1,110 @@ +#pragma once + +#include +#include "rgw_common.h" +#include "rgw_cache_driver.h" + +namespace rgw { namespace cache { //cal stands for Cache Abstraction Layer + +class SSDDriver; + +class SSDCacheAioRequest: public CacheAioRequest { +public: + SSDCacheAioRequest(SSDDriver* cache_driver) : cache_driver(cache_driver) {} + virtual ~SSDCacheAioRequest() = default; + virtual void cache_aio_read(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, off_t ofs, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) override; + virtual void cache_aio_write(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, bufferlist& bl, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) override; +private: + SSDDriver* cache_driver; +}; + +class SSDDriver : public CacheDriver { +public: + SSDDriver(Partition& partition_info); + virtual ~SSDDriver(); + + virtual int initialize(CephContext* cct, const DoutPrefixProvider* dpp) override; + virtual int put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) override; + virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs) override; + virtual 
rgw::AioResultList get_async (const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override; + virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data) = 0; + virtual int delete_data(const DoutPrefixProvider* dpp, const::std::string& key) override; + virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) = 0; + virtual int set_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) = 0; + virtual int update_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) = 0; + virtual int delete_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& del_attrs) = 0; + virtual std::string get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name) = 0; + virtual int set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attr_val) = 0; + + /* Entry */ + virtual bool key_exists(const DoutPrefixProvider* dpp, const std::string& key) override { return entries.count(key) != 0; } + virtual std::vector list_entries(const DoutPrefixProvider* dpp) override; + virtual size_t get_num_entries(const DoutPrefixProvider* dpp) override { return entries.size(); } + + /* Partition */ + virtual Partition get_current_partition_info(const DoutPrefixProvider* dpp) override { return partition_info; } + virtual uint64_t get_free_space(const DoutPrefixProvider* dpp) override { return free_space; } + static std::optional get_partition_info(const DoutPrefixProvider* dpp, const std::string& name, const std::string& type); + static std::vector list_partitions(const DoutPrefixProvider* dpp); + + virtual std::unique_ptr get_cache_aio_request_ptr(const DoutPrefixProvider* dpp) override; + + struct libaio_handler { + rgw::Aio* throttle = nullptr; + 
rgw::AioResult& r; + // read callback + void operator()(boost::system::error_code ec, bufferlist bl) const { + r.result = -ec.value(); + r.data = std::move(bl); + throttle->put(r); + } + }; + template + auto get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key, + off_t read_ofs, off_t read_len, CompletionToken&& token); + +protected: + static std::unordered_map partitions; + std::unordered_map entries; + Partition partition_info; + uint64_t free_space; + uint64_t outstanding_write_size; + CephContext* cct; + + int add_partition_info(Partition& info); + int remove_partition_info(Partition& info); + int insert_entry(const DoutPrefixProvider* dpp, std::string key, off_t offset, uint64_t len); + int remove_entry(const DoutPrefixProvider* dpp, std::string key); + std::optional get_entry(const DoutPrefixProvider* dpp, std::string key); + +private: +// unique_ptr with custom deleter for struct aiocb +struct libaio_aiocb_deleter { + void operator()(struct aiocb* c) { + if(c->aio_fildes > 0) { + if( ::close(c->aio_fildes) != 0) { + } + } + delete c; + } +}; + +using unique_aio_cb_ptr = std::unique_ptr; + +struct AsyncReadOp { + bufferlist result; + unique_aio_cb_ptr aio_cb; + using Signature = void(boost::system::error_code, bufferlist); + using Completion = ceph::async::Completion; + + int init(const DoutPrefixProvider *dpp, CephContext* cct, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg); + static void libaio_cb_aio_dispatch(sigval sigval); + + template + static auto create(const Executor1& ex1, CompletionHandler&& handler); + }; + +}; + +} } // namespace rgw::cache +