to policy driver, redis driver, directory and filter driver.
1. d4n/directory: Use boost::split for simpler code
2. rgw: Lower log levels for failures in D4N and redis cache files
3. rgw: Add dpp and logs to directory, cache, and policy
4. rgw: Reduce Redis calls and fix workflow
5. qa/d4n: Remove D4N task and add S3 user creation to workunit driver script
6. d4n: Use Redis transactions to serialize consecutive requests for safe
data handling and faster completion
7. d4n/directory: Remove boost lexical_cast calls
8. rgw/d4n: Add return values to error logs
9. rgw/d4n: Change directory hostsList to use `unordered_set`
10. d4n/filter: Simplify logic for storing block in `handle_data`
11. rgw/policy: Properly delete `LFUDAEntry` instances
12. rgw/d4n: Add support for `dirty` block metadata, `check_bool` for consistent values, and fix directory updates in `cleanup` method
Signed-off-by: Samarah <samarah.uriarte@ibm.com>
client.0:
- sudo chmod 0777 /var/lib/ceph
- sudo chmod 0777 /var/lib/ceph/radosgw
-- d4ntests:
- client.0:
- workunit:
clients:
client.0:
+++ /dev/null
-import logging
-
-from teuthology import misc as teuthology
-from teuthology.task import Task
-from teuthology.packaging import remove_package
-
-log = logging.getLogger(__name__)
-
-display_name='Foo'
-email='foo@foo.com'
-access_key='test3'
-secret_key='test3'
-
-class D4NTests(Task):
-
- def __init__(self, ctx, config):
- super(D4NTests, self).__init__(ctx, config)
- self.log = log
- log.info('D4N Tests: __INIT__ ')
-
- clients = ['client.{id}'.format(id=id_)
- for id_ in teuthology.all_roles_of_type(self.ctx.cluster, 'client')]
- self.all_clients = []
- for client in clients:
- if client in self.config:
- self.all_clients.extend([client])
- if self.all_clients is None:
- self.all_clients = 'client.0'
-
- self.user = {'s3main': 'tester'}
-
- def setup(self):
- super(D4NTests, self).setup()
- log.info('D4N Tests: SETUP')
-
- def begin(self):
- super(D4NTests, self).begin()
- log.info('D4N Tests: BEGIN')
-
- for (host, roles) in self.ctx.cluster.remotes.items():
- log.debug('D4N Tests: Cluster config is: {cfg}'.format(cfg=roles))
- log.debug('D4N Tests: Host is: {host}'.format(host=host))
-
- self.create_user()
-
- def end(self):
- super(D4NTests, self).end()
- log.info('D4N Tests: END')
-
- for client in self.all_clients:
- self.remove_packages(client)
- self.delete_user(client)
-
- def create_user(self):
- log.info("D4N Tests: Creating S3 user...")
- testdir = teuthology.get_testdir(self.ctx)
-
- for client in self.all_clients:
- for user in list(self.user.items()):
- s3_user_id = 's3main'
- log.debug(
- 'D4N Tests: Creating user {s3_user_id}'.format(s3_user_id=s3_user_id))
- cluster_name, daemon_type, client_id = teuthology.split_role(
- client)
- client_with_id = daemon_type + '.' + client_id
- self.ctx.cluster.only(client).run(
- args=[
- 'sudo',
- 'adjust-ulimits',
- 'ceph-coverage',
- '{tdir}/archive/coverage'.format(tdir=testdir),
- 'radosgw-admin',
- '-n', client_with_id,
- 'user', 'create',
- '--uid', s3_user_id,
- '--display-name', display_name,
- '--access-key', access_key,
- '--secret', secret_key,
- '--email', email,
- '--cluster', cluster_name,
- ],
- )
-
- def remove_packages(self, client):
- (remote,) = self.ctx.cluster.only(client).remotes.keys()
- remove_package('s3cmd', remote)
-
- def delete_user(self, client):
- log.info("D4N Tests: Deleting S3 user...")
- testdir = teuthology.get_testdir(self.ctx)
-
- for user in self.user.items():
- s3_user_id = 's3main'
- self.ctx.cluster.only(client).run(
- args=[
- 'sudo',
- 'adjust-ulimits',
- 'ceph-coverage',
- '{tdir}/archive/coverage'.format(tdir=testdir),
- 'radosgw-admin',
- '-n', client,
- 'user', 'rm',
- '--uid', s3_user_id,
- '--purge-data',
- '--cluster', 'ceph',
- ],
- )
-
-task = D4NTests
pip install configobj
pip install boto3
+# create user
+radosgw-admin user create --uid=test3 --display-name=test3 --access-key=test3 --secret-key=test3 2>/dev/null
+
# run test
$mydir/bin/python3 $mydir/test_rgw_d4n.py
#include <boost/asio/consign.hpp>
+#include <boost/algorithm/string.hpp>
#include "common/async/blocked_completion.h"
#include "common/dout.h"
#include "d4n_directory.h"
initiate_exec{std::move(conn)}, token, req, resp);
}
-template <typename T>
+template <typename... Types>
void redis_exec(std::shared_ptr<connection> conn,
boost::system::error_code& ec,
const boost::redis::request& req,
- boost::redis::response<T>& resp, optional_yield y)
+ boost::redis::response<Types...>& resp, optional_yield y)
{
if (y) {
auto yield = y.get_yield_context();
}
}
+int check_bool(std::string str) {
+ if (str == "true" || str == "1") {
+ return 1;
+ } else if (str == "false" || str == "0") {
+ return 0;
+ } else {
+ return -EINVAL;
+ }
+}
+
std::string ObjectDirectory::build_index(CacheObj* object)
{
return object->bucketName + "_" + object->objName;
}
-int ObjectDirectory::exist_key(CacheObj* object, optional_yield y)
+int ObjectDirectory::exist_key(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y)
{
std::string key = build_index(object);
response<int> resp;
redis_exec(conn, ec, req, resp, y);
- if (ec)
- return false;
- } catch (std::exception &e) {}
+ if (ec) {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
+ }
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ return -EINVAL;
+ }
return std::get<0>(resp).value();
}
-int ObjectDirectory::set(CacheObj* object, optional_yield y)
+int ObjectDirectory::set(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y)
{
+ /* For existing keys, call get method beforehand.
+ Sets completely overwrite existing values. */
std::string key = build_index(object);
-
- /* Every set will be treated as new */
+
std::string endpoint;
std::list<std::string> redisValues;
redisValues.push_back("creationTime");
redisValues.push_back(object->creationTime);
redisValues.push_back("dirty");
+ int ret = -1;
+ if ((ret = check_bool(std::to_string(object->dirty))) != -EINVAL) {
+ object->dirty = (ret != 0);
+ } else {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: Invalid bool value" << dendl;
+ return -EINVAL;
+ }
redisValues.push_back(std::to_string(object->dirty));
redisValues.push_back("objHosts");
try {
boost::system::error_code ec;
+ response<ignore_t> resp;
request req;
- req.push_range("HMSET", key, redisValues);
- response<std::string> resp;
+ req.push_range("HSET", key, redisValues);
redis_exec(conn, ec, req, resp, y);
if (ec) {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
return -ec.value();
}
} catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
return -EINVAL;
}
return 0;
}
-int ObjectDirectory::get(CacheObj* object, optional_yield y)
+int ObjectDirectory::get(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y)
{
std::string key = build_index(object);
+ std::vector<std::string> fields;
- if (exist_key(object, y)) {
- std::vector<std::string> fields;
+ fields.push_back("objName");
+ fields.push_back("bucketName");
+ fields.push_back("creationTime");
+ fields.push_back("dirty");
+ fields.push_back("objHosts");
- fields.push_back("objName");
- fields.push_back("bucketName");
- fields.push_back("creationTime");
- fields.push_back("dirty");
- fields.push_back("objHosts");
-
- try {
- boost::system::error_code ec;
- request req;
- req.push_range("HMGET", key, fields);
- response< std::vector<std::string> > resp;
-
- redis_exec(conn, ec, req, resp, y);
-
- if (std::get<0>(resp).value().empty()) {
- return -ENOENT;
- } else if (ec) {
- return -ec.value();
- }
+ try {
+ boost::system::error_code ec;
+ response< std::vector<std::string> > resp;
+ request req;
+ req.push_range("HMGET", key, fields);
- object->objName = std::get<0>(resp).value()[0];
- object->bucketName = std::get<0>(resp).value()[1];
- object->creationTime = std::get<0>(resp).value()[2];
- object->dirty = boost::lexical_cast<bool>(std::get<0>(resp).value()[3]);
+ redis_exec(conn, ec, req, resp, y);
- {
- std::stringstream ss(boost::lexical_cast<std::string>(std::get<0>(resp).value()[4]));
+ if (ec) {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
+ }
- while (!ss.eof()) {
- std::string host;
- std::getline(ss, host, '_');
- object->hostsList.push_back(host);
- }
- }
- } catch (std::exception &e) {
- return -EINVAL;
+ if (std::get<0>(resp).value().empty()) {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "(): No values returned." << dendl;
+ return -ENOENT;
}
- } else {
- return -ENOENT;
+
+ object->objName = std::get<0>(resp).value()[0];
+ object->bucketName = std::get<0>(resp).value()[1];
+ object->creationTime = std::get<0>(resp).value()[2];
+ object->dirty = std::stoi(std::get<0>(resp).value()[3]);
+ boost::split(object->hostsList, std::get<0>(resp).value()[4], boost::is_any_of("_"));
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ return -EINVAL;
}
return 0;
}
/* Note: This method is not compatible for use on Ubuntu systems. */
-int ObjectDirectory::copy(CacheObj* object, std::string copyName, std::string copyBucketName, optional_yield y)
+int ObjectDirectory::copy(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& copyName, const std::string& copyBucketName, optional_yield y)
{
std::string key = build_index(object);
auto copyObj = CacheObj{ .objName = copyName, .bucketName = copyBucketName };
std::string copyKey = build_index(©Obj);
- if (exist_key(object, y)) {
- try {
- response<int> resp;
-
- {
- boost::system::error_code ec;
- request req;
- req.push("COPY", key, copyKey);
-
- redis_exec(conn, ec, req, resp, y);
-
- if (ec) {
- return -ec.value();
- }
- }
-
- {
- boost::system::error_code ec;
- request req;
- req.push("HMSET", copyKey, "objName", copyName, "bucketName", copyBucketName);
- response<std::string> res;
+ try {
+ boost::system::error_code ec;
+ response<
+ ignore_t,
+ ignore_t,
+ ignore_t,
+ response<std::optional<int>, std::optional<int>>
+ > resp;
+ request req;
+ req.push("MULTI");
+ req.push("COPY", key, copyKey);
+ req.push("HSET", copyKey, "objName", copyName, "bucketName", copyBucketName);
+ req.push("EXEC");
- redis_exec(conn, ec, req, res, y);
+ redis_exec(conn, ec, req, resp, y);
- if (ec) {
- return -ec.value();
- }
- }
+ if (ec) {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
+ }
- return std::get<0>(resp).value() - 1;
- } catch (std::exception &e) {
- return -EINVAL;
+ if (std::get<0>(std::get<3>(resp).value()).value().value() == 1) {
+ return 0;
+ } else {
+ return -ENOENT;
}
- } else {
- return -ENOENT;
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ return -EINVAL;
}
}
-int ObjectDirectory::del(CacheObj* object, optional_yield y)
+int ObjectDirectory::del(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y)
{
std::string key = build_index(object);
- if (exist_key(object, y)) {
- try {
- boost::system::error_code ec;
- request req;
- req.push("DEL", key);
- response<int> resp;
-
- redis_exec(conn, ec, req, resp, y);
+ try {
+ boost::system::error_code ec;
+ response<ignore_t> resp;
+ request req;
+ req.push("DEL", key);
- if (ec) {
- return -ec.value();
- }
+ redis_exec(conn, ec, req, resp, y);
- return std::get<0>(resp).value() - 1;
- } catch (std::exception &e) {
- return -EINVAL;
+ if (ec) {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
}
- } else {
- return 0; /* No delete was necessary */
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ return -EINVAL;
}
+
+ return 0;
}
-int ObjectDirectory::update_field(CacheObj* object, std::string field, std::string value, optional_yield y)
+int ObjectDirectory::update_field(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& field, std::string& value, optional_yield y)
{
std::string key = build_index(object);
- if (exist_key(object, y)) {
+ if (exist_key(dpp, object, y)) {
try {
- /* Ensure field exists */
- {
- boost::system::error_code ec;
- request req;
- req.push("HEXISTS", key, field);
- response<int> resp;
-
- redis_exec(conn, ec, req, resp, y);
-
- if (!std::get<0>(resp).value()) {
- return -ENOENT;
- } else if (ec) {
- return -ec.value();
- }
- }
-
if (field == "objHosts") {
/* Append rather than overwrite */
+ ldpp_dout(dpp, 20) << "ObjectDirectory::" << __func__ << "() Appending to hosts list." << dendl;
+
boost::system::error_code ec;
+ response<std::string> resp;
request req;
req.push("HGET", key, field);
- response<std::string> resp;
redis_exec(conn, ec, req, resp, y);
- if (std::get<0>(resp).value().empty()) {
- return -ENOENT;
- } else if (ec) {
+ if (ec) {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
return -ec.value();
}
+ /* If entry exists, it should have at least one host */
std::get<0>(resp).value() += "_";
std::get<0>(resp).value() += value;
value = std::get<0>(resp).value();
+ } else if (field == "dirty") {
+ int ret = -1;
+ if ((ret = check_bool(value)) != -EINVAL) {
+ value = std::to_string((ret != 0));
+ } else {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: Invalid bool value" << dendl;
+ return -EINVAL;
+ }
}
- {
- boost::system::error_code ec;
- request req;
- req.push_range("HSET", key, std::map<std::string, std::string>{{field, value}});
- response<int> resp;
-
- redis_exec(conn, ec, req, resp, y);
+ boost::system::error_code ec;
+ response<ignore_t> resp;
+ request req;
+ req.push("HSET", key, field, value);
- if (ec) {
- return -ec.value();
- }
+ redis_exec(conn, ec, req, resp, y);
- return std::get<0>(resp).value();
+ if (ec) {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
}
+
+ return 0;
} catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
return -EINVAL;
}
} else {
redis_exec(conn, ec, req, resp, y);
- if (ec)
+ if (ec) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
return false;
- } catch (std::exception &e) {}
+ }
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ return -EINVAL;
+ }
return std::get<0>(resp).value();
}
int BlockDirectory::set(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y)
{
+ /* For existing keys, call get method beforehand.
+ Sets completely overwrite existing values. */
std::string key = build_index(block);
- /* Every set will be treated as new */
std::string endpoint;
std::list<std::string> redisValues;
redisValues.push_back(std::to_string(block->blockID));
redisValues.push_back("version");
redisValues.push_back(block->version);
+ redisValues.push_back("dirtyBlock");
+ int ret = -1;
+ if ((ret = check_bool(std::to_string(block->dirty))) != -EINVAL) {
+ block->dirty = (ret != 0);
+ } else {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: Invalid bool value" << dendl;
+ return -EINVAL;
+ }
+ redisValues.push_back(std::to_string(block->dirty));
redisValues.push_back("size");
redisValues.push_back(std::to_string(block->size));
redisValues.push_back("globalWeight");
redisValues.push_back(block->cacheObj.bucketName);
redisValues.push_back("creationTime");
redisValues.push_back(block->cacheObj.creationTime);
- redisValues.push_back("dirty");
- if (block->cacheObj.dirty == true || block->cacheObj.dirty == 1) {
- block->cacheObj.dirty = 1;
- }
- if (block->cacheObj.dirty == false || block->cacheObj.dirty == 0) {
- block->cacheObj.dirty = 0;
+ redisValues.push_back("dirtyObj");
+ if ((ret = check_bool(std::to_string(block->cacheObj.dirty))) != -EINVAL) {
+ block->cacheObj.dirty = (ret != 0);
+ } else {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: Invalid bool value" << dendl;
+ return -EINVAL;
}
redisValues.push_back(std::to_string(block->cacheObj.dirty));
redisValues.push_back("objHosts");
try {
boost::system::error_code ec;
+ response<ignore_t> resp;
request req;
- req.push_range("HMSET", key, redisValues);
- response<std::string> resp;
+ req.push_range("HSET", key, redisValues);
redis_exec(conn, ec, req, resp, y);
if (ec) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
return -ec.value();
}
} catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
return -EINVAL;
}
int BlockDirectory::get(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y)
{
std::string key = build_index(block);
+ std::vector<std::string> fields;
ldpp_dout(dpp, 10) << __func__ << "(): index is: " << key << dendl;
- if (exist_key(dpp, block, y)) {
- std::vector<std::string> fields;
-
- fields.push_back("blockID");
- fields.push_back("version");
- fields.push_back("size");
- fields.push_back("globalWeight");
- fields.push_back("blockHosts");
-
- fields.push_back("objName");
- fields.push_back("bucketName");
- fields.push_back("creationTime");
- fields.push_back("dirty");
- fields.push_back("objHosts");
-
- try {
- boost::system::error_code ec;
- request req;
- req.push_range("HMGET", key, fields);
- response< std::vector<std::string> > resp;
+ fields.push_back("blockID");
+ fields.push_back("version");
+ fields.push_back("dirtyBlock");
+ fields.push_back("size");
+ fields.push_back("globalWeight");
+ fields.push_back("blockHosts");
- redis_exec(conn, ec, req, resp, y);
+ fields.push_back("objName");
+ fields.push_back("bucketName");
+ fields.push_back("creationTime");
+ fields.push_back("dirtyObj");
+ fields.push_back("objHosts");
- if (std::get<0>(resp).value().empty()) {
- return -ENOENT;
- } else if (ec) {
- return -ec.value();
- }
+ try {
+ boost::system::error_code ec;
+ response< std::optional<std::vector<std::string>> > resp;
+ request req;
+ req.push_range("HMGET", key, fields);
- block->blockID = boost::lexical_cast<uint64_t>(std::get<0>(resp).value()[0]);
- block->version = std::get<0>(resp).value()[1];
- block->size = boost::lexical_cast<uint64_t>(std::get<0>(resp).value()[2]);
- block->globalWeight = boost::lexical_cast<int>(std::get<0>(resp).value()[3]);
- {
- std::stringstream ss(boost::lexical_cast<std::string>(std::get<0>(resp).value()[4]));
- block->hostsList.clear();
-
- while (!ss.eof()) { // Replace with boost::split? -Sam
- std::string host;
- std::getline(ss, host, '_');
- block->hostsList.push_back(host);
- }
- }
+ redis_exec(conn, ec, req, resp, y);
- block->cacheObj.objName = std::get<0>(resp).value()[5];
- block->cacheObj.bucketName = std::get<0>(resp).value()[6];
- block->cacheObj.creationTime = std::get<0>(resp).value()[7];
- block->cacheObj.dirty = boost::lexical_cast<bool>(std::get<0>(resp).value()[8]);
- block->dirty = boost::lexical_cast<bool>(std::get<0>(resp).value()[8]);
- {
- std::stringstream ss(boost::lexical_cast<std::string>(std::get<0>(resp).value()[9]));
- block->cacheObj.hostsList.clear();
-
- while (!ss.eof()) {
- std::string host;
- std::getline(ss, host, '_');
- block->cacheObj.hostsList.push_back(host);
- }
- }
- } catch (std::exception &e) {
- return -EINVAL;
+ if (ec) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
}
- } else {
- return -ENOENT;
+
+ if (std::get<0>(resp).value().value().empty()) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "(): No values returned." << dendl;
+ return -ENOENT;
+ }
+
+ block->blockID = std::stoull(std::get<0>(resp).value().value()[0]);
+ block->version = std::get<0>(resp).value().value()[1];
+ block->dirty = std::stoi(std::get<0>(resp).value().value()[2]);
+ block->size = std::stoull(std::get<0>(resp).value().value()[3]);
+ block->globalWeight = std::stoull(std::get<0>(resp).value().value()[4]);
+ boost::split(block->hostsList, std::get<0>(resp).value().value()[5], boost::is_any_of("_"));
+ block->cacheObj.objName = std::get<0>(resp).value().value()[6];
+ block->cacheObj.bucketName = std::get<0>(resp).value().value()[7];
+ block->cacheObj.creationTime = std::get<0>(resp).value().value()[8];
+ block->cacheObj.dirty = std::stoi(std::get<0>(resp).value().value()[9]);
+ boost::split(block->cacheObj.hostsList, std::get<0>(resp).value().value()[10], boost::is_any_of("_"));
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ return -EINVAL;
}
return 0;
}
/* Note: This method is not compatible for use on Ubuntu systems. */
-int BlockDirectory::copy(const DoutPrefixProvider* dpp, CacheBlock* block, std::string copyName, std::string copyBucketName, optional_yield y)
+int BlockDirectory::copy(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& copyName, const std::string& copyBucketName, optional_yield y)
{
std::string key = build_index(block);
auto copyBlock = CacheBlock{ .cacheObj = { .objName = copyName, .bucketName = copyBucketName }, .blockID = 0 };
std::string copyKey = build_index(©Block);
- if (exist_key(dpp, block, y)) {
- try {
- response<int> resp;
-
- {
- boost::system::error_code ec;
- request req;
- req.push("COPY", key, copyKey);
-
- redis_exec(conn, ec, req, resp, y);
-
- if (ec) {
- return -ec.value();
- }
- }
-
- {
- boost::system::error_code ec;
- request req;
- req.push("HMSET", copyKey, "objName", copyName, "bucketName", copyBucketName);
- response<std::string> res;
+ try {
+ boost::system::error_code ec;
+ response<
+ ignore_t,
+ ignore_t,
+ ignore_t,
+ response<std::optional<int>, std::optional<int>>
+ > resp;
+ request req;
+ req.push("MULTI");
+ req.push("COPY", key, copyKey);
+ req.push("HSET", copyKey, "objName", copyName, "bucketName", copyBucketName);
+ req.push("EXEC");
- redis_exec(conn, ec, req, res, y);
+ redis_exec(conn, ec, req, resp, y);
- if (ec) {
- return -ec.value();
- }
- }
+ if (ec) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
+ }
- return std::get<0>(resp).value() - 1;
- } catch (std::exception &e) {
- return -EINVAL;
+ if (std::get<0>(std::get<3>(resp).value()).value().value() == 1) {
+ return 0;
+ } else {
+ return -ENOENT;
}
- } else {
- return -ENOENT;
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ return -EINVAL;
}
}
{
std::string key = build_index(block);
- if (exist_key(dpp, block, y)) {
- try {
- boost::system::error_code ec;
- request req;
- req.push("DEL", key);
- response<int> resp;
-
- redis_exec(conn, ec, req, resp, y);
+ try {
+ boost::system::error_code ec;
+ response<ignore_t> resp;
+ request req;
+ req.push("DEL", key);
- if (ec) {
- return -ec.value();
- }
+ redis_exec(conn, ec, req, resp, y);
- return std::get<0>(resp).value() - 1;
- } catch (std::exception &e) {
- return -EINVAL;
+ if (ec) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
}
- } else {
- return 0; /* No delete was necessary */
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ return -EINVAL;
}
+
+ return 0;
}
-int BlockDirectory::update_field(const DoutPrefixProvider* dpp, CacheBlock* block, std::string field, std::string& value, optional_yield y)
+int BlockDirectory::update_field(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& field, std::string& value, optional_yield y)
{
std::string key = build_index(block);
if (exist_key(dpp, block, y)) {
try {
- /* Ensure field exists */
- {
- boost::system::error_code ec;
- request req;
- req.push("HEXISTS", key, field);
- response<int> resp;
-
- redis_exec(conn, ec, req, resp, y);
-
- if (!std::get<0>(resp).value()) {
- return -ENOENT;
- } else if (ec) {
- return -ec.value();
- }
- }
-
if (field == "blockHosts") {
/* Append rather than overwrite */
+ ldpp_dout(dpp, 20) << "BlockDirectory::" << __func__ << "() Appending to hosts list." << dendl;
+
boost::system::error_code ec;
+ response< std::optional<std::string> > resp;
request req;
req.push("HGET", key, field);
- response<std::string> resp;
redis_exec(conn, ec, req, resp, y);
- if (std::get<0>(resp).value().empty()) {
- return -ENOENT;
- } else if (ec) {
+ if (ec) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
return -ec.value();
}
- std::get<0>(resp).value() += "_";
- std::get<0>(resp).value() += value;
- value = std::get<0>(resp).value();
+ /* If entry exists, it should have at least one host */
+ std::get<0>(resp).value().value() += "_";
+ std::get<0>(resp).value().value() += value;
+ value = std::get<0>(resp).value().value();
+ } else if (field == "dirtyObj" || field == "dirtyBlock") {
+ int ret = -1;
+ if ((ret = check_bool(value)) != -EINVAL) {
+ value = std::to_string((ret != 0));
+ } else {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: Invalid bool value" << dendl;
+ return -EINVAL;
+ }
}
- if (field == "dirty") {
- if (value == "true" || value == "1") {
- value = "1";
- }
- if (value == "false" || value == "0") {
- value = "0";
- }
- }
- {
- boost::system::error_code ec;
- request req;
- req.push_range("HSET", key, std::map<std::string, std::string>{{field, value}});
- response<int> resp;
- redis_exec(conn, ec, req, resp, y);
+ boost::system::error_code ec;
+ response<ignore_t> resp;
+ request req;
+ req.push("HSET", key, field, value);
- if (ec) {
- return -ec.value();
- }
+ redis_exec(conn, ec, req, resp, y);
- return std::get<0>(resp).value();
+ if (ec) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
}
+
+ return 0;
} catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
return -EINVAL;
}
} else {
}
}
-int BlockDirectory::remove_host(const DoutPrefixProvider* dpp, CacheBlock* block, std::string& delValue, optional_yield y)
+int BlockDirectory::remove_host(const DoutPrefixProvider* dpp, CacheBlock* block, std::string& value, optional_yield y)
{
std::string key = build_index(block);
- if (exist_key(dpp, block, y)) {
- try {
- /* Ensure field exists */
- {
- boost::system::error_code ec;
- request req;
- req.push("HEXISTS", key, "blockHosts");
- response<int> resp;
+ try {
+ {
+ boost::system::error_code ec;
+ response< std::optional<std::string> > resp;
+ request req;
+ req.push("HGET", key, "blockHosts");
- redis_exec(conn, ec, req, resp, y);
+ redis_exec(conn, ec, req, resp, y);
- if (!std::get<0>(resp).value()) {
- return -ENOENT;
- } else if (ec) {
- return -ec.value();
- }
+ if (ec) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
}
- {
- boost::system::error_code ec;
- request req;
- req.push("HGET", key, "blockHosts");
- response<std::string> resp;
-
- redis_exec(conn, ec, req, resp, y);
-
- if (std::get<0>(resp).value().empty()) {
- return -ENOENT;
- } else if (ec) {
- return -ec.value();
- }
-
- if (std::get<0>(resp).value().find("_") == std::string::npos) /* Last host, delete entirely */
- return del(dpp, block, y);
-
- std::string result = std::get<0>(resp).value();
- auto it = result.find(delValue);
- if (it != std::string::npos)
- result.erase(result.begin() + it, result.begin() + it + delValue.size());
- else
- return -ENOENT;
+ if (std::get<0>(resp).value().value().empty()) {
+ ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): No values returned." << dendl;
+ return -ENOENT;
+ }
- if (result[0] == '_')
- result.erase(0, 1);
+ std::string result = std::get<0>(resp).value().value();
+ auto it = result.find(value);
+ if (it != std::string::npos) {
+ result.erase(result.begin() + it, result.begin() + it + value.size());
+ } else {
+ return -ENOENT;
+ }
- delValue = result;
+ if (result[0] == '_') {
+ result.erase(0, 1);
+ } else if (result.length() && result[result.length() - 1] == '_') {
+ result.erase(result.length() - 1, 1);
}
- {
- boost::system::error_code ec;
- request req;
- req.push_range("HSET", key, std::map<std::string, std::string>{{"blockHosts", delValue}});
- response<int> resp;
+ if (result.length() == 0) /* Last host, delete entirely */
+ return del(dpp, block, y);
- redis_exec(conn, ec, req, resp, y);
+ value = result;
+ }
- if (ec) {
- return -ec.value();
- }
+ {
+ boost::system::error_code ec;
+ response<ignore_t> resp;
+ request req;
+ req.push("HSET", key, "blockHosts", value);
+
+ redis_exec(conn, ec, req, resp, y);
- return std::get<0>(resp).value();
+ if (ec) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
}
- } catch (std::exception &e) {
- return -EINVAL;
}
- } else {
- return -ENOENT;
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ return -EINVAL;
}
+
+ return 0;
}
} } // namespace rgw::d4n
#include "rgw_common.h"
-#include <boost/lexical_cast.hpp>
#include <boost/asio/detached.hpp>
#include <boost/redis/connection.hpp>
using boost::redis::connection;
using boost::redis::request;
using boost::redis::response;
+using boost::redis::ignore_t;
struct CacheObj {
std::string objName; /* S3 object name */
std::string bucketName; /* S3 bucket name */
std::string creationTime; /* Creation time of the S3 Object */
bool dirty{false};
- std::vector<std::string> hostsList; /* List of hostnames <ip:port> of object locations for multiple backends */
+ std::unordered_set<std::string> hostsList; /* List of hostnames <ip:port> of object locations for multiple backends */
};
struct CacheBlock {
bool dirty{false};
uint64_t size; /* Block size in bytes */
int globalWeight = 0; /* LFUDA policy variable */
- std::vector<std::string> hostsList; /* List of hostnames <ip:port> of block locations */
+ std::unordered_set<std::string> hostsList; /* List of hostnames <ip:port> of block locations */
};
class Directory {
public:
- CephContext* cct;
-
Directory() {}
};
public:
ObjectDirectory(std::shared_ptr<connection>& conn) : conn(conn) {}
- void init(CephContext* cct) {
- this->cct = cct;
- }
- int exist_key(CacheObj* object, optional_yield y);
+ int exist_key(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y);
- int set(CacheObj* object, optional_yield y);
- int get(CacheObj* object, optional_yield y);
- int copy(CacheObj* object, std::string copyName, std::string copyBucketName, optional_yield y);
- int del(CacheObj* object, optional_yield y);
- int update_field(CacheObj* object, std::string field, std::string value, optional_yield y);
+ int set(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y); /* If nx is true, set only if key doesn't exist */
+ int get(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y);
+ int copy(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& copyName, const std::string& copyBucketName, optional_yield y);
+ int del(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y);
+ int update_field(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& field, std::string& value, optional_yield y);
private:
std::shared_ptr<connection> conn;
public:
BlockDirectory(std::shared_ptr<connection>& conn) : conn(conn) {}
- void init(CephContext* cct) {
- this->cct = cct;
- }
int exist_key(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
int set(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
int get(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
- int copy(const DoutPrefixProvider* dpp, CacheBlock* block, std::string copyName, std::string copyBucketName, optional_yield y);
+ int copy(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& copyName, const std::string& copyBucketName, optional_yield y);
int del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
- int update_field(const DoutPrefixProvider* dpp, CacheBlock* block, std::string field, std::string& value, optional_yield y);
+ int update_field(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& field, std::string& value, optional_yield y);
int remove_host(const DoutPrefixProvider* dpp, CacheBlock* block, std::string& value, optional_yield y);
private:
}
template <typename... Types>
-void redis_exec(std::shared_ptr<connection> conn,
+static inline void redis_exec(std::shared_ptr<connection> conn,
boost::system::error_code& ec,
const boost::redis::request& req,
boost::redis::response<Types...>& resp, optional_yield y)
}
int LFUDAPolicy::init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver *_driver) {
- this->cct = cct;
- dir->init(cct);
+ response<int, int, int, int> resp;
+
driver = _driver;
tc = std::thread(&CachePolicy::cleaning, this, dpp);
tc.detach();
- int result = 0;
- response<int, int, int, int> resp;
try {
boost::system::error_code ec;
+ response<
+ ignore_t,
+ ignore_t,
+ ignore_t,
+ response<std::optional<int>, std::optional<int>>
+ > resp;
request req;
- req.push("HEXISTS", "lfuda", "age");
- req.push("HSET", "lfuda", "minLocalWeights_sum", std::to_string(weightSum)); /* New cache node will always have the minimum average weight */
- req.push("HSET", "lfuda", "minLocalWeights_size", std::to_string(entries_map.size()));
- req.push("HSET", "lfuda", "minLocalWeights_address", dir->cct->_conf->rgw_d4n_l1_datacache_address);
+ req.push("MULTI");
+ req.push("HSET", "lfuda", "minLocalWeights_sum", std::to_string(weightSum), /* New cache node will always have the minimum average weight */
+ "minLocalWeights_size", std::to_string(entries_map.size()),
+ "minLocalWeights_address", dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
+ req.push("HSETNX", "lfuda", "age", age); /* Only set maximum age if it doesn't exist */
+ req.push("EXEC");
redis_exec(conn, ec, req, resp, y);
if (ec) {
- ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "() ERROR: " << ec.what() << dendl;
return -ec.value();
}
-
- result = std::min(std::get<1>(resp).value(), std::min(std::get<2>(resp).value(), std::get<3>(resp).value()));
} catch (std::exception &e) {
- ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: " << e.what() << dendl;
+ ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "() ERROR: " << e.what() << dendl;
return -EINVAL;
}
- if (!std::get<0>(resp).value()) { /* Only set maximum age if it doesn't exist */
- try {
- boost::system::error_code ec;
- response<int> value;
- request req;
- req.push("HSET", "lfuda", "age", age);
-
- redis_exec(conn, ec, req, value, y);
-
- if (ec) {
- ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: " << ec.what() << dendl;
- return -ec.value();
- }
-
- result = std::min(result, std::get<0>(value).value());
- } catch (std::exception &e) {
- ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: " << e.what() << dendl;
- return -EINVAL;
- }
- }
-
asio::co_spawn(io_context.get_executor(),
redis_sync(dpp, y), asio::detached);
- return result;
+ return 0;
}
int LFUDAPolicy::age_sync(const DoutPrefixProvider* dpp, optional_yield y) {
- response<std::string> resp;
+ response< std::optional<std::string> > resp;
try {
boost::system::error_code ec;
redis_exec(conn, ec, req, resp, y);
if (ec) {
+ ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "() ERROR: " << ec.what() << dendl;
return -ec.value();
}
} catch (std::exception &e) {
return -EINVAL;
}
- if (age > std::stoi(std::get<0>(resp).value()) || std::get<0>(resp).value().empty()) { /* Set new maximum age */
+ if (std::get<0>(resp).value().value().empty() || age > std::stoi(std::get<0>(resp).value().value())) { /* Set new maximum age */
try {
boost::system::error_code ec;
+ response<ignore_t> ret;
request req;
- response<int> value;
req.push("HSET", "lfuda", "age", age);
- redis_exec(conn, ec, req, resp, y);
+
+ redis_exec(conn, ec, req, ret, y);
if (ec) {
+ ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "() ERROR: " << ec.what() << dendl;
return -ec.value();
}
-
- return std::get<0>(value).value();
} catch (std::exception &e) {
return -EINVAL;
}
} else {
- age = std::stoi(std::get<0>(resp).value());
- return 0;
+ age = std::stoi(std::get<0>(resp).value().value());
}
+
+ return 0;
}
int LFUDAPolicy::local_weight_sync(const DoutPrefixProvider* dpp, optional_yield y) {
- int result;
-
if (fabs(weightSum - postedSum) > (postedSum * 0.1)) {
- response<std::string, std::string> resp;
+ response<std::vector<std::string>> resp;
try {
boost::system::error_code ec;
request req;
- req.push("HGET", "lfuda", "minLocalWeights_sum");
- req.push("HGET", "lfuda", "minLocalWeights_size");
+ req.push("HMGET", "lfuda", "minLocalWeights_sum", "minLocalWeights_size");
redis_exec(conn, ec, req, resp, y);
if (ec) {
+ ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "() ERROR: " << ec.what() << dendl;
return -ec.value();
}
} catch (std::exception &e) {
return -EINVAL;
}
- float minAvgWeight = std::stof(std::get<0>(resp).value()) / std::stof(std::get<1>(resp).value());
+ float minAvgWeight = std::stof(std::get<0>(resp).value()[0]) / std::stof(std::get<0>(resp).value()[1]);
if ((static_cast<float>(weightSum) / static_cast<float>(entries_map.size())) < minAvgWeight) { /* Set new minimum weight */
try {
boost::system::error_code ec;
+ response<ignore_t> resp;
request req;
- response<int, int, int> value;
- req.push("HSET", "lfuda", "minLocalWeights_sum", std::to_string(weightSum));
- req.push("HSET", "lfuda", "minLocalWeights_size", std::to_string(entries_map.size()));
- req.push("HSET", "lfuda", "minLocalWeights_address", dir->cct->_conf->rgw_d4n_l1_datacache_address);
+ req.push("HSET", "lfuda", "minLocalWeights_sum", std::to_string(weightSum),
+ "minLocalWeights_size", std::to_string(entries_map.size()),
+ "minLocalWeights_address", dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
+
redis_exec(conn, ec, req, resp, y);
if (ec) {
+ ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "() ERROR: " << ec.what() << dendl;
return -ec.value();
}
-
- result = std::min(std::get<0>(value).value(), std::get<1>(value).value());
- result = std::min(result, std::get<2>(value).value());
} catch (std::exception &e) {
return -EINVAL;
}
} else {
- weightSum = std::stoi(std::get<0>(resp).value());
- postedSum = std::stoi(std::get<0>(resp).value());
+ weightSum = std::stoi(std::get<0>(resp).value()[0]);
+ postedSum = std::stoi(std::get<0>(resp).value()[0]);
}
}
try { /* Post update for local cache */
boost::system::error_code ec;
+ response<ignore_t> resp;
request req;
- response<int, int> resp;
- req.push("HSET", dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, "avgLocalWeight_sum", std::to_string(weightSum));
- req.push("HSET", dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, "avgLocalWeight_size", std::to_string(entries_map.size()));
+ req.push("HSET", dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, "avgLocalWeight_sum", std::to_string(weightSum),
+ "avgLocalWeight_size", std::to_string(entries_map.size()));
+
redis_exec(conn, ec, req, resp, y);
if (ec) {
+ ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "() ERROR: " << ec.what() << dendl;
return -ec.value();
}
- result = std::min(std::get<0>(resp).value(), std::get<1>(resp).value());
+ return 0;
} catch (std::exception &e) {
return -EINVAL;
}
-
- return result;
}
asio::awaitable<void> LFUDAPolicy::redis_sync(const DoutPrefixProvider* dpp, optional_yield y) {
for (;;) try {
/* Update age */
if (int ret = age_sync(dpp, y) < 0) {
- ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: ret=" << ret << dendl;
+ ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "() ERROR: " << ret << dendl;
}
/* Update minimum local weight sum */
if (int ret = local_weight_sync(dpp, y) < 0) {
- ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: ret=" << ret << dendl;
+ ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "() ERROR: " << ret << dendl;
}
int interval = dpp->get_cct()->_conf->rgw_lfuda_sync_frequency;
rthread_timer->expires_after(std::chrono::seconds(interval));
co_await rthread_timer->async_wait(asio::use_awaitable);
} catch (sys::system_error& e) {
- ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "() ERROR: " << e.what() << dendl;
-
if (e.code() == asio::error::operation_aborted) {
break;
} else {
+ ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "() ERROR: " << e.what() << dendl;
continue;
}
}
victim->blockID = entries_heap.top()->offset;
victim->size = entries_heap.top()->len;
- if (dir->get(dpp, victim, y) < 0) {
+ if (blockDir->get(dpp, victim, y) < 0) {
return nullptr;
}
CacheBlock* victim = get_victim_block(dpp, y);
if (victim == nullptr) {
- ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Could not retrieve victim block." << dendl;
+ ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): Could not retrieve victim block." << dendl;
delete victim;
return -ENOENT;
}
}
// check dirty flag of entry to be evicted, if the flag is dirty, all entries on the local node are dirty
if (it->second->dirty) {
- ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Top entry in min heap is dirty, no entry is available for eviction!" << dendl;
+ ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): Top entry in min heap is dirty, no entry is available for eviction!" << dendl;
return -ENOENT;
}
int avgWeight = weightSum / entries_map.size();
- if (victim->hostsList.size() == 1 && victim->hostsList[0] == dir->cct->_conf->rgw_d4n_l1_datacache_address) { /* Last copy */
+ if (victim->hostsList.size() == 1 && *(victim->hostsList.begin()) == dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address) { /* Last copy */
if (victim->globalWeight) {
it->second->localWeight += victim->globalWeight;
(*it->second->handle)->localWeight = it->second->localWeight;
}
victim->globalWeight = 0;
- auto globalWeight = std::to_string(victim->globalWeight);
- if (int ret = dir->update_field(dpp, victim, "globalWeight", globalWeight, y) < 0) {
- delete victim;
- return ret;
- }
}
if (it->second->localWeight > avgWeight) {
}
victim->globalWeight += it->second->localWeight;
- auto globalWeight = std::to_string(victim->globalWeight);
- if (int ret = dir->update_field(dpp, victim, "globalWeight", globalWeight, y) < 0) {
+ if (int ret = blockDir->update_field(dpp, victim, "globalWeight", std::to_string(victim->globalWeight), y) < 0) {
delete victim;
return ret;
}
- if (int ret = dir->remove_host(dpp, victim, dir->cct->_conf->rgw_d4n_l1_datacache_address, y) < 0) {
+ if (int ret = blockDir->remove_host(dpp, victim, dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, y) < 0) {
delete victim;
return ret;
}
return 0;
}
-void LFUDAPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, optional_yield y)
+void LFUDAPolicy::update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y)
{
using handle_type = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>::handle_type;
const std::lock_guard l(lfuda_lock);
}
if (updateLocalWeight) {
- if (cacheDriver->set_attr(dpp, oid_in_cache, "user.rgw.localWeight", std::to_string(localWeight), y) < 0)
- ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): CacheDriver set_attr method failed." << dendl;
+ int ret = -1;
+ if ((ret = cacheDriver->set_attr(dpp, oid_in_cache, "user.rgw.localWeight", std::to_string(localWeight), y)) < 0)
+ ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): CacheDriver set_attr method failed, ret=" << ret << dendl;
}
weightSum += ((localWeight < 0) ? 0 : localWeight);
entries_heap.erase(p->second->handle);
entries_map.erase(p);
+ delete p->second;
+ p->second = nullptr;
return true;
}
object_heap.erase(p->second->handle);
o_entries_map.erase(p);
delete p->second;
+ p->second = nullptr;
return true;
}
void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
{
- const int interval = cct->_conf->rgw_d4n_cache_cleaning_interval;
+ const int interval = dpp->get_cct()->_conf->rgw_d4n_cache_cleaning_interval;
while(!quit) {
ldpp_dout(dpp, 20) << __func__ << " : " << " Cache cleaning!" << dendl;
std::string name = "";
int op_ret = processor->prepare(null_yield);
if (op_ret < 0) {
- ldpp_dout(dpp, 20) << __func__ << "processor->prepare() returned ret=" << op_ret << dendl;
- break;
+ ldpp_dout(dpp, 20) << __func__ << "processor->prepare() returned ret=" << op_ret << dendl;
+ break;
}
std::string prefix = b_name + "_" + e->version + "_" + c_obj->get_name();
do {
ceph::bufferlist data;
if (fst >= lst){
- break;
+ break;
}
- off_t cur_size = std::min<off_t>(fst + cct->_conf->rgw_max_chunk_size, lst);
+ off_t cur_size = std::min<off_t>(fst + dpp->get_cct()->_conf->rgw_max_chunk_size, lst);
off_t cur_len = cur_size - fst;
std::string oid_in_cache = "D_" + prefix + "_" + std::to_string(fst) + "_" + std::to_string(cur_len);
ldpp_dout(dpp, 10) << __func__ << "(): oid_in_cache=" << oid_in_cache << dendl;
op_ret = filter->process(std::move(data), ofs);
if (op_ret < 0) {
- ldpp_dout(dpp, 20) << __func__ << "processor->process() returned ret="
- << op_ret << dendl;
- return;
+ ldpp_dout(dpp, 20) << __func__ << "processor->process() returned ret="
+ << op_ret << dendl;
+ return;
}
ofs += len;
fst = 0;
do {
if (fst >= lst) {
- break;
+ break;
}
- off_t cur_size = std::min<off_t>(fst + cct->_conf->rgw_max_chunk_size, lst);
+ off_t cur_size = std::min<off_t>(fst + dpp->get_cct()->_conf->rgw_max_chunk_size, lst);
off_t cur_len = cur_size - fst;
std::string oid_in_cache = "D_" + prefix + "_" + std::to_string(fst) + "_" + std::to_string(cur_len);
block.size = cur_len;
block.blockID = fst;
std::string dirty = "false";
- op_ret = dir->update_field(dpp, &block, "dirty", dirty, null_yield);
+ op_ret = blockDir->update_field(dpp, &block, "dirty", dirty, null_yield);
if (op_ret < 0) {
- ldpp_dout(dpp, 5) << __func__ << "updating dirty flag in Block directory failed!" << dendl;
- return;
+ ldpp_dout(dpp, 0) << __func__ << "updating dirty flag in block directory failed, ret=" << op_ret << dendl;
+ return;
}
fst += cur_len;
} while(fst < lst);
block.size = 0;
block.blockID = 0;
if (c_obj->have_instance()) {
- dir->get(dpp, &block, null_yield);
+ blockDir->get(dpp, &block, null_yield);
if (block.version == c_obj->get_instance()) { //versioned case - update head block entry that has latest version
std::string dirty = "false";
- op_ret = dir->update_field(dpp, &block, "dirty", dirty, null_yield);
+ op_ret = blockDir->update_field(dpp, &block, "dirty", dirty, null_yield);
if (op_ret < 0) {
ldpp_dout(dpp, 20) << __func__ << "updating dirty flag in block directory for head failed!" << dendl;
//return;
}
} else { //non-versioned case
std::string dirty = "false";
- op_ret = dir->update_field(dpp, &block, "dirty", dirty, null_yield);
+ op_ret = blockDir->update_field(dpp, &block, "dirty", dirty, null_yield);
if (op_ret < 0) {
ldpp_dout(dpp, 20) << __func__ << "updating dirty flag in block directory for head failed!" << dendl;
//return;
instance_block.cacheObj.objName = c_obj->get_oid();
instance_block.size = 0;
instance_block.blockID = 0;
- op_ret = dir->update_field(dpp, &instance_block, "dirty", "false", null_yield);
+ op_ret = blockDir->update_field(dpp, &instance_block, "dirty", "false", null_yield);
if (op_ret < 0) {
ldpp_dout(dpp, 20) << __func__ << "updating dirty flag in block directory for instance block failed!" << dendl;
//return;
}
}
+
+ op_ret = blockDir->update_field(dpp, &block, "dirtyBlock", "false", null_yield);
+ if (op_ret < 0) {
+ ldpp_dout(dpp, 0) << __func__ << "updating dirty flag in block directory for head failed, ret=" << op_ret << dendl;
+ return;
+ }
+
//remove entry from map and queue, eraseObj locks correctly
+ rgw::d4n::CacheObj obj;
+ obj.bucketName = c_obj->get_bucket()->get_name();
+ obj.objName = c_obj->get_key().get_oid();
+ op_ret = objDir->update_field(dpp, &obj, "dirty", "false", null_yield);
+ if (op_ret < 0) {
+ ldpp_dout(dpp, 0) << __func__ << "updating dirty flag in object directory failed, ret=" << op_ret << dendl;
+ return;
+ }
+
+ op_ret = blockDir->update_field(dpp, &block, "dirtyObj", "false", null_yield);
+ if (op_ret < 0) {
+ ldpp_dout(dpp, 0) << __func__ << "updating dirty flag in block directory failed, ret=" << op_ret << dendl;
+ return;
+ }
+
eraseObj(dpp, e->key, null_yield);
} else { //end-if std::difftime(time(NULL), e->creationTime) > interval
std::this_thread::sleep_for(std::chrono::milliseconds(interval)); //TODO:: should this time be optimised?
} //end-while true
}
-
int LRUPolicy::exist_key(std::string key)
{
const std::lock_guard l(lru_lock);
entries_lru_list.pop_front_and_dispose(Entry_delete_disposer());
auto ret = cacheDriver->delete_data(dpp, p.key, y);
if (ret < 0) {
- ldpp_dout(dpp, 10) << __func__ << "(): Failed to delete data from the cache backend: " << ret << dendl;
+ ldpp_dout(dpp, 0) << __func__ << "(): Failed to delete data from the cache backend, ret=" << ret << dendl;
return ret;
}
return 0;
}
-void LRUPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, optional_yield y)
+void LRUPolicy::update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y)
{
const std::lock_guard l(lru_lock);
_erase(dpp, key, y);
uint64_t len;
std::string version;
bool dirty;
- Entry(std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty) : key(key), offset(offset),
- len(len), version(version), dirty(dirty) {}
- };
-
+ Entry(const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty) : key(key), offset(offset),
+ len(len), version(version), dirty(dirty) {}
+ };
//The disposer object function
struct Entry_delete_disposer {
std::string bucket_name;
rgw_obj_key obj_key;
ObjEntry() = default;
- ObjEntry(std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, rgw_user user, std::string& etag, const std::string& bucket_name, const rgw_obj_key& obj_key) : key(key), version(version), dirty(dirty), size(size), creationTime(creationTime), user(user), etag(etag), bucket_name(bucket_name), obj_key(obj_key) {}
+ ObjEntry(std::string& key, std::string version, bool dirty, uint64_t size,
+ time_t creationTime, rgw_user user, std::string& etag,
+ const std::string& bucket_name, const rgw_obj_key& obj_key) : key(key), version(version), dirty(dirty), size(size),
+ creationTime(creationTime), user(user), etag(etag),
+ bucket_name(bucket_name), obj_key(obj_key) {}
};
public:
virtual int init(CephContext* cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver* _driver) = 0;
virtual int exist_key(std::string key) = 0;
virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) = 0;
- virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, optional_yield y) = 0;
- virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, const std::string& bucket_name, const rgw_obj_key& obj_key, optional_yield y) = 0;
+ virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y) = 0;
+ virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size,
+ time_t creationTime, const rgw_user user, std::string& etag, const std::string& bucket_name,
+ const rgw_obj_key& obj_key, optional_yield y) = 0;
virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0;
virtual bool eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0;
virtual void cleaning(const DoutPrefixProvider* dpp) = 0;
using handle_type = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>::handle_type;
handle_type handle;
- LFUDAEntry(std::string& key, uint64_t offset, uint64_t len, std::string& version, bool dirty, int localWeight) : Entry(key, offset, len, version, dirty), localWeight(localWeight) {}
+ LFUDAEntry(const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, int localWeight) : Entry(key, offset, len, version, dirty),
+ localWeight(localWeight) {}
void set_handle(handle_type handle_) { handle = handle_; }
};
using handle_type = boost::heap::fibonacci_heap<LFUDAObjEntry*, boost::heap::compare<ObjectComparator<LFUDAObjEntry>>>::handle_type;
handle_type handle;
- LFUDAObjEntry(std::string& key, std::string& version, bool dirty, uint64_t size, time_t creationTime, rgw_user user, std::string& etag, const std::string& bucket_name, const rgw_obj_key& obj_key) : ObjEntry(key, version, dirty, size, creationTime, user, etag, bucket_name, obj_key) {}
+ LFUDAObjEntry(std::string& key, std::string& version, bool dirty, uint64_t size,
+ time_t creationTime, rgw_user user, std::string& etag,
+ const std::string& bucket_name, const rgw_obj_key& obj_key) : ObjEntry(key, version, dirty, size,
+ creationTime, user, etag, bucket_name,
+ obj_key) {}
void set_handle(handle_type handle_) { handle = handle_; }
};
int age = 1, weightSum = 0, postedSum = 0;
optional_yield y = null_yield;
std::shared_ptr<connection> conn;
- BlockDirectory* dir;
+ BlockDirectory* blockDir;
+ ObjectDirectory* objDir;
rgw::cache::CacheDriver* cacheDriver;
std::optional<asio::steady_timer> rthread_timer;
rgw::sal::Driver* driver;
std::thread tc;
- CephContext* cct;
CacheBlock* get_victim_block(const DoutPrefixProvider* dpp, optional_yield y);
int age_sync(const DoutPrefixProvider* dpp, optional_yield y);
rthread_timer->cancel();
}
}
- LFUDAEntry* find_entry(std::string key) {
+ LFUDAEntry* find_entry(const std::string& key) {
auto it = entries_map.find(key);
if (it == entries_map.end())
return nullptr;
conn(conn),
cacheDriver(cacheDriver)
{
- dir = new BlockDirectory{conn};
+ blockDir = new BlockDirectory{conn};
+ objDir = new ObjectDirectory{conn};
}
~LFUDAPolicy() {
rthread_stop();
- delete dir;
+ delete blockDir;
+ delete objDir;
std::lock_guard l(lfuda_cleaning_lock);
quit = true;
cond.notify_all();
virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver *_driver);
virtual int exist_key(std::string key) override;
virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override;
- virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, optional_yield y) override;
+ virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y) override;
virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
void save_y(optional_yield y) { this->y = y; }
- virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, const std::string& bucket_name, const rgw_obj_key& obj_key, optional_yield y) override;
+ virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size,
+ time_t creationTime, const rgw_user user, std::string& etag, const std::string& bucket_name,
+ const rgw_obj_key& obj_key, optional_yield y) override;
virtual bool eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y);
virtual void cleaning(const DoutPrefixProvider* dpp) override;
LFUDAObjEntry* find_obj_entry(const std::string& key) {
virtual int init(CephContext* cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver* _driver) { return 0; }
virtual int exist_key(std::string key) override;
virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override;
- virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, optional_yield y) override;
- virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, const std::string& bucket_name, const rgw_obj_key& obj_key, optional_yield y) override;
+ virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y) override;
+ virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size,
+ time_t creationTime, const rgw_user user, std::string& etag, const std::string& bucket_name,
+ const rgw_obj_key& obj_key, optional_yield y) override;
virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
virtual bool eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
virtual void cleaning(const DoutPrefixProvider* dpp) override {}
CachePolicy* cachePolicy;
public:
- PolicyDriver(std::shared_ptr<connection>& conn, rgw::cache::CacheDriver* cacheDriver, std::string _policyName) : policyName(_policyName)
+ PolicyDriver(std::shared_ptr<connection>& conn, rgw::cache::CacheDriver* cacheDriver, const std::string& _policyName) : policyName(_policyName)
{
if (policyName == "lfuda") {
cachePolicy = new LFUDAPolicy(conn, cacheDriver);
cfg.clientname = "D4N.Filter";
if (!cfg.addr.host.length() || !cfg.addr.port.length()) {
- ldpp_dout(dpp, 10) << "D4NFilterDriver::" << __func__ << "(): Endpoint was not configured correctly." << dendl;
+ ldpp_dout(dpp, 0) << "D4NFilterDriver::" << __func__ << "(): Endpoint was not configured correctly." << dendl;
return -EDESTADDRREQ;
}
FilterDriver::initialize(cct, dpp);
cacheDriver->initialize(dpp);
- objDir->init(cct);
- blockDir->init(cct);
policyDriver->get_cache_policy()->init(cct, dpp, io_context, next);
return 0;
}
if (auto ret = driver->get_cache_driver()->delete_attrs(dpp, head_oid_in_cache, *delattrs, y); ret < 0) {
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver delete_attrs method failed with ret: " << ret << dendl;
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): CacheDriver delete_attrs method failed with ret: " << ret << dendl;
return ret;
}
} //if delattrs != nullptr
} else {
auto ret = next->set_obj_attrs(dpp, setattrs, delattrs, y, flags);
if (ret < 0) {
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): set_obj_attrs method of backend store failed with ret: " << ret << dendl;
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): set_obj_attrs method of backend store failed with ret: " << ret << dendl;
return ret;
}
}
/* Set attributes locally */
auto ret = this->set_attrs(attrs);
if (ret < 0) {
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): D4NFilterObject set_attrs method failed." << dendl;
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): D4NFilterObject set_attrs method failed." << dendl;
}
} // if found_in_cache = true
version = bl.c_str();
ldpp_dout(dpp, 20) << __func__ << " id tag version is: " << version << dendl;
} else {
- ldpp_dout(dpp, 20) << __func__ << " Failed to find id tag" << dendl;
+ ldpp_dout(dpp, 0) << __func__ << " Failed to find id tag" << dendl;
return -ENOENT;
}
}
.objName = this->get_name(),
.bucketName = this->get_bucket()->get_name(),
.dirty = dirty,
- .hostsList = { driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address },
+ .hostsList = { dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address },
};
rgw::d4n::CacheBlock block = rgw::d4n::CacheBlock{
.version = this->get_object_version(),
.dirty = dirty,
.size = 0,
- .hostsList = { driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address },
+ .hostsList = { dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address },
};
ret = blockDir->set(dpp, &block, y);
if (ret < 0) {
.objName = this->get_oid(),
.bucketName = this->get_bucket()->get_name(),
.dirty = dirty,
- .hostsList = { driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address },
+ .hostsList = { dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address },
};
rgw::d4n::CacheBlock version_block = rgw::d4n::CacheBlock{
.version = this->get_object_version(),
.dirty = dirty,
.size = 0,
- .hostsList = { driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address },
- };
+ .hostsList = { dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address },
+ };
+
auto ret = blockDir->set(dpp, &version_block, y);
if (ret < 0) {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
};
bool found_in_cache = true;
+ int ret = -1;
//if the block corresponding to head object does not exist in directory, implies it is not cached
- if (blockDir->exist_key(dpp, &block, y) && (blockDir->get(dpp, &block, y) == 0)) {
+ if ((ret = blockDir->get(dpp, &block, y) == 0)) {
std::string version;
version = block.version;
this->set_object_version(version);
found_in_cache = false;
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver get_attrs method failed." << dendl;
}
- } else { //if blockDir->exist_key
+ } else if (ret == -ENOENT) { //if blockDir->get
+ found_in_cache = false;
+ } else {
found_in_cache = false;
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): BlockDirectory get method failed, ret=" << ret << dendl;
}
+
return found_in_cache;
}
if (!target_obj) {
ret = -ENOENT;
}
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Failed to fetching attrs from backend store with ret: " << ret << dendl;
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to fetching attrs from backend store with ret: " << ret << dendl;
return ret;
}
ret = calculate_version(dpp, y, version);
if (ret < 0 || version.empty()) {
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): version could not be calculated." << dendl;
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): version could not be calculated." << dendl;
}
head_oid_in_cache = this->get_bucket()->get_name() + "_" + version + "_" + this->get_name();
bufferlist bl;
ret = this->driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl, 0, attrs, y);
} else {
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Failed to evict data with err: " << ret << dendl;
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to evict data, ret=" << ret << dendl;
}
}
if (ret == 0) {
this->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, 0, version, false, y);
ret = set_head_obj_dir_entry(dpp, y, is_latest_version);
if (ret < 0) {
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object, ret=" << ret << dendl;
}
} else {
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): failed to cache head object in cache backend with error: " << ret << dendl;
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): failed to cache head object in cache backend, ret=" << ret << dendl;
}
}
next->params = params;
auto ret = next->prepare(y, dpp);
if (ret < 0) {
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): next->prepare method failed with error: " << ret << dendl;
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): next->prepare method failed, ret=" << ret << dendl;
return ret;
}
if (params.part_num) {
source->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, bl.length(), version, false, y);
ret = source->set_head_obj_dir_entry(dpp, y, is_latest_version);
if (ret < 0) {
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object, ret=" << ret << dendl;
}
//write object to directory.
rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
.bucketName = source->get_bucket()->get_name(),
.creationTime = std::to_string(ceph::real_clock::to_double(source->get_mtime())),
.dirty = false,
- .hostsList = { source->driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address }
+ .hostsList = { dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address }
};
- ret = source->driver->get_obj_dir()->set(&object, y);
+ ret = source->driver->get_obj_dir()->set(dpp, &object, y);
if (ret < 0) {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): ObjectDirectory set method failed with err: " << ret << dendl;
return ret;
}
} else {
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): put for head object failed with error: " << ret << dendl;
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): put for head object failed, ret=" << ret << dendl;
}
} else {
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): failed to cache head object, eviction returned error: " << ret << dendl;
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): failed to cache head object during eviction, ret=" << ret << dendl;
}
}
" read_ofs: " << read_ofs << " part len: " << part_len << dendl;
int ret = -1;
- if (source->driver->get_block_dir()->exist_key(dpp, &block, y) > 0 && (ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0) {
- auto it = find(block.hostsList.begin(), block.hostsList.end(), source->driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address);
+ if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0) {
+ auto it = find(block.hostsList.begin(), block.hostsList.end(), dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
+
if (it != block.hostsList.end()) { /* Local copy */
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in directory. " << oid_in_cache << dendl;
std::string key = oid_in_cache;
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: block is dirty = " << block.dirty << dendl;
+
if (block.dirty == true) {
- key = "D_" + oid_in_cache; //we keep track of dirty data in the cache for the metadata failure case
+ key = "D_" + oid_in_cache; // we keep track of dirty data in the cache for the metadata failure case
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: key=" << key << " data is Dirty." << dendl;
}
+
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: block dirty =" << block.dirty << dendl;
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: key=" << key << dendl;
+
if (block.version == version) {
if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) {
// Read From Cache
if (r < 0) {
drain(dpp, y);
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, ret=" << r << dendl;
return r;
}
// if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0)
} else {
+ int r = -1;
+ if ((r = source->driver->get_block_dir()->remove_host(dpp, &block, dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, y)) < 0)
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to remove incorrect host from block with oid=" << oid_in_cache <<", ret=" << r << dendl;
+
+ if ((block.hostsList.size() - 1) > 0 && r == 0) { /* Remote copy */
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl;
+ // TODO: Retrieve remotely
+ // Policy decision: should we cache remote blocks locally?
+ } else {
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+
+ auto r = drain(dpp, y);
+
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
+ return r;
+ }
+
+ break;
+ }
+ }
+ // if (block.version == version)
+ } else {
+ // TODO: If data has already been returned for any older versioned block, then return ‘retry’ error, else
+
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+
+ auto r = drain(dpp, y);
+
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
+ return r;
+ }
+ }
+ // if (it != block.hostsList.end())
+ } else if (block.hostsList.size()) { /* Remote copy */
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in remote cache. " << oid_in_cache << dendl;
+ // TODO: Retrieve remotely
+ // Policy decision: should we cache remote blocks locally?
+ }
+ // if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0)
+ } else if (ret == -ENOENT) {
+ block.blockID = adjusted_start_ofs;
+ block.size = obj_max_req_size;
+
+ if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0) {
+ auto it = find(block.hostsList.begin(), block.hostsList.end(), dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
+
+ if (it != block.hostsList.end()) { /* Local copy */
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in local cache." << dendl;
+
+ if (block.version == version) {
oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(obj_max_req_size);
+ std::string key = oid_in_cache;
+
//for ranged requests, for last part, the whole part might exist in the cache
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache <<
- " length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache <<
+ " length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
if ((part_len != obj_max_req_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) {
- if (block.dirty == true){
- key = "D_" + oid_in_cache; //we keep track of dirty data in the cache for the metadata failure case
- }
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: block dirty =" << block.dirty << dendl;
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: key=" << key << dendl;
// Read From Cache
+ if (block.dirty == true){
+ key = "D_" + oid_in_cache; //we keep track of dirty data in the cache for the metadata failure case
+ }
+
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: block dirty =" << block.dirty << dendl;
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: key=" << key << dendl;
+
auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), key, read_ofs, len_to_read, cost, id);
this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, obj_max_req_size)));
auto r = flush(dpp, std::move(completed), y);
if (r < 0) {
- drain(dpp, y);
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
- return r;
+ drain(dpp, y);
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, ret=" << r << dendl;
+ return r;
+ }
+ // if ((part_len != obj_max_req_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0)
+ } else {
+ int r = -1;
+ if ((r = source->driver->get_block_dir()->remove_host(dpp, &block, dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, y)) < 0)
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to remove incorrect host from block with oid=" << oid_in_cache << ", ret=" << r << dendl;
+
+ if ((block.hostsList.size() - 1) > 0 && r == 0) { /* Remote copy */
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl;
+ // TODO: Retrieve remotely
+ // Policy decision: should we cache remote blocks locally?
+ } else {
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+
+ auto r = drain(dpp, y);
+
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
+ return r;
+ }
+
+ break;
}
}
+ // if (it != block.hostsList.end())
+ } else if (block.hostsList.size()) { /* Remote copy */
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl;
+ // TODO: Retrieve remotely
+ // Policy decision: should we cache remote blocks locally?
}
// if (block.version == version)
} else {
auto r = drain(dpp, y);
if (r < 0) {
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, r= " << r << dendl;
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
return r;
}
}
- } else if (block.hostsList.size()) { /* Remote copy */
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in remote cache. " << oid_in_cache << dendl;
- // TODO: Retrieve remotely
+ // if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0)
+ } else if (ret == -ENOENT) { /* Fetch from backend */
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+
+ auto r = drain(dpp, y);
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
+ return r;
+ }
+
+ break;
}
- // if (source->driver->get_block_dir()->exist_key(&block, y) > 0 && int ret = source->driver->get_block_dir()->get(&block, y) == 0)
+ // else if (ret == -ENOENT)
} else { /* Fetch from backend */
if (ret < 0)
- ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << block.cacheObj.objName << " blockID: " << block.blockID << " block size: " << block.size << dendl;
+ ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << block.cacheObj.objName << " blockID: " << block.blockID << " block size: " << block.size << ", ret=" << ret << dendl;
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
auto r = drain(dpp, y);
-
if (r < 0) {
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, r= " << r << dendl;
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
return r;
}
auto r = next->iterate(dpp, ofs, end, this->cb.get(), y);
if (r < 0) {
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to fetch object from backend store, r= " << r << dendl;
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to fetch object from backend store, ret=" << r << dendl;
return r;
}
rgw_obj obj = source->get_obj();
auto ret = source->get_obj_attrs(y, dpp, &obj);
if (ret < 0) {
- ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Error: failed to fetch attrs, ret= " << ret << dendl;
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Error: failed to fetch attrs, ret=" << ret << dendl;
return ret;
}
//get_obj_attrs() calls set_attrs() internally, hence get_attrs() can be invoked to get the latest attrs.
rgw::d4n::CacheBlock block, existing_block;
rgw::d4n::BlockDirectory* blockDir = source->driver->get_block_dir();
- block.hostsList.push_back(blockDir->cct->_conf->rgw_d4n_l1_datacache_address);
block.cacheObj.objName = source->get_key().get_oid();
block.cacheObj.bucketName = source->get_bucket()->get_name();
std::stringstream s;
filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, dirty, *y);
/* Store block in directory */
- if (!blockDir->exist_key(dpp, &block, *y)) {
- if (blockDir->set(dpp, &block, *y) < 0) //should we revert previous steps if this step fails?
- ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
- } else {
- existing_block.blockID = block.blockID;
- existing_block.size = block.size;
- existing_block.dirty = block.dirty;
- if (blockDir->get(dpp, &existing_block, *y) < 0) {
- ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
- } else {
- if (existing_block.version != block.version) {
- if (blockDir->del(dpp, &existing_block, *y) < 0) //delete existing block
- ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory del method failed." << dendl;
- if (blockDir->set(dpp, &block, *y) < 0) //new versioned block will have new version, hostsList etc, how about globalWeight?
- ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
- } else {
- if (blockDir->update_field(dpp, &block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, *y) < 0)
- ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed for hostsList." << dendl;
- }
+ existing_block.blockID = block.blockID;
+ existing_block.size = block.size;
+
+ if ((ret = blockDir->get(dpp, &existing_block, *y)) == 0 || ret == -ENOENT) {
+ if (ret == 0) { //new versioned block will have new version, hostsList etc, how about globalWeight?
+ block = existing_block;
+ block.version = version;
}
+
+ block.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
+
+ if ((ret = blockDir->set(dpp, &block, *y)) < 0)
+ ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set() method failed, ret=" << ret << dendl;
+ } else {
+ ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << ", ret=" << ret << dendl;
}
//write object to directory.
rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
.objName = source->get_oid(),
.bucketName = source->get_bucket()->get_name(),
.dirty = dirty,
- .hostsList = { source->driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address }
+ .hostsList = { dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address }
};
- ret = source->driver->get_obj_dir()->set(&object, *y);
+ ret = source->driver->get_obj_dir()->set(dpp, &object, *y);
if (ret < 0) {
ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::::" << __func__ << "(): ObjectDirectory set method failed with err: " << ret << dendl;
return ret;
}
} else {
- ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed with error: " << ret << dendl;
+ ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed, ret=" << ret << dendl;
}
}
}
filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, dirty, *y);
/* Store block in directory */
- if (!blockDir->exist_key(dpp, &block, *y)) {
- if (blockDir->set(dpp, &block, *y) < 0)
- ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
- } else {
- existing_block.blockID = block.blockID;
- existing_block.size = block.size;
- existing_block.dirty = block.dirty;
- if (blockDir->get(dpp, &existing_block, *y) < 0) {
- ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
- }
- if (existing_block.version != block.version) {
- if (blockDir->del(dpp, &existing_block, *y) < 0)
- ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory del method failed." << dendl;
- if (blockDir->set(dpp, &block, *y) < 0)
- ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
- } else {
- if (blockDir->update_field(dpp, &block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, *y) < 0)
- ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed for blockHosts." << dendl;
+ existing_block.blockID = block.blockID;
+ existing_block.size = block.size;
+
+ if ((ret = blockDir->get(dpp, &existing_block, *y)) == 0 || ret == -ENOENT) {
+ if (ret == 0) { //new versioned block will have new version, hostsList etc, how about globalWeight?
+ block = existing_block;
+ block.version = version;
}
+
+ block.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
+
+ if ((ret = blockDir->set(dpp, &block, *y)) < 0)
+ ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set() method failed, ret=" << ret << dendl;
+ } else {
+ ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << ", ret=" << ret << dendl;
}
} else {
- ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed with error: " << ret << dendl;
+ ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed, ret=" << ret << dendl;
}
}
}
filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), version, dirty, *y);
/* Store block in directory */
- if (!blockDir->exist_key(dpp, &block, *y)) {
- if (blockDir->set(dpp, &block, *y) < 0)
- ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
- } else {
- existing_block.blockID = block.blockID;
- existing_block.size = block.size;
- existing_block.dirty = block.dirty;
- if (blockDir->get(dpp, &existing_block, *y) < 0) {
- ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
- } else {
- if (existing_block.version != block.version) {
- if (blockDir->del(dpp, &existing_block, *y) < 0)
- ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory del method failed." << dendl;
- if (blockDir->set(dpp, &block, *y) < 0)
- ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
- } else {
- if (blockDir->update_field(dpp, &block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, *y) < 0)
- ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl;
- }
+ existing_block.blockID = block.blockID;
+ existing_block.size = block.size;
+
+ if ((ret = blockDir->get(dpp, &existing_block, *y)) == 0 || ret == -ENOENT) {
+ if (ret == 0) { //new versioned block will have new version, hostsList etc, how about globalWeight?
+ block = existing_block;
+ block.version = version;
}
- }
+
+ block.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
+
+ if ((ret = blockDir->set(dpp, &block, *y)) < 0)
+ ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set() method failed, ret=" << ret << dendl;
+ } else {
+ ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << ", ret=" << ret << dendl;
+ }
} else {
- ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed with error: " << ret << dendl;
+ ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed, ret=" << ret << dendl;
}
} else {
- ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " An error occured during eviction: " << " error: " << ret << dendl;
+ ldpp_dout(dpp, 0) << "D4N Filter: " << __func__ << " An error occured during eviction, ret=" << ret << dendl;
}
}
.bucketName = source->get_bucket()->get_name()
};
- if (source->driver->get_obj_dir()->del(&obj, y) < 0)
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): ObjectDirectory del method failed." << dendl;
+ int ret = -1;
+ if ((ret = source->driver->get_obj_dir()->del(dpp, &obj, y)) < 0)
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): ObjectDirectory del() method failed, ret=" << ret << dendl;
Attrs::iterator attrs;
Attrs currentattrs = source->get_attrs();
currentFields.push_back(attrs->first);
}
- if (source->driver->get_cache_driver()->del(dpp, source->get_key().get_oid(), y) < 0)
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver del method failed." << dendl;
+ if ((ret = source->driver->get_cache_driver()->del(dpp, source->get_key().get_oid(), y)) < 0)
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): CacheDriver del() method failed, ret=" << ret << dendl;
return next->delete_obj(dpp, y, flags);
}
int D4NFilterWriter::prepare(optional_yield y)
{
startTime = time(NULL);
- if (driver->get_cache_driver()->delete_data(dpp, obj->get_key().get_oid(), y) < 0)
- ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): CacheDriver delete_data method failed." << dendl;
+
+ int ret = -1;
+ if ((ret = driver->get_cache_driver()->delete_data(dpp, obj->get_key().get_oid(), y)) < 0)
+ ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): CacheDriver delete_data() method failed, ret=" << ret << dendl;
+
d4n_writecache = g_conf()->d4n_writecache_enabled;
if (!d4n_writecache){
ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): calling next iterate" << dendl;
return next->prepare(y);
}
+
if (!object->have_instance()) {
if (object->get_bucket()->versioned() && !object->get_bucket()->versioning_enabled()) { //if versioning is suspended
this->version = "null";
} else {
ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << "(): " << "version is: " << object->get_instance() << dendl;
}
+
return 0;
}
prefix = obj->get_bucket()->get_name() + "_" + version + "_" + obj->get_name();
rgw::d4n::BlockDirectory* blockDir = driver->get_block_dir();
- block.hostsList.push_back(blockDir->cct->_conf->rgw_d4n_l1_datacache_address);
+ block.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
block.cacheObj.bucketName = obj->get_bucket()->get_name();
block.cacheObj.objName = obj->get_key().get_oid();
block.cacheObj.dirty = dirty;
- block.cacheObj.hostsList.push_back(blockDir->cct->_conf->rgw_d4n_l1_datacache_address);
+ block.cacheObj.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
existing_block.cacheObj.objName = block.cacheObj.objName;
existing_block.cacheObj.bucketName = block.cacheObj.bucketName;
-
int ret = 0;
if (!d4n_writecache) {
block.size = bl.length();
block.blockID = ofs;
block.dirty = true;
- block.hostsList.push_back(blockDir->cct->_conf->rgw_d4n_l1_datacache_address);
+ block.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
block.version = version;
dirty = true;
ret = driver->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, y);
if (ret == 0) {
ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): oid_in_cache is: " << oid_in_cache << dendl;
driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, bl.length(), version, dirty, y);
- if (!blockDir->exist_key(dpp, &block, y)) {
- if (blockDir->set(dpp, &block, y) < 0) //should we revert previous steps if this step fails?
- ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory set method failed." << dendl;
- } else {
- existing_block.blockID = block.blockID;
- existing_block.size = block.size;
- if (blockDir->get(dpp, &existing_block, y) < 0) {
- ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
- } else {
- if (existing_block.version != block.version) {
- if (blockDir->del(dpp, &existing_block, y) < 0) //delete existing block
- ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory del method failed." << dendl;
- if (blockDir->set(dpp, &block, y) < 0) //new versioned block will have new version, hostsList etc, how about globalWeight?
- ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory set method failed." << dendl;
- } else {
- if (blockDir->update_field(dpp, &block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, y) < 0)
- ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory update_field method failed for hostsList." << dendl;
- }
+
+ /* Store block in directory */
+ existing_block.blockID = block.blockID;
+ existing_block.size = block.size;
+
+ if ((ret = blockDir->get(dpp, &existing_block, y)) == 0 || ret == -ENOENT) {
+ if (ret == 0) { //new versioned block will have new version, hostsList etc, how about globalWeight?
+ block = existing_block;
+ block.version = version;
}
+
+ block.hostsList.insert(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
+
+ if ((ret = blockDir->set(dpp, &block, y)) < 0)
+ ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set() method failed, ret=" << ret << dendl;
+ } else {
+ ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << ", ret=" << ret << dendl;
}
} else {
- ldpp_dout(dpp, 1) << "D4NFilterObject::D4NFilterWriteOp::process" << __func__ << "(): ERROR: writting data to the cache failed!" << dendl;
+ ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterWriteOp::process" << __func__ << "(): ERROR: writting data to the cache failed, ret=" << ret << dendl;
return ret;
}
}
uint32_t flags)
{
bool dirty = false;
- std::vector<std::string> hostsList = {};
+ std::unordered_set<std::string> hostsList = {};
auto creationTime = startTime;
std::string objEtag = etag;
bool write_to_backend_store = false;
driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, bl.length(), version, dirty, y);
ret = object->set_head_obj_dir_entry(dpp, y, true, true);
if (ret < 0) {
- ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
+ ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory set method failed for head object, ret=" << ret << dendl;
return ret;
}
driver->get_policy_driver()->get_cache_policy()->updateObj(dpp, key, version, dirty, accounted_size, creationTime, std::get<rgw_user>(obj->get_bucket()->get_owner()), objEtag, obj->get_bucket()->get_name(), obj->get_key(), y);
//write object to directory.
- hostsList = { driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address };
+ hostsList = { dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address };
rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
.objName = obj->get_oid(),
.bucketName = obj->get_bucket()->get_name(),
.dirty = dirty,
.hostsList = hostsList
};
- ret = driver->get_obj_dir()->set(&object, y);
+ ret = driver->get_obj_dir()->set(dpp, &object, y);
if (ret < 0) {
- ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): ObjectDirectory set method failed with err: " << ret << dendl;
+ ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): ObjectDirectory set method failed, ret=" << ret << dendl;
return ret;
}
} else { //if get_cache_driver()->put()
write_to_backend_store = true;
- ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << " put failed for head_oid_in_cache wih error: " << ret << dendl;
- ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << " calling complete of backend store: " << dendl;
+ ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << " put failed for head_oid_in_cache, ret=" << ret << dendl;
+ ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << " calling complete of backend store: " << dendl;
}
} else { // if d4n_writecache = true
write_to_backend_store = true;
delete_at, if_match, if_nomatch, user_data, zones_trace,
canceled, rctx, flags);
if (ret < 0) {
- ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): Writing to backend store failed with err: " << ret << dendl;
+ ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): Writing to backend store failed, ret=" << ret << dendl;
}
}
namespace rgw { namespace cache {
-std::list<std::string> build_attrs(const rgw::sal::Attrs& binary)
+std::vector<std::string> build_attrs(const rgw::sal::Attrs& binary)
{
- std::list<std::string> values;
+ std::vector<std::string> values;
/* Convert to vector */
if (!binary.empty()) {
initiate_exec{std::move(conn)}, token, req, resp);
}
-template <typename T>
-void redis_exec(std::shared_ptr<connection> conn, boost::system::error_code& ec, boost::redis::request& req, boost::redis::response<T>& resp, optional_yield y)
+template <typename... Types>
+void redis_exec(std::shared_ptr<connection> conn,
+ boost::system::error_code& ec,
+ const boost::redis::request& req,
+ boost::redis::response<Types...>& resp, optional_yield y)
{
if (y) {
auto yield = y.get_yield_context();
cfg.clientname = "RedisDriver";
if (!cfg.addr.host.length() || !cfg.addr.port.length()) {
- ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): Endpoint was not configured correctly." << dendl;
+ ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): Endpoint was not configured correctly." << dendl;
return -EDESTADDRREQ;
}
/* Every set will be treated as new */
try {
boost::system::error_code ec;
- response<std::string> resp;
+ response<ignore_t> resp;
auto redisAttrs = build_attrs(attrs);
if (bl.length()) {
}
request req;
- req.push_range("HMSET", entry, redisAttrs);
+ req.push_range("HSET", entry, redisAttrs);
redis_exec(conn, ec, req, resp, y);
if (ec) {
+ ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << ec.what() << dendl;
return -ec.value();
}
} catch (std::exception &e) {
- ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl;
+ ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl;
return -EINVAL;
}
/* Retrieve existing values from cache */
try {
boost::system::error_code ec;
- response< std::map<std::string, std::string> > resp;
+ response< std::optional<std::map<std::string, std::string>> > resp;
request req;
req.push("HGETALL", entry);
redis_exec(conn, ec, req, resp, y);
if (ec) {
+ ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << ec.what() << dendl;
return -ec.value();
}
- for (auto const& it : std::get<0>(resp).value()) {
+ if (std::get<0>(resp).value().value().empty()) {
+ ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): No values returned." << dendl;
+ return -ENOENT;
+ }
+
+ for (auto const& it : std::get<0>(resp).value().value()) {
if (it.first == "data") {
bl.append(it.second);
} else {
}
}
} catch (std::exception &e) {
- ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl;
+ ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl;
return -EINVAL;
}
try {
boost::system::error_code ec;
+ response<
+ ignore_t,
+ ignore_t,
+ ignore_t,
+ response<std::optional<int>, std::optional<int>>
+ > resp;
request req;
- req.push("HEXISTS", entry, "data");
+ req.push("MULTI");
+ req.push("HSTRLEN", entry, "data");
+ req.push("DEL", entry);
+ req.push("EXEC");
redis_exec(conn, ec, req, resp, y);
if (ec) {
+ ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << ec.what() << dendl;
return -ec.value();
}
+
+ this->free_space += std::get<0>(std::get<3>(resp).value()).value().value();
} catch (std::exception &e) {
- ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl;
+ ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl;
return -EINVAL;
}
- if (std::get<0>(resp).value()) {
- response<std::string> data;
- response<int> ret;
+ return 0;
+}
+
+int RedisDriver::append_data(const DoutPrefixProvider* dpp, const::std::string& key, const bufferlist& bl_data, optional_yield y)
+{
+ std::string value = "";
+ std::string entry = partition_info.location + key;
- try {
+ try {
+ {
boost::system::error_code ec;
+ response< std::optional<std::string> > resp;
request req;
req.push("HGET", entry, "data");
- redis_exec(conn, ec, req, data, y);
+ redis_exec(conn, ec, req, resp, y);
if (ec) {
+ ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << ec.what() << dendl;
return -ec.value();
}
- } catch (std::exception &e) {
- ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl;
- return -EINVAL;
+
+ if (std::get<0>(resp).value().value().empty()) {
+ ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): No data entry found." << dendl;
+ } else {
+ value = std::get<0>(resp).value().value();
+ }
}
- try {
+ {
+ /* Append to existing value or set as new value */
boost::system::error_code ec;
+ response<ignore_t> resp;
+ std::string newVal = value + bl_data.to_str();
+
request req;
- req.push("DEL", entry);
+ req.push("HSET", entry, "data", newVal);
- redis_exec(conn, ec, req, ret, y);
+ redis_exec(conn, ec, req, resp, y);
- if (!std::get<0>(ret).value()) {
- return -ENOENT;
- } else if (ec) {
+ if (ec) {
+ ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << ec.what() << dendl;
return -ec.value();
}
- } catch (std::exception &e) {
- ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl;
- return -EINVAL;
- }
-
- this->free_space += std::get<0>(data).value().length();
- }
-
- return 0;
-}
-
-int RedisDriver::append_data(const DoutPrefixProvider* dpp, const::std::string& key, const bufferlist& bl_data, optional_yield y)
-{
- response<int> exists;
- std::string value;
- std::string entry = partition_info.location + key;
-
- try {
- boost::system::error_code ec;
- request req;
- req.push("HEXISTS", entry, "data");
-
- redis_exec(conn, ec, req, exists, y);
-
- if (ec) {
- return -ec.value();
- }
- } catch (std::exception &e) {
- ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl;
- return -EINVAL;
- }
-
- if (!std::get<0>(exists).value()) {
- ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): Data field was not found." << dendl;
- return -ENOENT;
- }
-
- try {
- boost::system::error_code ec;
- response<std::string> resp;
- request req;
- req.push("HGET", entry, "data");
-
- redis_exec(conn, ec, req, resp, y);
-
- if (ec) {
- return -ec.value();
- }
-
- value = std::get<0>(resp).value();
- } catch (std::exception &e) {
- ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl;
- return -EINVAL;
- }
-
- try {
- /* Append to existing value or set as new value */
- boost::system::error_code ec;
- response<std::string> resp;
- std::string newVal = value + bl_data.to_str();
-
- request req;
- req.push("HMSET", entry, "data", newVal);
-
- redis_exec(conn, ec, req, resp, y);
-
- if (ec) {
- return -ec.value();
}
} catch (std::exception &e) {
- ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl;
+ ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl;
return -EINVAL;
}
int RedisDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string& key, optional_yield y)
{
std::string entry = partition_info.location + key;
- response<int> resp;
try {
boost::system::error_code ec;
request req;
- req.push("HEXISTS", entry, "data");
+ response<
+ ignore_t,
+ ignore_t,
+ ignore_t,
+ response<std::optional<int>, std::optional<int>>
+ > resp;
+ req.push("MULTI");
+ req.push("HSTRLEN", entry, "data");
+ req.push("HDEL", entry, "data");
+ req.push("EXEC");
redis_exec(conn, ec, req, resp, y);
if (ec) {
+ ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << ec.what() << dendl;
return -ec.value();
}
+
+ this->free_space += std::get<0>(std::get<3>(resp).value()).value().value();
} catch (std::exception &e) {
- ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl;
+ ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl;
return -EINVAL;
}
- if (std::get<0>(resp).value()) {
- response<std::string> data;
- response<int> ret;
-
- try {
- boost::system::error_code ec;
- request req;
- req.push("HGET", entry, "data");
-
- redis_exec(conn, ec, req, data, y);
-
- if (ec) {
- return -ec.value();
- }
- } catch (std::exception &e) {
- ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl;
- return -EINVAL;
- }
-
- try {
- boost::system::error_code ec;
- request req;
- req.push("HDEL", entry, "data");
-
- redis_exec(conn, ec, req, ret, y);
-
- if (!std::get<0>(ret).value()) {
- return -ENOENT;
- } else if (ec) {
- return -ec.value();
- }
- } catch (std::exception &e) {
- ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl;
- return -EINVAL;
- }
-
- this->free_space += std::get<0>(data).value().length();
- }
-
return 0;
}
try {
boost::system::error_code ec;
- response< std::map<std::string, std::string> > resp;
+ response< std::optional<std::map<std::string, std::string>> > resp;
request req;
req.push("HGETALL", entry);
redis_exec(conn, ec, req, resp, y);
if (ec) {
+ ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << ec.what() << dendl;
return -ec.value();
}
- for (auto const& it : std::get<0>(resp).value()) {
- if (it.first != "data") {
+ if (std::get<0>(resp).value().value().empty()) {
+ ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): No values returned." << dendl;
+ return -ENOENT;
+ }
+
+ for (auto const& it : std::get<0>(resp).value().value()) {
+ if (it.first != "data") { /* Ignore data */
buffer::list bl_value;
bl_value.append(it.second);
attrs.insert({it.first, bl_value});
}
}
} catch (std::exception &e) {
- ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl;
+ ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl;
return -EINVAL;
}
/* Every attr set will be treated as new */
try {
boost::system::error_code ec;
- response<std::string> resp;
- std::string result;
- std::list<std::string> redisAttrs = build_attrs(attrs);
-
+ response<ignore_t> resp;
request req;
- req.push_range("HMSET", entry, redisAttrs);
+ auto redisAttrs = build_attrs(attrs);
+
+ req.push_range("HSET", entry, redisAttrs);
redis_exec(conn, ec, req, resp, y);
if (ec) {
+ ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << ec.what() << dendl;
return -ec.value();
}
} catch (std::exception &e) {
- ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl;
+ ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl;
return -EINVAL;
}
int RedisDriver::update_attrs(const DoutPrefixProvider* dpp, const std::string& key, const rgw::sal::Attrs& attrs, optional_yield y)
{
- std::string entry = partition_info.location + key;
-
- try {
- boost::system::error_code ec;
- response<std::string> resp;
- auto redisAttrs = build_attrs(attrs);
-
- request req;
- req.push_range("HMSET", entry, redisAttrs);
-
- redis_exec(conn, ec, req, resp, y);
-
- if (ec) {
- return -ec.value();
- }
- } catch (std::exception &e) {
- ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl;
- return -EINVAL;
- }
-
- return 0;
+ return set_attrs(dpp, key, attrs, y);
}
int RedisDriver::delete_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& del_attrs, optional_yield y)
try {
boost::system::error_code ec;
- response<int> resp;
+ response<ignore_t> resp;
auto redisAttrs = build_attrs(del_attrs);
request req;
redis_exec(conn, ec, req, resp, y);
- if (!std::get<0>(resp).value()) {
- return -ENOENT;
- } else if (ec) {
+ if (ec) {
+ ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << ec.what() << dendl;
return -ec.value();
}
-
- return std::get<0>(resp).value();
} catch (std::exception &e) {
- ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl;
+ ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl;
return -EINVAL;
}
+
+ return 0;
}
int RedisDriver::get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, std::string& attr_val, optional_yield y)
{
std::string entry = partition_info.location + key;
- response<std::string> value;
- response<int> resp;
- attr_val = "";
+ response< std::optional<std::string> > resp;
- /* Ensure field was set */
try {
boost::system::error_code ec;
request req;
- req.push("HEXISTS", entry, attr_name);
+ req.push("HGET", entry, attr_name);
redis_exec(conn, ec, req, resp, y);
if (ec) {
+ ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << ec.what() << dendl;
return -ec.value();
}
- } catch (std::exception &e) {
- ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl;
- return -EINVAL;
- }
-
- if (!std::get<0>(resp).value()) {
- ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): Attribute was not found." << dendl;
- return -ENOENT;
- }
-
- /* Retrieve existing value from cache */
- try {
- boost::system::error_code ec;
- request req;
- req.push("HGET", entry, attr_name);
-
- redis_exec(conn, ec, req, value, y);
- if (ec) {
- return -ec.value();
+ if (std::get<0>(resp).value().value().empty()) {
+ ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): No value returned." << dendl;
+ return -ENOENT;
}
} catch (std::exception &e) {
- ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl;
+ ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl;
return -EINVAL;
}
- if (!std::get<0>(resp).value()) {
- ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): Attribute was not found." << dendl;
- return -ENOENT;
- }
-
- /* Retrieve existing value from cache */
- try {
- boost::system::error_code ec;
- request req;
- req.push("HGET", entry, attr_name);
-
- redis_exec(conn, ec, req, value, y);
-
- if (ec) {
- return -ec.value();
- }
- } catch (std::exception &e) {
- ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl;
- return -EINVAL;
- }
-
- attr_val = std::get<0>(value).value();
+ attr_val = std::get<0>(resp).value().value();
return 0;
}
int RedisDriver::set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attr_val, optional_yield y)
{
std::string entry = partition_info.location + key;
- response<int> resp;
/* Every attr set will be treated as new */
try {
boost::system::error_code ec;
+ response<ignore_t> resp;
request req;
req.push("HSET", entry, attr_name, attr_val);
redis_exec(conn, ec, req, resp, y);
if (ec) {
+ ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << ec.what() << dendl;
return -ec.value();
}
} catch (std::exception &e) {
- ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl;
+ ldpp_dout(dpp, 0) << "RedisDriver::" << __func__ << "(): ERROR: " << e.what() << dendl;
return -EINVAL;
}
- return std::get<0>(resp).value();
+ return 0;
}
Aio::OpFunc RedisDriver::redis_read_op(optional_yield y, std::shared_ptr<connection> conn,
auto s = std::make_shared<RedisDriver::redis_response>();
auto& resp = s->resp;
auto& req = s->req;
- req.push_range("HMSET", key, redisAttrs);
+ req.push_range("HSET", key, redisAttrs);
conn->async_exec(req, resp, bind_executor(ex, RedisDriver::redis_aio_handler{aio, r, s}));
};
using boost::redis::connection;
using boost::redis::request;
using boost::redis::response;
+using boost::redis::ignore_t;
class RedisDriver : public CacheDriver {
public:
uint64_t outstanding_write_size;
struct redis_response {
- boost::redis::request req;
- boost::redis::response<std::string> resp;
+ request req;
+ boost::redis::generic_response resp;
};
struct redis_aio_handler {
/* Only append data for GET call */
if (s->req.payload().find("HGET") != std::string::npos) {
- r.data.append(std::get<0>(s->resp).value());
+ r.data.append((s->resp).value().at(0).value);
}
throttle->put(r);
ASSERT_NE(dir, nullptr);
ASSERT_NE(conn, nullptr);
- dir->init(env->cct);
-
/* Run fixture's connection */
config cfg;
cfg.addr.host = env->redisHost.substr(0, env->redisHost.find(":"));
ASSERT_NE(dir, nullptr);
ASSERT_NE(conn, nullptr);
- dir->init(env->cct);
-
/* Run fixture's connection */
config cfg;
cfg.addr.host = env->redisHost.substr(0, env->redisHost.find(":"));
net::io_context io;
std::shared_ptr<connection> conn;
- std::vector<std::string> vals{"0", "", "0", "0", env->redisHost,
+ std::vector<std::string> vals{"0", "", "0", "0", "0", env->redisHost,
"testName", "testBucket", "", "0", env->redisHost};
- std::vector<std::string> fields{"blockID", "version", "size", "globalWeight", "blockHosts",
- "objName", "bucketName", "creationTime", "dirty", "objHosts"};
+ std::vector<std::string> fields{"blockID", "version", "dirtyBlock", "size", "globalWeight", "blockHosts",
+ "objName", "bucketName", "creationTime", "dirtyObj", "objHosts"};
};
void rethrow(std::exception_ptr eptr) {
TEST_F(ObjectDirectoryFixture, SetYield)
{
boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
- ASSERT_EQ(0, dir->set(obj, yield));
+ ASSERT_EQ(0, dir->set(env->dpp, obj, yield));
boost::system::error_code ec;
request req;
TEST_F(ObjectDirectoryFixture, GetYield)
{
boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
- ASSERT_EQ(0, dir->set(obj, yield));
+ ASSERT_EQ(0, dir->set(env->dpp, obj, yield));
{
boost::system::error_code ec;
EXPECT_EQ(std::get<0>(resp).value(), 0);
}
- ASSERT_EQ(0, dir->get(obj, yield));
+ ASSERT_EQ(0, dir->get(env->dpp, obj, yield));
EXPECT_EQ(obj->objName, "newoid");
{
TEST_F(ObjectDirectoryFixture, CopyYield)
{
boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
- ASSERT_EQ(0, dir->set(obj, yield));
- ASSERT_EQ(0, dir->copy(obj, "copyTestName", "copyBucketName", yield));
+ ASSERT_EQ(0, dir->set(env->dpp, obj, yield));
+ ASSERT_EQ(0, dir->copy(env->dpp, obj, "copyTestName", "copyBucketName", yield));
boost::system::error_code ec;
request req;
TEST_F(ObjectDirectoryFixture, DelYield)
{
boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
- ASSERT_EQ(0, dir->set(obj, yield));
+ ASSERT_EQ(0, dir->set(env->dpp, obj, yield));
{
boost::system::error_code ec;
EXPECT_EQ(std::get<0>(resp).value(), 1);
}
- ASSERT_EQ(0, dir->del(obj, yield));
+ ASSERT_EQ(0, dir->del(env->dpp, obj, yield));
{
boost::system::error_code ec;
TEST_F(ObjectDirectoryFixture, UpdateFieldYield)
{
boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
- ASSERT_EQ(0, dir->set(obj, yield));
- ASSERT_EQ(0, dir->update_field(obj, "objName", "newTestName", yield));
- ASSERT_EQ(0, dir->update_field(obj, "objHosts", "127.0.0.1:5000", yield));
+ ASSERT_EQ(0, dir->set(env->dpp, obj, yield));
+ ASSERT_EQ(0, dir->update_field(env->dpp, obj, "objName", "newTestName", yield));
+ ASSERT_EQ(0, dir->update_field(env->dpp, obj, "objHosts", "127.0.0.1:5000", yield));
boost::system::error_code ec;
request req;
EXPECT_EQ(std::get<0>(resp).value(), 1);
auto copyVals = vals;
- copyVals[5] = "copyTestName";
- copyVals[6] = "copyBucketName";
+ copyVals[6] = "copyTestName";
+ copyVals[7] = "copyBucketName";
EXPECT_EQ(std::get<1>(resp).value(), copyVals);
conn->cancel();
TEST_F(BlockDirectoryFixture, RemoveHostYield)
{
boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
- block->hostsList.push_back("127.0.0.1:6000");
+ block->hostsList.insert("127.0.0.1:6000");
ASSERT_EQ(0, dir->set(env->dpp, block, optional_yield{yield}));
ASSERT_EQ(0, dir->remove_host(env->dpp, block, "127.0.0.1:6379", optional_yield{yield}));
}
}
} else if (!exists) { /* No remote copy */
- block->hostsList.push_back(dir->cct->_conf->rgw_d4n_l1_datacache_address);
+ block->hostsList.insert(env->dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
if (dir->set(env->dpp, block, y) < 0)
return -1;
block->size = cacheDriver->get_free_space(env->dpp) + 1; /* To trigger eviction */
block->hostsList.clear();
block->cacheObj.hostsList.clear();
- block->hostsList.push_back("127.0.0.1:6000");
- block->cacheObj.hostsList.push_back("127.0.0.1:6000");
+ block->hostsList.insert("127.0.0.1:6000");
+ block->cacheObj.hostsList.insert("127.0.0.1:6000");
ASSERT_EQ(0, dir->set(env->dpp, block, optional_yield{yield}));