]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw/d4n: This commit squashes the following commits related to
authorSamarah <samarah.uriarte@ibm.com>
Thu, 25 May 2023 18:58:26 +0000 (14:58 -0400)
committerPritha Srivastava <prsrivas@redhat.com>
Tue, 2 Apr 2024 15:54:50 +0000 (21:24 +0530)
d4n policy and filter driver.

RGW: Add cache driver base class - add d4n filter driver.
RGW: Update cache and policy files; add RedisDriver into D4N filter
RGW: Add D4N classes and unit testing; update cpp_redis submodule
QA: Add D4N teuthology suite
RGW: Improve D4N readability and structure
RGW: Add base D4N policy and test
RGW: Add GWF policy to D4N
RGW: Add global weight property
RGW: Added D4N namespace
RGW: Update policy driver interface
RGW: Update unit tests
RGW: Add Address struct to all files
RGW: Update D4N names and structure
RGW: Update structure and fix errors
RGW: Add more features to policy driver
RGW: Access local weight in policy code
RGW: Work on D4N workflows
RGW: Add object directory class and remove copy_value method
RGW: Establish object directory in read workflow
RGW: Update cache and policy files; add RedisDriver into D4N filter
RGW: Switch out D4N cache methods with Redis driver methods
RGW: Update RedisDriver to match new CacheDriver structure; define set_attrs method
RGW: Fix D4N read workflow crashes
RGW: Update D4N files to match CacheDriver changes
RGW: fix key_exists method for RedisDriver and clean up rgw_sal_d4n.cc
RGW: Use correct get_block method
RGW: Make CachePolicy a virtual class
RGW: Initialize localWeight if not found and develop find_victim method
RGW: Debugging network failure
RGW: Rebase and debugging network failure
RGW: Update RedisDriver::list_entries and usage in D4N policy driver
RGW: Fix network failure issue; add entries and entry methods
RGW: Update D4N test files to match rebase
RGW: Update D4N policy and RedisDriver with entries
RGW: make localWeight an attribute
rgw/cache: commit to fix compilation issues.

Signed-off-by: Samarah <samarah.uriarte@ibm.com>
16 files changed:
ceph.spec.in
src/rgw/CMakeLists.txt
src/rgw/driver/d4n/d4n_datacache.cc [deleted file]
src/rgw/driver/d4n/d4n_datacache.h [deleted file]
src/rgw/driver/d4n/d4n_directory.cc
src/rgw/driver/d4n/d4n_directory.h
src/rgw/driver/d4n/d4n_policy.cc [new file with mode: 0644]
src/rgw/driver/d4n/d4n_policy.h [new file with mode: 0644]
src/rgw/driver/d4n/rgw_sal_d4n.cc
src/rgw/driver/d4n/rgw_sal_d4n.h
src/rgw/rgw_process_env.h
src/rgw/rgw_redis_driver.cc
src/rgw/rgw_redis_driver.h
src/rgw/rgw_sal.cc
src/test/rgw/test_d4n_directory.cc
src/test/rgw/test_d4n_filter.cc

index 6e5942280bb4b361e4fe8613e64485078307fbc5..ed62be27f05017065f04292bc7d76a32da32a59e 100644 (file)
@@ -2228,6 +2228,12 @@ fi
 %dir %{_localstatedir}/lib/ceph/radosgw
 %{_unitdir}/ceph-radosgw@.service
 %{_unitdir}/ceph-radosgw.target
+%exclude %{_includedir}/cpp_redis
+%exclude %{_includedir}/tacopie
+%exclude /usr/lib/libcpp_redis.a
+%exclude /usr/lib/libtacopie.a
+%exclude /usr/lib/pkgconfig/cpp_redis.pc
+%exclude /usr/lib/pkgconfig/tacopie.pc
 
 %post radosgw
 %if 0%{?suse_version}
