tc = std::thread(&CachePolicy::cleaning, this, dpp);
}
+ lwthread = std::thread(&LFUDAPolicy::localweight_writer, this, dpp);
+ lw_quit = false;
+
try {
boost::system::error_code ec;
response<
{
ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): updating entry: " << key << dendl;
using handle_type = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>::handle_type;
- const std::lock_guard l(lfuda_lock);
- int localWeight = age;
- auto entry = find_entry(key);
- bool updateLocalWeight = true;
- uint64_t refcount = 0;
-
- if (!restore_val.empty()) {
- updateLocalWeight = false;
- localWeight = std::stoull(restore_val);
- ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): restored localWeight is: " << localWeight << dendl;
- }
+ bool updateLocalWeight = true, should_notify = false;
+ {
+ const std::lock_guard l(lfuda_lock);
+ int localWeight = age;
+ auto entry = find_entry(key);
+ uint64_t refcount = 0;
+ if (!restore_val.empty()) {
+ updateLocalWeight = false;
+ localWeight = std::stoull(restore_val);
+ ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): restored localWeight is: " << localWeight << dendl;
+ }
- /* check the dirty flag in the existing entry for the key and the incoming dirty flag. If the
- incoming dirty flag is false, that means update() is invoked as part of cleaning process,
- so we must not update its localWeight. */
- if (entry) {
- refcount = entry->refcount;
- if (entry->dirty && dirty.has_value()) {
- bool is_dirty = dirty.value();
- if (!is_dirty) {
- localWeight = entry->localWeight;
- updateLocalWeight = false;
+ /* check the dirty flag in the existing entry for the key and the incoming dirty flag. If the
+ incoming dirty flag is false, that means update() is invoked as part of cleaning process,
+ so we must not update its localWeight. */
+ if (entry) {
+ refcount = entry->refcount;
+ if (entry->dirty && dirty.has_value()) {
+ bool is_dirty = dirty.value();
+ if (!is_dirty) {
+ localWeight = entry->localWeight;
+ updateLocalWeight = false;
+ }
+ }
+ if (updateLocalWeight) {
+ localWeight = entry->localWeight + age;
+ }
+ if (op == RefCount::INCR) {
+ refcount += 1;
+ }
+ if (op == RefCount::DECR) {
+ if (refcount > 0) {
+ refcount -= 1;
+ }
}
}
- if (updateLocalWeight) {
- localWeight = entry->localWeight + age;
+ //pick the existing value of dirty, if no value has been passed in
+ bool is_dirty = false;
+ if (dirty.has_value()) {
+ is_dirty = dirty.value();
+ } else if (entry) {
+ is_dirty = entry->dirty;
}
- if (op == RefCount::INCR) {
- refcount += 1;
+ ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): updated refcount is: " << refcount << dendl;
+
+ if (entry) {
+ entry->key = key;
+ entry->offset = offset;
+ entry->len = len;
+ entry->version = version;
+ entry->dirty = is_dirty;
+ entry->refcount = refcount;
+ entry->localWeight = localWeight;
+ entries_heap.update(entry->handle, entry);
+ } else {
+ LFUDAEntry* e = new LFUDAEntry(key, offset, len, version, is_dirty, refcount, localWeight);
+ handle_type handle = entries_heap.push(e);
+ e->set_handle(handle);
+ entries_map.emplace(key, e);
}
- if (op == RefCount::DECR) {
- if (refcount > 0) {
- refcount -= 1;
+
+ if (updateLocalWeight) {
+ updated_blocks.emplace(key, localWeight);
+ if (updated_blocks.size() >= LOCALWEIGHT_BATCH_SIZE) {
+ should_notify = true;
}
}
- }
- //pick the existing value of dirty, if no value has been passed in
- bool is_dirty = false;
- if (dirty.has_value()) {
- is_dirty = dirty.value();
- } else if (entry) {
- is_dirty = entry->dirty;
- }
- ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): updated refcount is: " << refcount << dendl;
-
- if (entry) {
- entry->key = key;
- entry->offset = offset;
- entry->len = len;
- entry->version = version;
- entry->dirty = is_dirty;
- entry->refcount = refcount;
- entry->localWeight = localWeight;
- entries_heap.update(entry->handle, entry);
- } else {
- LFUDAEntry* e = new LFUDAEntry(key, offset, len, version, is_dirty, refcount, localWeight);
- handle_type handle = entries_heap.push(e);
- e->set_handle(handle);
- entries_map.emplace(key, e);
- }
- if (updateLocalWeight) {
- int ret = -1;
- if ((ret = cacheDriver->set_attr(dpp, key, RGW_CACHE_ATTR_LOCAL_WEIGHT, std::to_string(localWeight), y)) < 0)
- ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): CacheDriver set_attr method failed, ret=" << ret << dendl;
+ weightSum += ((localWeight < 0) ? 0 : localWeight);
+ } //lock will be released here
+ if (should_notify) {
+ ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): notify_one: "<< dendl;
+ lw_cond.notify_one();
}
-
- weightSum += ((localWeight < 0) ? 0 : localWeight);
}
void LFUDAPolicy::update_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, const std::string& version, bool deleteMarker, uint64_t size, double creationTime, const rgw_user& user, const std::string& etag, const std::string& bucket_name, const std::string& bucket_id, const rgw_obj_key& obj_key, uint8_t op, optional_yield y, std::string& restore_val)
state = State::INIT;
}
- const std::lock_guard l(lfuda_cleaning_lock);
- LFUDAObjEntry* e = new LFUDAObjEntry{key, version, deleteMarker, size, creationTime, user, etag, bucket_name, bucket_id, obj_key};
- handle_type handle = object_heap.push(e);
- e->set_handle(handle);
- o_entries_map.emplace(key, std::make_pair(e, state));
+ {
+ const std::lock_guard l(lfuda_cleaning_lock);
+ LFUDAObjEntry* e = new LFUDAObjEntry{key, version, deleteMarker, size, creationTime, user, etag, bucket_name, bucket_id, obj_key};
+ handle_type handle = object_heap.push(e);
+ e->set_handle(handle);
+ o_entries_map.emplace(key, std::make_pair(e, state));
+ }
cond.notify_one();
}
bool LFUDAPolicy::erase_dirty_object(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y)
{
- const std::lock_guard l(lfuda_cleaning_lock);
- auto p = o_entries_map.find(key);
- if (p == o_entries_map.end()) {
- return false;
- }
+ {
+ const std::lock_guard l(lfuda_cleaning_lock);
+ auto p = o_entries_map.find(key);
+ if (p == o_entries_map.end()) {
+ return false;
+ }
- object_heap.erase(p->second.first->handle);
- delete p->second.first;
- p->second.first = nullptr;
- o_entries_map.erase(p);
+ object_heap.erase(p->second.first->handle);
+ delete p->second.first;
+ p->second.first = nullptr;
+ o_entries_map.erase(p);
+ }
state_cond.notify_one();
-
return true;
}
}
} //end-if (block.version == entry->version)
} //end - else if op_ret == 0
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << std::setprecision(std::numeric_limits<double>::max_digits10) << e->creationTime << " from ordered set" << dendl;
+ ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << std::setprecision(std::numeric_limits<double>::max_digits10) << e->creationTime << " from ordered set" << dendl;
rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{
.objName = c_obj->get_name(),
.bucketName = c_obj->get_bucket()->get_bucket_id(),
continue;
}
}
- ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << std::setprecision(std::numeric_limits<double>::max_digits10) << e->creationTime << " from ordered set" << dendl;
+ ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Removing object name: "<< c_obj->get_name() << " score: " << std::setprecision(std::numeric_limits<double>::max_digits10) << e->creationTime << " from ordered set" << dendl;
rgw::d4n::CacheObj dir_obj = rgw::d4n::CacheObj{
.objName = c_obj->get_name(),
.bucketName = c_obj->get_bucket()->get_bucket_id(),
} //end-while true
}
+void LFUDAPolicy::localweight_writer(const DoutPrefixProvider* dpp)
+{
+ ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Starting thread " << dendl;
+ auto TIMEOUT_DURATION = std::chrono::seconds(dpp->get_cct()->_conf->rgw_d4n_localweight_processing_interval);
+ while (!lw_quit.load()) {
+ std::unordered_map<std::string, uint64_t> temp;
+ bool woke_up = false;
+ //sleep for some duration or till size crosses 10K before processing
+ {
+ std::unique_lock<std::mutex> wait_lock(lfuda_lock);
+ woke_up = lw_cond.wait_for(wait_lock, TIMEOUT_DURATION, [this] {
+ return updated_blocks.size() >= LOCALWEIGHT_BATCH_SIZE || lw_quit.load();
+ });
+ if (lw_quit.load()) {
+ ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Quit signal received, exiting" << dendl;
+ break;
+ }
+ if (!updated_blocks.empty()) {
+ updated_blocks.swap(temp);
+ if (woke_up) {
+ ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Woke up due to size threshold, processing " << temp.size() << " items" << dendl;
+ } else {
+ ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Woke up due to timeout, processing " << temp.size() << " items" << dendl;
+ }
+ }
+ } //lock released here
+ if (!temp.empty()) {
+ ldpp_dout(dpp, 5) << "LFUDAPolicy::" << __func__ << "(): Processing batch of " << temp.size() << " items" << dendl;
+ for (auto& it : temp) {
+ if (lw_quit.load()) {
+ ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): Quit signal received, exiting" << lw_quit << dendl;
+ break;
+ }
+ auto& key = it.first;
+ auto localWeight = it.second;
+ ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): CacheDriver set_attr method called for key: " << key << dendl;
+ int ret = cacheDriver->set_attr(dpp, key, RGW_CACHE_ATTR_LOCAL_WEIGHT, std::to_string(localWeight), y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "LFUDAPolicy::" << __func__ << "(): CacheDriver set_attr method failed, ret=" << ret << dendl;
+ }
+ } //end-for
+ ldpp_dout(dpp, 5) << "LFUDAPolicy::" << __func__ << "(): Finished processing batch" << dendl;
+ } //end-if
+ }//end-while
+ ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): Thread exiting" << dendl;
+}
+
int LRUPolicy::exist_key(const std::string& key)
{
const std::lock_guard l(lru_lock);
std::string version;
bool dirty;
uint64_t refcount{0};
+ Entry() = default;
Entry(const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, uint64_t refcount) : key(key), offset(offset),
len(len), version(version), dirty(dirty), refcount(refcount) {}
};
int localWeight;
using handle_type = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>::handle_type;
handle_type handle;
-
+ LFUDAEntry() = default;
LFUDAEntry(const std::string& key, uint64_t offset, uint64_t len, const std::string& version, bool dirty, uint64_t refcount, int localWeight) : Entry(key, offset, len, version, dirty, refcount),
localWeight(localWeight) {}
std::mutex lfuda_cleaning_lock;
std::condition_variable cond;
std::condition_variable state_cond;
+ std::condition_variable lw_cond;
inline static std::atomic<bool> quit{false};
+ inline static std::atomic<bool> lw_quit{false};
int age = 1, weightSum = 0, postedSum = 0;
optional_yield y = null_yield;
std::optional<asio::steady_timer> rthread_timer;
rgw::sal::Driver* driver;
std::thread tc;
+ std::thread lwthread;
+ //data structure for accumulating updated blocks
+ std::unordered_map<std::string, uint64_t> updated_blocks;
+ static constexpr size_t LOCALWEIGHT_BATCH_SIZE = 10000;
CacheBlock* get_victim_block(const DoutPrefixProvider* dpp, optional_yield y);
int age_sync(const DoutPrefixProvider* dpp, optional_yield y);
delete blockDir;
delete objDir;
quit = true;
+ lw_quit = true;
cond.notify_all();
+ lw_cond.notify_all();
if (tc.joinable()) { tc.join(); }
+ if (lwthread.joinable()) { lwthread.join(); }
for (auto& it : entries_map) {
delete it.second;
}
for (auto& it : o_entries_map) {
delete it.second.first;
}
- }
+ }
virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver *_driver);
virtual int exist_key(const std::string& key) override;
return it->second.first;
}
void save_y(optional_yield y) { this->y = y; }
+ void localweight_writer(const DoutPrefixProvider* dpp);
};
class LRUPolicy : public CachePolicy {