From 8fb01cf252c77fc9beb45d90156225142c4750de Mon Sep 17 00:00:00 2001 From: Samarah Date: Fri, 6 Oct 2023 20:20:43 +0000 Subject: [PATCH] d4n: Store `optional_yield` in client cb and update index used in D4N/cache classes Signed-off-by: Samarah --- src/rgw/driver/d4n/d4n_directory.cc | 6 +++--- src/rgw/driver/d4n/d4n_directory.h | 3 ++- src/rgw/driver/d4n/d4n_policy.cc | 24 +++++++++++++++--------- src/rgw/driver/d4n/d4n_policy.h | 8 ++++---- src/rgw/driver/d4n/rgw_sal_d4n.cc | 16 +++++++--------- src/rgw/driver/d4n/rgw_sal_d4n.h | 16 +++++++++------- src/test/rgw/test_d4n_directory.cc | 16 ++++++++-------- src/test/rgw/test_d4n_policy.cc | 25 ++++++++----------------- 8 files changed, 56 insertions(+), 58 deletions(-) diff --git a/src/rgw/driver/d4n/d4n_directory.cc b/src/rgw/driver/d4n/d4n_directory.cc index f608d6266e0d1..58872e524da4c 100644 --- a/src/rgw/driver/d4n/d4n_directory.cc +++ b/src/rgw/driver/d4n/d4n_directory.cc @@ -224,7 +224,7 @@ int ObjectDirectory::del(CacheObj* object, optional_yield y) { } std::string BlockDirectory::build_index(CacheBlock* block) { - return block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + std::to_string(block->blockID); + return block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + std::to_string(block->blockID) + "_" + std::to_string(block->size); } int BlockDirectory::exist_key(CacheBlock* block, optional_yield y) { @@ -277,7 +277,7 @@ int BlockDirectory::set(CacheBlock* block, optional_yield y) { } endpoint.pop_back(); - redisValues.push_back(endpoint); // Set in filter -Sam + redisValues.push_back(endpoint); redisValues.push_back("objName"); redisValues.push_back(block->cacheObj.objName); @@ -298,7 +298,7 @@ int BlockDirectory::set(CacheBlock* block, optional_yield y) { } endpoint.pop_back(); - redisValues.push_back(endpoint); // Set in filter -Sam + redisValues.push_back(endpoint); try { boost::system::error_code ec; diff --git a/src/rgw/driver/d4n/d4n_directory.h b/src/rgw/driver/d4n/d4n_directory.h index e71e2774bb88c..2ad56f0f61993 100644 --- a/src/rgw/driver/d4n/d4n_directory.h +++ b/src/rgw/driver/d4n/d4n_directory.h @@ -116,10 +116,11 @@ class BlockDirectory: public Directory { int del(CacheBlock* block, optional_yield y); int update_field(CacheBlock* block, std::string field, std::string value, optional_yield y); int remove_host(CacheBlock* block, std::string value, optional_yield y); - std::string build_index(CacheBlock* block); private: std::shared_ptr conn; + + std::string build_index(CacheBlock* block); }; } } // namespace rgw::d4n diff --git a/src/rgw/driver/d4n/d4n_policy.cc b/src/rgw/driver/d4n/d4n_policy.cc index 4453e5ec48bea..48ff4543fe276 100644 --- a/src/rgw/driver/d4n/d4n_policy.cc +++ b/src/rgw/driver/d4n/d4n_policy.cc @@ -6,6 +6,10 @@ namespace rgw { namespace d4n { +std::string build_index(std::string bucketName, std::string oid, uint64_t offset, uint64_t size) { + return bucketName + "_" + oid + "_" + std::to_string(offset) + "_" + std::to_string(size); +} + // initiate a call to async_exec() on the connection's executor struct initiate_exec { std::shared_ptr conn; @@ -173,7 +177,7 @@ int LFUDAPolicy::get_min_avg_weight(optional_yield y) { } } -CacheBlock LFUDAPolicy::find_victim(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) { +CacheBlock LFUDAPolicy::find_victim(const DoutPrefixProvider* dpp, optional_yield y) { if (entries_map.empty()) return {}; @@ -187,7 +191,8 @@ CacheBlock LFUDAPolicy::find_victim(const DoutPrefixProvider* dpp, rgw::cache::C victim.cacheObj.bucketName = key.substr(0, key.find('_')); key.erase(0, key.find('_') + 1); victim.cacheObj.objName = key.substr(0, key.find('_')); - victim.blockID = boost::lexical_cast(key.substr(key.find('_') + 1, key.length())); + victim.blockID = it->second->offset; + victim.size = it->second->len; if (dir->get(&victim, y) < 0) { return {}; @@ -203,7 +208,7 @@ void LFUDAPolicy::shutdown() { boost::asio::dispatch(conn->get_executor(), [c = conn] { c->cancel(); }); } -int LFUDAPolicy::exist_key(std::string key, optional_yield y) { +int LFUDAPolicy::exist_key(std::string key) { if (entries_map.count(key) != 0) { return true; } @@ -215,8 +220,8 @@ int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw response resp; int age = get_age(y); - if (exist_key(dir->build_index(block), y)) { /* Local copy */ - auto it = entries_map.find(dir->build_index(block)); + if (exist_key(build_index(block->cacheObj.bucketName, block->cacheObj.objName, block->blockID, block->size))) { /* Local copy */ + auto it = entries_map.find(build_index(block->cacheObj.bucketName, block->cacheObj.objName, block->blockID, block->size)); it->second->localWeight += age; return cacheNode->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(it->second->localWeight), y); } else { @@ -261,14 +266,15 @@ int LFUDAPolicy::get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw } uint64_t LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) { - CacheBlock victim = find_victim(dpp, cacheNode, y); + CacheBlock victim = find_victim(dpp, y); if (victim.cacheObj.objName.empty()) { ldpp_dout(dpp, 10) << "RGW D4N Policy: Could not find victim block" << dendl; return 0; /* Return zero for failure */ } - auto it = entries_map.find(dir->build_index(&victim)); + std::string key = build_index(victim.cacheObj.bucketName, victim.cacheObj.objName, victim.blockID, victim.size); + auto it = entries_map.find(key); if (it == entries_map.end()) { return 0; } @@ -303,7 +309,7 @@ uint64_t LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheD ldpp_dout(dpp, 10) << "RGW D4N Policy: Block " << victim.cacheObj.objName << " has been evicted." << dendl; - if (cacheNode->del(dpp, victim.cacheObj.objName, y) < 0 && dir->remove_host(&victim, dir->cct->_conf->rgw_local_cache_address, y) < 0) { + if (cacheNode->del(dpp, key, y) < 0 && dir->remove_host(&victim, dir->cct->_conf->rgw_local_cache_address, y) < 0) { return 0; } else { uint64_t num_entries = entries_map.size(); @@ -349,7 +355,7 @@ bool LFUDAPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key) return true; } -int LRUPolicy::exist_key(std::string key, optional_yield y) +int LRUPolicy::exist_key(std::string key) { const std::lock_guard l(lru_lock); if (entries_map.count(key) != 0) { diff --git a/src/rgw/driver/d4n/d4n_policy.h b/src/rgw/driver/d4n/d4n_policy.h index 713aa78c890c9..63da686d9a67f 100644 --- a/src/rgw/driver/d4n/d4n_policy.h +++ b/src/rgw/driver/d4n/d4n_policy.h @@ -34,7 +34,7 @@ class CachePolicy { virtual ~CachePolicy() = default; virtual int init(CephContext *cct, const DoutPrefixProvider* dpp) { return 0; } - virtual int exist_key(std::string key, optional_yield y) = 0; + virtual int exist_key(std::string key) = 0; virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) = 0; virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) = 0; virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) = 0; @@ -68,7 +68,7 @@ class LFUDAPolicy : public CachePolicy { int get_age(optional_yield y); int set_min_avg_weight(size_t weight, std::string cacheLocation, optional_yield y); int get_min_avg_weight(optional_yield y); - CacheBlock find_victim(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y); + CacheBlock find_victim(const DoutPrefixProvider* dpp, optional_yield y); public: LFUDAPolicy(net::io_context& io_context) : CachePolicy(), io(io_context) { @@ -98,7 +98,7 @@ class LFUDAPolicy : public CachePolicy { return 0; } - virtual int exist_key(std::string key, optional_yield y) override; + virtual int exist_key(std::string key) override; virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) override; virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) override; virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) override; @@ -117,7 +117,7 @@ class LRUPolicy : public CachePolicy { public: LRUPolicy() = default; - virtual int exist_key(std::string key, optional_yield y) override; + virtual int exist_key(std::string key) override; virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) override; virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) override; virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) override; diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.cc b/src/rgw/driver/d4n/rgw_sal_d4n.cc index 059c311261998..f0c9650a249fa 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.cc +++ b/src/rgw/driver/d4n/rgw_sal_d4n.cc @@ -44,8 +44,8 @@ D4NFilterDriver::D4NFilterDriver(Driver* _next, boost::asio::io_context& io_cont partition_info.type = "read-cache"; partition_info.size = g_conf()->rgw_d4n_l1_datacache_size; - cacheDriver = new rgw::cache::RedisDriver(io_context, partition_info); // change later -Sam - //cacheDriver = new rgw::cache::SSDDriver(partition_info); + //cacheDriver = new rgw::cache::RedisDriver(io_context, partition_info); // change later -Sam + cacheDriver = new rgw::cache::SSDDriver(partition_info); objDir = new rgw::d4n::ObjectDirectory(io_context); blockDir = new rgw::d4n::BlockDirectory(io_context); policyDriver = new rgw::d4n::PolicyDriver(io_context, "lfuda"); @@ -526,8 +526,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "oid: " << oid << " ofs: " << ofs << " end: " << end << dendl; this->client_cb = cb; - this->cb->set_client_cb(cb, dpp); // what's this for? -Sam - // save y here -Sam + this->cb->set_client_cb(cb, dpp, &y); // what's this for? -Sam /* This algorithm stores chunks for ranged requests also in the cache, which might be smaller than obj_max_req_size One simplification could be to overwrite the smaller chunks with a bigger chunk of obj_max_req_size, and to serve requests for smaller @@ -580,7 +579,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int block.cacheObj.objName = source->get_key().get_oid(); block.cacheObj.bucketName = source->get_bucket()->get_name(); - if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache, y)) { + if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache)) { // Read From Cache auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id); @@ -601,7 +600,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl; - if ((part_len != obj_max_req_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache, y)) { + if ((part_len != obj_max_req_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache)) { // Read From Cache auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id); @@ -671,12 +670,11 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int return r; } - return this->cb->flush_last_part(y); + return this->cb->flush_last_part(); } -int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::flush_last_part(optional_yield y) +int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::flush_last_part() { - save_y = &y; last_part = true; return handle_data(bl_rem, 0, bl_rem.length()); } diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.h b/src/rgw/driver/d4n/rgw_sal_d4n.h index ead07740a5e76..78428a01ad2ba 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.h +++ b/src/rgw/driver/d4n/rgw_sal_d4n.h @@ -112,19 +112,21 @@ class D4NFilterObject : public FilterObject { bool last_part{false}; std::mutex d4n_get_data_lock; bool write_to_cache{true}; - const DoutPrefixProvider* dpp; + const DoutPrefixProvider* dpp; + optional_yield* y; public: D4NFilterGetCB(D4NFilterDriver* _filter, std::string& _oid, D4NFilterObject* _source) : filter(_filter), - oid(_oid), - source(_source) {} - - optional_yield* save_y; + oid(_oid), source(_source) {} int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override; - void set_client_cb(RGWGetDataCB* client_cb, const DoutPrefixProvider* dpp) { this->client_cb = client_cb; this->dpp = dpp;} + void set_client_cb(RGWGetDataCB* client_cb, const DoutPrefixProvider* dpp, optional_yield* y) { + this->client_cb = client_cb; + this->dpp = dpp; + this->y = y; + } void set_ofs(uint64_t ofs) { this->ofs = ofs; } - int flush_last_part(optional_yield y); + int flush_last_part(); void bypass_cache_write() { this->write_to_cache = false; } }; diff --git a/src/test/rgw/test_d4n_directory.cc b/src/test/rgw/test_d4n_directory.cc index 806417416daa7..4ce35eaf1d2f5 100644 --- a/src/test/rgw/test_d4n_directory.cc +++ b/src/test/rgw/test_d4n_directory.cc @@ -279,7 +279,7 @@ TEST_F(BlockDirectoryFixture, SetYield) boost::system::error_code ec; request req; - req.push_range("HMGET", "testBucket_testName_0", fields); + req.push_range("HMGET", "testBucket_testName_0_0", fields); req.push("FLUSHALL"); response< std::vector, @@ -303,7 +303,7 @@ TEST_F(BlockDirectoryFixture, GetYield) { boost::system::error_code ec; request req; - req.push("HSET", "testBucket_testName_0", "objName", "newoid"); + req.push("HSET", "testBucket_testName_0_0", "objName", "newoid"); response resp; conn->async_exec(req, resp, yield[ec]); @@ -340,8 +340,8 @@ TEST_F(BlockDirectoryFixture, CopyYield) boost::system::error_code ec; request req; - req.push("EXISTS", "copyBucketName_copyTestName_0"); - req.push_range("HMGET", "copyBucketName_copyTestName_0", fields); + req.push("EXISTS", "copyBucketName_copyTestName_0_0"); + req.push_range("HMGET", "copyBucketName_copyTestName_0_0", fields); req.push("FLUSHALL"); response, @@ -371,7 +371,7 @@ TEST_F(BlockDirectoryFixture, DelYield) { boost::system::error_code ec; request req; - req.push("EXISTS", "testBucket_testName_0"); + req.push("EXISTS", "testBucket_testName_0_0"); response resp; conn->async_exec(req, resp, yield[ec]); @@ -412,7 +412,7 @@ TEST_F(BlockDirectoryFixture, UpdateFieldYield) boost::system::error_code ec; request req; - req.push("HMGET", "testBucket_testName_0", "objName", "blockHosts"); + req.push("HMGET", "testBucket_testName_0_0", "objName", "blockHosts"); req.push("FLUSHALL"); response< std::vector, boost::redis::ignore_t> resp; @@ -439,8 +439,8 @@ TEST_F(BlockDirectoryFixture, RemoveHostYield) { boost::system::error_code ec; request req; - req.push("HEXISTS", "testBucket_testName_0", "blockHosts"); - req.push("HGET", "testBucket_testName_0", "blockHosts"); + req.push("HEXISTS", "testBucket_testName_0_0", "blockHosts"); + req.push("HGET", "testBucket_testName_0_0", "blockHosts"); response resp; conn->async_exec(req, resp, yield[ec]); diff --git a/src/test/rgw/test_d4n_policy.cc b/src/test/rgw/test_d4n_policy.cc index ce2fde244bf81..1c104cb2f6491 100644 --- a/src/test/rgw/test_d4n_policy.cc +++ b/src/test/rgw/test_d4n_policy.cc @@ -104,19 +104,13 @@ class LFUDAPolicyFixture: public ::testing::Test { bufferlist bl; rgw::sal::Attrs attrs; - std::string key = "testName"; }; TEST_F(LFUDAPolicyFixture, LocalGetBlockYield) { spawn::spawn(io, [this] (spawn::yield_context yield) { + std::string key = block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + std::to_string(block->blockID) + "_" + std::to_string(block->size); ASSERT_EQ(0, cacheDriver->put(env->dpp, key, bl, bl.length(), attrs, optional_yield{io, yield})); - - rgw::d4n::CacheBlock temp; - temp.blockID = 0; - temp.cacheObj.objName = "testName"; - temp.cacheObj.bucketName = "testBucket"; - std::string key = dir->build_index(&temp); policyDriver->get_cache_policy()->insert(env->dpp, key, 0, bl.length(), "", cacheDriver, optional_yield{io, yield}); /* Change cache age for testing purposes */ @@ -182,9 +176,9 @@ TEST_F(LFUDAPolicyFixture, RemoteGetBlockYield) attrs.insert({"bucket_name", attrVal}); ASSERT_EQ(0, dir->set(&victim, optional_yield{io, yield})); - ASSERT_EQ(0, cacheDriver->put(env->dpp, victim.cacheObj.objName, bl, bl.length(), attrs, optional_yield{io, yield})); - std::string key = dir->build_index(&victim); - policyDriver->get_cache_policy()->insert(env->dpp, key, 0, bl.length(), "", cacheDriver, optional_yield{io, yield}); + std::string victimKey = victim.cacheObj.bucketName + "_" + victim.cacheObj.objName + "_" + std::to_string(victim.blockID) + "_" + std::to_string(victim.size); + ASSERT_EQ(0, cacheDriver->put(env->dpp, victimKey, bl, bl.length(), attrs, optional_yield{io, yield})); + policyDriver->get_cache_policy()->insert(env->dpp, victimKey, 0, bl.length(), "", cacheDriver, optional_yield{io, yield}); /* Remote block */ block->size = cacheDriver->get_free_space(env->dpp) + 1; /* To trigger eviction */ @@ -201,11 +195,12 @@ TEST_F(LFUDAPolicyFixture, RemoteGetBlockYield) cacheDriver->shutdown(); policyDriver->get_cache_policy()->shutdown(); + std::string key = block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + std::to_string(block->blockID) + "_" + std::to_string(block->size); boost::system::error_code ec; request req; - req.push("EXISTS", "RedisCache/victimName"); - req.push("HGET", "testBucket_victimName_0", "globalWeight"); - req.push("HGET", "testBucket_testName_0", "globalWeight"); + req.push("EXISTS", "RedisCache/" + victimKey); + req.push("HGET", victimKey, "globalWeight"); + req.push("HGET", key, "globalWeight"); req.push("FLUSHALL"); response resp; - //response< std::vector, - // boost::redis::ignore_t > resp; conn->async_exec(req, resp, yield[ec]); - //ASSERT_EQ((bool)ec, false); - //EXPECT_EQ(std::get<0>(resp).value(), vals); conn->cancel(); }); -- 2.39.5