aio->drain();
}
-int D4NFilterObject::D4NFilterReadOp::drain(const DoutPrefixProvider* dpp) {
+int D4NFilterObject::D4NFilterReadOp::drain(const DoutPrefixProvider* dpp, optional_yield y) {
auto c = aio->wait();
while (!c.empty()) {
- int r = flush(dpp, std::move(c));
+ int r = flush(dpp, std::move(c), y);
if (r < 0) {
cancel();
return r;
}
c = aio->wait();
}
- return flush(dpp, std::move(c));
+ return flush(dpp, std::move(c), y);
}
-int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw::AioResultList&& results) {
+int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw::AioResultList&& results, optional_yield y) {
int r = rgw::check_for_errors(results);
if (r < 0) {
ldpp_dout(dpp, 20) << "D4NFilterObject::flush:: calling handle_data for offset: " << offset << " bufferlist length: " << bl.length() << dendl;
bl_list.push_back(bl);
- offset += bl.length();
int r = client_cb->handle_data(bl, 0, bl.length());
if (r < 0) {
return r;
}
+ auto it = blocks_info.find(offset);
+ if (it != blocks_info.end()) {
+ std::string version = source->get_object_version();
+ std::string prefix = source->get_prefix();
+ if (version.empty()) {
+ version = source->get_instance();
+ }
+ std::pair<uint64_t, uint64_t> ofs_len_pair = it->second;
+ uint64_t ofs = ofs_len_pair.first;
+ uint64_t len = ofs_len_pair.second;
+ std::string oid_in_cache = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(len);
+ ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " calling update for offset: " << offset << " adjusted offset: " << ofs << " length: " << len << " oid_in_cache: " << oid_in_cache << dendl;
+ source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, len, version, y);
+ blocks_info.erase(it);
+ } else {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << " offset not found: " << offset << dendl;
+ }
+
+ offset += bl.length();
completed.pop_front_and_dispose(std::default_delete<rgw::AioResultEntry>{});
}
this->client_cb = cb;
this->cb->set_client_cb(cb, dpp, &y);
- this->cb->set_prefix(prefix);
+ source->set_prefix(prefix);
/* This algorithm stores chunks for ranged requests also in the cache, which might be smaller than obj_max_req_size
One simplification could be to overwrite the smaller chunks with a bigger chunk of obj_max_req_size, and to serve requests for smaller
this->offset = ofs;
+ if (version.empty()) {
+ version = source->get_instance();
+ }
+
do {
uint64_t id = adjusted_start_ofs, read_ofs = 0; //read_ofs is the actual offset to start reading from the current part/ chunk
if (start_part_num == (num_parts - 1)) {
ceph::bufferlist bl;
std::string oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(part_len);
- if (version.empty()) {
- version = source->get_instance();
- }
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "version stored in update method is: " << version << dendl;
// Read From Cache
auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);
- source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, part_len, version, y);
+ this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, part_len)));
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
-
- auto r = flush(dpp, std::move(completed));
+ auto r = flush(dpp, std::move(completed), y);
if (r < 0) {
- drain(dpp);
+ drain(dpp, y);
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
return r;
}
// Read From Cache
auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);
- source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, obj_max_req_size, version, y);
+ this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, obj_max_req_size)));
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
-
- auto r = flush(dpp, std::move(completed));
+ auto r = flush(dpp, std::move(completed), y);
if (r < 0) {
- drain(dpp);
+ drain(dpp, y);
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
return r;
}
} else {
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
- auto r = drain(dpp);
+ auto r = drain(dpp, y);
if (r < 0) {
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, r= " << r << dendl;
if (start_part_num == (num_parts - 1)) {
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
- return drain(dpp);
+ return drain(dpp, y);
} else {
adjusted_start_ofs += obj_max_req_size;
}
existing_block.cacheObj.bucketName = block.cacheObj.bucketName;
Attrs attrs; // empty attrs for cache sets
std::string version = source->get_object_version();
+ std::string prefix = source->get_prefix();
if (version.empty()) {
version = source->get_instance();
}
auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
if (ret == 0) {
//Should we replace each put_async with put, to ensure data is actually written to the cache before updating the data structures and before the lock is released?
- ret = filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), attrs);
+ ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y);
if (ret == 0) {
filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, *y);
if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) { //In case of concurrent reads for the same object, the block is already cached
auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
if (ret == 0) {
- ret = filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), attrs);
+ ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y);
if (ret == 0) {
filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, *y);
block.version = version;
auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
if (ret == 0) {
- ret = filter->get_cache_driver()->put_async(dpp, oid, bl_rem, bl_rem.length(), attrs);
+ ret = filter->get_cache_driver()->put(dpp, oid, bl_rem, bl_rem.length(), attrs, *y);
if (ret == 0) {
filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), version, *y);