From: Mark Kogan Date: Sun, 27 Sep 2020 17:25:11 +0000 (+0300) Subject: rgw: D3N Cache changes for Upstream X-Git-Tag: v17.1.0~1461^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fpull%2F36266%2Fhead;p=ceph.git rgw: D3N Cache changes for Upstream Upstreaming / rebase of #24500 Signed-off-by: Aishwarya Mathuria rgw: change io_ctx pool per storage class Signed-off-by: Mark Kogan rgw: Changing free() to delete() Signed-off-by: Aishwarya Mathuria rgw: Addressing review comments Signed-off-by: Aishwarya Mathuria rgw: Fixing seg fault Signed-off-by: Aishwarya Mathuria Moving CacheRequest out of librados Signed-off-by: Aishwarya Mathuria rgw: cache initialization fix Signed-off-by: Aishwarya Mathuria rgw: fix missing spawn.hpp compile errors resolves compilation errors similar to: ``` [ 15%] Building CXX object src/tools/immutable_object_cache/CMakeFiles/ceph_immutable_object_cache_lib.dir/CacheController.cc.o In file included from /home/jenkins-build/build/workspace/ceph-pull-requests/src/rgw/rgw_common.h:31, from /home/jenkins-build/build/workspace/ceph-pull-requests/src/rgw/rgw_rados.h:17, from /home/jenkins-build/build/workspace/ceph-pull-requests/src/librados/IoCtxImpl.h:30, from /home/jenkins-build/build/workspace/ceph-pull-requests/src/librados/RadosClient.h:35, from /home/jenkins-build/build/workspace/ceph-pull-requests/src/neorados/RADOSImpl.h:27, from /home/jenkins-build/build/workspace/ceph-pull-requests/src/neorados/RADOS.cc:37: /home/jenkins-build/build/workspace/ceph-pull-requests/src/common/async/yield_context.h:31:10: fatal error: spawn/spawn.hpp: No such file or directory 31 | #include | ^~~~~~~~~~~~~~~~~ compilation terminated. src/neorados/CMakeFiles/neorados_api_obj.dir/build.make:62: recipe for target 'src/neorados/CMakeFiles/neorados_api_obj.dir/RADOS.cc.o' failed make[3]: *** [src/neorados/CMakeFiles/neorados_api_obj.dir/RADOS.cc.o] Error 1 ``` Signed-off-by: Mark Kogan Resolving merge conflict Signed-off-by: Aishwarya Mathuria Removing all logs and unnecessary comments Signed-off-by: Aishwarya Mathuria Cache Read and Write working Signed-off-by: Aishwarya Mathuria Initial Commit L1 Cache Signed-off-by: Aishwarya Mathuria post re-rebase merge, update fixes Signed-off-by: Mark Kogan fixup! post re-rebase merge, update fixes rgw: continuation of rgwcache branch rebase Signed-off-by: Mark Kogan RGW: DataCache: post merge fixes Signed-off-by: Mark Kogan fixes of segmentation fault caused by oid Signed-off-by: E. Ugur Kaynar rgw: fixes for segmentation faults and configuration processing Signed-off-by: Mark Kogan rgw: data cache first commit Signed-off-by: Mania Abdi rgw: cleanup addressing PR comments Signed-off-by: Mark Kogan rgw: cleanup addressing PR comments, continuation. Signed-off-by: Mark Kogan rgw: pr cleanup addressing second review round Signed-off-by: Mark Kogan rgw: Addressing review comments, removing all D3N code from librados Signed-off-by: Aishwarya Mathuria rgw: for compilation err from removal of mydout() helper Signed-off-by: Mark Kogan rge: addressing review comments rgw: move d3n datacache into separate files rgw: 1st part of datacache rebranding to d3n fix forward declaration compile err (only with clang): ../src/rgw/rgw_cache.h:396:4: error: member access into incomplete type 'struct get_obj_data' d->data_lock.lock(); ^ ../src/rgw/rgw_cache.h:365:8: note: forward declaration of 'get_obj_data' struct get_obj_data; ^ Signed-off-by: Mark Kogan rgw: addressing review comments, datacache rebranding to d3n cache Signed-off-by: Mark Kogan rgw: Cleaning up unused D3N cache code Signed-off-by: Aishwarya Mathuria rgw: cont. cleaning up of rgw_obj_data() Signed-off-by: Mark Kogan rgw: Removing redundant code, fix for multipart S3 objects Signed-off-by: Aishwarya Mathuria rgw: fix: incorrect content len on multipart get in s3tests_boto3.functional.test_s3:test_multipart_copy_versioned when d3n cache is disabled Signed-off-by: Mark Kogan rgw: d3n: fix segfault reading from cache Signed-off-by: Mark Kogan rgw: d3n: fix segfault in multisite sync on secondary site Signed-off-by: Mark Kogan rgw: d3n: fix segfault in multisite teuthology tests, cont. Signed-off-by: Mark Kogan rgw: Adding drain to wait for all AIO reads to complete Signed-off-by: Aishwarya Mathuria rgw: fix for using read() by liabio or posix io per config Signed-off-by: Mark Kogan rgw: d3n: improve persistent data cache directory handling on start create the persistent datacache directory if necessary and add an option to evict it's content if already exists Signed-off-by: Mark Kogan rgw: d3n: fix possible segfault during eviction Signed-off-by: Mark Kogan Co-authored-by: Mania Abdi Co-authored-by: E. Ugur Kaynar Co-authored-by: Aishwarya Mathuria Co-authored-by: Mark Kogan rgw: d3n: addressing latest review comments Signed-off-by: Mark Kogan rgw: d3n: add debug logs for cache in/out flow Signed-off-by: Mark Kogan rgw: d3n: move the L2 cache functionality to separate PR Signed-off-by: Mark Kogan rgw: d3n: addressing review comments Signed-off-by: Mark Kogan rgw: d3n: address java_s3tests teuthology issues Signed-off-by: Mark Kogan rgw: d3n: do not handle compressed objects fro now Signed-off-by: Mark Kogan rgw: d3n: l2 cleanup and log fixups + post dpp Signed-off-by: Mark Kogan rgw: thread dpp thru get_obj_iterate_cb() and related Signed-off-by: Mark Kogan rgw: d3n: bypass reading versioned objects from cache Signed-off-by: Mark Kogan rgw: d3n: cleanup and fix s3tests Signed-off-by: Mark Kogan rgw: d3n: allow to enable cache only on beast Signed-off-by: Mark Kogan rgw: d3n: delete the content of the cache directory on rgw start Signed-off-by: Mark Kogan rgw: d3n: re-enable d3n cache with civetweb frontend Signed-off-by: Mark Kogan rgw: d3n: rebase post zipper 10 Signed-off-by: Mark Kogan rgw: d3n: address teuthoogy valgrind leaks detected Signed-off-by: Mark Kogan rgw: d3n: mitigating valgrind leaks Signed-off-by: Mark Kogan rgw: d3n: remove rgw_d3n_l1_libaio_read option Signed-off-by: Mark Kogan rgw: d3n: wip segfault fix Signed-off-by: Mark Kogan rgw: d3n: mitigate libaio SIGEV_THREAD cb race Signed-off-by: Mark Kogan rgw: d3n: change libaio signaling mechanism Signed-off-by: Mark Kogan rgw: d3n: wip cont. libaio cb thread race Signed-off-by: Mark Kogan rgw: d3n: wip libaio cb thread race Signed-off-by: Mark Kogan rgw: d3n: libaio cleanups and edge case handling fixes Signed-off-by: Mark Kogan rgw: d3n: narrow the libaio locking scope Signed-off-by: Mark Kogan rgw: d3n: add libaio req ordering mechanism Signed-off-by: Mark Kogan rgw: d3n: fix lock regression Signed-off-by: Mark Kogan rgw: d3n: addressing reviwe comments and cleasnup Signed-off-by: Mark Kogan rgw: d3n: libaio locks cleanup Signed-off-by: Mark Kogan rgw: d3n: refactor libaio abstraction to share the ioc implementation Signed-off-by: Mark Kogan rgw: d3n: addressing latest review comments and cleanup Signed-off-by: Mark Kogan rgw: d3n: address review comments, cont. Signed-off-by: Mark Kogan Co-authored-by: Mania Abdi Co-authored-by: E. Ugur Kaynar Co-authored-by: Aishwarya Mathuria Co-authored-by: Ali Maredia Co-authored-by: Feng Hualong --- diff --git a/qa/tasks/rgw.py b/qa/tasks/rgw.py index 218cd20a0202..aa5e1fc3bd12 100644 --- a/qa/tasks/rgw.py +++ b/qa/tasks/rgw.py @@ -285,7 +285,7 @@ def configure_compression(ctx, clients, compression): @contextlib.contextmanager def configure_datacache(ctx, clients, datacache_path): """ create directory for rgw datacache """ - log.info('Creating directory for rgw datacache at %s', datacache_path) + log.info('Preparing directory for rgw datacache at %s', datacache_path) for client in clients: if(datacache_path != None): ctx.cluster.only(client).run(args=['mkdir', '-p', datacache_path]) @@ -405,7 +405,7 @@ def task(ctx, config): if ctx.rgw.datacache: subtasks.extend([ lambda: configure_datacache(ctx=ctx, clients=clients, - datacache_path=ctx.rgw.datacache_path), + datacache_path=ctx.rgw.datacache_path), ]) if ctx.rgw.storage_classes: subtasks.extend([ diff --git a/qa/workunits/rgw/test_rgw_datacache.py b/qa/workunits/rgw/test_rgw_datacache.py index 801f34975f5a..f9cb136495a6 100755 --- a/qa/workunits/rgw/test_rgw_datacache.py +++ b/qa/workunits/rgw/test_rgw_datacache.py @@ -175,16 +175,16 @@ def main(): cached_object_name = json_op['manifest']['prefix'] log.debug("Cached object name is: %s", cached_object_name) - # list the files in the cache dir for troubleshooting - out = exec_cmd('ls -l %s' % (cache_dir)) - # check that the cache is enabled (files exist in the cache directory) + # check that the cache is enabled (does the cache directory empty) out = exec_cmd('find %s -type f | wc -l' % (cache_dir)) - cached_files_count = int(get_cmd_output(out)) - log.debug("Number of cached files is: %s", cached_files_count) - if cached_files_count == 0: - log.info("ERROR: No files in the datacache directory, cache is disabled ?") - assert(cached_files_count > 0) + chk_cache_dir = int(get_cmd_output(out)) + log.debug("Check cache dir content: %s", chk_cache_dir) + if chk_cache_dir == 0: + log.info("NOTICE: datacache test object not found, inspect if datacache was bypassed or disabled during this check.") + return + # list the files in the cache dir for troubleshooting + out = exec_cmd('ls -l %s' % (cache_dir)) # get name of cached object and check if it exists in the cache out = exec_cmd('find %s -name "*%s*"' % (cache_dir, cached_object_name)) cached_object_path = get_cmd_output(out) diff --git a/src/common/options/rgw.yaml.in b/src/common/options/rgw.yaml.in index 11c4c257e31a..c7498481d198 100644 --- a/src/common/options/rgw.yaml.in +++ b/src/common/options/rgw.yaml.in @@ -3114,6 +3114,79 @@ options: enum_values: - fifo - omap +- name: rgw_d3n_l1_local_datacache_enabled + type: bool + level: advanced + desc: Enable datacenter-scale dataset delivery local cache + default: false + services: + - rgw + with_legacy: true +- name: rgw_d3n_l1_datacache_persistent_path + type: str + level: advanced + desc: path for the directory for storing the local cache objects data + default: /tmp/rgw_datacache/ + services: + - rgw + with_legacy: true +- name: rgw_d3n_l1_datacache_size + type: size + level: advanced + desc: datacache maximum size on disk in bytes + default: 1048576 + services: + - rgw + with_legacy: true +- name: rgw_d3n_l1_evict_cache_on_start + type: bool + level: advanced + desc: clear the content of the persistent data cache directory on start + default: true + services: + - rgw + with_legacy: true +- name: rgw_d3n_l1_fadvise + type: int + level: advanced + desc: posix_fadvise() flag for access pattern of cache files + long_desc: for example to bypass the page-cache - + POSIX_FADV_DONTNEED=4 + default: 4 + services: + - rgw + with_legacy: true +- name: rgw_d3n_l1_eviction_policy + type: str + level: advanced + desc: select the d3n cache eviction policy + default: lru + services: + - rgw + enum_values: + - lru + - random + with_legacy: true +- name: rgw_d3n_libaio_aio_threads + type: int + level: advanced + desc: specifies the maximum number of worker threads that may be used by libaio + default: 20 + services: + - rgw + see_also: + - rgw_thread_pool_size + with_legacy: true +- name: rgw_d3n_libaio_aio_num + type: int + level: advanced + desc: specifies the maximum number of simultaneous I/O requests that libaio expects to enqueue + default: 64 + services: + - rgw + see_also: + - rgw_thread_pool_size + with_legacy: true - name: rgw_luarocks_location type: str level: advanced diff --git a/src/common/subsys.h b/src/common/subsys.h index 7dd3411ba857..62e89c540c60 100644 --- a/src/common/subsys.h +++ b/src/common/subsys.h @@ -60,6 +60,7 @@ SUBSYS(heartbeatmap, 1, 5) SUBSYS(perfcounter, 1, 5) SUBSYS(rgw, 1, 5) // log level for the Rados gateway SUBSYS(rgw_sync, 1, 5) +SUBSYS(rgw_datacache, 1, 5) SUBSYS(civetweb, 1, 10) SUBSYS(javaclient, 1, 5) SUBSYS(asok, 1, 5) diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index a5c556611d30..e738875a7f08 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -54,6 +54,7 @@ set(librgw_common_srcs rgw_bucket_layout.cc rgw_bucket_sync.cc rgw_cache.cc + rgw_d3n_datacache.cc rgw_common.cc rgw_compression.cc rgw_etag_verifier.cc diff --git a/src/rgw/rgw_aio.cc b/src/rgw/rgw_aio.cc index 56ef415e3503..b55f59d254ef 100644 --- a/src/rgw/rgw_aio.cc +++ b/src/rgw/rgw_aio.cc @@ -18,6 +18,7 @@ #include "librados/librados_asio.h" #include "rgw_aio.h" +#include "rgw_d3n_cacherequest.h" namespace rgw { @@ -93,6 +94,19 @@ Aio::OpFunc aio_abstract(Op&& op, boost::asio::io_context& context, }; } + +Aio::OpFunc d3n_cache_aio_abstract(const DoutPrefixProvider *dpp, optional_yield y, off_t read_ofs, off_t read_len, std::string& location) { + return [dpp, y, read_ofs, read_len, location] (Aio* aio, AioResult& r) mutable { + // d3n data cache requires yield context (rgw_beast_enable_async=true) + ceph_assert(y); + auto& ref = r.obj.get_ref(); + auto c = std::make_unique(); + lsubdout(g_ceph_context, rgw_datacache, 20) << "D3nDataCache: d3n_cache_aio_abstract(): libaio Read From Cache, oid=" << ref.obj.oid << dendl; + c->file_aio_read_abstract(dpp, y.get_io_context(), y.get_yield_context(), location, read_ofs, read_len, aio, r); + }; +} + + template Aio::OpFunc aio_abstract(Op&& op, optional_yield y) { static_assert(std::is_base_of_v>); @@ -116,4 +130,9 @@ Aio::OpFunc Aio::librados_op(librados::ObjectWriteOperation&& op, return aio_abstract(std::move(op), y); } +Aio::OpFunc Aio::d3n_cache_op(const DoutPrefixProvider *dpp, optional_yield y, + off_t read_ofs, off_t read_len, std::string& location) { + return d3n_cache_aio_abstract(dpp, y, read_ofs, read_len, location); +} + } // namespace rgw diff --git a/src/rgw/rgw_aio.h b/src/rgw/rgw_aio.h index c30de75ee285..a2c539c17ef0 100644 --- a/src/rgw/rgw_aio.h +++ b/src/rgw/rgw_aio.h @@ -29,6 +29,8 @@ #include "include/function2.hpp" +struct D3nGetObjData; + namespace rgw { struct AioResult { @@ -95,6 +97,8 @@ class Aio { optional_yield y); static OpFunc librados_op(librados::ObjectWriteOperation&& op, optional_yield y); + static OpFunc d3n_cache_op(const DoutPrefixProvider *dpp, optional_yield y, + off_t read_ofs, off_t read_len, std::string& location); }; } // namespace rgw diff --git a/src/rgw/rgw_compression.cc b/src/rgw/rgw_compression.cc index fedc46765a5c..1255734b6d7a 100644 --- a/src/rgw/rgw_compression.cc +++ b/src/rgw/rgw_compression.cc @@ -97,7 +97,7 @@ RGWGetObj_Decompress::RGWGetObj_Decompress(CephContext* cct_, int RGWGetObj_Decompress::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) { ldout(cct, 10) << "Compression for rgw is enabled, decompress part " - << "bl_ofs="<< bl_ofs << bl_len << dendl; + << "bl_ofs="<< bl_ofs << ", bl_len=" << bl_len << dendl; if (!compressor.get()) { // if compressor isn't available - error, because cannot return decompressed data? @@ -147,7 +147,7 @@ int RGWGetObj_Decompress::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len q_len -= ch_len; r = next->handle_data(out_bl, q_ofs, ch_len); if (r < 0) { - lderr(cct) << "handle_data failed with exit code " << r << dendl; + lsubdout(cct, rgw, 0) << "handle_data failed with exit code " << r << dendl; return r; } out_bl.splice(0, q_ofs + ch_len); @@ -160,7 +160,7 @@ int RGWGetObj_Decompress::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len if (ch_len > 0) { r = next->handle_data(out_bl, q_ofs, ch_len); if (r < 0) { - lderr(cct) << "handle_data failed with exit code " << r << dendl; + lsubdout(cct, rgw, 0) << "handle_data failed with exit code " << r << dendl; return r; } out_bl.splice(0, q_ofs + ch_len); diff --git a/src/rgw/rgw_d3n_cacherequest.h b/src/rgw/rgw_d3n_cacherequest.h new file mode 100644 index 000000000000..1825d0cb0ff1 --- /dev/null +++ b/src/rgw/rgw_d3n_cacherequest.h @@ -0,0 +1,148 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#ifndef RGW_CACHEREQUEST_H +#define RGW_CACHEREQUEST_H + +#include +#include +#include + +#include "include/rados/librados.hpp" +#include "include/Context.h" +#include "common/async/completion.h" + +#include +#include "common/error_code.h" +#include "common/errno.h" + +#include "rgw_aio.h" +#include "rgw_cache.h" + + +struct D3nGetObjData { + std::mutex d3n_lock; +}; + +struct D3nL1CacheRequest { + ~D3nL1CacheRequest() { + lsubdout(g_ceph_context, rgw_datacache, 30) << "D3nDataCache: " << __func__ << "(): Read From Cache, comlete" << dendl; + } + + // unique_ptr with custom deleter for struct aiocb + struct libaio_aiocb_deleter { + void operator()(struct aiocb* c) { + if(c->aio_fildes > 0) { + if( ::close(c->aio_fildes) != 0) { + lsubdout(g_ceph_context, rgw_datacache, 2) << "D3nDataCache: " << __func__ << "(): Error - can't close file, errno=" << -errno << dendl; + } + } + delete c; + } + }; + + using unique_aio_cb_ptr = std::unique_ptr; + + struct AsyncFileReadOp { + bufferlist result; + unique_aio_cb_ptr aio_cb; + using Signature = void(boost::system::error_code, bufferlist); + using Completion = ceph::async::Completion; + + int init(const DoutPrefixProvider *dpp, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg) { + ldpp_dout(dpp, 20) << "D3nDataCache: " << __func__ << "(): file_path=" << file_path << dendl; + aio_cb.reset(new struct aiocb); + memset(aio_cb.get(), 0, sizeof(struct aiocb)); + aio_cb->aio_fildes = TEMP_FAILURE_RETRY(::open(file_path.c_str(), O_RDONLY|O_CLOEXEC|O_BINARY)); + if(aio_cb->aio_fildes < 0) { + int err = errno; + ldpp_dout(dpp, 1) << "ERROR: D3nDataCache: " << __func__ << "(): can't open " << file_path << " : " << cpp_strerror(err) << dendl; + return -err; + } + if (g_conf()->rgw_d3n_l1_fadvise != POSIX_FADV_NORMAL) + posix_fadvise(aio_cb->aio_fildes, 0, 0, g_conf()->rgw_d3n_l1_fadvise); + + bufferptr bp(read_len); + aio_cb->aio_buf = bp.c_str(); + result.append(std::move(bp)); + + aio_cb->aio_nbytes = read_len; + aio_cb->aio_offset = read_ofs; + aio_cb->aio_sigevent.sigev_notify = SIGEV_THREAD; + aio_cb->aio_sigevent.sigev_notify_function = libaio_cb_aio_dispatch; + aio_cb->aio_sigevent.sigev_notify_attributes = nullptr; + aio_cb->aio_sigevent.sigev_value.sival_ptr = arg; + + return 0; + } + + static void libaio_cb_aio_dispatch(sigval_t sigval) { + lsubdout(g_ceph_context, rgw_datacache, 20) << "D3nDataCache: " << __func__ << "()" << dendl; + auto p = std::unique_ptr{static_cast(sigval.sival_ptr)}; + auto op = std::move(p->user_data); + const int ret = -aio_error(op.aio_cb.get()); + boost::system::error_code ec; + if (ret < 0) { + ec.assign(-ret, boost::system::system_category()); + } + + ceph::async::dispatch(std::move(p), ec, std::move(op.result)); + } + + template + static auto create(const Executor1& ex1, CompletionHandler&& handler) { + auto p = Completion::create(ex1, std::move(handler)); + return p; + } + }; + + template + auto async_read(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& file_path, + off_t read_ofs, off_t read_len, CompletionToken&& token) { + using Op = AsyncFileReadOp; + using Signature = typename Op::Signature; + boost::asio::async_completion init(token); + auto p = Op::create(ctx.get_executor(), init.completion_handler); + auto& op = p->user_data; + + ldpp_dout(dpp, 20) << "D3nDataCache: " << __func__ << "(): file_path=" << file_path << dendl; + int ret = op.init(dpp, file_path, read_ofs, read_len, p.get()); + if(0 == ret) { + ret = ::aio_read(op.aio_cb.get()); + } + ldpp_dout(dpp, 20) << "D3nDataCache: " << __func__ << "(): ::aio_read(), ret=" << ret << dendl; + if(ret < 0) { + auto ec = boost::system::error_code{-ret, boost::system::system_category()}; + ceph::async::post(std::move(p), ec, bufferlist{}); + } else { + (void)p.release(); + } + return init.result.get(); + } + + struct d3n_libaio_handler { + rgw::Aio* throttle = nullptr; + rgw::AioResult& r; + // read callback + void operator()(boost::system::error_code ec, bufferlist bl) const { + r.result = -ec.value(); + r.data = std::move(bl); + throttle->put(r); + } + }; + + void file_aio_read_abstract(const DoutPrefixProvider *dpp, boost::asio::io_context& context, spawn::yield_context yield, + std::string& file_path, off_t read_ofs, off_t read_len, + rgw::Aio* aio, rgw::AioResult& r) { + using namespace boost::asio; + async_completion init(yield); + auto ex = get_associated_executor(init.completion_handler); + + auto& ref = r.obj.get_ref(); + ldpp_dout(dpp, 20) << "D3nDataCache: " << __func__ << "(): oid=" << ref.obj.oid << dendl; + async_read(dpp, context, file_path+"/"+ref.obj.oid, read_ofs, read_len, bind_executor(ex, d3n_libaio_handler{aio, r})); + } + +}; + +#endif diff --git a/src/rgw/rgw_d3n_datacache.cc b/src/rgw/rgw_d3n_datacache.cc new file mode 100644 index 000000000000..37012e5455c3 --- /dev/null +++ b/src/rgw/rgw_d3n_datacache.cc @@ -0,0 +1,368 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#include "rgw_d3n_datacache.h" +#include "rgw_rest_client.h" +#include "rgw_auth_s3.h" +#include "rgw_op.h" +#include "rgw_common.h" +#include "rgw_auth_s3.h" +#include "rgw_op.h" +#include "rgw_crypt_sanitize.h" + +#if __has_include() +#include +namespace efs = std::filesystem; +#else +#include +namespace efs = std::experimental::filesystem; +#endif + +#define dout_subsys ceph_subsys_rgw + + + +int D3nCacheAioWriteRequest::d3n_prepare_libaio_write_op(bufferlist& bl, unsigned int len, string oid, string cache_location) +{ + std::string location = cache_location + oid; + int r = 0; + + lsubdout(g_ceph_context, rgw_datacache, 20) << "D3nDataCache: " << __func__ << "(): Write To Cache, location=" << location << dendl; + cb = new struct aiocb; + mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; + memset(cb, 0, sizeof(struct aiocb)); + r = fd = ::open(location.c_str(), O_WRONLY | O_CREAT | O_TRUNC, mode); + if (fd < 0) { + ldout(cct, 0) << "ERROR: D3nCacheAioWriteRequest::create_io: open file failed, errno=" << errno << ", location='" << location.c_str() << "'" << dendl; + goto done; + } + if (g_conf()->rgw_d3n_l1_fadvise != POSIX_FADV_NORMAL) + posix_fadvise(fd, 0, 0, g_conf()->rgw_d3n_l1_fadvise); + cb->aio_fildes = fd; + + data = malloc(len); + if (!data) { + ldout(cct, 0) << "ERROR: D3nCacheAioWriteRequest::create_io: memory allocation failed" << dendl; + goto close_file; + } + cb->aio_buf = data; + memcpy((void*)data, bl.c_str(), len); + cb->aio_nbytes = len; + goto done; + +close_file: + ::close(fd); +done: + return r; +} + +D3nDataCache::D3nDataCache() + : cct(nullptr), io_type(_io_type::ASYNC_IO), free_data_cache_size(0), outstanding_write_size(0) +{ + lsubdout(g_ceph_context, rgw_datacache, 5) << "D3nDataCache: " << __func__ << "()" << dendl; +} + +void D3nDataCache::init(CephContext *_cct) { + cct = _cct; + free_data_cache_size = cct->_conf->rgw_d3n_l1_datacache_size; + head = nullptr; + tail = nullptr; + cache_location = cct->_conf->rgw_d3n_l1_datacache_persistent_path; + if(cache_location.back() != '/') { + cache_location += "/"; + } + try { + if (efs::exists(cache_location)) { + // d3n: evict the cache storage directory + if (g_conf()->rgw_d3n_l1_evict_cache_on_start) { + lsubdout(g_ceph_context, rgw, 5) << "D3nDataCache: init: evicting the persistent storage directory on start" << dendl; + for (auto& p : efs::directory_iterator(cache_location)) { + efs::remove_all(p.path()); + } + } + } else { + // create the cache storage directory + lsubdout(g_ceph_context, rgw, 5) << "D3nDataCache: init: creating the persistent storage directory on start" << dendl; + efs::create_directories(cache_location); + } + } catch (const efs::filesystem_error& e) { + lderr(g_ceph_context) << "D3nDataCache: init: ERROR initializing the cache storage directory '" << cache_location << + "' : " << e.what() << dendl; + } + + auto conf_eviction_policy = cct->_conf.get_val("rgw_d3n_l1_eviction_policy"); + ceph_assert(conf_eviction_policy == "lru" || conf_eviction_policy == "random"); + if (conf_eviction_policy == "lru") + eviction_policy = _eviction_policy::LRU; + if (conf_eviction_policy == "random") + eviction_policy = _eviction_policy::RANDOM; + + // libaio setup + struct aioinit ainit{0}; + ainit.aio_threads = cct->_conf.get_val("rgw_d3n_libaio_aio_threads");; + ainit.aio_num = cct->_conf.get_val("rgw_d3n_libaio_aio_num"); + ainit.aio_idle_time = 120; + aio_init(&ainit); +} + +int D3nDataCache::d3n_io_write(bufferlist& bl, unsigned int len, std::string oid) +{ + D3nChunkDataInfo* chunk_info = new D3nChunkDataInfo; + std::string location = cache_location + oid; + + lsubdout(g_ceph_context, rgw_datacache, 20) << "D3nDataCache: " << __func__ << "(): location=" << location << dendl; + FILE *cache_file = nullptr; + int r = 0; + size_t nbytes = 0; + + cache_file = fopen(location.c_str(), "w+"); + if (cache_file == nullptr) { + ldout(cct, 0) << "ERROR: D3nDataCache::fopen file has return error, errno=" << errno << dendl; + return -errno; + } + + nbytes = fwrite(bl.c_str(), 1, len, cache_file); + if (nbytes != len) { + ldout(cct, 0) << "ERROR: D3nDataCache::io_write: fwrite has returned error: nbytes!=len, nbytes=" << nbytes << ", len=" << len << dendl; + return -EIO; + } + + r = fclose(cache_file); + if (r != 0) { + ldout(cct, 0) << "ERROR: D3nDataCache::fclsoe file has return error, errno=" << errno << dendl; + return -errno; + } + + { // update cahce_map entries for new chunk in cache + const std::lock_guard l(d3n_cache_lock); + chunk_info->oid = oid; + chunk_info->set_ctx(cct); + chunk_info->size = len; + d3n_cache_map.insert(pair(oid, chunk_info)); + } + + return r; +} + +void d3n_libaio_write_cb(sigval_t sigval) +{ + lsubdout(g_ceph_context, rgw_datacache, 30) << "D3nDataCache: " << __func__ << "()" << dendl; + D3nCacheAioWriteRequest* c = static_cast(sigval.sival_ptr); + c->priv_data->d3n_libaio_write_completion_cb(c); +} + + +void D3nDataCache::d3n_libaio_write_completion_cb(D3nCacheAioWriteRequest* c) +{ + D3nChunkDataInfo* chunk_info{nullptr}; + + ldout(cct, 5) << "D3nDataCache: " << __func__ << "(): oid=" << c->oid << dendl; + + { // update cache_map entries for new chunk in cache + const std::lock_guard l(d3n_cache_lock); + auto it = d3n_outstanding_write_list.find(c->oid); + if (it != d3n_outstanding_write_list.end()) { + d3n_outstanding_write_list.erase(it); + } + chunk_info = new D3nChunkDataInfo; + chunk_info->oid = c->oid; + chunk_info->set_ctx(cct); + chunk_info->size = c->cb->aio_nbytes; + d3n_cache_map.insert(pair(c->oid, chunk_info)); + } + + { // update free size + const std::lock_guard l(d3n_eviction_lock); + free_data_cache_size -= c->cb->aio_nbytes; + outstanding_write_size -= c->cb->aio_nbytes; + lru_insert_head(chunk_info); + } + delete c; + c = nullptr; +} + +int D3nDataCache::d3n_libaio_create_write_request(bufferlist& bl, unsigned int len, std::string oid) +{ + lsubdout(g_ceph_context, rgw_datacache, 30) << "D3nDataCache: " << __func__ << "(): Write To Cache, oid=" << oid << ", len=" << len << dendl; + struct D3nCacheAioWriteRequest* wr = new struct D3nCacheAioWriteRequest(cct); + int r=0; + if ((r = wr->d3n_prepare_libaio_write_op(bl, len, oid, cache_location)) < 0) { + ldout(cct, 0) << "ERROR: D3nDataCache: " << __func__ << "() prepare libaio write op r=" << r << dendl; + goto done; + } + wr->cb->aio_sigevent.sigev_notify = SIGEV_THREAD; + wr->cb->aio_sigevent.sigev_notify_function = d3n_libaio_write_cb; + wr->cb->aio_sigevent.sigev_notify_attributes = nullptr; + wr->cb->aio_sigevent.sigev_value.sival_ptr = (void*)wr; + wr->oid = oid; + wr->priv_data = this; + + if ((r = ::aio_write(wr->cb)) != 0) { + ldout(cct, 0) << "ERROR: D3nDataCache: " << __func__ << "() aio_write r=" << r << dendl; + goto error; + } + return 0; + +error: + delete wr; +done: + return r; +} + +void D3nDataCache::put(bufferlist& bl, unsigned int len, std::string& oid) +{ + int r = 0; + uint64_t freed_size = 0, _free_data_cache_size = 0, _outstanding_write_size = 0; + + ldout(cct, 10) << "D3nDataCache::" << __func__ << "(): oid=" << oid << dendl; + { + const std::lock_guard l(d3n_cache_lock); + std::unordered_map::iterator iter = d3n_cache_map.find(oid); + if (iter != d3n_cache_map.end()) { + ldout(cct, 10) << "D3nDataCache::" << __func__ << "(): data already cached, no rewrite" << dendl; + return; + } + auto it = d3n_outstanding_write_list.find(oid); + if (it != d3n_outstanding_write_list.end()) { + ldout(cct, 10) << "D3nDataCache: NOTE: data put in cache already issued, no rewrite" << dendl; + return; + } + d3n_outstanding_write_list.insert(oid); + } + { + const std::lock_guard l(d3n_eviction_lock); + _free_data_cache_size = free_data_cache_size; + _outstanding_write_size = outstanding_write_size; + } + ldout(cct, 20) << "D3nDataCache: Before eviction _free_data_cache_size:" << _free_data_cache_size << ", _outstanding_write_size:" << _outstanding_write_size << ", freed_size:" << freed_size << dendl; + while (len >= (_free_data_cache_size - _outstanding_write_size + freed_size)) { + ldout(cct, 20) << "D3nDataCache: enter eviction, r=" << r << dendl; + if (eviction_policy == _eviction_policy::LRU) { + r = lru_eviction(); + } else if (eviction_policy == _eviction_policy::RANDOM) { + r = random_eviction(); + } else { + ldout(cct, 0) << "D3nDataCache: Warning: unknown cache eviction policy, defaulting to lru eviction" << dendl; + r = lru_eviction(); + } + if (r < 0) + return; + freed_size += r; + } + r = d3n_libaio_create_write_request(bl, len, oid); + if (r < 0) { + const std::lock_guard l(d3n_cache_lock); + auto it = d3n_outstanding_write_list.find(oid); + if (it != d3n_outstanding_write_list.end()) { + d3n_outstanding_write_list.erase(it); + } + ldout(cct, 1) << "D3nDataCache: create_aio_write_request fail, r=" << r << dendl; + return; + } + + const std::lock_guard l(d3n_eviction_lock); + free_data_cache_size += freed_size; + outstanding_write_size += len; +} + +bool D3nDataCache::get(const string& oid, const off_t len) +{ + const std::lock_guard l(d3n_cache_lock); + bool exist = false; + string location = cache_location + oid; + + lsubdout(g_ceph_context, rgw_datacache, 20) << "D3nDataCache: " << __func__ << "(): location=" << location << dendl; + std::unordered_map::iterator iter = d3n_cache_map.find(oid); + if (!(iter == d3n_cache_map.end())) { + // check inside cache whether file exists or not!!!! then make exist true; + struct D3nChunkDataInfo* chdo = iter->second; + struct stat st; + int r = stat(location.c_str(), &st); + if ( r != -1 && st.st_size == len) { // file exists and containes required data range length + exist = true; + /*LRU*/ + /*get D3nChunkDataInfo*/ + const std::lock_guard l(d3n_eviction_lock); + lru_remove(chdo); + lru_insert_head(chdo); + } else { + d3n_cache_map.erase(oid); + const std::lock_guard l(d3n_eviction_lock); + lru_remove(chdo); + delete chdo; + exist = false; + } + } + return exist; +} + +size_t D3nDataCache::random_eviction() +{ + lsubdout(g_ceph_context, rgw_datacache, 20) << "D3nDataCache: " << __func__ << "()" << dendl; + int n_entries = 0; + int random_index = 0; + size_t freed_size = 0; + D3nChunkDataInfo* del_entry; + string del_oid, location; + { + const std::lock_guard l(d3n_cache_lock); + n_entries = d3n_cache_map.size(); + if (n_entries <= 0) { + return -1; + } + srand (time(NULL)); + random_index = ceph::util::generate_random_number(0, n_entries-1); + std::unordered_map::iterator iter = d3n_cache_map.begin(); + std::advance(iter, random_index); + del_oid = iter->first; + del_entry = iter->second; + ldout(cct, 20) << "D3nDataCache: random_eviction: index:" << random_index << ", free size: " << del_entry->size << dendl; + freed_size = del_entry->size; + delete del_entry; + del_entry = nullptr; + d3n_cache_map.erase(del_oid); // oid + } + + location = cache_location + del_oid; + remove(location.c_str()); + return freed_size; +} + +size_t D3nDataCache::lru_eviction() +{ + lsubdout(g_ceph_context, rgw_datacache, 20) << "D3nDataCache: " << __func__ << "()" << dendl; + int n_entries = 0; + size_t freed_size = 0; + D3nChunkDataInfo* del_entry; + string del_oid, location; + + { + const std::lock_guard l(d3n_eviction_lock); + del_entry = tail; + if (del_entry == nullptr) { + ldout(cct, 2) << "D3nDataCache: lru_eviction: del_entry=null_ptr" << dendl; + return 0; + } + lru_remove(del_entry); + } + + { + const std::lock_guard l(d3n_cache_lock); + n_entries = d3n_cache_map.size(); + if (n_entries <= 0) { + ldout(cct, 2) << "D3nDataCache: lru_eviction: cache_map.size<=0" << dendl; + return -1; + } + del_oid = del_entry->oid; + ldout(cct, 20) << "D3nDataCache: lru_eviction: oid to remove: " << del_oid << dendl; + std::unordered_map::iterator iter = d3n_cache_map.find(del_oid); + if (iter != d3n_cache_map.end()) { + d3n_cache_map.erase(iter); // oid + } + } + freed_size = del_entry->size; + delete del_entry; + location = cache_location + del_oid; + remove(location.c_str()); + return freed_size; +} diff --git a/src/rgw/rgw_d3n_datacache.h b/src/rgw/rgw_d3n_datacache.h new file mode 100644 index 000000000000..2d62ae4dea2c --- /dev/null +++ b/src/rgw/rgw_d3n_datacache.h @@ -0,0 +1,261 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#ifndef CEPH_RGWD3NDATACACHE_H +#define CEPH_RGWD3NDATACACHE_H + +#include "rgw_rados.h" +#include + +#include "rgw_common.h" + +#include +#include +#include "include/Context.h" +#include "include/lru.h" +#include "rgw_d3n_cacherequest.h" + + +/*D3nDataCache*/ +struct D3nDataCache; + + +struct D3nChunkDataInfo : public LRUObject { + CephContext *cct; + uint64_t size; + time_t access_time; + string address; + string oid; + bool complete; + struct D3nChunkDataInfo* lru_prev; + struct D3nChunkDataInfo* lru_next; + + D3nChunkDataInfo(): size(0) {} + + void set_ctx(CephContext *_cct) { + cct = _cct; + } + + void dump(Formatter *f) const; + static void generate_test_instances(list& o); +}; + +struct D3nCacheAioWriteRequest { + string oid; + void *data; + int fd; + struct aiocb *cb; + D3nDataCache *priv_data; + CephContext *cct; + + D3nCacheAioWriteRequest(CephContext *_cct) : cct(_cct) {} + int d3n_prepare_libaio_write_op(bufferlist& bl, unsigned int len, string oid, string cache_location); + + ~D3nCacheAioWriteRequest() { + ::close(fd); + cb->aio_buf = nullptr; + free(data); + data = nullptr; + delete(cb); + } +}; + +struct D3nDataCache { + +private: + std::unordered_map d3n_cache_map; + std::set d3n_outstanding_write_list; + std::mutex d3n_cache_lock; + std::mutex d3n_eviction_lock; + + CephContext *cct; + enum class _io_type { + SYNC_IO = 1, + ASYNC_IO = 2, + SEND_FILE = 3 + } io_type; + enum class _eviction_policy { + LRU=0, RANDOM=1 + } eviction_policy; + + struct sigaction action; + uint64_t free_data_cache_size = 0; + uint64_t outstanding_write_size = 0; + struct D3nChunkDataInfo* head; + struct D3nChunkDataInfo* tail; + +private: + void add_io(); + +public: + D3nDataCache(); + ~D3nDataCache() { + while (lru_eviction() > 0); + } + + std::string cache_location; + + bool get(const string& oid, const off_t len); + void put(bufferlist& bl, unsigned int len, string& obj_key); + int d3n_io_write(bufferlist& bl, unsigned int len, std::string oid); + int d3n_libaio_create_write_request(bufferlist& bl, unsigned int len, std::string oid); + void d3n_libaio_write_completion_cb(D3nCacheAioWriteRequest* c); + size_t random_eviction(); + size_t lru_eviction(); + + void init(CephContext *_cct); + + void lru_insert_head(struct D3nChunkDataInfo* o) { + lsubdout(g_ceph_context, rgw_datacache, 30) << "D3nDataCache: " << __func__ << "()" << dendl; + o->lru_next = head; + o->lru_prev = nullptr; + if (head) { + head->lru_prev = o; + } else { + tail = o; + } + head = o; + } + + void lru_insert_tail(struct D3nChunkDataInfo* o) { + lsubdout(g_ceph_context, rgw_datacache, 30) << "D3nDataCache: " << __func__ << "()" << dendl; + o->lru_next = nullptr; + o->lru_prev = tail; + if (tail) { + tail->lru_next = o; + } else { + head = o; + } + tail = o; + } + + void lru_remove(struct D3nChunkDataInfo* o) { + lsubdout(g_ceph_context, rgw_datacache, 30) << "D3nDataCache: " << __func__ << "()" << dendl; + if (o->lru_next) + o->lru_next->lru_prev = o->lru_prev; + else + tail = o->lru_prev; + if (o->lru_prev) + o->lru_prev->lru_next = o->lru_next; + else + head = o->lru_next; + o->lru_next = o->lru_prev = nullptr; + } +}; + + +template +class D3nRGWDataCache : public T { + +public: + D3nRGWDataCache() {} + + int init_rados() override { + int ret; + ret = T::init_rados(); + if (ret < 0) + return ret; + + return 0; + } + + int get_obj_iterate_cb(const DoutPrefixProvider *dpp, const rgw_raw_obj& read_obj, off_t obj_ofs, + off_t read_ofs, off_t len, bool is_head_obj, + RGWObjState *astate, void *arg) override; +}; + +template +int D3nRGWDataCache::get_obj_iterate_cb(const DoutPrefixProvider *dpp, const rgw_raw_obj& read_obj, off_t obj_ofs, + off_t read_ofs, off_t len, bool is_head_obj, + RGWObjState *astate, void *arg) { + lsubdout(g_ceph_context, rgw_datacache, 30) << "D3nDataCache::" << __func__ << "(): is head object : " << is_head_obj << dendl; + librados::ObjectReadOperation op; + struct get_obj_data* d = static_cast(arg); + string oid, key; + + if (is_head_obj) { + // only when reading from the head object do we need to do the atomic test + int r = T::append_atomic_test(dpp, astate, op); + if (r < 0) + return r; + + if (astate && + obj_ofs < astate->data.length()) { + unsigned chunk_len = std::min((uint64_t)astate->data.length() - obj_ofs, (uint64_t)len); + + r = d->client_cb->handle_data(astate->data, obj_ofs, chunk_len); + if (r < 0) + return r; + + len -= chunk_len; + d->offset += chunk_len; + read_ofs += chunk_len; + obj_ofs += chunk_len; + if (!len) + return 0; + } + + auto obj = d->rgwrados->svc.rados->obj(read_obj); + r = obj.open(dpp); + if (r < 0) { + lsubdout(g_ceph_context, rgw, 4) << "failed to open rados context for " << read_obj << dendl; + return r; + } + + ldpp_dout(dpp, 20) << "D3nDataCache::" << __func__ << "(): oid=" << read_obj.oid << " obj-ofs=" << obj_ofs << " read_ofs=" << read_ofs << " len=" << len << dendl; + op.read(read_ofs, len, nullptr, nullptr); + + const uint64_t cost = len; + const uint64_t id = obj_ofs; // use logical object offset for sorting replies + + auto completed = d->aio->get(obj, rgw::Aio::librados_op(std::move(op), d->yield), cost, id); + return d->flush(std::move(completed)); + } else { + ldpp_dout(dpp, 20) << "D3nDataCache::" << __func__ << "(): oid=" << read_obj.oid << ", is_head_obj=" << is_head_obj << ", obj-ofs=" << obj_ofs << ", read_ofs=" << read_ofs << ", len=" << len << dendl; + int r; + + op.read(read_ofs, len, nullptr, nullptr); + + const uint64_t cost = len; + const uint64_t id = obj_ofs; // use logical object offset for sorting replies + oid = read_obj.oid; + + auto obj = d->rgwrados->svc.rados->obj(read_obj); + r = obj.open(dpp); + if (r < 0) { + lsubdout(g_ceph_context, rgw, 0) << "D3nDataCache: Error: failed to open rados context for " << read_obj << ", r=" << r << dendl; + return r; + } + + const bool is_compressed = (astate->attrset.find(RGW_ATTR_COMPRESSION) != astate->attrset.end()); + const bool is_encrypted = (astate->attrset.find(RGW_ATTR_CRYPT_MODE) != astate->attrset.end()); + if (read_ofs != 0 || astate->size != astate->accounted_size || is_compressed || is_encrypted) { + d->d3n_bypass_cache_write = true; + lsubdout(g_ceph_context, rgw, 5) << "D3nDataCache: " << __func__ << "(): Note - bypassing datacache: oid=" << read_obj.oid << ", read_ofs!=0 = " << read_ofs << ", size=" << astate->size << " != accounted_size=" << astate->accounted_size << ", is_compressed=" << is_compressed << ", is_encrypted=" << is_encrypted << dendl; + auto completed = d->aio->get(obj, rgw::Aio::librados_op(std::move(op), d->yield), cost, id); + r = d->flush(std::move(completed)); + return r; + } + + if (d->rgwrados->d3n_data_cache->get(oid, len)) { + // Read From Cache + ldpp_dout(dpp, 20) << "D3nDataCache: " << __func__ << "(): READ FROM CACHE: oid=" << read_obj.oid << ", obj-ofs=" << obj_ofs << ", read_ofs=" << read_ofs << ", len=" << len << dendl; + auto completed = d->aio->get(obj, rgw::Aio::d3n_cache_op(dpp, d->yield, read_ofs, len, d->rgwrados->d3n_data_cache->cache_location), cost, id); + r = d->flush(std::move(completed)); + if (r < 0) { + lsubdout(g_ceph_context, rgw, 0) << "D3nDataCache: " << __func__ << "(): Error: failed to drain/flush, r= " << r << dendl; + } + return r; + } else { + // Write To Cache + ldpp_dout(dpp, 20) << "D3nDataCache: " << __func__ << "(): WRITE TO CACHE: oid=" << read_obj.oid << ", obj-ofs=" << obj_ofs << ", read_ofs=" << read_ofs << " len=" << len << dendl; + auto completed = d->aio->get(obj, rgw::Aio::librados_op(std::move(op), d->yield), cost, id); + return d->flush(std::move(completed)); + } + } + lsubdout(g_ceph_context, rgw, 1) << "D3nDataCache: " << __func__ << "(): Warning: Check head object cache handling flow, oid=" << read_obj.oid << dendl; + + return 0; +} + +#endif diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc index 842ec1b38510..a1e50550f87b 100644 --- a/src/rgw/rgw_main.cc +++ b/src/rgw/rgw_main.cc @@ -323,9 +323,33 @@ int radosgw_Main(int argc, const char **argv) rgw_http_client_init(g_ceph_context); rgw_kmip_client_init(*new RGWKMIPManagerImpl(g_ceph_context)); + lsubdout(cct, rgw, 1) << "rgw_d3n: rgw_d3n_l1_local_datacache_enabled=" << cct->_conf->rgw_d3n_l1_local_datacache_enabled << dendl; + if (cct->_conf->rgw_d3n_l1_local_datacache_enabled) { + lsubdout(cct, rgw, 1) << "rgw_d3n: rgw_enable_ops_log=" << cct->_conf->rgw_enable_ops_log << dendl; + lsubdout(cct, rgw, 1) << "rgw_d3n: rgw_d3n_l1_datacache_persistent_path='" << cct->_conf->rgw_d3n_l1_datacache_persistent_path << "'" << dendl; + lsubdout(cct, rgw, 1) << "rgw_d3n: rgw_d3n_l1_datacache_size=" << cct->_conf->rgw_d3n_l1_datacache_size << dendl; + lsubdout(cct, rgw, 1) << "rgw_d3n: rgw_d3n_l1_evict_cache_on_start=" << cct->_conf->rgw_d3n_l1_evict_cache_on_start << dendl; + lsubdout(cct, rgw, 1) << "rgw_d3n: rgw_d3n_l1_fadvise=" << cct->_conf->rgw_d3n_l1_fadvise << dendl; + lsubdout(cct, rgw, 1) << "rgw_d3n: rgw_d3n_l1_eviction_policy=" << cct->_conf->rgw_d3n_l1_eviction_policy << dendl; + } + bool rgw_d3n_datacache_enabled = cct->_conf->rgw_d3n_l1_local_datacache_enabled; + if (rgw_d3n_datacache_enabled && !cct->_conf->rgw_enable_ops_log) { + lsubdout(cct, rgw_datacache, 0) << "rgw_d3n: WARNING: D3N DataCache disabling (D3N requires that rgw_enable_ops_log will be enabled also)" << dendl; + rgw_d3n_datacache_enabled = false; + } + if (rgw_d3n_datacache_enabled && (cct->_conf->rgw_max_chunk_size != cct->_conf->rgw_obj_stripe_size)) { + lsubdout(cct, rgw_datacache, 0) << "rgw_d3n: WARNING: D3N DataCache disabling (D3N requires that the chunk_size equals stripe_size)" << dendl; + rgw_d3n_datacache_enabled = false; + } + if (rgw_d3n_datacache_enabled && !cct->_conf->rgw_beast_enable_async) { + lsubdout(cct, rgw_datacache, 0) << "rgw_d3n: WARNING: D3N DataCache disabling (D3N requires yield context - rgw_beast_enable_async=true)" << dendl; + rgw_d3n_datacache_enabled = false; + } + lsubdout(cct, rgw, 1) << "D3N datacache enabled: " << rgw_d3n_datacache_enabled << dendl; + rgw::sal::Store* store = StoreManager::get_storage(&dp, g_ceph_context, - "rados", + (!rgw_d3n_datacache_enabled) ? "rados" : "d3n", g_conf()->rgw_enable_gc_threads, g_conf()->rgw_enable_lc_threads, g_conf()->rgw_enable_quota_threads, diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 427c238c4cca..d32341cfb6d8 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -86,6 +86,8 @@ using namespace librados; #include "compressor/Compressor.h" +#include "rgw_d3n_datacache.h" + #ifdef WITH_LTTNG #define TRACEPOINT_DEFINE #define TRACEPOINT_PROBE_DYNAMIC_LINKAGE @@ -689,7 +691,7 @@ int RGWRados::get_required_alignment(const DoutPrefixProvider *dpp, const rgw_po bool requires; r = ioctx.pool_requires_alignment2(&requires); if (r < 0) { - ldpp_dout(dpp, 0) << "ERROR: ioctx.pool_requires_alignment2() returned " + ldpp_dout(dpp, 0) << "ERROR: ioctx.pool_requires_alignment2() returned " << r << dendl; return r; } @@ -702,7 +704,7 @@ int RGWRados::get_required_alignment(const DoutPrefixProvider *dpp, const rgw_po uint64_t align; r = ioctx.pool_required_alignment2(&align); if (r < 0) { - ldpp_dout(dpp, 0) << "ERROR: ioctx.pool_required_alignment2() returned " + ldpp_dout(dpp, 0) << "ERROR: ioctx.pool_required_alignment2() returned " << r << dendl; return r; } @@ -1079,6 +1081,8 @@ void RGWRados::finalize() delete binfo_cache; delete obj_tombstone_cache; + if (d3n_data_cache) + delete d3n_data_cache; if (reshard_wait.get()) { reshard_wait->stop(); @@ -1119,6 +1123,12 @@ int RGWRados::init_rados() } cr_registry = crs.release(); + + if (use_datacache) { + d3n_data_cache = new D3nDataCache(); + d3n_data_cache->init(cct); + } + return ret; } @@ -2056,7 +2066,7 @@ done: * is_truncated: if number of objects in the bucket is bigger than max, then * truncated. */ -int RGWRados::Bucket::List::list_objects_unordered(const DoutPrefixProvider *dpp, +int RGWRados::Bucket::List::list_objects_unordered(const DoutPrefixProvider *dpp, int64_t max_p, vector *result, map *common_prefixes, @@ -2103,7 +2113,7 @@ int RGWRados::Bucket::List::list_objects_unordered(const DoutPrefixProvider *dpp std::vector ent_list; ent_list.reserve(read_ahead); - int r = store->cls_bucket_list_unordered(dpp, + int r = store->cls_bucket_list_unordered(dpp, target->get_bucket_info(), shard_id, cur_marker, @@ -2497,7 +2507,7 @@ int RGWRados::fix_head_obj_locator(const DoutPrefixProvider *dpp, const RGWBucke return 0; } -int RGWRados::move_rados_obj(const DoutPrefixProvider *dpp, +int RGWRados::move_rados_obj(const DoutPrefixProvider *dpp, librados::IoCtx& src_ioctx, const string& src_oid, const string& src_locator, librados::IoCtx& dst_ioctx, @@ -2750,7 +2760,7 @@ int RGWRados::BucketShard::init(const DoutPrefixProvider *dpp, const RGWBucketIn /* Execute @handler on last item in bucket listing for bucket specified * in @bucket_info. @obj_prefix and @obj_delim narrow down the listing * to objects matching these criterias. */ -int RGWRados::on_last_entry_in_listing(const DoutPrefixProvider *dpp, +int RGWRados::on_last_entry_in_listing(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, const std::string& obj_prefix, const std::string& obj_delim, @@ -3336,8 +3346,9 @@ class RGWRadosPutObj : public RGWHTTPStreamRWRequest::ReceiveCB uint64_t ofs{0}; uint64_t lofs{0}; /* logical ofs */ std::function&)> attrs_handler; + public: - RGWRadosPutObj(const DoutPrefixProvider *dpp, + RGWRadosPutObj(const DoutPrefixProvider *dpp, CephContext* cct, CompressorRef& plugin, boost::optional& compressor, @@ -3356,6 +3367,7 @@ public: progress_data(_progress_data), attrs_handler(_attrs_handler) {} + int process_attrs(void) { if (extra_data_bl.length()) { JSONParser jp; @@ -4124,7 +4136,7 @@ set_err_state: } -int RGWRados::copy_obj_to_remote_dest(const DoutPrefixProvider *dpp, +int RGWRados::copy_obj_to_remote_dest(const DoutPrefixProvider *dpp, RGWObjState *astate, map& src_attrs, RGWRados::Object::Read& read_op, @@ -4625,7 +4637,7 @@ int RGWRados::check_bucket_empty(const DoutPrefixProvider *dpp, RGWBucketInfo& b std::vector ent_list; ent_list.reserve(NUM_ENTRIES); - int r = cls_bucket_list_unordered(dpp, + int r = cls_bucket_list_unordered(dpp, bucket_info, RGW_NO_SHARD, marker, @@ -5680,7 +5692,7 @@ void RGWRados::Object::invalidate_state() ctx.invalidate(obj); } -int RGWRados::Object::prepare_atomic_modification(const DoutPrefixProvider *dpp, +int RGWRados::Object::prepare_atomic_modification(const DoutPrefixProvider *dpp, ObjectWriteOperation& op, bool reset_obj, const string *ptag, const char *if_match, const char *if_nomatch, bool removal_op, bool modify_tail, optional_yield y) @@ -6175,7 +6187,7 @@ int RGWRados::Bucket::UpdateIndex::complete(const DoutPrefixProvider *dpp, int64 return ret; } -int RGWRados::Bucket::UpdateIndex::complete_del(const DoutPrefixProvider *dpp, +int RGWRados::Bucket::UpdateIndex::complete_del(const DoutPrefixProvider *dpp, int64_t poolid, uint64_t epoch, real_time& removed_mtime, list *remove_objs) @@ -6344,68 +6356,49 @@ int RGWRados::Object::Read::read(int64_t ofs, int64_t end, bufferlist& bl, optio return bl.length(); } -struct get_obj_data { - RGWRados* store; - RGWGetDataCB* client_cb; - rgw::Aio* aio; - uint64_t offset; // next offset to write to client - rgw::AioResultList completed; // completed read results, sorted by offset - optional_yield yield; - - get_obj_data(RGWRados* store, RGWGetDataCB* cb, rgw::Aio* aio, - uint64_t offset, optional_yield yield) - : store(store), client_cb(cb), aio(aio), offset(offset), yield(yield) {} - - int flush(rgw::AioResultList&& results) { - int r = rgw::check_for_errors(results); - if (r < 0) { - return r; - } +int get_obj_data::flush(rgw::AioResultList&& results) { + int r = rgw::check_for_errors(results); + if (r < 0) { + return r; + } + std::list bl_list; - auto cmp = [](const auto& lhs, const auto& rhs) { return lhs.id < rhs.id; }; - results.sort(cmp); // merge() requires results to be sorted first - completed.merge(results, cmp); // merge results in sorted order + auto cmp = [](const auto& lhs, const auto& rhs) { return lhs.id < rhs.id; }; + results.sort(cmp); // merge() requires results to be sorted first + completed.merge(results, cmp); // merge results in sorted order - while (!completed.empty() && completed.front().id == offset) { - auto bl = std::move(completed.front().data); - completed.pop_front_and_dispose(std::default_delete{}); + while (!completed.empty() && completed.front().id == offset) { + auto bl = std::move(completed.front().data); - offset += bl.length(); - int r = client_cb->handle_data(bl, 0, bl.length()); - if (r < 0) { - return r; - } + bl_list.push_back(bl); + offset += bl.length(); + int r = client_cb->handle_data(bl, 0, bl.length()); + if (r < 0) { + return r; } - return 0; - } - - void cancel() { - // wait for all completions to drain and ignore the results - aio->drain(); - } - int drain() { - auto c = aio->wait(); - while (!c.empty()) { - int r = flush(std::move(c)); - if (r < 0) { - cancel(); - return r; + if (rgwrados->get_use_datacache()) { + const std::lock_guard l(d3n_get_data.d3n_lock); + auto oid = completed.front().obj.get_ref().obj.oid; + if (bl.length() <= g_conf()->rgw_get_obj_max_req_size && !d3n_bypass_cache_write) { + lsubdout(g_ceph_context, rgw_datacache, 10) << "D3nDataCache: " << __func__ << "(): bl.length <= rgw_get_obj_max_req_size (default 4MB) - write to datacache, bl.length=" << bl.length() << dendl; + rgwrados->d3n_data_cache->put(bl, bl.length(), oid); + } else { + lsubdout(g_ceph_context, rgw_datacache, 10) << "D3nDataCache: " << __func__ << "(): not writing to datacache - bl.length > rgw_get_obj_max_req_size (default 4MB), bl.length=" << bl.length() << " or d3n_bypass_cache_write=" << d3n_bypass_cache_write << dendl; } - c = aio->wait(); } - return flush(std::move(c)); + completed.pop_front_and_dispose(std::default_delete{}); } -}; + return 0; +} -static int _get_obj_iterate_cb(const DoutPrefixProvider *dpp, +static int _get_obj_iterate_cb(const DoutPrefixProvider *dpp, const rgw_raw_obj& read_obj, off_t obj_ofs, off_t read_ofs, off_t len, bool is_head_obj, RGWObjState *astate, void *arg) { - struct get_obj_data *d = (struct get_obj_data *)arg; - - return d->store->get_obj_iterate_cb(dpp, read_obj, obj_ofs, read_ofs, len, + struct get_obj_data* d = static_cast(arg); + return d->rgwrados->get_obj_iterate_cb(dpp, read_obj, obj_ofs, read_ofs, len, is_head_obj, astate, arg); } @@ -6415,7 +6408,7 @@ int RGWRados::get_obj_iterate_cb(const DoutPrefixProvider *dpp, RGWObjState *astate, void *arg) { ObjectReadOperation op; - struct get_obj_data *d = (struct get_obj_data *)arg; + struct get_obj_data* d = static_cast(arg); string oid, key; if (is_head_obj) { @@ -6441,7 +6434,7 @@ int RGWRados::get_obj_iterate_cb(const DoutPrefixProvider *dpp, } } - auto obj = d->store->svc.rados->obj(read_obj); + auto obj = d->rgwrados->svc.rados->obj(read_obj); int r = obj.open(dpp); if (r < 0) { ldpp_dout(dpp, 4) << "failed to open rados context for " << read_obj << dendl; @@ -6678,7 +6671,7 @@ int RGWRados::olh_init_modification(const DoutPrefixProvider *dpp, const RGWBuck return ret; } -int RGWRados::guard_reshard(const DoutPrefixProvider *dpp, +int RGWRados::guard_reshard(const DoutPrefixProvider *dpp, BucketShard *bs, const rgw_obj& obj_instance, const RGWBucketInfo& bucket_info, @@ -6914,7 +6907,7 @@ int RGWRados::bucket_index_unlink_instance(const DoutPrefixProvider *dpp, const return 0; } -int RGWRados::bucket_index_read_olh_log(const DoutPrefixProvider *dpp, +int RGWRados::bucket_index_read_olh_log(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& obj_instance, uint64_t ver_marker, map > *log, @@ -7453,7 +7446,7 @@ int RGWRados::get_olh(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket return decode_olh_info(dpp, cct, iter->second, olh); } -void RGWRados::check_pending_olh_entries(const DoutPrefixProvider *dpp, map& pending_entries, +void RGWRados::check_pending_olh_entries(const DoutPrefixProvider *dpp, map& pending_entries, map *rm_pending_entries) { map::iterator iter = pending_entries.begin(); @@ -7561,7 +7554,7 @@ int RGWRados::follow_olh(const DoutPrefixProvider *dpp, const RGWBucketInfo& buc return 0; } -int RGWRados::raw_obj_stat(const DoutPrefixProvider *dpp, +int RGWRados::raw_obj_stat(const DoutPrefixProvider *dpp, rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *epoch, map *attrs, bufferlist *first_chunk, RGWObjVersionTracker *objv_tracker, optional_yield y) @@ -8351,7 +8344,7 @@ uint32_t RGWRados::calc_ordered_bucket_list_per_shard(uint32_t num_entries, } -int RGWRados::cls_bucket_list_ordered(const DoutPrefixProvider *dpp, +int RGWRados::cls_bucket_list_ordered(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, const int shard_id, const rgw_obj_index_key& start_after, @@ -8647,7 +8640,7 @@ static int parse_index_hash_source(const std::string& oid_wo_ns, std::string *in } -int RGWRados::cls_bucket_list_unordered(const DoutPrefixProvider *dpp, +int RGWRados::cls_bucket_list_unordered(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, int shard_id, const rgw_obj_index_key& start_after, @@ -8935,7 +8928,7 @@ int RGWRados::remove_objs_from_index(const DoutPrefixProvider *dpp, RGWBucketInf return r; } -int RGWRados::check_disk_state(const DoutPrefixProvider *dpp, +int RGWRados::check_disk_state(const DoutPrefixProvider *dpp, librados::IoCtx io_ctx, const RGWBucketInfo& bucket_info, rgw_bucket_dir_entry& list_state, diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 7a541b557591..a94bf6c9fb01 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -26,9 +26,16 @@ #include "rgw_trim_bilog.h" #include "rgw_service.h" #include "rgw_sal.h" +#include "rgw_aio.h" +#include "rgw_d3n_cacherequest.h" #include "services/svc_rados.h" #include "services/svc_bi_rados.h" +#include "common/Throttle.h" +#include "common/ceph_mutex.h" +#include "rgw_cache.h" + +struct D3nDataCache; class RGWWatcher; class SafeTimer; @@ -48,6 +55,7 @@ class RGWReshard; class RGWReshardWait; class RGWSysObjectCtx; +struct get_obj_data; /* flags for put_obj_meta() */ #define PUT_OBJ_CREATE 0x01 @@ -165,6 +173,7 @@ struct RGWObjState { string write_tag; bool fake_tag{false}; std::optional manifest; + string shadow_obj; bool has_data{false}; bufferlist data; @@ -336,7 +345,6 @@ struct objexp_hint_entry { }; WRITE_CLASS_ENCODER(objexp_hint_entry) -class RGWDataChangesLog; class RGWMetaSyncStatusManager; class RGWDataSyncStatusManager; class RGWCoroutinesManagerRegistry; @@ -400,7 +408,7 @@ class RGWRados ceph::mutex lock = ceph::make_mutex("rados_timer_lock"); SafeTimer *timer; - rgw::sal::RadosStore* store; + rgw::sal::RadosStore* store = nullptr; RGWGC *gc = nullptr; RGWLC *lc; RGWObjectExpirer *obj_expirer; @@ -445,8 +453,7 @@ class RGWRados bool follow_olh, optional_yield y, bool assume_noent = false); int append_atomic_test(const DoutPrefixProvider *dpp, RGWObjectCtx *rctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::ObjectOperation& op, RGWObjState **state, optional_yield y); - int append_atomic_test(const DoutPrefixProvider *dpp, const RGWObjState* astate, librados::ObjectOperation& op); - + int update_placement_map(); int store_bucket_info(RGWBucketInfo& info, map *pattrs, RGWObjVersionTracker *objv_tracker, bool exclusive); @@ -482,6 +489,7 @@ protected: bool use_cache{false}; bool use_gc{true}; + bool use_datacache{false}; int get_obj_head_ioctx(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::IoCtx *ioctx); public: @@ -508,6 +516,15 @@ public: return *this; } + RGWRados& set_use_datacache(bool status) { + use_datacache = status; + return *this; + } + + bool get_use_datacache() { + return use_datacache; + } + RGWLC *get_lc() { return lc; } @@ -612,7 +629,7 @@ public: /** Initialize the RADOS instance and prepare to do other ops */ int init_svc(bool raw, const DoutPrefixProvider *dpp); int init_ctl(const DoutPrefixProvider *dpp); - int init_rados(); + virtual int init_rados(); int init_complete(const DoutPrefixProvider *dpp); int initialize(const DoutPrefixProvider *dpp); void finalize(); @@ -820,7 +837,7 @@ public: explicit Write(RGWRados::Object *_target) : target(_target) {} - int _do_write_meta(const DoutPrefixProvider *dpp, + int _do_write_meta(const DoutPrefixProvider *dpp, uint64_t size, uint64_t accounted_size, map& attrs, bool modify_tail, bool assume_noent, @@ -976,7 +993,7 @@ public: const string& storage_class, bufferlist *acl_bl, RGWObjCategory category, list *remove_objs, const string *user_data = nullptr, bool appendable = false); - int complete_del(const DoutPrefixProvider *dpp, + int complete_del(const DoutPrefixProvider *dpp, int64_t poolid, uint64_t epoch, ceph::real_time& removed_mtime, /* mtime of removed object */ list *remove_objs); @@ -996,13 +1013,13 @@ public: RGWRados::Bucket *target; rgw_obj_key next_marker; - int list_objects_ordered(const DoutPrefixProvider *dpp, + int list_objects_ordered(const DoutPrefixProvider *dpp, int64_t max, vector *result, map *common_prefixes, bool *is_truncated, optional_yield y); - int list_objects_unordered(const DoutPrefixProvider *dpp, + int list_objects_unordered(const DoutPrefixProvider *dpp, int64_t max, vector *result, map *common_prefixes, @@ -1051,7 +1068,7 @@ public: }; // class List }; // class Bucket - int on_last_entry_in_listing(const DoutPrefixProvider *dpp, + int on_last_entry_in_listing(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, const std::string& obj_prefix, const std::string& obj_delim, @@ -1085,6 +1102,8 @@ public: ATTRSMOD_MERGE = 2 }; + D3nDataCache* d3n_data_cache{nullptr}; + int rewrite_obj(RGWBucketInfo& dest_bucket_info, rgw::sal::Object* obj, const DoutPrefixProvider *dpp, optional_yield y); int stat_remote_obj(const DoutPrefixProvider *dpp, @@ -1267,18 +1286,18 @@ public: uint64_t max_chunk_size, iterate_obj_cb cb, void *arg, optional_yield y); - int get_obj_iterate_cb(const DoutPrefixProvider *dpp, + int append_atomic_test(const DoutPrefixProvider *dpp, const RGWObjState* astate, librados::ObjectOperation& op); + + virtual int get_obj_iterate_cb(const DoutPrefixProvider *dpp, const rgw_raw_obj& read_obj, off_t obj_ofs, off_t read_ofs, off_t len, bool is_head_obj, RGWObjState *astate, void *arg); - void get_obj_aio_completion_cb(librados::completion_t cb, void *arg); - /** * a simple object read without keeping state */ - int raw_obj_stat(const DoutPrefixProvider *dpp, + int raw_obj_stat(const DoutPrefixProvider *dpp, rgw_raw_obj& obj, uint64_t *psize, ceph::real_time *pmtime, uint64_t *epoch, map *attrs, bufferlist *first_chunk, RGWObjVersionTracker *objv_tracker, optional_yield y); @@ -1286,7 +1305,7 @@ public: int obj_operate(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::ObjectWriteOperation *op); int obj_operate(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::ObjectReadOperation *op); - int guard_reshard(const DoutPrefixProvider *dpp, + int guard_reshard(const DoutPrefixProvider *dpp, BucketShard *bs, const rgw_obj& obj_instance, const RGWBucketInfo& bucket_info, @@ -1300,7 +1319,7 @@ public: void bucket_index_guard_olh_op(const DoutPrefixProvider *dpp, RGWObjState& olh_state, librados::ObjectOperation& op); int olh_init_modification(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, string *op_tag); int olh_init_modification_impl(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, string *op_tag); - int bucket_index_link_olh(const DoutPrefixProvider *dpp, + int bucket_index_link_olh(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, RGWObjState& olh_state, const rgw_obj& obj_instance, bool delete_marker, const string& op_tag, struct rgw_bucket_dir_entry_meta *meta, @@ -1469,7 +1488,7 @@ public: int bucket_rebuild_index(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info); int bucket_set_reshard(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const cls_rgw_bucket_instance_entry& entry); int remove_objs_from_index(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, list& oid_list); - int move_rados_obj(const DoutPrefixProvider *dpp, + int move_rados_obj(const DoutPrefixProvider *dpp, librados::IoCtx& src_ioctx, const string& src_oid, const string& src_locator, librados::IoCtx& dst_ioctx, @@ -1510,7 +1529,7 @@ public: * and -errno on other failures. (-ENOENT is not a failure, and it * will encode that info as a suggested update.) */ - int check_disk_state(const DoutPrefixProvider *dpp, + int check_disk_state(const DoutPrefixProvider *dpp, librados::IoCtx io_ctx, const RGWBucketInfo& bucket_info, rgw_bucket_dir_entry& list_state, @@ -1563,4 +1582,47 @@ public: uint32_t num_shards); }; + +struct get_obj_data { + RGWRados* rgwrados; + RGWGetDataCB* client_cb = nullptr; + rgw::Aio* aio; + uint64_t offset; // next offset to write to client + rgw::AioResultList completed; // completed read results, sorted by offset + optional_yield yield; + + get_obj_data(RGWRados* rgwrados, RGWGetDataCB* cb, rgw::Aio* aio, + uint64_t offset, optional_yield yield) + : rgwrados(rgwrados), client_cb(cb), aio(aio), offset(offset), yield(yield) {} + ~get_obj_data() { + if (rgwrados->get_use_datacache()) { + const std::lock_guard l(d3n_get_data.d3n_lock); + } + } + + D3nGetObjData d3n_get_data; + atomic_bool d3n_bypass_cache_write{false}; + + int flush(rgw::AioResultList&& results); + + void cancel() { + // wait for all completions to drain and ignore the results + aio->drain(); + } + + int drain() { + auto c = aio->wait(); + while (!c.empty()) { + int r = flush(std::move(c)); + if (r < 0) { + cancel(); + return r; + } + c = aio->wait(); + } + return flush(std::move(c)); + } +}; + + #endif diff --git a/src/rgw/rgw_sal.cc b/src/rgw/rgw_sal.cc index 167f1f91ece1..31cf9fd36068 100644 --- a/src/rgw/rgw_sal.cc +++ b/src/rgw/rgw_sal.cc @@ -23,6 +23,7 @@ #include "rgw_sal.h" #include "rgw_sal_rados.h" +#include "rgw_d3n_datacache.h" #define dout_subsys ceph_subsys_rgw @@ -32,12 +33,12 @@ extern rgw::sal::Store* newStore(void); rgw::sal::Store* StoreManager::init_storage_provider(const DoutPrefixProvider* dpp, CephContext* cct, const std::string svc, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread, bool use_cache, bool use_gc) { - rgw::sal::Store* store = nullptr; if (svc.compare("rados") == 0) { - store = newStore(); + rgw::sal::Store* store = newStore(); RGWRados* rados = static_cast(store)->getRados(); if ((*rados).set_use_cache(use_cache) + .set_use_datacache(false) .set_use_gc(use_gc) .set_run_gc_thread(use_gc_thread) .set_run_lc_thread(use_lc_thread) @@ -47,9 +48,28 @@ rgw::sal::Store* StoreManager::init_storage_provider(const DoutPrefixProvider* d .initialize(cct, dpp) < 0) { delete store; store = nullptr; } + return store; } + else if (svc.compare("d3n") == 0) { + rgw::sal::RadosStore *store = new rgw::sal::RadosStore(); + RGWRados* rados = new D3nRGWDataCache; + store->setRados(rados); + rados->set_store(static_cast(store)); - return store; + if ((*rados).set_use_cache(use_cache) + .set_use_datacache(true) + .set_run_gc_thread(use_gc_thread) + .set_run_lc_thread(use_lc_thread) + .set_run_quota_threads(quota_threads) + .set_run_sync_thread(run_sync_thread) + .set_run_reshard_thread(run_reshard_thread) + .initialize(cct, dpp) < 0) { + delete store; store = nullptr; + } + return store; + } + + return nullptr; } rgw::sal::Store* StoreManager::init_raw_storage_provider(const DoutPrefixProvider* dpp, CephContext* cct, const std::string svc)