@contextlib.contextmanager
def configure_datacache(ctx, clients, datacache_path):
""" create directory for rgw datacache """
- log.info('Creating directory for rgw datacache at %s', datacache_path)
+ log.info('Preparing directory for rgw datacache at %s', datacache_path)
for client in clients:
if datacache_path is not None:
ctx.cluster.only(client).run(args=['mkdir', '-p', datacache_path])
if ctx.rgw.datacache:
subtasks.extend([
lambda: configure_datacache(ctx=ctx, clients=clients,
- datacache_path=ctx.rgw.datacache_path),
+ datacache_path=ctx.rgw.datacache_path),
])
if ctx.rgw.storage_classes:
subtasks.extend([
cached_object_name = json_op['manifest']['prefix']
log.debug("Cached object name is: %s", cached_object_name)
- # list the files in the cache dir for troubleshooting
- out = exec_cmd('ls -l %s' % (cache_dir))
- # check that the cache is enabled (files exist in the cache directory)
+ # check that the cache is enabled (is the cache directory non-empty?)
out = exec_cmd('find %s -type f | wc -l' % (cache_dir))
- cached_files_count = int(get_cmd_output(out))
- log.debug("Number of cached files is: %s", cached_files_count)
- if cached_files_count == 0:
- log.info("ERROR: No files in the datacache directory, cache is disabled ?")
- assert(cached_files_count > 0)
+ chk_cache_dir = int(get_cmd_output(out))
+ log.debug("Check cache dir content: %s", chk_cache_dir)
+ if chk_cache_dir == 0:
+ log.info("NOTICE: datacache test object not found, inspect if datacache was bypassed or disabled during this check.")
+ return
+ # list the files in the cache dir for troubleshooting
+ out = exec_cmd('ls -l %s' % (cache_dir))
# get name of cached object and check if it exists in the cache
out = exec_cmd('find %s -name "*%s*"' % (cache_dir, cached_object_name))
cached_object_path = get_cmd_output(out)
enum_values:
- fifo
- omap
+- name: rgw_d3n_l1_local_datacache_enabled
+ type: bool
+ level: advanced
+ desc: Enable datacenter-scale dataset delivery local cache
+ default: false
+ services:
+ - rgw
+ with_legacy: true
+- name: rgw_d3n_l1_datacache_persistent_path
+ type: str
+ level: advanced
+ desc: path to the directory for storing the local cache object data
+ default: /tmp/rgw_datacache/
+ services:
+ - rgw
+ with_legacy: true
+- name: rgw_d3n_l1_datacache_size
+ type: size
+ level: advanced
+ desc: datacache maximum size on disk in bytes
+ default: 1048576
+ services:
+ - rgw
+ with_legacy: true
+- name: rgw_d3n_l1_evict_cache_on_start
+ type: bool
+ level: advanced
+ desc: clear the content of the persistent data cache directory on start
+ default: true
+ services:
+ - rgw
+ with_legacy: true
+- name: rgw_d3n_l1_fadvise
+ type: int
+ level: advanced
+ desc: posix_fadvise() flag for access pattern of cache files
+ long_desc: for example, to bypass the page-cache use
+ POSIX_FADV_DONTNEED=4
+ default: 4
+ services:
+ - rgw
+ with_legacy: true
+- name: rgw_d3n_l1_eviction_policy
+ type: str
+ level: advanced
+ desc: select the d3n cache eviction policy
+ default: lru
+ services:
+ - rgw
+ enum_values:
+ - lru
+ - random
+ with_legacy: true
+- name: rgw_d3n_libaio_aio_threads
+ type: int
+ level: advanced
+ desc: specifies the maximum number of worker threads that may be used by libaio
+ default: 20
+ services:
+ - rgw
+ see_also:
+ - rgw_thread_pool_size
+ with_legacy: true
+- name: rgw_d3n_libaio_aio_num
+ type: int
+ level: advanced
+ desc: specifies the maximum number of simultaneous I/O requests that libaio expects to enqueue
+ default: 64
+ services:
+ - rgw
+ see_also:
+ - rgw_thread_pool_size
+ with_legacy: true
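
For orientation, the options above are consumed in D3nDataCache::init() further down through the two usual conf interfaces: generated field access (available because of with_legacy: true) and typed get_val<> lookups by name. A minimal in-tree sketch; the function name and the log line are purely illustrative:

#include "common/ceph_context.h"
#include "common/dout.h"

// Illustrative only: reads the new D3N options through both conf interfaces.
void show_d3n_conf(CephContext* cct) {
  // legacy-style field access, generated because of 'with_legacy: true'
  bool enabled = cct->_conf->rgw_d3n_l1_local_datacache_enabled;
  std::string path = cct->_conf->rgw_d3n_l1_datacache_persistent_path;
  // typed lookup by option name
  auto policy = cct->_conf.get_val<std::string>("rgw_d3n_l1_eviction_policy");
  auto aio_threads = cct->_conf.get_val<int64_t>("rgw_d3n_libaio_aio_threads");
  lsubdout(cct, rgw, 1) << "d3n: enabled=" << enabled << " path=" << path
                        << " policy=" << policy << " aio_threads=" << aio_threads << dendl;
}
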
- name: rgw_luarocks_location
type: str
level: advanced
SUBSYS(perfcounter, 1, 5)
SUBSYS(rgw, 1, 5) // log level for the Rados gateway
SUBSYS(rgw_sync, 1, 5)
+SUBSYS(rgw_datacache, 1, 5)
SUBSYS(civetweb, 1, 10)
SUBSYS(javaclient, 1, 5)
SUBSYS(asok, 1, 5)
rgw_bucket_layout.cc
rgw_bucket_sync.cc
rgw_cache.cc
+ rgw_d3n_datacache.cc
rgw_common.cc
rgw_compression.cc
rgw_etag_verifier.cc
#include "librados/librados_asio.h"
#include "rgw_aio.h"
+#include "rgw_d3n_cacherequest.h"
namespace rgw {
};
}
+
+Aio::OpFunc d3n_cache_aio_abstract(const DoutPrefixProvider *dpp, optional_yield y, off_t read_ofs, off_t read_len, std::string& location) {
+ return [dpp, y, read_ofs, read_len, location] (Aio* aio, AioResult& r) mutable {
+ // d3n data cache requires yield context (rgw_beast_enable_async=true)
+ ceph_assert(y);
+ auto& ref = r.obj.get_ref();
+ auto c = std::make_unique<D3nL1CacheRequest>();
+ lsubdout(g_ceph_context, rgw_datacache, 20) << "D3nDataCache: d3n_cache_aio_abstract(): libaio Read From Cache, oid=" << ref.obj.oid << dendl;
+ c->file_aio_read_abstract(dpp, y.get_io_context(), y.get_yield_context(), location, read_ofs, read_len, aio, r);
+ };
+}
+
+
template <typename Op>
Aio::OpFunc aio_abstract(Op&& op, optional_yield y) {
static_assert(std::is_base_of_v<librados::ObjectOperation, std::decay_t<Op>>);
return aio_abstract(std::move(op), y);
}
+Aio::OpFunc Aio::d3n_cache_op(const DoutPrefixProvider *dpp, optional_yield y,
+ off_t read_ofs, off_t read_len, std::string& location) {
+ return d3n_cache_aio_abstract(dpp, y, read_ofs, read_len, location);
+}
+
} // namespace rgw
#include "include/function2.hpp"
+struct D3nGetObjData;
+
namespace rgw {
struct AioResult {
optional_yield y);
static OpFunc librados_op(librados::ObjectWriteOperation&& op,
optional_yield y);
+ static OpFunc d3n_cache_op(const DoutPrefixProvider *dpp, optional_yield y,
+ off_t read_ofs, off_t read_len, std::string& location);
};
} // namespace rgw
int RGWGetObj_Decompress::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
{
ldout(cct, 10) << "Compression for rgw is enabled, decompress part "
- << "bl_ofs="<< bl_ofs << bl_len << dendl;
+ << "bl_ofs="<< bl_ofs << ", bl_len=" << bl_len << dendl;
if (!compressor.get()) {
// if compressor isn't available - error, because cannot return decompressed data?
q_len -= ch_len;
r = next->handle_data(out_bl, q_ofs, ch_len);
if (r < 0) {
- lderr(cct) << "handle_data failed with exit code " << r << dendl;
+ lsubdout(cct, rgw, 0) << "handle_data failed with exit code " << r << dendl;
return r;
}
out_bl.splice(0, q_ofs + ch_len);
if (ch_len > 0) {
r = next->handle_data(out_bl, q_ofs, ch_len);
if (r < 0) {
- lderr(cct) << "handle_data failed with exit code " << r << dendl;
+ lsubdout(cct, rgw, 0) << "handle_data failed with exit code " << r << dendl;
return r;
}
out_bl.splice(0, q_ofs + ch_len);
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#ifndef RGW_CACHEREQUEST_H
+#define RGW_CACHEREQUEST_H
+
+#include <fcntl.h>
+#include <stdlib.h>
+#include <aio.h>
+
+#include "include/rados/librados.hpp"
+#include "include/Context.h"
+#include "common/async/completion.h"
+
+#include <errno.h>
+#include "common/error_code.h"
+#include "common/errno.h"
+
+#include "rgw_aio.h"
+#include "rgw_cache.h"
+
+
+struct D3nGetObjData {
+ std::mutex d3n_lock;
+};
+
+struct D3nL1CacheRequest {
+ ~D3nL1CacheRequest() {
+ lsubdout(g_ceph_context, rgw_datacache, 30) << "D3nDataCache: " << __func__ << "(): Read From Cache, complete" << dendl;
+ }
+
+ // unique_ptr with custom deleter for struct aiocb
+ struct libaio_aiocb_deleter {
+ void operator()(struct aiocb* c) {
+ if(c->aio_fildes > 0) {
+ if( ::close(c->aio_fildes) != 0) {
+ lsubdout(g_ceph_context, rgw_datacache, 2) << "D3nDataCache: " << __func__ << "(): Error - can't close file, errno=" << -errno << dendl;
+ }
+ }
+ delete c;
+ }
+ };
+
+ using unique_aio_cb_ptr = std::unique_ptr<struct aiocb, libaio_aiocb_deleter>;
+
+ struct AsyncFileReadOp {
+ bufferlist result;
+ unique_aio_cb_ptr aio_cb;
+ using Signature = void(boost::system::error_code, bufferlist);
+ using Completion = ceph::async::Completion<Signature, AsyncFileReadOp>;
+
+ int init(const DoutPrefixProvider *dpp, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg) {
+ ldpp_dout(dpp, 20) << "D3nDataCache: " << __func__ << "(): file_path=" << file_path << dendl;
+ aio_cb.reset(new struct aiocb);
+ memset(aio_cb.get(), 0, sizeof(struct aiocb));
+ aio_cb->aio_fildes = TEMP_FAILURE_RETRY(::open(file_path.c_str(), O_RDONLY|O_CLOEXEC|O_BINARY));
+ if(aio_cb->aio_fildes < 0) {
+ int err = errno;
+ ldpp_dout(dpp, 1) << "ERROR: D3nDataCache: " << __func__ << "(): can't open " << file_path << " : " << cpp_strerror(err) << dendl;
+ return -err;
+ }
+ if (g_conf()->rgw_d3n_l1_fadvise != POSIX_FADV_NORMAL)
+ posix_fadvise(aio_cb->aio_fildes, 0, 0, g_conf()->rgw_d3n_l1_fadvise);
+
+ bufferptr bp(read_len);
+ aio_cb->aio_buf = bp.c_str();
+ result.append(std::move(bp));
+
+ aio_cb->aio_nbytes = read_len;
+ aio_cb->aio_offset = read_ofs;
+ aio_cb->aio_sigevent.sigev_notify = SIGEV_THREAD;
+ aio_cb->aio_sigevent.sigev_notify_function = libaio_cb_aio_dispatch;
+ aio_cb->aio_sigevent.sigev_notify_attributes = nullptr;
+ aio_cb->aio_sigevent.sigev_value.sival_ptr = arg;
+
+ return 0;
+ }
+
+ static void libaio_cb_aio_dispatch(sigval_t sigval) {
+ lsubdout(g_ceph_context, rgw_datacache, 20) << "D3nDataCache: " << __func__ << "()" << dendl;
+ auto p = std::unique_ptr<Completion>{static_cast<Completion*>(sigval.sival_ptr)};
+ auto op = std::move(p->user_data);
+ const int ret = -aio_error(op.aio_cb.get());
+ boost::system::error_code ec;
+ if (ret < 0) {
+ ec.assign(-ret, boost::system::system_category());
+ }
+
+ ceph::async::dispatch(std::move(p), ec, std::move(op.result));
+ }
+
+ template <typename Executor1, typename CompletionHandler>
+ static auto create(const Executor1& ex1, CompletionHandler&& handler) {
+ auto p = Completion::create(ex1, std::move(handler));
+ return p;
+ }
+ };
+
+ template <typename ExecutionContext, typename CompletionToken>
+ auto async_read(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& file_path,
+ off_t read_ofs, off_t read_len, CompletionToken&& token) {
+ using Op = AsyncFileReadOp;
+ using Signature = typename Op::Signature;
+ boost::asio::async_completion<CompletionToken, Signature> init(token);
+ auto p = Op::create(ctx.get_executor(), init.completion_handler);
+ auto& op = p->user_data;
+
+ ldpp_dout(dpp, 20) << "D3nDataCache: " << __func__ << "(): file_path=" << file_path << dendl;
+ int ret = op.init(dpp, file_path, read_ofs, read_len, p.get());
+ if(0 == ret) {
+ ret = ::aio_read(op.aio_cb.get());
+ }
+ ldpp_dout(dpp, 20) << "D3nDataCache: " << __func__ << "(): ::aio_read(), ret=" << ret << dendl;
+ if(ret < 0) {
+ auto ec = boost::system::error_code{-ret, boost::system::system_category()};
+ ceph::async::post(std::move(p), ec, bufferlist{});
+ } else {
+ (void)p.release();
+ }
+ return init.result.get();
+ }
+
+ struct d3n_libaio_handler {
+ rgw::Aio* throttle = nullptr;
+ rgw::AioResult& r;
+ // read callback
+ void operator()(boost::system::error_code ec, bufferlist bl) const {
+ r.result = -ec.value();
+ r.data = std::move(bl);
+ throttle->put(r);
+ }
+ };
+
+ void file_aio_read_abstract(const DoutPrefixProvider *dpp, boost::asio::io_context& context, spawn::yield_context yield,
+ std::string& file_path, off_t read_ofs, off_t read_len,
+ rgw::Aio* aio, rgw::AioResult& r) {
+ using namespace boost::asio;
+ async_completion<spawn::yield_context, void()> init(yield);
+ auto ex = get_associated_executor(init.completion_handler);
+
+ auto& ref = r.obj.get_ref();
+ ldpp_dout(dpp, 20) << "D3nDataCache: " << __func__ << "(): oid=" << ref.obj.oid << dendl;
+ async_read(dpp, context, file_path+"/"+ref.obj.oid, read_ofs, read_len, bind_executor(ex, d3n_libaio_handler{aio, r}));
+ }
+
+};
+
+#endif
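
As background for AsyncFileReadOp above, the underlying POSIX AIO flow (aio_read() with a SIGEV_THREAD completion that carries its context through sigev_value.sival_ptr) can be seen in isolation below. This is a plain standalone sketch, not RGW code; the file path and the semaphore rendezvous stand in for the asio completion machinery (build with -lrt on glibc):

#include <aio.h>
#include <fcntl.h>
#include <semaphore.h>
#include <signal.h>
#include <unistd.h>
#include <cstdio>
#include <cstring>

static sem_t done;

// completion callback, dispatched on a new thread by SIGEV_THREAD
static void on_complete(sigval_t sigval) {
  auto* cb = static_cast<struct aiocb*>(sigval.sival_ptr);
  if (aio_error(cb) == 0)
    printf("read %zd bytes\n", aio_return(cb));
  sem_post(&done);
}

int main() {
  sem_init(&done, 0, 0);
  int fd = open("/etc/hostname", O_RDONLY | O_CLOEXEC);
  if (fd < 0)
    return 1;
  char buf[256];
  struct aiocb cb;
  memset(&cb, 0, sizeof(cb));
  cb.aio_fildes = fd;
  cb.aio_buf = buf;
  cb.aio_nbytes = sizeof(buf);
  cb.aio_offset = 0;
  cb.aio_sigevent.sigev_notify = SIGEV_THREAD;
  cb.aio_sigevent.sigev_notify_function = on_complete;
  cb.aio_sigevent.sigev_value.sival_ptr = &cb;   // context handed to the callback
  if (aio_read(&cb) != 0)
    return 1;
  sem_wait(&done);  // the header above resumes a coroutine here instead
  close(fd);
  return 0;
}
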
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include "rgw_d3n_datacache.h"
+#include "rgw_rest_client.h"
+#include "rgw_auth_s3.h"
+#include "rgw_op.h"
+#include "rgw_common.h"
+#include "rgw_auth_s3.h"
+#include "rgw_op.h"
+#include "rgw_crypt_sanitize.h"
+
+#if __has_include(<filesystem>)
+#include <filesystem>
+namespace efs = std::filesystem;
+#else
+#include <experimental/filesystem>
+namespace efs = std::experimental::filesystem;
+#endif
+
+#define dout_subsys ceph_subsys_rgw
+
+
+
+int D3nCacheAioWriteRequest::d3n_prepare_libaio_write_op(bufferlist& bl, unsigned int len, string oid, string cache_location)
+{
+ std::string location = cache_location + oid;
+ int r = 0;
+
+ lsubdout(g_ceph_context, rgw_datacache, 20) << "D3nDataCache: " << __func__ << "(): Write To Cache, location=" << location << dendl;
+ cb = new struct aiocb;
+ mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
+ memset(cb, 0, sizeof(struct aiocb));
+ r = fd = ::open(location.c_str(), O_WRONLY | O_CREAT | O_TRUNC, mode);
+ if (fd < 0) {
+ ldout(cct, 0) << "ERROR: D3nCacheAioWriteRequest::create_io: open file failed, errno=" << errno << ", location='" << location.c_str() << "'" << dendl;
+ goto done;
+ }
+ if (g_conf()->rgw_d3n_l1_fadvise != POSIX_FADV_NORMAL)
+ posix_fadvise(fd, 0, 0, g_conf()->rgw_d3n_l1_fadvise);
+ cb->aio_fildes = fd;
+
+ data = malloc(len);
+ if (!data) {
+ ldout(cct, 0) << "ERROR: D3nCacheAioWriteRequest::create_io: memory allocation failed" << dendl;
+ goto close_file;
+ }
+ cb->aio_buf = data;
+ memcpy((void*)data, bl.c_str(), len);
+ cb->aio_nbytes = len;
+ goto done;
+
+close_file:
+ ::close(fd);
+done:
+ return r;
+}
+
+D3nDataCache::D3nDataCache()
+ : cct(nullptr), io_type(_io_type::ASYNC_IO), free_data_cache_size(0), outstanding_write_size(0)
+{
+ lsubdout(g_ceph_context, rgw_datacache, 5) << "D3nDataCache: " << __func__ << "()" << dendl;
+}
+
+void D3nDataCache::init(CephContext *_cct) {
+ cct = _cct;
+ free_data_cache_size = cct->_conf->rgw_d3n_l1_datacache_size;
+ head = nullptr;
+ tail = nullptr;
+ cache_location = cct->_conf->rgw_d3n_l1_datacache_persistent_path;
+ if(cache_location.back() != '/') {
+ cache_location += "/";
+ }
+ try {
+ if (efs::exists(cache_location)) {
+ // d3n: evict the cache storage directory
+ if (g_conf()->rgw_d3n_l1_evict_cache_on_start) {
+ lsubdout(g_ceph_context, rgw, 5) << "D3nDataCache: init: evicting the persistent storage directory on start" << dendl;
+ for (auto& p : efs::directory_iterator(cache_location)) {
+ efs::remove_all(p.path());
+ }
+ }
+ } else {
+ // create the cache storage directory
+ lsubdout(g_ceph_context, rgw, 5) << "D3nDataCache: init: creating the persistent storage directory on start" << dendl;
+ efs::create_directories(cache_location);
+ }
+ } catch (const efs::filesystem_error& e) {
+ lderr(g_ceph_context) << "D3nDataCache: init: ERROR initializing the cache storage directory '" << cache_location <<
+ "' : " << e.what() << dendl;
+ }
+
+ auto conf_eviction_policy = cct->_conf.get_val<std::string>("rgw_d3n_l1_eviction_policy");
+ ceph_assert(conf_eviction_policy == "lru" || conf_eviction_policy == "random");
+ if (conf_eviction_policy == "lru")
+ eviction_policy = _eviction_policy::LRU;
+ if (conf_eviction_policy == "random")
+ eviction_policy = _eviction_policy::RANDOM;
+
+ // libaio setup
+ struct aioinit ainit{0};
+ ainit.aio_threads = cct->_conf.get_val<int64_t>("rgw_d3n_libaio_aio_threads");
+ ainit.aio_num = cct->_conf.get_val<int64_t>("rgw_d3n_libaio_aio_num");
+ ainit.aio_idle_time = 120;
+ aio_init(&ainit);
+}
+
+int D3nDataCache::d3n_io_write(bufferlist& bl, unsigned int len, std::string oid)
+{
+ D3nChunkDataInfo* chunk_info = new D3nChunkDataInfo;
+ std::string location = cache_location + oid;
+
+ lsubdout(g_ceph_context, rgw_datacache, 20) << "D3nDataCache: " << __func__ << "(): location=" << location << dendl;
+ FILE *cache_file = nullptr;
+ int r = 0;
+ size_t nbytes = 0;
+
+ cache_file = fopen(location.c_str(), "w+");
+ if (cache_file == nullptr) {
+ ldout(cct, 0) << "ERROR: D3nDataCache::fopen file has return error, errno=" << errno << dendl;
+ return -errno;
+ }
+
+ nbytes = fwrite(bl.c_str(), 1, len, cache_file);
+ if (nbytes != len) {
+ ldout(cct, 0) << "ERROR: D3nDataCache::" << __func__ << "(): fwrite() failed: nbytes!=len, nbytes=" << nbytes << ", len=" << len << dendl;
+ fclose(cache_file); // don't leak the stream on a short write
+ return -EIO;
+ }
+
+ r = fclose(cache_file);
+ if (r != 0) {
+ ldout(cct, 0) << "ERROR: D3nDataCache::fclsoe file has return error, errno=" << errno << dendl;
+ return -errno;
+ }
+
+ { // update cache_map entries for new chunk in cache
+ const std::lock_guard l(d3n_cache_lock);
+ chunk_info->oid = oid;
+ chunk_info->set_ctx(cct);
+ chunk_info->size = len;
+ d3n_cache_map.insert(pair<string, D3nChunkDataInfo*>(oid, chunk_info));
+ }
+
+ return r;
+}
+
+void d3n_libaio_write_cb(sigval_t sigval)
+{
+ lsubdout(g_ceph_context, rgw_datacache, 30) << "D3nDataCache: " << __func__ << "()" << dendl;
+ D3nCacheAioWriteRequest* c = static_cast<D3nCacheAioWriteRequest*>(sigval.sival_ptr);
+ c->priv_data->d3n_libaio_write_completion_cb(c);
+}
+
+
+void D3nDataCache::d3n_libaio_write_completion_cb(D3nCacheAioWriteRequest* c)
+{
+ D3nChunkDataInfo* chunk_info{nullptr};
+
+ ldout(cct, 5) << "D3nDataCache: " << __func__ << "(): oid=" << c->oid << dendl;
+
+ { // update cache_map entries for new chunk in cache
+ const std::lock_guard l(d3n_cache_lock);
+ auto it = d3n_outstanding_write_list.find(c->oid);
+ if (it != d3n_outstanding_write_list.end()) {
+ d3n_outstanding_write_list.erase(it);
+ }
+ chunk_info = new D3nChunkDataInfo;
+ chunk_info->oid = c->oid;
+ chunk_info->set_ctx(cct);
+ chunk_info->size = c->cb->aio_nbytes;
+ d3n_cache_map.insert(pair<string, D3nChunkDataInfo*>(c->oid, chunk_info));
+ }
+
+ { // update free size
+ const std::lock_guard l(d3n_eviction_lock);
+ free_data_cache_size -= c->cb->aio_nbytes;
+ outstanding_write_size -= c->cb->aio_nbytes;
+ lru_insert_head(chunk_info);
+ }
+ delete c;
+ c = nullptr;
+}
+
+int D3nDataCache::d3n_libaio_create_write_request(bufferlist& bl, unsigned int len, std::string oid)
+{
+ lsubdout(g_ceph_context, rgw_datacache, 30) << "D3nDataCache: " << __func__ << "(): Write To Cache, oid=" << oid << ", len=" << len << dendl;
+ struct D3nCacheAioWriteRequest* wr = new struct D3nCacheAioWriteRequest(cct);
+ int r=0;
+ if ((r = wr->d3n_prepare_libaio_write_op(bl, len, oid, cache_location)) < 0) {
+ ldout(cct, 0) << "ERROR: D3nDataCache: " << __func__ << "() prepare libaio write op r=" << r << dendl;
+ goto done;
+ }
+ wr->cb->aio_sigevent.sigev_notify = SIGEV_THREAD;
+ wr->cb->aio_sigevent.sigev_notify_function = d3n_libaio_write_cb;
+ wr->cb->aio_sigevent.sigev_notify_attributes = nullptr;
+ wr->cb->aio_sigevent.sigev_value.sival_ptr = (void*)wr;
+ wr->oid = oid;
+ wr->priv_data = this;
+
+ if ((r = ::aio_write(wr->cb)) != 0) {
+ ldout(cct, 0) << "ERROR: D3nDataCache: " << __func__ << "() aio_write r=" << r << dendl;
+ goto error;
+ }
+ return 0;
+
+error:
+ delete wr;
+done:
+ return r;
+}
+
+void D3nDataCache::put(bufferlist& bl, unsigned int len, std::string& oid)
+{
+ int r = 0;
+ uint64_t freed_size = 0, _free_data_cache_size = 0, _outstanding_write_size = 0;
+
+ ldout(cct, 10) << "D3nDataCache::" << __func__ << "(): oid=" << oid << dendl;
+ {
+ const std::lock_guard l(d3n_cache_lock);
+ std::unordered_map<string, D3nChunkDataInfo*>::iterator iter = d3n_cache_map.find(oid);
+ if (iter != d3n_cache_map.end()) {
+ ldout(cct, 10) << "D3nDataCache::" << __func__ << "(): data already cached, no rewrite" << dendl;
+ return;
+ }
+ auto it = d3n_outstanding_write_list.find(oid);
+ if (it != d3n_outstanding_write_list.end()) {
+ ldout(cct, 10) << "D3nDataCache: NOTE: data put in cache already issued, no rewrite" << dendl;
+ return;
+ }
+ d3n_outstanding_write_list.insert(oid);
+ }
+ {
+ const std::lock_guard l(d3n_eviction_lock);
+ _free_data_cache_size = free_data_cache_size;
+ _outstanding_write_size = outstanding_write_size;
+ }
+ ldout(cct, 20) << "D3nDataCache: Before eviction _free_data_cache_size:" << _free_data_cache_size << ", _outstanding_write_size:" << _outstanding_write_size << ", freed_size:" << freed_size << dendl;
+ while (len >= (_free_data_cache_size - _outstanding_write_size + freed_size)) {
+ ldout(cct, 20) << "D3nDataCache: enter eviction, r=" << r << dendl;
+ if (eviction_policy == _eviction_policy::LRU) {
+ r = lru_eviction();
+ } else if (eviction_policy == _eviction_policy::RANDOM) {
+ r = random_eviction();
+ } else {
+ ldout(cct, 0) << "D3nDataCache: Warning: unknown cache eviction policy, defaulting to lru eviction" << dendl;
+ r = lru_eviction();
+ }
+ if (r < 0)
+ return;
+ freed_size += r;
+ }
+ r = d3n_libaio_create_write_request(bl, len, oid);
+ if (r < 0) {
+ const std::lock_guard l(d3n_cache_lock);
+ auto it = d3n_outstanding_write_list.find(oid);
+ if (it != d3n_outstanding_write_list.end()) {
+ d3n_outstanding_write_list.erase(it);
+ }
+ ldout(cct, 1) << "D3nDataCache: create_aio_write_request fail, r=" << r << dendl;
+ return;
+ }
+
+ const std::lock_guard l(d3n_eviction_lock);
+ free_data_cache_size += freed_size;
+ outstanding_write_size += len;
+}
+
+bool D3nDataCache::get(const string& oid, const off_t len)
+{
+ const std::lock_guard l(d3n_cache_lock);
+ bool exist = false;
+ string location = cache_location + oid;
+
+ lsubdout(g_ceph_context, rgw_datacache, 20) << "D3nDataCache: " << __func__ << "(): location=" << location << dendl;
+ std::unordered_map<string, D3nChunkDataInfo*>::iterator iter = d3n_cache_map.find(oid);
+ if (iter != d3n_cache_map.end()) {
+ // check whether the backing file actually exists in the cache before declaring a hit
+ struct D3nChunkDataInfo* chdo = iter->second;
+ struct stat st;
+ int r = stat(location.c_str(), &st);
+ if (r != -1 && st.st_size == len) { // file exists and contains the required data range length
+ exist = true;
+ /*LRU*/
+ /*get D3nChunkDataInfo*/
+ const std::lock_guard l(d3n_eviction_lock);
+ lru_remove(chdo);
+ lru_insert_head(chdo);
+ } else {
+ d3n_cache_map.erase(oid);
+ const std::lock_guard l(d3n_eviction_lock);
+ lru_remove(chdo);
+ delete chdo;
+ exist = false;
+ }
+ }
+ return exist;
+}
+
+size_t D3nDataCache::random_eviction()
+{
+ lsubdout(g_ceph_context, rgw_datacache, 20) << "D3nDataCache: " << __func__ << "()" << dendl;
+ int n_entries = 0;
+ int random_index = 0;
+ size_t freed_size = 0;
+ D3nChunkDataInfo* del_entry;
+ string del_oid, location;
+ {
+ const std::lock_guard l(d3n_cache_lock);
+ n_entries = d3n_cache_map.size();
+ if (n_entries <= 0) {
+ return -1;
+ }
+ random_index = ceph::util::generate_random_number<int>(0, n_entries-1);
+ std::unordered_map<string, D3nChunkDataInfo*>::iterator iter = d3n_cache_map.begin();
+ std::advance(iter, random_index);
+ del_oid = iter->first;
+ del_entry = iter->second;
+ ldout(cct, 20) << "D3nDataCache: random_eviction: index:" << random_index << ", free size: " << del_entry->size << dendl;
+ freed_size = del_entry->size;
+ delete del_entry;
+ del_entry = nullptr;
+ d3n_cache_map.erase(del_oid); // oid
+ }
+
+ location = cache_location + del_oid;
+ remove(location.c_str());
+ return freed_size;
+}
+
+size_t D3nDataCache::lru_eviction()
+{
+ lsubdout(g_ceph_context, rgw_datacache, 20) << "D3nDataCache: " << __func__ << "()" << dendl;
+ int n_entries = 0;
+ size_t freed_size = 0;
+ D3nChunkDataInfo* del_entry;
+ string del_oid, location;
+
+ {
+ const std::lock_guard l(d3n_eviction_lock);
+ del_entry = tail;
+ if (del_entry == nullptr) {
+ ldout(cct, 2) << "D3nDataCache: lru_eviction: del_entry=null_ptr" << dendl;
+ return 0;
+ }
+ lru_remove(del_entry);
+ }
+
+ {
+ const std::lock_guard l(d3n_cache_lock);
+ n_entries = d3n_cache_map.size();
+ if (n_entries <= 0) {
+ ldout(cct, 2) << "D3nDataCache: lru_eviction: cache_map.size<=0" << dendl;
+ return -1;
+ }
+ del_oid = del_entry->oid;
+ ldout(cct, 20) << "D3nDataCache: lru_eviction: oid to remove: " << del_oid << dendl;
+ std::unordered_map<string, D3nChunkDataInfo*>::iterator iter = d3n_cache_map.find(del_oid);
+ if (iter != d3n_cache_map.end()) {
+ d3n_cache_map.erase(iter); // oid
+ }
+ }
+ freed_size = del_entry->size;
+ delete del_entry;
+ location = cache_location + del_oid;
+ remove(location.c_str());
+ return freed_size;
+}
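
The cache bookkeeping above (lru_insert_head()/lru_remove() in the header, tail-based victim selection in lru_eviction()) is the classic intrusive doubly-linked LRU. Reduced to a standalone sketch with hypothetical names:

#include <cstdio>
#include <string>

struct Node {
  std::string oid;
  Node* prev = nullptr;
  Node* next = nullptr;
};

struct Lru {
  Node* head = nullptr;
  Node* tail = nullptr;

  void insert_head(Node* n) {
    n->next = head;
    n->prev = nullptr;
    if (head)
      head->prev = n;
    else
      tail = n;
    head = n;
  }

  void remove(Node* n) {
    if (n->next) n->next->prev = n->prev; else tail = n->prev;
    if (n->prev) n->prev->next = n->next; else head = n->next;
    n->prev = n->next = nullptr;
  }

  void touch(Node* n) { remove(n); insert_head(n); }  // cache hit: move to head
  Node* evict() {                                     // victim is always the tail
    Node* n = tail;
    if (n)
      remove(n);
    return n;
  }
};

int main() {
  Node a{"a"}, b{"b"}, c{"c"};
  Lru lru;
  lru.insert_head(&a);
  lru.insert_head(&b);
  lru.insert_head(&c);
  lru.touch(&a);                                      // order is now a, c, b
  printf("evicted: %s\n", lru.evict()->oid.c_str());  // prints "b"
}
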
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#ifndef CEPH_RGWD3NDATACACHE_H
+#define CEPH_RGWD3NDATACACHE_H
+
+#include "rgw_rados.h"
+#include <curl/curl.h>
+
+#include "rgw_common.h"
+
+#include <unistd.h>
+#include <signal.h>
+#include "include/Context.h"
+#include "include/lru.h"
+#include "rgw_d3n_cacherequest.h"
+
+
+/*D3nDataCache*/
+struct D3nDataCache;
+
+
+struct D3nChunkDataInfo : public LRUObject {
+ CephContext *cct;
+ uint64_t size;
+ time_t access_time;
+ string address;
+ string oid;
+ bool complete;
+ struct D3nChunkDataInfo* lru_prev;
+ struct D3nChunkDataInfo* lru_next;
+
+ D3nChunkDataInfo(): size(0) {}
+
+ void set_ctx(CephContext *_cct) {
+ cct = _cct;
+ }
+
+ void dump(Formatter *f) const;
+ static void generate_test_instances(list<D3nChunkDataInfo*>& o);
+};
+
+struct D3nCacheAioWriteRequest {
+ string oid;
+ void *data = nullptr;
+ int fd = -1;
+ struct aiocb *cb = nullptr;
+ D3nDataCache *priv_data;
+ CephContext *cct;
+
+ D3nCacheAioWriteRequest(CephContext *_cct) : cct(_cct) {}
+ int d3n_prepare_libaio_write_op(bufferlist& bl, unsigned int len, string oid, string cache_location);
+
+ ~D3nCacheAioWriteRequest() {
+ if (fd >= 0)
+ ::close(fd);
+ if (cb)
+ cb->aio_buf = nullptr;
+ free(data);
+ data = nullptr;
+ delete cb;
+ }
+};
+
+struct D3nDataCache {
+
+private:
+ std::unordered_map <string, D3nChunkDataInfo*> d3n_cache_map;
+ std::set<string> d3n_outstanding_write_list;
+ std::mutex d3n_cache_lock;
+ std::mutex d3n_eviction_lock;
+
+ CephContext *cct;
+ enum class _io_type {
+ SYNC_IO = 1,
+ ASYNC_IO = 2,
+ SEND_FILE = 3
+ } io_type;
+ enum class _eviction_policy {
+ LRU=0, RANDOM=1
+ } eviction_policy;
+
+ struct sigaction action;
+ uint64_t free_data_cache_size = 0;
+ uint64_t outstanding_write_size = 0;
+ struct D3nChunkDataInfo* head;
+ struct D3nChunkDataInfo* tail;
+
+private:
+ void add_io();
+
+public:
+ D3nDataCache();
+ ~D3nDataCache() {
+ while (lru_eviction() > 0);
+ }
+
+ std::string cache_location;
+
+ bool get(const string& oid, const off_t len);
+ void put(bufferlist& bl, unsigned int len, string& obj_key);
+ int d3n_io_write(bufferlist& bl, unsigned int len, std::string oid);
+ int d3n_libaio_create_write_request(bufferlist& bl, unsigned int len, std::string oid);
+ void d3n_libaio_write_completion_cb(D3nCacheAioWriteRequest* c);
+ size_t random_eviction();
+ size_t lru_eviction();
+
+ void init(CephContext *_cct);
+
+ void lru_insert_head(struct D3nChunkDataInfo* o) {
+ lsubdout(g_ceph_context, rgw_datacache, 30) << "D3nDataCache: " << __func__ << "()" << dendl;
+ o->lru_next = head;
+ o->lru_prev = nullptr;
+ if (head) {
+ head->lru_prev = o;
+ } else {
+ tail = o;
+ }
+ head = o;
+ }
+
+ void lru_insert_tail(struct D3nChunkDataInfo* o) {
+ lsubdout(g_ceph_context, rgw_datacache, 30) << "D3nDataCache: " << __func__ << "()" << dendl;
+ o->lru_next = nullptr;
+ o->lru_prev = tail;
+ if (tail) {
+ tail->lru_next = o;
+ } else {
+ head = o;
+ }
+ tail = o;
+ }
+
+ void lru_remove(struct D3nChunkDataInfo* o) {
+ lsubdout(g_ceph_context, rgw_datacache, 30) << "D3nDataCache: " << __func__ << "()" << dendl;
+ if (o->lru_next)
+ o->lru_next->lru_prev = o->lru_prev;
+ else
+ tail = o->lru_prev;
+ if (o->lru_prev)
+ o->lru_prev->lru_next = o->lru_next;
+ else
+ head = o->lru_next;
+ o->lru_next = o->lru_prev = nullptr;
+ }
+};
+
+
+template <class T>
+class D3nRGWDataCache : public T {
+
+public:
+ D3nRGWDataCache() {}
+
+ int init_rados() override {
+ int ret;
+ ret = T::init_rados();
+ if (ret < 0)
+ return ret;
+
+ return 0;
+ }
+
+ int get_obj_iterate_cb(const DoutPrefixProvider *dpp, const rgw_raw_obj& read_obj, off_t obj_ofs,
+ off_t read_ofs, off_t len, bool is_head_obj,
+ RGWObjState *astate, void *arg) override;
+};
+
+template<typename T>
+int D3nRGWDataCache<T>::get_obj_iterate_cb(const DoutPrefixProvider *dpp, const rgw_raw_obj& read_obj, off_t obj_ofs,
+ off_t read_ofs, off_t len, bool is_head_obj,
+ RGWObjState *astate, void *arg) {
+ lsubdout(g_ceph_context, rgw_datacache, 30) << "D3nDataCache::" << __func__ << "(): is head object : " << is_head_obj << dendl;
+ librados::ObjectReadOperation op;
+ struct get_obj_data* d = static_cast<struct get_obj_data*>(arg);
+ string oid, key;
+
+ if (is_head_obj) {
+ // only when reading from the head object do we need to do the atomic test
+ int r = T::append_atomic_test(dpp, astate, op);
+ if (r < 0)
+ return r;
+
+ if (astate &&
+ obj_ofs < astate->data.length()) {
+ unsigned chunk_len = std::min((uint64_t)astate->data.length() - obj_ofs, (uint64_t)len);
+
+ r = d->client_cb->handle_data(astate->data, obj_ofs, chunk_len);
+ if (r < 0)
+ return r;
+
+ len -= chunk_len;
+ d->offset += chunk_len;
+ read_ofs += chunk_len;
+ obj_ofs += chunk_len;
+ if (!len)
+ return 0;
+ }
+
+ auto obj = d->rgwrados->svc.rados->obj(read_obj);
+ r = obj.open(dpp);
+ if (r < 0) {
+ lsubdout(g_ceph_context, rgw, 4) << "failed to open rados context for " << read_obj << dendl;
+ return r;
+ }
+
+ ldpp_dout(dpp, 20) << "D3nDataCache::" << __func__ << "(): oid=" << read_obj.oid << " obj-ofs=" << obj_ofs << " read_ofs=" << read_ofs << " len=" << len << dendl;
+ op.read(read_ofs, len, nullptr, nullptr);
+
+ const uint64_t cost = len;
+ const uint64_t id = obj_ofs; // use logical object offset for sorting replies
+
+ auto completed = d->aio->get(obj, rgw::Aio::librados_op(std::move(op), d->yield), cost, id);
+ return d->flush(std::move(completed));
+ } else {
+ ldpp_dout(dpp, 20) << "D3nDataCache::" << __func__ << "(): oid=" << read_obj.oid << ", is_head_obj=" << is_head_obj << ", obj-ofs=" << obj_ofs << ", read_ofs=" << read_ofs << ", len=" << len << dendl;
+ int r;
+
+ op.read(read_ofs, len, nullptr, nullptr);
+
+ const uint64_t cost = len;
+ const uint64_t id = obj_ofs; // use logical object offset for sorting replies
+ oid = read_obj.oid;
+
+ auto obj = d->rgwrados->svc.rados->obj(read_obj);
+ r = obj.open(dpp);
+ if (r < 0) {
+ lsubdout(g_ceph_context, rgw, 0) << "D3nDataCache: Error: failed to open rados context for " << read_obj << ", r=" << r << dendl;
+ return r;
+ }
+
+ const bool is_compressed = (astate->attrset.find(RGW_ATTR_COMPRESSION) != astate->attrset.end());
+ const bool is_encrypted = (astate->attrset.find(RGW_ATTR_CRYPT_MODE) != astate->attrset.end());
+ if (read_ofs != 0 || astate->size != astate->accounted_size || is_compressed || is_encrypted) {
+ d->d3n_bypass_cache_write = true;
+ lsubdout(g_ceph_context, rgw, 5) << "D3nDataCache: " << __func__ << "(): Note - bypassing datacache: oid=" << read_obj.oid << ", read_ofs!=0 = " << read_ofs << ", size=" << astate->size << " != accounted_size=" << astate->accounted_size << ", is_compressed=" << is_compressed << ", is_encrypted=" << is_encrypted << dendl;
+ auto completed = d->aio->get(obj, rgw::Aio::librados_op(std::move(op), d->yield), cost, id);
+ r = d->flush(std::move(completed));
+ return r;
+ }
+
+ if (d->rgwrados->d3n_data_cache->get(oid, len)) {
+ // Read From Cache
+ ldpp_dout(dpp, 20) << "D3nDataCache: " << __func__ << "(): READ FROM CACHE: oid=" << read_obj.oid << ", obj-ofs=" << obj_ofs << ", read_ofs=" << read_ofs << ", len=" << len << dendl;
+ auto completed = d->aio->get(obj, rgw::Aio::d3n_cache_op(dpp, d->yield, read_ofs, len, d->rgwrados->d3n_data_cache->cache_location), cost, id);
+ r = d->flush(std::move(completed));
+ if (r < 0) {
+ lsubdout(g_ceph_context, rgw, 0) << "D3nDataCache: " << __func__ << "(): Error: failed to drain/flush, r= " << r << dendl;
+ }
+ return r;
+ } else {
+ // Write To Cache
+ ldpp_dout(dpp, 20) << "D3nDataCache: " << __func__ << "(): WRITE TO CACHE: oid=" << read_obj.oid << ", obj-ofs=" << obj_ofs << ", read_ofs=" << read_ofs << " len=" << len << dendl;
+ auto completed = d->aio->get(obj, rgw::Aio::librados_op(std::move(op), d->yield), cost, id);
+ return d->flush(std::move(completed));
+ }
+ }
+ lsubdout(g_ceph_context, rgw, 1) << "D3nDataCache: " << __func__ << "(): Warning: Check head object cache handling flow, oid=" << read_obj.oid << dendl;
+
+ return 0;
+}
+
+#endif
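
D3nRGWDataCache<T> above layers the cache over the base store by deriving from it and overriding its virtual read hook, which is what lets StoreManager instantiate D3nRGWDataCache<RGWRados> without touching RGWRados call sites. The shape of that pattern, with hypothetical stand-in types:

#include <iostream>
#include <string>

struct BaseStore {
  virtual ~BaseStore() = default;
  virtual int read_chunk(const std::string& oid) {
    std::cout << "rados read: " << oid << "\n";
    return 0;
  }
};

template <class T>
struct CachingStore : public T {
  // stand-in for D3nDataCache::get()
  bool cache_hit(const std::string& oid) { return oid == "hot"; }

  int read_chunk(const std::string& oid) override {
    if (cache_hit(oid)) {
      std::cout << "cache read: " << oid << "\n";  // the Read From Cache path
      return 0;
    }
    return T::read_chunk(oid);  // fall back to rados (and fill the cache)
  }
};

int main() {
  BaseStore* store = new CachingStore<BaseStore>;  // cf. new D3nRGWDataCache<RGWRados>
  store->read_chunk("hot");
  store->read_chunk("cold");
  delete store;
}
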
rgw_http_client_init(g_ceph_context);
rgw_kmip_client_init(*new RGWKMIPManagerImpl(g_ceph_context));
+ lsubdout(cct, rgw, 1) << "rgw_d3n: rgw_d3n_l1_local_datacache_enabled=" << cct->_conf->rgw_d3n_l1_local_datacache_enabled << dendl;
+ if (cct->_conf->rgw_d3n_l1_local_datacache_enabled) {
+ lsubdout(cct, rgw, 1) << "rgw_d3n: rgw_enable_ops_log=" << cct->_conf->rgw_enable_ops_log << dendl;
+ lsubdout(cct, rgw, 1) << "rgw_d3n: rgw_d3n_l1_datacache_persistent_path='" << cct->_conf->rgw_d3n_l1_datacache_persistent_path << "'" << dendl;
+ lsubdout(cct, rgw, 1) << "rgw_d3n: rgw_d3n_l1_datacache_size=" << cct->_conf->rgw_d3n_l1_datacache_size << dendl;
+ lsubdout(cct, rgw, 1) << "rgw_d3n: rgw_d3n_l1_evict_cache_on_start=" << cct->_conf->rgw_d3n_l1_evict_cache_on_start << dendl;
+ lsubdout(cct, rgw, 1) << "rgw_d3n: rgw_d3n_l1_fadvise=" << cct->_conf->rgw_d3n_l1_fadvise << dendl;
+ lsubdout(cct, rgw, 1) << "rgw_d3n: rgw_d3n_l1_eviction_policy=" << cct->_conf->rgw_d3n_l1_eviction_policy << dendl;
+ }
+ bool rgw_d3n_datacache_enabled = cct->_conf->rgw_d3n_l1_local_datacache_enabled;
+ if (rgw_d3n_datacache_enabled && !cct->_conf->rgw_enable_ops_log) {
+ lsubdout(cct, rgw_datacache, 0) << "rgw_d3n: WARNING: D3N DataCache disabling (D3N requires that rgw_enable_ops_log will be enabled also)" << dendl;
+ rgw_d3n_datacache_enabled = false;
+ }
+ if (rgw_d3n_datacache_enabled && (cct->_conf->rgw_max_chunk_size != cct->_conf->rgw_obj_stripe_size)) {
+ lsubdout(cct, rgw_datacache, 0) << "rgw_d3n: WARNING: D3N DataCache disabling (D3N requires that the chunk_size equals stripe_size)" << dendl;
+ rgw_d3n_datacache_enabled = false;
+ }
+ if (rgw_d3n_datacache_enabled && !cct->_conf->rgw_beast_enable_async) {
+ lsubdout(cct, rgw_datacache, 0) << "rgw_d3n: WARNING: D3N DataCache disabling (D3N requires yield context - rgw_beast_enable_async=true)" << dendl;
+ rgw_d3n_datacache_enabled = false;
+ }
+ lsubdout(cct, rgw, 1) << "D3N datacache enabled: " << rgw_d3n_datacache_enabled << dendl;
+
rgw::sal::Store* store =
StoreManager::get_storage(&dp, g_ceph_context,
- "rados",
+ (!rgw_d3n_datacache_enabled) ? "rados" : "d3n",
g_conf()->rgw_enable_gc_threads,
g_conf()->rgw_enable_lc_threads,
g_conf()->rgw_enable_quota_threads,
#include "compressor/Compressor.h"
+#include "rgw_d3n_datacache.h"
+
#ifdef WITH_LTTNG
#define TRACEPOINT_DEFINE
#define TRACEPOINT_PROBE_DYNAMIC_LINKAGE
bool requires;
r = ioctx.pool_requires_alignment2(&requires);
if (r < 0) {
- ldpp_dout(dpp, 0) << "ERROR: ioctx.pool_requires_alignment2() returned "
+ ldpp_dout(dpp, 0) << "ERROR: ioctx.pool_requires_alignment2() returned "
<< r << dendl;
return r;
}
uint64_t align;
r = ioctx.pool_required_alignment2(&align);
if (r < 0) {
- ldpp_dout(dpp, 0) << "ERROR: ioctx.pool_required_alignment2() returned "
+ ldpp_dout(dpp, 0) << "ERROR: ioctx.pool_required_alignment2() returned "
<< r << dendl;
return r;
}
delete binfo_cache;
delete obj_tombstone_cache;
+ delete d3n_data_cache;
if (reshard_wait.get()) {
reshard_wait->stop();
}
cr_registry = crs.release();
+
+ if (use_datacache) {
+ d3n_data_cache = new D3nDataCache();
+ d3n_data_cache->init(cct);
+ }
+
return ret;
}
* is_truncated: if number of objects in the bucket is bigger than max, then
* truncated.
*/
-int RGWRados::Bucket::List::list_objects_unordered(const DoutPrefixProvider *dpp,
+int RGWRados::Bucket::List::list_objects_unordered(const DoutPrefixProvider *dpp,
int64_t max_p,
vector<rgw_bucket_dir_entry> *result,
map<string, bool> *common_prefixes,
std::vector<rgw_bucket_dir_entry> ent_list;
ent_list.reserve(read_ahead);
- int r = store->cls_bucket_list_unordered(dpp,
+ int r = store->cls_bucket_list_unordered(dpp,
target->get_bucket_info(),
shard_id,
cur_marker,
return 0;
}
-int RGWRados::move_rados_obj(const DoutPrefixProvider *dpp,
+int RGWRados::move_rados_obj(const DoutPrefixProvider *dpp,
librados::IoCtx& src_ioctx,
const string& src_oid, const string& src_locator,
librados::IoCtx& dst_ioctx,
/* Execute @handler on last item in bucket listing for bucket specified
* in @bucket_info. @obj_prefix and @obj_delim narrow down the listing
* to objects matching these criterias. */
-int RGWRados::on_last_entry_in_listing(const DoutPrefixProvider *dpp,
+int RGWRados::on_last_entry_in_listing(const DoutPrefixProvider *dpp,
RGWBucketInfo& bucket_info,
const std::string& obj_prefix,
const std::string& obj_delim,
uint64_t ofs{0};
uint64_t lofs{0}; /* logical ofs */
std::function<int(map<string, bufferlist>&)> attrs_handler;
+
public:
- RGWRadosPutObj(const DoutPrefixProvider *dpp,
+ RGWRadosPutObj(const DoutPrefixProvider *dpp,
CephContext* cct,
CompressorRef& plugin,
boost::optional<RGWPutObj_Compress>& compressor,
progress_data(_progress_data),
attrs_handler(_attrs_handler) {}
+
int process_attrs(void) {
if (extra_data_bl.length()) {
JSONParser jp;
}
-int RGWRados::copy_obj_to_remote_dest(const DoutPrefixProvider *dpp,
+int RGWRados::copy_obj_to_remote_dest(const DoutPrefixProvider *dpp,
RGWObjState *astate,
map<string, bufferlist>& src_attrs,
RGWRados::Object::Read& read_op,
std::vector<rgw_bucket_dir_entry> ent_list;
ent_list.reserve(NUM_ENTRIES);
- int r = cls_bucket_list_unordered(dpp,
+ int r = cls_bucket_list_unordered(dpp,
bucket_info,
RGW_NO_SHARD,
marker,
ctx.invalidate(obj);
}
-int RGWRados::Object::prepare_atomic_modification(const DoutPrefixProvider *dpp,
+int RGWRados::Object::prepare_atomic_modification(const DoutPrefixProvider *dpp,
ObjectWriteOperation& op, bool reset_obj, const string *ptag,
const char *if_match, const char *if_nomatch, bool removal_op,
bool modify_tail, optional_yield y)
return ret;
}
-int RGWRados::Bucket::UpdateIndex::complete_del(const DoutPrefixProvider *dpp,
+int RGWRados::Bucket::UpdateIndex::complete_del(const DoutPrefixProvider *dpp,
int64_t poolid, uint64_t epoch,
real_time& removed_mtime,
list<rgw_obj_index_key> *remove_objs)
return bl.length();
}
-struct get_obj_data {
- RGWRados* store;
- RGWGetDataCB* client_cb;
- rgw::Aio* aio;
- uint64_t offset; // next offset to write to client
- rgw::AioResultList completed; // completed read results, sorted by offset
- optional_yield yield;
-
- get_obj_data(RGWRados* store, RGWGetDataCB* cb, rgw::Aio* aio,
- uint64_t offset, optional_yield yield)
- : store(store), client_cb(cb), aio(aio), offset(offset), yield(yield) {}
-
- int flush(rgw::AioResultList&& results) {
- int r = rgw::check_for_errors(results);
- if (r < 0) {
- return r;
- }
+int get_obj_data::flush(rgw::AioResultList&& results) {
+ int r = rgw::check_for_errors(results);
+ if (r < 0) {
+ return r;
+ }
+ std::list<bufferlist> bl_list;
- auto cmp = [](const auto& lhs, const auto& rhs) { return lhs.id < rhs.id; };
- results.sort(cmp); // merge() requires results to be sorted first
- completed.merge(results, cmp); // merge results in sorted order
+ auto cmp = [](const auto& lhs, const auto& rhs) { return lhs.id < rhs.id; };
+ results.sort(cmp); // merge() requires results to be sorted first
+ completed.merge(results, cmp); // merge results in sorted order
- while (!completed.empty() && completed.front().id == offset) {
- auto bl = std::move(completed.front().data);
- completed.pop_front_and_dispose(std::default_delete<rgw::AioResultEntry>{});
+ while (!completed.empty() && completed.front().id == offset) {
+ auto bl = std::move(completed.front().data);
- offset += bl.length();
- int r = client_cb->handle_data(bl, 0, bl.length());
- if (r < 0) {
- return r;
- }
+ bl_list.push_back(bl);
+ offset += bl.length();
+ int r = client_cb->handle_data(bl, 0, bl.length());
+ if (r < 0) {
+ return r;
}
- return 0;
- }
-
- void cancel() {
- // wait for all completions to drain and ignore the results
- aio->drain();
- }
- int drain() {
- auto c = aio->wait();
- while (!c.empty()) {
- int r = flush(std::move(c));
- if (r < 0) {
- cancel();
- return r;
+ if (rgwrados->get_use_datacache()) {
+ const std::lock_guard l(d3n_get_data.d3n_lock);
+ auto oid = completed.front().obj.get_ref().obj.oid;
+ if (bl.length() <= g_conf()->rgw_get_obj_max_req_size && !d3n_bypass_cache_write) {
+ lsubdout(g_ceph_context, rgw_datacache, 10) << "D3nDataCache: " << __func__ << "(): bl.length <= rgw_get_obj_max_req_size (default 4MB) - write to datacache, bl.length=" << bl.length() << dendl;
+ rgwrados->d3n_data_cache->put(bl, bl.length(), oid);
+ } else {
+ lsubdout(g_ceph_context, rgw_datacache, 10) << "D3nDataCache: " << __func__ << "(): not writing to datacache - bl.length > rgw_get_obj_max_req_size (default 4MB), bl.length=" << bl.length() << " or d3n_bypass_cache_write=" << d3n_bypass_cache_write << dendl;
}
- c = aio->wait();
}
- return flush(std::move(c));
+ completed.pop_front_and_dispose(std::default_delete<rgw::AioResultEntry>{});
}
-};
+ return 0;
+}
-static int _get_obj_iterate_cb(const DoutPrefixProvider *dpp,
+static int _get_obj_iterate_cb(const DoutPrefixProvider *dpp,
const rgw_raw_obj& read_obj, off_t obj_ofs,
off_t read_ofs, off_t len, bool is_head_obj,
RGWObjState *astate, void *arg)
{
- struct get_obj_data *d = (struct get_obj_data *)arg;
-
- return d->store->get_obj_iterate_cb(dpp, read_obj, obj_ofs, read_ofs, len,
+ struct get_obj_data* d = static_cast<struct get_obj_data*>(arg);
+ return d->rgwrados->get_obj_iterate_cb(dpp, read_obj, obj_ofs, read_ofs, len,
is_head_obj, astate, arg);
}
RGWObjState *astate, void *arg)
{
ObjectReadOperation op;
- struct get_obj_data *d = (struct get_obj_data *)arg;
+ struct get_obj_data* d = static_cast<struct get_obj_data*>(arg);
string oid, key;
if (is_head_obj) {
}
}
- auto obj = d->store->svc.rados->obj(read_obj);
+ auto obj = d->rgwrados->svc.rados->obj(read_obj);
int r = obj.open(dpp);
if (r < 0) {
ldpp_dout(dpp, 4) << "failed to open rados context for " << read_obj << dendl;
return ret;
}
-int RGWRados::guard_reshard(const DoutPrefixProvider *dpp,
+int RGWRados::guard_reshard(const DoutPrefixProvider *dpp,
BucketShard *bs,
const rgw_obj& obj_instance,
const RGWBucketInfo& bucket_info,
return 0;
}
-int RGWRados::bucket_index_read_olh_log(const DoutPrefixProvider *dpp,
+int RGWRados::bucket_index_read_olh_log(const DoutPrefixProvider *dpp,
const RGWBucketInfo& bucket_info, RGWObjState& state,
const rgw_obj& obj_instance, uint64_t ver_marker,
map<uint64_t, vector<rgw_bucket_olh_log_entry> > *log,
return decode_olh_info(dpp, cct, iter->second, olh);
}
-void RGWRados::check_pending_olh_entries(const DoutPrefixProvider *dpp, map<string, bufferlist>& pending_entries,
+void RGWRados::check_pending_olh_entries(const DoutPrefixProvider *dpp, map<string, bufferlist>& pending_entries,
map<string, bufferlist> *rm_pending_entries)
{
map<string, bufferlist>::iterator iter = pending_entries.begin();
return 0;
}
-int RGWRados::raw_obj_stat(const DoutPrefixProvider *dpp,
+int RGWRados::raw_obj_stat(const DoutPrefixProvider *dpp,
rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *epoch,
map<string, bufferlist> *attrs, bufferlist *first_chunk,
RGWObjVersionTracker *objv_tracker, optional_yield y)
}
-int RGWRados::cls_bucket_list_ordered(const DoutPrefixProvider *dpp,
+int RGWRados::cls_bucket_list_ordered(const DoutPrefixProvider *dpp,
RGWBucketInfo& bucket_info,
const int shard_id,
const rgw_obj_index_key& start_after,
}
-int RGWRados::cls_bucket_list_unordered(const DoutPrefixProvider *dpp,
+int RGWRados::cls_bucket_list_unordered(const DoutPrefixProvider *dpp,
RGWBucketInfo& bucket_info,
int shard_id,
const rgw_obj_index_key& start_after,
return r;
}
-int RGWRados::check_disk_state(const DoutPrefixProvider *dpp,
+int RGWRados::check_disk_state(const DoutPrefixProvider *dpp,
librados::IoCtx io_ctx,
const RGWBucketInfo& bucket_info,
rgw_bucket_dir_entry& list_state,
#include "rgw_trim_bilog.h"
#include "rgw_service.h"
#include "rgw_sal.h"
+#include "rgw_aio.h"
+#include "rgw_d3n_cacherequest.h"
#include "services/svc_rados.h"
#include "services/svc_bi_rados.h"
+#include "common/Throttle.h"
+#include "common/ceph_mutex.h"
+#include "rgw_cache.h"
+
+struct D3nDataCache;
class RGWWatcher;
class SafeTimer;
class RGWReshardWait;
class RGWSysObjectCtx;
+struct get_obj_data;
/* flags for put_obj_meta() */
#define PUT_OBJ_CREATE 0x01
string write_tag;
bool fake_tag{false};
std::optional<RGWObjManifest> manifest;
+
string shadow_obj;
bool has_data{false};
bufferlist data;
};
WRITE_CLASS_ENCODER(objexp_hint_entry)
-class RGWDataChangesLog;
class RGWMetaSyncStatusManager;
class RGWDataSyncStatusManager;
class RGWCoroutinesManagerRegistry;
ceph::mutex lock = ceph::make_mutex("rados_timer_lock");
SafeTimer *timer;
- rgw::sal::RadosStore* store;
+ rgw::sal::RadosStore* store = nullptr;
RGWGC *gc = nullptr;
RGWLC *lc;
RGWObjectExpirer *obj_expirer;
bool follow_olh, optional_yield y, bool assume_noent = false);
int append_atomic_test(const DoutPrefixProvider *dpp, RGWObjectCtx *rctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj,
librados::ObjectOperation& op, RGWObjState **state, optional_yield y);
- int append_atomic_test(const DoutPrefixProvider *dpp, const RGWObjState* astate, librados::ObjectOperation& op);
-
+
int update_placement_map();
int store_bucket_info(RGWBucketInfo& info, map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker, bool exclusive);
bool use_cache{false};
bool use_gc{true};
+ bool use_datacache{false};
int get_obj_head_ioctx(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::IoCtx *ioctx);
public:
return *this;
}
+ RGWRados& set_use_datacache(bool status) {
+ use_datacache = status;
+ return *this;
+ }
+
+ bool get_use_datacache() {
+ return use_datacache;
+ }
+
RGWLC *get_lc() {
return lc;
}
/** Initialize the RADOS instance and prepare to do other ops */
int init_svc(bool raw, const DoutPrefixProvider *dpp);
int init_ctl(const DoutPrefixProvider *dpp);
- int init_rados();
+ virtual int init_rados();
int init_complete(const DoutPrefixProvider *dpp);
int initialize(const DoutPrefixProvider *dpp);
void finalize();
explicit Write(RGWRados::Object *_target) : target(_target) {}
- int _do_write_meta(const DoutPrefixProvider *dpp,
+ int _do_write_meta(const DoutPrefixProvider *dpp,
uint64_t size, uint64_t accounted_size,
map<std::string, bufferlist>& attrs,
bool modify_tail, bool assume_noent,
const string& storage_class,
bufferlist *acl_bl, RGWObjCategory category,
list<rgw_obj_index_key> *remove_objs, const string *user_data = nullptr, bool appendable = false);
- int complete_del(const DoutPrefixProvider *dpp,
+ int complete_del(const DoutPrefixProvider *dpp,
int64_t poolid, uint64_t epoch,
ceph::real_time& removed_mtime, /* mtime of removed object */
list<rgw_obj_index_key> *remove_objs);
RGWRados::Bucket *target;
rgw_obj_key next_marker;
- int list_objects_ordered(const DoutPrefixProvider *dpp,
+ int list_objects_ordered(const DoutPrefixProvider *dpp,
int64_t max,
vector<rgw_bucket_dir_entry> *result,
map<string, bool> *common_prefixes,
bool *is_truncated,
optional_yield y);
- int list_objects_unordered(const DoutPrefixProvider *dpp,
+ int list_objects_unordered(const DoutPrefixProvider *dpp,
int64_t max,
vector<rgw_bucket_dir_entry> *result,
map<string, bool> *common_prefixes,
}; // class List
}; // class Bucket
- int on_last_entry_in_listing(const DoutPrefixProvider *dpp,
+ int on_last_entry_in_listing(const DoutPrefixProvider *dpp,
RGWBucketInfo& bucket_info,
const std::string& obj_prefix,
const std::string& obj_delim,
ATTRSMOD_MERGE = 2
};
+ D3nDataCache* d3n_data_cache{nullptr};
+
int rewrite_obj(RGWBucketInfo& dest_bucket_info, rgw::sal::Object* obj, const DoutPrefixProvider *dpp, optional_yield y);
int stat_remote_obj(const DoutPrefixProvider *dpp,
uint64_t max_chunk_size, iterate_obj_cb cb, void *arg,
optional_yield y);
- int get_obj_iterate_cb(const DoutPrefixProvider *dpp,
+ int append_atomic_test(const DoutPrefixProvider *dpp, const RGWObjState* astate, librados::ObjectOperation& op);
+
+ virtual int get_obj_iterate_cb(const DoutPrefixProvider *dpp,
const rgw_raw_obj& read_obj, off_t obj_ofs,
off_t read_ofs, off_t len, bool is_head_obj,
RGWObjState *astate, void *arg);
- void get_obj_aio_completion_cb(librados::completion_t cb, void *arg);
-
/**
* a simple object read without keeping state
*/
- int raw_obj_stat(const DoutPrefixProvider *dpp,
+ int raw_obj_stat(const DoutPrefixProvider *dpp,
rgw_raw_obj& obj, uint64_t *psize, ceph::real_time *pmtime, uint64_t *epoch,
map<string, bufferlist> *attrs, bufferlist *first_chunk,
RGWObjVersionTracker *objv_tracker, optional_yield y);
int obj_operate(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::ObjectWriteOperation *op);
int obj_operate(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::ObjectReadOperation *op);
- int guard_reshard(const DoutPrefixProvider *dpp,
+ int guard_reshard(const DoutPrefixProvider *dpp,
BucketShard *bs,
const rgw_obj& obj_instance,
const RGWBucketInfo& bucket_info,
void bucket_index_guard_olh_op(const DoutPrefixProvider *dpp, RGWObjState& olh_state, librados::ObjectOperation& op);
int olh_init_modification(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, string *op_tag);
int olh_init_modification_impl(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, string *op_tag);
- int bucket_index_link_olh(const DoutPrefixProvider *dpp,
+ int bucket_index_link_olh(const DoutPrefixProvider *dpp,
const RGWBucketInfo& bucket_info, RGWObjState& olh_state,
const rgw_obj& obj_instance, bool delete_marker,
const string& op_tag, struct rgw_bucket_dir_entry_meta *meta,
int bucket_rebuild_index(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info);
int bucket_set_reshard(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const cls_rgw_bucket_instance_entry& entry);
int remove_objs_from_index(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, list<rgw_obj_index_key>& oid_list);
- int move_rados_obj(const DoutPrefixProvider *dpp,
+ int move_rados_obj(const DoutPrefixProvider *dpp,
librados::IoCtx& src_ioctx,
const string& src_oid, const string& src_locator,
librados::IoCtx& dst_ioctx,
* and -errno on other failures. (-ENOENT is not a failure, and it
* will encode that info as a suggested update.)
*/
- int check_disk_state(const DoutPrefixProvider *dpp,
+ int check_disk_state(const DoutPrefixProvider *dpp,
librados::IoCtx io_ctx,
const RGWBucketInfo& bucket_info,
rgw_bucket_dir_entry& list_state,
uint32_t num_shards);
};
+
+struct get_obj_data {
+ RGWRados* rgwrados;
+ RGWGetDataCB* client_cb = nullptr;
+ rgw::Aio* aio;
+ uint64_t offset; // next offset to write to client
+ rgw::AioResultList completed; // completed read results, sorted by offset
+ optional_yield yield;
+
+ get_obj_data(RGWRados* rgwrados, RGWGetDataCB* cb, rgw::Aio* aio,
+ uint64_t offset, optional_yield yield)
+ : rgwrados(rgwrados), client_cb(cb), aio(aio), offset(offset), yield(yield) {}
+ ~get_obj_data() {
+ if (rgwrados->get_use_datacache()) {
+ const std::lock_guard l(d3n_get_data.d3n_lock);
+ }
+ }
+
+ D3nGetObjData d3n_get_data;
+ std::atomic_bool d3n_bypass_cache_write{false};
+
+ int flush(rgw::AioResultList&& results);
+
+ void cancel() {
+ // wait for all completions to drain and ignore the results
+ aio->drain();
+ }
+
+ int drain() {
+ auto c = aio->wait();
+ while (!c.empty()) {
+ int r = flush(std::move(c));
+ if (r < 0) {
+ cancel();
+ return r;
+ }
+ c = aio->wait();
+ }
+ return flush(std::move(c));
+ }
+};
+
+
#endif
#include "rgw_sal.h"
#include "rgw_sal_rados.h"
+#include "rgw_d3n_datacache.h"
#define dout_subsys ceph_subsys_rgw
rgw::sal::Store* StoreManager::init_storage_provider(const DoutPrefixProvider* dpp, CephContext* cct, const std::string svc, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread, bool use_cache, bool use_gc)
{
- rgw::sal::Store* store = nullptr;
if (svc.compare("rados") == 0) {
- store = newStore();
+ rgw::sal::Store* store = newStore();
RGWRados* rados = static_cast<rgw::sal::RadosStore* >(store)->getRados();
if ((*rados).set_use_cache(use_cache)
+ .set_use_datacache(false)
.set_use_gc(use_gc)
.set_run_gc_thread(use_gc_thread)
.set_run_lc_thread(use_lc_thread)
.initialize(cct, dpp) < 0) {
delete store; store = nullptr;
}
+ return store;
}
+ else if (svc.compare("d3n") == 0) {
+ rgw::sal::RadosStore *store = new rgw::sal::RadosStore();
+ RGWRados* rados = new D3nRGWDataCache<RGWRados>;
+ store->setRados(rados);
+ rados->set_store(static_cast<rgw::sal::RadosStore* >(store));
- return store;
+ if ((*rados).set_use_cache(use_cache)
+ .set_use_datacache(true)
+ .set_use_gc(use_gc)
+ .set_run_gc_thread(use_gc_thread)
+ .set_run_lc_thread(use_lc_thread)
+ .set_run_quota_threads(quota_threads)
+ .set_run_sync_thread(run_sync_thread)
+ .set_run_reshard_thread(run_reshard_thread)
+ .initialize(cct, dpp) < 0) {
+ delete store; store = nullptr;
+ }
+ return store;
+ }
+
+ return nullptr;
}
rgw::sal::Store* StoreManager::init_raw_storage_provider(const DoutPrefixProvider* dpp, CephContext* cct, const std::string svc)