filter driver and ssd backed cache.
1. rgw/d4n: addressing concurrency issues by adding a refcount
to each block.
Blocks with positive refcounts are pinned (similar to dirty blocks)
and not eligible for eviction or deletion. Updating unit tests also.
2. rgw/cache: addressing concurrency issues while directories creation,
deletion, updating xattrs, get_attr and put.
Signed-off-by: Pritha Srivastava <prsrivas@redhat.com>
const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool dirty, uint64_t size,
time_t creationTime, const rgw_user user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
const rgw_obj_key& obj_key, optional_yield y, std::string& restore_val) {
- update_dirty_object(dpp, key, version, dirty, size, creationTime, user, etag, bucket_name, bucket_id, obj_key, y, restore_val);
+ update_dirty_object(dpp, key, version, dirty, size, creationTime, user, etag, bucket_name, bucket_id, obj_key, RefCount::NOOP, y, restore_val);
};
static auto block_callback = [this](
const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val) {
- update(dpp, key, offset, len, version, dirty, y, restore_val);
+ update(dpp, key, offset, len, version, dirty, RefCount::NOOP, y, restore_val);
};
cacheDriver->restore_blocks_objects(dpp, obj_callback, block_callback);
ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): Failed to set xattr, ret=" << ret << dendl;
return false;
}
-
return true;
} else if (p->second.second == State::IN_PROGRESS) {
state_cond.wait(l, [this, &key]{ return (o_entries_map.find(key) == o_entries_map.end()); });
}
CacheBlock* LFUDAPolicy::get_victim_block(const DoutPrefixProvider* dpp, optional_yield y) {
- const std::lock_guard l(lfuda_lock);
if (entries_heap.empty())
return nullptr;
/* Get victim cache block */
- std::string key = entries_heap.top()->key;
+ LFUDAEntry* entry = entries_heap.top();
+ std::string key = entry->key;
CacheBlock* victim = new CacheBlock();
auto parts = split(key, "#");
victim->blockID = std::stoull(block_info[3]);
victim->size = std::stoull(block_info[4]);
- if (blockDir->get(dpp, victim, y) < 0) {
+ /* check dirty flag of entry to be evicted, if the flag is dirty, all entries on the local node are dirty
+ check refcount also, if refcount > 0 then no entries are available for eviction */
+ if (entry->dirty || entry->refcount > 0) {
+ ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): Top entry in min heap is dirty or with positive refcount, no entry is available for eviction!" << dendl;
return nullptr;
}
}
int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) {
- if (entries_heap.empty())
- return 0;
-
int ret = -1;
uint64_t freeSpace = cacheDriver->get_free_space(dpp);
- while (freeSpace < size) { // TODO: Think about parallel reads and writes; can this turn into an infinite loop?
+ while (freeSpace < size) { // TODO: Think about parallel reads and writes; can this turn into an infinite loop?
+ std::unique_lock<std::mutex> l(lfuda_lock);
CacheBlock* victim = get_victim_block(dpp, y);
if (victim == nullptr) {
ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): Could not retrieve victim block." << dendl;
delete victim;
+ l.unlock();
return -ENOSPC;
}
- const std::lock_guard l(lfuda_lock);
std::string key = entries_heap.top()->key;
auto it = entries_map.find(key);
if (it == entries_map.end()) {
delete victim;
+ l.unlock();
return -ENOENT;
}
- // check dirty flag of entry to be evicted, if the flag is dirty, all entries on the local node are dirty
- if (it->second->dirty) {
- ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): Top entry in min heap is dirty, no entry is available for eviction!" << dendl;
- return -ENOSPC;
- }
- int avgWeight = weightSum / entries_map.size();
+ int avgWeight = weightSum / entries_map.size();
+ /* the following part takes care of updating the weight (globalWeight) of the block if this is the last copy in a remote setup
+ and is pushed out to a remote cache where space is available */
+#if 0
if (victim->cacheObj.hostsList.size() == 1 && *(victim->cacheObj.hostsList.begin()) == dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address) { /* Last copy */
if (victim->globalWeight) {
it->second->localWeight += victim->globalWeight;
(*it->second->handle)->localWeight = it->second->localWeight;
entries_heap.decrease(it->second->handle); // larger value means node must be decreased to maintain min heap
-
if ((ret = cacheDriver->set_attr(dpp, key, RGW_CACHE_ATTR_LOCAL_WEIGHT, std::to_string(it->second->localWeight), y)) < 0) {
delete victim;
return ret;
delete victim;
return ret;
}
+#endif
+ //erase also updates weightSum, is the following needed?
+ weightSum = (avgWeight * entries_map.size()) - it->second->localWeight;
+
+ age = std::max(it->second->localWeight, age);
+ _erase(dpp, key, y);
+ l.unlock();
+
+ //Need to get and then update the host atomically in a remote setup
if ((ret = blockDir->remove_host(dpp, victim, dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, y)) < 0) {
delete victim;
return ret;
ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Block " << key << " has been evicted." << dendl;
- weightSum = (avgWeight * entries_map.size()) - it->second->localWeight;
-
- age = std::max(it->second->localWeight, age);
-
- erase(dpp, key, y);
-
if (perfcounter) {
perfcounter->inc(l_rgw_d4n_cache_evictions);
}
return 0;
}
-void LFUDAPolicy::update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val)
+bool LFUDAPolicy::update_refcount_if_key_exists(const DoutPrefixProvider* dpp, const std::string& key, uint8_t op, optional_yield y)
+{
+ ldpp_dout(dpp, 20) << "LFUDAPolicy::" << __func__ << "(): updating refcount for entry: " << key << dendl;
+ const std::lock_guard l(lfuda_lock);
+ auto entry = find_entry(key);
+ uint64_t refcount = 0;
+ if (entry == nullptr) {
+ return false;
+ }
+ refcount = entry->refcount;
+ ldpp_dout(dpp, 20) << "LFUDAPolicy::" << __func__ << "(): old refcount is: " << refcount << dendl;
+ if (op == RefCount::INCR) {
+ refcount += 1;
+ }
+ if (op == RefCount::DECR) {
+ if (refcount > 1) {
+ refcount -= 1;
+ }
+ }
+ (*entry->handle)->refcount = refcount;
+ ldpp_dout(dpp, 20) << "LFUDAPolicy::" << __func__ << "(): updated refcount is: " << (*entry->handle)->refcount << dendl;
+ entries_heap.update(entry->handle);
+
+ return true;
+}
+
+void LFUDAPolicy::update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, uint8_t op, optional_yield y, std::string& restore_val)
{
ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): updating entry: " << key << dendl;
using handle_type = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>::handle_type;
int localWeight = age;
auto entry = find_entry(key);
bool updateLocalWeight = true;
+ uint64_t refcount = 0;
std::string oid_in_cache = key;
if (dirty == true) {
ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): restored localWeight is: " << localWeight << dendl;
}
- // check the dirty flag in the existing entry for the key and the incoming dirty flag. If the
- // incoming dirty flag is false, that means update() is invoked as part of cleaning process,
- // so we must not update its localWeight.
+ /* check the dirty flag in the existing entry for the key and the incoming dirty flag. If the
+ incoming dirty flag is false, that means update() is invoked as part of cleaning process,
+ so we must not update its localWeight. */
if (entry != nullptr) {
- if (entry->dirty && !dirty) {
+ refcount = entry->refcount;
+ if ((entry->dirty && !dirty)) {
localWeight = entry->localWeight;
updateLocalWeight = false;
} else {
localWeight = entry->localWeight + age;
}
+ if (op == RefCount::INCR) {
+ refcount += 1;
+ }
+ if (op == RefCount::DECR) {
+ if (refcount > 0) {
+ refcount -= 1;
+ }
+ }
}
- erase(dpp, key, y);
- LFUDAEntry* e = new LFUDAEntry(key, offset, len, version, dirty, localWeight);
+ _erase(dpp, key, y);
+ ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): updated refcount is: " << refcount << dendl;
+ LFUDAEntry* e = new LFUDAEntry(key, offset, len, version, dirty, refcount, localWeight);
handle_type handle = entries_heap.push(e);
e->set_handle(handle);
entries_map.emplace(key, e);
weightSum += ((localWeight < 0) ? 0 : localWeight);
}
-void LFUDAPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, optional_yield y, std::string& restore_val)
+void LFUDAPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, uint8_t op, optional_yield y, std::string& restore_val)
{
using handle_type = boost::heap::fibonacci_heap<LFUDAObjEntry*, boost::heap::compare<ObjectComparator<LFUDAObjEntry>>>::handle_type;
State state{State::INIT};
cond.notify_one();
}
-bool LFUDAPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y)
+bool LFUDAPolicy::_erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y)
{
auto p = entries_map.find(key);
if (p == entries_map.end()) {
return true;
}
+bool LFUDAPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y)
+{
+ const std::lock_guard l(lfuda_lock);
+ return _erase(dpp, key, y);
+}
+
bool LFUDAPolicy::erase_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y)
{
const std::lock_guard l(lfuda_cleaning_lock);
}
int LFUDAPolicy::delete_data_blocks(const DoutPrefixProvider* dpp, LFUDAObjEntry* e, optional_yield y) {
- off_t lst = e->size, fst = 0, ofs = 0;
- uint64_t len = 0;
+ off_t lst = e->size, fst = 0;
do {
if (fst >= lst) {
std::string oid_in_cache = DIRTY_BLOCK_PREFIX + prefix;
int ret = -1;
- if ((ret = cacheDriver->delete_data(dpp, oid_in_cache, y)) == 0) { // Sam: do we want del or delete_data here?
+ std::unique_lock<std::mutex> ll(lfuda_lock);
+ auto it = entries_map.find(prefix);
+ if (it != entries_map.end()) {
+ if (it->second->refcount > 0) {
+ return -EBUSY;//better error code?
+ }
+ }
+ ll.unlock();
+ if ((ret = cacheDriver->delete_data(dpp, oid_in_cache, y)) == 0) {
if (!(ret = erase(dpp, prefix, y))) {
ldpp_dout(dpp, 0) << "Failed to delete policy entry for: " << oid_in_cache << ", ret=" << ret << dendl;
return -EINVAL;
return -EINVAL;
}
- ofs += len;
- } while (len > 0);
+ fst += cur_len;
+ } while (fst < lst);
return 0;
}
}
if (p->second.second == State::INVALID) {
invalid = true;
+ } else {
+ p->second.second = State::IN_PROGRESS;
}
l.unlock();
if (invalid) {
ldpp_dout(dpp, 10) << __func__ << "(): State is INVALID; deleting object." << dendl;
int ret = -1;
- if ((ret = cacheDriver->delete_data(dpp, DIRTY_BLOCK_PREFIX + e->key, y)) == 0) { // Sam: do we want del or delete_data here?
- if (!(ret = erase(dpp, e->key, y))) {
- ldpp_dout(dpp, 0) << "Failed to delete head policy entry for: " << e->key << ", ret=" << ret << dendl; // TODO: what must occur during failure?
- }
- } else {
- ldpp_dout(dpp, 0) << "Failed to delete head object for: " << e->key << ", ret=" << ret << dendl;
- }
+ //check if key exists and get the refcount of block, if greater than zero then modify the creationTime of dirty object to attempt to delete later
+ std::unique_lock<std::mutex> ll(lfuda_lock);
+ auto it = entries_map.find(e->key);
+ if (it != entries_map.end()) {
+ if (it->second->refcount > 0) {
+ l.lock();
+ //deferring the deletion of the invalid object
+ (*e->handle)->creationTime = (*e->handle)->creationTime + interval/2;
+ ldpp_dout(dpp, 20) << "LFUDAPolicy::" << __func__ << "(): updated creation time is: " << (*e->handle)->creationTime << dendl;
+ object_heap.update(e->handle);
+ l.unlock();
+ continue;
+ }
+ ll.unlock();
+ if ((ret = cacheDriver->delete_data(dpp, DIRTY_BLOCK_PREFIX + e->key, y)) == 0) {
+ if (!(ret = erase(dpp, e->key, y))) {
+ ldpp_dout(dpp, 0) << "Failed to delete head policy entry for: " << e->key << ", ret=" << ret << dendl; // TODO: what must occur during failure?
+ }
+ } else {
+ ldpp_dout(dpp, 0) << "Failed to delete head object for: " << e->key << ", ret=" << ret << dendl;
+ }
+ } else {
+ //ignore if block not found, as it could have been deleted earlier when refcount for it was 0
+ ll.unlock();
+ }
if (!e->delete_marker) {
ret = delete_data_blocks(dpp, e, y);
if (ret == 0) {
erase_dirty_object(dpp, e->key, null_yield);
- } else {
+ } else if (ret == -EBUSY) {
+ l.lock();
+ //deferring the deletion of the invalid object
+ (*e->handle)->creationTime = (*e->handle)->creationTime + interval/2;
+ ldpp_dout(dpp, 20) << "LFUDAPolicy::" << __func__ << "(): updated creation time is: " << (*e->handle)->creationTime << dendl;
+ object_heap.update(e->handle);
+ l.unlock();
+ continue;
+ } else {
ldpp_dout(dpp, 0) << "Failed to delete blocks for: " << e->key << ", ret=" << ret << dendl;
}
}
} else {
- l.lock();
- p->second.second = State::IN_PROGRESS;
- l.unlock();
-
rgw_user c_rgw_user = e->user;
//writing data to the backend
//we need to create an atomic_writer
erase_dirty_object(dpp, e->key, null_yield);
continue;
}
- //invoke update() with dirty flag set to false, to update in-memory metadata for each block
- // reset values
+ /* invoke update() with dirty flag set to false, to update in-memory metadata for each block
+ reset values */
lst = e->size;
fst = 0;
do {
//Rename block to remove "D" prefix
cacheDriver->rename(dpp, oid_in_cache, new_oid_in_cache, null_yield);
//Update in-memory data structure for each block
- this->update(dpp, new_oid_in_cache, 0, 0, e->version, false, y);
+ this->update(dpp, new_oid_in_cache, 0, 0, e->version, false, 0, y);
rgw::d4n::CacheBlock block;
block.cacheObj.bucketName = c_obj->get_bucket()->get_bucket_id();
cacheDriver->rename(dpp, head_oid_in_cache, new_head_oid_in_cache, null_yield);
//invoke update() with dirty flag set to false, to update in-memory metadata for head
- this->update(dpp, new_head_oid_in_cache, 0, 0, e->version, false, y);
+ this->update(dpp, new_head_oid_in_cache, 0, 0, e->version, false, 0, y);
if (null_instance) {
//restore instance for directory data processing in later steps
.objName = c_obj->get_name(),
.bucketName = c_obj->get_bucket()->get_bucket_id(),
};
- //remove the entry from the ordered set using its score, as the object is already cleaned
- //need not be part of a transaction as it is being removed based on its score which is its creation time.
+ /* remove the entry from the ordered set using its score, as the object is already cleaned
+ need not be part of a transaction as it is being removed based on its score which is its creation time. */
ret = objDir->zremrangebyscore(dpp, &dir_obj, e->creationTime, e->creationTime, y);
if (ret < 0) {
ldpp_dout(dpp, 0) << __func__ << "(): Failed to remove object from ordered set with error: " << ret << dendl;
erase_dirty_object(dpp, e->key, null_yield);
}
} else if (diff < interval) { //end-if std::difftime(time(NULL), e->creationTime) > interval
- std::this_thread::sleep_for(std::chrono::seconds(interval - diff)); //TODO:: should this time be optimised?
+ std::this_thread::sleep_for(std::chrono::seconds(interval - diff));
}
} //end-while true
}
return 0;
}
-void LRUPolicy::update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val)
+void LRUPolicy::update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, uint8_t op, optional_yield y, std::string& restore_val)
{
const std::lock_guard l(lru_lock);
_erase(dpp, key, y);
- Entry* e = new Entry(key, offset, len, version, dirty);
+ Entry* e = new Entry(key, offset, len, version, dirty, 0);
entries_lru_list.push_back(*e);
entries_map.emplace(key, e);
}
static std::string empty = std::string();
+enum RefCount {
+ NOOP = 0,
+ INCR = 1,
+ DECR = 2,
+};
+
enum class State { // state machine for dirty objects in the cache
INIT,
IN_PROGRESS, // object is being written to the backend
uint64_t len;
std::string version;
bool dirty;
- Entry(const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty) : key(key), offset(offset),
- len(len), version(version), dirty(dirty) {}
+ uint64_t refcount{0};
+ Entry(const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, uint64_t refcount) : key(key), offset(offset),
+ len(len), version(version), dirty(dirty), refcount(refcount) {}
};
//The disposer object function
virtual int init(CephContext* cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver* _driver) = 0;
virtual int exist_key(std::string key) = 0;
virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) = 0;
- virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val=empty) = 0;
+ virtual bool update_refcount_if_key_exists(const DoutPrefixProvider* dpp, const std::string& key, uint8_t op, optional_yield y) = 0;
+ virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, uint8_t op, optional_yield y, std::string& restore_val=empty) = 0;
virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size,
time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
- const rgw_obj_key& obj_key, optional_yield y, std::string& restore_val=empty) = 0;
+ const rgw_obj_key& obj_key, uint8_t op, optional_yield y, std::string& restore_val=empty) = 0;
virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0;
virtual bool erase_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0;
virtual bool invalidate_dirty_object(const DoutPrefixProvider* dpp, const std::string& key) = 0;
template<typename T>
struct EntryComparator {
bool operator()(T* const e1, T* const e2) const {
- // order the min heap using localWeight and dirty flag so that dirty blocks are at the bottom
- if ((e1->dirty && e2->dirty) || (!e1->dirty && !e2->dirty)) {
+ // order the min heap using localWeight, refcount and dirty flag so that dirty blocks and blocks with positive refcount are at the bottom
+ if ((e1->dirty && e2->dirty) || (!e1->dirty && !e2->dirty) || (e1->refcount > 0 && e2->refcount > 0)) {
return e1->localWeight > e2->localWeight;
} else if (e1->dirty && !e2->dirty){
return true;
} else if (!e1->dirty && e2->dirty) {
return false;
- } else {
+ } else if (e1->refcount > 0 && e2->refcount == 0) {
+ return true;
+ } else if (e1->refcount == 0 && e2->refcount > 0) {
+ return false;
+ }else {
return e1->localWeight > e2->localWeight;
}
}
using handle_type = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>::handle_type;
handle_type handle;
- LFUDAEntry(const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, int localWeight) : Entry(key, offset, len, version, dirty),
+ LFUDAEntry(const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, uint64_t refcount, int localWeight) : Entry(key, offset, len, version, dirty, refcount),
localWeight(localWeight) {}
void set_handle(handle_type handle_) { handle = handle_; }
handle_type handle;
LFUDAObjEntry(const std::string& key, const std::string& version, bool deleteMarker, uint64_t size,
- time_t creationTime, rgw_user user, const std::string& etag,
+ time_t creationTime, rgw_user user, const std::string& etag,
const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key) : ObjEntry(key, version, deleteMarker, size,
creationTime, user, etag, bucket_name, bucket_id, obj_key) {}
virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver *_driver);
virtual int exist_key(std::string key) override;
virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override;
- virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val=empty) override;
+ virtual bool update_refcount_if_key_exists(const DoutPrefixProvider* dpp, const std::string& key, uint8_t op, optional_yield y) override;
+ virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, uint8_t op, optional_yield y, std::string& restore_val=empty) override;
virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
+ virtual bool _erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y);
void save_y(optional_yield y) { this->y = y; }
virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size,
time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
- const rgw_obj_key& obj_key, optional_yield y, std::string& restore_val=empty) override;
+ const rgw_obj_key& obj_key, uint8_t op, optional_yield y, std::string& restore_val=empty) override;
virtual bool erase_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
virtual bool invalidate_dirty_object(const DoutPrefixProvider* dpp, const std::string& key) override;
virtual void cleaning(const DoutPrefixProvider* dpp) override;
virtual int init(CephContext* cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver* _driver) { return 0; }
virtual int exist_key(std::string key) override;
virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override;
- virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, optional_yield y, std::string& restore_val=empty) override;
+ virtual bool update_refcount_if_key_exists(const DoutPrefixProvider* dpp, const std::string& key, uint8_t op, optional_yield y) override { return false; }
+ virtual void update(const DoutPrefixProvider* dpp, const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, uint8_t op, optional_yield y, std::string& restore_val=empty) override;
virtual void update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size,
time_t creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id,
- const rgw_obj_key& obj_key, optional_yield y, std::string& restore_val=empty) override;
+ const rgw_obj_key& obj_key, uint8_t op, optional_yield y, std::string& restore_val=empty) override;
virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
virtual bool erase_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
virtual bool invalidate_dirty_object(const DoutPrefixProvider* dpp, const std::string& key) override { return false; }
if (ret == 0) {
ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << dest_version << dendl;
bufferlist bl;
- driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, bl.length(), dest_version, dirty, y);
+ driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, bl.length(), dest_version, dirty, rgw::d4n::RefCount::NOOP, y);
d4n_dest_object->set_object_version(dest_version);
ret = d4n_dest_object->set_head_obj_dir_entry(dpp, nullptr, y, true, dirty);
if (ret < 0) {
return ret;
}
if (dirty) {
- driver->get_policy_driver()->get_cache_policy()->update_dirty_object(dpp, key, dest_version, false, this->get_size(), creationTime, std::get<rgw_user>(dest_object->get_bucket()->get_owner()), *etag, dest_object->get_bucket()->get_name(), dest_object->get_bucket()->get_bucket_id(), dest_object->get_key(), y);
+ driver->get_policy_driver()->get_cache_policy()->update_dirty_object(dpp, key, dest_version, false, this->get_size(), creationTime, std::get<rgw_user>(dest_object->get_bucket()->get_owner()), *etag, dest_object->get_bucket()->get_name(), dest_object->get_bucket()->get_bucket_id(), dest_object->get_key(), rgw::d4n::RefCount::NOOP, y);
}
}
}
rgw::sal::Attrs attrs;
std::string head_oid_in_cache;
rgw::d4n::CacheBlock block;
+ bool found_in_cache = false;
if (check_head_exists_in_cache_get_oid(dpp, head_oid_in_cache, attrs, block, y)) {
+ found_in_cache = true;
if (setattrs != nullptr) {
/* Ensure setattrs and delattrs do not overlap */
if (delattrs != nullptr) {
}
}
//if set_obj_attrs() can be called to update existing attrs, then update_attrs() need to be called
- if (auto ret = driver->get_cache_driver()->set_attrs(dpp, head_oid_in_cache, *setattrs, y); ret < 0) {
- ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): CacheDriver set_attrs method failed with ret: " << ret << dendl;
- return ret;
+ if (this->driver->get_policy_driver()->get_cache_policy()->update_refcount_if_key_exists(dpp, head_oid_in_cache, rgw::d4n::RefCount::INCR, y)) {
+ auto ret = driver->get_cache_driver()->set_attrs(dpp, head_oid_in_cache, *setattrs, y);
+ this->driver->get_policy_driver()->get_cache_policy()->update_refcount_if_key_exists(dpp, head_oid_in_cache, rgw::d4n::RefCount::DECR, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): CacheDriver set_attrs method failed with ret: " << ret << dendl;
+ return ret;
+ }
+ } else {
+ found_in_cache = false;
}
} //if setattrs != nullptr
delattrs->erase(std::find(delattrs->begin(), delattrs->end(), attr));
}
}
-
- if (auto ret = driver->get_cache_driver()->delete_attrs(dpp, head_oid_in_cache, *delattrs, y); ret < 0) {
- ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): CacheDriver delete_attrs method failed with ret: " << ret << dendl;
- return ret;
+ if (this->driver->get_policy_driver()->get_cache_policy()->update_refcount_if_key_exists(dpp, head_oid_in_cache, rgw::d4n::RefCount::INCR, y)) {
+ auto ret = driver->get_cache_driver()->delete_attrs(dpp, head_oid_in_cache, *delattrs, y);
+ this->driver->get_policy_driver()->get_cache_policy()->update_refcount_if_key_exists(dpp, head_oid_in_cache, rgw::d4n::RefCount::DECR, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): CacheDriver delete_attrs method failed with ret: " << ret << dendl;
+ return ret;
+ }
+ } else {
+ found_in_cache = false;
}
} //if delattrs != nullptr
} else {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): object " << this->get_name() << " does not exist." << dendl;
return -ENOENT;
}
+ }
+ if (!found_in_cache) {
auto ret = next->set_obj_attrs(dpp, setattrs, delattrs, y, flags);
if (ret < 0) {
ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): set_obj_attrs method of backend store failed with ret: " << ret << dendl;
ret = driver->get_cache_driver()->put(dpp, oid_in_cache, bl, 0, attrs, y);
if (ret == 0) {
ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << "(): version stored in update method is: " << version << dendl;
- driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, bl.length(), version, true, y);
+ driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, bl.length(), version, true, rgw::d4n::RefCount::NOOP, y);
std::vector<std::string> exec_responses;
ret = this->set_head_obj_dir_entry(dpp, &exec_responses , y, true, true);
if (exec_responses.empty()) {
auto creationTime = ceph::real_clock::to_time_t(this->get_mtime());
ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): key=" << key << dendl;
std::string objEtag = "";
- driver->get_policy_driver()->get_cache_policy()->update_dirty_object(dpp, key, version, true, this->get_accounted_size(), creationTime, std::get<rgw_user>(this->get_bucket()->get_owner()), objEtag, this->get_bucket()->get_name(), this->get_bucket()->get_bucket_id(), this->get_key(), y);
+ driver->get_policy_driver()->get_cache_policy()->update_dirty_object(dpp, key, version, true, this->get_accounted_size(), creationTime, std::get<rgw_user>(this->get_bucket()->get_owner()), objEtag, this->get_bucket()->get_name(), this->get_bucket()->get_bucket_id(), this->get_key(), rgw::d4n::RefCount::NOOP, y);
} else { //if get_cache_driver()->put()
ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): put failed for oid_in_cache, ret=" << ret << " oid_in_cache: " << oid_in_cache << dendl;
return ret;
/* for distributed cache-the blockHostsList can be used to determine if the head block resides on the localhost, then get the block from localhost, whether or not the block is dirty
can be determined using the block entry. */
- //uniform name for versioned and non-versioned objects, since input for versioned objects might not contain version
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Is block dirty: " << block.cacheObj.dirty << dendl;
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): version: " << block.version << dendl;
- head_oid_in_cache = get_cache_block_prefix(this, version, block.cacheObj.dirty);
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Fetching attrs from cache for head obj id: " << head_oid_in_cache << dendl;
- auto ret = this->driver->get_cache_driver()->get_attrs(dpp, head_oid_in_cache, attrs, y);
- if (ret < 0) {
+ std::string key = get_cache_block_prefix(this, version, false);
+ if (this->driver->get_policy_driver()->get_cache_policy()->update_refcount_if_key_exists(dpp, key, rgw::d4n::RefCount::INCR, y)) {
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Is block dirty: " << block.cacheObj.dirty << dendl;
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): version: " << block.version << dendl;
+ head_oid_in_cache = get_cache_block_prefix(this, version, block.cacheObj.dirty);
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Fetching attrs from cache for head obj id: " << head_oid_in_cache << dendl;
+ auto ret = this->driver->get_cache_driver()->get_attrs(dpp, head_oid_in_cache, attrs, y);
+ if (ret < 0) {
+ found_in_cache = false;
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver get_attrs method failed." << dendl;
+ }
+ std::string key = head_oid_in_cache;
+ if (block.cacheObj.dirty) {
+ key = key.erase(0, 2); // Remove dirty prefix
+ }
+ this->driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, 0, version, block.cacheObj.dirty, rgw::d4n::RefCount::DECR, y);
+ } else {
found_in_cache = false;
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver get_attrs method failed." << dendl;
- }
- std::string key = head_oid_in_cache;
- if (block.cacheObj.dirty) {
- key = key.erase(0, 2); // Remove dirty prefix
}
- this->driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, 0, version, block.cacheObj.dirty, y);
} else if (ret == -ENOENT) { //if blockDir->get
found_in_cache = false;
} else {
}
std::string objName = this->get_name();
head_oid_in_cache = get_cache_block_prefix(this, version, false);
- if (this->driver->get_policy_driver()->get_cache_policy()->exist_key(head_oid_in_cache) > 0) {
+ if (this->driver->get_policy_driver()->get_cache_policy()->update_refcount_if_key_exists(dpp, head_oid_in_cache, rgw::d4n::RefCount::INCR, y)) {
ret = this->driver->get_cache_driver()->set_attrs(dpp, head_oid_in_cache, attrs, y);
} else {
ret = this->driver->get_policy_driver()->get_cache_policy()->eviction(dpp, attrs.size(), y);
}
if (ret == 0) {
ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << this->get_object_version() << dendl;
- this->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, 0, version, false, y);
+ this->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, 0, version, false, rgw::d4n::RefCount::DECR, y);
ret = set_head_obj_dir_entry(dpp, nullptr, y, is_latest_version);
if (ret < 0) {
ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object, ret=" << ret << dendl;
}
} else {
+ this->driver->get_policy_driver()->get_cache_policy()->update_refcount_if_key_exists(dpp, head_oid_in_cache, rgw::d4n::RefCount::DECR, y);
ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): failed to cache head object in cache backend, ret=" << ret << dendl;
}
} else {
rgw::sal::Attrs attrs;
Attrs delattr;
rgw::d4n::CacheBlock block;
+ bool found_in_cache = false;
if (check_head_exists_in_cache_get_oid(dpp, head_oid_in_cache, attrs, block, y)) {
+ found_in_cache = true;
delattr.insert({attr_name, bl});
Attrs currentattrs = this->get_attrs();
rgw::sal::Attrs::iterator attr = delattr.begin();
/* Ensure delAttr exists */
if (std::find_if(currentattrs.begin(), currentattrs.end(),
[&](const auto& pair) { return pair.first == attr->first; }) != currentattrs.end()) {
-
- if (auto ret = driver->get_cache_driver()->delete_attrs(dpp, head_oid_in_cache, delattr, y); ret < 0) {
- ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): CacheDriver delete_attrs method failed with ret: " << ret << dendl;
- return ret;
+ if (this->driver->get_policy_driver()->get_cache_policy()->update_refcount_if_key_exists(dpp, head_oid_in_cache, rgw::d4n::RefCount::INCR, y)) {
+ auto ret = driver->get_cache_driver()->delete_attrs(dpp, head_oid_in_cache, delattr, y);
+ this->driver->get_policy_driver()->get_cache_policy()->update_refcount_if_key_exists(dpp, head_oid_in_cache, rgw::d4n::RefCount::DECR, y);
+ if ( ret < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): CacheDriver delete_attrs method failed with ret: " << ret << dendl;
+ return ret;
+ }
+ } else {
+ found_in_cache = false;
}
}
} else {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): object " << this->get_name() << " does not exist." << dendl;
return -ENOENT;
}
-
+ }
+ if (!found_in_cache) {
if (auto ret = next->delete_obj_attrs(dpp, attr_name, y); ret < 0) {
ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): delete_obj_attrs method of backend store failed with ret: " << ret << dendl;
return ret;
ret = source->driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl, 0, attrs, y);
if (ret == 0) {
ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << this->source->get_object_version() << dendl;
- source->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, bl.length(), version, false, y);
+ source->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, bl.length(), version, false, rgw::d4n::RefCount::NOOP, y);
ret = source->set_head_obj_dir_entry(dpp, nullptr, y, is_latest_version);
if (ret < 0) {
ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object, ret=" << ret << dendl;
int D4NFilterObject::D4NFilterReadOp::drain(const DoutPrefixProvider* dpp, optional_yield y) {
auto c = aio->drain();
int r = flush(dpp, std::move(c), y);
+ std::string version = source->get_object_version();
+ std::string prefix = source->get_prefix();
+ for (auto it : blocks_info) {
+ std::pair<uint64_t, uint64_t> ofs_len_pair = it.second;
+ uint64_t ofs = ofs_len_pair.first;
+ uint64_t len = ofs_len_pair.second;
+ std::string oid_in_cache = prefix + CACHE_DELIM + std::to_string(ofs) + CACHE_DELIM + std::to_string(len);
+ source->driver->get_policy_driver()->get_cache_policy()->update_refcount_if_key_exists(dpp, oid_in_cache, rgw::d4n::RefCount::DECR, y);
+ }
if (r < 0) {
cancel();
return r;
ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " calling update for offset: " << offset << " adjusted offset: " << ofs << " length: " << len << " oid_in_cache: " << oid_in_cache << dendl;
ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << version << " " << source->get_object_version() << dendl;
- source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, len, version, dirty, y);
+ source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, len, version, dirty, rgw::d4n::RefCount::DECR, y);
+ blocks_info.erase(it);
if (source->dest_object && source->dest_bucket) {
D4NFilterObject* d4n_dest_object = dynamic_cast<D4NFilterObject*>(source->dest_object);
std::string dest_version = d4n_dest_object->get_object_version();
ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " destination object version in update method is: " << dest_version << dendl;
ret = source->driver->get_cache_driver()->put(dpp, dest_oid_in_cache, bl, bl.length(), attrs, y);
if (ret == 0) {
- source->driver->get_policy_driver()->get_cache_policy()->update(dpp, key, ofs, bl.length(), dest_version, true, y);
+ source->driver->get_policy_driver()->get_cache_policy()->update(dpp, key, ofs, bl.length(), dest_version, true, rgw::d4n::RefCount::NOOP, y);
}
if (ret = source->driver->get_block_dir()->set(dpp, &dest_block, y); ret < 0){
ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " BlockDirectory set failed with ret: " << ret << dendl;
ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " eviction returned ret: " << ret << dendl;
}
}
- blocks_info.erase(it);
} else {
ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << " offset not found: " << offset << dendl;
}
auto it = block.cacheObj.hostsList.find(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
if (it != block.cacheObj.hostsList.end()) { /* Local copy */
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in directory. " << oid_in_cache << dendl;
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in directory. " << oid_in_cache << dendl;
std::string key = oid_in_cache;
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: block is dirty = " << block.cacheObj.dirty << dendl;
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: block dirty =" << block.cacheObj.dirty << dendl;
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: key=" << key << dendl;
- if (block.version == version) {
- if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) {
- // Read From Cache
- auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), key, read_ofs, len_to_read, cost, id);
-
- this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, part_len)));
-
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
- auto r = flush(dpp, std::move(completed), y);
-
- if (r < 0) {
- drain(dpp, y);
- ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, ret=" << r << dendl;
- return r;
- }
- // if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0)
- } else {
- int r = -1;
- if ((r = source->driver->get_block_dir()->remove_host(dpp, &block, dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, y)) < 0)
- ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to remove incorrect host from block with oid=" << oid_in_cache <<", ret=" << r << dendl;
-
- if ((block.cacheObj.hostsList.size() - 1) > 0 && r == 0) { /* Remote copy */
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl;
- // TODO: Retrieve remotely
- // Policy decision: should we cache remote blocks locally?
- } else {
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
-
- auto r = drain(dpp, y);
-
- if (r < 0) {
- ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
- return r;
- }
-
- break;
+ if (block.version == version) {
+ if (source->driver->get_policy_driver()->get_cache_policy()->update_refcount_if_key_exists(dpp, oid_in_cache, rgw::d4n::RefCount::INCR, y) > 0) {
+ // Read From Cache
+ auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), key, read_ofs, len_to_read, cost, id);
+ this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, part_len)));
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
+ auto r = flush(dpp, std::move(completed), y);
+ if (r < 0) {
+ drain(dpp, y);
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, ret=" << r << dendl;
+ return r;
}
- }
- // if (block.version == version)
- } else {
- // TODO: If data has already been returned for any older versioned block, then return ‘retry’ error, else
-
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
-
- auto r = drain(dpp, y);
-
- if (r < 0) {
- ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
- return r;
- }
- break;
- }
- // if (it != block.cacheObj.hostsList.end())
+ } else { // end - if update_refcount_if_key_exists
+ int r = -1;
+ if ((r = source->driver->get_block_dir()->remove_host(dpp, &block, dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, y)) < 0)
+ ldpp_dout(dpp, 10) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to remove incorrect host from block with oid=" << oid_in_cache <<", ret=" << r << dendl;
+
+ if ((block.cacheObj.hostsList.size() - 1) > 0 && r == 0) { /* Remote copy */
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl;
+ // TODO: Retrieve remotely
+ // Policy decision: should we cache remote blocks locally?
+ } else {
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+ auto r = drain(dpp, y);
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
+ return r;
+ }
+ break;
+ }
+ } //end - else
+ } else { // if (block.version == version)
+ // TODO: If data has already been returned for any older versioned block, then return ‘retry’ error, else
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+ auto r = drain(dpp, y);
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
+ return r;
+ }
+ break;
+ }
} else if (block.cacheObj.hostsList.size()) { /* Remote copy */
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in remote cache. " << oid_in_cache << dendl;
- // TODO: Retrieve remotely
- // Policy decision: should we cache remote blocks locally?
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in remote cache. " << oid_in_cache << dendl;
+ // TODO: Retrieve remotely
+ // Policy decision: should we cache remote blocks locally?
}
- // if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0)
- } else if (ret == -ENOENT) {
+ } else if (ret == -ENOENT) { // end - if ((ret = source->driver->get_block_dir()->get
block.blockID = adjusted_start_ofs;
uint64_t obj_size = source->get_size(), chunk_size = 0;
if (obj_size < max_chunk_size) {
block.size = chunk_size;
if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0) {
- auto it = block.cacheObj.hostsList.find(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
-
- if (it != block.cacheObj.hostsList.end()) { /* Local copy */
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in local cache." << dendl;
+ auto it = block.cacheObj.hostsList.find(dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address);
- if (block.version == version) {
- oid_in_cache = get_key_in_cache(prefix, std::to_string(adjusted_start_ofs), std::to_string(chunk_size));
- std::string key = oid_in_cache;
+ if (it != block.cacheObj.hostsList.end()) { /* Local copy */
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in local cache." << dendl;
- //for range requests, for last part, the whole part might exist in the cache
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache <<
- " length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
+ if (block.version == version) {
+ oid_in_cache = get_key_in_cache(prefix, std::to_string(adjusted_start_ofs), std::to_string(chunk_size));
+ std::string key = oid_in_cache;
- if ((part_len != chunk_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) {
- // Read From Cache
- if (block.cacheObj.dirty == true){
- key = DIRTY_BLOCK_PREFIX + oid_in_cache;
- }
-
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: block dirty =" << block.cacheObj.dirty << dendl;
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: key=" << key << dendl;
+ //for range requests, for last part, the whole part might exist in the cache
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache <<
+ " length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
- auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), key, read_ofs, len_to_read, cost, id);
-
- this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, chunk_size)));
-
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
- auto r = flush(dpp, std::move(completed), y);
-
- if (r < 0) {
- drain(dpp, y);
- ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, ret=" << r << dendl;
- return r;
- }
- // if ((part_len != chunk_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0)
- } else {
- int r = -1;
- if ((r = source->driver->get_block_dir()->remove_host(dpp, &block, dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, y)) < 0)
- ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to remove incorrect host from block with oid=" << oid_in_cache << ", ret=" << r << dendl;
-
- if ((block.cacheObj.hostsList.size() - 1) > 0 && r == 0) { /* Remote copy */
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl;
- // TODO: Retrieve remotely
- // Policy decision: should we cache remote blocks locally?
- } else {
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+ if ((part_len != chunk_size) && source->driver->get_policy_driver()->get_cache_policy()->update_refcount_if_key_exists(dpp, oid_in_cache, rgw::d4n::RefCount::INCR, y) > 0) {
+ // Read From Cache
+ if (block.cacheObj.dirty == true){
+ key = DIRTY_BLOCK_PREFIX + oid_in_cache;
+ }
- auto r = drain(dpp, y);
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: block dirty =" << block.cacheObj.dirty << dendl;
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: key=" << key << dendl;
- if (r < 0) {
- ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
- return r;
- }
-
- break;
+ auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), key, read_ofs, len_to_read, cost, id);
+ this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, chunk_size)));
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
+ auto r = flush(dpp, std::move(completed), y);
+ if (r < 0) {
+ drain(dpp, y);
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, ret=" << r << dendl;
+ return r;
+ }
+ } else { // end - if ((part_len != chunk_size) && update_refcount_if_key_exists
+ int r = -1;
+ if ((r = source->driver->get_block_dir()->remove_host(dpp, &block, dpp->get_cct()->_conf->rgw_d4n_l1_datacache_address, y)) < 0)
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to remove incorrect host from block with oid=" << oid_in_cache << ", ret=" << r << dendl;
+
+ if ((block.cacheObj.hostsList.size() - 1) > 0 && r == 0) { /* Remote copy */
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl;
+ // TODO: Retrieve remotely
+ // Policy decision: should we cache remote blocks locally?
+ } else {
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+
+ auto r = drain(dpp, y);
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
+ return r;
+ }
+ break;
+ }
+ }
+ } else if (block.cacheObj.hostsList.size()) { /* Remote copy */
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl;
+ // TODO: Retrieve remotely
+ // Policy decision: should we cache remote blocks locally?
+ }
+ } else { // end - if (block.version == version)
+ // TODO: If data has already been returned for any older versioned block, then return ‘retry’ error, else
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+ auto r = drain(dpp, y);
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
+ return r;
+ }
+ break;
}
- }
- // if (it != block.cacheObj.hostsList.end())
- } else if (block.cacheObj.hostsList.size()) { /* Remote copy */
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block with oid=" << oid_in_cache << " found in remote cache." << dendl;
- // TODO: Retrieve remotely
- // Policy decision: should we cache remote blocks locally?
- }
- // if (block.version == version)
- } else {
- // TODO: If data has already been returned for any older versioned block, then return ‘retry’ error, else
-
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
-
- auto r = drain(dpp, y);
-
- if (r < 0) {
- ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
- return r;
- }
- break;
- }
- } else if (ret == -ENOENT) { // if ((ret = source->driver->get_block_dir()->get(dpp, &block, y)) == 0)
+ } else if (ret == -ENOENT) { // end - if source->driver->get_block_dir()->get(dpp, &block, y))
block.blockID = adjusted_start_ofs;
uint64_t last_part_size = source->get_size() - adjusted_start_ofs;
block.size = last_part_size;
oid_in_cache = get_key_in_cache(prefix, std::to_string(adjusted_start_ofs), std::to_string(last_part_size));
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache <<
" length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
- if ((part_len != last_part_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) {
+ if ((part_len != last_part_size) && source->driver->get_policy_driver()->get_cache_policy()->update_refcount_if_key_exists(dpp, oid_in_cache, rgw::d4n::RefCount::INCR, y) > 0) {
// Read From Cache
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: block dirty =" << block.cacheObj.dirty << dendl;
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: oid_in_cache=" << oid_in_cache << dendl;
break;
}
}
- } else {// if (block.version == version)
+ } else {// end - if (block.version == version)
//TODO: return retry error
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
auto r = drain(dpp, y);
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
auto r = drain(dpp, y);
if (r < 0) {
- ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
+ ldpp_dout(dpp, 10) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
return r;
}
break;
}
} else { // else if (ret == -ENOENT)
if (ret < 0)
- ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << block.cacheObj.objName << " blockID: " << block.blockID << " block size: " << block.size << ", ret=" << ret << dendl;
-
+ ldpp_dout(dpp, 0) << "Failed to fetch existing block for: " << block.cacheObj.objName << " blockID: " << block.blockID << " block size: " << block.size << ", ret=" << ret << dendl;
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
-
auto r = drain(dpp, y);
if (r < 0) {
- ldpp_dout(dpp, 0) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
- return r;
+ ldpp_dout(dpp, 10) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, ret=" << r << dendl;
+ return r;
}
-
break;
- }
+ } //end - else
if (start_part_num == (num_parts - 1)) {
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y);
if (ret == 0) {
std::string objEtag = "";
- filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, adjusted_start_ofs, bl.length(), version, dirty, *y);
+ filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, adjusted_start_ofs, bl.length(), version, dirty, rgw::d4n::RefCount::NOOP, *y);
/* Store block in directory */
existing_block.blockID = block.blockID;
if (ret == 0) {
ret = filter->get_cache_driver()->put(dpp, dest_oid, bl, bl.length(), attrs, *y);
if (ret == 0) {
- filter->get_policy_driver()->get_cache_policy()->update(dpp, dest_oid, adjusted_start_ofs, bl.length(), dest_version, dirty, *y);
+ filter->get_policy_driver()->get_cache_policy()->update(dpp, dest_oid, adjusted_start_ofs, bl.length(), dest_version, dirty, rgw::d4n::RefCount::NOOP, *y);
if (ret = blockDir->set(dpp, &dest_block, *y); ret < 0) {
ldpp_dout(dpp, 20) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB:: " << __func__ << " BlockDirectory set failed with ret: " << ret << dendl;
}
if (ret == 0) {
ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y);
if (ret == 0) {
- filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, adjusted_start_ofs, bl.length(), version, dirty, *y);
+ filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, adjusted_start_ofs, bl.length(), version, dirty, rgw::d4n::RefCount::NOOP, *y);
/* Store block in directory */
existing_block.blockID = block.blockID;
if (ret == 0) {
ret = filter->get_cache_driver()->put(dpp, dest_oid, bl, bl.length(), attrs, *y);
if (ret == 0) {
- filter->get_policy_driver()->get_cache_policy()->update(dpp, dest_oid, adjusted_start_ofs, bl.length(), dest_version, dirty, *y);
+ filter->get_policy_driver()->get_cache_policy()->update(dpp, dest_oid, adjusted_start_ofs, bl.length(), dest_version, dirty, rgw::d4n::RefCount::NOOP, *y);
if (ret = blockDir->set(dpp, &dest_block, *y); ret < 0) {
ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " BlockDirectory set failed with ret: " << ret << dendl;
}
if (ret == 0) {
ret = filter->get_cache_driver()->put(dpp, oid, bl_rem, bl_rem.length(), attrs, *y);
if (ret == 0) {
- filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, adjusted_start_ofs, bl_rem.length(), version, dirty, *y);
+ filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, adjusted_start_ofs, bl_rem.length(), version, dirty, rgw::d4n::RefCount::NOOP, *y);
/* Store block in directory */
existing_block.blockID = block.blockID;
if (ret == 0) {
ret = filter->get_cache_driver()->put(dpp, dest_oid, bl_rem, bl_rem.length(), attrs, *y);
if (ret == 0) {
- filter->get_policy_driver()->get_cache_policy()->update(dpp, dest_oid, adjusted_start_ofs, bl_rem.length(), dest_version, dirty, *y);
+ filter->get_policy_driver()->get_cache_policy()->update(dpp, dest_oid, adjusted_start_ofs, bl_rem.length(), dest_version, dirty, rgw::d4n::RefCount::NOOP, *y);
if (ret = blockDir->set(dpp, &dest_block, *y); ret < 0) {
ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " BlockDirectory set failed with ret: " << ret << dendl;
}
ret = driver->get_cache_driver()->put(dpp, key, bl, bl.length(), attrs, y);
if (ret == 0) {
ldpp_dout(dpp, 10) << "D4NFilterWriter::" << __func__ << "(): oid_in_cache is: " << oid_in_cache << dendl;
- driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, bl.length(), version, dirty, y);
+ driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, bl.length(), version, dirty, rgw::d4n::RefCount::NOOP, y);
} else {
ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): ERROR: writting data to the cache failed, ret=" << ret << dendl;
return ret;
object->set_object_version(version);
if (ret == 0) {
ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << "(): version stored in update method is: " << version << dendl;
- driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, bl.length(), version, dirty, y);
+ driver->get_policy_driver()->get_cache_policy()->update(dpp, key, 0, bl.length(), version, dirty, rgw::d4n::RefCount::NOOP, y);
ret = object->set_head_obj_dir_entry(dpp, nullptr, y, true, dirty);
if (ret < 0) {
ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): BlockDirectory set method failed for head object, ret=" << ret << dendl;
auto creationTime = ceph::real_clock::to_time_t(object->get_mtime());
ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << "(): key=" << key << dendl;
ldpp_dout(dpp, 20) << "D4NFilterWriter::" << __func__ << "(): obj->get_key()=" << obj->get_key() << dendl;
- driver->get_policy_driver()->get_cache_policy()->update_dirty_object(dpp, key, version, false, accounted_size, creationTime, std::get<rgw_user>(obj->get_bucket()->get_owner()), objEtag, obj->get_bucket()->get_name(), obj->get_bucket()->get_bucket_id(), obj->get_key(), y);
+ driver->get_policy_driver()->get_cache_policy()->update_dirty_object(dpp, key, version, false, accounted_size, creationTime, std::get<rgw_user>(obj->get_bucket()->get_owner()), objEtag, obj->get_bucket()->get_name(), obj->get_bucket()->get_bucket_id(), obj->get_key(), rgw::d4n::RefCount::NOOP, y);
}
} else { //if get_cache_driver()->put()
ldpp_dout(dpp, 0) << "D4NFilterWriter::" << __func__ << "(): put failed for head_oid_in_cache, ret=" << ret << dendl;
ret = driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl, 0, attrs, y);
if (ret == 0) {
ldpp_dout(dpp, 20) << "D4NFilterMultipartUpload::" << __func__ << " version stored in update method is: " << d4n_target_obj->get_object_version() << dendl;
- driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, bl.length(), version, false, y);
+ driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, bl.length(), version, false, rgw::d4n::RefCount::NOOP, y);
ret = d4n_target_obj->set_head_obj_dir_entry(dpp, nullptr, y, true);
if (ret < 0) {
ldpp_dout(dpp, 0) << "D4NFilterMultipartUpload::" << __func__ << "(): BlockDirectory set method failed for head object, ret=" << ret << dendl;
namespace rgw { namespace cache {
static std::atomic<uint64_t> index{0};
+static std::atomic<uint64_t> dir_index{0};
static std::vector<std::string> tokenize_key(std::string_view key)
{
static void create_directories(const DoutPrefixProvider* dpp, const std::string& dir_path)
{
std::error_code ec;
+ std::string temp_dir_path = dir_path + "_" + std::to_string(dir_index++);
if (!efs::exists(dir_path, ec)) {
- if (!efs::create_directories(dir_path, ec)) {
- ldpp_dout(dpp, 0) << "initialize::: ERROR creating directory: '" << dir_path <<
+ if (!efs::create_directories(temp_dir_path, ec)) {
+ ldpp_dout(dpp, 0) << "create_directories::: ERROR creating directory: '" << temp_dir_path <<
"' : " << ec.value() << dendl;
} else {
- uid_t uid = dpp->get_cct()->get_set_uid();
- gid_t gid = dpp->get_cct()->get_set_gid();
+ efs::rename(temp_dir_path, dir_path, ec);
+ if (ec) {
+ ldpp_dout(dpp, 0) << "create_directories::: ERROR renaming directory: '" << temp_dir_path <<
+ "' : " << ec.value() << dendl;
+ efs::remove(temp_dir_path, ec);
+ } else {
+ uid_t uid = dpp->get_cct()->get_set_uid();
+ gid_t gid = dpp->get_cct()->get_set_gid();
- ldpp_dout(dpp, 5) << "initialize:: uid is " << uid << " and gid is " << gid << dendl;
- ldpp_dout(dpp, 5) << "initialize:: changing permissions for directory: " << dendl;
+ ldpp_dout(dpp, 5) << "create_directories:: uid is " << uid << " and gid is " << gid << dendl;
+ ldpp_dout(dpp, 5) << "create_directories:: changing permissions for directory: " << dendl;
- if (uid) {
- if (chown(dir_path.c_str(), uid, gid) == -1) {
- ldpp_dout(dpp, 5) << "initialize: chown return error: " << strerror(errno) << dendl;
- }
+ if (uid) {
+ if (chown(dir_path.c_str(), uid, gid) == -1) {
+ ldpp_dout(dpp, 5) << "create_directories: chown return error: " << strerror(errno) << dendl;
+ }
- if (chmod(dir_path.c_str(), S_IRWXU|S_IRWXG|S_IRWXO) == -1) {
- ldpp_dout(dpp, 5) << "initialize: chmod return error: " << strerror(errno) << dendl;
+ if (chmod(dir_path.c_str(), S_IRWXU|S_IRWXG|S_IRWXO) == -1) {
+ ldpp_dout(dpp, 5) << "create_directories: chmod return error: " << strerror(errno) << dendl;
+ }
}
}
}
return 0;
}
+uint64_t SSDDriver::get_free_space(const DoutPrefixProvider* dpp)
+{
+ efs::space_info space = efs::space(partition_info.location);
+ return space.available;
+}
+
+void SSDDriver::set_free_space(const DoutPrefixProvider* dpp, uint64_t free_space)
+{
+ std::lock_guard l(cache_lock);
+ this->free_space = free_space;
+}
+
int SSDDriver::put(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, optional_yield y)
{
ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): key=" << key << dendl;
ldpp_dout(dpp, 0) << "ERROR: append_data::fclose file has return error, errno=" << errno << dendl;
return -errno;
}
-
+ std::lock_guard l(cache_lock);
efs::space_info space = efs::space(partition_info.location);
this->free_space = space.available;
parse_key(dpp, partition_info.location, key, dir_path, file_name, is_dirty);
std::string location = get_file_path(dpp, dir_path, file_name);
ldpp_dout(dpp, 20) << "INFO: delete_data::file to remove: " << location << dendl;
+ std::error_code ec;
//Remove file
- if (!efs::remove(location)) {
+ if (!efs::remove(location, ec)) {
ldpp_dout(dpp, 0) << "ERROR: delete_data::remove has failed to remove the file: " << location << dendl;
- return -EIO;
+ return -ec.value();
}
//Remove directory if empty, removes object directory
- if (efs::is_empty(dir_path)) {
- ldpp_dout(dpp, 20) << "INFO: delete_data::object directory to remove: " << dir_path << dendl;
- if (!efs::remove(dir_path)) {
- ldpp_dout(dpp, 0) << "ERROR: delete_data::remove has failed to remove the directory: " << dir_path << dendl;
- return -EIO;
+ if (efs::is_empty(dir_path, ec)) {
+ ldpp_dout(dpp, 20) << "INFO: delete_data::object directory to remove: " << dir_path << " :" << ec.value() << dendl;
+ if (!efs::remove(dir_path, ec)) {
+ //another version could have been written between the check and removal, hence not returning error from here
+ ldpp_dout(dpp, 0) << "ERROR: delete_data::remove has failed to remove the directory: " << dir_path << " :" << ec.value() << dendl;
}
}
auto pos = dir_path.find_last_of('/');
dir_path.erase(pos, (dir_path.length() - pos));
//Remove bucket directory
- if (efs::is_empty(dir_path)) {
- ldpp_dout(dpp, 20) << "INFO: delete_data::bucket directory to remove: " << dir_path << dendl;
- if (!efs::remove(dir_path)) {
- ldpp_dout(dpp, 0) << "ERROR: delete_data::remove has failed to remove the directory: " << dir_path << dendl;
- return -EIO;
+ if (efs::is_empty(dir_path, ec)) {
+ ldpp_dout(dpp, 20) << "INFO: delete_data::bucket directory to remove: " << dir_path << " :" << ec.value() << dendl;
+ if (!efs::remove(dir_path, ec)) {
+ //another object could have been written between the check and removal, hence not returning error from here
+ ldpp_dout(dpp, 0) << "ERROR: delete_data::remove has failed to remove the directory: " << dir_path << " :" << ec.value() << dendl;
}
}
}
+
efs::space_info space = efs::space(partition_info.location);
this->free_space = space.available;
mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
r = fd = TEMP_FAILURE_RETRY(::open(file_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, mode));
if (fd < 0) {
- ldpp_dout(dpp, 0) << "ERROR: AsyncWriteRequest::prepare_libaio_write_op: open file failed, errno=" << errno << ", location='" << file_path.c_str() << "'" << dendl;
- return r;
+ //directories might have been deleted by a parallel delete of the last version of an object
+ if (errno == ENOENT) {
+ //retry after creating directories
+ std::string dir_path = file_path;
+ auto pos = dir_path.find_last_of('/');
+ if (pos != std::string::npos) {
+ dir_path.erase(pos, (dir_path.length() - pos));
+ }
+ ldpp_dout(dpp, 20) << "INFO: AsyncWriteRequest::prepare_libaio_write_op: dir_path for creating directories=" << dir_path << dendl;
+ create_directories(dpp, dir_path);
+ r = fd = TEMP_FAILURE_RETRY(::open(file_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, mode));
+ if (fd < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: AsyncWriteRequest::prepare_libaio_write_op: open file failed, errno=" << errno << ", location='" << file_path.c_str() << "'" << dendl;
+ return r;
+ }
+ } else {
+ ldpp_dout(dpp, 0) << "ERROR: AsyncWriteRequest::prepare_libaio_write_op: open file failed, errno=" << errno << ", location='" << file_path.c_str() << "'" << dendl;
+ return r;
+ }
}
if (dpp->get_cct()->_conf->rgw_d4n_l1_fadvise != POSIX_FADV_NORMAL)
posix_fadvise(fd, 0, 0, dpp->get_cct()->_conf->rgw_d4n_l1_fadvise);
for (auto& it : attrs) {
std::string attr_name = it.first;
std::string attr_val = it.second.to_str();
- std::string old_attr_val;
- auto ret = get_attr(dpp, key, attr_name, old_attr_val, y);
- if (old_attr_val.empty()) {
+ auto ret = setxattr(location.c_str(), attr_name.c_str(), attr_val.c_str(), attr_val.size(), XATTR_REPLACE);
+ if (ret < 0 && errno == ENODATA) {
ret = setxattr(location.c_str(), attr_name.c_str(), attr_val.c_str(), attr_val.size(), XATTR_CREATE);
- } else {
- ret = setxattr(location.c_str(), attr_name.c_str(), attr_val.c_str(), attr_val.size(), XATTR_REPLACE);
}
if (ret < 0) {
ldpp_dout(dpp, 0) << "SSDCache: " << __func__ << "(): could not modify attr value for attr name: " << attr_name << " key: " << key << " ERROR: " << cpp_strerror(errno) <<dendl;
return ret;
}
}
+
efs::space_info space = efs::space(partition_info.location);
this->free_space = space.available;
return 0;
ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): get_attr: key: " << attr_name << dendl;
- int attr_size = getxattr(location.c_str(), attr_name.c_str(), nullptr, 0);
- if (attr_size < 0) {
- ldpp_dout(dpp, 0) << "ERROR: could not get attribute " << attr_name << ": " << cpp_strerror(errno) << dendl;
- attr_val = "";
- return errno;
- }
-
- if (attr_size == 0) {
- ldpp_dout(dpp, 0) << "ERROR: no attribute value found for attr_name: " << attr_name << dendl;
- attr_val = "";
+ size_t buffer_size = 256;
+ while (true) {
+ attr_val.resize(buffer_size);
+ ssize_t attr_size = getxattr(location.c_str(), attr_name.c_str(), attr_val.data(), attr_val.size());
+ if (attr_size < 0) {
+ if (errno == ERANGE) {
+ // Buffer too small, get actual size needed
+ attr_size = getxattr(location.c_str(), attr_name.c_str(), nullptr, 0);
+ if (attr_size < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: could not get attribute " << attr_name << ": " << cpp_strerror(errno) << dendl;
+ attr_val = "";
+ return errno;
+ }
+ if (attr_size == 0) {
+ ldpp_dout(dpp, 0) << "ERROR: no attribute value found for attr_name: " << attr_name << dendl;
+ attr_val = "";
+ return 0;
+ }
+ // Resize and try again
+ buffer_size = static_cast<size_t>(attr_size);
+ continue;
+ }
+ ldpp_dout(dpp, 0) << "SSDCache: " << __func__ << "(): could not get attribute " << attr_name << ": " << cpp_strerror(errno) << dendl;
+ attr_val = "";
+ return errno;
+ } //end-if result < 0
+ if (attr_size == 0) {
+ ldpp_dout(dpp, 0) << "ERROR: no attribute value found for attr_name: " << attr_name << dendl;
+ attr_val = "";
+ return 0;
+ } //end-if result == 0
+ // Success - resize buffer to actual data size and return
+ ldpp_dout(dpp, 20) << "INFO: attr_size is: " << attr_size << dendl;
+ attr_val.resize(static_cast<size_t>(attr_size));
return 0;
}
-
- attr_val.resize(attr_size);
- attr_size = getxattr(location.c_str(), attr_name.c_str(), attr_val.data(), attr_size);
- if (attr_size < 0) {
- ldpp_dout(dpp, 0) << "SSDCache: " << __func__ << "(): could not get attr value for attr name: " << attr_name << " key: " << key << dendl;
- attr_val = "";
- return errno;
- }
-
- return 0;
}
int SSDDriver::set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attr_val, optional_yield y)
/* Partition */
virtual Partition get_current_partition_info(const DoutPrefixProvider* dpp) override { return partition_info; }
- virtual uint64_t get_free_space(const DoutPrefixProvider* dpp) override { return free_space; }
- void set_free_space(const DoutPrefixProvider* dpp, uint64_t free_space) { this->free_space = free_space; }
+ virtual uint64_t get_free_space(const DoutPrefixProvider* dpp) override;
+ void set_free_space(const DoutPrefixProvider* dpp, uint64_t free_space);
virtual int restore_blocks_objects(const DoutPrefixProvider* dpp, ObjectDataCallback obj_func, BlockDataCallback block_func) override;
Partition partition_info;
uint64_t free_space;
CephContext* cct;
+ std::mutex cache_lock;
struct libaio_read_handler {
rgw::Aio* throttle = nullptr;
std::string oid = rgw::sal::get_key_in_cache(get_prefix(block->cacheObj.bucketName, block->cacheObj.objName, version), std::to_string(block->blockID), std::to_string(block->size));
if (this->policyDriver->get_cache_policy()->exist_key(oid)) { /* Local copy */
- policyDriver->get_cache_policy()->update(env->dpp, oid, 0, bl.length(), "", false, y);
+ policyDriver->get_cache_policy()->update(env->dpp, oid, 0, bl.length(), "", false, rgw::d4n::RefCount::NOOP, y);
return 0;
} else {
if (this->policyDriver->get_cache_policy()->eviction(dpp, block->size, y) < 0)
if (dir->set(env->dpp, block, y) < 0)
return -1;
- this->policyDriver->get_cache_policy()->update(dpp, oid, 0, bl.length(), "", false, y);
+ this->policyDriver->get_cache_policy()->update(dpp, oid, 0, bl.length(), "", false, rgw::d4n::RefCount::NOOP, y);
if (cacheDriver->put(dpp, oid, bl, bl.length(), attrs, y) < 0)
return -1;
return cacheDriver->set_attr(dpp, oid, "localWeight", std::to_string(age), y);
std::string version;
std::string key = rgw::sal::get_key_in_cache(get_prefix(block->cacheObj.bucketName, block->cacheObj.objName, version), std::to_string(block->blockID), std::to_string(block->size));
ASSERT_EQ(0, cacheDriver->put(env->dpp, key, bl, bl.length(), attrs, optional_yield{yield}));
- policyDriver->get_cache_policy()->update(env->dpp, key, 0, bl.length(), "", false, optional_yield{yield});
+ policyDriver->get_cache_policy()->update(env->dpp, key, 0, bl.length(), "", false, rgw::d4n::RefCount::NOOP, optional_yield{yield});
ASSERT_EQ(lfuda(env->dpp, block, cacheDriver, yield), 0);
ASSERT_EQ(0, dir->set(env->dpp, &victim, optional_yield{yield}));
std::string victimKeyInCache = rgw::sal::get_key_in_cache(get_prefix(victim.cacheObj.bucketName, victim.cacheObj.objName, victim.version), std::to_string(victim.blockID), std::to_string(victim.size));
ASSERT_EQ(0, cacheDriver->put(env->dpp, victimKeyInCache, bl, bl.length(), attrs, optional_yield{yield}));
- policyDriver->get_cache_policy()->update(env->dpp, victimKeyInCache, 0, bl.length(), "", false, optional_yield{yield});
+ policyDriver->get_cache_policy()->update(env->dpp, victimKeyInCache, 0, bl.length(), "", false, rgw::d4n::RefCount::NOOP, optional_yield{yield});
/* Set head blocks */
std::string victimHeadObj = get_prefix(victim.cacheObj.bucketName, victim.cacheObj.objName, victim.version);
ASSERT_EQ(0, cacheDriver->put(env->dpp, victimHeadObj, bl, bl.length(), attrs, optional_yield{yield}));
- policyDriver->get_cache_policy()->update(env->dpp, victimHeadObj, 0, bl.length(), "", false, optional_yield{yield});
+ policyDriver->get_cache_policy()->update(env->dpp, victimHeadObj, 0, bl.length(), "", false, rgw::d4n::RefCount::NOOP, optional_yield{yield});
/* Remote block */
block->size = cacheDriver->get_free_space(env->dpp) + 1; /* To trigger eviction */