*
*/
+#include "rgw_redis_driver.h"
+#include "rgw_ssd_driver.h"
#include "rgw_sal_d4n.h"
#define dout_subsys ceph_subsys_rgw
return dynamic_cast<FilterObject*>(t)->get_next();
}
+// Construct the D4N filter driver, layered over the next SAL driver in
+// the filter chain.  Sets up a local SSD-backed read cache partition and
+// the D4N directory / policy objects used by the cache lookup paths.
+// NOTE(review): all five members below are owning raw pointers released
+// in ~D4NFilterDriver; confirm the class declaration deletes copy/move
+// (rule of five), otherwise a copy would double-delete -- not visible
+// from this hunk.
+D4NFilterDriver::D4NFilterDriver(Driver* _next) : FilterDriver(_next)
+{
+ rgw::cache::Partition partition_info;
+ // Reuses the existing D3N L1 datacache config knobs for the cache
+ // location and size -- presumably so D4N can be enabled with the same
+ // settings operators already use for D3N; TODO confirm intent.
+ partition_info.location = g_conf()->rgw_d3n_l1_datacache_persistent_path;
+ partition_info.name = "d4n";
+ partition_info.type = "read-cache";
+ partition_info.size = g_conf()->rgw_d3n_l1_datacache_size;
+
+ cacheDriver = new rgw::cache::SSDDriver(partition_info);
+ objDir = new rgw::d4n::ObjectDirectory();
+ blockDir = new rgw::d4n::BlockDirectory();
+ cacheBlock = new rgw::d4n::CacheBlock();
+ // "lfuda" selects the LFU-DA eviction policy -- assumed to be a policy
+ // name recognized by PolicyDriver; verify against its constructor.
+ policyDriver = new rgw::d4n::PolicyDriver("lfuda");
+}
+
+// Tear down the filter driver: releases the cache backend, the object and
+// block directories, the cached-block record, and the eviction policy
+// driver allocated in the constructor (same five members, same order).
+ D4NFilterDriver::~D4NFilterDriver()
+ {
+ delete cacheDriver;
+ delete objDir;
+ delete blockDir;
+ delete cacheBlock;
+ delete policyDriver;
+}
+
int D4NFilterDriver::initialize(CephContext *cct, const DoutPrefixProvider *dpp)
{
FilterDriver::initialize(cct, dpp);
completed.pop_front_and_dispose(std::default_delete<rgw::AioResultEntry>{});
}
+ ldpp_dout(dpp, 20) << "D4NFilterObject::returning from flush:: " << dendl;
return 0;
}
//len_to_read is the actual length read from a part/ chunk in cache, while part_len is the length of the chunk/ part in cache
uint64_t cost = 0, len_to_read = 0, part_len = 0;
- if (y) {
- aio = rgw::make_throttle(window_size, y);
+ aio = rgw::make_throttle(window_size, y);
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "obj_max_req_size " << obj_max_req_size <<
- " num_parts " << num_parts << " adjusted_start_offset: " << adjusted_start_ofs << " len: " << len << dendl;
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "obj_max_req_size " << obj_max_req_size <<
+ " num_parts " << num_parts << " adjusted_start_offset: " << adjusted_start_ofs << " len: " << len << dendl;
- this->offset = ofs;
+ this->offset = ofs;
- do {
- uint64_t id = adjusted_start_ofs;
- if (start_part_num == (num_parts - 1)) {
- len_to_read = len;
- part_len = len;
- cost = len;
- } else {
- len_to_read = obj_max_req_size;
- cost = obj_max_req_size;
- part_len = obj_max_req_size;
- }
- if (start_part_num == 0) {
- len_to_read -= diff_ofs;
- id += diff_ofs;
- }
+ do {
+ uint64_t id = adjusted_start_ofs;
+ if (start_part_num == (num_parts - 1)) {
+ len_to_read = len;
+ part_len = len;
+ cost = len;
+ } else {
+ len_to_read = obj_max_req_size;
+ cost = obj_max_req_size;
+ part_len = obj_max_req_size;
+ }
+ if (start_part_num == 0) {
+ len_to_read -= diff_ofs;
+ id += diff_ofs;
+ }
+
+ uint64_t read_ofs = diff_ofs; //read_ofs is the actual offset to start reading from the current part/ chunk
+ ceph::bufferlist bl;
+ std::string oid_in_cache = source->get_bucket()->get_marker() + "_" + oid + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(part_len);
+
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num <<
+ " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
- uint64_t read_ofs = diff_ofs; //read_ofs is the actual offset to start reading from the current part/ chunk
- ceph::bufferlist bl;
- rgw_raw_obj r_obj;
- r_obj.oid = oid;
+ if (source->driver->get_cache_driver()->key_exists(dpp, oid_in_cache)) {
+ // Read From Cache
+ auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" <<
- oid << " length to read is: " << len_to_read << " part num: " << start_part_num <<
- " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
+
+ auto r = flush(dpp, std::move(completed));
+
+ if (r < 0) {
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
+ return r;
+ }
+ } else {
+ oid_in_cache = source->get_bucket()->get_marker() + "_" + oid + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(obj_max_req_size);
+ //for ranged requests, for last part, the whole part might exist in the cache
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" << oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num <<
+ " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
- if (source->driver->get_cache_driver()->get(dpp, oid, ofs, part_len, bl, source->get_attrs()) == 0) {
+ if ((part_len != obj_max_req_size) && source->driver->get_cache_driver()->key_exists(dpp, oid_in_cache)) {
// Read From Cache
- auto completed = aio->get(r_obj, rgw::Aio::cache_read_op(dpp, y, source->driver->get_cache_driver(),
- read_ofs, len_to_read, oid), cost, id);
+ auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid << dendl;
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
auto r = flush(dpp, std::move(completed));
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
return r;
}
- } else {
- //for ranged requests, for last part, the whole part might exist in the cache
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" <<
- oid << " length to read is: " << len_to_read << " part num: " << start_part_num <<
- " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
-
- if (source->driver->get_cache_driver()->get(dpp, oid, ofs, obj_max_req_size, bl, source->get_attrs()) == 0) {
- // Read From Cache
- auto completed = aio->get(r_obj, rgw::Aio::cache_read_op(dpp, y, source->driver->get_cache_driver(),
- read_ofs, len_to_read, oid), cost, id);
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid << dendl;
+ } else {
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
- auto r = flush(dpp, std::move(completed));
+ auto r = drain(dpp);
- if (r < 0) {
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
- return r;
- }
- } else {
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid << dendl;
+ if (r < 0) {
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, r= " << r << dendl;
+ return r;
+ }
- auto r = drain(dpp);
+ break;
+ }
+ }
- if (r < 0) {
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, r= " << r << dendl;
- return r;
- }
+ if (start_part_num == (num_parts - 1)) {
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+ return drain(dpp);
+ } else {
+ adjusted_start_ofs += obj_max_req_size;
+ }
- break;
- }
- }
+ start_part_num += 1;
+ len -= obj_max_req_size;
+ } while (start_part_num < num_parts);
- if (start_part_num == (num_parts - 1)) {
- ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid << dendl;
- return drain(dpp);
- } else {
- adjusted_start_ofs += obj_max_req_size;
- }
- start_part_num += 1;
- len -= obj_max_req_size;
- } while (start_part_num < num_parts);
- }
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Fetching object from backend store" << dendl;
const std::lock_guard l(d3n_get_data.d3n_lock);
if (bl.length() > 0 && last_part) { // if bl = bl_rem has data and this is the last part, write it to cache
- filter->get_cache_driver()->put(save_dpp, this->oid, bl, bl.length(), source->get_attrs());
+ std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
+ filter->get_cache_driver()->put_async(save_dpp, oid, bl, bl.length(), source->get_attrs());
} else if (bl.length() == rgw_get_obj_max_req_size && bl_rem.length() == 0) { // if bl is the same size as rgw_get_obj_max_req_size, write it to cache
+ std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
ofs += bl_len;
-
- filter->get_cache_driver()->put(save_dpp, this->oid, bl, bl.length(), source->get_attrs());
+ filter->get_cache_driver()->put_async(save_dpp, oid, bl, bl.length(), source->get_attrs());
} else { //copy data from incoming bl to bl_rem till it is rgw_get_obj_max_req_size, and then write it to cache
uint64_t rem_space = rgw_get_obj_max_req_size - bl_rem.length();
uint64_t len_to_copy = rem_space > bl.length() ? bl.length() : rem_space;
bl_rem.claim_append(bl_copy);
if (bl_rem.length() == g_conf()->rgw_get_obj_max_req_size) {
+ std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_rem.length());
ofs += bl_rem.length();
- filter->get_cache_driver()->put(save_dpp, this->oid, bl_rem, bl_rem.length(), source->get_attrs());
+ filter->get_cache_driver()->put_async(save_dpp, oid, bl_rem, bl_rem.length(), source->get_attrs());
bl_rem.clear();
bl_rem = std::move(bl);