weightSum += ((localWeight < 0) ? 0 : localWeight);
}
-void LFUDAPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, optional_yield y)
+void LFUDAPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, optional_yield y)
{
using handle_type = boost::heap::fibonacci_heap<LFUDAObjEntry*, boost::heap::compare<ObjectComparator<LFUDAObjEntry>>>::handle_type;
ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Before acquiring lock, adding entry: " << key << dendl;
const std::lock_guard l(lfuda_cleaning_lock);
- LFUDAObjEntry* e = new LFUDAObjEntry{key, version, dirty, size, creationTime, user, etag, bucket_name, bucket_id, obj_key};
+ LFUDAObjEntry* e = new LFUDAObjEntry{key, version, deleteMarker, size, creationTime, user, etag, bucket_name, bucket_id, obj_key};
handle_type handle = object_heap.push(e);
e->set_handle(handle);
o_entries_map.emplace(key, e);
continue;
}
ldpp_dout(dpp, 10) <<__LINE__ << " " << __func__ << "(): e->key=" << e->key << dendl;
- ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->dirty=" << e->dirty << dendl;
+ ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->delete_marker=" << e->delete_marker << dendl;
ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->version=" << e->version << dendl;
ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->bucket_name=" << e->bucket_name << dendl;
ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->bucket_id=" << e->bucket_id << dendl;
ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->user=" << e->user << dendl;
ldpp_dout(dpp, 10) << __LINE__ << " " << __func__ << "(): e->obj_key=" << e->obj_key << dendl;
l.unlock();
- if (!e->key.empty() && (e->dirty == true) && (std::difftime(time(NULL), e->creationTime) > interval)) { //if block is dirty and written more than interval seconds ago
+ if (!e->key.empty() && (std::difftime(time(NULL), e->creationTime) > interval)) { //if block is dirty and written more than interval seconds ago
rgw_user c_rgw_user = e->user;
//writing data to the backend
//we need to create an atomic_writer
} while(fst < lst);
cacheDriver->rename(dpp, head_oid_in_cache, new_head_oid_in_cache, null_yield);
- //data is clean now, updating in-memory metadata for an object
- e->dirty = false;
+
//invoke update() with dirty flag set to false, to update in-memory metadata for head
this->update(dpp, new_head_oid_in_cache, 0, 0, e->version, false, y);
ldpp_dout(dpp, 20) << __func__ << "updating dirty flag in block directory for instance block failed!" << dendl;
}
}
+
+ //the next steps remove the entry from the ordered set and if needed the latest hash entry also in case of versioned buckets
+ if (!c_obj->have_instance()) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << std::setprecision(std::numeric_limits<double>::max_digits10) << e->creationTime << " from ordered set" << dendl;
+ rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{
+ .objName = c_obj->get_name(),
+ .bucketName = c_obj->get_bucket()->get_bucket_id(),
+ };
+ ret = objDir->zremrangebyscore(dpp, &dir_obj, e->creationTime, e->creationTime, y, true);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << __func__ << "(): Failed to remove object from ordered set with error: " << ret << dendl;
+ }
+ } else {
+ rgw::d4n::CacheBlock latest_block = block;
+ latest_block.cacheObj.objName = c_obj->get_name();
+ //add watch on latest entry, as it can be modified by a put or a del
+ ret = blockDir->watch(dpp, &latest_block, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << __func__ << "(): Failed to add a watch on: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl;
+ }
+ int retry = 3;
+ while(retry) {
+ retry--;
+ //get latest entry
+ ret = blockDir->get(dpp, &latest_block, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << __func__ << "(): Failed to get latest entry in block directory for: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl;
+ }
+ //start redis transaction using MULTI
+ blockDir->multi(dpp, y);
+ if (latest_block.version == e->version) {
+ //remove object entry from ordered set
+ if (c_obj->have_instance()) {
+ blockDir->del(dpp, &latest_block, y, true);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << __func__ << "(): Failed to queue del for latest hash entry: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl;
+ }
+ }
+ }
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << std::setprecision(std::numeric_limits<double>::max_digits10) << e->creationTime << " from ordered set" << dendl;
+ rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{
+ .objName = c_obj->get_name(),
+ .bucketName = c_obj->get_bucket()->get_bucket_id(),
+ };
+ ret = objDir->zremrangebyscore(dpp, &dir_obj, e->creationTime, e->creationTime, y, true);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << __func__ << "(): Failed to remove object from ordered set with error: " << ret << dendl;
+ }
+ std::vector<std::string> responses;
+ ret = blockDir->exec(dpp, responses, y);
+ if (responses.empty()) {
+ ldpp_dout(dpp, 0) << __func__ << "(): Execute responses are empty hence continuing!" << dendl;
+ continue;
+ }
+ break;
+ }//end-while (retry)
+ }
//remove entry from map and queue, erase_dirty_object locks correctly
erase_dirty_object(dpp, e->key, null_yield);
} else { //end-if std::difftime(time(NULL), e->creationTime) > interval
entries_map.emplace(key, e);
}
-void LRUPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size, time_t creationTime, const rgw_user& user, std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
+void LRUPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
const rgw_obj_key& obj_key, optional_yield y)
{
const std::lock_guard l(lru_lock);
- ObjEntry* e = new ObjEntry(key, version, dirty, size, creationTime, user, etag, bucket_name, bucket_id, obj_key);
+ ObjEntry* e = new ObjEntry(key, version, deleteMarker, size, creationTime, user, etag, bucket_name, bucket_id, obj_key);
o_entries_map.emplace(key, e);
return;
}
namespace rgw { namespace sal {
+static constexpr uint8_t OBJ_INSTANCE_LEN = 32;
+
static inline Bucket* nextBucket(Bucket* t)
{
if (!t)
if (dest_object->get_bucket()->versioned() && !dest_object->get_bucket()->versioning_enabled()) { //if versioning is suspended
dest_version = "null";
} else {
- constexpr uint32_t OBJ_INSTANCE_LEN = 32;
char buf[OBJ_INSTANCE_LEN + 1];
gen_rand_alphanumeric_no_underscore(dpp->get_cct(), buf, OBJ_INSTANCE_LEN);
dest_version = buf; //version for non-versioned objects, using gen_rand_alphanumeric_no_underscore for the time being
bufferlist bl;
driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, bl.length(), dest_version, dirty, y);
d4n_dest_object->set_object_version(dest_version);
- ret = d4n_dest_object->set_head_obj_dir_entry(dpp, y, true, dirty);
+ ret = d4n_dest_object->set_head_obj_dir_entry(dpp, nullptr, y, true, dirty);
if (ret < 0) {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
return ret;
}
if (dirty) {
- driver->get_policy_driver()->get_cache_policy()->update_dirty_object(dpp, key, dest_version, true, this->get_size(), creationTime, std::get<rgw_user>(dest_object->get_bucket()->get_owner()), *etag, dest_object->get_bucket()->get_name(), dest_object->get_bucket()->get_bucket_id(), dest_object->get_key(), y);
+ driver->get_policy_driver()->get_cache_policy()->update_dirty_object(dpp, key, dest_version, false, this->get_size(), creationTime, std::get<rgw_user>(dest_object->get_bucket()->get_owner()), *etag, dest_object->get_bucket()->get_name(), dest_object->get_bucket()->get_bucket_id(), dest_object->get_key(), y);
}
}
}
return 0;
}
-/* This method maintains adds the following entries:
+/* This method creates a delete marker for dirty objects:
+1. creates a head block entry in cache driver - so that data can be restored from this when rgw goes down
+2. calls set_head_obj_dir_entry to set block entries for a delete marker */
+int D4NFilterObject::create_delete_marker(const DoutPrefixProvider* dpp, optional_yield y)
+{
+ this->delete_marker = true;
+ if (this->get_bucket()->versioned() && !this->get_bucket()->versioning_enabled()) { //if versioning is suspended
+ this->version = "null";
+ this->set_instance("null");
+ } else {
+ char buf[OBJ_INSTANCE_LEN + 1];
+ gen_rand_alphanumeric_no_underscore(dpp->get_cct(), buf, OBJ_INSTANCE_LEN);
+ this->version = buf; // using gen_rand_alphanumeric_no_underscore for the time being
+ this->set_instance(version);
+ ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): generating delete marker: " << version << dendl;
+ }
+
+ auto m_time = real_clock::now();
+
+ this->set_mtime(m_time);
+ this->set_accounted_size(0); //setting 0 as this is a delete marker
+ this->set_obj_size(0); // setting 0 as this is a delete marker
+ ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " size is: " << this->get_size() << dendl;
+ rgw::sal::Attrs attrs;
+ this->set_attrs_from_obj_state(dpp, y, attrs);
+ bufferlist bl_val;
+ bl_val.append(std::to_string(this->delete_marker));
+ attrs[RGW_CACHE_ATTR_DELETE_MARKER] = std::move(bl_val);
+ std::string key = get_cache_block_prefix(this, this->version, false);
+ std::string oid_in_cache = DIRTY_BLOCK_PREFIX + key;
+
+ bufferlist bl;
+ ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): key is: " << key << dendl;
+ auto ret = driver->get_policy_driver()->get_cache_policy()->eviction(dpp, attrs.size(), y);
+ if (ret == 0) {
+ ret = driver->get_cache_driver()->put(dpp, oid_in_cache, bl, 0, attrs, y);
+ if (ret == 0) {
+ ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): version stored in update method is: " << version << dendl;
+ driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, bl.length(), version, true, y);
+ std::vector<std::string> exec_responses;
+ ret = this->set_head_obj_dir_entry(dpp, &exec_responses , y, true, true);
+ if (exec_responses.empty()) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Exec respones are empty, error occured!" << dendl;
+ driver->get_policy_driver()->get_cache_policy()->erase(dpp, key, y);
+ driver->get_cache_driver()->delete_data(dpp, oid_in_cache, y);
+ return -ERR_INTERNAL_ERROR;
+ }
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object, ret=" << ret << dendl;
+ return ret;
+ }
+ auto creationTime = ceph::real_clock::to_time_t(this->get_mtime());
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): key=" << key << dendl;
+ std::string objEtag = "";
+ driver->get_policy_driver()->get_cache_policy()->update_dirty_object(dpp, key, version, true, this->get_accounted_size(), creationTime, std::get<rgw_user>(this->get_bucket()->get_owner()), objEtag, this->get_bucket()->get_name(), this->get_bucket()->get_bucket_id(), this->get_key(), y);
+ } else { //if get_cache_driver()->put()
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): put failed for oid_in_cache, ret=" << ret << " oid_in_cache: " << oid_in_cache << dendl;
+ return ret;
+ }
+ } else {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): eviction failed for oid_in_cache, ret=" << ret << dendl;
+ return ret;
+ }
+
+ return 0;
+}
+
+/*This method maintains adds the following entries:
1. A hash entry that maintains the latest version for dirty objects (versioned and non-versioned) and non-versioned clean objects.
2. A "null" hash entry that maintains the same version as the latest hash entry - this is used when get/delete requests are received
- for "null" versions, when bucket is non-versioned.
-3. The "null" hash entry is overwritten when we have a "null" instance when bucket versioning is suspended
-4. A versioned hash entry for every version for a version enabled bucket - this helps in get/delete requests with version-id specified */
-int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, optional_yield y, bool is_latest_version, bool dirty)
+for "null" versions, when bucket is non-versioned.
+3. The "null" hash entry is overwritten when we have a "null" instance when bucket versioning is suspended.
+4. A versioned hash entry for every version for a version enabled bucket - this helps in get/delete requests with version-id specified
+5. Redis ordered set to maintain the order of dirty objects added for a version enabled bucket. Even when the bucket is non-versioned, this set maintains a "null" entry */
+int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, std::vector<std::string>* exec_responses, optional_yield y, bool is_latest_version, bool dirty)
{
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): object name: " << this->get_name() << " bucket name: " << this->get_bucket()->get_name() << dendl;
- int ret = -1;
rgw::d4n::CacheBlock block;
rgw::d4n::BlockDirectory* blockDir = this->driver->get_block_dir();
if (is_latest_version) {
rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
.objName = objName,
.bucketName = this->get_bucket()->get_bucket_id(),
+ .creationTime = std::to_string(ceph::real_clock::to_time_t(this->get_mtime())),
.dirty = dirty,
.hostsList = { dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address },
};
block.blockID = 0;
block.version = this->get_object_version();
block.size = 0;
+ block.deleteMarker = this->delete_marker;
- rgw::d4n::CacheBlock latest = block;
- ret = blockDir->get(dpp, &latest, y);
- if (ret == -ENOENT) {
- /* adding an entry to maintain latest version, to serve simple get requests (without any version)
- but not for a clean object that belongs to a versioned bucket, as we will get the latest version from backend store
- to simplify delete object (maintaining correct order of versions) */
- if (dirty || (!dirty && !(this->get_bucket()->versioned()))) {
- ret = blockDir->set(dpp, &block, y);
- if (ret < 0) {
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
- return ret;
- }
- /* bucket is non versioned, set a null instance
- even when the bucket is non versioned, a get with "null" version-id returns the latest version, similarly
- delete-obj with "null" as version-id deletes the latest version */
+ /* adding an entry to maintain latest version, to serve simple get requests (without any version)
+ but not for a clean object that belongs to a versioned bucket, as we will get the latest version from backend store
+ to simplify delete object (maintaining correct order of versions) */
+
+ //dirty objects
+ if (dirty) {
+ //start redis transaction using MULTI, to ensure that both latest and null block are added at the same time.
+ blockDir->multi(dpp, y);
+ auto ret = blockDir->set(dpp, &block, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
+ blockDir->discard(dpp, y);
+ return ret;
+ }
+ /* bucket is non versioned, set a null instance
+ even when the bucket is non versioned, a get with "null" version-id returns the latest version, similarly
+ delete-obj with "null" as version-id deletes the latest version */
+ if (!(this->get_bucket()->versioned())) {
block.cacheObj.objName = "_:null_" + this->get_name();
ret = blockDir->set(dpp, &block, y);
if (ret < 0) {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for null head object with ret: " << ret << dendl;
+ blockDir->discard(dpp, y);
return ret;
}
}
- } else if (ret < 0) {
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory get method failed for head object with ret: " << ret << dendl;
- } else { //head block is found
+ std::string object_version;
+ //add an entry to ordered set for both versioned and non versioned bucket
+ if (!this->get_bucket()->versioned() || !this->get_bucket()->versioning_enabled()) {
+ object_version = "null";
+ } else {
+ object_version = this->get_object_version();
+ }
+ auto mtime = this->get_mtime();
+ auto score = ceph::real_clock::to_time_t(mtime);
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Score of object name: "<< this->get_name() << " version: " << object_version << " is: " << std::setprecision(std::numeric_limits<double>::max_digits10) << score << ret << dendl;
+ rgw::d4n::ObjectDirectory* objDir = this->driver->get_obj_dir();
+ ret = objDir->zadd(dpp, &object, score, object_version, y, true);
+ if (ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Failed to add object to ordered set with error: " << ret << dendl;
+ blockDir->discard(dpp, y);
+ return ret;
+ }
+ //execute redis transaction using EXEC
+ std::vector<std::string> responses;
+ ret = blockDir->exec(dpp, responses, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory execute method failed for latest and null head object with ret: " << ret << dendl;
+ return ret;
+ }
+ if (exec_responses) {
+ *exec_responses = responses;
+ }
+ } else { //for clean/non-dirty objects
+ rgw::d4n::CacheBlock latest = block;
+ auto ret = blockDir->get(dpp, &latest, y);
+ if (ret == -ENOENT) {
+ if (!(this->get_bucket()->versioned())) {
+ //start redis transaction using MULTI, to ensure that both latest and null block are added at the same time.
+ blockDir->multi(dpp, y);
+ //we can explore pipelining to send the two 'HSET' commands together
+ ret = blockDir->set(dpp, &block, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
+ blockDir->discard(dpp, y);
+ return ret;
+ }
+ //bucket is non versioned, set a null instance
+ block.cacheObj.objName = "_:null_" + this->get_name();
+ ret = blockDir->set(dpp, &block, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for null head object with ret: " << ret << dendl;
+ blockDir->discard(dpp, y);
+ return ret;
+ }
+ //execute redis transaction using EXEC
+ std::vector<std::string> responses;
+ ret = blockDir->exec(dpp, responses, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory execute method failed for latest and null head object with ret: " << ret << dendl;
+ return ret;
+ }
+ if (exec_responses) {
+ *exec_responses = responses;
+ }
+ }
+ } else if (ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory get method failed for head object with ret: " << ret << dendl;
+ } else { //head block is found
/* for clean objects belonging to versioned buckets we will fetch the latest entry from backend store, hence removing latest head entry
once a bucket transitions to a versioned state */
- if (!dirty && this->get_bucket()->versioned()) {
+ if (this->get_bucket()->versioned()) {
ret = blockDir->del(dpp, &block, y);
- if (ret < 0) {
+ //Ignore a racing delete that could have deleted the latest block
+ if (ret < 0 && ret != -ENOENT) {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory del method failed for head object with ret: " << ret << dendl;
}
}
/* even if the head block is found, overwrite existing values with new version in case of non-versioned bucket, clean objects
and versioned and non-versioned buckets dirty objects */
- if (dirty || (!dirty && !(this->get_bucket()->versioned()))) {
+ if (!(this->get_bucket()->versioned())) {
+ //start redis transaction using MULTI, to ensure that both latest and null block are added at the same time.
+ blockDir->multi(dpp, y);
ret = blockDir->set(dpp, &block, y);
if (ret < 0) {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
+ blockDir->discard(dpp, y);
return ret;
}
- /* bucket is non versioned, set a null instance
- even when the bucket is non versioned, a get with "null" version-id returns the latest version, similarly
- delete-obj with "null" as version-id deletes the latest version */
+ //bucket is non versioned, set a null instance
block.cacheObj.objName = "_:null_" + this->get_name();
ret = blockDir->set(dpp, &block, y);
if (ret < 0) {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for null head object with ret: " << ret << dendl;
+ blockDir->discard(dpp, y);
return ret;
}
- }//end-if dirty || (!dirty && !(this->get_bucket()->versioned()))
- }
+ //execute redis transaction using EXEC
+ std::vector<std::string> responses;
+ ret = blockDir->exec(dpp, responses, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory execute method failed for latest and null head object with ret: " << ret << dendl;
+ return ret;
+ }
+ if (exec_responses) {
+ *exec_responses = responses;
+ }
+ }//end-if !(this->get_bucket()->versioned())
+ } //end-if ret = 0
+ } //end-else
}//end-if latest-version
/* An entry corresponding to each instance will be needed to locate the head block
rgw::d4n::CacheObj version_object = rgw::d4n::CacheObj{
.objName = objName,
.bucketName = this->get_bucket()->get_bucket_id(),
+ .creationTime = std::to_string(ceph::real_clock::to_time_t(this->get_mtime())),
.dirty = dirty,
};
.size = 0,
};
- ret = blockDir->set(dpp, &version_block, y);
+ auto ret = blockDir->set(dpp, &version_block, y);
if (ret < 0) {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for versioned head object with ret: " << ret << dendl;
return ret;
}
}//end-if get_bucket_versioned()
- return ret;
+ return 0;
}
int D4NFilterObject::set_data_block_dir_entries(const DoutPrefixProvider* dpp, optional_yield y, std::string& version, bool dirty)
//if the block corresponding to head object does not exist in directory, implies it is not cached
if ((ret = blockDir->get(dpp, &block, y)) == 0) {
blk = block;
- if (block.deleteMarker) {
- return false;
- }
std::string version;
version = block.version;
ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): BlockDirectory get method failed, ret=" << ret << dendl;
}
+ if (block.deleteMarker) {
+ found_in_cache = false;
+ }
return found_in_cache;
}
if (ret == 0) {
ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << this->get_object_version() << dendl;
this->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, 0, version, false, y);
- ret = set_head_obj_dir_entry(dpp, y, is_latest_version);
+ ret = set_head_obj_dir_entry(dpp, nullptr, y, is_latest_version);
if (ret < 0) {
ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object, ret=" << ret << dendl;
}
if (ret == 0) {
ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << this->source->get_object_version() << dendl;
source->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, bl.length(), version, false, y);
- ret = source->set_head_obj_dir_entry(dpp, y, is_latest_version);
+ ret = source->set_head_obj_dir_entry(dpp, nullptr, y, is_latest_version);
if (ret < 0) {
ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object, ret=" << ret << dendl;
}
int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp,
optional_yield y, uint32_t flags)
{
- // TODO:
- // 1. Send delete request to cache nodes with remote copies
- // 2. See if we can derive dirty flag from the head block
- // 3. Add lock so cleaning method doesn't remove "D_" prefix
+ // TODO: Send delete request to cache nodes with remote copies
rgw::sal::Attrs attrs;
std::string head_oid_in_cache;
rgw::d4n::CacheBlock block;
int ret = -1;
- // check_head_exists_in_cache_get_oid also returns false if the head object is in the cache, but is a delete marker.
- // As a result, the below check guarantees the head object is not in the cache.
+ /* check_head_exists_in_cache_get_oid also returns false if the head object is in the cache, but is a delete marker.
+ As a result, the below check guarantees the head object is not in the cache. */
if (!source->check_head_exists_in_cache_get_oid(dpp, head_oid_in_cache, attrs, block, y) && !block.deleteMarker) {
+ /* for a dirty object, if the first call is a simple delete after versioning is enabled, the call will go to the backend store and create a dlete marker there
+ since no object with source->get_name() will be found in the cache (and this is correct) */
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): head object not found; calling next->delete_obj" << dendl;
next->params = params;
ret = next->delete_obj(dpp, y, flags);
return ret;
} else {
bool objDirty = block.cacheObj.dirty;
- auto blockDir = source->driver->get_block_dir();
+ auto blockDir = source->driver->get_block_dir();
+ auto objDir = source->driver->get_obj_dir();
std::string policy_prefix = head_oid_in_cache;
std::string version = source->get_object_version();
+ // call invalidate_object based on whether the object is dirty(objDirty)
+ //if the object is still dirty and has been marked invalid, then let objDirty be true, else set it to false
+ //remove code that deletes head/data block in this method.
+
+ std::string objName = source->get_name();
+ // special handling for name starting with '_'
+ if (objName[0] == '_') {
+ objName = "_" + source->get_name();
+ }
+
if (objDirty) { // head object dirty flag represents object dirty flag
policy_prefix.erase(0, 2); // remove "D_" prefix from policy key since the policy keys do not hold this information
}
-
- // Versioned buckets - this will delete the head object indexed by version-id (even null)
+ // Versioned buckets - this will delete the head object indexed by version-id (even null) and latest en
if (source->get_bucket()->versioned()) {
- //1. clean objects - no latest head entry as latest entry to be retrieved from backend now
- // hence delete only versioned head object
+ /* 1. clean objects - no latest head entry as latest entry to be retrieved from backend now
+ hence delete only versioned head object */
if (!objDirty) {
if (source->have_instance()) {
if ((ret = blockDir->del(dpp, &block, y)) < 0) {
return ret;
}
}
- // if versioning is suspended, we might have a latest head entry created from when bucket was non-versioned
- // don't return error as that could already be deleted by set_head_obj_dir_entry
+ /* if versioning is suspended, we might have a latest head entry created from when bucket was non-versioned
+ don't return error as that could already be deleted by set_head_obj_dir_entry */
if (!source->get_bucket()->versioning_enabled()) {
- block.cacheObj.objName = source->get_name();
+ block.cacheObj.objName = objName;
if ((ret = blockDir->del(dpp, &block, y)) < 0) {
ldpp_dout(dpp, 0) << "Failed to delete head object in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl;
}
}
- } else if (objDirty) { //2. dirty objects - TBD for now
- if ((ret = blockDir->del(dpp, &block, y)) == 0) { // delete head object
- if (objDirty) {
- if ((ret = delete_from_cache_and_policy(dpp, head_oid_in_cache, policy_prefix, y)) < 0) {
- return ret;
- }
- }
- } else if (ret < 0 && ret != -ENOENT) {
- ldpp_dout(dpp, 0) << "Failed to delete head object in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl;
+ } else if (objDirty) { //2. dirty objects - 1. add delete marker for simple request 2. delete version if given and correctly promote latest version if needed
+ bool transaction_success = false;
+ //add watch on latest entry, as it can be modified by a put or another del
+ rgw::d4n::CacheBlock latest_block = block;
+ latest_block.cacheObj.objName = objName;
+ ret = blockDir->watch(dpp, &latest_block, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to add a watch on: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl;
return ret;
}
- }
+ int retry = 3;
+ while(retry) {
+ retry--;
+ //get latest entry
+ ret = blockDir->get(dpp, &latest_block, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to get latest entry in block directory for: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl;
+ blockDir->unwatch(dpp, y);
+ return ret;
+ }
+ //simple delete request with no version id - create a delete marker
+ if (block.cacheObj.objName == objName) {
+ /* we are checking for latest_block and not block because latest_block has the most updated value of latest hash entry
+ if existing latest entry is already a delete marker, do not create a new one and simply return */
+ if (!latest_block.deleteMarker) {
+ ret = source->create_delete_marker(dpp, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to create a delete marker for: " << block.cacheObj.objName << ", ret=" << ret << dendl;
+ //ERR_INTERNAL_ERROR is returned when exec_responses are empty which means the watched key has been modified, hence retry
+ if (ret == -ERR_INTERNAL_ERROR) {
+ continue;
+ } else {
+ blockDir->unwatch(dpp, y);
+ return ret;
+ }
+ }
+ if (ret >= 0) {
+ result.delete_marker = true;
+ result.version_id = source->get_instance();
+ transaction_success = true;
+ return 0;
+ }
+ }
+ //unwatch the key (latest entry), as it is already a delete marker and we won't do anything
+ ret = blockDir->unwatch(dpp, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to add a watch on: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl;
+ return ret;
+ }
+ transaction_success = true;
+ return 0;
+ } else { //not a simple request, delete version requested
+ //get latest entry ret is 0
+ if (ret == 0) {
+ rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{
+ .objName = objName,
+ .bucketName = source->get_bucket()->get_bucket_id(),
+ };
+ bool startmulti = false;
+ //check if version to be deleted is the same as latest version
+ if (latest_block.version == block.version) {
+ std::vector<std::string> members;
+ //get the second latest version
+ ret = objDir->zrevrange(dpp, &dir_obj, 0, 1, members, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to get the second latest version for: " << dir_obj.objName << ", ret=" << ret << dendl;
+ blockDir->unwatch(dpp, y);
+ return ret;
+ }
+ //if there is a second latest version
+ if (members.size() == 2) {
+ rgw::d4n::CacheBlock version_block = latest_block;
+ version_block.cacheObj.objName = "_:" + members[1] + "_" + source->get_name();
+ //add watch on the second latest versioned entry also as it might be modified by another del
+ ret = blockDir->watch(dpp, &version_block, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to add a watch on: " << version_block.cacheObj.objName << ", ret=" << ret << dendl;
+ blockDir->unwatch(dpp, y);
+ return ret;
+ }
+ //get versioned entry
+ ret = blockDir->get(dpp, &version_block, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to get the versioned entry for: " << version_block.cacheObj.objName << ", ret=" << ret << dendl;
+ blockDir->unwatch(dpp, y);
+ return 0;
+ }
+ //start redis transaction using MULTI
+ blockDir->multi(dpp, y);
+ startmulti = true;
+ //set versioned entry as the latest entry
+ version_block.cacheObj.objName = latest_block.cacheObj.objName;
+ ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): INFO: promoting latest version entry to version: " << version_block.version << ", ret=" << ret << dendl;
+ ret = blockDir->set(dpp, &version_block, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to set new latest entry for: " << version_block.cacheObj.objName << ", ret=" << ret << dendl;
+ blockDir->discard(dpp, y);
+ return 0;
+ }
+ } else { // there are no more versions left
+ //start redis transaction using MULTI
+ blockDir->multi(dpp, y);
+ startmulti = true;
+ //delete latest block entry
+ ret = blockDir->del(dpp, &latest_block, y, true);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to delete latest entry in block directory, when it is the same as version requested, for: " << block.cacheObj.objName << ", ret=" << ret << dendl;
+ blockDir->discard(dpp, y);
+ return ret;
+ }
+ }
+ } //end-if latest_block.version == block.version
+ //delete versioned entry (handles delete markers also)
+ if (!startmulti) {
+ //start redis transaction using MULTI
+ blockDir->multi(dpp, y);
+ }
+ if ((ret = blockDir->del(dpp, &block, y, true)) == 0) {
+ if ((ret = delete_from_cache_and_policy(dpp, head_oid_in_cache, policy_prefix, y)) < 0) {
+ blockDir->discard(dpp, y);
+ return ret;
+ }
+ } else if (ret < 0 && ret != -ENOENT) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to delete head object in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl;
+ blockDir->discard(dpp, y);
+ return ret;
+ }
+ //delete entry from ordered set
+ std::string version = source->get_instance();
+ ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): Version to be deleted is: " << version << dendl;
+ ret = objDir->zrem(dpp, &dir_obj, version, y, true);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue zrem request in object directory for: " << source->get_name() << ", ret=" << ret << dendl;
+ blockDir->discard(dpp, y);
+ return ret;
+ }
+ std::vector<std::string> responses;
+ ret = blockDir->exec(dpp, responses, y);
+ if (responses.empty()) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Execute responses are empty hence continuing!" << dendl;
+ continue;
+ }
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to execute exec in block directory: " << "ret= " << ret << dendl;
+ return ret;
+ }
+ result.delete_marker = block.deleteMarker;
+ result.version_id = version;
+ //success, hence break from loop
+ transaction_success = true;
+ break;
+ }
+ } //end-else (simple request)
+ } //end-while retry
+ if (!transaction_success) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Redis transaction failed after retrying! " << dendl;
+ return -ERR_INTERNAL_ERROR;
+ }
+ } //end-if objDirty
} //end-if versioned buckets
- //Non-versioned buckets - we will delete the latest entry and the "null" entry
+ /* Non-versioned buckets - we will delete the latest entry and the "null" entry
+ dirty objects - delete "null" entry from ordered set also */
if (!source->get_bucket()->versioned()) {
if ((ret = blockDir->del(dpp, &block, y)) == 0) {
if (objDirty) {
return ret;
}
//if we get request for latest head entry, delete the null block and vice versa
- if (block.cacheObj.objName == source->get_name()) {
+ if (block.cacheObj.objName == objName) {
block.cacheObj.objName = "_:null_" + source->get_name();
} else {
block.cacheObj.objName = source->get_name();
ldpp_dout(dpp, 0) << "Failed to delete head object in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl;
return ret;
}
+ //dirty objects - delete from ordered set
+ if (objDirty) {
+ rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{
+ .objName = source->get_name(),
+ .bucketName = source->get_bucket()->get_bucket_id(),
+ };
+ ret = objDir->zrem(dpp, &dir_obj, "null", y, true);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to Queue zrem request in object directory for: " << source->get_name() << ", ret=" << ret << dendl;
+ blockDir->discard(dpp, y);
+ return ret;
+ }
+ }
+ std::vector<std::string> responses;
+ ret = blockDir->exec(dpp, responses, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): Failed to execute exec in block directory: " << "ret= " << ret << dendl;
+ return ret;
+ }
+ if (objDirty) {
+ if ((ret = delete_from_cache_and_policy(dpp, head_oid_in_cache, policy_prefix, y)) < 0) {
+ return ret;
+ }
+ }
} //end-if non-versioned buckets
int size;
}
ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): Size of object is: " << size << dendl;
- // delete data blocks, when,
- // 1. object is clean, bucket is versioned and there is an instance in the request
- // 2. object is clean, bucket is non-versioned
- // 3. object is dirty - TBD
+ /* delete data blocks directory entries, when,
+ 1. object is clean, bucket is versioned and there is an instance in the request
+ 2. object is clean, bucket is non-versioned
+ 3. object is dirty - delete blocks in cache also except for delete markers */
if ((!objDirty && source->get_bucket()->versioned() && source->have_instance()) ||
(!objDirty && !source->get_bucket()->versioned()) ||
- objDirty) {
- off_t lst = size;
- off_t fst = 0;
+ (objDirty && !block.deleteMarker)) {
+ off_t lst = size;
+ off_t fst = 0;
- do { // loop through the data blocks
- std::string prefix = get_cache_block_prefix(source, version, false);
- if (fst >= lst) {
- break;
- }
- //data blocks have cacheObj.objName set to oid always
- block.cacheObj.objName = source->get_oid();
- off_t cur_size = std::min<off_t>(fst + dpp->get_cct()->_conf->rgw_max_chunk_size, lst);
- off_t cur_len = cur_size - fst;
- block.blockID = static_cast<uint64_t>(fst);
- block.size = static_cast<uint64_t>(cur_len);
-
- if ((ret = blockDir->get(dpp, &block, y)) < 0) {
- if (ret == -ENOENT) {
- ldpp_dout(dpp, 0) << "Directory entry for: " << source->get_name() << " blockid: " << fst << " block size: " << cur_len << " does not exist; continuing" << dendl;
- fst += cur_len;
- if (fst >= lst) {
- break;
+ do { // loop through the data blocks
+ std::string prefix = get_cache_block_prefix(source, version, false);
+ if (fst >= lst) {
+ break;
+ }
+ //data blocks have cacheObj.objName set to oid always
+ block.cacheObj.objName = source->get_oid();
+ off_t cur_size = std::min<off_t>(fst + dpp->get_cct()->_conf->rgw_max_chunk_size, lst);
+ off_t cur_len = cur_size - fst;
+ block.blockID = static_cast<uint64_t>(fst);
+ block.size = static_cast<uint64_t>(cur_len);
+
+ if ((ret = blockDir->get(dpp, &block, y)) < 0) {
+ if (ret == -ENOENT) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Directory entry for: " << source->get_oid() << " blockid: " << fst << " block size: " << cur_len << " does not exist; continuing" << dendl;
+ fst += cur_len;
+ if (fst >= lst) {
+ break;
+ }
+ continue;
+ } else {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Failed to retrieve directory entry for: " << source->get_oid() << " blockid: " << fst << " block size: " << cur_len << ", ret=" << ret << dendl;
+ return ret;
}
- continue;
- } else {
- ldpp_dout(dpp, 10) << "Failed to retrieve directory entry for: " << source->get_name() << " blockid: " << fst << " block size: " << cur_len << ", ret=" << ret << dendl;
- return ret;
}
- }
if ((ret = blockDir->del(dpp, &block, y)) == 0) {
prefix = DIRTY_BLOCK_PREFIX + prefix;
ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): calling next->prepare" << dendl;
return next->prepare(y);
} else {
- //for non-versioned buckets, we need to delete the older dirty blocks of the object from the cache as dirty blocks do not get evicted
+ //for non-versioned buckets or version suspended buckets, we need to delete the older dirty blocks of the object from the cache as dirty blocks do not get evicted
//alternatively, we could add logic to delete this lazily
- if (!object->get_bucket()->versioned()) {
+ if (!object->get_bucket()->versioned() || (object->get_bucket()->versioned() && !object->get_bucket()->versioning_enabled())) {
std::unique_ptr<rgw::sal::Object::DeleteOp> del_op = object->get_delete_op();
+ if (object->get_bucket()->versioned() && !object->get_bucket()->versioning_enabled()) {
+ del_op->params.null_verid = true;
+ object->set_instance("null");
+ }
auto ret = del_op->delete_obj(dpp, y, rgw::sal::FLAG_LOG_OP);
if (ret < 0) {
ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): delete_obj failed, ret=" << ret << dendl;
}
+ object->clear_instance();
}
}
if (object->get_bucket()->versioned() && !object->get_bucket()->versioning_enabled()) { //if versioning is suspended
object->set_instance("null");
}
- constexpr uint32_t OBJ_INSTANCE_LEN = 32;
char buf[OBJ_INSTANCE_LEN + 1];
gen_rand_alphanumeric_no_underscore(dpp->get_cct(), buf, OBJ_INSTANCE_LEN);
version = buf; // using gen_rand_alphanumeric_no_underscore for the time being
dirty = true;
ceph::real_time m_time;
- dirty = true;
if (mtime) {
if (real_clock::is_zero(*mtime)) {
*mtime = real_clock::now();
if (ret == 0) {
ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << "(): version stored in update method is: " << version << dendl;
driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, bl.length(), version, dirty, y);
- ret = object->set_head_obj_dir_entry(dpp, y, true, dirty);
+ ret = object->set_head_obj_dir_entry(dpp, nullptr, y, true, dirty);
if (ret < 0) {
ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory set method failed for head object, ret=" << ret << dendl;
return ret;
}
if (dirty) {
auto creationTime = ceph::real_clock::to_time_t(object->get_mtime());
- ldpp_dout(dpp, 16) << "D4NFilterWriter::" << __func__ << "(): key=" << key << dendl;
- driver->get_policy_driver()->get_cache_policy()->update_dirty_object(dpp, key, version, dirty, accounted_size, creationTime, std::get<rgw_user>(obj->get_bucket()->get_owner()), objEtag, obj->get_bucket()->get_name(), obj->get_bucket()->get_bucket_id(), obj->get_key(), y);
+ ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << "(): key=" << key << dendl;
+ ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << "(): obj->get_key()=" << obj->get_key() << dendl;
+ driver->get_policy_driver()->get_cache_policy()->update_dirty_object(dpp, key, version, false, accounted_size, creationTime, std::get<rgw_user>(obj->get_bucket()->get_owner()), objEtag, obj->get_bucket()->get_name(), obj->get_bucket()->get_bucket_id(), obj->get_key(), y);
}
} else { //if get_cache_driver()->put()
ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): put failed for head_oid_in_cache, ret=" << ret << dendl;
if (ret == 0) {
ldpp_dout(dpp, 20) << "D4NFilterMultipartUpload::" << __func__ << " version stored in update method is: " << d4n_target_obj->get_object_version() << dendl;
driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, bl.length(), version, false, y);
- ret = d4n_target_obj->set_head_obj_dir_entry(dpp, y, true);
+ ret = d4n_target_obj->set_head_obj_dir_entry(dpp, nullptr, y, true);
if (ret < 0) {
ldpp_dout(dpp, 0) << "D4NFilterMultipartUpload::" << __func__ << "(): BlockDirectory set method failed for head object, ret=" << ret << dendl;
}