From: Pritha Srivastava Date: Wed, 25 Sep 2024 09:19:08 +0000 (+0530) Subject: rgw/d4n: support for dirty objects in delete object method. X-Git-Tag: testing/wip-rishabh-testing-20250426.123842-debug~14^2~15 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=8a7519d0c1b0b563863247f6e20812df6b6c037c;p=ceph-ci.git rgw/d4n: support for dirty objects in delete object method. 1. ordered set to maintain versions of a dirty object 2. creation of delete marker in case of a simple delete request 3. deletion of a specific version from the ordered set 4. cleaning method deletes from ordered set for dirty objects 5. use of redis atomicity constructs wherever needed Signed-off-by: Pritha Srivastava --- diff --git a/src/rgw/driver/d4n/d4n_directory.cc b/src/rgw/driver/d4n/d4n_directory.cc index feee1a84472..e7d481a29aa 100644 --- a/src/rgw/driver/d4n/d4n_directory.cc +++ b/src/rgw/driver/d4n/d4n_directory.cc @@ -332,7 +332,7 @@ int ObjectDirectory::update_field(const DoutPrefixProvider* dpp, CacheObj* objec } } -int ObjectDirectory::zadd(const DoutPrefixProvider* dpp, CacheObj* object, double score, const std::string& member, optional_yield y) +int ObjectDirectory::zadd(const DoutPrefixProvider* dpp, CacheObj* object, double score, const std::string& member, optional_yield y, bool multi) { std::string key = build_index(object); try { @@ -348,9 +348,11 @@ int ObjectDirectory::zadd(const DoutPrefixProvider* dpp, CacheObj* object, doubl return -ec.value(); } - if (std::get<0>(resp).value() != "1") { - ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl; - return -EINVAL; + if (!multi) { + if (std::get<0>(resp).value() != "1") { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl; + return -EINVAL; + } } } catch (std::exception &e) { @@ -409,11 +411,6 @@ int ObjectDirectory::zrevrange(const DoutPrefixProvider* dpp, CacheObj* object, 
return -ec.value(); } - if (std::get<0>(resp).value().empty()) { - ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Empty response" << dendl; - return -EINVAL; - } - members = std::get<0>(resp).value(); } catch (std::exception &e) { @@ -424,7 +421,7 @@ int ObjectDirectory::zrevrange(const DoutPrefixProvider* dpp, CacheObj* object, return 0; } -int ObjectDirectory::zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y) +int ObjectDirectory::zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y, bool multi) { std::string key = build_index(object); try { @@ -440,9 +437,42 @@ int ObjectDirectory::zrem(const DoutPrefixProvider* dpp, CacheObj* object, const return -ec.value(); } - if (std::get<0>(resp).value() != "1") { - ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl; - return -EINVAL; + if (!multi) { + if (std::get<0>(resp).value() != "1") { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl; + return -EINVAL; + } + } + + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + + return 0; +} + +int ObjectDirectory::zremrangebyscore(const DoutPrefixProvider* dpp, CacheObj* object, double min, double max, optional_yield y, bool multi) +{ + std::string key = build_index(object); + try { + boost::system::error_code ec; + request req; + req.push("ZREMRANGEBYSCORE", key, std::to_string(min), std::to_string(max)); + response resp; + + redis_exec(conn, ec, req, resp, y); + + if (ec) { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + + if (!multi) { + if (std::get<0>(resp).value() == "0") { + ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() No element removed!" 
<< dendl; + return -EINVAL; + } } } catch (std::exception &e) { @@ -839,7 +869,7 @@ int BlockDirectory::remove_host(const DoutPrefixProvider* dpp, CacheBlock* block return 0; } -int BlockDirectory::zadd(const DoutPrefixProvider* dpp, CacheBlock* block, double score, const std::string& member, optional_yield y) +int BlockDirectory::zadd(const DoutPrefixProvider* dpp, CacheBlock* block, double score, const std::string& member, optional_yield y, bool multi) { std::string key = build_index(block); try { @@ -854,10 +884,11 @@ int BlockDirectory::zadd(const DoutPrefixProvider* dpp, CacheBlock* block, doubl ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; return -ec.value(); } - - if (std::get<0>(resp).value() != "1") { - ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl; - return -EINVAL; + if (!multi) { + if (std::get<0>(resp).value() != "1") { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response value is: " << std::get<0>(resp).value() << dendl; + return -EINVAL; + } } } catch (std::exception &e) { @@ -1076,4 +1107,32 @@ int BlockDirectory::discard(const DoutPrefixProvider* dpp, optional_yield y) return 0; } +int BlockDirectory::unwatch(const DoutPrefixProvider* dpp, optional_yield y) +{ + try { + boost::system::error_code ec; + request req; + req.push("UNWATCH"); + response resp; + + redis_exec(conn, ec, req, resp, y); + + if (ec) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl; + return -ec.value(); + } + + if (std::get<0>(resp).value() != "OK") { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() Response is: " << std::get<0>(resp).value() << dendl; + return -EINVAL; + } + + } catch (std::exception &e) { + ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << e.what() << dendl; + return -EINVAL; + } + + return 0; +} + } } // namespace rgw::d4n diff --git 
a/src/rgw/driver/d4n/d4n_directory.h b/src/rgw/driver/d4n/d4n_directory.h index 1a262b38eb2..8319ba93587 100644 --- a/src/rgw/driver/d4n/d4n_directory.h +++ b/src/rgw/driver/d4n/d4n_directory.h @@ -48,10 +48,11 @@ class ObjectDirectory: public Directory { int copy(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& copyName, const std::string& copyBucketName, optional_yield y); int del(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y); int update_field(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& field, std::string& value, optional_yield y); - int zadd(const DoutPrefixProvider* dpp, CacheObj* object, double score, const std::string& member, optional_yield y); + int zadd(const DoutPrefixProvider* dpp, CacheObj* object, double score, const std::string& member, optional_yield y, bool multi=false); int zrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector& members, optional_yield y); int zrevrange(const DoutPrefixProvider* dpp, CacheObj* object, int start, int stop, std::vector& members, optional_yield y); - int zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y); + int zrem(const DoutPrefixProvider* dpp, CacheObj* object, const std::string& member, optional_yield y, bool multi=false); + int zremrangebyscore(const DoutPrefixProvider* dpp, CacheObj* object, double min, double max, optional_yield y, bool multi=false); //Return value is the incremented value, else return error int incr(const DoutPrefixProvider* dpp, CacheObj* object, optional_yield y); @@ -73,14 +74,16 @@ class BlockDirectory: public Directory { int del(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y, bool multi=false); int update_field(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& field, std::string& value, optional_yield y); int remove_host(const DoutPrefixProvider* dpp, CacheBlock* block, std::string& value, optional_yield 
y); - int zadd(const DoutPrefixProvider* dpp, CacheBlock* block, double score, const std::string& member, optional_yield y); + int zadd(const DoutPrefixProvider* dpp, CacheBlock* block, double score, const std::string& member, optional_yield y, bool multi=false); int zrange(const DoutPrefixProvider* dpp, CacheBlock* block, int start, int stop, std::vector& members, optional_yield y); int zrevrange(const DoutPrefixProvider* dpp, CacheBlock* block, int start, int stop, std::vector& members, optional_yield y); int zrem(const DoutPrefixProvider* dpp, CacheBlock* block, const std::string& member, optional_yield y); int watch(const DoutPrefixProvider* dpp, CacheBlock* block, optional_yield y); + //Move MULTI, EXEC and DISCARD to directory? As they do not operate on a key int exec(const DoutPrefixProvider* dpp, std::vector& responses, optional_yield y); int multi(const DoutPrefixProvider* dpp, optional_yield y); int discard(const DoutPrefixProvider* dpp, optional_yield y); + int unwatch(const DoutPrefixProvider* dpp, optional_yield y); private: std::shared_ptr conn; diff --git a/src/rgw/driver/d4n/d4n_policy.cc b/src/rgw/driver/d4n/d4n_policy.cc index 4fd195e398f..7a39a0ff5a8 100644 --- a/src/rgw/driver/d4n/d4n_policy.cc +++ b/src/rgw/driver/d4n/d4n_policy.cc @@ -404,12 +404,12 @@ void LFUDAPolicy::update(const DoutPrefixProvider* dpp, const std::string& key, weightSum += ((localWeight < 0) ? 
0 : localWeight); } -void LFUDAPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, optional_yield y) +void LFUDAPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, optional_yield y) { using handle_type = boost::heap::fibonacci_heap>>::handle_type; ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Before acquiring lock, adding entry: " << key << dendl; const std::lock_guard l(lfuda_cleaning_lock); - LFUDAObjEntry* e = new LFUDAObjEntry{key, version, dirty, size, creationTime, user, etag, bucket_name, bucket_id, obj_key}; + LFUDAObjEntry* e = new LFUDAObjEntry{key, version, deleteMarker, size, creationTime, user, etag, bucket_name, bucket_id, obj_key}; handle_type handle = object_heap.push(e); e->set_handle(handle); o_entries_map.emplace(key, e); @@ -467,14 +467,14 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp) continue; } ldpp_dout(dpp, 10) <<__LINE__ << " " << __func__ << "(): e->key=" << e->key << dendl; - ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->dirty=" << e->dirty << dendl; + ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->delete_marker=" << e->delete_marker << dendl; ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->version=" << e->version << dendl; ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->bucket_name=" << e->bucket_name << dendl; ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->bucket_id=" << e->bucket_id << dendl; ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): 
e->user=" << e->user << dendl; ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->obj_key=" << e->obj_key << dendl; l.unlock(); - if (!e->key.empty() && (e->dirty == true) && (std::difftime(time(NULL), e->creationTime) > interval)) { //if block is dirty and written more than interval seconds ago + if (!e->key.empty() && (std::difftime(time(NULL), e->creationTime) > interval)) { //if block is dirty and written more than interval seconds ago rgw_user c_rgw_user = e->user; //writing data to the backend //we need to create an atomic_writer @@ -605,8 +605,7 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp) } while(fst < lst); cacheDriver->rename(dpp, head_oid_in_cache, new_head_oid_in_cache, null_yield); - //data is clean now, updating in-memory metadata for an object - e->dirty = false; + //invoke update() with dirty flag set to false, to update in-memory metadata for head this->update(dpp, new_head_oid_in_cache, 0, 0, e->version, false, y); @@ -642,6 +641,63 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp) ldpp_dout(dpp, 20) << __func__ << "updating dirty flag in block directory for instance block failed!" 
<< dendl; } } + + //the next steps remove the entry from the ordered set and if needed the latest hash entry also in case of versioned buckets + if (!c_obj->have_instance()) { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << std::setprecision(std::numeric_limits::max_digits10) << e->creationTime << " from ordered set" << dendl; + rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{ + .objName = c_obj->get_name(), + .bucketName = c_obj->get_bucket()->get_bucket_id(), + }; + ret = objDir->zremrangebyscore(dpp, &dir_obj, e->creationTime, e->creationTime, y, true); + if (ret < 0) { + ldpp_dout(dpp, 0) << __func__ << "(): Failed to remove object from ordered set with error: " << ret << dendl; + } + } else { + rgw::d4n::CacheBlock latest_block = block; + latest_block.cacheObj.objName = c_obj->get_name(); + //add watch on latest entry, as it can be modified by a put or a del + ret = blockDir->watch(dpp, &latest_block, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << __func__ << "(): Failed to add a watch on: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl; + } + int retry = 3; + while(retry) { + retry--; + //get latest entry + ret = blockDir->get(dpp, &latest_block, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << __func__ << "(): Failed to get latest entry in block directory for: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl; + } + //start redis transaction using MULTI + blockDir->multi(dpp, y); + if (latest_block.version == e->version) { + //remove object entry from ordered set + if (c_obj->have_instance()) { + blockDir->del(dpp, &latest_block, y, true); + if (ret < 0) { + ldpp_dout(dpp, 0) << __func__ << "(): Failed to queue del for latest hash entry: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl; + } + } + } + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << 
std::setprecision(std::numeric_limits::max_digits10) << e->creationTime << " from ordered set" << dendl; + rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{ + .objName = c_obj->get_name(), + .bucketName = c_obj->get_bucket()->get_bucket_id(), + }; + ret = objDir->zremrangebyscore(dpp, &dir_obj, e->creationTime, e->creationTime, y, true); + if (ret < 0) { + ldpp_dout(dpp, 0) << __func__ << "(): Failed to remove object from ordered set with error: " << ret << dendl; + } + std::vector responses; + ret = blockDir->exec(dpp, responses, y); + if (responses.empty()) { + ldpp_dout(dpp, 0) << __func__ << "(): Execute responses are empty hence continuing!" << dendl; + continue; + } + break; + }//end-while (retry) + } //remove entry from map and queue, erase_dirty_object locks correctly erase_dirty_object(dpp, e->key, null_yield); } else { //end-if std::difftime(time(NULL), e->creationTime) > interval @@ -689,11 +745,11 @@ void LRUPolicy::update(const DoutPrefixProvider* dpp, const std::string& key, ui entries_map.emplace(key, e); } -void LRUPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size, time_t creationTime, const rgw_user& user, std::string& etag, const std::string& bucket_name, const std::string& bucket_id, +void LRUPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, optional_yield y) { const std::lock_guard l(lru_lock); - ObjEntry* e = new ObjEntry(key, version, dirty, size, creationTime, user, etag, bucket_name, bucket_id, obj_key); + ObjEntry* e = new ObjEntry(key, version, deleteMarker, size, creationTime, user, etag, bucket_name, bucket_id, obj_key); o_entries_map.emplace(key, e); return; } diff --git a/src/rgw/driver/d4n/d4n_policy.h 
b/src/rgw/driver/d4n/d4n_policy.h index 8699e39a5a0..e4c826aafef 100644 --- a/src/rgw/driver/d4n/d4n_policy.h +++ b/src/rgw/driver/d4n/d4n_policy.h @@ -39,7 +39,7 @@ class CachePolicy { struct ObjEntry { std::string key; std::string version; - bool dirty; + bool delete_marker; uint64_t size; time_t creationTime; rgw_user user; @@ -48,9 +48,9 @@ class CachePolicy { std::string bucket_id; rgw_obj_key obj_key; ObjEntry() = default; - ObjEntry(const std::string& key, const std::string& version, bool dirty, uint64_t size, + ObjEntry(const std::string& key, const std::string& version, bool delete_marker, uint64_t size, time_t creationTime, rgw_user user, const std::string& etag, - const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key) : key(key), version(version), dirty(dirty), size(size), + const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key) : key(key), version(version), delete_marker(delete_marker), size(size), creationTime(creationTime), user(user), etag(etag), bucket_name(bucket_name), bucket_id(bucket_id), obj_key(obj_key) {} }; @@ -63,7 +63,7 @@ class CachePolicy { virtual int exist_key(std::string key) = 0; virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) = 0; virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val=empty) = 0; - virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size, + virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, optional_yield y) = 0; virtual bool erase(const 
DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0; @@ -112,9 +112,9 @@ class LFUDAPolicy : public CachePolicy { using handle_type = boost::heap::fibonacci_heap>>::handle_type; handle_type handle; - LFUDAObjEntry(const std::string& key, const std::string& version, bool dirty, uint64_t size, - time_t creationTime, rgw_user user, const std::string& etag, - const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key) : ObjEntry(key, version, dirty, size, + LFUDAObjEntry(const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, + time_t creationTime, rgw_user user, const std::string& etag, + const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key) : ObjEntry(key, version, deleteMarker, size, creationTime, user, etag, bucket_name, bucket_id, obj_key) {} void set_handle(handle_type handle_) { handle = handle_; } @@ -182,7 +182,7 @@ class LFUDAPolicy : public CachePolicy { virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val=empty) override; virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override; void save_y(optional_yield y) { this->y = y; } - virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size, + virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, optional_yield y) override; virtual bool erase_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y); @@ -215,7 +215,7 @@ class LRUPolicy : public CachePolicy { virtual 
int exist_key(std::string key) override; virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override; virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val=empty) override; - virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size, + virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, optional_yield y) override; virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override; diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.cc b/src/rgw/driver/d4n/rgw_sal_d4n.cc index 2cb25ec6423..6626c4690e3 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.cc +++ b/src/rgw/driver/d4n/rgw_sal_d4n.cc @@ -18,6 +18,8 @@ namespace rgw { namespace sal { +static constexpr uint8_t OBJ_INSTANCE_LEN = 32; + static inline Bucket* nextBucket(Bucket* t) { if (!t) @@ -233,7 +235,6 @@ int D4NFilterObject::copy_object(const ACLOwner& owner, if (dest_object->get_bucket()->versioned() && !dest_object->get_bucket()->versioning_enabled()) { //if versioning is suspended dest_version = "null"; } else { - constexpr uint32_t OBJ_INSTANCE_LEN = 32; char buf[OBJ_INSTANCE_LEN + 1]; gen_rand_alphanumeric_no_underscore(dpp->get_cct(), buf, OBJ_INSTANCE_LEN); dest_version = buf; //version for non-versioned objects, using gen_rand_alphanumeric_no_underscore for the time being @@ -305,13 +306,13 @@ int D4NFilterObject::copy_object(const ACLOwner& owner, bufferlist bl; driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, bl.length(), dest_version, dirty, y); 
d4n_dest_object->set_object_version(dest_version); - ret = d4n_dest_object->set_head_obj_dir_entry(dpp, y, true, dirty); + ret = d4n_dest_object->set_head_obj_dir_entry(dpp, nullptr, y, true, dirty); if (ret < 0) { ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl; return ret; } if (dirty) { - driver->get_policy_driver()->get_cache_policy()->update_dirty_object(dpp, key, dest_version, true, this->get_size(), creationTime, std::get(dest_object->get_bucket()->get_owner()), *etag, dest_object->get_bucket()->get_name(), dest_object->get_bucket()->get_bucket_id(), dest_object->get_key(), y); + driver->get_policy_driver()->get_cache_policy()->update_dirty_object(dpp, key, dest_version, false, this->get_size(), creationTime, std::get(dest_object->get_bucket()->get_owner()), *etag, dest_object->get_bucket()->get_name(), dest_object->get_bucket()->get_bucket_id(), dest_object->get_key(), y); } } } @@ -519,16 +520,83 @@ int D4NFilterObject::calculate_version(const DoutPrefixProvider* dpp, optional_y return 0; } -/* This method maintains adds the following entries: +/* This method creates a delete marker for dirty objects: +1. creates a head block entry in cache driver - so that data can be restored from this when rgw goes down +2. 
calls set_head_obj_dir_entry to set block entries for a delete marker */ +int D4NFilterObject::create_delete_marker(const DoutPrefixProvider* dpp, optional_yield y) +{ + this->delete_marker = true; + if (this->get_bucket()->versioned() && !this->get_bucket()->versioning_enabled()) { //if versioning is suspended + this->version = "null"; + this->set_instance("null"); + } else { + char buf[OBJ_INSTANCE_LEN + 1]; + gen_rand_alphanumeric_no_underscore(dpp->get_cct(), buf, OBJ_INSTANCE_LEN); + this->version = buf; // using gen_rand_alphanumeric_no_underscore for the time being + this->set_instance(version); + ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): generating delete marker: " << version << dendl; + } + + auto m_time = real_clock::now(); + + this->set_mtime(m_time); + this->set_accounted_size(0); //setting 0 as this is a delete marker + this->set_obj_size(0); // setting 0 as this is a delete marker + ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " size is: " << this->get_size() << dendl; + rgw::sal::Attrs attrs; + this->set_attrs_from_obj_state(dpp, y, attrs); + bufferlist bl_val; + bl_val.append(std::to_string(this->delete_marker)); + attrs[RGW_CACHE_ATTR_DELETE_MARKER] = std::move(bl_val); + std::string key = get_cache_block_prefix(this, this->version, false); + std::string oid_in_cache = DIRTY_BLOCK_PREFIX + key; + + bufferlist bl; + ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): key is: " << key << dendl; + auto ret = driver->get_policy_driver()->get_cache_policy()->eviction(dpp, attrs.size(), y); + if (ret == 0) { + ret = driver->get_cache_driver()->put(dpp, oid_in_cache, bl, 0, attrs, y); + if (ret == 0) { + ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): version stored in update method is: " << version << dendl; + driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, bl.length(), version, true, y); + std::vector exec_responses; + ret = this->set_head_obj_dir_entry(dpp, &exec_responses , 
y, true, true); + if (exec_responses.empty()) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Exec respones are empty, error occured!" << dendl; + driver->get_policy_driver()->get_cache_policy()->erase(dpp, key, y); + driver->get_cache_driver()->delete_data(dpp, oid_in_cache, y); + return -ERR_INTERNAL_ERROR; + } + if (ret < 0) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object, ret=" << ret << dendl; + return ret; + } + auto creationTime = ceph::real_clock::to_time_t(this->get_mtime()); + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): key=" << key << dendl; + std::string objEtag = ""; + driver->get_policy_driver()->get_cache_policy()->update_dirty_object(dpp, key, version, true, this->get_accounted_size(), creationTime, std::get(this->get_bucket()->get_owner()), objEtag, this->get_bucket()->get_name(), this->get_bucket()->get_bucket_id(), this->get_key(), y); + } else { //if get_cache_driver()->put() + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): put failed for oid_in_cache, ret=" << ret << " oid_in_cache: " << oid_in_cache << dendl; + return ret; + } + } else { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): eviction failed for oid_in_cache, ret=" << ret << dendl; + return ret; + } + + return 0; +} + +/*This method maintains adds the following entries: 1. A hash entry that maintains the latest version for dirty objects (versioned and non-versioned) and non-versioned clean objects. 2. A "null" hash entry that maintains the same version as the latest hash entry - this is used when get/delete requests are received - for "null" versions, when bucket is non-versioned. -3. The "null" hash entry is overwritten when we have a "null" instance when bucket versioning is suspended -4. 
A versioned hash entry for every version for a version enabled bucket - this helps in get/delete requests with version-id specified */ -int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optional_yield y, bool is_latest_version, bool dirty) +for "null" versions, when bucket is non-versioned. +3. The "null" hash entry is overwritten when we have a "null" instance when bucket versioning is suspended. +4. A versioned hash entry for every version for a version enabled bucket - this helps in get/delete requests with version-id specified +5. Redis ordered set to maintain the order of dirty objects added for a version enabled bucket. Even when the bucket is non-versioned, this set maintains a "null" entry */ +int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, std::vector* exec_responses, optional_yield y, bool is_latest_version, bool dirty) { ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): object name: " << this->get_name() << " bucket name: " << this->get_bucket()->get_name() << dendl; - int ret = -1; rgw::d4n::CacheBlock block; rgw::d4n::BlockDirectory* blockDir = this->driver->get_block_dir(); if (is_latest_version) { @@ -541,6 +609,7 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optio rgw::d4n::CacheObj object = rgw::d4n::CacheObj{ .objName = objName, .bucketName = this->get_bucket()->get_bucket_id(), + .creationTime = std::to_string(ceph::real_clock::to_time_t(this->get_mtime())), .dirty = dirty, .hostsList = { dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address }, }; @@ -549,59 +618,138 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optio block.blockID = 0; block.version = this->get_object_version(); block.size = 0; + block.deleteMarker = this->delete_marker; - rgw::d4n::CacheBlock latest = block; - ret = blockDir->get(dpp, &latest, y); - if (ret == -ENOENT) { - /* adding an entry to maintain latest version, to serve simple get requests 
(without any version) - but not for a clean object that belongs to a versioned bucket, as we will get the latest version from backend store - to simplify delete object (maintaining correct order of versions) */ - if (dirty || (!dirty && !(this->get_bucket()->versioned()))) { - ret = blockDir->set(dpp, &block, y); - if (ret < 0) { - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl; - return ret; - } - /* bucket is non versioned, set a null instance - even when the bucket is non versioned, a get with "null" version-id returns the latest version, similarly - delete-obj with "null" as version-id deletes the latest version */ + /* adding an entry to maintain latest version, to serve simple get requests (without any version) + but not for a clean object that belongs to a versioned bucket, as we will get the latest version from backend store + to simplify delete object (maintaining correct order of versions) */ + + //dirty objects + if (dirty) { + //start redis transaction using MULTI, to ensure that both latest and null block are added at the same time. 
+ blockDir->multi(dpp, y); + auto ret = blockDir->set(dpp, &block, y); + if (ret < 0) { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl; + blockDir->discard(dpp, y); + return ret; + } + /* bucket is non versioned, set a null instance + even when the bucket is non versioned, a get with "null" version-id returns the latest version, similarly + delete-obj with "null" as version-id deletes the latest version */ + if (!(this->get_bucket()->versioned())) { block.cacheObj.objName = "_:null_" + this->get_name(); ret = blockDir->set(dpp, &block, y); if (ret < 0) { ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for null head object with ret: " << ret << dendl; + blockDir->discard(dpp, y); return ret; } } - } else if (ret < 0) { - ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory get method failed for head object with ret: " << ret << dendl; - } else { //head block is found + std::string object_version; + //add an entry to ordered set for both versioned and non versioned bucket + if (!this->get_bucket()->versioned() || !this->get_bucket()->versioning_enabled()) { + object_version = "null"; + } else { + object_version = this->get_object_version(); + } + auto mtime = this->get_mtime(); + auto score = ceph::real_clock::to_time_t(mtime); + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Score of object name: "<< this->get_name() << " version: " << object_version << " is: " << std::setprecision(std::numeric_limits::max_digits10) << score << ret << dendl; + rgw::d4n::ObjectDirectory* objDir = this->driver->get_obj_dir(); + ret = objDir->zadd(dpp, &object, score, object_version, y, true); + if (ret < 0) { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Failed to add object to ordered set with error: " << ret << dendl; + blockDir->discard(dpp, y); + return ret; + } + //execute redis 
transaction using EXEC + std::vector responses; + ret = blockDir->exec(dpp, responses, y); + if (ret < 0) { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory execute method failed for latest and null head object with ret: " << ret << dendl; + return ret; + } + if (exec_responses) { + *exec_responses = responses; + } + } else { //for clean/non-dirty objects + rgw::d4n::CacheBlock latest = block; + auto ret = blockDir->get(dpp, &latest, y); + if (ret == -ENOENT) { + if (!(this->get_bucket()->versioned())) { + //start redis transaction using MULTI, to ensure that both latest and null block are added at the same time. + blockDir->multi(dpp, y); + //we can explore pipelining to send the two 'HSET' commands together + ret = blockDir->set(dpp, &block, y); + if (ret < 0) { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl; + blockDir->discard(dpp, y); + return ret; + } + //bucket is non versioned, set a null instance + block.cacheObj.objName = "_:null_" + this->get_name(); + ret = blockDir->set(dpp, &block, y); + if (ret < 0) { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for null head object with ret: " << ret << dendl; + blockDir->discard(dpp, y); + return ret; + } + //execute redis transaction using EXEC + std::vector responses; + ret = blockDir->exec(dpp, responses, y); + if (ret < 0) { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory execute method failed for latest and null head object with ret: " << ret << dendl; + return ret; + } + if (exec_responses) { + *exec_responses = responses; + } + } + } else if (ret < 0) { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory get method failed for head object with ret: " << ret << dendl; + } else { //head block is found /* for clean objects belonging to versioned buckets we will fetch the latest 
entry from backend store, hence removing latest head entry once a bucket transitions to a versioned state */ - if (!dirty && this->get_bucket()->versioned()) { + if (this->get_bucket()->versioned()) { ret = blockDir->del(dpp, &block, y); - if (ret < 0) { + //Ignore a racing delete that could have deleted the latest block + if (ret < 0 && ret != -ENOENT) { ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory del method failed for head object with ret: " << ret << dendl; } } /* even if the head block is found, overwrite existing values with new version in case of non-versioned bucket, clean objects and versioned and non-versioned buckets dirty objects */ - if (dirty || (!dirty && !(this->get_bucket()->versioned()))) { + if (!(this->get_bucket()->versioned())) { + //start redis transaction using MULTI, to ensure that both latest and null block are added at the same time. + blockDir->multi(dpp, y); ret = blockDir->set(dpp, &block, y); if (ret < 0) { ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl; + blockDir->discard(dpp, y); return ret; } - /* bucket is non versioned, set a null instance - even when the bucket is non versioned, a get with "null" version-id returns the latest version, similarly - delete-obj with "null" as version-id deletes the latest version */ + //bucket is non versioned, set a null instance block.cacheObj.objName = "_:null_" + this->get_name(); ret = blockDir->set(dpp, &block, y); if (ret < 0) { ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for null head object with ret: " << ret << dendl; + blockDir->discard(dpp, y); return ret; } - }//end-if dirty || (!dirty && !(this->get_bucket()->versioned())) - } + //execute redis transaction using EXEC + std::vector responses; + ret = blockDir->exec(dpp, responses, y); + if (ret < 0) { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): 
BlockDirectory execute method failed for latest and null head object with ret: " << ret << dendl; + return ret; + } + if (exec_responses) { + *exec_responses = responses; + } + }//end-if !(this->get_bucket()->versioned()) + } //end-if ret = 0 + } //end-else }//end-if latest-version /* An entry corresponding to each instance will be needed to locate the head block @@ -616,6 +764,7 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optio rgw::d4n::CacheObj version_object = rgw::d4n::CacheObj{ .objName = objName, .bucketName = this->get_bucket()->get_bucket_id(), + .creationTime = std::to_string(ceph::real_clock::to_time_t(this->get_mtime())), .dirty = dirty, }; @@ -628,14 +777,14 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optio .size = 0, }; - ret = blockDir->set(dpp, &version_block, y); + auto ret = blockDir->set(dpp, &version_block, y); if (ret < 0) { ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for versioned head object with ret: " << ret << dendl; return ret; } }//end-if get_bucket_versioned() - return ret; + return 0; } int D4NFilterObject::set_data_block_dir_entries(const DoutPrefixProvider* dpp, optional_yield y, std::string& version, bool dirty) @@ -754,9 +903,6 @@ bool D4NFilterObject::check_head_exists_in_cache_get_oid(const DoutPrefixProvide //if the block corresponding to head object does not exist in directory, implies it is not cached if ((ret = blockDir->get(dpp, &block, y)) == 0) { blk = block; - if (block.deleteMarker) { - return false; - } std::string version; version = block.version; @@ -787,6 +933,9 @@ bool D4NFilterObject::check_head_exists_in_cache_get_oid(const DoutPrefixProvide ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): BlockDirectory get method failed, ret=" << ret << dendl; } + if (block.deleteMarker) { + found_in_cache = false; + } return found_in_cache; } @@ -848,7 +997,7 @@ int 
D4NFilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* d if (ret == 0) { ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << this->get_object_version() << dendl; this->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, 0, version, false, y); - ret = set_head_obj_dir_entry(dpp, y, is_latest_version); + ret = set_head_obj_dir_entry(dpp, nullptr, y, is_latest_version); if (ret < 0) { ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object, ret=" << ret << dendl; } @@ -1022,7 +1171,7 @@ int D4NFilterObject::D4NFilterReadOp::prepare(optional_yield y, const DoutPrefix if (ret == 0) { ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << this->source->get_object_version() << dendl; source->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, bl.length(), version, false, y); - ret = source->set_head_obj_dir_entry(dpp, y, is_latest_version); + ret = source->set_head_obj_dir_entry(dpp, nullptr, y, is_latest_version); if (ret < 0) { ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object, ret=" << ret << dendl; } @@ -1867,19 +2016,18 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_from_cache_and_policy(const DoutP int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp, optional_yield y, uint32_t flags) { - // TODO: - // 1. Send delete request to cache nodes with remote copies - // 2. See if we can derive dirty flag from the head block - // 3. 
Add lock so cleaning method doesn't remove "D_" prefix + // TODO: Send delete request to cache nodes with remote copies rgw::sal::Attrs attrs; std::string head_oid_in_cache; rgw::d4n::CacheBlock block; int ret = -1; - // check_head_exists_in_cache_get_oid also returns false if the head object is in the cache, but is a delete marker. - // As a result, the below check guarantees the head object is not in the cache. + /* check_head_exists_in_cache_get_oid also returns false if the head object is in the cache, but is a delete marker. + As a result, the below check guarantees the head object is not in the cache. */ if (!source->check_head_exists_in_cache_get_oid(dpp, head_oid_in_cache, attrs, block, y) && !block.deleteMarker) { + /* for a dirty object, if the first call is a simple delete after versioning is enabled, the call will go to the backend store and create a delete marker there + since no object with source->get_name() will be found in the cache (and this is correct) */ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): head object not found; calling next->delete_obj" << dendl; next->params = params; ret = next->delete_obj(dpp, y, flags); @@ -1887,18 +2035,28 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp return ret; } else { bool objDirty = block.cacheObj.dirty; - auto blockDir = source->driver->get_block_dir(); + auto blockDir = source->driver->get_block_dir(); + auto objDir = source->driver->get_obj_dir(); std::string policy_prefix = head_oid_in_cache; std::string version = source->get_object_version(); + // call invalidate_object based on whether the object is dirty(objDirty) + //if the object is still dirty and has been marked invalid, then let objDirty be true, else set it to false + //remove code that deletes head/data block in this method. 
+ + std::string objName = source->get_name(); + // special handling for name starting with '_' + if (objName[0] == '_') { + objName = "_" + source->get_name(); + } + if (objDirty) { // head object dirty flag represents object dirty flag policy_prefix.erase(0, 2); // remove "D_" prefix from policy key since the policy keys do not hold this information } - - // Versioned buckets - this will delete the head object indexed by version-id (even null) + // Versioned buckets - this will delete the head object indexed by version-id (even null) and latest en if (source->get_bucket()->versioned()) { - //1. clean objects - no latest head entry as latest entry to be retrieved from backend now - // hence delete only versioned head object + /* 1. clean objects - no latest head entry as latest entry to be retrieved from backend now + hence delete only versioned head object */ if (!objDirty) { if (source->have_instance()) { if ((ret = blockDir->del(dpp, &block, y)) < 0) { @@ -1906,29 +2064,177 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp return ret; } } - // if versioning is suspended, we might have a latest head entry created from when bucket was non-versioned - // don't return error as that could already be deleted by set_head_obj_dir_entry + /* if versioning is suspended, we might have a latest head entry created from when bucket was non-versioned + don't return error as that could already be deleted by set_head_obj_dir_entry */ if (!source->get_bucket()->versioning_enabled()) { - block.cacheObj.objName = source->get_name(); + block.cacheObj.objName = objName; if ((ret = blockDir->del(dpp, &block, y)) < 0) { ldpp_dout(dpp, 0) << "Failed to delete head object in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl; } } - } else if (objDirty) { //2. 
dirty objects - TBD for now - if ((ret = blockDir->del(dpp, &block, y)) == 0) { // delete head object - if (objDirty) { - if ((ret = delete_from_cache_and_policy(dpp, head_oid_in_cache, policy_prefix, y)) < 0) { - return ret; - } - } - } else if (ret < 0 && ret != -ENOENT) { - ldpp_dout(dpp, 0) << "Failed to delete head object in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl; + } else if (objDirty) { //2. dirty objects - 1. add delete marker for simple request 2. delete version if given and correctly promote latest version if needed + bool transaction_success = false; + //add watch on latest entry, as it can be modified by a put or another del + rgw::d4n::CacheBlock latest_block = block; + latest_block.cacheObj.objName = objName; + ret = blockDir->watch(dpp, &latest_block, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to add a watch on: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl; return ret; } - } + int retry = 3; + while(retry) { + retry--; + //get latest entry + ret = blockDir->get(dpp, &latest_block, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to get latest entry in block directory for: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl; + blockDir->unwatch(dpp, y); + return ret; + } + //simple delete request with no version id - create a delete marker + if (block.cacheObj.objName == objName) { + /* we are checking for latest_block and not block because latest_block has the most updated value of latest hash entry + if existing latest entry is already a delete marker, do not create a new one and simply return */ + if (!latest_block.deleteMarker) { + ret = source->create_delete_marker(dpp, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to create a delete marker for: " << block.cacheObj.objName << ", ret=" << ret << dendl; + //ERR_INTERNAL_ERROR is returned when 
exec_responses are empty which means the watched key has been modified, hence retry + if (ret == -ERR_INTERNAL_ERROR) { + continue; + } else { + blockDir->unwatch(dpp, y); + return ret; + } + } + if (ret >= 0) { + result.delete_marker = true; + result.version_id = source->get_instance(); + transaction_success = true; + return 0; + } + } + //unwatch the key (latest entry), as it is already a delete marker and we won't do anything + ret = blockDir->unwatch(dpp, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to remove the watch on: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl; + return ret; + } + transaction_success = true; + return 0; + } else { //not a simple request, delete version requested + //get latest entry ret is 0 + if (ret == 0) { + rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{ + .objName = objName, + .bucketName = source->get_bucket()->get_bucket_id(), + }; + bool startmulti = false; + //check if version to be deleted is the same as latest version + if (latest_block.version == block.version) { + std::vector members; + //get the second latest version + ret = objDir->zrevrange(dpp, &dir_obj, 0, 1, members, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to get the second latest version for: " << dir_obj.objName << ", ret=" << ret << dendl; + blockDir->unwatch(dpp, y); + return ret; + } + //if there is a second latest version + if (members.size() == 2) { + rgw::d4n::CacheBlock version_block = latest_block; + version_block.cacheObj.objName = "_:" + members[1] + "_" + source->get_name(); + //add watch on the second latest versioned entry also as it might be modified by another del + ret = blockDir->watch(dpp, &version_block, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to add a watch on: " << version_block.cacheObj.objName << ", ret=" << ret << dendl; + blockDir->unwatch(dpp, y); + return ret; + } + //get versioned 
entry + ret = blockDir->get(dpp, &version_block, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to get the versioned entry for: " << version_block.cacheObj.objName << ", ret=" << ret << dendl; + blockDir->unwatch(dpp, y); + return ret; + } + //start redis transaction using MULTI + blockDir->multi(dpp, y); + startmulti = true; + //set versioned entry as the latest entry + version_block.cacheObj.objName = latest_block.cacheObj.objName; + ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): INFO: promoting latest version entry to version: " << version_block.version << ", ret=" << ret << dendl; + ret = blockDir->set(dpp, &version_block, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to set new latest entry for: " << version_block.cacheObj.objName << ", ret=" << ret << dendl; + blockDir->discard(dpp, y); + return ret; + } + } else { // there are no more versions left + //start redis transaction using MULTI + blockDir->multi(dpp, y); + startmulti = true; + //delete latest block entry + ret = blockDir->del(dpp, &latest_block, y, true); + if (ret < 0) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to delete latest entry in block directory, when it is the same as version requested, for: " << block.cacheObj.objName << ", ret=" << ret << dendl; + blockDir->discard(dpp, y); + return ret; + } + } + } //end-if latest_block.version == block.version + //delete versioned entry (handles delete markers also) + if (!startmulti) { + //start redis transaction using MULTI + blockDir->multi(dpp, y); + } + if ((ret = blockDir->del(dpp, &block, y, true)) == 0) { + if ((ret = delete_from_cache_and_policy(dpp, head_oid_in_cache, policy_prefix, y)) < 0) { + blockDir->discard(dpp, y); + return ret; + } + } else if (ret < 0 && ret != -ENOENT) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to delete head object in block directory for: " << 
block.cacheObj.objName << ", ret=" << ret << dendl; + blockDir->discard(dpp, y); + return ret; + } + //delete entry from ordered set + std::string version = source->get_instance(); + ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): Version to be deleted is: " << version << dendl; + ret = objDir->zrem(dpp, &dir_obj, version, y, true); + if (ret < 0) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue zrem request in object directory for: " << source->get_name() << ", ret=" << ret << dendl; + blockDir->discard(dpp, y); + return ret; + } + std::vector responses; + ret = blockDir->exec(dpp, responses, y); + if (responses.empty()) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Execute responses are empty hence continuing!" << dendl; + continue; + } + if (ret < 0) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to execute exec in block directory: " << "ret= " << ret << dendl; + return ret; + } + result.delete_marker = block.deleteMarker; + result.version_id = version; + //success, hence break from loop + transaction_success = true; + break; + } + } //end-else (simple request) + } //end-while retry + if (!transaction_success) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Redis transaction failed after retrying! 
" << dendl; + return -ERR_INTERNAL_ERROR; + } + } //end-if objDirty } //end-if versioned buckets - //Non-versioned buckets - we will delete the latest entry and the "null" entry + /* Non-versioned buckets - we will delete the latest entry and the "null" entry + dirty objects - delete "null" entry from ordered set also */ if (!source->get_bucket()->versioned()) { if ((ret = blockDir->del(dpp, &block, y)) == 0) { if (objDirty) { @@ -1941,7 +2247,7 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp return ret; } //if we get request for latest head entry, delete the null block and vice versa - if (block.cacheObj.objName == source->get_name()) { + if (block.cacheObj.objName == objName) { block.cacheObj.objName = "_:null_" + source->get_name(); } else { block.cacheObj.objName = source->get_name(); @@ -1950,6 +2256,30 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp ldpp_dout(dpp, 0) << "Failed to delete head object in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl; return ret; } + //dirty objects - delete from ordered set + if (objDirty) { + rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{ + .objName = source->get_name(), + .bucketName = source->get_bucket()->get_bucket_id(), + }; + ret = objDir->zrem(dpp, &dir_obj, "null", y, true); + if (ret < 0) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue zrem request in object directory for: " << source->get_name() << ", ret=" << ret << dendl; + blockDir->discard(dpp, y); + return ret; + } + } + std::vector responses; + ret = blockDir->exec(dpp, responses, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to execute exec in block directory: " << "ret= " << ret << dendl; + return ret; + } + if (objDirty) { + if ((ret = delete_from_cache_and_policy(dpp, head_oid_in_cache, policy_prefix, y)) < 0) { + return ret; + } + } } //end-if non-versioned buckets int size; 
@@ -1968,41 +2298,41 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp } ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): Size of object is: " << size << dendl; - // delete data blocks, when, - // 1. object is clean, bucket is versioned and there is an instance in the request - // 2. object is clean, bucket is non-versioned - // 3. object is dirty - TBD + /* delete data blocks directory entries, when, + 1. object is clean, bucket is versioned and there is an instance in the request + 2. object is clean, bucket is non-versioned + 3. object is dirty - delete blocks in cache also except for delete markers */ if ((!objDirty && source->get_bucket()->versioned() && source->have_instance()) || (!objDirty && !source->get_bucket()->versioned()) || - objDirty) { - off_t lst = size; - off_t fst = 0; + (objDirty && !block.deleteMarker)) { + off_t lst = size; + off_t fst = 0; - do { // loop through the data blocks - std::string prefix = get_cache_block_prefix(source, version, false); - if (fst >= lst) { - break; - } - //data blocks have cacheObj.objName set to oid always - block.cacheObj.objName = source->get_oid(); - off_t cur_size = std::min(fst + dpp->get_cct()->_conf->rgw_max_chunk_size, lst); - off_t cur_len = cur_size - fst; - block.blockID = static_cast(fst); - block.size = static_cast(cur_len); - - if ((ret = blockDir->get(dpp, &block, y)) < 0) { - if (ret == -ENOENT) { - ldpp_dout(dpp, 0) << "Directory entry for: " << source->get_name() << " blockid: " << fst << " block size: " << cur_len << " does not exist; continuing" << dendl; - fst += cur_len; - if (fst >= lst) { - break; + do { // loop through the data blocks + std::string prefix = get_cache_block_prefix(source, version, false); + if (fst >= lst) { + break; + } + //data blocks have cacheObj.objName set to oid always + block.cacheObj.objName = source->get_oid(); + off_t cur_size = std::min(fst + dpp->get_cct()->_conf->rgw_max_chunk_size, lst); + off_t cur_len = 
cur_size - fst; + block.blockID = static_cast(fst); + block.size = static_cast(cur_len); + + if ((ret = blockDir->get(dpp, &block, y)) < 0) { + if (ret == -ENOENT) { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Directory entry for: " << source->get_oid() << " blockid: " << fst << " block size: " << cur_len << " does not exist; continuing" << dendl; + fst += cur_len; + if (fst >= lst) { + break; + } + continue; + } else { + ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Failed to retrieve directory entry for: " << source->get_oid() << " blockid: " << fst << " block size: " << cur_len << ", ret=" << ret << dendl; + return ret; } - continue; - } else { - ldpp_dout(dpp, 10) << "Failed to retrieve directory entry for: " << source->get_name() << " blockid: " << fst << " block size: " << cur_len << ", ret=" << ret << dendl; - return ret; } - } if ((ret = blockDir->del(dpp, &block, y)) == 0) { prefix = DIRTY_BLOCK_PREFIX + prefix; @@ -2052,14 +2382,19 @@ int D4NFilterWriter::prepare(optional_yield y) ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): calling next->prepare" << dendl; return next->prepare(y); } else { - //for non-versioned buckets, we need to delete the older dirty blocks of the object from the cache as dirty blocks do not get evicted + //for non-versioned buckets or version suspended buckets, we need to delete the older dirty blocks of the object from the cache as dirty blocks do not get evicted //alternatively, we could add logic to delete this lazily - if (!object->get_bucket()->versioned()) { + if (!object->get_bucket()->versioned() || (object->get_bucket()->versioned() && !object->get_bucket()->versioning_enabled())) { std::unique_ptr del_op = object->get_delete_op(); + if (object->get_bucket()->versioned() && !object->get_bucket()->versioning_enabled()) { + del_op->params.null_verid = true; + object->set_instance("null"); + } auto ret = del_op->delete_obj(dpp, y, rgw::sal::FLAG_LOG_OP); if (ret < 0) { 
ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): delete_obj failed, ret=" << ret << dendl; } + object->clear_instance(); } } @@ -2068,7 +2403,6 @@ int D4NFilterWriter::prepare(optional_yield y) if (object->get_bucket()->versioned() && !object->get_bucket()->versioning_enabled()) { //if versioning is suspended object->set_instance("null"); } - constexpr uint32_t OBJ_INSTANCE_LEN = 32; char buf[OBJ_INSTANCE_LEN + 1]; gen_rand_alphanumeric_no_underscore(dpp->get_cct(), buf, OBJ_INSTANCE_LEN); version = buf; // using gen_rand_alphanumeric_no_underscore for the time being @@ -2205,7 +2539,6 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag, dirty = true; ceph::real_time m_time; - dirty = true; if (mtime) { if (real_clock::is_zero(*mtime)) { *mtime = real_clock::now(); @@ -2266,15 +2599,16 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag, if (ret == 0) { ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << "(): version stored in update method is: " << version << dendl; driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, bl.length(), version, dirty, y); - ret = object->set_head_obj_dir_entry(dpp, y, true, dirty); + ret = object->set_head_obj_dir_entry(dpp, nullptr, y, true, dirty); if (ret < 0) { ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory set method failed for head object, ret=" << ret << dendl; return ret; } if (dirty) { auto creationTime = ceph::real_clock::to_time_t(object->get_mtime()); - ldpp_dout(dpp, 16) << "D4NFilterWriter::" << __func__ << "(): key=" << key << dendl; - driver->get_policy_driver()->get_cache_policy()->update_dirty_object(dpp, key, version, dirty, accounted_size, creationTime, std::get(obj->get_bucket()->get_owner()), objEtag, obj->get_bucket()->get_name(), obj->get_bucket()->get_bucket_id(), obj->get_key(), y); + ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << "(): key=" << key << dendl; + ldpp_dout(dpp, 20) << 
"D4NFilterWriter::" << __func__ << "(): obj->get_key()=" << obj->get_key() << dendl; + driver->get_policy_driver()->get_cache_policy()->update_dirty_object(dpp, key, version, false, accounted_size, creationTime, std::get(obj->get_bucket()->get_owner()), objEtag, obj->get_bucket()->get_name(), obj->get_bucket()->get_bucket_id(), obj->get_key(), y); } } else { //if get_cache_driver()->put() ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): put failed for head_oid_in_cache, ret=" << ret << dendl; @@ -2331,7 +2665,7 @@ int D4NFilterMultipartUpload::complete(const DoutPrefixProvider *dpp, if (ret == 0) { ldpp_dout(dpp, 20) << "D4NFilterMultipartUpload::" << __func__ << " version stored in update method is: " << d4n_target_obj->get_object_version() << dendl; driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, bl.length(), version, false, y); - ret = d4n_target_obj->set_head_obj_dir_entry(dpp, y, true); + ret = d4n_target_obj->set_head_obj_dir_entry(dpp, nullptr, y, true); if (ret < 0) { ldpp_dout(dpp, 0) << "D4NFilterMultipartUpload::" << __func__ << "(): BlockDirectory set method failed for head object, ret=" << ret << dendl; } diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.h b/src/rgw/driver/d4n/rgw_sal_d4n.h index a135aef3151..85be4675cf4 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.h +++ b/src/rgw/driver/d4n/rgw_sal_d4n.h @@ -131,6 +131,7 @@ class D4NFilterObject : public FilterObject { rgw::sal::Object* dest_object{nullptr}; //for copy-object rgw::sal::Bucket* dest_bucket{nullptr}; //for copy-object bool multipart{false}; + bool delete_marker{false}; public: struct D4NFilterReadOp : FilterReadOp { @@ -265,7 +266,7 @@ class D4NFilterObject : public FilterObject { int get_obj_attrs_from_cache(const DoutPrefixProvider* dpp, optional_yield y); void set_attrs_from_obj_state(const DoutPrefixProvider* dpp, optional_yield y, rgw::sal::Attrs& attrs); int calculate_version(const DoutPrefixProvider* dpp, optional_yield y, std::string& 
version, rgw::sal::Attrs& attrs); - int set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optional_yield y, bool is_latest_version = true, bool dirty = false); + int set_head_obj_dir_entry(const DoutPrefixProvider* dpp, std::vector* exec_responses, optional_yield y, bool is_latest_version = true, bool dirty = false); int set_data_block_dir_entries(const DoutPrefixProvider* dpp, optional_yield y, std::string& version, bool dirty = false); int delete_data_block_cache_entries(const DoutPrefixProvider* dpp, optional_yield y, std::string& version, bool dirty = false); bool check_head_exists_in_cache_get_oid(const DoutPrefixProvider* dpp, std::string& head_oid_in_cache, rgw::sal::Attrs& attrs, rgw::d4n::CacheBlock& blk, optional_yield y); @@ -273,6 +274,8 @@ class D4NFilterObject : public FilterObject { rgw::sal::Object* get_destination_object(const DoutPrefixProvider* dpp) { return dest_object; } bool is_multipart() { return multipart; } int set_attr_crypt_parts(const DoutPrefixProvider* dpp, optional_yield y, rgw::sal::Attrs& attrs); + int create_delete_marker(const DoutPrefixProvider* dpp, optional_yield y); + bool is_delete_marker() { return delete_marker; } }; class D4NFilterWriter : public FilterWriter { diff --git a/src/rgw/rgw_cache_driver.h b/src/rgw/rgw_cache_driver.h index c3055be71a8..6d416bc20d9 100644 --- a/src/rgw/rgw_cache_driver.h +++ b/src/rgw/rgw_cache_driver.h @@ -13,6 +13,7 @@ constexpr char RGW_CACHE_ATTR_BUCKET_NAME[] = "user.rgw.bucket_name"; constexpr char RGW_CACHE_ATTR_VERSION_ID[] = "user.rgw.version_id"; constexpr char RGW_CACHE_ATTR_SOURC_ZONE[] = "user.rgw.source_zone"; constexpr char RGW_CACHE_ATTR_LOCAL_WEIGHT[] = "user.rgw.localWeight"; +constexpr char RGW_CACHE_ATTR_DELETE_MARKER[] = "user.rgw.deleteMarker"; constexpr char DIRTY_BLOCK_PREFIX[] = "D#"; constexpr char CACHE_DELIM = '#'; diff --git a/src/rgw/rgw_ssd_driver.cc b/src/rgw/rgw_ssd_driver.cc index 01b913307eb..095925091dd 100644 --- a/src/rgw/rgw_ssd_driver.cc +++ 
b/src/rgw/rgw_ssd_driver.cc @@ -287,6 +287,7 @@ int SSDDriver::restore_blocks_objects(const DoutPrefixProvider* dpp, ObjectDataC time_t creationTime = time_t(nullptr); rgw_user user; rgw_obj_key obj_key; + bool deleteMarker = false; if (attrs.find(RGW_ATTR_ETAG) != attrs.end()) { etag = attrs[RGW_ATTR_ETAG].to_str(); ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): etag: " << etag << dendl; @@ -333,8 +334,14 @@ int SSDDriver::restore_blocks_objects(const DoutPrefixProvider* dpp, ObjectDataC ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): localWeightStr: " << localWeightStr << dendl; } + if (attrs.find(RGW_CACHE_ATTR_DELETE_MARKER) != attrs.end()) { + std::string deleteMarkerStr = attrs[RGW_CACHE_ATTR_DELETE_MARKER].to_str(); + deleteMarker = (deleteMarkerStr == "1") ? true : false; + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): deleteMarker: " << deleteMarker << dendl; + } + ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): calling func for: " << key << dendl; - obj_func(dpp, key, version, dirty, size, creationTime, user, etag, bucket_name, bucket_id, obj_key, null_yield); + obj_func(dpp, key, version, deleteMarker, size, creationTime, user, etag, bucket_name, bucket_id, obj_key, null_yield); block_func(dpp, key, offset, len, version, dirty, null_yield, localWeightStr); parsed = true; } //end-if part.size() == 2 diff --git a/src/test/rgw/test_d4n_directory.cc b/src/test/rgw/test_d4n_directory.cc index 310071db5bf..c75c0f0610b 100644 --- a/src/test/rgw/test_d4n_directory.cc +++ b/src/test/rgw/test_d4n_directory.cc @@ -707,12 +707,20 @@ TEST_F(BlockDirectoryFixture, MultiExecuteYield) ASSERT_EQ((bool)ec, false); std::cout << "SET value: " << std::get<0>(resp).value() << std::endl; } + { + request req; + response resp; + req.push("ZADD", "key4", "1", "v1"); // Command 3 + conn->async_exec(req, resp, yield[ec]); + ASSERT_EQ((bool)ec, false); + std::cout << "ZADD value: " << std::get<0>(resp).value() << std::endl; + } { request req; //string 
as response here as the command is only getting queued, not executed //if response type is changed to int then the operation fails response resp; - req.push("DEL", "key3"); // Command 3 + req.push("DEL", "key3"); // Command 4 conn->async_exec(req, resp, yield[ec]); ASSERT_EQ((bool)ec, false); std::cout << "DEL value: " << std::get<0>(resp).value() << std::endl; @@ -738,6 +746,8 @@ TEST_F(BlockDirectoryFixture, MultiExecuteYield) ASSERT_EQ(0, dir->set(env->dpp, block, yield)); block->cacheObj.objName = "testBlockA"; ASSERT_EQ(0, dir->del(env->dpp, block, yield, true)); + block->cacheObj.objName = "testBlockB"; + ASSERT_EQ(0, dir->zadd(env->dpp, block, 100, "version1", yield, true)); std::vector responses; ASSERT_EQ(0, dir->exec(env->dpp, responses, optional_yield{yield})); for (auto r : responses) {