From: Pritha Srivastava Date: Fri, 23 Aug 2024 13:50:03 +0000 (-0400) Subject: rgw/d4n: implementation of list objects for a bucket X-Git-Tag: v20.3.0~8^2~6 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=576e3a92185f672b59d90fc79664dd607bb25757;p=ceph.git rgw/d4n: implementation of list objects for a bucket (ordered listing) and list-object-versions. Changes include the addition of an ordered set for each bucket that stores the object, addition of etag and object size to CacheObj structure needed for listing objects, and addition of a test case to see the usage of pipelining in Redis. Objects are fetched from the cache and the backend store and are then merged, before returning the final list. Co-authored-by: sinashan Added code for bucket listing using ObjectDirectory(this has been removed) Co-authored-by: Samarah Fixed log levels in BucketDirectory and ObjectDirectory methods. Added enum for field retrieval in Directory get methods. Signed-off-by: Pritha Srivastava --- diff --git a/src/rgw/driver/d4n/d4n_directory.cc b/src/rgw/driver/d4n/d4n_directory.cc index 04a760d0f2ad..925d8ba3d705 100644 --- a/src/rgw/driver/d4n/d4n_directory.cc +++ b/src/rgw/driver/d4n/d4n_directory.cc @@ -71,6 +71,169 @@ int check_bool(std::string str) { } } +int BucketDirectory::zadd(const DoutPrefixProvider* dpp, const std::string& bucket_id, double score, const std::string& member, optional_yield y, bool multi) +{ + try { + boost::system::error_code ec; + request req; + req.push("ZADD", bucket_id, "CH", std::to_string(0), member); + + response resp; + redis_exec(conn, ec, req, resp, y); + + if (ec) { + ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + + if (!multi) { + if (std::get<0>(resp).value() != "1") { + ldpp_dout(dpp, 10) << "BucketDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl; + return -ENOENT; + } + } + + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + + return 0; + +} + +int BucketDirectory::zrem(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& member, optional_yield y, bool multi) +{ + try { + boost::system::error_code ec; + request req; + req.push("ZREM", bucket_id, member); + response resp; + + redis_exec(conn, ec, req, resp, y); + + if (ec) { + ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + + if (!multi) { + if (std::get<0>(resp).value() != "1") { + ldpp_dout(dpp, 10) << "BucketDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl; + return -ENOENT; + } + } + + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + + return 0; +} + +int BucketDirectory::zrange(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& start, const std::string& stop, uint64_t offset, uint64_t count, std::vector& members, optional_yield y) +{ + try { + boost::system::error_code ec; + request req; + if (offset == 0 && count == 0) { + req.push("ZRANGE", bucket_id, start, stop, "bylex"); + } else { + req.push("ZRANGE", bucket_id, start, stop, "bylex", "LIMIT", offset, count); + } + + response > resp; + redis_exec(conn, ec, req, resp, y); + + if (ec) { + ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + + if (std::get<0>(resp).value().empty()) { + ldpp_dout(dpp, 10) << "BucketDirectory::" << __func__ << "() Empty response" << dendl; + return -ENOENT; + } + + members = std::get<0>(resp).value(); + + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + + return 0; +} + +int BucketDirectory::zscan(const DoutPrefixProvider* dpp, const std::string& bucket_id, uint64_t cursor, const std::string& pattern, uint64_t count, std::vector& members, uint64_t next_cursor, optional_yield y) +{ + try { + boost::system::error_code ec; + request req; + + req.push("ZSCAN", bucket_id, cursor, "MATCH", pattern, "COUNT", count); + + boost::redis::generic_response resp; + redis_exec(conn, ec, req, resp, y); + + if (ec) { + ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + + std::vector > > root_array; + if (resp.has_value()) { + root_array = resp.value(); + ldpp_dout(dpp, 20) << "BucketDirectory::" << __func__ << "() aggregate size is: " << root_array.size() << dendl; + auto size = root_array.size(); + if (size >= 2) { + //Nothing of interest at index 0, index 1 has the next cursor value + next_cursor = std::stoull(root_array[1].value); + + //skip the first 3 values to get the actual member, score + for (uint64_t i = 3; i < size; i = i+2) { + members.emplace_back(root_array[i].value); + ldpp_dout(dpp, 20) << "BucketDirectory::" << __func__ << "() member is: " << root_array[i].value << dendl; + } + } + } else { + return -ENOENT; + } + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + + return 0; +} + +int BucketDirectory::zrank(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& member, uint64_t& rank, optional_yield y) +{ + try { + boost::system::error_code ec; + request req; + + req.push("ZRANK", bucket_id, member); + + response resp; + redis_exec(conn, ec, req, resp, y); + + if (ec) { + ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + + rank = std::get<0>(resp).value(); + + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + + return 0; +} + std::string ObjectDirectory::build_index(CacheObj* object) { return object->bucketName + "_" + object->objName; @@ -137,7 +300,15 @@ int ObjectDirectory::set(const DoutPrefixProvider* dpp, CacheObj* object, option if (!endpoint.empty()) endpoint.pop_back(); - redisValues.push_back(endpoint); + redisValues.push_back(endpoint); + redisValues.push_back("etag"); + redisValues.push_back(object->etag); + redisValues.push_back("objSize"); + redisValues.push_back(std::to_string(object->size)); + redisValues.push_back("userId"); + redisValues.push_back(object->user_id); + redisValues.push_back("displayName"); + redisValues.push_back(object->display_name); try { boost::system::error_code ec; @@ -170,6 +341,10 @@ int ObjectDirectory::get(const DoutPrefixProvider* dpp, CacheObj* object, option fields.push_back("creationTime"); fields.push_back("dirty"); fields.push_back("hosts"); + fields.push_back("etag"); + fields.push_back("objSize"); + fields.push_back("userId"); + fields.push_back("displayName"); try { boost::system::error_code ec; @@ -185,15 +360,20 @@ int ObjectDirectory::get(const DoutPrefixProvider* dpp, CacheObj* object, option } if (std::get<0>(resp).value().empty()) { - ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "(): No values returned." << dendl; + ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "(): No values returned." << dendl; return -ENOENT; } - object->objName = std::get<0>(resp).value()[0]; - object->bucketName = std::get<0>(resp).value()[1]; - object->creationTime = std::get<0>(resp).value()[2]; - object->dirty = (std::stoi(std::get<0>(resp).value()[3]) != 0); - boost::split(object->hostsList, std::get<0>(resp).value()[4], boost::is_any_of("_")); + using Fields = rgw::d4n::ObjectFields; + object->objName = std::get<0>(resp).value()[std::size_t(Fields::ObjName)]; + object->bucketName = std::get<0>(resp).value()[std::size_t(Fields::BucketName)]; + object->creationTime = std::get<0>(resp).value()[std::size_t(Fields::CreationTime)]; + object->dirty = (std::stoi(std::get<0>(resp).value()[std::size_t(Fields::Dirty)]) != 0); + boost::split(object->hostsList, std::get<0>(resp).value()[std::size_t(Fields::Hosts)], boost::is_any_of("_")); + object->etag = std::get<0>(resp).value()[std::size_t(Fields::Etag)]; + object->size = std::stoull(std::get<0>(resp).value()[std::size_t(Fields::ObjSize)]); + object->user_id = std::get<0>(resp).value()[std::size_t(Fields::UserID)]; + object->display_name = std::get<0>(resp).value()[std::size_t(Fields::DisplayName)]; } catch (std::exception &e) { ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; return -EINVAL; @@ -233,7 +413,7 @@ int ObjectDirectory::copy(const DoutPrefixProvider* dpp, CacheObj* object, const if (std::get<0>(std::get<3>(resp).value()).value().value() == 1) { return 0; } else { - ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "(): No values copied." << dendl; + ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "(): No values copied." << dendl; return -ENOENT; } } catch (std::exception &e) { @@ -256,7 +436,7 @@ int ObjectDirectory::del(const DoutPrefixProvider* dpp, CacheObj* object, option redis_exec(conn, ec, req, resp, y); if (!std::get<0>(resp).value()) { - ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "(): No values deleted." << dendl; + ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "(): No values deleted." << dendl; return -ENOENT; } @@ -350,7 +530,7 @@ int ObjectDirectory::zadd(const DoutPrefixProvider* dpp, CacheObj* object, doubl if (!multi) { if (std::get<0>(resp).value() != "1") { - ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl; + ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl; return -ENOENT; } } @@ -381,7 +561,7 @@ int ObjectDirectory::zrange(const DoutPrefixProvider* dpp, CacheObj* object, int } if (std::get<0>(resp).value().empty()) { - ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Empty response" << dendl; + ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "() Empty response" << dendl; return -ENOENT; } @@ -395,13 +575,13 @@ int ObjectDirectory::zrange(const DoutPrefixProvider* dpp, CacheObj* object, int return 0; } -int ObjectDirectory::zrevrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector& members, optional_yield y) +int ObjectDirectory::zrevrange(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& start, const std::string& stop, std::vector& members, optional_yield y) { std::string key = build_index(object); try { boost::system::error_code ec; request req; - req.push("ZREVRANGE", key, std::to_string(start), std::to_string(stop)); + req.push("ZREVRANGE", key, start, stop); response > resp; redis_exec(conn, ec, req, resp, y); @@ -439,7 +619,7 @@ int ObjectDirectory::zrem(const DoutPrefixProvider* dpp, CacheObj* object, const if (!multi) { if (std::get<0>(resp).value() != "1") { - ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl; + ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl; return -ENOENT; } } @@ -470,7 +650,7 @@ int ObjectDirectory::zremrangebyscore(const DoutPrefixProvider* dpp, CacheObj* o if (!multi) { if (std::get<0>(resp).value() == "0") { - ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() No element removed!" << dendl; + ldpp_dout(dpp, 10) << "ObjectDirectory::" << __func__ << "() No element removed!" << dendl; return -ENOENT; } } @@ -511,6 +691,31 @@ int ObjectDirectory::incr(const DoutPrefixProvider* dpp, CacheObj* object, optio return value; } +int ObjectDirectory::zrank(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, std::string& index, optional_yield y) +{ + std::string key = build_index(object); + try { + boost::system::error_code ec; + request req; + req.push("ZRANK", key, member); + response resp; + + redis_exec(conn, ec, req, resp, y); + + if (ec) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + + index = std::get<0>(resp).value(); + + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + return 0; +} + std::string BlockDirectory::build_index(CacheBlock* block) { return block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + std::to_string(block->blockID) + "_" + std::to_string(block->size); @@ -596,6 +801,14 @@ int BlockDirectory::set(const DoutPrefixProvider* dpp, CacheBlock* block, option hosts.pop_back(); redisValues.push_back(hosts); + redisValues.push_back("etag"); + redisValues.push_back(block->cacheObj.etag); + redisValues.push_back("objSize"); + redisValues.push_back(std::to_string(block->cacheObj.size)); + redisValues.push_back("userId"); + redisValues.push_back(block->cacheObj.user_id); + redisValues.push_back("displayName"); + redisValues.push_back(block->cacheObj.display_name); try { boost::system::error_code ec; @@ -616,6 +829,134 @@ int BlockDirectory::set(const DoutPrefixProvider* dpp, CacheBlock* block, option return 0; } +template +struct expander; + +template +struct expander> { +template +using elem = E; + +using type = boost::redis::response...>; +}; + +template +struct redis_response +{ + using type = typename expander>::type; +}; + +template +constexpr void constexpr_for_each(std::integer_sequence, F &&func) +{ + (func(std::integral_constant{}) , ...); +} + +template +constexpr void constexpr_for(F &&func) +{ + if constexpr (N > 0) + { + constexpr_for_each(std::make_integer_sequence{}, std::forward(func)); + } +} + +template +void parse_response(T t, std::vector>& responses) +{ + constexpr_for>([&](auto index) + { + std::vector empty_vector; + constexpr auto i = index.value; + if (std::get(t).value().has_value()) { + if (std::get(t).value().value().empty()) { + responses.emplace_back(empty_vector); + } else { + responses.emplace_back(std::get(t).value().value()); + } + } else { + responses.emplace_back(empty_vector); + } + }); +} + +int BlockDirectory::get(const DoutPrefixProvider* dpp, std::vector& blocks, optional_yield y) +{ + request req; + redis_response<100, std::optional>>::type resp; + for (auto block : blocks) { + std::string key = build_index(&block); + std::vector fields; + ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): index is: " << key << dendl; + + fields.push_back("blockID"); + fields.push_back("version"); + fields.push_back("deleteMarker"); + fields.push_back("size"); + fields.push_back("globalWeight"); + + fields.push_back("objName"); + fields.push_back("bucketName"); + fields.push_back("creationTime"); + fields.push_back("dirty"); + fields.push_back("hosts"); + fields.push_back("etag"); + fields.push_back("objSize"); + fields.push_back("userId"); + fields.push_back("displayName"); + + try { + req.push_range("HMGET", key, fields); + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + } //end - for + + try { + boost::system::error_code ec; + redis_exec(conn, ec, req, resp, y); + + if (ec) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + + std::vector> responses; + parse_response(resp, responses); + + for (size_t i = 0; i < blocks.size(); i++) { + CacheBlock* block = &blocks[i]; + auto vec = responses[i]; + if (vec.empty()) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "(): No values returned for key=" << build_index(block) << dendl; + continue; + } + + using Fields = rgw::d4n::BlockFields; + block->blockID = std::stoull(vec[std::size_t(Fields::BlockID)]); + block->version = vec[std::size_t(Fields::Version)]; + block->deleteMarker = (std::stoi(vec[std::size_t(Fields::DeleteMarker)]) != 0); + block->size = std::stoull(vec[std::size_t(Fields::Size)]); + block->globalWeight = std::stoull(vec[std::size_t(Fields::GlobalWeight)]); + block->cacheObj.objName = vec[std::size_t(Fields::ObjName)]; + block->cacheObj.bucketName = vec[std::size_t(Fields::BucketName)]; + block->cacheObj.creationTime = vec[std::size_t(Fields::CreationTime)]; + block->cacheObj.dirty = (std::stoi(vec[std::size_t(Fields::Dirty)]) != 0); + boost::split(block->cacheObj.hostsList, vec[std::size_t(Fields::Hosts)], boost::is_any_of("_")); + block->cacheObj.etag = vec[std::size_t(Fields::Etag)]; + block->cacheObj.size = std::stoull(vec[std::size_t(Fields::ObjSize)]); + block->cacheObj.user_id = vec[std::size_t(Fields::UserID)]; + block->cacheObj.display_name = vec[std::size_t(Fields::DisplayName)]; + } + + return 0; +} + int BlockDirectory::get(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y) { std::string key = build_index(block); @@ -633,6 +974,10 @@ int BlockDirectory::get(const DoutPrefixProvider* dpp, CacheBlock* block, option fields.push_back("creationTime"); fields.push_back("dirty"); fields.push_back("hosts"); + fields.push_back("etag"); + fields.push_back("objSize"); + fields.push_back("userId"); + fields.push_back("displayName"); try { boost::system::error_code ec; @@ -662,6 +1007,10 @@ int BlockDirectory::get(const DoutPrefixProvider* dpp, CacheBlock* block, option block->cacheObj.creationTime = std::get<0>(resp).value().value()[7]; block->cacheObj.dirty = (std::stoi(std::get<0>(resp).value().value()[8]) != 0); boost::split(block->cacheObj.hostsList, std::get<0>(resp).value().value()[9], boost::is_any_of("_")); + block->cacheObj.etag = std::get<0>(resp).value().value()[10]; + block->cacheObj.size = std::stoull(std::get<0>(resp).value().value()[11]); + block->cacheObj.user_id = std::get<0>(resp).value().value()[12]; + block->cacheObj.display_name = std::get<0>(resp).value().value()[13]; } catch (std::exception &e) { ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; return -EINVAL; diff --git a/src/rgw/driver/d4n/d4n_directory.h b/src/rgw/driver/d4n/d4n_directory.h index 8319ba93587a..7c5a84d9024e 100644 --- a/src/rgw/driver/d4n/d4n_directory.h +++ b/src/rgw/driver/d4n/d4n_directory.h @@ -14,12 +14,45 @@ using boost::redis::request; using boost::redis::response; using boost::redis::ignore_t; +enum class ObjectFields { // Fields stored in object directory + ObjName, + BucketName, + CreationTime, + Dirty, + Hosts, + Etag, + ObjSize, + UserID, + DisplayName +}; + +enum class BlockFields { // Fields stored in block directory + BlockID, + Version, + DeleteMarker, + Size, + GlobalWeight, + ObjName, + BucketName, + CreationTime, + Dirty, + Hosts, + Etag, + ObjSize, + UserID, + DisplayName +}; + struct CacheObj { std::string objName; /* S3 object name */ std::string bucketName; /* S3 bucket name */ std::string creationTime; /* Creation time of the S3 Object */ bool dirty{false}; std::unordered_set hostsList; /* List of hostnames of object locations for multiple backends */ + std::string etag; //etag needed for list objects + uint64_t size; //total object size (and not block size), needed for list objects + std::string user_id; // id of user, needed for list object versions + std::string display_name; // display name of owner, needed for list object versions }; struct CacheBlock { @@ -37,6 +70,19 @@ class Directory { Directory() {} }; +class BucketDirectory: public Directory { + public: + BucketDirectory(std::shared_ptr& conn) : conn(conn) {} + int zadd(const DoutPrefixProvider* dpp, const std::string& bucket_id, double score, const std::string& member, optional_yield y, bool multi=false); + int zrem(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& member, optional_yield y, bool multi=false); + int zrange(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& start, const std::string& stop, uint64_t offset, uint64_t count, std::vector& members, optional_yield y); + int zscan(const DoutPrefixProvider* dpp, const std::string& bucket_id, uint64_t cursor, const std::string& pattern, uint64_t count, std::vector& members, uint64_t next_cursor, optional_yield y); + int zrank(const DoutPrefixProvider* dpp, const std::string& bucket_id, const std::string& member, uint64_t& rank, optional_yield y); + + private: + std::shared_ptr conn; +}; + class ObjectDirectory: public Directory { public: ObjectDirectory(std::shared_ptr& conn) : conn(conn) {} @@ -50,9 +96,10 @@ class ObjectDirectory: public Directory { int update_field(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& field, std::string& value, optional_yield y); int zadd(const DoutPrefixProvider* dpp, CacheObj* object, double score, const std::string& member, optional_yield y, bool multi=false); int zrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector& members, optional_yield y); - int zrevrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector& members, optional_yield y); + int zrevrange(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& start, const std::string& stop, std::vector& members, optional_yield y); int zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y, bool multi=false); int zremrangebyscore(const DoutPrefixProvider* dpp, CacheObj* object, double min, double max, optional_yield y, bool multi=false); + int zrank(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, std::string& index, optional_yield y); //Return value is the incremented value, else return error int incr(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y); @@ -70,6 +117,8 @@ class BlockDirectory: public Directory { int set(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y); int get(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y); + //Pipelined version of get for list bucket + int get(const DoutPrefixProvider* dpp, std::vector& blocks, optional_yield y); int copy(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& copyName, const std::string& copyBucketName, optional_yield y); int del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y, bool multi=false); int update_field(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& field, std::string& value, optional_yield y); diff --git a/src/rgw/driver/d4n/d4n_policy.cc b/src/rgw/driver/d4n/d4n_policy.cc index 9ab9bf3d6e3c..18a9e3b29df2 100644 --- a/src/rgw/driver/d4n/d4n_policy.cc +++ b/src/rgw/driver/d4n/d4n_policy.cc @@ -870,7 +870,7 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp) if (op_ret < 0) { ldpp_dout(dpp, 0) << __func__ << "(): Failed to get latest entry in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl; } else { - // if this entry is the latest, it could have been overwritten by a newer one + // if this entry is not the latest, it could have been overwritten by a newer one if (block.version == e->version) { rgw::d4n::CacheBlock null_block; null_block = block; @@ -878,7 +878,7 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp) //hash entry for null block op_ret = blockDir->get(dpp, &null_block, y); if (op_ret < 0) { - ldpp_dout(dpp, 0) << __func__ << "(): Failed to get latest entry in block directory for: " << null_block.cacheObj.objName << ", ret=" << ret << dendl; + ldpp_dout(dpp, 0) << __func__ << "(): Failed to get latest entry in block directory for: " << null_block.cacheObj.objName << ", ret=" << ret << dendl; } else { if (null_block.version == e->version) { block.cacheObj.dirty = false; @@ -931,14 +931,21 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp) ldpp_dout(dpp, 0) << __func__ << "(): Failed to get latest entry in block directory for: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl; } if (latest_block.version == e->version) { - //remove object entry from ordered set + //remove object entry from ordered set of versions if (c_obj->have_instance()) { - blockDir->del(dpp, &latest_block, y); - if (ret < 0) { - ldpp_dout(dpp, 0) << __func__ << "(): Failed to queue del for latest hash entry: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl; - continue; + blockDir->del(dpp, &latest_block, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << __func__ << "(): Failed to queue del for latest hash entry: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl; + continue; } } + //delete entry from ordered set of objects, as older versions would have been written to the backend store + ret = bucketDir->zrem(dpp, e->bucket_id, c_obj->get_name(), y, true); + if (ret < 0) { + blockDir->discard(dpp, y); + ldpp_dout(dpp, 0) << __func__ << "(): Failed to queue zrem for object entry: " << c_obj->get_name() << ", ret=" << ret << dendl; + continue; + } } ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << std::setprecision(std::numeric_limits::max_digits10) << e->creationTime << " from ordered set" << dendl; rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{ @@ -1002,7 +1009,7 @@ void LRUPolicy::update(const DoutPrefixProvider* dpp, const std::string& key, ui } void LRUPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, -const rgw_obj_key& obj_key, optional_yield y, std::string& restore_val) +const rgw_obj_key& obj_key, uint8_t op, optional_yield y, std::string& restore_val) { const std::lock_guard l(lru_lock); ObjEntry* e = new ObjEntry(key, version, deleteMarker, size, creationTime, user, etag, bucket_name, bucket_id, obj_key); diff --git a/src/rgw/driver/d4n/d4n_policy.h b/src/rgw/driver/d4n/d4n_policy.h index 33c6c5fc0b3a..3677d19ff92c 100644 --- a/src/rgw/driver/d4n/d4n_policy.h +++ b/src/rgw/driver/d4n/d4n_policy.h @@ -156,6 +156,7 @@ class LFUDAPolicy : public CachePolicy { std::shared_ptr conn; BlockDirectory* blockDir; ObjectDirectory* objDir; + BucketDirectory* bucketDir; rgw::cache::CacheDriver* cacheDriver; std::optional rthread_timer; rgw::sal::Driver* driver; @@ -187,9 +188,11 @@ class LFUDAPolicy : public CachePolicy { { blockDir = new BlockDirectory{conn}; objDir = new ObjectDirectory{conn}; + bucketDir = new BucketDirectory{conn}; } ~LFUDAPolicy() { rthread_stop(); + delete bucketDir; delete blockDir; delete objDir; std::lock_guard l(lfuda_cleaning_lock); @@ -240,9 +243,9 @@ class LRUPolicy : public CachePolicy { virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override; virtual bool update_refcount_if_key_exists(const DoutPrefixProvider* dpp, const std::string& key, uint8_t op, optional_yield y) override { return false; } virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, uint8_t op, optional_yield y, std::string& restore_val=empty) override; - virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, - time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, - const rgw_obj_key& obj_key, uint8_t op, optional_yield y, std::string& restore_val=empty) override; + virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, + time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, + const rgw_obj_key& obj_key, uint8_t op, optional_yield y, std::string& restore_val=empty) override; virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override; virtual bool erase_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override; virtual bool invalidate_dirty_object(const DoutPrefixProvider* dpp, const std::string& key) override { return false; } diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.cc b/src/rgw/driver/d4n/rgw_sal_d4n.cc index af31d2479371..2e6b2d0d687d 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.cc +++ b/src/rgw/driver/d4n/rgw_sal_d4n.cc @@ -57,6 +57,7 @@ int D4NFilterDriver::initialize(CephContext *cct, const DoutPrefixProvider *dpp) conn = std::make_shared(boost::asio::make_strand(io_context)); objDir = std::make_unique(conn); blockDir = std::make_unique(conn); + bucketDir = std::make_unique(conn); policyDriver = std::make_unique(conn, cacheDriver.get(), "lfuda"); @@ -117,6 +118,487 @@ int D4NFilterBucket::create(const DoutPrefixProvider* dpp, return next->create(dpp, params, y); } +int D4NFilterBucket::list(const DoutPrefixProvider* dpp, ListParams& params, int max, + ListResults& results, optional_yield y) +{ + ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " params.marker.name: " << params.marker.name << dendl; + ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " params.marker.instance: " << params.marker.instance << dendl; + ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " params.end_marker.key: " << params.end_marker.name << dendl; + ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " max: " << max << dendl; + + if (max == 0) { + return 0; + } + + //Get objects from cache + auto bucketDir = this->filter->get_bucket_dir(); + auto objDir = this->filter->get_obj_dir(); + std::vector objects; + std::vector entries; + + ListResults cache_results; + ListResults store_results; + if (g_conf()->d4n_writecache_enabled) { + cache_results.is_truncated = false; + if (!params.prefix.empty()) { + std::string pattern = params.prefix + "*"; + /* zscan does not always take into account COUNT as smaller sizes of ordered set are stored as one sequential blob of memory + so the entire set might be returned regarless of value of COUNT. + also valid values of cursor are only zero during start of iteration and cursor returned by previous zscan call + Refer: https://valkey.io/commands/scan/ */ + uint64_t cursor = 0, next_cursor = 0; + int num_objs = 0; + bool instance_marker_processed = false; + std::string last_version; + do { + std::vector temp_objects; + auto ret = bucketDir->zscan(dpp, this->get_bucket_id(), cursor, pattern, (max + 1), temp_objects, next_cursor, y); + if (ret < 0 && ret != -ENOENT) { + ldpp_dout(dpp, 0) << "D4NFilterBucket::" << __func__ << " zscan failed with ret: " << ret << dendl; + return ret; + } + //filter elements before marker (exclude marker from output) + std::string last_element_processed; + for (auto it = temp_objects.begin(); it != temp_objects.end(); it++) { + last_element_processed = *it; + if (!params.marker.name.empty() && *it <= params.marker.name) { + if (*it != params.marker.name || !params.list_versions || params.marker.instance.empty()) { + continue; + } + } else { + auto pos = it->find(params.delim, params.prefix.length()); + if (!params.delim.empty() && pos != std::string::npos) { + std::string delim_str = it->erase((pos + 1), (it->length() - 1)); + ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " delim_str: " << delim_str << dendl; + if (cache_results.common_prefixes.find(delim_str) == cache_results.common_prefixes.end()) { + cache_results.common_prefixes.emplace(std::make_pair(delim_str, true)); + store_results.common_prefixes.emplace(std::make_pair(delim_str, true)); + num_objs++; //all objects under a common prefix are counted as one + } + } else { + std::vector temp_versions; + //if params.list_versions is given, get the versions of the object + if (params.list_versions) { + std::string objName = *it; + // special handling for name starting with '_' + if (objName[0] == '_') { + objName = "_" + *it; + } + rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{ + .objName = objName, + .bucketName = this->get_bucket_id(), + }; + std::string start; + if (params.marker.instance.empty() || instance_marker_processed || params.marker.name.empty()) { + start = "0"; + } else { + if (!instance_marker_processed) { + std::string member = params.marker.instance; + std::string index; + auto ret = objDir->zrank(dpp, &dir_obj, member, index, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << "D4NFilterBucket::" << __func__ << " zrank failed with: " << ret << dendl; + return ret; + } + ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " start: " << index << dendl; + start = index; + instance_marker_processed = true; + } + } + auto ret = objDir->zrevrange(dpp, &dir_obj, start, "-1", temp_versions, y); + if (ret < 0 && ret != -ENOENT) { + ldpp_dout(dpp, 0) << "D4NFilterBucket::" << __func__ << " zrevrange failed with ret: " << ret << dendl; + return ret; + } + } //params.list_version + if (!temp_versions.empty()) { + for (auto version_it = temp_versions.begin(); version_it != temp_versions.end(); version_it++) { + std::string version = *(version_it); + if (std::next(version_it) == temp_versions.end()) { + last_version = *version_it; + } + rgw_bucket_list_entries entry; + entry.flags = rgw_bucket_dir_entry::FLAG_VER; + if (version_it == temp_versions.begin()) { + entry.flags |= rgw_bucket_dir_entry::FLAG_CURRENT; + } else { + entry.flags |= rgw_bucket_dir_entry::FLAG_VER_MARKER; + } + rgw_obj_key key{std::move(*it), std::move(*version_it)}; + entry.key = std::move(key); + entries.emplace_back(entry); + num_objs++; + if (num_objs == max) { + if (std::next(version_it) != temp_versions.end()) { + cache_results.is_truncated = true; + cache_results.next_marker.instance = version; + cache_results.next_marker.name = last_element_processed; + } + break; //break from the 'for' loop that processes temp_versions + } + } + } else { + rgw_bucket_list_entries entry; + entry.key.name = std::move(*it); + entries.emplace_back(entry); + num_objs++; + } + } + } + if (num_objs == max) { + if (std::next(it) != temp_objects.end()) { + if (cache_results.next_marker.empty()) { + cache_results.is_truncated = true; + cache_results.next_marker.name = last_element_processed; + if (cache_results.next_marker.instance.empty() && params.list_versions) { + cache_results.next_marker.instance = last_version; + } + } + break; //break from the 'for' loop that processes temp_objects + } + } + } + /* break if next_cursor is 0 which means end of ordered set or if no entries are found in ordered set + or if num_objs after filtering is equal to max */ + if ((next_cursor == 0) || (ret == -ENOENT) || (num_objs == max)) { + if ((num_objs == max) && (next_cursor != 0)) { + if (cache_results.next_marker.empty()) { + cache_results.is_truncated = true; + cache_results.next_marker.name = entries[(max - 1)].key.name; + } + } + break; + } + cursor = next_cursor; + } while(next_cursor != 0); + } else { //no prefix is specified + std::string start; + if (params.marker.name.empty()) { + start = "-"; + } else { + if (!params.marker.instance.empty()) { + start = "[" + params.marker.name; + } else { + start = "(" + params.marker.name; + } + } + int num_objs = 0; + bool instance_marker_processed = false; + std::string last_version; + do { + std::vector temp_objects; + ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " zrange start is: " << start << dendl; + auto ret = bucketDir->zrange(dpp, this->get_bucket_id(), start, "+", 0, (max + 1), temp_objects, y); + if (ret < 0 && ret != -ENOENT) { + ldpp_dout(dpp, 0) << "D4NFilterBucket::" << __func__ << " zrange failed with ret: " << ret << dendl; + return ret; + } + ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " params.delim: " << params.delim << dendl; + std::string last_element_processed; + for (auto it = temp_objects.begin(); it != temp_objects.end(); it++) { + last_element_processed = *it; + auto pos = it->find(params.delim); + if (!params.delim.empty() && pos != std::string::npos) { + std::string delim_str = it->erase((pos + 1), (it->length() - 1)); + ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " delim_str: " << delim_str << dendl; + if (cache_results.common_prefixes.find(delim_str) == cache_results.common_prefixes.end()) { + cache_results.common_prefixes.emplace(std::make_pair(delim_str, true)); + store_results.common_prefixes.emplace(std::make_pair(delim_str, true)); + num_objs++; //all objects under a common prefix are counted as one + if (num_objs == max) { + uint64_t cursor = 0, next_cursor = 0; + //get all the keys matching with delim_str so as to find the last element which will be the next marker + do { + std::vector delim_objects; + std::string delim_pattern = delim_str + "*"; + auto ret = bucketDir->zscan(dpp, this->get_bucket_id(), cursor, delim_pattern, 10, delim_objects, next_cursor, y); + if (ret < 0 && ret != -ENOENT) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << " zscan failed with ret: " << ret << dendl; + return ret; + } + if ((next_cursor == 0) || (ret == -ENOENT)) { + if (!delim_objects.empty()) { + cache_results.next_marker.name = delim_objects.back(); + std::string start = "(" + cache_results.next_marker.name; + std::vector one_object; + auto ret = bucketDir->zrange(dpp, this->get_bucket_id(), start, "+", 0, 1, one_object, y); + if (ret < 0 && ret != -ENOENT) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << " zrange failed with ret: " << ret << dendl; + return ret; + } + if (one_object.size() >= 1) { + cache_results.is_truncated = true; + } else { + cache_results.is_truncated = false; + } + } + break; + } + cursor = next_cursor; + next_cursor = 0; + } while(next_cursor != 0); + } //end-if num_objs == max + } //end-if cache_results.common_prefixes + } else { + std::vector temp_versions; + //if params.list_versions is given, get the versions of the object + if (params.list_versions) { + std::string objName = *it; + // special handling for name starting with '_' + if (objName[0] == '_') { + objName = "_" + *it; + } + rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{ + .objName = objName, + .bucketName = this->get_bucket_id(), + }; + std::string start; + if (params.marker.instance.empty() || instance_marker_processed || params.marker.name.empty()) { + start = "0"; + } else { + if (!instance_marker_processed) { + std::string member = params.marker.instance; + std::string index; + auto ret = objDir->zrank(dpp, &dir_obj, member, index, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << "D4NFilterBucket::" << __func__ << " zrank failed with: " << ret << dendl; + return ret; + } + ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " start: " << index << dendl; + start = index; + instance_marker_processed = true; + } + } + auto ret = objDir->zrevrange(dpp, &dir_obj, start, "-1", temp_versions, y); + if (ret < 0 && ret != -ENOENT) { + ldpp_dout(dpp, 0) << "D4NFilterBucket::" << __func__ << " zrevrange failed with ret: " << ret << dendl; + return ret; + } + } //params.list_version + if (!temp_versions.empty()) { + for (auto version_it = temp_versions.begin(); version_it != temp_versions.end(); version_it++) { + std::string version = *(version_it); + if (std::next(version_it) == temp_versions.end()) { + last_version = *version_it; + } + rgw_bucket_list_entries entry; + entry.flags = rgw_bucket_dir_entry::FLAG_VER; + if (version_it == temp_versions.begin()) { + entry.flags |= rgw_bucket_dir_entry::FLAG_CURRENT; + } else { + entry.flags |= rgw_bucket_dir_entry::FLAG_VER_MARKER; + } + rgw_obj_key key{std::move(*it), std::move(*version_it)}; + entry.key = std::move(key); + entries.emplace_back(entry); + num_objs++; + if (num_objs == max) { + if (std::next(version_it) != temp_versions.end()) { + cache_results.is_truncated = true; + cache_results.next_marker.instance = version; + cache_results.next_marker.name = *it; + } + break; //break from the 'for' loop that processes temp_versions + } + } + } else { + rgw_bucket_list_entries entry; + entry.key.name = std::move(*it); + entries.emplace_back(entry); + num_objs++; + } + } + if (num_objs == max) { + if (std::next(it) != temp_objects.end()) { + //could have been set due to delimiter processing + if (cache_results.next_marker.name.empty()) { + cache_results.is_truncated = true; + cache_results.next_marker.name = last_element_processed; + if (cache_results.next_marker.instance.empty() && params.list_versions) { + cache_results.next_marker.instance = last_version; + } + } + } + break; //break from the 'for' loop that processes temp_objects + } + } //for - processes temp_objects + //break from while loop if max+1 elements have been found or there are no more elements in the ordered set + if ((num_objs == max) || (ret == -ENOENT) || temp_objects.empty()) { + ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " Breaking out! " << dendl; + ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " num_objs " << num_objs << dendl; + ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " ret " << ret << dendl; + break; + } + //adjust start to begin from the last element of objects, and exclude that element + start = "(" + last_element_processed; + } while(num_objs <= max); + } //end - else + + rgw::d4n::BlockDirectory* blockDir = this->filter->get_block_dir(); + auto remainder_size = entries.size(); + size_t j = 0, start_j = 0; + while (remainder_size > 0) { + std::vector blocks(100); + start_j = j; + size_t batch_size = std::min(static_cast(100), remainder_size); + for (size_t i = 0; i < batch_size; i++) { + ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " objects[j]: " << entries[j].key.name << dendl; + ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " remainder_size: " << remainder_size << dendl; + if (entries[j].key.instance == "null") { + blocks[i].cacheObj.objName = "_:null_" + entries[j].key.name; + } else { + blocks[i].cacheObj.objName = entries[j].key.get_oid(); + } + blocks[i].cacheObj.bucketName = this->get_bucket_id(); + ldpp_dout(dpp, 20) << "D4NFilterBucket::" << __func__ << " blocks[i].cacheObj.objName: " << blocks[i].cacheObj.objName << dendl; + j++; + } + auto ret = blockDir->get(dpp, blocks, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << "D4NFilterBucket::" << __func__ << " blockDir->get() returned error: " << ret << dendl; + return ret; + } + + for (auto block : blocks) { + if (block.cacheObj.objName.empty()) { + start_j++; + continue; + } + rgw_bucket_dir_entry entry; + entry.key.name = entries[start_j].key.name; + // special handling for name starting with '_' + if (entry.key.name[0] == '_') { + entry.key.name = "_" + entries[start_j].key.name; + } + entry.key.instance = entries[start_j].key.instance; + entry.flags = entries[start_j].flags; + if (block.deleteMarker) { + entry.flags |= rgw_bucket_dir_entry::FLAG_DELETE_MARKER; + } + entry.meta.storage_class = "CACHE"; + entry.meta.size = block.cacheObj.size; + entry.meta.accounted_size = block.cacheObj.size; + struct std::tm tm; + std::istringstream ss(block.cacheObj.creationTime); + ss >> std::get_time(&tm, "%H:%M:%S"); + std::time_t creationTime = mktime(&tm); + entry.meta.mtime = ceph::real_clock::from_time_t(creationTime); + entry.meta.etag = block.cacheObj.etag; + entry.meta.owner = block.cacheObj.user_id; + entry.meta.owner_display_name = block.cacheObj.display_name; + cache_results.objs.emplace_back(entry); + start_j++; + } + + if (remainder_size <= 100) { + remainder_size = 0; + } else { + remainder_size = remainder_size - 100; + } + } + } //d4n_write_cache_enabled = true + + //Get objects from backend store + auto ret = next->list(dpp, params, max, store_results, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << "D4NFilterBucket::" << __func__ << " next->list returned: " << ret << dendl; + return ret; + } + + //return store results if cache results is empty + if (cache_results.objs.empty() && !store_results.objs.empty()) { + results = std::move(store_results); + return 0; + } + + //return cache results if store results is empty + if (store_results.objs.empty() && !cache_results.objs.empty()) { + results = std::move(cache_results); + return 0; + } + + //if both lists are non-empty then merge them, as they are already sorted + if (!cache_results.objs.empty() && !store_results.objs.empty()) { + std::vector objs; + objs.reserve(max); + + size_t i = 0, j = 0; + int elementsAdded = 0; + + // Compare elements from both vectors and merge in sorted order + while (elementsAdded < max && i < cache_results.objs.size() && j < store_results.objs.size()) { + std::string key_name_in_cache = cache_results.objs[i].key.name; + std::string key_name_in_store = store_results.objs[j].key.name; + ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " key_name_in_cache: " << key_name_in_cache << dendl; + ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " key_name_in_store: " << key_name_in_store << dendl; + std::string key_instance_in_cache = cache_results.objs[i].key.instance; + std::string key_instance_in_store = store_results.objs[j].key.instance; + if (key_name_in_cache == key_name_in_store) { + objs.push_back(cache_results.objs[i]); + i++; + //if list versions is not given or if instance values are different then skip the store result + if (!params.list_versions || key_instance_in_cache == key_instance_in_store) { + j++; + } + } + else if (key_name_in_cache < key_name_in_store) { + objs.push_back(cache_results.objs[i]); + i++; + } else { + objs.push_back(store_results.objs[j]); + j++; + } + elementsAdded++; + } + + while (elementsAdded < max && i < cache_results.objs.size()) { + objs.push_back(cache_results.objs[i]); + i++; + elementsAdded++; + } + + while (elementsAdded < max && j < store_results.objs.size()) { + objs.push_back(store_results.objs[j]); + j++; + elementsAdded++; + } + + //there are elements in cache_results + if (i < (cache_results.objs.size() - 1)) { + results.is_truncated = true; + results.next_marker.name = cache_results.objs[(i - 1)].key.to_string(); + } + + //there are elements in store_results + if (j < (store_results.objs.size() - 1)) { + results.is_truncated = true; + results.next_marker.name = store_results.objs[(j - 1)].key.to_string(); + } + results.objs = std::move(objs); + } + + if (!store_results.common_prefixes.empty()) { + results.common_prefixes = std::move(store_results.common_prefixes); + //results next_marker is not set at this point which means result.objs is empty + if (results.next_marker.empty()) { + results.is_truncated = store_results.is_truncated | cache_results.is_truncated; + if (store_results.is_truncated && cache_results.is_truncated) { + if (cache_results.next_marker <= store_results.next_marker) { + results.next_marker = std::move(cache_results.next_marker); + } else { + results.next_marker = std::move(store_results.next_marker); + } + } else if (store_results.is_truncated && !cache_results.is_truncated) { + results.next_marker = std::move(store_results.next_marker); + } else if (!store_results.is_truncated && cache_results.is_truncated) { + results.next_marker = std::move(cache_results.next_marker); + } + } + } + + return 0; +} + std::unique_ptr D4NFilterBucket::get_multipart_upload( const std::string& oid, std::optional upload_id, @@ -602,12 +1084,41 @@ int D4NFilterObject::create_delete_marker(const DoutPrefixProvider* dpp, optiona for "null" versions, when bucket is non-versioned. 3. The "null" hash entry is overwritten when we have a "null" instance when bucket versioning is suspended. 4. A versioned hash entry for every version for a version enabled bucket - this helps in get/delete requests with version-id specified -5. Redis ordered set to maintain the order of dirty objects added for a version enabled bucket. Even when the bucket is non-versioned, this set maintains a "null" entry */ +5. Redis ordered set to maintain the order of dirty objects added for a version enabled bucket. Even when the bucket is non-versioned, this set maintains a "null" entry +6. Another ordered set to maintain a lexicographically sorted order of objects for a bucket - used for bucket listing */ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, std::vector* exec_responses, optional_yield y, bool is_latest_version, bool dirty) { ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): object name: " << this->get_name() << " bucket name: " << this->get_bucket()->get_name() << dendl; rgw::d4n::CacheBlock block; rgw::d4n::BlockDirectory* blockDir = this->driver->get_block_dir(); + auto attrs = this->get_attrs(); + bufferlist bl_etag, bl_acl; + auto etag_it = attrs.find(RGW_ATTR_ETAG); + if (etag_it != attrs.end()) { + bl_etag = etag_it->second; + } + auto etag = bl_etag.to_str(); + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): etag: " << etag << dendl; + + auto acl_it = attrs.find(RGW_ATTR_ACL); + RGWAccessControlPolicy policy; + std::string user_id, display_name; + if (acl_it != attrs.end()) { + auto bliter = acl_it->second.cbegin(); + try { + policy.decode(bliter); + } catch (buffer::error& err) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): ERROR: could not decode policy, caught buffer::error" << dendl; + return -EIO; + } + ACLOwner owner = policy.get_owner(); + rgw_user user = std::get(owner.id); + ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): INFO: user_d: " << user.to_str() << dendl; + ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): INFO: display_name: " << owner.display_name << dendl; + user_id = user.to_str(); + display_name = owner.display_name; + } + if (is_latest_version) { std::string objName = this->get_name(); // special handling for name starting with '_' @@ -621,6 +1132,10 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, std:: .creationTime = std::to_string(ceph::real_clock::to_time_t(this->get_mtime())), .dirty = dirty, .hostsList = { dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address }, + .etag = etag, + .size = this->get_accounted_size(), + .user_id = user_id, + .display_name = display_name, }; block.cacheObj = object; @@ -663,6 +1178,14 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, std:: ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Score of object name: "<< this->get_name() << " version: " << object_version << " is: " << std::setprecision(std::numeric_limits::max_digits10) << score << ret << dendl; rgw::d4n::ObjectDirectory* objDir = this->driver->get_obj_dir(); ret = objDir->zadd(dpp, &object, score, object_version, y); + if (ret < 0) { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Failed to add version to ordered set with error: " << ret << dendl; + blockDir->discard(dpp, y); + return ret; + } + //add an entry to ordered set containing objects for bucket listing, set score to 0 always to lexicographically order the objects + rgw::d4n::BucketDirectory* bucketDir = this->driver->get_bucket_dir(); + ret = bucketDir->zadd(dpp, this->get_bucket()->get_bucket_id(), 0, this->get_name(), y, true); if (ret < 0) { ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Failed to add object to ordered set with error: " << ret << dendl; return ret; @@ -732,6 +1255,10 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, std:: .bucketName = this->get_bucket()->get_bucket_id(), .creationTime = std::to_string(ceph::real_clock::to_time_t(this->get_mtime())), .dirty = dirty, + .etag = etag, + .size = this->get_accounted_size(), + .user_id = user_id, + .display_name = display_name, }; version_object.hostsList.insert({ dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address }); @@ -1078,6 +1605,7 @@ void D4NFilterDriver::shutdown() cacheDriver.reset(); objDir.reset(); blockDir.reset(); + bucketDir.reset(); policyDriver.reset(); next->shutdown(); @@ -1960,6 +2488,7 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp bool objDirty = block.cacheObj.dirty; auto blockDir = source->driver->get_block_dir(); auto objDir = source->driver->get_obj_dir(); + auto bucketDir = source->driver->get_bucket_dir(); std::string version = source->get_object_version(); std::string objName = source->get_name(); // special handling for name starting with '_' @@ -2045,7 +2574,7 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp if (latest_block.version == block.version) { std::vector members; //get the second latest version - ret = objDir->zrevrange(dpp, &dir_obj, 0, 1, members, y); + ret = objDir->zrevrange(dpp, &dir_obj, std::to_string(0), std::to_string(1), members, y); if (ret < 0) { ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to get the second latest version for: " << dir_obj.objName << ", ret=" << ret << dendl; return ret; @@ -2075,6 +2604,12 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to delete latest entry in block directory, when it is the same as version requested, for: " << block.cacheObj.objName << ", ret=" << ret << dendl; return ret; } + //delete entry from ordered set of objects + ret = bucketDir->zrem(dpp, source->get_bucket()->get_bucket_id(), source->get_name(), y, true); + if (ret < 0) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue zrem request in bucket directory for: " << source->get_name() << ", ret=" << ret << dendl; + return ret; + } } } //end-if latest_block.version == block.version //delete versioned entry (handles delete markers also) @@ -2082,7 +2617,7 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to delete head object in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl; return ret; } - //delete entry from ordered set + //delete entry from ordered set of versions std::string version = source->get_instance(); ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): Version to be deleted is: " << version << dendl; ret = objDir->zrem(dpp, &dir_obj, version, y, true); @@ -2129,17 +2664,25 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue delete head object in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl; return ret; } - //dirty objects - delete from ordered set + //dirty objects - delete from ordered set of versions and objects if (objDirty) { rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{ .objName = source->get_name(), .bucketName = source->get_bucket()->get_bucket_id(), }; + //delete entry from ordered set of object versions ret = objDir->zrem(dpp, &dir_obj, "null", y, true); if (ret < 0) { ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue zrem request in object directory for: " << source->get_name() << ", ret=" << ret << dendl; return ret; } + //delete entry from ordered set of objects + ret = bucketDir->zrem(dpp, source->get_bucket()->get_bucket_id(), source->get_name(), y, true); + if (ret < 0) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue zrem request in bucket directory for: " << source->get_name() << ", ret=" << ret << dendl; + blockDir->discard(dpp, y); + return ret; + } } } //end-if non-versioned buckets diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.h b/src/rgw/driver/d4n/rgw_sal_d4n.h index 0f645dcf12e7..70f7d392ba8a 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.h +++ b/src/rgw/driver/d4n/rgw_sal_d4n.h @@ -57,6 +57,7 @@ class D4NFilterDriver : public FilterDriver { std::unique_ptr cacheDriver; std::unique_ptr objDir; std::unique_ptr blockDir; + std::unique_ptr bucketDir; std::unique_ptr policyDriver; boost::asio::io_context& io_context; @@ -79,10 +80,9 @@ class D4NFilterDriver : public FilterDriver { uint64_t olh_epoch, const std::string& unique_tag) override; rgw::cache::CacheDriver* get_cache_driver() { return cacheDriver.get(); } - - rgw::d4n::ObjectDirectory* get_obj_dir() { return objDir.get(); } rgw::d4n::BlockDirectory* get_block_dir() { return blockDir.get(); } + rgw::d4n::BucketDirectory* get_bucket_dir() { return bucketDir.get(); } rgw::d4n::PolicyDriver* get_policy_driver() { return policyDriver.get(); } void shutdown() override; }; @@ -100,6 +100,10 @@ class D4NFilterUser : public FilterUser { class D4NFilterBucket : public FilterBucket { private: + struct rgw_bucket_list_entries{ + rgw_obj_key key; + uint16_t flags; + }; D4NFilterDriver* filter; public: @@ -109,6 +113,8 @@ class D4NFilterBucket : public FilterBucket { virtual ~D4NFilterBucket() = default; virtual std::unique_ptr get_object(const rgw_obj_key& key) override; + virtual int list(const DoutPrefixProvider* dpp, ListParams& params, int max, + ListResults& results, optional_yield y) override; virtual int create(const DoutPrefixProvider* dpp, const CreateParams& params, optional_yield y) override; diff --git a/src/test/rgw/test_d4n_directory.cc b/src/test/rgw/test_d4n_directory.cc index 4816c47666db..0c13e1db07bd 100644 --- a/src/test/rgw/test_d4n_directory.cc +++ b/src/test/rgw/test_d4n_directory.cc @@ -769,6 +769,172 @@ TEST_F(BlockDirectoryFixture, MultiExecuteYield) io.run(); } +TEST_F(BlockDirectoryFixture, ZScan) +{ + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + boost::system::error_code ec; + { + request req; + response resp; + req.push("ZADD", "myzset", "0", "v1"); + conn->async_exec(req, resp, yield[ec]); + ASSERT_EQ((bool)ec, false); + std::cout << "ZADD value: " << std::get<0>(resp).value() << std::endl; + } + { + request req; + response resp; + req.push("ZADD", "myzset", "0", "v2"); + conn->async_exec(req, resp, yield[ec]); + ASSERT_EQ((bool)ec, false); + std::cout << "ZADD value: " << std::get<0>(resp).value() << std::endl; + } + { + request req; + req.push("ZSCAN", "myzset", 0, "MATCH", "v*", "COUNT", 2); + + boost::redis::generic_response resp; + conn->async_exec(req, resp, yield[ec]); + ASSERT_EQ((bool)ec, false); + + std::vector > > root_array; + if (resp.has_value()) { + root_array = resp.value(); + std::cout << "ZADD aggregate size is: " << root_array.size() << std::endl; + auto size = root_array.size(); + if (size >= 2) { + //Nothing of interest at index 0, index 1 has the next cursor value + std::string new_cursor = root_array[1].value; + + //skip the first 3 values to get the actual member, score + for (uint64_t i = 3; i < size; i = i+2) { + std::string member = root_array[i].value; + std::cout << "ZADD member: " << member << std::endl; + } + } + } + } + { + boost::system::error_code ec; + request req; + req.push("FLUSHALL"); + response resp; + + conn->async_exec(req, resp, yield[ec]); + } + + conn->cancel(); + }, rethrow); + + io.run(); +} + +template +struct expander; + +template +struct expander> { + template + using elem = E; + + using type = boost::redis::response...>; +}; + +template +struct my_tuple +{ + using type = typename expander>::type; +}; + +template +constexpr void constexpr_for_each(std::integer_sequence, F &&func) +{ + (func(std::integral_constant{}) , ...); +} + +template +constexpr void constexpr_for(F &&func) +{ + if constexpr (N > 0) + { + constexpr_for_each(std::make_integer_sequence{}, std::forward(func)); + } +} + +template +void foo(T t, std::vector>& responses) +{ + constexpr_for>([&](auto index) + { + constexpr auto i = index.value; + std::vector empty_vector; + if (std::get(t).value().has_value()) { + if (std::get(t).value().value().empty()) { + responses.emplace_back(empty_vector); + std::cout << "Empty value for i: " << i << std::endl; + } else { + responses.emplace_back(std::get(t).value().value()); + } + } else { + std::cout << "No value for i: " << i << std::endl; + responses.emplace_back(empty_vector); + } + }); +} + +TEST_F(BlockDirectoryFixture, Pipeline) +{ + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + boost::system::error_code ec; + { + request req; + response resp; + req.push("HSET", "testkey1", "name", "abc"); + conn->async_exec(req, resp, yield[ec]); + ASSERT_EQ((bool)ec, false); + } + { + request req; + response resp; + req.push("HSET", "testkey2", "name", "def"); + conn->async_exec(req, resp, yield[ec]); + ASSERT_EQ((bool)ec, false); + } + { + std::vector fields; + fields.push_back("name"); + request req; + req.push_range("HMGET", "testkey1", fields); + req.push_range("HMGET", "abc", fields); + + ASSERT_EQ(req.get_commands(), 2); + //using template parameterization in case we need to read responses for large numebr of elements (1000 elements) + my_tuple<5, std::optional>>::type resp; + conn->async_exec(req, resp, yield[ec]); + ASSERT_EQ((bool)ec, false); + std::vector> responses; + foo(resp, responses); + for (auto vec : responses) { + if (!vec.empty()) { + std::cout << "HMGET: " << vec[0] << std::endl; + } + } + } + { + boost::system::error_code ec; + request req; + req.push("FLUSHALL"); + response resp; + + conn->async_exec(req, resp, yield[ec]); + } + + conn->cancel(); + }, rethrow); + + io.run(); +} + int main(int argc, char *argv[]) { ::testing::InitGoogleTest(&argc, argv);