(ordered listing) and list-object-versions.
Changes include the addition of an ordered set for
each bucket that stores the object, addition of etag
and object size to CacheObj structure needed for listing
objects, and addition of a test case to see the usage of
pipelining in Redis. Objects are fetched from the cache
and the backend store and are then merged, before returning
the final list.
Co-authored-by: sinashan <sinatak1373@live.com>
Added code for bucket listing using ObjectDirectory(this
has been removed)
Co-authored-by: Samarah <samarah.uriarte@ibm.com>
Fixed log levels in BucketDirectory and ObjectDirectory
methods. Added enum for field retrieval in Directory get
methods.
Signed-off-by: Pritha Srivastava <prsrivas@redhat.com>
}
}
+int BucketDirectory::zadd(const DoutPrefixProvider* dpp, const std::string& bucket_id, double score, const std::string& member, optional_yield y, bool multi)
+{
+ try {
+ boost::system::error_code ec;
+ request req;
+ req.push("ZADD", bucket_id, "CH", std::to_string(0), member);
+
+ response<std::string> resp;
+ redis_exec(conn, ec, req, resp, y);
+
+ if (ec) {
+ ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
+ }
+
+ if (!multi) {
+ if (std::get<0>(resp).value() != "1") {
+ ldpp_dout(dpp, 10) << "BucketDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl;
+ return -ENOENT;
+ }
+ }
+
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ return -EINVAL;
+ }
+
+ return 0;
+
+}
+
+int BucketDirectory::zrem(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& member, optional_yield y, bool multi)
+{
+ try {
+ boost::system::error_code ec;
+ request req;
+ req.push("ZREM", bucket_id, member);
+ response<std::string> resp;
+
+ redis_exec(conn, ec, req, resp, y);
+
+ if (ec) {
+ ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
+ }
+
+ if (!multi) {
+ if (std::get<0>(resp).value() != "1") {
+ ldpp_dout(dpp, 10) << "BucketDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
+ return -ENOENT;
+ }
+ }
+
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+int BucketDirectory::zrange(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& start, const std::string& stop, uint64_t offset, uint64_t count, std::vector<std::string>& members, optional_yield y)
+{
+ try {
+ boost::system::error_code ec;
+ request req;
+ if (offset == 0 && count == 0) {
+ req.push("ZRANGE", bucket_id, start, stop, "bylex");
+ } else {
+ req.push("ZRANGE", bucket_id, start, stop, "bylex", "LIMIT", offset, count);
+ }
+
+ response<std::vector<std::string> > resp;
+ redis_exec(conn, ec, req, resp, y);
+
+ if (ec) {
+ ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
+ }
+
+ if (std::get<0>(resp).value().empty()) {
+ ldpp_dout(dpp, 10) << "BucketDirectory::" << __func__ << "() Empty response" << dendl;
+ return -ENOENT;
+ }
+
+ members = std::get<0>(resp).value();
+
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+int BucketDirectory::zscan(const DoutPrefixProvider* dpp, const std::string& bucket_id, uint64_t cursor, const std::string& pattern, uint64_t count, std::vector<std::string>& members, uint64_t next_cursor, optional_yield y)
+{
+ try {
+ boost::system::error_code ec;
+ request req;
+
+ req.push("ZSCAN", bucket_id, cursor, "MATCH", pattern, "COUNT", count);
+
+ boost::redis::generic_response resp;
+ redis_exec(conn, ec, req, resp, y);
+
+ if (ec) {
+ ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
+ }
+
+ std::vector<boost::redis::resp3::basic_node<std::__cxx11::basic_string<char> > > root_array;
+ if (resp.has_value()) {
+ root_array = resp.value();
+ ldpp_dout(dpp, 20) << "BucketDirectory::" << __func__ << "() aggregate size is: " << root_array.size() << dendl;
+ auto size = root_array.size();
+ if (size >= 2) {
+ //Nothing of interest at index 0, index 1 has the next cursor value
+ next_cursor = std::stoull(root_array[1].value);
+
+ //skip the first 3 values to get the actual member, score
+ for (uint64_t i = 3; i < size; i = i+2) {
+ members.emplace_back(root_array[i].value);
+ ldpp_dout(dpp, 20) << "BucketDirectory::" << __func__ << "() member is: " << root_array[i].value << dendl;
+ }
+ }
+ } else {
+ return -ENOENT;
+ }
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+int BucketDirectory::zrank(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& member, uint64_t& rank, optional_yield y)
+{
+ try {
+ boost::system::error_code ec;
+ request req;
+
+ req.push("ZRANK", bucket_id, member);
+
+ response<int> resp;
+ redis_exec(conn, ec, req, resp, y);
+
+ if (ec) {
+ ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
+ }
+
+ rank = std::get<0>(resp).value();
+
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
std::string ObjectDirectory::build_index(CacheObj* object)
{
return object->bucketName + "_" + object->objName;
if (!endpoint.empty())
endpoint.pop_back();
- redisValues.push_back(endpoint);
+ redisValues.push_back(endpoint);
+ redisValues.push_back("etag");
+ redisValues.push_back(object->etag);
+ redisValues.push_back("objSize");
+ redisValues.push_back(std::to_string(object->size));
+ redisValues.push_back("userId");
+ redisValues.push_back(object->user_id);
+ redisValues.push_back("displayName");
+ redisValues.push_back(object->display_name);
try {
boost::system::error_code ec;
fields.push_back("creationTime");
fields.push_back("dirty");
fields.push_back("hosts");
+ fields.push_back("etag");
+ fields.push_back("objSize");
+ fields.push_back("userId");
+ fields.push_back("displayName");
try {
boost::system::error_code ec;
}
if (std::get<0>(resp).value().empty()) {
- ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "(): No values returned." << dendl;
+ ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "(): No values returned." << dendl;
return -ENOENT;
}
- object->objName = std::get<0>(resp).value()[0];
- object->bucketName = std::get<0>(resp).value()[1];
- object->creationTime = std::get<0>(resp).value()[2];
- object->dirty = (std::stoi(std::get<0>(resp).value()[3]) != 0);
- boost::split(object->hostsList, std::get<0>(resp).value()[4], boost::is_any_of("_"));
+ using Fields = rgw::d4n::ObjectFields;
+ object->objName = std::get<0>(resp).value()[std::size_t(Fields::ObjName)];
+ object->bucketName = std::get<0>(resp).value()[std::size_t(Fields::BucketName)];
+ object->creationTime = std::get<0>(resp).value()[std::size_t(Fields::CreationTime)];
+ object->dirty = (std::stoi(std::get<0>(resp).value()[std::size_t(Fields::Dirty)]) != 0);
+ boost::split(object->hostsList, std::get<0>(resp).value()[std::size_t(Fields::Hosts)], boost::is_any_of("_"));
+ object->etag = std::get<0>(resp).value()[std::size_t(Fields::Etag)];
+ object->size = std::stoull(std::get<0>(resp).value()[std::size_t(Fields::ObjSize)]);
+ object->user_id = std::get<0>(resp).value()[std::size_t(Fields::UserID)];
+ object->display_name = std::get<0>(resp).value()[std::size_t(Fields::DisplayName)];
} catch (std::exception &e) {
ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
return -EINVAL;
if (std::get<0>(std::get<3>(resp).value()).value().value() == 1) {
return 0;
} else {
- ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "(): No values copied." << dendl;
+ ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "(): No values copied." << dendl;
return -ENOENT;
}
} catch (std::exception &e) {
redis_exec(conn, ec, req, resp, y);
if (!std::get<0>(resp).value()) {
- ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "(): No values deleted." << dendl;
+ ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "(): No values deleted." << dendl;
return -ENOENT;
}
if (!multi) {
if (std::get<0>(resp).value() != "1") {
- ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl;
+ ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl;
return -ENOENT;
}
}
}
if (std::get<0>(resp).value().empty()) {
- ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Empty response" << dendl;
+ ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "() Empty response" << dendl;
return -ENOENT;
}
return 0;
}
-int ObjectDirectory::zrevrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector<std::string>& members, optional_yield y)
+int ObjectDirectory::zrevrange(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& start, const std::string& stop, std::vector<std::string>& members, optional_yield y)
{
std::string key = build_index(object);
try {
boost::system::error_code ec;
request req;
- req.push("ZREVRANGE", key, std::to_string(start), std::to_string(stop));
+ req.push("ZREVRANGE", key, start, stop);
response<std::vector<std::string> > resp;
redis_exec(conn, ec, req, resp, y);
if (!multi) {
if (std::get<0>(resp).value() != "1") {
- ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
+ ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl;
return -ENOENT;
}
}
if (!multi) {
if (std::get<0>(resp).value() == "0") {
- ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() No element removed!" << dendl;
+ ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "() No element removed!" << dendl;
return -ENOENT;
}
}
return value;
}
+int ObjectDirectory::zrank(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, std::string& index, optional_yield y)
+{
+ std::string key = build_index(object);
+ try {
+ boost::system::error_code ec;
+ request req;
+ req.push("ZRANK", key, member);
+ response<std::string> resp;
+
+ redis_exec(conn, ec, req, resp, y);
+
+ if (ec) {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
+ }
+
+ index = std::get<0>(resp).value();
+
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ return -EINVAL;
+ }
+ return 0;
+}
+
std::string BlockDirectory::build_index(CacheBlock* block)
{
return block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + std::to_string(block->blockID) + "_" + std::to_string(block->size);
hosts.pop_back();
redisValues.push_back(hosts);
+ redisValues.push_back("etag");
+ redisValues.push_back(block->cacheObj.etag);
+ redisValues.push_back("objSize");
+ redisValues.push_back(std::to_string(block->cacheObj.size));
+ redisValues.push_back("userId");
+ redisValues.push_back(block->cacheObj.user_id);
+ redisValues.push_back("displayName");
+ redisValues.push_back(block->cacheObj.display_name);
try {
boost::system::error_code ec;
return 0;
}
+template<typename T, typename Seq>
+struct expander;
+
+template<typename T, std::size_t... Is>
+struct expander<T, std::index_sequence<Is...>> {
+template<typename E, std::size_t>
+using elem = E;
+
+using type = boost::redis::response<elem<T, Is>...>;
+};
+
+template <size_t N, class Type>
+struct redis_response
+{
+ using type = typename expander<Type, std::make_index_sequence<N>>::type;
+};
+
+template <typename Integer, Integer ...I, typename F>
+constexpr void constexpr_for_each(std::integer_sequence<Integer, I...>, F &&func)
+{
+ (func(std::integral_constant<Integer, I>{}) , ...);
+}
+
+template <auto N, typename F>
+constexpr void constexpr_for(F &&func)
+{
+ if constexpr (N > 0)
+ {
+ constexpr_for_each(std::make_integer_sequence<decltype(N), N>{}, std::forward<F>(func));
+ }
+}
+
+template <typename T>
+void parse_response(T t, std::vector<std::vector<std::string>>& responses)
+{
+ constexpr_for<std::tuple_size_v<T>>([&](auto index)
+ {
+ std::vector<std::string> empty_vector;
+ constexpr auto i = index.value;
+ if (std::get<i>(t).value().has_value()) {
+ if (std::get<i>(t).value().value().empty()) {
+ responses.emplace_back(empty_vector);
+ } else {
+ responses.emplace_back(std::get<i>(t).value().value());
+ }
+ } else {
+ responses.emplace_back(empty_vector);
+ }
+ });
+}
+
+int BlockDirectory::get(const DoutPrefixProvider* dpp, std::vector<CacheBlock>& blocks, optional_yield y)
+{
+ request req;
+ redis_response<100, std::optional<std::vector<std::string>>>::type resp;
+ for (auto block : blocks) {
+ std::string key = build_index(&block);
+ std::vector<std::string> fields;
+ ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): index is: " << key << dendl;
+
+ fields.push_back("blockID");
+ fields.push_back("version");
+ fields.push_back("deleteMarker");
+ fields.push_back("size");
+ fields.push_back("globalWeight");
+
+ fields.push_back("objName");
+ fields.push_back("bucketName");
+ fields.push_back("creationTime");
+ fields.push_back("dirty");
+ fields.push_back("hosts");
+ fields.push_back("etag");
+ fields.push_back("objSize");
+ fields.push_back("userId");
+ fields.push_back("displayName");
+
+ try {
+ req.push_range("HMGET", key, fields);
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ return -EINVAL;
+ }
+ } //end - for
+
+ try {
+ boost::system::error_code ec;
+ redis_exec(conn, ec, req, resp, y);
+
+ if (ec) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
+ return -ec.value();
+ }
+ } catch (std::exception &e) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
+ return -EINVAL;
+ }
+
+ std::vector<std::vector<std::string>> responses;
+ parse_response<decltype(resp)>(resp, responses);
+
+ for (size_t i = 0; i < blocks.size(); i++) {
+ CacheBlock* block = &blocks[i];
+ auto vec = responses[i];
+ if (vec.empty()) {
+ ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "(): No values returned for key=" << build_index(block) << dendl;
+ continue;
+ }
+
+ using Fields = rgw::d4n::BlockFields;
+ block->blockID = std::stoull(vec[std::size_t(Fields::BlockID)]);
+ block->version = vec[std::size_t(Fields::Version)];
+ block->deleteMarker = (std::stoi(vec[std::size_t(Fields::DeleteMarker)]) != 0);
+ block->size = std::stoull(vec[std::size_t(Fields::Size)]);
+ block->globalWeight = std::stoull(vec[std::size_t(Fields::GlobalWeight)]);
+ block->cacheObj.objName = vec[std::size_t(Fields::ObjName)];
+ block->cacheObj.bucketName = vec[std::size_t(Fields::BucketName)];
+ block->cacheObj.creationTime = vec[std::size_t(Fields::CreationTime)];
+ block->cacheObj.dirty = (std::stoi(vec[std::size_t(Fields::Dirty)]) != 0);
+ boost::split(block->cacheObj.hostsList, vec[std::size_t(Fields::Hosts)], boost::is_any_of("_"));
+ block->cacheObj.etag = vec[std::size_t(Fields::Etag)];
+ block->cacheObj.size = std::stoull(vec[std::size_t(Fields::ObjSize)]);
+ block->cacheObj.user_id = vec[std::size_t(Fields::UserID)];
+ block->cacheObj.display_name = vec[std::size_t(Fields::DisplayName)];
+ }
+
+ return 0;
+}
+
int BlockDirectory::get(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y)
{
std::string key = build_index(block);
fields.push_back("creationTime");
fields.push_back("dirty");
fields.push_back("hosts");
+ fields.push_back("etag");
+ fields.push_back("objSize");
+ fields.push_back("userId");
+ fields.push_back("displayName");
try {
boost::system::error_code ec;
block->cacheObj.creationTime = std::get<0>(resp).value().value()[7];
block->cacheObj.dirty = (std::stoi(std::get<0>(resp).value().value()[8]) != 0);
boost::split(block->cacheObj.hostsList, std::get<0>(resp).value().value()[9], boost::is_any_of("_"));
+ block->cacheObj.etag = std::get<0>(resp).value().value()[10];
+ block->cacheObj.size = std::stoull(std::get<0>(resp).value().value()[11]);
+ block->cacheObj.user_id = std::get<0>(resp).value().value()[12];
+ block->cacheObj.display_name = std::get<0>(resp).value().value()[13];
} catch (std::exception &e) {
ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl;
return -EINVAL;
using boost::redis::response;
using boost::redis::ignore_t;
+enum class ObjectFields { // Fields stored in object directory
+ ObjName,
+ BucketName,
+ CreationTime,
+ Dirty,
+ Hosts,
+ Etag,
+ ObjSize,
+ UserID,
+ DisplayName
+};
+
+enum class BlockFields { // Fields stored in block directory
+ BlockID,
+ Version,
+ DeleteMarker,
+ Size,
+ GlobalWeight,
+ ObjName,
+ BucketName,
+ CreationTime,
+ Dirty,
+ Hosts,
+ Etag,
+ ObjSize,
+ UserID,
+ DisplayName
+};
+
struct CacheObj {
std::string objName; /* S3 object name */
std::string bucketName; /* S3 bucket name */
std::string creationTime; /* Creation time of the S3 Object */
bool dirty{false};
std::unordered_set<std::string> hostsList; /* List of hostnames <ip:port> of object locations for multiple backends */
+ std::string etag; //etag needed for list objects
+ uint64_t size; //total object size (and not block size), needed for list objects
+ std::string user_id; // id of user, needed for list object versions
+ std::string display_name; // display name of owner, needed for list object versions
};
struct CacheBlock {
Directory() {}
};
+class BucketDirectory: public Directory {
+ public:
+ BucketDirectory(std::shared_ptr<connection>& conn) : conn(conn) {}
+ int zadd(const DoutPrefixProvider* dpp, const std::string& bucket_id, double score, const std::string& member, optional_yield y, bool multi=false);
+ int zrem(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& member, optional_yield y, bool multi=false);
+ int zrange(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& start, const std::string& stop, uint64_t offset, uint64_t count, std::vector<std::string>& members, optional_yield y);
+ int zscan(const DoutPrefixProvider* dpp, const std::string& bucket_id, uint64_t cursor, const std::string& pattern, uint64_t count, std::vector<std::string>& members, uint64_t next_cursor, optional_yield y);
+ int zrank(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& member, uint64_t& rank, optional_yield y);
+
+ private:
+ std::shared_ptr<connection> conn;
+};
+
class ObjectDirectory: public Directory {
public:
ObjectDirectory(std::shared_ptr<connection>& conn) : conn(conn) {}
int update_field(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& field, std::string& value, optional_yield y);
int zadd(const DoutPrefixProvider* dpp, CacheObj* object, double score, const std::string& member, optional_yield y, bool multi=false);
int zrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector<std::string>& members, optional_yield y);
- int zrevrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector<std::string>& members, optional_yield y);
+ int zrevrange(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& start, const std::string& stop, std::vector<std::string>& members, optional_yield y);
int zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y, bool multi=false);
int zremrangebyscore(const DoutPrefixProvider* dpp, CacheObj* object, double min, double max, optional_yield y, bool multi=false);
+ int zrank(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, std::string& index, optional_yield y);
//Return value is the incremented value, else return error
int incr(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y);
int set(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
int get(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y);
+ //Pipelined version of get for list bucket
+ int get(const DoutPrefixProvider* dpp, std::vector<CacheBlock>& blocks, optional_yield y);
int copy(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& copyName, const std::string& copyBucketName, optional_yield y);
int del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y, bool multi=false);
int update_field(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& field, std::string& value, optional_yield y);
if (op_ret < 0) {
ldpp_dout(dpp, 0) << __func__ << "(): Failed to get latest entry in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl;
} else {
- // if this entry is the latest, it could have been overwritten by a newer one
+ // if this entry is not the latest, it could have been overwritten by a newer one
if (block.version == e->version) {
rgw::d4n::CacheBlock null_block;
null_block = block;
//hash entry for null block
op_ret = blockDir->get(dpp, &null_block, y);
if (op_ret < 0) {
- ldpp_dout(dpp, 0) << __func__ << "(): Failed to get latest entry in block directory for: " << null_block.cacheObj.objName << ", ret=" << ret << dendl;
+ ldpp_dout(dpp, 0) << __func__ << "(): Failed to get latest entry in block directory for: " << null_block.cacheObj.objName << ", ret=" << ret << dendl;
} else {
if (null_block.version == e->version) {
block.cacheObj.dirty = false;
ldpp_dout(dpp, 0) << __func__ << "(): Failed to get latest entry in block directory for: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl;
}
if (latest_block.version == e->version) {
- //remove object entry from ordered set
+ //remove object entry from ordered set of versions
if (c_obj->have_instance()) {
- blockDir->del(dpp, &latest_block, y);
- if (ret < 0) {
- ldpp_dout(dpp, 0) << __func__ << "(): Failed to queue del for latest hash entry: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl;
- continue;
+ blockDir->del(dpp, &latest_block, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << __func__ << "(): Failed to queue del for latest hash entry: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl;
+ continue;
}
}
+ //delete entry from ordered set of objects, as older versions would have been written to the backend store
+ ret = bucketDir->zrem(dpp, e->bucket_id, c_obj->get_name(), y, true);
+ if (ret < 0) {
+ blockDir->discard(dpp, y);
+ ldpp_dout(dpp, 0) << __func__ << "(): Failed to queue zrem for object entry: " << c_obj->get_name() << ", ret=" << ret << dendl;
+ continue;
+ }
}
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << std::setprecision(std::numeric_limits<double>::max_digits10) << e->creationTime << " from ordered set" << dendl;
rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{
}
void LRUPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
-const rgw_obj_key& obj_key, optional_yield y, std::string& restore_val)
+const rgw_obj_key& obj_key, uint8_t op, optional_yield y, std::string& restore_val)
{
const std::lock_guard l(lru_lock);
ObjEntry* e = new ObjEntry(key, version, deleteMarker, size, creationTime, user, etag, bucket_name, bucket_id, obj_key);
std::shared_ptr<connection> conn;
BlockDirectory* blockDir;
ObjectDirectory* objDir;
+ BucketDirectory* bucketDir;
rgw::cache::CacheDriver* cacheDriver;
std::optional<asio::steady_timer> rthread_timer;
rgw::sal::Driver* driver;
{
blockDir = new BlockDirectory{conn};
objDir = new ObjectDirectory{conn};
+ bucketDir = new BucketDirectory{conn};
}
~LFUDAPolicy() {
rthread_stop();
+ delete bucketDir;
delete blockDir;
delete objDir;
std::lock_guard l(lfuda_cleaning_lock);
virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override;
virtual bool update_refcount_if_key_exists(const DoutPrefixProvider* dpp, const std::string& key, uint8_t op, optional_yield y) override { return false; }
virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, uint8_t op, optional_yield y, std::string& restore_val=empty) override;
- virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size,
- time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
- const rgw_obj_key& obj_key, uint8_t op, optional_yield y, std::string& restore_val=empty) override;
+ virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size,
+ time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
+ const rgw_obj_key& obj_key, uint8_t op, optional_yield y, std::string& restore_val=empty) override;
virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
virtual bool erase_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
virtual bool invalidate_dirty_object(const DoutPrefixProvider* dpp, const std::string& key) override { return false; }
conn = std::make_shared<connection>(boost::asio::make_strand(io_context));
objDir = std::make_unique<rgw::d4n::ObjectDirectory>(conn);
blockDir = std::make_unique<rgw::d4n::BlockDirectory>(conn);
+ bucketDir = std::make_unique<rgw::d4n::BucketDirectory>(conn);
policyDriver = std::make_unique<rgw::d4n::PolicyDriver>(conn,
cacheDriver.get(),
"lfuda");
return next->create(dpp, params, y);
}
+int D4NFilterBucket::list(const DoutPrefixProvider* dpp, ListParams& params, int max,
+ ListResults& results, optional_yield y)
+{
+ ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " params.marker.name: " << params.marker.name << dendl;
+ ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " params.marker.instance: " << params.marker.instance << dendl;
+ ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " params.end_marker.key: " << params.end_marker.name << dendl;
+ ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " max: " << max << dendl;
+
+ if (max == 0) {
+ return 0;
+ }
+
+ //Get objects from cache
+ auto bucketDir = this->filter->get_bucket_dir();
+ auto objDir = this->filter->get_obj_dir();
+ std::vector<rgw_obj_key> objects;
+ std::vector<rgw_bucket_list_entries> entries;
+
+ ListResults cache_results;
+ ListResults store_results;
+ if (g_conf()->d4n_writecache_enabled) {
+ cache_results.is_truncated = false;
+ if (!params.prefix.empty()) {
+ std::string pattern = params.prefix + "*";
+ /* zscan does not always take into account COUNT as smaller sizes of ordered set are stored as one sequential blob of memory
+ so the entire set might be returned regarless of value of COUNT.
+ also valid values of cursor are only zero during start of iteration and cursor returned by previous zscan call
+ Refer: https://valkey.io/commands/scan/ */
+ uint64_t cursor = 0, next_cursor = 0;
+ int num_objs = 0;
+ bool instance_marker_processed = false;
+ std::string last_version;
+ do {
+ std::vector<std::string> temp_objects;
+ auto ret = bucketDir->zscan(dpp, this->get_bucket_id(), cursor, pattern, (max + 1), temp_objects, next_cursor, y);
+ if (ret < 0 && ret != -ENOENT) {
+ ldpp_dout(dpp, 0) << "D4NFilterBucket::" << __func__ << " zscan failed with ret: " << ret << dendl;
+ return ret;
+ }
+ //filter elements before marker (exclude marker from output)
+ std::string last_element_processed;
+ for (auto it = temp_objects.begin(); it != temp_objects.end(); it++) {
+ last_element_processed = *it;
+ if (!params.marker.name.empty() && *it <= params.marker.name) {
+ if (*it != params.marker.name || !params.list_versions || params.marker.instance.empty()) {
+ continue;
+ }
+ } else {
+ auto pos = it->find(params.delim, params.prefix.length());
+ if (!params.delim.empty() && pos != std::string::npos) {
+ std::string delim_str = it->erase((pos + 1), (it->length() - 1));
+ ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " delim_str: " << delim_str << dendl;
+ if (cache_results.common_prefixes.find(delim_str) == cache_results.common_prefixes.end()) {
+ cache_results.common_prefixes.emplace(std::make_pair(delim_str, true));
+ store_results.common_prefixes.emplace(std::make_pair(delim_str, true));
+ num_objs++; //all objects under a common prefix are counted as one
+ }
+ } else {
+ std::vector<std::string> temp_versions;
+ //if params.list_versions is given, get the versions of the object
+ if (params.list_versions) {
+ std::string objName = *it;
+ // special handling for name starting with '_'
+ if (objName[0] == '_') {
+ objName = "_" + *it;
+ }
+ rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{
+ .objName = objName,
+ .bucketName = this->get_bucket_id(),
+ };
+ std::string start;
+ if (params.marker.instance.empty() || instance_marker_processed || params.marker.name.empty()) {
+ start = "0";
+ } else {
+ if (!instance_marker_processed) {
+ std::string member = params.marker.instance;
+ std::string index;
+ auto ret = objDir->zrank(dpp, &dir_obj, member, index, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterBucket::" << __func__ << " zrank failed with: " << ret << dendl;
+ return ret;
+ }
+ ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " start: " << index << dendl;
+ start = index;
+ instance_marker_processed = true;
+ }
+ }
+ auto ret = objDir->zrevrange(dpp, &dir_obj, start, "-1", temp_versions, y);
+ if (ret < 0 && ret != -ENOENT) {
+ ldpp_dout(dpp, 0) << "D4NFilterBucket::" << __func__ << " zrevrange failed with ret: " << ret << dendl;
+ return ret;
+ }
+ } //params.list_version
+ if (!temp_versions.empty()) {
+ for (auto version_it = temp_versions.begin(); version_it != temp_versions.end(); version_it++) {
+ std::string version = *(version_it);
+ if (std::next(version_it) == temp_versions.end()) {
+ last_version = *version_it;
+ }
+ rgw_bucket_list_entries entry;
+ entry.flags = rgw_bucket_dir_entry::FLAG_VER;
+ if (version_it == temp_versions.begin()) {
+ entry.flags |= rgw_bucket_dir_entry::FLAG_CURRENT;
+ } else {
+ entry.flags |= rgw_bucket_dir_entry::FLAG_VER_MARKER;
+ }
+ rgw_obj_key key{std::move(*it), std::move(*version_it)};
+ entry.key = std::move(key);
+ entries.emplace_back(entry);
+ num_objs++;
+ if (num_objs == max) {
+ if (std::next(version_it) != temp_versions.end()) {
+ cache_results.is_truncated = true;
+ cache_results.next_marker.instance = version;
+ cache_results.next_marker.name = last_element_processed;
+ }
+ break; //break from the 'for' loop that processes temp_versions
+ }
+ }
+ } else {
+ rgw_bucket_list_entries entry;
+ entry.key.name = std::move(*it);
+ entries.emplace_back(entry);
+ num_objs++;
+ }
+ }
+ }
+ if (num_objs == max) {
+ if (std::next(it) != temp_objects.end()) {
+ if (cache_results.next_marker.empty()) {
+ cache_results.is_truncated = true;
+ cache_results.next_marker.name = last_element_processed;
+ if (cache_results.next_marker.instance.empty() && params.list_versions) {
+ cache_results.next_marker.instance = last_version;
+ }
+ }
+ break; //break from the 'for' loop that processes temp_objects
+ }
+ }
+ }
+ /* break if next_cursor is 0 which means end of ordered set or if no entries are found in ordered set
+ or if num_objs after filtering is equal to max */
+ if ((next_cursor == 0) || (ret == -ENOENT) || (num_objs == max)) {
+ if ((num_objs == max) && (next_cursor != 0)) {
+ if (cache_results.next_marker.empty()) {
+ cache_results.is_truncated = true;
+ cache_results.next_marker.name = entries[(max - 1)].key.name;
+ }
+ }
+ break;
+ }
+ cursor = next_cursor;
+ } while(next_cursor != 0);
+ } else { //no prefix is specified
+ std::string start;
+ if (params.marker.name.empty()) {
+ start = "-";
+ } else {
+ if (!params.marker.instance.empty()) {
+ start = "[" + params.marker.name;
+ } else {
+ start = "(" + params.marker.name;
+ }
+ }
+ int num_objs = 0;
+ bool instance_marker_processed = false;
+ std::string last_version;
+ do {
+ std::vector<std::string> temp_objects;
+ ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " zrange start is: " << start << dendl;
+ auto ret = bucketDir->zrange(dpp, this->get_bucket_id(), start, "+", 0, (max + 1), temp_objects, y);
+ if (ret < 0 && ret != -ENOENT) {
+ ldpp_dout(dpp, 0) << "D4NFilterBucket::" << __func__ << " zrange failed with ret: " << ret << dendl;
+ return ret;
+ }
+ ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " params.delim: " << params.delim << dendl;
+ std::string last_element_processed;
+ for (auto it = temp_objects.begin(); it != temp_objects.end(); it++) {
+ last_element_processed = *it;
+ auto pos = it->find(params.delim);
+ if (!params.delim.empty() && pos != std::string::npos) {
+ std::string delim_str = it->erase((pos + 1), (it->length() - 1));
+ ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " delim_str: " << delim_str << dendl;
+ if (cache_results.common_prefixes.find(delim_str) == cache_results.common_prefixes.end()) {
+ cache_results.common_prefixes.emplace(std::make_pair(delim_str, true));
+ store_results.common_prefixes.emplace(std::make_pair(delim_str, true));
+ num_objs++; //all objects under a common prefix are counted as one
+ if (num_objs == max) {
+ uint64_t cursor = 0, next_cursor = 0;
+ //get all the keys matching with delim_str so as to find the last element which will be the next marker
+ do {
+ std::vector<std::string> delim_objects;
+ std::string delim_pattern = delim_str + "*";
+ auto ret = bucketDir->zscan(dpp, this->get_bucket_id(), cursor, delim_pattern, 10, delim_objects, next_cursor, y);
+ if (ret < 0 && ret != -ENOENT) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << " zscan failed with ret: " << ret << dendl;
+ return ret;
+ }
+ if ((next_cursor == 0) || (ret == -ENOENT)) {
+ if (!delim_objects.empty()) {
+ cache_results.next_marker.name = delim_objects.back();
+ std::string start = "(" + cache_results.next_marker.name;
+ std::vector<std::string> one_object;
+ auto ret = bucketDir->zrange(dpp, this->get_bucket_id(), start, "+", 0, 1, one_object, y);
+ if (ret < 0 && ret != -ENOENT) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << " zrange failed with ret: " << ret << dendl;
+ return ret;
+ }
+ if (one_object.size() >= 1) {
+ cache_results.is_truncated = true;
+ } else {
+ cache_results.is_truncated = false;
+ }
+ }
+ break;
+ }
+ cursor = next_cursor;
+ next_cursor = 0;
+ } while(next_cursor != 0);
+ } //end-if num_objs == max
+ } //end-if cache_results.common_prefixes
+ } else {
+ std::vector<std::string> temp_versions;
+ //if params.list_versions is given, get the versions of the object
+ if (params.list_versions) {
+ std::string objName = *it;
+ // special handling for name starting with '_'
+ if (objName[0] == '_') {
+ objName = "_" + *it;
+ }
+ rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{
+ .objName = objName,
+ .bucketName = this->get_bucket_id(),
+ };
+ std::string start;
+ if (params.marker.instance.empty() || instance_marker_processed || params.marker.name.empty()) {
+ start = "0";
+ } else {
+ if (!instance_marker_processed) {
+ std::string member = params.marker.instance;
+ std::string index;
+ auto ret = objDir->zrank(dpp, &dir_obj, member, index, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterBucket::" << __func__ << " zrank failed with: " << ret << dendl;
+ return ret;
+ }
+ ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " start: " << index << dendl;
+ start = index;
+ instance_marker_processed = true;
+ }
+ }
+ auto ret = objDir->zrevrange(dpp, &dir_obj, start, "-1", temp_versions, y);
+ if (ret < 0 && ret != -ENOENT) {
+ ldpp_dout(dpp, 0) << "D4NFilterBucket::" << __func__ << " zrevrange failed with ret: " << ret << dendl;
+ return ret;
+ }
+ } //params.list_version
+ if (!temp_versions.empty()) {
+ for (auto version_it = temp_versions.begin(); version_it != temp_versions.end(); version_it++) {
+ std::string version = *(version_it);
+ if (std::next(version_it) == temp_versions.end()) {
+ last_version = *version_it;
+ }
+ rgw_bucket_list_entries entry;
+ entry.flags = rgw_bucket_dir_entry::FLAG_VER;
+ if (version_it == temp_versions.begin()) {
+ entry.flags |= rgw_bucket_dir_entry::FLAG_CURRENT;
+ } else {
+ entry.flags |= rgw_bucket_dir_entry::FLAG_VER_MARKER;
+ }
+ rgw_obj_key key{std::move(*it), std::move(*version_it)};
+ entry.key = std::move(key);
+ entries.emplace_back(entry);
+ num_objs++;
+ if (num_objs == max) {
+ if (std::next(version_it) != temp_versions.end()) {
+ cache_results.is_truncated = true;
+ cache_results.next_marker.instance = version;
+ cache_results.next_marker.name = *it;
+ }
+ break; //break from the 'for' loop that processes temp_versions
+ }
+ }
+ } else {
+ rgw_bucket_list_entries entry;
+ entry.key.name = std::move(*it);
+ entries.emplace_back(entry);
+ num_objs++;
+ }
+ }
+ if (num_objs == max) {
+ if (std::next(it) != temp_objects.end()) {
+ //could have been set due to delimiter processing
+ if (cache_results.next_marker.name.empty()) {
+ cache_results.is_truncated = true;
+ cache_results.next_marker.name = last_element_processed;
+ if (cache_results.next_marker.instance.empty() && params.list_versions) {
+ cache_results.next_marker.instance = last_version;
+ }
+ }
+ }
+ break; //break from the 'for' loop that processes temp_objects
+ }
+ } //for - processes temp_objects
+ //break from while loop if max+1 elements have been found or there are no more elements in the ordered set
+ if ((num_objs == max) || (ret == -ENOENT) || temp_objects.empty()) {
+ ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " Breaking out! " << dendl;
+ ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " num_objs " << num_objs << dendl;
+ ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " ret " << ret << dendl;
+ break;
+ }
+ //adjust start to begin from the last element of objects, and exclude that element
+ start = "(" + last_element_processed;
+ } while(num_objs <= max);
+ } //end - else
+
+ rgw::d4n::BlockDirectory* blockDir = this->filter->get_block_dir();
+ auto remainder_size = entries.size();
+ size_t j = 0, start_j = 0;
+ while (remainder_size > 0) {
+ std::vector<rgw::d4n::CacheBlock> blocks(100);
+ start_j = j;
+ size_t batch_size = std::min(static_cast<size_t>(100), remainder_size);
+ for (size_t i = 0; i < batch_size; i++) {
+ ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " objects[j]: " << entries[j].key.name << dendl;
+ ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " remainder_size: " << remainder_size << dendl;
+ if (entries[j].key.instance == "null") {
+ blocks[i].cacheObj.objName = "_:null_" + entries[j].key.name;
+ } else {
+ blocks[i].cacheObj.objName = entries[j].key.get_oid();
+ }
+ blocks[i].cacheObj.bucketName = this->get_bucket_id();
+ ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " blocks[i].cacheObj.objName: " << blocks[i].cacheObj.objName << dendl;
+ j++;
+ }
+ auto ret = blockDir->get(dpp, blocks, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterBucket::" << __func__ << " blockDir->get() returned error: " << ret << dendl;
+ return ret;
+ }
+
+ for (auto block : blocks) {
+ if (block.cacheObj.objName.empty()) {
+ start_j++;
+ continue;
+ }
+ rgw_bucket_dir_entry entry;
+ entry.key.name = entries[start_j].key.name;
+ // special handling for name starting with '_'
+ if (entry.key.name[0] == '_') {
+ entry.key.name = "_" + entries[start_j].key.name;
+ }
+ entry.key.instance = entries[start_j].key.instance;
+ entry.flags = entries[start_j].flags;
+ if (block.deleteMarker) {
+ entry.flags |= rgw_bucket_dir_entry::FLAG_DELETE_MARKER;
+ }
+ entry.meta.storage_class = "CACHE";
+ entry.meta.size = block.cacheObj.size;
+ entry.meta.accounted_size = block.cacheObj.size;
+ struct std::tm tm;
+ std::istringstream ss(block.cacheObj.creationTime);
+ ss >> std::get_time(&tm, "%H:%M:%S");
+ std::time_t creationTime = mktime(&tm);
+ entry.meta.mtime = ceph::real_clock::from_time_t(creationTime);
+ entry.meta.etag = block.cacheObj.etag;
+ entry.meta.owner = block.cacheObj.user_id;
+ entry.meta.owner_display_name = block.cacheObj.display_name;
+ cache_results.objs.emplace_back(entry);
+ start_j++;
+ }
+
+ if (remainder_size <= 100) {
+ remainder_size = 0;
+ } else {
+ remainder_size = remainder_size - 100;
+ }
+ }
+ } //d4n_write_cache_enabled = true
+
+ //Get objects from backend store
+ auto ret = next->list(dpp, params, max, store_results, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterBucket::" << __func__ << " next->list returned: " << ret << dendl;
+ return ret;
+ }
+
+ //return store results if cache results is empty
+ if (cache_results.objs.empty() && !store_results.objs.empty()) {
+ results = std::move(store_results);
+ return 0;
+ }
+
+ //return cache results if store results is empty
+ if (store_results.objs.empty() && !cache_results.objs.empty()) {
+ results = std::move(cache_results);
+ return 0;
+ }
+
+ //if both lists are non-empty then merge them, as they are already sorted
+ if (!cache_results.objs.empty() && !store_results.objs.empty()) {
+ std::vector<rgw_bucket_dir_entry> objs;
+ objs.reserve(max);
+
+ size_t i = 0, j = 0;
+ int elementsAdded = 0;
+
+ // Compare elements from both vectors and merge in sorted order
+ while (elementsAdded < max && i < cache_results.objs.size() && j < store_results.objs.size()) {
+ std::string key_name_in_cache = cache_results.objs[i].key.name;
+ std::string key_name_in_store = store_results.objs[j].key.name;
+ ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " key_name_in_cache: " << key_name_in_cache << dendl;
+ ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " key_name_in_store: " << key_name_in_store << dendl;
+ std::string key_instance_in_cache = cache_results.objs[i].key.instance;
+ std::string key_instance_in_store = store_results.objs[j].key.instance;
+ if (key_name_in_cache == key_name_in_store) {
+ objs.push_back(cache_results.objs[i]);
+ i++;
+ //if list versions is not given or if instance values are different then skip the store result
+ if (!params.list_versions || key_instance_in_cache == key_instance_in_store) {
+ j++;
+ }
+ }
+ else if (key_name_in_cache < key_name_in_store) {
+ objs.push_back(cache_results.objs[i]);
+ i++;
+ } else {
+ objs.push_back(store_results.objs[j]);
+ j++;
+ }
+ elementsAdded++;
+ }
+
+ while (elementsAdded < max && i < cache_results.objs.size()) {
+ objs.push_back(cache_results.objs[i]);
+ i++;
+ elementsAdded++;
+ }
+
+ while (elementsAdded < max && j < store_results.objs.size()) {
+ objs.push_back(store_results.objs[j]);
+ j++;
+ elementsAdded++;
+ }
+
+ //there are elements in cache_results
+ if (i < (cache_results.objs.size() - 1)) {
+ results.is_truncated = true;
+ results.next_marker.name = cache_results.objs[(i - 1)].key.to_string();
+ }
+
+ //there are elements in store_results
+ if (j < (store_results.objs.size() - 1)) {
+ results.is_truncated = true;
+ results.next_marker.name = store_results.objs[(j - 1)].key.to_string();
+ }
+ results.objs = std::move(objs);
+ }
+
+ if (!store_results.common_prefixes.empty()) {
+ results.common_prefixes = std::move(store_results.common_prefixes);
+ //results next_marker is not set at this point which means result.objs is empty
+ if (results.next_marker.empty()) {
+ results.is_truncated = store_results.is_truncated | cache_results.is_truncated;
+ if (store_results.is_truncated && cache_results.is_truncated) {
+ if (cache_results.next_marker <= store_results.next_marker) {
+ results.next_marker = std::move(cache_results.next_marker);
+ } else {
+ results.next_marker = std::move(store_results.next_marker);
+ }
+ } else if (store_results.is_truncated && !cache_results.is_truncated) {
+ results.next_marker = std::move(store_results.next_marker);
+ } else if (!store_results.is_truncated && cache_results.is_truncated) {
+ results.next_marker = std::move(cache_results.next_marker);
+ }
+ }
+ }
+
+ return 0;
+}
+
std::unique_ptr<MultipartUpload> D4NFilterBucket::get_multipart_upload(
const std::string& oid,
std::optional<std::string> upload_id,
for "null" versions, when bucket is non-versioned.
3. The "null" hash entry is overwritten when we have a "null" instance when bucket versioning is suspended.
4. A versioned hash entry for every version for a version enabled bucket - this helps in get/delete requests with version-id specified
-5. Redis ordered set to maintain the order of dirty objects added for a version enabled bucket. Even when the bucket is non-versioned, this set maintains a "null" entry */
+5. Redis ordered set to maintain the order of dirty objects added for a version enabled bucket. Even when the bucket is non-versioned, this set maintains a "null" entry
+6. Another ordered set to maintain a lexicographically sorted order of objects for a bucket - used for bucket listing */
int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, std::vector<std::string>* exec_responses, optional_yield y, bool is_latest_version, bool dirty)
{
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): object name: " << this->get_name() << " bucket name: " << this->get_bucket()->get_name() << dendl;
rgw::d4n::CacheBlock block;
rgw::d4n::BlockDirectory* blockDir = this->driver->get_block_dir();
+ auto attrs = this->get_attrs();
+ bufferlist bl_etag, bl_acl;
+ auto etag_it = attrs.find(RGW_ATTR_ETAG);
+ if (etag_it != attrs.end()) {
+ bl_etag = etag_it->second;
+ }
+ auto etag = bl_etag.to_str();
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): etag: " << etag << dendl;
+
+ auto acl_it = attrs.find(RGW_ATTR_ACL);
+ RGWAccessControlPolicy policy;
+ std::string user_id, display_name;
+ if (acl_it != attrs.end()) {
+ auto bliter = acl_it->second.cbegin();
+ try {
+ policy.decode(bliter);
+ } catch (buffer::error& err) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): ERROR: could not decode policy, caught buffer::error" << dendl;
+ return -EIO;
+ }
+ ACLOwner owner = policy.get_owner();
+ rgw_user user = std::get<rgw_user>(owner.id);
+ ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): INFO: user_d: " << user.to_str() << dendl;
+ ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): INFO: display_name: " << owner.display_name << dendl;
+ user_id = user.to_str();
+ display_name = owner.display_name;
+ }
+
if (is_latest_version) {
std::string objName = this->get_name();
// special handling for name starting with '_'
.creationTime = std::to_string(ceph::real_clock::to_time_t(this->get_mtime())),
.dirty = dirty,
.hostsList = { dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address },
+ .etag = etag,
+ .size = this->get_accounted_size(),
+ .user_id = user_id,
+ .display_name = display_name,
};
block.cacheObj = object;
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Score of object name: "<< this->get_name() << " version: " << object_version << " is: " << std::setprecision(std::numeric_limits<double>::max_digits10) << score << ret << dendl;
rgw::d4n::ObjectDirectory* objDir = this->driver->get_obj_dir();
ret = objDir->zadd(dpp, &object, score, object_version, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Failed to add version to ordered set with error: " << ret << dendl;
+ blockDir->discard(dpp, y);
+ return ret;
+ }
+ //add an entry to ordered set containing objects for bucket listing, set score to 0 always to lexicographically order the objects
+ rgw::d4n::BucketDirectory* bucketDir = this->driver->get_bucket_dir();
+ ret = bucketDir->zadd(dpp, this->get_bucket()->get_bucket_id(), 0, this->get_name(), y, true);
if (ret < 0) {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Failed to add object to ordered set with error: " << ret << dendl;
return ret;
.bucketName = this->get_bucket()->get_bucket_id(),
.creationTime = std::to_string(ceph::real_clock::to_time_t(this->get_mtime())),
.dirty = dirty,
+ .etag = etag,
+ .size = this->get_accounted_size(),
+ .user_id = user_id,
+ .display_name = display_name,
};
version_object.hostsList.insert({ dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address });
cacheDriver.reset();
objDir.reset();
blockDir.reset();
+ bucketDir.reset();
policyDriver.reset();
next->shutdown();
bool objDirty = block.cacheObj.dirty;
auto blockDir = source->driver->get_block_dir();
auto objDir = source->driver->get_obj_dir();
+ auto bucketDir = source->driver->get_bucket_dir();
std::string version = source->get_object_version();
std::string objName = source->get_name();
// special handling for name starting with '_'
if (latest_block.version == block.version) {
std::vector<std::string> members;
//get the second latest version
- ret = objDir->zrevrange(dpp, &dir_obj, 0, 1, members, y);
+ ret = objDir->zrevrange(dpp, &dir_obj, std::to_string(0), std::to_string(1), members, y);
if (ret < 0) {
ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to get the second latest version for: " << dir_obj.objName << ", ret=" << ret << dendl;
return ret;
ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to delete latest entry in block directory, when it is the same as version requested, for: " << block.cacheObj.objName << ", ret=" << ret << dendl;
return ret;
}
+ //delete entry from ordered set of objects
+ ret = bucketDir->zrem(dpp, source->get_bucket()->get_bucket_id(), source->get_name(), y, true);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue zrem request in bucket directory for: " << source->get_name() << ", ret=" << ret << dendl;
+ return ret;
+ }
}
} //end-if latest_block.version == block.version
//delete versioned entry (handles delete markers also)
ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to delete head object in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl;
return ret;
}
- //delete entry from ordered set
+ //delete entry from ordered set of versions
std::string version = source->get_instance();
ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): Version to be deleted is: " << version << dendl;
ret = objDir->zrem(dpp, &dir_obj, version, y, true);
ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue delete head object in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl;
return ret;
}
- //dirty objects - delete from ordered set
+ //dirty objects - delete from ordered set of versions and objects
if (objDirty) {
rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{
.objName = source->get_name(),
.bucketName = source->get_bucket()->get_bucket_id(),
};
+ //delete entry from ordered set of object versions
ret = objDir->zrem(dpp, &dir_obj, "null", y, true);
if (ret < 0) {
ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue zrem request in object directory for: " << source->get_name() << ", ret=" << ret << dendl;
return ret;
}
+ //delete entry from ordered set of objects
+ ret = bucketDir->zrem(dpp, source->get_bucket()->get_bucket_id(), source->get_name(), y, true);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue zrem request in bucket directory for: " << source->get_name() << ", ret=" << ret << dendl;
+ blockDir->discard(dpp, y);
+ return ret;
+ }
}
} //end-if non-versioned buckets
std::unique_ptr<rgw::cache::CacheDriver> cacheDriver;
std::unique_ptr<rgw::d4n::ObjectDirectory> objDir;
std::unique_ptr<rgw::d4n::BlockDirectory> blockDir;
+ std::unique_ptr<rgw::d4n::BucketDirectory> bucketDir;
std::unique_ptr<rgw::d4n::PolicyDriver> policyDriver;
boost::asio::io_context& io_context;
uint64_t olh_epoch,
const std::string& unique_tag) override;
rgw::cache::CacheDriver* get_cache_driver() { return cacheDriver.get(); }
-
-
rgw::d4n::ObjectDirectory* get_obj_dir() { return objDir.get(); }
rgw::d4n::BlockDirectory* get_block_dir() { return blockDir.get(); }
+ rgw::d4n::BucketDirectory* get_bucket_dir() { return bucketDir.get(); }
rgw::d4n::PolicyDriver* get_policy_driver() { return policyDriver.get(); }
void shutdown() override;
};
class D4NFilterBucket : public FilterBucket {
private:
+ struct rgw_bucket_list_entries{
+ rgw_obj_key key;
+ uint16_t flags;
+ };
D4NFilterDriver* filter;
public:
virtual ~D4NFilterBucket() = default;
virtual std::unique_ptr<Object> get_object(const rgw_obj_key& key) override;
+ virtual int list(const DoutPrefixProvider* dpp, ListParams& params, int max,
+ ListResults& results, optional_yield y) override;
virtual int create(const DoutPrefixProvider* dpp,
const CreateParams& params,
optional_yield y) override;
io.run();
}
+TEST_F(BlockDirectoryFixture, ZScan)
+{
+ boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+ boost::system::error_code ec;
+ {
+ request req;
+ response<std::string> resp;
+ req.push("ZADD", "myzset", "0", "v1");
+ conn->async_exec(req, resp, yield[ec]);
+ ASSERT_EQ((bool)ec, false);
+ std::cout << "ZADD value: " << std::get<0>(resp).value() << std::endl;
+ }
+ {
+ request req;
+ response<std::string> resp;
+ req.push("ZADD", "myzset", "0", "v2");
+ conn->async_exec(req, resp, yield[ec]);
+ ASSERT_EQ((bool)ec, false);
+ std::cout << "ZADD value: " << std::get<0>(resp).value() << std::endl;
+ }
+ {
+ request req;
+ req.push("ZSCAN", "myzset", 0, "MATCH", "v*", "COUNT", 2);
+
+ boost::redis::generic_response resp;
+ conn->async_exec(req, resp, yield[ec]);
+ ASSERT_EQ((bool)ec, false);
+
+ std::vector<boost::redis::resp3::basic_node<std::__cxx11::basic_string<char> > > root_array;
+ if (resp.has_value()) {
+ root_array = resp.value();
+ std::cout << "ZADD aggregate size is: " << root_array.size() << std::endl;
+ auto size = root_array.size();
+ if (size >= 2) {
+ //Nothing of interest at index 0, index 1 has the next cursor value
+ std::string new_cursor = root_array[1].value;
+
+ //skip the first 3 values to get the actual member, score
+ for (uint64_t i = 3; i < size; i = i+2) {
+ std::string member = root_array[i].value;
+ std::cout << "ZADD member: " << member << std::endl;
+ }
+ }
+ }
+ }
+ {
+ boost::system::error_code ec;
+ request req;
+ req.push("FLUSHALL");
+ response<boost::redis::ignore_t> resp;
+
+ conn->async_exec(req, resp, yield[ec]);
+ }
+
+ conn->cancel();
+ }, rethrow);
+
+ io.run();
+}
+
+template<typename T, typename Seq>
+struct expander;
+
+template<typename T, std::size_t... Is>
+struct expander<T, std::index_sequence<Is...>> {
+ template<typename E, std::size_t>
+ using elem = E;
+
+ using type = boost::redis::response<elem<T, Is>...>;
+};
+
+template <size_t N, class Type>
+struct my_tuple
+{
+ using type = typename expander<Type, std::make_index_sequence<N>>::type;
+};
+
+template <typename Integer, Integer ...I, typename F>
+constexpr void constexpr_for_each(std::integer_sequence<Integer, I...>, F &&func)
+{
+ (func(std::integral_constant<Integer, I>{}) , ...);
+}
+
+template <auto N, typename F>
+constexpr void constexpr_for(F &&func)
+{
+ if constexpr (N > 0)
+ {
+ constexpr_for_each(std::make_integer_sequence<decltype(N), N>{}, std::forward<F>(func));
+ }
+}
+
+template <typename T>
+void foo(T t, std::vector<std::vector<std::string>>& responses)
+{
+ constexpr_for<std::tuple_size_v<T>>([&](auto index)
+ {
+ constexpr auto i = index.value;
+ std::vector<std::string> empty_vector;
+ if (std::get<i>(t).value().has_value()) {
+ if (std::get<i>(t).value().value().empty()) {
+ responses.emplace_back(empty_vector);
+ std::cout << "Empty value for i: " << i << std::endl;
+ } else {
+ responses.emplace_back(std::get<i>(t).value().value());
+ }
+ } else {
+ std::cout << "No value for i: " << i << std::endl;
+ responses.emplace_back(empty_vector);
+ }
+ });
+}
+
+TEST_F(BlockDirectoryFixture, Pipeline)
+{
+ boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+ boost::system::error_code ec;
+ {
+ request req;
+ response<boost::redis::ignore_t> resp;
+ req.push("HSET", "testkey1", "name", "abc");
+ conn->async_exec(req, resp, yield[ec]);
+ ASSERT_EQ((bool)ec, false);
+ }
+ {
+ request req;
+ response<boost::redis::ignore_t> resp;
+ req.push("HSET", "testkey2", "name", "def");
+ conn->async_exec(req, resp, yield[ec]);
+ ASSERT_EQ((bool)ec, false);
+ }
+ {
+ std::vector<std::string> fields;
+ fields.push_back("name");
+ request req;
+ req.push_range("HMGET", "testkey1", fields);
+ req.push_range("HMGET", "abc", fields);
+
+ ASSERT_EQ(req.get_commands(), 2);
+ //using template parameterization in case we need to read responses for large numebr of elements (1000 elements)
+ my_tuple<5, std::optional<std::vector<std::string>>>::type resp;
+ conn->async_exec(req, resp, yield[ec]);
+ ASSERT_EQ((bool)ec, false);
+ std::vector<std::vector<std::string>> responses;
+ foo<decltype(resp)>(resp, responses);
+ for (auto vec : responses) {
+ if (!vec.empty()) {
+ std::cout << "HMGET: " << vec[0] << std::endl;
+ }
+ }
+ }
+ {
+ boost::system::error_code ec;
+ request req;
+ req.push("FLUSHALL");
+ response<boost::redis::ignore_t> resp;
+
+ conn->async_exec(req, resp, yield[ec]);
+ }
+
+ conn->cancel();
+ }, rethrow);
+
+ io.run();
+}
+
int main(int argc, char *argv[]) {
::testing::InitGoogleTest(&argc, argv);