From 9f91f3f94865db1b118bf416339c9e0dc231a334 Mon Sep 17 00:00:00 2001 From: Samarah Date: Thu, 25 May 2023 14:58:26 -0400 Subject: [PATCH] rgw/d4n: This commit squashes the following commits related to d4n policy and filter driver. RGW: Add cache driver base class - add d4n filter driver. RGW: Update cache and policy files; add RedisDriver into D4N filter RGW: Add D4N classes and unit testing; update cpp_redis submodule QA: Add D4N teuthology suite RGW: Improve D4N readability and structure RGW: Add base D4N policy and test RGW: Add GWF policy to D4N RGW: Add global weight property RGW: Added D4N namespace RGW: Update policy driver interface RGW: Update unit tests RGW: Add Address struct to all files RGW: Update D4N names and structure RGW: Update structure and fix errors RGW: Add more features to policy driver RGW: Access local weight in policy code RGW: Work on D4N workflows RGW: Add object directory class and remove copy_value method RGW: Establish object directory in read workflow RGW: Update cache and policy files; add RedisDriver into D4N filter RGW: Switch out D4N cache methods with Redis driver methods RGW: Update RedisDriver to match new CacheDriver structure; define set_attrs method RGW: Fix D4N read workflow crashes RGW: Update D4N files to match CacheDriver changes RGW: fix key_exists method for RedisDriver and clean up rgw_sal_d4n.cc RGW: Use correct get_block method RGW: Make CachePolicy a virtual class RGW: Initialize localWeight if not found and develop find_victim method RGW: Debugging network failure RGW: Rebase and debugging network failure RGW: Update RedisDriver::list_entries and usage in D4N policy driver RGW: Fix network failure issue; add entries and entry methods RGW: Update D4N test files to match rebase RGW: Update D4N policy and RedisDriver with entries RGW: make localWeight an attribute rgw/cache: commit to fix compilation issues. Signed-off-by: Samarah --- ceph.spec.in | 6 + src/rgw/CMakeLists.txt | 5 +- src/rgw/driver/d4n/d4n_datacache.cc | 490 ---------------------------- src/rgw/driver/d4n/d4n_datacache.h | 40 --- src/rgw/driver/d4n/d4n_directory.cc | 344 ++++++++++++++++--- src/rgw/driver/d4n/d4n_directory.h | 102 ++++-- src/rgw/driver/d4n/d4n_policy.cc | 404 +++++++++++++++++++++++ src/rgw/driver/d4n/d4n_policy.h | 76 +++++ src/rgw/driver/d4n/rgw_sal_d4n.cc | 366 +++++++++++++++------ src/rgw/driver/d4n/rgw_sal_d4n.h | 63 ++-- src/rgw/rgw_process_env.h | 2 + src/rgw/rgw_redis_driver.cc | 18 +- src/rgw/rgw_redis_driver.h | 23 +- src/rgw/rgw_sal.cc | 2 + src/test/rgw/test_d4n_directory.cc | 108 +++--- src/test/rgw/test_d4n_filter.cc | 221 +++++++++++-- 16 files changed, 1445 insertions(+), 825 deletions(-) delete mode 100644 src/rgw/driver/d4n/d4n_datacache.cc delete mode 100644 src/rgw/driver/d4n/d4n_datacache.h create mode 100644 src/rgw/driver/d4n/d4n_policy.cc create mode 100644 src/rgw/driver/d4n/d4n_policy.h diff --git a/ceph.spec.in b/ceph.spec.in index 6e5942280bb..ed62be27f05 100644 --- a/ceph.spec.in +++ b/ceph.spec.in @@ -2228,6 +2228,12 @@ fi %dir %{_localstatedir}/lib/ceph/radosgw %{_unitdir}/ceph-radosgw@.service %{_unitdir}/ceph-radosgw.target +%exclude %{_includedir}/cpp_redis +%exclude %{_includedir}/tacopie +%exclude /usr/lib/libcpp_redis.a +%exclude /usr/lib/libtacopie.a +%exclude /usr/lib/pkgconfig/cpp_redis.pc +%exclude /usr/lib/pkgconfig/tacopie.pc %post radosgw %if 0%{?suse_version} diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index de907dd150f..a54d57f9c5a 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -150,6 +150,9 @@ set(librgw_common_srcs rgw_tracer.cc rgw_lua_background.cc rgw_data_access.cc + driver/d4n/d4n_directory.cc + driver/d4n/d4n_policy.cc + driver/d4n/rgw_sal_d4n.cc driver/rados/cls_fifo_legacy.cc driver/rados/rgw_bucket.cc driver/rados/rgw_bucket_sync.cc @@ -199,6 +202,7 @@ set(librgw_common_srcs driver/rados/topic.cc) list(APPEND librgw_common_srcs + driver/d4n/d4n_directory.cc driver/immutable_config/store.cc driver/json_config/store.cc driver/rados/config/impl.cc @@ -227,7 +231,6 @@ if(WITH_RADOSGW_DAOS) endif() if(WITH_RADOSGW_D4N) list(APPEND librgw_common_srcs driver/d4n/d4n_directory.cc) - list(APPEND librgw_common_srcs driver/d4n/d4n_datacache.cc) list(APPEND librgw_common_srcs driver/d4n/rgw_sal_d4n.cc) endif() if(WITH_RADOSGW_POSIX) diff --git a/src/rgw/driver/d4n/d4n_datacache.cc b/src/rgw/driver/d4n/d4n_datacache.cc deleted file mode 100644 index ec0338f5bd2..00000000000 --- a/src/rgw/driver/d4n/d4n_datacache.cc +++ /dev/null @@ -1,490 +0,0 @@ -#include "d4n_datacache.h" - -#define dout_subsys ceph_subsys_rgw -#define dout_context g_ceph_context - -/* Base metadata and data fields should remain consistent */ -std::vector baseFields { - "mtime", - "object_size", - "accounted_size", - "epoch", - "version_id", - "source_zone_short_id", - "bucket_count", - "bucket_size", - "user_quota.max_size", - "user_quota.max_objects", - "max_buckets", - "data"}; - -std::vector< std::pair > RGWD4NCache::buildObject(rgw::sal::Attrs* binary) { - std::vector< std::pair > values; - rgw::sal::Attrs::iterator attrs; - - /* Convert to vector */ - if (binary != NULL) { - for (attrs = binary->begin(); attrs != binary->end(); ++attrs) { - values.push_back(std::make_pair(attrs->first, attrs->second.to_str())); - } - } - - return values; -} - -int RGWD4NCache::findClient(cpp_redis::client *client) { - if (client->is_connected()) - return 0; - - if (host == "" || port == 0) { - dout(10) << "RGW D4N Cache: D4N cache endpoint was not configured correctly" << dendl; - return EDESTADDRREQ; - } - - client->connect(host, port, nullptr); - - if (!client->is_connected()) - return ECONNREFUSED; - - return 0; -} - -int RGWD4NCache::existKey(std::string key) { - int result = -1; - std::vector keys; - keys.push_back(key); - - if (!client.is_connected()) { - return result; - } - - try { - client.exists(keys, [&result](cpp_redis::reply &reply) { - if (reply.is_integer()) { - result = reply.as_integer(); /* Returns 1 upon success */ - } - }); - - client.sync_commit(std::chrono::milliseconds(1000)); - } catch(std::exception &e) {} - - return result; -} - -int RGWD4NCache::setObject(std::string oid, rgw::sal::Attrs* attrs) { - /* Creating the index based on oid */ - std::string key = "rgw-object:" + oid + ":cache"; - std::string result; - - if (!client.is_connected()) { - findClient(&client); - } - - /* Every set will be treated as new */ - try { - std::vector< std::pair > redisObject = buildObject(attrs); - - if (redisObject.empty()) { - return -1; - } - - client.hmset(key, redisObject, [&result](cpp_redis::reply &reply) { - if (!reply.is_null()) { - result = reply.as_string(); - } - }); - - client.sync_commit(std::chrono::milliseconds(1000)); - - if (result != "OK") { - return -1; - } - } catch(std::exception &e) { - return -1; - } - - return 0; -} - -int RGWD4NCache::getObject(std::string oid, - rgw::sal::Attrs* newAttrs, - std::vector< std::pair >* newMetadata) -{ - std::string result; - std::string key = "rgw-object:" + oid + ":cache"; - - if (!client.is_connected()) { - findClient(&client); - } - - if (existKey(key)) { - int field_exist = -1; - - rgw::sal::Attrs::iterator it; - std::vector< std::pair > redisObject; - std::vector getFields; - - /* Retrieve existing fields from cache */ - try { - client.hgetall(key, [&getFields](cpp_redis::reply &reply) { - if (reply.is_array()) { - auto arr = reply.as_array(); - - if (!arr[0].is_null()) { - for (long unsigned int i = 0; i < arr.size() - 1; i += 2) { - getFields.push_back(arr[i].as_string()); - } - } - } - }); - - client.sync_commit(std::chrono::milliseconds(1000)); - } catch(std::exception &e) { - return -1; - } - - /* Only data exists */ - if (getFields.size() == 1 && getFields[0] == "data") - return 0; - - /* Ensure all metadata, attributes, and data has been set */ - for (const auto& field : baseFields) { - auto it = std::find_if(getFields.begin(), getFields.end(), - [&](const auto& comp) { return comp == field; }); - - if (it != getFields.end()) { - int index = std::distance(getFields.begin(), it); - getFields.erase(getFields.begin() + index); - } else { - return -1; - } - } - - /* Get attributes from cache */ - try { - client.hmget(key, getFields, [&field_exist, &newAttrs, &getFields](cpp_redis::reply &reply) { - if (reply.is_array()) { - auto arr = reply.as_array(); - - if (!arr[0].is_null()) { - field_exist = 0; - - for (long unsigned int i = 0; i < getFields.size(); ++i) { - std::string tmp = arr[i].as_string(); - buffer::list bl; - bl.append(tmp); - newAttrs->insert({getFields[i], bl}); - } - } - } - }); - - client.sync_commit(std::chrono::milliseconds(1000)); - } catch(std::exception &e) { - return -1; - } - - if (field_exist == 0) { - field_exist = -1; - - getFields.clear(); - getFields.insert(getFields.begin(), baseFields.begin(), baseFields.end()); - getFields.pop_back(); /* Do not query for data field */ - - /* Get metadata from cache */ - try { - client.hmget(key, getFields, [&field_exist, &newMetadata, &getFields](cpp_redis::reply &reply) { - if (reply.is_array()) { - auto arr = reply.as_array(); - - if (!arr[0].is_null()) { - field_exist = 0; - - for (long unsigned int i = 0; i < getFields.size(); ++i) { - newMetadata->push_back({getFields[i], arr[i].as_string()}); - } - } - } - }); - - client.sync_commit(std::chrono::milliseconds(1000)); - } catch(std::exception &e) { - return -1; - } - } else { - return -1; - } - } else { - dout(20) << "RGW D4N Cache: Object was not retrievable." << dendl; - return -2; - } - - return 0; -} - -int RGWD4NCache::copyObject(std::string original_oid, std::string copy_oid, rgw::sal::Attrs* attrs) { - std::string result; - std::vector< std::pair > redisObject; - std::string key = "rgw-object:" + original_oid + ":cache"; - - if (!client.is_connected()) { - findClient(&client); - } - - /* Read values from cache */ - if (existKey(key)) { - try { - client.hgetall(key, [&redisObject](cpp_redis::reply &reply) { - if (reply.is_array()) { - auto arr = reply.as_array(); - - if (!arr[0].is_null()) { - for (long unsigned int i = 0; i < arr.size() - 1; i += 2) { - redisObject.push_back({arr[i].as_string(), arr[i + 1].as_string()}); - } - } - } - }); - - client.sync_commit(std::chrono::milliseconds(1000)); - } catch(std::exception &e) { - return -1; - } - } else { - return -2; - } - - /* Build copy with updated values */ - if (!redisObject.empty()) { - rgw::sal::Attrs::iterator attr; - - for (attr = attrs->begin(); attr != attrs->end(); ++attr) { - auto it = std::find_if(redisObject.begin(), redisObject.end(), - [&](const auto& pair) { return pair.first == attr->first; }); - - if (it != redisObject.end()) { - int index = std::distance(redisObject.begin(), it); - redisObject[index] = {attr->first, attr->second.to_str()}; - } else { - redisObject.push_back(std::make_pair(attr->first, attr->second.to_str())); - } - } - } else { - return -1; - } - - /* Set copy with new values */ - key = "rgw-object:" + copy_oid + ":cache"; - - try { - client.hmset(key, redisObject, [&result](cpp_redis::reply &reply) { - if (!reply.is_null()) { - result = reply.as_string(); - } - }); - - client.sync_commit(std::chrono::milliseconds(1000)); - - if (result != "OK") { - return -1; - } - } catch(std::exception &e) { - return -1; - } - - return 0; -} - -int RGWD4NCache::delObject(std::string oid) { - int result = 0; - std::vector keys; - std::string key = "rgw-object:" + oid + ":cache"; - keys.push_back(key); - - if (!client.is_connected()) { - findClient(&client); - } - - if (existKey(key)) { - try { - client.del(keys, [&result](cpp_redis::reply &reply) { - if (reply.is_integer()) { - result = reply.as_integer(); - } - }); - - client.sync_commit(std::chrono::milliseconds(1000)); - - return result - 1; - } catch(std::exception &e) { - return -1; - } - } else { - dout(20) << "RGW D4N Cache: Object is not in cache." << dendl; - return -2; - } -} - -int RGWD4NCache::updateAttr(std::string oid, rgw::sal::Attrs* attr) { - std::string result; - std::string key = "rgw-object:" + oid + ":cache"; - - if (!client.is_connected()) { - findClient(&client); - } - - if (existKey(key)) { - try { - std::vector< std::pair > redisObject; - auto it = attr->begin(); - redisObject.push_back({it->first, it->second.to_str()}); - - client.hmset(key, redisObject, [&result](cpp_redis::reply &reply) { - if (!reply.is_null()) { - result = reply.as_string(); - } - }); - - client.sync_commit(std::chrono::milliseconds(1000)); - - if (result != "OK") { - return -1; - } - } catch(std::exception &e) { - return -1; - } - } else { - return -2; - } - - return 0; -} - -int RGWD4NCache::delAttrs(std::string oid, std::vector& baseFields, std::vector& deleteFields) { - int result = 0; - std::string key = "rgw-object:" + oid + ":cache"; - - if (!client.is_connected()) { - findClient(&client); - } - - if (existKey(key)) { - /* Find if attribute doesn't exist */ - for (const auto& delField : deleteFields) { - if (std::find(baseFields.begin(), baseFields.end(), delField) == baseFields.end()) { - deleteFields.erase(std::find(deleteFields.begin(), deleteFields.end(), delField)); - } - } - - try { - client.hdel(key, deleteFields, [&result](cpp_redis::reply &reply) { - if (reply.is_integer()) { - result = reply.as_integer(); - } - }); - - client.sync_commit(std::chrono::milliseconds(1000)); - - return result - 1; - } catch(std::exception &e) { - return -1; - } - } - - dout(20) << "RGW D4N Cache: Object is not in cache." << dendl; - return -2; -} - -int RGWD4NCache::appendData(std::string oid, buffer::list& data) { - std::string result; - std::string value = ""; - std::string key = "rgw-object:" + oid + ":cache"; - - if (!client.is_connected()) { - findClient(&client); - } - - if (existKey(key)) { - try { - client.hget(key, "data", [&value](cpp_redis::reply &reply) { - if (!reply.is_null()) { - value = reply.as_string(); - } - }); - - client.sync_commit(std::chrono::milliseconds(1000)); - } catch(std::exception &e) { - return -1; - } - } - - try { - /* Append to existing value or set as new value */ - std::string temp = value + data.to_str(); - std::vector< std::pair > field; - field.push_back({"data", temp}); - - client.hmset(key, field, [&result](cpp_redis::reply &reply) { - if (!reply.is_null()) { - result = reply.as_string(); - } - }); - - client.sync_commit(std::chrono::milliseconds(1000)); - - if (result != "OK") { - return -1; - } - } catch(std::exception &e) { - return -1; - } - - return 0; -} - -int RGWD4NCache::deleteData(std::string oid) { - int result = 0; - std::string key = "rgw-object:" + oid + ":cache"; - std::vector deleteField; - deleteField.push_back("data"); - - if (!client.is_connected()) { - findClient(&client); - } - - if (existKey(key)) { - int field_exist = -1; - - try { - client.hget(key, "data", [&field_exist](cpp_redis::reply &reply) { - if (!reply.is_null()) { - field_exist = 0; - } - }); - - client.sync_commit(std::chrono::milliseconds(1000)); - } catch(std::exception &e) { - return -1; - } - - if (field_exist == 0) { - try { - client.hdel(key, deleteField, [&result](cpp_redis::reply &reply) { - if (reply.is_integer()) { - result = reply.as_integer(); /* Returns 1 upon success */ - } - }); - - client.sync_commit(std::chrono::milliseconds(1000)); - - return result - 1; - } catch(std::exception &e) { - return -1; - } - } else { - return -1; - } - } else { - return 0; /* No delete was necessary */ - } -} diff --git a/src/rgw/driver/d4n/d4n_datacache.h b/src/rgw/driver/d4n/d4n_datacache.h deleted file mode 100644 index 5faf7b6ce0e..00000000000 --- a/src/rgw/driver/d4n/d4n_datacache.h +++ /dev/null @@ -1,40 +0,0 @@ -#ifndef CEPH_RGWD4NCACHE_H -#define CEPH_RGWD4NCACHE_H - -#include "rgw_common.h" -#include -#include -#include - -class RGWD4NCache { - public: - CephContext *cct; - - RGWD4NCache() {} - RGWD4NCache(std::string cacheHost, int cachePort):host(cacheHost), port(cachePort) {} - - void init(CephContext *_cct) { - cct = _cct; - host = cct->_conf->rgw_d4n_host; - port = cct->_conf->rgw_d4n_port; - } - - int findClient(cpp_redis::client *client); - int existKey(std::string key); - int setObject(std::string oid, rgw::sal::Attrs* attrs); - int getObject(std::string oid, rgw::sal::Attrs* newAttrs, std::vector< std::pair >* newMetadata); - int copyObject(std::string original_oid, std::string copy_oid, rgw::sal::Attrs* attrs); - int delObject(std::string oid); - int updateAttr(std::string oid, rgw::sal::Attrs* attr); - int delAttrs(std::string oid, std::vector& baseFields, std::vector& deleteFields); - int appendData(std::string oid, buffer::list& data); - int deleteData(std::string oid); - - private: - cpp_redis::client client; - std::string host = ""; - int port = 0; - std::vector< std::pair > buildObject(rgw::sal::Attrs* binary); -}; - -#endif diff --git a/src/rgw/driver/d4n/d4n_directory.cc b/src/rgw/driver/d4n/d4n_directory.cc index 96667295533..441c4bc1492 100644 --- a/src/rgw/driver/d4n/d4n_directory.cc +++ b/src/rgw/driver/d4n/d4n_directory.cc @@ -1,18 +1,21 @@ #include "d4n_directory.h" +#include #define dout_subsys ceph_subsys_rgw #define dout_context g_ceph_context -int RGWBlockDirectory::findClient(cpp_redis::client *client) { +namespace rgw { namespace d4n { + +int ObjectDirectory::find_client(cpp_redis::client* client) { if (client->is_connected()) return 0; - if (host == "" || port == 0) { + if (addr.host == "" || addr.port == 0) { dout(10) << "RGW D4N Directory: D4N directory endpoint was not configured correctly" << dendl; return EDESTADDRREQ; } - client->connect(host, port, nullptr); + client->connect(addr.host, addr.port, nullptr); if (!client->is_connected()) return ECONNREFUSED; @@ -20,12 +23,12 @@ int RGWBlockDirectory::findClient(cpp_redis::client *client) { return 0; } -std::string RGWBlockDirectory::buildIndex(cache_block *ptr) { - return "rgw-object:" + ptr->c_obj.obj_name + ":directory"; +std::string ObjectDirectory::build_index(CacheObj* object) { + return "rgw-object:" + object->objName + ":object-directory"; } -int RGWBlockDirectory::existKey(std::string key) { - int result = -1; +int ObjectDirectory::exist_key(std::string key) { + int result = 0; std::vector keys; keys.push_back(key); @@ -36,7 +39,7 @@ int RGWBlockDirectory::existKey(std::string key) { try { client.exists(keys, [&result](cpp_redis::reply &reply) { if (reply.is_integer()) { - result = reply.as_integer(); /* Returns 1 upon success */ + result = reply.as_integer(); } }); @@ -46,31 +49,209 @@ int RGWBlockDirectory::existKey(std::string key) { return result; } -int RGWBlockDirectory::setValue(cache_block *ptr) { - /* Creating the index based on obj_name */ - std::string key = buildIndex(ptr); +int ObjectDirectory::set_value(CacheObj* object) { + /* Creating the index based on objName */ + std::string result; + std::string key = build_index(object); if (!client.is_connected()) { - findClient(&client); + find_client(&client); } - std::string result; + /* Every set will be new */ + if (addr.host == "" || addr.port == 0) { + dout(10) << "RGW D4N Directory: Directory endpoint not configured correctly" << dendl; + return -2; + } + + std::string endpoint = addr.host + ":" + std::to_string(addr.port); + std::vector< std::pair > list; + + /* Creating a list of the entry's properties */ + list.push_back(make_pair("key", key)); + list.push_back(make_pair("objName", object->objName)); + list.push_back(make_pair("bucketName", object->bucketName)); + list.push_back(make_pair("creationTime", std::to_string(object->creationTime))); + list.push_back(make_pair("dirty", std::to_string(object->dirty))); + list.push_back(make_pair("hosts", endpoint)); + + try { + client.hmset(key, list, [&result](cpp_redis::reply &reply) { + if (!reply.is_null()) { + result = reply.as_string(); + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + + if (result != "OK") { + return -1; + } + } catch(std::exception &e) { + return -1; + } + + return 0; +} + +int ObjectDirectory::get_value(CacheObj* object) { + int keyExist = -2; + std::string key = build_index(object); + + if (!client.is_connected()) { + find_client(&client); + } + + if (exist_key(key)) { + std::string key; + std::string objName; + std::string bucketName; + std::string creationTime; + std::string dirty; + std::string hosts; + std::vector fields; + + fields.push_back("key"); + fields.push_back("objName"); + fields.push_back("bucketName"); + fields.push_back("creationTime"); + fields.push_back("dirty"); + fields.push_back("hosts"); + + try { + client.hmget(key, fields, [&key, &objName, &bucketName, &creationTime, &dirty, &hosts, &keyExist](cpp_redis::reply &reply) { + if (reply.is_array()) { + auto arr = reply.as_array(); + + if (!arr[0].is_null()) { + keyExist = 0; + key = arr[0].as_string(); + objName = arr[1].as_string(); + bucketName = arr[2].as_string(); + creationTime = arr[3].as_string(); + dirty = arr[4].as_string(); + hosts = arr[5].as_string(); + } + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + + if (keyExist < 0) { + return keyExist; + } + + /* Currently, there can only be one host */ + object->objName = objName; + object->bucketName = bucketName; + + struct std::tm tm; + std::istringstream(creationTime) >> std::get_time(&tm, "%T"); + strptime(creationTime.c_str(), "%T", &tm); // Need to check formatting -Sam + object->creationTime = mktime(&tm); + + std::istringstream(dirty) >> object->dirty; + } catch(std::exception &e) { + keyExist = -1; + } + } + + return keyExist; +} + +int ObjectDirectory::del_value(CacheObj* object) { + int result = 0; + std::vector keys; + std::string key = build_index(object); + keys.push_back(key); + + if (!client.is_connected()) { + find_client(&client); + } + + if (exist_key(key)) { + try { + client.del(keys, [&result](cpp_redis::reply &reply) { + if (reply.is_integer()) { + result = reply.as_integer(); + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + return result - 1; + } catch(std::exception &e) { + return -1; + } + } else { + return -2; + } +} + +int BlockDirectory::find_client(cpp_redis::client* client) { + if (client->is_connected()) + return 0; + + if (addr.host == "" || addr.port == 0) { + dout(10) << "RGW D4N Directory: D4N directory endpoint was not configured correctly" << dendl; + return EDESTADDRREQ; + } + + client->connect(addr.host, addr.port, nullptr); + + if (!client->is_connected()) + return ECONNREFUSED; + + return 0; +} + +std::string BlockDirectory::build_index(CacheBlock* block) { + return "rgw-object:" + block->cacheObj.objName + ":block-directory"; +} + +int BlockDirectory::exist_key(std::string key) { + int result = 0; std::vector keys; keys.push_back(key); + + if (!client.is_connected()) { + return result; + } + + try { + client.exists(keys, [&result](cpp_redis::reply &reply) { + if (reply.is_integer()) { + result = reply.as_integer(); + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + } catch(std::exception &e) {} + + return result; +} + +int BlockDirectory::set_value(CacheBlock* block) { + /* Creating the index based on objName */ + std::string result; + std::string key = build_index(block); + if (!client.is_connected()) { + find_client(&client); + } /* Every set will be new */ - if (host == "" || port == 0) { + if (addr.host == "" || addr.port == 0) { dout(10) << "RGW D4N Directory: Directory endpoint not configured correctly" << dendl; - return -1; + return -2; } - std::string endpoint = host + ":" + std::to_string(port); - std::vector> list; + std::string endpoint = addr.host + ":" + std::to_string(addr.port); + std::vector< std::pair > list; - /* Creating a list of key's properties */ + /* Creating a list of the entry's properties */ list.push_back(make_pair("key", key)); - list.push_back(make_pair("size", std::to_string(ptr->size_in_bytes))); - list.push_back(make_pair("bucket_name", ptr->c_obj.bucket_name)); - list.push_back(make_pair("obj_name", ptr->c_obj.obj_name)); + list.push_back(make_pair("size", std::to_string(block->size))); + list.push_back(make_pair("globalWeight", std::to_string(block->globalWeight))); + list.push_back(make_pair("bucketName", block->cacheObj.bucketName)); + list.push_back(make_pair("objName", block->cacheObj.objName)); list.push_back(make_pair("hosts", endpoint)); try { @@ -81,7 +262,7 @@ int RGWBlockDirectory::setValue(cache_block *ptr) { }); client.sync_commit(std::chrono::milliseconds(1000)); - + if (result != "OK") { return -1; } @@ -92,88 +273,149 @@ int RGWBlockDirectory::setValue(cache_block *ptr) { return 0; } -int RGWBlockDirectory::getValue(cache_block *ptr) { - std::string key = buildIndex(ptr); +int BlockDirectory::get_value(CacheBlock* block) { + int keyExist = -2; + std::string key = build_index(block); if (!client.is_connected()) { - findClient(&client); + find_client(&client); } - if (existKey(key)) { - int field_exist = -1; - + if (exist_key(key)) { std::string hosts; std::string size; - std::string bucket_name; - std::string obj_name; + std::string bucketName; + std::string objName; std::vector fields; fields.push_back("key"); fields.push_back("hosts"); fields.push_back("size"); - fields.push_back("bucket_name"); - fields.push_back("obj_name"); + fields.push_back("bucketName"); + fields.push_back("objName"); try { - client.hmget(key, fields, [&key, &hosts, &size, &bucket_name, &obj_name, &field_exist](cpp_redis::reply &reply) { + client.hmget(key, fields, [&key, &hosts, &size, &bucketName, &objName, &keyExist](cpp_redis::reply &reply) { if (reply.is_array()) { auto arr = reply.as_array(); if (!arr[0].is_null()) { - field_exist = 0; + keyExist = 0; key = arr[0].as_string(); hosts = arr[1].as_string(); size = arr[2].as_string(); - bucket_name = arr[3].as_string(); - obj_name = arr[4].as_string(); + bucketName = arr[3].as_string(); + objName = arr[4].as_string(); } } }); client.sync_commit(std::chrono::milliseconds(1000)); - if (field_exist < 0) { - return field_exist; + if (keyExist < 0 ) { + return keyExist; } - /* Currently, there can only be one host */ - ptr->size_in_bytes = std::stoi(size); - ptr->c_obj.bucket_name = bucket_name; - ptr->c_obj.obj_name = obj_name; + /* Currently, there can only be one host */ // update -Sam + block->size = std::stoi(size); + block->cacheObj.bucketName = bucketName; + block->cacheObj.objName = objName; } catch(std::exception &e) { - return -1; + keyExist = -1; } } - return 0; + return keyExist; } -int RGWBlockDirectory::delValue(cache_block *ptr) { +int BlockDirectory::del_value(CacheBlock* block) { int result = 0; std::vector keys; - std::string key = buildIndex(ptr); + std::string key = build_index(block); keys.push_back(key); if (!client.is_connected()) { - findClient(&client); + find_client(&client); } - if (existKey(key)) { + if (exist_key(key)) { try { client.del(keys, [&result](cpp_redis::reply &reply) { if (reply.is_integer()) { - result = reply.as_integer(); /* Returns 1 upon success */ + result = reply.as_integer(); } }); client.sync_commit(std::chrono::milliseconds(1000)); - return result - 1; } catch(std::exception &e) { return -1; } } else { - dout(20) << "RGW D4N Directory: Block is not in directory." << dendl; return -2; } } + +int BlockDirectory::update_field(CacheBlock* block, std::string field, std::string value) { // represent in cache block too -Sam + std::string result; + std::string key = build_index(block); + + if (!client.is_connected()) { + find_client(&client); + } + + if (exist_key(key)) { + if (field == "hostsList") { + /* Append rather than overwrite */ + std::string hosts; + + try { + client.hget(key, "hostsList", [&hosts](cpp_redis::reply& reply) { + if (!reply.is_null()) { + hosts = reply.as_string(); + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + } catch(std::exception &e) { + return -1; + } + + value += "_"; + value += hosts; + } + + /* Update cache block */ // Remove ones that aren't used -Sam + if (field == "size") + block->size = std::stoi(value); + else if (field == "bucketName") + block->cacheObj.bucketName = value; + else if (field == "objName") + block->cacheObj.objName = value; + else if (field == "hostsList") + block->hostsList.push_back(value); + + std::vector< std::pair > list; + list.push_back(std::make_pair(field, value)); + + try { + client.hmset(key, list, [&result](cpp_redis::reply &reply) { + if (!reply.is_null()) { + result = reply.as_string(); + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + + if (result != "OK") { + return -1; + } + } catch(std::exception &e) { + return -1; + } + } + + return 0; +} + +} } // namespace rgw::d4n diff --git a/src/rgw/driver/d4n/d4n_directory.h b/src/rgw/driver/d4n/d4n_directory.h index 95596db660b..fd2690db141 100644 --- a/src/rgw/driver/d4n/d4n_directory.h +++ b/src/rgw/driver/d4n/d4n_directory.h @@ -1,53 +1,99 @@ -#ifndef CEPH_RGWD4NDIRECTORY_H -#define CEPH_RGWD4NDIRECTORY_H +#ifndef CEPH_D4NDIRECTORY_H +#define CEPH_D4NDIRECTORY_H #include "rgw_common.h" #include #include #include -struct cache_obj { - std::string bucket_name; /* s3 bucket name */ - std::string obj_name; /* s3 obj name */ +namespace rgw { namespace d4n { + +struct Address { + std::string host; + int port; +}; + +struct CacheObj { + std::string objName; /* S3 object name */ + std::string bucketName; /* S3 bucket name */ + time_t creationTime; // Creation time of the S3 Object + bool dirty; + std::vector hostsList; /* Currently not supported: list of hostnames of object locations for multiple backends */ }; -struct cache_block { - cache_obj c_obj; - uint64_t size_in_bytes; /* block size_in_bytes */ - std::vector hosts_list; /* Currently not supported: list of hostnames of block locations */ +struct CacheBlock { + CacheObj cacheObj; + uint64_t size; /* Block size in bytes */ + int globalWeight = 0; + std::vector hostsList; /* Currently not supported: list of hostnames of block locations */ }; -class RGWDirectory { +class Directory { public: - RGWDirectory() {} - CephContext *cct; + Directory() {} + CephContext* cct; +}; + +class ObjectDirectory: public Directory { // where else should object directory be called? -Sam + public: + ObjectDirectory() {} + ObjectDirectory(std::string host, int port) { + addr.host = host; + addr.port = port; + } + + void init(CephContext* _cct) { + cct = _cct; + addr.host = cct->_conf->rgw_d4n_host; + addr.port = cct->_conf->rgw_d4n_port; + } + + int find_client(cpp_redis::client* client); + int exist_key(std::string key); + Address get_addr() { return addr; } + + int set_value(CacheObj* object); + int get_value(CacheObj* object); + int copy_value(CacheObj* object, CacheObj* copyObject); + int del_value(CacheObj* object); + + private: + cpp_redis::client client; + Address addr; + std::string build_index(CacheObj* object); }; -class RGWBlockDirectory: RGWDirectory { +class BlockDirectory: public Directory { public: - RGWBlockDirectory() {} - RGWBlockDirectory(std::string blockHost, int blockPort):host(blockHost), port(blockPort) {} + BlockDirectory() {} + BlockDirectory(std::string host, int port) { + addr.host = host; + addr.port = port; + } - void init(CephContext *_cct) { + void init(CephContext* _cct) { cct = _cct; - host = cct->_conf->rgw_d4n_host; - port = cct->_conf->rgw_d4n_port; + addr.host = cct->_conf->rgw_d4n_host; + addr.port = cct->_conf->rgw_d4n_port; } - int findClient(cpp_redis::client *client); - int existKey(std::string key); - int setValue(cache_block *ptr); - int getValue(cache_block *ptr); - int delValue(cache_block *ptr); + int find_client(cpp_redis::client* client); + int exist_key(std::string key); + Address get_addr() { return addr; } - std::string get_host() { return host; } - int get_port() { return port; } + int set_value(CacheBlock* block); + int get_value(CacheBlock* block); + int copy_value(CacheBlock* block, CacheBlock* copyBlock); + int del_value(CacheBlock* block); + + int update_field(CacheBlock* block, std::string field, std::string value); private: cpp_redis::client client; - std::string buildIndex(cache_block *ptr); - std::string host = ""; - int port = 0; + Address addr; + std::string build_index(CacheBlock* block); }; +} } // namespace rgw::d4n + #endif diff --git a/src/rgw/driver/d4n/d4n_policy.cc b/src/rgw/driver/d4n/d4n_policy.cc new file mode 100644 index 00000000000..22ea996a418 --- /dev/null +++ b/src/rgw/driver/d4n/d4n_policy.cc @@ -0,0 +1,404 @@ +#include "d4n_policy.h" + +#define dout_subsys ceph_subsys_rgw +#define dout_context g_ceph_context + +namespace rgw { namespace d4n { + +int CachePolicy::find_client(const DoutPrefixProvider* dpp) { + if (client.is_connected()) + return 0; + + if (get_addr().host == "" || get_addr().port == 0) { + ldpp_dout(dpp, 10) << "RGW D4N Policy: D4N policy endpoint was not configured correctly" << dendl; + return EDESTADDRREQ; + } + + client.connect(get_addr().host, get_addr().port, nullptr); + + if (!client.is_connected()) + return ECONNREFUSED; + + return 0; +} + +int CachePolicy::exist_key(std::string key) { + int result = -1; + std::vector keys; + keys.push_back(key); + + if (!client.is_connected()) { + return result; + } + + try { + client.exists(keys, [&result](cpp_redis::reply &reply) { + if (reply.is_integer()) { + result = reply.as_integer(); + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + } catch(std::exception &e) {} + + return result; +} + +int LFUDAPolicy::set_age(int age) { + int result = 0; + + try { + client.hset("lfuda", "age", std::to_string(age), [&result](cpp_redis::reply& reply) { + if (!reply.is_null()) { + result = reply.as_integer(); + } + }); + + client.sync_commit(); + } catch(std::exception &e) { + return -1; + } + + return result - 1; +} + +int LFUDAPolicy::get_age() { + int ret = 0; + int age = -1; + + try { + client.hexists("lfuda", "age", [&ret](cpp_redis::reply& reply) { + if (!reply.is_null()) { + ret = reply.as_integer(); + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + } catch(std::exception &e) { + return -1; + } + + if (!ret) { + ret = set_age(0); /* Initialize age */ + + if (!ret) { + return 0; /* Success */ + } else { + return -1; + }; + } + + try { + client.hget("lfuda", "age", [&age](cpp_redis::reply& reply) { + if (!reply.is_null()) { + age = std::stoi(reply.as_string()); + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + } catch(std::exception &e) { + return -1; + } + + return age; +} + +int LFUDAPolicy::set_global_weight(std::string key, int weight) { + int result = 0; + + try { + client.hset(key, "globalWeight", std::to_string(weight), [&result](cpp_redis::reply& reply) { + if (!reply.is_null()) { + result = reply.as_integer(); + } + }); + + client.sync_commit(); + } catch(std::exception &e) { + return -1; + } + + return result - 1; +} + +int LFUDAPolicy::get_global_weight(std::string key) { + int weight = -1; + + try { + client.hget(key, "globalWeight", [&weight](cpp_redis::reply& reply) { + if (!reply.is_null()) { + weight = reply.as_integer(); + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + } catch(std::exception &e) { + return -1; + } + + return weight; +} + +int LFUDAPolicy::set_min_avg_weight(int weight, std::string cacheLocation) { + int result = 0; + + try { + client.hset("lfuda", "minAvgWeight:cache", cacheLocation, [&result](cpp_redis::reply& reply) { + if (!reply.is_null()) { + result = reply.as_integer(); + } + }); + + client.sync_commit(); + } catch(std::exception &e) { + return -1; + } + + if (result == 1) { + result = 0; + try { + client.hset("lfuda", "minAvgWeight:weight", std::to_string(weight), [&result](cpp_redis::reply& reply) { + if (!reply.is_null()) { + result = reply.as_integer(); + } + }); + + client.sync_commit(); + } catch(std::exception &e) { + return -1; + } + } + + return result - 1; +} + +int LFUDAPolicy::get_min_avg_weight() { + int ret = 0; + int weight = -1; + + try { + client.hexists("lfuda", "minAvgWeight:cache", [&ret](cpp_redis::reply& reply) { + if (!reply.is_null()) { + ret = reply.as_integer(); + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + } catch(std::exception &e) { + return -1; + } + + if (!ret) { + ret = set_min_avg_weight(INT_MAX, ""/* local cache location or keep empty? */); /* Initialize minimum average weight */ + + if (!ret) { + return INT_MAX; /* Success */ + } else { + return -1; + }; + } + + try { + client.hget("lfuda", "minAvgWeight:weight", [&weight](cpp_redis::reply& reply) { + if (!reply.is_null()) { + weight = std::stoi(reply.as_string()); + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + } catch(std::exception &e) { + return -1; + } + + return weight; +} + +CacheBlock LFUDAPolicy::find_victim(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) { + std::vector entries = cacheNode->list_entries(dpp); + std::string victimName; + int minWeight = INT_MAX; + + for (auto it = entries.begin(); it != entries.end(); ++it) { + std::string localWeightStr = cacheNode->get_attr(dpp, it->key, "localWeight"); // should represent block -Sam + + if (!std::stoi(localWeightStr)) { // maybe do this in some sort of initialization procedure instead of here? -Sam + /* Local weight hasn't been set */ + int ret = cacheNode->set_attr(dpp, it->key, "localWeight", std::to_string(get_age())); + + if (ret < 0) + return {}; + } else if (std::stoi(localWeightStr) < minWeight) { + minWeight = std::stoi(localWeightStr); + victimName = it->key; + } + } + + /* Get victim cache block */ + CacheBlock victimBlock; + victimBlock.cacheObj.objName = victimName; + BlockDirectory blockDir; + blockDir.init(cct); + + int ret = blockDir.get_value(&victimBlock); + + if (ret < 0) + return {}; + + return victimBlock; +} + +int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) { + std::string key = "rgw-object:" + block->cacheObj.objName + ":directory"; + std::string localWeightStr = cacheNode->get_attr(dpp, block->cacheObj.objName, "localWeight"); // change to block name eventually -Sam + int localWeight = -1; + + if (!client.is_connected()) + find_client(dpp); + + if (localWeightStr.empty()) { // figure out where to set local weight -Sam + /* Local weight hasn't been set */ + int ret = cacheNode->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(get_age())); + localWeight = 0; + + if (ret < 0) + return -1; + } else { + localWeight = std::stoi(localWeightStr); + } + + int age = get_age(); + + if (cacheNode->key_exists(dpp, block->cacheObj.objName)) { /* Local copy */ + localWeight += age; + } else { + std::string hosts; + uint64_t freeSpace = cacheNode->get_free_space(dpp); + + while (freeSpace < block->size) + freeSpace += eviction(dpp, cacheNode); + + if (exist_key(key)) { + try { + client.hget(key, "hostsList", [&hosts](cpp_redis::reply& reply) { + if (!reply.is_null()) { + hosts = reply.as_string(); + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + } catch(std::exception &e) { + return -1; + } + } else { + return -2; + } + + // should not hold local cache IP if in this else statement -Sam + if (hosts.length() > 0) { /* Remote copy */ + int globalWeight = get_global_weight(key); + globalWeight += age; + + if (set_global_weight(key, globalWeight)) + return -1; + } else { /* No remote copy */ + // do I need to add the block to the local cache here? -Sam + // update hosts list for block as well? check read workflow -Sam + localWeight += age; + return cacheNode->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(localWeight)); + } + } + return 0; +} + +uint64_t LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) { + CacheBlock victim = find_victim(dpp, cacheNode); + + if (victim.cacheObj.objName.empty()) { + ldpp_dout(dpp, 10) << "RGW D4N Policy: Could not find victim block" << dendl; + return -1; + } + + std::string key = "rgw-object:" + victim.cacheObj.objName + ":directory"; + std::string hosts; + int globalWeight = get_global_weight(key); + int localWeight = std::stoi(cacheNode->get_attr(dpp, victim.cacheObj.objName, "localWeight")); // change to block name eventually -Sam + int avgWeight = get_min_avg_weight(); + + if (exist_key(key)) { + try { + client.hget(key, "hostsList", [&hosts](cpp_redis::reply& reply) { + if (!reply.is_null()) { + hosts = reply.as_string(); + } + }); + + client.sync_commit(std::chrono::milliseconds(1000)); + } catch(std::exception &e) { + return -1; + } + } else { + return -2; + } + + if (hosts.empty()) { /* Last copy */ + if (globalWeight > 0) { + localWeight += globalWeight; + int ret = cacheNode->set_attr(dpp, victim.cacheObj.objName, "localWeight", std::to_string(localWeight)); + + if (!ret) + ret = set_global_weight(key, 0); + else + return -1; + + if (ret) + return -1; + } + + if (avgWeight < 0) + return -1; + + if (localWeight > avgWeight) { + // push block to remote cache + } + } + + globalWeight += localWeight; + + if (set_global_weight(key, globalWeight)) + return -2; + + ldpp_dout(dpp, 10) << "RGW D4N Policy: Block " << victim.cacheObj.objName << " has been evicted." << dendl; + int ret = cacheNode->delete_data(dpp, victim.cacheObj.objName); + + if (!ret) { + ret = set_min_avg_weight(avgWeight - (localWeight/cacheNode->get_num_entries(dpp)), ""/*local cache location*/); // Where else must this be set? -Sam + + if (!ret) { + int age = get_age(); + age = std::max(localWeight, age); + ret = set_age(age); + + if (ret) + return -1; + } else { + return -1; + } + } else { + return -1; + } + + return victim.size; +} + +int PolicyDriver::init() { + rgw::cache::Partition partition_info; + cacheDriver = new rgw::cache::RedisDriver(partition_info, "127.0.0.1", 6379); // hardcoded for now -Sam + + if (policyName == "lfuda") { + cachePolicy = new LFUDAPolicy(); + return 0; + } else + return -1; +} + +} } // namespace rgw::d4n diff --git a/src/rgw/driver/d4n/d4n_policy.h b/src/rgw/driver/d4n/d4n_policy.h new file mode 100644 index 00000000000..15765c94c38 --- /dev/null +++ b/src/rgw/driver/d4n/d4n_policy.h @@ -0,0 +1,76 @@ +#ifndef CEPH_D4NPOLICY_H +#define CEPH_D4NPOLICY_H + +#include +#include +#include +#include "rgw_common.h" +#include "d4n_directory.h" +#include "../../rgw_redis_driver.h" + +namespace rgw { namespace d4n { + +class CachePolicy { + private: + cpp_redis::client client; + Address addr; + + public: + CephContext* cct; + + CachePolicy() : addr() {} + virtual ~CachePolicy() = default; + + virtual void init(CephContext *_cct) { + cct = _cct; + addr.host = cct->_conf->rgw_d4n_host; + addr.port = cct->_conf->rgw_d4n_port; + } + virtual int find_client(const DoutPrefixProvider* dpp) = 0; + virtual int exist_key(std::string key) = 0; + virtual Address get_addr() { return addr; } + virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) = 0; + virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) = 0; +}; + +class LFUDAPolicy : public CachePolicy { + private: + cpp_redis::client client; + + public: + LFUDAPolicy() : CachePolicy() {} + + int set_age(int age); + int get_age(); + int set_global_weight(std::string key, int weight); + int get_global_weight(std::string key); + int set_min_avg_weight(int weight, std::string cacheLocation); + int get_min_avg_weight(); + CacheBlock find_victim(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode); + + virtual int find_client(const DoutPrefixProvider* dpp) override { return CachePolicy::find_client(dpp); } + virtual int exist_key(std::string key) override { return CachePolicy::exist_key(key); } + virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) override; + virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) override; +}; + +class PolicyDriver { + private: + std::string policyName; + + public: + CachePolicy* cachePolicy; + rgw::cache::CacheDriver* cacheDriver; // might place elsewhere -Sam + + PolicyDriver(std::string _policyName) : policyName(_policyName) {} + ~PolicyDriver() { + delete cachePolicy; + delete cacheDriver; + } + + int init(); +}; + +} } // namespace rgw::d4n + +#endif diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.cc b/src/rgw/driver/d4n/rgw_sal_d4n.cc index e2624690ebd..9e5474ed472 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.cc +++ b/src/rgw/driver/d4n/rgw_sal_d4n.cc @@ -39,8 +39,13 @@ static inline Object* nextObject(Object* t) int D4NFilterDriver::initialize(CephContext *cct, const DoutPrefixProvider *dpp) { FilterDriver::initialize(cct, dpp); - blk_dir->init(cct); - d4n_cache->init(cct); + + objDir->init(cct); + blockDir->init(cct); + + policyDriver->init(); + policyDriver->cachePolicy->init(cct); + policyDriver->cacheDriver->initialize(cct, dpp); return 0; } @@ -94,6 +99,25 @@ int D4NFilterObject::copy_object(User* user, const DoutPrefixProvider* dpp, optional_yield y) { + /* Build cache block copy */ + rgw::d4n::CacheBlock* copyCacheBlock = new rgw::d4n::CacheBlock(); // How will this copy work in lfuda? -Sam + + copyCacheBlock->hostsList.push_back(driver->get_cache_block()->hostsList[0]); + copyCacheBlock->size = driver->get_cache_block()->size; + copyCacheBlock->size = driver->get_cache_block()->globalWeight; // Do we want to reset the global weight? -Sam + copyCacheBlock->cacheObj.bucketName = dest_bucket->get_name(); + copyCacheBlock->cacheObj.objName = dest_object->get_key().get_oid(); + + int copy_valueReturn = driver->get_block_dir()->set_value(copyCacheBlock); + + if (copy_valueReturn < 0) { + ldpp_dout(dpp, 20) << "D4N Filter: Block directory copy operation failed." << dendl; + } else { + ldpp_dout(dpp, 20) << "D4N Filter: Block directory copy operation succeeded." << dendl; + } + + delete copyCacheBlock; + /* Append additional metadata to attributes */ rgw::sal::Attrs baseAttrs = this->get_attrs(); buffer::list bl; @@ -130,13 +154,20 @@ int D4NFilterObject::copy_object(User* user, baseAttrs.insert(attrs.begin(), attrs.end()); } - int copyObjReturn = filter->get_d4n_cache()->copyObject(this->get_key().get_oid(), dest_object->get_key().get_oid(), &baseAttrs); + /* + int copy_attrsReturn = driver->get_policy_driver()->cacheDriver->copy_attrs(this->get_key().get_oid(), dest_object->get_key().get_oid(), &baseAttrs); - if (copyObjReturn < 0) { - ldpp_dout(dpp, 20) << "D4N Filter: Cache copy object operation failed." << dendl; + if (copy_attrsReturn < 0) { + ldpp_dout(dpp, 20) << "D4N Filter: Cache copy attributes operation failed." << dendl; } else { - ldpp_dout(dpp, 20) << "D4N Filter: Cache copy object operation succeeded." << dendl; - } + int copy_dataReturn = driver->get_policy_driver()->cacheDriver->copy_data(this->get_key().get_oid(), dest_object->get_key().get_oid()); + + if (copy_dataReturn < 0) { + ldpp_dout(dpp, 20) << "D4N Filter: Cache copy data operation failed." << dendl; + } else { + ldpp_dout(dpp, 20) << "D4N Filter: Cache copy object operation succeeded." << dendl; + } + }*/ return next->copy_object(user, info, source_zone, nextObject(dest_object), @@ -162,9 +193,9 @@ int D4NFilterObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattr } } - int updateAttrsReturn = filter->get_d4n_cache()->setObject(this->get_key().get_oid(), setattrs); + int update_attrsReturn = driver->get_policy_driver()->cacheDriver->set_attrs(dpp, this->get_key().get_oid(), *setattrs); - if (updateAttrsReturn < 0) { + if (update_attrsReturn < 0) { ldpp_dout(dpp, 20) << "D4N Filter: Cache set object attributes operation failed." << dendl; } else { ldpp_dout(dpp, 20) << "D4N Filter: Cache set object attributes operation succeeded." << dendl; @@ -172,25 +203,19 @@ int D4NFilterObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattr } if (delattrs != NULL) { - std::vector delFields; - Attrs::iterator attrs; + Attrs::iterator attr; + Attrs currentattrs = this->get_attrs(); - /* Extract fields from delattrs */ - for (attrs = delattrs->begin(); attrs != delattrs->end(); ++attrs) { - delFields.push_back(attrs->first); + /* Ensure all delAttrs exist */ + for (const auto& attr : *delattrs) { + if (std::find(currentattrs.begin(), currentattrs.end(), attr) == currentattrs.end()) { + delattrs->erase(std::find(delattrs->begin(), delattrs->end(), attr)); + } } - Attrs currentattrs = this->get_attrs(); - std::vector currentFields; - - /* Extract fields from current attrs */ - for (attrs = currentattrs.begin(); attrs != currentattrs.end(); ++attrs) { - currentFields.push_back(attrs->first); - } - - int delAttrsReturn = filter->get_d4n_cache()->delAttrs(this->get_key().get_oid(), currentFields, delFields); + int del_attrsReturn = driver->get_policy_driver()->cacheDriver->delete_attrs(dpp, this->get_key().get_oid(), *delattrs); - if (delAttrsReturn < 0) { + if (del_attrsReturn < 0) { ldpp_dout(dpp, 20) << "D4N Filter: Cache delete object attributes operation failed." << dendl; } else { ldpp_dout(dpp, 20) << "D4N Filter: Cache delete object attributes operation succeeded." << dendl; @@ -203,20 +228,17 @@ int D4NFilterObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattr int D4NFilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj) { - rgw::sal::Attrs newAttrs; - std::vector< std::pair > newMetadata; - int getAttrsReturn = filter->get_d4n_cache()->getObject(this->get_key().get_oid(), - &newAttrs, - &newMetadata); + rgw::sal::Attrs attrs; + int get_attrsReturn = driver->get_policy_driver()->cacheDriver->get_attrs(dpp, this->get_key().get_oid(), attrs); - if (getAttrsReturn < 0) { + if (get_attrsReturn < 0) { ldpp_dout(dpp, 20) << "D4N Filter: Cache get object attributes operation failed." << dendl; return next->get_obj_attrs(y, dpp, target_obj); } else { - int setAttrsReturn = this->set_attrs(newAttrs); + int set_attrsReturn = this->set_attrs(attrs); - if (setAttrsReturn < 0) { + if (set_attrsReturn < 0) { ldpp_dout(dpp, 20) << "D4N Filter: Cache get object attributes operation failed." << dendl; return next->get_obj_attrs(y, dpp, target_obj); @@ -233,9 +255,9 @@ int D4NFilterObject::modify_obj_attrs(const char* attr_name, bufferlist& attr_va { Attrs update; update[(std::string)attr_name] = attr_val; - int updateAttrsReturn = filter->get_d4n_cache()->updateAttr(this->get_key().get_oid(), &update); + int update_attrsReturn = driver->get_policy_driver()->cacheDriver->update_attrs(dpp, this->get_key().get_oid(), update); - if (updateAttrsReturn < 0) { + if (update_attrsReturn < 0) { ldpp_dout(dpp, 20) << "D4N Filter: Cache modify object attribute operation failed." << dendl; } else { ldpp_dout(dpp, 20) << "D4N Filter: Cache modify object attribute operation succeeded." << dendl; @@ -247,27 +269,26 @@ int D4NFilterObject::modify_obj_attrs(const char* attr_name, bufferlist& attr_va int D4NFilterObject::delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, optional_yield y) { - std::vector delFields; - delFields.push_back((std::string)attr_name); - - Attrs::iterator attrs; + buffer::list bl; + Attrs delattr; + delattr.insert({attr_name, bl}); Attrs currentattrs = this->get_attrs(); - std::vector currentFields; - - /* Extract fields from current attrs */ - for (attrs = currentattrs.begin(); attrs != currentattrs.end(); ++attrs) { - currentFields.push_back(attrs->first); - } - - int delAttrReturn = filter->get_d4n_cache()->delAttrs(this->get_key().get_oid(), currentFields, delFields); + rgw::sal::Attrs::iterator attr = delattr.begin(); - if (delAttrReturn < 0) { - ldpp_dout(dpp, 20) << "D4N Filter: Cache delete object attribute operation failed." << dendl; - } else { - ldpp_dout(dpp, 20) << "D4N Filter: Cache delete object attribute operation succeeded." << dendl; - } - - return next->delete_obj_attrs(dpp, attr_name, y); + /* Ensure delAttr exists */ + if (std::find_if(currentattrs.begin(), currentattrs.end(), + [&](const auto& pair) { return pair.first == attr->first; }) != currentattrs.end()) { + int delAttrReturn = driver->get_policy_driver()->cacheDriver->delete_attrs(dpp, this->get_key().get_oid(), delattr); + + if (delAttrReturn < 0) { + ldpp_dout(dpp, 20) << "D4N Filter: Cache delete object attribute operation failed." << dendl; + } else { + ldpp_dout(dpp, 20) << "D4N Filter: Cache delete object attribute operation succeeded." << dendl; + } + } else + return next->delete_obj_attrs(dpp, attr_name, y); + + return 0; } std::unique_ptr D4NFilterDriver::get_object(const rgw_obj_key& k) @@ -306,19 +327,10 @@ std::unique_ptr D4NFilterObject::get_delete_op() int D4NFilterObject::D4NFilterReadOp::prepare(optional_yield y, const DoutPrefixProvider* dpp) { - int getDirReturn = source->filter->get_block_dir()->getValue(source->filter->get_cache_block()); - - if (getDirReturn < 0) { - ldpp_dout(dpp, 20) << "D4N Filter: Directory get operation failed." << dendl; - } else { - ldpp_dout(dpp, 20) << "D4N Filter: Directory get operation succeeded." << dendl; - } - - rgw::sal::Attrs newAttrs; - std::vector< std::pair > newMetadata; - int getObjReturn = source->filter->get_d4n_cache()->getObject(source->get_key().get_oid(), - &newAttrs, - &newMetadata); + rgw::sal::Attrs attrs; + int getObjReturn = source->driver->get_policy_driver()->cacheDriver->get_attrs(dpp, + source->get_key().get_oid(), + attrs); int ret = next->prepare(y, dpp); @@ -327,55 +339,197 @@ int D4NFilterObject::D4NFilterReadOp::prepare(optional_yield y, const DoutPrefix } else { /* Set metadata locally */ RGWObjState* astate; + RGWQuotaInfo quota_info; + std::unique_ptr user = source->driver->get_user(source->get_bucket()->get_owner()); source->get_obj_state(dpp, &astate, y); - for (auto it = newMetadata.begin(); it != newMetadata.end(); ++it) { - if (!std::strcmp(it->first.data(), "mtime")) { - parse_time(it->second.data(), &astate->mtime); - } else if (!std::strcmp(it->first.data(), "object_size")) { - source->set_obj_size(std::stoull(it->second)); - } else if (!std::strcmp(it->first.data(), "accounted_size")) { - astate->accounted_size = std::stoull(it->second); - } else if (!std::strcmp(it->first.data(), "epoch")) { - astate->epoch = std::stoull(it->second); - } else if (!std::strcmp(it->first.data(), "version_id")) { - source->set_instance(it->second); - } else if (!std::strcmp(it->first.data(), "source_zone_short_id")) { - astate->zone_short_id = static_cast(std::stoul(it->second)); + for (auto it = attrs.begin(); it != attrs.end(); ++it) { + if (it->second.length() > 0) { // or return? -Sam + if (it->first == "mtime") { + parse_time(it->second.c_str(), &astate->mtime); + attrs.erase(it->first); + } else if (it->first == "object_size") { + source->set_obj_size(std::stoull(it->second.c_str())); + attrs.erase(it->first); + } else if (it->first == "accounted_size") { + astate->accounted_size = std::stoull(it->second.c_str()); + attrs.erase(it->first); + } else if (it->first == "epoch") { + astate->epoch = std::stoull(it->second.c_str()); + attrs.erase(it->first); + } else if (it->first == "version_id") { + source->set_instance(it->second.c_str()); + attrs.erase(it->first); + } else if (it->first == "source_zone_short_id") { + astate->zone_short_id = static_cast(std::stoul(it->second.c_str())); + attrs.erase(it->first); + } else if (it->first == "user_quota.max_size") { + quota_info.max_size = std::stoull(it->second.c_str()); + attrs.erase(it->first); + } else if (it->first == "user_quota.max_objects") { + quota_info.max_objects = std::stoull(it->second.c_str()); + attrs.erase(it->first); + } else if (it->first == "max_buckets") { + user->set_max_buckets(std::stoull(it->second.c_str())); + attrs.erase(it->first); } } - + user->set_info(quota_info); source->set_obj_state(*astate); /* Set attributes locally */ - int setAttrsReturn = source->set_attrs(newAttrs); + int set_attrsReturn = source->set_attrs(attrs); - if (setAttrsReturn < 0) { + if (set_attrsReturn < 0) { ldpp_dout(dpp, 20) << "D4N Filter: Cache get object operation failed." << dendl; } else { ldpp_dout(dpp, 20) << "D4N Filter: Cache get object operation succeeded." << dendl; } } + } return ret; } +int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int64_t ofs, int64_t end, + RGWGetDataCB* cb, optional_yield y) +{ + /* Execute cache replacement policy */ + int policyRet = source->driver->get_policy_driver()->cachePolicy->get_block(dpp, source->driver->get_cache_block(), + source->driver->get_policy_driver()->cacheDriver); + + if (policyRet < 0) { + ldpp_dout(dpp, 20) << "D4N Filter: Cache replacement operation failed." << dendl; + } else { + ldpp_dout(dpp, 20) << "D4N Filter: Cache replacement operation succeeded." << dendl; + } + + int ret = -1; + bufferlist bl; + uint64_t len = end - ofs + 1; + std::string oid(source->get_name()); + + /* Local cache check */ + if (source->driver->get_policy_driver()->cacheDriver->key_exists(dpp, oid)) { // Entire object for now -Sam + ret = source->driver->get_policy_driver()->cacheDriver->get(dpp, source->get_key().get_oid(), ofs, len, bl, source->get_attrs()); + cb->handle_data(bl, ofs, len); + } else { + /* Block directory check */ + int getDirReturn = source->driver->get_block_dir()->get_value(source->driver->get_cache_block()); + + if (getDirReturn >= -1) { + if (getDirReturn == -1) { + ldpp_dout(dpp, 20) << "D4N Filter: Block directory get operation failed." << dendl; + } else { + ldpp_dout(dpp, 20) << "D4N Filter: Block directory get operation succeeded." << dendl; + } + + // remote cache get + + /* Cache block locally */ + ret = source->driver->get_policy_driver()->cacheDriver->put(dpp, source->get_key().get_oid(), bl, len, source->get_attrs()); // May be put_async -Sam + + if (!ret) { + int updateValueReturn = source->driver->get_block_dir()->update_field(source->driver->get_cache_block(), "hostsList", ""/*local cache ip from config*/); + + if (updateValueReturn < 0) { + ldpp_dout(dpp, 20) << "D4N Filter: Block directory update value operation failed." << dendl; + } else { + ldpp_dout(dpp, 20) << "D4N Filter: Block directory update value operation succeeded." << dendl; + } + + cb->handle_data(bl, ofs, len); + } + } else { + /* Write tier retrieval */ + ldpp_dout(dpp, 20) << "D4N Filter: Block directory get operation failed." << dendl; + getDirReturn = source->driver->get_obj_dir()->get_value(&(source->driver->get_cache_block()->cacheObj)); + + if (getDirReturn >= -1) { + if (getDirReturn == -1) { + ldpp_dout(dpp, 20) << "D4N Filter: Object directory get operation failed." << dendl; + } else { + ldpp_dout(dpp, 20) << "D4N Filter: Object directory get operation succeeded." << dendl; + } + + // retrieve from write back cache, which will be stored as a cache driver instance in the filter + + /* Cache block locally */ + ret = source->driver->get_policy_driver()->cacheDriver->put(dpp, source->get_key().get_oid(), bl, len, source->get_attrs()); // May be put_async -Sam + + if (!ret) { + int updateValueReturn = source->driver->get_block_dir()->update_field(source->driver->get_cache_block(), "hostsList", ""/*local cache ip from config*/); + + if (updateValueReturn < 0) { + ldpp_dout(dpp, 20) << "D4N Filter: Block directory update value operation failed." << dendl; + } else { + ldpp_dout(dpp, 20) << "D4N Filter: Block directory update value operation succeeded." << dendl; + } + + cb->handle_data(bl, ofs, len); + } + } else { + /* Backend store retrieval */ + ldpp_dout(dpp, 20) << "D4N Filter: Object directory get operation failed." << dendl; + ret = next->iterate(dpp, ofs, end, cb, y); + + if (!ret) { + /* Cache block locally */ + ret = source->driver->get_policy_driver()->cacheDriver->put(dpp, source->get_key().get_oid(), bl, len, source->get_attrs()); // May be put_async -Sam + + /* Store block in directory */ + rgw::d4n::BlockDirectory* tempBlockDir = source->driver->get_block_dir(); // remove later -Sam + + source->driver->get_cache_block()->hostsList.push_back(tempBlockDir->get_addr().host + ":" + std::to_string(tempBlockDir->get_addr().port)); // local cache address -Sam + source->driver->get_cache_block()->size = source->get_obj_size(); + source->driver->get_cache_block()->cacheObj.bucketName = source->get_bucket()->get_name(); + source->driver->get_cache_block()->cacheObj.objName = source->get_key().get_oid(); + + int setDirReturn = tempBlockDir->set_value(source->driver->get_cache_block()); + + if (setDirReturn < 0) { + ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation failed." << dendl; + } else { + ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl; + } + } + } + } + } + + if (ret < 0) + ldpp_dout(dpp, 20) << "D4N Filter: Cache iterate operation failed." << dendl; + + return next->iterate(dpp, ofs, end, cb, y); +} + int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp, optional_yield y, uint32_t flags) { - int delDirReturn = source->filter->get_block_dir()->delValue(source->filter->get_cache_block()); + int delDirReturn = source->driver->get_block_dir()->del_value(source->driver->get_cache_block()); if (delDirReturn < 0) { - ldpp_dout(dpp, 20) << "D4N Filter: Directory delete operation failed." << dendl; + ldpp_dout(dpp, 20) << "D4N Filter: Block directory delete operation failed." << dendl; } else { - ldpp_dout(dpp, 20) << "D4N Filter: Directory delete operation succeeded." << dendl; + ldpp_dout(dpp, 20) << "D4N Filter: Block directory delete operation succeeded." << dendl; } - int delObjReturn = source->filter->get_d4n_cache()->delObject(source->get_key().get_oid()); + Attrs::iterator attrs; + Attrs currentattrs = source->get_attrs(); + std::vector currentFields; + + /* Extract fields from current attrs */ + for (attrs = currentattrs.begin(); attrs != currentattrs.end(); ++attrs) { + currentFields.push_back(attrs->first); + } + + int delObjReturn = source->driver->get_policy_driver()->cacheDriver->delete_data(dpp, source->get_key().get_oid()); if (delObjReturn < 0) { - ldpp_dout(dpp, 20) << "D4N Filter: Cache delete operation failed." << dendl; + ldpp_dout(dpp, 20) << "D4N Filter: Cache delete object operation failed." << dendl; } else { + Attrs delattrs = source->get_attrs(); + delObjReturn = source->driver->get_policy_driver()->cacheDriver->delete_attrs(dpp, source->get_key().get_oid(), delattrs); ldpp_dout(dpp, 20) << "D4N Filter: Cache delete operation succeeded." << dendl; } @@ -384,9 +538,9 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp int D4NFilterWriter::prepare(optional_yield y) { - int delDataReturn = filter->get_d4n_cache()->deleteData(obj->get_key().get_oid()); + int del_dataReturn = driver->get_policy_driver()->cacheDriver->delete_data(save_dpp, obj->get_key().get_oid()); - if (delDataReturn < 0) { + if (del_dataReturn < 0) { ldpp_dout(save_dpp, 20) << "D4N Filter: Cache delete data operation failed." << dendl; } else { ldpp_dout(save_dpp, 20) << "D4N Filter: Cache delete data operation succeeded." << dendl; @@ -397,9 +551,9 @@ int D4NFilterWriter::prepare(optional_yield y) int D4NFilterWriter::process(bufferlist&& data, uint64_t offset) { - int appendDataReturn = filter->get_d4n_cache()->appendData(obj->get_key().get_oid(), data); + int append_dataReturn = driver->get_policy_driver()->cacheDriver->append_data(save_dpp, obj->get_key().get_oid(), data); - if (appendDataReturn < 0) { + if (append_dataReturn < 0) { ldpp_dout(save_dpp, 20) << "D4N Filter: Cache append data operation failed." << dendl; } else { ldpp_dout(save_dpp, 20) << "D4N Filter: Cache append data operation succeeded." << dendl; @@ -418,34 +572,33 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag, const req_context& rctx, uint32_t flags) { - cache_block* temp_cache_block = filter->get_cache_block(); - RGWBlockDirectory* temp_block_dir = filter->get_block_dir(); + rgw::d4n::BlockDirectory* tempBlockDir = driver->get_block_dir(); - temp_cache_block->hosts_list.push_back(temp_block_dir->get_host() + ":" + std::to_string(temp_block_dir->get_port())); - temp_cache_block->size_in_bytes = accounted_size; - temp_cache_block->c_obj.bucket_name = obj->get_bucket()->get_name(); - temp_cache_block->c_obj.obj_name = obj->get_key().get_oid(); + driver->get_cache_block()->hostsList.push_back(tempBlockDir->get_addr().host + ":" + std::to_string(tempBlockDir->get_addr().port)); + driver->get_cache_block()->size = accounted_size; + driver->get_cache_block()->cacheObj.bucketName = obj->get_bucket()->get_name(); + driver->get_cache_block()->cacheObj.objName = obj->get_key().get_oid(); - int setDirReturn = temp_block_dir->setValue(temp_cache_block); + int setDirReturn = tempBlockDir->set_value(driver->get_cache_block()); if (setDirReturn < 0) { - ldpp_dout(save_dpp, 20) << "D4N Filter: Directory set operation failed." << dendl; + ldpp_dout(save_dpp, 20) << "D4N Filter: Block directory set operation failed." << dendl; } else { - ldpp_dout(save_dpp, 20) << "D4N Filter: Directory set operation succeeded." << dendl; + ldpp_dout(save_dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl; } /* Retrieve complete set of attrs */ - RGWObjState* astate; int ret = next->complete(accounted_size, etag, mtime, set_mtime, attrs, delete_at, if_match, if_nomatch, user_data, zones_trace, canceled, rctx, flags); obj->get_obj_attrs(rctx.y, save_dpp, NULL); - obj->get_obj_state(save_dpp, &astate, rctx.y); /* Append additional metadata to attributes */ rgw::sal::Attrs baseAttrs = obj->get_attrs(); rgw::sal::Attrs attrs_temp = baseAttrs; buffer::list bl; + RGWObjState* astate; + obj->get_obj_state(save_dpp, &astate, rctx.y); bl.append(to_iso_8601(obj->get_mtime())); baseAttrs.insert({"mtime", bl}); @@ -486,12 +639,12 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag, baseAttrs.insert(attrs.begin(), attrs.end()); - int setObjReturn = filter->get_d4n_cache()->setObject(obj->get_key().get_oid(), &baseAttrs); + int set_attrsReturn = driver->get_policy_driver()->cacheDriver->set_attrs(save_dpp, obj->get_key().get_oid(), baseAttrs); - if (setObjReturn < 0) { - ldpp_dout(save_dpp, 20) << "D4N Filter: Cache set operation failed." << dendl; + if (set_attrsReturn < 0) { + ldpp_dout(save_dpp, 20) << "D4N Filter: Cache set attributes operation failed." << dendl; } else { - ldpp_dout(save_dpp, 20) << "D4N Filter: Cache set operation succeeded." << dendl; + ldpp_dout(save_dpp, 20) << "D4N Filter: Cache set attributes operation succeeded." << dendl; } return ret; @@ -509,4 +662,3 @@ rgw::sal::Driver* newD4NFilter(rgw::sal::Driver* next) } } - diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.h b/src/rgw/driver/d4n/rgw_sal_d4n.h index 840eb99f604..7a978fbec24 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.h +++ b/src/rgw/driver/d4n/rgw_sal_d4n.h @@ -21,28 +21,32 @@ #include "rgw_role.h" #include "common/dout.h" +#include "rgw_redis_driver.h" #include "driver/d4n/d4n_directory.h" -#include "driver/d4n/d4n_datacache.h" +#include "driver/d4n/d4n_policy.h" namespace rgw { namespace sal { class D4NFilterDriver : public FilterDriver { private: - RGWBlockDirectory* blk_dir; - cache_block* c_blk; - RGWD4NCache* d4n_cache; + rgw::d4n::ObjectDirectory* objDir; + rgw::d4n::BlockDirectory* blockDir; + rgw::d4n::CacheBlock* cacheBlock; + rgw::d4n::PolicyDriver* policyDriver; public: D4NFilterDriver(Driver* _next) : FilterDriver(_next) { - blk_dir = new RGWBlockDirectory(); /* Initialize directory address with cct */ - c_blk = new cache_block(); - d4n_cache = new RGWD4NCache(); + objDir = new rgw::d4n::ObjectDirectory(); + blockDir = new rgw::d4n::BlockDirectory(); + cacheBlock = new rgw::d4n::CacheBlock(); + policyDriver = new rgw::d4n::PolicyDriver("lfuda"); } virtual ~D4NFilterDriver() { - delete blk_dir; - delete c_blk; - delete d4n_cache; + delete objDir; + delete blockDir; + delete cacheBlock; + delete policyDriver; } virtual int initialize(CephContext *cct, const DoutPrefixProvider *dpp) override; @@ -57,19 +61,20 @@ class D4NFilterDriver : public FilterDriver { const rgw_placement_rule *ptail_placement_rule, uint64_t olh_epoch, const std::string& unique_tag) override; - RGWBlockDirectory* get_block_dir() { return blk_dir; } - cache_block* get_cache_block() { return c_blk; } - RGWD4NCache* get_d4n_cache() { return d4n_cache; } + rgw::d4n::ObjectDirectory* get_obj_dir() { return objDir; } + rgw::d4n::BlockDirectory* get_block_dir() { return blockDir; } + rgw::d4n::CacheBlock* get_cache_block() { return cacheBlock; } + rgw::d4n::PolicyDriver* get_policy_driver() { return policyDriver; } }; class D4NFilterUser : public FilterUser { private: - D4NFilterDriver* filter; + D4NFilterDriver* driver; public: - D4NFilterUser(std::unique_ptr _next, D4NFilterDriver* _filter) : + D4NFilterUser(std::unique_ptr _next, D4NFilterDriver* _driver) : FilterUser(std::move(_next)), - filter(_filter) {} + driver(_driver) {} virtual ~D4NFilterUser() = default; }; @@ -91,7 +96,7 @@ class D4NFilterBucket : public FilterBucket { class D4NFilterObject : public FilterObject { private: - D4NFilterDriver* filter; + D4NFilterDriver* driver; public: struct D4NFilterReadOp : FilterReadOp { @@ -102,6 +107,8 @@ class D4NFilterObject : public FilterObject { virtual ~D4NFilterReadOp() = default; virtual int prepare(optional_yield y, const DoutPrefixProvider* dpp) override; + virtual int iterate(const DoutPrefixProvider* dpp, int64_t ofs, int64_t end, + RGWGetDataCB* cb, optional_yield y) override; }; struct D4NFilterDeleteOp : FilterDeleteOp { @@ -114,12 +121,12 @@ class D4NFilterObject : public FilterObject { virtual int delete_obj(const DoutPrefixProvider* dpp, optional_yield y, uint32_t flags) override; }; - D4NFilterObject(std::unique_ptr _next, D4NFilterDriver* _filter) : FilterObject(std::move(_next)), - filter(_filter) {} - D4NFilterObject(std::unique_ptr _next, Bucket* _bucket, D4NFilterDriver* _filter) : FilterObject(std::move(_next), _bucket), - filter(_filter) {} - D4NFilterObject(D4NFilterObject& _o, D4NFilterDriver* _filter) : FilterObject(_o), - filter(_filter) {} + D4NFilterObject(std::unique_ptr _next, D4NFilterDriver* _driver) : FilterObject(std::move(_next)), + driver(_driver) {} + D4NFilterObject(std::unique_ptr _next, Bucket* _bucket, D4NFilterDriver* _driver) : FilterObject(std::move(_next), _bucket), + driver(_driver) {} + D4NFilterObject(D4NFilterObject& _o, D4NFilterDriver* _driver) : FilterObject(_o), + driver(_driver) {} virtual ~D4NFilterObject() = default; virtual int copy_object(User* user, @@ -153,18 +160,18 @@ class D4NFilterObject : public FilterObject { class D4NFilterWriter : public FilterWriter { private: - D4NFilterDriver* filter; + D4NFilterDriver* driver; const DoutPrefixProvider* save_dpp; bool atomic; public: - D4NFilterWriter(std::unique_ptr _next, D4NFilterDriver* _filter, Object* _obj, + D4NFilterWriter(std::unique_ptr _next, D4NFilterDriver* _driver, Object* _obj, const DoutPrefixProvider* _dpp) : FilterWriter(std::move(_next), _obj), - filter(_filter), + driver(_driver), save_dpp(_dpp), atomic(false) {} - D4NFilterWriter(std::unique_ptr _next, D4NFilterDriver* _filter, Object* _obj, + D4NFilterWriter(std::unique_ptr _next, D4NFilterDriver* _driver, Object* _obj, const DoutPrefixProvider* _dpp, bool _atomic) : FilterWriter(std::move(_next), _obj), - filter(_filter), + driver(_driver), save_dpp(_dpp), atomic(_atomic) {} virtual ~D4NFilterWriter() = default; diff --git a/src/rgw/rgw_process_env.h b/src/rgw/rgw_process_env.h index 710340f0a25..5ad87d23e8f 100644 --- a/src/rgw/rgw_process_env.h +++ b/src/rgw/rgw_process_env.h @@ -4,6 +4,8 @@ #pragma once #include +#include "rgw_sal.h" +#include "rgw_auth.h" class ActiveRateLimiter; class OpsLogSink; diff --git a/src/rgw/rgw_redis_driver.cc b/src/rgw/rgw_redis_driver.cc index a5a0f4c6609..e76a354e0c5 100644 --- a/src/rgw/rgw_redis_driver.cc +++ b/src/rgw/rgw_redis_driver.cc @@ -58,11 +58,10 @@ int RedisDriver::initialize(CephContext* cct, const DoutPrefixProvider* dpp) { if (client.is_connected()) return 0; - /* if (addr.host == "" || addr.port == 0) { dout(10) << "RGW Redis Cache: Redis cache endpoint was not configured correctly" << dendl; return EDESTADDRREQ; - }*/ + } client.connect("127.0.0.1", 6379, nullptr); @@ -683,4 +682,19 @@ int RedisDriver::set_attr(const DoutPrefixProvider* dpp, const std::string& key, return result; } +std::unique_ptr RedisDriver::get_cache_aio_request_ptr(const DoutPrefixProvider* dpp) +{ + return std::make_unique(this); +} + +rgw::AioResultList RedisDriver::get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) +{ + rgw_raw_obj r_obj; + r_obj.oid = key; + return aio->get(r_obj, rgw::Aio::cache_read_op(dpp, y, this, ofs, len, key), cost, id); +} + +void RedisCacheAioRequest::cache_aio_read(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, off_t ofs, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) {} +void RedisCacheAioRequest::cache_aio_write(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, bufferlist& bl, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) {} + } } // namespace rgw::cal diff --git a/src/rgw/rgw_redis_driver.h b/src/rgw/rgw_redis_driver.h index 9f1bffe213a..b92c191e27b 100644 --- a/src/rgw/rgw_redis_driver.h +++ b/src/rgw/rgw_redis_driver.h @@ -10,21 +10,39 @@ namespace rgw { namespace cache { +class RedisDriver; + +class RedisCacheAioRequest: public CacheAioRequest { +public: + RedisCacheAioRequest(RedisDriver* cache_driver) : cache_driver(cache_driver) {} + virtual ~RedisCacheAioRequest() = default; + virtual void cache_aio_read(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, off_t ofs, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) override; + virtual void cache_aio_write(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, bufferlist& bl, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) override; +private: + RedisDriver* cache_driver; +}; + class RedisDriver : public CacheDriver { private: cpp_redis::client client; + rgw::d4n::Address addr; std::unordered_map entries; + int find_client(const DoutPrefixProvider* dpp); int insert_entry(const DoutPrefixProvider* dpp, std::string key, off_t offset, uint64_t len); int remove_entry(const DoutPrefixProvider* dpp, std::string key); std::optional get_entry(const DoutPrefixProvider* dpp, std::string key); public: - RedisDriver(Partition& _partition_info, std::string host, int port) : CacheDriver() {} + RedisDriver(Partition& _partition_info, std::string host, int port) : CacheDriver() { + addr.host = host; + addr.port = port; + } virtual int initialize(CephContext* cct, const DoutPrefixProvider* dpp) override; virtual int put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) override; virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs) override; + virtual rgw::AioResultList get_async (const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override; virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data) override; virtual int delete_data(const DoutPrefixProvider* dpp, const::std::string& key) override; virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) override; @@ -38,10 +56,13 @@ class RedisDriver : public CacheDriver { virtual bool key_exists(const DoutPrefixProvider* dpp, const std::string& key) override; virtual std::vector list_entries(const DoutPrefixProvider* dpp) override; virtual size_t get_num_entries(const DoutPrefixProvider* dpp) override; + int update_local_weight(const DoutPrefixProvider* dpp, std::string key, int localWeight); // may need to exist for base class -Sam /* Partition */ virtual Partition get_current_partition_info(const DoutPrefixProvider* dpp) override; virtual uint64_t get_free_space(const DoutPrefixProvider* dpp) override; + + virtual std::unique_ptr get_cache_aio_request_ptr(const DoutPrefixProvider* dpp) override; }; } } // namespace rgw::cal diff --git a/src/rgw/rgw_sal.cc b/src/rgw/rgw_sal.cc index d9dd1bde603..0f6e1745d71 100644 --- a/src/rgw/rgw_sal.cc +++ b/src/rgw/rgw_sal.cc @@ -20,6 +20,7 @@ #include #include "common/errno.h" +//#include "common/dout.h" #include "rgw_sal.h" #include "rgw_sal_rados.h" @@ -44,6 +45,7 @@ #endif #define dout_subsys ceph_subsys_rgw +//#define dout_context g_ceph_context extern "C" { extern rgw::sal::Driver* newRadosStore(boost::asio::io_context* io_context); diff --git a/src/test/rgw/test_d4n_directory.cc b/src/test/rgw/test_d4n_directory.cc index 8aaf2acb0a6..c69d2d4ecb1 100644 --- a/src/test/rgw/test_d4n_directory.cc +++ b/src/test/rgw/test_d4n_directory.cc @@ -1,4 +1,4 @@ -#include "d4n_directory.h" +#include "../rgw/driver/d4n/d4n_directory.h" // Fix -Sam #include "rgw_process_env.h" #include #include @@ -10,65 +10,65 @@ using namespace std; string portStr; string hostStr; string redisHost = ""; -string oid = "samoid"; +string oid = "testName"; string bucketName = "testBucket"; -int blkSize = 123; +int blockSize = 123; class DirectoryFixture: public ::testing::Test { protected: virtual void SetUp() { - blk_dir = new RGWBlockDirectory(hostStr, stoi(portStr)); - c_blk = new cache_block(); + blockDir = new rgw::d4n::BlockDirectory(hostStr, stoi(portStr)); + cacheBlock = new rgw::d4n::CacheBlock(); - c_blk->hosts_list.push_back(redisHost); - c_blk->size_in_bytes = blkSize; - c_blk->c_obj.bucket_name = bucketName; - c_blk->c_obj.obj_name = oid; + cacheBlock->hostsList.push_back(redisHost); + cacheBlock->size = blockSize; + cacheBlock->cacheObj.bucketName = bucketName; + cacheBlock->cacheObj.objName = oid; } virtual void TearDown() { - delete blk_dir; - blk_dir = nullptr; + delete blockDir; + blockDir = nullptr; - delete c_blk; - c_blk = nullptr; + delete cacheBlock; + cacheBlock = nullptr; } - RGWBlockDirectory* blk_dir; - cache_block* c_blk; + rgw::d4n::BlockDirectory* blockDir; + rgw::d4n::CacheBlock* cacheBlock; }; /* Successful initialization */ TEST_F(DirectoryFixture, DirectoryInit) { - ASSERT_NE(blk_dir, nullptr); - ASSERT_NE(c_blk, nullptr); + ASSERT_NE(blockDir, nullptr); + ASSERT_NE(cacheBlock, nullptr); ASSERT_NE(redisHost.length(), (long unsigned int)0); } -/* Successful setValue Call and Redis Check */ +/* Successful set_value Call and Redis Check */ TEST_F(DirectoryFixture, SetValueTest) { cpp_redis::client client; int key_exist = -1; string key; string hosts; string size; - string bucket_name; - string obj_name; + string bucketName; + string objName; std::vector fields; - int setReturn = blk_dir->setValue(c_blk); + int setReturn = blockDir->set_value(cacheBlock); ASSERT_EQ(setReturn, 0); fields.push_back("key"); fields.push_back("hosts"); fields.push_back("size"); - fields.push_back("bucket_name"); - fields.push_back("obj_name"); + fields.push_back("bucketName"); + fields.push_back("objName"); client.connect(hostStr, stoi(portStr), nullptr, 0, 5, 1000); ASSERT_EQ((bool)client.is_connected(), (bool)1); - client.hmget("rgw-object:" + oid + ":directory", fields, [&key, &hosts, &size, &bucket_name, &obj_name, &key_exist](cpp_redis::reply& reply) { + client.hmget("rgw-object:" + oid + ":block-directory", fields, [&key, &hosts, &size, &bucketName, &objName, &key_exist](cpp_redis::reply& reply) { auto arr = reply.as_array(); if (!arr[0].is_null()) { @@ -76,47 +76,47 @@ TEST_F(DirectoryFixture, SetValueTest) { key = arr[0].as_string(); hosts = arr[1].as_string(); size = arr[2].as_string(); - bucket_name = arr[3].as_string(); - obj_name = arr[4].as_string(); + bucketName = arr[3].as_string(); + objName = arr[4].as_string(); } }); client.sync_commit(); EXPECT_EQ(key_exist, 0); - EXPECT_EQ(key, "rgw-object:" + oid + ":directory"); + EXPECT_EQ(key, "rgw-object:" + oid + ":block-directory"); EXPECT_EQ(hosts, redisHost); - EXPECT_EQ(size, to_string(blkSize)); - EXPECT_EQ(bucket_name, bucketName); - EXPECT_EQ(obj_name, oid); + EXPECT_EQ(size, to_string(blockSize)); + EXPECT_EQ(bucketName, bucketName); + EXPECT_EQ(objName, oid); client.flushall(); } -/* Successful getValue Calls and Redis Check */ +/* Successful get_value Calls and Redis Check */ TEST_F(DirectoryFixture, GetValueTest) { cpp_redis::client client; int key_exist = -1; string key; string hosts; string size; - string bucket_name; - string obj_name; + string bucketName; + string objName; std::vector fields; - int setReturn = blk_dir->setValue(c_blk); + int setReturn = blockDir->set_value(cacheBlock); ASSERT_EQ(setReturn, 0); fields.push_back("key"); fields.push_back("hosts"); fields.push_back("size"); - fields.push_back("bucket_name"); - fields.push_back("obj_name"); + fields.push_back("bucketName"); + fields.push_back("objName"); client.connect(hostStr, stoi(portStr), nullptr, 0, 5, 1000); ASSERT_EQ((bool)client.is_connected(), (bool)1); - client.hmget("rgw-object:" + oid + ":directory", fields, [&key, &hosts, &size, &bucket_name, &obj_name, &key_exist](cpp_redis::reply& reply) { + client.hmget("rgw-object:" + oid + ":block-directory", fields, [&key, &hosts, &size, &bucketName, &objName, &key_exist](cpp_redis::reply& reply) { auto arr = reply.as_array(); if (!arr[0].is_null()) { @@ -124,47 +124,47 @@ TEST_F(DirectoryFixture, GetValueTest) { key = arr[0].as_string(); hosts = arr[1].as_string(); size = arr[2].as_string(); - bucket_name = arr[3].as_string(); - obj_name = arr[4].as_string(); + bucketName = arr[3].as_string(); + objName = arr[4].as_string(); } }); client.sync_commit(); EXPECT_EQ(key_exist, 0); - EXPECT_EQ(key, "rgw-object:" + oid + ":directory"); + EXPECT_EQ(key, "rgw-object:" + oid + ":block-directory"); EXPECT_EQ(hosts, redisHost); - EXPECT_EQ(size, to_string(blkSize)); - EXPECT_EQ(bucket_name, bucketName); - EXPECT_EQ(obj_name, oid); + EXPECT_EQ(size, to_string(blockSize)); + EXPECT_EQ(bucketName, bucketName); + EXPECT_EQ(objName, oid); /* Check if object name in directory instance matches redis update */ - client.hset("rgw-object:" + oid + ":directory", "obj_name", "newoid", [](cpp_redis::reply& reply) { - if (reply.is_integer()) { - ASSERT_EQ(reply.as_integer(), 0); /* Zero keys exist */ + client.hset("rgw-object:" + oid + ":block-directory", "objName", "newoid", [](cpp_redis::reply& reply) { + if (!reply.is_null()) { + ASSERT_EQ(reply.as_integer(), 0); } }); client.sync_commit(); - int getReturn = blk_dir->getValue(c_blk); + int getReturn = blockDir->get_value(cacheBlock); ASSERT_EQ(getReturn, 0); - EXPECT_EQ(c_blk->c_obj.obj_name, "newoid"); + EXPECT_EQ(cacheBlock->cacheObj.objName, "newoid"); client.flushall(); } -/* Successful delValue Call and Redis Check */ +/* Successful del_value Call and Redis Check */ TEST_F(DirectoryFixture, DelValueTest) { cpp_redis::client client; vector keys; - int setReturn = blk_dir->setValue(c_blk); + int setReturn = blockDir->set_value(cacheBlock); ASSERT_EQ(setReturn, 0); - /* Ensure cache entry exists in cache before deletion */ - keys.push_back("rgw-object:" + oid + ":directory"); + /* Ensure entry exists in directory before deletion */ + keys.push_back("rgw-object:" + oid + ":block-directory"); client.exists(keys, [](cpp_redis::reply& reply) { if (reply.is_integer()) { @@ -172,13 +172,13 @@ TEST_F(DirectoryFixture, DelValueTest) { } }); - int delReturn = blk_dir->delValue(c_blk); + int delReturn = blockDir->del_value(cacheBlock); ASSERT_EQ(delReturn, 0); client.exists(keys, [](cpp_redis::reply& reply) { if (reply.is_integer()) { - ASSERT_EQ(reply.as_integer(), 0); /* Zero keys exist */ + ASSERT_EQ(reply.as_integer(), 0); /* Zero keys exist */ } }); diff --git a/src/test/rgw/test_d4n_filter.cc b/src/test/rgw/test_d4n_filter.cc index 9a8b9950bd2..b5c382c119e 100644 --- a/src/test/rgw/test_d4n_filter.cc +++ b/src/test/rgw/test_d4n_filter.cc @@ -6,7 +6,7 @@ #include #include "driver/dbstore/common/dbstore.h" #include "rgw_sal_store.h" -#include "driver/d4n/rgw_sal_d4n.h" +#include "../../rgw/driver/d4n/rgw_sal_d4n.h" // fix -Sam #include "rgw_sal.h" #include "rgw_auth.h" @@ -176,7 +176,10 @@ class D4NFilterFixture : public ::testing::Test { rgw_zone_set zones_trace; bool canceled; - int ret = testWriter->complete(accounted_size, etag, + int ret = testWriter->prepare(null_yield); + + if (!ret) { + ret = testWriter->complete(accounted_size, etag, &mtime, set_mtime, attrs, delete_at, @@ -219,7 +222,7 @@ TEST_F(D4NFilterFixture, CreateBucket) { TEST_F(D4NFilterFixture, PutObject) { cpp_redis::client client; vector fields; - fields.push_back("test_attrs_key_0"); + fields.push_back("test_attrs_key_PutObject"); clientSetUp(&client); ASSERT_EQ(createUser(), 0); @@ -375,7 +378,6 @@ TEST_F(D4NFilterFixture, CopyObjectNone) { TEST_F(D4NFilterFixture, CopyObjectReplace) { cpp_redis::client client; - vector fields; clientSetUp(&client); createUser(); @@ -469,29 +471,52 @@ TEST_F(D4NFilterFixture, CopyObjectReplace) { client.sync_commit(); - /* Check copy */ - client.hgetall("rgw-object:test_object_copy:cache", [](cpp_redis::reply& reply) { + /* Retrieve original object's redis data for later comparison */ + std::vector< std::pair > data; + + client.hgetall("rgw-object:test_object_CopyObjectReplace:cache", [&data](cpp_redis::reply& reply) { auto arr = reply.as_array(); if (!arr[0].is_null()) { - EXPECT_EQ((int)arr.size(), 4 + METADATA_LENGTH); /* With etag */ + for (int i = 0; i < (int)arr.size() - 1; i += 2) { + data.push_back({arr[i].as_string(), arr[i + 1].as_string()}); + } } }); client.sync_commit(); - - fields.push_back("test_attrs_key_CopyObjectReplace"); - - client.hmget("rgw-object:test_object_copy:cache", fields, [](cpp_redis::reply& reply) { + + /* Check copy */ + client.hgetall("rgw-object:test_object_copy:cache", [&data](cpp_redis::reply& reply) { + bool unexpected = false; auto arr = reply.as_array(); if (!arr[0].is_null()) { - EXPECT_EQ(arr[0].as_string(), "test_attrs_copy_value"); + EXPECT_EQ((int)arr.size(), 4 + METADATA_LENGTH); /* With etag */ + + for (int i = 0; i < (int)arr.size() - 1; i += 2) { + auto it = std::find_if(data.begin(), data.end(), + [&](const auto& pair) { return pair.first == arr[i].as_string(); }); + + if (it != data.end()) { + if (arr[i].as_string() == "test_attrs_key_CopyObjectReplace") + EXPECT_EQ(arr[i + 1].as_string(), "test_attrs_copy_value"); + else if (arr[i].as_string() != "mtime") { /* mtime will be different */ + int index = std::distance(data.begin(), it); + EXPECT_EQ(arr[i + 1].as_string(), data[index].second); + } + } else if (arr[i].as_string() == "etag") { + EXPECT_EQ(arr[i + 1].as_string(), "test_etag_copy"); + } else + unexpected = true; /* Unexpected field */ + } + + EXPECT_EQ(unexpected, false); } }); client.sync_commit(); - + clientReset(&client); } @@ -594,26 +619,52 @@ TEST_F(D4NFilterFixture, CopyObjectMerge) { client.sync_commit(); - /* Check copy */ - client.hgetall("rgw-object:test_object_copy:cache", [](cpp_redis::reply& reply) { + /* Retrieve original object's redis data for later comparison */ + std::vector< std::pair > data; + + client.hgetall("rgw-object:test_object_CopyObjectMerge:cache", [&data](cpp_redis::reply& reply) { auto arr = reply.as_array(); if (!arr[0].is_null()) { - EXPECT_EQ((int)arr.size(), 6 + METADATA_LENGTH); /* With etag */ + for (int i = 0; i < (int)arr.size() - 1; i += 2) { + data.push_back({arr[i].as_string(), arr[i + 1].as_string()}); + } } }); client.sync_commit(); - - fields.push_back("test_attrs_key_CopyObjectMerge"); - fields.push_back("test_attrs_copy_extra_key"); - - client.hmget("rgw-object:test_object_copy:cache", fields, [](cpp_redis::reply& reply) { + + /* Check copy */ + client.hgetall("rgw-object:test_object_copy:cache", [&data](cpp_redis::reply& reply) { + bool unexpected = false; + bool merge = false; auto arr = reply.as_array(); if (!arr[0].is_null()) { - EXPECT_EQ(arr[0].as_string(), "test_attrs_value_CopyObjectMerge"); - EXPECT_EQ(arr[1].as_string(), "test_attrs_copy_extra_value"); + EXPECT_EQ((int)arr.size(), 6 + METADATA_LENGTH); /* With etag */ + + for (int i = 0; i < (int)arr.size() - 1; i += 2) { + auto it = std::find_if(data.begin(), data.end(), + [&](const auto& pair) { return pair.first == arr[i].as_string(); }); + + if (it != data.end()) { + if (arr[i].as_string() == "test_attrs_key_CopyObjectMerge") + EXPECT_EQ(arr[i + 1].as_string(), "test_attrs_value_CopyObjectMerge"); + else if (arr[i].as_string() != "mtime") { /* mtime will be different */ + int index = std::distance(data.begin(), it); + EXPECT_EQ(arr[i + 1].as_string(), data[index].second); + } + } else if (arr[i].as_string() == "etag") { + EXPECT_EQ(arr[i + 1].as_string(), "test_etag_copy"); + } else if (arr[i].as_string() == "test_attrs_copy_extra_key") { + merge = true; + EXPECT_EQ(arr[i + 1].as_string(), "test_attrs_copy_extra_value"); + } else + unexpected = true; /* Unexpected field */ + } + + EXPECT_EQ(unexpected, false); + EXPECT_EQ(merge, true); } }); @@ -667,6 +718,130 @@ TEST_F(D4NFilterFixture, DelObject) { clientReset(&client); } +TEST_F(D4NFilterFixture, CachePolicy) { + cpp_redis::client client; + clientSetUp(&client); + + createUser(); + createBucket(); + + /* Create multipart object */ + string object_name = "test_object_CachePolicy"; + unique_ptr obj = testBucket->get_object(rgw_obj_key(object_name)); + rgw_user owner; + rgw_placement_rule ptail_placement_rule; + uint64_t olh_epoch = 123; + string unique_tag; + + obj->get_obj_attrs(null_yield, dpp); + + testWriter = driver->get_atomic_writer(dpp, + null_yield, + obj.get(), + owner, + &ptail_placement_rule, + olh_epoch, + unique_tag); + + size_t accounted_size = 15; /* Uploaded as multipart */ + string etag("test_etag"); + ceph::real_time mtime; + ceph::real_time set_mtime; + + buffer::list bl; + string tmp = "test_attrs_value_CachePolicy"; + bl.append("test_attrs_value_CachePolicy"); + map attrs{{"test_attrs_key_CachePolicy", bl}}; + + ceph::real_time delete_at; + char if_match; + char if_nomatch; + string user_data; + rgw_zone_set zones_trace; + bool canceled; + + ASSERT_EQ(testWriter->complete(accounted_size, etag, + &mtime, set_mtime, + attrs, + delete_at, + &if_match, &if_nomatch, + &user_data, + &zones_trace, &canceled, + null_yield), 0); + + + unique_ptr testObject_CachePolicy = testBucket->get_object(rgw_obj_key("test_object_CachePolicy")); + + ASSERT_NE(testObject_CachePolicy, nullptr); + + /* Copy to new multipart object */ + unique_ptr testWriterCopy = nullptr; + unique_ptr obj_copy = testBucket->get_object(rgw_obj_key("test_object_copy")); + uint64_t olh_epoch_copy = 123; + + obj_copy->get_obj_attrs(null_yield, dpp); + + testWriterCopy = driver->get_atomic_writer(dpp, + null_yield, + obj_copy.get(), + owner, + &ptail_placement_rule, + olh_epoch_copy, + unique_tag); + + RGWEnv rgw_env; + req_info info(get_pointer(env->cct), &rgw_env); + rgw_zone_id source_zone; + rgw_placement_rule dest_placement; + ceph::real_time src_mtime; + ceph::real_time mod_ptr; + ceph::real_time unmod_ptr; + rgw::sal::AttrsMod attrs_mod = rgw::sal::ATTRSMOD_REPLACE; + RGWObjCategory category = RGWObjCategory::Main; + string tag; + + ASSERT_EQ(testWriterCopy->complete(accounted_size, etag, + &mtime, set_mtime, + attrs, + delete_at, + &if_match, &if_nomatch, + &user_data, + &zones_trace, &canceled, + null_yield), 0); + + unique_ptr testObject_copy = testBucket->get_object(rgw_obj_key("test_object_copy")); + + EXPECT_EQ(testObject_CachePolicy->copy_object(testUser.get(), + &info, source_zone, testObject_copy.get(), + testBucket.get(), testBucket.get(), + dest_placement, &src_mtime, &mtime, + &mod_ptr, &unmod_ptr, false, + &if_match, &if_nomatch, attrs_mod, + false, attrs, category, olh_epoch, + delete_at, NULL, &tag, &etag, + NULL, NULL, dpp, null_yield), 0); + + /* Ensure data field doesn't exist for original object */ + client.hexists("rgw-object:test_object_CachePolicy:cache", "data", [](cpp_redis::reply& reply) { + if (reply.is_integer()) { + EXPECT_EQ(reply.as_integer(), 0); + } + }); + + client.sync_commit(); + + /* Ensure data field doesn't exist for copy */ + client.hexists("rgw-object:test_object_CachePolicy:cache", "data", [](cpp_redis::reply& reply) { + if (reply.is_integer()) { + EXPECT_EQ(reply.as_integer(), 0); + } + }); + + client.sync_commit(); + + clientReset(&client); +} + /* Attribute-related tests */ TEST_F(D4NFilterFixture, SetObjectAttrs) { cpp_redis::client client; -- 2.39.5