}
std::string BlockDirectory::build_index(CacheBlock* block) {
- return block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + std::to_string(block->blockID);
+ return block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + std::to_string(block->blockID) + "_" + std::to_string(block->size);
}
int BlockDirectory::exist_key(CacheBlock* block, optional_yield y) {
}
endpoint.pop_back();
- redisValues.push_back(endpoint); // Set in filter -Sam
+ redisValues.push_back(endpoint);
redisValues.push_back("objName");
redisValues.push_back(block->cacheObj.objName);
}
endpoint.pop_back();
- redisValues.push_back(endpoint); // Set in filter -Sam
+ redisValues.push_back(endpoint);
try {
boost::system::error_code ec;
int del(CacheBlock* block, optional_yield y);
int update_field(CacheBlock* block, std::string field, std::string value, optional_yield y);
int remove_host(CacheBlock* block, std::string value, optional_yield y);
- std::string build_index(CacheBlock* block);
private:
std::shared_ptr<connection> conn;
+
+ std::string build_index(CacheBlock* block);
};
} } // namespace rgw::d4n
namespace rgw { namespace d4n {
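+// Builds a block key in the form "<bucket>_<oid>_<offset>_<size>", matching BlockDirectory::build_index().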
+std::string build_index(std::string bucketName, std::string oid, uint64_t offset, uint64_t size) {
+ return bucketName + "_" + oid + "_" + std::to_string(offset) + "_" + std::to_string(size);
+}
+
// initiate a call to async_exec() on the connection's executor
struct initiate_exec {
std::shared_ptr<boost::redis::connection> conn;
}
}
-CacheBlock LFUDAPolicy::find_victim(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) {
+CacheBlock LFUDAPolicy::find_victim(const DoutPrefixProvider* dpp, optional_yield y) {
if (entries_map.empty())
return {};
victim.cacheObj.bucketName = key.substr(0, key.find('_'));
key.erase(0, key.find('_') + 1);
victim.cacheObj.objName = key.substr(0, key.find('_'));
- victim.blockID = boost::lexical_cast<uint64_t>(key.substr(key.find('_') + 1, key.length()));
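+ /* Take blockID and size from the in-memory entry instead of parsing them out of the key. */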
+ victim.blockID = it->second->offset;
+ victim.size = it->second->len;
if (dir->get(&victim, y) < 0) {
return {};
boost::asio::dispatch(conn->get_executor(), [c = conn] { c->cancel(); });
}
-int LFUDAPolicy::exist_key(std::string key, optional_yield y) {
+int LFUDAPolicy::exist_key(std::string key) {
if (entries_map.count(key) != 0) {
return true;
}
response<std::string> resp;
int age = get_age(y);
- if (exist_key(dir->build_index(block), y)) { /* Local copy */
- auto it = entries_map.find(dir->build_index(block));
+ std::string key = build_index(block->cacheObj.bucketName, block->cacheObj.objName, block->blockID, block->size);
+ if (exist_key(key)) { /* Local copy */
+ auto it = entries_map.find(key);
it->second->localWeight += age;
return cacheNode->set_attr(dpp, block->cacheObj.objName, "localWeight", std::to_string(it->second->localWeight), y);
} else {
}
uint64_t LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) {
- CacheBlock victim = find_victim(dpp, cacheNode, y);
+ CacheBlock victim = find_victim(dpp, y);
if (victim.cacheObj.objName.empty()) {
ldpp_dout(dpp, 10) << "RGW D4N Policy: Could not find victim block" << dendl;
return 0; /* Return zero for failure */
}
- auto it = entries_map.find(dir->build_index(&victim));
+ std::string key = build_index(victim.cacheObj.bucketName, victim.cacheObj.objName, victim.blockID, victim.size);
+ auto it = entries_map.find(key);
if (it == entries_map.end()) {
return 0;
}
ldpp_dout(dpp, 10) << "RGW D4N Policy: Block " << victim.cacheObj.objName << " has been evicted." << dendl;
- if (cacheNode->del(dpp, victim.cacheObj.objName, y) < 0 && dir->remove_host(&victim, dir->cct->_conf->rgw_local_cache_address, y) < 0) {
+ if (cacheNode->del(dpp, key, y) < 0 && dir->remove_host(&victim, dir->cct->_conf->rgw_local_cache_address, y) < 0) {
return 0;
} else {
uint64_t num_entries = entries_map.size();
return true;
}
-int LRUPolicy::exist_key(std::string key, optional_yield y)
+int LRUPolicy::exist_key(std::string key)
{
const std::lock_guard l(lru_lock);
if (entries_map.count(key) != 0) {
virtual ~CachePolicy() = default;
virtual int init(CephContext *cct, const DoutPrefixProvider* dpp) { return 0; }
- virtual int exist_key(std::string key, optional_yield y) = 0;
+ virtual int exist_key(std::string key) = 0;
virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) = 0;
virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) = 0;
virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) = 0;
int get_age(optional_yield y);
int set_min_avg_weight(size_t weight, std::string cacheLocation, optional_yield y);
int get_min_avg_weight(optional_yield y);
- CacheBlock find_victim(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y);
+ CacheBlock find_victim(const DoutPrefixProvider* dpp, optional_yield y);
public:
LFUDAPolicy(net::io_context& io_context) : CachePolicy(), io(io_context) {
return 0;
}
- virtual int exist_key(std::string key, optional_yield y) override;
+ virtual int exist_key(std::string key) override;
virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
public:
LRUPolicy() = default;
- virtual int exist_key(std::string key, optional_yield y) override;
+ virtual int exist_key(std::string key) override;
virtual int get_block(const DoutPrefixProvider* dpp, CacheBlock* block, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
virtual uint64_t eviction(const DoutPrefixProvider* dpp, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, rgw::cache::CacheDriver* cacheNode, optional_yield y) override;
partition_info.type = "read-cache";
partition_info.size = g_conf()->rgw_d4n_l1_datacache_size;
- cacheDriver = new rgw::cache::RedisDriver(io_context, partition_info); // change later -Sam
- //cacheDriver = new rgw::cache::SSDDriver(partition_info);
+ //cacheDriver = new rgw::cache::RedisDriver(io_context, partition_info); // TODO: revisit which cache driver to use
+ cacheDriver = new rgw::cache::SSDDriver(partition_info);
objDir = new rgw::d4n::ObjectDirectory(io_context);
blockDir = new rgw::d4n::BlockDirectory(io_context);
policyDriver = new rgw::d4n::PolicyDriver(io_context, "lfuda");
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "oid: " << oid << " ofs: " << ofs << " end: " << end << dendl;
this->client_cb = cb;
- this->cb->set_client_cb(cb, dpp); // what's this for? -Sam
- // save y here -Sam
+ this->cb->set_client_cb(cb, dpp, &y); // hand the client callback, dpp, and yield context to the cache callback
/* This algorithm stores chunks for ranged requests also in the cache, which might be smaller than obj_max_req_size
One simplification could be to overwrite the smaller chunks with a bigger chunk of obj_max_req_size, and to serve requests for smaller
block.cacheObj.objName = source->get_key().get_oid();
block.cacheObj.bucketName = source->get_bucket()->get_name();
- if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache, y)) {
+ if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache)) {
// Read From Cache
auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num <<
" read_ofs: " << read_ofs << " part len: " << part_len << dendl;
- if ((part_len != obj_max_req_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache, y)) {
+ if ((part_len != obj_max_req_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache)) {
// Read From Cache
auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);
return r;
}
- return this->cb->flush_last_part(y);
+ return this->cb->flush_last_part();
}
-int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::flush_last_part(optional_yield y)
+int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::flush_last_part()
{
- save_y = &y;
last_part = true;
return handle_data(bl_rem, 0, bl_rem.length());
}
bool last_part{false};
std::mutex d4n_get_data_lock;
bool write_to_cache{true};
- const DoutPrefixProvider* dpp;
+ const DoutPrefixProvider* dpp;
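+ /* yield context saved by set_client_cb() for use by the callback's cache operations */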
+ optional_yield* y;
public:
D4NFilterGetCB(D4NFilterDriver* _filter, std::string& _oid, D4NFilterObject* _source) : filter(_filter),
- oid(_oid),
- source(_source) {}
-
- optional_yield* save_y;
+ oid(_oid), source(_source) {}
int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override;
- void set_client_cb(RGWGetDataCB* client_cb, const DoutPrefixProvider* dpp) { this->client_cb = client_cb; this->dpp = dpp;}
+ void set_client_cb(RGWGetDataCB* client_cb, const DoutPrefixProvider* dpp, optional_yield* y) {
+ this->client_cb = client_cb;
+ this->dpp = dpp;
+ this->y = y;
+ }
void set_ofs(uint64_t ofs) { this->ofs = ofs; }
- int flush_last_part(optional_yield y);
+ int flush_last_part();
void bypass_cache_write() { this->write_to_cache = false; }
};
boost::system::error_code ec;
request req;
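+ /* Block directory keys now end in "_<blockID>_<size>"; both are 0 for the test block. */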
- req.push_range("HMGET", "testBucket_testName_0", fields);
+ req.push_range("HMGET", "testBucket_testName_0_0", fields);
req.push("FLUSHALL");
response< std::vector<std::string>,
{
boost::system::error_code ec;
request req;
- req.push("HSET", "testBucket_testName_0", "objName", "newoid");
+ req.push("HSET", "testBucket_testName_0_0", "objName", "newoid");
response<int> resp;
conn->async_exec(req, resp, yield[ec]);
boost::system::error_code ec;
request req;
- req.push("EXISTS", "copyBucketName_copyTestName_0");
- req.push_range("HMGET", "copyBucketName_copyTestName_0", fields);
+ req.push("EXISTS", "copyBucketName_copyTestName_0_0");
+ req.push_range("HMGET", "copyBucketName_copyTestName_0_0", fields);
req.push("FLUSHALL");
response<int, std::vector<std::string>,
{
boost::system::error_code ec;
request req;
- req.push("EXISTS", "testBucket_testName_0");
+ req.push("EXISTS", "testBucket_testName_0_0");
response<int> resp;
conn->async_exec(req, resp, yield[ec]);
boost::system::error_code ec;
request req;
- req.push("HMGET", "testBucket_testName_0", "objName", "blockHosts");
+ req.push("HMGET", "testBucket_testName_0_0", "objName", "blockHosts");
req.push("FLUSHALL");
response< std::vector<std::string>,
boost::redis::ignore_t> resp;
{
boost::system::error_code ec;
request req;
- req.push("HEXISTS", "testBucket_testName_0", "blockHosts");
- req.push("HGET", "testBucket_testName_0", "blockHosts");
+ req.push("HEXISTS", "testBucket_testName_0_0", "blockHosts");
+ req.push("HGET", "testBucket_testName_0_0", "blockHosts");
response<int, std::string> resp;
conn->async_exec(req, resp, yield[ec]);
bufferlist bl;
rgw::sal::Attrs attrs;
- std::string key = "testName";
};
TEST_F(LFUDAPolicyFixture, LocalGetBlockYield)
{
spawn::spawn(io, [this] (spawn::yield_context yield) {
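+ /* Build the key the same way the policy and directory do: "<bucket>_<oid>_<blockID>_<size>". */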
+ std::string key = block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + std::to_string(block->blockID) + "_" + std::to_string(block->size);
ASSERT_EQ(0, cacheDriver->put(env->dpp, key, bl, bl.length(), attrs, optional_yield{io, yield}));
-
- rgw::d4n::CacheBlock temp;
- temp.blockID = 0;
- temp.cacheObj.objName = "testName";
- temp.cacheObj.bucketName = "testBucket";
- std::string key = dir->build_index(&temp);
policyDriver->get_cache_policy()->insert(env->dpp, key, 0, bl.length(), "", cacheDriver, optional_yield{io, yield});
/* Change cache age for testing purposes */
attrs.insert({"bucket_name", attrVal});
ASSERT_EQ(0, dir->set(&victim, optional_yield{io, yield}));
- ASSERT_EQ(0, cacheDriver->put(env->dpp, victim.cacheObj.objName, bl, bl.length(), attrs, optional_yield{io, yield}));
- std::string key = dir->build_index(&victim);
- policyDriver->get_cache_policy()->insert(env->dpp, key, 0, bl.length(), "", cacheDriver, optional_yield{io, yield});
+ std::string victimKey = victim.cacheObj.bucketName + "_" + victim.cacheObj.objName + "_" + std::to_string(victim.blockID) + "_" + std::to_string(victim.size);
+ ASSERT_EQ(0, cacheDriver->put(env->dpp, victimKey, bl, bl.length(), attrs, optional_yield{io, yield}));
+ policyDriver->get_cache_policy()->insert(env->dpp, victimKey, 0, bl.length(), "", cacheDriver, optional_yield{io, yield});
/* Remote block */
block->size = cacheDriver->get_free_space(env->dpp) + 1; /* To trigger eviction */
cacheDriver->shutdown();
policyDriver->get_cache_policy()->shutdown();
+ std::string key = block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + std::to_string(block->blockID) + "_" + std::to_string(block->size);
boost::system::error_code ec;
request req;
- req.push("EXISTS", "RedisCache/victimName");
- req.push("HGET", "testBucket_victimName_0", "globalWeight");
- req.push("HGET", "testBucket_testName_0", "globalWeight");
+ req.push("EXISTS", "RedisCache/" + victimKey);
+ req.push("HGET", victimKey, "globalWeight");
+ req.push("HGET", key, "globalWeight");
req.push("FLUSHALL");
response<int, std::string, std::string,
req.push("FLUSHALL");
response<boost::redis::ignore_t> resp;
- //response< std::vector<std::string>,
- // boost::redis::ignore_t > resp;
conn->async_exec(req, resp, yield[ec]);
- //ASSERT_EQ((bool)ec, false);
- //EXPECT_EQ(std::get<0>(resp).value(), vals);
conn->cancel();
});