blockDir = new rgw::d4n::BlockDirectory();
cacheBlock = new rgw::d4n::CacheBlock();
policyDriver = new rgw::d4n::PolicyDriver("lfuda");
+ cache = rgw::sal::LRUCache(cacheDriver);
+
}
D4NFilterDriver::~D4NFilterDriver()
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num <<
" read_ofs: " << read_ofs << " part len: " << part_len << dendl;
- if (source->driver->get_cache_driver()->key_exists(dpp, oid_in_cache)) {
+ if (source->driver->get_lru_cache().key_exists(dpp, oid_in_cache)) {
// Read From Cache
auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num <<
" read_ofs: " << read_ofs << " part len: " << part_len << dendl;
- if ((part_len != obj_max_req_size) && source->driver->get_cache_driver()->key_exists(dpp, oid_in_cache)) {
+ if ((part_len != obj_max_req_size) && source->driver->get_lru_cache().key_exists(dpp, oid_in_cache)) {
// Read From Cache
auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);
}
}
+ int ret = 0;
//Accumulating data from backend store into rgw_get_obj_max_req_size sized chunks and then writing to cache
if (write_to_cache) {
const std::lock_guard l(d3n_get_data.d3n_lock);
if (bl.length() > 0 && last_part) { // if bl = bl_rem has data and this is the last part, write it to cache
std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
- filter->get_cache_driver()->put_async(save_dpp, oid, bl, bl.length(), source->get_attrs());
+ ret = filter->get_cache_driver()->put_async(save_dpp, oid, bl, bl.length(), source->get_attrs());
+ if (ret == 0) {
+ filter->get_lru_cache().insert(save_dpp, LRUCache::Entry{oid, ofs, bl.length()}); //TODO - move to flush, where operation actually completes
+ }
} else if (bl.length() == rgw_get_obj_max_req_size && bl_rem.length() == 0) { // if bl is the same size as rgw_get_obj_max_req_size, write it to cache
std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
ofs += bl_len;
- filter->get_cache_driver()->put_async(save_dpp, oid, bl, bl.length(), source->get_attrs());
+ ret = filter->get_cache_driver()->put_async(save_dpp, oid, bl, bl.length(), source->get_attrs());
+ if (ret == 0) {
+ filter->get_lru_cache().insert(save_dpp, LRUCache::Entry{oid, ofs, bl.length()});
+ }
} else { //copy data from incoming bl to bl_rem till it is rgw_get_obj_max_req_size, and then write it to cache
uint64_t rem_space = rgw_get_obj_max_req_size - bl_rem.length();
uint64_t len_to_copy = rem_space > bl.length() ? bl.length() : rem_space;
std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_rem.length());
ofs += bl_rem.length();
- filter->get_cache_driver()->put_async(save_dpp, oid, bl_rem, bl_rem.length(), source->get_attrs());
+ ret = filter->get_cache_driver()->put_async(save_dpp, oid, bl_rem, bl_rem.length(), source->get_attrs());
+ if (ret == 0) {
+ filter->get_lru_cache().insert(save_dpp, LRUCache::Entry{oid, ofs, bl_rem.length()});
+ }
bl_rem.clear();
bl_rem = std::move(bl);
#include "driver/d4n/d4n_directory.h"
#include "driver/d4n/d4n_policy.h"
+#include <boost/intrusive/list.hpp>
+
namespace rgw { namespace sal {
+// Temporarily added to d4n filter driver - need to figure out the best way to incorporate this as part of Policy Manager
+class LRUCache {
+ public:
+ struct Entry : public boost::intrusive::list_base_hook<> {
+ std::string key;
+ uint64_t offset;
+ uint64_t len;
+ Entry(std::string& key, uint64_t offset, uint64_t len) : key(key), offset(offset), len(len) {}
+ };
+ private:
+ //The disposer object function
+ struct Entry_delete_disposer {
+ void operator()(Entry *e) {
+ delete e;
+ }
+ };
+ typedef boost::intrusive::list<Entry> List;
+ List entries_lru_list;
+ std::unordered_map<std::string, Entry*> entries_map;
+ rgw::cache::CacheDriver* cacheDriver;
+
+ void evict() {
+ auto p = entries_lru_list.front();
+ entries_map.erase(entries_map.find(p.key));
+ entries_lru_list.pop_front_and_dispose(Entry_delete_disposer());
+ }
+
+ public:
+ LRUCache() = default;
+ LRUCache(rgw::cache::CacheDriver* cacheDriver) : cacheDriver(cacheDriver) {} //in case we want to access cache backend apis from here
+
+ void insert(const DoutPrefixProvider* dpp, const Entry& entry) {
+ erase(dpp, entry);
+ //TODO - Get free space using cache api and if there isn't enough space then evict
+ Entry *e = new Entry(entry);
+ entries_lru_list.push_back(*e);
+ entries_map.emplace(entry.key, e);
+ }
+
+ bool erase(const DoutPrefixProvider* dpp, const Entry& entry) {
+ auto p = entries_map.find(entry.key);
+ if (p == entries_map.end()) {
+ return false;
+ }
+ entries_map.erase(p);
+ entries_lru_list.erase_and_dispose(entries_lru_list.iterator_to(*(p->second)), Entry_delete_disposer());
+ return true;
+ }
+
+ bool key_exists(const DoutPrefixProvider* dpp, const std::string& key) {
+ if (entries_map.count(key) != 0) {
+ return true;
+ }
+ return false;
+ }
+};
class D4NFilterDriver : public FilterDriver {
private:
rgw::d4n::BlockDirectory* blockDir;
rgw::d4n::CacheBlock* cacheBlock;
rgw::d4n::PolicyDriver* policyDriver;
+ rgw::sal::LRUCache cache;
public:
D4NFilterDriver(Driver* _next);
rgw::d4n::BlockDirectory* get_block_dir() { return blockDir; }
rgw::d4n::CacheBlock* get_cache_block() { return cacheBlock; }
rgw::d4n::PolicyDriver* get_policy_driver() { return policyDriver; }
+ rgw::sal::LRUCache& get_lru_cache() { return cache; }
};
class D4NFilterUser : public FilterUser {