From 89b55f95c462d0d71dfe8ef8751c9093af74d6a6 Mon Sep 17 00:00:00 2001
From: Pritha Srivastava
Date: Wed, 22 Nov 2023 13:20:34 +0530
Subject: [PATCH] rgw/d4n: using 'id_tag' as 'version' of an object for
 non-versioned objects. Using 'instance' of 'oid' as 'version' for versioned
 objects.

Signed-off-by: Pritha Srivastava
---
 src/rgw/driver/d4n/rgw_sal_d4n.cc | 203 ++++++++++++++++++++++--------
 src/rgw/driver/d4n/rgw_sal_d4n.h  |  16 ++-
 2 files changed, 163 insertions(+), 56 deletions(-)

diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.cc b/src/rgw/driver/d4n/rgw_sal_d4n.cc
index 622d91dac02..edb5efcca68 100644
--- a/src/rgw/driver/d4n/rgw_sal_d4n.cc
+++ b/src/rgw/driver/d4n/rgw_sal_d4n.cc
@@ -421,6 +421,21 @@ int D4NFilterObject::D4NFilterReadOp::prepare(optional_yield y, const DoutPrefixProvider* dpp)
     }
   }
 
+  //versioned objects have instance set to the versionId, and get_oid() returns an oid containing the instance, hence the id tag is used as the version for non-versioned objects only
+  if (! this->source->have_instance()) {
+    RGWObjState* state = nullptr;
+    if (this->source->get_obj_state(dpp, &state, y) == 0) {
+      auto it = state->attrset.find(RGW_ATTR_ID_TAG);
+      if (it != state->attrset.end()) {
+        bufferlist bl = it->second;
+        this->source->set_object_version(bl.c_str());
+        ldpp_dout(dpp, 20) << __func__ << "(): id tag version is: " << this->source->get_object_version() << dendl;
+      } else {
+        ldpp_dout(dpp, 20) << __func__ << "(): Failed to find id tag" << dendl;
+      }
+    }
+  }
+
   return ret;
 }
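For reference, the versioning rule the hunk above introduces can be restated on its own. This is a minimal sketch using plain standard-library stand-ins rather than the RGW types: versioned objects take their version from the instance (the S3 versionId, which get_oid() already embeds in the oid), while non-versioned objects fall back to the RGW_ATTR_ID_TAG xattr read in prepare().

    #include <optional>
    #include <string>

    // Standalone restatement of the versioning rule (stand-in types, not the
    // RGW API): versioned objects use the instance (== versionId); everything
    // else uses the id tag attribute, which identifies the current write.
    std::string choose_version(bool have_instance,
                               const std::string& instance,
                               const std::optional<std::string>& id_tag)
    {
      if (have_instance) {
        return instance;             // versioned: instance == versionId
      }
      return id_tag.value_or("");    // non-versioned: id_tag, empty if the attr is missing
    }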
@@ -478,12 +493,20 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int64_t ofs, int64_t end,
                                               RGWGetDataCB* cb, optional_yield y)
 {
   const uint64_t window_size = g_conf()->rgw_get_obj_window_size;
-  std::string oid = source->get_key().get_oid();
+  std::string version = source->get_object_version();
+  std::string prefix;
+  if (version.empty()) { //for versioned objects, version is empty since get_oid() already returns an oid with the versionId added
+    prefix = source->get_bucket()->get_name() + "_" + source->get_key().get_oid();
+  } else {
+    prefix = source->get_bucket()->get_name() + "_" + version + "_" + source->get_key().get_oid();
+  }
 
-  ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "oid: " << oid << " ofs: " << ofs << " end: " << end << dendl;
+  ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "prefix: " << prefix << dendl;
+  ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "oid: " << source->get_key().get_oid() << " ofs: " << ofs << " end: " << end << dendl;
 
   this->client_cb = cb;
-  this->cb->set_client_cb(cb, dpp, &y); // what's this for? -Sam
+  this->cb->set_client_cb(cb, dpp, &y);
+  this->cb->set_prefix(prefix);
 
   /* This algorithm stores chunks for ranged requests also in the cache, which might be smaller than obj_max_req_size
      One simplification could be to overwrite the smaller chunks with a bigger chunk of obj_max_req_size, and to serve requests for smaller
@@ -525,7 +548,12 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int64_t ofs, int64_t end,
     }
 
     ceph::bufferlist bl;
-    std::string oid_in_cache = source->get_bucket()->get_marker() + "_" + oid + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(part_len);
+    std::string oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(part_len);
+    if (version.empty()) {
+      version = source->get_instance();
+    }
+
+    ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "version stored in update method is: " << version << dendl;
 
     ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
@@ -534,7 +562,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int64_t ofs, int64_t end,
 
       // Read From Cache
       auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);
 
-      source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, part_len, "", y);
+      source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, part_len, version, y);
 
       ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
@@ -546,7 +574,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int64_t ofs, int64_t end,
         return r;
       }
     } else {
-      oid_in_cache = source->get_bucket()->get_marker() + "_" + oid + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(obj_max_req_size);
+      oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(obj_max_req_size);
       //for ranged requests, for the last part, the whole part might exist in the cache
 
       ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
@@ -555,7 +583,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int64_t ofs, int64_t end,
 
         // Read From Cache
         auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);
 
-        source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, obj_max_req_size, "", y);
+        source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, obj_max_req_size, version, y);
 
         ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
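Spelled out, the cache keys built above have a fixed layout, and the prefix now uses the bucket name rather than the bucket marker. The helper below is illustrative only, not D4N code, and the bucket, id_tag, and oid values in the usage comment are made up:

    #include <cstdint>
    #include <string>

    // Key layout used by iterate() above (illustrative helper):
    //   non-versioned object: "<bucket>_<id_tag>_<oid>_<ofs>_<len>"
    //   versioned object:     "<bucket>_<oid>_<ofs>_<len>"  (the oid already carries the versionId)
    std::string block_key(const std::string& bucket, const std::string& version,
                          const std::string& oid, uint64_t ofs, uint64_t len)
    {
      std::string prefix = version.empty() ? bucket + "_" + oid
                                           : bucket + "_" + version + "_" + oid;
      return prefix + "_" + std::to_string(ofs) + "_" + std::to_string(len);
    }

    // e.g. block_key("photos", "abc123xyz", "cat.jpg", 0, 4194304)
    //        == "photos_abc123xyz_cat.jpg_0_4194304"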
@@ -604,11 +632,8 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int64_t ofs, int64_t end,
       this->cb->bypass_cache_write();
     }
 
-    if (start_part_num == 0) {
-      this->cb->set_ofs(ofs);
-    } else {
-      this->cb->set_ofs(adjusted_start_ofs);
-      ofs = adjusted_start_ofs; // redundant? -Sam
+    if (start_part_num != 0) {
+      ofs = adjusted_start_ofs;
     }
 
     this->cb->set_ofs(ofs);
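A hypothetical walk-through of the ofs rewrite above, assuming, as the surrounding code suggests, that adjusted_start_ofs is the request offset rounded down to an obj_max_req_size boundary:

    #include <cstdint>

    // With 4 MiB chunks, a ranged GET starting at 6 MiB begins in part 1,
    // whose cached chunk starts at 4 MiB; ofs is therefore rewound to the
    // chunk boundary before iteration starts. Values are illustrative.
    constexpr uint64_t obj_max_req_size   = 4 * 1024 * 1024;  // 4 MiB
    constexpr uint64_t req_ofs            = 6 * 1024 * 1024;  // 6 MiB
    constexpr uint64_t start_part_num     = req_ofs / obj_max_req_size;         // 1
    constexpr uint64_t adjusted_start_ofs = start_part_num * obj_max_req_size;  // 4 MiB

    static_assert(start_part_num != 0);
    static_assert(adjusted_start_ofs == 4 * 1024 * 1024);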
@@ -643,28 +668,58 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
   //Accumulating data from backend store into rgw_get_obj_max_req_size sized chunks and then writing to cache
   if (write_to_cache) {
     const std::lock_guard l(d4n_get_data_lock);
-    rgw::d4n::CacheBlock block;
+    rgw::d4n::CacheBlock block, existing_block;
     rgw::d4n::BlockDirectory* blockDir = source->driver->get_block_dir();
-    block.version = ""; // TODO: initialize correctly
     block.hostsList.push_back(blockDir->cct->_conf->rgw_local_cache_address);
     block.cacheObj.objName = source->get_key().get_oid();
     block.cacheObj.bucketName = source->get_bucket()->get_name();
     block.cacheObj.creationTime = to_iso_8601(source->get_mtime());
     block.cacheObj.dirty = false;
+
+    //populating fields needed for building the directory index
+    existing_block.cacheObj.objName = block.cacheObj.objName;
+    existing_block.cacheObj.bucketName = block.cacheObj.bucketName;
     Attrs attrs; // empty attrs for cache sets
+    std::string version = source->get_object_version();
+    if (version.empty()) {
+      version = source->get_instance();
+    }
+    ldpp_dout(dpp, 20) << __func__ << ": version stored in update method is: " << version << dendl;
 
     if (bl.length() > 0 && last_part) { // if bl = bl_rem has data and this is the last part, write it to cache
-      std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
-      block.blockID = ofs;
-      block.size = bl.length();
-      if (filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y) == 0) {
-        if (filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), attrs) == 0) {
-          filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), "", *y);
-
-          /* Store block in directory */
-          if (!blockDir->exist_key(&block, *y)) { // If the block exists, do we want to update anything else? -Sam
-            if (blockDir->set(&block, *y) < 0)
-              ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+      std::string oid = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
+      if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) { //In case of concurrent reads for the same object, the block may already be cached
+        block.blockID = ofs;
+        block.size = bl.length();
+        block.version = version;
+        auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
+        if (ret == 0) {
+          //Should we replace each put_async with put, to ensure data is actually written to the cache before updating the data structures and before the lock is released?
+          ret = filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), attrs);
+          if (ret == 0) {
+            filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, *y);
+
+            /* Store block in directory */
+            if (!blockDir->exist_key(&block, *y)) {
+              if (blockDir->set(&block, *y) < 0) //should we revert the previous steps if this step fails?
+                ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+            } else {
+              existing_block.blockID = block.blockID;
+              existing_block.size = block.size;
+              if (blockDir->get(&existing_block, *y) < 0) {
+                ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
+              } else {
+                if (existing_block.version != block.version) {
+                  if (blockDir->del(&existing_block, *y) < 0) //delete the existing block
+                    ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory del method failed." << dendl;
+                  if (blockDir->set(&block, *y) < 0) //the new versioned block will have a new version, hostsList etc, how about globalWeight?
+                    ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+                } else {
+                  if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0)
+                    ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed for hostsList." << dendl;
+                }
+              }
+            }
           } else {
             if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0)
               ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl;
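The same exist_key/get/del/set/update_field ladder recurs in all three write paths of handle_data(). Condensed, the directory reconciliation reads as below; Dir and Block are stand-ins for BlockDirectory and CacheBlock, not the real interfaces, and the stub bodies exist only so the sketch compiles:

    #include <string>

    // Stand-in types; illustrative only.
    struct Block { std::string key; std::string version; };
    struct Dir {
      bool exist(const Block&)                        { return false; }
      int  get(Block&)                                { return 0; }
      int  set(const Block&)                          { return 0; }
      int  del(const Block&)                          { return 0; }
      int  add_host(const Block&, const std::string&) { return 0; }
    };

    // A new block either creates a directory entry, replaces an entry whose
    // version is stale, or merely appends this cache host to the existing one.
    void reconcile(Dir& dir, const Block& block, const std::string& local_host)
    {
      if (!dir.exist(block)) {
        dir.set(block);                   // first sighting: create the entry
        return;
      }
      Block existing{block.key, ""};      // same key; version filled in by get()
      if (dir.get(existing) < 0) {
        return;                           // cannot compare versions; leave the entry alone
      }
      if (existing.version != block.version) {
        dir.del(existing);                // stale version: drop and re-create
        dir.set(block);
      } else {
        dir.add_host(block, local_host);  // same version: just record this host
      }
    }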
@@ -672,18 +727,38 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
         }
       }
     } else if (bl.length() == rgw_get_obj_max_req_size && bl_rem.length() == 0) { // if bl is the same size as rgw_get_obj_max_req_size, write it to cache
-      std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
+      std::string oid = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
       ofs += bl_len;
       block.blockID = ofs;
       block.size = bl.length();
-      if (filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y) == 0) {
-        if (filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), attrs) == 0) {
-          filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), "", *y);
-
-          /* Store block in directory */
-          if (!blockDir->exist_key(&block, *y)) {
-            if (blockDir->set(&block, *y) < 0)
-              ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+      block.version = version;
+      if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) { //In case of concurrent reads for the same object, the block may already be cached
+        auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
+        if (ret == 0) {
+          ret = filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), attrs);
+          if (ret == 0) {
+            filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, *y);
+
+            /* Store block in directory */
+            if (!blockDir->exist_key(&block, *y)) {
+              if (blockDir->set(&block, *y) < 0)
+                ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+            } else {
+              existing_block.blockID = block.blockID;
+              existing_block.size = block.size;
+              if (blockDir->get(&existing_block, *y) < 0) {
+                ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
+              } else {
+                if (existing_block.version != block.version) {
+                  if (blockDir->del(&existing_block, *y) < 0)
+                    ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory del method failed." << dendl;
+                  if (blockDir->set(&block, *y) < 0)
+                    ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+                } else {
+                  if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0)
+                    ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed for blockHosts." << dendl;
+                }
+              }
+            }
          } else {
             if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0)
               ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl;
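A minimal model of the exist_key() guard used above: under d4n_get_data_lock, each block key is claimed at most once within a gateway, so of two concurrent reads of the same object only the first proceeds to eviction and the cache write. The std::unordered_set stands in for the cache policy's key index; this is an assumption about its behavior, not the real policy code:

    #include <mutex>
    #include <string>
    #include <unordered_set>

    std::mutex d4n_get_data_lock;                 // models the lock in handle_data()
    std::unordered_set<std::string> cached_keys;  // models the policy's key index

    bool try_claim(const std::string& key)
    {
      const std::lock_guard<std::mutex> l(d4n_get_data_lock);
      // insert() returns {iterator, false} when the key already exists, so
      // only the first caller goes on to eviction() and put_async().
      return cached_keys.insert(key).second;
    }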
@@ -699,31 +774,59 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
       bl_rem.claim_append(bl_copy);
 
       if (bl_rem.length() == rgw_get_obj_max_req_size) {
-        std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_rem.length());
-        ofs += bl_rem.length();
-        block.blockID = ofs;
-        block.size = bl_rem.length();
-        if (filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y) == 0) {
-          if (filter->get_cache_driver()->put_async(dpp, oid, bl_rem, bl_rem.length(), attrs) == 0) {
-            filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), "", *y);
-
-            /* Store block in directory */
-            if (!blockDir->exist_key(&block, *y)) {
-              if (blockDir->set(&block, *y) < 0)
-                ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+        std::string oid = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_rem.length());
+        if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) { //In case of concurrent reads for the same object, the block may already be cached
+          ofs += bl_rem.length();
+          block.blockID = ofs;
+          block.size = bl_rem.length();
+          block.version = version;
+          auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
+          if (ret == 0) {
+            ret = filter->get_cache_driver()->put_async(dpp, oid, bl_rem, bl_rem.length(), attrs);
+            if (ret == 0) {
+              filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), version, *y);
+
+              /* Store block in directory */
+              if (!blockDir->exist_key(&block, *y)) {
+                if (blockDir->set(&block, *y) < 0)
+                  ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+              } else {
+                existing_block.blockID = block.blockID;
+                existing_block.size = block.size;
+                if (blockDir->get(&existing_block, *y) < 0) {
+                  ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
+                } else {
+                  if (existing_block.version != block.version) {
+                    if (blockDir->del(&existing_block, *y) < 0)
+                      ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory del method failed." << dendl;
+                    if (blockDir->set(&block, *y) < 0)
+                      ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+                  } else {
+                    if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0)
+                      ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl;
+                  }
+                }
+              }
             } else {
-              if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0)
-                ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl;
-            }
+              ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " An error occurred while caching oid: " << oid << " error: " << ret << dendl;
+            }
+          } else {
+            ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " An error occurred during eviction, error: " << ret << dendl;
          }
        }
 
        bl_rem.clear();
        bl_rem = std::move(bl);
-      }
+      }//bl_rem.length()
    }
  }
 
+  /* Clean-up:
+     1. do we need to clean up older versions in the cache backend, when we update the version in the block directory?
+     2. do we need to clean up keys belonging to older versions (the last blocks), in case the size of the newer version is different?
+     3. do we need to revert the cache ops, in case the directory ops fail?
+  */
+
   return 0;
 }
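On clean-up question 3 above, one possible shape is a small scope guard that undoes the cache write unless the directory ops complete. This is a sketch, not an existing D4N helper, and the cache/dir handles in the usage comment are hypothetical:

    #include <functional>
    #include <utility>

    // Runs the undo action at scope exit unless dismiss() was called after
    // the directory ops succeeded.
    class ScopeGuard {
      std::function<void()> undo;
      bool armed = true;
    public:
      explicit ScopeGuard(std::function<void()> f) : undo(std::move(f)) {}
      ~ScopeGuard() { if (armed) undo(); }
      void dismiss() { armed = false; }
      ScopeGuard(const ScopeGuard&) = delete;
      ScopeGuard& operator=(const ScopeGuard&) = delete;
    };

    // Usage sketch:
    //   cache.put(oid, bl);
    //   ScopeGuard g([&] { cache.del(oid); });  // revert the cache write on failure
    //   if (dir.set(block) == 0)
    //     g.dismiss();                          // directory indexed; keep the data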
diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.h b/src/rgw/driver/d4n/rgw_sal_d4n.h
index fc7f844ee7c..5f785fb65cb 100644
--- a/src/rgw/driver/d4n/rgw_sal_d4n.h
+++ b/src/rgw/driver/d4n/rgw_sal_d4n.h
@@ -97,6 +97,7 @@ class D4NFilterBucket : public FilterBucket {
 class D4NFilterObject : public FilterObject {
   private:
     D4NFilterDriver* driver;
+    std::string version;
 
   public:
     struct D4NFilterReadOp : FilterReadOp {
@@ -104,7 +105,7 @@ class D4NFilterObject : public FilterObject {
       class D4NFilterGetCB: public RGWGetDataCB {
         private:
           D4NFilterDriver* filter;
-          std::string oid;
+          std::string prefix;
           D4NFilterObject* source;
           RGWGetDataCB* client_cb;
           uint64_t ofs = 0, len = 0;
@@ -116,8 +117,8 @@ class D4NFilterObject : public FilterObject {
           optional_yield* y;
 
         public:
-          D4NFilterGetCB(D4NFilterDriver* _filter, std::string& _oid, D4NFilterObject* _source) : filter(_filter),
-                                                                                                  oid(_oid), source(_source) {}
+          D4NFilterGetCB(D4NFilterDriver* _filter, D4NFilterObject* _source) : filter(_filter),
+                                                                               source(_source) {}
 
           int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override;
           void set_client_cb(RGWGetDataCB* client_cb, const DoutPrefixProvider* dpp, optional_yield* y) {
@@ -126,6 +127,7 @@ class D4NFilterObject : public FilterObject {
             this->y = y;
           }
           void set_ofs(uint64_t ofs) { this->ofs = ofs; }
+          void set_prefix(const std::string& prefix) { this->prefix = prefix; }
           int flush_last_part();
           void bypass_cache_write() { this->write_to_cache = false; }
       };
@@ -135,9 +137,8 @@ class D4NFilterObject : public FilterObject {
 
       D4NFilterReadOp(std::unique_ptr<ReadOp> _next, D4NFilterObject* _source) : FilterReadOp(std::move(_next)),
                                                                                  source(_source)
       {
-        std::string oid = source->get_bucket()->get_marker() + "_" + source->get_key().get_oid();
-        cb = std::make_unique<D4NFilterGetCB>(source->driver, oid, source);
-      }
+        cb = std::make_unique<D4NFilterGetCB>(source->driver, source);
+      }
       virtual ~D4NFilterReadOp() = default;
 
       virtual int prepare(optional_yield y, const DoutPrefixProvider* dpp) override;
@@ -202,6 +203,9 @@ class D4NFilterObject : public FilterObject {
 
     virtual std::unique_ptr<ReadOp> get_read_op() override;
     virtual std::unique_ptr<DeleteOp> get_delete_op() override;
+
+    void set_object_version(const std::string& version) { this->version = version; }
+    const std::string get_object_version() { return this->version; }
 };
 
 class D4NFilterWriter : public FilterWriter {
-- 
2.39.5