index de907dd150f792b65ffab8aa6ad7295f01b80235..a54d57f9c5a7fb37d603898f5e0f6a9b669ae5a6 100644 (file)
@@ -150,6 +150,9 @@ set(librgw_common_srcs
   rgw_tracer.cc
   rgw_lua_background.cc
   rgw_data_access.cc
+  driver/d4n/d4n_directory.cc
+  driver/d4n/d4n_policy.cc
+  driver/d4n/rgw_sal_d4n.cc
   driver/rados/cls_fifo_legacy.cc
   driver/rados/rgw_bucket.cc
   driver/rados/rgw_bucket_sync.cc
@@ -199,6 +202,7 @@ set(librgw_common_srcs
   driver/rados/topic.cc)
 
 list(APPEND librgw_common_srcs
+  driver/d4n/d4n_directory.cc
   driver/immutable_config/store.cc
   driver/json_config/store.cc
   driver/rados/config/impl.cc
@@ -227,7 +231,6 @@ if(WITH_RADOSGW_DAOS)
 endif()
 if(WITH_RADOSGW_D4N)
   list(APPEND librgw_common_srcs driver/d4n/d4n_directory.cc)
-  list(APPEND librgw_common_srcs driver/d4n/d4n_datacache.cc)
   list(APPEND librgw_common_srcs driver/d4n/rgw_sal_d4n.cc)
 endif()
 if(WITH_RADOSGW_POSIX)
diff --git a/src/rgw/driver/d4n/d4n_datacache.cc b/src/rgw/driver/d4n/d4n_datacache.cc
deleted file mode 100644 (file)
index ec0338f..0000000
+++ /dev/null
@@ -1,490 +0,0 @@
-#include "d4n_datacache.h"
-
-#define dout_subsys ceph_subsys_rgw
-#define dout_context g_ceph_context
-
-/* Base metadata and data fields should remain consistent */
-std::vector<std::string> baseFields {
-  "mtime",
-  "object_size",
-  "accounted_size",
-  "epoch",
-  "version_id",
-  "source_zone_short_id",
-  "bucket_count",
-  "bucket_size",
-  "user_quota.max_size",
-  "user_quota.max_objects",
-  "max_buckets",
-  "data"};
-
-std::vector< std::pair<std::string, std::string> > RGWD4NCache::buildObject(rgw::sal::Attrs* binary) {
-  std::vector< std::pair<std::string, std::string> > values;
-  rgw::sal::Attrs::iterator attrs;
-  /* Convert to vector */
-  if (binary != NULL) {
-    for (attrs = binary->begin(); attrs != binary->end(); ++attrs) {
-      values.push_back(std::make_pair(attrs->first, attrs->second.to_str()));
-    }
-  } 
-
-  return values; 
-}
-
-int RGWD4NCache::findClient(cpp_redis::client *client) { 
-  if (client->is_connected())
-    return 0;
-
-  if (host == "" || port == 0) { 
-    dout(10) << "RGW D4N Cache: D4N cache endpoint was not configured correctly" << dendl;
-    return EDESTADDRREQ;
-  }
-
-  client->connect(host, port, nullptr);
-
-  if (!client->is_connected())
-    return ECONNREFUSED;
-
-  return 0;
-}
-
-int RGWD4NCache::existKey(std::string key) { 
-  int result = -1;
-  std::vector<std::string> keys;
-  keys.push_back(key);
-
-  if (!client.is_connected()) {
-    return result;
-  }
-
-  try {
-    client.exists(keys, [&result](cpp_redis::reply &reply) {
-      if (reply.is_integer()) {
-        result = reply.as_integer(); /* Returns 1 upon success */
-      }
-    });
-
-    client.sync_commit(std::chrono::milliseconds(1000));
-  } catch(std::exception &e) {}
-  
-  return result;
-}
-
-int RGWD4NCache::setObject(std::string oid, rgw::sal::Attrs* attrs) {
-  /* Creating the index based on oid */
-  std::string key = "rgw-object:" + oid + ":cache";
-  std::string result;
-
-  if (!client.is_connected()) {
-    findClient(&client);
-  }
-
-  /* Every set will be treated as new */
-  try {
-    std::vector< std::pair<std::string, std::string> > redisObject = buildObject(attrs);
-      
-    if (redisObject.empty()) {
-      return -1;
-    }
-      
-    client.hmset(key, redisObject, [&result](cpp_redis::reply &reply) {
-      if (!reply.is_null()) {
-        result = reply.as_string();
-      }
-    });
-
-    client.sync_commit(std::chrono::milliseconds(1000));
-
-    if (result != "OK") {
-      return -1;
-    }
-  } catch(std::exception &e) {
-    return -1;
-  }
-
-  return 0;
-}
-
-int RGWD4NCache::getObject(std::string oid, 
-    rgw::sal::Attrs* newAttrs, 
-    std::vector< std::pair<std::string, std::string> >* newMetadata) 
-{
-  std::string result;
-  std::string key = "rgw-object:" + oid + ":cache";
-
-  if (!client.is_connected()) {
-    findClient(&client);
-  }
-
-  if (existKey(key)) {
-    int field_exist = -1;
-    
-    rgw::sal::Attrs::iterator it;
-    std::vector< std::pair<std::string, std::string> > redisObject;
-    std::vector<std::string> getFields;
-
-    /* Retrieve existing fields from cache */
-    try {
-      client.hgetall(key, [&getFields](cpp_redis::reply &reply) {
-       if (reply.is_array()) {
-         auto arr = reply.as_array();
-
-         if (!arr[0].is_null()) {
-           for (long unsigned int i = 0; i < arr.size() - 1; i += 2) {
-             getFields.push_back(arr[i].as_string());
-           }
-         }
-       }
-      });
-
-      client.sync_commit(std::chrono::milliseconds(1000));
-    } catch(std::exception &e) {
-      return -1;
-    }
-
-    /* Only data exists */
-    if (getFields.size() == 1 && getFields[0] == "data")
-      return 0;
-
-    /* Ensure all metadata, attributes, and data has been set */
-    for (const auto& field : baseFields) { 
-      auto it = std::find_if(getFields.begin(), getFields.end(),
-        [&](const auto& comp) { return comp == field; });
-
-      if (it != getFields.end()) {
-       int index = std::distance(getFields.begin(), it);
-       getFields.erase(getFields.begin() + index);
-      } else {
-        return -1;
-      }
-    }
-
-    /* Get attributes from cache */
-    try {
-      client.hmget(key, getFields, [&field_exist, &newAttrs, &getFields](cpp_redis::reply &reply) {
-        if (reply.is_array()) {
-         auto arr = reply.as_array();
-
-         if (!arr[0].is_null()) {
-           field_exist = 0;
-
-            for (long unsigned int i = 0; i < getFields.size(); ++i) {
-             std::string tmp = arr[i].as_string();
-              buffer::list bl;
-             bl.append(tmp);
-             newAttrs->insert({getFields[i], bl});
-            }
-         }
-       }
-      });
-
-      client.sync_commit(std::chrono::milliseconds(1000));
-    } catch(std::exception &e) {
-      return -1;
-    }
-    
-    if (field_exist == 0) {
-      field_exist = -1;
-
-      getFields.clear();
-      getFields.insert(getFields.begin(), baseFields.begin(), baseFields.end());
-      getFields.pop_back(); /* Do not query for data field */
-
-      /* Get metadata from cache */
-      try {
-       client.hmget(key, getFields, [&field_exist, &newMetadata, &getFields](cpp_redis::reply &reply) {
-         if (reply.is_array()) {
-           auto arr = reply.as_array();
-
-           if (!arr[0].is_null()) {
-             field_exist = 0;
-
-             for (long unsigned int i = 0; i < getFields.size(); ++i) {
-               newMetadata->push_back({getFields[i], arr[i].as_string()});
-             }
-           }
-         }
-       });
-
-       client.sync_commit(std::chrono::milliseconds(1000));
-      } catch(std::exception &e) {
-       return -1;
-      }
-    } else {
-      return -1;
-    }
-  } else {
-    dout(20) << "RGW D4N Cache: Object was not retrievable." << dendl;
-    return -2;
-  }
-
-  return 0;
-}
-
-int RGWD4NCache::copyObject(std::string original_oid, std::string copy_oid, rgw::sal::Attrs* attrs) {
-  std::string result;
-  std::vector< std::pair<std::string, std::string> > redisObject;
-  std::string key = "rgw-object:" + original_oid + ":cache";
-
-  if (!client.is_connected()) {
-    findClient(&client);
-  }
-
-  /* Read values from cache */
-  if (existKey(key)) {
-    try {
-      client.hgetall(key, [&redisObject](cpp_redis::reply &reply) {
-        if (reply.is_array()) {
-         auto arr = reply.as_array();
-
-         if (!arr[0].is_null()) {
-            for (long unsigned int i = 0; i < arr.size() - 1; i += 2) {
-             redisObject.push_back({arr[i].as_string(), arr[i + 1].as_string()});
-           }
-         }
-       }
-      });
-
-      client.sync_commit(std::chrono::milliseconds(1000));
-    } catch(std::exception &e) {
-      return -1;
-    }
-  } else {
-    return -2; 
-  }
-
-  /* Build copy with updated values */
-  if (!redisObject.empty()) {
-    rgw::sal::Attrs::iterator attr;
-    
-    for (attr = attrs->begin(); attr != attrs->end(); ++attr) {
-      auto it = std::find_if(redisObject.begin(), redisObject.end(),
-        [&](const auto& pair) { return pair.first == attr->first; });
-
-      if (it != redisObject.end()) {
-       int index = std::distance(redisObject.begin(), it);
-       redisObject[index] = {attr->first, attr->second.to_str()};
-      } else {
-       redisObject.push_back(std::make_pair(attr->first, attr->second.to_str()));
-      }
-    }
-  } else {
-    return -1;
-  }
-
-  /* Set copy with new values */
-  key = "rgw-object:" + copy_oid + ":cache";
-
-  try {
-    client.hmset(key, redisObject, [&result](cpp_redis::reply &reply) {
-      if (!reply.is_null()) {
-        result = reply.as_string();
-      }
-    });
-
-    client.sync_commit(std::chrono::milliseconds(1000));
-
-    if (result != "OK") {
-      return -1;
-    }
-  } catch(std::exception &e) {
-    return -1;
-  }
-   
-  return 0;
-}
-
-int RGWD4NCache::delObject(std::string oid) {
-  int result = 0;
-  std::vector<std::string> keys;
-  std::string key = "rgw-object:" + oid + ":cache";
-  keys.push_back(key);
-
-  if (!client.is_connected()) {
-    findClient(&client);
-  }
-
-  if (existKey(key)) {
-    try {
-      client.del(keys, [&result](cpp_redis::reply &reply) {
-       if (reply.is_integer()) {
-         result = reply.as_integer();
-       }
-      });
-
-      client.sync_commit(std::chrono::milliseconds(1000));
-      
-      return result - 1;
-    } catch(std::exception &e) {
-      return -1;
-    }
-  } else {
-    dout(20) << "RGW D4N Cache: Object is not in cache." << dendl;
-    return -2;
-  }
-}
-
-int RGWD4NCache::updateAttr(std::string oid, rgw::sal::Attrs* attr) {
-  std::string result;
-  std::string key = "rgw-object:" + oid + ":cache";
-
-  if (!client.is_connected()) {
-    findClient(&client);
-  }
-  
-  if (existKey(key)) { 
-    try {
-      std::vector< std::pair<std::string, std::string> > redisObject;
-      auto it = attr->begin();
-      redisObject.push_back({it->first, it->second.to_str()});
-
-      client.hmset(key, redisObject, [&result](cpp_redis::reply &reply) {
-       if (!reply.is_null()) {
-         result = reply.as_string();
-       }
-      });
-
-      client.sync_commit(std::chrono::milliseconds(1000));
-
-      if (result != "OK") {
-       return -1;
-      }
-    } catch(std::exception &e) {
-      return -1;
-    }
-  } else {
-    return -2;
-  }
-
-  return 0;
-}
-
-int RGWD4NCache::delAttrs(std::string oid, std::vector<std::string>& baseFields, std::vector<std::string>& deleteFields) {
-  int result = 0;
-  std::string key = "rgw-object:" + oid + ":cache";
-
-  if (!client.is_connected()) {
-    findClient(&client);
-  }
-
-  if (existKey(key)) {
-    /* Find if attribute doesn't exist */
-    for (const auto& delField : deleteFields) {
-      if (std::find(baseFields.begin(), baseFields.end(), delField) == baseFields.end()) {
-        deleteFields.erase(std::find(deleteFields.begin(), deleteFields.end(), delField));
-      }
-    }
-
-    try {
-      client.hdel(key, deleteFields, [&result](cpp_redis::reply &reply) {
-       if (reply.is_integer()) {
-         result = reply.as_integer();
-       }
-      });
-
-      client.sync_commit(std::chrono::milliseconds(1000));
-      
-      return result - 1;
-    } catch(std::exception &e) {
-      return -1;
-    }
-  } 
-  
-  dout(20) << "RGW D4N Cache: Object is not in cache." << dendl;
-  return -2;
-}
-
-int RGWD4NCache::appendData(std::string oid, buffer::list& data) {
-  std::string result;
-  std::string value = "";
-  std::string key = "rgw-object:" + oid + ":cache";
-
-  if (!client.is_connected()) {
-    findClient(&client);
-  }
-
-  if (existKey(key)) {
-    try {
-      client.hget(key, "data", [&value](cpp_redis::reply &reply) {
-       if (!reply.is_null()) {
-         value = reply.as_string();
-       }
-      });
-
-      client.sync_commit(std::chrono::milliseconds(1000));
-    } catch(std::exception &e) {
-      return -1;
-    }
-  }
-
-  try {
-    /* Append to existing value or set as new value */
-    std::string temp = value + data.to_str();
-    std::vector< std::pair<std::string, std::string> > field;
-    field.push_back({"data", temp});
-
-    client.hmset(key, field, [&result](cpp_redis::reply &reply) {
-      if (!reply.is_null()) {
-        result = reply.as_string();
-      }
-    });
-
-    client.sync_commit(std::chrono::milliseconds(1000));
-
-    if (result != "OK") {
-      return -1;
-    }
-  } catch(std::exception &e) {
-    return -1;
-  }
-
-  return 0;
-}
-
-int RGWD4NCache::deleteData(std::string oid) {
-  int result = 0;
-  std::string key = "rgw-object:" + oid + ":cache";
-  std::vector<std::string> deleteField;
-  deleteField.push_back("data");
-
-  if (!client.is_connected()) {
-    findClient(&client);
-  }
-
-  if (existKey(key)) {
-    int field_exist = -1;
-
-    try {
-      client.hget(key, "data", [&field_exist](cpp_redis::reply &reply) {
-       if (!reply.is_null()) {
-         field_exist = 0;
-       }
-      });
-
-      client.sync_commit(std::chrono::milliseconds(1000));
-    } catch(std::exception &e) {
-      return -1;
-    }
-
-    if (field_exist == 0) {
-      try {
-       client.hdel(key, deleteField, [&result](cpp_redis::reply &reply) {
-         if (reply.is_integer()) {
-           result = reply.as_integer(); /* Returns 1 upon success */
-         }
-       });
-
-       client.sync_commit(std::chrono::milliseconds(1000));
-
-        return result - 1;
-      } catch(std::exception &e) {
-       return -1;
-      }
-    } else {
-      return -1;
-    }
-  } else {
-    return 0; /* No delete was necessary */
-  }
-}
diff --git a/src/rgw/driver/d4n/d4n_datacache.h b/src/rgw/driver/d4n/d4n_datacache.h
deleted file mode 100644 (file)
index 5faf7b6..0000000
+++ /dev/null
@@ -1,40 +0,0 @@
-#ifndef CEPH_RGWD4NCACHE_H
-#define CEPH_RGWD4NCACHE_H
-
-#include "rgw_common.h"
-#include <cpp_redis/cpp_redis>
-#include <string>
-#include <iostream>
-
-class RGWD4NCache {
-  public:
-    CephContext *cct;
-
-    RGWD4NCache() {}
-    RGWD4NCache(std::string cacheHost, int cachePort):host(cacheHost), port(cachePort) {}
-
-    void init(CephContext *_cct) {
-      cct = _cct;
-      host = cct->_conf->rgw_d4n_host;
-      port = cct->_conf->rgw_d4n_port;
-    }
-
-    int findClient(cpp_redis::client *client);
-    int existKey(std::string key);
-    int setObject(std::string oid, rgw::sal::Attrs* attrs);
-    int getObject(std::string oid, rgw::sal::Attrs* newAttrs, std::vector< std::pair<std::string, std::string> >* newMetadata);
-    int copyObject(std::string original_oid, std::string copy_oid, rgw::sal::Attrs* attrs);
-    int delObject(std::string oid);
-    int updateAttr(std::string oid, rgw::sal::Attrs* attr);
-    int delAttrs(std::string oid, std::vector<std::string>& baseFields, std::vector<std::string>& deleteFields);
-    int appendData(std::string oid, buffer::list& data);
-    int deleteData(std::string oid);
-
-  private:
-    cpp_redis::client client;
-    std::string host = "";
-    int port = 0;
-    std::vector< std::pair<std::string, std::string> > buildObject(rgw::sal::Attrs* binary);
-};
-
-#endif
index 9666729553396d478fd2aed0474cc19efed825d0..441c4bc149282cd129579a5867d31d23b328ba33 100644 (file)
@@ -1,18 +1,21 @@
 #include "d4n_directory.h"
+#include <time.h>
 
 #define dout_subsys ceph_subsys_rgw
 #define dout_context g_ceph_context
 
-int RGWBlockDirectory::findClient(cpp_redis::client *client) {
+namespace rgw { namespace d4n {
+
+int ObjectDirectory::find_client(cpp_redis::client* client) {
   if (client->is_connected())
     return 0;
 
-  if (host == "" || port == 0) {
+   if (addr.host == "" || addr.port == 0) {
     dout(10) << "RGW D4N Directory: D4N directory endpoint was not configured correctly" << dendl;
     return EDESTADDRREQ;
   }
 
-  client->connect(host, port, nullptr);
+  client->connect(addr.host, addr.port, nullptr);
 
   if (!client->is_connected())
     return ECONNREFUSED;
@@ -20,12 +23,12 @@ int RGWBlockDirectory::findClient(cpp_redis::client *client) {
   return 0;
 }
 
-std::string RGWBlockDirectory::buildIndex(cache_block *ptr) {
-  return "rgw-object:" + ptr->c_obj.obj_name + ":directory";
+std::string ObjectDirectory::build_index(CacheObj* object) {
+  return "rgw-object:" + object->objName + ":object-directory";
 }
 
-int RGWBlockDirectory::existKey(std::string key) {
-  int result = -1;
+int ObjectDirectory::exist_key(std::string key) {
+  int result = 0;
   std::vector<std::string> keys;
   keys.push_back(key);
   
@@ -36,7 +39,7 @@ int RGWBlockDirectory::existKey(std::string key) {
   try {
     client.exists(keys, [&result](cpp_redis::reply &reply) {
       if (reply.is_integer()) {
-        result = reply.as_integer(); /* Returns 1 upon success */
+        result = reply.as_integer();
       }
     });
     
@@ -46,31 +49,209 @@ int RGWBlockDirectory::existKey(std::string key) {
   return result;
 }
 
-int RGWBlockDirectory::setValue(cache_block *ptr) {
-  /* Creating the index based on obj_name */
-  std::string key = buildIndex(ptr);
+int ObjectDirectory::set_value(CacheObj* object) {
+  /* Creating the index based on objName */
+  std::string result;
+  std::string key = build_index(object);
   if (!client.is_connected()) { 
-    findClient(&client);
+    find_client(&client);
   }
 
-  std::string result;
+  /* Every set will be new */
+  if (addr.host == "" || addr.port == 0) {
+    dout(10) << "RGW D4N Directory: Directory endpoint not configured correctly" << dendl;
+    return -2;
+  }
+    
+  std::string endpoint = addr.host + ":" + std::to_string(addr.port);
+  std::vector< std::pair<std::string, std::string> > list;
+    
+  /* Creating a list of the entry's properties */
+  list.push_back(make_pair("key", key));
+  list.push_back(make_pair("objName", object->objName));
+  list.push_back(make_pair("bucketName", object->bucketName));
+  list.push_back(make_pair("creationTime", std::to_string(object->creationTime)));
+  list.push_back(make_pair("dirty", std::to_string(object->dirty)));
+  list.push_back(make_pair("hosts", endpoint)); 
+
+  try {
+    client.hmset(key, list, [&result](cpp_redis::reply &reply) {
+      if (!reply.is_null()) {
+        result = reply.as_string();
+      }
+    });
+
+    client.sync_commit(std::chrono::milliseconds(1000));
+
+    if (result != "OK") {
+      return -1;
+    }
+  } catch(std::exception &e) {
+    return -1;
+  }
+
+  return 0;
+}
+
+int ObjectDirectory::get_value(CacheObj* object) {
+  int keyExist = -2;
+  std::string key = build_index(object);
+
+  if (!client.is_connected()) {
+    find_client(&client);
+  }
+
+  if (exist_key(key)) {
+    std::string key;
+    std::string objName;
+    std::string bucketName;
+    std::string creationTime;
+    std::string dirty;
+    std::string hosts;
+    std::vector<std::string> fields;
+
+    fields.push_back("key");
+    fields.push_back("objName");
+    fields.push_back("bucketName");
+    fields.push_back("creationTime");
+    fields.push_back("dirty");
+    fields.push_back("hosts");
+
+    try {
+      client.hmget(key, fields, [&key, &objName, &bucketName, &creationTime, &dirty, &hosts, &keyExist](cpp_redis::reply &reply) {
+        if (reply.is_array()) {
+         auto arr = reply.as_array();
+
+         if (!arr[0].is_null()) {
+           keyExist = 0;
+           key = arr[0].as_string();
+           objName = arr[1].as_string();
+           bucketName = arr[2].as_string();
+           creationTime = arr[3].as_string();
+           dirty = arr[4].as_string();
+           hosts = arr[5].as_string();
+         }
+       }
+      });
+
+      client.sync_commit(std::chrono::milliseconds(1000));
+
+      if (keyExist < 0) {
+        return keyExist;
+      }
+
+      /* Currently, there can only be one host */
+      object->objName = objName;
+      object->bucketName = bucketName;
+
+      struct std::tm tm;
+      std::istringstream(creationTime) >> std::get_time(&tm, "%T");
+      strptime(creationTime.c_str(), "%T", &tm); // Need to check formatting -Sam
+      object->creationTime = mktime(&tm);
+
+      std::istringstream(dirty) >> object->dirty;
+    } catch(std::exception &e) {
+      keyExist = -1;
+    }
+  }
+
+  return keyExist;
+}
+
+int ObjectDirectory::del_value(CacheObj* object) {
+  int result = 0;
+  std::vector<std::string> keys;
+  std::string key = build_index(object);
+  keys.push_back(key);
+  
+  if (!client.is_connected()) {
+    find_client(&client);
+  }
+  
+  if (exist_key(key)) {
+    try {
+      client.del(keys, [&result](cpp_redis::reply &reply) {
+        if (reply.is_integer()) {
+          result = reply.as_integer();
+        }
+      });
+       
+      client.sync_commit(std::chrono::milliseconds(1000));     
+      return result - 1;
+    } catch(std::exception &e) {
+      return -1;
+    }
+  } else {
+    return -2;
+  }
+}
+
+int BlockDirectory::find_client(cpp_redis::client* client) {
+  if (client->is_connected())
+    return 0;
+
+   if (addr.host == "" || addr.port == 0) {
+    dout(10) << "RGW D4N Directory: D4N directory endpoint was not configured correctly" << dendl;
+    return EDESTADDRREQ;
+  }
+
+  client->connect(addr.host, addr.port, nullptr);
+
+  if (!client->is_connected())
+    return ECONNREFUSED;
+
+  return 0;
+}
+
+std::string BlockDirectory::build_index(CacheBlock* block) {
+  return "rgw-object:" + block->cacheObj.objName + ":block-directory";
+}
+
+int BlockDirectory::exist_key(std::string key) {
+  int result = 0;
   std::vector<std::string> keys;
   keys.push_back(key);
+  
+  if (!client.is_connected()) {
+    return result;
+  }
+
+  try {
+    client.exists(keys, [&result](cpp_redis::reply &reply) {
+      if (reply.is_integer()) {
+        result = reply.as_integer();
+      }
+    });
+    
+    client.sync_commit(std::chrono::milliseconds(1000));
+  } catch(std::exception &e) {}
+
+  return result;
+}
+
+int BlockDirectory::set_value(CacheBlock* block) {
+  /* Creating the index based on objName */
+  std::string result;
+  std::string key = build_index(block);
+  if (!client.is_connected()) { 
+    find_client(&client);
+  }
 
   /* Every set will be new */
-  if (host == "" || port == 0) {
+  if (addr.host == "" || addr.port == 0) {
     dout(10) << "RGW D4N Directory: Directory endpoint not configured correctly" << dendl;
-    return -1;
+    return -2;
   }
     
-  std::string endpoint = host + ":" + std::to_string(port);
-  std::vector<std::pair<std::string, std::string>> list;
+  std::string endpoint = addr.host + ":" + std::to_string(addr.port);
+  std::vector< std::pair<std::string, std::string> > list;
     
-  /* Creating a list of key's properties */
+  /* Creating a list of the entry's properties */
   list.push_back(make_pair("key", key));
-  list.push_back(make_pair("size", std::to_string(ptr->size_in_bytes)));
-  list.push_back(make_pair("bucket_name", ptr->c_obj.bucket_name));
-  list.push_back(make_pair("obj_name", ptr->c_obj.obj_name));
+  list.push_back(make_pair("size", std::to_string(block->size)));
+  list.push_back(make_pair("globalWeight", std::to_string(block->globalWeight)));
+  list.push_back(make_pair("bucketName", block->cacheObj.bucketName));
+  list.push_back(make_pair("objName", block->cacheObj.objName));
   list.push_back(make_pair("hosts", endpoint)); 
 
   try {
@@ -81,7 +262,7 @@ int RGWBlockDirectory::setValue(cache_block *ptr) {
     });
 
     client.sync_commit(std::chrono::milliseconds(1000));
-    
+
     if (result != "OK") {
       return -1;
     }
@@ -92,88 +273,149 @@ int RGWBlockDirectory::setValue(cache_block *ptr) {
   return 0;
 }
 
-int RGWBlockDirectory::getValue(cache_block *ptr) {
-  std::string key = buildIndex(ptr);
+int BlockDirectory::get_value(CacheBlock* block) {
+  int keyExist = -2;
+  std::string key = build_index(block);
 
   if (!client.is_connected()) {
-    findClient(&client);
+    find_client(&client);
   }
 
-  if (existKey(key)) {
-    int field_exist = -1;
-    
+  if (exist_key(key)) {
     std::string hosts;
     std::string size;
-    std::string bucket_name;
-    std::string obj_name;
+    std::string bucketName;
+    std::string objName;
     std::vector<std::string> fields;
 
     fields.push_back("key");
     fields.push_back("hosts");
     fields.push_back("size");
-    fields.push_back("bucket_name");
-    fields.push_back("obj_name");
+    fields.push_back("bucketName");
+    fields.push_back("objName");
 
     try {
-      client.hmget(key, fields, [&key, &hosts, &size, &bucket_name, &obj_name, &field_exist](cpp_redis::reply &reply) {
+      client.hmget(key, fields, [&key, &hosts, &size, &bucketName, &objName, &keyExist](cpp_redis::reply &reply) {
         if (reply.is_array()) {
          auto arr = reply.as_array();
 
          if (!arr[0].is_null()) {
-           field_exist = 0;
+           keyExist = 0;
            key = arr[0].as_string();
            hosts = arr[1].as_string();
            size = arr[2].as_string();
-           bucket_name = arr[3].as_string();
-           obj_name = arr[4].as_string();
+           bucketName = arr[3].as_string();
+           objName = arr[4].as_string();
          }
        }
       });
 
       client.sync_commit(std::chrono::milliseconds(1000));
 
-      if (field_exist < 0) {
-        return field_exist;
+      if (keyExist < 0 ) {
+        return keyExist;
       }
 
-      /* Currently, there can only be one host */
-      ptr->size_in_bytes = std::stoi(size);
-      ptr->c_obj.bucket_name = bucket_name;
-      ptr->c_obj.obj_name = obj_name;
+      /* Currently, there can only be one host */ // update -Sam
+      block->size = std::stoi(size);
+      block->cacheObj.bucketName = bucketName;
+      block->cacheObj.objName = objName;
     } catch(std::exception &e) {
-      return -1;
+      keyExist = -1;
     }
   }
 
-  return 0;
+  return keyExist;
 }
 
-int RGWBlockDirectory::delValue(cache_block *ptr) {
+int BlockDirectory::del_value(CacheBlock* block) {
   int result = 0;
   std::vector<std::string> keys;
-  std::string key = buildIndex(ptr);
+  std::string key = build_index(block);
   keys.push_back(key);
   
   if (!client.is_connected()) {
-    findClient(&client);
+    find_client(&client);
   }
   
-  if (existKey(key)) {
+  if (exist_key(key)) {
     try {
       client.del(keys, [&result](cpp_redis::reply &reply) {
         if (reply.is_integer()) {
-          result = reply.as_integer(); /* Returns 1 upon success */
+          result = reply.as_integer();
         }
       });
        
       client.sync_commit(std::chrono::milliseconds(1000));     
-
       return result - 1;
     } catch(std::exception &e) {
       return -1;
     }
   } else {
-    dout(20) << "RGW D4N Directory: Block is not in directory." << dendl;
     return -2;
   }
 }
+
+int BlockDirectory::update_field(CacheBlock* block, std::string field, std::string value) { // represent in cache block too -Sam
+  std::string result;
+  std::string key = build_index(block);
+
+  if (!client.is_connected()) {
+    find_client(&client);
+  }
+  
+  if (exist_key(key)) {
+    if (field == "hostsList") {
+      /* Append rather than overwrite */
+      std::string hosts;
+
+      try {
+        client.hget(key, "hostsList", [&hosts](cpp_redis::reply& reply) {
+          if (!reply.is_null()) {
+            hosts = reply.as_string();
+          }
+        });
+
+        client.sync_commit(std::chrono::milliseconds(1000));
+      } catch(std::exception &e) {
+        return -1;
+      }
+      
+      value += "_";
+      value += hosts;
+    }
+
+    /* Update cache block */ // Remove ones that aren't used -Sam
+    if (field == "size")
+      block->size = std::stoi(value);
+    else if (field == "bucketName")
+      block->cacheObj.bucketName = value;
+    else if (field == "objName")
+      block->cacheObj.objName = value;
+    else if (field == "hostsList")
+      block->hostsList.push_back(value);
+
+    std::vector< std::pair<std::string, std::string> > list;
+    list.push_back(std::make_pair(field, value));
+
+    try {
+      client.hmset(key, list, [&result](cpp_redis::reply &reply) {
+       if (!reply.is_null()) {
+         result = reply.as_string();
+       }
+      });
+
+      client.sync_commit(std::chrono::milliseconds(1000));
+
+      if (result != "OK") {
+       return -1;
+      }
+    } catch(std::exception &e) {
+      return -1;
+    }
+  }
+
+  return 0;
+}
+
+} } // namespace rgw::d4n
index 95596db660ba9b9e6b10fb6b2922a41770021c20..fd2690db14137e7aff6b0dbf455665155afc29f6 100644 (file)
@@ -1,53 +1,99 @@
-#ifndef CEPH_RGWD4NDIRECTORY_H
-#define CEPH_RGWD4NDIRECTORY_H
+#ifndef CEPH_D4NDIRECTORY_H
+#define CEPH_D4NDIRECTORY_H
 
 #include "rgw_common.h"
 #include <cpp_redis/cpp_redis>
 #include <string>
 #include <iostream>
 
-struct cache_obj {
-  std::string bucket_name; /* s3 bucket name */
-  std::string obj_name; /* s3 obj name */
+namespace rgw { namespace d4n {
+
+struct Address {
+  std::string host;
+  int port;
+};
+
+struct CacheObj {
+  std::string objName; /* S3 object name */
+  std::string bucketName; /* S3 bucket name */
+  time_t creationTime; // Creation time of the S3 Object
+  bool dirty;
+  std::vector<std::string> hostsList; /* Currently not supported: list of hostnames <ip:port> of object locations for multiple backends */
 };
 
-struct cache_block {
-  cache_obj c_obj;
-  uint64_t size_in_bytes; /* block size_in_bytes */
-  std::vector<std::string> hosts_list; /* Currently not supported: list of hostnames <ip:port> of block locations */
+struct CacheBlock {
+  CacheObj cacheObj;
+  uint64_t size; /* Block size in bytes */
+  int globalWeight = 0;
+  std::vector<std::string> hostsList; /* Currently not supported: list of hostnames <ip:port> of block locations */
 };
 
-class RGWDirectory {
+class Directory {
   public:
-    RGWDirectory() {}
-    CephContext *cct;
+    Directory() {}
+    CephContext* cct;
+};
+
+class ObjectDirectory: public Directory { // where else should object directory be called? -Sam
+  public:
+    ObjectDirectory() {}
+    ObjectDirectory(std::string host, int port) {
+      addr.host = host;
+      addr.port = port;
+    }
+
+    void init(CephContext* _cct) {
+      cct = _cct;
+      addr.host = cct->_conf->rgw_d4n_host;
+      addr.port = cct->_conf->rgw_d4n_port;
+    }
+
+    int find_client(cpp_redis::client* client);
+    int exist_key(std::string key);
+    Address get_addr() { return addr; }
+
+    int set_value(CacheObj* object);
+    int get_value(CacheObj* object);
+    int copy_value(CacheObj* object, CacheObj* copyObject);
+    int del_value(CacheObj* object);
+
+  private:
+    cpp_redis::client client;
+    Address addr;
+    std::string build_index(CacheObj* object);
 };
 
-class RGWBlockDirectory: RGWDirectory {
+class BlockDirectory: public Directory {
   public:
-    RGWBlockDirectory() {}
-    RGWBlockDirectory(std::string blockHost, int blockPort):host(blockHost), port(blockPort) {}
+    BlockDirectory() {}
+    BlockDirectory(std::string host, int port) {
+      addr.host = host;
+      addr.port = port;
+    }
     
-    void init(CephContext *_cct) {
+    void init(CephContext_cct) {
       cct = _cct;
-      host = cct->_conf->rgw_d4n_host;
-      port = cct->_conf->rgw_d4n_port;
+      addr.host = cct->_conf->rgw_d4n_host;
+      addr.port = cct->_conf->rgw_d4n_port;
     }
        
-    int findClient(cpp_redis::client *client);
-    int existKey(std::string key);
-    int setValue(cache_block *ptr);
-    int getValue(cache_block *ptr);
-    int delValue(cache_block *ptr);
+    int find_client(cpp_redis::client* client);
+    int exist_key(std::string key);
+    Address get_addr() { return addr; }
 
-    std::string get_host() { return host; }
-    int get_port() { return port; }
+    int set_value(CacheBlock* block);
+    int get_value(CacheBlock* block);
+    int copy_value(CacheBlock* block, CacheBlock* copyBlock);
+    int del_value(CacheBlock* block);
+
+    int update_field(CacheBlock* block, std::string field, std::string value);
 
   private:
     cpp_redis::client client;
-    std::string buildIndex(cache_block *ptr);
-    std::string host = "";
-    int port = 0;
+    Address addr;
+    std::string build_index(CacheBlock* block);
 };
 
+} } // namespace rgw::d4n
+
 #endif
diff --git a/src/rgw/driver/d4n/d4n_policy.cc b/src/rgw/driver/d4n/d4n_policy.cc
new file mode 100644 (file)
index 0000000..22ea996
--- /dev/null
@@ -0,0 +1,404 @@
+#include "d4n_policy.h"
+
+#define dout_subsys ceph_subsys_rgw
+#define dout_context g_ceph_context
+
+namespace rgw { namespace d4n {
+
+int CachePolicy::find_client(const DoutPrefixProvider* dpp) {
+  if (client.is_connected())
+    return 0;
+
+  if (get_addr().host == "" || get_addr().port == 0) {
+    ldpp_dout(dpp, 10) << "RGW D4N Policy: D4N policy endpoint was not configured correctly" << dendl;
+    return EDESTADDRREQ;
+  }
+
+  client.connect(get_addr().host, get_addr().port, nullptr);
+
+  if (!client.is_connected())
+    return ECONNREFUSED;
+
+  return 0;
+}
+
+int CachePolicy::exist_key(std::string key) {
+  int result = -1;
+  std::vector<std::string> keys;
+  keys.push_back(key);
+
+  if (!client.is_connected()) {
+    return result;
+  }
+
+  try {
+    client.exists(keys, [&result](cpp_redis::reply &reply) {
+      if (reply.is_integer()) {
+        result = reply.as_integer();
+      }
+    });
+
+    client.sync_commit(std::chrono::milliseconds(1000));
+  } catch(std::exception &e) {}
+
+  return result;
+}
+
+int LFUDAPolicy::set_age(int age) {
+  int result = 0;
+
+  try {
+    client.hset("lfuda", "age", std::to_string(age), [&result](cpp_redis::reply& reply) {
+      if (!reply.is_null()) {
+       result = reply.as_integer(); 
+      }
+    }); 
+
+    client.sync_commit();
+  } catch(std::exception &e) {
+    return -1;
+  }
+
+  return result - 1;
+}
+
+int LFUDAPolicy::get_age() {
+  int ret = 0;
+  int age = -1;
+
+  try {
+    client.hexists("lfuda", "age", [&ret](cpp_redis::reply& reply) {
+      if (!reply.is_null()) {
+        ret = reply.as_integer();
+      }
+    });
+
+    client.sync_commit(std::chrono::milliseconds(1000));
+  } catch(std::exception &e) {
+    return -1;
+  }
+
+  if (!ret) {
+    ret = set_age(0); /* Initialize age */
+
+    if (!ret) {
+      return 0; /* Success */
+    } else {
+      return -1;
+    };
+  }
+
+  try {
+    client.hget("lfuda", "age", [&age](cpp_redis::reply& reply) {
+      if (!reply.is_null()) {
+        age = std::stoi(reply.as_string());
+      }
+    });
+
+    client.sync_commit(std::chrono::milliseconds(1000));
+  } catch(std::exception &e) {
+    return -1;
+  }
+
+  return age;
+}
+
+int LFUDAPolicy::set_global_weight(std::string key, int weight) {
+  int result = 0;
+
+  try {
+    client.hset(key, "globalWeight", std::to_string(weight), [&result](cpp_redis::reply& reply) {
+      if (!reply.is_null()) {
+       result = reply.as_integer();
+      }
+    }); 
+
+    client.sync_commit();
+  } catch(std::exception &e) {
+    return -1;
+  }
+
+  return result - 1;
+}
+
+int LFUDAPolicy::get_global_weight(std::string key) {
+  int weight = -1;
+
+  try {
+    client.hget(key, "globalWeight", [&weight](cpp_redis::reply& reply) {
+      if (!reply.is_null()) {
+       weight = reply.as_integer();
+      }
+    });
+
+    client.sync_commit(std::chrono::milliseconds(1000));
+  } catch(std::exception &e) {
+    return -1;
+  }
+
+  return weight;
+}
+
+int LFUDAPolicy::set_min_avg_weight(int weight, std::string cacheLocation) {
+  int result = 0;
+
+  try {
+    client.hset("lfuda", "minAvgWeight:cache", cacheLocation, [&result](cpp_redis::reply& reply) {
+      if (!reply.is_null()) {
+       result = reply.as_integer();
+      }
+    }); 
+
+    client.sync_commit();
+  } catch(std::exception &e) {
+    return -1;
+  }
+
+  if (result == 1) {
+    result = 0;
+    try {
+      client.hset("lfuda", "minAvgWeight:weight", std::to_string(weight), [&result](cpp_redis::reply& reply) {
+       if (!reply.is_null()) {
+         result = reply.as_integer();
+       }
+      }); 
+
+      client.sync_commit();
+    } catch(std::exception &e) {
+      return -1;
+    }
+  }
+
+  return result - 1;
+}
+
+int LFUDAPolicy::get_min_avg_weight() {
+  int ret = 0;
+  int weight = -1;
+
+  try {
+    client.hexists("lfuda", "minAvgWeight:cache", [&ret](cpp_redis::reply& reply) {
+      if (!reply.is_null()) {
+        ret = reply.as_integer();
+      }
+    });
+
+    client.sync_commit(std::chrono::milliseconds(1000));
+  } catch(std::exception &e) {
+    return -1;
+  }
+
+  if (!ret) {
+    ret = set_min_avg_weight(INT_MAX, ""/* local cache location or keep empty? */); /* Initialize minimum average weight */
+
+    if (!ret) {
+      return INT_MAX; /* Success */
+    } else {
+      return -1;
+    };
+  }
+
+  try {
+    client.hget("lfuda", "minAvgWeight:weight", [&weight](cpp_redis::reply& reply) {
+      if (!reply.is_null()) {
+        weight = std::stoi(reply.as_string());
+      }
+    });
+
+    client.sync_commit(std::chrono::milliseconds(1000));
+  } catch(std::exception &e) {
+    return -1;
+  }
+
+  return weight;
+}
+
+CacheBlock LFUDAPolicy::find_victim(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) {
+  std::vector<rgw::cache::Entry> entries = cacheNode->list_entries(dpp);
+  std::string victimName;
+  int minWeight = INT_MAX;
+
+  for (auto it = entries.begin(); it != entries.end(); ++it) {
+    std::string localWeightStr = cacheNode->get_attr(dpp, it->key, "localWeight"); // should represent block -Sam
+
+    if (!std::stoi(localWeightStr)) { // maybe do this in some sort of initialization procedure instead of here? -Sam
+      /* Local weight hasn't been set */
+      int ret = cacheNode->set_attr(dpp, it->key, "localWeight", std::to_string(get_age())); 
+
+      if (ret < 0)
+       return {};
+    } else if (std::stoi(localWeightStr) < minWeight) {
+      minWeight = std::stoi(localWeightStr);
+      victimName = it->key;
+    }
+  }
+
+  /* Get victim cache block */
+  CacheBlock victimBlock;
+  victimBlock.cacheObj.objName = victimName;
+  BlockDirectory blockDir;
+  blockDir.init(cct);
+
+  int ret = blockDir.get_value(&victimBlock);
+
+  if (ret < 0)
+    return {};
+
+  return victimBlock;
+}
+
+int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) {
+  std::string key = "rgw-object:" + block->cacheObj.objName + ":directory";
+  std::string localWeightStr = cacheNode->get_attr(dpp, block->cacheObj.objName, "localWeight"); // change to block name eventually -Sam
+  int localWeight = -1;
+
+  if (!client.is_connected())
+    find_client(dpp);
+
+  if (localWeightStr.empty()) { // figure out where to set local weight -Sam
+    /* Local weight hasn't been set */
+    int ret = cacheNode->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(get_age())); 
+    localWeight = 0;
+
+    if (ret < 0)
+      return -1;
+  } else {
+    localWeight = std::stoi(localWeightStr);
+  }
+
+  int age = get_age();
+
+  if (cacheNode->key_exists(dpp, block->cacheObj.objName)) { /* Local copy */
+    localWeight += age;
+  } else {
+    std::string hosts;
+    uint64_t freeSpace = cacheNode->get_free_space(dpp);
+
+    while (freeSpace < block->size)
+      freeSpace += eviction(dpp, cacheNode);
+
+    if (exist_key(key)) {
+      try {
+       client.hget(key, "hostsList", [&hosts](cpp_redis::reply& reply) {
+         if (!reply.is_null()) {
+           hosts = reply.as_string();
+         }
+       });
+
+       client.sync_commit(std::chrono::milliseconds(1000));
+      } catch(std::exception &e) {
+       return -1;
+      }
+    } else {
+      return -2; 
+    }
+
+    // should not hold local cache IP if in this else statement -Sam
+    if (hosts.length() > 0) { /* Remote copy */
+      int globalWeight = get_global_weight(key);
+      globalWeight += age;
+      
+      if (set_global_weight(key, globalWeight))
+       return -1;
+    } else { /* No remote copy */
+      // do I need to add the block to the local cache here? -Sam
+      // update hosts list for block as well? check read workflow -Sam
+      localWeight += age;
+      return cacheNode->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(localWeight));
+    }
+  }
+  return 0;
+}
+
+uint64_t LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) {
+  CacheBlock victim = find_victim(dpp, cacheNode);
+
+  if (victim.cacheObj.objName.empty()) {
+    ldpp_dout(dpp, 10) << "RGW D4N Policy: Could not find victim block" << dendl;
+    return -1;
+  }
+
+  std::string key = "rgw-object:" + victim.cacheObj.objName + ":directory";
+  std::string hosts;
+  int globalWeight = get_global_weight(key);
+  int localWeight = std::stoi(cacheNode->get_attr(dpp, victim.cacheObj.objName, "localWeight")); // change to block name eventually -Sam
+  int avgWeight = get_min_avg_weight();
+
+  if (exist_key(key)) {
+    try {
+      client.hget(key, "hostsList", [&hosts](cpp_redis::reply& reply) {
+       if (!reply.is_null()) {
+         hosts = reply.as_string();
+       }
+      });
+
+      client.sync_commit(std::chrono::milliseconds(1000));
+    } catch(std::exception &e) {
+      return -1;
+    }
+  } else {
+    return -2; 
+  }
+
+  if (hosts.empty()) { /* Last copy */
+    if (globalWeight > 0) {
+      localWeight += globalWeight;
+      int ret = cacheNode->set_attr(dpp, victim.cacheObj.objName, "localWeight", std::to_string(localWeight));
+
+      if (!ret)
+        ret = set_global_weight(key, 0);
+      else
+        return -1;
+
+      if (ret)
+       return -1;
+    }
+
+    if (avgWeight < 0)
+      return -1;
+
+    if (localWeight > avgWeight) {
+      // push block to remote cache
+    }
+  }
+
+  globalWeight += localWeight;
+
+  if (set_global_weight(key, globalWeight))
+    return -2;
+
+  ldpp_dout(dpp, 10) << "RGW D4N Policy: Block " << victim.cacheObj.objName << " has been evicted." << dendl;
+  int ret = cacheNode->delete_data(dpp, victim.cacheObj.objName);
+
+  if (!ret) {
+    ret = set_min_avg_weight(avgWeight - (localWeight/cacheNode->get_num_entries(dpp)), ""/*local cache location*/); // Where else must this be set? -Sam
+
+    if (!ret) {
+      int age = get_age();
+      age = std::max(localWeight, age);
+      ret = set_age(age);
+      
+      if (ret)
+       return -1;
+    } else {
+      return -1;
+    }
+  } else {
+    return -1;
+  }
+
+  return victim.size;
+}
+
+int PolicyDriver::init() {
+  rgw::cache::Partition partition_info;
+  cacheDriver = new rgw::cache::RedisDriver(partition_info, "127.0.0.1", 6379); // hardcoded for now -Sam
+
+  if (policyName == "lfuda") {
+    cachePolicy = new LFUDAPolicy();
+    return 0;
+  } else
+    return -1;
+}
+
+} } // namespace rgw::d4n
diff --git a/src/rgw/driver/d4n/d4n_policy.h b/src/rgw/driver/d4n/d4n_policy.h
new file mode 100644 (file)
index 0000000..15765c9
--- /dev/null
@@ -0,0 +1,76 @@
+#ifndef CEPH_D4NPOLICY_H
+#define CEPH_D4NPOLICY_H
+
+#include <string>
+#include <iostream>
+#include <cpp_redis/cpp_redis>
+#include "rgw_common.h"
+#include "d4n_directory.h"
+#include "../../rgw_redis_driver.h"
+
+namespace rgw { namespace d4n {
+
+class CachePolicy {
+  private:
+    cpp_redis::client client;
+    Address addr;
+
+  public:
+    CephContext* cct;
+
+    CachePolicy() : addr() {}
+    virtual ~CachePolicy() = default;
+
+    virtual void init(CephContext *_cct) {
+      cct = _cct;
+      addr.host = cct->_conf->rgw_d4n_host;
+      addr.port = cct->_conf->rgw_d4n_port;
+    }
+    virtual int find_client(const DoutPrefixProvider* dpp) = 0;
+    virtual int exist_key(std::string key) = 0;
+    virtual Address get_addr() { return addr; }
+    virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) = 0;
+    virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) = 0;
+};
+
+class LFUDAPolicy : public CachePolicy {
+  private:
+    cpp_redis::client client;
+
+  public:
+    LFUDAPolicy() : CachePolicy() {}
+
+    int set_age(int age);
+    int get_age();
+    int set_global_weight(std::string key, int weight);
+    int get_global_weight(std::string key);
+    int set_min_avg_weight(int weight, std::string cacheLocation);
+    int get_min_avg_weight();
+    CacheBlock find_victim(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode);
+
+    virtual int find_client(const DoutPrefixProvider* dpp) override { return CachePolicy::find_client(dpp); }
+    virtual int exist_key(std::string key) override { return CachePolicy::exist_key(key); }
+    virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) override;
+    virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) override;
+};
+
+class PolicyDriver {
+  private:
+    std::string policyName;
+
+  public:
+    CachePolicy* cachePolicy;
+    rgw::cache::CacheDriver* cacheDriver; // might place elsewhere -Sam
+
+    PolicyDriver(std::string _policyName) : policyName(_policyName) {}
+    ~PolicyDriver() {
+      delete cachePolicy;
+      delete cacheDriver;
+    }
+
+    int init();
+};
+
+} } // namespace rgw::d4n
+
+#endif
index e2624690ebdd6d3977a73b151ac979f3884112c1..9e5474ed472df9f84fa977c5554e548ab63db187 100644 (file)
@@ -39,8 +39,13 @@ static inline Object* nextObject(Object* t)
 int D4NFilterDriver::initialize(CephContext *cct, const DoutPrefixProvider *dpp)
 {
   FilterDriver::initialize(cct, dpp);
-  blk_dir->init(cct);
-  d4n_cache->init(cct);
+
+  objDir->init(cct);
+  blockDir->init(cct);
+
+  policyDriver->init();
+  policyDriver->cachePolicy->init(cct);
+  policyDriver->cacheDriver->initialize(cct, dpp);
   
   return 0;
 }
@@ -94,6 +99,25 @@ int D4NFilterObject::copy_object(User* user,
                               const DoutPrefixProvider* dpp,
                               optional_yield y)
 {
+  /* Build cache block copy */
+  rgw::d4n::CacheBlock* copyCacheBlock = new rgw::d4n::CacheBlock(); // How will this copy work in lfuda? -Sam
+
+  copyCacheBlock->hostsList.push_back(driver->get_cache_block()->hostsList[0]); 
+  copyCacheBlock->size = driver->get_cache_block()->size;
+  copyCacheBlock->size = driver->get_cache_block()->globalWeight; // Do we want to reset the global weight? -Sam
+  copyCacheBlock->cacheObj.bucketName = dest_bucket->get_name();
+  copyCacheBlock->cacheObj.objName = dest_object->get_key().get_oid();
+  
+  int copy_valueReturn = driver->get_block_dir()->set_value(copyCacheBlock);
+
+  if (copy_valueReturn < 0) {
+    ldpp_dout(dpp, 20) << "D4N Filter: Block directory copy operation failed." << dendl;
+  } else {
+    ldpp_dout(dpp, 20) << "D4N Filter: Block directory copy operation succeeded." << dendl;
+  }
+
+  delete copyCacheBlock;
+
   /* Append additional metadata to attributes */
   rgw::sal::Attrs baseAttrs = this->get_attrs();
   buffer::list bl;
@@ -130,13 +154,20 @@ int D4NFilterObject::copy_object(User* user,
     baseAttrs.insert(attrs.begin(), attrs.end()); 
   }
 
-  int copyObjReturn = filter->get_d4n_cache()->copyObject(this->get_key().get_oid(), dest_object->get_key().get_oid(), &baseAttrs);
+  /*
+  int copy_attrsReturn = driver->get_policy_driver()->cacheDriver->copy_attrs(this->get_key().get_oid(), dest_object->get_key().get_oid(), &baseAttrs);
 
-  if (copyObjReturn < 0) {
-    ldpp_dout(dpp, 20) << "D4N Filter: Cache copy object operation failed." << dendl;
+  if (copy_attrsReturn < 0) {
+    ldpp_dout(dpp, 20) << "D4N Filter: Cache copy attributes operation failed." << dendl;
   } else {
-    ldpp_dout(dpp, 20) << "D4N Filter: Cache copy object operation succeeded." << dendl;
-  }
+    int copy_dataReturn = driver->get_policy_driver()->cacheDriver->copy_data(this->get_key().get_oid(), dest_object->get_key().get_oid());
+
+    if (copy_dataReturn < 0) {
+      ldpp_dout(dpp, 20) << "D4N Filter: Cache copy data operation failed." << dendl;
+    } else {
+      ldpp_dout(dpp, 20) << "D4N Filter: Cache copy object operation succeeded." << dendl;
+    }
+  }*/
 
   return next->copy_object(user, info, source_zone,
                            nextObject(dest_object),
@@ -162,9 +193,9 @@ int D4NFilterObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattr
       }
     }
 
-    int updateAttrsReturn = filter->get_d4n_cache()->setObject(this->get_key().get_oid(), setattrs);
+    int update_attrsReturn = driver->get_policy_driver()->cacheDriver->set_attrs(dpp, this->get_key().get_oid(), *setattrs);
 
-    if (updateAttrsReturn < 0) {
+    if (update_attrsReturn < 0) {
       ldpp_dout(dpp, 20) << "D4N Filter: Cache set object attributes operation failed." << dendl;
     } else {
       ldpp_dout(dpp, 20) << "D4N Filter: Cache set object attributes operation succeeded." << dendl;
@@ -172,25 +203,19 @@ int D4NFilterObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattr
   }
 
   if (delattrs != NULL) {
-    std::vector<std::string> delFields;
-    Attrs::iterator attrs;
+    Attrs::iterator attr;
+    Attrs currentattrs = this->get_attrs();
 
-    /* Extract fields from delattrs */
-    for (attrs = delattrs->begin(); attrs != delattrs->end(); ++attrs) {
-      delFields.push_back(attrs->first);
+    /* Ensure all delAttrs exist */
+    for (const auto& attr : *delattrs) {
+      if (std::find(currentattrs.begin(), currentattrs.end(), attr) == currentattrs.end()) {
+       delattrs->erase(std::find(delattrs->begin(), delattrs->end(), attr));
+      }
     }
 
-    Attrs currentattrs = this->get_attrs();
-    std::vector<std::string> currentFields;
-    
-    /* Extract fields from current attrs */
-    for (attrs = currentattrs.begin(); attrs != currentattrs.end(); ++attrs) {
-      currentFields.push_back(attrs->first);
-    }
-    
-    int delAttrsReturn = filter->get_d4n_cache()->delAttrs(this->get_key().get_oid(), currentFields, delFields);
+    int del_attrsReturn = driver->get_policy_driver()->cacheDriver->delete_attrs(dpp, this->get_key().get_oid(), *delattrs);
 
-    if (delAttrsReturn < 0) {
+    if (del_attrsReturn < 0) {
       ldpp_dout(dpp, 20) << "D4N Filter: Cache delete object attributes operation failed." << dendl;
     } else {
       ldpp_dout(dpp, 20) << "D4N Filter: Cache delete object attributes operation succeeded." << dendl;
@@ -203,20 +228,17 @@ int D4NFilterObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattr
 int D4NFilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp,
                                 rgw_obj* target_obj)
 {
-  rgw::sal::Attrs newAttrs;
-  std::vector< std::pair<std::string, std::string> > newMetadata;
-  int getAttrsReturn = filter->get_d4n_cache()->getObject(this->get_key().get_oid(), 
-                                                 &newAttrs, 
-                                                 &newMetadata);
+  rgw::sal::Attrs attrs;
+  int get_attrsReturn = driver->get_policy_driver()->cacheDriver->get_attrs(dpp, this->get_key().get_oid(), attrs);
 
-  if (getAttrsReturn < 0) {
+  if (get_attrsReturn < 0) {
     ldpp_dout(dpp, 20) << "D4N Filter: Cache get object attributes operation failed." << dendl;
 
     return next->get_obj_attrs(y, dpp, target_obj);
   } else {
-    int setAttrsReturn = this->set_attrs(newAttrs);
+    int set_attrsReturn = this->set_attrs(attrs);
     
-    if (setAttrsReturn < 0) {
+    if (set_attrsReturn < 0) {
       ldpp_dout(dpp, 20) << "D4N Filter: Cache get object attributes operation failed." << dendl;
 
       return next->get_obj_attrs(y, dpp, target_obj);
@@ -233,9 +255,9 @@ int D4NFilterObject::modify_obj_attrs(const char* attr_name, bufferlist& attr_va
 {
   Attrs update;
   update[(std::string)attr_name] = attr_val;
-  int updateAttrsReturn = filter->get_d4n_cache()->updateAttr(this->get_key().get_oid(), &update);
+  int update_attrsReturn = driver->get_policy_driver()->cacheDriver->update_attrs(dpp, this->get_key().get_oid(), update);
 
-  if (updateAttrsReturn < 0) {
+  if (update_attrsReturn < 0) {
     ldpp_dout(dpp, 20) << "D4N Filter: Cache modify object attribute operation failed." << dendl;
   } else {
     ldpp_dout(dpp, 20) << "D4N Filter: Cache modify object attribute operation succeeded." << dendl;
@@ -247,27 +269,26 @@ int D4NFilterObject::modify_obj_attrs(const char* attr_name, bufferlist& attr_va
 int D4NFilterObject::delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name,
                                optional_yield y)
 {
-  std::vector<std::string> delFields;
-  delFields.push_back((std::string)attr_name);
-  
-  Attrs::iterator attrs;
+  buffer::list bl;
+  Attrs delattr;
+  delattr.insert({attr_name, bl});
   Attrs currentattrs = this->get_attrs();
-  std::vector<std::string> currentFields;
-  
-  /* Extract fields from current attrs */
-  for (attrs = currentattrs.begin(); attrs != currentattrs.end(); ++attrs) {
-    currentFields.push_back(attrs->first);
-  }
-  
-  int delAttrReturn = filter->get_d4n_cache()->delAttrs(this->get_key().get_oid(), currentFields, delFields);
+  rgw::sal::Attrs::iterator attr = delattr.begin();
 
-  if (delAttrReturn < 0) {
-    ldpp_dout(dpp, 20) << "D4N Filter: Cache delete object attribute operation failed." << dendl;
-  } else {
-    ldpp_dout(dpp, 20) << "D4N Filter: Cache delete object attribute operation succeeded." << dendl;
-  }
-  
-  return next->delete_obj_attrs(dpp, attr_name, y);  
+  /* Ensure delAttr exists */
+  if (std::find_if(currentattrs.begin(), currentattrs.end(),
+        [&](const auto& pair) { return pair.first == attr->first; }) != currentattrs.end()) {
+    int delAttrReturn = driver->get_policy_driver()->cacheDriver->delete_attrs(dpp, this->get_key().get_oid(), delattr);
+
+    if (delAttrReturn < 0) {
+      ldpp_dout(dpp, 20) << "D4N Filter: Cache delete object attribute operation failed." << dendl;
+    } else {
+      ldpp_dout(dpp, 20) << "D4N Filter: Cache delete object attribute operation succeeded." << dendl;
+    }
+  } else 
+    return next->delete_obj_attrs(dpp, attr_name, y);  
+
+  return 0;
 }
 
 std::unique_ptr<Object> D4NFilterDriver::get_object(const rgw_obj_key& k)
@@ -306,19 +327,10 @@ std::unique_ptr<Object::DeleteOp> D4NFilterObject::get_delete_op()
 
 int D4NFilterObject::D4NFilterReadOp::prepare(optional_yield y, const DoutPrefixProvider* dpp)
 {
-  int getDirReturn = source->filter->get_block_dir()->getValue(source->filter->get_cache_block());
-
-  if (getDirReturn < 0) {
-    ldpp_dout(dpp, 20) << "D4N Filter: Directory get operation failed." << dendl;
-  } else {
-    ldpp_dout(dpp, 20) << "D4N Filter: Directory get operation succeeded." << dendl;
-  }
-
-  rgw::sal::Attrs newAttrs;
-  std::vector< std::pair<std::string, std::string> > newMetadata;
-  int getObjReturn = source->filter->get_d4n_cache()->getObject(source->get_key().get_oid(), 
-                                                       &newAttrs, 
-                                                       &newMetadata);
+  rgw::sal::Attrs attrs;
+  int getObjReturn = source->driver->get_policy_driver()->cacheDriver->get_attrs(dpp, 
+                                                                        source->get_key().get_oid(), 
+                                                                        attrs);
 
   int ret = next->prepare(y, dpp);
   
@@ -327,55 +339,197 @@ int D4NFilterObject::D4NFilterReadOp::prepare(optional_yield y, const DoutPrefix
   } else {
     /* Set metadata locally */
     RGWObjState* astate;
+    RGWQuotaInfo quota_info;
+    std::unique_ptr<rgw::sal::User> user = source->driver->get_user(source->get_bucket()->get_owner());
     source->get_obj_state(dpp, &astate, y);
 
-    for (auto it = newMetadata.begin(); it != newMetadata.end(); ++it) {
-      if (!std::strcmp(it->first.data(), "mtime")) {
-        parse_time(it->second.data(), &astate->mtime); 
-      } else if (!std::strcmp(it->first.data(), "object_size")) {
-       source->set_obj_size(std::stoull(it->second));
-      } else if (!std::strcmp(it->first.data(), "accounted_size")) {
-       astate->accounted_size = std::stoull(it->second);
-      } else if (!std::strcmp(it->first.data(), "epoch")) {
-       astate->epoch = std::stoull(it->second);
-      } else if (!std::strcmp(it->first.data(), "version_id")) {
-       source->set_instance(it->second);
-      } else if (!std::strcmp(it->first.data(), "source_zone_short_id")) {
-       astate->zone_short_id = static_cast<uint32_t>(std::stoul(it->second));
+    for (auto it = attrs.begin(); it != attrs.end(); ++it) {
+      if (it->second.length() > 0) { // or return? -Sam
+       if (it->first == "mtime") {
+         parse_time(it->second.c_str(), &astate->mtime);
+         attrs.erase(it->first);
+       } else if (it->first == "object_size") {
+         source->set_obj_size(std::stoull(it->second.c_str()));
+         attrs.erase(it->first);
+       } else if (it->first == "accounted_size") {
+         astate->accounted_size = std::stoull(it->second.c_str());
+         attrs.erase(it->first);
+       } else if (it->first == "epoch") {
+         astate->epoch = std::stoull(it->second.c_str());
+         attrs.erase(it->first);
+       } else if (it->first == "version_id") {
+         source->set_instance(it->second.c_str());
+       attrs.erase(it->first);
+      } else if (it->first == "source_zone_short_id") {
+       astate->zone_short_id = static_cast<uint32_t>(std::stoul(it->second.c_str()));
+       attrs.erase(it->first);
+      } else if (it->first == "user_quota.max_size") {
+        quota_info.max_size = std::stoull(it->second.c_str());
+       attrs.erase(it->first);
+      } else if (it->first == "user_quota.max_objects") {
+        quota_info.max_objects = std::stoull(it->second.c_str());
+       attrs.erase(it->first);
+      } else if (it->first == "max_buckets") {
+        user->set_max_buckets(std::stoull(it->second.c_str()));
+       attrs.erase(it->first);
       }
     }
-
+    user->set_info(quota_info);
     source->set_obj_state(*astate);
    
     /* Set attributes locally */
-    int setAttrsReturn = source->set_attrs(newAttrs);
+    int set_attrsReturn = source->set_attrs(attrs);
 
-    if (setAttrsReturn < 0) {
+    if (set_attrsReturn < 0) {
       ldpp_dout(dpp, 20) << "D4N Filter: Cache get object operation failed." << dendl;
     } else {
       ldpp_dout(dpp, 20) << "D4N Filter: Cache get object operation succeeded." << dendl;
     }   
   }
+  }
 
   return ret;
 }
 
+int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int64_t ofs, int64_t end,
+                        RGWGetDataCB* cb, optional_yield y) 
+{
+  /* Execute cache replacement policy */
+  int policyRet = source->driver->get_policy_driver()->cachePolicy->get_block(dpp, source->driver->get_cache_block(), 
+                   source->driver->get_policy_driver()->cacheDriver);
+  
+  if (policyRet < 0) {
+    ldpp_dout(dpp, 20) << "D4N Filter: Cache replacement operation failed." << dendl;
+  } else {
+    ldpp_dout(dpp, 20) << "D4N Filter: Cache replacement operation succeeded." << dendl;
+  }
+
+  int ret = -1;
+  bufferlist bl;
+  uint64_t len = end - ofs + 1;
+  std::string oid(source->get_name());
+  
+  /* Local cache check */
+  if (source->driver->get_policy_driver()->cacheDriver->key_exists(dpp, oid)) { // Entire object for now -Sam
+    ret = source->driver->get_policy_driver()->cacheDriver->get(dpp, source->get_key().get_oid(), ofs, len, bl, source->get_attrs());
+    cb->handle_data(bl, ofs, len);
+  } else {
+    /* Block directory check */
+    int getDirReturn = source->driver->get_block_dir()->get_value(source->driver->get_cache_block()); 
+
+    if (getDirReturn >= -1) {
+      if (getDirReturn == -1) {
+        ldpp_dout(dpp, 20) << "D4N Filter: Block directory get operation failed." << dendl;
+      } else {
+        ldpp_dout(dpp, 20) << "D4N Filter: Block directory get operation succeeded." << dendl;
+      }
+
+      // remote cache get
+
+      /* Cache block locally */
+      ret = source->driver->get_policy_driver()->cacheDriver->put(dpp, source->get_key().get_oid(), bl, len, source->get_attrs()); // May be put_async -Sam
+
+      if (!ret) {
+       int updateValueReturn = source->driver->get_block_dir()->update_field(source->driver->get_cache_block(), "hostsList", ""/*local cache ip from config*/);
+
+       if (updateValueReturn < 0) {
+         ldpp_dout(dpp, 20) << "D4N Filter: Block directory update value operation failed." << dendl;
+       } else {
+         ldpp_dout(dpp, 20) << "D4N Filter: Block directory update value operation succeeded." << dendl;
+       }
+       
+       cb->handle_data(bl, ofs, len);
+      }
+    } else {
+      /* Write tier retrieval */
+      ldpp_dout(dpp, 20) << "D4N Filter: Block directory get operation failed." << dendl;
+      getDirReturn = source->driver->get_obj_dir()->get_value(&(source->driver->get_cache_block()->cacheObj));
+
+      if (getDirReturn >= -1) {
+       if (getDirReturn == -1) {
+         ldpp_dout(dpp, 20) << "D4N Filter: Object directory get operation failed." << dendl;
+       } else {
+         ldpp_dout(dpp, 20) << "D4N Filter: Object directory get operation succeeded." << dendl;
+       }
+       
+       // retrieve from write back cache, which will be stored as a cache driver instance in the filter
+
+       /* Cache block locally */
+       ret = source->driver->get_policy_driver()->cacheDriver->put(dpp, source->get_key().get_oid(), bl, len, source->get_attrs()); // May be put_async -Sam
+
+       if (!ret) {
+         int updateValueReturn = source->driver->get_block_dir()->update_field(source->driver->get_cache_block(), "hostsList", ""/*local cache ip from config*/);
+
+         if (updateValueReturn < 0) {
+           ldpp_dout(dpp, 20) << "D4N Filter: Block directory update value operation failed." << dendl;
+         } else {
+           ldpp_dout(dpp, 20) << "D4N Filter: Block directory update value operation succeeded." << dendl;
+         }
+         
+         cb->handle_data(bl, ofs, len);
+       }
+      } else {
+       /* Backend store retrieval */
+       ldpp_dout(dpp, 20) << "D4N Filter: Object directory get operation failed." << dendl;
+       ret = next->iterate(dpp, ofs, end, cb, y);
+
+       if (!ret) {
+         /* Cache block locally */
+         ret = source->driver->get_policy_driver()->cacheDriver->put(dpp, source->get_key().get_oid(), bl, len, source->get_attrs()); // May be put_async -Sam
+
+         /* Store block in directory */
+         rgw::d4n::BlockDirectory* tempBlockDir = source->driver->get_block_dir(); // remove later -Sam
+
+         source->driver->get_cache_block()->hostsList.push_back(tempBlockDir->get_addr().host + ":" + std::to_string(tempBlockDir->get_addr().port)); // local cache address -Sam 
+         source->driver->get_cache_block()->size = source->get_obj_size();
+         source->driver->get_cache_block()->cacheObj.bucketName = source->get_bucket()->get_name();
+         source->driver->get_cache_block()->cacheObj.objName = source->get_key().get_oid();
+
+         int setDirReturn = tempBlockDir->set_value(source->driver->get_cache_block());
+
+         if (setDirReturn < 0) {
+           ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation failed." << dendl;
+         } else {
+           ldpp_dout(dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl;
+         }
+       }
+      }
+    }
+  }
+
+  if (ret < 0) 
+    ldpp_dout(dpp, 20) << "D4N Filter: Cache iterate operation failed." << dendl;
+
+  return next->iterate(dpp, ofs, end, cb, y); 
+}
+
 int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp,
                                                    optional_yield y, uint32_t flags)
 {
-  int delDirReturn = source->filter->get_block_dir()->delValue(source->filter->get_cache_block());
+  int delDirReturn = source->driver->get_block_dir()->del_value(source->driver->get_cache_block());
 
   if (delDirReturn < 0) {
-    ldpp_dout(dpp, 20) << "D4N Filter: Directory delete operation failed." << dendl;
+    ldpp_dout(dpp, 20) << "D4N Filter: Block directory delete operation failed." << dendl;
   } else {
-    ldpp_dout(dpp, 20) << "D4N Filter: Directory delete operation succeeded." << dendl;
+    ldpp_dout(dpp, 20) << "D4N Filter: Block directory delete operation succeeded." << dendl;
   }
 
-  int delObjReturn = source->filter->get_d4n_cache()->delObject(source->get_key().get_oid());
+  Attrs::iterator attrs;
+  Attrs currentattrs = source->get_attrs();
+  std::vector<std::string> currentFields;
+  
+  /* Extract fields from current attrs */
+  for (attrs = currentattrs.begin(); attrs != currentattrs.end(); ++attrs) {
+    currentFields.push_back(attrs->first);
+  }
+
+  int delObjReturn = source->driver->get_policy_driver()->cacheDriver->delete_data(dpp, source->get_key().get_oid());
 
   if (delObjReturn < 0) {
-    ldpp_dout(dpp, 20) << "D4N Filter: Cache delete operation failed." << dendl;
+    ldpp_dout(dpp, 20) << "D4N Filter: Cache delete object operation failed." << dendl;
   } else {
+    Attrs delattrs = source->get_attrs();
+    delObjReturn = source->driver->get_policy_driver()->cacheDriver->delete_attrs(dpp, source->get_key().get_oid(), delattrs);
     ldpp_dout(dpp, 20) << "D4N Filter: Cache delete operation succeeded." << dendl;
   }
 
@@ -384,9 +538,9 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp
 
 int D4NFilterWriter::prepare(optional_yield y) 
 {
-  int delDataReturn = filter->get_d4n_cache()->deleteData(obj->get_key().get_oid()); 
+  int del_dataReturn = driver->get_policy_driver()->cacheDriver->delete_data(save_dpp, obj->get_key().get_oid());
 
-  if (delDataReturn < 0) {
+  if (del_dataReturn < 0) {
     ldpp_dout(save_dpp, 20) << "D4N Filter: Cache delete data operation failed." << dendl;
   } else {
     ldpp_dout(save_dpp, 20) << "D4N Filter: Cache delete data operation succeeded." << dendl;
@@ -397,9 +551,9 @@ int D4NFilterWriter::prepare(optional_yield y)
 
 int D4NFilterWriter::process(bufferlist&& data, uint64_t offset)
 {
-  int appendDataReturn = filter->get_d4n_cache()->appendData(obj->get_key().get_oid(), data);
+  int append_dataReturn = driver->get_policy_driver()->cacheDriver->append_data(save_dpp, obj->get_key().get_oid(), data);
 
-  if (appendDataReturn < 0) {
+  if (append_dataReturn < 0) {
     ldpp_dout(save_dpp, 20) << "D4N Filter: Cache append data operation failed." << dendl;
   } else {
     ldpp_dout(save_dpp, 20) << "D4N Filter: Cache append data operation succeeded." << dendl;
@@ -418,34 +572,33 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag,
                        const req_context& rctx,
                        uint32_t flags)
 {
-  cache_block* temp_cache_block = filter->get_cache_block();
-  RGWBlockDirectory* temp_block_dir = filter->get_block_dir();
+  rgw::d4n::BlockDirectory* tempBlockDir = driver->get_block_dir();
 
-  temp_cache_block->hosts_list.push_back(temp_block_dir->get_host() + ":" + std::to_string(temp_block_dir->get_port())); 
-  temp_cache_block->size_in_bytes = accounted_size;
-  temp_cache_block->c_obj.bucket_name = obj->get_bucket()->get_name();
-  temp_cache_block->c_obj.obj_name = obj->get_key().get_oid();
+  driver->get_cache_block()->hostsList.push_back(tempBlockDir->get_addr().host + ":" + std::to_string(tempBlockDir->get_addr().port)); 
+  driver->get_cache_block()->size = accounted_size;
+  driver->get_cache_block()->cacheObj.bucketName = obj->get_bucket()->get_name();
+  driver->get_cache_block()->cacheObj.objName = obj->get_key().get_oid();
 
-  int setDirReturn = temp_block_dir->setValue(temp_cache_block);
+  int setDirReturn = tempBlockDir->set_value(driver->get_cache_block());
 
   if (setDirReturn < 0) {
-    ldpp_dout(save_dpp, 20) << "D4N Filter: Directory set operation failed." << dendl;
+    ldpp_dout(save_dpp, 20) << "D4N Filter: Block directory set operation failed." << dendl;
   } else {
-    ldpp_dout(save_dpp, 20) << "D4N Filter: Directory set operation succeeded." << dendl;
+    ldpp_dout(save_dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl;
   }
    
   /* Retrieve complete set of attrs */
-  RGWObjState* astate;
   int ret = next->complete(accounted_size, etag, mtime, set_mtime, attrs,
                        delete_at, if_match, if_nomatch, user_data, zones_trace,
                        canceled, rctx, flags);
   obj->get_obj_attrs(rctx.y, save_dpp, NULL);
-  obj->get_obj_state(save_dpp, &astate, rctx.y);
 
   /* Append additional metadata to attributes */ 
   rgw::sal::Attrs baseAttrs = obj->get_attrs();
   rgw::sal::Attrs attrs_temp = baseAttrs;
   buffer::list bl;
+  RGWObjState* astate;
+  obj->get_obj_state(save_dpp, &astate, rctx.y);
 
   bl.append(to_iso_8601(obj->get_mtime()));
   baseAttrs.insert({"mtime", bl});
@@ -486,12 +639,12 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag,
 
   baseAttrs.insert(attrs.begin(), attrs.end());
 
-  int setObjReturn = filter->get_d4n_cache()->setObject(obj->get_key().get_oid(), &baseAttrs);
+  int set_attrsReturn = driver->get_policy_driver()->cacheDriver->set_attrs(save_dpp, obj->get_key().get_oid(), baseAttrs);
 
-  if (setObjReturn < 0) {
-    ldpp_dout(save_dpp, 20) << "D4N Filter: Cache set operation failed." << dendl;
+  if (set_attrsReturn < 0) {
+    ldpp_dout(save_dpp, 20) << "D4N Filter: Cache set attributes operation failed." << dendl;
   } else {
-    ldpp_dout(save_dpp, 20) << "D4N Filter: Cache set operation succeeded." << dendl;
+    ldpp_dout(save_dpp, 20) << "D4N Filter: Cache set attributes operation succeeded." << dendl;
   }
   
   return ret;
@@ -509,4 +662,3 @@ rgw::sal::Driver* newD4NFilter(rgw::sal::Driver* next)
 }
 
 }
-
index 840eb99f6048f3dee56cdbd016e6fbab941d53b4..7a978fbec24c01d3638aaee412b4f963fb1e12e3 100644 (file)
 #include "rgw_role.h"
 #include "common/dout.h" 
 
+#include "rgw_redis_driver.h"
 #include "driver/d4n/d4n_directory.h"
-#include "driver/d4n/d4n_datacache.h"
+#include "driver/d4n/d4n_policy.h"
 
 namespace rgw { namespace sal {
 
 class D4NFilterDriver : public FilterDriver {
   private:
-    RGWBlockDirectory* blk_dir;
-    cache_block* c_blk;
-    RGWD4NCache* d4n_cache;
+    rgw::d4n::ObjectDirectory* objDir;
+    rgw::d4n::BlockDirectory* blockDir;
+    rgw::d4n::CacheBlock* cacheBlock;
+    rgw::d4n::PolicyDriver* policyDriver;
 
   public:
     D4NFilterDriver(Driver* _next) : FilterDriver(_next) 
     {
-      blk_dir = new RGWBlockDirectory(); /* Initialize directory address with cct */
-      c_blk = new cache_block();
-      d4n_cache = new RGWD4NCache();
+      objDir = new rgw::d4n::ObjectDirectory();
+      blockDir = new rgw::d4n::BlockDirectory();
+      cacheBlock = new rgw::d4n::CacheBlock();
+      policyDriver = new rgw::d4n::PolicyDriver("lfuda");
     }
     virtual ~D4NFilterDriver() {
-      delete blk_dir; 
-      delete c_blk;
-      delete d4n_cache;
+      delete objDir; 
+      delete blockDir; 
+      delete cacheBlock;
+      delete policyDriver;
     }
 
     virtual int initialize(CephContext *cct, const DoutPrefixProvider *dpp) override;
@@ -57,19 +61,20 @@ class D4NFilterDriver : public FilterDriver {
                                  const rgw_placement_rule *ptail_placement_rule,
                                  uint64_t olh_epoch,
                                  const std::string& unique_tag) override;
-    RGWBlockDirectory* get_block_dir() { return blk_dir; }
-    cache_block* get_cache_block() { return c_blk; }
-    RGWD4NCache* get_d4n_cache() { return d4n_cache; }
+    rgw::d4n::ObjectDirectory* get_obj_dir() { return objDir; }
+    rgw::d4n::BlockDirectory* get_block_dir() { return blockDir; }
+    rgw::d4n::CacheBlock* get_cache_block() { return cacheBlock; }
+    rgw::d4n::PolicyDriver* get_policy_driver() { return policyDriver; }
 };
 
 class D4NFilterUser : public FilterUser {
   private:
-    D4NFilterDriver* filter;
+    D4NFilterDriver* driver;
 
   public:
-    D4NFilterUser(std::unique_ptr<User> _next, D4NFilterDriver* _filter) : 
+    D4NFilterUser(std::unique_ptr<User> _next, D4NFilterDriver* _driver) : 
       FilterUser(std::move(_next)),
-      filter(_filter) {}
+      driver(_driver) {}
     virtual ~D4NFilterUser() = default;
 };
 
@@ -91,7 +96,7 @@ class D4NFilterBucket : public FilterBucket {
 
 class D4NFilterObject : public FilterObject {
   private:
-    D4NFilterDriver* filter;
+    D4NFilterDriver* driver;
 
   public:
     struct D4NFilterReadOp : FilterReadOp {
@@ -102,6 +107,8 @@ class D4NFilterObject : public FilterObject {
       virtual ~D4NFilterReadOp() = default;
 
       virtual int prepare(optional_yield y, const DoutPrefixProvider* dpp) override;
+      virtual int iterate(const DoutPrefixProvider* dpp, int64_t ofs, int64_t end,
+        RGWGetDataCB* cb, optional_yield y) override;
     };
 
     struct D4NFilterDeleteOp : FilterDeleteOp {
@@ -114,12 +121,12 @@ class D4NFilterObject : public FilterObject {
       virtual int delete_obj(const DoutPrefixProvider* dpp, optional_yield y, uint32_t flags) override;
     };
 
-    D4NFilterObject(std::unique_ptr<Object> _next, D4NFilterDriver* _filter) : FilterObject(std::move(_next)),
-                                                                             filter(_filter) {}
-    D4NFilterObject(std::unique_ptr<Object> _next, Bucket* _bucket, D4NFilterDriver* _filter) : FilterObject(std::move(_next), _bucket),
-                                                                                              filter(_filter) {}
-    D4NFilterObject(D4NFilterObject& _o, D4NFilterDriver* _filter) : FilterObject(_o),
-                                                                   filter(_filter) {}
+    D4NFilterObject(std::unique_ptr<Object> _next, D4NFilterDriver* _driver) : FilterObject(std::move(_next)),
+                                                                             driver(_driver) {}
+    D4NFilterObject(std::unique_ptr<Object> _next, Bucket* _bucket, D4NFilterDriver* _driver) : FilterObject(std::move(_next), _bucket),
+                                                                                              driver(_driver) {}
+    D4NFilterObject(D4NFilterObject& _o, D4NFilterDriver* _driver) : FilterObject(_o),
+                                                                   driver(_driver) {}
     virtual ~D4NFilterObject() = default;
 
     virtual int copy_object(User* user,
@@ -153,18 +160,18 @@ class D4NFilterObject : public FilterObject {
 
 class D4NFilterWriter : public FilterWriter {
   private:
-    D4NFilterDriver* filter; 
+    D4NFilterDriver* driver; 
     const DoutPrefixProvider* save_dpp;
     bool atomic;
 
   public:
-    D4NFilterWriter(std::unique_ptr<Writer> _next, D4NFilterDriver* _filter, Object* _obj, 
+    D4NFilterWriter(std::unique_ptr<Writer> _next, D4NFilterDriver* _driver, Object* _obj, 
        const DoutPrefixProvider* _dpp) : FilterWriter(std::move(_next), _obj),
-                                         filter(_filter),
+                                         driver(_driver),
                                          save_dpp(_dpp), atomic(false) {}
-    D4NFilterWriter(std::unique_ptr<Writer> _next, D4NFilterDriver* _filter, Object* _obj, 
+    D4NFilterWriter(std::unique_ptr<Writer> _next, D4NFilterDriver* _driver, Object* _obj, 
        const DoutPrefixProvider* _dpp, bool _atomic) : FilterWriter(std::move(_next), _obj),
-                                                       filter(_filter),
+                                                       driver(_driver),
                                                        save_dpp(_dpp), atomic(_atomic) {}
     virtual ~D4NFilterWriter() = default;
 
index 710340f0a259b6ca3558c8de0b86960d5acffcde..5ad87d23e8f29edf644f33d18cc0083e88a2e6b1 100644 (file)
@@ -4,6 +4,8 @@
 #pragma once
 
 #include <memory>
+#include "rgw_sal.h"
+#include "rgw_auth.h"
 
 class ActiveRateLimiter;
 class OpsLogSink;
index a5a0f4c66099750bed4f14e8a9d2d9589cc68863..e76a354e0c571722955008b96ea49764c7f8197a 100644 (file)
@@ -58,11 +58,10 @@ int RedisDriver::initialize(CephContext* cct, const DoutPrefixProvider* dpp) {
   if (client.is_connected())
     return 0;
 
-  /*
   if (addr.host == "" || addr.port == 0) {
     dout(10) << "RGW Redis Cache: Redis cache endpoint was not configured correctly" << dendl;
     return EDESTADDRREQ;
-  }*/
+  }
 
   client.connect("127.0.0.1", 6379, nullptr);
 
@@ -683,4 +682,19 @@ int RedisDriver::set_attr(const DoutPrefixProvider* dpp, const std::string& key,
   return result;
 }
 
+std::unique_ptr<CacheAioRequest> RedisDriver::get_cache_aio_request_ptr(const DoutPrefixProvider* dpp)
+{
+    return std::make_unique<RedisCacheAioRequest>(this);
+}
+
+rgw::AioResultList RedisDriver::get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id)
+{
+    rgw_raw_obj r_obj;
+    r_obj.oid = key;
+    return aio->get(r_obj, rgw::Aio::cache_read_op(dpp, y, this, ofs, len, key), cost, id);
+}
+
+void RedisCacheAioRequest::cache_aio_read(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, off_t ofs, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) {}
+void RedisCacheAioRequest::cache_aio_write(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, bufferlist& bl, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) {}
+
 } } // namespace rgw::cal
index 9f1bffe213aee4f771009b3913ddabf0f9a07a63..b92c191e27b5a2cea486848682b88d217b561272 100644 (file)
 
 namespace rgw { namespace cache {
 
+class RedisDriver;
+
+class RedisCacheAioRequest: public CacheAioRequest {
+public:
+  RedisCacheAioRequest(RedisDriver* cache_driver) : cache_driver(cache_driver) {}
+  virtual ~RedisCacheAioRequest() = default;
+  virtual void cache_aio_read(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, off_t ofs, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) override;
+  virtual void cache_aio_write(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, bufferlist& bl, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) override;
+private:
+  RedisDriver* cache_driver;
+};
+
 class RedisDriver : public CacheDriver {
   private:
     cpp_redis::client client;
+    rgw::d4n::Address addr;
     std::unordered_map<std::string, Entry> entries;
 
+    int find_client(const DoutPrefixProvider* dpp);
     int insert_entry(const DoutPrefixProvider* dpp, std::string key, off_t offset, uint64_t len);
     int remove_entry(const DoutPrefixProvider* dpp, std::string key);
     std::optional<Entry> get_entry(const DoutPrefixProvider* dpp, std::string key);
 
   public:
-    RedisDriver(Partition& _partition_info, std::string host, int port) : CacheDriver() {}
+    RedisDriver(Partition& _partition_info, std::string host, int port) : CacheDriver() {
+      addr.host = host;
+      addr.port = port;
+    }
 
     virtual int initialize(CephContext* cct, const DoutPrefixProvider* dpp) override;
     virtual int put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) override;
     virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs) override;
+    virtual rgw::AioResultList get_async (const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override;
     virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data) override;
     virtual int delete_data(const DoutPrefixProvider* dpp, const::std::string& key) override;
     virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) override;
@@ -38,10 +56,13 @@ class RedisDriver : public CacheDriver {
     virtual bool key_exists(const DoutPrefixProvider* dpp, const std::string& key) override;
     virtual std::vector<Entry> list_entries(const DoutPrefixProvider* dpp) override;
     virtual size_t get_num_entries(const DoutPrefixProvider* dpp) override;
+    int update_local_weight(const DoutPrefixProvider* dpp, std::string key, int localWeight); // may need to exist for base class -Sam
 
     /* Partition */
     virtual Partition get_current_partition_info(const DoutPrefixProvider* dpp) override;
     virtual uint64_t get_free_space(const DoutPrefixProvider* dpp) override;
+
+    virtual std::unique_ptr<CacheAioRequest> get_cache_aio_request_ptr(const DoutPrefixProvider* dpp) override;
 };
 
 } } // namespace rgw::cal
index d9dd1bde60339c921c7365dbd5c2dd37dfba9b17..0f6e1745d713ca4ba6148315cac7cf2727ec2dac 100644 (file)
@@ -20,6 +20,7 @@
 #include <sstream>
 
 #include "common/errno.h"
+//#include "common/dout.h"
 
 #include "rgw_sal.h"
 #include "rgw_sal_rados.h"
@@ -44,6 +45,7 @@
 #endif
 
 #define dout_subsys ceph_subsys_rgw
+//#define dout_context g_ceph_context
 
 extern "C" {
 extern rgw::sal::Driver* newRadosStore(boost::asio::io_context* io_context);
index 8aaf2acb0a6cb1e1a0df5f533aacd3b78018df08..c69d2d4ecb1a1bb9c8c1cbb0bc88f661af2ce769 100644 (file)
@@ -1,4 +1,4 @@
-#include "d4n_directory.h"
+#include "../rgw/driver/d4n/d4n_directory.h" // Fix -Sam
 #include "rgw_process_env.h"
 #include <cpp_redis/cpp_redis>
 #include <iostream>
@@ -10,65 +10,65 @@ using namespace std;
 string portStr;
 string hostStr;
 string redisHost = "";
-string oid = "samoid";
+string oid = "testName";
 string bucketName = "testBucket";
-int blkSize = 123;
+int blockSize = 123;
 
 class DirectoryFixture: public ::testing::Test {
   protected:
     virtual void SetUp() {
-      blk_dir = new RGWBlockDirectory(hostStr, stoi(portStr));
-      c_blk = new cache_block();
+      blockDir = new rgw::d4n::BlockDirectory(hostStr, stoi(portStr));
+      cacheBlock = new rgw::d4n::CacheBlock();
 
-      c_blk->hosts_list.push_back(redisHost);
-      c_blk->size_in_bytes = blkSize; 
-      c_blk->c_obj.bucket_name = bucketName;
-      c_blk->c_obj.obj_name = oid;
+      cacheBlock->hostsList.push_back(redisHost);
+      cacheBlock->size = blockSize; 
+      cacheBlock->cacheObj.bucketName = bucketName;
+      cacheBlock->cacheObj.objName = oid;
     } 
 
     virtual void TearDown() {
-      delete blk_dir;
-      blk_dir = nullptr;
+      delete blockDir;
+      blockDir = nullptr;
 
-      delete c_blk;
-      c_blk = nullptr;
+      delete cacheBlock;
+      cacheBlock = nullptr;
     }
 
-    RGWBlockDirectory* blk_dir;
-    cache_block* c_blk;
+    rgw::d4n::BlockDirectory* blockDir;
+    rgw::d4n::CacheBlock* cacheBlock;
 };
 
 /* Successful initialization */
 TEST_F(DirectoryFixture, DirectoryInit) {
-  ASSERT_NE(blk_dir, nullptr);
-  ASSERT_NE(c_blk, nullptr);
+  ASSERT_NE(blockDir, nullptr);
+  ASSERT_NE(cacheBlock, nullptr);
   ASSERT_NE(redisHost.length(), (long unsigned int)0);
 }
 
-/* Successful setValue Call and Redis Check */
+/* Successful set_value Call and Redis Check */
 TEST_F(DirectoryFixture, SetValueTest) {
   cpp_redis::client client;
   int key_exist = -1;
   string key;
   string hosts;
   string size;
-  string bucket_name;
-  string obj_name;
+  string bucketName;
+  string objName;
   std::vector<std::string> fields;
-  int setReturn = blk_dir->setValue(c_blk);
+  int setReturn = blockDir->set_value(cacheBlock);
 
   ASSERT_EQ(setReturn, 0);
 
   fields.push_back("key");
   fields.push_back("hosts");
   fields.push_back("size");
-  fields.push_back("bucket_name");
-  fields.push_back("obj_name");
+  fields.push_back("bucketName");
+  fields.push_back("objName");
 
   client.connect(hostStr, stoi(portStr), nullptr, 0, 5, 1000);
   ASSERT_EQ((bool)client.is_connected(), (bool)1);
 
-  client.hmget("rgw-object:" + oid + ":directory", fields, [&key, &hosts, &size, &bucket_name, &obj_name, &key_exist](cpp_redis::reply& reply) {
+  client.hmget("rgw-object:" + oid + ":block-directory", fields, [&key, &hosts, &size, &bucketName, &objName, &key_exist](cpp_redis::reply& reply) {
     auto arr = reply.as_array();
 
     if (!arr[0].is_null()) {
@@ -76,47 +76,47 @@ TEST_F(DirectoryFixture, SetValueTest) {
       key = arr[0].as_string();
       hosts = arr[1].as_string();
       size = arr[2].as_string();
-      bucket_name = arr[3].as_string();
-      obj_name = arr[4].as_string();
+      bucketName = arr[3].as_string();
+      objName = arr[4].as_string();
     }
   });
 
   client.sync_commit();
 
   EXPECT_EQ(key_exist, 0);
-  EXPECT_EQ(key, "rgw-object:" + oid + ":directory");
+  EXPECT_EQ(key, "rgw-object:" + oid + ":block-directory");
   EXPECT_EQ(hosts, redisHost);
-  EXPECT_EQ(size, to_string(blkSize));
-  EXPECT_EQ(bucket_name, bucketName);
-  EXPECT_EQ(obj_name, oid);
+  EXPECT_EQ(size, to_string(blockSize));
+  EXPECT_EQ(bucketName, bucketName);
+  EXPECT_EQ(objName, oid);
 
   client.flushall();
 }
 
-/* Successful getValue Calls and Redis Check */
+/* Successful get_value Calls and Redis Check */
 TEST_F(DirectoryFixture, GetValueTest) {
   cpp_redis::client client;
   int key_exist = -1;
   string key;
   string hosts;
   string size;
-  string bucket_name;
-  string obj_name;
+  string bucketName;
+  string objName;
   std::vector<std::string> fields;
-  int setReturn = blk_dir->setValue(c_blk);
+  int setReturn = blockDir->set_value(cacheBlock);
 
   ASSERT_EQ(setReturn, 0);
 
   fields.push_back("key");
   fields.push_back("hosts");
   fields.push_back("size");
-  fields.push_back("bucket_name");
-  fields.push_back("obj_name");
+  fields.push_back("bucketName");
+  fields.push_back("objName");
 
   client.connect(hostStr, stoi(portStr), nullptr, 0, 5, 1000);
   ASSERT_EQ((bool)client.is_connected(), (bool)1);
 
-  client.hmget("rgw-object:" + oid + ":directory", fields, [&key, &hosts, &size, &bucket_name, &obj_name, &key_exist](cpp_redis::reply& reply) {
+  client.hmget("rgw-object:" + oid + ":block-directory", fields, [&key, &hosts, &size, &bucketName, &objName, &key_exist](cpp_redis::reply& reply) {
     auto arr = reply.as_array();
 
     if (!arr[0].is_null()) {
@@ -124,47 +124,47 @@ TEST_F(DirectoryFixture, GetValueTest) {
       key = arr[0].as_string();
       hosts = arr[1].as_string();
       size = arr[2].as_string();
-      bucket_name = arr[3].as_string();
-      obj_name = arr[4].as_string();
+      bucketName = arr[3].as_string();
+      objName = arr[4].as_string();
     }
   });
 
   client.sync_commit();
 
   EXPECT_EQ(key_exist, 0);
-  EXPECT_EQ(key, "rgw-object:" + oid + ":directory");
+  EXPECT_EQ(key, "rgw-object:" + oid + ":block-directory");
   EXPECT_EQ(hosts, redisHost);
-  EXPECT_EQ(size, to_string(blkSize));
-  EXPECT_EQ(bucket_name, bucketName);
-  EXPECT_EQ(obj_name, oid);
+  EXPECT_EQ(size, to_string(blockSize));
+  EXPECT_EQ(bucketName, bucketName);
+  EXPECT_EQ(objName, oid);
 
   /* Check if object name in directory instance matches redis update */
-  client.hset("rgw-object:" + oid + ":directory", "obj_name", "newoid", [](cpp_redis::reply& reply) {
-    if (reply.is_integer()) {
-      ASSERT_EQ(reply.as_integer(), 0); /* Zero keys exist */
+  client.hset("rgw-object:" + oid + ":block-directory", "objName", "newoid", [](cpp_redis::reply& reply) {
+    if (!reply.is_null()) {
+      ASSERT_EQ(reply.as_integer(), 0);
     }
   });
 
   client.sync_commit();
 
-  int getReturn = blk_dir->getValue(c_blk);
+  int getReturn = blockDir->get_value(cacheBlock);
 
   ASSERT_EQ(getReturn, 0);
-  EXPECT_EQ(c_blk->c_obj.obj_name, "newoid");
+  EXPECT_EQ(cacheBlock->cacheObj.objName, "newoid");
 
   client.flushall();
 }
 
-/* Successful delValue Call and Redis Check */
+/* Successful del_value Call and Redis Check */
 TEST_F(DirectoryFixture, DelValueTest) {
   cpp_redis::client client;
   vector<string> keys;
-  int setReturn = blk_dir->setValue(c_blk);
+  int setReturn = blockDir->set_value(cacheBlock);
 
   ASSERT_EQ(setReturn, 0);
 
-  /* Ensure cache entry exists in cache before deletion */
-  keys.push_back("rgw-object:" + oid + ":directory");
+  /* Ensure entry exists in directory before deletion */
+  keys.push_back("rgw-object:" + oid + ":block-directory");
 
   client.exists(keys, [](cpp_redis::reply& reply) {
     if (reply.is_integer()) {
@@ -172,13 +172,13 @@ TEST_F(DirectoryFixture, DelValueTest) {
     }
   });
 
-  int delReturn = blk_dir->delValue(c_blk);
+  int delReturn = blockDir->del_value(cacheBlock);
 
   ASSERT_EQ(delReturn, 0);
 
   client.exists(keys, [](cpp_redis::reply& reply) {
     if (reply.is_integer()) {
-      ASSERT_EQ(reply.as_integer(), 0); /* Zero keys exist */
+      ASSERT_EQ(reply.as_integer(), 0);  /* Zero keys exist */
     }
   });
 
index 9a8b9950bd227b26b3366dcbfb1a261d9df03438..b5c382c119e6a5cb58aed9d94039e8cb15de8e6b 100644 (file)
@@ -6,7 +6,7 @@
 #include <cpp_redis/cpp_redis>
 #include "driver/dbstore/common/dbstore.h"
 #include "rgw_sal_store.h"
-#include "driver/d4n/rgw_sal_d4n.h"
+#include "../../rgw/driver/d4n/rgw_sal_d4n.h" // fix -Sam
 
 #include "rgw_sal.h"
 #include "rgw_auth.h"
@@ -176,7 +176,10 @@ class D4NFilterFixture : public ::testing::Test {
       rgw_zone_set zones_trace;
       bool canceled;
       
-      int ret = testWriter->complete(accounted_size, etag,
+      int ret = testWriter->prepare(null_yield);
+         
+      if (!ret) {
+       ret = testWriter->complete(accounted_size, etag,
                        &mtime, set_mtime,
                        attrs,
                        delete_at,
@@ -219,7 +222,7 @@ TEST_F(D4NFilterFixture, CreateBucket) {
 TEST_F(D4NFilterFixture, PutObject) {
   cpp_redis::client client;
   vector<string> fields;
-  fields.push_back("test_attrs_key_0");
+  fields.push_back("test_attrs_key_PutObject");
   clientSetUp(&client); 
 
   ASSERT_EQ(createUser(), 0);
@@ -375,7 +378,6 @@ TEST_F(D4NFilterFixture, CopyObjectNone) {
 
 TEST_F(D4NFilterFixture, CopyObjectReplace) {
   cpp_redis::client client;
-  vector<string> fields;
   clientSetUp(&client); 
 
   createUser();
@@ -469,29 +471,52 @@ TEST_F(D4NFilterFixture, CopyObjectReplace) {
 
   client.sync_commit();
 
-  /* Check copy */
-  client.hgetall("rgw-object:test_object_copy:cache", [](cpp_redis::reply& reply) {
+  /* Retrieve original object's redis data for later comparison */
+  std::vector< std::pair<std::string, std::string> > data;
+  
+  client.hgetall("rgw-object:test_object_CopyObjectReplace:cache", [&data](cpp_redis::reply& reply) {
     auto arr = reply.as_array();
 
     if (!arr[0].is_null()) {
-      EXPECT_EQ((int)arr.size(), 4 + METADATA_LENGTH); /* With etag */
+      for (int i = 0; i < (int)arr.size() - 1; i += 2) {
+       data.push_back({arr[i].as_string(), arr[i + 1].as_string()});
+      }
     }
   });
 
   client.sync_commit();
-  
-  fields.push_back("test_attrs_key_CopyObjectReplace");
-  
-  client.hmget("rgw-object:test_object_copy:cache", fields, [](cpp_redis::reply& reply) {
+
+  /* Check copy */
+  client.hgetall("rgw-object:test_object_copy:cache", [&data](cpp_redis::reply& reply) {
+    bool unexpected = false;
     auto arr = reply.as_array();
 
     if (!arr[0].is_null()) {
-      EXPECT_EQ(arr[0].as_string(), "test_attrs_copy_value");
+      EXPECT_EQ((int)arr.size(), 4 + METADATA_LENGTH); /* With etag */
+
+      for (int i = 0; i < (int)arr.size() - 1; i += 2) {
+        auto it = std::find_if(data.begin(), data.end(),
+         [&](const auto& pair) { return pair.first == arr[i].as_string(); });
+
+       if (it != data.end()) {
+         if (arr[i].as_string() == "test_attrs_key_CopyObjectReplace")
+           EXPECT_EQ(arr[i + 1].as_string(), "test_attrs_copy_value");
+         else if (arr[i].as_string() != "mtime") { /* mtime will be different */
+           int index = std::distance(data.begin(), it);
+           EXPECT_EQ(arr[i + 1].as_string(), data[index].second);
+         }
+       } else if (arr[i].as_string() == "etag") {
+          EXPECT_EQ(arr[i + 1].as_string(), "test_etag_copy");
+       } else
+         unexpected = true; /* Unexpected field */
+      }
+      
+      EXPECT_EQ(unexpected, false);
     }
   });
 
   client.sync_commit();
-
+  
   clientReset(&client);
 }
 
@@ -594,26 +619,52 @@ TEST_F(D4NFilterFixture, CopyObjectMerge) {
 
   client.sync_commit();
 
-  /* Check copy */
-  client.hgetall("rgw-object:test_object_copy:cache", [](cpp_redis::reply& reply) {
+  /* Retrieve original object's redis data for later comparison */
+  std::vector< std::pair<std::string, std::string> > data;
+  
+  client.hgetall("rgw-object:test_object_CopyObjectMerge:cache", [&data](cpp_redis::reply& reply) {
     auto arr = reply.as_array();
 
     if (!arr[0].is_null()) {
-      EXPECT_EQ((int)arr.size(), 6 + METADATA_LENGTH); /* With etag */
+      for (int i = 0; i < (int)arr.size() - 1; i += 2) {
+       data.push_back({arr[i].as_string(), arr[i + 1].as_string()});
+      }
     }
   });
 
   client.sync_commit();
-  
-  fields.push_back("test_attrs_key_CopyObjectMerge");
-  fields.push_back("test_attrs_copy_extra_key");
-  
-  client.hmget("rgw-object:test_object_copy:cache", fields, [](cpp_redis::reply& reply) {
+
+  /* Check copy */
+  client.hgetall("rgw-object:test_object_copy:cache", [&data](cpp_redis::reply& reply) {
+    bool unexpected = false;
+    bool merge = false;
     auto arr = reply.as_array();
 
     if (!arr[0].is_null()) {
-      EXPECT_EQ(arr[0].as_string(), "test_attrs_value_CopyObjectMerge");
-      EXPECT_EQ(arr[1].as_string(), "test_attrs_copy_extra_value");
+      EXPECT_EQ((int)arr.size(), 6 + METADATA_LENGTH); /* With etag */
+
+      for (int i = 0; i < (int)arr.size() - 1; i += 2) {
+        auto it = std::find_if(data.begin(), data.end(),
+         [&](const auto& pair) { return pair.first == arr[i].as_string(); });
+
+       if (it != data.end()) {
+         if (arr[i].as_string() == "test_attrs_key_CopyObjectMerge")
+           EXPECT_EQ(arr[i + 1].as_string(), "test_attrs_value_CopyObjectMerge");
+         else if (arr[i].as_string() != "mtime") { /* mtime will be different */
+           int index = std::distance(data.begin(), it);
+           EXPECT_EQ(arr[i + 1].as_string(), data[index].second);
+         }
+       } else if (arr[i].as_string() == "etag") {
+          EXPECT_EQ(arr[i + 1].as_string(), "test_etag_copy");
+       } else if (arr[i].as_string() == "test_attrs_copy_extra_key") {
+         merge = true; 
+          EXPECT_EQ(arr[i + 1].as_string(), "test_attrs_copy_extra_value");
+       } else
+         unexpected = true; /* Unexpected field */
+      }
+      
+      EXPECT_EQ(unexpected, false);
+      EXPECT_EQ(merge, true);
     }
   });
 
@@ -667,6 +718,130 @@ TEST_F(D4NFilterFixture, DelObject) {
   clientReset(&client);
 }
 
+TEST_F(D4NFilterFixture, CachePolicy) {
+  cpp_redis::client client;
+  clientSetUp(&client); 
+
+  createUser();
+  createBucket();
+
+  /* Create multipart object */
+  string object_name = "test_object_CachePolicy";
+  unique_ptr<rgw::sal::Object> obj = testBucket->get_object(rgw_obj_key(object_name));
+  rgw_user owner;
+  rgw_placement_rule ptail_placement_rule;
+  uint64_t olh_epoch = 123;
+  string unique_tag;
+
+  obj->get_obj_attrs(null_yield, dpp);
+
+  testWriter = driver->get_atomic_writer(dpp, 
+             null_yield,
+             obj.get(),
+             owner,
+             &ptail_placement_rule,
+             olh_epoch,
+             unique_tag);
+
+  size_t accounted_size = 15; /* Uploaded as multipart */
+  string etag("test_etag");
+  ceph::real_time mtime; 
+  ceph::real_time set_mtime;
+
+  buffer::list bl;
+  string tmp = "test_attrs_value_CachePolicy";
+  bl.append("test_attrs_value_CachePolicy");
+  map<string, bufferlist> attrs{{"test_attrs_key_CachePolicy", bl}};
+
+  ceph::real_time delete_at;
+  char if_match;
+  char if_nomatch;
+  string user_data;
+  rgw_zone_set zones_trace;
+  bool canceled;
+  
+  ASSERT_EQ(testWriter->complete(accounted_size, etag,
+                  &mtime, set_mtime,
+                  attrs,
+                  delete_at,
+                  &if_match, &if_nomatch,
+                  &user_data,
+                  &zones_trace, &canceled,
+                  null_yield), 0);
+
+
+  unique_ptr<rgw::sal::Object> testObject_CachePolicy = testBucket->get_object(rgw_obj_key("test_object_CachePolicy"));
+
+  ASSERT_NE(testObject_CachePolicy, nullptr);
+
+  /* Copy to new multipart object */
+  unique_ptr<rgw::sal::Writer> testWriterCopy = nullptr;
+  unique_ptr<rgw::sal::Object> obj_copy = testBucket->get_object(rgw_obj_key("test_object_copy"));
+  uint64_t olh_epoch_copy = 123;
+
+  obj_copy->get_obj_attrs(null_yield, dpp);
+
+  testWriterCopy = driver->get_atomic_writer(dpp, 
+             null_yield,
+             obj_copy.get(),
+             owner,
+             &ptail_placement_rule,
+             olh_epoch_copy,
+             unique_tag);
+
+  RGWEnv rgw_env;
+  req_info info(get_pointer(env->cct), &rgw_env);
+  rgw_zone_id source_zone;
+  rgw_placement_rule dest_placement; 
+  ceph::real_time src_mtime;
+  ceph::real_time mod_ptr;
+  ceph::real_time unmod_ptr;
+  rgw::sal::AttrsMod attrs_mod = rgw::sal::ATTRSMOD_REPLACE;
+  RGWObjCategory category = RGWObjCategory::Main;
+  string tag;
+  
+  ASSERT_EQ(testWriterCopy->complete(accounted_size, etag,
+                  &mtime, set_mtime,
+                  attrs,
+                  delete_at,
+                  &if_match, &if_nomatch,
+                  &user_data,
+                  &zones_trace, &canceled,
+                  null_yield), 0);
+
+  unique_ptr<rgw::sal::Object> testObject_copy = testBucket->get_object(rgw_obj_key("test_object_copy"));
+
+  EXPECT_EQ(testObject_CachePolicy->copy_object(testUser.get(),
+                             &info, source_zone, testObject_copy.get(),
+                             testBucket.get(), testBucket.get(),
+                              dest_placement, &src_mtime, &mtime,
+                             &mod_ptr, &unmod_ptr, false,
+                             &if_match, &if_nomatch, attrs_mod,
+                             false, attrs, category, olh_epoch,
+                             delete_at, NULL, &tag, &etag,
+                             NULL, NULL, dpp, null_yield), 0);
+
+  /* Ensure data field doesn't exist for original object */
+  client.hexists("rgw-object:test_object_CachePolicy:cache", "data", [](cpp_redis::reply& reply) {
+    if (reply.is_integer()) {
+      EXPECT_EQ(reply.as_integer(), 0);
+    }
+  });
+
+  client.sync_commit();
+
+  /* Ensure data field doesn't exist for copy */
+  client.hexists("rgw-object:test_object_CachePolicy:cache", "data", [](cpp_redis::reply& reply) {
+    if (reply.is_integer()) {
+      EXPECT_EQ(reply.as_integer(), 0);
+    }
+  });
+
+  client.sync_commit();
+  
+  clientReset(&client);
+}
+
 /* Attribute-related tests */
 TEST_F(D4NFilterFixture, SetObjectAttrs) {
   cpp_redis::client client;