return ret;
}
+void D4NFilterObject::D4NFilterReadOp::cancel() {
+ aio->drain();
+}
+
+int D4NFilterObject::D4NFilterReadOp::drain(const DoutPrefixProvider* dpp) {
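+  // wait for all in-flight cache reads, flushing each completed batch; on a flush error, cancel the rest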
+ auto c = aio->wait();
+ while (!c.empty()) {
+ int r = flush(dpp, std::move(c));
+ if (r < 0) {
+ cancel();
+ return r;
+ }
+ c = aio->wait();
+ }
+  return 0; // nothing left to wait on; all completed results were flushed above
+}
+
+int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw::AioResultList&& results) {
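+  // merge newly completed reads into the sorted 'completed' list and forward any now-contiguous chunks to the client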
+ int r = rgw::check_for_errors(results);
+
+ if (r < 0) {
+ return r;
+ }
+
+ auto cmp = [](const auto& lhs, const auto& rhs) { return lhs.id < rhs.id; };
+ results.sort(cmp); // merge() requires results to be sorted first
+ completed.merge(results, cmp); // merge results in sorted order
+
+ ldpp_dout(dpp, 20) << "D4NFilterObject::In flush:: " << dendl;
+
+ while (!completed.empty() && completed.front().id == offset) {
+ auto bl = std::move(completed.front().data);
+
+ ldpp_dout(dpp, 20) << "D4NFilterObject::flush:: calling handle_data for offset: " << offset << " bufferlist length: " << bl.length() << dendl;
+
+ offset += bl.length();
+ int r = client_cb->handle_data(bl, 0, bl.length());
+ if (r < 0) {
+ return r;
+ }
+ completed.pop_front_and_dispose(std::default_delete<rgw::AioResultEntry>{});
+ }
+
+ return 0;
+}
+
int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int64_t ofs, int64_t end,
RGWGetDataCB* cb, optional_yield y)
{
- /* Execute cache replacement policy */
+ const uint64_t window_size = g_conf()->rgw_get_obj_window_size;
+ std::string oid = source->get_key().get_oid();
+
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "oid: " << oid << " ofs: " << ofs << " end: " << end << dendl;
+
+ this->client_cb = cb;
+  this->cb->set_client_cb(cb); // the inner cb forwards data read from the backend to the client callback
+
+ uint64_t obj_max_req_size = g_conf()->rgw_get_obj_max_req_size;
+ uint64_t start_part_num = 0;
+  uint64_t part_num = ofs/obj_max_req_size; // part number that ofs falls into, relative to the start of the object
+  uint64_t adjusted_start_ofs = part_num*obj_max_req_size; // for a ranged request, align the start offset to the beginning of its chunk/part
+  uint64_t diff_ofs = ofs - adjusted_start_ofs; // difference between the requested offset and the adjusted offset
+  off_t len = (end - adjusted_start_ofs) + 1;
+  uint64_t num_parts = (len%obj_max_req_size) == 0 ? len/obj_max_req_size : (len/obj_max_req_size) + 1; // number of parts, based on the adjusted offset
+  //len_to_read is the length actually read from a part/chunk in cache; part_len is the total length of that chunk/part in cache
+ uint64_t cost = 0, len_to_read = 0, part_len = 0;
+
+ if (y) {
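+    // the throttle caps in-flight cache reads at rgw_get_obj_window_size bytes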
+ aio = rgw::make_throttle(window_size, y);
+
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "obj_max_req_size " << obj_max_req_size <<
+ " num_parts " << num_parts << " adjusted_start_offset: " << adjusted_start_ofs << " len: " << len << dendl;
+
+ this->offset = ofs;
+
+ do {
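+      // 'id' is this chunk's offset within the object; flush() uses it to release data to the client in order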
+ uint64_t id = adjusted_start_ofs;
+ if (start_part_num == (num_parts - 1)) {
+ len_to_read = len;
+ part_len = len;
+ cost = len;
+ } else {
+ len_to_read = obj_max_req_size;
+ cost = obj_max_req_size;
+ part_len = obj_max_req_size;
+ }
+      uint64_t read_ofs = 0; // offset within the current part/chunk at which to start reading; nonzero only for the first part of a ranged request
+      if (start_part_num == 0) {
+        len_to_read -= diff_ofs;
+        id += diff_ofs;
+        read_ofs = diff_ofs;
+      }
+
+ std::string oid_in_cache = source->get_bucket()->get_marker() + "_" + oid + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(part_len);
+ rgw_raw_obj r_obj;
+ r_obj.oid = oid_in_cache;
+ ceph::bufferlist bl;
+
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" <<
+ oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num <<
+ " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
+
+ if (source->driver->get_cache_driver()->get(dpp, oid_in_cache, ofs, part_len, bl, source->get_attrs()) == 0) {
+ // Read From Cache
+ auto completed = aio->get(r_obj, rgw::Aio::cache_read_op(dpp, y, source->driver->get_cache_driver(),
+ read_ofs, len_to_read, oid_in_cache), cost, id);
+
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
+
+ auto r = flush(dpp, std::move(completed));
+
+ if (r < 0) {
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
+ return r;
+ }
+ } else {
+ //for ranged requests, for last part, the whole part might exist in the cache
+ oid_in_cache = source->get_bucket()->get_marker() + "_" + oid + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(obj_max_req_size);
+ r_obj.oid = oid_in_cache;
+
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: oid=" <<
+ oid_in_cache << " length to read is: " << len_to_read << " part num: " << start_part_num <<
+ " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
+
+ if (source->driver->get_cache_driver()->get(dpp, oid_in_cache, ofs, obj_max_req_size, bl, source->get_attrs()) == 0) {
+ // Read From Cache
+ auto completed = aio->get(r_obj, rgw::Aio::cache_read_op(dpp, y, source->driver->get_cache_driver(),
+ read_ofs, len_to_read, oid_in_cache), cost, id);
+
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
+
+ auto r = flush(dpp, std::move(completed));
+
+ if (r < 0) {
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
+ return r;
+ }
+ } else {
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+
+ auto r = drain(dpp);
+
+ if (r < 0) {
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, r= " << r << dendl;
+ return r;
+ }
+
+ break;
+ }
+ }
+
+ if (start_part_num == (num_parts - 1)) {
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
+ return drain(dpp);
+ } else {
+ adjusted_start_ofs += obj_max_req_size;
+ }
+
+ start_part_num += 1;
+ len -= obj_max_req_size;
+ } while (start_part_num < num_parts);
+ }
+
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Fetching object from backend store" << dendl;
+
+ Attrs obj_attrs;
+ if (source->has_attrs()) {
+ obj_attrs = source->get_attrs();
+ }
+
+ if (source->is_compressed() || obj_attrs.find(RGW_ATTR_CRYPT_MODE) != obj_attrs.end() || !y) {
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Skipping writing to cache" << dendl;
+ this->cb->bypass_cache_write();
+ }
+
+  if (start_part_num != 0) {
+    ofs = adjusted_start_ofs; // resume the backend read at the next chunk boundary
+  }
+
+  this->cb->set_ofs(ofs);
+ auto r = next->iterate(dpp, ofs, end, this->cb.get(), y);
+
+ if (r < 0) {
+ ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to fetch object from backend store, r= " << r << dendl;
+ return r;
+ }
+
+ return this->cb->flush_last_part(dpp);
+
+ /*
+ / Execute cache replacement policy /
int policyRet = source->driver->get_policy_driver()->cachePolicy->get_block(dpp, source->driver->get_cache_block(),
source->driver->get_cache_driver());
uint64_t len = end - ofs + 1;
std::string oid(source->get_name());
- /* Local cache check */
+ / Local cache check /
if (source->driver->get_cache_driver()->key_exists(dpp, oid)) { // Entire object for now -Sam
ret = source->driver->get_cache_driver()->get(dpp, source->get_key().get_oid(), ofs, len, bl, source->get_attrs());
cb->handle_data(bl, ofs, len);
} else {
- /* Block directory check */
+ / Block directory check /
int getDirReturn = source->driver->get_block_dir()->get_value(source->driver->get_cache_block());
if (getDirReturn >= -1) {
// remote cache get
- /* Cache block locally */
+ / Cache block locally /
ret = source->driver->get_cache_driver()->put(dpp, source->get_key().get_oid(), bl, len, source->get_attrs()); // May be put_async -Sam
if (!ret) {
- int updateValueReturn = source->driver->get_block_dir()->update_field(source->driver->get_cache_block(), "hostsList", ""/*local cache ip from config*/);
+ int updateValueReturn = source->driver->get_block_dir()->update_field(source->driver->get_cache_block(), "hostsList", ""/local cache ip from config/);
if (updateValueReturn < 0) {
ldpp_dout(dpp, 20) << "D4N Filter: Block directory update value operation failed." << dendl;
cb->handle_data(bl, ofs, len);
}
} else {
- /* Write tier retrieval */
+ / Write tier retrieval /
ldpp_dout(dpp, 20) << "D4N Filter: Block directory get operation failed." << dendl;
getDirReturn = source->driver->get_obj_dir()->get_value(&(source->driver->get_cache_block()->cacheObj));
// retrieve from write back cache, which will be stored as a cache driver instance in the filter
- /* Cache block locally */
+ / Cache block locally /
ret = source->driver->get_cache_driver()->put(dpp, source->get_key().get_oid(), bl, len, source->get_attrs()); // May be put_async -Sam
if (!ret) {
- int updateValueReturn = source->driver->get_block_dir()->update_field(source->driver->get_cache_block(), "hostsList", ""/*local cache ip from config*/);
+ int updateValueReturn = source->driver->get_block_dir()->update_field(source->driver->get_cache_block(), "hostsList", ""/local cache ip from config/);
if (updateValueReturn < 0) {
ldpp_dout(dpp, 20) << "D4N Filter: Block directory update value operation failed." << dendl;
cb->handle_data(bl, ofs, len);
}
} else {
- /* Backend store retrieval */
+ / Backend store retrieval /
ldpp_dout(dpp, 20) << "D4N Filter: Object directory get operation failed." << dendl;
ret = next->iterate(dpp, ofs, end, cb, y);
if (!ret) {
- /* Cache block locally */
+ / Cache block locally /
ret = source->driver->get_cache_driver()->put(dpp, source->get_key().get_oid(), bl, len, source->get_attrs()); // May be put_async -Sam
- /* Store block in directory */
+ / Store block in directory /
rgw::d4n::BlockDirectory* tempBlockDir = source->driver->get_block_dir(); // remove later -Sam
source->driver->get_cache_block()->hostsList.push_back(tempBlockDir->get_addr().host + ":" + std::to_string(tempBlockDir->get_addr().port)); // local cache address -Sam
if (ret < 0)
ldpp_dout(dpp, 20) << "D4N Filter: Cache iterate operation failed." << dendl;
- return next->iterate(dpp, ofs, end, cb, y);
+ return next->iterate(dpp, ofs, end, cb, y); */
+}
+
+int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::flush_last_part(const DoutPrefixProvider* dpp)
+{
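+  // flush whatever remains in bl_rem as the final (possibly short) chunk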
+ save_dpp = dpp;
+ last_part = true;
+ return handle_data(bl_rem, 0, bl_rem.length());
+}
+
+int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
+{
+ auto rgw_get_obj_max_req_size = g_conf()->rgw_get_obj_max_req_size;
+
+ if (!last_part && bl.length() <= rgw_get_obj_max_req_size) {
+ auto r = client_cb->handle_data(bl, bl_ofs, bl_len);
+
+ if (r < 0) {
+ return r;
+ }
+ }
+
+ //Accumulating data from backend store into rgw_get_obj_max_req_size sized chunks and then writing to cache
+ if (write_to_cache) {
+ const std::lock_guard l(d3n_get_data.d3n_lock);
+ Attrs attrs;
+
+ if (bl.length() > 0 && last_part) { // if bl = bl_rem has data and this is the last part, write it to cache
+ std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
+
+ filter->get_cache_driver()->put(save_dpp, oid, bl, bl.length(), attrs); // need attrs for just chunk? -Sam
+ } else if (bl.length() == rgw_get_obj_max_req_size && bl_rem.length() == 0) { // if bl is the same size as rgw_get_obj_max_req_size, write it to cache
+ std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
+ ofs += bl_len;
+
+ filter->get_cache_driver()->put(save_dpp, oid, bl, bl.length(), attrs); // need attrs for just chunk? -Sam
+ } else { //copy data from incoming bl to bl_rem till it is rgw_get_obj_max_req_size, and then write it to cache
+ uint64_t rem_space = rgw_get_obj_max_req_size - bl_rem.length();
+ uint64_t len_to_copy = rem_space > bl.length() ? bl.length() : rem_space;
+ bufferlist bl_copy;
+
+ bl.splice(0, len_to_copy, &bl_copy);
+ bl_rem.claim_append(bl_copy);
+
+      if (bl_rem.length() == rgw_get_obj_max_req_size) {
+ std::string oid = this->oid + "_" + std::to_string(ofs) + "_" + std::to_string(bl_rem.length());
+ ofs += bl_rem.length();
+
+ filter->get_cache_driver()->put(save_dpp, oid, bl_rem, bl_rem.length(), attrs); // need attrs for just chunk? -Sam
+
+ bl_rem.clear();
+ bl_rem = std::move(bl);
+ }
+ }
+ }
+
+ return 0;
}
int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp,
#include "rgw_oidc_provider.h"
#include "rgw_role.h"
#include "common/dout.h"
+#include "rgw_aio_throttle.h"
#include "rgw_redis_driver.h"
#include "driver/d4n/d4n_directory.h"
public:
struct D4NFilterReadOp : FilterReadOp {
- D4NFilterObject* source;
-
- D4NFilterReadOp(std::unique_ptr<ReadOp> _next, D4NFilterObject* _source) : FilterReadOp(std::move(_next)),
- source(_source) {}
- virtual ~D4NFilterReadOp() = default;
-
- virtual int prepare(optional_yield y, const DoutPrefixProvider* dpp) override;
- virtual int iterate(const DoutPrefixProvider* dpp, int64_t ofs, int64_t end,
- RGWGetDataCB* cb, optional_yield y) override;
+ public:
+ class D4NFilterGetCB: public RGWGetDataCB {
+ private:
+ D4NFilterDriver* filter; // don't need -Sam ?
+ std::string oid;
+ RGWGetDataCB* client_cb;
+ uint64_t ofs = 0, len = 0;
+ bufferlist bl_rem;
+ bool last_part{false};
+ D3nGetObjData d3n_get_data; // should make d4n version? -Sam
+ bool write_to_cache{true};
+
+ public:
+ D4NFilterGetCB(D4NFilterDriver* _filter, std::string& _oid) : filter(_filter),
+ oid(_oid) {}
+
+      const DoutPrefixProvider* save_dpp{nullptr};
+
+ int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override;
+ void set_client_cb(RGWGetDataCB* client_cb) { this->client_cb = client_cb;}
+ void set_ofs(uint64_t ofs) { this->ofs = ofs; }
+ int flush_last_part(const DoutPrefixProvider* dpp);
+ void bypass_cache_write() { this->write_to_cache = false; }
+ };
+
+ D4NFilterObject* source;
+
+ D4NFilterReadOp(std::unique_ptr<ReadOp> _next, D4NFilterObject* _source) : FilterReadOp(std::move(_next)),
+ source(_source)
+ {
+ std::string oid = source->get_bucket()->get_marker() + "_" + source->get_key().get_oid();
+ cb = std::make_unique<D4NFilterGetCB>(source->driver, oid);
+ }
+ virtual ~D4NFilterReadOp() = default;
+
+ virtual int prepare(optional_yield y, const DoutPrefixProvider* dpp) override;
+ virtual int iterate(const DoutPrefixProvider* dpp, int64_t ofs, int64_t end,
+ RGWGetDataCB* cb, optional_yield y) override;
+
+ private:
+    RGWGetDataCB* client_cb{nullptr};
+ std::unique_ptr<D4NFilterGetCB> cb;
+ std::unique_ptr<rgw::Aio> aio;
+ uint64_t offset = 0; // next offset to write to client
+ rgw::AioResultList completed; // completed read results, sorted by offset
+
+ int flush(const DoutPrefixProvider* dpp, rgw::AioResultList&& results);
+ void cancel();
+ int drain(const DoutPrefixProvider* dpp);
};
struct D4NFilterDeleteOp : FilterDeleteOp {
const req_context& rctx,
uint32_t flags) override;
bool is_atomic() { return atomic; };
- const DoutPrefixProvider* dpp() { return save_dpp; }
+ const DoutPrefixProvider* dpp() { return save_dpp; }
};
} } // namespace rgw::sal
#include <boost/algorithm/string.hpp>
#include "rgw_redis_driver.h"
+//#include "rgw_ssd_driver.h"
#define dout_subsys ceph_subsys_rgw
#define dout_context g_ceph_context
"max_buckets",
"data"};
-std::vector< std::pair<std::string, std::string> > build_attrs(rgw::sal::Attrs* binary) {
+std::vector< std::pair<std::string, std::string> > build_attrs(rgw::sal::Attrs* binary)
+{
std::vector< std::pair<std::string, std::string> > values;
rgw::sal::Attrs::iterator attrs;
return values;
}
-int RedisDriver::insert_entry(const DoutPrefixProvider* dpp, std::string key, off_t offset, uint64_t len) {
+int RedisDriver::find_client(const DoutPrefixProvider* dpp)
+{
+ if (client.is_connected())
+ return 0;
+
+ if (addr.host == "" || addr.port == 0) {
+ dout(10) << "RGW Redis Cache: Redis cache endpoint was not configured correctly" << dendl;
+ return EDESTADDRREQ;
+ }
+
+ client.connect(addr.host, addr.port, nullptr);
+
+ if (!client.is_connected())
+ return ECONNREFUSED;
+
+ return 0;
+}
+
+int RedisDriver::insert_entry(const DoutPrefixProvider* dpp, std::string key, off_t offset, uint64_t len)
+{
auto ret = entries.emplace(key, Entry(key, offset, len));
return ret.second;
}
-int RedisDriver::remove_entry(const DoutPrefixProvider* dpp, std::string key) {
+int RedisDriver::remove_entry(const DoutPrefixProvider* dpp, std::string key)
+{
return entries.erase(key);
}
-std::optional<Entry> RedisDriver::get_entry(const DoutPrefixProvider* dpp, std::string key) {
+std::optional<Entry> RedisDriver::get_entry(const DoutPrefixProvider* dpp, std::string key)
+{
auto iter = entries.find(key);
if (iter != entries.end()) {
return std::nullopt;
}
-int RedisDriver::initialize(CephContext* cct, const DoutPrefixProvider* dpp) {
+/* Currently an attribute but will also be part of the Entry metadata once consistency is guaranteed -Sam
+int RedisDriver::update_local_weight(const DoutPrefixProvider* dpp, std::string key, int localWeight)
+{
+ auto iter = entries.find(key);
+
+ if (iter != entries.end()) {
+ iter->second.localWeight = localWeight;
+ return 0;
+ }
+
+ return -1;
+}
+*/
+
+int RedisDriver::initialize(CephContext* cct, const DoutPrefixProvider* dpp)
+{
+ this->cct = cct;
addr.host = cct->_conf->rgw_d4n_host; // change later -Sam
addr.port = cct->_conf->rgw_d4n_port;
return 0;
}
-bool RedisDriver::key_exists(const DoutPrefixProvider* dpp, const std::string& key) {
- int result = -1;
- std::string entryName = "rgw-object:" + key + ":cache";
- std::vector<std::string> keys;
- keys.push_back(entryName);
-
- if (!client.is_connected())
- return ECONNREFUSED;
-
- try {
- client.exists(keys, [&result](cpp_redis::reply &reply) {
- if (reply.is_integer()) {
- result = reply.as_integer();
- }
- });
-
- client.sync_commit(std::chrono::milliseconds(1000));
- } catch(std::exception &e) {}
-
- return result;
-}
-
-std::vector<Entry> RedisDriver::list_entries(const DoutPrefixProvider* dpp) {
- std::vector<std::string> keys;
- std::vector<Entry> entries;
-
- if (!client.is_connected())
- return {};
-
- try {
- size_t cursor = 0;
- const std::string pattern = "*:cache";
-
- do {
- auto reply = client.scan(cursor, pattern);
- client.sync_commit(std::chrono::milliseconds(1000));
-
- auto arr = reply.get().as_array();
- cursor = std::stoi(arr[0].as_string());
- auto result = arr[1].as_array();
-
- for (auto it = result.begin(); it != result.end(); ++it) {
- int i = std::distance(result.begin(), it);
- std::string entryName = result[i].as_string();
- keys.push_back(entryName.substr(11, entryName.length() - 17));
- }
- } while (cursor != 0);
- } catch(std::exception &e) {
- return {};
- }
-
- /* Construct list of entries */
- for (auto it = keys.begin(); it != keys.end(); ++it) {
- Entry entry;
-
- if (key_exists(dpp, *it)) {
- try {
- std::vector<std::string> fields;
- std::string entryName = "rgw-object:" + *it + ":cache";
-
- entry.key = *it;
- fields.push_back("offset");
- fields.push_back("len");
- fields.push_back("localWeight");
-
- client.hmget(entryName, fields, [&entry](cpp_redis::reply &reply) {
- if (reply.is_array()) {
- auto arr = reply.as_array();
-
- if (!arr[0].is_null()) {
- entry.offset = std::stol(arr[0].as_string().c_str());
- entry.len = std::stoi(arr[1].as_string());
- entry.localWeight = std::stoi(arr[2].as_string());
- }
- }
- });
-
- client.sync_commit(std::chrono::milliseconds(1000));
- } catch(std::exception &e) {
- return {}; // return failure or skip entry? -Sam
- }
- } else { // if one entry isn't found, shoud entire operation return a failure? -Sam
- dout(20) << "RGW Redis Cache: Entry " << *it << " was not retrievable." << dendl;
- }
-
- entries.push_back(entry);
- }
-
- return entries;
-}
-
-size_t RedisDriver::get_num_entries(const DoutPrefixProvider* dpp) {
- int result = -1;
-
- if (!client.is_connected())
- return ECONNREFUSED;
-
- try {
- client.keys(":cache", [&result](cpp_redis::reply &reply) {
- if (!reply.is_null()) {
- result = reply.as_integer();
- }
- });
-
- client.sync_commit(std::chrono::milliseconds(1000));
-
- if (result < 0) {
- return -1;
- }
- } catch(std::exception &e) {
- return -1;
- }
-
- return result;
-}
-
-Partition RedisDriver::get_current_partition_info(const DoutPrefixProvider* dpp) {
- Partition part;
- return part; // Implement -Sam
-}
-
-uint64_t RedisDriver::get_free_space(const DoutPrefixProvider* dpp) {
- int result = -1;
-
- if (!client.is_connected())
- return ECONNREFUSED;
-
- try {
- client.info([&result](cpp_redis::reply &reply) {
- if (!reply.is_null()) {
- int usedMem = -1;
- int maxMem = -1;
-
- std::istringstream iss(reply.as_string());
- std::string line;
- while (std::getline(iss, line)) {
- size_t pos = line.find_first_of(":");
- if (pos != std::string::npos) {
- if (line.substr(0, pos) == "used_memory") {
- usedMem = std::stoi(line.substr(pos + 1, line.length() - pos - 2));
- } else if (line.substr(0, line.find_first_of(":")) == "maxmemory") {
- maxMem = std::stoi(line.substr(pos + 1, line.length() - pos - 2));
- }
- }
- }
-
- if (usedMem > -1 && maxMem > -1)
- result = maxMem - usedMem;
- }
- });
-
- client.sync_commit(std::chrono::milliseconds(1000));
- } catch(std::exception &e) {
- return -1;
- }
-
- return result;
-}
-
-int RedisDriver::put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) {
+int RedisDriver::put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs)
+{
std::string entryName = "rgw-object:" + key + ":cache";
if (!client.is_connected())
client.sync_commit(std::chrono::milliseconds(1000));
- if (result != 0) {
+ if (result <= 0) {
return -1;
}
} catch(std::exception &e) {
return 0;
}
-int RedisDriver::get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs) { // for whole objects? -Sam
+int RedisDriver::get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs)
+{
std::string result;
std::string entryName = "rgw-object:" + key + ":cache";
return 0;
}
-int RedisDriver::append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data) {
+rgw::AioResultList RedisDriver::get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id)
+{
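+  // stub: asynchronous reads are served by the templated get_async/AsyncReadOp path below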
+ return {};
+}
+
+int RedisDriver::append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data)
+{
std::string result;
std::string value = "";
std::string entryName = "rgw-object:" + key + ":cache";
return 0;
}
-int RedisDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string& key) {
+int RedisDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string& key)
+{
int result = 0;
std::string entryName = "rgw-object:" + key + ":cache";
std::vector<std::string> deleteField;
return result - 1;
}
-int RedisDriver::get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) {
- int exists = -2;
+int RedisDriver::get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs)
+{
std::string result;
std::string entryName = "rgw-object:" + key + ":cache";
}
getFields.erase(std::find(getFields.begin(), getFields.end(), "data")); /* Do not query for data field */
-
+ int exists = -1;
/* Get attributes from cache */
try {
client.hmget(entryName, getFields, [&exists, &attrs, &getFields](cpp_redis::reply &reply) {
return 0;
}
-int RedisDriver::set_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) {
+int RedisDriver::set_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs)
+{
/* Creating the index based on oid */
std::string entryName = "rgw-object:" + key + ":cache";
std::string result;
return 0;
}
-int RedisDriver::update_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) {
+int RedisDriver::update_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs)
+{
std::string result;
std::string entryName = "rgw-object:" + key + ":cache";
return 0;
}
-int RedisDriver::delete_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& del_attrs) {
+int RedisDriver::delete_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& del_attrs)
+{
int result = 0;
std::string entryName = "rgw-object:" + key + ":cache";
return -2;
}
-std::string RedisDriver::get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name) {
+std::string RedisDriver::get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name)
+{
int exists = -2;
std::string result;
std::string entryName = "rgw-object:" + key + ":cache";
return attrValue;
}
-int RedisDriver::set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attrVal) {
+int RedisDriver::set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attrVal)
+{
/* Creating the index based on key */
std::string entryName = "rgw-object:" + key + ":cache";
int result = -1;
return result;
}
-std::unique_ptr<CacheAioRequest> RedisDriver::get_cache_aio_request_ptr(const DoutPrefixProvider* dpp)
+std::unique_ptr<CacheAioRequest> RedisDriver::get_cache_aio_request_ptr(const DoutPrefixProvider* dpp)
+{
+ return std::make_unique<RedisCacheAioRequest>(this);
+}
+
+bool RedisDriver::key_exists(const DoutPrefixProvider* dpp, const std::string& key)
+{
+  int result = 0; // default to "not present" if the lookup fails
+ std::string entryName = "rgw-object:" + key + ":cache";
+ std::vector<std::string> keys;
+ keys.push_back(entryName);
+
+ if (!client.is_connected())
+ find_client(dpp);
+
+ try {
+ client.exists(keys, [&result](cpp_redis::reply &reply) {
+ if (reply.is_integer()) {
+ result = reply.as_integer();
+ }
+ });
+
+ client.sync_commit(std::chrono::milliseconds(1000));
+ } catch(std::exception &e) {}
+
+ return result;
+}
+
+std::vector<Entry> RedisDriver::list_entries(const DoutPrefixProvider* dpp)
+{
+ std::vector<Entry> result;
+
+ for (auto it = entries.begin(); it != entries.end(); ++it) {
+ result.push_back(it->second);
+ }
+
+ return result;
+}
+
+size_t RedisDriver::get_num_entries(const DoutPrefixProvider* dpp)
+{
+ return entries.size();
+}
+
+Partition RedisDriver::get_current_partition_info(const DoutPrefixProvider* dpp)
+{
+ Partition part;
+ return part; // Implement -Sam
+}
+
+uint64_t RedisDriver::get_free_space(const DoutPrefixProvider* dpp)
+{
+ int result = -1;
+
+ if (!client.is_connected())
+ find_client(dpp);
+
+ try {
+ client.info([&result](cpp_redis::reply &reply) {
+ if (!reply.is_null()) {
+ int usedMem = -1;
+ int maxMem = -1;
+
+ std::istringstream iss(reply.as_string());
+ std::string line;
+ while (std::getline(iss, line)) {
+ size_t pos = line.find_first_of(":");
+ if (pos != std::string::npos) {
+ if (line.substr(0, pos) == "used_memory") {
+ usedMem = std::stoi(line.substr(pos + 1, line.length() - pos - 2));
+ } else if (line.substr(0, line.find_first_of(":")) == "maxmemory") {
+ maxMem = std::stoi(line.substr(pos + 1, line.length() - pos - 2));
+ }
+ }
+ }
+
+ if (usedMem > -1 && maxMem > -1)
+ result = maxMem - usedMem;
+ }
+ });
+
+ client.sync_commit(std::chrono::milliseconds(1000));
+ } catch(std::exception &e) {
+ return -1;
+ }
+
+ return result;
+}
+
+int RedisDriver::AsyncReadOp::init(const DoutPrefixProvider *dpp, CephContext* cct, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg)
{
- return std::make_unique<RedisCacheAioRequest>(this);
+ ldpp_dout(dpp, 20) << "RedisCache: " << __func__ << "(): file_path=" << file_path << dendl;
+ aio_cb.reset(new struct aiocb);
+ memset(aio_cb.get(), 0, sizeof(struct aiocb));
+ aio_cb->aio_fildes = TEMP_FAILURE_RETRY(::open(file_path.c_str(), O_RDONLY|O_CLOEXEC|O_BINARY));
+
+ if (aio_cb->aio_fildes < 0) {
+ int err = errno;
+ ldpp_dout(dpp, 1) << "ERROR: RedisCache: " << __func__ << "(): can't open " << file_path << " : " << " error: " << err << dendl;
+ return -err;
+ }
+
+ if (cct->_conf->rgw_d3n_l1_fadvise != POSIX_FADV_NORMAL) {
+ posix_fadvise(aio_cb->aio_fildes, 0, 0, g_conf()->rgw_d3n_l1_fadvise);
+ }
+
+ bufferptr bp(read_len);
+ aio_cb->aio_buf = bp.c_str();
+ result.append(std::move(bp));
+
+ aio_cb->aio_nbytes = read_len;
+ aio_cb->aio_offset = read_ofs;
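+  // deliver completion on a libaio notification thread; sival_ptr carries the asio Completion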
+ aio_cb->aio_sigevent.sigev_notify = SIGEV_THREAD;
+ aio_cb->aio_sigevent.sigev_notify_function = libaio_cb_aio_dispatch;
+ aio_cb->aio_sigevent.sigev_notify_attributes = nullptr;
+ aio_cb->aio_sigevent.sigev_value.sival_ptr = arg;
+
+ return 0;
}
-rgw::AioResultList RedisDriver::get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id)
+void RedisDriver::AsyncReadOp::libaio_cb_aio_dispatch(sigval sigval)
{
- rgw_raw_obj r_obj;
- r_obj.oid = key;
- return aio->get(r_obj, rgw::Aio::cache_read_op(dpp, y, this, ofs, len, key), cost, id);
+ auto p = std::unique_ptr<Completion>{static_cast<Completion*>(sigval.sival_ptr)};
+ auto op = std::move(p->user_data);
+ const int ret = -aio_error(op.aio_cb.get());
+ boost::system::error_code ec;
+ if (ret < 0) {
+ ec.assign(-ret, boost::system::system_category());
+ }
+
+ ceph::async::dispatch(std::move(p), ec, std::move(op.result));
+}
+
+template <typename Executor1, typename CompletionHandler>
+auto RedisDriver::AsyncReadOp::create(const Executor1& ex1, CompletionHandler&& handler)
+{
+ auto p = Completion::create(ex1, std::move(handler));
+ return p;
}
-void RedisCacheAioRequest::cache_aio_read(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, off_t ofs, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) {}
-void RedisCacheAioRequest::cache_aio_write(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, bufferlist& bl, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) {}
+template <typename ExecutionContext, typename CompletionToken>
+auto RedisDriver::get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key,
+ off_t read_ofs, off_t read_len, CompletionToken&& token)
+{
+  std::string location = ""; // TODO: partition_info.location + key
+ ldpp_dout(dpp, 20) << "RedisCache: " << __func__ << "(): location=" << location << dendl;
+
+ using Op = AsyncReadOp;
+ using Signature = typename Op::Signature;
+ boost::asio::async_completion<CompletionToken, Signature> init(token);
+ auto p = Op::create(ctx.get_executor(), init.completion_handler);
+ auto& op = p->user_data;
+
+ int ret = op.init(dpp, cct, location, read_ofs, read_len, p.get());
+ if(0 == ret) {
+ ret = ::aio_read(op.aio_cb.get());
+ }
+  ldpp_dout(dpp, 20) << "RedisCache: " << __func__ << "(): ::aio_read(), ret=" << ret << dendl;
+  if(ret < 0) {
+    auto ec = boost::system::error_code{-ret, boost::system::system_category()};
+    ceph::async::post(std::move(p), ec, bufferlist{});
+  } else {
+    (void)p.release(); // the libaio callback now owns the completion
+  }
+  return init.result.get();
+}
+
+void RedisCacheAioRequest::cache_aio_read(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, off_t ofs, uint64_t len, rgw::Aio* aio, rgw::AioResult& r)
+{
+ using namespace boost::asio;
+ async_completion<yield_context, void()> init(y.get_yield_context());
+ auto ex = get_associated_executor(init.completion_handler);
+
+ ldpp_dout(dpp, 20) << "RedisCache: " << __func__ << "(): key=" << key << dendl;
+ cache_driver->get_async(dpp, y.get_io_context(), key, ofs, len, bind_executor(ex, RedisDriver::libaio_handler{aio, r}));
+}
+
+void RedisCacheAioRequest::cache_aio_write(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, bufferlist& bl, uint64_t len, rgw::Aio* aio, rgw::AioResult& r)
+{
+}
-} } // namespace rgw::cal
+} } // namespace rgw::cache
#ifndef CEPH_REDISDRIVER_H
#define CEPH_REDISDRIVER_H
+#include <aio.h>
+#include "common/async/completion.h"
#include <string>
#include <iostream>
#include <cpp_redis/cpp_redis>
class RedisDriver;
class RedisCacheAioRequest: public CacheAioRequest {
-public:
- RedisCacheAioRequest(RedisDriver* cache_driver) : cache_driver(cache_driver) {}
- virtual ~RedisCacheAioRequest() = default;
- virtual void cache_aio_read(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, off_t ofs, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) override;
- virtual void cache_aio_write(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, bufferlist& bl, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) override;
-private:
- RedisDriver* cache_driver;
+ public:
+ RedisCacheAioRequest(RedisDriver* cache_driver) : cache_driver(cache_driver) {}
+ virtual ~RedisCacheAioRequest() = default;
+ virtual void cache_aio_read(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, off_t ofs, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) override;
+ virtual void cache_aio_write(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, bufferlist& bl, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) override;
+ private:
+ RedisDriver* cache_driver;
};
class RedisDriver : public CacheDriver {
- private:
- cpp_redis::client client;
- rgw::d4n::Address addr;
- std::unordered_map<std::string, Entry> entries;
-
- int find_client(const DoutPrefixProvider* dpp);
- int insert_entry(const DoutPrefixProvider* dpp, std::string key, off_t offset, uint64_t len);
- int remove_entry(const DoutPrefixProvider* dpp, std::string key);
- std::optional<Entry> get_entry(const DoutPrefixProvider* dpp, std::string key);
-
public:
RedisDriver() : CacheDriver() {}
virtual uint64_t get_free_space(const DoutPrefixProvider* dpp) override;
virtual std::unique_ptr<CacheAioRequest> get_cache_aio_request_ptr(const DoutPrefixProvider* dpp) override;
+
+ struct libaio_handler { // should this be the same as SSDDriver? -Sam
+ rgw::Aio* throttle = nullptr;
+ rgw::AioResult& r;
+
+ // read callback
+ void operator()(boost::system::error_code ec, bufferlist bl) const {
+ r.result = -ec.value();
+ r.data = std::move(bl);
+ throttle->put(r);
+ }
+ };
+ template <typename ExecutionContext, typename CompletionToken>
+ auto get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key,
+ off_t read_ofs, off_t read_len, CompletionToken&& token);
+
+ private:
+ cpp_redis::client client;
+ rgw::d4n::Address addr;
+ std::unordered_map<std::string, Entry> entries;
+ CephContext* cct;
+
+ int find_client(const DoutPrefixProvider* dpp);
+ int insert_entry(const DoutPrefixProvider* dpp, std::string key, off_t offset, uint64_t len);
+ int remove_entry(const DoutPrefixProvider* dpp, std::string key);
+ std::optional<Entry> get_entry(const DoutPrefixProvider* dpp, std::string key);
+
+ // unique_ptr with custom deleter for struct aiocb
+ struct libaio_aiocb_deleter {
+ void operator()(struct aiocb* c) {
+        if (c->aio_fildes > 0) {
+          ::close(c->aio_fildes); // best-effort close; errors are ignored
+        }
+ delete c;
+ }
+ };
+
+ using unique_aio_cb_ptr = std::unique_ptr<struct aiocb, libaio_aiocb_deleter>;
+
+ struct AsyncReadOp {
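+    // per-read state for the libaio path: the result buffer, the aiocb, and the asio completion to resume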
+ bufferlist result;
+ unique_aio_cb_ptr aio_cb;
+ using Signature = void(boost::system::error_code, bufferlist);
+ using Completion = ceph::async::Completion<Signature, AsyncReadOp>;
+
+ int init(const DoutPrefixProvider *dpp, CephContext* cct, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg);
+ static void libaio_cb_aio_dispatch(sigval sigval);
+
+ template <typename Executor1, typename CompletionHandler>
+ static auto create(const Executor1& ex1, CompletionHandler&& handler);
+ };
};
-} } // namespace rgw::cal
+} } // namespace rgw::cache