From: Pritha Srivastava Date: Mon, 28 Aug 2023 11:42:18 +0000 (+0530) Subject: rgw/cache: moving LRU related data structures and interfaces to the Policy Manager. X-Git-Tag: testing/wip-batrick-testing-20240411.154038~45^2~62 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=e31e438ac96fcefb4c4ca13536901b11257ce868;p=ceph-ci.git rgw/cache: moving LRU related data structures and interfaces to the Policy Manager. Replacing the related calls in d4n filter driver with that of the cachepolicy in the Policy Manager and adding cache block to the block directory once the block is written to the cache backend and the policy manager data structure. Signed-off-by: Pritha Srivastava --- diff --git a/src/rgw/driver/d4n/d4n_directory.h b/src/rgw/driver/d4n/d4n_directory.h index 13744ae42cb..06a33f9a24f 100644 --- a/src/rgw/driver/d4n/d4n_directory.h +++ b/src/rgw/driver/d4n/d4n_directory.h @@ -24,7 +24,7 @@ struct CacheObj { struct CacheBlock { CacheObj cacheObj; - uint64_t blockId; /* RADOS object block ID */ + uint64_t blockId; /* block ID */ uint64_t size; /* Block size in bytes */ int globalWeight = 0; std::vector hostsList; /* List of hostnames of block locations */ diff --git a/src/rgw/driver/d4n/d4n_policy.cc b/src/rgw/driver/d4n/d4n_policy.cc index 1986939c6a4..5004a096420 100644 --- a/src/rgw/driver/d4n/d4n_policy.cc +++ b/src/rgw/driver/d4n/d4n_policy.cc @@ -400,12 +400,61 @@ uint64_t LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheD return victim.size; } +int LRUPolicy::exist_key(std::string key) +{ + const std::lock_guard l(lru_lock); + if (entries_map.count(key) != 0) { + return true; + } + return false; +} + +int LRUPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) +{ + //Does not apply to LRU + return 0; +} + +uint64_t LRUPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) +{ + const std::lock_guard l(lru_lock); + auto p = entries_lru_list.front(); + entries_map.erase(entries_map.find(p.key)); + entries_lru_list.pop_front_and_dispose(Entry_delete_disposer()); + cacheNode->delete_data(dpp, p.key, null_yield); + return cacheNode->get_free_space(dpp); +} + +void LRUPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, rgw::cache::CacheDriver* cacheNode) +{ + erase(dpp, key); + const std::lock_guard l(lru_lock); + Entry *e = new Entry(key, offset, len); + entries_lru_list.push_back(*e); + entries_map.emplace(key, e); +} + +bool LRUPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key) +{ + const std::lock_guard l(lru_lock); + auto p = entries_map.find(key); + if (p == entries_map.end()) { + return false; + } + entries_map.erase(p); + entries_lru_list.erase_and_dispose(entries_lru_list.iterator_to(*(p->second)), Entry_delete_disposer()); + return true; +} + int PolicyDriver::init() { if (policyName == "lfuda") { cachePolicy = new LFUDAPolicy(); return 0; - } else - return -1; + } else if (policyName == "lru") { + cachePolicy = new LRUPolicy(); + return 0; + } + return -1; } } } // namespace rgw::d4n diff --git a/src/rgw/driver/d4n/d4n_policy.h b/src/rgw/driver/d4n/d4n_policy.h index e7c0d5bb929..6a514d80808 100644 --- a/src/rgw/driver/d4n/d4n_policy.h +++ b/src/rgw/driver/d4n/d4n_policy.h @@ -31,6 +31,7 @@ class CachePolicy { virtual Address get_addr() { return addr; } virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) = 0; virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) = 0; + virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, rgw::cache::CacheDriver* cacheNode) = 0; }; class LFUDAPolicy : public CachePolicy { @@ -52,6 +53,36 @@ class LFUDAPolicy : public CachePolicy { virtual int exist_key(std::string key) override { return CachePolicy::exist_key(key); } virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) override; virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) override; + virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, rgw::cache::CacheDriver* cacheNode) override {} +}; + +class LRUPolicy : public CachePolicy { + public: + struct Entry : public boost::intrusive::list_base_hook<> { + std::string key; + uint64_t offset; + uint64_t len; + Entry(std::string& key, uint64_t offset, uint64_t len) : key(key), offset(offset), len(len) {} + }; + LRUPolicy() = default; + private: + std::mutex lru_lock; + //The disposer object function + struct Entry_delete_disposer { + void operator()(Entry *e) { + delete e; + } + }; + typedef boost::intrusive::list List; + List entries_lru_list; + std::unordered_map entries_map; + public: + virtual int find_client(const DoutPrefixProvider* dpp, cpp_redis::client* client) override { return 0; }; + virtual int exist_key(std::string key) override; + virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode) override; + virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode) override; + virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, rgw::cache::CacheDriver* cacheNode) override; + bool erase(const DoutPrefixProvider* dpp, const std::string& key); }; class PolicyDriver { diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.cc b/src/rgw/driver/d4n/rgw_sal_d4n.cc index 960e61d1688..a9404826d69 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.cc +++ b/src/rgw/driver/d4n/rgw_sal_d4n.cc @@ -51,8 +51,7 @@ D4NFilterDriver::D4NFilterDriver(Driver* _next) : FilterDriver(_next) blockDir = new rgw::d4n::BlockDirectory(); cacheBlock = new rgw::d4n::CacheBlock(); policyDriver = new rgw::d4n::PolicyDriver("lfuda"); - cache = rgw::sal::LRUCache(cacheDriver); - + lruPolicyDriver = new rgw::d4n::PolicyDriver("lru"); } D4NFilterDriver::~D4NFilterDriver() @@ -62,6 +61,7 @@ D4NFilterDriver::D4NFilterDriver(Driver* _next) : FilterDriver(_next) delete blockDir; delete cacheBlock; delete policyDriver; + delete lruPolicyDriver; } int D4NFilterDriver::initialize(CephContext *cct, const DoutPrefixProvider *dpp) @@ -75,6 +75,9 @@ int D4NFilterDriver::initialize(CephContext *cct, const DoutPrefixProvider *dpp) policyDriver->init(); policyDriver->cachePolicy->init(cct); + + lruPolicyDriver->init(); + lruPolicyDriver->cachePolicy->init(cct); return 0; } @@ -410,9 +413,9 @@ int D4NFilterObject::D4NFilterReadOp::prepare(optional_yield y, const DoutPrefix int set_attrsReturn = source->set_attrs(attrs); if (set_attrsReturn < 0) { - ldpp_dout(dpp, 20) << "D4N Filter: Cache get object operation failed." << dendl; + ldpp_dout(dpp, 20) << "D4N Filter: Cache set object operation failed." << dendl; } else { - ldpp_dout(dpp, 20) << "D4N Filter: Cache get object operation succeeded." << dendl; + ldpp_dout(dpp, 20) << "D4N Filter: Cache set object operation succeeded." << dendl; } } } @@ -481,6 +484,11 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int this->client_cb = cb; this->cb->set_client_cb(cb); // what's this for? -Sam + /* This algorithm stores chunks for ranged requests also in the cache, which might be smaller than obj_max_req_size + One simplification could be to overwrite the smaller chunks with a bigger chunk of obj_max_req_size, and to serve requests for smaller + chunks using the larger chunk, but all corner cases need to be considered like the last chunk which might be smaller than obj_max_req_size + and also ranged requests where a smaller chunk is overwritten by a larger chunk size != obj_max_req_size */ + uint64_t obj_max_req_size = g_conf()->rgw_get_obj_max_req_size; uint64_t start_part_num = 0; uint64_t part_num = ofs/obj_max_req_size; //part num of ofs wrt start of the object @@ -499,7 +507,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int this->offset = ofs; do { - uint64_t id = adjusted_start_ofs; + uint64_t id = adjusted_start_ofs, read_ofs = 0; //read_ofs is the actual offset to start reading from the current part/ chunk if (start_part_num == (num_parts - 1)) { len_to_read = len; part_len = len; @@ -512,24 +520,27 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int if (start_part_num == 0) { len_to_read -= diff_ofs; id += diff_ofs; + read_ofs = diff_ofs; } - uint64_t read_ofs = diff_ofs; //read_ofs is the actual offset to start reading from the current part/ chunk ceph::bufferlist bl; std::string oid_in_cache = source->get_bucket()->get_marker() + "_" + oid + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(part_len); ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl; - if (source->driver->get_lru_cache().key_exists(dpp, oid_in_cache)) { + if (source->driver->get_policy_driver()->cachePolicy->exist_key(oid_in_cache)) { // Read From Cache auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id); + source->driver->get_policy_driver()->cachePolicy->update(dpp, oid_in_cache, adjusted_start_ofs, part_len, source->driver->get_cache_driver()); + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl; auto r = flush(dpp, std::move(completed)); if (r < 0) { + drain(dpp); ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl; return r; } @@ -539,15 +550,18 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl; - if ((part_len != obj_max_req_size) && source->driver->get_lru_cache().key_exists(dpp, oid_in_cache)) { + if ((part_len != obj_max_req_size) && source->driver->get_policy_driver()->cachePolicy->exist_key(oid_in_cache)) { // Read From Cache auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id); + source->driver->get_policy_driver()->cachePolicy->update(dpp, oid_in_cache, adjusted_start_ofs, obj_max_req_size, source->driver->get_cache_driver()); + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl; auto r = flush(dpp, std::move(completed)); if (r < 0) { + drain(dpp); ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl; return r; } @@ -737,24 +751,60 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl } } - int ret = 0; //Accumulating data from backend store into rgw_get_obj_max_req_size sized chunks and then writing to cache if (write_to_cache) { - const std::lock_guard l(d3n_get_data.d3n_lock); + const std::lock_guard l(d4n_get_data_lock); + rgw::d4n::CacheBlock block; + rgw::d4n::BlockDirectory* blockDir = source->driver->get_block_dir(); + block.hostsList.push_back(blockDir->get_addr().host + ":" + std::to_string(blockDir->get_addr().port)); + block.cacheObj.bucketName = source->get_bucket()->get_name(); + block.cacheObj.objName = source->get_key().get_oid(); if (bl.length() > 0 && last_part) { // if bl = bl_rem has data and this is the last part, write it to cache std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len); - ret = filter->get_cache_driver()->put_async(save_dpp, oid, bl, bl.length(), source->get_attrs()); - if (ret == 0) { - filter->get_lru_cache().insert(save_dpp, LRUCache::Entry{oid, ofs, bl.length()}); //TODO - move to flush, where operation actually completes + block.size = bl.length(); + block.blockId = ofs; + uint64_t freeSpace = filter->get_cache_driver()->get_free_space(save_dpp); + while(freeSpace < block.size) { + freeSpace += filter->get_policy_driver()->cachePolicy->eviction(save_dpp, filter->get_cache_driver()); } + if (filter->get_cache_driver()->put_async(save_dpp, oid, bl, bl.length(), source->get_attrs()) == 0) { + filter->get_policy_driver()->cachePolicy->update(save_dpp, oid, ofs, bl.length(), filter->get_cache_driver()); + /* Store block in directory */ + if (!blockDir->exist_key(oid)) { + int ret = blockDir->set_value(&block); + if (ret < 0) { + ldpp_dout(save_dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl; + return ret; + } else { + ldpp_dout(save_dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl; + } + } + } + filter->get_policy_driver()->cachePolicy->update(save_dpp, oid, ofs, bl.length(), filter->get_cache_driver()); } else if (bl.length() == rgw_get_obj_max_req_size && bl_rem.length() == 0) { // if bl is the same size as rgw_get_obj_max_req_size, write it to cache std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len); ofs += bl_len; - ret = filter->get_cache_driver()->put_async(save_dpp, oid, bl, bl.length(), source->get_attrs()); - if (ret == 0) { - filter->get_lru_cache().insert(save_dpp, LRUCache::Entry{oid, ofs, bl.length()}); + block.blockId = ofs; + block.size = bl.length(); + uint64_t freeSpace = filter->get_cache_driver()->get_free_space(save_dpp); + while(freeSpace < block.size) { + freeSpace += filter->get_policy_driver()->cachePolicy->eviction(save_dpp, filter->get_cache_driver()); + } + if (filter->get_cache_driver()->put_async(save_dpp, oid, bl, bl.length(), source->get_attrs()) == 0) { + filter->get_policy_driver()->cachePolicy->update(save_dpp, oid, ofs, bl.length(), filter->get_cache_driver()); + /* Store block in directory */ + if (!blockDir->exist_key(oid)) { + int ret = blockDir->set_value(&block); + if (ret < 0) { + ldpp_dout(save_dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl; + return ret; + } else { + ldpp_dout(save_dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl; + } + } } + filter->get_policy_driver()->cachePolicy->update(save_dpp, oid, ofs, bl.length(), filter->get_cache_driver()); } else { //copy data from incoming bl to bl_rem till it is rgw_get_obj_max_req_size, and then write it to cache uint64_t rem_space = rgw_get_obj_max_req_size - bl_rem.length(); uint64_t len_to_copy = rem_space > bl.length() ? bl.length() : rem_space; @@ -763,13 +813,27 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl bl.splice(0, len_to_copy, &bl_copy); bl_rem.claim_append(bl_copy); - if (bl_rem.length() == g_conf()->rgw_get_obj_max_req_size) { + if (bl_rem.length() == rgw_get_obj_max_req_size) { std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_rem.length()); ofs += bl_rem.length(); - - ret = filter->get_cache_driver()->put_async(save_dpp, oid, bl_rem, bl_rem.length(), source->get_attrs()); - if (ret == 0) { - filter->get_lru_cache().insert(save_dpp, LRUCache::Entry{oid, ofs, bl_rem.length()}); + block.blockId = ofs; + block.size = bl_rem.length(); + uint64_t freeSpace = filter->get_cache_driver()->get_free_space(save_dpp); + while(freeSpace < block.size) { + freeSpace += filter->get_policy_driver()->cachePolicy->eviction(save_dpp, filter->get_cache_driver()); + } + if (filter->get_cache_driver()->put_async(save_dpp, oid, bl_rem, bl_rem.length(), source->get_attrs()) == 0) { + filter->get_policy_driver()->cachePolicy->update(save_dpp, oid, ofs, bl_rem.length(), filter->get_cache_driver()); + /* Store block in directory */ + if (!blockDir->exist_key(oid)) { + int ret = blockDir->set_value(&block); + if (ret < 0) { + ldpp_dout(save_dpp, 0) << "D4N Filter: Block directory set operation failed." << dendl; + return ret; + } else { + ldpp_dout(save_dpp, 20) << "D4N Filter: Block directory set operation succeeded." << dendl; + } + } } bl_rem.clear(); diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.h b/src/rgw/driver/d4n/rgw_sal_d4n.h index c65e3f385c4..2114ce3e578 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.h +++ b/src/rgw/driver/d4n/rgw_sal_d4n.h @@ -28,62 +28,6 @@ #include namespace rgw { namespace sal { -// Temporarily added to d4n filter driver - need to figure out the best way to incorporate this as part of Policy Manager -class LRUCache { - public: - struct Entry : public boost::intrusive::list_base_hook<> { - std::string key; - uint64_t offset; - uint64_t len; - Entry(std::string& key, uint64_t offset, uint64_t len) : key(key), offset(offset), len(len) {} - }; - private: - //The disposer object function - struct Entry_delete_disposer { - void operator()(Entry *e) { - delete e; - } - }; - typedef boost::intrusive::list List; - List entries_lru_list; - std::unordered_map entries_map; - rgw::cache::CacheDriver* cacheDriver; - - void evict() { - auto p = entries_lru_list.front(); - entries_map.erase(entries_map.find(p.key)); - entries_lru_list.pop_front_and_dispose(Entry_delete_disposer()); - } - - public: - LRUCache() = default; - LRUCache(rgw::cache::CacheDriver* cacheDriver) : cacheDriver(cacheDriver) {} //in case we want to access cache backend apis from here - - void insert(const DoutPrefixProvider* dpp, const Entry& entry) { - erase(dpp, entry); - //TODO - Get free space using cache api and if there isn't enough space then evict - Entry *e = new Entry(entry); - entries_lru_list.push_back(*e); - entries_map.emplace(entry.key, e); - } - - bool erase(const DoutPrefixProvider* dpp, const Entry& entry) { - auto p = entries_map.find(entry.key); - if (p == entries_map.end()) { - return false; - } - entries_map.erase(p); - entries_lru_list.erase_and_dispose(entries_lru_list.iterator_to(*(p->second)), Entry_delete_disposer()); - return true; - } - - bool key_exists(const DoutPrefixProvider* dpp, const std::string& key) { - if (entries_map.count(key) != 0) { - return true; - } - return false; - } -}; class D4NFilterDriver : public FilterDriver { private: @@ -92,7 +36,7 @@ class D4NFilterDriver : public FilterDriver { rgw::d4n::BlockDirectory* blockDir; rgw::d4n::CacheBlock* cacheBlock; rgw::d4n::PolicyDriver* policyDriver; - rgw::sal::LRUCache cache; + rgw::d4n::PolicyDriver* lruPolicyDriver; public: D4NFilterDriver(Driver* _next); @@ -115,8 +59,7 @@ class D4NFilterDriver : public FilterDriver { rgw::d4n::ObjectDirectory* get_obj_dir() { return objDir; } rgw::d4n::BlockDirectory* get_block_dir() { return blockDir; } rgw::d4n::CacheBlock* get_cache_block() { return cacheBlock; } - rgw::d4n::PolicyDriver* get_policy_driver() { return policyDriver; } - rgw::sal::LRUCache& get_lru_cache() { return cache; } + rgw::d4n::PolicyDriver* get_policy_driver() { return lruPolicyDriver; } }; class D4NFilterUser : public FilterUser { @@ -155,14 +98,14 @@ class D4NFilterObject : public FilterObject { public: class D4NFilterGetCB: public RGWGetDataCB { private: - D4NFilterDriver* filter; // don't need -Sam ? + D4NFilterDriver* filter; std::string oid; D4NFilterObject* source; RGWGetDataCB* client_cb; uint64_t ofs = 0, len = 0; bufferlist bl_rem; bool last_part{false}; - D3nGetObjData d3n_get_data; // should make d4n version? -Sam + std::mutex d4n_get_data_lock; bool write_to_cache{true}; public: