d4n policy and filter driver.
RGW: Add cache driver base class - add d4n filter driver.
RGW: Update cache and policy files; add RedisDriver into D4N filter
RGW: Add D4N classes and unit testing; update cpp_redis submodule
QA: Add D4N teuthology suite
RGW: Improve D4N readability and structure
RGW: Add base D4N policy and test
RGW: Add GWF policy to D4N
RGW: Add global weight property
RGW: Added D4N namespace
RGW: Update policy driver interface
RGW: Update unit tests
RGW: Add Address struct to all files
RGW: Update D4N names and structure
RGW: Update structure and fix errors
RGW: Add more features to policy driver
RGW: Access local weight in policy code
RGW: Work on D4N workflows
RGW: Add object directory class and remove copy_value method
RGW: Establish object directory in read workflow
RGW: Update cache and policy files; add RedisDriver into D4N filter
RGW: Switch out D4N cache methods with Redis driver methods
RGW: Update RedisDriver to match new CacheDriver structure; define set_attrs method
RGW: Fix D4N read workflow crashes
RGW: Update D4N files to match CacheDriver changes
RGW: fix key_exists method for RedisDriver and clean up rgw_sal_d4n.cc
RGW: Use correct get_block method
RGW: Make CachePolicy a virtual class
RGW: Initialize localWeight if not found and develop find_victim method
RGW: Debugging network failure
RGW: Rebase and debugging network failure
RGW: Update RedisDriver::list_entries and usage in D4N policy driver
RGW: Fix network failure issue; add entries and entry methods
RGW: Update D4N test files to match rebase
RGW: Update D4N policy and RedisDriver with entries
RGW: make localWeight an attribute
rgw/cache: commit to fix compilation issues.
Signed-off-by: Samarah <samarah.uriarte@ibm.com>
%dir %{_localstatedir}/lib/ceph/radosgw
%{_unitdir}/ceph-radosgw@.service
%{_unitdir}/ceph-radosgw.target
+%exclude %{_includedir}/cpp_redis
+%exclude %{_includedir}/tacopie
+%exclude /usr/lib/libcpp_redis.a
+%exclude /usr/lib/libtacopie.a
+%exclude /usr/lib/pkgconfig/cpp_redis.pc
+%exclude /usr/lib/pkgconfig/tacopie.pc
%post radosgw
%if 0%{?suse_version}
rgw_tracer.cc
rgw_lua_background.cc
rgw_data_access.cc
+ driver/d4n/d4n_directory.cc
+ driver/d4n/d4n_policy.cc
+ driver/d4n/rgw_sal_d4n.cc
driver/rados/cls_fifo_legacy.cc
driver/rados/rgw_bucket.cc
driver/rados/rgw_bucket_sync.cc
driver/rados/topic.cc)
list(APPEND librgw_common_srcs
+ driver/d4n/d4n_directory.cc
driver/immutable_config/store.cc
driver/json_config/store.cc
driver/rados/config/impl.cc
endif()
if(WITH_RADOSGW_D4N)
list(APPEND librgw_common_srcs driver/d4n/d4n_directory.cc)
- list(APPEND librgw_common_srcs driver/d4n/d4n_datacache.cc)
list(APPEND librgw_common_srcs driver/d4n/rgw_sal_d4n.cc)
endif()
if(WITH_RADOSGW_POSIX)
+++ /dev/null
-#include "d4n_datacache.h"
-
-#define dout_subsys ceph_subsys_rgw
-#define dout_context g_ceph_context
-
-/* Base metadata and data fields should remain consistent */
-std::vector<std::string> baseFields {
- "mtime",
- "object_size",
- "accounted_size",
- "epoch",
- "version_id",
- "source_zone_short_id",
- "bucket_count",
- "bucket_size",
- "user_quota.max_size",
- "user_quota.max_objects",
- "max_buckets",
- "data"};
-
-std::vector< std::pair<std::string, std::string> > RGWD4NCache::buildObject(rgw::sal::Attrs* binary) {
- std::vector< std::pair<std::string, std::string> > values;
- rgw::sal::Attrs::iterator attrs;
-
- /* Convert to vector */
- if (binary != NULL) {
- for (attrs = binary->begin(); attrs != binary->end(); ++attrs) {
- values.push_back(std::make_pair(attrs->first, attrs->second.to_str()));
- }
- }
-
- return values;
-}
-
-int RGWD4NCache::findClient(cpp_redis::client *client) {
- if (client->is_connected())
- return 0;
-
- if (host == "" || port == 0) {
- dout(10) << "RGW D4N Cache: D4N cache endpoint was not configured correctly" << dendl;
- return EDESTADDRREQ;
- }
-
- client->connect(host, port, nullptr);
-
- if (!client->is_connected())
- return ECONNREFUSED;
-
- return 0;
-}
-
-int RGWD4NCache::existKey(std::string key) {
- int result = -1;
- std::vector<std::string> keys;
- keys.push_back(key);
-
- if (!client.is_connected()) {
- return result;
- }
-
- try {
- client.exists(keys, [&result](cpp_redis::reply &reply) {
- if (reply.is_integer()) {
- result = reply.as_integer(); /* Returns 1 upon success */
- }
- });
-
- client.sync_commit(std::chrono::milliseconds(1000));
- } catch(std::exception &e) {}
-
- return result;
-}
-
-int RGWD4NCache::setObject(std::string oid, rgw::sal::Attrs* attrs) {
- /* Creating the index based on oid */
- std::string key = "rgw-object:" + oid + ":cache";
- std::string result;
-
- if (!client.is_connected()) {
- findClient(&client);
- }
-
- /* Every set will be treated as new */
- try {
- std::vector< std::pair<std::string, std::string> > redisObject = buildObject(attrs);
-
- if (redisObject.empty()) {
- return -1;
- }
-
- client.hmset(key, redisObject, [&result](cpp_redis::reply &reply) {
- if (!reply.is_null()) {
- result = reply.as_string();
- }
- });
-
- client.sync_commit(std::chrono::milliseconds(1000));
-
- if (result != "OK") {
- return -1;
- }
- } catch(std::exception &e) {
- return -1;
- }
-
- return 0;
-}
-
-int RGWD4NCache::getObject(std::string oid,
- rgw::sal::Attrs* newAttrs,
- std::vector< std::pair<std::string, std::string> >* newMetadata)
-{
- std::string result;
- std::string key = "rgw-object:" + oid + ":cache";
-
- if (!client.is_connected()) {
- findClient(&client);
- }
-
- if (existKey(key)) {
- int field_exist = -1;
-
- rgw::sal::Attrs::iterator it;
- std::vector< std::pair<std::string, std::string> > redisObject;
- std::vector<std::string> getFields;
-
- /* Retrieve existing fields from cache */
- try {
- client.hgetall(key, [&getFields](cpp_redis::reply &reply) {
- if (reply.is_array()) {
- auto arr = reply.as_array();
-
- if (!arr[0].is_null()) {
- for (long unsigned int i = 0; i < arr.size() - 1; i += 2) {
- getFields.push_back(arr[i].as_string());
- }
- }
- }
- });
-
- client.sync_commit(std::chrono::milliseconds(1000));
- } catch(std::exception &e) {
- return -1;
- }
-
- /* Only data exists */
- if (getFields.size() == 1 && getFields[0] == "data")
- return 0;
-
- /* Ensure all metadata, attributes, and data has been set */
- for (const auto& field : baseFields) {
- auto it = std::find_if(getFields.begin(), getFields.end(),
- [&](const auto& comp) { return comp == field; });
-
- if (it != getFields.end()) {
- int index = std::distance(getFields.begin(), it);
- getFields.erase(getFields.begin() + index);
- } else {
- return -1;
- }
- }
-
- /* Get attributes from cache */
- try {
- client.hmget(key, getFields, [&field_exist, &newAttrs, &getFields](cpp_redis::reply &reply) {
- if (reply.is_array()) {
- auto arr = reply.as_array();
-
- if (!arr[0].is_null()) {
- field_exist = 0;
-
- for (long unsigned int i = 0; i < getFields.size(); ++i) {
- std::string tmp = arr[i].as_string();
- buffer::list bl;
- bl.append(tmp);
- newAttrs->insert({getFields[i], bl});
- }
- }
- }
- });
-
- client.sync_commit(std::chrono::milliseconds(1000));
- } catch(std::exception &e) {
- return -1;
- }
-
- if (field_exist == 0) {
- field_exist = -1;
-
- getFields.clear();
- getFields.insert(getFields.begin(), baseFields.begin(), baseFields.end());
- getFields.pop_back(); /* Do not query for data field */
-
- /* Get metadata from cache */
- try {
- client.hmget(key, getFields, [&field_exist, &newMetadata, &getFields](cpp_redis::reply &reply) {
- if (reply.is_array()) {
- auto arr = reply.as_array();
-
- if (!arr[0].is_null()) {
- field_exist = 0;
-
- for (long unsigned int i = 0; i < getFields.size(); ++i) {
- newMetadata->push_back({getFields[i], arr[i].as_string()});
- }
- }
- }
- });
-
- client.sync_commit(std::chrono::milliseconds(1000));
- } catch(std::exception &e) {
- return -1;
- }
- } else {
- return -1;
- }
- } else {
- dout(20) << "RGW D4N Cache: Object was not retrievable." << dendl;
- return -2;
- }
-
- return 0;
-}
-
-int RGWD4NCache::copyObject(std::string original_oid, std::string copy_oid, rgw::sal::Attrs* attrs) {
- std::string result;
- std::vector< std::pair<std::string, std::string> > redisObject;
- std::string key = "rgw-object:" + original_oid + ":cache";
-
- if (!client.is_connected()) {
- findClient(&client);
- }
-
- /* Read values from cache */
- if (existKey(key)) {
- try {
- client.hgetall(key, [&redisObject](cpp_redis::reply &reply) {
- if (reply.is_array()) {
- auto arr = reply.as_array();
-
- if (!arr[0].is_null()) {
- for (long unsigned int i = 0; i < arr.size() - 1; i += 2) {
- redisObject.push_back({arr[i].as_string(), arr[i + 1].as_string()});
- }
- }
- }
- });
-
- client.sync_commit(std::chrono::milliseconds(1000));
- } catch(std::exception &e) {
- return -1;
- }
- } else {
- return -2;
- }
-
- /* Build copy with updated values */
- if (!redisObject.empty()) {
- rgw::sal::Attrs::iterator attr;
-
- for (attr = attrs->begin(); attr != attrs->end(); ++attr) {
- auto it = std::find_if(redisObject.begin(), redisObject.end(),
- [&](const auto& pair) { return pair.first == attr->first; });
-
- if (it != redisObject.end()) {
- int index = std::distance(redisObject.begin(), it);
- redisObject[index] = {attr->first, attr->second.to_str()};
- } else {
- redisObject.push_back(std::make_pair(attr->first, attr->second.to_str()));
- }
- }
- } else {
- return -1;
- }
-
- /* Set copy with new values */
- key = "rgw-object:" + copy_oid + ":cache";
-
- try {
- client.hmset(key, redisObject, [&result](cpp_redis::reply &reply) {
- if (!reply.is_null()) {
- result = reply.as_string();
- }
- });
-
- client.sync_commit(std::chrono::milliseconds(1000));
-
- if (result != "OK") {
- return -1;
- }
- } catch(std::exception &e) {
- return -1;
- }
-
- return 0;
-}
-
-int RGWD4NCache::delObject(std::string oid) {
- int result = 0;
- std::vector<std::string> keys;
- std::string key = "rgw-object:" + oid + ":cache";
- keys.push_back(key);
-
- if (!client.is_connected()) {
- findClient(&client);
- }
-
- if (existKey(key)) {
- try {
- client.del(keys, [&result](cpp_redis::reply &reply) {
- if (reply.is_integer()) {
- result = reply.as_integer();
- }
- });
-
- client.sync_commit(std::chrono::milliseconds(1000));
-
- return result - 1;
- } catch(std::exception &e) {
- return -1;
- }
- } else {
- dout(20) << "RGW D4N Cache: Object is not in cache." << dendl;
- return -2;
- }
-}
-
-int RGWD4NCache::updateAttr(std::string oid, rgw::sal::Attrs* attr) {
- std::string result;
- std::string key = "rgw-object:" + oid + ":cache";
-
- if (!client.is_connected()) {
- findClient(&client);
- }
-
- if (existKey(key)) {
- try {
- std::vector< std::pair<std::string, std::string> > redisObject;
- auto it = attr->begin();
- redisObject.push_back({it->first, it->second.to_str()});
-
- client.hmset(key, redisObject, [&result](cpp_redis::reply &reply) {
- if (!reply.is_null()) {
- result = reply.as_string();
- }
- });
-
- client.sync_commit(std::chrono::milliseconds(1000));
-
- if (result != "OK") {
- return -1;
- }
- } catch(std::exception &e) {
- return -1;
- }
- } else {
- return -2;
- }
-
- return 0;
-}
-
-int RGWD4NCache::delAttrs(std::string oid, std::vector<std::string>& baseFields, std::vector<std::string>& deleteFields) {
- int result = 0;
- std::string key = "rgw-object:" + oid + ":cache";
-
- if (!client.is_connected()) {
- findClient(&client);
- }
-
- if (existKey(key)) {
- /* Find if attribute doesn't exist */
- for (const auto& delField : deleteFields) {
- if (std::find(baseFields.begin(), baseFields.end(), delField) == baseFields.end()) {
- deleteFields.erase(std::find(deleteFields.begin(), deleteFields.end(), delField));
- }
- }
-
- try {
- client.hdel(key, deleteFields, [&result](cpp_redis::reply &reply) {
- if (reply.is_integer()) {
- result = reply.as_integer();
- }
- });
-
- client.sync_commit(std::chrono::milliseconds(1000));
-
- return result - 1;
- } catch(std::exception &e) {
- return -1;
- }
- }
-
- dout(20) << "RGW D4N Cache: Object is not in cache." << dendl;
- return -2;
-}
-
-int RGWD4NCache::appendData(std::string oid, buffer::list& data) {
- std::string result;
- std::string value = "";
- std::string key = "rgw-object:" + oid + ":cache";
-
- if (!client.is_connected()) {
- findClient(&client);
- }
-
- if (existKey(key)) {
- try {
- client.hget(key, "data", [&value](cpp_redis::reply &reply) {
- if (!reply.is_null()) {
- value = reply.as_string();
- }
- });
-
- client.sync_commit(std::chrono::milliseconds(1000));
- } catch(std::exception &e) {
- return -1;
- }
- }
-
- try {
- /* Append to existing value or set as new value */
- std::string temp = value + data.to_str();
- std::vector< std::pair<std::string, std::string> > field;
- field.push_back({"data", temp});
-
- client.hmset(key, field, [&result](cpp_redis::reply &reply) {
- if (!reply.is_null()) {
- result = reply.as_string();
- }
- });
-
- client.sync_commit(std::chrono::milliseconds(1000));
-
- if (result != "OK") {
- return -1;
- }
- } catch(std::exception &e) {
- return -1;
- }
-
- return 0;
-}
-
-int RGWD4NCache::deleteData(std::string oid) {
- int result = 0;
- std::string key = "rgw-object:" + oid + ":cache";
- std::vector<std::string> deleteField;
- deleteField.push_back("data");
-
- if (!client.is_connected()) {
- findClient(&client);
- }
-
- if (existKey(key)) {
- int field_exist = -1;
-
- try {
- client.hget(key, "data", [&field_exist](cpp_redis::reply &reply) {
- if (!reply.is_null()) {
- field_exist = 0;
- }
- });
-
- client.sync_commit(std::chrono::milliseconds(1000));
- } catch(std::exception &e) {
- return -1;
- }
-
- if (field_exist == 0) {
- try {
- client.hdel(key, deleteField, [&result](cpp_redis::reply &reply) {
- if (reply.is_integer()) {
- result = reply.as_integer(); /* Returns 1 upon success */
- }
- });
-
- client.sync_commit(std::chrono::milliseconds(1000));
-
- return result - 1;
- } catch(std::exception &e) {
- return -1;
- }
- } else {
- return -1;
- }
- } else {
- return 0; /* No delete was necessary */
- }
-}
+++ /dev/null
-#ifndef CEPH_RGWD4NCACHE_H
-#define CEPH_RGWD4NCACHE_H
-
-#include "rgw_common.h"
-#include <cpp_redis/cpp_redis>
-#include <string>
-#include <iostream>
-
-class RGWD4NCache {
- public:
- CephContext *cct;
-
- RGWD4NCache() {}
- RGWD4NCache(std::string cacheHost, int cachePort):host(cacheHost), port(cachePort) {}
-
- void init(CephContext *_cct) {
- cct = _cct;
- host = cct->_conf->rgw_d4n_host;
- port = cct->_conf->rgw_d4n_port;
- }
-
- int findClient(cpp_redis::client *client);
- int existKey(std::string key);
- int setObject(std::string oid, rgw::sal::Attrs* attrs);
- int getObject(std::string oid, rgw::sal::Attrs* newAttrs, std::vector< std::pair<std::string, std::string> >* newMetadata);
- int copyObject(std::string original_oid, std::string copy_oid, rgw::sal::Attrs* attrs);
- int delObject(std::string oid);
- int updateAttr(std::string oid, rgw::sal::Attrs* attr);
- int delAttrs(std::string oid, std::vector<std::string>& baseFields, std::vector<std::string>& deleteFields);
- int appendData(std::string oid, buffer::list& data);
- int deleteData(std::string oid);
-
- private:
- cpp_redis::client client;
- std::string host = "";
- int port = 0;
- std::vector< std::pair<std::string, std::string> > buildObject(rgw::sal::Attrs* binary);
-};
-
-#endif
#include "d4n_directory.h"
+#include <time.h>
#define dout_subsys ceph_subsys_rgw
#define dout_context g_ceph_context
-int RGWBlockDirectory::findClient(cpp_redis::client *client) {
+namespace rgw { namespace d4n {
+
+int ObjectDirectory::find_client(cpp_redis::client* client) {
if (client->is_connected())
return 0;
- if (host == "" || port == 0) {
+ if (addr.host == "" || addr.port == 0) {
dout(10) << "RGW D4N Directory: D4N directory endpoint was not configured correctly" << dendl;
return EDESTADDRREQ;
}
- client->connect(host, port, nullptr);
+ client->connect(addr.host, addr.port, nullptr);
if (!client->is_connected())
return ECONNREFUSED;
return 0;
}
-std::string RGWBlockDirectory::buildIndex(cache_block *ptr) {
- return "rgw-object:" + ptr->c_obj.obj_name + ":directory";
+std::string ObjectDirectory::build_index(CacheObj* object) {
+ return "rgw-object:" + object->objName + ":object-directory";
}
-int RGWBlockDirectory::existKey(std::string key) {
- int result = -1;
+int ObjectDirectory::exist_key(std::string key) {
+ int result = 0;
std::vector<std::string> keys;
keys.push_back(key);
try {
client.exists(keys, [&result](cpp_redis::reply &reply) {
if (reply.is_integer()) {
- result = reply.as_integer(); /* Returns 1 upon success */
+ result = reply.as_integer();
}
});
return result;
}
-int RGWBlockDirectory::setValue(cache_block *ptr) {
- /* Creating the index based on obj_name */
- std::string key = buildIndex(ptr);
+int ObjectDirectory::set_value(CacheObj* object) {
+ /* Creating the index based on objName */
+ std::string result;
+ std::string key = build_index(object);
if (!client.is_connected()) {
- findClient(&client);
+ find_client(&client);
}
- std::string result;
+ /* Every set will be new */
+ if (addr.host == "" || addr.port == 0) {
+ dout(10) << "RGW D4N Directory: Directory endpoint not configured correctly" << dendl;
+ return -2;
+ }
+
+ std::string endpoint = addr.host + ":" + std::to_string(addr.port);
+ std::vector< std::pair<std::string, std::string> > list;
+
+ /* Creating a list of the entry's properties */
+ list.push_back(make_pair("key", key));
+ list.push_back(make_pair("objName", object->objName));
+ list.push_back(make_pair("bucketName", object->bucketName));
+ list.push_back(make_pair("creationTime", std::to_string(object->creationTime)));
+ list.push_back(make_pair("dirty", std::to_string(object->dirty)));
+ list.push_back(make_pair("hosts", endpoint));
+
+ try {
+ client.hmset(key, list, [&result](cpp_redis::reply &reply) {
+ if (!reply.is_null()) {
+ result = reply.as_string();
+ }
+ });
+
+ client.sync_commit(std::chrono::milliseconds(1000));
+
+ if (result != "OK") {
+ return -1;
+ }
+ } catch(std::exception &e) {
+ return -1;
+ }
+
+ return 0;
+}
+
+int ObjectDirectory::get_value(CacheObj* object) {
+ int keyExist = -2;
+ std::string key = build_index(object);
+
+ if (!client.is_connected()) {
+ find_client(&client);
+ }
+
+ if (exist_key(key)) {
+ std::string key;
+ std::string objName;
+ std::string bucketName;
+ std::string creationTime;
+ std::string dirty;
+ std::string hosts;
+ std::vector<std::string> fields;
+
+ fields.push_back("key");
+ fields.push_back("objName");
+ fields.push_back("bucketName");
+ fields.push_back("creationTime");
+ fields.push_back("dirty");
+ fields.push_back("hosts");
+
+ try {
+ client.hmget(key, fields, [&key, &objName, &bucketName, &creationTime, &dirty, &hosts, &keyExist](cpp_redis::reply &reply) {
+ if (reply.is_array()) {
+ auto arr = reply.as_array();
+
+ if (!arr[0].is_null()) {
+ keyExist = 0;
+ key = arr[0].as_string();
+ objName = arr[1].as_string();
+ bucketName = arr[2].as_string();
+ creationTime = arr[3].as_string();
+ dirty = arr[4].as_string();
+ hosts = arr[5].as_string();
+ }
+ }
+ });
+
+ client.sync_commit(std::chrono::milliseconds(1000));
+
+ if (keyExist < 0) {
+ return keyExist;
+ }
+
+ /* Currently, there can only be one host */
+ object->objName = objName;
+ object->bucketName = bucketName;
+
+ struct std::tm tm;
+ std::istringstream(creationTime) >> std::get_time(&tm, "%T");
+ strptime(creationTime.c_str(), "%T", &tm); // Need to check formatting -Sam
+ object->creationTime = mktime(&tm);
+
+ std::istringstream(dirty) >> object->dirty;
+ } catch(std::exception &e) {
+ keyExist = -1;
+ }
+ }
+
+ return keyExist;
+}
+
+int ObjectDirectory::del_value(CacheObj* object) {
+ int result = 0;
+ std::vector<std::string> keys;
+ std::string key = build_index(object);
+ keys.push_back(key);
+
+ if (!client.is_connected()) {
+ find_client(&client);
+ }
+
+ if (exist_key(key)) {
+ try {
+ client.del(keys, [&result](cpp_redis::reply &reply) {
+ if (reply.is_integer()) {
+ result = reply.as_integer();
+ }
+ });
+
+ client.sync_commit(std::chrono::milliseconds(1000));
+ return result - 1;
+ } catch(std::exception &e) {
+ return -1;
+ }
+ } else {
+ return -2;
+ }
+}
+
+int BlockDirectory::find_client(cpp_redis::client* client) {
+ if (client->is_connected())
+ return 0;
+
+ if (addr.host == "" || addr.port == 0) {
+ dout(10) << "RGW D4N Directory: D4N directory endpoint was not configured correctly" << dendl;
+ return EDESTADDRREQ;
+ }
+
+ client->connect(addr.host, addr.port, nullptr);
+
+ if (!client->is_connected())
+ return ECONNREFUSED;
+
+ return 0;
+}
+
+std::string BlockDirectory::build_index(CacheBlock* block) {
+ return "rgw-object:" + block->cacheObj.objName + ":block-directory";
+}
+
+int BlockDirectory::exist_key(std::string key) {
+ int result = 0;
std::vector<std::string> keys;
keys.push_back(key);
+
+ if (!client.is_connected()) {
+ return result;
+ }
+
+ try {
+ client.exists(keys, [&result](cpp_redis::reply &reply) {
+ if (reply.is_integer()) {
+ result = reply.as_integer();
+ }
+ });
+
+ client.sync_commit(std::chrono::milliseconds(1000));
+ } catch(std::exception &e) {}
+
+ return result;
+}
+
+int BlockDirectory::set_value(CacheBlock* block) {
+ /* Creating the index based on objName */
+ std::string result;
+ std::string key = build_index(block);
+ if (!client.is_connected()) {
+ find_client(&client);
+ }
/* Every set will be new */
- if (host == "" || port == 0) {
+ if (addr.host == "" || addr.port == 0) {
dout(10) << "RGW D4N Directory: Directory endpoint not configured correctly" << dendl;
- return -1;
+ return -2;
}
- std::string endpoint = host + ":" + std::to_string(port);
- std::vector<std::pair<std::string, std::string>> list;
+ std::string endpoint = addr.host + ":" + std::to_string(addr.port);
+ std::vector< std::pair<std::string, std::string> > list;
- /* Creating a list of key's properties */
+ /* Creating a list of the entry's properties */
list.push_back(make_pair("key", key));
- list.push_back(make_pair("size", std::to_string(ptr->size_in_bytes)));
- list.push_back(make_pair("bucket_name", ptr->c_obj.bucket_name));
- list.push_back(make_pair("obj_name", ptr->c_obj.obj_name));
+ list.push_back(make_pair("size", std::to_string(block->size)));
+ list.push_back(make_pair("globalWeight", std::to_string(block->globalWeight)));
+ list.push_back(make_pair("bucketName", block->cacheObj.bucketName));
+ list.push_back(make_pair("objName", block->cacheObj.objName));
list.push_back(make_pair("hosts", endpoint));
try {
});
client.sync_commit(std::chrono::milliseconds(1000));
-
+
if (result != "OK") {
return -1;
}
return 0;
}
-int RGWBlockDirectory::getValue(cache_block *ptr) {
- std::string key = buildIndex(ptr);
+int BlockDirectory::get_value(CacheBlock* block) {
+ int keyExist = -2;
+ std::string key = build_index(block);
if (!client.is_connected()) {
- findClient(&client);
+ find_client(&client);
}
- if (existKey(key)) {
- int field_exist = -1;
-
+ if (exist_key(key)) {
std::string hosts;
std::string size;
- std::string bucket_name;
- std::string obj_name;
+ std::string bucketName;
+ std::string objName;
std::vector<std::string> fields;
fields.push_back("key");
fields.push_back("hosts");
fields.push_back("size");
- fields.push_back("bucket_name");
- fields.push_back("obj_name");
+ fields.push_back("bucketName");
+ fields.push_back("objName");
try {
- client.hmget(key, fields, [&key, &hosts, &size, &bucket_name, &obj_name, &field_exist](cpp_redis::reply &reply) {
+ client.hmget(key, fields, [&key, &hosts, &size, &bucketName, &objName, &keyExist](cpp_redis::reply &reply) {
if (reply.is_array()) {
auto arr = reply.as_array();
if (!arr[0].is_null()) {
- field_exist = 0;
+ keyExist = 0;
key = arr[0].as_string();
hosts = arr[1].as_string();
size = arr[2].as_string();
- bucket_name = arr[3].as_string();
- obj_name = arr[4].as_string();
+ bucketName = arr[3].as_string();
+ objName = arr[4].as_string();
}
}
});
client.sync_commit(std::chrono::milliseconds(1000));
- if (field_exist < 0) {
- return field_exist;
+ if (keyExist < 0 ) {
+ return keyExist;
}
- /* Currently, there can only be one host */
- ptr->size_in_bytes = std::stoi(size);
- ptr->c_obj.bucket_name = bucket_name;
- ptr->c_obj.obj_name = obj_name;
+ /* Currently, there can only be one host */ // update -Sam
+ block->size = std::stoi(size);
+ block->cacheObj.bucketName = bucketName;
+ block->cacheObj.objName = objName;
} catch(std::exception &e) {
- return -1;
+ keyExist = -1;
}
}
- return 0;
+ return keyExist;
}
-int RGWBlockDirectory::delValue(cache_block *ptr) {
+int BlockDirectory::del_value(CacheBlock* block) {
int result = 0;
std::vector<std::string> keys;
- std::string key = buildIndex(ptr);
+ std::string key = build_index(block);
keys.push_back(key);
if (!client.is_connected()) {
- findClient(&client);
+ find_client(&client);
}
- if (existKey(key)) {
+ if (exist_key(key)) {
try {
client.del(keys, [&result](cpp_redis::reply &reply) {
if (reply.is_integer()) {
- result = reply.as_integer(); /* Returns 1 upon success */
+ result = reply.as_integer();
}
});
client.sync_commit(std::chrono::milliseconds(1000));
-
return result - 1;
} catch(std::exception &e) {
return -1;
}
} else {
- dout(20) << "RGW D4N Directory: Block is not in directory." << dendl;
return -2;
}
}
+
+int BlockDirectory::update_field(CacheBlock* block, std::string field, std::string value) { // represent in cache block too -Sam
+ std::string result;
+ std::string key = build_index(block);
+
+ if (!client.is_connected()) {
+ find_client(&client);
+ }
+
+ if (exist_key(key)) {
+ if (field == "hostsList") {
+ /* Append rather than overwrite */
+ std::string hosts;
+
+ try {
+ client.hget(key, "hostsList", [&hosts](cpp_redis::reply& reply) {
+ if (!reply.is_null()) {
+ hosts = reply.as_string();
+ }
+ });
+
+ client.sync_commit(std::chrono::milliseconds(1000));
+ } catch(std::exception &e) {
+ return -1;
+ }
+
+ value += "_";
+ value += hosts;
+ }
+
+ /* Update cache block */ // Remove ones that aren't used -Sam
+ if (field == "size")
+ block->size = std::stoi(value);
+ else if (field == "bucketName")
+ block->cacheObj.bucketName = value;
+ else if (field == "objName")
+ block->cacheObj.objName = value;
+ else if (field == "hostsList")
+ block->hostsList.push_back(value);
+
+ std::vector< std::pair<std::string, std::string> > list;
+ list.push_back(std::make_pair(field, value));
+
+ try {
+ client.hmset(key, list, [&result](cpp_redis::reply &reply) {
+ if (!reply.is_null()) {
+ result = reply.as_string();
+ }
+ });
+
+ client.sync_commit(std::chrono::milliseconds(1000));
+
+ if (result != "OK") {
+ return -1;
+ }
+ } catch(std::exception &e) {
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+} } // namespace rgw::d4n
-#ifndef CEPH_RGWD4NDIRECTORY_H
-#define CEPH_RGWD4NDIRECTORY_H
+#ifndef CEPH_D4NDIRECTORY_H
+#define CEPH_D4NDIRECTORY_H
#include "rgw_common.h"
#include <cpp_redis/cpp_redis>
#include <string>
#include <iostream>
-struct cache_obj {
- std::string bucket_name; /* s3 bucket name */
- std::string obj_name; /* s3 obj name */
+namespace rgw { namespace d4n {
+
+struct Address {
+ std::string host;
+ int port;
+};
+
+struct CacheObj {
+ std::string objName; /* S3 object name */
+ std::string bucketName; /* S3 bucket name */
+ time_t creationTime; // Creation time of the S3 Object
+ bool dirty;
+ std::vector<std::string> hostsList; /* Currently not supported: list of hostnames <ip:port> of object locations for multiple backends */
};
-struct cache_block {
- cache_obj c_obj;
- uint64_t size_in_bytes; /* block size_in_bytes */
- std::vector<std::string> hosts_list; /* Currently not supported: list of hostnames <ip:port> of block locations */
+struct CacheBlock {
+ CacheObj cacheObj;
+ uint64_t size; /* Block size in bytes */
+ int globalWeight = 0;
+ std::vector<std::string> hostsList; /* Currently not supported: list of hostnames <ip:port> of block locations */
};
-class RGWDirectory {
+class Directory {
public:
- RGWDirectory() {}
- CephContext *cct;
+ Directory() {}
+ CephContext* cct;
+};
+
+class ObjectDirectory: public Directory { // where else should object directory be called? -Sam
+ public:
+ ObjectDirectory() {}
+ ObjectDirectory(std::string host, int port) {
+ addr.host = host;
+ addr.port = port;
+ }
+
+ void init(CephContext* _cct) {
+ cct = _cct;
+ addr.host = cct->_conf->rgw_d4n_host;
+ addr.port = cct->_conf->rgw_d4n_port;
+ }
+
+ int find_client(cpp_redis::client* client);
+ int exist_key(std::string key);
+ Address get_addr() { return addr; }
+
+ int set_value(CacheObj* object);
+ int get_value(CacheObj* object);
+ int copy_value(CacheObj* object, CacheObj* copyObject);
+ int del_value(CacheObj* object);
+
+ private:
+ cpp_redis::client client;
+ Address addr;
+ std::string build_index(CacheObj* object);
};
-class RGWBlockDirectory: RGWDirectory {
+class BlockDirectory: public Directory {
public:
- RGWBlockDirectory() {}
- RGWBlockDirectory(std::string blockHost, int blockPort):host(blockHost), port(blockPort) {}
+ BlockDirectory() {}
+ BlockDirectory(std::string host, int port) {
+ addr.host = host;
+ addr.port = port;
+ }
- void init(CephContext *_cct) {
+ void init(CephContext* _cct) {
cct = _cct;
- host = cct->_conf->rgw_d4n_host;
- port = cct->_conf->rgw_d4n_port;
+ addr.host = cct->_conf->rgw_d4n_host;
+ addr.port = cct->_conf->rgw_d4n_port;
}
- int findClient(cpp_redis::client *client);
- int existKey(std::string key);
- int setValue(cache_block *ptr);
- int getValue(cache_block *ptr);
- int delValue(cache_block *ptr);
+ int find_client(cpp_redis::client* client);
+ int exist_key(std::string key);
+ Address get_addr() { return addr; }
- std::string get_host() { return host; }
- int get_port() { return port; }
+ int set_value(CacheBlock* block);
+ int get_value(CacheBlock* block);
+ int copy_value(CacheBlock* block, CacheBlock* copyBlock);
+ int del_value(CacheBlock* block);
+
+ int update_field(CacheBlock* block, std::string field, std::string value);
private:
cpp_redis::client client;
- std::string buildIndex(cache_block *ptr);
- std::string host = "";
- int port = 0;
+ Address addr;
+ std::string build_index(CacheBlock* block);
};
+} } // namespace rgw::d4n
+
#endif
--- /dev/null
+#include "d4n_policy.h"
+
+#define dout_subsys ceph_subsys_rgw
+#define dout_context g_ceph_context
+
+namespace rgw { namespace d4n {
+
+int CachePolicy::find_client(const DoutPrefixProvider* dpp) {
+ if (client.is_connected())
+ return 0;
+
+ if (get_addr().host == "" || get_addr().port == 0) {
+ ldpp_dout(dpp, 10) << "RGW D4N Policy: D4N policy endpoint was not configured correctly" << dendl;
+ return EDESTADDRREQ;
+ }
+
+ client.connect(get_addr().host, get_addr().port, nullptr);
+
+ if (!client.is_connected())
+ return ECONNREFUSED;
+
+ return 0;
+}
+
+int CachePolicy::exist_key(std::string key) {
+ int result = -1;
+ std::vector<std::string> keys;
+ keys.push_back(key);
+
+ if (!client.is_connected()) {
+ return result;
+ }
+
+ try {
+ client.exists(keys, [&result](cpp_redis::reply &reply) {
+ if (reply.is_integer()) {
+ result = reply.as_integer();
+ }
+ });
+
+ client.sync_commit(std::chrono::milliseconds(1000));
+ } catch(std::exception &e) {}
+
+ return result;
+}
+
+int LFUDAPolicy::set_age(int age) {
+ int result = 0;
+
+ try {
+ client.hset("lfuda", "age", std::to_string(age), [&result](cpp_redis::reply& reply) {
+ if (!reply.is_null()) {
+ result = reply.as_integer();
+ }
+ });
+
+ client.sync_commit();
+ } catch(std::exception &e) {
+ return -1;
+ }
+
+ return result - 1;
+}
+
+int LFUDAPolicy::get_age() {
+ int ret = 0;
+ int age = -1;
+
+ try {
+ client.hexists("lfuda", "age", [&ret](cpp_redis::reply& reply) {
+ if (!reply.is_null()) {
+ ret = reply.as_integer();
+ }
+ });
+
+ client.sync_commit(std::chrono::milliseconds(1000));
+ } catch(std::exception &e) {
+ return -1;
+ }
+
+ if (!ret) {
+ ret = set_age(0); /* Initialize age */
+
+ if (!ret) {
+ return 0; /* Success */
+ } else {
+ return -1;
+ };
+ }
+
+ try {
+ client.hget("lfuda", "age", [&age](cpp_redis::reply& reply) {
+ if (!reply.is_null()) {
+ age = std::stoi(reply.as_string());
+ }
+ });
+
+ client.sync_commit(std::chrono::milliseconds(1000));
+ } catch(std::exception &e) {
+ return -1;
+ }
+
+ return age;
+}
+
+int LFUDAPolicy::set_global_weight(std::string key, int weight) {
+ int result = 0;
+
+ try {
+ client.hset(key, "globalWeight", std::to_string(weight), [&result](cpp_redis::reply& reply) {
+ if (!reply.is_null()) {
+ result = reply.as_integer();
+ }
+ });
+
+ client.sync_commit();
+ } catch(std::exception &e) {
+ return -1;
+ }
+
+ return result - 1;
+}
+
+int LFUDAPolicy::get_global_weight(std::string key) {
+ int weight = -1;
+
+ try {
+ client.hget(key, "globalWeight", [&weight](cpp_redis::reply& reply) {
+ if (!reply.is_null()) {
+ weight = reply.as_integer();
+ }
+ });
+
+ client.sync_commit(std::chrono::milliseconds(1000));
+ } catch(std::exception &e) {
+ return -1;
+ }
+
+ return weight;
+}
+
+int LFUDAPolicy::set_min_avg_weight(int weight, std::string cacheLocation) {
+ int result = 0;
+
+ try {
+ client.hset("lfuda", "minAvgWeight:cache", cacheLocation, [&result](cpp_redis::reply& reply) {
+ if (!reply.is_null()) {
+ result = reply.as_integer();
+ }
+ });
+
+ client.sync_commit();
+ } catch(std::exception &e) {
+ return -1;
+ }
+
+ if (result == 1) {
+ result = 0;
+ try {
+ client.hset("lfuda", "minAvgWeight:weight", std::to_string(weight), [&result](cpp_redis::reply& reply) {
+ if (!reply.is_null()) {
+ result = reply.as_integer();
+ }
+ });
+
+ client.sync_commit();
+ } catch(std::exception &e) {
+ return -1;
+ }
+ }
+
+ return result - 1;
+}
+
+int LFUDAPolicy::get_min_avg_weight() {
+ int ret = 0;
+ int weight = -1;
+
+ try {
+ client.hexists("lfuda", "minAvgWeight:cache", [&ret](cpp_redis::reply& reply) {
+ if (!reply.is_null()) {
+ ret = reply.as_integer();
+ }
+ });
+
+ client.sync_commit(std::chrono::milliseconds(1000));
+ } catch(std::exception &e) {
+ return -1;
+ }
+
+ if (!ret) {
+ ret = set_min_avg_weight(INT_MAX, ""/* local cache location or keep empty? */); /* Initialize minimum average weight */
+
+ if (!ret) {
+ return INT_MAX; /* Success */
+ } else {
+ return -1;
+ };
+ }
+
+ try {
+ client.hget("lfuda", "minAvgWeight:weight", [&weight](cpp_redis::reply& reply) {
+ if (!reply.is_null()) {
+ weight = std::stoi(reply.as_string());
+ }
+ });
+
+ client.sync_commit(std::chrono::milliseconds(1000));
+ } catch(std::exception &e) {
+ return -1;
+ }
+
+ return weight;
+}
+
+CacheBlock LFUDAPolicy::find_victim(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) {
+ std::vector<rgw::cache::Entry> entries = cacheNode->list_entries(dpp);
+ std::string victimName;
+ int minWeight = INT_MAX;
+
+ for (auto it = entries.begin(); it != entries.end(); ++it) {
+ std::string localWeightStr = cacheNode->get_attr(dpp, it->key, "localWeight"); // should represent block -Sam
+
+ if (!std::stoi(localWeightStr)) { // maybe do this in some sort of initialization procedure instead of here? -Sam
+ /* Local weight hasn't been set */
+ int ret = cacheNode->set_attr(dpp, it->key, "localWeight", std::to_string(get_age()));
+
+ if (ret < 0)
+ return {};
+ } else if (std::stoi(localWeightStr) < minWeight) {
+ minWeight = std::stoi(localWeightStr);
+ victimName = it->key;
+ }
+ }
+
+ /* Get victim cache block */
+ CacheBlock victimBlock;
+ victimBlock.cacheObj.objName = victimName;
+ BlockDirectory blockDir;
+ blockDir.init(cct);
+
+ int ret = blockDir.get_value(&victimBlock);
+
+ if (ret < 0)
+ return {};
+
+ return victimBlock;
+}
+
+int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) {
+ std::string key = "rgw-object:" + block->cacheObj.objName + ":directory";
+ std::string localWeightStr = cacheNode->get_attr(dpp, block->cacheObj.objName, "localWeight"); // change to block name eventually -Sam
+ int localWeight = -1;
+
+ if (!client.is_connected())
+ find_client(dpp);
+
+ if (localWeightStr.empty()) { // figure out where to set local weight -Sam
+ /* Local weight hasn't been set */
+ int ret = cacheNode->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(get_age()));
+ localWeight = 0;
+
+ if (ret < 0)
+ return -1;
+ } else {
+ localWeight = std::stoi(localWeightStr);
+ }
+
+ int age = get_age();
+
+ if (cacheNode->key_exists(dpp, block->cacheObj.objName)) { /* Local copy */
+ localWeight += age;
+ } else {
+ std::string hosts;
+ uint64_t freeSpace = cacheNode->get_free_space(dpp);
+
+ while (freeSpace < block->size)
+ freeSpace += eviction(dpp, cacheNode);
+
+ if (exist_key(key)) {
+ try {
+ client.hget(key, "hostsList", [&hosts](cpp_redis::reply& reply) {
+ if (!reply.is_null()) {
+ hosts = reply.as_string();
+ }
+ });
+
+ client.sync_commit(std::chrono::milliseconds(1000));
+ } catch(std::exception &e) {
+ return -1;
+ }
+ } else {
+ return -2;
+ }
+
+ // should not hold local cache IP if in this else statement -Sam
+ if (hosts.length() > 0) { /* Remote copy */
+ int globalWeight = get_global_weight(key);
+ globalWeight += age;
+
+ if (set_global_weight(key, globalWeight))
+ return -1;
+ } else { /* No remote copy */
+ // do I need to add the block to the local cache here? -Sam
+ // update hosts list for block as well? check read workflow -Sam
+ localWeight += age;
+ return cacheNode->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(localWeight));
+ }
+ }
+ return 0;
+}
+
+uint64_t LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) {
+ CacheBlock victim = find_victim(dpp, cacheNode);
+
+ if (victim.cacheObj.objName.empty()) {
+ ldpp_dout(dpp, 10) << "RGW D4N Policy: Could not find victim block" << dendl;
+ return -1;
+ }
+
+ std::string key = "rgw-object:" + victim.cacheObj.objName + ":directory";
+ std::string hosts;
+ int globalWeight = get_global_weight(key);
+ int localWeight = std::stoi(cacheNode->get_attr(dpp, victim.cacheObj.objName, "localWeight")); // change to block name eventually -Sam
+ int avgWeight = get_min_avg_weight();
+
+ if (exist_key(key)) {
+ try {
+ client.hget(key, "hostsList", [&hosts](cpp_redis::reply& reply) {
+ if (!reply.is_null()) {
+ hosts = reply.as_string();
+ }
+ });
+
+ client.sync_commit(std::chrono::milliseconds(1000));
+ } catch(std::exception &e) {
+ return -1;
+ }
+ } else {
+ return -2;
+ }
+
+ if (hosts.empty()) { /* Last copy */
+ if (globalWeight > 0) {
+ localWeight += globalWeight;
+ int ret = cacheNode->set_attr(dpp, victim.cacheObj.objName, "localWeight", std::to_string(localWeight));
+
+ if (!ret)
+ ret = set_global_weight(key, 0);
+ else
+ return -1;
+
+ if (ret)
+ return -1;
+ }
+
+ if (avgWeight < 0)
+ return -1;
+
+ if (localWeight > avgWeight) {
+ // push block to remote cache
+ }
+ }
+
+ globalWeight += localWeight;
+
+ if (set_global_weight(key, globalWeight))
+ return -2;
+
+ ldpp_dout(dpp, 10) << "RGW D4N Policy: Block " << victim.cacheObj.objName << " has been evicted." << dendl;
+ int ret = cacheNode->delete_data(dpp, victim.cacheObj.objName);
+
+ if (!ret) {
+ ret = set_min_avg_weight(avgWeight - (localWeight/cacheNode->get_num_entries(dpp)), ""/*local cache location*/); // Where else must this be set? -Sam
+
+ if (!ret) {
+ int age = get_age();
+ age = std::max(localWeight, age);
+ ret = set_age(age);
+
+ if (ret)
+ return -1;
+ } else {
+ return -1;
+ }
+ } else {
+ return -1;
+ }
+
+ return victim.size;
+}
+
+int PolicyDriver::init() {
+ rgw::cache::Partition partition_info;
+ cacheDriver = new rgw::cache::RedisDriver(partition_info, "127.0.0.1", 6379); // hardcoded for now -Sam
+
+ if (policyName == "lfuda") {
+ cachePolicy = new LFUDAPolicy();
+ return 0;
+ } else
+ return -1;
+}
+
+} } // namespace rgw::d4n
--- /dev/null
+#ifndef CEPH_D4NPOLICY_H
+#define CEPH_D4NPOLICY_H
+
+#include <string>
+#include <iostream>
+#include <cpp_redis/cpp_redis>
+#include "rgw_common.h"
+#include "d4n_directory.h"
+#include "../../rgw_redis_driver.h"
+
+namespace rgw { namespace d4n {
+
+class CachePolicy {
+ private:
+ cpp_redis::client client;
+ Address addr;
+
+ public:
+ CephContext* cct;
+
+ CachePolicy() : addr() {}
+ virtual ~CachePolicy() = default;
+
+ virtual void init(CephContext *_cct) {
+ cct = _cct;
+ addr.host = cct->_conf->rgw_d4n_host;
+ addr.port = cct->_conf->rgw_d4n_port;
+ }
+ virtual int find_client(const DoutPrefixProvider* dpp) = 0;
+ virtual int exist_key(std::string key) = 0;
+ virtual Address get_addr() { return addr; }
+ virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) = 0;
+ virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) = 0;
+};
+
+class LFUDAPolicy : public CachePolicy {
+ private:
+ cpp_redis::client client;
+
+ public:
+ LFUDAPolicy() : CachePolicy() {}
+
+ int set_age(int age);
+ int get_age();
+ int set_global_weight(std::string key, int weight);
+ int get_global_weight(std::string key);
+ int set_min_avg_weight(int weight, std::string cacheLocation);
+ int get_min_avg_weight();
+ CacheBlock find_victim(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode);
+
+ virtual int find_client(const DoutPrefixProvider* dpp) override { return CachePolicy::find_client(dpp); }
+ virtual int exist_key(std::string key) override { return CachePolicy::exist_key(key); }
+ virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) override;
+ virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) override;
+};
+
+class PolicyDriver {
+ private:
+ std::string policyName;
+
+ public:
+ CachePolicy* cachePolicy;
+ rgw::cache::CacheDriver* cacheDriver; // might place elsewhere -Sam
+
+ PolicyDriver(std::string _policyName) : policyName(_policyName) {}
+ ~PolicyDriver() {
+ delete cachePolicy;
+ delete cacheDriver;
+ }
+
+ int init();
+};
+
+} } // namespace rgw::d4n
+
+#endif
int D4NFilterDriver::initialize(CephContext *cct, const DoutPrefixProvider *dpp)
{
FilterDriver::initialize(cct, dpp);
- blk_dir->init(cct);
- d4n_cache->init(cct);
+
+ objDir->init(cct);
+ blockDir->init(cct);
+
+ policyDriver->init();
+ policyDriver->cachePolicy->init(cct);
+ policyDriver->cacheDriver->initialize(cct, dpp);
return 0;
}
const DoutPrefixProvider* dpp,
optional_yield y)
{
+ /* Build cache block copy */
+ rgw::d4n::CacheBlock* copyCacheBlock = new rgw::d4n::CacheBlock(); // How will this copy work in lfuda? -Sam
+
+ copyCacheBlock->hostsList.push_back(driver->get_cache_block()->hostsList[0]);
+ copyCacheBlock->size = driver->get_cache_block()->size;
+ copyCacheBlock->size = driver->get_cache_block()->globalWeight; // Do we want to reset the global weight? -Sam
+ copyCacheBlock->cacheObj.bucketName = dest_bucket->get_name();
+ copyCacheBlock->cacheObj.objName = dest_object->get_key().get_oid();
+
+ int copy_valueReturn = driver->get_block_dir()->set_value(copyCacheBlock);
+
+ if (copy_valueReturn < 0) {
+ ldpp_dout(dpp, 20) << "D4N Filter: Block directory copy operation failed." << dendl;
+ } else {
+ ldpp_dout(dpp, 20) << "D4N Filter: Block directory copy operation succeeded." << dendl;
+ }
+
+ delete copyCacheBlock;
+
/* Append additional metadata to attributes */
rgw::sal::Attrs baseAttrs = this->get_attrs();
buffer::list bl;
baseAttrs.insert(attrs.begin(), attrs.end());
}
- int copyObjReturn = filter->get_d4n_cache()->copyObject(this->get_key().get_oid(), dest_object->get_key().get_oid(), &baseAttrs);
+ /*
+ int copy_attrsReturn = driver->get_policy_driver()->cacheDriver->copy_attrs(this->get_key().get_oid(), dest_object->get_key().get_oid(), &baseAttrs);
- if (copyObjReturn < 0) {
- ldpp_dout(dpp, 20) << "D4N Filter: Cache copy object operation failed." << dendl;
+ if (copy_attrsReturn < 0) {
+ ldpp_dout(dpp, 20) << "D4N Filter: Cache copy attributes operation failed." << dendl;
} else {
- ldpp_dout(dpp, 20) << "D4N Filter: Cache copy object operation succeeded." << dendl;
- }
+ int copy_dataReturn = driver->get_policy_driver()->cacheDriver->copy_data(this->get_key().get_oid(), dest_object->get_key().get_oid());
+
+ if (copy_dataReturn < 0) {
+ ldpp_dout(dpp, 20) << "D4N Filter: Cache copy data operation failed." << dendl;
+ } else {
+ ldpp_dout(dpp, 20) << "D4N Filter: Cache copy object operation succeeded." << dendl;
+ }
+ }*/
return next->copy_object(user, info, source_zone,
nextObject(dest_object),
}
}
- int updateAttrsReturn = filter->get_d4n_cache()->setObject(this->get_key().get_oid(), setattrs);
+ int update_attrsReturn = driver->get_policy_driver()->cacheDriver->set_attrs(dpp, this->get_key().get_oid(), *setattrs);
- if (updateAttrsReturn < 0) {
+ if (update_attrsReturn < 0) {
ldpp_dout(dpp, 20) << "D4N Filter: Cache set object attributes operation failed." << dendl;
} else {
ldpp_dout(dpp, 20) << "D4N Filter: Cache set object attributes operation succeeded." << dendl;
}
if (delattrs != NULL) {
- std::vector<std::string> delFields;
- Attrs::iterator attrs;
+ Attrs::iterator attr;
+ Attrs currentattrs = this->get_attrs();
- /* Extract fields from delattrs */
- for (attrs = delattrs->begin(); attrs != delattrs->end(); ++attrs) {
- delFields.push_back(attrs->first);
+ /* Ensure all delAttrs exist */
+ for (const auto& attr : *delattrs) {
+ if (std::find(currentattrs.begin(), currentattrs.end(), attr) == currentattrs.end()) {
+ delattrs->erase(std::find(delattrs->begin(), delattrs->end(), attr));
+ }
}
- Attrs currentattrs = this->get_attrs();
- std::vector<std::string> currentFields;
-
- /* Extract fields from current attrs */
- for (attrs = currentattrs.begin(); attrs != currentattrs.end(); ++attrs) {
- currentFields.push_back(attrs->first);
- }
-
- int delAttrsReturn = filter->get_d4n_cache()->delAttrs(this->get_key().get_oid(), currentFields, delFields);
+ int del_attrsReturn = driver->get_policy_driver()->cacheDriver->delete_attrs(dpp, this->get_key().get_oid(), *delattrs);
- if (delAttrsReturn < 0) {
+ if (del_attrsReturn < 0) {
ldpp_dout(dpp, 20) << "D4N Filter: Cache delete object attributes operation failed." << dendl;
} else {
ldpp_dout(dpp, 20) << "D4N Filter: Cache delete object attributes operation succeeded." << dendl;
int D4NFilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp,
rgw_obj* target_obj)
{
- rgw::sal::Attrs newAttrs;
- std::vector< std::pair<std::string, std::string> > newMetadata;
- int getAttrsReturn = filter->get_d4n_cache()->getObject(this->get_key().get_oid(),
- &newAttrs,
- &newMetadata);
+ rgw::sal::Attrs attrs;
+ int get_attrsReturn = driver->get_policy_driver()->cacheDriver->get_attrs(dpp, this->get_key().get_oid(), attrs);
- if (getAttrsReturn < 0) {
+ if (get_attrsReturn < 0) {
ldpp_dout(dpp, 20) << "D4N Filter: Cache get object attributes operation failed." << dendl;
return next->get_obj_attrs(y, dpp, target_obj);
} else {
- int setAttrsReturn = this->set_attrs(newAttrs);
+ int set_attrsReturn = this->set_attrs(attrs);
- if (setAttrsReturn < 0) {
+ if (set_attrsReturn < 0) {
ldpp_dout(dpp, 20) << "D4N Filter: Cache get object attributes operation failed." << dendl;
return next->get_obj_attrs(y, dpp, target_obj);
{
Attrs update;
update[(std::string)attr_name] = attr_val;
- int updateAttrsReturn = filter->get_d4n_cache()->updateAttr(this->get_key().get_oid(), &update);
+ int update_attrsReturn = driver->get_policy_driver()->cacheDriver->update_attrs(dpp, this->get_key().get_oid(), update);
- if (updateAttrsReturn < 0) {
+ if (update_attrsReturn < 0) {
ldpp_dout(dpp, 20) << "D4N Filter: Cache modify object attribute operation failed." << dendl;
} else {
ldpp_dout(dpp, 20) << "D4N Filter: Cache modify object attribute operation succeeded." << dendl;
int D4NFilterObject::delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name,
optional_yield y)
{
- std::vector<std::string> delFields;
- delFields.push_back((std::string)attr_name);
-
- Attrs::iterator attrs;
+ buffer::list bl;
+ Attrs delattr;
+ delattr.insert({attr_name, bl});
Attrs currentattrs = this->get_attrs();
- std::vector<std::string> currentFields;
-
- /* Extract fields from current attrs */
- for (attrs = currentattrs.begin(); attrs != currentattrs.end(); ++attrs) {
- currentFields.push_back(attrs->first);
- }
-
- int delAttrReturn = filter->get_d4n_cache()->delAttrs(this->get_key().get_oid(), currentFields, delFields);
+ rgw::sal::Attrs::iterator attr = delattr.begin();
- if (delAttrReturn < 0) {
- ldpp_dout(dpp, 20) << "D4N Filter: Cache delete object attribute operation failed." << dendl;
- } else {
- ldpp_dout(dpp, 20) << "D4N Filter: Cache delete object attribute operation succeeded." << dendl;
- }
-
- return next->delete_obj_attrs(dpp, attr_name, y);
+ /* Ensure delAttr exists */
+ if (std::find_if(currentattrs.begin(), currentattrs.end(),
+ [&](const auto& pair) { return pair.first == attr->first; }) != currentattrs.end()) {
+ int delAttrReturn = driver->get_policy_driver()->cacheDriver->delete_attrs(dpp, this->get_key().get_oid(), delattr);
+
+ if (delAttrReturn < 0) {
+ ldpp_dout(dpp, 20) << "D4N Filter: Cache delete object attribute operation failed." << dendl;
+ } else {
+ ldpp_dout(dpp, 20) << "D4N Filter: Cache delete object attribute operation succeeded." << dendl;
+ }
+ } else
+ return next->delete_obj_attrs(dpp, attr_name, y);
+
+ return 0;
}
std::unique_ptr<Object> D4NFilterDriver::get_object(const rgw_obj_key& k)
int D4NFilterObject::D4NFilterReadOp::prepare(optional_yield y, const DoutPrefixProvider* dpp)
{
- int getDirReturn = source->filter->get_block_dir()->getValue(source->filter->get_cache_block());
-
- if (getDirReturn < 0) {
- ldpp_dout(dpp, 20) << "D4N Filter: Directory get operation failed." << dendl;
- } else {
- ldpp_dout(dpp, 20) << "D4N Filter: Directory get operation succeeded." << dendl;
- }
-
- rgw::sal::Attrs newAttrs;
- std::vector< std::pair<std::string, std::string> > newMetadata;
- int getObjReturn = source->filter->get_d4n_cache()->getObject(source->get_key().get_oid(),
- &newAttrs,
- &newMetadata);
+ rgw::sal::Attrs attrs;
+ int getObjReturn = source->driver->get_policy_driver()->cacheDriver->get_attrs(dpp,
+ source->get_key().get_oid(),
+ attrs);
int ret = next->prepare(y, dpp);
} else {
/* Set metadata locally */
RGWObjState* astate;
+ RGWQuotaInfo quota_info;
+ std::unique_ptr<rgw::sal::User> user = source->driver->get_user(source->get_bucket()->get_owner());
source->get_obj_state(dpp, &astate, y);
- for (auto it = newMetadata.begin(); it != newMetadata.end(); ++it) {
- if (!std::strcmp(it->first.data(), "mtime")) {
- parse_time(it->second.data(), &astate->mtime);
- } else if (!std::strcmp(it->first.data(), "object_size")) {
- source->set_obj_size(std::stoull(it->second));
- } else if (!std::strcmp(it->first.data(), "accounted_size")) {
- astate->accounted_size = std::stoull(it->second);
- } else if (!std::strcmp(it->first.data(), "epoch")) {
- astate->epoch = std::stoull(it->second);
- } else if (!std::strcmp(it->first.data(), "version_id")) {
- source->set_instance(it->second);
- } else if (!std::strcmp(it->first.data(), "source_zone_short_id")) {
- astate->zone_short_id = static_cast<uint32_t>(std::stoul(it->second));
+ for (auto it = attrs.begin(); it != attrs.end(); ++it) {
+ if (it->second.length() > 0) { // or return? -Sam
+ if (it->first == "mtime") {
+ parse_time(it->second.c_str(), &astate->mtime);
+ attrs.erase(it->first);
+ } else if (it->first == "object_size") {
+ source->set_obj_size(std::stoull(it->second.c_str()));
+ attrs.erase(it->first);
+ } else if (it->first == "accounted_size") {
+ astate->accounted_size = std::stoull(it->second.c_str());
+ attrs.erase(it->first);
+ } else if (it->first == "epoch") {
+ astate->epoch = std::stoull(it->second.c_str());
+ attrs.erase(it->first);
+ } else if (it->first == "version_id") {
+ source->set_instance(it->second.c_str());
+ attrs.erase(it->first);
+ } else if (it->first == "source_zone_short_id") {
+ astate->zone_short_id = static_cast<uint32_t>(std::stoul(it->second.c_str()));
+ attrs.erase(it->first);
+ } else if (it->first == "user_quota.max_size") {
+ quota_info.max_size = std::stoull(it->second.c_str());
+ attrs.erase(it->first);
+ } else if (it->first == "user_quota.max_objects") {
+ quota_info.max_objects = std::stoull(it->second.c_str());
+ attrs.erase(it->first);
+ } else if (it->first == "max_buckets") {
+ user->set_max_buckets(std::stoull(it->second.c_str()));
+ attrs.erase(it->first);
}
}
-
+ user->set_info(quota_info);
source->set_obj_state(*astate);
/* Set attributes locally */
- int setAttrsReturn = source->set_attrs(newAttrs);
+ int set_attrsReturn = source->set_attrs(attrs);
- if (setAttrsReturn < 0) {
+ if (set_attrsReturn < 0) {
ldpp_dout(dpp, 20) << "D4N Filter: Cache get object operation failed." << dendl;
} else {
ldpp_dout(dpp, 20) << "D4N Filter: Cache get object operation succeeded." << dendl;
}
}
+ }
return ret;
}
+int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int64_t ofs, int64_t end,
+ RGWGetDataCB* cb, optional_yield y)
+{
+ /* Execute cache replacement policy */
+ int policyRet = source->driver->get_policy_driver()->cachePolicy->get_block(dpp, source->driver->get_cache_block(),
+ source->driver->get_policy_driver()->cacheDriver);
+
+ if (policyRet < 0) {
+ ldpp_dout(dpp, 20) << "D4N Filter: Cache replacement operation failed." << dendl;
+ } else {
+ ldpp_dout(dpp, 20) << "D4N Filter: Cache replacement operation succeeded." << dendl;
+ }
+
+ int ret = -1;
+ bufferlist bl;
+ uint64_t len = end - ofs + 1;
+ std::string oid(source->get_name());
+
+ /* Local cache check */
+ if (source->driver->get_policy_driver()->cacheDriver->key_exists(dpp, oid)) { // Entire object for now -Sam
+ ret = source->driver->get_policy_driver()->cacheDriver->get(dpp, source->get_key().get_oid(), ofs, len, bl, source->get_attrs());
+ cb->handle_data(bl, ofs, len);
+ } else {
+ /* Block directory check */
+ int getDirReturn = source->driver->get_block_dir()->get_value(source->driver->get_cache_block());
+
+ if (getDirReturn >= -1) {
+ if (getDirReturn == -1) {
+ ldpp_dout(dpp, 20) << "D4N Filter: Block directory get operation failed." << dendl;
+ } else {
+ ldpp_dout(dpp, 20) << "D4N Filter: Block directory get operation succeeded." << dendl;
+ }
+
+ // remote cache get
+
+ /* Cache block locally */
+ ret = source->driver->get_policy_driver()->cacheDriver->put(dpp, source->get_key().get_oid(), bl, len, source->get_attrs()); // May be put_async -Sam
+
+ if (!ret) {
+ int updateValueReturn = source->driver->get_block_dir()->update_field(source->driver->get_cache_block(), "hostsList", ""/*local cache ip from config*/);
+
+ if (updateValueReturn < 0) {
+ ldpp_dout(dpp, 20) << "D4N Filter: Block directory update value operation failed." << dendl;
+ } else {
+ ldpp_dout(dpp, 20) << "D4N Filter: Block directory update value operation succeeded." << dendl;
+ }
+
+ cb->handle_data(bl, ofs, len);
+ }
+ } else {
+ /* Write tier retrieval */
+ ldpp_dout(dpp, 20) << "D4N Filter: Block directory get operation failed." << dendl;
+ getDirReturn = source->driver->get_obj_dir()->get_value(&(source->driver->get_cache_block()->cacheObj));
+
+ if (getDirReturn >= -1) {
+ if (getDirReturn == -1) {
+ ldpp_dout(dpp, 20) << "D4N Filter: Object directory get operation failed." << dendl;
+ } else {
+ ldpp_dout(dpp, 20) << "D4N Filter: Object directory get operation succeeded." << dendl;
+ }
+
+ // retrieve from write back cache, which will be stored as a cache driver instance in the filter
+
+ /* Cache block locally */
+ ret = source->driver->get_policy_driver()->cacheDriver->put(dpp, source->get_key().get_oid(), bl, len, source->get_attrs()); // May be put_async -Sam
+
+ if (!ret) {
+ int updateValueReturn = source->driver->get_block_dir()->update_field(source->driver->get_cache_block(), "hostsList", ""/*local cache ip from config*/);
+
+ if (updateValueReturn < 0) {
+ ldpp_dout(dpp, 20) << "D4N Filter: Block directory update value operation failed." << dendl;
+ } else {
+ ldpp_dout(dpp, 20) << "D4N Filter: Block directory update value operation succeeded." << dendl;
+ }
+
+ cb->handle_data(bl, ofs, len);
+ }
+ } else {
+ /* Backend store retrieval */
+ ldpp_dout(dpp, 20) << "D4N Filter: Object directory get operation failed." << dendl;
+ ret = next->iterate(dpp, ofs, end, cb, y);
+
+ if (!ret) {
+ /* Cache block locally */
+ ret = source->driver->get_policy_driver()->cacheDriver->put(dpp, source->get_key().get_oid(), bl, len, source->get_attrs()); // May be put_async -Sam
+
+ /* Store block in directory */
+ rgw::d4n::BlockDirectory* tempBlockDir = source->driver->get_block_dir(); // remove later -Sam
+
+ source->driver->get_cache_block()->hostsList.push_back(tempBlockDir->get_addr().host + ":" + std::to_string(tempBlockDir->get_addr().port)); // local cache address -Sam
+ source->driver->get_cache_block()->size = source->get_obj_size();
+ source->driver->get_cache_block()->cacheObj.bucketName = source->get_bucket()->get_name();
+ source->driver->get_cache_block()->cacheObj.objName = source->get_key().get_oid();
+
+ int setDirReturn = tempBlockDir->set_value(source->driver->get_cache_block());
+
+ if (setDirReturn < 0) {
+ ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation failed." << dendl;
+ } else {
+ ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl;
+ }
+ }
+ }
+ }
+ }
+
+ if (ret < 0)
+ ldpp_dout(dpp, 20) << "D4N Filter: Cache iterate operation failed." << dendl;
+
+ return next->iterate(dpp, ofs, end, cb, y);
+}
+
int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp,
optional_yield y, uint32_t flags)
{
- int delDirReturn = source->filter->get_block_dir()->delValue(source->filter->get_cache_block());
+ int delDirReturn = source->driver->get_block_dir()->del_value(source->driver->get_cache_block());
if (delDirReturn < 0) {
- ldpp_dout(dpp, 20) << "D4N Filter: Directory delete operation failed." << dendl;
+ ldpp_dout(dpp, 20) << "D4N Filter: Block directory delete operation failed." << dendl;
} else {
- ldpp_dout(dpp, 20) << "D4N Filter: Directory delete operation succeeded." << dendl;
+ ldpp_dout(dpp, 20) << "D4N Filter: Block directory delete operation succeeded." << dendl;
}
- int delObjReturn = source->filter->get_d4n_cache()->delObject(source->get_key().get_oid());
+ Attrs::iterator attrs;
+ Attrs currentattrs = source->get_attrs();
+ std::vector<std::string> currentFields;
+
+ /* Extract fields from current attrs */
+ for (attrs = currentattrs.begin(); attrs != currentattrs.end(); ++attrs) {
+ currentFields.push_back(attrs->first);
+ }
+
+ int delObjReturn = source->driver->get_policy_driver()->cacheDriver->delete_data(dpp, source->get_key().get_oid());
if (delObjReturn < 0) {
- ldpp_dout(dpp, 20) << "D4N Filter: Cache delete operation failed." << dendl;
+ ldpp_dout(dpp, 20) << "D4N Filter: Cache delete object operation failed." << dendl;
} else {
+ Attrs delattrs = source->get_attrs();
+ delObjReturn = source->driver->get_policy_driver()->cacheDriver->delete_attrs(dpp, source->get_key().get_oid(), delattrs);
ldpp_dout(dpp, 20) << "D4N Filter: Cache delete operation succeeded." << dendl;
}
int D4NFilterWriter::prepare(optional_yield y)
{
- int delDataReturn = filter->get_d4n_cache()->deleteData(obj->get_key().get_oid());
+ int del_dataReturn = driver->get_policy_driver()->cacheDriver->delete_data(save_dpp, obj->get_key().get_oid());
- if (delDataReturn < 0) {
+ if (del_dataReturn < 0) {
ldpp_dout(save_dpp, 20) << "D4N Filter: Cache delete data operation failed." << dendl;
} else {
ldpp_dout(save_dpp, 20) << "D4N Filter: Cache delete data operation succeeded." << dendl;
int D4NFilterWriter::process(bufferlist&& data, uint64_t offset)
{
- int appendDataReturn = filter->get_d4n_cache()->appendData(obj->get_key().get_oid(), data);
+ int append_dataReturn = driver->get_policy_driver()->cacheDriver->append_data(save_dpp, obj->get_key().get_oid(), data);
- if (appendDataReturn < 0) {
+ if (append_dataReturn < 0) {
ldpp_dout(save_dpp, 20) << "D4N Filter: Cache append data operation failed." << dendl;
} else {
ldpp_dout(save_dpp, 20) << "D4N Filter: Cache append data operation succeeded." << dendl;
const req_context& rctx,
uint32_t flags)
{
- cache_block* temp_cache_block = filter->get_cache_block();
- RGWBlockDirectory* temp_block_dir = filter->get_block_dir();
+ rgw::d4n::BlockDirectory* tempBlockDir = driver->get_block_dir();
- temp_cache_block->hosts_list.push_back(temp_block_dir->get_host() + ":" + std::to_string(temp_block_dir->get_port()));
- temp_cache_block->size_in_bytes = accounted_size;
- temp_cache_block->c_obj.bucket_name = obj->get_bucket()->get_name();
- temp_cache_block->c_obj.obj_name = obj->get_key().get_oid();
+ driver->get_cache_block()->hostsList.push_back(tempBlockDir->get_addr().host + ":" + std::to_string(tempBlockDir->get_addr().port));
+ driver->get_cache_block()->size = accounted_size;
+ driver->get_cache_block()->cacheObj.bucketName = obj->get_bucket()->get_name();
+ driver->get_cache_block()->cacheObj.objName = obj->get_key().get_oid();
- int setDirReturn = temp_block_dir->setValue(temp_cache_block);
+ int setDirReturn = tempBlockDir->set_value(driver->get_cache_block());
if (setDirReturn < 0) {
- ldpp_dout(save_dpp, 20) << "D4N Filter: Directory set operation failed." << dendl;
+ ldpp_dout(save_dpp, 20) << "D4N Filter: Block directory set operation failed." << dendl;
} else {
- ldpp_dout(save_dpp, 20) << "D4N Filter: Directory set operation succeeded." << dendl;
+ ldpp_dout(save_dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl;
}
/* Retrieve complete set of attrs */
- RGWObjState* astate;
int ret = next->complete(accounted_size, etag, mtime, set_mtime, attrs,
delete_at, if_match, if_nomatch, user_data, zones_trace,
canceled, rctx, flags);
obj->get_obj_attrs(rctx.y, save_dpp, NULL);
- obj->get_obj_state(save_dpp, &astate, rctx.y);
/* Append additional metadata to attributes */
rgw::sal::Attrs baseAttrs = obj->get_attrs();
rgw::sal::Attrs attrs_temp = baseAttrs;
buffer::list bl;
+ RGWObjState* astate;
+ obj->get_obj_state(save_dpp, &astate, rctx.y);
bl.append(to_iso_8601(obj->get_mtime()));
baseAttrs.insert({"mtime", bl});
baseAttrs.insert(attrs.begin(), attrs.end());
- int setObjReturn = filter->get_d4n_cache()->setObject(obj->get_key().get_oid(), &baseAttrs);
+ int set_attrsReturn = driver->get_policy_driver()->cacheDriver->set_attrs(save_dpp, obj->get_key().get_oid(), baseAttrs);
- if (setObjReturn < 0) {
- ldpp_dout(save_dpp, 20) << "D4N Filter: Cache set operation failed." << dendl;
+ if (set_attrsReturn < 0) {
+ ldpp_dout(save_dpp, 20) << "D4N Filter: Cache set attributes operation failed." << dendl;
} else {
- ldpp_dout(save_dpp, 20) << "D4N Filter: Cache set operation succeeded." << dendl;
+ ldpp_dout(save_dpp, 20) << "D4N Filter: Cache set attributes operation succeeded." << dendl;
}
return ret;
}
}
-
#include "rgw_role.h"
#include "common/dout.h"
+#include "rgw_redis_driver.h"
#include "driver/d4n/d4n_directory.h"
-#include "driver/d4n/d4n_datacache.h"
+#include "driver/d4n/d4n_policy.h"
namespace rgw { namespace sal {
class D4NFilterDriver : public FilterDriver {
private:
- RGWBlockDirectory* blk_dir;
- cache_block* c_blk;
- RGWD4NCache* d4n_cache;
+ rgw::d4n::ObjectDirectory* objDir;
+ rgw::d4n::BlockDirectory* blockDir;
+ rgw::d4n::CacheBlock* cacheBlock;
+ rgw::d4n::PolicyDriver* policyDriver;
public:
D4NFilterDriver(Driver* _next) : FilterDriver(_next)
{
- blk_dir = new RGWBlockDirectory(); /* Initialize directory address with cct */
- c_blk = new cache_block();
- d4n_cache = new RGWD4NCache();
+ objDir = new rgw::d4n::ObjectDirectory();
+ blockDir = new rgw::d4n::BlockDirectory();
+ cacheBlock = new rgw::d4n::CacheBlock();
+ policyDriver = new rgw::d4n::PolicyDriver("lfuda");
}
virtual ~D4NFilterDriver() {
- delete blk_dir;
- delete c_blk;
- delete d4n_cache;
+ delete objDir;
+ delete blockDir;
+ delete cacheBlock;
+ delete policyDriver;
}
virtual int initialize(CephContext *cct, const DoutPrefixProvider *dpp) override;
const rgw_placement_rule *ptail_placement_rule,
uint64_t olh_epoch,
const std::string& unique_tag) override;
- RGWBlockDirectory* get_block_dir() { return blk_dir; }
- cache_block* get_cache_block() { return c_blk; }
- RGWD4NCache* get_d4n_cache() { return d4n_cache; }
+ rgw::d4n::ObjectDirectory* get_obj_dir() { return objDir; }
+ rgw::d4n::BlockDirectory* get_block_dir() { return blockDir; }
+ rgw::d4n::CacheBlock* get_cache_block() { return cacheBlock; }
+ rgw::d4n::PolicyDriver* get_policy_driver() { return policyDriver; }
};
class D4NFilterUser : public FilterUser {
private:
- D4NFilterDriver* filter;
+ D4NFilterDriver* driver;
public:
- D4NFilterUser(std::unique_ptr<User> _next, D4NFilterDriver* _filter) :
+ D4NFilterUser(std::unique_ptr<User> _next, D4NFilterDriver* _driver) :
FilterUser(std::move(_next)),
- filter(_filter) {}
+ driver(_driver) {}
virtual ~D4NFilterUser() = default;
};
class D4NFilterObject : public FilterObject {
private:
- D4NFilterDriver* filter;
+ D4NFilterDriver* driver;
public:
struct D4NFilterReadOp : FilterReadOp {
virtual ~D4NFilterReadOp() = default;
virtual int prepare(optional_yield y, const DoutPrefixProvider* dpp) override;
+ virtual int iterate(const DoutPrefixProvider* dpp, int64_t ofs, int64_t end,
+ RGWGetDataCB* cb, optional_yield y) override;
};
struct D4NFilterDeleteOp : FilterDeleteOp {
virtual int delete_obj(const DoutPrefixProvider* dpp, optional_yield y, uint32_t flags) override;
};
- D4NFilterObject(std::unique_ptr<Object> _next, D4NFilterDriver* _filter) : FilterObject(std::move(_next)),
- filter(_filter) {}
- D4NFilterObject(std::unique_ptr<Object> _next, Bucket* _bucket, D4NFilterDriver* _filter) : FilterObject(std::move(_next), _bucket),
- filter(_filter) {}
- D4NFilterObject(D4NFilterObject& _o, D4NFilterDriver* _filter) : FilterObject(_o),
- filter(_filter) {}
+ D4NFilterObject(std::unique_ptr<Object> _next, D4NFilterDriver* _driver) : FilterObject(std::move(_next)),
+ driver(_driver) {}
+ D4NFilterObject(std::unique_ptr<Object> _next, Bucket* _bucket, D4NFilterDriver* _driver) : FilterObject(std::move(_next), _bucket),
+ driver(_driver) {}
+ D4NFilterObject(D4NFilterObject& _o, D4NFilterDriver* _driver) : FilterObject(_o),
+ driver(_driver) {}
virtual ~D4NFilterObject() = default;
virtual int copy_object(User* user,
class D4NFilterWriter : public FilterWriter {
private:
- D4NFilterDriver* filter;
+ D4NFilterDriver* driver;
const DoutPrefixProvider* save_dpp;
bool atomic;
public:
- D4NFilterWriter(std::unique_ptr<Writer> _next, D4NFilterDriver* _filter, Object* _obj,
+ D4NFilterWriter(std::unique_ptr<Writer> _next, D4NFilterDriver* _driver, Object* _obj,
const DoutPrefixProvider* _dpp) : FilterWriter(std::move(_next), _obj),
- filter(_filter),
+ driver(_driver),
save_dpp(_dpp), atomic(false) {}
- D4NFilterWriter(std::unique_ptr<Writer> _next, D4NFilterDriver* _filter, Object* _obj,
+ D4NFilterWriter(std::unique_ptr<Writer> _next, D4NFilterDriver* _driver, Object* _obj,
const DoutPrefixProvider* _dpp, bool _atomic) : FilterWriter(std::move(_next), _obj),
- filter(_filter),
+ driver(_driver),
save_dpp(_dpp), atomic(_atomic) {}
virtual ~D4NFilterWriter() = default;
#pragma once
#include <memory>
+#include "rgw_sal.h"
+#include "rgw_auth.h"
class ActiveRateLimiter;
class OpsLogSink;
if (client.is_connected())
return 0;
- /*
if (addr.host == "" || addr.port == 0) {
dout(10) << "RGW Redis Cache: Redis cache endpoint was not configured correctly" << dendl;
return EDESTADDRREQ;
- }*/
+ }
client.connect("127.0.0.1", 6379, nullptr);
return result;
}
+std::unique_ptr<CacheAioRequest> RedisDriver::get_cache_aio_request_ptr(const DoutPrefixProvider* dpp)
+{
+ return std::make_unique<RedisCacheAioRequest>(this);
+}
+
+rgw::AioResultList RedisDriver::get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id)
+{
+ rgw_raw_obj r_obj;
+ r_obj.oid = key;
+ return aio->get(r_obj, rgw::Aio::cache_read_op(dpp, y, this, ofs, len, key), cost, id);
+}
+
+void RedisCacheAioRequest::cache_aio_read(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, off_t ofs, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) {}
+void RedisCacheAioRequest::cache_aio_write(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, bufferlist& bl, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) {}
+
} } // namespace rgw::cal
namespace rgw { namespace cache {
+class RedisDriver;
+
+class RedisCacheAioRequest: public CacheAioRequest {
+public:
+ RedisCacheAioRequest(RedisDriver* cache_driver) : cache_driver(cache_driver) {}
+ virtual ~RedisCacheAioRequest() = default;
+ virtual void cache_aio_read(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, off_t ofs, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) override;
+ virtual void cache_aio_write(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, bufferlist& bl, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) override;
+private:
+ RedisDriver* cache_driver;
+};
+
class RedisDriver : public CacheDriver {
private:
cpp_redis::client client;
+ rgw::d4n::Address addr;
std::unordered_map<std::string, Entry> entries;
+ int find_client(const DoutPrefixProvider* dpp);
int insert_entry(const DoutPrefixProvider* dpp, std::string key, off_t offset, uint64_t len);
int remove_entry(const DoutPrefixProvider* dpp, std::string key);
std::optional<Entry> get_entry(const DoutPrefixProvider* dpp, std::string key);
public:
- RedisDriver(Partition& _partition_info, std::string host, int port) : CacheDriver() {}
+ RedisDriver(Partition& _partition_info, std::string host, int port) : CacheDriver() {
+ addr.host = host;
+ addr.port = port;
+ }
virtual int initialize(CephContext* cct, const DoutPrefixProvider* dpp) override;
virtual int put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) override;
virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs) override;
+ virtual rgw::AioResultList get_async (const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override;
virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data) override;
virtual int delete_data(const DoutPrefixProvider* dpp, const::std::string& key) override;
virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) override;
virtual bool key_exists(const DoutPrefixProvider* dpp, const std::string& key) override;
virtual std::vector<Entry> list_entries(const DoutPrefixProvider* dpp) override;
virtual size_t get_num_entries(const DoutPrefixProvider* dpp) override;
+ int update_local_weight(const DoutPrefixProvider* dpp, std::string key, int localWeight); // may need to exist for base class -Sam
/* Partition */
virtual Partition get_current_partition_info(const DoutPrefixProvider* dpp) override;
virtual uint64_t get_free_space(const DoutPrefixProvider* dpp) override;
+
+ virtual std::unique_ptr<CacheAioRequest> get_cache_aio_request_ptr(const DoutPrefixProvider* dpp) override;
};
} } // namespace rgw::cal
#include <sstream>
#include "common/errno.h"
+//#include "common/dout.h"
#include "rgw_sal.h"
#include "rgw_sal_rados.h"
#endif
#define dout_subsys ceph_subsys_rgw
+//#define dout_context g_ceph_context
extern "C" {
extern rgw::sal::Driver* newRadosStore(boost::asio::io_context* io_context);
-#include "d4n_directory.h"
+#include "../rgw/driver/d4n/d4n_directory.h" // Fix -Sam
#include "rgw_process_env.h"
#include <cpp_redis/cpp_redis>
#include <iostream>
string portStr;
string hostStr;
string redisHost = "";
-string oid = "samoid";
+string oid = "testName";
string bucketName = "testBucket";
-int blkSize = 123;
+int blockSize = 123;
class DirectoryFixture: public ::testing::Test {
protected:
virtual void SetUp() {
- blk_dir = new RGWBlockDirectory(hostStr, stoi(portStr));
- c_blk = new cache_block();
+ blockDir = new rgw::d4n::BlockDirectory(hostStr, stoi(portStr));
+ cacheBlock = new rgw::d4n::CacheBlock();
- c_blk->hosts_list.push_back(redisHost);
- c_blk->size_in_bytes = blkSize;
- c_blk->c_obj.bucket_name = bucketName;
- c_blk->c_obj.obj_name = oid;
+ cacheBlock->hostsList.push_back(redisHost);
+ cacheBlock->size = blockSize;
+ cacheBlock->cacheObj.bucketName = bucketName;
+ cacheBlock->cacheObj.objName = oid;
}
virtual void TearDown() {
- delete blk_dir;
- blk_dir = nullptr;
+ delete blockDir;
+ blockDir = nullptr;
- delete c_blk;
- c_blk = nullptr;
+ delete cacheBlock;
+ cacheBlock = nullptr;
}
- RGWBlockDirectory* blk_dir;
- cache_block* c_blk;
+ rgw::d4n::BlockDirectory* blockDir;
+ rgw::d4n::CacheBlock* cacheBlock;
};
/* Successful initialization */
TEST_F(DirectoryFixture, DirectoryInit) {
- ASSERT_NE(blk_dir, nullptr);
- ASSERT_NE(c_blk, nullptr);
+ ASSERT_NE(blockDir, nullptr);
+ ASSERT_NE(cacheBlock, nullptr);
ASSERT_NE(redisHost.length(), (long unsigned int)0);
}
-/* Successful setValue Call and Redis Check */
+/* Successful set_value Call and Redis Check */
TEST_F(DirectoryFixture, SetValueTest) {
cpp_redis::client client;
int key_exist = -1;
string key;
string hosts;
string size;
- string bucket_name;
- string obj_name;
+ string bucketName;
+ string objName;
std::vector<std::string> fields;
- int setReturn = blk_dir->setValue(c_blk);
+ int setReturn = blockDir->set_value(cacheBlock);
ASSERT_EQ(setReturn, 0);
fields.push_back("key");
fields.push_back("hosts");
fields.push_back("size");
- fields.push_back("bucket_name");
- fields.push_back("obj_name");
+ fields.push_back("bucketName");
+ fields.push_back("objName");
client.connect(hostStr, stoi(portStr), nullptr, 0, 5, 1000);
ASSERT_EQ((bool)client.is_connected(), (bool)1);
- client.hmget("rgw-object:" + oid + ":directory", fields, [&key, &hosts, &size, &bucket_name, &obj_name, &key_exist](cpp_redis::reply& reply) {
+ client.hmget("rgw-object:" + oid + ":block-directory", fields, [&key, &hosts, &size, &bucketName, &objName, &key_exist](cpp_redis::reply& reply) {
auto arr = reply.as_array();
if (!arr[0].is_null()) {
key = arr[0].as_string();
hosts = arr[1].as_string();
size = arr[2].as_string();
- bucket_name = arr[3].as_string();
- obj_name = arr[4].as_string();
+ bucketName = arr[3].as_string();
+ objName = arr[4].as_string();
}
});
client.sync_commit();
EXPECT_EQ(key_exist, 0);
- EXPECT_EQ(key, "rgw-object:" + oid + ":directory");
+ EXPECT_EQ(key, "rgw-object:" + oid + ":block-directory");
EXPECT_EQ(hosts, redisHost);
- EXPECT_EQ(size, to_string(blkSize));
- EXPECT_EQ(bucket_name, bucketName);
- EXPECT_EQ(obj_name, oid);
+ EXPECT_EQ(size, to_string(blockSize));
+ EXPECT_EQ(bucketName, bucketName);
+ EXPECT_EQ(objName, oid);
client.flushall();
}
-/* Successful getValue Calls and Redis Check */
+/* Successful get_value Calls and Redis Check */
TEST_F(DirectoryFixture, GetValueTest) {
cpp_redis::client client;
int key_exist = -1;
string key;
string hosts;
string size;
- string bucket_name;
- string obj_name;
+ string bucketName;
+ string objName;
std::vector<std::string> fields;
- int setReturn = blk_dir->setValue(c_blk);
+ int setReturn = blockDir->set_value(cacheBlock);
ASSERT_EQ(setReturn, 0);
fields.push_back("key");
fields.push_back("hosts");
fields.push_back("size");
- fields.push_back("bucket_name");
- fields.push_back("obj_name");
+ fields.push_back("bucketName");
+ fields.push_back("objName");
client.connect(hostStr, stoi(portStr), nullptr, 0, 5, 1000);
ASSERT_EQ((bool)client.is_connected(), (bool)1);
- client.hmget("rgw-object:" + oid + ":directory", fields, [&key, &hosts, &size, &bucket_name, &obj_name, &key_exist](cpp_redis::reply& reply) {
+ client.hmget("rgw-object:" + oid + ":block-directory", fields, [&key, &hosts, &size, &bucketName, &objName, &key_exist](cpp_redis::reply& reply) {
auto arr = reply.as_array();
if (!arr[0].is_null()) {
key = arr[0].as_string();
hosts = arr[1].as_string();
size = arr[2].as_string();
- bucket_name = arr[3].as_string();
- obj_name = arr[4].as_string();
+ bucketName = arr[3].as_string();
+ objName = arr[4].as_string();
}
});
client.sync_commit();
EXPECT_EQ(key_exist, 0);
- EXPECT_EQ(key, "rgw-object:" + oid + ":directory");
+ EXPECT_EQ(key, "rgw-object:" + oid + ":block-directory");
EXPECT_EQ(hosts, redisHost);
- EXPECT_EQ(size, to_string(blkSize));
- EXPECT_EQ(bucket_name, bucketName);
- EXPECT_EQ(obj_name, oid);
+ EXPECT_EQ(size, to_string(blockSize));
+ EXPECT_EQ(bucketName, bucketName);
+ EXPECT_EQ(objName, oid);
/* Check if object name in directory instance matches redis update */
- client.hset("rgw-object:" + oid + ":directory", "obj_name", "newoid", [](cpp_redis::reply& reply) {
- if (reply.is_integer()) {
- ASSERT_EQ(reply.as_integer(), 0); /* Zero keys exist */
+ client.hset("rgw-object:" + oid + ":block-directory", "objName", "newoid", [](cpp_redis::reply& reply) {
+ if (!reply.is_null()) {
+ ASSERT_EQ(reply.as_integer(), 0);
}
});
client.sync_commit();
- int getReturn = blk_dir->getValue(c_blk);
+ int getReturn = blockDir->get_value(cacheBlock);
ASSERT_EQ(getReturn, 0);
- EXPECT_EQ(c_blk->c_obj.obj_name, "newoid");
+ EXPECT_EQ(cacheBlock->cacheObj.objName, "newoid");
client.flushall();
}
-/* Successful delValue Call and Redis Check */
+/* Successful del_value Call and Redis Check */
TEST_F(DirectoryFixture, DelValueTest) {
cpp_redis::client client;
vector<string> keys;
- int setReturn = blk_dir->setValue(c_blk);
+ int setReturn = blockDir->set_value(cacheBlock);
ASSERT_EQ(setReturn, 0);
- /* Ensure cache entry exists in cache before deletion */
- keys.push_back("rgw-object:" + oid + ":directory");
+ /* Ensure entry exists in directory before deletion */
+ keys.push_back("rgw-object:" + oid + ":block-directory");
client.exists(keys, [](cpp_redis::reply& reply) {
if (reply.is_integer()) {
}
});
- int delReturn = blk_dir->delValue(c_blk);
+ int delReturn = blockDir->del_value(cacheBlock);
ASSERT_EQ(delReturn, 0);
client.exists(keys, [](cpp_redis::reply& reply) {
if (reply.is_integer()) {
- ASSERT_EQ(reply.as_integer(), 0); /* Zero keys exist */
+ ASSERT_EQ(reply.as_integer(), 0); /* Zero keys exist */
}
});
#include <cpp_redis/cpp_redis>
#include "driver/dbstore/common/dbstore.h"
#include "rgw_sal_store.h"
-#include "driver/d4n/rgw_sal_d4n.h"
+#include "../../rgw/driver/d4n/rgw_sal_d4n.h" // fix -Sam
#include "rgw_sal.h"
#include "rgw_auth.h"
rgw_zone_set zones_trace;
bool canceled;
- int ret = testWriter->complete(accounted_size, etag,
+ int ret = testWriter->prepare(null_yield);
+
+ if (!ret) {
+ ret = testWriter->complete(accounted_size, etag,
&mtime, set_mtime,
attrs,
delete_at,
TEST_F(D4NFilterFixture, PutObject) {
cpp_redis::client client;
vector<string> fields;
- fields.push_back("test_attrs_key_0");
+ fields.push_back("test_attrs_key_PutObject");
clientSetUp(&client);
ASSERT_EQ(createUser(), 0);
TEST_F(D4NFilterFixture, CopyObjectReplace) {
cpp_redis::client client;
- vector<string> fields;
clientSetUp(&client);
createUser();
client.sync_commit();
- /* Check copy */
- client.hgetall("rgw-object:test_object_copy:cache", [](cpp_redis::reply& reply) {
+ /* Retrieve original object's redis data for later comparison */
+ std::vector< std::pair<std::string, std::string> > data;
+
+ client.hgetall("rgw-object:test_object_CopyObjectReplace:cache", [&data](cpp_redis::reply& reply) {
auto arr = reply.as_array();
if (!arr[0].is_null()) {
- EXPECT_EQ((int)arr.size(), 4 + METADATA_LENGTH); /* With etag */
+ for (int i = 0; i < (int)arr.size() - 1; i += 2) {
+ data.push_back({arr[i].as_string(), arr[i + 1].as_string()});
+ }
}
});
client.sync_commit();
-
- fields.push_back("test_attrs_key_CopyObjectReplace");
-
- client.hmget("rgw-object:test_object_copy:cache", fields, [](cpp_redis::reply& reply) {
+
+ /* Check copy */
+ client.hgetall("rgw-object:test_object_copy:cache", [&data](cpp_redis::reply& reply) {
+ bool unexpected = false;
auto arr = reply.as_array();
if (!arr[0].is_null()) {
- EXPECT_EQ(arr[0].as_string(), "test_attrs_copy_value");
+ EXPECT_EQ((int)arr.size(), 4 + METADATA_LENGTH); /* With etag */
+
+ for (int i = 0; i < (int)arr.size() - 1; i += 2) {
+ auto it = std::find_if(data.begin(), data.end(),
+ [&](const auto& pair) { return pair.first == arr[i].as_string(); });
+
+ if (it != data.end()) {
+ if (arr[i].as_string() == "test_attrs_key_CopyObjectReplace")
+ EXPECT_EQ(arr[i + 1].as_string(), "test_attrs_copy_value");
+ else if (arr[i].as_string() != "mtime") { /* mtime will be different */
+ int index = std::distance(data.begin(), it);
+ EXPECT_EQ(arr[i + 1].as_string(), data[index].second);
+ }
+ } else if (arr[i].as_string() == "etag") {
+ EXPECT_EQ(arr[i + 1].as_string(), "test_etag_copy");
+ } else
+ unexpected = true; /* Unexpected field */
+ }
+
+ EXPECT_EQ(unexpected, false);
}
});
client.sync_commit();
-
+
clientReset(&client);
}
client.sync_commit();
- /* Check copy */
- client.hgetall("rgw-object:test_object_copy:cache", [](cpp_redis::reply& reply) {
+ /* Retrieve original object's redis data for later comparison */
+ std::vector< std::pair<std::string, std::string> > data;
+
+ client.hgetall("rgw-object:test_object_CopyObjectMerge:cache", [&data](cpp_redis::reply& reply) {
auto arr = reply.as_array();
if (!arr[0].is_null()) {
- EXPECT_EQ((int)arr.size(), 6 + METADATA_LENGTH); /* With etag */
+ for (int i = 0; i < (int)arr.size() - 1; i += 2) {
+ data.push_back({arr[i].as_string(), arr[i + 1].as_string()});
+ }
}
});
client.sync_commit();
-
- fields.push_back("test_attrs_key_CopyObjectMerge");
- fields.push_back("test_attrs_copy_extra_key");
-
- client.hmget("rgw-object:test_object_copy:cache", fields, [](cpp_redis::reply& reply) {
+
+ /* Check copy */
+ client.hgetall("rgw-object:test_object_copy:cache", [&data](cpp_redis::reply& reply) {
+ bool unexpected = false;
+ bool merge = false;
auto arr = reply.as_array();
if (!arr[0].is_null()) {
- EXPECT_EQ(arr[0].as_string(), "test_attrs_value_CopyObjectMerge");
- EXPECT_EQ(arr[1].as_string(), "test_attrs_copy_extra_value");
+ EXPECT_EQ((int)arr.size(), 6 + METADATA_LENGTH); /* With etag */
+
+ for (int i = 0; i < (int)arr.size() - 1; i += 2) {
+ auto it = std::find_if(data.begin(), data.end(),
+ [&](const auto& pair) { return pair.first == arr[i].as_string(); });
+
+ if (it != data.end()) {
+ if (arr[i].as_string() == "test_attrs_key_CopyObjectMerge")
+ EXPECT_EQ(arr[i + 1].as_string(), "test_attrs_value_CopyObjectMerge");
+ else if (arr[i].as_string() != "mtime") { /* mtime will be different */
+ int index = std::distance(data.begin(), it);
+ EXPECT_EQ(arr[i + 1].as_string(), data[index].second);
+ }
+ } else if (arr[i].as_string() == "etag") {
+ EXPECT_EQ(arr[i + 1].as_string(), "test_etag_copy");
+ } else if (arr[i].as_string() == "test_attrs_copy_extra_key") {
+ merge = true;
+ EXPECT_EQ(arr[i + 1].as_string(), "test_attrs_copy_extra_value");
+ } else
+ unexpected = true; /* Unexpected field */
+ }
+
+ EXPECT_EQ(unexpected, false);
+ EXPECT_EQ(merge, true);
}
});
clientReset(&client);
}
+TEST_F(D4NFilterFixture, CachePolicy) {
+ cpp_redis::client client;
+ clientSetUp(&client);
+
+ createUser();
+ createBucket();
+
+ /* Create multipart object */
+ string object_name = "test_object_CachePolicy";
+ unique_ptr<rgw::sal::Object> obj = testBucket->get_object(rgw_obj_key(object_name));
+ rgw_user owner;
+ rgw_placement_rule ptail_placement_rule;
+ uint64_t olh_epoch = 123;
+ string unique_tag;
+
+ obj->get_obj_attrs(null_yield, dpp);
+
+ testWriter = driver->get_atomic_writer(dpp,
+ null_yield,
+ obj.get(),
+ owner,
+ &ptail_placement_rule,
+ olh_epoch,
+ unique_tag);
+
+ size_t accounted_size = 15; /* Uploaded as multipart */
+ string etag("test_etag");
+ ceph::real_time mtime;
+ ceph::real_time set_mtime;
+
+ buffer::list bl;
+ string tmp = "test_attrs_value_CachePolicy";
+ bl.append("test_attrs_value_CachePolicy");
+ map<string, bufferlist> attrs{{"test_attrs_key_CachePolicy", bl}};
+
+ ceph::real_time delete_at;
+ char if_match;
+ char if_nomatch;
+ string user_data;
+ rgw_zone_set zones_trace;
+ bool canceled;
+
+ ASSERT_EQ(testWriter->complete(accounted_size, etag,
+ &mtime, set_mtime,
+ attrs,
+ delete_at,
+ &if_match, &if_nomatch,
+ &user_data,
+ &zones_trace, &canceled,
+ null_yield), 0);
+
+
+ unique_ptr<rgw::sal::Object> testObject_CachePolicy = testBucket->get_object(rgw_obj_key("test_object_CachePolicy"));
+
+ ASSERT_NE(testObject_CachePolicy, nullptr);
+
+ /* Copy to new multipart object */
+ unique_ptr<rgw::sal::Writer> testWriterCopy = nullptr;
+ unique_ptr<rgw::sal::Object> obj_copy = testBucket->get_object(rgw_obj_key("test_object_copy"));
+ uint64_t olh_epoch_copy = 123;
+
+ obj_copy->get_obj_attrs(null_yield, dpp);
+
+ testWriterCopy = driver->get_atomic_writer(dpp,
+ null_yield,
+ obj_copy.get(),
+ owner,
+ &ptail_placement_rule,
+ olh_epoch_copy,
+ unique_tag);
+
+ RGWEnv rgw_env;
+ req_info info(get_pointer(env->cct), &rgw_env);
+ rgw_zone_id source_zone;
+ rgw_placement_rule dest_placement;
+ ceph::real_time src_mtime;
+ ceph::real_time mod_ptr;
+ ceph::real_time unmod_ptr;
+ rgw::sal::AttrsMod attrs_mod = rgw::sal::ATTRSMOD_REPLACE;
+ RGWObjCategory category = RGWObjCategory::Main;
+ string tag;
+
+ ASSERT_EQ(testWriterCopy->complete(accounted_size, etag,
+ &mtime, set_mtime,
+ attrs,
+ delete_at,
+ &if_match, &if_nomatch,
+ &user_data,
+ &zones_trace, &canceled,
+ null_yield), 0);
+
+ unique_ptr<rgw::sal::Object> testObject_copy = testBucket->get_object(rgw_obj_key("test_object_copy"));
+
+ EXPECT_EQ(testObject_CachePolicy->copy_object(testUser.get(),
+ &info, source_zone, testObject_copy.get(),
+ testBucket.get(), testBucket.get(),
+ dest_placement, &src_mtime, &mtime,
+ &mod_ptr, &unmod_ptr, false,
+ &if_match, &if_nomatch, attrs_mod,
+ false, attrs, category, olh_epoch,
+ delete_at, NULL, &tag, &etag,
+ NULL, NULL, dpp, null_yield), 0);
+
+ /* Ensure data field doesn't exist for original object */
+ client.hexists("rgw-object:test_object_CachePolicy:cache", "data", [](cpp_redis::reply& reply) {
+ if (reply.is_integer()) {
+ EXPECT_EQ(reply.as_integer(), 0);
+ }
+ });
+
+ client.sync_commit();
+
+ /* Ensure data field doesn't exist for copy */
+ client.hexists("rgw-object:test_object_CachePolicy:cache", "data", [](cpp_redis::reply& reply) {
+ if (reply.is_integer()) {
+ EXPECT_EQ(reply.as_integer(), 0);
+ }
+ });
+
+ client.sync_commit();
+
+ clientReset(&client);
+}
+
/* Attribute-related tests */
TEST_F(D4NFilterFixture, SetObjectAttrs) {
cpp_redis::client client;