From: Pritha Srivastava Date: Tue, 11 Jul 2023 15:32:20 +0000 (+0530) Subject: rgw/d4n: completing the read flow for a local read cache, X-Git-Tag: testing/wip-batrick-testing-20240411.154038~45^2~68 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=5052558ddfc4900f53a5374396db4793c7829685;p=ceph-ci.git rgw/d4n: completing the read flow for a local read cache, with ssd cache driver as the backend. Signed-off-by: Pritha Srivastava --- diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.cc b/src/rgw/driver/d4n/rgw_sal_d4n.cc index 7676bf93ac6..4a2e0034e62 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.cc +++ b/src/rgw/driver/d4n/rgw_sal_d4n.cc @@ -13,6 +13,8 @@ * */ +#include "rgw_redis_driver.h" +#include "rgw_ssd_driver.h" #include "rgw_sal_d4n.h" #define dout_subsys ceph_subsys_rgw @@ -36,6 +38,30 @@ static inline Object* nextObject(Object* t) return dynamic_cast(t)->get_next(); } +D4NFilterDriver::D4NFilterDriver(Driver* _next) : FilterDriver(_next) +{ + rgw::cache::Partition partition_info; + partition_info.location = g_conf()->rgw_d3n_l1_datacache_persistent_path; + partition_info.name = "d4n"; + partition_info.type = "read-cache"; + partition_info.size = g_conf()->rgw_d3n_l1_datacache_size; + + cacheDriver = new rgw::cache::SSDDriver(partition_info); + objDir = new rgw::d4n::ObjectDirectory(); + blockDir = new rgw::d4n::BlockDirectory(); + cacheBlock = new rgw::d4n::CacheBlock(); + policyDriver = new rgw::d4n::PolicyDriver("lfuda"); +} + + D4NFilterDriver::~D4NFilterDriver() + { + delete cacheDriver; + delete objDir; + delete blockDir; + delete cacheBlock; + delete policyDriver; +} + int D4NFilterDriver::initialize(CephContext *cct, const DoutPrefixProvider *dpp) { FilterDriver::initialize(cct, dpp); @@ -438,6 +464,7 @@ int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw:: completed.pop_front_and_dispose(std::default_delete{}); } + ldpp_dout(dpp, 20) << "D4NFilterObject::returning from flush:: " << dendl; return 0; } @@ -462,45 +489,59 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int //len_to_read is the actual length read from a part/ chunk in cache, while part_len is the length of the chunk/ part in cache uint64_t cost = 0, len_to_read = 0, part_len = 0; - if (y) { - aio = rgw::make_throttle(window_size, y); + aio = rgw::make_throttle(window_size, y); - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "obj_max_req_size " << obj_max_req_size << - " num_parts " << num_parts << " adjusted_start_offset: " << adjusted_start_ofs << " len: " << len << dendl; + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "obj_max_req_size " << obj_max_req_size << + " num_parts " << num_parts << " adjusted_start_offset: " << adjusted_start_ofs << " len: " << len << dendl; - this->offset = ofs; + this->offset = ofs; - do { - uint64_t id = adjusted_start_ofs; - if (start_part_num == (num_parts - 1)) { - len_to_read = len; - part_len = len; - cost = len; - } else { - len_to_read = obj_max_req_size; - cost = obj_max_req_size; - part_len = obj_max_req_size; - } - if (start_part_num == 0) { - len_to_read -= diff_ofs; - id += diff_ofs; - } + do { + uint64_t id = adjusted_start_ofs; + if (start_part_num == (num_parts - 1)) { + len_to_read = len; + part_len = len; + cost = len; + } else { + len_to_read = obj_max_req_size; + cost = obj_max_req_size; + part_len = obj_max_req_size; + } + if (start_part_num == 0) { + len_to_read -= diff_ofs; + id += diff_ofs; + } + + uint64_t read_ofs = diff_ofs; //read_ofs is the actual offset to start reading from the current part/ chunk + ceph::bufferlist bl; + std::string oid_in_cache = source->get_bucket()->get_marker() + "_" + oid + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(part_len); + + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << + " read_ofs: " << read_ofs << " part len: " << part_len << dendl; - uint64_t read_ofs = diff_ofs; //read_ofs is the actual offset to start reading from the current part/ chunk - ceph::bufferlist bl; - rgw_raw_obj r_obj; - r_obj.oid = oid; + if (source->driver->get_cache_driver()->key_exists(dpp, oid_in_cache)) { + // Read From Cache + auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id); - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << - oid << " length to read is: " << len_to_read << " part num: " << start_part_num << - " read_ofs: " << read_ofs << " part len: " << part_len << dendl; + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl; + + auto r = flush(dpp, std::move(completed)); + + if (r < 0) { + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl; + return r; + } + } else { + oid_in_cache = source->get_bucket()->get_marker() + "_" + oid + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(obj_max_req_size); + //for ranged requests, for last part, the whole part might exist in the cache + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << + " read_ofs: " << read_ofs << " part len: " << part_len << dendl; - if (source->driver->get_cache_driver()->get(dpp, oid, ofs, part_len, bl, source->get_attrs()) == 0) { + if ((part_len != obj_max_req_size) && source->driver->get_cache_driver()->key_exists(dpp, oid_in_cache)) { // Read From Cache - auto completed = aio->get(r_obj, rgw::Aio::cache_read_op(dpp, y, source->driver->get_cache_driver(), - read_ofs, len_to_read, oid), cost, id); + auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id); - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid << dendl; + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl; auto r = flush(dpp, std::move(completed)); @@ -508,50 +549,33 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl; return r; } - } else { - //for ranged requests, for last part, the whole part might exist in the cache - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << - oid << " length to read is: " << len_to_read << " part num: " << start_part_num << - " read_ofs: " << read_ofs << " part len: " << part_len << dendl; - - if (source->driver->get_cache_driver()->get(dpp, oid, ofs, obj_max_req_size, bl, source->get_attrs()) == 0) { - // Read From Cache - auto completed = aio->get(r_obj, rgw::Aio::cache_read_op(dpp, y, source->driver->get_cache_driver(), - read_ofs, len_to_read, oid), cost, id); - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid << dendl; + } else { + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl; - auto r = flush(dpp, std::move(completed)); + auto r = drain(dpp); - if (r < 0) { - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl; - return r; - } - } else { - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid << dendl; + if (r < 0) { + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, r= " << r << dendl; + return r; + } - auto r = drain(dpp); + break; + } + } - if (r < 0) { - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, r= " << r << dendl; - return r; - } + if (start_part_num == (num_parts - 1)) { + ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl; + return drain(dpp); + } else { + adjusted_start_ofs += obj_max_req_size; + } - break; - } - } + start_part_num += 1; + len -= obj_max_req_size; + } while (start_part_num < num_parts); - if (start_part_num == (num_parts - 1)) { - ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid << dendl; - return drain(dpp); - } else { - adjusted_start_ofs += obj_max_req_size; - } - start_part_num += 1; - len -= obj_max_req_size; - } while (start_part_num < num_parts); - } ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Fetching object from backend store" << dendl; @@ -716,11 +740,12 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl const std::lock_guard l(d3n_get_data.d3n_lock); if (bl.length() > 0 && last_part) { // if bl = bl_rem has data and this is the last part, write it to cache - filter->get_cache_driver()->put(save_dpp, this->oid, bl, bl.length(), source->get_attrs()); + std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len); + filter->get_cache_driver()->put_async(save_dpp, oid, bl, bl.length(), source->get_attrs()); } else if (bl.length() == rgw_get_obj_max_req_size && bl_rem.length() == 0) { // if bl is the same size as rgw_get_obj_max_req_size, write it to cache + std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len); ofs += bl_len; - - filter->get_cache_driver()->put(save_dpp, this->oid, bl, bl.length(), source->get_attrs()); + filter->get_cache_driver()->put_async(save_dpp, oid, bl, bl.length(), source->get_attrs()); } else { //copy data from incoming bl to bl_rem till it is rgw_get_obj_max_req_size, and then write it to cache uint64_t rem_space = rgw_get_obj_max_req_size - bl_rem.length(); uint64_t len_to_copy = rem_space > bl.length() ? bl.length() : rem_space; @@ -730,9 +755,10 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl bl_rem.claim_append(bl_copy); if (bl_rem.length() == g_conf()->rgw_get_obj_max_req_size) { + std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_rem.length()); ofs += bl_rem.length(); - filter->get_cache_driver()->put(save_dpp, this->oid, bl_rem, bl_rem.length(), source->get_attrs()); + filter->get_cache_driver()->put_async(save_dpp, oid, bl_rem, bl_rem.length(), source->get_attrs()); bl_rem.clear(); bl_rem = std::move(bl); diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.h b/src/rgw/driver/d4n/rgw_sal_d4n.h index 1a66d1a025d..d51d5ed0e9c 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.h +++ b/src/rgw/driver/d4n/rgw_sal_d4n.h @@ -22,7 +22,6 @@ #include "common/dout.h" #include "rgw_aio_throttle.h" -#include "rgw_redis_driver.h" #include "driver/d4n/d4n_directory.h" #include "driver/d4n/d4n_policy.h" @@ -37,24 +36,9 @@ class D4NFilterDriver : public FilterDriver { rgw::d4n::PolicyDriver* policyDriver; public: - D4NFilterDriver(Driver* _next) : FilterDriver(_next) - { - rgw::cache::Partition partition_info; - partition_info.location = "RedisCache"; // figure out how to fill rest of partition information -Sam - - cacheDriver = new rgw::cache::RedisDriver(partition_info); // change later -Sam - objDir = new rgw::d4n::ObjectDirectory(); - blockDir = new rgw::d4n::BlockDirectory(); - cacheBlock = new rgw::d4n::CacheBlock(); - policyDriver = new rgw::d4n::PolicyDriver("lfuda"); - } - virtual ~D4NFilterDriver() { - delete cacheDriver; - delete objDir; - delete blockDir; - delete cacheBlock; - delete policyDriver; - } + D4NFilterDriver(Driver* _next); + + virtual ~D4NFilterDriver(); virtual int initialize(CephContext *cct, const DoutPrefixProvider *dpp) override; virtual std::unique_ptr get_user(const rgw_user& u) override; diff --git a/src/rgw/rgw_ssd_driver.cc b/src/rgw/rgw_ssd_driver.cc index 8aa7466e328..5cd2ccf0d95 100644 --- a/src/rgw/rgw_ssd_driver.cc +++ b/src/rgw/rgw_ssd_driver.cc @@ -276,6 +276,15 @@ int SSDDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string& ke return remove_entry(dpp, key); } +int SSDDriver::append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data) +{ + std::string location = partition_info.location + key; + + //TODO - Implement append_data + + return 0; +} + int SSDDriver::AsyncWriteRequest::prepare_libaio_write_op(const DoutPrefixProvider *dpp, bufferlist& bl, unsigned int len, std::string key, std::string cache_location) { std::string location = cache_location + key; @@ -525,4 +534,9 @@ void SSDCacheAioRequest::cache_aio_read(const DoutPrefixProvider* dpp, optional_ cache_driver->get_async(dpp, y.get_io_context(), key, ofs, len, bind_executor(ex, SSDDriver::libaio_handler{aio, r})); } +void SSDCacheAioRequest::cache_aio_write(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, bufferlist& bl, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) +{ + //TODO - implement cache_aio_write +} + } } // namespace rgw::cache diff --git a/src/rgw/rgw_ssd_driver.h b/src/rgw/rgw_ssd_driver.h index bc3efedab03..81a60442869 100644 --- a/src/rgw/rgw_ssd_driver.h +++ b/src/rgw/rgw_ssd_driver.h @@ -28,7 +28,7 @@ public: virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs) override; virtual rgw::AioResultList get_async (const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override; virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) override; - virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data) = 0; + virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data); virtual int delete_data(const DoutPrefixProvider* dpp, const::std::string& key) override; virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) override; virtual int set_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) override; @@ -65,7 +65,7 @@ public: auto get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key, off_t read_ofs, off_t read_len, CompletionToken&& token); protected: - static std::unordered_map partitions; + inline static std::unordered_map partitions; std::unordered_map entries; Partition partition_info; uint64_t free_space;