From: Samarah Date: Wed, 27 Mar 2024 17:42:41 +0000 (+0000) Subject: rgw/d4n: squashing the following commits for miscellaneous fixes X-Git-Tag: v20.3.0~8^2~31 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=70e4cbdc49e2f662163f856d97388295120d2bec;p=ceph.git rgw/d4n: squashing the following commits for miscellaneous fixes to policy driver, redis driver, directory and filter driver. 1. d4n/directory: Use boost::split for simpler code 2. rgw: Lower log levels for failures in D4N and redis cache files 3. rgw: Add dpp and logs to directory, cache, and policy 4. rgw: Reduce Redis calls and fix workflow 5. qa/d4n: Remove D4N task and add S3 user creation to workunit driver script 6. d4n: Use Redis transactions to serialize consecutive requests for safe data handling and faster completion 7. d4n/directory: Remove boost lexical_cast calls 8. rgw/d4n: Add return values to error logs 9. rgw/d4n: Change directory hostsList to use `unordered_set` 10. d4n/filter: Simplify logic for storing block in `handle_data` 11. rgw/policy: Properly delete `LFUDAEntry` instances 12. rgw/d4n: Add support for `dirty` block metadata, `check_bool` for consistent values, and fix directory updates in `cleanup` method Signed-off-by: Samarah --- diff --git a/qa/suites/rgw/d4n/tasks/rgw_d4ntests.yaml b/qa/suites/rgw/d4n/tasks/rgw_d4ntests.yaml index f26e6b32e4e4..9689e21b1189 100644 --- a/qa/suites/rgw/d4n/tasks/rgw_d4ntests.yaml +++ b/qa/suites/rgw/d4n/tasks/rgw_d4ntests.yaml @@ -13,8 +13,6 @@ tasks: client.0: - sudo chmod 0777 /var/lib/ceph - sudo chmod 0777 /var/lib/ceph/radosgw -- d4ntests: - client.0: - workunit: clients: client.0: diff --git a/qa/tasks/d4ntests.py b/qa/tasks/d4ntests.py deleted file mode 100644 index bfd3ebceee40..000000000000 --- a/qa/tasks/d4ntests.py +++ /dev/null @@ -1,109 +0,0 @@ -import logging - -from teuthology import misc as teuthology -from teuthology.task import Task -from teuthology.packaging import remove_package - -log = logging.getLogger(__name__) - -display_name='Foo' -email='foo@foo.com' -access_key='test3' -secret_key='test3' - -class D4NTests(Task): - - def __init__(self, ctx, config): - super(D4NTests, self).__init__(ctx, config) - self.log = log - log.info('D4N Tests: __INIT__ ') - - clients = ['client.{id}'.format(id=id_) - for id_ in teuthology.all_roles_of_type(self.ctx.cluster, 'client')] - self.all_clients = [] - for client in clients: - if client in self.config: - self.all_clients.extend([client]) - if self.all_clients is None: - self.all_clients = 'client.0' - - self.user = {'s3main': 'tester'} - - def setup(self): - super(D4NTests, self).setup() - log.info('D4N Tests: SETUP') - - def begin(self): - super(D4NTests, self).begin() - log.info('D4N Tests: BEGIN') - - for (host, roles) in self.ctx.cluster.remotes.items(): - log.debug('D4N Tests: Cluster config is: {cfg}'.format(cfg=roles)) - log.debug('D4N Tests: Host is: {host}'.format(host=host)) - - self.create_user() - - def end(self): - super(D4NTests, self).end() - log.info('D4N Tests: END') - - for client in self.all_clients: - self.remove_packages(client) - self.delete_user(client) - - def create_user(self): - log.info("D4N Tests: Creating S3 user...") - testdir = teuthology.get_testdir(self.ctx) - - for client in self.all_clients: - for user in list(self.user.items()): - s3_user_id = 's3main' - log.debug( - 'D4N Tests: Creating user {s3_user_id}'.format(s3_user_id=s3_user_id)) - cluster_name, daemon_type, client_id = teuthology.split_role( - client) - client_with_id = daemon_type + '.' + client_id - self.ctx.cluster.only(client).run( - args=[ - 'sudo', - 'adjust-ulimits', - 'ceph-coverage', - '{tdir}/archive/coverage'.format(tdir=testdir), - 'radosgw-admin', - '-n', client_with_id, - 'user', 'create', - '--uid', s3_user_id, - '--display-name', display_name, - '--access-key', access_key, - '--secret', secret_key, - '--email', email, - '--cluster', cluster_name, - ], - ) - - def remove_packages(self, client): - (remote,) = self.ctx.cluster.only(client).remotes.keys() - remove_package('s3cmd', remote) - - def delete_user(self, client): - log.info("D4N Tests: Deleting S3 user...") - testdir = teuthology.get_testdir(self.ctx) - - for user in self.user.items(): - s3_user_id = 's3main' - self.ctx.cluster.only(client).run( - args=[ - 'sudo', - 'adjust-ulimits', - 'ceph-coverage', - '{tdir}/archive/coverage'.format(tdir=testdir), - 'radosgw-admin', - '-n', client, - 'user', 'rm', - '--uid', s3_user_id, - '--purge-data', - '--cluster', 'ceph', - ], - ) - -task = D4NTests diff --git a/qa/workunits/rgw/run-d4n.sh b/qa/workunits/rgw/run-d4n.sh index ca2f65f6c769..22ba9efed9e4 100755 --- a/qa/workunits/rgw/run-d4n.sh +++ b/qa/workunits/rgw/run-d4n.sh @@ -9,6 +9,9 @@ pip install redis pip install configobj pip install boto3 +# create user +radosgw-admin user create --uid=test3 --display-name=test3 --access-key=test3 --secret-key=test3 2>/dev/null + # run test $mydir/bin/python3 $mydir/test_rgw_d4n.py diff --git a/src/rgw/driver/d4n/d4n_directory.cc b/src/rgw/driver/d4n/d4n_directory.cc index 96dac74890e8..d90bcdcf8149 100644 --- a/src/rgw/driver/d4n/d4n_directory.cc +++ b/src/rgw/driver/d4n/d4n_directory.cc @@ -1,4 +1,5 @@ #include +#include #include "common/async/blocked_completion.h" #include "common/dout.h" #include "d4n_directory.h" @@ -33,11 +34,11 @@ auto async_exec(std::shared_ptr conn, initiate_exec{std::move(conn)}, token, req, resp); } -template +template void redis_exec(std::shared_ptr conn, boost::system::error_code& ec, const boost::redis::request& req, - boost::redis::response& resp, optional_yield y) + boost::redis::response& resp, optional_yield y) { if (y) { auto yield = y.get_yield_context(); @@ -47,12 +48,22 @@ void redis_exec(std::shared_ptr conn, } } +int check_bool(std::string str) { + if (str == "true" || str == "1") { + return 1; + } else if (str == "false" || str == "0") { + return 0; + } else { + return -EINVAL; + } +} + std::string ObjectDirectory::build_index(CacheObj* object) { return object->bucketName + "_" + object->objName; } -int ObjectDirectory::exist_key(CacheObj* object, optional_yield y) +int ObjectDirectory::exist_key(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y) { std::string key = build_index(object); response resp; @@ -64,18 +75,24 @@ int ObjectDirectory::exist_key(CacheObj* object, optional_yield y) redis_exec(conn, ec, req, resp, y); - if (ec) - return false; - } catch (std::exception &e) {} + if (ec) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } return std::get<0>(resp).value(); } -int ObjectDirectory::set(CacheObj* object, optional_yield y) +int ObjectDirectory::set(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y) { + /* For existing keys, call get method beforehand. + Sets completely overwrite existing values. */ std::string key = build_index(object); - - /* Every set will be treated as new */ + std::string endpoint; std::list redisValues; @@ -87,6 +104,13 @@ int ObjectDirectory::set(CacheObj* object, optional_yield y) redisValues.push_back("creationTime"); redisValues.push_back(object->creationTime); redisValues.push_back("dirty"); + int ret = -1; + if ((ret = check_bool(std::to_string(object->dirty))) != -EINVAL) { + object->dirty = (ret != 0); + } else { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: Invalid bool value" << dendl; + return -EINVAL; + } redisValues.push_back(std::to_string(object->dirty)); redisValues.push_back("objHosts"); @@ -104,201 +128,180 @@ int ObjectDirectory::set(CacheObj* object, optional_yield y) try { boost::system::error_code ec; + response resp; request req; - req.push_range("HMSET", key, redisValues); - response resp; + req.push_range("HSET", key, redisValues); redis_exec(conn, ec, req, resp, y); if (ec) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; return -ec.value(); } } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; return -EINVAL; } return 0; } -int ObjectDirectory::get(CacheObj* object, optional_yield y) +int ObjectDirectory::get(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y) { std::string key = build_index(object); + std::vector fields; - if (exist_key(object, y)) { - std::vector fields; + fields.push_back("objName"); + fields.push_back("bucketName"); + fields.push_back("creationTime"); + fields.push_back("dirty"); + fields.push_back("objHosts"); - fields.push_back("objName"); - fields.push_back("bucketName"); - fields.push_back("creationTime"); - fields.push_back("dirty"); - fields.push_back("objHosts"); - - try { - boost::system::error_code ec; - request req; - req.push_range("HMGET", key, fields); - response< std::vector > resp; - - redis_exec(conn, ec, req, resp, y); - - if (std::get<0>(resp).value().empty()) { - return -ENOENT; - } else if (ec) { - return -ec.value(); - } + try { + boost::system::error_code ec; + response< std::vector > resp; + request req; + req.push_range("HMGET", key, fields); - object->objName = std::get<0>(resp).value()[0]; - object->bucketName = std::get<0>(resp).value()[1]; - object->creationTime = std::get<0>(resp).value()[2]; - object->dirty = boost::lexical_cast(std::get<0>(resp).value()[3]); + redis_exec(conn, ec, req, resp, y); - { - std::stringstream ss(boost::lexical_cast(std::get<0>(resp).value()[4])); + if (ec) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } - while (!ss.eof()) { - std::string host; - std::getline(ss, host, '_'); - object->hostsList.push_back(host); - } - } - } catch (std::exception &e) { - return -EINVAL; + if (std::get<0>(resp).value().empty()) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "(): No values returned." << dendl; + return -ENOENT; } - } else { - return -ENOENT; + + object->objName = std::get<0>(resp).value()[0]; + object->bucketName = std::get<0>(resp).value()[1]; + object->creationTime = std::get<0>(resp).value()[2]; + object->dirty = std::stoi(std::get<0>(resp).value()[3]); + boost::split(object->hostsList, std::get<0>(resp).value()[4], boost::is_any_of("_")); + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; } return 0; } /* Note: This method is not compatible for use on Ubuntu systems. */ -int ObjectDirectory::copy(CacheObj* object, std::string copyName, std::string copyBucketName, optional_yield y) +int ObjectDirectory::copy(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& copyName, const std::string& copyBucketName, optional_yield y) { std::string key = build_index(object); auto copyObj = CacheObj{ .objName = copyName, .bucketName = copyBucketName }; std::string copyKey = build_index(©Obj); - if (exist_key(object, y)) { - try { - response resp; - - { - boost::system::error_code ec; - request req; - req.push("COPY", key, copyKey); - - redis_exec(conn, ec, req, resp, y); - - if (ec) { - return -ec.value(); - } - } - - { - boost::system::error_code ec; - request req; - req.push("HMSET", copyKey, "objName", copyName, "bucketName", copyBucketName); - response res; + try { + boost::system::error_code ec; + response< + ignore_t, + ignore_t, + ignore_t, + response, std::optional> + > resp; + request req; + req.push("MULTI"); + req.push("COPY", key, copyKey); + req.push("HSET", copyKey, "objName", copyName, "bucketName", copyBucketName); + req.push("EXEC"); - redis_exec(conn, ec, req, res, y); + redis_exec(conn, ec, req, resp, y); - if (ec) { - return -ec.value(); - } - } + if (ec) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } - return std::get<0>(resp).value() - 1; - } catch (std::exception &e) { - return -EINVAL; + if (std::get<0>(std::get<3>(resp).value()).value().value() == 1) { + return 0; + } else { + return -ENOENT; } - } else { - return -ENOENT; + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; } } -int ObjectDirectory::del(CacheObj* object, optional_yield y) +int ObjectDirectory::del(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y) { std::string key = build_index(object); - if (exist_key(object, y)) { - try { - boost::system::error_code ec; - request req; - req.push("DEL", key); - response resp; - - redis_exec(conn, ec, req, resp, y); + try { + boost::system::error_code ec; + response resp; + request req; + req.push("DEL", key); - if (ec) { - return -ec.value(); - } + redis_exec(conn, ec, req, resp, y); - return std::get<0>(resp).value() - 1; - } catch (std::exception &e) { - return -EINVAL; + if (ec) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); } - } else { - return 0; /* No delete was necessary */ + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; } + + return 0; } -int ObjectDirectory::update_field(CacheObj* object, std::string field, std::string value, optional_yield y) +int ObjectDirectory::update_field(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& field, std::string& value, optional_yield y) { std::string key = build_index(object); - if (exist_key(object, y)) { + if (exist_key(dpp, object, y)) { try { - /* Ensure field exists */ - { - boost::system::error_code ec; - request req; - req.push("HEXISTS", key, field); - response resp; - - redis_exec(conn, ec, req, resp, y); - - if (!std::get<0>(resp).value()) { - return -ENOENT; - } else if (ec) { - return -ec.value(); - } - } - if (field == "objHosts") { /* Append rather than overwrite */ + ldpp_dout(dpp, 20) << "ObjectDirectory::" << __func__ << "() Appending to hosts list." << dendl; + boost::system::error_code ec; + response resp; request req; req.push("HGET", key, field); - response resp; redis_exec(conn, ec, req, resp, y); - if (std::get<0>(resp).value().empty()) { - return -ENOENT; - } else if (ec) { + if (ec) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; return -ec.value(); } + /* If entry exists, it should have at least one host */ std::get<0>(resp).value() += "_"; std::get<0>(resp).value() += value; value = std::get<0>(resp).value(); + } else if (field == "dirty") { + int ret = -1; + if ((ret = check_bool(value)) != -EINVAL) { + value = std::to_string((ret != 0)); + } else { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: Invalid bool value" << dendl; + return -EINVAL; + } } - { - boost::system::error_code ec; - request req; - req.push_range("HSET", key, std::map{{field, value}}); - response resp; - - redis_exec(conn, ec, req, resp, y); + boost::system::error_code ec; + response resp; + request req; + req.push("HSET", key, field, value); - if (ec) { - return -ec.value(); - } + redis_exec(conn, ec, req, resp, y); - return std::get<0>(resp).value(); + if (ec) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); } + + return 0; } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; return -EINVAL; } } else { @@ -323,18 +326,24 @@ int BlockDirectory::exist_key(const DoutPrefixProvider* dpp, CacheBlock* block, redis_exec(conn, ec, req, resp, y); - if (ec) + if (ec) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; return false; - } catch (std::exception &e) {} + } + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } return std::get<0>(resp).value(); } int BlockDirectory::set(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y) { + /* For existing keys, call get method beforehand. + Sets completely overwrite existing values. */ std::string key = build_index(block); - /* Every set will be treated as new */ std::string endpoint; std::list redisValues; @@ -343,6 +352,15 @@ int BlockDirectory::set(const DoutPrefixProvider* dpp, CacheBlock* block, option redisValues.push_back(std::to_string(block->blockID)); redisValues.push_back("version"); redisValues.push_back(block->version); + redisValues.push_back("dirtyBlock"); + int ret = -1; + if ((ret = check_bool(std::to_string(block->dirty))) != -EINVAL) { + block->dirty = (ret != 0); + } else { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: Invalid bool value" << dendl; + return -EINVAL; + } + redisValues.push_back(std::to_string(block->dirty)); redisValues.push_back("size"); redisValues.push_back(std::to_string(block->size)); redisValues.push_back("globalWeight"); @@ -367,12 +385,12 @@ int BlockDirectory::set(const DoutPrefixProvider* dpp, CacheBlock* block, option redisValues.push_back(block->cacheObj.bucketName); redisValues.push_back("creationTime"); redisValues.push_back(block->cacheObj.creationTime); - redisValues.push_back("dirty"); - if (block->cacheObj.dirty == true || block->cacheObj.dirty == 1) { - block->cacheObj.dirty = 1; - } - if (block->cacheObj.dirty == false || block->cacheObj.dirty == 0) { - block->cacheObj.dirty = 0; + redisValues.push_back("dirtyObj"); + if ((ret = check_bool(std::to_string(block->cacheObj.dirty))) != -EINVAL) { + block->cacheObj.dirty = (ret != 0); + } else { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: Invalid bool value" << dendl; + return -EINVAL; } redisValues.push_back(std::to_string(block->cacheObj.dirty)); redisValues.push_back("objHosts"); @@ -392,16 +410,18 @@ int BlockDirectory::set(const DoutPrefixProvider* dpp, CacheBlock* block, option try { boost::system::error_code ec; + response resp; request req; - req.push_range("HMSET", key, redisValues); - response resp; + req.push_range("HSET", key, redisValues); redis_exec(conn, ec, req, resp, y); if (ec) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; return -ec.value(); } } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; return -EINVAL; } @@ -411,120 +431,96 @@ int BlockDirectory::set(const DoutPrefixProvider* dpp, CacheBlock* block, option int BlockDirectory::get(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y) { std::string key = build_index(block); + std::vector fields; ldpp_dout(dpp, 10) << __func__ << "(): index is: " << key << dendl; - if (exist_key(dpp, block, y)) { - std::vector fields; - - fields.push_back("blockID"); - fields.push_back("version"); - fields.push_back("size"); - fields.push_back("globalWeight"); - fields.push_back("blockHosts"); - - fields.push_back("objName"); - fields.push_back("bucketName"); - fields.push_back("creationTime"); - fields.push_back("dirty"); - fields.push_back("objHosts"); - - try { - boost::system::error_code ec; - request req; - req.push_range("HMGET", key, fields); - response< std::vector > resp; + fields.push_back("blockID"); + fields.push_back("version"); + fields.push_back("dirtyBlock"); + fields.push_back("size"); + fields.push_back("globalWeight"); + fields.push_back("blockHosts"); - redis_exec(conn, ec, req, resp, y); + fields.push_back("objName"); + fields.push_back("bucketName"); + fields.push_back("creationTime"); + fields.push_back("dirtyObj"); + fields.push_back("objHosts"); - if (std::get<0>(resp).value().empty()) { - return -ENOENT; - } else if (ec) { - return -ec.value(); - } + try { + boost::system::error_code ec; + response< std::optional> > resp; + request req; + req.push_range("HMGET", key, fields); - block->blockID = boost::lexical_cast(std::get<0>(resp).value()[0]); - block->version = std::get<0>(resp).value()[1]; - block->size = boost::lexical_cast(std::get<0>(resp).value()[2]); - block->globalWeight = boost::lexical_cast(std::get<0>(resp).value()[3]); - { - std::stringstream ss(boost::lexical_cast(std::get<0>(resp).value()[4])); - block->hostsList.clear(); - - while (!ss.eof()) { // Replace with boost::split? -Sam - std::string host; - std::getline(ss, host, '_'); - block->hostsList.push_back(host); - } - } + redis_exec(conn, ec, req, resp, y); - block->cacheObj.objName = std::get<0>(resp).value()[5]; - block->cacheObj.bucketName = std::get<0>(resp).value()[6]; - block->cacheObj.creationTime = std::get<0>(resp).value()[7]; - block->cacheObj.dirty = boost::lexical_cast(std::get<0>(resp).value()[8]); - block->dirty = boost::lexical_cast(std::get<0>(resp).value()[8]); - { - std::stringstream ss(boost::lexical_cast(std::get<0>(resp).value()[9])); - block->cacheObj.hostsList.clear(); - - while (!ss.eof()) { - std::string host; - std::getline(ss, host, '_'); - block->cacheObj.hostsList.push_back(host); - } - } - } catch (std::exception &e) { - return -EINVAL; + if (ec) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); } - } else { - return -ENOENT; + + if (std::get<0>(resp).value().value().empty()) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "(): No values returned." << dendl; + return -ENOENT; + } + + block->blockID = std::stoull(std::get<0>(resp).value().value()[0]); + block->version = std::get<0>(resp).value().value()[1]; + block->dirty = std::stoi(std::get<0>(resp).value().value()[2]); + block->size = std::stoull(std::get<0>(resp).value().value()[3]); + block->globalWeight = std::stoull(std::get<0>(resp).value().value()[4]); + boost::split(block->hostsList, std::get<0>(resp).value().value()[5], boost::is_any_of("_")); + block->cacheObj.objName = std::get<0>(resp).value().value()[6]; + block->cacheObj.bucketName = std::get<0>(resp).value().value()[7]; + block->cacheObj.creationTime = std::get<0>(resp).value().value()[8]; + block->cacheObj.dirty = std::stoi(std::get<0>(resp).value().value()[9]); + boost::split(block->cacheObj.hostsList, std::get<0>(resp).value().value()[10], boost::is_any_of("_")); + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; } return 0; } /* Note: This method is not compatible for use on Ubuntu systems. */ -int BlockDirectory::copy(const DoutPrefixProvider* dpp, CacheBlock* block, std::string copyName, std::string copyBucketName, optional_yield y) +int BlockDirectory::copy(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& copyName, const std::string& copyBucketName, optional_yield y) { std::string key = build_index(block); auto copyBlock = CacheBlock{ .cacheObj = { .objName = copyName, .bucketName = copyBucketName }, .blockID = 0 }; std::string copyKey = build_index(©Block); - if (exist_key(dpp, block, y)) { - try { - response resp; - - { - boost::system::error_code ec; - request req; - req.push("COPY", key, copyKey); - - redis_exec(conn, ec, req, resp, y); - - if (ec) { - return -ec.value(); - } - } - - { - boost::system::error_code ec; - request req; - req.push("HMSET", copyKey, "objName", copyName, "bucketName", copyBucketName); - response res; + try { + boost::system::error_code ec; + response< + ignore_t, + ignore_t, + ignore_t, + response, std::optional> + > resp; + request req; + req.push("MULTI"); + req.push("COPY", key, copyKey); + req.push("HSET", copyKey, "objName", copyName, "bucketName", copyBucketName); + req.push("EXEC"); - redis_exec(conn, ec, req, res, y); + redis_exec(conn, ec, req, resp, y); - if (ec) { - return -ec.value(); - } - } + if (ec) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } - return std::get<0>(resp).value() - 1; - } catch (std::exception &e) { - return -EINVAL; + if (std::get<0>(std::get<3>(resp).value()).value().value() == 1) { + return 0; + } else { + return -ENOENT; } - } else { - return -ENOENT; + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; } } @@ -532,92 +528,77 @@ int BlockDirectory::del(const DoutPrefixProvider* dpp, CacheBlock* block, option { std::string key = build_index(block); - if (exist_key(dpp, block, y)) { - try { - boost::system::error_code ec; - request req; - req.push("DEL", key); - response resp; - - redis_exec(conn, ec, req, resp, y); + try { + boost::system::error_code ec; + response resp; + request req; + req.push("DEL", key); - if (ec) { - return -ec.value(); - } + redis_exec(conn, ec, req, resp, y); - return std::get<0>(resp).value() - 1; - } catch (std::exception &e) { - return -EINVAL; + if (ec) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); } - } else { - return 0; /* No delete was necessary */ + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; } + + return 0; } -int BlockDirectory::update_field(const DoutPrefixProvider* dpp, CacheBlock* block, std::string field, std::string& value, optional_yield y) +int BlockDirectory::update_field(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& field, std::string& value, optional_yield y) { std::string key = build_index(block); if (exist_key(dpp, block, y)) { try { - /* Ensure field exists */ - { - boost::system::error_code ec; - request req; - req.push("HEXISTS", key, field); - response resp; - - redis_exec(conn, ec, req, resp, y); - - if (!std::get<0>(resp).value()) { - return -ENOENT; - } else if (ec) { - return -ec.value(); - } - } - if (field == "blockHosts") { /* Append rather than overwrite */ + ldpp_dout(dpp, 20) << "BlockDirectory::" << __func__ << "() Appending to hosts list." << dendl; + boost::system::error_code ec; + response< std::optional > resp; request req; req.push("HGET", key, field); - response resp; redis_exec(conn, ec, req, resp, y); - if (std::get<0>(resp).value().empty()) { - return -ENOENT; - } else if (ec) { + if (ec) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; return -ec.value(); } - std::get<0>(resp).value() += "_"; - std::get<0>(resp).value() += value; - value = std::get<0>(resp).value(); + /* If entry exists, it should have at least one host */ + std::get<0>(resp).value().value() += "_"; + std::get<0>(resp).value().value() += value; + value = std::get<0>(resp).value().value(); + } else if (field == "dirtyObj" || field == "dirtyBlock") { + int ret = -1; + if ((ret = check_bool(value)) != -EINVAL) { + value = std::to_string((ret != 0)); + } else { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: Invalid bool value" << dendl; + return -EINVAL; + } } - if (field == "dirty") { - if (value == "true" || value == "1") { - value = "1"; - } - if (value == "false" || value == "0") { - value = "0"; - } - } - { - boost::system::error_code ec; - request req; - req.push_range("HSET", key, std::map{{field, value}}); - response resp; - redis_exec(conn, ec, req, resp, y); + boost::system::error_code ec; + response resp; + request req; + req.push("HSET", key, field, value); - if (ec) { - return -ec.value(); - } + redis_exec(conn, ec, req, resp, y); - return std::get<0>(resp).value(); + if (ec) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); } + + return 0; } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; return -EINVAL; } } else { @@ -625,78 +606,68 @@ int BlockDirectory::update_field(const DoutPrefixProvider* dpp, CacheBlock* bloc } } -int BlockDirectory::remove_host(const DoutPrefixProvider* dpp, CacheBlock* block, std::string& delValue, optional_yield y) +int BlockDirectory::remove_host(const DoutPrefixProvider* dpp, CacheBlock* block, std::string& value, optional_yield y) { std::string key = build_index(block); - if (exist_key(dpp, block, y)) { - try { - /* Ensure field exists */ - { - boost::system::error_code ec; - request req; - req.push("HEXISTS", key, "blockHosts"); - response resp; + try { + { + boost::system::error_code ec; + response< std::optional > resp; + request req; + req.push("HGET", key, "blockHosts"); - redis_exec(conn, ec, req, resp, y); + redis_exec(conn, ec, req, resp, y); - if (!std::get<0>(resp).value()) { - return -ENOENT; - } else if (ec) { - return -ec.value(); - } + if (ec) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); } - { - boost::system::error_code ec; - request req; - req.push("HGET", key, "blockHosts"); - response resp; - - redis_exec(conn, ec, req, resp, y); - - if (std::get<0>(resp).value().empty()) { - return -ENOENT; - } else if (ec) { - return -ec.value(); - } - - if (std::get<0>(resp).value().find("_") == std::string::npos) /* Last host, delete entirely */ - return del(dpp, block, y); - - std::string result = std::get<0>(resp).value(); - auto it = result.find(delValue); - if (it != std::string::npos) - result.erase(result.begin() + it, result.begin() + it + delValue.size()); - else - return -ENOENT; + if (std::get<0>(resp).value().value().empty()) { + ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): No values returned." << dendl; + return -ENOENT; + } - if (result[0] == '_') - result.erase(0, 1); + std::string result = std::get<0>(resp).value().value(); + auto it = result.find(value); + if (it != std::string::npos) { + result.erase(result.begin() + it, result.begin() + it + value.size()); + } else { + return -ENOENT; + } - delValue = result; + if (result[0] == '_') { + result.erase(0, 1); + } else if (result.length() && result[result.length() - 1] == '_') { + result.erase(result.length() - 1, 1); } - { - boost::system::error_code ec; - request req; - req.push_range("HSET", key, std::map{{"blockHosts", delValue}}); - response resp; + if (result.length() == 0) /* Last host, delete entirely */ + return del(dpp, block, y); - redis_exec(conn, ec, req, resp, y); + value = result; + } - if (ec) { - return -ec.value(); - } + { + boost::system::error_code ec; + response resp; + request req; + req.push("HSET", key, "blockHosts", value); + + redis_exec(conn, ec, req, resp, y); - return std::get<0>(resp).value(); + if (ec) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); } - } catch (std::exception &e) { - return -EINVAL; } - } else { - return -ENOENT; + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; } + + return 0; } } } // namespace rgw::d4n diff --git a/src/rgw/driver/d4n/d4n_directory.h b/src/rgw/driver/d4n/d4n_directory.h index 9a7a0dfe3c62..ad1fc6e775ef 100644 --- a/src/rgw/driver/d4n/d4n_directory.h +++ b/src/rgw/driver/d4n/d4n_directory.h @@ -2,7 +2,6 @@ #include "rgw_common.h" -#include #include #include @@ -13,13 +12,14 @@ using boost::redis::config; using boost::redis::connection; using boost::redis::request; using boost::redis::response; +using boost::redis::ignore_t; struct CacheObj { std::string objName; /* S3 object name */ std::string bucketName; /* S3 bucket name */ std::string creationTime; /* Creation time of the S3 Object */ bool dirty{false}; - std::vector hostsList; /* List of hostnames of object locations for multiple backends */ + std::unordered_set hostsList; /* List of hostnames of object locations for multiple backends */ }; struct CacheBlock { @@ -29,13 +29,11 @@ struct CacheBlock { bool dirty{false}; uint64_t size; /* Block size in bytes */ int globalWeight = 0; /* LFUDA policy variable */ - std::vector hostsList; /* List of hostnames of block locations */ + std::unordered_set hostsList; /* List of hostnames of block locations */ }; class Directory { public: - CephContext* cct; - Directory() {} }; @@ -43,16 +41,13 @@ class ObjectDirectory: public Directory { public: ObjectDirectory(std::shared_ptr& conn) : conn(conn) {} - void init(CephContext* cct) { - this->cct = cct; - } - int exist_key(CacheObj* object, optional_yield y); + int exist_key(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y); - int set(CacheObj* object, optional_yield y); - int get(CacheObj* object, optional_yield y); - int copy(CacheObj* object, std::string copyName, std::string copyBucketName, optional_yield y); - int del(CacheObj* object, optional_yield y); - int update_field(CacheObj* object, std::string field, std::string value, optional_yield y); + int set(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y); /* If nx is true, set only if key doesn't exist */ + int get(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y); + int copy(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& copyName, const std::string& copyBucketName, optional_yield y); + int del(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y); + int update_field(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& field, std::string& value, optional_yield y); private: std::shared_ptr conn; @@ -64,16 +59,13 @@ class BlockDirectory: public Directory { public: BlockDirectory(std::shared_ptr& conn) : conn(conn) {} - void init(CephContext* cct) { - this->cct = cct; - } int exist_key(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y); int set(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y); int get(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y); - int copy(const DoutPrefixProvider* dpp, CacheBlock* block, std::string copyName, std::string copyBucketName, optional_yield y); + int copy(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& copyName, const std::string& copyBucketName, optional_yield y); int del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y); - int update_field(const DoutPrefixProvider* dpp, CacheBlock* block, std::string field, std::string& value, optional_yield y); + int update_field(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& field, std::string& value, optional_yield y); int remove_host(const DoutPrefixProvider* dpp, CacheBlock* block, std::string& value, optional_yield y); private: diff --git a/src/rgw/driver/d4n/d4n_policy.cc b/src/rgw/driver/d4n/d4n_policy.cc index f7b00624a6e1..1354c3418e90 100644 --- a/src/rgw/driver/d4n/d4n_policy.cc +++ b/src/rgw/driver/d4n/d4n_policy.cc @@ -35,7 +35,7 @@ auto async_exec(std::shared_ptr conn, } template -void redis_exec(std::shared_ptr conn, +static inline void redis_exec(std::shared_ptr conn, boost::system::error_code& ec, const boost::redis::request& req, boost::redis::response& resp, optional_yield y) @@ -49,64 +49,47 @@ void redis_exec(std::shared_ptr conn, } int LFUDAPolicy::init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver *_driver) { - this->cct = cct; - dir->init(cct); + response resp; + driver = _driver; tc = std::thread(&CachePolicy::cleaning, this, dpp); tc.detach(); - int result = 0; - response resp; try { boost::system::error_code ec; + response< + ignore_t, + ignore_t, + ignore_t, + response, std::optional> + > resp; request req; - req.push("HEXISTS", "lfuda", "age"); - req.push("HSET", "lfuda", "minLocalWeights_sum", std::to_string(weightSum)); /* New cache node will always have the minimum average weight */ - req.push("HSET", "lfuda", "minLocalWeights_size", std::to_string(entries_map.size())); - req.push("HSET", "lfuda", "minLocalWeights_address", dir->cct->_conf->rgw_d4n_l1_datacache_address); + req.push("MULTI"); + req.push("HSET", "lfuda", "minLocalWeights_sum", std::to_string(weightSum), /* New cache node will always have the minimum average weight */ + "minLocalWeights_size", std::to_string(entries_map.size()), + "minLocalWeights_address", dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address); + req.push("HSETNX", "lfuda", "age", age); /* Only set maximum age if it doesn't exist */ + req.push("EXEC"); redis_exec(conn, ec, req, resp, y); if (ec) { - ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: " << ec.what() << dendl; + ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "() ERROR: " << ec.what() << dendl; return -ec.value(); } - - result = std::min(std::get<1>(resp).value(), std::min(std::get<2>(resp).value(), std::get<3>(resp).value())); } catch (std::exception &e) { - ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: " << e.what() << dendl; + ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "() ERROR: " << e.what() << dendl; return -EINVAL; } - if (!std::get<0>(resp).value()) { /* Only set maximum age if it doesn't exist */ - try { - boost::system::error_code ec; - response value; - request req; - req.push("HSET", "lfuda", "age", age); - - redis_exec(conn, ec, req, value, y); - - if (ec) { - ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: " << ec.what() << dendl; - return -ec.value(); - } - - result = std::min(result, std::get<0>(value).value()); - } catch (std::exception &e) { - ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: " << e.what() << dendl; - return -EINVAL; - } - } - asio::co_spawn(io_context.get_executor(), redis_sync(dpp, y), asio::detached); - return result; + return 0; } int LFUDAPolicy::age_sync(const DoutPrefixProvider* dpp, optional_yield y) { - response resp; + response< std::optional > resp; try { boost::system::error_code ec; @@ -116,100 +99,99 @@ int LFUDAPolicy::age_sync(const DoutPrefixProvider* dpp, optional_yield y) { redis_exec(conn, ec, req, resp, y); if (ec) { + ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "() ERROR: " << ec.what() << dendl; return -ec.value(); } } catch (std::exception &e) { return -EINVAL; } - if (age > std::stoi(std::get<0>(resp).value()) || std::get<0>(resp).value().empty()) { /* Set new maximum age */ + if (std::get<0>(resp).value().value().empty() || age > std::stoi(std::get<0>(resp).value().value())) { /* Set new maximum age */ try { boost::system::error_code ec; + response ret; request req; - response value; req.push("HSET", "lfuda", "age", age); - redis_exec(conn, ec, req, resp, y); + + redis_exec(conn, ec, req, ret, y); if (ec) { + ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "() ERROR: " << ec.what() << dendl; return -ec.value(); } - - return std::get<0>(value).value(); } catch (std::exception &e) { return -EINVAL; } } else { - age = std::stoi(std::get<0>(resp).value()); - return 0; + age = std::stoi(std::get<0>(resp).value().value()); } + + return 0; } int LFUDAPolicy::local_weight_sync(const DoutPrefixProvider* dpp, optional_yield y) { - int result; - if (fabs(weightSum - postedSum) > (postedSum * 0.1)) { - response resp; + response> resp; try { boost::system::error_code ec; request req; - req.push("HGET", "lfuda", "minLocalWeights_sum"); - req.push("HGET", "lfuda", "minLocalWeights_size"); + req.push("HMGET", "lfuda", "minLocalWeights_sum", "minLocalWeights_size"); redis_exec(conn, ec, req, resp, y); if (ec) { + ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "() ERROR: " << ec.what() << dendl; return -ec.value(); } } catch (std::exception &e) { return -EINVAL; } - float minAvgWeight = std::stof(std::get<0>(resp).value()) / std::stof(std::get<1>(resp).value()); + float minAvgWeight = std::stof(std::get<0>(resp).value()[0]) / std::stof(std::get<0>(resp).value()[1]); if ((static_cast(weightSum) / static_cast(entries_map.size())) < minAvgWeight) { /* Set new minimum weight */ try { boost::system::error_code ec; + response resp; request req; - response value; - req.push("HSET", "lfuda", "minLocalWeights_sum", std::to_string(weightSum)); - req.push("HSET", "lfuda", "minLocalWeights_size", std::to_string(entries_map.size())); - req.push("HSET", "lfuda", "minLocalWeights_address", dir->cct->_conf->rgw_d4n_l1_datacache_address); + req.push("HSET", "lfuda", "minLocalWeights_sum", std::to_string(weightSum), + "minLocalWeights_size", std::to_string(entries_map.size()), + "minLocalWeights_address", dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address); + redis_exec(conn, ec, req, resp, y); if (ec) { + ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "() ERROR: " << ec.what() << dendl; return -ec.value(); } - - result = std::min(std::get<0>(value).value(), std::get<1>(value).value()); - result = std::min(result, std::get<2>(value).value()); } catch (std::exception &e) { return -EINVAL; } } else { - weightSum = std::stoi(std::get<0>(resp).value()); - postedSum = std::stoi(std::get<0>(resp).value()); + weightSum = std::stoi(std::get<0>(resp).value()[0]); + postedSum = std::stoi(std::get<0>(resp).value()[0]); } } try { /* Post update for local cache */ boost::system::error_code ec; + response resp; request req; - response resp; - req.push("HSET", dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, "avgLocalWeight_sum", std::to_string(weightSum)); - req.push("HSET", dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, "avgLocalWeight_size", std::to_string(entries_map.size())); + req.push("HSET", dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, "avgLocalWeight_sum", std::to_string(weightSum), + "avgLocalWeight_size", std::to_string(entries_map.size())); + redis_exec(conn, ec, req, resp, y); if (ec) { + ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "() ERROR: " << ec.what() << dendl; return -ec.value(); } - result = std::min(std::get<0>(resp).value(), std::get<1>(resp).value()); + return 0; } catch (std::exception &e) { return -EINVAL; } - - return result; } asio::awaitable LFUDAPolicy::redis_sync(const DoutPrefixProvider* dpp, optional_yield y) { @@ -221,23 +203,22 @@ asio::awaitable LFUDAPolicy::redis_sync(const DoutPrefixProvider* dpp, opt for (;;) try { /* Update age */ if (int ret = age_sync(dpp, y) < 0) { - ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: ret=" << ret << dendl; + ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "() ERROR: " << ret << dendl; } /* Update minimum local weight sum */ if (int ret = local_weight_sync(dpp, y) < 0) { - ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: ret=" << ret << dendl; + ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "() ERROR: " << ret << dendl; } int interval = dpp->get_cct()->_conf->rgw_lfuda_sync_frequency; rthread_timer->expires_after(std::chrono::seconds(interval)); co_await rthread_timer->async_wait(asio::use_awaitable); } catch (sys::system_error& e) { - ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: " << e.what() << dendl; - if (e.code() == asio::error::operation_aborted) { break; } else { + ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "() ERROR: " << e.what() << dendl; continue; } } @@ -258,7 +239,7 @@ CacheBlock* LFUDAPolicy::get_victim_block(const DoutPrefixProvider* dpp, optiona victim->blockID = entries_heap.top()->offset; victim->size = entries_heap.top()->len; - if (dir->get(dpp, victim, y) < 0) { + if (blockDir->get(dpp, victim, y) < 0) { return nullptr; } @@ -281,7 +262,7 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional CacheBlock* victim = get_victim_block(dpp, y); if (victim == nullptr) { - ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Could not retrieve victim block." << dendl; + ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): Could not retrieve victim block." << dendl; delete victim; return -ENOENT; } @@ -295,12 +276,12 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional } // check dirty flag of entry to be evicted, if the flag is dirty, all entries on the local node are dirty if (it->second->dirty) { - ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Top entry in min heap is dirty, no entry is available for eviction!" << dendl; + ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): Top entry in min heap is dirty, no entry is available for eviction!" << dendl; return -ENOENT; } int avgWeight = weightSum / entries_map.size(); - if (victim->hostsList.size() == 1 && victim->hostsList[0] == dir->cct->_conf->rgw_d4n_l1_datacache_address) { /* Last copy */ + if (victim->hostsList.size() == 1 && *(victim->hostsList.begin()) == dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address) { /* Last copy */ if (victim->globalWeight) { it->second->localWeight += victim->globalWeight; (*it->second->handle)->localWeight = it->second->localWeight; @@ -312,11 +293,6 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional } victim->globalWeight = 0; - auto globalWeight = std::to_string(victim->globalWeight); - if (int ret = dir->update_field(dpp, victim, "globalWeight", globalWeight, y) < 0) { - delete victim; - return ret; - } } if (it->second->localWeight > avgWeight) { @@ -326,13 +302,12 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional } victim->globalWeight += it->second->localWeight; - auto globalWeight = std::to_string(victim->globalWeight); - if (int ret = dir->update_field(dpp, victim, "globalWeight", globalWeight, y) < 0) { + if (int ret = blockDir->update_field(dpp, victim, "globalWeight", std::to_string(victim->globalWeight), y) < 0) { delete victim; return ret; } - if (int ret = dir->remove_host(dpp, victim, dir->cct->_conf->rgw_d4n_l1_datacache_address, y) < 0) { + if (int ret = blockDir->remove_host(dpp, victim, dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, y) < 0) { delete victim; return ret; } @@ -355,7 +330,7 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional return 0; } -void LFUDAPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, optional_yield y) +void LFUDAPolicy::update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y) { using handle_type = boost::heap::fibonacci_heap>>::handle_type; const std::lock_guard l(lfuda_lock); @@ -385,8 +360,9 @@ void LFUDAPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64 } if (updateLocalWeight) { - if (cacheDriver->set_attr(dpp, oid_in_cache, "user.rgw.localWeight", std::to_string(localWeight), y) < 0) - ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): CacheDriver set_attr method failed." << dendl; + int ret = -1; + if ((ret = cacheDriver->set_attr(dpp, oid_in_cache, "user.rgw.localWeight", std::to_string(localWeight), y)) < 0) + ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): CacheDriver set_attr method failed, ret=" << ret << dendl; } weightSum += ((localWeight < 0) ? 0 : localWeight); @@ -415,6 +391,8 @@ bool LFUDAPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key, o entries_heap.erase(p->second->handle); entries_map.erase(p); + delete p->second; + p->second = nullptr; return true; } @@ -430,13 +408,14 @@ bool LFUDAPolicy::eraseObj(const DoutPrefixProvider* dpp, const std::string& key object_heap.erase(p->second->handle); o_entries_map.erase(p); delete p->second; + p->second = nullptr; return true; } void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp) { - const int interval = cct->_conf->rgw_d4n_cache_cleaning_interval; + const int interval = dpp->get_cct()->_conf->rgw_d4n_cache_cleaning_interval; while(!quit) { ldpp_dout(dpp, 20) << __func__ << " : " << " Cache cleaning!" << dendl; std::string name = ""; @@ -507,8 +486,8 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp) int op_ret = processor->prepare(null_yield); if (op_ret < 0) { - ldpp_dout(dpp, 20) << __func__ << "processor->prepare() returned ret=" << op_ret << dendl; - break; + ldpp_dout(dpp, 20) << __func__ << "processor->prepare() returned ret=" << op_ret << dendl; + break; } std::string prefix = b_name + "_" + e->version + "_" + c_obj->get_name(); @@ -531,9 +510,9 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp) do { ceph::bufferlist data; if (fst >= lst){ - break; + break; } - off_t cur_size = std::min(fst + cct->_conf->rgw_max_chunk_size, lst); + off_t cur_size = std::min(fst + dpp->get_cct()->_conf->rgw_max_chunk_size, lst); off_t cur_len = cur_size - fst; std::string oid_in_cache = "D_" + prefix + "_" + std::to_string(fst) + "_" + std::to_string(cur_len); ldpp_dout(dpp, 10) << __func__ << "(): oid_in_cache=" << oid_in_cache << dendl; @@ -549,9 +528,9 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp) op_ret = filter->process(std::move(data), ofs); if (op_ret < 0) { - ldpp_dout(dpp, 20) << __func__ << "processor->process() returned ret=" - << op_ret << dendl; - return; + ldpp_dout(dpp, 20) << __func__ << "processor->process() returned ret=" + << op_ret << dendl; + return; } ofs += len; @@ -572,9 +551,9 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp) fst = 0; do { if (fst >= lst) { - break; + break; } - off_t cur_size = std::min(fst + cct->_conf->rgw_max_chunk_size, lst); + off_t cur_size = std::min(fst + dpp->get_cct()->_conf->rgw_max_chunk_size, lst); off_t cur_len = cur_size - fst; std::string oid_in_cache = "D_" + prefix + "_" + std::to_string(fst) + "_" + std::to_string(cur_len); @@ -591,10 +570,10 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp) block.size = cur_len; block.blockID = fst; std::string dirty = "false"; - op_ret = dir->update_field(dpp, &block, "dirty", dirty, null_yield); + op_ret = blockDir->update_field(dpp, &block, "dirty", dirty, null_yield); if (op_ret < 0) { - ldpp_dout(dpp, 5) << __func__ << "updating dirty flag in Block directory failed!" << dendl; - return; + ldpp_dout(dpp, 0) << __func__ << "updating dirty flag in block directory failed, ret=" << op_ret << dendl; + return; } fst += cur_len; } while(fst < lst); @@ -611,10 +590,10 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp) block.size = 0; block.blockID = 0; if (c_obj->have_instance()) { - dir->get(dpp, &block, null_yield); + blockDir->get(dpp, &block, null_yield); if (block.version == c_obj->get_instance()) { //versioned case - update head block entry that has latest version std::string dirty = "false"; - op_ret = dir->update_field(dpp, &block, "dirty", dirty, null_yield); + op_ret = blockDir->update_field(dpp, &block, "dirty", dirty, null_yield); if (op_ret < 0) { ldpp_dout(dpp, 20) << __func__ << "updating dirty flag in block directory for head failed!" << dendl; //return; @@ -622,7 +601,7 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp) } } else { //non-versioned case std::string dirty = "false"; - op_ret = dir->update_field(dpp, &block, "dirty", dirty, null_yield); + op_ret = blockDir->update_field(dpp, &block, "dirty", dirty, null_yield); if (op_ret < 0) { ldpp_dout(dpp, 20) << __func__ << "updating dirty flag in block directory for head failed!" << dendl; //return; @@ -635,13 +614,35 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp) instance_block.cacheObj.objName = c_obj->get_oid(); instance_block.size = 0; instance_block.blockID = 0; - op_ret = dir->update_field(dpp, &instance_block, "dirty", "false", null_yield); + op_ret = blockDir->update_field(dpp, &instance_block, "dirty", "false", null_yield); if (op_ret < 0) { ldpp_dout(dpp, 20) << __func__ << "updating dirty flag in block directory for instance block failed!" << dendl; //return; } } + + op_ret = blockDir->update_field(dpp, &block, "dirtyBlock", "false", null_yield); + if (op_ret < 0) { + ldpp_dout(dpp, 0) << __func__ << "updating dirty flag in block directory for head failed, ret=" << op_ret << dendl; + return; + } + //remove entry from map and queue, eraseObj locks correctly + rgw::d4n::CacheObj obj; + obj.bucketName = c_obj->get_bucket()->get_name(); + obj.objName = c_obj->get_key().get_oid(); + op_ret = objDir->update_field(dpp, &obj, "dirty", "false", null_yield); + if (op_ret < 0) { + ldpp_dout(dpp, 0) << __func__ << "updating dirty flag in object directory failed, ret=" << op_ret << dendl; + return; + } + + op_ret = blockDir->update_field(dpp, &block, "dirtyObj", "false", null_yield); + if (op_ret < 0) { + ldpp_dout(dpp, 0) << __func__ << "updating dirty flag in block directory failed, ret=" << op_ret << dendl; + return; + } + eraseObj(dpp, e->key, null_yield); } else { //end-if std::difftime(time(NULL), e->creationTime) > interval std::this_thread::sleep_for(std::chrono::milliseconds(interval)); //TODO:: should this time be optimised? @@ -649,7 +650,6 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp) } //end-while true } - int LRUPolicy::exist_key(std::string key) { const std::lock_guard l(lru_lock); @@ -670,7 +670,7 @@ int LRUPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_y entries_lru_list.pop_front_and_dispose(Entry_delete_disposer()); auto ret = cacheDriver->delete_data(dpp, p.key, y); if (ret < 0) { - ldpp_dout(dpp, 10) << __func__ << "(): Failed to delete data from the cache backend: " << ret << dendl; + ldpp_dout(dpp, 0) << __func__ << "(): Failed to delete data from the cache backend, ret=" << ret << dendl; return ret; } @@ -680,7 +680,7 @@ int LRUPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_y return 0; } -void LRUPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, optional_yield y) +void LRUPolicy::update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y) { const std::lock_guard l(lru_lock); _erase(dpp, key, y); diff --git a/src/rgw/driver/d4n/d4n_policy.h b/src/rgw/driver/d4n/d4n_policy.h index 6301028b1cad..9ff07b05e2c1 100644 --- a/src/rgw/driver/d4n/d4n_policy.h +++ b/src/rgw/driver/d4n/d4n_policy.h @@ -27,10 +27,9 @@ class CachePolicy { uint64_t len; std::string version; bool dirty; - Entry(std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty) : key(key), offset(offset), - len(len), version(version), dirty(dirty) {} - }; - + Entry(const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty) : key(key), offset(offset), + len(len), version(version), dirty(dirty) {} + }; //The disposer object function struct Entry_delete_disposer { @@ -50,7 +49,11 @@ class CachePolicy { std::string bucket_name; rgw_obj_key obj_key; ObjEntry() = default; - ObjEntry(std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, rgw_user user, std::string& etag, const std::string& bucket_name, const rgw_obj_key& obj_key) : key(key), version(version), dirty(dirty), size(size), creationTime(creationTime), user(user), etag(etag), bucket_name(bucket_name), obj_key(obj_key) {} + ObjEntry(std::string& key, std::string version, bool dirty, uint64_t size, + time_t creationTime, rgw_user user, std::string& etag, + const std::string& bucket_name, const rgw_obj_key& obj_key) : key(key), version(version), dirty(dirty), size(size), + creationTime(creationTime), user(user), etag(etag), + bucket_name(bucket_name), obj_key(obj_key) {} }; public: @@ -60,8 +63,10 @@ class CachePolicy { virtual int init(CephContext* cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver* _driver) = 0; virtual int exist_key(std::string key) = 0; virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) = 0; - virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, optional_yield y) = 0; - virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, const std::string& bucket_name, const rgw_obj_key& obj_key, optional_yield y) = 0; + virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y) = 0; + virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, + time_t creationTime, const rgw_user user, std::string& etag, const std::string& bucket_name, + const rgw_obj_key& obj_key, optional_yield y) = 0; virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0; virtual bool eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0; virtual void cleaning(const DoutPrefixProvider* dpp) = 0; @@ -98,7 +103,8 @@ class LFUDAPolicy : public CachePolicy { using handle_type = boost::heap::fibonacci_heap>>::handle_type; handle_type handle; - LFUDAEntry(std::string& key, uint64_t offset, uint64_t len, std::string& version, bool dirty, int localWeight) : Entry(key, offset, len, version, dirty), localWeight(localWeight) {} + LFUDAEntry(const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, int localWeight) : Entry(key, offset, len, version, dirty), + localWeight(localWeight) {} void set_handle(handle_type handle_) { handle = handle_; } }; @@ -107,7 +113,11 @@ class LFUDAPolicy : public CachePolicy { using handle_type = boost::heap::fibonacci_heap>>::handle_type; handle_type handle; - LFUDAObjEntry(std::string& key, std::string& version, bool dirty, uint64_t size, time_t creationTime, rgw_user user, std::string& etag, const std::string& bucket_name, const rgw_obj_key& obj_key) : ObjEntry(key, version, dirty, size, creationTime, user, etag, bucket_name, obj_key) {} + LFUDAObjEntry(std::string& key, std::string& version, bool dirty, uint64_t size, + time_t creationTime, rgw_user user, std::string& etag, + const std::string& bucket_name, const rgw_obj_key& obj_key) : ObjEntry(key, version, dirty, size, + creationTime, user, etag, bucket_name, + obj_key) {} void set_handle(handle_type handle_) { handle = handle_; } }; @@ -126,12 +136,12 @@ class LFUDAPolicy : public CachePolicy { int age = 1, weightSum = 0, postedSum = 0; optional_yield y = null_yield; std::shared_ptr conn; - BlockDirectory* dir; + BlockDirectory* blockDir; + ObjectDirectory* objDir; rgw::cache::CacheDriver* cacheDriver; std::optional rthread_timer; rgw::sal::Driver* driver; std::thread tc; - CephContext* cct; CacheBlock* get_victim_block(const DoutPrefixProvider* dpp, optional_yield y); int age_sync(const DoutPrefixProvider* dpp, optional_yield y); @@ -144,7 +154,7 @@ class LFUDAPolicy : public CachePolicy { rthread_timer->cancel(); } } - LFUDAEntry* find_entry(std::string key) { + LFUDAEntry* find_entry(const std::string& key) { auto it = entries_map.find(key); if (it == entries_map.end()) return nullptr; @@ -156,11 +166,13 @@ class LFUDAPolicy : public CachePolicy { conn(conn), cacheDriver(cacheDriver) { - dir = new BlockDirectory{conn}; + blockDir = new BlockDirectory{conn}; + objDir = new ObjectDirectory{conn}; } ~LFUDAPolicy() { rthread_stop(); - delete dir; + delete blockDir; + delete objDir; std::lock_guard l(lfuda_cleaning_lock); quit = true; cond.notify_all(); @@ -169,10 +181,12 @@ class LFUDAPolicy : public CachePolicy { virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver *_driver); virtual int exist_key(std::string key) override; virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override; - virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, optional_yield y) override; + virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y) override; virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override; void save_y(optional_yield y) { this->y = y; } - virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, const std::string& bucket_name, const rgw_obj_key& obj_key, optional_yield y) override; + virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, + time_t creationTime, const rgw_user user, std::string& etag, const std::string& bucket_name, + const rgw_obj_key& obj_key, optional_yield y) override; virtual bool eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y); virtual void cleaning(const DoutPrefixProvider* dpp) override; LFUDAObjEntry* find_obj_entry(const std::string& key) { @@ -202,8 +216,10 @@ class LRUPolicy : public CachePolicy { virtual int init(CephContext* cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver* _driver) { return 0; } virtual int exist_key(std::string key) override; virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override; - virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, optional_yield y) override; - virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, const std::string& bucket_name, const rgw_obj_key& obj_key, optional_yield y) override; + virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y) override; + virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, + time_t creationTime, const rgw_user user, std::string& etag, const std::string& bucket_name, + const rgw_obj_key& obj_key, optional_yield y) override; virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override; virtual bool eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override; virtual void cleaning(const DoutPrefixProvider* dpp) override {} @@ -215,7 +231,7 @@ class PolicyDriver { CachePolicy* cachePolicy; public: - PolicyDriver(std::shared_ptr& conn, rgw::cache::CacheDriver* cacheDriver, std::string _policyName) : policyName(_policyName) + PolicyDriver(std::shared_ptr& conn, rgw::cache::CacheDriver* cacheDriver, const std::string& _policyName) : policyName(_policyName) { if (policyName == "lfuda") { cachePolicy = new LFUDAPolicy(conn, cacheDriver); diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.cc b/src/rgw/driver/d4n/rgw_sal_d4n.cc index 140b9ef4bc43..ae30e0891cc0 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.cc +++ b/src/rgw/driver/d4n/rgw_sal_d4n.cc @@ -65,7 +65,7 @@ int D4NFilterDriver::initialize(CephContext *cct, const DoutPrefixProvider *dpp) cfg.clientname = "D4N.Filter"; if (!cfg.addr.host.length() || !cfg.addr.port.length()) { - ldpp_dout(dpp, 10) << "D4NFilterDriver::" << __func__ << "(): Endpoint was not configured correctly." << dendl; + ldpp_dout(dpp, 0) << "D4NFilterDriver::" << __func__ << "(): Endpoint was not configured correctly." << dendl; return -EDESTADDRREQ; } @@ -74,8 +74,6 @@ int D4NFilterDriver::initialize(CephContext *cct, const DoutPrefixProvider *dpp) FilterDriver::initialize(cct, dpp); cacheDriver->initialize(dpp); - objDir->init(cct); - blockDir->init(cct); policyDriver->get_cache_policy()->init(cct, dpp, io_context, next); return 0; @@ -136,14 +134,14 @@ int D4NFilterObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattr } if (auto ret = driver->get_cache_driver()->delete_attrs(dpp, head_oid_in_cache, *delattrs, y); ret < 0) { - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver delete_attrs method failed with ret: " << ret << dendl; + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): CacheDriver delete_attrs method failed with ret: " << ret << dendl; return ret; } } //if delattrs != nullptr } else { auto ret = next->set_obj_attrs(dpp, setattrs, delattrs, y, flags); if (ret < 0) { - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): set_obj_attrs method of backend store failed with ret: " << ret << dendl; + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): set_obj_attrs method of backend store failed with ret: " << ret << dendl; return ret; } } @@ -198,7 +196,7 @@ bool D4NFilterObject::get_obj_attrs_from_cache(const DoutPrefixProvider* dpp, op /* Set attributes locally */ auto ret = this->set_attrs(attrs); if (ret < 0) { - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): D4NFilterObject set_attrs method failed." << dendl; + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): D4NFilterObject set_attrs method failed." << dendl; } } // if found_in_cache = true @@ -240,7 +238,7 @@ int D4NFilterObject::calculate_version(const DoutPrefixProvider* dpp, optional_y version = bl.c_str(); ldpp_dout(dpp, 20) << __func__ << " id tag version is: " << version << dendl; } else { - ldpp_dout(dpp, 20) << __func__ << " Failed to find id tag" << dendl; + ldpp_dout(dpp, 0) << __func__ << " Failed to find id tag" << dendl; return -ENOENT; } } @@ -265,7 +263,7 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optio .objName = this->get_name(), .bucketName = this->get_bucket()->get_name(), .dirty = dirty, - .hostsList = { driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address }, + .hostsList = { dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address }, }; rgw::d4n::CacheBlock block = rgw::d4n::CacheBlock{ @@ -274,7 +272,7 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optio .version = this->get_object_version(), .dirty = dirty, .size = 0, - .hostsList = { driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address }, + .hostsList = { dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address }, }; ret = blockDir->set(dpp, &block, y); if (ret < 0) { @@ -290,7 +288,7 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optio .objName = this->get_oid(), .bucketName = this->get_bucket()->get_name(), .dirty = dirty, - .hostsList = { driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address }, + .hostsList = { dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address }, }; rgw::d4n::CacheBlock version_block = rgw::d4n::CacheBlock{ @@ -299,8 +297,9 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optio .version = this->get_object_version(), .dirty = dirty, .size = 0, - .hostsList = { driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address }, - }; + .hostsList = { dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address }, + }; + auto ret = blockDir->set(dpp, &version_block, y); if (ret < 0) { ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl; @@ -325,8 +324,9 @@ bool D4NFilterObject::check_head_exists_in_cache_get_oid(const DoutPrefixProvide }; bool found_in_cache = true; + int ret = -1; //if the block corresponding to head object does not exist in directory, implies it is not cached - if (blockDir->exist_key(dpp, &block, y) && (blockDir->get(dpp, &block, y) == 0)) { + if ((ret = blockDir->get(dpp, &block, y) == 0)) { std::string version; version = block.version; this->set_object_version(version); @@ -347,9 +347,13 @@ bool D4NFilterObject::check_head_exists_in_cache_get_oid(const DoutPrefixProvide found_in_cache = false; ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver get_attrs method failed." << dendl; } - } else { //if blockDir->exist_key + } else if (ret == -ENOENT) { //if blockDir->get + found_in_cache = false; + } else { found_in_cache = false; + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): BlockDirectory get method failed, ret=" << ret << dendl; } + return found_in_cache; } @@ -370,7 +374,7 @@ int D4NFilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* d if (!target_obj) { ret = -ENOENT; } - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Failed to fetching attrs from backend store with ret: " << ret << dendl; + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to fetching attrs from backend store with ret: " << ret << dendl; return ret; } @@ -385,7 +389,7 @@ int D4NFilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* d ret = calculate_version(dpp, y, version); if (ret < 0 || version.empty()) { - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): version could not be calculated." << dendl; + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): version could not be calculated." << dendl; } head_oid_in_cache = this->get_bucket()->get_name() + "_" + version + "_" + this->get_name(); @@ -397,7 +401,7 @@ int D4NFilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* d bufferlist bl; ret = this->driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl, 0, attrs, y); } else { - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Failed to evict data with err: " << ret << dendl; + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to evict data, ret=" << ret << dendl; } } if (ret == 0) { @@ -405,10 +409,10 @@ int D4NFilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* d this->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, 0, version, false, y); ret = set_head_obj_dir_entry(dpp, y, is_latest_version); if (ret < 0) { - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl; + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object, ret=" << ret << dendl; } } else { - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): failed to cache head object in cache backend with error: " << ret << dendl; + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): failed to cache head object in cache backend, ret=" << ret << dendl; } } @@ -530,7 +534,7 @@ int D4NFilterObject::D4NFilterReadOp::prepare(optional_yield y, const DoutPrefix next->params = params; auto ret = next->prepare(y, dpp); if (ret < 0) { - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): next->prepare method failed with error: " << ret << dendl; + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): next->prepare method failed, ret=" << ret << dendl; return ret; } if (params.part_num) { @@ -559,7 +563,7 @@ int D4NFilterObject::D4NFilterReadOp::prepare(optional_yield y, const DoutPrefix source->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, bl.length(), version, false, y); ret = source->set_head_obj_dir_entry(dpp, y, is_latest_version); if (ret < 0) { - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl; + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object, ret=" << ret << dendl; } //write object to directory. rgw::d4n::CacheObj object = rgw::d4n::CacheObj{ @@ -567,18 +571,18 @@ int D4NFilterObject::D4NFilterReadOp::prepare(optional_yield y, const DoutPrefix .bucketName = source->get_bucket()->get_name(), .creationTime = std::to_string(ceph::real_clock::to_double(source->get_mtime())), .dirty = false, - .hostsList = { source->driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address } + .hostsList = { dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address } }; - ret = source->driver->get_obj_dir()->set(&object, y); + ret = source->driver->get_obj_dir()->set(dpp, &object, y); if (ret < 0) { ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): ObjectDirectory set method failed with err: " << ret << dendl; return ret; } } else { - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): put for head object failed with error: " << ret << dendl; + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): put for head object failed, ret=" << ret << dendl; } } else { - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): failed to cache head object, eviction returned error: " << ret << dendl; + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): failed to cache head object during eviction, ret=" << ret << dendl; } } @@ -732,18 +736,22 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int " read_ofs: " << read_ofs << " part len: " << part_len << dendl; int ret = -1; - if (source->driver->get_block_dir()->exist_key(dpp, &block, y) > 0 && (ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0) { - auto it = find(block.hostsList.begin(), block.hostsList.end(), source->driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address); + if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0) { + auto it = find(block.hostsList.begin(), block.hostsList.end(), dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address); + if (it != block.hostsList.end()) { /* Local copy */ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in directory. " << oid_in_cache << dendl; std::string key = oid_in_cache; ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: block is dirty = " << block.dirty << dendl; + if (block.dirty == true) { - key = "D_" + oid_in_cache; //we keep track of dirty data in the cache for the metadata failure case + key = "D_" + oid_in_cache; // we keep track of dirty data in the cache for the metadata failure case ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: key=" << key << " data is Dirty." << dendl; } + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: block dirty =" << block.dirty << dendl; ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: key=" << key << dendl; + if (block.version == version) { if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) { // Read From Cache @@ -756,23 +764,79 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int if (r < 0) { drain(dpp, y); - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl; + ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, ret=" << r << dendl; return r; } // if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) } else { + int r = -1; + if ((r = source->driver->get_block_dir()->remove_host(dpp, &block, dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, y)) < 0) + ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to remove incorrect host from block with oid=" << oid_in_cache <<", ret=" << r << dendl; + + if ((block.hostsList.size() - 1) > 0 && r == 0) { /* Remote copy */ + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl; + // TODO: Retrieve remotely + // Policy decision: should we cache remote blocks locally? + } else { + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl; + + auto r = drain(dpp, y); + + if (r < 0) { + ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl; + return r; + } + + break; + } + } + // if (block.version == version) + } else { + // TODO: If data has already been returned for any older versioned block, then return ‘retry’ error, else + + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl; + + auto r = drain(dpp, y); + + if (r < 0) { + ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl; + return r; + } + } + // if (it != block.hostsList.end()) + } else if (block.hostsList.size()) { /* Remote copy */ + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in remote cache. " << oid_in_cache << dendl; + // TODO: Retrieve remotely + // Policy decision: should we cache remote blocks locally? + } + // if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0) + } else if (ret == -ENOENT) { + block.blockID = adjusted_start_ofs; + block.size = obj_max_req_size; + + if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0) { + auto it = find(block.hostsList.begin(), block.hostsList.end(), dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address); + + if (it != block.hostsList.end()) { /* Local copy */ + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in local cache." << dendl; + + if (block.version == version) { oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(obj_max_req_size); + std::string key = oid_in_cache; + //for ranged requests, for last part, the whole part might exist in the cache - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << - " length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl; + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << + " length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl; if ((part_len != obj_max_req_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) { - if (block.dirty == true){ - key = "D_" + oid_in_cache; //we keep track of dirty data in the cache for the metadata failure case - } - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: block dirty =" << block.dirty << dendl; - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: key=" << key << dendl; // Read From Cache + if (block.dirty == true){ + key = "D_" + oid_in_cache; //we keep track of dirty data in the cache for the metadata failure case + } + + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: block dirty =" << block.dirty << dendl; + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: key=" << key << dendl; + auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), key, read_ofs, len_to_read, cost, id); this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, obj_max_req_size))); @@ -781,11 +845,38 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int auto r = flush(dpp, std::move(completed), y); if (r < 0) { - drain(dpp, y); - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl; - return r; + drain(dpp, y); + ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, ret=" << r << dendl; + return r; + } + // if ((part_len != obj_max_req_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) + } else { + int r = -1; + if ((r = source->driver->get_block_dir()->remove_host(dpp, &block, dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, y)) < 0) + ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to remove incorrect host from block with oid=" << oid_in_cache << ", ret=" << r << dendl; + + if ((block.hostsList.size() - 1) > 0 && r == 0) { /* Remote copy */ + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl; + // TODO: Retrieve remotely + // Policy decision: should we cache remote blocks locally? + } else { + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl; + + auto r = drain(dpp, y); + + if (r < 0) { + ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl; + return r; + } + + break; } } + // if (it != block.hostsList.end()) + } else if (block.hostsList.size()) { /* Remote copy */ + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl; + // TODO: Retrieve remotely + // Policy decision: should we cache remote blocks locally? } // if (block.version == version) } else { @@ -796,25 +887,32 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int auto r = drain(dpp, y); if (r < 0) { - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, r= " << r << dendl; + ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl; return r; } } - } else if (block.hostsList.size()) { /* Remote copy */ - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in remote cache. " << oid_in_cache << dendl; - // TODO: Retrieve remotely + // if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0) + } else if (ret == -ENOENT) { /* Fetch from backend */ + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl; + + auto r = drain(dpp, y); + if (r < 0) { + ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl; + return r; + } + + break; } - // if (source->driver->get_block_dir()->exist_key(&block, y) > 0 && int ret = source->driver->get_block_dir()->get(&block, y) == 0) + // else if (ret == -ENOENT) } else { /* Fetch from backend */ if (ret < 0) - ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << block.cacheObj.objName << " blockID: " << block.blockID << " block size: " << block.size << dendl; + ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << block.cacheObj.objName << " blockID: " << block.blockID << " block size: " << block.size << ", ret=" << ret << dendl; ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl; auto r = drain(dpp, y); - if (r < 0) { - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, r= " << r << dendl; + ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl; return r; } @@ -852,7 +950,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int auto r = next->iterate(dpp, ofs, end, this->cb.get(), y); if (r < 0) { - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to fetch object from backend store, r= " << r << dendl; + ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to fetch object from backend store, ret=" << r << dendl; return r; } @@ -866,7 +964,7 @@ int D4NFilterObject::D4NFilterReadOp::get_attr(const DoutPrefixProvider* dpp, co rgw_obj obj = source->get_obj(); auto ret = source->get_obj_attrs(y, dpp, &obj); if (ret < 0) { - ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Error: failed to fetch attrs, ret= " << ret << dendl; + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Error: failed to fetch attrs, ret=" << ret << dendl; return ret; } //get_obj_attrs() calls set_attrs() internally, hence get_attrs() can be invoked to get the latest attrs. @@ -908,7 +1006,6 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl rgw::d4n::CacheBlock block, existing_block; rgw::d4n::BlockDirectory* blockDir = source->driver->get_block_dir(); - block.hostsList.push_back(blockDir->cct->_conf->rgw_d4n_l1_datacache_address); block.cacheObj.objName = source->get_key().get_oid(); block.cacheObj.bucketName = source->get_bucket()->get_name(); std::stringstream s; @@ -937,41 +1034,36 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, dirty, *y); /* Store block in directory */ - if (!blockDir->exist_key(dpp, &block, *y)) { - if (blockDir->set(dpp, &block, *y) < 0) //should we revert previous steps if this step fails? - ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl; - } else { - existing_block.blockID = block.blockID; - existing_block.size = block.size; - existing_block.dirty = block.dirty; - if (blockDir->get(dpp, &existing_block, *y) < 0) { - ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl; - } else { - if (existing_block.version != block.version) { - if (blockDir->del(dpp, &existing_block, *y) < 0) //delete existing block - ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory del method failed." << dendl; - if (blockDir->set(dpp, &block, *y) < 0) //new versioned block will have new version, hostsList etc, how about globalWeight? - ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl; - } else { - if (blockDir->update_field(dpp, &block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, *y) < 0) - ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed for hostsList." << dendl; - } + existing_block.blockID = block.blockID; + existing_block.size = block.size; + + if ((ret = blockDir->get(dpp, &existing_block, *y)) == 0 || ret == -ENOENT) { + if (ret == 0) { //new versioned block will have new version, hostsList etc, how about globalWeight? + block = existing_block; + block.version = version; } + + block.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address); + + if ((ret = blockDir->set(dpp, &block, *y)) < 0) + ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set() method failed, ret=" << ret << dendl; + } else { + ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << ", ret=" << ret << dendl; } //write object to directory. rgw::d4n::CacheObj object = rgw::d4n::CacheObj{ .objName = source->get_oid(), .bucketName = source->get_bucket()->get_name(), .dirty = dirty, - .hostsList = { source->driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address } + .hostsList = { dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address } }; - ret = source->driver->get_obj_dir()->set(&object, *y); + ret = source->driver->get_obj_dir()->set(dpp, &object, *y); if (ret < 0) { ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::::" << __func__ << "(): ObjectDirectory set method failed with err: " << ret << dendl; return ret; } } else { - ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed with error: " << ret << dendl; + ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed, ret=" << ret << dendl; } } } @@ -988,28 +1080,24 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, dirty, *y); /* Store block in directory */ - if (!blockDir->exist_key(dpp, &block, *y)) { - if (blockDir->set(dpp, &block, *y) < 0) - ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl; - } else { - existing_block.blockID = block.blockID; - existing_block.size = block.size; - existing_block.dirty = block.dirty; - if (blockDir->get(dpp, &existing_block, *y) < 0) { - ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl; - } - if (existing_block.version != block.version) { - if (blockDir->del(dpp, &existing_block, *y) < 0) - ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory del method failed." << dendl; - if (blockDir->set(dpp, &block, *y) < 0) - ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl; - } else { - if (blockDir->update_field(dpp, &block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, *y) < 0) - ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed for blockHosts." << dendl; + existing_block.blockID = block.blockID; + existing_block.size = block.size; + + if ((ret = blockDir->get(dpp, &existing_block, *y)) == 0 || ret == -ENOENT) { + if (ret == 0) { //new versioned block will have new version, hostsList etc, how about globalWeight? + block = existing_block; + block.version = version; } + + block.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address); + + if ((ret = blockDir->set(dpp, &block, *y)) < 0) + ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set() method failed, ret=" << ret << dendl; + } else { + ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << ", ret=" << ret << dendl; } } else { - ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed with error: " << ret << dendl; + ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed, ret=" << ret << dendl; } } } @@ -1034,32 +1122,27 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), version, dirty, *y); /* Store block in directory */ - if (!blockDir->exist_key(dpp, &block, *y)) { - if (blockDir->set(dpp, &block, *y) < 0) - ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl; - } else { - existing_block.blockID = block.blockID; - existing_block.size = block.size; - existing_block.dirty = block.dirty; - if (blockDir->get(dpp, &existing_block, *y) < 0) { - ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl; - } else { - if (existing_block.version != block.version) { - if (blockDir->del(dpp, &existing_block, *y) < 0) - ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory del method failed." << dendl; - if (blockDir->set(dpp, &block, *y) < 0) - ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl; - } else { - if (blockDir->update_field(dpp, &block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, *y) < 0) - ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl; - } + existing_block.blockID = block.blockID; + existing_block.size = block.size; + + if ((ret = blockDir->get(dpp, &existing_block, *y)) == 0 || ret == -ENOENT) { + if (ret == 0) { //new versioned block will have new version, hostsList etc, how about globalWeight? + block = existing_block; + block.version = version; } - } + + block.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address); + + if ((ret = blockDir->set(dpp, &block, *y)) < 0) + ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set() method failed, ret=" << ret << dendl; + } else { + ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << ", ret=" << ret << dendl; + } } else { - ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed with error: " << ret << dendl; + ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed, ret=" << ret << dendl; } } else { - ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " An error occured during eviction: " << " error: " << ret << dendl; + ldpp_dout(dpp, 0) << "D4N Filter: " << __func__ << " An error occured during eviction, ret=" << ret << dendl; } } @@ -1097,8 +1180,9 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp .bucketName = source->get_bucket()->get_name() }; - if (source->driver->get_obj_dir()->del(&obj, y) < 0) - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): ObjectDirectory del method failed." << dendl; + int ret = -1; + if ((ret = source->driver->get_obj_dir()->del(dpp, &obj, y)) < 0) + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): ObjectDirectory del() method failed, ret=" << ret << dendl; Attrs::iterator attrs; Attrs currentattrs = source->get_attrs(); @@ -1109,8 +1193,8 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp currentFields.push_back(attrs->first); } - if (source->driver->get_cache_driver()->del(dpp, source->get_key().get_oid(), y) < 0) - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver del method failed." << dendl; + if ((ret = source->driver->get_cache_driver()->del(dpp, source->get_key().get_oid(), y)) < 0) + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): CacheDriver del() method failed, ret=" << ret << dendl; return next->delete_obj(dpp, y, flags); } @@ -1119,13 +1203,17 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp int D4NFilterWriter::prepare(optional_yield y) { startTime = time(NULL); - if (driver->get_cache_driver()->delete_data(dpp, obj->get_key().get_oid(), y) < 0) - ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): CacheDriver delete_data method failed." << dendl; + + int ret = -1; + if ((ret = driver->get_cache_driver()->delete_data(dpp, obj->get_key().get_oid(), y)) < 0) + ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): CacheDriver delete_data() method failed, ret=" << ret << dendl; + d4n_writecache = g_conf()->d4n_writecache_enabled; if (!d4n_writecache){ ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): calling next iterate" << dendl; return next->prepare(y); } + if (!object->have_instance()) { if (object->get_bucket()->versioned() && !object->get_bucket()->versioning_enabled()) { //if versioning is suspended this->version = "null"; @@ -1140,6 +1228,7 @@ int D4NFilterWriter::prepare(optional_yield y) } else { ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << "(): " << "version is: " << object->get_instance() << dendl; } + return 0; } @@ -1161,15 +1250,14 @@ int D4NFilterWriter::process(bufferlist&& data, uint64_t offset) prefix = obj->get_bucket()->get_name() + "_" + version + "_" + obj->get_name(); rgw::d4n::BlockDirectory* blockDir = driver->get_block_dir(); - block.hostsList.push_back(blockDir->cct->_conf->rgw_d4n_l1_datacache_address); + block.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address); block.cacheObj.bucketName = obj->get_bucket()->get_name(); block.cacheObj.objName = obj->get_key().get_oid(); block.cacheObj.dirty = dirty; - block.cacheObj.hostsList.push_back(blockDir->cct->_conf->rgw_d4n_l1_datacache_address); + block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address); existing_block.cacheObj.objName = block.cacheObj.objName; existing_block.cacheObj.bucketName = block.cacheObj.bucketName; - int ret = 0; if (!d4n_writecache) { @@ -1182,7 +1270,7 @@ int D4NFilterWriter::process(bufferlist&& data, uint64_t offset) block.size = bl.length(); block.blockID = ofs; block.dirty = true; - block.hostsList.push_back(blockDir->cct->_conf->rgw_d4n_l1_datacache_address); + block.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address); block.version = version; dirty = true; ret = driver->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, y); @@ -1193,28 +1281,26 @@ int D4NFilterWriter::process(bufferlist&& data, uint64_t offset) if (ret == 0) { ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): oid_in_cache is: " << oid_in_cache << dendl; driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, bl.length(), version, dirty, y); - if (!blockDir->exist_key(dpp, &block, y)) { - if (blockDir->set(dpp, &block, y) < 0) //should we revert previous steps if this step fails? - ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory set method failed." << dendl; - } else { - existing_block.blockID = block.blockID; - existing_block.size = block.size; - if (blockDir->get(dpp, &existing_block, y) < 0) { - ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl; - } else { - if (existing_block.version != block.version) { - if (blockDir->del(dpp, &existing_block, y) < 0) //delete existing block - ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory del method failed." << dendl; - if (blockDir->set(dpp, &block, y) < 0) //new versioned block will have new version, hostsList etc, how about globalWeight? - ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory set method failed." << dendl; - } else { - if (blockDir->update_field(dpp, &block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, y) < 0) - ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory update_field method failed for hostsList." << dendl; - } + + /* Store block in directory */ + existing_block.blockID = block.blockID; + existing_block.size = block.size; + + if ((ret = blockDir->get(dpp, &existing_block, y)) == 0 || ret == -ENOENT) { + if (ret == 0) { //new versioned block will have new version, hostsList etc, how about globalWeight? + block = existing_block; + block.version = version; } + + block.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address); + + if ((ret = blockDir->set(dpp, &block, y)) < 0) + ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set() method failed, ret=" << ret << dendl; + } else { + ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << ", ret=" << ret << dendl; } } else { - ldpp_dout(dpp, 1) << "D4NFilterObject::D4NFilterWriteOp::process" << __func__ << "(): ERROR: writting data to the cache failed!" << dendl; + ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterWriteOp::process" << __func__ << "(): ERROR: writting data to the cache failed, ret=" << ret << dendl; return ret; } } @@ -1235,7 +1321,7 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag, uint32_t flags) { bool dirty = false; - std::vector hostsList = {}; + std::unordered_set hostsList = {}; auto creationTime = startTime; std::string objEtag = etag; bool write_to_backend_store = false; @@ -1275,13 +1361,13 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag, driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, bl.length(), version, dirty, y); ret = object->set_head_obj_dir_entry(dpp, y, true, true); if (ret < 0) { - ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl; + ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory set method failed for head object, ret=" << ret << dendl; return ret; } driver->get_policy_driver()->get_cache_policy()->updateObj(dpp, key, version, dirty, accounted_size, creationTime, std::get(obj->get_bucket()->get_owner()), objEtag, obj->get_bucket()->get_name(), obj->get_key(), y); //write object to directory. - hostsList = { driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address }; + hostsList = { dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address }; rgw::d4n::CacheObj object = rgw::d4n::CacheObj{ .objName = obj->get_oid(), .bucketName = obj->get_bucket()->get_name(), @@ -1289,15 +1375,15 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag, .dirty = dirty, .hostsList = hostsList }; - ret = driver->get_obj_dir()->set(&object, y); + ret = driver->get_obj_dir()->set(dpp, &object, y); if (ret < 0) { - ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): ObjectDirectory set method failed with err: " << ret << dendl; + ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): ObjectDirectory set method failed, ret=" << ret << dendl; return ret; } } else { //if get_cache_driver()->put() write_to_backend_store = true; - ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << " put failed for head_oid_in_cache wih error: " << ret << dendl; - ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << " calling complete of backend store: " << dendl; + ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << " put failed for head_oid_in_cache, ret=" << ret << dendl; + ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << " calling complete of backend store: " << dendl; } } else { // if d4n_writecache = true write_to_backend_store = true; @@ -1308,7 +1394,7 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag, delete_at, if_match, if_nomatch, user_data, zones_trace, canceled, rctx, flags); if (ret < 0) { - ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): Writing to backend store failed with err: " << ret << dendl; + ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): Writing to backend store failed, ret=" << ret << dendl; } } diff --git a/src/rgw/rgw_redis_driver.cc b/src/rgw/rgw_redis_driver.cc index c30dfc3b41e2..d785a405a19d 100644 --- a/src/rgw/rgw_redis_driver.cc +++ b/src/rgw/rgw_redis_driver.cc @@ -8,9 +8,9 @@ namespace rgw { namespace cache { -std::list build_attrs(const rgw::sal::Attrs& binary) +std::vector build_attrs(const rgw::sal::Attrs& binary) { - std::list values; + std::vector values; /* Convert to vector */ if (!binary.empty()) { @@ -51,8 +51,11 @@ auto async_exec(std::shared_ptr conn, initiate_exec{std::move(conn)}, token, req, resp); } -template -void redis_exec(std::shared_ptr conn, boost::system::error_code& ec, boost::redis::request& req, boost::redis::response& resp, optional_yield y) +template +void redis_exec(std::shared_ptr conn, + boost::system::error_code& ec, + const boost::redis::request& req, + boost::redis::response& resp, optional_yield y) { if (y) { auto yield = y.get_yield_context(); @@ -76,7 +79,7 @@ int RedisDriver::initialize(const DoutPrefixProvider* dpp) cfg.clientname = "RedisDriver"; if (!cfg.addr.host.length() || !cfg.addr.port.length()) { - ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): Endpoint was not configured correctly." << dendl; + ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): Endpoint was not configured correctly." << dendl; return -EDESTADDRREQ; } @@ -92,7 +95,7 @@ int RedisDriver::put(const DoutPrefixProvider* dpp, const std::string& key, cons /* Every set will be treated as new */ try { boost::system::error_code ec; - response resp; + response resp; auto redisAttrs = build_attrs(attrs); if (bl.length()) { @@ -101,15 +104,16 @@ int RedisDriver::put(const DoutPrefixProvider* dpp, const std::string& key, cons } request req; - req.push_range("HMSET", entry, redisAttrs); + req.push_range("HSET", entry, redisAttrs); redis_exec(conn, ec, req, resp, y); if (ec) { + ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << ec.what() << dendl; return -ec.value(); } } catch (std::exception &e) { - ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl; + ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl; return -EINVAL; } @@ -124,17 +128,23 @@ int RedisDriver::get(const DoutPrefixProvider* dpp, const std::string& key, off_ /* Retrieve existing values from cache */ try { boost::system::error_code ec; - response< std::map > resp; + response< std::optional> > resp; request req; req.push("HGETALL", entry); redis_exec(conn, ec, req, resp, y); if (ec) { + ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << ec.what() << dendl; return -ec.value(); } - for (auto const& it : std::get<0>(resp).value()) { + if (std::get<0>(resp).value().value().empty()) { + ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): No values returned." << dendl; + return -ENOENT; + } + + for (auto const& it : std::get<0>(resp).value().value()) { if (it.first == "data") { bl.append(it.second); } else { @@ -145,7 +155,7 @@ int RedisDriver::get(const DoutPrefixProvider* dpp, const std::string& key, off_ } } } catch (std::exception &e) { - ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl; + ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl; return -EINVAL; } @@ -159,121 +169,78 @@ int RedisDriver::del(const DoutPrefixProvider* dpp, const std::string& key, opti try { boost::system::error_code ec; + response< + ignore_t, + ignore_t, + ignore_t, + response, std::optional> + > resp; request req; - req.push("HEXISTS", entry, "data"); + req.push("MULTI"); + req.push("HSTRLEN", entry, "data"); + req.push("DEL", entry); + req.push("EXEC"); redis_exec(conn, ec, req, resp, y); if (ec) { + ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << ec.what() << dendl; return -ec.value(); } + + this->free_space += std::get<0>(std::get<3>(resp).value()).value().value(); } catch (std::exception &e) { - ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl; + ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl; return -EINVAL; } - if (std::get<0>(resp).value()) { - response data; - response ret; + return 0; +} + +int RedisDriver::append_data(const DoutPrefixProvider* dpp, const::std::string& key, const bufferlist& bl_data, optional_yield y) +{ + std::string value = ""; + std::string entry = partition_info.location + key; - try { + try { + { boost::system::error_code ec; + response< std::optional > resp; request req; req.push("HGET", entry, "data"); - redis_exec(conn, ec, req, data, y); + redis_exec(conn, ec, req, resp, y); if (ec) { + ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << ec.what() << dendl; return -ec.value(); } - } catch (std::exception &e) { - ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl; - return -EINVAL; + + if (std::get<0>(resp).value().value().empty()) { + ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): No data entry found." << dendl; + } else { + value = std::get<0>(resp).value().value(); + } } - try { + { + /* Append to existing value or set as new value */ boost::system::error_code ec; + response resp; + std::string newVal = value + bl_data.to_str(); + request req; - req.push("DEL", entry); + req.push("HSET", entry, "data", newVal); - redis_exec(conn, ec, req, ret, y); + redis_exec(conn, ec, req, resp, y); - if (!std::get<0>(ret).value()) { - return -ENOENT; - } else if (ec) { + if (ec) { + ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << ec.what() << dendl; return -ec.value(); } - } catch (std::exception &e) { - ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl; - return -EINVAL; - } - - this->free_space += std::get<0>(data).value().length(); - } - - return 0; -} - -int RedisDriver::append_data(const DoutPrefixProvider* dpp, const::std::string& key, const bufferlist& bl_data, optional_yield y) -{ - response exists; - std::string value; - std::string entry = partition_info.location + key; - - try { - boost::system::error_code ec; - request req; - req.push("HEXISTS", entry, "data"); - - redis_exec(conn, ec, req, exists, y); - - if (ec) { - return -ec.value(); - } - } catch (std::exception &e) { - ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl; - return -EINVAL; - } - - if (!std::get<0>(exists).value()) { - ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): Data field was not found." << dendl; - return -ENOENT; - } - - try { - boost::system::error_code ec; - response resp; - request req; - req.push("HGET", entry, "data"); - - redis_exec(conn, ec, req, resp, y); - - if (ec) { - return -ec.value(); - } - - value = std::get<0>(resp).value(); - } catch (std::exception &e) { - ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl; - return -EINVAL; - } - - try { - /* Append to existing value or set as new value */ - boost::system::error_code ec; - response resp; - std::string newVal = value + bl_data.to_str(); - - request req; - req.push("HMSET", entry, "data", newVal); - - redis_exec(conn, ec, req, resp, y); - - if (ec) { - return -ec.value(); } } catch (std::exception &e) { - ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl; + ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl; return -EINVAL; } @@ -284,62 +251,34 @@ int RedisDriver::append_data(const DoutPrefixProvider* dpp, const::std::string& int RedisDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string& key, optional_yield y) { std::string entry = partition_info.location + key; - response resp; try { boost::system::error_code ec; request req; - req.push("HEXISTS", entry, "data"); + response< + ignore_t, + ignore_t, + ignore_t, + response, std::optional> + > resp; + req.push("MULTI"); + req.push("HSTRLEN", entry, "data"); + req.push("HDEL", entry, "data"); + req.push("EXEC"); redis_exec(conn, ec, req, resp, y); if (ec) { + ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << ec.what() << dendl; return -ec.value(); } + + this->free_space += std::get<0>(std::get<3>(resp).value()).value().value(); } catch (std::exception &e) { - ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl; + ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl; return -EINVAL; } - if (std::get<0>(resp).value()) { - response data; - response ret; - - try { - boost::system::error_code ec; - request req; - req.push("HGET", entry, "data"); - - redis_exec(conn, ec, req, data, y); - - if (ec) { - return -ec.value(); - } - } catch (std::exception &e) { - ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl; - return -EINVAL; - } - - try { - boost::system::error_code ec; - request req; - req.push("HDEL", entry, "data"); - - redis_exec(conn, ec, req, ret, y); - - if (!std::get<0>(ret).value()) { - return -ENOENT; - } else if (ec) { - return -ec.value(); - } - } catch (std::exception &e) { - ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl; - return -EINVAL; - } - - this->free_space += std::get<0>(data).value().length(); - } - return 0; } @@ -372,18 +311,24 @@ int RedisDriver::get_attrs(const DoutPrefixProvider* dpp, const std::string& key try { boost::system::error_code ec; - response< std::map > resp; + response< std::optional> > resp; request req; req.push("HGETALL", entry); redis_exec(conn, ec, req, resp, y); if (ec) { + ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << ec.what() << dendl; return -ec.value(); } - for (auto const& it : std::get<0>(resp).value()) { - if (it.first != "data") { + if (std::get<0>(resp).value().value().empty()) { + ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): No values returned." << dendl; + return -ENOENT; + } + + for (auto const& it : std::get<0>(resp).value().value()) { + if (it.first != "data") { /* Ignore data */ buffer::list bl_value; bl_value.append(it.second); attrs.insert({it.first, bl_value}); @@ -391,7 +336,7 @@ int RedisDriver::get_attrs(const DoutPrefixProvider* dpp, const std::string& key } } } catch (std::exception &e) { - ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl; + ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl; return -EINVAL; } @@ -408,20 +353,20 @@ int RedisDriver::set_attrs(const DoutPrefixProvider* dpp, const std::string& key /* Every attr set will be treated as new */ try { boost::system::error_code ec; - response resp; - std::string result; - std::list redisAttrs = build_attrs(attrs); - + response resp; request req; - req.push_range("HMSET", entry, redisAttrs); + auto redisAttrs = build_attrs(attrs); + + req.push_range("HSET", entry, redisAttrs); redis_exec(conn, ec, req, resp, y); if (ec) { + ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << ec.what() << dendl; return -ec.value(); } } catch (std::exception &e) { - ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl; + ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl; return -EINVAL; } @@ -430,27 +375,7 @@ int RedisDriver::set_attrs(const DoutPrefixProvider* dpp, const std::string& key int RedisDriver::update_attrs(const DoutPrefixProvider* dpp, const std::string& key, const rgw::sal::Attrs& attrs, optional_yield y) { - std::string entry = partition_info.location + key; - - try { - boost::system::error_code ec; - response resp; - auto redisAttrs = build_attrs(attrs); - - request req; - req.push_range("HMSET", entry, redisAttrs); - - redis_exec(conn, ec, req, resp, y); - - if (ec) { - return -ec.value(); - } - } catch (std::exception &e) { - ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl; - return -EINVAL; - } - - return 0; + return set_attrs(dpp, key, attrs, y); } int RedisDriver::delete_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& del_attrs, optional_yield y) @@ -459,7 +384,7 @@ int RedisDriver::delete_attrs(const DoutPrefixProvider* dpp, const std::string& try { boost::system::error_code ec; - response resp; + response resp; auto redisAttrs = build_attrs(del_attrs); request req; @@ -467,110 +392,71 @@ int RedisDriver::delete_attrs(const DoutPrefixProvider* dpp, const std::string& redis_exec(conn, ec, req, resp, y); - if (!std::get<0>(resp).value()) { - return -ENOENT; - } else if (ec) { + if (ec) { + ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << ec.what() << dendl; return -ec.value(); } - - return std::get<0>(resp).value(); } catch (std::exception &e) { - ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl; + ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl; return -EINVAL; } + + return 0; } int RedisDriver::get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, std::string& attr_val, optional_yield y) { std::string entry = partition_info.location + key; - response value; - response resp; - attr_val = ""; + response< std::optional > resp; - /* Ensure field was set */ try { boost::system::error_code ec; request req; - req.push("HEXISTS", entry, attr_name); + req.push("HGET", entry, attr_name); redis_exec(conn, ec, req, resp, y); if (ec) { + ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << ec.what() << dendl; return -ec.value(); } - } catch (std::exception &e) { - ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl; - return -EINVAL; - } - - if (!std::get<0>(resp).value()) { - ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): Attribute was not found." << dendl; - return -ENOENT; - } - - /* Retrieve existing value from cache */ - try { - boost::system::error_code ec; - request req; - req.push("HGET", entry, attr_name); - - redis_exec(conn, ec, req, value, y); - if (ec) { - return -ec.value(); + if (std::get<0>(resp).value().value().empty()) { + ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): No value returned." << dendl; + return -ENOENT; } } catch (std::exception &e) { - ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl; + ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl; return -EINVAL; } - if (!std::get<0>(resp).value()) { - ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): Attribute was not found." << dendl; - return -ENOENT; - } - - /* Retrieve existing value from cache */ - try { - boost::system::error_code ec; - request req; - req.push("HGET", entry, attr_name); - - redis_exec(conn, ec, req, value, y); - - if (ec) { - return -ec.value(); - } - } catch (std::exception &e) { - ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl; - return -EINVAL; - } - - attr_val = std::get<0>(value).value(); + attr_val = std::get<0>(resp).value().value(); return 0; } int RedisDriver::set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attr_val, optional_yield y) { std::string entry = partition_info.location + key; - response resp; /* Every attr set will be treated as new */ try { boost::system::error_code ec; + response resp; request req; req.push("HSET", entry, attr_name, attr_val); redis_exec(conn, ec, req, resp, y); if (ec) { + ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << ec.what() << dendl; return -ec.value(); } } catch (std::exception &e) { - ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl; + ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl; return -EINVAL; } - return std::get<0>(resp).value(); + return 0; } Aio::OpFunc RedisDriver::redis_read_op(optional_yield y, std::shared_ptr conn, @@ -610,7 +496,7 @@ Aio::OpFunc RedisDriver::redis_write_op(optional_yield y, std::shared_ptr(); auto& resp = s->resp; auto& req = s->req; - req.push_range("HMSET", key, redisAttrs); + req.push_range("HSET", key, redisAttrs); conn->async_exec(req, resp, bind_executor(ex, RedisDriver::redis_aio_handler{aio, r, s})); }; diff --git a/src/rgw/rgw_redis_driver.h b/src/rgw/rgw_redis_driver.h index da5203b3cb37..8496a4388751 100644 --- a/src/rgw/rgw_redis_driver.h +++ b/src/rgw/rgw_redis_driver.h @@ -14,6 +14,7 @@ using boost::redis::config; using boost::redis::connection; using boost::redis::request; using boost::redis::response; +using boost::redis::ignore_t; class RedisDriver : public CacheDriver { public: @@ -54,8 +55,8 @@ class RedisDriver : public CacheDriver { uint64_t outstanding_write_size; struct redis_response { - boost::redis::request req; - boost::redis::response resp; + request req; + boost::redis::generic_response resp; }; struct redis_aio_handler { @@ -73,7 +74,7 @@ class RedisDriver : public CacheDriver { /* Only append data for GET call */ if (s->req.payload().find("HGET") != std::string::npos) { - r.data.append(std::get<0>(s->resp).value()); + r.data.append((s->resp).value().at(0).value); } throttle->put(r); diff --git a/src/test/rgw/test_d4n_directory.cc b/src/test/rgw/test_d4n_directory.cc index d06286891ea8..bb6f0b4cc850 100644 --- a/src/test/rgw/test_d4n_directory.cc +++ b/src/test/rgw/test_d4n_directory.cc @@ -63,8 +63,6 @@ class ObjectDirectoryFixture: public ::testing::Test { ASSERT_NE(dir, nullptr); ASSERT_NE(conn, nullptr); - dir->init(env->cct); - /* Run fixture's connection */ config cfg; cfg.addr.host = env->redisHost.substr(0, env->redisHost.find(":")); @@ -111,8 +109,6 @@ class BlockDirectoryFixture: public ::testing::Test { ASSERT_NE(dir, nullptr); ASSERT_NE(conn, nullptr); - dir->init(env->cct); - /* Run fixture's connection */ config cfg; cfg.addr.host = env->redisHost.substr(0, env->redisHost.find(":")); @@ -132,10 +128,10 @@ class BlockDirectoryFixture: public ::testing::Test { net::io_context io; std::shared_ptr conn; - std::vector vals{"0", "", "0", "0", env->redisHost, + std::vector vals{"0", "", "0", "0", "0", env->redisHost, "testName", "testBucket", "", "0", env->redisHost}; - std::vector fields{"blockID", "version", "size", "globalWeight", "blockHosts", - "objName", "bucketName", "creationTime", "dirty", "objHosts"}; + std::vector fields{"blockID", "version", "dirtyBlock", "size", "globalWeight", "blockHosts", + "objName", "bucketName", "creationTime", "dirtyObj", "objHosts"}; }; void rethrow(std::exception_ptr eptr) { @@ -145,7 +141,7 @@ void rethrow(std::exception_ptr eptr) { TEST_F(ObjectDirectoryFixture, SetYield) { boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { - ASSERT_EQ(0, dir->set(obj, yield)); + ASSERT_EQ(0, dir->set(env->dpp, obj, yield)); boost::system::error_code ec; request req; @@ -168,7 +164,7 @@ TEST_F(ObjectDirectoryFixture, SetYield) TEST_F(ObjectDirectoryFixture, GetYield) { boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { - ASSERT_EQ(0, dir->set(obj, yield)); + ASSERT_EQ(0, dir->set(env->dpp, obj, yield)); { boost::system::error_code ec; @@ -182,7 +178,7 @@ TEST_F(ObjectDirectoryFixture, GetYield) EXPECT_EQ(std::get<0>(resp).value(), 0); } - ASSERT_EQ(0, dir->get(obj, yield)); + ASSERT_EQ(0, dir->get(env->dpp, obj, yield)); EXPECT_EQ(obj->objName, "newoid"); { @@ -204,8 +200,8 @@ TEST_F(ObjectDirectoryFixture, GetYield) TEST_F(ObjectDirectoryFixture, CopyYield) { boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { - ASSERT_EQ(0, dir->set(obj, yield)); - ASSERT_EQ(0, dir->copy(obj, "copyTestName", "copyBucketName", yield)); + ASSERT_EQ(0, dir->set(env->dpp, obj, yield)); + ASSERT_EQ(0, dir->copy(env->dpp, obj, "copyTestName", "copyBucketName", yield)); boost::system::error_code ec; request req; @@ -236,7 +232,7 @@ TEST_F(ObjectDirectoryFixture, CopyYield) TEST_F(ObjectDirectoryFixture, DelYield) { boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { - ASSERT_EQ(0, dir->set(obj, yield)); + ASSERT_EQ(0, dir->set(env->dpp, obj, yield)); { boost::system::error_code ec; @@ -250,7 +246,7 @@ TEST_F(ObjectDirectoryFixture, DelYield) EXPECT_EQ(std::get<0>(resp).value(), 1); } - ASSERT_EQ(0, dir->del(obj, yield)); + ASSERT_EQ(0, dir->del(env->dpp, obj, yield)); { boost::system::error_code ec; @@ -274,9 +270,9 @@ TEST_F(ObjectDirectoryFixture, DelYield) TEST_F(ObjectDirectoryFixture, UpdateFieldYield) { boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { - ASSERT_EQ(0, dir->set(obj, yield)); - ASSERT_EQ(0, dir->update_field(obj, "objName", "newTestName", yield)); - ASSERT_EQ(0, dir->update_field(obj, "objHosts", "127.0.0.1:5000", yield)); + ASSERT_EQ(0, dir->set(env->dpp, obj, yield)); + ASSERT_EQ(0, dir->update_field(env->dpp, obj, "objName", "newTestName", yield)); + ASSERT_EQ(0, dir->update_field(env->dpp, obj, "objHosts", "127.0.0.1:5000", yield)); boost::system::error_code ec; request req; @@ -378,8 +374,8 @@ TEST_F(BlockDirectoryFixture, CopyYield) EXPECT_EQ(std::get<0>(resp).value(), 1); auto copyVals = vals; - copyVals[5] = "copyTestName"; - copyVals[6] = "copyBucketName"; + copyVals[6] = "copyTestName"; + copyVals[7] = "copyBucketName"; EXPECT_EQ(std::get<1>(resp).value(), copyVals); conn->cancel(); @@ -456,7 +452,7 @@ TEST_F(BlockDirectoryFixture, UpdateFieldYield) TEST_F(BlockDirectoryFixture, RemoveHostYield) { boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { - block->hostsList.push_back("127.0.0.1:6000"); + block->hostsList.insert("127.0.0.1:6000"); ASSERT_EQ(0, dir->set(env->dpp, block, optional_yield{yield})); ASSERT_EQ(0, dir->remove_host(env->dpp, block, "127.0.0.1:6379", optional_yield{yield})); diff --git a/src/test/rgw/test_d4n_policy.cc b/src/test/rgw/test_d4n_policy.cc index 4c57fb0d7fd0..ea233227cbe3 100644 --- a/src/test/rgw/test_d4n_policy.cc +++ b/src/test/rgw/test_d4n_policy.cc @@ -130,7 +130,7 @@ class LFUDAPolicyFixture : public ::testing::Test { } } } else if (!exists) { /* No remote copy */ - block->hostsList.push_back(dir->cct->_conf->rgw_d4n_l1_datacache_address); + block->hostsList.insert(env->dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address); if (dir->set(env->dpp, block, y) < 0) return -1; @@ -235,8 +235,8 @@ TEST_F(LFUDAPolicyFixture, RemoteGetBlockYield) block->size = cacheDriver->get_free_space(env->dpp) + 1; /* To trigger eviction */ block->hostsList.clear(); block->cacheObj.hostsList.clear(); - block->hostsList.push_back("127.0.0.1:6000"); - block->cacheObj.hostsList.push_back("127.0.0.1:6000"); + block->hostsList.insert("127.0.0.1:6000"); + block->cacheObj.hostsList.insert("127.0.0.1:6000"); ASSERT_EQ(0, dir->set(env->dpp, block, optional_yield{yield}));