default: 60
services:
- rgw
+ flags:
+ - startup
+ with_legacy: true
+- name: d4n_writecache_enabled
+ type: bool
+ level: advanced
+ desc: The d4n write cache
+ default: false
+ services:
+ - rgw
+ flags:
+ - startup
with_legacy: true
- name: rgw_backend_store
type: str
flags:
- startup
with_legacy: true
+- name: rgw_d4n_cache_cleaning_interval
+ type: int
+ level: advanced
+ desc: This is the interval for invoking write cache cleaning process
+ default: 1000
+ services:
+ - rgw
+ flags:
+ - startup
+ with_legacy: true
- name: rgw_topic_persistency_time_to_live
type: uint
level: advanced
flags:
- startup
see_also:
- with_legacy: true
\ No newline at end of file
+ with_legacy: true
+- name: rgw_d4n_backend_address
+ type: str
+ level: advanced
+ desc: The backend address used by D4N cache
+ default: 127.0.0.1:6379
+ services:
+ - rgw
+ flags:
+ - startup
+ with_legacy: true
block->cacheObj.bucketName = std::get<0>(resp).value()[6];
block->cacheObj.creationTime = std::get<0>(resp).value()[7];
block->cacheObj.dirty = boost::lexical_cast<bool>(std::get<0>(resp).value()[8]);
+ block->dirty = boost::lexical_cast<bool>(std::get<0>(resp).value()[8]);
{
std::stringstream ss(boost::lexical_cast<std::string>(std::get<0>(resp).value()[9]));
CacheObj cacheObj;
uint64_t blockID;
std::string version;
+ bool dirty;
uint64_t size; /* Block size in bytes */
int globalWeight = 0; /* LFUDA policy variable */
std::vector<std::string> hostsList; /* List of hostnames <ip:port> of block locations */
}
}
-int LFUDAPolicy::init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context) {
+int LFUDAPolicy::init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver *_driver) {
+ this->cct = cct;
dir->init(cct);
+ driver = _driver;
+ tc = std::thread(&CachePolicy::cleaning, this, dpp);
+ tc.detach();
int result = 0;
response<int, int, int, int> resp;
return 0;
}
-void LFUDAPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y)
+void LFUDAPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, time_t creationTime, const rgw_user user, optional_yield y)
{
using handle_type = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>::handle_type;
const std::lock_guard l(lfuda_lock);
erase(dpp, key, y);
- LFUDAEntry *e = new LFUDAEntry(key, offset, len, version, localWeight);
+ LFUDAEntry *e = new LFUDAEntry(key, offset, len, version, dirty, creationTime, user, localWeight);
handle_type handle = entries_heap.push(e);
e->set_handle(handle);
entries_map.emplace(key, e);
- if (cacheDriver->set_attr(dpp, key, "user.rgw.localWeight", std::to_string(localWeight), y) < 0)
+ std::string oid_in_cache = key;
+ if (dirty == true)
+ oid_in_cache = "D_"+key;
+
+ if (cacheDriver->set_attr(dpp, oid_in_cache, "user.rgw.localWeight", std::to_string(localWeight), y) < 0)
ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): CacheDriver set_attr method failed." << dendl;
weightSum += ((localWeight < 0) ? 0 : localWeight);
}
+void LFUDAPolicy::updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, optional_yield y)
+{
+ eraseObj(dpp, key, y);
+
+ const std::lock_guard l(lfuda_lock);
+ LFUDAObjEntry *e = new LFUDAObjEntry(key, version, dirty, size, creationTime, user, etag);
+ o_entries_map.emplace(key, e);
+}
+
+
bool LFUDAPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y)
{
auto p = entries_map.find(key);
return true;
}
+bool LFUDAPolicy::eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y)
+{
+ const std::lock_guard l(lfuda_lock);
+ auto p = o_entries_map.find(key);
+ if (p == o_entries_map.end()) {
+ return false;
+ }
+
+ o_entries_map.erase(p);
+
+ return true;
+}
+
+void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
+{
+ const int interval = cct->_conf->rgw_d4n_cache_cleaning_interval;
+ while(true){
+ ldpp_dout(dpp, 20) << __func__ << " : " << " Cache cleaning!" << dendl;
+ std::string name = "";
+ std::string b_name = "";
+ std::string key = "";
+ uint64_t len = 0;
+ rgw::sal::Attrs obj_attrs;
+ int count = 0;
+
+ for (auto it = o_entries_map.begin(); it != o_entries_map.end(); it++){
+ if ((it->second->dirty == true) && (std::difftime(time(NULL), it->second->creationTime) > interval)){ //if block is dirty and written more than interval seconds ago
+ name = it->first;
+ rgw_user c_rgw_user = it->second->user;
+
+ size_t pos = 0;
+ std::string delimiter = "_";
+ while ((pos = name.find(delimiter)) != std::string::npos) {
+ if (count == 0){
+ b_name = name.substr(0, pos);
+ name.erase(0, pos + delimiter.length());
+ }
+ count ++;
+ }
+ key = name;
+
+ //writing data to the backend
+ //we need to create an atomic_writer
+ rgw_obj_key c_obj_key = rgw_obj_key(key);
+ std::unique_ptr<rgw::sal::User> c_user = driver->get_user(c_rgw_user);
+
+ std::unique_ptr<rgw::sal::Bucket> c_bucket;
+ rgw_bucket c_rgw_bucket = rgw_bucket(c_rgw_user.tenant, b_name, "");
+
+ RGWBucketInfo c_bucketinfo;
+ c_bucketinfo.bucket = c_rgw_bucket;
+ c_bucketinfo.owner = c_rgw_user;
+
+
+ int ret = driver->load_bucket(dpp, c_rgw_bucket, &c_bucket, null_yield);
+ if (ret < 0) {
+ ldpp_dout(dpp, 10) << __func__ << "(): load_bucket() returned ret=" << ret << dendl;
+ break;
+ }
+
+ std::unique_ptr<rgw::sal::Object> c_obj = c_bucket->get_object(c_obj_key);
+
+ ACLOwner owner{c_user->get_id(), c_user->get_display_name()};
+
+ std::unique_ptr<rgw::sal::Writer> processor = driver->get_atomic_writer(dpp,
+ null_yield,
+ c_obj.get(),
+ owner,
+ NULL,
+ 0,
+ "");
+
+ int op_ret = processor->prepare(null_yield);
+ if (op_ret < 0) {
+ ldpp_dout(dpp, 20) << "processor->prepare() returned ret=" << op_ret << dendl;
+ break;
+ }
+
+ std::string prefix = b_name+"_"+key;
+ off_t lst = it->second->size;
+ off_t fst = 0;
+ off_t ofs = 0;
+
+
+ rgw::sal::DataProcessor *filter = processor.get();
+ do {
+ ceph::bufferlist data;
+ if (fst >= lst){
+ break;
+ }
+ off_t cur_size = std::min<off_t>(fst + cct->_conf->rgw_max_chunk_size, lst);
+ off_t cur_len = cur_size - fst;
+ std::string oid_in_cache = "D_" + prefix + "_" + std::to_string(fst) + "_" + std::to_string(cur_len);
+ std::string new_oid_in_cache = prefix + "_" + std::to_string(fst) + "_" + std::to_string(cur_len);
+ cacheDriver->get(dpp, oid_in_cache, 0, cur_len, data, obj_attrs, null_yield);
+ len = data.length();
+ fst += len;
+
+ if (len == 0) {
+ break;
+ }
+
+ op_ret = filter->process(std::move(data), ofs);
+ if (op_ret < 0) {
+ ldpp_dout(dpp, 20) << "processor->process() returned ret="
+ << op_ret << dendl;
+ return;
+ }
+
+ rgw::d4n::CacheBlock block;
+ block.cacheObj.bucketName = c_obj->get_bucket()->get_name();
+ block.cacheObj.objName = c_obj->get_key().get_oid();
+ block.size = len;
+ block.blockID = ofs;
+ op_ret = dir->update_field(&block, "dirty", "false", null_yield);
+ if (op_ret < 0) {
+ ldpp_dout(dpp, 20) << "updating dirty flag in Block directory failed!" << dendl;
+ return;
+ }
+
+ cacheDriver->rename(dpp, oid_in_cache, new_oid_in_cache, null_yield);
+
+ ofs += len;
+ } while (len > 0);
+
+ op_ret = filter->process({}, ofs);
+
+ const req_context rctx{dpp, null_yield, nullptr};
+ ceph::real_time mtime = ceph::real_clock::from_time_t(it->second->creationTime);
+ op_ret = processor->complete(lst, it->second->etag, &mtime, ceph::real_clock::from_time_t(it->second->creationTime), obj_attrs,
+ std::nullopt, ceph::real_time(), nullptr, nullptr,
+ nullptr, nullptr, nullptr,
+ rctx, rgw::sal::FLAG_LOG_OP);
+
+ //data is clean now, updating in-memory metadata
+ it->second->dirty = false;
+ }
+ }
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(interval));
+ }
+}
+
+
int LRUPolicy::exist_key(std::string key)
{
const std::lock_guard l(lru_lock);
return 0;
}
-void LRUPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y)
+void LRUPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, time_t creationTime, const rgw_user user, optional_yield y)
{
const std::lock_guard l(lru_lock);
_erase(dpp, key, y);
- Entry *e = new Entry(key, offset, len, version);
+ Entry *e = new Entry(key, offset, len, version, dirty, creationTime, user);
entries_lru_list.push_back(*e);
entries_map.emplace(key, e);
}
+void LRUPolicy::updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, optional_yield y)
+{
+ eraseObj(dpp, key, y);
+ const std::lock_guard l(lru_lock);
+ ObjEntry *e = new ObjEntry(key, version, dirty, size, creationTime, user, etag);
+ o_entries_map.emplace(key, e);
+ return;
+}
+
+
bool LRUPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y)
{
const std::lock_guard l(lru_lock);
return true;
}
+bool LRUPolicy::eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y)
+{
+ const std::lock_guard l(lru_lock);
+ auto p = o_entries_map.find(key);
+ if (p == o_entries_map.end()) {
+ return false;
+ }
+ o_entries_map.erase(p);
+ return true;
+}
+
} } // namespace rgw::d4n
uint64_t offset;
uint64_t len;
std::string version;
- Entry(std::string& key, uint64_t offset, uint64_t len, std::string version) : key(key), offset(offset),
- len(len), version(version) {}
+ bool dirty;
+ time_t creationTime;
+ rgw_user user;
+ Entry(std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, time_t creationTime, rgw_user user) : key(key), offset(offset),
+ len(len), version(version), dirty(dirty), creationTime(creationTime), user(user) {}
};
-
+
+
//The disposer object function
struct Entry_delete_disposer {
void operator()(Entry *e) {
}
};
+ struct ObjEntry : public boost::intrusive::list_base_hook<> {
+ std::string key;
+ std::string version;
+ bool dirty;
+ uint64_t size;
+ time_t creationTime;
+ rgw_user user;
+ std::string etag;
+ ObjEntry(std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, rgw_user user, std::string& etag) : key(key), version(version), dirty(dirty), size(size), creationTime(creationTime), user(user), etag(etag) {}
+ };
+
+ struct ObjEntry_delete_disposer {
+ void operator()(ObjEntry *e) {
+ delete e;
+ }
+ };
+
public:
CachePolicy() {}
virtual ~CachePolicy() = default;
- virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context) = 0;
+ virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver *_driver) = 0;
virtual int exist_key(std::string key) = 0;
virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) = 0;
- virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) = 0;
+ virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, time_t creationTime, const rgw_user user, optional_yield y) = 0;
+ virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, optional_yield y) = 0;
virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0;
+ virtual bool eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0;
+ virtual void cleaning(const DoutPrefixProvider* dpp) = 0;
};
class LFUDAPolicy : public CachePolicy {
using handle_type = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>::handle_type;
handle_type handle;
- LFUDAEntry(std::string& key, uint64_t offset, uint64_t len, std::string& version, int localWeight) : Entry(key, offset, len, version),
- localWeight(localWeight) {}
+ LFUDAEntry(std::string& key, uint64_t offset, uint64_t len, std::string& version, bool dirty, time_t creationTime, rgw_user user, int localWeight) : Entry(key, offset, len, version, dirty, creationTime, user), localWeight(localWeight) {}
void set_handle(handle_type handle_) { handle = handle_; }
};
+ struct LFUDAObjEntry : public ObjEntry {
+ LFUDAObjEntry(std::string& key, std::string& version, bool dirty, uint64_t size, time_t creationTime, rgw_user user, std::string& etag) : ObjEntry(key, version, dirty, size, creationTime, user, etag) {}
+ };
+
using Heap = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>;
Heap entries_heap;
std::unordered_map<std::string, LFUDAEntry*> entries_map;
- std::mutex lfuda_lock;
+ std::unordered_map<std::string, LFUDAObjEntry*> o_entries_map;
+ std::mutex lfuda_lock;
int age = 1, weightSum = 0, postedSum = 0;
optional_yield y = null_yield;
BlockDirectory* dir;
rgw::cache::CacheDriver* cacheDriver;
std::optional<asio::steady_timer> rthread_timer;
+ rgw::sal::Driver *driver;
+ std::thread tc;
+ CephContext* cct;
CacheBlock* get_victim_block(const DoutPrefixProvider* dpp, optional_yield y);
int age_sync(const DoutPrefixProvider* dpp, optional_yield y);
delete dir;
}
- virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context);
+ virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver *_driver);
virtual int exist_key(std::string key) override;
virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override;
- virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) override;
+ virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, time_t creationTime, const rgw_user user, optional_yield y) override;
virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
void save_y(optional_yield y) { this->y = y; }
+ virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, optional_yield y) override;
+ virtual bool eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
+ virtual void cleaning(const DoutPrefixProvider* dpp) override;
+ LFUDAObjEntry* find_obj_entry(const std::string& key) {
+ auto it = o_entries_map.find(key);
+ if (it == o_entries_map.end())
+ return nullptr;
+ return it->second;
+ }
};
class LRUPolicy : public CachePolicy {
typedef boost::intrusive::list<Entry> List;
std::unordered_map<std::string, Entry*> entries_map;
+ std::unordered_map<std::string, ObjEntry*> o_entries_map;
std::mutex lru_lock;
List entries_lru_list;
rgw::cache::CacheDriver* cacheDriver;
public:
LRUPolicy(rgw::cache::CacheDriver* cacheDriver) : cacheDriver{cacheDriver} {}
- virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context) { return 0; }
+ virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver* _driver) { return 0; }
virtual int exist_key(std::string key) override;
virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override;
- virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) override;
+ virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, time_t creationTime, const rgw_user user, optional_yield y) override;
+ virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, optional_yield y) override;
virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
+ virtual bool eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
+ virtual void cleaning(const DoutPrefixProvider* dpp) override {}
};
class PolicyDriver {
cacheDriver->initialize(dpp);
objDir->init(cct);
blockDir->init(cct);
- policyDriver->get_cache_policy()->init(cct, dpp, io_context);
+ policyDriver->get_cache_policy()->init(cct, dpp, io_context, next);
return 0;
}
ret = this->driver->get_cache_driver()->set_attrs(dpp, head_oid_in_cache, attrs, y);
if (ret == 0) {
ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << this->get_object_version() << dendl;
- this->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, 0, version, y);
+ time_t creationTime = ceph::real_clock::to_time_t(this->get_mtime());
+ this->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, 0, version, false, creationTime, get<rgw_user>(this->get_bucket()->get_owner()), y);
ret = set_head_obj_dir_entry(dpp, y);
if (ret < 0) {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
ret = source->driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl, 0, attrs, y);
if (ret == 0) {
ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << this->source->get_object_version() << dendl;
- source->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, bl.length(), version, y);
+ time_t creationTime = ceph::real_clock::to_time_t(this->source->get_mtime());
+ source->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, bl.length(), version, false, creationTime, std::get<rgw_user>(source->get_bucket()->get_owner()), y);
ret = source->set_head_obj_dir_entry(dpp, y);
if (ret < 0) {
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
std::pair<uint64_t, uint64_t> ofs_len_pair = it->second;
uint64_t ofs = ofs_len_pair.first;
uint64_t len = ofs_len_pair.second;
+ bool dirty = false;
+ /*
+ std::stringstream s;
+ utime_t ut(source->get_mtime());
+ ut.gmtime(s);
+ */
+ time_t creationTime = ceph::real_clock::to_time_t(source->get_mtime());
+
+ rgw::d4n::CacheBlock block;
+ block.cacheObj.objName = source->get_key().get_oid();
+ block.cacheObj.bucketName = source->get_bucket()->get_name();
+ block.blockID = ofs;
+ block.size = len;
+
std::string oid_in_cache = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(len);
+
+ if (source->driver->get_block_dir()->get(&block, y) == 0){
+ if (block.dirty == true){
+ dirty = true;
+ }
+ }
+
ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " calling update for offset: " << offset << " adjusted offset: " << ofs << " length: " << len << " oid_in_cache: " << oid_in_cache << dendl;
ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << version << " " << source->get_object_version() << dendl;
- source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, len, version, y);
+ source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, len, version, dirty, creationTime, std::get<rgw_user>(source->get_bucket()->get_owner()), y);
blocks_info.erase(it);
} else {
ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << " offset not found: " << offset << dendl;
auto it = find(block.hostsList.begin(), block.hostsList.end(), source->driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address);
if (it != block.hostsList.end()) { /* Local copy */
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in local cache. " << oid_in_cache << dendl;
-
+ std::string key = oid_in_cache;
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: block is dirty = " << block.dirty << dendl;
+ if (block.dirty == true) {
+ key = "D_" + oid_in_cache; //we keep track of dirty data in the cache for the metadata failure case
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: key=" << key << " data is Dirty." << dendl;
+ }
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: block dirty =" << block.dirty << dendl;
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: key=" << key << dendl;
if (block.version == version) {
if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) {
// Read From Cache
- auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);
+ auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), key, read_ofs, len_to_read, cost, id);
this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, part_len)));
" length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
if ((part_len != obj_max_req_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) {
+ if (block.dirty == true){
+ key = "D_" + oid_in_cache; //we keep track of dirty data in the cache for the metadata failure case
+ }
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: block dirty =" << block.dirty << dendl;
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: key=" << key << dendl;
// Read From Cache
- auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);
+ auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), key, read_ofs, len_to_read, cost, id);
this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, obj_max_req_size)));
auto r = flush(dpp, std::move(completed), y);
if (r < 0) {
- drain(dpp, y);
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
- return r;
+ drain(dpp, y);
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
+ return r;
}
}
}
block.cacheObj.objName = source->get_key().get_oid();
block.cacheObj.bucketName = source->get_bucket()->get_name();
std::stringstream s;
- utime_t ut(source->get_mtime());
- ut.gmtime(s);
- block.cacheObj.creationTime = s.str();
+ block.cacheObj.creationTime = std::to_string(ceph::real_clock::to_time_t(source->get_mtime()));
block.cacheObj.dirty = false;
+ bool dirty = false;
+ time_t creationTime = ceph::real_clock::to_time_t(source->get_mtime());
//populating fields needed for building directory index
existing_block.cacheObj.objName = block.cacheObj.objName;
block.blockID = ofs;
block.size = bl.length();
block.version = version;
+ block.dirty = false; //Reading from the backend, data is clean
auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
if (ret == 0) {
ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y);
if (ret == 0) {
- filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, *y);
+ std::string objEtag = "";
+ filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, dirty, creationTime, std::get<rgw_user>(source->get_bucket()->get_owner()), *y);
+ filter->get_policy_driver()->get_cache_policy()->updateObj(dpp, prefix, version, dirty, source->get_size(), creationTime, std::get<rgw_user>(source->get_bucket()->get_owner()), objEtag, *y);
/* Store block in directory */
if (!blockDir->exist_key(&block, *y)) {
} else {
existing_block.blockID = block.blockID;
existing_block.size = block.size;
+ existing_block.dirty = block.dirty;
if (blockDir->get(&existing_block, *y) < 0) {
ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
} else {
block.size = bl.length();
block.version = version;
ofs += bl_len;
-
+ block.dirty = dirty;
if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) {
auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
if (ret == 0) {
ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y);
if (ret == 0) {
- filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, *y);
+ filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, dirty, creationTime, std::get<rgw_user>(source->get_bucket()->get_owner()), *y);
/* Store block in directory */
if (!blockDir->exist_key(&block, *y)) {
} else {
existing_block.blockID = block.blockID;
existing_block.size = block.size;
+ existing_block.dirty = block.dirty;
if (blockDir->get(&existing_block, *y) < 0) {
ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
}
block.size = bl_rem.length();
block.version = version;
ofs += bl_rem.length();
-
+ block.dirty = dirty;
auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
if (ret == 0) {
ret = filter->get_cache_driver()->put(dpp, oid, bl_rem, bl_rem.length(), attrs, *y);
if (ret == 0) {
- filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), version, *y);
+ filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), version, dirty, creationTime, std::get<rgw_user>(source->get_bucket()->get_owner()), *y);
/* Store block in directory */
if (!blockDir->exist_key(&block, *y)) {
} else {
existing_block.blockID = block.blockID;
existing_block.size = block.size;
+ existing_block.dirty = block.dirty;
if (blockDir->get(&existing_block, *y) < 0) {
ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
} else {
if (blockDir->set(&block, *y) < 0)
ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
} else {
- if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, *y) < 0)
- ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl;
+ if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, *y) < 0)
+ ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl;
}
}
}
return next->delete_obj(dpp, y, flags);
}
+
int D4NFilterWriter::prepare(optional_yield y)
{
+ startTime = time(NULL);
if (driver->get_cache_driver()->delete_data(save_dpp, obj->get_key().get_oid(), y) < 0)
ldpp_dout(save_dpp, 10) << "D4NFilterWriter::" << __func__ << "(): CacheDriver delete_data method failed." << dendl;
-
- return next->prepare(y);
+ d4n_writecache = g_conf()->d4n_writecache_enabled;
+ if (!d4n_writecache){
+ ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): calling next iterate" << dendl;
+ return next->prepare(y);
+ }
+ else
+ return 0;
}
int D4NFilterWriter::process(bufferlist&& data, uint64_t offset)
{
- /*
- int append_dataReturn = driver->get_cache_driver()->append_data(save_dpp, obj->get_key().get_oid(),
- data, y);
+ bufferlist bl = data;
+ off_t bl_len = bl.length();
+ off_t ofs = offset;
+ bool dirty = true;
+ rgw::d4n::CacheBlock block, existing_block;
+ auto creationTime = startTime;
- if (append_dataReturn < 0) {
- ldpp_dout(save_dpp, 20) << "D4N Filter: Cache append data operation failed." << dendl;
- } else {
- ldpp_dout(save_dpp, 20) << "D4N Filter: Cache append data operation succeeded." << dendl;
- }*/
+ auto version = obj->get_instance();
- return next->process(std::move(data), offset);
+ std::string prefix;
+ if (version.empty()) { //for versioned objects, get_oid() returns an oid with versionId added
+ prefix =obj->get_bucket()->get_name() + "_" + obj->get_key().get_oid();
+ } else {
+ prefix = obj->get_bucket()->get_name() + "_" + version + "_" + obj->get_key().get_oid();
+ }
+
+ rgw::d4n::BlockDirectory* blockDir = driver->get_block_dir();
+
+ block.hostsList.push_back(blockDir->cct->_conf->rgw_d4n_l1_datacache_address);
+ block.cacheObj.bucketName = obj->get_bucket()->get_name();
+ block.cacheObj.objName = obj->get_key().get_oid();
+ block.cacheObj.dirty = dirty;
+ existing_block.cacheObj.objName = block.cacheObj.objName;
+ existing_block.cacheObj.bucketName = block.cacheObj.bucketName;
+
+
+ int ret = 0;
+
+ if (!d4n_writecache){
+ std::string oid = prefix + "_" + std::to_string(ofs)+ "_" + std::to_string(bl_len);
+ block.size = bl.length();
+ block.blockID = ofs;
+ block.dirty = false; //writing to the backend, hence the data is clean
+ block.hostsList.push_back(blockDir->cct->_conf->rgw_d4n_backend_address);
+
+ ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): calling next process" << dendl;
+ ret = next->process(std::move(data), offset);
+ if (ret == 0){
+ if (!blockDir->exist_key(&block, y)) {
+ if (blockDir->set(&block, y) < 0)
+ ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+ } else {
+ existing_block.blockID = block.blockID;
+ existing_block.size = block.size;
+ if (blockDir->get(&existing_block, y) < 0) {
+ ldpp_dout(save_dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
+ } else {
+ if (existing_block.version != block.version) {
+ if (blockDir->del(&existing_block, y) < 0) //delete existing block
+ ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory del method failed." << dendl;
+ if (blockDir->set(&block, y) < 0) //new versioned block will have new version, hostsList etc, how about globalWeight?
+ ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+ } else {
+ if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, y) < 0)
+ ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory update_field method failed for hostsList." << dendl;
+ }
+ }
+ }
+ } else{
+ ldpp_dout(save_dpp, 1) << "D4NFilterObject::D4NFilterWriteOp::process" << __func__ << "(): ERROR: writing data to the backend failed!" << dendl;
+ return ret;
+ }
+ } else {
+ std::string oid = prefix + "_" + std::to_string(ofs);
+ std::string key = "D_" + oid + "_" + std::to_string(bl_len);
+ std::string oid_in_cache = oid + "_" + std::to_string(bl_len);
+ block.size = bl.length();
+ block.blockID = ofs;
+ block.dirty = true;
+ block.hostsList.push_back(blockDir->cct->_conf->rgw_d4n_l1_datacache_address);
+ dirty = true;
+ ret = driver->get_policy_driver()->get_cache_policy()->eviction(save_dpp, block.size, y);
+ if (ret == 0) {
+ //Should we replace each put_async with put, to ensure data is actually written to the cache before updating the data structures and before the lock is released?
+ if (bl.length() > 0) {
+ ret = driver->get_cache_driver()->put(save_dpp, key, bl, bl.length(), obj->get_attrs(), y);
+ if (ret == 0) {
+ driver->get_policy_driver()->get_cache_policy()->update(save_dpp, oid_in_cache, ofs, bl.length(), version, dirty, creationTime, std::get<rgw_user>(obj->get_bucket()->get_owner()), y);
+ if (!blockDir->exist_key(&block, y)) {
+ if (blockDir->set(&block, y) < 0) //should we revert previous steps if this step fails?
+ ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+ } else {
+ existing_block.blockID = block.blockID;
+ existing_block.size = block.size;
+ if (blockDir->get(&existing_block, y) < 0) {
+ ldpp_dout(save_dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
+ } else {
+ if (existing_block.version != block.version) {
+ if (blockDir->del(&existing_block, y) < 0) //delete existing block
+ ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory del method failed." << dendl;
+ if (blockDir->set(&block, y) < 0) //new versioned block will have new version, hostsList etc, how about globalWeight?
+ ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+ } else {
+ if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, y) < 0)
+ ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory update_field method failed for hostsList." << dendl;
+ }
+ }
+ }
+ } else {
+ ldpp_dout(save_dpp, 1) << "D4NFilterObject::D4NFilterWriteOp::process" << __func__ << "(): ERROR: writting data to the cache failed!" << dendl;
+ return ret;
+ }
+ }
+ }
+ }
+ return 0;
}
int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag,
const req_context& rctx,
uint32_t flags)
{
- rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
+ bool dirty = true;
+ std::vector<std::string> hostsList = {};
+ auto creationTime = startTime;
+ std::string objEtag = etag;
+
+ auto version = obj->get_instance();
+
+ std::string prefix;
+ if (version.empty()) { //for versioned objects, get_oid() returns an oid with versionId added
+ prefix = obj->get_bucket()->get_name() + "_" + obj->get_key().get_oid();
+ } else {
+ prefix = obj->get_bucket()->get_name() + "_" + version + "_" + obj->get_key().get_oid();
+ }
+
+
+ if (d4n_writecache){
+ driver->get_policy_driver()->get_cache_policy()->updateObj(save_dpp, prefix, version, dirty, accounted_size, creationTime, std::get<rgw_user>(obj->get_bucket()->get_owner()), objEtag, y);
+ hostsList = { driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address };
+
+ rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
.objName = obj->get_key().get_oid(),
.bucketName = obj->get_bucket()->get_name(),
- .creationTime = to_iso_8601(*mtime),
- .dirty = false,
- .hostsList = { /*driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address*/ } //TODO: Object is not currently being cached
+ .creationTime = std::to_string(creationTime),
+ .dirty = dirty,
+ .hostsList = hostsList
};
- if (driver->get_obj_dir()->set(&object, y) < 0)
- ldpp_dout(save_dpp, 10) << "D4NFilterWriter::" << __func__ << "(): ObjectDirectory set method failed." << dendl;
-
+ if (driver->get_obj_dir()->set(&object, y) < 0)
+ ldpp_dout(save_dpp, 10) << "D4NFilterWriter::" << __func__ << "(): ObjectDirectory set method failed." << dendl;
+ return 0;
+ }
/* Retrieve complete set of attrs */
int ret = next->complete(accounted_size, etag, mtime, set_mtime, attrs, cksum,
delete_at, if_match, if_nomatch, user_data, zones_trace,
buffer::list bl;
obj->load_obj_state(save_dpp, rctx.y);
- bl.append(to_iso_8601(obj->get_mtime()));
+ bl.append(std::to_string(creationTime));
baseAttrs.insert({"mtime", bl});
bl.clear();
const DoutPrefixProvider* save_dpp;
bool atomic;
optional_yield y;
+ bool d4n_writecache;
+ time_t startTime;
public:
D4NFilterWriter(std::unique_ptr<Writer> _next, D4NFilterDriver* _driver, Object* _obj,
virtual rgw::AioResultList put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id) = 0;
virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, const bufferlist& bl_data, optional_yield y) = 0;
virtual int delete_data(const DoutPrefixProvider* dpp, const::std::string& key, optional_yield y) = 0;
+ virtual int rename(const DoutPrefixProvider* dpp, const::std::string& oldKey, const::std::string& newKey, optional_yield y) = 0;
virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs, optional_yield y) = 0;
virtual int set_attrs(const DoutPrefixProvider* dpp, const std::string& key, const rgw::sal::Attrs& attrs, optional_yield y) = 0;
virtual int update_attrs(const DoutPrefixProvider* dpp, const std::string& key, const rgw::sal::Attrs& attrs, optional_yield y) = 0;
return 0;
}
+int SSDDriver::rename(const DoutPrefixProvider* dpp, const::std::string& oldKey, const::std::string& newKey, optional_yield y)
+{
+ std::string location = partition_info.location;
+
+ int ret = std::rename((location+oldKey).c_str(), (location+newKey).c_str());
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "SSDDriver: ERROR: failed to rename the file: " << location+oldKey << dendl;
+ return ret;
+ }
+
+ return 0;
+}
+
+
int SSDDriver::AsyncWriteRequest::prepare_libaio_write_op(const DoutPrefixProvider *dpp, bufferlist& bl, unsigned int len, std::string key, std::string cache_location)
{
std::string location = cache_location + key;
ldpp_dout(op.dpp, 20) << "INFO: AsyncWriteRequest::libaio_write_yield_cb: temp_key: " << op.temp_key << dendl;
- ret = rename(old_path.c_str(), new_path.c_str());
+ ret = std::rename(old_path.c_str(), new_path.c_str());
if (ret < 0) {
ret = errno;
ldpp_dout(op.dpp, 0) << "ERROR: put::rename: failed to rename file: " << ret << dendl;
virtual rgw::AioResultList put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id) override;
virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, const bufferlist& bl_data, optional_yield y) override;
virtual int delete_data(const DoutPrefixProvider* dpp, const::std::string& key, optional_yield y) override;
+ virtual int rename(const DoutPrefixProvider* dpp, const::std::string& oldKey, const::std::string& newKey, optional_yield y) override;
virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs, optional_yield y) override;
virtual int set_attrs(const DoutPrefixProvider* dpp, const std::string& key, const rgw::sal::Attrs& attrs, optional_yield y) override;
virtual int update_attrs(const DoutPrefixProvider* dpp, const std::string& key, const rgw::sal::Attrs& attrs, optional_yield y) override;