From a360990d2ee2aa6f97b1c1ede540603b2b96b545 Mon Sep 17 00:00:00 2001 From: Pritha Srivastava Date: Tue, 30 Jan 2024 15:34:42 +0530 Subject: [PATCH] rgw/d4n: modifications to the filter driver: 1. replaced put_async() with put() in handle_data() 2. moved calls to update() from iterate() to flush() Signed-off-by: Pritha Srivastava --- src/rgw/driver/d4n/rgw_sal_d4n.cc | 62 ++++++++++++++++++++----------- src/rgw/driver/d4n/rgw_sal_d4n.h | 11 ++++-- 2 files changed, 47 insertions(+), 26 deletions(-) diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.cc b/src/rgw/driver/d4n/rgw_sal_d4n.cc index 02123a26598a0..4c33b9011f224 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.cc +++ b/src/rgw/driver/d4n/rgw_sal_d4n.cc @@ -460,20 +460,20 @@ void D4NFilterObject::D4NFilterReadOp::cancel() { aio->drain(); } -int D4NFilterObject::D4NFilterReadOp::drain(const DoutPrefixProvider* dpp) { +int D4NFilterObject::D4NFilterReadOp::drain(const DoutPrefixProvider* dpp, optional_yield y) { auto c = aio->wait(); while (!c.empty()) { - int r = flush(dpp, std::move(c)); + int r = flush(dpp, std::move(c), y); if (r < 0) { cancel(); return r; } c = aio->wait(); } - return flush(dpp, std::move(c)); + return flush(dpp, std::move(c), y); } -int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw::AioResultList&& results) { +int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw::AioResultList&& results, optional_yield y) { int r = rgw::check_for_errors(results); if (r < 0) { @@ -494,11 +494,29 @@ int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw:: ldpp_dout(dpp, 20) << "D4NFilterObject::flush:: calling handle_data for offset: " << offset << " bufferlist length: " << bl.length() << dendl; bl_list.push_back(bl); - offset += bl.length(); int r = client_cb->handle_data(bl, 0, bl.length()); if (r < 0) { return r; } + auto it = blocks_info.find(offset); + if (it != blocks_info.end()) { + std::string version = source->get_object_version(); + std::string prefix = source->get_prefix(); + if (version.empty()) { + version = source->get_instance(); + } + std::pair ofs_len_pair = it->second; + uint64_t ofs = ofs_len_pair.first; + uint64_t len = ofs_len_pair.second; + std::string oid_in_cache = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(len); + ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " calling update for offset: " << offset << " adjusted offset: " << ofs << " length: " << len << " oid_in_cache: " << oid_in_cache << dendl; + source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, len, version, y); + blocks_info.erase(it); + } else { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << " offset not found: " << offset << dendl; + } + + offset += bl.length(); completed.pop_front_and_dispose(std::default_delete{}); } @@ -523,7 +541,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int this->client_cb = cb; this->cb->set_client_cb(cb, dpp, &y); - this->cb->set_prefix(prefix); + source->set_prefix(prefix); /* This algorithm stores chunks for ranged requests also in the cache, which might be smaller than obj_max_req_size One simplification could be to overwrite the smaller chunks with a bigger chunk of obj_max_req_size, and to serve requests for smaller @@ -547,6 +565,10 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int this->offset = ofs; + if (version.empty()) { + version = source->get_instance(); + } + do { uint64_t id = adjusted_start_ofs, read_ofs = 0; //read_ofs is the actual offset to start reading from the current part/ chunk if (start_part_num == (num_parts - 1)) { @@ -566,9 +588,6 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int ceph::bufferlist bl; std::string oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(part_len); - if (version.empty()) { - version = source->get_instance(); - } ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "version stored in update method is: " << version << dendl; @@ -579,14 +598,13 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int // Read From Cache auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id); - source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, part_len, version, y); + this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, part_len))); ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl; - - auto r = flush(dpp, std::move(completed)); + auto r = flush(dpp, std::move(completed), y); if (r < 0) { - drain(dpp); + drain(dpp, y); ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl; return r; } @@ -600,14 +618,13 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int // Read From Cache auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id); - source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, obj_max_req_size, version, y); + this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, obj_max_req_size))); ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl; - - auto r = flush(dpp, std::move(completed)); + auto r = flush(dpp, std::move(completed), y); if (r < 0) { - drain(dpp); + drain(dpp, y); ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl; return r; } @@ -615,7 +632,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int } else { ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl; - auto r = drain(dpp); + auto r = drain(dpp, y); if (r < 0) { ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, r= " << r << dendl; @@ -628,7 +645,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int if (start_part_num == (num_parts - 1)) { ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl; - return drain(dpp); + return drain(dpp, y); } else { adjusted_start_ofs += obj_max_req_size; } @@ -701,6 +718,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl existing_block.cacheObj.bucketName = block.cacheObj.bucketName; Attrs attrs; // empty attrs for cache sets std::string version = source->get_object_version(); + std::string prefix = source->get_prefix(); if (version.empty()) { version = source->get_instance(); } @@ -715,7 +733,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y); if (ret == 0) { //Should we replace each put_async with put, to ensure data is actually written to the cache before updating the data structures and before the lock is released? - ret = filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), attrs); + ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y); if (ret == 0) { filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, *y); @@ -755,7 +773,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) { //In case of concurrent reads for the same object, the block is already cached auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y); if (ret == 0) { - ret = filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), attrs); + ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y); if (ret == 0) { filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, *y); @@ -802,7 +820,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl block.version = version; auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y); if (ret == 0) { - ret = filter->get_cache_driver()->put_async(dpp, oid, bl_rem, bl_rem.length(), attrs); + ret = filter->get_cache_driver()->put(dpp, oid, bl_rem, bl_rem.length(), attrs, *y); if (ret == 0) { filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), version, *y); diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.h b/src/rgw/driver/d4n/rgw_sal_d4n.h index e43c0105265ac..8c6cdca147057 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.h +++ b/src/rgw/driver/d4n/rgw_sal_d4n.h @@ -101,6 +101,7 @@ class D4NFilterObject : public FilterObject { private: D4NFilterDriver* driver; std::string version; + std::string prefix; public: struct D4NFilterReadOp : FilterReadOp { @@ -108,7 +109,6 @@ class D4NFilterObject : public FilterObject { class D4NFilterGetCB: public RGWGetDataCB { private: D4NFilterDriver* filter; - std::string prefix; D4NFilterObject* source; RGWGetDataCB* client_cb; uint64_t ofs = 0, len = 0; @@ -130,7 +130,6 @@ class D4NFilterObject : public FilterObject { this->y = y; } void set_ofs(uint64_t ofs) { this->ofs = ofs; } - void set_prefix(const std::string& prefix) { this->prefix = prefix; } int flush_last_part(); void bypass_cache_write() { this->write_to_cache = false; } }; @@ -154,10 +153,11 @@ class D4NFilterObject : public FilterObject { std::unique_ptr aio; uint64_t offset = 0; // next offset to write to client rgw::AioResultList completed; // completed read results, sorted by offset + std::unordered_map> blocks_info; - int flush(const DoutPrefixProvider* dpp, rgw::AioResultList&& results); + int flush(const DoutPrefixProvider* dpp, rgw::AioResultList&& results, optional_yield y); void cancel(); - int drain(const DoutPrefixProvider* dpp); + int drain(const DoutPrefixProvider* dpp, optional_yield y); }; struct D4NFilterDeleteOp : FilterDeleteOp { @@ -209,6 +209,9 @@ class D4NFilterObject : public FilterObject { void set_object_version(const std::string& version) { this->version = version; } const std::string get_object_version() { return this->version; } + + void set_prefix(const std::string& prefix) { this->prefix = prefix; } + const std::string get_prefix() { return this->prefix; } }; class D4NFilterWriter : public FilterWriter { -- 2.39.5