return victim.size;
}
+int LRUPolicy::exist_key(std::string key)
+{
+  // Thread-safe membership test for a cached key.
+  const std::lock_guard l(lru_lock);
+  // count() on an unordered_map is 0 or 1; convert it directly instead of
+  // branching to return true/false.
+  return entries_map.count(key) != 0;
+}
+
+int LRUPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode)
+{
+ //Does not apply to LRU: recency bookkeeping happens in update()/eviction(),
+ //so this lookup hook is a no-op that always reports success.
+ return 0;
+}
+
+uint64_t LRUPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode)
+{
+  const std::lock_guard l(lru_lock);
+  // Guard the empty case: front() on an empty boost::intrusive::list is
+  // undefined behavior, and callers loop on eviction() until enough space is
+  // reported free, so return the current free space instead of crashing.
+  if (entries_lru_list.empty()) {
+    return cacheNode->get_free_space(dpp);
+  }
+  // Evict the least-recently-used entry (list front): drop it from the map
+  // and the intrusive list, then delete the cached data it referred to.
+  auto p = entries_lru_list.front(); // copy key before the node is disposed
+  entries_map.erase(entries_map.find(p.key));
+  entries_lru_list.pop_front_and_dispose(Entry_delete_disposer());
+  cacheNode->delete_data(dpp, p.key, null_yield);
+  return cacheNode->get_free_space(dpp);
+}
+
+void LRUPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, rgw::cache::CacheDriver* cacheNode)
+{
+  // Hold lru_lock for the whole remove+insert so no other thread can race in
+  // between. The previous code called erase(dpp, key) first, which takes and
+  // releases lru_lock itself, leaving a window between the erase and the
+  // re-insert (and lru_lock is non-recursive, so erase() cannot simply be
+  // called while holding the lock here).
+  const std::lock_guard l(lru_lock);
+  auto it = entries_map.find(key);
+  if (it != entries_map.end()) {
+    // Unlink and free the stale node before touching the map entry.
+    entries_lru_list.erase_and_dispose(entries_lru_list.iterator_to(*(it->second)), Entry_delete_disposer());
+    entries_map.erase(it);
+  }
+  // Insert as most-recently-used (back of the list).
+  Entry *e = new Entry(key, offset, len);
+  entries_lru_list.push_back(*e);
+  entries_map.emplace(key, e);
+}
+
+bool LRUPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key)
+{
+  const std::lock_guard l(lru_lock);
+  auto p = entries_map.find(key);
+  if (p == entries_map.end()) {
+    return false;
+  }
+  // Unlink/free the list node BEFORE erasing the map entry: erasing the map
+  // entry invalidates the iterator `p`, so the original order (map erase,
+  // then p->second) dereferenced an invalidated iterator — undefined behavior.
+  entries_lru_list.erase_and_dispose(entries_lru_list.iterator_to(*(p->second)), Entry_delete_disposer());
+  entries_map.erase(p);
+  return true;
+}
+
int PolicyDriver::init() {
+ // Instantiate the policy implementation selected by policyName
+ // ("lfuda" or "lru"); returns 0 on success, -1 for an unknown name.
if (policyName == "lfuda") {
cachePolicy = new LFUDAPolicy();
return 0;
- } else
- return -1;
+ } else if (policyName == "lru") {
+ cachePolicy = new LRUPolicy();
+ return 0;
+ }
+ return -1;
}
} } // namespace rgw::d4n
virtual Address get_addr() { return addr; }
virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) = 0;
virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) = 0;
+ virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, rgw::cache::CacheDriver* cacheNode) = 0;
};
class LFUDAPolicy : public CachePolicy {
virtual int exist_key(std::string key) override { return CachePolicy::exist_key(key); }
virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) override;
virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) override;
+ virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, rgw::cache::CacheDriver* cacheNode) override {}
+};
+
+class LRUPolicy : public CachePolicy {
+  public:
+    // Intrusive-list node tracking one cached chunk (key + byte range).
+    struct Entry : public boost::intrusive::list_base_hook<> {
+      std::string key;
+      uint64_t offset;
+      uint64_t len;
+      Entry(std::string& key, uint64_t offset, uint64_t len) : key(key), offset(offset), len(len) {}
+    };
+    LRUPolicy() = default;
+    // Entries are heap-allocated and owned by this policy; dispose of any
+    // that remain at teardown, otherwise every surviving Entry leaks.
+    ~LRUPolicy() { entries_lru_list.clear_and_dispose(Entry_delete_disposer()); }
+  private:
+    std::mutex lru_lock; // guards entries_lru_list and entries_map
+    //The disposer object function used to delete unlinked list nodes
+    struct Entry_delete_disposer {
+      void operator()(Entry *e) {
+        delete e;
+      }
+    };
+    typedef boost::intrusive::list<Entry> List;
+    List entries_lru_list; // LRU order: front = least recently used
+    std::unordered_map<std::string, Entry*> entries_map; // key -> list node
+  public:
+    virtual int find_client(const DoutPrefixProvider* dpp, cpp_redis::client* client) override { return 0; };
+    virtual int exist_key(std::string key) override;
+    virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) override;
+    virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) override;
+    virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, rgw::cache::CacheDriver* cacheNode) override;
+    // Remove a single key and free its entry; returns false if absent.
+    bool erase(const DoutPrefixProvider* dpp, const std::string& key);
};
class PolicyDriver {
blockDir = new rgw::d4n::BlockDirectory();
cacheBlock = new rgw::d4n::CacheBlock();
policyDriver = new rgw::d4n::PolicyDriver("lfuda");
- cache = rgw::sal::LRUCache(cacheDriver);
-
+ lruPolicyDriver = new rgw::d4n::PolicyDriver("lru");
}
D4NFilterDriver::~D4NFilterDriver()
delete blockDir;
delete cacheBlock;
delete policyDriver;
+ delete lruPolicyDriver;
}
int D4NFilterDriver::initialize(CephContext *cct, const DoutPrefixProvider *dpp)
policyDriver->init();
policyDriver->cachePolicy->init(cct);
+
+ lruPolicyDriver->init();
+ lruPolicyDriver->cachePolicy->init(cct);
return 0;
}
int set_attrsReturn = source->set_attrs(attrs);
if (set_attrsReturn < 0) {
- ldpp_dout(dpp, 20) << "D4N Filter: Cache get object operation failed." << dendl;
+ ldpp_dout(dpp, 20) << "D4N Filter: Cache set object operation failed." << dendl;
} else {
- ldpp_dout(dpp, 20) << "D4N Filter: Cache get object operation succeeded." << dendl;
+ ldpp_dout(dpp, 20) << "D4N Filter: Cache set object operation succeeded." << dendl;
}
}
}
this->client_cb = cb;
this->cb->set_client_cb(cb); // what's this for? -Sam
+ /* This algorithm stores chunks for ranged requests also in the cache, which might be smaller than obj_max_req_size
+ One simplification could be to overwrite the smaller chunks with a bigger chunk of obj_max_req_size, and to serve requests for smaller
+ chunks using the larger chunk, but all corner cases need to be considered like the last chunk which might be smaller than obj_max_req_size
+ and also ranged requests where a smaller chunk is overwritten by a larger chunk size != obj_max_req_size */
+
uint64_t obj_max_req_size = g_conf()->rgw_get_obj_max_req_size;
uint64_t start_part_num = 0;
uint64_t part_num = ofs/obj_max_req_size; //part num of ofs wrt start of the object
this->offset = ofs;
do {
- uint64_t id = adjusted_start_ofs;
+ uint64_t id = adjusted_start_ofs, read_ofs = 0; //read_ofs is the actual offset to start reading from the current part/ chunk
if (start_part_num == (num_parts - 1)) {
len_to_read = len;
part_len = len;
if (start_part_num == 0) {
len_to_read -= diff_ofs;
id += diff_ofs;
+ read_ofs = diff_ofs;
}
- uint64_t read_ofs = diff_ofs; //read_ofs is the actual offset to start reading from the current part/ chunk
ceph::bufferlist bl;
std::string oid_in_cache = source->get_bucket()->get_marker() + "_" + oid + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(part_len);
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num <<
" read_ofs: " << read_ofs << " part len: " << part_len << dendl;
- if (source->driver->get_lru_cache().key_exists(dpp, oid_in_cache)) {
+ if (source->driver->get_policy_driver()->cachePolicy->exist_key(oid_in_cache)) {
// Read From Cache
auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);
+ source->driver->get_policy_driver()->cachePolicy->update(dpp, oid_in_cache, adjusted_start_ofs, part_len, source->driver->get_cache_driver());
+
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
auto r = flush(dpp, std::move(completed));
if (r < 0) {
+ drain(dpp);
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
return r;
}
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num <<
" read_ofs: " << read_ofs << " part len: " << part_len << dendl;
- if ((part_len != obj_max_req_size) && source->driver->get_lru_cache().key_exists(dpp, oid_in_cache)) {
+ if ((part_len != obj_max_req_size) && source->driver->get_policy_driver()->cachePolicy->exist_key(oid_in_cache)) {
// Read From Cache
auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);
+ source->driver->get_policy_driver()->cachePolicy->update(dpp, oid_in_cache, adjusted_start_ofs, obj_max_req_size, source->driver->get_cache_driver());
+
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
auto r = flush(dpp, std::move(completed));
if (r < 0) {
+ drain(dpp);
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
return r;
}
}
}
- int ret = 0;
//Accumulating data from backend store into rgw_get_obj_max_req_size sized chunks and then writing to cache
if (write_to_cache) {
- const std::lock_guard l(d3n_get_data.d3n_lock);
+ const std::lock_guard l(d4n_get_data_lock);
+ rgw::d4n::CacheBlock block;
+ rgw::d4n::BlockDirectory* blockDir = source->driver->get_block_dir();
+ block.hostsList.push_back(blockDir->get_addr().host + ":" + std::to_string(blockDir->get_addr().port));
+ block.cacheObj.bucketName = source->get_bucket()->get_name();
+ block.cacheObj.objName = source->get_key().get_oid();
if (bl.length() > 0 && last_part) { // if bl = bl_rem has data and this is the last part, write it to cache
std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
- ret = filter->get_cache_driver()->put_async(save_dpp, oid, bl, bl.length(), source->get_attrs());
- if (ret == 0) {
- filter->get_lru_cache().insert(save_dpp, LRUCache::Entry{oid, ofs, bl.length()}); //TODO - move to flush, where operation actually completes
+ block.size = bl.length();
+ block.blockId = ofs;
+ uint64_t freeSpace = filter->get_cache_driver()->get_free_space(save_dpp);
+ while(freeSpace < block.size) {
+ freeSpace += filter->get_policy_driver()->cachePolicy->eviction(save_dpp, filter->get_cache_driver());
}
+ if (filter->get_cache_driver()->put_async(save_dpp, oid, bl, bl.length(), source->get_attrs()) == 0) {
+ filter->get_policy_driver()->cachePolicy->update(save_dpp, oid, ofs, bl.length(), filter->get_cache_driver());
+ /* Store block in directory */
+ if (!blockDir->exist_key(oid)) {
+ int ret = blockDir->set_value(&block);
+ if (ret < 0) {
+ ldpp_dout(save_dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl;
+ return ret;
+ } else {
+ ldpp_dout(save_dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl;
+ }
+ }
+ }
+ filter->get_policy_driver()->cachePolicy->update(save_dpp, oid, ofs, bl.length(), filter->get_cache_driver());
} else if (bl.length() == rgw_get_obj_max_req_size && bl_rem.length() == 0) { // if bl is the same size as rgw_get_obj_max_req_size, write it to cache
std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
ofs += bl_len;
- ret = filter->get_cache_driver()->put_async(save_dpp, oid, bl, bl.length(), source->get_attrs());
- if (ret == 0) {
- filter->get_lru_cache().insert(save_dpp, LRUCache::Entry{oid, ofs, bl.length()});
+ block.blockId = ofs;
+ block.size = bl.length();
+ uint64_t freeSpace = filter->get_cache_driver()->get_free_space(save_dpp);
+ while(freeSpace < block.size) {
+ freeSpace += filter->get_policy_driver()->cachePolicy->eviction(save_dpp, filter->get_cache_driver());
+ }
+ if (filter->get_cache_driver()->put_async(save_dpp, oid, bl, bl.length(), source->get_attrs()) == 0) {
+ filter->get_policy_driver()->cachePolicy->update(save_dpp, oid, ofs, bl.length(), filter->get_cache_driver());
+ /* Store block in directory */
+ if (!blockDir->exist_key(oid)) {
+ int ret = blockDir->set_value(&block);
+ if (ret < 0) {
+ ldpp_dout(save_dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl;
+ return ret;
+ } else {
+ ldpp_dout(save_dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl;
+ }
+ }
}
+ filter->get_policy_driver()->cachePolicy->update(save_dpp, oid, ofs, bl.length(), filter->get_cache_driver());
} else { //copy data from incoming bl to bl_rem till it is rgw_get_obj_max_req_size, and then write it to cache
uint64_t rem_space = rgw_get_obj_max_req_size - bl_rem.length();
uint64_t len_to_copy = rem_space > bl.length() ? bl.length() : rem_space;
bl.splice(0, len_to_copy, &bl_copy);
bl_rem.claim_append(bl_copy);
- if (bl_rem.length() == g_conf()->rgw_get_obj_max_req_size) {
+ if (bl_rem.length() == rgw_get_obj_max_req_size) {
std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_rem.length());
ofs += bl_rem.length();
-
- ret = filter->get_cache_driver()->put_async(save_dpp, oid, bl_rem, bl_rem.length(), source->get_attrs());
- if (ret == 0) {
- filter->get_lru_cache().insert(save_dpp, LRUCache::Entry{oid, ofs, bl_rem.length()});
+ block.blockId = ofs;
+ block.size = bl_rem.length();
+ uint64_t freeSpace = filter->get_cache_driver()->get_free_space(save_dpp);
+ while(freeSpace < block.size) {
+ freeSpace += filter->get_policy_driver()->cachePolicy->eviction(save_dpp, filter->get_cache_driver());
+ }
+ if (filter->get_cache_driver()->put_async(save_dpp, oid, bl_rem, bl_rem.length(), source->get_attrs()) == 0) {
+ filter->get_policy_driver()->cachePolicy->update(save_dpp, oid, ofs, bl_rem.length(), filter->get_cache_driver());
+ /* Store block in directory */
+ if (!blockDir->exist_key(oid)) {
+ int ret = blockDir->set_value(&block);
+ if (ret < 0) {
+ ldpp_dout(save_dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl;
+ return ret;
+ } else {
+ ldpp_dout(save_dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl;
+ }
+ }
}
bl_rem.clear();
#include <boost/intrusive/list.hpp>
namespace rgw { namespace sal {
-// Temporarily added to d4n filter driver - need to figure out the best way to incorporate this as part of Policy Manager
-class LRUCache {
- public:
- struct Entry : public boost::intrusive::list_base_hook<> {
- std::string key;
- uint64_t offset;
- uint64_t len;
- Entry(std::string& key, uint64_t offset, uint64_t len) : key(key), offset(offset), len(len) {}
- };
- private:
- //The disposer object function
- struct Entry_delete_disposer {
- void operator()(Entry *e) {
- delete e;
- }
- };
- typedef boost::intrusive::list<Entry> List;
- List entries_lru_list;
- std::unordered_map<std::string, Entry*> entries_map;
- rgw::cache::CacheDriver* cacheDriver;
-
- void evict() {
- auto p = entries_lru_list.front();
- entries_map.erase(entries_map.find(p.key));
- entries_lru_list.pop_front_and_dispose(Entry_delete_disposer());
- }
-
- public:
- LRUCache() = default;
- LRUCache(rgw::cache::CacheDriver* cacheDriver) : cacheDriver(cacheDriver) {} //in case we want to access cache backend apis from here
-
- void insert(const DoutPrefixProvider* dpp, const Entry& entry) {
- erase(dpp, entry);
- //TODO - Get free space using cache api and if there isn't enough space then evict
- Entry *e = new Entry(entry);
- entries_lru_list.push_back(*e);
- entries_map.emplace(entry.key, e);
- }
-
- bool erase(const DoutPrefixProvider* dpp, const Entry& entry) {
- auto p = entries_map.find(entry.key);
- if (p == entries_map.end()) {
- return false;
- }
- entries_map.erase(p);
- entries_lru_list.erase_and_dispose(entries_lru_list.iterator_to(*(p->second)), Entry_delete_disposer());
- return true;
- }
-
- bool key_exists(const DoutPrefixProvider* dpp, const std::string& key) {
- if (entries_map.count(key) != 0) {
- return true;
- }
- return false;
- }
-};
class D4NFilterDriver : public FilterDriver {
private:
rgw::d4n::BlockDirectory* blockDir;
rgw::d4n::CacheBlock* cacheBlock;
rgw::d4n::PolicyDriver* policyDriver;
- rgw::sal::LRUCache cache;
+ rgw::d4n::PolicyDriver* lruPolicyDriver;
public:
D4NFilterDriver(Driver* _next);
rgw::d4n::ObjectDirectory* get_obj_dir() { return objDir; }
rgw::d4n::BlockDirectory* get_block_dir() { return blockDir; }
rgw::d4n::CacheBlock* get_cache_block() { return cacheBlock; }
- rgw::d4n::PolicyDriver* get_policy_driver() { return policyDriver; }
- rgw::sal::LRUCache& get_lru_cache() { return cache; }
+ rgw::d4n::PolicyDriver* get_policy_driver() { return lruPolicyDriver; }
};
class D4NFilterUser : public FilterUser {
public:
class D4NFilterGetCB: public RGWGetDataCB {
private:
- D4NFilterDriver* filter; // don't need -Sam ?
+ D4NFilterDriver* filter;
std::string oid;
D4NFilterObject* source;
RGWGetDataCB* client_cb;
uint64_t ofs = 0, len = 0;
bufferlist bl_rem;
bool last_part{false};
- D3nGetObjData d3n_get_data; // should make d4n version? -Sam
+ std::mutex d4n_get_data_lock;
bool write_to_cache{true};
public: